diff --git a/.gitea/workflows/ci.yml b/.gitea/workflows/ci.yml new file mode 100644 index 0000000..41abbdc --- /dev/null +++ b/.gitea/workflows/ci.yml @@ -0,0 +1,143 @@ +name: CI + +on: + pull_request: + push: + branches: + - main + - master + +env: + CARGO_TERM_COLOR: always + RUST_MIN_STACK: 16777216 + +jobs: + rust-blocking: + name: Rust Blocking Checks + runs-on: ubuntu-latest + steps: + - name: Checkout + uses: actions/checkout@v4 + + - name: Setup Rust + uses: dtolnay/rust-toolchain@stable + with: + components: rustfmt, clippy + + - name: Rustfmt + run: cargo fmt --all -- --check + + - name: Clippy + run: cargo clippy --workspace --all-targets --all-features -- -D warnings + + - name: Tests + run: cargo test --workspace --all-features + + - name: Install Rust security tooling + run: cargo install --locked cargo-audit cargo-deny + + - name: Cargo Audit + run: cargo audit + + - name: Cargo Deny + run: cargo deny check + + web-blocking: + name: Web Blocking Checks + runs-on: ubuntu-latest + defaults: + run: + working-directory: web + steps: + - name: Checkout + uses: actions/checkout@v4 + + - name: Setup Node.js + uses: actions/setup-node@v4 + with: + node-version: "22" + cache: "npm" + cache-dependency-path: web/package-lock.json + + - name: Install dependencies + run: npm ci + + - name: ESLint + run: npm run lint + + - name: TypeScript + run: npm run typecheck + + - name: Build + run: npm run build + + security-blocking: + name: Security Blocking Checks + runs-on: ubuntu-latest + steps: + - name: Checkout + uses: actions/checkout@v4 + with: + # gitleaks scans the full git history; a shallow (depth-1) clone + # would silently limit the scan to the tip commit only. + fetch-depth: 0 + + - name: Install Gitleaks + run: | + mkdir -p "$HOME/bin" + curl -sSfL https://raw.githubusercontent.com/gitleaks/gitleaks/master/install.sh \ + | sh -s -- -b "$HOME/bin" v8.24.2 + + - name: Gitleaks + run: | + "$HOME/bin/gitleaks" git \ + --report-format sarif \ + --report-path gitleaks.sarif \ + --config .gitleaks.toml + + web-advisory: + name: Web Advisory Checks + runs-on: ubuntu-latest + continue-on-error: true + defaults: 
+ run: + working-directory: web + steps: + - name: Checkout + uses: actions/checkout@v4 + + - name: Setup Node.js + uses: actions/setup-node@v4 + with: + node-version: "22" + cache: "npm" + cache-dependency-path: web/package-lock.json + + - name: Install dependencies + run: npm ci + + - name: Knip + run: npm run knip + continue-on-error: true + + - name: NPM Audit (prod deps) + run: npm audit --omit=dev + continue-on-error: true + + security-advisory: + name: Security Advisory Checks + runs-on: ubuntu-latest + continue-on-error: true + steps: + - name: Checkout + uses: actions/checkout@v4 + + - name: Setup Python + uses: actions/setup-python@v5 + with: + python-version: "3.12" + + - name: Install Semgrep + run: pip install semgrep + + - name: Semgrep + run: semgrep scan --config p/default --error + continue-on-error: true diff --git a/.gitleaks.toml b/.gitleaks.toml new file mode 100644 index 0000000..428f99e --- /dev/null +++ b/.gitleaks.toml @@ -0,0 +1,16 @@ +title = "attune-gitleaks-config" + +[allowlist] +description = "Known development credentials and examples" +regexes = [ + '''test@attune\.local''', + '''TestPass123!''', + '''JWT_SECRET''', + '''ENCRYPTION_KEY''', +] +paths = [ + '''^docs/''', + '''^reference/''', + '''^web/openapi\.json$''', + '''^work-summary/''', +] diff --git a/.semgrepignore b/.semgrepignore new file mode 100644 index 0000000..bafa56d --- /dev/null +++ b/.semgrepignore @@ -0,0 +1,7 @@ +target/ +web/dist/ +web/node_modules/ +web/src/api/ +packs/ +packs.dev/ +packs.external/ diff --git a/Makefile b/Makefile index 85855e5..d31036f 100644 --- a/Makefile +++ b/Makefile @@ -2,7 +2,8 @@ check fmt clippy install-tools db-create db-migrate db-reset docker-build \ docker-up docker-down docker-cache-warm docker-stop-system-services dev watch generate-agents-index \ docker-build-workers docker-build-worker-base docker-build-worker-python \ - docker-build-worker-node docker-build-worker-full + docker-build-worker-node docker-build-worker-full deny 
ci-rust ci-web-blocking ci-web-advisory \ + ci-security-blocking ci-security-advisory ci-blocking ci-advisory # Default target help: @@ -319,6 +320,42 @@ update: audit: cargo audit +deny: + cargo deny check + +ci-rust: + cargo fmt --all -- --check + cargo clippy --workspace --all-targets --all-features -- -D warnings + cargo test --workspace --all-features + cargo audit + cargo deny check + +ci-web-blocking: + cd web && npm ci + cd web && npm run lint + cd web && npm run typecheck + cd web && npm run build + +ci-web-advisory: + cd web && npm ci + cd web && npm run knip + cd web && npm audit --omit=dev + +ci-security-blocking: + mkdir -p $$HOME/bin + curl -sSfL https://raw.githubusercontent.com/gitleaks/gitleaks/master/install.sh | sh -s -- -b $$HOME/bin v8.24.2 + $$HOME/bin/gitleaks git --report-format sarif --report-path gitleaks.sarif --config .gitleaks.toml + +ci-security-advisory: + pip install semgrep + semgrep scan --config p/default --error + +ci-blocking: ci-rust ci-web-blocking ci-security-blocking + @echo "✅ Blocking CI checks passed!" + +ci-advisory: ci-web-advisory ci-security-advisory + @echo "Advisory CI checks complete." + # Check dependency tree tree: cargo tree @@ -333,5 +370,5 @@ pre-commit: fmt clippy test @echo "✅ All checks passed! Ready to commit." # CI simulation -ci: check clippy test +ci: ci-blocking ci-advisory @echo "✅ CI checks passed!" 
diff --git a/crates/api/src/routes/executions.rs b/crates/api/src/routes/executions.rs index 98c2702..82de45c 100644 --- a/crates/api/src/routes/executions.rs +++ b/crates/api/src/routes/executions.rs @@ -15,11 +15,17 @@ use std::sync::Arc; use tokio_stream::wrappers::BroadcastStream; use attune_common::models::enums::ExecutionStatus; -use attune_common::mq::{ExecutionRequestedPayload, MessageEnvelope, MessageType}; +use attune_common::mq::{ + ExecutionCancelRequestedPayload, ExecutionRequestedPayload, MessageEnvelope, MessageType, + Publisher, +}; use attune_common::repositories::{ action::ActionRepository, - execution::{CreateExecutionInput, ExecutionRepository, ExecutionSearchFilters}, - Create, FindById, FindByRef, + execution::{ + CreateExecutionInput, ExecutionRepository, ExecutionSearchFilters, UpdateExecutionInput, + }, + workflow::WorkflowExecutionRepository, + Create, FindById, FindByRef, Update, }; use sqlx::Row; @@ -357,6 +363,279 @@ pub async fn get_execution_stats( Ok((StatusCode::OK, Json(response))) } +/// Cancel a running execution +/// +/// This endpoint requests cancellation of an execution. The execution must be in a +/// cancellable state (requested, scheduling, scheduled, running, or canceling). +/// For running executions, the worker will send SIGINT to the process, then SIGTERM +/// after a 10-second grace period if it hasn't stopped. +/// +/// **Workflow cascading**: When a workflow (parent) execution is cancelled, all of +/// its incomplete child task executions are also cancelled. Children that haven't +/// reached a worker yet are set to Cancelled immediately; children that are running +/// receive a cancel MQ message so their worker can gracefully stop the process. +/// The workflow_execution record is also marked as Cancelled to prevent the +/// scheduler from dispatching any further tasks. 
+#[utoipa::path( + post, + path = "/api/v1/executions/{id}/cancel", + tag = "executions", + params( + ("id" = i64, Path, description = "Execution ID") + ), + responses( + (status = 200, description = "Cancellation requested", body = inline(ApiResponse)), + (status = 404, description = "Execution not found"), + (status = 409, description = "Execution is not in a cancellable state"), + ), + security(("bearer_auth" = [])) +)] +pub async fn cancel_execution( + State(state): State>, + RequireAuth(_user): RequireAuth, + Path(id): Path, +) -> ApiResult { + // Load the execution + let execution = ExecutionRepository::find_by_id(&state.db, id) + .await? + .ok_or_else(|| ApiError::NotFound(format!("Execution with ID {} not found", id)))?; + + // Check if the execution is in a cancellable state + let cancellable = matches!( + execution.status, + ExecutionStatus::Requested + | ExecutionStatus::Scheduling + | ExecutionStatus::Scheduled + | ExecutionStatus::Running + | ExecutionStatus::Canceling + ); + + if !cancellable { + return Err(ApiError::Conflict(format!( + "Execution {} is in status '{}' and cannot be cancelled", + id, + format!("{:?}", execution.status).to_lowercase() + ))); + } + + // If already canceling, just return the current state + if execution.status == ExecutionStatus::Canceling { + let response = ApiResponse::new(ExecutionResponse::from(execution)); + return Ok((StatusCode::OK, Json(response))); + } + + let publisher = state.get_publisher().await; + + // For executions that haven't reached a worker yet, cancel immediately + if matches!( + execution.status, + ExecutionStatus::Requested | ExecutionStatus::Scheduling | ExecutionStatus::Scheduled + ) { + let update = UpdateExecutionInput { + status: Some(ExecutionStatus::Cancelled), + result: Some( + serde_json::json!({"error": "Cancelled by user before execution started"}), + ), + ..Default::default() + }; + let updated = ExecutionRepository::update(&state.db, id, update).await?; + + // Cascade to workflow 
children if this is a workflow execution + cancel_workflow_children(&state.db, publisher.as_deref(), id).await; + + let response = ApiResponse::new(ExecutionResponse::from(updated)); + return Ok((StatusCode::OK, Json(response))); + } + + // For running executions, set status to Canceling and send cancel message to the worker + let update = UpdateExecutionInput { + status: Some(ExecutionStatus::Canceling), + ..Default::default() + }; + let updated = ExecutionRepository::update(&state.db, id, update).await?; + + // Send cancel request to the worker via MQ + if let Some(worker_id) = execution.executor { + send_cancel_to_worker(publisher.as_deref(), id, worker_id).await; + } else { + tracing::warn!( + "Execution {} has no executor/worker assigned; marked as canceling but no MQ message sent", + id + ); + } + + // Cascade to workflow children if this is a workflow execution + cancel_workflow_children(&state.db, publisher.as_deref(), id).await; + + let response = ApiResponse::new(ExecutionResponse::from(updated)); + Ok((StatusCode::OK, Json(response))) +} + +/// Send a cancel MQ message to a specific worker for a specific execution. 
+async fn send_cancel_to_worker(publisher: Option<&Publisher>, execution_id: i64, worker_id: i64) { + let payload = ExecutionCancelRequestedPayload { + execution_id, + worker_id, + }; + + let envelope = MessageEnvelope::new(MessageType::ExecutionCancelRequested, payload) + .with_source("api-service") + .with_correlation_id(uuid::Uuid::new_v4()); + + if let Some(publisher) = publisher { + let routing_key = format!("execution.cancel.worker.{}", worker_id); + let exchange = "attune.executions"; + if let Err(e) = publisher + .publish_envelope_with_routing(&envelope, exchange, &routing_key) + .await + { + tracing::error!( + "Failed to publish cancel request for execution {}: {}", + execution_id, + e + ); + } + } else { + tracing::warn!( + "No MQ publisher available to send cancel request for execution {}", + execution_id + ); + } +} + +/// Cancel all incomplete child executions of a workflow parent execution. +/// +/// This handles the workflow cascade: when a workflow execution is cancelled, +/// its child task executions must also be cancelled to prevent further work. +/// Additionally, the `workflow_execution` record is marked Cancelled so the +/// scheduler's `advance_workflow` will short-circuit and not dispatch new tasks. +/// +/// Children in pre-running states (Requested, Scheduling, Scheduled) are set +/// to Cancelled immediately. Children that are Running receive a cancel MQ +/// message so their worker can gracefully stop the process. 
+async fn cancel_workflow_children( + db: &sqlx::PgPool, + publisher: Option<&Publisher>, + parent_execution_id: i64, +) { + // Find all child executions that are still incomplete + let children: Vec = match sqlx::query_as::< + _, + attune_common::models::Execution, + >(&format!( + "SELECT {} FROM execution WHERE parent = $1 AND status NOT IN ('completed', 'failed', 'timeout', 'cancelled', 'abandoned')", + attune_common::repositories::execution::SELECT_COLUMNS + )) + .bind(parent_execution_id) + .fetch_all(db) + .await + { + Ok(rows) => rows, + Err(e) => { + tracing::error!( + "Failed to fetch child executions for parent {}: {}", + parent_execution_id, + e + ); + return; + } + }; + + if children.is_empty() { + return; + } + + tracing::info!( + "Cascading cancellation from execution {} to {} child execution(s)", + parent_execution_id, + children.len() + ); + + for child in &children { + let child_id = child.id; + + if matches!( + child.status, + ExecutionStatus::Requested | ExecutionStatus::Scheduling | ExecutionStatus::Scheduled + ) { + // Pre-running: cancel immediately in DB + let update = UpdateExecutionInput { + status: Some(ExecutionStatus::Cancelled), + result: Some(serde_json::json!({ + "error": "Cancelled: parent workflow execution was cancelled" + })), + ..Default::default() + }; + if let Err(e) = ExecutionRepository::update(db, child_id, update).await { + tracing::error!("Failed to cancel child execution {}: {}", child_id, e); + } else { + tracing::info!("Cancelled pre-running child execution {}", child_id); + } + } else if matches!( + child.status, + ExecutionStatus::Running | ExecutionStatus::Canceling + ) { + // Running: set to Canceling and send MQ message to the worker + if child.status != ExecutionStatus::Canceling { + let update = UpdateExecutionInput { + status: Some(ExecutionStatus::Canceling), + ..Default::default() + }; + if let Err(e) = ExecutionRepository::update(db, child_id, update).await { + tracing::error!( + "Failed to set child 
execution {} to canceling: {}", + child_id, + e + ); + } + } + + if let Some(worker_id) = child.executor { + send_cancel_to_worker(publisher, child_id, worker_id).await; + } + } + + // Recursively cancel grandchildren (nested workflows) + // Use Box::pin to allow the recursive async call + Box::pin(cancel_workflow_children(db, publisher, child_id)).await; + } + + // Also mark any associated workflow_execution record as Cancelled so that + // advance_workflow short-circuits and does not dispatch new tasks. + // A workflow_execution is linked to the parent execution via its `execution` column. + if let Ok(Some(wf_exec)) = + WorkflowExecutionRepository::find_by_execution(db, parent_execution_id).await + { + if !matches!( + wf_exec.status, + ExecutionStatus::Completed | ExecutionStatus::Failed | ExecutionStatus::Cancelled + ) { + let wf_update = attune_common::repositories::workflow::UpdateWorkflowExecutionInput { + status: Some(ExecutionStatus::Cancelled), + error_message: Some( + "Cancelled: parent workflow execution was cancelled".to_string(), + ), + current_tasks: Some(vec![]), + completed_tasks: None, + failed_tasks: None, + skipped_tasks: None, + variables: None, + paused: None, + pause_reason: None, + }; + if let Err(e) = WorkflowExecutionRepository::update(db, wf_exec.id, wf_update).await { + tracing::error!("Failed to cancel workflow_execution {}: {}", wf_exec.id, e); + } else { + tracing::info!( + "Cancelled workflow_execution {} for parent execution {}", + wf_exec.id, + parent_execution_id + ); + } + } + } +} + /// Create execution routes /// Stream execution updates via Server-Sent Events /// @@ -443,6 +722,10 @@ pub fn routes() -> Router> { .route("/executions/stats", get(get_execution_stats)) .route("/executions/stream", get(stream_execution_updates)) .route("/executions/{id}", get(get_execution)) + .route( + "/executions/{id}/cancel", + axum::routing::post(cancel_execution), + ) .route( "/executions/status/{status}", get(list_executions_by_status), diff 
--git a/crates/common/src/mq/connection.rs b/crates/common/src/mq/connection.rs index 2dccf2b..039522d 100644 --- a/crates/common/src/mq/connection.rs +++ b/crates/common/src/mq/connection.rs @@ -525,6 +525,28 @@ impl Connection { ) .await?; + // --- Cancel queue --- + // Each worker gets its own queue for execution cancel requests so that + // the API can target a specific worker to gracefully stop a running process. + let cancel_queue_name = format!("worker.{}.cancel", worker_id); + let cancel_queue_config = QueueConfig { + name: cancel_queue_name.clone(), + durable: true, + exclusive: false, + auto_delete: false, + }; + + self.declare_queue_with_optional_dlx(&cancel_queue_config, dlx) + .await?; + + // Bind to worker-specific cancel routing key on the executions exchange + self.bind_queue( + &cancel_queue_name, + &config.rabbitmq.exchanges.executions.name, + &format!("execution.cancel.worker.{}", worker_id), + ) + .await?; + info!( "Worker infrastructure setup complete for worker ID {}", worker_id diff --git a/crates/common/src/mq/messages.rs b/crates/common/src/mq/messages.rs index 0d7bcb4..34c5946 100644 --- a/crates/common/src/mq/messages.rs +++ b/crates/common/src/mq/messages.rs @@ -67,6 +67,8 @@ pub enum MessageType { RuleDisabled, /// Pack registered or installed (triggers runtime environment setup in workers) PackRegistered, + /// Execution cancel requested (sent to worker to gracefully stop a running execution) + ExecutionCancelRequested, } impl MessageType { @@ -85,6 +87,7 @@ impl MessageType { Self::RuleEnabled => "rule.enabled".to_string(), Self::RuleDisabled => "rule.disabled".to_string(), Self::PackRegistered => "pack.registered".to_string(), + Self::ExecutionCancelRequested => "execution.cancel".to_string(), } } @@ -102,6 +105,7 @@ impl MessageType { "attune.events".to_string() } Self::PackRegistered => "attune.events".to_string(), + Self::ExecutionCancelRequested => "attune.executions".to_string(), } } @@ -120,6 +124,7 @@ impl MessageType { 
Self::RuleEnabled => "RuleEnabled", Self::RuleDisabled => "RuleDisabled", Self::PackRegistered => "PackRegistered", + Self::ExecutionCancelRequested => "ExecutionCancelRequested", } } } @@ -474,6 +479,19 @@ pub struct PackRegisteredPayload { pub runtime_names: Vec, } +/// Payload for ExecutionCancelRequested message +/// +/// Sent by the API to the worker that is running a specific execution, +/// instructing it to gracefully terminate the process (SIGINT, then SIGTERM +/// after a grace period). +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ExecutionCancelRequestedPayload { + /// Execution ID to cancel + pub execution_id: Id, + /// Worker ID that should handle this cancel (used for routing) + pub worker_id: Id, +} + #[cfg(test)] mod tests { use super::*; diff --git a/crates/common/src/mq/mod.rs b/crates/common/src/mq/mod.rs index 3464e26..f2fbfeb 100644 --- a/crates/common/src/mq/mod.rs +++ b/crates/common/src/mq/mod.rs @@ -57,10 +57,11 @@ pub use consumer::{Consumer, ConsumerConfig}; pub use error::{MqError, MqResult}; pub use message_queue::MessageQueue; pub use messages::{ - EnforcementCreatedPayload, EventCreatedPayload, ExecutionCompletedPayload, - ExecutionRequestedPayload, ExecutionStatusChangedPayload, InquiryCreatedPayload, - InquiryRespondedPayload, Message, MessageEnvelope, MessageType, NotificationCreatedPayload, - PackRegisteredPayload, RuleCreatedPayload, RuleDisabledPayload, RuleEnabledPayload, + EnforcementCreatedPayload, EventCreatedPayload, ExecutionCancelRequestedPayload, + ExecutionCompletedPayload, ExecutionRequestedPayload, ExecutionStatusChangedPayload, + InquiryCreatedPayload, InquiryRespondedPayload, Message, MessageEnvelope, MessageType, + NotificationCreatedPayload, PackRegisteredPayload, RuleCreatedPayload, RuleDisabledPayload, + RuleEnabledPayload, }; pub use publisher::{Publisher, PublisherConfig}; @@ -224,6 +225,8 @@ pub mod routing_keys { pub const NOTIFICATION_CREATED: &str = "notification.created"; /// Pack 
registered routing key pub const PACK_REGISTERED: &str = "pack.registered"; + /// Execution cancel requested routing key + pub const EXECUTION_CANCEL: &str = "execution.cancel"; } #[cfg(test)] diff --git a/crates/worker/Cargo.toml b/crates/worker/Cargo.toml index 3f2e3d2..b6b105e 100644 --- a/crates/worker/Cargo.toml +++ b/crates/worker/Cargo.toml @@ -13,6 +13,7 @@ path = "src/main.rs" [dependencies] attune-common = { path = "../common" } tokio = { workspace = true } +tokio-util = { workspace = true } sqlx = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } @@ -34,5 +35,6 @@ sha2 = { workspace = true } base64 = { workspace = true } tempfile = { workspace = true } jsonwebtoken = { workspace = true } +libc = "0.2" [dev-dependencies] diff --git a/crates/worker/src/executor.rs b/crates/worker/src/executor.rs index 0a8949c..e6c9072 100644 --- a/crates/worker/src/executor.rs +++ b/crates/worker/src/executor.rs @@ -48,6 +48,8 @@ pub struct ActionExecutor { jwt_config: JwtConfig, } +use tokio_util::sync::CancellationToken; + /// Normalize a server bind address into a connectable URL. /// /// When the server binds to `0.0.0.0` (all interfaces) or `::` (IPv6 any), @@ -90,6 +92,19 @@ impl ActionExecutor { /// Execute an action for the given execution pub async fn execute(&self, execution_id: i64) -> Result { + self.execute_with_cancel(execution_id, CancellationToken::new()) + .await + } + + /// Execute an action for the given execution, with cancellation support. + /// + /// When the `cancel_token` is triggered, the running process receives + /// SIGINT → SIGTERM → SIGKILL with escalating grace periods. 
+ pub async fn execute_with_cancel( + &self, + execution_id: i64, + cancel_token: CancellationToken, + ) -> Result { info!("Starting execution: {}", execution_id); // Update execution status to running @@ -108,7 +123,7 @@ impl ActionExecutor { let action = self.load_action(&execution).await?; // Prepare execution context - let context = match self.prepare_execution_context(&execution, &action).await { + let mut context = match self.prepare_execution_context(&execution, &action).await { Ok(ctx) => ctx, Err(e) => { error!("Failed to prepare execution context: {}", e); @@ -122,6 +137,9 @@ impl ActionExecutor { } }; + // Attach the cancellation token so the process executor can monitor it + context.cancel_token = Some(cancel_token); + // Execute the action // Note: execute_action should rarely return Err - most failures should be // captured in ExecutionResult with non-zero exit codes @@ -520,6 +538,7 @@ impl ActionExecutor { parameter_delivery: action.parameter_delivery, parameter_format: action.parameter_format, output_format: action.output_format, + cancel_token: None, }; Ok(context) diff --git a/crates/worker/src/runtime/local.rs b/crates/worker/src/runtime/local.rs index b2a5aeb..af0c9b2 100644 --- a/crates/worker/src/runtime/local.rs +++ b/crates/worker/src/runtime/local.rs @@ -177,6 +177,7 @@ mod tests { parameter_delivery: ParameterDelivery::default(), parameter_format: ParameterFormat::default(), output_format: OutputFormat::default(), + cancel_token: None, }; assert!(runtime.can_execute(&context)); @@ -209,6 +210,7 @@ mod tests { parameter_delivery: ParameterDelivery::default(), parameter_format: ParameterFormat::default(), output_format: OutputFormat::default(), + cancel_token: None, }; assert!(!runtime.can_execute(&context)); diff --git a/crates/worker/src/runtime/mod.rs b/crates/worker/src/runtime/mod.rs index 001cdc0..fdffbb9 100644 --- a/crates/worker/src/runtime/mod.rs +++ b/crates/worker/src/runtime/mod.rs @@ -43,6 +43,7 @@ use serde::{Deserialize, 
Serialize}; use std::collections::HashMap; use std::path::PathBuf; use thiserror::Error; +use tokio_util::sync::CancellationToken; // Re-export dependency management types pub use dependency::{ @@ -90,7 +91,7 @@ pub enum RuntimeError { } /// Action execution context -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone)] pub struct ExecutionContext { /// Execution ID pub execution_id: i64, @@ -130,7 +131,6 @@ pub struct ExecutionContext { /// "Python" runtime). When present, `ProcessRuntime` uses this config /// instead of its built-in one for interpreter resolution, environment /// setup, and dependency management. - #[serde(skip)] pub runtime_config_override: Option, /// Optional override of the environment directory suffix. When a specific @@ -144,28 +144,24 @@ pub struct ExecutionContext { pub selected_runtime_version: Option, /// Maximum stdout size in bytes (for log truncation) - #[serde(default = "default_max_log_bytes")] pub max_stdout_bytes: usize, /// Maximum stderr size in bytes (for log truncation) - #[serde(default = "default_max_log_bytes")] pub max_stderr_bytes: usize, /// How parameters should be delivered to the action - #[serde(default)] pub parameter_delivery: ParameterDelivery, /// Format for parameter serialization - #[serde(default)] pub parameter_format: ParameterFormat, /// Format for output parsing - #[serde(default)] pub output_format: OutputFormat, -} -fn default_max_log_bytes() -> usize { - 10 * 1024 * 1024 // 10MB + /// Optional cancellation token for graceful process termination. + /// When triggered, the executor sends SIGINT → SIGTERM → SIGKILL + /// with escalating grace periods. 
+ pub cancel_token: Option, } impl ExecutionContext { @@ -193,6 +189,7 @@ impl ExecutionContext { parameter_delivery: ParameterDelivery::default(), parameter_format: ParameterFormat::default(), output_format: OutputFormat::default(), + cancel_token: None, } } } diff --git a/crates/worker/src/runtime/process.rs b/crates/worker/src/runtime/process.rs index e58681a..65bd9c4 100644 --- a/crates/worker/src/runtime/process.rs +++ b/crates/worker/src/runtime/process.rs @@ -725,8 +725,8 @@ impl Runtime for ProcessRuntime { .unwrap_or_else(|| "".to_string()), ); - // Execute with streaming output capture - process_executor::execute_streaming( + // Execute with streaming output capture (with optional cancellation support) + process_executor::execute_streaming_cancellable( cmd, &context.secrets, parameters_stdin, @@ -734,6 +734,7 @@ impl Runtime for ProcessRuntime { context.max_stdout_bytes, context.max_stderr_bytes, context.output_format, + context.cancel_token.clone(), ) .await } @@ -905,6 +906,7 @@ mod tests { parameter_delivery: ParameterDelivery::default(), parameter_format: ParameterFormat::default(), output_format: OutputFormat::default(), + cancel_token: None, }; assert!(runtime.can_execute(&context)); @@ -939,6 +941,7 @@ mod tests { parameter_delivery: ParameterDelivery::default(), parameter_format: ParameterFormat::default(), output_format: OutputFormat::default(), + cancel_token: None, }; assert!(runtime.can_execute(&context)); @@ -973,6 +976,7 @@ mod tests { parameter_delivery: ParameterDelivery::default(), parameter_format: ParameterFormat::default(), output_format: OutputFormat::default(), + cancel_token: None, }; assert!(!runtime.can_execute(&context)); @@ -1063,6 +1067,7 @@ mod tests { parameter_delivery: ParameterDelivery::default(), parameter_format: ParameterFormat::default(), output_format: OutputFormat::default(), + cancel_token: None, }; let result = runtime.execute(context).await.unwrap(); @@ -1120,6 +1125,7 @@ mod tests { parameter_delivery: 
ParameterDelivery::default(), parameter_format: ParameterFormat::default(), output_format: OutputFormat::default(), + cancel_token: None, }; let result = runtime.execute(context).await.unwrap(); @@ -1158,6 +1164,7 @@ mod tests { parameter_delivery: ParameterDelivery::default(), parameter_format: ParameterFormat::default(), output_format: OutputFormat::default(), + cancel_token: None, }; let result = runtime.execute(context).await.unwrap(); @@ -1208,6 +1215,7 @@ mod tests { parameter_delivery: ParameterDelivery::default(), parameter_format: ParameterFormat::default(), output_format: OutputFormat::default(), + cancel_token: None, }; let result = runtime.execute(context).await.unwrap(); @@ -1316,6 +1324,7 @@ mod tests { parameter_delivery: ParameterDelivery::default(), parameter_format: ParameterFormat::default(), output_format: OutputFormat::default(), + cancel_token: None, }; let result = runtime.execute(context).await.unwrap(); diff --git a/crates/worker/src/runtime/process_executor.rs b/crates/worker/src/runtime/process_executor.rs index 650556f..e9018e9 100644 --- a/crates/worker/src/runtime/process_executor.rs +++ b/crates/worker/src/runtime/process_executor.rs @@ -4,6 +4,14 @@ //! implementations. Handles streaming stdout/stderr capture, bounded log //! collection, timeout management, stdin parameter/secret delivery, and //! output format parsing. +//! +//! ## Cancellation Support +//! +//! When a `CancellationToken` is provided, the executor monitors it alongside +//! the running process. On cancellation: +//! 1. SIGINT is sent to the process (allows graceful shutdown) +//! 2. After a 10-second grace period, SIGTERM is sent if the process hasn't exited +//! 3. 
After another 5-second grace period, SIGKILL is sent as a last resort use super::{BoundedLogWriter, ExecutionResult, OutputFormat, RuntimeResult}; use std::collections::HashMap; @@ -12,7 +20,8 @@ use std::time::Instant; use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; use tokio::process::Command; use tokio::time::timeout; -use tracing::{debug, warn}; +use tokio_util::sync::CancellationToken; +use tracing::{debug, info, warn}; /// Execute a subprocess command with streaming output capture. /// @@ -33,6 +42,48 @@ use tracing::{debug, warn}; /// * `max_stderr_bytes` - Maximum stderr size before truncation /// * `output_format` - How to parse stdout (Text, Json, Yaml, Jsonl) pub async fn execute_streaming( + cmd: Command, + secrets: &HashMap, + parameters_stdin: Option<&str>, + timeout_secs: Option, + max_stdout_bytes: usize, + max_stderr_bytes: usize, + output_format: OutputFormat, +) -> RuntimeResult { + execute_streaming_cancellable( + cmd, + secrets, + parameters_stdin, + timeout_secs, + max_stdout_bytes, + max_stderr_bytes, + output_format, + None, + ) + .await +} + +/// Execute a subprocess command with streaming output capture and optional cancellation. +/// +/// This is the core execution function used by all runtime implementations. 
+/// It handles: +/// - Spawning the process with piped I/O +/// - Writing parameters and secrets to stdin +/// - Streaming stdout/stderr with bounded log collection +/// - Timeout management +/// - Graceful cancellation via SIGINT → SIGTERM → SIGKILL escalation +/// - Output format parsing (JSON, YAML, JSONL, text) +/// +/// # Arguments +/// * `cmd` - Pre-configured `Command` (interpreter, args, env vars, working dir already set) +/// * `secrets` - Secrets to pass via stdin (as JSON) +/// * `parameters_stdin` - Optional parameter data to write to stdin before secrets +/// * `timeout_secs` - Optional execution timeout in seconds +/// * `max_stdout_bytes` - Maximum stdout size before truncation +/// * `max_stderr_bytes` - Maximum stderr size before truncation +/// * `output_format` - How to parse stdout (Text, Json, Yaml, Jsonl) +/// * `cancel_token` - Optional cancellation token for graceful process termination +pub async fn execute_streaming_cancellable( mut cmd: Command, secrets: &HashMap, parameters_stdin: Option<&str>, @@ -40,6 +91,7 @@ pub async fn execute_streaming( max_stdout_bytes: usize, max_stderr_bytes: usize, output_format: OutputFormat, + cancel_token: Option, ) -> RuntimeResult { let start = Instant::now(); @@ -134,15 +186,74 @@ pub async fn execute_streaming( stderr_writer }; - // Wait for both streams and the process - let (stdout_writer, stderr_writer, wait_result) = - tokio::join!(stdout_task, stderr_task, async { + // Determine the process ID for signal-based cancellation. + // Must be read before we move `child` into the wait future. + let child_pid = child.id(); + + // Build the wait future that handles timeout, cancellation, and normal completion. 
+ // + // The result is a tuple: (wait_result, was_cancelled) + // - wait_result mirrors the original type: Result, Elapsed> + // - was_cancelled indicates the process was stopped by a cancel request + let wait_future = async { + // Inner future: wait for the child process to exit + let wait_child = child.wait(); + + // Apply optional timeout wrapping + let timed_wait = async { if let Some(timeout_secs) = timeout_secs { - timeout(std::time::Duration::from_secs(timeout_secs), child.wait()).await + timeout(std::time::Duration::from_secs(timeout_secs), wait_child).await } else { - Ok(child.wait().await) + Ok(wait_child.await) } - }); + }; + + // If we have a cancel token, race it against the (possibly-timed) wait + if let Some(ref token) = cancel_token { + tokio::select! { + result = timed_wait => (result, false), + _ = token.cancelled() => { + // Cancellation requested — escalate signals to the child process. + info!("Cancel signal received, sending SIGINT to process"); + if let Some(pid) = child_pid { + send_signal(pid, libc::SIGINT); + } + + // Grace period: wait up to 10s for the process to exit after SIGINT. 
+ match timeout(std::time::Duration::from_secs(10), child.wait()).await { + Ok(status) => (Ok(status), true), + Err(_) => { + // Still alive — escalate to SIGTERM + warn!("Process did not exit after SIGINT + 10s grace period, sending SIGTERM"); + if let Some(pid) = child_pid { + send_signal(pid, libc::SIGTERM); + } + + // Final grace period: wait up to 5s for SIGTERM + match timeout(std::time::Duration::from_secs(5), child.wait()).await { + Ok(status) => (Ok(status), true), + Err(_) => { + // Last resort — SIGKILL + warn!("Process did not exit after SIGTERM + 5s, sending SIGKILL"); + if let Some(pid) = child_pid { + send_signal(pid, libc::SIGKILL); + } + // Wait indefinitely for the SIGKILL to take effect + (Ok(child.wait().await), true) + } + } + } + } + } + } + } else { + (timed_wait.await, false) + } + }; + + // Wait for both streams and the process + let (stdout_writer, stderr_writer, (wait_result, was_cancelled)) = + tokio::join!(stdout_task, stderr_task, wait_future); let duration_ms = start.elapsed().as_millis() as u64; @@ -177,6 +288,22 @@ pub async fn execute_streaming( } }; + // If the process was cancelled, return a specific result + if was_cancelled { + return Ok(ExecutionResult { + exit_code, + stdout: stdout_result.content.clone(), + stderr: stderr_result.content.clone(), + result: None, + duration_ms, + error: Some("Execution cancelled by user".to_string()), + stdout_truncated: stdout_result.truncated, + stderr_truncated: stderr_result.truncated, + stdout_bytes_truncated: stdout_result.bytes_truncated, + stderr_bytes_truncated: stderr_result.bytes_truncated, + }); + } + debug!( "Process execution completed: exit_code={}, duration={}ms, stdout_truncated={}, stderr_truncated={}", exit_code, duration_ms, stdout_result.truncated, stderr_result.truncated @@ -248,6 +375,19 @@ pub async fn execute_streaming( } /// Parse stdout content according to the specified output format. +/// Send a Unix signal to a process by PID. 
+/// +/// Uses raw `libc::kill()` to deliver signals for graceful process termination. +/// This is safe because we only send signals to child processes we spawned. +fn send_signal(pid: u32, signal: i32) { + // Safety: we're sending a signal to a known child process PID. + // The PID is valid because we obtained it from `child.id()` before the + // child exited. + unsafe { + libc::kill(pid as i32, signal); + } +} + fn parse_output(stdout: &str, format: OutputFormat) -> Option { let trimmed = stdout.trim(); if trimmed.is_empty() { diff --git a/crates/worker/src/runtime/shell.rs b/crates/worker/src/runtime/shell.rs index dbd7266..ef8c4e5 100644 --- a/crates/worker/src/runtime/shell.rs +++ b/crates/worker/src/runtime/shell.rs @@ -623,6 +623,7 @@ mod tests { parameter_delivery: attune_common::models::ParameterDelivery::default(), parameter_format: attune_common::models::ParameterFormat::default(), output_format: attune_common::models::OutputFormat::default(), + cancel_token: None, }; let result = runtime.execute(context).await.unwrap(); @@ -659,6 +660,7 @@ mod tests { parameter_delivery: attune_common::models::ParameterDelivery::default(), parameter_format: attune_common::models::ParameterFormat::default(), output_format: attune_common::models::OutputFormat::default(), + cancel_token: None, }; let result = runtime.execute(context).await.unwrap(); @@ -690,6 +692,7 @@ mod tests { parameter_delivery: attune_common::models::ParameterDelivery::default(), parameter_format: attune_common::models::ParameterFormat::default(), output_format: attune_common::models::OutputFormat::default(), + cancel_token: None, }; let result = runtime.execute(context).await.unwrap(); @@ -723,6 +726,7 @@ mod tests { parameter_delivery: attune_common::models::ParameterDelivery::default(), parameter_format: attune_common::models::ParameterFormat::default(), output_format: attune_common::models::OutputFormat::default(), + cancel_token: None, }; let result = runtime.execute(context).await.unwrap(); @@ 
-771,6 +775,7 @@ echo "missing=$missing" parameter_delivery: attune_common::models::ParameterDelivery::default(), parameter_format: attune_common::models::ParameterFormat::default(), output_format: attune_common::models::OutputFormat::default(), + cancel_token: None, }; let result = runtime.execute(context).await.unwrap(); @@ -814,6 +819,7 @@ echo '{"id": 3, "name": "Charlie"}' parameter_delivery: attune_common::models::ParameterDelivery::default(), parameter_format: attune_common::models::ParameterFormat::default(), output_format: attune_common::models::OutputFormat::Jsonl, + cancel_token: None, }; let result = runtime.execute(context).await.unwrap(); @@ -880,6 +886,7 @@ printf '{"status_code":200,"body":"hello","json":{\n "args": {\n "hello": "w parameter_delivery: attune_common::models::ParameterDelivery::default(), parameter_format: attune_common::models::ParameterFormat::default(), output_format: attune_common::models::OutputFormat::Json, + cancel_token: None, }; let result = runtime.execute(context).await.unwrap(); @@ -935,6 +942,7 @@ echo '{"result": "success", "count": 42}' parameter_delivery: attune_common::models::ParameterDelivery::default(), parameter_format: attune_common::models::ParameterFormat::default(), output_format: attune_common::models::OutputFormat::Json, + cancel_token: None, }; let result = runtime.execute(context).await.unwrap(); diff --git a/crates/worker/src/service.rs b/crates/worker/src/service.rs index f94f662..ff096b2 100644 --- a/crates/worker/src/service.rs +++ b/crates/worker/src/service.rs @@ -11,7 +11,7 @@ //! 4. **Verify runtime versions** — run verification commands for each registered //! `RuntimeVersion` to determine which are available on this host/container //! 5. **Set up runtime environments** — create per-version environments for packs -//! 6. Start heartbeat, execution consumer, and pack registration consumer +//! 6. 
Start heartbeat, execution consumer, pack registration consumer, and cancel consumer use attune_common::config::Config; use attune_common::db::Database; @@ -19,19 +19,21 @@ use attune_common::error::{Error, Result}; use attune_common::models::ExecutionStatus; use attune_common::mq::{ config::MessageQueueConfig as MqConfig, Connection, Consumer, ConsumerConfig, - ExecutionCompletedPayload, ExecutionStatusChangedPayload, MessageEnvelope, MessageType, - PackRegisteredPayload, Publisher, PublisherConfig, + ExecutionCancelRequestedPayload, ExecutionCompletedPayload, ExecutionStatusChangedPayload, + MessageEnvelope, MessageType, PackRegisteredPayload, Publisher, PublisherConfig, }; use attune_common::repositories::{execution::ExecutionRepository, FindById}; use attune_common::runtime_detection::runtime_in_filter; use chrono::Utc; use serde::{Deserialize, Serialize}; use sqlx::PgPool; +use std::collections::HashMap; use std::path::PathBuf; use std::sync::Arc; use std::time::Duration; use tokio::sync::{Mutex, RwLock, Semaphore}; use tokio::task::{JoinHandle, JoinSet}; +use tokio_util::sync::CancellationToken; use tracing::{debug, error, info, warn}; use crate::artifacts::ArtifactManager; @@ -72,6 +74,8 @@ pub struct WorkerService { consumer_handle: Option>, pack_consumer: Option>, pack_consumer_handle: Option>, + cancel_consumer: Option>, + cancel_consumer_handle: Option>, worker_id: Option, /// Runtime filter derived from ATTUNE_WORKER_RUNTIMES runtime_filter: Option>, @@ -83,6 +87,10 @@ pub struct WorkerService { execution_semaphore: Arc, /// Tracks in-flight execution tasks for graceful shutdown in_flight_tasks: Arc>>, + /// Maps execution ID → CancellationToken for running processes. + /// When a cancel request arrives, the token is triggered, causing + /// the process executor to send SIGINT → SIGTERM → SIGKILL. 
+ cancel_tokens: Arc>>, } impl WorkerService { @@ -362,12 +370,15 @@ impl WorkerService { consumer_handle: None, pack_consumer: None, pack_consumer_handle: None, + cancel_consumer: None, + cancel_consumer_handle: None, worker_id: None, runtime_filter: runtime_filter_for_service, packs_base_dir, runtime_envs_dir, execution_semaphore: Arc::new(Semaphore::new(max_concurrent_tasks)), in_flight_tasks: Arc::new(Mutex::new(JoinSet::new())), + cancel_tokens: Arc::new(Mutex::new(HashMap::new())), }) } @@ -416,6 +427,9 @@ impl WorkerService { // Start consuming pack registration events self.start_pack_consumer().await?; + // Start consuming cancel requests + self.start_cancel_consumer().await?; + info!("Worker Service started successfully"); Ok(()) @@ -640,6 +654,12 @@ impl WorkerService { let _ = handle.await; } + if let Some(handle) = self.cancel_consumer_handle.take() { + info!("Stopping cancel consumer task..."); + handle.abort(); + let _ = handle.await; + } + info!("Closing message queue connection..."); if let Err(e) = self.mq_connection.close().await { warn!("Error closing message queue: {}", e); @@ -733,6 +753,7 @@ impl WorkerService { let queue_name_for_log = queue_name.clone(); let semaphore = self.execution_semaphore.clone(); let in_flight = self.in_flight_tasks.clone(); + let cancel_tokens = self.cancel_tokens.clone(); // Spawn the consumer loop as a background task so start() can return let handle = tokio::spawn(async move { @@ -745,6 +766,7 @@ impl WorkerService { let db_pool = db_pool.clone(); let semaphore = semaphore.clone(); let in_flight = in_flight.clone(); + let cancel_tokens = cancel_tokens.clone(); async move { let execution_id = envelope.payload.execution_id; @@ -765,6 +787,13 @@ impl WorkerService { semaphore.available_permits() ); + // Create a cancellation token for this execution + let cancel_token = CancellationToken::new(); + { + let mut tokens = cancel_tokens.lock().await; + tokens.insert(execution_id, cancel_token.clone()); + } + // Spawn the 
actual execution as a background task so this // handler returns immediately, acking the message and freeing // the consumer loop to process the next delivery. @@ -775,12 +804,20 @@ impl WorkerService { let _permit = permit; if let Err(e) = Self::handle_execution_scheduled( - executor, publisher, db_pool, envelope, + executor, + publisher, + db_pool, + envelope, + cancel_token, ) .await { error!("Execution {} handler error: {}", execution_id, e); } + + // Remove the cancel token now that execution is done + let mut tokens = cancel_tokens.lock().await; + tokens.remove(&execution_id); }); Ok(()) @@ -813,6 +850,7 @@ impl WorkerService { publisher: Arc, db_pool: PgPool, envelope: MessageEnvelope, + cancel_token: CancellationToken, ) -> Result<()> { let execution_id = envelope.payload.execution_id; @@ -821,6 +859,42 @@ impl WorkerService { execution_id ); + // Check if the execution was already cancelled before we started + // (e.g. pre-running cancellation via the API). + { + if let Ok(Some(exec)) = ExecutionRepository::find_by_id(&db_pool, execution_id).await { + if matches!( + exec.status, + ExecutionStatus::Cancelled | ExecutionStatus::Canceling + ) { + info!( + "Execution {} already in {:?} state, skipping", + execution_id, exec.status + ); + // If it was Canceling, finalize to Cancelled + if exec.status == ExecutionStatus::Canceling { + let _ = Self::publish_status_update( + &db_pool, + &publisher, + execution_id, + ExecutionStatus::Cancelled, + None, + Some("Cancelled before execution started".to_string()), + ) + .await; + let _ = Self::publish_completion_notification( + &db_pool, + &publisher, + execution_id, + ExecutionStatus::Cancelled, + ) + .await; + } + return Ok(()); + } + } + } + // Publish status: running if let Err(e) = Self::publish_status_update( &db_pool, @@ -836,42 +910,88 @@ impl WorkerService { // Continue anyway - we'll update the database directly } - // Execute the action - match executor.execute(execution_id).await { + // Execute the action 
(with cancellation support) + match executor + .execute_with_cancel(execution_id, cancel_token.clone()) + .await + { Ok(result) => { - info!( - "Execution {} completed successfully in {}ms", - execution_id, result.duration_ms - ); + // Check if this was a cancellation + let was_cancelled = cancel_token.is_cancelled() + || result + .error + .as_deref() + .is_some_and(|e| e.contains("cancelled")); - // Publish status: completed - if let Err(e) = Self::publish_status_update( - &db_pool, - &publisher, - execution_id, - ExecutionStatus::Completed, - result.result.clone(), - None, - ) - .await - { - error!("Failed to publish success status: {}", e); - } - - // Publish completion notification for queue management - if let Err(e) = Self::publish_completion_notification( - &db_pool, - &publisher, - execution_id, - ExecutionStatus::Completed, - ) - .await - { - error!( - "Failed to publish completion notification for execution {}: {}", - execution_id, e + if was_cancelled { + info!( + "Execution {} was cancelled in {}ms", + execution_id, result.duration_ms ); - // Continue - this is important for queue management but not fatal + + // Publish status: cancelled + if let Err(e) = Self::publish_status_update( + &db_pool, + &publisher, + execution_id, + ExecutionStatus::Cancelled, + None, + Some("Cancelled by user".to_string()), + ) + .await + { + error!("Failed to publish cancelled status: {}", e); + } + + // Publish completion notification for queue management + if let Err(e) = Self::publish_completion_notification( + &db_pool, + &publisher, + execution_id, + ExecutionStatus::Cancelled, + ) + .await + { + error!( + "Failed to publish completion notification for cancelled execution {}: {}", + execution_id, e + ); + } + } else { + info!( + "Execution {} completed successfully in {}ms", + execution_id, result.duration_ms + ); + + // Publish status: completed + if let Err(e) = Self::publish_status_update( + &db_pool, + &publisher, + execution_id, + ExecutionStatus::Completed, + 
result.result.clone(), + None, + ) + .await + { + error!("Failed to publish success status: {}", e); + } + + // Publish completion notification for queue management + if let Err(e) = Self::publish_completion_notification( + &db_pool, + &publisher, + execution_id, + ExecutionStatus::Completed, + ) + .await + { + error!( + "Failed to publish completion notification for execution {}: {}", + execution_id, e + ); + // Continue - this is important for queue management but not fatal + } } } Err(e) => { @@ -912,6 +1032,87 @@ impl WorkerService { Ok(()) } + /// Start consuming execution cancel requests from the per-worker cancel queue. + async fn start_cancel_consumer(&mut self) -> Result<()> { + let worker_id = self + .worker_id + .ok_or_else(|| Error::Internal("Worker not registered".to_string()))?; + + let queue_name = format!("worker.{}.cancel", worker_id); + + info!("Starting cancel consumer for queue: {}", queue_name); + + let consumer = Arc::new( + Consumer::new( + &self.mq_connection, + ConsumerConfig { + queue: queue_name.clone(), + tag: format!("worker-{}-cancel", worker_id), + prefetch_count: 10, + auto_ack: false, + exclusive: false, + }, + ) + .await + .map_err(|e| Error::Internal(format!("Failed to create cancel consumer: {}", e)))?, + ); + + let consumer_for_task = consumer.clone(); + let cancel_tokens = self.cancel_tokens.clone(); + let queue_name_for_log = queue_name.clone(); + + let handle = tokio::spawn(async move { + info!( + "Cancel consumer loop started for queue '{}'", + queue_name_for_log + ); + let result = consumer_for_task + .consume_with_handler( + move |envelope: MessageEnvelope| { + let cancel_tokens = cancel_tokens.clone(); + + async move { + let execution_id = envelope.payload.execution_id; + info!("Received cancel request for execution {}", execution_id); + + let tokens = cancel_tokens.lock().await; + if let Some(token) = tokens.get(&execution_id) { + info!("Triggering cancellation for execution {}", execution_id); + token.cancel(); + } else 
{ + warn!( + "No cancel token found for execution {} \ + (may have already completed or not yet started)", + execution_id + ); + } + + Ok(()) + } + }, + ) + .await; + + match result { + Ok(()) => info!( + "Cancel consumer loop for queue '{}' ended", + queue_name_for_log + ), + Err(e) => error!( + "Cancel consumer loop for queue '{}' failed: {}", + queue_name_for_log, e + ), + } + }); + + self.cancel_consumer = Some(consumer); + self.cancel_consumer_handle = Some(handle); + + info!("Cancel consumer initialized for queue: {}", queue_name); + + Ok(()) + } + /// Publish execution status update async fn publish_status_update( db_pool: &PgPool, @@ -935,6 +1136,7 @@ impl WorkerService { ExecutionStatus::Running => "running", ExecutionStatus::Completed => "completed", ExecutionStatus::Failed => "failed", + ExecutionStatus::Canceling => "canceling", ExecutionStatus::Cancelled => "cancelled", ExecutionStatus::Timeout => "timeout", _ => "unknown", diff --git a/crates/worker/tests/dependency_isolation_test.rs b/crates/worker/tests/dependency_isolation_test.rs index ea09928..03054f9 100644 --- a/crates/worker/tests/dependency_isolation_test.rs +++ b/crates/worker/tests/dependency_isolation_test.rs @@ -86,6 +86,7 @@ fn make_context(action_ref: &str, entry_point: &str, runtime_name: &str) -> Exec parameter_delivery: ParameterDelivery::default(), parameter_format: ParameterFormat::default(), output_format: OutputFormat::default(), + cancel_token: None, } } diff --git a/crates/worker/tests/log_truncation_test.rs b/crates/worker/tests/log_truncation_test.rs index 4254eca..2b95a1c 100644 --- a/crates/worker/tests/log_truncation_test.rs +++ b/crates/worker/tests/log_truncation_test.rs @@ -51,6 +51,7 @@ fn make_python_context( parameter_delivery: attune_worker::runtime::ParameterDelivery::default(), parameter_format: attune_worker::runtime::ParameterFormat::default(), output_format: attune_worker::runtime::OutputFormat::default(), + cancel_token: None, } } @@ -133,6 +134,7 @@ done 
parameter_delivery: attune_worker::runtime::ParameterDelivery::default(), parameter_format: attune_worker::runtime::ParameterFormat::default(), output_format: attune_worker::runtime::OutputFormat::default(), + cancel_token: None, }; let result = runtime.execute(context).await.unwrap(); @@ -291,6 +293,7 @@ async fn test_shell_process_runtime_truncation() { parameter_delivery: attune_worker::runtime::ParameterDelivery::default(), parameter_format: attune_worker::runtime::ParameterFormat::default(), output_format: attune_worker::runtime::OutputFormat::default(), + cancel_token: None, }; let result = runtime.execute(context).await.unwrap(); diff --git a/crates/worker/tests/security_tests.rs b/crates/worker/tests/security_tests.rs index 11c6b67..dc4042c 100644 --- a/crates/worker/tests/security_tests.rs +++ b/crates/worker/tests/security_tests.rs @@ -77,6 +77,7 @@ print(json.dumps(result)) parameter_delivery: attune_worker::runtime::ParameterDelivery::default(), parameter_format: attune_worker::runtime::ParameterFormat::default(), output_format: attune_worker::runtime::OutputFormat::Json, + cancel_token: None, }; let result = runtime.execute(context).await.unwrap(); @@ -170,6 +171,7 @@ echo "SECURITY_PASS: Secrets not in environment but accessible via get_secret" parameter_delivery: attune_worker::runtime::ParameterDelivery::default(), parameter_format: attune_worker::runtime::ParameterFormat::default(), output_format: attune_worker::runtime::OutputFormat::default(), + cancel_token: None, }; let result = runtime.execute(context).await.unwrap(); @@ -234,6 +236,7 @@ print(json.dumps({'secret_a': secrets.get('secret_a')})) parameter_delivery: attune_worker::runtime::ParameterDelivery::default(), parameter_format: attune_worker::runtime::ParameterFormat::default(), output_format: attune_worker::runtime::OutputFormat::Json, + cancel_token: None, }; let result1 = runtime.execute(context1).await.unwrap(); @@ -279,6 +282,7 @@ print(json.dumps({ parameter_delivery: 
attune_worker::runtime::ParameterDelivery::default(), parameter_format: attune_worker::runtime::ParameterFormat::default(), output_format: attune_worker::runtime::OutputFormat::Json, + cancel_token: None, }; let result2 = runtime.execute(context2).await.unwrap(); @@ -333,6 +337,7 @@ print("ok") parameter_delivery: attune_worker::runtime::ParameterDelivery::default(), parameter_format: attune_worker::runtime::ParameterFormat::default(), output_format: attune_worker::runtime::OutputFormat::default(), + cancel_token: None, }; let result = runtime.execute(context).await.unwrap(); @@ -384,6 +389,7 @@ fi parameter_delivery: attune_worker::runtime::ParameterDelivery::default(), parameter_format: attune_worker::runtime::ParameterFormat::default(), output_format: attune_worker::runtime::OutputFormat::default(), + cancel_token: None, }; let result = runtime.execute(context).await.unwrap(); @@ -456,6 +462,7 @@ echo "PASS: No secrets in environment" parameter_delivery: attune_worker::runtime::ParameterDelivery::default(), parameter_format: attune_worker::runtime::ParameterFormat::default(), output_format: attune_worker::runtime::OutputFormat::default(), + cancel_token: None, }; let result = runtime.execute(context).await.unwrap(); @@ -527,6 +534,7 @@ print(json.dumps({"leaked": leaked})) parameter_delivery: attune_worker::runtime::ParameterDelivery::default(), parameter_format: attune_worker::runtime::ParameterFormat::default(), output_format: attune_worker::runtime::OutputFormat::Json, + cancel_token: None, }; let result = runtime.execute(context).await.unwrap(); diff --git a/deny.toml b/deny.toml new file mode 100644 index 0000000..72109a5 --- /dev/null +++ b/deny.toml @@ -0,0 +1,38 @@ +[graph] +all-features = true + +[advisories] +version = 2 +yanked = "deny" +ignore = [] + +[licenses] +version = 2 +confidence-threshold = 0.9 +allow = [ + "MIT", + "Apache-2.0", + "BSD-2-Clause", + "BSD-3-Clause", + "ISC", + "MPL-2.0", + "Unicode-3.0", + "Zlib", + "CC0-1.0", + "OpenSSL", + 
"BSL-1.0", +] + +[bans] +multiple-versions = "warn" +wildcards = "allow" +highlight = "all" +deny = [] +skip = [] +skip-tree = [] + +[sources] +unknown-registry = "deny" +unknown-git = "deny" +allow-registry = ["https://github.com/rust-lang/crates.io-index"] +allow-git = [] diff --git a/web/knip.json b/web/knip.json new file mode 100644 index 0000000..3f9cd69 --- /dev/null +++ b/web/knip.json @@ -0,0 +1,6 @@ +{ + "$schema": "https://unpkg.com/knip@latest/schema.json", + "entry": ["src/main.tsx", "vite.config.ts"], + "project": ["src/**/*.{ts,tsx}", "scripts/**/*.js"], + "ignore": ["src/api/**", "dist/**", "node_modules/**"] +} diff --git a/web/package.json b/web/package.json index 319e6ef..a8f6f03 100644 --- a/web/package.json +++ b/web/package.json @@ -6,7 +6,9 @@ "scripts": { "dev": "vite", "build": "tsc -b && vite build", + "typecheck": "tsc -b --pretty false", "lint": "eslint .", + "knip": "npx --yes knip --config knip.json --production", "preview": "vite preview", "generate:api": "curl -s http://localhost:8080/api-spec/openapi.json > openapi.json && npx openapi-typescript-codegen --input ./openapi.json --output ./src/api --client axios --useOptions" }, diff --git a/web/src/components/executions/ExecutionPreviewPanel.tsx b/web/src/components/executions/ExecutionPreviewPanel.tsx index 0fa5316..e4fa3bd 100644 --- a/web/src/components/executions/ExecutionPreviewPanel.tsx +++ b/web/src/components/executions/ExecutionPreviewPanel.tsx @@ -1,7 +1,7 @@ import { memo, useEffect } from "react"; import { Link } from "react-router-dom"; -import { X, ExternalLink, Loader2 } from "lucide-react"; -import { useExecution } from "@/hooks/useExecutions"; +import { X, ExternalLink, Loader2, XCircle } from "lucide-react"; +import { useExecution, useCancelExecution } from "@/hooks/useExecutions"; import { useExecutionStream } from "@/hooks/useExecutionStream"; import { formatDistanceToNow } from "date-fns"; import type { ExecutionStatus } from "@/api"; @@ -51,6 +51,7 @@ const 
ExecutionPreviewPanel = memo(function ExecutionPreviewPanel({ }: ExecutionPreviewPanelProps) { const { data, isLoading, error } = useExecution(executionId); const execution = data?.data; + const cancelExecution = useCancelExecution(); // Subscribe to real-time updates for this execution useExecutionStream({ executionId, enabled: true }); @@ -70,6 +71,8 @@ const ExecutionPreviewPanel = memo(function ExecutionPreviewPanel({ execution?.status === "scheduled" || execution?.status === "requested"; + const isCancellable = isRunning || execution?.status === "canceling"; + const startedAt = execution?.started_at ? new Date(execution.started_at) : null; @@ -100,6 +103,28 @@ const ExecutionPreviewPanel = memo(function ExecutionPreviewPanel({ )}
+ {isCancellable && ( + + )} { + const response = await __request(OpenAPI, { + method: "POST", + url: "/api/v1/executions/{id}/cancel", + path: { id: executionId }, + mediaType: "application/json", + }); + return response as { data: ExecutionResponse }; + }, + onSuccess: (_data, executionId) => { + // Invalidate the specific execution and the list + queryClient.invalidateQueries({ queryKey: ["executions", executionId] }); + queryClient.invalidateQueries({ queryKey: ["executions"] }); + }, + }); +} + export function useChildExecutions(parentId: number | undefined) { return useQuery({ queryKey: ["executions", { parent: parentId }], diff --git a/web/src/pages/executions/ExecutionDetailPage.tsx b/web/src/pages/executions/ExecutionDetailPage.tsx index 8628460..27ac738 100644 --- a/web/src/pages/executions/ExecutionDetailPage.tsx +++ b/web/src/pages/executions/ExecutionDetailPage.tsx @@ -12,14 +12,14 @@ function formatDuration(ms: number): string { const remainMins = mins % 60; return `${hrs}h ${remainMins}m`; } -import { useExecution } from "@/hooks/useExecutions"; +import { useExecution, useCancelExecution } from "@/hooks/useExecutions"; import { useAction } from "@/hooks/useActions"; import { useExecutionStream } from "@/hooks/useExecutionStream"; import { useExecutionHistory } from "@/hooks/useHistory"; import { formatDistanceToNow } from "date-fns"; import { ExecutionStatus } from "@/api"; import { useState, useMemo } from "react"; -import { RotateCcw, Loader2 } from "lucide-react"; +import { RotateCcw, Loader2, XCircle } from "lucide-react"; import ExecuteActionModal from "@/components/common/ExecuteActionModal"; import EntityHistoryPanel from "@/components/common/EntityHistoryPanel"; import ExecutionArtifactsPanel from "@/components/executions/ExecutionArtifactsPanel"; @@ -123,6 +123,7 @@ export default function ExecutionDetailPage() { const isWorkflow = !!actionData?.data?.workflow_def; const [showRerunModal, setShowRerunModal] = useState(false); + const 
cancelExecution = useCancelExecution(); // Fetch status history for the timeline const { data: historyData, isLoading: historyLoading } = useExecutionHistory( @@ -200,6 +201,9 @@ export default function ExecutionDetailPage() { execution.status === ExecutionStatus.SCHEDULED || execution.status === ExecutionStatus.REQUESTED; + const isCancellable = + isRunning || execution.status === ExecutionStatus.CANCELING; + return (
{/* Header */} @@ -236,19 +240,44 @@ export default function ExecutionDetailPage() {
)}
- +
+ {isCancellable && ( + + )} + +