// attune/crates/sensor/src/service.rs
//! Sensor Service
//!
//! Main service orchestrator that coordinates sensor management
//! and rule lifecycle listening.
//!
//! Shutdown follows the same pattern as the worker service:
//! 1. Deregister worker (mark inactive, stop receiving new work)
//! 2. Stop heartbeat
//! 3. Stop sensor processes with configurable timeout
//! 4. Stop rule lifecycle listener
//! 5. Close MQ and DB connections
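//!
//! A minimal lifecycle sketch (`load_config()` below is a placeholder for
//! however the owning binary builds its `Config`):
//!
//! ```ignore
//! let service = SensorService::new(load_config()).await?;
//! service.start().await?;
//! tokio::signal::ctrl_c().await?;
//! service.stop().await?;
//! ```
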
use crate::rule_lifecycle_listener::RuleLifecycleListener;
use crate::sensor_manager::SensorManager;
use crate::sensor_worker_registration::SensorWorkerRegistration;
use anyhow::Result;
use attune_common::config::Config;
use attune_common::db::Database;
use attune_common::mq::MessageQueue;
use sqlx::PgPool;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::RwLock;
use tracing::{error, info, warn};

/// Sensor Service state
#[derive(Clone)]
pub struct SensorService {
    inner: Arc<SensorServiceInner>,
}

struct SensorServiceInner {
    config: Config,
    db: PgPool,
    mq: MessageQueue,
    sensor_manager: Arc<SensorManager>,
    rule_lifecycle_listener: Arc<RuleLifecycleListener>,
    sensor_worker_registration: Arc<RwLock<SensorWorkerRegistration>>,
    /// Heartbeat period in seconds (defaults to 30 when unset in config).
    heartbeat_interval: u64,
    /// Flag polled by the heartbeat task; cleared on shutdown.
    heartbeat_running: Arc<RwLock<bool>>,
}

impl SensorService {
    /// Create a new sensor service
    pub async fn new(config: Config) -> Result<Self> {
        info!("Initializing Sensor Service");

        // Connect to database
        info!("Connecting to database...");
        let database = Database::new(&config.database).await?;
        let db = database.pool().clone();
        info!("Database connection established");

        // Connect to message queue
        info!("Connecting to message queue...");
        let mq_config = config
            .message_queue
            .as_ref()
            .ok_or_else(|| anyhow::anyhow!("Message queue configuration is required"))?;
        let mq = MessageQueue::connect(&mq_config.url).await?;
        info!("Message queue connection established");

        // Setup common message queue infrastructure (exchanges and DLX)
        let mq_setup_config = attune_common::mq::MessageQueueConfig::default();
        match mq
            .get_connection()
            .setup_common_infrastructure(&mq_setup_config)
            .await
        {
            Ok(_) => info!("Common message queue infrastructure setup completed"),
            Err(e) => {
                warn!(
                    "Failed to setup common MQ infrastructure (may already exist): {}",
                    e
                );
            }
        }

        // Setup sensor-specific queues and bindings
        match mq
            .get_connection()
            .setup_sensor_infrastructure(&mq_setup_config)
            .await
        {
            Ok(_) => info!("Sensor message queue infrastructure setup completed"),
            Err(e) => {
                warn!(
                    "Failed to setup sensor MQ infrastructure (may already exist): {}",
                    e
                );
            }
        }

        // Create service components
        info!("Creating service components...");
        let sensor_manager = Arc::new(SensorManager::new(db.clone()));

        // Create rule lifecycle listener
        let rule_lifecycle_listener = Arc::new(RuleLifecycleListener::new(
            db.clone(),
            mq.get_connection().clone(),
            sensor_manager.clone(),
        ));

        // Create sensor worker registration
        let sensor_worker_registration = SensorWorkerRegistration::new(db.clone(), &config);
        let heartbeat_interval = config
            .sensor
            .as_ref()
            .map(|s| s.heartbeat_interval)
            .unwrap_or(30);

        Ok(Self {
            inner: Arc::new(SensorServiceInner {
                config,
                db,
                mq,
                sensor_manager,
                rule_lifecycle_listener,
                sensor_worker_registration: Arc::new(RwLock::new(sensor_worker_registration)),
                heartbeat_interval,
                heartbeat_running: Arc::new(RwLock::new(false)),
            }),
        })
    }

    /// Start the sensor service
    ///
    /// Spawns background tasks (heartbeat, rule listener, sensor manager) and returns.
    /// The caller is responsible for blocking on shutdown signals and calling `stop()`.
    pub async fn start(&self) -> Result<()> {
        info!("Starting Sensor Service");

        // Register sensor worker
        info!("Registering sensor worker...");
        let worker_id = self
            .inner
            .sensor_worker_registration
            .write()
            .await
            .register(&self.inner.config)
            .await?;
        info!("Sensor worker registered with ID: {}", worker_id);

        // Start rule lifecycle listener
        info!("Starting rule lifecycle listener...");
        if let Err(e) = self.inner.rule_lifecycle_listener.start().await {
            error!("Failed to start rule lifecycle listener: {}", e);
            return Err(e);
        }
        info!("Rule lifecycle listener started");

        // Start sensor manager
        info!("Starting sensor manager...");
        if let Err(e) = self.inner.sensor_manager.start().await {
            error!("Failed to start sensor manager: {}", e);
            return Err(e);
        }
        info!("Sensor manager started");

        // Start heartbeat loop
        *self.inner.heartbeat_running.write().await = true;
        let registration = self.inner.sensor_worker_registration.clone();
        let heartbeat_interval = self.inner.heartbeat_interval;
        let heartbeat_running = self.inner.heartbeat_running.clone();
        tokio::spawn(async move {
            // Note: `tokio::time::interval`'s first tick completes immediately,
            // so the first heartbeat is sent as soon as this task starts.
            let mut ticker = tokio::time::interval(Duration::from_secs(heartbeat_interval));
            loop {
                ticker.tick().await;
                if !*heartbeat_running.read().await {
                    info!("Heartbeat loop stopping");
                    break;
                }
                if let Err(e) = registration.read().await.heartbeat().await {
                    error!("Failed to send sensor worker heartbeat: {}", e);
                }
            }
            info!("Heartbeat loop stopped");
        });

        info!("Sensor Service started successfully");
        Ok(())
    }

    /// Stop the sensor service gracefully
    ///
    /// Shutdown order (mirrors the worker service pattern):
    /// 1. Deregister worker (mark inactive to stop being scheduled for new work)
    /// 2. Stop heartbeat
    /// 3. Stop sensor processes with timeout
    /// 4. Stop rule lifecycle listener
    /// 5. Close message queue connection
    /// 6. Close database connection
    pub async fn stop(&self) -> Result<()> {
        info!("Stopping Sensor Service - initiating graceful shutdown");

        // 1. Deregister sensor worker first to stop receiving new work
        info!("Marking sensor worker as inactive to stop receiving new work");
        if let Err(e) = self
            .inner
            .sensor_worker_registration
            .read()
            .await
            .deregister()
            .await
        {
            error!("Failed to deregister sensor worker: {}", e);
        }

        // 2. Stop heartbeat
        info!("Stopping heartbeat updates");
        *self.inner.heartbeat_running.write().await = false;
        // Give the detached heartbeat task a moment; it only observes the flag
        // on its next tick, so it may outlive this call by up to one interval.
        tokio::time::sleep(Duration::from_millis(100)).await;

        // 3. Stop sensor processes with timeout
        let shutdown_timeout = self
            .inner
            .config
            .sensor
            .as_ref()
            .map(|s| s.shutdown_timeout)
            .unwrap_or(30);
        info!(
            "Waiting up to {} seconds for sensor processes to stop",
            shutdown_timeout
        );
        let sensor_manager = self.inner.sensor_manager.clone();
        let timeout_duration = Duration::from_secs(shutdown_timeout);
        match tokio::time::timeout(timeout_duration, sensor_manager.stop()).await {
            Ok(Ok(_)) => info!("All sensor processes stopped"),
            Ok(Err(e)) => error!("Error stopping sensor processes: {}", e),
            Err(_) => warn!(
                "Shutdown timeout reached ({} seconds) - some sensor processes may have been interrupted",
                shutdown_timeout
            ),
        }

        // 4. Stop rule lifecycle listener
        info!("Stopping rule lifecycle listener...");
        if let Err(e) = self.inner.rule_lifecycle_listener.stop().await {
            error!("Failed to stop rule lifecycle listener: {}", e);
        }

        // 5. Close message queue connection
        info!("Closing message queue connection...");
        if let Err(e) = self.inner.mq.close().await {
            warn!("Error closing message queue: {}", e);
        }

        // 6. Close database connection
        info!("Closing database connection...");
        self.inner.db.close().await;

        info!("Sensor Service stopped successfully");
        Ok(())
    }

    /// Get database pool
    pub fn db(&self) -> &PgPool {
        &self.inner.db
    }

    /// Get message queue
    pub fn mq(&self) -> &MessageQueue {
        &self.inner.mq
    }

    /// Get sensor manager
    pub fn sensor_manager(&self) -> Arc<SensorManager> {
        self.inner.sensor_manager.clone()
    }

    /// Get health status
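    ///
    /// A readiness-probe sketch; how the status is surfaced (HTTP handler,
    /// CLI, etc.) is outside this file:
    ///
    /// ```ignore
    /// match service.health_check().await {
    ///     HealthStatus::Healthy => {}
    ///     status => tracing::warn!("sensor service is {}", status),
    /// }
    /// ```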
    pub async fn health_check(&self) -> HealthStatus {
        // Check database connection
        if let Err(e) = sqlx::query("SELECT 1").execute(&self.inner.db).await {
            return HealthStatus::Unhealthy(format!("Database connection failed: {}", e));
        }

        // Check sensor manager health
        let active_sensors = self.inner.sensor_manager.active_count().await;
        let failed_sensors = self.inner.sensor_manager.failed_count().await;
        if active_sensors == 0 {
            return HealthStatus::Degraded("No active sensors".to_string());
        }
        if failed_sensors > 10 {
            return HealthStatus::Degraded(format!("{} sensors have failed", failed_sensors));
        }
        HealthStatus::Healthy
    }
}

/// Health status enumeration
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum HealthStatus {
    /// Service is healthy
    Healthy,
    /// Service is degraded but operational
    Degraded(String),
    /// Service is unhealthy
    Unhealthy(String),
}
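
// Human-readable form used in logs and health endpoints, e.g.
// `Degraded("No active sensors")` renders as "degraded: No active sensors".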
impl std::fmt::Display for HealthStatus {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            HealthStatus::Healthy => write!(f, "healthy"),
            HealthStatus::Degraded(msg) => write!(f, "degraded: {}", msg),
            HealthStatus::Unhealthy(msg) => write!(f, "unhealthy: {}", msg),
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_health_status_display() {
        assert_eq!(HealthStatus::Healthy.to_string(), "healthy");
        assert_eq!(
            HealthStatus::Degraded("test".to_string()).to_string(),
            "degraded: test"
        );
        assert_eq!(
            HealthStatus::Unhealthy("error".to_string()).to_string(),
            "unhealthy: error"
        );
    }
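
    // Additional sketch: the derived `PartialEq` compares both the variant
    // and its message, so distinct degradation reasons are not equal.
    #[test]
    fn test_health_status_equality() {
        assert_eq!(
            HealthStatus::Degraded("test".to_string()),
            HealthStatus::Degraded("test".to_string())
        );
        assert_ne!(
            HealthStatus::Degraded("a".to_string()),
            HealthStatus::Degraded("b".to_string())
        );
        assert_ne!(
            HealthStatus::Healthy,
            HealthStatus::Unhealthy("error".to_string())
        );
    }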
}