Initial commit: Node.js Example Pack for Attune

Includes:
- 3 Node.js actions (hello, http_example, read_counter)
- 1 counter trigger type
- 1 counter sensor (Node.js, keystore-backed, per-rule state)
- 1 example rule (count_and_log)
- package.json with node-fetch and amqplib
- README with full usage documentation
This commit is contained in:
2026-02-11 17:36:38 -06:00
commit 9072c93fe4
15 changed files with 1442 additions and 0 deletions

658
sensors/counter_sensor.js Normal file
View File

@@ -0,0 +1,658 @@
#!/usr/bin/env node
/**
* Counter Sensor - Node.js Example Pack
*
* A stateful Node.js sensor that demonstrates:
* - RabbitMQ integration (amqplib) for rule lifecycle events
* - Attune keystore API for persistent per-rule counter state
* - Periodic event emission via the Attune events API
* - Per-rule scoped counters with independent timers
* - Graceful shutdown with timer cleanup
*
* Environment Variables (provided by attune-sensor service):
* ATTUNE_API_URL - Base URL of the Attune API (e.g. http://localhost:8080)
* ATTUNE_API_TOKEN - JWT token for authenticating API calls
* ATTUNE_SENSOR_ID - Database ID of this sensor instance
* ATTUNE_SENSOR_REF - Reference name (nodejs_example.counter_sensor)
* ATTUNE_MQ_URL - RabbitMQ connection URL (e.g. amqp://localhost:5672)
* ATTUNE_MQ_EXCHANGE - RabbitMQ exchange name (default: attune)
* ATTUNE_LOG_LEVEL - Logging verbosity (default: info)
*/
"use strict";
const http = require("http");
const https = require("https");
const url = require("url");
const amqplib = require("amqplib");
// ---------------------------------------------------------------------------
// Configuration
// ---------------------------------------------------------------------------
const API_URL = process.env.ATTUNE_API_URL || "http://localhost:8080";
const API_TOKEN = process.env.ATTUNE_API_TOKEN || "";
const SENSOR_ID = process.env.ATTUNE_SENSOR_ID || "0";
const SENSOR_REF =
process.env.ATTUNE_SENSOR_REF || "nodejs_example.counter_sensor";
const MQ_URL = process.env.ATTUNE_MQ_URL || "amqp://localhost:5672";
const MQ_EXCHANGE = process.env.ATTUNE_MQ_EXCHANGE || "attune";
const LOG_LEVEL = (process.env.ATTUNE_LOG_LEVEL || "info").toLowerCase();
const TRIGGER_TYPE = "nodejs_example.counter";
const KEY_PREFIX = "nodejs_example.counter";
const DEFAULT_INTERVAL = 1; // seconds
const LOG_LEVELS = { debug: 0, info: 1, warn: 2, error: 3 };
const CURRENT_LOG_LEVEL = LOG_LEVELS[LOG_LEVEL] !== undefined ? LOG_LEVELS[LOG_LEVEL] : 1;
const ROUTING_KEYS = [
"rule.created",
"rule.enabled",
"rule.disabled",
"rule.deleted",
];
// ---------------------------------------------------------------------------
// Logging
// ---------------------------------------------------------------------------
function logJson(level, message, extra) {
if (LOG_LEVELS[level] === undefined || LOG_LEVELS[level] < CURRENT_LOG_LEVEL) {
return;
}
const entry = Object.assign(
{
timestamp: new Date().toISOString(),
level: level,
sensor: SENSOR_REF,
message: message,
},
extra || {}
);
process.stderr.write(JSON.stringify(entry) + "\n");
}
// ---------------------------------------------------------------------------
// HTTP helpers (built-in, no external deps for sensor API calls)
// ---------------------------------------------------------------------------
/**
* Make an HTTP request using Node built-in http/https modules.
* Returns a Promise resolving to { statusCode, body }.
*/
function httpRequest(method, reqUrl, body) {
return new Promise(function (resolve, reject) {
const parsed = new url.URL(reqUrl);
const isHttps = parsed.protocol === "https:";
const lib = isHttps ? https : http;
const options = {
method: method,
hostname: parsed.hostname,
port: parsed.port || (isHttps ? 443 : 80),
path: parsed.pathname + parsed.search,
headers: {
Authorization: "Bearer " + API_TOKEN,
"Content-Type": "application/json",
},
timeout: 5000,
};
const req = lib.request(options, function (res) {
const chunks = [];
res.on("data", function (chunk) {
chunks.push(chunk);
});
res.on("end", function () {
const raw = Buffer.concat(chunks).toString("utf8");
let parsed = null;
try {
parsed = JSON.parse(raw);
} catch (_e) {
// not JSON
}
resolve({ statusCode: res.statusCode, body: parsed, raw: raw });
});
});
req.on("error", function (err) {
reject(err);
});
req.on("timeout", function () {
req.destroy(new Error("Request timeout"));
});
if (body !== undefined && body !== null) {
req.write(typeof body === "string" ? body : JSON.stringify(body));
}
req.end();
});
}
// ---------------------------------------------------------------------------
// Keystore API helpers
// ---------------------------------------------------------------------------
async function keystoreGet(keyRef) {
try {
const res = await httpRequest(
"GET",
API_URL + "/api/v1/keys/" + encodeURIComponent(keyRef)
);
if (res.statusCode === 200 && res.body && res.body.data) {
return res.body.data.value;
}
if (res.statusCode === 404) {
return null;
}
logJson("warn", "keystore GET unexpected status", {
key: keyRef,
status: res.statusCode,
});
return null;
} catch (err) {
logJson("error", "keystore GET failed", {
key: keyRef,
error: err.message,
});
return null;
}
}
async function keystorePut(keyRef, value) {
try {
const res = await httpRequest(
"PUT",
API_URL + "/api/v1/keys/" + encodeURIComponent(keyRef),
{ value: String(value), name: "Counter: " + keyRef }
);
if (res.statusCode !== 200 && res.statusCode !== 201) {
logJson("warn", "keystore PUT unexpected status", {
key: keyRef,
status: res.statusCode,
});
return false;
}
return true;
} catch (err) {
logJson("error", "keystore PUT failed", {
key: keyRef,
error: err.message,
});
return false;
}
}
async function keystoreCreate(keyRef, value) {
try {
const res = await httpRequest("POST", API_URL + "/api/v1/keys", {
ref: keyRef,
owner_type: "sensor",
owner_sensor_ref: SENSOR_REF,
name: "Counter: " + keyRef,
value: String(value),
encrypted: false,
});
if (res.statusCode === 200 || res.statusCode === 201) {
return true;
}
if (res.statusCode === 409) {
// Key already exists (race condition) — update instead
return await keystorePut(keyRef, value);
}
logJson("warn", "keystore POST unexpected status", {
key: keyRef,
status: res.statusCode,
});
return false;
} catch (err) {
logJson("error", "keystore POST failed", {
key: keyRef,
error: err.message,
});
return false;
}
}
// ---------------------------------------------------------------------------
// Event emission
// ---------------------------------------------------------------------------
async function emitEvent(payload, ruleRef) {
const body = {
trigger_type: TRIGGER_TYPE,
payload: payload,
trigger_instance_id: "rule_" + ruleRef,
};
for (let attempt = 0; attempt < 3; attempt++) {
try {
const res = await httpRequest("POST", API_URL + "/api/v1/events", body);
if (res.statusCode === 200 || res.statusCode === 201) {
return true;
}
logJson("warn", "event POST unexpected status", {
status: res.statusCode,
attempt: attempt + 1,
rule_ref: ruleRef,
});
} catch (err) {
logJson("error", "event POST failed", {
error: err.message,
attempt: attempt + 1,
});
}
// Exponential backoff
await sleep(500 * Math.pow(2, attempt));
}
return false;
}
// ---------------------------------------------------------------------------
// Utilities
// ---------------------------------------------------------------------------
function sleep(ms) {
return new Promise(function (resolve) {
setTimeout(resolve, ms);
});
}
// ---------------------------------------------------------------------------
// Per-rule counter
// ---------------------------------------------------------------------------
class RuleCounter {
/**
* @param {number} ruleId
* @param {string} ruleRef
* @param {object} triggerParams
*/
constructor(ruleId, ruleRef, triggerParams) {
this.ruleId = ruleId;
this.ruleRef = ruleRef;
this.interval = (triggerParams && triggerParams.interval_seconds) || DEFAULT_INTERVAL;
this.keyRef = KEY_PREFIX + "." + ruleRef.replace(/\./g, "_");
this._timer = null;
this._stopped = false;
this._ticking = false;
}
start() {
if (this._timer !== null) {
return;
}
this._stopped = false;
this._scheduleTick();
logJson("info", "counter started", {
rule_id: this.ruleId,
rule_ref: this.ruleRef,
interval: this.interval,
});
}
stop() {
this._stopped = true;
if (this._timer !== null) {
clearTimeout(this._timer);
this._timer = null;
}
logJson("info", "counter stopped", {
rule_id: this.ruleId,
rule_ref: this.ruleRef,
});
}
_scheduleTick() {
if (this._stopped) return;
const self = this;
this._timer = setTimeout(function () {
self._timer = null;
if (self._stopped) return;
self._ticking = true;
self
._tick()
.catch(function (err) {
logJson("error", "counter tick failed", {
rule_ref: self.ruleRef,
error: err.message,
});
})
.then(function () {
self._ticking = false;
self._scheduleTick();
});
}, this.interval * 1000);
}
async _tick() {
// Read current value from keystore
const raw = await keystoreGet(this.keyRef);
let counter = 0;
if (raw !== null) {
const parsed = parseInt(raw, 10);
counter = isNaN(parsed) ? 0 : parsed;
}
// Increment
counter += 1;
// Write back to keystore
if (raw === null) {
await keystoreCreate(this.keyRef, counter);
} else {
await keystorePut(this.keyRef, counter);
}
// Build event payload
const payload = {
counter: counter,
rule_ref: this.ruleRef,
sensor_ref: SENSOR_REF,
fired_at: new Date().toISOString(),
};
// Emit event
await emitEvent(payload, this.ruleRef);
}
}
// ---------------------------------------------------------------------------
// Rule manager
// ---------------------------------------------------------------------------
class RuleManager {
constructor() {
/** @type {Map<number, RuleCounter>} */
this._rules = new Map();
}
handleMessage(message) {
const eventType = message.event_type || "";
const ruleId = message.rule_id;
const ruleRef = message.rule_ref || "rule_" + ruleId;
const triggerType = message.trigger_type || "";
const triggerParams = message.trigger_params || {};
// Only handle messages for our trigger type
if (triggerType && triggerType !== TRIGGER_TYPE) {
return;
}
if (eventType === "RuleCreated" || eventType === "RuleEnabled") {
this._startRule(ruleId, ruleRef, triggerParams);
} else if (eventType === "RuleDisabled" || eventType === "RuleDeleted") {
this._stopRule(ruleId);
} else {
logJson("debug", "ignoring unknown event_type", {
event_type: eventType,
});
}
}
_startRule(ruleId, ruleRef, triggerParams) {
if (this._rules.has(ruleId)) {
// Already tracking — restart with potentially new params
this._rules.get(ruleId).stop();
}
const rc = new RuleCounter(ruleId, ruleRef, triggerParams);
this._rules.set(ruleId, rc);
rc.start();
}
_stopRule(ruleId) {
const rc = this._rules.get(ruleId);
if (rc) {
rc.stop();
this._rules.delete(ruleId);
}
}
stopAll() {
for (const [_id, rc] of this._rules) {
rc.stop();
}
this._rules.clear();
}
get activeCount() {
return this._rules.size;
}
}
// ---------------------------------------------------------------------------
// RabbitMQ consumer
// ---------------------------------------------------------------------------
async function startMqConsumer(ruleManager, shutdownSignal) {
const queueName = "sensor." + SENSOR_REF;
while (!shutdownSignal.stopped) {
let connection = null;
let channel = null;
try {
logJson("info", "connecting to RabbitMQ", { url: MQ_URL });
connection = await amqplib.connect(MQ_URL, { heartbeat: 30 });
connection.on("error", function (err) {
logJson("warn", "RabbitMQ connection error", { error: err.message });
});
connection.on("close", function () {
logJson("debug", "RabbitMQ connection closed");
});
channel = await connection.createChannel();
// Declare exchange (idempotent)
await channel.assertExchange(MQ_EXCHANGE, "topic", { durable: true });
// Declare and bind queue
await channel.assertQueue(queueName, { durable: true });
for (const rk of ROUTING_KEYS) {
await channel.bindQueue(queueName, MQ_EXCHANGE, rk);
}
logJson("info", "RabbitMQ connected, consuming", { queue: queueName });
// Set up consumer
await channel.consume(
queueName,
function (msg) {
if (msg === null) return; // consumer cancelled
try {
const message = JSON.parse(msg.content.toString("utf8"));
logJson("debug", "received MQ message", {
event_type: message.event_type,
});
ruleManager.handleMessage(message);
} catch (err) {
if (err instanceof SyntaxError) {
logJson("warn", "invalid JSON in MQ message", {
body: msg.content.toString("utf8").slice(0, 200),
});
} else {
logJson("error", "error processing MQ message", {
error: err.message,
});
}
}
channel.ack(msg);
},
{ noAck: false }
);
// Wait until shutdown or connection drops
await new Promise(function (resolve) {
function onShutdown() {
resolve();
}
shutdownSignal.once("shutdown", onShutdown);
connection.once("close", function () {
shutdownSignal.removeListener("shutdown", onShutdown);
resolve();
});
});
} catch (err) {
logJson("warn", "RabbitMQ error, retrying in 5s", {
error: err.message,
});
} finally {
if (channel) {
try {
await channel.close();
} catch (_e) {
// ignore
}
}
if (connection) {
try {
await connection.close();
} catch (_e) {
// ignore
}
}
}
if (!shutdownSignal.stopped) {
await sleep(5000);
}
}
}
// ---------------------------------------------------------------------------
// Bootstrap: fetch existing active rules on startup
// ---------------------------------------------------------------------------
async function fetchActiveRules(ruleManager) {
try {
const reqUrl =
API_URL +
"/api/v1/rules?trigger_ref=" +
encodeURIComponent(TRIGGER_TYPE) +
"&enabled=true";
const res = await httpRequest("GET", reqUrl);
if (res.statusCode !== 200) {
logJson("warn", "failed to fetch active rules", {
status: res.statusCode,
});
return;
}
const rules =
res.body && Array.isArray(res.body.data) ? res.body.data : [];
for (const rule of rules) {
const ruleId = rule.id;
const ruleRef = rule.ref || "rule_" + ruleId;
const enabled = rule.enabled !== false;
if (ruleId && enabled) {
ruleManager.handleMessage({
event_type: "RuleCreated",
rule_id: ruleId,
rule_ref: ruleRef,
trigger_type: TRIGGER_TYPE,
trigger_params: rule.trigger_params || {},
});
}
}
logJson("info", "bootstrapped active rules", { count: rules.length });
} catch (err) {
logJson("warn", "could not fetch active rules on startup", {
error: err.message,
});
}
}
// ---------------------------------------------------------------------------
// Shutdown signal (simple EventEmitter-like)
// ---------------------------------------------------------------------------
const EventEmitter = require("events");
class ShutdownSignal extends EventEmitter {
constructor() {
super();
this.stopped = false;
}
trigger() {
if (this.stopped) return;
this.stopped = true;
this.emit("shutdown");
}
}
// ---------------------------------------------------------------------------
// Main
// ---------------------------------------------------------------------------
async function main() {
logJson("info", "sensor starting", {
sensor_ref: SENSOR_REF,
api_url: API_URL,
});
const shutdownSignal = new ShutdownSignal();
const ruleManager = new RuleManager();
// Handle termination signals
function onSignal(sig) {
logJson("info", "shutdown signal received", { signal: sig });
shutdownSignal.trigger();
}
process.on("SIGTERM", function () {
onSignal("SIGTERM");
});
process.on("SIGINT", function () {
onSignal("SIGINT");
});
// Bootstrap: load already-active rules from the API
await fetchActiveRules(ruleManager);
// Start RabbitMQ consumer (runs until shutdown)
const mqPromise = startMqConsumer(ruleManager, shutdownSignal);
logJson("info", "sensor running", {
active_rules: ruleManager.activeCount,
});
// Heartbeat loop in the main flow
while (!shutdownSignal.stopped) {
await new Promise(function (resolve) {
const timer = setTimeout(resolve, 10000);
shutdownSignal.once("shutdown", function () {
clearTimeout(timer);
resolve();
});
});
if (!shutdownSignal.stopped) {
logJson("debug", "heartbeat", {
active_rules: ruleManager.activeCount,
});
}
}
// Graceful shutdown
logJson("info", "shutting down", {
active_rules: ruleManager.activeCount,
});
ruleManager.stopAll();
// Give MQ consumer a moment to clean up
await Promise.race([mqPromise, sleep(5000)]);
logJson("info", "sensor stopped");
process.exit(0);
}
main().catch(function (err) {
logJson("error", "fatal error", { error: err.message, stack: err.stack });
process.exit(1);
});