From 4df156f21036fe7c22d5847d1658ae75c56b62b2 Mon Sep 17 00:00:00 2001 From: David Culbreth Date: Wed, 4 Mar 2026 13:49:14 -0600 Subject: [PATCH] workflow example --- README.md | 55 ++++- actions/artifact_demo.py | 235 ++++++++++++--------- actions/artifact_demo.yaml | 38 ++-- actions/flaky_fail.py | 52 +++++ actions/flaky_fail.yaml | 52 +++++ actions/simulate_work.py | 105 ++++++++++ actions/simulate_work.yaml | 76 +++++++ actions/timeline_demo.yaml | 71 +++++++ actions/workflows/timeline_demo.yaml | 293 +++++++++++++++++++++++++++ 9 files changed, 865 insertions(+), 112 deletions(-) create mode 100644 actions/flaky_fail.py create mode 100644 actions/flaky_fail.yaml create mode 100644 actions/simulate_work.py create mode 100644 actions/simulate_work.yaml create mode 100644 actions/timeline_demo.yaml create mode 100644 actions/workflows/timeline_demo.yaml diff --git a/README.md b/README.md index 487e919..4c38023 100644 --- a/README.md +++ b/README.md @@ -21,8 +21,18 @@ This pack exercises as many parts of the Attune SDLC as possible: | Ref | Description | |-----|-------------| | `python_example.hello` | Returns `"Hello, Python"` — minimal action | -| `python_example.http_example` | Uses `requests` to GET `https://example.com` | +| `python_example.http_example` | Uses `urllib` to GET `https://example.com` | | `python_example.read_counter` | Consumes a counter value and returns a formatted message | +| `python_example.list_numbers` | Returns a list of sequential integers as JSON | +| `python_example.flaky_fail` | Randomly fails with configurable probability — useful for testing error handling and retry logic | +| `python_example.simulate_work` | Simulates a unit of work with configurable duration, optional failure, and structured output — useful for testing workflows and the timeline visualizer | +| `python_example.artifact_demo` | Creates file and progress artifacts via the Attune API, demonstrating the artifact system | + +### Workflows + +| Ref | Description | +|-----|-------------| +| `python_example.timeline_demo` | Comprehensive demo workflow exercising parallel fan-out/fan-in, `with_items` concurrency, failure paths, retries, timeouts, publish directives, and custom edge styling — designed to produce a rich Timeline DAG visualization | ### Triggers @@ -188,6 +198,49 @@ attune action execute python_example.read_counter --param counter=99 --param rul # Output: {"message": "Counter value is 99 (from rule: test)", ...} ``` +### Test the simulate_work action + +```bash +attune action execute python_example.simulate_work \ + --param duration_seconds=2.0 --param label=demo +# Output: {"label": "demo", "duration_seconds": 2.003, "requested_seconds": 2.0, "success": true} + +# Test failure simulation: +attune action execute python_example.simulate_work \ + --param fail=true --param label=crash-test +# Exits non-zero with error on stderr +``` + +### Run the Timeline Demo workflow + +The `timeline_demo` workflow is designed to produce a visually rich Timeline DAG +on the execution detail page. It exercises parallel branches, `with_items` +expansion, failure handling, and custom transition styling. + +```bash +# Happy path (all tasks succeed, ~25s total): +attune action execute python_example.timeline_demo + +# With more items and faster durations: +attune action execute python_example.timeline_demo \ + --param item_count=10 --param item_duration=1.0 --param build_duration=4.0 + +# Exercise the failure/error-handling path: +attune action execute python_example.timeline_demo \ + --param fail_validation=true + +# Then open the execution detail page in the Web UI to see the Timeline DAG. +``` + +**What to look for in the Timeline DAG:** + +- **Fan-out** from `initialize` into 3 parallel branches (`build_artifacts`, `run_linter`, `security_scan`) with different durations +- **Fan-in** at `merge_results` with a `join: 3` barrier — the bar starts only after the slowest branch completes +- **`with_items` expansion** at `process_items` — each item appears as a separate child execution bar, with `concurrency: 3` controlling how many run simultaneously +- **Custom edge colors**: indigo for fan-out/merge, green for success, red for failure, orange for timeout/error-handled paths +- **Custom edge labels**: "fan-out", "build ok", "lint clean", "scan clear", "valid ✓", "invalid ✗", etc. +- **Failure path** (when `fail_validation=true`): the DAG shows the red edge from `validate` → `handle_failure` → `finalize_failure` + ### Enable the rule to start the counter sensor loop ```bash diff --git a/actions/artifact_demo.py b/actions/artifact_demo.py index 1476058..f5dcbab 100644 --- a/actions/artifact_demo.py +++ b/actions/artifact_demo.py @@ -4,12 +4,35 @@ Artifact Demo Action - Python Example Pack Demonstrates creating file and progress artifacts via the Attune API. Each iteration: - 1. Appends a timestamped log line to a file artifact (via version upload) - 2. Updates a progress artifact by 2% + 1. Appends a line to an in-memory log + 2. Updates a progress artifact 3. Sleeps for 0.5 seconds -The action authenticates to the API using the provided credentials (or defaults) -and uses ATTUNE_API_URL / ATTUNE_EXEC_ID environment variables set by the worker. +After all iterations complete, the full log is written directly to the +shared artifact volume as a **single version** of a file artifact with a +stable ref (`python_example.artifact_demo.log`). + +File-based artifact flow (single API call): + 1. POST /api/v1/artifacts/ref/{ref}/versions/file — upserts the artifact + (creating it if it doesn't exist) and allocates a version number, + returning a relative `file_path` + 2. Write the file to $ATTUNE_ARTIFACTS_DIR/{file_path} on the shared volume + +No HTTP upload is needed — the worker and action process share an artifact +volume, so the action writes directly to disk. + +The progress artifact is still per-execution (ephemeral status indicator). + +Parameters: + iterations - Number of iterations to run (default: 50) + visibility - Artifact visibility level for file artifacts: "public" or "private" + (default: "private"). Public artifacts are viewable by all authenticated + users on the platform. Private artifacts are restricted based on their + scope/owner fields. + +Note: Progress artifacts always use the server default visibility (public), since +they are informational status indicators that anyone watching an execution should +be able to see. The visibility parameter only controls file artifacts. """ import json @@ -49,67 +72,51 @@ def api_request( raise -def multipart_upload( - base_url, path, file_bytes, filename, token, content_type="text/plain" +def allocate_file_version_by_ref( + base_url, + token, + artifact_ref, + execution_id=None, + visibility=None, + content_type="text/plain", + name=None, + description=None, ): - """Upload a file via multipart/form-data.""" - boundary = f"----AttuneArtifact{int(time.time() * 1000)}" - body_parts = [] + """Upsert an artifact by ref and allocate a file-backed version. - # file field - body_parts.append(f"--{boundary}\r\n".encode()) - body_parts.append( - f'Content-Disposition: form-data; name="file"; filename="{filename}"\r\n'.encode() - ) - body_parts.append(f"Content-Type: {content_type}\r\n\r\n".encode()) - body_parts.append(file_bytes) - body_parts.append(b"\r\n") + Single API call that creates the artifact if it doesn't exist (or + reuses the existing one) and allocates a new version with a file_path. - # content_type field - body_parts.append(f"--{boundary}\r\n".encode()) - body_parts.append(b'Content-Disposition: form-data; name="content_type"\r\n\r\n') - body_parts.append(content_type.encode()) - body_parts.append(b"\r\n") - - # created_by field - body_parts.append(f"--{boundary}\r\n".encode()) - body_parts.append(b'Content-Disposition: form-data; name="created_by"\r\n\r\n') - body_parts.append(b"python_example.artifact_demo") - body_parts.append(b"\r\n") - - body_parts.append(f"--{boundary}--\r\n".encode()) - - full_body = b"".join(body_parts) - - url = f"{base_url}{path}" - headers = { - "Authorization": f"Bearer {token}", - "Content-Type": f"multipart/form-data; boundary={boundary}", - "Accept": "application/json", + Returns: + (artifact_id, version_id, file_path) tuple + """ + payload = { + "scope": "action", + "owner": "python_example.artifact_demo", + "type": "file_text", + "retention_policy": "versions", + "retention_limit": 10, + "content_type": content_type, + "created_by": "python_example.artifact_demo", } + if execution_id is not None: + payload["execution"] = execution_id + if visibility is not None: + payload["visibility"] = visibility + if name is not None: + payload["name"] = name + if description is not None: + payload["description"] = description - req = urllib.request.Request(url, data=full_body, headers=headers, method="POST") - try: - with urllib.request.urlopen(req, timeout=30) as resp: - return json.loads(resp.read().decode("utf-8")), resp.status - except urllib.error.HTTPError as e: - error_body = e.read().decode("utf-8", errors="replace") - print(f"Upload error {e.code} on {path}: {error_body}", file=sys.stderr) - raise - - -def login(base_url, username, password): - """Authenticate and return a JWT token.""" - data, _ = api_request( + resp, _ = api_request( base_url, - "/auth/login", + f"/api/v1/artifacts/ref/{artifact_ref}/versions/file", method="POST", - data={ - "username": username, - "password": password, - }, + data=payload, + token=token, ) - return data["data"]["access_token"] + version_data = resp["data"] + return version_data["artifact"], version_data["id"], version_data["file_path"] def create_artifact( @@ -119,7 +126,7 @@ def create_artifact( name, artifact_type, execution_id, - content_type=None, + visibility=None, description=None, data=None, ): @@ -134,9 +141,9 @@ def create_artifact( "name": name, "execution": execution_id, } - if content_type: - payload["content_type"] = content_type - if description: + if visibility is not None: + payload["visibility"] = visibility + if description is not None: payload["description"] = description if data is not None: payload["data"] = data @@ -165,50 +172,73 @@ def main(): # Read parameters from stdin (JSON format) params = json.loads(sys.stdin.readline()) iterations = params.get("iterations", 50) - username = params.get("username") or os.environ.get( - "ATTUNE_USERNAME", "test@attune.local" - ) - password = params.get("password") or os.environ.get( - "ATTUNE_PASSWORD", "TestPass123!" - ) + visibility = params.get("visibility", "private") + + # Validate visibility value + if visibility not in ("public", "private"): + raise ValueError( + f"Invalid visibility '{visibility}': must be 'public' or 'private'" + ) # Get execution context from environment - api_url = os.environ.get("ATTUNE_API_URL", "http://localhost:8080") + api_url = os.environ.get("ATTUNE_API_URL", "") + token = os.environ.get("ATTUNE_API_TOKEN", "") exec_id_str = os.environ.get("ATTUNE_EXEC_ID", "") + artifacts_dir = os.environ.get("ATTUNE_ARTIFACTS_DIR", "") execution_id = int(exec_id_str) if exec_id_str else None + if not api_url: + raise RuntimeError( + "ATTUNE_API_URL environment variable is not set. " + "This action must be run by the Attune worker." + ) + if not token: + raise RuntimeError( + "ATTUNE_API_TOKEN environment variable is not set. " + "This action must be run by the Attune worker." + ) + if not artifacts_dir: + raise RuntimeError( + "ATTUNE_ARTIFACTS_DIR environment variable is not set. " + "This action must be run by the Attune worker." + ) + print( - f"Artifact demo starting: {iterations} iterations, API at {api_url}", + f"Artifact demo starting: {iterations} iterations, " + f"visibility={visibility}, API at {api_url}, " + f"artifacts_dir={artifacts_dir}", file=sys.stderr, ) - # Authenticate - token = login(api_url, username, password) - print("Authenticated successfully", file=sys.stderr) + # ---------------------------------------------------------------- + # File artifact — single call upserts artifact + allocates version + # ---------------------------------------------------------------- + file_ref = "python_example.artifact_demo.log" - # Build unique artifact refs using execution ID to avoid collisions - ts = int(time.time()) - ref_suffix = f"{execution_id}_{ts}" if execution_id else str(ts) - file_ref = f"python_example.artifact_demo.log.{ref_suffix}" - progress_ref = f"python_example.artifact_demo.progress.{ref_suffix}" - - # Create file artifact (file_text type) - file_artifact_id = create_artifact( + file_artifact_id, version_id, file_path = allocate_file_version_by_ref( base_url=api_url, token=token, - ref=file_ref, - name="Artifact Demo Log", - artifact_type="file_text", + artifact_ref=file_ref, execution_id=execution_id, + visibility=visibility, content_type="text/plain", - description=f"Log output from artifact demo ({iterations} iterations)", + name="Demo Log", + description="Log output from the artifact demo action", ) + full_file_path = os.path.join(artifacts_dir, file_path) print( - f"Created file artifact ID={file_artifact_id} ref={file_ref}", + f"Allocated file artifact ref={file_ref} id={file_artifact_id} " + f"version={version_id} path={file_path}", file=sys.stderr, ) - # Create progress artifact + # ---------------------------------------------------------------- + # Progress artifact — per-execution (ephemeral status indicator) + # ---------------------------------------------------------------- + ts = int(time.time()) + ref_suffix = f"{execution_id}_{ts}" if execution_id else str(ts) + progress_ref = f"python_example.artifact_demo.progress.{ref_suffix}" + progress_artifact_id = create_artifact( base_url=api_url, token=token, @@ -220,11 +250,14 @@ def main(): data=[], # Initialize with empty array ) print( - f"Created progress artifact ID={progress_artifact_id} ref={progress_ref}", + f"Created progress artifact ID={progress_artifact_id} ref={progress_ref} " + f"visibility=public (server default)", file=sys.stderr, ) - # Run iterations + # ---------------------------------------------------------------- + # Run iterations — collect log lines, write file at the end + # ---------------------------------------------------------------- log_lines = [] for i in range(iterations): iteration = i + 1 @@ -237,17 +270,6 @@ def main(): print(f" Iteration {iteration}/{iterations} ({pct}%)", file=sys.stderr) - # Upload the full log as a new file version - full_log = "\n".join(log_lines) + "\n" - multipart_upload( - base_url=api_url, - path=f"/api/v1/artifacts/{file_artifact_id}/versions/upload", - file_bytes=full_log.encode("utf-8"), - filename="artifact_demo.log", - token=token, - content_type="text/plain", - ) - # Append progress entry append_progress( api_url, @@ -266,11 +288,26 @@ def main(): if iteration < iterations: time.sleep(0.5) + # ---------------------------------------------------------------- + # Write the complete log file to the shared artifact volume + # ---------------------------------------------------------------- + full_log = "\n".join(log_lines) + "\n" + with open(full_file_path, "w", encoding="utf-8") as f: + f.write(full_log) + print( + f"Wrote {len(full_log)} bytes to {full_file_path}", + file=sys.stderr, + ) + elapsed = round(time.time() - start_time, 3) result = { "file_artifact_id": file_artifact_id, + "file_artifact_ref": file_ref, + "file_version_id": version_id, + "file_path": file_path, "progress_artifact_id": progress_artifact_id, "iterations_completed": iterations, + "visibility": visibility, "elapsed_seconds": elapsed, "success": True, } diff --git a/actions/artifact_demo.yaml b/actions/artifact_demo.yaml index 11d3ae3..cbe1857 100644 --- a/actions/artifact_demo.yaml +++ b/actions/artifact_demo.yaml @@ -3,7 +3,7 @@ ref: python_example.artifact_demo label: "Artifact Demo" -description: "Creates a file artifact and a progress artifact, writing lines and updating progress over multiple iterations" +description: "Creates a file artifact (written directly to the shared volume) and a progress artifact, updating progress over multiple iterations" enabled: true # Runner type determines how the action is executed @@ -26,34 +26,48 @@ output_format: json parameters: iterations: type: integer - description: "Number of iterations to run (each adds a log line and 2% progress)" + description: "Number of iterations to run (each adds a log line and updates progress)" default: 50 minimum: 1 maximum: 200 - username: + visibility: type: string - description: "API username for authentication (defaults to ATTUNE_USERNAME env var or test@attune.local)" - default: "test@attune.local" - password: - type: string - description: "API password for authentication (defaults to ATTUNE_PASSWORD env var or TestPass123!)" - secret: true - default: "TestPass123!" + description: "Artifact visibility level: public (all authenticated users) or private (scope/owner restricted)" + default: "private" + enum: + - "public" + - "private" # Output schema (flat format) output_schema: file_artifact_id: type: integer - description: "ID of the created file artifact" + description: "ID of the file artifact" + required: true + file_artifact_ref: + type: string + description: "Stable ref of the file artifact" + required: true + file_version_id: + type: integer + description: "ID of the file artifact version created for this execution" + required: true + file_path: + type: string + description: "Relative path of the file on the shared artifact volume" required: true progress_artifact_id: type: integer - description: "ID of the created progress artifact" + description: "ID of the progress artifact" required: true iterations_completed: type: integer description: "Number of iterations completed" required: true + visibility: + type: string + description: "Visibility level that was applied to the file artifact" + required: true success: type: boolean description: "Whether the demo completed successfully" diff --git a/actions/flaky_fail.py b/actions/flaky_fail.py new file mode 100644 index 0000000..ec8fa00 --- /dev/null +++ b/actions/flaky_fail.py @@ -0,0 +1,52 @@ +#!/usr/bin/env python3 +""" +Flaky Fail Action - Python Example Pack + +A Python action that randomly fails with a configurable probability. +Useful for testing error handling, retry logic, and workflow failure paths. + +Actions receive parameters as JSON on stdin and write results to stdout. +""" + +import json +import random +import sys + + +def main(): + # Read parameters from stdin (JSON format) + params = json.loads(sys.stdin.readline()) + failure_probability = float(params.get("failure_probability", 0.1)) + + # Clamp to valid range + failure_probability = max(0.0, min(1.0, failure_probability)) + + roll = random.random() + failed = roll < failure_probability + + if failed: + print( + json.dumps( + { + "error": "Random failure triggered", + "failure_probability": failure_probability, + "roll": round(roll, 6), + } + ), + file=sys.stderr, + ) + sys.exit(1) + + print( + json.dumps( + { + "message": "Success! Did not fail this time.", + "failure_probability": failure_probability, + "roll": round(roll, 6), + } + ) + ) + + +if __name__ == "__main__": + main() diff --git a/actions/flaky_fail.yaml b/actions/flaky_fail.yaml new file mode 100644 index 0000000..67ae528 --- /dev/null +++ b/actions/flaky_fail.yaml @@ -0,0 +1,52 @@ +# Flaky Fail Action +# Randomly fails with a configurable probability, useful for testing error handling and retry logic + +ref: python_example.flaky_fail +label: "Flaky Fail" +description: "An action that randomly fails with a configurable probability. Useful for testing error handling, retry logic, and workflow failure paths." +enabled: true + +# Runner type determines how the action is executed +runner_type: python + +# Minimum Python version required (semver constraint) +runtime_version: ">=3.9" + +# Entry point is the Python script to execute +entry_point: flaky_fail.py + +# Parameter delivery: stdin for secure parameter passing +parameter_delivery: stdin +parameter_format: json + +# Output format: json (structured data parsing enabled) +output_format: json + +# Action parameters schema (flat format with inline required/secret) +parameters: + failure_probability: + type: number + description: "Probability of failure between 0.0 (never fail) and 1.0 (always fail)" + default: 0.1 + +# Output schema (flat format) +output_schema: + succeeded: + type: boolean + description: "Whether the action succeeded (always true when it doesn't fail)" + required: true + roll: + type: number + description: "The random value that was rolled (0.0 to 1.0)" + required: true + threshold: + type: number + description: "The failure probability threshold that was used" + required: true + +# Tags for categorization +tags: + - python + - example + - testing + - error-handling diff --git a/actions/simulate_work.py b/actions/simulate_work.py new file mode 100644 index 0000000..8c70428 --- /dev/null +++ b/actions/simulate_work.py @@ -0,0 +1,105 @@ +#!/usr/bin/env python3 +""" +Simulate Work Action - Python Example Pack + +Simulates a unit of work that takes a configurable amount of time. +Returns structured JSON output with timing information and an optional +payload. Useful for testing workflows, the timeline DAG visualizer, +and execution monitoring. + +Parameters (via stdin JSON): + duration_seconds - How long to simulate work (float, default: 1.0) + label - A label for this work unit (string, default: "work") + fail - Whether to simulate a failure (bool, default: false) + fail_after - If failing, fail after this many seconds (float, default: 0) + output_data - Arbitrary JSON data to include in the result (default: null) + +Output (JSON): + label - The label that was passed in + duration_seconds - Actual elapsed time + requested_seconds - The requested duration + output_data - The pass-through data (if any) + success - Always true on success (failures exit non-zero) +""" + +import json +import sys +import time + + +def main(): + params = json.loads(sys.stdin.readline()) + + duration_seconds = float(params.get("duration_seconds", 1.0)) + label = params.get("label", "work") + fail = params.get("fail", False) + fail_after = float(params.get("fail_after", 0)) + output_data = params.get("output_data", None) + + # Clamp duration to a reasonable range + duration_seconds = max(0.0, min(duration_seconds, 300.0)) + + print( + f"[simulate_work] Starting '{label}' for {duration_seconds}s", file=sys.stderr + ) + + start = time.time() + + if fail and fail_after > 0: + # Sleep for fail_after seconds then crash + time.sleep(min(fail_after, duration_seconds)) + elapsed = round(time.time() - start, 3) + print( + json.dumps( + { + "error": f"Simulated failure in '{label}' after {elapsed}s", + "label": label, + "elapsed": elapsed, + } + ), + file=sys.stderr, + ) + sys.exit(1) + + if fail: + # Immediate failure + print( + json.dumps( + { + "error": f"Simulated immediate failure in '{label}'", + "label": label, + } + ), + file=sys.stderr, + ) + sys.exit(1) + + # Simulate work with periodic progress to stderr + remaining = duration_seconds + while remaining > 0: + chunk = min(remaining, 1.0) + time.sleep(chunk) + remaining -= chunk + elapsed = round(time.time() - start, 3) + pct = min(100.0, round((elapsed / max(duration_seconds, 0.001)) * 100, 1)) + print( + f"[simulate_work] '{label}' progress: {pct}% ({elapsed}s)", file=sys.stderr + ) + + elapsed = round(time.time() - start, 3) + + result = { + "label": label, + "duration_seconds": elapsed, + "requested_seconds": duration_seconds, + "success": True, + } + if output_data is not None: + result["output_data"] = output_data + + print(json.dumps(result)) + print(f"[simulate_work] '{label}' completed in {elapsed}s", file=sys.stderr) + + +if __name__ == "__main__": + main() diff --git a/actions/simulate_work.yaml b/actions/simulate_work.yaml new file mode 100644 index 0000000..8a221cf --- /dev/null +++ b/actions/simulate_work.yaml @@ -0,0 +1,76 @@ +# Simulate Work Action +# Simulates a unit of work with configurable duration and structured output. +# Useful for testing workflows, the timeline DAG visualizer, and execution monitoring. + +ref: python_example.simulate_work +label: "Simulate Work" +description: "Simulates a unit of work that takes a configurable amount of time, returning structured JSON with timing info. Supports simulated failures for testing error-handling paths." +enabled: true + +# Runner type determines how the action is executed +runner_type: python + +# Minimum Python version required (semver constraint) +runtime_version: ">=3.9" + +# Entry point is the Python script to execute +entry_point: simulate_work.py + +# Parameter delivery: stdin for secure parameter passing +parameter_delivery: stdin +parameter_format: json + +# Output format: json (structured data parsing enabled) +output_format: json + +# Action parameters schema (flat format with inline required/secret) +parameters: + duration_seconds: + type: number + description: "How long to simulate work in seconds (clamped to 0–300)" + default: 1.0 + label: + type: string + description: "A human-readable label for this work unit" + default: "work" + fail: + type: boolean + description: "Whether to simulate a failure (exits non-zero)" + default: false + fail_after: + type: number + description: "If failing, wait this many seconds before crashing (0 = immediate)" + default: 0 + output_data: + type: object + description: "Arbitrary JSON data to pass through to the result" + +# Output schema (flat format) +output_schema: + label: + type: string + description: "The label that was passed in" + required: true + duration_seconds: + type: number + description: "Actual elapsed wall-clock time in seconds" + required: true + requested_seconds: + type: number + description: "The requested duration" + required: true + output_data: + type: object + description: "The pass-through data (if provided)" + success: + type: boolean + description: "Always true on success (failures exit non-zero)" + required: true + +# Tags for categorization +tags: + - python + - example + - testing + - workflow + - simulation diff --git a/actions/timeline_demo.yaml b/actions/timeline_demo.yaml new file mode 100644 index 0000000..bb51c72 --- /dev/null +++ b/actions/timeline_demo.yaml @@ -0,0 +1,71 @@ +# Timeline Demo Workflow Action +# Action metadata for the timeline_demo workflow definition. +# +# The workflow graph (tasks, transitions, variables) lives in the separate +# workflow file referenced by `workflow_file`. This action YAML controls +# action-level concerns: ref, label, parameter schema, output schema, tags, +# and (in future) execution policies. +# +# Multiple actions can reference the same workflow file with different +# parameter schemas or policy configurations. + +ref: python_example.timeline_demo +label: "Timeline DAG Demo" +description: > + A comprehensive demo workflow that exercises every feature of the + Workflow Timeline DAG visualizer: parallel branches with different + durations, with_items fan-out, failure handling paths, publish + directives, retries, timeouts, and custom edge styling. +enabled: true + +# Reference to the workflow definition file (relative to actions/ directory) +workflow_file: workflows/timeline_demo.yaml + +# Action parameters schema (flat format with inline required/secret) +# These are the inputs exposed when executing this action. They map to +# the workflow's `parameters` block but are authoritative for the action. +parameters: + item_count: + type: integer + description: "Number of items to process in the with_items stage (2–20)" + default: 5 + build_duration: + type: number + description: "Duration of the simulated build step in seconds" + default: 6.0 + lint_duration: + type: number + description: "Duration of the simulated lint step in seconds" + default: 3.0 + scan_duration: + type: number + description: "Duration of the simulated security scan in seconds" + default: 4.0 + item_duration: + type: number + description: "Duration of each with_items work unit in seconds" + default: 2.0 + fail_validation: + type: boolean + description: "If true, the validate step will intentionally fail to exercise the error path" + default: false + +# Output schema (flat format) +output: + status: + type: string + description: "Final workflow status: 'success' or 'failed'" + items_processed: + type: integer + description: "Number of items that were processed" + total_duration: + type: number + description: "Wall-clock duration of the final task in seconds" + +# Tags for categorization +tags: + - demo + - timeline + - workflow + - visualization + - python diff --git a/actions/workflows/timeline_demo.yaml b/actions/workflows/timeline_demo.yaml new file mode 100644 index 0000000..e81fe96 --- /dev/null +++ b/actions/workflows/timeline_demo.yaml @@ -0,0 +1,293 @@ +# Timeline Demo Workflow +# Demonstrates various workflow features and exercises the Timeline DAG visualizer. +# +# This is an action-linked workflow file — action-level metadata (ref, label, +# description, parameters, output, tags) is defined in the companion action +# YAML at actions/timeline_demo.yaml. This file contains only the execution +# graph: version, vars, tasks, and output_map. +# +# Features exercised: +# - Sequential task chains +# - Parallel fan-out (3 branches) and fan-in (merge) +# - with_items expansion with concurrency limiting +# - Failure/error handling paths (succeeded/failed/timed_out transitions) +# - Variable-duration tasks for interesting Gantt chart shapes +# - Publish directives passing data between tasks +# - Custom transition labels and colors via __chart_meta__ +# - Retry configuration +# - Timeout handling +# +# Expected timeline shape (approximate): +# +# initialize ─┬─► build_artifacts ──────────────────────┐ +# ├─► run_linter ──────────┐ ├─► merge_results ─► process_items(×5) ─► validate ─┬─► finalize_success +# └─► security_scan ───────┘ │ └─► handle_failure ─► finalize_failure +# └────────────────┘ + +version: "1.0.0" + +vars: + build_result: null + lint_result: null + scan_result: null + merged_summary: null + items_processed: 0 + validation_passed: false + +tasks: + # ── Stage 1: Initialize ────────────────────────────────────────────── + - name: initialize + action: python_example.simulate_work + input: + duration_seconds: 1.0 + label: "initialize" + output_data: + item_count: "{{ parameters.item_count }}" + started: true + next: + - when: "{{ succeeded() }}" + publish: + - item_count: "{{ parameters.item_count }}" + do: + - build_artifacts + - run_linter + - security_scan + __chart_meta__: + label: "fan-out" + color: "#6366f1" + - when: "{{ failed() }}" + do: + - finalize_failure + __chart_meta__: + label: "init failed" + color: "#ef4444" + + # ── Stage 2a: Build artifacts (longest parallel branch) ────────────── + - name: build_artifacts + action: python_example.simulate_work + input: + duration_seconds: "{{ parameters.build_duration }}" + label: "build" + output_data: + artifact: "app-v1.2.3.tar.gz" + size_mb: 48 + retry: + count: 2 + delay: 1 + backoff: constant + next: + - when: "{{ succeeded() }}" + publish: + - build_result: "{{ result() }}" + do: + - merge_results + __chart_meta__: + label: "build ok" + color: "#22c55e" + - when: "{{ failed() }}" + do: + - handle_failure + __chart_meta__: + label: "build failed" + color: "#ef4444" + + # ── Stage 2b: Run linter (medium parallel branch) ──────────────────── + - name: run_linter + action: python_example.simulate_work + input: + duration_seconds: "{{ parameters.lint_duration }}" + label: "lint" + output_data: + warnings: 3 + errors: 0 + files_checked: 42 + next: + - when: "{{ succeeded() }}" + publish: + - lint_result: "{{ result() }}" + do: + - merge_results + __chart_meta__: + label: "lint clean" + color: "#22c55e" + - when: "{{ failed() }}" + do: + - handle_failure + __chart_meta__: + label: "lint errors" + color: "#ef4444" + + # ── Stage 2c: Security scan (short parallel branch) ────────────────── + - name: security_scan + action: python_example.simulate_work + input: + duration_seconds: "{{ parameters.scan_duration }}" + label: "security-scan" + output_data: + vulnerabilities: 0 + packages_scanned: 128 + timeout: 30 + next: + - when: "{{ succeeded() }}" + publish: + - scan_result: "{{ result() }}" + do: + - merge_results + __chart_meta__: + label: "scan clear" + color: "#22c55e" + - when: "{{ failed() }}" + do: + - handle_failure + __chart_meta__: + label: "scan failed" + color: "#ef4444" + - when: "{{ timed_out() }}" + do: + - handle_failure + __chart_meta__: + label: "scan timed out" + color: "#f97316" + + # ── Stage 3: Merge results (join from 3 parallel branches) ────────── + - name: merge_results + action: python_example.simulate_work + join: 3 + input: + duration_seconds: 1.5 + label: "merge" + output_data: + build: "{{ workflow.build_result }}" + lint: "{{ workflow.lint_result }}" + scan: "{{ workflow.scan_result }}" + next: + - when: "{{ succeeded() }}" + publish: + - merged_summary: "{{ result() }}" + do: + - generate_item_list + __chart_meta__: + label: "merged" + color: "#6366f1" + + # ── Stage 4a: Generate the item list for with_items ────────────────── + - name: generate_item_list + action: python_example.list_numbers + input: + n: "{{ parameters.item_count }}" + start: 1 + next: + - when: "{{ succeeded() }}" + publish: + - number_list: "{{ result().data.items }}" + do: + - process_items + __chart_meta__: + label: "items ready" + color: "#6366f1" + + # ── Stage 4b: Process each item (with_items + concurrency) ────────── + - name: process_items + action: python_example.simulate_work + with_items: "{{ workflow.number_list }}" + concurrency: 3 + input: + duration_seconds: "{{ parameters.item_duration }}" + label: "item-{{ item }}" + output_data: + item_number: "{{ item }}" + index: "{{ index }}" + next: + - when: "{{ succeeded() }}" + publish: + - items_processed: "{{ parameters.item_count }}" + do: + - validate + __chart_meta__: + label: "all items done" + color: "#22c55e" + - when: "{{ failed() }}" + do: + - handle_failure + __chart_meta__: + label: "item failed" + color: "#ef4444" + + # ── Stage 5: Validate everything ───────────────────────────────────── + - name: validate + action: python_example.simulate_work + input: + duration_seconds: 2.0 + label: "validate" + fail: "{{ parameters.fail_validation }}" + fail_after: 1.0 + output_data: + checks_passed: 12 + checks_total: 12 + retry: + count: 1 + delay: 2 + backoff: constant + next: + - when: "{{ succeeded() }}" + publish: + - validation_passed: true + do: + - finalize_success + __chart_meta__: + label: "valid ✓" + color: "#22c55e" + - when: "{{ failed() }}" + publish: + - validation_passed: false + do: + - handle_failure + __chart_meta__: + label: "invalid ✗" + color: "#ef4444" + + # ── Terminal: Success ──────────────────────────────────────────────── + - name: finalize_success + action: python_example.simulate_work + input: + duration_seconds: 1.0 + label: "finalize-success" + output_data: + status: "success" + items_processed: "{{ workflow.items_processed }}" + validation: "{{ workflow.validation_passed }}" + + # ── Error path: Handle failure ─────────────────────────────────────── + - name: handle_failure + action: python_example.simulate_work + input: + duration_seconds: 1.5 + label: "handle-failure" + output_data: + status: "handling_error" + build: "{{ workflow.build_result }}" + lint: "{{ workflow.lint_result }}" + scan: "{{ workflow.scan_result }}" + next: + - when: "{{ succeeded() }}" + do: + - finalize_failure + __chart_meta__: + label: "error handled" + color: "#f97316" + + # ── Terminal: Failure ──────────────────────────────────────────────── + - name: finalize_failure + action: python_example.simulate_work + input: + duration_seconds: 0.5 + label: "finalize-failure" + output_data: + status: "failed" + items_processed: "{{ workflow.items_processed }}" + validation: "{{ workflow.validation_passed }}" + +output_map: + status: "{{ 'success' if workflow.validation_passed else 'failed' }}" + items_processed: "{{ workflow.items_processed }}" + total_duration: "{{ task.finalize_success.result.duration_seconds if workflow.validation_passed else task.finalize_failure.result.duration_seconds }}"