attune/docs/architecture/worker-availability-handling.md

Worker Availability Handling

Status: Implementation Gap Identified
Priority: High
Date: 2026-02-09

Problem Statement

When workers are stopped or become unavailable, the executor continues attempting to schedule executions to them, resulting in:

  1. Stuck executions: Executions remain in SCHEDULING or SCHEDULED status indefinitely
  2. Queue buildup: Messages accumulate in worker-specific RabbitMQ queues
  3. No failure notification: Users don't know their executions are stuck
  4. Resource waste: System resources consumed by queued messages and database records

Current Architecture

Heartbeat Mechanism

Workers send heartbeat updates to the database periodically (default: 30 seconds).

// From crates/executor/src/scheduler.rs
const DEFAULT_HEARTBEAT_INTERVAL: u64 = 30;
const HEARTBEAT_STALENESS_MULTIPLIER: u64 = 3;

fn is_worker_heartbeat_fresh(worker: &Worker) -> bool {
    // Worker is fresh if heartbeat < 90 seconds old
    let max_age = Duration::from_secs(
        DEFAULT_HEARTBEAT_INTERVAL * HEARTBEAT_STALENESS_MULTIPLIER
    );
    // ...
}
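The excerpt above elides the actual comparison. As a minimal std-only sketch of that logic, assuming the worker's last heartbeat is available as a `SystemTime` (the real `Worker` model may store a `chrono` timestamp instead):

```rust
use std::time::{Duration, SystemTime};

const DEFAULT_HEARTBEAT_INTERVAL: u64 = 30;
const HEARTBEAT_STALENESS_MULTIPLIER: u64 = 3;

/// Returns true if the last heartbeat is younger than interval * multiplier
/// (90 seconds with the current defaults).
fn is_heartbeat_fresh(last_heartbeat: SystemTime, now: SystemTime) -> bool {
    let max_age =
        Duration::from_secs(DEFAULT_HEARTBEAT_INTERVAL * HEARTBEAT_STALENESS_MULTIPLIER);
    match now.duration_since(last_heartbeat) {
        Ok(age) => age < max_age,
        // A heartbeat timestamped in the future (clock skew) counts as fresh
        Err(_) => true,
    }
}

fn main() {
    let now = SystemTime::now();
    assert!(is_heartbeat_fresh(now - Duration::from_secs(30), now));
    assert!(!is_heartbeat_fresh(now - Duration::from_secs(120), now));
}
```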

Scheduling Flow

Execution Created (REQUESTED)
    ↓
Scheduler receives message
    ↓
Find compatible worker with fresh heartbeat
    ↓
Update execution to SCHEDULED
    ↓
Publish message to worker-specific queue
    ↓
Worker consumes and executes
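The flow above implies a small execution state machine. A sketch of the statuses named in this document and the transitions the flow allows (the real enum in the codebase may carry more variants):

```rust
/// Execution lifecycle statuses named in this document
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ExecutionStatus {
    Requested,
    Scheduling,
    Scheduled,
    Running,
    Completed,
    Failed,
}

/// Transitions implied by the scheduling flow; failure is reachable from
/// any non-terminal state (e.g. a worker going away)
fn can_transition(from: ExecutionStatus, to: ExecutionStatus) -> bool {
    use ExecutionStatus::*;
    matches!(
        (from, to),
        (Requested, Scheduling)
            | (Scheduling, Scheduled)
            | (Scheduled, Running)
            | (Running, Completed)
            | (Requested | Scheduling | Scheduled | Running, Failed)
    )
}

fn main() {
    assert!(can_transition(ExecutionStatus::Scheduled, ExecutionStatus::Running));
    assert!(can_transition(ExecutionStatus::Scheduled, ExecutionStatus::Failed));
    assert!(!can_transition(ExecutionStatus::Failed, ExecutionStatus::Running));
}
```

The stuck-execution problem described below is precisely a `Scheduled` state with no worker left to drive the `Scheduled → Running` transition.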

Failure Points

  1. Worker stops after heartbeat: Worker has fresh heartbeat but is actually down
  2. Worker crashes: No graceful shutdown, heartbeat appears fresh temporarily
  3. Network partition: Worker isolated but appears healthy
  4. Queue accumulation: Messages sit in worker-specific queues indefinitely

Current Mitigations (Insufficient)

1. Heartbeat Staleness Check

async fn select_worker(pool: &PgPool, action: &Action) -> Result<Worker> {
    // Fetch candidate workers (action-compatibility filtering elided here)
    let workers = WorkerRepository::find_action_workers(pool).await?;

    // Filter by active workers
    let active_workers: Vec<_> = workers
        .into_iter()
        .filter(|w| w.status == WorkerStatus::Active)
        .collect();

    // Filter by heartbeat freshness
    let fresh_workers: Vec<_> = active_workers
        .into_iter()
        .filter(|w| is_worker_heartbeat_fresh(w))
        .collect();

    if fresh_workers.is_empty() {
        return Err(anyhow!("No workers with fresh heartbeats"));
    }

    // Select first available worker (non-empty was checked above)
    Ok(fresh_workers.into_iter().next().expect("fresh_workers is non-empty"))
}

Gap: Workers can stop within the 90-second staleness window.

2. Message Requeue on Error

// From crates/common/src/mq/consumer.rs
match handler(envelope.clone()).await {
    Ok(()) => {
        channel.basic_ack(delivery_tag, BasicAckOptions::default()).await?;
    }
    Err(e) => {
        // Requeue only errors classified as transient (connection/timeout)
        let requeue = e.is_retriable();
        channel.basic_nack(delivery_tag, BasicNackOptions {
            requeue,
            multiple: false,
        }).await?;
    }
}

Gap: Only requeues on retriable errors (connection/timeout), not worker unavailability.
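A sketch of the kind of classification `is_retriable()` performs, with hypothetical variant names (the real error type in crates/common may differ):

```rust
/// Hypothetical consumer-side error kinds, for illustration only
#[derive(Debug)]
enum HandlerError {
    /// Broker/database connection dropped: safe to requeue
    Connection(String),
    /// Operation timed out: safe to requeue
    Timeout(String),
    /// Payload failed validation: requeueing would loop forever
    InvalidPayload(String),
    /// Target worker is gone: today this is NOT treated as retriable,
    /// which is exactly the gap described above
    WorkerUnavailable(String),
}

impl HandlerError {
    fn is_retriable(&self) -> bool {
        matches!(self, HandlerError::Connection(_) | HandlerError::Timeout(_))
    }
}

fn main() {
    assert!(HandlerError::Timeout("rpc call timed out".into()).is_retriable());
    assert!(!HandlerError::InvalidPayload("bad json".into()).is_retriable());
    assert!(!HandlerError::WorkerUnavailable("worker-3 gone".into()).is_retriable());
}
```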

3. Message TTL Configuration

// From crates/common/src/config.rs
pub struct MessageQueueConfig {
    #[serde(default = "default_message_ttl")]
    pub message_ttl: u64,
}

fn default_message_ttl() -> u64 {
    3600 // 1 hour
}

Gap: TTL not currently applied to worker queues, and 1 hour is too long.

Proposed Solutions

Solution 1: Execution Timeout Mechanism (HIGH PRIORITY)

Add a background task that monitors scheduled executions and fails them if they don't start within a timeout.

Implementation:

// crates/executor/src/execution_timeout_monitor.rs

pub struct ExecutionTimeoutMonitor {
    pool: PgPool,
    publisher: Arc<Publisher>,
    check_interval: Duration,
    scheduled_timeout: Duration,
}

impl ExecutionTimeoutMonitor {
    pub async fn start(&self) -> Result<()> {
        let mut interval = tokio::time::interval(self.check_interval);

        loop {
            interval.tick().await;

            if let Err(e) = self.check_stale_executions().await {
                error!("Error checking stale executions: {}", e);
            }
        }
    }

    async fn check_stale_executions(&self) -> Result<()> {
        let cutoff = Utc::now() - chrono::Duration::from_std(self.scheduled_timeout)?;

        // Find executions stuck in SCHEDULED status
        let stale_executions = sqlx::query_as::<_, Execution>(
            "SELECT * FROM execution 
             WHERE status = 'scheduled' 
             AND updated < $1"
        )
        .bind(cutoff)
        .fetch_all(&self.pool)
        .await?;

        for execution in stale_executions {
            warn!(
                "Execution {} has been scheduled for too long, marking as failed",
                execution.id
            );

            self.fail_execution(
                execution.id,
                "Execution timeout: worker did not pick up task within timeout"
            ).await?;
        }

        Ok(())
    }

    async fn fail_execution(&self, execution_id: i64, reason: &str) -> Result<()> {
        // Update execution status
        sqlx::query(
            "UPDATE execution 
             SET status = 'failed', 
                 result = $2,
                 updated = NOW() 
             WHERE id = $1"
        )
        .bind(execution_id)
        .bind(serde_json::json!({
            "error": reason,
            "failed_by": "execution_timeout_monitor"
        }))
        .execute(&self.pool)
        .await?;

        // Publish completion notification
        let payload = ExecutionCompletedPayload {
            execution_id,
            status: ExecutionStatus::Failed,
            result: Some(serde_json::json!({"error": reason})),
        };

        self.publisher
            .publish_envelope(
                MessageType::ExecutionCompleted,
                payload,
                "attune.executions",
            )
            .await?;

        Ok(())
    }
}

Configuration:

# config.yaml
executor:
  scheduled_timeout: 300  # 5 minutes (fail if not running within 5 min)
  timeout_check_interval: 60  # Check every minute
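The heart of `check_stale_executions` is a cutoff comparison. A std-only sketch of that predicate against an in-memory list, decoupled from sqlx (timestamps shown as `SystemTime`; the real code uses chrono and Postgres):

```rust
use std::time::{Duration, SystemTime};

#[derive(Debug, PartialEq)]
enum Status {
    Scheduled,
    Running,
}

struct Execution {
    id: i64,
    status: Status,
    updated: SystemTime,
}

/// Ids of executions stuck in SCHEDULED longer than `scheduled_timeout`:
/// the same predicate the proposed SQL expresses with
/// `status = 'scheduled' AND updated < $1`
fn stale_execution_ids(
    executions: &[Execution],
    now: SystemTime,
    scheduled_timeout: Duration,
) -> Vec<i64> {
    let cutoff = now - scheduled_timeout;
    executions
        .iter()
        .filter(|e| e.status == Status::Scheduled && e.updated < cutoff)
        .map(|e| e.id)
        .collect()
}

fn main() {
    let now = SystemTime::now();
    let executions = vec![
        Execution { id: 1, status: Status::Scheduled, updated: now - Duration::from_secs(600) },
        Execution { id: 2, status: Status::Scheduled, updated: now - Duration::from_secs(60) },
        Execution { id: 3, status: Status::Running, updated: now - Duration::from_secs(600) },
    ];
    // With the proposed 5-minute timeout, only execution 1 is stale
    assert_eq!(stale_execution_ids(&executions, now, Duration::from_secs(300)), vec![1]);
}
```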

Solution 2: Worker Queue TTL and DLQ (MEDIUM PRIORITY)

Apply message TTL to worker-specific queues with dead letter exchange.

Implementation:

// When declaring worker-specific queues
let mut queue_args = FieldTable::default();

// Set message TTL (5 minutes)
queue_args.insert(
    "x-message-ttl".into(),
    AMQPValue::LongInt(300_000) // 5 minutes in milliseconds
);

// Set dead letter exchange
queue_args.insert(
    "x-dead-letter-exchange".into(),
    AMQPValue::LongString("attune.executions.dlx".into())
);

channel.queue_declare(
    &format!("attune.execution.worker.{}", worker_id),
    QueueDeclareOptions {
        durable: true,
        ..Default::default()
    },
    queue_args,
).await?;
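Note the units: the configuration expresses TTLs in seconds while `x-message-ttl` takes milliseconds, and (to my understanding) lapin's `AMQPValue::LongInt` carries an `i32`, so the conversion is worth a checked helper. A sketch:

```rust
/// Converts a TTL configured in seconds into the millisecond value that
/// AMQP's x-message-ttl expects, rejecting values that would overflow the
/// i32 assumed to back AMQPValue::LongInt
fn ttl_secs_to_amqp_ms(ttl_secs: u64) -> Option<i32> {
    ttl_secs
        .checked_mul(1000)
        .and_then(|ms| i32::try_from(ms).ok())
}

fn main() {
    // 300 s (the proposed worker_queue_ttl) -> 300_000 ms
    assert_eq!(ttl_secs_to_amqp_ms(300), Some(300_000));
    // A TTL whose millisecond value overflows i32 is rejected
    assert_eq!(ttl_secs_to_amqp_ms(u64::MAX), None);
}
```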

Dead Letter Handler:

// crates/executor/src/dead_letter_handler.rs

pub struct DeadLetterHandler {
    pool: PgPool,
    consumer: Arc<Consumer>,
}

impl DeadLetterHandler {
    pub async fn start(&self) -> Result<()> {
        self.consumer
            .consume_with_handler(|envelope: MessageEnvelope<ExecutionScheduledPayload>| {
                let pool = self.pool.clone();
                
                async move {
                    warn!("Received dead letter for execution {}", envelope.payload.execution_id);
                    
                    // Mark execution as failed
                    sqlx::query(
                        "UPDATE execution 
                         SET status = 'failed', 
                             result = $2,
                             updated = NOW() 
                         WHERE id = $1 AND status = 'scheduled'"
                    )
                    .bind(envelope.payload.execution_id)
                    .bind(serde_json::json!({
                        "error": "Message expired in worker queue (worker unavailable)",
                        "failed_by": "dead_letter_handler"
                    }))
                    .execute(&pool)
                    .await?;
                    
                    Ok(())
                }
            })
            .await
    }
}

Solution 3: Worker Health Probes (LOW PRIORITY)

Add active health checking instead of relying solely on heartbeats.

Implementation:

// crates/executor/src/worker_health_checker.rs

pub struct WorkerHealthChecker {
    pool: PgPool,
    check_interval: Duration,
}

impl WorkerHealthChecker {
    pub async fn start(&self) -> Result<()> {
        let mut interval = tokio::time::interval(self.check_interval);

        loop {
            interval.tick().await;

            if let Err(e) = self.check_worker_health().await {
                error!("Error checking worker health: {}", e);
            }
        }
    }

    async fn check_worker_health(&self) -> Result<()> {
        let workers = WorkerRepository::find_action_workers(&self.pool).await?;

        for worker in workers {
            // Skip if heartbeat is very stale (worker is definitely down)
            if !is_heartbeat_recent(&worker) {
                continue;
            }

            // Attempt health check
            match self.ping_worker(&worker).await {
                Ok(true) => {
                    // Worker is healthy, ensure status is Active
                    if worker.status != Some(WorkerStatus::Active) {
                        self.update_worker_status(worker.id, WorkerStatus::Active).await?;
                    }
                }
                Ok(false) | Err(_) => {
                    // Worker is unhealthy, mark as inactive
                    warn!("Worker {} failed health check", worker.name);
                    self.update_worker_status(worker.id, WorkerStatus::Inactive).await?;
                }
            }
        }

        Ok(())
    }

    async fn ping_worker(&self, _worker: &Worker) -> Result<bool> {
        // TODO: Implement a health endpoint on the worker. An interim check
        // could ask RabbitMQ whether the worker's queue has an active consumer.
        // Until then, optimistically report healthy.
        Ok(true)
    }
}

Solution 4: Graceful Worker Shutdown (MEDIUM PRIORITY)

Ensure workers mark themselves as inactive before shutdown.

Implementation:

// In worker service shutdown handler
impl WorkerService {
    pub async fn shutdown(&self) -> Result<()> {
        info!("Worker shutting down gracefully...");

        // Mark worker as inactive
        sqlx::query(
            "UPDATE worker SET status = 'inactive', updated = NOW() WHERE id = $1"
        )
        .bind(self.worker_id)
        .execute(&self.pool)
        .await?;

        // Stop accepting new tasks
        self.stop_consuming().await?;

        // Wait for in-flight tasks to complete (with timeout); an expired
        // grace period should not abort the rest of the shutdown
        let timeout = Duration::from_secs(30);
        match tokio::time::timeout(timeout, self.wait_for_completion()).await {
            Ok(result) => result?,
            Err(_) => warn!("Shutdown grace period elapsed; abandoning in-flight tasks"),
        }

        info!("Worker shutdown complete");
        Ok(())
    }
}
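The drain-with-timeout step can be sketched with std channels: each in-flight task holds a clone of a `Sender`, and shutdown waits until every clone is dropped or the deadline passes. A hypothetical std-only analogue of the tokio code above:

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

/// Waits until all senders (one per in-flight task) are dropped, or the
/// deadline passes. Returns true if everything drained in time.
fn drain_in_flight(rx: mpsc::Receiver<()>, deadline: Duration) -> bool {
    matches!(rx.recv_timeout(deadline), Err(mpsc::RecvTimeoutError::Disconnected))
}

fn main() {
    let (tx, rx) = mpsc::channel::<()>();

    // Simulated in-flight tasks, each holding a sender clone
    for task in 0..3u64 {
        let guard = tx.clone();
        thread::spawn(move || {
            thread::sleep(Duration::from_millis(50 * (task + 1)));
            drop(guard); // task complete: release its sender
        });
    }
    drop(tx); // the shutdown path keeps no sender of its own

    assert!(drain_in_flight(rx, Duration::from_secs(5)));
    println!("all in-flight tasks drained");
}
```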

Docker Signal Handling:

# docker-compose.yaml
services:
  worker-shell:
    stop_grace_period: 45s  # Give worker time to finish tasks

Implementation Priority

Phase 1: Immediate (Week 1)

  1. Execution Timeout Monitor - Prevents stuck executions
  2. Graceful Shutdown - Marks workers inactive on stop

Phase 2: Short-term (Week 2)

  1. Worker Queue TTL + DLQ - Prevents message buildup
  2. Dead Letter Handler - Fails expired executions

Phase 3: Long-term (Month 1)

  1. Worker Health Probes - Active availability verification
  2. Retry Logic - Reschedule to different worker on failure

Configuration

executor:
  # How long an execution can stay SCHEDULED before failing
  scheduled_timeout: 300  # 5 minutes

  # How often to check for stale executions
  timeout_check_interval: 60  # 1 minute

  # Message TTL in worker queues
  worker_queue_ttl: 300  # 5 minutes (match scheduled_timeout)

  # Worker health check interval
  health_check_interval: 30  # 30 seconds

worker:
  # How often to send heartbeats
  heartbeat_interval: 10  # 10 seconds (more frequent)

  # Grace period for shutdown
  shutdown_timeout: 30  # 30 seconds

Staleness Calculation

Heartbeat Staleness Threshold = heartbeat_interval * 3
                               = 10 * 3 = 30 seconds

This means:
- Worker sends heartbeat every 10s
- If heartbeat is > 30s old, worker is considered stale
- Reduces window where stopped worker appears healthy from 90s to 30s
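The same arithmetic as a small helper, showing both the current and proposed windows:

```rust
const HEARTBEAT_STALENESS_MULTIPLIER: u64 = 3;

/// Staleness threshold derived from the heartbeat interval, as described
/// above: interval * multiplier
fn staleness_threshold_secs(heartbeat_interval_secs: u64) -> u64 {
    heartbeat_interval_secs * HEARTBEAT_STALENESS_MULTIPLIER
}

fn main() {
    // Current default: 30 s heartbeats -> 90 s window
    assert_eq!(staleness_threshold_secs(30), 90);
    // Proposed: 10 s heartbeats -> 30 s window
    assert_eq!(staleness_threshold_secs(10), 30);
}
```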

Monitoring and Observability

Metrics to Track

  1. Execution timeout rate: Number of executions failed due to timeout
  2. Worker downtime: Time between last heartbeat and status change
  3. Dead letter queue depth: Number of expired messages
  4. Average scheduling latency: Time from REQUESTED to RUNNING

Alerts

alerts:
  - name: high_execution_timeout_rate
    condition: execution_timeouts > 10 per minute
    severity: warning

  - name: no_active_workers
    condition: active_workers == 0
    severity: critical

  - name: dlq_buildup
    condition: dlq_depth > 100
    severity: warning

  - name: stale_executions
    condition: scheduled_executions_older_than_5min > 0
    severity: warning

Testing

Test Scenarios

  1. Worker stops mid-execution: Should timeout and fail
  2. Worker never picks up task: Should timeout after 5 minutes
  3. All workers down: Should immediately fail with "no workers available"
  4. Worker stops gracefully: Should mark inactive and not receive new tasks
  5. Message expires in queue: Should be moved to DLQ and execution failed

Integration Test Example

#[tokio::test]
async fn test_execution_timeout_on_worker_down() {
    let pool = setup_test_db().await;
    let mq = setup_test_mq().await;

    // Create worker and execution
    let worker = create_test_worker(&pool).await;
    let execution = create_test_execution(&pool).await;

    // Schedule execution to worker
    schedule_execution(&pool, &mq, execution.id, worker.id).await;

    // Stop worker (simulate crash - no graceful shutdown)
    stop_worker(worker.id).await;

    // Run the timeout monitor with a short timeout so the test does not
    // have to sleep through the production 5-minute window
    let monitor = ExecutionTimeoutMonitor {
        pool: pool.clone(),
        publisher: mq.publisher.clone(),
        check_interval: Duration::from_secs(1),
        scheduled_timeout: Duration::from_secs(2),
    };
    tokio::spawn(async move { monitor.start().await });

    // Wait past scheduled_timeout plus at least one check interval
    tokio::time::sleep(Duration::from_secs(4)).await;

    // Verify execution is marked as failed
    let execution = get_execution(&pool, execution.id).await;
    assert_eq!(execution.status, ExecutionStatus::Failed);
    assert!(execution.result.unwrap()["error"]
        .as_str()
        .unwrap()
        .contains("timeout"));
}

Migration Path

Step 1: Add Monitoring (No Breaking Changes)

  • Deploy execution timeout monitor
  • Monitor logs for timeout events
  • Tune timeout values based on actual workload

Step 2: Add DLQ (Requires Queue Reconfiguration)

  • Create dead letter exchange
  • Update queue declarations with TTL and DLX
  • Deploy dead letter handler
  • Monitor DLQ depth

Step 3: Graceful Shutdown (Worker Update)

  • Add shutdown handler to worker
  • Update Docker Compose stop_grace_period
  • Test worker restarts

Step 4: Health Probes (Future Enhancement)

  • Add health endpoint to worker
  • Deploy health checker service
  • Transition from heartbeat-only to active probing