From f3c159913eb4aef62f510202bbc764861a3df807 Mon Sep 17 00:00:00 2001 From: David Culbreth Date: Wed, 11 Feb 2026 08:18:43 -0600 Subject: [PATCH] Initial commit: Python Example Pack for Attune Includes: - 3 Python actions (hello, http_example, read_counter) - 1 counter trigger type - 1 counter sensor (Python, keystore-backed, per-rule state) - 1 example rule (count_and_log) - requirements.txt with requests and pika - README with full usage documentation --- .gitignore | 30 ++ LICENSE | 18 ++ README.md | 239 ++++++++++++++++ actions/hello.py | 21 ++ actions/hello.yaml | 45 +++ actions/http_example.py | 22 ++ actions/http_example.yaml | 68 +++++ actions/read_counter.py | 51 ++++ actions/read_counter.yaml | 58 ++++ pack.yaml | 53 ++++ requirements.txt | 2 + rules/count_and_log.yaml | 25 ++ sensors/counter_sensor.py | 540 ++++++++++++++++++++++++++++++++++++ sensors/counter_sensor.yaml | 75 +++++ triggers/counter.yaml | 59 ++++ 15 files changed, 1306 insertions(+) create mode 100644 .gitignore create mode 100644 LICENSE create mode 100644 README.md create mode 100644 actions/hello.py create mode 100644 actions/hello.yaml create mode 100644 actions/http_example.py create mode 100644 actions/http_example.yaml create mode 100644 actions/read_counter.py create mode 100644 actions/read_counter.yaml create mode 100644 pack.yaml create mode 100644 requirements.txt create mode 100644 rules/count_and_log.yaml create mode 100644 sensors/counter_sensor.py create mode 100644 sensors/counter_sensor.yaml create mode 100644 triggers/counter.yaml diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..1e3157e --- /dev/null +++ b/.gitignore @@ -0,0 +1,30 @@ +# Python +__pycache__/ +*.py[cod] +*$py.class +*.egg-info/ +*.egg +dist/ +build/ +.eggs/ + +# Virtual environments +.venv/ +venv/ +env/ + +# IDE +.idea/ +.vscode/ +*.swp +*.swo +*~ + +# OS +.DS_Store +Thumbs.db + +# Test artifacts +.pytest_cache/ +.coverage +htmlcov/ diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..e08ba3d --- /dev/null +++ b/LICENSE @@ -0,0 +1,18 @@ +MIT License + +Copyright (c) 2025 Attune Automation + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR diff --git a/README.md b/README.md new file mode 100644 index 0000000..487e919 --- /dev/null +++ b/README.md @@ -0,0 +1,239 @@ +# Python Example Pack for Attune + +A complete example pack demonstrating Python actions, a stateful counter sensor with keystore integration, and HTTP requests using the `requests` library. + +## Purpose + +This pack exercises as many parts of the Attune SDLC as possible: + +- **Python actions** with the wrapper-based execution model +- **Python sensor** with RabbitMQ rule lifecycle integration +- **Trigger types** with structured payload schemas +- **Rules** connecting triggers to actions with parameter mapping +- **Keystore integration** for persistent sensor state across restarts +- **External Python dependencies** (`requests`, `pika`) +- **Per-rule scoped state** — each rule subscription gets its own counter + +## Components + +### Actions + +| Ref | Description | +|-----|-------------| +| `python_example.hello` | Returns `"Hello, Python"` — minimal action | +| `python_example.http_example` | Uses `requests` to GET `https://example.com` | +| `python_example.read_counter` | Consumes a counter value and returns a formatted message | + +### Triggers + +| Ref | Description | +|-----|-------------| +| `python_example.counter` | Fires periodically with an incrementing counter per rule | + +### Sensors + +| Ref | Description | +|-----|-------------| +| `python_example.counter_sensor` | Manages per-rule counters stored in the Attune keystore | + +### Rules + +| Ref | Description | +|-----|-------------| +| `python_example.count_and_log` | Wires the counter trigger to the `read_counter` action | + +## Installation + +### As a Git Pack (recommended) + +```bash +# Install via the Attune CLI from a git repository +attune pack install https://github.com/attune-automation/pack-python-example.git + +# Or via the API +curl -X POST "http://localhost:8080/api/v1/packs/install" \ + -H "Authorization: Bearer " \ + -H "Content-Type: application/json" \ + -d '{"source": "git", "url": "https://github.com/attune-automation/pack-python-example.git"}' +``` + +### Local Development (submodule) + +If you're developing against the Attune repository: + +```bash +cd attune + +# Add as a git submodule in packs.examples/ +git submodule add packs.examples/python_example + +# Or if you already have the directory, initialize it: +cd packs.examples/python_example +git init +git remote add origin +``` + +### Manual / Volume Mount + +Copy or symlink the pack into your Attune packs directory: + +```bash +cp -r python_example /opt/attune/packs/python_example +# Then restart services to pick it up, or use the dev packs volume +``` + +## Dependencies + +Declared in `requirements.txt`: + +- `requests>=2.28.0` — HTTP client for the `http_example` action and sensor API calls +- `pika>=1.3.0` — RabbitMQ client for the counter sensor + +These are installed automatically when the pack is loaded by a Python worker with dependency management enabled. + +## How It Works + +### Counter Sensor Flow + +``` +┌──────────────────────────────────────────────────────────┐ +│ counter_sensor.py │ +│ │ +│ 1. Startup: fetch active rules from GET /api/v1/rules │ +│ 2. Listen: RabbitMQ queue sensor.python_example.* │ +│ for rule.created / rule.enabled / rule.disabled / │ +│ rule.deleted messages │ +│ 3. Per active rule, spawn a timer thread: │ +│ │ +│ ┌────────────────────────────────────────┐ │ +│ │ Timer Thread (1 tick/sec per rule) │ │ +│ │ │ │ +│ │ GET /api/v1/keys/{key} → read counter │ │ +│ │ counter += 1 │ │ +│ │ PUT /api/v1/keys/{key} → write back │ │ +│ │ POST /api/v1/events → emit event │ │ +│ └────────────────────────────────────────┘ │ +│ │ +│ 4. On shutdown: stop all timer threads gracefully │ +└──────────────────────────────────────────────────────────┘ +``` + +### Keystore Key Naming + +Each rule gets its own counter key: + +``` +python_example.counter. +``` + +For example, a rule with ref `python_example.count_and_log` stores its counter at: + +``` +python_example.counter.python_example_count_and_log +``` + +### Event Payload + +Each emitted event has this structure: + +```json +{ + "counter": 42, + "rule_ref": "python_example.count_and_log", + "sensor_ref": "python_example.counter_sensor", + "fired_at": "2025-01-15T12:00:00.000000+00:00" +} +``` + +### Rule Parameter Mapping + +The included `count_and_log` rule maps trigger payload fields to action parameters: + +```yaml +action_params: + counter: "{{ trigger.payload.counter }}" + rule_ref: "{{ trigger.payload.rule_ref }}" +``` + +The `read_counter` action then returns: + +```json +{ + "message": "Counter value is 42 (from rule: python_example.count_and_log)", + "counter": 42, + "rule_ref": "python_example.count_and_log" +} +``` + +## Testing Individual Components + +### Test the hello action + +```bash +attune action execute python_example.hello +# Output: {"message": "Hello, Python"} +``` + +### Test the HTTP action + +```bash +attune action execute python_example.http_example +# Output: {"status_code": 200, "url": "https://example.com", ...} +``` + +### Test the read_counter action directly + +```bash +attune action execute python_example.read_counter --param counter=99 --param rule_ref=test +# Output: {"message": "Counter value is 99 (from rule: test)", ...} +``` + +### Enable the rule to start the counter sensor loop + +```bash +# The rule is enabled by default when the pack is loaded. +# To manually enable/disable: +attune rule enable python_example.count_and_log +attune rule disable python_example.count_and_log + +# Monitor executions produced by the rule: +attune execution list --action python_example.read_counter +``` + +## Configuration + +The pack supports the following configuration in `pack.yaml`: + +| Setting | Default | Description | +|---------|---------|-------------| +| `counter_key_prefix` | `python_example.counter` | Prefix for keystore keys | + +The sensor supports these parameters: + +| Parameter | Default | Description | +|-----------|---------|-------------| +| `default_interval_seconds` | `1` | Default tick interval per rule | +| `key_prefix` | `python_example.counter` | Keystore key prefix | + +The trigger supports per-rule configuration: + +| Parameter | Default | Description | +|-----------|---------|-------------| +| `interval_seconds` | `1` | Seconds between counter ticks | + +## Development + +```bash +# Run the sensor manually for testing +export ATTUNE_API_URL=http://localhost:8080 +export ATTUNE_API_TOKEN= +export ATTUNE_MQ_URL=amqp://guest:guest@localhost:5672/ +python3 sensors/counter_sensor.py + +# Run an action manually +echo '{"parameters": {"name": "World"}}' | python3 actions/hello.py +``` + +## License + +MIT \ No newline at end of file diff --git a/actions/hello.py b/actions/hello.py new file mode 100644 index 0000000..8a55c43 --- /dev/null +++ b/actions/hello.py @@ -0,0 +1,21 @@ +#!/usr/bin/env python3 +""" +Hello Action - Python Example Pack + +A minimal Python action that returns "Hello, Python". +Demonstrates the basic structure of a Python action in Attune. +""" + +import json +import sys + + +def run(**kwargs): + """Return a simple greeting message.""" + return {"message": "Hello, Python"} + + +if __name__ == "__main__": + result = run() + print(json.dumps({"result": result, "status": "success"})) + sys.exit(0) diff --git a/actions/hello.yaml b/actions/hello.yaml new file mode 100644 index 0000000..28f5040 --- /dev/null +++ b/actions/hello.yaml @@ -0,0 +1,45 @@ +# Hello Action +# Simple Python action that returns "Hello, Python" + +ref: python_example.hello +label: "Hello Python" +description: "A simple Python action that returns a greeting message" +enabled: true + +# Runner type determines how the action is executed +runner_type: python + +# Entry point is the Python script to execute +entry_point: hello.py + +# Parameter delivery: stdin for secure parameter passing +parameter_delivery: stdin +parameter_format: json + +# Output format: json (structured data parsing enabled) +output_format: json + +# Action parameters schema (standard JSON Schema format) +parameters: + type: object + properties: + name: + type: string + description: "Optional name to include in greeting" + default: "Python" + +# Output schema +output_schema: + type: object + properties: + message: + type: string + description: "The greeting message" + required: + - message + +# Tags for categorization +tags: + - python + - example + - greeting diff --git a/actions/http_example.py b/actions/http_example.py new file mode 100644 index 0000000..6feed74 --- /dev/null +++ b/actions/http_example.py @@ -0,0 +1,22 @@ +#!/usr/bin/env python3 +""" +HTTP Example Action - Python Example Pack + +Demonstrates using the `requests` library to make an HTTP call to example.com. +Receives parameters via stdin JSON (through the Python wrapper). +""" + +import requests + + +def run(url="https://example.com", **kwargs): + """Fetch a URL and return status and a snippet of the response body.""" + response = requests.get(url, timeout=10) + + return { + "status_code": response.status_code, + "url": response.url, + "content_length": len(response.text), + "snippet": response.text[:500], + "success": response.ok, + } diff --git a/actions/http_example.yaml b/actions/http_example.yaml new file mode 100644 index 0000000..f9eee7d --- /dev/null +++ b/actions/http_example.yaml @@ -0,0 +1,68 @@ +# HTTP Example Action +# Demonstrates using the requests library to make HTTP calls + +ref: python_example.http_example +label: "HTTP Example" +description: "Makes an HTTP GET request to example.com using the requests library" +enabled: true + +# Runner type +runner_type: python + +# Entry point +entry_point: http_example.py + +# Parameter delivery: stdin for secure parameter passing +parameter_delivery: stdin +parameter_format: json + +# Output format: json (structured data) +output_format: json + +# Action parameters schema +parameters: + type: object + properties: + url: + type: string + description: "URL to request (defaults to https://example.com)" + default: "https://example.com" + method: + type: string + description: "HTTP method" + default: "GET" + enum: + - GET + - POST + - PUT + - DELETE + +# Output schema +output_schema: + type: object + properties: + status_code: + type: integer + description: "HTTP response status code" + url: + type: string + description: "URL that was requested" + content_length: + type: integer + description: "Length of the response body in bytes" + content_type: + type: string + description: "Content-Type header from the response" + title: + type: string + description: "Extracted page title (if HTML response)" + success: + type: boolean + description: "Whether the request succeeded (2xx status)" + +# Tags for categorization +tags: + - http + - python + - example + - requests diff --git a/actions/read_counter.py b/actions/read_counter.py new file mode 100644 index 0000000..a1a8368 --- /dev/null +++ b/actions/read_counter.py @@ -0,0 +1,51 @@ +#!/usr/bin/env python3 +""" +Read Counter Action - Python Example Pack + +Consumes a counter value (typically from the counter sensor trigger payload) +and returns a formatted message containing the counter value. + +Parameters are delivered via stdin as JSON from the Python wrapper. +""" + +import json +import sys + + +def run(counter=0, rule_ref="unknown", **kwargs): + """Return a message containing the counter value. + + Args: + counter: The counter value from the trigger payload. + rule_ref: The rule reference that produced this counter. + **kwargs: Additional parameters (ignored). + + Returns: + dict with a formatted message and the raw counter value. + """ + return { + "message": f"Counter value is {counter} (from rule: {rule_ref})", + "counter": counter, + "rule_ref": rule_ref, + } + + +def main(): + """Entry point when run directly (without the Python wrapper).""" + try: + content = sys.stdin.read().strip() + if content: + parts = content.split("---ATTUNE_PARAMS_END---") + params = json.loads(parts[0].strip()) if parts[0].strip() else {} + else: + params = {} + except (json.JSONDecodeError, IndexError): + params = {} + + result = run(**params) + print(json.dumps(result, indent=2)) + sys.exit(0) + + +if __name__ == "__main__": + main() diff --git a/actions/read_counter.yaml b/actions/read_counter.yaml new file mode 100644 index 0000000..4b4afe9 --- /dev/null +++ b/actions/read_counter.yaml @@ -0,0 +1,58 @@ +# Read Counter Action +# Consumes a counter value and returns a formatted message + +ref: python_example.read_counter +label: "Read Counter" +description: "Receives a counter value (typically from the counter trigger) and returns a formatted message containing it" +enabled: true + +# Runner type +runner_type: python + +# Entry point +entry_point: read_counter.py + +# Parameter delivery: stdin for secure parameter passing +parameter_delivery: stdin +parameter_format: json + +# Output format: json (structured data) +output_format: json + +# Action parameters schema +parameters: + type: object + properties: + counter: + type: integer + description: "The counter value to consume" + rule_ref: + type: string + description: "The rule reference the counter is scoped to" + default: "" + required: + - counter + +# Output schema +output_schema: + type: object + properties: + message: + type: string + description: "Formatted message containing the counter value" + counter: + type: integer + description: "The counter value that was consumed" + rule_ref: + type: string + description: "The rule reference the counter is scoped to" + required: + - message + - counter + +# Tags for categorization +tags: + - counter + - example + - python + - consumer diff --git a/pack.yaml b/pack.yaml new file mode 100644 index 0000000..e1c9ac3 --- /dev/null +++ b/pack.yaml @@ -0,0 +1,53 @@ +# Python Example Pack +# Demonstrates Python actions, sensors, triggers, and keystore integration + +ref: python_example +label: "Python Example Pack" +description: "Example pack demonstrating Python actions, a counter sensor with keystore integration, and HTTP requests" +version: "1.0.0" +author: "Attune Team" +email: "support@attune.io" + +system: false +enabled: true + +# Pack configuration schema +conf_schema: + type: object + properties: + counter_key_prefix: + type: string + description: "Prefix for counter keys in the keystore" + default: "python_example.counter" + +# Default pack configuration +config: + counter_key_prefix: "python_example.counter" + +# Pack metadata +meta: + category: "examples" + keywords: + - "python" + - "examples" + - "counter" + - "sensor" + - "keystore" + - "http" + python_dependencies: + - "requests>=2.28.0" + - "pika>=1.3.0" + documentation_url: "https://github.com/attune-automation/pack-python-example" + repository_url: "https://github.com/attune-automation/pack-python-example" + +# Tags for categorization +tags: + - python + - examples + - counter + - sensor + - http + +# Runtime dependencies +runtime_deps: + - python3 diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..f0bd0f0 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,2 @@ +requests>=2.28.0 +pika>=1.3.0 diff --git a/rules/count_and_log.yaml b/rules/count_and_log.yaml new file mode 100644 index 0000000..f851aa6 --- /dev/null +++ b/rules/count_and_log.yaml @@ -0,0 +1,25 @@ +# Count and Log Rule +# Connects the counter sensor trigger to the read_counter action +# +# When the counter sensor fires, this rule passes the counter value +# and rule reference from the trigger payload into the read_counter action. + +ref: python_example.count_and_log +pack_ref: python_example +label: "Count and Log" +description: "Fires on each counter tick and logs the current counter value" + +# Link trigger to action +trigger_ref: python_example.counter +action_ref: python_example.read_counter + +# Map trigger payload fields into action parameters +action_params: + counter: "{{ trigger.payload.counter }}" + rule_ref: "{{ trigger.payload.rule_ref }}" + +# No conditions — fire on every counter event +conditions: {} + +# Active by default +enabled: true diff --git a/sensors/counter_sensor.py b/sensors/counter_sensor.py new file mode 100644 index 0000000..74a0ec5 --- /dev/null +++ b/sensors/counter_sensor.py @@ -0,0 +1,540 @@ +#!/usr/bin/env python3 +""" +Counter Sensor - Python Example Pack + +A stateful Python sensor that demonstrates: +- RabbitMQ integration for rule lifecycle events +- Attune keystore API for persistent per-rule counter state +- Periodic event emission via the Attune events API +- Per-rule scoped counters with independent timer threads +- Graceful shutdown with thread cleanup + +Environment Variables (provided by attune-sensor service): + ATTUNE_API_URL - Base URL of the Attune API (e.g. http://localhost:8080) + ATTUNE_API_TOKEN - JWT token for authenticating API calls + ATTUNE_SENSOR_ID - Database ID of this sensor instance + ATTUNE_SENSOR_REF - Reference name (python_example.counter_sensor) + ATTUNE_MQ_URL - RabbitMQ connection URL (e.g. amqp://localhost:5672) + ATTUNE_MQ_EXCHANGE - RabbitMQ exchange name (default: attune) + ATTUNE_LOG_LEVEL - Logging verbosity (default: info) +""" + +import json +import logging +import os +import signal +import sys +import threading +import time +from datetime import datetime, timezone + +import pika +import requests + +# --------------------------------------------------------------------------- +# Configuration +# --------------------------------------------------------------------------- + +API_URL = os.environ.get("ATTUNE_API_URL", "http://localhost:8080") +API_TOKEN = os.environ.get("ATTUNE_API_TOKEN", "") +SENSOR_ID = os.environ.get("ATTUNE_SENSOR_ID", "0") +SENSOR_REF = os.environ.get("ATTUNE_SENSOR_REF", "python_example.counter_sensor") +MQ_URL = os.environ.get("ATTUNE_MQ_URL", "amqp://localhost:5672") +MQ_EXCHANGE = os.environ.get("ATTUNE_MQ_EXCHANGE", "attune") +LOG_LEVEL = os.environ.get("ATTUNE_LOG_LEVEL", "info").upper() + +TRIGGER_TYPE = "python_example.counter" +KEY_PREFIX = "python_example.counter" +DEFAULT_INTERVAL = 1 # seconds + +# --------------------------------------------------------------------------- +# Logging +# --------------------------------------------------------------------------- + +logging.basicConfig( + level=getattr(logging, LOG_LEVEL, logging.INFO), + format="%(message)s", + stream=sys.stderr, +) +logger = logging.getLogger("counter_sensor") + + +def log_json(level, message, **extra): + """Emit a structured JSON log line to stderr.""" + entry = { + "timestamp": datetime.now(timezone.utc).isoformat(), + "level": level, + "sensor": SENSOR_REF, + "message": message, + **extra, + } + print(json.dumps(entry), file=sys.stderr, flush=True) + + +# --------------------------------------------------------------------------- +# API helpers +# --------------------------------------------------------------------------- + + +def _api_headers(): + return { + "Authorization": f"Bearer {API_TOKEN}", + "Content-Type": "application/json", + } + + +def keystore_get(key_ref): + """Read a key from the Attune keystore. Returns the value or None.""" + try: + resp = requests.get( + f"{API_URL}/api/v1/keys/{key_ref}", + headers=_api_headers(), + timeout=5, + ) + if resp.status_code == 200: + data = resp.json().get("data", {}) + return data.get("value") + if resp.status_code == 404: + return None + log_json( + "warn", + "keystore GET unexpected status", + key=key_ref, + status=resp.status_code, + ) + return None + except requests.RequestException as exc: + log_json("error", "keystore GET failed", key=key_ref, error=str(exc)) + return None + + +def keystore_put(key_ref, value): + """Update an existing key in the Attune keystore.""" + try: + resp = requests.put( + f"{API_URL}/api/v1/keys/{key_ref}", + headers=_api_headers(), + json={"value": str(value), "name": f"Counter: {key_ref}"}, + timeout=5, + ) + if resp.status_code not in (200, 201): + log_json( + "warn", + "keystore PUT unexpected status", + key=key_ref, + status=resp.status_code, + ) + return resp.status_code in (200, 201) + except requests.RequestException as exc: + log_json("error", "keystore PUT failed", key=key_ref, error=str(exc)) + return False + + +def keystore_create(key_ref, value, rule_ref): + """Create a new key in the Attune keystore.""" + try: + resp = requests.post( + f"{API_URL}/api/v1/keys", + headers=_api_headers(), + json={ + "ref": key_ref, + "owner_type": "sensor", + "owner_sensor_ref": SENSOR_REF, + "name": f"Counter: {key_ref}", + "value": str(value), + "encrypted": False, + }, + timeout=5, + ) + if resp.status_code in (200, 201): + return True + if resp.status_code == 409: + # Key already exists (race condition) — update instead + return keystore_put(key_ref, value) + log_json( + "warn", + "keystore POST unexpected status", + key=key_ref, + status=resp.status_code, + ) + return False + except requests.RequestException as exc: + log_json("error", "keystore POST failed", key=key_ref, error=str(exc)) + return False + + +def emit_event(payload, rule_ref): + """Create an event via POST /events.""" + body = { + "trigger_type": TRIGGER_TYPE, + "payload": payload, + "trigger_instance_id": f"rule_{rule_ref}", + } + for attempt in range(3): + try: + resp = requests.post( + f"{API_URL}/api/v1/events", + headers=_api_headers(), + json=body, + timeout=5, + ) + if resp.status_code in (200, 201): + return True + log_json( + "warn", + "event POST unexpected status", + status=resp.status_code, + attempt=attempt + 1, + rule_ref=rule_ref, + ) + except requests.RequestException as exc: + log_json("error", "event POST failed", error=str(exc), attempt=attempt + 1) + # Exponential backoff + time.sleep(0.5 * (2**attempt)) + return False + + +# --------------------------------------------------------------------------- +# Per-rule counter loop +# --------------------------------------------------------------------------- + + +class RuleCounter: + """Manages a per-rule counter with a background timer thread.""" + + def __init__(self, rule_id, rule_ref, trigger_params): + self.rule_id = rule_id + self.rule_ref = rule_ref + self.interval = trigger_params.get("interval_seconds", DEFAULT_INTERVAL) + self.key_ref = f"{KEY_PREFIX}.{rule_ref.replace('.', '_')}" + self._stop = threading.Event() + self._thread = None + + def start(self): + """Start the counter loop in a background thread.""" + if self._thread is not None and self._thread.is_alive(): + return + self._stop.clear() + self._thread = threading.Thread( + target=self._loop, + name=f"counter-{self.rule_ref}", + daemon=True, + ) + self._thread.start() + log_json( + "info", + "counter started", + rule_id=self.rule_id, + rule_ref=self.rule_ref, + interval=self.interval, + ) + + def stop(self): + """Signal the counter loop to stop and wait for it.""" + self._stop.set() + if self._thread is not None: + self._thread.join(timeout=5) + self._thread = None + log_json( + "info", "counter stopped", rule_id=self.rule_id, rule_ref=self.rule_ref + ) + + def _loop(self): + """Main counter loop: read → increment → write → emit, repeat.""" + while not self._stop.is_set(): + try: + self._tick() + except Exception as exc: + log_json( + "error", + "counter tick failed", + rule_ref=self.rule_ref, + error=str(exc), + ) + self._stop.wait(timeout=self.interval) + + def _tick(self): + """Execute one counter cycle.""" + # Read current value from keystore + raw = keystore_get(self.key_ref) + if raw is not None: + try: + counter = int(raw) + except (ValueError, TypeError): + counter = 0 + else: + counter = 0 + + # Increment + counter += 1 + + # Write back to keystore + if raw is None: + keystore_create(self.key_ref, counter, self.rule_ref) + else: + keystore_put(self.key_ref, counter) + + # Build event payload + payload = { + "counter": counter, + "rule_ref": self.rule_ref, + "sensor_ref": SENSOR_REF, + "fired_at": datetime.now(timezone.utc).isoformat(), + } + + # Emit event + emit_event(payload, self.rule_ref) + + +# --------------------------------------------------------------------------- +# Rule manager +# --------------------------------------------------------------------------- + + +class RuleManager: + """Tracks active rules and their counter threads.""" + + def __init__(self): + self._rules = {} # rule_id -> RuleCounter + self._lock = threading.Lock() + + def handle_message(self, message): + """Process a rule lifecycle message from RabbitMQ.""" + event_type = message.get("event_type", "") + rule_id = message.get("rule_id") + rule_ref = message.get("rule_ref", f"rule_{rule_id}") + trigger_type = message.get("trigger_type", "") + trigger_params = message.get("trigger_params", {}) + + # Only handle messages for our trigger type + if trigger_type and trigger_type != TRIGGER_TYPE: + return + + if event_type in ("RuleCreated", "RuleEnabled"): + self._start_rule(rule_id, rule_ref, trigger_params) + elif event_type in ("RuleDisabled", "RuleDeleted"): + self._stop_rule(rule_id) + else: + log_json("debug", "ignoring unknown event_type", event_type=event_type) + + def _start_rule(self, rule_id, rule_ref, trigger_params): + with self._lock: + if rule_id in self._rules: + # Already tracking — restart with potentially new params + self._rules[rule_id].stop() + rc = RuleCounter(rule_id, rule_ref, trigger_params) + self._rules[rule_id] = rc + rc.start() + + def _stop_rule(self, rule_id): + with self._lock: + rc = self._rules.pop(rule_id, None) + if rc is not None: + rc.stop() + + def stop_all(self): + """Stop all active counter threads.""" + with self._lock: + rule_ids = list(self._rules.keys()) + for rid in rule_ids: + self._stop_rule(rid) + + @property + def active_count(self): + with self._lock: + return len(self._rules) + + +# --------------------------------------------------------------------------- +# RabbitMQ consumer +# --------------------------------------------------------------------------- + +ROUTING_KEYS = ["rule.created", "rule.enabled", "rule.disabled", "rule.deleted"] + + +def start_mq_consumer(rule_manager, shutdown_event): + """Connect to RabbitMQ and consume rule lifecycle messages. + + Runs in a dedicated thread. Reconnects automatically on failure. + """ + queue_name = f"sensor.{SENSOR_REF}" + + while not shutdown_event.is_set(): + connection = None + try: + log_json("info", "connecting to RabbitMQ", url=MQ_URL) + params = pika.URLParameters(MQ_URL) + params.heartbeat = 30 + params.blocked_connection_timeout = 30 + connection = pika.BlockingConnection(params) + channel = connection.channel() + + # Declare exchange (idempotent) + channel.exchange_declare( + exchange=MQ_EXCHANGE, + exchange_type="topic", + durable=True, + ) + + # Declare and bind queue + channel.queue_declare(queue=queue_name, durable=True) + for rk in ROUTING_KEYS: + channel.queue_bind( + queue=queue_name, + exchange=MQ_EXCHANGE, + routing_key=rk, + ) + + log_json("info", "RabbitMQ connected, consuming", queue=queue_name) + + # Consume messages with a timeout so we can check shutdown_event + for method, _properties, body in channel.consume( + queue=queue_name, + inactivity_timeout=1, + ): + if shutdown_event.is_set(): + break + + if method is None: + # Inactivity timeout — just loop to check shutdown + continue + + try: + message = json.loads(body) + log_json( + "debug", + "received MQ message", + event_type=message.get("event_type"), + ) + rule_manager.handle_message(message) + except json.JSONDecodeError: + log_json( + "warn", + "invalid JSON in MQ message", + body=body.decode("utf-8", errors="replace")[:200], + ) + except Exception as exc: + log_json("error", "error processing MQ message", error=str(exc)) + + channel.basic_ack(delivery_tag=method.delivery_tag) + + except pika.exceptions.AMQPConnectionError as exc: + log_json( + "warn", "RabbitMQ connection error, retrying in 5s", error=str(exc) + ) + except Exception as exc: + log_json("error", "unexpected MQ error, retrying in 5s", error=str(exc)) + finally: + if connection and not connection.is_closed: + try: + connection.close() + except Exception: + pass + + # Wait before reconnecting (unless shutting down) + shutdown_event.wait(timeout=5) + + +# --------------------------------------------------------------------------- +# Bootstrap: fetch existing active rules on startup +# --------------------------------------------------------------------------- + + +def fetch_active_rules(rule_manager): + """Query the API for rules that reference our trigger type and start counters.""" + try: + resp = requests.get( + f"{API_URL}/api/v1/rules", + headers=_api_headers(), + params={"trigger_ref": TRIGGER_TYPE, "enabled": "true"}, + timeout=10, + ) + if resp.status_code != 200: + log_json("warn", "failed to fetch active rules", status=resp.status_code) + return + + body = resp.json() + rules = body.get("data", []) + if not isinstance(rules, list): + # Might be paginated — handle the common shapes + rules = [] + + for rule in rules: + rule_id = rule.get("id") + rule_ref = rule.get("ref", f"rule_{rule_id}") + enabled = rule.get("enabled", True) + if rule_id and enabled: + rule_manager.handle_message( + { + "event_type": "RuleCreated", + "rule_id": rule_id, + "rule_ref": rule_ref, + "trigger_type": TRIGGER_TYPE, + "trigger_params": rule.get("trigger_params", {}), + } + ) + + log_json("info", "bootstrapped active rules", count=len(rules)) + + except requests.RequestException as exc: + log_json("warn", "could not fetch active rules on startup", error=str(exc)) + + +# --------------------------------------------------------------------------- +# Main +# --------------------------------------------------------------------------- + + +def main(): + log_json("info", "sensor starting", sensor_ref=SENSOR_REF, api_url=API_URL) + + shutdown_event = threading.Event() + rule_manager = RuleManager() + + # Handle termination signals + def _shutdown(signum, _frame): + sig_name = ( + signal.Signals(signum).name if hasattr(signal, "Signals") else str(signum) + ) + log_json("info", "shutdown signal received", signal=sig_name) + shutdown_event.set() + + signal.signal(signal.SIGTERM, _shutdown) + signal.signal(signal.SIGINT, _shutdown) + + # Bootstrap: load already-active rules from the API + fetch_active_rules(rule_manager) + + # Start RabbitMQ consumer in a background thread + mq_thread = threading.Thread( + target=start_mq_consumer, + args=(rule_manager, shutdown_event), + name="mq-consumer", + daemon=True, + ) + mq_thread.start() + + log_json("info", "sensor running", active_rules=rule_manager.active_count) + + # Main thread just waits for shutdown + try: + while not shutdown_event.is_set(): + shutdown_event.wait(timeout=10) + if not shutdown_event.is_set(): + log_json("debug", "heartbeat", active_rules=rule_manager.active_count) + except KeyboardInterrupt: + log_json("info", "keyboard interrupt") + shutdown_event.set() + + # Graceful shutdown + log_json("info", "shutting down", active_rules=rule_manager.active_count) + rule_manager.stop_all() + + # Give the MQ thread a moment to clean up + mq_thread.join(timeout=5) + + log_json("info", "sensor stopped") + + +if __name__ == "__main__": + main() diff --git a/sensors/counter_sensor.yaml b/sensors/counter_sensor.yaml new file mode 100644 index 0000000..b77008a --- /dev/null +++ b/sensors/counter_sensor.yaml @@ -0,0 +1,75 @@ +# Counter Sensor +# Emits incrementing counter events, storing state in the Attune keystore +# +# Each subscribing rule gets its own independent counter, keyed by rule ref. +# The sensor listens for rule lifecycle events via RabbitMQ and manages +# per-rule timer loops that emit one event per second (configurable). + +ref: python_example.counter_sensor +label: "Counter Sensor" +description: "Emits periodic counter events with per-rule state stored in the Attune keystore" +enabled: true + +# Sensor runner type +runner_type: python + +# Entry point for sensor execution +entry_point: counter_sensor.py + +# Trigger types this sensor monitors +trigger_types: + - python_example.counter + +# Sensor configuration schema +parameters: + type: object + properties: + default_interval_seconds: + type: integer + description: "Default interval between counter emissions (in seconds)" + default: 1 + minimum: 1 + maximum: 3600 + key_prefix: + type: string + description: "Prefix for counter keys in the Attune keystore" + default: "python_example.counter" + +# Poll interval (how often the sensor checks for events) +poll_interval: 1 + +# Tags for categorization +tags: + - counter + - python + - example + - keystore + +# Metadata +meta: + builtin: false + system: false + description: | + The counter sensor demonstrates a stateful Python sensor that integrates + with the Attune keystore. It maintains a separate monotonically-increasing + counter for each subscribing rule, persisting the value in the keystore + so that counters survive sensor restarts. + + Features exercised: + - Python sensor lifecycle (startup, rule subscription, shutdown) + - RabbitMQ integration for rule lifecycle events + - Attune keystore API for persistent state (GET/PUT/POST /api/v1/keys) + - Per-rule scoped state via keystore key naming + - Periodic event emission via POST /events API + - Graceful shutdown with thread cleanup + +# Documentation +examples: + - description: "Counter firing every second (default)" + trigger_type: python_example.counter + trigger_config: {} + + - description: "Counter firing every 5 seconds" + trigger_type: python_example.counter + trigger_config: + interval_seconds: 5 diff --git a/triggers/counter.yaml b/triggers/counter.yaml new file mode 100644 index 0000000..4a48d0e --- /dev/null +++ b/triggers/counter.yaml @@ -0,0 +1,59 @@ +# Counter Trigger +# Fires periodically with an incrementing counter value + +ref: python_example.counter +label: "Counter Trigger" +description: "Fires at regular intervals with an incrementing counter value scoped per rule" +enabled: true + +# Trigger type +type: custom + +# Parameter schema - configuration for the trigger instance +parameters: + type: object + properties: + interval_seconds: + type: integer + description: "Seconds between each counter emission" + default: 1 + minimum: 1 + maximum: 3600 + required: [] + +# Payload schema - data emitted when trigger fires +output: + type: object + properties: + counter: + type: integer + description: "Current counter value (monotonically increasing per rule)" + rule_ref: + type: string + description: "Reference of the rule that this counter is scoped to" + sensor_ref: + type: string + description: "Reference to the sensor that generated this event" + fired_at: + type: string + format: date-time + description: "Timestamp when the trigger fired" + required: + - counter + - rule_ref + - fired_at + +# Tags for categorization +tags: + - counter + - example + - python + +# Documentation +examples: + - description: "Counter firing every second (default)" + parameters: {} + + - description: "Counter firing every 5 seconds" + parameters: + interval_seconds: 5