migration reorg basically done
This commit is contained in:
@@ -75,6 +75,21 @@ async fn main() -> Result<()> {
|
||||
Ok(mq_connection) => {
|
||||
info!("Message queue connection established");
|
||||
|
||||
// Setup common message queue infrastructure (exchanges and DLX)
|
||||
let mq_setup_config = attune_common::mq::MessageQueueConfig::default();
|
||||
match mq_connection
|
||||
.setup_common_infrastructure(&mq_setup_config)
|
||||
.await
|
||||
{
|
||||
Ok(_) => info!("Common message queue infrastructure setup completed"),
|
||||
Err(e) => {
|
||||
warn!(
|
||||
"Failed to setup common MQ infrastructure (may already exist): {}",
|
||||
e
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// Create publisher
|
||||
match Publisher::new(
|
||||
&mq_connection,
|
||||
|
||||
@@ -12,7 +12,7 @@ mod helpers;
|
||||
use attune_common::{
|
||||
models::Pack,
|
||||
pack_registry::calculate_directory_checksum,
|
||||
repositories::{pack::PackRepository, List},
|
||||
repositories::{pack::PackRepository, FindById, List},
|
||||
};
|
||||
use helpers::{Result, TestContext};
|
||||
use serde_json::json;
|
||||
|
||||
@@ -135,19 +135,14 @@ pub async fn mock_login_failure(server: &MockServer) {
|
||||
|
||||
/// Mock a whoami response
|
||||
#[allow(dead_code)]
|
||||
pub async fn mock_whoami_success(server: &MockServer, username: &str, email: &str) {
|
||||
pub async fn mock_whoami_success(server: &MockServer, username: &str, display_name: &str) {
|
||||
Mock::given(method("GET"))
|
||||
.and(path("/auth/whoami"))
|
||||
.and(path("/auth/me"))
|
||||
.respond_with(ResponseTemplate::new(200).set_body_json(json!({
|
||||
"data": {
|
||||
"id": 1,
|
||||
"name": "Test User",
|
||||
"username": username,
|
||||
"email": email,
|
||||
"identity_type": "user",
|
||||
"enabled": true,
|
||||
"created": "2024-01-01T00:00:00Z",
|
||||
"updated": "2024-01-01T00:00:00Z"
|
||||
"login": username,
|
||||
"display_name": display_name
|
||||
}
|
||||
})))
|
||||
.mount(server)
|
||||
|
||||
@@ -75,7 +75,7 @@ async fn test_whoami_authenticated() {
|
||||
fixture.write_authenticated_config("valid_token", "refresh_token");
|
||||
|
||||
// Mock whoami endpoint
|
||||
mock_whoami_success(&fixture.mock_server, "testuser", "test@example.com").await;
|
||||
mock_whoami_success(&fixture.mock_server, "testuser", "Test User").await;
|
||||
|
||||
let mut cmd = Command::cargo_bin("attune").unwrap();
|
||||
cmd.env("XDG_CONFIG_HOME", fixture.config_dir_path())
|
||||
@@ -88,7 +88,7 @@ async fn test_whoami_authenticated() {
|
||||
cmd.assert()
|
||||
.success()
|
||||
.stdout(predicate::str::contains("testuser"))
|
||||
.stdout(predicate::str::contains("test@example.com"));
|
||||
.stdout(predicate::str::contains("Test User"));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
@@ -97,7 +97,7 @@ async fn test_whoami_unauthenticated() {
|
||||
fixture.write_default_config();
|
||||
|
||||
// Mock unauthorized response
|
||||
mock_unauthorized(&fixture.mock_server, "/auth/whoami").await;
|
||||
mock_unauthorized(&fixture.mock_server, "/auth/me").await;
|
||||
|
||||
let mut cmd = Command::cargo_bin("attune").unwrap();
|
||||
cmd.env("XDG_CONFIG_HOME", fixture.config_dir_path())
|
||||
@@ -185,7 +185,7 @@ async fn test_whoami_json_output() {
|
||||
fixture.write_authenticated_config("valid_token", "refresh_token");
|
||||
|
||||
// Mock whoami endpoint
|
||||
mock_whoami_success(&fixture.mock_server, "testuser", "test@example.com").await;
|
||||
mock_whoami_success(&fixture.mock_server, "testuser", "Test User").await;
|
||||
|
||||
let mut cmd = Command::cargo_bin("attune").unwrap();
|
||||
cmd.env("XDG_CONFIG_HOME", fixture.config_dir_path())
|
||||
@@ -198,7 +198,7 @@ async fn test_whoami_json_output() {
|
||||
|
||||
cmd.assert()
|
||||
.success()
|
||||
.stdout(predicate::str::contains(r#""username":"#))
|
||||
.stdout(predicate::str::contains(r#""login":"#))
|
||||
.stdout(predicate::str::contains("testuser"));
|
||||
}
|
||||
|
||||
@@ -208,7 +208,7 @@ async fn test_whoami_yaml_output() {
|
||||
fixture.write_authenticated_config("valid_token", "refresh_token");
|
||||
|
||||
// Mock whoami endpoint
|
||||
mock_whoami_success(&fixture.mock_server, "testuser", "test@example.com").await;
|
||||
mock_whoami_success(&fixture.mock_server, "testuser", "Test User").await;
|
||||
|
||||
let mut cmd = Command::cargo_bin("attune").unwrap();
|
||||
cmd.env("XDG_CONFIG_HOME", fixture.config_dir_path())
|
||||
@@ -221,6 +221,6 @@ async fn test_whoami_yaml_output() {
|
||||
|
||||
cmd.assert()
|
||||
.success()
|
||||
.stdout(predicate::str::contains("username:"))
|
||||
.stdout(predicate::str::contains("login:"))
|
||||
.stdout(predicate::str::contains("testuser"));
|
||||
}
|
||||
|
||||
@@ -30,15 +30,6 @@ impl Database {
|
||||
Self::validate_schema_name(&schema)?;
|
||||
|
||||
// Log schema configuration prominently
|
||||
if schema != "attune" {
|
||||
warn!(
|
||||
"Using non-standard schema: '{}'. Production should use 'attune'",
|
||||
schema
|
||||
);
|
||||
} else {
|
||||
info!("Using production schema: {}", schema);
|
||||
}
|
||||
|
||||
info!(
|
||||
"Connecting to database with max_connections={}, schema={}",
|
||||
config.max_connections, schema
|
||||
|
||||
@@ -314,9 +314,15 @@ impl Connection {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Setup complete infrastructure (exchanges, queues, bindings)
|
||||
pub async fn setup_infrastructure(&self, config: &MessageQueueConfig) -> MqResult<()> {
|
||||
info!("Setting up RabbitMQ infrastructure");
|
||||
/// Setup common infrastructure (exchanges, DLX) - safe to call from any service
|
||||
///
|
||||
/// This sets up the shared infrastructure that all services need:
|
||||
/// - All exchanges (events, executions, notifications)
|
||||
/// - Dead letter exchange (if enabled)
|
||||
///
|
||||
/// This is idempotent and can be called by multiple services safely.
|
||||
pub async fn setup_common_infrastructure(&self, config: &MessageQueueConfig) -> MqResult<()> {
|
||||
info!("Setting up common RabbitMQ infrastructure (exchanges and DLX)");
|
||||
|
||||
// Declare exchanges
|
||||
self.declare_exchange(&config.rabbitmq.exchanges.events)
|
||||
@@ -335,83 +341,63 @@ impl Connection {
|
||||
auto_delete: false,
|
||||
};
|
||||
self.declare_exchange(&dlx_config).await?;
|
||||
|
||||
// Declare dead letter queue (derive name from exchange)
|
||||
let dlq_name = format!("{}.queue", config.rabbitmq.dead_letter.exchange);
|
||||
let dlq_config = QueueConfig {
|
||||
name: dlq_name.clone(),
|
||||
durable: true,
|
||||
exclusive: false,
|
||||
auto_delete: false,
|
||||
};
|
||||
self.declare_queue(&dlq_config).await?;
|
||||
|
||||
// Bind DLQ to DLX
|
||||
self.bind_queue(&dlq_name, &config.rabbitmq.dead_letter.exchange, "#")
|
||||
.await?;
|
||||
}
|
||||
|
||||
// Declare queues with or without DLX
|
||||
let dlx_exchange = if config.rabbitmq.dead_letter.enabled {
|
||||
info!("Common RabbitMQ infrastructure setup complete");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Setup executor-specific queues and bindings
|
||||
pub async fn setup_executor_infrastructure(&self, config: &MessageQueueConfig) -> MqResult<()> {
|
||||
info!("Setting up Executor infrastructure");
|
||||
|
||||
let dlx = if config.rabbitmq.dead_letter.enabled {
|
||||
Some(config.rabbitmq.dead_letter.exchange.as_str())
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
if let Some(dlx) = dlx_exchange {
|
||||
self.declare_queue_with_dlx(&config.rabbitmq.queues.events, dlx)
|
||||
.await?;
|
||||
self.declare_queue_with_dlx(&config.rabbitmq.queues.executions, dlx)
|
||||
.await?;
|
||||
self.declare_queue_with_dlx(&config.rabbitmq.queues.enforcements, dlx)
|
||||
.await?;
|
||||
self.declare_queue_with_dlx(&config.rabbitmq.queues.execution_requests, dlx)
|
||||
.await?;
|
||||
self.declare_queue_with_dlx(&config.rabbitmq.queues.execution_status, dlx)
|
||||
.await?;
|
||||
self.declare_queue_with_dlx(&config.rabbitmq.queues.execution_completed, dlx)
|
||||
.await?;
|
||||
self.declare_queue_with_dlx(&config.rabbitmq.queues.inquiry_responses, dlx)
|
||||
.await?;
|
||||
self.declare_queue_with_dlx(&config.rabbitmq.queues.notifications, dlx)
|
||||
.await?;
|
||||
} else {
|
||||
self.declare_queue(&config.rabbitmq.queues.events).await?;
|
||||
self.declare_queue(&config.rabbitmq.queues.executions)
|
||||
.await?;
|
||||
self.declare_queue(&config.rabbitmq.queues.enforcements)
|
||||
.await?;
|
||||
self.declare_queue(&config.rabbitmq.queues.execution_requests)
|
||||
.await?;
|
||||
self.declare_queue(&config.rabbitmq.queues.execution_status)
|
||||
.await?;
|
||||
self.declare_queue(&config.rabbitmq.queues.execution_completed)
|
||||
.await?;
|
||||
self.declare_queue(&config.rabbitmq.queues.inquiry_responses)
|
||||
.await?;
|
||||
self.declare_queue(&config.rabbitmq.queues.notifications)
|
||||
.await?;
|
||||
}
|
||||
// Declare executor queues
|
||||
self.declare_queue_with_optional_dlx(&config.rabbitmq.queues.enforcements, dlx)
|
||||
.await?;
|
||||
self.declare_queue_with_optional_dlx(&config.rabbitmq.queues.execution_requests, dlx)
|
||||
.await?;
|
||||
self.declare_queue_with_optional_dlx(&config.rabbitmq.queues.execution_status, dlx)
|
||||
.await?;
|
||||
self.declare_queue_with_optional_dlx(&config.rabbitmq.queues.execution_completed, dlx)
|
||||
.await?;
|
||||
self.declare_queue_with_optional_dlx(&config.rabbitmq.queues.inquiry_responses, dlx)
|
||||
.await?;
|
||||
|
||||
// Bind queues to exchanges
|
||||
self.bind_queue(
|
||||
&config.rabbitmq.queues.events.name,
|
||||
&config.rabbitmq.exchanges.events.name,
|
||||
"#", // All events (topic exchange)
|
||||
)
|
||||
.await?;
|
||||
|
||||
// LEGACY BINDING DISABLED: This was causing all messages to go to the legacy queue
|
||||
// instead of being routed to the new specific queues (execution_requests, enforcements, etc.)
|
||||
// self.bind_queue(
|
||||
// &config.rabbitmq.queues.executions.name,
|
||||
// &config.rabbitmq.exchanges.executions.name,
|
||||
// "#", // All execution-related messages (topic exchange) - legacy, to be deprecated
|
||||
// )
|
||||
// .await?;
|
||||
|
||||
// Bind new executor-specific queues
|
||||
self.bind_queue(
|
||||
&config.rabbitmq.queues.enforcements.name,
|
||||
&config.rabbitmq.exchanges.executions.name,
|
||||
"enforcement.#", // Enforcement messages
|
||||
"enforcement.#",
|
||||
)
|
||||
.await?;
|
||||
|
||||
self.bind_queue(
|
||||
&config.rabbitmq.queues.execution_requests.name,
|
||||
&config.rabbitmq.exchanges.executions.name,
|
||||
"execution.requested", // Execution request messages
|
||||
"execution.requested",
|
||||
)
|
||||
.await?;
|
||||
|
||||
// Bind execution_status queue to status changed messages for ExecutionManager
|
||||
self.bind_queue(
|
||||
&config.rabbitmq.queues.execution_status.name,
|
||||
&config.rabbitmq.exchanges.executions.name,
|
||||
@@ -419,7 +405,6 @@ impl Connection {
|
||||
)
|
||||
.await?;
|
||||
|
||||
// Bind execution_completed queue to completed messages for CompletionListener
|
||||
self.bind_queue(
|
||||
&config.rabbitmq.queues.execution_completed.name,
|
||||
&config.rabbitmq.exchanges.executions.name,
|
||||
@@ -427,7 +412,6 @@ impl Connection {
|
||||
)
|
||||
.await?;
|
||||
|
||||
// Bind inquiry_responses queue to inquiry responded messages for InquiryHandler
|
||||
self.bind_queue(
|
||||
&config.rabbitmq.queues.inquiry_responses.name,
|
||||
&config.rabbitmq.exchanges.executions.name,
|
||||
@@ -435,16 +419,115 @@ impl Connection {
|
||||
)
|
||||
.await?;
|
||||
|
||||
info!("Executor infrastructure setup complete");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Setup worker-specific queue for a worker instance
|
||||
pub async fn setup_worker_infrastructure(
|
||||
&self,
|
||||
worker_id: i64,
|
||||
config: &MessageQueueConfig,
|
||||
) -> MqResult<()> {
|
||||
info!(
|
||||
"Setting up Worker infrastructure for worker ID {}",
|
||||
worker_id
|
||||
);
|
||||
|
||||
let queue_name = format!("worker.{}.executions", worker_id);
|
||||
let queue_config = QueueConfig {
|
||||
name: queue_name.clone(),
|
||||
durable: true,
|
||||
exclusive: false,
|
||||
auto_delete: false,
|
||||
};
|
||||
|
||||
let dlx = if config.rabbitmq.dead_letter.enabled {
|
||||
Some(config.rabbitmq.dead_letter.exchange.as_str())
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
self.declare_queue_with_optional_dlx(&queue_config, dlx)
|
||||
.await?;
|
||||
|
||||
// Bind to execution dispatch routing key
|
||||
self.bind_queue(
|
||||
&config.rabbitmq.queues.notifications.name,
|
||||
&config.rabbitmq.exchanges.notifications.name,
|
||||
"", // Fanout doesn't use routing key
|
||||
&queue_name,
|
||||
&config.rabbitmq.exchanges.executions.name,
|
||||
&format!("execution.dispatch.worker.{}", worker_id),
|
||||
)
|
||||
.await?;
|
||||
|
||||
info!("RabbitMQ infrastructure setup complete");
|
||||
info!(
|
||||
"Worker infrastructure setup complete for worker ID {}",
|
||||
worker_id
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Setup sensor-specific queues and bindings
|
||||
pub async fn setup_sensor_infrastructure(&self, config: &MessageQueueConfig) -> MqResult<()> {
|
||||
info!("Setting up Sensor infrastructure");
|
||||
|
||||
let dlx = if config.rabbitmq.dead_letter.enabled {
|
||||
Some(config.rabbitmq.dead_letter.exchange.as_str())
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
self.declare_queue_with_optional_dlx(&config.rabbitmq.queues.events, dlx)
|
||||
.await?;
|
||||
|
||||
// Bind to all events
|
||||
self.bind_queue(
|
||||
&config.rabbitmq.queues.events.name,
|
||||
&config.rabbitmq.exchanges.events.name,
|
||||
"#",
|
||||
)
|
||||
.await?;
|
||||
|
||||
info!("Sensor infrastructure setup complete");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Setup notifier-specific queues and bindings
|
||||
pub async fn setup_notifier_infrastructure(&self, config: &MessageQueueConfig) -> MqResult<()> {
|
||||
info!("Setting up Notifier infrastructure");
|
||||
|
||||
let dlx = if config.rabbitmq.dead_letter.enabled {
|
||||
Some(config.rabbitmq.dead_letter.exchange.as_str())
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
self.declare_queue_with_optional_dlx(&config.rabbitmq.queues.notifications, dlx)
|
||||
.await?;
|
||||
|
||||
// Bind to notifications exchange (fanout, no routing key)
|
||||
self.bind_queue(
|
||||
&config.rabbitmq.queues.notifications.name,
|
||||
&config.rabbitmq.exchanges.notifications.name,
|
||||
"",
|
||||
)
|
||||
.await?;
|
||||
|
||||
info!("Notifier infrastructure setup complete");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Helper to declare queue with optional DLX
|
||||
async fn declare_queue_with_optional_dlx(
|
||||
&self,
|
||||
config: &QueueConfig,
|
||||
dlx: Option<&str>,
|
||||
) -> MqResult<()> {
|
||||
if let Some(dlx_exchange) = dlx {
|
||||
self.declare_queue_with_dlx(config, dlx_exchange).await
|
||||
} else {
|
||||
self.declare_queue(config).await
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Connection pool for managing multiple RabbitMQ connections
|
||||
|
||||
@@ -240,7 +240,7 @@ impl Update for ActionRepository {
|
||||
|
||||
query.push(", updated = NOW() WHERE id = ");
|
||||
query.push_bind(id);
|
||||
query.push(" RETURNING id, ref, pack, pack_ref, label, description, entrypoint, runtime, param_schema, out_schema, is_workflow, workflow_def, created, updated");
|
||||
query.push(" RETURNING id, ref, pack, pack_ref, label, description, entrypoint, runtime, param_schema, out_schema, is_workflow, workflow_def, is_adhoc, created, updated");
|
||||
|
||||
let action = query
|
||||
.build_query_as::<Action>()
|
||||
|
||||
@@ -126,7 +126,11 @@ impl RuleLifecycleListener {
|
||||
for rule in rules {
|
||||
if rule.enabled {
|
||||
if let Err(e) = self
|
||||
.start_timer_from_params(rule.id, Some(rule.trigger_params))
|
||||
.start_timer_from_params(
|
||||
rule.id,
|
||||
"core.intervaltimer",
|
||||
Some(rule.trigger_params),
|
||||
)
|
||||
.await
|
||||
{
|
||||
error!("Failed to start timer for rule {}: {}", rule.id, e);
|
||||
@@ -232,7 +236,7 @@ impl RuleLifecycleListener {
|
||||
);
|
||||
|
||||
if enabled {
|
||||
self.start_timer_from_params(rule_id, trigger_params)
|
||||
self.start_timer_from_params(rule_id, &trigger_type, trigger_params)
|
||||
.await?;
|
||||
} else {
|
||||
info!("Rule {} is disabled, not starting timer", rule_id);
|
||||
@@ -241,6 +245,7 @@ impl RuleLifecycleListener {
|
||||
RuleLifecycleEvent::RuleEnabled {
|
||||
rule_id,
|
||||
rule_ref,
|
||||
trigger_type,
|
||||
trigger_params,
|
||||
..
|
||||
} => {
|
||||
@@ -249,7 +254,7 @@ impl RuleLifecycleListener {
|
||||
rule_id, rule_ref
|
||||
);
|
||||
|
||||
self.start_timer_from_params(rule_id, trigger_params)
|
||||
self.start_timer_from_params(rule_id, &trigger_type, trigger_params)
|
||||
.await?;
|
||||
}
|
||||
RuleLifecycleEvent::RuleDisabled {
|
||||
@@ -281,13 +286,21 @@ impl RuleLifecycleListener {
|
||||
async fn start_timer_from_params(
|
||||
&self,
|
||||
rule_id: i64,
|
||||
trigger_ref: &str,
|
||||
trigger_params: Option<JsonValue>,
|
||||
) -> Result<()> {
|
||||
let params = trigger_params.ok_or_else(|| {
|
||||
anyhow::anyhow!("Timer trigger requires trigger_params but none provided")
|
||||
})?;
|
||||
|
||||
let config: TimerConfig = serde_json::from_value(params)
|
||||
info!(
|
||||
"Parsing timer config for rule {}: trigger_ref='{}', params={}",
|
||||
rule_id,
|
||||
trigger_ref,
|
||||
serde_json::to_string(¶ms).unwrap_or_else(|_| "<invalid json>".to_string())
|
||||
);
|
||||
|
||||
let config = TimerConfig::from_trigger_params(trigger_ref, params)
|
||||
.context("Failed to parse trigger_params as TimerConfig")?;
|
||||
|
||||
info!(
|
||||
|
||||
@@ -1,13 +1,14 @@
|
||||
//! Shared types for timer sensor
|
||||
//!
|
||||
//! Defines timer configurations and common data structures.
|
||||
//! Updated: 2026-02-05 - Fixed TimerConfig parsing to use trigger_ref
|
||||
|
||||
use chrono::{DateTime, Utc};
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
/// Timer configuration for different timer types
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
#[serde(tag = "type", rename_all = "snake_case")]
|
||||
#[serde(untagged)]
|
||||
pub enum TimerConfig {
|
||||
/// Interval-based timer (fires every N seconds/minutes/hours)
|
||||
Interval {
|
||||
@@ -44,6 +45,75 @@ pub enum TimeUnit {
|
||||
}
|
||||
|
||||
impl TimerConfig {
|
||||
/// Deserialize TimerConfig from JSON value based on trigger_ref
|
||||
///
|
||||
/// Maps trigger_ref to the appropriate TimerConfig variant:
|
||||
/// - "core.intervaltimer" -> TimerConfig::Interval
|
||||
/// - "core.crontimer" -> TimerConfig::Cron
|
||||
/// - "core.datetimetimer" -> TimerConfig::DateTime
|
||||
pub fn from_trigger_params(
|
||||
trigger_ref: &str,
|
||||
params: serde_json::Value,
|
||||
) -> anyhow::Result<Self> {
|
||||
match trigger_ref {
|
||||
"core.intervaltimer" => {
|
||||
// Parse interval and unit from params
|
||||
let interval =
|
||||
params
|
||||
.get("interval")
|
||||
.and_then(|v| v.as_u64())
|
||||
.ok_or_else(|| {
|
||||
anyhow::anyhow!(
|
||||
"Missing or invalid 'interval' field in params: {}",
|
||||
serde_json::to_string(¶ms)
|
||||
.unwrap_or_else(|_| "<invalid>".to_string())
|
||||
)
|
||||
})?;
|
||||
|
||||
let unit = if let Some(unit_val) = params.get("unit") {
|
||||
serde_json::from_value(unit_val.clone())
|
||||
.map_err(|e| anyhow::anyhow!("Failed to parse 'unit' field: {}", e))?
|
||||
} else {
|
||||
TimeUnit::Seconds
|
||||
};
|
||||
|
||||
Ok(TimerConfig::Interval { interval, unit })
|
||||
}
|
||||
"core.crontimer" => {
|
||||
let expression = params
|
||||
.get("expression")
|
||||
.and_then(|v| v.as_str())
|
||||
.ok_or_else(|| {
|
||||
anyhow::anyhow!(
|
||||
"Missing or invalid 'expression' field in params: {}",
|
||||
serde_json::to_string(¶ms)
|
||||
.unwrap_or_else(|_| "<invalid>".to_string())
|
||||
)
|
||||
})?
|
||||
.to_string();
|
||||
|
||||
Ok(TimerConfig::Cron { expression })
|
||||
}
|
||||
"core.datetimetimer" => {
|
||||
let fire_at = params.get("fire_at").ok_or_else(|| {
|
||||
anyhow::anyhow!(
|
||||
"Missing 'fire_at' field in params: {}",
|
||||
serde_json::to_string(¶ms).unwrap_or_else(|_| "<invalid>".to_string())
|
||||
)
|
||||
})?;
|
||||
|
||||
let fire_at: DateTime<Utc> = serde_json::from_value(fire_at.clone())
|
||||
.map_err(|e| anyhow::anyhow!("Failed to parse 'fire_at' as DateTime: {}", e))?;
|
||||
|
||||
Ok(TimerConfig::DateTime { fire_at })
|
||||
}
|
||||
_ => Err(anyhow::anyhow!(
|
||||
"Unknown timer trigger type: {}",
|
||||
trigger_ref
|
||||
)),
|
||||
}
|
||||
}
|
||||
|
||||
/// Calculate total interval in seconds
|
||||
#[allow(dead_code)]
|
||||
pub fn interval_seconds(&self) -> Option<u64> {
|
||||
@@ -204,39 +274,57 @@ mod tests {
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_timer_config_deserialization_interval() {
|
||||
let json = r#"{
|
||||
"type": "interval",
|
||||
fn test_timer_config_from_trigger_params_interval() {
|
||||
let params = serde_json::json!({
|
||||
"interval": 30,
|
||||
"unit": "seconds"
|
||||
}"#;
|
||||
});
|
||||
|
||||
let config: TimerConfig = serde_json::from_str(json).unwrap();
|
||||
let config = TimerConfig::from_trigger_params("core.intervaltimer", params).unwrap();
|
||||
assert_eq!(config.interval_seconds(), Some(30));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_timer_config_deserialization_interval_default_unit() {
|
||||
let json = r#"{
|
||||
"type": "interval",
|
||||
fn test_timer_config_from_trigger_params_interval_default_unit() {
|
||||
let params = serde_json::json!({
|
||||
"interval": 60
|
||||
}"#;
|
||||
});
|
||||
|
||||
let config: TimerConfig = serde_json::from_str(json).unwrap();
|
||||
let config = TimerConfig::from_trigger_params("core.intervaltimer", params).unwrap();
|
||||
assert_eq!(config.interval_seconds(), Some(60));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_timer_config_deserialization_cron() {
|
||||
let json = r#"{
|
||||
"type": "cron",
|
||||
fn test_timer_config_from_trigger_params_cron() {
|
||||
let params = serde_json::json!({
|
||||
"expression": "0 0 * * *"
|
||||
}"#;
|
||||
});
|
||||
|
||||
let config: TimerConfig = serde_json::from_str(json).unwrap();
|
||||
let config = TimerConfig::from_trigger_params("core.crontimer", params).unwrap();
|
||||
assert_eq!(config.cron_expression(), Some("0 0 * * *"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_timer_config_from_trigger_params_datetime() {
|
||||
let fire_at = chrono::Utc::now();
|
||||
let params = serde_json::json!({
|
||||
"fire_at": fire_at
|
||||
});
|
||||
|
||||
let config = TimerConfig::from_trigger_params("core.datetimetimer", params).unwrap();
|
||||
assert_eq!(config.fire_time(), Some(fire_at));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_timer_config_from_trigger_params_unknown_trigger() {
|
||||
let params = serde_json::json!({
|
||||
"interval": 30
|
||||
});
|
||||
|
||||
let result = TimerConfig::from_trigger_params("unknown.trigger", params);
|
||||
assert!(result.is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_rule_lifecycle_event_rule_id() {
|
||||
let event = RuleLifecycleEvent::RuleCreated {
|
||||
|
||||
@@ -276,7 +276,7 @@ impl ExecutionScheduler {
|
||||
MessageEnvelope::new(MessageType::ExecutionRequested, payload).with_source("executor");
|
||||
|
||||
// Publish to worker-specific queue with routing key
|
||||
let routing_key = format!("worker.{}", worker_id);
|
||||
let routing_key = format!("execution.dispatch.worker.{}", worker_id);
|
||||
let exchange = "attune.executions";
|
||||
|
||||
publisher
|
||||
|
||||
@@ -88,13 +88,27 @@ impl ExecutorService {
|
||||
let mq_connection = Connection::connect(mq_url).await?;
|
||||
info!("Message queue connection established");
|
||||
|
||||
// Setup message queue infrastructure (exchanges, queues, bindings)
|
||||
// Setup common message queue infrastructure (exchanges and DLX)
|
||||
let mq_config = MessageQueueConfig::default();
|
||||
match mq_connection.setup_infrastructure(&mq_config).await {
|
||||
Ok(_) => info!("Message queue infrastructure setup completed"),
|
||||
match mq_connection.setup_common_infrastructure(&mq_config).await {
|
||||
Ok(_) => info!("Common message queue infrastructure setup completed"),
|
||||
Err(e) => {
|
||||
warn!(
|
||||
"Failed to setup MQ infrastructure (may already exist): {}",
|
||||
"Failed to setup common MQ infrastructure (may already exist): {}",
|
||||
e
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// Setup executor-specific queues and bindings
|
||||
match mq_connection
|
||||
.setup_executor_infrastructure(&mq_config)
|
||||
.await
|
||||
{
|
||||
Ok(_) => info!("Executor message queue infrastructure setup completed"),
|
||||
Err(e) => {
|
||||
warn!(
|
||||
"Failed to setup executor MQ infrastructure (may already exist): {}",
|
||||
e
|
||||
);
|
||||
}
|
||||
|
||||
@@ -52,6 +52,37 @@ impl SensorService {
|
||||
let mq = MessageQueue::connect(&mq_config.url).await?;
|
||||
info!("Message queue connection established");
|
||||
|
||||
// Setup common message queue infrastructure (exchanges and DLX)
|
||||
let mq_setup_config = attune_common::mq::MessageQueueConfig::default();
|
||||
match mq
|
||||
.get_connection()
|
||||
.setup_common_infrastructure(&mq_setup_config)
|
||||
.await
|
||||
{
|
||||
Ok(_) => info!("Common message queue infrastructure setup completed"),
|
||||
Err(e) => {
|
||||
warn!(
|
||||
"Failed to setup common MQ infrastructure (may already exist): {}",
|
||||
e
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// Setup sensor-specific queues and bindings
|
||||
match mq
|
||||
.get_connection()
|
||||
.setup_sensor_infrastructure(&mq_setup_config)
|
||||
.await
|
||||
{
|
||||
Ok(_) => info!("Sensor message queue infrastructure setup completed"),
|
||||
Err(e) => {
|
||||
warn!(
|
||||
"Failed to setup sensor MQ infrastructure (may already exist): {}",
|
||||
e
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// Create service components
|
||||
info!("Creating service components...");
|
||||
|
||||
|
||||
@@ -77,13 +77,13 @@ impl WorkerService {
|
||||
.map_err(|e| Error::Internal(format!("Failed to connect to message queue: {}", e)))?;
|
||||
info!("Message queue connection established");
|
||||
|
||||
// Setup message queue infrastructure (exchanges, queues, bindings)
|
||||
// Setup common message queue infrastructure (exchanges and DLX)
|
||||
let mq_config = MqConfig::default();
|
||||
match mq_connection.setup_infrastructure(&mq_config).await {
|
||||
Ok(_) => info!("Message queue infrastructure setup completed"),
|
||||
match mq_connection.setup_common_infrastructure(&mq_config).await {
|
||||
Ok(_) => info!("Common message queue infrastructure setup completed"),
|
||||
Err(e) => {
|
||||
warn!(
|
||||
"Failed to setup MQ infrastructure (may already exist): {}",
|
||||
"Failed to setup common MQ infrastructure (may already exist): {}",
|
||||
e
|
||||
);
|
||||
}
|
||||
@@ -278,6 +278,16 @@ impl WorkerService {
|
||||
|
||||
info!("Worker registered with ID: {}", worker_id);
|
||||
|
||||
// Setup worker-specific message queue infrastructure
|
||||
let mq_config = MqConfig::default();
|
||||
self.mq_connection
|
||||
.setup_worker_infrastructure(worker_id, &mq_config)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
Error::Internal(format!("Failed to setup worker MQ infrastructure: {}", e))
|
||||
})?;
|
||||
info!("Worker-specific message queue infrastructure setup completed");
|
||||
|
||||
// Start heartbeat
|
||||
self.heartbeat.start().await?;
|
||||
|
||||
@@ -316,40 +326,10 @@ impl WorkerService {
|
||||
.worker_id
|
||||
.ok_or_else(|| Error::Internal("Worker not registered".to_string()))?;
|
||||
|
||||
// Create queue name for this worker
|
||||
// Queue name for this worker (already created in setup_worker_infrastructure)
|
||||
let queue_name = format!("worker.{}.executions", worker_id);
|
||||
|
||||
info!("Creating worker-specific queue: {}", queue_name);
|
||||
|
||||
// Create the worker-specific queue
|
||||
let worker_queue = QueueConfig {
|
||||
name: queue_name.clone(),
|
||||
durable: false, // Worker queues are temporary
|
||||
exclusive: false,
|
||||
auto_delete: true, // Delete when worker disconnects
|
||||
};
|
||||
|
||||
self.mq_connection
|
||||
.declare_queue(&worker_queue)
|
||||
.await
|
||||
.map_err(|e| Error::Internal(format!("Failed to declare queue: {}", e)))?;
|
||||
|
||||
info!("Worker queue created: {}", queue_name);
|
||||
|
||||
// Bind the queue to the executions exchange with worker-specific routing key
|
||||
self.mq_connection
|
||||
.bind_queue(
|
||||
&queue_name,
|
||||
"attune.executions",
|
||||
&format!("worker.{}", worker_id),
|
||||
)
|
||||
.await
|
||||
.map_err(|e| Error::Internal(format!("Failed to bind queue: {}", e)))?;
|
||||
|
||||
info!(
|
||||
"Queue bound to exchange with routing key 'worker.{}'",
|
||||
worker_id
|
||||
);
|
||||
info!("Starting consumer for worker queue: {}", queue_name);
|
||||
|
||||
// Create consumer
|
||||
let consumer = Consumer::new(
|
||||
|
||||
Reference in New Issue
Block a user