[WIP] client action streaming

This commit is contained in:
2026-04-01 20:23:56 -05:00
parent 4b525f4641
commit 104dcbb1b1
14 changed files with 1152 additions and 828 deletions

0
.codex Normal file
View File

1
Cargo.lock generated
View File

@@ -528,6 +528,7 @@ dependencies = [
"mockito", "mockito",
"predicates", "predicates",
"reqwest 0.13.2", "reqwest 0.13.2",
"reqwest-eventsource",
"serde", "serde",
"serde_json", "serde_json",
"serde_yaml_ng", "serde_yaml_ng",

View File

@@ -2,6 +2,7 @@
use axum::{ use axum::{
extract::{Path, Query, State}, extract::{Path, Query, State},
http::HeaderMap,
http::StatusCode, http::StatusCode,
response::{ response::{
sse::{Event, KeepAlive, Sse}, sse::{Event, KeepAlive, Sse},
@@ -13,6 +14,7 @@ use axum::{
use chrono::Utc; use chrono::Utc;
use futures::stream::{Stream, StreamExt}; use futures::stream::{Stream, StreamExt};
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration;
use tokio_stream::wrappers::BroadcastStream; use tokio_stream::wrappers::BroadcastStream;
use attune_common::models::enums::ExecutionStatus; use attune_common::models::enums::ExecutionStatus;
@@ -32,7 +34,10 @@ use attune_common::workflow::{CancellationPolicy, WorkflowDefinition};
use sqlx::Row; use sqlx::Row;
use crate::{ use crate::{
auth::middleware::RequireAuth, auth::{
jwt::{validate_token, Claims, JwtConfig, TokenType},
middleware::{AuthenticatedUser, RequireAuth},
},
authz::{AuthorizationCheck, AuthorizationService}, authz::{AuthorizationCheck, AuthorizationService},
dto::{ dto::{
common::{PaginatedResponse, PaginationParams}, common::{PaginatedResponse, PaginationParams},
@@ -46,6 +51,9 @@ use crate::{
}; };
use attune_common::rbac::{Action, AuthorizationContext, Resource}; use attune_common::rbac::{Action, AuthorizationContext, Resource};
const LOG_STREAM_POLL_INTERVAL: Duration = Duration::from_millis(250);
const LOG_STREAM_READ_CHUNK_SIZE: usize = 64 * 1024;
/// Create a new execution (manual execution) /// Create a new execution (manual execution)
/// ///
/// This endpoint allows directly executing an action without a trigger or rule. /// This endpoint allows directly executing an action without a trigger or rule.
@@ -925,6 +933,398 @@ pub async fn stream_execution_updates(
Ok(Sse::new(filtered_stream).keep_alive(KeepAlive::default())) Ok(Sse::new(filtered_stream).keep_alive(KeepAlive::default()))
} }
#[derive(serde::Deserialize)]
pub struct StreamExecutionLogParams {
pub token: Option<String>,
pub offset: Option<u64>,
}
#[derive(Clone, Copy)]
enum ExecutionLogStream {
Stdout,
Stderr,
}
impl ExecutionLogStream {
fn parse(name: &str) -> Result<Self, ApiError> {
match name {
"stdout" => Ok(Self::Stdout),
"stderr" => Ok(Self::Stderr),
_ => Err(ApiError::BadRequest(format!(
"Unsupported log stream '{}'. Expected 'stdout' or 'stderr'.",
name
))),
}
}
fn file_name(self) -> &'static str {
match self {
Self::Stdout => "stdout.log",
Self::Stderr => "stderr.log",
}
}
}
enum ExecutionLogTailState {
WaitingForFile {
full_path: std::path::PathBuf,
execution_id: i64,
},
SendInitial {
full_path: std::path::PathBuf,
execution_id: i64,
offset: u64,
pending_utf8: Vec<u8>,
},
Tail {
full_path: std::path::PathBuf,
execution_id: i64,
offset: u64,
idle_polls: u32,
pending_utf8: Vec<u8>,
},
Finished,
}
/// Stream stdout/stderr for an execution as SSE.
///
/// This tails the worker's live log files directly from the shared artifacts
/// volume. The file may not exist yet when the worker has not emitted any
/// output, so the stream waits briefly for it to appear.
#[utoipa::path(
get,
path = "/api/v1/executions/{id}/logs/{stream}/stream",
tag = "executions",
params(
("id" = i64, Path, description = "Execution ID"),
("stream" = String, Path, description = "Log stream name: stdout or stderr"),
("token" = String, Query, description = "JWT access token for authentication"),
),
responses(
(status = 200, description = "SSE stream of execution log content", content_type = "text/event-stream"),
(status = 401, description = "Unauthorized"),
(status = 404, description = "Execution not found"),
),
)]
pub async fn stream_execution_log(
State(state): State<Arc<AppState>>,
headers: HeaderMap,
Path((id, stream_name)): Path<(i64, String)>,
Query(params): Query<StreamExecutionLogParams>,
user: Result<RequireAuth, crate::auth::middleware::AuthError>,
) -> Result<Sse<impl Stream<Item = Result<Event, std::convert::Infallible>>>, ApiError> {
let authenticated_user =
authenticate_execution_log_stream_user(&state, &headers, user, params.token.as_deref())?;
validate_execution_log_stream_user(&authenticated_user, id)?;
let execution = ExecutionRepository::find_by_id(&state.db, id)
.await?
.ok_or_else(|| ApiError::NotFound(format!("Execution with ID {} not found", id)))?;
authorize_execution_log_stream(&state, &authenticated_user, &execution).await?;
let stream_name = ExecutionLogStream::parse(&stream_name)?;
let full_path = std::path::PathBuf::from(&state.config.artifacts_dir)
.join(format!("execution_{}", id))
.join(stream_name.file_name());
let db = state.db.clone();
let initial_state = ExecutionLogTailState::WaitingForFile {
full_path,
execution_id: id,
};
let start_offset = params.offset.unwrap_or(0);
let stream = futures::stream::unfold(initial_state, move |state| {
let db = db.clone();
async move {
match state {
ExecutionLogTailState::Finished => None,
ExecutionLogTailState::WaitingForFile {
full_path,
execution_id,
} => {
if full_path.exists() {
Some((
Ok(Event::default().event("waiting").data("Log file found")),
ExecutionLogTailState::SendInitial {
full_path,
execution_id,
offset: start_offset,
pending_utf8: Vec::new(),
},
))
} else if execution_log_execution_terminal(&db, execution_id).await {
Some((
Ok(Event::default().event("done").data("")),
ExecutionLogTailState::Finished,
))
} else {
tokio::time::sleep(LOG_STREAM_POLL_INTERVAL).await;
Some((
Ok(Event::default()
.event("waiting")
.data("Waiting for log output")),
ExecutionLogTailState::WaitingForFile {
full_path,
execution_id,
},
))
}
}
ExecutionLogTailState::SendInitial {
full_path,
execution_id,
offset,
pending_utf8,
} => {
let pending_utf8_on_empty = pending_utf8.clone();
match read_log_chunk(
&full_path,
offset,
LOG_STREAM_READ_CHUNK_SIZE,
pending_utf8,
)
.await
{
Some((content, new_offset, pending_utf8)) => Some((
Ok(Event::default()
.id(new_offset.to_string())
.event("content")
.data(content)),
ExecutionLogTailState::SendInitial {
full_path,
execution_id,
offset: new_offset,
pending_utf8,
},
)),
None => Some((
Ok(Event::default().comment("initial-catchup-complete")),
ExecutionLogTailState::Tail {
full_path,
execution_id,
offset,
idle_polls: 0,
pending_utf8: pending_utf8_on_empty,
},
)),
}
}
ExecutionLogTailState::Tail {
full_path,
execution_id,
offset,
idle_polls,
pending_utf8,
} => {
let pending_utf8_on_empty = pending_utf8.clone();
match read_log_chunk(
&full_path,
offset,
LOG_STREAM_READ_CHUNK_SIZE,
pending_utf8,
)
.await
{
Some((append, new_offset, pending_utf8)) => Some((
Ok(Event::default()
.id(new_offset.to_string())
.event("append")
.data(append)),
ExecutionLogTailState::Tail {
full_path,
execution_id,
offset: new_offset,
idle_polls: 0,
pending_utf8,
},
)),
None => {
let terminal =
execution_log_execution_terminal(&db, execution_id).await;
if terminal && idle_polls >= 2 {
Some((
Ok(Event::default().event("done").data("Execution complete")),
ExecutionLogTailState::Finished,
))
} else {
tokio::time::sleep(LOG_STREAM_POLL_INTERVAL).await;
Some((
Ok(Event::default()
.event("waiting")
.data("Waiting for log output")),
ExecutionLogTailState::Tail {
full_path,
execution_id,
offset,
idle_polls: idle_polls + 1,
pending_utf8: pending_utf8_on_empty,
},
))
}
}
}
}
}
}
});
Ok(Sse::new(stream).keep_alive(KeepAlive::default()))
}
async fn read_log_chunk(
path: &std::path::Path,
offset: u64,
max_bytes: usize,
mut pending_utf8: Vec<u8>,
) -> Option<(String, u64, Vec<u8>)> {
use tokio::io::{AsyncReadExt, AsyncSeekExt};
let mut file = tokio::fs::File::open(path).await.ok()?;
let metadata = file.metadata().await.ok()?;
if metadata.len() <= offset {
return None;
}
file.seek(std::io::SeekFrom::Start(offset)).await.ok()?;
let bytes_to_read = ((metadata.len() - offset) as usize).min(max_bytes);
let mut buf = vec![0u8; bytes_to_read];
let read = file.read(&mut buf).await.ok()?;
buf.truncate(read);
if buf.is_empty() {
return None;
}
pending_utf8.extend_from_slice(&buf);
let (content, pending_utf8) = decode_utf8_chunk(pending_utf8);
Some((content, offset + read as u64, pending_utf8))
}
async fn execution_log_execution_terminal(db: &sqlx::PgPool, execution_id: i64) -> bool {
match ExecutionRepository::find_by_id(db, execution_id).await {
Ok(Some(execution)) => matches!(
execution.status,
ExecutionStatus::Completed
| ExecutionStatus::Failed
| ExecutionStatus::Cancelled
| ExecutionStatus::Timeout
| ExecutionStatus::Abandoned
),
_ => true,
}
}
fn decode_utf8_chunk(mut bytes: Vec<u8>) -> (String, Vec<u8>) {
match std::str::from_utf8(&bytes) {
Ok(valid) => (valid.to_string(), Vec::new()),
Err(err) if err.error_len().is_none() => {
let pending = bytes.split_off(err.valid_up_to());
(String::from_utf8_lossy(&bytes).into_owned(), pending)
}
Err(_) => (String::from_utf8_lossy(&bytes).into_owned(), Vec::new()),
}
}
async fn authorize_execution_log_stream(
state: &Arc<AppState>,
user: &AuthenticatedUser,
execution: &attune_common::models::Execution,
) -> Result<(), ApiError> {
if user.claims.token_type != TokenType::Access {
return Ok(());
}
let identity_id = user
.identity_id()
.map_err(|_| ApiError::Unauthorized("Invalid user identity".to_string()))?;
let authz = AuthorizationService::new(state.db.clone());
let mut ctx = AuthorizationContext::new(identity_id);
ctx.target_id = Some(execution.id);
ctx.target_ref = Some(execution.action_ref.clone());
authz
.authorize(
user,
AuthorizationCheck {
resource: Resource::Executions,
action: Action::Read,
context: ctx,
},
)
.await
}
fn authenticate_execution_log_stream_user(
state: &Arc<AppState>,
headers: &HeaderMap,
user: Result<RequireAuth, crate::auth::middleware::AuthError>,
query_token: Option<&str>,
) -> Result<AuthenticatedUser, ApiError> {
match user {
Ok(RequireAuth(user)) => Ok(user),
Err(_) => {
if let Some(user) = crate::auth::oidc::cookie_authenticated_user(headers, state)? {
return Ok(user);
}
let token = query_token.ok_or(ApiError::Unauthorized(
"Missing authentication token".to_string(),
))?;
authenticate_execution_log_stream_query_token(token, &state.jwt_config)
}
}
}
fn authenticate_execution_log_stream_query_token(
token: &str,
jwt_config: &JwtConfig,
) -> Result<AuthenticatedUser, ApiError> {
let claims = validate_token(token, jwt_config)
.map_err(|_| ApiError::Unauthorized("Invalid authentication token".to_string()))?;
Ok(AuthenticatedUser { claims })
}
fn validate_execution_log_stream_user(
user: &AuthenticatedUser,
execution_id: i64,
) -> Result<(), ApiError> {
let claims = &user.claims;
match claims.token_type {
TokenType::Access => Ok(()),
TokenType::Execution => validate_execution_token_scope(claims, execution_id),
TokenType::Sensor | TokenType::Refresh => Err(ApiError::Unauthorized(
"Invalid authentication token".to_string(),
)),
}
}
fn validate_execution_token_scope(claims: &Claims, execution_id: i64) -> Result<(), ApiError> {
if claims.scope.as_deref() != Some("execution") {
return Err(ApiError::Unauthorized(
"Invalid authentication token".to_string(),
));
}
let token_execution_id = claims
.metadata
.as_ref()
.and_then(|metadata| metadata.get("execution_id"))
.and_then(|value| value.as_i64())
.ok_or_else(|| ApiError::Unauthorized("Invalid authentication token".to_string()))?;
if token_execution_id != execution_id {
return Err(ApiError::Forbidden(format!(
"Execution token is not valid for execution {}",
execution_id
)));
}
Ok(())
}
#[derive(serde::Deserialize)] #[derive(serde::Deserialize)]
pub struct StreamExecutionParams { pub struct StreamExecutionParams {
pub execution_id: Option<i64>, pub execution_id: Option<i64>,
@@ -937,6 +1337,10 @@ pub fn routes() -> Router<Arc<AppState>> {
.route("/executions/execute", axum::routing::post(create_execution)) .route("/executions/execute", axum::routing::post(create_execution))
.route("/executions/stats", get(get_execution_stats)) .route("/executions/stats", get(get_execution_stats))
.route("/executions/stream", get(stream_execution_updates)) .route("/executions/stream", get(stream_execution_updates))
.route(
"/executions/{id}/logs/{stream}/stream",
get(stream_execution_log),
)
.route("/executions/{id}", get(get_execution)) .route("/executions/{id}", get(get_execution))
.route( .route(
"/executions/{id}/cancel", "/executions/{id}/cancel",
@@ -955,10 +1359,26 @@ pub fn routes() -> Router<Arc<AppState>> {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
use attune_common::auth::jwt::generate_execution_token;
#[test] #[test]
fn test_execution_routes_structure() { fn test_execution_routes_structure() {
// Just verify the router can be constructed // Just verify the router can be constructed
let _router = routes(); let _router = routes();
} }
#[test]
fn execution_token_scope_must_match_requested_execution() {
let jwt_config = JwtConfig {
secret: "test_secret_key_for_testing".to_string(),
access_token_expiration: 3600,
refresh_token_expiration: 604800,
};
let token = generate_execution_token(42, 123, "core.echo", &jwt_config, None).unwrap();
let user = authenticate_execution_log_stream_query_token(&token, &jwt_config).unwrap();
let err = validate_execution_log_stream_user(&user, 456).unwrap_err();
assert!(matches!(err, ApiError::Forbidden(_)));
}
} }

View File

@@ -23,6 +23,7 @@ clap = { workspace = true, features = ["derive", "env", "string"] }
# HTTP client # HTTP client
reqwest = { workspace = true, features = ["multipart", "stream"] } reqwest = { workspace = true, features = ["multipart", "stream"] }
reqwest-eventsource = { workspace = true }
# Serialization # Serialization
serde = { workspace = true } serde = { workspace = true }

View File

@@ -21,6 +21,11 @@ pub struct ApiResponse<T> {
pub data: T, pub data: T,
} }
#[derive(Debug, serde::Deserialize)]
struct PaginatedResponse<T> {
data: Vec<T>,
}
/// API error response /// API error response
#[derive(Debug, serde::Deserialize)] #[derive(Debug, serde::Deserialize)]
pub struct ApiError { pub struct ApiError {
@@ -55,6 +60,10 @@ impl ApiClient {
&self.base_url &self.base_url
} }
pub fn auth_token(&self) -> Option<&str> {
self.auth_token.as_deref()
}
#[cfg(test)] #[cfg(test)]
pub fn new(base_url: String, auth_token: Option<String>) -> Self { pub fn new(base_url: String, auth_token: Option<String>) -> Self {
let client = HttpClient::builder() let client = HttpClient::builder()
@@ -255,6 +264,31 @@ impl ApiClient {
} }
} }
async fn handle_paginated_response<T: DeserializeOwned>(
&self,
response: reqwest::Response,
) -> Result<Vec<T>> {
let status = response.status();
if status.is_success() {
let paginated: PaginatedResponse<T> = response
.json()
.await
.context("Failed to parse paginated API response")?;
Ok(paginated.data)
} else {
let error_text = response
.text()
.await
.unwrap_or_else(|_| "Unknown error".to_string());
if let Ok(api_error) = serde_json::from_str::<ApiError>(&error_text) {
anyhow::bail!("API error ({}): {}", status, api_error.error);
} else {
anyhow::bail!("API error ({}): {}", status, error_text);
}
}
}
/// Handle a response where we only care about success/failure, not a body. /// Handle a response where we only care about success/failure, not a body.
async fn handle_empty_response(&self, response: reqwest::Response) -> Result<()> { async fn handle_empty_response(&self, response: reqwest::Response) -> Result<()> {
let status = response.status(); let status = response.status();
@@ -281,6 +315,25 @@ impl ApiClient {
self.execute_json::<T, ()>(Method::GET, path, None).await self.execute_json::<T, ()>(Method::GET, path, None).await
} }
pub async fn get_paginated<T: DeserializeOwned>(&mut self, path: &str) -> Result<Vec<T>> {
let req = self.build_request(Method::GET, path);
let response = req.send().await.context("Failed to send request to API")?;
if response.status() == StatusCode::UNAUTHORIZED
&& self.refresh_token.is_some()
&& self.refresh_auth_token().await?
{
let req = self.build_request(Method::GET, path);
let response = req
.send()
.await
.context("Failed to send request to API (retry)")?;
return self.handle_paginated_response(response).await;
}
self.handle_paginated_response(response).await
}
/// GET request with query parameters (query string must be in path) /// GET request with query parameters (query string must be in path)
/// ///
/// Part of REST client API - reserved for future advanced filtering/search features. /// Part of REST client API - reserved for future advanced filtering/search features.

View File

@@ -6,7 +6,7 @@ use std::collections::HashMap;
use crate::client::ApiClient; use crate::client::ApiClient;
use crate::config::CliConfig; use crate::config::CliConfig;
use crate::output::{self, OutputFormat}; use crate::output::{self, OutputFormat};
use crate::wait::{wait_for_execution, WaitOptions}; use crate::wait::{extract_stdout, spawn_execution_output_watch, wait_for_execution, WaitOptions};
#[derive(Subcommand)] #[derive(Subcommand)]
pub enum ActionCommands { pub enum ActionCommands {
@@ -493,6 +493,15 @@ async fn handle_execute(
} }
let verbose = matches!(output_format, OutputFormat::Table); let verbose = matches!(output_format, OutputFormat::Table);
let watch_task = if verbose {
Some(spawn_execution_output_watch(
ApiClient::from_config(&config, api_url),
execution.id,
verbose,
))
} else {
None
};
let summary = wait_for_execution(WaitOptions { let summary = wait_for_execution(WaitOptions {
execution_id: execution.id, execution_id: execution.id,
timeout_secs: timeout, timeout_secs: timeout,
@@ -501,6 +510,13 @@ async fn handle_execute(
verbose, verbose,
}) })
.await?; .await?;
let suppress_final_stdout = watch_task
.as_ref()
.is_some_and(|task| task.delivered_output() && task.root_stdout_completed());
if let Some(task) = watch_task {
let _ = tokio::time::timeout(tokio::time::Duration::from_secs(2), task.handle).await;
}
match output_format { match output_format {
OutputFormat::Json | OutputFormat::Yaml => { OutputFormat::Json | OutputFormat::Yaml => {
@@ -517,7 +533,20 @@ async fn handle_execute(
("Updated", output::format_timestamp(&summary.updated)), ("Updated", output::format_timestamp(&summary.updated)),
]); ]);
if let Some(result) = summary.result { let stdout = extract_stdout(&summary.result);
if !suppress_final_stdout {
if let Some(stdout) = &stdout {
output::print_section("Stdout");
println!("{}", stdout);
}
}
if let Some(mut result) = summary.result {
if stdout.is_some() {
if let Some(obj) = result.as_object_mut() {
obj.remove("stdout");
}
}
if !result.is_null() { if !result.is_null() {
output::print_section("Result"); output::print_section("Result");
println!("{}", serde_json::to_string_pretty(&result)?); println!("{}", serde_json::to_string_pretty(&result)?);

View File

@@ -11,7 +11,13 @@
use anyhow::Result; use anyhow::Result;
use futures::{SinkExt, StreamExt}; use futures::{SinkExt, StreamExt};
use reqwest_eventsource::{Event as SseEvent, EventSource};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::{
atomic::{AtomicBool, AtomicU64, Ordering},
Arc,
};
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use tokio_tungstenite::{connect_async, tungstenite::Message}; use tokio_tungstenite::{connect_async, tungstenite::Message};
@@ -54,6 +60,22 @@ pub struct WaitOptions<'a> {
pub verbose: bool, pub verbose: bool,
} }
pub struct OutputWatchTask {
pub handle: tokio::task::JoinHandle<()>,
delivered_output: Arc<AtomicBool>,
root_stdout_completed: Arc<AtomicBool>,
}
impl OutputWatchTask {
pub fn delivered_output(&self) -> bool {
self.delivered_output.load(Ordering::Relaxed)
}
pub fn root_stdout_completed(&self) -> bool {
self.root_stdout_completed.load(Ordering::Relaxed)
}
}
// ── notifier WebSocket messages (mirrors websocket_server.rs) ──────────────── // ── notifier WebSocket messages (mirrors websocket_server.rs) ────────────────
#[derive(Debug, Serialize)] #[derive(Debug, Serialize)]
@@ -102,6 +124,41 @@ struct RestExecution {
updated: String, updated: String,
} }
#[derive(Debug, Clone, Deserialize)]
struct WorkflowTaskMetadata {
task_name: String,
#[serde(default)]
task_index: Option<i32>,
}
#[derive(Debug, Clone, Deserialize)]
struct ExecutionListItem {
id: i64,
action_ref: String,
status: String,
#[serde(default)]
workflow_task: Option<WorkflowTaskMetadata>,
}
#[derive(Debug)]
struct ChildWatchState {
label: String,
status: String,
announced_terminal: bool,
stream_handles: Vec<StreamWatchHandle>,
}
struct RootWatchState {
stream_handles: Vec<StreamWatchHandle>,
}
#[derive(Debug)]
struct StreamWatchHandle {
stream_name: &'static str,
offset: Arc<AtomicU64>,
handle: tokio::task::JoinHandle<()>,
}
impl From<RestExecution> for ExecutionSummary { impl From<RestExecution> for ExecutionSummary {
fn from(e: RestExecution) -> Self { fn from(e: RestExecution) -> Self {
Self { Self {
@@ -177,6 +234,269 @@ pub async fn wait_for_execution(opts: WaitOptions<'_>) -> Result<ExecutionSummar
.await .await
} }
pub fn spawn_execution_output_watch(
mut client: ApiClient,
execution_id: i64,
verbose: bool,
) -> OutputWatchTask {
let delivered_output = Arc::new(AtomicBool::new(false));
let root_stdout_completed = Arc::new(AtomicBool::new(false));
let delivered_output_for_task = delivered_output.clone();
let root_stdout_completed_for_task = root_stdout_completed.clone();
let handle = tokio::spawn(async move {
if let Err(err) = watch_execution_output(
&mut client,
execution_id,
verbose,
delivered_output_for_task,
root_stdout_completed_for_task,
)
.await
{
if verbose {
eprintln!(" [watch] {}", err);
}
}
});
OutputWatchTask {
handle,
delivered_output,
root_stdout_completed,
}
}
async fn watch_execution_output(
client: &mut ApiClient,
execution_id: i64,
verbose: bool,
delivered_output: Arc<AtomicBool>,
root_stdout_completed: Arc<AtomicBool>,
) -> Result<()> {
let base_url = client.base_url().to_string();
let mut root_watch: Option<RootWatchState> = None;
let mut children: HashMap<i64, ChildWatchState> = HashMap::new();
loop {
let execution: RestExecution = client.get(&format!("/executions/{}", execution_id)).await?;
if root_watch
.as_ref()
.is_none_or(|state| streams_need_restart(&state.stream_handles))
{
if let Some(token) = client.auth_token().map(str::to_string) {
match root_watch.as_mut() {
Some(state) => restart_finished_streams(
&mut state.stream_handles,
&base_url,
token,
execution_id,
None,
verbose,
delivered_output.clone(),
Some(root_stdout_completed.clone()),
),
None => {
root_watch = Some(RootWatchState {
stream_handles: spawn_execution_log_streams(
&base_url,
token,
execution_id,
None,
verbose,
delivered_output.clone(),
Some(root_stdout_completed.clone()),
),
});
}
}
}
}
let child_items = list_child_executions(client, execution_id)
.await
.unwrap_or_default();
for child in child_items {
let label = format_task_label(&child.workflow_task, &child.action_ref, child.id);
let entry = children.entry(child.id).or_insert_with(|| {
if verbose {
eprintln!(" [{}] started ({})", label, child.action_ref);
}
let stream_handles = client
.auth_token()
.map(str::to_string)
.map(|token| {
spawn_execution_log_streams(
&base_url,
token,
child.id,
Some(label.clone()),
verbose,
delivered_output.clone(),
None,
)
})
.unwrap_or_default();
ChildWatchState {
label,
status: child.status.clone(),
announced_terminal: false,
stream_handles,
}
});
if entry.status != child.status {
entry.status = child.status.clone();
}
let child_is_terminal = is_terminal(&entry.status);
if !child_is_terminal && streams_need_restart(&entry.stream_handles) {
if let Some(token) = client.auth_token().map(str::to_string) {
restart_finished_streams(
&mut entry.stream_handles,
&base_url,
token,
child.id,
Some(entry.label.clone()),
verbose,
delivered_output.clone(),
None,
);
}
}
if !entry.announced_terminal && is_terminal(&child.status) {
entry.announced_terminal = true;
if verbose {
eprintln!(" [{}] {}", entry.label, child.status);
}
}
}
if is_terminal(&execution.status) {
break;
}
tokio::time::sleep(Duration::from_millis(500)).await;
}
if let Some(root_watch) = root_watch {
wait_for_stream_handles(root_watch.stream_handles).await;
}
for child in children.into_values() {
wait_for_stream_handles(child.stream_handles).await;
}
Ok(())
}
fn spawn_execution_log_streams(
base_url: &str,
token: String,
execution_id: i64,
prefix: Option<String>,
verbose: bool,
delivered_output: Arc<AtomicBool>,
root_stdout_completed: Option<Arc<AtomicBool>>,
) -> Vec<StreamWatchHandle> {
["stdout", "stderr"]
.into_iter()
.map(|stream_name| {
let offset = Arc::new(AtomicU64::new(0));
let completion_flag = if stream_name == "stdout" {
root_stdout_completed.clone()
} else {
None
};
StreamWatchHandle {
stream_name,
handle: tokio::spawn(stream_execution_log(
base_url.to_string(),
token.clone(),
execution_id,
stream_name,
prefix.clone(),
verbose,
offset.clone(),
delivered_output.clone(),
completion_flag,
)),
offset,
}
})
.collect()
}
fn streams_need_restart(handles: &[StreamWatchHandle]) -> bool {
handles.is_empty() || handles.iter().any(|handle| handle.handle.is_finished())
}
fn restart_finished_streams(
handles: &mut Vec<StreamWatchHandle>,
base_url: &str,
token: String,
execution_id: i64,
prefix: Option<String>,
verbose: bool,
delivered_output: Arc<AtomicBool>,
root_stdout_completed: Option<Arc<AtomicBool>>,
) {
for stream in handles.iter_mut() {
if stream.handle.is_finished() {
let offset = stream.offset.clone();
let completion_flag = if stream.stream_name == "stdout" {
root_stdout_completed.clone()
} else {
None
};
stream.handle = tokio::spawn(stream_execution_log(
base_url.to_string(),
token.clone(),
execution_id,
stream.stream_name,
prefix.clone(),
verbose,
offset,
delivered_output.clone(),
completion_flag,
));
}
}
}
async fn wait_for_stream_handles(handles: Vec<StreamWatchHandle>) {
for handle in handles {
let _ = handle.handle.await;
}
}
async fn list_child_executions(
client: &mut ApiClient,
execution_id: i64,
) -> Result<Vec<ExecutionListItem>> {
const PER_PAGE: u32 = 100;
let mut page = 1;
let mut all_children = Vec::new();
loop {
let path = format!("/executions?parent={execution_id}&page={page}&per_page={PER_PAGE}");
let mut page_items: Vec<ExecutionListItem> = client.get_paginated(&path).await?;
let page_len = page_items.len();
all_children.append(&mut page_items);
if page_len < PER_PAGE as usize {
break;
}
page += 1;
}
Ok(all_children)
}
// ── WebSocket path ──────────────────────────────────────────────────────────── // ── WebSocket path ────────────────────────────────────────────────────────────
async fn wait_via_websocket( async fn wait_via_websocket(
@@ -491,6 +811,143 @@ fn derive_notifier_url(api_url: &str) -> Option<String> {
Some(format!("{}://{}:8081", ws_scheme, host)) Some(format!("{}://{}:8081", ws_scheme, host))
} }
pub fn extract_stdout(result: &Option<serde_json::Value>) -> Option<String> {
result
.as_ref()
.and_then(|value| value.get("stdout"))
.and_then(|stdout| stdout.as_str())
.filter(|stdout| !stdout.is_empty())
.map(ToOwned::to_owned)
}
fn format_task_label(
workflow_task: &Option<WorkflowTaskMetadata>,
action_ref: &str,
execution_id: i64,
) -> String {
if let Some(workflow_task) = workflow_task {
if let Some(index) = workflow_task.task_index {
format!("{}[{}]", workflow_task.task_name, index)
} else {
workflow_task.task_name.clone()
}
} else {
format!("{}#{}", action_ref, execution_id)
}
}
async fn stream_execution_log(
base_url: String,
token: String,
execution_id: i64,
stream_name: &'static str,
prefix: Option<String>,
verbose: bool,
offset: Arc<AtomicU64>,
delivered_output: Arc<AtomicBool>,
root_stdout_completed: Option<Arc<AtomicBool>>,
) {
let mut stream_url = match url::Url::parse(&format!(
"{}/api/v1/executions/{}/logs/{}/stream",
base_url.trim_end_matches('/'),
execution_id,
stream_name
)) {
Ok(url) => url,
Err(err) => {
if verbose {
eprintln!(" [watch] failed to build stream URL: {}", err);
}
return;
}
};
let current_offset = offset.load(Ordering::Relaxed).to_string();
stream_url
.query_pairs_mut()
.append_pair("token", &token)
.append_pair("offset", &current_offset);
let mut event_source = EventSource::get(stream_url);
let mut carry = String::new();
while let Some(event) = event_source.next().await {
match event {
Ok(SseEvent::Open) => {}
Ok(SseEvent::Message(message)) => match message.event.as_str() {
"content" | "append" => {
if let Ok(server_offset) = message.id.parse::<u64>() {
offset.store(server_offset, Ordering::Relaxed);
}
if !message.data.is_empty() {
delivered_output.store(true, Ordering::Relaxed);
}
print_stream_chunk(prefix.as_deref(), &message.data, &mut carry);
}
"done" => {
if let Some(flag) = &root_stdout_completed {
flag.store(true, Ordering::Relaxed);
}
flush_stream_chunk(prefix.as_deref(), &mut carry);
break;
}
"error" => {
if verbose && !message.data.is_empty() {
eprintln!(" [watch] {}", message.data);
}
break;
}
_ => {}
},
Err(err) => {
flush_stream_chunk(prefix.as_deref(), &mut carry);
if verbose {
eprintln!(
" [watch] stream error for execution {}: {}",
execution_id, err
);
}
break;
}
}
}
flush_stream_chunk(prefix.as_deref(), &mut carry);
let _ = event_source.close();
}
fn print_stream_chunk(prefix: Option<&str>, chunk: &str, carry: &mut String) {
carry.push_str(chunk);
while let Some(idx) = carry.find('\n') {
let mut line = carry.drain(..=idx).collect::<String>();
if line.ends_with('\n') {
line.pop();
}
if line.ends_with('\r') {
line.pop();
}
if let Some(prefix) = prefix {
eprintln!("[{}] {}", prefix, line);
} else {
eprintln!("{}", line);
}
}
}
fn flush_stream_chunk(prefix: Option<&str>, carry: &mut String) {
if carry.is_empty() {
return;
}
if let Some(prefix) = prefix {
eprintln!("[{}] {}", prefix, carry);
} else {
eprintln!("{}", carry);
}
carry.clear();
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
@@ -553,4 +1010,26 @@ mod tests {
assert_eq!(summary.status, "failed"); assert_eq!(summary.status, "failed");
assert_eq!(summary.action_ref, ""); assert_eq!(summary.action_ref, "");
} }
#[test]
fn test_extract_stdout() {
let result = Some(serde_json::json!({
"stdout": "hello world",
"stderr_log": "/tmp/stderr.log"
}));
assert_eq!(extract_stdout(&result).as_deref(), Some("hello world"));
}
#[test]
fn test_format_task_label() {
let workflow_task = Some(WorkflowTaskMetadata {
task_name: "build".to_string(),
task_index: Some(2),
});
assert_eq!(
format_task_label(&workflow_task, "core.echo", 42),
"build[2]"
);
assert_eq!(format_task_label(&None, "core.echo", 42), "core.echo#42");
}
} }

View File

@@ -543,6 +543,16 @@ impl ActionExecutor {
selected_runtime_version, selected_runtime_version,
max_stdout_bytes: self.max_stdout_bytes, max_stdout_bytes: self.max_stdout_bytes,
max_stderr_bytes: self.max_stderr_bytes, max_stderr_bytes: self.max_stderr_bytes,
stdout_log_path: Some(
self.artifact_manager
.get_execution_dir(execution.id)
.join("stdout.log"),
),
stderr_log_path: Some(
self.artifact_manager
.get_execution_dir(execution.id)
.join("stderr.log"),
),
parameter_delivery: action.parameter_delivery, parameter_delivery: action.parameter_delivery,
parameter_format: action.parameter_format, parameter_format: action.parameter_format,
output_format: action.output_format, output_format: action.output_format,

View File

@@ -2,9 +2,10 @@
//! //!
//! Provides bounded log writers that limit output size to prevent OOM issues. //! Provides bounded log writers that limit output size to prevent OOM issues.
use std::path::Path;
use std::pin::Pin; use std::pin::Pin;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use tokio::io::AsyncWrite; use tokio::io::{AsyncWrite, AsyncWriteExt};
const TRUNCATION_NOTICE_STDOUT: &str = "\n\n[OUTPUT TRUNCATED: stdout exceeded size limit]\n"; const TRUNCATION_NOTICE_STDOUT: &str = "\n\n[OUTPUT TRUNCATED: stdout exceeded size limit]\n";
const TRUNCATION_NOTICE_STDERR: &str = "\n\n[OUTPUT TRUNCATED: stderr exceeded size limit]\n"; const TRUNCATION_NOTICE_STDERR: &str = "\n\n[OUTPUT TRUNCATED: stderr exceeded size limit]\n";
@@ -76,6 +77,15 @@ pub struct BoundedLogWriter {
truncation_notice: &'static str, truncation_notice: &'static str,
} }
/// A file-backed writer that applies the same truncation policy as `BoundedLogWriter`.
pub struct BoundedLogFileWriter {
file: tokio::fs::File,
max_bytes: usize,
truncated: bool,
data_bytes_written: usize,
truncation_notice: &'static str,
}
impl BoundedLogWriter { impl BoundedLogWriter {
/// Create a new bounded log writer for stdout /// Create a new bounded log writer for stdout
pub fn new_stdout(max_bytes: usize) -> Self { pub fn new_stdout(max_bytes: usize) -> Self {
@@ -166,6 +176,76 @@ impl BoundedLogWriter {
} }
} }
impl BoundedLogFileWriter {
pub async fn new_stdout(path: &Path, max_bytes: usize) -> std::io::Result<Self> {
Self::create(path, max_bytes, TRUNCATION_NOTICE_STDOUT).await
}
pub async fn new_stderr(path: &Path, max_bytes: usize) -> std::io::Result<Self> {
Self::create(path, max_bytes, TRUNCATION_NOTICE_STDERR).await
}
async fn create(
path: &Path,
max_bytes: usize,
truncation_notice: &'static str,
) -> std::io::Result<Self> {
if let Some(parent) = path.parent() {
tokio::fs::create_dir_all(parent).await?;
}
let file = tokio::fs::OpenOptions::new()
.create(true)
.write(true)
.truncate(true)
.open(path)
.await?;
Ok(Self {
file,
max_bytes,
truncated: false,
data_bytes_written: 0,
truncation_notice,
})
}
pub async fn write_all(&mut self, buf: &[u8]) -> std::io::Result<()> {
if self.truncated {
return Ok(());
}
let effective_limit = self.max_bytes.saturating_sub(NOTICE_RESERVE_BYTES);
let remaining_space = effective_limit.saturating_sub(self.data_bytes_written);
if remaining_space == 0 {
self.add_truncation_notice().await?;
return Ok(());
}
let bytes_to_write = std::cmp::min(buf.len(), remaining_space);
if bytes_to_write > 0 {
self.file.write_all(&buf[..bytes_to_write]).await?;
self.data_bytes_written += bytes_to_write;
}
if bytes_to_write < buf.len() {
self.add_truncation_notice().await?;
}
self.file.flush().await
}
async fn add_truncation_notice(&mut self) -> std::io::Result<()> {
if self.truncated {
return Ok(());
}
self.truncated = true;
self.file.write_all(self.truncation_notice.as_bytes()).await
}
}
impl AsyncWrite for BoundedLogWriter { impl AsyncWrite for BoundedLogWriter {
fn poll_write( fn poll_write(
mut self: Pin<&mut Self>, mut self: Pin<&mut Self>,

View File

@@ -48,7 +48,7 @@ pub use dependency::{
DependencyError, DependencyManager, DependencyManagerRegistry, DependencyResult, DependencyError, DependencyManager, DependencyManagerRegistry, DependencyResult,
DependencySpec, EnvironmentInfo, DependencySpec, EnvironmentInfo,
}; };
pub use log_writer::{BoundedLogResult, BoundedLogWriter}; pub use log_writer::{BoundedLogFileWriter, BoundedLogResult, BoundedLogWriter};
pub use parameter_passing::{ParameterDeliveryConfig, PreparedParameters}; pub use parameter_passing::{ParameterDeliveryConfig, PreparedParameters};
// Re-export parameter types from common // Re-export parameter types from common
@@ -148,6 +148,12 @@ pub struct ExecutionContext {
/// Maximum stderr size in bytes (for log truncation) /// Maximum stderr size in bytes (for log truncation)
pub max_stderr_bytes: usize, pub max_stderr_bytes: usize,
/// Optional live stdout log path for incremental writes during execution.
pub stdout_log_path: Option<PathBuf>,
/// Optional live stderr log path for incremental writes during execution.
pub stderr_log_path: Option<PathBuf>,
/// How parameters should be delivered to the action /// How parameters should be delivered to the action
pub parameter_delivery: ParameterDelivery, pub parameter_delivery: ParameterDelivery,
@@ -185,6 +191,8 @@ impl ExecutionContext {
selected_runtime_version: None, selected_runtime_version: None,
max_stdout_bytes: 10 * 1024 * 1024, max_stdout_bytes: 10 * 1024 * 1024,
max_stderr_bytes: 10 * 1024 * 1024, max_stderr_bytes: 10 * 1024 * 1024,
stdout_log_path: None,
stderr_log_path: None,
parameter_delivery: ParameterDelivery::default(), parameter_delivery: ParameterDelivery::default(),
parameter_format: ParameterFormat::default(), parameter_format: ParameterFormat::default(),
output_format: OutputFormat::default(), output_format: OutputFormat::default(),

View File

@@ -5,10 +5,11 @@
use super::{ use super::{
parameter_passing::{self, ParameterDeliveryConfig}, parameter_passing::{self, ParameterDeliveryConfig},
BoundedLogWriter, ExecutionContext, ExecutionResult, Runtime, RuntimeError, RuntimeResult, BoundedLogFileWriter, BoundedLogWriter, ExecutionContext, ExecutionResult, Runtime,
RuntimeError, RuntimeResult,
}; };
use async_trait::async_trait; use async_trait::async_trait;
use std::path::PathBuf; use std::path::{Path, PathBuf};
use std::process::Stdio; use std::process::Stdio;
use std::time::Instant; use std::time::Instant;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
@@ -45,6 +46,8 @@ impl NativeRuntime {
timeout: Option<u64>, timeout: Option<u64>,
max_stdout_bytes: usize, max_stdout_bytes: usize,
max_stderr_bytes: usize, max_stderr_bytes: usize,
stdout_log_path: Option<&Path>,
stderr_log_path: Option<&Path>,
) -> RuntimeResult<ExecutionResult> { ) -> RuntimeResult<ExecutionResult> {
let start = Instant::now(); let start = Instant::now();
@@ -131,6 +134,8 @@ impl NativeRuntime {
let mut stdout_writer = BoundedLogWriter::new_stdout(max_stdout_bytes); let mut stdout_writer = BoundedLogWriter::new_stdout(max_stdout_bytes);
let mut stderr_writer = BoundedLogWriter::new_stderr(max_stderr_bytes); let mut stderr_writer = BoundedLogWriter::new_stderr(max_stderr_bytes);
let mut stdout_file = open_live_log_file(stdout_log_path, max_stdout_bytes, true).await?;
let mut stderr_file = open_live_log_file(stderr_log_path, max_stderr_bytes, false).await?;
// Create buffered readers // Create buffered readers
let mut stdout_reader = BufReader::new(stdout_handle); let mut stdout_reader = BufReader::new(stdout_handle);
@@ -147,6 +152,9 @@ impl NativeRuntime {
if stdout_writer.write_all(&line).await.is_err() { if stdout_writer.write_all(&line).await.is_err() {
break; break;
} }
if let Some(file) = stdout_file.as_mut() {
let _ = file.write_all(&line).await;
}
} }
Err(_) => break, Err(_) => break,
} }
@@ -164,6 +172,9 @@ impl NativeRuntime {
if stderr_writer.write_all(&line).await.is_err() { if stderr_writer.write_all(&line).await.is_err() {
break; break;
} }
if let Some(file) = stderr_file.as_mut() {
let _ = file.write_all(&line).await;
}
} }
Err(_) => break, Err(_) => break,
} }
@@ -352,6 +363,8 @@ impl Runtime for NativeRuntime {
context.timeout, context.timeout,
context.max_stdout_bytes, context.max_stdout_bytes,
context.max_stderr_bytes, context.max_stderr_bytes,
context.stdout_log_path.as_deref(),
context.stderr_log_path.as_deref(),
) )
.await .await
} }
@@ -401,6 +414,23 @@ impl Runtime for NativeRuntime {
} }
} }
async fn open_live_log_file(
path: Option<&Path>,
max_bytes: usize,
is_stdout: bool,
) -> std::io::Result<Option<BoundedLogFileWriter>> {
let Some(path) = path else {
return Ok(None);
};
let writer = if is_stdout {
BoundedLogFileWriter::new_stdout(path, max_bytes).await?
} else {
BoundedLogFileWriter::new_stderr(path, max_bytes).await?
};
Ok(Some(writer))
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;

View File

@@ -962,6 +962,8 @@ impl Runtime for ProcessRuntime {
context.max_stderr_bytes, context.max_stderr_bytes,
context.output_format, context.output_format,
context.cancel_token.clone(), context.cancel_token.clone(),
context.stdout_log_path.as_deref(),
context.stderr_log_path.as_deref(),
) )
.await; .await;

View File

@@ -12,10 +12,10 @@
//! 1. SIGTERM is sent to the process immediately //! 1. SIGTERM is sent to the process immediately
//! 2. After a 5-second grace period, SIGKILL is sent as a last resort //! 2. After a 5-second grace period, SIGKILL is sent as a last resort
use super::{BoundedLogWriter, ExecutionResult, OutputFormat, RuntimeResult}; use super::{BoundedLogFileWriter, BoundedLogWriter, ExecutionResult, OutputFormat, RuntimeResult};
use std::collections::HashMap; use std::collections::HashMap;
use std::io; use std::io;
use std::path::Path; use std::path::{Path, PathBuf};
use std::time::Instant; use std::time::Instant;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::process::Command; use tokio::process::Command;
@@ -59,6 +59,8 @@ pub async fn execute_streaming(
max_stderr_bytes, max_stderr_bytes,
output_format, output_format,
None, None,
None,
None,
) )
.await .await
} }
@@ -93,6 +95,8 @@ pub async fn execute_streaming_cancellable(
max_stderr_bytes: usize, max_stderr_bytes: usize,
output_format: OutputFormat, output_format: OutputFormat,
cancel_token: Option<CancellationToken>, cancel_token: Option<CancellationToken>,
stdout_log_path: Option<&Path>,
stderr_log_path: Option<&Path>,
) -> RuntimeResult<ExecutionResult> { ) -> RuntimeResult<ExecutionResult> {
let start = Instant::now(); let start = Instant::now();
@@ -130,6 +134,8 @@ pub async fn execute_streaming_cancellable(
// Create bounded writers // Create bounded writers
let mut stdout_writer = BoundedLogWriter::new_stdout(max_stdout_bytes); let mut stdout_writer = BoundedLogWriter::new_stdout(max_stdout_bytes);
let mut stderr_writer = BoundedLogWriter::new_stderr(max_stderr_bytes); let mut stderr_writer = BoundedLogWriter::new_stderr(max_stderr_bytes);
let mut stdout_file = open_live_log_file(stdout_log_path, max_stdout_bytes, true).await?;
let mut stderr_file = open_live_log_file(stderr_log_path, max_stderr_bytes, false).await?;
// Take stdout and stderr streams // Take stdout and stderr streams
let stdout = child.stdout.take().expect("stdout not captured"); let stdout = child.stdout.take().expect("stdout not captured");
@@ -150,6 +156,9 @@ pub async fn execute_streaming_cancellable(
if stdout_writer.write_all(&line).await.is_err() { if stdout_writer.write_all(&line).await.is_err() {
break; break;
} }
if let Some(file) = stdout_file.as_mut() {
let _ = file.write_all(&line).await;
}
} }
Err(_) => break, Err(_) => break,
} }
@@ -167,6 +176,9 @@ pub async fn execute_streaming_cancellable(
if stderr_writer.write_all(&line).await.is_err() { if stderr_writer.write_all(&line).await.is_err() {
break; break;
} }
if let Some(file) = stderr_file.as_mut() {
let _ = file.write_all(&line).await;
}
} }
Err(_) => break, Err(_) => break,
} }
@@ -351,6 +363,24 @@ pub async fn execute_streaming_cancellable(
}) })
} }
async fn open_live_log_file(
path: Option<&Path>,
max_bytes: usize,
is_stdout: bool,
) -> io::Result<Option<BoundedLogFileWriter>> {
let Some(path) = path else {
return Ok(None);
};
let path: PathBuf = path.to_path_buf();
let writer = if is_stdout {
BoundedLogFileWriter::new_stdout(&path, max_bytes).await?
} else {
BoundedLogFileWriter::new_stderr(&path, max_bytes).await?
};
Ok(Some(writer))
}
/// Parse stdout content according to the specified output format. /// Parse stdout content according to the specified output format.
fn configure_child_process(cmd: &mut Command) -> io::Result<()> { fn configure_child_process(cmd: &mut Command) -> io::Result<()> {
#[cfg(unix)] #[cfg(unix)]

View File

@@ -1,819 +0,0 @@
//! Python Runtime Implementation
//!
//! Executes Python actions using subprocess execution.
use super::{
BoundedLogWriter, DependencyManagerRegistry, DependencySpec, ExecutionContext, ExecutionResult,
OutputFormat, Runtime, RuntimeError, RuntimeResult,
};
use async_trait::async_trait;
use std::path::PathBuf;
use std::process::Stdio;
use std::sync::Arc;
use std::time::Instant;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::process::Command;
use tokio::time::timeout;
use tracing::{debug, info, warn};
/// Python runtime for executing Python scripts and functions
pub struct PythonRuntime {
/// Python interpreter path (fallback when no venv exists)
python_path: PathBuf,
/// Base directory for storing action code
work_dir: PathBuf,
/// Optional dependency manager registry for isolated environments
dependency_manager: Option<Arc<DependencyManagerRegistry>>,
}
impl PythonRuntime {
/// Create a new Python runtime
pub fn new() -> Self {
Self {
python_path: PathBuf::from("python3"),
work_dir: PathBuf::from("/tmp/attune/actions"),
dependency_manager: None,
}
}
/// Create a Python runtime with custom settings
pub fn with_config(python_path: PathBuf, work_dir: PathBuf) -> Self {
Self {
python_path,
work_dir,
dependency_manager: None,
}
}
/// Create a Python runtime with dependency manager support
pub fn with_dependency_manager(
python_path: PathBuf,
work_dir: PathBuf,
dependency_manager: Arc<DependencyManagerRegistry>,
) -> Self {
Self {
python_path,
work_dir,
dependency_manager: Some(dependency_manager),
}
}
/// Get the Python executable path to use for a given context
///
/// If the action has a pack_ref with dependencies, use the venv Python.
/// Otherwise, use the default Python interpreter.
async fn get_python_executable(&self, context: &ExecutionContext) -> RuntimeResult<PathBuf> {
// Check if we have a dependency manager and can extract pack_ref
if let Some(ref dep_mgr) = self.dependency_manager {
// Extract pack_ref from action_ref (format: "pack_ref.action_name")
if let Some(pack_ref) = context.action_ref.split('.').next() {
// Try to get the executable path for this pack
match dep_mgr.get_executable_path(pack_ref, "python").await {
Ok(python_path) => {
debug!(
"Using pack-specific Python from venv: {}",
python_path.display()
);
return Ok(python_path);
}
Err(e) => {
// Venv doesn't exist or failed - this is OK if pack has no dependencies
debug!(
"No venv found for pack {} ({}), using default Python",
pack_ref, e
);
}
}
}
}
// Fall back to default Python interpreter
debug!("Using default Python interpreter: {:?}", self.python_path);
Ok(self.python_path.clone())
}
/// Generate Python wrapper script that loads parameters and executes the action
fn generate_wrapper_script(&self, context: &ExecutionContext) -> RuntimeResult<String> {
let params_json = serde_json::to_string(&context.parameters)?;
// Use base64 encoding for code to avoid any quote/escape issues
let code_bytes = context.code.as_deref().unwrap_or("").as_bytes();
let code_base64 =
base64::Engine::encode(&base64::engine::general_purpose::STANDARD, code_bytes);
let wrapper = format!(
r#"#!/usr/bin/env python3
import sys
import json
import traceback
import base64
from pathlib import Path
# Global secrets storage (read from stdin, NOT from environment)
_attune_secrets = {{}}
def get_secret(name):
"""
Get a secret value by name.
Secrets are passed securely via stdin and are never exposed in
environment variables or process listings.
Args:
name (str): The name of the secret to retrieve
Returns:
str: The secret value, or None if not found
"""
return _attune_secrets.get(name)
def main():
global _attune_secrets
try:
# Read secrets from stdin FIRST (before executing action code)
# This prevents secrets from being visible in process environment
secrets_line = sys.stdin.readline().strip()
if secrets_line:
_attune_secrets = json.loads(secrets_line)
# Parse parameters
parameters = json.loads('''{}''')
# Decode action code from base64 (avoids quote/escape issues)
action_code = base64.b64decode('{}').decode('utf-8')
# Execute the code in a controlled namespace
# Include get_secret helper function
namespace = {{
'__name__': '__main__',
'parameters': parameters,
'get_secret': get_secret
}}
exec(action_code, namespace)
# Look for main function or run function
if '{}' in namespace:
result = namespace['{}'](**parameters)
elif 'run' in namespace:
result = namespace['run'](**parameters)
elif 'main' in namespace:
result = namespace['main'](**parameters)
else:
# No entry point found, return the namespace (only JSON-serializable values)
def is_json_serializable(obj):
"""Check if an object is JSON serializable"""
if obj is None:
return True
if isinstance(obj, (bool, int, float, str)):
return True
if isinstance(obj, (list, tuple)):
return all(is_json_serializable(item) for item in obj)
if isinstance(obj, dict):
return all(is_json_serializable(k) and is_json_serializable(v)
for k, v in obj.items())
return False
result = {{k: v for k, v in namespace.items()
if not k.startswith('__') and is_json_serializable(v)}}
# Output result as JSON
if result is not None:
print(json.dumps({{'result': result, 'status': 'success'}}))
else:
print(json.dumps({{'status': 'success'}}))
sys.exit(0)
except Exception as e:
error_info = {{
'status': 'error',
'error': str(e),
'error_type': type(e).__name__,
'traceback': traceback.format_exc()
}}
print(json.dumps(error_info), file=sys.stderr)
sys.exit(1)
if __name__ == '__main__':
main()
"#,
params_json, code_base64, context.entry_point, context.entry_point
);
Ok(wrapper)
}
/// Execute with streaming and bounded log collection
async fn execute_with_streaming(
&self,
mut cmd: Command,
secrets: &std::collections::HashMap<String, String>,
timeout_secs: Option<u64>,
max_stdout_bytes: usize,
max_stderr_bytes: usize,
output_format: OutputFormat,
) -> RuntimeResult<ExecutionResult> {
let start = Instant::now();
// Spawn process with piped I/O
let mut child = cmd
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()?;
// Write secrets to stdin
if let Some(mut stdin) = child.stdin.take() {
let secrets_json = serde_json::to_string(secrets)?;
stdin.write_all(secrets_json.as_bytes()).await?;
stdin.write_all(b"\n").await?;
drop(stdin);
}
// Create bounded writers
let mut stdout_writer = BoundedLogWriter::new_stdout(max_stdout_bytes);
let mut stderr_writer = BoundedLogWriter::new_stderr(max_stderr_bytes);
// Take stdout and stderr streams
let stdout = child.stdout.take().expect("stdout not captured");
let stderr = child.stderr.take().expect("stderr not captured");
// Create buffered readers
let mut stdout_reader = BufReader::new(stdout);
let mut stderr_reader = BufReader::new(stderr);
// Stream both outputs concurrently
let stdout_task = async {
let mut line = Vec::new();
loop {
line.clear();
match stdout_reader.read_until(b'\n', &mut line).await {
Ok(0) => break, // EOF
Ok(_) => {
if stdout_writer.write_all(&line).await.is_err() {
break;
}
}
Err(_) => break,
}
}
stdout_writer
};
let stderr_task = async {
let mut line = Vec::new();
loop {
line.clear();
match stderr_reader.read_until(b'\n', &mut line).await {
Ok(0) => break, // EOF
Ok(_) => {
if stderr_writer.write_all(&line).await.is_err() {
break;
}
}
Err(_) => break,
}
}
stderr_writer
};
// Wait for both streams and the process
let (stdout_writer, stderr_writer, wait_result) =
tokio::join!(stdout_task, stderr_task, async {
if let Some(timeout_secs) = timeout_secs {
timeout(std::time::Duration::from_secs(timeout_secs), child.wait()).await
} else {
Ok(child.wait().await)
}
});
let duration_ms = start.elapsed().as_millis() as u64;
// Handle timeout
let status = match wait_result {
Ok(Ok(status)) => status,
Ok(Err(e)) => {
return Err(RuntimeError::ProcessError(format!(
"Process wait failed: {}",
e
)));
}
Err(_) => {
return Ok(ExecutionResult {
exit_code: -1,
stdout: String::new(),
stderr: String::new(),
result: None,
duration_ms,
error: Some(format!(
"Execution timed out after {} seconds",
timeout_secs.unwrap()
)),
stdout_truncated: false,
stderr_truncated: false,
stdout_bytes_truncated: 0,
stderr_bytes_truncated: 0,
});
}
};
// Get results from bounded writers
let stdout_result = stdout_writer.into_result();
let stderr_result = stderr_writer.into_result();
let exit_code = status.code().unwrap_or(-1);
debug!(
"Python execution completed: exit_code={}, duration={}ms, stdout_truncated={}, stderr_truncated={}",
exit_code, duration_ms, stdout_result.truncated, stderr_result.truncated
);
// Parse result from stdout based on output_format
let result = if exit_code == 0 && !stdout_result.content.trim().is_empty() {
match output_format {
OutputFormat::Text => {
// No parsing - text output is captured in stdout field
None
}
OutputFormat::Json => {
// Try to parse full stdout as JSON first (handles multi-line JSON),
// then fall back to last line only (for scripts that log before output)
let trimmed = stdout_result.content.trim();
serde_json::from_str(trimmed).ok().or_else(|| {
trimmed
.lines()
.last()
.and_then(|line| serde_json::from_str(line).ok())
})
}
OutputFormat::Yaml => {
// Try to parse stdout as YAML
serde_yaml_ng::from_str(stdout_result.content.trim()).ok()
}
OutputFormat::Jsonl => {
// Parse each line as JSON and collect into array
let mut items = Vec::new();
for line in stdout_result.content.trim().lines() {
if let Ok(value) = serde_json::from_str::<serde_json::Value>(line) {
items.push(value);
}
}
if items.is_empty() {
None
} else {
Some(serde_json::Value::Array(items))
}
}
}
} else {
None
};
Ok(ExecutionResult {
exit_code,
// Only populate stdout if result wasn't parsed (avoid duplication)
stdout: if result.is_some() {
String::new()
} else {
stdout_result.content.clone()
},
stderr: stderr_result.content.clone(),
result,
duration_ms,
error: if exit_code != 0 {
Some(stderr_result.content)
} else {
None
},
stdout_truncated: stdout_result.truncated,
stderr_truncated: stderr_result.truncated,
stdout_bytes_truncated: stdout_result.bytes_truncated,
stderr_bytes_truncated: stderr_result.bytes_truncated,
})
}
async fn execute_python_code(
&self,
script: String,
secrets: &std::collections::HashMap<String, String>,
env: &std::collections::HashMap<String, String>,
timeout_secs: Option<u64>,
python_path: PathBuf,
max_stdout_bytes: usize,
max_stderr_bytes: usize,
output_format: OutputFormat,
) -> RuntimeResult<ExecutionResult> {
debug!(
"Executing Python script with {} secrets (passed via stdin)",
secrets.len()
);
// Build command
let mut cmd = Command::new(&python_path);
cmd.arg("-c").arg(&script);
// Add environment variables
for (key, value) in env {
cmd.env(key, value);
}
self.execute_with_streaming(
cmd,
secrets,
timeout_secs,
max_stdout_bytes,
max_stderr_bytes,
output_format,
)
.await
}
/// Execute Python script from file
async fn execute_python_file(
&self,
code_path: PathBuf,
secrets: &std::collections::HashMap<String, String>,
env: &std::collections::HashMap<String, String>,
timeout_secs: Option<u64>,
python_path: PathBuf,
max_stdout_bytes: usize,
max_stderr_bytes: usize,
output_format: OutputFormat,
) -> RuntimeResult<ExecutionResult> {
debug!(
"Executing Python file: {:?} with {} secrets",
code_path,
secrets.len()
);
// Build command
let mut cmd = Command::new(&python_path);
cmd.arg(&code_path);
// Add environment variables
for (key, value) in env {
cmd.env(key, value);
}
self.execute_with_streaming(
cmd,
secrets,
timeout_secs,
max_stdout_bytes,
max_stderr_bytes,
output_format,
)
.await
}
}
impl Default for PythonRuntime {
fn default() -> Self {
Self::new()
}
}
impl PythonRuntime {
/// Ensure pack dependencies are installed (called before execution if needed)
///
/// This is a helper method that can be called by the worker service to ensure
/// a pack's Python dependencies are set up before executing actions.
pub async fn ensure_pack_dependencies(
&self,
pack_ref: &str,
spec: &DependencySpec,
) -> RuntimeResult<()> {
if let Some(ref dep_mgr) = self.dependency_manager {
if spec.has_dependencies() {
info!(
"Ensuring Python dependencies for pack: {} ({} dependencies)",
pack_ref,
spec.dependencies.len()
);
dep_mgr
.ensure_environment(pack_ref, spec)
.await
.map_err(|e| {
RuntimeError::SetupError(format!(
"Failed to setup Python environment for {}: {}",
pack_ref, e
))
})?;
info!("Python dependencies ready for pack: {}", pack_ref);
} else {
debug!("Pack {} has no Python dependencies", pack_ref);
}
} else {
warn!("Dependency manager not configured, skipping dependency isolation");
}
Ok(())
}
}
#[async_trait]
impl Runtime for PythonRuntime {
fn name(&self) -> &str {
"python"
}
fn can_execute(&self, context: &ExecutionContext) -> bool {
// Check if action reference suggests Python
let is_python = context.action_ref.contains(".py")
|| context.entry_point.ends_with(".py")
|| context
.code_path
.as_ref()
.map(|p| p.extension().and_then(|e| e.to_str()) == Some("py"))
.unwrap_or(false);
is_python
}
async fn execute(&self, context: ExecutionContext) -> RuntimeResult<ExecutionResult> {
info!(
"Executing Python action: {} (execution_id: {})",
context.action_ref, context.execution_id
);
// Get the appropriate Python executable (venv or default)
let python_path = self.get_python_executable(&context).await?;
// If code_path is provided, execute the file directly
if let Some(code_path) = &context.code_path {
return self
.execute_python_file(
code_path.clone(),
&context.secrets,
&context.env,
context.timeout,
python_path,
context.max_stdout_bytes,
context.max_stderr_bytes,
context.output_format,
)
.await;
}
// Otherwise, generate wrapper script and execute
let script = self.generate_wrapper_script(&context)?;
self.execute_python_code(
script,
&context.secrets,
&context.env,
context.timeout,
python_path,
context.max_stdout_bytes,
context.max_stderr_bytes,
context.output_format,
)
.await
}
async fn setup(&self) -> RuntimeResult<()> {
info!("Setting up Python runtime");
// Ensure work directory exists
tokio::fs::create_dir_all(&self.work_dir)
.await
.map_err(|e| RuntimeError::SetupError(format!("Failed to create work dir: {}", e)))?;
// Verify Python is available
let output = Command::new(&self.python_path)
.arg("--version")
.output()
.await
.map_err(|e| {
RuntimeError::SetupError(format!(
"Python not found at {:?}: {}",
self.python_path, e
))
})?;
if !output.status.success() {
return Err(RuntimeError::SetupError(
"Python interpreter is not working".to_string(),
));
}
let version = String::from_utf8_lossy(&output.stdout);
info!("Python runtime ready: {}", version.trim());
Ok(())
}
async fn cleanup(&self) -> RuntimeResult<()> {
info!("Cleaning up Python runtime");
// Could clean up temporary files here
Ok(())
}
async fn validate(&self) -> RuntimeResult<()> {
debug!("Validating Python runtime");
// Check if Python is available
let output = Command::new(&self.python_path)
.arg("--version")
.output()
.await
.map_err(|e| RuntimeError::SetupError(format!("Python validation failed: {}", e)))?;
if !output.status.success() {
return Err(RuntimeError::SetupError(
"Python interpreter validation failed".to_string(),
));
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::collections::HashMap;
#[tokio::test]
async fn test_python_runtime_simple() {
let runtime = PythonRuntime::new();
let context = ExecutionContext {
execution_id: 1,
action_ref: "test.simple".to_string(),
parameters: {
let mut map = HashMap::new();
map.insert("x".to_string(), serde_json::json!(5));
map.insert("y".to_string(), serde_json::json!(10));
map
},
env: HashMap::new(),
secrets: HashMap::new(),
timeout: Some(10),
working_dir: None,
entry_point: "run".to_string(),
code: Some(
r#"
def run(x, y):
return x + y
"#
.to_string(),
),
code_path: None,
runtime_name: Some("python".to_string()),
runtime_config_override: None,
runtime_env_dir_suffix: None,
selected_runtime_version: None,
max_stdout_bytes: 10 * 1024 * 1024,
max_stderr_bytes: 10 * 1024 * 1024,
parameter_delivery: attune_common::models::ParameterDelivery::default(),
parameter_format: attune_common::models::ParameterFormat::default(),
output_format: attune_common::models::OutputFormat::default(),
};
let result = runtime.execute(context).await.unwrap();
assert!(result.is_success());
assert_eq!(result.exit_code, 0);
}
#[tokio::test]
async fn test_python_runtime_timeout() {
let runtime = PythonRuntime::new();
let context = ExecutionContext {
execution_id: 2,
action_ref: "test.timeout".to_string(),
parameters: HashMap::new(),
env: HashMap::new(),
secrets: HashMap::new(),
timeout: Some(1),
working_dir: None,
entry_point: "run".to_string(),
code: Some(
r#"
import time
def run():
time.sleep(10)
return "done"
"#
.to_string(),
),
code_path: None,
runtime_name: Some("python".to_string()),
runtime_config_override: None,
runtime_env_dir_suffix: None,
selected_runtime_version: None,
max_stdout_bytes: 10 * 1024 * 1024,
max_stderr_bytes: 10 * 1024 * 1024,
parameter_delivery: attune_common::models::ParameterDelivery::default(),
parameter_format: attune_common::models::ParameterFormat::default(),
output_format: attune_common::models::OutputFormat::default(),
};
let result = runtime.execute(context).await.unwrap();
assert!(!result.is_success());
assert!(result.error.is_some());
let error_msg = result.error.unwrap();
assert!(error_msg.contains("timeout") || error_msg.contains("timed out"));
}
#[tokio::test]
async fn test_python_runtime_error() {
let runtime = PythonRuntime::new();
let context = ExecutionContext {
execution_id: 3,
action_ref: "test.error".to_string(),
parameters: HashMap::new(),
env: HashMap::new(),
secrets: HashMap::new(),
timeout: Some(10),
working_dir: None,
entry_point: "run".to_string(),
code: Some(
r#"
def run():
raise ValueError("Test error")
"#
.to_string(),
),
code_path: None,
runtime_name: Some("python".to_string()),
runtime_config_override: None,
runtime_env_dir_suffix: None,
selected_runtime_version: None,
max_stdout_bytes: 10 * 1024 * 1024,
max_stderr_bytes: 10 * 1024 * 1024,
parameter_delivery: attune_common::models::ParameterDelivery::default(),
parameter_format: attune_common::models::ParameterFormat::default(),
output_format: attune_common::models::OutputFormat::default(),
};
let result = runtime.execute(context).await.unwrap();
assert!(!result.is_success());
assert!(result.error.is_some());
}
#[tokio::test]
#[ignore = "Pre-existing failure - secrets not being passed correctly"]
async fn test_python_runtime_with_secrets() {
let runtime = PythonRuntime::new();
let context = ExecutionContext {
execution_id: 4,
action_ref: "test.secrets".to_string(),
parameters: HashMap::new(),
env: HashMap::new(),
secrets: {
let mut s = HashMap::new();
s.insert("api_key".to_string(), "secret_key_12345".to_string());
s.insert("db_password".to_string(), "super_secret_pass".to_string());
s
},
timeout: Some(10),
working_dir: None,
entry_point: "run".to_string(),
code: Some(
r#"
def run():
# Access secrets via get_secret() helper
api_key = get_secret('api_key')
db_pass = get_secret('db_password')
missing = get_secret('nonexistent')
return {
'api_key': api_key,
'db_pass': db_pass,
'missing': missing
}
"#
.to_string(),
),
code_path: None,
runtime_name: Some("python".to_string()),
runtime_config_override: None,
runtime_env_dir_suffix: None,
selected_runtime_version: None,
max_stdout_bytes: 10 * 1024 * 1024,
max_stderr_bytes: 10 * 1024 * 1024,
parameter_delivery: attune_common::models::ParameterDelivery::default(),
parameter_format: attune_common::models::ParameterFormat::default(),
output_format: attune_common::models::OutputFormat::default(),
};
let result = runtime.execute(context).await.unwrap();
assert!(result.is_success());
assert_eq!(result.exit_code, 0);
// Verify secrets are accessible in action code
let result_data = result.result.unwrap();
let result_obj = result_data.get("result").unwrap();
assert_eq!(result_obj.get("api_key").unwrap(), "secret_key_12345");
assert_eq!(result_obj.get("db_pass").unwrap(), "super_secret_pass");
assert_eq!(result_obj.get("missing"), Some(&serde_json::Value::Null));
}
}