diff --git a/crates/api/src/dto/execution.rs b/crates/api/src/dto/execution.rs index a765858..8e4b905 100644 --- a/crates/api/src/dto/execution.rs +++ b/crates/api/src/dto/execution.rs @@ -52,10 +52,14 @@ pub struct ExecutionResponse { #[schema(example = 1)] pub enforcement: Option, - /// Executor ID (worker/executor that ran this) + /// Identity ID that initiated this execution #[schema(example = 1)] pub executor: Option, + /// Worker ID currently assigned to this execution + #[schema(example = 1)] + pub worker: Option, + /// Execution status #[schema(example = "succeeded")] pub status: ExecutionStatus, @@ -216,6 +220,7 @@ impl From for ExecutionResponse { parent: execution.parent, enforcement: execution.enforcement, executor: execution.executor, + worker: execution.worker, status: execution.status, result: execution .result diff --git a/crates/api/src/routes/executions.rs b/crates/api/src/routes/executions.rs index d3b0ca2..aba9c42 100644 --- a/crates/api/src/routes/executions.rs +++ b/crates/api/src/routes/executions.rs @@ -123,6 +123,7 @@ pub async fn create_execution( parent: None, enforcement: None, executor: None, + worker: None, status: ExecutionStatus::Requested, result: None, workflow_task: None, // Non-workflow execution @@ -510,11 +511,11 @@ pub async fn cancel_execution( .await; // Send cancel request to the worker via MQ - if let Some(worker_id) = execution.executor { + if let Some(worker_id) = execution.worker { send_cancel_to_worker(publisher.as_deref(), id, worker_id).await; } else { tracing::warn!( - "Execution {} has no executor/worker assigned; marked as canceling but no MQ message sent", + "Execution {} has no worker assigned; marked as canceling but no MQ message sent", id ); } @@ -754,7 +755,7 @@ async fn cancel_workflow_children_with_policy( } } - if let Some(worker_id) = child.executor { + if let Some(worker_id) = child.worker { send_cancel_to_worker(publisher, child_id, worker_id).await; } } diff --git a/crates/api/tests/sse_execution_stream_tests.rs b/crates/api/tests/sse_execution_stream_tests.rs index 4d1e9f0..851f4f2 100644 --- a/crates/api/tests/sse_execution_stream_tests.rs +++ b/crates/api/tests/sse_execution_stream_tests.rs @@ -75,6 +75,7 @@ async fn create_test_execution(pool: &PgPool, action_id: i64) -> Result, pub executor: Option, + pub worker: Option, pub status: ExecutionStatus, pub result: Option, diff --git a/crates/common/src/repositories/execution.rs b/crates/common/src/repositories/execution.rs index 4f57557..0dba547 100644 --- a/crates/common/src/repositories/execution.rs +++ b/crates/common/src/repositories/execution.rs @@ -54,6 +54,7 @@ pub struct ExecutionWithRefs { pub parent: Option, pub enforcement: Option, pub executor: Option, + pub worker: Option, pub status: ExecutionStatus, pub result: Option, pub started_at: Option>, @@ -73,7 +74,7 @@ pub struct ExecutionWithRefs { /// are NOT in the Rust struct, so `SELECT *` must never be used. pub const SELECT_COLUMNS: &str = "\ id, action, action_ref, config, env_vars, parent, enforcement, \ - executor, status, result, started_at, workflow_task, created, updated"; + executor, worker, status, result, started_at, workflow_task, created, updated"; pub struct ExecutionRepository; @@ -93,6 +94,7 @@ pub struct CreateExecutionInput { pub parent: Option, pub enforcement: Option, pub executor: Option, + pub worker: Option, pub status: ExecutionStatus, pub result: Option, pub workflow_task: Option, @@ -103,6 +105,7 @@ pub struct UpdateExecutionInput { pub status: Option, pub result: Option, pub executor: Option, + pub worker: Option, pub started_at: Option>, pub workflow_task: Option, } @@ -113,6 +116,7 @@ impl From for UpdateExecutionInput { status: Some(execution.status), result: execution.result, executor: execution.executor, + worker: execution.worker, started_at: execution.started_at, workflow_task: execution.workflow_task, } @@ -158,8 +162,8 @@ impl Create for ExecutionRepository { { let sql = format!( "INSERT INTO execution \ - (action, action_ref, config, env_vars, parent, enforcement, executor, status, result, workflow_task) \ - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) \ + (action, action_ref, config, env_vars, parent, enforcement, executor, worker, status, result, workflow_task) \ + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) \ RETURNING {SELECT_COLUMNS}" ); sqlx::query_as::<_, Execution>(&sql) @@ -170,6 +174,7 @@ impl Create for ExecutionRepository { .bind(input.parent) .bind(input.enforcement) .bind(input.executor) + .bind(input.worker) .bind(input.status) .bind(&input.result) .bind(sqlx::types::Json(&input.workflow_task)) @@ -208,6 +213,13 @@ impl Update for ExecutionRepository { query.push("executor = ").push_bind(executor_id); has_updates = true; } + if let Some(worker_id) = input.worker { + if has_updates { + query.push(", "); + } + query.push("worker = ").push_bind(worker_id); + has_updates = true; + } if let Some(started_at) = input.started_at { if has_updates { query.push(", "); diff --git a/crates/common/tests/execution_repository_tests.rs b/crates/common/tests/execution_repository_tests.rs index 4961d94..c30b7ba 100644 --- a/crates/common/tests/execution_repository_tests.rs +++ b/crates/common/tests/execution_repository_tests.rs @@ -42,6 +42,7 @@ async fn test_create_execution_basic() { parent: None, enforcement: None, executor: None, + worker: None, status: ExecutionStatus::Requested, result: None, workflow_task: None, @@ -76,6 +77,7 @@ async fn test_create_execution_without_action() { parent: None, enforcement: None, executor: None, + worker: None, status: ExecutionStatus::Requested, result: None, workflow_task: None, @@ -110,6 +112,7 @@ async fn test_create_execution_with_all_fields() { parent: None, enforcement: None, executor: None, // Don't reference non-existent identity + worker: None, status: ExecutionStatus::Scheduled, result: Some(json!({"status": "ok"})), workflow_task: None, @@ -146,6 +149,7 @@ async fn test_create_execution_with_parent() { parent: None, enforcement: None, executor: None, + worker: None, status: ExecutionStatus::Running, result: None, workflow_task: None, @@ -164,6 +168,7 @@ async fn test_create_execution_with_parent() { parent: Some(parent.id), enforcement: None, executor: None, + worker: None, status: ExecutionStatus::Requested, result: None, workflow_task: None, @@ -203,6 +208,7 @@ async fn test_find_execution_by_id() { parent: None, enforcement: None, executor: None, + worker: None, status: ExecutionStatus::Requested, result: None, workflow_task: None, @@ -257,6 +263,7 @@ async fn test_list_executions() { parent: None, enforcement: None, executor: None, + worker: None, status: ExecutionStatus::Requested, result: None, workflow_task: None, @@ -303,6 +310,7 @@ async fn test_list_executions_ordered_by_created_desc() { parent: None, enforcement: None, executor: None, + worker: None, status: ExecutionStatus::Requested, result: None, workflow_task: None, @@ -354,6 +362,7 @@ async fn test_update_execution_status() { parent: None, enforcement: None, executor: None, + worker: None, status: ExecutionStatus::Requested, result: None, workflow_task: None, @@ -399,6 +408,7 @@ async fn test_update_execution_result() { parent: None, enforcement: None, executor: None, + worker: None, status: ExecutionStatus::Running, result: None, workflow_task: None, @@ -445,6 +455,7 @@ async fn test_update_execution_executor() { parent: None, enforcement: None, executor: None, + worker: None, status: ExecutionStatus::Requested, result: None, workflow_task: None, @@ -489,6 +500,7 @@ async fn test_update_execution_status_transitions() { parent: None, enforcement: None, executor: None, + worker: None, status: ExecutionStatus::Requested, result: None, workflow_task: None, @@ -580,6 +592,7 @@ async fn test_update_execution_failed_status() { parent: None, enforcement: None, executor: None, + worker: None, status: ExecutionStatus::Running, result: None, workflow_task: None, @@ -625,6 +638,7 @@ async fn test_update_execution_no_changes() { parent: None, enforcement: None, executor: None, + worker: None, status: ExecutionStatus::Requested, result: None, workflow_task: None, @@ -669,6 +683,7 @@ async fn test_delete_execution() { parent: None, enforcement: None, executor: None, + worker: None, status: ExecutionStatus::Completed, result: None, workflow_task: None, @@ -736,6 +751,7 @@ async fn test_find_executions_by_status() { parent: None, enforcement: None, executor: None, + worker: None, status: *status, result: None, workflow_task: None, @@ -783,6 +799,7 @@ async fn test_find_executions_by_enforcement() { parent: None, enforcement: None, executor: None, + worker: None, status: ExecutionStatus::Requested, result: None, workflow_task: None, @@ -801,6 +818,7 @@ async fn test_find_executions_by_enforcement() { parent: None, enforcement: None, // Can't reference non-existent enforcement executor: None, + worker: None, status: ExecutionStatus::Requested, result: None, workflow_task: None, @@ -845,6 +863,7 @@ async fn test_parent_child_execution_hierarchy() { parent: None, enforcement: None, executor: None, + worker: None, status: ExecutionStatus::Running, result: None, workflow_task: None, @@ -865,6 +884,7 @@ async fn test_parent_child_execution_hierarchy() { parent: Some(parent.id), enforcement: None, executor: None, + worker: None, status: ExecutionStatus::Requested, result: None, workflow_task: None, @@ -909,6 +929,7 @@ async fn test_nested_execution_hierarchy() { parent: None, enforcement: None, executor: None, + worker: None, status: ExecutionStatus::Running, result: None, workflow_task: None, @@ -927,6 +948,7 @@ async fn test_nested_execution_hierarchy() { parent: Some(grandparent.id), enforcement: None, executor: None, + worker: None, status: ExecutionStatus::Running, result: None, workflow_task: None, @@ -945,6 +967,7 @@ async fn test_nested_execution_hierarchy() { parent: Some(parent.id), enforcement: None, executor: None, + worker: None, status: ExecutionStatus::Requested, result: None, workflow_task: None, @@ -987,6 +1010,7 @@ async fn test_execution_timestamps() { parent: None, enforcement: None, executor: None, + worker: None, status: ExecutionStatus::Requested, result: None, workflow_task: None, @@ -1058,6 +1082,7 @@ async fn test_execution_config_json() { parent: None, enforcement: None, executor: None, + worker: None, status: ExecutionStatus::Requested, result: None, workflow_task: None, @@ -1091,6 +1116,7 @@ async fn test_execution_result_json() { parent: None, enforcement: None, executor: None, + worker: None, status: ExecutionStatus::Running, result: None, workflow_task: None, diff --git a/crates/common/tests/inquiry_repository_tests.rs b/crates/common/tests/inquiry_repository_tests.rs index 70214b7..e54b8d5 100644 --- a/crates/common/tests/inquiry_repository_tests.rs +++ b/crates/common/tests/inquiry_repository_tests.rs @@ -49,6 +49,7 @@ async fn test_create_inquiry_minimal() { parent: None, enforcement: None, executor: None, + worker: None, status: attune_common::models::enums::ExecutionStatus::Requested, result: None, workflow_task: None, @@ -109,6 +110,7 @@ async fn test_create_inquiry_with_response_schema() { parent: None, enforcement: None, executor: None, + worker: None, status: attune_common::models::enums::ExecutionStatus::Requested, result: None, workflow_task: None, @@ -167,6 +169,7 @@ async fn test_create_inquiry_with_timeout() { parent: None, enforcement: None, executor: None, + worker: None, status: attune_common::models::enums::ExecutionStatus::Requested, result: None, workflow_task: None, @@ -221,6 +224,7 @@ async fn test_create_inquiry_with_assigned_user() { parent: None, enforcement: None, executor: None, + worker: None, status: attune_common::models::enums::ExecutionStatus::Requested, result: None, workflow_task: None, @@ -310,6 +314,7 @@ async fn test_find_inquiry_by_id() { parent: None, enforcement: None, executor: None, + worker: None, status: attune_common::models::enums::ExecutionStatus::Requested, result: None, workflow_task: None, @@ -372,6 +377,7 @@ async fn test_get_inquiry_by_id() { parent: None, enforcement: None, executor: None, + worker: None, status: attune_common::models::enums::ExecutionStatus::Requested, result: None, workflow_task: None, @@ -443,6 +449,7 @@ async fn test_list_inquiries() { parent: None, enforcement: None, executor: None, + worker: None, status: attune_common::models::enums::ExecutionStatus::Requested, result: None, workflow_task: None, @@ -504,6 +511,7 @@ async fn test_update_inquiry_status() { parent: None, enforcement: None, executor: None, + worker: None, status: attune_common::models::enums::ExecutionStatus::Requested, result: None, workflow_task: None, @@ -560,6 +568,7 @@ async fn test_update_inquiry_status_transitions() { parent: None, enforcement: None, executor: None, + worker: None, status: attune_common::models::enums::ExecutionStatus::Requested, result: None, workflow_task: None, @@ -645,6 +654,7 @@ async fn test_update_inquiry_response() { parent: None, enforcement: None, executor: None, + worker: None, status: attune_common::models::enums::ExecutionStatus::Requested, result: None, workflow_task: None, @@ -703,6 +713,7 @@ async fn test_update_inquiry_with_response_and_status() { parent: None, enforcement: None, executor: None, + worker: None, status: attune_common::models::enums::ExecutionStatus::Requested, result: None, workflow_task: None, @@ -761,6 +772,7 @@ async fn test_update_inquiry_assignment() { parent: None, enforcement: None, executor: None, + worker: None, status: attune_common::models::enums::ExecutionStatus::Requested, result: None, workflow_task: None, @@ -828,6 +840,7 @@ async fn test_update_inquiry_no_changes() { parent: None, enforcement: None, executor: None, + worker: None, status: attune_common::models::enums::ExecutionStatus::Requested, result: None, workflow_task: None, @@ -905,6 +918,7 @@ async fn test_delete_inquiry() { parent: None, enforcement: None, executor: None, + worker: None, status: attune_common::models::enums::ExecutionStatus::Requested, result: None, workflow_task: None, @@ -965,6 +979,7 @@ async fn test_delete_execution_cascades_to_inquiries() { parent: None, enforcement: None, executor: None, + worker: None, status: attune_common::models::enums::ExecutionStatus::Requested, result: None, workflow_task: None, @@ -1032,6 +1047,7 @@ async fn test_find_inquiries_by_status() { parent: None, enforcement: None, executor: None, + worker: None, status: attune_common::models::enums::ExecutionStatus::Requested, result: None, workflow_task: None, @@ -1111,6 +1127,7 @@ async fn test_find_inquiries_by_execution() { parent: None, enforcement: None, executor: None, + worker: None, status: attune_common::models::enums::ExecutionStatus::Requested, result: None, workflow_task: None, @@ -1129,6 +1146,7 @@ async fn test_find_inquiries_by_execution() { parent: None, enforcement: None, executor: None, + worker: None, status: attune_common::models::enums::ExecutionStatus::Requested, result: None, workflow_task: None, @@ -1193,6 +1211,7 @@ async fn test_inquiry_timestamps_auto_managed() { parent: None, enforcement: None, executor: None, + worker: None, status: attune_common::models::enums::ExecutionStatus::Requested, result: None, workflow_task: None, @@ -1260,6 +1279,7 @@ async fn test_inquiry_complex_response_schema() { parent: None, enforcement: None, executor: None, + worker: None, status: attune_common::models::enums::ExecutionStatus::Requested, result: None, workflow_task: None, diff --git a/crates/executor/src/enforcement_processor.rs b/crates/executor/src/enforcement_processor.rs index 5d56d37..8e2cbfb 100644 --- a/crates/executor/src/enforcement_processor.rs +++ b/crates/executor/src/enforcement_processor.rs @@ -292,6 +292,7 @@ impl EnforcementProcessor { parent: None, // TODO: Handle workflow parent-child relationships enforcement: Some(enforcement.id), executor: None, // Will be assigned during scheduling + worker: None, status: attune_common::models::enums::ExecutionStatus::Requested, result: None, workflow_task: None, // Non-workflow execution diff --git a/crates/executor/src/execution_manager.rs b/crates/executor/src/execution_manager.rs index 4ba8086..cef6901 100644 --- a/crates/executor/src/execution_manager.rs +++ b/crates/executor/src/execution_manager.rs @@ -247,7 +247,7 @@ impl ExecutionManager { ExecutionRepository::update(pool, child_id, update).await?; } - if let Some(worker_id) = child.executor { + if let Some(worker_id) = child.worker { Self::send_cancel_to_worker(publisher, child_id, worker_id).await?; } else { warn!( @@ -423,6 +423,7 @@ impl ExecutionManager { parent: Some(parent.id), // Link to parent execution enforcement: parent.enforcement, executor: None, // Will be assigned during scheduling + worker: None, status: ExecutionStatus::Requested, result: None, workflow_task: None, // Non-workflow execution diff --git a/crates/executor/src/retry_manager.rs b/crates/executor/src/retry_manager.rs index 414e922..920b0c8 100644 --- a/crates/executor/src/retry_manager.rs +++ b/crates/executor/src/retry_manager.rs @@ -298,6 +298,7 @@ impl RetryManager { parent: original.parent, enforcement: original.enforcement, executor: None, // Will be assigned by scheduler + worker: None, status: ExecutionStatus::Requested, result: None, workflow_task: original.workflow_task.clone(), diff --git a/crates/executor/src/scheduler.rs b/crates/executor/src/scheduler.rs index 8f0c524..365595f 100644 --- a/crates/executor/src/scheduler.rs +++ b/crates/executor/src/scheduler.rs @@ -230,9 +230,11 @@ impl ExecutionScheduler { } }; - // Update execution status to scheduled + // Persist the selected worker so later cancellation requests can be + // routed to the correct per-worker cancel queue. let mut execution_for_update = execution; execution_for_update.status = ExecutionStatus::Scheduled; + execution_for_update.worker = Some(worker.id); ExecutionRepository::update(pool, execution_for_update.id, execution_for_update.into()) .await?; @@ -529,6 +531,7 @@ impl ExecutionScheduler { parent: Some(parent_execution.id), enforcement: parent_execution.enforcement, executor: None, + worker: None, status: ExecutionStatus::Requested, result: None, workflow_task: Some(workflow_task), @@ -689,6 +692,7 @@ impl ExecutionScheduler { parent: Some(parent_execution.id), enforcement: parent_execution.enforcement, executor: None, + worker: None, status: ExecutionStatus::Requested, result: None, workflow_task: Some(workflow_task), @@ -1886,4 +1890,32 @@ mod tests { serde_json::json!({"parameters": {"n": 5}, "context": {"rule": "test"}}) ); } + + #[test] + fn test_scheduling_persists_selected_worker() { + let mut execution = attune_common::models::Execution { + id: 42, + action: Some(7), + action_ref: "core.sleep".to_string(), + config: None, + env_vars: None, + parent: None, + enforcement: None, + executor: None, + worker: None, + status: ExecutionStatus::Requested, + result: None, + started_at: None, + workflow_task: None, + created: Utc::now(), + updated: Utc::now(), + }; + + execution.status = ExecutionStatus::Scheduled; + execution.worker = Some(99); + + let update: UpdateExecutionInput = execution.into(); + assert_eq!(update.status, Some(ExecutionStatus::Scheduled)); + assert_eq!(update.worker, Some(99)); + } } diff --git a/crates/executor/tests/fifo_ordering_integration_test.rs b/crates/executor/tests/fifo_ordering_integration_test.rs index ac4cfe5..2e40926 100644 --- a/crates/executor/tests/fifo_ordering_integration_test.rs +++ b/crates/executor/tests/fifo_ordering_integration_test.rs @@ -126,6 +126,7 @@ async fn create_test_execution( parent: None, enforcement: None, executor: None, + worker: None, status, result: None, workflow_task: None, diff --git a/crates/executor/tests/policy_enforcer_tests.rs b/crates/executor/tests/policy_enforcer_tests.rs index b7a8c3b..03c9da4 100644 --- a/crates/executor/tests/policy_enforcer_tests.rs +++ b/crates/executor/tests/policy_enforcer_tests.rs @@ -121,6 +121,7 @@ async fn create_test_execution( parent: None, enforcement: None, executor: None, + worker: None, status, result: None, workflow_task: None, diff --git a/crates/worker/src/lib.rs b/crates/worker/src/lib.rs index 8b25afa..6adacab 100644 --- a/crates/worker/src/lib.rs +++ b/crates/worker/src/lib.rs @@ -19,7 +19,7 @@ pub use heartbeat::HeartbeatManager; pub use registration::WorkerRegistration; pub use runtime::{ ExecutionContext, ExecutionResult, LocalRuntime, NativeRuntime, ProcessRuntime, Runtime, - RuntimeError, RuntimeResult, ShellRuntime, + RuntimeError, RuntimeResult, }; pub use secrets::SecretManager; pub use service::WorkerService; diff --git a/crates/worker/src/runtime/mod.rs b/crates/worker/src/runtime/mod.rs index 286f877..b9e96c0 100644 --- a/crates/worker/src/runtime/mod.rs +++ b/crates/worker/src/runtime/mod.rs @@ -29,13 +29,11 @@ pub mod native; pub mod parameter_passing; pub mod process; pub mod process_executor; -pub mod shell; // Re-export runtime implementations pub use local::LocalRuntime; pub use native::NativeRuntime; pub use process::ProcessRuntime; -pub use shell::ShellRuntime; use async_trait::async_trait; use attune_common::models::runtime::RuntimeExecutionConfig; diff --git a/crates/worker/src/runtime/process_executor.rs b/crates/worker/src/runtime/process_executor.rs index 4df61d6..9f65405 100644 --- a/crates/worker/src/runtime/process_executor.rs +++ b/crates/worker/src/runtime/process_executor.rs @@ -14,6 +14,7 @@ use super::{BoundedLogWriter, ExecutionResult, OutputFormat, RuntimeResult}; use std::collections::HashMap; +use std::io; use std::path::Path; use std::time::Instant; use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; @@ -95,6 +96,8 @@ pub async fn execute_streaming_cancellable( ) -> RuntimeResult { let start = Instant::now(); + configure_child_process(&mut cmd)?; + // Spawn process with piped I/O let mut child = cmd .stdin(std::process::Stdio::piped()) @@ -177,55 +180,56 @@ pub async fn execute_streaming_cancellable( // Build the wait future that handles timeout, cancellation, and normal completion. // - // The result is a tuple: (wait_result, was_cancelled) - // - wait_result mirrors the original type: Result, Elapsed> - // - was_cancelled indicates the process was stopped by a cancel request + // The result is a tuple: (exit_status, was_cancelled, was_timed_out) let wait_future = async { - // Inner future: wait for the child process to exit - let wait_child = child.wait(); - - // Apply optional timeout wrapping - let timed_wait = async { - if let Some(timeout_secs) = timeout_secs { - timeout(std::time::Duration::from_secs(timeout_secs), wait_child).await - } else { - Ok(wait_child.await) - } - }; - - // If we have a cancel token, race it against the (possibly-timed) wait - if let Some(ref token) = cancel_token { - tokio::select! { - result = timed_wait => (result, false), - _ = token.cancelled() => { - // Cancellation requested — terminate the child process promptly. - info!("Cancel signal received, sending SIGTERM to process"); - if let Some(pid) = child_pid { - send_signal(pid, libc::SIGTERM); - } - - // Grace period: wait up to 5s for the process to exit after SIGTERM. - match timeout(std::time::Duration::from_secs(5), child.wait()).await { - Ok(status) => (Ok(status), true), - Err(_) => { - // Last resort — SIGKILL - warn!("Process did not exit after SIGTERM + 5s, sending SIGKILL"); - if let Some(pid) = child_pid { - send_signal(pid, libc::SIGKILL); - } - // Wait indefinitely for the SIGKILL to take effect - (Ok(child.wait().await), true) + match (cancel_token.as_ref(), timeout_secs) { + (Some(token), Some(timeout_secs)) => { + tokio::select! { + result = child.wait() => (result, false, false), + _ = token.cancelled() => { + if let Some(pid) = child_pid { + terminate_process(pid, "cancel"); } + (wait_for_terminated_child(&mut child).await, true, false) + } + _ = tokio::time::sleep(std::time::Duration::from_secs(timeout_secs)) => { + if let Some(pid) = child_pid { + warn!("Process timed out after {} seconds, terminating", timeout_secs); + terminate_process(pid, "timeout"); + } + (wait_for_terminated_child(&mut child).await, false, true) } } } - } else { - (timed_wait.await, false) + (Some(token), None) => { + tokio::select! { + result = child.wait() => (result, false, false), + _ = token.cancelled() => { + if let Some(pid) = child_pid { + terminate_process(pid, "cancel"); + } + (wait_for_terminated_child(&mut child).await, true, false) + } + } + } + (None, Some(timeout_secs)) => { + tokio::select! { + result = child.wait() => (result, false, false), + _ = tokio::time::sleep(std::time::Duration::from_secs(timeout_secs)) => { + if let Some(pid) = child_pid { + warn!("Process timed out after {} seconds, terminating", timeout_secs); + terminate_process(pid, "timeout"); + } + (wait_for_terminated_child(&mut child).await, false, true) + } + } + } + (None, None) => (child.wait().await, false, false), } }; // Wait for both streams and the process - let (stdout_writer, stderr_writer, (wait_result, was_cancelled)) = + let (stdout_writer, stderr_writer, (wait_result, was_cancelled, was_timed_out)) = tokio::join!(stdout_task, stderr_task, wait_future); let duration_ms = start.elapsed().as_millis() as u64; @@ -236,31 +240,31 @@ pub async fn execute_streaming_cancellable( // Handle process wait result let (exit_code, process_error) = match wait_result { - Ok(Ok(status)) => (status.code().unwrap_or(-1), None), - Ok(Err(e)) => { + Ok(status) => (status.code().unwrap_or(-1), None), + Err(e) => { warn!("Process wait failed but captured output: {}", e); (-1, Some(format!("Process wait failed: {}", e))) } - Err(_) => { - // Timeout occurred - return Ok(ExecutionResult { - exit_code: -1, - stdout: stdout_result.content.clone(), - stderr: stderr_result.content.clone(), - result: None, - duration_ms, - error: Some(format!( - "Execution timed out after {} seconds", - timeout_secs.unwrap() - )), - stdout_truncated: stdout_result.truncated, - stderr_truncated: stderr_result.truncated, - stdout_bytes_truncated: stdout_result.bytes_truncated, - stderr_bytes_truncated: stderr_result.bytes_truncated, - }); - } }; + if was_timed_out { + return Ok(ExecutionResult { + exit_code: -1, + stdout: stdout_result.content.clone(), + stderr: stderr_result.content.clone(), + result: None, + duration_ms, + error: Some(format!( + "Execution timed out after {} seconds", + timeout_secs.unwrap() + )), + stdout_truncated: stdout_result.truncated, + stderr_truncated: stderr_result.truncated, + stdout_bytes_truncated: stdout_result.bytes_truncated, + stderr_bytes_truncated: stderr_result.bytes_truncated, + }); + } + // If the process was cancelled, return a specific result if was_cancelled { return Ok(ExecutionResult { @@ -348,14 +352,65 @@ pub async fn execute_streaming_cancellable( } /// Parse stdout content according to the specified output format. -/// Send a Unix signal to a process by PID. -/// -/// Uses raw `libc::kill()` to deliver signals for graceful process termination. -/// This is safe because we only send signals to child processes we spawned. -fn send_signal(pid: u32, signal: i32) { - // Safety: we're sending a signal to a known child process PID. - // The PID is valid because we obtained it from `child.id()` before the - // child exited. +fn configure_child_process(cmd: &mut Command) -> io::Result<()> { + #[cfg(unix)] + { + // Run each action in its own process group so cancellation and timeout + // can terminate shell wrappers and any children they spawned. + unsafe { + cmd.pre_exec(|| { + if libc::setpgid(0, 0) == -1 { + Err(io::Error::last_os_error()) + } else { + Ok(()) + } + }); + } + } + + Ok(()) +} + +async fn wait_for_terminated_child( + child: &mut tokio::process::Child, +) -> io::Result { + match timeout(std::time::Duration::from_secs(5), child.wait()).await { + Ok(status) => status, + Err(_) => { + warn!("Process did not exit after SIGTERM + 5s, sending SIGKILL"); + if let Some(pid) = child.id() { + kill_process_group_or_process(pid, libc::SIGKILL); + } + child.wait().await + } + } +} + +fn terminate_process(pid: u32, reason: &str) { + info!("Sending SIGTERM to {} process group {}", reason, pid); + kill_process_group_or_process(pid, libc::SIGTERM); +} + +fn kill_process_group_or_process(pid: u32, signal: i32) { + #[cfg(unix)] + { + // Negative PID targets the process group created with setpgid(0, 0). + let pgid = -(pid as i32); + // Safety: we only signal processes we spawned. + let rc = unsafe { libc::kill(pgid, signal) }; + if rc == 0 { + return; + } + + let err = io::Error::last_os_error(); + warn!( + "Failed to signal process group {} with signal {}: {}. Falling back to PID {}", + pid, signal, err, pid + ); + } + + // Safety: fallback to the direct child PID if the process group signal fails + // or on non-Unix targets where process groups are unavailable. unsafe { libc::kill(pid as i32, signal); } @@ -479,6 +534,9 @@ pub fn build_inline_command( #[cfg(test)] mod tests { use super::*; + use tempfile::NamedTempFile; + use tokio::fs; + use tokio::time::{sleep, Duration}; #[test] fn test_parse_output_text() { @@ -608,4 +666,86 @@ mod tests { // We can't easily inspect Command internals, but at least verify it builds without panic let _ = cmd; } + + #[tokio::test] + async fn test_execute_streaming_cancellation_kills_shell_child_process() { + let script = NamedTempFile::new().unwrap(); + fs::write( + script.path(), + "#!/bin/sh\nsleep 30\nprintf 'unexpected completion\\n'\n", + ) + .await + .unwrap(); + + let mut perms = fs::metadata(script.path()).await.unwrap().permissions(); + #[cfg(unix)] + { + use std::os::unix::fs::PermissionsExt; + perms.set_mode(0o755); + } + fs::set_permissions(script.path(), perms).await.unwrap(); + + let cancel_token = CancellationToken::new(); + let trigger = cancel_token.clone(); + tokio::spawn(async move { + sleep(Duration::from_millis(200)).await; + trigger.cancel(); + }); + + let mut cmd = Command::new("/bin/sh"); + cmd.arg(script.path()); + + let result = execute_streaming_cancellable( + cmd, + &HashMap::new(), + None, + Some(60), + 1024 * 1024, + 1024 * 1024, + OutputFormat::Text, + Some(cancel_token), + ) + .await + .unwrap(); + + assert!(result + .error + .as_deref() + .is_some_and(|e| e.contains("cancelled"))); + assert!( + result.duration_ms < 5_000, + "expected prompt cancellation, got {}ms", + result.duration_ms + ); + assert!(!result.stdout.contains("unexpected completion")); + } + + #[tokio::test] + async fn test_execute_streaming_timeout_terminates_process() { + let mut cmd = Command::new("/bin/sh"); + cmd.arg("-c").arg("sleep 30"); + + let result = execute_streaming( + cmd, + &HashMap::new(), + None, + Some(1), + 1024 * 1024, + 1024 * 1024, + OutputFormat::Text, + ) + .await + .unwrap(); + + assert_eq!(result.exit_code, -1); + assert!(result + .error + .as_deref() + .is_some_and(|e| e.contains("timed out after 1 seconds"))); + assert!( + result.duration_ms < 7_000, + "expected timeout termination, got {}ms", + result.duration_ms + ); + } } diff --git a/crates/worker/src/runtime/shell.rs b/crates/worker/src/runtime/shell.rs deleted file mode 100644 index 0445532..0000000 --- a/crates/worker/src/runtime/shell.rs +++ /dev/null @@ -1,949 +0,0 @@ -//! Shell Runtime Implementation -//! -//! Executes shell scripts and commands using subprocess execution. - -use super::{ - parameter_passing::{self, ParameterDeliveryConfig}, - BoundedLogWriter, ExecutionContext, ExecutionResult, OutputFormat, Runtime, RuntimeError, - RuntimeResult, -}; -use async_trait::async_trait; -use std::collections::HashMap; -use std::path::PathBuf; -use std::process::Stdio; -use std::time::Instant; -use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; -use tokio::process::Command; -use tokio::time::timeout; -use tracing::{debug, info, warn}; - -/// Escape a string for embedding inside a bash single-quoted string. -/// -/// In single-quoted strings the only problematic character is `'` itself. -/// We close the current single-quote, insert an escaped single-quote, and -/// reopen: `'foo'\''bar'` → `foo'bar`. -fn bash_single_quote_escape(s: &str) -> String { - s.replace('\'', "'\\''") -} - -/// Shell runtime for executing shell scripts and commands -pub struct ShellRuntime { - /// Shell interpreter path (bash, sh, zsh, etc.) - shell_path: PathBuf, - - /// Base directory for storing action code - work_dir: PathBuf, -} - -impl ShellRuntime { - /// Create a new Shell runtime with bash - pub fn new() -> Self { - Self { - shell_path: PathBuf::from("/bin/bash"), - work_dir: PathBuf::from("/tmp/attune/actions"), - } - } - - /// Create a Shell runtime with custom shell - pub fn with_shell(shell_path: PathBuf) -> Self { - Self { - shell_path, - work_dir: PathBuf::from("/tmp/attune/actions"), - } - } - - /// Create a Shell runtime with custom settings - pub fn with_config(shell_path: PathBuf, work_dir: PathBuf) -> Self { - Self { - shell_path, - work_dir, - } - } - - /// Execute with streaming and bounded log collection - #[allow(clippy::too_many_arguments)] - async fn execute_with_streaming( - &self, - mut cmd: Command, - _secrets: &std::collections::HashMap, - parameters_stdin: Option<&str>, - timeout_secs: Option, - max_stdout_bytes: usize, - max_stderr_bytes: usize, - output_format: OutputFormat, - ) -> RuntimeResult { - let start = Instant::now(); - - // Spawn process with piped I/O - let mut child = cmd - .stdin(Stdio::piped()) - .stdout(Stdio::piped()) - .stderr(Stdio::piped()) - .spawn()?; - - // Write to stdin - parameters (with secrets already merged in by the caller). - // If this fails, the process has already started, so we continue and capture output. - let stdin_write_error = if let Some(mut stdin) = child.stdin.take() { - let mut error = None; - - // Write parameters to stdin as a single JSON line. - // Secrets are merged into the parameters map by the caller, so the - // action reads everything with a single readline(). - if let Some(params_data) = parameters_stdin { - if let Err(e) = stdin.write_all(params_data.as_bytes()).await { - error = Some(format!("Failed to write parameters to stdin: {}", e)); - } else if let Err(e) = stdin.write_all(b"\n").await { - error = Some(format!("Failed to write newline to stdin: {}", e)); - } - } - - drop(stdin); - error - } else { - None - }; - - // Create bounded writers - let mut stdout_writer = BoundedLogWriter::new_stdout(max_stdout_bytes); - let mut stderr_writer = BoundedLogWriter::new_stderr(max_stderr_bytes); - - // Take stdout and stderr streams - let stdout = child.stdout.take().expect("stdout not captured"); - let stderr = child.stderr.take().expect("stderr not captured"); - - // Create buffered readers - let mut stdout_reader = BufReader::new(stdout); - let mut stderr_reader = BufReader::new(stderr); - - // Stream both outputs concurrently - let stdout_task = async { - let mut line = Vec::new(); - loop { - line.clear(); - match stdout_reader.read_until(b'\n', &mut line).await { - Ok(0) => break, // EOF - Ok(_) => { - if stdout_writer.write_all(&line).await.is_err() { - break; - } - } - Err(_) => break, - } - } - stdout_writer - }; - - let stderr_task = async { - let mut line = Vec::new(); - loop { - line.clear(); - match stderr_reader.read_until(b'\n', &mut line).await { - Ok(0) => break, // EOF - Ok(_) => { - if stderr_writer.write_all(&line).await.is_err() { - break; - } - } - Err(_) => break, - } - } - stderr_writer - }; - - // Wait for both streams and the process - let (stdout_writer, stderr_writer, wait_result) = - tokio::join!(stdout_task, stderr_task, async { - if let Some(timeout_secs) = timeout_secs { - timeout(std::time::Duration::from_secs(timeout_secs), child.wait()).await - } else { - Ok(child.wait().await) - } - }); - - let duration_ms = start.elapsed().as_millis() as u64; - - // Get results from bounded writers - we have these regardless of wait() success - let stdout_result = stdout_writer.into_result(); - let stderr_result = stderr_writer.into_result(); - - // Handle process wait result - let (exit_code, process_error) = match wait_result { - Ok(Ok(status)) => (status.code().unwrap_or(-1), None), - Ok(Err(e)) => { - // Process wait failed, but we have the output - return it with an error - warn!("Process wait failed but captured output: {}", e); - (-1, Some(format!("Process wait failed: {}", e))) - } - Err(_) => { - // Timeout occurred - return Ok(ExecutionResult { - exit_code: -1, - stdout: stdout_result.content.clone(), - stderr: stderr_result.content.clone(), - result: None, - duration_ms, - error: Some(format!( - "Execution timed out after {} seconds", - timeout_secs.unwrap() - )), - stdout_truncated: stdout_result.truncated, - stderr_truncated: stderr_result.truncated, - stdout_bytes_truncated: stdout_result.bytes_truncated, - stderr_bytes_truncated: stderr_result.bytes_truncated, - }); - } - }; - - debug!( - "Shell execution completed: exit_code={}, duration={}ms, stdout_truncated={}, stderr_truncated={}", - exit_code, duration_ms, stdout_result.truncated, stderr_result.truncated - ); - - // Parse result from stdout based on output_format - let result = if exit_code == 0 && !stdout_result.content.trim().is_empty() { - match output_format { - OutputFormat::Text => { - // No parsing - text output is captured in stdout field - None - } - OutputFormat::Json => { - // Try to parse full stdout as JSON first (handles multi-line JSON), - // then fall back to last line only (for scripts that log before output) - let trimmed = stdout_result.content.trim(); - serde_json::from_str(trimmed).ok().or_else(|| { - trimmed - .lines() - .last() - .and_then(|line| serde_json::from_str(line).ok()) - }) - } - OutputFormat::Yaml => { - // Try to parse stdout as YAML - serde_yaml_ng::from_str(stdout_result.content.trim()).ok() - } - OutputFormat::Jsonl => { - // Parse each line as JSON and collect into array - let mut items = Vec::new(); - for line in stdout_result.content.trim().lines() { - if let Ok(value) = serde_json::from_str::(line) { - items.push(value); - } - } - if items.is_empty() { - None - } else { - Some(serde_json::Value::Array(items)) - } - } - } - } else { - None - }; - - // Determine error message - let error = if let Some(proc_err) = process_error { - Some(proc_err) - } else if let Some(stdin_err) = stdin_write_error { - // Ignore broken pipe errors for fast-exiting successful actions - // These occur when the process exits before we finish writing secrets to stdin - let is_broken_pipe = - stdin_err.contains("Broken pipe") || stdin_err.contains("os error 32"); - let is_fast_exit = duration_ms < 500; - let is_success = exit_code == 0; - - if is_broken_pipe && is_fast_exit && is_success { - debug!( - "Ignoring broken pipe error for fast-exiting successful action ({}ms)", - duration_ms - ); - None - } else { - Some(stdin_err) - } - } else if exit_code != 0 { - Some(if stderr_result.content.is_empty() { - format!("Command exited with code {}", exit_code) - } else { - // Use last line of stderr as error, or full stderr if short - if stderr_result.content.lines().count() > 5 { - stderr_result - .content - .lines() - .last() - .unwrap_or("") - .to_string() - } else { - stderr_result.content.clone() - } - }) - } else { - None - }; - - Ok(ExecutionResult { - exit_code, - // Only populate stdout if result wasn't parsed (avoid duplication) - stdout: if result.is_some() { - String::new() - } else { - stdout_result.content.clone() - }, - stderr: stderr_result.content.clone(), - result, - duration_ms, - error, - stdout_truncated: stdout_result.truncated, - stderr_truncated: stderr_result.truncated, - stdout_bytes_truncated: stdout_result.bytes_truncated, - stderr_bytes_truncated: stderr_result.bytes_truncated, - }) - } - - /// Generate shell wrapper script that injects parameters and secrets directly. - /// - /// Secrets are embedded as bash associative-array entries at generation time - /// so the wrapper has **zero external runtime dependencies** (no Python, jq, - /// etc.). The generated script is written to a temp file by the caller so - /// that secrets never appear in `/proc//cmdline`. - fn generate_wrapper_script(&self, context: &ExecutionContext) -> RuntimeResult { - let mut script = String::new(); - - // Add shebang - script.push_str("#!/bin/bash\n"); - script.push_str("set -e\n\n"); // Exit on error - - // Populate secrets associative array directly from Rust — no stdin - // reading, no JSON parsing, no external interpreters. - script.push_str("# Secrets (injected at generation time, not via environment)\n"); - script.push_str("declare -A ATTUNE_SECRETS\n"); - for (key, value) in &context.secrets { - let escaped_key = bash_single_quote_escape(key); - // Serialize structured JSON values to string for bash; plain strings used directly. - let val_str = match value { - serde_json::Value::String(s) => s.clone(), - other => other.to_string(), - }; - let escaped_val = bash_single_quote_escape(&val_str); - script.push_str(&format!( - "ATTUNE_SECRETS['{}']='{}'\n", - escaped_key, escaped_val - )); - } - script.push('\n'); - - // Helper function to get secrets - script.push_str("# Helper function to access secrets\n"); - script.push_str("get_secret() {\n"); - script.push_str(" local name=\"$1\"\n"); - script.push_str(" echo \"${ATTUNE_SECRETS[$name]}\"\n"); - script.push_str("}\n\n"); - - // Export parameters as environment variables - script.push_str("# Action parameters\n"); - for (key, value) in &context.parameters { - let value_str = match value { - serde_json::Value::String(s) => s.clone(), - serde_json::Value::Number(n) => n.to_string(), - serde_json::Value::Bool(b) => b.to_string(), - _ => serde_json::to_string(value)?, - }; - let escaped = bash_single_quote_escape(&value_str); - // Export with PARAM_ prefix for consistency - script.push_str(&format!( - "export PARAM_{}='{}'\n", - key.to_uppercase(), - escaped - )); - // Also export without prefix for easier shell script writing - script.push_str(&format!("export {}='{}'\n", key, escaped)); - } - script.push('\n'); - - // Add the action code - script.push_str("# Action code\n"); - if let Some(code) = &context.code { - script.push_str(code); - } - - Ok(script) - } - - /// Execute shell script from file - #[allow(clippy::too_many_arguments)] - async fn execute_shell_file( - &self, - script_path: PathBuf, - _secrets: &std::collections::HashMap, - env: &std::collections::HashMap, - parameters_stdin: Option<&str>, - timeout_secs: Option, - max_stdout_bytes: usize, - max_stderr_bytes: usize, - output_format: OutputFormat, - ) -> RuntimeResult { - debug!("Executing shell file: {:?}", script_path,); - - // Build command - let mut cmd = Command::new(&self.shell_path); - cmd.arg(&script_path); - - // Add environment variables - for (key, value) in env { - cmd.env(key, value); - } - - self.execute_with_streaming( - cmd, - &std::collections::HashMap::new(), - parameters_stdin, - timeout_secs, - max_stdout_bytes, - max_stderr_bytes, - output_format, - ) - .await - } -} - -impl Default for ShellRuntime { - fn default() -> Self { - Self::new() - } -} - -#[async_trait] -impl Runtime for ShellRuntime { - fn name(&self) -> &str { - "shell" - } - - fn can_execute(&self, context: &ExecutionContext) -> bool { - // Check if action reference suggests shell script - let is_shell = context.action_ref.contains(".sh") - || context.entry_point.ends_with(".sh") - || context - .code_path - .as_ref() - .map(|p| p.extension().and_then(|e| e.to_str()) == Some("sh")) - .unwrap_or(false) - || context.entry_point == "bash" - || context.entry_point == "sh" - || context.entry_point == "shell"; - - is_shell - } - - async fn execute(&self, context: ExecutionContext) -> RuntimeResult { - info!( - "Executing shell action: {} (execution_id: {}) with parameter delivery: {:?}, format: {:?}", - context.action_ref, context.execution_id, context.parameter_delivery, context.parameter_format - ); - info!( - "Action parameters (count: {}): {:?}", - context.parameters.len(), - context.parameters - ); - - // Merge secrets into parameters as a single JSON document. - // Actions receive everything via one readline() on stdin. - let mut merged_parameters = context.parameters.clone(); - for (key, value) in &context.secrets { - merged_parameters.insert(key.clone(), value.clone()); - } - - // Prepare environment and parameters according to delivery method - let mut env = context.env.clone(); - let config = ParameterDeliveryConfig { - delivery: context.parameter_delivery, - format: context.parameter_format, - }; - - let prepared_params = - parameter_passing::prepare_parameters(&merged_parameters, &mut env, config)?; - - // Get stdin content if parameters are delivered via stdin - let parameters_stdin = prepared_params.stdin_content(); - - if let Some(stdin_data) = parameters_stdin { - info!( - "Parameters to be sent via stdin (length: {} bytes):\n{}", - stdin_data.len(), - stdin_data - ); - } else { - info!("No parameters will be sent via stdin"); - } - - // If code_path is provided, execute the file directly. - // Secrets are already merged into parameters — no separate secrets arg needed. - if let Some(code_path) = &context.code_path { - return self - .execute_shell_file( - code_path.clone(), - &HashMap::new(), - &env, - parameters_stdin, - context.timeout, - context.max_stdout_bytes, - context.max_stderr_bytes, - context.output_format, - ) - .await; - } - - // Otherwise, generate wrapper script and execute. - // Secrets and parameters are embedded directly in the wrapper script - // by generate_wrapper_script(), so we write it to a temp file (to keep - // secrets out of /proc/cmdline) and pass no secrets/params via stdin. - let script = self.generate_wrapper_script(&context)?; - - // Write wrapper to a temp file so secrets are not exposed in the - // process command line (which would happen with `bash -c "..."`). - let wrapper_dir = self.work_dir.join("wrappers"); - tokio::fs::create_dir_all(&wrapper_dir).await.map_err(|e| { - RuntimeError::ExecutionFailed(format!("Failed to create wrapper directory: {}", e)) - })?; - let wrapper_path = wrapper_dir.join(format!("wrapper_{}.sh", context.execution_id)); - tokio::fs::write(&wrapper_path, &script) - .await - .map_err(|e| { - RuntimeError::ExecutionFailed(format!("Failed to write wrapper script: {}", e)) - })?; - - let result = self - .execute_shell_file( - wrapper_path.clone(), - &HashMap::new(), // secrets are in the script, not stdin - &env, - None, - context.timeout, - context.max_stdout_bytes, - context.max_stderr_bytes, - context.output_format, - ) - .await; - - // Clean up wrapper file (best-effort) - let _ = tokio::fs::remove_file(&wrapper_path).await; - - result - } - - async fn setup(&self) -> RuntimeResult<()> { - info!("Setting up Shell runtime"); - - // Ensure work directory exists - tokio::fs::create_dir_all(&self.work_dir) - .await - .map_err(|e| RuntimeError::SetupError(format!("Failed to create work dir: {}", e)))?; - - // Verify shell is available - let output = Command::new(&self.shell_path) - .arg("--version") - .output() - .await - .map_err(|e| { - RuntimeError::SetupError(format!("Shell not found at {:?}: {}", self.shell_path, e)) - })?; - - if !output.status.success() { - return Err(RuntimeError::SetupError( - "Shell interpreter is not working".to_string(), - )); - } - - let version = String::from_utf8_lossy(&output.stdout); - info!("Shell runtime ready: {}", version.trim()); - - Ok(()) - } - - async fn cleanup(&self) -> RuntimeResult<()> { - info!("Cleaning up Shell runtime"); - // Could clean up temporary files here - Ok(()) - } - - async fn validate(&self) -> RuntimeResult<()> { - debug!("Validating Shell runtime"); - - // Check if shell is available - let output = Command::new(&self.shell_path) - .arg("-c") - .arg("echo 'test'") - .output() - .await - .map_err(|e| RuntimeError::SetupError(format!("Shell validation failed: {}", e)))?; - - if !output.status.success() { - return Err(RuntimeError::SetupError( - "Shell interpreter validation failed".to_string(), - )); - } - - Ok(()) - } -} - -#[cfg(test)] -mod tests { - use super::*; - use std::collections::HashMap; - - #[tokio::test] - async fn test_shell_runtime_simple() { - let runtime = ShellRuntime::new(); - - let context = ExecutionContext { - execution_id: 1, - action_ref: "test.simple".to_string(), - parameters: HashMap::new(), - env: HashMap::new(), - secrets: HashMap::new(), - timeout: Some(10), - working_dir: None, - entry_point: "shell".to_string(), - code: Some("echo 'Hello, World!'".to_string()), - code_path: None, - runtime_name: Some("shell".to_string()), - runtime_config_override: None, - runtime_env_dir_suffix: None, - selected_runtime_version: None, - max_stdout_bytes: 10 * 1024 * 1024, - max_stderr_bytes: 10 * 1024 * 1024, - parameter_delivery: attune_common::models::ParameterDelivery::default(), - parameter_format: attune_common::models::ParameterFormat::default(), - output_format: attune_common::models::OutputFormat::default(), - cancel_token: None, - }; - - let result = runtime.execute(context).await.unwrap(); - assert!(result.is_success()); - assert_eq!(result.exit_code, 0); - assert!(result.stdout.contains("Hello, World!")); - } - - #[tokio::test] - async fn test_shell_runtime_with_params() { - let runtime = ShellRuntime::new(); - - let context = ExecutionContext { - execution_id: 2, - action_ref: "test.params".to_string(), - parameters: { - let mut map = HashMap::new(); - map.insert("name".to_string(), serde_json::json!("Alice")); - map - }, - env: HashMap::new(), - secrets: HashMap::new(), - timeout: Some(10), - working_dir: None, - entry_point: "shell".to_string(), - code: Some("echo \"Hello, $name!\"".to_string()), - code_path: None, - runtime_name: Some("shell".to_string()), - runtime_config_override: None, - runtime_env_dir_suffix: None, - selected_runtime_version: None, - max_stdout_bytes: 10 * 1024 * 1024, - max_stderr_bytes: 10 * 1024 * 1024, - parameter_delivery: attune_common::models::ParameterDelivery::default(), - parameter_format: attune_common::models::ParameterFormat::default(), - output_format: attune_common::models::OutputFormat::default(), - cancel_token: None, - }; - - let result = runtime.execute(context).await.unwrap(); - assert!(result.is_success()); - assert!(result.stdout.contains("Hello, Alice!")); - } - - #[tokio::test] - async fn test_shell_runtime_timeout() { - let runtime = ShellRuntime::new(); - - let context = ExecutionContext { - execution_id: 3, - action_ref: "test.timeout".to_string(), - parameters: HashMap::new(), - env: HashMap::new(), - secrets: HashMap::new(), - timeout: Some(1), - working_dir: None, - entry_point: "shell".to_string(), - code: Some("sleep 10".to_string()), - code_path: None, - runtime_name: Some("shell".to_string()), - runtime_config_override: None, - runtime_env_dir_suffix: None, - selected_runtime_version: None, - max_stdout_bytes: 10 * 1024 * 1024, - max_stderr_bytes: 10 * 1024 * 1024, - parameter_delivery: attune_common::models::ParameterDelivery::default(), - parameter_format: attune_common::models::ParameterFormat::default(), - output_format: attune_common::models::OutputFormat::default(), - cancel_token: None, - }; - - let result = runtime.execute(context).await.unwrap(); - assert!(!result.is_success()); - assert!(result.error.is_some()); - let error_msg = result.error.unwrap(); - assert!(error_msg.contains("timeout") || error_msg.contains("timed out")); - } - - #[tokio::test] - async fn test_shell_runtime_error() { - let runtime = ShellRuntime::new(); - - let context = ExecutionContext { - execution_id: 4, - action_ref: "test.error".to_string(), - parameters: HashMap::new(), - env: HashMap::new(), - secrets: HashMap::new(), - timeout: Some(10), - working_dir: None, - entry_point: "shell".to_string(), - code: Some("exit 1".to_string()), - code_path: None, - runtime_name: Some("shell".to_string()), - runtime_config_override: None, - runtime_env_dir_suffix: None, - selected_runtime_version: None, - max_stdout_bytes: 10 * 1024 * 1024, - max_stderr_bytes: 10 * 1024 * 1024, - parameter_delivery: attune_common::models::ParameterDelivery::default(), - parameter_format: attune_common::models::ParameterFormat::default(), - output_format: attune_common::models::OutputFormat::default(), - cancel_token: None, - }; - - let result = runtime.execute(context).await.unwrap(); - assert!(!result.is_success()); - assert_eq!(result.exit_code, 1); - } - - #[tokio::test] - async fn test_shell_runtime_with_secrets() { - let runtime = ShellRuntime::new(); - - let context = ExecutionContext { - execution_id: 5, - action_ref: "test.secrets".to_string(), - parameters: HashMap::new(), - env: HashMap::new(), - secrets: { - let mut s = HashMap::new(); - s.insert("api_key".to_string(), serde_json::json!("secret_key_12345")); - s.insert( - "db_password".to_string(), - serde_json::json!("super_secret_pass"), - ); - s - }, - timeout: Some(10), - working_dir: None, - entry_point: "shell".to_string(), - code: Some( - r#" -# Access secrets via get_secret function -api_key=$(get_secret 'api_key') -db_pass=$(get_secret 'db_password') -missing=$(get_secret 'nonexistent') - -echo "api_key=$api_key" -echo "db_pass=$db_pass" -echo "missing=$missing" -"# - .to_string(), - ), - code_path: None, - runtime_name: Some("shell".to_string()), - runtime_config_override: None, - runtime_env_dir_suffix: None, - selected_runtime_version: None, - max_stdout_bytes: 10 * 1024 * 1024, - max_stderr_bytes: 10 * 1024 * 1024, - parameter_delivery: attune_common::models::ParameterDelivery::default(), - parameter_format: attune_common::models::ParameterFormat::default(), - output_format: attune_common::models::OutputFormat::default(), - cancel_token: None, - }; - - let result = runtime.execute(context).await.unwrap(); - assert!(result.is_success()); - assert_eq!(result.exit_code, 0); - - // Verify secrets are accessible in action code - assert!(result.stdout.contains("api_key=secret_key_12345")); - assert!(result.stdout.contains("db_pass=super_secret_pass")); - assert!(result.stdout.contains("missing=")); - } - - #[tokio::test] - async fn test_shell_runtime_jsonl_output() { - let runtime = ShellRuntime::new(); - - let context = ExecutionContext { - execution_id: 6, - action_ref: "test.jsonl".to_string(), - parameters: HashMap::new(), - env: HashMap::new(), - secrets: HashMap::new(), - timeout: Some(10), - working_dir: None, - entry_point: "shell".to_string(), - code: Some( - r#" -echo '{"id": 1, "name": "Alice"}' -echo '{"id": 2, "name": "Bob"}' -echo '{"id": 3, "name": "Charlie"}' -"# - .to_string(), - ), - code_path: None, - runtime_name: Some("shell".to_string()), - runtime_config_override: None, - runtime_env_dir_suffix: None, - selected_runtime_version: None, - max_stdout_bytes: 10 * 1024 * 1024, - max_stderr_bytes: 10 * 1024 * 1024, - parameter_delivery: attune_common::models::ParameterDelivery::default(), - parameter_format: attune_common::models::ParameterFormat::default(), - output_format: attune_common::models::OutputFormat::Jsonl, - cancel_token: None, - }; - - let result = runtime.execute(context).await.unwrap(); - assert!(result.is_success()); - assert_eq!(result.exit_code, 0); - - // Verify stdout is not populated when result is parsed (avoid duplication) - assert!( - result.stdout.is_empty(), - "stdout should be empty when result is parsed" - ); - - // Verify result is parsed as an array of JSON objects - let parsed_result = result.result.expect("Should have parsed result"); - assert!(parsed_result.is_array()); - - let items = parsed_result.as_array().unwrap(); - assert_eq!(items.len(), 3); - - // Verify first item - assert_eq!(items[0]["id"], 1); - assert_eq!(items[0]["name"], "Alice"); - - // Verify second item - assert_eq!(items[1]["id"], 2); - assert_eq!(items[1]["name"], "Bob"); - - // Verify third item - assert_eq!(items[2]["id"], 3); - assert_eq!(items[2]["name"], "Charlie"); - } - - #[tokio::test] - async fn test_shell_runtime_multiline_json_output() { - // Regression test: scripts that embed pretty-printed JSON (e.g., http_request.sh - // embedding a multi-line response body in its "json" field) produce multi-line - // stdout. The parser must handle this by trying to parse the full stdout as JSON - // before falling back to last-line parsing. - let runtime = ShellRuntime::new(); - - let context = ExecutionContext { - execution_id: 7, - action_ref: "test.multiline_json".to_string(), - parameters: HashMap::new(), - env: HashMap::new(), - secrets: HashMap::new(), - timeout: Some(10), - working_dir: None, - entry_point: "shell".to_string(), - code: Some( - r#" -# Simulate http_request.sh output with embedded pretty-printed JSON -printf '{"status_code":200,"body":"hello","json":{\n "args": {\n "hello": "world"\n },\n "url": "https://example.com"\n},"success":true}\n' -"# - .to_string(), - ), - code_path: None, - runtime_name: Some("shell".to_string()), - runtime_config_override: None, - runtime_env_dir_suffix: None, - selected_runtime_version: None, - max_stdout_bytes: 10 * 1024 * 1024, - max_stderr_bytes: 10 * 1024 * 1024, - parameter_delivery: attune_common::models::ParameterDelivery::default(), - parameter_format: attune_common::models::ParameterFormat::default(), - output_format: attune_common::models::OutputFormat::Json, - cancel_token: None, - }; - - let result = runtime.execute(context).await.unwrap(); - assert!(result.is_success()); - assert_eq!(result.exit_code, 0); - - // Verify result was parsed (not stored as raw stdout) - let parsed = result - .result - .expect("Multi-line JSON should be parsed successfully"); - assert_eq!(parsed["status_code"], 200); - assert_eq!(parsed["success"], true); - assert_eq!(parsed["json"]["args"]["hello"], "world"); - - // stdout should be empty when result is successfully parsed - assert!( - result.stdout.is_empty(), - "stdout should be empty when result is parsed, got: {}", - result.stdout - ); - } - - #[tokio::test] - async fn test_shell_runtime_json_with_log_prefix() { - // Verify last-line fallback still works: scripts that log to stdout - // before the final JSON line should still parse correctly. - let runtime = ShellRuntime::new(); - - let context = ExecutionContext { - execution_id: 8, - action_ref: "test.json_with_logs".to_string(), - parameters: HashMap::new(), - env: HashMap::new(), - secrets: HashMap::new(), - timeout: Some(10), - working_dir: None, - entry_point: "shell".to_string(), - code: Some( - r#" -echo "Starting action..." -echo "Processing data..." -echo '{"result": "success", "count": 42}' -"# - .to_string(), - ), - code_path: None, - runtime_name: Some("shell".to_string()), - runtime_config_override: None, - runtime_env_dir_suffix: None, - selected_runtime_version: None, - max_stdout_bytes: 10 * 1024 * 1024, - max_stderr_bytes: 10 * 1024 * 1024, - parameter_delivery: attune_common::models::ParameterDelivery::default(), - parameter_format: attune_common::models::ParameterFormat::default(), - output_format: attune_common::models::OutputFormat::Json, - cancel_token: None, - }; - - let result = runtime.execute(context).await.unwrap(); - assert!(result.is_success()); - - let parsed = result.result.expect("Last-line JSON should be parsed"); - assert_eq!(parsed["result"], "success"); - assert_eq!(parsed["count"], 42); - } -} diff --git a/crates/worker/tests/log_truncation_test.rs b/crates/worker/tests/log_truncation_test.rs index 629a8e3..a8b32a8 100644 --- a/crates/worker/tests/log_truncation_test.rs +++ b/crates/worker/tests/log_truncation_test.rs @@ -4,10 +4,10 @@ //! configured size limits, preventing OOM issues with large output. use attune_common::models::runtime::{ - InlineExecutionConfig, InterpreterConfig, RuntimeExecutionConfig, + InlineExecutionConfig, InlineExecutionStrategy, InterpreterConfig, RuntimeExecutionConfig, }; use attune_worker::runtime::process::ProcessRuntime; -use attune_worker::runtime::{ExecutionContext, Runtime, ShellRuntime}; +use attune_worker::runtime::{ExecutionContext, Runtime}; use std::collections::HashMap; use std::path::PathBuf; use tempfile::TempDir; @@ -32,6 +32,30 @@ fn make_python_process_runtime(packs_base_dir: PathBuf) -> ProcessRuntime { ) } +fn make_shell_process_runtime(packs_base_dir: PathBuf) -> ProcessRuntime { + let config = RuntimeExecutionConfig { + interpreter: InterpreterConfig { + binary: "/bin/bash".to_string(), + args: vec![], + file_extension: Some(".sh".to_string()), + }, + inline_execution: InlineExecutionConfig { + strategy: InlineExecutionStrategy::TempFile, + extension: Some(".sh".to_string()), + inject_shell_helpers: true, + }, + environment: None, + dependencies: None, + env_vars: std::collections::HashMap::new(), + }; + ProcessRuntime::new( + "shell".to_string(), + config, + packs_base_dir.clone(), + packs_base_dir.join("../runtime_envs"), + ) +} + fn make_python_context( execution_id: i64, action_ref: &str, @@ -113,7 +137,8 @@ async fn test_python_stderr_truncation() { #[tokio::test] async fn test_shell_stdout_truncation() { - let runtime = ShellRuntime::new(); + let tmp = TempDir::new().unwrap(); + let runtime = make_shell_process_runtime(tmp.path().to_path_buf()); // Shell script that outputs more than the limit let code = r#" diff --git a/crates/worker/tests/security_tests.rs b/crates/worker/tests/security_tests.rs index e9ad3c3..d2d355b 100644 --- a/crates/worker/tests/security_tests.rs +++ b/crates/worker/tests/security_tests.rs @@ -4,10 +4,9 @@ //! or command-line arguments, ensuring secure secret passing via stdin. use attune_common::models::runtime::{ - InlineExecutionConfig, InterpreterConfig, RuntimeExecutionConfig, + InlineExecutionConfig, InlineExecutionStrategy, InterpreterConfig, RuntimeExecutionConfig, }; use attune_worker::runtime::process::ProcessRuntime; -use attune_worker::runtime::shell::ShellRuntime; use attune_worker::runtime::{ExecutionContext, Runtime}; use std::collections::HashMap; use std::path::PathBuf; @@ -37,6 +36,34 @@ fn make_python_process_runtime(packs_base_dir: PathBuf) -> ProcessRuntime { ) } +fn make_shell_process_runtime(packs_base_dir: PathBuf) -> ProcessRuntime { + let config = RuntimeExecutionConfig { + interpreter: InterpreterConfig { + binary: "/bin/bash".to_string(), + args: vec![], + file_extension: Some(".sh".to_string()), + }, + inline_execution: InlineExecutionConfig { + strategy: InlineExecutionStrategy::TempFile, + extension: Some(".sh".to_string()), + inject_shell_helpers: true, + }, + environment: None, + dependencies: None, + env_vars: std::collections::HashMap::new(), + }; + let runtime_envs_dir = packs_base_dir + .parent() + .unwrap_or(&packs_base_dir) + .join("runtime_envs"); + ProcessRuntime::new( + "shell".to_string(), + config, + packs_base_dir, + runtime_envs_dir, + ) +} + #[tokio::test] async fn test_python_secrets_not_in_environ() { let tmp = TempDir::new().unwrap(); @@ -117,7 +144,8 @@ print(json.dumps(result)) #[tokio::test] async fn test_shell_secrets_not_in_environ() { - let runtime = ShellRuntime::new(); + let tmp = TempDir::new().unwrap(); + let runtime = make_shell_process_runtime(tmp.path().to_path_buf()); let context = ExecutionContext { execution_id: 2, @@ -154,21 +182,21 @@ if printenv | grep -q "SECRET_API_KEY"; then exit 1 fi -# But secrets SHOULD be accessible via get_secret function -api_key=$(get_secret 'api_key') -password=$(get_secret 'password') +# Shell inline execution receives the merged input set as ordinary variables +api_key="$api_key" +password="$password" if [ "$api_key" != "super_secret_key_do_not_expose" ]; then - echo "ERROR: Secret not accessible via get_secret" + echo "ERROR: Secret not accessible via merged inputs" exit 1 fi if [ "$password" != "secret_pass_123" ]; then - echo "ERROR: Password not accessible via get_secret" + echo "ERROR: Password not accessible via merged inputs" exit 1 fi -echo "SECURITY_PASS: Secrets not in environment but accessible via get_secret" +echo "SECURITY_PASS: Secrets not in inherited environment and accessible via merged inputs" "# .to_string(), ), @@ -366,7 +394,8 @@ print("ok") #[tokio::test] async fn test_shell_empty_secrets() { - let runtime = ShellRuntime::new(); + let tmp = TempDir::new().unwrap(); + let runtime = make_shell_process_runtime(tmp.path().to_path_buf()); let context = ExecutionContext { execution_id: 6, @@ -379,12 +408,11 @@ async fn test_shell_empty_secrets() { entry_point: "shell".to_string(), code: Some( r#" -# get_secret should return empty string for non-existent secrets -result=$(get_secret 'nonexistent') -if [ -z "$result" ]; then - echo "PASS: Empty secret returns empty string" +# Unset merged inputs should expand to empty string +if [ -z "$nonexistent" ] && [ -z "$PARAM_NONEXISTENT" ]; then + echo "PASS: Missing input expands to empty string" else - echo "FAIL: Expected empty string" + echo "FAIL: Expected empty string for missing input" exit 1 fi "# diff --git a/migrations/20250101000005_execution_and_operations.sql b/migrations/20250101000005_execution_and_operations.sql index 6146f77..f73d022 100644 --- a/migrations/20250101000005_execution_and_operations.sql +++ b/migrations/20250101000005_execution_and_operations.sql @@ -26,6 +26,7 @@ CREATE TABLE execution ( parent BIGINT, -- self-reference; no FK because execution becomes a hypertable enforcement BIGINT, -- references enforcement(id); no FK (both are hypertables) executor BIGINT, -- references identity(id); no FK because execution becomes a hypertable + worker BIGINT, -- references worker(id); no FK because execution becomes a hypertable status execution_status_enum NOT NULL DEFAULT 'requested', result JSONB, started_at TIMESTAMPTZ, -- set when execution transitions to 'running' @@ -49,6 +50,7 @@ CREATE INDEX idx_execution_action_ref ON execution(action_ref); CREATE INDEX idx_execution_parent ON execution(parent); CREATE INDEX idx_execution_enforcement ON execution(enforcement); CREATE INDEX idx_execution_executor ON execution(executor); +CREATE INDEX idx_execution_worker ON execution(worker); CREATE INDEX idx_execution_status ON execution(status); CREATE INDEX idx_execution_created ON execution(created DESC); CREATE INDEX idx_execution_updated ON execution(updated DESC); @@ -56,6 +58,7 @@ CREATE INDEX idx_execution_status_created ON execution(status, created DESC); CREATE INDEX idx_execution_status_updated ON execution(status, updated DESC); CREATE INDEX idx_execution_action_status ON execution(action, status); CREATE INDEX idx_execution_executor_created ON execution(executor, created DESC); +CREATE INDEX idx_execution_worker_created ON execution(worker, created DESC); CREATE INDEX idx_execution_parent_created ON execution(parent, created DESC); CREATE INDEX idx_execution_result_gin ON execution USING GIN (result); CREATE INDEX idx_execution_env_vars_gin ON execution USING GIN (env_vars); @@ -77,6 +80,7 @@ COMMENT ON COLUMN execution.env_vars IS 'Environment variables for this executio COMMENT ON COLUMN execution.parent IS 'Parent execution ID for workflow hierarchies (no FK — execution is a hypertable)'; COMMENT ON COLUMN execution.enforcement IS 'Enforcement that triggered this execution (no FK — both are hypertables)'; COMMENT ON COLUMN execution.executor IS 'Identity that initiated the execution (no FK — execution is a hypertable)'; +COMMENT ON COLUMN execution.worker IS 'Assigned worker handling this execution (no FK — execution is a hypertable)'; COMMENT ON COLUMN execution.status IS 'Current execution lifecycle status'; COMMENT ON COLUMN execution.result IS 'Execution output/results'; COMMENT ON COLUMN execution.retry_count IS 'Current retry attempt number (0 = first attempt, 1 = first retry, etc.)'; diff --git a/migrations/20250101000009_timescaledb_history.sql b/migrations/20250101000009_timescaledb_history.sql index bdeae86..4499bc5 100644 --- a/migrations/20250101000009_timescaledb_history.sql +++ b/migrations/20250101000009_timescaledb_history.sql @@ -196,7 +196,7 @@ COMMENT ON TABLE execution IS 'Executions represent action runs with workflow su -- ---------------------------------------------------------------------------- -- execution history trigger --- Tracked fields: status, result, executor, workflow_task, env_vars, started_at +-- Tracked fields: status, result, executor, worker, workflow_task, env_vars, started_at -- Note: result uses _jsonb_digest_summary() to avoid storing large payloads -- ---------------------------------------------------------------------------- @@ -214,6 +214,7 @@ BEGIN 'status', NEW.status, 'action_ref', NEW.action_ref, 'executor', NEW.executor, + 'worker', NEW.worker, 'parent', NEW.parent, 'enforcement', NEW.enforcement, 'started_at', NEW.started_at @@ -249,6 +250,12 @@ BEGIN new_vals := new_vals || jsonb_build_object('executor', NEW.executor); END IF; + IF OLD.worker IS DISTINCT FROM NEW.worker THEN + changed := array_append(changed, 'worker'); + old_vals := old_vals || jsonb_build_object('worker', OLD.worker); + new_vals := new_vals || jsonb_build_object('worker', NEW.worker); + END IF; + IF OLD.workflow_task IS DISTINCT FROM NEW.workflow_task THEN changed := array_append(changed, 'workflow_task'); old_vals := old_vals || jsonb_build_object('workflow_task', OLD.workflow_task); diff --git a/web/src/api/models/ApiResponse_ExecutionResponse.ts b/web/src/api/models/ApiResponse_ExecutionResponse.ts index 8c5320c..efcb13a 100644 --- a/web/src/api/models/ApiResponse_ExecutionResponse.ts +++ b/web/src/api/models/ApiResponse_ExecutionResponse.ts @@ -32,7 +32,7 @@ export type ApiResponse_ExecutionResponse = { */ enforcement?: number | null; /** - * Executor ID (worker/executor that ran this) + * Identity ID that initiated this execution */ executor?: number | null; /** @@ -43,6 +43,10 @@ export type ApiResponse_ExecutionResponse = { * Parent execution ID (for nested/child executions) */ parent?: number | null; + /** + * Worker ID currently assigned to this execution + */ + worker?: number | null; /** * Execution result/output */ diff --git a/web/src/api/models/ExecutionResponse.ts b/web/src/api/models/ExecutionResponse.ts index 56d5459..15de79d 100644 --- a/web/src/api/models/ExecutionResponse.ts +++ b/web/src/api/models/ExecutionResponse.ts @@ -28,7 +28,7 @@ export type ExecutionResponse = { */ enforcement?: number | null; /** - * Executor ID (worker/executor that ran this) + * Identity ID that initiated this execution */ executor?: number | null; /** @@ -39,6 +39,10 @@ export type ExecutionResponse = { * Parent execution ID (for nested/child executions) */ parent?: number | null; + /** + * Worker ID currently assigned to this execution + */ + worker?: number | null; /** * Execution result/output */ diff --git a/web/src/api/services/ExecutionsService.ts b/web/src/api/services/ExecutionsService.ts index 5f3cd50..7ff0f58 100644 --- a/web/src/api/services/ExecutionsService.ts +++ b/web/src/api/services/ExecutionsService.ts @@ -224,7 +224,7 @@ export class ExecutionsService { */ enforcement?: number | null; /** - * Executor ID (worker/executor that ran this) + * Identity ID that initiated this execution */ executor?: number | null; /** @@ -235,6 +235,10 @@ export class ExecutionsService { * Parent execution ID (for nested/child executions) */ parent?: number | null; + /** + * Worker ID currently assigned to this execution + */ + worker?: number | null; /** * Execution result/output */ diff --git a/web/src/components/executions/ExecutionArtifactsPanel.tsx b/web/src/components/executions/ExecutionArtifactsPanel.tsx index f09d6d7..1be7e8b 100644 --- a/web/src/components/executions/ExecutionArtifactsPanel.tsx +++ b/web/src/components/executions/ExecutionArtifactsPanel.tsx @@ -337,11 +337,16 @@ function TextFileDetail({ interface ProgressDetailProps { artifactId: number; + isRunning?: boolean; onClose: () => void; } -function ProgressDetail({ artifactId, onClose }: ProgressDetailProps) { - const { data: artifactData, isLoading } = useArtifact(artifactId); +function ProgressDetail({ + artifactId, + isRunning = false, + onClose, +}: ProgressDetailProps) { + const { data: artifactData, isLoading } = useArtifact(artifactId, isRunning); const artifact = artifactData?.data; const progressEntries = useMemo(() => { @@ -707,6 +712,7 @@ export default function ExecutionArtifactsPanel({
setExpandedProgressId(null)} />
diff --git a/web/src/components/executions/ExecutionPreviewPanel.tsx b/web/src/components/executions/ExecutionPreviewPanel.tsx index e4fa3bd..faa9b5c 100644 --- a/web/src/components/executions/ExecutionPreviewPanel.tsx +++ b/web/src/components/executions/ExecutionPreviewPanel.tsx @@ -69,9 +69,10 @@ const ExecutionPreviewPanel = memo(function ExecutionPreviewPanel({ execution?.status === "running" || execution?.status === "scheduling" || execution?.status === "scheduled" || - execution?.status === "requested"; + execution?.status === "requested" || + execution?.status === "canceling"; - const isCancellable = isRunning || execution?.status === "canceling"; + const isCancellable = isRunning; const startedAt = execution?.started_at ? new Date(execution.started_at) @@ -241,13 +242,23 @@ const ExecutionPreviewPanel = memo(function ExecutionPreviewPanel({ {execution.executor && (
- Executor + Initiated By
#{execution.executor}
)} + {execution.worker && ( +
+
+ Worker +
+
+ #{execution.worker} +
+
+ )} {execution.workflow_task && (
diff --git a/web/src/hooks/useArtifacts.ts b/web/src/hooks/useArtifacts.ts index b786621..011b1b6 100644 --- a/web/src/hooks/useArtifacts.ts +++ b/web/src/hooks/useArtifacts.ts @@ -147,15 +147,18 @@ export function useExecutionArtifacts( return response; }, enabled: !!executionId, - staleTime: isRunning ? 3000 : 10000, - refetchInterval: isRunning ? 3000 : 10000, + staleTime: isRunning ? 3000 : 30000, + refetchInterval: isRunning ? 3000 : false, }); } /** * Fetch a single artifact by ID (includes data field for progress artifacts). + * + * @param isRunning - When true, polls every 3s for live updates. When false, + * uses a longer stale time and disables automatic polling. */ -export function useArtifact(id: number | undefined) { +export function useArtifact(id: number | undefined, isRunning = false) { return useQuery({ queryKey: ["artifacts", id], queryFn: async () => { @@ -169,8 +172,8 @@ export function useArtifact(id: number | undefined) { return response; }, enabled: !!id, - staleTime: 3000, - refetchInterval: 3000, + staleTime: isRunning ? 3000 : 30000, + refetchInterval: isRunning ? 3000 : false, }); } diff --git a/web/src/hooks/useExecutionStream.ts b/web/src/hooks/useExecutionStream.ts index 4f84cda..310f79c 100644 --- a/web/src/hooks/useExecutionStream.ts +++ b/web/src/hooks/useExecutionStream.ts @@ -205,6 +205,11 @@ export function useExecutionStream(options: UseExecutionStreamOptions = {}) { }, ); + queryClient.invalidateQueries({ + queryKey: ["history", "execution", executionNotification.entity_id], + exact: false, + }); + // Update execution list queries by modifying existing data. // We need to iterate manually to access query keys for filtering. const queries = queryClient diff --git a/web/src/hooks/useExecutions.ts b/web/src/hooks/useExecutions.ts index 3b0fc25..ab6bb8e 100644 --- a/web/src/hooks/useExecutions.ts +++ b/web/src/hooks/useExecutions.ts @@ -22,6 +22,16 @@ interface ExecutionsQueryParams { topLevelOnly?: boolean; } +function isExecutionActive(status: string | undefined): boolean { + return ( + status === "requested" || + status === "scheduling" || + status === "scheduled" || + status === "running" || + status === "canceling" + ); +} + export function useExecutions(params?: ExecutionsQueryParams) { // Check if any filters are applied const hasFilters = @@ -67,7 +77,9 @@ export function useExecution(id: number) { return response; }, enabled: !!id, - staleTime: 30000, // 30 seconds - SSE handles real-time updates + staleTime: 30000, + refetchInterval: (query) => + isExecutionActive(query.state.data?.data?.status) ? 3000 : false, }); } @@ -180,11 +192,7 @@ export function useChildExecutions(parentId: number | undefined) { const data = query.state.data; if (!data) return false; const hasActive = data.data.some( - (e) => - e.status === "requested" || - e.status === "scheduling" || - e.status === "scheduled" || - e.status === "running", + (e) => isExecutionActive(e.status), ); return hasActive ? 5000 : false; }, diff --git a/web/src/pages/executions/ExecutionDetailPage.tsx b/web/src/pages/executions/ExecutionDetailPage.tsx index 27ac738..9216023 100644 --- a/web/src/pages/executions/ExecutionDetailPage.tsx +++ b/web/src/pages/executions/ExecutionDetailPage.tsx @@ -199,10 +199,10 @@ export default function ExecutionDetailPage() { execution.status === ExecutionStatus.RUNNING || execution.status === ExecutionStatus.SCHEDULING || execution.status === ExecutionStatus.SCHEDULED || - execution.status === ExecutionStatus.REQUESTED; + execution.status === ExecutionStatus.REQUESTED || + execution.status === ExecutionStatus.CANCELING; - const isCancellable = - isRunning || execution.status === ExecutionStatus.CANCELING; + const isCancellable = isRunning; return (
@@ -392,13 +392,23 @@ export default function ExecutionDetailPage() { {execution.executor && (
- Executor ID + Initiated By
{execution.executor}
)} + {execution.worker && ( +
+
+ Worker ID +
+
+ {execution.worker} +
+
+ )} {/* Inline progress bar (visible when execution has progress artifacts) */}