From 8e91440f23b935c67ce6b3c6fea79bb53ae527c0 Mon Sep 17 00:00:00 2001 From: David Culbreth Date: Thu, 2 Apr 2026 11:33:26 -0500 Subject: [PATCH] [WIP] making executor ha --- crates/common/src/repositories/execution.rs | 227 +++ crates/common/src/repositories/workflow.rs | 51 + .../tests/execution_repository_tests.rs | 105 ++ crates/executor/src/scheduler.rs | 1316 +++++++++++++++-- docs/plans/executor-ha-horizontal-scaling.md | 317 ++++ migrations/20250101000006_workflow_system.sql | 4 +- 6 files changed, 1876 insertions(+), 144 deletions(-) create mode 100644 docs/plans/executor-ha-horizontal-scaling.md diff --git a/crates/common/src/repositories/execution.rs b/crates/common/src/repositories/execution.rs index 574696f..2982e0d 100644 --- a/crates/common/src/repositories/execution.rs +++ b/crates/common/src/repositories/execution.rs @@ -41,6 +41,12 @@ pub struct ExecutionSearchResult { pub total: u64, } +#[derive(Debug, Clone)] +pub struct WorkflowTaskExecutionCreateOrGetResult { + pub execution: Execution, + pub created: bool, +} + /// An execution row with optional `rule_ref` / `trigger_ref` populated from /// the joined `enforcement` table. This avoids a separate in-memory lookup. #[derive(Debug, Clone, sqlx::FromRow)] @@ -209,6 +215,134 @@ impl Update for ExecutionRepository { } impl ExecutionRepository { + pub async fn create_workflow_task_if_absent<'e, E>( + executor: E, + input: CreateExecutionInput, + workflow_execution_id: Id, + task_name: &str, + task_index: Option, + ) -> Result + where + E: Executor<'e, Database = Postgres> + Copy + 'e, + { + if let Some(execution) = + Self::find_by_workflow_task(executor, workflow_execution_id, task_name, task_index) + .await? + { + return Ok(WorkflowTaskExecutionCreateOrGetResult { + execution, + created: false, + }); + } + + let execution = Self::create(executor, input).await?; + + Ok(WorkflowTaskExecutionCreateOrGetResult { + execution, + created: true, + }) + } + + pub async fn claim_for_scheduling<'e, E>( + executor: E, + id: Id, + claiming_executor: Option, + ) -> Result> + where + E: Executor<'e, Database = Postgres> + 'e, + { + let sql = format!( + "UPDATE execution \ + SET status = $2, executor = COALESCE($3, executor), updated = NOW() \ + WHERE id = $1 AND status = $4 \ + RETURNING {SELECT_COLUMNS}" + ); + + sqlx::query_as::<_, Execution>(&sql) + .bind(id) + .bind(ExecutionStatus::Scheduling) + .bind(claiming_executor) + .bind(ExecutionStatus::Requested) + .fetch_optional(executor) + .await + .map_err(Into::into) + } + + pub async fn reclaim_stale_scheduling<'e, E>( + executor: E, + id: Id, + claiming_executor: Option, + stale_before: DateTime, + ) -> Result> + where + E: Executor<'e, Database = Postgres> + 'e, + { + let sql = format!( + "UPDATE execution \ + SET executor = COALESCE($2, executor), updated = NOW() \ + WHERE id = $1 AND status = $3 AND updated <= $4 \ + RETURNING {SELECT_COLUMNS}" + ); + + sqlx::query_as::<_, Execution>(&sql) + .bind(id) + .bind(claiming_executor) + .bind(ExecutionStatus::Scheduling) + .bind(stale_before) + .fetch_optional(executor) + .await + .map_err(Into::into) + } + + pub async fn update_if_status<'e, E>( + executor: E, + id: Id, + expected_status: ExecutionStatus, + input: UpdateExecutionInput, + ) -> Result> + where + E: Executor<'e, Database = Postgres> + 'e, + { + if input.status.is_none() + && input.result.is_none() + && input.executor.is_none() + && input.worker.is_none() + && input.started_at.is_none() + && input.workflow_task.is_none() + { + return Self::find_by_id(executor, id).await; + } + + Self::update_with_locator_optional(executor, input, |query| { + query.push(" WHERE id = ").push_bind(id); + query.push(" AND status = ").push_bind(expected_status); + }) + .await + } + + pub async fn revert_scheduled_to_requested<'e, E>( + executor: E, + id: Id, + ) -> Result> + where + E: Executor<'e, Database = Postgres> + 'e, + { + let sql = format!( + "UPDATE execution \ + SET status = $2, worker = NULL, executor = NULL, updated = NOW() \ + WHERE id = $1 AND status = $3 \ + RETURNING {SELECT_COLUMNS}" + ); + + sqlx::query_as::<_, Execution>(&sql) + .bind(id) + .bind(ExecutionStatus::Requested) + .bind(ExecutionStatus::Scheduled) + .fetch_optional(executor) + .await + .map_err(Into::into) + } + async fn update_with_locator<'e, E, F>( executor: E, input: UpdateExecutionInput, @@ -274,6 +408,71 @@ impl ExecutionRepository { .map_err(Into::into) } + async fn update_with_locator_optional<'e, E, F>( + executor: E, + input: UpdateExecutionInput, + where_clause: F, + ) -> Result> + where + E: Executor<'e, Database = Postgres> + 'e, + F: FnOnce(&mut QueryBuilder<'_, Postgres>), + { + let mut query = QueryBuilder::new("UPDATE execution SET "); + let mut has_updates = false; + + if let Some(status) = input.status { + query.push("status = ").push_bind(status); + has_updates = true; + } + if let Some(result) = &input.result { + if has_updates { + query.push(", "); + } + query.push("result = ").push_bind(result); + has_updates = true; + } + if let Some(executor_id) = input.executor { + if has_updates { + query.push(", "); + } + query.push("executor = ").push_bind(executor_id); + has_updates = true; + } + if let Some(worker_id) = input.worker { + if has_updates { + query.push(", "); + } + query.push("worker = ").push_bind(worker_id); + has_updates = true; + } + if let Some(started_at) = input.started_at { + if has_updates { + query.push(", "); + } + query.push("started_at = ").push_bind(started_at); + has_updates = true; + } + if let Some(workflow_task) = &input.workflow_task { + if has_updates { + query.push(", "); + } + query + .push("workflow_task = ") + .push_bind(sqlx::types::Json(workflow_task)); + } + + query.push(", updated = NOW()"); + where_clause(&mut query); + query.push(" RETURNING "); + query.push(SELECT_COLUMNS); + + query + .build_query_as::() + .fetch_optional(executor) + .await + .map_err(Into::into) + } + /// Update an execution using the loaded row's hypertable keys. /// /// Including both the partition key (`created`) and compression segment key @@ -356,6 +555,34 @@ impl ExecutionRepository { .map_err(Into::into) } + pub async fn find_by_workflow_task<'e, E>( + executor: E, + workflow_execution_id: Id, + task_name: &str, + task_index: Option, + ) -> Result> + where + E: Executor<'e, Database = Postgres> + 'e, + { + let sql = format!( + "SELECT {SELECT_COLUMNS} \ + FROM execution \ + WHERE workflow_task->>'workflow_execution' = $1::text \ + AND workflow_task->>'task_name' = $2 \ + AND (workflow_task->>'task_index')::int IS NOT DISTINCT FROM $3 \ + ORDER BY created ASC \ + LIMIT 1" + ); + + sqlx::query_as::<_, Execution>(&sql) + .bind(workflow_execution_id.to_string()) + .bind(task_name) + .bind(task_index) + .fetch_optional(executor) + .await + .map_err(Into::into) + } + /// Find all child executions for a given parent execution ID. /// /// Returns child executions ordered by creation time (ascending), diff --git a/crates/common/src/repositories/workflow.rs b/crates/common/src/repositories/workflow.rs index 0e5dc4d..3f885b1 100644 --- a/crates/common/src/repositories/workflow.rs +++ b/crates/common/src/repositories/workflow.rs @@ -411,6 +411,12 @@ impl WorkflowDefinitionRepository { pub struct WorkflowExecutionRepository; +#[derive(Debug, Clone)] +pub struct WorkflowExecutionCreateOrGetResult { + pub workflow_execution: WorkflowExecution, + pub created: bool, +} + impl Repository for WorkflowExecutionRepository { type Entity = WorkflowExecution; fn table_name() -> &'static str { @@ -606,6 +612,51 @@ impl Delete for WorkflowExecutionRepository { } impl WorkflowExecutionRepository { + pub async fn create_or_get_by_execution<'e, E>( + executor: E, + input: CreateWorkflowExecutionInput, + ) -> Result + where + E: Executor<'e, Database = Postgres> + Copy + 'e, + { + let inserted = sqlx::query_as::<_, WorkflowExecution>( + "INSERT INTO workflow_execution + (execution, workflow_def, task_graph, variables, status) + VALUES ($1, $2, $3, $4, $5) + ON CONFLICT (execution) DO NOTHING + RETURNING id, execution, workflow_def, current_tasks, completed_tasks, failed_tasks, skipped_tasks, + variables, task_graph, status, error_message, paused, pause_reason, created, updated" + ) + .bind(input.execution) + .bind(input.workflow_def) + .bind(&input.task_graph) + .bind(&input.variables) + .bind(input.status) + .fetch_optional(executor) + .await?; + + if let Some(workflow_execution) = inserted { + return Ok(WorkflowExecutionCreateOrGetResult { + workflow_execution, + created: true, + }); + } + + let workflow_execution = Self::find_by_execution(executor, input.execution) + .await? + .ok_or_else(|| { + anyhow::anyhow!( + "workflow_execution for parent execution {} disappeared after conflict", + input.execution + ) + })?; + + Ok(WorkflowExecutionCreateOrGetResult { + workflow_execution, + created: false, + }) + } + /// Find workflow execution by the parent execution ID pub async fn find_by_execution<'e, E>( executor: E, diff --git a/crates/common/tests/execution_repository_tests.rs b/crates/common/tests/execution_repository_tests.rs index c30b7ba..7d87460 100644 --- a/crates/common/tests/execution_repository_tests.rs +++ b/crates/common/tests/execution_repository_tests.rs @@ -1153,3 +1153,108 @@ async fn test_execution_result_json() { assert_eq!(updated.result, Some(complex_result)); } + +#[tokio::test] +#[ignore = "integration test — requires database"] +async fn test_claim_for_scheduling_succeeds_once() { + let pool = create_test_pool().await.unwrap(); + + let pack = PackFixture::new_unique("claim_pack") + .create(&pool) + .await + .unwrap(); + let action = ActionFixture::new_unique(pack.id, &pack.r#ref, "claim_action") + .create(&pool) + .await + .unwrap(); + + let created = ExecutionRepository::create( + &pool, + CreateExecutionInput { + action: Some(action.id), + action_ref: action.r#ref.clone(), + config: None, + env_vars: None, + parent: None, + enforcement: None, + executor: None, + worker: None, + status: ExecutionStatus::Requested, + result: None, + workflow_task: None, + }, + ) + .await + .unwrap(); + + let first = ExecutionRepository::claim_for_scheduling(&pool, created.id, None) + .await + .unwrap(); + let second = ExecutionRepository::claim_for_scheduling(&pool, created.id, None) + .await + .unwrap(); + + assert_eq!(first.unwrap().status, ExecutionStatus::Scheduling); + assert!(second.is_none()); +} + +#[tokio::test] +#[ignore = "integration test — requires database"] +async fn test_update_if_status_only_updates_matching_row() { + let pool = create_test_pool().await.unwrap(); + + let pack = PackFixture::new_unique("conditional_pack") + .create(&pool) + .await + .unwrap(); + let action = ActionFixture::new_unique(pack.id, &pack.r#ref, "conditional_action") + .create(&pool) + .await + .unwrap(); + + let created = ExecutionRepository::create( + &pool, + CreateExecutionInput { + action: Some(action.id), + action_ref: action.r#ref.clone(), + config: None, + env_vars: None, + parent: None, + enforcement: None, + executor: None, + worker: None, + status: ExecutionStatus::Scheduling, + result: None, + workflow_task: None, + }, + ) + .await + .unwrap(); + + let updated = ExecutionRepository::update_if_status( + &pool, + created.id, + ExecutionStatus::Scheduling, + UpdateExecutionInput { + status: Some(ExecutionStatus::Scheduled), + worker: Some(77), + ..Default::default() + }, + ) + .await + .unwrap(); + let skipped = ExecutionRepository::update_if_status( + &pool, + created.id, + ExecutionStatus::Scheduling, + UpdateExecutionInput { + status: Some(ExecutionStatus::Failed), + ..Default::default() + }, + ) + .await + .unwrap(); + + assert_eq!(updated.unwrap().status, ExecutionStatus::Scheduled); + assert!(skipped.is_none()); +} diff --git a/crates/executor/src/scheduler.rs b/crates/executor/src/scheduler.rs index bead498..0b824f2 100644 --- a/crates/executor/src/scheduler.rs +++ b/crates/executor/src/scheduler.rs @@ -33,7 +33,7 @@ use attune_common::{ use chrono::Utc; use serde::{Deserialize, Serialize}; use serde_json::Value as JsonValue; -use sqlx::PgPool; +use sqlx::{PgConnection, PgPool}; use std::collections::HashMap; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; @@ -120,6 +120,7 @@ const DEFAULT_HEARTBEAT_INTERVAL: u64 = 30; /// Maximum age multiplier for heartbeat staleness check /// Workers are considered stale if heartbeat is older than HEARTBEAT_INTERVAL * HEARTBEAT_STALENESS_MULTIPLIER const HEARTBEAT_STALENESS_MULTIPLIER: u64 = 3; +const SCHEDULING_RECLAIM_GRACE_SECONDS: i64 = 30; impl ExecutionScheduler { fn retryable_mq_error(error: &anyhow::Error) -> Option { @@ -230,6 +231,37 @@ impl ExecutionScheduler { } }; + if execution.status == ExecutionStatus::Scheduling { + if let Some(execution) = ExecutionRepository::reclaim_stale_scheduling( + pool, + execution_id, + None, + Utc::now() - chrono::Duration::seconds(SCHEDULING_RECLAIM_GRACE_SECONDS), + ) + .await? + { + warn!( + "Reclaimed stale scheduling claim for execution {} after {}s", + execution_id, SCHEDULING_RECLAIM_GRACE_SECONDS + ); + return Self::process_claimed_execution( + pool, + publisher, + policy_enforcer, + round_robin_counter, + envelope, + execution, + ) + .await; + } + + return Err(MqError::Timeout(format!( + "Execution {} is already being scheduled; retry later", + execution_id + )) + .into()); + } + if execution.status != ExecutionStatus::Requested { debug!( "Skipping execution {} with status {:?}; only Requested executions are schedulable", @@ -240,6 +272,43 @@ impl ExecutionScheduler { return Ok(()); } + let execution = + match ExecutionRepository::claim_for_scheduling(pool, execution_id, None).await? { + Some(execution) => execution, + None => { + return Self::handle_failed_scheduling_claim( + pool, + publisher, + policy_enforcer, + round_robin_counter, + envelope, + execution_id, + ) + .await; + } + }; + + Self::process_claimed_execution( + pool, + publisher, + policy_enforcer, + round_robin_counter, + envelope, + execution, + ) + .await + } + + async fn process_claimed_execution( + pool: &PgPool, + publisher: &Publisher, + policy_enforcer: &PolicyEnforcer, + round_robin_counter: &AtomicUsize, + envelope: &MessageEnvelope, + execution: Execution, + ) -> Result<()> { + let execution_id = execution.id; + // Fetch action to determine runtime requirements let action = Self::get_action_for_execution(pool, &execution).await?; @@ -249,7 +318,7 @@ impl ExecutionScheduler { "Action '{}' is a workflow, orchestrating instead of dispatching to worker", action.r#ref ); - return Self::process_workflow_execution( + let result = Self::process_workflow_execution( pool, publisher, round_robin_counter, @@ -257,6 +326,10 @@ impl ExecutionScheduler { &action, ) .await; + if result.is_err() { + Self::revert_scheduling_claim(pool, execution_id).await?; + } + return result; } // Apply parameter defaults from the action's param_schema. @@ -287,6 +360,23 @@ impl ExecutionScheduler { .await { Ok(SchedulingPolicyOutcome::Queued) => { + if ExecutionRepository::update_if_status( + pool, + execution_id, + ExecutionStatus::Scheduling, + UpdateExecutionInput { + status: Some(ExecutionStatus::Requested), + ..Default::default() + }, + ) + .await? + .is_none() + { + warn!( + "Execution {} could not be returned to Requested after queueing", + execution_id + ); + } info!( "Execution {} queued by policy for action {}; deferring worker selection", execution_id, action.id @@ -316,6 +406,23 @@ impl ExecutionScheduler { return Ok(()); } + if ExecutionRepository::update_if_status( + pool, + execution_id, + ExecutionStatus::Scheduling, + UpdateExecutionInput { + status: Some(ExecutionStatus::Requested), + ..Default::default() + }, + ) + .await? + .is_none() + { + warn!( + "Execution {} lost its scheduling claim before policy retry cleanup", + execution_id + ); + } return Err(err); } } @@ -343,6 +450,23 @@ impl ExecutionScheduler { Err(err) => { Self::release_acquired_policy_slot(policy_enforcer, pool, publisher, execution_id) .await?; + if ExecutionRepository::update_if_status( + pool, + execution_id, + ExecutionStatus::Scheduling, + UpdateExecutionInput { + status: Some(ExecutionStatus::Requested), + ..Default::default() + }, + ) + .await? + .is_none() + { + warn!( + "Execution {} lost its scheduling claim before worker-selection retry cleanup", + execution_id + ); + } return Err(err); } }; @@ -354,20 +478,29 @@ impl ExecutionScheduler { // Persist the selected worker so later cancellation requests can be // routed to the correct per-worker cancel queue. - let mut execution_for_update = execution; - execution_for_update.status = ExecutionStatus::Scheduled; - execution_for_update.worker = Some(worker.id); - if let Err(err) = ExecutionRepository::update_loaded( + let scheduled_execution = match ExecutionRepository::update_if_status( pool, - &execution_for_update, - execution_for_update.clone().into(), + execution_id, + ExecutionStatus::Scheduling, + UpdateExecutionInput { + status: Some(ExecutionStatus::Scheduled), + worker: Some(worker.id), + ..Default::default() + }, ) - .await + .await? { - Self::release_acquired_policy_slot(policy_enforcer, pool, publisher, execution_id) - .await?; - return Err(err.into()); - } + Some(execution) => execution, + None => { + warn!( + "Execution {} left Scheduling before worker {} could be assigned", + execution_id, worker.id + ); + Self::release_acquired_policy_slot(policy_enforcer, pool, publisher, execution_id) + .await?; + return Ok(()); + } + }; // Publish message to worker-specific queue if let Err(err) = Self::queue_to_worker( @@ -380,34 +513,93 @@ impl ExecutionScheduler { ) .await { - if let Err(revert_err) = ExecutionRepository::update( - pool, - execution_id, - UpdateExecutionInput { - status: Some(ExecutionStatus::Requested), - ..Default::default() - }, - ) - .await + if let Err(revert_err) = + Self::revert_scheduled_execution(pool, execution_id, policy_enforcer, publisher) + .await { warn!( "Failed to revert execution {} back to Requested after worker publish error: {}", execution_id, revert_err ); } - Self::release_acquired_policy_slot(policy_enforcer, pool, publisher, execution_id) - .await?; return Err(err); } info!( "Execution {} scheduled to worker {}", - execution_id, worker.id + execution_id, + scheduled_execution.worker.unwrap_or(worker.id) ); Ok(()) } + async fn handle_failed_scheduling_claim( + pool: &PgPool, + publisher: &Publisher, + policy_enforcer: &PolicyEnforcer, + round_robin_counter: &AtomicUsize, + envelope: &MessageEnvelope, + execution_id: i64, + ) -> Result<()> { + let execution = match ExecutionRepository::find_by_id(pool, execution_id).await? { + Some(execution) => execution, + None => { + Self::remove_queued_policy_execution( + policy_enforcer, + pool, + publisher, + execution_id, + ) + .await; + return Ok(()); + } + }; + + match execution.status { + ExecutionStatus::Requested => Err(MqError::Timeout(format!( + "Execution {} changed while claiming; retry later", + execution_id + )) + .into()), + ExecutionStatus::Scheduling => { + if let Some(execution) = ExecutionRepository::reclaim_stale_scheduling( + pool, + execution_id, + None, + Utc::now() - chrono::Duration::seconds(SCHEDULING_RECLAIM_GRACE_SECONDS), + ) + .await? + { + warn!( + "Recovered stale scheduling claim for execution {} after failed initial claim", + execution_id + ); + return Self::process_claimed_execution( + pool, + publisher, + policy_enforcer, + round_robin_counter, + envelope, + execution, + ) + .await; + } + + Err(MqError::Timeout(format!( + "Execution {} is still being scheduled; retry later", + execution_id + )) + .into()) + } + _ => { + Self::cleanup_unclaimable_execution(policy_enforcer, pool, publisher, execution_id) + .await?; + Ok(()) + } + } + } + // ----------------------------------------------------------------------- // Workflow orchestration // ----------------------------------------------------------------------- @@ -462,8 +654,7 @@ impl ExecutionScheduler { let initial_vars: JsonValue = serde_json::to_value(&definition.vars).unwrap_or_else(|_| serde_json::json!({})); - // Create workflow_execution record - let workflow_execution = WorkflowExecutionRepository::create( + let workflow_execution_result = WorkflowExecutionRepository::create_or_get_by_execution( pool, CreateWorkflowExecutionInput { execution: execution.id, @@ -474,17 +665,19 @@ impl ExecutionScheduler { }, ) .await?; + let workflow_execution = workflow_execution_result.workflow_execution; - info!( - "Created workflow_execution {} for workflow '{}' (parent execution {})", - workflow_execution.id, workflow_def.r#ref, execution.id - ); - - // Mark the parent execution as Running - let mut running_exec = execution.clone(); - running_exec.status = ExecutionStatus::Running; - ExecutionRepository::update_loaded(pool, &running_exec, running_exec.clone().into()) - .await?; + if workflow_execution_result.created { + info!( + "Created workflow_execution {} for workflow '{}' (parent execution {})", + workflow_execution.id, workflow_def.r#ref, execution.id + ); + } else { + info!( + "Reusing existing workflow_execution {} for workflow '{}' (parent execution {})", + workflow_execution.id, workflow_def.r#ref, execution.id + ); + } if graph.entry_points.is_empty() { warn!( @@ -495,6 +688,32 @@ impl ExecutionScheduler { return Ok(()); } + if ExecutionRepository::update_if_status( + pool, + execution.id, + ExecutionStatus::Scheduling, + UpdateExecutionInput { + status: Some(ExecutionStatus::Running), + ..Default::default() + }, + ) + .await? + .is_none() + { + let current = ExecutionRepository::find_by_id(pool, execution.id).await?; + if !matches!( + current.as_ref().map(|execution| execution.status), + Some( + ExecutionStatus::Running | ExecutionStatus::Completed | ExecutionStatus::Failed + ) + ) { + return Err(anyhow::anyhow!( + "Workflow parent execution {} left Scheduling before entry dispatch", + execution.id + )); + } + } + // Build initial workflow context from execution parameters and // workflow-level vars so that entry-point task inputs are rendered. // Apply defaults from the workflow's param_schema for any parameters @@ -517,7 +736,7 @@ impl ExecutionScheduler { // For each entry-point task, create a child execution and dispatch it for entry_task_name in &graph.entry_points { if let Some(task_node) = graph.get_task(entry_task_name) { - Self::dispatch_workflow_task( + Self::dispatch_or_resume_entry_workflow_task( pool, publisher, round_robin_counter, @@ -539,6 +758,85 @@ impl ExecutionScheduler { Ok(()) } + #[allow(clippy::too_many_arguments)] + async fn dispatch_or_resume_entry_workflow_task( + pool: &PgPool, + publisher: &Publisher, + round_robin_counter: &AtomicUsize, + parent_execution: &Execution, + workflow_execution_id: &i64, + task_node: &crate::workflow::graph::TaskNode, + wf_ctx: &WorkflowContext, + triggered_by: Option<&str>, + ) -> Result<()> { + let existing_children: Vec<(i64, Option, ExecutionStatus)> = sqlx::query_as( + "SELECT id, action, status \ + FROM execution \ + WHERE workflow_task->>'workflow_execution' = $1::text \ + AND workflow_task->>'task_name' = $2 \ + ORDER BY created ASC", + ) + .bind(workflow_execution_id.to_string()) + .bind(task_node.name.as_str()) + .fetch_all(pool) + .await?; + + if existing_children.is_empty() { + return Self::dispatch_workflow_task( + pool, + publisher, + round_robin_counter, + parent_execution, + workflow_execution_id, + task_node, + wf_ctx, + triggered_by, + ) + .await; + } + + if task_node.with_items.is_some() { + return Self::dispatch_workflow_task( + pool, + publisher, + round_robin_counter, + parent_execution, + workflow_execution_id, + task_node, + wf_ctx, + triggered_by, + ) + .await; + } + + for (child_id, action_id, status) in existing_children { + if status == ExecutionStatus::Requested { + let action_id = action_id.ok_or_else(|| { + anyhow::anyhow!( + "Workflow child execution {} has no action id while resuming task '{}'", + child_id, + task_node.name + ) + })?; + let child = ExecutionRepository::find_by_id(pool, child_id) + .await? + .ok_or_else(|| anyhow::anyhow!("Execution {} not found", child_id))?; + + Self::publish_execution_requested( + pool, + publisher, + child_id, + action_id, + &child.action_ref, + parent_execution, + ) + .await?; + } + } + + Ok(()) + } + /// Create a child execution for a single workflow task and dispatch it to /// a worker. The child execution references the parent workflow execution /// via `workflow_task` metadata. @@ -657,8 +955,9 @@ impl ExecutionScheduler { completed_at: None, }; - // Create child execution record - let child_execution = ExecutionRepository::create( + // Create child execution record, or reuse an existing one if another + // scheduler/advance path already dispatched this workflow task. + let child_execution_result = ExecutionRepository::create_workflow_task_if_absent( pool, CreateExecutionInput { action: Some(task_action.id), @@ -673,35 +972,210 @@ impl ExecutionScheduler { result: None, workflow_task: Some(workflow_task), }, + *workflow_execution_id, + &task_node.name, + None, ) .await?; + let child_execution = child_execution_result.execution; - info!( - "Created child execution {} for workflow task '{}' (action '{}', workflow_execution {})", - child_execution.id, task_node.name, action_ref, workflow_execution_id - ); + if child_execution_result.created { + info!( + "Created child execution {} for workflow task '{}' (action '{}', workflow_execution {})", + child_execution.id, task_node.name, action_ref, workflow_execution_id + ); + } else { + debug!( + "Reusing child execution {} for workflow task '{}' (workflow_execution {})", + child_execution.id, task_node.name, workflow_execution_id + ); + } - // If the task's action is itself a workflow, the recursive - // `process_execution_requested` call will detect that and orchestrate - // it in turn. For regular actions it will be dispatched to a worker. - let payload = ExecutionRequestedPayload { - execution_id: child_execution.id, - action_id: Some(task_action.id), - action_ref: action_ref.clone(), - parent_id: Some(parent_execution.id), - enforcement_id: parent_execution.enforcement, - config: child_execution.config.clone(), + if child_execution.status == ExecutionStatus::Requested { + // If the task's action is itself a workflow, the recursive + // `process_execution_requested` call will detect that and orchestrate + // it in turn. For regular actions it will be dispatched to a worker. + let payload = ExecutionRequestedPayload { + execution_id: child_execution.id, + action_id: Some(task_action.id), + action_ref: action_ref.clone(), + parent_id: Some(parent_execution.id), + enforcement_id: parent_execution.enforcement, + config: child_execution.config.clone(), + }; + + let envelope = MessageEnvelope::new(MessageType::ExecutionRequested, payload) + .with_source("executor-scheduler"); + + publisher.publish_envelope(&envelope).await?; + + info!( + "Published ExecutionRequested for child execution {} (task '{}')", + child_execution.id, task_node.name + ); + } + + Ok(()) + } + + #[allow(clippy::too_many_arguments)] + async fn dispatch_workflow_task_with_conn( + conn: &mut PgConnection, + publisher: &Publisher, + _round_robin_counter: &AtomicUsize, + parent_execution: &Execution, + workflow_execution_id: &i64, + task_node: &crate::workflow::graph::TaskNode, + wf_ctx: &WorkflowContext, + triggered_by: Option<&str>, + ) -> Result<()> { + let action_ref: String = match &task_node.action { + Some(a) => a.clone(), + None => { + warn!( + "Workflow task '{}' has no action reference, skipping", + task_node.name + ); + return Ok(()); + } }; - let envelope = MessageEnvelope::new(MessageType::ExecutionRequested, payload) - .with_source("executor-scheduler"); + let task_action = ActionRepository::find_by_ref(&mut *conn, &action_ref).await?; + let task_action = match task_action { + Some(a) => a, + None => { + error!( + "Action '{}' not found for workflow task '{}'", + action_ref, task_node.name + ); + return Err(anyhow::anyhow!( + "Action '{}' not found for workflow task '{}'", + action_ref, + task_node.name + )); + } + }; - publisher.publish_envelope(&envelope).await?; + if let Some(ref with_items_expr) = task_node.with_items { + return Self::dispatch_with_items_task_with_conn( + conn, + publisher, + parent_execution, + workflow_execution_id, + task_node, + &task_action, + &action_ref, + with_items_expr, + wf_ctx, + triggered_by, + ) + .await; + } - info!( - "Published ExecutionRequested for child execution {} (task '{}')", - child_execution.id, task_node.name - ); + let rendered_input = + if task_node.input.is_object() && !task_node.input.as_object().unwrap().is_empty() { + match wf_ctx.render_json(&task_node.input) { + Ok(rendered) => rendered, + Err(e) => { + warn!( + "Template rendering failed for task '{}': {}. Using raw input.", + task_node.name, e + ); + task_node.input.clone() + } + } + } else { + task_node.input.clone() + }; + + let task_config: Option = + if rendered_input.is_object() && !rendered_input.as_object().unwrap().is_empty() { + Some(rendered_input.clone()) + } else { + parent_execution.config.clone() + }; + + let workflow_task = WorkflowTaskMetadata { + workflow_execution: *workflow_execution_id, + task_name: task_node.name.clone(), + triggered_by: triggered_by.map(String::from), + task_index: None, + task_batch: None, + retry_count: 0, + max_retries: task_node + .retry + .as_ref() + .map(|r| r.count as i32) + .unwrap_or(0), + next_retry_at: None, + timeout_seconds: task_node.timeout.map(|t| t as i32), + timed_out: false, + duration_ms: None, + started_at: None, + completed_at: None, + }; + + let child_execution = if let Some(existing) = ExecutionRepository::find_by_workflow_task( + &mut *conn, + *workflow_execution_id, + &task_node.name, + None, + ) + .await? + { + existing + } else { + ExecutionRepository::create( + &mut *conn, + CreateExecutionInput { + action: Some(task_action.id), + action_ref: action_ref.clone(), + config: task_config, + env_vars: parent_execution.env_vars.clone(), + parent: Some(parent_execution.id), + enforcement: parent_execution.enforcement, + executor: None, + worker: None, + status: ExecutionStatus::Requested, + result: None, + workflow_task: Some(workflow_task), + }, + ) + .await? + }; + + if child_execution.status == ExecutionStatus::Requested { + info!( + "Created child execution {} for workflow task '{}' (action '{}', workflow_execution {})", + child_execution.id, task_node.name, action_ref, workflow_execution_id + ); + } else { + debug!( + "Reusing child execution {} for workflow task '{}' (workflow_execution {})", + child_execution.id, task_node.name, workflow_execution_id + ); + } + + if child_execution.status == ExecutionStatus::Requested { + let payload = ExecutionRequestedPayload { + execution_id: child_execution.id, + action_id: Some(task_action.id), + action_ref: action_ref.clone(), + parent_id: Some(parent_execution.id), + enforcement_id: parent_execution.enforcement, + config: child_execution.config.clone(), + }; + + let envelope = MessageEnvelope::new(MessageType::ExecutionRequested, payload) + .with_source("executor-scheduler"); + + publisher.publish_envelope(&envelope).await?; + + info!( + "Published ExecutionRequested for child execution {} (task '{}')", + child_execution.id, task_node.name + ); + } Ok(()) } @@ -767,9 +1241,37 @@ impl ExecutionScheduler { // Phase 1: Create ALL child execution records in the database. // Each row captures the fully-rendered input so we never need to // re-render templates later when publishing deferred items. + let existing_children: Vec<(i64, i32, ExecutionStatus)> = sqlx::query_as( + "SELECT id, COALESCE((workflow_task->>'task_index')::int, -1) as task_index, status \ + FROM execution \ + WHERE workflow_task->>'workflow_execution' = $1::text \ + AND workflow_task->>'task_name' = $2 \ + AND workflow_task->>'task_index' IS NOT NULL \ + ORDER BY (workflow_task->>'task_index')::int ASC", + ) + .bind(workflow_execution_id.to_string()) + .bind(task_node.name.as_str()) + .fetch_all(pool) + .await?; + + let existing_by_index: std::collections::HashMap = + existing_children + .into_iter() + .filter_map(|(id, task_index, status)| { + usize::try_from(task_index) + .ok() + .map(|index| (index, (id, status))) + }) + .collect(); + let mut child_ids: Vec = Vec::with_capacity(total); for (index, item) in items.iter().enumerate() { + if let Some((existing_id, _)) = existing_by_index.get(&index) { + child_ids.push(*existing_id); + continue; + } + let mut item_ctx = wf_ctx.clone(); item_ctx.set_current_item(item.clone(), index); @@ -819,7 +1321,7 @@ impl ExecutionScheduler { completed_at: None, }; - let child_execution = ExecutionRepository::create( + let child_execution_result = ExecutionRepository::create_workflow_task_if_absent( pool, CreateExecutionInput { action: Some(task_action.id), @@ -834,14 +1336,26 @@ impl ExecutionScheduler { result: None, workflow_task: Some(workflow_task), }, + *workflow_execution_id, + &task_node.name, + Some(index as i32), ) .await?; + let child_execution = child_execution_result.execution; - info!( - "Created with_items child execution {} for task '{}' item {} \ - (action '{}', workflow_execution {})", - child_execution.id, task_node.name, index, action_ref, workflow_execution_id - ); + if child_execution_result.created { + info!( + "Created with_items child execution {} for task '{}' item {} \ + (action '{}', workflow_execution {})", + child_execution.id, task_node.name, index, action_ref, workflow_execution_id + ); + } else { + debug!( + "Reusing with_items child execution {} for task '{}' item {} \ + (workflow_execution {})", + child_execution.id, task_node.name, index, workflow_execution_id + ); + } child_ids.push(child_execution.id); } @@ -850,15 +1364,216 @@ impl ExecutionScheduler { // The rest stay at Requested status until advance_workflow picks // them up as earlier items complete. for &child_id in child_ids.iter().take(dispatch_count) { - Self::publish_execution_requested( - pool, - publisher, - child_id, - task_action.id, - action_ref, - parent_execution, - ) - .await?; + let child = ExecutionRepository::find_by_id(pool, child_id) + .await? + .ok_or_else(|| anyhow::anyhow!("Execution {} not found", child_id))?; + + if child.status == ExecutionStatus::Requested { + Self::publish_execution_requested( + pool, + publisher, + child_id, + task_action.id, + action_ref, + parent_execution, + ) + .await?; + } + } + + info!( + "Dispatched {} of {} with_items child executions for task '{}'", + dispatch_count, total, task_node.name + ); + + Ok(()) + } + + #[allow(clippy::too_many_arguments)] + async fn dispatch_with_items_task_with_conn( + conn: &mut PgConnection, + publisher: &Publisher, + parent_execution: &Execution, + workflow_execution_id: &i64, + task_node: &crate::workflow::graph::TaskNode, + task_action: &Action, + action_ref: &str, + with_items_expr: &str, + wf_ctx: &WorkflowContext, + triggered_by: Option<&str>, + ) -> Result<()> { + let items_value = wf_ctx + .render_json(&JsonValue::String(with_items_expr.to_string())) + .map_err(|e| { + anyhow::anyhow!( + "Failed to resolve with_items expression '{}' for task '{}': {}", + with_items_expr, + task_node.name, + e + ) + })?; + + let items = match items_value.as_array() { + Some(arr) => arr.clone(), + None => { + warn!( + "with_items for task '{}' resolved to non-array value: {:?}. \ + Wrapping in single-element array.", + task_node.name, items_value + ); + vec![items_value] + } + }; + + let total = items.len(); + let concurrency_limit = task_node.concurrency.unwrap_or(1); + let dispatch_count = total.min(concurrency_limit); + + info!( + "Expanding with_items for task '{}': {} items (concurrency: {}, dispatching first {})", + task_node.name, total, concurrency_limit, dispatch_count + ); + + let existing_children: Vec<(i64, i32, ExecutionStatus)> = sqlx::query_as( + "SELECT id, COALESCE((workflow_task->>'task_index')::int, -1) as task_index, status \ + FROM execution \ + WHERE workflow_task->>'workflow_execution' = $1::text \ + AND workflow_task->>'task_name' = $2 \ + AND workflow_task->>'task_index' IS NOT NULL \ + ORDER BY (workflow_task->>'task_index')::int ASC", + ) + .bind(workflow_execution_id.to_string()) + .bind(task_node.name.as_str()) + .fetch_all(&mut *conn) + .await?; + + let existing_by_index: HashMap = existing_children + .into_iter() + .filter_map(|(id, task_index, status)| { + usize::try_from(task_index) + .ok() + .map(|index| (index, (id, status))) + }) + .collect(); + + let mut child_ids: Vec = Vec::with_capacity(total); + + for (index, item) in items.iter().enumerate() { + if let Some((existing_id, _)) = existing_by_index.get(&index) { + child_ids.push(*existing_id); + continue; + } + + let mut item_ctx = wf_ctx.clone(); + item_ctx.set_current_item(item.clone(), index); + + let rendered_input = if task_node.input.is_object() + && !task_node.input.as_object().unwrap().is_empty() + { + match item_ctx.render_json(&task_node.input) { + Ok(rendered) => rendered, + Err(e) => { + warn!( + "Template rendering failed for task '{}' item {}: {}. Using raw input.", + task_node.name, index, e + ); + task_node.input.clone() + } + } + } else { + task_node.input.clone() + }; + + let task_config: Option = + if rendered_input.is_object() && !rendered_input.as_object().unwrap().is_empty() { + Some(rendered_input.clone()) + } else { + parent_execution.config.clone() + }; + + let workflow_task = WorkflowTaskMetadata { + workflow_execution: *workflow_execution_id, + task_name: task_node.name.clone(), + triggered_by: triggered_by.map(String::from), + task_index: Some(index as i32), + task_batch: None, + retry_count: 0, + max_retries: task_node + .retry + .as_ref() + .map(|r| r.count as i32) + .unwrap_or(0), + next_retry_at: None, + timeout_seconds: task_node.timeout.map(|t| t as i32), + timed_out: false, + duration_ms: None, + started_at: None, + completed_at: None, + }; + + let child_execution = if let Some(existing) = + ExecutionRepository::find_by_workflow_task( + &mut *conn, + *workflow_execution_id, + &task_node.name, + Some(index as i32), + ) + .await? + { + existing + } else { + ExecutionRepository::create( + &mut *conn, + CreateExecutionInput { + action: Some(task_action.id), + action_ref: action_ref.to_string(), + config: task_config, + env_vars: parent_execution.env_vars.clone(), + parent: Some(parent_execution.id), + enforcement: parent_execution.enforcement, + executor: None, + worker: None, + status: ExecutionStatus::Requested, + result: None, + workflow_task: Some(workflow_task), + }, + ) + .await? + }; + + if child_execution.status == ExecutionStatus::Requested { + info!( + "Created with_items child execution {} for task '{}' item {} \ + (action '{}', workflow_execution {})", + child_execution.id, task_node.name, index, action_ref, workflow_execution_id + ); + } else { + debug!( + "Reusing with_items child execution {} for task '{}' item {} \ + (workflow_execution {})", + child_execution.id, task_node.name, index, workflow_execution_id + ); + } + + child_ids.push(child_execution.id); + } + + for &child_id in child_ids.iter().take(dispatch_count) { + let child = ExecutionRepository::find_by_id(&mut *conn, child_id) + .await? + .ok_or_else(|| anyhow::anyhow!("Execution {} not found", child_id))?; + + if child.status == ExecutionStatus::Requested { + Self::publish_execution_requested_with_conn( + &mut *conn, + publisher, + child_id, + task_action.id, + action_ref, + parent_execution, + ) + .await?; + } } info!( @@ -907,6 +1622,40 @@ impl ExecutionScheduler { Ok(()) } + async fn publish_execution_requested_with_conn( + conn: &mut PgConnection, + publisher: &Publisher, + execution_id: i64, + action_id: i64, + action_ref: &str, + parent_execution: &Execution, + ) -> Result<()> { + let child = ExecutionRepository::find_by_id(&mut *conn, execution_id) + .await? + .ok_or_else(|| anyhow::anyhow!("Execution {} not found", execution_id))?; + + let payload = ExecutionRequestedPayload { + execution_id: child.id, + action_id: Some(action_id), + action_ref: action_ref.to_string(), + parent_id: Some(parent_execution.id), + enforcement_id: parent_execution.enforcement, + config: child.config.clone(), + }; + + let envelope = MessageEnvelope::new(MessageType::ExecutionRequested, payload) + .with_source("executor-scheduler"); + + publisher.publish_envelope(&envelope).await?; + + debug!( + "Published deferred ExecutionRequested for child execution {}", + execution_id + ); + + Ok(()) + } + /// Publish the next `Requested`-status with_items siblings to fill freed /// concurrency slots. /// @@ -916,6 +1665,7 @@ impl ExecutionScheduler { /// concurrency window. /// /// Returns the number of items dispatched. + #[allow(dead_code)] async fn publish_pending_with_items_children( pool: &PgPool, publisher: &Publisher, @@ -982,6 +1732,70 @@ impl ExecutionScheduler { Ok(dispatched) } + async fn publish_pending_with_items_children_with_conn( + conn: &mut PgConnection, + publisher: &Publisher, + parent_execution: &Execution, + workflow_execution_id: i64, + task_name: &str, + slots: usize, + ) -> Result { + if slots == 0 { + return Ok(0); + } + + let pending_rows: Vec<(i64, i64)> = sqlx::query_as( + "SELECT id, COALESCE(action, 0) as action_id \ + FROM execution \ + WHERE workflow_task->>'workflow_execution' = $1::text \ + AND workflow_task->>'task_name' = $2 \ + AND status = 'requested' \ + ORDER BY (workflow_task->>'task_index')::int ASC \ + LIMIT $3", + ) + .bind(workflow_execution_id.to_string()) + .bind(task_name) + .bind(slots as i64) + .fetch_all(&mut *conn) + .await?; + + let mut dispatched = 0usize; + for (child_id, action_id) in &pending_rows { + let child = match ExecutionRepository::find_by_id(&mut *conn, *child_id).await? { + Some(c) => c, + None => continue, + }; + + if let Err(e) = Self::publish_execution_requested_with_conn( + &mut *conn, + publisher, + *child_id, + *action_id, + &child.action_ref, + parent_execution, + ) + .await + { + error!( + "Failed to publish pending with_items child {}: {}", + child_id, e + ); + } else { + dispatched += 1; + } + } + + if dispatched > 0 { + info!( + "Published {} pending with_items children for task '{}' \ + (workflow_execution {})", + dispatched, task_name, workflow_execution_id + ); + } + + Ok(dispatched) + } + /// Advance a workflow after a child task completes. Called from the /// completion listener when it detects that the completed execution has /// `workflow_task` metadata. @@ -993,6 +1807,40 @@ impl ExecutionScheduler { publisher: &Publisher, round_robin_counter: &AtomicUsize, execution: &Execution, + ) -> Result<()> { + let workflow_execution_id = match execution.workflow_task.as_ref() { + Some(workflow_task) => workflow_task.workflow_execution, + None => return Ok(()), + }; + + let mut lock_conn = pool.acquire().await?; + sqlx::query("SELECT pg_advisory_lock($1)") + .bind(workflow_execution_id) + .execute(&mut *lock_conn) + .await?; + + let result = Self::advance_workflow_serialized( + &mut lock_conn, + publisher, + round_robin_counter, + execution, + ) + .await; + let unlock_result = sqlx::query("SELECT pg_advisory_unlock($1)") + .bind(workflow_execution_id) + .execute(&mut *lock_conn) + .await; + + result?; + unlock_result?; + Ok(()) + } + + async fn advance_workflow_serialized( + conn: &mut PgConnection, + publisher: &Publisher, + round_robin_counter: &AtomicUsize, + execution: &Execution, ) -> Result<()> { let workflow_task = match &execution.workflow_task { Some(wt) => wt, @@ -1019,7 +1867,7 @@ impl ExecutionScheduler { // Load the workflow execution record let workflow_execution = - WorkflowExecutionRepository::find_by_id(pool, workflow_execution_id) + WorkflowExecutionRepository::find_by_id(&mut *conn, workflow_execution_id) .await? .ok_or_else(|| { anyhow::anyhow!("Workflow execution {} not found", workflow_execution_id) @@ -1037,15 +1885,16 @@ impl ExecutionScheduler { return Ok(()); } - let parent_execution = ExecutionRepository::find_by_id(pool, workflow_execution.execution) - .await? - .ok_or_else(|| { - anyhow::anyhow!( - "Parent execution {} not found for workflow_execution {}", - workflow_execution.execution, - workflow_execution_id - ) - })?; + let parent_execution = + ExecutionRepository::find_by_id(&mut *conn, workflow_execution.execution) + .await? + .ok_or_else(|| { + anyhow::anyhow!( + "Parent execution {} not found for workflow_execution {}", + workflow_execution.execution, + workflow_execution_id + ) + })?; // Cancellation must be a hard stop for workflow orchestration. Once // either the workflow record, the parent execution, or the completed @@ -1057,8 +1906,8 @@ impl ExecutionScheduler { execution.status, ) { if workflow_execution.status == ExecutionStatus::Cancelled { - let running = Self::count_running_workflow_children( - pool, + let running = Self::count_running_workflow_children_with_conn( + &mut *conn, workflow_execution_id, &workflow_execution.completed_tasks, &workflow_execution.failed_tasks, @@ -1071,8 +1920,8 @@ impl ExecutionScheduler { finalizing parent execution {} as Cancelled", workflow_execution_id, workflow_execution.execution ); - Self::finalize_cancelled_workflow( - pool, + Self::finalize_cancelled_workflow_with_conn( + &mut *conn, workflow_execution.execution, workflow_execution_id, ) @@ -1100,7 +1949,7 @@ impl ExecutionScheduler { // Load the workflow definition so we can apply param_schema defaults let workflow_def = - WorkflowDefinitionRepository::find_by_id(pool, workflow_execution.workflow_def) + WorkflowDefinitionRepository::find_by_id(&mut *conn, workflow_execution.workflow_def) .await? .ok_or_else(|| { anyhow::anyhow!( @@ -1134,7 +1983,7 @@ impl ExecutionScheduler { // fill the slot freed by this completion. // --------------------------------------------------------- let parent_for_pending = - ExecutionRepository::find_by_id(pool, workflow_execution.execution) + ExecutionRepository::find_by_id(&mut *conn, workflow_execution.execution) .await? .ok_or_else(|| { anyhow::anyhow!( @@ -1158,7 +2007,7 @@ impl ExecutionScheduler { .bind(workflow_execution_id.to_string()) .bind(task_name) .bind(execution.id) - .fetch_one(pool) + .fetch_one(&mut *conn) .await?; // Determine the concurrency limit from the task graph @@ -1170,8 +2019,8 @@ impl ExecutionScheduler { let free_slots = concurrency_limit.saturating_sub(in_flight_count.0 as usize); if free_slots > 0 { - if let Err(e) = Self::publish_pending_with_items_children( - pool, + if let Err(e) = Self::publish_pending_with_items_children_with_conn( + &mut *conn, publisher, &parent_for_pending, workflow_execution_id, @@ -1200,7 +2049,7 @@ impl ExecutionScheduler { .bind(workflow_execution_id.to_string()) .bind(task_name) .bind(execution.id) - .fetch_all(pool) + .fetch_all(&mut *conn) .await?; if !siblings_remaining.is_empty() { @@ -1258,7 +2107,7 @@ impl ExecutionScheduler { ) .bind(workflow_execution_id.to_string()) .bind(task_name) - .fetch_all(pool) + .fetch_all(&mut *conn) .await?; if any_failed.is_empty() { @@ -1288,7 +2137,7 @@ impl ExecutionScheduler { // Collect results from all completed children of this workflow let child_executions = - ExecutionRepository::find_by_parent(pool, parent_execution.id).await?; + ExecutionRepository::find_by_parent(&mut *conn, parent_execution.id).await?; let mut task_results_map: HashMap = HashMap::new(); for child in &child_executions { if let Some(ref wt) = child.workflow_task { @@ -1388,41 +2237,6 @@ impl ExecutionScheduler { continue; } - // Guard against dispatching a task that has already - // been dispatched (exists as a child execution in - // this workflow). This catches edge cases where - // the persisted completed/failed lists haven't been - // updated yet but a child execution was already - // created by a prior advance_workflow call. - // - // This is critical for with_items predecessors: - // workers update DB status to Completed before the - // completion MQ message is processed, so multiple - // with_items items can each see "0 siblings - // remaining" and attempt to dispatch the same - // successor. The query covers both regular tasks - // (task_index IS NULL) and with_items tasks - // (task_index IS NOT NULL) so that neither case - // can be double-dispatched. - let already_dispatched: (i64,) = sqlx::query_as( - "SELECT COUNT(*) \ - FROM execution \ - WHERE workflow_task->>'workflow_execution' = $1::text \ - AND workflow_task->>'task_name' = $2", - ) - .bind(workflow_execution_id.to_string()) - .bind(next_task_name.as_str()) - .fetch_one(pool) - .await?; - - if already_dispatched.0 > 0 { - debug!( - "Skipping task '{}' — already dispatched ({} existing execution(s))", - next_task_name, already_dispatched.0 - ); - continue; - } - // Check join barrier: if the task has a `join` count, // only schedule it when enough predecessors are done. if let Some(next_node) = graph.get_task(next_task_name) { @@ -1453,8 +2267,8 @@ impl ExecutionScheduler { // Check if any tasks are still running (children of this workflow // that haven't completed yet). We query child executions that have // workflow_task metadata pointing to our workflow_execution. - let running_children = Self::count_running_workflow_children( - pool, + let running_children = Self::count_running_workflow_children_with_conn( + &mut *conn, workflow_execution_id, &completed_tasks, &failed_tasks, @@ -1464,8 +2278,8 @@ impl ExecutionScheduler { // Dispatch successor tasks, passing the updated workflow context for next_task_name in &tasks_to_schedule { if let Some(task_node) = graph.get_task(next_task_name) { - if let Err(e) = Self::dispatch_workflow_task( - pool, + if let Err(e) = Self::dispatch_workflow_task_with_conn( + &mut *conn, publisher, round_robin_counter, &parent_execution, @@ -1494,7 +2308,7 @@ impl ExecutionScheduler { // completed/failed task lists. let updated_variables = wf_ctx.export_variables(); WorkflowExecutionRepository::update( - pool, + &mut *conn, workflow_execution_id, attune_common::repositories::workflow::UpdateWorkflowExecutionInput { current_tasks: Some(current_tasks), @@ -1525,8 +2339,8 @@ impl ExecutionScheduler { } else { None }; - Self::complete_workflow( - pool, + Self::complete_workflow_with_conn( + &mut *conn, parent_execution.id, workflow_execution_id, !has_failures, @@ -1539,6 +2353,7 @@ impl ExecutionScheduler { } /// Count child executions that are still in progress for a workflow. + #[allow(dead_code)] async fn count_running_workflow_children( pool: &PgPool, workflow_execution_id: i64, @@ -1566,6 +2381,30 @@ impl ExecutionScheduler { Ok(count) } + async fn count_running_workflow_children_with_conn( + conn: &mut PgConnection, + workflow_execution_id: i64, + completed_tasks: &[String], + failed_tasks: &[String], + ) -> Result { + let rows: Vec<(String,)> = sqlx::query_as( + "SELECT workflow_task->>'task_name' as task_name \ + FROM execution \ + WHERE workflow_task->>'workflow_execution' = $1::text \ + AND status NOT IN ('completed', 'failed', 'timeout', 'cancelled')", + ) + .bind(workflow_execution_id.to_string()) + .fetch_all(&mut *conn) + .await?; + + let count = rows + .iter() + .filter(|(tn,)| !completed_tasks.contains(tn) && !failed_tasks.contains(tn)) + .count(); + + Ok(count) + } + fn should_halt_workflow_advancement( workflow_status: ExecutionStatus, parent_status: ExecutionStatus, @@ -1586,6 +2425,7 @@ impl ExecutionScheduler { /// Finalize a cancelled workflow by updating the parent `execution` record /// to `Cancelled`. The `workflow_execution` record is already `Cancelled` /// (set by `cancel_workflow_children`); this only touches the parent. + #[allow(dead_code)] async fn finalize_cancelled_workflow( pool: &PgPool, parent_execution_id: i64, @@ -1609,6 +2449,29 @@ impl ExecutionScheduler { Ok(()) } + async fn finalize_cancelled_workflow_with_conn( + conn: &mut PgConnection, + parent_execution_id: i64, + workflow_execution_id: i64, + ) -> Result<()> { + info!( + "Finalizing cancelled workflow: parent execution {} (workflow_execution {})", + parent_execution_id, workflow_execution_id + ); + + let update = UpdateExecutionInput { + status: Some(ExecutionStatus::Cancelled), + result: Some(serde_json::json!({ + "error": "Workflow cancelled", + "succeeded": false, + })), + ..Default::default() + }; + ExecutionRepository::update(&mut *conn, parent_execution_id, update).await?; + + Ok(()) + } + /// Mark a workflow as completed (success or failure) and update both the /// `workflow_execution` and parent `execution` records. async fn complete_workflow( @@ -1667,6 +2530,60 @@ impl ExecutionScheduler { Ok(()) } + async fn complete_workflow_with_conn( + conn: &mut PgConnection, + parent_execution_id: i64, + workflow_execution_id: i64, + success: bool, + error_message: Option<&str>, + ) -> Result<()> { + let status = if success { + ExecutionStatus::Completed + } else { + ExecutionStatus::Failed + }; + + info!( + "Completing workflow_execution {} with status {:?} (parent execution {})", + workflow_execution_id, status, parent_execution_id + ); + + WorkflowExecutionRepository::update( + &mut *conn, + workflow_execution_id, + attune_common::repositories::workflow::UpdateWorkflowExecutionInput { + current_tasks: Some(vec![]), + completed_tasks: None, + failed_tasks: None, + skipped_tasks: None, + variables: None, + status: Some(status), + error_message: error_message.map(|s| s.to_string()), + paused: None, + pause_reason: None, + }, + ) + .await?; + + let parent = ExecutionRepository::find_by_id(&mut *conn, parent_execution_id).await?; + if let Some(mut parent) = parent { + parent.status = status; + parent.result = if !success { + Some(serde_json::json!({ + "error": error_message.unwrap_or("Workflow failed"), + "succeeded": false, + })) + } else { + Some(serde_json::json!({ + "succeeded": true, + })) + }; + ExecutionRepository::update(&mut *conn, parent.id, parent.into()).await?; + } + + Ok(()) + } + // ----------------------------------------------------------------------- // Regular action scheduling helpers // ----------------------------------------------------------------------- @@ -1974,9 +2891,10 @@ impl ExecutionScheduler { "failed_at": completed_at.to_rfc3339(), }); - ExecutionRepository::update( + let updated = ExecutionRepository::update_if_status( pool, execution_id, + ExecutionStatus::Scheduling, UpdateExecutionInput { status: Some(ExecutionStatus::Failed), result: Some(result.clone()), @@ -1985,6 +2903,14 @@ impl ExecutionScheduler { ) .await?; + if updated.is_none() { + warn!( + "Skipping unschedulable failure for execution {} because it already left Scheduling", + execution_id + ); + return Ok(()); + } + let completed = MessageEnvelope::new( MessageType::ExecutionCompleted, ExecutionCompletedPayload { @@ -2027,9 +2953,10 @@ impl ExecutionScheduler { "cancelled_at": completed_at.to_rfc3339(), }); - ExecutionRepository::update( + let updated = ExecutionRepository::update_if_status( pool, execution_id, + ExecutionStatus::Scheduling, UpdateExecutionInput { status: Some(ExecutionStatus::Cancelled), result: Some(result.clone()), @@ -2038,6 +2965,14 @@ impl ExecutionScheduler { ) .await?; + if updated.is_none() { + warn!( + "Skipping policy cancellation for execution {} because it already left Scheduling", + execution_id + ); + return Ok(()); + } + let completed = MessageEnvelope::new( MessageType::ExecutionCompleted, ExecutionCompletedPayload { @@ -2062,6 +2997,103 @@ impl ExecutionScheduler { Ok(()) } + async fn revert_scheduled_execution( + pool: &PgPool, + execution_id: i64, + policy_enforcer: &PolicyEnforcer, + publisher: &Publisher, + ) -> Result<()> { + match ExecutionRepository::revert_scheduled_to_requested(pool, execution_id).await? { + Some(_) => { + Self::release_acquired_policy_slot(policy_enforcer, pool, publisher, execution_id) + .await?; + } + None => { + let execution = ExecutionRepository::find_by_id(pool, execution_id).await?; + let should_release_slot = match execution.as_ref().map(|execution| execution.status) + { + Some( + ExecutionStatus::Running + | ExecutionStatus::Completed + | ExecutionStatus::Timeout + | ExecutionStatus::Abandoned, + ) => false, + Some( + ExecutionStatus::Requested + | ExecutionStatus::Scheduling + | ExecutionStatus::Scheduled + | ExecutionStatus::Failed + | ExecutionStatus::Canceling + | ExecutionStatus::Cancelled, + ) => true, + None => true, + }; + + if should_release_slot { + Self::release_acquired_policy_slot( + policy_enforcer, + pool, + publisher, + execution_id, + ) + .await?; + } + + warn!( + "Execution {} left Scheduled before scheduler could revert it after publish failure", + execution_id + ); + } + } + + Ok(()) + } + + async fn revert_scheduling_claim(pool: &PgPool, execution_id: i64) -> Result<()> { + if ExecutionRepository::update_if_status( + pool, + execution_id, + ExecutionStatus::Scheduling, + UpdateExecutionInput { + status: Some(ExecutionStatus::Requested), + ..Default::default() + }, + ) + .await? + .is_none() + { + debug!( + "Execution {} left Scheduling before claim revert after workflow-start error", + execution_id + ); + } + + Ok(()) + } + + async fn cleanup_unclaimable_execution( + policy_enforcer: &PolicyEnforcer, + pool: &PgPool, + publisher: &Publisher, + execution_id: i64, + ) -> Result<()> { + let execution = ExecutionRepository::find_by_id(pool, execution_id).await?; + match execution.as_ref().map(|execution| execution.status) { + Some(ExecutionStatus::Requested | ExecutionStatus::Scheduling) => {} + _ => { + Self::remove_queued_policy_execution( + policy_enforcer, + pool, + publisher, + execution_id, + ) + .await; + } + } + + Ok(()) + } + /// Check if a worker's heartbeat is fresh enough to schedule work /// /// A worker is considered fresh if its last heartbeat is within diff --git a/docs/plans/executor-ha-horizontal-scaling.md b/docs/plans/executor-ha-horizontal-scaling.md new file mode 100644 index 0000000..529ce99 --- /dev/null +++ b/docs/plans/executor-ha-horizontal-scaling.md @@ -0,0 +1,317 @@ +# Executor HA Horizontal Scaling Plan + +## Overview + +This plan describes the changes required to make the Attune executor service safe to run with multiple replicas. The current implementation already uses RabbitMQ competing consumers for most work distribution, but several correctness-critical parts of the executor still rely on process-local memory or non-atomic database updates. Those assumptions make the service unsafe under horizontal scaling, message replay, or partial failure. + +The goal of this plan is to make executor behavior correct under: + +- Multiple executor replicas running concurrently +- Message redelivery from RabbitMQ +- Replica crash/restart during scheduling or workflow advancement +- Background recovery loops running on more than one replica + +## Problem Summary + +The current executor has five HA blockers: + +1. **Concurrency/FIFO control is process-local.** + `ExecutionQueueManager` stores active slots and waiting queues in memory. That means one replica can admit work while another replica receives the completion notification, causing slot release and queue advancement to fail. + +2. **Execution scheduling has no atomic claim step.** + The scheduler reads an execution in `requested`, does policy checks and worker selection, then updates it later. Two replicas can both observe the same row as schedulable and both dispatch it. + +3. **Workflow orchestration is not serialized.** + Workflow start and workflow advancement perform read-check-create-update sequences without a distributed lock or optimistic version check, so duplicate successor tasks can be created. + +4. **Event/enforcement/inquiry handlers are not idempotent.** + Duplicate message delivery can create duplicate enforcements, duplicate executions, duplicate workflow starts, and duplicate inquiries. + +5. **Timeout and DLQ handlers use non-conditional updates.** + Recovery loops can overwrite newer worker-owned state if the execution changes between the initial read and the final update. + +## Goals + +- Make execution scheduling single-winner under multiple executor replicas +- Make policy-based concurrency control and FIFO queueing shared across replicas +- Make workflow start and workflow advancement idempotent and serialized +- Make duplicate message delivery safe for executor-owned entities +- Make recovery loops safe to run on every replica + +## Non-Goals + +- Re-architecting RabbitMQ usage across the whole platform +- Replacing PostgreSQL with a dedicated distributed lock service +- Solving general worker autoscaling or Kubernetes deployment concerns in this phase +- Reworking unrelated executor features like retry policy design unless needed for HA correctness + +## Design Principles + +### Database is the source of truth + +All coordination state that affects correctness must live in PostgreSQL, not in executor memory. In-memory caches and metrics are fine as optimization or observability layers, but correctness cannot depend on them. + +### State transitions must be compare-and-swap + +Any executor action that changes ownership or lifecycle state must use an atomic update that verifies the prior state in the same statement or transaction. + +### Handlers must be idempotent on domain keys + +RabbitMQ gives at-least-once delivery semantics. The executor must therefore tolerate duplicate delivery even when the same message is processed more than once or by different replicas. + +### Workflow orchestration must be serialized per workflow + +A workflow execution should have exactly one active mutator at a time when evaluating transitions and dispatching successor tasks. + +## Proposed Implementation Phases + +## Phase 1: Atomic Execution Claiming + +### Objective + +Ensure only one executor replica can claim a `requested` execution for scheduling. + +### Changes + +**`crates/common/src/repositories/execution.rs`** + +- Add a repository method for atomic status transition, for example: + - `claim_for_scheduling(id, expected_status, executor_id) -> Option` + - or a more general compare-and-swap helper +- Implement the claim as a single `UPDATE ... WHERE id = $1 AND status = 'requested' RETURNING ...` +- Optionally persist the claiming replica identity in `execution.executor` for debugging and traceability + +**`crates/executor/src/scheduler.rs`** + +- Claim the execution before policy enforcement, worker selection, workflow start, or any side effects +- Use `scheduling` as the claimed intermediate state +- If the claim returns no row, treat the execution as already claimed/handled and acknowledge the message +- Convert all later scheduler writes to conditional transitions from claimed state + +### Success Criteria + +- Two schedulers racing on the same execution cannot both dispatch it +- Redelivered `execution.requested` messages become harmless no-ops after the first successful claim + +## Phase 2: Shared Concurrency Control and FIFO Queueing + +### Objective + +Replace the in-memory `ExecutionQueueManager` as the source of truth for concurrency slots and waiting order. + +### Changes + +**New schema** + +Add database-backed coordination tables, likely along these lines: + +- `execution_admission_slot` + - active slot ownership for an action/group key +- `execution_admission_queue` + - ordered waiting executions for an action/group key + +Alternative naming is fine, but the design needs to support: + +- Action-level concurrency limits +- Parameter-group concurrency keys +- FIFO ordering within each action/group +- Deterministic advancement when a slot is released + +**`crates/executor/src/policy_enforcer.rs`** + +- Replace `ExecutionQueueManager` slot acquisition with DB-backed admission logic +- Keep existing policy semantics: + - `enqueue` + - `cancel` + - parameter-based concurrency grouping + +**`crates/executor/src/completion_listener.rs`** + +- Release the shared slot transactionally on completion +- Select and wake the next queued execution in the same transaction +- Republish only after the DB state is committed + +**`crates/executor/src/queue_manager.rs`** + +- Either remove it entirely or reduce it to a thin adapter over DB-backed coordination +- Do not keep active slot ownership in process-local `DashMap` + +**`crates/common/src/repositories/queue_stats.rs`** + +- Keep `queue_stats` as derived telemetry only +- Do not rely on it for correctness + +### Success Criteria + +- Completion processed by a different executor replica still releases the correct slot +- FIFO ordering holds across multiple executor replicas +- Restarting an executor does not lose queue ownership state + +## Phase 3: Workflow Start Idempotency and Serialized Advancement + +### Objective + +Ensure workflow orchestration is safe under concurrent replicas and duplicate completion messages. + +### Changes + +**Migration** + +Add a uniqueness constraint to guarantee one workflow state row per parent execution: + +```sql +ALTER TABLE workflow_execution +ADD CONSTRAINT uq_workflow_execution_execution UNIQUE (execution); +``` + +**`crates/executor/src/scheduler.rs`** + +- Change workflow start to be idempotent: + - either `INSERT ... ON CONFLICT ...` + - or claim parent execution first and only create workflow state once +- When advancing a workflow: + - wrap read/decide/write logic in a transaction + - lock the `workflow_execution` row with `SELECT ... FOR UPDATE` + - or use advisory locks keyed by workflow execution id + +**Successor dispatch dedupe** + +Add a durable uniqueness guarantee for child task dispatch, for example: + +- one unique key for regular workflow tasks: + - `(workflow_execution_id, task_name, task_index IS NULL)` +- one unique key for `with_items` children: + - `(workflow_execution_id, task_name, task_index)` + +This may be implemented with explicit columns or a dedupe table if indexing the current JSONB layout is awkward. + +**Repository support** + +- Add workflow repository helpers that support transactional locking and conditional updates +- Avoid blind overwrite of `completed_tasks`, `failed_tasks`, and `variables` outside a serialized transaction + +### Success Criteria + +- Starting the same workflow twice cannot create two `workflow_execution` rows +- Duplicate `execution.completed` delivery for a workflow child cannot create duplicate successor executions +- Two executor replicas cannot concurrently mutate the same workflow state + +## Phase 4: Idempotent Event, Enforcement, and Inquiry Handling + +### Objective + +Make duplicate delivery safe for all earlier and later executor-owned side effects. + +### Changes + +**Enforcement dedupe** + +Add a uniqueness rule so one event/rule pair produces at most one enforcement when `event` is present. + +Example: + +```sql +CREATE UNIQUE INDEX uq_enforcement_rule_event +ON enforcement(rule, event) +WHERE event IS NOT NULL; +``` + +**`crates/executor/src/event_processor.rs`** + +- Use upsert-or-ignore semantics for enforcement creation +- Treat uniqueness conflicts as idempotent success + +**`crates/executor/src/enforcement_processor.rs`** + +- Check current enforcement status before creating an execution +- Add a durable relation that prevents an enforcement from creating more than one top-level execution +- Options: + - unique partial index on `execution(enforcement)` for top-level executions + - or a separate coordination record + +**Inquiry dedupe** + +- Prevent duplicate inquiry creation per execution result/completion path +- Add a unique invariant such as one active inquiry per execution, if that matches product semantics +- Update completion handling to tolerate duplicate `execution.completed` + +### Success Criteria + +- Duplicate `event.created` does not create duplicate enforcements +- Duplicate `enforcement.created` does not create duplicate executions +- Duplicate completion handling does not create duplicate inquiries + +## Phase 5: Safe Recovery Loops + +### Objective + +Make timeout and DLQ processing safe under races and multiple replicas. + +### Changes + +**`crates/executor/src/timeout_monitor.rs`** + +- Replace unconditional updates with conditional state transitions: + - `UPDATE execution SET ... WHERE id = $1 AND status = 'scheduled' ... RETURNING ...` +- Only publish completion side effects when a row was actually updated +- Consider including `updated < cutoff` in the same update statement + +**`crates/executor/src/dead_letter_handler.rs`** + +- Change failure transition to conditional update based on current state +- Do not overwrite executions that have already moved to `running` or terminal state +- Only emit side effects when the row transition succeeded + +**`crates/executor/src/service.rs`** + +- It is acceptable for these loops to run on every replica once updates are conditional +- Optional future optimization: leader election for janitor loops to reduce duplicate scans and log noise + +### Success Criteria + +- Timeout monitor cannot fail an execution that has already moved to `running` +- DLQ handler cannot overwrite newer state +- Running multiple timeout monitors produces no conflicting state transitions + +## Testing Plan + +Add focused HA tests after the repository and scheduler primitives are in place. + +### Repository tests + +- Compare-and-swap execution claim succeeds exactly once +- Conditional timeout/DLQ transition updates exactly one row or zero rows as expected +- Workflow uniqueness constraint prevents duplicate workflow state rows + +### Executor integration tests + +- Two scheduler instances processing the same `execution.requested` message only dispatch once +- Completion consumed by a different executor replica still advances the shared queue +- Duplicate workflow child completion does not create duplicate successor tasks +- Duplicate `event.created` and `enforcement.created` messages do not create duplicate downstream records + +### Failure-injection tests + +- Executor crashes after claiming but before publish +- Executor crashes after slot release but before republish +- Duplicate `execution.completed` delivery after successful workflow advancement + +## Recommended Execution Order for Next Session + +1. Add migrations and repository primitives for atomic execution claim +2. Convert scheduler to claim-first semantics +3. Implement shared DB-backed concurrency/FIFO coordination +4. Add workflow uniqueness and serialized advancement +5. Add idempotency to event/enforcement/inquiry paths +6. Fix timeout and DLQ handlers to use conditional transitions +7. Add HA-focused tests + +## Expected Outcome + +After this plan is implemented, the executor should be able to scale horizontally without relying on singleton behavior. Multiple executor replicas should be able to process work concurrently while preserving: + +- exactly-once scheduling semantics at the execution state level +- shared concurrency limits and FIFO behavior +- correct workflow orchestration +- safe replay handling +- safe recovery behavior during failures and redelivery diff --git a/migrations/20250101000006_workflow_system.sql b/migrations/20250101000006_workflow_system.sql index 254ebd4..62e07b3 100644 --- a/migrations/20250101000006_workflow_system.sql +++ b/migrations/20250101000006_workflow_system.sql @@ -67,11 +67,11 @@ CREATE TABLE workflow_execution ( paused BOOLEAN DEFAULT false NOT NULL, pause_reason TEXT, created TIMESTAMPTZ DEFAULT NOW() NOT NULL, - updated TIMESTAMPTZ DEFAULT NOW() NOT NULL + updated TIMESTAMPTZ DEFAULT NOW() NOT NULL, + CONSTRAINT uq_workflow_execution_execution UNIQUE (execution) ); -- Indexes -CREATE INDEX idx_workflow_exec_execution ON workflow_execution(execution); CREATE INDEX idx_workflow_exec_workflow_def ON workflow_execution(workflow_def); CREATE INDEX idx_workflow_exec_status ON workflow_execution(status); CREATE INDEX idx_workflow_exec_paused ON workflow_execution(paused) WHERE paused = true;