# Workflow Orchestration Quick-Start Guide
This guide helps developers get started implementing the workflow orchestration feature in Attune.
## Overview
Workflows are composable, YAML-based action graphs that enable complex automation. The implementation rolls out in five phases over nine weeks.
## Before You Start
### Required Reading
1. `docs/workflow-orchestration.md` - Full technical design (1,063 lines)
2. `docs/workflow-implementation-plan.md` - Implementation roadmap (562 lines)
3. `docs/workflow-summary.md` - Quick reference (400 lines)
### Required Knowledge
- Rust async programming (tokio)
- PostgreSQL and SQLx
- RabbitMQ message patterns
- Graph algorithms (basic traversal)
- Template engines (Jinja2-style syntax)
### Development Environment
```bash
# Ensure you have:
#   - Rust 1.70+
#   - PostgreSQL 14+
#   - RabbitMQ 3.12+
#   - Docker (for testing)

# Clone and set up
cd attune
cargo build
```
## Implementation Phases
### Phase 1: Foundation (Weeks 1-2)
**Goal**: Database schema, models, parser, template engine
#### Step 1.1: Database Migration
```bash
# Create migration file
cd migrations
touch 020_workflow_orchestration.sql
```
Copy content from `docs/examples/workflow-migration.sql`:
- Creates 3 new tables: `workflow_definition`, `workflow_execution`, `workflow_task_execution`
- Adds `is_workflow` and `workflow_def` columns to the `action` table
- Adds indexes, triggers, and views
Run migration:
```bash
sqlx migrate run
```
#### Step 1.2: Data Models
Add to `crates/common/src/models.rs`:
```rust
pub mod workflow {
    use super::*;

    #[derive(Debug, Clone, Serialize, Deserialize, FromRow)]
    pub struct WorkflowDefinition {
        pub id: Id,
        pub r#ref: String,
        pub pack: Id,
        pub pack_ref: String,
        pub label: String,
        pub description: Option<String>,
        pub version: String,
        pub param_schema: Option<JsonSchema>,
        pub out_schema: Option<JsonSchema>,
        pub definition: JsonValue, // Full workflow YAML as JSON
        pub tags: Vec<String>,
        pub enabled: bool,
        pub created: DateTime<Utc>,
        pub updated: DateTime<Utc>,
    }

    #[derive(Debug, Clone, Serialize, Deserialize, FromRow)]
    pub struct WorkflowExecution {
        pub id: Id,
        pub execution: Id,
        pub workflow_def: Id,
        pub current_tasks: Vec<String>,
        pub completed_tasks: Vec<String>,
        pub failed_tasks: Vec<String>,
        pub skipped_tasks: Vec<String>,
        pub variables: JsonValue,
        pub task_graph: JsonValue,
        pub status: ExecutionStatus,
        pub error_message: Option<String>,
        pub paused: bool,
        pub pause_reason: Option<String>,
        pub created: DateTime<Utc>,
        pub updated: DateTime<Utc>,
    }

    #[derive(Debug, Clone, Serialize, Deserialize, FromRow)]
    pub struct WorkflowTaskExecution {
        pub id: Id,
        pub workflow_execution: Id,
        pub execution: Id,
        pub task_name: String,
        pub task_index: Option<i32>,
        pub task_batch: Option<i32>,
        pub status: ExecutionStatus,
        pub started_at: Option<DateTime<Utc>>,
        pub completed_at: Option<DateTime<Utc>>,
        pub duration_ms: Option<i64>,
        pub result: Option<JsonValue>,
        pub error: Option<JsonValue>,
        pub retry_count: i32,
        pub max_retries: i32,
        pub next_retry_at: Option<DateTime<Utc>>,
        pub timeout_seconds: Option<i32>,
        pub timed_out: bool,
        pub created: DateTime<Utc>,
        pub updated: DateTime<Utc>,
    }
}
```
#### Step 1.3: Repositories
Create `crates/common/src/repositories/workflow_definition.rs`:
```rust
use sqlx::PgPool;

use crate::error::Result;
use crate::models::workflow::WorkflowDefinition;

pub struct WorkflowDefinitionRepository;

impl WorkflowDefinitionRepository {
    pub async fn create(pool: &PgPool, def: &WorkflowDefinition) -> Result<WorkflowDefinition> {
        // INSERT implementation
        todo!()
    }

    pub async fn find_by_ref(pool: &PgPool, ref_: &str) -> Result<Option<WorkflowDefinition>> {
        // SELECT WHERE ref = ?
        todo!()
    }

    pub async fn list_by_pack(pool: &PgPool, pack_id: i64) -> Result<Vec<WorkflowDefinition>> {
        // SELECT WHERE pack = ?
        todo!()
    }

    // ... other CRUD methods
}
```
Create similar repositories for `workflow_execution` and `workflow_task_execution`.
#### Step 1.4: YAML Parser
Create `crates/executor/src/workflow/parser.rs`:
```rust
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkflowSpec {
    pub r#ref: String,
    pub label: String,
    pub description: Option<String>,
    pub version: String,
    pub parameters: Option<serde_json::Value>,
    pub output: Option<serde_json::Value>,
    pub vars: HashMap<String, serde_json::Value>,
    pub tasks: Vec<TaskSpec>,
    pub output_map: Option<HashMap<String, String>>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TaskSpec {
    pub name: String,
    #[serde(rename = "type")]
    pub task_type: Option<TaskType>,
    pub action: Option<String>,
    pub input: HashMap<String, String>,
    pub publish: Option<Vec<String>>,
    pub on_success: Option<String>,
    pub on_failure: Option<String>,
    pub on_complete: Option<String>,
    pub on_timeout: Option<String>,
    pub decision: Option<Vec<DecisionBranch>>,
    pub when: Option<String>,
    pub with_items: Option<String>,
    pub batch_size: Option<usize>,
    pub retry: Option<RetryPolicy>,
    pub timeout: Option<u64>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum TaskType {
    Action,
    Parallel,
    Workflow,
}

pub fn parse_workflow_yaml(yaml: &str) -> Result<WorkflowSpec> {
    serde_yaml::from_str(yaml).map_err(|e| Error::InvalidWorkflowDefinition(e.to_string()))
}
```
#### Step 1.5: Template Engine
Add to `crates/executor/Cargo.toml`:
```toml
[dependencies]
tera = "1.19"
```
Create `crates/executor/src/workflow/context.rs`:
```rust
use serde_json::{json, Value as JsonValue};
use std::collections::HashMap;
use tera::{Context, Tera};

pub struct WorkflowContext {
    pub execution_id: i64,
    pub parameters: JsonValue,
    pub vars: HashMap<String, JsonValue>,
    pub task_results: HashMap<String, TaskResult>,
    pub pack_config: JsonValue,
    pub system: SystemContext,
}

impl WorkflowContext {
    pub fn new(execution_id: i64, parameters: JsonValue) -> Self {
        Self {
            execution_id,
            parameters,
            vars: HashMap::new(),
            task_results: HashMap::new(),
            pack_config: JsonValue::Null,
            system: SystemContext::default(),
        }
    }

    pub fn render_template(&self, template: &str) -> Result<String> {
        let mut tera = Tera::default();
        let context = self.to_tera_context();
        tera.render_str(template, &context)
            .map_err(|e| Error::TemplateError(e.to_string()))
    }

    fn to_tera_context(&self) -> Context {
        let mut ctx = Context::new();
        ctx.insert("parameters", &self.parameters);
        ctx.insert("vars", &self.vars);
        ctx.insert("task", &self.task_results);
        ctx.insert("system", &self.system);
        ctx.insert("pack", &json!({ "config": self.pack_config }));
        ctx
    }
}
```
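Templates resolve dotted paths such as `{{ vars.region }}` against the scopes inserted into the context. As a toy, stdlib-only illustration of that lookup (not Tera itself, and with flat string maps instead of JSON values):

```rust
use std::collections::HashMap;

// Toy dotted-path lookup: "scope.key" resolves against a two-level map.
// Real rendering goes through Tera; this only illustrates the shape.
fn lookup<'a>(
    ctx: &'a HashMap<String, HashMap<String, String>>,
    path: &str,
) -> Option<&'a str> {
    let (scope, key) = path.split_once('.')?;
    ctx.get(scope)?.get(key).map(|s| s.as_str())
}

fn main() {
    let mut vars = HashMap::new();
    vars.insert("region".to_string(), "us-east-1".to_string());
    let mut ctx = HashMap::new();
    ctx.insert("vars".to_string(), vars);
    assert_eq!(lookup(&ctx, "vars.region"), Some("us-east-1"));
    assert_eq!(lookup(&ctx, "vars.missing"), None);
}
```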
**Phase 1 Testing**:
```bash
cargo test -p attune-common workflow
cargo test -p attune-executor workflow::parser
```
---
### Phase 2: Execution Engine (Weeks 3-4)
**Goal**: Graph builder, workflow executor, message handlers
#### Step 2.1: Task Graph
Create `crates/executor/src/workflow/graph.rs`:
```rust
use std::collections::HashMap;
pub struct TaskGraph {
    nodes: HashMap<String, TaskNode>,
    edges: HashMap<String, Vec<Edge>>,
}

impl TaskGraph {
    pub fn from_workflow_spec(spec: &WorkflowSpec) -> Result<Self> {
        // Build graph from task definitions:
        // - create a node for each task
        // - create edges from transitions (on_success, on_failure, etc.)
        todo!()
    }

    pub fn get_entry_tasks(&self) -> Vec<&TaskNode> {
        // Return tasks with no incoming edges
        todo!()
    }

    pub fn get_next_tasks(&self, completed_task: &str, result: &TaskResult) -> Vec<&TaskNode> {
        // Follow edges based on result (success/failure)
        todo!()
    }
}
```
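The stubbed methods can be sketched with plain standard-library maps. Everything below (the `Outcome` enum, the field shapes) is illustrative rather than taken from the design doc: entry tasks are the ones never named as a transition target, and next tasks follow only edges whose label matches the outcome.

```rust
use std::collections::{HashMap, HashSet};

#[derive(Debug, PartialEq)]
enum Outcome {
    Success,
    Failure,
}

struct Graph {
    // from-task -> list of (outcome, to-task) transition edges
    edges: HashMap<String, Vec<(Outcome, String)>>,
    tasks: Vec<String>,
}

impl Graph {
    // Entry tasks: tasks that never appear as a transition target.
    fn entry_tasks(&self) -> Vec<&String> {
        let targets: HashSet<&String> = self
            .edges
            .values()
            .flat_map(|v| v.iter().map(|(_, t)| t))
            .collect();
        self.tasks.iter().filter(|t| !targets.contains(t)).collect()
    }

    // Follow only the edges whose label matches the task's outcome.
    fn next_tasks(&self, done: &str, outcome: Outcome) -> Vec<&String> {
        self.edges
            .get(done)
            .map(|v| {
                v.iter()
                    .filter(|(o, _)| *o == outcome)
                    .map(|(_, t)| t)
                    .collect()
            })
            .unwrap_or_default()
    }
}

fn main() {
    let mut edges = HashMap::new();
    edges.insert(
        "build".to_string(),
        vec![
            (Outcome::Success, "deploy".to_string()),
            (Outcome::Failure, "notify".to_string()),
        ],
    );
    let g = Graph {
        edges,
        tasks: vec!["build".into(), "deploy".into(), "notify".into()],
    };
    assert_eq!(g.entry_tasks(), vec!["build"]);
    assert_eq!(g.next_tasks("build", Outcome::Success), vec!["deploy"]);
}
```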
#### Step 2.2: Workflow Executor
Create `crates/executor/src/workflow/executor.rs`:
```rust
pub struct WorkflowExecutor {
    pool: PgPool,
    publisher: MessagePublisher,
}

impl WorkflowExecutor {
    pub async fn execute_workflow(
        &self,
        execution_id: i64,
        workflow_ref: &str,
        parameters: JsonValue,
    ) -> Result<()> {
        // 1. Load workflow definition
        // 2. Create workflow_execution record
        // 3. Initialize context
        // 4. Build task graph
        // 5. Schedule initial tasks
        todo!()
    }

    pub async fn handle_task_completion(
        &self,
        workflow_execution_id: i64,
        task_name: String,
        result: TaskResult,
    ) -> Result<()> {
        // 1. Update workflow_task_execution
        // 2. Publish variables
        // 3. Determine next tasks
        // 4. Schedule next tasks
        // 5. Check if workflow complete
        todo!()
    }
}
```
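Step 5 ("check if workflow complete") reduces to a pure decision over the task sets tracked in `workflow_execution`. A minimal sketch, assuming a failed task with no `on_failure` transition fails the whole workflow (names here are illustrative, not repo types):

```rust
use std::collections::HashSet;

#[derive(Debug, PartialEq)]
enum WorkflowStatus {
    Running,
    Succeeded,
    Failed,
}

fn workflow_status(
    all_tasks: &[&str],
    completed: &HashSet<&str>,
    failed: &HashSet<&str>,
    skipped: &HashSet<&str>,
) -> WorkflowStatus {
    // Assumption: any unhandled failure terminates the workflow.
    if !failed.is_empty() {
        return WorkflowStatus::Failed;
    }
    // Done when every task reached a terminal state (completed or skipped).
    if all_tasks
        .iter()
        .all(|t| completed.contains(t) || skipped.contains(t))
    {
        WorkflowStatus::Succeeded
    } else {
        WorkflowStatus::Running
    }
}

fn main() {
    let all = ["build", "deploy"];
    let completed: HashSet<&str> = HashSet::from(["build"]);
    assert_eq!(
        workflow_status(&all, &completed, &HashSet::new(), &HashSet::new()),
        WorkflowStatus::Running
    );
    let completed: HashSet<&str> = HashSet::from(["build", "deploy"]);
    assert_eq!(
        workflow_status(&all, &completed, &HashSet::new(), &HashSet::new()),
        WorkflowStatus::Succeeded
    );
}
```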
#### Step 2.3: Message Handlers
Integrate with existing executor message loops:
```rust
// In executor/src/main.rs
async fn start_workflow_message_handlers(
    pool: PgPool,
    publisher: MessagePublisher,
) -> Result<()> {
    let executor = WorkflowExecutor::new(pool.clone(), publisher.clone());

    // Listen for execution.completed on workflow tasks
    let consumer = create_consumer("workflow.task.completions").await?;
    consumer
        .consume_with_handler(move |envelope| {
            let executor = executor.clone();
            async move {
                executor
                    .handle_task_completion(
                        envelope.payload.workflow_execution_id,
                        envelope.payload.task_name,
                        envelope.payload.result,
                    )
                    .await
            }
        })
        .await?;
    Ok(())
}
```
**Phase 2 Testing**:
```bash
cargo test -p attune-executor workflow::graph
cargo test -p attune-executor workflow::executor
```
---
### Phase 3: Advanced Features (Weeks 5-6)
**Goal**: Iteration, parallelism, retry, conditionals
#### Step 3.1: Iteration
Create `crates/executor/src/workflow/iterator.rs`:
```rust
pub struct TaskIterator {
    items: Vec<JsonValue>,
    batch_size: Option<usize>,
}

impl TaskIterator {
    pub fn from_template(
        template: &str,
        context: &WorkflowContext,
        batch_size: Option<usize>,
    ) -> Result<Self> {
        let rendered = context.render_template(template)?;
        let items: Vec<JsonValue> = serde_json::from_str(&rendered)?;
        Ok(Self { items, batch_size })
    }

    pub fn batches(&self) -> Vec<Vec<&JsonValue>> {
        if let Some(size) = self.batch_size {
            self.items.chunks(size).map(|c| c.iter().collect()).collect()
        } else {
            vec![self.items.iter().collect()]
        }
    }
}
```
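The `batches()` behaviour is easiest to see on a plain slice: with `batch_size = 2`, five items produce batches of 2, 2, and 1, with order preserved and the remainder in the final batch. (Note that `chunks(0)` panics, so a zero batch size must be rejected during validation.) A free-function sketch of the same logic:

```rust
// Same split logic as TaskIterator::batches, over any slice.
fn batches<T>(items: &[T], batch_size: Option<usize>) -> Vec<Vec<&T>> {
    match batch_size {
        // chunks() keeps order; the last chunk holds the remainder.
        Some(size) => items.chunks(size).map(|c| c.iter().collect()).collect(),
        // No batch size: one batch containing everything.
        None => vec![items.iter().collect()],
    }
}

fn main() {
    let items = [1, 2, 3, 4, 5];
    let b = batches(&items, Some(2));
    assert_eq!(b.len(), 3);
    assert_eq!(b[2], vec![&5]);
    assert_eq!(batches(&items, None).len(), 1);
}
```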
#### Step 3.2: Parallel Execution
Create `crates/executor/src/workflow/parallel.rs`:
```rust
pub struct ParallelExecutor {
    // Execute multiple tasks simultaneously
    // Wait for all to complete
    // Aggregate results
}
```
#### Step 3.3: Retry Logic
Create `crates/executor/src/workflow/retry.rs`:
```rust
pub struct RetryHandler {
    // Exponential/linear/constant backoff
    // Max retries
    // Condition evaluation
}
```
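A minimal sketch of the exponential branch, assuming the delay doubles per attempt up to a cap; the parameter names are illustrative knobs, not fields from `RetryPolicy`:

```rust
// Exponential backoff: base * 2^attempt, capped at max_ms.
fn backoff_ms(base_ms: u64, attempt: u32, max_ms: u64) -> u64 {
    // Clamp the shift so large attempt counts cannot overflow u64,
    // and saturate the multiply for very large bases.
    base_ms.saturating_mul(1u64 << attempt.min(20)).min(max_ms)
}

fn main() {
    assert_eq!(backoff_ms(500, 0, 60_000), 500); // first retry: base delay
    assert_eq!(backoff_ms(500, 3, 60_000), 4_000); // 500 * 2^3
    assert_eq!(backoff_ms(500, 10, 60_000), 60_000); // capped at max
}
```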
---
### Phase 4: API & Tools (Weeks 7-8)
**Goal**: REST endpoints, validation, pack integration
#### Step 4.1: API Routes
Create `crates/api/src/routes/workflows.rs`:
```rust
pub fn workflow_routes() -> Router {
    // Methods for the same path are chained on one `route` call,
    // since registering a path twice panics in axum.
    Router::new()
        .route(
            "/packs/:pack_ref/workflows",
            post(create_workflow).get(list_workflows),
        )
        .route(
            "/workflows/:workflow_ref",
            get(get_workflow)
                .put(update_workflow)
                .delete(delete_workflow),
        )
        .route("/workflows/:workflow_ref/execute", post(execute_workflow))
}
```
#### Step 4.2: Pack Integration
Update pack registration to scan `workflows/` directory:
```rust
// In pack registration logic
async fn register_workflows_in_pack(pool: &PgPool, pack_id: i64, pack_path: &Path) -> Result<()> {
    let workflows_dir = pack_path.join("workflows");
    if !workflows_dir.exists() {
        return Ok(());
    }
    for entry in std::fs::read_dir(workflows_dir)? {
        let path = entry?.path();
        if path.extension() == Some("yaml".as_ref()) {
            let yaml = std::fs::read_to_string(&path)?;
            let spec = parse_workflow_yaml(&yaml)?;
            // Create workflow_definition
            // Create synthetic action with is_workflow=true
        }
    }
    Ok(())
}
```
---
### Phase 5: Testing & Documentation (Week 9)
**Goal**: Comprehensive tests and documentation
#### Integration Tests
Create `crates/executor/tests/workflow_integration.rs`:
```rust
#[tokio::test]
async fn test_simple_sequential_workflow() {
    // Test basic workflow execution
}

#[tokio::test]
async fn test_parallel_execution() {
    // Test parallel tasks
}

#[tokio::test]
async fn test_conditional_branching() {
    // Test decision trees
}

#[tokio::test]
async fn test_iteration_with_batching() {
    // Test with-items
}
```
---
## Development Tips
### Debugging Workflows
```bash
# Enable debug logging
RUST_LOG=attune_executor::workflow=debug cargo run
# Watch workflow execution
psql -d attune -c "SELECT * FROM attune.workflow_execution_summary;"
# Check task status (substitute the workflow execution id for <id>)
psql -d attune -c "SELECT * FROM attune.workflow_task_detail WHERE workflow_execution = <id>;"
```
### Testing YAML Parsing
```bash
# Validate workflow YAML
cargo run --bin attune-cli -- workflow validate path/to/workflow.yaml
```
### Common Pitfalls
1. **Circular Dependencies**: Validate graph for cycles
2. **Template Errors**: Always handle template rendering failures
3. **Variable Scope**: Test all 6 scopes independently
4. **Message Ordering**: Ensure task completions are processed in order
5. **Resource Limits**: Enforce max tasks/depth/iterations
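
Pitfall 1 can be caught at validation time with a depth-first search over the task graph's adjacency. This is an illustrative checker on a plain string-keyed map, not the repo's validator:

```rust
use std::collections::HashMap;

// DFS three-color cycle check: 1 = on current path, 2 = fully explored.
fn has_cycle<'a>(edges: &HashMap<&'a str, Vec<&'a str>>) -> bool {
    fn visit<'a>(
        n: &'a str,
        edges: &HashMap<&'a str, Vec<&'a str>>,
        state: &mut HashMap<&'a str, u8>,
    ) -> bool {
        match state.get(n).copied() {
            Some(1) => return true, // back edge to the current path: cycle
            Some(2) => return false, // already fully explored
            _ => {}
        }
        state.insert(n, 1);
        if let Some(next) = edges.get(n) {
            for &m in next {
                if visit(m, edges, state) {
                    return true;
                }
            }
        }
        state.insert(n, 2);
        false
    }
    let mut state = HashMap::new();
    edges.keys().any(|&n| visit(n, edges, &mut state))
}

fn main() {
    let mut cyclic = HashMap::new();
    cyclic.insert("a", vec!["b"]);
    cyclic.insert("b", vec!["a"]);
    assert!(has_cycle(&cyclic));

    let mut dag = HashMap::new();
    dag.insert("a", vec!["b"]);
    dag.insert("b", vec!["c"]);
    assert!(!has_cycle(&dag));
}
```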
---
## Resources
- **Design Docs**: `docs/workflow-*.md`
- **Examples**: `docs/examples/simple-workflow.yaml`, `complete-workflow.yaml`
- **Migration**: `docs/examples/workflow-migration.sql`
- **TODO Tasks**: `work-summary/TODO.md` Phase 8.1
---
## Getting Help
- Review full design: `docs/workflow-orchestration.md`
- Check implementation plan: `docs/workflow-implementation-plan.md`
- See examples: `docs/examples/`
- Ask questions in project discussions
---
**Ready to start? Begin with Phase 1, Step 1.1: Database Migration**