Sensor Lifecycle Management
Overview
Attune starts and stops sensors on demand to reduce resource usage and improve security. A sensor is started only when at least one enabled rule subscribes to its trigger, and it is stopped (and its API token revoked) once no enabled rules remain.
This ensures:
- Resource efficiency: No CPU/memory wasted on sensors without consumers
- Security: API tokens are revoked when sensors are not in use
- Cost optimization: Reduced cloud infrastructure costs
- Clean architecture: Sensors operate on-demand based on actual usage
Architecture
Components
- SensorManager - Manages sensor process lifecycle
- RuleLifecycleListener - Monitors rule creation/enable/disable events via RabbitMQ
- Token Management - Issues and revokes sensor authentication tokens
- Database Queries - Tracks active rule counts per sensor
Data Flow
Rule Change Event (RabbitMQ)
↓
RuleLifecycleListener
↓
SensorManager.handle_rule_change()
↓
Check active rule count for sensor
↓
┌─────────────────────────────┐
│ Active rules > 0?           │
├─────────────────────────────┤
│ YES → Sensor not running?   │
│       ├─ Issue token        │
│       ├─ Start sensor       │
│       └─ Register process   │
│                             │
│ NO → Sensor running?        │
│       ├─ Stop sensor        │
│       ├─ Revoke token       │
│       └─ Cleanup process    │
└─────────────────────────────┘
Rule-Sensor-Trigger Relationship
Database Schema
-- A sensor monitors a specific trigger type
sensor.trigger → trigger.id
-- A rule subscribes to a trigger
rule.trigger → trigger.id
-- Relationship: sensor ← trigger → rule(s)
-- Multiple rules can subscribe to the same trigger
-- One sensor can serve multiple rules (all sharing the trigger type)
Active Rule Query
To determine if a sensor should be running:
SELECT COUNT(*)
FROM rule
WHERE trigger = (SELECT trigger FROM sensor WHERE id = $sensor_id)
AND enabled = TRUE;
- If count > 0: Sensor should be running
- If count = 0: Sensor should be stopped
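The same decision can be sketched in memory, which is handy for unit tests that should not need a database. The Rule and Sensor structs below are hypothetical stand-ins for the table rows, not Attune's actual types.

```rust
/// Hypothetical in-memory stand-ins for the `rule` and `sensor` rows;
/// field names follow the SQL columns above.
pub struct Rule {
    pub id: i64,
    pub trigger: i64,
    pub enabled: bool,
}

pub struct Sensor {
    pub id: i64,
    pub trigger: i64,
}

/// Counts enabled rules subscribed to the sensor's trigger, mirroring
/// `SELECT COUNT(*) FROM rule WHERE trigger = ... AND enabled = TRUE`.
pub fn active_rule_count(sensor: &Sensor, rules: &[Rule]) -> usize {
    rules
        .iter()
        .filter(|r| r.trigger == sensor.trigger && r.enabled)
        .count()
}

/// A sensor should be running exactly when the count is non-zero.
pub fn should_be_running(sensor: &Sensor, rules: &[Rule]) -> bool {
    active_rule_count(sensor, rules) > 0
}
```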
Lifecycle States
Sensor States
- STOPPED - Sensor process not running, no token issued
- STARTING - Token issued, process spawning
- RUNNING - Process active, monitoring for trigger events
- STOPPING - Process shutting down, token being revoked
- ERROR - Failed to start/stop (requires manual intervention)
State Transitions
STOPPED ──(rule created/enabled)──> STARTING ──(process ready)──> RUNNING
                                                                     │
                                                                     │
STOPPED <──(token revoked)──< STOPPING <──(rule disabled/deleted)────┘
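The transitions above can be encoded as a small state machine. This is a sketch: the LifecycleEvent names are hypothetical, and sending failures during start or stop to Error follows the ERROR state description rather than a confirmed implementation.

```rust
/// Sensor lifecycle states from the list above.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SensorState {
    Stopped,
    Starting,
    Running,
    Stopping,
    Error,
}

/// Hypothetical events that drive transitions.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LifecycleEvent {
    RulesActivated,   // rule created/enabled
    ProcessReady,     // sensor process is up
    RulesDeactivated, // last rule disabled/deleted
    TokenRevoked,     // shutdown finished
    Failure,          // start or stop failed
}

/// Returns the next state, or None if the event is not valid in `state`.
pub fn transition(state: SensorState, event: LifecycleEvent) -> Option<SensorState> {
    use LifecycleEvent::*;
    use SensorState::*;
    match (state, event) {
        (Stopped, RulesActivated) => Some(Starting),
        (Starting, ProcessReady) => Some(Running),
        (Running, RulesDeactivated) => Some(Stopping),
        (Stopping, TokenRevoked) => Some(Stopped),
        // Failed start/stop parks the sensor until manual intervention
        (Starting, Failure) | (Stopping, Failure) => Some(Error),
        _ => None,
    }
}
```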
Implementation Details
SensorManager Methods
start_sensor(sensor_id)
- Query database for sensor configuration
- Issue service account token via API:
  - Type: sensor
  - Scope: Sensor-specific trigger types
  - TTL: 90 days (with auto-refresh)
- Start sensor process:
  - Native sensors: Spawn binary with environment config
  - Python/Script sensors: Execute via runtime
- Register process handle in memory
- Monitor process health
stop_sensor(sensor_id, revoke_token)
- Send SIGTERM to sensor process
- Wait for graceful shutdown (timeout: 30s)
- Force kill (SIGKILL) if timeout exceeded
- If revoke_token == true:
  - Call API to revoke sensor token
  - Add token to revocation table
- Remove from running sensors registry
- Log shutdown event
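The terminate, wait, then kill sequence can be sketched independently of how signals are delivered. The SensorProcess trait is a hypothetical abstraction; on Unix a real implementation would need something like the libc or nix crates to send SIGTERM, because std::process::Child::kill only forces termination (SIGKILL).

```rust
use std::thread;
use std::time::{Duration, Instant};

/// Hypothetical handle to a running sensor process.
pub trait SensorProcess {
    /// Request a graceful shutdown (SIGTERM on Unix).
    fn terminate(&mut self);
    /// Non-blocking check: has the process exited?
    fn has_exited(&mut self) -> bool;
    /// Force the process to stop (SIGKILL on Unix).
    fn kill(&mut self);
}

#[derive(Debug, PartialEq, Eq)]
pub enum ShutdownOutcome {
    Graceful,
    ForceKilled,
}

/// Requests shutdown, polls until `timeout` elapses, then force-kills.
pub fn stop_process<P: SensorProcess>(
    process: &mut P,
    timeout: Duration,
    poll_every: Duration,
) -> ShutdownOutcome {
    process.terminate();
    let deadline = Instant::now() + timeout;
    loop {
        if process.has_exited() {
            return ShutdownOutcome::Graceful;
        }
        if Instant::now() >= deadline {
            break;
        }
        thread::sleep(poll_every);
    }
    // Still alive after the grace period: fall back to a forced kill
    process.kill();
    ShutdownOutcome::ForceKilled
}
```

Called with a 30-second timeout, this follows the steps listed above; the polling interval is a free choice.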
handle_rule_change(trigger_id)
- Find all sensors for the given trigger
- For each sensor:
  - Query active rule count
  - Check if sensor is currently running
  - Determine action based on state matrix:
| Active Rules | Running | Action |
|---|---|---|
| Yes | Yes | No action (continue running) |
| Yes | No | Start sensor + issue token |
| No | Yes | Stop sensor + revoke token |
| No | No | No action (remain stopped) |
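The state matrix reduces to a pure function of the rule count and the running flag. A minimal sketch with hypothetical names; it also folds in the fail-safe described under Database Connectivity Loss, leaving the sensor untouched when the count cannot be read.

```rust
/// What handle_rule_change() should do for one sensor.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SensorAction {
    NoAction,
    StartAndIssueToken,
    StopAndRevokeToken,
}

/// Direct encoding of the state matrix above.
pub fn decide_action(active_rules: u64, running: bool) -> SensorAction {
    match (active_rules > 0, running) {
        (true, false) => SensorAction::StartAndIssueToken,
        (false, true) => SensorAction::StopAndRevokeToken,
        // (true, true): keep running; (false, false): remain stopped
        _ => SensorAction::NoAction,
    }
}

/// Fail-safe wrapper: if the rule count could not be read, change nothing.
pub fn decide_action_or_keep<E>(count: Result<u64, E>, running: bool) -> SensorAction {
    match count {
        Ok(n) => decide_action(n, running),
        Err(_) => SensorAction::NoAction,
    }
}
```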
RuleLifecycleListener Integration
The RuleLifecycleListener subscribes to these RabbitMQ events:
- rule.created - New rule added
- rule.enabled - Existing rule activated
- rule.disabled - Existing rule deactivated
- rule.deleted - Rule removed (future)
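Assuming these event names are used directly as RabbitMQ routing keys (an assumption; the exchange layout is not described here), a listener could map them to a typed value before acting on them. RuleEventKind is a hypothetical name.

```rust
/// Rule lifecycle events, keyed by the names listed above.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RuleEventKind {
    Created,
    Enabled,
    Disabled,
    Deleted,
}

impl RuleEventKind {
    /// Parses a routing key such as "rule.enabled"; unknown keys yield None.
    pub fn from_routing_key(key: &str) -> Option<Self> {
        match key {
            "rule.created" => Some(Self::Created),
            "rule.enabled" => Some(Self::Enabled),
            "rule.disabled" => Some(Self::Disabled),
            "rule.deleted" => Some(Self::Deleted),
            _ => None,
        }
    }
}
```

Because handle_rule_change() recomputes the active rule count, every kind leads to the same reconciliation in this design; the kind mainly matters for logging and metrics.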
On each event:
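async fn handle_rule_event(sensor_manager: &SensorManager, event: RuleEvent) -> Result<()> {
    // Extract trigger_id from rule
    // (for rule.deleted the rule row may already be gone, so the event
    // would need to carry the trigger id itself)
    let trigger_id = get_trigger_for_rule(event.rule_id).await?;
    // Notify sensor manager so it reconciles every sensor for this trigger
    sensor_manager.handle_rule_change(trigger_id).await?;
    Ok(())
}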
Token Management
Token Issuance
When a sensor needs to start:
// Create service account for sensor
let token = api_client.create_sensor_token(SensorTokenRequest {
sensor_id,
sensor_ref: "core.interval_timer_sensor",
trigger_types: vec!["core.intervaltimer"],
ttl_days: 90,
}).await?;
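// Pass the token to this sensor's process only, by setting it on the
// Command that launches it (env::set_var would change the SensorManager's
// own environment, which every sensor spawned afterwards would inherit)
sensor_command.env("ATTUNE_API_TOKEN", &token.access_token);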
Token Revocation
When a sensor is stopped:
// Revoke sensor token
api_client.revoke_token(token_id).await?;
// Token is added to revocation table with expiration
// Cleanup job removes expired revocations periodically
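A sketch of the revocation table and its cleanup job, with an in-memory map standing in for the database table (hypothetical structure). It assumes the API also checks token expiry on its own: an entry can then be dropped once the token's expiry has passed, because an expired token is already rejected.

```rust
use std::collections::HashMap;
use std::time::Instant;

/// token id -> the time at which the token would have expired anyway.
#[derive(Default)]
pub struct RevocationTable {
    entries: HashMap<i64, Instant>,
}

impl RevocationTable {
    /// Record a revoked token together with its original expiry.
    pub fn revoke(&mut self, token_id: i64, expires_at: Instant) {
        self.entries.insert(token_id, expires_at);
    }

    pub fn is_revoked(&self, token_id: i64) -> bool {
        self.entries.contains_key(&token_id)
    }

    /// Cleanup job: drop revocations whose tokens have expired by `now`.
    /// Returns how many entries were removed.
    pub fn purge_expired(&mut self, now: Instant) -> usize {
        let before = self.entries.len();
        self.entries.retain(|_, expires_at| *expires_at > now);
        before - self.entries.len()
    }
}
```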
Token Refresh
Native sensors (like attune-core-timer-sensor) implement automatic token refresh:
// TokenRefreshManager runs in background
// Refreshes token at 80% of TTL (72 days for 90-day tokens)
let refresh_manager = TokenRefreshManager::new(api_client, 0.8);
refresh_manager.start();
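The refresh schedule is simple arithmetic: a 90-day TTL is 7,776,000 seconds (the expires_in value in the API responses below), and 80% of it is 72 days. The helper names are hypothetical, and the sketch takes an integer percentage instead of the 0.8 ratio so the arithmetic stays exact.

```rust
use std::time::Duration;

const SECS_PER_DAY: u64 = 24 * 60 * 60;

/// Token lifetime in seconds for a TTL given in days (the `expires_in` value).
pub fn ttl_secs(ttl_days: u64) -> u64 {
    ttl_days * SECS_PER_DAY
}

/// Delay after issuance at which to refresh, as a percentage of the TTL.
/// 80 corresponds to the 0.8 ratio passed to TokenRefreshManager above.
pub fn refresh_after(ttl: Duration, percent: u32) -> Duration {
    ttl * percent / 100
}
```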
Sensor Process Management
Native Sensors (Rust Binaries)
Native sensors are standalone executables managed by the SensorManager:
# Start command
ATTUNE_API_URL=http://api:8080 \
ATTUNE_API_TOKEN=<token> \
ATTUNE_SENSOR_REF=core.interval_timer_sensor \
ATTUNE_MQ_URL=amqp://rabbitmq:5672 \
./attune-core-timer-sensor
Process management:
- PID tracking in SensorManager
- SIGTERM for graceful shutdown
- SIGKILL fallback after 30s
- Restart on crash (max 3 attempts)
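A native sensor's launch command can be built with std::process::Command, setting the same variables as the shell example and giving the token to that child only. The SensorLaunch struct and sensor_command function are hypothetical.

```rust
use std::process::Command;

/// Launch settings for a native sensor (hypothetical struct).
pub struct SensorLaunch<'a> {
    pub binary: &'a str,
    pub api_url: &'a str,
    pub api_token: &'a str,
    pub sensor_ref: &'a str,
    pub mq_url: &'a str,
}

/// Builds the command without spawning it. The environment is set on the
/// child only, so each sensor gets its own token.
pub fn sensor_command(launch: &SensorLaunch) -> Command {
    let mut cmd = Command::new(launch.binary);
    cmd.env("ATTUNE_API_URL", launch.api_url)
        .env("ATTUNE_API_TOKEN", launch.api_token)
        .env("ATTUNE_SENSOR_REF", launch.sensor_ref)
        .env("ATTUNE_MQ_URL", launch.mq_url);
    cmd
}
```

After spawn(), the returned Child's id() gives the PID to track, and try_wait() offers a non-blocking crash check for the health loop.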
Script-Based Sensors (Python/Shell)
Script sensors are executed through the worker runtime:
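# Python sensor example (ApiClient is the Attune API client; its import is not shown)
import time

class IntervalTimerSensor:
    def __init__(self, api_token, sensor_ref, poll_interval=1.0):
        self.api_client = ApiClient(token=api_token)
        self.sensor_ref = sensor_ref
        # Seconds between polls (default chosen for illustration)
        self.poll_interval = poll_interval

    def run(self):
        while True:
            # Check triggers
            # Emit events
            time.sleep(self.poll_interval)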
Managed similarly to native sensors but executed via Python runtime.
Database Schema Additions
Sensor Process Tracking
-- Add to sensor table (future enhancement)
-- The enum type must exist before the status column can use it
CREATE TYPE sensor_status_enum AS ENUM (
    'stopped',
    'starting',
    'running',
    'stopping',
    'error'
);
ALTER TABLE sensor ADD COLUMN process_id INTEGER;
ALTER TABLE sensor ADD COLUMN last_started TIMESTAMPTZ;
ALTER TABLE sensor ADD COLUMN last_stopped TIMESTAMPTZ;
ALTER TABLE sensor ADD COLUMN active_token_id BIGINT REFERENCES identity(id);
ALTER TABLE sensor ADD COLUMN restart_count INTEGER DEFAULT 0;
ALTER TABLE sensor ADD COLUMN status sensor_status_enum DEFAULT 'stopped';
Active Rules View
-- View to quickly check sensors that should be running
CREATE VIEW active_sensors AS
SELECT
s.id,
s.ref AS sensor_ref,
s.trigger,
t.ref AS trigger_ref,
COUNT(r.id) AS active_rule_count,
CASE WHEN COUNT(r.id) > 0 THEN true ELSE false END AS should_be_running
FROM sensor s
JOIN trigger t ON t.id = s.trigger
LEFT JOIN rule r ON r.trigger = s.trigger AND r.enabled = TRUE
WHERE s.enabled = TRUE
GROUP BY s.id, s.ref, s.trigger, t.ref;
Monitoring and Observability
Metrics
Track the following metrics:
- Sensor lifecycle events: starts, stops, crashes
- Token operations: issued, refreshed, revoked
- Active sensor count: gauge of running sensors
- Rule-to-sensor ratio: avg rules per sensor
- Token refresh success rate: % of successful refreshes
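A sketch of lock-free counters for some of these metrics using atomics; the struct and field names are hypothetical, and a real deployment would more likely export them through a metrics library.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Hypothetical lifecycle counters, safe to update from many tasks.
#[derive(Default)]
pub struct SensorMetrics {
    pub starts: AtomicU64,
    pub stops: AtomicU64,
    pub crashes: AtomicU64,
    pub tokens_issued: AtomicU64,
    pub tokens_revoked: AtomicU64,
    pub refresh_attempts: AtomicU64,
    pub refresh_successes: AtomicU64,
}

impl SensorMetrics {
    /// Record one token refresh attempt and whether it succeeded.
    pub fn record_refresh(&self, ok: bool) {
        self.refresh_attempts.fetch_add(1, Ordering::Relaxed);
        if ok {
            self.refresh_successes.fetch_add(1, Ordering::Relaxed);
        }
    }

    /// Token refresh success rate in percent; None before any attempt.
    pub fn refresh_success_rate(&self) -> Option<f64> {
        let attempts = self.refresh_attempts.load(Ordering::Relaxed);
        if attempts == 0 {
            return None;
        }
        let ok = self.refresh_successes.load(Ordering::Relaxed);
        Some(ok as f64 * 100.0 / attempts as f64)
    }
}
```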
Logging
All lifecycle events are logged with structured data:
{
"event": "sensor_started",
"sensor_id": 42,
"sensor_ref": "core.interval_timer_sensor",
"trigger_ref": "core.intervaltimer",
"active_rules": 3,
"token_issued": true,
"timestamp": "2025-01-29T22:00:00Z"
}
{
"event": "sensor_stopped",
"sensor_id": 42,
"sensor_ref": "core.interval_timer_sensor",
"reason": "no_active_rules",
"token_revoked": true,
"uptime_seconds": 3600,
"timestamp": "2025-01-29T23:00:00Z"
}
Health Checks
SensorManager runs a monitoring loop (every 60s) to:
- Check process health (is PID alive?)
- Verify event emission (has sensor emitted events recently?)
- Restart crashed sensors (if rules still active)
- Update sensor status in database
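One pass of the monitoring loop can be expressed as a function over what it observed. This is a hypothetical sketch: the silence window has to suit each trigger (an interval timer should emit regularly, while other triggers may legitimately stay quiet), and since_last_activity is assumed to count from process start when no event has been emitted yet.

```rust
use std::time::Duration;

/// What one pass of the monitoring loop observed for a sensor (hypothetical).
pub struct HealthSnapshot {
    pub pid_alive: bool,
    /// Time since the last emitted event, or since start if none yet.
    pub since_last_activity: Duration,
    pub has_active_rules: bool,
}

#[derive(Debug, PartialEq, Eq)]
pub enum HealthVerdict {
    Healthy,
    /// Alive, but silent for longer than expected.
    Stale,
    /// Died while rules still need it.
    Restart,
    /// Died and no rules need it; only record the status.
    LeaveStopped,
}

pub fn assess(snapshot: &HealthSnapshot, max_silence: Duration) -> HealthVerdict {
    if !snapshot.pid_alive {
        return if snapshot.has_active_rules {
            HealthVerdict::Restart
        } else {
            HealthVerdict::LeaveStopped
        };
    }
    if snapshot.since_last_activity <= max_silence {
        HealthVerdict::Healthy
    } else {
        HealthVerdict::Stale
    }
}
```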
API Endpoints
Token Management
POST /auth/sensor-token
Content-Type: application/json
{
"sensor_id": 42,
"sensor_ref": "core.interval_timer_sensor",
"trigger_types": ["core.intervaltimer"],
"ttl_days": 90
}
Response: {
"access_token": "eyJ...",
"token_type": "bearer",
"expires_in": 7776000,
"sensor_ref": "core.interval_timer_sensor"
}
POST /auth/refresh
Authorization: Bearer <current_token>
Response: {
"access_token": "eyJ...",
"expires_in": 7776000
}
DELETE /auth/token/:token_id
Authorization: Bearer <admin_token>
Response: 204 No Content
Sensor Status
GET /api/v1/sensors/:sensor_id/status
Authorization: Bearer <token>
Response: {
"sensor_id": 42,
"sensor_ref": "core.interval_timer_sensor",
"status": "running",
"active_rules": 3,
"last_started": "2025-01-29T22:00:00Z",
"uptime_seconds": 3600,
"events_emitted": 120
}
Edge Cases and Error Handling
Rapid Rule Toggling
Scenario: Rule is rapidly enabled/disabled
Solution: Debounce sensor lifecycle changes (5s window)
// Only process one lifecycle change per sensor per 5 seconds
let last_change = sensor_manager.last_change_time(sensor_id);
if last_change.elapsed() < Duration::from_secs(5) {
debug!("Debouncing lifecycle change for sensor {}", sensor_id);
return Ok(());
}
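A self-contained version of the debounce check, with the clock passed in so it can be tested; names are hypothetical. One caveat with dropping changes: if the last toggle in a burst is suppressed, the sensor can be left in the wrong state, so a suppressed change should be re-checked after the window (for example by the 60-second health loop). Because handle_rule_change() works from the current rule count, a late re-check converges on the correct state.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Allows at most one lifecycle change per sensor within `window`.
pub struct Debouncer {
    window: Duration,
    last_change: HashMap<i64, Instant>,
}

impl Debouncer {
    pub fn new(window: Duration) -> Self {
        Self { window, last_change: HashMap::new() }
    }

    /// Returns true, and records `now`, if a change for `sensor_id` may run.
    /// Returns false if the previous change was less than `window` ago.
    pub fn try_acquire(&mut self, sensor_id: i64, now: Instant) -> bool {
        let allowed = match self.last_change.get(&sensor_id) {
            Some(&last) => now.saturating_duration_since(last) >= self.window,
            None => true,
        };
        if allowed {
            self.last_change.insert(sensor_id, now);
        }
        allowed
    }
}
```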
Sensor Crash During Startup
Scenario: Sensor process crashes immediately after starting
Solution: Exponential backoff with max retry limit
async fn start_sensor_with_retry(sensor_id: i64) -> Result<()> {
for attempt in 1..=MAX_RETRIES {
match start_sensor(sensor_id).await {
Ok(_) => return Ok(()),
Err(e) => {
error!("Sensor start attempt {} failed: {}", attempt, e);
if attempt < MAX_RETRIES {
let delay = Duration::from_secs(2u64.pow(attempt));
tokio::time::sleep(delay).await;
} else {
return Err(e);
}
}
}
}
Err(anyhow!("Max retries exceeded"))
}
Token Revocation Failure
Scenario: API is unreachable when trying to revoke token
Solution: Queue revocation for retry, proceed with shutdown
if let Err(e) = revoke_token(token_id).await {
error!("Failed to revoke token {}: {}", token_id, e);
// Queue for retry
pending_revocations.push(token_id);
// Continue with sensor shutdown anyway
}
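The queued revocations then need a later retry, for example from the periodic monitoring loop (an assumption; where the retry runs is not specified). In this sketch the revoke closure stands in for the hypothetical API call, and tokens that still fail stay queued.

```rust
/// Retries each queued token revocation once; tokens whose revocation
/// still fails stay in `pending`. Returns how many were revoked.
pub fn retry_pending<F, E>(pending: &mut Vec<i64>, mut revoke: F) -> usize
where
    F: FnMut(i64) -> Result<(), E>,
{
    let before = pending.len();
    // retain() visits each element once, in order; keep only the failures
    pending.retain(|&token_id| revoke(token_id).is_err());
    before - pending.len()
}
```

Until a queued revocation succeeds, the token stays usable until it expires, so the retry interval bounds that exposure.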
Database Connectivity Loss
Scenario: Cannot query active rule count
Solution: Fail-safe to keep sensors running (avoid downtime)
match get_active_rule_count(sensor_id).await {
Ok(count) => handle_based_on_count(count),
Err(e) => {
error!("Cannot query rule count: {}", e);
// Keep sensor running to avoid disruption
warn!("Keeping sensor running due to DB error");
}
}
Migration Strategy
Phase 1: Implement Core Logic (Current)
- Add has_active_rules() to SensorManager ✓
- Modify start() to check active rules before starting ✓
- Add handle_rule_change() method ✓
- Integrate with RuleLifecycleListener ✓
Phase 2: Token Management
- Add sensor token issuance to API
- Implement token revocation endpoint
- Add token cleanup job for expired revocations
- Update sensor startup to use issued tokens
Phase 3: Process Management
- Track sensor PIDs in SensorManager
- Implement graceful shutdown (SIGTERM)
- Add process health monitoring
- Implement restart logic with backoff
Phase 4: Observability
- Add structured logging for lifecycle events
- Expose metrics for monitoring
- Add sensor status endpoint to API
- Create admin dashboard for sensor management
Testing Strategy
Unit Tests
#[tokio::test]
async fn test_sensor_starts_with_active_rules() {
let manager = SensorManager::new(...);
let sensor = create_test_sensor();
let rule = create_test_rule(sensor.trigger);
manager.handle_rule_change(sensor.trigger).await.unwrap();
assert!(manager.is_running(sensor.id));
}
#[tokio::test]
async fn test_sensor_stops_when_last_rule_disabled() {
let manager = SensorManager::new(...);
let sensor = create_running_sensor();
// Disable all rules
disable_all_rules(sensor.trigger).await;
manager.handle_rule_change(sensor.trigger).await.unwrap();
assert!(!manager.is_running(sensor.id));
}
Integration Tests
#[tokio::test]
async fn test_end_to_end_lifecycle() {
// 1. Create sensor (should not start)
let sensor = create_sensor().await;
assert_sensor_stopped(sensor.id);
// 2. Create enabled rule (sensor should start)
let rule = create_enabled_rule(sensor.trigger).await;
wait_for_sensor_running(sensor.id).await;
// 3. Disable rule (sensor should stop)
disable_rule(rule.id).await;
wait_for_sensor_stopped(sensor.id).await;
// 4. Verify token was revoked
assert_token_revoked(sensor.token_id);
}
Future Enhancements
- Smart Scheduling: Start sensors 30s before first rule execution
- Shared Sensors: Multiple sensor types sharing same infrastructure
- Auto-scaling: Spawn multiple sensor instances for high-volume triggers
- Circuit Breakers: Disable sensors that repeatedly fail
- Cost Tracking: Track resource consumption per sensor
- Sensor Pools: Pre-warmed sensor processes for fast activation