attune/docs/workflows/workflow-orchestration.md
2026-02-04 17:46:30 -06:00


# Workflow Orchestration Design
## Overview
This document describes the architecture and implementation plan for **workflow orchestration** in Attune. Workflows enable the composition of multiple actions into complex, conditional execution graphs with variable passing, iteration, and error handling.
## Design Philosophy
Workflows in Attune follow these core principles:
1. **Workflows are Actions**: A workflow is itself an action that can be invoked by rules, other workflows, or directly via API
2. **YAML-First**: Workflow definitions are declarative YAML files stored in packs
3. **Event-Driven**: Workflows execute asynchronously via the same message queue infrastructure as regular actions
4. **Composable**: Workflows can invoke other workflows recursively
5. **Observable**: Each task in a workflow creates an execution record with full traceability
6. **Recoverable**: Failed workflows can be resumed or retried from specific tasks
## Architecture Components
### 1. Workflow Definition Format
Workflows are defined as YAML files in pack directories alongside regular actions:
```yaml
# packs/my_pack/workflows/deploy_application.yaml
ref: my_pack.deploy_application
label: "Deploy Application Workflow"
description: "Deploys application with health checks and rollback"
version: "1.0.0"

# Input parameters (like action parameters)
parameters:
  app_name:
    type: string
    required: true
    description: "Name of the application to deploy"
  version:
    type: string
    required: true
  environment:
    type: string
    enum: [dev, staging, production]
    default: dev

# Output schema (what this workflow produces)
output:
  type: object
  properties:
    deployment_id:
      type: string
    status:
      type: string
    deployed_version:
      type: string

# Workflow variables (scoped to this workflow execution)
vars:
  deployment_id: null
  health_check_url: null
  rollback_required: false

# Task graph definition
tasks:
  # Task 1: Create deployment record
  - name: create_deployment
    action: my_pack.create_deployment_record
    input:
      app_name: "{{ parameters.app_name }}"
      version: "{{ parameters.version }}"
      environment: "{{ parameters.environment }}"
    publish:
      - deployment_id: "{{ task.create_deployment.result.id }}"
      - health_check_url: "{{ task.create_deployment.result.health_url }}"
    on_success: build_image
    on_failure: notify_failure

  # Task 2: Build container image
  - name: build_image
    action: docker.build_and_push
    input:
      app_name: "{{ parameters.app_name }}"
      version: "{{ parameters.version }}"
      registry: "{{ pack.config.docker_registry }}"
    on_success: deploy_containers
    on_failure: cleanup_deployment

  # Task 3: Deploy containers
  - name: deploy_containers
    action: kubernetes.apply_deployment
    input:
      app_name: "{{ parameters.app_name }}"
      image: "{{ task.build_image.result.image_uri }}"
      replicas: 3
    on_success: wait_for_ready
    on_failure: rollback_deployment

  # Task 4: Wait for pods to be ready
  - name: wait_for_ready
    action: kubernetes.wait_for_ready
    input:
      deployment: "{{ parameters.app_name }}"
      timeout: 300
    retry:
      count: 3
      delay: 10
    on_success: health_check
    on_failure: rollback_deployment

  # Task 5: Perform health check
  - name: health_check
    action: http.get
    input:
      url: "{{ vars.health_check_url }}"
      expected_status: 200
    on_success: update_deployment_status
    on_failure: rollback_deployment

  # Task 6: Update deployment as successful
  - name: update_deployment_status
    action: my_pack.update_deployment_status
    input:
      deployment_id: "{{ vars.deployment_id }}"
      status: "success"
    on_success: notify_success

  # Task 7: Rollback on failure
  - name: rollback_deployment
    action: kubernetes.rollback_deployment
    input:
      app_name: "{{ parameters.app_name }}"
    publish:
      - rollback_required: true
    on_complete: cleanup_deployment

  # Task 8: Cleanup resources
  - name: cleanup_deployment
    action: my_pack.cleanup_resources
    input:
      deployment_id: "{{ vars.deployment_id }}"
      rollback: "{{ vars.rollback_required }}"
    on_complete: notify_failure

  # Task 9: Success notification
  - name: notify_success
    action: slack.post_message
    input:
      channel: "#deployments"
      message: "✅ Deployed {{ parameters.app_name }} v{{ parameters.version }} to {{ parameters.environment }}"

  # Task 10: Failure notification
  - name: notify_failure
    action: slack.post_message
    input:
      channel: "#deployments"
      message: "❌ Failed to deploy {{ parameters.app_name }} v{{ parameters.version }}"

# Workflow output mapping
output_map:
  deployment_id: "{{ vars.deployment_id }}"
  status: "{{ task.update_deployment_status.result.status }}"
  deployed_version: "{{ parameters.version }}"
```
### 2. Advanced Workflow Features
#### 2.1 Parallel Execution
Execute multiple tasks concurrently:
```yaml
tasks:
  - name: parallel_checks
    type: parallel
    tasks:
      - name: check_database
        action: postgres.health_check
      - name: check_redis
        action: redis.ping
      - name: check_rabbitmq
        action: rabbitmq.cluster_status
    # All parallel tasks must complete before proceeding
    on_success: deploy_app
    on_failure: abort_deployment
```
#### 2.2 Iteration (with-items)
Iterate over lists with optional batching:
```yaml
tasks:
  # Without batch_size: process items individually (one execution per item)
  - name: deploy_to_regions
    action: cloud.deploy_instance
    with_items: "{{ parameters.regions }}"
    input:
      # item is the individual region value
      region: "{{ item }}"
      instance_type: "{{ parameters.instance_type }}"
    # Creates one execution per item
    on_success: verify_deployments

  # With batch_size: items split into batches (batch processing)
  - name: process_large_dataset
    action: data.transform
    with_items: "{{ vars.records }}"
    batch_size: 100  # Process 100 items at a time
    concurrency: 5   # Process up to 5 batches concurrently
    input:
      # item is an array containing up to 100 records
      # Last batch may contain fewer items
      records: "{{ item }}"
    publish:
      - total_processed: "{{ task.process_large_dataset.total_count }}"
```
**Batch Processing Behavior**:
- **Without `batch_size`**: Each item is processed individually (one execution per item)
  - `item` variable contains a single value
  - `index` variable contains the item index (0-based)
- **With `batch_size`**: Items are split into batches and processed as arrays
  - `item` variable contains an array of items (batch)
  - `index` variable contains the batch index (0-based)
  - Last batch may contain fewer items than `batch_size`
- Use `batch_size` for efficient bulk processing when actions support arrays
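The item-versus-batch grouping above can be sketched in plain Rust (an illustrative model of the semantics, not the engine's code; the `execution_units` helper is hypothetical):

```rust
/// Split items into execution units: one unit per item without a batch
/// size, one unit of up to `batch_size` items with one.
fn execution_units(items: Vec<i64>, batch_size: Option<usize>) -> Vec<Vec<i64>> {
    match batch_size {
        // With batch_size: each unit is an array of up to `batch_size` items
        Some(n) => items.chunks(n).map(|chunk| chunk.to_vec()).collect(),
        // Without batch_size: each unit is a single item
        None => items.into_iter().map(|item| vec![item]).collect(),
    }
}

fn main() {
    let items: Vec<i64> = (0..250).collect();
    // batch_size 100 over 250 items -> 3 units; the last holds the remaining 50
    let batches = execution_units(items.clone(), Some(100));
    println!("{} batches, last has {} items", batches.len(), batches.last().unwrap().len());
    // no batch_size -> 250 units of one item each
    println!("{} individual units", execution_units(items, None).len());
}
```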
#### 2.3 Conditional Execution
Execute tasks based on conditions:
```yaml
tasks:
  - name: check_environment
    action: core.noop
    when: "{{ parameters.environment == 'production' }}"
    on_complete: require_approval

  - name: require_approval
    action: core.inquiry
    input:
      prompt: "Approve production deployment of {{ parameters.app_name }}?"
      schema:
        type: object
        properties:
          approved:
            type: boolean
    # Only proceed if approved
    decision:
      - when: "{{ task.require_approval.result.approved == true }}"
        next: deploy_app
      - default: cancel_deployment
```
#### 2.4 Error Handling and Retry
```yaml
tasks:
  - name: flaky_api_call
    action: http.post
    input:
      url: "{{ vars.api_endpoint }}"
    retry:
      count: 5
      delay: 10             # seconds
      backoff: exponential  # linear, exponential, constant
      max_delay: 60
      on_error: "{{ task.flaky_api_call.error.type == 'timeout' }}"
    on_success: process_response
    on_failure: log_error
```
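One plausible reading of the retry block's `delay`/`backoff`/`max_delay` fields (the exact semantics are an implementation decision, not fixed by this spec) computes the wait before each attempt like this:

```rust
/// Wait (in seconds) before retry attempt `attempt` (1-based).
/// Assumed semantics: `delay` is the base, `backoff` the growth strategy,
/// and `max_delay` an upper bound on any single wait.
fn retry_delay(delay: u64, attempt: u32, backoff: &str, max_delay: u64) -> u64 {
    let raw = match backoff {
        "constant" => delay,
        "linear" => delay * attempt as u64,
        "exponential" => delay.saturating_mul(2u64.saturating_pow(attempt.saturating_sub(1))),
        _ => delay, // unknown strategy: fall back to the base delay
    };
    raw.min(max_delay)
}

fn main() {
    // count: 5, delay: 10, backoff: exponential, max_delay: 60
    for attempt in 1..=5u32 {
        println!("attempt {attempt}: wait {}s", retry_delay(10, attempt, "exponential", 60));
    }
    // prints waits of 10, 20, 40, 60, 60 seconds (80 and 160 are capped)
}
```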
#### 2.5 Timeout and Cancellation
```yaml
tasks:
  - name: long_running_task
    action: ml.train_model
    input:
      dataset: "{{ vars.dataset_path }}"
    timeout: 3600  # 1 hour
    on_timeout: cleanup_and_notify
```
#### 2.6 Subworkflows
Invoke other workflows:
```yaml
tasks:
  - name: provision_infrastructure
    action: infrastructure.provision_stack  # This is also a workflow
    input:
      stack_name: "{{ parameters.app_name }}-{{ parameters.environment }}"
      region: "{{ parameters.region }}"
    on_success: deploy_application
```
## Data Model Changes
### New Tables
#### 1. `workflow_definition` Table
```sql
CREATE TABLE attune.workflow_definition (
    id BIGSERIAL PRIMARY KEY,
    ref VARCHAR(255) NOT NULL UNIQUE,
    pack BIGINT NOT NULL REFERENCES attune.pack(id) ON DELETE CASCADE,
    pack_ref VARCHAR(255) NOT NULL,
    label VARCHAR(255) NOT NULL,
    description TEXT,
    version VARCHAR(50) NOT NULL,

    -- Workflow specification (parsed YAML)
    param_schema JSONB,
    out_schema JSONB,
    definition JSONB NOT NULL,  -- Full workflow definition

    -- Metadata
    tags TEXT[] DEFAULT '{}',
    enabled BOOLEAN DEFAULT true,
    created TIMESTAMPTZ DEFAULT NOW(),
    updated TIMESTAMPTZ DEFAULT NOW()
);

CREATE INDEX idx_workflow_pack ON attune.workflow_definition(pack);
CREATE INDEX idx_workflow_enabled ON attune.workflow_definition(enabled);
```
#### 2. `workflow_execution` Table
Tracks the state of a workflow execution:
```sql
CREATE TABLE attune.workflow_execution (
    id BIGSERIAL PRIMARY KEY,
    execution BIGINT NOT NULL REFERENCES attune.execution(id) ON DELETE CASCADE,
    workflow_def BIGINT NOT NULL REFERENCES attune.workflow_definition(id),

    -- Workflow state
    current_tasks TEXT[] DEFAULT '{}',  -- Currently executing task names
    completed_tasks TEXT[] DEFAULT '{}',
    failed_tasks TEXT[] DEFAULT '{}',

    -- Variable context (scoped to this workflow)
    variables JSONB DEFAULT '{}',

    -- Graph traversal state
    task_graph JSONB NOT NULL,  -- Adjacency list representation

    -- Status tracking
    status attune.execution_status_enum NOT NULL DEFAULT 'requested',
    error_message TEXT,
    created TIMESTAMPTZ DEFAULT NOW(),
    updated TIMESTAMPTZ DEFAULT NOW()
);

CREATE INDEX idx_workflow_exec_execution ON attune.workflow_execution(execution);
CREATE INDEX idx_workflow_exec_status ON attune.workflow_execution(status);
```
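As an illustrative (not normative) shape for the adjacency list stored in `task_graph`, using tasks from the deploy example:

```json
{
  "create_deployment": [
    { "to": "build_image",        "condition": "on_success" },
    { "to": "notify_failure",     "condition": "on_failure" }
  ],
  "build_image": [
    { "to": "deploy_containers",  "condition": "on_success" },
    { "to": "cleanup_deployment", "condition": "on_failure" }
  ]
}
```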
#### 3. `workflow_task_execution` Table
Tracks individual task executions within a workflow:
```sql
CREATE TABLE attune.workflow_task_execution (
    id BIGSERIAL PRIMARY KEY,
    workflow_execution BIGINT NOT NULL REFERENCES attune.workflow_execution(id) ON DELETE CASCADE,
    execution BIGINT NOT NULL REFERENCES attune.execution(id) ON DELETE CASCADE,
    task_name VARCHAR(255) NOT NULL,
    task_index INTEGER,  -- For with-items iterations

    -- Status
    status attune.execution_status_enum NOT NULL DEFAULT 'requested',
    started_at TIMESTAMPTZ,
    completed_at TIMESTAMPTZ,

    -- Results
    result JSONB,
    error JSONB,

    -- Retry tracking
    retry_count INTEGER DEFAULT 0,
    max_retries INTEGER DEFAULT 0,
    created TIMESTAMPTZ DEFAULT NOW(),
    updated TIMESTAMPTZ DEFAULT NOW()
);

CREATE INDEX idx_wf_task_exec_workflow ON attune.workflow_task_execution(workflow_execution);
CREATE INDEX idx_wf_task_exec_execution ON attune.workflow_task_execution(execution);
CREATE INDEX idx_wf_task_exec_status ON attune.workflow_task_execution(status);
```
### Modified Tables
#### Update `action` Table
Add a flag to distinguish workflow actions:
```sql
ALTER TABLE attune.action ADD COLUMN is_workflow BOOLEAN DEFAULT false;
ALTER TABLE attune.action ADD COLUMN workflow_def BIGINT REFERENCES attune.workflow_definition(id);
```
## Variable Scoping and Templating
### Template Engine
Use a Jinja2-like template engine for variable interpolation; the **tera** crate is the recommended choice for the Rust implementation.
### Variable Scopes
Variables are accessible from multiple scopes in order of precedence:
1. **`task.*`** - Results from completed tasks
   - `{{ task.task_name.result.field }}`
   - `{{ task.task_name.status }}`
   - `{{ task.task_name.error }}`
2. **`vars.*`** - Workflow-scoped variables
   - `{{ vars.deployment_id }}`
   - `{{ vars.custom_variable }}`
3. **`parameters.*`** - Input parameters to the workflow
   - `{{ parameters.app_name }}`
   - `{{ parameters.environment }}`
4. **`pack.config.*`** - Pack configuration
   - `{{ pack.config.api_key }}`
   - `{{ pack.config.base_url }}`
5. **`system.*`** - System-level variables
   - `{{ system.execution_id }}`
   - `{{ system.timestamp }}`
   - `{{ system.identity.login }}`
6. **`kv.*`** - Key-value datastore
   - `{{ kv.get('global.feature_flags.new_ui') }}`
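The precedence order can be pictured as a chain of maps consulted first-match-wins. A simplified stdlib-only sketch (the real engine resolves dotted paths through the template engine; `resolve` is a hypothetical helper):

```rust
use std::collections::HashMap;

/// Look a key up across scopes in precedence order: the first scope that
/// defines it wins. (`system.*` and `kv.*` are omitted for brevity, and
/// values are plain strings rather than JSON here.)
fn resolve<'a>(key: &str, scopes: &[&'a HashMap<String, String>]) -> Option<&'a String> {
    scopes.iter().find_map(|scope| scope.get(key))
}

fn main() {
    let task_results = HashMap::new();
    let mut vars = HashMap::new();
    vars.insert("region".to_string(), "us-east-1".to_string());
    let mut parameters = HashMap::new();
    parameters.insert("region".to_string(), "eu-west-1".to_string());

    // vars.* outranks parameters.* for the same key
    let value = resolve("region", &[&task_results, &vars, &parameters]);
    println!("{value:?}");
}
```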
### Special Variables for Iteration
When using `with_items`:
- **`{{ item }}`** - Current item or batch
  - **Without `batch_size`**: Individual item value
  - **With `batch_size`**: Array of items (batch)
- **`{{ index }}`** - Zero-based index
  - **Without `batch_size`**: Item index
  - **With `batch_size`**: Batch index
Example usage:
```yaml
# Without batch_size - individual items
- name: process_one_by_one
  with_items: "{{ parameters.files }}"
  input:
    file: "{{ item }}"       # Single filename
    position: "{{ index }}"  # Item position

# With batch_size - arrays
- name: process_in_batches
  with_items: "{{ parameters.files }}"
  batch_size: 10
  input:
    files: "{{ item }}"      # Array of up to 10 filenames
    batch_num: "{{ index }}" # Batch number
```
### Template Helper Functions
```yaml
# String manipulation
message: "{{ parameters.app_name | upper }}"
path: "{{ vars.base_path | trim | append('/logs') }}"
# JSON manipulation
config: "{{ vars.raw_config | from_json }}"
payload: "{{ vars.data | to_json }}"
# List operations
regions: "{{ parameters.all_regions | filter(enabled=true) }}"
first_region: "{{ parameters.regions | first }}"
count: "{{ vars.results | length }}"
# Batching helper
batches: "{{ vars.large_list | batch(size=100) }}"
# Conditional helpers
status: "{{ task.deploy.status | default('unknown') }}"
url: "{{ vars.host | default('localhost') | prepend('https://') }}"
# Key-value store access
flag: "{{ kv.get('feature.flags.enabled', default=false) }}"
secret: "{{ kv.get_secret('api.credentials.token') }}"
```
## Workflow Execution Engine
### Architecture
The workflow execution engine is a new component within the **Executor Service**:
```
┌────────────────────────────────────────────────┐
│                Executor Service                │
├────────────────────────────────────────────────┤
│                                                │
│  ┌──────────────────┐  ┌───────────────────┐   │
│  │   Enforcement    │  │     Execution     │   │
│  │    Processor     │  │     Scheduler     │   │
│  └──────────────────┘  └───────────────────┘   │
│                                                │
│  ┌──────────────────────────────────────────┐  │
│  │          Workflow Engine (NEW)           │  │
│  ├──────────────────────────────────────────┤  │
│  │  - Workflow Parser                       │  │
│  │  - Graph Executor                        │  │
│  │  - Variable Context Manager              │  │
│  │  - Task Scheduler                        │  │
│  │  - State Machine                         │  │
│  └──────────────────────────────────────────┘  │
│                                                │
└────────────────────────────────────────────────┘
```
### Workflow Lifecycle
```
1.  Rule triggers workflow action
2.  Executor recognizes workflow action (is_workflow = true)
3.  Workflow Engine loads workflow definition
4.  Create workflow_execution record
5.  Initialize variable context with parameters
6.  Build task dependency graph
7.  Schedule initial tasks (entry points with no dependencies)
8.  For each task:
    a. Template task inputs using current variable context
    b. Create child execution for the action
    c. Publish execution.scheduled message
    d. Create workflow_task_execution record
9.  Worker executes task and publishes result
10. Workflow Engine receives execution.completed
    a. Update workflow_task_execution
    b. Publish variables to workflow context
    c. Evaluate next tasks based on transitions
    d. Schedule next tasks or complete workflow
11. Repeat until all tasks complete or workflow fails
12. Update parent workflow execution status
13. Publish workflow.completed event
```
### State Machine
```
┌───────────┐
│ Requested │
└─────┬─────┘
      ▼
┌────────────┐
│ Scheduling │
└─────┬──────┘
      ▼
┌────────────┐   pause / resume   ┌────────────┐
│            │◄──────────────────►│   Paused   │
│  Running   │                    └────────────┘
│            │   wait / result    ┌────────────┐
│            │◄──────────────────►│  Waiting   │
└─────┬──────┘                    │ (for task) │
      │                           └────────────┘
      ▼  (one of the terminal states)
┌───────────┐   ┌──────────┐   ┌───────────┐
│ Completed │   │  Failed  │   │ Cancelled │
└───────────┘   └──────────┘   └───────────┘
```
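The legal transitions can be encoded as a small match table. The sketch below assumes the transitions drawn above, plus a Paused → Cancelled edge (an assumption, flagged in the code):

```rust
/// Workflow states from the state machine diagram.
#[derive(Clone, Copy, Debug, PartialEq)]
enum WorkflowState {
    Requested,
    Scheduling,
    Running,
    Paused,
    Waiting,
    Completed,
    Failed,
    Cancelled,
}

/// Whether `from -> to` is a legal transition.
fn can_transition(from: WorkflowState, to: WorkflowState) -> bool {
    use WorkflowState::*;
    matches!(
        (from, to),
        (Requested, Scheduling)
            | (Scheduling, Running)
            | (Running, Paused)
            | (Paused, Running)
            | (Running, Waiting)
            | (Waiting, Running)
            | (Running, Completed)
            | (Running, Failed)
            | (Running, Cancelled)
            | (Paused, Cancelled) // assumption: cancel is allowed while paused
    )
}

fn main() {
    use WorkflowState::*;
    println!("{}", can_transition(Running, Waiting));   // true
    println!("{}", can_transition(Completed, Running)); // false: terminal states have no exits
}
```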
### Core Components (Rust Implementation)
#### 1. Workflow Definition Parser
```rust
// crates/executor/src/workflow/parser.rs

pub struct WorkflowDefinition {
    pub ref_: String,
    pub label: String,
    pub parameters: JsonSchema,
    pub output: JsonSchema,
    pub vars: HashMap<String, JsonValue>,
    pub tasks: Vec<TaskDefinition>,
    pub output_map: HashMap<String, String>,
}

pub struct TaskDefinition {
    pub name: String,
    pub task_type: TaskType,
    pub action: Option<String>,
    pub input: HashMap<String, String>, // Template strings
    pub publish: Vec<PublishMapping>,
    pub transitions: TaskTransitions,
    pub retry: Option<RetryPolicy>,
    pub timeout: Option<u64>,
    pub when: Option<String>,       // Condition template
    pub with_items: Option<String>, // List template
    pub batch_size: Option<usize>,
}

pub enum TaskType {
    Action,
    Parallel,
    Workflow, // Subworkflow
}

pub struct TaskTransitions {
    pub on_success: Option<String>,
    pub on_failure: Option<String>,
    pub on_complete: Option<String>,
    pub on_timeout: Option<String>,
    pub decision: Vec<DecisionBranch>,
}

pub struct DecisionBranch {
    pub when: Option<String>, // Condition template
    pub next: String,
    pub is_default: bool,
}
```
#### 2. Variable Context Manager
```rust
// crates/executor/src/workflow/context.rs

pub struct WorkflowContext {
    pub execution_id: i64,
    pub parameters: JsonValue,
    pub vars: HashMap<String, JsonValue>,
    pub task_results: HashMap<String, TaskResult>,
    pub pack_config: JsonValue,
    pub system: SystemContext,
}

impl WorkflowContext {
    pub fn render_template(&self, template: &str) -> Result<String> {
        // Use the tera template engine
        let mut tera = Tera::default();
        let context = self.to_tera_context();
        Ok(tera.render_str(template, &context)?)
    }

    pub fn publish_variables(&mut self, mappings: &[PublishMapping]) -> Result<()> {
        for mapping in mappings {
            let rendered = self.render_template(&mapping.template)?;
            // Rendering produces a string; parse it back into JSON where
            // possible so published variables keep their types
            let value = serde_json::from_str(&rendered)
                .unwrap_or(JsonValue::String(rendered));
            self.vars.insert(mapping.var_name.clone(), value);
        }
        Ok(())
    }
}
```
#### 3. Graph Executor
```rust
// crates/executor/src/workflow/graph.rs

pub struct TaskGraph {
    nodes: HashMap<String, TaskNode>,
    edges: HashMap<String, Vec<Edge>>,
}

pub struct TaskNode {
    pub name: String,
    pub definition: TaskDefinition,
    pub status: TaskStatus,
}

pub struct Edge {
    pub from: String,
    pub to: String,
    pub condition: EdgeCondition,
}

pub enum EdgeCondition {
    OnSuccess,
    OnFailure,
    OnComplete,
    OnTimeout,
    Decision(String), // Template condition
}

impl TaskGraph {
    pub fn find_next_tasks(&self, completed_task: &str, result: &TaskResult) -> Vec<String> {
        // Traverse graph based on task result and transitions
    }

    pub fn get_ready_tasks(&self) -> Vec<&TaskNode> {
        // Find tasks with all dependencies satisfied
    }
}
```
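`get_ready_tasks` amounts to a dependency check: a task is ready when all of its predecessors have completed and it has not yet run itself. A stdlib-only sketch over a reverse adjacency list (names are illustrative, not the engine's types):

```rust
use std::collections::{HashMap, HashSet};

/// Tasks whose predecessors (per the reverse adjacency list) are all
/// completed and which have not run themselves yet.
fn ready_tasks(predecessors: &HashMap<&str, Vec<&str>>, completed: &HashSet<&str>) -> Vec<String> {
    predecessors
        .iter()
        .filter(|(task, deps)| {
            !completed.contains(*task) && deps.iter().all(|dep| completed.contains(dep))
        })
        .map(|(task, _)| task.to_string())
        .collect()
}

fn main() {
    // build_image depends on create_deployment, which has no dependencies
    let mut preds: HashMap<&str, Vec<&str>> = HashMap::new();
    preds.insert("create_deployment", vec![]); // entry point
    preds.insert("build_image", vec!["create_deployment"]);

    let mut done: HashSet<&str> = HashSet::new();
    println!("{:?}", ready_tasks(&preds, &done)); // only create_deployment is ready

    done.insert("create_deployment");
    println!("{:?}", ready_tasks(&preds, &done)); // now build_image is ready
}
```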
#### 4. Workflow Executor
```rust
// crates/executor/src/workflow/executor.rs

pub struct WorkflowExecutor {
    pool: PgPool,
    publisher: MessagePublisher,
    template_engine: Tera,
}

impl WorkflowExecutor {
    pub async fn execute_workflow(
        &self,
        execution_id: i64,
        workflow_def: WorkflowDefinition,
        parameters: JsonValue,
    ) -> Result<()> {
        // 1. Create workflow_execution record
        let wf_exec = self.create_workflow_execution(execution_id, &workflow_def).await?;

        // 2. Initialize variable context
        let context = WorkflowContext::new(execution_id, parameters, &workflow_def);

        // 3. Build task graph
        let graph = TaskGraph::from_definition(&workflow_def)?;

        // 4. Schedule initial tasks
        for task in graph.get_ready_tasks() {
            self.schedule_task(&wf_exec, &task.definition, &context).await?;
        }
        Ok(())
    }

    pub async fn handle_task_completion(
        &self,
        wf_execution_id: i64,
        task_name: String,
        result: TaskResult,
    ) -> Result<()> {
        // 1. Load workflow execution and context
        let wf_exec = self.load_workflow_execution(wf_execution_id).await?;
        let mut context = self.load_context(&wf_exec).await?;

        // 2. Update task result in context
        context.task_results.insert(task_name.clone(), result.clone());

        // 3. Publish variables from task
        let task_def = wf_exec.definition.get_task(&task_name)?;
        context.publish_variables(&task_def.publish)?;

        // 4. Save updated context
        self.save_context(wf_execution_id, &context).await?;

        // 5. Determine next tasks
        let next_tasks = wf_exec.graph.find_next_tasks(&task_name, &result);

        // 6. Schedule next tasks
        for next_task in next_tasks {
            let task_def = wf_exec.definition.get_task(&next_task)?;
            // Evaluate condition if present
            if let Some(when) = &task_def.when {
                if !context.evaluate_condition(when)? {
                    continue;
                }
            }
            self.schedule_task(&wf_exec, task_def, &context).await?;
        }

        // 7. Check if workflow is complete
        if self.is_workflow_complete(&wf_exec).await? {
            self.complete_workflow(wf_execution_id, &context).await?;
        }
        Ok(())
    }

    async fn schedule_task(
        &self,
        wf_exec: &WorkflowExecution,
        task: &TaskDefinition,
        context: &WorkflowContext,
    ) -> Result<()> {
        // Resolve with_items (iteration); a task without it is a single
        // execution with a null item
        let items = if let Some(with_items_template) = &task.with_items {
            let items_json = context.render_template(with_items_template)?;
            serde_json::from_str::<Vec<JsonValue>>(&items_json)?
        } else {
            vec![JsonValue::Null]
        };

        // With batch_size each unit is an array of up to batch_size items
        // (one execution per batch); without it, one execution per item
        let units: Vec<JsonValue> = if let Some(batch_size) = task.batch_size {
            items
                .chunks(batch_size)
                .map(|chunk| JsonValue::Array(chunk.to_vec()))
                .collect()
        } else {
            items
        };

        let action_ref = task
            .action
            .as_deref()
            .ok_or_else(|| anyhow!("task '{}' has no action", task.name))?;

        for (index, item) in units.iter().enumerate() {
            // Create a unit-specific context exposing `item` and `index`
            let mut item_context = context.clone();
            item_context.add_item_vars(item, index);

            // Render task inputs against the item context
            let rendered_input = self.render_task_input(task, &item_context)?;

            // Create child execution
            let execution = self
                .create_task_execution(wf_exec.id, action_ref, rendered_input, wf_exec.execution)
                .await?;

            // Create workflow_task_execution record
            self.create_workflow_task_execution(wf_exec.id, execution.id, &task.name, index)
                .await?;

            // Publish execution.scheduled message
            self.publisher.publish_execution_scheduled(execution.id).await?;
        }
        Ok(())
    }
}
```
### Message Flow
#### New Message Types
```rust
// Workflow-specific messages
pub enum WorkflowMessage {
    WorkflowStarted { execution_id: i64, workflow_id: i64 },
    WorkflowCompleted { execution_id: i64, result: JsonValue },
    WorkflowFailed { execution_id: i64, error: String },
    TaskScheduled { workflow_execution_id: i64, task_name: String },
    TaskCompleted { workflow_execution_id: i64, task_name: String, result: TaskResult },
    TaskFailed { workflow_execution_id: i64, task_name: String, error: String },
}
```
## API Endpoints
### Workflow Management
```
POST /api/v1/packs/{pack_ref}/workflows - Create workflow
GET /api/v1/packs/{pack_ref}/workflows - List workflows
GET /api/v1/workflows/{workflow_ref} - Get workflow
PUT /api/v1/workflows/{workflow_ref} - Update workflow
DELETE /api/v1/workflows/{workflow_ref} - Delete workflow
POST /api/v1/workflows/{workflow_ref}/execute - Execute workflow directly
GET /api/v1/workflows/{workflow_ref}/executions - List executions
```
### Workflow Execution Monitoring
```
GET /api/v1/workflow-executions/{id} - Get workflow execution
GET /api/v1/workflow-executions/{id}/tasks - List task executions
GET /api/v1/workflow-executions/{id}/graph - Get execution graph
POST /api/v1/workflow-executions/{id}/pause - Pause workflow
POST /api/v1/workflow-executions/{id}/resume - Resume workflow
POST /api/v1/workflow-executions/{id}/cancel - Cancel workflow
POST /api/v1/workflow-executions/{id}/retry - Retry failed workflow
```
## Pack Structure with Workflows
```
packs/
└── my_pack/
    ├── pack.yaml              # Pack metadata
    ├── config.yaml            # Pack configuration schema
    ├── actions/
    │   ├── action1.py
    │   ├── action2.py
    │   └── action.yaml        # Action metadata
    ├── sensors/
    │   ├── sensor1.py
    │   └── sensor.yaml
    ├── workflows/             # NEW
    │   ├── deploy.yaml
    │   ├── backup.yaml
    │   └── migrate.yaml
    ├── rules/
    │   └── on_push.yaml
    └── tests/
        └── test_workflows.yaml
```
### Workflow Pack Registration
When a pack is registered, the system:
1. Scans `workflows/` directory for `.yaml` files
2. Parses and validates each workflow definition
3. Creates `workflow_definition` record
4. Creates synthetic `action` record with `is_workflow=true`
5. Makes workflow invokable like any other action
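Step 1 can be sketched with `std::fs` (illustrative only; `workflow_files` is a hypothetical helper, and real registration would go on to parse and validate each file):

```rust
use std::fs;
use std::path::Path;

/// Collect the YAML file names under a pack's workflows/ directory.
fn workflow_files(pack_dir: &Path) -> std::io::Result<Vec<String>> {
    let dir = pack_dir.join("workflows");
    let mut files = Vec::new();
    if dir.is_dir() {
        for entry in fs::read_dir(&dir)? {
            let path = entry?.path();
            // Only *.yaml files are considered workflow definitions
            if path.extension().and_then(|ext| ext.to_str()) == Some("yaml") {
                files.push(path.file_name().unwrap().to_string_lossy().into_owned());
            }
        }
    }
    files.sort();
    Ok(files)
}

fn main() -> std::io::Result<()> {
    // Build a throwaway pack layout under the system temp dir
    let pack = std::env::temp_dir().join("attune_demo_pack");
    fs::create_dir_all(pack.join("workflows"))?;
    fs::write(pack.join("workflows").join("deploy.yaml"), "ref: demo.deploy\n")?;
    fs::write(pack.join("workflows").join("notes.txt"), "ignored\n")?;

    println!("{:?}", workflow_files(&pack)?); // only the .yaml file is picked up
    fs::remove_dir_all(&pack)?;
    Ok(())
}
```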
## Implementation Plan
### Phase 1: Foundation (Week 1-2)
1. **Database Schema**
   - Create migration for workflow tables
   - Add workflow columns to action table
   - Create indexes
2. **Data Models**
   - Add workflow models to `common/src/models.rs`
   - Create workflow definition parser
   - Create repositories
3. **Template Engine**
   - Integrate `tera` crate
   - Implement variable context
   - Create helper functions
### Phase 2: Core Engine (Week 3-4)
4. **Graph Engine**
   - Implement task graph builder
   - Implement graph traversal logic
   - Handle conditional branching
5. **Workflow Executor**
   - Implement workflow execution logic
   - Handle task scheduling
   - Implement state management
6. **Message Integration**
   - Add workflow message handlers
   - Integrate with executor service
   - Handle task completion events
### Phase 3: Advanced Features (Week 5-6)
7. **Iteration Support**
   - Implement `with_items` logic
   - Add batching support
   - Handle concurrent iterations
8. **Parallel Execution**
   - Implement parallel task type
   - Handle synchronization
   - Aggregate results
9. **Error Handling**
   - Implement retry logic
   - Handle timeouts
   - Implement failure paths
### Phase 4: API & Tools (Week 7-8)
10. **API Endpoints**
    - Workflow CRUD operations
    - Execution monitoring
    - Control operations (pause/resume/cancel)
11. **CLI Support**
    - Workflow validation command
    - Workflow execution command
    - Workflow visualization
12. **Testing & Documentation**
    - Unit tests for all components
    - Integration tests
    - Example workflows
    - User documentation
## Testing Strategy
### Unit Tests
- Template rendering with all scope types
- Graph traversal algorithms
- Condition evaluation
- Variable publishing
- Task scheduling logic
### Integration Tests
- End-to-end workflow execution
- Parallel task execution
- Error handling and retry
- Workflow cancellation
- Nested workflow execution
### Example Test Workflows
```yaml
# tests/workflows/simple_sequence.yaml
ref: test.simple_sequence
label: "Simple Sequential Workflow Test"
tasks:
  - name: task1
    action: core.echo
    input:
      message: "Task 1"
    on_success: task2
  - name: task2
    action: core.echo
    input:
      message: "Task 2"
```
## Performance Considerations
1. **Graph Optimization**: Cache parsed workflow graphs
2. **Template Caching**: Cache compiled templates per workflow
3. **Parallel Scheduling**: Schedule independent tasks concurrently
4. **Database Batching**: Batch task creation/updates
5. **Context Serialization**: Use efficient JSON serialization
## Security Considerations
1. **Template Injection**: Sanitize template inputs
2. **Variable Scoping**: Strict scope isolation between workflows
3. **Secret Access**: Only allow `kv.get_secret()` for authorized users
4. **Resource Limits**: Enforce max task count, max depth, max iterations
5. **Audit Trail**: Log all workflow decisions and transitions
## Monitoring & Observability
### Metrics
- Workflow execution duration
- Task execution duration
- Workflow success/failure rate
- Task retry count
- Queue depth for workflow tasks
### Logging
```
INFO Workflow execution started: id=123, workflow=deploy_app
INFO Task scheduled: workflow=123, task=build_image
INFO Task completed: workflow=123, task=build_image, duration=45s
INFO Publishing variables: deployment_id=456
INFO Scheduling next tasks: [deploy_containers, health_check]
INFO Workflow execution completed: id=123, duration=2m30s
```
### Tracing
- Link all tasks to parent workflow execution
- Propagate trace_id through entire workflow
- Enable distributed tracing visualization
## Future Enhancements
1. **Workflow Versioning**: Support multiple versions of same workflow
2. **Workflow Templates**: Reusable workflow patterns
3. **Dynamic Workflows**: Generate workflow graph at runtime
4. **Workflow Marketplace**: Share workflows across organizations
5. **Workflow Testing Framework**: Built-in testing tools
6. **Workflow Debugger**: Step-through execution debugging
7. **Workflow Visualization**: Real-time execution graph UI
8. **Workflow Analytics**: Performance analysis and optimization suggestions
## References
- [StackStorm Orquesta](https://docs.stackstorm.com/orquesta/index.html)
- [Argo Workflows](https://argoproj.github.io/argo-workflows/)
- [AWS Step Functions](https://aws.amazon.com/step-functions/)
- [Temporal Workflows](https://temporal.io/)
## Appendix: Complete Workflow Example
See `docs/examples/complete-workflow.yaml` for a comprehensive example demonstrating all workflow features.