Compare commits


1 Commit

Author SHA1 Message Date
4df156f210 workflow example 2026-03-04 13:49:14 -06:00
9 changed files with 865 additions and 112 deletions

View File

@@ -21,8 +21,18 @@ This pack exercises as many parts of the Attune SDLC as possible:
 | Ref | Description |
 |-----|-------------|
 | `python_example.hello` | Returns `"Hello, Python"` — minimal action |
-| `python_example.http_example` | Uses `requests` to GET `https://example.com` |
+| `python_example.http_example` | Uses `urllib` to GET `https://example.com` |
 | `python_example.read_counter` | Consumes a counter value and returns a formatted message |
| `python_example.list_numbers` | Returns a list of sequential integers as JSON |
| `python_example.flaky_fail` | Randomly fails with configurable probability — useful for testing error handling and retry logic |
| `python_example.simulate_work` | Simulates a unit of work with configurable duration, optional failure, and structured output — useful for testing workflows and the timeline visualizer |
| `python_example.artifact_demo` | Creates file and progress artifacts via the Attune API, demonstrating the artifact system |
### Workflows
| Ref | Description |
|-----|-------------|
| `python_example.timeline_demo` | Comprehensive demo workflow exercising parallel fan-out/fan-in, `with_items` concurrency, failure paths, retries, timeouts, publish directives, and custom edge styling — designed to produce a rich Timeline DAG visualization |
### Triggers
@@ -188,6 +198,49 @@ attune action execute python_example.read_counter --param counter=99 --param rul
# Output: {"message": "Counter value is 99 (from rule: test)", ...}
```
### Test the simulate_work action
```bash
attune action execute python_example.simulate_work \
--param duration_seconds=2.0 --param label=demo
# Output: {"label": "demo", "duration_seconds": 2.003, "requested_seconds": 2.0, "success": true}
# Test failure simulation:
attune action execute python_example.simulate_work \
--param fail=true --param label=crash-test
# Exits non-zero with error on stderr
```
### Run the Timeline Demo workflow
The `timeline_demo` workflow is designed to produce a visually rich Timeline DAG
on the execution detail page. It exercises parallel branches, `with_items`
expansion, failure handling, and custom transition styling.
```bash
# Happy path (all tasks succeed, ~25s total):
attune action execute python_example.timeline_demo
# With more items and faster durations:
attune action execute python_example.timeline_demo \
--param item_count=10 --param item_duration=1.0 --param build_duration=4.0
# Exercise the failure/error-handling path:
attune action execute python_example.timeline_demo \
--param fail_validation=true
# Then open the execution detail page in the Web UI to see the Timeline DAG.
```
**What to look for in the Timeline DAG:**
- **Fan-out** from `initialize` into 3 parallel branches (`build_artifacts`, `run_linter`, `security_scan`) with different durations
- **Fan-in** at `merge_results` with a `join: 3` barrier — the bar starts only after the slowest branch completes
- **`with_items` expansion** at `process_items` — each item appears as a separate child execution bar, with `concurrency: 3` controlling how many run simultaneously
- **Custom edge colors**: indigo for fan-out/merge, green for success, red for failure, orange for timeout/error-handled paths
- **Custom edge labels**: "fan-out", "build ok", "lint clean", "scan clear", "valid ✓", "invalid ✗", etc.
- **Failure path** (when `fail_validation=true`): the DAG shows the red path `validate` → `handle_failure` → `finalize_failure`
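The "~25s total" estimate for the happy path can be sanity-checked with a quick critical-path calculation. This is a sketch using the default durations from the workflow file; it ignores the near-instant `generate_item_list` step and all scheduler/startup overhead, which is why a real run takes noticeably longer than the pure task time:

```python
import math

# Default parameters from the workflow definition
build, lint, scan = 6.0, 3.0, 4.0
item_count, item_duration, concurrency = 5, 2.0, 3

critical_path = (
    1.0                                 # initialize
    + max(build, lint, scan)            # slowest parallel branch gates the join
    + 1.5                               # merge_results
    + math.ceil(item_count / concurrency) * item_duration  # with_items batches
    + 2.0                               # validate
    + 1.0                               # finalize_success
)
print(critical_path)  # 15.5 seconds of pure task time
```

With `concurrency: 3` and 5 items, the `with_items` stage runs in ⌈5/3⌉ = 2 batches of 2 s each.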
### Enable the rule to start the counter sensor loop
```bash

View File

@@ -4,12 +4,35 @@ Artifact Demo Action - Python Example Pack
 Demonstrates creating file and progress artifacts via the Attune API.
 Each iteration:
-1. Appends a timestamped log line to a file artifact (via version upload)
-2. Updates a progress artifact by 2%
+1. Appends a line to an in-memory log
+2. Updates a progress artifact
 3. Sleeps for 0.5 seconds
-The action authenticates to the API using the provided credentials (or defaults)
-and uses ATTUNE_API_URL / ATTUNE_EXEC_ID environment variables set by the worker.
+After all iterations complete, the full log is written directly to the
+shared artifact volume as a **single version** of a file artifact with a
+stable ref (`python_example.artifact_demo.log`).
+
+File-based artifact flow (single API call):
+1. POST /api/v1/artifacts/ref/{ref}/versions/file — upserts the artifact
+   (creating it if it doesn't exist) and allocates a version number,
+   returning a relative `file_path`
+2. Write the file to $ATTUNE_ARTIFACTS_DIR/{file_path} on the shared volume
+
+No HTTP upload is needed — the worker and action process share an artifact
+volume, so the action writes directly to disk.
+
+The progress artifact is still per-execution (ephemeral status indicator).
+
+Parameters:
+    iterations - Number of iterations to run (default: 50)
+    visibility - Artifact visibility level for file artifacts: "public" or "private"
+                 (default: "private"). Public artifacts are viewable by all authenticated
+                 users on the platform. Private artifacts are restricted based on their
+                 scope/owner fields.
+
+Note: Progress artifacts always use the server default visibility (public), since
+they are informational status indicators that anyone watching an execution should
+be able to see. The visibility parameter only controls file artifacts.
 """
import json import json
@@ -49,67 +72,51 @@ def api_request(
         raise

-def multipart_upload(
-    base_url, path, file_bytes, filename, token, content_type="text/plain"
-):
-    """Upload a file via multipart/form-data."""
-    boundary = f"----AttuneArtifact{int(time.time() * 1000)}"
-    body_parts = []
-    # file field
-    body_parts.append(f"--{boundary}\r\n".encode())
-    body_parts.append(
-        f'Content-Disposition: form-data; name="file"; filename="{filename}"\r\n'.encode()
-    )
-    body_parts.append(f"Content-Type: {content_type}\r\n\r\n".encode())
-    body_parts.append(file_bytes)
-    body_parts.append(b"\r\n")
-    # content_type field
-    body_parts.append(f"--{boundary}\r\n".encode())
-    body_parts.append(b'Content-Disposition: form-data; name="content_type"\r\n\r\n')
-    body_parts.append(content_type.encode())
-    body_parts.append(b"\r\n")
-    # created_by field
-    body_parts.append(f"--{boundary}\r\n".encode())
-    body_parts.append(b'Content-Disposition: form-data; name="created_by"\r\n\r\n')
-    body_parts.append(b"python_example.artifact_demo")
-    body_parts.append(b"\r\n")
-    body_parts.append(f"--{boundary}--\r\n".encode())
-    full_body = b"".join(body_parts)
-    url = f"{base_url}{path}"
-    headers = {
-        "Authorization": f"Bearer {token}",
-        "Content-Type": f"multipart/form-data; boundary={boundary}",
-        "Accept": "application/json",
-    }
-    req = urllib.request.Request(url, data=full_body, headers=headers, method="POST")
-    try:
-        with urllib.request.urlopen(req, timeout=30) as resp:
-            return json.loads(resp.read().decode("utf-8")), resp.status
-    except urllib.error.HTTPError as e:
-        error_body = e.read().decode("utf-8", errors="replace")
-        print(f"Upload error {e.code} on {path}: {error_body}", file=sys.stderr)
-        raise
-
-def login(base_url, username, password):
-    """Authenticate and return a JWT token."""
-    data, _ = api_request(
-        base_url,
-        "/auth/login",
-        method="POST",
-        data={
-            "username": username,
-            "password": password,
-        },
-    )
-    return data["data"]["access_token"]
+def allocate_file_version_by_ref(
+    base_url,
+    token,
+    artifact_ref,
+    execution_id=None,
+    visibility=None,
+    content_type="text/plain",
+    name=None,
+    description=None,
+):
+    """Upsert an artifact by ref and allocate a file-backed version.
+
+    Single API call that creates the artifact if it doesn't exist (or
+    reuses the existing one) and allocates a new version with a file_path.
+
+    Returns:
+        (artifact_id, version_id, file_path) tuple
+    """
+    payload = {
+        "scope": "action",
+        "owner": "python_example.artifact_demo",
+        "type": "file_text",
+        "retention_policy": "versions",
+        "retention_limit": 10,
+        "content_type": content_type,
+        "created_by": "python_example.artifact_demo",
+    }
+    if execution_id is not None:
+        payload["execution"] = execution_id
+    if visibility is not None:
+        payload["visibility"] = visibility
+    if name is not None:
+        payload["name"] = name
+    if description is not None:
+        payload["description"] = description
+    resp, _ = api_request(
+        base_url,
+        f"/api/v1/artifacts/ref/{artifact_ref}/versions/file",
+        method="POST",
+        data=payload,
+        token=token,
+    )
+    version_data = resp["data"]
+    return version_data["artifact"], version_data["id"], version_data["file_path"]
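The allocate-then-write contract introduced above can be sketched end to end without a live API. Here `fake_allocate` is a hypothetical stand-in for the `POST /api/v1/artifacts/ref/{ref}/versions/file` call (the path layout it returns is invented for illustration); only step 2, the shared-volume write, is real:

```python
import os
import tempfile

def fake_allocate(artifact_ref):
    """Hypothetical stand-in for the version-allocation API call:
    returns (artifact_id, version_id, relative file_path)."""
    return 1, 1, os.path.join(artifact_ref, "v1", "artifact_demo.log")

def write_version(artifacts_dir, artifact_ref, content):
    # Step 1: allocate a version (stubbed here; the real action calls the API)
    artifact_id, version_id, file_path = fake_allocate(artifact_ref)
    # Step 2: write directly to the shared artifact volume, no HTTP upload
    full_path = os.path.join(artifacts_dir, file_path)
    os.makedirs(os.path.dirname(full_path), exist_ok=True)
    with open(full_path, "w", encoding="utf-8") as f:
        f.write(content)
    return full_path

artifacts_dir = tempfile.mkdtemp()  # stand-in for $ATTUNE_ARTIFACTS_DIR
log_path = write_version(artifacts_dir, "python_example.artifact_demo.log", "line 1\n")
print(log_path.endswith("artifact_demo.log"))  # True
```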
def create_artifact(
@@ -119,7 +126,7 @@ def create_artifact(
     name,
     artifact_type,
     execution_id,
-    content_type=None,
+    visibility=None,
     description=None,
     data=None,
 ):
@@ -134,9 +141,9 @@ def create_artifact(
         "name": name,
         "execution": execution_id,
     }
-    if content_type:
-        payload["content_type"] = content_type
-    if description:
+    if visibility is not None:
+        payload["visibility"] = visibility
+    if description is not None:
         payload["description"] = description
     if data is not None:
         payload["data"] = data
@@ -165,50 +172,73 @@ def main():
     # Read parameters from stdin (JSON format)
     params = json.loads(sys.stdin.readline())
     iterations = params.get("iterations", 50)
-    username = params.get("username") or os.environ.get(
-        "ATTUNE_USERNAME", "test@attune.local"
-    )
-    password = params.get("password") or os.environ.get(
-        "ATTUNE_PASSWORD", "TestPass123!"
-    )
+    visibility = params.get("visibility", "private")
+
+    # Validate visibility value
+    if visibility not in ("public", "private"):
+        raise ValueError(
+            f"Invalid visibility '{visibility}': must be 'public' or 'private'"
+        )

     # Get execution context from environment
-    api_url = os.environ.get("ATTUNE_API_URL", "http://localhost:8080")
+    api_url = os.environ.get("ATTUNE_API_URL", "")
+    token = os.environ.get("ATTUNE_API_TOKEN", "")
     exec_id_str = os.environ.get("ATTUNE_EXEC_ID", "")
+    artifacts_dir = os.environ.get("ATTUNE_ARTIFACTS_DIR", "")
     execution_id = int(exec_id_str) if exec_id_str else None
+
+    if not api_url:
+        raise RuntimeError(
+            "ATTUNE_API_URL environment variable is not set. "
+            "This action must be run by the Attune worker."
+        )
+    if not token:
+        raise RuntimeError(
+            "ATTUNE_API_TOKEN environment variable is not set. "
+            "This action must be run by the Attune worker."
+        )
+    if not artifacts_dir:
+        raise RuntimeError(
+            "ATTUNE_ARTIFACTS_DIR environment variable is not set. "
+            "This action must be run by the Attune worker."
+        )

     print(
-        f"Artifact demo starting: {iterations} iterations, API at {api_url}",
+        f"Artifact demo starting: {iterations} iterations, "
+        f"visibility={visibility}, API at {api_url}, "
+        f"artifacts_dir={artifacts_dir}",
         file=sys.stderr,
     )

-    # Authenticate
-    token = login(api_url, username, password)
-    print("Authenticated successfully", file=sys.stderr)
-
-    # Build unique artifact refs using execution ID to avoid collisions
-    ts = int(time.time())
-    ref_suffix = f"{execution_id}_{ts}" if execution_id else str(ts)
-    file_ref = f"python_example.artifact_demo.log.{ref_suffix}"
-    progress_ref = f"python_example.artifact_demo.progress.{ref_suffix}"
-
-    # Create file artifact (file_text type)
-    file_artifact_id = create_artifact(
+    # ----------------------------------------------------------------
+    # File artifact — single call upserts artifact + allocates version
+    # ----------------------------------------------------------------
+    file_ref = "python_example.artifact_demo.log"
+    file_artifact_id, version_id, file_path = allocate_file_version_by_ref(
         base_url=api_url,
         token=token,
-        ref=file_ref,
-        name="Artifact Demo Log",
-        artifact_type="file_text",
+        artifact_ref=file_ref,
         execution_id=execution_id,
+        visibility=visibility,
         content_type="text/plain",
-        description=f"Log output from artifact demo ({iterations} iterations)",
+        name="Demo Log",
+        description="Log output from the artifact demo action",
     )
+    full_file_path = os.path.join(artifacts_dir, file_path)
     print(
-        f"Created file artifact ID={file_artifact_id} ref={file_ref}",
+        f"Allocated file artifact ref={file_ref} id={file_artifact_id} "
+        f"version={version_id} path={file_path}",
         file=sys.stderr,
     )

-    # Create progress artifact
+    # ----------------------------------------------------------------
+    # Progress artifact — per-execution (ephemeral status indicator)
+    # ----------------------------------------------------------------
+    ts = int(time.time())
+    ref_suffix = f"{execution_id}_{ts}" if execution_id else str(ts)
+    progress_ref = f"python_example.artifact_demo.progress.{ref_suffix}"
     progress_artifact_id = create_artifact(
         base_url=api_url,
         token=token,
@@ -220,11 +250,14 @@ def main():
         data=[],  # Initialize with empty array
     )
     print(
-        f"Created progress artifact ID={progress_artifact_id} ref={progress_ref}",
+        f"Created progress artifact ID={progress_artifact_id} ref={progress_ref} "
+        f"visibility=public (server default)",
         file=sys.stderr,
     )

-    # Run iterations
+    # ----------------------------------------------------------------
+    # Run iterations — collect log lines, write file at the end
+    # ----------------------------------------------------------------
     log_lines = []
     for i in range(iterations):
         iteration = i + 1
@@ -237,17 +270,6 @@ def main():
         print(f"  Iteration {iteration}/{iterations} ({pct}%)", file=sys.stderr)

-        # Upload the full log as a new file version
-        full_log = "\n".join(log_lines) + "\n"
-        multipart_upload(
-            base_url=api_url,
-            path=f"/api/v1/artifacts/{file_artifact_id}/versions/upload",
-            file_bytes=full_log.encode("utf-8"),
-            filename="artifact_demo.log",
-            token=token,
-            content_type="text/plain",
-        )
-
         # Append progress entry
         append_progress(
             api_url,
@@ -266,11 +288,26 @@ def main():
         if iteration < iterations:
             time.sleep(0.5)

+    # ----------------------------------------------------------------
+    # Write the complete log file to the shared artifact volume
+    # ----------------------------------------------------------------
+    full_log = "\n".join(log_lines) + "\n"
+    with open(full_file_path, "w", encoding="utf-8") as f:
+        f.write(full_log)
+    print(
+        f"Wrote {len(full_log)} bytes to {full_file_path}",
+        file=sys.stderr,
+    )
+
     elapsed = round(time.time() - start_time, 3)
     result = {
         "file_artifact_id": file_artifact_id,
+        "file_artifact_ref": file_ref,
+        "file_version_id": version_id,
+        "file_path": file_path,
         "progress_artifact_id": progress_artifact_id,
         "iterations_completed": iterations,
+        "visibility": visibility,
         "elapsed_seconds": elapsed,
         "success": True,
     }

View File

@@ -3,7 +3,7 @@
 ref: python_example.artifact_demo
 label: "Artifact Demo"
-description: "Creates a file artifact and a progress artifact, writing lines and updating progress over multiple iterations"
+description: "Creates a file artifact (written directly to the shared volume) and a progress artifact, updating progress over multiple iterations"
 enabled: true
# Runner type determines how the action is executed # Runner type determines how the action is executed
@@ -26,34 +26,48 @@ output_format: json
 parameters:
   iterations:
     type: integer
-    description: "Number of iterations to run (each adds a log line and 2% progress)"
+    description: "Number of iterations to run (each adds a log line and updates progress)"
     default: 50
     minimum: 1
     maximum: 200
-  username:
-    type: string
-    description: "API username for authentication (defaults to ATTUNE_USERNAME env var or test@attune.local)"
-    default: "test@attune.local"
-  password:
-    type: string
-    description: "API password for authentication (defaults to ATTUNE_PASSWORD env var or TestPass123!)"
-    secret: true
-    default: "TestPass123!"
+  visibility:
+    type: string
+    description: "Artifact visibility level: public (all authenticated users) or private (scope/owner restricted)"
+    default: "private"
+    enum:
+      - "public"
+      - "private"
 # Output schema (flat format)
 output_schema:
   file_artifact_id:
     type: integer
-    description: "ID of the created file artifact"
+    description: "ID of the file artifact"
     required: true
+  file_artifact_ref:
+    type: string
+    description: "Stable ref of the file artifact"
+    required: true
+  file_version_id:
+    type: integer
+    description: "ID of the file artifact version created for this execution"
+    required: true
+  file_path:
+    type: string
+    description: "Relative path of the file on the shared artifact volume"
+    required: true
   progress_artifact_id:
     type: integer
-    description: "ID of the created progress artifact"
+    description: "ID of the progress artifact"
     required: true
   iterations_completed:
     type: integer
     description: "Number of iterations completed"
     required: true
+  visibility:
+    type: string
+    description: "Visibility level that was applied to the file artifact"
+    required: true
   success:
     type: boolean
     description: "Whether the demo completed successfully"

actions/flaky_fail.py (new file, 52 lines)
View File

@@ -0,0 +1,52 @@
#!/usr/bin/env python3
"""
Flaky Fail Action - Python Example Pack

A Python action that randomly fails with a configurable probability.
Useful for testing error handling, retry logic, and workflow failure paths.

Actions receive parameters as JSON on stdin and write results to stdout.
"""
import json
import random
import sys


def main():
    # Read parameters from stdin (JSON format)
    params = json.loads(sys.stdin.readline())
    failure_probability = float(params.get("failure_probability", 0.1))

    # Clamp to valid range
    failure_probability = max(0.0, min(1.0, failure_probability))

    roll = random.random()
    failed = roll < failure_probability

    if failed:
        print(
            json.dumps(
                {
                    "error": "Random failure triggered",
                    "failure_probability": failure_probability,
                    "roll": round(roll, 6),
                }
            ),
            file=sys.stderr,
        )
        sys.exit(1)

    print(
        json.dumps(
            {
                "message": "Success! Did not fail this time.",
                "failure_probability": failure_probability,
                "roll": round(roll, 6),
            }
        )
    )


if __name__ == "__main__":
    main()
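A flaky action like this is mainly useful when paired with retry logic. The following is a sketch of the constant-backoff retry behavior this pack's workflows configure (`count`, `delay`), applied here to an ordinary Python callable rather than the Attune executor, with a deterministic stand-in for the random failure:

```python
import time

def retry(fn, count=2, delay=0.0):
    """Call fn, retrying up to `count` extra times with a constant delay."""
    attempts = 0
    while True:
        attempts += 1
        try:
            return fn(), attempts
        except Exception:
            if attempts > count:  # initial try plus `count` retries exhausted
                raise
            time.sleep(delay)

# Deterministic stand-in for the flaky action: fails twice, then succeeds.
calls = {"n": 0}
def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("Random failure triggered")
    return "Success! Did not fail this time."

result, attempts = retry(flaky, count=2, delay=0)
print(result, attempts)  # Success! Did not fail this time. 3
```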

actions/flaky_fail.yaml (new file, 52 lines)
View File

@@ -0,0 +1,52 @@
# Flaky Fail Action
# Randomly fails with a configurable probability, useful for testing error handling and retry logic
ref: python_example.flaky_fail
label: "Flaky Fail"
description: "An action that randomly fails with a configurable probability. Useful for testing error handling, retry logic, and workflow failure paths."
enabled: true

# Runner type determines how the action is executed
runner_type: python

# Minimum Python version required (semver constraint)
runtime_version: ">=3.9"

# Entry point is the Python script to execute
entry_point: flaky_fail.py

# Parameter delivery: stdin for secure parameter passing
parameter_delivery: stdin
parameter_format: json

# Output format: json (structured data parsing enabled)
output_format: json

# Action parameters schema (flat format with inline required/secret)
parameters:
  failure_probability:
    type: number
    description: "Probability of failure between 0.0 (never fail) and 1.0 (always fail)"
    default: 0.1

# Output schema (flat format) — fields match what flaky_fail.py actually emits
output_schema:
  message:
    type: string
    description: "Success message (on failure the action exits non-zero instead)"
    required: true
  roll:
    type: number
    description: "The random value that was rolled (0.0 to 1.0)"
    required: true
  failure_probability:
    type: number
    description: "The failure probability threshold that was used"
    required: true

# Tags for categorization
tags:
  - python
  - example
  - testing
  - error-handling

actions/simulate_work.py (new file, 105 lines)
View File

@@ -0,0 +1,105 @@
#!/usr/bin/env python3
"""
Simulate Work Action - Python Example Pack

Simulates a unit of work that takes a configurable amount of time.
Returns structured JSON output with timing information and an optional
payload. Useful for testing workflows, the timeline DAG visualizer,
and execution monitoring.

Parameters (via stdin JSON):
    duration_seconds - How long to simulate work (float, default: 1.0)
    label            - A label for this work unit (string, default: "work")
    fail             - Whether to simulate a failure (bool, default: false)
    fail_after       - If failing, fail after this many seconds (float, default: 0)
    output_data      - Arbitrary JSON data to include in the result (default: null)

Output (JSON):
    label             - The label that was passed in
    duration_seconds  - Actual elapsed time
    requested_seconds - The requested duration
    output_data       - The pass-through data (if any)
    success           - Always true on success (failures exit non-zero)
"""
import json
import sys
import time


def main():
    params = json.loads(sys.stdin.readline())
    duration_seconds = float(params.get("duration_seconds", 1.0))
    label = params.get("label", "work")
    fail = params.get("fail", False)
    fail_after = float(params.get("fail_after", 0))
    output_data = params.get("output_data", None)

    # Clamp duration to a reasonable range
    duration_seconds = max(0.0, min(duration_seconds, 300.0))

    print(
        f"[simulate_work] Starting '{label}' for {duration_seconds}s", file=sys.stderr
    )
    start = time.time()

    if fail and fail_after > 0:
        # Sleep for fail_after seconds then crash
        time.sleep(min(fail_after, duration_seconds))
        elapsed = round(time.time() - start, 3)
        print(
            json.dumps(
                {
                    "error": f"Simulated failure in '{label}' after {elapsed}s",
                    "label": label,
                    "elapsed": elapsed,
                }
            ),
            file=sys.stderr,
        )
        sys.exit(1)

    if fail:
        # Immediate failure
        print(
            json.dumps(
                {
                    "error": f"Simulated immediate failure in '{label}'",
                    "label": label,
                }
            ),
            file=sys.stderr,
        )
        sys.exit(1)

    # Simulate work with periodic progress to stderr
    remaining = duration_seconds
    while remaining > 0:
        chunk = min(remaining, 1.0)
        time.sleep(chunk)
        remaining -= chunk
        elapsed = round(time.time() - start, 3)
        pct = min(100.0, round((elapsed / max(duration_seconds, 0.001)) * 100, 1))
        print(
            f"[simulate_work] '{label}' progress: {pct}% ({elapsed}s)", file=sys.stderr
        )

    elapsed = round(time.time() - start, 3)
    result = {
        "label": label,
        "duration_seconds": elapsed,
        "requested_seconds": duration_seconds,
        "success": True,
    }
    if output_data is not None:
        result["output_data"] = output_data

    print(json.dumps(result))
    print(f"[simulate_work] '{label}' completed in {elapsed}s", file=sys.stderr)


if __name__ == "__main__":
    main()

View File

@@ -0,0 +1,76 @@
# Simulate Work Action
# Simulates a unit of work with configurable duration and structured output.
# Useful for testing workflows, the timeline DAG visualizer, and execution monitoring.
ref: python_example.simulate_work
label: "Simulate Work"
description: "Simulates a unit of work that takes a configurable amount of time, returning structured JSON with timing info. Supports simulated failures for testing error-handling paths."
enabled: true

# Runner type determines how the action is executed
runner_type: python

# Minimum Python version required (semver constraint)
runtime_version: ">=3.9"

# Entry point is the Python script to execute
entry_point: simulate_work.py

# Parameter delivery: stdin for secure parameter passing
parameter_delivery: stdin
parameter_format: json

# Output format: json (structured data parsing enabled)
output_format: json

# Action parameters schema (flat format with inline required/secret)
parameters:
  duration_seconds:
    type: number
    description: "How long to simulate work in seconds (clamped to 0–300)"
    default: 1.0
  label:
    type: string
    description: "A human-readable label for this work unit"
    default: "work"
  fail:
    type: boolean
    description: "Whether to simulate a failure (exits non-zero)"
    default: false
  fail_after:
    type: number
    description: "If failing, wait this many seconds before crashing (0 = immediate)"
    default: 0
  output_data:
    type: object
    description: "Arbitrary JSON data to pass through to the result"

# Output schema (flat format)
output_schema:
  label:
    type: string
    description: "The label that was passed in"
    required: true
  duration_seconds:
    type: number
    description: "Actual elapsed wall-clock time in seconds"
    required: true
  requested_seconds:
    type: number
    description: "The requested duration"
    required: true
  output_data:
    type: object
    description: "The pass-through data (if provided)"
  success:
    type: boolean
    description: "Always true on success (failures exit non-zero)"
    required: true

# Tags for categorization
tags:
  - python
  - example
  - testing
  - workflow
  - simulation

View File

@@ -0,0 +1,71 @@
# Timeline Demo Workflow Action
# Action metadata for the timeline_demo workflow definition.
#
# The workflow graph (tasks, transitions, variables) lives in the separate
# workflow file referenced by `workflow_file`. This action YAML controls
# action-level concerns: ref, label, parameter schema, output schema, tags,
# and (in future) execution policies.
#
# Multiple actions can reference the same workflow file with different
# parameter schemas or policy configurations.
ref: python_example.timeline_demo
label: "Timeline DAG Demo"
description: >
  A comprehensive demo workflow that exercises every feature of the
  Workflow Timeline DAG visualizer: parallel branches with different
  durations, with_items fan-out, failure handling paths, publish
  directives, retries, timeouts, and custom edge styling.
enabled: true

# Reference to the workflow definition file (relative to actions/ directory)
workflow_file: workflows/timeline_demo.yaml

# Action parameters schema (flat format with inline required/secret)
# These are the inputs exposed when executing this action. They map to
# the workflow's `parameters` block but are authoritative for the action.
parameters:
  item_count:
    type: integer
    description: "Number of items to process in the with_items stage (2–20)"
    default: 5
  build_duration:
    type: number
    description: "Duration of the simulated build step in seconds"
    default: 6.0
  lint_duration:
    type: number
    description: "Duration of the simulated lint step in seconds"
    default: 3.0
  scan_duration:
    type: number
    description: "Duration of the simulated security scan in seconds"
    default: 4.0
  item_duration:
    type: number
    description: "Duration of each with_items work unit in seconds"
    default: 2.0
  fail_validation:
    type: boolean
    description: "If true, the validate step will intentionally fail to exercise the error path"
    default: false

# Output schema (flat format)
output:
  status:
    type: string
    description: "Final workflow status: 'success' or 'failed'"
  items_processed:
    type: integer
    description: "Number of items that were processed"
  total_duration:
    type: number
    description: "Wall-clock duration of the final task in seconds"

# Tags for categorization
tags:
  - demo
  - timeline
  - workflow
  - visualization
  - python

View File

@@ -0,0 +1,293 @@
# Timeline Demo Workflow
# Demonstrates various workflow features and exercises the Timeline DAG visualizer.
#
# This is an action-linked workflow file — action-level metadata (ref, label,
# description, parameters, output, tags) is defined in the companion action
# YAML at actions/timeline_demo.yaml. This file contains only the execution
# graph: version, vars, tasks, and output_map.
#
# Features exercised:
# - Sequential task chains
# - Parallel fan-out (3 branches) and fan-in (merge)
# - with_items expansion with concurrency limiting
# - Failure/error handling paths (succeeded/failed/timed_out transitions)
# - Variable-duration tasks for interesting Gantt chart shapes
# - Publish directives passing data between tasks
# - Custom transition labels and colors via __chart_meta__
# - Retry configuration
# - Timeout handling
#
# Expected timeline shape (approximate):
#
#   initialize ─┬─► build_artifacts ─┐
#               ├─► run_linter ──────┼─► merge_results ─► process_items(×5) ─► validate ─┬─► finalize_success
#               └─► security_scan ───┘                                                   └─► handle_failure ─► finalize_failure

version: "1.0.0"

vars:
  build_result: null
  lint_result: null
  scan_result: null
  merged_summary: null
  items_processed: 0
  validation_passed: false

tasks:
  # ── Stage 1: Initialize ──────────────────────────────────────────────
  - name: initialize
    action: python_example.simulate_work
    input:
      duration_seconds: 1.0
      label: "initialize"
      output_data:
        item_count: "{{ parameters.item_count }}"
        started: true
    next:
      - when: "{{ succeeded() }}"
        publish:
          - item_count: "{{ parameters.item_count }}"
        do:
          - build_artifacts
          - run_linter
          - security_scan
        __chart_meta__:
          label: "fan-out"
          color: "#6366f1"
      - when: "{{ failed() }}"
        do:
          - finalize_failure
        __chart_meta__:
          label: "init failed"
          color: "#ef4444"

  # ── Stage 2a: Build artifacts (longest parallel branch) ──────────────
  - name: build_artifacts
    action: python_example.simulate_work
    input:
      duration_seconds: "{{ parameters.build_duration }}"
      label: "build"
      output_data:
        artifact: "app-v1.2.3.tar.gz"
        size_mb: 48
    retry:
      count: 2
      delay: 1
      backoff: constant
    next:
      - when: "{{ succeeded() }}"
        publish:
          - build_result: "{{ result() }}"
        do:
          - merge_results
        __chart_meta__:
          label: "build ok"
          color: "#22c55e"
      - when: "{{ failed() }}"
        do:
          - handle_failure
        __chart_meta__:
          label: "build failed"
          color: "#ef4444"

  # ── Stage 2b: Run linter (medium parallel branch) ────────────────────
  - name: run_linter
    action: python_example.simulate_work
    input:
      duration_seconds: "{{ parameters.lint_duration }}"
      label: "lint"
      output_data:
        warnings: 3
        errors: 0
        files_checked: 42
    next:
      - when: "{{ succeeded() }}"
        publish:
          - lint_result: "{{ result() }}"
        do:
          - merge_results
        __chart_meta__:
          label: "lint clean"
          color: "#22c55e"
      - when: "{{ failed() }}"
        do:
          - handle_failure
        __chart_meta__:
          label: "lint errors"
          color: "#ef4444"

  # ── Stage 2c: Security scan (short parallel branch) ──────────────────
  - name: security_scan
    action: python_example.simulate_work
    input:
      duration_seconds: "{{ parameters.scan_duration }}"
      label: "security-scan"
      output_data:
        vulnerabilities: 0
        packages_scanned: 128
    timeout: 30
    next:
      - when: "{{ succeeded() }}"
        publish:
          - scan_result: "{{ result() }}"
        do:
          - merge_results
        __chart_meta__:
          label: "scan clear"
          color: "#22c55e"
      - when: "{{ failed() }}"
        do:
          - handle_failure
        __chart_meta__:
          label: "scan failed"
          color: "#ef4444"
      - when: "{{ timed_out() }}"
        do:
          - handle_failure
        __chart_meta__:
          label: "scan timed out"
          color: "#f97316"

  # ── Stage 3: Merge results (join from 3 parallel branches) ──────────
  - name: merge_results
    action: python_example.simulate_work
    join: 3
    input:
      duration_seconds: 1.5
      label: "merge"
      output_data:
        build: "{{ workflow.build_result }}"
        lint: "{{ workflow.lint_result }}"
        scan: "{{ workflow.scan_result }}"
    next:
      - when: "{{ succeeded() }}"
        publish:
          - merged_summary: "{{ result() }}"
        do:
          - generate_item_list
        __chart_meta__:
          label: "merged"
          color: "#6366f1"

  # ── Stage 4a: Generate the item list for with_items ──────────────────
  - name: generate_item_list
    action: python_example.list_numbers
    input:
      n: "{{ parameters.item_count }}"
      start: 1
    next:
      - when: "{{ succeeded() }}"
        publish:
          - number_list: "{{ result().data.items }}"
        do:
          - process_items
        __chart_meta__:
          label: "items ready"
          color: "#6366f1"

  # ── Stage 4b: Process each item (with_items + concurrency) ──────────
  - name: process_items
    action: python_example.simulate_work
    with_items: "{{ workflow.number_list }}"
    concurrency: 3
    input:
      duration_seconds: "{{ parameters.item_duration }}"
      label: "item-{{ item }}"
      output_data:
        item_number: "{{ item }}"
        index: "{{ index }}"
    next:
      - when: "{{ succeeded() }}"
        publish:
          - items_processed: "{{ parameters.item_count }}"
        do:
          - validate
        __chart_meta__:
          label: "all items done"
          color: "#22c55e"
      - when: "{{ failed() }}"
        do:
          - handle_failure
        __chart_meta__:
          label: "item failed"
          color: "#ef4444"

  # ── Stage 5: Validate everything ─────────────────────────────────────
  - name: validate
    action: python_example.simulate_work
    input:
      duration_seconds: 2.0
      label: "validate"
      fail: "{{ parameters.fail_validation }}"
      fail_after: 1.0
      output_data:
        checks_passed: 12
        checks_total: 12
    retry:
      count: 1
      delay: 2
      backoff: constant
    next:
      - when: "{{ succeeded() }}"
        publish:
          - validation_passed: true
        do:
          - finalize_success
        __chart_meta__:
          label: "valid ✓"
          color: "#22c55e"
      - when: "{{ failed() }}"
        publish:
          - validation_passed: false
        do:
          - handle_failure
        __chart_meta__:
          label: "invalid ✗"
          color: "#ef4444"

  # ── Terminal: Success ────────────────────────────────────────────────
  - name: finalize_success
    action: python_example.simulate_work
    input:
      duration_seconds: 1.0
      label: "finalize-success"
      output_data:
        status: "success"
        items_processed: "{{ workflow.items_processed }}"
        validation: "{{ workflow.validation_passed }}"

  # ── Error path: Handle failure ───────────────────────────────────────
  - name: handle_failure
    action: python_example.simulate_work
    input:
      duration_seconds: 1.5
      label: "handle-failure"
      output_data:
        status: "handling_error"
        build: "{{ workflow.build_result }}"
        lint: "{{ workflow.lint_result }}"
        scan: "{{ workflow.scan_result }}"
    next:
      - when: "{{ succeeded() }}"
        do:
          - finalize_failure
        __chart_meta__:
          label: "error handled"
          color: "#f97316"

  # ── Terminal: Failure ────────────────────────────────────────────────
  - name: finalize_failure
    action: python_example.simulate_work
    input:
      duration_seconds: 0.5
      label: "finalize-failure"
      output_data:
        status: "failed"
        items_processed: "{{ workflow.items_processed }}"
        validation: "{{ workflow.validation_passed }}"

output_map:
  status: "{{ 'success' if workflow.validation_passed else 'failed' }}"
  items_processed: "{{ workflow.items_processed }}"
  total_duration: "{{ task.finalize_success.result.duration_seconds if workflow.validation_passed else task.finalize_failure.result.duration_seconds }}"
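The fan-out/fan-in pattern this workflow exercises (three parallel branches gated by a `join: 3` barrier) can be sketched with a plain thread pool. This is a hypothetical stand-in for the Attune executor, with durations shrunk so it runs instantly:

```python
from concurrent.futures import ThreadPoolExecutor
import time

def simulate_work(label, duration):
    # Stand-in for python_example.simulate_work
    time.sleep(duration)
    return {"label": label, "success": True}

branches = {"build": 0.03, "lint": 0.01, "scan": 0.02}

with ThreadPoolExecutor(max_workers=3) as pool:
    # Fan-out: submit all three branches at once
    futures = {name: pool.submit(simulate_work, name, d) for name, d in branches.items()}
    # Fan-in (join: 3): .result() blocks until every branch has finished,
    # so the merge step cannot start before the slowest branch completes
    results = {name: f.result() for name, f in futures.items()}

merged = {name: r["success"] for name, r in results.items()}
print(merged)  # {'build': True, 'lint': True, 'scan': True}
```

This mirrors why the `merge_results` bar in the Timeline DAG starts only after the longest branch (`build_artifacts` by default) finishes.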