diff --git a/crates/common/src/repositories/event.rs b/crates/common/src/repositories/event.rs index 0344b20..086a031 100644 --- a/crates/common/src/repositories/event.rs +++ b/crates/common/src/repositories/event.rs @@ -416,8 +416,43 @@ impl Update for EnforcementRepository { where E: Executor<'e, Database = Postgres> + 'e, { - // Build update query + if input.status.is_none() && input.payload.is_none() && input.resolved_at.is_none() { + return Self::get_by_id(executor, id).await; + } + Self::update_with_locator(executor, input, |query| { + query.push(" WHERE id = "); + query.push_bind(id); + }) + .await + } +} + +#[async_trait::async_trait] +impl Delete for EnforcementRepository { + async fn delete<'e, E>(executor: E, id: i64) -> Result + where + E: Executor<'e, Database = Postgres> + 'e, + { + let result = sqlx::query("DELETE FROM enforcement WHERE id = $1") + .bind(id) + .execute(executor) + .await?; + + Ok(result.rows_affected() > 0) + } +} + +impl EnforcementRepository { + async fn update_with_locator<'e, E, F>( + executor: E, + input: UpdateEnforcementInput, + where_clause: F, + ) -> Result + where + E: Executor<'e, Database = Postgres> + 'e, + F: FnOnce(&mut QueryBuilder<'_, Postgres>), + { let mut query = QueryBuilder::new("UPDATE enforcement SET "); let mut has_updates = false; @@ -442,17 +477,13 @@ impl Update for EnforcementRepository { } query.push("resolved_at = "); query.push_bind(resolved_at); - has_updates = true; } - if !has_updates { - // No updates requested, fetch and return existing entity - return Self::get_by_id(executor, id).await; - } - - query.push(" WHERE id = "); - query.push_bind(id); - query.push(" RETURNING id, rule, rule_ref, trigger_ref, config, event, status, payload, condition, conditions, created, resolved_at"); + where_clause(&mut query); + query.push( + " RETURNING id, rule, rule_ref, trigger_ref, config, event, status, payload, \ + condition, conditions, created, resolved_at", + ); let enforcement = query .build_query_as::() @@ -461,24 +492,37 @@ impl Update for EnforcementRepository { Ok(enforcement) } -} -#[async_trait::async_trait] -impl Delete for EnforcementRepository { - async fn delete<'e, E>(executor: E, id: i64) -> Result + /// Update an enforcement using the loaded row's hypertable keys. + /// + /// This avoids wide scans across compressed chunks by including both the + /// partitioning column (`created`) and compression segment key (`rule_ref`) + /// in the locator. + pub async fn update_loaded<'e, E>( + executor: E, + enforcement: &Enforcement, + input: UpdateEnforcementInput, + ) -> Result where E: Executor<'e, Database = Postgres> + 'e, { - let result = sqlx::query("DELETE FROM enforcement WHERE id = $1") - .bind(id) - .execute(executor) - .await?; + if input.status.is_none() && input.payload.is_none() && input.resolved_at.is_none() { + return Ok(enforcement.clone()); + } - Ok(result.rows_affected() > 0) + let rule_ref = enforcement.rule_ref.clone(); + + Self::update_with_locator(executor, input, |query| { + query.push(" WHERE id = "); + query.push_bind(enforcement.id); + query.push(" AND created = "); + query.push_bind(enforcement.created); + query.push(" AND rule_ref = "); + query.push_bind(rule_ref); + }) + .await } -} -impl EnforcementRepository { /// Find enforcements by rule ID pub async fn find_by_rule<'e, E>(executor: E, rule_id: Id) -> Result> where diff --git a/crates/common/src/repositories/execution.rs b/crates/common/src/repositories/execution.rs index 0dba547..574696f 100644 --- a/crates/common/src/repositories/execution.rs +++ b/crates/common/src/repositories/execution.rs @@ -191,7 +191,33 @@ impl Update for ExecutionRepository { where E: Executor<'e, Database = Postgres> + 'e, { - // Build update query + if input.status.is_none() + && input.result.is_none() + && input.executor.is_none() + && input.worker.is_none() + && input.started_at.is_none() + && input.workflow_task.is_none() + { + return Self::get_by_id(executor, id).await; + } + + Self::update_with_locator(executor, input, |query| { + query.push(" WHERE id = ").push_bind(id); + }) + .await + } +} + +impl ExecutionRepository { + async fn update_with_locator<'e, E, F>( + executor: E, + input: UpdateExecutionInput, + where_clause: F, + ) -> Result + where + E: Executor<'e, Database = Postgres> + 'e, + F: FnOnce(&mut QueryBuilder<'_, Postgres>), + { let mut query = QueryBuilder::new("UPDATE execution SET "); let mut has_updates = false; @@ -234,15 +260,10 @@ impl Update for ExecutionRepository { query .push("workflow_task = ") .push_bind(sqlx::types::Json(workflow_task)); - has_updates = true; } - if !has_updates { - // No updates requested, fetch and return existing entity - return Self::get_by_id(executor, id).await; - } - - query.push(", updated = NOW() WHERE id = ").push_bind(id); + query.push(", updated = NOW()"); + where_clause(&mut query); query.push(" RETURNING "); query.push(SELECT_COLUMNS); @@ -252,6 +273,38 @@ impl Update for ExecutionRepository { .await .map_err(Into::into) } + + /// Update an execution using the loaded row's hypertable keys. + /// + /// Including both the partition key (`created`) and compression segment key + /// (`action_ref`) avoids broad scans across compressed chunks. + pub async fn update_loaded<'e, E>( + executor: E, + execution: &Execution, + input: UpdateExecutionInput, + ) -> Result + where + E: Executor<'e, Database = Postgres> + 'e, + { + if input.status.is_none() + && input.result.is_none() + && input.executor.is_none() + && input.worker.is_none() + && input.started_at.is_none() + && input.workflow_task.is_none() + { + return Ok(execution.clone()); + } + + let action_ref = execution.action_ref.clone(); + + Self::update_with_locator(executor, input, |query| { + query.push(" WHERE id = ").push_bind(execution.id); + query.push(" AND created = ").push_bind(execution.created); + query.push(" AND action_ref = ").push_bind(action_ref); + }) + .await + } } #[async_trait::async_trait] diff --git a/crates/common/tests/enforcement_repository_tests.rs b/crates/common/tests/enforcement_repository_tests.rs index eed7337..38cdeb8 100644 --- a/crates/common/tests/enforcement_repository_tests.rs +++ b/crates/common/tests/enforcement_repository_tests.rs @@ -1430,3 +1430,70 @@ async fn test_enforcement_resolved_at_lifecycle() { assert!(updated.resolved_at.is_some()); assert!(updated.resolved_at.unwrap() >= enforcement.created); } + +#[tokio::test] +#[ignore = "integration test — requires database"] +async fn test_update_loaded_enforcement_uses_loaded_locator() { + let pool = create_test_pool().await.unwrap(); + + let pack = PackFixture::new_unique("targeted_update_pack") + .create(&pool) + .await + .unwrap(); + + let trigger = TriggerFixture::new_unique(Some(pack.id), Some(pack.r#ref.clone()), "webhook") + .create(&pool) + .await + .unwrap(); + + let action = ActionFixture::new_unique(pack.id, &pack.r#ref, "action") + .create(&pool) + .await + .unwrap(); + + use attune_common::repositories::rule::{CreateRuleInput, RuleRepository}; + let rule = RuleRepository::create( + &pool, + CreateRuleInput { + r#ref: format!("{}.test_rule", pack.r#ref), + pack: pack.id, + pack_ref: pack.r#ref.clone(), + label: "Test Rule".to_string(), + description: Some("Test".to_string()), + action: action.id, + action_ref: action.r#ref.clone(), + trigger: trigger.id, + trigger_ref: trigger.r#ref.clone(), + conditions: json!({}), + action_params: json!({}), + trigger_params: json!({}), + enabled: true, + is_adhoc: false, + }, + ) + .await + .unwrap(); + + let enforcement = EnforcementFixture::new_unique(Some(rule.id), &rule.r#ref, &trigger.r#ref) + .create(&pool) + .await + .unwrap(); + + let updated = EnforcementRepository::update_loaded( + &pool, + &enforcement, + UpdateEnforcementInput { + status: Some(EnforcementStatus::Processed), + payload: None, + resolved_at: Some(chrono::Utc::now()), + }, + ) + .await + .unwrap(); + + assert_eq!(updated.id, enforcement.id); + assert_eq!(updated.created, enforcement.created); + assert_eq!(updated.rule_ref, enforcement.rule_ref); + assert_eq!(updated.status, EnforcementStatus::Processed); + assert!(updated.resolved_at.is_some()); +} diff --git a/crates/executor/src/completion_listener.rs b/crates/executor/src/completion_listener.rs index 92da0a6..252bc83 100644 --- a/crates/executor/src/completion_listener.rs +++ b/crates/executor/src/completion_listener.rs @@ -304,7 +304,7 @@ mod tests { use crate::queue_manager::ExecutionQueueManager; #[tokio::test] - async fn test_notify_completion_releases_slot() { + async fn test_release_active_slot_releases_slot() { let queue_manager = Arc::new(ExecutionQueueManager::with_defaults()); let action_id = 1; @@ -320,8 +320,9 @@ mod tests { assert_eq!(stats.queue_length, 0); // Simulate completion notification - let notified = queue_manager.notify_completion(100).await.unwrap(); - assert!(!notified); // No one waiting + let release = queue_manager.release_active_slot(100).await.unwrap(); + assert!(release.is_some()); + assert_eq!(release.unwrap().next_execution_id, None); // Verify slot is released let stats = queue_manager.get_queue_stats(action_id).await.unwrap(); @@ -329,7 +330,7 @@ mod tests { } #[tokio::test] - async fn test_notify_completion_wakes_waiting() { + async fn test_release_active_slot_wakes_waiting() { let queue_manager = Arc::new(ExecutionQueueManager::with_defaults()); let action_id = 1; @@ -357,8 +358,8 @@ mod tests { assert_eq!(stats.queue_length, 1); // Notify completion - let notified = queue_manager.notify_completion(100).await.unwrap(); - assert!(notified); // Should wake the waiting execution + let release = queue_manager.release_active_slot(100).await.unwrap(); + assert_eq!(release.unwrap().next_execution_id, Some(101)); // Wait for queued execution to proceed handle.await.unwrap(); @@ -406,7 +407,11 @@ mod tests { // Release them one by one for execution_id in 100..103 { tokio::time::sleep(tokio::time::Duration::from_millis(50)).await; - queue_manager.notify_completion(execution_id).await.unwrap(); + let release = queue_manager + .release_active_slot(execution_id) + .await + .unwrap(); + assert!(release.is_some()); } // Wait for all to complete @@ -425,8 +430,8 @@ mod tests { let execution_id = 999; // Non-existent execution // Should succeed but not notify anyone - let result = queue_manager.notify_completion(execution_id).await; + let result = queue_manager.release_active_slot(execution_id).await; assert!(result.is_ok()); - assert!(!result.unwrap()); + assert!(result.unwrap().is_none()); } } diff --git a/crates/executor/src/enforcement_processor.rs b/crates/executor/src/enforcement_processor.rs index 7b5cfb6..abfbe1d 100644 --- a/crates/executor/src/enforcement_processor.rs +++ b/crates/executor/src/enforcement_processor.rs @@ -19,7 +19,7 @@ use attune_common::{ event::{EnforcementRepository, EventRepository, UpdateEnforcementInput}, execution::{CreateExecutionInput, ExecutionRepository}, rule::RuleRepository, - Create, FindById, Update, + Create, FindById, }, }; @@ -146,9 +146,9 @@ impl EnforcementProcessor { .await?; // Update enforcement status to Processed after successful execution creation - EnforcementRepository::update( + EnforcementRepository::update_loaded( pool, - enforcement_id, + &enforcement, UpdateEnforcementInput { status: Some(EnforcementStatus::Processed), payload: None, @@ -165,9 +165,9 @@ impl EnforcementProcessor { ); // Update enforcement status to Disabled since it was not actionable - EnforcementRepository::update( + EnforcementRepository::update_loaded( pool, - enforcement_id, + &enforcement, UpdateEnforcementInput { status: Some(EnforcementStatus::Disabled), payload: None, diff --git a/crates/executor/src/policy_enforcer.rs b/crates/executor/src/policy_enforcer.rs index dc4b82f..56e9c6f 100644 --- a/crates/executor/src/policy_enforcer.rs +++ b/crates/executor/src/policy_enforcer.rs @@ -432,106 +432,6 @@ impl PolicyEnforcer { self.global_policy.concurrency_limit } - /// Enforce policies and wait in queue if necessary - /// - /// This method combines policy checking with queue management to ensure: - /// 1. Policy violations are detected early - /// 2. FIFO ordering is maintained when capacity is limited - /// 3. Executions wait efficiently for available slots - /// - /// # Arguments - /// * `action_id` - The action to execute - /// * `pack_id` - The pack containing the action - /// * `execution_id` - The execution/enforcement ID for queue tracking - /// - /// # Returns - /// * `Ok(())` - Policy allows execution and queue slot obtained - /// * `Err(PolicyViolation)` - Policy prevents execution - /// * `Err(QueueError)` - Queue timeout or other queue error - #[allow(dead_code)] - pub async fn enforce_and_wait( - &self, - action_id: Id, - pack_id: Option, - execution_id: Id, - config: Option<&JsonValue>, - ) -> Result<()> { - // First, check for policy violations (rate limit, quotas, etc.) - // Note: We skip concurrency check here since queue manages that - if let Some(violation) = self - .check_policies_except_concurrency(action_id, pack_id) - .await? - { - warn!("Policy violation for action {}: {}", action_id, violation); - return Err(anyhow::anyhow!("Policy violation: {}", violation)); - } - - // If queue manager is available, use it for concurrency control - if let Some(concurrency) = self.resolve_concurrency_policy(action_id, pack_id).await? { - let group_key = self.build_parameter_group_key(&concurrency.parameters, config); - - if let Some(queue_manager) = &self.queue_manager { - debug!( - "Applying concurrency policy to execution {} for action {} (limit: {}, method: {:?}, group: {:?})", - execution_id, action_id, concurrency.limit, concurrency.method, group_key - ); - - match concurrency.method { - PolicyMethod::Enqueue => { - queue_manager - .enqueue_and_wait( - action_id, - execution_id, - concurrency.limit, - group_key.clone(), - ) - .await?; - } - PolicyMethod::Cancel => { - let outcome = queue_manager - .try_acquire( - action_id, - execution_id, - concurrency.limit, - group_key.clone(), - ) - .await?; - - if !outcome.acquired { - let violation = PolicyViolation::ConcurrencyLimitExceeded { - limit: concurrency.limit, - current_count: outcome.current_count, - }; - warn!("Policy violation for action {}: {}", action_id, violation); - return Err(anyhow::anyhow!("Policy violation: {}", violation)); - } - } - } - - info!( - "Execution {} obtained queue slot for action {} (group: {:?})", - execution_id, action_id, group_key - ); - } else { - // No queue manager - use legacy polling behavior - debug!( - "No queue manager configured, using legacy policy wait for action {}", - action_id - ); - - let scope = PolicyScope::Action(action_id); - if let Some(violation) = self - .check_concurrency_limit(concurrency.limit, &scope) - .await? - { - return Err(anyhow::anyhow!("Policy violation: {}", violation)); - } - } - } - - Ok(()) - } - /// Check policies except concurrency (which is handled by queue) async fn check_policies_except_concurrency( &self, @@ -899,8 +799,6 @@ fn extract_parameter_value(config: Option<&JsonValue>, path: &str) -> JsonValue #[cfg(test)] mod tests { use super::*; - use crate::queue_manager::QueueConfig; - use tokio::time::{sleep, Duration}; #[test] fn test_policy_violation_display() { @@ -1035,138 +933,6 @@ mod tests { assert_eq!(enforcer.get_concurrency_limit(2, Some(200)), Some(20)); } - #[tokio::test] - async fn test_enforce_and_wait_with_queue_manager() { - let pool = sqlx::PgPool::connect_lazy("postgresql://localhost/test").unwrap(); - let queue_manager = Arc::new(ExecutionQueueManager::with_defaults()); - let mut enforcer = PolicyEnforcer::with_queue_manager(pool, queue_manager.clone()); - - // Set concurrency limit - enforcer.set_action_policy( - 1, - ExecutionPolicy { - concurrency_limit: Some(1), - ..Default::default() - }, - ); - - // First execution should proceed immediately - let result = enforcer.enforce_and_wait(1, None, 100, None).await; - assert!(result.is_ok()); - - // Check queue stats - let stats = queue_manager.get_queue_stats(1).await.unwrap(); - assert_eq!(stats.active_count, 1); - assert_eq!(stats.queue_length, 0); - } - - #[tokio::test] - async fn test_enforce_and_wait_fifo_ordering() { - let pool = sqlx::PgPool::connect_lazy("postgresql://localhost/test").unwrap(); - let queue_manager = Arc::new(ExecutionQueueManager::with_defaults()); - let mut enforcer = PolicyEnforcer::with_queue_manager(pool, queue_manager.clone()); - - enforcer.set_action_policy( - 1, - ExecutionPolicy { - concurrency_limit: Some(1), - ..Default::default() - }, - ); - let enforcer = Arc::new(enforcer); - - // First execution - let result = enforcer.enforce_and_wait(1, None, 100, None).await; - assert!(result.is_ok()); - - // Queue multiple executions - let execution_order = Arc::new(tokio::sync::Mutex::new(Vec::new())); - let mut handles = vec![]; - - for exec_id in 101..=103 { - let enforcer = enforcer.clone(); - let queue_manager = queue_manager.clone(); - let order = execution_order.clone(); - - let handle = tokio::spawn(async move { - enforcer - .enforce_and_wait(1, None, exec_id, None) - .await - .unwrap(); - order.lock().await.push(exec_id); - // Simulate work - sleep(Duration::from_millis(10)).await; - queue_manager.notify_completion(exec_id).await.unwrap(); - }); - - handles.push(handle); - } - - // Give tasks time to queue - sleep(Duration::from_millis(100)).await; - - // Release first execution - queue_manager.notify_completion(100).await.unwrap(); - - // Wait for all - for handle in handles { - handle.await.unwrap(); - } - - // Verify FIFO order - let order = execution_order.lock().await; - assert_eq!(*order, vec![101, 102, 103]); - } - - #[tokio::test] - async fn test_enforce_and_wait_without_queue_manager() { - let pool = sqlx::PgPool::connect_lazy("postgresql://localhost/test").unwrap(); - let mut enforcer = PolicyEnforcer::new(pool); - - // Set unlimited concurrency - enforcer.set_action_policy( - 1, - ExecutionPolicy { - concurrency_limit: None, - ..Default::default() - }, - ); - - // Should work without queue manager (legacy behavior) - let result = enforcer.enforce_and_wait(1, None, 100, None).await; - assert!(result.is_ok()); - } - - #[tokio::test] - async fn test_enforce_and_wait_queue_timeout() { - let config = QueueConfig { - max_queue_length: 100, - queue_timeout_seconds: 1, // Short timeout for test - enable_metrics: true, - }; - - let pool = sqlx::PgPool::connect_lazy("postgresql://localhost/test").unwrap(); - let queue_manager = Arc::new(ExecutionQueueManager::new(config)); - let mut enforcer = PolicyEnforcer::with_queue_manager(pool, queue_manager.clone()); - - // Set concurrency limit - enforcer.set_action_policy( - 1, - ExecutionPolicy { - concurrency_limit: Some(1), - ..Default::default() - }, - ); - - // First execution proceeds - enforcer.enforce_and_wait(1, None, 100, None).await.unwrap(); - - // Second execution should timeout - let result = enforcer.enforce_and_wait(1, None, 101, None).await; - assert!(result.is_err()); - assert!(result.unwrap_err().to_string().contains("timeout")); - } - #[test] fn test_build_parameter_group_key_uses_exact_values() { let pool = sqlx::PgPool::connect_lazy("postgresql://localhost/test").unwrap(); diff --git a/crates/executor/src/queue_manager.rs b/crates/executor/src/queue_manager.rs index 741b390..23642e1 100644 --- a/crates/executor/src/queue_manager.rs +++ b/crates/executor/src/queue_manager.rs @@ -18,8 +18,8 @@ use serde::{Deserialize, Serialize}; use sqlx::PgPool; use std::collections::VecDeque; use std::sync::Arc; -use tokio::sync::{Mutex, Notify}; -use tokio::time::{timeout, Duration}; +use tokio::sync::Mutex; +use tokio::time::{sleep, Duration, Instant}; use tracing::{debug, info, warn}; use attune_common::models::Id; @@ -53,8 +53,6 @@ struct QueueEntry { execution_id: Id, /// When this entry was added to the queue enqueued_at: DateTime, - /// Notifier to wake up this specific waiter - notifier: Arc, } #[derive(Debug, Clone, PartialEq, Eq, Hash)] @@ -244,9 +242,6 @@ impl ExecutionQueueManager { .get_or_create_queue(queue_key.clone(), max_concurrent) .await; - // Create notifier for this execution - let notifier = Arc::new(Notify::new()); - // Try to enqueue { let mut queue = queue_arc.lock().await; @@ -276,7 +271,7 @@ impl ExecutionQueueManager { } // Check if we can run immediately - if queue.has_capacity() { + if queue.has_capacity() && queue.queue.is_empty() { debug!( "Execution {} can run immediately for action {} (active: {}/{}, group: {:?})", execution_id, @@ -317,7 +312,6 @@ impl ExecutionQueueManager { let entry = QueueEntry { execution_id, enqueued_at: Utc::now(), - notifier: notifier.clone(), }; queue.queue.push_back(entry); @@ -337,16 +331,40 @@ impl ExecutionQueueManager { // Persist stats to database if available self.persist_queue_stats(action_id).await; - // Wait for notification with timeout - let wait_duration = Duration::from_secs(self.config.queue_timeout_seconds); + // Wait until this execution reaches the front of the queue and can + // activate itself. Production code uses non-blocking queue advancement; + // this blocking helper exists mainly for tests and legacy call sites. + let deadline = Instant::now() + Duration::from_secs(self.config.queue_timeout_seconds); + loop { + { + let mut queue = queue_arc.lock().await; + let queued_index = queue + .queue + .iter() + .position(|entry| entry.execution_id == execution_id); - match timeout(wait_duration, notifier.notified()).await { - Ok(_) => { - debug!("Execution {} notified, can proceed", execution_id); - Ok(()) + if let Some(0) = queued_index { + if queue.has_capacity() { + let entry = queue.queue.pop_front().expect("front entry just checked"); + queue.active_count += 1; + self.active_execution_keys + .insert(entry.execution_id, queue_key.clone()); + drop(queue); + self.persist_queue_stats(action_id).await; + debug!( + "Execution {} reached front of queue and can proceed", + execution_id + ); + return Ok(()); + } + } else if queued_index.is_none() + && self.active_execution_keys.contains_key(&execution_id) + { + return Ok(()); + } } - Err(_) => { - // Timeout - remove from queue + + if Instant::now() >= deadline { let mut queue = queue_arc.lock().await; queue.queue.retain(|e| e.execution_id != execution_id); @@ -355,12 +373,14 @@ impl ExecutionQueueManager { execution_id, self.config.queue_timeout_seconds ); - Err(anyhow::anyhow!( + return Err(anyhow::anyhow!( "Queue timeout for execution {}: waited {} seconds", execution_id, self.config.queue_timeout_seconds - )) + )); } + + sleep(Duration::from_millis(10)).await; } } @@ -415,7 +435,7 @@ impl ExecutionQueueManager { return Ok(SlotEnqueueOutcome::Enqueued); } - if queue.has_capacity() { + if queue.has_capacity() && queue.queue.is_empty() { queue.active_count += 1; queue.total_enqueued += 1; self.active_execution_keys @@ -444,7 +464,6 @@ impl ExecutionQueueManager { queue.queue.push_back(QueueEntry { execution_id, enqueued_at: Utc::now(), - notifier: Arc::new(Notify::new()), }); queue.total_enqueued += 1; } @@ -481,7 +500,7 @@ impl ExecutionQueueManager { }); } - if queue.has_capacity() { + if queue.has_capacity() && queue.queue.is_empty() { queue.active_count += 1; queue.total_enqueued += 1; self.active_execution_keys @@ -507,39 +526,6 @@ impl ExecutionQueueManager { /// 2. Check if there are queued executions /// 3. Notify the first (oldest) queued execution /// 4. Increment active count for the notified execution - /// - /// # Arguments - /// * `execution_id` - The execution that completed - /// - /// # Returns - /// * `Ok(true)` - A queued execution was notified - /// * `Ok(false)` - No executions were waiting - /// * `Err(_)` - Error accessing queue - pub async fn notify_completion(&self, execution_id: Id) -> Result { - Ok(self - .notify_completion_with_next(execution_id) - .await? - .is_some()) - } - - pub async fn notify_completion_with_next(&self, execution_id: Id) -> Result> { - let release = match self.release_active_slot(execution_id).await? { - Some(release) => release, - None => return Ok(None), - }; - - let Some(next_execution_id) = release.next_execution_id else { - return Ok(None); - }; - - if self.activate_queued_execution(next_execution_id).await? { - Ok(Some(next_execution_id)) - } else { - self.restore_active_slot(execution_id, &release).await?; - Ok(None) - } - } - pub async fn release_active_slot( &self, execution_id: Id, @@ -602,6 +588,7 @@ impl ExecutionQueueManager { } let next_execution_id = queue.queue.front().map(|entry| entry.execution_id); + if let Some(next_execution_id) = next_execution_id { info!( "Execution {} is next for action {} group {:?}", @@ -639,45 +626,6 @@ impl ExecutionQueueManager { Ok(()) } - pub async fn activate_queued_execution(&self, execution_id: Id) -> Result { - for entry in self.queues.iter() { - let queue_key = entry.key().clone(); - let queue_arc = entry.value().clone(); - let mut queue = queue_arc.lock().await; - - let Some(front) = queue.queue.front() else { - continue; - }; - - if front.execution_id != execution_id { - continue; - } - - if !queue.has_capacity() { - return Ok(false); - } - - let entry = queue.queue.pop_front().expect("front entry just checked"); - info!( - "Activating queued execution {} for action {} group {:?} (queued for {:?})", - entry.execution_id, - queue_key.action_id, - queue_key.group_key, - Utc::now() - entry.enqueued_at - ); - queue.active_count += 1; - self.active_execution_keys - .insert(entry.execution_id, queue_key.clone()); - - drop(queue); - entry.notifier.notify_one(); - self.persist_queue_stats(queue_key.action_id).await; - return Ok(true); - } - - Ok(false) - } - pub async fn remove_queued_execution( &self, execution_id: Id, @@ -787,7 +735,7 @@ impl ExecutionQueueManager { total_enqueued += queue.total_enqueued; total_completed += queue.total_completed; - if let Some(candidate) = queue.queue.front().map(|e| e.enqueued_at) { + if let Some(candidate) = queue.queue.front().map(|entry| entry.enqueued_at) { oldest_enqueued_at = Some(match oldest_enqueued_at { Some(current) => current.min(candidate), None => candidate, @@ -854,8 +802,12 @@ impl ExecutionQueueManager { for queue_arc in queue_arcs { let mut queue = queue_arc.lock().await; let initial_len = queue.queue.len(); - queue.queue.retain(|e| e.execution_id != execution_id); + queue + .queue + .retain(|entry| entry.execution_id != execution_id); if initial_len != queue.queue.len() { + drop(queue); + self.persist_queue_stats(action_id).await; info!("Cancelled execution {} from queue", execution_id); return Ok(true); } @@ -961,7 +913,7 @@ mod tests { // Release them one by one for execution_id in 100..103 { sleep(Duration::from_millis(50)).await; - manager.notify_completion(execution_id).await.unwrap(); + manager.release_active_slot(execution_id).await.unwrap(); } // Wait for all to complete @@ -1005,8 +957,9 @@ mod tests { assert_eq!(stats.active_count, 1); // Notify completion - let notified = manager_clone.notify_completion(100).await.unwrap(); - assert!(notified); + let release = manager_clone.release_active_slot(100).await.unwrap(); + assert!(release.is_some()); + assert_eq!(release.unwrap().next_execution_id, Some(101)); // Wait for queued execution to proceed handle.await.unwrap(); @@ -1033,7 +986,7 @@ mod tests { assert_eq!(stats2.active_count, 1); // Completion on action 1 shouldn't affect action 2 - manager.notify_completion(100).await.unwrap(); + manager.release_active_slot(100).await.unwrap(); let stats1 = manager.get_queue_stats(1).await.unwrap(); let stats2 = manager.get_queue_stats(2).await.unwrap(); @@ -1042,6 +995,51 @@ mod tests { assert_eq!(stats2.active_count, 1); } + #[tokio::test] + async fn test_release_reserves_front_of_queue_before_new_enqueues() { + let manager = Arc::new(ExecutionQueueManager::with_defaults()); + let action_id = 1; + + manager + .enqueue_and_wait(action_id, 100, 1, None) + .await + .unwrap(); + + let manager_clone = manager.clone(); + let waiting = tokio::spawn(async move { + manager_clone + .enqueue_and_wait(action_id, 101, 1, None) + .await + .unwrap(); + }); + + sleep(Duration::from_millis(100)).await; + + let release = manager.release_active_slot(100).await.unwrap().unwrap(); + assert_eq!(release.next_execution_id, Some(101)); + + let enqueue_outcome = manager.enqueue(action_id, 102, 1, None).await.unwrap(); + assert_eq!(enqueue_outcome, SlotEnqueueOutcome::Enqueued); + + let stats = manager.get_queue_stats(action_id).await.unwrap(); + assert_eq!(stats.active_count, 0); + assert_eq!(stats.queue_length, 2); + + assert_eq!( + manager.enqueue(action_id, 101, 1, None).await.unwrap(), + SlotEnqueueOutcome::Acquired + ); + let stats = manager.get_queue_stats(action_id).await.unwrap(); + assert_eq!(stats.active_count, 1); + assert_eq!(stats.queue_length, 1); + + let stats = manager.get_queue_stats(action_id).await.unwrap(); + assert_eq!(stats.active_count, 1); + assert_eq!(stats.queue_length, 1); + + waiting.await.unwrap(); + } + #[tokio::test] async fn test_grouped_queues_are_independent() { let manager = ExecutionQueueManager::with_defaults(); @@ -1199,7 +1197,7 @@ mod tests { // Release them all for execution_id in 0..num_executions { sleep(Duration::from_millis(10)).await; - manager.notify_completion(execution_id).await.unwrap(); + manager.release_active_slot(execution_id).await.unwrap(); } // Wait for completion diff --git a/crates/executor/src/scheduler.rs b/crates/executor/src/scheduler.rs index 62af490..bead498 100644 --- a/crates/executor/src/scheduler.rs +++ b/crates/executor/src/scheduler.rs @@ -357,9 +357,12 @@ impl ExecutionScheduler { let mut execution_for_update = execution; execution_for_update.status = ExecutionStatus::Scheduled; execution_for_update.worker = Some(worker.id); - if let Err(err) = - ExecutionRepository::update(pool, execution_for_update.id, execution_for_update.into()) - .await + if let Err(err) = ExecutionRepository::update_loaded( + pool, + &execution_for_update, + execution_for_update.clone().into(), + ) + .await { Self::release_acquired_policy_slot(policy_enforcer, pool, publisher, execution_id) .await?; @@ -480,7 +483,8 @@ impl ExecutionScheduler { // Mark the parent execution as Running let mut running_exec = execution.clone(); running_exec.status = ExecutionStatus::Running; - ExecutionRepository::update(pool, running_exec.id, running_exec.into()).await?; + ExecutionRepository::update_loaded(pool, &running_exec, running_exec.clone().into()) + .await?; if graph.entry_points.is_empty() { warn!( diff --git a/crates/executor/tests/fifo_ordering_integration_test.rs b/crates/executor/tests/fifo_ordering_integration_test.rs index f3c387a..5023234 100644 --- a/crates/executor/tests/fifo_ordering_integration_test.rs +++ b/crates/executor/tests/fifo_ordering_integration_test.rs @@ -26,6 +26,7 @@ use attune_executor::queue_manager::{ExecutionQueueManager, QueueConfig}; use chrono::Utc; use serde_json::json; use sqlx::PgPool; +use std::collections::VecDeque; use std::sync::Arc; use std::time::Duration; use tokio::sync::Mutex; @@ -172,6 +173,26 @@ async fn cleanup_test_data(pool: &PgPool, pack_id: i64) { .ok(); } +async fn release_next_active( + manager: &ExecutionQueueManager, + active_execution_ids: &mut VecDeque, +) -> Option { + let execution_id = active_execution_ids + .pop_front() + .expect("Expected an active execution to release"); + let release = manager + .release_active_slot(execution_id) + .await + .expect("Release should succeed") + .expect("Active execution should have a tracked slot"); + + if let Some(next_execution_id) = release.next_execution_id { + active_execution_ids.push_back(next_execution_id); + } + + release.next_execution_id +} + #[tokio::test] #[ignore] // Requires database async fn test_fifo_ordering_with_database() { @@ -198,6 +219,7 @@ async fn test_fifo_ordering_with_database() { // Create first execution in database and enqueue let first_exec_id = create_test_execution(&pool, action_id, &action_ref, ExecutionStatus::Requested).await; + let mut active_execution_ids = VecDeque::from([first_exec_id]); manager .enqueue_and_wait(action_id, first_exec_id, max_concurrent, None) .await @@ -250,10 +272,7 @@ async fn test_fifo_ordering_with_database() { // Release them one by one for _ in 0..num_executions { sleep(Duration::from_millis(50)).await; - manager - .notify_completion(action_id) - .await - .expect("Notify should succeed"); + release_next_active(&manager, &mut active_execution_ids).await; } // Wait for all to complete @@ -295,6 +314,7 @@ async fn test_high_concurrency_stress() { let num_executions: i64 = 1000; let execution_order = Arc::new(Mutex::new(Vec::new())); let mut handles = vec![]; + let execution_ids = Arc::new(Mutex::new(vec![None; num_executions as usize])); println!("Starting stress test with {} executions...", num_executions); let start_time = std::time::Instant::now(); @@ -305,6 +325,7 @@ async fn test_high_concurrency_stress() { let manager_clone = manager.clone(); let action_ref_clone = action_ref.clone(); let order = execution_order.clone(); + let ids = execution_ids.clone(); let handle = tokio::spawn(async move { let exec_id = create_test_execution( @@ -314,6 +335,7 @@ async fn test_high_concurrency_stress() { ExecutionStatus::Requested, ) .await; + ids.lock().await[i as usize] = Some(exec_id); manager_clone .enqueue_and_wait(action_id, exec_id, max_concurrent, None) @@ -332,6 +354,7 @@ async fn test_high_concurrency_stress() { let manager_clone = manager.clone(); let action_ref_clone = action_ref.clone(); let order = execution_order.clone(); + let ids = execution_ids.clone(); let handle = tokio::spawn(async move { let exec_id = create_test_execution( @@ -341,6 +364,7 @@ async fn test_high_concurrency_stress() { ExecutionStatus::Requested, ) .await; + ids.lock().await[i as usize] = Some(exec_id); manager_clone .enqueue_and_wait(action_id, exec_id, max_concurrent, None) @@ -376,15 +400,21 @@ async fn test_high_concurrency_stress() { ); // Release all executions + let ids = execution_ids.lock().await; + let mut active_execution_ids = VecDeque::from( + ids.iter() + .take(max_concurrent as usize) + .map(|id| id.expect("Initial execution id should be recorded")) + .collect::>(), + ); + drop(ids); + println!("Releasing executions..."); for i in 0..num_executions { if i % 100 == 0 { println!("Released {} executions", i); } - manager - .notify_completion(action_id) - .await - .expect("Notify should succeed"); + release_next_active(&manager, &mut active_execution_ids).await; // Small delay to allow queue processing if i % 50 == 0 { @@ -416,7 +446,7 @@ async fn test_high_concurrency_stress() { "All executions should complete" ); - let expected: Vec = (0..num_executions).collect(); + let expected: Vec<_> = (0..num_executions).collect(); assert_eq!( *order, expected, "Executions should complete in strict FIFO order" @@ -461,9 +491,31 @@ async fn test_multiple_workers_simulation() { let num_executions = 30; let execution_order = Arc::new(Mutex::new(Vec::new())); let mut handles = vec![]; + let mut active_execution_ids = VecDeque::new(); - // Spawn all executions - for i in 0..num_executions { + // Fill the initial worker slots deterministically. + for i in 0..max_concurrent { + let exec_id = + create_test_execution(&pool, action_id, &action_ref, ExecutionStatus::Requested).await; + active_execution_ids.push_back(exec_id); + + let manager_clone = manager.clone(); + let order = execution_order.clone(); + + let handle = tokio::spawn(async move { + manager_clone + .enqueue_and_wait(action_id, exec_id, max_concurrent, None) + .await + .expect("Enqueue should succeed"); + + order.lock().await.push(i); + }); + + handles.push(handle); + } + + // Queue the remaining executions. + for i in max_concurrent..num_executions { let pool_clone = pool.clone(); let manager_clone = manager.clone(); let action_ref_clone = action_ref.clone(); @@ -499,6 +551,8 @@ async fn test_multiple_workers_simulation() { let worker_completions = Arc::new(Mutex::new(vec![0, 0, 0])); let worker_completions_clone = worker_completions.clone(); let manager_clone = manager.clone(); + let active_execution_ids = Arc::new(Mutex::new(active_execution_ids)); + let active_execution_ids_clone = active_execution_ids.clone(); // Spawn worker simulators let worker_handle = tokio::spawn(async move { @@ -514,10 +568,8 @@ async fn test_multiple_workers_simulation() { sleep(Duration::from_millis(delay)).await; // Worker completes and notifies - manager_clone - .notify_completion(action_id) - .await - .expect("Notify should succeed"); + let mut active_execution_ids = active_execution_ids_clone.lock().await; + release_next_active(&manager_clone, &mut active_execution_ids).await; worker_completions_clone.lock().await[next_worker] += 1; @@ -536,7 +588,7 @@ async fn test_multiple_workers_simulation() { // Verify FIFO order maintained despite different worker speeds let order = execution_order.lock().await; - let expected: Vec = (0..num_executions).collect(); + let expected: Vec<_> = (0..num_executions).collect(); assert_eq!( *order, expected, "FIFO order should be maintained regardless of worker speed" @@ -576,25 +628,28 @@ async fn test_cross_action_independence() { let executions_per_action = 50; let mut handles = vec![]; + let mut action1_active = VecDeque::new(); + let mut action2_active = VecDeque::new(); + let mut action3_active = VecDeque::new(); // Spawn executions for all three actions simultaneously for action_id in [action1_id, action2_id, action3_id] { let action_ref = format!("fifo_test_action_{}_{}", suffix, action_id); for i in 0..executions_per_action { - let pool_clone = pool.clone(); + let exec_id = + create_test_execution(&pool, action_id, &action_ref, ExecutionStatus::Requested) + .await; + + match action_id { + id if id == action1_id && i == 0 => action1_active.push_back(exec_id), + id if id == action2_id && i == 0 => action2_active.push_back(exec_id), + id if id == action3_id && i == 0 => action3_active.push_back(exec_id), + _ => {} + } + let manager_clone = manager.clone(); - let action_ref_clone = action_ref.clone(); - let handle = tokio::spawn(async move { - let exec_id = create_test_execution( - &pool_clone, - action_id, - &action_ref_clone, - ExecutionStatus::Requested, - ) - .await; - manager_clone .enqueue_and_wait(action_id, exec_id, 1, None) .await @@ -634,18 +689,9 @@ async fn test_cross_action_independence() { // Release all actions in an interleaved pattern for i in 0..executions_per_action { // Release one from each action - manager - .notify_completion(action1_id) - .await - .expect("Notify should succeed"); - manager - .notify_completion(action2_id) - .await - .expect("Notify should succeed"); - manager - .notify_completion(action3_id) - .await - .expect("Notify should succeed"); + release_next_active(&manager, &mut action1_active).await; + release_next_active(&manager, &mut action2_active).await; + release_next_active(&manager, &mut action3_active).await; if i % 10 == 0 { sleep(Duration::from_millis(10)).await; @@ -698,6 +744,7 @@ async fn test_cancellation_during_queue() { // Fill capacity let exec_id = create_test_execution(&pool, action_id, &action_ref, ExecutionStatus::Requested).await; + let mut active_execution_ids = VecDeque::from([exec_id]); manager .enqueue_and_wait(action_id, exec_id, max_concurrent, None) .await @@ -757,7 +804,7 @@ async fn test_cancellation_during_queue() { // Release remaining for _ in 0..8 { - manager.notify_completion(action_id).await.unwrap(); + release_next_active(&manager, &mut active_execution_ids).await; sleep(Duration::from_millis(20)).await; } @@ -798,11 +845,15 @@ async fn test_queue_stats_persistence() { let max_concurrent = 5; let num_executions = 50; + let mut active_execution_ids = VecDeque::new(); // Enqueue executions for i in 0..num_executions { let exec_id = create_test_execution(&pool, action_id, &action_ref, ExecutionStatus::Requested).await; + if i < max_concurrent { + active_execution_ids.push_back(exec_id); + } // Start the enqueue in background let manager_clone = manager.clone(); @@ -838,7 +889,7 @@ async fn test_queue_stats_persistence() { // Release all for _ in 0..num_executions { - manager.notify_completion(action_id).await.unwrap(); + release_next_active(&manager, &mut active_execution_ids).await; sleep(Duration::from_millis(10)).await; } @@ -854,8 +905,8 @@ async fn test_queue_stats_persistence() { assert_eq!(final_db_stats.queue_length, 0); assert_eq!(final_mem_stats.queue_length, 0); - assert_eq!(final_db_stats.total_enqueued, num_executions); - assert_eq!(final_db_stats.total_completed, num_executions); + assert_eq!(final_db_stats.total_enqueued, num_executions as i64); + assert_eq!(final_db_stats.total_completed, num_executions as i64); // Cleanup cleanup_test_data(&pool, pack_id).await; @@ -951,6 +1002,7 @@ async fn test_extreme_stress_10k_executions() { let max_concurrent = 10; let num_executions: i64 = 10000; let completed = Arc::new(Mutex::new(0u64)); + let execution_ids = Arc::new(Mutex::new(vec![None; num_executions as usize])); println!( "Starting extreme stress test with {} executions...", @@ -965,6 +1017,7 @@ async fn test_extreme_stress_10k_executions() { let manager_clone = manager.clone(); let action_ref_clone = action_ref.clone(); let completed_clone = completed.clone(); + let ids = execution_ids.clone(); let handle = tokio::spawn(async move { let exec_id = create_test_execution( @@ -974,6 +1027,7 @@ async fn test_extreme_stress_10k_executions() { ExecutionStatus::Requested, ) .await; + ids.lock().await[i as usize] = Some(exec_id); manager_clone .enqueue_and_wait(action_id, exec_id, max_concurrent, None) @@ -999,12 +1053,18 @@ async fn test_extreme_stress_10k_executions() { println!("All executions spawned"); // Release all + let ids = execution_ids.lock().await; + let mut active_execution_ids = VecDeque::from( + ids.iter() + .take(max_concurrent as usize) + .map(|id| id.expect("Initial execution id should be recorded")) + .collect::>(), + ); + drop(ids); + let release_start = std::time::Instant::now(); for i in 0i64..num_executions { - manager - .notify_completion(action_id) - .await - .expect("Notify should succeed"); + release_next_active(&manager, &mut active_execution_ids).await; if i % 1000 == 0 { println!("Released: {}", i); diff --git a/crates/worker/src/executor.rs b/crates/worker/src/executor.rs index e642ed7..8c2ccef 100644 --- a/crates/worker/src/executor.rs +++ b/crates/worker/src/executor.rs @@ -1003,7 +1003,11 @@ impl ActionExecutor { ..Default::default() }; - ExecutionRepository::update(&self.pool, execution_id, input).await?; + let execution = ExecutionRepository::find_by_id(&self.pool, execution_id) + .await? + .ok_or_else(|| anyhow::anyhow!("Execution {} not found", execution_id))?; + + ExecutionRepository::update_loaded(&self.pool, &execution, input).await?; Ok(()) }