Workflow Orchestration Design
Overview
This document describes the architecture and implementation plan for workflow orchestration in Attune. Workflows enable the composition of multiple actions into complex, conditional execution graphs with variable passing, iteration, and error handling.
Design Philosophy
Workflows in Attune follow these core principles:
- Workflows are Actions: A workflow is itself an action that can be invoked by rules, other workflows, or directly via API
- YAML-First: Workflow definitions are declarative YAML files stored in packs
- Event-Driven: Workflows execute asynchronously via the same message queue infrastructure as regular actions
- Composable: Workflows can invoke other workflows recursively
- Observable: Each task in a workflow creates an execution record with full traceability
- Recoverable: Failed workflows can be resumed or retried from specific tasks
Architecture Components
1. Workflow Definition Format
Workflows are defined as YAML files in pack directories alongside regular actions:
# packs/my_pack/workflows/deploy_application.yaml
ref: my_pack.deploy_application
label: "Deploy Application Workflow"
description: "Deploys application with health checks and rollback"
version: "1.0.0"

# Input parameters (like action parameters)
parameters:
  app_name:
    type: string
    required: true
    description: "Name of the application to deploy"
  version:
    type: string
    required: true
  environment:
    type: string
    enum: [dev, staging, production]
    default: dev

# Output schema (what this workflow produces)
output:
  type: object
  properties:
    deployment_id:
      type: string
    status:
      type: string
    deployed_version:
      type: string

# Workflow variables (scoped to this workflow execution)
vars:
  deployment_id: null
  health_check_url: null
  rollback_required: false

# Task graph definition
tasks:
  # Task 1: Create deployment record
  - name: create_deployment
    action: my_pack.create_deployment_record
    input:
      app_name: "{{ parameters.app_name }}"
      version: "{{ parameters.version }}"
      environment: "{{ parameters.environment }}"
    publish:
      - deployment_id: "{{ task.create_deployment.result.id }}"
      - health_check_url: "{{ task.create_deployment.result.health_url }}"
    on_success: build_image
    on_failure: notify_failure

  # Task 2: Build container image
  - name: build_image
    action: docker.build_and_push
    input:
      app_name: "{{ parameters.app_name }}"
      version: "{{ parameters.version }}"
      registry: "{{ pack.config.docker_registry }}"
    on_success: deploy_containers
    on_failure: cleanup_deployment

  # Task 3: Deploy containers
  - name: deploy_containers
    action: kubernetes.apply_deployment
    input:
      app_name: "{{ parameters.app_name }}"
      image: "{{ task.build_image.result.image_uri }}"
      replicas: 3
    on_success: wait_for_ready
    on_failure: rollback_deployment

  # Task 4: Wait for pods to be ready
  - name: wait_for_ready
    action: kubernetes.wait_for_ready
    input:
      deployment: "{{ parameters.app_name }}"
      timeout: 300
    retry:
      count: 3
      delay: 10
    on_success: health_check
    on_failure: rollback_deployment

  # Task 5: Perform health check
  - name: health_check
    action: http.get
    input:
      url: "{{ vars.health_check_url }}"
      expected_status: 200
    on_success: update_deployment_status
    on_failure: rollback_deployment

  # Task 6: Update deployment as successful
  - name: update_deployment_status
    action: my_pack.update_deployment_status
    input:
      deployment_id: "{{ vars.deployment_id }}"
      status: "success"
    on_success: notify_success

  # Task 7: Rollback on failure
  - name: rollback_deployment
    action: kubernetes.rollback_deployment
    input:
      app_name: "{{ parameters.app_name }}"
    publish:
      - rollback_required: true
    on_complete: cleanup_deployment

  # Task 8: Cleanup resources
  - name: cleanup_deployment
    action: my_pack.cleanup_resources
    input:
      deployment_id: "{{ vars.deployment_id }}"
      rollback: "{{ vars.rollback_required }}"
    on_complete: notify_failure

  # Task 9: Success notification
  - name: notify_success
    action: slack.post_message
    input:
      channel: "#deployments"
      message: "✅ Deployed {{ parameters.app_name }} v{{ parameters.version }} to {{ parameters.environment }}"

  # Task 10: Failure notification
  - name: notify_failure
    action: slack.post_message
    input:
      channel: "#deployments"
      message: "❌ Failed to deploy {{ parameters.app_name }} v{{ parameters.version }}"

# Workflow output mapping
output_map:
  deployment_id: "{{ vars.deployment_id }}"
  status: "{{ task.update_deployment_status.result.status }}"
  deployed_version: "{{ parameters.version }}"
2. Advanced Workflow Features
2.1 Parallel Execution
Execute multiple tasks concurrently:
tasks:
  - name: parallel_checks
    type: parallel
    tasks:
      - name: check_database
        action: postgres.health_check
      - name: check_redis
        action: redis.ping
      - name: check_rabbitmq
        action: rabbitmq.cluster_status
    # All parallel tasks must complete before proceeding
    on_success: deploy_app
    on_failure: abort_deployment
2.2 Iteration (with-items)
Iterate over lists with optional batching:
tasks:
  # Without batch_size: process items individually (one execution per item)
  - name: deploy_to_regions
    action: cloud.deploy_instance
    with_items: "{{ parameters.regions }}"
    input:
      # item is the individual region value
      region: "{{ item }}"
      instance_type: "{{ parameters.instance_type }}"
    # Creates one execution per item
    on_success: verify_deployments

  # With batch_size: items split into batches (batch processing)
  - name: process_large_dataset
    action: data.transform
    with_items: "{{ vars.records }}"
    batch_size: 100   # Process 100 items at a time
    concurrency: 5    # Process up to 5 batches concurrently
    input:
      # item is an array containing up to 100 records
      # Last batch may contain fewer items
      records: "{{ item }}"
    publish:
      - total_processed: "{{ task.process_large_dataset.total_count }}"
Batch Processing Behavior:
- Without batch_size: each item is processed individually (one execution per item); the item variable contains a single value and the index variable contains the item index (0-based).
- With batch_size: items are split into batches and processed as arrays; the item variable contains an array of items (the batch) and the index variable contains the batch index (0-based). The last batch may contain fewer items than batch_size.
- Use batch_size for efficient bulk processing when actions support arrays.
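The two modes reduce to a single expansion step, sketched below. `expand_items` is an illustrative helper name rather than part of the Attune codebase, and integers stand in for JSON item values:

```rust
// Hypothetical sketch: expand a resolved with_items list into one payload
// per task execution, with or without batching.
fn expand_items(items: Vec<i64>, batch_size: Option<usize>) -> Vec<Vec<i64>> {
    match batch_size {
        // With batch_size: each execution receives an array of up to `size`
        // items; the last batch may be smaller.
        Some(size) => items.chunks(size).map(|chunk| chunk.to_vec()).collect(),
        // Without batch_size: one execution per individual item.
        None => items.into_iter().map(|item| vec![item]).collect(),
    }
}
```

For 250 items with `batch_size: 100` this yields three executions (100, 100, and 50 items); without `batch_size` it yields 250 single-item executions.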
2.3 Conditional Execution
Execute tasks based on conditions:
tasks:
  - name: check_environment
    action: core.noop
    when: "{{ parameters.environment == 'production' }}"
    on_complete: require_approval

  - name: require_approval
    action: core.inquiry
    input:
      prompt: "Approve production deployment of {{ parameters.app_name }}?"
      schema:
        type: object
        properties:
          approved:
            type: boolean
    # Only proceed if approved
    decision:
      - when: "{{ task.require_approval.result.approved == true }}"
        next: deploy_app
      - default: cancel_deployment
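The decision block uses first-match semantics: branches are evaluated in order, the first branch whose condition holds wins, and a `default` branch matches unconditionally. This can be sketched as follows, assuming the `when` templates have already been evaluated to booleans (`Branch` and `pick_branch` are illustrative names, not engine types):

```rust
// Sketch of decision-branch selection with first-match-wins ordering.
struct Branch {
    when: Option<bool>,  // None marks the `default` branch
    next: &'static str,  // task to transition to
}

fn pick_branch(branches: &[Branch]) -> Option<&'static str> {
    branches.iter().find_map(|b| match b.when {
        Some(true) => Some(b.next),  // first satisfied condition wins
        Some(false) => None,         // condition failed, keep looking
        None => Some(b.next),        // default branch matches unconditionally
    })
}
```

If no branch matches and no default exists, no transition is taken, which the sketch models by returning `None`.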
2.4 Error Handling and Retry
tasks:
  - name: flaky_api_call
    action: http.post
    input:
      url: "{{ vars.api_endpoint }}"
    retry:
      count: 5
      delay: 10              # seconds
      backoff: exponential   # linear, exponential, constant
      max_delay: 60
      on_error: "{{ task.flaky_api_call.error.type == 'timeout' }}"
    on_success: process_response
    on_failure: log_error
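The delay schedule implied by this retry block can be sketched as below; the zero-based attempt numbering and cap-at-max_delay behavior are assumptions for illustration, not confirmed semantics:

```rust
// Sketch of retry delay calculation for the three backoff strategies.
#[derive(Clone, Copy)]
enum Backoff {
    Constant,
    Linear,
    Exponential,
}

fn retry_delay(base: u64, attempt: u32, backoff: Backoff, max_delay: u64) -> u64 {
    let raw = match backoff {
        Backoff::Constant => base,
        // attempt is zero-based: base, 2*base, 3*base, ...
        Backoff::Linear => base * (attempt as u64 + 1),
        // base, 2*base, 4*base, ... capped below by max_delay
        Backoff::Exponential => base.saturating_mul(1u64 << attempt.min(63)),
    };
    raw.min(max_delay)
}
```

With `delay: 10`, `backoff: exponential`, and `max_delay: 60`, successive waits would be 10, 20, 40, 60, 60 seconds.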
2.5 Timeout and Cancellation
tasks:
  - name: long_running_task
    action: ml.train_model
    input:
      dataset: "{{ vars.dataset_path }}"
    timeout: 3600   # 1 hour
    on_timeout: cleanup_and_notify
2.6 Subworkflows
Invoke other workflows:
tasks:
  - name: provision_infrastructure
    action: infrastructure.provision_stack   # This is also a workflow
    input:
      stack_name: "{{ parameters.app_name }}-{{ parameters.environment }}"
      region: "{{ parameters.region }}"
    on_success: deploy_application
Data Model Changes
New Tables
1. workflow_definition Table
CREATE TABLE attune.workflow_definition (
    id BIGSERIAL PRIMARY KEY,
    ref VARCHAR(255) NOT NULL UNIQUE,
    pack BIGINT NOT NULL REFERENCES attune.pack(id) ON DELETE CASCADE,
    pack_ref VARCHAR(255) NOT NULL,
    label VARCHAR(255) NOT NULL,
    description TEXT,
    version VARCHAR(50) NOT NULL,
    -- Workflow specification (parsed YAML)
    param_schema JSONB,
    out_schema JSONB,
    definition JSONB NOT NULL, -- Full workflow definition
    -- Metadata
    tags TEXT[] DEFAULT '{}',
    enabled BOOLEAN DEFAULT true,
    created TIMESTAMPTZ DEFAULT NOW(),
    updated TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX idx_workflow_pack ON attune.workflow_definition(pack);
CREATE INDEX idx_workflow_enabled ON attune.workflow_definition(enabled);
2. workflow_execution Table
Tracks the state of a workflow execution:
CREATE TABLE attune.workflow_execution (
    id BIGSERIAL PRIMARY KEY,
    execution BIGINT NOT NULL REFERENCES attune.execution(id) ON DELETE CASCADE,
    workflow_def BIGINT NOT NULL REFERENCES attune.workflow_definition(id),
    -- Workflow state
    current_tasks TEXT[] DEFAULT '{}', -- Currently executing task names
    completed_tasks TEXT[] DEFAULT '{}',
    failed_tasks TEXT[] DEFAULT '{}',
    -- Variable context (scoped to this workflow)
    variables JSONB DEFAULT '{}',
    -- Graph traversal state
    task_graph JSONB NOT NULL, -- Adjacency list representation
    -- Status tracking
    status attune.execution_status_enum NOT NULL DEFAULT 'requested',
    error_message TEXT,
    created TIMESTAMPTZ DEFAULT NOW(),
    updated TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX idx_workflow_exec_execution ON attune.workflow_execution(execution);
CREATE INDEX idx_workflow_exec_status ON attune.workflow_execution(status);
3. workflow_task_execution Table
Tracks individual task executions within a workflow:
CREATE TABLE attune.workflow_task_execution (
    id BIGSERIAL PRIMARY KEY,
    workflow_execution BIGINT NOT NULL REFERENCES attune.workflow_execution(id) ON DELETE CASCADE,
    execution BIGINT NOT NULL REFERENCES attune.execution(id) ON DELETE CASCADE,
    task_name VARCHAR(255) NOT NULL,
    task_index INTEGER, -- For with-items iterations
    -- Status
    status attune.execution_status_enum NOT NULL DEFAULT 'requested',
    started_at TIMESTAMPTZ,
    completed_at TIMESTAMPTZ,
    -- Results
    result JSONB,
    error JSONB,
    -- Retry tracking
    retry_count INTEGER DEFAULT 0,
    max_retries INTEGER DEFAULT 0,
    created TIMESTAMPTZ DEFAULT NOW(),
    updated TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX idx_wf_task_exec_workflow ON attune.workflow_task_execution(workflow_execution);
CREATE INDEX idx_wf_task_exec_execution ON attune.workflow_task_execution(execution);
CREATE INDEX idx_wf_task_exec_status ON attune.workflow_task_execution(status);
Modified Tables
Update action Table
Add a flag to distinguish workflow actions:
ALTER TABLE attune.action ADD COLUMN is_workflow BOOLEAN DEFAULT false;
ALTER TABLE attune.action ADD COLUMN workflow_def BIGINT REFERENCES attune.workflow_definition(id);
Variable Scoping and Templating
Template Engine
Use a Jinja2-like template engine for variable interpolation; in Rust, the tera crate is the recommended choice.
Variable Scopes
Variables are accessible from multiple scopes in order of precedence:
- task.*: results from completed tasks, e.g. {{ task.task_name.result.field }}, {{ task.task_name.status }}, {{ task.task_name.error }}
- vars.*: workflow-scoped variables, e.g. {{ vars.deployment_id }}, {{ vars.custom_variable }}
- parameters.*: input parameters to the workflow, e.g. {{ parameters.app_name }}, {{ parameters.environment }}
- pack.config.*: pack configuration, e.g. {{ pack.config.api_key }}, {{ pack.config.base_url }}
- system.*: system-level variables, e.g. {{ system.execution_id }}, {{ system.timestamp }}, {{ system.identity.login }}
- kv.*: key-value datastore, e.g. {{ kv.get('global.feature_flags.new_ui') }}
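A minimal sketch of resolving a qualified reference against named scopes is shown below. String values stand in for JSON, and nested paths such as task.x.result.y are left to the template engine; the `resolve` helper is illustrative, not part of the design:

```rust
use std::collections::HashMap;

// Resolve a reference like "vars.deployment_id": split off the scope name,
// then look the key up in that scope's map.
fn resolve<'a>(
    scopes: &'a HashMap<&str, HashMap<String, String>>,
    path: &str,
) -> Option<&'a String> {
    let (scope, key) = path.split_once('.')?;
    scopes.get(scope)?.get(key)
}
```

Because every reference is qualified by its scope name, lookups are unambiguous; precedence only matters if unqualified names were ever allowed.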
Special Variables for Iteration
When using with_items:
- {{ item }}: the current item or batch. Without batch_size it is the individual item value; with batch_size it is an array of items (the batch).
- {{ index }}: a zero-based index. Without batch_size it is the item index; with batch_size it is the batch index.
Example usage:
# Without batch_size - individual items
- name: process_one_by_one
  with_items: "{{ parameters.files }}"
  input:
    file: "{{ item }}"        # Single filename
    position: "{{ index }}"   # Item position

# With batch_size - arrays
- name: process_in_batches
  with_items: "{{ parameters.files }}"
  batch_size: 10
  input:
    files: "{{ item }}"       # Array of up to 10 filenames
    batch_num: "{{ index }}"  # Batch number
Template Helper Functions
# String manipulation
message: "{{ parameters.app_name | upper }}"
path: "{{ vars.base_path | trim | append('/logs') }}"
# JSON manipulation
config: "{{ vars.raw_config | from_json }}"
payload: "{{ vars.data | to_json }}"
# List operations
regions: "{{ parameters.all_regions | filter(enabled=true) }}"
first_region: "{{ parameters.regions | first }}"
count: "{{ vars.results | length }}"
# Batching helper
batches: "{{ vars.large_list | batch(size=100) }}"
# Conditional helpers
status: "{{ task.deploy.status | default('unknown') }}"
url: "{{ vars.host | default('localhost') | prepend('https://') }}"
# Key-value store access
flag: "{{ kv.get('feature.flags.enabled', default=false) }}"
secret: "{{ kv.get_secret('api.credentials.token') }}"
Workflow Execution Engine
Architecture
The workflow execution engine is a new component within the Executor Service:
┌────────────────────────────────────────────────┐
│ Executor Service │
├────────────────────────────────────────────────┤
│ │
│ ┌──────────────────┐ ┌───────────────────┐ │
│ │ Enforcement │ │ Execution │ │
│ │ Processor │ │ Scheduler │ │
│ └──────────────────┘ └───────────────────┘ │
│ │
│ ┌──────────────────────────────────────────┐ │
│ │ Workflow Engine (NEW) │ │
│ ├──────────────────────────────────────────┤ │
│ │ - Workflow Parser │ │
│ │ - Graph Executor │ │
│ │ - Variable Context Manager │ │
│ │ - Task Scheduler │ │
│ │ - State Machine │ │
│ └──────────────────────────────────────────┘ │
│ │
└────────────────────────────────────────────────┘
Workflow Lifecycle
1. Rule triggers workflow action
2. Executor recognizes workflow action (is_workflow = true)
3. Workflow Engine loads workflow definition
4. Create workflow_execution record
5. Initialize variable context with parameters
6. Build task dependency graph
7. Schedule initial tasks (entry points with no dependencies)
8. For each task:
   a. Template task inputs using the current variable context
   b. Create child execution for the action
   c. Publish execution.scheduled message
   d. Create workflow_task_execution record
9. Worker executes task and publishes result
10. Workflow Engine receives execution.completed:
    a. Update workflow_task_execution
    b. Publish variables to the workflow context
    c. Evaluate next tasks based on transitions
    d. Schedule next tasks or complete the workflow
11. Repeat until all tasks complete or the workflow fails
12. Update parent workflow execution status
13. Publish workflow.completed event
State Machine
          ┌──────────────┐
          │  Requested   │
          └──────┬───────┘
                 │
          ┌──────▼───────┐
          │  Scheduling  │
          └──────┬───────┘
                 │
          ┌──────▼───────┐
          │   Running    │◄────┐
          └──┬────┬───┬──┘     │
             │    │   │        │
     ┌───────┘    │   └────┐   │
     │            │        │   │
┌────▼────┐ ┌─────▼─────┐  │   │
│ Paused  │ │  Waiting  │──┼───┤
└────┬────┘ │ (for task)│  │   │
     │      └───────────┘  │   │
     └─────────────────────┼───┘
                           │
       ┌───────────────────┼───────────────────┐
       │                   │                   │
  ┌────▼────┐         ┌────▼────┐        ┌─────▼─────┐
  │Completed│         │ Failed  │        │ Cancelled │
  └─────────┘         └─────────┘        └───────────┘
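The legal transitions in the diagram can be captured in a small table; the exact set (for example, whether a paused workflow can be cancelled directly) is a design assumption, not a settled decision:

```rust
// Sketch of the workflow state machine's transition table.
#[derive(Clone, Copy, PartialEq, Debug)]
enum WfStatus {
    Requested,
    Scheduling,
    Running,
    Paused,
    Waiting,
    Completed,
    Failed,
    Cancelled,
}

fn can_transition(from: WfStatus, to: WfStatus) -> bool {
    use WfStatus::*;
    matches!(
        (from, to),
        (Requested, Scheduling)
            | (Scheduling, Running)
            | (Running, Paused) | (Paused, Running)       // pause / resume
            | (Running, Waiting) | (Waiting, Running)     // waiting for a task
            | (Running, Completed) | (Running, Failed)    // terminal outcomes
            | (Running, Cancelled) | (Paused, Cancelled)  // cancellation
    )
}
```

Guarding every status update with such a check keeps the workflow_execution.status column from ever recording an impossible transition.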
Core Components (Rust Implementation)
1. Workflow Definition Parser
// crates/executor/src/workflow/parser.rs
pub struct WorkflowDefinition {
    pub ref_: String,
    pub label: String,
    pub parameters: JsonSchema,
    pub output: JsonSchema,
    pub vars: HashMap<String, JsonValue>,
    pub tasks: Vec<TaskDefinition>,
    pub output_map: HashMap<String, String>,
}

pub struct TaskDefinition {
    pub name: String,
    pub task_type: TaskType,
    pub action: Option<String>,
    pub input: HashMap<String, String>, // Template strings
    pub publish: Vec<PublishMapping>,
    pub transitions: TaskTransitions,
    pub retry: Option<RetryPolicy>,
    pub timeout: Option<u64>,
    pub when: Option<String>,       // Condition template
    pub with_items: Option<String>, // List template
    pub batch_size: Option<usize>,
}

pub enum TaskType {
    Action,
    Parallel,
    Workflow, // Subworkflow
}

pub struct TaskTransitions {
    pub on_success: Option<String>,
    pub on_failure: Option<String>,
    pub on_complete: Option<String>,
    pub on_timeout: Option<String>,
    pub decision: Vec<DecisionBranch>,
}

pub struct DecisionBranch {
    pub when: Option<String>, // Condition template
    pub next: String,
    pub is_default: bool,
}
2. Variable Context Manager
// crates/executor/src/workflow/context.rs
pub struct WorkflowContext {
    pub execution_id: i64,
    pub parameters: JsonValue,
    pub vars: HashMap<String, JsonValue>,
    pub task_results: HashMap<String, TaskResult>,
    pub pack_config: JsonValue,
    pub system: SystemContext,
}

impl WorkflowContext {
    pub fn render_template(&self, template: &str) -> Result<String> {
        // Use tera template engine
        let mut tera = Tera::default();
        let context = self.to_tera_context();
        Ok(tera.render_str(template, &context)?)
    }

    pub fn publish_variables(&mut self, mappings: &[PublishMapping]) -> Result<()> {
        for mapping in mappings {
            let value = self.render_template(&mapping.template)?;
            // vars holds JSON values, so wrap the rendered string
            self.vars.insert(mapping.var_name.clone(), JsonValue::String(value));
        }
        Ok(())
    }
}
3. Graph Executor
// crates/executor/src/workflow/graph.rs
pub struct TaskGraph {
    nodes: HashMap<String, TaskNode>,
    edges: HashMap<String, Vec<Edge>>,
}

pub struct TaskNode {
    pub name: String,
    pub definition: TaskDefinition,
    pub status: TaskStatus,
}

pub struct Edge {
    pub from: String,
    pub to: String,
    pub condition: EdgeCondition,
}

pub enum EdgeCondition {
    OnSuccess,
    OnFailure,
    OnComplete,
    OnTimeout,
    Decision(String), // Template condition
}

impl TaskGraph {
    pub fn find_next_tasks(&self, completed_task: &str, result: &TaskResult) -> Vec<String> {
        // Traverse graph based on task result and transitions
    }

    pub fn get_ready_tasks(&self) -> Vec<&TaskNode> {
        // Find tasks with all dependencies satisfied
    }
}
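A standalone sketch of the edge-selection logic behind find_next_tasks is shown below, with Decision edges omitted since they require template evaluation. Task names are drawn from the deploy example plus a hypothetical audit_log task to illustrate on_complete:

```rust
// Select outgoing edges based on the completed task's outcome.
#[derive(Clone, Copy)]
enum Outcome {
    Success,
    Failure,
    Timeout,
}

enum Cond {
    OnSuccess,
    OnFailure,
    OnComplete,
    OnTimeout,
}

struct Edge {
    to: &'static str,
    cond: Cond,
}

fn next_tasks(edges: &[Edge], outcome: Outcome) -> Vec<&'static str> {
    edges
        .iter()
        .filter(|e| match e.cond {
            Cond::OnComplete => true, // fires regardless of outcome
            Cond::OnSuccess => matches!(outcome, Outcome::Success),
            Cond::OnFailure => matches!(outcome, Outcome::Failure),
            Cond::OnTimeout => matches!(outcome, Outcome::Timeout),
        })
        .map(|e| e.to)
        .collect()
}
```

Note that a single outcome can legitimately fire multiple edges (e.g. on_failure plus on_complete), which is why the result is a list rather than a single task name.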
4. Workflow Executor
// crates/executor/src/workflow/executor.rs
pub struct WorkflowExecutor {
    pool: PgPool,
    publisher: MessagePublisher,
    template_engine: Tera,
}

impl WorkflowExecutor {
    pub async fn execute_workflow(
        &self,
        execution_id: i64,
        workflow_def: WorkflowDefinition,
        parameters: JsonValue,
    ) -> Result<()> {
        // 1. Create workflow_execution record
        let wf_exec = self.create_workflow_execution(execution_id, &workflow_def).await?;

        // 2. Initialize variable context
        let context = WorkflowContext::new(execution_id, parameters, &workflow_def);

        // 3. Build task graph
        let graph = TaskGraph::from_definition(&workflow_def)?;

        // 4. Schedule initial tasks
        let initial_tasks = graph.get_ready_tasks();
        for task in initial_tasks {
            self.schedule_task(&wf_exec, &task.definition, &context).await?;
        }
        Ok(())
    }

    pub async fn handle_task_completion(
        &self,
        wf_execution_id: i64,
        task_name: String,
        result: TaskResult,
    ) -> Result<()> {
        // 1. Load workflow execution and context
        let wf_exec = self.load_workflow_execution(wf_execution_id).await?;
        let mut context = self.load_context(&wf_exec).await?;

        // 2. Update task result in context
        context.task_results.insert(task_name.clone(), result.clone());

        // 3. Publish variables from task
        let task_def = wf_exec.definition.get_task(&task_name)?;
        context.publish_variables(&task_def.publish)?;

        // 4. Save updated context
        self.save_context(wf_execution_id, &context).await?;

        // 5. Determine next tasks
        let next_tasks = wf_exec.graph.find_next_tasks(&task_name, &result);

        // 6. Schedule next tasks
        for next_task in next_tasks {
            let task_def = wf_exec.definition.get_task(&next_task)?;

            // Evaluate condition if present
            if let Some(when) = &task_def.when {
                let should_run = context.evaluate_condition(when)?;
                if !should_run {
                    continue;
                }
            }
            self.schedule_task(&wf_exec, task_def, &context).await?;
        }

        // 7. Check if workflow is complete
        if self.is_workflow_complete(&wf_exec).await? {
            self.complete_workflow(wf_execution_id, &context).await?;
        }
        Ok(())
    }

    async fn schedule_task(
        &self,
        wf_exec: &WorkflowExecution,
        task: &TaskDefinition,
        context: &WorkflowContext,
    ) -> Result<()> {
        // Handle with_items (iteration)
        let items = if let Some(with_items_template) = &task.with_items {
            let items_json = context.render_template(with_items_template)?;
            serde_json::from_str::<Vec<JsonValue>>(&items_json)?
        } else {
            vec![JsonValue::Null] // Single execution
        };

        // Batch items if batch_size specified; both branches yield Vec<Vec<JsonValue>>
        let batches: Vec<Vec<JsonValue>> = if let Some(batch_size) = task.batch_size {
            items.chunks(batch_size).map(|chunk| chunk.to_vec()).collect()
        } else {
            items.into_iter().map(|item| vec![item]).collect()
        };

        for (batch_idx, batch) in batches.iter().enumerate() {
            for (item_idx, item) in batch.iter().enumerate() {
                // Create item-specific context
                let mut item_context = context.clone();
                item_context.add_item_vars(item, item_idx, batch, batch_idx);

                // Render task inputs
                let rendered_input = self.render_task_input(task, &item_context)?;

                // Create child execution
                let execution = self.create_task_execution(
                    wf_exec.id,
                    task.action.as_deref().expect("action task must reference an action"),
                    rendered_input,
                    wf_exec.execution,
                ).await?;

                // Create workflow_task_execution record
                self.create_workflow_task_execution(
                    wf_exec.id,
                    execution.id,
                    &task.name,
                    item_idx,
                ).await?;

                // Publish execution.scheduled message
                self.publisher.publish_execution_scheduled(execution.id).await?;
            }
        }
        Ok(())
    }
}
Message Flow
New Message Types
// Workflow-specific messages
pub enum WorkflowMessage {
    WorkflowStarted { execution_id: i64, workflow_id: i64 },
    WorkflowCompleted { execution_id: i64, result: JsonValue },
    WorkflowFailed { execution_id: i64, error: String },
    TaskScheduled { workflow_execution_id: i64, task_name: String },
    TaskCompleted { workflow_execution_id: i64, task_name: String, result: TaskResult },
    TaskFailed { workflow_execution_id: i64, task_name: String, error: String },
}
API Endpoints
Workflow Management
POST /api/v1/packs/{pack_ref}/workflows - Create workflow
GET /api/v1/packs/{pack_ref}/workflows - List workflows
GET /api/v1/workflows/{workflow_ref} - Get workflow
PUT /api/v1/workflows/{workflow_ref} - Update workflow
DELETE /api/v1/workflows/{workflow_ref} - Delete workflow
POST /api/v1/workflows/{workflow_ref}/execute - Execute workflow directly
GET /api/v1/workflows/{workflow_ref}/executions - List executions
Workflow Execution Monitoring
GET /api/v1/workflow-executions/{id} - Get workflow execution
GET /api/v1/workflow-executions/{id}/tasks - List task executions
GET /api/v1/workflow-executions/{id}/graph - Get execution graph
POST /api/v1/workflow-executions/{id}/pause - Pause workflow
POST /api/v1/workflow-executions/{id}/resume - Resume workflow
POST /api/v1/workflow-executions/{id}/cancel - Cancel workflow
POST /api/v1/workflow-executions/{id}/retry - Retry failed workflow
Pack Structure with Workflows
packs/
└── my_pack/
    ├── pack.yaml            # Pack metadata
    ├── config.yaml          # Pack configuration schema
    ├── actions/
    │   ├── action1.py
    │   ├── action2.py
    │   └── action.yaml      # Action metadata
    ├── sensors/
    │   ├── sensor1.py
    │   └── sensor.yaml
    ├── workflows/           # NEW
    │   ├── deploy.yaml
    │   ├── backup.yaml
    │   └── migrate.yaml
    ├── rules/
    │   └── on_push.yaml
    └── tests/
        └── test_workflows.yaml
Workflow Pack Registration
When a pack is registered, the system:
- Scans the workflows/ directory for .yaml files
- Parses and validates each workflow definition
- Creates a workflow_definition record
- Creates a synthetic action record with is_workflow = true
- Makes the workflow invokable like any other action
Implementation Plan
Phase 1: Foundation (Week 1-2)
- Database Schema
  - Create migration for workflow tables
  - Add workflow columns to action table
  - Create indexes
- Data Models
  - Add workflow models to common/src/models.rs
  - Create workflow definition parser
  - Create repositories
- Template Engine
  - Integrate tera crate
  - Implement variable context
  - Create helper functions
Phase 2: Core Engine (Week 3-4)
- Graph Engine
  - Implement task graph builder
  - Implement graph traversal logic
  - Handle conditional branching
- Workflow Executor
  - Implement workflow execution logic
  - Handle task scheduling
  - Implement state management
- Message Integration
  - Add workflow message handlers
  - Integrate with executor service
  - Handle task completion events
Phase 3: Advanced Features (Week 5-6)
- Iteration Support
  - Implement with_items logic
  - Add batching support
  - Handle concurrent iterations
- Parallel Execution
  - Implement parallel task type
  - Handle synchronization
  - Aggregate results
- Error Handling
  - Implement retry logic
  - Handle timeouts
  - Implement failure paths
Phase 4: API & Tools (Week 7-8)
- API Endpoints
  - Workflow CRUD operations
  - Execution monitoring
  - Control operations (pause/resume/cancel)
- CLI Support
  - Workflow validation command
  - Workflow execution command
  - Workflow visualization
- Testing & Documentation
  - Unit tests for all components
  - Integration tests
  - Example workflows
  - User documentation
Testing Strategy
Unit Tests
- Template rendering with all scope types
- Graph traversal algorithms
- Condition evaluation
- Variable publishing
- Task scheduling logic
Integration Tests
- End-to-end workflow execution
- Parallel task execution
- Error handling and retry
- Workflow cancellation
- Nested workflow execution
Example Test Workflows
# tests/workflows/simple_sequence.yaml
ref: test.simple_sequence
label: "Simple Sequential Workflow Test"
tasks:
  - name: task1
    action: core.echo
    input:
      message: "Task 1"
    on_success: task2
  - name: task2
    action: core.echo
    input:
      message: "Task 2"
Performance Considerations
- Graph Optimization: Cache parsed workflow graphs
- Template Caching: Cache compiled templates per workflow
- Parallel Scheduling: Schedule independent tasks concurrently
- Database Batching: Batch task creation/updates
- Context Serialization: Use efficient JSON serialization
Security Considerations
- Template Injection: Sanitize template inputs
- Variable Scoping: Strict scope isolation between workflows
- Secret Access: Only allow kv.get_secret() for authorized users
- Resource Limits: Enforce max task count, max depth, max iterations
- Audit Trail: Log all workflow decisions and transitions
Monitoring & Observability
Metrics
- Workflow execution duration
- Task execution duration
- Workflow success/failure rate
- Task retry count
- Queue depth for workflow tasks
Logging
INFO Workflow execution started: id=123, workflow=deploy_app
INFO Task scheduled: workflow=123, task=build_image
INFO Task completed: workflow=123, task=build_image, duration=45s
INFO Publishing variables: deployment_id=456
INFO Scheduling next tasks: [deploy_containers, health_check]
INFO Workflow execution completed: id=123, duration=2m30s
Tracing
- Link all tasks to parent workflow execution
- Propagate trace_id through entire workflow
- Enable distributed tracing visualization
Future Enhancements
- Workflow Versioning: Support multiple versions of same workflow
- Workflow Templates: Reusable workflow patterns
- Dynamic Workflows: Generate workflow graph at runtime
- Workflow Marketplace: Share workflows across organizations
- Workflow Testing Framework: Built-in testing tools
- Workflow Debugger: Step-through execution debugging
- Workflow Visualization: Real-time execution graph UI
- Workflow Analytics: Performance analysis and optimization suggestions
Appendix: Complete Workflow Example
See docs/examples/complete-workflow.yaml for a comprehensive example demonstrating all workflow features.