Work Summary: Critical Fixes for Timer-Driven Execution Pipeline
Date: 2026-01-16
Status: COMPLETED ✅
Overview
Fixed two P0 blocking issues that prevented the timer-driven automation pipeline from completing end-to-end:
- Message Loop Bug - the execution manager reprocessed completion messages indefinitely
- Worker Runtime Resolution - the worker could not resolve the correct runtime for actions
Both issues are now resolved, and executions complete successfully with the correct runtime.
Issue 1: Message Loop in Execution Manager
Problem
Execution manager entered an infinite loop where ExecutionCompleted messages were routed back to the status queue and reprocessed indefinitely.
Symptoms:
INFO Processing status change for execution 137: Failed
INFO Updated execution 137 status: Failed -> Failed
INFO Handling completion for execution: 137
INFO Published execution.completed notification for execution: 137
[repeats infinitely]
Root Cause
The execution manager's queue was bound with wildcard pattern execution.status.# which matched:
- execution.status.changed ✅ (intended)
- execution.completed ❌ (unintended - caused the loop)
When an execution completed, the manager published an ExecutionCompleted message with routing key execution.completed. This message was routed back to the same queue due to the wildcard pattern, causing infinite reprocessing.
Solution
Changed the queue binding in crates/common/src/mq/connection.rs from a wildcard to an exact match:
// Before
self.bind_queue(
    &config.rabbitmq.queues.execution_status.name,
    &config.rabbitmq.exchanges.executions.name,
    "execution.status.#", // Matches ALL execution.status.* messages
)

// After
self.bind_queue(
    &config.rabbitmq.queues.execution_status.name,
    &config.rabbitmq.exchanges.executions.name,
    "execution.status.changed", // Only matches status change messages
)
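To guard against regressions, the binding semantics can be pinned down with a unit test. The sketch below is self-contained and uses a hypothetical routing_key_matches helper implementing standard AMQP topic-matching rules; the project's own mq layer may expose this differently:
// Hypothetical helper, written here for illustration only: standard AMQP
// topic matching over dot-separated words ('*' = exactly one word,
// '#' = zero or more words).
fn routing_key_matches(pattern: &str, key: &str) -> bool {
    fn go(pat: &[&str], key: &[&str]) -> bool {
        match pat.first() {
            None => key.is_empty(),
            Some(&"#") => go(&pat[1..], key) || (!key.is_empty() && go(pat, &key[1..])),
            Some(&"*") => !key.is_empty() && go(&pat[1..], &key[1..]),
            Some(&word) => key.first() == Some(&word) && go(&pat[1..], &key[1..]),
        }
    }
    let pat: Vec<&str> = pattern.split('.').collect();
    let key: Vec<&str> = key.split('.').collect();
    go(&pat, &key)
}

fn main() {
    // The fixed binding consumes exactly the intended status-change key...
    assert!(routing_key_matches("execution.status.changed", "execution.status.changed"));
    // ...and no longer captures completion notifications.
    assert!(!routing_key_matches("execution.status.changed", "execution.completed"));
}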
Files Modified
- crates/common/src/mq/connection.rs - line 407
Result
✅ ExecutionCompleted messages no longer route to status queue
✅ Manager processes each status change exactly once
✅ No more infinite loops
Issue 2: Worker Runtime Resolution Failure
Problem
Worker received execution messages but failed with:
ERROR Runtime not found: No runtime found for action: core.echo
Even though:
- Worker had shell runtime available ✅
- Action core.echo requires the shell runtime ✅
- Runtime matching logic existed ✅
Root Cause
The worker's runtime selection relied on can_execute() methods that checked file extensions and action_ref patterns:
- Python runtime checked for .py in action_ref
- Shell runtime checked for specific patterns
core.echo didn't match any of these patterns.
The action's runtime metadata (stored in the database as runtime: 3 → shell runtime) was being ignored.
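To make the failure concrete, here is a self-contained illustration; the pattern checks are simplified, hypothetical stand-ins for the real can_execute() implementations:
// Hypothetical stand-ins for the runtimes' can_execute() heuristics,
// shown only to illustrate why "core.echo" falls through every check.
fn python_can_execute(action_ref: &str) -> bool {
    action_ref.ends_with(".py") // extension heuristic
}

fn shell_can_execute(action_ref: &str) -> bool {
    action_ref.ends_with(".sh") // hypothetical pattern; the real checks differ
}

fn main() {
    let action_ref = "core.echo";
    assert!(!python_can_execute(action_ref)); // no ".py" extension
    assert!(!shell_can_execute(action_ref)); // no shell pattern either
    // Every heuristic fails -> "Runtime not found", even though the database
    // already records runtime: 3 (shell) for this action.
}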
Solution Architecture
1. Added runtime_name field to ExecutionContext
pub struct ExecutionContext {
    // ... existing fields
    pub runtime_name: Option<String>, // NEW: "python", "shell", etc.
}
2. Updated worker executor to load runtime from database
// Load runtime information if specified
let runtime_name = if let Some(runtime_id) = action.runtime {
    match sqlx::query_as::<_, RuntimeModel>("SELECT * FROM attune.runtime WHERE id = $1")
        .bind(runtime_id)
        .fetch_optional(&self.pool)
        .await
    {
        Ok(Some(runtime)) => {
            debug!("Loaded runtime '{}' for action '{}'", runtime.name, action.r#ref);
            Some(runtime.name)
        }
        // ... error handling
    }
} else {
    None
};
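For reference, a minimal sketch of the model the query deserializes into; the project's actual RuntimeModel likely carries more columns:
// Assumed minimal shape of an attune.runtime row; only the columns the
// selection logic needs are shown here.
#[derive(sqlx::FromRow)]
#[allow(dead_code)]
struct RuntimeModel {
    id: i32,
    name: String, // e.g. "python" or "shell"; copied into ExecutionContext.runtime_name
}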
3. Updated RuntimeRegistry::get_runtime() to prefer runtime_name
pub fn get_runtime(&self, context: &ExecutionContext) -> RuntimeResult<&dyn Runtime> {
    // If runtime_name is specified, use it directly (no guessing!)
    if let Some(ref runtime_name) = context.runtime_name {
        return self
            .runtimes
            .iter()
            .find(|r| r.name() == runtime_name)
            .map(|r| r.as_ref())
            .ok_or_else(|| {
                RuntimeError::RuntimeNotFound(format!(
                    "Runtime '{}' not found (available: {})",
                    runtime_name,
                    self.list_runtimes().join(", ")
                ))
            });
    }

    // Fall back to can_execute() for ad-hoc executions
    self.runtimes
        .iter()
        .find(|r| r.can_execute(context))
        .map(|r| r.as_ref())
        .ok_or_else(|| RuntimeError::RuntimeNotFound(...))
}
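A compact, self-contained demonstration of the lookup order, with a simplified trait and registry standing in for the project's types:
// Simplified stand-ins for the worker's Runtime trait and registry,
// demonstrating the two-step lookup: exact name match first, then the
// can_execute() heuristic fallback.
trait Runtime {
    fn name(&self) -> &str;
    fn can_execute(&self, action_ref: &str) -> bool;
}

struct ShellRuntime;

impl Runtime for ShellRuntime {
    fn name(&self) -> &str {
        "shell"
    }
    fn can_execute(&self, action_ref: &str) -> bool {
        action_ref.ends_with(".sh") // hypothetical heuristic
    }
}

fn get_runtime<'a>(
    runtimes: &'a [Box<dyn Runtime>],
    runtime_name: Option<&str>,
    action_ref: &str,
) -> Option<&'a dyn Runtime> {
    if let Some(name) = runtime_name {
        // Authoritative path: the database-supplied name, no guessing.
        return runtimes.iter().find(|r| r.name() == name).map(|r| r.as_ref());
    }
    // Ad-hoc path: fall back to heuristics.
    runtimes.iter().find(|r| r.can_execute(action_ref)).map(|r| r.as_ref())
}

fn main() {
    let runtimes: Vec<Box<dyn Runtime>> = vec![Box::new(ShellRuntime)];
    // "core.echo" matches no heuristic, but the database-supplied name resolves it.
    assert!(get_runtime(&runtimes, Some("shell"), "core.echo").is_some());
    assert!(get_runtime(&runtimes, None, "core.echo").is_none());
}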
4. Updated worker to register individual runtimes
// Register individual runtimes so they can be matched by name
runtime_registry.register(Box::new(PythonRuntime::new()));
runtime_registry.register(Box::new(ShellRuntime::new()));
// Also keep local runtime as fallback
runtime_registry.register(Box::new(LocalRuntime::new()));
Files Modified
- crates/worker/src/runtime/mod.rs - Added runtime_name field, updated get_runtime()
- crates/worker/src/executor.rs - Load runtime from database, populate runtime_name
- crates/worker/src/service.rs - Register individual Python/Shell runtimes
- Test files updated to include the new field:
  - crates/worker/src/runtime/python.rs
  - crates/worker/src/runtime/shell.rs
  - crates/worker/src/runtime/local.rs
Result
✅ Worker correctly identifies runtime from database metadata
✅ Runtime selection based on authoritative data, not pattern matching
✅ Backward compatible with can_execute() for ad-hoc executions
✅ Actions execute successfully with correct runtime
Verification
Test Execution (Execution ID 180)
2026-01-16T04:01:08.319898Z INFO Starting execution: 180
2026-01-16T04:01:08.326189Z INFO Executing shell action: core.echo (execution_id: 180)
2026-01-16T04:01:08.328574Z INFO Execution 180 succeeded
2026-01-16T04:01:08.330727Z INFO Execution 180 completed successfully in 2ms
Database State
SELECT id, status FROM attune.execution WHERE id >= 177;
id | status
-----+-----------
182 | completed
181 | completed
180 | completed
179 | completed
178 | completed
Runtime Registration
2026-01-16T04:00:46.998849Z INFO Registered runtimes: ["python", "shell", "local"]
Architecture Improvements
Before
Action (core.echo)
└─> Worker receives execution
    └─> RuntimeRegistry.get_runtime()
        └─> Check can_execute() on each runtime
            ├─> PythonRuntime.can_execute() → ❌ (no .py)
            ├─> ShellRuntime.can_execute() → ❌ (no pattern match)
            └─> ERROR: Runtime not found
After
Action (core.echo, runtime_id: 3)
└─> Worker loads action from database
    └─> Query runtime table: runtime_id=3 → name="shell"
        └─> ExecutionContext { runtime_name: Some("shell"), ... }
            └─> RuntimeRegistry.get_runtime()
                └─> Direct name match: "shell" → ShellRuntime ✅
                    └─> SUCCESS: Execute with shell runtime
Impact
Immediate Benefits
- End-to-end timer pipeline works - Events trigger actions successfully
- Reliable runtime selection - Based on database metadata, not heuristics
- No more infinite loops - Execution manager processes messages correctly
- Fast execution - Shell actions complete in ~2-3ms
System Status
- ✅ Sensor: Firing timer events every 10 seconds
- ✅ Executor: Processing enforcements, scheduling executions
- ✅ Worker: Executing actions with correct runtime
- ✅ End-to-end: Timer → Event → Rule → Enforcement → Execution → Completion
Known Issues
Message Re-delivery
Executions are being processed multiple times due to message acknowledgment issues. This is a separate concern from the fixed issues and does not prevent successful execution.
Example:
Processing execution.scheduled for execution: 180
Starting execution: 180
Execution 180 completed successfully
Processing execution.scheduled for execution: 180 # Re-delivered
Starting execution: 180
Execution 180 completed successfully
Impact: Minor - Executions still complete successfully, just with redundant processing.
Next Steps: Investigate message acknowledgment in worker consumer to ensure proper ACK/NACK handling.
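A likely direction, assuming the worker consumes via the lapin crate (the AMQP client is not named in this document): acknowledge only after the handler finishes, and nack without requeue on terminal failures. A minimal sketch:
// Sketch of explicit ack/nack handling with lapin; an assumption about
// the client library, not the project's actual consumer code.
use futures_lite::stream::StreamExt;
use lapin::options::{BasicAckOptions, BasicNackOptions};

async fn consume(mut consumer: lapin::Consumer) -> Result<(), lapin::Error> {
    while let Some(delivery) = consumer.next().await {
        let delivery = delivery?;
        match handle_execution(&delivery.data).await {
            // Ack only after the execution has fully completed, so the broker
            // never redelivers a message that was already processed.
            Ok(()) => delivery.ack(BasicAckOptions::default()).await?,
            // Nack without requeue on terminal failures to avoid retry loops.
            Err(()) => {
                delivery
                    .nack(BasicNackOptions { requeue: false, ..Default::default() })
                    .await?
            }
        }
    }
    Ok(())
}

// Placeholder for the worker's actual execution path.
async fn handle_execution(_payload: &[u8]) -> Result<(), ()> {
    Ok(())
}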
Testing Recommendations
Manual Testing
# 1. Start all services
./target/debug/attune-sensor -c config.development.yaml &
./target/debug/attune-executor -c config.development.yaml &
./target/debug/attune-worker -c config.development.yaml &
# 2. Monitor logs
tail -f /tmp/sensor.log | grep "Timer.*fired"
tail -f /tmp/executor.log | grep "Execution.*scheduled"
tail -f /tmp/worker.log | grep "completed successfully"
# 3. Check database
psql -U postgres -h localhost -d attune \
-c "SELECT id, status, action_ref FROM attune.execution ORDER BY id DESC LIMIT 10;"
Integration Tests
- Test timer events trigger executions
- Test Python actions (verify python runtime)
- Test shell actions (verify shell runtime)
- Test execution lifecycle transitions
- Test message routing (no loops, no duplicate processing)
Configuration Note
Added message queue configuration to config.development.yaml:
message_queue:
url: amqp://guest:guest@localhost:5672
This key was missing, which caused the sensor, executor, and worker to fail on startup in the development environment.
Summary
Both P0 blocking issues are RESOLVED:
- ✅ Message loop fixed via queue binding change
- ✅ Runtime resolution fixed via database-driven selection
The timer-driven automation pipeline now works end-to-end:
Timer (10s) → Event → Rule Match → Enforcement → Execution → Worker (shell) → Completion (2-3ms)
Next Priority: Address message re-delivery issue to eliminate redundant processing.