Files
python_example/actions/artifact_demo.py
2026-03-04 13:49:14 -06:00

332 lines
11 KiB
Python

#!/usr/bin/env python3
"""
Artifact Demo Action - Python Example Pack
Demonstrates creating file and progress artifacts via the Attune API.
Each iteration:
1. Appends a line to an in-memory log
2. Updates a progress artifact
3. Sleeps for 0.5 seconds
After all iterations complete, the full log is written directly to the
shared artifact volume as a **single version** of a file artifact with a
stable ref (`python_example.artifact_demo.log`).
File-based artifact flow (single API call):
1. POST /api/v1/artifacts/ref/{ref}/versions/file — upserts the artifact
(creating it if it doesn't exist) and allocates a version number,
returning a relative `file_path`
2. Write the file to $ATTUNE_ARTIFACTS_DIR/{file_path} on the shared volume
No HTTP upload is needed — the worker and action process share an artifact
volume, so the action writes directly to disk.
The progress artifact is still per-execution (ephemeral status indicator).
Parameters:
iterations - Number of iterations to run (default: 50)
visibility - Artifact visibility level for file artifacts: "public" or "private"
(default: "private"). Public artifacts are viewable by all authenticated
users on the platform. Private artifacts are restricted based on their
scope/owner fields.
Note: Progress artifacts always use the server default visibility (public), since
they are informational status indicators that anyone watching an execution should
be able to see. The visibility parameter only controls file artifacts.
"""
import json
import os
import sys
import time
import urllib.error
import urllib.request
from datetime import datetime, timezone
def api_request(
    base_url, path, method="GET", data=None, token=None, content_type="application/json"
):
    """Issue an HTTP request against the Attune API and decode the JSON reply.

    Returns:
        (parsed_json, http_status) tuple.

    Raises:
        urllib.error.HTTPError: re-raised after logging the error body to stderr.
    """
    headers = {"Accept": "application/json"}
    if token:
        headers["Authorization"] = f"Bearer {token}"

    payload = None
    if data is not None:
        headers["Content-Type"] = content_type
        if content_type == "application/json":
            payload = json.dumps(data).encode("utf-8")
        elif isinstance(data, bytes):
            payload = data
        else:
            payload = data.encode("utf-8")

    request = urllib.request.Request(
        f"{base_url}{path}", data=payload, headers=headers, method=method
    )
    try:
        with urllib.request.urlopen(request, timeout=30) as response:
            return json.loads(response.read().decode("utf-8")), response.status
    except urllib.error.HTTPError as err:
        detail = err.read().decode("utf-8", errors="replace")
        print(f"API error {err.code} on {method} {path}: {detail}", file=sys.stderr)
        raise
def allocate_file_version_by_ref(
    base_url,
    token,
    artifact_ref,
    execution_id=None,
    visibility=None,
    content_type="text/plain",
    name=None,
    description=None,
):
    """Upsert an artifact by ref and allocate a file-backed version.

    A single API call that creates the artifact if it doesn't exist (or
    reuses the existing one) and allocates a new version with a file_path.

    Returns:
        (artifact_id, version_id, file_path) tuple.
    """
    payload = {
        "scope": "action",
        "owner": "python_example.artifact_demo",
        "type": "file_text",
        "retention_policy": "versions",
        "retention_limit": 10,
        "content_type": content_type,
        "created_by": "python_example.artifact_demo",
    }
    # Optional fields are only sent when the caller supplied a value.
    optional_fields = {
        "execution": execution_id,
        "visibility": visibility,
        "name": name,
        "description": description,
    }
    payload.update({key: val for key, val in optional_fields.items() if val is not None})

    resp, _ = api_request(
        base_url,
        f"/api/v1/artifacts/ref/{artifact_ref}/versions/file",
        method="POST",
        data=payload,
        token=token,
    )
    version = resp["data"]
    return version["artifact"], version["id"], version["file_path"]
def create_artifact(
    base_url,
    token,
    ref,
    name,
    artifact_type,
    execution_id,
    visibility=None,
    description=None,
    data=None,
):
    """Create a new artifact via POST /api/v1/artifacts and return its ID."""
    payload = {
        "ref": ref,
        "scope": "action",
        "owner": "python_example.artifact_demo",
        "type": artifact_type,
        "retention_policy": "versions",
        "retention_limit": 10,
        "name": name,
        "execution": execution_id,
    }
    # Optional fields are only sent when the caller supplied a value.
    optional_fields = {
        "visibility": visibility,
        "description": description,
        "data": data,
    }
    payload.update({key: val for key, val in optional_fields.items() if val is not None})

    resp, _ = api_request(
        base_url, "/api/v1/artifacts", method="POST", data=payload, token=token
    )
    return resp["data"]["id"]
def append_progress(base_url, token, artifact_id, entry):
    """POST a single entry onto an existing progress artifact."""
    progress_path = f"/api/v1/artifacts/{artifact_id}/progress"
    api_request(base_url, progress_path, method="POST", data={"entry": entry}, token=token)
def main():
    """Run the artifact demo: stream progress entries, then write the log file.

    Reads JSON parameters from the first line of stdin, talks to the Attune
    API using credentials/paths from the environment, appends a progress
    entry per iteration, and finally writes the collected log to the shared
    artifact volume.

    Returns:
        0 on success, 1 on any failure. A JSON result/error object is
        printed to stdout either way; diagnostics go to stderr.
    """
    start_time = time.time()
    try:
        # Read parameters from stdin (JSON format)
        params = json.loads(sys.stdin.readline())
        iterations = params.get("iterations", 50)
        visibility = params.get("visibility", "private")

        # Validate parameters early so failures are reported clearly.
        # iterations must be a positive integer: 0 would divide by zero in
        # the percent computation below, and a non-int would crash range().
        if not isinstance(iterations, int) or iterations < 1:
            raise ValueError(
                f"Invalid iterations {iterations!r}: must be a positive integer"
            )
        if visibility not in ("public", "private"):
            raise ValueError(
                f"Invalid visibility '{visibility}': must be 'public' or 'private'"
            )

        # Get execution context from environment
        api_url = os.environ.get("ATTUNE_API_URL", "")
        token = os.environ.get("ATTUNE_API_TOKEN", "")
        exec_id_str = os.environ.get("ATTUNE_EXEC_ID", "")
        artifacts_dir = os.environ.get("ATTUNE_ARTIFACTS_DIR", "")
        execution_id = int(exec_id_str) if exec_id_str else None

        if not api_url:
            raise RuntimeError(
                "ATTUNE_API_URL environment variable is not set. "
                "This action must be run by the Attune worker."
            )
        if not token:
            raise RuntimeError(
                "ATTUNE_API_TOKEN environment variable is not set. "
                "This action must be run by the Attune worker."
            )
        if not artifacts_dir:
            raise RuntimeError(
                "ATTUNE_ARTIFACTS_DIR environment variable is not set. "
                "This action must be run by the Attune worker."
            )

        print(
            f"Artifact demo starting: {iterations} iterations, "
            f"visibility={visibility}, API at {api_url}, "
            f"artifacts_dir={artifacts_dir}",
            file=sys.stderr,
        )

        # ----------------------------------------------------------------
        # File artifact — single call upserts artifact + allocates version
        # ----------------------------------------------------------------
        file_ref = "python_example.artifact_demo.log"
        file_artifact_id, version_id, file_path = allocate_file_version_by_ref(
            base_url=api_url,
            token=token,
            artifact_ref=file_ref,
            execution_id=execution_id,
            visibility=visibility,
            content_type="text/plain",
            name="Demo Log",
            description="Log output from the artifact demo action",
        )
        full_file_path = os.path.join(artifacts_dir, file_path)
        print(
            f"Allocated file artifact ref={file_ref} id={file_artifact_id} "
            f"version={version_id} path={file_path}",
            file=sys.stderr,
        )

        # ----------------------------------------------------------------
        # Progress artifact — per-execution (ephemeral status indicator)
        # ----------------------------------------------------------------
        ts = int(time.time())
        # `is not None`: an execution id of 0 is a valid id, not "missing".
        ref_suffix = f"{execution_id}_{ts}" if execution_id is not None else str(ts)
        progress_ref = f"python_example.artifact_demo.progress.{ref_suffix}"
        progress_artifact_id = create_artifact(
            base_url=api_url,
            token=token,
            ref=progress_ref,
            name="Artifact Demo Progress",
            artifact_type="progress",
            execution_id=execution_id,
            description=f"Progress tracker for artifact demo ({iterations} iterations)",
            data=[],  # Initialize with empty array
        )
        print(
            f"Created progress artifact ID={progress_artifact_id} ref={progress_ref} "
            f"visibility=public (server default)",
            file=sys.stderr,
        )

        # ----------------------------------------------------------------
        # Run iterations — collect log lines, write file at the end
        # ----------------------------------------------------------------
        log_lines = []
        for i in range(iterations):
            iteration = i + 1
            now = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%fZ")
            pct = min(round(iteration * (100.0 / iterations), 1), 100.0)

            # Build log line
            log_line = f"[{now}] Iteration {iteration}/{iterations} — progress {pct}%"
            log_lines.append(log_line)
            print(f"  Iteration {iteration}/{iterations} ({pct}%)", file=sys.stderr)

            # Append progress entry
            append_progress(
                api_url,
                token,
                progress_artifact_id,
                {
                    "iteration": iteration,
                    "total": iterations,
                    "percent": pct,
                    "message": f"Completed iteration {iteration}",
                    "timestamp": now,
                },
            )

            # Sleep between iterations
            if iteration < iterations:
                time.sleep(0.5)

        # ----------------------------------------------------------------
        # Write the complete log file to the shared artifact volume
        # ----------------------------------------------------------------
        # file_path may include subdirectories that do not yet exist on the
        # shared volume — create them before opening the file for write.
        parent_dir = os.path.dirname(full_file_path)
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)
        full_log = "\n".join(log_lines) + "\n"
        with open(full_file_path, "w", encoding="utf-8") as f:
            f.write(full_log)
        print(
            f"Wrote {len(full_log)} bytes to {full_file_path}",
            file=sys.stderr,
        )

        elapsed = round(time.time() - start_time, 3)
        result = {
            "file_artifact_id": file_artifact_id,
            "file_artifact_ref": file_ref,
            "file_version_id": version_id,
            "file_path": file_path,
            "progress_artifact_id": progress_artifact_id,
            "iterations_completed": iterations,
            "visibility": visibility,
            "elapsed_seconds": elapsed,
            "success": True,
        }
        print(json.dumps(result))
        print(f"Artifact demo completed in {elapsed}s", file=sys.stderr)
        return 0
    except Exception as e:
        # Top-level boundary: report the failure on both streams and exit
        # non-zero so the worker records the execution as failed.
        elapsed = round(time.time() - start_time, 3)
        error_result = {
            "success": False,
            "error": str(e),
            "elapsed_seconds": elapsed,
        }
        print(json.dumps(error_result))
        print(f"ERROR: {e}", file=sys.stderr)
        return 1
# Script entry point: exit with main()'s status code so the worker can
# distinguish success (0) from failure (1).
if __name__ == "__main__":
    sys.exit(main())