//! Integration tests for SSE execution stream endpoint //! //! These tests verify that: //! 1. PostgreSQL LISTEN/NOTIFY correctly triggers notifications //! 2. The SSE endpoint streams execution updates in real-time //! 3. Filtering by execution_id works correctly //! 4. Authentication is properly enforced //! 5. Reconnection and error handling work as expected use attune_common::{ models::*, repositories::{ action::{ActionRepository, CreateActionInput}, execution::{CreateExecutionInput, ExecutionRepository}, pack::{CreatePackInput, PackRepository}, Create, }, }; use futures::StreamExt; use reqwest_eventsource::{Event, EventSource}; use serde_json::{json, Value}; use sqlx::PgPool; use std::time::Duration; use tokio::time::timeout; mod helpers; use helpers::TestContext; type Result = std::result::Result>; /// Helper to set up test pack and action async fn setup_test_pack_and_action(pool: &PgPool) -> Result<(Pack, Action)> { let pack_input = CreatePackInput { r#ref: "test_sse_pack".to_string(), label: "Test SSE Pack".to_string(), description: Some("Pack for SSE testing".to_string()), version: "1.0.0".to_string(), conf_schema: json!({}), config: json!({}), meta: json!({"author": "test"}), tags: vec!["test".to_string()], runtime_deps: vec![], is_standard: false, installers: json!({}), }; let pack = PackRepository::create(pool, pack_input).await?; let action_input = CreateActionInput { r#ref: format!("{}.test_action", pack.r#ref), pack: pack.id, pack_ref: pack.r#ref.clone(), label: "Test Action".to_string(), description: "Test action for SSE tests".to_string(), entrypoint: "test.sh".to_string(), runtime: None, param_schema: None, out_schema: None, is_adhoc: false, }; let action = ActionRepository::create(pool, action_input).await?; Ok((pack, action)) } /// Helper to create a test execution async fn create_test_execution(pool: &PgPool, action_id: i64) -> Result { let input = CreateExecutionInput { action: Some(action_id), action_ref: format!("action_{}", action_id), config: None, env_vars: None, parent: None, enforcement: None, executor: None, status: ExecutionStatus::Scheduled, result: None, workflow_task: None, }; Ok(ExecutionRepository::create(pool, input).await?) } /// This test requires a running API server on port 8080 /// Run with: cargo test test_sse_stream_receives_execution_updates -- --ignored --nocapture /// After starting: cargo run -p attune-api -- -c config.test.yaml #[tokio::test] #[ignore] async fn test_sse_stream_receives_execution_updates() -> Result<()> { // Set up test context with auth let ctx = TestContext::new().await?.with_auth().await?; let token = ctx.token().unwrap(); // Create test pack, action, and execution let (_pack, action) = setup_test_pack_and_action(&ctx.pool).await?; let execution = create_test_execution(&ctx.pool, action.id).await?; println!( "Created execution: id={}, status={:?}", execution.id, execution.status ); // Build SSE URL with authentication let sse_url = format!( "http://localhost:8080/api/v1/executions/stream?execution_id={}&token={}", execution.id, token ); // Create SSE stream let mut stream = EventSource::get(&sse_url); // Spawn a task to update the execution status after a short delay let pool_clone = ctx.pool.clone(); let execution_id = execution.id; tokio::spawn(async move { // Wait a bit to ensure SSE connection is established tokio::time::sleep(Duration::from_millis(500)).await; println!("Updating execution {} to 'running' status", execution_id); // Update execution status - this should trigger PostgreSQL NOTIFY let _ = sqlx::query( "UPDATE execution SET status = 'running', start_time = NOW() WHERE id = $1", ) .bind(execution_id) .execute(&pool_clone) .await; println!("Update executed, waiting before setting to succeeded"); tokio::time::sleep(Duration::from_millis(500)).await; // Update to succeeded let _ = sqlx::query( "UPDATE execution SET status = 'succeeded', end_time = NOW() WHERE id = $1", ) .bind(execution_id) .execute(&pool_clone) .await; println!("Execution {} updated to 'succeeded'", execution_id); }); // Wait for SSE events with timeout let mut received_running = false; let mut received_succeeded = false; let mut attempts = 0; let max_attempts = 20; // 10 seconds total while attempts < max_attempts && (!received_running || !received_succeeded) { match timeout(Duration::from_millis(500), stream.next()).await { Ok(Some(Ok(event))) => { println!("Received SSE event: {:?}", event); match event { Event::Open => { println!("SSE connection established"); } Event::Message(msg) => { if let Ok(data) = serde_json::from_str::(&msg.data) { println!( "Parsed event data: {}", serde_json::to_string_pretty(&data)? ); if let Some(entity_type) = data.get("entity_type").and_then(|v| v.as_str()) { if entity_type == "execution" { if let Some(event_data) = data.get("data") { if let Some(status) = event_data.get("status").and_then(|v| v.as_str()) { println!( "Received execution update with status: {}", status ); if status == "running" { received_running = true; println!("✓ Received 'running' status"); } else if status == "succeeded" { received_succeeded = true; println!("✓ Received 'succeeded' status"); } } } } } } } } } Ok(Some(Err(e))) => { eprintln!("SSE stream error: {}", e); break; } Ok(None) => { println!("SSE stream ended"); break; } Err(_) => { // Timeout waiting for next event attempts += 1; println!( "Timeout waiting for event (attempt {}/{})", attempts, max_attempts ); } } } // Verify we received both updates assert!( received_running, "Should have received execution update with status 'running'" ); assert!( received_succeeded, "Should have received execution update with status 'succeeded'" ); println!("✓ Test passed: SSE stream received all expected updates"); Ok(()) } /// Test that SSE stream correctly filters by execution_id #[tokio::test] #[ignore] async fn test_sse_stream_filters_by_execution_id() -> Result<()> { // Set up test context with auth let ctx = TestContext::new().await?.with_auth().await?; let token = ctx.token().unwrap(); // Create test pack, action, and TWO executions let (_pack, action) = setup_test_pack_and_action(&ctx.pool).await?; let execution1 = create_test_execution(&ctx.pool, action.id).await?; let execution2 = create_test_execution(&ctx.pool, action.id).await?; println!( "Created executions: id1={}, id2={}", execution1.id, execution2.id ); // Subscribe to updates for execution1 only let sse_url = format!( "http://localhost:8080/api/v1/executions/stream?execution_id={}&token={}", execution1.id, token ); let mut stream = EventSource::get(&sse_url); // Update both executions let pool_clone = ctx.pool.clone(); let exec1_id = execution1.id; let exec2_id = execution2.id; tokio::spawn(async move { tokio::time::sleep(Duration::from_millis(500)).await; // Update execution2 (should NOT appear in filtered stream) let _ = sqlx::query("UPDATE execution SET status = 'completed' WHERE id = $1") .bind(exec2_id) .execute(&pool_clone) .await; println!("Updated execution2 {} to 'completed'", exec2_id); tokio::time::sleep(Duration::from_millis(200)).await; // Update execution1 (SHOULD appear in filtered stream) let _ = sqlx::query("UPDATE execution SET status = 'running' WHERE id = $1") .bind(exec1_id) .execute(&pool_clone) .await; println!("Updated execution1 {} to 'running'", exec1_id); }); // Wait for events let mut received_exec1_update = false; let mut received_exec2_update = false; let mut attempts = 0; let max_attempts = 20; while attempts < max_attempts && !received_exec1_update { match timeout(Duration::from_millis(500), stream.next()).await { Ok(Some(Ok(event))) => match event { Event::Open => {} Event::Message(msg) => { if let Ok(data) = serde_json::from_str::(&msg.data) { if let Some(entity_id) = data.get("entity_id").and_then(|v| v.as_i64()) { println!("Received update for execution: {}", entity_id); if entity_id == execution1.id { received_exec1_update = true; println!("✓ Received update for execution1 (correct)"); } else if entity_id == execution2.id { received_exec2_update = true; println!( "✗ Received update for execution2 (should be filtered out)" ); } } } } }, Ok(Some(Err(_))) | Ok(None) => break, Err(_) => { attempts += 1; } } } // Should receive execution1 update but NOT execution2 assert!( received_exec1_update, "Should have received update for execution1" ); assert!( !received_exec2_update, "Should NOT have received update for execution2 (filtered out)" ); println!("✓ Test passed: SSE stream correctly filters by execution_id"); Ok(()) } #[tokio::test] #[ignore] async fn test_sse_stream_requires_authentication() -> Result<()> { // Try to connect without token let sse_url = "http://localhost:8080/api/v1/executions/stream"; let mut stream = EventSource::get(sse_url); // Should receive an error due to missing authentication let mut received_error = false; let mut attempts = 0; let max_attempts = 5; while attempts < max_attempts && !received_error { match timeout(Duration::from_millis(500), stream.next()).await { Ok(Some(Ok(_))) => { // Should not receive successful events without auth panic!("Received SSE event without authentication - this should not happen"); } Ok(Some(Err(e))) => { println!("Correctly received error without auth: {}", e); received_error = true; } Ok(None) => { println!("Stream ended (expected behavior for unauthorized)"); received_error = true; break; } Err(_) => { attempts += 1; println!("Timeout waiting for response (attempt {})", attempts); } } } assert!( received_error, "Should have received error or stream closure due to missing authentication" ); println!("✓ Test passed: SSE stream requires authentication"); Ok(()) } /// Test streaming all executions (no filter) #[tokio::test] #[ignore] async fn test_sse_stream_all_executions() -> Result<()> { // Set up test context with auth let ctx = TestContext::new().await?.with_auth().await?; let token = ctx.token().unwrap(); // Create test pack, action, and multiple executions let (_pack, action) = setup_test_pack_and_action(&ctx.pool).await?; let execution1 = create_test_execution(&ctx.pool, action.id).await?; let execution2 = create_test_execution(&ctx.pool, action.id).await?; println!( "Created executions: id1={}, id2={}", execution1.id, execution2.id ); // Subscribe to ALL execution updates (no execution_id filter) let sse_url = format!( "http://localhost:8080/api/v1/executions/stream?token={}", token ); let mut stream = EventSource::get(&sse_url); // Update both executions let pool_clone = ctx.pool.clone(); let exec1_id = execution1.id; let exec2_id = execution2.id; tokio::spawn(async move { tokio::time::sleep(Duration::from_millis(500)).await; // Update execution1 let _ = sqlx::query("UPDATE execution SET status = 'running' WHERE id = $1") .bind(exec1_id) .execute(&pool_clone) .await; println!("Updated execution1 {} to 'running'", exec1_id); tokio::time::sleep(Duration::from_millis(200)).await; // Update execution2 let _ = sqlx::query("UPDATE execution SET status = 'running' WHERE id = $1") .bind(exec2_id) .execute(&pool_clone) .await; println!("Updated execution2 {} to 'running'", exec2_id); }); // Wait for events from BOTH executions let mut received_updates = std::collections::HashSet::new(); let mut attempts = 0; let max_attempts = 20; while attempts < max_attempts && received_updates.len() < 2 { match timeout(Duration::from_millis(500), stream.next()).await { Ok(Some(Ok(event))) => match event { Event::Open => {} Event::Message(msg) => { if let Ok(data) = serde_json::from_str::(&msg.data) { if let Some(entity_id) = data.get("entity_id").and_then(|v| v.as_i64()) { println!("Received update for execution: {}", entity_id); received_updates.insert(entity_id); } } } }, Ok(Some(Err(_))) | Ok(None) => break, Err(_) => { attempts += 1; } } } // Should have received updates for BOTH executions assert!( received_updates.contains(&execution1.id), "Should have received update for execution1" ); assert!( received_updates.contains(&execution2.id), "Should have received update for execution2" ); println!("✓ Test passed: SSE stream received updates for all executions (no filter)"); Ok(()) } /// Test that PostgreSQL NOTIFY triggers actually fire #[tokio::test] #[ignore] async fn test_postgresql_notify_trigger_fires() -> Result<()> { let ctx = TestContext::new().await?; // Create test pack, action, and execution let (_pack, action) = setup_test_pack_and_action(&ctx.pool).await?; let execution = create_test_execution(&ctx.pool, action.id).await?; println!("Created execution: id={}", execution.id); // Set up a listener on the PostgreSQL channel let mut listener = sqlx::postgres::PgListener::connect_with(&ctx.pool).await?; listener.listen("execution_events").await?; println!("Listening on channel 'execution_events'"); // Update the execution in another task let pool_clone = ctx.pool.clone(); let execution_id = execution.id; tokio::spawn(async move { tokio::time::sleep(Duration::from_millis(500)).await; println!("Updating execution {} to trigger NOTIFY", execution_id); let _ = sqlx::query("UPDATE execution SET status = 'running' WHERE id = $1") .bind(execution_id) .execute(&pool_clone) .await; }); // Wait for the NOTIFY with a timeout let mut received_notification = false; let mut attempts = 0; let max_attempts = 10; while attempts < max_attempts && !received_notification { match timeout(Duration::from_millis(1000), listener.recv()).await { Ok(Ok(notification)) => { println!("Received NOTIFY: channel={}", notification.channel()); println!("Payload: {}", notification.payload()); // Parse the payload if let Ok(data) = serde_json::from_str::(notification.payload()) { if let Some(entity_id) = data.get("entity_id").and_then(|v| v.as_i64()) { if entity_id == execution.id { println!("✓ Received NOTIFY for our execution"); received_notification = true; } } } } Ok(Err(e)) => { eprintln!("Error receiving notification: {}", e); break; } Err(_) => { attempts += 1; println!("Timeout waiting for NOTIFY (attempt {})", attempts); } } } assert!( received_notification, "Should have received PostgreSQL NOTIFY when execution was updated" ); println!("✓ Test passed: PostgreSQL NOTIFY trigger fires correctly"); Ok(()) }