From f93e9229d203f7ce9bc2b3b6389b4515a78e444a Mon Sep 17 00:00:00 2001 From: David Culbreth Date: Thu, 2 Apr 2026 17:15:59 -0500 Subject: [PATCH] ha executor --- crates/common/src/models.rs | 2 +- crates/common/src/repositories/analytics.rs | 4 +- crates/common/src/repositories/event.rs | 162 +++- crates/common/src/repositories/execution.rs | 464 ++++++++- .../src/repositories/execution_admission.rs | 909 ++++++++++++++++++ crates/common/src/repositories/mod.rs | 2 + crates/common/src/repositories/queue_stats.rs | 48 +- crates/common/src/repositories/workflow.rs | 20 + crates/executor/src/completion_listener.rs | 6 +- crates/executor/src/dead_letter_handler.rs | 114 ++- crates/executor/src/enforcement_processor.rs | 116 ++- crates/executor/src/event_processor.rs | 55 +- crates/executor/src/inquiry_handler.rs | 218 ++++- crates/executor/src/policy_enforcer.rs | 4 +- crates/executor/src/queue_manager.rs | 262 ++++- crates/executor/src/scheduler.rs | 220 +++-- crates/executor/src/timeout_monitor.rs | 34 +- .../tests/fifo_ordering_integration_test.rs | 109 +++ docs/plans/executor-ha-horizontal-scaling.md | 97 +- ...250101000004_trigger_sensor_event_rule.sql | 3 + ...0250101000005_execution_and_operations.sql | 70 +- migrations/20250101000006_workflow_system.sql | 61 +- .../20250101000007_supporting_systems.sql | 74 +- .../20250101000009_timescaledb_history.sql | 100 +- .../20250101000010_artifact_content.sql | 4 +- 25 files changed, 2736 insertions(+), 422 deletions(-) create mode 100644 crates/common/src/repositories/execution_admission.rs diff --git a/crates/common/src/models.rs b/crates/common/src/models.rs index 3b7d011..75d22f1 100644 --- a/crates/common/src/models.rs +++ b/crates/common/src/models.rs @@ -1412,7 +1412,7 @@ pub mod artifact { pub content_type: Option, /// Size of the latest version's content in bytes pub size_bytes: Option, - /// Execution that produced this artifact (no FK — execution is a hypertable) + /// Execution that produced this artifact 
(no FK by design) pub execution: Option, /// Structured JSONB data for progress artifacts or metadata pub data: Option, diff --git a/crates/common/src/repositories/analytics.rs b/crates/common/src/repositories/analytics.rs index 86e90e2..9aa1e51 100644 --- a/crates/common/src/repositories/analytics.rs +++ b/crates/common/src/repositories/analytics.rs @@ -80,7 +80,7 @@ pub struct EnforcementVolumeBucket { pub enforcement_count: i64, } -/// A single hourly bucket of execution volume (from execution hypertable directly). +/// A single hourly bucket of execution volume (from the execution table directly). #[derive(Debug, Clone, Serialize, FromRow)] pub struct ExecutionVolumeBucket { /// Start of the 1-hour bucket @@ -468,7 +468,7 @@ impl AnalyticsRepository { } // ======================================================================= - // Execution volume (from execution hypertable directly) + // Execution volume (from the execution table directly) // ======================================================================= /// Query the `execution_volume_hourly` continuous aggregate for execution diff --git a/crates/common/src/repositories/event.rs b/crates/common/src/repositories/event.rs index 086a031..b1db1e6 100644 --- a/crates/common/src/repositories/event.rs +++ b/crates/common/src/repositories/event.rs @@ -65,6 +65,12 @@ pub struct EnforcementSearchResult { pub total: u64, } +#[derive(Debug, Clone)] +pub struct EnforcementCreateOrGetResult { + pub enforcement: Enforcement, + pub created: bool, +} + /// Repository for Event operations pub struct EventRepository; @@ -493,11 +499,7 @@ impl EnforcementRepository { Ok(enforcement) } - /// Update an enforcement using the loaded row's hypertable keys. - /// - /// This avoids wide scans across compressed chunks by including both the - /// partitioning column (`created`) and compression segment key (`rule_ref`) - /// in the locator. + /// Update an enforcement using the loaded row's primary key. 
pub async fn update_loaded<'e, E>( executor: E, enforcement: &Enforcement, @@ -510,19 +512,73 @@ impl EnforcementRepository { return Ok(enforcement.clone()); } - let rule_ref = enforcement.rule_ref.clone(); - Self::update_with_locator(executor, input, |query| { query.push(" WHERE id = "); query.push_bind(enforcement.id); - query.push(" AND created = "); - query.push_bind(enforcement.created); - query.push(" AND rule_ref = "); - query.push_bind(rule_ref); }) .await } + pub async fn update_loaded_if_status<'e, E>( + executor: E, + enforcement: &Enforcement, + expected_status: EnforcementStatus, + input: UpdateEnforcementInput, + ) -> Result> + where + E: Executor<'e, Database = Postgres> + 'e, + { + if input.status.is_none() && input.payload.is_none() && input.resolved_at.is_none() { + return Ok(Some(enforcement.clone())); + } + + let mut query = QueryBuilder::new("UPDATE enforcement SET "); + let mut has_updates = false; + + if let Some(status) = input.status { + query.push("status = "); + query.push_bind(status); + has_updates = true; + } + + if let Some(payload) = &input.payload { + if has_updates { + query.push(", "); + } + query.push("payload = "); + query.push_bind(payload); + has_updates = true; + } + + if let Some(resolved_at) = input.resolved_at { + if has_updates { + query.push(", "); + } + query.push("resolved_at = "); + query.push_bind(resolved_at); + has_updates = true; + } + + if !has_updates { + return Ok(Some(enforcement.clone())); + } + + query.push(" WHERE id = "); + query.push_bind(enforcement.id); + query.push(" AND status = "); + query.push_bind(expected_status); + query.push( + " RETURNING id, rule, rule_ref, trigger_ref, config, event, status, payload, \ + condition, conditions, created, resolved_at", + ); + + query + .build_query_as::() + .fetch_optional(executor) + .await + .map_err(Into::into) + } + /// Find enforcements by rule ID pub async fn find_by_rule<'e, E>(executor: E, rule_id: Id) -> Result> where @@ -589,6 +645,90 @@ impl 
EnforcementRepository { Ok(enforcements) } + pub async fn find_by_rule_and_event<'e, E>( + executor: E, + rule_id: Id, + event_id: Id, + ) -> Result> + where + E: Executor<'e, Database = Postgres> + 'e, + { + sqlx::query_as::<_, Enforcement>( + r#" + SELECT id, rule, rule_ref, trigger_ref, config, event, status, payload, + condition, conditions, created, resolved_at + FROM enforcement + WHERE rule = $1 AND event = $2 + LIMIT 1 + "#, + ) + .bind(rule_id) + .bind(event_id) + .fetch_optional(executor) + .await + .map_err(Into::into) + } + + pub async fn create_or_get_by_rule_event<'e, E>( + executor: E, + input: CreateEnforcementInput, + ) -> Result + where + E: Executor<'e, Database = Postgres> + Copy + 'e, + { + let (Some(rule_id), Some(event_id)) = (input.rule, input.event) else { + let enforcement = Self::create(executor, input).await?; + return Ok(EnforcementCreateOrGetResult { + enforcement, + created: true, + }); + }; + + let inserted = sqlx::query_as::<_, Enforcement>( + r#" + INSERT INTO enforcement (rule, rule_ref, trigger_ref, config, event, status, + payload, condition, conditions) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) + ON CONFLICT (rule, event) WHERE rule IS NOT NULL AND event IS NOT NULL DO NOTHING + RETURNING id, rule, rule_ref, trigger_ref, config, event, status, payload, + condition, conditions, created, resolved_at + "#, + ) + .bind(input.rule) + .bind(&input.rule_ref) + .bind(&input.trigger_ref) + .bind(&input.config) + .bind(input.event) + .bind(input.status) + .bind(&input.payload) + .bind(input.condition) + .bind(&input.conditions) + .fetch_optional(executor) + .await?; + + if let Some(enforcement) = inserted { + return Ok(EnforcementCreateOrGetResult { + enforcement, + created: true, + }); + } + + let enforcement = Self::find_by_rule_and_event(executor, rule_id, event_id) + .await? 
+ .ok_or_else(|| { + anyhow::anyhow!( + "enforcement for rule {} and event {} disappeared after dedupe conflict", + rule_id, + event_id + ) + })?; + + Ok(EnforcementCreateOrGetResult { + enforcement, + created: false, + }) + } + /// Search enforcements with all filters pushed into SQL. /// /// All filter fields are combinable (AND). Pagination is server-side. diff --git a/crates/common/src/repositories/execution.rs b/crates/common/src/repositories/execution.rs index 2982e0d..265605c 100644 --- a/crates/common/src/repositories/execution.rs +++ b/crates/common/src/repositories/execution.rs @@ -4,7 +4,8 @@ use chrono::{DateTime, Utc}; use crate::models::{enums::ExecutionStatus, execution::*, Id, JsonDict}; use crate::Result; -use sqlx::{Executor, Postgres, QueryBuilder}; +use sqlx::{Executor, PgConnection, PgPool, Postgres, QueryBuilder}; +use tokio::time::{sleep, Duration}; use super::{Create, Delete, FindById, List, Repository, Update}; @@ -47,6 +48,12 @@ pub struct WorkflowTaskExecutionCreateOrGetResult { pub created: bool, } +#[derive(Debug, Clone)] +pub struct EnforcementExecutionCreateOrGetResult { + pub execution: Execution, + pub created: bool, +} + /// An execution row with optional `rule_ref` / `trigger_ref` populated from /// the joined `enforcement` table. This avoids a separate in-memory lookup. #[derive(Debug, Clone, sqlx::FromRow)] @@ -215,32 +222,392 @@ impl Update for ExecutionRepository { } impl ExecutionRepository { - pub async fn create_workflow_task_if_absent<'e, E>( + pub async fn find_top_level_by_enforcement<'e, E>( executor: E, + enforcement_id: Id, + ) -> Result> + where + E: Executor<'e, Database = Postgres> + 'e, + { + let sql = format!( + "SELECT {SELECT_COLUMNS} \ + FROM execution \ + WHERE enforcement = $1 + AND parent IS NULL + AND (config IS NULL OR NOT (config ? 
'retry_of')) \ + ORDER BY created ASC \ + LIMIT 1" + ); + + sqlx::query_as::<_, Execution>(&sql) + .bind(enforcement_id) + .fetch_optional(executor) + .await + .map_err(Into::into) + } + + pub async fn create_top_level_for_enforcement_if_absent<'e, E>( + executor: E, + input: CreateExecutionInput, + enforcement_id: Id, + ) -> Result + where + E: Executor<'e, Database = Postgres> + Copy + 'e, + { + let inserted = sqlx::query_as::<_, Execution>(&format!( + "INSERT INTO execution \ + (action, action_ref, config, env_vars, parent, enforcement, executor, worker, status, result, workflow_task) \ + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) \ + ON CONFLICT (enforcement) + WHERE enforcement IS NOT NULL + AND parent IS NULL + AND (config IS NULL OR NOT (config ? 'retry_of')) + DO NOTHING \ + RETURNING {SELECT_COLUMNS}" + )) + .bind(input.action) + .bind(&input.action_ref) + .bind(&input.config) + .bind(&input.env_vars) + .bind(input.parent) + .bind(input.enforcement) + .bind(input.executor) + .bind(input.worker) + .bind(input.status) + .bind(&input.result) + .bind(sqlx::types::Json(&input.workflow_task)) + .fetch_optional(executor) + .await?; + + if let Some(execution) = inserted { + return Ok(EnforcementExecutionCreateOrGetResult { + execution, + created: true, + }); + } + + let execution = Self::find_top_level_by_enforcement(executor, enforcement_id) + .await? 
+ .ok_or_else(|| { + anyhow::anyhow!( + "top-level execution for enforcement {} disappeared after dedupe conflict", + enforcement_id + ) + })?; + + Ok(EnforcementExecutionCreateOrGetResult { + execution, + created: false, + }) + } + + async fn claim_workflow_task_dispatch<'e, E>( + executor: E, + workflow_execution_id: Id, + task_name: &str, + task_index: Option, + ) -> Result + where + E: Executor<'e, Database = Postgres> + 'e, + { + let inserted: Option<(i64,)> = sqlx::query_as( + "INSERT INTO workflow_task_dispatch (workflow_execution, task_name, task_index) + VALUES ($1, $2, $3) + ON CONFLICT (workflow_execution, task_name, COALESCE(task_index, -1)) DO NOTHING + RETURNING id", + ) + .bind(workflow_execution_id) + .bind(task_name) + .bind(task_index) + .fetch_optional(executor) + .await?; + + Ok(inserted.is_some()) + } + + async fn assign_workflow_task_dispatch_execution<'e, E>( + executor: E, + workflow_execution_id: Id, + task_name: &str, + task_index: Option, + execution_id: Id, + ) -> Result<()> + where + E: Executor<'e, Database = Postgres> + 'e, + { + sqlx::query( + "UPDATE workflow_task_dispatch + SET execution_id = COALESCE(execution_id, $4) + WHERE workflow_execution = $1 + AND task_name = $2 + AND task_index IS NOT DISTINCT FROM $3", + ) + .bind(workflow_execution_id) + .bind(task_name) + .bind(task_index) + .bind(execution_id) + .execute(executor) + .await?; + + Ok(()) + } + + async fn lock_workflow_task_dispatch<'e, E>( + executor: E, + workflow_execution_id: Id, + task_name: &str, + task_index: Option, + ) -> Result>> + where + E: Executor<'e, Database = Postgres> + 'e, + { + let row: Option<(Option,)> = sqlx::query_as( + "SELECT execution_id + FROM workflow_task_dispatch + WHERE workflow_execution = $1 + AND task_name = $2 + AND task_index IS NOT DISTINCT FROM $3 + FOR UPDATE", + ) + .bind(workflow_execution_id) + .bind(task_name) + .bind(task_index) + .fetch_optional(executor) + .await?; + + // Map the outer Option to distinguish three cases: + // 
- None → no row exists + // - Some(None) → row exists but execution_id is still NULL (mid-creation) + // - Some(Some(id)) → row exists with a completed execution_id + Ok(row.map(|(execution_id,)| execution_id)) + } + + async fn create_workflow_task_if_absent_in_conn( + conn: &mut PgConnection, input: CreateExecutionInput, workflow_execution_id: Id, task_name: &str, task_index: Option, - ) -> Result - where - E: Executor<'e, Database = Postgres> + Copy + 'e, - { - if let Some(execution) = - Self::find_by_workflow_task(executor, workflow_execution_id, task_name, task_index) - .await? - { + ) -> Result { + let claimed = Self::claim_workflow_task_dispatch( + &mut *conn, + workflow_execution_id, + task_name, + task_index, + ) + .await?; + + if claimed { + let execution = Self::create(&mut *conn, input).await?; + Self::assign_workflow_task_dispatch_execution( + &mut *conn, + workflow_execution_id, + task_name, + task_index, + execution.id, + ) + .await?; + return Ok(WorkflowTaskExecutionCreateOrGetResult { execution, - created: false, + created: true, }); } - let execution = Self::create(executor, input).await?; + let dispatch_state = Self::lock_workflow_task_dispatch( + &mut *conn, + workflow_execution_id, + task_name, + task_index, + ) + .await?; - Ok(WorkflowTaskExecutionCreateOrGetResult { - execution, - created: true, - }) + match dispatch_state { + Some(Some(existing_execution_id)) => { + // Row exists with execution_id — return the existing execution. + let execution = Self::find_by_id(&mut *conn, existing_execution_id) + .await? + .ok_or_else(|| { + anyhow::anyhow!( + "workflow child execution {} missing for workflow_execution {} task '{}' index {:?}", + existing_execution_id, + workflow_execution_id, + task_name, + task_index + ) + })?; + + Ok(WorkflowTaskExecutionCreateOrGetResult { + execution, + created: false, + }) + } + + Some(None) => { + // Row exists but execution_id is still NULL: another transaction is + // mid-creation (between claim and assign). 
Retry until it's filled in. + // If the original creator's transaction rolled back, the row also + // disappears — handled by the `None` branch inside the loop. + 'wait: { + for _ in 0..20_u32 { + sleep(Duration::from_millis(50)).await; + match Self::lock_workflow_task_dispatch( + &mut *conn, + workflow_execution_id, + task_name, + task_index, + ) + .await? + { + Some(Some(execution_id)) => { + let execution = + Self::find_by_id(&mut *conn, execution_id).await?.ok_or_else( + || { + anyhow::anyhow!( + "workflow child execution {} missing for workflow_execution {} task '{}' index {:?}", + execution_id, + workflow_execution_id, + task_name, + task_index + ) + }, + )?; + return Ok(WorkflowTaskExecutionCreateOrGetResult { + execution, + created: false, + }); + } + Some(None) => {} // still NULL, keep waiting + None => break 'wait, // row rolled back; fall through to re-claim + } + } + // Exhausted all retries without the execution_id being set. + return Err(anyhow::anyhow!( + "Timed out waiting for workflow task dispatch execution_id to be set \ + for workflow_execution {} task '{}' index {:?}", + workflow_execution_id, + task_name, + task_index + ) + .into()); + } + + // Row disappeared (original creator rolled back) — re-claim and create. 
+ let re_claimed = Self::claim_workflow_task_dispatch( + &mut *conn, + workflow_execution_id, + task_name, + task_index, + ) + .await?; + if !re_claimed { + return Err(anyhow::anyhow!( + "Workflow task dispatch for workflow_execution {} task '{}' index {:?} \ + was reclaimed by another executor after rollback", + workflow_execution_id, + task_name, + task_index + ) + .into()); + } + let execution = Self::create(&mut *conn, input).await?; + Self::assign_workflow_task_dispatch_execution( + &mut *conn, + workflow_execution_id, + task_name, + task_index, + execution.id, + ) + .await?; + Ok(WorkflowTaskExecutionCreateOrGetResult { + execution, + created: true, + }) + } + + None => { + // No row at all — the original INSERT was rolled back before we arrived. + // Attempt to re-claim and create as if this were a fresh dispatch. + let re_claimed = Self::claim_workflow_task_dispatch( + &mut *conn, + workflow_execution_id, + task_name, + task_index, + ) + .await?; + if !re_claimed { + return Err(anyhow::anyhow!( + "Workflow task dispatch for workflow_execution {} task '{}' index {:?} \ + was claimed by another executor", + workflow_execution_id, + task_name, + task_index + ) + .into()); + } + let execution = Self::create(&mut *conn, input).await?; + Self::assign_workflow_task_dispatch_execution( + &mut *conn, + workflow_execution_id, + task_name, + task_index, + execution.id, + ) + .await?; + Ok(WorkflowTaskExecutionCreateOrGetResult { + execution, + created: true, + }) + } + } + } + + pub async fn create_workflow_task_if_absent( + pool: &PgPool, + input: CreateExecutionInput, + workflow_execution_id: Id, + task_name: &str, + task_index: Option, + ) -> Result { + let mut conn = pool.acquire().await?; + sqlx::query("BEGIN").execute(&mut *conn).await?; + + let result = Self::create_workflow_task_if_absent_in_conn( + &mut conn, + input, + workflow_execution_id, + task_name, + task_index, + ) + .await; + + match result { + Ok(result) => { + sqlx::query("COMMIT").execute(&mut 
*conn).await?; + Ok(result) + } + Err(err) => { + sqlx::query("ROLLBACK").execute(&mut *conn).await?; + Err(err) + } + } + } + + pub async fn create_workflow_task_if_absent_with_conn( + conn: &mut PgConnection, + input: CreateExecutionInput, + workflow_execution_id: Id, + task_name: &str, + task_index: Option, + ) -> Result { + Self::create_workflow_task_if_absent_in_conn( + conn, + input, + workflow_execution_id, + task_name, + task_index, + ) + .await } pub async fn claim_for_scheduling<'e, E>( @@ -320,6 +687,62 @@ impl ExecutionRepository { .await } + pub async fn update_if_status_and_updated_before<'e, E>( + executor: E, + id: Id, + expected_status: ExecutionStatus, + stale_before: DateTime, + input: UpdateExecutionInput, + ) -> Result> + where + E: Executor<'e, Database = Postgres> + 'e, + { + if input.status.is_none() + && input.result.is_none() + && input.executor.is_none() + && input.worker.is_none() + && input.started_at.is_none() + && input.workflow_task.is_none() + { + return Self::find_by_id(executor, id).await; + } + + Self::update_with_locator_optional(executor, input, |query| { + query.push(" WHERE id = ").push_bind(id); + query.push(" AND status = ").push_bind(expected_status); + query.push(" AND updated < ").push_bind(stale_before); + }) + .await + } + + pub async fn update_if_status_and_updated_at<'e, E>( + executor: E, + id: Id, + expected_status: ExecutionStatus, + expected_updated: DateTime, + input: UpdateExecutionInput, + ) -> Result> + where + E: Executor<'e, Database = Postgres> + 'e, + { + if input.status.is_none() + && input.result.is_none() + && input.executor.is_none() + && input.worker.is_none() + && input.started_at.is_none() + && input.workflow_task.is_none() + { + return Self::find_by_id(executor, id).await; + } + + Self::update_with_locator_optional(executor, input, |query| { + query.push(" WHERE id = ").push_bind(id); + query.push(" AND status = ").push_bind(expected_status); + query.push(" AND updated = 
").push_bind(expected_updated); + }) + .await + } + pub async fn revert_scheduled_to_requested<'e, E>( executor: E, id: Id, @@ -473,10 +896,7 @@ impl ExecutionRepository { .map_err(Into::into) } - /// Update an execution using the loaded row's hypertable keys. - /// - /// Including both the partition key (`created`) and compression segment key - /// (`action_ref`) avoids broad scans across compressed chunks. + /// Update an execution using the loaded row's primary key. pub async fn update_loaded<'e, E>( executor: E, execution: &Execution, @@ -495,12 +915,8 @@ impl ExecutionRepository { return Ok(execution.clone()); } - let action_ref = execution.action_ref.clone(); - Self::update_with_locator(executor, input, |query| { query.push(" WHERE id = ").push_bind(execution.id); - query.push(" AND created = ").push_bind(execution.created); - query.push(" AND action_ref = ").push_bind(action_ref); }) .await } diff --git a/crates/common/src/repositories/execution_admission.rs b/crates/common/src/repositories/execution_admission.rs new file mode 100644 index 0000000..360a7c0 --- /dev/null +++ b/crates/common/src/repositories/execution_admission.rs @@ -0,0 +1,909 @@ +use chrono::{DateTime, Utc}; +use sqlx::{PgPool, Postgres, Row, Transaction}; + +use crate::error::Result; +use crate::models::Id; +use crate::repositories::queue_stats::{QueueStatsRepository, UpsertQueueStatsInput}; + +#[derive(Debug, Clone)] +pub struct AdmissionSlotAcquireOutcome { + pub acquired: bool, + pub current_count: u32, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum AdmissionEnqueueOutcome { + Acquired, + Enqueued, +} + +#[derive(Debug, Clone)] +pub struct AdmissionSlotReleaseOutcome { + pub action_id: Id, + pub group_key: Option, + pub next_execution_id: Option, +} + +#[derive(Debug, Clone)] +pub struct AdmissionQueuedRemovalOutcome { + pub action_id: Id, + pub group_key: Option, + pub next_execution_id: Option, + pub execution_id: Id, + pub queue_order: i64, + pub enqueued_at: DateTime, 
+ pub removed_index: usize, +} + +#[derive(Debug, Clone)] +pub struct AdmissionQueueStats { + pub action_id: Id, + pub queue_length: usize, + pub active_count: u32, + pub max_concurrent: u32, + pub oldest_enqueued_at: Option>, + pub total_enqueued: u64, + pub total_completed: u64, +} + +#[derive(Debug, Clone)] +struct AdmissionState { + id: Id, + action_id: Id, + group_key: Option, + max_concurrent: i32, +} + +#[derive(Debug, Clone)] +struct ExecutionEntry { + state_id: Id, + action_id: Id, + group_key: Option, + status: String, + queue_order: i64, + enqueued_at: DateTime, +} + +pub struct ExecutionAdmissionRepository; + +impl ExecutionAdmissionRepository { + pub async fn enqueue( + pool: &PgPool, + max_queue_length: usize, + action_id: Id, + execution_id: Id, + max_concurrent: u32, + group_key: Option, + ) -> Result { + let mut tx = pool.begin().await?; + let state = Self::lock_state(&mut tx, action_id, group_key, max_concurrent).await?; + let outcome = + Self::enqueue_in_state(&mut tx, &state, max_queue_length, execution_id, true).await?; + Self::refresh_queue_stats(&mut tx, action_id).await?; + tx.commit().await?; + Ok(outcome) + } + + pub async fn wait_status(pool: &PgPool, execution_id: Id) -> Result> { + let row = sqlx::query_scalar::( + r#" + SELECT status = 'active' + FROM execution_admission_entry + WHERE execution_id = $1 + "#, + ) + .bind(execution_id) + .fetch_optional(pool) + .await?; + + Ok(row) + } + + pub async fn try_acquire( + pool: &PgPool, + action_id: Id, + execution_id: Id, + max_concurrent: u32, + group_key: Option, + ) -> Result { + let mut tx = pool.begin().await?; + let state = Self::lock_state(&mut tx, action_id, group_key, max_concurrent).await?; + let active_count = Self::active_count(&mut tx, state.id).await? as u32; + + let outcome = match Self::find_execution_entry(&mut tx, execution_id).await? 
{ + Some(entry) if entry.status == "active" => AdmissionSlotAcquireOutcome { + acquired: true, + current_count: active_count, + }, + Some(entry) if entry.status == "queued" && entry.state_id == state.id => { + let promoted = + Self::maybe_promote_existing_queued(&mut tx, &state, execution_id).await?; + AdmissionSlotAcquireOutcome { + acquired: promoted, + current_count: active_count, + } + } + Some(_) => AdmissionSlotAcquireOutcome { + acquired: false, + current_count: active_count, + }, + None => { + if active_count < max_concurrent + && Self::queued_count(&mut tx, state.id).await? == 0 + { + let queue_order = Self::allocate_queue_order(&mut tx, state.id).await?; + Self::insert_entry( + &mut tx, + state.id, + execution_id, + "active", + queue_order, + Utc::now(), + ) + .await?; + Self::increment_total_enqueued(&mut tx, state.id).await?; + Self::refresh_queue_stats(&mut tx, action_id).await?; + AdmissionSlotAcquireOutcome { + acquired: true, + current_count: active_count, + } + } else { + AdmissionSlotAcquireOutcome { + acquired: false, + current_count: active_count, + } + } + } + }; + + tx.commit().await?; + Ok(outcome) + } + + pub async fn release_active_slot( + pool: &PgPool, + execution_id: Id, + ) -> Result> { + let mut tx = pool.begin().await?; + let Some(entry) = Self::find_execution_entry_for_update(&mut tx, execution_id).await? + else { + tx.commit().await?; + return Ok(None); + }; + + if entry.status != "active" { + tx.commit().await?; + return Ok(None); + } + + let state = Self::lock_existing_state(&mut tx, entry.action_id, entry.group_key.clone()) + .await? 
+ .ok_or_else(|| { + crate::Error::internal("missing execution_admission_state for active execution") + })?; + + sqlx::query("DELETE FROM execution_admission_entry WHERE execution_id = $1") + .bind(execution_id) + .execute(&mut *tx) + .await?; + + Self::increment_total_completed(&mut tx, state.id).await?; + + let next_execution_id = Self::promote_next_queued(&mut tx, &state).await?; + Self::refresh_queue_stats(&mut tx, state.action_id).await?; + tx.commit().await?; + + Ok(Some(AdmissionSlotReleaseOutcome { + action_id: state.action_id, + group_key: state.group_key, + next_execution_id, + })) + } + + pub async fn restore_active_slot( + pool: &PgPool, + execution_id: Id, + outcome: &AdmissionSlotReleaseOutcome, + ) -> Result<()> { + let mut tx = pool.begin().await?; + let state = + Self::lock_existing_state(&mut tx, outcome.action_id, outcome.group_key.clone()) + .await? + .ok_or_else(|| { + crate::Error::internal("missing execution_admission_state on restore") + })?; + + if let Some(next_execution_id) = outcome.next_execution_id { + sqlx::query( + r#" + UPDATE execution_admission_entry + SET status = 'queued', activated_at = NULL + WHERE execution_id = $1 + AND state_id = $2 + AND status = 'active' + "#, + ) + .bind(next_execution_id) + .bind(state.id) + .execute(&mut *tx) + .await?; + } + + sqlx::query( + r#" + INSERT INTO execution_admission_entry ( + state_id, execution_id, status, queue_order, enqueued_at, activated_at + ) VALUES ($1, $2, 'active', $3, NOW(), NOW()) + ON CONFLICT (execution_id) DO UPDATE + SET state_id = EXCLUDED.state_id, + status = 'active', + activated_at = EXCLUDED.activated_at + "#, + ) + .bind(state.id) + .bind(execution_id) + .bind(Self::allocate_queue_order(&mut tx, state.id).await?) 
+ .execute(&mut *tx) + .await?; + + sqlx::query( + r#" + UPDATE execution_admission_state + SET total_completed = GREATEST(total_completed - 1, 0) + WHERE id = $1 + "#, + ) + .bind(state.id) + .execute(&mut *tx) + .await?; + + Self::refresh_queue_stats(&mut tx, state.action_id).await?; + tx.commit().await?; + Ok(()) + } + + pub async fn remove_queued_execution( + pool: &PgPool, + execution_id: Id, + ) -> Result> { + let mut tx = pool.begin().await?; + let Some(entry) = Self::find_execution_entry_for_update(&mut tx, execution_id).await? + else { + tx.commit().await?; + return Ok(None); + }; + + if entry.status != "queued" { + tx.commit().await?; + return Ok(None); + } + + let state = Self::lock_existing_state(&mut tx, entry.action_id, entry.group_key.clone()) + .await? + .ok_or_else(|| { + crate::Error::internal("missing execution_admission_state for queued execution") + })?; + + let removed_index = sqlx::query_scalar::( + r#" + SELECT COUNT(*) + FROM execution_admission_entry + WHERE state_id = $1 + AND status = 'queued' + AND (enqueued_at, id) < ( + SELECT enqueued_at, id + FROM execution_admission_entry + WHERE execution_id = $2 + ) + "#, + ) + .bind(state.id) + .bind(execution_id) + .fetch_one(&mut *tx) + .await? as usize; + + sqlx::query("DELETE FROM execution_admission_entry WHERE execution_id = $1") + .bind(execution_id) + .execute(&mut *tx) + .await?; + + let next_execution_id = + if Self::active_count(&mut tx, state.id).await? < state.max_concurrent as i64 { + Self::promote_next_queued(&mut tx, &state).await? 
+ } else { + None + }; + + Self::refresh_queue_stats(&mut tx, state.action_id).await?; + tx.commit().await?; + + Ok(Some(AdmissionQueuedRemovalOutcome { + action_id: state.action_id, + group_key: state.group_key, + next_execution_id, + execution_id, + queue_order: entry.queue_order, + enqueued_at: entry.enqueued_at, + removed_index, + })) + } + + pub async fn restore_queued_execution( + pool: &PgPool, + outcome: &AdmissionQueuedRemovalOutcome, + ) -> Result<()> { + let mut tx = pool.begin().await?; + let state = + Self::lock_existing_state(&mut tx, outcome.action_id, outcome.group_key.clone()) + .await? + .ok_or_else(|| { + crate::Error::internal("missing execution_admission_state on queued restore") + })?; + + if let Some(next_execution_id) = outcome.next_execution_id { + sqlx::query( + r#" + UPDATE execution_admission_entry + SET status = 'queued', activated_at = NULL + WHERE execution_id = $1 + AND state_id = $2 + AND status = 'active' + "#, + ) + .bind(next_execution_id) + .bind(state.id) + .execute(&mut *tx) + .await?; + } + + sqlx::query( + r#" + INSERT INTO execution_admission_entry ( + state_id, execution_id, status, queue_order, enqueued_at, activated_at + ) VALUES ($1, $2, 'queued', $3, $4, NULL) + ON CONFLICT (execution_id) DO NOTHING + "#, + ) + .bind(state.id) + .bind(outcome.execution_id) + .bind(outcome.queue_order) + .bind(outcome.enqueued_at) + .execute(&mut *tx) + .await?; + + Self::refresh_queue_stats(&mut tx, state.action_id).await?; + tx.commit().await?; + Ok(()) + } + + pub async fn get_queue_stats( + pool: &PgPool, + action_id: Id, + ) -> Result> { + let row = sqlx::query( + r#" + WITH state_rows AS ( + SELECT + COUNT(*) AS state_count, + COALESCE(SUM(max_concurrent), 0) AS max_concurrent, + COALESCE(SUM(total_enqueued), 0) AS total_enqueued, + COALESCE(SUM(total_completed), 0) AS total_completed + FROM execution_admission_state + WHERE action_id = $1 + ), + entry_rows AS ( + SELECT + COUNT(*) FILTER (WHERE e.status = 'queued') AS 
queue_length, + COUNT(*) FILTER (WHERE e.status = 'active') AS active_count, + MIN(e.enqueued_at) FILTER (WHERE e.status = 'queued') AS oldest_enqueued_at + FROM execution_admission_state s + LEFT JOIN execution_admission_entry e ON e.state_id = s.id + WHERE s.action_id = $1 + ) + SELECT + sr.state_count, + er.queue_length, + er.active_count, + sr.max_concurrent, + er.oldest_enqueued_at, + sr.total_enqueued, + sr.total_completed + FROM state_rows sr + CROSS JOIN entry_rows er + "#, + ) + .bind(action_id) + .fetch_one(pool) + .await?; + + let state_count: i64 = row.try_get("state_count")?; + if state_count == 0 { + return Ok(None); + } + + Ok(Some(AdmissionQueueStats { + action_id, + queue_length: row.try_get::("queue_length")? as usize, + active_count: row.try_get::("active_count")? as u32, + max_concurrent: row.try_get::("max_concurrent")? as u32, + oldest_enqueued_at: row.try_get("oldest_enqueued_at")?, + total_enqueued: row.try_get::("total_enqueued")? as u64, + total_completed: row.try_get::("total_completed")? as u64, + })) + } + + async fn enqueue_in_state( + tx: &mut Transaction<'_, Postgres>, + state: &AdmissionState, + max_queue_length: usize, + execution_id: Id, + allow_queue: bool, + ) -> Result { + if let Some(entry) = Self::find_execution_entry(tx, execution_id).await? { + if entry.status == "active" { + return Ok(AdmissionEnqueueOutcome::Acquired); + } + + if entry.status == "queued" && entry.state_id == state.id { + if Self::maybe_promote_existing_queued(tx, state, execution_id).await? 
{ + return Ok(AdmissionEnqueueOutcome::Acquired); + } + return Ok(AdmissionEnqueueOutcome::Enqueued); + } + + return Ok(AdmissionEnqueueOutcome::Enqueued); + } + + let active_count = Self::active_count(tx, state.id).await?; + let queued_count = Self::queued_count(tx, state.id).await?; + + if active_count < state.max_concurrent as i64 && queued_count == 0 { + let queue_order = Self::allocate_queue_order(tx, state.id).await?; + Self::insert_entry( + tx, + state.id, + execution_id, + "active", + queue_order, + Utc::now(), + ) + .await?; + Self::increment_total_enqueued(tx, state.id).await?; + return Ok(AdmissionEnqueueOutcome::Acquired); + } + + if !allow_queue { + return Ok(AdmissionEnqueueOutcome::Enqueued); + } + + if queued_count >= max_queue_length as i64 { + return Err(anyhow::anyhow!( + "Queue full for action {}: maximum {} entries", + state.action_id, + max_queue_length + ) + .into()); + } + + let queue_order = Self::allocate_queue_order(tx, state.id).await?; + Self::insert_entry( + tx, + state.id, + execution_id, + "queued", + queue_order, + Utc::now(), + ) + .await?; + Self::increment_total_enqueued(tx, state.id).await?; + Ok(AdmissionEnqueueOutcome::Enqueued) + } + + async fn maybe_promote_existing_queued( + tx: &mut Transaction<'_, Postgres>, + state: &AdmissionState, + execution_id: Id, + ) -> Result { + let active_count = Self::active_count(tx, state.id).await?; + if active_count >= state.max_concurrent as i64 { + return Ok(false); + } + + let front_execution_id = sqlx::query_scalar::( + r#" + SELECT execution_id + FROM execution_admission_entry + WHERE state_id = $1 + AND status = 'queued' + ORDER BY queue_order ASC + LIMIT 1 + "#, + ) + .bind(state.id) + .fetch_optional(&mut **tx) + .await?; + + if front_execution_id != Some(execution_id) { + return Ok(false); + } + + sqlx::query( + r#" + UPDATE execution_admission_entry + SET status = 'active', + activated_at = NOW() + WHERE execution_id = $1 + AND state_id = $2 + AND status = 'queued' + "#, + ) + 
.bind(execution_id)
+        .bind(state.id)
+        .execute(&mut **tx)
+        .await?;
+
+        Ok(true)
+    }
+
+    async fn promote_next_queued(
+        tx: &mut Transaction<'_, Postgres>,
+        state: &AdmissionState,
+    ) -> Result<Option<Id>> {
+        let next_execution_id = sqlx::query_scalar::<Postgres, Id>(
+            r#"
+            SELECT execution_id
+            FROM execution_admission_entry
+            WHERE state_id = $1
+              AND status = 'queued'
+            ORDER BY queue_order ASC
+            LIMIT 1
+            "#,
+        )
+        .bind(state.id)
+        .fetch_optional(&mut **tx)
+        .await?;
+
+        if let Some(next_execution_id) = next_execution_id {
+            sqlx::query(
+                r#"
+                UPDATE execution_admission_entry
+                SET status = 'active',
+                    activated_at = NOW()
+                WHERE execution_id = $1
+                  AND state_id = $2
+                  AND status = 'queued'
+                "#,
+            )
+            .bind(next_execution_id)
+            .bind(state.id)
+            .execute(&mut **tx)
+            .await?;
+        }
+
+        Ok(next_execution_id)
+    }
+
+    async fn lock_state(
+        tx: &mut Transaction<'_, Postgres>,
+        action_id: Id,
+        group_key: Option<String>,
+        max_concurrent: u32,
+    ) -> Result<AdmissionState> {
+        sqlx::query(
+            r#"
+            INSERT INTO execution_admission_state (action_id, group_key, max_concurrent)
+            VALUES ($1, $2, $3)
+            ON CONFLICT (action_id, group_key_normalized)
+            DO UPDATE SET max_concurrent = EXCLUDED.max_concurrent
+            "#,
+        )
+        .bind(action_id)
+        .bind(group_key.clone())
+        .bind(max_concurrent as i32)
+        .execute(&mut **tx)
+        .await?;
+
+        let state = sqlx::query(
+            r#"
+            SELECT id, action_id, group_key, max_concurrent
+            FROM execution_admission_state
+            WHERE action_id = $1
+              AND group_key_normalized = COALESCE($2, '')
+            FOR UPDATE
+            "#,
+        )
+        .bind(action_id)
+        .bind(group_key)
+        .fetch_one(&mut **tx)
+        .await?;
+
+        Ok(AdmissionState {
+            id: state.try_get("id")?,
+            action_id: state.try_get("action_id")?,
+            group_key: state.try_get("group_key")?,
+            max_concurrent: state.try_get("max_concurrent")?,
+        })
+    }
+
+    async fn lock_existing_state(
+        tx: &mut Transaction<'_, Postgres>,
+        action_id: Id,
+        group_key: Option<String>,
+    ) -> Result<Option<AdmissionState>> {
+        let row = sqlx::query(
+            r#"
+            SELECT id, action_id, group_key, max_concurrent
+            FROM
execution_admission_state + WHERE action_id = $1 + AND group_key_normalized = COALESCE($2, '') + FOR UPDATE + "#, + ) + .bind(action_id) + .bind(group_key) + .fetch_optional(&mut **tx) + .await?; + + Ok(row.map(|state| AdmissionState { + id: state.try_get("id").expect("state.id"), + action_id: state.try_get("action_id").expect("state.action_id"), + group_key: state.try_get("group_key").expect("state.group_key"), + max_concurrent: state + .try_get("max_concurrent") + .expect("state.max_concurrent"), + })) + } + + async fn find_execution_entry( + tx: &mut Transaction<'_, Postgres>, + execution_id: Id, + ) -> Result> { + let row = sqlx::query( + r#" + SELECT + e.state_id, + s.action_id, + s.group_key, + e.execution_id, + e.status, + e.queue_order, + e.enqueued_at + FROM execution_admission_entry e + JOIN execution_admission_state s ON s.id = e.state_id + WHERE e.execution_id = $1 + "#, + ) + .bind(execution_id) + .fetch_optional(&mut **tx) + .await?; + + Ok(row.map(|entry| ExecutionEntry { + state_id: entry.try_get("state_id").expect("entry.state_id"), + action_id: entry.try_get("action_id").expect("entry.action_id"), + group_key: entry.try_get("group_key").expect("entry.group_key"), + status: entry.try_get("status").expect("entry.status"), + queue_order: entry.try_get("queue_order").expect("entry.queue_order"), + enqueued_at: entry.try_get("enqueued_at").expect("entry.enqueued_at"), + })) + } + + async fn find_execution_entry_for_update( + tx: &mut Transaction<'_, Postgres>, + execution_id: Id, + ) -> Result> { + let row = sqlx::query( + r#" + SELECT + e.state_id, + s.action_id, + s.group_key, + e.execution_id, + e.status, + e.queue_order, + e.enqueued_at + FROM execution_admission_entry e + JOIN execution_admission_state s ON s.id = e.state_id + WHERE e.execution_id = $1 + FOR UPDATE OF e, s + "#, + ) + .bind(execution_id) + .fetch_optional(&mut **tx) + .await?; + + Ok(row.map(|entry| ExecutionEntry { + state_id: entry.try_get("state_id").expect("entry.state_id"), + 
action_id: entry.try_get("action_id").expect("entry.action_id"),
+            group_key: entry.try_get("group_key").expect("entry.group_key"),
+            status: entry.try_get("status").expect("entry.status"),
+            queue_order: entry.try_get("queue_order").expect("entry.queue_order"),
+            enqueued_at: entry.try_get("enqueued_at").expect("entry.enqueued_at"),
+        }))
+    }
+
+    async fn active_count(tx: &mut Transaction<'_, Postgres>, state_id: Id) -> Result<i64> {
+        Ok(sqlx::query_scalar::<Postgres, i64>(
+            r#"
+            SELECT COUNT(*)
+            FROM execution_admission_entry
+            WHERE state_id = $1
+              AND status = 'active'
+            "#,
+        )
+        .bind(state_id)
+        .fetch_one(&mut **tx)
+        .await?)
+    }
+
+    async fn queued_count(tx: &mut Transaction<'_, Postgres>, state_id: Id) -> Result<i64> {
+        Ok(sqlx::query_scalar::<Postgres, i64>(
+            r#"
+            SELECT COUNT(*)
+            FROM execution_admission_entry
+            WHERE state_id = $1
+              AND status = 'queued'
+            "#,
+        )
+        .bind(state_id)
+        .fetch_one(&mut **tx)
+        .await?)
+    }
+
+    async fn insert_entry(
+        tx: &mut Transaction<'_, Postgres>,
+        state_id: Id,
+        execution_id: Id,
+        status: &str,
+        queue_order: i64,
+        enqueued_at: DateTime<Utc>,
+    ) -> Result<()> {
+        sqlx::query(
+            r#"
+            INSERT INTO execution_admission_entry (
+                state_id, execution_id, status, queue_order, enqueued_at, activated_at
+            ) VALUES (
+                $1, $2, $3, $4, $5,
+                CASE WHEN $3 = 'active' THEN NOW() ELSE NULL END
+            )
+            "#,
+        )
+        .bind(state_id)
+        .bind(execution_id)
+        .bind(status)
+        .bind(queue_order)
+        .bind(enqueued_at)
+        .execute(&mut **tx)
+        .await?;
+
+        Ok(())
+    }
+
+    async fn allocate_queue_order(tx: &mut Transaction<'_, Postgres>, state_id: Id) -> Result<i64> {
+        let queue_order = sqlx::query_scalar::<Postgres, i64>(
+            r#"
+            UPDATE execution_admission_state
+            SET next_queue_order = next_queue_order + 1
+            WHERE id = $1
+            RETURNING next_queue_order - 1
+            "#,
+        )
+        .bind(state_id)
+        .fetch_one(&mut **tx)
+        .await?;
+
+        Ok(queue_order)
+    }
+
+    async fn increment_total_enqueued(
+        tx: &mut Transaction<'_, Postgres>,
+        state_id: Id,
+    ) -> Result<()> {
+        sqlx::query(
+            r#"
+            UPDATE
execution_admission_state
+            SET total_enqueued = total_enqueued + 1
+            WHERE id = $1
+            "#,
+        )
+        .bind(state_id)
+        .execute(&mut **tx)
+        .await?;
+        Ok(())
+    }
+
+    async fn increment_total_completed(
+        tx: &mut Transaction<'_, Postgres>,
+        state_id: Id,
+    ) -> Result<()> {
+        sqlx::query(
+            r#"
+            UPDATE execution_admission_state
+            SET total_completed = total_completed + 1
+            WHERE id = $1
+            "#,
+        )
+        .bind(state_id)
+        .execute(&mut **tx)
+        .await?;
+        Ok(())
+    }
+
+    async fn refresh_queue_stats(tx: &mut Transaction<'_, Postgres>, action_id: Id) -> Result<()> {
+        let Some(stats) = Self::get_queue_stats_from_tx(tx, action_id).await? else {
+            QueueStatsRepository::delete(&mut **tx, action_id).await?;
+            return Ok(());
+        };
+
+        QueueStatsRepository::upsert(
+            &mut **tx,
+            UpsertQueueStatsInput {
+                action_id,
+                queue_length: stats.queue_length as i32,
+                active_count: stats.active_count as i32,
+                max_concurrent: stats.max_concurrent as i32,
+                oldest_enqueued_at: stats.oldest_enqueued_at,
+                total_enqueued: stats.total_enqueued as i64,
+                total_completed: stats.total_completed as i64,
+            },
+        )
+        .await?;
+
+        Ok(())
+    }
+
+    async fn get_queue_stats_from_tx(
+        tx: &mut Transaction<'_, Postgres>,
+        action_id: Id,
+    ) -> Result<Option<AdmissionQueueStats>> {
+        let row = sqlx::query(
+            r#"
+            WITH state_rows AS (
+                SELECT
+                    COUNT(*) AS state_count,
+                    COALESCE(SUM(max_concurrent), 0) AS max_concurrent,
+                    COALESCE(SUM(total_enqueued), 0) AS total_enqueued,
+                    COALESCE(SUM(total_completed), 0) AS total_completed
+                FROM execution_admission_state
+                WHERE action_id = $1
+            ),
+            entry_rows AS (
+                SELECT
+                    COUNT(*) FILTER (WHERE e.status = 'queued') AS queue_length,
+                    COUNT(*) FILTER (WHERE e.status = 'active') AS active_count,
+                    MIN(e.enqueued_at) FILTER (WHERE e.status = 'queued') AS oldest_enqueued_at
+                FROM execution_admission_state s
+                LEFT JOIN execution_admission_entry e ON e.state_id = s.id
+                WHERE s.action_id = $1
+            )
+            SELECT
+                sr.state_count,
+                er.queue_length,
+                er.active_count,
+                sr.max_concurrent,
+
er.oldest_enqueued_at, + sr.total_enqueued, + sr.total_completed + FROM state_rows sr + CROSS JOIN entry_rows er + "#, + ) + .bind(action_id) + .fetch_one(&mut **tx) + .await?; + + let state_count: i64 = row.try_get("state_count")?; + if state_count == 0 { + return Ok(None); + } + + Ok(Some(AdmissionQueueStats { + action_id, + queue_length: row.try_get::("queue_length")? as usize, + active_count: row.try_get::("active_count")? as u32, + max_concurrent: row.try_get::("max_concurrent")? as u32, + oldest_enqueued_at: row.try_get("oldest_enqueued_at")?, + total_enqueued: row.try_get::("total_enqueued")? as u64, + total_completed: row.try_get::("total_completed")? as u64, + })) + } +} diff --git a/crates/common/src/repositories/mod.rs b/crates/common/src/repositories/mod.rs index dd17333..d6b7ea2 100644 --- a/crates/common/src/repositories/mod.rs +++ b/crates/common/src/repositories/mod.rs @@ -33,6 +33,7 @@ pub mod artifact; pub mod entity_history; pub mod event; pub mod execution; +pub mod execution_admission; pub mod identity; pub mod inquiry; pub mod key; @@ -53,6 +54,7 @@ pub use artifact::{ArtifactRepository, ArtifactVersionRepository}; pub use entity_history::EntityHistoryRepository; pub use event::{EnforcementRepository, EventRepository}; pub use execution::ExecutionRepository; +pub use execution_admission::ExecutionAdmissionRepository; pub use identity::{IdentityRepository, PermissionAssignmentRepository, PermissionSetRepository}; pub use inquiry::InquiryRepository; pub use key::KeyRepository; diff --git a/crates/common/src/repositories/queue_stats.rs b/crates/common/src/repositories/queue_stats.rs index 50f8a4f..a1ce886 100644 --- a/crates/common/src/repositories/queue_stats.rs +++ b/crates/common/src/repositories/queue_stats.rs @@ -3,7 +3,7 @@ //! Provides database operations for queue statistics persistence. 
use chrono::{DateTime, Utc}; -use sqlx::{PgPool, Postgres, QueryBuilder}; +use sqlx::{Executor, PgPool, Postgres, QueryBuilder}; use crate::error::Result; use crate::models::Id; @@ -38,7 +38,10 @@ pub struct QueueStatsRepository; impl QueueStatsRepository { /// Upsert queue statistics (insert or update) - pub async fn upsert(pool: &PgPool, input: UpsertQueueStatsInput) -> Result { + pub async fn upsert<'e, E>(executor: E, input: UpsertQueueStatsInput) -> Result + where + E: Executor<'e, Database = Postgres> + 'e, + { let stats = sqlx::query_as::( r#" INSERT INTO queue_stats ( @@ -69,14 +72,17 @@ impl QueueStatsRepository { .bind(input.oldest_enqueued_at) .bind(input.total_enqueued) .bind(input.total_completed) - .fetch_one(pool) + .fetch_one(executor) .await?; Ok(stats) } /// Get queue statistics for a specific action - pub async fn find_by_action(pool: &PgPool, action_id: Id) -> Result> { + pub async fn find_by_action<'e, E>(executor: E, action_id: Id) -> Result> + where + E: Executor<'e, Database = Postgres> + 'e, + { let stats = sqlx::query_as::( r#" SELECT @@ -93,14 +99,17 @@ impl QueueStatsRepository { "#, ) .bind(action_id) - .fetch_optional(pool) + .fetch_optional(executor) .await?; Ok(stats) } /// List all queue statistics with active queues (queue_length > 0 or active_count > 0) - pub async fn list_active(pool: &PgPool) -> Result> { + pub async fn list_active<'e, E>(executor: E) -> Result> + where + E: Executor<'e, Database = Postgres> + 'e, + { let stats = sqlx::query_as::( r#" SELECT @@ -117,14 +126,17 @@ impl QueueStatsRepository { ORDER BY last_updated DESC "#, ) - .fetch_all(pool) + .fetch_all(executor) .await?; Ok(stats) } /// List all queue statistics - pub async fn list_all(pool: &PgPool) -> Result> { + pub async fn list_all<'e, E>(executor: E) -> Result> + where + E: Executor<'e, Database = Postgres> + 'e, + { let stats = sqlx::query_as::( r#" SELECT @@ -140,14 +152,17 @@ impl QueueStatsRepository { ORDER BY last_updated DESC "#, ) - 
.fetch_all(pool) + .fetch_all(executor) .await?; Ok(stats) } /// Delete queue statistics for a specific action - pub async fn delete(pool: &PgPool, action_id: Id) -> Result { + pub async fn delete<'e, E>(executor: E, action_id: Id) -> Result + where + E: Executor<'e, Database = Postgres> + 'e, + { let result = sqlx::query( r#" DELETE FROM queue_stats @@ -155,7 +170,7 @@ impl QueueStatsRepository { "#, ) .bind(action_id) - .execute(pool) + .execute(executor) .await?; Ok(result.rows_affected() > 0) @@ -163,7 +178,7 @@ impl QueueStatsRepository { /// Batch upsert multiple queue statistics pub async fn batch_upsert( - pool: &PgPool, + executor: &PgPool, inputs: Vec, ) -> Result> { if inputs.is_empty() { @@ -213,14 +228,17 @@ impl QueueStatsRepository { let stats = query_builder .build_query_as::() - .fetch_all(pool) + .fetch_all(executor) .await?; Ok(stats) } /// Clear stale statistics (older than specified duration) - pub async fn clear_stale(pool: &PgPool, older_than_seconds: i64) -> Result { + pub async fn clear_stale<'e, E>(executor: E, older_than_seconds: i64) -> Result + where + E: Executor<'e, Database = Postgres> + 'e, + { let result = sqlx::query( r#" DELETE FROM queue_stats @@ -230,7 +248,7 @@ impl QueueStatsRepository { "#, ) .bind(older_than_seconds) - .execute(pool) + .execute(executor) .await?; Ok(result.rows_affected()) diff --git a/crates/common/src/repositories/workflow.rs b/crates/common/src/repositories/workflow.rs index 3f885b1..3401a0b 100644 --- a/crates/common/src/repositories/workflow.rs +++ b/crates/common/src/repositories/workflow.rs @@ -612,6 +612,26 @@ impl Delete for WorkflowExecutionRepository { } impl WorkflowExecutionRepository { + pub async fn find_by_id_for_update<'e, E>( + executor: E, + id: Id, + ) -> Result> + where + E: Executor<'e, Database = Postgres> + 'e, + { + sqlx::query_as::<_, WorkflowExecution>( + "SELECT id, execution, workflow_def, current_tasks, completed_tasks, failed_tasks, skipped_tasks, + variables, task_graph, 
status, error_message, paused, pause_reason, created, updated + FROM workflow_execution + WHERE id = $1 + FOR UPDATE" + ) + .bind(id) + .fetch_optional(executor) + .await + .map_err(Into::into) + } + pub async fn create_or_get_by_execution<'e, E>( executor: E, input: CreateWorkflowExecutionInput, diff --git a/crates/executor/src/completion_listener.rs b/crates/executor/src/completion_listener.rs index 252bc83..29c5aa7 100644 --- a/crates/executor/src/completion_listener.rs +++ b/crates/executor/src/completion_listener.rs @@ -157,7 +157,11 @@ impl CompletionListener { "Failed to advance workflow for execution {}: {}", execution_id, e ); - // Continue processing — don't fail the entire completion + if let Some(mq_err) = Self::retryable_mq_error(&e) { + return Err(mq_err.into()); + } + // Non-retryable workflow advancement errors are logged but + // do not fail the entire completion processing path. } } diff --git a/crates/executor/src/dead_letter_handler.rs b/crates/executor/src/dead_letter_handler.rs index 65a5123..367d7f4 100644 --- a/crates/executor/src/dead_letter_handler.rs +++ b/crates/executor/src/dead_letter_handler.rs @@ -14,7 +14,7 @@ use attune_common::{ error::Error, models::ExecutionStatus, mq::{Consumer, ConsumerConfig, MessageEnvelope, MessageType, MqResult}, - repositories::{execution::UpdateExecutionInput, ExecutionRepository, FindById, Update}, + repositories::{execution::UpdateExecutionInput, ExecutionRepository, FindById}, }; use chrono::Utc; use serde_json::json; @@ -179,13 +179,12 @@ async fn handle_execution_requested( } }; - // Only fail if still in a non-terminal state - if !matches!( - execution.status, - ExecutionStatus::Scheduled | ExecutionStatus::Running - ) { + // Only scheduled executions are still legitimately owned by the scheduler. + // If the execution already moved to running or a terminal state, this DLQ + // delivery is stale and must not overwrite newer state. 
+ if execution.status != ExecutionStatus::Scheduled { info!( - "Execution {} already in terminal state {:?}, skipping", + "Execution {} already left Scheduled state ({:?}), skipping stale DLQ handling", execution_id, execution.status ); return Ok(()); // Acknowledge to remove from queue @@ -193,6 +192,12 @@ async fn handle_execution_requested( // Get worker info from payload for better error message let worker_id = envelope.payload.get("worker_id").and_then(|v| v.as_i64()); + let scheduled_attempt_updated_at = envelope + .payload + .get("scheduled_attempt_updated_at") + .and_then(|v| v.as_str()) + .and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok()) + .map(|dt| dt.with_timezone(&Utc)); let error_message = if let Some(wid) = worker_id { format!( @@ -214,24 +219,85 @@ async fn handle_execution_requested( ..Default::default() }; - match ExecutionRepository::update(pool, execution_id, update_input).await { - Ok(_) => { - info!( - "Successfully failed execution {} due to worker queue expiration", - execution_id - ); - Ok(()) + if let Some(timestamp) = scheduled_attempt_updated_at { + // Guard on both status and the exact updated_at from when the execution was + // scheduled — prevents overwriting state that changed after this DLQ message + // was enqueued. 
+ match ExecutionRepository::update_if_status_and_updated_at( + pool, + execution_id, + ExecutionStatus::Scheduled, + timestamp, + update_input, + ) + .await + { + Ok(Some(_)) => { + info!( + "Successfully failed execution {} due to worker queue expiration", + execution_id + ); + Ok(()) + } + Ok(None) => { + info!( + "Skipping DLQ failure for execution {} because it already left Scheduled state", + execution_id + ); + Ok(()) + } + Err(e) => { + error!( + "Failed to update execution {} to failed state: {}", + execution_id, e + ); + Err(attune_common::mq::MqError::Consume(format!( + "Failed to update execution: {}", + e + ))) + } } - Err(e) => { - error!( - "Failed to update execution {} to failed state: {}", - execution_id, e - ); - // Return error to nack and potentially retry - Err(attune_common::mq::MqError::Consume(format!( - "Failed to update execution: {}", - e - ))) + } else { + // Fallback for DLQ messages that predate the scheduled_attempt_updated_at + // field. Use a status-only guard — same safety guarantee as the original code + // (never overwrites terminal or running state). 
+ warn!( + "DLQ message for execution {} lacks scheduled_attempt_updated_at; \ + falling back to status-only guard", + execution_id + ); + match ExecutionRepository::update_if_status( + pool, + execution_id, + ExecutionStatus::Scheduled, + update_input, + ) + .await + { + Ok(Some(_)) => { + info!( + "Successfully failed execution {} due to worker queue expiration (status-only guard)", + execution_id + ); + Ok(()) + } + Ok(None) => { + info!( + "Skipping DLQ failure for execution {} because it already left Scheduled state", + execution_id + ); + Ok(()) + } + Err(e) => { + error!( + "Failed to update execution {} to failed state: {}", + execution_id, e + ); + Err(attune_common::mq::MqError::Consume(format!( + "Failed to update execution: {}", + e + ))) + } } } } diff --git a/crates/executor/src/enforcement_processor.rs b/crates/executor/src/enforcement_processor.rs index abfbe1d..40b6b1a 100644 --- a/crates/executor/src/enforcement_processor.rs +++ b/crates/executor/src/enforcement_processor.rs @@ -19,7 +19,7 @@ use attune_common::{ event::{EnforcementRepository, EventRepository, UpdateEnforcementInput}, execution::{CreateExecutionInput, ExecutionRepository}, rule::RuleRepository, - Create, FindById, + FindById, }, }; @@ -116,6 +116,14 @@ impl EnforcementProcessor { .await? .ok_or_else(|| anyhow::anyhow!("Enforcement not found: {}", enforcement_id))?; + if enforcement.status != EnforcementStatus::Created { + debug!( + "Enforcement {} already left Created state ({:?}), skipping duplicate processing", + enforcement_id, enforcement.status + ); + return Ok(()); + } + // Fetch associated rule let rule = RuleRepository::find_by_id( pool, @@ -135,7 +143,7 @@ impl EnforcementProcessor { // Evaluate whether to create execution if Self::should_create_execution(&enforcement, &rule, event.as_ref())? 
{ - Self::create_execution( + let execution_created = Self::create_execution( pool, publisher, policy_enforcer, @@ -145,10 +153,10 @@ impl EnforcementProcessor { ) .await?; - // Update enforcement status to Processed after successful execution creation - EnforcementRepository::update_loaded( + let updated = EnforcementRepository::update_loaded_if_status( pool, &enforcement, + EnforcementStatus::Created, UpdateEnforcementInput { status: Some(EnforcementStatus::Processed), payload: None, @@ -157,17 +165,27 @@ impl EnforcementProcessor { ) .await?; - debug!("Updated enforcement {} status to Processed", enforcement_id); + if updated.is_some() { + debug!( + "Updated enforcement {} status to Processed after {} execution path", + enforcement_id, + if execution_created { + "new" + } else { + "idempotent" + } + ); + } } else { info!( "Skipping execution creation for enforcement: {}", enforcement_id ); - // Update enforcement status to Disabled since it was not actionable - EnforcementRepository::update_loaded( + let updated = EnforcementRepository::update_loaded_if_status( pool, &enforcement, + EnforcementStatus::Created, UpdateEnforcementInput { status: Some(EnforcementStatus::Disabled), payload: None, @@ -176,10 +194,12 @@ impl EnforcementProcessor { ) .await?; - debug!( - "Updated enforcement {} status to Disabled (skipped)", - enforcement_id - ); + if updated.is_some() { + debug!( + "Updated enforcement {} status to Disabled (skipped)", + enforcement_id + ); + } } Ok(()) @@ -234,7 +254,7 @@ impl EnforcementProcessor { _queue_manager: &ExecutionQueueManager, enforcement: &Enforcement, rule: &Rule, - ) -> Result<()> { + ) -> Result { // Extract action ID — should_create_execution already verified it's Some, // but guard defensively here as well. 
let action_id = match rule.action { @@ -275,44 +295,60 @@ impl EnforcementProcessor { workflow_task: None, // Non-workflow execution }; - let execution = ExecutionRepository::create(pool, execution_input).await?; + let execution_result = ExecutionRepository::create_top_level_for_enforcement_if_absent( + pool, + execution_input, + enforcement.id, + ) + .await?; + let execution = execution_result.execution; - info!( - "Created execution: {} for enforcement: {}", - execution.id, enforcement.id - ); + if execution_result.created { + info!( + "Created execution: {} for enforcement: {}", + execution.id, enforcement.id + ); + } else { + info!( + "Reusing execution: {} for enforcement: {}", + execution.id, enforcement.id + ); + } - // Publish ExecutionRequested message - let payload = ExecutionRequestedPayload { - execution_id: execution.id, - action_id: Some(action_id), - action_ref: action_ref.clone(), - parent_id: None, - enforcement_id: Some(enforcement.id), - config: enforcement.config.clone(), - }; + if execution_result.created + || execution.status == attune_common::models::enums::ExecutionStatus::Requested + { + let payload = ExecutionRequestedPayload { + execution_id: execution.id, + action_id: Some(action_id), + action_ref: action_ref.clone(), + parent_id: None, + enforcement_id: Some(enforcement.id), + config: execution.config.clone(), + }; - let envelope = - MessageEnvelope::new(attune_common::mq::MessageType::ExecutionRequested, payload) - .with_source("executor"); + let envelope = + MessageEnvelope::new(attune_common::mq::MessageType::ExecutionRequested, payload) + .with_source("executor"); - // Publish to execution requests queue with routing key - let routing_key = "execution.requested"; - let exchange = "attune.executions"; + // Publish to execution requests queue with routing key + let routing_key = "execution.requested"; + let exchange = "attune.executions"; - publisher - .publish_envelope_with_routing(&envelope, exchange, routing_key) - .await?; + 
publisher + .publish_envelope_with_routing(&envelope, exchange, routing_key) + .await?; - info!( - "Published execution.requested message for execution: {} (enforcement: {}, action: {})", - execution.id, enforcement.id, action_id - ); + info!( + "Published execution.requested message for execution: {} (enforcement: {}, action: {})", + execution.id, enforcement.id, action_id + ); + } // NOTE: Queue slot will be released when worker publishes execution.completed // and CompletionListener calls queue_manager.notify_completion(action_id) - Ok(()) + Ok(execution_result.created) } } diff --git a/crates/executor/src/event_processor.rs b/crates/executor/src/event_processor.rs index 8cc118f..7a4664b 100644 --- a/crates/executor/src/event_processor.rs +++ b/crates/executor/src/event_processor.rs @@ -19,7 +19,7 @@ use attune_common::{ event::{CreateEnforcementInput, EnforcementRepository, EventRepository}, pack::PackRepository, rule::RuleRepository, - Create, FindById, List, + FindById, List, }, template_resolver::{resolve_templates, TemplateContext}, }; @@ -206,32 +206,43 @@ impl EventProcessor { conditions: rule.conditions.clone(), }; - let enforcement = EnforcementRepository::create(pool, create_input).await?; + let enforcement_result = + EnforcementRepository::create_or_get_by_rule_event(pool, create_input).await?; + let enforcement = enforcement_result.enforcement; - info!( - "Enforcement {} created for rule {} (event: {})", - enforcement.id, rule.r#ref, event.id - ); + if enforcement_result.created { + info!( + "Enforcement {} created for rule {} (event: {})", + enforcement.id, rule.r#ref, event.id + ); + } else { + info!( + "Reusing enforcement {} for rule {} (event: {})", + enforcement.id, rule.r#ref, event.id + ); + } - // Publish EnforcementCreated message - let enforcement_payload = EnforcementCreatedPayload { - enforcement_id: enforcement.id, - rule_id: Some(rule.id), - rule_ref: rule.r#ref.clone(), - event_id: Some(event.id), - trigger_ref: 
event.trigger_ref.clone(), - payload: payload.clone(), - }; + if enforcement_result.created || enforcement.status == EnforcementStatus::Created { + let enforcement_payload = EnforcementCreatedPayload { + enforcement_id: enforcement.id, + rule_id: Some(rule.id), + rule_ref: rule.r#ref.clone(), + event_id: Some(event.id), + trigger_ref: event.trigger_ref.clone(), + payload: payload.clone(), + }; - let envelope = MessageEnvelope::new(MessageType::EnforcementCreated, enforcement_payload) - .with_source("event-processor"); + let envelope = + MessageEnvelope::new(MessageType::EnforcementCreated, enforcement_payload) + .with_source("event-processor"); - publisher.publish_envelope(&envelope).await?; + publisher.publish_envelope(&envelope).await?; - debug!( - "Published EnforcementCreated message for enforcement {}", - enforcement.id - ); + debug!( + "Published EnforcementCreated message for enforcement {}", + enforcement.id + ); + } Ok(()) } diff --git a/crates/executor/src/inquiry_handler.rs b/crates/executor/src/inquiry_handler.rs index 9f05e09..1e80574 100644 --- a/crates/executor/src/inquiry_handler.rs +++ b/crates/executor/src/inquiry_handler.rs @@ -9,13 +9,14 @@ use anyhow::Result; use attune_common::{ + error::Error as AttuneError, models::{enums::InquiryStatus, inquiry::Inquiry, Execution, Id}, mq::{ Consumer, InquiryCreatedPayload, InquiryRespondedPayload, MessageEnvelope, MessageType, Publisher, }, repositories::{ - execution::{ExecutionRepository, UpdateExecutionInput}, + execution::{ExecutionRepository, UpdateExecutionInput, SELECT_COLUMNS}, inquiry::{CreateInquiryInput, InquiryRepository}, Create, FindById, Update, }, @@ -28,6 +29,8 @@ use tracing::{debug, error, info, warn}; /// Special key in action result to indicate an inquiry should be created pub const INQUIRY_RESULT_KEY: &str = "__inquiry"; +const INQUIRY_ID_RESULT_KEY: &str = "__inquiry_id"; +const INQUIRY_CREATED_PUBLISHED_RESULT_KEY: &str = "__inquiry_created_published"; /// Structure for inquiry 
data in action results #[derive(Debug, Clone, serde::Deserialize)] @@ -104,69 +107,198 @@ impl InquiryHandler { let inquiry_request: InquiryRequest = serde_json::from_value(inquiry_value.clone())?; Ok(inquiry_request) } +} +/// Returns true when `e` represents a PostgreSQL unique constraint violation (code 23505). +fn is_db_unique_violation(e: &AttuneError) -> bool { + if let AttuneError::Database(sqlx_err) = e { + return sqlx_err + .as_database_error() + .and_then(|db| db.code()) + .as_deref() + == Some("23505"); + } + false +} + +impl InquiryHandler { /// Create an inquiry for an execution and pause it pub async fn create_inquiry_from_result( pool: &PgPool, publisher: &Publisher, execution_id: Id, - result: &JsonValue, + _result: &JsonValue, ) -> Result { info!("Creating inquiry for execution {}", execution_id); - // Extract inquiry request - let inquiry_request = Self::extract_inquiry_request(result)?; + let mut tx = pool.begin().await?; + let execution = sqlx::query_as::<_, Execution>(&format!( + "SELECT {SELECT_COLUMNS} FROM execution WHERE id = $1 FOR UPDATE" + )) + .bind(execution_id) + .fetch_one(&mut *tx) + .await?; - // Calculate timeout if specified + let mut result = execution + .result + .clone() + .ok_or_else(|| anyhow::anyhow!("Execution {} has no result", execution_id))?; + let inquiry_request = Self::extract_inquiry_request(&result)?; let timeout_at = inquiry_request .timeout_seconds .map(|seconds| Utc::now() + chrono::Duration::seconds(seconds)); - // Create inquiry in database - let inquiry_input = CreateInquiryInput { - execution: execution_id, - prompt: inquiry_request.prompt.clone(), - response_schema: inquiry_request.response_schema.clone(), - assigned_to: inquiry_request.assigned_to, - status: InquiryStatus::Pending, - response: None, - timeout_at, + let existing_inquiry_id = result + .get(INQUIRY_ID_RESULT_KEY) + .and_then(|value| value.as_i64()); + let published = result + .get(INQUIRY_CREATED_PUBLISHED_RESULT_KEY) + .and_then(|value| 
value.as_bool()) + .unwrap_or(false); + + let (inquiry, should_publish) = if let Some(inquiry_id) = existing_inquiry_id { + let inquiry = InquiryRepository::find_by_id(&mut *tx, inquiry_id) + .await? + .ok_or_else(|| { + anyhow::anyhow!( + "Inquiry {} referenced by execution {} result not found", + inquiry_id, + execution_id + ) + })?; + let should_publish = !published && inquiry.status == InquiryStatus::Pending; + (inquiry, should_publish) + } else { + let create_result = InquiryRepository::create( + &mut *tx, + CreateInquiryInput { + execution: execution_id, + prompt: inquiry_request.prompt.clone(), + response_schema: inquiry_request.response_schema.clone(), + assigned_to: inquiry_request.assigned_to, + status: InquiryStatus::Pending, + response: None, + timeout_at, + }, + ) + .await; + + let inquiry = match create_result { + Ok(inq) => inq, + Err(e) => { + // Unique constraint violation (23505): another replica already + // created the inquiry for this execution. Treat as idempotent + // success — drop the aborted transaction and return the existing row. + if is_db_unique_violation(&e) { + info!( + "Inquiry for execution {} already created by another replica \ + (unique constraint 23505); treating as idempotent", + execution_id + ); + // tx is in an aborted state; dropping it issues ROLLBACK. 
+ drop(tx); + let inquiries = + InquiryRepository::find_by_execution(pool, execution_id).await?; + let existing = inquiries.into_iter().next().ok_or_else(|| { + anyhow::anyhow!( + "Inquiry for execution {} not found after unique constraint violation", + execution_id + ) + })?; + return Ok(existing); + } + return Err(e.into()); + } + }; + + Self::set_inquiry_result_metadata(&mut result, inquiry.id, false)?; + ExecutionRepository::update( + &mut *tx, + execution_id, + UpdateExecutionInput { + result: Some(result), + ..Default::default() + }, + ) + .await?; + + (inquiry, true) }; - let inquiry = InquiryRepository::create(pool, inquiry_input).await?; + tx.commit().await?; - info!( - "Created inquiry {} for execution {}", - inquiry.id, execution_id - ); + if should_publish { + let payload = InquiryCreatedPayload { + inquiry_id: inquiry.id, + execution_id, + prompt: inquiry_request.prompt, + response_schema: inquiry_request.response_schema, + assigned_to: inquiry_request.assigned_to, + timeout_at, + }; - // Update execution status to paused/waiting - // Note: We use a special status or keep it as "running" with inquiry tracking - // For now, we'll keep status as-is and track via inquiry relationship + let envelope = + MessageEnvelope::new(MessageType::InquiryCreated, payload).with_source("executor"); - // Publish InquiryCreated message - let payload = InquiryCreatedPayload { - inquiry_id: inquiry.id, - execution_id, - prompt: inquiry_request.prompt, - response_schema: inquiry_request.response_schema, - assigned_to: inquiry_request.assigned_to, - timeout_at, - }; + publisher.publish_envelope(&envelope).await?; + Self::mark_inquiry_created_published(pool, execution_id).await?; - let envelope = - MessageEnvelope::new(MessageType::InquiryCreated, payload).with_source("executor"); - - publisher.publish_envelope(&envelope).await?; - - debug!( - "Published InquiryCreated message for inquiry {}", - inquiry.id - ); + debug!( + "Published InquiryCreated message for inquiry {}", + 
inquiry.id + ); + } Ok(inquiry) } + fn set_inquiry_result_metadata( + result: &mut JsonValue, + inquiry_id: Id, + published: bool, + ) -> Result<()> { + let obj = result + .as_object_mut() + .ok_or_else(|| anyhow::anyhow!("execution result is not a JSON object"))?; + + obj.insert( + INQUIRY_ID_RESULT_KEY.to_string(), + JsonValue::Number(inquiry_id.into()), + ); + obj.insert( + INQUIRY_CREATED_PUBLISHED_RESULT_KEY.to_string(), + JsonValue::Bool(published), + ); + Ok(()) + } + + async fn mark_inquiry_created_published(pool: &PgPool, execution_id: Id) -> Result<()> { + let execution = ExecutionRepository::find_by_id(pool, execution_id) + .await? + .ok_or_else(|| anyhow::anyhow!("Execution {} not found", execution_id))?; + let mut result = execution + .result + .clone() + .ok_or_else(|| anyhow::anyhow!("Execution {} has no result", execution_id))?; + let inquiry_id = result + .get(INQUIRY_ID_RESULT_KEY) + .and_then(|value| value.as_i64()) + .ok_or_else(|| anyhow::anyhow!("Execution {} missing __inquiry_id", execution_id))?; + + Self::set_inquiry_result_metadata(&mut result, inquiry_id, true)?; + ExecutionRepository::update( + pool, + execution_id, + UpdateExecutionInput { + result: Some(result), + ..Default::default() + }, + ) + .await?; + + Ok(()) + } + /// Handle an inquiry response message async fn handle_inquiry_response( pool: &PgPool, @@ -235,9 +367,13 @@ impl InquiryHandler { if let Some(obj) = updated_result.as_object_mut() { obj.insert("__inquiry_response".to_string(), response.clone()); obj.insert( - "__inquiry_id".to_string(), + INQUIRY_ID_RESULT_KEY.to_string(), JsonValue::Number(inquiry.id.into()), ); + obj.insert( + INQUIRY_CREATED_PUBLISHED_RESULT_KEY.to_string(), + JsonValue::Bool(true), + ); } // Update execution with new result diff --git a/crates/executor/src/policy_enforcer.rs b/crates/executor/src/policy_enforcer.rs index 56e9c6f..93b59df 100644 --- a/crates/executor/src/policy_enforcer.rs +++ b/crates/executor/src/policy_enforcer.rs @@ -933,8 
+933,8 @@ mod tests { assert_eq!(enforcer.get_concurrency_limit(2, Some(200)), Some(20)); } - #[test] - fn test_build_parameter_group_key_uses_exact_values() { + #[tokio::test] + async fn test_build_parameter_group_key_uses_exact_values() { let pool = sqlx::PgPool::connect_lazy("postgresql://localhost/test").unwrap(); let enforcer = PolicyEnforcer::new(pool); let config = serde_json::json!({ diff --git a/crates/executor/src/queue_manager.rs b/crates/executor/src/queue_manager.rs index 23642e1..1787f88 100644 --- a/crates/executor/src/queue_manager.rs +++ b/crates/executor/src/queue_manager.rs @@ -23,7 +23,13 @@ use tokio::time::{sleep, Duration, Instant}; use tracing::{debug, info, warn}; use attune_common::models::Id; -use attune_common::repositories::queue_stats::{QueueStatsRepository, UpsertQueueStatsInput}; +use attune_common::repositories::{ + execution_admission::{ + AdmissionEnqueueOutcome, AdmissionQueueStats, AdmissionQueuedRemovalOutcome, + AdmissionSlotReleaseOutcome, ExecutionAdmissionRepository, + }, + queue_stats::{QueueStatsRepository, UpsertQueueStatsInput}, +}; /// Configuration for the queue manager #[derive(Debug, Clone, Serialize, Deserialize)] @@ -51,6 +57,8 @@ impl Default for QueueConfig { struct QueueEntry { /// Execution or enforcement ID being queued execution_id: Id, + /// Durable FIFO position for the DB-backed admission path. 
+ queue_order: Option, /// When this entry was added to the queue enqueued_at: DateTime, } @@ -224,6 +232,12 @@ impl ExecutionQueueManager { max_concurrent: u32, group_key: Option, ) -> Result<()> { + if self.db_pool.is_some() { + return self + .enqueue_and_wait_db(action_id, execution_id, max_concurrent, group_key) + .await; + } + if self.active_execution_keys.contains_key(&execution_id) { debug!( "Execution {} already owns an active slot, skipping queue wait", @@ -311,6 +325,7 @@ impl ExecutionQueueManager { // Add to queue let entry = QueueEntry { execution_id, + queue_order: None, enqueued_at: Utc::now(), }; @@ -392,6 +407,24 @@ impl ExecutionQueueManager { max_concurrent: u32, group_key: Option, ) -> Result { + if let Some(pool) = &self.db_pool { + return Ok( + match ExecutionAdmissionRepository::enqueue( + pool, + self.config.max_queue_length, + action_id, + execution_id, + max_concurrent, + group_key, + ) + .await? + { + AdmissionEnqueueOutcome::Acquired => SlotEnqueueOutcome::Acquired, + AdmissionEnqueueOutcome::Enqueued => SlotEnqueueOutcome::Enqueued, + }, + ); + } + if self.active_execution_keys.contains_key(&execution_id) { debug!( "Execution {} already owns an active slot, treating as acquired", @@ -463,6 +496,7 @@ impl ExecutionQueueManager { queue.queue.push_back(QueueEntry { execution_id, + queue_order: None, enqueued_at: Utc::now(), }); queue.total_enqueued += 1; @@ -480,6 +514,21 @@ impl ExecutionQueueManager { max_concurrent: u32, group_key: Option, ) -> Result { + if let Some(pool) = &self.db_pool { + let outcome = ExecutionAdmissionRepository::try_acquire( + pool, + action_id, + execution_id, + max_concurrent, + group_key, + ) + .await?; + return Ok(SlotAcquireOutcome { + acquired: outcome.acquired, + current_count: outcome.current_count, + }); + } + let queue_key = self.queue_key(action_id, group_key); let queue_arc = self .get_or_create_queue(queue_key.clone(), max_concurrent) @@ -530,6 +579,14 @@ impl ExecutionQueueManager { &self, 
execution_id: Id, ) -> Result> { + if let Some(pool) = &self.db_pool { + return Ok( + ExecutionAdmissionRepository::release_active_slot(pool, execution_id) + .await? + .map(Self::map_release_outcome), + ); + } + let Some((_, queue_key)) = self.active_execution_keys.remove(&execution_id) else { debug!( "No active queue slot found for execution {} (queue may have been cleared)", @@ -610,6 +667,16 @@ impl ExecutionQueueManager { execution_id: Id, outcome: &SlotReleaseOutcome, ) -> Result<()> { + if let Some(pool) = &self.db_pool { + ExecutionAdmissionRepository::restore_active_slot( + pool, + execution_id, + &Self::to_admission_release_outcome(outcome), + ) + .await?; + return Ok(()); + } + let action_id = outcome.queue_key.action_id; let queue_arc = self.get_or_create_queue(outcome.queue_key.clone(), 1).await; let mut queue = queue_arc.lock().await; @@ -630,6 +697,14 @@ impl ExecutionQueueManager { &self, execution_id: Id, ) -> Result> { + if let Some(pool) = &self.db_pool { + return Ok( + ExecutionAdmissionRepository::remove_queued_execution(pool, execution_id) + .await? 
+ .map(Self::map_removal_outcome), + ); + } + for entry in self.queues.iter() { let queue_key = entry.key().clone(); let queue_arc = entry.value().clone(); @@ -666,6 +741,15 @@ impl ExecutionQueueManager { } pub async fn restore_queued_execution(&self, outcome: &QueuedRemovalOutcome) -> Result<()> { + if let Some(pool) = &self.db_pool { + ExecutionAdmissionRepository::restore_queued_execution( + pool, + &Self::to_admission_removal_outcome(outcome), + ) + .await?; + return Ok(()); + } + let action_id = outcome.queue_key.action_id; let queue_arc = self.get_or_create_queue(outcome.queue_key.clone(), 1).await; let mut queue = queue_arc.lock().await; @@ -709,6 +793,19 @@ impl ExecutionQueueManager { /// Get statistics for a specific action's queue pub async fn get_queue_stats(&self, action_id: Id) -> Option { + if let Some(pool) = &self.db_pool { + return ExecutionAdmissionRepository::get_queue_stats(pool, action_id) + .await + .map(|stats| stats.map(Self::map_queue_stats)) + .unwrap_or_else(|err| { + warn!( + "Failed to load shared queue stats for action {}: {}", + action_id, err + ); + None + }); + } + let queue_arcs: Vec>> = self .queues .iter() @@ -757,6 +854,26 @@ impl ExecutionQueueManager { /// Get statistics for all queues #[allow(dead_code)] pub async fn get_all_queue_stats(&self) -> Vec { + if let Some(pool) = &self.db_pool { + return QueueStatsRepository::list_all(pool) + .await + .map(|stats| { + stats + .into_iter() + .map(|stat| QueueStats { + action_id: stat.action_id, + queue_length: stat.queue_length as usize, + active_count: stat.active_count as u32, + max_concurrent: stat.max_concurrent as u32, + oldest_enqueued_at: stat.oldest_enqueued_at, + total_enqueued: stat.total_enqueued as u64, + total_completed: stat.total_completed as u64, + }) + .collect() + }) + .unwrap_or_default(); + } + let mut stats = Vec::new(); let mut action_ids = std::collections::BTreeSet::new(); @@ -787,6 +904,14 @@ impl ExecutionQueueManager { /// * `Ok(false)` - Execution not 
found in queue #[allow(dead_code)] pub async fn cancel_execution(&self, action_id: Id, execution_id: Id) -> Result { + if let Some(pool) = &self.db_pool { + return Ok( + ExecutionAdmissionRepository::remove_queued_execution(pool, execution_id) + .await? + .is_some(), + ); + } + debug!( "Attempting to cancel execution {} for action {}", execution_id, action_id @@ -838,12 +963,147 @@ impl ExecutionQueueManager { /// Get the number of actions with active queues #[allow(dead_code)] pub fn active_queue_count(&self) -> usize { + if self.db_pool.is_some() { + return 0; + } + self.queues .iter() .map(|entry| entry.key().action_id) .collect::>() .len() } + + async fn enqueue_and_wait_db( + &self, + action_id: Id, + execution_id: Id, + max_concurrent: u32, + group_key: Option, + ) -> Result<()> { + let pool = self + .db_pool + .as_ref() + .ok_or_else(|| anyhow::anyhow!("database pool required for shared admission"))?; + + match ExecutionAdmissionRepository::enqueue( + pool, + self.config.max_queue_length, + action_id, + execution_id, + max_concurrent, + group_key.clone(), + ) + .await? + { + AdmissionEnqueueOutcome::Acquired => return Ok(()), + AdmissionEnqueueOutcome::Enqueued => {} + } + + let deadline = Instant::now() + Duration::from_secs(self.config.queue_timeout_seconds); + loop { + sleep(Duration::from_millis(10)).await; + + match ExecutionAdmissionRepository::wait_status(pool, execution_id).await? { + Some(true) => return Ok(()), + Some(false) => {} + None => { + return Err(anyhow::anyhow!( + "Queue state for execution {} disappeared while waiting", + execution_id + )); + } + } + + if Instant::now() < deadline { + continue; + } + + match ExecutionAdmissionRepository::remove_queued_execution(pool, execution_id).await? 
{ + Some(_) => { + return Err(anyhow::anyhow!( + "Queue timeout for execution {}: waited {} seconds", + execution_id, + self.config.queue_timeout_seconds + )); + } + None => { + if matches!( + ExecutionAdmissionRepository::wait_status(pool, execution_id).await?, + Some(true) + ) { + return Ok(()); + } + + return Err(anyhow::anyhow!( + "Queue timeout for execution {}: waited {} seconds", + execution_id, + self.config.queue_timeout_seconds + )); + } + } + } + } + + fn map_release_outcome(outcome: AdmissionSlotReleaseOutcome) -> SlotReleaseOutcome { + SlotReleaseOutcome { + next_execution_id: outcome.next_execution_id, + queue_key: QueueKey { + action_id: outcome.action_id, + group_key: outcome.group_key, + }, + } + } + + fn to_admission_release_outcome(outcome: &SlotReleaseOutcome) -> AdmissionSlotReleaseOutcome { + AdmissionSlotReleaseOutcome { + action_id: outcome.queue_key.action_id, + group_key: outcome.queue_key.group_key.clone(), + next_execution_id: outcome.next_execution_id, + } + } + + fn map_removal_outcome(outcome: AdmissionQueuedRemovalOutcome) -> QueuedRemovalOutcome { + QueuedRemovalOutcome { + next_execution_id: outcome.next_execution_id, + queue_key: QueueKey { + action_id: outcome.action_id, + group_key: outcome.group_key, + }, + removed_entry: QueueEntry { + execution_id: outcome.execution_id, + queue_order: Some(outcome.queue_order), + enqueued_at: outcome.enqueued_at, + }, + removed_index: outcome.removed_index, + } + } + + fn to_admission_removal_outcome( + outcome: &QueuedRemovalOutcome, + ) -> AdmissionQueuedRemovalOutcome { + AdmissionQueuedRemovalOutcome { + action_id: outcome.queue_key.action_id, + group_key: outcome.queue_key.group_key.clone(), + next_execution_id: outcome.next_execution_id, + execution_id: outcome.removed_entry.execution_id, + queue_order: outcome.removed_entry.queue_order.unwrap_or_default(), + enqueued_at: outcome.removed_entry.enqueued_at, + removed_index: outcome.removed_index, + } + } + + fn map_queue_stats(stats: 
AdmissionQueueStats) -> QueueStats { + QueueStats { + action_id: stats.action_id, + queue_length: stats.queue_length, + active_count: stats.active_count, + max_concurrent: stats.max_concurrent, + oldest_enqueued_at: stats.oldest_enqueued_at, + total_enqueued: stats.total_enqueued, + total_completed: stats.total_completed, + } + } } #[cfg(test)] diff --git a/crates/executor/src/scheduler.rs b/crates/executor/src/scheduler.rs index 0b824f2..30eccc7 100644 --- a/crates/executor/src/scheduler.rs +++ b/crates/executor/src/scheduler.rs @@ -25,12 +25,12 @@ use attune_common::{ workflow::{ CreateWorkflowExecutionInput, WorkflowDefinitionRepository, WorkflowExecutionRepository, }, - Create, FindById, FindByRef, Update, + FindById, FindByRef, Update, }, runtime_detection::runtime_aliases_contain, workflow::WorkflowDefinition, }; -use chrono::Utc; +use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; use serde_json::Value as JsonValue; use sqlx::{PgConnection, PgPool}; @@ -102,6 +102,17 @@ struct ExecutionScheduledPayload { worker_id: i64, action_ref: String, config: Option, + scheduled_attempt_updated_at: DateTime, +} + +#[derive(Debug, Clone)] +struct PendingExecutionRequested { + execution_id: i64, + action_id: i64, + action_ref: String, + parent_id: i64, + enforcement_id: Option, + config: Option, } /// Execution scheduler that routes executions to workers @@ -509,6 +520,7 @@ impl ExecutionScheduler { &worker.id, &envelope.payload.action_ref, &execution_config, + scheduled_execution.updated, &action, ) .await @@ -1021,13 +1033,13 @@ impl ExecutionScheduler { #[allow(clippy::too_many_arguments)] async fn dispatch_workflow_task_with_conn( conn: &mut PgConnection, - publisher: &Publisher, _round_robin_counter: &AtomicUsize, parent_execution: &Execution, workflow_execution_id: &i64, task_node: &crate::workflow::graph::TaskNode, wf_ctx: &WorkflowContext, triggered_by: Option<&str>, + pending_messages: &mut Vec, ) -> Result<()> { let action_ref: String = match 
&task_node.action { Some(a) => a.clone(), @@ -1059,7 +1071,6 @@ impl ExecutionScheduler { if let Some(ref with_items_expr) = task_node.with_items { return Self::dispatch_with_items_task_with_conn( conn, - publisher, parent_execution, workflow_execution_id, task_node, @@ -1068,6 +1079,7 @@ impl ExecutionScheduler { with_items_expr, wf_ctx, triggered_by, + pending_messages, ) .await; } @@ -1115,36 +1127,29 @@ impl ExecutionScheduler { completed_at: None, }; - let child_execution = if let Some(existing) = ExecutionRepository::find_by_workflow_task( + let child_execution_result = ExecutionRepository::create_workflow_task_if_absent_with_conn( &mut *conn, + CreateExecutionInput { + action: Some(task_action.id), + action_ref: action_ref.clone(), + config: task_config, + env_vars: parent_execution.env_vars.clone(), + parent: Some(parent_execution.id), + enforcement: parent_execution.enforcement, + executor: None, + worker: None, + status: ExecutionStatus::Requested, + result: None, + workflow_task: Some(workflow_task), + }, *workflow_execution_id, &task_node.name, None, ) - .await? - { - existing - } else { - ExecutionRepository::create( - &mut *conn, - CreateExecutionInput { - action: Some(task_action.id), - action_ref: action_ref.clone(), - config: task_config, - env_vars: parent_execution.env_vars.clone(), - parent: Some(parent_execution.id), - enforcement: parent_execution.enforcement, - executor: None, - worker: None, - status: ExecutionStatus::Requested, - result: None, - workflow_task: Some(workflow_task), - }, - ) - .await? 
- }; + .await?; + let child_execution = child_execution_result.execution; - if child_execution.status == ExecutionStatus::Requested { + if child_execution_result.created { info!( "Created child execution {} for workflow task '{}' (action '{}', workflow_execution {})", child_execution.id, task_node.name, action_ref, workflow_execution_id @@ -1157,24 +1162,14 @@ impl ExecutionScheduler { } if child_execution.status == ExecutionStatus::Requested { - let payload = ExecutionRequestedPayload { + pending_messages.push(PendingExecutionRequested { execution_id: child_execution.id, - action_id: Some(task_action.id), + action_id: task_action.id, action_ref: action_ref.clone(), - parent_id: Some(parent_execution.id), + parent_id: parent_execution.id, enforcement_id: parent_execution.enforcement, config: child_execution.config.clone(), - }; - - let envelope = MessageEnvelope::new(MessageType::ExecutionRequested, payload) - .with_source("executor-scheduler"); - - publisher.publish_envelope(&envelope).await?; - - info!( - "Published ExecutionRequested for child execution {} (task '{}')", - child_execution.id, task_node.name - ); + }); } Ok(()) @@ -1392,7 +1387,6 @@ impl ExecutionScheduler { #[allow(clippy::too_many_arguments)] async fn dispatch_with_items_task_with_conn( conn: &mut PgConnection, - publisher: &Publisher, parent_execution: &Execution, workflow_execution_id: &i64, task_node: &crate::workflow::graph::TaskNode, @@ -1401,6 +1395,7 @@ impl ExecutionScheduler { with_items_expr: &str, wf_ctx: &WorkflowContext, triggered_by: Option<&str>, + pending_messages: &mut Vec, ) -> Result<()> { let items_value = wf_ctx .render_json(&JsonValue::String(with_items_expr.to_string())) @@ -1511,18 +1506,8 @@ impl ExecutionScheduler { completed_at: None, }; - let child_execution = if let Some(existing) = - ExecutionRepository::find_by_workflow_task( - &mut *conn, - *workflow_execution_id, - &task_node.name, - Some(index as i32), - ) - .await? 
- { - existing - } else { - ExecutionRepository::create( + let child_execution_result = + ExecutionRepository::create_workflow_task_if_absent_with_conn( &mut *conn, CreateExecutionInput { action: Some(task_action.id), @@ -1537,11 +1522,14 @@ impl ExecutionScheduler { result: None, workflow_task: Some(workflow_task), }, + *workflow_execution_id, + &task_node.name, + Some(index as i32), ) - .await? - }; + .await?; + let child_execution = child_execution_result.execution; - if child_execution.status == ExecutionStatus::Requested { + if child_execution_result.created { info!( "Created with_items child execution {} for task '{}' item {} \ (action '{}', workflow_execution {})", @@ -1566,11 +1554,11 @@ impl ExecutionScheduler { if child.status == ExecutionStatus::Requested { Self::publish_execution_requested_with_conn( &mut *conn, - publisher, child_id, task_action.id, action_ref, parent_execution, + pending_messages, ) .await?; } @@ -1622,25 +1610,17 @@ impl ExecutionScheduler { Ok(()) } - async fn publish_execution_requested_with_conn( - conn: &mut PgConnection, + async fn publish_execution_requested_payload( publisher: &Publisher, - execution_id: i64, - action_id: i64, - action_ref: &str, - parent_execution: &Execution, + pending: PendingExecutionRequested, ) -> Result<()> { - let child = ExecutionRepository::find_by_id(&mut *conn, execution_id) - .await? 
- .ok_or_else(|| anyhow::anyhow!("Execution {} not found", execution_id))?; - let payload = ExecutionRequestedPayload { - execution_id: child.id, - action_id: Some(action_id), - action_ref: action_ref.to_string(), - parent_id: Some(parent_execution.id), - enforcement_id: parent_execution.enforcement, - config: child.config.clone(), + execution_id: pending.execution_id, + action_id: Some(pending.action_id), + action_ref: pending.action_ref, + parent_id: Some(pending.parent_id), + enforcement_id: pending.enforcement_id, + config: pending.config, }; let envelope = MessageEnvelope::new(MessageType::ExecutionRequested, payload) @@ -1650,12 +1630,36 @@ impl ExecutionScheduler { debug!( "Published deferred ExecutionRequested for child execution {}", - execution_id + envelope.payload.execution_id ); Ok(()) } + async fn publish_execution_requested_with_conn( + conn: &mut PgConnection, + execution_id: i64, + action_id: i64, + action_ref: &str, + parent_execution: &Execution, + pending_messages: &mut Vec, + ) -> Result<()> { + let child = ExecutionRepository::find_by_id(&mut *conn, execution_id) + .await? + .ok_or_else(|| anyhow::anyhow!("Execution {} not found", execution_id))?; + + pending_messages.push(PendingExecutionRequested { + execution_id: child.id, + action_id, + action_ref: action_ref.to_string(), + parent_id: parent_execution.id, + enforcement_id: parent_execution.enforcement, + config: child.config.clone(), + }); + + Ok(()) + } + /// Publish the next `Requested`-status with_items siblings to fill freed /// concurrency slots. 
/// @@ -1734,11 +1738,11 @@ impl ExecutionScheduler { async fn publish_pending_with_items_children_with_conn( conn: &mut PgConnection, - publisher: &Publisher, parent_execution: &Execution, workflow_execution_id: i64, task_name: &str, slots: usize, + pending_messages: &mut Vec, ) -> Result { if slots == 0 { return Ok(0); @@ -1768,11 +1772,11 @@ impl ExecutionScheduler { if let Err(e) = Self::publish_execution_requested_with_conn( &mut *conn, - publisher, *child_id, *action_id, &child.action_ref, parent_execution, + pending_messages, ) .await { @@ -1819,12 +1823,35 @@ impl ExecutionScheduler { .execute(&mut *lock_conn) .await?; - let result = Self::advance_workflow_serialized( - &mut lock_conn, - publisher, - round_robin_counter, - execution, - ) + let result = async { + sqlx::query("BEGIN").execute(&mut *lock_conn).await?; + + let advance_result = + Self::advance_workflow_serialized(&mut lock_conn, round_robin_counter, execution) + .await; + + match advance_result { + Ok(pending_messages) => { + sqlx::query("COMMIT").execute(&mut *lock_conn).await?; + + for pending in pending_messages { + Self::publish_execution_requested_payload(publisher, pending).await?; + } + + Ok(()) + } + Err(err) => { + let rollback_result = sqlx::query("ROLLBACK").execute(&mut *lock_conn).await; + if let Err(rollback_err) = rollback_result { + error!( + "Failed to roll back workflow_execution {} advancement transaction: {}", + workflow_execution_id, rollback_err + ); + } + Err(err) + } + } + } .await; let unlock_result = sqlx::query("SELECT pg_advisory_unlock($1)") .bind(workflow_execution_id) @@ -1838,13 +1865,12 @@ impl ExecutionScheduler { async fn advance_workflow_serialized( conn: &mut PgConnection, - publisher: &Publisher, round_robin_counter: &AtomicUsize, execution: &Execution, - ) -> Result<()> { + ) -> Result> { let workflow_task = match &execution.workflow_task { Some(wt) => wt, - None => return Ok(()), // Not a workflow task, nothing to do + None => return Ok(vec![]), // Not a 
workflow task, nothing to do }; let workflow_execution_id = workflow_task.workflow_execution; @@ -1867,7 +1893,7 @@ impl ExecutionScheduler { // Load the workflow execution record let workflow_execution = - WorkflowExecutionRepository::find_by_id(&mut *conn, workflow_execution_id) + WorkflowExecutionRepository::find_by_id_for_update(&mut *conn, workflow_execution_id) .await? .ok_or_else(|| { anyhow::anyhow!("Workflow execution {} not found", workflow_execution_id) @@ -1882,9 +1908,11 @@ impl ExecutionScheduler { "Workflow execution {} already in terminal state {:?}, skipping advance", workflow_execution_id, workflow_execution.status ); - return Ok(()); + return Ok(vec![]); } + let mut pending_messages = Vec::new(); + let parent_execution = ExecutionRepository::find_by_id(&mut *conn, workflow_execution.execution) .await? @@ -1944,7 +1972,7 @@ impl ExecutionScheduler { ); } - return Ok(()); + return Ok(pending_messages); } // Load the workflow definition so we can apply param_schema defaults @@ -2021,11 +2049,11 @@ impl ExecutionScheduler { if free_slots > 0 { if let Err(e) = Self::publish_pending_with_items_children_with_conn( &mut *conn, - publisher, &parent_for_pending, workflow_execution_id, task_name, free_slots, + &mut pending_messages, ) .await { @@ -2060,7 +2088,7 @@ impl ExecutionScheduler { workflow_task.task_index.unwrap_or(-1), siblings_remaining.len(), ); - return Ok(()); + return Ok(pending_messages); } // --------------------------------------------------------- @@ -2093,7 +2121,7 @@ impl ExecutionScheduler { another advance_workflow call already handled final completion, skipping", task_name, ); - return Ok(()); + return Ok(pending_messages); } // All items done — check if any failed @@ -2280,13 +2308,13 @@ impl ExecutionScheduler { if let Some(task_node) = graph.get_task(next_task_name) { if let Err(e) = Self::dispatch_workflow_task_with_conn( &mut *conn, - publisher, round_robin_counter, &parent_execution, &workflow_execution_id, task_node, &wf_ctx, 
Some(task_name), // predecessor that triggered this task + &mut pending_messages, ) .await { @@ -2349,7 +2377,7 @@ impl ExecutionScheduler { .await?; } - Ok(()) + Ok(pending_messages) } /// Count child executions that are still in progress for a workflow. @@ -3139,6 +3167,7 @@ impl ExecutionScheduler { worker_id: &i64, action_ref: &str, config: &Option, + scheduled_attempt_updated_at: DateTime, _action: &Action, ) -> Result<()> { debug!("Queuing execution {} to worker {}", execution_id, worker_id); @@ -3149,6 +3178,7 @@ impl ExecutionScheduler { worker_id: *worker_id, action_ref: action_ref.to_string(), config: config.clone(), + scheduled_attempt_updated_at, }; let envelope = diff --git a/crates/executor/src/timeout_monitor.rs b/crates/executor/src/timeout_monitor.rs index 4760e13..5b9d5ab 100644 --- a/crates/executor/src/timeout_monitor.rs +++ b/crates/executor/src/timeout_monitor.rs @@ -12,7 +12,10 @@ use anyhow::Result; use attune_common::{ models::{enums::ExecutionStatus, Execution}, mq::{MessageEnvelope, MessageType, Publisher}, - repositories::execution::SELECT_COLUMNS as EXECUTION_COLUMNS, + repositories::{ + execution::{UpdateExecutionInput, SELECT_COLUMNS as EXECUTION_COLUMNS}, + ExecutionRepository, + }, }; use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; @@ -178,20 +181,27 @@ impl ExecutionTimeoutMonitor { "original_status": "scheduled" }); - // Update execution status in database - sqlx::query( - "UPDATE execution - SET status = $1, - result = $2, - updated = NOW() - WHERE id = $3", + let updated = ExecutionRepository::update_if_status_and_updated_before( + &self.pool, + execution_id, + ExecutionStatus::Scheduled, + self.calculate_cutoff_time(), + UpdateExecutionInput { + status: Some(ExecutionStatus::Failed), + result: Some(result.clone()), + ..Default::default() + }, ) - .bind(ExecutionStatus::Failed) - .bind(&result) - .bind(execution_id) - .execute(&self.pool) .await?; + if updated.is_none() { + debug!( + "Skipping timeout failure 
for execution {} because it already left Scheduled or is no longer stale", + execution_id + ); + return Ok(()); + } + info!("Execution {} marked as failed in database", execution_id); // Publish completion notification diff --git a/crates/executor/tests/fifo_ordering_integration_test.rs b/crates/executor/tests/fifo_ordering_integration_test.rs index 5023234..c24965d 100644 --- a/crates/executor/tests/fifo_ordering_integration_test.rs +++ b/crates/executor/tests/fifo_ordering_integration_test.rs @@ -912,6 +912,115 @@ async fn test_queue_stats_persistence() { cleanup_test_data(&pool, pack_id).await; } +#[tokio::test] +#[ignore] // Requires database +async fn test_release_restore_recovers_active_slot_and_next_queue_head() { + let pool = setup_db().await; + let timestamp = Utc::now().timestamp(); + let suffix = format!("restore_release_{}", timestamp); + + let pack_id = create_test_pack(&pool, &suffix).await; + let pack_ref = format!("fifo_test_pack_{}", suffix); + let action_id = create_test_action(&pool, pack_id, &pack_ref, &suffix).await; + let action_ref = format!("fifo_test_action_{}", suffix); + + let manager = ExecutionQueueManager::with_db_pool(QueueConfig::default(), pool.clone()); + + let first = + create_test_execution(&pool, action_id, &action_ref, ExecutionStatus::Requested).await; + let second = + create_test_execution(&pool, action_id, &action_ref, ExecutionStatus::Requested).await; + let third = + create_test_execution(&pool, action_id, &action_ref, ExecutionStatus::Requested).await; + + manager.enqueue(action_id, first, 1, None).await.unwrap(); + manager.enqueue(action_id, second, 1, None).await.unwrap(); + manager.enqueue(action_id, third, 1, None).await.unwrap(); + + let stats = manager.get_queue_stats(action_id).await.unwrap(); + assert_eq!(stats.active_count, 1); + assert_eq!(stats.queue_length, 2); + + let release = manager + .release_active_slot(first) + .await + .unwrap() + .expect("first execution should own an active slot"); + 
assert_eq!(release.next_execution_id, Some(second)); + + let stats = manager.get_queue_stats(action_id).await.unwrap(); + assert_eq!(stats.active_count, 1); + assert_eq!(stats.queue_length, 1); + + manager.restore_active_slot(first, &release).await.unwrap(); + + let stats = manager.get_queue_stats(action_id).await.unwrap(); + assert_eq!(stats.active_count, 1); + assert_eq!(stats.queue_length, 2); + assert_eq!(stats.total_completed, 0); + + let next = manager + .release_active_slot(first) + .await + .unwrap() + .expect("restored execution should still own the active slot"); + assert_eq!(next.next_execution_id, Some(second)); + + cleanup_test_data(&pool, pack_id).await; +} + +#[tokio::test] +#[ignore] // Requires database +async fn test_remove_restore_recovers_queued_execution_position() { + let pool = setup_db().await; + let timestamp = Utc::now().timestamp(); + let suffix = format!("restore_queue_{}", timestamp); + + let pack_id = create_test_pack(&pool, &suffix).await; + let pack_ref = format!("fifo_test_pack_{}", suffix); + let action_id = create_test_action(&pool, pack_id, &pack_ref, &suffix).await; + let action_ref = format!("fifo_test_action_{}", suffix); + + let manager = ExecutionQueueManager::with_db_pool(QueueConfig::default(), pool.clone()); + + let first = + create_test_execution(&pool, action_id, &action_ref, ExecutionStatus::Requested).await; + let second = + create_test_execution(&pool, action_id, &action_ref, ExecutionStatus::Requested).await; + let third = + create_test_execution(&pool, action_id, &action_ref, ExecutionStatus::Requested).await; + + manager.enqueue(action_id, first, 1, None).await.unwrap(); + manager.enqueue(action_id, second, 1, None).await.unwrap(); + manager.enqueue(action_id, third, 1, None).await.unwrap(); + + let removal = manager + .remove_queued_execution(second) + .await + .unwrap() + .expect("second execution should be queued"); + assert_eq!(removal.next_execution_id, None); + + let stats = 
manager.get_queue_stats(action_id).await.unwrap(); + assert_eq!(stats.active_count, 1); + assert_eq!(stats.queue_length, 1); + + manager.restore_queued_execution(&removal).await.unwrap(); + + let stats = manager.get_queue_stats(action_id).await.unwrap(); + assert_eq!(stats.active_count, 1); + assert_eq!(stats.queue_length, 2); + + let release = manager + .release_active_slot(first) + .await + .unwrap() + .expect("first execution should own the active slot"); + assert_eq!(release.next_execution_id, Some(second)); + + cleanup_test_data(&pool, pack_id).await; +} + #[tokio::test] #[ignore] // Requires database async fn test_queue_full_rejection() { diff --git a/docs/plans/executor-ha-horizontal-scaling.md b/docs/plans/executor-ha-horizontal-scaling.md index 529ce99..e0623f8 100644 --- a/docs/plans/executor-ha-horizontal-scaling.md +++ b/docs/plans/executor-ha-horizontal-scaling.md @@ -65,6 +65,35 @@ A workflow execution should have exactly one active mutator at a time when evalu ## Proposed Implementation Phases +## Current Status + +As of the current implementation state: + +- Phase 1 is substantially implemented. +- Phases 2, 3, 4, and 5 are implemented. + +Completed so far: + +- Atomic `requested -> scheduling` claim support was added in `ExecutionRepository`. +- Scheduler state transitions for regular action dispatch were converted to conditional/CAS-style updates. +- Redelivered `execution.requested` messages for stale `scheduling` rows are now retried/reclaimed instead of being silently acknowledged away. +- Shared concurrency/FIFO coordination now uses durable PostgreSQL admission tables for action/group slot ownership and queued execution ordering. +- `ExecutionQueueManager` now acts as a thin API-compatible facade over the DB-backed admission path when constructed with a pool. +- Slot release, queued removal, and rollback/restore flows now operate against shared DB state rather than process-local memory. 
+- `queue_stats` remains derived telemetry, but it is now refreshed transactionally from the shared admission state. +- Workflow start is now idempotent at the parent workflow state level via `workflow_execution(execution)` uniqueness plus repository create-or-get behavior. +- Workflow advancement now runs under a per-workflow PostgreSQL advisory lock, row-locks `workflow_execution` with `SELECT ... FOR UPDATE`, and performs serialized mutation inside an explicit SQL transaction. +- Durable workflow child dispatch dedupe is now enforced with the `workflow_task_dispatch` coordination table and repository create-or-get helpers. +- `execution` and `enforcement` were switched from Timescale hypertables back to normal PostgreSQL tables to remove HA/idempotency friction around foreign keys and unique constraints. `event` remains a hypertable, and history tables remain Timescale-backed. +- Direct uniqueness/idempotency invariants were added for `enforcement(rule, event)`, top-level `execution(enforcement)`, and `inquiry(execution)`. +- Event, enforcement, and inquiry handlers were updated to use create-or-get flows and conditional status transitions so duplicate delivery becomes safe. +- Timeout and DLQ recovery loops now use conditional state transitions and only emit side effects when the guarded update succeeds. + +Partially complete / still open: + +- HA-focused integration and failure-injection coverage still needs to be expanded around the new invariants and recovery behavior. +- The new migrations and DB-backed FIFO tests still need end-to-end validation against a real Postgres/Timescale environment. 
+ ## Phase 1: Atomic Execution Claiming ### Objective @@ -93,6 +122,10 @@ Ensure only one executor replica can claim a `requested` execution for schedulin - Two schedulers racing on the same execution cannot both dispatch it - Redelivered `execution.requested` messages become harmless no-ops after the first successful claim +### Status + +Implemented for regular action scheduling, with additional stale-claim recovery for redelivered `execution.requested` messages. The remaining gap for this area is broader integration with the still-pending shared admission/queueing work in Phase 2. + ## Phase 2: Shared Concurrency Control and FIFO Queueing ### Objective @@ -147,6 +180,19 @@ Alternative naming is fine, but the design needs to support: - FIFO ordering holds across multiple executor replicas - Restarting an executor does not lose queue ownership state +### Status + +Implemented. + +Completed: + +- Shared admission state now lives in PostgreSQL via durable action/group queue rows and execution entry rows. +- Action-level concurrency limits and parameter-group concurrency keys are enforced against that shared admission state. +- FIFO ordering is determined by durable queued-entry order rather than process-local memory. +- Completion-time slot release promotes the next queued execution inside the same DB transaction. +- Rollback helpers can restore released slots or removed queued entries if republish/cleanup fails after the DB mutation. +- `ExecutionQueueManager` remains as a facade for the existing scheduler/policy code paths, but it no longer acts as the correctness source of truth when running with a DB pool. 
+ ## Phase 3: Workflow Start Idempotency and Serialized Advancement ### Objective @@ -196,6 +242,20 @@ This may be implemented with explicit columns or a dedupe table if indexing the - Duplicate `execution.completed` delivery for a workflow child cannot create duplicate successor executions - Two executor replicas cannot concurrently mutate the same workflow state +### Status + +Implemented. + +Completed: + +- `workflow_execution(execution)` uniqueness is part of the workflow schema and workflow start uses create-or-get semantics. +- Workflow parent executions are claimed before orchestration starts. +- Workflow advancement now runs under a per-workflow PostgreSQL advisory lock held on the same DB connection that performs the serialized advancement work. +- The serialized workflow path is wrapped in an explicit SQL transaction. +- `workflow_execution` is row-locked with `SELECT ... FOR UPDATE` before mutation. +- Successor/child dispatch dedupe is enforced with the durable `workflow_task_dispatch` table keyed by `(workflow_execution, task_name, COALESCE(task_index, -1))`. +- Child `ExecutionRequested` messages are staged and published only after the workflow transaction commits. + ## Phase 4: Idempotent Event, Enforcement, and Inquiry Handling ### Objective @@ -241,6 +301,19 @@ WHERE event IS NOT NULL; - Duplicate `enforcement.created` does not create duplicate executions - Duplicate completion handling does not create duplicate inquiries +### Status + +Implemented. + +Completed: + +- `enforcement(rule, event)` uniqueness is enforced directly with a partial unique index when both keys are present. +- Top-level execution creation is deduped with a unique invariant on `execution(enforcement)` where `parent IS NULL`. +- Inquiry creation is deduped with a unique invariant on `inquiry(execution)`. +- `event_processor` now uses create-or-get enforcement handling and only republishes when the persisted enforcement still needs processing. 
+- `enforcement_processor` now skips duplicate non-`created` enforcements, creates or reuses the top-level execution, and conditionally resolves enforcement state. +- `inquiry_handler` now uses create-or-get inquiry handling and only emits `InquiryCreated` when the inquiry was actually created. + ## Phase 5: Safe Recovery Loops ### Objective @@ -273,6 +346,17 @@ Make timeout and DLQ processing safe under races and multiple replicas. - DLQ handler cannot overwrite newer state - Running multiple timeout monitors produces no conflicting state transitions +### Status + +Implemented. + +Completed: + +- Timeout failure now uses a conditional transition that only succeeds when the execution is still `scheduled` and still older than the timeout cutoff. +- Timeout-driven completion side effects are only published when that guarded update succeeds. +- DLQ handling now treats messages as stale unless the execution is still exactly `scheduled`. +- DLQ failure transitions now use conditional status updates and no longer overwrite newer `running` or terminal state. + ## Testing Plan Add focused HA tests after the repository and scheduler primitives are in place. @@ -298,13 +382,10 @@ Add focused HA tests after the repository and scheduler primitives are in place. ## Recommended Execution Order for Next Session -1. Add migrations and repository primitives for atomic execution claim -2. Convert scheduler to claim-first semantics -3. Implement shared DB-backed concurrency/FIFO coordination -4. Add workflow uniqueness and serialized advancement -5. Add idempotency to event/enforcement/inquiry paths -6. Fix timeout and DLQ handlers to use conditional transitions -7. Add HA-focused tests +1. Add more HA-focused integration tests for duplicate delivery, cross-replica completion, and recovery rollback paths +2. Add failure-injection tests for crash/replay scenarios around `scheduling` reclaim, workflow advancement, and post-commit publish paths +3. 
Validate the new migrations and DB-backed FIFO behavior end-to-end against a real Postgres/Timescale environment +4. Consider a small follow-up cleanup pass to reduce or remove the in-memory fallback code in `ExecutionQueueManager` once the DB path is fully baked ## Expected Outcome @@ -315,3 +396,5 @@ After this plan is implemented, the executor should be able to scale horizontall - correct workflow orchestration - safe replay handling - safe recovery behavior during failures and redelivery + +At the current state, the core executor HA phases are implemented. The remaining work is confidence-building: failure-injection coverage, multi-replica integration testing, and end-to-end migration validation in a live database environment. diff --git a/migrations/20250101000004_trigger_sensor_event_rule.sql b/migrations/20250101000004_trigger_sensor_event_rule.sql index bc0e4ad..f35a2b2 100644 --- a/migrations/20250101000004_trigger_sensor_event_rule.sql +++ b/migrations/20250101000004_trigger_sensor_event_rule.sql @@ -201,6 +201,9 @@ CREATE INDEX idx_enforcement_rule_status ON enforcement(rule, status); CREATE INDEX idx_enforcement_event_status ON enforcement(event, status); CREATE INDEX idx_enforcement_payload_gin ON enforcement USING GIN (payload); CREATE INDEX idx_enforcement_conditions_gin ON enforcement USING GIN (conditions); +CREATE UNIQUE INDEX uq_enforcement_rule_event + ON enforcement (rule, event) + WHERE rule IS NOT NULL AND event IS NOT NULL; -- Comments COMMENT ON TABLE enforcement IS 'Enforcements represent rule triggering by events'; diff --git a/migrations/20250101000005_execution_and_operations.sql b/migrations/20250101000005_execution_and_operations.sql index 076ae8c..92f5449 100644 --- a/migrations/20250101000005_execution_and_operations.sql +++ b/migrations/20250101000005_execution_and_operations.sql @@ -4,13 +4,8 @@ -- Consolidates former migrations: 000006 (execution_system), 000008 -- (worker_notification), 000014 (worker_table), and 20260209 
(phase3). -- --- NOTE: The execution table is converted to a TimescaleDB hypertable in --- migration 000009. Hypertables cannot be the target of FK constraints, --- so columns referencing execution (inquiry.execution, workflow_execution.execution) --- are plain BIGINT with no FK. Similarly, columns ON the execution table that --- would self-reference or reference other hypertables (parent, enforcement, --- original_execution) are plain BIGINT. The action and executor FKs are also --- omitted since they would need to be dropped during hypertable conversion. +-- NOTE: `execution` remains a regular PostgreSQL table. Time-series +-- audit and analytics are handled by `execution_history`. -- Version: 20250101000005 -- ============================================================================ @@ -19,27 +14,27 @@ CREATE TABLE execution ( id BIGSERIAL PRIMARY KEY, - action BIGINT, -- references action(id); no FK because execution becomes a hypertable + action BIGINT, action_ref TEXT NOT NULL, config JSONB, env_vars JSONB, - parent BIGINT, -- self-reference; no FK because execution becomes a hypertable - enforcement BIGINT, -- references enforcement(id); no FK (both are hypertables) - executor BIGINT, -- references identity(id); no FK because execution becomes a hypertable - worker BIGINT, -- references worker(id); no FK because execution becomes a hypertable + parent BIGINT, + enforcement BIGINT, + executor BIGINT, + worker BIGINT, status execution_status_enum NOT NULL DEFAULT 'requested', result JSONB, started_at TIMESTAMPTZ, -- set when execution transitions to 'running' created TIMESTAMPTZ NOT NULL DEFAULT NOW(), is_workflow BOOLEAN DEFAULT false NOT NULL, - workflow_def BIGINT, -- references workflow_definition(id); no FK because execution becomes a hypertable + workflow_def BIGINT, workflow_task JSONB, -- Retry tracking (baked in from phase 3) retry_count INTEGER NOT NULL DEFAULT 0, max_retries INTEGER, retry_reason TEXT, - original_execution BIGINT, -- 
self-reference; no FK because execution becomes a hypertable + original_execution BIGINT, updated TIMESTAMPTZ NOT NULL DEFAULT NOW() ); @@ -64,6 +59,11 @@ CREATE INDEX idx_execution_result_gin ON execution USING GIN (result); CREATE INDEX idx_execution_env_vars_gin ON execution USING GIN (env_vars); CREATE INDEX idx_execution_original_execution ON execution(original_execution) WHERE original_execution IS NOT NULL; CREATE INDEX idx_execution_status_retry ON execution(status, retry_count) WHERE status = 'failed' AND retry_count < COALESCE(max_retries, 0); +CREATE UNIQUE INDEX uq_execution_top_level_enforcement + ON execution (enforcement) + WHERE enforcement IS NOT NULL + AND parent IS NULL + AND (config IS NULL OR NOT (config ? 'retry_of')); -- Trigger CREATE TRIGGER update_execution_updated @@ -77,10 +77,10 @@ COMMENT ON COLUMN execution.action IS 'Action being executed (may be null if act COMMENT ON COLUMN execution.action_ref IS 'Action reference (preserved even if action deleted)'; COMMENT ON COLUMN execution.config IS 'Snapshot of action configuration at execution time'; COMMENT ON COLUMN execution.env_vars IS 'Environment variables for this execution as key-value pairs (string -> string). These are set in the execution environment and are separate from action parameters. 
Used for execution context, configuration, and non-sensitive metadata.'; -COMMENT ON COLUMN execution.parent IS 'Parent execution ID for workflow hierarchies (no FK — execution is a hypertable)'; -COMMENT ON COLUMN execution.enforcement IS 'Enforcement that triggered this execution (no FK — both are hypertables)'; -COMMENT ON COLUMN execution.executor IS 'Identity that initiated the execution (no FK — execution is a hypertable)'; -COMMENT ON COLUMN execution.worker IS 'Assigned worker handling this execution (no FK — execution is a hypertable)'; +COMMENT ON COLUMN execution.parent IS 'Parent execution ID for workflow hierarchies'; +COMMENT ON COLUMN execution.enforcement IS 'Enforcement that triggered this execution'; +COMMENT ON COLUMN execution.executor IS 'Identity that initiated the execution'; +COMMENT ON COLUMN execution.worker IS 'Assigned worker handling this execution'; COMMENT ON COLUMN execution.status IS 'Current execution lifecycle status'; COMMENT ON COLUMN execution.result IS 'Execution output/results'; COMMENT ON COLUMN execution.retry_count IS 'Current retry attempt number (0 = first attempt, 1 = first retry, etc.)'; @@ -96,7 +96,7 @@ COMMENT ON COLUMN execution.original_execution IS 'ID of the original execution CREATE TABLE inquiry ( id BIGSERIAL PRIMARY KEY, - execution BIGINT NOT NULL, -- references execution(id); no FK because execution is a hypertable + execution BIGINT NOT NULL, prompt TEXT NOT NULL, response_schema JSONB, assigned_to BIGINT REFERENCES identity(id) ON DELETE SET NULL, @@ -109,7 +109,7 @@ CREATE TABLE inquiry ( ); -- Indexes -CREATE INDEX idx_inquiry_execution ON inquiry(execution); +CREATE UNIQUE INDEX uq_inquiry_execution ON inquiry(execution) WHERE execution IS NOT NULL; CREATE INDEX idx_inquiry_assigned_to ON inquiry(assigned_to); CREATE INDEX idx_inquiry_status ON inquiry(status); CREATE INDEX idx_inquiry_timeout_at ON inquiry(timeout_at) WHERE timeout_at IS NOT NULL; @@ -127,7 +127,31 @@ CREATE TRIGGER 
update_inquiry_updated -- Comments COMMENT ON TABLE inquiry IS 'Inquiries enable human-in-the-loop workflows with async user interactions'; -COMMENT ON COLUMN inquiry.execution IS 'Execution that is waiting on this inquiry (no FK — execution is a hypertable)'; +COMMENT ON COLUMN inquiry.execution IS 'Execution that is waiting on this inquiry'; + +ALTER TABLE execution + ADD CONSTRAINT execution_action_fkey + FOREIGN KEY (action) REFERENCES action(id) ON DELETE SET NULL; + +ALTER TABLE execution + ADD CONSTRAINT execution_parent_fkey + FOREIGN KEY (parent) REFERENCES execution(id) ON DELETE SET NULL; + +ALTER TABLE execution + ADD CONSTRAINT execution_original_execution_fkey + FOREIGN KEY (original_execution) REFERENCES execution(id) ON DELETE SET NULL; + +ALTER TABLE execution + ADD CONSTRAINT execution_enforcement_fkey + FOREIGN KEY (enforcement) REFERENCES enforcement(id) ON DELETE SET NULL; + +ALTER TABLE execution + ADD CONSTRAINT execution_executor_fkey + FOREIGN KEY (executor) REFERENCES identity(id) ON DELETE SET NULL; + +ALTER TABLE inquiry + ADD CONSTRAINT inquiry_execution_fkey + FOREIGN KEY (execution) REFERENCES execution(id) ON DELETE CASCADE; COMMENT ON COLUMN inquiry.prompt IS 'Question or prompt text for the user'; COMMENT ON COLUMN inquiry.response_schema IS 'JSON schema defining expected response format'; COMMENT ON COLUMN inquiry.assigned_to IS 'Identity who should respond to this inquiry'; @@ -261,6 +285,10 @@ COMMENT ON COLUMN worker.capabilities IS 'Worker capabilities (e.g., max_concurr COMMENT ON COLUMN worker.meta IS 'Additional worker metadata'; COMMENT ON COLUMN worker.last_heartbeat IS 'Timestamp of last heartbeat from worker'; +ALTER TABLE execution + ADD CONSTRAINT execution_worker_fkey + FOREIGN KEY (worker) REFERENCES worker(id) ON DELETE SET NULL; + -- ============================================================================ -- NOTIFICATION TABLE -- ============================================================================ diff 
--git a/migrations/20250101000006_workflow_system.sql b/migrations/20250101000006_workflow_system.sql index 62e07b3..6fd84d0 100644 --- a/migrations/20250101000006_workflow_system.sql +++ b/migrations/20250101000006_workflow_system.sql @@ -1,13 +1,11 @@ -- Migration: Workflow System --- Description: Creates workflow_definition and workflow_execution tables +-- Description: Creates workflow_definition, workflow_execution, and +-- workflow_task_dispatch tables -- (workflow_task_execution consolidated into execution.workflow_task JSONB) -- --- NOTE: The execution table is converted to a TimescaleDB hypertable in --- migration 000009. Hypertables cannot be the target of FK constraints, --- so workflow_execution.execution is a plain BIGINT with no FK. --- execution.workflow_def also has no FK (added as plain BIGINT in 000005) --- since execution is a hypertable and FKs from hypertables are only --- supported for simple cases — we omit it for consistency. +-- NOTE: `execution` remains a regular PostgreSQL table, so +-- workflow_execution.execution, workflow_task_dispatch.execution_id, +-- and execution.workflow_def use normal foreign keys. -- Version: 20250101000006 -- ============================================================================ @@ -54,7 +52,7 @@ COMMENT ON COLUMN workflow_definition.out_schema IS 'JSON schema for workflow ou CREATE TABLE workflow_execution ( id BIGSERIAL PRIMARY KEY, - execution BIGINT NOT NULL, -- references execution(id); no FK because execution is a hypertable + execution BIGINT NOT NULL REFERENCES execution(id) ON DELETE CASCADE, workflow_def BIGINT NOT NULL REFERENCES workflow_definition(id) ON DELETE CASCADE, current_tasks TEXT[] DEFAULT '{}', completed_tasks TEXT[] DEFAULT '{}', @@ -83,12 +81,51 @@ CREATE TRIGGER update_workflow_execution_updated EXECUTE FUNCTION update_updated_column(); -- Comments -COMMENT ON TABLE workflow_execution IS 'Runtime state tracking for workflow executions. 
execution column has no FK — execution is a hypertable.'; +COMMENT ON TABLE workflow_execution IS 'Runtime state tracking for workflow executions.'; COMMENT ON COLUMN workflow_execution.variables IS 'Workflow-scoped variables, updated via publish directives'; COMMENT ON COLUMN workflow_execution.task_graph IS 'Execution graph with dependencies and transitions'; COMMENT ON COLUMN workflow_execution.current_tasks IS 'Array of task names currently executing'; COMMENT ON COLUMN workflow_execution.paused IS 'True if workflow execution is paused (can be resumed)'; +-- ============================================================================ +-- WORKFLOW TASK DISPATCH TABLE +-- ============================================================================ + +CREATE TABLE workflow_task_dispatch ( + id BIGSERIAL PRIMARY KEY, + workflow_execution BIGINT NOT NULL REFERENCES workflow_execution(id) ON DELETE CASCADE, + task_name TEXT NOT NULL, + task_index INT, + execution_id BIGINT, + created TIMESTAMPTZ DEFAULT NOW() NOT NULL, + updated TIMESTAMPTZ DEFAULT NOW() NOT NULL +); + +CREATE UNIQUE INDEX uq_workflow_task_dispatch_identity + ON workflow_task_dispatch ( + workflow_execution, + task_name, + COALESCE(task_index, -1) + ); + +CREATE INDEX idx_workflow_task_dispatch_execution_id + ON workflow_task_dispatch (execution_id) + WHERE execution_id IS NOT NULL; + +CREATE TRIGGER update_workflow_task_dispatch_updated + BEFORE UPDATE ON workflow_task_dispatch + FOR EACH ROW + EXECUTE FUNCTION update_updated_column(); + +COMMENT ON TABLE workflow_task_dispatch IS + 'Durable dedupe/ownership records for workflow child execution dispatch'; +COMMENT ON COLUMN workflow_task_dispatch.execution_id IS + 'Associated execution.id'; + +ALTER TABLE workflow_task_dispatch + ADD CONSTRAINT workflow_task_dispatch_execution_id_fkey + FOREIGN KEY (execution_id) REFERENCES execution(id) ON DELETE CASCADE; + -- ============================================================================ -- MODIFY 
ACTION TABLE - Add Workflow Support -- ============================================================================ @@ -100,9 +137,9 @@ CREATE INDEX idx_action_workflow_def ON action(workflow_def); COMMENT ON COLUMN action.workflow_def IS 'Reference to workflow definition (non-null means this action is a workflow)'; --- NOTE: execution.workflow_def has no FK constraint because execution is a --- TimescaleDB hypertable (converted in migration 000009). The column was --- created as a plain BIGINT in migration 000005. +ALTER TABLE execution + ADD CONSTRAINT execution_workflow_def_fkey + FOREIGN KEY (workflow_def) REFERENCES workflow_definition(id) ON DELETE SET NULL; -- ============================================================================ -- WORKFLOW VIEWS diff --git a/migrations/20250101000007_supporting_systems.sql b/migrations/20250101000007_supporting_systems.sql index 9fbe942..3fa1b50 100644 --- a/migrations/20250101000007_supporting_systems.sql +++ b/migrations/20250101000007_supporting_systems.sql @@ -1,6 +1,6 @@ -- Migration: Supporting Systems --- Description: Creates keys, artifacts, queue_stats, pack_environment, pack_testing, --- and webhook function tables. +-- Description: Creates keys, artifacts, queue_stats, execution_admission, +-- pack_environment, pack_testing, and webhook function tables. -- Consolidates former migrations: 000009 (keys_artifacts), 000010 (webhook_system), -- 000011 (pack_environments), and 000012 (pack_testing). 
-- Version: 20250101000007 @@ -206,6 +206,76 @@ COMMENT ON COLUMN queue_stats.total_enqueued IS 'Total executions enqueued since COMMENT ON COLUMN queue_stats.total_completed IS 'Total executions completed since queue creation'; COMMENT ON COLUMN queue_stats.last_updated IS 'Timestamp of last statistics update'; +-- ============================================================================ +-- EXECUTION ADMISSION TABLES +-- ============================================================================ + +CREATE TABLE execution_admission_state ( + id BIGSERIAL PRIMARY KEY, + action_id BIGINT NOT NULL REFERENCES action(id) ON DELETE CASCADE, + group_key TEXT, + group_key_normalized TEXT GENERATED ALWAYS AS (COALESCE(group_key, '')) STORED, + max_concurrent INTEGER NOT NULL, + next_queue_order BIGINT NOT NULL DEFAULT 1, + total_enqueued BIGINT NOT NULL DEFAULT 0, + total_completed BIGINT NOT NULL DEFAULT 0, + created TIMESTAMPTZ NOT NULL DEFAULT NOW(), + updated TIMESTAMPTZ NOT NULL DEFAULT NOW(), + CONSTRAINT uq_execution_admission_state_identity + UNIQUE (action_id, group_key_normalized) +); + +CREATE TABLE execution_admission_entry ( + id BIGSERIAL PRIMARY KEY, + state_id BIGINT NOT NULL REFERENCES execution_admission_state(id) ON DELETE CASCADE, + execution_id BIGINT NOT NULL UNIQUE REFERENCES execution(id) ON DELETE CASCADE, + status TEXT NOT NULL CHECK (status IN ('active', 'queued')), + queue_order BIGINT NOT NULL, + enqueued_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + activated_at TIMESTAMPTZ, + created TIMESTAMPTZ NOT NULL DEFAULT NOW(), + updated TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + +CREATE INDEX idx_execution_admission_state_action + ON execution_admission_state (action_id); + +CREATE INDEX idx_execution_admission_entry_state_status_queue + ON execution_admission_entry (state_id, status, queue_order); + +CREATE INDEX idx_execution_admission_entry_execution + ON execution_admission_entry (execution_id); + +CREATE TRIGGER 
update_execution_admission_state_updated + BEFORE UPDATE ON execution_admission_state + FOR EACH ROW + EXECUTE FUNCTION update_updated_column(); + +CREATE TRIGGER update_execution_admission_entry_updated + BEFORE UPDATE ON execution_admission_entry + FOR EACH ROW + EXECUTE FUNCTION update_updated_column(); + +COMMENT ON TABLE execution_admission_state IS + 'Shared admission state per action/group for executor concurrency and FIFO coordination'; +COMMENT ON COLUMN execution_admission_state.group_key IS + 'Optional parameter-derived concurrency grouping key'; +COMMENT ON COLUMN execution_admission_state.max_concurrent IS + 'Current concurrency limit for this action/group queue'; +COMMENT ON COLUMN execution_admission_state.next_queue_order IS + 'Monotonic sequence used to preserve exact FIFO order for queued executions'; +COMMENT ON COLUMN execution_admission_state.total_enqueued IS + 'Cumulative number of executions admitted into this queue'; +COMMENT ON COLUMN execution_admission_state.total_completed IS + 'Cumulative number of active executions released from this queue'; + +COMMENT ON TABLE execution_admission_entry IS + 'Active slot ownership and queued executions for shared admission control'; +COMMENT ON COLUMN execution_admission_entry.status IS + 'active rows own a concurrency slot; queued rows wait in FIFO order'; +COMMENT ON COLUMN execution_admission_entry.queue_order IS + 'Durable FIFO position within an action/group queue'; + -- ============================================================================ -- PACK ENVIRONMENT TABLE -- ============================================================================ diff --git a/migrations/20250101000009_timescaledb_history.sql b/migrations/20250101000009_timescaledb_history.sql index 4499bc5..fa6352f 100644 --- a/migrations/20250101000009_timescaledb_history.sql +++ b/migrations/20250101000009_timescaledb_history.sql @@ -143,52 +143,8 @@ SELECT create_hypertable('event', 'created', COMMENT ON TABLE event IS 
'Events are instances of triggers firing (TimescaleDB hypertable partitioned on created)'; --- ============================================================================ --- CONVERT ENFORCEMENT TABLE TO HYPERTABLE --- ============================================================================ --- Enforcements are created and then updated exactly once (status changes from --- `created` to `processed` or `disabled` within ~1 second). This single update --- happens well before the 7-day compression window, so UPDATE on uncompressed --- chunks works without issues. --- --- No FK constraints reference enforcement(id) — execution.enforcement was --- created as a plain BIGINT in migration 000005. --- ---------------------------------------------------------------------------- - -ALTER TABLE enforcement DROP CONSTRAINT enforcement_pkey; -ALTER TABLE enforcement ADD PRIMARY KEY (id, created); - -SELECT create_hypertable('enforcement', 'created', - chunk_time_interval => INTERVAL '1 day', - migrate_data => true); - -COMMENT ON TABLE enforcement IS 'Enforcements represent rule triggering by events (TimescaleDB hypertable partitioned on created)'; - --- ============================================================================ --- CONVERT EXECUTION TABLE TO HYPERTABLE --- ============================================================================ --- Executions are updated ~4 times during their lifecycle (requested → scheduled --- → running → completed/failed), completing within at most ~1 day — well before --- the 7-day compression window. The `updated` column and its BEFORE UPDATE --- trigger are preserved (used by timeout monitor and UI). --- --- No FK constraints reference execution(id) — inquiry.execution, --- workflow_execution.execution, execution.parent, and execution.original_execution --- were all created as plain BIGINT columns in migrations 000005 and 000006. 
--- --- The existing execution_history hypertable and its trigger are preserved — --- they track field-level diffs of each update, which remains valuable for --- a mutable table. --- ---------------------------------------------------------------------------- - -ALTER TABLE execution DROP CONSTRAINT execution_pkey; -ALTER TABLE execution ADD PRIMARY KEY (id, created); - -SELECT create_hypertable('execution', 'created', - chunk_time_interval => INTERVAL '1 day', - migrate_data => true); - -COMMENT ON TABLE execution IS 'Executions represent action runs with workflow support (TimescaleDB hypertable partitioned on created). Updated ~4 times during lifecycle, completing within ~1 day (well before 7-day compression window).'; +COMMENT ON TABLE enforcement IS 'Enforcements represent rule triggering by events'; +COMMENT ON TABLE execution IS 'Executions represent action runs with workflow support. History and analytics are stored in execution_history.'; -- ============================================================================ -- TRIGGER FUNCTIONS @@ -410,22 +366,6 @@ ALTER TABLE event SET ( ); SELECT add_compression_policy('event', INTERVAL '7 days'); --- Enforcement table (hypertable) -ALTER TABLE enforcement SET ( - timescaledb.compress, - timescaledb.compress_segmentby = 'rule_ref', - timescaledb.compress_orderby = 'created DESC' -); -SELECT add_compression_policy('enforcement', INTERVAL '7 days'); - --- Execution table (hypertable) -ALTER TABLE execution SET ( - timescaledb.compress, - timescaledb.compress_segmentby = 'action_ref', - timescaledb.compress_orderby = 'created DESC' -); -SELECT add_compression_policy('execution', INTERVAL '7 days'); - -- ============================================================================ -- RETENTION POLICIES -- ============================================================================ @@ -433,8 +373,6 @@ SELECT add_compression_policy('execution', INTERVAL '7 days'); SELECT add_retention_policy('execution_history', 
INTERVAL '90 days'); SELECT add_retention_policy('worker_history', INTERVAL '180 days'); SELECT add_retention_policy('event', INTERVAL '90 days'); -SELECT add_retention_policy('enforcement', INTERVAL '90 days'); -SELECT add_retention_policy('execution', INTERVAL '90 days'); -- ============================================================================ -- CONTINUOUS AGGREGATES @@ -449,6 +387,8 @@ DROP MATERIALIZED VIEW IF EXISTS event_volume_hourly CASCADE; DROP MATERIALIZED VIEW IF EXISTS worker_status_hourly CASCADE; DROP MATERIALIZED VIEW IF EXISTS enforcement_volume_hourly CASCADE; DROP MATERIALIZED VIEW IF EXISTS execution_volume_hourly CASCADE; +DROP VIEW IF EXISTS enforcement_volume_hourly CASCADE; +DROP VIEW IF EXISTS execution_volume_hourly CASCADE; -- ---------------------------------------------------------------------------- -- execution_status_hourly @@ -553,49 +493,35 @@ SELECT add_continuous_aggregate_policy('worker_status_hourly', -- instead of a separate enforcement_history table. -- ---------------------------------------------------------------------------- -CREATE MATERIALIZED VIEW enforcement_volume_hourly -WITH (timescaledb.continuous) AS +CREATE VIEW enforcement_volume_hourly AS SELECT - time_bucket('1 hour', created) AS bucket, + date_trunc('hour', created) AS bucket, rule_ref, COUNT(*) AS enforcement_count FROM enforcement GROUP BY bucket, rule_ref -WITH NO DATA; - -SELECT add_continuous_aggregate_policy('enforcement_volume_hourly', - start_offset => INTERVAL '7 days', - end_offset => INTERVAL '1 hour', - schedule_interval => INTERVAL '30 minutes' -); +; -- ---------------------------------------------------------------------------- -- execution_volume_hourly -- Tracks execution creation volume per hour by action_ref and status. --- This queries the execution hypertable directly (like event_volume_hourly --- queries the event table). 
Complements the existing execution_status_hourly --- and execution_throughput_hourly aggregates which query execution_history. +-- This queries the execution table directly. Complements the existing +-- execution_status_hourly and execution_throughput_hourly aggregates which +-- query execution_history. -- -- Use case: direct execution volume monitoring without relying on the history -- trigger (belt-and-suspenders, plus captures the initial status at creation). -- ---------------------------------------------------------------------------- -CREATE MATERIALIZED VIEW execution_volume_hourly -WITH (timescaledb.continuous) AS +CREATE VIEW execution_volume_hourly AS SELECT - time_bucket('1 hour', created) AS bucket, + date_trunc('hour', created) AS bucket, action_ref, status AS initial_status, COUNT(*) AS execution_count FROM execution GROUP BY bucket, action_ref, status -WITH NO DATA; - -SELECT add_continuous_aggregate_policy('execution_volume_hourly', - start_offset => INTERVAL '7 days', - end_offset => INTERVAL '1 hour', - schedule_interval => INTERVAL '30 minutes' -); +; -- ============================================================================ -- INITIAL REFRESH NOTE diff --git a/migrations/20250101000010_artifact_content.sql b/migrations/20250101000010_artifact_content.sql index e136192..0065614 100644 --- a/migrations/20250101000010_artifact_content.sql +++ b/migrations/20250101000010_artifact_content.sql @@ -26,7 +26,7 @@ ALTER TABLE artifact ADD COLUMN IF NOT EXISTS content_type TEXT; -- Total size in bytes of the latest version's content (NULL for progress artifacts) ALTER TABLE artifact ADD COLUMN IF NOT EXISTS size_bytes BIGINT; --- Execution that produced/owns this artifact (plain BIGINT, no FK — execution is a hypertable) +-- Execution that produced/owns this artifact (plain BIGINT, no FK by design) ALTER TABLE artifact ADD COLUMN IF NOT EXISTS execution BIGINT; -- Structured data for progress-type artifacts and small structured payloads. 
@@ -52,7 +52,7 @@ COMMENT ON COLUMN artifact.name IS 'Human-readable artifact name'; COMMENT ON COLUMN artifact.description IS 'Optional description of the artifact'; COMMENT ON COLUMN artifact.content_type IS 'MIME content type (e.g. application/json, text/plain)'; COMMENT ON COLUMN artifact.size_bytes IS 'Size of latest version content in bytes'; -COMMENT ON COLUMN artifact.execution IS 'Execution that produced this artifact (no FK — execution is a hypertable)'; +COMMENT ON COLUMN artifact.execution IS 'Execution that produced this artifact (no FK by design)'; COMMENT ON COLUMN artifact.data IS 'Structured JSONB data for progress artifacts or metadata'; COMMENT ON COLUMN artifact.visibility IS 'Access visibility: public (all users) or private (scope/owner-restricted)';