[WIP] making executor HA (highly available)

This commit is contained in:
2026-04-02 11:33:26 -05:00
parent 8278030699
commit 8e91440f23
6 changed files with 1876 additions and 144 deletions

View File

@@ -41,6 +41,12 @@ pub struct ExecutionSearchResult {
pub total: u64,
}
/// Result of a create-or-get for a workflow-task execution row: the row
/// itself plus whether this call inserted it.
#[derive(Debug, Clone)]
pub struct WorkflowTaskExecutionCreateOrGetResult {
    // The existing or newly created execution row.
    pub execution: Execution,
    // `true` when this call inserted the row; `false` when an existing row
    // was found and returned.
    pub created: bool,
}
/// An execution row with optional `rule_ref` / `trigger_ref` populated from
/// the joined `enforcement` table. This avoids a separate in-memory lookup.
#[derive(Debug, Clone, sqlx::FromRow)]
@@ -209,6 +215,134 @@ impl Update for ExecutionRepository {
}
impl ExecutionRepository {
/// Return the execution recorded for (`workflow_execution_id`, `task_name`,
/// `task_index`), creating one from `input` when no matching row exists.
///
/// The result's `created` flag is `true` only when this call performed the
/// insert.
///
/// NOTE(review): this is find-then-create with no `ON CONFLICT` guard visible
/// here, so two concurrent callers could both insert — confirm a DB-level
/// uniqueness constraint covers the workflow-task key (compare
/// `WorkflowExecutionRepository::create_or_get_by_execution`, which uses
/// `ON CONFLICT`).
pub async fn create_workflow_task_if_absent<'e, E>(
    executor: E,
    input: CreateExecutionInput,
    workflow_execution_id: Id,
    task_name: &str,
    task_index: Option<i32>,
) -> Result<WorkflowTaskExecutionCreateOrGetResult>
where
    E: Executor<'e, Database = Postgres> + Copy + 'e,
{
    let existing =
        Self::find_by_workflow_task(executor, workflow_execution_id, task_name, task_index)
            .await?;
    match existing {
        Some(execution) => Ok(WorkflowTaskExecutionCreateOrGetResult {
            execution,
            created: false,
        }),
        None => {
            let execution = Self::create(executor, input).await?;
            Ok(WorkflowTaskExecutionCreateOrGetResult {
                execution,
                created: true,
            })
        }
    }
}
/// Atomically move an execution from `Requested` to `Scheduling`.
///
/// Stamps `executor` (keeping the current value when `claiming_executor` is
/// `None`) and refreshes `updated`. Because the `UPDATE` is guarded on
/// `status = Requested`, only one concurrent caller can win the claim; all
/// others (and any call on a missing row) get `Ok(None)`.
pub async fn claim_for_scheduling<'e, E>(
    executor: E,
    id: Id,
    claiming_executor: Option<Id>,
) -> Result<Option<Execution>>
where
    E: Executor<'e, Database = Postgres> + 'e,
{
    let sql = format!(
        "UPDATE execution \
         SET status = $2, executor = COALESCE($3, executor), updated = NOW() \
         WHERE id = $1 AND status = $4 \
         RETURNING {SELECT_COLUMNS}"
    );
    let claimed = sqlx::query_as::<_, Execution>(&sql)
        .bind(id)
        .bind(ExecutionStatus::Scheduling)
        .bind(claiming_executor)
        .bind(ExecutionStatus::Requested)
        .fetch_optional(executor)
        .await?;
    Ok(claimed)
}
/// Take over an execution that appears stuck in `Scheduling`.
///
/// The takeover succeeds only when the row is still `Scheduling` AND its
/// `updated` timestamp is at or before `stale_before`. On success the
/// claimer's `executor` is recorded (existing value kept when
/// `claiming_executor` is `None`) and `updated` is refreshed, so competing
/// reclaimers see a fresh row and back off.
pub async fn reclaim_stale_scheduling<'e, E>(
    executor: E,
    id: Id,
    claiming_executor: Option<Id>,
    stale_before: DateTime<Utc>,
) -> Result<Option<Execution>>
where
    E: Executor<'e, Database = Postgres> + 'e,
{
    let sql = format!(
        "UPDATE execution \
         SET executor = COALESCE($2, executor), updated = NOW() \
         WHERE id = $1 AND status = $3 AND updated <= $4 \
         RETURNING {SELECT_COLUMNS}"
    );
    let query = sqlx::query_as::<_, Execution>(&sql)
        .bind(id)
        .bind(claiming_executor)
        .bind(ExecutionStatus::Scheduling)
        .bind(stale_before);
    let reclaimed = query.fetch_optional(executor).await?;
    Ok(reclaimed)
}
/// Conditionally update an execution, applying `input` only while the row is
/// still in `expected_status` (optimistic concurrency via a status guard).
///
/// Returns the updated row, or `None` when the row is missing or its status
/// no longer matches `expected_status`.
///
/// Fix: an all-`None` `input` previously short-circuited to a plain
/// `find_by_id`, returning the row even when its status did NOT match
/// `expected_status` — inconsistent with the guarded SQL path, which yields
/// `None` on a mismatch. The no-op path now applies the same status filter.
pub async fn update_if_status<'e, E>(
    executor: E,
    id: Id,
    expected_status: ExecutionStatus,
    input: UpdateExecutionInput,
) -> Result<Option<Execution>>
where
    E: Executor<'e, Database = Postgres> + 'e,
{
    // Nothing to write: read the row, but keep the status guard so callers
    // observe identical "None when the status moved on" semantics.
    if input.status.is_none()
        && input.result.is_none()
        && input.executor.is_none()
        && input.worker.is_none()
        && input.started_at.is_none()
        && input.workflow_task.is_none()
    {
        return Ok(Self::find_by_id(executor, id)
            .await?
            .filter(|execution| execution.status == expected_status));
    }
    Self::update_with_locator_optional(executor, input, |query| {
        query.push(" WHERE id = ").push_bind(id);
        query.push(" AND status = ").push_bind(expected_status);
    })
    .await
}
/// Roll an execution back from `Scheduled` to `Requested`, clearing its
/// `worker` and `executor` assignment so it can be scheduled again.
///
/// Guarded on `status = Scheduled`: returns `None` when the row is missing
/// or has already moved to another status.
pub async fn revert_scheduled_to_requested<'e, E>(
    executor: E,
    id: Id,
) -> Result<Option<Execution>>
where
    E: Executor<'e, Database = Postgres> + 'e,
{
    let sql = format!(
        "UPDATE execution \
         SET status = $2, worker = NULL, executor = NULL, updated = NOW() \
         WHERE id = $1 AND status = $3 \
         RETURNING {SELECT_COLUMNS}"
    );
    let reverted = sqlx::query_as::<_, Execution>(&sql)
        .bind(id)
        .bind(ExecutionStatus::Requested)
        .bind(ExecutionStatus::Scheduled)
        .fetch_optional(executor)
        .await?;
    Ok(reverted)
}
async fn update_with_locator<'e, E, F>(
executor: E,
input: UpdateExecutionInput,
@@ -274,6 +408,71 @@ impl ExecutionRepository {
.map_err(Into::into)
}
/// Build and execute a dynamic `UPDATE execution SET …` containing only the
/// fields present in `input`; the caller supplies the `WHERE` clause via
/// `where_clause`. Returns the updated row, or `None` when the locator
/// matched nothing.
///
/// `updated = NOW()` is always written, so the statement is valid even when
/// `input` carries no fields.
///
/// Fixes:
/// - an all-`None` `input` used to render `SET , updated = NOW()` — a
///   leading comma and therefore a SQL syntax error; the separator before
///   `updated` is now emitted only when a field assignment precedes it.
/// - the `workflow_task` branch did not set `has_updates`, a latent
///   separator bug for any assignment added after it.
async fn update_with_locator_optional<'e, E, F>(
    executor: E,
    input: UpdateExecutionInput,
    where_clause: F,
) -> Result<Option<Execution>>
where
    E: Executor<'e, Database = Postgres> + 'e,
    F: FnOnce(&mut QueryBuilder<'_, Postgres>),
{
    let mut query = QueryBuilder::new("UPDATE execution SET ");
    // Tracks whether at least one assignment has been pushed, so each
    // subsequent assignment (and the trailing `updated`) is comma-separated.
    let mut has_updates = false;
    if let Some(status) = input.status {
        query.push("status = ").push_bind(status);
        has_updates = true;
    }
    if let Some(result) = &input.result {
        if has_updates {
            query.push(", ");
        }
        query.push("result = ").push_bind(result);
        has_updates = true;
    }
    if let Some(executor_id) = input.executor {
        if has_updates {
            query.push(", ");
        }
        query.push("executor = ").push_bind(executor_id);
        has_updates = true;
    }
    if let Some(worker_id) = input.worker {
        if has_updates {
            query.push(", ");
        }
        query.push("worker = ").push_bind(worker_id);
        has_updates = true;
    }
    if let Some(started_at) = input.started_at {
        if has_updates {
            query.push(", ");
        }
        query.push("started_at = ").push_bind(started_at);
        has_updates = true;
    }
    if let Some(workflow_task) = &input.workflow_task {
        if has_updates {
            query.push(", ");
        }
        query
            .push("workflow_task = ")
            .push_bind(sqlx::types::Json(workflow_task));
        has_updates = true;
    }
    // Always bump the `updated` timestamp; separate it only when a field
    // assignment came before (an all-`None` input previously produced
    // `SET , updated = NOW()`).
    if has_updates {
        query.push(", ");
    }
    query.push("updated = NOW()");
    where_clause(&mut query);
    query.push(" RETURNING ");
    query.push(SELECT_COLUMNS);
    query
        .build_query_as::<Execution>()
        .fetch_optional(executor)
        .await
        .map_err(Into::into)
}
/// Update an execution using the loaded row's hypertable keys.
///
/// Including both the partition key (`created`) and compression segment key
@@ -356,6 +555,34 @@ impl ExecutionRepository {
.map_err(Into::into)
}
/// Find the oldest execution recorded for a specific workflow task.
///
/// Matches on the `workflow_task` JSON payload: the parent workflow
/// execution id (compared as text), the task name, and the optional task
/// index — `IS NOT DISTINCT FROM` lets a `None` index match a missing/NULL
/// `task_index` key.
pub async fn find_by_workflow_task<'e, E>(
    executor: E,
    workflow_execution_id: Id,
    task_name: &str,
    task_index: Option<i32>,
) -> Result<Option<Execution>>
where
    E: Executor<'e, Database = Postgres> + 'e,
{
    let sql = format!(
        "SELECT {SELECT_COLUMNS} \
         FROM execution \
         WHERE workflow_task->>'workflow_execution' = $1::text \
         AND workflow_task->>'task_name' = $2 \
         AND (workflow_task->>'task_index')::int IS NOT DISTINCT FROM $3 \
         ORDER BY created ASC \
         LIMIT 1"
    );
    let found = sqlx::query_as::<_, Execution>(&sql)
        .bind(workflow_execution_id.to_string())
        .bind(task_name)
        .bind(task_index)
        .fetch_optional(executor)
        .await?;
    Ok(found)
}
/// Find all child executions for a given parent execution ID.
///
/// Returns child executions ordered by creation time (ascending),

View File

@@ -411,6 +411,12 @@ impl WorkflowDefinitionRepository {
pub struct WorkflowExecutionRepository;
/// Result of a create-or-get for a `workflow_execution` row: the row itself
/// plus whether this call inserted it.
#[derive(Debug, Clone)]
pub struct WorkflowExecutionCreateOrGetResult {
    // The existing or newly created workflow execution row.
    pub workflow_execution: WorkflowExecution,
    // `true` when this call inserted the row; `false` when an existing row
    // was found (e.g. after losing an `ON CONFLICT` race) and returned.
    pub created: bool,
}
impl Repository for WorkflowExecutionRepository {
type Entity = WorkflowExecution;
fn table_name() -> &'static str {
@@ -606,6 +612,51 @@ impl Delete for WorkflowExecutionRepository {
}
impl WorkflowExecutionRepository {
/// Insert a `workflow_execution` row for the given parent execution, or
/// fetch the existing one when another caller got there first.
///
/// Race-safe via `ON CONFLICT (execution) DO NOTHING`: when the insert is
/// skipped, `RETURNING` yields no row, so the pre-existing row is re-read
/// and returned with `created: false`.
pub async fn create_or_get_by_execution<'e, E>(
    executor: E,
    input: CreateWorkflowExecutionInput,
) -> Result<WorkflowExecutionCreateOrGetResult>
where
    E: Executor<'e, Database = Postgres> + Copy + 'e,
{
    let insert_sql = "INSERT INTO workflow_execution
        (execution, workflow_def, task_graph, variables, status)
        VALUES ($1, $2, $3, $4, $5)
        ON CONFLICT (execution) DO NOTHING
        RETURNING id, execution, workflow_def, current_tasks, completed_tasks, failed_tasks, skipped_tasks,
        variables, task_graph, status, error_message, paused, pause_reason, created, updated";
    let inserted = sqlx::query_as::<_, WorkflowExecution>(insert_sql)
        .bind(input.execution)
        .bind(input.workflow_def)
        .bind(&input.task_graph)
        .bind(&input.variables)
        .bind(input.status)
        .fetch_optional(executor)
        .await?;
    match inserted {
        Some(workflow_execution) => Ok(WorkflowExecutionCreateOrGetResult {
            workflow_execution,
            created: true,
        }),
        None => {
            // Lost the insert race: a row for this parent execution must
            // already exist, so read it back.
            let workflow_execution = Self::find_by_execution(executor, input.execution)
                .await?
                .ok_or_else(|| {
                    anyhow::anyhow!(
                        "workflow_execution for parent execution {} disappeared after conflict",
                        input.execution
                    )
                })?;
            Ok(WorkflowExecutionCreateOrGetResult {
                workflow_execution,
                created: false,
            })
        }
    }
}
/// Find workflow execution by the parent execution ID
pub async fn find_by_execution<'e, E>(
executor: E,

View File

@@ -1153,3 +1153,108 @@ async fn test_execution_result_json() {
assert_eq!(updated.result, Some(complex_result));
}
#[tokio::test]
#[ignore = "integration test — requires database"]
async fn test_claim_for_scheduling_succeeds_once() {
    // claim_for_scheduling flips Requested -> Scheduling exactly once; a
    // second claim on the same row must come back empty.
    let pool = create_test_pool().await.unwrap();
    let pack = PackFixture::new_unique("claim_pack")
        .create(&pool)
        .await
        .unwrap();
    let action = ActionFixture::new_unique(pack.id, &pack.r#ref, "claim_action")
        .create(&pool)
        .await
        .unwrap();
    let input = CreateExecutionInput {
        action: Some(action.id),
        action_ref: action.r#ref.clone(),
        config: None,
        env_vars: None,
        parent: None,
        enforcement: None,
        executor: None,
        worker: None,
        status: ExecutionStatus::Requested,
        result: None,
        workflow_task: None,
    };
    let execution = ExecutionRepository::create(&pool, input).await.unwrap();

    let winner = ExecutionRepository::claim_for_scheduling(&pool, execution.id, None)
        .await
        .unwrap();
    let loser = ExecutionRepository::claim_for_scheduling(&pool, execution.id, None)
        .await
        .unwrap();

    assert_eq!(winner.unwrap().status, ExecutionStatus::Scheduling);
    assert!(loser.is_none());
}
#[tokio::test]
#[ignore = "integration test — requires database"]
async fn test_update_if_status_only_updates_matching_row() {
    // update_if_status applies changes only while the row is still in the
    // expected status; once the status has moved on, the same guard yields
    // None instead of overwriting.
    let pool = create_test_pool().await.unwrap();
    let pack = PackFixture::new_unique("conditional_pack")
        .create(&pool)
        .await
        .unwrap();
    let action = ActionFixture::new_unique(pack.id, &pack.r#ref, "conditional_action")
        .create(&pool)
        .await
        .unwrap();
    let input = CreateExecutionInput {
        action: Some(action.id),
        action_ref: action.r#ref.clone(),
        config: None,
        env_vars: None,
        parent: None,
        enforcement: None,
        executor: None,
        worker: None,
        status: ExecutionStatus::Scheduling,
        result: None,
        workflow_task: None,
    };
    let execution = ExecutionRepository::create(&pool, input).await.unwrap();

    // Matching guard: Scheduling -> Scheduled goes through.
    let updated = ExecutionRepository::update_if_status(
        &pool,
        execution.id,
        ExecutionStatus::Scheduling,
        UpdateExecutionInput {
            status: Some(ExecutionStatus::Scheduled),
            worker: Some(77),
            ..Default::default()
        },
    )
    .await
    .unwrap();

    // Stale guard: the row is now Scheduled, so expecting Scheduling fails.
    let skipped = ExecutionRepository::update_if_status(
        &pool,
        execution.id,
        ExecutionStatus::Scheduling,
        UpdateExecutionInput {
            status: Some(ExecutionStatus::Failed),
            ..Default::default()
        },
    )
    .await
    .unwrap();

    assert_eq!(updated.unwrap().status, ExecutionStatus::Scheduled);
    assert!(skipped.is_none());
}