From 966a5af188d7cdefcceebea5fe06ca4f98a2c14a Mon Sep 17 00:00:00 2001 From: David Culbreth Date: Mon, 9 Feb 2026 23:21:23 -0600 Subject: [PATCH] http_request action working nicely --- config.development.yaml | 1 + crates/common/src/config.rs | 8 + crates/executor/src/timeout_monitor.rs | 4 +- crates/sensor/src/main.rs | 45 +-- crates/sensor/src/rule_lifecycle_listener.rs | 29 +- crates/sensor/src/service.rs | 114 +++++--- crates/worker/src/runtime/local.rs | 2 +- crates/worker/src/runtime/python.rs | 16 +- crates/worker/src/runtime/shell.rs | 109 ++++++- crates/worker/src/service.rs | 136 ++++++--- docker-compose.yaml | 34 +-- docker/Dockerfile | 8 +- docker/Dockerfile.optimized | 21 +- docker/nginx.conf | 20 +- packs/core/actions/http_request.sh | 24 +- .../components/common/ExecuteActionModal.tsx | 273 ++++++++++++++++++ web/src/pages/actions/ActionsPage.tsx | 233 +-------------- .../pages/executions/ExecutionDetailPage.tsx | 38 +++ 18 files changed, 720 insertions(+), 395 deletions(-) create mode 100644 web/src/components/common/ExecuteActionModal.tsx diff --git a/config.development.yaml b/config.development.yaml index 50b4d25..a9cd0a9 100644 --- a/config.development.yaml +++ b/config.development.yaml @@ -80,6 +80,7 @@ sensor: heartbeat_interval: 10 max_concurrent_sensors: 20 sensor_timeout: 120 + shutdown_timeout: 30 polling_interval: 5 # Check for new sensors every 5 seconds cleanup_interval: 60 diff --git a/crates/common/src/config.rs b/crates/common/src/config.rs index d1eafd4..3ec7509 100644 --- a/crates/common/src/config.rs +++ b/crates/common/src/config.rs @@ -407,6 +407,10 @@ pub struct SensorConfig { /// Sensor execution timeout in seconds #[serde(default = "default_sensor_timeout")] pub sensor_timeout: u64, + + /// Graceful shutdown timeout in seconds + #[serde(default = "default_sensor_shutdown_timeout")] + pub shutdown_timeout: u64, } fn default_sensor_poll_interval() -> u64 { @@ -417,6 +421,10 @@ fn default_sensor_timeout() -> u64 { 30 } +fn default_sensor_shutdown_timeout() -> u64 { + 30 +} + /// Pack registry index configuration #[derive(Debug, Clone, Serialize, Deserialize)] pub struct RegistryIndexConfig { diff --git a/crates/executor/src/timeout_monitor.rs b/crates/executor/src/timeout_monitor.rs index ce8c8c5..81a3680 100644 --- a/crates/executor/src/timeout_monitor.rs +++ b/crates/executor/src/timeout_monitor.rs @@ -112,7 +112,7 @@ impl ExecutionTimeoutMonitor { ORDER BY updated ASC LIMIT 100", // Process in batches to avoid overwhelming system ) - .bind("scheduled") + .bind(ExecutionStatus::Scheduled) .bind(cutoff) .fetch_all(&self.pool) .await?; @@ -186,7 +186,7 @@ impl ExecutionTimeoutMonitor { updated = NOW() WHERE id = $3", ) - .bind("failed") + .bind(ExecutionStatus::Failed) .bind(&result) .bind(execution_id) .execute(&self.pool) diff --git a/crates/sensor/src/main.rs b/crates/sensor/src/main.rs index b037bf2..7c3c516 100644 --- a/crates/sensor/src/main.rs +++ b/crates/sensor/src/main.rs @@ -8,6 +8,7 @@ use anyhow::Result; use attune_common::config::Config; use attune_sensor::service::SensorService; use clap::Parser; +use tokio::signal::unix::{signal, SignalKind}; use tracing::{error, info}; #[derive(Parser, Debug)] @@ -56,32 +57,38 @@ async fn main() -> Result<()> { info!("Message Queue: {}", mask_connection_string(&mq_config.url)); } - // Create sensor service + // Create and start sensor service let service = SensorService::new(config).await?; info!("Sensor Service initialized successfully"); - // Set up graceful shutdown handler - let service_clone = service.clone(); - tokio::spawn(async move { - if let Err(e) = tokio::signal::ctrl_c().await { - error!("Failed to listen for shutdown signal: {}", e); - } else { - info!("Shutdown signal received"); - if let Err(e) = service_clone.stop().await { - error!("Error during shutdown: {}", e); - } - } - }); - - // Start the service + // Start the service (spawns background tasks and returns) info!("Starting Sensor Service components..."); - if let Err(e) = service.start().await { - error!("Sensor Service error: {}", e); - return Err(e); + service.start().await?; + + info!("Attune Sensor Service is ready"); + + // Setup signal handlers for graceful shutdown + let mut sigint = signal(SignalKind::interrupt())?; + let mut sigterm = signal(SignalKind::terminate())?; + + tokio::select! { + _ = sigint.recv() => { + info!("Received SIGINT signal"); + } + _ = sigterm.recv() => { + info!("Received SIGTERM signal"); + } } - info!("Sensor Service has shut down gracefully"); + info!("Shutting down gracefully..."); + + // Stop the service: deregister worker, stop sensors, clean up connections + if let Err(e) = service.stop().await { + error!("Error during shutdown: {}", e); + } + + info!("Attune Sensor Service shutdown complete"); Ok(()) } diff --git a/crates/sensor/src/rule_lifecycle_listener.rs b/crates/sensor/src/rule_lifecycle_listener.rs index ad821fd..c646a64 100644 --- a/crates/sensor/src/rule_lifecycle_listener.rs +++ b/crates/sensor/src/rule_lifecycle_listener.rs @@ -12,6 +12,7 @@ use serde_json::Value as JsonValue; use sqlx::PgPool; use std::sync::Arc; use tokio::sync::RwLock; +use tokio::task::JoinHandle; use tracing::{error, info, warn}; use crate::sensor_manager::SensorManager; @@ -22,6 +23,7 @@ pub struct RuleLifecycleListener { connection: Connection, sensor_manager: Arc, consumer: Arc>>, + task_handle: RwLock>>, } impl RuleLifecycleListener { @@ -32,6 +34,7 @@ impl RuleLifecycleListener { connection, sensor_manager, consumer: Arc::new(RwLock::new(None)), + task_handle: RwLock::new(None), } } @@ -88,19 +91,20 @@ impl RuleLifecycleListener { ); } - // Store consumer + // Store consumer reference (for cleanup on drop) *self.consumer.write().await = Some(consumer); - // Clone self for async handler + // Clone references for the spawned task let db = self.db.clone(); let sensor_manager = self.sensor_manager.clone(); let consumer_ref = self.consumer.clone(); - // Start consuming messages - tokio::spawn(async move { - // Get consumer from the Arc>> - let consumer_guard = consumer_ref.read().await; - if let Some(consumer) = consumer_guard.as_ref() { + // Start consuming messages in a background task. + // Take the consumer out of the Arc so we don't hold the read lock + // for the entire duration of consume_with_handler (which would deadlock stop()). + let handle = tokio::spawn(async move { + let consumer = consumer_ref.write().await.take(); + if let Some(consumer) = consumer { let result = consumer .consume_with_handler::(move |envelope| { let db = db.clone(); @@ -129,6 +133,8 @@ impl RuleLifecycleListener { } }); + *self.task_handle.write().await = Some(handle); + info!("Rule lifecycle listener started"); Ok(()) @@ -138,8 +144,15 @@ impl RuleLifecycleListener { pub async fn stop(&self) -> Result<()> { info!("Stopping rule lifecycle listener"); + // Abort the consumer task first — this ends the consume_with_handler loop + // and drops the Consumer (and its channel) inside the task. + if let Some(handle) = self.task_handle.write().await.take() { + handle.abort(); + let _ = handle.await; // wait for abort to complete + } + + // Clean up any consumer that wasn't taken by the task (e.g. if task never started) if let Some(consumer) = self.consumer.write().await.take() { - // Consumer will be dropped and connection closed drop(consumer); } diff --git a/crates/sensor/src/service.rs b/crates/sensor/src/service.rs index 7f317de..751ae01 100644 --- a/crates/sensor/src/service.rs +++ b/crates/sensor/src/service.rs @@ -2,6 +2,12 @@ //! //! Main service orchestrator that coordinates sensor management //! and rule lifecycle listening. +//! +//! Shutdown follows the same pattern as the worker service: +//! 1. Deregister worker (mark inactive, stop receiving new work) +//! 2. Stop heartbeat +//! 3. Stop sensor processes with configurable timeout +//! 4. Close MQ and DB connections use crate::rule_lifecycle_listener::RuleLifecycleListener; use crate::sensor_manager::SensorManager; @@ -12,6 +18,7 @@ use attune_common::db::Database; use attune_common::mq::MessageQueue; use sqlx::PgPool; use std::sync::Arc; +use std::time::Duration; use tokio::sync::RwLock; use tracing::{error, info, warn}; @@ -29,7 +36,7 @@ struct SensorServiceInner { rule_lifecycle_listener: Arc, sensor_worker_registration: Arc>, heartbeat_interval: u64, - running: Arc>, + heartbeat_running: Arc>, } impl SensorService { @@ -112,18 +119,18 @@ impl SensorService { rule_lifecycle_listener, sensor_worker_registration: Arc::new(RwLock::new(sensor_worker_registration)), heartbeat_interval, - running: Arc::new(RwLock::new(false)), + heartbeat_running: Arc::new(RwLock::new(false)), }), }) } /// Start the sensor service + /// + /// Spawns background tasks (heartbeat, rule listener, sensor manager) and returns. + /// The caller is responsible for blocking on shutdown signals and calling `stop()`. pub async fn start(&self) -> Result<()> { info!("Starting Sensor Service"); - // Mark as running - *self.inner.running.write().await = true; - // Register sensor worker info!("Registering sensor worker..."); let worker_id = self @@ -152,37 +159,48 @@ impl SensorService { info!("Sensor manager started"); // Start heartbeat loop + *self.inner.heartbeat_running.write().await = true; + let registration = self.inner.sensor_worker_registration.clone(); let heartbeat_interval = self.inner.heartbeat_interval; - let running = self.inner.running.clone(); + let heartbeat_running = self.inner.heartbeat_running.clone(); tokio::spawn(async move { - while *running.read().await { - tokio::time::sleep(tokio::time::Duration::from_secs(heartbeat_interval)).await; + let mut ticker = tokio::time::interval(Duration::from_secs(heartbeat_interval)); + + loop { + ticker.tick().await; + + if !*heartbeat_running.read().await { + info!("Heartbeat loop stopping"); + break; + } + if let Err(e) = registration.read().await.heartbeat().await { error!("Failed to send sensor worker heartbeat: {}", e); } } + + info!("Heartbeat loop stopped"); }); - // Wait until stopped - while *self.inner.running.read().await { - tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; - } - - info!("Sensor Service stopped"); + info!("Sensor Service started successfully"); Ok(()) } - /// Stop the sensor service + /// Stop the sensor service gracefully + /// + /// Shutdown order (mirrors worker service pattern): + /// 1. Deregister worker (mark inactive to stop being scheduled for new work) + /// 2. Stop heartbeat + /// 3. Stop sensor processes with timeout + /// 4. Stop rule lifecycle listener + /// 5. Close MQ and DB connections pub async fn stop(&self) -> Result<()> { - info!("Stopping Sensor Service"); + info!("Stopping Sensor Service - initiating graceful shutdown"); - // Mark as not running - *self.inner.running.write().await = false; - - // Deregister sensor worker - info!("Deregistering sensor worker..."); + // 1. Deregister sensor worker first to stop receiving new work + info!("Marking sensor worker as inactive to stop receiving new work"); if let Err(e) = self .inner .sensor_worker_registration @@ -194,25 +212,51 @@ impl SensorService { error!("Failed to deregister sensor worker: {}", e); } - // Stop rule lifecycle listener + // 2. Stop heartbeat + info!("Stopping heartbeat updates"); + *self.inner.heartbeat_running.write().await = false; + + // Wait a bit for heartbeat loop to notice the flag + tokio::time::sleep(Duration::from_millis(100)).await; + + // 3. Stop sensor processes with timeout + let shutdown_timeout = self + .inner + .config + .sensor + .as_ref() + .map(|s| s.shutdown_timeout) + .unwrap_or(30); + + info!( + "Waiting up to {} seconds for sensor processes to stop", + shutdown_timeout + ); + + let sensor_manager = self.inner.sensor_manager.clone(); + let timeout_duration = Duration::from_secs(shutdown_timeout); + match tokio::time::timeout(timeout_duration, sensor_manager.stop()).await { + Ok(Ok(_)) => info!("All sensor processes stopped"), + Ok(Err(e)) => error!("Error stopping sensor processes: {}", e), + Err(_) => warn!( + "Shutdown timeout reached ({} seconds) - some sensor processes may have been interrupted", + shutdown_timeout + ), + } + + // 4. Stop rule lifecycle listener info!("Stopping rule lifecycle listener..."); if let Err(e) = self.inner.rule_lifecycle_listener.stop().await { error!("Failed to stop rule lifecycle listener: {}", e); } - // Stop sensor manager - info!("Stopping sensor manager..."); - if let Err(e) = self.inner.sensor_manager.stop().await { - error!("Failed to stop sensor manager: {}", e); - } - - // Close message queue connection + // 5. Close message queue connection info!("Closing message queue connection..."); if let Err(e) = self.inner.mq.close().await { warn!("Error closing message queue: {}", e); } - // Close database connection + // 6. Close database connection info!("Closing database connection..."); self.inner.db.close().await; @@ -221,11 +265,6 @@ impl SensorService { Ok(()) } - /// Check if service is running - pub async fn is_running(&self) -> bool { - *self.inner.running.read().await - } - /// Get database pool pub fn db(&self) -> &PgPool { &self.inner.db @@ -243,11 +282,6 @@ impl SensorService { /// Get health status pub async fn health_check(&self) -> HealthStatus { - // Check if service is running - if !*self.inner.running.read().await { - return HealthStatus::Unhealthy("Service not running".to_string()); - } - // Check database connection if let Err(e) = sqlx::query("SELECT 1").execute(&self.inner.db).await { return HealthStatus::Unhealthy(format!("Database connection failed: {}", e)); diff --git a/crates/worker/src/runtime/local.rs b/crates/worker/src/runtime/local.rs index 04b6e17..2f3f798 100644 --- a/crates/worker/src/runtime/local.rs +++ b/crates/worker/src/runtime/local.rs @@ -123,7 +123,7 @@ impl Runtime for LocalRuntime { #[cfg(test)] mod tests { use super::*; - use crate::runtime::{ParameterDelivery, ParameterFormat}; + use crate::runtime::{OutputFormat, ParameterDelivery, ParameterFormat}; use std::collections::HashMap; #[tokio::test] diff --git a/crates/worker/src/runtime/python.rs b/crates/worker/src/runtime/python.rs index 0b571e6..8e7d213 100644 --- a/crates/worker/src/runtime/python.rs +++ b/crates/worker/src/runtime/python.rs @@ -339,13 +339,15 @@ if __name__ == '__main__': None } OutputFormat::Json => { - // Try to parse last line of stdout as JSON - stdout_result - .content - .trim() - .lines() - .last() - .and_then(|line| serde_json::from_str(line).ok()) + // Try to parse full stdout as JSON first (handles multi-line JSON), + // then fall back to last line only (for scripts that log before output) + let trimmed = stdout_result.content.trim(); + serde_json::from_str(trimmed).ok().or_else(|| { + trimmed + .lines() + .last() + .and_then(|line| serde_json::from_str(line).ok()) + }) } OutputFormat::Yaml => { // Try to parse stdout as YAML diff --git a/crates/worker/src/runtime/shell.rs b/crates/worker/src/runtime/shell.rs index b393b34..2688bea 100644 --- a/crates/worker/src/runtime/shell.rs +++ b/crates/worker/src/runtime/shell.rs @@ -208,13 +208,15 @@ impl ShellRuntime { None } OutputFormat::Json => { - // Try to parse last line of stdout as JSON - stdout_result - .content - .trim() - .lines() - .last() - .and_then(|line| serde_json::from_str(line).ok()) + // Try to parse full stdout as JSON first (handles multi-line JSON), + // then fall back to last line only (for scripts that log before output) + let trimmed = stdout_result.content.trim(); + serde_json::from_str(trimmed).ok().or_else(|| { + trimmed + .lines() + .last() + .and_then(|line| serde_json::from_str(line).ok()) + }) } OutputFormat::Yaml => { // Try to parse stdout as YAML @@ -823,4 +825,97 @@ echo '{"id": 3, "name": "Charlie"}' assert_eq!(items[2]["id"], 3); assert_eq!(items[2]["name"], "Charlie"); } + + #[tokio::test] + async fn test_shell_runtime_multiline_json_output() { + // Regression test: scripts that embed pretty-printed JSON (e.g., http_request.sh + // embedding a multi-line response body in its "json" field) produce multi-line + // stdout. The parser must handle this by trying to parse the full stdout as JSON + // before falling back to last-line parsing. + let runtime = ShellRuntime::new(); + + let context = ExecutionContext { + execution_id: 7, + action_ref: "test.multiline_json".to_string(), + parameters: HashMap::new(), + env: HashMap::new(), + secrets: HashMap::new(), + timeout: Some(10), + working_dir: None, + entry_point: "shell".to_string(), + code: Some( + r#" +# Simulate http_request.sh output with embedded pretty-printed JSON +printf '{"status_code":200,"body":"hello","json":{\n "args": {\n "hello": "world"\n },\n "url": "https://example.com"\n},"success":true}\n' +"# + .to_string(), + ), + code_path: None, + runtime_name: Some("shell".to_string()), + max_stdout_bytes: 10 * 1024 * 1024, + max_stderr_bytes: 10 * 1024 * 1024, + parameter_delivery: attune_common::models::ParameterDelivery::default(), + parameter_format: attune_common::models::ParameterFormat::default(), + output_format: attune_common::models::OutputFormat::Json, + }; + + let result = runtime.execute(context).await.unwrap(); + assert!(result.is_success()); + assert_eq!(result.exit_code, 0); + + // Verify result was parsed (not stored as raw stdout) + let parsed = result + .result + .expect("Multi-line JSON should be parsed successfully"); + assert_eq!(parsed["status_code"], 200); + assert_eq!(parsed["success"], true); + assert_eq!(parsed["json"]["args"]["hello"], "world"); + + // stdout should be empty when result is successfully parsed + assert!( + result.stdout.is_empty(), + "stdout should be empty when result is parsed, got: {}", + result.stdout + ); + } + + #[tokio::test] + async fn test_shell_runtime_json_with_log_prefix() { + // Verify last-line fallback still works: scripts that log to stdout + // before the final JSON line should still parse correctly. + let runtime = ShellRuntime::new(); + + let context = ExecutionContext { + execution_id: 8, + action_ref: "test.json_with_logs".to_string(), + parameters: HashMap::new(), + env: HashMap::new(), + secrets: HashMap::new(), + timeout: Some(10), + working_dir: None, + entry_point: "shell".to_string(), + code: Some( + r#" +echo "Starting action..." +echo "Processing data..." +echo '{"result": "success", "count": 42}' +"# + .to_string(), + ), + code_path: None, + runtime_name: Some("shell".to_string()), + max_stdout_bytes: 10 * 1024 * 1024, + max_stderr_bytes: 10 * 1024 * 1024, + parameter_delivery: attune_common::models::ParameterDelivery::default(), + parameter_format: attune_common::models::ParameterFormat::default(), + output_format: attune_common::models::OutputFormat::Json, + }; + + let result = runtime.execute(context).await.unwrap(); + assert!(result.is_success()); + + let parsed = result.result.expect("Last-line JSON should be parsed"); + assert_eq!(parsed["result"], "success"); + assert_eq!(parsed["count"], 42); + } } diff --git a/crates/worker/src/service.rs b/crates/worker/src/service.rs index 020d2f3..3a3fbf4 100644 --- a/crates/worker/src/service.rs +++ b/crates/worker/src/service.rs @@ -19,6 +19,7 @@ use sqlx::PgPool; use std::sync::Arc; use std::time::Duration; use tokio::sync::RwLock; +use tokio::task::JoinHandle; use tracing::{error, info, warn}; use crate::artifacts::ArtifactManager; @@ -51,6 +52,7 @@ pub struct WorkerService { mq_connection: Arc, publisher: Arc, consumer: Option>, + consumer_handle: Option>, worker_id: Option, } @@ -266,6 +268,7 @@ impl WorkerService { mq_connection: Arc::new(mq_connection), publisher: Arc::new(publisher), consumer: None, + consumer_handle: None, worker_id: None, }) } @@ -305,25 +308,35 @@ impl WorkerService { Ok(()) } - /// Stop the worker service + /// Stop the worker service gracefully + /// + /// Shutdown order (mirrors sensor service pattern): + /// 1. Deregister worker (mark inactive to stop receiving new work) + /// 2. Stop heartbeat + /// 3. Wait for in-flight tasks with timeout + /// 4. Close MQ connection + /// 5. Close DB connection pub async fn stop(&mut self) -> Result<()> { info!("Stopping Worker Service - initiating graceful shutdown"); - // Mark worker as inactive first to stop receiving new tasks + // 1. Mark worker as inactive first to stop receiving new tasks + // Use if-let instead of ? so shutdown continues even if DB call fails { let reg = self.registration.read().await; info!("Marking worker as inactive to stop receiving new tasks"); - reg.deregister().await?; + if let Err(e) = reg.deregister().await { + error!("Failed to deregister worker: {}", e); + } } - // Stop heartbeat + // 2. Stop heartbeat info!("Stopping heartbeat updates"); self.heartbeat.stop().await; - // Wait a bit for heartbeat to stop + // Wait a bit for heartbeat loop to notice the flag tokio::time::sleep(Duration::from_millis(100)).await; - // Wait for in-flight tasks to complete (with timeout) + // 3. Wait for in-flight tasks to complete (with timeout) let shutdown_timeout = self .config .worker @@ -342,6 +355,23 @@ impl WorkerService { Err(_) => warn!("Shutdown timeout reached - some tasks may have been interrupted"), } + // 4. Abort consumer task and close message queue connection + if let Some(handle) = self.consumer_handle.take() { + info!("Stopping consumer task..."); + handle.abort(); + // Wait briefly for the task to finish + let _ = handle.await; + } + + info!("Closing message queue connection..."); + if let Err(e) = self.mq_connection.close().await { + warn!("Error closing message queue: {}", e); + } + + // 5. Close database connection + info!("Closing database connection..."); + self.db_pool.close().await; + info!("Worker Service stopped"); Ok(()) @@ -364,6 +394,9 @@ impl WorkerService { } /// Start consuming execution.scheduled messages + /// + /// Spawns the consumer loop as a background task so that `start()` returns + /// immediately, allowing the caller to set up signal handlers. async fn start_execution_consumer(&mut self) -> Result<()> { let worker_id = self .worker_id @@ -375,48 +408,63 @@ impl WorkerService { info!("Starting consumer for worker queue: {}", queue_name); // Create consumer - let consumer = Consumer::new( - &self.mq_connection, - ConsumerConfig { - queue: queue_name.clone(), - tag: format!("worker-{}", worker_id), - prefetch_count: 10, - auto_ack: false, - exclusive: false, - }, - ) - .await - .map_err(|e| Error::Internal(format!("Failed to create consumer: {}", e)))?; - - info!("Consumer started for queue: {}", queue_name); - - info!("Message queue consumer initialized"); - - // Clone Arc references for the handler - let executor = self.executor.clone(); - let publisher = self.publisher.clone(); - let db_pool = self.db_pool.clone(); - - // Consume messages with handler - consumer - .consume_with_handler( - move |envelope: MessageEnvelope| { - let executor = executor.clone(); - let publisher = publisher.clone(); - let db_pool = db_pool.clone(); - - async move { - Self::handle_execution_scheduled(executor, publisher, db_pool, envelope) - .await - .map_err(|e| format!("Execution handler error: {}", e).into()) - } + let consumer = Arc::new( + Consumer::new( + &self.mq_connection, + ConsumerConfig { + queue: queue_name.clone(), + tag: format!("worker-{}", worker_id), + prefetch_count: 10, + auto_ack: false, + exclusive: false, }, ) .await - .map_err(|e| Error::Internal(format!("Failed to start consumer: {}", e)))?; + .map_err(|e| Error::Internal(format!("Failed to create consumer: {}", e)))?, + ); - // Store consumer reference - self.consumer = Some(Arc::new(consumer)); + info!("Consumer created for queue: {}", queue_name); + + // Clone Arc references for the spawned task + let executor = self.executor.clone(); + let publisher = self.publisher.clone(); + let db_pool = self.db_pool.clone(); + let consumer_for_task = consumer.clone(); + let queue_name_for_log = queue_name.clone(); + + // Spawn the consumer loop as a background task so start() can return + let handle = tokio::spawn(async move { + info!("Consumer loop started for queue '{}'", queue_name_for_log); + let result = consumer_for_task + .consume_with_handler( + move |envelope: MessageEnvelope| { + let executor = executor.clone(); + let publisher = publisher.clone(); + let db_pool = db_pool.clone(); + + async move { + Self::handle_execution_scheduled(executor, publisher, db_pool, envelope) + .await + .map_err(|e| format!("Execution handler error: {}", e).into()) + } + }, + ) + .await; + + match result { + Ok(()) => info!("Consumer loop for queue '{}' ended", queue_name_for_log), + Err(e) => error!( + "Consumer loop for queue '{}' failed: {}", + queue_name_for_log, e + ), + } + }); + + // Store consumer reference and task handle + self.consumer = Some(consumer); + self.consumer_handle = Some(handle); + + info!("Message queue consumer initialized"); Ok(()) } diff --git a/docker-compose.yaml b/docker-compose.yaml index 05d3b50..20e9109 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -170,7 +170,7 @@ services: container_name: attune-api environment: RUST_LOG: info - ATTUNE_CONFIG: /opt/attune/config.docker.yaml + ATTUNE_CONFIG: /opt/attune/config.yaml # Security - MUST set these in production via .env file ATTUNE__SECURITY__JWT_SECRET: ${JWT_SECRET:-docker-dev-secret-change-in-production} ATTUNE__SECURITY__ENCRYPTION_KEY: ${ENCRYPTION_KEY:-docker-dev-encryption-key-please-change-in-production-32plus} @@ -221,7 +221,7 @@ services: container_name: attune-executor environment: RUST_LOG: info - ATTUNE_CONFIG: /opt/attune/config.docker.yaml + ATTUNE_CONFIG: /opt/attune/config.yaml ATTUNE__SECURITY__JWT_SECRET: ${JWT_SECRET:-docker-dev-secret-change-in-production} ATTUNE__SECURITY__ENCRYPTION_KEY: ${ENCRYPTION_KEY:-docker-dev-encryption-key-please-change-in-production-32plus} ATTUNE__DATABASE__URL: postgresql://attune:attune@postgres:5432/attune @@ -246,7 +246,7 @@ services: redis: condition: service_healthy healthcheck: - test: ["CMD-SHELL", "pgrep -f attune-service || exit 1"] + test: ["CMD-SHELL", "kill -0 1 || exit 1"] interval: 30s timeout: 10s retries: 3 @@ -256,10 +256,8 @@ services: restart: unless-stopped # ============================================================================ - # Worker Services (Multiple variants with different runtime capabilities) + # Workers # ============================================================================ - - # Base worker - Shell commands only worker-shell: build: context: . @@ -271,7 +269,7 @@ services: stop_grace_period: 45s environment: RUST_LOG: info - ATTUNE_CONFIG: /opt/attune/config.docker.yaml + ATTUNE_CONFIG: /opt/attune/config.yaml ATTUNE_WORKER_RUNTIMES: shell ATTUNE_WORKER_TYPE: container ATTUNE_WORKER_NAME: worker-shell-01 @@ -316,7 +314,7 @@ services: stop_grace_period: 45s environment: RUST_LOG: info - ATTUNE_CONFIG: /opt/attune/config.docker.yaml + ATTUNE_CONFIG: /opt/attune/config.yaml ATTUNE_WORKER_RUNTIMES: shell,python ATTUNE_WORKER_TYPE: container ATTUNE_WORKER_NAME: worker-python-01 @@ -361,7 +359,7 @@ services: stop_grace_period: 45s environment: RUST_LOG: info - ATTUNE_CONFIG: /opt/attune/config.docker.yaml + ATTUNE_CONFIG: /opt/attune/config.yaml ATTUNE_WORKER_RUNTIMES: shell,node ATTUNE_WORKER_TYPE: container ATTUNE_WORKER_NAME: worker-node-01 @@ -406,7 +404,7 @@ services: stop_grace_period: 45s environment: RUST_LOG: info - ATTUNE_CONFIG: /opt/attune/config.docker.yaml + ATTUNE_CONFIG: /opt/attune/config.yaml ATTUNE_WORKER_RUNTIMES: shell,python,node,native ATTUNE_WORKER_TYPE: container ATTUNE_WORKER_NAME: worker-full-01 @@ -447,9 +445,10 @@ services: SERVICE: sensor BUILDKIT_INLINE_CACHE: 1 container_name: attune-sensor + stop_grace_period: 45s environment: RUST_LOG: debug - ATTUNE_CONFIG: /opt/attune/config.docker.yaml + ATTUNE_CONFIG: /opt/attune/config.yaml ATTUNE__SECURITY__JWT_SECRET: ${JWT_SECRET:-docker-dev-secret-change-in-production} ATTUNE__SECURITY__ENCRYPTION_KEY: ${ENCRYPTION_KEY:-docker-dev-encryption-key-please-change-in-production-32plus} ATTUNE__DATABASE__URL: postgresql://attune:attune@postgres:5432/attune @@ -475,7 +474,7 @@ services: rabbitmq: condition: service_healthy healthcheck: - test: ["CMD-SHELL", "pgrep -f attune-service || exit 1"] + test: ["CMD-SHELL", "kill -0 1 || exit 1"] interval: 30s timeout: 10s retries: 3 @@ -494,7 +493,7 @@ services: container_name: attune-notifier environment: RUST_LOG: info - ATTUNE_CONFIG: /opt/attune/config.docker.yaml + ATTUNE_CONFIG: /opt/attune/config.yaml ATTUNE__SECURITY__JWT_SECRET: ${JWT_SECRET:-docker-dev-secret-change-in-production} ATTUNE__SECURITY__ENCRYPTION_KEY: ${ENCRYPTION_KEY:-docker-dev-encryption-key-please-change-in-production-32plus} ATTUNE__DATABASE__URL: postgresql://attune:attune@postgres:5432/attune @@ -512,7 +511,7 @@ services: rabbitmq: condition: service_healthy healthcheck: - test: ["CMD-SHELL", "pgrep -f attune-service || exit 1"] + test: ["CMD", "curl", "-f", "http://localhost:8081/health"] interval: 30s timeout: 10s retries: 3 @@ -524,7 +523,6 @@ services: # ============================================================================ # Web UI # ============================================================================ - web: build: context: . @@ -537,8 +535,10 @@ services: ports: - "3000:80" depends_on: - - api - - notifier + api: + condition: service_healthy + notifier: + condition: service_healthy healthcheck: test: [ diff --git a/docker/Dockerfile b/docker/Dockerfile index 07a16d7..c0d857d 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -109,9 +109,9 @@ WORKDIR /opt/attune # Note: We copy from /build/attune-service-binary because the cache mount is not available in COPY COPY --from=builder /build/attune-service-binary /usr/local/bin/attune-service -# Copy configuration files -COPY config.production.yaml ./config.yaml -COPY config.docker.yaml ./config.docker.yaml +# Copy configuration for Docker Compose development +# Production: mount config files as a volume instead of baking them into the image +COPY config.docker.yaml ./config.yaml # Copy migrations for services that need them COPY migrations/ ./migrations/ @@ -132,7 +132,7 @@ USER attune # Environment variables (can be overridden at runtime) ENV RUST_LOG=info -ENV ATTUNE_CONFIG=/opt/attune/config.docker.yaml +ENV ATTUNE_CONFIG=/opt/attune/config.yaml # Health check (will be overridden per service in docker-compose) HEALTHCHECK --interval=30s --timeout=3s --start-period=10s --retries=3 \ diff --git a/docker/Dockerfile.optimized b/docker/Dockerfile.optimized index 1c62d0d..adcde5e 100644 --- a/docker/Dockerfile.optimized +++ b/docker/Dockerfile.optimized @@ -123,6 +123,17 @@ COPY migrations/ ./migrations/ # Copy the common crate (almost all services depend on this) COPY crates/common/ ./crates/common/ +# Build the specified service +# The cargo registry and git cache are pre-populated from the planner stage +# Only the actual compilation happens here +# - registry/git use sharing=shared (concurrent builds of different services are safe) +# - target uses service-specific cache ID (each service compiles different crates) +RUN --mount=type=cache,target=/usr/local/cargo/registry,sharing=shared \ + --mount=type=cache,target=/usr/local/cargo/git,sharing=shared \ + --mount=type=cache,target=/build/target,sharing=locked \ + cargo build --release --lib -p attune-common + + # Build argument to specify which service to build ARG SERVICE=api @@ -137,7 +148,7 @@ COPY crates/${SERVICE}/ ./crates/${SERVICE}/ # - target uses service-specific cache ID (each service compiles different crates) RUN --mount=type=cache,target=/usr/local/cargo/registry,sharing=shared \ --mount=type=cache,target=/usr/local/cargo/git,sharing=shared \ - --mount=type=cache,target=/build/target,id=target-builder-${SERVICE} \ + --mount=type=cache,target=/build/target,sharing=shared \ cargo build --release --bin attune-${SERVICE} && \ cp /build/target/release/attune-${SERVICE} /build/attune-service-binary @@ -164,9 +175,9 @@ WORKDIR /opt/attune # Copy the service binary from builder COPY --from=builder /build/attune-service-binary /usr/local/bin/attune-service -# Copy configuration files -COPY config.production.yaml ./config.yaml -COPY config.docker.yaml ./config.docker.yaml +# Copy configuration file for Docker Compose development +# In production, mount config files as a volume instead of baking them into the image +COPY config.docker.yaml ./config.yaml # Copy migrations for services that need them COPY migrations/ ./migrations/ @@ -184,7 +195,7 @@ USER attune # Environment variables (can be overridden at runtime) ENV RUST_LOG=info -ENV ATTUNE_CONFIG=/opt/attune/config.docker.yaml +ENV ATTUNE_CONFIG=/opt/attune/config.yaml # Health check (will be overridden per service in docker-compose) HEALTHCHECK --interval=30s --timeout=3s --start-period=10s --retries=3 \ diff --git a/docker/nginx.conf b/docker/nginx.conf index 83da9f1..9ad6a6b 100644 --- a/docker/nginx.conf +++ b/docker/nginx.conf @@ -26,9 +26,18 @@ server { add_header Content-Type text/plain; } + # Use Docker's embedded DNS resolver so that proxy_pass with variables + # resolves hostnames at request time, not config load time. + # This prevents nginx from crashing if backends aren't ready yet. + resolver 127.0.0.11 valid=10s; + set $api_upstream http://api:8080; + set $notifier_upstream http://notifier:8081; + # Auth proxy - forward auth requests to backend + # With variable proxy_pass (no URI path), the full original request URI + # (e.g. /auth/login) is passed through to the backend as-is. location /auth/ { - proxy_pass http://api:8080/auth/; + proxy_pass $api_upstream; proxy_http_version 1.1; proxy_set_header Upgrade $http_upgrade; proxy_set_header Connection 'upgrade'; @@ -45,8 +54,10 @@ server { } # API proxy - forward API requests to backend (preserves /api prefix) + # With variable proxy_pass (no URI path), the full original request URI + # (e.g. /api/packs?page=1) is passed through to the backend as-is. location /api/ { - proxy_pass http://api:8080/api/; + proxy_pass $api_upstream; proxy_http_version 1.1; proxy_set_header Upgrade $http_upgrade; proxy_set_header Connection 'upgrade'; @@ -63,8 +74,11 @@ server { } # WebSocket proxy for notifier service + # Strip the /ws/ prefix before proxying (notifier expects paths at root). + # e.g. /ws/events → /events location /ws/ { - proxy_pass http://notifier:8081/; + rewrite ^/ws/(.*) /$1 break; + proxy_pass $notifier_upstream; proxy_http_version 1.1; proxy_set_header Upgrade $http_upgrade; proxy_set_header Connection "Upgrade"; diff --git a/packs/core/actions/http_request.sh b/packs/core/actions/http_request.sh index c8c982e..8e1fec8 100755 --- a/packs/core/actions/http_request.sh +++ b/packs/core/actions/http_request.sh @@ -27,10 +27,13 @@ query_params_file=$(mktemp) body_file="" temp_headers=$(mktemp) curl_output=$(mktemp) +write_out_file=$(mktemp) cleanup() { - rm -f "$headers_file" "$query_params_file" "$temp_headers" "$curl_output" + local exit_code=$? + rm -f "$headers_file" "$query_params_file" "$temp_headers" "$curl_output" "$write_out_file" [ -n "$body_file" ] && [ -f "$body_file" ] && rm -f "$body_file" + return "$exit_code" } trap cleanup EXIT @@ -119,13 +122,16 @@ curl_args=$(mktemp) { printf -- '-X\n%s\n' "$method" printf -- '-s\n' - printf -- '-w\n\n%%{http_code}\n%%{url_effective}\n\n' + # Use @file for -w to avoid xargs escape interpretation issues + # curl's @file mode requires literal \n (two chars) not actual newlines + printf '\\n%%{http_code}\\n%%{url_effective}\\n' > "$write_out_file" + printf -- '-w\n@%s\n' "$write_out_file" printf -- '--max-time\n%s\n' "$timeout" printf -- '--connect-timeout\n10\n' printf -- '--dump-header\n%s\n' "$temp_headers" - + [ "$verify_ssl" = "false" ] && printf -- '-k\n' - + if [ "$follow_redirects" = "true" ]; then printf -- '-L\n' printf -- '--max-redirs\n%s\n' "$max_redirects" @@ -208,7 +214,7 @@ first_header=true if [ -f "$temp_headers" ]; then while IFS= read -r line; do case "$line" in HTTP/*|'') continue ;; esac - + header_name="${line%%:*}" header_value="${line#*:}" [ "$header_name" = "$line" ] && continue @@ -241,7 +247,13 @@ if [ -n "$body_output" ]; then case "$first_char" in '{'|'[') case "$last_char" in - '}'|']') json_parsed="$body_output" ;; + '}'|']') + # Compact multi-line JSON to single line to avoid breaking + # the worker's last-line JSON parser. In valid JSON, literal + # newlines only appear as whitespace outside strings (inside + # strings they must be escaped as \n), so tr is safe here. + json_parsed=$(printf '%s' "$body_output" | tr '\n' ' ' | tr '\r' ' ') + ;; esac ;; esac diff --git a/web/src/components/common/ExecuteActionModal.tsx b/web/src/components/common/ExecuteActionModal.tsx new file mode 100644 index 0000000..50db70f --- /dev/null +++ b/web/src/components/common/ExecuteActionModal.tsx @@ -0,0 +1,273 @@ +import { useState } from "react"; +import { useMutation, useQueryClient } from "@tanstack/react-query"; +import { OpenAPI } from "@/api"; +import { Play, X } from "lucide-react"; +import ParamSchemaForm, { + validateParamSchema, + type ParamSchema, +} from "@/components/common/ParamSchemaForm"; + +interface ExecuteActionModalProps { + action: any; + onClose: () => void; + initialParameters?: Record; +} + +/** + * Shared modal for executing an action with a dynamic parameter form. + * + * Used from: + * - ActionDetail page (Execute button) + * - ExecutionDetailPage (Re-Run button, with initialParameters pre-filled from previous execution config) + */ +export default function ExecuteActionModal({ + action, + onClose, + initialParameters, +}: ExecuteActionModalProps) { + const queryClient = useQueryClient(); + + const paramSchema: ParamSchema = (action.param_schema as ParamSchema) || {}; + + // If initialParameters are provided, use them (stripping out any keys not in the schema) + const buildInitialValues = (): Record => { + if (!initialParameters) return {}; + const properties = paramSchema.properties || {}; + const values: Record = {}; + // Include all initial parameters - even those not in the schema + // so users can see exactly what was run before + for (const [key, value] of Object.entries(initialParameters)) { + if (value !== undefined && value !== null) { + values[key] = value; + } + } + // Also fill in defaults for any schema properties not covered + for (const [key, param] of Object.entries(properties)) { + if (values[key] === undefined && param?.default !== undefined) { + values[key] = param.default; + } + } + return values; + }; + + const [parameters, setParameters] = useState>( + buildInitialValues, + ); + const [paramErrors, setParamErrors] = useState>({}); + const [envVars, setEnvVars] = useState>( + [{ key: "", value: "" }], + ); + + const executeAction = useMutation({ + mutationFn: async (params: { + parameters: Record; + envVars: Array<{ key: string; value: string }>; + }) => { + const token = + typeof OpenAPI.TOKEN === "function" + ? await OpenAPI.TOKEN({} as any) + : OpenAPI.TOKEN; + + const response = await fetch( + `${OpenAPI.BASE}/api/v1/executions/execute`, + { + method: "POST", + headers: { + "Content-Type": "application/json", + Authorization: `Bearer ${token}`, + }, + body: JSON.stringify({ + action_ref: action.ref, + parameters: params.parameters, + env_vars: params.envVars + .filter((ev) => ev.key.trim() !== "") + .reduce( + (acc, ev) => { + acc[ev.key] = ev.value; + return acc; + }, + {} as Record, + ), + }), + }, + ); + + if (!response.ok) { + const error = await response.json(); + throw new Error(error.message || "Failed to execute action"); + } + + return response.json(); + }, + onSuccess: (data) => { + queryClient.invalidateQueries({ queryKey: ["executions"] }); + onClose(); + if (data?.data?.id) { + window.location.href = `/executions/${data.data.id}`; + } + }, + }); + + const validateForm = (): boolean => { + const errors = validateParamSchema(paramSchema, parameters); + setParamErrors(errors); + return Object.keys(errors).length === 0; + }; + + const handleExecute = async () => { + if (!validateForm()) { + return; + } + + try { + await executeAction.mutateAsync({ parameters, envVars }); + } catch (err) { + console.error("Failed to execute action:", err); + } + }; + + const addEnvVar = () => { + setEnvVars([...envVars, { key: "", value: "" }]); + }; + + const removeEnvVar = (index: number) => { + if (envVars.length > 1) { + setEnvVars(envVars.filter((_, i) => i !== index)); + } + }; + + const updateEnvVar = ( + index: number, + field: "key" | "value", + value: string, + ) => { + const updated = [...envVars]; + updated[index][field] = value; + setEnvVars(updated); + }; + + return ( +
+
+
+

+ {initialParameters ? "Re-Run Action" : "Execute Action"} +

+ +
+ +
+

+ Action:{" "} + {action.ref} +

+ {action.description && ( +

{action.description}

+ )} + {initialParameters && ( +

+ Parameters pre-filled from previous execution. Modify as needed + before re-running. +

+ )} +
+ + {executeAction.error && ( +
+ {(executeAction.error as Error).message} +
+ )} + +
+

+ Parameters +

+ +
+ +
+

+ Environment Variables +

+

+ Optional environment variables for this execution (e.g., DEBUG, + LOG_LEVEL) +

+
+ {envVars.map((envVar, index) => ( +
+ updateEnvVar(index, "key", e.target.value)} + className="flex-1 px-3 py-2 border border-gray-300 rounded-md focus:outline-none focus:ring-2 focus:ring-blue-500" + /> + updateEnvVar(index, "value", e.target.value)} + className="flex-1 px-3 py-2 border border-gray-300 rounded-md focus:outline-none focus:ring-2 focus:ring-blue-500" + /> + +
+ ))} +
+ +
+ +
+ + +
+
+
+ ); +} diff --git a/web/src/pages/actions/ActionsPage.tsx b/web/src/pages/actions/ActionsPage.tsx index 1ca3ab2..8efa6c0 100644 --- a/web/src/pages/actions/ActionsPage.tsx +++ b/web/src/pages/actions/ActionsPage.tsx @@ -3,12 +3,7 @@ import { useActions, useAction, useDeleteAction } from "@/hooks/useActions"; import { useExecutions } from "@/hooks/useExecutions"; import { useState, useMemo } from "react"; import { ChevronDown, ChevronRight, Search, X, Play } from "lucide-react"; -import { useMutation, useQueryClient } from "@tanstack/react-query"; -import { OpenAPI } from "@/api"; -import ParamSchemaForm, { - validateParamSchema, - type ParamSchema, -} from "@/components/common/ParamSchemaForm"; +import ExecuteActionModal from "@/components/common/ExecuteActionModal"; import ErrorDisplay from "@/components/common/ErrorDisplay"; export default function ActionsPage() { @@ -573,229 +568,3 @@ function ActionDetail({ actionRef }: { actionRef: string }) { ); } - -function ExecuteActionModal({ - action, - onClose, -}: { - action: any; - onClose: () => void; -}) { - const queryClient = useQueryClient(); - - // Initialize parameters with default values from schema - const paramSchema: ParamSchema = (action.param_schema as ParamSchema) || {}; - - const [parameters, setParameters] = useState>({}); - const [paramErrors, setParamErrors] = useState>({}); - const [envVars, setEnvVars] = useState>( - [{ key: "", value: "" }], - ); - - const executeAction = useMutation({ - mutationFn: async (params: { - parameters: Record; - envVars: Array<{ key: string; value: string }>; - }) => { - // Get the token by calling the TOKEN function - const token = - typeof OpenAPI.TOKEN === "function" - ? await OpenAPI.TOKEN({} as any) - : OpenAPI.TOKEN; - - const response = await fetch( - `${OpenAPI.BASE}/api/v1/executions/execute`, - { - method: "POST", - headers: { - "Content-Type": "application/json", - Authorization: `Bearer ${token}`, - }, - body: JSON.stringify({ - action_ref: action.ref, - parameters: params.parameters, - env_vars: params.envVars - .filter((ev) => ev.key.trim() !== "") - .reduce( - (acc, ev) => { - acc[ev.key] = ev.value; - return acc; - }, - {} as Record, - ), - }), - }, - ); - - if (!response.ok) { - const error = await response.json(); - throw new Error(error.message || "Failed to execute action"); - } - - return response.json(); - }, - onSuccess: (data) => { - queryClient.invalidateQueries({ queryKey: ["executions"] }); - onClose(); - // Redirect to execution detail page - if (data?.data?.id) { - window.location.href = `/executions/${data.data.id}`; - } - }, - }); - - const validateForm = (): boolean => { - const errors = validateParamSchema(paramSchema, parameters); - setParamErrors(errors); - return Object.keys(errors).length === 0; - }; - - const handleExecute = async () => { - if (!validateForm()) { - return; - } - - try { - await executeAction.mutateAsync({ parameters, envVars }); - } catch (err) { - console.error("Failed to execute action:", err); - } - }; - - const addEnvVar = () => { - setEnvVars([...envVars, { key: "", value: "" }]); - }; - - const removeEnvVar = (index: number) => { - if (envVars.length > 1) { - setEnvVars(envVars.filter((_, i) => i !== index)); - } - }; - - const updateEnvVar = ( - index: number, - field: "key" | "value", - value: string, - ) => { - const updated = [...envVars]; - updated[index][field] = value; - setEnvVars(updated); - }; - - return ( -
-
-
-

Execute Action

- -
- -
-

- Action:{" "} - {action.ref} -

- {action.description && ( -

{action.description}

- )} -
- - {executeAction.error && ( -
- {(executeAction.error as Error).message} -
- )} - -
-

- Parameters -

- -
- -
-

- Environment Variables -

-

- Optional environment variables for this execution (e.g., DEBUG, - LOG_LEVEL) -

-
- {envVars.map((envVar, index) => ( -
- updateEnvVar(index, "key", e.target.value)} - className="flex-1 px-3 py-2 border border-gray-300 rounded-md focus:outline-none focus:ring-2 focus:ring-blue-500" - /> - updateEnvVar(index, "value", e.target.value)} - className="flex-1 px-3 py-2 border border-gray-300 rounded-md focus:outline-none focus:ring-2 focus:ring-blue-500" - /> - -
- ))} -
- -
- -
- - -
-
-
- ); -} diff --git a/web/src/pages/executions/ExecutionDetailPage.tsx b/web/src/pages/executions/ExecutionDetailPage.tsx index 12212ed..f916480 100644 --- a/web/src/pages/executions/ExecutionDetailPage.tsx +++ b/web/src/pages/executions/ExecutionDetailPage.tsx @@ -1,8 +1,12 @@ import { useParams, Link } from "react-router-dom"; import { useExecution } from "@/hooks/useExecutions"; +import { useAction } from "@/hooks/useActions"; import { useExecutionStream } from "@/hooks/useExecutionStream"; import { formatDistanceToNow } from "date-fns"; import { ExecutionStatus } from "@/api"; +import { useState } from "react"; +import { RotateCcw } from "lucide-react"; +import ExecuteActionModal from "@/components/common/ExecuteActionModal"; const getStatusColor = (status: string) => { switch (status) { @@ -31,6 +35,11 @@ export default function ExecutionDetailPage() { const { data: executionData, isLoading, error } = useExecution(Number(id)); const execution = executionData?.data; + // Fetch the action so we can get param_schema for the re-run modal + const { data: actionData } = useAction(execution?.action_ref || ""); + + const [showRerunModal, setShowRerunModal] = useState(false); + // Subscribe to real-time updates for this execution const { isConnected } = useExecutionStream({ executionId: Number(id), @@ -102,6 +111,19 @@ export default function ExecutionDetailPage() { )} +

+ {/* Re-Run Modal */} + {showRerunModal && actionData?.data && ( + setShowRerunModal(false)} + initialParameters={execution.config} + /> + )} +

{/* Main Content */}
@@ -295,6 +326,13 @@ export default function ExecutionDetailPage() {

Quick Actions

+