344 lines
11 KiB
Rust
344 lines
11 KiB
Rust
//! Sensor Service
|
|
//!
|
|
//! Main service orchestrator that coordinates sensor management
|
|
//! and rule lifecycle listening.
|
|
//!
|
|
//! Shutdown follows the same pattern as the worker service:
//! 1. Deregister worker (mark inactive, stop receiving new work)
//! 2. Stop heartbeat
//! 3. Stop sensor processes with configurable timeout
//! 4. Stop the rule lifecycle listener
//! 5. Close MQ and DB connections
|
|
|
|
use crate::rule_lifecycle_listener::RuleLifecycleListener;
|
|
use crate::sensor_manager::SensorManager;
|
|
use crate::sensor_worker_registration::SensorWorkerRegistration;
|
|
use anyhow::Result;
|
|
use attune_common::config::Config;
|
|
use attune_common::db::Database;
|
|
use attune_common::mq::MessageQueue;
|
|
use sqlx::PgPool;
|
|
use std::sync::Arc;
|
|
use std::time::Duration;
|
|
use tokio::sync::RwLock;
|
|
use tracing::{error, info, warn};
|
|
|
|
/// Sensor Service state
|
|
#[derive(Clone)]
|
|
pub struct SensorService {
|
|
inner: Arc<SensorServiceInner>,
|
|
}
|
|
|
|
struct SensorServiceInner {
|
|
config: Config,
|
|
db: PgPool,
|
|
mq: MessageQueue,
|
|
sensor_manager: Arc<SensorManager>,
|
|
rule_lifecycle_listener: Arc<RuleLifecycleListener>,
|
|
sensor_worker_registration: Arc<RwLock<SensorWorkerRegistration>>,
|
|
heartbeat_interval: u64,
|
|
heartbeat_running: Arc<RwLock<bool>>,
|
|
}
|
|
|
|
impl SensorService {
|
|
/// Create a new sensor service
|
|
pub async fn new(config: Config) -> Result<Self> {
|
|
info!("Initializing Sensor Service");
|
|
|
|
// Connect to database
|
|
info!("Connecting to database...");
|
|
let database = Database::new(&config.database).await?;
|
|
let db = database.pool().clone();
|
|
info!("Database connection established");
|
|
|
|
// Connect to message queue
|
|
info!("Connecting to message queue...");
|
|
let mq_config = config
|
|
.message_queue
|
|
.as_ref()
|
|
.ok_or_else(|| anyhow::anyhow!("Message queue configuration is required"))?;
|
|
let mq = MessageQueue::connect(&mq_config.url).await?;
|
|
info!("Message queue connection established");
|
|
|
|
// Setup common message queue infrastructure (exchanges and DLX)
|
|
let mq_setup_config = attune_common::mq::MessageQueueConfig::default();
|
|
match mq
|
|
.get_connection()
|
|
.setup_common_infrastructure(&mq_setup_config)
|
|
.await
|
|
{
|
|
Ok(_) => info!("Common message queue infrastructure setup completed"),
|
|
Err(e) => {
|
|
warn!(
|
|
"Failed to setup common MQ infrastructure (may already exist): {}",
|
|
e
|
|
);
|
|
}
|
|
}
|
|
|
|
// Setup sensor-specific queues and bindings
|
|
match mq
|
|
.get_connection()
|
|
.setup_sensor_infrastructure(&mq_setup_config)
|
|
.await
|
|
{
|
|
Ok(_) => info!("Sensor message queue infrastructure setup completed"),
|
|
Err(e) => {
|
|
warn!(
|
|
"Failed to setup sensor MQ infrastructure (may already exist): {}",
|
|
e
|
|
);
|
|
}
|
|
}
|
|
|
|
// Create service components
|
|
info!("Creating service components...");
|
|
|
|
let sensor_manager = Arc::new(SensorManager::new(db.clone()));
|
|
|
|
// Create rule lifecycle listener
|
|
let rule_lifecycle_listener = Arc::new(RuleLifecycleListener::new(
|
|
db.clone(),
|
|
mq.get_connection().clone(),
|
|
sensor_manager.clone(),
|
|
));
|
|
|
|
// Create sensor worker registration
|
|
let sensor_worker_registration = SensorWorkerRegistration::new(db.clone(), &config);
|
|
let heartbeat_interval = config
|
|
.sensor
|
|
.as_ref()
|
|
.map(|s| s.heartbeat_interval)
|
|
.unwrap_or(30);
|
|
|
|
Ok(Self {
|
|
inner: Arc::new(SensorServiceInner {
|
|
config,
|
|
db,
|
|
mq,
|
|
sensor_manager,
|
|
rule_lifecycle_listener,
|
|
sensor_worker_registration: Arc::new(RwLock::new(sensor_worker_registration)),
|
|
heartbeat_interval,
|
|
heartbeat_running: Arc::new(RwLock::new(false)),
|
|
}),
|
|
})
|
|
}
|
|
|
|
/// Start the sensor service
|
|
///
|
|
/// Spawns background tasks (heartbeat, rule listener, sensor manager) and returns.
|
|
/// The caller is responsible for blocking on shutdown signals and calling `stop()`.
|
|
pub async fn start(&self) -> Result<()> {
|
|
info!("Starting Sensor Service");
|
|
|
|
// Register sensor worker
|
|
info!("Registering sensor worker...");
|
|
let worker_id = self
|
|
.inner
|
|
.sensor_worker_registration
|
|
.write()
|
|
.await
|
|
.register(&self.inner.config)
|
|
.await?;
|
|
info!("Sensor worker registered with ID: {}", worker_id);
|
|
|
|
// Start rule lifecycle listener
|
|
info!("Starting rule lifecycle listener...");
|
|
if let Err(e) = self.inner.rule_lifecycle_listener.start().await {
|
|
error!("Failed to start rule lifecycle listener: {}", e);
|
|
return Err(e);
|
|
}
|
|
info!("Rule lifecycle listener started");
|
|
|
|
// Start sensor manager
|
|
info!("Starting sensor manager...");
|
|
if let Err(e) = self.inner.sensor_manager.start().await {
|
|
error!("Failed to start sensor manager: {}", e);
|
|
return Err(e);
|
|
}
|
|
info!("Sensor manager started");
|
|
|
|
// Start heartbeat loop
|
|
*self.inner.heartbeat_running.write().await = true;
|
|
|
|
let registration = self.inner.sensor_worker_registration.clone();
|
|
let heartbeat_interval = self.inner.heartbeat_interval;
|
|
let heartbeat_running = self.inner.heartbeat_running.clone();
|
|
tokio::spawn(async move {
|
|
let mut ticker = tokio::time::interval(Duration::from_secs(heartbeat_interval));
|
|
|
|
loop {
|
|
ticker.tick().await;
|
|
|
|
if !*heartbeat_running.read().await {
|
|
info!("Heartbeat loop stopping");
|
|
break;
|
|
}
|
|
|
|
if let Err(e) = registration.read().await.heartbeat().await {
|
|
error!("Failed to send sensor worker heartbeat: {}", e);
|
|
}
|
|
}
|
|
|
|
info!("Heartbeat loop stopped");
|
|
});
|
|
|
|
info!("Sensor Service started successfully");
|
|
|
|
Ok(())
|
|
}
|
|
|
|
/// Stop the sensor service gracefully
|
|
///
|
|
/// Shutdown order (mirrors worker service pattern):
|
|
/// 1. Deregister worker (mark inactive to stop being scheduled for new work)
|
|
/// 2. Stop heartbeat
|
|
/// 3. Stop sensor processes with timeout
|
|
/// 4. Stop rule lifecycle listener
|
|
/// 5. Close MQ and DB connections
|
|
pub async fn stop(&self) -> Result<()> {
|
|
info!("Stopping Sensor Service - initiating graceful shutdown");
|
|
|
|
// 1. Deregister sensor worker first to stop receiving new work
|
|
info!("Marking sensor worker as inactive to stop receiving new work");
|
|
if let Err(e) = self
|
|
.inner
|
|
.sensor_worker_registration
|
|
.read()
|
|
.await
|
|
.deregister()
|
|
.await
|
|
{
|
|
error!("Failed to deregister sensor worker: {}", e);
|
|
}
|
|
|
|
// 2. Stop heartbeat
|
|
info!("Stopping heartbeat updates");
|
|
*self.inner.heartbeat_running.write().await = false;
|
|
|
|
// Wait a bit for heartbeat loop to notice the flag
|
|
tokio::time::sleep(Duration::from_millis(100)).await;
|
|
|
|
// 3. Stop sensor processes with timeout
|
|
let shutdown_timeout = self
|
|
.inner
|
|
.config
|
|
.sensor
|
|
.as_ref()
|
|
.map(|s| s.shutdown_timeout)
|
|
.unwrap_or(30);
|
|
|
|
info!(
|
|
"Waiting up to {} seconds for sensor processes to stop",
|
|
shutdown_timeout
|
|
);
|
|
|
|
let sensor_manager = self.inner.sensor_manager.clone();
|
|
let timeout_duration = Duration::from_secs(shutdown_timeout);
|
|
match tokio::time::timeout(timeout_duration, sensor_manager.stop()).await {
|
|
Ok(Ok(_)) => info!("All sensor processes stopped"),
|
|
Ok(Err(e)) => error!("Error stopping sensor processes: {}", e),
|
|
Err(_) => warn!(
|
|
"Shutdown timeout reached ({} seconds) - some sensor processes may have been interrupted",
|
|
shutdown_timeout
|
|
),
|
|
}
|
|
|
|
// 4. Stop rule lifecycle listener
|
|
info!("Stopping rule lifecycle listener...");
|
|
if let Err(e) = self.inner.rule_lifecycle_listener.stop().await {
|
|
error!("Failed to stop rule lifecycle listener: {}", e);
|
|
}
|
|
|
|
// 5. Close message queue connection
|
|
info!("Closing message queue connection...");
|
|
if let Err(e) = self.inner.mq.close().await {
|
|
warn!("Error closing message queue: {}", e);
|
|
}
|
|
|
|
// 6. Close database connection
|
|
info!("Closing database connection...");
|
|
self.inner.db.close().await;
|
|
|
|
info!("Sensor Service stopped successfully");
|
|
|
|
Ok(())
|
|
}
|
|
|
|
/// Get database pool
|
|
pub fn db(&self) -> &PgPool {
|
|
&self.inner.db
|
|
}
|
|
|
|
/// Get message queue
|
|
pub fn mq(&self) -> &MessageQueue {
|
|
&self.inner.mq
|
|
}
|
|
|
|
/// Get sensor manager
|
|
pub fn sensor_manager(&self) -> Arc<SensorManager> {
|
|
self.inner.sensor_manager.clone()
|
|
}
|
|
|
|
/// Get health status
|
|
pub async fn health_check(&self) -> HealthStatus {
|
|
// Check database connection
|
|
if let Err(e) = sqlx::query("SELECT 1").execute(&self.inner.db).await {
|
|
return HealthStatus::Unhealthy(format!("Database connection failed: {}", e));
|
|
}
|
|
|
|
// Check sensor manager health
|
|
let active_sensors = self.inner.sensor_manager.active_count().await;
|
|
let failed_sensors = self.inner.sensor_manager.failed_count().await;
|
|
|
|
if active_sensors == 0 {
|
|
return HealthStatus::Degraded("No active sensors".to_string());
|
|
}
|
|
|
|
if failed_sensors > 10 {
|
|
return HealthStatus::Degraded(format!("{} sensors have failed", failed_sensors));
|
|
}
|
|
|
|
HealthStatus::Healthy
|
|
}
|
|
}
|
|
|
|
/// Overall health reported by [`SensorService::health_check`].
///
/// The non-healthy variants carry a human-readable explanation.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum HealthStatus {
    /// Everything checked is working.
    Healthy,
    /// The service still runs but something needs attention; the string says what.
    Degraded(String),
    /// The service cannot operate correctly; the string carries the failure reason.
    Unhealthy(String),
}
|
|
|
|
impl std::fmt::Display for HealthStatus {
|
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
|
match self {
|
|
HealthStatus::Healthy => write!(f, "healthy"),
|
|
HealthStatus::Degraded(msg) => write!(f, "degraded: {}", msg),
|
|
HealthStatus::Unhealthy(msg) => write!(f, "unhealthy: {}", msg),
|
|
}
|
|
}
|
|
}
|
|
|
|
#[cfg(test)]
mod tests {
    use super::*;

    /// Each variant renders as its lowercase name, with the detail message
    /// appended after `": "` when the variant carries one.
    #[test]
    fn test_health_status_display() {
        let cases = [
            (HealthStatus::Healthy, "healthy"),
            (HealthStatus::Degraded("test".to_string()), "degraded: test"),
            (HealthStatus::Unhealthy("error".to_string()), "unhealthy: error"),
        ];

        for (status, expected) in cases.iter() {
            assert_eq!(status.to_string(), *expected);
        }
    }
}
|