Compare commits
1 Commits
9414ee34e2
...
4df156f210
| Author | SHA1 | Date | |
|---|---|---|---|
| 4df156f210 |
55
README.md
55
README.md
@@ -21,8 +21,18 @@ This pack exercises as many parts of the Attune SDLC as possible:
|
||||
| Ref | Description |
|
||||
|-----|-------------|
|
||||
| `python_example.hello` | Returns `"Hello, Python"` — minimal action |
|
||||
| `python_example.http_example` | Uses `requests` to GET `https://example.com` |
|
||||
| `python_example.http_example` | Uses `urllib` to GET `https://example.com` |
|
||||
| `python_example.read_counter` | Consumes a counter value and returns a formatted message |
|
||||
| `python_example.list_numbers` | Returns a list of sequential integers as JSON |
|
||||
| `python_example.flaky_fail` | Randomly fails with configurable probability — useful for testing error handling and retry logic |
|
||||
| `python_example.simulate_work` | Simulates a unit of work with configurable duration, optional failure, and structured output — useful for testing workflows and the timeline visualizer |
|
||||
| `python_example.artifact_demo` | Creates file and progress artifacts via the Attune API, demonstrating the artifact system |
|
||||
|
||||
### Workflows
|
||||
|
||||
| Ref | Description |
|
||||
|-----|-------------|
|
||||
| `python_example.timeline_demo` | Comprehensive demo workflow exercising parallel fan-out/fan-in, `with_items` concurrency, failure paths, retries, timeouts, publish directives, and custom edge styling — designed to produce a rich Timeline DAG visualization |
|
||||
|
||||
### Triggers
|
||||
|
||||
@@ -188,6 +198,49 @@ attune action execute python_example.read_counter --param counter=99 --param rul
|
||||
# Output: {"message": "Counter value is 99 (from rule: test)", ...}
|
||||
```
|
||||
|
||||
### Test the simulate_work action
|
||||
|
||||
```bash
|
||||
attune action execute python_example.simulate_work \
|
||||
--param duration_seconds=2.0 --param label=demo
|
||||
# Output: {"label": "demo", "duration_seconds": 2.003, "requested_seconds": 2.0, "success": true}
|
||||
|
||||
# Test failure simulation:
|
||||
attune action execute python_example.simulate_work \
|
||||
--param fail=true --param label=crash-test
|
||||
# Exits non-zero with error on stderr
|
||||
```
|
||||
|
||||
### Run the Timeline Demo workflow
|
||||
|
||||
The `timeline_demo` workflow is designed to produce a visually rich Timeline DAG
|
||||
on the execution detail page. It exercises parallel branches, `with_items`
|
||||
expansion, failure handling, and custom transition styling.
|
||||
|
||||
```bash
|
||||
# Happy path (all tasks succeed, ~25s total):
|
||||
attune action execute python_example.timeline_demo
|
||||
|
||||
# With more items and faster durations:
|
||||
attune action execute python_example.timeline_demo \
|
||||
--param item_count=10 --param item_duration=1.0 --param build_duration=4.0
|
||||
|
||||
# Exercise the failure/error-handling path:
|
||||
attune action execute python_example.timeline_demo \
|
||||
--param fail_validation=true
|
||||
|
||||
# Then open the execution detail page in the Web UI to see the Timeline DAG.
|
||||
```
|
||||
|
||||
**What to look for in the Timeline DAG:**
|
||||
|
||||
- **Fan-out** from `initialize` into 3 parallel branches (`build_artifacts`, `run_linter`, `security_scan`) with different durations
|
||||
- **Fan-in** at `merge_results` with a `join: 3` barrier — the bar starts only after the slowest branch completes
|
||||
- **`with_items` expansion** at `process_items` — each item appears as a separate child execution bar, with `concurrency: 3` controlling how many run simultaneously
|
||||
- **Custom edge colors**: indigo for fan-out/merge, green for success, red for failure, orange for timeout/error-handled paths
|
||||
- **Custom edge labels**: "fan-out", "build ok", "lint clean", "scan clear", "valid ✓", "invalid ✗", etc.
|
||||
- **Failure path** (when `fail_validation=true`): the DAG shows the red edge from `validate` → `handle_failure` → `finalize_failure`
|
||||
|
||||
### Enable the rule to start the counter sensor loop
|
||||
|
||||
```bash
|
||||
|
||||
@@ -4,12 +4,35 @@ Artifact Demo Action - Python Example Pack
|
||||
|
||||
Demonstrates creating file and progress artifacts via the Attune API.
|
||||
Each iteration:
|
||||
1. Appends a timestamped log line to a file artifact (via version upload)
|
||||
2. Updates a progress artifact by 2%
|
||||
1. Appends a line to an in-memory log
|
||||
2. Updates a progress artifact
|
||||
3. Sleeps for 0.5 seconds
|
||||
|
||||
The action authenticates to the API using the provided credentials (or defaults)
|
||||
and uses ATTUNE_API_URL / ATTUNE_EXEC_ID environment variables set by the worker.
|
||||
After all iterations complete, the full log is written directly to the
|
||||
shared artifact volume as a **single version** of a file artifact with a
|
||||
stable ref (`python_example.artifact_demo.log`).
|
||||
|
||||
File-based artifact flow (single API call):
|
||||
1. POST /api/v1/artifacts/ref/{ref}/versions/file — upserts the artifact
|
||||
(creating it if it doesn't exist) and allocates a version number,
|
||||
returning a relative `file_path`
|
||||
2. Write the file to $ATTUNE_ARTIFACTS_DIR/{file_path} on the shared volume
|
||||
|
||||
No HTTP upload is needed — the worker and action process share an artifact
|
||||
volume, so the action writes directly to disk.
|
||||
|
||||
The progress artifact is still per-execution (ephemeral status indicator).
|
||||
|
||||
Parameters:
|
||||
iterations - Number of iterations to run (default: 50)
|
||||
visibility - Artifact visibility level for file artifacts: "public" or "private"
|
||||
(default: "private"). Public artifacts are viewable by all authenticated
|
||||
users on the platform. Private artifacts are restricted based on their
|
||||
scope/owner fields.
|
||||
|
||||
Note: Progress artifacts always use the server default visibility (public), since
|
||||
they are informational status indicators that anyone watching an execution should
|
||||
be able to see. The visibility parameter only controls file artifacts.
|
||||
"""
|
||||
|
||||
import json
|
||||
@@ -49,67 +72,51 @@ def api_request(
|
||||
raise
|
||||
|
||||
|
||||
def multipart_upload(
|
||||
base_url, path, file_bytes, filename, token, content_type="text/plain"
|
||||
):
|
||||
"""Upload a file via multipart/form-data."""
|
||||
boundary = f"----AttuneArtifact{int(time.time() * 1000)}"
|
||||
body_parts = []
|
||||
|
||||
# file field
|
||||
body_parts.append(f"--{boundary}\r\n".encode())
|
||||
body_parts.append(
|
||||
f'Content-Disposition: form-data; name="file"; filename="{filename}"\r\n'.encode()
|
||||
)
|
||||
body_parts.append(f"Content-Type: {content_type}\r\n\r\n".encode())
|
||||
body_parts.append(file_bytes)
|
||||
body_parts.append(b"\r\n")
|
||||
|
||||
# content_type field
|
||||
body_parts.append(f"--{boundary}\r\n".encode())
|
||||
body_parts.append(b'Content-Disposition: form-data; name="content_type"\r\n\r\n')
|
||||
body_parts.append(content_type.encode())
|
||||
body_parts.append(b"\r\n")
|
||||
|
||||
# created_by field
|
||||
body_parts.append(f"--{boundary}\r\n".encode())
|
||||
body_parts.append(b'Content-Disposition: form-data; name="created_by"\r\n\r\n')
|
||||
body_parts.append(b"python_example.artifact_demo")
|
||||
body_parts.append(b"\r\n")
|
||||
|
||||
body_parts.append(f"--{boundary}--\r\n".encode())
|
||||
|
||||
full_body = b"".join(body_parts)
|
||||
|
||||
url = f"{base_url}{path}"
|
||||
headers = {
|
||||
"Authorization": f"Bearer {token}",
|
||||
"Content-Type": f"multipart/form-data; boundary={boundary}",
|
||||
"Accept": "application/json",
|
||||
}
|
||||
|
||||
req = urllib.request.Request(url, data=full_body, headers=headers, method="POST")
|
||||
try:
|
||||
with urllib.request.urlopen(req, timeout=30) as resp:
|
||||
return json.loads(resp.read().decode("utf-8")), resp.status
|
||||
except urllib.error.HTTPError as e:
|
||||
error_body = e.read().decode("utf-8", errors="replace")
|
||||
print(f"Upload error {e.code} on {path}: {error_body}", file=sys.stderr)
|
||||
raise
|
||||
|
||||
|
||||
def login(base_url, username, password):
|
||||
"""Authenticate and return a JWT token."""
|
||||
data, _ = api_request(
|
||||
def allocate_file_version_by_ref(
|
||||
base_url,
|
||||
"/auth/login",
|
||||
token,
|
||||
artifact_ref,
|
||||
execution_id=None,
|
||||
visibility=None,
|
||||
content_type="text/plain",
|
||||
name=None,
|
||||
description=None,
|
||||
):
|
||||
"""Upsert an artifact by ref and allocate a file-backed version.
|
||||
|
||||
Single API call that creates the artifact if it doesn't exist (or
|
||||
reuses the existing one) and allocates a new version with a file_path.
|
||||
|
||||
Returns:
|
||||
(artifact_id, version_id, file_path) tuple
|
||||
"""
|
||||
payload = {
|
||||
"scope": "action",
|
||||
"owner": "python_example.artifact_demo",
|
||||
"type": "file_text",
|
||||
"retention_policy": "versions",
|
||||
"retention_limit": 10,
|
||||
"content_type": content_type,
|
||||
"created_by": "python_example.artifact_demo",
|
||||
}
|
||||
if execution_id is not None:
|
||||
payload["execution"] = execution_id
|
||||
if visibility is not None:
|
||||
payload["visibility"] = visibility
|
||||
if name is not None:
|
||||
payload["name"] = name
|
||||
if description is not None:
|
||||
payload["description"] = description
|
||||
|
||||
resp, _ = api_request(
|
||||
base_url,
|
||||
f"/api/v1/artifacts/ref/{artifact_ref}/versions/file",
|
||||
method="POST",
|
||||
data={
|
||||
"username": username,
|
||||
"password": password,
|
||||
},
|
||||
data=payload,
|
||||
token=token,
|
||||
)
|
||||
return data["data"]["access_token"]
|
||||
version_data = resp["data"]
|
||||
return version_data["artifact"], version_data["id"], version_data["file_path"]
|
||||
|
||||
|
||||
def create_artifact(
|
||||
@@ -119,7 +126,7 @@ def create_artifact(
|
||||
name,
|
||||
artifact_type,
|
||||
execution_id,
|
||||
content_type=None,
|
||||
visibility=None,
|
||||
description=None,
|
||||
data=None,
|
||||
):
|
||||
@@ -134,9 +141,9 @@ def create_artifact(
|
||||
"name": name,
|
||||
"execution": execution_id,
|
||||
}
|
||||
if content_type:
|
||||
payload["content_type"] = content_type
|
||||
if description:
|
||||
if visibility is not None:
|
||||
payload["visibility"] = visibility
|
||||
if description is not None:
|
||||
payload["description"] = description
|
||||
if data is not None:
|
||||
payload["data"] = data
|
||||
@@ -165,50 +172,73 @@ def main():
|
||||
# Read parameters from stdin (JSON format)
|
||||
params = json.loads(sys.stdin.readline())
|
||||
iterations = params.get("iterations", 50)
|
||||
username = params.get("username") or os.environ.get(
|
||||
"ATTUNE_USERNAME", "test@attune.local"
|
||||
)
|
||||
password = params.get("password") or os.environ.get(
|
||||
"ATTUNE_PASSWORD", "TestPass123!"
|
||||
visibility = params.get("visibility", "private")
|
||||
|
||||
# Validate visibility value
|
||||
if visibility not in ("public", "private"):
|
||||
raise ValueError(
|
||||
f"Invalid visibility '{visibility}': must be 'public' or 'private'"
|
||||
)
|
||||
|
||||
# Get execution context from environment
|
||||
api_url = os.environ.get("ATTUNE_API_URL", "http://localhost:8080")
|
||||
api_url = os.environ.get("ATTUNE_API_URL", "")
|
||||
token = os.environ.get("ATTUNE_API_TOKEN", "")
|
||||
exec_id_str = os.environ.get("ATTUNE_EXEC_ID", "")
|
||||
artifacts_dir = os.environ.get("ATTUNE_ARTIFACTS_DIR", "")
|
||||
execution_id = int(exec_id_str) if exec_id_str else None
|
||||
|
||||
if not api_url:
|
||||
raise RuntimeError(
|
||||
"ATTUNE_API_URL environment variable is not set. "
|
||||
"This action must be run by the Attune worker."
|
||||
)
|
||||
if not token:
|
||||
raise RuntimeError(
|
||||
"ATTUNE_API_TOKEN environment variable is not set. "
|
||||
"This action must be run by the Attune worker."
|
||||
)
|
||||
if not artifacts_dir:
|
||||
raise RuntimeError(
|
||||
"ATTUNE_ARTIFACTS_DIR environment variable is not set. "
|
||||
"This action must be run by the Attune worker."
|
||||
)
|
||||
|
||||
print(
|
||||
f"Artifact demo starting: {iterations} iterations, API at {api_url}",
|
||||
f"Artifact demo starting: {iterations} iterations, "
|
||||
f"visibility={visibility}, API at {api_url}, "
|
||||
f"artifacts_dir={artifacts_dir}",
|
||||
file=sys.stderr,
|
||||
)
|
||||
|
||||
# Authenticate
|
||||
token = login(api_url, username, password)
|
||||
print("Authenticated successfully", file=sys.stderr)
|
||||
# ----------------------------------------------------------------
|
||||
# File artifact — single call upserts artifact + allocates version
|
||||
# ----------------------------------------------------------------
|
||||
file_ref = "python_example.artifact_demo.log"
|
||||
|
||||
# Build unique artifact refs using execution ID to avoid collisions
|
||||
ts = int(time.time())
|
||||
ref_suffix = f"{execution_id}_{ts}" if execution_id else str(ts)
|
||||
file_ref = f"python_example.artifact_demo.log.{ref_suffix}"
|
||||
progress_ref = f"python_example.artifact_demo.progress.{ref_suffix}"
|
||||
|
||||
# Create file artifact (file_text type)
|
||||
file_artifact_id = create_artifact(
|
||||
file_artifact_id, version_id, file_path = allocate_file_version_by_ref(
|
||||
base_url=api_url,
|
||||
token=token,
|
||||
ref=file_ref,
|
||||
name="Artifact Demo Log",
|
||||
artifact_type="file_text",
|
||||
artifact_ref=file_ref,
|
||||
execution_id=execution_id,
|
||||
visibility=visibility,
|
||||
content_type="text/plain",
|
||||
description=f"Log output from artifact demo ({iterations} iterations)",
|
||||
name="Demo Log",
|
||||
description="Log output from the artifact demo action",
|
||||
)
|
||||
full_file_path = os.path.join(artifacts_dir, file_path)
|
||||
print(
|
||||
f"Created file artifact ID={file_artifact_id} ref={file_ref}",
|
||||
f"Allocated file artifact ref={file_ref} id={file_artifact_id} "
|
||||
f"version={version_id} path={file_path}",
|
||||
file=sys.stderr,
|
||||
)
|
||||
|
||||
# Create progress artifact
|
||||
# ----------------------------------------------------------------
|
||||
# Progress artifact — per-execution (ephemeral status indicator)
|
||||
# ----------------------------------------------------------------
|
||||
ts = int(time.time())
|
||||
ref_suffix = f"{execution_id}_{ts}" if execution_id else str(ts)
|
||||
progress_ref = f"python_example.artifact_demo.progress.{ref_suffix}"
|
||||
|
||||
progress_artifact_id = create_artifact(
|
||||
base_url=api_url,
|
||||
token=token,
|
||||
@@ -220,11 +250,14 @@ def main():
|
||||
data=[], # Initialize with empty array
|
||||
)
|
||||
print(
|
||||
f"Created progress artifact ID={progress_artifact_id} ref={progress_ref}",
|
||||
f"Created progress artifact ID={progress_artifact_id} ref={progress_ref} "
|
||||
f"visibility=public (server default)",
|
||||
file=sys.stderr,
|
||||
)
|
||||
|
||||
# Run iterations
|
||||
# ----------------------------------------------------------------
|
||||
# Run iterations — collect log lines, write file at the end
|
||||
# ----------------------------------------------------------------
|
||||
log_lines = []
|
||||
for i in range(iterations):
|
||||
iteration = i + 1
|
||||
@@ -237,17 +270,6 @@ def main():
|
||||
|
||||
print(f" Iteration {iteration}/{iterations} ({pct}%)", file=sys.stderr)
|
||||
|
||||
# Upload the full log as a new file version
|
||||
full_log = "\n".join(log_lines) + "\n"
|
||||
multipart_upload(
|
||||
base_url=api_url,
|
||||
path=f"/api/v1/artifacts/{file_artifact_id}/versions/upload",
|
||||
file_bytes=full_log.encode("utf-8"),
|
||||
filename="artifact_demo.log",
|
||||
token=token,
|
||||
content_type="text/plain",
|
||||
)
|
||||
|
||||
# Append progress entry
|
||||
append_progress(
|
||||
api_url,
|
||||
@@ -266,11 +288,26 @@ def main():
|
||||
if iteration < iterations:
|
||||
time.sleep(0.5)
|
||||
|
||||
# ----------------------------------------------------------------
|
||||
# Write the complete log file to the shared artifact volume
|
||||
# ----------------------------------------------------------------
|
||||
full_log = "\n".join(log_lines) + "\n"
|
||||
with open(full_file_path, "w", encoding="utf-8") as f:
|
||||
f.write(full_log)
|
||||
print(
|
||||
f"Wrote {len(full_log)} bytes to {full_file_path}",
|
||||
file=sys.stderr,
|
||||
)
|
||||
|
||||
elapsed = round(time.time() - start_time, 3)
|
||||
result = {
|
||||
"file_artifact_id": file_artifact_id,
|
||||
"file_artifact_ref": file_ref,
|
||||
"file_version_id": version_id,
|
||||
"file_path": file_path,
|
||||
"progress_artifact_id": progress_artifact_id,
|
||||
"iterations_completed": iterations,
|
||||
"visibility": visibility,
|
||||
"elapsed_seconds": elapsed,
|
||||
"success": True,
|
||||
}
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
|
||||
ref: python_example.artifact_demo
|
||||
label: "Artifact Demo"
|
||||
description: "Creates a file artifact and a progress artifact, writing lines and updating progress over multiple iterations"
|
||||
description: "Creates a file artifact (written directly to the shared volume) and a progress artifact, updating progress over multiple iterations"
|
||||
enabled: true
|
||||
|
||||
# Runner type determines how the action is executed
|
||||
@@ -26,34 +26,48 @@ output_format: json
|
||||
parameters:
|
||||
iterations:
|
||||
type: integer
|
||||
description: "Number of iterations to run (each adds a log line and 2% progress)"
|
||||
description: "Number of iterations to run (each adds a log line and updates progress)"
|
||||
default: 50
|
||||
minimum: 1
|
||||
maximum: 200
|
||||
username:
|
||||
visibility:
|
||||
type: string
|
||||
description: "API username for authentication (defaults to ATTUNE_USERNAME env var or test@attune.local)"
|
||||
default: "test@attune.local"
|
||||
password:
|
||||
type: string
|
||||
description: "API password for authentication (defaults to ATTUNE_PASSWORD env var or TestPass123!)"
|
||||
secret: true
|
||||
default: "TestPass123!"
|
||||
description: "Artifact visibility level: public (all authenticated users) or private (scope/owner restricted)"
|
||||
default: "private"
|
||||
enum:
|
||||
- "public"
|
||||
- "private"
|
||||
|
||||
# Output schema (flat format)
|
||||
output_schema:
|
||||
file_artifact_id:
|
||||
type: integer
|
||||
description: "ID of the created file artifact"
|
||||
description: "ID of the file artifact"
|
||||
required: true
|
||||
file_artifact_ref:
|
||||
type: string
|
||||
description: "Stable ref of the file artifact"
|
||||
required: true
|
||||
file_version_id:
|
||||
type: integer
|
||||
description: "ID of the file artifact version created for this execution"
|
||||
required: true
|
||||
file_path:
|
||||
type: string
|
||||
description: "Relative path of the file on the shared artifact volume"
|
||||
required: true
|
||||
progress_artifact_id:
|
||||
type: integer
|
||||
description: "ID of the created progress artifact"
|
||||
description: "ID of the progress artifact"
|
||||
required: true
|
||||
iterations_completed:
|
||||
type: integer
|
||||
description: "Number of iterations completed"
|
||||
required: true
|
||||
visibility:
|
||||
type: string
|
||||
description: "Visibility level that was applied to the file artifact"
|
||||
required: true
|
||||
success:
|
||||
type: boolean
|
||||
description: "Whether the demo completed successfully"
|
||||
|
||||
52
actions/flaky_fail.py
Normal file
52
actions/flaky_fail.py
Normal file
@@ -0,0 +1,52 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Flaky Fail Action - Python Example Pack
|
||||
|
||||
A Python action that randomly fails with a configurable probability.
|
||||
Useful for testing error handling, retry logic, and workflow failure paths.
|
||||
|
||||
Actions receive parameters as JSON on stdin and write results to stdout.
|
||||
"""
|
||||
|
||||
import json
|
||||
import random
|
||||
import sys
|
||||
|
||||
|
||||
def main():
|
||||
# Read parameters from stdin (JSON format)
|
||||
params = json.loads(sys.stdin.readline())
|
||||
failure_probability = float(params.get("failure_probability", 0.1))
|
||||
|
||||
# Clamp to valid range
|
||||
failure_probability = max(0.0, min(1.0, failure_probability))
|
||||
|
||||
roll = random.random()
|
||||
failed = roll < failure_probability
|
||||
|
||||
if failed:
|
||||
print(
|
||||
json.dumps(
|
||||
{
|
||||
"error": "Random failure triggered",
|
||||
"failure_probability": failure_probability,
|
||||
"roll": round(roll, 6),
|
||||
}
|
||||
),
|
||||
file=sys.stderr,
|
||||
)
|
||||
sys.exit(1)
|
||||
|
||||
print(
|
||||
json.dumps(
|
||||
{
|
||||
"message": "Success! Did not fail this time.",
|
||||
"failure_probability": failure_probability,
|
||||
"roll": round(roll, 6),
|
||||
}
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
52
actions/flaky_fail.yaml
Normal file
52
actions/flaky_fail.yaml
Normal file
@@ -0,0 +1,52 @@
|
||||
# Flaky Fail Action
|
||||
# Randomly fails with a configurable probability, useful for testing error handling and retry logic
|
||||
|
||||
ref: python_example.flaky_fail
|
||||
label: "Flaky Fail"
|
||||
description: "An action that randomly fails with a configurable probability. Useful for testing error handling, retry logic, and workflow failure paths."
|
||||
enabled: true
|
||||
|
||||
# Runner type determines how the action is executed
|
||||
runner_type: python
|
||||
|
||||
# Minimum Python version required (semver constraint)
|
||||
runtime_version: ">=3.9"
|
||||
|
||||
# Entry point is the Python script to execute
|
||||
entry_point: flaky_fail.py
|
||||
|
||||
# Parameter delivery: stdin for secure parameter passing
|
||||
parameter_delivery: stdin
|
||||
parameter_format: json
|
||||
|
||||
# Output format: json (structured data parsing enabled)
|
||||
output_format: json
|
||||
|
||||
# Action parameters schema (flat format with inline required/secret)
|
||||
parameters:
|
||||
failure_probability:
|
||||
type: number
|
||||
description: "Probability of failure between 0.0 (never fail) and 1.0 (always fail)"
|
||||
default: 0.1
|
||||
|
||||
# Output schema (flat format)
|
||||
output_schema:
|
||||
succeeded:
|
||||
type: boolean
|
||||
description: "Whether the action succeeded (always true when it doesn't fail)"
|
||||
required: true
|
||||
roll:
|
||||
type: number
|
||||
description: "The random value that was rolled (0.0 to 1.0)"
|
||||
required: true
|
||||
threshold:
|
||||
type: number
|
||||
description: "The failure probability threshold that was used"
|
||||
required: true
|
||||
|
||||
# Tags for categorization
|
||||
tags:
|
||||
- python
|
||||
- example
|
||||
- testing
|
||||
- error-handling
|
||||
105
actions/simulate_work.py
Normal file
105
actions/simulate_work.py
Normal file
@@ -0,0 +1,105 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Simulate Work Action - Python Example Pack
|
||||
|
||||
Simulates a unit of work that takes a configurable amount of time.
|
||||
Returns structured JSON output with timing information and an optional
|
||||
payload. Useful for testing workflows, the timeline DAG visualizer,
|
||||
and execution monitoring.
|
||||
|
||||
Parameters (via stdin JSON):
|
||||
duration_seconds - How long to simulate work (float, default: 1.0)
|
||||
label - A label for this work unit (string, default: "work")
|
||||
fail - Whether to simulate a failure (bool, default: false)
|
||||
fail_after - If failing, fail after this many seconds (float, default: 0)
|
||||
output_data - Arbitrary JSON data to include in the result (default: null)
|
||||
|
||||
Output (JSON):
|
||||
label - The label that was passed in
|
||||
duration_seconds - Actual elapsed time
|
||||
requested_seconds - The requested duration
|
||||
output_data - The pass-through data (if any)
|
||||
success - Always true on success (failures exit non-zero)
|
||||
"""
|
||||
|
||||
import json
|
||||
import sys
|
||||
import time
|
||||
|
||||
|
||||
def main():
|
||||
params = json.loads(sys.stdin.readline())
|
||||
|
||||
duration_seconds = float(params.get("duration_seconds", 1.0))
|
||||
label = params.get("label", "work")
|
||||
fail = params.get("fail", False)
|
||||
fail_after = float(params.get("fail_after", 0))
|
||||
output_data = params.get("output_data", None)
|
||||
|
||||
# Clamp duration to a reasonable range
|
||||
duration_seconds = max(0.0, min(duration_seconds, 300.0))
|
||||
|
||||
print(
|
||||
f"[simulate_work] Starting '{label}' for {duration_seconds}s", file=sys.stderr
|
||||
)
|
||||
|
||||
start = time.time()
|
||||
|
||||
if fail and fail_after > 0:
|
||||
# Sleep for fail_after seconds then crash
|
||||
time.sleep(min(fail_after, duration_seconds))
|
||||
elapsed = round(time.time() - start, 3)
|
||||
print(
|
||||
json.dumps(
|
||||
{
|
||||
"error": f"Simulated failure in '{label}' after {elapsed}s",
|
||||
"label": label,
|
||||
"elapsed": elapsed,
|
||||
}
|
||||
),
|
||||
file=sys.stderr,
|
||||
)
|
||||
sys.exit(1)
|
||||
|
||||
if fail:
|
||||
# Immediate failure
|
||||
print(
|
||||
json.dumps(
|
||||
{
|
||||
"error": f"Simulated immediate failure in '{label}'",
|
||||
"label": label,
|
||||
}
|
||||
),
|
||||
file=sys.stderr,
|
||||
)
|
||||
sys.exit(1)
|
||||
|
||||
# Simulate work with periodic progress to stderr
|
||||
remaining = duration_seconds
|
||||
while remaining > 0:
|
||||
chunk = min(remaining, 1.0)
|
||||
time.sleep(chunk)
|
||||
remaining -= chunk
|
||||
elapsed = round(time.time() - start, 3)
|
||||
pct = min(100.0, round((elapsed / max(duration_seconds, 0.001)) * 100, 1))
|
||||
print(
|
||||
f"[simulate_work] '{label}' progress: {pct}% ({elapsed}s)", file=sys.stderr
|
||||
)
|
||||
|
||||
elapsed = round(time.time() - start, 3)
|
||||
|
||||
result = {
|
||||
"label": label,
|
||||
"duration_seconds": elapsed,
|
||||
"requested_seconds": duration_seconds,
|
||||
"success": True,
|
||||
}
|
||||
if output_data is not None:
|
||||
result["output_data"] = output_data
|
||||
|
||||
print(json.dumps(result))
|
||||
print(f"[simulate_work] '{label}' completed in {elapsed}s", file=sys.stderr)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
76
actions/simulate_work.yaml
Normal file
76
actions/simulate_work.yaml
Normal file
@@ -0,0 +1,76 @@
|
||||
# Simulate Work Action
|
||||
# Simulates a unit of work with configurable duration and structured output.
|
||||
# Useful for testing workflows, the timeline DAG visualizer, and execution monitoring.
|
||||
|
||||
ref: python_example.simulate_work
|
||||
label: "Simulate Work"
|
||||
description: "Simulates a unit of work that takes a configurable amount of time, returning structured JSON with timing info. Supports simulated failures for testing error-handling paths."
|
||||
enabled: true
|
||||
|
||||
# Runner type determines how the action is executed
|
||||
runner_type: python
|
||||
|
||||
# Minimum Python version required (semver constraint)
|
||||
runtime_version: ">=3.9"
|
||||
|
||||
# Entry point is the Python script to execute
|
||||
entry_point: simulate_work.py
|
||||
|
||||
# Parameter delivery: stdin for secure parameter passing
|
||||
parameter_delivery: stdin
|
||||
parameter_format: json
|
||||
|
||||
# Output format: json (structured data parsing enabled)
|
||||
output_format: json
|
||||
|
||||
# Action parameters schema (flat format with inline required/secret)
|
||||
parameters:
|
||||
duration_seconds:
|
||||
type: number
|
||||
description: "How long to simulate work in seconds (clamped to 0–300)"
|
||||
default: 1.0
|
||||
label:
|
||||
type: string
|
||||
description: "A human-readable label for this work unit"
|
||||
default: "work"
|
||||
fail:
|
||||
type: boolean
|
||||
description: "Whether to simulate a failure (exits non-zero)"
|
||||
default: false
|
||||
fail_after:
|
||||
type: number
|
||||
description: "If failing, wait this many seconds before crashing (0 = immediate)"
|
||||
default: 0
|
||||
output_data:
|
||||
type: object
|
||||
description: "Arbitrary JSON data to pass through to the result"
|
||||
|
||||
# Output schema (flat format)
|
||||
output_schema:
|
||||
label:
|
||||
type: string
|
||||
description: "The label that was passed in"
|
||||
required: true
|
||||
duration_seconds:
|
||||
type: number
|
||||
description: "Actual elapsed wall-clock time in seconds"
|
||||
required: true
|
||||
requested_seconds:
|
||||
type: number
|
||||
description: "The requested duration"
|
||||
required: true
|
||||
output_data:
|
||||
type: object
|
||||
description: "The pass-through data (if provided)"
|
||||
success:
|
||||
type: boolean
|
||||
description: "Always true on success (failures exit non-zero)"
|
||||
required: true
|
||||
|
||||
# Tags for categorization
|
||||
tags:
|
||||
- python
|
||||
- example
|
||||
- testing
|
||||
- workflow
|
||||
- simulation
|
||||
71
actions/timeline_demo.yaml
Normal file
71
actions/timeline_demo.yaml
Normal file
@@ -0,0 +1,71 @@
|
||||
# Timeline Demo Workflow Action
|
||||
# Action metadata for the timeline_demo workflow definition.
|
||||
#
|
||||
# The workflow graph (tasks, transitions, variables) lives in the separate
|
||||
# workflow file referenced by `workflow_file`. This action YAML controls
|
||||
# action-level concerns: ref, label, parameter schema, output schema, tags,
|
||||
# and (in future) execution policies.
|
||||
#
|
||||
# Multiple actions can reference the same workflow file with different
|
||||
# parameter schemas or policy configurations.
|
||||
|
||||
ref: python_example.timeline_demo
|
||||
label: "Timeline DAG Demo"
|
||||
description: >
|
||||
A comprehensive demo workflow that exercises every feature of the
|
||||
Workflow Timeline DAG visualizer: parallel branches with different
|
||||
durations, with_items fan-out, failure handling paths, publish
|
||||
directives, retries, timeouts, and custom edge styling.
|
||||
enabled: true
|
||||
|
||||
# Reference to the workflow definition file (relative to actions/ directory)
|
||||
workflow_file: workflows/timeline_demo.yaml
|
||||
|
||||
# Action parameters schema (flat format with inline required/secret)
|
||||
# These are the inputs exposed when executing this action. They map to
|
||||
# the workflow's `parameters` block but are authoritative for the action.
|
||||
parameters:
|
||||
item_count:
|
||||
type: integer
|
||||
description: "Number of items to process in the with_items stage (2–20)"
|
||||
default: 5
|
||||
build_duration:
|
||||
type: number
|
||||
description: "Duration of the simulated build step in seconds"
|
||||
default: 6.0
|
||||
lint_duration:
|
||||
type: number
|
||||
description: "Duration of the simulated lint step in seconds"
|
||||
default: 3.0
|
||||
scan_duration:
|
||||
type: number
|
||||
description: "Duration of the simulated security scan in seconds"
|
||||
default: 4.0
|
||||
item_duration:
|
||||
type: number
|
||||
description: "Duration of each with_items work unit in seconds"
|
||||
default: 2.0
|
||||
fail_validation:
|
||||
type: boolean
|
||||
description: "If true, the validate step will intentionally fail to exercise the error path"
|
||||
default: false
|
||||
|
||||
# Output schema (flat format)
|
||||
output:
|
||||
status:
|
||||
type: string
|
||||
description: "Final workflow status: 'success' or 'failed'"
|
||||
items_processed:
|
||||
type: integer
|
||||
description: "Number of items that were processed"
|
||||
total_duration:
|
||||
type: number
|
||||
description: "Wall-clock duration of the final task in seconds"
|
||||
|
||||
# Tags for categorization
|
||||
tags:
|
||||
- demo
|
||||
- timeline
|
||||
- workflow
|
||||
- visualization
|
||||
- python
|
||||
293
actions/workflows/timeline_demo.yaml
Normal file
293
actions/workflows/timeline_demo.yaml
Normal file
@@ -0,0 +1,293 @@
|
||||
# Timeline Demo Workflow
|
||||
# Demonstrates various workflow features and exercises the Timeline DAG visualizer.
|
||||
#
|
||||
# This is an action-linked workflow file — action-level metadata (ref, label,
|
||||
# description, parameters, output, tags) is defined in the companion action
|
||||
# YAML at actions/timeline_demo.yaml. This file contains only the execution
|
||||
# graph: version, vars, tasks, and output_map.
|
||||
#
|
||||
# Features exercised:
|
||||
# - Sequential task chains
|
||||
# - Parallel fan-out (3 branches) and fan-in (merge)
|
||||
# - with_items expansion with concurrency limiting
|
||||
# - Failure/error handling paths (succeeded/failed/timed_out transitions)
|
||||
# - Variable-duration tasks for interesting Gantt chart shapes
|
||||
# - Publish directives passing data between tasks
|
||||
# - Custom transition labels and colors via __chart_meta__
|
||||
# - Retry configuration
|
||||
# - Timeout handling
|
||||
#
|
||||
# Expected timeline shape (approximate):
|
||||
#
|
||||
# initialize ─┬─► build_artifacts ──────────────────────┐
|
||||
# ├─► run_linter ──────────┐ ├─► merge_results ─► process_items(×5) ─► validate ─┬─► finalize_success
|
||||
# └─► security_scan ───────┘ │ └─► handle_failure ─► finalize_failure
|
||||
# └────────────────┘
|
||||
|
||||
version: "1.0.0"
|
||||
|
||||
vars:
|
||||
build_result: null
|
||||
lint_result: null
|
||||
scan_result: null
|
||||
merged_summary: null
|
||||
items_processed: 0
|
||||
validation_passed: false
|
||||
|
||||
tasks:
|
||||
# ── Stage 1: Initialize ──────────────────────────────────────────────
|
||||
- name: initialize
|
||||
action: python_example.simulate_work
|
||||
input:
|
||||
duration_seconds: 1.0
|
||||
label: "initialize"
|
||||
output_data:
|
||||
item_count: "{{ parameters.item_count }}"
|
||||
started: true
|
||||
next:
|
||||
- when: "{{ succeeded() }}"
|
||||
publish:
|
||||
- item_count: "{{ parameters.item_count }}"
|
||||
do:
|
||||
- build_artifacts
|
||||
- run_linter
|
||||
- security_scan
|
||||
__chart_meta__:
|
||||
label: "fan-out"
|
||||
color: "#6366f1"
|
||||
- when: "{{ failed() }}"
|
||||
do:
|
||||
- finalize_failure
|
||||
__chart_meta__:
|
||||
label: "init failed"
|
||||
color: "#ef4444"
|
||||
|
||||
# ── Stage 2a: Build artifacts (longest parallel branch) ──────────────
|
||||
- name: build_artifacts
|
||||
action: python_example.simulate_work
|
||||
input:
|
||||
duration_seconds: "{{ parameters.build_duration }}"
|
||||
label: "build"
|
||||
output_data:
|
||||
artifact: "app-v1.2.3.tar.gz"
|
||||
size_mb: 48
|
||||
retry:
|
||||
count: 2
|
||||
delay: 1
|
||||
backoff: constant
|
||||
next:
|
||||
- when: "{{ succeeded() }}"
|
||||
publish:
|
||||
- build_result: "{{ result() }}"
|
||||
do:
|
||||
- merge_results
|
||||
__chart_meta__:
|
||||
label: "build ok"
|
||||
color: "#22c55e"
|
||||
- when: "{{ failed() }}"
|
||||
do:
|
||||
- handle_failure
|
||||
__chart_meta__:
|
||||
label: "build failed"
|
||||
color: "#ef4444"
|
||||
|
||||
# ── Stage 2b: Run linter (medium parallel branch) ────────────────────
|
||||
- name: run_linter
|
||||
action: python_example.simulate_work
|
||||
input:
|
||||
duration_seconds: "{{ parameters.lint_duration }}"
|
||||
label: "lint"
|
||||
output_data:
|
||||
warnings: 3
|
||||
errors: 0
|
||||
files_checked: 42
|
||||
next:
|
||||
- when: "{{ succeeded() }}"
|
||||
publish:
|
||||
- lint_result: "{{ result() }}"
|
||||
do:
|
||||
- merge_results
|
||||
__chart_meta__:
|
||||
label: "lint clean"
|
||||
color: "#22c55e"
|
||||
- when: "{{ failed() }}"
|
||||
do:
|
||||
- handle_failure
|
||||
__chart_meta__:
|
||||
label: "lint errors"
|
||||
color: "#ef4444"
|
||||
|
||||
# ── Stage 2c: Security scan (short parallel branch) ──────────────────
|
||||
- name: security_scan
|
||||
action: python_example.simulate_work
|
||||
input:
|
||||
duration_seconds: "{{ parameters.scan_duration }}"
|
||||
label: "security-scan"
|
||||
output_data:
|
||||
vulnerabilities: 0
|
||||
packages_scanned: 128
|
||||
timeout: 30
|
||||
next:
|
||||
- when: "{{ succeeded() }}"
|
||||
publish:
|
||||
- scan_result: "{{ result() }}"
|
||||
do:
|
||||
- merge_results
|
||||
__chart_meta__:
|
||||
label: "scan clear"
|
||||
color: "#22c55e"
|
||||
- when: "{{ failed() }}"
|
||||
do:
|
||||
- handle_failure
|
||||
__chart_meta__:
|
||||
label: "scan failed"
|
||||
color: "#ef4444"
|
||||
- when: "{{ timed_out() }}"
|
||||
do:
|
||||
- handle_failure
|
||||
__chart_meta__:
|
||||
label: "scan timed out"
|
||||
color: "#f97316"
|
||||
|
||||
# ── Stage 3: Merge results (join from 3 parallel branches) ──────────
|
||||
- name: merge_results
|
||||
action: python_example.simulate_work
|
||||
join: 3
|
||||
input:
|
||||
duration_seconds: 1.5
|
||||
label: "merge"
|
||||
output_data:
|
||||
build: "{{ workflow.build_result }}"
|
||||
lint: "{{ workflow.lint_result }}"
|
||||
scan: "{{ workflow.scan_result }}"
|
||||
next:
|
||||
- when: "{{ succeeded() }}"
|
||||
publish:
|
||||
- merged_summary: "{{ result() }}"
|
||||
do:
|
||||
- generate_item_list
|
||||
__chart_meta__:
|
||||
label: "merged"
|
||||
color: "#6366f1"
|
||||
|
||||
# ── Stage 4a: Generate the item list for with_items ──────────────────
|
||||
- name: generate_item_list
|
||||
action: python_example.list_numbers
|
||||
input:
|
||||
n: "{{ parameters.item_count }}"
|
||||
start: 1
|
||||
next:
|
||||
- when: "{{ succeeded() }}"
|
||||
publish:
|
||||
- number_list: "{{ result().data.items }}"
|
||||
do:
|
||||
- process_items
|
||||
__chart_meta__:
|
||||
label: "items ready"
|
||||
color: "#6366f1"
|
||||
|
||||
# ── Stage 4b: Process each item (with_items + concurrency) ──────────
|
||||
- name: process_items
|
||||
action: python_example.simulate_work
|
||||
with_items: "{{ workflow.number_list }}"
|
||||
concurrency: 3
|
||||
input:
|
||||
duration_seconds: "{{ parameters.item_duration }}"
|
||||
label: "item-{{ item }}"
|
||||
output_data:
|
||||
item_number: "{{ item }}"
|
||||
index: "{{ index }}"
|
||||
next:
|
||||
- when: "{{ succeeded() }}"
|
||||
publish:
|
||||
- items_processed: "{{ parameters.item_count }}"
|
||||
do:
|
||||
- validate
|
||||
__chart_meta__:
|
||||
label: "all items done"
|
||||
color: "#22c55e"
|
||||
- when: "{{ failed() }}"
|
||||
do:
|
||||
- handle_failure
|
||||
__chart_meta__:
|
||||
label: "item failed"
|
||||
color: "#ef4444"
|
||||
|
||||
# ── Stage 5: Validate everything ─────────────────────────────────────
|
||||
- name: validate
|
||||
action: python_example.simulate_work
|
||||
input:
|
||||
duration_seconds: 2.0
|
||||
label: "validate"
|
||||
fail: "{{ parameters.fail_validation }}"
|
||||
fail_after: 1.0
|
||||
output_data:
|
||||
checks_passed: 12
|
||||
checks_total: 12
|
||||
retry:
|
||||
count: 1
|
||||
delay: 2
|
||||
backoff: constant
|
||||
next:
|
||||
- when: "{{ succeeded() }}"
|
||||
publish:
|
||||
- validation_passed: true
|
||||
do:
|
||||
- finalize_success
|
||||
__chart_meta__:
|
||||
label: "valid ✓"
|
||||
color: "#22c55e"
|
||||
- when: "{{ failed() }}"
|
||||
publish:
|
||||
- validation_passed: false
|
||||
do:
|
||||
- handle_failure
|
||||
__chart_meta__:
|
||||
label: "invalid ✗"
|
||||
color: "#ef4444"
|
||||
|
||||
# ── Terminal: Success ────────────────────────────────────────────────
|
||||
- name: finalize_success
|
||||
action: python_example.simulate_work
|
||||
input:
|
||||
duration_seconds: 1.0
|
||||
label: "finalize-success"
|
||||
output_data:
|
||||
status: "success"
|
||||
items_processed: "{{ workflow.items_processed }}"
|
||||
validation: "{{ workflow.validation_passed }}"
|
||||
|
||||
# ── Error path: Handle failure ───────────────────────────────────────
|
||||
- name: handle_failure
|
||||
action: python_example.simulate_work
|
||||
input:
|
||||
duration_seconds: 1.5
|
||||
label: "handle-failure"
|
||||
output_data:
|
||||
status: "handling_error"
|
||||
build: "{{ workflow.build_result }}"
|
||||
lint: "{{ workflow.lint_result }}"
|
||||
scan: "{{ workflow.scan_result }}"
|
||||
next:
|
||||
- when: "{{ succeeded() }}"
|
||||
do:
|
||||
- finalize_failure
|
||||
__chart_meta__:
|
||||
label: "error handled"
|
||||
color: "#f97316"
|
||||
|
||||
# ── Terminal: Failure ────────────────────────────────────────────────
|
||||
- name: finalize_failure
|
||||
action: python_example.simulate_work
|
||||
input:
|
||||
duration_seconds: 0.5
|
||||
label: "finalize-failure"
|
||||
output_data:
|
||||
status: "failed"
|
||||
items_processed: "{{ workflow.items_processed }}"
|
||||
validation: "{{ workflow.validation_passed }}"
|
||||
|
||||
output_map:
|
||||
status: "{{ 'success' if workflow.validation_passed else 'failed' }}"
|
||||
items_processed: "{{ workflow.items_processed }}"
|
||||
total_duration: "{{ task.finalize_success.result.duration_seconds if workflow.validation_passed else task.finalize_failure.result.duration_seconds }}"
|
||||
Reference in New Issue
Block a user