# Workflow Orchestration Design
## Overview

This document describes the architecture and implementation plan for **workflow orchestration** in Attune. Workflows enable the composition of multiple actions into complex, conditional execution graphs with variable passing, iteration, and error handling.

## Design Philosophy

Workflows in Attune follow these core principles:

1. **Workflows are Actions**: A workflow is itself an action that can be invoked by rules, other workflows, or directly via API
2. **YAML-First**: Workflow definitions are declarative YAML files stored in packs
3. **Event-Driven**: Workflows execute asynchronously via the same message queue infrastructure as regular actions
4. **Composable**: Workflows can invoke other workflows recursively
5. **Observable**: Each task in a workflow creates an execution record with full traceability
6. **Recoverable**: Failed workflows can be resumed or retried from specific tasks

## Architecture Components

### 1. Workflow Definition Format

Workflows are defined as YAML files in pack directories alongside regular actions:

```yaml
# packs/my_pack/workflows/deploy_application.yaml
ref: my_pack.deploy_application
label: "Deploy Application Workflow"
description: "Deploys application with health checks and rollback"
version: "1.0.0"

# Input parameters (like action parameters)
parameters:
  app_name:
    type: string
    required: true
    description: "Name of the application to deploy"
  version:
    type: string
    required: true
  environment:
    type: string
    enum: [dev, staging, production]
    default: dev

# Output schema (what this workflow produces)
output:
  type: object
  properties:
    deployment_id:
      type: string
    status:
      type: string
    deployed_version:
      type: string

# Workflow variables (scoped to this workflow execution)
vars:
  deployment_id: null
  health_check_url: null
  rollback_required: false

# Task graph definition
tasks:
  # Task 1: Create deployment record
  - name: create_deployment
    action: my_pack.create_deployment_record
    input:
      app_name: "{{ parameters.app_name }}"
      version: "{{ parameters.version }}"
      environment: "{{ parameters.environment }}"
    publish:
      - deployment_id: "{{ task.create_deployment.result.id }}"
      - health_check_url: "{{ task.create_deployment.result.health_url }}"
    on_success: build_image
    on_failure: notify_failure

  # Task 2: Build container image
  - name: build_image
    action: docker.build_and_push
    input:
      app_name: "{{ parameters.app_name }}"
      version: "{{ parameters.version }}"
      registry: "{{ pack.config.docker_registry }}"
    on_success: deploy_containers
    on_failure: cleanup_deployment

  # Task 3: Deploy containers
  - name: deploy_containers
    action: kubernetes.apply_deployment
    input:
      app_name: "{{ parameters.app_name }}"
      image: "{{ task.build_image.result.image_uri }}"
      replicas: 3
    on_success: wait_for_ready
    on_failure: rollback_deployment

  # Task 4: Wait for pods to be ready
  - name: wait_for_ready
    action: kubernetes.wait_for_ready
    input:
      deployment: "{{ parameters.app_name }}"
      timeout: 300
    retry:
      count: 3
      delay: 10
    on_success: health_check
    on_failure: rollback_deployment

  # Task 5: Perform health check
  - name: health_check
    action: http.get
    input:
      url: "{{ vars.health_check_url }}"
      expected_status: 200
    on_success: update_deployment_status
    on_failure: rollback_deployment

  # Task 6: Update deployment as successful
  - name: update_deployment_status
    action: my_pack.update_deployment_status
    input:
      deployment_id: "{{ vars.deployment_id }}"
      status: "success"
    on_success: notify_success

  # Task 7: Rollback on failure
  - name: rollback_deployment
    action: kubernetes.rollback_deployment
    input:
      app_name: "{{ parameters.app_name }}"
    publish:
      - rollback_required: true
    on_complete: cleanup_deployment

  # Task 8: Cleanup resources
  - name: cleanup_deployment
    action: my_pack.cleanup_resources
    input:
      deployment_id: "{{ vars.deployment_id }}"
      rollback: "{{ vars.rollback_required }}"
    on_complete: notify_failure

  # Task 9: Success notification
  - name: notify_success
    action: slack.post_message
    input:
      channel: "#deployments"
      message: "✅ Deployed {{ parameters.app_name }} v{{ parameters.version }} to {{ parameters.environment }}"

  # Task 10: Failure notification
  - name: notify_failure
    action: slack.post_message
    input:
      channel: "#deployments"
      message: "❌ Failed to deploy {{ parameters.app_name }} v{{ parameters.version }}"

# Workflow output mapping
output_map:
  deployment_id: "{{ vars.deployment_id }}"
  status: "{{ task.update_deployment_status.result.status }}"
  deployed_version: "{{ parameters.version }}"
```

### 2. Advanced Workflow Features

#### 2.1 Parallel Execution

Execute multiple tasks concurrently:

```yaml
tasks:
  - name: parallel_checks
    type: parallel
    tasks:
      - name: check_database
        action: postgres.health_check
      - name: check_redis
        action: redis.ping
      - name: check_rabbitmq
        action: rabbitmq.cluster_status
    # All parallel tasks must complete before proceeding
    on_success: deploy_app
    on_failure: abort_deployment
```
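
The join rule above ("all parallel tasks must complete before proceeding") can be sketched as a small status aggregation; `TaskStatus` and `join_status` are illustrative names for this sketch, not the engine's actual types:

```rust
#[derive(Debug, PartialEq, Clone, Copy)]
enum TaskStatus {
    Succeeded,
    Failed,
    Running,
}

// A parallel group fails as soon as any child fails, succeeds only once every
// child has succeeded, and is otherwise still running.
fn join_status(children: &[TaskStatus]) -> TaskStatus {
    if children.iter().any(|s| *s == TaskStatus::Failed) {
        TaskStatus::Failed
    } else if children.iter().all(|s| *s == TaskStatus::Succeeded) {
        TaskStatus::Succeeded
    } else {
        TaskStatus::Running
    }
}
```

Failing fast here is one design choice; the alternative is to let in-flight children drain before taking the `on_failure` transition.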

#### 2.2 Iteration (with-items)

Iterate over lists with optional batching:

```yaml
tasks:
  # Without batch_size: process items individually (one execution per item)
  - name: deploy_to_regions
    action: cloud.deploy_instance
    with_items: "{{ parameters.regions }}"
    input:
      # item is the individual region value
      region: "{{ item }}"
      instance_type: "{{ parameters.instance_type }}"
    # Creates one execution per item
    on_success: verify_deployments

  # With batch_size: items split into batches (batch processing)
  - name: process_large_dataset
    action: data.transform
    with_items: "{{ vars.records }}"
    batch_size: 100  # Process 100 items at a time
    concurrency: 5   # Process up to 5 batches concurrently
    input:
      # item is an array containing up to 100 records
      # Last batch may contain fewer items
      records: "{{ item }}"
    publish:
      - total_processed: "{{ task.process_large_dataset.total_count }}"
```

**Batch Processing Behavior**:

- **Without `batch_size`**: Each item is processed individually (one execution per item)
  - `item` variable contains a single value
  - `index` variable contains the item index (0-based)
- **With `batch_size`**: Items are split into batches and processed as arrays
  - `item` variable contains an array of items (batch)
  - `index` variable contains the batch index (0-based)
  - Last batch may contain fewer items than `batch_size`
- Use `batch_size` for efficient bulk processing when actions support arrays
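
The two modes reduce to a single chunking step. A sketch with integer items (the real engine would carry JSON values, and in the no-batch case would pass each item bare rather than as a one-element array):

```rust
// Split a with_items list into units of work: with a batch size, each unit is
// a chunk of up to `n` items; without one, each item is its own unit.
fn make_units(items: Vec<i64>, batch_size: Option<usize>) -> Vec<Vec<i64>> {
    match batch_size {
        Some(n) if n > 0 => items.chunks(n).map(|chunk| chunk.to_vec()).collect(),
        _ => items.into_iter().map(|item| vec![item]).collect(),
    }
}
```

For 250 records and `batch_size: 100` this yields three units of 100, 100, and 50 items.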

#### 2.3 Conditional Execution

Execute tasks based on conditions:

```yaml
tasks:
  - name: check_environment
    action: core.noop
    when: "{{ parameters.environment == 'production' }}"
    on_complete: require_approval

  - name: require_approval
    action: core.inquiry
    input:
      prompt: "Approve production deployment of {{ parameters.app_name }}?"
      schema:
        type: object
        properties:
          approved:
            type: boolean
    # Only proceed if approved
    decision:
      - when: "{{ task.require_approval.result.approved == true }}"
        next: deploy_app
      - default: cancel_deployment
```
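
Branch selection is first-match-wins: branches are evaluated in order, and a branch without a condition acts as the default. A sketch with pre-evaluated conditions (the real engine would first render the `when` template against the variable context; names are illustrative):

```rust
// One decision branch: `when` holds the already-evaluated condition
// (None = unconditional default branch), `next` the target task name.
struct Branch {
    when: Option<bool>,
    next: &'static str,
}

// Return the first branch whose condition holds, or the first default branch.
fn decide(branches: &[Branch]) -> Option<&'static str> {
    for branch in branches {
        match branch.when {
            Some(true) | None => return Some(branch.next),
            Some(false) => continue,
        }
    }
    None
}
```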

#### 2.4 Error Handling and Retry

```yaml
tasks:
  - name: flaky_api_call
    action: http.post
    input:
      url: "{{ vars.api_endpoint }}"
    retry:
      count: 5
      delay: 10             # seconds
      backoff: exponential  # linear, exponential, constant
      max_delay: 60
    on_error: "{{ task.flaky_api_call.error.type == 'timeout' }}"
    on_success: process_response
    on_failure: log_error
```
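
One plausible reading of the retry policy above, with `delay` as the base delay and the computed delay capped at `max_delay` (the formulas are an assumption of this sketch, not a spec):

```rust
// Delay in seconds before retry `attempt` (0-based) under the given policy.
fn retry_delay(delay: u64, attempt: u32, backoff: &str, max_delay: u64) -> u64 {
    let computed = match backoff {
        "exponential" => delay.saturating_mul(2u64.saturating_pow(attempt)),
        "linear" => delay.saturating_mul(u64::from(attempt) + 1),
        _ => delay, // "constant"
    };
    computed.min(max_delay)
}
```

For the task above (`delay: 10`, `backoff: exponential`, `max_delay: 60`) the waits would be 10s, 20s, 40s, 60s, 60s.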

#### 2.5 Timeout and Cancellation

```yaml
tasks:
  - name: long_running_task
    action: ml.train_model
    input:
      dataset: "{{ vars.dataset_path }}"
    timeout: 3600  # 1 hour
    on_timeout: cleanup_and_notify
```

#### 2.6 Subworkflows

Invoke other workflows:

```yaml
tasks:
  - name: provision_infrastructure
    action: infrastructure.provision_stack  # This is also a workflow
    input:
      stack_name: "{{ parameters.app_name }}-{{ parameters.environment }}"
      region: "{{ parameters.region }}"
    on_success: deploy_application
```

## Data Model Changes

### New Tables

#### 1. `workflow_definition` Table

```sql
CREATE TABLE attune.workflow_definition (
    id          BIGSERIAL PRIMARY KEY,
    ref         VARCHAR(255) NOT NULL UNIQUE,
    pack        BIGINT NOT NULL REFERENCES attune.pack(id) ON DELETE CASCADE,
    pack_ref    VARCHAR(255) NOT NULL,
    label       VARCHAR(255) NOT NULL,
    description TEXT,
    version     VARCHAR(50) NOT NULL,

    -- Workflow specification (parsed YAML)
    param_schema JSONB,
    out_schema   JSONB,
    definition   JSONB NOT NULL,  -- Full workflow definition

    -- Metadata
    tags    TEXT[] DEFAULT '{}',
    enabled BOOLEAN DEFAULT true,

    created TIMESTAMPTZ DEFAULT NOW(),
    updated TIMESTAMPTZ DEFAULT NOW()
);

CREATE INDEX idx_workflow_pack ON attune.workflow_definition(pack);
CREATE INDEX idx_workflow_enabled ON attune.workflow_definition(enabled);
```

#### 2. `workflow_execution` Table

Tracks the state of a workflow execution:

```sql
CREATE TABLE attune.workflow_execution (
    id           BIGSERIAL PRIMARY KEY,
    execution    BIGINT NOT NULL REFERENCES attune.execution(id) ON DELETE CASCADE,
    workflow_def BIGINT NOT NULL REFERENCES attune.workflow_definition(id),

    -- Workflow state
    current_tasks   TEXT[] DEFAULT '{}',  -- Currently executing task names
    completed_tasks TEXT[] DEFAULT '{}',
    failed_tasks    TEXT[] DEFAULT '{}',

    -- Variable context (scoped to this workflow)
    variables JSONB DEFAULT '{}',

    -- Graph traversal state
    task_graph JSONB NOT NULL,  -- Adjacency list representation

    -- Status tracking
    status        attune.execution_status_enum NOT NULL DEFAULT 'requested',
    error_message TEXT,

    created TIMESTAMPTZ DEFAULT NOW(),
    updated TIMESTAMPTZ DEFAULT NOW()
);

CREATE INDEX idx_workflow_exec_execution ON attune.workflow_execution(execution);
CREATE INDEX idx_workflow_exec_status ON attune.workflow_execution(status);
```

#### 3. `workflow_task_execution` Table

Tracks individual task executions within a workflow:

```sql
CREATE TABLE attune.workflow_task_execution (
    id                 BIGSERIAL PRIMARY KEY,
    workflow_execution BIGINT NOT NULL REFERENCES attune.workflow_execution(id) ON DELETE CASCADE,
    execution          BIGINT NOT NULL REFERENCES attune.execution(id) ON DELETE CASCADE,

    task_name  VARCHAR(255) NOT NULL,
    task_index INTEGER,  -- For with-items iterations

    -- Status
    status       attune.execution_status_enum NOT NULL DEFAULT 'requested',
    started_at   TIMESTAMPTZ,
    completed_at TIMESTAMPTZ,

    -- Results
    result JSONB,
    error  JSONB,

    -- Retry tracking
    retry_count INTEGER DEFAULT 0,
    max_retries INTEGER DEFAULT 0,

    created TIMESTAMPTZ DEFAULT NOW(),
    updated TIMESTAMPTZ DEFAULT NOW()
);

CREATE INDEX idx_wf_task_exec_workflow ON attune.workflow_task_execution(workflow_execution);
CREATE INDEX idx_wf_task_exec_execution ON attune.workflow_task_execution(execution);
CREATE INDEX idx_wf_task_exec_status ON attune.workflow_task_execution(status);
```

### Modified Tables

#### Update `action` Table

Add a flag and a definition reference to distinguish workflow actions:

```sql
ALTER TABLE attune.action ADD COLUMN is_workflow BOOLEAN DEFAULT false;
ALTER TABLE attune.action ADD COLUMN workflow_def BIGINT REFERENCES attune.workflow_definition(id);
```

## Variable Scoping and Templating

### Template Engine

Use a Jinja2-like template engine (the **tera** crate in Rust is recommended) for variable interpolation.

### Variable Scopes

Variables are accessible from several scopes, listed in order of precedence:

1. **`task.*`** - Results from completed tasks
   - `{{ task.task_name.result.field }}`
   - `{{ task.task_name.status }}`
   - `{{ task.task_name.error }}`
2. **`vars.*`** - Workflow-scoped variables
   - `{{ vars.deployment_id }}`
   - `{{ vars.custom_variable }}`
3. **`parameters.*`** - Input parameters to the workflow
   - `{{ parameters.app_name }}`
   - `{{ parameters.environment }}`
4. **`pack.config.*`** - Pack configuration
   - `{{ pack.config.api_key }}`
   - `{{ pack.config.base_url }}`
5. **`system.*`** - System-level variables
   - `{{ system.execution_id }}`
   - `{{ system.timestamp }}`
   - `{{ system.identity.login }}`
6. **`kv.*`** - Key-value datastore
   - `{{ kv.get('global.feature_flags.new_ui') }}`
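
Resolving a reference like `vars.deployment_id` amounts to picking the scope from the leading segment and looking up the remainder. A flattened sketch with string values (the real context would hold JSON values and support nested paths such as `task.build_image.result.image_uri`):

```rust
use std::collections::HashMap;

// Resolve a "scope.key" reference against named scopes, in the order given.
fn resolve(scopes: &[(&str, &HashMap<String, String>)], path: &str) -> Option<String> {
    let (scope, key) = path.split_once('.')?;
    scopes
        .iter()
        .find(|(name, _)| *name == scope)
        .and_then(|(_, map)| map.get(key).cloned())
}
```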

### Special Variables for Iteration

When using `with_items`:

- **`{{ item }}`** - Current item or batch
  - **Without `batch_size`**: Individual item value
  - **With `batch_size`**: Array of items (batch)
- **`{{ index }}`** - Zero-based index
  - **Without `batch_size`**: Item index
  - **With `batch_size`**: Batch index

Example usage:

```yaml
# Without batch_size - individual items
- name: process_one_by_one
  with_items: "{{ parameters.files }}"
  input:
    file: "{{ item }}"       # Single filename
    position: "{{ index }}"  # Item position

# With batch_size - arrays
- name: process_in_batches
  with_items: "{{ parameters.files }}"
  batch_size: 10
  input:
    files: "{{ item }}"      # Array of up to 10 filenames
    batch_num: "{{ index }}" # Batch number
```

### Template Helper Functions

```yaml
# String manipulation
message: "{{ parameters.app_name | upper }}"
path: "{{ vars.base_path | trim | append('/logs') }}"

# JSON manipulation
config: "{{ vars.raw_config | from_json }}"
payload: "{{ vars.data | to_json }}"

# List operations
regions: "{{ parameters.all_regions | filter(enabled=true) }}"
first_region: "{{ parameters.regions | first }}"
count: "{{ vars.results | length }}"

# Batching helper
batches: "{{ vars.large_list | batch(size=100) }}"

# Conditional helpers
status: "{{ task.deploy.status | default('unknown') }}"
url: "{{ vars.host | default('localhost') | prepend('https://') }}"

# Key-value store access
flag: "{{ kv.get('feature.flags.enabled', default=false) }}"
secret: "{{ kv.get_secret('api.credentials.token') }}"
```

## Workflow Execution Engine

### Architecture

The workflow execution engine is a new component within the **Executor Service**:

```
┌────────────────────────────────────────────────┐
│                Executor Service                │
├────────────────────────────────────────────────┤
│                                                │
│  ┌──────────────────┐   ┌───────────────────┐  │
│  │   Enforcement    │   │     Execution     │  │
│  │    Processor     │   │     Scheduler     │  │
│  └──────────────────┘   └───────────────────┘  │
│                                                │
│  ┌──────────────────────────────────────────┐  │
│  │          Workflow Engine (NEW)           │  │
│  ├──────────────────────────────────────────┤  │
│  │  - Workflow Parser                       │  │
│  │  - Graph Executor                        │  │
│  │  - Variable Context Manager              │  │
│  │  - Task Scheduler                        │  │
│  │  - State Machine                         │  │
│  └──────────────────────────────────────────┘  │
│                                                │
└────────────────────────────────────────────────┘
```

### Workflow Lifecycle

```
1.  Rule triggers workflow action
2.  Executor recognizes workflow action (is_workflow = true)
3.  Workflow Engine loads workflow definition
4.  Create workflow_execution record
5.  Initialize variable context with parameters
6.  Build task dependency graph
7.  Schedule initial tasks (entry points with no dependencies)
8.  For each task:
    a. Template task inputs using current variable context
    b. Create child execution for the action
    c. Publish execution.scheduled message
    d. Create workflow_task_execution record
9.  Worker executes task and publishes result
10. Workflow Engine receives execution.completed
    a. Update workflow_task_execution
    b. Publish variables to workflow context
    c. Evaluate next tasks based on transitions
    d. Schedule next tasks or complete workflow
11. Repeat until all tasks complete or workflow fails
12. Update parent workflow execution status
13. Publish workflow.completed event
```

### State Machine

```
                 ┌──────────────┐
                 │  Requested   │
                 └──────┬───────┘
                        │
                 ┌──────▼───────┐
                 │  Scheduling  │
                 └──────┬───────┘
                        │
┌──────────┐     ┌──────▼───────┐     ┌────────────┐
│  Paused  │◀───▶│   Running    │◀───▶│  Waiting   │
└──────────┘     └──────┬───────┘     │ (for task) │
                        │             └────────────┘
         ┌──────────────┼──────────────┐
         │              │              │
   ┌─────▼─────┐   ┌────▼─────┐   ┌────▼──────┐
   │ Completed │   │  Failed  │   │ Cancelled │
   └───────────┘   └──────────┘   └───────────┘
```
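
One reading of the state machine as a transition guard the engine could apply before persisting a status change. This is a sketch: the enum mirrors the diagram rather than `execution_status_enum`, and allowing cancellation from `Paused`/`Waiting` is an added assumption:

```rust
#[derive(Debug, PartialEq, Clone, Copy)]
enum WfState {
    Requested,
    Scheduling,
    Running,
    Paused,
    Waiting,
    Completed,
    Failed,
    Cancelled,
}

// True when the status change is a legal edge of the state machine.
fn can_transition(from: WfState, to: WfState) -> bool {
    use WfState::*;
    matches!(
        (from, to),
        (Requested, Scheduling)
            | (Scheduling, Running)
            | (Running, Paused)
            | (Running, Waiting)
            | (Running, Completed)
            | (Running, Failed)
            | (Running, Cancelled)
            | (Paused, Running)
            | (Paused, Cancelled)   // assumption: cancel while paused
            | (Waiting, Running)
            | (Waiting, Cancelled)  // assumption: cancel while waiting
    )
}
```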

### Core Components (Rust Implementation)

#### 1. Workflow Definition Parser

```rust
// crates/executor/src/workflow/parser.rs

pub struct WorkflowDefinition {
    pub ref_: String,
    pub label: String,
    pub parameters: JsonSchema,
    pub output: JsonSchema,
    pub vars: HashMap<String, JsonValue>,
    pub tasks: Vec<TaskDefinition>,
    pub output_map: HashMap<String, String>,
}

pub struct TaskDefinition {
    pub name: String,
    pub task_type: TaskType,
    pub action: Option<String>,
    pub input: HashMap<String, String>, // Template strings
    pub publish: Vec<PublishMapping>,
    pub transitions: TaskTransitions,
    pub retry: Option<RetryPolicy>,
    pub timeout: Option<u64>,
    pub when: Option<String>,       // Condition template
    pub with_items: Option<String>, // List template
    pub batch_size: Option<usize>,
}

pub enum TaskType {
    Action,
    Parallel,
    Workflow, // Subworkflow
}

pub struct TaskTransitions {
    pub on_success: Option<String>,
    pub on_failure: Option<String>,
    pub on_complete: Option<String>,
    pub on_timeout: Option<String>,
    pub decision: Vec<DecisionBranch>,
}

pub struct DecisionBranch {
    pub when: Option<String>, // Condition template
    pub next: String,
    pub is_default: bool,
}
```

#### 2. Variable Context Manager

```rust
// crates/executor/src/workflow/context.rs

pub struct WorkflowContext {
    pub execution_id: i64,
    pub parameters: JsonValue,
    pub vars: HashMap<String, JsonValue>,
    pub task_results: HashMap<String, TaskResult>,
    pub pack_config: JsonValue,
    pub system: SystemContext,
}

impl WorkflowContext {
    pub fn render_template(&self, template: &str) -> Result<String> {
        // Use the tera template engine
        let mut tera = Tera::default();
        let context = self.to_tera_context();
        Ok(tera.render_str(template, &context)?)
    }

    pub fn publish_variables(&mut self, mappings: &[PublishMapping]) -> Result<()> {
        for mapping in mappings {
            let rendered = self.render_template(&mapping.template)?;
            // Re-parse so published values keep their JSON type; fall back to
            // a plain string when the rendered value is not valid JSON.
            let value = serde_json::from_str(&rendered)
                .unwrap_or(JsonValue::String(rendered));
            self.vars.insert(mapping.var_name.clone(), value);
        }
        Ok(())
    }
}
```

#### 3. Graph Executor

```rust
// crates/executor/src/workflow/graph.rs

pub struct TaskGraph {
    nodes: HashMap<String, TaskNode>,
    edges: HashMap<String, Vec<Edge>>,
}

pub struct TaskNode {
    pub name: String,
    pub definition: TaskDefinition,
    pub status: TaskStatus,
}

pub struct Edge {
    pub from: String,
    pub to: String,
    pub condition: EdgeCondition,
}

pub enum EdgeCondition {
    OnSuccess,
    OnFailure,
    OnComplete,
    OnTimeout,
    Decision(String), // Template condition
}

impl TaskGraph {
    pub fn find_next_tasks(&self, completed_task: &str, result: &TaskResult) -> Vec<String> {
        // Traverse the graph based on the task result and transitions
        todo!()
    }

    pub fn get_ready_tasks(&self) -> Vec<&TaskNode> {
        // Find tasks with all dependencies satisfied
        todo!()
    }
}
```
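
A minimal implementation of the `get_ready_tasks` idea over a plain dependency map (illustrative types; the real method would walk the `TaskGraph` edges instead):

```rust
use std::collections::{HashMap, HashSet};

// A task is ready when it has not completed yet and every task it depends on
// has. Results are sorted for deterministic scheduling.
fn ready_tasks(deps: &HashMap<&str, Vec<&str>>, completed: &HashSet<&str>) -> Vec<String> {
    let mut ready = Vec::new();
    for (task, needs) in deps {
        if completed.contains(task) {
            continue;
        }
        if needs.iter().all(|dep| completed.contains(dep)) {
            ready.push(task.to_string());
        }
    }
    ready.sort();
    ready
}
```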

#### 4. Workflow Executor

```rust
// crates/executor/src/workflow/executor.rs

pub struct WorkflowExecutor {
    pool: PgPool,
    publisher: MessagePublisher,
    template_engine: Tera,
}

impl WorkflowExecutor {
    pub async fn execute_workflow(
        &self,
        execution_id: i64,
        workflow_def: WorkflowDefinition,
        parameters: JsonValue,
    ) -> Result<()> {
        // 1. Create workflow_execution record
        let wf_exec = self.create_workflow_execution(execution_id, &workflow_def).await?;

        // 2. Initialize variable context
        let context = WorkflowContext::new(execution_id, parameters, &workflow_def);

        // 3. Build task graph
        let graph = TaskGraph::from_definition(&workflow_def)?;

        // 4. Schedule initial tasks
        let initial_tasks = graph.get_ready_tasks();
        for task in initial_tasks {
            self.schedule_task(&wf_exec, &task.definition, &context).await?;
        }

        Ok(())
    }

    pub async fn handle_task_completion(
        &self,
        wf_execution_id: i64,
        task_name: String,
        result: TaskResult,
    ) -> Result<()> {
        // 1. Load workflow execution and context
        let wf_exec = self.load_workflow_execution(wf_execution_id).await?;
        let mut context = self.load_context(&wf_exec).await?;

        // 2. Update task result in context
        context.task_results.insert(task_name.clone(), result.clone());

        // 3. Publish variables from the task
        let task_def = wf_exec.definition.get_task(&task_name)?;
        context.publish_variables(&task_def.publish)?;

        // 4. Save updated context
        self.save_context(wf_execution_id, &context).await?;

        // 5. Determine next tasks
        let next_tasks = wf_exec.graph.find_next_tasks(&task_name, &result);

        // 6. Schedule next tasks
        for next_task in next_tasks {
            let task_def = wf_exec.definition.get_task(&next_task)?;

            // Evaluate condition if present
            if let Some(when) = &task_def.when {
                let should_run = context.evaluate_condition(when)?;
                if !should_run {
                    continue;
                }
            }

            self.schedule_task(&wf_exec, task_def, &context).await?;
        }

        // 7. Check if the workflow is complete
        if self.is_workflow_complete(&wf_exec).await? {
            self.complete_workflow(wf_execution_id, &context).await?;
        }

        Ok(())
    }

    async fn schedule_task(
        &self,
        wf_exec: &WorkflowExecution,
        task: &TaskDefinition,
        context: &WorkflowContext,
    ) -> Result<()> {
        // Handle with_items (iteration)
        let items = if let Some(with_items_template) = &task.with_items {
            let items_json = context.render_template(with_items_template)?;
            serde_json::from_str::<Vec<JsonValue>>(&items_json)?
        } else {
            vec![JsonValue::Null] // Single execution
        };

        // With batch_size, each unit of work is an array of items; without it,
        // each unit is a single item.
        let units: Vec<JsonValue> = if let Some(batch_size) = task.batch_size {
            items
                .chunks(batch_size)
                .map(|chunk| JsonValue::Array(chunk.to_vec()))
                .collect()
        } else {
            items
        };

        for (index, item) in units.into_iter().enumerate() {
            // Create a unit-specific context: `item` is the value (or batch
            // array), `index` the item (or batch) index.
            let mut item_context = context.clone();
            item_context.add_item_vars(item, index);

            // Render task inputs
            let rendered_input = self.render_task_input(task, &item_context)?;

            // Create a child execution for the action
            let action_ref = task.action.as_deref().expect("action task without action ref");
            let execution = self.create_task_execution(
                wf_exec.id,
                action_ref,
                rendered_input,
                wf_exec.execution,
            ).await?;

            // Create workflow_task_execution record
            self.create_workflow_task_execution(
                wf_exec.id,
                execution.id,
                &task.name,
                index,
            ).await?;

            // Publish execution.scheduled message
            self.publisher.publish_execution_scheduled(execution.id).await?;
        }

        Ok(())
    }
}
```

### Message Flow

#### New Message Types

```rust
// Workflow-specific messages
pub enum WorkflowMessage {
    WorkflowStarted { execution_id: i64, workflow_id: i64 },
    WorkflowCompleted { execution_id: i64, result: JsonValue },
    WorkflowFailed { execution_id: i64, error: String },
    TaskScheduled { workflow_execution_id: i64, task_name: String },
    TaskCompleted { workflow_execution_id: i64, task_name: String, result: TaskResult },
    TaskFailed { workflow_execution_id: i64, task_name: String, error: String },
}
```

## API Endpoints

### Workflow Management

```
POST   /api/v1/packs/{pack_ref}/workflows          - Create workflow
GET    /api/v1/packs/{pack_ref}/workflows          - List workflows
GET    /api/v1/workflows/{workflow_ref}            - Get workflow
PUT    /api/v1/workflows/{workflow_ref}            - Update workflow
DELETE /api/v1/workflows/{workflow_ref}            - Delete workflow

POST   /api/v1/workflows/{workflow_ref}/execute    - Execute workflow directly
GET    /api/v1/workflows/{workflow_ref}/executions - List executions
```

### Workflow Execution Monitoring

```
GET  /api/v1/workflow-executions/{id}        - Get workflow execution
GET  /api/v1/workflow-executions/{id}/tasks  - List task executions
GET  /api/v1/workflow-executions/{id}/graph  - Get execution graph
POST /api/v1/workflow-executions/{id}/pause  - Pause workflow
POST /api/v1/workflow-executions/{id}/resume - Resume workflow
POST /api/v1/workflow-executions/{id}/cancel - Cancel workflow
POST /api/v1/workflow-executions/{id}/retry  - Retry failed workflow
```

## Pack Structure with Workflows

```
packs/
└── my_pack/
    ├── pack.yaml        # Pack metadata
    ├── config.yaml      # Pack configuration schema
    ├── actions/
    │   ├── action1.py
    │   ├── action2.py
    │   └── action.yaml  # Action metadata
    ├── sensors/
    │   ├── sensor1.py
    │   └── sensor.yaml
    ├── workflows/       # NEW
    │   ├── deploy.yaml
    │   ├── backup.yaml
    │   └── migrate.yaml
    ├── rules/
    │   └── on_push.yaml
    └── tests/
        └── test_workflows.yaml
```

### Workflow Pack Registration

When a pack is registered, the system:

1. Scans `workflows/` directory for `.yaml` files
2. Parses and validates each workflow definition
3. Creates `workflow_definition` record
4. Creates synthetic `action` record with `is_workflow=true`
5. Makes workflow invokable like any other action
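
Step 2's validation might include checking that a workflow's declared `ref` is namespaced under its pack, as every example in this document is (`validate_ref` is a hypothetical helper, not an existing API):

```rust
// True when `declared_ref` has the form "<pack>.<something>".
fn validate_ref(pack: &str, declared_ref: &str) -> bool {
    match declared_ref.strip_prefix(pack) {
        Some(rest) => rest.starts_with('.') && rest.len() > 1,
        None => false,
    }
}
```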

## Implementation Plan

### Phase 1: Foundation (Week 1-2)

1. **Database Schema**
   - Create migration for workflow tables
   - Add workflow columns to action table
   - Create indexes

2. **Data Models**
   - Add workflow models to `common/src/models.rs`
   - Create workflow definition parser
   - Create repositories

3. **Template Engine**
   - Integrate `tera` crate
   - Implement variable context
   - Create helper functions

### Phase 2: Core Engine (Week 3-4)

4. **Graph Engine**
   - Implement task graph builder
   - Implement graph traversal logic
   - Handle conditional branching

5. **Workflow Executor**
   - Implement workflow execution logic
   - Handle task scheduling
   - Implement state management

6. **Message Integration**
   - Add workflow message handlers
   - Integrate with executor service
   - Handle task completion events

### Phase 3: Advanced Features (Week 5-6)

7. **Iteration Support**
   - Implement `with_items` logic
   - Add batching support
   - Handle concurrent iterations

8. **Parallel Execution**
   - Implement parallel task type
   - Handle synchronization
   - Aggregate results

9. **Error Handling**
   - Implement retry logic
   - Handle timeouts
   - Implement failure paths

### Phase 4: API & Tools (Week 7-8)

10. **API Endpoints**
    - Workflow CRUD operations
    - Execution monitoring
    - Control operations (pause/resume/cancel)

11. **CLI Support**
    - Workflow validation command
    - Workflow execution command
    - Workflow visualization

12. **Testing & Documentation**
    - Unit tests for all components
    - Integration tests
    - Example workflows
    - User documentation

## Testing Strategy

### Unit Tests

- Template rendering with all scope types
- Graph traversal algorithms
- Condition evaluation
- Variable publishing
- Task scheduling logic

### Integration Tests

- End-to-end workflow execution
- Parallel task execution
- Error handling and retry
- Workflow cancellation
- Nested workflow execution

### Example Test Workflows

```yaml
# tests/workflows/simple_sequence.yaml
ref: test.simple_sequence
label: "Simple Sequential Workflow Test"
tasks:
  - name: task1
    action: core.echo
    input:
      message: "Task 1"
    on_success: task2
  - name: task2
    action: core.echo
    input:
      message: "Task 2"
```

## Performance Considerations

1. **Graph Optimization**: Cache parsed workflow graphs
2. **Template Caching**: Cache compiled templates per workflow
3. **Parallel Scheduling**: Schedule independent tasks concurrently
4. **Database Batching**: Batch task creation/updates
5. **Context Serialization**: Use efficient JSON serialization

## Security Considerations

1. **Template Injection**: Sanitize template inputs
2. **Variable Scoping**: Strict scope isolation between workflows
3. **Secret Access**: Only allow `kv.get_secret()` for authorized users
4. **Resource Limits**: Enforce max task count, max depth, max iterations
5. **Audit Trail**: Log all workflow decisions and transitions

## Monitoring & Observability

### Metrics

- Workflow execution duration
- Task execution duration
- Workflow success/failure rate
- Task retry count
- Queue depth for workflow tasks

### Logging

```
INFO Workflow execution started: id=123, workflow=deploy_app
INFO Task scheduled: workflow=123, task=build_image
INFO Task completed: workflow=123, task=build_image, duration=45s
INFO Publishing variables: deployment_id=456
INFO Scheduling next tasks: [deploy_containers, health_check]
INFO Workflow execution completed: id=123, duration=2m30s
```

### Tracing

- Link all tasks to the parent workflow execution
- Propagate trace_id through the entire workflow
- Enable distributed tracing visualization

## Future Enhancements

1. **Workflow Versioning**: Support multiple versions of the same workflow
2. **Workflow Templates**: Reusable workflow patterns
3. **Dynamic Workflows**: Generate the workflow graph at runtime
4. **Workflow Marketplace**: Share workflows across organizations
5. **Workflow Testing Framework**: Built-in testing tools
6. **Workflow Debugger**: Step-through execution debugging
7. **Workflow Visualization**: Real-time execution graph UI
8. **Workflow Analytics**: Performance analysis and optimization suggestions

## References

- [StackStorm Orquesta](https://docs.stackstorm.com/orquesta/index.html)
- [Argo Workflows](https://argoproj.github.io/argo-workflows/)
- [AWS Step Functions](https://aws.amazon.com/step-functions/)
- [Temporal Workflows](https://temporal.io/)

## Appendix: Complete Workflow Example

See `docs/examples/complete-workflow.yaml` for a comprehensive example demonstrating all workflow features.