Workflow Execution Engine
Overview
The Workflow Execution Engine is responsible for orchestrating the execution of workflows in Attune. It manages task dependencies, parallel execution, state transitions, context passing, retries, timeouts, and error handling.
Architecture
The execution engine consists of four main components:
1. Task Graph Builder (workflow/graph.rs)
Purpose: Converts workflow definitions into executable task graphs with dependency information.
Key Features:
- Builds directed acyclic graph (DAG) from workflow tasks
- Topological sorting for execution order
- Dependency computation from task transitions
- Cycle detection
- Entry point identification
Data Structures:
- TaskGraph: Complete executable graph with nodes, dependencies, and execution order
- TaskNode: Individual task with configuration, transitions, and dependencies
- TaskTransitions: Success/failure/complete/timeout transitions and decision branches
- RetryConfig: Retry configuration with backoff strategies
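The topological sorting and cycle detection features above can be sketched with Kahn's algorithm. The function below is a self-contained illustration over a plain dependency map, not the engine's actual `TaskGraph` API:

```rust
use std::collections::HashMap;

/// Kahn's algorithm over a map of task -> direct dependencies.
/// Returns a topological order, or None when the graph contains a cycle.
fn topo_sort(deps: &HashMap<&str, Vec<&str>>) -> Option<Vec<String>> {
    // indegree = number of unprocessed dependencies per task
    let mut indegree: HashMap<&str, usize> =
        deps.iter().map(|(k, d)| (*k, d.len())).collect();
    let mut order = Vec::new();
    loop {
        let mut ready: Vec<&str> = indegree
            .iter()
            .filter(|(_, d)| **d == 0)
            .map(|(k, _)| *k)
            .collect();
        if ready.is_empty() {
            break;
        }
        ready.sort(); // deterministic order for the example
        for task in ready {
            indegree.remove(task);
            order.push(task.to_string());
            // Every task that depends on `task` loses one pending dependency.
            for (name, d) in deps {
                if d.contains(&task) {
                    if let Some(c) = indegree.get_mut(name) {
                        *c -= 1;
                    }
                }
            }
        }
    }
    // Tasks left with nonzero indegree are part of a cycle.
    if indegree.is_empty() { Some(order) } else { None }
}

fn main() {
    let mut deps = HashMap::new();
    deps.insert("build", vec![]);
    deps.insert("test", vec!["build"]);
    deps.insert("deploy", vec!["test"]);
    assert_eq!(
        topo_sort(&deps),
        Some(vec!["build".to_string(), "test".to_string(), "deploy".to_string()])
    );

    // A cycle shows up as a sort that cannot complete.
    let mut cyclic = HashMap::new();
    cyclic.insert("a", vec!["b"]);
    cyclic.insert("b", vec!["a"]);
    assert_eq!(topo_sort(&cyclic), None);
    println!("topological sort and cycle detection ok");
}
```

Tasks that never reach indegree zero are exactly the ones on (or downstream of) a cycle, which is why the leftover check doubles as cycle detection.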
Example Usage:
use std::collections::HashSet;
use attune_executor::workflow::{TaskGraph, parse_workflow_yaml};
let workflow = parse_workflow_yaml(yaml_content)?;
let graph = TaskGraph::from_workflow(&workflow)?;
// Get entry points (tasks with no dependencies)
for entry in &graph.entry_points {
println!("Entry point: {}", entry);
}
// Get tasks ready to execute
let completed = HashSet::new();
let ready = graph.ready_tasks(&completed);
2. Context Manager (workflow/context.rs)
Purpose: Manages workflow execution context, including variables, parameters, and template rendering.
Key Features:
- Workflow-level and task-level variable management
- Jinja2-like template rendering with {{ variable }} syntax
- Task result storage and retrieval
- With-items iteration support (current item and index)
- Nested value access (e.g., {{ parameters.config.server.port }})
- Context import/export for persistence
Variable Scopes:
- parameters.* - Input parameters to the workflow
- vars.* or variables.* - Workflow-scoped variables
- task.* or tasks.* - Task results
- item - Current item in with-items iteration
- index - Current index in with-items iteration
- system.* - System variables (e.g., workflow start time)
Example Usage:
use std::collections::HashMap;
use attune_executor::workflow::WorkflowContext;
use serde_json::json;
let params = json!({"name": "Alice"});
let mut ctx = WorkflowContext::new(params, HashMap::new());
// Render template
let result = ctx.render_template("Hello {{ parameters.name }}!")?;
// Result: "Hello Alice!"
// Store task result
ctx.set_task_result("task1", json!({"status": "success"}));
// Publish variables from result
let result = json!({"output": "value"});
ctx.publish_from_result(&result, &["my_var".to_string()], None)?;
3. Task Executor (workflow/task_executor.rs)
Purpose: Executes individual workflow tasks with support for different task types, retries, and timeouts.
Key Features:
- Action task execution (queues actions for workers)
- Parallel task execution (spawns multiple tasks concurrently)
- Workflow task execution (nested workflows - TODO)
- With-items iteration (batch processing with concurrency limits)
- Conditional execution (when clauses)
- Retry logic with configurable backoff strategies
- Timeout handling
- Task result publishing to context
Task Types:
- Action: Execute a single action
- Parallel: Execute multiple sub-tasks concurrently
- Workflow: Execute a nested workflow (not yet implemented)
Retry Strategies:
- Constant: Fixed delay between retries
- Linear: Linearly increasing delay
- Exponential: Exponentially increasing delay with optional max delay
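These three strategies reduce to a simple delay formula per retry attempt. The sketch below assumes the usual interpretation (base delay times the attempt number for linear, base times 2^(attempt-1) for exponential); `Backoff` and `retry_delay` are illustrative names, not the engine's `RetryConfig` API:

```rust
/// Hypothetical stand-in for the retry configuration described above.
enum Backoff {
    Constant,
    Linear,
    Exponential { max_delay: Option<u64> },
}

/// Delay in seconds before retry `attempt` (1-based), given the base `delay`.
fn retry_delay(backoff: &Backoff, delay: u64, attempt: u32) -> u64 {
    match backoff {
        Backoff::Constant => delay,
        Backoff::Linear => delay * attempt as u64,
        Backoff::Exponential { max_delay } => {
            let d = delay.saturating_mul(2u64.saturating_pow(attempt.saturating_sub(1)));
            match max_delay {
                Some(max) => d.min(*max),
                None => d,
            }
        }
    }
}

fn main() {
    // Mirrors the exponential example in this document: delay 2, max_delay 60.
    let exp = Backoff::Exponential { max_delay: Some(60) };
    let delays: Vec<u64> = (1..=6).map(|n| retry_delay(&exp, 2, n)).collect();
    assert_eq!(delays, vec![2, 4, 8, 16, 32, 60]); // sixth retry capped at 60s
    assert_eq!(retry_delay(&Backoff::Linear, 5, 3), 15); // 5s, 10s, 15s, ...
    assert_eq!(retry_delay(&Backoff::Constant, 10, 7), 10);
    println!("backoff delays ok");
}
```

The saturating arithmetic avoids overflow for large attempt counts, which matters once exponential growth exceeds u64 range.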
Example Task Execution Flow:
1. Check if task should be skipped (when condition)
2. Check if task has with-items iteration
- If yes, process items in batches with concurrency limits
- If no, execute single task
3. Render task input with context
4. Execute based on task type (action/parallel/workflow)
5. Apply timeout if configured
6. Handle retries on failure
7. Publish variables from result
8. Update task execution record in database
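Step 5 (apply timeout) can be illustrated with std-only primitives. The real executor presumably uses async timeouts, but the semantics are the same: wait for a result up to a deadline, otherwise report a timeout. `run_with_timeout` is a hypothetical helper:

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

/// Run `work` on a background thread and wait for its result with a deadline.
fn run_with_timeout<F>(work: F, timeout: Duration) -> Result<String, &'static str>
where
    F: FnOnce() -> String + Send + 'static,
{
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        // If the receiver already gave up, the send error is simply ignored.
        let _ = tx.send(work());
    });
    rx.recv_timeout(timeout).map_err(|_| "task timed out")
}

fn main() {
    // Fast task completes within the deadline.
    let ok = run_with_timeout(|| "done".to_string(), Duration::from_secs(1));
    assert_eq!(ok, Ok("done".to_string()));

    // Slow task misses the deadline and is reported as timed out.
    let slow = run_with_timeout(
        || {
            thread::sleep(Duration::from_millis(200));
            "late".to_string()
        },
        Duration::from_millis(10),
    );
    assert_eq!(slow, Err("task timed out"));
    println!("timeout handling ok");
}
```

A timed-out task would then flow into step 6 (retry handling) or follow its on_timeout transition.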
4. Workflow Coordinator (workflow/coordinator.rs)
Purpose: Main orchestration component that manages the complete workflow execution lifecycle.
Key Features:
- Workflow lifecycle management (start, pause, resume, cancel)
- State management (completed, failed, skipped tasks)
- Concurrent task execution coordination
- Database state persistence
- Execution result aggregation
- Error handling and recovery
Workflow Execution States:
- Requested - Workflow execution requested
- Scheduling - Being scheduled
- Scheduled - Ready to execute
- Running - Currently executing
- Completed - Successfully completed
- Failed - Failed with errors
- Cancelled - Cancelled by user
- Timeout - Timed out
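These states map naturally onto a Rust enum. The helper below, which distinguishes terminal from in-flight states, is illustrative rather than part of the engine's API:

```rust
/// Mirror of the workflow execution states listed above.
#[derive(Debug, PartialEq)]
enum WorkflowStatus {
    Requested,
    Scheduling,
    Scheduled,
    Running,
    Completed,
    Failed,
    Cancelled,
    Timeout,
}

impl WorkflowStatus {
    /// Terminal states: the workflow will not transition any further.
    fn is_terminal(&self) -> bool {
        matches!(
            self,
            WorkflowStatus::Completed
                | WorkflowStatus::Failed
                | WorkflowStatus::Cancelled
                | WorkflowStatus::Timeout
        )
    }
}

fn main() {
    assert!(WorkflowStatus::Completed.is_terminal());
    assert!(!WorkflowStatus::Running.is_terminal());
    println!("state helpers ok");
}
```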
Example Usage:
use attune_executor::workflow::WorkflowCoordinator;
use serde_json::json;
let coordinator = WorkflowCoordinator::new(db_pool, mq);
// Start workflow execution
let handle = coordinator
.start_workflow("my_pack.my_workflow", json!({"param": "value"}), None)
.await?;
// Execute to completion
let result = handle.execute().await?;
println!("Status: {:?}", result.status);
println!("Completed tasks: {}", result.completed_tasks);
println!("Failed tasks: {}", result.failed_tasks);
// Or control execution
handle.pause(Some("User requested pause".to_string())).await?;
handle.resume().await?;
handle.cancel().await?;
// Check status
let status = handle.status().await;
println!("Current: {}/{} tasks", status.completed_tasks, status.total_tasks);
Execution Flow
High-Level Workflow Execution
1. Load workflow definition from database
2. Parse workflow YAML definition
3. Build task graph with dependencies
4. Create parent execution record
5. Initialize workflow context with parameters and variables
6. Create workflow execution record in database
7. Enter execution loop:
a. Check if workflow is paused -> wait
b. Check if workflow is complete -> exit
c. Get ready tasks (dependencies satisfied)
d. Spawn async execution for each ready task
e. Wait briefly before checking again
8. Aggregate results and return
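Steps 7b-7d reduce to a loop over ready tasks. The synchronous sketch below (a hypothetical `run_all`) drives a dependency map to completion; the real coordinator spawns each ready task asynchronously instead of running it inline:

```rust
use std::collections::{HashMap, HashSet};

/// Synchronous sketch of the execution loop: run every ready task, record it
/// as completed, and repeat until nothing is left or nothing can progress.
fn run_all(deps: &HashMap<&str, Vec<&str>>) -> Vec<String> {
    let mut completed: HashSet<&str> = HashSet::new();
    let mut order = Vec::new();
    while completed.len() < deps.len() {
        let mut ready: Vec<&str> = deps
            .iter()
            .filter(|(name, d)| {
                !completed.contains(*name) && d.iter().all(|p| completed.contains(p))
            })
            .map(|(name, _)| *name)
            .collect();
        if ready.is_empty() {
            break; // a cycle or failed prerequisites: workflow cannot progress
        }
        ready.sort(); // deterministic order for the example
        for task in ready {
            // Here the real engine would execute the task and record its result.
            completed.insert(task);
            order.push(task.to_string());
        }
    }
    order
}

fn main() {
    let mut deps = HashMap::new();
    deps.insert("build", vec![]);
    deps.insert("test", vec!["build"]);
    deps.insert("package", vec!["build"]);
    deps.insert("deploy", vec!["test", "package"]);
    assert_eq!(run_all(&deps), vec!["build", "package", "test", "deploy"]);
    println!("execution loop ok");
}
```

The empty-ready-set break mirrors the "all executable tasks finish" rule: tasks downstream of a failure simply never become ready.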
Task Execution Flow
1. Create task execution record in database
2. Get current workflow context
3. Execute task (action/parallel/workflow/with-items)
4. Update task execution record with result
5. Update workflow state:
- Add to completed_tasks on success
- Add to failed_tasks on failure (unless retrying)
- Add to skipped_tasks if skipped
- Update context with task result
6. Persist workflow state to database
Database Schema
workflow_execution Table
Stores workflow execution state:
CREATE TABLE attune.workflow_execution (
id BIGSERIAL PRIMARY KEY,
execution BIGINT NOT NULL REFERENCES attune.execution(id),
workflow_def BIGINT NOT NULL REFERENCES attune.workflow_definition(id),
current_tasks TEXT[] NOT NULL DEFAULT '{}',
completed_tasks TEXT[] NOT NULL DEFAULT '{}',
failed_tasks TEXT[] NOT NULL DEFAULT '{}',
skipped_tasks TEXT[] NOT NULL DEFAULT '{}',
variables JSONB NOT NULL DEFAULT '{}',
task_graph JSONB NOT NULL,
status execution_status_enum NOT NULL,
error_message TEXT,
paused BOOLEAN NOT NULL DEFAULT false,
pause_reason TEXT,
created TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
updated TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW()
);
workflow_task_execution Table
Stores individual task execution state:
CREATE TABLE attune.workflow_task_execution (
id BIGSERIAL PRIMARY KEY,
workflow_execution BIGINT NOT NULL REFERENCES attune.workflow_execution(id),
execution BIGINT NOT NULL REFERENCES attune.execution(id),
task_name TEXT NOT NULL,
task_index INTEGER,
task_batch INTEGER,
status execution_status_enum NOT NULL,
started_at TIMESTAMP WITH TIME ZONE,
completed_at TIMESTAMP WITH TIME ZONE,
duration_ms BIGINT,
result JSONB,
error JSONB,
retry_count INTEGER NOT NULL DEFAULT 0,
max_retries INTEGER NOT NULL DEFAULT 0,
next_retry_at TIMESTAMP WITH TIME ZONE,
timeout_seconds INTEGER,
timed_out BOOLEAN NOT NULL DEFAULT false,
created TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
updated TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW()
);
Template Rendering
Syntax
Templates use Jinja2-like syntax with {{ expression }}:
tasks:
- name: greet
action: core.echo
input:
message: "Hello {{ parameters.name }}!"
- name: process
action: core.process
input:
data: "{{ task.greet.result.output }}"
count: "{{ variables.counter }}"
Supported Expressions
Parameters:
{{ parameters.name }}
{{ parameters.config.server.port }}
Variables:
{{ vars.my_variable }}
{{ variables.counter }}
{{ my_var }} # Direct variable reference
Task Results:
{{ task.task_name.result }}
{{ task.task_name.output.key }}
{{ tasks.previous_task.status }}
With-Items Context:
{{ item }}
{{ item.name }}
{{ index }}
System Variables:
{{ system.workflow_start }}
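A minimal substitution renderer shows the core idea behind {{ expression }} expansion. The real context manager also handles scopes and nested value access, which this sketch (a hypothetical `render` over a flat map) omits:

```rust
use std::collections::HashMap;

/// Minimal `{{ name }}` substitution over a flat variable map.
fn render(template: &str, vars: &HashMap<&str, &str>) -> String {
    let mut out = String::new();
    let mut rest = template;
    while let Some(start) = rest.find("{{") {
        out.push_str(&rest[..start]);
        let after = &rest[start + 2..];
        match after.find("}}") {
            Some(end) => {
                // Trim so both {{name}} and {{ name }} resolve the same key.
                let key = after[..end].trim();
                out.push_str(vars.get(key).copied().unwrap_or(""));
                rest = &after[end + 2..];
            }
            None => {
                // Unclosed expression: emit the rest verbatim.
                out.push_str(&rest[start..]);
                rest = "";
            }
        }
    }
    out.push_str(rest);
    out
}

fn main() {
    let mut vars = HashMap::new();
    vars.insert("parameters.name", "Alice");
    let s = render("Hello {{ parameters.name }}!", &vars);
    assert_eq!(s, "Hello Alice!");
    println!("{}", s);
}
```

Unknown keys render as empty strings here; whether the engine errors instead is not specified in this document.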
With-Items Iteration
Execute a task multiple times with different items:
tasks:
- name: process_servers
action: server.configure
with_items: "{{ parameters.servers }}"
batch_size: 5 # Process 5 items at a time
concurrency: 10 # Max 10 concurrent executions
input:
server: "{{ item.hostname }}"
index: "{{ index }}"
Features:
- Batch processing: Process items in batches of specified size
- Concurrency control: Limit number of concurrent executions
- Context isolation: Each iteration has its own item and index
- Result aggregation: All results collected in an array
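Batch processing amounts to chunking the item list before dispatch. A minimal sketch, assuming batch_size simply splits items into consecutive chunks:

```rust
/// Split items into consecutive batches of at most `batch_size` items.
fn batches<T: Clone>(items: &[T], batch_size: usize) -> Vec<Vec<T>> {
    // Guard against a zero batch size, which would otherwise panic.
    items.chunks(batch_size.max(1)).map(|c| c.to_vec()).collect()
}

fn main() {
    let servers: Vec<i32> = (1..=12).collect();
    let b = batches(&servers, 5);
    assert_eq!(b.len(), 3); // 5 + 5 + 2 items
    assert_eq!(b[2], vec![11, 12]);
    println!("{} batches", b.len());
}
```

Within each batch, the concurrency setting would then cap how many items execute at once.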
Retry Strategies
Constant Backoff
Fixed delay between retries:
tasks:
- name: flaky_task
action: external.api_call
retry:
count: 3
delay: 10 # 10 seconds between each retry
backoff: constant
Linear Backoff
Linearly increasing delay:
retry:
count: 5
delay: 5
backoff: linear
# Delays: 5s, 10s, 15s, 20s, 25s
Exponential Backoff
Exponentially increasing delay:
retry:
count: 5
delay: 2
backoff: exponential
max_delay: 60
# Delays: 2s, 4s, 8s, 16s, 32s (capped at 60s)
Task Transitions
Control workflow flow with transitions:
tasks:
- name: check
action: core.check_status
on_success: deploy # Go to deploy on success
on_failure: rollback # Go to rollback on failure
on_complete: notify # Always go to notify
on_timeout: alert # Go to alert on timeout
- name: decision
action: core.evaluate
decision:
- when: "{{ task.decision.result.action == 'approve' }}"
next: deploy
- when: "{{ task.decision.result.action == 'reject' }}"
next: rollback
- default: true
next: manual_review
Error Handling
Task Execution Errors
Errors are captured with:
- Error message
- Error type
- Optional error details (JSON)
Workflow Failure Handling
- Individual task failures don't immediately stop the workflow
- Dependent tasks won't execute if prerequisites failed
- Workflow completes when all executable tasks finish
- Final status is Failed if any task failed
Retry on Error
retry:
count: 3
delay: 5
backoff: exponential
on_error: "{{ result.error_code == 'RETRY_ABLE' }}" # Only retry specific errors
Parallel Execution
Execute multiple tasks concurrently:
tasks:
- name: parallel_checks
type: parallel
tasks:
- name: check_service_a
action: monitoring.check_health
input:
service: "service-a"
- name: check_service_b
action: monitoring.check_health
input:
service: "service-b"
- name: check_database
action: monitoring.check_db
on_success: deploy
on_failure: abort
Features:
- All sub-tasks execute concurrently
- Parent task waits for all sub-tasks to complete
- Success only if all sub-tasks succeed
- Individual sub-task results aggregated
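These semantics (run all sub-tasks concurrently, wait for all, succeed only if every one succeeds) can be sketched with std threads; the actual engine uses futures::join_all, but the aggregation logic is the same:

```rust
use std::thread;

/// Run all sub-tasks concurrently; fail the parent if any sub-task fails.
fn run_parallel(tasks: Vec<fn() -> Result<String, String>>) -> Result<Vec<String>, String> {
    let handles: Vec<_> = tasks.into_iter().map(thread::spawn).collect();
    let mut results = Vec::new();
    for h in handles {
        // Joining in spawn order keeps result order stable.
        results.push(h.join().map_err(|_| "sub-task panicked".to_string())??);
    }
    Ok(results)
}

fn main() {
    fn check_a() -> Result<String, String> { Ok("service-a healthy".to_string()) }
    fn check_b() -> Result<String, String> { Ok("service-b healthy".to_string()) }
    fn check_db() -> Result<String, String> { Err("db unreachable".to_string()) }

    assert_eq!(
        run_parallel(vec![check_a, check_b]),
        Ok(vec!["service-a healthy".to_string(), "service-b healthy".to_string()])
    );
    // One failing sub-task fails the parent task.
    assert!(run_parallel(vec![check_a, check_db]).is_err());
    println!("parallel semantics ok");
}
```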
Conditional Execution
Skip tasks based on conditions:
tasks:
- name: deploy
action: deployment.deploy
when: "{{ parameters.environment == 'production' }}"
input:
version: "{{ parameters.version }}"
When Clause Evaluation:
- Template rendered with current context
- Evaluated as boolean (truthy/falsy)
- Task skipped if condition is false
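Truthy/falsy evaluation of the rendered when string might look like the following; the exact set of falsy values is an assumption, not confirmed by this document:

```rust
/// Truthiness of a rendered `when` expression, assuming the common convention
/// that empty strings, "false", "0", "null", and "none" are falsy.
fn is_truthy(rendered: &str) -> bool {
    !matches!(
        rendered.trim().to_ascii_lowercase().as_str(),
        "" | "false" | "0" | "null" | "none"
    )
}

fn main() {
    assert!(is_truthy("true"));
    assert!(!is_truthy("false"));
    assert!(!is_truthy("")); // missing context data renders empty, so task skips
    println!("truthiness ok");
}
```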
State Persistence
Workflow state is persisted to the database after every task completion:
- Current executing tasks
- Completed tasks list
- Failed tasks list
- Skipped tasks list
- Workflow variables (entire context)
- Execution status
- Pause state and reason
- Error messages
This enables:
- Workflow resume after service restart
- Pause/resume functionality
- Execution history and auditing
- Progress monitoring
Integration Points
Message Queue
Tasks queue action executions via RabbitMQ:
// Task executor creates execution record
let execution = create_execution_record(...).await?;
// Queues execution for worker (TODO: implement MQ publishing)
self.mq.publish_execution_request(execution.id, action_ref, &input).await?;
Worker Coordination
- Executor creates execution records
- Workers pick up and execute actions
- Workers update execution status
- Coordinator monitors completion (TODO: implement completion listener)
Event Publishing
Workflow events should be published for:
- Workflow started
- Workflow completed/failed
- Task started/completed/failed
- Workflow paused/resumed/cancelled
Future Enhancements
TODO Items
- Completion Listener: Listen for task completion events from workers
- Nested Workflows: Execute workflows as tasks within workflows
- MQ Publishing: Implement actual message queue publishing for action execution
- Advanced Expressions: Support comparisons, logical operators in templates
- Error Condition Evaluation: Evaluate on_error expressions for selective retries
- Workflow Timeouts: Global workflow timeout configuration
- Task Dependencies: Explicit depends_on task specification
- Loop Constructs: While/until loops in addition to with-items
- Manual Steps: Human-in-the-loop approval tasks
- Sub-workflow Output: Capture and use nested workflow results
Testing
Unit Tests
Each module includes unit tests:
# Run all executor tests
cargo test -p attune-executor
# Run specific module tests
cargo test -p attune-executor --lib workflow::graph
cargo test -p attune-executor --lib workflow::context
Integration Tests
Integration tests require database and message queue:
# Set up test database
export DATABASE_URL="postgresql://attune_test:attune_test@localhost:5432/attune_test"
sqlx migrate run
# Run integration tests
cargo test -p attune-executor --test '*'
Performance Considerations
Concurrency
- Parallel tasks execute truly concurrently using futures::join_all
- With-items supports configurable concurrency limits
- Task graph execution is optimized with topological sorting
Database Operations
- Workflow state persisted after each task completion
- Batch operations used where possible
- Connection pooling for database access
Memory
- Task graphs and contexts can be large for complex workflows
- Consider workflow size limits in production
- Context variables should be reasonably sized
Troubleshooting
Workflow Not Progressing
Symptoms: Workflow stuck in Running state
Causes:
- Circular dependencies (should be caught during parsing)
- All tasks waiting on failed dependencies
- Database connection issues
Solution: Check workflow state in database, review task dependencies
Tasks Not Executing
Symptoms: Ready tasks not starting
Causes:
- Worker service not running
- Message queue not connected
- Execution records not being created
Solution: Check worker logs, verify MQ connection, check database
Template Rendering Errors
Symptoms: Tasks fail with template errors
Causes:
- Invalid variable references
- Missing context data
- Malformed expressions
Solution: Validate templates, check available context variables
Examples
See docs/workflows/ for complete workflow examples demonstrating:
- Sequential workflows
- Parallel execution
- With-items iteration
- Conditional execution
- Error handling and retries
- Complex workflows with decisions