18 KiB
Workflow Orchestration Implementation Plan
Executive Summary
This document outlines the implementation plan for adding workflow orchestration capabilities to Attune. Workflows enable composing multiple actions into complex, conditional execution graphs with variable passing, iteration, and error handling.
Key Design Decisions
1. Workflows as Actions
Workflows are first-class actions that can be:
- Triggered by rules (event-driven)
- Invoked by other workflows (composable)
- Executed directly via API
- Referenced in the same way as regular actions
2. YAML-Based Definition
Workflows are defined declaratively in YAML files within pack directories, making them:
- Version-controllable
- Human-readable
- Easy to author and maintain
- Portable across environments
3. Event-Driven Execution
Workflows leverage the existing message queue infrastructure:
- Each task creates a child execution
- Tasks execute asynchronously via workers
- Progress is tracked via execution status messages
- No blocking or polling required
4. Multi-Scope Variable System
Variables are accessible from 6 scopes (in precedence order):
task.*- Results from completed tasksvars.*- Workflow-scoped variablesparameters.*- Input parameterspack.config.*- Pack configurationsystem.*- System variables (execution_id, timestamp, identity)kv.*- Key-value datastore
Architecture Overview
┌────────────────────────────────────────────────────────────┐
│ Attune Platform │
├────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────┐ ┌────────────┐ ┌──────────────────┐ │
│ │ API Service │ │ Executor │ │ Worker Service │ │
│ │ │ │ Service │ │ │ │
│ │ Workflow │ │ │ │ Runtime Engine │ │
│ │ CRUD │ │ ┌────────┐ │ │ │ │
│ │ │ │ │Workflow│ │ │ Execute Actions │ │
│ │ │ │ │Engine │ │ │ │ │
│ └─────────────┘ │ │ │ │ └──────────────────┘ │
│ │ │- Parser│ │ │
│ │ │- Graph │ │ │
│ │ │- Context│ │ │
│ │ │- Sched │ │ │
│ │ └────────┘ │ │
│ └────────────┘ │
│ │
│ ┌──────────────────────────────────────────────────────┐ │
│ │ PostgreSQL Database │ │
│ │ - workflow_definition │ │
│ │ - workflow_execution │ │
│ │ - workflow_task_execution │ │
│ └──────────────────────────────────────────────────────┘ │
│ │
└────────────────────────────────────────────────────────────┘
Database Schema Changes
New Tables
-
workflow_definition- Stores parsed workflow YAML as JSON
- Links to pack
- Contains parameter/output schemas
- Full task graph definition
-
workflow_execution- Tracks runtime state of workflow
- Stores variable context
- Maintains task completion tracking
- Links to parent execution
-
workflow_task_execution- Individual task execution tracking
- Supports iteration (with-items)
- Retry tracking
- Result storage
Modified Tables
actiontable gets two new columns:is_workflow(boolean)workflow_def(foreign key)
Core Features
1. Sequential Execution
Tasks execute one after another based on transitions:
tasks:
- name: task1
action: pack.action1
on_success: task2
- name: task2
action: pack.action2
2. Parallel Execution
Multiple tasks execute concurrently:
tasks:
- name: parallel_checks
type: parallel
tasks:
- name: check_db
action: db.health
- name: check_cache
action: cache.health
on_success: deploy
3. Conditional Branching
Execute tasks based on conditions:
tasks:
- name: check_env
action: core.noop
decision:
- when: "{{ parameters.env == 'production' }}"
next: require_approval
- default: deploy_directly
4. Iteration (with-items)
Process lists with optional batching:
tasks:
- name: deploy_regions
action: deploy.to_region
with_items: "{{ parameters.regions }}"
batch_size: 5 # Process 5 at a time
input:
region: "{{ item }}"
5. Variable Publishing
Tasks can publish results to workflow scope:
tasks:
- name: create_resource
action: cloud.create
publish:
- resource_id: "{{ task.create_resource.result.id }}"
- resource_url: "{{ task.create_resource.result.url }}"
6. Error Handling & Retry
Built-in retry with backoff:
tasks:
- name: flaky_task
action: http.request
retry:
count: 5
delay: 10
backoff: exponential
on_success: next_task
on_failure: cleanup_task
7. Human-in-the-Loop
Integrate inquiry (approval) steps:
tasks:
- name: require_approval
action: core.inquiry
input:
prompt: "Approve deployment?"
schema:
type: object
properties:
approved:
type: boolean
decision:
- when: "{{ task.require_approval.result.approved }}"
next: deploy
- default: cancel
8. Nested Workflows
Workflows can invoke other workflows:
tasks:
- name: provision_infra
action: infrastructure.full_stack # This is also a workflow
input:
environment: "{{ parameters.env }}"
Template System
Template Engine: Tera (Rust)
- Jinja2-like syntax
- Variable interpolation:
{{ vars.name }} - Filters:
{{ text | upper }} - Conditionals:
{{ value ? 'yes' : 'no' }}
Helper Functions
# String operations
message: "{{ parameters.name | upper | trim }}"
# List operations
first: "{{ vars.list | first }}"
count: "{{ vars.list | length }}"
# JSON operations
parsed: "{{ vars.json_string | from_json }}"
# Batching
batches: "{{ vars.items | batch(size=100) }}"
# Key-value store
value: "{{ kv.get('config.key', default='fallback') }}"
secret: "{{ kv.get_secret('api.token') }}"
Workflow Lifecycle
1. Rule/API triggers workflow action
↓
2. Executor detects is_workflow=true
↓
3. Load workflow_definition from database
↓
4. Create workflow_execution record
↓
5. Initialize variable context with parameters
↓
6. Build task dependency graph
↓
7. Schedule initial tasks (entry points)
↓
8. For each task:
a. Template task inputs
b. Create child execution
c. Create workflow_task_execution record
d. Publish execution.scheduled message
↓
9. Worker executes task, publishes result
↓
10. Workflow Engine receives completion:
a. Update workflow_task_execution
b. Publish variables to context
c. Evaluate transitions
d. Schedule next tasks
↓
11. Repeat until all tasks complete
↓
12. Update workflow_execution status
↓
13. Publish workflow.completed event
Implementation Phases
Phase 1: Foundation (2 weeks)
Goal: Core data structures and parsing
- Database migration for workflow tables
- Add workflow models to
common/src/models.rs - Create workflow repositories
- Implement YAML parser for workflow definitions
- Integrate Tera template engine
- Create variable context manager
Deliverables:
- Migration file:
migrations/020_workflow_orchestration.sql - Models:
common/src/models/workflow.rs - Repositories:
common/src/repositories/workflow*.rs - Parser:
executor/src/workflow/parser.rs - Context:
executor/src/workflow/context.rs
Phase 2: Execution Engine (2 weeks)
Goal: Core workflow execution logic
- Implement task graph builder
- Implement graph traversal logic
- Create workflow executor service
- Add workflow message handlers
- Implement task scheduling
- Handle task completion events
Deliverables:
- Graph engine:
executor/src/workflow/graph.rs - Executor:
executor/src/workflow/executor.rs - Message handlers:
executor/src/workflow/messages.rs - State machine:
executor/src/workflow/state.rs
Phase 3: Advanced Features (2 weeks)
Goal: Iteration, parallelism, error handling
- Implement with-items iteration
- Add batching support
- Implement parallel task execution
- Add retry logic with backoff
- Implement timeout handling
- Add conditional branching (decision trees)
Deliverables:
- Iterator:
executor/src/workflow/iterator.rs - Parallel executor:
executor/src/workflow/parallel.rs - Retry handler:
executor/src/workflow/retry.rs
Phase 4: API & Tools (2 weeks)
Goal: Management interface and tooling
- Workflow CRUD API endpoints
- Workflow execution monitoring API
- Control operations (pause/resume/cancel)
- Workflow validation CLI command
- Workflow visualization endpoint
- Pack registration workflow scanning
Deliverables:
- API routes:
api/src/routes/workflows.rs - API handlers:
api/src/handlers/workflows.rs - CLI commands:
cli/src/commands/workflow.rs(future) - Documentation updates
Phase 5: Testing & Documentation (1 week)
Goal: Comprehensive testing and docs
- Unit tests for all components
- Integration tests for workflows
- Example workflows (simple, complex, failure cases)
- User documentation
- API documentation
- Migration guide
Deliverables:
- Test suite:
executor/tests/workflow_tests.rs - Examples:
docs/examples/workflow-*.yaml - User guide:
docs/workflow-user-guide.md - Migration guide:
docs/workflow-migration.md
Total Timeline: 9 Weeks
Testing Strategy
Unit Tests
- Template rendering with all scope types
- Graph construction and traversal
- Condition evaluation
- Variable publishing
- Task scheduling logic
- Retry logic
- Timeout handling
Integration Tests
- Simple sequential workflow
- Parallel execution workflow
- Conditional branching workflow
- Iteration workflow (with batching)
- Error handling and retry
- Nested workflow execution
- Workflow cancellation
- Long-running workflow
Example Test Workflows
Located in docs/examples/:
simple-workflow.yaml- Basic sequential flowcomplete-workflow.yaml- All features demonstratedparallel-workflow.yaml- Parallel executionconditional-workflow.yaml- Branching logiciteration-workflow.yaml- with-items examples
API Endpoints
Workflow Management
POST /api/v1/packs/{pack_ref}/workflows - Create workflow
GET /api/v1/packs/{pack_ref}/workflows - List workflows in pack
GET /api/v1/workflows - List all workflows
GET /api/v1/workflows/{workflow_ref} - Get workflow definition
PUT /api/v1/workflows/{workflow_ref} - Update workflow
DELETE /api/v1/workflows/{workflow_ref} - Delete workflow
POST /api/v1/workflows/{workflow_ref}/execute - Execute workflow directly
POST /api/v1/workflows/{workflow_ref}/validate - Validate workflow definition
Workflow Execution Management
GET /api/v1/workflow-executions - List workflow executions
GET /api/v1/workflow-executions/{id} - Get workflow execution details
GET /api/v1/workflow-executions/{id}/tasks - List task executions
GET /api/v1/workflow-executions/{id}/graph - Get execution graph (visualization)
GET /api/v1/workflow-executions/{id}/context - Get variable context
POST /api/v1/workflow-executions/{id}/pause - Pause workflow
POST /api/v1/workflow-executions/{id}/resume - Resume paused workflow
POST /api/v1/workflow-executions/{id}/cancel - Cancel workflow
POST /api/v1/workflow-executions/{id}/retry - Retry failed workflow
Pack Structure with Workflows
packs/
└── my_pack/
├── pack.yaml # Pack metadata
├── config.yaml # Pack configuration schema
├── actions/
│ ├── action1.py
│ ├── action2.py
│ └── action.yaml
├── sensors/
│ ├── sensor1.py
│ └── sensor.yaml
├── workflows/ # NEW: Workflow definitions
│ ├── deploy.yaml
│ ├── backup.yaml
│ ├── migrate.yaml
│ └── rollback.yaml
├── rules/
│ └── on_push.yaml
└── tests/
├── test_actions.py
└── test_workflows.yaml # Workflow test definitions
Pack Registration Process
When a pack is registered:
- Scan
workflows/directory for.yamlfiles - Parse and validate each workflow definition
- Create
workflow_definitionrecord in database - Create synthetic
actionrecord withis_workflow=true - Link action to workflow via
workflow_defforeign key - Workflow is now invokable like any other action
Performance Considerations
Optimizations
- Graph Caching: Cache parsed task graphs per workflow definition
- Template Compilation: Compile templates once, reuse for iterations
- Parallel Scheduling: Schedule independent tasks concurrently
- Database Batching: Batch task creation/updates when using with-items
- Context Serialization: Use efficient JSON serialization for variable context
Resource Limits
- Max workflow depth: 10 levels (prevent infinite recursion)
- Max tasks per workflow: 1000 (prevent resource exhaustion)
- Max iterations per with-items: 10,000 (configurable)
- Max parallel tasks: 100 (configurable)
- Variable context size: 10MB (prevent memory issues)
Security Considerations
- Template Injection: Sanitize all template inputs, no arbitrary code execution
- Variable Scoping: Strict isolation between workflow executions
- Secret Access: Only allow
kv.get_secret()for authorized identities - Resource Limits: Enforce max task count, depth, iterations
- Audit Trail: Log all workflow decisions, transitions, variable changes
- RBAC: Workflow execution requires action execution permissions
- Input Validation: Validate parameters against param_schema
Monitoring & Observability
Metrics to Track
- Workflow executions per second
- Average workflow duration
- Task execution duration (p50, p95, p99)
- Workflow success/failure rates
- Task retry counts
- Queue depth for workflow tasks
- Variable context size distribution
Logging Standards
INFO [workflow.start] execution=123 workflow=deploy_app version=1.0.0
INFO [workflow.task.schedule] execution=123 task=build_image
INFO [workflow.task.complete] execution=123 task=build_image duration=45s
INFO [workflow.vars.publish] execution=123 vars=["image_uri"]
INFO [workflow.task.schedule] execution=123 tasks=["deploy","health_check"]
WARN [workflow.task.retry] execution=123 task=flaky_api attempt=2
ERROR [workflow.task.failed] execution=123 task=deploy_db error="connection_timeout"
INFO [workflow.complete] execution=123 status=success duration=2m30s
Distributed Tracing
- Propagate
trace_idthrough entire workflow - Link all task executions to parent workflow
- Enable end-to-end request tracing
- Integration with OpenTelemetry (future)
Dependencies
New Rust Crates
- tera (^1.19) - Template engine
- petgraph (^0.6) - Graph data structures and algorithms
Existing Dependencies
- sqlx - Database access
- serde/serde_json - Serialization
- tokio - Async runtime
- lapin - RabbitMQ client
Future Enhancements
Short Term (3-6 months)
- Workflow versioning (multiple versions of same workflow)
- Workflow pausing/resuming with state persistence
- Advanced retry strategies (circuit breaker, adaptive)
- Workflow templates (reusable patterns)
Medium Term (6-12 months)
- Dynamic workflows (generate graph at runtime)
- Workflow debugging tools (step-through execution)
- Performance analytics and optimization suggestions
- Workflow marketplace (share workflows)
Long Term (12+ months)
- Visual workflow editor (drag-and-drop UI)
- AI-powered workflow generation
- Workflow optimization recommendations
- Multi-cloud orchestration patterns
Success Criteria
This implementation will be considered successful when:
- ✅ Workflows can be defined in YAML and registered via packs
- ✅ Workflows execute reliably with all features working
- ✅ Variables are properly scoped and templated across all scopes
- ✅ Parallel execution works correctly with proper synchronization
- ✅ Iteration handles lists efficiently with batching
- ✅ Error handling and retry work as specified
- ✅ Human-in-the-loop (inquiry) tasks integrate seamlessly
- ✅ Nested workflows execute correctly
- ✅ API provides full CRUD and control operations
- ✅ Comprehensive tests cover all features
- ✅ Documentation enables users to create workflows easily
References
- Full design:
docs/workflow-orchestration.md - Simple example:
docs/examples/simple-workflow.yaml - Complex example:
docs/examples/complete-workflow.yaml - Migration SQL:
docs/examples/workflow-migration.sql
Next Steps
- Review this plan with stakeholders
- Prioritize features if timeline needs adjustment
- Set up project tracking (GitHub issues/milestones)
- Begin Phase 1 implementation
- Schedule weekly progress reviews