ha executor
Some checks failed
CI / Rustfmt (pull_request) Successful in 19s
CI / Cargo Audit & Deny (pull_request) Successful in 33s
CI / Security Blocking Checks (pull_request) Successful in 5s
CI / Web Blocking Checks (pull_request) Successful in 49s
CI / Web Advisory Checks (pull_request) Successful in 33s
CI / Clippy (pull_request) Has been cancelled
CI / Security Advisory Checks (pull_request) Has been cancelled
CI / Tests (pull_request) Has been cancelled

This commit is contained in:
2026-04-02 17:15:59 -05:00
parent 8e91440f23
commit f93e9229d2
25 changed files with 2736 additions and 422 deletions

View File

@@ -1412,7 +1412,7 @@ pub mod artifact {
pub content_type: Option<String>,
/// Size of the latest version's content in bytes
pub size_bytes: Option<i64>,
/// Execution that produced this artifact (no FK — execution is a hypertable)
/// Execution that produced this artifact (no FK by design)
pub execution: Option<Id>,
/// Structured JSONB data for progress artifacts or metadata
pub data: Option<serde_json::Value>,

View File

@@ -80,7 +80,7 @@ pub struct EnforcementVolumeBucket {
pub enforcement_count: i64,
}
/// A single hourly bucket of execution volume (from execution hypertable directly).
/// A single hourly bucket of execution volume (from the execution table directly).
#[derive(Debug, Clone, Serialize, FromRow)]
pub struct ExecutionVolumeBucket {
/// Start of the 1-hour bucket
@@ -468,7 +468,7 @@ impl AnalyticsRepository {
}
// =======================================================================
// Execution volume (from execution hypertable directly)
// Execution volume (from the execution table directly)
// =======================================================================
/// Query the `execution_volume_hourly` continuous aggregate for execution

View File

@@ -65,6 +65,12 @@ pub struct EnforcementSearchResult {
pub total: u64,
}
/// Outcome of a create-or-get call on `EnforcementRepository`.
#[derive(Debug, Clone)]
pub struct EnforcementCreateOrGetResult {
    /// The enforcement row, whether freshly inserted or already present.
    pub enforcement: Enforcement,
    /// `true` when this call inserted the row; `false` when an existing
    /// row was returned after a dedupe conflict.
    pub created: bool,
}
/// Repository for Event operations
pub struct EventRepository;
@@ -493,11 +499,7 @@ impl EnforcementRepository {
Ok(enforcement)
}
/// Update an enforcement using the loaded row's hypertable keys.
///
/// This avoids wide scans across compressed chunks by including both the
/// partitioning column (`created`) and compression segment key (`rule_ref`)
/// in the locator.
/// Update an enforcement using the loaded row's primary key.
pub async fn update_loaded<'e, E>(
executor: E,
enforcement: &Enforcement,
@@ -510,19 +512,73 @@ impl EnforcementRepository {
return Ok(enforcement.clone());
}
let rule_ref = enforcement.rule_ref.clone();
Self::update_with_locator(executor, input, |query| {
query.push(" WHERE id = ");
query.push_bind(enforcement.id);
query.push(" AND created = ");
query.push_bind(enforcement.created);
query.push(" AND rule_ref = ");
query.push_bind(rule_ref);
})
.await
}
/// Conditionally update an enforcement, guarded on its current status
/// (optimistic concurrency: `AND status = expected_status` in the WHERE).
///
/// Returns `Ok(None)` when no row matched — i.e. the status changed under
/// us or the row is gone. When the input carries no changes at all the
/// loaded row is echoed back without a database round-trip.
pub async fn update_loaded_if_status<'e, E>(
    executor: E,
    enforcement: &Enforcement,
    expected_status: EnforcementStatus,
    input: UpdateEnforcementInput,
) -> Result<Option<Enforcement>>
where
    E: Executor<'e, Database = Postgres> + 'e,
{
    // Nothing to update: skip the round-trip entirely.
    if input.status.is_none() && input.payload.is_none() && input.resolved_at.is_none() {
        return Ok(Some(enforcement.clone()));
    }
    let mut query = QueryBuilder::new("UPDATE enforcement SET ");
    // Tracks whether a previous SET clause was pushed; the early return
    // above guarantees at least one clause is emitted, so the old
    // `if !has_updates` re-check after the clauses was dead code and has
    // been removed.
    let mut needs_separator = false;
    if let Some(status) = input.status {
        query.push("status = ");
        query.push_bind(status);
        needs_separator = true;
    }
    if let Some(payload) = &input.payload {
        if needs_separator {
            query.push(", ");
        }
        query.push("payload = ");
        query.push_bind(payload);
        needs_separator = true;
    }
    if let Some(resolved_at) = input.resolved_at {
        if needs_separator {
            query.push(", ");
        }
        query.push("resolved_at = ");
        query.push_bind(resolved_at);
    }
    // Guard on both id and the expected status so a concurrent transition
    // turns this into a no-op instead of clobbering newer state.
    query.push(" WHERE id = ");
    query.push_bind(enforcement.id);
    query.push(" AND status = ");
    query.push_bind(expected_status);
    query.push(
        " RETURNING id, rule, rule_ref, trigger_ref, config, event, status, payload, \
        condition, conditions, created, resolved_at",
    );
    query
        .build_query_as::<Enforcement>()
        .fetch_optional(executor)
        .await
        .map_err(Into::into)
}
/// Find enforcements by rule ID
pub async fn find_by_rule<'e, E>(executor: E, rule_id: Id) -> Result<Vec<Enforcement>>
where
@@ -589,6 +645,90 @@ impl EnforcementRepository {
Ok(enforcements)
}
/// Look up the (at most one) enforcement recorded for a rule/event pair.
pub async fn find_by_rule_and_event<'e, E>(
    executor: E,
    rule_id: Id,
    event_id: Id,
) -> Result<Option<Enforcement>>
where
    E: Executor<'e, Database = Postgres> + 'e,
{
    let lookup = sqlx::query_as::<_, Enforcement>(
        r#"
        SELECT id, rule, rule_ref, trigger_ref, config, event, status, payload,
               condition, conditions, created, resolved_at
        FROM enforcement
        WHERE rule = $1 AND event = $2
        LIMIT 1
        "#,
    )
    .bind(rule_id)
    .bind(event_id);
    let found = lookup.fetch_optional(executor).await?;
    Ok(found)
}
/// Insert an enforcement for a (rule, event) pair, or fetch the existing
/// one when that pair has already been recorded.
///
/// Dedupe relies on the partial unique index behind the
/// `ON CONFLICT (rule, event) WHERE ...` clause; when either key is
/// absent from the input, a plain insert is performed instead.
pub async fn create_or_get_by_rule_event<'e, E>(
    executor: E,
    input: CreateEnforcementInput,
) -> Result<EnforcementCreateOrGetResult>
where
    E: Executor<'e, Database = Postgres> + Copy + 'e,
{
    // Without both keys there is nothing to dedupe on.
    let (rule_id, event_id) = match (input.rule, input.event) {
        (Some(rule_id), Some(event_id)) => (rule_id, event_id),
        _ => {
            let enforcement = Self::create(executor, input).await?;
            return Ok(EnforcementCreateOrGetResult {
                enforcement,
                created: true,
            });
        }
    };
    let attempt = sqlx::query_as::<_, Enforcement>(
        r#"
        INSERT INTO enforcement (rule, rule_ref, trigger_ref, config, event, status,
                                 payload, condition, conditions)
        VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
        ON CONFLICT (rule, event) WHERE rule IS NOT NULL AND event IS NOT NULL DO NOTHING
        RETURNING id, rule, rule_ref, trigger_ref, config, event, status, payload,
                  condition, conditions, created, resolved_at
        "#,
    )
    .bind(input.rule)
    .bind(&input.rule_ref)
    .bind(&input.trigger_ref)
    .bind(&input.config)
    .bind(input.event)
    .bind(input.status)
    .bind(&input.payload)
    .bind(input.condition)
    .bind(&input.conditions)
    .fetch_optional(executor)
    .await?;
    match attempt {
        // Our INSERT won: the RETURNING row is the fresh enforcement.
        Some(enforcement) => Ok(EnforcementCreateOrGetResult {
            enforcement,
            created: true,
        }),
        // Conflict: somebody else inserted it — fetch theirs.
        None => {
            let enforcement = Self::find_by_rule_and_event(executor, rule_id, event_id)
                .await?
                .ok_or_else(|| {
                    anyhow::anyhow!(
                        "enforcement for rule {} and event {} disappeared after dedupe conflict",
                        rule_id,
                        event_id
                    )
                })?;
            Ok(EnforcementCreateOrGetResult {
                enforcement,
                created: false,
            })
        }
    }
}
/// Search enforcements with all filters pushed into SQL.
///
/// All filter fields are combinable (AND). Pagination is server-side.

View File

@@ -4,7 +4,8 @@ use chrono::{DateTime, Utc};
use crate::models::{enums::ExecutionStatus, execution::*, Id, JsonDict};
use crate::Result;
use sqlx::{Executor, Postgres, QueryBuilder};
use sqlx::{Executor, PgConnection, PgPool, Postgres, QueryBuilder};
use tokio::time::{sleep, Duration};
use super::{Create, Delete, FindById, List, Repository, Update};
@@ -47,6 +48,12 @@ pub struct WorkflowTaskExecutionCreateOrGetResult {
pub created: bool,
}
#[derive(Debug, Clone)]
pub struct EnforcementExecutionCreateOrGetResult {
pub execution: Execution,
pub created: bool,
}
/// An execution row with optional `rule_ref` / `trigger_ref` populated from
/// the joined `enforcement` table. This avoids a separate in-memory lookup.
#[derive(Debug, Clone, sqlx::FromRow)]
@@ -215,34 +222,394 @@ impl Update for ExecutionRepository {
}
impl ExecutionRepository {
pub async fn create_workflow_task_if_absent<'e, E>(
/// Fetch the oldest top-level execution attached to an enforcement:
/// parentless and not marked as a retry (`config ? 'retry_of'`).
pub async fn find_top_level_by_enforcement<'e, E>(
    executor: E,
    enforcement_id: Id,
) -> Result<Option<Execution>>
where
    E: Executor<'e, Database = Postgres> + 'e,
{
    let sql = format!(
        "SELECT {SELECT_COLUMNS} \
         FROM execution \
         WHERE enforcement = $1
           AND parent IS NULL
           AND (config IS NULL OR NOT (config ? 'retry_of')) \
         ORDER BY created ASC \
         LIMIT 1"
    );
    let execution = sqlx::query_as::<_, Execution>(&sql)
        .bind(enforcement_id)
        .fetch_optional(executor)
        .await?;
    Ok(execution)
}
/// Insert the top-level execution for an enforcement, or return the one
/// that already exists.
///
/// The `ON CONFLICT (enforcement) WHERE ...` clause must match the
/// predicate of the backing partial unique index exactly for the dedupe
/// to apply.
///
/// NOTE(review): the row is inserted with `input.enforcement`, but the
/// fallback lookup uses the separate `enforcement_id` parameter —
/// presumably callers always pass the same value in both; confirm.
pub async fn create_top_level_for_enforcement_if_absent<'e, E>(
    executor: E,
    input: CreateExecutionInput,
    enforcement_id: Id,
) -> Result<EnforcementExecutionCreateOrGetResult>
where
    E: Executor<'e, Database = Postgres> + Copy + 'e,
{
    // Attempt the insert; DO NOTHING yields no RETURNING row when a
    // matching top-level execution already exists.
    let inserted = sqlx::query_as::<_, Execution>(&format!(
        "INSERT INTO execution \
         (action, action_ref, config, env_vars, parent, enforcement, executor, worker, status, result, workflow_task) \
         VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) \
         ON CONFLICT (enforcement)
         WHERE enforcement IS NOT NULL
           AND parent IS NULL
           AND (config IS NULL OR NOT (config ? 'retry_of'))
         DO NOTHING \
         RETURNING {SELECT_COLUMNS}"
    ))
    .bind(input.action)
    .bind(&input.action_ref)
    .bind(&input.config)
    .bind(&input.env_vars)
    .bind(input.parent)
    .bind(input.enforcement)
    .bind(input.executor)
    .bind(input.worker)
    .bind(input.status)
    .bind(&input.result)
    .bind(sqlx::types::Json(&input.workflow_task))
    .fetch_optional(executor)
    .await?;
    if let Some(execution) = inserted {
        return Ok(EnforcementExecutionCreateOrGetResult {
            execution,
            created: true,
        });
    }
    // Conflict path: another writer created it; fetch the existing row.
    let execution = Self::find_top_level_by_enforcement(executor, enforcement_id)
        .await?
        .ok_or_else(|| {
            anyhow::anyhow!(
                "top-level execution for enforcement {} disappeared after dedupe conflict",
                enforcement_id
            )
        })?;
    Ok(EnforcementExecutionCreateOrGetResult {
        execution,
        created: false,
    })
}
/// Try to claim the dispatch slot for a workflow task by inserting its
/// ledger row. Returns `true` when this caller won the claim, `false`
/// when the row already existed.
async fn claim_workflow_task_dispatch<'e, E>(
    executor: E,
    workflow_execution_id: Id,
    task_name: &str,
    task_index: Option<i32>,
) -> Result<bool>
where
    E: Executor<'e, Database = Postgres> + 'e,
{
    // NULL task_index is normalized to -1 in the unique expression index,
    // so the conflict target must use the same COALESCE expression.
    let won = sqlx::query_as::<_, (i64,)>(
        "INSERT INTO workflow_task_dispatch (workflow_execution, task_name, task_index)
         VALUES ($1, $2, $3)
         ON CONFLICT (workflow_execution, task_name, COALESCE(task_index, -1)) DO NOTHING
         RETURNING id",
    )
    .bind(workflow_execution_id)
    .bind(task_name)
    .bind(task_index)
    .fetch_optional(executor)
    .await?
    .is_some();
    Ok(won)
}
/// Stamp the created execution's id onto the dispatch ledger row.
/// `COALESCE(execution_id, $4)` keeps an already-assigned id untouched.
async fn assign_workflow_task_dispatch_execution<'e, E>(
    executor: E,
    workflow_execution_id: Id,
    task_name: &str,
    task_index: Option<i32>,
    execution_id: Id,
) -> Result<()>
where
    E: Executor<'e, Database = Postgres> + 'e,
{
    sqlx::query(
        "UPDATE workflow_task_dispatch
         SET execution_id = COALESCE(execution_id, $4)
         WHERE workflow_execution = $1
           AND task_name = $2
           AND task_index IS NOT DISTINCT FROM $3",
    )
    .bind(workflow_execution_id)
    .bind(task_name)
    .bind(task_index)
    .bind(execution_id)
    .execute(executor)
    .await
    .map(|_| ())
    .map_err(Into::into)
}
/// Row-lock the dispatch ledger entry and report its three-state status:
/// - `None` → no row exists
/// - `Some(None)` → row exists but `execution_id` is still NULL (mid-creation)
/// - `Some(Some(id))` → row exists with a completed `execution_id`
async fn lock_workflow_task_dispatch<'e, E>(
    executor: E,
    workflow_execution_id: Id,
    task_name: &str,
    task_index: Option<i32>,
) -> Result<Option<Option<Id>>>
where
    E: Executor<'e, Database = Postgres> + 'e,
{
    let locked = sqlx::query_as::<_, (Option<i64>,)>(
        "SELECT execution_id
         FROM workflow_task_dispatch
         WHERE workflow_execution = $1
           AND task_name = $2
           AND task_index IS NOT DISTINCT FROM $3
         FOR UPDATE",
    )
    .bind(workflow_execution_id)
    .bind(task_name)
    .bind(task_index)
    .fetch_optional(executor)
    .await?;
    match locked {
        None => Ok(None),
        Some((execution_id,)) => Ok(Some(execution_id)),
    }
}
async fn create_workflow_task_if_absent_in_conn(
conn: &mut PgConnection,
input: CreateExecutionInput,
workflow_execution_id: Id,
task_name: &str,
task_index: Option<i32>,
) -> Result<WorkflowTaskExecutionCreateOrGetResult>
where
E: Executor<'e, Database = Postgres> + Copy + 'e,
{
if let Some(execution) =
Self::find_by_workflow_task(executor, workflow_execution_id, task_name, task_index)
) -> Result<WorkflowTaskExecutionCreateOrGetResult> {
let claimed = Self::claim_workflow_task_dispatch(
&mut *conn,
workflow_execution_id,
task_name,
task_index,
)
.await?;
if claimed {
let execution = Self::create(&mut *conn, input).await?;
Self::assign_workflow_task_dispatch_execution(
&mut *conn,
workflow_execution_id,
task_name,
task_index,
execution.id,
)
.await?;
return Ok(WorkflowTaskExecutionCreateOrGetResult {
execution,
created: true,
});
}
let dispatch_state = Self::lock_workflow_task_dispatch(
&mut *conn,
workflow_execution_id,
task_name,
task_index,
)
.await?;
match dispatch_state {
Some(Some(existing_execution_id)) => {
// Row exists with execution_id — return the existing execution.
let execution = Self::find_by_id(&mut *conn, existing_execution_id)
.await?
.ok_or_else(|| {
anyhow::anyhow!(
"workflow child execution {} missing for workflow_execution {} task '{}' index {:?}",
existing_execution_id,
workflow_execution_id,
task_name,
task_index
)
})?;
Ok(WorkflowTaskExecutionCreateOrGetResult {
execution,
created: false,
})
}
Some(None) => {
// Row exists but execution_id is still NULL: another transaction is
// mid-creation (between claim and assign). Retry until it's filled in.
// If the original creator's transaction rolled back, the row also
// disappears — handled by the `None` branch inside the loop.
'wait: {
for _ in 0..20_u32 {
sleep(Duration::from_millis(50)).await;
match Self::lock_workflow_task_dispatch(
&mut *conn,
workflow_execution_id,
task_name,
task_index,
)
.await?
{
Some(Some(execution_id)) => {
let execution =
Self::find_by_id(&mut *conn, execution_id).await?.ok_or_else(
|| {
anyhow::anyhow!(
"workflow child execution {} missing for workflow_execution {} task '{}' index {:?}",
execution_id,
workflow_execution_id,
task_name,
task_index
)
},
)?;
return Ok(WorkflowTaskExecutionCreateOrGetResult {
execution,
created: false,
});
}
Some(None) => {} // still NULL, keep waiting
None => break 'wait, // row rolled back; fall through to re-claim
}
}
// Exhausted all retries without the execution_id being set.
return Err(anyhow::anyhow!(
"Timed out waiting for workflow task dispatch execution_id to be set \
for workflow_execution {} task '{}' index {:?}",
workflow_execution_id,
task_name,
task_index
)
.into());
}
let execution = Self::create(executor, input).await?;
// Row disappeared (original creator rolled back) — re-claim and create.
let re_claimed = Self::claim_workflow_task_dispatch(
&mut *conn,
workflow_execution_id,
task_name,
task_index,
)
.await?;
if !re_claimed {
return Err(anyhow::anyhow!(
"Workflow task dispatch for workflow_execution {} task '{}' index {:?} \
was reclaimed by another executor after rollback",
workflow_execution_id,
task_name,
task_index
)
.into());
}
let execution = Self::create(&mut *conn, input).await?;
Self::assign_workflow_task_dispatch_execution(
&mut *conn,
workflow_execution_id,
task_name,
task_index,
execution.id,
)
.await?;
Ok(WorkflowTaskExecutionCreateOrGetResult {
execution,
created: true,
})
}
None => {
// No row at all — the original INSERT was rolled back before we arrived.
// Attempt to re-claim and create as if this were a fresh dispatch.
let re_claimed = Self::claim_workflow_task_dispatch(
&mut *conn,
workflow_execution_id,
task_name,
task_index,
)
.await?;
if !re_claimed {
return Err(anyhow::anyhow!(
"Workflow task dispatch for workflow_execution {} task '{}' index {:?} \
was claimed by another executor",
workflow_execution_id,
task_name,
task_index
)
.into());
}
let execution = Self::create(&mut *conn, input).await?;
Self::assign_workflow_task_dispatch_execution(
&mut *conn,
workflow_execution_id,
task_name,
task_index,
execution.id,
)
.await?;
Ok(WorkflowTaskExecutionCreateOrGetResult {
execution,
created: true,
})
}
}
}
/// Pool-level entry point: runs the get-or-create inside its own
/// transaction on a dedicated connection.
///
/// Raw `BEGIN`/`COMMIT`/`ROLLBACK` statements are used because the inner
/// helper needs a `&mut PgConnection` rather than a `Transaction`.
pub async fn create_workflow_task_if_absent(
    pool: &PgPool,
    input: CreateExecutionInput,
    workflow_execution_id: Id,
    task_name: &str,
    task_index: Option<i32>,
) -> Result<WorkflowTaskExecutionCreateOrGetResult> {
    let mut conn = pool.acquire().await?;
    sqlx::query("BEGIN").execute(&mut *conn).await?;
    let result = Self::create_workflow_task_if_absent_in_conn(
        &mut conn,
        input,
        workflow_execution_id,
        task_name,
        task_index,
    )
    .await;
    match result {
        Ok(result) => {
            sqlx::query("COMMIT").execute(&mut *conn).await?;
            Ok(result)
        }
        Err(err) => {
            // Best-effort rollback: surface the original error rather than
            // letting a secondary ROLLBACK failure mask it (previously the
            // `?` on the rollback replaced `err`). The connection is dropped
            // afterwards, so an unfinished transaction is discarded anyway.
            let _ = sqlx::query("ROLLBACK").execute(&mut *conn).await;
            Err(err)
        }
    }
}
/// Connection-level entry point for callers that already hold a
/// transaction-bound `PgConnection`; delegates straight to the shared
/// get-or-create helper.
pub async fn create_workflow_task_if_absent_with_conn(
    conn: &mut PgConnection,
    input: CreateExecutionInput,
    workflow_execution_id: Id,
    task_name: &str,
    task_index: Option<i32>,
) -> Result<WorkflowTaskExecutionCreateOrGetResult> {
    let result = Self::create_workflow_task_if_absent_in_conn(
        conn,
        input,
        workflow_execution_id,
        task_name,
        task_index,
    )
    .await?;
    Ok(result)
}
pub async fn claim_for_scheduling<'e, E>(
executor: E,
id: Id,
@@ -320,6 +687,62 @@ impl ExecutionRepository {
.await
}
/// Apply an update only while the execution still has `expected_status`
/// and has not been touched since `stale_before` (stale-claim takeover).
/// Returns `Ok(None)` when the guard clause matched no row.
pub async fn update_if_status_and_updated_before<'e, E>(
    executor: E,
    id: Id,
    expected_status: ExecutionStatus,
    stale_before: DateTime<Utc>,
    input: UpdateExecutionInput,
) -> Result<Option<Execution>>
where
    E: Executor<'e, Database = Postgres> + 'e,
{
    // With no fields to change, just re-read the row instead of issuing
    // an empty UPDATE.
    let has_changes = input.status.is_some()
        || input.result.is_some()
        || input.executor.is_some()
        || input.worker.is_some()
        || input.started_at.is_some()
        || input.workflow_task.is_some();
    if !has_changes {
        return Self::find_by_id(executor, id).await;
    }
    Self::update_with_locator_optional(executor, input, |query| {
        query.push(" WHERE id = ").push_bind(id);
        query.push(" AND status = ").push_bind(expected_status);
        query.push(" AND updated < ").push_bind(stale_before);
    })
    .await
}
/// Compare-and-swap style update: applies only while the execution still
/// has `expected_status` AND its `updated` timestamp equals
/// `expected_updated`. Returns `Ok(None)` when the row changed under us.
pub async fn update_if_status_and_updated_at<'e, E>(
    executor: E,
    id: Id,
    expected_status: ExecutionStatus,
    expected_updated: DateTime<Utc>,
    input: UpdateExecutionInput,
) -> Result<Option<Execution>>
where
    E: Executor<'e, Database = Postgres> + 'e,
{
    // Empty input: avoid a no-op UPDATE and return the current row.
    let has_changes = input.status.is_some()
        || input.result.is_some()
        || input.executor.is_some()
        || input.worker.is_some()
        || input.started_at.is_some()
        || input.workflow_task.is_some();
    if !has_changes {
        return Self::find_by_id(executor, id).await;
    }
    Self::update_with_locator_optional(executor, input, |query| {
        query.push(" WHERE id = ").push_bind(id);
        query.push(" AND status = ").push_bind(expected_status);
        query.push(" AND updated = ").push_bind(expected_updated);
    })
    .await
}
pub async fn revert_scheduled_to_requested<'e, E>(
executor: E,
id: Id,
@@ -473,10 +896,7 @@ impl ExecutionRepository {
.map_err(Into::into)
}
/// Update an execution using the loaded row's hypertable keys.
///
/// Including both the partition key (`created`) and compression segment key
/// (`action_ref`) avoids broad scans across compressed chunks.
/// Update an execution using the loaded row's primary key.
pub async fn update_loaded<'e, E>(
executor: E,
execution: &Execution,
@@ -495,12 +915,8 @@ impl ExecutionRepository {
return Ok(execution.clone());
}
let action_ref = execution.action_ref.clone();
Self::update_with_locator(executor, input, |query| {
query.push(" WHERE id = ").push_bind(execution.id);
query.push(" AND created = ").push_bind(execution.created);
query.push(" AND action_ref = ").push_bind(action_ref);
})
.await
}

View File

@@ -0,0 +1,909 @@
use chrono::{DateTime, Utc};
use sqlx::{PgPool, Postgres, Row, Transaction};
use crate::error::Result;
use crate::models::Id;
use crate::repositories::queue_stats::{QueueStatsRepository, UpsertQueueStatsInput};
/// Result of a slot-acquisition attempt.
#[derive(Debug, Clone)]
pub struct AdmissionSlotAcquireOutcome {
    /// Whether the execution now holds an active slot.
    pub acquired: bool,
    /// Active-slot count observed when the decision was made.
    pub current_count: u32,
}
/// Result of enqueueing: either a slot was acquired immediately, or the
/// execution was left queued.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AdmissionEnqueueOutcome {
    Acquired,
    Enqueued,
}
/// What happened when an active slot was released.
#[derive(Debug, Clone)]
pub struct AdmissionSlotReleaseOutcome {
    pub action_id: Id,
    pub group_key: Option<String>,
    /// Execution promoted from queued → active by the release, if any.
    pub next_execution_id: Option<Id>,
}
/// Snapshot of a queued entry removed from the admission queue; carries
/// enough state (`queue_order`, `enqueued_at`) to re-insert it on restore.
#[derive(Debug, Clone)]
pub struct AdmissionQueuedRemovalOutcome {
    pub action_id: Id,
    pub group_key: Option<String>,
    /// Execution promoted as a side effect of the removal, if any.
    pub next_execution_id: Option<Id>,
    pub execution_id: Id,
    pub queue_order: i64,
    pub enqueued_at: DateTime<Utc>,
    /// Zero-based position the entry held in its queue at removal time.
    pub removed_index: usize,
}
/// Aggregated queue statistics for one action (summed across its
/// per-group admission states).
#[derive(Debug, Clone)]
pub struct AdmissionQueueStats {
    pub action_id: Id,
    pub queue_length: usize,
    pub active_count: u32,
    pub max_concurrent: u32,
    pub oldest_enqueued_at: Option<DateTime<Utc>>,
    pub total_enqueued: u64,
    pub total_completed: u64,
}
/// One row of `execution_admission_state`: the lockable per-(action,
/// group) concurrency bucket.
#[derive(Debug, Clone)]
struct AdmissionState {
    id: Id,
    action_id: Id,
    group_key: Option<String>,
    max_concurrent: i32,
}
/// One row of `execution_admission_entry` joined with its owning state.
#[derive(Debug, Clone)]
struct ExecutionEntry {
    state_id: Id,
    action_id: Id,
    group_key: Option<String>,
    /// Either "queued" or "active" (string-typed in the schema).
    status: String,
    queue_order: i64,
    enqueued_at: DateTime<Utc>,
}
/// Postgres-backed admission control for execution concurrency limits.
pub struct ExecutionAdmissionRepository;
impl ExecutionAdmissionRepository {
/// Enqueue an execution for admission, acquiring a slot immediately when
/// capacity allows. Runs as one transaction: lock state → enqueue/acquire
/// → refresh stats → commit.
pub async fn enqueue(
    pool: &PgPool,
    max_queue_length: usize,
    action_id: Id,
    execution_id: Id,
    max_concurrent: u32,
    group_key: Option<String>,
) -> Result<AdmissionEnqueueOutcome> {
    let mut tx = pool.begin().await?;
    // Upserts and row-locks the per-(action, group) state so concurrent
    // admission decisions serialize on it.
    let state = Self::lock_state(&mut tx, action_id, group_key, max_concurrent).await?;
    let outcome =
        Self::enqueue_in_state(&mut tx, &state, max_queue_length, execution_id, true).await?;
    Self::refresh_queue_stats(&mut tx, action_id).await?;
    tx.commit().await?;
    Ok(outcome)
}
/// Poll an execution's admission status.
///
/// Returns `Some(true)` when active, `Some(false)` when queued, and
/// `None` when no admission entry exists for the execution.
pub async fn wait_status(pool: &PgPool, execution_id: Id) -> Result<Option<bool>> {
    sqlx::query_scalar::<Postgres, bool>(
        r#"
        SELECT status = 'active'
        FROM execution_admission_entry
        WHERE execution_id = $1
        "#,
    )
    .bind(execution_id)
    .fetch_optional(pool)
    .await
    .map_err(Into::into)
}
/// Try to acquire an active slot for `execution_id` without queueing.
///
/// Handles three entry states: already active (idempotent success),
/// queued in this state (promote only if at the queue front with
/// capacity), or queued/active elsewhere (refusal). With no entry at
/// all, a slot is taken only when capacity remains AND nothing is queued
/// ahead (FIFO fairness).
///
/// NOTE(review): `refresh_queue_stats` is only called on the fresh-insert
/// path, not when an existing queued entry is promoted — presumably
/// intentional, but confirm against how `enqueue` refreshes uniformly.
pub async fn try_acquire(
    pool: &PgPool,
    action_id: Id,
    execution_id: Id,
    max_concurrent: u32,
    group_key: Option<String>,
) -> Result<AdmissionSlotAcquireOutcome> {
    let mut tx = pool.begin().await?;
    let state = Self::lock_state(&mut tx, action_id, group_key, max_concurrent).await?;
    // Count captured before any mutation; it is echoed back to the caller.
    let active_count = Self::active_count(&mut tx, state.id).await? as u32;
    let outcome = match Self::find_execution_entry(&mut tx, execution_id).await? {
        Some(entry) if entry.status == "active" => AdmissionSlotAcquireOutcome {
            acquired: true,
            current_count: active_count,
        },
        Some(entry) if entry.status == "queued" && entry.state_id == state.id => {
            let promoted =
                Self::maybe_promote_existing_queued(&mut tx, &state, execution_id).await?;
            AdmissionSlotAcquireOutcome {
                acquired: promoted,
                current_count: active_count,
            }
        }
        // Entry belongs to a different state — cannot acquire here.
        Some(_) => AdmissionSlotAcquireOutcome {
            acquired: false,
            current_count: active_count,
        },
        None => {
            if active_count < max_concurrent
                && Self::queued_count(&mut tx, state.id).await? == 0
            {
                let queue_order = Self::allocate_queue_order(&mut tx, state.id).await?;
                Self::insert_entry(
                    &mut tx,
                    state.id,
                    execution_id,
                    "active",
                    queue_order,
                    Utc::now(),
                )
                .await?;
                Self::increment_total_enqueued(&mut tx, state.id).await?;
                Self::refresh_queue_stats(&mut tx, action_id).await?;
                AdmissionSlotAcquireOutcome {
                    acquired: true,
                    current_count: active_count,
                }
            } else {
                AdmissionSlotAcquireOutcome {
                    acquired: false,
                    current_count: active_count,
                }
            }
        }
    };
    tx.commit().await?;
    Ok(outcome)
}
/// Release the active slot held by `execution_id`, promoting the next
/// queued entry (if any) into the freed slot.
///
/// Returns `Ok(None)` when the execution has no entry or is not active
/// (release is then a no-op).
pub async fn release_active_slot(
    pool: &PgPool,
    execution_id: Id,
) -> Result<Option<AdmissionSlotReleaseOutcome>> {
    let mut tx = pool.begin().await?;
    // Lock the entry first, then the state, to pin both rows for the
    // delete + promote sequence below.
    let Some(entry) = Self::find_execution_entry_for_update(&mut tx, execution_id).await?
    else {
        tx.commit().await?;
        return Ok(None);
    };
    if entry.status != "active" {
        tx.commit().await?;
        return Ok(None);
    }
    let state = Self::lock_existing_state(&mut tx, entry.action_id, entry.group_key.clone())
        .await?
        .ok_or_else(|| {
            crate::Error::internal("missing execution_admission_state for active execution")
        })?;
    sqlx::query("DELETE FROM execution_admission_entry WHERE execution_id = $1")
        .bind(execution_id)
        .execute(&mut *tx)
        .await?;
    Self::increment_total_completed(&mut tx, state.id).await?;
    let next_execution_id = Self::promote_next_queued(&mut tx, &state).await?;
    Self::refresh_queue_stats(&mut tx, state.action_id).await?;
    tx.commit().await?;
    Ok(Some(AdmissionSlotReleaseOutcome {
        action_id: state.action_id,
        group_key: state.group_key,
        next_execution_id,
    }))
}
/// Undo a `release_active_slot`: demote the entry it promoted (if any)
/// back to queued, re-insert `execution_id` as active, and decrement the
/// completed counter that the release incremented.
pub async fn restore_active_slot(
    pool: &PgPool,
    execution_id: Id,
    outcome: &AdmissionSlotReleaseOutcome,
) -> Result<()> {
    let mut tx = pool.begin().await?;
    let state =
        Self::lock_existing_state(&mut tx, outcome.action_id, outcome.group_key.clone())
            .await?
            .ok_or_else(|| {
                crate::Error::internal("missing execution_admission_state on restore")
            })?;
    // Demote the execution the release promoted, but only if it is still
    // active (it may have finished and been deleted in the meantime).
    if let Some(next_execution_id) = outcome.next_execution_id {
        sqlx::query(
            r#"
            UPDATE execution_admission_entry
            SET status = 'queued', activated_at = NULL
            WHERE execution_id = $1
              AND state_id = $2
              AND status = 'active'
            "#,
        )
        .bind(next_execution_id)
        .bind(state.id)
        .execute(&mut *tx)
        .await?;
    }
    // Re-insert (or re-activate) the released execution. A fresh
    // queue_order is allocated since the original one was discarded.
    sqlx::query(
        r#"
        INSERT INTO execution_admission_entry (
            state_id, execution_id, status, queue_order, enqueued_at, activated_at
        ) VALUES ($1, $2, 'active', $3, NOW(), NOW())
        ON CONFLICT (execution_id) DO UPDATE
        SET state_id = EXCLUDED.state_id,
            status = 'active',
            activated_at = EXCLUDED.activated_at
        "#,
    )
    .bind(state.id)
    .bind(execution_id)
    .bind(Self::allocate_queue_order(&mut tx, state.id).await?)
    .execute(&mut *tx)
    .await?;
    // Reverse the completed-counter bump from the release; clamped at 0.
    sqlx::query(
        r#"
        UPDATE execution_admission_state
        SET total_completed = GREATEST(total_completed - 1, 0)
        WHERE id = $1
        "#,
    )
    .bind(state.id)
    .execute(&mut *tx)
    .await?;
    Self::refresh_queue_stats(&mut tx, state.action_id).await?;
    tx.commit().await?;
    Ok(())
}
/// Remove a queued (not active) execution from its admission queue,
/// promoting the next queued entry when a free slot allows. Returns a
/// snapshot sufficient for `restore_queued_execution` to undo the
/// removal, or `Ok(None)` when the entry is absent or not queued.
///
/// NOTE(review): `removed_index` is computed by `(enqueued_at, id)`
/// ordering while promotion elsewhere orders by `queue_order` — assumes
/// the two orderings agree; confirm.
pub async fn remove_queued_execution(
    pool: &PgPool,
    execution_id: Id,
) -> Result<Option<AdmissionQueuedRemovalOutcome>> {
    let mut tx = pool.begin().await?;
    let Some(entry) = Self::find_execution_entry_for_update(&mut tx, execution_id).await?
    else {
        tx.commit().await?;
        return Ok(None);
    };
    if entry.status != "queued" {
        tx.commit().await?;
        return Ok(None);
    }
    let state = Self::lock_existing_state(&mut tx, entry.action_id, entry.group_key.clone())
        .await?
        .ok_or_else(|| {
            crate::Error::internal("missing execution_admission_state for queued execution")
        })?;
    // Position of the entry within its queue (number of queued entries
    // strictly ahead of it), captured before the delete.
    let removed_index = sqlx::query_scalar::<Postgres, i64>(
        r#"
        SELECT COUNT(*)
        FROM execution_admission_entry
        WHERE state_id = $1
          AND status = 'queued'
          AND (enqueued_at, id) < (
              SELECT enqueued_at, id
              FROM execution_admission_entry
              WHERE execution_id = $2
          )
        "#,
    )
    .bind(state.id)
    .bind(execution_id)
    .fetch_one(&mut *tx)
    .await? as usize;
    sqlx::query("DELETE FROM execution_admission_entry WHERE execution_id = $1")
        .bind(execution_id)
        .execute(&mut *tx)
        .await?;
    // Only promote when capacity actually remains after the removal.
    let next_execution_id =
        if Self::active_count(&mut tx, state.id).await? < state.max_concurrent as i64 {
            Self::promote_next_queued(&mut tx, &state).await?
        } else {
            None
        };
    Self::refresh_queue_stats(&mut tx, state.action_id).await?;
    tx.commit().await?;
    Ok(Some(AdmissionQueuedRemovalOutcome {
        action_id: state.action_id,
        group_key: state.group_key,
        next_execution_id,
        execution_id,
        queue_order: entry.queue_order,
        enqueued_at: entry.enqueued_at,
        removed_index,
    }))
}
/// Undo a `remove_queued_execution`: demote any entry it promoted and
/// re-insert the removed entry with its original `queue_order` and
/// `enqueued_at`, so it regains its former queue position.
pub async fn restore_queued_execution(
    pool: &PgPool,
    outcome: &AdmissionQueuedRemovalOutcome,
) -> Result<()> {
    let mut tx = pool.begin().await?;
    let state =
        Self::lock_existing_state(&mut tx, outcome.action_id, outcome.group_key.clone())
            .await?
            .ok_or_else(|| {
                crate::Error::internal("missing execution_admission_state on queued restore")
            })?;
    // Demote the promoted entry only if it is still active.
    if let Some(next_execution_id) = outcome.next_execution_id {
        sqlx::query(
            r#"
            UPDATE execution_admission_entry
            SET status = 'queued', activated_at = NULL
            WHERE execution_id = $1
              AND state_id = $2
              AND status = 'active'
            "#,
        )
        .bind(next_execution_id)
        .bind(state.id)
        .execute(&mut *tx)
        .await?;
    }
    // DO NOTHING: if the execution re-entered the queue on its own in the
    // meantime, leave that newer entry untouched.
    sqlx::query(
        r#"
        INSERT INTO execution_admission_entry (
            state_id, execution_id, status, queue_order, enqueued_at, activated_at
        ) VALUES ($1, $2, 'queued', $3, $4, NULL)
        ON CONFLICT (execution_id) DO NOTHING
        "#,
    )
    .bind(state.id)
    .bind(outcome.execution_id)
    .bind(outcome.queue_order)
    .bind(outcome.enqueued_at)
    .execute(&mut *tx)
    .await?;
    Self::refresh_queue_stats(&mut tx, state.action_id).await?;
    tx.commit().await?;
    Ok(())
}
/// Read aggregated admission statistics for an action, summed across all
/// of its per-group states. Returns `Ok(None)` when the action has no
/// admission state rows at all.
pub async fn get_queue_stats(
    pool: &PgPool,
    action_id: Id,
) -> Result<Option<AdmissionQueueStats>> {
    // Two CTEs: state_rows sums the per-state counters; entry_rows counts
    // live entries via LEFT JOIN so stats still come back with zero
    // entries. Both aggregates always produce exactly one row, so
    // fetch_one is safe.
    let row = sqlx::query(
        r#"
        WITH state_rows AS (
            SELECT
                COUNT(*) AS state_count,
                COALESCE(SUM(max_concurrent), 0) AS max_concurrent,
                COALESCE(SUM(total_enqueued), 0) AS total_enqueued,
                COALESCE(SUM(total_completed), 0) AS total_completed
            FROM execution_admission_state
            WHERE action_id = $1
        ),
        entry_rows AS (
            SELECT
                COUNT(*) FILTER (WHERE e.status = 'queued') AS queue_length,
                COUNT(*) FILTER (WHERE e.status = 'active') AS active_count,
                MIN(e.enqueued_at) FILTER (WHERE e.status = 'queued') AS oldest_enqueued_at
            FROM execution_admission_state s
            LEFT JOIN execution_admission_entry e ON e.state_id = s.id
            WHERE s.action_id = $1
        )
        SELECT
            sr.state_count,
            er.queue_length,
            er.active_count,
            sr.max_concurrent,
            er.oldest_enqueued_at,
            sr.total_enqueued,
            sr.total_completed
        FROM state_rows sr
        CROSS JOIN entry_rows er
        "#,
    )
    .bind(action_id)
    .fetch_one(pool)
    .await?;
    let state_count: i64 = row.try_get("state_count")?;
    // No admission state for this action — report "not tracked".
    if state_count == 0 {
        return Ok(None);
    }
    Ok(Some(AdmissionQueueStats {
        action_id,
        queue_length: row.try_get::<i64, _>("queue_length")? as usize,
        active_count: row.try_get::<i64, _>("active_count")? as u32,
        max_concurrent: row.try_get::<i64, _>("max_concurrent")? as u32,
        oldest_enqueued_at: row.try_get("oldest_enqueued_at")?,
        total_enqueued: row.try_get::<i64, _>("total_enqueued")? as u64,
        total_completed: row.try_get::<i64, _>("total_completed")? as u64,
    }))
}
/// Core enqueue logic for a locked state: idempotent on an existing
/// entry, grabs a slot directly when capacity remains and nothing is
/// queued ahead, otherwise appends to the queue (subject to
/// `max_queue_length` when `allow_queue` is set).
///
/// Caller must already hold the state's row lock (`lock_state`).
async fn enqueue_in_state(
    tx: &mut Transaction<'_, Postgres>,
    state: &AdmissionState,
    max_queue_length: usize,
    execution_id: Id,
    allow_queue: bool,
) -> Result<AdmissionEnqueueOutcome> {
    // Idempotency: an entry already exists for this execution.
    if let Some(entry) = Self::find_execution_entry(tx, execution_id).await? {
        if entry.status == "active" {
            return Ok(AdmissionEnqueueOutcome::Acquired);
        }
        if entry.status == "queued" && entry.state_id == state.id {
            // Already queued here — opportunistically promote if it sits
            // at the front and capacity allows.
            if Self::maybe_promote_existing_queued(tx, state, execution_id).await? {
                return Ok(AdmissionEnqueueOutcome::Acquired);
            }
            return Ok(AdmissionEnqueueOutcome::Enqueued);
        }
        // Queued under a different state: report as enqueued, don't touch it.
        return Ok(AdmissionEnqueueOutcome::Enqueued);
    }
    let active_count = Self::active_count(tx, state.id).await?;
    let queued_count = Self::queued_count(tx, state.id).await?;
    // Direct acquisition only when nothing is waiting (FIFO fairness).
    if active_count < state.max_concurrent as i64 && queued_count == 0 {
        let queue_order = Self::allocate_queue_order(tx, state.id).await?;
        Self::insert_entry(
            tx,
            state.id,
            execution_id,
            "active",
            queue_order,
            Utc::now(),
        )
        .await?;
        Self::increment_total_enqueued(tx, state.id).await?;
        return Ok(AdmissionEnqueueOutcome::Acquired);
    }
    if !allow_queue {
        return Ok(AdmissionEnqueueOutcome::Enqueued);
    }
    // Enforce the queue-length cap before appending.
    if queued_count >= max_queue_length as i64 {
        return Err(anyhow::anyhow!(
            "Queue full for action {}: maximum {} entries",
            state.action_id,
            max_queue_length
        )
        .into());
    }
    let queue_order = Self::allocate_queue_order(tx, state.id).await?;
    Self::insert_entry(
        tx,
        state.id,
        execution_id,
        "queued",
        queue_order,
        Utc::now(),
    )
    .await?;
    Self::increment_total_enqueued(tx, state.id).await?;
    Ok(AdmissionEnqueueOutcome::Enqueued)
}
/// Promote `execution_id` from queued to active, but only when capacity
/// remains and it sits at the front of its state's queue. Returns
/// whether the promotion happened.
async fn maybe_promote_existing_queued(
    tx: &mut Transaction<'_, Postgres>,
    state: &AdmissionState,
    execution_id: Id,
) -> Result<bool> {
    // No free slot — nothing to promote.
    if Self::active_count(tx, state.id).await? >= state.max_concurrent as i64 {
        return Ok(false);
    }
    let front = sqlx::query_scalar::<Postgres, Id>(
        r#"
        SELECT execution_id
        FROM execution_admission_entry
        WHERE state_id = $1
          AND status = 'queued'
        ORDER BY queue_order ASC
        LIMIT 1
        "#,
    )
    .bind(state.id)
    .fetch_optional(&mut **tx)
    .await?;
    match front {
        // Only the execution at the head of the queue may jump to active.
        Some(front_id) if front_id == execution_id => {
            sqlx::query(
                r#"
                UPDATE execution_admission_entry
                SET status = 'active',
                    activated_at = NOW()
                WHERE execution_id = $1
                  AND state_id = $2
                  AND status = 'queued'
                "#,
            )
            .bind(execution_id)
            .bind(state.id)
            .execute(&mut **tx)
            .await?;
            Ok(true)
        }
        _ => Ok(false),
    }
}
/// Flip the front-most queued entry of a state to active and return its
/// execution id, or `None` when the queue is empty. The caller is
/// responsible for checking capacity first.
async fn promote_next_queued(
    tx: &mut Transaction<'_, Postgres>,
    state: &AdmissionState,
) -> Result<Option<Id>> {
    let head = sqlx::query_scalar::<Postgres, Id>(
        r#"
        SELECT execution_id
        FROM execution_admission_entry
        WHERE state_id = $1
          AND status = 'queued'
        ORDER BY queue_order ASC
        LIMIT 1
        "#,
    )
    .bind(state.id)
    .fetch_optional(&mut **tx)
    .await?;
    let Some(promoted_id) = head else {
        return Ok(None);
    };
    sqlx::query(
        r#"
        UPDATE execution_admission_entry
        SET status = 'active',
            activated_at = NOW()
        WHERE execution_id = $1
          AND state_id = $2
          AND status = 'queued'
        "#,
    )
    .bind(promoted_id)
    .bind(state.id)
    .execute(&mut **tx)
    .await?;
    Ok(Some(promoted_id))
}
/// Upsert and row-lock the admission state for an (action, group) pair,
/// refreshing `max_concurrent` to the caller's value. Serializes all
/// admission decisions for the pair via `FOR UPDATE` until the enclosing
/// transaction ends.
async fn lock_state(
    tx: &mut Transaction<'_, Postgres>,
    action_id: Id,
    group_key: Option<String>,
    max_concurrent: u32,
) -> Result<AdmissionState> {
    // Upsert keyed on the normalized group key (NULL normalized to '')
    // so NULL group keys still dedupe.
    sqlx::query(
        r#"
        INSERT INTO execution_admission_state (action_id, group_key, max_concurrent)
        VALUES ($1, $2, $3)
        ON CONFLICT (action_id, group_key_normalized)
        DO UPDATE SET max_concurrent = EXCLUDED.max_concurrent
        "#,
    )
    .bind(action_id)
    .bind(group_key.clone())
    .bind(max_concurrent as i32)
    .execute(&mut **tx)
    .await?;
    // The row is guaranteed to exist now; lock and read it back.
    let state = sqlx::query(
        r#"
        SELECT id, action_id, group_key, max_concurrent
        FROM execution_admission_state
        WHERE action_id = $1
          AND group_key_normalized = COALESCE($2, '')
        FOR UPDATE
        "#,
    )
    .bind(action_id)
    .bind(group_key)
    .fetch_one(&mut **tx)
    .await?;
    Ok(AdmissionState {
        id: state.try_get("id")?,
        action_id: state.try_get("action_id")?,
        group_key: state.try_get("group_key")?,
        max_concurrent: state.try_get("max_concurrent")?,
    })
}
async fn lock_existing_state(
tx: &mut Transaction<'_, Postgres>,
action_id: Id,
group_key: Option<String>,
) -> Result<Option<AdmissionState>> {
let row = sqlx::query(
r#"
SELECT id, action_id, group_key, max_concurrent
FROM execution_admission_state
WHERE action_id = $1
AND group_key_normalized = COALESCE($2, '')
FOR UPDATE
"#,
)
.bind(action_id)
.bind(group_key)
.fetch_optional(&mut **tx)
.await?;
Ok(row.map(|state| AdmissionState {
id: state.try_get("id").expect("state.id"),
action_id: state.try_get("action_id").expect("state.action_id"),
group_key: state.try_get("group_key").expect("state.group_key"),
max_concurrent: state
.try_get("max_concurrent")
.expect("state.max_concurrent"),
}))
}
async fn find_execution_entry(
tx: &mut Transaction<'_, Postgres>,
execution_id: Id,
) -> Result<Option<ExecutionEntry>> {
let row = sqlx::query(
r#"
SELECT
e.state_id,
s.action_id,
s.group_key,
e.execution_id,
e.status,
e.queue_order,
e.enqueued_at
FROM execution_admission_entry e
JOIN execution_admission_state s ON s.id = e.state_id
WHERE e.execution_id = $1
"#,
)
.bind(execution_id)
.fetch_optional(&mut **tx)
.await?;
Ok(row.map(|entry| ExecutionEntry {
state_id: entry.try_get("state_id").expect("entry.state_id"),
action_id: entry.try_get("action_id").expect("entry.action_id"),
group_key: entry.try_get("group_key").expect("entry.group_key"),
status: entry.try_get("status").expect("entry.status"),
queue_order: entry.try_get("queue_order").expect("entry.queue_order"),
enqueued_at: entry.try_get("enqueued_at").expect("entry.enqueued_at"),
}))
}
async fn find_execution_entry_for_update(
tx: &mut Transaction<'_, Postgres>,
execution_id: Id,
) -> Result<Option<ExecutionEntry>> {
let row = sqlx::query(
r#"
SELECT
e.state_id,
s.action_id,
s.group_key,
e.execution_id,
e.status,
e.queue_order,
e.enqueued_at
FROM execution_admission_entry e
JOIN execution_admission_state s ON s.id = e.state_id
WHERE e.execution_id = $1
FOR UPDATE OF e, s
"#,
)
.bind(execution_id)
.fetch_optional(&mut **tx)
.await?;
Ok(row.map(|entry| ExecutionEntry {
state_id: entry.try_get("state_id").expect("entry.state_id"),
action_id: entry.try_get("action_id").expect("entry.action_id"),
group_key: entry.try_get("group_key").expect("entry.group_key"),
status: entry.try_get("status").expect("entry.status"),
queue_order: entry.try_get("queue_order").expect("entry.queue_order"),
enqueued_at: entry.try_get("enqueued_at").expect("entry.enqueued_at"),
}))
}
async fn active_count(tx: &mut Transaction<'_, Postgres>, state_id: Id) -> Result<i64> {
Ok(sqlx::query_scalar::<Postgres, i64>(
r#"
SELECT COUNT(*)
FROM execution_admission_entry
WHERE state_id = $1
AND status = 'active'
"#,
)
.bind(state_id)
.fetch_one(&mut **tx)
.await?)
}
async fn queued_count(tx: &mut Transaction<'_, Postgres>, state_id: Id) -> Result<i64> {
Ok(sqlx::query_scalar::<Postgres, i64>(
r#"
SELECT COUNT(*)
FROM execution_admission_entry
WHERE state_id = $1
AND status = 'queued'
"#,
)
.bind(state_id)
.fetch_one(&mut **tx)
.await?)
}
async fn insert_entry(
tx: &mut Transaction<'_, Postgres>,
state_id: Id,
execution_id: Id,
status: &str,
queue_order: i64,
enqueued_at: DateTime<Utc>,
) -> Result<()> {
sqlx::query(
r#"
INSERT INTO execution_admission_entry (
state_id, execution_id, status, queue_order, enqueued_at, activated_at
) VALUES (
$1, $2, $3, $4, $5,
CASE WHEN $3 = 'active' THEN NOW() ELSE NULL END
)
"#,
)
.bind(state_id)
.bind(execution_id)
.bind(status)
.bind(queue_order)
.bind(enqueued_at)
.execute(&mut **tx)
.await?;
Ok(())
}
async fn allocate_queue_order(tx: &mut Transaction<'_, Postgres>, state_id: Id) -> Result<i64> {
let queue_order = sqlx::query_scalar::<Postgres, i64>(
r#"
UPDATE execution_admission_state
SET next_queue_order = next_queue_order + 1
WHERE id = $1
RETURNING next_queue_order - 1
"#,
)
.bind(state_id)
.fetch_one(&mut **tx)
.await?;
Ok(queue_order)
}
async fn increment_total_enqueued(
tx: &mut Transaction<'_, Postgres>,
state_id: Id,
) -> Result<()> {
sqlx::query(
r#"
UPDATE execution_admission_state
SET total_enqueued = total_enqueued + 1
WHERE id = $1
"#,
)
.bind(state_id)
.execute(&mut **tx)
.await?;
Ok(())
}
async fn increment_total_completed(
tx: &mut Transaction<'_, Postgres>,
state_id: Id,
) -> Result<()> {
sqlx::query(
r#"
UPDATE execution_admission_state
SET total_completed = total_completed + 1
WHERE id = $1
"#,
)
.bind(state_id)
.execute(&mut **tx)
.await?;
Ok(())
}
async fn refresh_queue_stats(tx: &mut Transaction<'_, Postgres>, action_id: Id) -> Result<()> {
let Some(stats) = Self::get_queue_stats_from_tx(tx, action_id).await? else {
QueueStatsRepository::delete(&mut **tx, action_id).await?;
return Ok(());
};
QueueStatsRepository::upsert(
&mut **tx,
UpsertQueueStatsInput {
action_id,
queue_length: stats.queue_length as i32,
active_count: stats.active_count as i32,
max_concurrent: stats.max_concurrent as i32,
oldest_enqueued_at: stats.oldest_enqueued_at,
total_enqueued: stats.total_enqueued as i64,
total_completed: stats.total_completed as i64,
},
)
.await?;
Ok(())
}
async fn get_queue_stats_from_tx(
tx: &mut Transaction<'_, Postgres>,
action_id: Id,
) -> Result<Option<AdmissionQueueStats>> {
let row = sqlx::query(
r#"
WITH state_rows AS (
SELECT
COUNT(*) AS state_count,
COALESCE(SUM(max_concurrent), 0) AS max_concurrent,
COALESCE(SUM(total_enqueued), 0) AS total_enqueued,
COALESCE(SUM(total_completed), 0) AS total_completed
FROM execution_admission_state
WHERE action_id = $1
),
entry_rows AS (
SELECT
COUNT(*) FILTER (WHERE e.status = 'queued') AS queue_length,
COUNT(*) FILTER (WHERE e.status = 'active') AS active_count,
MIN(e.enqueued_at) FILTER (WHERE e.status = 'queued') AS oldest_enqueued_at
FROM execution_admission_state s
LEFT JOIN execution_admission_entry e ON e.state_id = s.id
WHERE s.action_id = $1
)
SELECT
sr.state_count,
er.queue_length,
er.active_count,
sr.max_concurrent,
er.oldest_enqueued_at,
sr.total_enqueued,
sr.total_completed
FROM state_rows sr
CROSS JOIN entry_rows er
"#,
)
.bind(action_id)
.fetch_one(&mut **tx)
.await?;
let state_count: i64 = row.try_get("state_count")?;
if state_count == 0 {
return Ok(None);
}
Ok(Some(AdmissionQueueStats {
action_id,
queue_length: row.try_get::<i64, _>("queue_length")? as usize,
active_count: row.try_get::<i64, _>("active_count")? as u32,
max_concurrent: row.try_get::<i64, _>("max_concurrent")? as u32,
oldest_enqueued_at: row.try_get("oldest_enqueued_at")?,
total_enqueued: row.try_get::<i64, _>("total_enqueued")? as u64,
total_completed: row.try_get::<i64, _>("total_completed")? as u64,
}))
}
}

View File

@@ -33,6 +33,7 @@ pub mod artifact;
pub mod entity_history;
pub mod event;
pub mod execution;
pub mod execution_admission;
pub mod identity;
pub mod inquiry;
pub mod key;
@@ -53,6 +54,7 @@ pub use artifact::{ArtifactRepository, ArtifactVersionRepository};
pub use entity_history::EntityHistoryRepository;
pub use event::{EnforcementRepository, EventRepository};
pub use execution::ExecutionRepository;
pub use execution_admission::ExecutionAdmissionRepository;
pub use identity::{IdentityRepository, PermissionAssignmentRepository, PermissionSetRepository};
pub use inquiry::InquiryRepository;
pub use key::KeyRepository;

View File

@@ -3,7 +3,7 @@
//! Provides database operations for queue statistics persistence.
use chrono::{DateTime, Utc};
use sqlx::{PgPool, Postgres, QueryBuilder};
use sqlx::{Executor, PgPool, Postgres, QueryBuilder};
use crate::error::Result;
use crate::models::Id;
@@ -38,7 +38,10 @@ pub struct QueueStatsRepository;
impl QueueStatsRepository {
/// Upsert queue statistics (insert or update)
pub async fn upsert(pool: &PgPool, input: UpsertQueueStatsInput) -> Result<QueueStats> {
pub async fn upsert<'e, E>(executor: E, input: UpsertQueueStatsInput) -> Result<QueueStats>
where
E: Executor<'e, Database = Postgres> + 'e,
{
let stats = sqlx::query_as::<Postgres, QueueStats>(
r#"
INSERT INTO queue_stats (
@@ -69,14 +72,17 @@ impl QueueStatsRepository {
.bind(input.oldest_enqueued_at)
.bind(input.total_enqueued)
.bind(input.total_completed)
.fetch_one(pool)
.fetch_one(executor)
.await?;
Ok(stats)
}
/// Get queue statistics for a specific action
pub async fn find_by_action(pool: &PgPool, action_id: Id) -> Result<Option<QueueStats>> {
pub async fn find_by_action<'e, E>(executor: E, action_id: Id) -> Result<Option<QueueStats>>
where
E: Executor<'e, Database = Postgres> + 'e,
{
let stats = sqlx::query_as::<Postgres, QueueStats>(
r#"
SELECT
@@ -93,14 +99,17 @@ impl QueueStatsRepository {
"#,
)
.bind(action_id)
.fetch_optional(pool)
.fetch_optional(executor)
.await?;
Ok(stats)
}
/// List all queue statistics with active queues (queue_length > 0 or active_count > 0)
pub async fn list_active(pool: &PgPool) -> Result<Vec<QueueStats>> {
pub async fn list_active<'e, E>(executor: E) -> Result<Vec<QueueStats>>
where
E: Executor<'e, Database = Postgres> + 'e,
{
let stats = sqlx::query_as::<Postgres, QueueStats>(
r#"
SELECT
@@ -117,14 +126,17 @@ impl QueueStatsRepository {
ORDER BY last_updated DESC
"#,
)
.fetch_all(pool)
.fetch_all(executor)
.await?;
Ok(stats)
}
/// List all queue statistics
pub async fn list_all(pool: &PgPool) -> Result<Vec<QueueStats>> {
pub async fn list_all<'e, E>(executor: E) -> Result<Vec<QueueStats>>
where
E: Executor<'e, Database = Postgres> + 'e,
{
let stats = sqlx::query_as::<Postgres, QueueStats>(
r#"
SELECT
@@ -140,14 +152,17 @@ impl QueueStatsRepository {
ORDER BY last_updated DESC
"#,
)
.fetch_all(pool)
.fetch_all(executor)
.await?;
Ok(stats)
}
/// Delete queue statistics for a specific action
pub async fn delete(pool: &PgPool, action_id: Id) -> Result<bool> {
pub async fn delete<'e, E>(executor: E, action_id: Id) -> Result<bool>
where
E: Executor<'e, Database = Postgres> + 'e,
{
let result = sqlx::query(
r#"
DELETE FROM queue_stats
@@ -155,7 +170,7 @@ impl QueueStatsRepository {
"#,
)
.bind(action_id)
.execute(pool)
.execute(executor)
.await?;
Ok(result.rows_affected() > 0)
@@ -163,7 +178,7 @@ impl QueueStatsRepository {
/// Batch upsert multiple queue statistics
pub async fn batch_upsert(
pool: &PgPool,
executor: &PgPool,
inputs: Vec<UpsertQueueStatsInput>,
) -> Result<Vec<QueueStats>> {
if inputs.is_empty() {
@@ -213,14 +228,17 @@ impl QueueStatsRepository {
let stats = query_builder
.build_query_as::<QueueStats>()
.fetch_all(pool)
.fetch_all(executor)
.await?;
Ok(stats)
}
/// Clear stale statistics (older than specified duration)
pub async fn clear_stale(pool: &PgPool, older_than_seconds: i64) -> Result<u64> {
pub async fn clear_stale<'e, E>(executor: E, older_than_seconds: i64) -> Result<u64>
where
E: Executor<'e, Database = Postgres> + 'e,
{
let result = sqlx::query(
r#"
DELETE FROM queue_stats
@@ -230,7 +248,7 @@ impl QueueStatsRepository {
"#,
)
.bind(older_than_seconds)
.execute(pool)
.execute(executor)
.await?;
Ok(result.rows_affected())

View File

@@ -612,6 +612,26 @@ impl Delete for WorkflowExecutionRepository {
}
impl WorkflowExecutionRepository {
pub async fn find_by_id_for_update<'e, E>(
executor: E,
id: Id,
) -> Result<Option<WorkflowExecution>>
where
E: Executor<'e, Database = Postgres> + 'e,
{
sqlx::query_as::<_, WorkflowExecution>(
"SELECT id, execution, workflow_def, current_tasks, completed_tasks, failed_tasks, skipped_tasks,
variables, task_graph, status, error_message, paused, pause_reason, created, updated
FROM workflow_execution
WHERE id = $1
FOR UPDATE"
)
.bind(id)
.fetch_optional(executor)
.await
.map_err(Into::into)
}
pub async fn create_or_get_by_execution<'e, E>(
executor: E,
input: CreateWorkflowExecutionInput,

View File

@@ -157,7 +157,11 @@ impl CompletionListener {
"Failed to advance workflow for execution {}: {}",
execution_id, e
);
// Continue processing — don't fail the entire completion
if let Some(mq_err) = Self::retryable_mq_error(&e) {
return Err(mq_err.into());
}
// Non-retryable workflow advancement errors are logged but
// do not fail the entire completion processing path.
}
}

View File

@@ -14,7 +14,7 @@ use attune_common::{
error::Error,
models::ExecutionStatus,
mq::{Consumer, ConsumerConfig, MessageEnvelope, MessageType, MqResult},
repositories::{execution::UpdateExecutionInput, ExecutionRepository, FindById, Update},
repositories::{execution::UpdateExecutionInput, ExecutionRepository, FindById},
};
use chrono::Utc;
use serde_json::json;
@@ -179,13 +179,12 @@ async fn handle_execution_requested(
}
};
// Only fail if still in a non-terminal state
if !matches!(
execution.status,
ExecutionStatus::Scheduled | ExecutionStatus::Running
) {
// Only scheduled executions are still legitimately owned by the scheduler.
// If the execution already moved to running or a terminal state, this DLQ
// delivery is stale and must not overwrite newer state.
if execution.status != ExecutionStatus::Scheduled {
info!(
"Execution {} already in terminal state {:?}, skipping",
"Execution {} already left Scheduled state ({:?}), skipping stale DLQ handling",
execution_id, execution.status
);
return Ok(()); // Acknowledge to remove from queue
@@ -193,6 +192,12 @@ async fn handle_execution_requested(
// Get worker info from payload for better error message
let worker_id = envelope.payload.get("worker_id").and_then(|v| v.as_i64());
let scheduled_attempt_updated_at = envelope
.payload
.get("scheduled_attempt_updated_at")
.and_then(|v| v.as_str())
.and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok())
.map(|dt| dt.with_timezone(&Utc));
let error_message = if let Some(wid) = worker_id {
format!(
@@ -214,26 +219,87 @@ async fn handle_execution_requested(
..Default::default()
};
match ExecutionRepository::update(pool, execution_id, update_input).await {
Ok(_) => {
if let Some(timestamp) = scheduled_attempt_updated_at {
// Guard on both status and the exact updated_at from when the execution was
// scheduled — prevents overwriting state that changed after this DLQ message
// was enqueued.
match ExecutionRepository::update_if_status_and_updated_at(
pool,
execution_id,
ExecutionStatus::Scheduled,
timestamp,
update_input,
)
.await
{
Ok(Some(_)) => {
info!(
"Successfully failed execution {} due to worker queue expiration",
execution_id
);
Ok(())
}
Ok(None) => {
info!(
"Skipping DLQ failure for execution {} because it already left Scheduled state",
execution_id
);
Ok(())
}
Err(e) => {
error!(
"Failed to update execution {} to failed state: {}",
execution_id, e
);
// Return error to nack and potentially retry
Err(attune_common::mq::MqError::Consume(format!(
"Failed to update execution: {}",
e
)))
}
}
} else {
// Fallback for DLQ messages that predate the scheduled_attempt_updated_at
// field. Use a status-only guard — same safety guarantee as the original code
// (never overwrites terminal or running state).
warn!(
"DLQ message for execution {} lacks scheduled_attempt_updated_at; \
falling back to status-only guard",
execution_id
);
match ExecutionRepository::update_if_status(
pool,
execution_id,
ExecutionStatus::Scheduled,
update_input,
)
.await
{
Ok(Some(_)) => {
info!(
"Successfully failed execution {} due to worker queue expiration (status-only guard)",
execution_id
);
Ok(())
}
Ok(None) => {
info!(
"Skipping DLQ failure for execution {} because it already left Scheduled state",
execution_id
);
Ok(())
}
Err(e) => {
error!(
"Failed to update execution {} to failed state: {}",
execution_id, e
);
Err(attune_common::mq::MqError::Consume(format!(
"Failed to update execution: {}",
e
)))
}
}
}
}
/// Create a dead letter consumer configuration

View File

@@ -19,7 +19,7 @@ use attune_common::{
event::{EnforcementRepository, EventRepository, UpdateEnforcementInput},
execution::{CreateExecutionInput, ExecutionRepository},
rule::RuleRepository,
Create, FindById,
FindById,
},
};
@@ -116,6 +116,14 @@ impl EnforcementProcessor {
.await?
.ok_or_else(|| anyhow::anyhow!("Enforcement not found: {}", enforcement_id))?;
if enforcement.status != EnforcementStatus::Created {
debug!(
"Enforcement {} already left Created state ({:?}), skipping duplicate processing",
enforcement_id, enforcement.status
);
return Ok(());
}
// Fetch associated rule
let rule = RuleRepository::find_by_id(
pool,
@@ -135,7 +143,7 @@ impl EnforcementProcessor {
// Evaluate whether to create execution
if Self::should_create_execution(&enforcement, &rule, event.as_ref())? {
Self::create_execution(
let execution_created = Self::create_execution(
pool,
publisher,
policy_enforcer,
@@ -145,10 +153,10 @@ impl EnforcementProcessor {
)
.await?;
// Update enforcement status to Processed after successful execution creation
EnforcementRepository::update_loaded(
let updated = EnforcementRepository::update_loaded_if_status(
pool,
&enforcement,
EnforcementStatus::Created,
UpdateEnforcementInput {
status: Some(EnforcementStatus::Processed),
payload: None,
@@ -157,17 +165,27 @@ impl EnforcementProcessor {
)
.await?;
debug!("Updated enforcement {} status to Processed", enforcement_id);
if updated.is_some() {
debug!(
"Updated enforcement {} status to Processed after {} execution path",
enforcement_id,
if execution_created {
"new"
} else {
"idempotent"
}
);
}
} else {
info!(
"Skipping execution creation for enforcement: {}",
enforcement_id
);
// Update enforcement status to Disabled since it was not actionable
EnforcementRepository::update_loaded(
let updated = EnforcementRepository::update_loaded_if_status(
pool,
&enforcement,
EnforcementStatus::Created,
UpdateEnforcementInput {
status: Some(EnforcementStatus::Disabled),
payload: None,
@@ -176,11 +194,13 @@ impl EnforcementProcessor {
)
.await?;
if updated.is_some() {
debug!(
"Updated enforcement {} status to Disabled (skipped)",
enforcement_id
);
}
}
Ok(())
}
@@ -234,7 +254,7 @@ impl EnforcementProcessor {
_queue_manager: &ExecutionQueueManager,
enforcement: &Enforcement,
rule: &Rule,
) -> Result<()> {
) -> Result<bool> {
// Extract action ID — should_create_execution already verified it's Some,
// but guard defensively here as well.
let action_id = match rule.action {
@@ -275,21 +295,36 @@ impl EnforcementProcessor {
workflow_task: None, // Non-workflow execution
};
let execution = ExecutionRepository::create(pool, execution_input).await?;
let execution_result = ExecutionRepository::create_top_level_for_enforcement_if_absent(
pool,
execution_input,
enforcement.id,
)
.await?;
let execution = execution_result.execution;
if execution_result.created {
info!(
"Created execution: {} for enforcement: {}",
execution.id, enforcement.id
);
} else {
info!(
"Reusing execution: {} for enforcement: {}",
execution.id, enforcement.id
);
}
// Publish ExecutionRequested message
if execution_result.created
|| execution.status == attune_common::models::enums::ExecutionStatus::Requested
{
let payload = ExecutionRequestedPayload {
execution_id: execution.id,
action_id: Some(action_id),
action_ref: action_ref.clone(),
parent_id: None,
enforcement_id: Some(enforcement.id),
config: enforcement.config.clone(),
config: execution.config.clone(),
};
let envelope =
@@ -308,11 +343,12 @@ impl EnforcementProcessor {
"Published execution.requested message for execution: {} (enforcement: {}, action: {})",
execution.id, enforcement.id, action_id
);
}
// NOTE: Queue slot will be released when worker publishes execution.completed
// and CompletionListener calls queue_manager.notify_completion(action_id)
Ok(())
Ok(execution_result.created)
}
}

View File

@@ -19,7 +19,7 @@ use attune_common::{
event::{CreateEnforcementInput, EnforcementRepository, EventRepository},
pack::PackRepository,
rule::RuleRepository,
Create, FindById, List,
FindById, List,
},
template_resolver::{resolve_templates, TemplateContext},
};
@@ -206,14 +206,23 @@ impl EventProcessor {
conditions: rule.conditions.clone(),
};
let enforcement = EnforcementRepository::create(pool, create_input).await?;
let enforcement_result =
EnforcementRepository::create_or_get_by_rule_event(pool, create_input).await?;
let enforcement = enforcement_result.enforcement;
if enforcement_result.created {
info!(
"Enforcement {} created for rule {} (event: {})",
enforcement.id, rule.r#ref, event.id
);
} else {
info!(
"Reusing enforcement {} for rule {} (event: {})",
enforcement.id, rule.r#ref, event.id
);
}
// Publish EnforcementCreated message
if enforcement_result.created || enforcement.status == EnforcementStatus::Created {
let enforcement_payload = EnforcementCreatedPayload {
enforcement_id: enforcement.id,
rule_id: Some(rule.id),
@@ -223,7 +232,8 @@ impl EventProcessor {
payload: payload.clone(),
};
let envelope = MessageEnvelope::new(MessageType::EnforcementCreated, enforcement_payload)
let envelope =
MessageEnvelope::new(MessageType::EnforcementCreated, enforcement_payload)
.with_source("event-processor");
publisher.publish_envelope(&envelope).await?;
@@ -232,6 +242,7 @@ impl EventProcessor {
"Published EnforcementCreated message for enforcement {}",
enforcement.id
);
}
Ok(())
}

View File

@@ -9,13 +9,14 @@
use anyhow::Result;
use attune_common::{
error::Error as AttuneError,
models::{enums::InquiryStatus, inquiry::Inquiry, Execution, Id},
mq::{
Consumer, InquiryCreatedPayload, InquiryRespondedPayload, MessageEnvelope, MessageType,
Publisher,
},
repositories::{
execution::{ExecutionRepository, UpdateExecutionInput},
execution::{ExecutionRepository, UpdateExecutionInput, SELECT_COLUMNS},
inquiry::{CreateInquiryInput, InquiryRepository},
Create, FindById, Update,
},
@@ -28,6 +29,8 @@ use tracing::{debug, error, info, warn};
/// Special key in action result to indicate an inquiry should be created
pub const INQUIRY_RESULT_KEY: &str = "__inquiry";
const INQUIRY_ID_RESULT_KEY: &str = "__inquiry_id";
const INQUIRY_CREATED_PUBLISHED_RESULT_KEY: &str = "__inquiry_created_published";
/// Structure for inquiry data in action results
#[derive(Debug, Clone, serde::Deserialize)]
@@ -104,26 +107,71 @@ impl InquiryHandler {
let inquiry_request: InquiryRequest = serde_json::from_value(inquiry_value.clone())?;
Ok(inquiry_request)
}
}
/// Returns true when `e` represents a PostgreSQL unique constraint violation (code 23505).
fn is_db_unique_violation(e: &AttuneError) -> bool {
if let AttuneError::Database(sqlx_err) = e {
return sqlx_err
.as_database_error()
.and_then(|db| db.code())
.as_deref()
== Some("23505");
}
false
}
impl InquiryHandler {
/// Create an inquiry for an execution and pause it
pub async fn create_inquiry_from_result(
pool: &PgPool,
publisher: &Publisher,
execution_id: Id,
result: &JsonValue,
_result: &JsonValue,
) -> Result<Inquiry> {
info!("Creating inquiry for execution {}", execution_id);
// Extract inquiry request
let inquiry_request = Self::extract_inquiry_request(result)?;
let mut tx = pool.begin().await?;
let execution = sqlx::query_as::<_, Execution>(&format!(
"SELECT {SELECT_COLUMNS} FROM execution WHERE id = $1 FOR UPDATE"
))
.bind(execution_id)
.fetch_one(&mut *tx)
.await?;
// Calculate timeout if specified
let mut result = execution
.result
.clone()
.ok_or_else(|| anyhow::anyhow!("Execution {} has no result", execution_id))?;
let inquiry_request = Self::extract_inquiry_request(&result)?;
let timeout_at = inquiry_request
.timeout_seconds
.map(|seconds| Utc::now() + chrono::Duration::seconds(seconds));
// Create inquiry in database
let inquiry_input = CreateInquiryInput {
let existing_inquiry_id = result
.get(INQUIRY_ID_RESULT_KEY)
.and_then(|value| value.as_i64());
let published = result
.get(INQUIRY_CREATED_PUBLISHED_RESULT_KEY)
.and_then(|value| value.as_bool())
.unwrap_or(false);
let (inquiry, should_publish) = if let Some(inquiry_id) = existing_inquiry_id {
let inquiry = InquiryRepository::find_by_id(&mut *tx, inquiry_id)
.await?
.ok_or_else(|| {
anyhow::anyhow!(
"Inquiry {} referenced by execution {} result not found",
inquiry_id,
execution_id
)
})?;
let should_publish = !published && inquiry.status == InquiryStatus::Pending;
(inquiry, should_publish)
} else {
let create_result = InquiryRepository::create(
&mut *tx,
CreateInquiryInput {
execution: execution_id,
prompt: inquiry_request.prompt.clone(),
response_schema: inquiry_request.response_schema.clone(),
@@ -131,20 +179,55 @@ impl InquiryHandler {
status: InquiryStatus::Pending,
response: None,
timeout_at,
},
)
.await;
let inquiry = match create_result {
Ok(inq) => inq,
Err(e) => {
// Unique constraint violation (23505): another replica already
// created the inquiry for this execution. Treat as idempotent
// success — drop the aborted transaction and return the existing row.
if is_db_unique_violation(&e) {
info!(
"Inquiry for execution {} already created by another replica \
(unique constraint 23505); treating as idempotent",
execution_id
);
// tx is in an aborted state; dropping it issues ROLLBACK.
drop(tx);
let inquiries =
InquiryRepository::find_by_execution(pool, execution_id).await?;
let existing = inquiries.into_iter().next().ok_or_else(|| {
anyhow::anyhow!(
"Inquiry for execution {} not found after unique constraint violation",
execution_id
)
})?;
return Ok(existing);
}
return Err(e.into());
}
};
let inquiry = InquiryRepository::create(pool, inquiry_input).await?;
Self::set_inquiry_result_metadata(&mut result, inquiry.id, false)?;
ExecutionRepository::update(
&mut *tx,
execution_id,
UpdateExecutionInput {
result: Some(result),
..Default::default()
},
)
.await?;
info!(
"Created inquiry {} for execution {}",
inquiry.id, execution_id
);
(inquiry, true)
};
// Update execution status to paused/waiting
// Note: We use a special status or keep it as "running" with inquiry tracking
// For now, we'll keep status as-is and track via inquiry relationship
tx.commit().await?;
// Publish InquiryCreated message
if should_publish {
let payload = InquiryCreatedPayload {
inquiry_id: inquiry.id,
execution_id,
@@ -158,15 +241,64 @@ impl InquiryHandler {
MessageEnvelope::new(MessageType::InquiryCreated, payload).with_source("executor");
publisher.publish_envelope(&envelope).await?;
Self::mark_inquiry_created_published(pool, execution_id).await?;
debug!(
"Published InquiryCreated message for inquiry {}",
inquiry.id
);
}
Ok(inquiry)
}
fn set_inquiry_result_metadata(
result: &mut JsonValue,
inquiry_id: Id,
published: bool,
) -> Result<()> {
let obj = result
.as_object_mut()
.ok_or_else(|| anyhow::anyhow!("execution result is not a JSON object"))?;
obj.insert(
INQUIRY_ID_RESULT_KEY.to_string(),
JsonValue::Number(inquiry_id.into()),
);
obj.insert(
INQUIRY_CREATED_PUBLISHED_RESULT_KEY.to_string(),
JsonValue::Bool(published),
);
Ok(())
}
async fn mark_inquiry_created_published(pool: &PgPool, execution_id: Id) -> Result<()> {
let execution = ExecutionRepository::find_by_id(pool, execution_id)
.await?
.ok_or_else(|| anyhow::anyhow!("Execution {} not found", execution_id))?;
let mut result = execution
.result
.clone()
.ok_or_else(|| anyhow::anyhow!("Execution {} has no result", execution_id))?;
let inquiry_id = result
.get(INQUIRY_ID_RESULT_KEY)
.and_then(|value| value.as_i64())
.ok_or_else(|| anyhow::anyhow!("Execution {} missing __inquiry_id", execution_id))?;
Self::set_inquiry_result_metadata(&mut result, inquiry_id, true)?;
ExecutionRepository::update(
pool,
execution_id,
UpdateExecutionInput {
result: Some(result),
..Default::default()
},
)
.await?;
Ok(())
}
/// Handle an inquiry response message
async fn handle_inquiry_response(
pool: &PgPool,
@@ -235,9 +367,13 @@ impl InquiryHandler {
if let Some(obj) = updated_result.as_object_mut() {
obj.insert("__inquiry_response".to_string(), response.clone());
obj.insert(
"__inquiry_id".to_string(),
INQUIRY_ID_RESULT_KEY.to_string(),
JsonValue::Number(inquiry.id.into()),
);
obj.insert(
INQUIRY_CREATED_PUBLISHED_RESULT_KEY.to_string(),
JsonValue::Bool(true),
);
}
// Update execution with new result

View File

@@ -933,8 +933,8 @@ mod tests {
assert_eq!(enforcer.get_concurrency_limit(2, Some(200)), Some(20));
}
#[test]
fn test_build_parameter_group_key_uses_exact_values() {
#[tokio::test]
async fn test_build_parameter_group_key_uses_exact_values() {
let pool = sqlx::PgPool::connect_lazy("postgresql://localhost/test").unwrap();
let enforcer = PolicyEnforcer::new(pool);
let config = serde_json::json!({

View File

@@ -23,7 +23,13 @@ use tokio::time::{sleep, Duration, Instant};
use tracing::{debug, info, warn};
use attune_common::models::Id;
use attune_common::repositories::queue_stats::{QueueStatsRepository, UpsertQueueStatsInput};
use attune_common::repositories::{
execution_admission::{
AdmissionEnqueueOutcome, AdmissionQueueStats, AdmissionQueuedRemovalOutcome,
AdmissionSlotReleaseOutcome, ExecutionAdmissionRepository,
},
queue_stats::{QueueStatsRepository, UpsertQueueStatsInput},
};
/// Configuration for the queue manager
#[derive(Debug, Clone, Serialize, Deserialize)]
@@ -51,6 +57,8 @@ impl Default for QueueConfig {
struct QueueEntry {
/// Execution or enforcement ID being queued
execution_id: Id,
/// Durable FIFO position for the DB-backed admission path.
queue_order: Option<i64>,
/// When this entry was added to the queue
enqueued_at: DateTime<Utc>,
}
@@ -224,6 +232,12 @@ impl ExecutionQueueManager {
max_concurrent: u32,
group_key: Option<String>,
) -> Result<()> {
if self.db_pool.is_some() {
return self
.enqueue_and_wait_db(action_id, execution_id, max_concurrent, group_key)
.await;
}
if self.active_execution_keys.contains_key(&execution_id) {
debug!(
"Execution {} already owns an active slot, skipping queue wait",
@@ -311,6 +325,7 @@ impl ExecutionQueueManager {
// Add to queue
let entry = QueueEntry {
execution_id,
queue_order: None,
enqueued_at: Utc::now(),
};
@@ -392,6 +407,24 @@ impl ExecutionQueueManager {
max_concurrent: u32,
group_key: Option<String>,
) -> Result<SlotEnqueueOutcome> {
if let Some(pool) = &self.db_pool {
return Ok(
match ExecutionAdmissionRepository::enqueue(
pool,
self.config.max_queue_length,
action_id,
execution_id,
max_concurrent,
group_key,
)
.await?
{
AdmissionEnqueueOutcome::Acquired => SlotEnqueueOutcome::Acquired,
AdmissionEnqueueOutcome::Enqueued => SlotEnqueueOutcome::Enqueued,
},
);
}
if self.active_execution_keys.contains_key(&execution_id) {
debug!(
"Execution {} already owns an active slot, treating as acquired",
@@ -463,6 +496,7 @@ impl ExecutionQueueManager {
queue.queue.push_back(QueueEntry {
execution_id,
queue_order: None,
enqueued_at: Utc::now(),
});
queue.total_enqueued += 1;
@@ -480,6 +514,21 @@ impl ExecutionQueueManager {
max_concurrent: u32,
group_key: Option<String>,
) -> Result<SlotAcquireOutcome> {
if let Some(pool) = &self.db_pool {
let outcome = ExecutionAdmissionRepository::try_acquire(
pool,
action_id,
execution_id,
max_concurrent,
group_key,
)
.await?;
return Ok(SlotAcquireOutcome {
acquired: outcome.acquired,
current_count: outcome.current_count,
});
}
let queue_key = self.queue_key(action_id, group_key);
let queue_arc = self
.get_or_create_queue(queue_key.clone(), max_concurrent)
@@ -530,6 +579,14 @@ impl ExecutionQueueManager {
&self,
execution_id: Id,
) -> Result<Option<SlotReleaseOutcome>> {
if let Some(pool) = &self.db_pool {
return Ok(
ExecutionAdmissionRepository::release_active_slot(pool, execution_id)
.await?
.map(Self::map_release_outcome),
);
}
let Some((_, queue_key)) = self.active_execution_keys.remove(&execution_id) else {
debug!(
"No active queue slot found for execution {} (queue may have been cleared)",
@@ -610,6 +667,16 @@ impl ExecutionQueueManager {
execution_id: Id,
outcome: &SlotReleaseOutcome,
) -> Result<()> {
if let Some(pool) = &self.db_pool {
ExecutionAdmissionRepository::restore_active_slot(
pool,
execution_id,
&Self::to_admission_release_outcome(outcome),
)
.await?;
return Ok(());
}
let action_id = outcome.queue_key.action_id;
let queue_arc = self.get_or_create_queue(outcome.queue_key.clone(), 1).await;
let mut queue = queue_arc.lock().await;
@@ -630,6 +697,14 @@ impl ExecutionQueueManager {
&self,
execution_id: Id,
) -> Result<Option<QueuedRemovalOutcome>> {
if let Some(pool) = &self.db_pool {
return Ok(
ExecutionAdmissionRepository::remove_queued_execution(pool, execution_id)
.await?
.map(Self::map_removal_outcome),
);
}
for entry in self.queues.iter() {
let queue_key = entry.key().clone();
let queue_arc = entry.value().clone();
@@ -666,6 +741,15 @@ impl ExecutionQueueManager {
}
pub async fn restore_queued_execution(&self, outcome: &QueuedRemovalOutcome) -> Result<()> {
if let Some(pool) = &self.db_pool {
ExecutionAdmissionRepository::restore_queued_execution(
pool,
&Self::to_admission_removal_outcome(outcome),
)
.await?;
return Ok(());
}
let action_id = outcome.queue_key.action_id;
let queue_arc = self.get_or_create_queue(outcome.queue_key.clone(), 1).await;
let mut queue = queue_arc.lock().await;
@@ -709,6 +793,19 @@ impl ExecutionQueueManager {
/// Get statistics for a specific action's queue
pub async fn get_queue_stats(&self, action_id: Id) -> Option<QueueStats> {
if let Some(pool) = &self.db_pool {
return ExecutionAdmissionRepository::get_queue_stats(pool, action_id)
.await
.map(|stats| stats.map(Self::map_queue_stats))
.unwrap_or_else(|err| {
warn!(
"Failed to load shared queue stats for action {}: {}",
action_id, err
);
None
});
}
let queue_arcs: Vec<Arc<Mutex<ActionQueue>>> = self
.queues
.iter()
@@ -757,6 +854,26 @@ impl ExecutionQueueManager {
/// Get statistics for all queues
#[allow(dead_code)]
pub async fn get_all_queue_stats(&self) -> Vec<QueueStats> {
if let Some(pool) = &self.db_pool {
return QueueStatsRepository::list_all(pool)
.await
.map(|stats| {
stats
.into_iter()
.map(|stat| QueueStats {
action_id: stat.action_id,
queue_length: stat.queue_length as usize,
active_count: stat.active_count as u32,
max_concurrent: stat.max_concurrent as u32,
oldest_enqueued_at: stat.oldest_enqueued_at,
total_enqueued: stat.total_enqueued as u64,
total_completed: stat.total_completed as u64,
})
.collect()
})
.unwrap_or_default();
}
let mut stats = Vec::new();
let mut action_ids = std::collections::BTreeSet::new();
@@ -787,6 +904,14 @@ impl ExecutionQueueManager {
/// * `Ok(false)` - Execution not found in queue
#[allow(dead_code)]
pub async fn cancel_execution(&self, action_id: Id, execution_id: Id) -> Result<bool> {
if let Some(pool) = &self.db_pool {
return Ok(
ExecutionAdmissionRepository::remove_queued_execution(pool, execution_id)
.await?
.is_some(),
);
}
debug!(
"Attempting to cancel execution {} for action {}",
execution_id, action_id
@@ -838,12 +963,147 @@ impl ExecutionQueueManager {
/// Get the number of actions with active queues
#[allow(dead_code)]
pub fn active_queue_count(&self) -> usize {
    // When the shared DB-backed admission path is in use there are no
    // in-memory queues to count.
    if self.db_pool.is_some() {
        return 0;
    }
    // Count distinct actions: several queue keys (one per group_key) can
    // belong to the same action, so deduplicate by action id.
    let mut distinct_actions = std::collections::BTreeSet::new();
    for entry in self.queues.iter() {
        distinct_actions.insert(entry.key().action_id);
    }
    distinct_actions.len()
}
/// Enqueue an execution through the shared DB-backed admission path and
/// block until it acquires a concurrency slot or times out.
///
/// Returns `Ok(())` once the execution owns a slot. Returns an error when
/// the queue entry disappears while waiting, or when the configured
/// `queue_timeout_seconds` deadline elapses without acquisition.
async fn enqueue_and_wait_db(
    &self,
    action_id: Id,
    execution_id: Id,
    max_concurrent: u32,
    group_key: Option<String>,
) -> Result<()> {
    // This path is only valid when the manager was constructed with a DB
    // pool; surface a missing pool as an explicit error rather than a panic.
    let pool = self
        .db_pool
        .as_ref()
        .ok_or_else(|| anyhow::anyhow!("database pool required for shared admission"))?;
    match ExecutionAdmissionRepository::enqueue(
        pool,
        self.config.max_queue_length,
        action_id,
        execution_id,
        max_concurrent,
        group_key.clone(),
    )
    .await?
    {
        // A slot was free immediately: no waiting required.
        AdmissionEnqueueOutcome::Acquired => return Ok(()),
        // Entry was persisted at the queue tail; fall through to polling.
        AdmissionEnqueueOutcome::Enqueued => {}
    }
    let deadline = Instant::now() + Duration::from_secs(self.config.queue_timeout_seconds);
    loop {
        // Poll the durable wait status every 10ms; promotion to an active
        // slot happens in the database when another execution releases.
        sleep(Duration::from_millis(10)).await;
        match ExecutionAdmissionRepository::wait_status(pool, execution_id).await? {
            Some(true) => return Ok(()),
            Some(false) => {}
            None => {
                // Our queue row vanished (e.g. cancelled or cleaned up by
                // another replica) — we can never be promoted, so fail now.
                return Err(anyhow::anyhow!(
                    "Queue state for execution {} disappeared while waiting",
                    execution_id
                ));
            }
        }
        if Instant::now() < deadline {
            continue;
        }
        // Deadline passed: attempt to withdraw our queued entry. A
        // successful removal proves we never acquired a slot, so this is a
        // genuine timeout.
        match ExecutionAdmissionRepository::remove_queued_execution(pool, execution_id).await? {
            Some(_) => {
                return Err(anyhow::anyhow!(
                    "Queue timeout for execution {}: waited {} seconds",
                    execution_id,
                    self.config.queue_timeout_seconds
                ));
            }
            None => {
                // Nothing to remove: we may have been promoted between the
                // last status poll and the removal attempt — re-check the
                // wait status before declaring a timeout.
                if matches!(
                    ExecutionAdmissionRepository::wait_status(pool, execution_id).await?,
                    Some(true)
                ) {
                    return Ok(());
                }
                return Err(anyhow::anyhow!(
                    "Queue timeout for execution {}: waited {} seconds",
                    execution_id,
                    self.config.queue_timeout_seconds
                ));
            }
        }
    }
}
/// Convert a repository-level slot release outcome into the queue
/// manager's public `SlotReleaseOutcome` representation.
fn map_release_outcome(outcome: AdmissionSlotReleaseOutcome) -> SlotReleaseOutcome {
    let queue_key = QueueKey {
        action_id: outcome.action_id,
        group_key: outcome.group_key,
    };
    SlotReleaseOutcome {
        next_execution_id: outcome.next_execution_id,
        queue_key,
    }
}
/// Convert the manager's `SlotReleaseOutcome` back into the repository's
/// admission-layer representation (used for rollback/restore calls).
fn to_admission_release_outcome(outcome: &SlotReleaseOutcome) -> AdmissionSlotReleaseOutcome {
    let key = &outcome.queue_key;
    AdmissionSlotReleaseOutcome {
        action_id: key.action_id,
        group_key: key.group_key.clone(),
        next_execution_id: outcome.next_execution_id,
    }
}
/// Translate a repository queued-removal outcome into the manager's
/// `QueuedRemovalOutcome`, reconstructing the removed queue entry.
fn map_removal_outcome(outcome: AdmissionQueuedRemovalOutcome) -> QueuedRemovalOutcome {
    let removed_entry = QueueEntry {
        execution_id: outcome.execution_id,
        queue_order: Some(outcome.queue_order),
        enqueued_at: outcome.enqueued_at,
    };
    QueuedRemovalOutcome {
        next_execution_id: outcome.next_execution_id,
        queue_key: QueueKey {
            action_id: outcome.action_id,
            group_key: outcome.group_key,
        },
        removed_entry,
        removed_index: outcome.removed_index,
    }
}
/// Convert the manager's `QueuedRemovalOutcome` back into the repository's
/// admission-layer representation (used to restore a removed queue entry).
fn to_admission_removal_outcome(
    outcome: &QueuedRemovalOutcome,
) -> AdmissionQueuedRemovalOutcome {
    let entry = &outcome.removed_entry;
    AdmissionQueuedRemovalOutcome {
        action_id: outcome.queue_key.action_id,
        group_key: outcome.queue_key.group_key.clone(),
        next_execution_id: outcome.next_execution_id,
        execution_id: entry.execution_id,
        // In-memory entries may lack a durable order; default matches the
        // original `unwrap_or_default` behavior.
        queue_order: entry.queue_order.unwrap_or_default(),
        enqueued_at: entry.enqueued_at,
        removed_index: outcome.removed_index,
    }
}
fn map_queue_stats(stats: AdmissionQueueStats) -> QueueStats {
QueueStats {
action_id: stats.action_id,
queue_length: stats.queue_length,
active_count: stats.active_count,
max_concurrent: stats.max_concurrent,
oldest_enqueued_at: stats.oldest_enqueued_at,
total_enqueued: stats.total_enqueued,
total_completed: stats.total_completed,
}
}
}
#[cfg(test)]

View File

@@ -25,12 +25,12 @@ use attune_common::{
workflow::{
CreateWorkflowExecutionInput, WorkflowDefinitionRepository, WorkflowExecutionRepository,
},
Create, FindById, FindByRef, Update,
FindById, FindByRef, Update,
},
runtime_detection::runtime_aliases_contain,
workflow::WorkflowDefinition,
};
use chrono::Utc;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use serde_json::Value as JsonValue;
use sqlx::{PgConnection, PgPool};
@@ -102,6 +102,17 @@ struct ExecutionScheduledPayload {
worker_id: i64,
action_ref: String,
config: Option<JsonValue>,
scheduled_attempt_updated_at: DateTime<Utc>,
}
/// A child `ExecutionRequested` message staged during serialized workflow
/// advancement and published only after the enclosing DB transaction
/// commits (see the commit/publish sequence in the advancement path).
#[derive(Debug, Clone)]
struct PendingExecutionRequested {
    // Id of the child execution the message refers to
    execution_id: i64,
    // Id of the action the child execution runs
    action_id: i64,
    // Action reference string carried in the message payload
    action_ref: String,
    // Id of the parent (workflow) execution
    parent_id: i64,
    // Enforcement id copied from the parent execution, if any
    enforcement_id: Option<i64>,
    // Config copied from the child execution row at staging time
    config: Option<JsonValue>,
}
/// Execution scheduler that routes executions to workers
@@ -509,6 +520,7 @@ impl ExecutionScheduler {
&worker.id,
&envelope.payload.action_ref,
&execution_config,
scheduled_execution.updated,
&action,
)
.await
@@ -1021,13 +1033,13 @@ impl ExecutionScheduler {
#[allow(clippy::too_many_arguments)]
async fn dispatch_workflow_task_with_conn(
conn: &mut PgConnection,
publisher: &Publisher,
_round_robin_counter: &AtomicUsize,
parent_execution: &Execution,
workflow_execution_id: &i64,
task_node: &crate::workflow::graph::TaskNode,
wf_ctx: &WorkflowContext,
triggered_by: Option<&str>,
pending_messages: &mut Vec<PendingExecutionRequested>,
) -> Result<()> {
let action_ref: String = match &task_node.action {
Some(a) => a.clone(),
@@ -1059,7 +1071,6 @@ impl ExecutionScheduler {
if let Some(ref with_items_expr) = task_node.with_items {
return Self::dispatch_with_items_task_with_conn(
conn,
publisher,
parent_execution,
workflow_execution_id,
task_node,
@@ -1068,6 +1079,7 @@ impl ExecutionScheduler {
with_items_expr,
wf_ctx,
triggered_by,
pending_messages,
)
.await;
}
@@ -1115,17 +1127,7 @@ impl ExecutionScheduler {
completed_at: None,
};
let child_execution = if let Some(existing) = ExecutionRepository::find_by_workflow_task(
&mut *conn,
*workflow_execution_id,
&task_node.name,
None,
)
.await?
{
existing
} else {
ExecutionRepository::create(
let child_execution_result = ExecutionRepository::create_workflow_task_if_absent_with_conn(
&mut *conn,
CreateExecutionInput {
action: Some(task_action.id),
@@ -1140,11 +1142,14 @@ impl ExecutionScheduler {
result: None,
workflow_task: Some(workflow_task),
},
*workflow_execution_id,
&task_node.name,
None,
)
.await?
};
.await?;
let child_execution = child_execution_result.execution;
if child_execution.status == ExecutionStatus::Requested {
if child_execution_result.created {
info!(
"Created child execution {} for workflow task '{}' (action '{}', workflow_execution {})",
child_execution.id, task_node.name, action_ref, workflow_execution_id
@@ -1157,24 +1162,14 @@ impl ExecutionScheduler {
}
if child_execution.status == ExecutionStatus::Requested {
let payload = ExecutionRequestedPayload {
pending_messages.push(PendingExecutionRequested {
execution_id: child_execution.id,
action_id: Some(task_action.id),
action_id: task_action.id,
action_ref: action_ref.clone(),
parent_id: Some(parent_execution.id),
parent_id: parent_execution.id,
enforcement_id: parent_execution.enforcement,
config: child_execution.config.clone(),
};
let envelope = MessageEnvelope::new(MessageType::ExecutionRequested, payload)
.with_source("executor-scheduler");
publisher.publish_envelope(&envelope).await?;
info!(
"Published ExecutionRequested for child execution {} (task '{}')",
child_execution.id, task_node.name
);
});
}
Ok(())
@@ -1392,7 +1387,6 @@ impl ExecutionScheduler {
#[allow(clippy::too_many_arguments)]
async fn dispatch_with_items_task_with_conn(
conn: &mut PgConnection,
publisher: &Publisher,
parent_execution: &Execution,
workflow_execution_id: &i64,
task_node: &crate::workflow::graph::TaskNode,
@@ -1401,6 +1395,7 @@ impl ExecutionScheduler {
with_items_expr: &str,
wf_ctx: &WorkflowContext,
triggered_by: Option<&str>,
pending_messages: &mut Vec<PendingExecutionRequested>,
) -> Result<()> {
let items_value = wf_ctx
.render_json(&JsonValue::String(with_items_expr.to_string()))
@@ -1511,18 +1506,8 @@ impl ExecutionScheduler {
completed_at: None,
};
let child_execution = if let Some(existing) =
ExecutionRepository::find_by_workflow_task(
&mut *conn,
*workflow_execution_id,
&task_node.name,
Some(index as i32),
)
.await?
{
existing
} else {
ExecutionRepository::create(
let child_execution_result =
ExecutionRepository::create_workflow_task_if_absent_with_conn(
&mut *conn,
CreateExecutionInput {
action: Some(task_action.id),
@@ -1537,11 +1522,14 @@ impl ExecutionScheduler {
result: None,
workflow_task: Some(workflow_task),
},
*workflow_execution_id,
&task_node.name,
Some(index as i32),
)
.await?
};
.await?;
let child_execution = child_execution_result.execution;
if child_execution.status == ExecutionStatus::Requested {
if child_execution_result.created {
info!(
"Created with_items child execution {} for task '{}' item {} \
(action '{}', workflow_execution {})",
@@ -1566,11 +1554,11 @@ impl ExecutionScheduler {
if child.status == ExecutionStatus::Requested {
Self::publish_execution_requested_with_conn(
&mut *conn,
publisher,
child_id,
task_action.id,
action_ref,
parent_execution,
pending_messages,
)
.await?;
}
@@ -1622,25 +1610,17 @@ impl ExecutionScheduler {
Ok(())
}
async fn publish_execution_requested_with_conn(
conn: &mut PgConnection,
async fn publish_execution_requested_payload(
publisher: &Publisher,
execution_id: i64,
action_id: i64,
action_ref: &str,
parent_execution: &Execution,
pending: PendingExecutionRequested,
) -> Result<()> {
let child = ExecutionRepository::find_by_id(&mut *conn, execution_id)
.await?
.ok_or_else(|| anyhow::anyhow!("Execution {} not found", execution_id))?;
let payload = ExecutionRequestedPayload {
execution_id: child.id,
action_id: Some(action_id),
action_ref: action_ref.to_string(),
parent_id: Some(parent_execution.id),
enforcement_id: parent_execution.enforcement,
config: child.config.clone(),
execution_id: pending.execution_id,
action_id: Some(pending.action_id),
action_ref: pending.action_ref,
parent_id: Some(pending.parent_id),
enforcement_id: pending.enforcement_id,
config: pending.config,
};
let envelope = MessageEnvelope::new(MessageType::ExecutionRequested, payload)
@@ -1650,12 +1630,36 @@ impl ExecutionScheduler {
debug!(
"Published deferred ExecutionRequested for child execution {}",
execution_id
envelope.payload.execution_id
);
Ok(())
}
/// Stage an `ExecutionRequested` message for a child execution.
///
/// NOTE(review): despite the historical name, this no longer publishes
/// directly — it loads the child execution row to capture its current
/// config and appends a `PendingExecutionRequested` entry to
/// `pending_messages`, which the caller publishes after the surrounding
/// transaction commits. Consider renaming in a follow-up.
///
/// # Errors
/// Returns an error when the child execution row cannot be found.
async fn publish_execution_requested_with_conn(
    conn: &mut PgConnection,
    execution_id: i64,
    action_id: i64,
    action_ref: &str,
    parent_execution: &Execution,
    pending_messages: &mut Vec<PendingExecutionRequested>,
) -> Result<()> {
    // Re-read the child so the staged payload reflects its persisted config.
    let child = ExecutionRepository::find_by_id(&mut *conn, execution_id)
        .await?
        .ok_or_else(|| anyhow::anyhow!("Execution {} not found", execution_id))?;
    pending_messages.push(PendingExecutionRequested {
        execution_id: child.id,
        action_id,
        action_ref: action_ref.to_string(),
        parent_id: parent_execution.id,
        enforcement_id: parent_execution.enforcement,
        config: child.config.clone(),
    });
    Ok(())
}
/// Publish the next `Requested`-status with_items siblings to fill freed
/// concurrency slots.
///
@@ -1734,11 +1738,11 @@ impl ExecutionScheduler {
async fn publish_pending_with_items_children_with_conn(
conn: &mut PgConnection,
publisher: &Publisher,
parent_execution: &Execution,
workflow_execution_id: i64,
task_name: &str,
slots: usize,
pending_messages: &mut Vec<PendingExecutionRequested>,
) -> Result<usize> {
if slots == 0 {
return Ok(0);
@@ -1768,11 +1772,11 @@ impl ExecutionScheduler {
if let Err(e) = Self::publish_execution_requested_with_conn(
&mut *conn,
publisher,
*child_id,
*action_id,
&child.action_ref,
parent_execution,
pending_messages,
)
.await
{
@@ -1819,12 +1823,35 @@ impl ExecutionScheduler {
.execute(&mut *lock_conn)
.await?;
let result = Self::advance_workflow_serialized(
&mut lock_conn,
publisher,
round_robin_counter,
execution,
)
let result = async {
sqlx::query("BEGIN").execute(&mut *lock_conn).await?;
let advance_result =
Self::advance_workflow_serialized(&mut lock_conn, round_robin_counter, execution)
.await;
match advance_result {
Ok(pending_messages) => {
sqlx::query("COMMIT").execute(&mut *lock_conn).await?;
for pending in pending_messages {
Self::publish_execution_requested_payload(publisher, pending).await?;
}
Ok(())
}
Err(err) => {
let rollback_result = sqlx::query("ROLLBACK").execute(&mut *lock_conn).await;
if let Err(rollback_err) = rollback_result {
error!(
"Failed to roll back workflow_execution {} advancement transaction: {}",
workflow_execution_id, rollback_err
);
}
Err(err)
}
}
}
.await;
let unlock_result = sqlx::query("SELECT pg_advisory_unlock($1)")
.bind(workflow_execution_id)
@@ -1838,13 +1865,12 @@ impl ExecutionScheduler {
async fn advance_workflow_serialized(
conn: &mut PgConnection,
publisher: &Publisher,
round_robin_counter: &AtomicUsize,
execution: &Execution,
) -> Result<()> {
) -> Result<Vec<PendingExecutionRequested>> {
let workflow_task = match &execution.workflow_task {
Some(wt) => wt,
None => return Ok(()), // Not a workflow task, nothing to do
None => return Ok(vec![]), // Not a workflow task, nothing to do
};
let workflow_execution_id = workflow_task.workflow_execution;
@@ -1867,7 +1893,7 @@ impl ExecutionScheduler {
// Load the workflow execution record
let workflow_execution =
WorkflowExecutionRepository::find_by_id(&mut *conn, workflow_execution_id)
WorkflowExecutionRepository::find_by_id_for_update(&mut *conn, workflow_execution_id)
.await?
.ok_or_else(|| {
anyhow::anyhow!("Workflow execution {} not found", workflow_execution_id)
@@ -1882,9 +1908,11 @@ impl ExecutionScheduler {
"Workflow execution {} already in terminal state {:?}, skipping advance",
workflow_execution_id, workflow_execution.status
);
return Ok(());
return Ok(vec![]);
}
let mut pending_messages = Vec::new();
let parent_execution =
ExecutionRepository::find_by_id(&mut *conn, workflow_execution.execution)
.await?
@@ -1944,7 +1972,7 @@ impl ExecutionScheduler {
);
}
return Ok(());
return Ok(pending_messages);
}
// Load the workflow definition so we can apply param_schema defaults
@@ -2021,11 +2049,11 @@ impl ExecutionScheduler {
if free_slots > 0 {
if let Err(e) = Self::publish_pending_with_items_children_with_conn(
&mut *conn,
publisher,
&parent_for_pending,
workflow_execution_id,
task_name,
free_slots,
&mut pending_messages,
)
.await
{
@@ -2060,7 +2088,7 @@ impl ExecutionScheduler {
workflow_task.task_index.unwrap_or(-1),
siblings_remaining.len(),
);
return Ok(());
return Ok(pending_messages);
}
// ---------------------------------------------------------
@@ -2093,7 +2121,7 @@ impl ExecutionScheduler {
another advance_workflow call already handled final completion, skipping",
task_name,
);
return Ok(());
return Ok(pending_messages);
}
// All items done — check if any failed
@@ -2280,13 +2308,13 @@ impl ExecutionScheduler {
if let Some(task_node) = graph.get_task(next_task_name) {
if let Err(e) = Self::dispatch_workflow_task_with_conn(
&mut *conn,
publisher,
round_robin_counter,
&parent_execution,
&workflow_execution_id,
task_node,
&wf_ctx,
Some(task_name), // predecessor that triggered this task
&mut pending_messages,
)
.await
{
@@ -2349,7 +2377,7 @@ impl ExecutionScheduler {
.await?;
}
Ok(())
Ok(pending_messages)
}
/// Count child executions that are still in progress for a workflow.
@@ -3139,6 +3167,7 @@ impl ExecutionScheduler {
worker_id: &i64,
action_ref: &str,
config: &Option<JsonValue>,
scheduled_attempt_updated_at: DateTime<Utc>,
_action: &Action,
) -> Result<()> {
debug!("Queuing execution {} to worker {}", execution_id, worker_id);
@@ -3149,6 +3178,7 @@ impl ExecutionScheduler {
worker_id: *worker_id,
action_ref: action_ref.to_string(),
config: config.clone(),
scheduled_attempt_updated_at,
};
let envelope =

View File

@@ -12,7 +12,10 @@ use anyhow::Result;
use attune_common::{
models::{enums::ExecutionStatus, Execution},
mq::{MessageEnvelope, MessageType, Publisher},
repositories::execution::SELECT_COLUMNS as EXECUTION_COLUMNS,
repositories::{
execution::{UpdateExecutionInput, SELECT_COLUMNS as EXECUTION_COLUMNS},
ExecutionRepository,
},
};
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
@@ -178,20 +181,27 @@ impl ExecutionTimeoutMonitor {
"original_status": "scheduled"
});
// Update execution status in database
sqlx::query(
"UPDATE execution
SET status = $1,
result = $2,
updated = NOW()
WHERE id = $3",
let updated = ExecutionRepository::update_if_status_and_updated_before(
&self.pool,
execution_id,
ExecutionStatus::Scheduled,
self.calculate_cutoff_time(),
UpdateExecutionInput {
status: Some(ExecutionStatus::Failed),
result: Some(result.clone()),
..Default::default()
},
)
.bind(ExecutionStatus::Failed)
.bind(&result)
.bind(execution_id)
.execute(&self.pool)
.await?;
if updated.is_none() {
debug!(
"Skipping timeout failure for execution {} because it already left Scheduled or is no longer stale",
execution_id
);
return Ok(());
}
info!("Execution {} marked as failed in database", execution_id);
// Publish completion notification

View File

@@ -912,6 +912,115 @@ async fn test_queue_stats_persistence() {
cleanup_test_data(&pool, pack_id).await;
}
#[tokio::test]
#[ignore] // Requires database
async fn test_release_restore_recovers_active_slot_and_next_queue_head() {
    // Scenario: three executions against a max_concurrent=1 action. The
    // first holds the active slot; the other two wait in FIFO order. A
    // release followed by a restore must return the shared DB state to
    // exactly the pre-release shape.
    let pool = setup_db().await;
    let timestamp = Utc::now().timestamp();
    let suffix = format!("restore_release_{}", timestamp);
    let pack_id = create_test_pack(&pool, &suffix).await;
    let pack_ref = format!("fifo_test_pack_{}", suffix);
    let action_id = create_test_action(&pool, pack_id, &pack_ref, &suffix).await;
    let action_ref = format!("fifo_test_action_{}", suffix);
    let manager = ExecutionQueueManager::with_db_pool(QueueConfig::default(), pool.clone());
    let first =
        create_test_execution(&pool, action_id, &action_ref, ExecutionStatus::Requested).await;
    let second =
        create_test_execution(&pool, action_id, &action_ref, ExecutionStatus::Requested).await;
    let third =
        create_test_execution(&pool, action_id, &action_ref, ExecutionStatus::Requested).await;
    manager.enqueue(action_id, first, 1, None).await.unwrap();
    manager.enqueue(action_id, second, 1, None).await.unwrap();
    manager.enqueue(action_id, third, 1, None).await.unwrap();
    // Precondition: one active slot holder, two queued.
    let stats = manager.get_queue_stats(action_id).await.unwrap();
    assert_eq!(stats.active_count, 1);
    assert_eq!(stats.queue_length, 2);
    // Releasing the active slot should promote the FIFO head (second).
    let release = manager
        .release_active_slot(first)
        .await
        .unwrap()
        .expect("first execution should own an active slot");
    assert_eq!(release.next_execution_id, Some(second));
    let stats = manager.get_queue_stats(action_id).await.unwrap();
    assert_eq!(stats.active_count, 1);
    assert_eq!(stats.queue_length, 1);
    // Restore rolls the release back: first owns the slot again and the
    // promoted execution returns to the queue; no completion is counted.
    manager.restore_active_slot(first, &release).await.unwrap();
    let stats = manager.get_queue_stats(action_id).await.unwrap();
    assert_eq!(stats.active_count, 1);
    assert_eq!(stats.queue_length, 2);
    assert_eq!(stats.total_completed, 0);
    // A subsequent release must behave exactly like the first attempt.
    let next = manager
        .release_active_slot(first)
        .await
        .unwrap()
        .expect("restored execution should still own the active slot");
    assert_eq!(next.next_execution_id, Some(second));
    cleanup_test_data(&pool, pack_id).await;
}
#[tokio::test]
#[ignore] // Requires database
async fn test_remove_restore_recovers_queued_execution_position() {
    // Scenario: with a max_concurrent=1 action and three enqueued
    // executions, removing the queued `second` and then restoring it must
    // put it back at its original FIFO position (still ahead of `third`).
    let pool = setup_db().await;
    let timestamp = Utc::now().timestamp();
    let suffix = format!("restore_queue_{}", timestamp);
    let pack_id = create_test_pack(&pool, &suffix).await;
    let pack_ref = format!("fifo_test_pack_{}", suffix);
    let action_id = create_test_action(&pool, pack_id, &pack_ref, &suffix).await;
    let action_ref = format!("fifo_test_action_{}", suffix);
    let manager = ExecutionQueueManager::with_db_pool(QueueConfig::default(), pool.clone());
    let first =
        create_test_execution(&pool, action_id, &action_ref, ExecutionStatus::Requested).await;
    let second =
        create_test_execution(&pool, action_id, &action_ref, ExecutionStatus::Requested).await;
    let third =
        create_test_execution(&pool, action_id, &action_ref, ExecutionStatus::Requested).await;
    manager.enqueue(action_id, first, 1, None).await.unwrap();
    manager.enqueue(action_id, second, 1, None).await.unwrap();
    manager.enqueue(action_id, third, 1, None).await.unwrap();
    // Removing a queued (non-active) entry promotes nobody.
    let removal = manager
        .remove_queued_execution(second)
        .await
        .unwrap()
        .expect("second execution should be queued");
    assert_eq!(removal.next_execution_id, None);
    let stats = manager.get_queue_stats(action_id).await.unwrap();
    assert_eq!(stats.active_count, 1);
    assert_eq!(stats.queue_length, 1);
    // Restore puts the removed entry back into the queue.
    manager.restore_queued_execution(&removal).await.unwrap();
    let stats = manager.get_queue_stats(action_id).await.unwrap();
    assert_eq!(stats.active_count, 1);
    assert_eq!(stats.queue_length, 2);
    // FIFO position was preserved: releasing the active slot promotes the
    // restored `second`, not `third`.
    let release = manager
        .release_active_slot(first)
        .await
        .unwrap()
        .expect("first execution should own the active slot");
    assert_eq!(release.next_execution_id, Some(second));
    cleanup_test_data(&pool, pack_id).await;
}
#[tokio::test]
#[ignore] // Requires database
async fn test_queue_full_rejection() {

View File

@@ -65,6 +65,35 @@ A workflow execution should have exactly one active mutator at a time when evalu
## Proposed Implementation Phases
## Current Status
As of the current implementation state:
- Phase 1 is substantially implemented.
- Phases 2, 3, 4, and 5 are implemented.
Completed so far:
- Atomic `requested -> scheduling` claim support was added in `ExecutionRepository`.
- Scheduler state transitions for regular action dispatch were converted to conditional/CAS-style updates.
- Redelivered `execution.requested` messages for stale `scheduling` rows are now retried/reclaimed instead of being silently acknowledged away.
- Shared concurrency/FIFO coordination now uses durable PostgreSQL admission tables for action/group slot ownership and queued execution ordering.
- `ExecutionQueueManager` now acts as a thin API-compatible facade over the DB-backed admission path when constructed with a pool.
- Slot release, queued removal, and rollback/restore flows now operate against shared DB state rather than process-local memory.
- `queue_stats` remains derived telemetry, but it is now refreshed transactionally from the shared admission state.
- Workflow start is now idempotent at the parent workflow state level via `workflow_execution(execution)` uniqueness plus repository create-or-get behavior.
- Workflow advancement now runs under a per-workflow PostgreSQL advisory lock, row-locks `workflow_execution` with `SELECT ... FOR UPDATE`, and performs serialized mutation inside an explicit SQL transaction.
- Durable workflow child dispatch dedupe is now enforced with the `workflow_task_dispatch` coordination table and repository create-or-get helpers.
- `execution` and `enforcement` were switched from Timescale hypertables back to normal PostgreSQL tables to remove HA/idempotency friction around foreign keys and unique constraints. `event` remains a hypertable, and history tables remain Timescale-backed.
- Direct uniqueness/idempotency invariants were added for `enforcement(rule, event)`, top-level `execution(enforcement)`, and `inquiry(execution)`.
- Event, enforcement, and inquiry handlers were updated to use create-or-get flows and conditional status transitions so duplicate delivery becomes safe.
- Timeout and DLQ recovery loops now use conditional state transitions and only emit side effects when the guarded update succeeds.
Partially complete / still open:
- HA-focused integration and failure-injection coverage still needs to be expanded around the new invariants and recovery behavior.
- The new migrations and DB-backed FIFO tests still need end-to-end validation against a real Postgres/Timescale environment.
## Phase 1: Atomic Execution Claiming
### Objective
@@ -93,6 +122,10 @@ Ensure only one executor replica can claim a `requested` execution for schedulin
- Two schedulers racing on the same execution cannot both dispatch it
- Redelivered `execution.requested` messages become harmless no-ops after the first successful claim
### Status
Implemented for regular action scheduling, with additional stale-claim recovery for redelivered `execution.requested` messages. The Phase 2 shared admission/queueing work that this area integrates with is now implemented as well; the remaining gap is broader integration test coverage spanning both phases.
## Phase 2: Shared Concurrency Control and FIFO Queueing
### Objective
@@ -147,6 +180,19 @@ Alternative naming is fine, but the design needs to support:
- FIFO ordering holds across multiple executor replicas
- Restarting an executor does not lose queue ownership state
### Status
Implemented.
Completed:
- Shared admission state now lives in PostgreSQL via durable action/group queue rows and execution entry rows.
- Action-level concurrency limits and parameter-group concurrency keys are enforced against that shared admission state.
- FIFO ordering is determined by durable queued-entry order rather than process-local memory.
- Completion-time slot release promotes the next queued execution inside the same DB transaction.
- Rollback helpers can restore released slots or removed queued entries if republish/cleanup fails after the DB mutation.
- `ExecutionQueueManager` remains as a facade for the existing scheduler/policy code paths, but it no longer acts as the correctness source of truth when running with a DB pool.
## Phase 3: Workflow Start Idempotency and Serialized Advancement
### Objective
@@ -196,6 +242,20 @@ This may be implemented with explicit columns or a dedupe table if indexing the
- Duplicate `execution.completed` delivery for a workflow child cannot create duplicate successor executions
- Two executor replicas cannot concurrently mutate the same workflow state
### Status
Implemented.
Completed:
- `workflow_execution(execution)` uniqueness is part of the workflow schema and workflow start uses create-or-get semantics.
- Workflow parent executions are claimed before orchestration starts.
- Workflow advancement now runs under a per-workflow PostgreSQL advisory lock held on the same DB connection that performs the serialized advancement work.
- The serialized workflow path is wrapped in an explicit SQL transaction.
- `workflow_execution` is row-locked with `SELECT ... FOR UPDATE` before mutation.
- Successor/child dispatch dedupe is enforced with the durable `workflow_task_dispatch` table keyed by `(workflow_execution, task_name, COALESCE(task_index, -1))`.
- Child `ExecutionRequested` messages are staged and published only after the workflow transaction commits.
## Phase 4: Idempotent Event, Enforcement, and Inquiry Handling
### Objective
@@ -241,6 +301,19 @@ WHERE event IS NOT NULL;
- Duplicate `enforcement.created` does not create duplicate executions
- Duplicate completion handling does not create duplicate inquiries
### Status
Implemented.
Completed:
- `enforcement(rule, event)` uniqueness is enforced directly with a partial unique index when both keys are present.
- Top-level execution creation is deduped with a unique invariant on `execution(enforcement)` where `parent IS NULL`.
- Inquiry creation is deduped with a unique invariant on `inquiry(execution)`.
- `event_processor` now uses create-or-get enforcement handling and only republishes when the persisted enforcement still needs processing.
- `enforcement_processor` now skips duplicate non-`created` enforcements, creates or reuses the top-level execution, and conditionally resolves enforcement state.
- `inquiry_handler` now uses create-or-get inquiry handling and only emits `InquiryCreated` when the inquiry was actually created.
## Phase 5: Safe Recovery Loops
### Objective
@@ -273,6 +346,17 @@ Make timeout and DLQ processing safe under races and multiple replicas.
- DLQ handler cannot overwrite newer state
- Running multiple timeout monitors produces no conflicting state transitions
### Status
Implemented.
Completed:
- Timeout failure now uses a conditional transition that only succeeds when the execution is still `scheduled` and still older than the timeout cutoff.
- Timeout-driven completion side effects are only published when that guarded update succeeds.
- DLQ handling now treats messages as stale unless the execution is still exactly `scheduled`.
- DLQ failure transitions now use conditional status updates and no longer overwrite newer `running` or terminal state.
## Testing Plan
Add focused HA tests after the repository and scheduler primitives are in place.
@@ -298,13 +382,10 @@ Add focused HA tests after the repository and scheduler primitives are in place.
## Recommended Execution Order for Next Session
1. Add migrations and repository primitives for atomic execution claim
2. Convert scheduler to claim-first semantics
3. Implement shared DB-backed concurrency/FIFO coordination
4. Add workflow uniqueness and serialized advancement
5. Add idempotency to event/enforcement/inquiry paths
6. Fix timeout and DLQ handlers to use conditional transitions
7. Add HA-focused tests
1. Add more HA-focused integration tests for duplicate delivery, cross-replica completion, and recovery rollback paths
2. Add failure-injection tests for crash/replay scenarios around `scheduling` reclaim, workflow advancement, and post-commit publish paths
3. Validate the new migrations and DB-backed FIFO behavior end-to-end against a real Postgres/Timescale environment
4. Consider a small follow-up cleanup pass to reduce or remove the in-memory fallback code in `ExecutionQueueManager` once the DB path is fully baked
## Expected Outcome
@@ -315,3 +396,5 @@ After this plan is implemented, the executor should be able to scale horizontall
- correct workflow orchestration
- safe replay handling
- safe recovery behavior during failures and redelivery
At the current state, the core executor HA phases are implemented. The remaining work is confidence-building: failure-injection coverage, multi-replica integration testing, and end-to-end migration validation in a live database environment.

View File

@@ -201,6 +201,9 @@ CREATE INDEX idx_enforcement_rule_status ON enforcement(rule, status);
CREATE INDEX idx_enforcement_event_status ON enforcement(event, status);
CREATE INDEX idx_enforcement_payload_gin ON enforcement USING GIN (payload);
CREATE INDEX idx_enforcement_conditions_gin ON enforcement USING GIN (conditions);
CREATE UNIQUE INDEX uq_enforcement_rule_event
ON enforcement (rule, event)
WHERE rule IS NOT NULL AND event IS NOT NULL;
-- Comments
COMMENT ON TABLE enforcement IS 'Enforcements represent rule triggering by events';

View File

@@ -4,13 +4,8 @@
-- Consolidates former migrations: 000006 (execution_system), 000008
-- (worker_notification), 000014 (worker_table), and 20260209 (phase3).
--
-- NOTE: The execution table is converted to a TimescaleDB hypertable in
-- migration 000009. Hypertables cannot be the target of FK constraints,
-- so columns referencing execution (inquiry.execution, workflow_execution.execution)
-- are plain BIGINT with no FK. Similarly, columns ON the execution table that
-- would self-reference or reference other hypertables (parent, enforcement,
-- original_execution) are plain BIGINT. The action and executor FKs are also
-- omitted since they would need to be dropped during hypertable conversion.
-- NOTE: `execution` remains a regular PostgreSQL table. Time-series
-- audit and analytics are handled by `execution_history`.
-- Version: 20250101000005
-- ============================================================================
@@ -19,27 +14,27 @@
CREATE TABLE execution (
id BIGSERIAL PRIMARY KEY,
action BIGINT, -- references action(id); no FK because execution becomes a hypertable
action BIGINT,
action_ref TEXT NOT NULL,
config JSONB,
env_vars JSONB,
parent BIGINT, -- self-reference; no FK because execution becomes a hypertable
enforcement BIGINT, -- references enforcement(id); no FK (both are hypertables)
executor BIGINT, -- references identity(id); no FK because execution becomes a hypertable
worker BIGINT, -- references worker(id); no FK because execution becomes a hypertable
parent BIGINT,
enforcement BIGINT,
executor BIGINT,
worker BIGINT,
status execution_status_enum NOT NULL DEFAULT 'requested',
result JSONB,
started_at TIMESTAMPTZ, -- set when execution transitions to 'running'
created TIMESTAMPTZ NOT NULL DEFAULT NOW(),
is_workflow BOOLEAN DEFAULT false NOT NULL,
workflow_def BIGINT, -- references workflow_definition(id); no FK because execution becomes a hypertable
workflow_def BIGINT,
workflow_task JSONB,
-- Retry tracking (baked in from phase 3)
retry_count INTEGER NOT NULL DEFAULT 0,
max_retries INTEGER,
retry_reason TEXT,
original_execution BIGINT, -- self-reference; no FK because execution becomes a hypertable
original_execution BIGINT,
updated TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
@@ -64,6 +59,11 @@ CREATE INDEX idx_execution_result_gin ON execution USING GIN (result);
CREATE INDEX idx_execution_env_vars_gin ON execution USING GIN (env_vars);
CREATE INDEX idx_execution_original_execution ON execution(original_execution) WHERE original_execution IS NOT NULL;
CREATE INDEX idx_execution_status_retry ON execution(status, retry_count) WHERE status = 'failed' AND retry_count < COALESCE(max_retries, 0);
CREATE UNIQUE INDEX uq_execution_top_level_enforcement
ON execution (enforcement)
WHERE enforcement IS NOT NULL
AND parent IS NULL
AND (config IS NULL OR NOT (config ? 'retry_of'));
-- Trigger
CREATE TRIGGER update_execution_updated
@@ -77,10 +77,10 @@ COMMENT ON COLUMN execution.action IS 'Action being executed (may be null if act
COMMENT ON COLUMN execution.action_ref IS 'Action reference (preserved even if action deleted)';
COMMENT ON COLUMN execution.config IS 'Snapshot of action configuration at execution time';
COMMENT ON COLUMN execution.env_vars IS 'Environment variables for this execution as key-value pairs (string -> string). These are set in the execution environment and are separate from action parameters. Used for execution context, configuration, and non-sensitive metadata.';
COMMENT ON COLUMN execution.parent IS 'Parent execution ID for workflow hierarchies (no FK — execution is a hypertable)';
COMMENT ON COLUMN execution.enforcement IS 'Enforcement that triggered this execution (no FK — both are hypertables)';
COMMENT ON COLUMN execution.executor IS 'Identity that initiated the execution (no FK — execution is a hypertable)';
COMMENT ON COLUMN execution.worker IS 'Assigned worker handling this execution (no FK — execution is a hypertable)';
COMMENT ON COLUMN execution.parent IS 'Parent execution ID for workflow hierarchies';
COMMENT ON COLUMN execution.enforcement IS 'Enforcement that triggered this execution';
COMMENT ON COLUMN execution.executor IS 'Identity that initiated the execution';
COMMENT ON COLUMN execution.worker IS 'Assigned worker handling this execution';
COMMENT ON COLUMN execution.status IS 'Current execution lifecycle status';
COMMENT ON COLUMN execution.result IS 'Execution output/results';
COMMENT ON COLUMN execution.retry_count IS 'Current retry attempt number (0 = first attempt, 1 = first retry, etc.)';
@@ -96,7 +96,7 @@ COMMENT ON COLUMN execution.original_execution IS 'ID of the original execution
CREATE TABLE inquiry (
id BIGSERIAL PRIMARY KEY,
execution BIGINT NOT NULL, -- references execution(id); no FK because execution is a hypertable
execution BIGINT NOT NULL,
prompt TEXT NOT NULL,
response_schema JSONB,
assigned_to BIGINT REFERENCES identity(id) ON DELETE SET NULL,
@@ -109,7 +109,7 @@ CREATE TABLE inquiry (
);
-- Indexes
CREATE INDEX idx_inquiry_execution ON inquiry(execution);
CREATE UNIQUE INDEX uq_inquiry_execution ON inquiry(execution) WHERE execution IS NOT NULL;
CREATE INDEX idx_inquiry_assigned_to ON inquiry(assigned_to);
CREATE INDEX idx_inquiry_status ON inquiry(status);
CREATE INDEX idx_inquiry_timeout_at ON inquiry(timeout_at) WHERE timeout_at IS NOT NULL;
@@ -127,7 +127,31 @@ CREATE TRIGGER update_inquiry_updated
-- Comments
COMMENT ON TABLE inquiry IS 'Inquiries enable human-in-the-loop workflows with async user interactions';
COMMENT ON COLUMN inquiry.execution IS 'Execution that is waiting on this inquiry (no FK — execution is a hypertable)';
COMMENT ON COLUMN inquiry.execution IS 'Execution that is waiting on this inquiry';
ALTER TABLE execution
ADD CONSTRAINT execution_action_fkey
FOREIGN KEY (action) REFERENCES action(id) ON DELETE SET NULL;
ALTER TABLE execution
ADD CONSTRAINT execution_parent_fkey
FOREIGN KEY (parent) REFERENCES execution(id) ON DELETE SET NULL;
ALTER TABLE execution
ADD CONSTRAINT execution_original_execution_fkey
FOREIGN KEY (original_execution) REFERENCES execution(id) ON DELETE SET NULL;
ALTER TABLE execution
ADD CONSTRAINT execution_enforcement_fkey
FOREIGN KEY (enforcement) REFERENCES enforcement(id) ON DELETE SET NULL;
ALTER TABLE execution
ADD CONSTRAINT execution_executor_fkey
FOREIGN KEY (executor) REFERENCES identity(id) ON DELETE SET NULL;
ALTER TABLE inquiry
ADD CONSTRAINT inquiry_execution_fkey
FOREIGN KEY (execution) REFERENCES execution(id) ON DELETE CASCADE;
COMMENT ON COLUMN inquiry.prompt IS 'Question or prompt text for the user';
COMMENT ON COLUMN inquiry.response_schema IS 'JSON schema defining expected response format';
COMMENT ON COLUMN inquiry.assigned_to IS 'Identity who should respond to this inquiry';
@@ -261,6 +285,10 @@ COMMENT ON COLUMN worker.capabilities IS 'Worker capabilities (e.g., max_concurr
COMMENT ON COLUMN worker.meta IS 'Additional worker metadata';
COMMENT ON COLUMN worker.last_heartbeat IS 'Timestamp of last heartbeat from worker';
ALTER TABLE execution
ADD CONSTRAINT execution_worker_fkey
FOREIGN KEY (worker) REFERENCES worker(id) ON DELETE SET NULL;
-- ============================================================================
-- NOTIFICATION TABLE
-- ============================================================================

View File

@@ -1,13 +1,11 @@
-- Migration: Workflow System
-- Description: Creates workflow_definition and workflow_execution tables
-- Description: Creates workflow_definition, workflow_execution, and
-- workflow_task_dispatch tables
-- (workflow_task_execution consolidated into execution.workflow_task JSONB)
--
-- NOTE: The execution table is converted to a TimescaleDB hypertable in
-- migration 000009. Hypertables cannot be the target of FK constraints,
-- so workflow_execution.execution is a plain BIGINT with no FK.
-- execution.workflow_def also has no FK (added as plain BIGINT in 000005)
-- since execution is a hypertable and FKs from hypertables are only
-- supported for simple cases — we omit it for consistency.
-- NOTE: `execution` remains a regular PostgreSQL table, so
-- workflow_execution.execution, workflow_task_dispatch.execution_id,
-- and execution.workflow_def use normal foreign keys.
-- Version: 20250101000006
-- ============================================================================
@@ -54,7 +52,7 @@ COMMENT ON COLUMN workflow_definition.out_schema IS 'JSON schema for workflow ou
CREATE TABLE workflow_execution (
id BIGSERIAL PRIMARY KEY,
execution BIGINT NOT NULL, -- references execution(id); no FK because execution is a hypertable
execution BIGINT NOT NULL REFERENCES execution(id) ON DELETE CASCADE,
workflow_def BIGINT NOT NULL REFERENCES workflow_definition(id) ON DELETE CASCADE,
current_tasks TEXT[] DEFAULT '{}',
completed_tasks TEXT[] DEFAULT '{}',
@@ -83,12 +81,51 @@ CREATE TRIGGER update_workflow_execution_updated
EXECUTE FUNCTION update_updated_column();
-- Comments
COMMENT ON TABLE workflow_execution IS 'Runtime state tracking for workflow executions. execution column has no FK — execution is a hypertable.';
COMMENT ON TABLE workflow_execution IS 'Runtime state tracking for workflow executions.';
COMMENT ON COLUMN workflow_execution.variables IS 'Workflow-scoped variables, updated via publish directives';
COMMENT ON COLUMN workflow_execution.task_graph IS 'Execution graph with dependencies and transitions';
COMMENT ON COLUMN workflow_execution.current_tasks IS 'Array of task names currently executing';
COMMENT ON COLUMN workflow_execution.paused IS 'True if workflow execution is paused (can be resumed)';
-- ============================================================================
-- WORKFLOW TASK DISPATCH TABLE
-- ============================================================================
CREATE TABLE workflow_task_dispatch (
id BIGSERIAL PRIMARY KEY,
workflow_execution BIGINT NOT NULL REFERENCES workflow_execution(id) ON DELETE CASCADE,
task_name TEXT NOT NULL,
task_index INT,
execution_id BIGINT,
created TIMESTAMPTZ DEFAULT NOW() NOT NULL,
updated TIMESTAMPTZ DEFAULT NOW() NOT NULL
);
CREATE UNIQUE INDEX uq_workflow_task_dispatch_identity
ON workflow_task_dispatch (
workflow_execution,
task_name,
COALESCE(task_index, -1)
);
CREATE INDEX idx_workflow_task_dispatch_execution_id
ON workflow_task_dispatch (execution_id)
WHERE execution_id IS NOT NULL;
CREATE TRIGGER update_workflow_task_dispatch_updated
BEFORE UPDATE ON workflow_task_dispatch
FOR EACH ROW
EXECUTE FUNCTION update_updated_column();
COMMENT ON TABLE workflow_task_dispatch IS
'Durable dedupe/ownership records for workflow child execution dispatch';
COMMENT ON COLUMN workflow_task_dispatch.execution_id IS
'Associated execution.id';
ALTER TABLE workflow_task_dispatch
ADD CONSTRAINT workflow_task_dispatch_execution_id_fkey
FOREIGN KEY (execution_id) REFERENCES execution(id) ON DELETE CASCADE;
-- ============================================================================
-- MODIFY ACTION TABLE - Add Workflow Support
-- ============================================================================
@@ -100,9 +137,9 @@ CREATE INDEX idx_action_workflow_def ON action(workflow_def);
COMMENT ON COLUMN action.workflow_def IS 'Reference to workflow definition (non-null means this action is a workflow)';
-- NOTE: execution.workflow_def has no FK constraint because execution is a
-- TimescaleDB hypertable (converted in migration 000009). The column was
-- created as a plain BIGINT in migration 000005.
ALTER TABLE execution
ADD CONSTRAINT execution_workflow_def_fkey
FOREIGN KEY (workflow_def) REFERENCES workflow_definition(id) ON DELETE SET NULL;
-- ============================================================================
-- WORKFLOW VIEWS

View File

@@ -1,6 +1,6 @@
-- Migration: Supporting Systems
-- Description: Creates keys, artifacts, queue_stats, pack_environment, pack_testing,
-- and webhook function tables.
-- Description: Creates keys, artifacts, queue_stats, execution_admission,
-- pack_environment, pack_testing, and webhook function tables.
-- Consolidates former migrations: 000009 (keys_artifacts), 000010 (webhook_system),
-- 000011 (pack_environments), and 000012 (pack_testing).
-- Version: 20250101000007
@@ -206,6 +206,76 @@ COMMENT ON COLUMN queue_stats.total_enqueued IS 'Total executions enqueued since
COMMENT ON COLUMN queue_stats.total_completed IS 'Total executions completed since queue creation';
COMMENT ON COLUMN queue_stats.last_updated IS 'Timestamp of last statistics update';
-- ============================================================================
-- EXECUTION ADMISSION TABLES
-- ============================================================================
CREATE TABLE execution_admission_state (
id BIGSERIAL PRIMARY KEY,
action_id BIGINT NOT NULL REFERENCES action(id) ON DELETE CASCADE,
group_key TEXT,
group_key_normalized TEXT GENERATED ALWAYS AS (COALESCE(group_key, '')) STORED,
max_concurrent INTEGER NOT NULL,
next_queue_order BIGINT NOT NULL DEFAULT 1,
total_enqueued BIGINT NOT NULL DEFAULT 0,
total_completed BIGINT NOT NULL DEFAULT 0,
created TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated TIMESTAMPTZ NOT NULL DEFAULT NOW(),
CONSTRAINT uq_execution_admission_state_identity
UNIQUE (action_id, group_key_normalized)
);
CREATE TABLE execution_admission_entry (
id BIGSERIAL PRIMARY KEY,
state_id BIGINT NOT NULL REFERENCES execution_admission_state(id) ON DELETE CASCADE,
execution_id BIGINT NOT NULL UNIQUE REFERENCES execution(id) ON DELETE CASCADE,
status TEXT NOT NULL CHECK (status IN ('active', 'queued')),
queue_order BIGINT NOT NULL,
enqueued_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
activated_at TIMESTAMPTZ,
created TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX idx_execution_admission_state_action
ON execution_admission_state (action_id);
CREATE INDEX idx_execution_admission_entry_state_status_queue
ON execution_admission_entry (state_id, status, queue_order);
CREATE INDEX idx_execution_admission_entry_execution
ON execution_admission_entry (execution_id);
CREATE TRIGGER update_execution_admission_state_updated
BEFORE UPDATE ON execution_admission_state
FOR EACH ROW
EXECUTE FUNCTION update_updated_column();
CREATE TRIGGER update_execution_admission_entry_updated
BEFORE UPDATE ON execution_admission_entry
FOR EACH ROW
EXECUTE FUNCTION update_updated_column();
COMMENT ON TABLE execution_admission_state IS
'Shared admission state per action/group for executor concurrency and FIFO coordination';
COMMENT ON COLUMN execution_admission_state.group_key IS
'Optional parameter-derived concurrency grouping key';
COMMENT ON COLUMN execution_admission_state.max_concurrent IS
'Current concurrency limit for this action/group queue';
COMMENT ON COLUMN execution_admission_state.next_queue_order IS
'Monotonic sequence used to preserve exact FIFO order for queued executions';
COMMENT ON COLUMN execution_admission_state.total_enqueued IS
'Cumulative number of executions admitted into this queue';
COMMENT ON COLUMN execution_admission_state.total_completed IS
'Cumulative number of active executions released from this queue';
COMMENT ON TABLE execution_admission_entry IS
'Active slot ownership and queued executions for shared admission control';
COMMENT ON COLUMN execution_admission_entry.status IS
'active rows own a concurrency slot; queued rows wait in FIFO order';
COMMENT ON COLUMN execution_admission_entry.queue_order IS
'Durable FIFO position within an action/group queue';
-- ============================================================================
-- PACK ENVIRONMENT TABLE
-- ============================================================================

View File

@@ -143,52 +143,8 @@ SELECT create_hypertable('event', 'created',
COMMENT ON TABLE event IS 'Events are instances of triggers firing (TimescaleDB hypertable partitioned on created)';
-- ============================================================================
-- CONVERT ENFORCEMENT TABLE TO HYPERTABLE
-- ============================================================================
-- Enforcements are created and then updated exactly once (status changes from
-- `created` to `processed` or `disabled` within ~1 second). This single update
-- happens well before the 7-day compression window, so UPDATE on uncompressed
-- chunks works without issues.
--
-- No FK constraints reference enforcement(id) — execution.enforcement was
-- created as a plain BIGINT in migration 000005.
-- ----------------------------------------------------------------------------
ALTER TABLE enforcement DROP CONSTRAINT enforcement_pkey;
ALTER TABLE enforcement ADD PRIMARY KEY (id, created);
SELECT create_hypertable('enforcement', 'created',
chunk_time_interval => INTERVAL '1 day',
migrate_data => true);
COMMENT ON TABLE enforcement IS 'Enforcements represent rule triggering by events (TimescaleDB hypertable partitioned on created)';
-- ============================================================================
-- CONVERT EXECUTION TABLE TO HYPERTABLE
-- ============================================================================
-- Executions are updated ~4 times during their lifecycle (requested → scheduled
-- → running → completed/failed), completing within at most ~1 day — well before
-- the 7-day compression window. The `updated` column and its BEFORE UPDATE
-- trigger are preserved (used by timeout monitor and UI).
--
-- No FK constraints reference execution(id) — inquiry.execution,
-- workflow_execution.execution, execution.parent, and execution.original_execution
-- were all created as plain BIGINT columns in migrations 000005 and 000006.
--
-- The existing execution_history hypertable and its trigger are preserved —
-- they track field-level diffs of each update, which remains valuable for
-- a mutable table.
-- ----------------------------------------------------------------------------
ALTER TABLE execution DROP CONSTRAINT execution_pkey;
ALTER TABLE execution ADD PRIMARY KEY (id, created);
SELECT create_hypertable('execution', 'created',
chunk_time_interval => INTERVAL '1 day',
migrate_data => true);
COMMENT ON TABLE execution IS 'Executions represent action runs with workflow support (TimescaleDB hypertable partitioned on created). Updated ~4 times during lifecycle, completing within ~1 day (well before 7-day compression window).';
COMMENT ON TABLE enforcement IS 'Enforcements represent rule triggering by events';
COMMENT ON TABLE execution IS 'Executions represent action runs with workflow support. History and analytics are stored in execution_history.';
-- ============================================================================
-- TRIGGER FUNCTIONS
@@ -410,22 +366,6 @@ ALTER TABLE event SET (
);
SELECT add_compression_policy('event', INTERVAL '7 days');
-- Enforcement table (hypertable)
ALTER TABLE enforcement SET (
timescaledb.compress,
timescaledb.compress_segmentby = 'rule_ref',
timescaledb.compress_orderby = 'created DESC'
);
SELECT add_compression_policy('enforcement', INTERVAL '7 days');
-- Execution table (hypertable)
ALTER TABLE execution SET (
timescaledb.compress,
timescaledb.compress_segmentby = 'action_ref',
timescaledb.compress_orderby = 'created DESC'
);
SELECT add_compression_policy('execution', INTERVAL '7 days');
-- ============================================================================
-- RETENTION POLICIES
-- ============================================================================
@@ -433,8 +373,6 @@ SELECT add_compression_policy('execution', INTERVAL '7 days');
SELECT add_retention_policy('execution_history', INTERVAL '90 days');
SELECT add_retention_policy('worker_history', INTERVAL '180 days');
SELECT add_retention_policy('event', INTERVAL '90 days');
SELECT add_retention_policy('enforcement', INTERVAL '90 days');
SELECT add_retention_policy('execution', INTERVAL '90 days');
-- ============================================================================
-- CONTINUOUS AGGREGATES
@@ -449,6 +387,8 @@ DROP MATERIALIZED VIEW IF EXISTS event_volume_hourly CASCADE;
DROP MATERIALIZED VIEW IF EXISTS worker_status_hourly CASCADE;
DROP MATERIALIZED VIEW IF EXISTS enforcement_volume_hourly CASCADE;
DROP MATERIALIZED VIEW IF EXISTS execution_volume_hourly CASCADE;
DROP VIEW IF EXISTS enforcement_volume_hourly CASCADE;
DROP VIEW IF EXISTS execution_volume_hourly CASCADE;
-- ----------------------------------------------------------------------------
-- execution_status_hourly
@@ -553,49 +493,35 @@ SELECT add_continuous_aggregate_policy('worker_status_hourly',
-- instead of a separate enforcement_history table.
-- ----------------------------------------------------------------------------
CREATE MATERIALIZED VIEW enforcement_volume_hourly
WITH (timescaledb.continuous) AS
CREATE VIEW enforcement_volume_hourly AS
SELECT
time_bucket('1 hour', created) AS bucket,
date_trunc('hour', created) AS bucket,
rule_ref,
COUNT(*) AS enforcement_count
FROM enforcement
GROUP BY bucket, rule_ref
WITH NO DATA;
SELECT add_continuous_aggregate_policy('enforcement_volume_hourly',
start_offset => INTERVAL '7 days',
end_offset => INTERVAL '1 hour',
schedule_interval => INTERVAL '30 minutes'
);
;
-- ----------------------------------------------------------------------------
-- execution_volume_hourly
-- Tracks execution creation volume per hour by action_ref and status.
-- This queries the execution hypertable directly (like event_volume_hourly
-- queries the event table). Complements the existing execution_status_hourly
-- and execution_throughput_hourly aggregates which query execution_history.
-- This queries the execution table directly. Complements the existing
-- execution_status_hourly and execution_throughput_hourly aggregates which
-- query execution_history.
--
-- Use case: direct execution volume monitoring without relying on the history
-- trigger (belt-and-suspenders, plus captures the initial status at creation).
-- ----------------------------------------------------------------------------
CREATE MATERIALIZED VIEW execution_volume_hourly
WITH (timescaledb.continuous) AS
CREATE VIEW execution_volume_hourly AS
SELECT
time_bucket('1 hour', created) AS bucket,
date_trunc('hour', created) AS bucket,
action_ref,
status AS initial_status,
COUNT(*) AS execution_count
FROM execution
GROUP BY bucket, action_ref, status
WITH NO DATA;
SELECT add_continuous_aggregate_policy('execution_volume_hourly',
start_offset => INTERVAL '7 days',
end_offset => INTERVAL '1 hour',
schedule_interval => INTERVAL '30 minutes'
);
;
-- ============================================================================
-- INITIAL REFRESH NOTE

View File

@@ -26,7 +26,7 @@ ALTER TABLE artifact ADD COLUMN IF NOT EXISTS content_type TEXT;
-- Total size in bytes of the latest version's content (NULL for progress artifacts)
ALTER TABLE artifact ADD COLUMN IF NOT EXISTS size_bytes BIGINT;
-- Execution that produced/owns this artifact (plain BIGINT, no FK — execution is a hypertable)
-- Execution that produced/owns this artifact (plain BIGINT, no FK by design)
ALTER TABLE artifact ADD COLUMN IF NOT EXISTS execution BIGINT;
-- Structured data for progress-type artifacts and small structured payloads.
@@ -52,7 +52,7 @@ COMMENT ON COLUMN artifact.name IS 'Human-readable artifact name';
COMMENT ON COLUMN artifact.description IS 'Optional description of the artifact';
COMMENT ON COLUMN artifact.content_type IS 'MIME content type (e.g. application/json, text/plain)';
COMMENT ON COLUMN artifact.size_bytes IS 'Size of latest version content in bytes';
COMMENT ON COLUMN artifact.execution IS 'Execution that produced this artifact (no FK — execution is a hypertable)';
COMMENT ON COLUMN artifact.execution IS 'Execution that produced this artifact (no FK by design)';
COMMENT ON COLUMN artifact.data IS 'Structured JSONB data for progress artifacts or metadata';
COMMENT ON COLUMN artifact.visibility IS 'Access visibility: public (all users) or private (scope/owner-restricted)';