[wip] workflow cancellation policy
Some checks failed
CI / Rustfmt (push) Successful in 21s
CI / Cargo Audit & Deny (push) Successful in 32s
CI / Web Blocking Checks (push) Successful in 50s
CI / Security Blocking Checks (push) Successful in 9s
CI / Clippy (push) Failing after 1m58s
CI / Web Advisory Checks (push) Successful in 34s
CI / Security Advisory Checks (push) Successful in 1m26s
CI / Tests (push) Successful in 8m47s

This commit is contained in:
2026-03-09 14:08:01 -05:00
parent 87d830f952
commit 9e7e35cbe3
7 changed files with 451 additions and 32 deletions

View File

@@ -24,9 +24,10 @@ use attune_common::repositories::{
execution::{
CreateExecutionInput, ExecutionRepository, ExecutionSearchFilters, UpdateExecutionInput,
},
workflow::WorkflowExecutionRepository,
workflow::{WorkflowDefinitionRepository, WorkflowExecutionRepository},
Create, FindById, FindByRef, Update,
};
use attune_common::workflow::{CancellationPolicy, WorkflowDefinition};
use sqlx::Row;
use crate::{
@@ -503,6 +504,42 @@ async fn send_cancel_to_worker(publisher: Option<&Publisher>, execution_id: i64,
}
}
/// Resolve the [`CancellationPolicy`] for a workflow parent execution.
///
/// Looks up the `workflow_execution` → `workflow_definition` chain and
/// deserialises the stored definition to extract the policy. Returns
/// [`CancellationPolicy::AllowFinish`] (the default) when any lookup
/// step fails so that the safest behaviour is used as a fallback.
async fn resolve_cancellation_policy(
db: &sqlx::PgPool,
parent_execution_id: i64,
) -> CancellationPolicy {
let wf_exec =
match WorkflowExecutionRepository::find_by_execution(db, parent_execution_id).await {
Ok(Some(wf)) => wf,
_ => return CancellationPolicy::default(),
};
let wf_def = match WorkflowDefinitionRepository::find_by_id(db, wf_exec.workflow_def).await {
Ok(Some(def)) => def,
_ => return CancellationPolicy::default(),
};
// Deserialise the stored JSON definition to extract the policy field.
match serde_json::from_value::<WorkflowDefinition>(wf_def.definition) {
Ok(def) => def.cancellation_policy,
Err(e) => {
tracing::warn!(
"Failed to deserialise workflow definition for workflow_def {}: {}. \
Falling back to AllowFinish cancellation policy.",
wf_exec.workflow_def,
e
);
CancellationPolicy::default()
}
}
}
/// Cancel all incomplete child executions of a workflow parent execution.
///
/// This handles the workflow cascade: when a workflow execution is cancelled,
@@ -510,13 +547,35 @@ async fn send_cancel_to_worker(publisher: Option<&Publisher>, execution_id: i64,
/// Additionally, the `workflow_execution` record is marked Cancelled so the
/// scheduler's `advance_workflow` will short-circuit and not dispatch new tasks.
///
/// Children in pre-running states (Requested, Scheduling, Scheduled) are set
/// to Cancelled immediately. Children that are Running receive a cancel MQ
/// message so their worker can gracefully stop the process.
/// Behaviour depends on the workflow's [`CancellationPolicy`]:
///
/// - **`AllowFinish`** (default): Children in pre-running states (Requested,
/// Scheduling, Scheduled) are set to Cancelled immediately. Running children
/// are left alone and will complete naturally; `advance_workflow` sees the
/// cancelled `workflow_execution` and will not dispatch further tasks.
///
/// - **`CancelRunning`**: Pre-running children are cancelled as above.
/// Running children also receive a cancel MQ message so their worker can
/// gracefully stop the process (SIGINT → SIGTERM → SIGKILL).
async fn cancel_workflow_children(
db: &sqlx::PgPool,
publisher: Option<&Publisher>,
parent_execution_id: i64,
) {
// Determine the cancellation policy from the workflow definition.
let policy = resolve_cancellation_policy(db, parent_execution_id).await;
cancel_workflow_children_with_policy(db, publisher, parent_execution_id, policy).await;
}
/// Inner implementation that carries the resolved [`CancellationPolicy`]
/// through recursive calls so that nested child workflows inherit the
/// top-level policy.
async fn cancel_workflow_children_with_policy(
db: &sqlx::PgPool,
publisher: Option<&Publisher>,
parent_execution_id: i64,
policy: CancellationPolicy,
) {
// Find all child executions that are still incomplete
let children: Vec<attune_common::models::Execution> = match sqlx::query_as::<
@@ -546,9 +605,10 @@ async fn cancel_workflow_children(
}
tracing::info!(
"Cascading cancellation from execution {} to {} child execution(s)",
"Cascading cancellation from execution {} to {} child execution(s) (policy: {:?})",
parent_execution_id,
children.len()
children.len(),
policy,
);
for child in &children {
@@ -558,7 +618,7 @@ async fn cancel_workflow_children(
child.status,
ExecutionStatus::Requested | ExecutionStatus::Scheduling | ExecutionStatus::Scheduled
) {
// Pre-running: cancel immediately in DB
// Pre-running: cancel immediately in DB (both policies)
let update = UpdateExecutionInput {
status: Some(ExecutionStatus::Cancelled),
result: Some(serde_json::json!({
@@ -575,29 +635,45 @@ async fn cancel_workflow_children(
child.status,
ExecutionStatus::Running | ExecutionStatus::Canceling
) {
// Running: set to Canceling and send MQ message to the worker
if child.status != ExecutionStatus::Canceling {
let update = UpdateExecutionInput {
status: Some(ExecutionStatus::Canceling),
..Default::default()
};
if let Err(e) = ExecutionRepository::update(db, child_id, update).await {
tracing::error!(
"Failed to set child execution {} to canceling: {}",
child_id,
e
match policy {
CancellationPolicy::CancelRunning => {
// Running: set to Canceling and send MQ message to the worker
if child.status != ExecutionStatus::Canceling {
let update = UpdateExecutionInput {
status: Some(ExecutionStatus::Canceling),
..Default::default()
};
if let Err(e) = ExecutionRepository::update(db, child_id, update).await {
tracing::error!(
"Failed to set child execution {} to canceling: {}",
child_id,
e
);
}
}
if let Some(worker_id) = child.executor {
send_cancel_to_worker(publisher, child_id, worker_id).await;
}
}
CancellationPolicy::AllowFinish => {
// Running tasks are allowed to complete naturally.
// advance_workflow will see the cancelled workflow_execution
// and will not dispatch any further tasks.
tracing::info!(
"AllowFinish policy: leaving running child execution {} alone",
child_id
);
}
}
if let Some(worker_id) = child.executor {
send_cancel_to_worker(publisher, child_id, worker_id).await;
}
}
// Recursively cancel grandchildren (nested workflows)
// Use Box::pin to allow the recursive async call
Box::pin(cancel_workflow_children(db, publisher, child_id)).await;
Box::pin(cancel_workflow_children_with_policy(
db, publisher, child_id, policy,
))
.await;
}
// Also mark any associated workflow_execution record as Cancelled so that
@@ -634,6 +710,56 @@ async fn cancel_workflow_children(
}
}
}
// If no children are still running (all were pre-running or were
// cancelled), finalize the parent execution as Cancelled immediately.
// Without this, the parent would stay stuck in "Canceling" because no
// task completion would trigger advance_workflow to finalize it.
let still_running: Vec<attune_common::models::Execution> = match sqlx::query_as::<
_,
attune_common::models::Execution,
>(&format!(
"SELECT {} FROM execution WHERE parent = $1 AND status IN ('running', 'canceling', 'scheduling', 'scheduled', 'requested')",
attune_common::repositories::execution::SELECT_COLUMNS
))
.bind(parent_execution_id)
.fetch_all(db)
.await
{
Ok(rows) => rows,
Err(e) => {
tracing::error!(
"Failed to check remaining children for parent {}: {}",
parent_execution_id,
e
);
return;
}
};
if still_running.is_empty() {
// No children left in flight — finalize the parent execution now.
let update = UpdateExecutionInput {
status: Some(ExecutionStatus::Cancelled),
result: Some(serde_json::json!({
"error": "Workflow cancelled",
"succeeded": false,
})),
..Default::default()
};
if let Err(e) = ExecutionRepository::update(db, parent_execution_id, update).await {
tracing::error!(
"Failed to finalize parent execution {} as Cancelled: {}",
parent_execution_id,
e
);
} else {
tracing::info!(
"Finalized parent execution {} as Cancelled (no running children remain)",
parent_execution_id
);
}
}
}
/// Create execution routes

View File

@@ -15,9 +15,9 @@ pub use pack_service::{
PackSyncResult, PackValidationResult, PackWorkflowService, PackWorkflowServiceConfig,
};
pub use parser::{
parse_workflow_file, parse_workflow_yaml, workflow_to_json, BackoffStrategy, DecisionBranch,
ParseError, ParseResult, PublishDirective, RetryConfig, Task, TaskTransition, TaskType,
WorkflowDefinition,
parse_workflow_file, parse_workflow_yaml, workflow_to_json, BackoffStrategy,
CancellationPolicy, DecisionBranch, ParseError, ParseResult, PublishDirective, RetryConfig,
Task, TaskTransition, TaskType, WorkflowDefinition,
};
pub use registrar::{RegistrationOptions, RegistrationResult, WorkflowRegistrar};
pub use validator::{ValidationError, ValidationResult, WorkflowValidator};

View File

@@ -127,6 +127,17 @@ pub struct WorkflowDefinition {
/// Tags for categorization
#[serde(default)]
pub tags: Vec<String>,
/// Cancellation policy for the workflow.
///
/// Controls what happens to running tasks when the workflow is cancelled:
/// - `allow_finish` (default): Running tasks are allowed to complete naturally.
/// Only pending/requested tasks are cancelled. The workflow waits for running
/// tasks to finish but does not dispatch any new tasks.
/// - `cancel_running`: All running and pending tasks are forcefully cancelled.
/// Running processes receive SIGINT → SIGTERM → SIGKILL via the worker.
#[serde(default, skip_serializing_if = "CancellationPolicy::is_default")]
pub cancellation_policy: CancellationPolicy,
}
// ---------------------------------------------------------------------------
@@ -411,6 +422,27 @@ fn default_task_type() -> TaskType {
TaskType::Action
}
/// Policy controlling how running tasks are handled when a workflow is cancelled.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "snake_case")]
pub enum CancellationPolicy {
/// Running tasks are allowed to complete naturally; only pending tasks are
/// cancelled and no new tasks are dispatched. This is the default.
#[default]
AllowFinish,
/// All running and pending tasks are forcefully cancelled. Running
/// processes receive SIGINT → SIGTERM → SIGKILL via the worker.
CancelRunning,
}
impl CancellationPolicy {
/// Returns `true` when the value is the default ([`AllowFinish`]).
/// Used by `#[serde(skip_serializing_if)]` to keep stored JSON compact.
pub fn is_default(&self) -> bool {
matches!(self, Self::AllowFinish)
}
}
/// Task type enumeration
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
@@ -1509,4 +1541,93 @@ tasks:
panic!("Expected Simple publish directive");
}
}
#[test]
fn test_cancellation_policy_defaults_to_allow_finish() {
let yaml = r#"
version: "1.0.0"
tasks:
- name: task1
action: core.echo
input:
message: hello
"#;
let workflow = parse_workflow_yaml(yaml).unwrap();
assert_eq!(
workflow.cancellation_policy,
CancellationPolicy::AllowFinish
);
}
#[test]
fn test_cancellation_policy_cancel_running() {
let yaml = r#"
version: "1.0.0"
cancellation_policy: cancel_running
tasks:
- name: task1
action: core.echo
input:
message: hello
"#;
let workflow = parse_workflow_yaml(yaml).unwrap();
assert_eq!(
workflow.cancellation_policy,
CancellationPolicy::CancelRunning
);
}
#[test]
fn test_cancellation_policy_allow_finish_explicit() {
let yaml = r#"
version: "1.0.0"
cancellation_policy: allow_finish
tasks:
- name: task1
action: core.echo
input:
message: hello
"#;
let workflow = parse_workflow_yaml(yaml).unwrap();
assert_eq!(
workflow.cancellation_policy,
CancellationPolicy::AllowFinish
);
}
#[test]
fn test_cancellation_policy_json_roundtrip() {
let yaml = r#"
version: "1.0.0"
cancellation_policy: cancel_running
tasks:
- name: step1
action: core.echo
input:
message: hello
"#;
let workflow = parse_workflow_yaml(yaml).unwrap();
let json = workflow_to_json(&workflow).unwrap();
let restored: WorkflowDefinition = serde_json::from_value(json).unwrap();
assert_eq!(
restored.cancellation_policy,
CancellationPolicy::CancelRunning
);
}
#[test]
fn test_cancellation_policy_absent_in_json_defaults() {
// Simulate a definition stored in the DB before this field existed
let json = serde_json::json!({
"ref": "test.wf",
"label": "Test",
"version": "1.0.0",
"tasks": [{"name": "t1", "action": "core.echo", "input": {"message": "hi"}}]
});
let workflow: WorkflowDefinition = serde_json::from_value(json).unwrap();
assert_eq!(
workflow.cancellation_policy,
CancellationPolicy::AllowFinish
);
}
}

View File

@@ -17,7 +17,7 @@ use attune_common::{
mq::{Consumer, ExecutionRequestedPayload, MessageEnvelope, MessageType, Publisher},
repositories::{
action::ActionRepository,
execution::{CreateExecutionInput, ExecutionRepository},
execution::{CreateExecutionInput, ExecutionRepository, UpdateExecutionInput},
runtime::{RuntimeRepository, WorkerRepository},
workflow::{
CreateWorkflowExecutionInput, WorkflowDefinitionRepository, WorkflowExecutionRepository,
@@ -884,10 +884,10 @@ impl ExecutionScheduler {
anyhow::anyhow!("Workflow execution {} not found", workflow_execution_id)
})?;
// Already in a terminal state — nothing to do
// Already fully terminal (Completed / Failed) — nothing to do
if matches!(
workflow_execution.status,
ExecutionStatus::Completed | ExecutionStatus::Failed | ExecutionStatus::Cancelled
ExecutionStatus::Completed | ExecutionStatus::Failed
) {
debug!(
"Workflow execution {} already in terminal state {:?}, skipping advance",
@@ -896,6 +896,41 @@ impl ExecutionScheduler {
return Ok(());
}
// Cancelled workflow: don't dispatch new tasks, but check whether all
// running children have now finished. When none remain, finalize the
// parent execution as Cancelled so it doesn't stay stuck in "Canceling".
if workflow_execution.status == ExecutionStatus::Cancelled {
let running = Self::count_running_workflow_children(
pool,
workflow_execution_id,
&workflow_execution.completed_tasks,
&workflow_execution.failed_tasks,
)
.await?;
if running == 0 {
info!(
"Cancelled workflow_execution {} has no more running children, \
finalizing parent execution {} as Cancelled",
workflow_execution_id, workflow_execution.execution
);
Self::finalize_cancelled_workflow(
pool,
workflow_execution.execution,
workflow_execution_id,
)
.await?;
} else {
debug!(
"Cancelled workflow_execution {} still has {} running children, \
waiting for them to finish",
workflow_execution_id, running
);
}
return Ok(());
}
// Load the workflow definition so we can apply param_schema defaults
let workflow_def =
WorkflowDefinitionRepository::find_by_id(pool, workflow_execution.workflow_def)
@@ -1375,6 +1410,32 @@ impl ExecutionScheduler {
Ok(count)
}
/// Finalize a cancelled workflow by updating the parent `execution` record
/// to `Cancelled`. The `workflow_execution` record is already `Cancelled`
/// (set by `cancel_workflow_children`); this only touches the parent.
async fn finalize_cancelled_workflow(
pool: &PgPool,
parent_execution_id: i64,
workflow_execution_id: i64,
) -> Result<()> {
info!(
"Finalizing cancelled workflow: parent execution {} (workflow_execution {})",
parent_execution_id, workflow_execution_id
);
let update = UpdateExecutionInput {
status: Some(ExecutionStatus::Cancelled),
result: Some(serde_json::json!({
"error": "Workflow cancelled",
"succeeded": false,
})),
..Default::default()
};
ExecutionRepository::update(pool, parent_execution_id, update).await?;
Ok(())
}
/// Mark a workflow as completed (success or failure) and update both the
/// `workflow_execution` and parent `execution` records.
async fn complete_workflow(