proper sql filtering
This commit is contained in:
@@ -66,6 +66,53 @@ fn extract_workflow_params(config: &Option<JsonValue>) -> JsonValue {
|
||||
}
|
||||
}
|
||||
|
||||
/// Apply default values from a workflow's `param_schema` to the provided
|
||||
/// parameters.
|
||||
///
|
||||
/// The param_schema uses the flat format where each key maps to an object
|
||||
/// that may contain a `"default"` field:
|
||||
///
|
||||
/// ```json
|
||||
/// { "n": { "type": "integer", "default": 10 } }
|
||||
/// ```
|
||||
///
|
||||
/// Any parameter that has a default in the schema but is missing (or `null`)
|
||||
/// in the supplied `params` will be filled in. Parameters already provided
|
||||
/// by the caller are never overwritten.
|
||||
fn apply_param_defaults(params: JsonValue, param_schema: &Option<JsonValue>) -> JsonValue {
|
||||
let schema = match param_schema {
|
||||
Some(s) if s.is_object() => s,
|
||||
_ => return params,
|
||||
};
|
||||
|
||||
let mut obj = match params {
|
||||
JsonValue::Object(m) => m,
|
||||
_ => return params,
|
||||
};
|
||||
|
||||
if let Some(schema_obj) = schema.as_object() {
|
||||
for (key, prop) in schema_obj {
|
||||
// Only fill in missing / null parameters
|
||||
let needs_default = match obj.get(key) {
|
||||
None => true,
|
||||
Some(JsonValue::Null) => true,
|
||||
_ => false,
|
||||
};
|
||||
if needs_default {
|
||||
if let Some(default_val) = prop.get("default") {
|
||||
debug!(
|
||||
"Applying default for parameter '{}': {}",
|
||||
key, default_val
|
||||
);
|
||||
obj.insert(key.clone(), default_val.clone());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
JsonValue::Object(obj)
|
||||
}
|
||||
|
||||
/// Payload for execution scheduled messages
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
struct ExecutionScheduledPayload {
|
||||
@@ -316,7 +363,10 @@ impl ExecutionScheduler {
|
||||
|
||||
// Build initial workflow context from execution parameters and
|
||||
// workflow-level vars so that entry-point task inputs are rendered.
|
||||
// Apply defaults from the workflow's param_schema for any parameters
|
||||
// that were not supplied by the caller.
|
||||
let workflow_params = extract_workflow_params(&execution.config);
|
||||
let workflow_params = apply_param_defaults(workflow_params, &workflow_def.param_schema);
|
||||
let wf_ctx = WorkflowContext::new(
|
||||
workflow_params,
|
||||
definition
|
||||
@@ -563,7 +613,7 @@ impl ExecutionScheduler {
|
||||
};
|
||||
|
||||
let total = items.len();
|
||||
let concurrency_limit = task_node.concurrency.unwrap_or(total);
|
||||
let concurrency_limit = task_node.concurrency.unwrap_or(1);
|
||||
let dispatch_count = total.min(concurrency_limit);
|
||||
|
||||
info!(
|
||||
@@ -842,6 +892,18 @@ impl ExecutionScheduler {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// Load the workflow definition so we can apply param_schema defaults
|
||||
let workflow_def =
|
||||
WorkflowDefinitionRepository::find_by_id(pool, workflow_execution.workflow_def)
|
||||
.await?
|
||||
.ok_or_else(|| {
|
||||
anyhow::anyhow!(
|
||||
"Workflow definition {} not found for workflow_execution {}",
|
||||
workflow_execution.workflow_def,
|
||||
workflow_execution_id
|
||||
)
|
||||
})?;
|
||||
|
||||
// Rebuild the task graph from the stored JSON
|
||||
let graph: TaskGraph = serde_json::from_value(workflow_execution.task_graph.clone())
|
||||
.map_err(|e| {
|
||||
@@ -897,7 +959,7 @@ impl ExecutionScheduler {
|
||||
let concurrency_limit = graph
|
||||
.get_task(task_name)
|
||||
.and_then(|n| n.concurrency)
|
||||
.unwrap_or(usize::MAX);
|
||||
.unwrap_or(1);
|
||||
|
||||
let free_slots =
|
||||
concurrency_limit.saturating_sub(in_flight_count.0 as usize);
|
||||
@@ -995,6 +1057,7 @@ impl ExecutionScheduler {
|
||||
// results so that successor task inputs can be rendered.
|
||||
// -----------------------------------------------------------------
|
||||
let workflow_params = extract_workflow_params(&parent_execution.config);
|
||||
let workflow_params = apply_param_defaults(workflow_params, &workflow_def.param_schema);
|
||||
|
||||
// Collect results from all completed children of this workflow
|
||||
let child_executions =
|
||||
@@ -1619,11 +1682,11 @@ mod tests {
|
||||
let dispatch_count = total.min(concurrency_limit);
|
||||
assert_eq!(dispatch_count, 3);
|
||||
|
||||
// No concurrency limit → dispatch all
|
||||
// No concurrency limit → default to serial (1 at a time)
|
||||
let concurrency: Option<usize> = None;
|
||||
let concurrency_limit = concurrency.unwrap_or(total);
|
||||
let concurrency_limit = concurrency.unwrap_or(1);
|
||||
let dispatch_count = total.min(concurrency_limit);
|
||||
assert_eq!(dispatch_count, 20);
|
||||
assert_eq!(dispatch_count, 1);
|
||||
|
||||
// Concurrency exceeds total → dispatch all
|
||||
let concurrency: Option<usize> = Some(50);
|
||||
|
||||
Reference in New Issue
Block a user