commit 9072c93fe42da8ea7a18237c9a0fde26a40c345e Author: David Culbreth Date: Wed Feb 11 17:36:38 2026 -0600 Initial commit: Node.js Example Pack for Attune Includes: - 3 Node.js actions (hello, http_example, read_counter) - 1 counter trigger type - 1 counter sensor (Node.js, keystore-backed, per-rule state) - 1 example rule (count_and_log) - package.json with node-fetch and amqplib - README with full usage documentation diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..e3656b8 --- /dev/null +++ b/.gitignore @@ -0,0 +1,31 @@ +# Node.js +node_modules/ +npm-debug.log* +yarn-debug.log* +yarn-error.log* +.npm +package-lock.json +yarn.lock + +# Build output +dist/ +build/ + +# IDE +.idea/ +.vscode/ +*.swp +*.swo +*~ + +# OS +.DS_Store +Thumbs.db + +# Test artifacts +coverage/ +.nyc_output/ + +# Environment +.env +.env.local diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..c73338a --- /dev/null +++ b/LICENSE @@ -0,0 +1,20 @@ +MIT License + +Copyright (c) 2025 Attune Automation + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE diff --git a/README.md b/README.md new file mode 100644 index 0000000..40243b5 --- /dev/null +++ b/README.md @@ -0,0 +1,255 @@ +# Node.js Example Pack for Attune + +A complete example pack demonstrating Node.js actions, a stateful counter sensor with keystore integration, and HTTP requests using the `node-fetch` library. + +## Purpose + +This pack exercises as many parts of the Attune SDLC as possible: + +- **Node.js actions** with the wrapper-based execution model +- **Node.js sensor** with RabbitMQ rule lifecycle integration (amqplib) +- **Trigger types** with structured payload schemas +- **Rules** connecting triggers to actions with parameter mapping +- **Keystore integration** for persistent sensor state across restarts +- **External Node.js dependencies** (`node-fetch`, `amqplib`) +- **Per-rule scoped state** — each rule subscription gets its own counter + +## Components + +### Actions + +| Ref | Description | +|-----|-------------| +| `nodejs_example.hello` | Returns `"Hello, Node.js"` — minimal action | +| `nodejs_example.http_example` | Uses `node-fetch` to GET `https://example.com` | +| `nodejs_example.read_counter` | Consumes a counter value and returns a formatted message | + +### Triggers + +| Ref | Description | +|-----|-------------| +| `nodejs_example.counter` | Fires periodically with an incrementing counter per rule | + +### Sensors + +| Ref | Description | +|-----|-------------| +| `nodejs_example.counter_sensor` | Manages per-rule counters stored in the Attune keystore | + +### Rules + +| Ref | Description | +|-----|-------------| +| `nodejs_example.count_and_log` | Wires the counter trigger to the `read_counter` action | + +## Installation + +### As a Git Pack (recommended) + +```bash +# Install via the Attune CLI from a git repository +attune pack install https://github.com/attune-automation/pack-nodejs-example.git + +# Or via the API +curl -X POST "http://localhost:8080/api/v1/packs/install" \ + -H "Authorization: Bearer " \ + -H "Content-Type: application/json" \ + -d '{"source": "git", "url": "https://github.com/attune-automation/pack-nodejs-example.git"}' +``` + +### Local Development (submodule) + +If you're developing against the Attune repository: + +```bash +cd attune + +# Add as a git submodule in packs.examples/ +git submodule add packs.examples/nodejs_example + +# Or if you already have the directory, initialize it: +cd packs.examples/nodejs_example +git init +git remote add origin +``` + +### Manual / Volume Mount + +Copy or symlink the pack into your Attune packs directory: + +```bash +cp -r nodejs_example /opt/attune/packs/nodejs_example +# Then restart services to pick it up, or use the dev packs volume +``` + +## Dependencies + +Declared in `package.json`: + +- `node-fetch@^2.7.0` — HTTP client for the `http_example` action (CJS-compatible v2) +- `amqplib@^0.10.4` — RabbitMQ client for the counter sensor + +These are installed automatically when the pack is loaded by a Node.js worker with dependency management enabled. + +## How It Works + +### Counter Sensor Flow + +``` +┌──────────────────────────────────────────────────────────┐ +│ counter_sensor.js │ +│ │ +│ 1. Startup: fetch active rules from GET /api/v1/rules │ +│ 2. Listen: RabbitMQ queue sensor.nodejs_example.* │ +│ for rule.created / rule.enabled / rule.disabled / │ +│ rule.deleted messages │ +│ 3. Per active rule, spawn a setInterval timer: │ +│ │ +│ ┌────────────────────────────────────────┐ │ +│ │ Timer (1 tick/sec per rule) │ │ +│ │ │ │ +│ │ GET /api/v1/keys/{key} → read counter │ │ +│ │ counter += 1 │ │ +│ │ PUT /api/v1/keys/{key} → write back │ │ +│ │ POST /api/v1/events → emit event │ │ +│ └────────────────────────────────────────┘ │ +│ │ +│ 4. On shutdown: clearTimeout all timers gracefully │ +└──────────────────────────────────────────────────────────┘ +``` + +### Keystore Key Naming + +Each rule gets its own counter key: + +``` +nodejs_example.counter. +``` + +For example, a rule with ref `nodejs_example.count_and_log` stores its counter at: + +``` +nodejs_example.counter.nodejs_example_count_and_log +``` + +### Event Payload + +Each emitted event has this structure: + +```json +{ + "counter": 42, + "rule_ref": "nodejs_example.count_and_log", + "sensor_ref": "nodejs_example.counter_sensor", + "fired_at": "2025-01-15T12:00:00.000Z" +} +``` + +### Rule Parameter Mapping + +The included `count_and_log` rule maps trigger payload fields to action parameters: + +```yaml +action_params: + counter: "{{ trigger.payload.counter }}" + rule_ref: "{{ trigger.payload.rule_ref }}" +``` + +The `read_counter` action then returns: + +```json +{ + "message": "Counter value is 42 (from rule: nodejs_example.count_and_log)", + "counter": 42, + "rule_ref": "nodejs_example.count_and_log" +} +``` + +## Testing Individual Components + +### Test the hello action + +```bash +attune action execute nodejs_example.hello +# Output: {"message": "Hello, Node.js"} +``` + +### Test the HTTP action + +```bash +attune action execute nodejs_example.http_example +# Output: {"status_code": 200, "url": "https://example.com", ...} +``` + +### Test the read_counter action directly + +```bash +attune action execute nodejs_example.read_counter --param counter=99 --param rule_ref=test +# Output: {"message": "Counter value is 99 (from rule: test)", ...} +``` + +### Enable the rule to start the counter sensor loop + +```bash +# The rule is enabled by default when the pack is loaded. +# To manually enable/disable: +attune rule enable nodejs_example.count_and_log +attune rule disable nodejs_example.count_and_log + +# Monitor executions produced by the rule: +attune execution list --action nodejs_example.read_counter +``` + +## Configuration + +The pack supports the following configuration in `pack.yaml`: + +| Setting | Default | Description | +|---------|---------|-------------| +| `counter_key_prefix` | `nodejs_example.counter` | Prefix for keystore keys | + +The sensor supports these parameters: + +| Parameter | Default | Description | +|-----------|---------|-------------| +| `default_interval_seconds` | `1` | Default tick interval per rule | +| `key_prefix` | `nodejs_example.counter` | Keystore key prefix | + +The trigger supports per-rule configuration: + +| Parameter | Default | Description | +|-----------|---------|-------------| +| `interval_seconds` | `1` | Seconds between counter ticks | + +## Development + +```bash +# Install dependencies locally +npm install + +# Run the sensor manually for testing +export ATTUNE_API_URL=http://localhost:8080 +export ATTUNE_API_TOKEN= +export ATTUNE_MQ_URL=amqp://guest:guest@localhost:5672/ +node sensors/counter_sensor.js + +# Run an action manually (direct execution) +node actions/hello.js +``` + +## Comparison with Python Example Pack + +This pack is a direct Node.js equivalent of the `python_example` pack. Both exercise the same Attune features but use their respective language ecosystems: + +| Feature | Python Pack | Node.js Pack | +|---------|-------------|--------------| +| HTTP client | `requests` | `node-fetch` | +| RabbitMQ client | `pika` | `amqplib` | +| Concurrency model | `threading.Thread` + `threading.Event` | `setTimeout` + `EventEmitter` | +| Sensor API calls | `requests` (same lib as actions) | Built-in `http`/`https` (no extra deps) | +| Dependency file | `requirements.txt` | `package.json` | +| Module exports | `def run(**kwargs)` | `module.exports = { run }` | + +## License + +MIT \ No newline at end of file diff --git a/actions/hello.js b/actions/hello.js new file mode 100644 index 0000000..2c029d4 --- /dev/null +++ b/actions/hello.js @@ -0,0 +1,30 @@ +#!/usr/bin/env node +/** + * Hello Action - Node.js Example Pack + * + * A minimal Node.js action that returns "Hello, Node.js". + * Demonstrates the basic structure of a Node.js action in Attune. + * + * When invoked via the Node.js wrapper, the `run` export is called + * with the action parameters as its argument. + */ + +"use strict"; + +/** + * Return a simple greeting message. + * @param {object} params - Action parameters (unused). + * @returns {{ message: string }} + */ +function run(params) { + return { message: "Hello, Node.js" }; +} + +// Direct execution support (without the wrapper) +if (require.main === module) { + const result = run({}); + process.stdout.write(JSON.stringify({ result, status: "success" }) + "\n"); + process.exit(0); +} + +module.exports = { run }; diff --git a/actions/hello.yaml b/actions/hello.yaml new file mode 100644 index 0000000..8355bff --- /dev/null +++ b/actions/hello.yaml @@ -0,0 +1,45 @@ +# Hello Action +# Simple Node.js action that returns "Hello, Node.js" + +ref: nodejs_example.hello +label: "Hello Node.js" +description: "A simple Node.js action that returns a greeting message" +enabled: true + +# Runner type determines how the action is executed +runner_type: nodejs + +# Entry point is the JavaScript file to execute +entry_point: hello.js + +# Parameter delivery: stdin for secure parameter passing +parameter_delivery: stdin +parameter_format: json + +# Output format: json (structured data parsing enabled) +output_format: json + +# Action parameters schema (standard JSON Schema format) +parameters: + type: object + properties: + name: + type: string + description: "Optional name to include in greeting" + default: "Node.js" + +# Output schema +output_schema: + type: object + properties: + message: + type: string + description: "The greeting message" + required: + - message + +# Tags for categorization +tags: + - nodejs + - example + - greeting diff --git a/actions/http_example.js b/actions/http_example.js new file mode 100644 index 0000000..7cd7518 --- /dev/null +++ b/actions/http_example.js @@ -0,0 +1,26 @@ +#!/usr/bin/env node +/** + * HTTP Example Action - Node.js Example Pack + * + * Demonstrates using the `node-fetch` library to make an HTTP call to example.com. + * Receives parameters via the Node.js wrapper (stdin JSON with code_path). + */ + +const fetch = require("node-fetch"); + +async function run(params) { + const url = params.url || "https://example.com"; + + const response = await fetch(url, { timeout: 10000 }); + const text = await response.text(); + + return { + status_code: response.status, + url: response.url, + content_length: text.length, + snippet: text.slice(0, 500), + success: response.ok, + }; +} + +module.exports = { run }; diff --git a/actions/http_example.yaml b/actions/http_example.yaml new file mode 100644 index 0000000..03d7d5f --- /dev/null +++ b/actions/http_example.yaml @@ -0,0 +1,65 @@ +# HTTP Example Action +# Demonstrates using node-fetch to make HTTP calls + +ref: nodejs_example.http_example +label: "HTTP Example" +description: "Makes an HTTP GET request to example.com using the node-fetch library" +enabled: true + +# Runner type +runner_type: nodejs + +# Entry point +entry_point: http_example.js + +# Parameter delivery: stdin for secure parameter passing +parameter_delivery: stdin +parameter_format: json + +# Output format: json (structured data) +output_format: json + +# Action parameters schema +parameters: + type: object + properties: + url: + type: string + description: "URL to request (defaults to https://example.com)" + default: "https://example.com" + method: + type: string + description: "HTTP method" + default: "GET" + enum: + - GET + - POST + - PUT + - DELETE + +# Output schema +output_schema: + type: object + properties: + status_code: + type: integer + description: "HTTP response status code" + url: + type: string + description: "URL that was requested" + content_length: + type: integer + description: "Length of the response body in characters" + snippet: + type: string + description: "First 500 characters of the response body" + success: + type: boolean + description: "Whether the request succeeded (2xx status)" + +# Tags for categorization +tags: + - http + - nodejs + - example + - fetch diff --git a/actions/read_counter.js b/actions/read_counter.js new file mode 100644 index 0000000..677dbf9 --- /dev/null +++ b/actions/read_counter.js @@ -0,0 +1,30 @@ +#!/usr/bin/env node +/** + * Read Counter Action - Node.js Example Pack + * + * Consumes a counter value (typically from the counter sensor trigger payload) + * and returns a formatted message containing the counter value. + * + * Parameters are delivered via stdin as JSON from the Node.js wrapper. + */ + +"use strict"; + +/** + * @param {object} params + * @param {number} params.counter - The counter value from the trigger payload. + * @param {string} params.rule_ref - The rule reference the counter is scoped to. + * @returns {object} Formatted message and raw counter value. + */ +function run(params) { + const counter = params.counter !== undefined ? params.counter : 0; + const ruleRef = params.rule_ref || "unknown"; + + return { + message: `Counter value is ${counter} (from rule: ${ruleRef})`, + counter: counter, + rule_ref: ruleRef, + }; +} + +module.exports = { run }; diff --git a/actions/read_counter.yaml b/actions/read_counter.yaml new file mode 100644 index 0000000..d3a8fec --- /dev/null +++ b/actions/read_counter.yaml @@ -0,0 +1,58 @@ +# Read Counter Action +# Consumes a counter value and returns a formatted message + +ref: nodejs_example.read_counter +label: "Read Counter" +description: "Receives a counter value (typically from the counter trigger) and returns a formatted message containing it" +enabled: true + +# Runner type +runner_type: nodejs + +# Entry point +entry_point: read_counter.js + +# Parameter delivery: stdin for secure parameter passing +parameter_delivery: stdin +parameter_format: json + +# Output format: json (structured data) +output_format: json + +# Action parameters schema +parameters: + type: object + properties: + counter: + type: integer + description: "The counter value to consume" + rule_ref: + type: string + description: "The rule reference the counter is scoped to" + default: "" + required: + - counter + +# Output schema +output_schema: + type: object + properties: + message: + type: string + description: "Formatted message containing the counter value" + counter: + type: integer + description: "The counter value that was consumed" + rule_ref: + type: string + description: "The rule reference the counter is scoped to" + required: + - message + - counter + +# Tags for categorization +tags: + - counter + - example + - nodejs + - consumer diff --git a/pack.yaml b/pack.yaml new file mode 100644 index 0000000..173a4af --- /dev/null +++ b/pack.yaml @@ -0,0 +1,55 @@ +# Node.js Example Pack +# Demonstrates Node.js actions, sensors, triggers, and keystore integration + +ref: nodejs_example +label: "Node.js Example Pack" +description: "Example pack demonstrating Node.js actions, a counter sensor with keystore integration, and HTTP requests" +version: "1.0.0" +author: "Attune Team" +email: "support@attune.io" + +system: false +enabled: true + +# Pack configuration schema +conf_schema: + type: object + properties: + counter_key_prefix: + type: string + description: "Prefix for counter keys in the keystore" + default: "nodejs_example.counter" + +# Default pack configuration +config: + counter_key_prefix: "nodejs_example.counter" + +# Pack metadata +meta: + category: "examples" + keywords: + - "nodejs" + - "javascript" + - "examples" + - "counter" + - "sensor" + - "keystore" + - "http" + node_dependencies: + - "node-fetch@^3.3.0" + - "amqplib@^0.10.0" + documentation_url: "https://github.com/attune-automation/pack-nodejs-example" + repository_url: "https://github.com/attune-automation/pack-nodejs-example" + +# Tags for categorization +tags: + - nodejs + - javascript + - examples + - counter + - sensor + - http + +# Runtime dependencies +runtime_deps: + - nodejs diff --git a/package.json b/package.json new file mode 100644 index 0000000..ce1867f --- /dev/null +++ b/package.json @@ -0,0 +1,10 @@ +{ + "name": "attune-pack-nodejs-example", + "version": "1.0.0", + "description": "Example Attune pack demonstrating Node.js actions, a counter sensor with keystore integration, and HTTP requests", + "private": true, + "dependencies": { + "node-fetch": "^2.7.0", + "amqplib": "^0.10.4" + } +} diff --git a/rules/count_and_log.yaml b/rules/count_and_log.yaml new file mode 100644 index 0000000..92c3aa9 --- /dev/null +++ b/rules/count_and_log.yaml @@ -0,0 +1,25 @@ +# Count and Log Rule +# Connects the counter sensor trigger to the read_counter action +# +# When the counter sensor fires, this rule passes the counter value +# and rule reference from the trigger payload into the read_counter action. + +ref: nodejs_example.count_and_log +pack_ref: nodejs_example +label: "Count and Log" +description: "Fires on each counter tick and logs the current counter value" + +# Link trigger to action +trigger_ref: nodejs_example.counter +action_ref: nodejs_example.read_counter + +# Map trigger payload fields into action parameters +action_params: + counter: "{{ trigger.payload.counter }}" + rule_ref: "{{ trigger.payload.rule_ref }}" + +# No conditions — fire on every counter event +conditions: {} + +# Active by default +enabled: true diff --git a/sensors/counter_sensor.js b/sensors/counter_sensor.js new file mode 100644 index 0000000..08148ac --- /dev/null +++ b/sensors/counter_sensor.js @@ -0,0 +1,658 @@ +#!/usr/bin/env node +/** + * Counter Sensor - Node.js Example Pack + * + * A stateful Node.js sensor that demonstrates: + * - RabbitMQ integration (amqplib) for rule lifecycle events + * - Attune keystore API for persistent per-rule counter state + * - Periodic event emission via the Attune events API + * - Per-rule scoped counters with independent timers + * - Graceful shutdown with timer cleanup + * + * Environment Variables (provided by attune-sensor service): + * ATTUNE_API_URL - Base URL of the Attune API (e.g. http://localhost:8080) + * ATTUNE_API_TOKEN - JWT token for authenticating API calls + * ATTUNE_SENSOR_ID - Database ID of this sensor instance + * ATTUNE_SENSOR_REF - Reference name (nodejs_example.counter_sensor) + * ATTUNE_MQ_URL - RabbitMQ connection URL (e.g. amqp://localhost:5672) + * ATTUNE_MQ_EXCHANGE - RabbitMQ exchange name (default: attune) + * ATTUNE_LOG_LEVEL - Logging verbosity (default: info) + */ + +"use strict"; + +const http = require("http"); +const https = require("https"); +const url = require("url"); +const amqplib = require("amqplib"); + +// --------------------------------------------------------------------------- +// Configuration +// --------------------------------------------------------------------------- + +const API_URL = process.env.ATTUNE_API_URL || "http://localhost:8080"; +const API_TOKEN = process.env.ATTUNE_API_TOKEN || ""; +const SENSOR_ID = process.env.ATTUNE_SENSOR_ID || "0"; +const SENSOR_REF = + process.env.ATTUNE_SENSOR_REF || "nodejs_example.counter_sensor"; +const MQ_URL = process.env.ATTUNE_MQ_URL || "amqp://localhost:5672"; +const MQ_EXCHANGE = process.env.ATTUNE_MQ_EXCHANGE || "attune"; +const LOG_LEVEL = (process.env.ATTUNE_LOG_LEVEL || "info").toLowerCase(); + +const TRIGGER_TYPE = "nodejs_example.counter"; +const KEY_PREFIX = "nodejs_example.counter"; +const DEFAULT_INTERVAL = 1; // seconds + +const LOG_LEVELS = { debug: 0, info: 1, warn: 2, error: 3 }; +const CURRENT_LOG_LEVEL = LOG_LEVELS[LOG_LEVEL] !== undefined ? LOG_LEVELS[LOG_LEVEL] : 1; + +const ROUTING_KEYS = [ + "rule.created", + "rule.enabled", + "rule.disabled", + "rule.deleted", +]; + +// --------------------------------------------------------------------------- +// Logging +// --------------------------------------------------------------------------- + +function logJson(level, message, extra) { + if (LOG_LEVELS[level] === undefined || LOG_LEVELS[level] < CURRENT_LOG_LEVEL) { + return; + } + const entry = Object.assign( + { + timestamp: new Date().toISOString(), + level: level, + sensor: SENSOR_REF, + message: message, + }, + extra || {} + ); + process.stderr.write(JSON.stringify(entry) + "\n"); +} + +// --------------------------------------------------------------------------- +// HTTP helpers (built-in, no external deps for sensor API calls) +// --------------------------------------------------------------------------- + +/** + * Make an HTTP request using Node built-in http/https modules. + * Returns a Promise resolving to { statusCode, body }. + */ +function httpRequest(method, reqUrl, body) { + return new Promise(function (resolve, reject) { + const parsed = new url.URL(reqUrl); + const isHttps = parsed.protocol === "https:"; + const lib = isHttps ? https : http; + + const options = { + method: method, + hostname: parsed.hostname, + port: parsed.port || (isHttps ? 443 : 80), + path: parsed.pathname + parsed.search, + headers: { + Authorization: "Bearer " + API_TOKEN, + "Content-Type": "application/json", + }, + timeout: 5000, + }; + + const req = lib.request(options, function (res) { + const chunks = []; + res.on("data", function (chunk) { + chunks.push(chunk); + }); + res.on("end", function () { + const raw = Buffer.concat(chunks).toString("utf8"); + let parsed = null; + try { + parsed = JSON.parse(raw); + } catch (_e) { + // not JSON + } + resolve({ statusCode: res.statusCode, body: parsed, raw: raw }); + }); + }); + + req.on("error", function (err) { + reject(err); + }); + + req.on("timeout", function () { + req.destroy(new Error("Request timeout")); + }); + + if (body !== undefined && body !== null) { + req.write(typeof body === "string" ? body : JSON.stringify(body)); + } + req.end(); + }); +} + +// --------------------------------------------------------------------------- +// Keystore API helpers +// --------------------------------------------------------------------------- + +async function keystoreGet(keyRef) { + try { + const res = await httpRequest( + "GET", + API_URL + "/api/v1/keys/" + encodeURIComponent(keyRef) + ); + if (res.statusCode === 200 && res.body && res.body.data) { + return res.body.data.value; + } + if (res.statusCode === 404) { + return null; + } + logJson("warn", "keystore GET unexpected status", { + key: keyRef, + status: res.statusCode, + }); + return null; + } catch (err) { + logJson("error", "keystore GET failed", { + key: keyRef, + error: err.message, + }); + return null; + } +} + +async function keystorePut(keyRef, value) { + try { + const res = await httpRequest( + "PUT", + API_URL + "/api/v1/keys/" + encodeURIComponent(keyRef), + { value: String(value), name: "Counter: " + keyRef } + ); + if (res.statusCode !== 200 && res.statusCode !== 201) { + logJson("warn", "keystore PUT unexpected status", { + key: keyRef, + status: res.statusCode, + }); + return false; + } + return true; + } catch (err) { + logJson("error", "keystore PUT failed", { + key: keyRef, + error: err.message, + }); + return false; + } +} + +async function keystoreCreate(keyRef, value) { + try { + const res = await httpRequest("POST", API_URL + "/api/v1/keys", { + ref: keyRef, + owner_type: "sensor", + owner_sensor_ref: SENSOR_REF, + name: "Counter: " + keyRef, + value: String(value), + encrypted: false, + }); + if (res.statusCode === 200 || res.statusCode === 201) { + return true; + } + if (res.statusCode === 409) { + // Key already exists (race condition) — update instead + return await keystorePut(keyRef, value); + } + logJson("warn", "keystore POST unexpected status", { + key: keyRef, + status: res.statusCode, + }); + return false; + } catch (err) { + logJson("error", "keystore POST failed", { + key: keyRef, + error: err.message, + }); + return false; + } +} + +// --------------------------------------------------------------------------- +// Event emission +// --------------------------------------------------------------------------- + +async function emitEvent(payload, ruleRef) { + const body = { + trigger_type: TRIGGER_TYPE, + payload: payload, + trigger_instance_id: "rule_" + ruleRef, + }; + + for (let attempt = 0; attempt < 3; attempt++) { + try { + const res = await httpRequest("POST", API_URL + "/api/v1/events", body); + if (res.statusCode === 200 || res.statusCode === 201) { + return true; + } + logJson("warn", "event POST unexpected status", { + status: res.statusCode, + attempt: attempt + 1, + rule_ref: ruleRef, + }); + } catch (err) { + logJson("error", "event POST failed", { + error: err.message, + attempt: attempt + 1, + }); + } + // Exponential backoff + await sleep(500 * Math.pow(2, attempt)); + } + return false; +} + +// --------------------------------------------------------------------------- +// Utilities +// --------------------------------------------------------------------------- + +function sleep(ms) { + return new Promise(function (resolve) { + setTimeout(resolve, ms); + }); +} + +// --------------------------------------------------------------------------- +// Per-rule counter +// --------------------------------------------------------------------------- + +class RuleCounter { + /** + * @param {number} ruleId + * @param {string} ruleRef + * @param {object} triggerParams + */ + constructor(ruleId, ruleRef, triggerParams) { + this.ruleId = ruleId; + this.ruleRef = ruleRef; + this.interval = (triggerParams && triggerParams.interval_seconds) || DEFAULT_INTERVAL; + this.keyRef = KEY_PREFIX + "." + ruleRef.replace(/\./g, "_"); + this._timer = null; + this._stopped = false; + this._ticking = false; + } + + start() { + if (this._timer !== null) { + return; + } + this._stopped = false; + this._scheduleTick(); + logJson("info", "counter started", { + rule_id: this.ruleId, + rule_ref: this.ruleRef, + interval: this.interval, + }); + } + + stop() { + this._stopped = true; + if (this._timer !== null) { + clearTimeout(this._timer); + this._timer = null; + } + logJson("info", "counter stopped", { + rule_id: this.ruleId, + rule_ref: this.ruleRef, + }); + } + + _scheduleTick() { + if (this._stopped) return; + const self = this; + this._timer = setTimeout(function () { + self._timer = null; + if (self._stopped) return; + self._ticking = true; + self + ._tick() + .catch(function (err) { + logJson("error", "counter tick failed", { + rule_ref: self.ruleRef, + error: err.message, + }); + }) + .then(function () { + self._ticking = false; + self._scheduleTick(); + }); + }, this.interval * 1000); + } + + async _tick() { + // Read current value from keystore + const raw = await keystoreGet(this.keyRef); + let counter = 0; + if (raw !== null) { + const parsed = parseInt(raw, 10); + counter = isNaN(parsed) ? 0 : parsed; + } + + // Increment + counter += 1; + + // Write back to keystore + if (raw === null) { + await keystoreCreate(this.keyRef, counter); + } else { + await keystorePut(this.keyRef, counter); + } + + // Build event payload + const payload = { + counter: counter, + rule_ref: this.ruleRef, + sensor_ref: SENSOR_REF, + fired_at: new Date().toISOString(), + }; + + // Emit event + await emitEvent(payload, this.ruleRef); + } +} + +// --------------------------------------------------------------------------- +// Rule manager +// --------------------------------------------------------------------------- + +class RuleManager { + constructor() { + /** @type {Map} */ + this._rules = new Map(); + } + + handleMessage(message) { + const eventType = message.event_type || ""; + const ruleId = message.rule_id; + const ruleRef = message.rule_ref || "rule_" + ruleId; + const triggerType = message.trigger_type || ""; + const triggerParams = message.trigger_params || {}; + + // Only handle messages for our trigger type + if (triggerType && triggerType !== TRIGGER_TYPE) { + return; + } + + if (eventType === "RuleCreated" || eventType === "RuleEnabled") { + this._startRule(ruleId, ruleRef, triggerParams); + } else if (eventType === "RuleDisabled" || eventType === "RuleDeleted") { + this._stopRule(ruleId); + } else { + logJson("debug", "ignoring unknown event_type", { + event_type: eventType, + }); + } + } + + _startRule(ruleId, ruleRef, triggerParams) { + if (this._rules.has(ruleId)) { + // Already tracking — restart with potentially new params + this._rules.get(ruleId).stop(); + } + const rc = new RuleCounter(ruleId, ruleRef, triggerParams); + this._rules.set(ruleId, rc); + rc.start(); + } + + _stopRule(ruleId) { + const rc = this._rules.get(ruleId); + if (rc) { + rc.stop(); + this._rules.delete(ruleId); + } + } + + stopAll() { + for (const [_id, rc] of this._rules) { + rc.stop(); + } + this._rules.clear(); + } + + get activeCount() { + return this._rules.size; + } +} + +// --------------------------------------------------------------------------- +// RabbitMQ consumer +// --------------------------------------------------------------------------- + +async function startMqConsumer(ruleManager, shutdownSignal) { + const queueName = "sensor." + SENSOR_REF; + + while (!shutdownSignal.stopped) { + let connection = null; + let channel = null; + try { + logJson("info", "connecting to RabbitMQ", { url: MQ_URL }); + connection = await amqplib.connect(MQ_URL, { heartbeat: 30 }); + + connection.on("error", function (err) { + logJson("warn", "RabbitMQ connection error", { error: err.message }); + }); + connection.on("close", function () { + logJson("debug", "RabbitMQ connection closed"); + }); + + channel = await connection.createChannel(); + + // Declare exchange (idempotent) + await channel.assertExchange(MQ_EXCHANGE, "topic", { durable: true }); + + // Declare and bind queue + await channel.assertQueue(queueName, { durable: true }); + for (const rk of ROUTING_KEYS) { + await channel.bindQueue(queueName, MQ_EXCHANGE, rk); + } + + logJson("info", "RabbitMQ connected, consuming", { queue: queueName }); + + // Set up consumer + await channel.consume( + queueName, + function (msg) { + if (msg === null) return; // consumer cancelled + + try { + const message = JSON.parse(msg.content.toString("utf8")); + logJson("debug", "received MQ message", { + event_type: message.event_type, + }); + ruleManager.handleMessage(message); + } catch (err) { + if (err instanceof SyntaxError) { + logJson("warn", "invalid JSON in MQ message", { + body: msg.content.toString("utf8").slice(0, 200), + }); + } else { + logJson("error", "error processing MQ message", { + error: err.message, + }); + } + } + + channel.ack(msg); + }, + { noAck: false } + ); + + // Wait until shutdown or connection drops + await new Promise(function (resolve) { + function onShutdown() { + resolve(); + } + shutdownSignal.once("shutdown", onShutdown); + connection.once("close", function () { + shutdownSignal.removeListener("shutdown", onShutdown); + resolve(); + }); + }); + } catch (err) { + logJson("warn", "RabbitMQ error, retrying in 5s", { + error: err.message, + }); + } finally { + if (channel) { + try { + await channel.close(); + } catch (_e) { + // ignore + } + } + if (connection) { + try { + await connection.close(); + } catch (_e) { + // ignore + } + } + } + + if (!shutdownSignal.stopped) { + await sleep(5000); + } + } +} + +// --------------------------------------------------------------------------- +// Bootstrap: fetch existing active rules on startup +// --------------------------------------------------------------------------- + +async function fetchActiveRules(ruleManager) { + try { + const reqUrl = + API_URL + + "/api/v1/rules?trigger_ref=" + + encodeURIComponent(TRIGGER_TYPE) + + "&enabled=true"; + const res = await httpRequest("GET", reqUrl); + + if (res.statusCode !== 200) { + logJson("warn", "failed to fetch active rules", { + status: res.statusCode, + }); + return; + } + + const rules = + res.body && Array.isArray(res.body.data) ? res.body.data : []; + + for (const rule of rules) { + const ruleId = rule.id; + const ruleRef = rule.ref || "rule_" + ruleId; + const enabled = rule.enabled !== false; + if (ruleId && enabled) { + ruleManager.handleMessage({ + event_type: "RuleCreated", + rule_id: ruleId, + rule_ref: ruleRef, + trigger_type: TRIGGER_TYPE, + trigger_params: rule.trigger_params || {}, + }); + } + } + + logJson("info", "bootstrapped active rules", { count: rules.length }); + } catch (err) { + logJson("warn", "could not fetch active rules on startup", { + error: err.message, + }); + } +} + +// --------------------------------------------------------------------------- +// Shutdown signal (simple EventEmitter-like) +// --------------------------------------------------------------------------- + +const EventEmitter = require("events"); + +class ShutdownSignal extends EventEmitter { + constructor() { + super(); + this.stopped = false; + } + + trigger() { + if (this.stopped) return; + this.stopped = true; + this.emit("shutdown"); + } +} + +// --------------------------------------------------------------------------- +// Main +// --------------------------------------------------------------------------- + +async function main() { + logJson("info", "sensor starting", { + sensor_ref: SENSOR_REF, + api_url: API_URL, + }); + + const shutdownSignal = new ShutdownSignal(); + const ruleManager = new RuleManager(); + + // Handle termination signals + function onSignal(sig) { + logJson("info", "shutdown signal received", { signal: sig }); + shutdownSignal.trigger(); + } + process.on("SIGTERM", function () { + onSignal("SIGTERM"); + }); + process.on("SIGINT", function () { + onSignal("SIGINT"); + }); + + // Bootstrap: load already-active rules from the API + await fetchActiveRules(ruleManager); + + // Start RabbitMQ consumer (runs until shutdown) + const mqPromise = startMqConsumer(ruleManager, shutdownSignal); + + logJson("info", "sensor running", { + active_rules: ruleManager.activeCount, + }); + + // Heartbeat loop in the main flow + while (!shutdownSignal.stopped) { + await new Promise(function (resolve) { + const timer = setTimeout(resolve, 10000); + shutdownSignal.once("shutdown", function () { + clearTimeout(timer); + resolve(); + }); + }); + if (!shutdownSignal.stopped) { + logJson("debug", "heartbeat", { + active_rules: ruleManager.activeCount, + }); + } + } + + // Graceful shutdown + logJson("info", "shutting down", { + active_rules: ruleManager.activeCount, + }); + ruleManager.stopAll(); + + // Give MQ consumer a moment to clean up + await Promise.race([mqPromise, sleep(5000)]); + + logJson("info", "sensor stopped"); + process.exit(0); +} + +main().catch(function (err) { + logJson("error", "fatal error", { error: err.message, stack: err.stack }); + process.exit(1); +}); diff --git a/sensors/counter_sensor.yaml b/sensors/counter_sensor.yaml new file mode 100644 index 0000000..420c0bc --- /dev/null +++ b/sensors/counter_sensor.yaml @@ -0,0 +1,75 @@ +# Counter Sensor +# Emits incrementing counter events, storing state in the Attune keystore +# +# Each subscribing rule gets its own independent counter, keyed by rule ref. +# The sensor listens for rule lifecycle events via RabbitMQ and manages +# per-rule timer loops that emit one event per second (configurable). + +ref: nodejs_example.counter_sensor +label: "Counter Sensor" +description: "Emits periodic counter events with per-rule state stored in the Attune keystore" +enabled: true + +# Sensor runner type +runner_type: nodejs + +# Entry point for sensor execution +entry_point: counter_sensor.js + +# Trigger types this sensor monitors +trigger_types: + - nodejs_example.counter + +# Sensor configuration schema +parameters: + type: object + properties: + default_interval_seconds: + type: integer + description: "Default interval between counter emissions (in seconds)" + default: 1 + minimum: 1 + maximum: 3600 + key_prefix: + type: string + description: "Prefix for counter keys in the Attune keystore" + default: "nodejs_example.counter" + +# Poll interval (how often the sensor checks for events) +poll_interval: 1 + +# Tags for categorization +tags: + - counter + - nodejs + - example + - keystore + +# Metadata +meta: + builtin: false + system: false + description: | + The counter sensor demonstrates a stateful Node.js sensor that integrates + with the Attune keystore. It maintains a separate monotonically-increasing + counter for each subscribing rule, persisting the value in the keystore + so that counters survive sensor restarts. + + Features exercised: + - Node.js sensor lifecycle (startup, rule subscription, shutdown) + - RabbitMQ integration (amqplib) for rule lifecycle events + - Attune keystore API for persistent state (GET/PUT/POST /api/v1/keys) + - Per-rule scoped state via keystore key naming + - Periodic event emission via POST /events API + - Graceful shutdown with timer cleanup + +# Documentation +examples: + - description: "Counter firing every second (default)" + trigger_type: nodejs_example.counter + trigger_config: {} + + - description: "Counter firing every 5 seconds" + trigger_type: nodejs_example.counter + trigger_config: + interval_seconds: 5 diff --git a/triggers/counter.yaml b/triggers/counter.yaml new file mode 100644 index 0000000..1b3b3be --- /dev/null +++ b/triggers/counter.yaml @@ -0,0 +1,59 @@ +# Counter Trigger +# Fires periodically with an incrementing counter value + +ref: nodejs_example.counter +label: "Counter Trigger" +description: "Fires at regular intervals with an incrementing counter value scoped per rule" +enabled: true + +# Trigger type +type: custom + +# Parameter schema - configuration for the trigger instance +parameters: + type: object + properties: + interval_seconds: + type: integer + description: "Seconds between each counter emission" + default: 1 + minimum: 1 + maximum: 3600 + required: [] + +# Payload schema - data emitted when trigger fires +output: + type: object + properties: + counter: + type: integer + description: "Current counter value (monotonically increasing per rule)" + rule_ref: + type: string + description: "Reference of the rule that this counter is scoped to" + sensor_ref: + type: string + description: "Reference to the sensor that generated this event" + fired_at: + type: string + format: date-time + description: "Timestamp when the trigger fired" + required: + - counter + - rule_ref + - fired_at + +# Tags for categorization +tags: + - counter + - example + - nodejs + +# Documentation +examples: + - description: "Counter firing every second (default)" + parameters: {} + + - description: "Counter firing every 5 seconds" + parameters: + interval_seconds: 5