Queue Architecture and FIFO Execution Ordering
Status: Production Ready (v0.1)
Last Updated: 2025-01-27
Overview
Attune implements a per-action FIFO queue system to guarantee deterministic execution ordering when policy limits (concurrency, delays) are enforced. This ensures fairness, predictability, and correct workflow execution.
Why Queue Ordering Matters
Problem: Without ordered queuing, executions blocked by policies proceed in an arbitrary order determined by tokio's task scheduling. This causes:
- ❌ Fairness Violations: Later requests execute before earlier ones
- ❌ Non-determinism: Same workflow produces different orders across runs
- ❌ Broken Dependencies: Parent executions may proceed after children
- ❌ Poor UX: Unpredictable queue behavior frustrates users
Solution: FIFO queues with async notification ensure executions proceed in strict request order.
Architecture Components
1. ExecutionQueueManager
Location: crates/executor/src/queue_manager.rs
The central component managing all execution queues.
pub struct ExecutionQueueManager {
queues: DashMap<i64, Arc<Mutex<ActionQueue>>>, // Key: action_id
config: QueueConfig,
db_pool: Option<PgPool>,
}
Key Features:
- One queue per action: Isolated FIFO queues prevent cross-action interference
- Thread-safe: Uses DashMap for lock-free map access
- Async-friendly: Uses tokio::Notify for efficient waiting
- Observable: Tracks statistics for monitoring
2. ActionQueue
Per-action queue structure with FIFO ordering guarantees.
struct ActionQueue {
queue: VecDeque<QueueEntry>, // FIFO queue
active_count: u32, // Currently running
max_concurrent: u32, // Policy limit
total_enqueued: u64, // Lifetime counter
total_completed: u64, // Lifetime counter
}
3. QueueEntry
Individual execution waiting in queue.
struct QueueEntry {
execution_id: i64,
enqueued_at: DateTime<Utc>,
notifier: Arc<Notify>, // Async notification
}
Notification Mechanism:
- Each queued execution gets a tokio::Notify handle
- Worker completion triggers notify.notify_one() on the next waiter
- No polling required - efficient async waiting
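As a simplified, synchronous sketch of this mechanism, each queued execution can own a one-shot channel standing in for its tokio::Notify handle; the completion side wakes exactly the front waiter. The struct and method names mirror the document's types but this is illustrative only, not the actual implementation:

```rust
use std::collections::VecDeque;
use std::sync::mpsc::{channel, Receiver, Sender};

// Each queued execution owns a one-shot channel; sending () stands in
// for notify.notify_one() on that execution's Notify handle.
struct QueueEntry {
    execution_id: i64,
    wake: Sender<()>,
}

struct ActionQueue {
    queue: VecDeque<QueueEntry>, // FIFO: push_back on enqueue, pop_front on wake
}

impl ActionQueue {
    fn new() -> Self {
        ActionQueue { queue: VecDeque::new() }
    }

    // Enqueue an execution; the caller would block on the returned Receiver.
    fn enqueue(&mut self, execution_id: i64) -> Receiver<()> {
        let (tx, rx) = channel();
        self.queue.push_back(QueueEntry { execution_id, wake: tx });
        rx
    }

    // On worker completion: wake exactly the front waiter (strict FIFO).
    fn notify_completion(&mut self) -> Option<i64> {
        let entry = self.queue.pop_front()?;
        let _ = entry.wake.send(()); // ignore error if the waiter gave up
        Some(entry.execution_id)
    }
}
```

Because only the popped front entry is ever signaled, no later waiter can observe a wake-up before an earlier one.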
Execution Flow
Normal Flow (With Capacity)
┌─────────────────────────────────────────────────────────────┐
│ 1. EnforcementProcessor receives enforcement.created │
│ └─ Enforcement: rule fired, needs execution │
└─────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ 2. PolicyEnforcer.check_policies(action_id) │
│ └─ Verify rate limits, quotas │
│ └─ Return: None (no violation) │
└─────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ 3. QueueManager.enqueue_and_wait(action_id, exec_id, limit)│
│ └─ Check: active_count < max_concurrent? │
│ └─ YES: Increment active_count │
│ └─ Return immediately (no waiting) │
└─────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ 4. Create Execution record in database │
│ └─ Status: REQUESTED │
└─────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ 5. Publish execution.requested to scheduler │
│ └─ Scheduler selects worker and forwards │
└─────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ 6. Worker executes action │
│ └─ Status: RUNNING → SUCCEEDED/FAILED │
└─────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ 7. Worker publishes execution.completed │
│ └─ Payload: { execution_id, action_id, status, result } │
└─────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ 8. CompletionListener receives message │
│ └─ QueueManager.notify_completion(action_id) │
│ └─ Decrement active_count │
│ └─ Notify next waiter in queue (if any) │
└─────────────────────────────────────────────────────────────┘
Queued Flow (At Capacity)
┌─────────────────────────────────────────────────────────────┐
│ 1-2. Same as normal flow (enforcement, policy check) │
└─────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ 3. QueueManager.enqueue_and_wait(action_id, exec_id, limit)│
│ └─ Check: active_count < max_concurrent? │
│ └─ NO: Queue is at capacity │
│ └─ Create QueueEntry with Notify handle │
│ └─ Push to VecDeque (FIFO position) │
│ └─ await notifier.notified() ← BLOCKS HERE │
└─────────────────────────────────────────────────────────────┘
│
│ (waits for notification)
▼
┌─────────────────────────────────────────────────────────────┐
│ WORKER COMPLETES EARLIER EXECUTION │
└─────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ CompletionListener.notify_completion(action_id) │
│ └─ Lock queue │
│ └─ Pop front QueueEntry (FIFO!) │
│ └─ Decrement active_count (was N) │
│ └─ entry.notifier.notify_one() ← WAKES WAITER │
│ └─ Increment active_count (back to N) │
└─────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ 3. (continued) enqueue_and_wait() resumes │
│ └─ Return Ok(()) - slot acquired │
└─────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ 4-8. Same as normal flow (create execution, execute, etc.) │
└─────────────────────────────────────────────────────────────┘
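The decrement-then-increment of active_count in the diagram is a slot handoff: when a waiter exists, the slot freed by the completing execution transfers directly to the front of the queue, so active_count ends at the same value N. A minimal stand-in (field names follow the document; the struct is a simplified sketch, not the Attune type):

```rust
use std::collections::VecDeque;

struct ActionQueue {
    active_count: u32,
    queue: VecDeque<i64>, // waiting execution_ids, FIFO order
}

impl ActionQueue {
    fn new(active_count: u32) -> Self {
        ActionQueue { active_count, queue: VecDeque::new() }
    }

    fn enqueue(&mut self, execution_id: i64) {
        self.queue.push_back(execution_id);
    }

    // Returns the execution_id to wake, if any.
    fn notify_completion(&mut self) -> Option<i64> {
        self.active_count -= 1; // the finished execution frees its slot
        if let Some(next) = self.queue.pop_front() {
            self.active_count += 1; // slot transfers to the woken waiter
            Some(next) // caller would now fire this entry's Notify handle
        } else {
            None
        }
    }
}
```

Under sustained load this keeps active_count pinned at the concurrency limit; it only drops once the queue drains.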
FIFO Guarantee
How FIFO is Maintained
- Single Queue per Action: Each action has an independent VecDeque<QueueEntry>
- Push Back, Pop Front: New entries added to the back, next waiter taken from the front
- Locked Mutations: All queue operations protected by a Mutex
- No Reordering: No priority, no jumping - strict first-in-first-out
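The four rules above can be condensed into a small gate using std primitives (the real implementation uses tokio::Notify; FifoGate here is illustrative, not an Attune type). Each arrival takes a ticket in order, and a waiter proceeds only when it is at the front and a slot is free, which makes the ordering strict FIFO:

```rust
use std::collections::VecDeque;
use std::sync::{Condvar, Mutex};

struct GateState {
    active: u32,
    max: u32,
    queue: VecDeque<u64>, // waiting tickets, front = next to run
    next_ticket: u64,
}

struct FifoGate {
    state: Mutex<GateState>, // Locked Mutations: all changes under one lock
    cv: Condvar,
}

impl FifoGate {
    fn new(max: u32) -> Self {
        FifoGate {
            state: Mutex::new(GateState {
                active: 0,
                max,
                queue: VecDeque::new(),
                next_ticket: 0,
            }),
            cv: Condvar::new(),
        }
    }

    // Blocks until this caller's ticket reaches the front AND a slot frees.
    fn acquire(&self) {
        let mut s = self.state.lock().unwrap();
        let ticket = s.next_ticket;
        s.next_ticket += 1;
        s.queue.push_back(ticket); // FIFO position fixed at arrival
        while s.queue.front() != Some(&ticket) || s.active >= s.max {
            s = self.cv.wait(s).unwrap();
        }
        s.queue.pop_front();
        s.active += 1;
    }

    fn release(&self) {
        let mut s = self.state.lock().unwrap();
        s.active -= 1;
        drop(s);
        self.cv.notify_all(); // only the front waiter's condition can pass
    }

    fn active(&self) -> u32 {
        self.state.lock().unwrap().active
    }
}
```

Even though notify_all wakes every waiter, the `front == my ticket` check means only the oldest one can acquire the freed slot, so no reordering is possible.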
Example Scenario
Action: core.http.get (max_concurrent = 2)
T=0: Exec A arrives → active_count=0 → proceeds immediately (active=1)
T=1: Exec B arrives → active_count=1 → proceeds immediately (active=2)
T=2: Exec C arrives → active_count=2 → QUEUED at position 0
T=3: Exec D arrives → active_count=2 → QUEUED at position 1
T=4: Exec E arrives → active_count=2 → QUEUED at position 2
Queue state: [C, D, E]
T=5: A completes → pop C from front → C proceeds (active=2, queue=[D, E])
T=6: B completes → pop D from front → D proceeds (active=2, queue=[E])
T=7: C completes → pop E from front → E proceeds (active=2, queue=[])
T=8: D completes → (queue empty, active=1)
T=9: E completes → (queue empty, active=0)
Result: Executions proceeded in exact order: A, B, C, D, E ✅
Queue Statistics
Data Model
pub struct QueueStats {
pub action_id: i64,
pub queue_length: usize, // Waiting count
pub active_count: u32, // Running count
pub max_concurrent: u32, // Policy limit
pub oldest_enqueued_at: Option<DateTime<Utc>>,
pub total_enqueued: u64, // Lifetime counter
pub total_completed: u64, // Lifetime counter
}
Persistence
Queue statistics are persisted to the attune.queue_stats table for:
- API visibility: Real-time queue monitoring
- Historical tracking: Execution patterns over time
- Alerting: Detect stuck or growing queues
Update Frequency: On every queue state change (enqueue, dequeue, complete)
Accessing Stats
In-Memory (Executor service):
let stats = queue_manager.get_queue_stats(action_id).await;
Database (Any service):
let stats = QueueStatsRepository::find_by_action(pool, action_id).await?;
API Endpoint:
GET /api/v1/actions/core.http.get/queue-stats
Configuration
Executor Configuration
executor:
queue:
# Maximum executions per queue (prevents memory exhaustion)
max_queue_length: 10000
# Maximum time an execution can wait in queue (seconds)
queue_timeout_seconds: 3600
# Enable/disable queue metrics persistence
enable_metrics: true
Environment Variables
# Override via environment
export ATTUNE__EXECUTOR__QUEUE__MAX_QUEUE_LENGTH=5000
export ATTUNE__EXECUTOR__QUEUE__QUEUE_TIMEOUT_SECONDS=1800
Performance Characteristics
Memory Usage
Per Queue: ~128 bytes (DashMap entry + Arc + Mutex overhead)
Per Queued Execution: ~80 bytes (QueueEntry + Arc)
Example: 100 actions with 50 queued executions each:
- Queue overhead: 100 × 128 bytes = ~12 KB
- Entry overhead: 5000 × 80 bytes = ~400 KB
- Total: ~412 KB (negligible)
Latency
- Enqueue (with capacity): < 1 μs (just increment counter)
- Enqueue (at capacity): O(1) to queue, then async wait
- Dequeue (notify): < 10 μs (pop + notify)
- Stats lookup: < 1 μs (DashMap read)
Throughput
Measured Performance (from stress tests):
- 1,000 executions (concurrency=5): ~200 exec/sec
- 10,000 executions (concurrency=10): ~500 exec/sec
Bottleneck: Database writes and worker execution time, not queue overhead
Monitoring and Observability
Health Indicators
Healthy Queue:
- ✅ queue_length is 0 or low (< 10% of max)
- ✅ active_count ≈ max_concurrent during load
- ✅ oldest_enqueued_at is recent (< 5 minutes)
- ✅ total_completed increases steadily
Unhealthy Queue:
- ⚠️ queue_length consistently high (> 50% of max)
- ⚠️ oldest_enqueued_at is old (> 30 minutes)
- 🚨 queue_length approaches max_queue_length
- 🚨 active_count < max_concurrent (workers stuck)
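These indicators can be folded into a single check. The thresholds and the QueueHealth type below are illustrative choices based on the list above, not part of the Attune API; the parameters mirror fields of the QueueStats model:

```rust
#[derive(Debug, PartialEq)]
enum QueueHealth {
    Healthy,
    Warning,
    Critical,
}

// under_load distinguishes "idle system" from "workers stuck":
// free slots while work waits is only alarming when load exists.
fn queue_health(
    queue_length: usize,
    max_queue_length: usize,
    active_count: u32,
    max_concurrent: u32,
    under_load: bool,
) -> QueueHealth {
    if queue_length * 10 >= max_queue_length * 9
        || (under_load && active_count < max_concurrent && queue_length > 0)
    {
        QueueHealth::Critical // near capacity, or slots idle while work waits
    } else if queue_length * 2 > max_queue_length {
        QueueHealth::Warning // consistently more than half full
    } else {
        QueueHealth::Healthy
    }
}
```

Integer arithmetic (`* 10 >= * 9` for 90%) avoids floating point in the hot path; tune the cutoffs to your own alert policy.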
Monitoring Queries
Active queues:
SELECT action_id, queue_length, active_count, max_concurrent,
oldest_enqueued_at, last_updated
FROM attune.queue_stats
WHERE queue_length > 0 OR active_count > 0
ORDER BY queue_length DESC;
Stuck queues (not progressing):
SELECT a.ref, qs.queue_length, qs.active_count,
qs.oldest_enqueued_at,
NOW() - qs.last_updated AS stale_duration
FROM attune.queue_stats qs
JOIN attune.action a ON a.id = qs.action_id
WHERE (qs.queue_length > 0 OR qs.active_count > 0)
  AND qs.last_updated < NOW() - INTERVAL '10 minutes';
Queue throughput:
SELECT a.ref, qs.total_completed, qs.total_enqueued,
qs.total_completed::float / NULLIF(qs.total_enqueued, 0) * 100 AS completion_rate
FROM attune.queue_stats qs
JOIN attune.action a ON a.id = qs.action_id
WHERE qs.total_enqueued > 0
ORDER BY total_enqueued DESC;
Troubleshooting
Queue Not Progressing
Symptom: queue_length stays constant, executions don't proceed
Possible Causes:
- Workers not completing: Check worker logs for crashes/hangs
- Completion messages not publishing: Check worker MQ connection
- CompletionListener not running: Check executor service logs
- Database deadlock: Check PostgreSQL logs
Diagnosis:
# Check active executions for this action
psql -c "SELECT id, status, created FROM attune.execution
WHERE action = <action_id> AND status IN ('running', 'requested')
ORDER BY created DESC LIMIT 10;"
# Check worker logs
tail -f /var/log/attune/worker.log | grep "execution_id"
# Check completion messages
rabbitmqctl list_queues name messages
Queue Full Errors
Symptom: Error: Queue full (max length: 10000)
Causes:
- Action is overwhelmed with requests
- Workers are too slow or stuck
- max_queue_length is too low
Solutions:
- Increase limit (short-term):
  executor:
    queue:
      max_queue_length: 20000
- Add more workers (medium-term):
  - Scale worker service horizontally
  - Increase worker concurrency
- Increase concurrency limit (if safe):
  - Adjust action-specific policy
  - Higher max_concurrent = more parallel executions
- Rate limit at API (long-term):
  - Add API-level rate limiting
  - Reject requests before they enter the system
Memory Exhaustion
Symptom: Executor OOM killed, high memory usage
Causes:
- Too many queues with large queue lengths
- Memory leak in queue entries
Diagnosis:
# Check queue stats in database
psql -c "SELECT SUM(queue_length) as total_queued,
COUNT(*) as num_actions,
MAX(queue_length) as max_queue
FROM attune.queue_stats;"
# Monitor executor memory
ps aux | grep attune-executor
Solutions:
- Reduce max_queue_length
- Clear old queues: queue_manager.clear_all_queues()
- Restart executor service (queues rebuild from DB)
FIFO Violation (Critical Bug)
Symptom: Executions complete out of order
This should NEVER happen - indicates a critical bug.
Diagnosis:
- Enable detailed logging:
  // In queue_manager.rs
  tracing::debug!(
      "Enqueued exec {} at position {} for action {}",
      execution_id,
      queue.len(),
      action_id
  );
- Check for race conditions:
  - Multiple threads modifying the same queue
  - Lock not held during the entire operation
  - Notify called before entry dequeued
Report immediately with:
- Executor logs with timestamps
- Database query showing execution order
- Queue stats at time of violation
Best Practices
For Operators
- Monitor queue depths: Alert on queue_length > 100
- Set reasonable limits: Don't set max_queue_length too high
- Scale workers: Add workers when queues consistently fill
- Regular cleanup: Run cleanup jobs to remove stale stats
- Test policies: Validate concurrency limits in staging first
For Developers
- Test with queues: Always test actions with concurrency limits
- Handle timeouts: Implement proper timeout handling in actions
- Idempotent actions: Design actions to be safely retried
- Log execution order: Log start/end times for debugging
- Monitor completion rate: Track total_completed / total_enqueued
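A small helper mirroring this advice (and the NULLIF guard in the throughput SQL earlier) returns a percentage, or nothing before anything has been enqueued. completion_rate is illustrative, not part of the Attune API:

```rust
// Completion rate as a percentage; None guards the division by zero,
// matching NULLIF(total_enqueued, 0) in the monitoring query.
fn completion_rate(total_completed: u64, total_enqueued: u64) -> Option<f64> {
    if total_enqueued == 0 {
        None
    } else {
        Some(total_completed as f64 / total_enqueued as f64 * 100.0)
    }
}
```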
For Action Authors
- Know your limits: Understand action's concurrency safety
- Fast completions: Minimize action execution time
- Proper error handling: Always complete (success or failure)
- No indefinite blocking: Use timeouts on external calls
- Test at scale: Stress test with many concurrent requests
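The "no indefinite blocking" rule can be sketched as a generic wrapper: run the external call on a helper thread and give up after a deadline. run_with_timeout is an illustration using std primitives, not an Attune API; async actions would reach for tokio::time::timeout instead:

```rust
use std::sync::mpsc::channel;
use std::thread;
use std::time::Duration;

// Returns Some(result) if the call finishes within `limit`, else None.
fn run_with_timeout<T, F>(call: F, limit: Duration) -> Option<T>
where
    T: Send + 'static,
    F: FnOnce() -> T + Send + 'static,
{
    let (tx, rx) = channel();
    thread::spawn(move || {
        // Ignore the send error if the caller has already given up.
        let _ = tx.send(call());
    });
    rx.recv_timeout(limit).ok()
}
```

An action that bounds every external call this way can always complete (success or failure) and release its queue slot.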
Security Considerations
Queue Exhaustion DoS
Attack: Attacker floods system with action requests to fill queues
Mitigations:
- Rate limiting: API-level request throttling
- Authentication: Require auth for action triggers
- Queue limits: max_queue_length prevents unbounded growth
- Queue timeouts: queue_timeout_seconds evicts old entries
- Monitoring: Alert on sudden queue growth
Priority Escalation
Non-Issue: FIFO prevents priority jumping - no user can skip the queue
Information Disclosure
Concern: Queue stats reveal system load
Mitigation: Restrict /queue-stats endpoint to authenticated users with appropriate RBAC
Future Enhancements
Planned Features
- Priority queues: Allow high-priority executions to jump queue
- Queue pausing: Temporarily stop processing specific actions
- Batch notifications: Notify multiple waiters at once
- Queue persistence: Survive executor restarts
- Cross-executor coordination: Distributed queue management
- Advanced metrics: Latency percentiles, queue age histograms
- Auto-scaling: Automatically adjust max_concurrent based on load
Related Documentation
- Executor Service Architecture
- Policy Enforcement
- Worker Service
- API: Actions - Queue Stats Endpoint
- Operational Runbook
References
- Implementation: crates/executor/src/queue_manager.rs
- Tests: crates/executor/tests/fifo_ordering_integration_test.rs
- Implementation Plan: work-summary/2025-01-policy-ordering-plan.md
- Status: work-summary/FIFO-ORDERING-STATUS.md