From 104dcbb1b18c5cfad7b5cd6cc3a0658b6df17e01 Mon Sep 17 00:00:00 2001 From: David Culbreth Date: Wed, 1 Apr 2026 20:23:56 -0500 Subject: [PATCH] [WIP] client action streaming --- .codex | 0 Cargo.lock | 1 + crates/api/src/routes/executions.rs | 422 ++++++++- crates/cli/Cargo.toml | 1 + crates/cli/src/client.rs | 53 ++ crates/cli/src/commands/action.rs | 33 +- crates/cli/src/wait.rs | 479 ++++++++++ crates/worker/src/executor.rs | 10 + crates/worker/src/runtime/log_writer.rs | 82 +- crates/worker/src/runtime/mod.rs | 10 +- crates/worker/src/runtime/native.rs | 34 +- crates/worker/src/runtime/process.rs | 2 + crates/worker/src/runtime/process_executor.rs | 34 +- crates/worker/src/runtime/python.rs | 819 ------------------ 14 files changed, 1152 insertions(+), 828 deletions(-) create mode 100644 .codex delete mode 100644 crates/worker/src/runtime/python.rs diff --git a/.codex b/.codex new file mode 100644 index 0000000..e69de29 diff --git a/Cargo.lock b/Cargo.lock index 87f13ea..551ecee 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -528,6 +528,7 @@ dependencies = [ "mockito", "predicates", "reqwest 0.13.2", + "reqwest-eventsource", "serde", "serde_json", "serde_yaml_ng", diff --git a/crates/api/src/routes/executions.rs b/crates/api/src/routes/executions.rs index 610f1bd..9d13f36 100644 --- a/crates/api/src/routes/executions.rs +++ b/crates/api/src/routes/executions.rs @@ -2,6 +2,7 @@ use axum::{ extract::{Path, Query, State}, + http::HeaderMap, http::StatusCode, response::{ sse::{Event, KeepAlive, Sse}, @@ -13,6 +14,7 @@ use axum::{ use chrono::Utc; use futures::stream::{Stream, StreamExt}; use std::sync::Arc; +use std::time::Duration; use tokio_stream::wrappers::BroadcastStream; use attune_common::models::enums::ExecutionStatus; @@ -32,7 +34,10 @@ use attune_common::workflow::{CancellationPolicy, WorkflowDefinition}; use sqlx::Row; use crate::{ - auth::middleware::RequireAuth, + auth::{ + jwt::{validate_token, Claims, JwtConfig, TokenType}, + middleware::{AuthenticatedUser, RequireAuth}, + }, authz::{AuthorizationCheck, AuthorizationService}, dto::{ common::{PaginatedResponse, PaginationParams}, @@ -46,6 +51,9 @@ use crate::{ }; use attune_common::rbac::{Action, AuthorizationContext, Resource}; +const LOG_STREAM_POLL_INTERVAL: Duration = Duration::from_millis(250); +const LOG_STREAM_READ_CHUNK_SIZE: usize = 64 * 1024; + /// Create a new execution (manual execution) /// /// This endpoint allows directly executing an action without a trigger or rule. @@ -925,6 +933,398 @@ pub async fn stream_execution_updates( Ok(Sse::new(filtered_stream).keep_alive(KeepAlive::default())) } +#[derive(serde::Deserialize)] +pub struct StreamExecutionLogParams { + pub token: Option, + pub offset: Option, +} + +#[derive(Clone, Copy)] +enum ExecutionLogStream { + Stdout, + Stderr, +} + +impl ExecutionLogStream { + fn parse(name: &str) -> Result { + match name { + "stdout" => Ok(Self::Stdout), + "stderr" => Ok(Self::Stderr), + _ => Err(ApiError::BadRequest(format!( + "Unsupported log stream '{}'. Expected 'stdout' or 'stderr'.", + name + ))), + } + } + + fn file_name(self) -> &'static str { + match self { + Self::Stdout => "stdout.log", + Self::Stderr => "stderr.log", + } + } +} + +enum ExecutionLogTailState { + WaitingForFile { + full_path: std::path::PathBuf, + execution_id: i64, + }, + SendInitial { + full_path: std::path::PathBuf, + execution_id: i64, + offset: u64, + pending_utf8: Vec, + }, + Tail { + full_path: std::path::PathBuf, + execution_id: i64, + offset: u64, + idle_polls: u32, + pending_utf8: Vec, + }, + Finished, +} + +/// Stream stdout/stderr for an execution as SSE. +/// +/// This tails the worker's live log files directly from the shared artifacts +/// volume. The file may not exist yet when the worker has not emitted any +/// output, so the stream waits briefly for it to appear. +#[utoipa::path( + get, + path = "/api/v1/executions/{id}/logs/{stream}/stream", + tag = "executions", + params( + ("id" = i64, Path, description = "Execution ID"), + ("stream" = String, Path, description = "Log stream name: stdout or stderr"), + ("token" = String, Query, description = "JWT access token for authentication"), + ), + responses( + (status = 200, description = "SSE stream of execution log content", content_type = "text/event-stream"), + (status = 401, description = "Unauthorized"), + (status = 404, description = "Execution not found"), + ), +)] +pub async fn stream_execution_log( + State(state): State>, + headers: HeaderMap, + Path((id, stream_name)): Path<(i64, String)>, + Query(params): Query, + user: Result, +) -> Result>>, ApiError> { + let authenticated_user = + authenticate_execution_log_stream_user(&state, &headers, user, params.token.as_deref())?; + validate_execution_log_stream_user(&authenticated_user, id)?; + + let execution = ExecutionRepository::find_by_id(&state.db, id) + .await? + .ok_or_else(|| ApiError::NotFound(format!("Execution with ID {} not found", id)))?; + authorize_execution_log_stream(&state, &authenticated_user, &execution).await?; + + let stream_name = ExecutionLogStream::parse(&stream_name)?; + let full_path = std::path::PathBuf::from(&state.config.artifacts_dir) + .join(format!("execution_{}", id)) + .join(stream_name.file_name()); + let db = state.db.clone(); + + let initial_state = ExecutionLogTailState::WaitingForFile { + full_path, + execution_id: id, + }; + let start_offset = params.offset.unwrap_or(0); + + let stream = futures::stream::unfold(initial_state, move |state| { + let db = db.clone(); + async move { + match state { + ExecutionLogTailState::Finished => None, + ExecutionLogTailState::WaitingForFile { + full_path, + execution_id, + } => { + if full_path.exists() { + Some(( + Ok(Event::default().event("waiting").data("Log file found")), + ExecutionLogTailState::SendInitial { + full_path, + execution_id, + offset: start_offset, + pending_utf8: Vec::new(), + }, + )) + } else if execution_log_execution_terminal(&db, execution_id).await { + Some(( + Ok(Event::default().event("done").data("")), + ExecutionLogTailState::Finished, + )) + } else { + tokio::time::sleep(LOG_STREAM_POLL_INTERVAL).await; + Some(( + Ok(Event::default() + .event("waiting") + .data("Waiting for log output")), + ExecutionLogTailState::WaitingForFile { + full_path, + execution_id, + }, + )) + } + } + ExecutionLogTailState::SendInitial { + full_path, + execution_id, + offset, + pending_utf8, + } => { + let pending_utf8_on_empty = pending_utf8.clone(); + match read_log_chunk( + &full_path, + offset, + LOG_STREAM_READ_CHUNK_SIZE, + pending_utf8, + ) + .await + { + Some((content, new_offset, pending_utf8)) => Some(( + Ok(Event::default() + .id(new_offset.to_string()) + .event("content") + .data(content)), + ExecutionLogTailState::SendInitial { + full_path, + execution_id, + offset: new_offset, + pending_utf8, + }, + )), + None => Some(( + Ok(Event::default().comment("initial-catchup-complete")), + ExecutionLogTailState::Tail { + full_path, + execution_id, + offset, + idle_polls: 0, + pending_utf8: pending_utf8_on_empty, + }, + )), + } + } + ExecutionLogTailState::Tail { + full_path, + execution_id, + offset, + idle_polls, + pending_utf8, + } => { + let pending_utf8_on_empty = pending_utf8.clone(); + match read_log_chunk( + &full_path, + offset, + LOG_STREAM_READ_CHUNK_SIZE, + pending_utf8, + ) + .await + { + Some((append, new_offset, pending_utf8)) => Some(( + Ok(Event::default() + .id(new_offset.to_string()) + .event("append") + .data(append)), + ExecutionLogTailState::Tail { + full_path, + execution_id, + offset: new_offset, + idle_polls: 0, + pending_utf8, + }, + )), + None => { + let terminal = + execution_log_execution_terminal(&db, execution_id).await; + if terminal && idle_polls >= 2 { + Some(( + Ok(Event::default().event("done").data("Execution complete")), + ExecutionLogTailState::Finished, + )) + } else { + tokio::time::sleep(LOG_STREAM_POLL_INTERVAL).await; + Some(( + Ok(Event::default() + .event("waiting") + .data("Waiting for log output")), + ExecutionLogTailState::Tail { + full_path, + execution_id, + offset, + idle_polls: idle_polls + 1, + pending_utf8: pending_utf8_on_empty, + }, + )) + } + } + } + } + } + } + }); + + Ok(Sse::new(stream).keep_alive(KeepAlive::default())) +} + +async fn read_log_chunk( + path: &std::path::Path, + offset: u64, + max_bytes: usize, + mut pending_utf8: Vec, +) -> Option<(String, u64, Vec)> { + use tokio::io::{AsyncReadExt, AsyncSeekExt}; + + let mut file = tokio::fs::File::open(path).await.ok()?; + let metadata = file.metadata().await.ok()?; + if metadata.len() <= offset { + return None; + } + + file.seek(std::io::SeekFrom::Start(offset)).await.ok()?; + let bytes_to_read = ((metadata.len() - offset) as usize).min(max_bytes); + let mut buf = vec![0u8; bytes_to_read]; + let read = file.read(&mut buf).await.ok()?; + buf.truncate(read); + if buf.is_empty() { + return None; + } + + pending_utf8.extend_from_slice(&buf); + let (content, pending_utf8) = decode_utf8_chunk(pending_utf8); + + Some((content, offset + read as u64, pending_utf8)) +} + +async fn execution_log_execution_terminal(db: &sqlx::PgPool, execution_id: i64) -> bool { + match ExecutionRepository::find_by_id(db, execution_id).await { + Ok(Some(execution)) => matches!( + execution.status, + ExecutionStatus::Completed + | ExecutionStatus::Failed + | ExecutionStatus::Cancelled + | ExecutionStatus::Timeout + | ExecutionStatus::Abandoned + ), + _ => true, + } +} + +fn decode_utf8_chunk(mut bytes: Vec) -> (String, Vec) { + match std::str::from_utf8(&bytes) { + Ok(valid) => (valid.to_string(), Vec::new()), + Err(err) if err.error_len().is_none() => { + let pending = bytes.split_off(err.valid_up_to()); + (String::from_utf8_lossy(&bytes).into_owned(), pending) + } + Err(_) => (String::from_utf8_lossy(&bytes).into_owned(), Vec::new()), + } +} + +async fn authorize_execution_log_stream( + state: &Arc, + user: &AuthenticatedUser, + execution: &attune_common::models::Execution, +) -> Result<(), ApiError> { + if user.claims.token_type != TokenType::Access { + return Ok(()); + } + + let identity_id = user + .identity_id() + .map_err(|_| ApiError::Unauthorized("Invalid user identity".to_string()))?; + let authz = AuthorizationService::new(state.db.clone()); + let mut ctx = AuthorizationContext::new(identity_id); + ctx.target_id = Some(execution.id); + ctx.target_ref = Some(execution.action_ref.clone()); + + authz + .authorize( + user, + AuthorizationCheck { + resource: Resource::Executions, + action: Action::Read, + context: ctx, + }, + ) + .await +} + +fn authenticate_execution_log_stream_user( + state: &Arc, + headers: &HeaderMap, + user: Result, + query_token: Option<&str>, +) -> Result { + match user { + Ok(RequireAuth(user)) => Ok(user), + Err(_) => { + if let Some(user) = crate::auth::oidc::cookie_authenticated_user(headers, state)? { + return Ok(user); + } + + let token = query_token.ok_or(ApiError::Unauthorized( + "Missing authentication token".to_string(), + ))?; + authenticate_execution_log_stream_query_token(token, &state.jwt_config) + } + } +} + +fn authenticate_execution_log_stream_query_token( + token: &str, + jwt_config: &JwtConfig, +) -> Result { + let claims = validate_token(token, jwt_config) + .map_err(|_| ApiError::Unauthorized("Invalid authentication token".to_string()))?; + + Ok(AuthenticatedUser { claims }) +} + +fn validate_execution_log_stream_user( + user: &AuthenticatedUser, + execution_id: i64, +) -> Result<(), ApiError> { + let claims = &user.claims; + + match claims.token_type { + TokenType::Access => Ok(()), + TokenType::Execution => validate_execution_token_scope(claims, execution_id), + TokenType::Sensor | TokenType::Refresh => Err(ApiError::Unauthorized( + "Invalid authentication token".to_string(), + )), + } +} + +fn validate_execution_token_scope(claims: &Claims, execution_id: i64) -> Result<(), ApiError> { + if claims.scope.as_deref() != Some("execution") { + return Err(ApiError::Unauthorized( + "Invalid authentication token".to_string(), + )); + } + + let token_execution_id = claims + .metadata + .as_ref() + .and_then(|metadata| metadata.get("execution_id")) + .and_then(|value| value.as_i64()) + .ok_or_else(|| ApiError::Unauthorized("Invalid authentication token".to_string()))?; + + if token_execution_id != execution_id { + return Err(ApiError::Forbidden(format!( + "Execution token is not valid for execution {}", + execution_id + ))); + } + + Ok(()) +} + #[derive(serde::Deserialize)] pub struct StreamExecutionParams { pub execution_id: Option, @@ -937,6 +1337,10 @@ pub fn routes() -> Router> { .route("/executions/execute", axum::routing::post(create_execution)) .route("/executions/stats", get(get_execution_stats)) .route("/executions/stream", get(stream_execution_updates)) + .route( + "/executions/{id}/logs/{stream}/stream", + get(stream_execution_log), + ) .route("/executions/{id}", get(get_execution)) .route( "/executions/{id}/cancel", @@ -955,10 +1359,26 @@ pub fn routes() -> Router> { #[cfg(test)] mod tests { use super::*; + use attune_common::auth::jwt::generate_execution_token; #[test] fn test_execution_routes_structure() { // Just verify the router can be constructed let _router = routes(); } + + #[test] + fn execution_token_scope_must_match_requested_execution() { + let jwt_config = JwtConfig { + secret: "test_secret_key_for_testing".to_string(), + access_token_expiration: 3600, + refresh_token_expiration: 604800, + }; + + let token = generate_execution_token(42, 123, "core.echo", &jwt_config, None).unwrap(); + + let user = authenticate_execution_log_stream_query_token(&token, &jwt_config).unwrap(); + let err = validate_execution_log_stream_user(&user, 456).unwrap_err(); + assert!(matches!(err, ApiError::Forbidden(_))); + } } diff --git a/crates/cli/Cargo.toml b/crates/cli/Cargo.toml index 6a04984..89d5e1c 100644 --- a/crates/cli/Cargo.toml +++ b/crates/cli/Cargo.toml @@ -23,6 +23,7 @@ clap = { workspace = true, features = ["derive", "env", "string"] } # HTTP client reqwest = { workspace = true, features = ["multipart", "stream"] } +reqwest-eventsource = { workspace = true } # Serialization serde = { workspace = true } diff --git a/crates/cli/src/client.rs b/crates/cli/src/client.rs index 930cb7b..c7a55ae 100644 --- a/crates/cli/src/client.rs +++ b/crates/cli/src/client.rs @@ -21,6 +21,11 @@ pub struct ApiResponse { pub data: T, } +#[derive(Debug, serde::Deserialize)] +struct PaginatedResponse { + data: Vec, +} + /// API error response #[derive(Debug, serde::Deserialize)] pub struct ApiError { @@ -55,6 +60,10 @@ impl ApiClient { &self.base_url } + pub fn auth_token(&self) -> Option<&str> { + self.auth_token.as_deref() + } + #[cfg(test)] pub fn new(base_url: String, auth_token: Option) -> Self { let client = HttpClient::builder() @@ -255,6 +264,31 @@ impl ApiClient { } } + async fn handle_paginated_response( + &self, + response: reqwest::Response, + ) -> Result> { + let status = response.status(); + if status.is_success() { + let paginated: PaginatedResponse = response + .json() + .await + .context("Failed to parse paginated API response")?; + Ok(paginated.data) + } else { + let error_text = response + .text() + .await + .unwrap_or_else(|_| "Unknown error".to_string()); + + if let Ok(api_error) = serde_json::from_str::(&error_text) { + anyhow::bail!("API error ({}): {}", status, api_error.error); + } else { + anyhow::bail!("API error ({}): {}", status, error_text); + } + } + } + /// Handle a response where we only care about success/failure, not a body. async fn handle_empty_response(&self, response: reqwest::Response) -> Result<()> { let status = response.status(); @@ -281,6 +315,25 @@ impl ApiClient { self.execute_json::(Method::GET, path, None).await } + pub async fn get_paginated(&mut self, path: &str) -> Result> { + let req = self.build_request(Method::GET, path); + let response = req.send().await.context("Failed to send request to API")?; + + if response.status() == StatusCode::UNAUTHORIZED + && self.refresh_token.is_some() + && self.refresh_auth_token().await? + { + let req = self.build_request(Method::GET, path); + let response = req + .send() + .await + .context("Failed to send request to API (retry)")?; + return self.handle_paginated_response(response).await; + } + + self.handle_paginated_response(response).await + } + /// GET request with query parameters (query string must be in path) /// /// Part of REST client API - reserved for future advanced filtering/search features. diff --git a/crates/cli/src/commands/action.rs b/crates/cli/src/commands/action.rs index f121816..c08af3c 100644 --- a/crates/cli/src/commands/action.rs +++ b/crates/cli/src/commands/action.rs @@ -6,7 +6,7 @@ use std::collections::HashMap; use crate::client::ApiClient; use crate::config::CliConfig; use crate::output::{self, OutputFormat}; -use crate::wait::{wait_for_execution, WaitOptions}; +use crate::wait::{extract_stdout, spawn_execution_output_watch, wait_for_execution, WaitOptions}; #[derive(Subcommand)] pub enum ActionCommands { @@ -493,6 +493,15 @@ async fn handle_execute( } let verbose = matches!(output_format, OutputFormat::Table); + let watch_task = if verbose { + Some(spawn_execution_output_watch( + ApiClient::from_config(&config, api_url), + execution.id, + verbose, + )) + } else { + None + }; let summary = wait_for_execution(WaitOptions { execution_id: execution.id, timeout_secs: timeout, @@ -501,6 +510,13 @@ async fn handle_execute( verbose, }) .await?; + let suppress_final_stdout = watch_task + .as_ref() + .is_some_and(|task| task.delivered_output() && task.root_stdout_completed()); + + if let Some(task) = watch_task { + let _ = tokio::time::timeout(tokio::time::Duration::from_secs(2), task.handle).await; + } match output_format { OutputFormat::Json | OutputFormat::Yaml => { @@ -517,7 +533,20 @@ async fn handle_execute( ("Updated", output::format_timestamp(&summary.updated)), ]); - if let Some(result) = summary.result { + let stdout = extract_stdout(&summary.result); + if !suppress_final_stdout { + if let Some(stdout) = &stdout { + output::print_section("Stdout"); + println!("{}", stdout); + } + } + + if let Some(mut result) = summary.result { + if stdout.is_some() { + if let Some(obj) = result.as_object_mut() { + obj.remove("stdout"); + } + } if !result.is_null() { output::print_section("Result"); println!("{}", serde_json::to_string_pretty(&result)?); diff --git a/crates/cli/src/wait.rs b/crates/cli/src/wait.rs index 7e05f9e..d223b99 100644 --- a/crates/cli/src/wait.rs +++ b/crates/cli/src/wait.rs @@ -11,7 +11,13 @@ use anyhow::Result; use futures::{SinkExt, StreamExt}; +use reqwest_eventsource::{Event as SseEvent, EventSource}; use serde::{Deserialize, Serialize}; +use std::collections::HashMap; +use std::sync::{ + atomic::{AtomicBool, AtomicU64, Ordering}, + Arc, +}; use std::time::{Duration, Instant}; use tokio_tungstenite::{connect_async, tungstenite::Message}; @@ -54,6 +60,22 @@ pub struct WaitOptions<'a> { pub verbose: bool, } +pub struct OutputWatchTask { + pub handle: tokio::task::JoinHandle<()>, + delivered_output: Arc, + root_stdout_completed: Arc, +} + +impl OutputWatchTask { + pub fn delivered_output(&self) -> bool { + self.delivered_output.load(Ordering::Relaxed) + } + + pub fn root_stdout_completed(&self) -> bool { + self.root_stdout_completed.load(Ordering::Relaxed) + } +} + // ── notifier WebSocket messages (mirrors websocket_server.rs) ──────────────── #[derive(Debug, Serialize)] @@ -102,6 +124,41 @@ struct RestExecution { updated: String, } +#[derive(Debug, Clone, Deserialize)] +struct WorkflowTaskMetadata { + task_name: String, + #[serde(default)] + task_index: Option, +} + +#[derive(Debug, Clone, Deserialize)] +struct ExecutionListItem { + id: i64, + action_ref: String, + status: String, + #[serde(default)] + workflow_task: Option, +} + +#[derive(Debug)] +struct ChildWatchState { + label: String, + status: String, + announced_terminal: bool, + stream_handles: Vec, +} + +struct RootWatchState { + stream_handles: Vec, +} + +#[derive(Debug)] +struct StreamWatchHandle { + stream_name: &'static str, + offset: Arc, + handle: tokio::task::JoinHandle<()>, +} + impl From for ExecutionSummary { fn from(e: RestExecution) -> Self { Self { @@ -177,6 +234,269 @@ pub async fn wait_for_execution(opts: WaitOptions<'_>) -> Result OutputWatchTask { + let delivered_output = Arc::new(AtomicBool::new(false)); + let root_stdout_completed = Arc::new(AtomicBool::new(false)); + let delivered_output_for_task = delivered_output.clone(); + let root_stdout_completed_for_task = root_stdout_completed.clone(); + let handle = tokio::spawn(async move { + if let Err(err) = watch_execution_output( + &mut client, + execution_id, + verbose, + delivered_output_for_task, + root_stdout_completed_for_task, + ) + .await + { + if verbose { + eprintln!(" [watch] {}", err); + } + } + }); + + OutputWatchTask { + handle, + delivered_output, + root_stdout_completed, + } +} + +async fn watch_execution_output( + client: &mut ApiClient, + execution_id: i64, + verbose: bool, + delivered_output: Arc, + root_stdout_completed: Arc, +) -> Result<()> { + let base_url = client.base_url().to_string(); + let mut root_watch: Option = None; + let mut children: HashMap = HashMap::new(); + + loop { + let execution: RestExecution = client.get(&format!("/executions/{}", execution_id)).await?; + + if root_watch + .as_ref() + .is_none_or(|state| streams_need_restart(&state.stream_handles)) + { + if let Some(token) = client.auth_token().map(str::to_string) { + match root_watch.as_mut() { + Some(state) => restart_finished_streams( + &mut state.stream_handles, + &base_url, + token, + execution_id, + None, + verbose, + delivered_output.clone(), + Some(root_stdout_completed.clone()), + ), + None => { + root_watch = Some(RootWatchState { + stream_handles: spawn_execution_log_streams( + &base_url, + token, + execution_id, + None, + verbose, + delivered_output.clone(), + Some(root_stdout_completed.clone()), + ), + }); + } + } + } + } + + let child_items = list_child_executions(client, execution_id) + .await + .unwrap_or_default(); + + for child in child_items { + let label = format_task_label(&child.workflow_task, &child.action_ref, child.id); + let entry = children.entry(child.id).or_insert_with(|| { + if verbose { + eprintln!(" [{}] started ({})", label, child.action_ref); + } + let stream_handles = client + .auth_token() + .map(str::to_string) + .map(|token| { + spawn_execution_log_streams( + &base_url, + token, + child.id, + Some(label.clone()), + verbose, + delivered_output.clone(), + None, + ) + }) + .unwrap_or_default(); + ChildWatchState { + label, + status: child.status.clone(), + announced_terminal: false, + stream_handles, + } + }); + + if entry.status != child.status { + entry.status = child.status.clone(); + } + + let child_is_terminal = is_terminal(&entry.status); + if !child_is_terminal && streams_need_restart(&entry.stream_handles) { + if let Some(token) = client.auth_token().map(str::to_string) { + restart_finished_streams( + &mut entry.stream_handles, + &base_url, + token, + child.id, + Some(entry.label.clone()), + verbose, + delivered_output.clone(), + None, + ); + } + } + + if !entry.announced_terminal && is_terminal(&child.status) { + entry.announced_terminal = true; + if verbose { + eprintln!(" [{}] {}", entry.label, child.status); + } + } + } + + if is_terminal(&execution.status) { + break; + } + + tokio::time::sleep(Duration::from_millis(500)).await; + } + + if let Some(root_watch) = root_watch { + wait_for_stream_handles(root_watch.stream_handles).await; + } + + for child in children.into_values() { + wait_for_stream_handles(child.stream_handles).await; + } + + Ok(()) +} + +fn spawn_execution_log_streams( + base_url: &str, + token: String, + execution_id: i64, + prefix: Option, + verbose: bool, + delivered_output: Arc, + root_stdout_completed: Option>, +) -> Vec { + ["stdout", "stderr"] + .into_iter() + .map(|stream_name| { + let offset = Arc::new(AtomicU64::new(0)); + let completion_flag = if stream_name == "stdout" { + root_stdout_completed.clone() + } else { + None + }; + StreamWatchHandle { + stream_name, + handle: tokio::spawn(stream_execution_log( + base_url.to_string(), + token.clone(), + execution_id, + stream_name, + prefix.clone(), + verbose, + offset.clone(), + delivered_output.clone(), + completion_flag, + )), + offset, + } + }) + .collect() +} + +fn streams_need_restart(handles: &[StreamWatchHandle]) -> bool { + handles.is_empty() || handles.iter().any(|handle| handle.handle.is_finished()) +} + +fn restart_finished_streams( + handles: &mut Vec, + base_url: &str, + token: String, + execution_id: i64, + prefix: Option, + verbose: bool, + delivered_output: Arc, + root_stdout_completed: Option>, +) { + for stream in handles.iter_mut() { + if stream.handle.is_finished() { + let offset = stream.offset.clone(); + let completion_flag = if stream.stream_name == "stdout" { + root_stdout_completed.clone() + } else { + None + }; + stream.handle = tokio::spawn(stream_execution_log( + base_url.to_string(), + token.clone(), + execution_id, + stream.stream_name, + prefix.clone(), + verbose, + offset, + delivered_output.clone(), + completion_flag, + )); + } + } +} + +async fn wait_for_stream_handles(handles: Vec) { + for handle in handles { + let _ = handle.handle.await; + } +} + +async fn list_child_executions( + client: &mut ApiClient, + execution_id: i64, +) -> Result> { + const PER_PAGE: u32 = 100; + + let mut page = 1; + let mut all_children = Vec::new(); + + loop { + let path = format!("/executions?parent={execution_id}&page={page}&per_page={PER_PAGE}"); + let mut page_items: Vec = client.get_paginated(&path).await?; + let page_len = page_items.len(); + all_children.append(&mut page_items); + + if page_len < PER_PAGE as usize { + break; + } + + page += 1; + } + + Ok(all_children) +} + // ── WebSocket path ──────────────────────────────────────────────────────────── async fn wait_via_websocket( @@ -491,6 +811,143 @@ fn derive_notifier_url(api_url: &str) -> Option { Some(format!("{}://{}:8081", ws_scheme, host)) } +pub fn extract_stdout(result: &Option) -> Option { + result + .as_ref() + .and_then(|value| value.get("stdout")) + .and_then(|stdout| stdout.as_str()) + .filter(|stdout| !stdout.is_empty()) + .map(ToOwned::to_owned) +} + +fn format_task_label( + workflow_task: &Option, + action_ref: &str, + execution_id: i64, +) -> String { + if let Some(workflow_task) = workflow_task { + if let Some(index) = workflow_task.task_index { + format!("{}[{}]", workflow_task.task_name, index) + } else { + workflow_task.task_name.clone() + } + } else { + format!("{}#{}", action_ref, execution_id) + } +} + +async fn stream_execution_log( + base_url: String, + token: String, + execution_id: i64, + stream_name: &'static str, + prefix: Option, + verbose: bool, + offset: Arc, + delivered_output: Arc, + root_stdout_completed: Option>, +) { + let mut stream_url = match url::Url::parse(&format!( + "{}/api/v1/executions/{}/logs/{}/stream", + base_url.trim_end_matches('/'), + execution_id, + stream_name + )) { + Ok(url) => url, + Err(err) => { + if verbose { + eprintln!(" [watch] failed to build stream URL: {}", err); + } + return; + } + }; + let current_offset = offset.load(Ordering::Relaxed).to_string(); + stream_url + .query_pairs_mut() + .append_pair("token", &token) + .append_pair("offset", ¤t_offset); + + let mut event_source = EventSource::get(stream_url); + let mut carry = String::new(); + + while let Some(event) = event_source.next().await { + match event { + Ok(SseEvent::Open) => {} + Ok(SseEvent::Message(message)) => match message.event.as_str() { + "content" | "append" => { + if let Ok(server_offset) = message.id.parse::() { + offset.store(server_offset, Ordering::Relaxed); + } + if !message.data.is_empty() { + delivered_output.store(true, Ordering::Relaxed); + } + print_stream_chunk(prefix.as_deref(), &message.data, &mut carry); + } + "done" => { + if let Some(flag) = &root_stdout_completed { + flag.store(true, Ordering::Relaxed); + } + flush_stream_chunk(prefix.as_deref(), &mut carry); + break; + } + "error" => { + if verbose && !message.data.is_empty() { + eprintln!(" [watch] {}", message.data); + } + break; + } + _ => {} + }, + Err(err) => { + flush_stream_chunk(prefix.as_deref(), &mut carry); + if verbose { + eprintln!( + " [watch] stream error for execution {}: {}", + execution_id, err + ); + } + break; + } + } + } + + flush_stream_chunk(prefix.as_deref(), &mut carry); + let _ = event_source.close(); +} + +fn print_stream_chunk(prefix: Option<&str>, chunk: &str, carry: &mut String) { + carry.push_str(chunk); + + while let Some(idx) = carry.find('\n') { + let mut line = carry.drain(..=idx).collect::(); + if line.ends_with('\n') { + line.pop(); + } + if line.ends_with('\r') { + line.pop(); + } + + if let Some(prefix) = prefix { + eprintln!("[{}] {}", prefix, line); + } else { + eprintln!("{}", line); + } + } +} + +fn flush_stream_chunk(prefix: Option<&str>, carry: &mut String) { + if carry.is_empty() { + return; + } + + if let Some(prefix) = prefix { + eprintln!("[{}] {}", prefix, carry); + } else { + eprintln!("{}", carry); + } + carry.clear(); +} + #[cfg(test)] mod tests { use super::*; @@ -553,4 +1010,26 @@ mod tests { assert_eq!(summary.status, "failed"); assert_eq!(summary.action_ref, ""); } + + #[test] + fn test_extract_stdout() { + let result = Some(serde_json::json!({ + "stdout": "hello world", + "stderr_log": "/tmp/stderr.log" + })); + assert_eq!(extract_stdout(&result).as_deref(), Some("hello world")); + } + + #[test] + fn test_format_task_label() { + let workflow_task = Some(WorkflowTaskMetadata { + task_name: "build".to_string(), + task_index: Some(2), + }); + assert_eq!( + format_task_label(&workflow_task, "core.echo", 42), + "build[2]" + ); + assert_eq!(format_task_label(&None, "core.echo", 42), "core.echo#42"); + } } diff --git a/crates/worker/src/executor.rs b/crates/worker/src/executor.rs index 6ee9888..3561009 100644 --- a/crates/worker/src/executor.rs +++ b/crates/worker/src/executor.rs @@ -543,6 +543,16 @@ impl ActionExecutor { selected_runtime_version, max_stdout_bytes: self.max_stdout_bytes, max_stderr_bytes: self.max_stderr_bytes, + stdout_log_path: Some( + self.artifact_manager + .get_execution_dir(execution.id) + .join("stdout.log"), + ), + stderr_log_path: Some( + self.artifact_manager + .get_execution_dir(execution.id) + .join("stderr.log"), + ), parameter_delivery: action.parameter_delivery, parameter_format: action.parameter_format, output_format: action.output_format, diff --git a/crates/worker/src/runtime/log_writer.rs b/crates/worker/src/runtime/log_writer.rs index 1eb0894..bee1187 100644 --- a/crates/worker/src/runtime/log_writer.rs +++ b/crates/worker/src/runtime/log_writer.rs @@ -2,9 +2,10 @@ //! //! Provides bounded log writers that limit output size to prevent OOM issues. +use std::path::Path; use std::pin::Pin; use std::task::{Context, Poll}; -use tokio::io::AsyncWrite; +use tokio::io::{AsyncWrite, AsyncWriteExt}; const TRUNCATION_NOTICE_STDOUT: &str = "\n\n[OUTPUT TRUNCATED: stdout exceeded size limit]\n"; const TRUNCATION_NOTICE_STDERR: &str = "\n\n[OUTPUT TRUNCATED: stderr exceeded size limit]\n"; @@ -76,6 +77,15 @@ pub struct BoundedLogWriter { truncation_notice: &'static str, } +/// A file-backed writer that applies the same truncation policy as `BoundedLogWriter`. +pub struct BoundedLogFileWriter { + file: tokio::fs::File, + max_bytes: usize, + truncated: bool, + data_bytes_written: usize, + truncation_notice: &'static str, +} + impl BoundedLogWriter { /// Create a new bounded log writer for stdout pub fn new_stdout(max_bytes: usize) -> Self { @@ -166,6 +176,76 @@ impl BoundedLogWriter { } } +impl BoundedLogFileWriter { + pub async fn new_stdout(path: &Path, max_bytes: usize) -> std::io::Result { + Self::create(path, max_bytes, TRUNCATION_NOTICE_STDOUT).await + } + + pub async fn new_stderr(path: &Path, max_bytes: usize) -> std::io::Result { + Self::create(path, max_bytes, TRUNCATION_NOTICE_STDERR).await + } + + async fn create( + path: &Path, + max_bytes: usize, + truncation_notice: &'static str, + ) -> std::io::Result { + if let Some(parent) = path.parent() { + tokio::fs::create_dir_all(parent).await?; + } + + let file = tokio::fs::OpenOptions::new() + .create(true) + .write(true) + .truncate(true) + .open(path) + .await?; + + Ok(Self { + file, + max_bytes, + truncated: false, + data_bytes_written: 0, + truncation_notice, + }) + } + + pub async fn write_all(&mut self, buf: &[u8]) -> std::io::Result<()> { + if self.truncated { + return Ok(()); + } + + let effective_limit = self.max_bytes.saturating_sub(NOTICE_RESERVE_BYTES); + let remaining_space = effective_limit.saturating_sub(self.data_bytes_written); + + if remaining_space == 0 { + self.add_truncation_notice().await?; + return Ok(()); + } + + let bytes_to_write = std::cmp::min(buf.len(), remaining_space); + if bytes_to_write > 0 { + self.file.write_all(&buf[..bytes_to_write]).await?; + self.data_bytes_written += bytes_to_write; + } + + if bytes_to_write < buf.len() { + self.add_truncation_notice().await?; + } + + self.file.flush().await + } + + async fn add_truncation_notice(&mut self) -> std::io::Result<()> { + if self.truncated { + return Ok(()); + } + + self.truncated = true; + self.file.write_all(self.truncation_notice.as_bytes()).await + } +} + impl AsyncWrite for BoundedLogWriter { fn poll_write( mut self: Pin<&mut Self>, diff --git a/crates/worker/src/runtime/mod.rs b/crates/worker/src/runtime/mod.rs index b9e96c0..fc1beae 100644 --- a/crates/worker/src/runtime/mod.rs +++ b/crates/worker/src/runtime/mod.rs @@ -48,7 +48,7 @@ pub use dependency::{ DependencyError, DependencyManager, DependencyManagerRegistry, DependencyResult, DependencySpec, EnvironmentInfo, }; -pub use log_writer::{BoundedLogResult, BoundedLogWriter}; +pub use log_writer::{BoundedLogFileWriter, BoundedLogResult, BoundedLogWriter}; pub use parameter_passing::{ParameterDeliveryConfig, PreparedParameters}; // Re-export parameter types from common @@ -148,6 +148,12 @@ pub struct ExecutionContext { /// Maximum stderr size in bytes (for log truncation) pub max_stderr_bytes: usize, + /// Optional live stdout log path for incremental writes during execution. + pub stdout_log_path: Option, + + /// Optional live stderr log path for incremental writes during execution. + pub stderr_log_path: Option, + /// How parameters should be delivered to the action pub parameter_delivery: ParameterDelivery, @@ -185,6 +191,8 @@ impl ExecutionContext { selected_runtime_version: None, max_stdout_bytes: 10 * 1024 * 1024, max_stderr_bytes: 10 * 1024 * 1024, + stdout_log_path: None, + stderr_log_path: None, parameter_delivery: ParameterDelivery::default(), parameter_format: ParameterFormat::default(), output_format: OutputFormat::default(), diff --git a/crates/worker/src/runtime/native.rs b/crates/worker/src/runtime/native.rs index d173629..f8adb8f 100644 --- a/crates/worker/src/runtime/native.rs +++ b/crates/worker/src/runtime/native.rs @@ -5,10 +5,11 @@ use super::{ parameter_passing::{self, ParameterDeliveryConfig}, - BoundedLogWriter, ExecutionContext, ExecutionResult, Runtime, RuntimeError, RuntimeResult, + BoundedLogFileWriter, BoundedLogWriter, ExecutionContext, ExecutionResult, Runtime, + RuntimeError, RuntimeResult, }; use async_trait::async_trait; -use std::path::PathBuf; +use std::path::{Path, PathBuf}; use std::process::Stdio; use std::time::Instant; use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; @@ -45,6 +46,8 @@ impl NativeRuntime { timeout: Option, max_stdout_bytes: usize, max_stderr_bytes: usize, + stdout_log_path: Option<&Path>, + stderr_log_path: Option<&Path>, ) -> RuntimeResult { let start = Instant::now(); @@ -131,6 +134,8 @@ impl NativeRuntime { let mut stdout_writer = BoundedLogWriter::new_stdout(max_stdout_bytes); let mut stderr_writer = BoundedLogWriter::new_stderr(max_stderr_bytes); + let mut stdout_file = open_live_log_file(stdout_log_path, max_stdout_bytes, true).await?; + let mut stderr_file = open_live_log_file(stderr_log_path, max_stderr_bytes, false).await?; // Create buffered readers let mut stdout_reader = BufReader::new(stdout_handle); @@ -147,6 +152,9 @@ impl NativeRuntime { if stdout_writer.write_all(&line).await.is_err() { break; } + if let Some(file) = stdout_file.as_mut() { + let _ = file.write_all(&line).await; + } } Err(_) => break, } @@ -164,6 +172,9 @@ impl NativeRuntime { if stderr_writer.write_all(&line).await.is_err() { break; } + if let Some(file) = stderr_file.as_mut() { + let _ = file.write_all(&line).await; + } } Err(_) => break, } @@ -352,6 +363,8 @@ impl Runtime for NativeRuntime { context.timeout, context.max_stdout_bytes, context.max_stderr_bytes, + context.stdout_log_path.as_deref(), + context.stderr_log_path.as_deref(), ) .await } @@ -401,6 +414,23 @@ impl Runtime for NativeRuntime { } } +async fn open_live_log_file( + path: Option<&Path>, + max_bytes: usize, + is_stdout: bool, +) -> std::io::Result> { + let Some(path) = path else { + return Ok(None); + }; + + let writer = if is_stdout { + BoundedLogFileWriter::new_stdout(path, max_bytes).await? + } else { + BoundedLogFileWriter::new_stderr(path, max_bytes).await? + }; + Ok(Some(writer)) +} + #[cfg(test)] mod tests { use super::*; diff --git a/crates/worker/src/runtime/process.rs b/crates/worker/src/runtime/process.rs index 01b5852..ceada55 100644 --- a/crates/worker/src/runtime/process.rs +++ b/crates/worker/src/runtime/process.rs @@ -962,6 +962,8 @@ impl Runtime for ProcessRuntime { context.max_stderr_bytes, context.output_format, context.cancel_token.clone(), + context.stdout_log_path.as_deref(), + context.stderr_log_path.as_deref(), ) .await; diff --git a/crates/worker/src/runtime/process_executor.rs b/crates/worker/src/runtime/process_executor.rs index 9f65405..89cbdb3 100644 --- a/crates/worker/src/runtime/process_executor.rs +++ b/crates/worker/src/runtime/process_executor.rs @@ -12,10 +12,10 @@ //! 1. SIGTERM is sent to the process immediately //! 2. After a 5-second grace period, SIGKILL is sent as a last resort -use super::{BoundedLogWriter, ExecutionResult, OutputFormat, RuntimeResult}; +use super::{BoundedLogFileWriter, BoundedLogWriter, ExecutionResult, OutputFormat, RuntimeResult}; use std::collections::HashMap; use std::io; -use std::path::Path; +use std::path::{Path, PathBuf}; use std::time::Instant; use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; use tokio::process::Command; @@ -59,6 +59,8 @@ pub async fn execute_streaming( max_stderr_bytes, output_format, None, + None, + None, ) .await } @@ -93,6 +95,8 @@ pub async fn execute_streaming_cancellable( max_stderr_bytes: usize, output_format: OutputFormat, cancel_token: Option, + stdout_log_path: Option<&Path>, + stderr_log_path: Option<&Path>, ) -> RuntimeResult { let start = Instant::now(); @@ -130,6 +134,8 @@ pub async fn execute_streaming_cancellable( // Create bounded writers let mut stdout_writer = BoundedLogWriter::new_stdout(max_stdout_bytes); let mut stderr_writer = BoundedLogWriter::new_stderr(max_stderr_bytes); + let mut stdout_file = open_live_log_file(stdout_log_path, max_stdout_bytes, true).await?; + let mut stderr_file = open_live_log_file(stderr_log_path, max_stderr_bytes, false).await?; // Take stdout and stderr streams let stdout = child.stdout.take().expect("stdout not captured"); @@ -150,6 +156,9 @@ pub async fn execute_streaming_cancellable( if stdout_writer.write_all(&line).await.is_err() { break; } + if let Some(file) = stdout_file.as_mut() { + let _ = file.write_all(&line).await; + } } Err(_) => break, } @@ -167,6 +176,9 @@ pub async fn execute_streaming_cancellable( if stderr_writer.write_all(&line).await.is_err() { break; } + if let Some(file) = stderr_file.as_mut() { + let _ = file.write_all(&line).await; + } } Err(_) => break, } @@ -351,6 +363,24 @@ pub async fn execute_streaming_cancellable( }) } +async fn open_live_log_file( + path: Option<&Path>, + max_bytes: usize, + is_stdout: bool, +) -> io::Result> { + let Some(path) = path else { + return Ok(None); + }; + + let path: PathBuf = path.to_path_buf(); + let writer = if is_stdout { + BoundedLogFileWriter::new_stdout(&path, max_bytes).await? + } else { + BoundedLogFileWriter::new_stderr(&path, max_bytes).await? + }; + Ok(Some(writer)) +} + /// Parse stdout content according to the specified output format. fn configure_child_process(cmd: &mut Command) -> io::Result<()> { #[cfg(unix)] diff --git a/crates/worker/src/runtime/python.rs b/crates/worker/src/runtime/python.rs deleted file mode 100644 index 47c6e87..0000000 --- a/crates/worker/src/runtime/python.rs +++ /dev/null @@ -1,819 +0,0 @@ -//! Python Runtime Implementation -//! -//! Executes Python actions using subprocess execution. - -use super::{ - BoundedLogWriter, DependencyManagerRegistry, DependencySpec, ExecutionContext, ExecutionResult, - OutputFormat, Runtime, RuntimeError, RuntimeResult, -}; -use async_trait::async_trait; -use std::path::PathBuf; -use std::process::Stdio; -use std::sync::Arc; -use std::time::Instant; -use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; -use tokio::process::Command; -use tokio::time::timeout; -use tracing::{debug, info, warn}; - -/// Python runtime for executing Python scripts and functions -pub struct PythonRuntime { - /// Python interpreter path (fallback when no venv exists) - python_path: PathBuf, - - /// Base directory for storing action code - work_dir: PathBuf, - - /// Optional dependency manager registry for isolated environments - dependency_manager: Option>, -} - -impl PythonRuntime { - /// Create a new Python runtime - pub fn new() -> Self { - Self { - python_path: PathBuf::from("python3"), - work_dir: PathBuf::from("/tmp/attune/actions"), - dependency_manager: None, - } - } - - /// Create a Python runtime with custom settings - pub fn with_config(python_path: PathBuf, work_dir: PathBuf) -> Self { - Self { - python_path, - work_dir, - dependency_manager: None, - } - } - - /// Create a Python runtime with dependency manager support - pub fn with_dependency_manager( - python_path: PathBuf, - work_dir: PathBuf, - dependency_manager: Arc, - ) -> Self { - Self { - python_path, - work_dir, - dependency_manager: Some(dependency_manager), - } - } - - /// Get the Python executable path to use for a given context - /// - /// If the action has a pack_ref with dependencies, use the venv Python. - /// Otherwise, use the default Python interpreter. - async fn get_python_executable(&self, context: &ExecutionContext) -> RuntimeResult { - // Check if we have a dependency manager and can extract pack_ref - if let Some(ref dep_mgr) = self.dependency_manager { - // Extract pack_ref from action_ref (format: "pack_ref.action_name") - if let Some(pack_ref) = context.action_ref.split('.').next() { - // Try to get the executable path for this pack - match dep_mgr.get_executable_path(pack_ref, "python").await { - Ok(python_path) => { - debug!( - "Using pack-specific Python from venv: {}", - python_path.display() - ); - return Ok(python_path); - } - Err(e) => { - // Venv doesn't exist or failed - this is OK if pack has no dependencies - debug!( - "No venv found for pack {} ({}), using default Python", - pack_ref, e - ); - } - } - } - } - - // Fall back to default Python interpreter - debug!("Using default Python interpreter: {:?}", self.python_path); - Ok(self.python_path.clone()) - } - - /// Generate Python wrapper script that loads parameters and executes the action - fn generate_wrapper_script(&self, context: &ExecutionContext) -> RuntimeResult { - let params_json = serde_json::to_string(&context.parameters)?; - - // Use base64 encoding for code to avoid any quote/escape issues - let code_bytes = context.code.as_deref().unwrap_or("").as_bytes(); - let code_base64 = - base64::Engine::encode(&base64::engine::general_purpose::STANDARD, code_bytes); - - let wrapper = format!( - r#"#!/usr/bin/env python3 -import sys -import json -import traceback -import base64 -from pathlib import Path - -# Global secrets storage (read from stdin, NOT from environment) -_attune_secrets = {{}} - -def get_secret(name): - """ - Get a secret value by name. - - Secrets are passed securely via stdin and are never exposed in - environment variables or process listings. - - Args: - name (str): The name of the secret to retrieve - - Returns: - str: The secret value, or None if not found - """ - return _attune_secrets.get(name) - -def main(): - global _attune_secrets - - try: - # Read secrets from stdin FIRST (before executing action code) - # This prevents secrets from being visible in process environment - secrets_line = sys.stdin.readline().strip() - if secrets_line: - _attune_secrets = json.loads(secrets_line) - - # Parse parameters - parameters = json.loads('''{}''') - - # Decode action code from base64 (avoids quote/escape issues) - action_code = base64.b64decode('{}').decode('utf-8') - - # Execute the code in a controlled namespace - # Include get_secret helper function - namespace = {{ - '__name__': '__main__', - 'parameters': parameters, - 'get_secret': get_secret - }} - exec(action_code, namespace) - - # Look for main function or run function - if '{}' in namespace: - result = namespace['{}'](**parameters) - elif 'run' in namespace: - result = namespace['run'](**parameters) - elif 'main' in namespace: - result = namespace['main'](**parameters) - else: - # No entry point found, return the namespace (only JSON-serializable values) - def is_json_serializable(obj): - """Check if an object is JSON serializable""" - if obj is None: - return True - if isinstance(obj, (bool, int, float, str)): - return True - if isinstance(obj, (list, tuple)): - return all(is_json_serializable(item) for item in obj) - if isinstance(obj, dict): - return all(is_json_serializable(k) and is_json_serializable(v) - for k, v in obj.items()) - return False - - result = {{k: v for k, v in namespace.items() - if not k.startswith('__') and is_json_serializable(v)}} - - # Output result as JSON - if result is not None: - print(json.dumps({{'result': result, 'status': 'success'}})) - else: - print(json.dumps({{'status': 'success'}})) - - sys.exit(0) - - except Exception as e: - error_info = {{ - 'status': 'error', - 'error': str(e), - 'error_type': type(e).__name__, - 'traceback': traceback.format_exc() - }} - print(json.dumps(error_info), file=sys.stderr) - sys.exit(1) - -if __name__ == '__main__': - main() -"#, - params_json, code_base64, context.entry_point, context.entry_point - ); - - Ok(wrapper) - } - - /// Execute with streaming and bounded log collection - async fn execute_with_streaming( - &self, - mut cmd: Command, - secrets: &std::collections::HashMap, - timeout_secs: Option, - max_stdout_bytes: usize, - max_stderr_bytes: usize, - output_format: OutputFormat, - ) -> RuntimeResult { - let start = Instant::now(); - - // Spawn process with piped I/O - let mut child = cmd - .stdin(Stdio::piped()) - .stdout(Stdio::piped()) - .stderr(Stdio::piped()) - .spawn()?; - - // Write secrets to stdin - if let Some(mut stdin) = child.stdin.take() { - let secrets_json = serde_json::to_string(secrets)?; - stdin.write_all(secrets_json.as_bytes()).await?; - stdin.write_all(b"\n").await?; - drop(stdin); - } - - // Create bounded writers - let mut stdout_writer = BoundedLogWriter::new_stdout(max_stdout_bytes); - let mut stderr_writer = BoundedLogWriter::new_stderr(max_stderr_bytes); - - // Take stdout and stderr streams - let stdout = child.stdout.take().expect("stdout not captured"); - let stderr = child.stderr.take().expect("stderr not captured"); - - // Create buffered readers - let mut stdout_reader = BufReader::new(stdout); - let mut stderr_reader = BufReader::new(stderr); - - // Stream both outputs concurrently - let stdout_task = async { - let mut line = Vec::new(); - loop { - line.clear(); - match stdout_reader.read_until(b'\n', &mut line).await { - Ok(0) => break, // EOF - Ok(_) => { - if stdout_writer.write_all(&line).await.is_err() { - break; - } - } - Err(_) => break, - } - } - stdout_writer - }; - - let stderr_task = async { - let mut line = Vec::new(); - loop { - line.clear(); - match stderr_reader.read_until(b'\n', &mut line).await { - Ok(0) => break, // EOF - Ok(_) => { - if stderr_writer.write_all(&line).await.is_err() { - break; - } - } - Err(_) => break, - } - } - stderr_writer - }; - - // Wait for both streams and the process - let (stdout_writer, stderr_writer, wait_result) = - tokio::join!(stdout_task, stderr_task, async { - if let Some(timeout_secs) = timeout_secs { - timeout(std::time::Duration::from_secs(timeout_secs), child.wait()).await - } else { - Ok(child.wait().await) - } - }); - - let duration_ms = start.elapsed().as_millis() as u64; - - // Handle timeout - let status = match wait_result { - Ok(Ok(status)) => status, - Ok(Err(e)) => { - return Err(RuntimeError::ProcessError(format!( - "Process wait failed: {}", - e - ))); - } - Err(_) => { - return Ok(ExecutionResult { - exit_code: -1, - stdout: String::new(), - stderr: String::new(), - result: None, - duration_ms, - error: Some(format!( - "Execution timed out after {} seconds", - timeout_secs.unwrap() - )), - stdout_truncated: false, - stderr_truncated: false, - stdout_bytes_truncated: 0, - stderr_bytes_truncated: 0, - }); - } - }; - - // Get results from bounded writers - let stdout_result = stdout_writer.into_result(); - let stderr_result = stderr_writer.into_result(); - - let exit_code = status.code().unwrap_or(-1); - - debug!( - "Python execution completed: exit_code={}, duration={}ms, stdout_truncated={}, stderr_truncated={}", - exit_code, duration_ms, stdout_result.truncated, stderr_result.truncated - ); - - // Parse result from stdout based on output_format - let result = if exit_code == 0 && !stdout_result.content.trim().is_empty() { - match output_format { - OutputFormat::Text => { - // No parsing - text output is captured in stdout field - None - } - OutputFormat::Json => { - // Try to parse full stdout as JSON first (handles multi-line JSON), - // then fall back to last line only (for scripts that log before output) - let trimmed = stdout_result.content.trim(); - serde_json::from_str(trimmed).ok().or_else(|| { - trimmed - .lines() - .last() - .and_then(|line| serde_json::from_str(line).ok()) - }) - } - OutputFormat::Yaml => { - // Try to parse stdout as YAML - serde_yaml_ng::from_str(stdout_result.content.trim()).ok() - } - OutputFormat::Jsonl => { - // Parse each line as JSON and collect into array - let mut items = Vec::new(); - for line in stdout_result.content.trim().lines() { - if let Ok(value) = serde_json::from_str::(line) { - items.push(value); - } - } - if items.is_empty() { - None - } else { - Some(serde_json::Value::Array(items)) - } - } - } - } else { - None - }; - - Ok(ExecutionResult { - exit_code, - // Only populate stdout if result wasn't parsed (avoid duplication) - stdout: if result.is_some() { - String::new() - } else { - stdout_result.content.clone() - }, - stderr: stderr_result.content.clone(), - result, - duration_ms, - error: if exit_code != 0 { - Some(stderr_result.content) - } else { - None - }, - stdout_truncated: stdout_result.truncated, - stderr_truncated: stderr_result.truncated, - stdout_bytes_truncated: stdout_result.bytes_truncated, - stderr_bytes_truncated: stderr_result.bytes_truncated, - }) - } - - async fn execute_python_code( - &self, - script: String, - secrets: &std::collections::HashMap, - env: &std::collections::HashMap, - timeout_secs: Option, - python_path: PathBuf, - max_stdout_bytes: usize, - max_stderr_bytes: usize, - output_format: OutputFormat, - ) -> RuntimeResult { - debug!( - "Executing Python script with {} secrets (passed via stdin)", - secrets.len() - ); - - // Build command - let mut cmd = Command::new(&python_path); - cmd.arg("-c").arg(&script); - - // Add environment variables - for (key, value) in env { - cmd.env(key, value); - } - - self.execute_with_streaming( - cmd, - secrets, - timeout_secs, - max_stdout_bytes, - max_stderr_bytes, - output_format, - ) - .await - } - - /// Execute Python script from file - async fn execute_python_file( - &self, - code_path: PathBuf, - secrets: &std::collections::HashMap, - env: &std::collections::HashMap, - timeout_secs: Option, - python_path: PathBuf, - max_stdout_bytes: usize, - max_stderr_bytes: usize, - output_format: OutputFormat, - ) -> RuntimeResult { - debug!( - "Executing Python file: {:?} with {} secrets", - code_path, - secrets.len() - ); - - // Build command - let mut cmd = Command::new(&python_path); - cmd.arg(&code_path); - - // Add environment variables - for (key, value) in env { - cmd.env(key, value); - } - - self.execute_with_streaming( - cmd, - secrets, - timeout_secs, - max_stdout_bytes, - max_stderr_bytes, - output_format, - ) - .await - } -} - -impl Default for PythonRuntime { - fn default() -> Self { - Self::new() - } -} - -impl PythonRuntime { - /// Ensure pack dependencies are installed (called before execution if needed) - /// - /// This is a helper method that can be called by the worker service to ensure - /// a pack's Python dependencies are set up before executing actions. - pub async fn ensure_pack_dependencies( - &self, - pack_ref: &str, - spec: &DependencySpec, - ) -> RuntimeResult<()> { - if let Some(ref dep_mgr) = self.dependency_manager { - if spec.has_dependencies() { - info!( - "Ensuring Python dependencies for pack: {} ({} dependencies)", - pack_ref, - spec.dependencies.len() - ); - - dep_mgr - .ensure_environment(pack_ref, spec) - .await - .map_err(|e| { - RuntimeError::SetupError(format!( - "Failed to setup Python environment for {}: {}", - pack_ref, e - )) - })?; - - info!("Python dependencies ready for pack: {}", pack_ref); - } else { - debug!("Pack {} has no Python dependencies", pack_ref); - } - } else { - warn!("Dependency manager not configured, skipping dependency isolation"); - } - - Ok(()) - } -} - -#[async_trait] -impl Runtime for PythonRuntime { - fn name(&self) -> &str { - "python" - } - - fn can_execute(&self, context: &ExecutionContext) -> bool { - // Check if action reference suggests Python - let is_python = context.action_ref.contains(".py") - || context.entry_point.ends_with(".py") - || context - .code_path - .as_ref() - .map(|p| p.extension().and_then(|e| e.to_str()) == Some("py")) - .unwrap_or(false); - - is_python - } - - async fn execute(&self, context: ExecutionContext) -> RuntimeResult { - info!( - "Executing Python action: {} (execution_id: {})", - context.action_ref, context.execution_id - ); - - // Get the appropriate Python executable (venv or default) - let python_path = self.get_python_executable(&context).await?; - - // If code_path is provided, execute the file directly - if let Some(code_path) = &context.code_path { - return self - .execute_python_file( - code_path.clone(), - &context.secrets, - &context.env, - context.timeout, - python_path, - context.max_stdout_bytes, - context.max_stderr_bytes, - context.output_format, - ) - .await; - } - - // Otherwise, generate wrapper script and execute - let script = self.generate_wrapper_script(&context)?; - self.execute_python_code( - script, - &context.secrets, - &context.env, - context.timeout, - python_path, - context.max_stdout_bytes, - context.max_stderr_bytes, - context.output_format, - ) - .await - } - - async fn setup(&self) -> RuntimeResult<()> { - info!("Setting up Python runtime"); - - // Ensure work directory exists - tokio::fs::create_dir_all(&self.work_dir) - .await - .map_err(|e| RuntimeError::SetupError(format!("Failed to create work dir: {}", e)))?; - - // Verify Python is available - let output = Command::new(&self.python_path) - .arg("--version") - .output() - .await - .map_err(|e| { - RuntimeError::SetupError(format!( - "Python not found at {:?}: {}", - self.python_path, e - )) - })?; - - if !output.status.success() { - return Err(RuntimeError::SetupError( - "Python interpreter is not working".to_string(), - )); - } - - let version = String::from_utf8_lossy(&output.stdout); - info!("Python runtime ready: {}", version.trim()); - - Ok(()) - } - - async fn cleanup(&self) -> RuntimeResult<()> { - info!("Cleaning up Python runtime"); - // Could clean up temporary files here - Ok(()) - } - - async fn validate(&self) -> RuntimeResult<()> { - debug!("Validating Python runtime"); - - // Check if Python is available - let output = Command::new(&self.python_path) - .arg("--version") - .output() - .await - .map_err(|e| RuntimeError::SetupError(format!("Python validation failed: {}", e)))?; - - if !output.status.success() { - return Err(RuntimeError::SetupError( - "Python interpreter validation failed".to_string(), - )); - } - - Ok(()) - } -} - -#[cfg(test)] -mod tests { - use super::*; - use std::collections::HashMap; - - #[tokio::test] - async fn test_python_runtime_simple() { - let runtime = PythonRuntime::new(); - - let context = ExecutionContext { - execution_id: 1, - action_ref: "test.simple".to_string(), - parameters: { - let mut map = HashMap::new(); - map.insert("x".to_string(), serde_json::json!(5)); - map.insert("y".to_string(), serde_json::json!(10)); - map - }, - env: HashMap::new(), - secrets: HashMap::new(), - timeout: Some(10), - working_dir: None, - entry_point: "run".to_string(), - code: Some( - r#" -def run(x, y): - return x + y -"# - .to_string(), - ), - code_path: None, - runtime_name: Some("python".to_string()), - runtime_config_override: None, - runtime_env_dir_suffix: None, - selected_runtime_version: None, - max_stdout_bytes: 10 * 1024 * 1024, - max_stderr_bytes: 10 * 1024 * 1024, - parameter_delivery: attune_common::models::ParameterDelivery::default(), - parameter_format: attune_common::models::ParameterFormat::default(), - output_format: attune_common::models::OutputFormat::default(), - }; - - let result = runtime.execute(context).await.unwrap(); - assert!(result.is_success()); - assert_eq!(result.exit_code, 0); - } - - #[tokio::test] - async fn test_python_runtime_timeout() { - let runtime = PythonRuntime::new(); - - let context = ExecutionContext { - execution_id: 2, - action_ref: "test.timeout".to_string(), - parameters: HashMap::new(), - env: HashMap::new(), - secrets: HashMap::new(), - timeout: Some(1), - working_dir: None, - entry_point: "run".to_string(), - code: Some( - r#" -import time -def run(): - time.sleep(10) - return "done" -"# - .to_string(), - ), - code_path: None, - runtime_name: Some("python".to_string()), - runtime_config_override: None, - runtime_env_dir_suffix: None, - selected_runtime_version: None, - max_stdout_bytes: 10 * 1024 * 1024, - max_stderr_bytes: 10 * 1024 * 1024, - parameter_delivery: attune_common::models::ParameterDelivery::default(), - parameter_format: attune_common::models::ParameterFormat::default(), - output_format: attune_common::models::OutputFormat::default(), - }; - - let result = runtime.execute(context).await.unwrap(); - assert!(!result.is_success()); - assert!(result.error.is_some()); - let error_msg = result.error.unwrap(); - assert!(error_msg.contains("timeout") || error_msg.contains("timed out")); - } - - #[tokio::test] - async fn test_python_runtime_error() { - let runtime = PythonRuntime::new(); - - let context = ExecutionContext { - execution_id: 3, - action_ref: "test.error".to_string(), - parameters: HashMap::new(), - env: HashMap::new(), - secrets: HashMap::new(), - timeout: Some(10), - working_dir: None, - entry_point: "run".to_string(), - code: Some( - r#" -def run(): - raise ValueError("Test error") -"# - .to_string(), - ), - code_path: None, - runtime_name: Some("python".to_string()), - runtime_config_override: None, - runtime_env_dir_suffix: None, - selected_runtime_version: None, - max_stdout_bytes: 10 * 1024 * 1024, - max_stderr_bytes: 10 * 1024 * 1024, - parameter_delivery: attune_common::models::ParameterDelivery::default(), - parameter_format: attune_common::models::ParameterFormat::default(), - output_format: attune_common::models::OutputFormat::default(), - }; - - let result = runtime.execute(context).await.unwrap(); - assert!(!result.is_success()); - assert!(result.error.is_some()); - } - - #[tokio::test] - #[ignore = "Pre-existing failure - secrets not being passed correctly"] - async fn test_python_runtime_with_secrets() { - let runtime = PythonRuntime::new(); - - let context = ExecutionContext { - execution_id: 4, - action_ref: "test.secrets".to_string(), - parameters: HashMap::new(), - env: HashMap::new(), - secrets: { - let mut s = HashMap::new(); - s.insert("api_key".to_string(), "secret_key_12345".to_string()); - s.insert("db_password".to_string(), "super_secret_pass".to_string()); - s - }, - timeout: Some(10), - working_dir: None, - entry_point: "run".to_string(), - code: Some( - r#" -def run(): - # Access secrets via get_secret() helper - api_key = get_secret('api_key') - db_pass = get_secret('db_password') - missing = get_secret('nonexistent') - - return { - 'api_key': api_key, - 'db_pass': db_pass, - 'missing': missing - } -"# - .to_string(), - ), - code_path: None, - runtime_name: Some("python".to_string()), - runtime_config_override: None, - runtime_env_dir_suffix: None, - selected_runtime_version: None, - max_stdout_bytes: 10 * 1024 * 1024, - max_stderr_bytes: 10 * 1024 * 1024, - parameter_delivery: attune_common::models::ParameterDelivery::default(), - parameter_format: attune_common::models::ParameterFormat::default(), - output_format: attune_common::models::OutputFormat::default(), - }; - - let result = runtime.execute(context).await.unwrap(); - assert!(result.is_success()); - assert_eq!(result.exit_code, 0); - - // Verify secrets are accessible in action code - let result_data = result.result.unwrap(); - let result_obj = result_data.get("result").unwrap(); - assert_eq!(result_obj.get("api_key").unwrap(), "secret_key_12345"); - assert_eq!(result_obj.get("db_pass").unwrap(), "super_secret_pass"); - assert_eq!(result_obj.get("missing"), Some(&serde_json::Value::Null)); - } -}