From 9e7e35cbe3e18c204008d49823a2eeae535c76a7 Mon Sep 17 00:00:00 2001 From: David Culbreth Date: Mon, 9 Mar 2026 14:08:01 -0500 Subject: [PATCH] [wip] workflow cancellation policy --- crates/api/src/routes/executions.rs | 172 +++++++++++++++--- crates/common/src/workflow/mod.rs | 6 +- crates/common/src/workflow/parser.rs | 121 ++++++++++++ crates/executor/src/scheduler.rs | 67 ++++++- web/src/pages/actions/WorkflowBuilderPage.tsx | 33 +++- web/src/types/workflow.ts | 30 +++ ...2026-03-09-workflow-cancellation-policy.md | 54 ++++++ 7 files changed, 451 insertions(+), 32 deletions(-) create mode 100644 work-summary/2026-03-09-workflow-cancellation-policy.md diff --git a/crates/api/src/routes/executions.rs b/crates/api/src/routes/executions.rs index 82de45c..da95d87 100644 --- a/crates/api/src/routes/executions.rs +++ b/crates/api/src/routes/executions.rs @@ -24,9 +24,10 @@ use attune_common::repositories::{ execution::{ CreateExecutionInput, ExecutionRepository, ExecutionSearchFilters, UpdateExecutionInput, }, - workflow::WorkflowExecutionRepository, + workflow::{WorkflowDefinitionRepository, WorkflowExecutionRepository}, Create, FindById, FindByRef, Update, }; +use attune_common::workflow::{CancellationPolicy, WorkflowDefinition}; use sqlx::Row; use crate::{ @@ -503,6 +504,42 @@ async fn send_cancel_to_worker(publisher: Option<&Publisher>, execution_id: i64, } } +/// Resolve the [`CancellationPolicy`] for a workflow parent execution. +/// +/// Looks up the `workflow_execution` → `workflow_definition` chain and +/// deserialises the stored definition to extract the policy. Returns +/// [`CancellationPolicy::AllowFinish`] (the default) when any lookup +/// step fails so that the safest behaviour is used as a fallback. +async fn resolve_cancellation_policy( + db: &sqlx::PgPool, + parent_execution_id: i64, +) -> CancellationPolicy { + let wf_exec = + match WorkflowExecutionRepository::find_by_execution(db, parent_execution_id).await { + Ok(Some(wf)) => wf, + _ => return CancellationPolicy::default(), + }; + + let wf_def = match WorkflowDefinitionRepository::find_by_id(db, wf_exec.workflow_def).await { + Ok(Some(def)) => def, + _ => return CancellationPolicy::default(), + }; + + // Deserialise the stored JSON definition to extract the policy field. + match serde_json::from_value::(wf_def.definition) { + Ok(def) => def.cancellation_policy, + Err(e) => { + tracing::warn!( + "Failed to deserialise workflow definition for workflow_def {}: {}. \ + Falling back to AllowFinish cancellation policy.", + wf_exec.workflow_def, + e + ); + CancellationPolicy::default() + } + } +} + /// Cancel all incomplete child executions of a workflow parent execution. /// /// This handles the workflow cascade: when a workflow execution is cancelled, @@ -510,13 +547,35 @@ async fn send_cancel_to_worker(publisher: Option<&Publisher>, execution_id: i64, /// Additionally, the `workflow_execution` record is marked Cancelled so the /// scheduler's `advance_workflow` will short-circuit and not dispatch new tasks. /// -/// Children in pre-running states (Requested, Scheduling, Scheduled) are set -/// to Cancelled immediately. Children that are Running receive a cancel MQ -/// message so their worker can gracefully stop the process. +/// Behaviour depends on the workflow's [`CancellationPolicy`]: +/// +/// - **`AllowFinish`** (default): Children in pre-running states (Requested, +/// Scheduling, Scheduled) are set to Cancelled immediately. Running children +/// are left alone and will complete naturally; `advance_workflow` sees the +/// cancelled `workflow_execution` and will not dispatch further tasks. +/// +/// - **`CancelRunning`**: Pre-running children are cancelled as above. +/// Running children also receive a cancel MQ message so their worker can +/// gracefully stop the process (SIGINT → SIGTERM → SIGKILL). async fn cancel_workflow_children( db: &sqlx::PgPool, publisher: Option<&Publisher>, parent_execution_id: i64, +) { + // Determine the cancellation policy from the workflow definition. + let policy = resolve_cancellation_policy(db, parent_execution_id).await; + + cancel_workflow_children_with_policy(db, publisher, parent_execution_id, policy).await; +} + +/// Inner implementation that carries the resolved [`CancellationPolicy`] +/// through recursive calls so that nested child workflows inherit the +/// top-level policy. +async fn cancel_workflow_children_with_policy( + db: &sqlx::PgPool, + publisher: Option<&Publisher>, + parent_execution_id: i64, + policy: CancellationPolicy, ) { // Find all child executions that are still incomplete let children: Vec = match sqlx::query_as::< @@ -546,9 +605,10 @@ async fn cancel_workflow_children( } tracing::info!( - "Cascading cancellation from execution {} to {} child execution(s)", + "Cascading cancellation from execution {} to {} child execution(s) (policy: {:?})", parent_execution_id, - children.len() + children.len(), + policy, ); for child in &children { @@ -558,7 +618,7 @@ async fn cancel_workflow_children( child.status, ExecutionStatus::Requested | ExecutionStatus::Scheduling | ExecutionStatus::Scheduled ) { - // Pre-running: cancel immediately in DB + // Pre-running: cancel immediately in DB (both policies) let update = UpdateExecutionInput { status: Some(ExecutionStatus::Cancelled), result: Some(serde_json::json!({ @@ -575,29 +635,45 @@ async fn cancel_workflow_children( child.status, ExecutionStatus::Running | ExecutionStatus::Canceling ) { - // Running: set to Canceling and send MQ message to the worker - if child.status != ExecutionStatus::Canceling { - let update = UpdateExecutionInput { - status: Some(ExecutionStatus::Canceling), - ..Default::default() - }; - if let Err(e) = ExecutionRepository::update(db, child_id, update).await { - tracing::error!( - "Failed to set child execution {} to canceling: {}", - child_id, - e + match policy { + CancellationPolicy::CancelRunning => { + // Running: set to Canceling and send MQ message to the worker + if child.status != ExecutionStatus::Canceling { + let update = UpdateExecutionInput { + status: Some(ExecutionStatus::Canceling), + ..Default::default() + }; + if let Err(e) = ExecutionRepository::update(db, child_id, update).await { + tracing::error!( + "Failed to set child execution {} to canceling: {}", + child_id, + e + ); + } + } + + if let Some(worker_id) = child.executor { + send_cancel_to_worker(publisher, child_id, worker_id).await; + } + } + CancellationPolicy::AllowFinish => { + // Running tasks are allowed to complete naturally. + // advance_workflow will see the cancelled workflow_execution + // and will not dispatch any further tasks. + tracing::info!( + "AllowFinish policy: leaving running child execution {} alone", + child_id ); } } - - if let Some(worker_id) = child.executor { - send_cancel_to_worker(publisher, child_id, worker_id).await; - } } // Recursively cancel grandchildren (nested workflows) // Use Box::pin to allow the recursive async call - Box::pin(cancel_workflow_children(db, publisher, child_id)).await; + Box::pin(cancel_workflow_children_with_policy( + db, publisher, child_id, policy, + )) + .await; } // Also mark any associated workflow_execution record as Cancelled so that @@ -634,6 +710,56 @@ async fn cancel_workflow_children( } } } + + // If no children are still running (all were pre-running or were + // cancelled), finalize the parent execution as Cancelled immediately. + // Without this, the parent would stay stuck in "Canceling" because no + // task completion would trigger advance_workflow to finalize it. + let still_running: Vec = match sqlx::query_as::< + _, + attune_common::models::Execution, + >(&format!( + "SELECT {} FROM execution WHERE parent = $1 AND status IN ('running', 'canceling', 'scheduling', 'scheduled', 'requested')", + attune_common::repositories::execution::SELECT_COLUMNS + )) + .bind(parent_execution_id) + .fetch_all(db) + .await + { + Ok(rows) => rows, + Err(e) => { + tracing::error!( + "Failed to check remaining children for parent {}: {}", + parent_execution_id, + e + ); + return; + } + }; + + if still_running.is_empty() { + // No children left in flight — finalize the parent execution now. + let update = UpdateExecutionInput { + status: Some(ExecutionStatus::Cancelled), + result: Some(serde_json::json!({ + "error": "Workflow cancelled", + "succeeded": false, + })), + ..Default::default() + }; + if let Err(e) = ExecutionRepository::update(db, parent_execution_id, update).await { + tracing::error!( + "Failed to finalize parent execution {} as Cancelled: {}", + parent_execution_id, + e + ); + } else { + tracing::info!( + "Finalized parent execution {} as Cancelled (no running children remain)", + parent_execution_id + ); + } + } } /// Create execution routes diff --git a/crates/common/src/workflow/mod.rs b/crates/common/src/workflow/mod.rs index ba6dd4c..23234db 100644 --- a/crates/common/src/workflow/mod.rs +++ b/crates/common/src/workflow/mod.rs @@ -15,9 +15,9 @@ pub use pack_service::{ PackSyncResult, PackValidationResult, PackWorkflowService, PackWorkflowServiceConfig, }; pub use parser::{ - parse_workflow_file, parse_workflow_yaml, workflow_to_json, BackoffStrategy, DecisionBranch, - ParseError, ParseResult, PublishDirective, RetryConfig, Task, TaskTransition, TaskType, - WorkflowDefinition, + parse_workflow_file, parse_workflow_yaml, workflow_to_json, BackoffStrategy, + CancellationPolicy, DecisionBranch, ParseError, ParseResult, PublishDirective, RetryConfig, + Task, TaskTransition, TaskType, WorkflowDefinition, }; pub use registrar::{RegistrationOptions, RegistrationResult, WorkflowRegistrar}; pub use validator::{ValidationError, ValidationResult, WorkflowValidator}; diff --git a/crates/common/src/workflow/parser.rs b/crates/common/src/workflow/parser.rs index 85b6964..d32e408 100644 --- a/crates/common/src/workflow/parser.rs +++ b/crates/common/src/workflow/parser.rs @@ -127,6 +127,17 @@ pub struct WorkflowDefinition { /// Tags for categorization #[serde(default)] pub tags: Vec, + + /// Cancellation policy for the workflow. + /// + /// Controls what happens to running tasks when the workflow is cancelled: + /// - `allow_finish` (default): Running tasks are allowed to complete naturally. + /// Only pending/requested tasks are cancelled. The workflow waits for running + /// tasks to finish but does not dispatch any new tasks. + /// - `cancel_running`: All running and pending tasks are forcefully cancelled. + /// Running processes receive SIGINT → SIGTERM → SIGKILL via the worker. + #[serde(default, skip_serializing_if = "CancellationPolicy::is_default")] + pub cancellation_policy: CancellationPolicy, } // --------------------------------------------------------------------------- @@ -411,6 +422,27 @@ fn default_task_type() -> TaskType { TaskType::Action } +/// Policy controlling how running tasks are handled when a workflow is cancelled. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)] +#[serde(rename_all = "snake_case")] +pub enum CancellationPolicy { + /// Running tasks are allowed to complete naturally; only pending tasks are + /// cancelled and no new tasks are dispatched. This is the default. + #[default] + AllowFinish, + /// All running and pending tasks are forcefully cancelled. Running + /// processes receive SIGINT → SIGTERM → SIGKILL via the worker. + CancelRunning, +} + +impl CancellationPolicy { + /// Returns `true` when the value is the default ([`AllowFinish`]). + /// Used by `#[serde(skip_serializing_if)]` to keep stored JSON compact. + pub fn is_default(&self) -> bool { + matches!(self, Self::AllowFinish) + } +} + /// Task type enumeration #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] #[serde(rename_all = "lowercase")] @@ -1509,4 +1541,93 @@ tasks: panic!("Expected Simple publish directive"); } } + + #[test] + fn test_cancellation_policy_defaults_to_allow_finish() { + let yaml = r#" + version: "1.0.0" + tasks: + - name: task1 + action: core.echo + input: + message: hello + "#; + let workflow = parse_workflow_yaml(yaml).unwrap(); + assert_eq!( + workflow.cancellation_policy, + CancellationPolicy::AllowFinish + ); + } + + #[test] + fn test_cancellation_policy_cancel_running() { + let yaml = r#" + version: "1.0.0" + cancellation_policy: cancel_running + tasks: + - name: task1 + action: core.echo + input: + message: hello + "#; + let workflow = parse_workflow_yaml(yaml).unwrap(); + assert_eq!( + workflow.cancellation_policy, + CancellationPolicy::CancelRunning + ); + } + + #[test] + fn test_cancellation_policy_allow_finish_explicit() { + let yaml = r#" + version: "1.0.0" + cancellation_policy: allow_finish + tasks: + - name: task1 + action: core.echo + input: + message: hello + "#; + let workflow = parse_workflow_yaml(yaml).unwrap(); + assert_eq!( + workflow.cancellation_policy, + CancellationPolicy::AllowFinish + ); + } + + #[test] + fn test_cancellation_policy_json_roundtrip() { + let yaml = r#" + version: "1.0.0" + cancellation_policy: cancel_running + tasks: + - name: step1 + action: core.echo + input: + message: hello + "#; + let workflow = parse_workflow_yaml(yaml).unwrap(); + let json = workflow_to_json(&workflow).unwrap(); + let restored: WorkflowDefinition = serde_json::from_value(json).unwrap(); + assert_eq!( + restored.cancellation_policy, + CancellationPolicy::CancelRunning + ); + } + + #[test] + fn test_cancellation_policy_absent_in_json_defaults() { + // Simulate a definition stored in the DB before this field existed + let json = serde_json::json!({ + "ref": "test.wf", + "label": "Test", + "version": "1.0.0", + "tasks": [{"name": "t1", "action": "core.echo", "input": {"message": "hi"}}] + }); + let workflow: WorkflowDefinition = serde_json::from_value(json).unwrap(); + assert_eq!( + workflow.cancellation_policy, + CancellationPolicy::AllowFinish + ); + } } diff --git a/crates/executor/src/scheduler.rs b/crates/executor/src/scheduler.rs index d207993..8f0c524 100644 --- a/crates/executor/src/scheduler.rs +++ b/crates/executor/src/scheduler.rs @@ -17,7 +17,7 @@ use attune_common::{ mq::{Consumer, ExecutionRequestedPayload, MessageEnvelope, MessageType, Publisher}, repositories::{ action::ActionRepository, - execution::{CreateExecutionInput, ExecutionRepository}, + execution::{CreateExecutionInput, ExecutionRepository, UpdateExecutionInput}, runtime::{RuntimeRepository, WorkerRepository}, workflow::{ CreateWorkflowExecutionInput, WorkflowDefinitionRepository, WorkflowExecutionRepository, @@ -884,10 +884,10 @@ impl ExecutionScheduler { anyhow::anyhow!("Workflow execution {} not found", workflow_execution_id) })?; - // Already in a terminal state — nothing to do + // Already fully terminal (Completed / Failed) — nothing to do if matches!( workflow_execution.status, - ExecutionStatus::Completed | ExecutionStatus::Failed | ExecutionStatus::Cancelled + ExecutionStatus::Completed | ExecutionStatus::Failed ) { debug!( "Workflow execution {} already in terminal state {:?}, skipping advance", @@ -896,6 +896,41 @@ impl ExecutionScheduler { return Ok(()); } + // Cancelled workflow: don't dispatch new tasks, but check whether all + // running children have now finished. When none remain, finalize the + // parent execution as Cancelled so it doesn't stay stuck in "Canceling". + if workflow_execution.status == ExecutionStatus::Cancelled { + let running = Self::count_running_workflow_children( + pool, + workflow_execution_id, + &workflow_execution.completed_tasks, + &workflow_execution.failed_tasks, + ) + .await?; + + if running == 0 { + info!( + "Cancelled workflow_execution {} has no more running children, \ + finalizing parent execution {} as Cancelled", + workflow_execution_id, workflow_execution.execution + ); + Self::finalize_cancelled_workflow( + pool, + workflow_execution.execution, + workflow_execution_id, + ) + .await?; + } else { + debug!( + "Cancelled workflow_execution {} still has {} running children, \ + waiting for them to finish", + workflow_execution_id, running + ); + } + + return Ok(()); + } + // Load the workflow definition so we can apply param_schema defaults let workflow_def = WorkflowDefinitionRepository::find_by_id(pool, workflow_execution.workflow_def) @@ -1375,6 +1410,32 @@ impl ExecutionScheduler { Ok(count) } + /// Finalize a cancelled workflow by updating the parent `execution` record + /// to `Cancelled`. The `workflow_execution` record is already `Cancelled` + /// (set by `cancel_workflow_children`); this only touches the parent. + async fn finalize_cancelled_workflow( + pool: &PgPool, + parent_execution_id: i64, + workflow_execution_id: i64, + ) -> Result<()> { + info!( + "Finalizing cancelled workflow: parent execution {} (workflow_execution {})", + parent_execution_id, workflow_execution_id + ); + + let update = UpdateExecutionInput { + status: Some(ExecutionStatus::Cancelled), + result: Some(serde_json::json!({ + "error": "Workflow cancelled", + "succeeded": false, + })), + ..Default::default() + }; + ExecutionRepository::update(pool, parent_execution_id, update).await?; + + Ok(()) + } + /// Mark a workflow as completed (success or failure) and update both the /// `workflow_execution` and parent `execution` records. async fn complete_workflow( diff --git a/web/src/pages/actions/WorkflowBuilderPage.tsx b/web/src/pages/actions/WorkflowBuilderPage.tsx index 61755ca..bb0f12e 100644 --- a/web/src/pages/actions/WorkflowBuilderPage.tsx +++ b/web/src/pages/actions/WorkflowBuilderPage.tsx @@ -40,6 +40,7 @@ import type { WorkflowBuilderState, PaletteAction, TransitionPreset, + CancellationPolicy, } from "@/types/workflow"; import { generateUniqueTaskName, @@ -53,6 +54,7 @@ import { removeTaskFromTransitions, renameTaskInTransitions, findStartingTaskIds, + CANCELLATION_POLICY_LABELS, } from "@/types/workflow"; const INITIAL_STATE: WorkflowBuilderState = { @@ -67,6 +69,7 @@ const INITIAL_STATE: WorkflowBuilderState = { tasks: [], tags: [], enabled: true, + cancellationPolicy: "allow_finish", }; export default function WorkflowBuilderPage() { @@ -135,6 +138,7 @@ export default function WorkflowBuilderPage() { const name = refParts.length > 1 ? refParts.slice(1).join(".") : workflow.ref; + const defn = workflow.definition as Record | undefined; const builderState = definitionToBuilderState( { ref: workflow.ref, @@ -143,10 +147,15 @@ export default function WorkflowBuilderPage() { version: workflow.version, parameters: workflow.param_schema || undefined, output: workflow.out_schema || undefined, - tasks: - ((workflow.definition as Record) - ?.tasks as WorkflowYamlDefinition["tasks"]) || [], + vars: (defn?.vars as Record) || undefined, + tasks: (defn?.tasks as WorkflowYamlDefinition["tasks"]) || [], + output_map: (defn?.output_map as Record) || undefined, tags: workflow.tags, + cancellation_policy: + (defn?.cancellation_policy as + | "allow_finish" + | "cancel_running" + | undefined) || undefined, }, workflow.pack_ref, name, @@ -843,6 +852,24 @@ export default function WorkflowBuilderPage() { /> Enabled + diff --git a/web/src/types/workflow.ts b/web/src/types/workflow.ts index 0b9819f..8c33bad 100644 --- a/web/src/types/workflow.ts +++ b/web/src/types/workflow.ts @@ -185,6 +185,23 @@ export interface WorkflowEdge { labelPosition?: number; } +/** + * Cancellation policy for a workflow. + * + * Controls what happens to running tasks when a workflow is cancelled: + * - `allow_finish` (default): Running tasks complete naturally; only + * pending/requested tasks are cancelled and no new tasks are dispatched. + * - `cancel_running`: All running and pending tasks are forcefully cancelled. + * Running processes receive SIGINT → SIGTERM → SIGKILL via the worker. + */ +export type CancellationPolicy = "allow_finish" | "cancel_running"; + +/** Human-readable labels for each cancellation policy */ +export const CANCELLATION_POLICY_LABELS: Record = { + allow_finish: "Allow running tasks to finish", + cancel_running: "Cancel running tasks", +}; + /** Complete workflow builder state */ export interface WorkflowBuilderState { /** Workflow name (used to derive ref and filename) */ @@ -209,6 +226,8 @@ export interface WorkflowBuilderState { tags: string[]; /** Whether the workflow is enabled */ enabled: boolean; + /** Cancellation policy (default: allow_finish) */ + cancellationPolicy: CancellationPolicy; } /** Parameter definition in flat schema format */ @@ -238,6 +257,7 @@ export interface WorkflowYamlDefinition { tasks: WorkflowYamlTask[]; output_map?: Record; tags?: string[]; + cancellation_policy?: CancellationPolicy; } /** @@ -252,6 +272,7 @@ export interface WorkflowGraphDefinition { vars?: Record; tasks: WorkflowYamlTask[]; output_map?: Record; + cancellation_policy?: CancellationPolicy; } /** @@ -450,6 +471,10 @@ export function builderStateToDefinition( definition.tags = state.tags; } + if (state.cancellationPolicy !== "allow_finish") { + definition.cancellation_policy = state.cancellationPolicy; + } + return definition; } @@ -537,6 +562,10 @@ export function builderStateToGraph( graph.vars = state.vars; } + if (state.cancellationPolicy !== "allow_finish") { + graph.cancellation_policy = state.cancellationPolicy; + } + return graph; } @@ -723,6 +752,7 @@ export function definitionToBuilderState( tasks, tags: definition.tags || [], enabled: true, + cancellationPolicy: definition.cancellation_policy || "allow_finish", }; } diff --git a/work-summary/2026-03-09-workflow-cancellation-policy.md b/work-summary/2026-03-09-workflow-cancellation-policy.md new file mode 100644 index 0000000..a905546 --- /dev/null +++ b/work-summary/2026-03-09-workflow-cancellation-policy.md @@ -0,0 +1,54 @@ +# Work Summary: Workflow Cancellation Policy + +**Date**: 2026-03-09 + +## Overview + +Added configurable cancellation behavior for workflows. When a workflow is cancelled, the policy controls whether currently running tasks are allowed to finish (default) or are forcefully terminated via SIGTERM. + +## Changes + +### Backend (Rust) + +**`crates/common/src/workflow/parser.rs`** +- Added `CancellationPolicy` enum with two variants: `AllowFinish` (default) and `CancelRunning` +- Added `cancellation_policy` field to `WorkflowDefinition` with `#[serde(default, skip_serializing_if)]` for backward compatibility +- Added 5 unit tests covering default behavior, explicit values, JSON round-trip, and deserialization of legacy definitions without the field + +**`crates/common/src/workflow/mod.rs`** +- Re-exported `CancellationPolicy` from the workflow module + +**`crates/api/src/routes/executions.rs`** +- Added `resolve_cancellation_policy()` helper that loads the workflow definition from DB and extracts the policy (falls back to `AllowFinish` on any failure) +- Refactored `cancel_workflow_children()` to resolve the policy and delegate to `cancel_workflow_children_with_policy()` +- `cancel_workflow_children_with_policy()` respects the policy: + - **`AllowFinish`**: Only cancels pre-running children (Requested/Scheduling/Scheduled). Running children are left alone to complete naturally. `advance_workflow` sees the cancelled `workflow_execution` and stops dispatching new tasks. + - **`CancelRunning`**: Cancels all children including running ones via SIGINT→SIGTERM→SIGKILL MQ messages to workers (previous hard-coded behavior). +- Policy is inherited through recursive cancellation of nested workflows + +### Frontend (TypeScript/React) + +**`web/src/types/workflow.ts`** +- Added `CancellationPolicy` type and `CANCELLATION_POLICY_LABELS` constant +- Added `cancellationPolicy` field to `WorkflowBuilderState` +- Added `cancellation_policy` field to `WorkflowYamlDefinition` and `WorkflowGraphDefinition` +- Updated `builderStateToDefinition()` and `builderStateToGraph()` to include the field (omitted when default) +- Updated `definitionToBuilderState()` to read the field back + +**`web/src/pages/actions/WorkflowBuilderPage.tsx`** +- Added `cancellationPolicy` to `INITIAL_STATE` +- Added a dropdown select in the metadata row for choosing the cancellation policy +- The setting persists into both the full definition and the raw YAML graph view + +## Design Decisions + +- **Default is `AllowFinish`**: This is the safest behavior — running tasks complete naturally, preventing data corruption from interrupted operations. +- **No migration needed**: The field is stored inside the `workflow_definition.definition` JSONB column. `#[serde(default)]` handles existing definitions that lack the field. +- **Policy inherited by nested workflows**: When a parent workflow is cancelled, its cancellation policy propagates to all descendant workflows rather than each resolving its own. +- **`skip_serializing_if`**: The default value is omitted from serialized JSON to keep stored definitions compact and backward-compatible. + +## Testing + +- 29/29 workflow parser tests pass (5 new) +- Zero warnings across entire Rust workspace +- Zero TypeScript errors \ No newline at end of file