Files
attune/docs/workflows/workflow-implementation-plan.md
2026-02-04 17:46:30 -06:00

18 KiB

Workflow Orchestration Implementation Plan

Executive Summary

This document outlines the implementation plan for adding workflow orchestration capabilities to Attune. Workflows enable composing multiple actions into complex, conditional execution graphs with variable passing, iteration, and error handling.

Key Design Decisions

1. Workflows as Actions

Workflows are first-class actions that can be:

  • Triggered by rules (event-driven)
  • Invoked by other workflows (composable)
  • Executed directly via API
  • Referenced in the same way as regular actions

2. YAML-Based Definition

Workflows are defined declaratively in YAML files within pack directories, making them:

  • Version-controllable
  • Human-readable
  • Easy to author and maintain
  • Portable across environments

3. Event-Driven Execution

Workflows leverage the existing message queue infrastructure:

  • Each task creates a child execution
  • Tasks execute asynchronously via workers
  • Progress is tracked via execution status messages
  • No blocking or polling required

4. Multi-Scope Variable System

Variables are accessible from 6 scopes (in precedence order):

  1. task.* - Results from completed tasks
  2. vars.* - Workflow-scoped variables
  3. parameters.* - Input parameters
  4. pack.config.* - Pack configuration
  5. system.* - System variables (execution_id, timestamp, identity)
  6. kv.* - Key-value datastore

Architecture Overview

┌────────────────────────────────────────────────────────────┐
│                    Attune Platform                          │
├────────────────────────────────────────────────────────────┤
│                                                              │
│  ┌─────────────┐  ┌────────────┐  ┌──────────────────┐    │
│  │ API Service │  │  Executor  │  │ Worker Service   │    │
│  │             │  │  Service   │  │                  │    │
│  │  Workflow   │  │            │  │  Runtime Engine  │    │
│  │  CRUD       │  │ ┌────────┐ │  │                  │    │
│  │             │  │ │Workflow│ │  │  Execute Actions │    │
│  │             │  │ │Engine  │ │  │                  │    │
│  └─────────────┘  │ │        │ │  └──────────────────┘    │
│                    │ │- Parser│ │                          │
│                    │ │- Graph │ │                          │
│                    │ │- Context│ │                         │
│                    │ │- Sched │ │                          │
│                    │ └────────┘ │                          │
│                    └────────────┘                          │
│                                                              │
│  ┌──────────────────────────────────────────────────────┐  │
│  │           PostgreSQL Database                        │  │
│  │  - workflow_definition                               │  │
│  │  - workflow_execution                                │  │
│  │  - workflow_task_execution                           │  │
│  └──────────────────────────────────────────────────────┘  │
│                                                              │
└────────────────────────────────────────────────────────────┘

Database Schema Changes

New Tables

  1. workflow_definition

    • Stores parsed workflow YAML as JSON
    • Links to pack
    • Contains parameter/output schemas
    • Full task graph definition
  2. workflow_execution

    • Tracks runtime state of workflow
    • Stores variable context
    • Maintains task completion tracking
    • Links to parent execution
  3. workflow_task_execution

    • Individual task execution tracking
    • Supports iteration (with-items)
    • Retry tracking
    • Result storage

Modified Tables

  • action table gets two new columns:
    • is_workflow (boolean)
    • workflow_def (foreign key)

Core Features

1. Sequential Execution

Tasks execute one after another based on transitions:

tasks:
  - name: task1
    action: pack.action1
    on_success: task2
  - name: task2
    action: pack.action2

2. Parallel Execution

Multiple tasks execute concurrently:

tasks:
  - name: parallel_checks
    type: parallel
    tasks:
      - name: check_db
        action: db.health
      - name: check_cache
        action: cache.health
    on_success: deploy

3. Conditional Branching

Execute tasks based on conditions:

tasks:
  - name: check_env
    action: core.noop
    decision:
      - when: "{{ parameters.env == 'production' }}"
        next: require_approval
      - default: deploy_directly

4. Iteration (with-items)

Process lists with optional batching:

tasks:
  - name: deploy_regions
    action: deploy.to_region
    with_items: "{{ parameters.regions }}"
    batch_size: 5  # Process 5 at a time
    input:
      region: "{{ item }}"

5. Variable Publishing

Tasks can publish results to workflow scope:

tasks:
  - name: create_resource
    action: cloud.create
    publish:
      - resource_id: "{{ task.create_resource.result.id }}"
      - resource_url: "{{ task.create_resource.result.url }}"

6. Error Handling & Retry

Built-in retry with backoff:

tasks:
  - name: flaky_task
    action: http.request
    retry:
      count: 5
      delay: 10
      backoff: exponential
    on_success: next_task
    on_failure: cleanup_task

7. Human-in-the-Loop

Integrate inquiry (approval) steps:

tasks:
  - name: require_approval
    action: core.inquiry
    input:
      prompt: "Approve deployment?"
      schema:
        type: object
        properties:
          approved:
            type: boolean
    decision:
      - when: "{{ task.require_approval.result.approved }}"
        next: deploy
      - default: cancel

8. Nested Workflows

Workflows can invoke other workflows:

tasks:
  - name: provision_infra
    action: infrastructure.full_stack  # This is also a workflow
    input:
      environment: "{{ parameters.env }}"

Template System

Template Engine: Tera (Rust)

  • Jinja2-like syntax
  • Variable interpolation: {{ vars.name }}
  • Filters: {{ text | upper }}
  • Conditionals: {{ value ? 'yes' : 'no' }}

Helper Functions

# String operations
message: "{{ parameters.name | upper | trim }}"

# List operations
first: "{{ vars.list | first }}"
count: "{{ vars.list | length }}"

# JSON operations
parsed: "{{ vars.json_string | from_json }}"

# Batching
batches: "{{ vars.items | batch(size=100) }}"

# Key-value store
value: "{{ kv.get('config.key', default='fallback') }}"
secret: "{{ kv.get_secret('api.token') }}"

Workflow Lifecycle

1. Rule/API triggers workflow action
   ↓
2. Executor detects is_workflow=true
   ↓
3. Load workflow_definition from database
   ↓
4. Create workflow_execution record
   ↓
5. Initialize variable context with parameters
   ↓
6. Build task dependency graph
   ↓
7. Schedule initial tasks (entry points)
   ↓
8. For each task:
   a. Template task inputs
   b. Create child execution
   c. Create workflow_task_execution record
   d. Publish execution.scheduled message
   ↓
9. Worker executes task, publishes result
   ↓
10. Workflow Engine receives completion:
    a. Update workflow_task_execution
    b. Publish variables to context
    c. Evaluate transitions
    d. Schedule next tasks
    ↓
11. Repeat until all tasks complete
    ↓
12. Update workflow_execution status
    ↓
13. Publish workflow.completed event

Implementation Phases

Phase 1: Foundation (2 weeks)

Goal: Core data structures and parsing

  • Database migration for workflow tables
  • Add workflow models to common/src/models.rs
  • Create workflow repositories
  • Implement YAML parser for workflow definitions
  • Integrate Tera template engine
  • Create variable context manager

Deliverables:

  • Migration file: migrations/020_workflow_orchestration.sql
  • Models: common/src/models/workflow.rs
  • Repositories: common/src/repositories/workflow*.rs
  • Parser: executor/src/workflow/parser.rs
  • Context: executor/src/workflow/context.rs

Phase 2: Execution Engine (2 weeks)

Goal: Core workflow execution logic

  • Implement task graph builder
  • Implement graph traversal logic
  • Create workflow executor service
  • Add workflow message handlers
  • Implement task scheduling
  • Handle task completion events

Deliverables:

  • Graph engine: executor/src/workflow/graph.rs
  • Executor: executor/src/workflow/executor.rs
  • Message handlers: executor/src/workflow/messages.rs
  • State machine: executor/src/workflow/state.rs

Phase 3: Advanced Features (2 weeks)

Goal: Iteration, parallelism, error handling

  • Implement with-items iteration
  • Add batching support
  • Implement parallel task execution
  • Add retry logic with backoff
  • Implement timeout handling
  • Add conditional branching (decision trees)

Deliverables:

  • Iterator: executor/src/workflow/iterator.rs
  • Parallel executor: executor/src/workflow/parallel.rs
  • Retry handler: executor/src/workflow/retry.rs

Phase 4: API & Tools (2 weeks)

Goal: Management interface and tooling

  • Workflow CRUD API endpoints
  • Workflow execution monitoring API
  • Control operations (pause/resume/cancel)
  • Workflow validation CLI command
  • Workflow visualization endpoint
  • Pack registration workflow scanning

Deliverables:

  • API routes: api/src/routes/workflows.rs
  • API handlers: api/src/handlers/workflows.rs
  • CLI commands: cli/src/commands/workflow.rs (future)
  • Documentation updates

Phase 5: Testing & Documentation (1 week)

Goal: Comprehensive testing and docs

  • Unit tests for all components
  • Integration tests for workflows
  • Example workflows (simple, complex, failure cases)
  • User documentation
  • API documentation
  • Migration guide

Deliverables:

  • Test suite: executor/tests/workflow_tests.rs
  • Examples: docs/examples/workflow-*.yaml
  • User guide: docs/workflow-user-guide.md
  • Migration guide: docs/workflow-migration.md

Total Timeline: 9 Weeks

Testing Strategy

Unit Tests

  • Template rendering with all scope types
  • Graph construction and traversal
  • Condition evaluation
  • Variable publishing
  • Task scheduling logic
  • Retry logic
  • Timeout handling

Integration Tests

  • Simple sequential workflow
  • Parallel execution workflow
  • Conditional branching workflow
  • Iteration workflow (with batching)
  • Error handling and retry
  • Nested workflow execution
  • Workflow cancellation
  • Long-running workflow

Example Test Workflows

Located in docs/examples/:

  • simple-workflow.yaml - Basic sequential flow
  • complete-workflow.yaml - All features demonstrated
  • parallel-workflow.yaml - Parallel execution
  • conditional-workflow.yaml - Branching logic
  • iteration-workflow.yaml - with-items examples

API Endpoints

Workflow Management

POST   /api/v1/packs/{pack_ref}/workflows          - Create workflow
GET    /api/v1/packs/{pack_ref}/workflows          - List workflows in pack
GET    /api/v1/workflows                           - List all workflows
GET    /api/v1/workflows/{workflow_ref}            - Get workflow definition
PUT    /api/v1/workflows/{workflow_ref}            - Update workflow
DELETE /api/v1/workflows/{workflow_ref}            - Delete workflow
POST   /api/v1/workflows/{workflow_ref}/execute    - Execute workflow directly
POST   /api/v1/workflows/{workflow_ref}/validate   - Validate workflow definition

Workflow Execution Management

GET    /api/v1/workflow-executions                 - List workflow executions
GET    /api/v1/workflow-executions/{id}            - Get workflow execution details
GET    /api/v1/workflow-executions/{id}/tasks      - List task executions
GET    /api/v1/workflow-executions/{id}/graph      - Get execution graph (visualization)
GET    /api/v1/workflow-executions/{id}/context    - Get variable context
POST   /api/v1/workflow-executions/{id}/pause      - Pause workflow
POST   /api/v1/workflow-executions/{id}/resume     - Resume paused workflow
POST   /api/v1/workflow-executions/{id}/cancel     - Cancel workflow
POST   /api/v1/workflow-executions/{id}/retry      - Retry failed workflow

Pack Structure with Workflows

packs/
└── my_pack/
    ├── pack.yaml               # Pack metadata
    ├── config.yaml             # Pack configuration schema
    ├── actions/
    │   ├── action1.py
    │   ├── action2.py
    │   └── action.yaml
    ├── sensors/
    │   ├── sensor1.py
    │   └── sensor.yaml
    ├── workflows/              # NEW: Workflow definitions
    │   ├── deploy.yaml
    │   ├── backup.yaml
    │   ├── migrate.yaml
    │   └── rollback.yaml
    ├── rules/
    │   └── on_push.yaml
    └── tests/
        ├── test_actions.py
        └── test_workflows.yaml  # Workflow test definitions

Pack Registration Process

When a pack is registered:

  1. Scan workflows/ directory for .yaml files
  2. Parse and validate each workflow definition
  3. Create workflow_definition record in database
  4. Create synthetic action record with is_workflow=true
  5. Link action to workflow via workflow_def foreign key
  6. Workflow is now invokable like any other action

Performance Considerations

Optimizations

  1. Graph Caching: Cache parsed task graphs per workflow definition
  2. Template Compilation: Compile templates once, reuse for iterations
  3. Parallel Scheduling: Schedule independent tasks concurrently
  4. Database Batching: Batch task creation/updates when using with-items
  5. Context Serialization: Use efficient JSON serialization for variable context

Resource Limits

  • Max workflow depth: 10 levels (prevent infinite recursion)
  • Max tasks per workflow: 1000 (prevent resource exhaustion)
  • Max iterations per with-items: 10,000 (configurable)
  • Max parallel tasks: 100 (configurable)
  • Variable context size: 10MB (prevent memory issues)

Security Considerations

  1. Template Injection: Sanitize all template inputs, no arbitrary code execution
  2. Variable Scoping: Strict isolation between workflow executions
  3. Secret Access: Only allow kv.get_secret() for authorized identities
  4. Resource Limits: Enforce max task count, depth, iterations
  5. Audit Trail: Log all workflow decisions, transitions, variable changes
  6. RBAC: Workflow execution requires action execution permissions
  7. Input Validation: Validate parameters against param_schema

Monitoring & Observability

Metrics to Track

  • Workflow executions per second
  • Average workflow duration
  • Task execution duration (p50, p95, p99)
  • Workflow success/failure rates
  • Task retry counts
  • Queue depth for workflow tasks
  • Variable context size distribution

Logging Standards

INFO  [workflow.start] execution=123 workflow=deploy_app version=1.0.0
INFO  [workflow.task.schedule] execution=123 task=build_image
INFO  [workflow.task.complete] execution=123 task=build_image duration=45s
INFO  [workflow.vars.publish] execution=123 vars=["image_uri"]
INFO  [workflow.task.schedule] execution=123 tasks=["deploy","health_check"]
WARN  [workflow.task.retry] execution=123 task=flaky_api attempt=2
ERROR [workflow.task.failed] execution=123 task=deploy_db error="connection_timeout"
INFO  [workflow.complete] execution=123 status=success duration=2m30s

Distributed Tracing

  • Propagate trace_id through entire workflow
  • Link all task executions to parent workflow
  • Enable end-to-end request tracing
  • Integration with OpenTelemetry (future)

Dependencies

New Rust Crates

  • tera (^1.19) - Template engine
  • petgraph (^0.6) - Graph data structures and algorithms

Existing Dependencies

  • sqlx - Database access
  • serde/serde_json - Serialization
  • tokio - Async runtime
  • lapin - RabbitMQ client

Future Enhancements

Short Term (3-6 months)

  • Workflow versioning (multiple versions of same workflow)
  • Workflow pausing/resuming with state persistence
  • Advanced retry strategies (circuit breaker, adaptive)
  • Workflow templates (reusable patterns)

Medium Term (6-12 months)

  • Dynamic workflows (generate graph at runtime)
  • Workflow debugging tools (step-through execution)
  • Performance analytics and optimization suggestions
  • Workflow marketplace (share workflows)

Long Term (12+ months)

  • Visual workflow editor (drag-and-drop UI)
  • AI-powered workflow generation
  • Workflow optimization recommendations
  • Multi-cloud orchestration patterns

Success Criteria

This implementation will be considered successful when:

  1. Workflows can be defined in YAML and registered via packs
  2. Workflows execute reliably with all features working
  3. Variables are properly scoped and templated across all scopes
  4. Parallel execution works correctly with proper synchronization
  5. Iteration handles lists efficiently with batching
  6. Error handling and retry work as specified
  7. Human-in-the-loop (inquiry) tasks integrate seamlessly
  8. Nested workflows execute correctly
  9. API provides full CRUD and control operations
  10. Comprehensive tests cover all features
  11. Documentation enables users to create workflows easily

References

  • Full design: docs/workflow-orchestration.md
  • Simple example: docs/examples/simple-workflow.yaml
  • Complex example: docs/examples/complete-workflow.yaml
  • Migration SQL: docs/examples/workflow-migration.sql

Next Steps

  1. Review this plan with stakeholders
  2. Prioritize features if timeline needs adjustment
  3. Set up project tracking (GitHub issues/milestones)
  4. Begin Phase 1 implementation
  5. Schedule weekly progress reviews