trying to run a gitea workflow
Some checks failed
CI / Security Advisory Checks (push) Waiting to run
CI / Rust Blocking Checks (push) Failing after 47s
CI / Web Blocking Checks (push) Failing after 46s
CI / Security Blocking Checks (push) Failing after 8s
CI / Web Advisory Checks (push) Failing after 9s

This commit is contained in:
2026-03-04 22:36:16 -06:00
parent 7438f92502
commit 67a1c02543
25 changed files with 1129 additions and 83 deletions

View File

@@ -13,6 +13,7 @@ path = "src/main.rs"
[dependencies]
attune-common = { path = "../common" }
tokio = { workspace = true }
tokio-util = { workspace = true }
sqlx = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
@@ -34,5 +35,6 @@ sha2 = { workspace = true }
base64 = { workspace = true }
tempfile = { workspace = true }
jsonwebtoken = { workspace = true }
libc = "0.2"
[dev-dependencies]

View File

@@ -48,6 +48,8 @@ pub struct ActionExecutor {
jwt_config: JwtConfig,
}
use tokio_util::sync::CancellationToken;
/// Normalize a server bind address into a connectable URL.
///
/// When the server binds to `0.0.0.0` (all interfaces) or `::` (IPv6 any),
@@ -90,6 +92,19 @@ impl ActionExecutor {
/// Execute an action for the given execution (no external cancellation).
///
/// Convenience wrapper around `execute_with_cancel` that supplies a fresh
/// `CancellationToken` which no one else holds, so it is never triggered and
/// the execution runs until it completes, fails, or times out on its own.
pub async fn execute(&self, execution_id: i64) -> Result<ExecutionResult> {
self.execute_with_cancel(execution_id, CancellationToken::new())
.await
}
/// Execute an action for the given execution, with cancellation support.
///
/// When the `cancel_token` is triggered, the running process receives
/// SIGINT → SIGTERM → SIGKILL with escalating grace periods.
pub async fn execute_with_cancel(
&self,
execution_id: i64,
cancel_token: CancellationToken,
) -> Result<ExecutionResult> {
info!("Starting execution: {}", execution_id);
// Update execution status to running
@@ -108,7 +123,7 @@ impl ActionExecutor {
let action = self.load_action(&execution).await?;
// Prepare execution context
let context = match self.prepare_execution_context(&execution, &action).await {
let mut context = match self.prepare_execution_context(&execution, &action).await {
Ok(ctx) => ctx,
Err(e) => {
error!("Failed to prepare execution context: {}", e);
@@ -122,6 +137,9 @@ impl ActionExecutor {
}
};
// Attach the cancellation token so the process executor can monitor it
context.cancel_token = Some(cancel_token);
// Execute the action
// Note: execute_action should rarely return Err - most failures should be
// captured in ExecutionResult with non-zero exit codes
@@ -520,6 +538,7 @@ impl ActionExecutor {
parameter_delivery: action.parameter_delivery,
parameter_format: action.parameter_format,
output_format: action.output_format,
cancel_token: None,
};
Ok(context)

View File

@@ -177,6 +177,7 @@ mod tests {
parameter_delivery: ParameterDelivery::default(),
parameter_format: ParameterFormat::default(),
output_format: OutputFormat::default(),
cancel_token: None,
};
assert!(runtime.can_execute(&context));
@@ -209,6 +210,7 @@ mod tests {
parameter_delivery: ParameterDelivery::default(),
parameter_format: ParameterFormat::default(),
output_format: OutputFormat::default(),
cancel_token: None,
};
assert!(!runtime.can_execute(&context));

View File

@@ -43,6 +43,7 @@ use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::path::PathBuf;
use thiserror::Error;
use tokio_util::sync::CancellationToken;
// Re-export dependency management types
pub use dependency::{
@@ -90,7 +91,7 @@ pub enum RuntimeError {
}
/// Action execution context
#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone)]
pub struct ExecutionContext {
/// Execution ID
pub execution_id: i64,
@@ -130,7 +131,6 @@ pub struct ExecutionContext {
/// "Python" runtime). When present, `ProcessRuntime` uses this config
/// instead of its built-in one for interpreter resolution, environment
/// setup, and dependency management.
#[serde(skip)]
pub runtime_config_override: Option<RuntimeExecutionConfig>,
/// Optional override of the environment directory suffix. When a specific
@@ -144,28 +144,24 @@ pub struct ExecutionContext {
pub selected_runtime_version: Option<String>,
/// Maximum stdout size in bytes (for log truncation)
#[serde(default = "default_max_log_bytes")]
pub max_stdout_bytes: usize,
/// Maximum stderr size in bytes (for log truncation)
#[serde(default = "default_max_log_bytes")]
pub max_stderr_bytes: usize,
/// How parameters should be delivered to the action
#[serde(default)]
pub parameter_delivery: ParameterDelivery,
/// Format for parameter serialization
#[serde(default)]
pub parameter_format: ParameterFormat,
/// Format for output parsing
#[serde(default)]
pub output_format: OutputFormat,
}
fn default_max_log_bytes() -> usize {
10 * 1024 * 1024 // 10MB
/// Optional cancellation token for graceful process termination.
/// When triggered, the executor sends SIGINT → SIGTERM → SIGKILL
/// with escalating grace periods.
pub cancel_token: Option<CancellationToken>,
}
impl ExecutionContext {
@@ -193,6 +189,7 @@ impl ExecutionContext {
parameter_delivery: ParameterDelivery::default(),
parameter_format: ParameterFormat::default(),
output_format: OutputFormat::default(),
cancel_token: None,
}
}
}

View File

@@ -725,8 +725,8 @@ impl Runtime for ProcessRuntime {
.unwrap_or_else(|| "<none>".to_string()),
);
// Execute with streaming output capture
process_executor::execute_streaming(
// Execute with streaming output capture (with optional cancellation support)
process_executor::execute_streaming_cancellable(
cmd,
&context.secrets,
parameters_stdin,
@@ -734,6 +734,7 @@ impl Runtime for ProcessRuntime {
context.max_stdout_bytes,
context.max_stderr_bytes,
context.output_format,
context.cancel_token.clone(),
)
.await
}
@@ -905,6 +906,7 @@ mod tests {
parameter_delivery: ParameterDelivery::default(),
parameter_format: ParameterFormat::default(),
output_format: OutputFormat::default(),
cancel_token: None,
};
assert!(runtime.can_execute(&context));
@@ -939,6 +941,7 @@ mod tests {
parameter_delivery: ParameterDelivery::default(),
parameter_format: ParameterFormat::default(),
output_format: OutputFormat::default(),
cancel_token: None,
};
assert!(runtime.can_execute(&context));
@@ -973,6 +976,7 @@ mod tests {
parameter_delivery: ParameterDelivery::default(),
parameter_format: ParameterFormat::default(),
output_format: OutputFormat::default(),
cancel_token: None,
};
assert!(!runtime.can_execute(&context));
@@ -1063,6 +1067,7 @@ mod tests {
parameter_delivery: ParameterDelivery::default(),
parameter_format: ParameterFormat::default(),
output_format: OutputFormat::default(),
cancel_token: None,
};
let result = runtime.execute(context).await.unwrap();
@@ -1120,6 +1125,7 @@ mod tests {
parameter_delivery: ParameterDelivery::default(),
parameter_format: ParameterFormat::default(),
output_format: OutputFormat::default(),
cancel_token: None,
};
let result = runtime.execute(context).await.unwrap();
@@ -1158,6 +1164,7 @@ mod tests {
parameter_delivery: ParameterDelivery::default(),
parameter_format: ParameterFormat::default(),
output_format: OutputFormat::default(),
cancel_token: None,
};
let result = runtime.execute(context).await.unwrap();
@@ -1208,6 +1215,7 @@ mod tests {
parameter_delivery: ParameterDelivery::default(),
parameter_format: ParameterFormat::default(),
output_format: OutputFormat::default(),
cancel_token: None,
};
let result = runtime.execute(context).await.unwrap();
@@ -1316,6 +1324,7 @@ mod tests {
parameter_delivery: ParameterDelivery::default(),
parameter_format: ParameterFormat::default(),
output_format: OutputFormat::default(),
cancel_token: None,
};
let result = runtime.execute(context).await.unwrap();

View File

@@ -4,6 +4,14 @@
//! implementations. Handles streaming stdout/stderr capture, bounded log
//! collection, timeout management, stdin parameter/secret delivery, and
//! output format parsing.
//!
//! ## Cancellation Support
//!
//! When a `CancellationToken` is provided, the executor monitors it alongside
//! the running process. On cancellation:
//! 1. SIGINT is sent to the process (allows graceful shutdown)
//! 2. After a 10-second grace period, SIGTERM is sent if the process hasn't exited
//! 3. After another 5-second grace period, SIGKILL is sent as a last resort
use super::{BoundedLogWriter, ExecutionResult, OutputFormat, RuntimeResult};
use std::collections::HashMap;
@@ -12,7 +20,8 @@ use std::time::Instant;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::process::Command;
use tokio::time::timeout;
use tracing::{debug, warn};
use tokio_util::sync::CancellationToken;
use tracing::{debug, info, warn};
/// Execute a subprocess command with streaming output capture.
///
@@ -33,6 +42,48 @@ use tracing::{debug, warn};
/// * `max_stderr_bytes` - Maximum stderr size before truncation
/// * `output_format` - How to parse stdout (Text, Json, Yaml, Jsonl)
pub async fn execute_streaming(
cmd: Command,
secrets: &HashMap<String, String>,
parameters_stdin: Option<&str>,
timeout_secs: Option<u64>,
max_stdout_bytes: usize,
max_stderr_bytes: usize,
output_format: OutputFormat,
) -> RuntimeResult<ExecutionResult> {
// Backward-compatible entry point: identical to the cancellable variant
// with no cancellation token, so the process can only stop by exiting on
// its own or hitting the optional timeout.
let no_cancel = None;
execute_streaming_cancellable(
cmd,
secrets,
parameters_stdin,
timeout_secs,
max_stdout_bytes,
max_stderr_bytes,
output_format,
no_cancel,
)
.await
}
/// Execute a subprocess command with streaming output capture and optional cancellation.
///
/// This is the core execution function used by all runtime implementations.
/// It handles:
/// - Spawning the process with piped I/O
/// - Writing parameters and secrets to stdin
/// - Streaming stdout/stderr with bounded log collection
/// - Timeout management
/// - Graceful cancellation via SIGINT → SIGTERM → SIGKILL escalation
/// - Output format parsing (JSON, YAML, JSONL, text)
///
/// # Arguments
/// * `cmd` - Pre-configured `Command` (interpreter, args, env vars, working dir already set)
/// * `secrets` - Secrets to pass via stdin (as JSON)
/// * `parameters_stdin` - Optional parameter data to write to stdin before secrets
/// * `timeout_secs` - Optional execution timeout in seconds
/// * `max_stdout_bytes` - Maximum stdout size before truncation
/// * `max_stderr_bytes` - Maximum stderr size before truncation
/// * `output_format` - How to parse stdout (Text, Json, Yaml, Jsonl)
/// * `cancel_token` - Optional cancellation token for graceful process termination
pub async fn execute_streaming_cancellable(
mut cmd: Command,
secrets: &HashMap<String, String>,
parameters_stdin: Option<&str>,
@@ -40,6 +91,7 @@ pub async fn execute_streaming(
max_stdout_bytes: usize,
max_stderr_bytes: usize,
output_format: OutputFormat,
cancel_token: Option<CancellationToken>,
) -> RuntimeResult<ExecutionResult> {
let start = Instant::now();
@@ -134,15 +186,74 @@ pub async fn execute_streaming(
stderr_writer
};
// Wait for both streams and the process
let (stdout_writer, stderr_writer, wait_result) =
tokio::join!(stdout_task, stderr_task, async {
// Determine the process ID for signal-based cancellation.
// Must be read before we move `child` into the wait future.
let child_pid = child.id();
// Build the wait future that handles timeout, cancellation, and normal completion.
//
// The result is a tuple: (wait_result, was_cancelled)
// - wait_result mirrors the original type: Result<Result<ExitStatus, io::Error>, Elapsed>
// - was_cancelled indicates the process was stopped by a cancel request
let wait_future = async {
// Inner future: wait for the child process to exit
let wait_child = child.wait();
// Apply optional timeout wrapping
let timed_wait = async {
if let Some(timeout_secs) = timeout_secs {
timeout(std::time::Duration::from_secs(timeout_secs), child.wait()).await
timeout(std::time::Duration::from_secs(timeout_secs), wait_child).await
} else {
Ok(child.wait().await)
Ok(wait_child.await)
}
});
};
// If we have a cancel token, race it against the (possibly-timed) wait
if let Some(ref token) = cancel_token {
tokio::select! {
result = timed_wait => (result, false),
_ = token.cancelled() => {
// Cancellation requested — escalate signals to the child process.
info!("Cancel signal received, sending SIGINT to process");
if let Some(pid) = child_pid {
send_signal(pid, libc::SIGINT);
}
// Grace period: wait up to 10s for the process to exit after SIGINT.
match timeout(std::time::Duration::from_secs(10), child.wait()).await {
Ok(status) => (Ok(status), true),
Err(_) => {
// Still alive — escalate to SIGTERM
warn!("Process did not exit after SIGINT + 10s grace period, sending SIGTERM");
if let Some(pid) = child_pid {
send_signal(pid, libc::SIGTERM);
}
// Final grace period: wait up to 5s for SIGTERM
match timeout(std::time::Duration::from_secs(5), child.wait()).await {
Ok(status) => (Ok(status), true),
Err(_) => {
// Last resort — SIGKILL
warn!("Process did not exit after SIGTERM + 5s, sending SIGKILL");
if let Some(pid) = child_pid {
send_signal(pid, libc::SIGKILL);
}
// Wait indefinitely for the SIGKILL to take effect
(Ok(child.wait().await), true)
}
}
}
}
}
}
} else {
(timed_wait.await, false)
}
};
// Wait for both streams and the process
let (stdout_writer, stderr_writer, (wait_result, was_cancelled)) =
tokio::join!(stdout_task, stderr_task, wait_future);
let duration_ms = start.elapsed().as_millis() as u64;
@@ -177,6 +288,22 @@ pub async fn execute_streaming(
}
};
// If the process was cancelled, return a specific result
if was_cancelled {
return Ok(ExecutionResult {
exit_code,
stdout: stdout_result.content.clone(),
stderr: stderr_result.content.clone(),
result: None,
duration_ms,
error: Some("Execution cancelled by user".to_string()),
stdout_truncated: stdout_result.truncated,
stderr_truncated: stderr_result.truncated,
stdout_bytes_truncated: stdout_result.bytes_truncated,
stderr_bytes_truncated: stderr_result.bytes_truncated,
});
}
debug!(
"Process execution completed: exit_code={}, duration={}ms, stdout_truncated={}, stderr_truncated={}",
exit_code, duration_ms, stdout_result.truncated, stderr_result.truncated
@@ -248,6 +375,19 @@ pub async fn execute_streaming(
}
/// Parse stdout content according to the specified output format.
/// Send a Unix signal to a process by PID.
///
/// Uses raw `libc::kill()` to deliver signals for graceful process termination.
/// This is safe because we only send signals to child processes we spawned.
fn send_signal(pid: u32, signal: i32) {
// Safety: we're sending a signal to a known child process PID.
// The PID is valid because we obtained it from `child.id()` before the
// child exited.
unsafe {
libc::kill(pid as i32, signal);
}
}
fn parse_output(stdout: &str, format: OutputFormat) -> Option<serde_json::Value> {
let trimmed = stdout.trim();
if trimmed.is_empty() {

View File

@@ -623,6 +623,7 @@ mod tests {
parameter_delivery: attune_common::models::ParameterDelivery::default(),
parameter_format: attune_common::models::ParameterFormat::default(),
output_format: attune_common::models::OutputFormat::default(),
cancel_token: None,
};
let result = runtime.execute(context).await.unwrap();
@@ -659,6 +660,7 @@ mod tests {
parameter_delivery: attune_common::models::ParameterDelivery::default(),
parameter_format: attune_common::models::ParameterFormat::default(),
output_format: attune_common::models::OutputFormat::default(),
cancel_token: None,
};
let result = runtime.execute(context).await.unwrap();
@@ -690,6 +692,7 @@ mod tests {
parameter_delivery: attune_common::models::ParameterDelivery::default(),
parameter_format: attune_common::models::ParameterFormat::default(),
output_format: attune_common::models::OutputFormat::default(),
cancel_token: None,
};
let result = runtime.execute(context).await.unwrap();
@@ -723,6 +726,7 @@ mod tests {
parameter_delivery: attune_common::models::ParameterDelivery::default(),
parameter_format: attune_common::models::ParameterFormat::default(),
output_format: attune_common::models::OutputFormat::default(),
cancel_token: None,
};
let result = runtime.execute(context).await.unwrap();
@@ -771,6 +775,7 @@ echo "missing=$missing"
parameter_delivery: attune_common::models::ParameterDelivery::default(),
parameter_format: attune_common::models::ParameterFormat::default(),
output_format: attune_common::models::OutputFormat::default(),
cancel_token: None,
};
let result = runtime.execute(context).await.unwrap();
@@ -814,6 +819,7 @@ echo '{"id": 3, "name": "Charlie"}'
parameter_delivery: attune_common::models::ParameterDelivery::default(),
parameter_format: attune_common::models::ParameterFormat::default(),
output_format: attune_common::models::OutputFormat::Jsonl,
cancel_token: None,
};
let result = runtime.execute(context).await.unwrap();
@@ -880,6 +886,7 @@ printf '{"status_code":200,"body":"hello","json":{\n "args": {\n "hello": "w
parameter_delivery: attune_common::models::ParameterDelivery::default(),
parameter_format: attune_common::models::ParameterFormat::default(),
output_format: attune_common::models::OutputFormat::Json,
cancel_token: None,
};
let result = runtime.execute(context).await.unwrap();
@@ -935,6 +942,7 @@ echo '{"result": "success", "count": 42}'
parameter_delivery: attune_common::models::ParameterDelivery::default(),
parameter_format: attune_common::models::ParameterFormat::default(),
output_format: attune_common::models::OutputFormat::Json,
cancel_token: None,
};
let result = runtime.execute(context).await.unwrap();

View File

@@ -11,7 +11,7 @@
//! 4. **Verify runtime versions** — run verification commands for each registered
//! `RuntimeVersion` to determine which are available on this host/container
//! 5. **Set up runtime environments** — create per-version environments for packs
//! 6. Start heartbeat, execution consumer, and pack registration consumer
//! 6. Start heartbeat, execution consumer, pack registration consumer, and cancel consumer
use attune_common::config::Config;
use attune_common::db::Database;
@@ -19,19 +19,21 @@ use attune_common::error::{Error, Result};
use attune_common::models::ExecutionStatus;
use attune_common::mq::{
config::MessageQueueConfig as MqConfig, Connection, Consumer, ConsumerConfig,
ExecutionCompletedPayload, ExecutionStatusChangedPayload, MessageEnvelope, MessageType,
PackRegisteredPayload, Publisher, PublisherConfig,
ExecutionCancelRequestedPayload, ExecutionCompletedPayload, ExecutionStatusChangedPayload,
MessageEnvelope, MessageType, PackRegisteredPayload, Publisher, PublisherConfig,
};
use attune_common::repositories::{execution::ExecutionRepository, FindById};
use attune_common::runtime_detection::runtime_in_filter;
use chrono::Utc;
use serde::{Deserialize, Serialize};
use sqlx::PgPool;
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{Mutex, RwLock, Semaphore};
use tokio::task::{JoinHandle, JoinSet};
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, warn};
use crate::artifacts::ArtifactManager;
@@ -72,6 +74,8 @@ pub struct WorkerService {
consumer_handle: Option<JoinHandle<()>>,
pack_consumer: Option<Arc<Consumer>>,
pack_consumer_handle: Option<JoinHandle<()>>,
cancel_consumer: Option<Arc<Consumer>>,
cancel_consumer_handle: Option<JoinHandle<()>>,
worker_id: Option<i64>,
/// Runtime filter derived from ATTUNE_WORKER_RUNTIMES
runtime_filter: Option<Vec<String>>,
@@ -83,6 +87,10 @@ pub struct WorkerService {
execution_semaphore: Arc<Semaphore>,
/// Tracks in-flight execution tasks for graceful shutdown
in_flight_tasks: Arc<Mutex<JoinSet<()>>>,
/// Maps execution ID → CancellationToken for running processes.
/// When a cancel request arrives, the token is triggered, causing
/// the process executor to send SIGINT → SIGTERM → SIGKILL.
cancel_tokens: Arc<Mutex<HashMap<i64, CancellationToken>>>,
}
impl WorkerService {
@@ -362,12 +370,15 @@ impl WorkerService {
consumer_handle: None,
pack_consumer: None,
pack_consumer_handle: None,
cancel_consumer: None,
cancel_consumer_handle: None,
worker_id: None,
runtime_filter: runtime_filter_for_service,
packs_base_dir,
runtime_envs_dir,
execution_semaphore: Arc::new(Semaphore::new(max_concurrent_tasks)),
in_flight_tasks: Arc::new(Mutex::new(JoinSet::new())),
cancel_tokens: Arc::new(Mutex::new(HashMap::new())),
})
}
@@ -416,6 +427,9 @@ impl WorkerService {
// Start consuming pack registration events
self.start_pack_consumer().await?;
// Start consuming cancel requests
self.start_cancel_consumer().await?;
info!("Worker Service started successfully");
Ok(())
@@ -640,6 +654,12 @@ impl WorkerService {
let _ = handle.await;
}
if let Some(handle) = self.cancel_consumer_handle.take() {
info!("Stopping cancel consumer task...");
handle.abort();
let _ = handle.await;
}
info!("Closing message queue connection...");
if let Err(e) = self.mq_connection.close().await {
warn!("Error closing message queue: {}", e);
@@ -733,6 +753,7 @@ impl WorkerService {
let queue_name_for_log = queue_name.clone();
let semaphore = self.execution_semaphore.clone();
let in_flight = self.in_flight_tasks.clone();
let cancel_tokens = self.cancel_tokens.clone();
// Spawn the consumer loop as a background task so start() can return
let handle = tokio::spawn(async move {
@@ -745,6 +766,7 @@ impl WorkerService {
let db_pool = db_pool.clone();
let semaphore = semaphore.clone();
let in_flight = in_flight.clone();
let cancel_tokens = cancel_tokens.clone();
async move {
let execution_id = envelope.payload.execution_id;
@@ -765,6 +787,13 @@ impl WorkerService {
semaphore.available_permits()
);
// Create a cancellation token for this execution
let cancel_token = CancellationToken::new();
{
let mut tokens = cancel_tokens.lock().await;
tokens.insert(execution_id, cancel_token.clone());
}
// Spawn the actual execution as a background task so this
// handler returns immediately, acking the message and freeing
// the consumer loop to process the next delivery.
@@ -775,12 +804,20 @@ impl WorkerService {
let _permit = permit;
if let Err(e) = Self::handle_execution_scheduled(
executor, publisher, db_pool, envelope,
executor,
publisher,
db_pool,
envelope,
cancel_token,
)
.await
{
error!("Execution {} handler error: {}", execution_id, e);
}
// Remove the cancel token now that execution is done
let mut tokens = cancel_tokens.lock().await;
tokens.remove(&execution_id);
});
Ok(())
@@ -813,6 +850,7 @@ impl WorkerService {
publisher: Arc<Publisher>,
db_pool: PgPool,
envelope: MessageEnvelope<ExecutionScheduledPayload>,
cancel_token: CancellationToken,
) -> Result<()> {
let execution_id = envelope.payload.execution_id;
@@ -821,6 +859,42 @@ impl WorkerService {
execution_id
);
// Check if the execution was already cancelled before we started
// (e.g. pre-running cancellation via the API).
{
if let Ok(Some(exec)) = ExecutionRepository::find_by_id(&db_pool, execution_id).await {
if matches!(
exec.status,
ExecutionStatus::Cancelled | ExecutionStatus::Canceling
) {
info!(
"Execution {} already in {:?} state, skipping",
execution_id, exec.status
);
// If it was Canceling, finalize to Cancelled
if exec.status == ExecutionStatus::Canceling {
let _ = Self::publish_status_update(
&db_pool,
&publisher,
execution_id,
ExecutionStatus::Cancelled,
None,
Some("Cancelled before execution started".to_string()),
)
.await;
let _ = Self::publish_completion_notification(
&db_pool,
&publisher,
execution_id,
ExecutionStatus::Cancelled,
)
.await;
}
return Ok(());
}
}
}
// Publish status: running
if let Err(e) = Self::publish_status_update(
&db_pool,
@@ -836,42 +910,88 @@ impl WorkerService {
// Continue anyway - we'll update the database directly
}
// Execute the action
match executor.execute(execution_id).await {
// Execute the action (with cancellation support)
match executor
.execute_with_cancel(execution_id, cancel_token.clone())
.await
{
Ok(result) => {
info!(
"Execution {} completed successfully in {}ms",
execution_id, result.duration_ms
);
// Check if this was a cancellation
let was_cancelled = cancel_token.is_cancelled()
|| result
.error
.as_deref()
.is_some_and(|e| e.contains("cancelled"));
// Publish status: completed
if let Err(e) = Self::publish_status_update(
&db_pool,
&publisher,
execution_id,
ExecutionStatus::Completed,
result.result.clone(),
None,
)
.await
{
error!("Failed to publish success status: {}", e);
}
// Publish completion notification for queue management
if let Err(e) = Self::publish_completion_notification(
&db_pool,
&publisher,
execution_id,
ExecutionStatus::Completed,
)
.await
{
error!(
"Failed to publish completion notification for execution {}: {}",
execution_id, e
if was_cancelled {
info!(
"Execution {} was cancelled in {}ms",
execution_id, result.duration_ms
);
// Continue - this is important for queue management but not fatal
// Publish status: cancelled
if let Err(e) = Self::publish_status_update(
&db_pool,
&publisher,
execution_id,
ExecutionStatus::Cancelled,
None,
Some("Cancelled by user".to_string()),
)
.await
{
error!("Failed to publish cancelled status: {}", e);
}
// Publish completion notification for queue management
if let Err(e) = Self::publish_completion_notification(
&db_pool,
&publisher,
execution_id,
ExecutionStatus::Cancelled,
)
.await
{
error!(
"Failed to publish completion notification for cancelled execution {}: {}",
execution_id, e
);
}
} else {
info!(
"Execution {} completed successfully in {}ms",
execution_id, result.duration_ms
);
// Publish status: completed
if let Err(e) = Self::publish_status_update(
&db_pool,
&publisher,
execution_id,
ExecutionStatus::Completed,
result.result.clone(),
None,
)
.await
{
error!("Failed to publish success status: {}", e);
}
// Publish completion notification for queue management
if let Err(e) = Self::publish_completion_notification(
&db_pool,
&publisher,
execution_id,
ExecutionStatus::Completed,
)
.await
{
error!(
"Failed to publish completion notification for execution {}: {}",
execution_id, e
);
// Continue - this is important for queue management but not fatal
}
}
}
Err(e) => {
@@ -912,6 +1032,87 @@ impl WorkerService {
Ok(())
}
/// Start consuming execution cancel requests from the per-worker cancel queue.
///
/// Creates a manual-ack consumer on the `worker.<id>.cancel` queue and spawns
/// a background task that, for each `ExecutionCancelRequestedPayload`, looks
/// up the matching `CancellationToken` in `cancel_tokens` and triggers it.
/// If no token is registered for the execution (it already completed, or has
/// not started yet), the request is logged with a warning and dropped.
///
/// # Errors
/// Returns `Error::Internal` if the worker has not registered yet (no
/// `worker_id`) or the consumer cannot be created.
async fn start_cancel_consumer(&mut self) -> Result<()> {
// Registration must have happened first — the queue name embeds the ID.
let worker_id = self
.worker_id
.ok_or_else(|| Error::Internal("Worker not registered".to_string()))?;
let queue_name = format!("worker.{}.cancel", worker_id);
info!("Starting cancel consumer for queue: {}", queue_name);
let consumer = Arc::new(
Consumer::new(
&self.mq_connection,
ConsumerConfig {
queue: queue_name.clone(),
tag: format!("worker-{}-cancel", worker_id),
// NOTE(review): manual ack (auto_ack: false) with a prefetch of 10 —
// presumably consume_with_handler acks on Ok(()); confirm against the
// Consumer implementation.
prefetch_count: 10,
auto_ack: false,
exclusive: false,
},
)
.await
.map_err(|e| Error::Internal(format!("Failed to create cancel consumer: {}", e)))?,
);
// Clones moved into the spawned task; the originals are stored on `self`
// below so stop() can abort the handle later.
let consumer_for_task = consumer.clone();
let cancel_tokens = self.cancel_tokens.clone();
let queue_name_for_log = queue_name.clone();
// Run the consume loop in the background so start() can return immediately.
let handle = tokio::spawn(async move {
info!(
"Cancel consumer loop started for queue '{}'",
queue_name_for_log
);
let result = consumer_for_task
.consume_with_handler(
move |envelope: MessageEnvelope<ExecutionCancelRequestedPayload>| {
let cancel_tokens = cancel_tokens.clone();
async move {
let execution_id = envelope.payload.execution_id;
info!("Received cancel request for execution {}", execution_id);
// Look up the live token for this execution; triggering it
// causes the process executor to begin signal escalation.
let tokens = cancel_tokens.lock().await;
if let Some(token) = tokens.get(&execution_id) {
info!("Triggering cancellation for execution {}", execution_id);
token.cancel();
} else {
// Benign race: the execution finished (token removed) or
// was never scheduled on this worker.
warn!(
"No cancel token found for execution {} \
(may have already completed or not yet started)",
execution_id
);
}
Ok(())
}
},
)
.await;
// The loop only returns when the consumer stops (shutdown/abort) or fails.
match result {
Ok(()) => info!(
"Cancel consumer loop for queue '{}' ended",
queue_name_for_log
),
Err(e) => error!(
"Cancel consumer loop for queue '{}' failed: {}",
queue_name_for_log, e
),
}
});
// Retain handles so stop() can abort the task and drop the consumer.
self.cancel_consumer = Some(consumer);
self.cancel_consumer_handle = Some(handle);
info!("Cancel consumer initialized for queue: {}", queue_name);
Ok(())
}
/// Publish execution status update
async fn publish_status_update(
db_pool: &PgPool,
@@ -935,6 +1136,7 @@ impl WorkerService {
ExecutionStatus::Running => "running",
ExecutionStatus::Completed => "completed",
ExecutionStatus::Failed => "failed",
ExecutionStatus::Canceling => "canceling",
ExecutionStatus::Cancelled => "cancelled",
ExecutionStatus::Timeout => "timeout",
_ => "unknown",

View File

@@ -86,6 +86,7 @@ fn make_context(action_ref: &str, entry_point: &str, runtime_name: &str) -> Exec
parameter_delivery: ParameterDelivery::default(),
parameter_format: ParameterFormat::default(),
output_format: OutputFormat::default(),
cancel_token: None,
}
}

View File

@@ -51,6 +51,7 @@ fn make_python_context(
parameter_delivery: attune_worker::runtime::ParameterDelivery::default(),
parameter_format: attune_worker::runtime::ParameterFormat::default(),
output_format: attune_worker::runtime::OutputFormat::default(),
cancel_token: None,
}
}
@@ -133,6 +134,7 @@ done
parameter_delivery: attune_worker::runtime::ParameterDelivery::default(),
parameter_format: attune_worker::runtime::ParameterFormat::default(),
output_format: attune_worker::runtime::OutputFormat::default(),
cancel_token: None,
};
let result = runtime.execute(context).await.unwrap();
@@ -291,6 +293,7 @@ async fn test_shell_process_runtime_truncation() {
parameter_delivery: attune_worker::runtime::ParameterDelivery::default(),
parameter_format: attune_worker::runtime::ParameterFormat::default(),
output_format: attune_worker::runtime::OutputFormat::default(),
cancel_token: None,
};
let result = runtime.execute(context).await.unwrap();

View File

@@ -77,6 +77,7 @@ print(json.dumps(result))
parameter_delivery: attune_worker::runtime::ParameterDelivery::default(),
parameter_format: attune_worker::runtime::ParameterFormat::default(),
output_format: attune_worker::runtime::OutputFormat::Json,
cancel_token: None,
};
let result = runtime.execute(context).await.unwrap();
@@ -170,6 +171,7 @@ echo "SECURITY_PASS: Secrets not in environment but accessible via get_secret"
parameter_delivery: attune_worker::runtime::ParameterDelivery::default(),
parameter_format: attune_worker::runtime::ParameterFormat::default(),
output_format: attune_worker::runtime::OutputFormat::default(),
cancel_token: None,
};
let result = runtime.execute(context).await.unwrap();
@@ -234,6 +236,7 @@ print(json.dumps({'secret_a': secrets.get('secret_a')}))
parameter_delivery: attune_worker::runtime::ParameterDelivery::default(),
parameter_format: attune_worker::runtime::ParameterFormat::default(),
output_format: attune_worker::runtime::OutputFormat::Json,
cancel_token: None,
};
let result1 = runtime.execute(context1).await.unwrap();
@@ -279,6 +282,7 @@ print(json.dumps({
parameter_delivery: attune_worker::runtime::ParameterDelivery::default(),
parameter_format: attune_worker::runtime::ParameterFormat::default(),
output_format: attune_worker::runtime::OutputFormat::Json,
cancel_token: None,
};
let result2 = runtime.execute(context2).await.unwrap();
@@ -333,6 +337,7 @@ print("ok")
parameter_delivery: attune_worker::runtime::ParameterDelivery::default(),
parameter_format: attune_worker::runtime::ParameterFormat::default(),
output_format: attune_worker::runtime::OutputFormat::default(),
cancel_token: None,
};
let result = runtime.execute(context).await.unwrap();
@@ -384,6 +389,7 @@ fi
parameter_delivery: attune_worker::runtime::ParameterDelivery::default(),
parameter_format: attune_worker::runtime::ParameterFormat::default(),
output_format: attune_worker::runtime::OutputFormat::default(),
cancel_token: None,
};
let result = runtime.execute(context).await.unwrap();
@@ -456,6 +462,7 @@ echo "PASS: No secrets in environment"
parameter_delivery: attune_worker::runtime::ParameterDelivery::default(),
parameter_format: attune_worker::runtime::ParameterFormat::default(),
output_format: attune_worker::runtime::OutputFormat::default(),
cancel_token: None,
};
let result = runtime.execute(context).await.unwrap();
@@ -527,6 +534,7 @@ print(json.dumps({"leaked": leaked}))
parameter_delivery: attune_worker::runtime::ParameterDelivery::default(),
parameter_format: attune_worker::runtime::ParameterFormat::default(),
output_format: attune_worker::runtime::OutputFormat::Json,
cancel_token: None,
};
let result = runtime.execute(context).await.unwrap();