Compare commits


2 Commits

Author SHA1 Message Date
4df156f210 workflow example 2026-03-04 13:49:14 -06:00
9414ee34e2 artifact demo 2026-03-02 15:56:22 -06:00
9 changed files with 1116 additions and 1 deletion


@@ -21,8 +21,18 @@ This pack exercises as many parts of the Attune SDLC as possible:
| Ref | Description |
|-----|-------------|
| `python_example.hello` | Returns `"Hello, Python"` — minimal action |
| `python_example.http_example` | Uses `urllib` to GET `https://example.com` |
| `python_example.read_counter` | Consumes a counter value and returns a formatted message |
| `python_example.list_numbers` | Returns a list of sequential integers as JSON |
| `python_example.flaky_fail` | Randomly fails with configurable probability — useful for testing error handling and retry logic |
| `python_example.simulate_work` | Simulates a unit of work with configurable duration, optional failure, and structured output — useful for testing workflows and the timeline visualizer |
| `python_example.artifact_demo` | Creates file and progress artifacts via the Attune API, demonstrating the artifact system |
### Workflows
| Ref | Description |
|-----|-------------|
| `python_example.timeline_demo` | Comprehensive demo workflow exercising parallel fan-out/fan-in, `with_items` concurrency, failure paths, retries, timeouts, publish directives, and custom edge styling — designed to produce a rich Timeline DAG visualization |
### Triggers
@@ -188,6 +198,49 @@ attune action execute python_example.read_counter --param counter=99 --param rul
# Output: {"message": "Counter value is 99 (from rule: test)", ...}
```
### Test the simulate_work action
```bash
attune action execute python_example.simulate_work \
--param duration_seconds=2.0 --param label=demo
# Output: {"label": "demo", "duration_seconds": 2.003, "requested_seconds": 2.0, "success": true}
# Test failure simulation:
attune action execute python_example.simulate_work \
--param fail=true --param label=crash-test
# Exits non-zero with error on stderr
```
### Run the Timeline Demo workflow
The `timeline_demo` workflow is designed to produce a visually rich Timeline DAG
on the execution detail page. It exercises parallel branches, `with_items`
expansion, failure handling, and custom transition styling.
```bash
# Happy path (all tasks succeed, ~25s total):
attune action execute python_example.timeline_demo
# With more items and faster durations:
attune action execute python_example.timeline_demo \
--param item_count=10 --param item_duration=1.0 --param build_duration=4.0
# Exercise the failure/error-handling path:
attune action execute python_example.timeline_demo \
--param fail_validation=true
# Then open the execution detail page in the Web UI to see the Timeline DAG.
```
**What to look for in the Timeline DAG:**
- **Fan-out** from `initialize` into 3 parallel branches (`build_artifacts`, `run_linter`, `security_scan`) with different durations
- **Fan-in** at `merge_results` with a `join: 3` barrier — the bar starts only after the slowest branch completes
- **`with_items` expansion** at `process_items` — each item appears as a separate child execution bar, with `concurrency: 3` controlling how many run simultaneously
- **Custom edge colors**: indigo for fan-out/merge, green for success, red for failure, orange for timeout/error-handled paths
- **Custom edge labels**: "fan-out", "build ok", "lint clean", "scan clear", "valid ✓", "invalid ✗", etc.
- **Failure path** (when `fail_validation=true`): the DAG shows the red path `validate` → `handle_failure` → `finalize_failure`
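The fan-out/fan-in and `concurrency` behavior described above can be sketched in plain Python. This is illustrative only (it is not how the Attune engine is implemented): a `with_items` stage with `concurrency: 3` runs at most three items at once, and the stage completes only when every item has finished.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

# Illustrative sketch of `with_items` + `concurrency: 3`: at most three
# items execute simultaneously; the stage finishes when all items have run.
peak = 0       # highest number of items observed running at once
running = 0
lock = threading.Lock()

def process_item(item):
    global peak, running
    with lock:
        running += 1
        peak = max(peak, running)
    time.sleep(0.05)  # stand-in for python_example.simulate_work
    with lock:
        running -= 1
    return f"item-{item}"

with ThreadPoolExecutor(max_workers=3) as pool:        # concurrency: 3
    results = list(pool.map(process_item, range(1, 6)))  # item_count=5

print(results)    # ['item-1', 'item-2', 'item-3', 'item-4', 'item-5']
print(peak <= 3)  # True — the concurrency cap was respected
```

`pool.map` preserves input order, which mirrors how the Timeline DAG groups the per-item child execution bars under a single `process_items` stage.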
### Enable the rule to start the counter sensor loop
```bash

actions/artifact_demo.py Normal file (+331)

@@ -0,0 +1,331 @@
#!/usr/bin/env python3
"""
Artifact Demo Action - Python Example Pack
Demonstrates creating file and progress artifacts via the Attune API.
Each iteration:
1. Appends a line to an in-memory log
2. Updates a progress artifact
3. Sleeps for 0.5 seconds
After all iterations complete, the full log is written directly to the
shared artifact volume as a **single version** of a file artifact with a
stable ref (`python_example.artifact_demo.log`).
File-based artifact flow (single API call):
1. POST /api/v1/artifacts/ref/{ref}/versions/file — upserts the artifact
(creating it if it doesn't exist) and allocates a version number,
returning a relative `file_path`
2. Write the file to $ATTUNE_ARTIFACTS_DIR/{file_path} on the shared volume
No HTTP upload is needed — the worker and action process share an artifact
volume, so the action writes directly to disk.
The progress artifact is still per-execution (ephemeral status indicator).
Parameters:
iterations - Number of iterations to run (default: 50)
visibility - Artifact visibility level for file artifacts: "public" or "private"
(default: "private"). Public artifacts are viewable by all authenticated
users on the platform. Private artifacts are restricted based on their
scope/owner fields.
Note: Progress artifacts always use the server default visibility (public), since
they are informational status indicators that anyone watching an execution should
be able to see. The visibility parameter only controls file artifacts.
"""
import json
import os
import sys
import time
import urllib.error
import urllib.request
from datetime import datetime, timezone
def api_request(
base_url, path, method="GET", data=None, token=None, content_type="application/json"
):
"""Make an HTTP request to the Attune API."""
url = f"{base_url}{path}"
headers = {"Accept": "application/json"}
if token:
headers["Authorization"] = f"Bearer {token}"
body = None
if data is not None:
if content_type == "application/json":
body = json.dumps(data).encode("utf-8")
headers["Content-Type"] = "application/json"
else:
body = data if isinstance(data, bytes) else data.encode("utf-8")
headers["Content-Type"] = content_type
req = urllib.request.Request(url, data=body, headers=headers, method=method)
try:
with urllib.request.urlopen(req, timeout=30) as resp:
return json.loads(resp.read().decode("utf-8")), resp.status
except urllib.error.HTTPError as e:
error_body = e.read().decode("utf-8", errors="replace")
print(f"API error {e.code} on {method} {path}: {error_body}", file=sys.stderr)
raise
def allocate_file_version_by_ref(
base_url,
token,
artifact_ref,
execution_id=None,
visibility=None,
content_type="text/plain",
name=None,
description=None,
):
"""Upsert an artifact by ref and allocate a file-backed version.
Single API call that creates the artifact if it doesn't exist (or
reuses the existing one) and allocates a new version with a file_path.
Returns:
(artifact_id, version_id, file_path) tuple
"""
payload = {
"scope": "action",
"owner": "python_example.artifact_demo",
"type": "file_text",
"retention_policy": "versions",
"retention_limit": 10,
"content_type": content_type,
"created_by": "python_example.artifact_demo",
}
if execution_id is not None:
payload["execution"] = execution_id
if visibility is not None:
payload["visibility"] = visibility
if name is not None:
payload["name"] = name
if description is not None:
payload["description"] = description
resp, _ = api_request(
base_url,
f"/api/v1/artifacts/ref/{artifact_ref}/versions/file",
method="POST",
data=payload,
token=token,
)
version_data = resp["data"]
return version_data["artifact"], version_data["id"], version_data["file_path"]
def create_artifact(
base_url,
token,
ref,
name,
artifact_type,
execution_id,
visibility=None,
description=None,
data=None,
):
"""Create a new artifact and return its ID."""
payload = {
"ref": ref,
"scope": "action",
"owner": "python_example.artifact_demo",
"type": artifact_type,
"retention_policy": "versions",
"retention_limit": 10,
"name": name,
"execution": execution_id,
}
if visibility is not None:
payload["visibility"] = visibility
if description is not None:
payload["description"] = description
if data is not None:
payload["data"] = data
resp, _ = api_request(
base_url, "/api/v1/artifacts", method="POST", data=payload, token=token
)
return resp["data"]["id"]
def append_progress(base_url, token, artifact_id, entry):
"""Append an entry to a progress artifact."""
api_request(
base_url,
f"/api/v1/artifacts/{artifact_id}/progress",
method="POST",
data={"entry": entry},
token=token,
)
def main():
start_time = time.time()
try:
# Read parameters from stdin (JSON format)
params = json.loads(sys.stdin.readline())
iterations = params.get("iterations", 50)
visibility = params.get("visibility", "private")
# Validate visibility value
if visibility not in ("public", "private"):
raise ValueError(
f"Invalid visibility '{visibility}': must be 'public' or 'private'"
)
# Get execution context from environment
api_url = os.environ.get("ATTUNE_API_URL", "")
token = os.environ.get("ATTUNE_API_TOKEN", "")
exec_id_str = os.environ.get("ATTUNE_EXEC_ID", "")
artifacts_dir = os.environ.get("ATTUNE_ARTIFACTS_DIR", "")
execution_id = int(exec_id_str) if exec_id_str else None
if not api_url:
raise RuntimeError(
"ATTUNE_API_URL environment variable is not set. "
"This action must be run by the Attune worker."
)
if not token:
raise RuntimeError(
"ATTUNE_API_TOKEN environment variable is not set. "
"This action must be run by the Attune worker."
)
if not artifacts_dir:
raise RuntimeError(
"ATTUNE_ARTIFACTS_DIR environment variable is not set. "
"This action must be run by the Attune worker."
)
print(
f"Artifact demo starting: {iterations} iterations, "
f"visibility={visibility}, API at {api_url}, "
f"artifacts_dir={artifacts_dir}",
file=sys.stderr,
)
# ----------------------------------------------------------------
# File artifact — single call upserts artifact + allocates version
# ----------------------------------------------------------------
file_ref = "python_example.artifact_demo.log"
file_artifact_id, version_id, file_path = allocate_file_version_by_ref(
base_url=api_url,
token=token,
artifact_ref=file_ref,
execution_id=execution_id,
visibility=visibility,
content_type="text/plain",
name="Demo Log",
description="Log output from the artifact demo action",
)
full_file_path = os.path.join(artifacts_dir, file_path)
print(
f"Allocated file artifact ref={file_ref} id={file_artifact_id} "
f"version={version_id} path={file_path}",
file=sys.stderr,
)
# ----------------------------------------------------------------
# Progress artifact — per-execution (ephemeral status indicator)
# ----------------------------------------------------------------
ts = int(time.time())
ref_suffix = f"{execution_id}_{ts}" if execution_id else str(ts)
progress_ref = f"python_example.artifact_demo.progress.{ref_suffix}"
progress_artifact_id = create_artifact(
base_url=api_url,
token=token,
ref=progress_ref,
name="Artifact Demo Progress",
artifact_type="progress",
execution_id=execution_id,
description=f"Progress tracker for artifact demo ({iterations} iterations)",
data=[], # Initialize with empty array
)
print(
f"Created progress artifact ID={progress_artifact_id} ref={progress_ref} "
f"visibility=public (server default)",
file=sys.stderr,
)
# ----------------------------------------------------------------
# Run iterations — collect log lines, write file at the end
# ----------------------------------------------------------------
log_lines = []
for i in range(iterations):
iteration = i + 1
now = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%fZ")
pct = min(round(iteration * (100.0 / iterations), 1), 100.0)
# Build log line
log_line = f"[{now}] Iteration {iteration}/{iterations} — progress {pct}%"
log_lines.append(log_line)
print(f" Iteration {iteration}/{iterations} ({pct}%)", file=sys.stderr)
# Append progress entry
append_progress(
api_url,
token,
progress_artifact_id,
{
"iteration": iteration,
"total": iterations,
"percent": pct,
"message": f"Completed iteration {iteration}",
"timestamp": now,
},
)
# Sleep between iterations
if iteration < iterations:
time.sleep(0.5)
# ----------------------------------------------------------------
# Write the complete log file to the shared artifact volume
# ----------------------------------------------------------------
full_log = "\n".join(log_lines) + "\n"
# Ensure parent directories exist — the allocated file_path may be nested
os.makedirs(os.path.dirname(full_file_path), exist_ok=True)
with open(full_file_path, "w", encoding="utf-8") as f:
f.write(full_log)
print(
f"Wrote {len(full_log)} bytes to {full_file_path}",
file=sys.stderr,
)
elapsed = round(time.time() - start_time, 3)
result = {
"file_artifact_id": file_artifact_id,
"file_artifact_ref": file_ref,
"file_version_id": version_id,
"file_path": file_path,
"progress_artifact_id": progress_artifact_id,
"iterations_completed": iterations,
"visibility": visibility,
"elapsed_seconds": elapsed,
"success": True,
}
print(json.dumps(result))
print(f"Artifact demo completed in {elapsed}s", file=sys.stderr)
return 0
except Exception as e:
elapsed = round(time.time() - start_time, 3)
error_result = {
"success": False,
"error": str(e),
"elapsed_seconds": elapsed,
}
print(json.dumps(error_result))
print(f"ERROR: {e}", file=sys.stderr)
return 1
if __name__ == "__main__":
sys.exit(main())
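The "no HTTP upload" half of the file-artifact flow boils down to a path join and a plain file write under the shared volume. A minimal sketch, using a temporary directory as a stand-in for `ATTUNE_ARTIFACTS_DIR` and a hypothetical relative `file_path` (the real path comes back from the `versions/file` endpoint):

```python
import os
import tempfile

# Stand-ins: a temp dir plays the role of $ATTUNE_ARTIFACTS_DIR, and
# file_path is a hypothetical relative path of the allocated version.
artifacts_dir = tempfile.mkdtemp()
file_path = "python_example/artifact_demo/log/v1.txt"

full_file_path = os.path.join(artifacts_dir, file_path)
# Create parent directories defensively — the allocated path may be nested.
os.makedirs(os.path.dirname(full_file_path), exist_ok=True)
with open(full_file_path, "w", encoding="utf-8") as f:
    f.write("[2026-03-02T00:00:00Z] Iteration 1/1 — progress 100.0%\n")

print(os.path.exists(full_file_path))  # True
```

Because the worker and the action process mount the same volume, this single write is the entire "upload" step.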


@@ -0,0 +1,82 @@
# Artifact Demo Action
# Demonstrates creating file and progress artifacts via the Attune API
ref: python_example.artifact_demo
label: "Artifact Demo"
description: "Creates a file artifact (written directly to the shared volume) and a progress artifact, updating progress over multiple iterations"
enabled: true
# Runner type determines how the action is executed
runner_type: python
# Minimum Python version required (semver constraint)
runtime_version: ">=3.9"
# Entry point is the Python script to execute
entry_point: artifact_demo.py
# Parameter delivery: stdin for secure parameter passing
parameter_delivery: stdin
parameter_format: json
# Output format: json (structured data parsing enabled)
output_format: json
# Action parameters schema (flat format with inline required/secret)
parameters:
iterations:
type: integer
description: "Number of iterations to run (each adds a log line and updates progress)"
default: 50
minimum: 1
maximum: 200
visibility:
type: string
description: "Artifact visibility level: public (all authenticated users) or private (scope/owner restricted)"
default: "private"
enum:
- "public"
- "private"
# Output schema (flat format)
output_schema:
file_artifact_id:
type: integer
description: "ID of the file artifact"
required: true
file_artifact_ref:
type: string
description: "Stable ref of the file artifact"
required: true
file_version_id:
type: integer
description: "ID of the file artifact version created for this execution"
required: true
file_path:
type: string
description: "Relative path of the file on the shared artifact volume"
required: true
progress_artifact_id:
type: integer
description: "ID of the progress artifact"
required: true
iterations_completed:
type: integer
description: "Number of iterations completed"
required: true
visibility:
type: string
description: "Visibility level that was applied to the file artifact"
required: true
success:
type: boolean
description: "Whether the demo completed successfully"
required: true
# Tags for categorization
tags:
- python
- example
- artifacts
- progress
- demo

actions/flaky_fail.py Normal file (+52)

@@ -0,0 +1,52 @@
#!/usr/bin/env python3
"""
Flaky Fail Action - Python Example Pack
A Python action that randomly fails with a configurable probability.
Useful for testing error handling, retry logic, and workflow failure paths.
Actions receive parameters as JSON on stdin and write results to stdout.
"""
import json
import random
import sys
def main():
# Read parameters from stdin (JSON format)
params = json.loads(sys.stdin.readline())
failure_probability = float(params.get("failure_probability", 0.1))
# Clamp to valid range
failure_probability = max(0.0, min(1.0, failure_probability))
roll = random.random()
failed = roll < failure_probability
if failed:
print(
json.dumps(
{
"error": "Random failure triggered",
"failure_probability": failure_probability,
"roll": round(roll, 6),
}
),
file=sys.stderr,
)
sys.exit(1)
print(
json.dumps(
{
"message": "Success! Did not fail this time.",
"failure_probability": failure_probability,
"roll": round(roll, 6),
}
)
)
if __name__ == "__main__":
main()
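The stdin/stdout contract the docstring describes (one line of JSON parameters in, one JSON result out) can be exercised without the worker. A minimal sketch that fakes stdin and uses a seeded RNG so the roll is reproducible; the parameter value and seed are illustrative:

```python
import io
import json
import random
import sys

# Fake the worker: feed one line of JSON parameters on stdin.
sys.stdin = io.StringIO('{"failure_probability": 1.5}\n')

params = json.loads(sys.stdin.readline())
# Clamp to [0.0, 1.0], exactly as flaky_fail.py does.
p = max(0.0, min(1.0, float(params.get("failure_probability", 0.1))))
print(p)  # 1.0 — out-of-range values are clamped

rng = random.Random(42)  # seeded so the roll is deterministic
roll = rng.random()
print(json.dumps({"failed": roll < p, "roll": round(roll, 6)}))
```

With the probability clamped to 1.0, the Bernoulli roll always lands below the threshold, which is how the action guarantees failure for `failure_probability >= 1`.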

actions/flaky_fail.yaml Normal file (+52)

@@ -0,0 +1,52 @@
# Flaky Fail Action
# Randomly fails with a configurable probability, useful for testing error handling and retry logic
ref: python_example.flaky_fail
label: "Flaky Fail"
description: "An action that randomly fails with a configurable probability. Useful for testing error handling, retry logic, and workflow failure paths."
enabled: true
# Runner type determines how the action is executed
runner_type: python
# Minimum Python version required (semver constraint)
runtime_version: ">=3.9"
# Entry point is the Python script to execute
entry_point: flaky_fail.py
# Parameter delivery: stdin for secure parameter passing
parameter_delivery: stdin
parameter_format: json
# Output format: json (structured data parsing enabled)
output_format: json
# Action parameters schema (flat format with inline required/secret)
parameters:
failure_probability:
type: number
description: "Probability of failure between 0.0 (never fail) and 1.0 (always fail)"
default: 0.1
# Output schema (flat format)
output_schema:
message:
type: string
description: "Success message (only present when the action does not fail)"
required: true
failure_probability:
type: number
description: "The failure probability after clamping to the range 0.0–1.0"
required: true
roll:
type: number
description: "The random value that was rolled (0.0 to 1.0)"
required: true
# Tags for categorization
tags:
- python
- example
- testing
- error-handling

actions/simulate_work.py Normal file (+105)

@@ -0,0 +1,105 @@
#!/usr/bin/env python3
"""
Simulate Work Action - Python Example Pack
Simulates a unit of work that takes a configurable amount of time.
Returns structured JSON output with timing information and an optional
payload. Useful for testing workflows, the timeline DAG visualizer,
and execution monitoring.
Parameters (via stdin JSON):
duration_seconds - How long to simulate work (float, default: 1.0)
label - A label for this work unit (string, default: "work")
fail - Whether to simulate a failure (bool, default: false)
fail_after - If failing, fail after this many seconds (float, default: 0)
output_data - Arbitrary JSON data to include in the result (default: null)
Output (JSON):
label - The label that was passed in
duration_seconds - Actual elapsed time
requested_seconds - The requested duration
output_data - The pass-through data (if any)
success - Always true on success (failures exit non-zero)
"""
import json
import sys
import time
def main():
params = json.loads(sys.stdin.readline())
duration_seconds = float(params.get("duration_seconds", 1.0))
label = params.get("label", "work")
fail = params.get("fail", False)
fail_after = float(params.get("fail_after", 0))
output_data = params.get("output_data", None)
# Clamp duration to a reasonable range
duration_seconds = max(0.0, min(duration_seconds, 300.0))
print(
f"[simulate_work] Starting '{label}' for {duration_seconds}s", file=sys.stderr
)
start = time.time()
if fail and fail_after > 0:
# Sleep for fail_after seconds then crash
time.sleep(min(fail_after, duration_seconds))
elapsed = round(time.time() - start, 3)
print(
json.dumps(
{
"error": f"Simulated failure in '{label}' after {elapsed}s",
"label": label,
"elapsed": elapsed,
}
),
file=sys.stderr,
)
sys.exit(1)
if fail:
# Immediate failure
print(
json.dumps(
{
"error": f"Simulated immediate failure in '{label}'",
"label": label,
}
),
file=sys.stderr,
)
sys.exit(1)
# Simulate work with periodic progress to stderr
remaining = duration_seconds
while remaining > 0:
chunk = min(remaining, 1.0)
time.sleep(chunk)
remaining -= chunk
elapsed = round(time.time() - start, 3)
pct = min(100.0, round((elapsed / max(duration_seconds, 0.001)) * 100, 1))
print(
f"[simulate_work] '{label}' progress: {pct}% ({elapsed}s)", file=sys.stderr
)
elapsed = round(time.time() - start, 3)
result = {
"label": label,
"duration_seconds": elapsed,
"requested_seconds": duration_seconds,
"success": True,
}
if output_data is not None:
result["output_data"] = output_data
print(json.dumps(result))
print(f"[simulate_work] '{label}' completed in {elapsed}s", file=sys.stderr)
if __name__ == "__main__":
main()


@@ -0,0 +1,76 @@
# Simulate Work Action
# Simulates a unit of work with configurable duration and structured output.
# Useful for testing workflows, the timeline DAG visualizer, and execution monitoring.
ref: python_example.simulate_work
label: "Simulate Work"
description: "Simulates a unit of work that takes a configurable amount of time, returning structured JSON with timing info. Supports simulated failures for testing error-handling paths."
enabled: true
# Runner type determines how the action is executed
runner_type: python
# Minimum Python version required (semver constraint)
runtime_version: ">=3.9"
# Entry point is the Python script to execute
entry_point: simulate_work.py
# Parameter delivery: stdin for secure parameter passing
parameter_delivery: stdin
parameter_format: json
# Output format: json (structured data parsing enabled)
output_format: json
# Action parameters schema (flat format with inline required/secret)
parameters:
duration_seconds:
type: number
description: "How long to simulate work in seconds (clamped to 0–300)"
default: 1.0
label:
type: string
description: "A human-readable label for this work unit"
default: "work"
fail:
type: boolean
description: "Whether to simulate a failure (exits non-zero)"
default: false
fail_after:
type: number
description: "If failing, wait this many seconds before crashing (0 = immediate)"
default: 0
output_data:
type: object
description: "Arbitrary JSON data to pass through to the result"
# Output schema (flat format)
output_schema:
label:
type: string
description: "The label that was passed in"
required: true
duration_seconds:
type: number
description: "Actual elapsed wall-clock time in seconds"
required: true
requested_seconds:
type: number
description: "The requested duration"
required: true
output_data:
type: object
description: "The pass-through data (if provided)"
success:
type: boolean
description: "Always true on success (failures exit non-zero)"
required: true
# Tags for categorization
tags:
- python
- example
- testing
- workflow
- simulation


@@ -0,0 +1,71 @@
# Timeline Demo Workflow Action
# Action metadata for the timeline_demo workflow definition.
#
# The workflow graph (tasks, transitions, variables) lives in the separate
# workflow file referenced by `workflow_file`. This action YAML controls
# action-level concerns: ref, label, parameter schema, output schema, tags,
# and (in future) execution policies.
#
# Multiple actions can reference the same workflow file with different
# parameter schemas or policy configurations.
ref: python_example.timeline_demo
label: "Timeline DAG Demo"
description: >
A comprehensive demo workflow that exercises every feature of the
Workflow Timeline DAG visualizer: parallel branches with different
durations, with_items fan-out, failure handling paths, publish
directives, retries, timeouts, and custom edge styling.
enabled: true
# Reference to the workflow definition file (relative to actions/ directory)
workflow_file: workflows/timeline_demo.yaml
# Action parameters schema (flat format with inline required/secret)
# These are the inputs exposed when executing this action. They map to
# the workflow's `parameters` block but are authoritative for the action.
parameters:
item_count:
type: integer
description: "Number of items to process in the with_items stage (2–20)"
default: 5
build_duration:
type: number
description: "Duration of the simulated build step in seconds"
default: 6.0
lint_duration:
type: number
description: "Duration of the simulated lint step in seconds"
default: 3.0
scan_duration:
type: number
description: "Duration of the simulated security scan in seconds"
default: 4.0
item_duration:
type: number
description: "Duration of each with_items work unit in seconds"
default: 2.0
fail_validation:
type: boolean
description: "If true, the validate step will intentionally fail to exercise the error path"
default: false
# Output schema (flat format)
output:
status:
type: string
description: "Final workflow status: 'success' or 'failed'"
items_processed:
type: integer
description: "Number of items that were processed"
total_duration:
type: number
description: "Wall-clock duration of the final task in seconds"
# Tags for categorization
tags:
- demo
- timeline
- workflow
- visualization
- python


@@ -0,0 +1,293 @@
# Timeline Demo Workflow
# Demonstrates various workflow features and exercises the Timeline DAG visualizer.
#
# This is an action-linked workflow file — action-level metadata (ref, label,
# description, parameters, output, tags) is defined in the companion action
# YAML at actions/timeline_demo.yaml. This file contains only the execution
# graph: version, vars, tasks, and output_map.
#
# Features exercised:
# - Sequential task chains
# - Parallel fan-out (3 branches) and fan-in (merge)
# - with_items expansion with concurrency limiting
# - Failure/error handling paths (succeeded/failed/timed_out transitions)
# - Variable-duration tasks for interesting Gantt chart shapes
# - Publish directives passing data between tasks
# - Custom transition labels and colors via __chart_meta__
# - Retry configuration
# - Timeout handling
#
# Expected timeline shape (approximate):
#
# initialize ─┬─► build_artifacts ───┐
#             ├─► run_linter ────────┼─► merge_results ─► process_items(×5) ─► validate ─┬─► finalize_success
#             └─► security_scan ─────┘                                                   └─► handle_failure ─► finalize_failure
version: "1.0.0"
vars:
build_result: null
lint_result: null
scan_result: null
merged_summary: null
number_list: null
items_processed: 0
validation_passed: false
tasks:
# ── Stage 1: Initialize ──────────────────────────────────────────────
- name: initialize
action: python_example.simulate_work
input:
duration_seconds: 1.0
label: "initialize"
output_data:
item_count: "{{ parameters.item_count }}"
started: true
next:
- when: "{{ succeeded() }}"
publish:
- item_count: "{{ parameters.item_count }}"
do:
- build_artifacts
- run_linter
- security_scan
__chart_meta__:
label: "fan-out"
color: "#6366f1"
- when: "{{ failed() }}"
do:
- finalize_failure
__chart_meta__:
label: "init failed"
color: "#ef4444"
# ── Stage 2a: Build artifacts (longest parallel branch) ──────────────
- name: build_artifacts
action: python_example.simulate_work
input:
duration_seconds: "{{ parameters.build_duration }}"
label: "build"
output_data:
artifact: "app-v1.2.3.tar.gz"
size_mb: 48
retry:
count: 2
delay: 1
backoff: constant
next:
- when: "{{ succeeded() }}"
publish:
- build_result: "{{ result() }}"
do:
- merge_results
__chart_meta__:
label: "build ok"
color: "#22c55e"
- when: "{{ failed() }}"
do:
- handle_failure
__chart_meta__:
label: "build failed"
color: "#ef4444"
# ── Stage 2b: Run linter (medium parallel branch) ────────────────────
- name: run_linter
action: python_example.simulate_work
input:
duration_seconds: "{{ parameters.lint_duration }}"
label: "lint"
output_data:
warnings: 3
errors: 0
files_checked: 42
next:
- when: "{{ succeeded() }}"
publish:
- lint_result: "{{ result() }}"
do:
- merge_results
__chart_meta__:
label: "lint clean"
color: "#22c55e"
- when: "{{ failed() }}"
do:
- handle_failure
__chart_meta__:
label: "lint errors"
color: "#ef4444"
# ── Stage 2c: Security scan (short parallel branch) ──────────────────
- name: security_scan
action: python_example.simulate_work
input:
duration_seconds: "{{ parameters.scan_duration }}"
label: "security-scan"
output_data:
vulnerabilities: 0
packages_scanned: 128
timeout: 30
next:
- when: "{{ succeeded() }}"
publish:
- scan_result: "{{ result() }}"
do:
- merge_results
__chart_meta__:
label: "scan clear"
color: "#22c55e"
- when: "{{ failed() }}"
do:
- handle_failure
__chart_meta__:
label: "scan failed"
color: "#ef4444"
- when: "{{ timed_out() }}"
do:
- handle_failure
__chart_meta__:
label: "scan timed out"
color: "#f97316"
# ── Stage 3: Merge results (join from 3 parallel branches) ──────────
- name: merge_results
action: python_example.simulate_work
join: 3
input:
duration_seconds: 1.5
label: "merge"
output_data:
build: "{{ workflow.build_result }}"
lint: "{{ workflow.lint_result }}"
scan: "{{ workflow.scan_result }}"
next:
- when: "{{ succeeded() }}"
publish:
- merged_summary: "{{ result() }}"
do:
- generate_item_list
__chart_meta__:
label: "merged"
color: "#6366f1"
# ── Stage 4a: Generate the item list for with_items ──────────────────
- name: generate_item_list
action: python_example.list_numbers
input:
n: "{{ parameters.item_count }}"
start: 1
next:
- when: "{{ succeeded() }}"
publish:
- number_list: "{{ result().data.items }}"
do:
- process_items
__chart_meta__:
label: "items ready"
color: "#6366f1"
# ── Stage 4b: Process each item (with_items + concurrency) ──────────
- name: process_items
action: python_example.simulate_work
with_items: "{{ workflow.number_list }}"
concurrency: 3
input:
duration_seconds: "{{ parameters.item_duration }}"
label: "item-{{ item }}"
output_data:
item_number: "{{ item }}"
index: "{{ index }}"
next:
- when: "{{ succeeded() }}"
publish:
- items_processed: "{{ parameters.item_count }}"
do:
- validate
__chart_meta__:
label: "all items done"
color: "#22c55e"
- when: "{{ failed() }}"
do:
- handle_failure
__chart_meta__:
label: "item failed"
color: "#ef4444"
# ── Stage 5: Validate everything ─────────────────────────────────────
- name: validate
action: python_example.simulate_work
input:
duration_seconds: 2.0
label: "validate"
fail: "{{ parameters.fail_validation }}"
fail_after: 1.0
output_data:
checks_passed: 12
checks_total: 12
retry:
count: 1
delay: 2
backoff: constant
next:
- when: "{{ succeeded() }}"
publish:
- validation_passed: true
do:
- finalize_success
__chart_meta__:
label: "valid ✓"
color: "#22c55e"
- when: "{{ failed() }}"
publish:
- validation_passed: false
do:
- handle_failure
__chart_meta__:
label: "invalid ✗"
color: "#ef4444"
# ── Terminal: Success ────────────────────────────────────────────────
- name: finalize_success
action: python_example.simulate_work
input:
duration_seconds: 1.0
label: "finalize-success"
output_data:
status: "success"
items_processed: "{{ workflow.items_processed }}"
validation: "{{ workflow.validation_passed }}"
# ── Error path: Handle failure ───────────────────────────────────────
- name: handle_failure
action: python_example.simulate_work
input:
duration_seconds: 1.5
label: "handle-failure"
output_data:
status: "handling_error"
build: "{{ workflow.build_result }}"
lint: "{{ workflow.lint_result }}"
scan: "{{ workflow.scan_result }}"
next:
- when: "{{ succeeded() }}"
do:
- finalize_failure
__chart_meta__:
label: "error handled"
color: "#f97316"
# ── Terminal: Failure ────────────────────────────────────────────────
- name: finalize_failure
action: python_example.simulate_work
input:
duration_seconds: 0.5
label: "finalize-failure"
output_data:
status: "failed"
items_processed: "{{ workflow.items_processed }}"
validation: "{{ workflow.validation_passed }}"
output_map:
status: "{{ 'success' if workflow.validation_passed else 'failed' }}"
items_processed: "{{ workflow.items_processed }}"
total_duration: "{{ task.finalize_success.result.duration_seconds if workflow.validation_passed else task.finalize_failure.result.duration_seconds }}"