//! Integration and stress tests for FIFO Policy Execution Ordering //! //! These tests verify the complete execution ordering system including: //! - End-to-end FIFO ordering with database persistence //! - High-concurrency stress scenarios (1000+ executions) //! - Multiple worker simulation //! - Queue statistics accuracy under load //! - Policy integration (concurrency + delays) //! - Failure and cancellation scenarios //! - Cross-action independence at scale use attune_common::{ config::Config, db::Database, models::enums::ExecutionStatus, repositories::{ action::{ActionRepository, CreateActionInput}, execution::{CreateExecutionInput, ExecutionRepository}, pack::{CreatePackInput, PackRepository}, queue_stats::QueueStatsRepository, runtime::{CreateRuntimeInput, RuntimeRepository}, Create, }, }; use attune_executor::queue_manager::{ExecutionQueueManager, QueueConfig}; use chrono::Utc; use serde_json::json; use sqlx::PgPool; use std::sync::Arc; use std::time::Duration; use tokio::sync::Mutex; use tokio::time::sleep; /// Test helper to set up database connection async fn setup_db() -> PgPool { let config = Config::load().expect("Failed to load config"); let db = Database::new(&config.database) .await .expect("Failed to connect to database"); db.pool().clone() } /// Test helper to create a test pack async fn create_test_pack(pool: &PgPool, suffix: &str) -> i64 { let pack_input = CreatePackInput { r#ref: format!("fifo_test_pack_{}", suffix), label: format!("FIFO Test Pack {}", suffix), description: Some(format!("Test pack for FIFO ordering tests {}", suffix)), version: "1.0.0".to_string(), conf_schema: json!({}), config: json!({}), meta: json!({}), tags: vec![], runtime_deps: vec![], dependencies: vec![], is_standard: false, installers: json!({}), }; PackRepository::create(pool, pack_input) .await .expect("Failed to create test pack") .id } /// Test helper to create a test runtime #[allow(dead_code)] async fn _create_test_runtime(pool: &PgPool, suffix: &str) -> i64 { let runtime_input = CreateRuntimeInput { r#ref: format!("fifo_test_runtime_{}", suffix), pack: None, pack_ref: None, description: Some(format!("Test runtime {}", suffix)), name: format!("Python {}", suffix), aliases: vec![], distributions: json!({"ubuntu": "python3"}), installation: Some(json!({"method": "apt"})), execution_config: json!({ "interpreter": { "binary": "python3", "args": ["-u"], "file_extension": ".py" } }), auto_detected: false, detection_config: json!({}), }; RuntimeRepository::create(pool, runtime_input) .await .expect("Failed to create test runtime") .id } /// Test helper to create a test action async fn create_test_action(pool: &PgPool, pack_id: i64, pack_ref: &str, suffix: &str) -> i64 { let action_input = CreateActionInput { r#ref: format!("fifo_test_action_{}", suffix), pack: pack_id, pack_ref: pack_ref.to_string(), label: format!("FIFO Test Action {}", suffix), description: format!("Test action {}", suffix), entrypoint: "echo test".to_string(), runtime: None, runtime_version_constraint: None, param_schema: None, out_schema: None, is_adhoc: false, }; ActionRepository::create(pool, action_input) .await .expect("Failed to create test action") .id } /// Test helper to create a test execution async fn create_test_execution( pool: &PgPool, action_id: i64, action_ref: &str, status: ExecutionStatus, ) -> i64 { let execution_input = CreateExecutionInput { action: Some(action_id), action_ref: action_ref.to_string(), config: None, env_vars: None, parent: None, enforcement: None, executor: None, worker: None, status, result: None, workflow_task: None, }; ExecutionRepository::create(pool, execution_input) .await .expect("Failed to create test execution") .id } /// Test helper to cleanup test data async fn cleanup_test_data(pool: &PgPool, pack_id: i64) { // Delete queue stats sqlx::query("DELETE FROM attune.queue_stats WHERE action_id IN (SELECT id FROM attune.action WHERE pack = $1)") .bind(pack_id) .execute(pool) .await .ok(); // Delete executions sqlx::query("DELETE FROM attune.execution WHERE action IN (SELECT id FROM attune.action WHERE pack = $1)") .bind(pack_id) .execute(pool) .await .ok(); // Delete actions sqlx::query("DELETE FROM attune.action WHERE pack = $1") .bind(pack_id) .execute(pool) .await .ok(); // Delete pack sqlx::query("DELETE FROM attune.pack WHERE id = $1") .bind(pack_id) .execute(pool) .await .ok(); } #[tokio::test] #[ignore] // Requires database async fn test_fifo_ordering_with_database() { let pool = setup_db().await; let timestamp = Utc::now().timestamp(); let suffix = format!("fifo_db_{}", timestamp); let pack_id = create_test_pack(&pool, &suffix).await; let pack_ref = format!("fifo_test_pack_{}", suffix); let action_id = create_test_action(&pool, pack_id, &pack_ref, &suffix).await; let action_ref = format!("fifo_test_action_{}", suffix); // Create queue manager with database pool let manager = Arc::new(ExecutionQueueManager::with_db_pool( QueueConfig::default(), pool.clone(), )); let max_concurrent = 1; let num_executions = 10; let execution_order = Arc::new(Mutex::new(Vec::new())); let mut handles = vec![]; // Create first execution in database and enqueue let first_exec_id = create_test_execution(&pool, action_id, &action_ref, ExecutionStatus::Requested).await; manager .enqueue_and_wait(action_id, first_exec_id, max_concurrent) .await .expect("First execution should enqueue"); // Spawn multiple executions for i in 1..num_executions { let pool_clone = pool.clone(); let manager_clone = manager.clone(); let order = execution_order.clone(); let action_ref_clone = action_ref.clone(); let handle = tokio::spawn(async move { // Create execution in database let exec_id = create_test_execution( &pool_clone, action_id, &action_ref_clone, ExecutionStatus::Requested, ) .await; // Enqueue and wait manager_clone .enqueue_and_wait(action_id, exec_id, max_concurrent) .await .expect("Enqueue should succeed"); // Record order order.lock().await.push(i); }); handles.push(handle); } // Give tasks time to queue sleep(Duration::from_millis(200)).await; // Verify queue stats in database let stats = QueueStatsRepository::find_by_action(&pool, action_id) .await .expect("Should get queue stats") .expect("Queue stats should exist"); assert_eq!(stats.action_id, action_id); assert_eq!(stats.active_count as u32, 1); assert_eq!(stats.queue_length as usize, (num_executions - 1) as usize); assert_eq!(stats.max_concurrent as u32, max_concurrent); // Release them one by one for _ in 0..num_executions { sleep(Duration::from_millis(50)).await; manager .notify_completion(action_id) .await .expect("Notify should succeed"); } // Wait for all to complete for handle in handles { handle.await.expect("Task should complete"); } // Verify FIFO order let order = execution_order.lock().await; let expected: Vec = (1..num_executions).collect(); assert_eq!(*order, expected, "Executions should complete in FIFO order"); // Cleanup cleanup_test_data(&pool, pack_id).await; } #[tokio::test] #[ignore] // Requires database - stress test async fn test_high_concurrency_stress() { let pool = setup_db().await; let timestamp = Utc::now().timestamp(); let suffix = format!("stress_{}", timestamp); let pack_id = create_test_pack(&pool, &suffix).await; let pack_ref = format!("fifo_test_pack_{}", suffix); let action_id = create_test_action(&pool, pack_id, &pack_ref, &suffix).await; let action_ref = format!("fifo_test_action_{}", suffix); let manager = Arc::new(ExecutionQueueManager::with_db_pool( QueueConfig { max_queue_length: 2000, queue_timeout_seconds: 300, enable_metrics: true, }, pool.clone(), )); let max_concurrent = 5; let num_executions: i64 = 1000; let execution_order = Arc::new(Mutex::new(Vec::new())); let mut handles = vec![]; println!("Starting stress test with {} executions...", num_executions); let start_time = std::time::Instant::now(); // Start first batch to fill capacity for i in 0i64..max_concurrent as i64 { let pool_clone = pool.clone(); let manager_clone = manager.clone(); let action_ref_clone = action_ref.clone(); let order = execution_order.clone(); let handle = tokio::spawn(async move { let exec_id = create_test_execution( &pool_clone, action_id, &action_ref_clone, ExecutionStatus::Requested, ) .await; manager_clone .enqueue_and_wait(action_id, exec_id, max_concurrent) .await .expect("Enqueue should succeed"); order.lock().await.push(i); }); handles.push(handle); } // Queue remaining executions for i in max_concurrent as i64..num_executions { let pool_clone = pool.clone(); let manager_clone = manager.clone(); let action_ref_clone = action_ref.clone(); let order = execution_order.clone(); let handle = tokio::spawn(async move { let exec_id = create_test_execution( &pool_clone, action_id, &action_ref_clone, ExecutionStatus::Requested, ) .await; manager_clone .enqueue_and_wait(action_id, exec_id, max_concurrent) .await .expect("Enqueue should succeed"); order.lock().await.push(i); }); handles.push(handle); // Small delay to avoid overwhelming the system if i % 100 == 0 { sleep(Duration::from_millis(10)).await; } } // Give tasks time to queue sleep(Duration::from_millis(500)).await; println!("All tasks queued, checking stats..."); // Verify queue stats let stats = manager.get_queue_stats(action_id).await; assert!(stats.is_some(), "Queue stats should exist"); let stats = stats.unwrap(); assert_eq!(stats.active_count, max_concurrent); assert!(stats.queue_length > 0, "Should have queued executions"); println!( "Queue stats - Active: {}, Queued: {}, Total: {}", stats.active_count, stats.queue_length, stats.total_enqueued ); // Release all executions println!("Releasing executions..."); for i in 0..num_executions { if i % 100 == 0 { println!("Released {} executions", i); } manager .notify_completion(action_id) .await .expect("Notify should succeed"); // Small delay to allow queue processing if i % 50 == 0 { sleep(Duration::from_millis(5)).await; } } // Wait for all to complete println!("Waiting for all tasks to complete..."); for (i, handle) in handles.into_iter().enumerate() { if i % 100 == 0 { println!("Completed {} tasks", i); } handle.await.expect("Task should complete"); } let elapsed = start_time.elapsed(); println!( "Stress test completed in {:.2}s ({:.0} exec/sec)", elapsed.as_secs_f64(), num_executions as f64 / elapsed.as_secs_f64() ); // Verify FIFO order let order = execution_order.lock().await; assert_eq!( order.len(), num_executions as usize, "All executions should complete" ); let expected: Vec = (0..num_executions).collect(); assert_eq!( *order, expected, "Executions should complete in strict FIFO order" ); // Verify final queue stats let final_stats = manager.get_queue_stats(action_id).await.unwrap(); assert_eq!(final_stats.queue_length, 0, "Queue should be empty"); assert_eq!( final_stats.total_enqueued, num_executions as u64, "Should track all enqueues" ); assert_eq!( final_stats.total_completed, num_executions as u64, "Should track all completions" ); println!("Final stats verified - Test passed!"); // Cleanup cleanup_test_data(&pool, pack_id).await; } #[tokio::test] #[ignore] // Requires database async fn test_multiple_workers_simulation() { let pool = setup_db().await; let timestamp = Utc::now().timestamp(); let suffix = format!("workers_{}", timestamp); let pack_id = create_test_pack(&pool, &suffix).await; let pack_ref = format!("fifo_test_pack_{}", suffix); let action_id = create_test_action(&pool, pack_id, &pack_ref, &suffix).await; let action_ref = format!("fifo_test_action_{}", suffix); let manager = Arc::new(ExecutionQueueManager::with_db_pool( QueueConfig::default(), pool.clone(), )); let max_concurrent = 3; let num_executions = 30; let execution_order = Arc::new(Mutex::new(Vec::new())); let mut handles = vec![]; // Spawn all executions for i in 0..num_executions { let pool_clone = pool.clone(); let manager_clone = manager.clone(); let action_ref_clone = action_ref.clone(); let order = execution_order.clone(); let handle = tokio::spawn(async move { let exec_id = create_test_execution( &pool_clone, action_id, &action_ref_clone, ExecutionStatus::Requested, ) .await; manager_clone .enqueue_and_wait(action_id, exec_id, max_concurrent) .await .expect("Enqueue should succeed"); order.lock().await.push(i); }); handles.push(handle); } sleep(Duration::from_millis(200)).await; // Simulate workers completing at different rates // Worker 1: Fast (completes every 10ms) // Worker 2: Medium (completes every 30ms) // Worker 3: Slow (completes every 50ms) let worker_completions = Arc::new(Mutex::new(vec![0, 0, 0])); let worker_completions_clone = worker_completions.clone(); let manager_clone = manager.clone(); // Spawn worker simulators let worker_handle = tokio::spawn(async move { let mut next_worker = 0; for _ in 0..num_executions { // Simulate varying completion times let delay = match next_worker { 0 => 10, // Fast worker 1 => 30, // Medium worker _ => 50, // Slow worker }; sleep(Duration::from_millis(delay)).await; // Worker completes and notifies manager_clone .notify_completion(action_id) .await .expect("Notify should succeed"); worker_completions_clone.lock().await[next_worker] += 1; // Round-robin between workers next_worker = (next_worker + 1) % 3; } }); // Wait for all executions and workers for handle in handles { handle.await.expect("Task should complete"); } worker_handle .await .expect("Worker simulator should complete"); // Verify FIFO order maintained despite different worker speeds let order = execution_order.lock().await; let expected: Vec = (0..num_executions).collect(); assert_eq!( *order, expected, "FIFO order should be maintained regardless of worker speed" ); // Verify workers distributed load let completions = worker_completions.lock().await; println!("Worker completions: {:?}", *completions); assert!( completions.iter().all(|&c| c > 0), "All workers should have completed some executions" ); // Cleanup cleanup_test_data(&pool, pack_id).await; } #[tokio::test] #[ignore] // Requires database async fn test_cross_action_independence() { let pool = setup_db().await; let timestamp = Utc::now().timestamp(); let suffix = format!("independence_{}", timestamp); let pack_id = create_test_pack(&pool, &suffix).await; let pack_ref = format!("fifo_test_pack_{}", suffix); // Create three different actions let action1_id = create_test_action(&pool, pack_id, &pack_ref, &format!("{}_a1", suffix)).await; let action2_id = create_test_action(&pool, pack_id, &pack_ref, &format!("{}_a2", suffix)).await; let action3_id = create_test_action(&pool, pack_id, &pack_ref, &format!("{}_a3", suffix)).await; let manager = Arc::new(ExecutionQueueManager::with_db_pool( QueueConfig::default(), pool.clone(), )); let executions_per_action = 50; let mut handles = vec![]; // Spawn executions for all three actions simultaneously for action_id in [action1_id, action2_id, action3_id] { let action_ref = format!("fifo_test_action_{}_{}", suffix, action_id); for i in 0..executions_per_action { let pool_clone = pool.clone(); let manager_clone = manager.clone(); let action_ref_clone = action_ref.clone(); let handle = tokio::spawn(async move { let exec_id = create_test_execution( &pool_clone, action_id, &action_ref_clone, ExecutionStatus::Requested, ) .await; manager_clone .enqueue_and_wait(action_id, exec_id, 1) .await .expect("Enqueue should succeed"); (action_id, i) }); handles.push(handle); } } sleep(Duration::from_millis(300)).await; // Verify all three queues exist independently let stats1 = manager.get_queue_stats(action1_id).await.unwrap(); let stats2 = manager.get_queue_stats(action2_id).await.unwrap(); let stats3 = manager.get_queue_stats(action3_id).await.unwrap(); assert_eq!(stats1.action_id, action1_id); assert_eq!(stats2.action_id, action2_id); assert_eq!(stats3.action_id, action3_id); println!( "Action 1 - Active: {}, Queued: {}", stats1.active_count, stats1.queue_length ); println!( "Action 2 - Active: {}, Queued: {}", stats2.active_count, stats2.queue_length ); println!( "Action 3 - Active: {}, Queued: {}", stats3.active_count, stats3.queue_length ); // Release all actions in an interleaved pattern for i in 0..executions_per_action { // Release one from each action manager .notify_completion(action1_id) .await .expect("Notify should succeed"); manager .notify_completion(action2_id) .await .expect("Notify should succeed"); manager .notify_completion(action3_id) .await .expect("Notify should succeed"); if i % 10 == 0 { sleep(Duration::from_millis(10)).await; } } // Wait for all to complete for handle in handles { handle.await.expect("Task should complete"); } // Verify all queues are empty let final_stats1 = manager.get_queue_stats(action1_id).await.unwrap(); let final_stats2 = manager.get_queue_stats(action2_id).await.unwrap(); let final_stats3 = manager.get_queue_stats(action3_id).await.unwrap(); assert_eq!(final_stats1.queue_length, 0); assert_eq!(final_stats2.queue_length, 0); assert_eq!(final_stats3.queue_length, 0); assert_eq!(final_stats1.total_enqueued, executions_per_action as u64); assert_eq!(final_stats2.total_enqueued, executions_per_action as u64); assert_eq!(final_stats3.total_enqueued, executions_per_action as u64); // Cleanup cleanup_test_data(&pool, pack_id).await; } #[tokio::test] #[ignore] // Requires database async fn test_cancellation_during_queue() { let pool = setup_db().await; let timestamp = Utc::now().timestamp(); let suffix = format!("cancel_{}", timestamp); let pack_id = create_test_pack(&pool, &suffix).await; let pack_ref = format!("fifo_test_pack_{}", suffix); let action_id = create_test_action(&pool, pack_id, &pack_ref, &suffix).await; let action_ref = format!("fifo_test_action_{}", suffix); let manager = Arc::new(ExecutionQueueManager::with_db_pool( QueueConfig::default(), pool.clone(), )); let max_concurrent = 1; let mut handles = vec![]; let execution_ids = Arc::new(Mutex::new(Vec::new())); // Fill capacity let exec_id = create_test_execution(&pool, action_id, &action_ref, ExecutionStatus::Requested).await; manager .enqueue_and_wait(action_id, exec_id, max_concurrent) .await .unwrap(); // Queue 10 more for _ in 0..10 { let pool_clone = pool.clone(); let manager_clone = manager.clone(); let action_ref_clone = action_ref.clone(); let ids = execution_ids.clone(); let handle = tokio::spawn(async move { let exec_id = create_test_execution( &pool_clone, action_id, &action_ref_clone, ExecutionStatus::Requested, ) .await; ids.lock().await.push(exec_id); manager_clone .enqueue_and_wait(action_id, exec_id, max_concurrent) .await }); handles.push(handle); } sleep(Duration::from_millis(200)).await; // Verify queue has 10 items let stats = manager.get_queue_stats(action_id).await.unwrap(); assert_eq!(stats.queue_length, 10); // Cancel executions at positions 2, 5, 8 let ids = execution_ids.lock().await; let to_cancel = vec![ids[2], ids[5], ids[8]]; drop(ids); for cancel_id in &to_cancel { let cancelled = manager .cancel_execution(action_id, *cancel_id) .await .unwrap(); assert!(cancelled, "Should successfully cancel queued execution"); } // Verify queue length decreased let stats = manager.get_queue_stats(action_id).await.unwrap(); assert_eq!( stats.queue_length, 7, "Three executions should be removed from queue" ); // Release remaining for _ in 0..8 { manager.notify_completion(action_id).await.unwrap(); sleep(Duration::from_millis(20)).await; } // Wait for handles to complete or error let mut completed = 0; let mut cancelled = 0; for handle in handles { match handle.await { Ok(Ok(_)) => completed += 1, Ok(Err(_)) => cancelled += 1, Err(_) => cancelled += 1, } } assert_eq!(completed, 7, "Seven executions should complete"); assert_eq!(cancelled, 3, "Three executions should be cancelled"); // Cleanup cleanup_test_data(&pool, pack_id).await; } #[tokio::test] #[ignore] // Requires database async fn test_queue_stats_persistence() { let pool = setup_db().await; let timestamp = Utc::now().timestamp(); let suffix = format!("stats_{}", timestamp); let pack_id = create_test_pack(&pool, &suffix).await; let pack_ref = format!("fifo_test_pack_{}", suffix); let action_id = create_test_action(&pool, pack_id, &pack_ref, &suffix).await; let action_ref = format!("fifo_test_action_{}", suffix); let manager = Arc::new(ExecutionQueueManager::with_db_pool( QueueConfig::default(), pool.clone(), )); let max_concurrent = 5; let num_executions = 50; // Enqueue executions for i in 0..num_executions { let exec_id = create_test_execution(&pool, action_id, &action_ref, ExecutionStatus::Requested).await; // Start the enqueue in background let manager_clone = manager.clone(); tokio::spawn(async move { manager_clone .enqueue_and_wait(action_id, exec_id, max_concurrent) .await .ok(); }); if i % 10 == 0 { sleep(Duration::from_millis(50)).await; // Check database stats persistence let db_stats = QueueStatsRepository::find_by_action(&pool, action_id) .await .expect("Should query database") .expect("Stats should exist in database"); let mem_stats = manager.get_queue_stats(action_id).await.unwrap(); // Verify memory and database are in sync assert_eq!(db_stats.action_id, mem_stats.action_id); assert_eq!(db_stats.queue_length as usize, mem_stats.queue_length); assert_eq!(db_stats.active_count as u32, mem_stats.active_count); assert_eq!(db_stats.max_concurrent as u32, mem_stats.max_concurrent); assert_eq!(db_stats.total_enqueued as u64, mem_stats.total_enqueued); assert_eq!(db_stats.total_completed as u64, mem_stats.total_completed); } } sleep(Duration::from_millis(200)).await; // Release all for _ in 0..num_executions { manager.notify_completion(action_id).await.unwrap(); sleep(Duration::from_millis(10)).await; } sleep(Duration::from_millis(100)).await; // Final verification let final_db_stats = QueueStatsRepository::find_by_action(&pool, action_id) .await .expect("Should query database") .expect("Stats should exist"); let final_mem_stats = manager.get_queue_stats(action_id).await.unwrap(); assert_eq!(final_db_stats.queue_length, 0); assert_eq!(final_mem_stats.queue_length, 0); assert_eq!(final_db_stats.total_enqueued, num_executions); assert_eq!(final_db_stats.total_completed, num_executions); // Cleanup cleanup_test_data(&pool, pack_id).await; } #[tokio::test] #[ignore] // Requires database async fn test_queue_full_rejection() { let pool = setup_db().await; let timestamp = Utc::now().timestamp(); let suffix = format!("full_{}", timestamp); let pack_id = create_test_pack(&pool, &suffix).await; let pack_ref = format!("fifo_test_pack_{}", suffix); let action_id = create_test_action(&pool, pack_id, &pack_ref, &suffix).await; let action_ref = format!("fifo_test_action_{}", suffix); let manager = Arc::new(ExecutionQueueManager::with_db_pool( QueueConfig { max_queue_length: 10, queue_timeout_seconds: 60, enable_metrics: true, }, pool.clone(), )); let max_concurrent = 1; // Fill capacity (1 active) let exec_id = create_test_execution(&pool, action_id, &action_ref, ExecutionStatus::Requested).await; manager .enqueue_and_wait(action_id, exec_id, max_concurrent) .await .unwrap(); // Fill queue (10 queued) for _ in 0..10 { let exec_id = create_test_execution(&pool, action_id, &action_ref, ExecutionStatus::Requested).await; let manager_clone = manager.clone(); tokio::spawn(async move { manager_clone .enqueue_and_wait(action_id, exec_id, max_concurrent) .await .ok(); }); } sleep(Duration::from_millis(200)).await; // Verify queue is full let stats = manager.get_queue_stats(action_id).await.unwrap(); assert_eq!(stats.active_count, 1); assert_eq!(stats.queue_length, 10); // Next enqueue should fail let exec_id = create_test_execution(&pool, action_id, &action_ref, ExecutionStatus::Requested).await; let result = manager .enqueue_and_wait(action_id, exec_id, max_concurrent) .await; assert!(result.is_err(), "Should reject when queue is full"); assert!(result.unwrap_err().to_string().contains("Queue full")); // Cleanup cleanup_test_data(&pool, pack_id).await; } #[tokio::test] #[ignore] // Requires database - very long stress test async fn test_extreme_stress_10k_executions() { let pool = setup_db().await; let timestamp = Utc::now().timestamp(); let suffix = format!("extreme_{}", timestamp); let pack_id = create_test_pack(&pool, &suffix).await; let pack_ref = format!("fifo_test_pack_{}", suffix); let action_id = create_test_action(&pool, pack_id, &pack_ref, &suffix).await; let action_ref = format!("fifo_test_action_{}", suffix); let manager = Arc::new(ExecutionQueueManager::with_db_pool( QueueConfig { max_queue_length: 15000, queue_timeout_seconds: 600, enable_metrics: true, }, pool.clone(), )); let max_concurrent = 10; let num_executions: i64 = 10000; let completed = Arc::new(Mutex::new(0u64)); println!( "Starting extreme stress test with {} executions...", num_executions ); let start_time = std::time::Instant::now(); // Spawn all executions let mut handles = vec![]; for i in 0i64..num_executions { let pool_clone = pool.clone(); let manager_clone = manager.clone(); let action_ref_clone = action_ref.clone(); let completed_clone = completed.clone(); let handle = tokio::spawn(async move { let exec_id = create_test_execution( &pool_clone, action_id, &action_ref_clone, ExecutionStatus::Requested, ) .await; manager_clone .enqueue_and_wait(action_id, exec_id, max_concurrent) .await .expect("Enqueue should succeed"); let mut count = completed_clone.lock().await; *count += 1; if *count % 1000 == 0 { println!("Enqueued: {}", *count); } }); handles.push(handle); // Batch spawn to avoid overwhelming scheduler if i % 500 == 0 { sleep(Duration::from_millis(10)).await; } } sleep(Duration::from_millis(1000)).await; println!("All executions spawned"); // Release all let release_start = std::time::Instant::now(); for i in 0i64..num_executions { manager .notify_completion(action_id) .await .expect("Notify should succeed"); if i % 1000 == 0 { println!("Released: {}", i); sleep(Duration::from_millis(10)).await; } } println!( "All releases sent in {:.2}s", release_start.elapsed().as_secs_f64() ); // Wait for all to complete println!("Waiting for all tasks to complete..."); for (i, handle) in handles.into_iter().enumerate() { if i % 1000 == 0 { println!("Awaited: {}", i); } handle.await.expect("Task should complete"); } let elapsed = start_time.elapsed(); println!( "Extreme stress test completed in {:.2}s ({:.0} exec/sec)", elapsed.as_secs_f64(), num_executions as f64 / elapsed.as_secs_f64() ); // Verify final state let final_stats = manager.get_queue_stats(action_id).await.unwrap(); assert_eq!(final_stats.queue_length, 0); assert_eq!(final_stats.total_enqueued as i64, num_executions); assert_eq!(final_stats.total_completed as i64, num_executions); println!("Extreme stress test passed!"); // Cleanup cleanup_test_data(&pool, pack_id).await; }