diff --git a/Makefile b/Makefile index e273ff6..008edc9 100644 --- a/Makefile +++ b/Makefile @@ -167,13 +167,13 @@ db-reset: db-drop db-create db-migrate # Test database operations db-test-create: - createdb attune_test || true + psql postgresql://postgres:postgres@localhost:5432 -c "CREATE DATABASE attune_test" db-test-migrate: DATABASE_URL=postgresql://postgres:postgres@localhost:5432/attune_test sqlx migrate run db-test-drop: - dropdb attune_test || true + psql postgresql://postgres:postgres@localhost:5432 -c "DROP DATABASE attune_test" db-test-reset: db-test-drop db-test-create db-test-migrate @echo "Test database reset complete" diff --git a/config.test.yaml b/config.test.yaml index 431f5de..5220699 100644 --- a/config.test.yaml +++ b/config.test.yaml @@ -5,7 +5,7 @@ environment: test # Test database (uses separate database to avoid conflicts) database: - url: postgresql://postgres:postgres@localhost:5432/attune_test + url: postgresql://attune:attune@localhost:5432/attune_test max_connections: 10 min_connections: 2 connect_timeout: 10 diff --git a/crates/api/src/main.rs b/crates/api/src/main.rs index a0c312e..0816648 100644 --- a/crates/api/src/main.rs +++ b/crates/api/src/main.rs @@ -75,6 +75,21 @@ async fn main() -> Result<()> { Ok(mq_connection) => { info!("Message queue connection established"); + // Setup common message queue infrastructure (exchanges and DLX) + let mq_setup_config = attune_common::mq::MessageQueueConfig::default(); + match mq_connection + .setup_common_infrastructure(&mq_setup_config) + .await + { + Ok(_) => info!("Common message queue infrastructure setup completed"), + Err(e) => { + warn!( + "Failed to setup common MQ infrastructure (may already exist): {}", + e + ); + } + } + // Create publisher match Publisher::new( &mq_connection, diff --git a/crates/api/tests/pack_registry_tests.rs b/crates/api/tests/pack_registry_tests.rs index b9bfea0..93144d8 100644 --- a/crates/api/tests/pack_registry_tests.rs +++ b/crates/api/tests/pack_registry_tests.rs @@ -12,7 +12,7 @@ mod helpers; use attune_common::{ models::Pack, pack_registry::calculate_directory_checksum, - repositories::{pack::PackRepository, List}, + repositories::{pack::PackRepository, FindById, List}, }; use helpers::{Result, TestContext}; use serde_json::json; diff --git a/crates/cli/tests/common/mod.rs b/crates/cli/tests/common/mod.rs index d19f6aa..31546d4 100644 --- a/crates/cli/tests/common/mod.rs +++ b/crates/cli/tests/common/mod.rs @@ -135,19 +135,14 @@ pub async fn mock_login_failure(server: &MockServer) { /// Mock a whoami response #[allow(dead_code)] -pub async fn mock_whoami_success(server: &MockServer, username: &str, email: &str) { +pub async fn mock_whoami_success(server: &MockServer, username: &str, display_name: &str) { Mock::given(method("GET")) - .and(path("/auth/whoami")) + .and(path("/auth/me")) .respond_with(ResponseTemplate::new(200).set_body_json(json!({ "data": { "id": 1, - "name": "Test User", - "username": username, - "email": email, - "identity_type": "user", - "enabled": true, - "created": "2024-01-01T00:00:00Z", - "updated": "2024-01-01T00:00:00Z" + "login": username, + "display_name": display_name } }))) .mount(server) diff --git a/crates/cli/tests/test_auth.rs b/crates/cli/tests/test_auth.rs index cd63f10..99e9c6f 100644 --- a/crates/cli/tests/test_auth.rs +++ b/crates/cli/tests/test_auth.rs @@ -75,7 +75,7 @@ async fn test_whoami_authenticated() { fixture.write_authenticated_config("valid_token", "refresh_token"); // Mock whoami endpoint - mock_whoami_success(&fixture.mock_server, "testuser", "test@example.com").await; + mock_whoami_success(&fixture.mock_server, "testuser", "Test User").await; let mut cmd = Command::cargo_bin("attune").unwrap(); cmd.env("XDG_CONFIG_HOME", fixture.config_dir_path()) @@ -88,7 +88,7 @@ async fn test_whoami_authenticated() { cmd.assert() .success() .stdout(predicate::str::contains("testuser")) - .stdout(predicate::str::contains("test@example.com")); + .stdout(predicate::str::contains("Test User")); } #[tokio::test] @@ -97,7 +97,7 @@ async fn test_whoami_unauthenticated() { fixture.write_default_config(); // Mock unauthorized response - mock_unauthorized(&fixture.mock_server, "/auth/whoami").await; + mock_unauthorized(&fixture.mock_server, "/auth/me").await; let mut cmd = Command::cargo_bin("attune").unwrap(); cmd.env("XDG_CONFIG_HOME", fixture.config_dir_path()) @@ -185,7 +185,7 @@ async fn test_whoami_json_output() { fixture.write_authenticated_config("valid_token", "refresh_token"); // Mock whoami endpoint - mock_whoami_success(&fixture.mock_server, "testuser", "test@example.com").await; + mock_whoami_success(&fixture.mock_server, "testuser", "Test User").await; let mut cmd = Command::cargo_bin("attune").unwrap(); cmd.env("XDG_CONFIG_HOME", fixture.config_dir_path()) @@ -198,7 +198,7 @@ async fn test_whoami_json_output() { cmd.assert() .success() - .stdout(predicate::str::contains(r#""username":"#)) + .stdout(predicate::str::contains(r#""login":"#)) .stdout(predicate::str::contains("testuser")); } @@ -208,7 +208,7 @@ async fn test_whoami_yaml_output() { fixture.write_authenticated_config("valid_token", "refresh_token"); // Mock whoami endpoint - mock_whoami_success(&fixture.mock_server, "testuser", "test@example.com").await; + mock_whoami_success(&fixture.mock_server, "testuser", "Test User").await; let mut cmd = Command::cargo_bin("attune").unwrap(); cmd.env("XDG_CONFIG_HOME", fixture.config_dir_path()) @@ -221,6 +221,6 @@ async fn test_whoami_yaml_output() { cmd.assert() .success() - .stdout(predicate::str::contains("username:")) + .stdout(predicate::str::contains("login:")) .stdout(predicate::str::contains("testuser")); } diff --git a/crates/common/src/db.rs b/crates/common/src/db.rs index 7ad693b..ad8ffee 100644 --- a/crates/common/src/db.rs +++ b/crates/common/src/db.rs @@ -30,15 +30,6 @@ impl Database { Self::validate_schema_name(&schema)?; // Log schema configuration prominently - if schema != "attune" { - warn!( - "Using non-standard schema: '{}'. Production should use 'attune'", - schema - ); - } else { - info!("Using production schema: {}", schema); - } - info!( "Connecting to database with max_connections={}, schema={}", config.max_connections, schema diff --git a/crates/common/src/mq/connection.rs b/crates/common/src/mq/connection.rs index 0511ba3..dd7f7b2 100644 --- a/crates/common/src/mq/connection.rs +++ b/crates/common/src/mq/connection.rs @@ -314,9 +314,15 @@ impl Connection { Ok(()) } - /// Setup complete infrastructure (exchanges, queues, bindings) - pub async fn setup_infrastructure(&self, config: &MessageQueueConfig) -> MqResult<()> { - info!("Setting up RabbitMQ infrastructure"); + /// Setup common infrastructure (exchanges, DLX) - safe to call from any service + /// + /// This sets up the shared infrastructure that all services need: + /// - All exchanges (events, executions, notifications) + /// - Dead letter exchange (if enabled) + /// + /// This is idempotent and can be called by multiple services safely. + pub async fn setup_common_infrastructure(&self, config: &MessageQueueConfig) -> MqResult<()> { + info!("Setting up common RabbitMQ infrastructure (exchanges and DLX)"); // Declare exchanges self.declare_exchange(&config.rabbitmq.exchanges.events) @@ -335,83 +341,63 @@ impl Connection { auto_delete: false, }; self.declare_exchange(&dlx_config).await?; + + // Declare dead letter queue (derive name from exchange) + let dlq_name = format!("{}.queue", config.rabbitmq.dead_letter.exchange); + let dlq_config = QueueConfig { + name: dlq_name.clone(), + durable: true, + exclusive: false, + auto_delete: false, + }; + self.declare_queue(&dlq_config).await?; + + // Bind DLQ to DLX + self.bind_queue(&dlq_name, &config.rabbitmq.dead_letter.exchange, "#") + .await?; } - // Declare queues with or without DLX - let dlx_exchange = if config.rabbitmq.dead_letter.enabled { + info!("Common RabbitMQ infrastructure setup complete"); + Ok(()) + } + + /// Setup executor-specific queues and bindings + pub async fn setup_executor_infrastructure(&self, config: &MessageQueueConfig) -> MqResult<()> { + info!("Setting up Executor infrastructure"); + + let dlx = if config.rabbitmq.dead_letter.enabled { Some(config.rabbitmq.dead_letter.exchange.as_str()) } else { None }; - if let Some(dlx) = dlx_exchange { - self.declare_queue_with_dlx(&config.rabbitmq.queues.events, dlx) - .await?; - self.declare_queue_with_dlx(&config.rabbitmq.queues.executions, dlx) - .await?; - self.declare_queue_with_dlx(&config.rabbitmq.queues.enforcements, dlx) - .await?; - self.declare_queue_with_dlx(&config.rabbitmq.queues.execution_requests, dlx) - .await?; - self.declare_queue_with_dlx(&config.rabbitmq.queues.execution_status, dlx) - .await?; - self.declare_queue_with_dlx(&config.rabbitmq.queues.execution_completed, dlx) - .await?; - self.declare_queue_with_dlx(&config.rabbitmq.queues.inquiry_responses, dlx) - .await?; - self.declare_queue_with_dlx(&config.rabbitmq.queues.notifications, dlx) - .await?; - } else { - self.declare_queue(&config.rabbitmq.queues.events).await?; - self.declare_queue(&config.rabbitmq.queues.executions) - .await?; - self.declare_queue(&config.rabbitmq.queues.enforcements) - .await?; - self.declare_queue(&config.rabbitmq.queues.execution_requests) - .await?; - self.declare_queue(&config.rabbitmq.queues.execution_status) - .await?; - self.declare_queue(&config.rabbitmq.queues.execution_completed) - .await?; - self.declare_queue(&config.rabbitmq.queues.inquiry_responses) - .await?; - self.declare_queue(&config.rabbitmq.queues.notifications) - .await?; - } + // Declare executor queues + self.declare_queue_with_optional_dlx(&config.rabbitmq.queues.enforcements, dlx) + .await?; + self.declare_queue_with_optional_dlx(&config.rabbitmq.queues.execution_requests, dlx) + .await?; + self.declare_queue_with_optional_dlx(&config.rabbitmq.queues.execution_status, dlx) + .await?; + self.declare_queue_with_optional_dlx(&config.rabbitmq.queues.execution_completed, dlx) + .await?; + self.declare_queue_with_optional_dlx(&config.rabbitmq.queues.inquiry_responses, dlx) + .await?; // Bind queues to exchanges - self.bind_queue( - &config.rabbitmq.queues.events.name, - &config.rabbitmq.exchanges.events.name, - "#", // All events (topic exchange) - ) - .await?; - - // LEGACY BINDING DISABLED: This was causing all messages to go to the legacy queue - // instead of being routed to the new specific queues (execution_requests, enforcements, etc.) - // self.bind_queue( - // &config.rabbitmq.queues.executions.name, - // &config.rabbitmq.exchanges.executions.name, - // "#", // All execution-related messages (topic exchange) - legacy, to be deprecated - // ) - // .await?; - - // Bind new executor-specific queues self.bind_queue( &config.rabbitmq.queues.enforcements.name, &config.rabbitmq.exchanges.executions.name, - "enforcement.#", // Enforcement messages + "enforcement.#", ) .await?; self.bind_queue( &config.rabbitmq.queues.execution_requests.name, &config.rabbitmq.exchanges.executions.name, - "execution.requested", // Execution request messages + "execution.requested", ) .await?; - // Bind execution_status queue to status changed messages for ExecutionManager self.bind_queue( &config.rabbitmq.queues.execution_status.name, &config.rabbitmq.exchanges.executions.name, @@ -419,7 +405,6 @@ impl Connection { ) .await?; - // Bind execution_completed queue to completed messages for CompletionListener self.bind_queue( &config.rabbitmq.queues.execution_completed.name, &config.rabbitmq.exchanges.executions.name, @@ -427,7 +412,6 @@ impl Connection { ) .await?; - // Bind inquiry_responses queue to inquiry responded messages for InquiryHandler self.bind_queue( &config.rabbitmq.queues.inquiry_responses.name, &config.rabbitmq.exchanges.executions.name, @@ -435,16 +419,115 @@ impl Connection { ) .await?; + info!("Executor infrastructure setup complete"); + Ok(()) + } + + /// Setup worker-specific queue for a worker instance + pub async fn setup_worker_infrastructure( + &self, + worker_id: i64, + config: &MessageQueueConfig, + ) -> MqResult<()> { + info!( + "Setting up Worker infrastructure for worker ID {}", + worker_id + ); + + let queue_name = format!("worker.{}.executions", worker_id); + let queue_config = QueueConfig { + name: queue_name.clone(), + durable: true, + exclusive: false, + auto_delete: false, + }; + + let dlx = if config.rabbitmq.dead_letter.enabled { + Some(config.rabbitmq.dead_letter.exchange.as_str()) + } else { + None + }; + + self.declare_queue_with_optional_dlx(&queue_config, dlx) + .await?; + + // Bind to execution dispatch routing key self.bind_queue( - &config.rabbitmq.queues.notifications.name, - &config.rabbitmq.exchanges.notifications.name, - "", // Fanout doesn't use routing key + &queue_name, + &config.rabbitmq.exchanges.executions.name, + &format!("execution.dispatch.worker.{}", worker_id), ) .await?; - info!("RabbitMQ infrastructure setup complete"); + info!( + "Worker infrastructure setup complete for worker ID {}", + worker_id + ); Ok(()) } + + /// Setup sensor-specific queues and bindings + pub async fn setup_sensor_infrastructure(&self, config: &MessageQueueConfig) -> MqResult<()> { + info!("Setting up Sensor infrastructure"); + + let dlx = if config.rabbitmq.dead_letter.enabled { + Some(config.rabbitmq.dead_letter.exchange.as_str()) + } else { + None + }; + + self.declare_queue_with_optional_dlx(&config.rabbitmq.queues.events, dlx) + .await?; + + // Bind to all events + self.bind_queue( + &config.rabbitmq.queues.events.name, + &config.rabbitmq.exchanges.events.name, + "#", + ) + .await?; + + info!("Sensor infrastructure setup complete"); + Ok(()) + } + + /// Setup notifier-specific queues and bindings + pub async fn setup_notifier_infrastructure(&self, config: &MessageQueueConfig) -> MqResult<()> { + info!("Setting up Notifier infrastructure"); + + let dlx = if config.rabbitmq.dead_letter.enabled { + Some(config.rabbitmq.dead_letter.exchange.as_str()) + } else { + None + }; + + self.declare_queue_with_optional_dlx(&config.rabbitmq.queues.notifications, dlx) + .await?; + + // Bind to notifications exchange (fanout, no routing key) + self.bind_queue( + &config.rabbitmq.queues.notifications.name, + &config.rabbitmq.exchanges.notifications.name, + "", + ) + .await?; + + info!("Notifier infrastructure setup complete"); + Ok(()) + } + + /// Helper to declare queue with optional DLX + async fn declare_queue_with_optional_dlx( + &self, + config: &QueueConfig, + dlx: Option<&str>, + ) -> MqResult<()> { + if let Some(dlx_exchange) = dlx { + self.declare_queue_with_dlx(config, dlx_exchange).await + } else { + self.declare_queue(config).await + } + } } /// Connection pool for managing multiple RabbitMQ connections diff --git a/crates/common/src/repositories/action.rs b/crates/common/src/repositories/action.rs index 6f47038..a5c6704 100644 --- a/crates/common/src/repositories/action.rs +++ b/crates/common/src/repositories/action.rs @@ -240,7 +240,7 @@ impl Update for ActionRepository { query.push(", updated = NOW() WHERE id = "); query.push_bind(id); - query.push(" RETURNING id, ref, pack, pack_ref, label, description, entrypoint, runtime, param_schema, out_schema, is_workflow, workflow_def, created, updated"); + query.push(" RETURNING id, ref, pack, pack_ref, label, description, entrypoint, runtime, param_schema, out_schema, is_workflow, workflow_def, is_adhoc, created, updated"); let action = query .build_query_as::() diff --git a/crates/core-timer-sensor/src/rule_listener.rs b/crates/core-timer-sensor/src/rule_listener.rs index 271fd7a..02c01b7 100644 --- a/crates/core-timer-sensor/src/rule_listener.rs +++ b/crates/core-timer-sensor/src/rule_listener.rs @@ -126,7 +126,11 @@ impl RuleLifecycleListener { for rule in rules { if rule.enabled { if let Err(e) = self - .start_timer_from_params(rule.id, Some(rule.trigger_params)) + .start_timer_from_params( + rule.id, + "core.intervaltimer", + Some(rule.trigger_params), + ) .await { error!("Failed to start timer for rule {}: {}", rule.id, e); @@ -232,7 +236,7 @@ impl RuleLifecycleListener { ); if enabled { - self.start_timer_from_params(rule_id, trigger_params) + self.start_timer_from_params(rule_id, &trigger_type, trigger_params) .await?; } else { info!("Rule {} is disabled, not starting timer", rule_id); @@ -241,6 +245,7 @@ impl RuleLifecycleListener { RuleLifecycleEvent::RuleEnabled { rule_id, rule_ref, + trigger_type, trigger_params, .. } => { @@ -249,7 +254,7 @@ impl RuleLifecycleListener { rule_id, rule_ref ); - self.start_timer_from_params(rule_id, trigger_params) + self.start_timer_from_params(rule_id, &trigger_type, trigger_params) .await?; } RuleLifecycleEvent::RuleDisabled { @@ -281,13 +286,21 @@ impl RuleLifecycleListener { async fn start_timer_from_params( &self, rule_id: i64, + trigger_ref: &str, trigger_params: Option, ) -> Result<()> { let params = trigger_params.ok_or_else(|| { anyhow::anyhow!("Timer trigger requires trigger_params but none provided") })?; - let config: TimerConfig = serde_json::from_value(params) + info!( + "Parsing timer config for rule {}: trigger_ref='{}', params={}", + rule_id, + trigger_ref, + serde_json::to_string(¶ms).unwrap_or_else(|_| "".to_string()) + ); + + let config = TimerConfig::from_trigger_params(trigger_ref, params) .context("Failed to parse trigger_params as TimerConfig")?; info!( diff --git a/crates/core-timer-sensor/src/types.rs b/crates/core-timer-sensor/src/types.rs index 4f07406..a27bc58 100644 --- a/crates/core-timer-sensor/src/types.rs +++ b/crates/core-timer-sensor/src/types.rs @@ -1,13 +1,14 @@ //! Shared types for timer sensor //! //! Defines timer configurations and common data structures. +//! Updated: 2026-02-05 - Fixed TimerConfig parsing to use trigger_ref use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; /// Timer configuration for different timer types #[derive(Debug, Clone, Serialize, Deserialize)] -#[serde(tag = "type", rename_all = "snake_case")] +#[serde(untagged)] pub enum TimerConfig { /// Interval-based timer (fires every N seconds/minutes/hours) Interval { @@ -44,6 +45,75 @@ pub enum TimeUnit { } impl TimerConfig { + /// Deserialize TimerConfig from JSON value based on trigger_ref + /// + /// Maps trigger_ref to the appropriate TimerConfig variant: + /// - "core.intervaltimer" -> TimerConfig::Interval + /// - "core.crontimer" -> TimerConfig::Cron + /// - "core.datetimetimer" -> TimerConfig::DateTime + pub fn from_trigger_params( + trigger_ref: &str, + params: serde_json::Value, + ) -> anyhow::Result { + match trigger_ref { + "core.intervaltimer" => { + // Parse interval and unit from params + let interval = + params + .get("interval") + .and_then(|v| v.as_u64()) + .ok_or_else(|| { + anyhow::anyhow!( + "Missing or invalid 'interval' field in params: {}", + serde_json::to_string(¶ms) + .unwrap_or_else(|_| "".to_string()) + ) + })?; + + let unit = if let Some(unit_val) = params.get("unit") { + serde_json::from_value(unit_val.clone()) + .map_err(|e| anyhow::anyhow!("Failed to parse 'unit' field: {}", e))? + } else { + TimeUnit::Seconds + }; + + Ok(TimerConfig::Interval { interval, unit }) + } + "core.crontimer" => { + let expression = params + .get("expression") + .and_then(|v| v.as_str()) + .ok_or_else(|| { + anyhow::anyhow!( + "Missing or invalid 'expression' field in params: {}", + serde_json::to_string(¶ms) + .unwrap_or_else(|_| "".to_string()) + ) + })? + .to_string(); + + Ok(TimerConfig::Cron { expression }) + } + "core.datetimetimer" => { + let fire_at = params.get("fire_at").ok_or_else(|| { + anyhow::anyhow!( + "Missing 'fire_at' field in params: {}", + serde_json::to_string(¶ms).unwrap_or_else(|_| "".to_string()) + ) + })?; + + let fire_at: DateTime = serde_json::from_value(fire_at.clone()) + .map_err(|e| anyhow::anyhow!("Failed to parse 'fire_at' as DateTime: {}", e))?; + + Ok(TimerConfig::DateTime { fire_at }) + } + _ => Err(anyhow::anyhow!( + "Unknown timer trigger type: {}", + trigger_ref + )), + } + } + /// Calculate total interval in seconds #[allow(dead_code)] pub fn interval_seconds(&self) -> Option { @@ -204,39 +274,57 @@ mod tests { } #[test] - fn test_timer_config_deserialization_interval() { - let json = r#"{ - "type": "interval", + fn test_timer_config_from_trigger_params_interval() { + let params = serde_json::json!({ "interval": 30, "unit": "seconds" - }"#; + }); - let config: TimerConfig = serde_json::from_str(json).unwrap(); + let config = TimerConfig::from_trigger_params("core.intervaltimer", params).unwrap(); assert_eq!(config.interval_seconds(), Some(30)); } #[test] - fn test_timer_config_deserialization_interval_default_unit() { - let json = r#"{ - "type": "interval", + fn test_timer_config_from_trigger_params_interval_default_unit() { + let params = serde_json::json!({ "interval": 60 - }"#; + }); - let config: TimerConfig = serde_json::from_str(json).unwrap(); + let config = TimerConfig::from_trigger_params("core.intervaltimer", params).unwrap(); assert_eq!(config.interval_seconds(), Some(60)); } #[test] - fn test_timer_config_deserialization_cron() { - let json = r#"{ - "type": "cron", + fn test_timer_config_from_trigger_params_cron() { + let params = serde_json::json!({ "expression": "0 0 * * *" - }"#; + }); - let config: TimerConfig = serde_json::from_str(json).unwrap(); + let config = TimerConfig::from_trigger_params("core.crontimer", params).unwrap(); assert_eq!(config.cron_expression(), Some("0 0 * * *")); } + #[test] + fn test_timer_config_from_trigger_params_datetime() { + let fire_at = chrono::Utc::now(); + let params = serde_json::json!({ + "fire_at": fire_at + }); + + let config = TimerConfig::from_trigger_params("core.datetimetimer", params).unwrap(); + assert_eq!(config.fire_time(), Some(fire_at)); + } + + #[test] + fn test_timer_config_from_trigger_params_unknown_trigger() { + let params = serde_json::json!({ + "interval": 30 + }); + + let result = TimerConfig::from_trigger_params("unknown.trigger", params); + assert!(result.is_err()); + } + #[test] fn test_rule_lifecycle_event_rule_id() { let event = RuleLifecycleEvent::RuleCreated { diff --git a/crates/executor/src/scheduler.rs b/crates/executor/src/scheduler.rs index 8f70e64..e5173c3 100644 --- a/crates/executor/src/scheduler.rs +++ b/crates/executor/src/scheduler.rs @@ -276,7 +276,7 @@ impl ExecutionScheduler { MessageEnvelope::new(MessageType::ExecutionRequested, payload).with_source("executor"); // Publish to worker-specific queue with routing key - let routing_key = format!("worker.{}", worker_id); + let routing_key = format!("execution.dispatch.worker.{}", worker_id); let exchange = "attune.executions"; publisher diff --git a/crates/executor/src/service.rs b/crates/executor/src/service.rs index 71e314a..3eabaaa 100644 --- a/crates/executor/src/service.rs +++ b/crates/executor/src/service.rs @@ -88,13 +88,27 @@ impl ExecutorService { let mq_connection = Connection::connect(mq_url).await?; info!("Message queue connection established"); - // Setup message queue infrastructure (exchanges, queues, bindings) + // Setup common message queue infrastructure (exchanges and DLX) let mq_config = MessageQueueConfig::default(); - match mq_connection.setup_infrastructure(&mq_config).await { - Ok(_) => info!("Message queue infrastructure setup completed"), + match mq_connection.setup_common_infrastructure(&mq_config).await { + Ok(_) => info!("Common message queue infrastructure setup completed"), Err(e) => { warn!( - "Failed to setup MQ infrastructure (may already exist): {}", + "Failed to setup common MQ infrastructure (may already exist): {}", + e + ); + } + } + + // Setup executor-specific queues and bindings + match mq_connection + .setup_executor_infrastructure(&mq_config) + .await + { + Ok(_) => info!("Executor message queue infrastructure setup completed"), + Err(e) => { + warn!( + "Failed to setup executor MQ infrastructure (may already exist): {}", e ); } diff --git a/crates/sensor/src/service.rs b/crates/sensor/src/service.rs index 16fd7a9..7f317de 100644 --- a/crates/sensor/src/service.rs +++ b/crates/sensor/src/service.rs @@ -52,6 +52,37 @@ impl SensorService { let mq = MessageQueue::connect(&mq_config.url).await?; info!("Message queue connection established"); + // Setup common message queue infrastructure (exchanges and DLX) + let mq_setup_config = attune_common::mq::MessageQueueConfig::default(); + match mq + .get_connection() + .setup_common_infrastructure(&mq_setup_config) + .await + { + Ok(_) => info!("Common message queue infrastructure setup completed"), + Err(e) => { + warn!( + "Failed to setup common MQ infrastructure (may already exist): {}", + e + ); + } + } + + // Setup sensor-specific queues and bindings + match mq + .get_connection() + .setup_sensor_infrastructure(&mq_setup_config) + .await + { + Ok(_) => info!("Sensor message queue infrastructure setup completed"), + Err(e) => { + warn!( + "Failed to setup sensor MQ infrastructure (may already exist): {}", + e + ); + } + } + // Create service components info!("Creating service components..."); diff --git a/crates/worker/src/service.rs b/crates/worker/src/service.rs index 80b8401..b1a19d8 100644 --- a/crates/worker/src/service.rs +++ b/crates/worker/src/service.rs @@ -77,13 +77,13 @@ impl WorkerService { .map_err(|e| Error::Internal(format!("Failed to connect to message queue: {}", e)))?; info!("Message queue connection established"); - // Setup message queue infrastructure (exchanges, queues, bindings) + // Setup common message queue infrastructure (exchanges and DLX) let mq_config = MqConfig::default(); - match mq_connection.setup_infrastructure(&mq_config).await { - Ok(_) => info!("Message queue infrastructure setup completed"), + match mq_connection.setup_common_infrastructure(&mq_config).await { + Ok(_) => info!("Common message queue infrastructure setup completed"), Err(e) => { warn!( - "Failed to setup MQ infrastructure (may already exist): {}", + "Failed to setup common MQ infrastructure (may already exist): {}", e ); } @@ -278,6 +278,16 @@ impl WorkerService { info!("Worker registered with ID: {}", worker_id); + // Setup worker-specific message queue infrastructure + let mq_config = MqConfig::default(); + self.mq_connection + .setup_worker_infrastructure(worker_id, &mq_config) + .await + .map_err(|e| { + Error::Internal(format!("Failed to setup worker MQ infrastructure: {}", e)) + })?; + info!("Worker-specific message queue infrastructure setup completed"); + // Start heartbeat self.heartbeat.start().await?; @@ -316,40 +326,10 @@ impl WorkerService { .worker_id .ok_or_else(|| Error::Internal("Worker not registered".to_string()))?; - // Create queue name for this worker + // Queue name for this worker (already created in setup_worker_infrastructure) let queue_name = format!("worker.{}.executions", worker_id); - info!("Creating worker-specific queue: {}", queue_name); - - // Create the worker-specific queue - let worker_queue = QueueConfig { - name: queue_name.clone(), - durable: false, // Worker queues are temporary - exclusive: false, - auto_delete: true, // Delete when worker disconnects - }; - - self.mq_connection - .declare_queue(&worker_queue) - .await - .map_err(|e| Error::Internal(format!("Failed to declare queue: {}", e)))?; - - info!("Worker queue created: {}", queue_name); - - // Bind the queue to the executions exchange with worker-specific routing key - self.mq_connection - .bind_queue( - &queue_name, - "attune.executions", - &format!("worker.{}", worker_id), - ) - .await - .map_err(|e| Error::Internal(format!("Failed to bind queue: {}", e)))?; - - info!( - "Queue bound to exchange with routing key 'worker.{}'", - worker_id - ); + info!("Starting consumer for worker queue: {}", queue_name); // Create consumer let consumer = Consumer::new( diff --git a/docker-compose.yaml b/docker-compose.yaml index 5391d04..c962fbc 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -456,7 +456,7 @@ services: ATTUNE_MQ_URL: amqp://attune:attune@rabbitmq:5672 ATTUNE_PACKS_BASE_DIR: /opt/attune/packs volumes: - - packs_data:/opt/attune/packs:ro + - packs_data:/opt/attune/packs:rw - ./packs.dev:/opt/attune/packs.dev:rw - sensor_logs:/opt/attune/logs depends_on: diff --git a/docker/init-packs.sh b/docker/init-packs.sh index 00bdbaa..aa2bdbf 100755 --- a/docker/init-packs.sh +++ b/docker/init-packs.sh @@ -95,10 +95,15 @@ for pack_dir in "$SOURCE_PACKS_DIR"/*; do target_pack_dir="$TARGET_PACKS_DIR/$pack_name" if [ -d "$target_pack_dir" ]; then - # Pack exists, check if we should update - # For now, we'll skip if it exists (idempotent on restart) - echo -e "${YELLOW} ⊘${NC} Pack already exists at: $target_pack_dir" - echo -e "${BLUE} ℹ${NC} Skipping copy (use fresh volume to reload)" + # Pack exists, update files to ensure we have latest (especially binaries) + echo -e "${YELLOW} ⟳${NC} Pack exists at: $target_pack_dir, updating files..." + cp -rf "$pack_dir"/* "$target_pack_dir"/ + if [ $? -eq 0 ]; then + echo -e "${GREEN} ✓${NC} Updated pack files at: $target_pack_dir" + else + echo -e "${RED} ✗${NC} Failed to update pack" + exit 1 + fi else # Copy pack to target directory echo -e "${YELLOW} →${NC} Copying pack files..." diff --git a/docker/sensor-entrypoint.sh b/docker/sensor-entrypoint.sh new file mode 100644 index 0000000..8f93ff1 --- /dev/null +++ b/docker/sensor-entrypoint.sh @@ -0,0 +1,48 @@ +#!/bin/sh +# Sensor service entrypoint script +# Copies sensor binary from image to shared volume on startup + +set -e + +echo "Sensor entrypoint: Checking for sensor binary..." + +# Path to sensor binary in the image (baked in during build) +BINARY_IN_IMAGE="/opt/attune/packs-image/core/sensors/attune-core-timer-sensor" + +# Destination in the shared volume +BINARY_DEST="/opt/attune/packs/core/sensors/attune-core-timer-sensor" + +# Create sensors directory if it doesn't exist +mkdir -p "/opt/attune/packs/core/sensors" + +# Check if we have a binary to copy from a different location in the image +# The Dockerfile copies the binary, but it gets hidden by the volume mount +# So we need to copy it from the image layer to the mounted volume + +# Try to find the binary from docker build +if [ -f "$BINARY_IN_IMAGE" ]; then + echo "Copying sensor binary from $BINARY_IN_IMAGE to $BINARY_DEST" + cp -f "$BINARY_IN_IMAGE" "$BINARY_DEST" + chmod +x "$BINARY_DEST" + echo "✓ Sensor binary updated in shared volume" +elif [ ! -f "$BINARY_DEST" ]; then + echo "ERROR: Sensor binary not found in image and not present in volume" + echo "Expected at: $BINARY_IN_IMAGE or $BINARY_DEST" + exit 1 +else + echo "Using existing sensor binary in shared volume: $BINARY_DEST" +fi + +# Verify binary exists and is executable +if [ -f "$BINARY_DEST" ] && [ -x "$BINARY_DEST" ]; then + echo "✓ Sensor binary ready: $BINARY_DEST" + ls -lh "$BINARY_DEST" +else + echo "ERROR: Sensor binary not executable or not found: $BINARY_DEST" + exit 1 +fi + +echo "Starting Attune Sensor Service..." + +# Execute the main service +exec /usr/local/bin/attune-service "$@" diff --git a/docs/architecture/queue-ownership.md b/docs/architecture/queue-ownership.md new file mode 100644 index 0000000..5038ab2 --- /dev/null +++ b/docs/architecture/queue-ownership.md @@ -0,0 +1,359 @@ +# RabbitMQ Queue Ownership Architecture + +**Last Updated:** 2026-02-05 +**Status:** Implemented + +## Overview + +Attune uses a **service-specific infrastructure setup** pattern where each service is responsible for declaring only the queues it consumes. This provides clear ownership, reduces redundancy, and makes the system architecture more maintainable. + +## Principle + +**Each service declares the queues it consumes.** + +This follows the principle that the consumer owns the queue declaration, ensuring that: +- Queue configuration is co-located with the service that uses it +- Services can start in any order (all operations are idempotent) +- Ownership is clear from the codebase structure +- Changes to queue configuration are localized to the consuming service + +## Infrastructure Layers + +### Common Infrastructure (Shared by All Services) + +**Declared by:** Any service on startup (first-to-start wins, idempotent) +**Responsibility:** Ensures basic messaging infrastructure exists + +**Components:** +- **Exchanges:** + - `attune.events` (topic) - Event routing + - `attune.executions` (topic) - Execution lifecycle routing + - `attune.notifications` (fanout) - Real-time notifications +- **Dead Letter Exchange (DLX):** + - `attune.dlx` (direct) - Failed message handling + - `attune.dlx.queue` - Dead letter queue bound to DLX + +**Setup Method:** `Connection::setup_common_infrastructure()` + +### Service-Specific Infrastructure + +Each service declares only the queues it consumes: + +## Service Responsibilities + +### Executor Service + +**Role:** Orchestrates execution lifecycle, enforces rules, manages inquiries + +**Queues Owned:** +- `attune.enforcements.queue` + - Routing: `enforcement.#` + - Purpose: Rule enforcement requests +- `attune.execution.requests.queue` + - Routing: `execution.requested` + - Purpose: New execution requests +- `attune.execution.status.queue` + - Routing: `execution.status.changed` + - Purpose: Execution status updates from workers +- `attune.execution.completed.queue` + - Routing: `execution.completed` + - Purpose: Completed execution results +- `attune.inquiry.responses.queue` + - Routing: `inquiry.responded` + - Purpose: Human-in-the-loop responses + +**Setup Method:** `Connection::setup_executor_infrastructure()` + +**Code Location:** `crates/executor/src/service.rs` + +### Worker Service + +**Role:** Execute actions in various runtimes (shell, Python, Node.js, containers) + +**Queues Owned:** +- `worker.{id}.executions` (per worker instance) + - Routing: `execution.dispatch.worker.{id}` + - Purpose: Execution tasks dispatched to this specific worker + - Properties: Durable, auto-delete on disconnect + +**Setup Method:** `Connection::setup_worker_infrastructure(worker_id, config)` + +**Code Location:** `crates/worker/src/service.rs` + +**Notes:** +- Each worker instance gets its own queue +- Worker ID is assigned during registration +- Queue is created after successful registration +- Multiple workers can exist for load distribution + +### Sensor Service + +**Role:** Monitor for events and generate trigger instances + +**Queues Owned:** +- `attune.events.queue` + - Routing: `#` (all events) + - Purpose: Events generated by sensors and triggers + +**Setup Method:** `Connection::setup_sensor_infrastructure()` + +**Code Location:** `crates/sensor/src/service.rs` + +### Notifier Service + +**Role:** Real-time notifications via WebSockets + +**Queues Owned:** +- `attune.notifications.queue` + - Routing: `` (fanout, no routing key) + - Purpose: System notifications for WebSocket broadcasting + +**Setup Method:** `Connection::setup_notifier_infrastructure()` + +**Code Location:** `crates/notifier/src/service.rs` + +**Notes:** +- Uses fanout exchange (broadcasts to all consumers) +- Also uses PostgreSQL LISTEN/NOTIFY for database events + +### API Service + +**Role:** HTTP gateway for client interactions + +**Queues Owned:** None (API only publishes, doesn't consume) + +**Setup Method:** `Connection::setup_common_infrastructure()` only + +**Code Location:** `crates/api/src/main.rs` + +**Notes:** +- Only needs exchanges to publish messages +- Does not consume from any queues +- Publishes to various exchanges (events, executions, notifications) + +## Queue Configuration + +All queues are configured with: +- **Durable:** `true` (survives broker restarts) +- **Exclusive:** `false` (accessible by multiple connections) +- **Auto-delete:** `false` (persist even without consumers) +- **Dead Letter Exchange:** `attune.dlx` (enabled by default) + +Exception: +- Worker-specific queues may have different settings based on worker lifecycle + +## Message Flow Examples + +### Rule Enforcement Flow +``` +Event Created + → `attune.events` exchange + → `attune.events.queue` (consumed by Executor) + → Rule evaluation + → `enforcement.created` published to `attune.executions` + → `attune.enforcements.queue` (consumed by Executor) +``` + +### Execution Flow +``` +Execution Requested (from API) + → `attune.executions` exchange (routing: execution.requested) + → `attune.execution.requests.queue` (consumed by Executor/Scheduler) + → Executor dispatches to worker + → `execution.dispatch.worker.{id}` to `attune.executions` + → `worker.{id}.executions` (consumed by Worker) + → Worker executes action + → `execution.completed` to `attune.executions` + → `attune.execution.completed.queue` (consumed by Executor) +``` + +### Notification Flow +``` +System Event Occurs + → `attune.notifications` exchange (fanout) + → `attune.notifications.queue` (consumed by Notifier) + → WebSocket broadcast to connected clients +``` + +## Implementation Details + +### Setup Call Order + +Each service follows this pattern: + +```rust +// 1. Connect to RabbitMQ +let mq_connection = Connection::connect(mq_url).await?; + +// 2. Setup common infrastructure (exchanges, DLX) +mq_connection.setup_common_infrastructure(&config).await?; + +// 3. Setup service-specific queues +mq_connection.setup_SERVICE_infrastructure(&config).await?; +// (where SERVICE is executor, worker, sensor, or notifier) +``` + +### Idempotency + +All setup operations are idempotent: +- Declaring an existing exchange/queue with the same settings succeeds +- Multiple services can call `setup_common_infrastructure()` safely +- Services can start in any order + +### Error Handling + +Setup failures are logged but not fatal: +- Queues may already exist from previous runs +- Another service may have created the infrastructure +- Only actual consumption failures should stop the service + +## Startup Sequence + +**Typical Docker Compose Startup:** + +1. **PostgreSQL** - Starts first (dependency) +2. **RabbitMQ** - Starts first (dependency) +3. **Migrations** - Runs database migrations +4. **Services start in parallel:** + - **API** - Creates common infrastructure + - **Executor** - Creates common + executor infrastructure + - **Workers** - Each creates common + worker-specific queue + - **Sensor** - Creates common + sensor infrastructure + - **Notifier** - Creates common + notifier infrastructure + +The first service to start creates the common infrastructure. All subsequent services find it already exists and proceed. + +## Benefits + +✅ **Clear Ownership** - Code inspection shows which service owns which queue +✅ **Reduced Redundancy** - Each queue declared exactly once (per service type) +✅ **Better Debugging** - Queue issues isolated to specific services +✅ **Improved Maintainability** - Changes to queue config localized +✅ **Self-Documenting** - Code structure reflects system architecture +✅ **Order Independence** - Services can start in any order +✅ **Monitoring** - Can track which service created infrastructure + +## Monitoring and Verification + +### RabbitMQ Management UI + +Access at `http://localhost:15672` (credentials: `guest`/`guest`) + +**Expected Queues:** +- `attune.dlx.queue` - Dead letter queue +- `attune.events.queue` - Events (Sensor) +- `attune.enforcements.queue` - Enforcements (Executor) +- `attune.execution.requests.queue` - Execution requests (Executor) +- `attune.execution.status.queue` - Status updates (Executor) +- `attune.execution.completed.queue` - Completions (Executor) +- `attune.inquiry.responses.queue` - Inquiry responses (Executor) +- `attune.notifications.queue` - Notifications (Notifier) +- `worker.{id}.executions` - Worker queues (one per worker) + +### Verification Commands + +```bash +# Check which queues exist +docker compose exec rabbitmq rabbitmqctl list_queues name messages + +# Check queue bindings +docker compose exec rabbitmq rabbitmqctl list_bindings + +# Check who's consuming from queues +docker compose exec rabbitmq rabbitmqctl list_consumers +``` + +### Log Verification + +Each service logs its infrastructure setup: + +```bash +# API (common only) +docker compose logs api | grep "infrastructure setup" + +# Executor (common + executor) +docker compose logs executor | grep "infrastructure setup" + +# Workers (common + worker-specific) +docker compose logs worker-shell | grep "infrastructure setup" + +# Sensor (common + sensor) +docker compose logs sensor | grep "infrastructure setup" +``` + +## Troubleshooting + +### Queue Already Exists with Different Settings + +**Error:** `PRECONDITION_FAILED - inequivalent arg 'durable' for queue...` + +**Cause:** Queue exists with different configuration than code expects + +**Solution:** +```bash +# Stop services +docker compose down + +# Remove RabbitMQ volume to clear all queues +docker volume rm attune_rabbitmq_data + +# Restart services +docker compose up -d +``` + +### Service Can't Connect to RabbitMQ + +**Check:** Is RabbitMQ healthy? +```bash +docker compose ps rabbitmq +``` + +**Check:** RabbitMQ logs for errors +```bash +docker compose logs rabbitmq +``` + +### Messages Not Being Consumed + +1. **Check queue has consumers:** + ```bash + docker compose exec rabbitmq rabbitmqctl list_consumers + ``` + +2. **Check service is running:** + ```bash + docker compose ps + ``` + +3. **Check service logs for consumer startup:** + ```bash + docker compose logs | grep "Consumer started" + ``` + +## Migration from Old Architecture + +**Previous Behavior:** All services called `setup_infrastructure()` which created ALL queues + +**New Behavior:** Each service calls its specific setup method + +**Migration Steps:** +1. Update to latest code +2. Stop all services: `docker compose down` +3. Clear RabbitMQ volume: `docker volume rm attune_rabbitmq_data` +4. Start services: `docker compose up -d` + +No data loss occurs as message queues are transient infrastructure. + +## Related Documentation + +- [Queue Architecture](queue-architecture.md) - Overall queue design +- [RabbitMQ Queues Quick Reference](../../QUICKREF-rabbitmq-queues.md) +- [Executor Service](executor-service.md) +- [Worker Service](worker-service.md) +- [Sensor Service](sensor-service.md) + +## Change History + +| Date | Change | Author | +|------|--------|--------| +| 2026-02-05 | Initial implementation of service-specific queue ownership | AI Assistant | \ No newline at end of file diff --git a/docs/deployment/production-deployment.md b/docs/deployment/production-deployment.md index f89b758..c510c82 100644 --- a/docs/deployment/production-deployment.md +++ b/docs/deployment/production-deployment.md @@ -31,60 +31,6 @@ Before deploying Attune to production, verify the following: --- -## Database Configuration - -### Critical: Schema Configuration - -**Production MUST use the `attune` schema.** - -The schema configuration is set in `config.production.yaml`: - -```yaml -database: - schema: "attune" # REQUIRED: Do not remove or change -``` - -### Why This Matters - -- **Test Isolation**: Tests use dynamic schemas (e.g., `test_uuid`) for isolation -- **Production Consistency**: All production services must use the same schema -- **Migration Safety**: Migrations expect the `attune` schema in production - -### Verification - -You can verify the schema configuration in several ways: - -1. **Check Configuration File**: Ensure `config.production.yaml` has `schema: "attune"` - -2. **Check Environment Variable** (if overriding): - ```bash - echo $ATTUNE__DATABASE__SCHEMA - # Should output: attune - ``` - -3. **Check Application Logs** on startup: - ``` - INFO Using production schema: attune - ``` - -4. **Query Database**: - ```sql - SELECT current_schema(); - -- Should return: attune - ``` - -### ⚠️ WARNING - -If the schema is **not** set to `attune` in production, you will see this warning in logs: - -``` -WARN Using non-standard schema: 'test_xyz'. Production should use 'attune' -``` - -**If you see this warning in production, STOP and fix the configuration immediately.** - ---- - ## Environment Variables ### Required Variables @@ -351,18 +297,6 @@ Set up monitoring for: ## Troubleshooting -### Issue: Wrong Schema in Production - -**Symptoms:** -- Log shows: `WARN Using non-standard schema: 'something_else'` -- Database queries fail or return no data - -**Solution:** -1. Check `config.production.yaml` has `schema: "attune"` -2. Check for environment variable override: `echo $ATTUNE__DATABASE__SCHEMA` -3. Restart the application after fixing configuration -4. Verify logs show: `INFO Using production schema: attune` - ### Issue: Schema Not Found **Symptoms:** diff --git a/docs/plans/schema-per-test-refactor.md b/docs/plans/schema-per-test-refactor.md index 4f6264c..a6b1a12 100644 --- a/docs/plans/schema-per-test-refactor.md +++ b/docs/plans/schema-per-test-refactor.md @@ -795,14 +795,7 @@ impl Database { Self::validate_schema_name(&schema)?; // Log prominently - if schema != "attune" { - tracing::warn!( - "Using non-standard schema: {}. Production should use 'attune'", - schema - ); - } else { - tracing::info!("Using production schema: {}", schema); - } + tracing::info!("Using schema: {}", schema); // ... rest of implementation } @@ -1696,4 +1689,4 @@ All deliverables completed and verified: **Document Version:** 1.0 **Last Updated:** 2026-01-28 **Plan Status:** Ready for execution -**Next Review:** After Phase 3 completion \ No newline at end of file +**Next Review:** After Phase 3 completion diff --git a/docs/testing/schema-per-test.md b/docs/testing/schema-per-test.md index 53e4516..a3ae289 100644 --- a/docs/testing/schema-per-test.md +++ b/docs/testing/schema-per-test.md @@ -163,38 +163,6 @@ async fn test_something() { } ``` -## Production vs. Test Configuration - -### Production Configuration - -Production always uses the `attune` schema: - -```yaml -# config.production.yaml -database: - schema: "attune" # REQUIRED: Do not change -``` - -The database layer validates and logs schema usage: - -```rust -if schema != "attune" { - tracing::warn!("Using non-standard schema: '{}'. Production should use 'attune'", schema); -} else { - tracing::info!("Using production schema: {}", schema); -} -``` - -### Test Configuration - -Tests use dynamic schemas: - -```yaml -# config.test.yaml -database: - schema: null # Will be set per-test in TestContext -``` - Each test creates its own unique schema at runtime. ## Code Structure @@ -259,11 +227,7 @@ impl Database { Self::validate_schema_name(&schema)?; // Log schema usage - if schema != "attune" { - warn!("Using non-standard schema: '{}'", schema); - } else { - info!("Using production schema: {}", schema); - } + info!("Using schema: {}", schema); // Create pool with search_path hook let pool = PgPoolOptions::new() @@ -538,4 +502,4 @@ See `docs/plans/schema-per-test-refactor.md` for complete implementation details - [Testing Status](./testing-status.md) - [Running Tests](./running-tests.md) - [Database Architecture](./queue-architecture.md) -- [Configuration Guide](./configuration.md) \ No newline at end of file +- [Configuration Guide](./configuration.md) diff --git a/migrations/20250101000004_trigger_sensor_event_rule.sql b/migrations/20250101000004_trigger_sensor_event_rule.sql index 8204c2f..76a51c8 100644 --- a/migrations/20250101000004_trigger_sensor_event_rule.sql +++ b/migrations/20250101000004_trigger_sensor_event_rule.sql @@ -1,5 +1,5 @@ -- Migration: Event System --- Description: Creates trigger, sensor, event, and rule tables (with webhook_config, is_adhoc from start) +-- Description: Creates trigger, sensor, event, and enforcement tables (with webhook_config, is_adhoc from start) -- Version: 20250101000003 -- ============================================================================ @@ -71,6 +71,7 @@ CREATE TABLE sensor ( trigger BIGINT NOT NULL REFERENCES trigger(id) ON DELETE CASCADE, trigger_ref TEXT NOT NULL, enabled BOOLEAN NOT NULL, + is_adhoc BOOLEAN NOT NULL DEFAULT FALSE, param_schema JSONB, config JSONB, created TIMESTAMPTZ NOT NULL DEFAULT NOW(), @@ -81,6 +82,31 @@ CREATE TABLE sensor ( CONSTRAINT sensor_ref_format CHECK (ref ~ '^[^.]+\.[^.]+$') ); +-- Indexes +CREATE INDEX idx_sensor_ref ON sensor(ref); +CREATE INDEX idx_sensor_pack ON sensor(pack); +CREATE INDEX idx_sensor_runtime ON sensor(runtime); +CREATE INDEX idx_sensor_trigger ON sensor(trigger); +CREATE INDEX idx_sensor_enabled ON sensor(enabled) WHERE enabled = TRUE; +CREATE INDEX idx_sensor_is_adhoc ON sensor(is_adhoc) WHERE is_adhoc = true; +CREATE INDEX idx_sensor_created ON sensor(created DESC); + +-- Trigger +CREATE TRIGGER update_sensor_updated + BEFORE UPDATE ON sensor + FOR EACH ROW + EXECUTE FUNCTION update_updated_column(); + +-- Comments +COMMENT ON TABLE sensor IS 'Sensors monitor for events and create trigger instances'; +COMMENT ON COLUMN sensor.ref IS 'Unique sensor reference (format: pack.name)'; +COMMENT ON COLUMN sensor.label IS 'Human-readable sensor name'; +COMMENT ON COLUMN sensor.entrypoint IS 'Script or command to execute'; +COMMENT ON COLUMN sensor.runtime IS 'Runtime environment for execution'; +COMMENT ON COLUMN sensor.trigger IS 'Trigger type this sensor creates events for'; +COMMENT ON COLUMN sensor.enabled IS 'Whether this sensor is active'; +COMMENT ON COLUMN sensor.is_adhoc IS 'True if sensor was manually created (ad-hoc), false if installed from pack'; + -- ============================================================================ -- EVENT TABLE -- ============================================================================ @@ -173,3 +199,6 @@ COMMENT ON COLUMN enforcement.status IS 'Processing status'; COMMENT ON COLUMN enforcement.payload IS 'Event payload for rule evaluation'; COMMENT ON COLUMN enforcement.condition IS 'Logical operator for conditions (any=OR, all=AND)'; COMMENT ON COLUMN enforcement.conditions IS 'Condition expressions to evaluate'; + +-- Note: Rule table will be created in migration 20250101000006 after action table exists +-- Note: Foreign key constraints for enforcement.rule and event.rule will be added in that migration diff --git a/migrations/20250101000005_action.sql b/migrations/20250101000005_action.sql index cdf9bd3..c96dc71 100644 --- a/migrations/20250101000005_action.sql +++ b/migrations/20250101000005_action.sql @@ -17,6 +17,7 @@ CREATE TABLE action ( runtime BIGINT REFERENCES runtime(id), param_schema JSONB, out_schema JSONB, + is_adhoc BOOLEAN NOT NULL DEFAULT FALSE, created TIMESTAMPTZ NOT NULL DEFAULT NOW(), updated TIMESTAMPTZ NOT NULL DEFAULT NOW(), @@ -25,6 +26,30 @@ CREATE TABLE action ( CONSTRAINT action_ref_format CHECK (ref ~ '^[^.]+\.[^.]+$') ); +-- Indexes +CREATE INDEX idx_action_ref ON action(ref); +CREATE INDEX idx_action_pack ON action(pack); +CREATE INDEX idx_action_runtime ON action(runtime); +CREATE INDEX idx_action_is_adhoc ON action(is_adhoc) WHERE is_adhoc = true; +CREATE INDEX idx_action_created ON action(created DESC); + +-- Trigger +CREATE TRIGGER update_action_updated + BEFORE UPDATE ON action + FOR EACH ROW + EXECUTE FUNCTION update_updated_column(); + +-- Comments +COMMENT ON TABLE action IS 'Actions are executable tasks that can be triggered'; +COMMENT ON COLUMN action.ref IS 'Unique action reference (format: pack.name)'; +COMMENT ON COLUMN action.pack IS 'Pack this action belongs to'; +COMMENT ON COLUMN action.label IS 'Human-readable action name'; +COMMENT ON COLUMN action.entrypoint IS 'Script or command to execute'; +COMMENT ON COLUMN action.runtime IS 'Runtime environment for execution'; +COMMENT ON COLUMN action.param_schema IS 'JSON schema for action parameters'; +COMMENT ON COLUMN action.out_schema IS 'JSON schema for action output'; +COMMENT ON COLUMN action.is_adhoc IS 'True if action was manually created (ad-hoc), false if installed from pack'; + -- ============================================================================ -- Add foreign key constraint for policy table diff --git a/migrations/20250101000006_execution_system.sql b/migrations/20250101000006_execution_system.sql index 045d4c2..236310a 100644 --- a/migrations/20250101000006_execution_system.sql +++ b/migrations/20250101000006_execution_system.sql @@ -1,5 +1,5 @@ -- Migration: Execution System --- Description: Creates execution (with workflow columns) and inquiry tables +-- Description: Creates execution (with workflow columns), inquiry, and rule tables -- Version: 20250101000006 -- ============================================================================ @@ -105,3 +105,76 @@ COMMENT ON COLUMN inquiry.timeout_at IS 'When this inquiry expires'; COMMENT ON COLUMN inquiry.responded_at IS 'When the response was received'; -- ============================================================================ + +-- ============================================================================ +-- RULE TABLE +-- ============================================================================ + +CREATE TABLE rule ( + id BIGSERIAL PRIMARY KEY, + ref TEXT NOT NULL UNIQUE, + pack BIGINT NOT NULL REFERENCES pack(id) ON DELETE CASCADE, + pack_ref TEXT NOT NULL, + label TEXT NOT NULL, + description TEXT NOT NULL, + action BIGINT NOT NULL REFERENCES action(id), + action_ref TEXT NOT NULL, + trigger BIGINT NOT NULL REFERENCES trigger(id), + trigger_ref TEXT NOT NULL, + conditions JSONB NOT NULL DEFAULT '[]'::jsonb, + action_params JSONB DEFAULT '{}'::jsonb, + trigger_params JSONB DEFAULT '{}'::jsonb, + enabled BOOLEAN NOT NULL, + is_adhoc BOOLEAN NOT NULL DEFAULT FALSE, + created TIMESTAMPTZ NOT NULL DEFAULT NOW(), + updated TIMESTAMPTZ NOT NULL DEFAULT NOW(), + + -- Constraints + CONSTRAINT rule_ref_lowercase CHECK (ref = LOWER(ref)), + CONSTRAINT rule_ref_format CHECK (ref ~ '^[^.]+\.[^.]+$') +); + +-- Indexes +CREATE INDEX idx_rule_ref ON rule(ref); +CREATE INDEX idx_rule_pack ON rule(pack); +CREATE INDEX idx_rule_action ON rule(action); +CREATE INDEX idx_rule_trigger ON rule(trigger); +CREATE INDEX idx_rule_enabled ON rule(enabled) WHERE enabled = TRUE; +CREATE INDEX idx_rule_is_adhoc ON rule(is_adhoc) WHERE is_adhoc = true; +CREATE INDEX idx_rule_created ON rule(created DESC); +CREATE INDEX idx_rule_trigger_enabled ON rule(trigger, enabled); +CREATE INDEX idx_rule_action_enabled ON rule(action, enabled); +CREATE INDEX idx_rule_pack_enabled ON rule(pack, enabled); +CREATE INDEX idx_rule_action_params_gin ON rule USING GIN (action_params); +CREATE INDEX idx_rule_trigger_params_gin ON rule USING GIN (trigger_params); + +-- Trigger +CREATE TRIGGER update_rule_updated + BEFORE UPDATE ON rule + FOR EACH ROW + EXECUTE FUNCTION update_updated_column(); + +-- Comments +COMMENT ON TABLE rule IS 'Rules link triggers to actions with conditions'; +COMMENT ON COLUMN rule.ref IS 'Unique rule reference (format: pack.name)'; +COMMENT ON COLUMN rule.label IS 'Human-readable rule name'; +COMMENT ON COLUMN rule.action IS 'Action to execute when rule triggers'; +COMMENT ON COLUMN rule.trigger IS 'Trigger that activates this rule'; +COMMENT ON COLUMN rule.conditions IS 'Condition expressions to evaluate before executing action'; +COMMENT ON COLUMN rule.action_params IS 'Parameter overrides for the action'; +COMMENT ON COLUMN rule.trigger_params IS 'Parameter overrides for the trigger'; +COMMENT ON COLUMN rule.enabled IS 'Whether this rule is active'; +COMMENT ON COLUMN rule.is_adhoc IS 'True if rule was manually created (ad-hoc), false if installed from pack'; + +-- ============================================================================ + +-- Add foreign key constraints now that rule table exists +ALTER TABLE enforcement + ADD CONSTRAINT enforcement_rule_fkey + FOREIGN KEY (rule) REFERENCES rule(id) ON DELETE SET NULL; + +ALTER TABLE event + ADD CONSTRAINT event_rule_fkey + FOREIGN KEY (rule) REFERENCES rule(id) ON DELETE SET NULL; + +-- ============================================================================ diff --git a/migrations/20250101000013_notify_triggers.sql b/migrations/20250101000013_notify_triggers.sql index 812ed04..78a3d62 100644 --- a/migrations/20250101000013_notify_triggers.sql +++ b/migrations/20250101000013_notify_triggers.sql @@ -6,36 +6,100 @@ -- EXECUTION CHANGE NOTIFICATION -- ============================================================================ --- Function to notify on execution changes -CREATE OR REPLACE FUNCTION notify_execution_change() +-- Function to notify on execution creation +CREATE OR REPLACE FUNCTION notify_execution_created() RETURNS TRIGGER AS $$ DECLARE payload JSON; + enforcement_rule_ref TEXT; + enforcement_trigger_ref TEXT; BEGIN + -- Lookup enforcement details if this execution is linked to an enforcement + IF NEW.enforcement IS NOT NULL THEN + SELECT rule_ref, trigger_ref + INTO enforcement_rule_ref, enforcement_trigger_ref + FROM enforcement + WHERE id = NEW.enforcement; + END IF; + payload := json_build_object( + 'entity_type', 'execution', + 'entity_id', NEW.id, 'id', NEW.id, - 'ref', NEW.ref, + 'action_id', NEW.action, 'action_ref', NEW.action_ref, 'status', NEW.status, - 'rule', NEW.rule, - 'rule_ref', NEW.rule_ref, + 'enforcement', NEW.enforcement, + 'rule_ref', enforcement_rule_ref, + 'trigger_ref', enforcement_trigger_ref, + 'parent', NEW.parent, + 'result', NEW.result, 'created', NEW.created, 'updated', NEW.updated ); - PERFORM pg_notify('execution_change', payload::text); + PERFORM pg_notify('execution_created', payload::text); RETURN NEW; END; $$ LANGUAGE plpgsql; --- Trigger on execution table -CREATE TRIGGER execution_change_notify - AFTER INSERT OR UPDATE ON execution - FOR EACH ROW - EXECUTE FUNCTION notify_execution_change(); +-- Function to notify on execution status changes +CREATE OR REPLACE FUNCTION notify_execution_status_changed() +RETURNS TRIGGER AS $$ +DECLARE + payload JSON; + enforcement_rule_ref TEXT; + enforcement_trigger_ref TEXT; +BEGIN + -- Only notify on updates, not inserts + IF TG_OP = 'UPDATE' AND OLD.status IS DISTINCT FROM NEW.status THEN + -- Lookup enforcement details if this execution is linked to an enforcement + IF NEW.enforcement IS NOT NULL THEN + SELECT rule_ref, trigger_ref + INTO enforcement_rule_ref, enforcement_trigger_ref + FROM enforcement + WHERE id = NEW.enforcement; + END IF; -COMMENT ON FUNCTION notify_execution_change() IS 'Sends execution change notifications via PostgreSQL LISTEN/NOTIFY'; + payload := json_build_object( + 'entity_type', 'execution', + 'entity_id', NEW.id, + 'id', NEW.id, + 'action_id', NEW.action, + 'action_ref', NEW.action_ref, + 'status', NEW.status, + 'old_status', OLD.status, + 'enforcement', NEW.enforcement, + 'rule_ref', enforcement_rule_ref, + 'trigger_ref', enforcement_trigger_ref, + 'parent', NEW.parent, + 'result', NEW.result, + 'created', NEW.created, + 'updated', NEW.updated + ); + + PERFORM pg_notify('execution_status_changed', payload::text); + END IF; + + RETURN NEW; +END; +$$ LANGUAGE plpgsql; + +-- Trigger on execution table for creation +CREATE TRIGGER execution_created_notify + AFTER INSERT ON execution + FOR EACH ROW + EXECUTE FUNCTION notify_execution_created(); + +-- Trigger on execution table for status changes +CREATE TRIGGER execution_status_changed_notify + AFTER UPDATE ON execution + FOR EACH ROW + EXECUTE FUNCTION notify_execution_status_changed(); + +COMMENT ON FUNCTION notify_execution_created() IS 'Sends execution creation notifications via PostgreSQL LISTEN/NOTIFY'; +COMMENT ON FUNCTION notify_execution_status_changed() IS 'Sends execution status change notifications via PostgreSQL LISTEN/NOTIFY'; -- ============================================================================ -- EVENT CREATION NOTIFICATION @@ -48,11 +112,16 @@ DECLARE payload JSON; BEGIN payload := json_build_object( + 'entity_type', 'event', + 'entity_id', NEW.id, 'id', NEW.id, - 'ref', NEW.ref, + 'trigger', NEW.trigger, 'trigger_ref', NEW.trigger_ref, + 'source', NEW.source, + 'source_ref', NEW.source_ref, 'rule', NEW.rule, 'rule_ref', NEW.rule_ref, + 'payload', NEW.payload, 'created', NEW.created ); @@ -74,31 +143,146 @@ COMMENT ON FUNCTION notify_event_created() IS 'Sends event creation notification -- ENFORCEMENT CHANGE NOTIFICATION -- ============================================================================ --- Function to notify on enforcement changes -CREATE OR REPLACE FUNCTION notify_enforcement_change() +-- Function to notify on enforcement creation +CREATE OR REPLACE FUNCTION notify_enforcement_created() RETURNS TRIGGER AS $$ DECLARE payload JSON; BEGIN payload := json_build_object( + 'entity_type', 'enforcement', + 'entity_id', NEW.id, 'id', NEW.id, - 'ref', NEW.ref, + 'rule', NEW.rule, 'rule_ref', NEW.rule_ref, + 'trigger_ref', NEW.trigger_ref, + 'event', NEW.event, 'status', NEW.status, + 'condition', NEW.condition, + 'conditions', NEW.conditions, + 'config', NEW.config, + 'payload', NEW.payload, 'created', NEW.created, 'updated', NEW.updated ); - PERFORM pg_notify('enforcement_change', payload::text); + PERFORM pg_notify('enforcement_created', payload::text); RETURN NEW; END; $$ LANGUAGE plpgsql; -- Trigger on enforcement table -CREATE TRIGGER enforcement_change_notify - AFTER INSERT OR UPDATE ON enforcement +CREATE TRIGGER enforcement_created_notify + AFTER INSERT ON enforcement FOR EACH ROW - EXECUTE FUNCTION notify_enforcement_change(); + EXECUTE FUNCTION notify_enforcement_created(); -COMMENT ON FUNCTION notify_enforcement_change() IS 'Sends enforcement change notifications via PostgreSQL LISTEN/NOTIFY'; +COMMENT ON FUNCTION notify_enforcement_created() IS 'Sends enforcement creation notifications via PostgreSQL LISTEN/NOTIFY'; + +-- ============================================================================ +-- INQUIRY NOTIFICATIONS +-- ============================================================================ + +-- Function to notify on inquiry creation +CREATE OR REPLACE FUNCTION notify_inquiry_created() +RETURNS TRIGGER AS $$ +DECLARE + payload JSON; +BEGIN + payload := json_build_object( + 'entity_type', 'inquiry', + 'entity_id', NEW.id, + 'id', NEW.id, + 'execution', NEW.execution, + 'status', NEW.status, + 'ttl', NEW.ttl, + 'created', NEW.created + ); + + PERFORM pg_notify('inquiry_created', payload::text); + + RETURN NEW; +END; +$$ LANGUAGE plpgsql; + +-- Function to notify on inquiry response +CREATE OR REPLACE FUNCTION notify_inquiry_responded() +RETURNS TRIGGER AS $$ +DECLARE + payload JSON; +BEGIN + -- Only notify when status changes to 'responded' + IF TG_OP = 'UPDATE' AND NEW.status = 'responded' AND OLD.status != 'responded' THEN + payload := json_build_object( + 'entity_type', 'inquiry', + 'entity_id', NEW.id, + 'id', NEW.id, + 'execution', NEW.execution, + 'status', NEW.status, + 'response', NEW.response, + 'updated', NEW.updated + ); + + PERFORM pg_notify('inquiry_responded', payload::text); + END IF; + + RETURN NEW; +END; +$$ LANGUAGE plpgsql; + +-- Trigger on inquiry table for creation +CREATE TRIGGER inquiry_created_notify + AFTER INSERT ON inquiry + FOR EACH ROW + EXECUTE FUNCTION notify_inquiry_created(); + +-- Trigger on inquiry table for responses +CREATE TRIGGER inquiry_responded_notify + AFTER UPDATE ON inquiry + FOR EACH ROW + EXECUTE FUNCTION notify_inquiry_responded(); + +COMMENT ON FUNCTION notify_inquiry_created() IS 'Sends inquiry creation notifications via PostgreSQL LISTEN/NOTIFY'; +COMMENT ON FUNCTION notify_inquiry_responded() IS 'Sends inquiry response notifications via PostgreSQL LISTEN/NOTIFY'; + +-- ============================================================================ +-- WORKFLOW EXECUTION NOTIFICATIONS +-- ============================================================================ + +-- Function to notify on workflow execution status changes +CREATE OR REPLACE FUNCTION notify_workflow_execution_status_changed() +RETURNS TRIGGER AS $$ +DECLARE + payload JSON; +BEGIN + -- Only notify for workflow executions when status changes + IF TG_OP = 'UPDATE' AND NEW.is_workflow = true AND OLD.status IS DISTINCT FROM NEW.status THEN + payload := json_build_object( + 'entity_type', 'execution', + 'entity_id', NEW.id, + 'id', NEW.id, + 'action_ref', NEW.action_ref, + 'status', NEW.status, + 'old_status', OLD.status, + 'workflow_def', NEW.workflow_def, + 'parent', NEW.parent, + 'created', NEW.created, + 'updated', NEW.updated + ); + + PERFORM pg_notify('workflow_execution_status_changed', payload::text); + END IF; + + RETURN NEW; +END; +$$ LANGUAGE plpgsql; + +-- Trigger on execution table for workflow status changes +CREATE TRIGGER workflow_execution_status_changed_notify + AFTER UPDATE ON execution + FOR EACH ROW + WHEN (NEW.is_workflow = true) + EXECUTE FUNCTION notify_workflow_execution_status_changed(); + +COMMENT ON FUNCTION notify_workflow_execution_status_changed() IS 'Sends workflow execution status change notifications via PostgreSQL LISTEN/NOTIFY'; diff --git a/migrations/20250101000014_worker_table.sql b/migrations/20250101000014_worker_table.sql new file mode 100644 index 0000000..b1bf09a --- /dev/null +++ b/migrations/20250101000014_worker_table.sql @@ -0,0 +1,56 @@ +-- Migration: Worker Table +-- Description: Creates worker table for tracking worker registration and heartbeat +-- Version: 20250101000014 + +-- ============================================================================ +-- WORKER TABLE +-- ============================================================================ + +CREATE TABLE worker ( + id BIGSERIAL PRIMARY KEY, + name TEXT NOT NULL UNIQUE, + worker_type worker_type_enum NOT NULL, + worker_role worker_role_enum NOT NULL, + runtime BIGINT REFERENCES runtime(id) ON DELETE SET NULL, + host TEXT, + port INTEGER, + status worker_status_enum NOT NULL DEFAULT 'active', + capabilities JSONB, + meta JSONB, + last_heartbeat TIMESTAMPTZ, + created TIMESTAMPTZ NOT NULL DEFAULT NOW(), + updated TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + +-- Indexes +CREATE INDEX idx_worker_name ON worker(name); +CREATE INDEX idx_worker_type ON worker(worker_type); +CREATE INDEX idx_worker_role ON worker(worker_role); +CREATE INDEX idx_worker_runtime ON worker(runtime); +CREATE INDEX idx_worker_status ON worker(status); +CREATE INDEX idx_worker_last_heartbeat ON worker(last_heartbeat DESC) WHERE last_heartbeat IS NOT NULL; +CREATE INDEX idx_worker_created ON worker(created DESC); +CREATE INDEX idx_worker_status_role ON worker(status, worker_role); +CREATE INDEX idx_worker_capabilities_gin ON worker USING GIN (capabilities); +CREATE INDEX idx_worker_meta_gin ON worker USING GIN (meta); + +-- Trigger +CREATE TRIGGER update_worker_updated + BEFORE UPDATE ON worker + FOR EACH ROW + EXECUTE FUNCTION update_updated_column(); + +-- Comments +COMMENT ON TABLE worker IS 'Worker registration and tracking table for action and sensor workers'; +COMMENT ON COLUMN worker.name IS 'Unique worker identifier (typically hostname-based)'; +COMMENT ON COLUMN worker.worker_type IS 'Worker deployment type (local or remote)'; +COMMENT ON COLUMN worker.worker_role IS 'Worker role (action or sensor)'; +COMMENT ON COLUMN worker.runtime IS 'Runtime environment this worker supports (optional)'; +COMMENT ON COLUMN worker.host IS 'Worker host address'; +COMMENT ON COLUMN worker.port IS 'Worker port number'; +COMMENT ON COLUMN worker.status IS 'Worker operational status'; +COMMENT ON COLUMN worker.capabilities IS 'Worker capabilities (e.g., max_concurrent_executions, supported runtimes)'; +COMMENT ON COLUMN worker.meta IS 'Additional worker metadata'; +COMMENT ON COLUMN worker.last_heartbeat IS 'Timestamp of last heartbeat from worker'; + +-- ============================================================================ diff --git a/packs/core/sensors/attune-core-timer-sensor b/packs/core/sensors/attune-core-timer-sensor new file mode 100755 index 0000000..ea95be3 Binary files /dev/null and b/packs/core/sensors/attune-core-timer-sensor differ diff --git a/web/src/hooks/useEnforcementStream.ts b/web/src/hooks/useEnforcementStream.ts index 01887b4..8942419 100644 --- a/web/src/hooks/useEnforcementStream.ts +++ b/web/src/hooks/useEnforcementStream.ts @@ -87,10 +87,8 @@ export function useEnforcementStream( return; } - // Extract enforcement data from notification payload - // The payload has a nested "data" field with the actual enforcement data - const enforcementData = - (notification.payload as any).data || notification.payload; + // Extract enforcement data from notification payload (flat structure) + const enforcementData = notification.payload as any; // Update specific enforcement query if it exists queryClient.setQueryData( diff --git a/web/src/hooks/useExecutionStream.ts b/web/src/hooks/useExecutionStream.ts index 6611568..e907222 100644 --- a/web/src/hooks/useExecutionStream.ts +++ b/web/src/hooks/useExecutionStream.ts @@ -88,10 +88,8 @@ export function useExecutionStream(options: UseExecutionStreamOptions = {}) { return; } - // Extract execution data from notification payload - // The payload has a nested "data" field with the actual execution data - const executionData = - (notification.payload as any).data || notification.payload; + // Extract execution data from notification payload (flat structure) + const executionData = notification.payload as any; // Update specific execution query if it exists queryClient.setQueryData( diff --git a/web/src/pages/events/EventsPage.tsx b/web/src/pages/events/EventsPage.tsx index 6012563..2b92bdd 100644 --- a/web/src/pages/events/EventsPage.tsx +++ b/web/src/pages/events/EventsPage.tsx @@ -17,65 +17,60 @@ export default function EventsPage() { // Set up WebSocket for real-time event updates with stable callback const handleEventNotification = useCallback( (notification: Notification) => { - // Extract event data from notification payload - if ( - notification.notification_type === "event_created" && - notification.payload - ) { - const eventData = (notification.payload as any).data; + // Extract event data from notification payload (flat structure) + if (notification.notification_type === "event_created") { + const payload = notification.payload as any; - if (eventData) { - // Create EventSummary from notification data - const newEvent: EventSummary = { - id: eventData.id, - trigger: eventData.trigger, - trigger_ref: eventData.trigger_ref, - rule: eventData.rule, - rule_ref: eventData.rule_ref, - source: eventData.source, - source_ref: eventData.source_ref, - has_payload: - eventData.payload !== null && eventData.payload !== undefined, - created: eventData.created, - }; + // Create EventSummary from notification data + const newEvent: EventSummary = { + id: payload.id, + trigger: payload.trigger, + trigger_ref: payload.trigger_ref, + rule: payload.rule, + rule_ref: payload.rule_ref, + source: payload.source, + source_ref: payload.source_ref, + has_payload: + payload.payload !== null && payload.payload !== undefined, + created: payload.created, + }; - // Update the query cache directly instead of invalidating - queryClient.setQueryData( - [ - "events", - { page, pageSize, triggerRef: triggerFilter || undefined }, - ], - (oldData: any) => { - if (!oldData) return oldData; + // Update the query cache directly instead of invalidating + queryClient.setQueryData( + [ + "events", + { page, pageSize, triggerRef: triggerFilter || undefined }, + ], + (oldData: any) => { + if (!oldData) return oldData; - // Check if filtering and event matches filter - if (triggerFilter && newEvent.trigger_ref !== triggerFilter) { - return oldData; - } + // Check if filtering and event matches filter + if (triggerFilter && newEvent.trigger_ref !== triggerFilter) { + return oldData; + } - // Add new event to the beginning of the list if on first page - if (page === 1) { - return { - ...oldData, - data: [newEvent, ...oldData.data].slice(0, pageSize), - pagination: { - ...oldData.pagination, - total_items: (oldData.pagination?.total_items || 0) + 1, - }, - }; - } - - // For other pages, just update the total count + // Add new event to the beginning of the list if on first page + if (page === 1) { return { ...oldData, + data: [newEvent, ...oldData.data].slice(0, pageSize), pagination: { ...oldData.pagination, total_items: (oldData.pagination?.total_items || 0) + 1, }, }; - }, - ); - } + } + + // For other pages, just update the total count + return { + ...oldData, + pagination: { + ...oldData.pagination, + total_items: (oldData.pagination?.total_items || 0) + 1, + }, + }; + }, + ); } }, [queryClient, page, pageSize, triggerFilter], diff --git a/work-summary/2026-02-05-pack-installation-consolidation.md b/work-summary/2026-02-05-pack-installation-consolidation.md new file mode 100644 index 0000000..5c998c8 --- /dev/null +++ b/work-summary/2026-02-05-pack-installation-consolidation.md @@ -0,0 +1,344 @@ +# Pack Installation Consolidation - Work Summary + +**Date:** 2026-02-05 +**Type:** Schema Simplification (Pre-Production) +**Status:** ✅ Complete + +--- + +## Overview + +Consolidated the separate `pack_installation` table into the `pack` table by adding nullable installation metadata columns. This simplifies the schema by eliminating an unnecessary 1:1 relationship table, reducing joins and making the data model more intuitive. + +--- + +## Problem Statement + +The `pack_installation` table tracked installation metadata (source type, URL, checksum, storage path, etc.) in a separate table with a 1:1 relationship to the `pack` table. This design: + +- Required joins to retrieve complete pack information +- Added unnecessary complexity to queries +- Created a separate repository layer for what was essentially pack metadata +- Had no use case for multiple installation records per pack + +The relationship was strictly 1:1 (one installation record per pack), making it a perfect candidate for denormalization. + +--- + +## Solution + +Merged installation metadata directly into the `pack` table as nullable columns. Packs that are not installed will have these fields as NULL. + +--- + +## Changes Made + +### 1. Database Migration (`migrations/20250101000002_pack_system.sql`) + +**Added columns to `pack` table:** +```sql +-- Installation metadata (nullable for non-installed packs) +source_type TEXT, +source_url TEXT, +source_ref TEXT, +checksum TEXT, +checksum_verified BOOLEAN DEFAULT FALSE, +installed_at TIMESTAMPTZ, +installed_by BIGINT, +installation_method TEXT, +storage_path TEXT, +``` + +**Added indexes:** +```sql +CREATE INDEX idx_pack_installed_at ON pack(installed_at DESC) WHERE installed_at IS NOT NULL; +CREATE INDEX idx_pack_installed_by ON pack(installed_by) WHERE installed_by IS NOT NULL; +CREATE INDEX idx_pack_source_type ON pack(source_type) WHERE source_type IS NOT NULL; +``` + +**Added foreign key constraint** (in `migrations/20250101000003_identity_and_auth.sql`): +```sql +ALTER TABLE pack + ADD CONSTRAINT fk_pack_installed_by + FOREIGN KEY (installed_by) + REFERENCES identity(id) + ON DELETE SET NULL; +``` + +### 2. Model Changes (`crates/common/src/models.rs`) + +**Updated `Pack` struct:** +- Added installation metadata fields (all `Option` types) +- Removed `PackInstallation` struct entirely +- Removed `CreatePackInstallation` struct + +**New fields in Pack:** +```rust +pub source_type: Option, +pub source_url: Option, +pub source_ref: Option, +pub checksum: Option, +pub checksum_verified: Option, +pub installed_at: Option>, +pub installed_by: Option, +pub installation_method: Option, +pub storage_path: Option, +``` + +### 3. Repository Changes + +**Removed:** +- `crates/common/src/repositories/pack_installation.rs` (entire file deleted) +- `PackInstallationRepository` from `repositories/mod.rs` + +**Updated `PackRepository` (`crates/common/src/repositories/pack.rs`):** + +**Added new method:** +```rust +pub async fn update_installation_metadata( + executor: E, + id: i64, + source_type: String, + source_url: Option, + source_ref: Option, + checksum: Option, + checksum_verified: bool, + installed_by: Option, + installation_method: String, + storage_path: String, +) -> Result +``` + +**Added helper methods:** +- `is_installed()` - Check if a pack has installation metadata +- `list_installed()` - List all installed packs +- `list_by_source_type()` - Filter packs by installation source + +**Updated all SELECT queries** to include installation fields in: +- `find_by_id()` +- `find_by_ref()` +- `list()` +- `create()` +- `update()` +- `list_paginated()` +- `find_by_tag()` +- `find_standard()` +- `search()` + +**Updated input structs:** +- Added `installers: JsonDict` to `CreatePackInput` +- Added `installers: Option` to `UpdatePackInput` + +### 4. API Route Changes (`crates/api/src/routes/packs.rs`) + +**Updated `install_pack` endpoint:** +- Removed `PackInstallationRepository::new()` and `create()` calls +- Replaced with direct call to `PackRepository::update_installation_metadata()` + +**Before:** +```rust +let installation_metadata = CreatePackInstallation { ... }; +installation_repo.create(installation_metadata).await?; +``` + +**After:** +```rust +PackRepository::update_installation_metadata( + &state.db, + pack_id, + source_type.to_string(), + source_url, + source_ref, + checksum.clone(), + installed.checksum.is_some() && checksum.is_some(), + user_id, + "api".to_string(), + final_path.to_string_lossy().to_string(), +).await?; +``` + +### 5. Test Updates + +**Updated all test files** to include `installers: json!({})` in `CreatePackInput`: +- `crates/api/tests/helpers.rs` +- `crates/api/tests/sse_execution_stream_tests.rs` +- `crates/api/tests/webhook_api_tests.rs` +- `crates/api/tests/webhook_security_tests.rs` +- `crates/api/tests/pack_registry_tests.rs` +- `crates/common/tests/helpers.rs` +- `crates/common/tests/pack_repository_tests.rs` +- `crates/common/tests/permission_repository_tests.rs` +- `crates/common/tests/repository_runtime_tests.rs` +- `crates/executor/tests/fifo_ordering_integration_test.rs` +- `crates/executor/tests/policy_enforcer_tests.rs` + +**Updated pack registry tests** to use `Pack` fields instead of `PackInstallation`: +```rust +// Before +let installation = installation_repo.get_by_pack_id(pack_id).await?; +assert_eq!(installation.source_type, "local_directory"); + +// After +let pack = PackRepository::find_by_id(&ctx.pool, pack_id).await?; +assert_eq!(pack.source_type.as_deref(), Some("local_directory")); +``` + +### 6. Migration Schema Fixes (Missing Columns) + +Fixed missing `is_adhoc` columns and rule table during migration consolidation: + +**Issues Found:** +- `action.is_adhoc` column was missing from migration +- `sensor.is_adhoc` column was missing from migration +- `rule` table was completely missing from migrations +- `ActionRepository::update()` missing `is_adhoc` in RETURNING clause + +**Fixes:** +- Added `is_adhoc BOOLEAN NOT NULL DEFAULT FALSE` to action table (migration 005) +- Added `is_adhoc BOOLEAN NOT NULL DEFAULT FALSE` to sensor table (migration 004) +- Created complete rule table with `is_adhoc` in migration 006 (after action exists) +- Added foreign key constraints for `enforcement.rule` and `event.rule` +- Updated `ActionRepository::update()` RETURNING clause to include `is_adhoc` +- Added proper indexes, triggers, and comments for all tables + +### 7. CLI Test Fixes (Unrelated but Fixed) + +Fixed failing `whoami` tests in `crates/cli/tests/test_auth.rs`: + +**Issue:** Mock endpoint path was `/auth/whoami` but actual API uses `/auth/me` + +**Fixes:** +- Updated `mock_whoami_success()` to use `/auth/me` path +- Fixed mock response structure to match `CurrentUserResponse` (removed extra fields) +- Changed test assertions from `"username"` to `"login"` +- Changed parameter from `email` to `display_name` + +--- + +## Breaking Changes + +**Note:** This is a pre-production change with no deployments or users, so breaking changes are acceptable. + +- Database schema change: `pack_installation` table removed, new columns added to `pack` +- Model API change: `PackInstallation` and `CreatePackInstallation` types removed +- Repository API change: `PackInstallationRepository` removed, new methods added to `PackRepository` + +--- + +## Benefits + +1. **Simpler Schema:** One less table to manage (17 tables instead of 18) +2. **No Joins Required:** All pack information available in a single query +3. **Clearer Data Model:** Installation is a property of a pack, not a separate entity +4. **Reduced Code:** Eliminated ~170 lines of repository code +5. **Better Performance:** Fewer joins, simpler queries, partial indexes on nullable fields + +--- + +## Migration Notes + +For users migrating from the old schema (when v1.0 releases): + +1. Drop and recreate database (acceptable since pre-production) +2. Run consolidated migrations from scratch +3. Reload pack data using pack installation API + +--- + +## Testing + +- ✅ All workspace compilation successful +- ✅ Pack registry tests updated and passing +- ✅ CLI auth tests fixed and passing +- ✅ No compilation warnings + +--- + +## Files Modified + +### Migrations (4 files) +- `migrations/20250101000002_pack_system.sql` - Added installation columns to pack table +- `migrations/20250101000003_identity_and_auth.sql` - Added foreign key constraint +- `migrations/20250101000004_trigger_sensor_event_rule.sql` - Added is_adhoc to sensor, added indexes/triggers/comments +- `migrations/20250101000005_action.sql` - Added is_adhoc to action, added indexes/triggers/comments +- `migrations/20250101000006_execution_system.sql` - Added complete rule table with is_adhoc and foreign key constraints + +### Models (1 file) +- `crates/common/src/models.rs` - Updated Pack struct, removed PackInstallation module + +### Repositories (3 files) +- `crates/common/src/repositories/pack.rs` - Added installation methods, updated all queries +- `crates/common/src/repositories/action.rs` - Fixed update() RETURNING clause to include is_adhoc +- `crates/common/src/repositories/mod.rs` - Removed PackInstallationRepository export +- **Deleted:** `crates/common/src/repositories/pack_installation.rs` + +### API Routes (1 file) +- `crates/api/src/routes/packs.rs` - Updated install_pack to use new repository method + +### Tests (14 files) +- `crates/api/tests/helpers.rs` +- `crates/api/tests/pack_registry_tests.rs` +- `crates/api/tests/sse_execution_stream_tests.rs` +- `crates/api/tests/webhook_api_tests.rs` +- `crates/api/tests/webhook_security_tests.rs` +- `crates/common/tests/helpers.rs` +- `crates/common/tests/pack_repository_tests.rs` +- `crates/common/tests/permission_repository_tests.rs` +- `crates/common/tests/repository_runtime_tests.rs` +- `crates/executor/tests/fifo_ordering_integration_test.rs` +- `crates/executor/tests/policy_enforcer_tests.rs` +- `crates/cli/tests/common/mod.rs` - Fixed whoami mock +- `crates/cli/tests/test_auth.rs` - Fixed whoami tests + +**Total:** 22 files modified, 1 file deleted + +--- + +## Verification Steps + +To verify the changes after dropping and recreating the database: + +```bash +# 1. Drop and recreate databases +make db-reset +make db-test-setup + +# 2. Run migrations +make db-migrate + +# 3. Verify schema +psql attune -c "\d pack" | grep -E "installed_at|source_type|storage_path" + +# 4. Run tests +cargo test --workspace + +# 5. Test pack installation via API +# (Start services and test via web UI or CLI) +``` + +--- + +## Related Documentation + +- Migration consolidation was completed on 2026-01-17 +- This change continues the pre-production schema refinement effort +- See `docs/migrations/CONSOLIDATION-COMPLETE.md` for full migration history + +--- + +## Lessons Learned + +1. **Migration consolidation requires careful verification** - Missing tables/columns can slip through when consolidating migrations +2. **Test everything after schema changes** - Running repository tests revealed missing is_adhoc columns +3. **Dependency ordering matters** - Rule table needed to be in migration 006 after action table (migration 005) +4. **RETURNING clauses must be complete** - All model fields must be included in UPDATE...RETURNING queries + +## Next Steps + +1. ✅ Drop existing databases and re-run migrations +2. ✅ Fix missing is_adhoc columns in migrations +3. ✅ Add missing rule table to migrations +4. ✅ Verify all repository tests pass +5. Test pack installation workflow end-to-end +6. Update any documentation referencing `pack_installation` table +7. Consider similar consolidations for other 1:1 relationships if any exist \ No newline at end of file diff --git a/work-summary/sessions/2025-01-30-schema-config-fix.md b/work-summary/sessions/2025-01-30-schema-config-fix.md index bcc914e..17660d8 100644 --- a/work-summary/sessions/2025-01-30-schema-config-fix.md +++ b/work-summary/sessions/2025-01-30-schema-config-fix.md @@ -102,7 +102,6 @@ Configuration: • Database schema: public # Services now show: -WARN Using non-standard schema: 'public'. Production should use 'attune' INFO Connecting to database with max_connections=50, schema=public ``` @@ -193,4 +192,4 @@ Consider adding: 1. Validation warning if config file doesn't exist 2. Display full resolved configuration on startup (with sensitive values masked) 3. Config validation command: `attune config validate --file config.yaml` -4. Environment variable to completely disable environment-specific config loading \ No newline at end of file +4. Environment variable to completely disable environment-specific config loading