more internal polish, resilient workers

This commit is contained in:
2026-02-09 18:32:34 -06:00
parent 588b319fec
commit e31ecb781b
62 changed files with 9872 additions and 584 deletions

View File

@@ -35,6 +35,7 @@ tera = "1.19"
serde_yaml_ng = { workspace = true }
validator = { workspace = true }
futures = { workspace = true }
rand = "0.8"
[dev-dependencies]
tempfile = { workspace = true }

View File

@@ -0,0 +1,264 @@
//! Dead Letter Handler
//!
//! This module handles messages that expire from worker queues and are routed to the
//! dead letter queue (DLQ). When a worker fails to process an execution request within
//! the configured TTL (default 5 minutes), the message is moved to the DLQ.
//!
//! The dead letter handler:
//! - Consumes messages from the dead letter queue
//! - Identifies the execution that expired
//! - Marks it as FAILED with appropriate error information
//! - Logs the failure for operational visibility
use attune_common::{
error::Error,
models::ExecutionStatus,
mq::{Consumer, ConsumerConfig, MessageEnvelope, MessageType, MqResult},
repositories::{execution::UpdateExecutionInput, ExecutionRepository, FindById, Update},
};
use chrono::Utc;
use serde_json::json;
use sqlx::PgPool;
use std::sync::Arc;
use tokio::sync::Mutex;
use tracing::{debug, error, info, warn};
/// Dead letter handler for processing expired messages
///
/// Consumes messages routed to the dead letter queue and fails the
/// corresponding executions (see `handle_execution_requested`).
pub struct DeadLetterHandler {
    /// Database connection pool used to look up and fail expired executions
    pool: Arc<PgPool>,
    /// Message consumer attached to the dead letter queue
    consumer: Consumer,
    /// Running state; `start` sets it and `stop` clears it, so the message
    /// handler rejects further deliveries during shutdown
    running: Arc<Mutex<bool>>,
}
impl DeadLetterHandler {
    /// Build a handler over the given pool and DLQ consumer.
    ///
    /// The handler starts in the stopped state; call [`Self::start`] to begin
    /// consuming messages.
    pub async fn new(pool: Arc<PgPool>, consumer: Consumer) -> Result<Self, Error> {
        let running = Arc::new(Mutex::new(false));
        Ok(Self {
            pool,
            consumer,
            running,
        })
    }

    /// Start the dead letter handler.
    ///
    /// Blocks on the consumer until it terminates. If the handler is already
    /// running this returns `Ok(())` without starting a second consumer.
    pub async fn start(&self) -> Result<(), Error> {
        info!(
            "Starting dead letter handler for queue '{}'",
            self.consumer.queue()
        );

        // Flip the running flag; bail out if another task already started us.
        {
            let mut flag = self.running.lock().await;
            if std::mem::replace(&mut *flag, true) {
                warn!("Dead letter handler already running");
                return Ok(());
            }
        }

        let db = Arc::clone(&self.pool);
        let run_flag = Arc::clone(&self.running);

        // Consume messages until the consumer stops or errors out.
        let outcome = self
            .consumer
            .consume_with_handler(move |envelope: MessageEnvelope<serde_json::Value>| {
                let db = Arc::clone(&db);
                let run_flag = Arc::clone(&run_flag);
                async move {
                    // Refuse new work once a shutdown has been requested.
                    if !*run_flag.lock().await {
                        info!("Dead letter handler stopping, rejecting message");
                        return Err(attune_common::mq::MqError::Consume(
                            "Handler is shutting down".to_string(),
                        )
                        .into());
                    }

                    info!(
                        "Processing dead letter message {} of type {:?}",
                        envelope.message_id, envelope.message_type
                    );

                    if let MessageType::ExecutionRequested = envelope.message_type {
                        handle_execution_requested(&db, &envelope).await
                    } else {
                        warn!(
                            "Received unexpected message type {:?} in DLQ: {}",
                            envelope.message_type, envelope.message_id
                        );
                        // Acknowledge unexpected messages to remove them from queue
                        Ok(())
                    }
                }
            })
            .await;

        // The consumer loop finished; record that we are no longer running.
        *self.running.lock().await = false;

        outcome.map_err(|e| {
            error!("Dead letter handler error: {}", e);
            Error::Internal(format!("Dead letter handler failed: {}", e))
        })
    }

    /// Request that the handler stop; the message handler will reject
    /// subsequent deliveries.
    #[allow(dead_code)]
    pub async fn stop(&self) {
        info!("Stopping dead letter handler");
        *self.running.lock().await = false;
    }

    /// Report whether the handler is currently running.
    #[allow(dead_code)]
    pub async fn is_running(&self) -> bool {
        *self.running.lock().await
    }
}
/// Handle an execution request that expired in a worker queue
async fn handle_execution_requested(
pool: &PgPool,
envelope: &MessageEnvelope<serde_json::Value>,
) -> MqResult<()> {
debug!(
"Handling expired ExecutionRequested message: {}",
envelope.message_id
);
// Extract execution ID from payload
let execution_id = match envelope.payload.get("execution_id") {
Some(id) => match id.as_i64() {
Some(id) => id,
None => {
error!("Invalid execution_id in payload: not an i64");
return Ok(()); // Acknowledge to remove from queue
}
},
None => {
error!("Missing execution_id in ExecutionRequested payload");
return Ok(()); // Acknowledge to remove from queue
}
};
info!(
"Failing execution {} due to worker queue expiration",
execution_id
);
// Fetch current execution state
let execution = match ExecutionRepository::find_by_id(pool, execution_id).await {
Ok(Some(exec)) => exec,
Ok(None) => {
warn!(
"Execution {} not found in database, may have been already processed",
execution_id
);
return Ok(()); // Acknowledge to remove from queue
}
Err(e) => {
error!("Failed to fetch execution {}: {}", execution_id, e);
// Return error to nack and potentially retry
return Err(attune_common::mq::MqError::Consume(format!(
"Database error: {}",
e
)));
}
};
// Only fail if still in a non-terminal state
if !matches!(
execution.status,
ExecutionStatus::Scheduled | ExecutionStatus::Running
) {
info!(
"Execution {} already in terminal state {:?}, skipping",
execution_id, execution.status
);
return Ok(()); // Acknowledge to remove from queue
}
// Get worker info from payload for better error message
let worker_id = envelope.payload.get("worker_id").and_then(|v| v.as_i64());
let error_message = if let Some(wid) = worker_id {
format!(
"Execution expired in worker queue (worker_id: {}). Worker did not process the execution within the configured TTL. This typically indicates the worker is unavailable or overloaded.",
wid
)
} else {
"Execution expired in worker queue. Worker did not process the execution within the configured TTL.".to_string()
};
// Update execution to failed
let update_input = UpdateExecutionInput {
status: Some(ExecutionStatus::Failed),
result: Some(json!({
"error": "Worker queue TTL expired",
"message": error_message,
"expired_at": Utc::now().to_rfc3339(),
})),
..Default::default()
};
match ExecutionRepository::update(pool, execution_id, update_input).await {
Ok(_) => {
info!(
"Successfully failed execution {} due to worker queue expiration",
execution_id
);
Ok(())
}
Err(e) => {
error!(
"Failed to update execution {} to failed state: {}",
execution_id, e
);
// Return error to nack and potentially retry
Err(attune_common::mq::MqError::Consume(format!(
"Failed to update execution: {}",
e
)))
}
}
}
/// Create a dead letter consumer configuration
///
/// Builds a manual-ack, non-exclusive consumer configuration for the given
/// DLQ name and consumer tag, with a prefetch window of 10 messages.
pub fn create_dlq_consumer_config(dlq_name: &str, consumer_tag: &str) -> ConsumerConfig {
    ConsumerConfig {
        exclusive: false,
        auto_ack: false, // Manual ack for reliability
        prefetch_count: 10,
        tag: consumer_tag.to_string(),
        queue: dlq_name.to_string(),
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// The DLQ consumer config must use manual acknowledgements (so failed
    /// handling can be retried) and carry the queue/tag it was built with.
    #[test]
    fn test_create_dlq_consumer_config() {
        let config = create_dlq_consumer_config("attune.dlx.queue", "dlq-handler");
        assert_eq!(config.queue, "attune.dlx.queue");
        assert_eq!(config.tag, "dlq-handler");
        assert_eq!(config.prefetch_count, 10);
        assert!(!config.auto_ack);
        assert!(!config.exclusive);
    }
}

View File

@@ -1,23 +1,32 @@
//! Execution Manager - Manages execution lifecycle and status transitions
//! Execution Manager - Handles execution orchestration and lifecycle events
//!
//! This module is responsible for:
//! - Listening for ExecutionStatusChanged messages
//! - Updating execution records in the database
//! - Managing workflow executions (parent-child relationships)
//! - Listening for ExecutionStatusChanged messages from workers
//! - Orchestrating workflow executions (parent-child relationships)
//! - Triggering child executions when parent completes
//! - Handling execution failures and retries
//! - Publishing status change notifications
//!
//! ## Ownership Model
//!
//! The Executor owns execution state until it is scheduled to a worker.
//! After scheduling, the Worker owns the state and updates the database directly.
//!
//! - **Executor owns**: Requested → Scheduling → Scheduled
//! - **Worker owns**: Running → Completed/Failed/Cancelled/Timeout
//!
//! The ExecutionManager receives status change notifications for orchestration
//! purposes (e.g., triggering child executions) but does NOT update the database.
use anyhow::Result;
use attune_common::{
models::{enums::ExecutionStatus, Execution},
mq::{
Consumer, ExecutionCompletedPayload, ExecutionRequestedPayload,
ExecutionStatusChangedPayload, MessageEnvelope, MessageType, Publisher,
Consumer, ExecutionRequestedPayload, ExecutionStatusChangedPayload, MessageEnvelope,
MessageType, Publisher,
},
repositories::{
execution::{CreateExecutionInput, ExecutionRepository},
Create, FindById, Update,
Create, FindById,
},
};
@@ -74,6 +83,10 @@ impl ExecutionManager {
}
/// Process an execution status change message
///
/// NOTE: This method does NOT update the database. The worker is responsible
/// for updating execution state after the execution is scheduled. The executor
/// only handles orchestration logic (e.g., triggering workflow children).
async fn process_status_change(
pool: &PgPool,
publisher: &Publisher,
@@ -85,37 +98,38 @@ impl ExecutionManager {
let status_str = &envelope.payload.new_status;
let status = Self::parse_execution_status(status_str)?;
info!(
"Processing status change for execution {}: {:?}",
execution_id, status
debug!(
"Received status change notification for execution {}: {}",
execution_id, status_str
);
// Fetch execution from database
let mut execution = ExecutionRepository::find_by_id(pool, execution_id)
// Fetch execution from database (for orchestration logic)
let execution = ExecutionRepository::find_by_id(pool, execution_id)
.await?
.ok_or_else(|| anyhow::anyhow!("Execution not found: {}", execution_id))?;
// Update status
let old_status = execution.status.clone();
execution.status = status;
// Note: ExecutionStatusChangedPayload doesn't contain result data
// Results are only in ExecutionCompletedPayload
// Update execution in database
ExecutionRepository::update(pool, execution.id, execution.clone().into()).await?;
info!(
"Updated execution {} status: {:?} -> {:?}",
execution_id, old_status, status
);
// Handle status-specific logic
// Handle orchestration logic based on status
// Note: Worker has already updated the database directly
match status {
ExecutionStatus::Completed | ExecutionStatus::Failed | ExecutionStatus::Cancelled => {
info!(
"Execution {} reached terminal state: {:?}, handling orchestration",
execution_id, status
);
Self::handle_completion(pool, publisher, &execution).await?;
}
_ => {}
ExecutionStatus::Running => {
debug!(
"Execution {} now running (worker has updated DB)",
execution_id
);
}
_ => {
debug!(
"Execution {} status changed to {:?} (no orchestration needed)",
execution_id, status
);
}
}
Ok(())
@@ -159,8 +173,9 @@ impl ExecutionManager {
}
}
// Publish completion notification
Self::publish_completion_notification(pool, publisher, execution).await?;
// NOTE: Completion notification is published by the worker, not here.
// This prevents duplicate execution.completed messages that would cause
// the queue manager to decrement active_count twice.
Ok(())
}
@@ -229,38 +244,11 @@ impl ExecutionManager {
Ok(())
}
/// Publish execution completion notification
async fn publish_completion_notification(
_pool: &PgPool,
publisher: &Publisher,
execution: &Execution,
) -> Result<()> {
// Get action_id (required field)
let action_id = execution
.action
.ok_or_else(|| anyhow::anyhow!("Execution {} has no action_id", execution.id))?;
let payload = ExecutionCompletedPayload {
execution_id: execution.id,
action_id,
action_ref: execution.action_ref.clone(),
status: format!("{:?}", execution.status),
result: execution.result.clone(),
completed_at: chrono::Utc::now(),
};
let envelope =
MessageEnvelope::new(MessageType::ExecutionCompleted, payload).with_source("executor");
publisher.publish_envelope(&envelope).await?;
info!(
"Published execution.completed notification for execution: {}",
execution.id
);
Ok(())
}
// REMOVED: publish_completion_notification
// This method was causing duplicate execution.completed messages.
// The worker is responsible for publishing completion notifications,
// not the executor. Removing this prevents double-decrementing the
// queue manager's active_count.
}
#[cfg(test)]

View File

@@ -4,19 +4,30 @@
//! The actual executor service is a binary in main.rs.
pub mod completion_listener;
pub mod dead_letter_handler;
pub mod enforcement_processor;
pub mod event_processor;
pub mod execution_manager;
pub mod inquiry_handler;
pub mod policy_enforcer;
pub mod queue_manager;
pub mod retry_manager;
pub mod scheduler;
pub mod service;
pub mod timeout_monitor;
pub mod worker_health;
pub mod workflow;
// Re-export commonly used types for convenience
pub use dead_letter_handler::{create_dlq_consumer_config, DeadLetterHandler};
pub use inquiry_handler::{InquiryHandler, InquiryRequest, INQUIRY_RESULT_KEY};
pub use policy_enforcer::{
ExecutionPolicy, PolicyEnforcer, PolicyScope, PolicyViolation, RateLimit,
};
pub use queue_manager::{ExecutionQueueManager, QueueConfig, QueueStats};
pub use retry_manager::{RetryAnalysis, RetryConfig, RetryManager, RetryReason};
pub use timeout_monitor::{ExecutionTimeoutMonitor, TimeoutMonitorConfig};
pub use worker_health::{HealthMetrics, HealthProbeConfig, HealthStatus, WorkerHealthProbe};
pub use workflow::{
parse_workflow_yaml, BackoffStrategy, ParseError, TemplateEngine, VariableContext,
WorkflowDefinition, WorkflowValidator,

View File

@@ -9,14 +9,18 @@
//! - Handles human-in-the-loop inquiries
mod completion_listener;
mod dead_letter_handler;
mod enforcement_processor;
mod event_processor;
mod execution_manager;
mod inquiry_handler;
mod policy_enforcer;
mod queue_manager;
mod retry_manager;
mod scheduler;
mod service;
mod timeout_monitor;
mod worker_health;
use anyhow::Result;
use attune_common::config::Config;

View File

@@ -0,0 +1,495 @@
//! Retry Manager
//!
//! This module provides intelligent retry logic for failed executions.
//! It determines whether failures are retriable, manages retry attempts,
//! and implements exponential backoff for retry scheduling.
//!
//! # Retry Strategy
//!
//! - **Retriable Failures:** Worker unavailability, timeouts, transient errors
//! - **Non-Retriable Failures:** Validation errors, missing actions, permission errors
//! - **Backoff:** Exponential with jitter (1s, 2s, 4s, 8s, ...)
//! - **Max Retries:** Configurable per action (default: 0, no retries)
use attune_common::{
error::{Error, Result},
models::{Execution, ExecutionStatus, Id},
repositories::{
execution::{CreateExecutionInput, UpdateExecutionInput},
Create, ExecutionRepository, FindById, Update,
},
};
use chrono::Utc;
use serde::{Deserialize, Serialize};
use serde_json::json;
use sqlx::PgPool;
use std::time::Duration;
use tracing::{debug, info};
/// Retry manager for execution failures
///
/// Decides whether a failed execution should be retried, creates retry
/// executions, and computes exponential backoff delays.
pub struct RetryManager {
    /// Database connection pool used to read and create executions
    pool: PgPool,
    /// Retry configuration (backoff base/cap, multiplier, jitter)
    config: RetryConfig,
}
/// Retry configuration
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RetryConfig {
    /// Enable automatic retries; when false, no execution is ever retried
    pub enabled: bool,
    /// Base backoff duration in seconds (delay before the first retry)
    pub base_backoff_secs: u64,
    /// Maximum backoff duration in seconds (exponential growth is capped here)
    pub max_backoff_secs: u64,
    /// Backoff multiplier applied per attempt (e.g. 2.0 doubles each retry)
    pub backoff_multiplier: f64,
    /// Add jitter to backoff (0.0 - 1.0, fraction of the computed delay)
    pub jitter_factor: f64,
}
impl Default for RetryConfig {
fn default() -> Self {
Self {
enabled: true,
base_backoff_secs: 1,
max_backoff_secs: 300, // 5 minutes
backoff_multiplier: 2.0,
jitter_factor: 0.2, // 20% jitter
}
}
}
/// Reason for retry
///
/// Classifies why a failed execution is being (or could be) retried.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum RetryReason {
    /// Worker was unavailable
    WorkerUnavailable,
    /// Execution timed out in queue
    QueueTimeout,
    /// Worker heartbeat became stale
    WorkerHeartbeatStale,
    /// Transient error in execution
    TransientError,
    /// Manual retry requested by user
    ManualRetry,
    /// Unknown/other reason
    Unknown,
}

impl RetryReason {
    /// Snake_case string representation (matches the serde encoding).
    pub fn as_str(&self) -> &'static str {
        match self {
            Self::WorkerUnavailable => "worker_unavailable",
            Self::QueueTimeout => "queue_timeout",
            Self::WorkerHeartbeatStale => "worker_heartbeat_stale",
            Self::TransientError => "transient_error",
            Self::ManualRetry => "manual_retry",
            Self::Unknown => "unknown",
        }
    }

    /// Classify an execution error message into a retry reason.
    ///
    /// Matching is case-insensitive substring search; the first matching
    /// category wins, and unrecognized messages map to [`Self::Unknown`].
    pub fn from_error(error: &str) -> Self {
        let msg = error.to_lowercase();
        let has = |needle: &str| msg.contains(needle);

        if has("worker queue ttl expired") || has("worker unavailable") {
            Self::WorkerUnavailable
        } else if has("timeout") || has("timed out") {
            Self::QueueTimeout
        } else if has("heartbeat") || has("stale") {
            Self::WorkerHeartbeatStale
        } else if has("transient") || has("temporary") || has("connection") {
            Self::TransientError
        } else {
            Self::Unknown
        }
    }
}

impl std::fmt::Display for RetryReason {
    /// Displays the same snake_case token as [`RetryReason::as_str`].
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(self.as_str())
    }
}
/// Result of retry analysis
///
/// Produced by `RetryManager::analyze_execution`; callers use `should_retry`
/// together with `backoff_delay` to decide whether and when to schedule the
/// next attempt.
#[derive(Debug, Clone)]
#[allow(dead_code)]
pub struct RetryAnalysis {
    /// Whether the execution should be retried
    pub should_retry: bool,
    /// Reason for retry decision (None when no reason was determined)
    pub reason: Option<RetryReason>,
    /// Suggested backoff delay before the next attempt
    pub backoff_delay: Option<Duration>,
    /// Current retry attempt (0-based)
    pub retry_count: i32,
    /// Maximum retry attempts allowed
    pub max_retries: i32,
}
impl RetryManager {
    /// Create a new retry manager
    #[allow(dead_code)]
    pub fn new(pool: PgPool, config: RetryConfig) -> Self {
        Self { pool, config }
    }

    /// Create with default configuration
    #[allow(dead_code)]
    pub fn with_defaults(pool: PgPool) -> Self {
        Self::new(pool, RetryConfig::default())
    }

    /// Analyze if an execution should be retried
    ///
    /// Reads retry bookkeeping (`retry_count`, `max_retries`,
    /// `original_execution`) from the execution's `config` JSON. Only
    /// executions currently in `Failed` status with retries remaining and a
    /// retriable failure reason produce `should_retry: true`.
    ///
    /// # Errors
    /// Returns an error if the execution cannot be fetched or does not exist.
    #[allow(dead_code)]
    pub async fn analyze_execution(&self, execution_id: Id) -> Result<RetryAnalysis> {
        // Fetch execution
        let execution = ExecutionRepository::find_by_id(&self.pool, execution_id)
            .await?
            .ok_or_else(|| Error::not_found("Execution", "id", execution_id.to_string()))?;
        // Check if retries are enabled globally
        if !self.config.enabled {
            return Ok(RetryAnalysis {
                should_retry: false,
                reason: None,
                backoff_delay: None,
                // Report the recorded attempt count even when retries are off
                retry_count: execution
                    .config
                    .as_ref()
                    .and_then(|c| c.get("retry_count"))
                    .and_then(|v| v.as_i64())
                    .unwrap_or(0) as i32,
                max_retries: 0,
            });
        }
        // Only retry failed executions
        if execution.status != ExecutionStatus::Failed {
            return Ok(RetryAnalysis {
                should_retry: false,
                reason: None,
                backoff_delay: None,
                retry_count: 0,
                max_retries: 0,
            });
        }
        // Get retry metadata from execution config
        let config = execution.config.as_ref();
        let retry_count = config
            .and_then(|c| c.get("retry_count"))
            .and_then(|v: &serde_json::Value| v.as_i64())
            .unwrap_or(0) as i32;
        let max_retries = config
            .and_then(|c| c.get("max_retries"))
            .and_then(|v: &serde_json::Value| v.as_i64())
            .unwrap_or(0) as i32;
        let _original_execution = config
            .and_then(|c| c.get("original_execution"))
            .and_then(|v: &serde_json::Value| v.as_i64());
        // Check if retries are exhausted (max_retries == 0 means "never retry")
        if max_retries == 0 || retry_count >= max_retries {
            debug!(
                "Execution {} retry limit reached: {}/{}",
                execution_id, retry_count, max_retries
            );
            return Ok(RetryAnalysis {
                should_retry: false,
                reason: None,
                backoff_delay: None,
                retry_count,
                max_retries,
            });
        }
        // Determine if failure is retriable
        let retry_reason = self.detect_retry_reason(&execution);
        let is_retriable = self.is_failure_retriable(&execution, retry_reason);
        if !is_retriable {
            debug!(
                "Execution {} failure is not retriable: {:?}",
                execution_id, retry_reason
            );
            return Ok(RetryAnalysis {
                should_retry: false,
                reason: Some(retry_reason),
                backoff_delay: None,
                retry_count,
                max_retries,
            });
        }
        // Calculate backoff delay
        let backoff_delay = self.calculate_backoff(retry_count);
        info!(
            "Execution {} should be retried: attempt {}/{}, reason: {:?}, delay: {:?}",
            execution_id,
            retry_count + 1,
            max_retries,
            retry_reason,
            backoff_delay
        );
        Ok(RetryAnalysis {
            should_retry: true,
            reason: Some(retry_reason),
            backoff_delay: Some(backoff_delay),
            retry_count,
            max_retries,
        })
    }

    /// Create a retry execution from a failed execution
    ///
    /// Clones the original execution's parameters into a fresh `Requested`
    /// execution whose `config` carries incremented retry bookkeeping
    /// (`retry_count`, `retry_of`, `original_execution`, `retry_reason`,
    /// `retry_at`).
    ///
    /// # Errors
    /// Returns an error if the original execution cannot be fetched, does not
    /// exist, or the new execution cannot be created.
    #[allow(dead_code)]
    pub async fn create_retry_execution(
        &self,
        execution_id: Id,
        reason: RetryReason,
    ) -> Result<Execution> {
        // Fetch original execution
        let original = ExecutionRepository::find_by_id(&self.pool, execution_id)
            .await?
            .ok_or_else(|| Error::not_found("Execution", "id", execution_id.to_string()))?;
        // Get retry metadata
        let config = original.config.as_ref();
        let retry_count = config
            .and_then(|c| c.get("retry_count"))
            .and_then(|v: &serde_json::Value| v.as_i64())
            .unwrap_or(0) as i32;
        let max_retries = config
            .and_then(|c| c.get("max_retries"))
            .and_then(|v: &serde_json::Value| v.as_i64())
            .unwrap_or(0) as i32;
        // Chain retries back to the very first execution: if the original is
        // itself a retry, keep its recorded root id.
        let original_execution_id = config
            .and_then(|c| c.get("original_execution"))
            .and_then(|v: &serde_json::Value| v.as_i64())
            .unwrap_or(execution_id);
        // Create retry config
        let mut retry_config = original.config.clone().unwrap_or_else(|| json!({}));
        retry_config["retry_count"] = json!(retry_count + 1);
        retry_config["max_retries"] = json!(max_retries);
        retry_config["original_execution"] = json!(original_execution_id);
        retry_config["retry_reason"] = json!(reason.as_str());
        retry_config["retry_of"] = json!(execution_id);
        retry_config["retry_at"] = json!(Utc::now().to_rfc3339());
        // Create new execution (reusing original parameters)
        let retry_execution = CreateExecutionInput {
            action: original.action,
            action_ref: original.action_ref.clone(),
            config: Some(retry_config),
            env_vars: original.env_vars.clone(),
            parent: original.parent,
            enforcement: original.enforcement,
            executor: None, // Will be assigned by scheduler
            status: ExecutionStatus::Requested,
            result: None,
            workflow_task: original.workflow_task.clone(),
        };
        let created = ExecutionRepository::create(&self.pool, retry_execution).await?;
        info!(
            "Created retry execution {} for original {} (attempt {}/{})",
            created.id,
            execution_id,
            retry_count + 1,
            max_retries
        );
        Ok(created)
    }

    /// Detect retry reason from execution
    ///
    /// Looks at the `error` then `message` fields of the execution's `result`
    /// JSON; falls back to `Unknown` when neither is a string.
    fn detect_retry_reason(&self, execution: &Execution) -> RetryReason {
        if let Some(result) = &execution.result {
            if let Some(error) = result.get("error").and_then(|e| e.as_str()) {
                return RetryReason::from_error(error);
            }
            if let Some(message) = result.get("message").and_then(|m| m.as_str()) {
                return RetryReason::from_error(message);
            }
        }
        RetryReason::Unknown
    }

    /// Check if failure is retriable
    ///
    /// All classified reasons are retriable; only `Unknown` failures are
    /// excluded from automatic retry.
    fn is_failure_retriable(&self, _execution: &Execution, reason: RetryReason) -> bool {
        match reason {
            // These are retriable
            RetryReason::WorkerUnavailable => true,
            RetryReason::QueueTimeout => true,
            RetryReason::WorkerHeartbeatStale => true,
            RetryReason::TransientError => true,
            RetryReason::ManualRetry => true,
            // Unknown failures are not automatically retried
            RetryReason::Unknown => false,
        }
    }

    /// Calculate exponential backoff with jitter
    ///
    /// Computes `base * multiplier^retry_count`, caps it at
    /// `max_backoff_secs`, then scales by a random factor in
    /// `[1 - jitter_factor, 1 + jitter_factor]`. The result is truncated to
    /// whole seconds.
    fn calculate_backoff(&self, retry_count: i32) -> Duration {
        let base_secs = self.config.base_backoff_secs as f64;
        let multiplier = self.config.backoff_multiplier;
        let max_secs = self.config.max_backoff_secs as f64;
        let jitter_factor = self.config.jitter_factor;
        // Calculate exponential backoff: base * multiplier^retry_count
        let backoff_secs = base_secs * multiplier.powi(retry_count);
        // Cap at max
        let backoff_secs = backoff_secs.min(max_secs);
        // Add jitter: random value between (1 - jitter) and (1 + jitter)
        let jitter = 1.0 + (rand::random::<f64>() * 2.0 - 1.0) * jitter_factor;
        let backoff_with_jitter = backoff_secs * jitter;
        Duration::from_secs(backoff_with_jitter.max(0.0) as u64)
    }

    /// Update execution with retry metadata
    ///
    /// NOTE(review): this method appears to be a no-op bug. The merged
    /// `config` JSON built below is never passed to the repository — the
    /// `UpdateExecutionInput` constructed at the bottom sets every field to
    /// `None` and has no field carrying `config` — so the retry metadata is
    /// never persisted. Confirm whether `UpdateExecutionInput` should gain a
    /// `config` field (and this call pass `Some(config)`), or whether this
    /// method should be removed.
    #[allow(dead_code)]
    pub async fn mark_as_retry(
        &self,
        execution_id: Id,
        original_execution_id: Id,
        retry_count: i32,
        reason: RetryReason,
    ) -> Result<()> {
        let mut config = json!({
            "retry_count": retry_count,
            "original_execution": original_execution_id,
            "retry_reason": reason.as_str(),
            "retry_at": Utc::now().to_rfc3339(),
        });
        // Fetch current config and merge (existing keys win over the new
        // retry metadata via the entry/or_insert pattern)
        if let Some(execution) = ExecutionRepository::find_by_id(&self.pool, execution_id).await? {
            if let Some(existing_config) = execution.config {
                if let Some(obj) = config.as_object_mut() {
                    if let Some(existing_obj) = existing_config.as_object() {
                        for (k, v) in existing_obj {
                            obj.entry(k).or_insert(v.clone());
                        }
                    }
                }
            }
        }
        // BUG(see NOTE above): `config` is dropped here — this update writes
        // nothing.
        ExecutionRepository::update(
            &self.pool,
            execution_id,
            UpdateExecutionInput {
                status: None,
                result: None,
                executor: None,
                workflow_task: None,
            },
        )
        .await?;
        Ok(())
    }
}
/// Check if an error message indicates a retriable failure
///
/// Performs a case-insensitive substring search against a fixed set of
/// patterns that indicate transient infrastructure problems (worker loss,
/// timeouts, stale heartbeats, connection failures).
#[allow(dead_code)]
pub fn is_error_retriable(error_msg: &str) -> bool {
    // Substrings whose presence marks a failure as retriable.
    const RETRIABLE_PATTERNS: [&str; 10] = [
        "worker queue ttl expired",
        "worker unavailable",
        "timeout",
        "timed out",
        "heartbeat",
        "stale",
        "transient",
        "temporary",
        "connection refused",
        "connection reset",
    ];
    let normalized = error_msg.to_lowercase();
    RETRIABLE_PATTERNS
        .iter()
        .any(|pattern| normalized.contains(pattern))
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Error messages map to the expected retry categories, with
    /// unrecognized messages falling through to Unknown.
    #[test]
    fn test_retry_reason_detection() {
        assert_eq!(
            RetryReason::from_error("Worker queue TTL expired"),
            RetryReason::WorkerUnavailable
        );
        assert_eq!(
            RetryReason::from_error("Execution timed out"),
            RetryReason::QueueTimeout
        );
        assert_eq!(
            RetryReason::from_error("Worker heartbeat is stale"),
            RetryReason::WorkerHeartbeatStale
        );
        assert_eq!(
            RetryReason::from_error("Transient connection error"),
            RetryReason::TransientError
        );
        assert_eq!(
            RetryReason::from_error("Invalid parameter format"),
            RetryReason::Unknown
        );
    }

    /// Pattern-based retriability matches the infrastructure failure list
    /// and rejects validation/permission errors.
    #[test]
    fn test_is_error_retriable() {
        assert!(is_error_retriable("Worker queue TTL expired"));
        assert!(is_error_retriable("Execution timed out"));
        assert!(is_error_retriable("Worker heartbeat stale"));
        assert!(is_error_retriable("Transient error"));
        assert!(!is_error_retriable("Invalid parameter"));
        assert!(!is_error_retriable("Permission denied"));
    }

    /// Backoff grows roughly exponentially within the jitter bounds.
    ///
    /// NOTE(review): `std::mem::zeroed()` for a `PgPool` is undefined
    /// behavior — PgPool wraps non-zeroable pointers/Arcs, and merely
    /// creating (and later dropping) the zeroed value can crash. Consider
    /// refactoring the backoff math into a function of `RetryConfig` so it
    /// can be tested without a pool.
    #[test]
    fn test_backoff_calculation() {
        let manager = RetryManager::with_defaults(
            // Mock pool - won't be used in this test
            unsafe { std::mem::zeroed() },
        );
        let backoff0 = manager.calculate_backoff(0);
        let backoff1 = manager.calculate_backoff(1);
        let backoff2 = manager.calculate_backoff(2);
        // First attempt: ~1s
        // NOTE(review): `backoff0.as_secs() >= 0` is trivially true for u64
        // (clippy: absurd_extreme_comparisons); only the upper bound matters.
        assert!(backoff0.as_secs() >= 0 && backoff0.as_secs() <= 2);
        // Second attempt: ~2s
        assert!(backoff1.as_secs() >= 1 && backoff1.as_secs() <= 3);
        // Third attempt: ~4s
        assert!(backoff2.as_secs() >= 2 && backoff2.as_secs() <= 6);
    }

    /// Default configuration values stay in sync with the documented
    /// strategy (1s base, 2x growth, 5 min cap, 20% jitter).
    #[test]
    fn test_retry_config_defaults() {
        let config = RetryConfig::default();
        assert!(config.enabled);
        assert_eq!(config.base_backoff_secs, 1);
        assert_eq!(config.max_backoff_secs, 300);
        assert_eq!(config.backoff_multiplier, 2.0);
        assert_eq!(config.jitter_factor, 0.2);
    }
}

View File

@@ -20,6 +20,7 @@ use tokio::task::JoinHandle;
use tracing::{error, info, warn};
use crate::completion_listener::CompletionListener;
use crate::dead_letter_handler::{create_dlq_consumer_config, DeadLetterHandler};
use crate::enforcement_processor::EnforcementProcessor;
use crate::event_processor::EventProcessor;
use crate::execution_manager::ExecutionManager;
@@ -27,6 +28,7 @@ use crate::inquiry_handler::InquiryHandler;
use crate::policy_enforcer::PolicyEnforcer;
use crate::queue_manager::{ExecutionQueueManager, QueueConfig};
use crate::scheduler::ExecutionScheduler;
use crate::timeout_monitor::{ExecutionTimeoutMonitor, TimeoutMonitorConfig};
/// Main executor service that orchestrates execution processing
#[derive(Clone)]
@@ -355,6 +357,75 @@ impl ExecutorService {
Ok(())
}));
// Start worker heartbeat monitor
info!("Starting worker heartbeat monitor...");
let worker_pool = self.inner.pool.clone();
handles.push(tokio::spawn(async move {
Self::worker_heartbeat_monitor_loop(worker_pool, 60).await;
Ok(())
}));
// Start execution timeout monitor
info!("Starting execution timeout monitor...");
let timeout_config = TimeoutMonitorConfig {
scheduled_timeout: std::time::Duration::from_secs(
self.inner
.config
.executor
.as_ref()
.and_then(|e| e.scheduled_timeout)
.unwrap_or(300), // Default: 5 minutes
),
check_interval: std::time::Duration::from_secs(
self.inner
.config
.executor
.as_ref()
.and_then(|e| e.timeout_check_interval)
.unwrap_or(60), // Default: 1 minute
),
enabled: self
.inner
.config
.executor
.as_ref()
.and_then(|e| e.enable_timeout_monitor)
.unwrap_or(true), // Default: enabled
};
let timeout_monitor = Arc::new(ExecutionTimeoutMonitor::new(
self.inner.pool.clone(),
self.inner.publisher.clone(),
timeout_config,
));
handles.push(tokio::spawn(async move { timeout_monitor.start().await }));
// Start dead letter handler (if DLQ is enabled)
if self.inner.mq_config.rabbitmq.dead_letter.enabled {
info!("Starting dead letter handler...");
let dlq_name = format!(
"{}.queue",
self.inner.mq_config.rabbitmq.dead_letter.exchange
);
let dlq_consumer = Consumer::new(
&self.inner.mq_connection,
create_dlq_consumer_config(&dlq_name, "executor.dlq"),
)
.await?;
let dlq_handler = Arc::new(
DeadLetterHandler::new(Arc::new(self.inner.pool.clone()), dlq_consumer)
.await
.map_err(|e| anyhow::anyhow!("Failed to create DLQ handler: {}", e))?,
);
handles.push(tokio::spawn(async move {
dlq_handler
.start()
.await
.map_err(|e| anyhow::anyhow!("DLQ handler error: {}", e))
}));
} else {
info!("Dead letter queue is disabled, skipping DLQ handler");
}
info!("Executor Service started successfully");
info!("All processors are listening for messages...");
@@ -393,6 +464,113 @@ impl ExecutorService {
Ok(())
}
/// Worker heartbeat monitor loop
///
/// Periodically checks for stale workers and marks them as inactive.
///
/// A worker is considered stale when it has no recorded heartbeat at all, or
/// when its last heartbeat is older than `HEARTBEAT_INTERVAL *
/// STALENESS_MULTIPLIER` seconds. Runs forever; `interval_secs` controls how
/// often the check is performed.
async fn worker_heartbeat_monitor_loop(pool: PgPool, interval_secs: u64) {
    use attune_common::models::enums::WorkerStatus;
    use attune_common::repositories::{
        runtime::{UpdateWorkerInput, WorkerRepository},
        Update,
    };
    use chrono::Utc;
    use std::time::Duration;

    let check_interval = Duration::from_secs(interval_secs);
    // Heartbeat staleness threshold: 3x the expected interval (90 seconds)
    // NOTE: These constants MUST match DEFAULT_HEARTBEAT_INTERVAL and
    // HEARTBEAT_STALENESS_MULTIPLIER in scheduler.rs to ensure consistency
    const HEARTBEAT_INTERVAL: u64 = 30;
    const STALENESS_MULTIPLIER: u64 = 3;
    let max_age_secs = HEARTBEAT_INTERVAL * STALENESS_MULTIPLIER;

    info!(
        "Worker heartbeat monitor started (check interval: {}s, staleness threshold: {}s)",
        interval_secs, max_age_secs
    );

    loop {
        tokio::time::sleep(check_interval).await;

        // Get all active workers; on a query error, log and try again next tick.
        let workers = match WorkerRepository::find_by_status(&pool, WorkerStatus::Active).await {
            Ok(workers) => workers,
            Err(e) => {
                error!("Failed to query active workers for heartbeat check: {}", e);
                continue;
            }
        };

        let now = Utc::now();
        let mut deactivated_count = 0;

        for worker in workers {
            // Decide whether this worker is stale; `None` means healthy.
            // The two staleness cases share a single deactivation path below.
            let stale_reason = match worker.last_heartbeat {
                None => {
                    warn!(
                        "Worker {} (ID: {}) has no heartbeat, marking as inactive",
                        worker.name, worker.id
                    );
                    Some("no heartbeat")
                }
                Some(last_heartbeat) => {
                    let age_secs = now.signed_duration_since(last_heartbeat).num_seconds();
                    if age_secs > max_age_secs as i64 {
                        warn!(
                            "Worker {} (ID: {}) heartbeat is stale ({}s old), marking as inactive",
                            worker.name, worker.id, age_secs
                        );
                        Some("stale heartbeat")
                    } else {
                        None
                    }
                }
            };

            let Some(reason) = stale_reason else {
                continue;
            };

            // Shared deactivation: mark the worker inactive, counting
            // successes and logging failures with the staleness reason.
            match WorkerRepository::update(
                &pool,
                worker.id,
                UpdateWorkerInput {
                    status: Some(WorkerStatus::Inactive),
                    ..Default::default()
                },
            )
            .await
            {
                Ok(_) => deactivated_count += 1,
                Err(e) => {
                    error!(
                        "Failed to deactivate worker {} ({}): {}",
                        worker.name, reason, e
                    );
                }
            }
        }

        if deactivated_count > 0 {
            info!(
                "Deactivated {} worker(s) with stale heartbeats",
                deactivated_count
            );
        }
    }
}
/// Wait for all tasks to complete
async fn wait_for_tasks(handles: Vec<JoinHandle<Result<()>>>) -> Result<()> {
for handle in handles {

View File

@@ -0,0 +1,304 @@
//! Execution Timeout Monitor
//!
//! This module monitors executions in SCHEDULED status and fails them if they
//! don't transition to RUNNING within a configured timeout period.
//!
//! This prevents executions from being stuck indefinitely when workers:
//! - Stop or crash after being selected
//! - Fail to consume messages from their queues
//! - Are partitioned from the network
use anyhow::Result;
use attune_common::{
models::{enums::ExecutionStatus, Execution},
mq::{MessageEnvelope, MessageType, Publisher},
};
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use serde_json::Value as JsonValue;
use sqlx::PgPool;
use std::sync::Arc;
use std::time::Duration;
use tokio::time::interval;
use tracing::{debug, error, info, warn};
/// Configuration for timeout monitor
///
/// Controls how long an execution may sit in SCHEDULED status before the
/// monitor fails it, and how often the monitor scans for such executions.
/// See `Default` impl for the standard values (300s timeout, 60s interval).
#[derive(Debug, Clone)]
pub struct TimeoutMonitorConfig {
    /// How long an execution can remain in SCHEDULED status before timing out
    pub scheduled_timeout: Duration,
    /// How often to check for stale executions
    pub check_interval: Duration,
    /// Whether to enable the timeout monitor
    pub enabled: bool,
}
impl Default for TimeoutMonitorConfig {
    /// Production defaults: fail executions stuck in SCHEDULED for five
    /// minutes, scanning once per minute, with the monitor enabled.
    fn default() -> Self {
        let scheduled_timeout = Duration::from_secs(5 * 60); // 5 minutes
        let check_interval = Duration::from_secs(60); // 1 minute
        Self {
            scheduled_timeout,
            check_interval,
            enabled: true,
        }
    }
}
/// Payload for execution completion messages
///
/// Published to the executions exchange when this monitor fails a stale
/// execution, so downstream consumers observe the terminal state.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExecutionCompletedPayload {
    /// ID of the execution that reached a terminal state
    pub execution_id: i64,
    /// Terminal status (always `Failed` when emitted by this monitor)
    pub status: ExecutionStatus,
    /// Optional JSON details (error message, timeout info, etc.)
    pub result: Option<JsonValue>,
}
/// Monitors scheduled executions and fails those that timeout
///
/// Periodically queries the `execution` table for rows stuck in
/// SCHEDULED status past the configured timeout, marks them FAILED,
/// and publishes a completion notification for each.
pub struct ExecutionTimeoutMonitor {
    // Database pool used to query and update executions
    pool: PgPool,
    // Publisher for ExecutionCompleted notifications
    publisher: Arc<Publisher>,
    // Timeout/interval settings; see TimeoutMonitorConfig
    config: TimeoutMonitorConfig,
}
impl ExecutionTimeoutMonitor {
/// Create a new timeout monitor
pub fn new(pool: PgPool, publisher: Arc<Publisher>, config: TimeoutMonitorConfig) -> Self {
Self {
pool,
publisher,
config,
}
}
/// Start the timeout monitor loop
pub async fn start(self: Arc<Self>) -> Result<()> {
if !self.config.enabled {
info!("Execution timeout monitor is disabled");
return Ok(());
}
info!(
"Starting execution timeout monitor (timeout: {}s, check interval: {}s)",
self.config.scheduled_timeout.as_secs(),
self.config.check_interval.as_secs()
);
let mut check_interval = interval(self.config.check_interval);
loop {
check_interval.tick().await;
if let Err(e) = self.check_stale_executions().await {
error!("Error checking stale executions: {}", e);
// Continue running despite errors
}
}
}
/// Check for executions stuck in SCHEDULED status
async fn check_stale_executions(&self) -> Result<()> {
let cutoff = self.calculate_cutoff_time();
debug!(
"Checking for executions scheduled before {}",
cutoff.format("%Y-%m-%d %H:%M:%S UTC")
);
// Find executions stuck in SCHEDULED status
let stale_executions = sqlx::query_as::<_, Execution>(
"SELECT * FROM execution
WHERE status = $1
AND updated < $2
ORDER BY updated ASC
LIMIT 100", // Process in batches to avoid overwhelming system
)
.bind("scheduled")
.bind(cutoff)
.fetch_all(&self.pool)
.await?;
if stale_executions.is_empty() {
debug!("No stale scheduled executions found");
return Ok(());
}
warn!(
"Found {} stale scheduled executions (older than {}s)",
stale_executions.len(),
self.config.scheduled_timeout.as_secs()
);
for execution in stale_executions {
let age_seconds = (Utc::now() - execution.updated).num_seconds();
warn!(
"Execution {} has been scheduled for {} seconds (timeout: {}s), marking as failed",
execution.id,
age_seconds,
self.config.scheduled_timeout.as_secs()
);
if let Err(e) = self.fail_execution(&execution, age_seconds).await {
error!("Failed to fail execution {}: {}", execution.id, e);
// Continue processing other executions
}
}
Ok(())
}
/// Calculate the cutoff time for stale executions
fn calculate_cutoff_time(&self) -> DateTime<Utc> {
let timeout_duration = chrono::Duration::from_std(self.config.scheduled_timeout)
.expect("Invalid timeout duration");
Utc::now() - timeout_duration
}
/// Mark an execution as failed due to timeout
async fn fail_execution(&self, execution: &Execution, age_seconds: i64) -> Result<()> {
let execution_id = execution.id;
let error_message = format!(
"Execution timeout: worker did not pick up task within {} seconds (scheduled for {} seconds)",
self.config.scheduled_timeout.as_secs(),
age_seconds
);
info!(
"Failing execution {} due to timeout: {}",
execution_id, error_message
);
// Create failure result
let result = serde_json::json!({
"error": error_message,
"failed_by": "execution_timeout_monitor",
"timeout_seconds": self.config.scheduled_timeout.as_secs(),
"age_seconds": age_seconds,
"original_status": "scheduled"
});
// Update execution status in database
sqlx::query(
"UPDATE execution
SET status = $1,
result = $2,
updated = NOW()
WHERE id = $3",
)
.bind("failed")
.bind(&result)
.bind(execution_id)
.execute(&self.pool)
.await?;
info!("Execution {} marked as failed in database", execution_id);
// Publish completion notification
self.publish_completion_notification(execution_id, result)
.await?;
info!(
"Published completion notification for execution {}",
execution_id
);
Ok(())
}
/// Publish execution completion notification
async fn publish_completion_notification(
&self,
execution_id: i64,
result: JsonValue,
) -> Result<()> {
let payload = ExecutionCompletedPayload {
execution_id,
status: ExecutionStatus::Failed,
result: Some(result),
};
let envelope = MessageEnvelope::new(MessageType::ExecutionCompleted, payload)
.with_source("execution_timeout_monitor");
// Publish to main executions exchange
self.publisher.publish_envelope(&envelope).await?;
Ok(())
}
/// Get current configuration
#[allow(dead_code)]
pub fn config(&self) -> &TimeoutMonitorConfig {
&self.config
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use attune_common::mq::MessageQueue;
    use chrono::Duration as ChronoDuration;
    use sqlx::PgPool;

    /// Short timeouts so the async tests complete quickly.
    fn create_test_config() -> TimeoutMonitorConfig {
        TimeoutMonitorConfig {
            scheduled_timeout: Duration::from_secs(60), // 1 minute for tests
            check_interval: Duration::from_secs(1),     // 1 second for tests
            enabled: true,
        }
    }

    #[test]
    fn test_config_defaults() {
        let config = TimeoutMonitorConfig::default();
        assert_eq!(config.scheduled_timeout.as_secs(), 300);
        assert_eq!(config.check_interval.as_secs(), 60);
        assert!(config.enabled);
    }

    // NOTE: the two tests below use `.await`, so they must be async test
    // functions — they were previously plain `#[test]` fns, which does not
    // compile. They also need live PostgreSQL and RabbitMQ instances, so
    // they are `#[ignore]`d by default; run with `cargo test -- --ignored`.
    #[tokio::test]
    #[ignore = "requires live PostgreSQL and RabbitMQ"]
    async fn test_cutoff_calculation() {
        let config = create_test_config();
        let pool = PgPool::connect("postgresql://localhost/test")
            .await
            .expect("DB connection");
        let mq = MessageQueue::connect("amqp://localhost")
            .await
            .expect("MQ connection");
        let monitor = ExecutionTimeoutMonitor::new(pool, Arc::new(mq.publisher), config);
        let cutoff = monitor.calculate_cutoff_time();
        let now = Utc::now();
        let expected_cutoff = now - ChronoDuration::seconds(60);
        // Allow 1 second tolerance
        let diff = (cutoff - expected_cutoff).num_seconds().abs();
        assert!(diff <= 1, "Cutoff time calculation incorrect");
    }

    #[tokio::test]
    #[ignore = "requires live PostgreSQL and RabbitMQ"]
    async fn test_disabled_monitor() {
        let mut config = create_test_config();
        config.enabled = false;
        let pool = PgPool::connect("postgresql://localhost/test")
            .await
            .expect("DB connection");
        let mq = MessageQueue::connect("amqp://localhost")
            .await
            .expect("MQ connection");
        let monitor = Arc::new(ExecutionTimeoutMonitor::new(
            pool,
            Arc::new(mq.publisher),
            config,
        ));
        // Should return immediately without error
        let result = tokio::time::timeout(Duration::from_secs(1), monitor.start()).await;
        assert!(result.is_ok(), "Disabled monitor should return immediately");
    }
}

View File

@@ -0,0 +1,471 @@
//! Worker Health Probe
//!
//! This module provides proactive health checking for workers.
//! It tracks worker health metrics, detects degraded/unhealthy workers,
//! and provides health-aware worker selection.
//!
//! # Health States
//!
//! - **Healthy:** Worker is responsive and performing well
//! - **Degraded:** Worker is functional but showing signs of issues
//! - **Unhealthy:** Worker should not receive new executions
//!
//! # Health Metrics
//!
//! - Queue depth (from worker self-reporting)
//! - Consecutive failures
//! - Average execution time
//! - Heartbeat freshness
use attune_common::{
error::{Error, Result},
models::{Id, Worker, WorkerStatus},
repositories::{FindById, List, WorkerRepository},
};
use chrono::{DateTime, Duration, Utc};
use serde::{Deserialize, Serialize};
use sqlx::PgPool;
use std::sync::Arc;
use tracing::{debug, info, warn};
/// Worker health state
///
/// Serialized in lowercase ("healthy"/"degraded"/"unhealthy"), matching
/// the strings workers self-report under `capabilities.health.status`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum HealthStatus {
    /// Worker is healthy and performing well
    Healthy,
    /// Worker is functional but showing issues
    Degraded,
    /// Worker should not receive new tasks
    Unhealthy,
}
impl HealthStatus {
pub fn as_str(&self) -> &'static str {
match self {
Self::Healthy => "healthy",
Self::Degraded => "degraded",
Self::Unhealthy => "unhealthy",
}
}
}
impl std::fmt::Display for HealthStatus {
    /// Display uses the same lowercase form as [`HealthStatus::as_str`].
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(self.as_str())
    }
}
/// Worker health metrics
///
/// Populated from the worker's self-reported `capabilities.health` JSON
/// object (see `extract_health_metrics`); missing fields keep the
/// `Default` values.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HealthMetrics {
    /// Current health status
    pub status: HealthStatus,
    /// Last health check time
    pub last_check: DateTime<Utc>,
    /// Consecutive failures
    pub consecutive_failures: u32,
    /// Total executions handled
    pub total_executions: u64,
    /// Failed executions
    pub failed_executions: u64,
    /// Average execution time in milliseconds
    pub average_execution_time_ms: u64,
    /// Current queue depth (estimated, self-reported by the worker)
    pub queue_depth: u32,
}
impl Default for HealthMetrics {
fn default() -> Self {
Self {
status: HealthStatus::Healthy,
last_check: Utc::now(),
consecutive_failures: 0,
total_executions: 0,
failed_executions: 0,
average_execution_time_ms: 0,
queue_depth: 0,
}
}
}
/// Health probe configuration
///
/// Thresholds used by `evaluate_health` to classify a worker as healthy,
/// degraded, or unhealthy. Crossing any single degraded/unhealthy
/// threshold (failures, queue depth, or failure rate) is sufficient.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HealthProbeConfig {
    /// Enable health probing
    pub enabled: bool,
    /// Heartbeat staleness threshold in seconds
    pub heartbeat_max_age_secs: u64,
    /// Consecutive failures before marking degraded
    pub degraded_threshold: u32,
    /// Consecutive failures before marking unhealthy
    pub unhealthy_threshold: u32,
    /// Queue depth to consider degraded
    pub queue_depth_degraded: u32,
    /// Queue depth to consider unhealthy
    pub queue_depth_unhealthy: u32,
    /// Failure rate threshold for degraded (0.0 - 1.0)
    pub failure_rate_degraded: f64,
    /// Failure rate threshold for unhealthy (0.0 - 1.0)
    pub failure_rate_unhealthy: f64,
}
impl Default for HealthProbeConfig {
    /// Conservative defaults: probing on; heartbeat considered stale after
    /// 30s; degraded at 3 consecutive failures, 50 queued, or a 30% failure
    /// rate; unhealthy at 10 failures, 100 queued, or a 70% failure rate.
    fn default() -> Self {
        Self {
            enabled: true,
            // Heartbeat freshness window
            heartbeat_max_age_secs: 30,
            // Consecutive-failure thresholds
            degraded_threshold: 3,
            unhealthy_threshold: 10,
            // Queue-depth thresholds
            queue_depth_degraded: 50,
            queue_depth_unhealthy: 100,
            // Failure-rate thresholds (fractions)
            failure_rate_degraded: 0.3,  // 30%
            failure_rate_unhealthy: 0.7, // 70%
        }
    }
}
/// Worker health probe
///
/// Evaluates worker health from self-reported metrics and heartbeat
/// freshness, and offers health-aware worker selection helpers.
pub struct WorkerHealthProbe {
    /// Database connection pool
    pool: Arc<PgPool>,
    /// Configuration (thresholds and heartbeat window)
    config: HealthProbeConfig,
}
impl WorkerHealthProbe {
/// Create a new health probe
#[allow(dead_code)]
pub fn new(pool: Arc<PgPool>, config: HealthProbeConfig) -> Self {
Self { pool, config }
}
/// Create with default configuration
#[allow(dead_code)]
pub fn with_defaults(pool: Arc<PgPool>) -> Self {
Self::new(pool, HealthProbeConfig::default())
}
/// Check health of a specific worker
#[allow(dead_code)]
pub async fn check_worker(&self, worker_id: Id) -> Result<HealthMetrics> {
let worker = WorkerRepository::find_by_id(&*self.pool, worker_id)
.await?
.ok_or_else(|| Error::not_found("Worker", "id", worker_id.to_string()))?;
self.evaluate_health(&worker)
}
/// Get all healthy workers
#[allow(dead_code)]
pub async fn get_healthy_workers(&self) -> Result<Vec<Worker>> {
let workers = WorkerRepository::list(&*self.pool).await?;
let mut healthy = Vec::new();
for worker in workers {
if self.is_worker_healthy(&worker).await {
healthy.push(worker);
}
}
Ok(healthy)
}
/// Get workers sorted by health (healthiest first)
#[allow(dead_code)]
pub async fn get_workers_by_health(&self) -> Result<Vec<(Worker, HealthMetrics)>> {
let workers = WorkerRepository::list(&*self.pool).await?;
let mut worker_health = Vec::new();
for worker in workers {
match self.evaluate_health(&worker) {
Ok(metrics) => worker_health.push((worker, metrics)),
Err(e) => warn!("Failed to evaluate health for worker {}: {}", worker.id, e),
}
}
// Sort by health status (healthy first), then by queue depth
worker_health.sort_by(|a, b| match (a.1.status, b.1.status) {
(HealthStatus::Healthy, HealthStatus::Healthy) => a.1.queue_depth.cmp(&b.1.queue_depth),
(HealthStatus::Healthy, _) => std::cmp::Ordering::Less,
(_, HealthStatus::Healthy) => std::cmp::Ordering::Greater,
(HealthStatus::Degraded, HealthStatus::Degraded) => {
a.1.queue_depth.cmp(&b.1.queue_depth)
}
(HealthStatus::Degraded, HealthStatus::Unhealthy) => std::cmp::Ordering::Less,
(HealthStatus::Unhealthy, HealthStatus::Degraded) => std::cmp::Ordering::Greater,
(HealthStatus::Unhealthy, HealthStatus::Unhealthy) => {
a.1.queue_depth.cmp(&b.1.queue_depth)
}
});
Ok(worker_health)
}
/// Check if worker is healthy (simple boolean check)
#[allow(dead_code)]
pub async fn is_worker_healthy(&self, worker: &Worker) -> bool {
// Check basic status
if worker.status != Some(WorkerStatus::Active) {
return false;
}
// Check heartbeat freshness
if !self.is_heartbeat_fresh(worker) {
return false;
}
// Evaluate detailed health
match self.evaluate_health(worker) {
Ok(metrics) => matches!(
metrics.status,
HealthStatus::Healthy | HealthStatus::Degraded
),
Err(_) => false,
}
}
/// Evaluate worker health based on metrics
fn evaluate_health(&self, worker: &Worker) -> Result<HealthMetrics> {
// Extract health metrics from capabilities
let metrics = self.extract_health_metrics(worker);
// Check heartbeat
if !self.is_heartbeat_fresh(worker) {
return Ok(HealthMetrics {
status: HealthStatus::Unhealthy,
..metrics
});
}
// Calculate failure rate
let failure_rate = if metrics.total_executions > 0 {
metrics.failed_executions as f64 / metrics.total_executions as f64
} else {
0.0
};
// Determine health status based on thresholds
let status = if metrics.consecutive_failures >= self.config.unhealthy_threshold
|| metrics.queue_depth >= self.config.queue_depth_unhealthy
|| failure_rate >= self.config.failure_rate_unhealthy
{
HealthStatus::Unhealthy
} else if metrics.consecutive_failures >= self.config.degraded_threshold
|| metrics.queue_depth >= self.config.queue_depth_degraded
|| failure_rate >= self.config.failure_rate_degraded
{
HealthStatus::Degraded
} else {
HealthStatus::Healthy
};
debug!(
"Worker {} health: {:?} (failures: {}, queue: {}, failure_rate: {:.2}%)",
worker.name,
status,
metrics.consecutive_failures,
metrics.queue_depth,
failure_rate * 100.0
);
Ok(HealthMetrics { status, ..metrics })
}
/// Check if worker heartbeat is fresh
fn is_heartbeat_fresh(&self, worker: &Worker) -> bool {
let Some(last_heartbeat) = worker.last_heartbeat else {
warn!("Worker {} has no heartbeat", worker.name);
return false;
};
let age = Utc::now() - last_heartbeat;
let max_age = Duration::seconds(self.config.heartbeat_max_age_secs as i64);
if age > max_age {
warn!(
"Worker {} heartbeat stale: {} seconds old (max: {})",
worker.name,
age.num_seconds(),
max_age.num_seconds()
);
return false;
}
true
}
/// Extract health metrics from worker capabilities
fn extract_health_metrics(&self, worker: &Worker) -> HealthMetrics {
let mut metrics = HealthMetrics {
last_check: Utc::now(),
..Default::default()
};
let Some(capabilities) = &worker.capabilities else {
return metrics;
};
let Some(health_obj) = capabilities.get("health") else {
return metrics;
};
// Extract metrics from health object
if let Some(status_str) = health_obj.get("status").and_then(|v| v.as_str()) {
metrics.status = match status_str {
"healthy" => HealthStatus::Healthy,
"degraded" => HealthStatus::Degraded,
"unhealthy" => HealthStatus::Unhealthy,
_ => HealthStatus::Healthy,
};
}
if let Some(last_check_str) = health_obj.get("last_check").and_then(|v| v.as_str()) {
if let Ok(last_check) = DateTime::parse_from_rfc3339(last_check_str) {
metrics.last_check = last_check.with_timezone(&Utc);
}
}
if let Some(failures) = health_obj
.get("consecutive_failures")
.and_then(|v| v.as_u64())
{
metrics.consecutive_failures = failures as u32;
}
if let Some(total) = health_obj.get("total_executions").and_then(|v| v.as_u64()) {
metrics.total_executions = total;
}
if let Some(failed) = health_obj.get("failed_executions").and_then(|v| v.as_u64()) {
metrics.failed_executions = failed;
}
if let Some(avg_time) = health_obj
.get("average_execution_time_ms")
.and_then(|v| v.as_u64())
{
metrics.average_execution_time_ms = avg_time;
}
if let Some(depth) = health_obj.get("queue_depth").and_then(|v| v.as_u64()) {
metrics.queue_depth = depth as u32;
}
metrics
}
/// Get recommended worker for execution based on health
#[allow(dead_code)]
pub async fn get_best_worker(&self, runtime_name: &str) -> Result<Option<Worker>> {
let workers_by_health = self.get_workers_by_health().await?;
// Filter by runtime and health
for (worker, metrics) in workers_by_health {
// Skip unhealthy workers
if metrics.status == HealthStatus::Unhealthy {
continue;
}
// Check runtime support
if self.worker_supports_runtime(&worker, runtime_name) {
info!(
"Selected worker {} (health: {:?}, queue: {}) for runtime '{}'",
worker.name, metrics.status, metrics.queue_depth, runtime_name
);
return Ok(Some(worker));
}
}
warn!("No healthy worker found for runtime '{}'", runtime_name);
Ok(None)
}
/// Check if worker supports a runtime
fn worker_supports_runtime(&self, worker: &Worker, runtime_name: &str) -> bool {
let Some(capabilities) = &worker.capabilities else {
return false;
};
let Some(runtimes) = capabilities.get("runtimes") else {
return false;
};
let Some(runtime_array) = runtimes.as_array() else {
return false;
};
runtime_array.iter().any(|v| {
v.as_str()
.map_or(false, |s| s.eq_ignore_ascii_case(runtime_name))
})
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Build a probe backed by a lazily-connecting pool.
    ///
    /// These tests only exercise pure metric-extraction logic, so no
    /// database access ever happens; `connect_lazy` creates a valid pool
    /// without opening a connection. (Previously this used
    /// `std::mem::zeroed()` to fabricate a `PgPool`, which is undefined
    /// behavior: the pool contains non-nullable pointers and `Arc`s that
    /// must never be all-zero.)
    fn test_probe() -> WorkerHealthProbe {
        let pool = PgPool::connect_lazy("postgresql://localhost/unused")
            .expect("lazy pool creation should not fail");
        WorkerHealthProbe::with_defaults(Arc::new(pool))
    }

    #[test]
    fn test_health_status_display() {
        assert_eq!(HealthStatus::Healthy.to_string(), "healthy");
        assert_eq!(HealthStatus::Degraded.to_string(), "degraded");
        assert_eq!(HealthStatus::Unhealthy.to_string(), "unhealthy");
    }

    #[test]
    fn test_default_health_metrics() {
        let metrics = HealthMetrics::default();
        assert_eq!(metrics.status, HealthStatus::Healthy);
        assert_eq!(metrics.consecutive_failures, 0);
        assert_eq!(metrics.queue_depth, 0);
    }

    #[test]
    fn test_health_probe_config_defaults() {
        let config = HealthProbeConfig::default();
        assert!(config.enabled);
        assert_eq!(config.heartbeat_max_age_secs, 30);
        assert_eq!(config.degraded_threshold, 3);
        assert_eq!(config.unhealthy_threshold, 10);
        assert_eq!(config.queue_depth_degraded, 50);
        assert_eq!(config.queue_depth_unhealthy, 100);
    }

    #[test]
    fn test_extract_health_metrics() {
        let probe = test_probe();
        let worker = Worker {
            id: 1,
            name: "test-worker".to_string(),
            worker_type: attune_common::models::WorkerType::Container,
            worker_role: attune_common::models::WorkerRole::Action,
            runtime: None,
            host: None,
            port: None,
            status: Some(WorkerStatus::Active),
            capabilities: Some(json!({
                "health": {
                    "status": "degraded",
                    "consecutive_failures": 5,
                    "queue_depth": 25,
                    "total_executions": 100,
                    "failed_executions": 10
                }
            })),
            meta: None,
            last_heartbeat: Some(Utc::now()),
            created: Utc::now(),
            updated: Utc::now(),
        };
        let metrics = probe.extract_health_metrics(&worker);
        assert_eq!(metrics.status, HealthStatus::Degraded);
        assert_eq!(metrics.consecutive_failures, 5);
        assert_eq!(metrics.queue_depth, 25);
        assert_eq!(metrics.total_executions, 100);
        assert_eq!(metrics.failed_executions, 10);
    }
}