RabbitMQ Queue Ownership Architecture
Last Updated: 2026-02-05
Status: Implemented
Overview
Attune uses a service-specific infrastructure setup pattern where each service is responsible for declaring only the queues it consumes. This provides clear ownership, reduces redundancy, and makes the system architecture more maintainable.
Principle
Each service declares the queues it consumes.
This follows the principle that the consumer owns the queue declaration, ensuring that:
- Queue configuration is co-located with the service that uses it
- Services can start in any order (all operations are idempotent)
- Ownership is clear from the codebase structure
- Changes to queue configuration are localized to the consuming service
Infrastructure Layers
Common Infrastructure (Shared by All Services)
Declared by: Any service on startup (first-to-start wins, idempotent)
Responsibility: Ensures basic messaging infrastructure exists
Components:
- Exchanges:
  - attune.events (topic) - Event routing
  - attune.executions (topic) - Execution lifecycle routing
  - attune.notifications (fanout) - Real-time notifications
- Dead Letter Exchange (DLX):
  - attune.dlx (direct) - Failed message handling
  - attune.dlx.queue - Dead letter queue bound to DLX
Setup Method: Connection::setup_common_infrastructure()
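The shared components listed above can be written down as plain data. The following is a minimal sketch, not the actual body of setup_common_infrastructure(); the type and function names (ExchangeKind, ExchangeSpec, common_exchanges, dead_letter_binding) are hypothetical and exist only for illustration.

```rust
/// Exchange types used by the common infrastructure.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ExchangeKind {
    Topic,
    Fanout,
    Direct,
}

/// One exchange declaration: name, type, and what it is used for.
#[derive(Debug, Clone, PartialEq, Eq)]
struct ExchangeSpec {
    name: &'static str,
    kind: ExchangeKind,
    purpose: &'static str,
}

/// Hypothetical helper listing the exchanges every service declares on startup.
fn common_exchanges() -> Vec<ExchangeSpec> {
    vec![
        ExchangeSpec { name: "attune.events", kind: ExchangeKind::Topic, purpose: "Event routing" },
        ExchangeSpec { name: "attune.executions", kind: ExchangeKind::Topic, purpose: "Execution lifecycle routing" },
        ExchangeSpec { name: "attune.notifications", kind: ExchangeKind::Fanout, purpose: "Real-time notifications" },
        ExchangeSpec { name: "attune.dlx", kind: ExchangeKind::Direct, purpose: "Failed message handling" },
    ]
}

/// The dead letter queue and the exchange it is bound to, as (queue, exchange).
fn dead_letter_binding() -> (&'static str, &'static str) {
    ("attune.dlx.queue", "attune.dlx")
}
```

Keeping the list in one place like this makes it easy to see that the common layer contains exchanges plus a single queue, the dead letter queue, which has no consuming service of its own.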
Service-Specific Infrastructure
Each service declares only the queues it consumes:
Service Responsibilities
Executor Service
Role: Orchestrates execution lifecycle, enforces rules, manages inquiries
Queues Owned:
- attune.enforcements.queue
  - Routing: enforcement.#
  - Purpose: Rule enforcement requests
- attune.execution.requests.queue
  - Routing: execution.requested
  - Purpose: New execution requests
- attune.execution.status.queue
  - Routing: execution.status.changed
  - Purpose: Execution status updates from workers
- attune.execution.completed.queue
  - Routing: execution.completed
  - Purpose: Completed execution results
- attune.inquiry.responses.queue
  - Routing: inquiry.responded
  - Purpose: Human-in-the-loop responses
Setup Method: Connection::setup_executor_infrastructure()
Code Location: crates/executor/src/service.rs
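To see how the Executor's routing patterns select a queue, the sketch below implements standard AMQP topic matching (`*` matches exactly one word, `#` matches zero or more words) and applies it to the patterns listed above. It is an illustration only: RabbitMQ performs this matching inside the broker, the helper names are hypothetical, and the sketch ignores which exchange each queue is bound to.

```rust
/// Queue name and routing pattern for each Executor queue.
const EXECUTOR_BINDINGS: [(&str, &str); 5] = [
    ("attune.enforcements.queue", "enforcement.#"),
    ("attune.execution.requests.queue", "execution.requested"),
    ("attune.execution.status.queue", "execution.status.changed"),
    ("attune.execution.completed.queue", "execution.completed"),
    ("attune.inquiry.responses.queue", "inquiry.responded"),
];

/// Splits a dotted routing key or pattern into words; an empty string has no words.
fn words(s: &str) -> Vec<&str> {
    if s.is_empty() { Vec::new() } else { s.split('.').collect() }
}

/// Recursive word-by-word match of a topic pattern against a routing key.
fn match_words(pattern: &[&str], key: &[&str]) -> bool {
    match pattern.split_first() {
        // Pattern exhausted: match only if the key is exhausted too.
        None => key.is_empty(),
        // `#` may consume zero or more words of the key.
        Some((&"#", rest)) => (0..=key.len()).any(|skip| match_words(rest, &key[skip..])),
        // `*` consumes exactly one word.
        Some((&"*", rest)) => !key.is_empty() && match_words(rest, &key[1..]),
        // A literal word must equal the next word of the key.
        Some((word, rest)) => key.first() == Some(word) && match_words(rest, &key[1..]),
    }
}

/// Returns true if `key` matches the AMQP topic `pattern`.
fn topic_matches(pattern: &str, key: &str) -> bool {
    match_words(&words(pattern), &words(key))
}

/// Hypothetical helper: the Executor queues whose pattern matches `key`.
fn executor_queues_for(key: &str) -> Vec<&'static str> {
    EXECUTOR_BINDINGS
        .iter()
        .filter(|(_, pattern)| topic_matches(pattern, key))
        .map(|(queue, _)| *queue)
        .collect()
}
```

Under these rules a message published with the key enforcement.created lands in attune.enforcements.queue because of the trailing `#`, while the other four queues only accept their exact key.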
Worker Service
Role: Execute actions in various runtimes (shell, Python, Node.js, containers)
Queues Owned:
- worker.{id}.executions (per worker instance)
  - Routing: execution.dispatch.worker.{id}
  - Purpose: Execution tasks dispatched to this specific worker
  - Properties: Durable, auto-delete on disconnect
Setup Method: Connection::setup_worker_infrastructure(worker_id, config)
Code Location: crates/worker/src/service.rs
Notes:
- Each worker instance gets its own queue
- Worker ID is assigned during registration
- Queue is created after successful registration
- Multiple workers can exist for load distribution
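The per-worker naming scheme can be expressed as two small helpers. This is a sketch under assumptions: the function names are hypothetical, and the worker ID is accepted as any displayable value because the text does not state its concrete type.

```rust
use std::fmt::Display;

/// Queue name for one worker instance: worker.{id}.executions
fn worker_queue_name(worker_id: impl Display) -> String {
    format!("worker.{}.executions", worker_id)
}

/// Routing key the Executor would publish with to reach that worker:
/// execution.dispatch.worker.{id}
fn worker_routing_key(worker_id: impl Display) -> String {
    format!("execution.dispatch.worker.{}", worker_id)
}
```

Because both names derive from the same ID, the queue can only be declared once registration has assigned that ID, which matches the ordering described in the notes above.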
Sensor Service
Role: Monitor for events and generate trigger instances
Queues Owned:
- attune.events.queue
  - Routing: # (all events)
  - Purpose: Events generated by sensors and triggers
Setup Method: Connection::setup_sensor_infrastructure()
Code Location: crates/sensor/src/service.rs
Notifier Service
Role: Real-time notifications via WebSockets
Queues Owned:
- attune.notifications.queue
  - Routing: none (fanout, no routing key)
  - Purpose: System notifications for WebSocket broadcasting
Setup Method: Connection::setup_notifier_infrastructure()
Code Location: crates/notifier/src/service.rs
Notes:
- Uses fanout exchange (broadcasts to all consumers)
- Also uses PostgreSQL LISTEN/NOTIFY for database events
API Service
Role: HTTP gateway for client interactions
Queues Owned: None (API only publishes, doesn't consume)
Setup Method: Connection::setup_common_infrastructure() only
Code Location: crates/api/src/main.rs
Notes:
- Only needs exchanges to publish messages
- Does not consume from any queues
- Publishes to various exchanges (events, executions, notifications)
Queue Configuration
All queues are configured with:
- Durable: true (survives broker restarts)
- Exclusive: false (accessible by multiple connections)
- Auto-delete: false (persists even without consumers)
- Dead Letter Exchange: attune.dlx (enabled by default)
Exception:
- Worker-specific queues may have different settings based on worker lifecycle
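The defaults above, and the worker exception, could be captured in a small settings type. This is a minimal sketch, not the project's actual configuration struct: QueueSettings and its methods are hypothetical names. The only external fact relied on is that RabbitMQ reads the dead letter exchange from the queue argument x-dead-letter-exchange.

```rust
/// Hypothetical settings applied when declaring a queue.
#[derive(Debug, Clone, PartialEq, Eq)]
struct QueueSettings {
    durable: bool,
    exclusive: bool,
    auto_delete: bool,
    dead_letter_exchange: Option<String>,
}

impl Default for QueueSettings {
    /// Defaults shared by all service queues.
    fn default() -> Self {
        QueueSettings {
            durable: true,      // survives broker restarts
            exclusive: false,   // accessible by multiple connections
            auto_delete: false, // persists even without consumers
            dead_letter_exchange: Some("attune.dlx".to_string()),
        }
    }
}

impl QueueSettings {
    /// Per-worker variant: still durable, but auto-deleted on disconnect.
    fn for_worker() -> Self {
        QueueSettings { auto_delete: true, ..Default::default() }
    }

    /// Extra queue arguments to send with the declaration.
    fn arguments(&self) -> Vec<(String, String)> {
        self.dead_letter_exchange
            .iter()
            .map(|dlx| ("x-dead-letter-exchange".to_string(), dlx.clone()))
            .collect()
    }
}
```

Expressing the worker case as a variant of the defaults keeps the exception explicit instead of scattering different flag values across call sites.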
Message Flow Examples
Rule Enforcement Flow
Event Created
→ `attune.events` exchange
→ `attune.events.queue` (consumed by Executor)
→ Rule evaluation
→ `enforcement.created` published to `attune.executions`
→ `attune.enforcements.queue` (consumed by Executor)
Execution Flow
Execution Requested (from API)
→ `attune.executions` exchange (routing: execution.requested)
→ `attune.execution.requests.queue` (consumed by Executor/Scheduler)
→ Executor dispatches to worker
→ `execution.dispatch.worker.{id}` to `attune.executions`
→ `worker.{id}.executions` (consumed by Worker)
→ Worker executes action
→ `execution.completed` to `attune.executions`
→ `attune.execution.completed.queue` (consumed by Executor)
Notification Flow
System Event Occurs
→ `attune.notifications` exchange (fanout)
→ `attune.notifications.queue` (consumed by Notifier)
→ WebSocket broadcast to connected clients
Implementation Details
Setup Call Order
Each service follows this pattern:
// 1. Connect to RabbitMQ
let mq_connection = Connection::connect(mq_url).await?;
// 2. Setup common infrastructure (exchanges, DLX)
mq_connection.setup_common_infrastructure(&config).await?;
// 3. Setup service-specific queues
mq_connection.setup_SERVICE_infrastructure(&config).await?;
// (where SERVICE is executor, worker, sensor, or notifier)
Idempotency
All setup operations are idempotent:
- Declaring an existing exchange/queue with the same settings succeeds
- Multiple services can call setup_common_infrastructure() safely
- Services can start in any order
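The idempotency rule can be modelled with a tiny in-memory stand-in for the broker. This sketch is purely illustrative (FakeBroker, QueueArgs and the other names are hypothetical, and no real RabbitMQ client is involved): re-declaring a queue with the same settings succeeds, while re-declaring it with different settings fails, which corresponds to the PRECONDITION_FAILED error RabbitMQ reports in that case.

```rust
use std::collections::HashMap;

/// The settings compared when a queue is re-declared (simplified).
#[derive(Debug, Clone, PartialEq, Eq)]
struct QueueArgs {
    durable: bool,
    auto_delete: bool,
}

#[derive(Debug, PartialEq, Eq)]
enum DeclareOutcome {
    Created,
    AlreadyExists,
}

/// Stand-in for the broker's PRECONDITION_FAILED channel error.
#[derive(Debug, PartialEq, Eq)]
struct PreconditionFailed {
    queue: String,
}

/// In-memory model of the broker's queue registry.
#[derive(Default)]
struct FakeBroker {
    queues: HashMap<String, QueueArgs>,
}

impl FakeBroker {
    /// Declares a queue; succeeds if it is new or already exists with equal settings.
    fn declare_queue(&mut self, name: &str, args: QueueArgs) -> Result<DeclareOutcome, PreconditionFailed> {
        let existing = self.queues.get(name).cloned();
        match existing {
            None => {
                self.queues.insert(name.to_string(), args);
                Ok(DeclareOutcome::Created)
            }
            Some(current) if current == args => Ok(DeclareOutcome::AlreadyExists),
            Some(_) => Err(PreconditionFailed { queue: name.to_string() }),
        }
    }
}
```

In this model, start-up order does not matter as long as every service declares a given queue with the same settings; only a mismatch produces an error.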
Error Handling
Setup failures are logged but not fatal:
- Queues may already exist from previous runs
- Another service may have created the infrastructure
- Only actual consumption failures should stop the service
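One way to implement "logged but not fatal" is to turn each setup result into a log line instead of propagating it with `?`. The sketch below assumes nothing about the project's logging setup: it writes to stderr with eprintln!, and the helper names are hypothetical.

```rust
use std::fmt::Display;

/// Logs a failed setup step and reports whether it succeeded,
/// rather than returning the error to the caller.
fn run_setup_step<E: Display>(step: &str, result: Result<(), E>) -> bool {
    match result {
        Ok(()) => true,
        Err(err) => {
            eprintln!("warning: {} setup failed, continuing: {}", step, err);
            false
        }
    }
}

/// Runs every step in order and returns the names of the ones that failed.
fn failed_steps<'a, E: Display>(steps: Vec<(&'a str, Result<(), E>)>) -> Vec<&'a str> {
    let mut failed = Vec::new();
    for (name, result) in steps {
        if !run_setup_step(name, result) {
            failed.push(name);
        }
    }
    failed
}
```

A service using this pattern would still need to treat a failure to start consuming as fatal, since that is the point at which a missing or misconfigured queue actually prevents it from doing its work.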
Startup Sequence
Typical Docker Compose Startup:
- PostgreSQL - Starts first (dependency)
- RabbitMQ - Starts first (dependency)
- Migrations - Runs database migrations
- Services start in parallel:
- API - Creates common infrastructure
- Executor - Creates common + executor infrastructure
- Workers - Each creates common + worker-specific queue
- Sensor - Creates common + sensor infrastructure
- Notifier - Creates common + notifier infrastructure
The first service to start creates the common infrastructure. All subsequent services find it already exists and proceed.
Benefits
✅ Clear Ownership - Code inspection shows which service owns which queue
✅ Reduced Redundancy - Each queue declared exactly once (per service type)
✅ Better Debugging - Queue issues isolated to specific services
✅ Improved Maintainability - Changes to queue config localized
✅ Self-Documenting - Code structure reflects system architecture
✅ Order Independence - Services can start in any order
✅ Monitoring - Can track which service created infrastructure
Monitoring and Verification
RabbitMQ Management UI
Access at http://localhost:15672 (credentials: guest/guest)
Expected Queues:
- attune.dlx.queue - Dead letter queue
- attune.events.queue - Events (Sensor)
- attune.enforcements.queue - Enforcements (Executor)
- attune.execution.requests.queue - Execution requests (Executor)
- attune.execution.status.queue - Status updates (Executor)
- attune.execution.completed.queue - Completions (Executor)
- attune.inquiry.responses.queue - Inquiry responses (Executor)
- attune.notifications.queue - Notifications (Notifier)
- worker.{id}.executions - Worker queues (one per worker)
Verification Commands
# Check which queues exist
docker compose exec rabbitmq rabbitmqctl list_queues name messages
# Check queue bindings
docker compose exec rabbitmq rabbitmqctl list_bindings
# Check who's consuming from queues
docker compose exec rabbitmq rabbitmqctl list_consumers
Log Verification
Each service logs its infrastructure setup:
# API (common only)
docker compose logs api | grep "infrastructure setup"
# Executor (common + executor)
docker compose logs executor | grep "infrastructure setup"
# Workers (common + worker-specific)
docker compose logs worker-shell | grep "infrastructure setup"
# Sensor (common + sensor)
docker compose logs sensor | grep "infrastructure setup"
Troubleshooting
Queue Already Exists with Different Settings
Error: PRECONDITION_FAILED - inequivalent arg 'durable' for queue...
Cause: Queue exists with different configuration than code expects
Solution:
# Stop services
docker compose down
# Remove RabbitMQ volume to clear all queues
docker volume rm attune_rabbitmq_data
# Restart services
docker compose up -d
Service Can't Connect to RabbitMQ
Check: Is RabbitMQ healthy?
docker compose ps rabbitmq
Check: RabbitMQ logs for errors
docker compose logs rabbitmq
Messages Not Being Consumed
- Check that the queue has consumers:
  docker compose exec rabbitmq rabbitmqctl list_consumers
- Check that the service is running:
  docker compose ps
- Check the service logs for consumer startup:
  docker compose logs <service-name> | grep "Consumer started"
Migration from Old Architecture
Previous Behavior: All services called setup_infrastructure() which created ALL queues
New Behavior: Each service calls its specific setup method
Migration Steps:
- Update to latest code
- Stop all services:
  docker compose down
- Clear RabbitMQ volume:
  docker volume rm attune_rabbitmq_data
- Start services:
  docker compose up -d
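The queues themselves are recreated by the services on startup, so no queue definitions are lost. Messages still waiting in the old queues are discarded along with the volume.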
Related Documentation
- Queue Architecture - Overall queue design
- RabbitMQ Queues Quick Reference
- Executor Service
- Worker Service
- Sensor Service
Change History
| Date | Change | Author |
|---|---|---|
| 2026-02-05 | Initial implementation of service-specific queue ownership | AI Assistant |