working on workflows

This commit is contained in:
2026-03-04 22:02:34 -06:00
parent b54aa3ec26
commit 7438f92502
63 changed files with 10231 additions and 731 deletions

View File

@@ -42,27 +42,12 @@ use crate::workflow::graph::TaskGraph;
/// Extract workflow parameters from an execution's `config` field.
///
/// All executions store config in flat format: `{"n": 5, ...}`.
/// The config object itself IS the parameters map.
///
/// Returns an empty JSON object when the config is absent or is not a
/// JSON object (e.g. a bare string or number), so callers always get a
/// map they can merge defaults into.
fn extract_workflow_params(config: &Option<JsonValue>) -> JsonValue {
    match config {
        // Flat format — the config object itself is the parameters map.
        Some(c) if c.is_object() => c.clone(),
        // Missing config, or a non-object value: no parameters.
        _ => serde_json::json!({}),
    }
}
@@ -100,10 +85,7 @@ fn apply_param_defaults(params: JsonValue, param_schema: &Option<JsonValue>) ->
};
if needs_default {
if let Some(default_val) = prop.get("default") {
debug!(
"Applying default for parameter '{}': {}",
key, default_val
);
debug!("Applying default for parameter '{}': {}", key, default_val);
obj.insert(key.clone(), default_val.clone());
}
}
@@ -234,8 +216,25 @@ impl ExecutionScheduler {
worker.id, execution_id
);
// Apply parameter defaults from the action's param_schema.
// This mirrors what `process_workflow_execution` does for workflows
// so that non-workflow executions also get missing parameters filled
// in from the action's declared defaults.
let execution_config = {
let raw_config = execution.config.clone();
let params = extract_workflow_params(&raw_config);
let params_with_defaults = apply_param_defaults(params, &action.param_schema);
// Config is already flat — just use the defaults-applied version
if params_with_defaults.is_object()
&& !params_with_defaults.as_object().unwrap().is_empty()
{
Some(params_with_defaults)
} else {
raw_config
}
};
// Update execution status to scheduled
let execution_config = execution.config.clone();
let mut execution_for_update = execution;
execution_for_update.status = ExecutionStatus::Scheduled;
ExecutionRepository::update(pool, execution_for_update.id, execution_for_update.into())
@@ -391,6 +390,7 @@ impl ExecutionScheduler {
&workflow_execution.id,
task_node,
&wf_ctx,
None, // entry-point task — no predecessor
)
.await?;
} else {
@@ -407,6 +407,10 @@ impl ExecutionScheduler {
/// Create a child execution for a single workflow task and dispatch it to
/// a worker. The child execution references the parent workflow execution
/// via `workflow_task` metadata.
///
/// `triggered_by` is the name of the predecessor task whose completion
/// caused this task to be scheduled. Pass `None` for entry-point tasks
/// dispatched at workflow start.
async fn dispatch_workflow_task(
pool: &PgPool,
publisher: &Publisher,
@@ -415,6 +419,7 @@ impl ExecutionScheduler {
workflow_execution_id: &i64,
task_node: &crate::workflow::graph::TaskNode,
wf_ctx: &WorkflowContext,
triggered_by: Option<&str>,
) -> Result<()> {
let action_ref: String = match &task_node.action {
Some(a) => a.clone(),
@@ -461,6 +466,7 @@ impl ExecutionScheduler {
&action_ref,
with_items_expr,
wf_ctx,
triggered_by,
)
.await;
}
@@ -484,12 +490,12 @@ impl ExecutionScheduler {
task_node.input.clone()
};
// Build task config from the (rendered) input
// Build task config from the (rendered) input.
// Store as flat parameters (consistent with manual and rule-triggered
// executions) — no {"parameters": ...} wrapper.
let task_config: Option<JsonValue> =
if rendered_input.is_object() && !rendered_input.as_object().unwrap().is_empty() {
Some(serde_json::json!({
"parameters": rendered_input
}))
Some(rendered_input.clone())
} else if let Some(parent_config) = &parent_execution.config {
Some(parent_config.clone())
} else {
@@ -500,6 +506,7 @@ impl ExecutionScheduler {
let workflow_task = WorkflowTaskMetadata {
workflow_execution: *workflow_execution_id,
task_name: task_node.name.clone(),
triggered_by: triggered_by.map(String::from),
task_index: None,
task_batch: None,
retry_count: 0,
@@ -587,6 +594,7 @@ impl ExecutionScheduler {
action_ref: &str,
with_items_expr: &str,
wf_ctx: &WorkflowContext,
triggered_by: Option<&str>,
) -> Result<()> {
// Resolve the with_items expression to a JSON array
let items_value = wf_ctx
@@ -647,9 +655,11 @@ impl ExecutionScheduler {
task_node.input.clone()
};
// Store as flat parameters (consistent with manual and rule-triggered
// executions) — no {"parameters": ...} wrapper.
let task_config: Option<JsonValue> =
if rendered_input.is_object() && !rendered_input.as_object().unwrap().is_empty() {
Some(serde_json::json!({ "parameters": rendered_input }))
Some(rendered_input.clone())
} else if let Some(parent_config) = &parent_execution.config {
Some(parent_config.clone())
} else {
@@ -659,6 +669,7 @@ impl ExecutionScheduler {
let workflow_task = WorkflowTaskMetadata {
workflow_execution: *workflow_execution_id,
task_name: task_node.name.clone(),
triggered_by: triggered_by.map(String::from),
task_index: Some(index as i32),
task_batch: None,
retry_count: 0,
@@ -961,8 +972,7 @@ impl ExecutionScheduler {
.and_then(|n| n.concurrency)
.unwrap_or(1);
let free_slots =
concurrency_limit.saturating_sub(in_flight_count.0 as usize);
let free_slots = concurrency_limit.saturating_sub(in_flight_count.0 as usize);
if free_slots > 0 {
if let Err(e) = Self::publish_pending_with_items_children(
@@ -1009,6 +1019,39 @@ impl ExecutionScheduler {
return Ok(());
}
// ---------------------------------------------------------
// Race-condition guard: when multiple with_items children
// complete nearly simultaneously, the worker updates their
// DB status to Completed *before* the completion MQ message
// is processed. This means several advance_workflow calls
// (processed sequentially by the completion listener) can
// each see "0 siblings remaining" and fall through to
// transition evaluation, dispatching successor tasks
// multiple times.
//
// To prevent this we re-check the *persisted*
// completed/failed task lists that were loaded from the
// workflow_execution record at the top of this function.
// If `task_name` is already present, a previous
// advance_workflow invocation already handled the final
// completion of this with_items task and dispatched its
// successors — we can safely return.
// ---------------------------------------------------------
if workflow_execution
.completed_tasks
.contains(&task_name.to_string())
|| workflow_execution
.failed_tasks
.contains(&task_name.to_string())
{
debug!(
"with_items task '{}' already in persisted completed/failed list — \
another advance_workflow call already handled final completion, skipping",
task_name,
);
return Ok(());
}
// All items done — check if any failed
let any_failed: Vec<(i64,)> = sqlx::query_as(
"SELECT id \
@@ -1129,10 +1172,10 @@ impl ExecutionScheduler {
if should_fire {
// Process publish directives from this transition
if !transition.publish.is_empty() {
let publish_map: HashMap<String, String> = transition
let publish_map: HashMap<String, JsonValue> = transition
.publish
.iter()
.map(|p| (p.name.clone(), p.expression.clone()))
.map(|p| (p.name.clone(), p.value.clone()))
.collect();
if let Err(e) = wf_ctx.publish_from_result(
&serde_json::json!({}),
@@ -1161,6 +1204,41 @@ impl ExecutionScheduler {
continue;
}
// Guard against dispatching a task that has already
// been dispatched (exists as a child execution in
// this workflow). This catches edge cases where
// the persisted completed/failed lists haven't been
// updated yet but a child execution was already
// created by a prior advance_workflow call.
//
// This is critical for with_items predecessors:
// workers update DB status to Completed before the
// completion MQ message is processed, so multiple
// with_items items can each see "0 siblings
// remaining" and attempt to dispatch the same
// successor. The query covers both regular tasks
// (task_index IS NULL) and with_items tasks
// (task_index IS NOT NULL) so that neither case
// can be double-dispatched.
let already_dispatched: (i64,) = sqlx::query_as(
"SELECT COUNT(*) \
FROM execution \
WHERE workflow_task->>'workflow_execution' = $1::text \
AND workflow_task->>'task_name' = $2",
)
.bind(workflow_execution_id.to_string())
.bind(next_task_name.as_str())
.fetch_one(pool)
.await?;
if already_dispatched.0 > 0 {
debug!(
"Skipping task '{}' — already dispatched ({} existing execution(s))",
next_task_name, already_dispatched.0
);
continue;
}
// Check join barrier: if the task has a `join` count,
// only schedule it when enough predecessors are done.
if let Some(next_node) = graph.get_task(next_task_name) {
@@ -1210,6 +1288,7 @@ impl ExecutionScheduler {
&workflow_execution_id,
task_node,
&wf_ctx,
Some(task_name), // predecessor that triggered this task
)
.await
{
@@ -1716,19 +1795,8 @@ mod tests {
assert_eq!(free, 0);
}
// NOTE(review): this test pins the legacy wrapped `{"parameters": ...}`
// config format. The rest of this change moves all executions to the
// flat format (the config object itself IS the parameters map), under
// which this assertion can no longer hold — presumably this test is
// meant to be deleted in the final version; confirm it is removed
// rather than left failing.
#[test]
fn test_extract_workflow_params_wrapped_format() {
// Child task executions store config as {"parameters": {...}}
let config = Some(serde_json::json!({
"parameters": {"n": 5, "name": "test"}
}));
let params = extract_workflow_params(&config);
assert_eq!(params, serde_json::json!({"n": 5, "name": "test"}));
}
#[test]
fn test_extract_workflow_params_flat_format() {
    // API manual executions store config as flat {"n": 5, ...};
    // the extractor must return the object unchanged.
    let config = Some(serde_json::json!({"n": 5, "name": "test"}));
    let params = extract_workflow_params(&config);
    assert_eq!(params, serde_json::json!({"n": 5, "name": "test"}));
}
@@ -1742,7 +1810,6 @@ mod tests {
#[test]
fn test_extract_workflow_params_non_object() {
    // Edge case: config is a non-object JSON value — the extractor
    // must fall back to an empty parameters map rather than cloning it.
    let config = Some(serde_json::json!("not an object"));
    let params = extract_workflow_params(&config);
    assert_eq!(params, serde_json::json!({}));
}
@@ -1756,14 +1823,17 @@ mod tests {
}
#[test]
fn test_extract_workflow_params_with_parameters_key() {
    // A "parameters" key is just a regular parameter — not unwrapped.
    // (Under the flat-config format there is no special-casing of a
    // top-level "parameters" key.)
    let config = Some(serde_json::json!({
        "parameters": {"n": 5},
        "context": {"rule": "test"}
    }));
    let params = extract_workflow_params(&config);
    // Returns the whole object as-is — "parameters" is treated as a normal key
    assert_eq!(
        params,
        serde_json::json!({"parameters": {"n": 5}, "context": {"rule": "test"}})
    );
}
}