[wip] single runtime handling
This commit is contained in:
@@ -21,13 +21,17 @@ use anyhow::Result;
|
||||
use attune_common::{
|
||||
models::{enums::ExecutionStatus, Execution},
|
||||
mq::{
|
||||
Consumer, ExecutionRequestedPayload, ExecutionStatusChangedPayload, MessageEnvelope,
|
||||
MessageType, Publisher,
|
||||
Consumer, ExecutionCancelRequestedPayload, ExecutionRequestedPayload,
|
||||
ExecutionStatusChangedPayload, MessageEnvelope, MessageType, Publisher,
|
||||
},
|
||||
repositories::{
|
||||
execution::{CreateExecutionInput, ExecutionRepository},
|
||||
Create, FindById,
|
||||
execution::{CreateExecutionInput, ExecutionRepository, UpdateExecutionInput},
|
||||
workflow::{
|
||||
UpdateWorkflowExecutionInput, WorkflowDefinitionRepository, WorkflowExecutionRepository,
|
||||
},
|
||||
Create, FindById, Update,
|
||||
},
|
||||
workflow::{CancellationPolicy, WorkflowDefinition},
|
||||
};
|
||||
|
||||
use sqlx::PgPool;
|
||||
@@ -116,8 +120,18 @@ impl ExecutionManager {
|
||||
"Execution {} reached terminal state: {:?}, handling orchestration",
|
||||
execution_id, status
|
||||
);
|
||||
if status == ExecutionStatus::Cancelled {
|
||||
Self::handle_workflow_cancellation(pool, publisher, &execution).await?;
|
||||
}
|
||||
Self::handle_completion(pool, publisher, &execution).await?;
|
||||
}
|
||||
ExecutionStatus::Canceling => {
|
||||
debug!(
|
||||
"Execution {} entered canceling state; checking for workflow child cancellation",
|
||||
execution_id
|
||||
);
|
||||
Self::handle_workflow_cancellation(pool, publisher, &execution).await?;
|
||||
}
|
||||
ExecutionStatus::Running => {
|
||||
debug!(
|
||||
"Execution {} now running (worker has updated DB)",
|
||||
@@ -135,6 +149,202 @@ impl ExecutionManager {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn handle_workflow_cancellation(
|
||||
pool: &PgPool,
|
||||
publisher: &Publisher,
|
||||
execution: &Execution,
|
||||
) -> Result<()> {
|
||||
let Some(_) = WorkflowExecutionRepository::find_by_execution(pool, execution.id).await?
|
||||
else {
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
let policy = Self::resolve_cancellation_policy(pool, execution.id).await;
|
||||
Self::cancel_workflow_children_with_policy(pool, publisher, execution.id, policy).await
|
||||
}
|
||||
|
||||
async fn resolve_cancellation_policy(
|
||||
pool: &PgPool,
|
||||
parent_execution_id: i64,
|
||||
) -> CancellationPolicy {
|
||||
let wf_exec =
|
||||
match WorkflowExecutionRepository::find_by_execution(pool, parent_execution_id).await {
|
||||
Ok(Some(wf)) => wf,
|
||||
_ => return CancellationPolicy::default(),
|
||||
};
|
||||
|
||||
let wf_def =
|
||||
match WorkflowDefinitionRepository::find_by_id(pool, wf_exec.workflow_def).await {
|
||||
Ok(Some(def)) => def,
|
||||
_ => return CancellationPolicy::default(),
|
||||
};
|
||||
|
||||
match serde_json::from_value::<WorkflowDefinition>(wf_def.definition) {
|
||||
Ok(def) => def.cancellation_policy,
|
||||
Err(e) => {
|
||||
warn!(
|
||||
"Failed to deserialize workflow definition for workflow_def {}: {}. Falling back to default cancellation policy.",
|
||||
wf_exec.workflow_def, e
|
||||
);
|
||||
CancellationPolicy::default()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn cancel_workflow_children_with_policy(
|
||||
pool: &PgPool,
|
||||
publisher: &Publisher,
|
||||
parent_execution_id: i64,
|
||||
policy: CancellationPolicy,
|
||||
) -> Result<()> {
|
||||
let children: Vec<Execution> = sqlx::query_as::<_, Execution>(&format!(
|
||||
"SELECT {} FROM execution WHERE parent = $1 AND status NOT IN ('completed', 'failed', 'timeout', 'cancelled', 'abandoned')",
|
||||
attune_common::repositories::execution::SELECT_COLUMNS
|
||||
))
|
||||
.bind(parent_execution_id)
|
||||
.fetch_all(pool)
|
||||
.await?;
|
||||
|
||||
if children.is_empty() {
|
||||
return Self::finalize_cancelled_workflow_if_idle(pool, parent_execution_id).await;
|
||||
}
|
||||
|
||||
info!(
|
||||
"Executor cascading cancellation from workflow execution {} to {} child execution(s) with policy {:?}",
|
||||
parent_execution_id,
|
||||
children.len(),
|
||||
policy,
|
||||
);
|
||||
|
||||
for child in &children {
|
||||
let child_id = child.id;
|
||||
|
||||
if matches!(
|
||||
child.status,
|
||||
ExecutionStatus::Requested
|
||||
| ExecutionStatus::Scheduling
|
||||
| ExecutionStatus::Scheduled
|
||||
) {
|
||||
let update = UpdateExecutionInput {
|
||||
status: Some(ExecutionStatus::Cancelled),
|
||||
result: Some(serde_json::json!({
|
||||
"error": "Cancelled: parent workflow execution was cancelled"
|
||||
})),
|
||||
..Default::default()
|
||||
};
|
||||
ExecutionRepository::update(pool, child_id, update).await?;
|
||||
} else if matches!(
|
||||
child.status,
|
||||
ExecutionStatus::Running | ExecutionStatus::Canceling
|
||||
) {
|
||||
match policy {
|
||||
CancellationPolicy::CancelRunning => {
|
||||
if child.status != ExecutionStatus::Canceling {
|
||||
let update = UpdateExecutionInput {
|
||||
status: Some(ExecutionStatus::Canceling),
|
||||
..Default::default()
|
||||
};
|
||||
ExecutionRepository::update(pool, child_id, update).await?;
|
||||
}
|
||||
|
||||
if let Some(worker_id) = child.executor {
|
||||
Self::send_cancel_to_worker(publisher, child_id, worker_id).await?;
|
||||
} else {
|
||||
warn!(
|
||||
"Workflow child execution {} is {:?} but has no assigned worker",
|
||||
child_id, child.status
|
||||
);
|
||||
}
|
||||
}
|
||||
CancellationPolicy::AllowFinish => {
|
||||
info!(
|
||||
"AllowFinish policy: leaving running workflow child execution {} alone",
|
||||
child_id
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Box::pin(Self::cancel_workflow_children_with_policy(
|
||||
pool, publisher, child_id, policy,
|
||||
))
|
||||
.await?;
|
||||
}
|
||||
|
||||
if let Some(wf_exec) =
|
||||
WorkflowExecutionRepository::find_by_execution(pool, parent_execution_id).await?
|
||||
{
|
||||
if !matches!(
|
||||
wf_exec.status,
|
||||
ExecutionStatus::Completed | ExecutionStatus::Failed | ExecutionStatus::Cancelled
|
||||
) {
|
||||
let wf_update = UpdateWorkflowExecutionInput {
|
||||
status: Some(ExecutionStatus::Cancelled),
|
||||
error_message: Some(
|
||||
"Cancelled: parent workflow execution was cancelled".to_string(),
|
||||
),
|
||||
current_tasks: Some(vec![]),
|
||||
..Default::default()
|
||||
};
|
||||
WorkflowExecutionRepository::update(pool, wf_exec.id, wf_update).await?;
|
||||
}
|
||||
}
|
||||
|
||||
Self::finalize_cancelled_workflow_if_idle(pool, parent_execution_id).await
|
||||
}
|
||||
|
||||
async fn finalize_cancelled_workflow_if_idle(
|
||||
pool: &PgPool,
|
||||
parent_execution_id: i64,
|
||||
) -> Result<()> {
|
||||
let still_running: Vec<Execution> = sqlx::query_as::<_, Execution>(&format!(
|
||||
"SELECT {} FROM execution WHERE parent = $1 AND status IN ('running', 'canceling', 'scheduling', 'scheduled', 'requested')",
|
||||
attune_common::repositories::execution::SELECT_COLUMNS
|
||||
))
|
||||
.bind(parent_execution_id)
|
||||
.fetch_all(pool)
|
||||
.await?;
|
||||
|
||||
if still_running.is_empty() {
|
||||
let update = UpdateExecutionInput {
|
||||
status: Some(ExecutionStatus::Cancelled),
|
||||
result: Some(serde_json::json!({
|
||||
"error": "Workflow cancelled",
|
||||
"succeeded": false,
|
||||
})),
|
||||
..Default::default()
|
||||
};
|
||||
let _ = ExecutionRepository::update(pool, parent_execution_id, update).await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn send_cancel_to_worker(
|
||||
publisher: &Publisher,
|
||||
execution_id: i64,
|
||||
worker_id: i64,
|
||||
) -> Result<()> {
|
||||
let payload = ExecutionCancelRequestedPayload {
|
||||
execution_id,
|
||||
worker_id,
|
||||
};
|
||||
|
||||
let envelope = MessageEnvelope::new(MessageType::ExecutionCancelRequested, payload)
|
||||
.with_source("executor-service")
|
||||
.with_correlation_id(uuid::Uuid::new_v4());
|
||||
|
||||
publisher
|
||||
.publish_envelope_with_routing(
|
||||
&envelope,
|
||||
"attune.executions",
|
||||
&format!("execution.cancel.worker.{}", worker_id),
|
||||
)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Parse execution status from string
|
||||
fn parse_execution_status(status: &str) -> Result<ExecutionStatus> {
|
||||
match status.to_lowercase().as_str() {
|
||||
|
||||
Reference in New Issue
Block a user