queueing fixes

This commit is contained in:
2026-04-02 08:06:02 -05:00
parent cf82de87ea
commit b6446cc574
10 changed files with 431 additions and 430 deletions

View File

@@ -416,8 +416,43 @@ impl Update for EnforcementRepository {
where
E: Executor<'e, Database = Postgres> + 'e,
{
// Build update query
if input.status.is_none() && input.payload.is_none() && input.resolved_at.is_none() {
return Self::get_by_id(executor, id).await;
}
Self::update_with_locator(executor, input, |query| {
query.push(" WHERE id = ");
query.push_bind(id);
})
.await
}
}
#[async_trait::async_trait]
impl Delete for EnforcementRepository {
    /// Delete the enforcement row identified by `id`.
    ///
    /// Returns `true` when a row was removed, `false` when no row matched.
    async fn delete<'e, E>(executor: E, id: i64) -> Result<bool>
    where
        E: Executor<'e, Database = Postgres> + 'e,
    {
        sqlx::query("DELETE FROM enforcement WHERE id = $1")
            .bind(id)
            .execute(executor)
            .await
            .map(|outcome| outcome.rows_affected() > 0)
            .map_err(Into::into)
    }
}
impl EnforcementRepository {
async fn update_with_locator<'e, E, F>(
executor: E,
input: UpdateEnforcementInput,
where_clause: F,
) -> Result<Enforcement>
where
E: Executor<'e, Database = Postgres> + 'e,
F: FnOnce(&mut QueryBuilder<'_, Postgres>),
{
let mut query = QueryBuilder::new("UPDATE enforcement SET ");
let mut has_updates = false;
@@ -442,17 +477,13 @@ impl Update for EnforcementRepository {
}
query.push("resolved_at = ");
query.push_bind(resolved_at);
has_updates = true;
}
if !has_updates {
// No updates requested, fetch and return existing entity
return Self::get_by_id(executor, id).await;
}
query.push(" WHERE id = ");
query.push_bind(id);
query.push(" RETURNING id, rule, rule_ref, trigger_ref, config, event, status, payload, condition, conditions, created, resolved_at");
where_clause(&mut query);
query.push(
" RETURNING id, rule, rule_ref, trigger_ref, config, event, status, payload, \
condition, conditions, created, resolved_at",
);
let enforcement = query
.build_query_as::<Enforcement>()
@@ -461,24 +492,37 @@ impl Update for EnforcementRepository {
Ok(enforcement)
}
}
#[async_trait::async_trait]
impl Delete for EnforcementRepository {
async fn delete<'e, E>(executor: E, id: i64) -> Result<bool>
/// Update an enforcement using the loaded row's hypertable keys.
///
/// This avoids wide scans across compressed chunks by including both the
/// partitioning column (`created`) and compression segment key (`rule_ref`)
/// in the locator.
pub async fn update_loaded<'e, E>(
executor: E,
enforcement: &Enforcement,
input: UpdateEnforcementInput,
) -> Result<Enforcement>
where
E: Executor<'e, Database = Postgres> + 'e,
{
let result = sqlx::query("DELETE FROM enforcement WHERE id = $1")
.bind(id)
.execute(executor)
.await?;
if input.status.is_none() && input.payload.is_none() && input.resolved_at.is_none() {
return Ok(enforcement.clone());
}
Ok(result.rows_affected() > 0)
let rule_ref = enforcement.rule_ref.clone();
Self::update_with_locator(executor, input, |query| {
query.push(" WHERE id = ");
query.push_bind(enforcement.id);
query.push(" AND created = ");
query.push_bind(enforcement.created);
query.push(" AND rule_ref = ");
query.push_bind(rule_ref);
})
.await
}
}
impl EnforcementRepository {
/// Find enforcements by rule ID
pub async fn find_by_rule<'e, E>(executor: E, rule_id: Id) -> Result<Vec<Enforcement>>
where

View File

@@ -191,7 +191,33 @@ impl Update for ExecutionRepository {
where
E: Executor<'e, Database = Postgres> + 'e,
{
// Build update query
if input.status.is_none()
&& input.result.is_none()
&& input.executor.is_none()
&& input.worker.is_none()
&& input.started_at.is_none()
&& input.workflow_task.is_none()
{
return Self::get_by_id(executor, id).await;
}
Self::update_with_locator(executor, input, |query| {
query.push(" WHERE id = ").push_bind(id);
})
.await
}
}
impl ExecutionRepository {
async fn update_with_locator<'e, E, F>(
executor: E,
input: UpdateExecutionInput,
where_clause: F,
) -> Result<Execution>
where
E: Executor<'e, Database = Postgres> + 'e,
F: FnOnce(&mut QueryBuilder<'_, Postgres>),
{
let mut query = QueryBuilder::new("UPDATE execution SET ");
let mut has_updates = false;
@@ -234,15 +260,10 @@ impl Update for ExecutionRepository {
query
.push("workflow_task = ")
.push_bind(sqlx::types::Json(workflow_task));
has_updates = true;
}
if !has_updates {
// No updates requested, fetch and return existing entity
return Self::get_by_id(executor, id).await;
}
query.push(", updated = NOW() WHERE id = ").push_bind(id);
query.push(", updated = NOW()");
where_clause(&mut query);
query.push(" RETURNING ");
query.push(SELECT_COLUMNS);
@@ -252,6 +273,38 @@ impl Update for ExecutionRepository {
.await
.map_err(Into::into)
}
/// Update an execution using the loaded row's hypertable keys.
///
/// Including both the partition key (`created`) and compression segment key
/// (`action_ref`) avoids broad scans across compressed chunks.
pub async fn update_loaded<'e, E>(
executor: E,
execution: &Execution,
input: UpdateExecutionInput,
) -> Result<Execution>
where
E: Executor<'e, Database = Postgres> + 'e,
{
if input.status.is_none()
&& input.result.is_none()
&& input.executor.is_none()
&& input.worker.is_none()
&& input.started_at.is_none()
&& input.workflow_task.is_none()
{
return Ok(execution.clone());
}
let action_ref = execution.action_ref.clone();
Self::update_with_locator(executor, input, |query| {
query.push(" WHERE id = ").push_bind(execution.id);
query.push(" AND created = ").push_bind(execution.created);
query.push(" AND action_ref = ").push_bind(action_ref);
})
.await
}
}
#[async_trait::async_trait]

View File

@@ -1430,3 +1430,70 @@ async fn test_enforcement_resolved_at_lifecycle() {
assert!(updated.resolved_at.is_some());
assert!(updated.resolved_at.unwrap() >= enforcement.created);
}
#[tokio::test]
#[ignore = "integration test — requires database"]
async fn test_update_loaded_enforcement_uses_loaded_locator() {
    // Arrange: build the pack -> trigger/action -> rule chain that an
    // enforcement row depends on.
    let pool = create_test_pool().await.unwrap();
    let pack = PackFixture::new_unique("targeted_update_pack")
        .create(&pool)
        .await
        .unwrap();
    let trigger = TriggerFixture::new_unique(Some(pack.id), Some(pack.r#ref.clone()), "webhook")
        .create(&pool)
        .await
        .unwrap();
    let action = ActionFixture::new_unique(pack.id, &pack.r#ref, "action")
        .create(&pool)
        .await
        .unwrap();
    use attune_common::repositories::rule::{CreateRuleInput, RuleRepository};
    let rule = RuleRepository::create(
        &pool,
        CreateRuleInput {
            r#ref: format!("{}.test_rule", pack.r#ref),
            pack: pack.id,
            pack_ref: pack.r#ref.clone(),
            label: "Test Rule".to_string(),
            description: Some("Test".to_string()),
            action: action.id,
            action_ref: action.r#ref.clone(),
            trigger: trigger.id,
            trigger_ref: trigger.r#ref.clone(),
            conditions: json!({}),
            action_params: json!({}),
            trigger_params: json!({}),
            enabled: true,
            is_adhoc: false,
        },
    )
    .await
    .unwrap();
    let enforcement = EnforcementFixture::new_unique(Some(rule.id), &rule.r#ref, &trigger.r#ref)
        .create(&pool)
        .await
        .unwrap();
    // Act: update through the loaded-row locator (id + created + rule_ref).
    let updated = EnforcementRepository::update_loaded(
        &pool,
        &enforcement,
        UpdateEnforcementInput {
            status: Some(EnforcementStatus::Processed),
            payload: None,
            resolved_at: Some(chrono::Utc::now()),
        },
    )
    .await
    .unwrap();
    // Assert: locator columns are untouched and the mutated fields took effect.
    assert_eq!(updated.id, enforcement.id);
    assert_eq!(updated.created, enforcement.created);
    assert_eq!(updated.rule_ref, enforcement.rule_ref);
    assert_eq!(updated.status, EnforcementStatus::Processed);
    assert!(updated.resolved_at.is_some());
}

View File

@@ -304,7 +304,7 @@ mod tests {
use crate::queue_manager::ExecutionQueueManager;
#[tokio::test]
async fn test_notify_completion_releases_slot() {
async fn test_release_active_slot_releases_slot() {
let queue_manager = Arc::new(ExecutionQueueManager::with_defaults());
let action_id = 1;
@@ -320,8 +320,9 @@ mod tests {
assert_eq!(stats.queue_length, 0);
// Simulate completion notification
let notified = queue_manager.notify_completion(100).await.unwrap();
assert!(!notified); // No one waiting
let release = queue_manager.release_active_slot(100).await.unwrap();
assert!(release.is_some());
assert_eq!(release.unwrap().next_execution_id, None);
// Verify slot is released
let stats = queue_manager.get_queue_stats(action_id).await.unwrap();
@@ -329,7 +330,7 @@ mod tests {
}
#[tokio::test]
async fn test_notify_completion_wakes_waiting() {
async fn test_release_active_slot_wakes_waiting() {
let queue_manager = Arc::new(ExecutionQueueManager::with_defaults());
let action_id = 1;
@@ -357,8 +358,8 @@ mod tests {
assert_eq!(stats.queue_length, 1);
// Notify completion
let notified = queue_manager.notify_completion(100).await.unwrap();
assert!(notified); // Should wake the waiting execution
let release = queue_manager.release_active_slot(100).await.unwrap();
assert_eq!(release.unwrap().next_execution_id, Some(101));
// Wait for queued execution to proceed
handle.await.unwrap();
@@ -406,7 +407,11 @@ mod tests {
// Release them one by one
for execution_id in 100..103 {
tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
queue_manager.notify_completion(execution_id).await.unwrap();
let release = queue_manager
.release_active_slot(execution_id)
.await
.unwrap();
assert!(release.is_some());
}
// Wait for all to complete
@@ -425,8 +430,8 @@ mod tests {
let execution_id = 999; // Non-existent execution
// Should succeed but not notify anyone
let result = queue_manager.notify_completion(execution_id).await;
let result = queue_manager.release_active_slot(execution_id).await;
assert!(result.is_ok());
assert!(!result.unwrap());
assert!(result.unwrap().is_none());
}
}

View File

@@ -19,7 +19,7 @@ use attune_common::{
event::{EnforcementRepository, EventRepository, UpdateEnforcementInput},
execution::{CreateExecutionInput, ExecutionRepository},
rule::RuleRepository,
Create, FindById, Update,
Create, FindById,
},
};
@@ -146,9 +146,9 @@ impl EnforcementProcessor {
.await?;
// Update enforcement status to Processed after successful execution creation
EnforcementRepository::update(
EnforcementRepository::update_loaded(
pool,
enforcement_id,
&enforcement,
UpdateEnforcementInput {
status: Some(EnforcementStatus::Processed),
payload: None,
@@ -165,9 +165,9 @@ impl EnforcementProcessor {
);
// Update enforcement status to Disabled since it was not actionable
EnforcementRepository::update(
EnforcementRepository::update_loaded(
pool,
enforcement_id,
&enforcement,
UpdateEnforcementInput {
status: Some(EnforcementStatus::Disabled),
payload: None,

View File

@@ -432,106 +432,6 @@ impl PolicyEnforcer {
self.global_policy.concurrency_limit
}
/// Enforce policies and wait in queue if necessary
///
/// This method combines policy checking with queue management to ensure:
/// 1. Policy violations are detected early
/// 2. FIFO ordering is maintained when capacity is limited
/// 3. Executions wait efficiently for available slots
///
/// # Arguments
/// * `action_id` - The action to execute
/// * `pack_id` - The pack containing the action
/// * `execution_id` - The execution/enforcement ID for queue tracking
/// * `config` - Optional execution config, used to derive the parameter group key
///
/// # Returns
/// * `Ok(())` - Policy allows execution and queue slot obtained
/// * `Err(PolicyViolation)` - Policy prevents execution
/// * `Err(QueueError)` - Queue timeout or other queue error
#[allow(dead_code)]
pub async fn enforce_and_wait(
    &self,
    action_id: Id,
    pack_id: Option<Id>,
    execution_id: Id,
    config: Option<&JsonValue>,
) -> Result<()> {
    // First, check for policy violations (rate limit, quotas, etc.)
    // Note: We skip concurrency check here since queue manages that
    if let Some(violation) = self
        .check_policies_except_concurrency(action_id, pack_id)
        .await?
    {
        warn!("Policy violation for action {}: {}", action_id, violation);
        return Err(anyhow::anyhow!("Policy violation: {}", violation));
    }
    // If queue manager is available, use it for concurrency control.
    // No concurrency policy at all means nothing further to enforce.
    if let Some(concurrency) = self.resolve_concurrency_policy(action_id, pack_id).await? {
        // The group key partitions queue slots by the policy's parameter
        // values, so distinct parameter groups don't contend with each other.
        let group_key = self.build_parameter_group_key(&concurrency.parameters, config);
        if let Some(queue_manager) = &self.queue_manager {
            debug!(
                "Applying concurrency policy to execution {} for action {} (limit: {}, method: {:?}, group: {:?})",
                execution_id, action_id, concurrency.limit, concurrency.method, group_key
            );
            match concurrency.method {
                // Enqueue: block in FIFO order until a slot frees up (or the
                // queue times out, surfaced as an error by enqueue_and_wait).
                PolicyMethod::Enqueue => {
                    queue_manager
                        .enqueue_and_wait(
                            action_id,
                            execution_id,
                            concurrency.limit,
                            group_key.clone(),
                        )
                        .await?;
                }
                // Cancel: fail immediately when no slot is available instead
                // of waiting.
                PolicyMethod::Cancel => {
                    let outcome = queue_manager
                        .try_acquire(
                            action_id,
                            execution_id,
                            concurrency.limit,
                            group_key.clone(),
                        )
                        .await?;
                    if !outcome.acquired {
                        let violation = PolicyViolation::ConcurrencyLimitExceeded {
                            limit: concurrency.limit,
                            current_count: outcome.current_count,
                        };
                        warn!("Policy violation for action {}: {}", action_id, violation);
                        return Err(anyhow::anyhow!("Policy violation: {}", violation));
                    }
                }
            }
            info!(
                "Execution {} obtained queue slot for action {} (group: {:?})",
                execution_id, action_id, group_key
            );
        } else {
            // No queue manager - use legacy polling behavior
            debug!(
                "No queue manager configured, using legacy policy wait for action {}",
                action_id
            );
            let scope = PolicyScope::Action(action_id);
            if let Some(violation) = self
                .check_concurrency_limit(concurrency.limit, &scope)
                .await?
            {
                return Err(anyhow::anyhow!("Policy violation: {}", violation));
            }
        }
    }
    Ok(())
}
/// Check policies except concurrency (which is handled by queue)
async fn check_policies_except_concurrency(
&self,
@@ -899,8 +799,6 @@ fn extract_parameter_value(config: Option<&JsonValue>, path: &str) -> JsonValue
#[cfg(test)]
mod tests {
use super::*;
use crate::queue_manager::QueueConfig;
use tokio::time::{sleep, Duration};
#[test]
fn test_policy_violation_display() {
@@ -1035,138 +933,6 @@ mod tests {
assert_eq!(enforcer.get_concurrency_limit(2, Some(200)), Some(20));
}
#[tokio::test]
async fn test_enforce_and_wait_with_queue_manager() {
    // connect_lazy defers connecting, so no live database is needed here;
    // the enforcer never issues a query in this path.
    let pool = sqlx::PgPool::connect_lazy("postgresql://localhost/test").unwrap();
    let queue_manager = Arc::new(ExecutionQueueManager::with_defaults());
    let mut enforcer = PolicyEnforcer::with_queue_manager(pool, queue_manager.clone());
    // Set concurrency limit
    enforcer.set_action_policy(
        1,
        ExecutionPolicy {
            concurrency_limit: Some(1),
            ..Default::default()
        },
    );
    // First execution should proceed immediately
    let result = enforcer.enforce_and_wait(1, None, 100, None).await;
    assert!(result.is_ok());
    // Check queue stats: one active slot taken, nothing waiting.
    let stats = queue_manager.get_queue_stats(1).await.unwrap();
    assert_eq!(stats.active_count, 1);
    assert_eq!(stats.queue_length, 0);
}
#[tokio::test]
async fn test_enforce_and_wait_fifo_ordering() {
    // Lazy pool: no live database required for the queue-only path.
    let pool = sqlx::PgPool::connect_lazy("postgresql://localhost/test").unwrap();
    let queue_manager = Arc::new(ExecutionQueueManager::with_defaults());
    let mut enforcer = PolicyEnforcer::with_queue_manager(pool, queue_manager.clone());
    // Limit of 1 forces every later execution to wait in the queue.
    enforcer.set_action_policy(
        1,
        ExecutionPolicy {
            concurrency_limit: Some(1),
            ..Default::default()
        },
    );
    let enforcer = Arc::new(enforcer);
    // First execution
    let result = enforcer.enforce_and_wait(1, None, 100, None).await;
    assert!(result.is_ok());
    // Queue multiple executions; each records its id once it gets a slot,
    // then completes and hands the slot to the next waiter.
    let execution_order = Arc::new(tokio::sync::Mutex::new(Vec::new()));
    let mut handles = vec![];
    for exec_id in 101..=103 {
        let enforcer = enforcer.clone();
        let queue_manager = queue_manager.clone();
        let order = execution_order.clone();
        let handle = tokio::spawn(async move {
            enforcer
                .enforce_and_wait(1, None, exec_id, None)
                .await
                .unwrap();
            order.lock().await.push(exec_id);
            // Simulate work
            sleep(Duration::from_millis(10)).await;
            queue_manager.notify_completion(exec_id).await.unwrap();
        });
        handles.push(handle);
    }
    // Give tasks time to queue
    sleep(Duration::from_millis(100)).await;
    // Release first execution
    queue_manager.notify_completion(100).await.unwrap();
    // Wait for all
    for handle in handles {
        handle.await.unwrap();
    }
    // Verify FIFO order
    let order = execution_order.lock().await;
    assert_eq!(*order, vec![101, 102, 103]);
}
#[tokio::test]
async fn test_enforce_and_wait_without_queue_manager() {
    // Lazy pool: no live database required for this path.
    let pool = sqlx::PgPool::connect_lazy("postgresql://localhost/test").unwrap();
    let mut enforcer = PolicyEnforcer::new(pool);
    // Set unlimited concurrency
    enforcer.set_action_policy(
        1,
        ExecutionPolicy {
            concurrency_limit: None,
            ..Default::default()
        },
    );
    // Should work without queue manager (legacy behavior)
    let result = enforcer.enforce_and_wait(1, None, 100, None).await;
    assert!(result.is_ok());
}
#[tokio::test]
async fn test_enforce_and_wait_queue_timeout() {
    let config = QueueConfig {
        max_queue_length: 100,
        queue_timeout_seconds: 1, // Short timeout for test
        enable_metrics: true,
    };
    // Lazy pool: no live database required for the queue-only path.
    let pool = sqlx::PgPool::connect_lazy("postgresql://localhost/test").unwrap();
    let queue_manager = Arc::new(ExecutionQueueManager::new(config));
    let mut enforcer = PolicyEnforcer::with_queue_manager(pool, queue_manager.clone());
    // Set concurrency limit
    enforcer.set_action_policy(
        1,
        ExecutionPolicy {
            concurrency_limit: Some(1),
            ..Default::default()
        },
    );
    // First execution proceeds
    enforcer.enforce_and_wait(1, None, 100, None).await.unwrap();
    // Second execution should timeout, since the single slot is never released.
    let result = enforcer.enforce_and_wait(1, None, 101, None).await;
    assert!(result.is_err());
    assert!(result.unwrap_err().to_string().contains("timeout"));
}
#[test]
fn test_build_parameter_group_key_uses_exact_values() {
let pool = sqlx::PgPool::connect_lazy("postgresql://localhost/test").unwrap();

View File

@@ -18,8 +18,8 @@ use serde::{Deserialize, Serialize};
use sqlx::PgPool;
use std::collections::VecDeque;
use std::sync::Arc;
use tokio::sync::{Mutex, Notify};
use tokio::time::{timeout, Duration};
use tokio::sync::Mutex;
use tokio::time::{sleep, Duration, Instant};
use tracing::{debug, info, warn};
use attune_common::models::Id;
@@ -53,8 +53,6 @@ struct QueueEntry {
execution_id: Id,
/// When this entry was added to the queue
enqueued_at: DateTime<Utc>,
/// Notifier to wake up this specific waiter
notifier: Arc<Notify>,
}
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
@@ -244,9 +242,6 @@ impl ExecutionQueueManager {
.get_or_create_queue(queue_key.clone(), max_concurrent)
.await;
// Create notifier for this execution
let notifier = Arc::new(Notify::new());
// Try to enqueue
{
let mut queue = queue_arc.lock().await;
@@ -276,7 +271,7 @@ impl ExecutionQueueManager {
}
// Check if we can run immediately
if queue.has_capacity() {
if queue.has_capacity() && queue.queue.is_empty() {
debug!(
"Execution {} can run immediately for action {} (active: {}/{}, group: {:?})",
execution_id,
@@ -317,7 +312,6 @@ impl ExecutionQueueManager {
let entry = QueueEntry {
execution_id,
enqueued_at: Utc::now(),
notifier: notifier.clone(),
};
queue.queue.push_back(entry);
@@ -337,16 +331,40 @@ impl ExecutionQueueManager {
// Persist stats to database if available
self.persist_queue_stats(action_id).await;
// Wait for notification with timeout
let wait_duration = Duration::from_secs(self.config.queue_timeout_seconds);
// Wait until this execution reaches the front of the queue and can
// activate itself. Production code uses non-blocking queue advancement;
// this blocking helper exists mainly for tests and legacy call sites.
let deadline = Instant::now() + Duration::from_secs(self.config.queue_timeout_seconds);
loop {
{
let mut queue = queue_arc.lock().await;
let queued_index = queue
.queue
.iter()
.position(|entry| entry.execution_id == execution_id);
match timeout(wait_duration, notifier.notified()).await {
Ok(_) => {
debug!("Execution {} notified, can proceed", execution_id);
Ok(())
if let Some(0) = queued_index {
if queue.has_capacity() {
let entry = queue.queue.pop_front().expect("front entry just checked");
queue.active_count += 1;
self.active_execution_keys
.insert(entry.execution_id, queue_key.clone());
drop(queue);
self.persist_queue_stats(action_id).await;
debug!(
"Execution {} reached front of queue and can proceed",
execution_id
);
return Ok(());
}
} else if queued_index.is_none()
&& self.active_execution_keys.contains_key(&execution_id)
{
return Ok(());
}
}
Err(_) => {
// Timeout - remove from queue
if Instant::now() >= deadline {
let mut queue = queue_arc.lock().await;
queue.queue.retain(|e| e.execution_id != execution_id);
@@ -355,12 +373,14 @@ impl ExecutionQueueManager {
execution_id, self.config.queue_timeout_seconds
);
Err(anyhow::anyhow!(
return Err(anyhow::anyhow!(
"Queue timeout for execution {}: waited {} seconds",
execution_id,
self.config.queue_timeout_seconds
))
));
}
sleep(Duration::from_millis(10)).await;
}
}
@@ -415,7 +435,7 @@ impl ExecutionQueueManager {
return Ok(SlotEnqueueOutcome::Enqueued);
}
if queue.has_capacity() {
if queue.has_capacity() && queue.queue.is_empty() {
queue.active_count += 1;
queue.total_enqueued += 1;
self.active_execution_keys
@@ -444,7 +464,6 @@ impl ExecutionQueueManager {
queue.queue.push_back(QueueEntry {
execution_id,
enqueued_at: Utc::now(),
notifier: Arc::new(Notify::new()),
});
queue.total_enqueued += 1;
}
@@ -481,7 +500,7 @@ impl ExecutionQueueManager {
});
}
if queue.has_capacity() {
if queue.has_capacity() && queue.queue.is_empty() {
queue.active_count += 1;
queue.total_enqueued += 1;
self.active_execution_keys
@@ -507,39 +526,6 @@ impl ExecutionQueueManager {
/// 2. Check if there are queued executions
/// 3. Notify the first (oldest) queued execution
/// 4. Increment active count for the notified execution
///
/// # Arguments
/// * `execution_id` - The execution that completed
///
/// # Returns
/// * `Ok(true)` - A queued execution was notified
/// * `Ok(false)` - No executions were waiting
/// * `Err(_)` - Error accessing queue
pub async fn notify_completion(&self, execution_id: Id) -> Result<bool> {
Ok(self
.notify_completion_with_next(execution_id)
.await?
.is_some())
}
pub async fn notify_completion_with_next(&self, execution_id: Id) -> Result<Option<Id>> {
let release = match self.release_active_slot(execution_id).await? {
Some(release) => release,
None => return Ok(None),
};
let Some(next_execution_id) = release.next_execution_id else {
return Ok(None);
};
if self.activate_queued_execution(next_execution_id).await? {
Ok(Some(next_execution_id))
} else {
self.restore_active_slot(execution_id, &release).await?;
Ok(None)
}
}
pub async fn release_active_slot(
&self,
execution_id: Id,
@@ -602,6 +588,7 @@ impl ExecutionQueueManager {
}
let next_execution_id = queue.queue.front().map(|entry| entry.execution_id);
if let Some(next_execution_id) = next_execution_id {
info!(
"Execution {} is next for action {} group {:?}",
@@ -639,45 +626,6 @@ impl ExecutionQueueManager {
Ok(())
}
pub async fn activate_queued_execution(&self, execution_id: Id) -> Result<bool> {
for entry in self.queues.iter() {
let queue_key = entry.key().clone();
let queue_arc = entry.value().clone();
let mut queue = queue_arc.lock().await;
let Some(front) = queue.queue.front() else {
continue;
};
if front.execution_id != execution_id {
continue;
}
if !queue.has_capacity() {
return Ok(false);
}
let entry = queue.queue.pop_front().expect("front entry just checked");
info!(
"Activating queued execution {} for action {} group {:?} (queued for {:?})",
entry.execution_id,
queue_key.action_id,
queue_key.group_key,
Utc::now() - entry.enqueued_at
);
queue.active_count += 1;
self.active_execution_keys
.insert(entry.execution_id, queue_key.clone());
drop(queue);
entry.notifier.notify_one();
self.persist_queue_stats(queue_key.action_id).await;
return Ok(true);
}
Ok(false)
}
pub async fn remove_queued_execution(
&self,
execution_id: Id,
@@ -787,7 +735,7 @@ impl ExecutionQueueManager {
total_enqueued += queue.total_enqueued;
total_completed += queue.total_completed;
if let Some(candidate) = queue.queue.front().map(|e| e.enqueued_at) {
if let Some(candidate) = queue.queue.front().map(|entry| entry.enqueued_at) {
oldest_enqueued_at = Some(match oldest_enqueued_at {
Some(current) => current.min(candidate),
None => candidate,
@@ -854,8 +802,12 @@ impl ExecutionQueueManager {
for queue_arc in queue_arcs {
let mut queue = queue_arc.lock().await;
let initial_len = queue.queue.len();
queue.queue.retain(|e| e.execution_id != execution_id);
queue
.queue
.retain(|entry| entry.execution_id != execution_id);
if initial_len != queue.queue.len() {
drop(queue);
self.persist_queue_stats(action_id).await;
info!("Cancelled execution {} from queue", execution_id);
return Ok(true);
}
@@ -961,7 +913,7 @@ mod tests {
// Release them one by one
for execution_id in 100..103 {
sleep(Duration::from_millis(50)).await;
manager.notify_completion(execution_id).await.unwrap();
manager.release_active_slot(execution_id).await.unwrap();
}
// Wait for all to complete
@@ -1005,8 +957,9 @@ mod tests {
assert_eq!(stats.active_count, 1);
// Notify completion
let notified = manager_clone.notify_completion(100).await.unwrap();
assert!(notified);
let release = manager_clone.release_active_slot(100).await.unwrap();
assert!(release.is_some());
assert_eq!(release.unwrap().next_execution_id, Some(101));
// Wait for queued execution to proceed
handle.await.unwrap();
@@ -1033,7 +986,7 @@ mod tests {
assert_eq!(stats2.active_count, 1);
// Completion on action 1 shouldn't affect action 2
manager.notify_completion(100).await.unwrap();
manager.release_active_slot(100).await.unwrap();
let stats1 = manager.get_queue_stats(1).await.unwrap();
let stats2 = manager.get_queue_stats(2).await.unwrap();
@@ -1042,6 +995,51 @@ mod tests {
assert_eq!(stats2.active_count, 1);
}
#[tokio::test]
async fn test_release_reserves_front_of_queue_before_new_enqueues() {
    // Verifies that releasing an active slot reserves it for the execution at
    // the front of the queue, so a brand-new enqueue cannot jump the line.
    let manager = Arc::new(ExecutionQueueManager::with_defaults());
    let action_id = 1;
    // Execution 100 takes the single slot immediately.
    manager
        .enqueue_and_wait(action_id, 100, 1, None)
        .await
        .unwrap();
    // Execution 101 blocks waiting for the slot.
    let manager_clone = manager.clone();
    let waiting = tokio::spawn(async move {
        manager_clone
            .enqueue_and_wait(action_id, 101, 1, None)
            .await
            .unwrap();
    });
    sleep(Duration::from_millis(100)).await;
    // Releasing 100 names 101 as next but does not activate it yet.
    let release = manager.release_active_slot(100).await.unwrap().unwrap();
    assert_eq!(release.next_execution_id, Some(101));
    // A new execution must queue behind 101 even though a slot is free.
    let enqueue_outcome = manager.enqueue(action_id, 102, 1, None).await.unwrap();
    assert_eq!(enqueue_outcome, SlotEnqueueOutcome::Enqueued);
    let stats = manager.get_queue_stats(action_id).await.unwrap();
    assert_eq!(stats.active_count, 0);
    assert_eq!(stats.queue_length, 2);
    // 101 is at the front, so it acquires the freed slot.
    assert_eq!(
        manager.enqueue(action_id, 101, 1, None).await.unwrap(),
        SlotEnqueueOutcome::Acquired
    );
    // Fix: the stats fetch + assertions below were duplicated verbatim in the
    // original; one copy suffices.
    let stats = manager.get_queue_stats(action_id).await.unwrap();
    assert_eq!(stats.active_count, 1);
    assert_eq!(stats.queue_length, 1);
    waiting.await.unwrap();
}
#[tokio::test]
async fn test_grouped_queues_are_independent() {
let manager = ExecutionQueueManager::with_defaults();
@@ -1199,7 +1197,7 @@ mod tests {
// Release them all
for execution_id in 0..num_executions {
sleep(Duration::from_millis(10)).await;
manager.notify_completion(execution_id).await.unwrap();
manager.release_active_slot(execution_id).await.unwrap();
}
// Wait for completion

View File

@@ -357,9 +357,12 @@ impl ExecutionScheduler {
let mut execution_for_update = execution;
execution_for_update.status = ExecutionStatus::Scheduled;
execution_for_update.worker = Some(worker.id);
if let Err(err) =
ExecutionRepository::update(pool, execution_for_update.id, execution_for_update.into())
.await
if let Err(err) = ExecutionRepository::update_loaded(
pool,
&execution_for_update,
execution_for_update.clone().into(),
)
.await
{
Self::release_acquired_policy_slot(policy_enforcer, pool, publisher, execution_id)
.await?;
@@ -480,7 +483,8 @@ impl ExecutionScheduler {
// Mark the parent execution as Running
let mut running_exec = execution.clone();
running_exec.status = ExecutionStatus::Running;
ExecutionRepository::update(pool, running_exec.id, running_exec.into()).await?;
ExecutionRepository::update_loaded(pool, &running_exec, running_exec.clone().into())
.await?;
if graph.entry_points.is_empty() {
warn!(

View File

@@ -26,6 +26,7 @@ use attune_executor::queue_manager::{ExecutionQueueManager, QueueConfig};
use chrono::Utc;
use serde_json::json;
use sqlx::PgPool;
use std::collections::VecDeque;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Mutex;
@@ -172,6 +173,26 @@ async fn cleanup_test_data(pool: &PgPool, pack_id: i64) {
.ok();
}
/// Release the oldest active execution's slot and track whichever queued
/// execution (if any) was promoted into it.
///
/// Returns the promoted execution id, or `None` when the queue was empty.
async fn release_next_active(
    manager: &ExecutionQueueManager,
    active_execution_ids: &mut VecDeque<i64>,
) -> Option<i64> {
    // Oldest active execution finishes first (FIFO).
    let finished = active_execution_ids
        .pop_front()
        .expect("Expected an active execution to release");
    let release = manager
        .release_active_slot(finished)
        .await
        .expect("Release should succeed")
        .expect("Active execution should have a tracked slot");
    let promoted = release.next_execution_id;
    if let Some(next_id) = promoted {
        active_execution_ids.push_back(next_id);
    }
    promoted
}
#[tokio::test]
#[ignore] // Requires database
async fn test_fifo_ordering_with_database() {
@@ -198,6 +219,7 @@ async fn test_fifo_ordering_with_database() {
// Create first execution in database and enqueue
let first_exec_id =
create_test_execution(&pool, action_id, &action_ref, ExecutionStatus::Requested).await;
let mut active_execution_ids = VecDeque::from([first_exec_id]);
manager
.enqueue_and_wait(action_id, first_exec_id, max_concurrent, None)
.await
@@ -250,10 +272,7 @@ async fn test_fifo_ordering_with_database() {
// Release them one by one
for _ in 0..num_executions {
sleep(Duration::from_millis(50)).await;
manager
.notify_completion(action_id)
.await
.expect("Notify should succeed");
release_next_active(&manager, &mut active_execution_ids).await;
}
// Wait for all to complete
@@ -295,6 +314,7 @@ async fn test_high_concurrency_stress() {
let num_executions: i64 = 1000;
let execution_order = Arc::new(Mutex::new(Vec::new()));
let mut handles = vec![];
let execution_ids = Arc::new(Mutex::new(vec![None; num_executions as usize]));
println!("Starting stress test with {} executions...", num_executions);
let start_time = std::time::Instant::now();
@@ -305,6 +325,7 @@ async fn test_high_concurrency_stress() {
let manager_clone = manager.clone();
let action_ref_clone = action_ref.clone();
let order = execution_order.clone();
let ids = execution_ids.clone();
let handle = tokio::spawn(async move {
let exec_id = create_test_execution(
@@ -314,6 +335,7 @@ async fn test_high_concurrency_stress() {
ExecutionStatus::Requested,
)
.await;
ids.lock().await[i as usize] = Some(exec_id);
manager_clone
.enqueue_and_wait(action_id, exec_id, max_concurrent, None)
@@ -332,6 +354,7 @@ async fn test_high_concurrency_stress() {
let manager_clone = manager.clone();
let action_ref_clone = action_ref.clone();
let order = execution_order.clone();
let ids = execution_ids.clone();
let handle = tokio::spawn(async move {
let exec_id = create_test_execution(
@@ -341,6 +364,7 @@ async fn test_high_concurrency_stress() {
ExecutionStatus::Requested,
)
.await;
ids.lock().await[i as usize] = Some(exec_id);
manager_clone
.enqueue_and_wait(action_id, exec_id, max_concurrent, None)
@@ -376,15 +400,21 @@ async fn test_high_concurrency_stress() {
);
// Release all executions
let ids = execution_ids.lock().await;
let mut active_execution_ids = VecDeque::from(
ids.iter()
.take(max_concurrent as usize)
.map(|id| id.expect("Initial execution id should be recorded"))
.collect::<Vec<_>>(),
);
drop(ids);
println!("Releasing executions...");
for i in 0..num_executions {
if i % 100 == 0 {
println!("Released {} executions", i);
}
manager
.notify_completion(action_id)
.await
.expect("Notify should succeed");
release_next_active(&manager, &mut active_execution_ids).await;
// Small delay to allow queue processing
if i % 50 == 0 {
@@ -416,7 +446,7 @@ async fn test_high_concurrency_stress() {
"All executions should complete"
);
let expected: Vec<i64> = (0..num_executions).collect();
let expected: Vec<_> = (0..num_executions).collect();
assert_eq!(
*order, expected,
"Executions should complete in strict FIFO order"
@@ -461,9 +491,31 @@ async fn test_multiple_workers_simulation() {
let num_executions = 30;
let execution_order = Arc::new(Mutex::new(Vec::new()));
let mut handles = vec![];
let mut active_execution_ids = VecDeque::new();
// Spawn all executions
for i in 0..num_executions {
// Fill the initial worker slots deterministically.
for i in 0..max_concurrent {
let exec_id =
create_test_execution(&pool, action_id, &action_ref, ExecutionStatus::Requested).await;
active_execution_ids.push_back(exec_id);
let manager_clone = manager.clone();
let order = execution_order.clone();
let handle = tokio::spawn(async move {
manager_clone
.enqueue_and_wait(action_id, exec_id, max_concurrent, None)
.await
.expect("Enqueue should succeed");
order.lock().await.push(i);
});
handles.push(handle);
}
// Queue the remaining executions.
for i in max_concurrent..num_executions {
let pool_clone = pool.clone();
let manager_clone = manager.clone();
let action_ref_clone = action_ref.clone();
@@ -499,6 +551,8 @@ async fn test_multiple_workers_simulation() {
let worker_completions = Arc::new(Mutex::new(vec![0, 0, 0]));
let worker_completions_clone = worker_completions.clone();
let manager_clone = manager.clone();
let active_execution_ids = Arc::new(Mutex::new(active_execution_ids));
let active_execution_ids_clone = active_execution_ids.clone();
// Spawn worker simulators
let worker_handle = tokio::spawn(async move {
@@ -514,10 +568,8 @@ async fn test_multiple_workers_simulation() {
sleep(Duration::from_millis(delay)).await;
// Worker completes and notifies
manager_clone
.notify_completion(action_id)
.await
.expect("Notify should succeed");
let mut active_execution_ids = active_execution_ids_clone.lock().await;
release_next_active(&manager_clone, &mut active_execution_ids).await;
worker_completions_clone.lock().await[next_worker] += 1;
@@ -536,7 +588,7 @@ async fn test_multiple_workers_simulation() {
// Verify FIFO order maintained despite different worker speeds
let order = execution_order.lock().await;
let expected: Vec<i64> = (0..num_executions).collect();
let expected: Vec<_> = (0..num_executions).collect();
assert_eq!(
*order, expected,
"FIFO order should be maintained regardless of worker speed"
@@ -576,25 +628,28 @@ async fn test_cross_action_independence() {
let executions_per_action = 50;
let mut handles = vec![];
let mut action1_active = VecDeque::new();
let mut action2_active = VecDeque::new();
let mut action3_active = VecDeque::new();
// Spawn executions for all three actions simultaneously
for action_id in [action1_id, action2_id, action3_id] {
let action_ref = format!("fifo_test_action_{}_{}", suffix, action_id);
for i in 0..executions_per_action {
let pool_clone = pool.clone();
let exec_id =
create_test_execution(&pool, action_id, &action_ref, ExecutionStatus::Requested)
.await;
match action_id {
id if id == action1_id && i == 0 => action1_active.push_back(exec_id),
id if id == action2_id && i == 0 => action2_active.push_back(exec_id),
id if id == action3_id && i == 0 => action3_active.push_back(exec_id),
_ => {}
}
let manager_clone = manager.clone();
let action_ref_clone = action_ref.clone();
let handle = tokio::spawn(async move {
let exec_id = create_test_execution(
&pool_clone,
action_id,
&action_ref_clone,
ExecutionStatus::Requested,
)
.await;
manager_clone
.enqueue_and_wait(action_id, exec_id, 1, None)
.await
@@ -634,18 +689,9 @@ async fn test_cross_action_independence() {
// Release all actions in an interleaved pattern
for i in 0..executions_per_action {
// Release one from each action
manager
.notify_completion(action1_id)
.await
.expect("Notify should succeed");
manager
.notify_completion(action2_id)
.await
.expect("Notify should succeed");
manager
.notify_completion(action3_id)
.await
.expect("Notify should succeed");
release_next_active(&manager, &mut action1_active).await;
release_next_active(&manager, &mut action2_active).await;
release_next_active(&manager, &mut action3_active).await;
if i % 10 == 0 {
sleep(Duration::from_millis(10)).await;
@@ -698,6 +744,7 @@ async fn test_cancellation_during_queue() {
// Fill capacity
let exec_id =
create_test_execution(&pool, action_id, &action_ref, ExecutionStatus::Requested).await;
let mut active_execution_ids = VecDeque::from([exec_id]);
manager
.enqueue_and_wait(action_id, exec_id, max_concurrent, None)
.await
@@ -757,7 +804,7 @@ async fn test_cancellation_during_queue() {
// Release remaining
for _ in 0..8 {
manager.notify_completion(action_id).await.unwrap();
release_next_active(&manager, &mut active_execution_ids).await;
sleep(Duration::from_millis(20)).await;
}
@@ -798,11 +845,15 @@ async fn test_queue_stats_persistence() {
let max_concurrent = 5;
let num_executions = 50;
let mut active_execution_ids = VecDeque::new();
// Enqueue executions
for i in 0..num_executions {
let exec_id =
create_test_execution(&pool, action_id, &action_ref, ExecutionStatus::Requested).await;
if i < max_concurrent {
active_execution_ids.push_back(exec_id);
}
// Start the enqueue in background
let manager_clone = manager.clone();
@@ -838,7 +889,7 @@ async fn test_queue_stats_persistence() {
// Release all
for _ in 0..num_executions {
manager.notify_completion(action_id).await.unwrap();
release_next_active(&manager, &mut active_execution_ids).await;
sleep(Duration::from_millis(10)).await;
}
@@ -854,8 +905,8 @@ async fn test_queue_stats_persistence() {
assert_eq!(final_db_stats.queue_length, 0);
assert_eq!(final_mem_stats.queue_length, 0);
assert_eq!(final_db_stats.total_enqueued, num_executions);
assert_eq!(final_db_stats.total_completed, num_executions);
assert_eq!(final_db_stats.total_enqueued, num_executions as i64);
assert_eq!(final_db_stats.total_completed, num_executions as i64);
// Cleanup
cleanup_test_data(&pool, pack_id).await;
@@ -951,6 +1002,7 @@ async fn test_extreme_stress_10k_executions() {
let max_concurrent = 10;
let num_executions: i64 = 10000;
let completed = Arc::new(Mutex::new(0u64));
let execution_ids = Arc::new(Mutex::new(vec![None; num_executions as usize]));
println!(
"Starting extreme stress test with {} executions...",
@@ -965,6 +1017,7 @@ async fn test_extreme_stress_10k_executions() {
let manager_clone = manager.clone();
let action_ref_clone = action_ref.clone();
let completed_clone = completed.clone();
let ids = execution_ids.clone();
let handle = tokio::spawn(async move {
let exec_id = create_test_execution(
@@ -974,6 +1027,7 @@ async fn test_extreme_stress_10k_executions() {
ExecutionStatus::Requested,
)
.await;
ids.lock().await[i as usize] = Some(exec_id);
manager_clone
.enqueue_and_wait(action_id, exec_id, max_concurrent, None)
@@ -999,12 +1053,18 @@ async fn test_extreme_stress_10k_executions() {
println!("All executions spawned");
// Release all
let ids = execution_ids.lock().await;
let mut active_execution_ids = VecDeque::from(
ids.iter()
.take(max_concurrent as usize)
.map(|id| id.expect("Initial execution id should be recorded"))
.collect::<Vec<_>>(),
);
drop(ids);
let release_start = std::time::Instant::now();
for i in 0i64..num_executions {
manager
.notify_completion(action_id)
.await
.expect("Notify should succeed");
release_next_active(&manager, &mut active_execution_ids).await;
if i % 1000 == 0 {
println!("Released: {}", i);

View File

@@ -1003,7 +1003,11 @@ impl ActionExecutor {
..Default::default()
};
ExecutionRepository::update(&self.pool, execution_id, input).await?;
let execution = ExecutionRepository::find_by_id(&self.pool, execution_id)
.await?
.ok_or_else(|| anyhow::anyhow!("Execution {} not found", execution_id))?;
ExecutionRepository::update_loaded(&self.pool, &execution, input).await?;
Ok(())
}