http_request action working nicely
This commit is contained in:
@@ -407,6 +407,10 @@ pub struct SensorConfig {
|
||||
/// Sensor execution timeout in seconds
|
||||
#[serde(default = "default_sensor_timeout")]
|
||||
pub sensor_timeout: u64,
|
||||
|
||||
/// Graceful shutdown timeout in seconds
|
||||
#[serde(default = "default_sensor_shutdown_timeout")]
|
||||
pub shutdown_timeout: u64,
|
||||
}
|
||||
|
||||
fn default_sensor_poll_interval() -> u64 {
|
||||
@@ -417,6 +421,10 @@ fn default_sensor_timeout() -> u64 {
|
||||
30
|
||||
}
|
||||
|
||||
fn default_sensor_shutdown_timeout() -> u64 {
|
||||
30
|
||||
}
|
||||
|
||||
/// Pack registry index configuration
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct RegistryIndexConfig {
|
||||
|
||||
@@ -112,7 +112,7 @@ impl ExecutionTimeoutMonitor {
|
||||
ORDER BY updated ASC
|
||||
LIMIT 100", // Process in batches to avoid overwhelming system
|
||||
)
|
||||
.bind("scheduled")
|
||||
.bind(ExecutionStatus::Scheduled)
|
||||
.bind(cutoff)
|
||||
.fetch_all(&self.pool)
|
||||
.await?;
|
||||
@@ -186,7 +186,7 @@ impl ExecutionTimeoutMonitor {
|
||||
updated = NOW()
|
||||
WHERE id = $3",
|
||||
)
|
||||
.bind("failed")
|
||||
.bind(ExecutionStatus::Failed)
|
||||
.bind(&result)
|
||||
.bind(execution_id)
|
||||
.execute(&self.pool)
|
||||
|
||||
@@ -8,6 +8,7 @@ use anyhow::Result;
|
||||
use attune_common::config::Config;
|
||||
use attune_sensor::service::SensorService;
|
||||
use clap::Parser;
|
||||
use tokio::signal::unix::{signal, SignalKind};
|
||||
use tracing::{error, info};
|
||||
|
||||
#[derive(Parser, Debug)]
|
||||
@@ -56,32 +57,38 @@ async fn main() -> Result<()> {
|
||||
info!("Message Queue: {}", mask_connection_string(&mq_config.url));
|
||||
}
|
||||
|
||||
// Create sensor service
|
||||
// Create and start sensor service
|
||||
let service = SensorService::new(config).await?;
|
||||
|
||||
info!("Sensor Service initialized successfully");
|
||||
|
||||
// Set up graceful shutdown handler
|
||||
let service_clone = service.clone();
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) = tokio::signal::ctrl_c().await {
|
||||
error!("Failed to listen for shutdown signal: {}", e);
|
||||
} else {
|
||||
info!("Shutdown signal received");
|
||||
if let Err(e) = service_clone.stop().await {
|
||||
error!("Error during shutdown: {}", e);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
// Start the service
|
||||
// Start the service (spawns background tasks and returns)
|
||||
info!("Starting Sensor Service components...");
|
||||
if let Err(e) = service.start().await {
|
||||
error!("Sensor Service error: {}", e);
|
||||
return Err(e);
|
||||
service.start().await?;
|
||||
|
||||
info!("Attune Sensor Service is ready");
|
||||
|
||||
// Setup signal handlers for graceful shutdown
|
||||
let mut sigint = signal(SignalKind::interrupt())?;
|
||||
let mut sigterm = signal(SignalKind::terminate())?;
|
||||
|
||||
tokio::select! {
|
||||
_ = sigint.recv() => {
|
||||
info!("Received SIGINT signal");
|
||||
}
|
||||
_ = sigterm.recv() => {
|
||||
info!("Received SIGTERM signal");
|
||||
}
|
||||
}
|
||||
|
||||
info!("Sensor Service has shut down gracefully");
|
||||
info!("Shutting down gracefully...");
|
||||
|
||||
// Stop the service: deregister worker, stop sensors, clean up connections
|
||||
if let Err(e) = service.stop().await {
|
||||
error!("Error during shutdown: {}", e);
|
||||
}
|
||||
|
||||
info!("Attune Sensor Service shutdown complete");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -12,6 +12,7 @@ use serde_json::Value as JsonValue;
|
||||
use sqlx::PgPool;
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::RwLock;
|
||||
use tokio::task::JoinHandle;
|
||||
use tracing::{error, info, warn};
|
||||
|
||||
use crate::sensor_manager::SensorManager;
|
||||
@@ -22,6 +23,7 @@ pub struct RuleLifecycleListener {
|
||||
connection: Connection,
|
||||
sensor_manager: Arc<SensorManager>,
|
||||
consumer: Arc<RwLock<Option<Consumer>>>,
|
||||
task_handle: RwLock<Option<JoinHandle<()>>>,
|
||||
}
|
||||
|
||||
impl RuleLifecycleListener {
|
||||
@@ -32,6 +34,7 @@ impl RuleLifecycleListener {
|
||||
connection,
|
||||
sensor_manager,
|
||||
consumer: Arc::new(RwLock::new(None)),
|
||||
task_handle: RwLock::new(None),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -88,19 +91,20 @@ impl RuleLifecycleListener {
|
||||
);
|
||||
}
|
||||
|
||||
// Store consumer
|
||||
// Store consumer reference (for cleanup on drop)
|
||||
*self.consumer.write().await = Some(consumer);
|
||||
|
||||
// Clone self for async handler
|
||||
// Clone references for the spawned task
|
||||
let db = self.db.clone();
|
||||
let sensor_manager = self.sensor_manager.clone();
|
||||
let consumer_ref = self.consumer.clone();
|
||||
|
||||
// Start consuming messages
|
||||
tokio::spawn(async move {
|
||||
// Get consumer from the Arc<RwLock<Option<Consumer>>>
|
||||
let consumer_guard = consumer_ref.read().await;
|
||||
if let Some(consumer) = consumer_guard.as_ref() {
|
||||
// Start consuming messages in a background task.
|
||||
// Take the consumer out of the Arc<RwLock> so we don't hold the read lock
|
||||
// for the entire duration of consume_with_handler (which would deadlock stop()).
|
||||
let handle = tokio::spawn(async move {
|
||||
let consumer = consumer_ref.write().await.take();
|
||||
if let Some(consumer) = consumer {
|
||||
let result = consumer
|
||||
.consume_with_handler::<JsonValue, _, _>(move |envelope| {
|
||||
let db = db.clone();
|
||||
@@ -129,6 +133,8 @@ impl RuleLifecycleListener {
|
||||
}
|
||||
});
|
||||
|
||||
*self.task_handle.write().await = Some(handle);
|
||||
|
||||
info!("Rule lifecycle listener started");
|
||||
|
||||
Ok(())
|
||||
@@ -138,8 +144,15 @@ impl RuleLifecycleListener {
|
||||
pub async fn stop(&self) -> Result<()> {
|
||||
info!("Stopping rule lifecycle listener");
|
||||
|
||||
// Abort the consumer task first — this ends the consume_with_handler loop
|
||||
// and drops the Consumer (and its channel) inside the task.
|
||||
if let Some(handle) = self.task_handle.write().await.take() {
|
||||
handle.abort();
|
||||
let _ = handle.await; // wait for abort to complete
|
||||
}
|
||||
|
||||
// Clean up any consumer that wasn't taken by the task (e.g. if task never started)
|
||||
if let Some(consumer) = self.consumer.write().await.take() {
|
||||
// Consumer will be dropped and connection closed
|
||||
drop(consumer);
|
||||
}
|
||||
|
||||
|
||||
@@ -2,6 +2,12 @@
|
||||
//!
|
||||
//! Main service orchestrator that coordinates sensor management
|
||||
//! and rule lifecycle listening.
|
||||
//!
|
||||
//! Shutdown follows the same pattern as the worker service:
|
||||
//! 1. Deregister worker (mark inactive, stop receiving new work)
|
||||
//! 2. Stop heartbeat
|
||||
//! 3. Stop sensor processes with configurable timeout
|
||||
//! 4. Close MQ and DB connections
|
||||
|
||||
use crate::rule_lifecycle_listener::RuleLifecycleListener;
|
||||
use crate::sensor_manager::SensorManager;
|
||||
@@ -12,6 +18,7 @@ use attune_common::db::Database;
|
||||
use attune_common::mq::MessageQueue;
|
||||
use sqlx::PgPool;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use tokio::sync::RwLock;
|
||||
use tracing::{error, info, warn};
|
||||
|
||||
@@ -29,7 +36,7 @@ struct SensorServiceInner {
|
||||
rule_lifecycle_listener: Arc<RuleLifecycleListener>,
|
||||
sensor_worker_registration: Arc<RwLock<SensorWorkerRegistration>>,
|
||||
heartbeat_interval: u64,
|
||||
running: Arc<RwLock<bool>>,
|
||||
heartbeat_running: Arc<RwLock<bool>>,
|
||||
}
|
||||
|
||||
impl SensorService {
|
||||
@@ -112,18 +119,18 @@ impl SensorService {
|
||||
rule_lifecycle_listener,
|
||||
sensor_worker_registration: Arc::new(RwLock::new(sensor_worker_registration)),
|
||||
heartbeat_interval,
|
||||
running: Arc::new(RwLock::new(false)),
|
||||
heartbeat_running: Arc::new(RwLock::new(false)),
|
||||
}),
|
||||
})
|
||||
}
|
||||
|
||||
/// Start the sensor service
|
||||
///
|
||||
/// Spawns background tasks (heartbeat, rule listener, sensor manager) and returns.
|
||||
/// The caller is responsible for blocking on shutdown signals and calling `stop()`.
|
||||
pub async fn start(&self) -> Result<()> {
|
||||
info!("Starting Sensor Service");
|
||||
|
||||
// Mark as running
|
||||
*self.inner.running.write().await = true;
|
||||
|
||||
// Register sensor worker
|
||||
info!("Registering sensor worker...");
|
||||
let worker_id = self
|
||||
@@ -152,37 +159,48 @@ impl SensorService {
|
||||
info!("Sensor manager started");
|
||||
|
||||
// Start heartbeat loop
|
||||
*self.inner.heartbeat_running.write().await = true;
|
||||
|
||||
let registration = self.inner.sensor_worker_registration.clone();
|
||||
let heartbeat_interval = self.inner.heartbeat_interval;
|
||||
let running = self.inner.running.clone();
|
||||
let heartbeat_running = self.inner.heartbeat_running.clone();
|
||||
tokio::spawn(async move {
|
||||
while *running.read().await {
|
||||
tokio::time::sleep(tokio::time::Duration::from_secs(heartbeat_interval)).await;
|
||||
let mut ticker = tokio::time::interval(Duration::from_secs(heartbeat_interval));
|
||||
|
||||
loop {
|
||||
ticker.tick().await;
|
||||
|
||||
if !*heartbeat_running.read().await {
|
||||
info!("Heartbeat loop stopping");
|
||||
break;
|
||||
}
|
||||
|
||||
if let Err(e) = registration.read().await.heartbeat().await {
|
||||
error!("Failed to send sensor worker heartbeat: {}", e);
|
||||
}
|
||||
}
|
||||
|
||||
info!("Heartbeat loop stopped");
|
||||
});
|
||||
|
||||
// Wait until stopped
|
||||
while *self.inner.running.read().await {
|
||||
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
|
||||
}
|
||||
|
||||
info!("Sensor Service stopped");
|
||||
info!("Sensor Service started successfully");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Stop the sensor service
|
||||
/// Stop the sensor service gracefully
|
||||
///
|
||||
/// Shutdown order (mirrors worker service pattern):
|
||||
/// 1. Deregister worker (mark inactive to stop being scheduled for new work)
|
||||
/// 2. Stop heartbeat
|
||||
/// 3. Stop sensor processes with timeout
|
||||
/// 4. Stop rule lifecycle listener
|
||||
/// 5. Close MQ and DB connections
|
||||
pub async fn stop(&self) -> Result<()> {
|
||||
info!("Stopping Sensor Service");
|
||||
info!("Stopping Sensor Service - initiating graceful shutdown");
|
||||
|
||||
// Mark as not running
|
||||
*self.inner.running.write().await = false;
|
||||
|
||||
// Deregister sensor worker
|
||||
info!("Deregistering sensor worker...");
|
||||
// 1. Deregister sensor worker first to stop receiving new work
|
||||
info!("Marking sensor worker as inactive to stop receiving new work");
|
||||
if let Err(e) = self
|
||||
.inner
|
||||
.sensor_worker_registration
|
||||
@@ -194,25 +212,51 @@ impl SensorService {
|
||||
error!("Failed to deregister sensor worker: {}", e);
|
||||
}
|
||||
|
||||
// Stop rule lifecycle listener
|
||||
// 2. Stop heartbeat
|
||||
info!("Stopping heartbeat updates");
|
||||
*self.inner.heartbeat_running.write().await = false;
|
||||
|
||||
// Wait a bit for heartbeat loop to notice the flag
|
||||
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||
|
||||
// 3. Stop sensor processes with timeout
|
||||
let shutdown_timeout = self
|
||||
.inner
|
||||
.config
|
||||
.sensor
|
||||
.as_ref()
|
||||
.map(|s| s.shutdown_timeout)
|
||||
.unwrap_or(30);
|
||||
|
||||
info!(
|
||||
"Waiting up to {} seconds for sensor processes to stop",
|
||||
shutdown_timeout
|
||||
);
|
||||
|
||||
let sensor_manager = self.inner.sensor_manager.clone();
|
||||
let timeout_duration = Duration::from_secs(shutdown_timeout);
|
||||
match tokio::time::timeout(timeout_duration, sensor_manager.stop()).await {
|
||||
Ok(Ok(_)) => info!("All sensor processes stopped"),
|
||||
Ok(Err(e)) => error!("Error stopping sensor processes: {}", e),
|
||||
Err(_) => warn!(
|
||||
"Shutdown timeout reached ({} seconds) - some sensor processes may have been interrupted",
|
||||
shutdown_timeout
|
||||
),
|
||||
}
|
||||
|
||||
// 4. Stop rule lifecycle listener
|
||||
info!("Stopping rule lifecycle listener...");
|
||||
if let Err(e) = self.inner.rule_lifecycle_listener.stop().await {
|
||||
error!("Failed to stop rule lifecycle listener: {}", e);
|
||||
}
|
||||
|
||||
// Stop sensor manager
|
||||
info!("Stopping sensor manager...");
|
||||
if let Err(e) = self.inner.sensor_manager.stop().await {
|
||||
error!("Failed to stop sensor manager: {}", e);
|
||||
}
|
||||
|
||||
// Close message queue connection
|
||||
// 5. Close message queue connection
|
||||
info!("Closing message queue connection...");
|
||||
if let Err(e) = self.inner.mq.close().await {
|
||||
warn!("Error closing message queue: {}", e);
|
||||
}
|
||||
|
||||
// Close database connection
|
||||
// 6. Close database connection
|
||||
info!("Closing database connection...");
|
||||
self.inner.db.close().await;
|
||||
|
||||
@@ -221,11 +265,6 @@ impl SensorService {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Check if service is running
|
||||
pub async fn is_running(&self) -> bool {
|
||||
*self.inner.running.read().await
|
||||
}
|
||||
|
||||
/// Get database pool
|
||||
pub fn db(&self) -> &PgPool {
|
||||
&self.inner.db
|
||||
@@ -243,11 +282,6 @@ impl SensorService {
|
||||
|
||||
/// Get health status
|
||||
pub async fn health_check(&self) -> HealthStatus {
|
||||
// Check if service is running
|
||||
if !*self.inner.running.read().await {
|
||||
return HealthStatus::Unhealthy("Service not running".to_string());
|
||||
}
|
||||
|
||||
// Check database connection
|
||||
if let Err(e) = sqlx::query("SELECT 1").execute(&self.inner.db).await {
|
||||
return HealthStatus::Unhealthy(format!("Database connection failed: {}", e));
|
||||
|
||||
@@ -123,7 +123,7 @@ impl Runtime for LocalRuntime {
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::runtime::{ParameterDelivery, ParameterFormat};
|
||||
use crate::runtime::{OutputFormat, ParameterDelivery, ParameterFormat};
|
||||
use std::collections::HashMap;
|
||||
|
||||
#[tokio::test]
|
||||
|
||||
@@ -339,13 +339,15 @@ if __name__ == '__main__':
|
||||
None
|
||||
}
|
||||
OutputFormat::Json => {
|
||||
// Try to parse last line of stdout as JSON
|
||||
stdout_result
|
||||
.content
|
||||
.trim()
|
||||
.lines()
|
||||
.last()
|
||||
.and_then(|line| serde_json::from_str(line).ok())
|
||||
// Try to parse full stdout as JSON first (handles multi-line JSON),
|
||||
// then fall back to last line only (for scripts that log before output)
|
||||
let trimmed = stdout_result.content.trim();
|
||||
serde_json::from_str(trimmed).ok().or_else(|| {
|
||||
trimmed
|
||||
.lines()
|
||||
.last()
|
||||
.and_then(|line| serde_json::from_str(line).ok())
|
||||
})
|
||||
}
|
||||
OutputFormat::Yaml => {
|
||||
// Try to parse stdout as YAML
|
||||
|
||||
@@ -208,13 +208,15 @@ impl ShellRuntime {
|
||||
None
|
||||
}
|
||||
OutputFormat::Json => {
|
||||
// Try to parse last line of stdout as JSON
|
||||
stdout_result
|
||||
.content
|
||||
.trim()
|
||||
.lines()
|
||||
.last()
|
||||
.and_then(|line| serde_json::from_str(line).ok())
|
||||
// Try to parse full stdout as JSON first (handles multi-line JSON),
|
||||
// then fall back to last line only (for scripts that log before output)
|
||||
let trimmed = stdout_result.content.trim();
|
||||
serde_json::from_str(trimmed).ok().or_else(|| {
|
||||
trimmed
|
||||
.lines()
|
||||
.last()
|
||||
.and_then(|line| serde_json::from_str(line).ok())
|
||||
})
|
||||
}
|
||||
OutputFormat::Yaml => {
|
||||
// Try to parse stdout as YAML
|
||||
@@ -823,4 +825,97 @@ echo '{"id": 3, "name": "Charlie"}'
|
||||
assert_eq!(items[2]["id"], 3);
|
||||
assert_eq!(items[2]["name"], "Charlie");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_shell_runtime_multiline_json_output() {
|
||||
// Regression test: scripts that embed pretty-printed JSON (e.g., http_request.sh
|
||||
// embedding a multi-line response body in its "json" field) produce multi-line
|
||||
// stdout. The parser must handle this by trying to parse the full stdout as JSON
|
||||
// before falling back to last-line parsing.
|
||||
let runtime = ShellRuntime::new();
|
||||
|
||||
let context = ExecutionContext {
|
||||
execution_id: 7,
|
||||
action_ref: "test.multiline_json".to_string(),
|
||||
parameters: HashMap::new(),
|
||||
env: HashMap::new(),
|
||||
secrets: HashMap::new(),
|
||||
timeout: Some(10),
|
||||
working_dir: None,
|
||||
entry_point: "shell".to_string(),
|
||||
code: Some(
|
||||
r#"
|
||||
# Simulate http_request.sh output with embedded pretty-printed JSON
|
||||
printf '{"status_code":200,"body":"hello","json":{\n "args": {\n "hello": "world"\n },\n "url": "https://example.com"\n},"success":true}\n'
|
||||
"#
|
||||
.to_string(),
|
||||
),
|
||||
code_path: None,
|
||||
runtime_name: Some("shell".to_string()),
|
||||
max_stdout_bytes: 10 * 1024 * 1024,
|
||||
max_stderr_bytes: 10 * 1024 * 1024,
|
||||
parameter_delivery: attune_common::models::ParameterDelivery::default(),
|
||||
parameter_format: attune_common::models::ParameterFormat::default(),
|
||||
output_format: attune_common::models::OutputFormat::Json,
|
||||
};
|
||||
|
||||
let result = runtime.execute(context).await.unwrap();
|
||||
assert!(result.is_success());
|
||||
assert_eq!(result.exit_code, 0);
|
||||
|
||||
// Verify result was parsed (not stored as raw stdout)
|
||||
let parsed = result
|
||||
.result
|
||||
.expect("Multi-line JSON should be parsed successfully");
|
||||
assert_eq!(parsed["status_code"], 200);
|
||||
assert_eq!(parsed["success"], true);
|
||||
assert_eq!(parsed["json"]["args"]["hello"], "world");
|
||||
|
||||
// stdout should be empty when result is successfully parsed
|
||||
assert!(
|
||||
result.stdout.is_empty(),
|
||||
"stdout should be empty when result is parsed, got: {}",
|
||||
result.stdout
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_shell_runtime_json_with_log_prefix() {
|
||||
// Verify last-line fallback still works: scripts that log to stdout
|
||||
// before the final JSON line should still parse correctly.
|
||||
let runtime = ShellRuntime::new();
|
||||
|
||||
let context = ExecutionContext {
|
||||
execution_id: 8,
|
||||
action_ref: "test.json_with_logs".to_string(),
|
||||
parameters: HashMap::new(),
|
||||
env: HashMap::new(),
|
||||
secrets: HashMap::new(),
|
||||
timeout: Some(10),
|
||||
working_dir: None,
|
||||
entry_point: "shell".to_string(),
|
||||
code: Some(
|
||||
r#"
|
||||
echo "Starting action..."
|
||||
echo "Processing data..."
|
||||
echo '{"result": "success", "count": 42}'
|
||||
"#
|
||||
.to_string(),
|
||||
),
|
||||
code_path: None,
|
||||
runtime_name: Some("shell".to_string()),
|
||||
max_stdout_bytes: 10 * 1024 * 1024,
|
||||
max_stderr_bytes: 10 * 1024 * 1024,
|
||||
parameter_delivery: attune_common::models::ParameterDelivery::default(),
|
||||
parameter_format: attune_common::models::ParameterFormat::default(),
|
||||
output_format: attune_common::models::OutputFormat::Json,
|
||||
};
|
||||
|
||||
let result = runtime.execute(context).await.unwrap();
|
||||
assert!(result.is_success());
|
||||
|
||||
let parsed = result.result.expect("Last-line JSON should be parsed");
|
||||
assert_eq!(parsed["result"], "success");
|
||||
assert_eq!(parsed["count"], 42);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -19,6 +19,7 @@ use sqlx::PgPool;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use tokio::sync::RwLock;
|
||||
use tokio::task::JoinHandle;
|
||||
use tracing::{error, info, warn};
|
||||
|
||||
use crate::artifacts::ArtifactManager;
|
||||
@@ -51,6 +52,7 @@ pub struct WorkerService {
|
||||
mq_connection: Arc<Connection>,
|
||||
publisher: Arc<Publisher>,
|
||||
consumer: Option<Arc<Consumer>>,
|
||||
consumer_handle: Option<JoinHandle<()>>,
|
||||
worker_id: Option<i64>,
|
||||
}
|
||||
|
||||
@@ -266,6 +268,7 @@ impl WorkerService {
|
||||
mq_connection: Arc::new(mq_connection),
|
||||
publisher: Arc::new(publisher),
|
||||
consumer: None,
|
||||
consumer_handle: None,
|
||||
worker_id: None,
|
||||
})
|
||||
}
|
||||
@@ -305,25 +308,35 @@ impl WorkerService {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Stop the worker service
|
||||
/// Stop the worker service gracefully
|
||||
///
|
||||
/// Shutdown order (mirrors sensor service pattern):
|
||||
/// 1. Deregister worker (mark inactive to stop receiving new work)
|
||||
/// 2. Stop heartbeat
|
||||
/// 3. Wait for in-flight tasks with timeout
|
||||
/// 4. Close MQ connection
|
||||
/// 5. Close DB connection
|
||||
pub async fn stop(&mut self) -> Result<()> {
|
||||
info!("Stopping Worker Service - initiating graceful shutdown");
|
||||
|
||||
// Mark worker as inactive first to stop receiving new tasks
|
||||
// 1. Mark worker as inactive first to stop receiving new tasks
|
||||
// Use if-let instead of ? so shutdown continues even if DB call fails
|
||||
{
|
||||
let reg = self.registration.read().await;
|
||||
info!("Marking worker as inactive to stop receiving new tasks");
|
||||
reg.deregister().await?;
|
||||
if let Err(e) = reg.deregister().await {
|
||||
error!("Failed to deregister worker: {}", e);
|
||||
}
|
||||
}
|
||||
|
||||
// Stop heartbeat
|
||||
// 2. Stop heartbeat
|
||||
info!("Stopping heartbeat updates");
|
||||
self.heartbeat.stop().await;
|
||||
|
||||
// Wait a bit for heartbeat to stop
|
||||
// Wait a bit for heartbeat loop to notice the flag
|
||||
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||
|
||||
// Wait for in-flight tasks to complete (with timeout)
|
||||
// 3. Wait for in-flight tasks to complete (with timeout)
|
||||
let shutdown_timeout = self
|
||||
.config
|
||||
.worker
|
||||
@@ -342,6 +355,23 @@ impl WorkerService {
|
||||
Err(_) => warn!("Shutdown timeout reached - some tasks may have been interrupted"),
|
||||
}
|
||||
|
||||
// 4. Abort consumer task and close message queue connection
|
||||
if let Some(handle) = self.consumer_handle.take() {
|
||||
info!("Stopping consumer task...");
|
||||
handle.abort();
|
||||
// Wait briefly for the task to finish
|
||||
let _ = handle.await;
|
||||
}
|
||||
|
||||
info!("Closing message queue connection...");
|
||||
if let Err(e) = self.mq_connection.close().await {
|
||||
warn!("Error closing message queue: {}", e);
|
||||
}
|
||||
|
||||
// 5. Close database connection
|
||||
info!("Closing database connection...");
|
||||
self.db_pool.close().await;
|
||||
|
||||
info!("Worker Service stopped");
|
||||
|
||||
Ok(())
|
||||
@@ -364,6 +394,9 @@ impl WorkerService {
|
||||
}
|
||||
|
||||
/// Start consuming execution.scheduled messages
|
||||
///
|
||||
/// Spawns the consumer loop as a background task so that `start()` returns
|
||||
/// immediately, allowing the caller to set up signal handlers.
|
||||
async fn start_execution_consumer(&mut self) -> Result<()> {
|
||||
let worker_id = self
|
||||
.worker_id
|
||||
@@ -375,48 +408,63 @@ impl WorkerService {
|
||||
info!("Starting consumer for worker queue: {}", queue_name);
|
||||
|
||||
// Create consumer
|
||||
let consumer = Consumer::new(
|
||||
&self.mq_connection,
|
||||
ConsumerConfig {
|
||||
queue: queue_name.clone(),
|
||||
tag: format!("worker-{}", worker_id),
|
||||
prefetch_count: 10,
|
||||
auto_ack: false,
|
||||
exclusive: false,
|
||||
},
|
||||
)
|
||||
.await
|
||||
.map_err(|e| Error::Internal(format!("Failed to create consumer: {}", e)))?;
|
||||
|
||||
info!("Consumer started for queue: {}", queue_name);
|
||||
|
||||
info!("Message queue consumer initialized");
|
||||
|
||||
// Clone Arc references for the handler
|
||||
let executor = self.executor.clone();
|
||||
let publisher = self.publisher.clone();
|
||||
let db_pool = self.db_pool.clone();
|
||||
|
||||
// Consume messages with handler
|
||||
consumer
|
||||
.consume_with_handler(
|
||||
move |envelope: MessageEnvelope<ExecutionScheduledPayload>| {
|
||||
let executor = executor.clone();
|
||||
let publisher = publisher.clone();
|
||||
let db_pool = db_pool.clone();
|
||||
|
||||
async move {
|
||||
Self::handle_execution_scheduled(executor, publisher, db_pool, envelope)
|
||||
.await
|
||||
.map_err(|e| format!("Execution handler error: {}", e).into())
|
||||
}
|
||||
let consumer = Arc::new(
|
||||
Consumer::new(
|
||||
&self.mq_connection,
|
||||
ConsumerConfig {
|
||||
queue: queue_name.clone(),
|
||||
tag: format!("worker-{}", worker_id),
|
||||
prefetch_count: 10,
|
||||
auto_ack: false,
|
||||
exclusive: false,
|
||||
},
|
||||
)
|
||||
.await
|
||||
.map_err(|e| Error::Internal(format!("Failed to start consumer: {}", e)))?;
|
||||
.map_err(|e| Error::Internal(format!("Failed to create consumer: {}", e)))?,
|
||||
);
|
||||
|
||||
// Store consumer reference
|
||||
self.consumer = Some(Arc::new(consumer));
|
||||
info!("Consumer created for queue: {}", queue_name);
|
||||
|
||||
// Clone Arc references for the spawned task
|
||||
let executor = self.executor.clone();
|
||||
let publisher = self.publisher.clone();
|
||||
let db_pool = self.db_pool.clone();
|
||||
let consumer_for_task = consumer.clone();
|
||||
let queue_name_for_log = queue_name.clone();
|
||||
|
||||
// Spawn the consumer loop as a background task so start() can return
|
||||
let handle = tokio::spawn(async move {
|
||||
info!("Consumer loop started for queue '{}'", queue_name_for_log);
|
||||
let result = consumer_for_task
|
||||
.consume_with_handler(
|
||||
move |envelope: MessageEnvelope<ExecutionScheduledPayload>| {
|
||||
let executor = executor.clone();
|
||||
let publisher = publisher.clone();
|
||||
let db_pool = db_pool.clone();
|
||||
|
||||
async move {
|
||||
Self::handle_execution_scheduled(executor, publisher, db_pool, envelope)
|
||||
.await
|
||||
.map_err(|e| format!("Execution handler error: {}", e).into())
|
||||
}
|
||||
},
|
||||
)
|
||||
.await;
|
||||
|
||||
match result {
|
||||
Ok(()) => info!("Consumer loop for queue '{}' ended", queue_name_for_log),
|
||||
Err(e) => error!(
|
||||
"Consumer loop for queue '{}' failed: {}",
|
||||
queue_name_for_log, e
|
||||
),
|
||||
}
|
||||
});
|
||||
|
||||
// Store consumer reference and task handle
|
||||
self.consumer = Some(consumer);
|
||||
self.consumer_handle = Some(handle);
|
||||
|
||||
info!("Message queue consumer initialized");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user