From ed74dfad6c0d956e885dbaaac59369ed40b31d78 Mon Sep 17 00:00:00 2001 From: David Culbreth Date: Tue, 7 Apr 2026 08:55:27 -0500 Subject: [PATCH] log streams in watched cli executions --- Cargo.lock | 60 +- Cargo.toml | 10 +- crates/cli/Cargo.toml | 1 + crates/cli/README.md | 10 +- crates/cli/src/commands/action.rs | 42 +- crates/cli/src/main.rs | 14 +- crates/cli/src/output.rs | 198 +++- crates/cli/src/wait.rs | 1001 +++++++++++++++++- crates/cli/tests/README.md | 4 +- crates/cli/tests/test_actions.rs | 4 +- crates/common/src/pack_registry/installer.rs | 67 +- crates/worker/src/service.rs | 15 +- deny.toml | 37 +- docker-compose.yaml | 48 +- docs/cli/cli.md | 10 +- 15 files changed, 1368 insertions(+), 153 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 79c0ee7..8ba2857 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -201,7 +201,7 @@ version = "1.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "40c48f72fd53cd289104fc64099abca73db4166ad86ea0b4341abe65af83dadc" dependencies = [ - "windows-sys 0.61.2", + "windows-sys 0.60.2", ] [[package]] @@ -212,7 +212,7 @@ checksum = "291e6a250ff86cd4a820112fb8898808a366d8f9f58ce16d1f538353ad55747d" dependencies = [ "anstyle", "once_cell_polyfill", - "windows-sys 0.61.2", + "windows-sys 0.60.2", ] [[package]] @@ -535,6 +535,7 @@ dependencies = [ "sha2", "tar", "tempfile", + "terminal_size", "thiserror 2.0.18", "tokio", "tokio-test", @@ -1131,7 +1132,7 @@ version = "3.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "faf9468729b8cbcea668e36183cb69d317348c2e08e994829fb56ebfdfbaac34" dependencies = [ - "windows-sys 0.61.2", + "windows-sys 0.60.2", ] [[package]] @@ -1795,7 +1796,7 @@ dependencies = [ "libc", "option-ext", "redox_users", - "windows-sys 0.61.2", + "windows-sys 0.60.2", ] [[package]] @@ -1979,7 +1980,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb" dependencies = [ "libc", - "windows-sys 0.61.2", + "windows-sys 0.60.2", ] [[package]] @@ -3006,9 +3007,9 @@ dependencies = [ [[package]] name = "lapin" -version = "4.3.0" +version = "4.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1586ef35d652d6c47ed7449277a4483805b73b84ab368c85af44205fe3457972" +checksum = "39338badb3f992d800f6964501b056b575bdf142eb288202f973d218fe253b90" dependencies = [ "amq-protocol", "async-rs", @@ -3209,9 +3210,9 @@ dependencies = [ [[package]] name = "mio" -version = "1.1.1" +version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a69bcab0ad47271a0234d9422b131806bf3968021e5dc9328caf2d4cd58557fc" +checksum = "50b7e5b27aa02a74bac8c3f23f448f8d87ff11f92d3aac1a6ed369ee08cc56c1" dependencies = [ "libc", "wasi", @@ -3334,7 +3335,7 @@ version = "0.50.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7957b9740744892f114936ab4a57b3f487491bbeafaf8083688b16841a4240e5" dependencies = [ - "windows-sys 0.61.2", + "windows-sys 0.60.2", ] [[package]] @@ -3466,7 +3467,7 @@ version = "5.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "51e219e79014df21a225b1860a479e2dcd7cbd9130f4defd4bd0e191ea31d67d" dependencies = [ - "base64 0.22.1", + "base64 0.21.7", "chrono", "getrandom 0.2.17", "http", @@ -4204,12 +4205,13 @@ dependencies = [ [[package]] name = "redis" -version = "1.0.5" +version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b36964393906eb775b89b25b05b7b95685b8dd14062f1663a31ff93e75c452e5" +checksum = "f44e94c96d8870a387d88ce3de3fdd608cbfc0705f03cb343cdde91509d3e49a" dependencies = [ "arc-swap", "arcstr", + "async-lock", "backon", "bytes", "cfg-if", @@ -4566,7 +4568,7 @@ dependencies = [ "errno", "libc", "linux-raw-sys", - "windows-sys 0.61.2", + "windows-sys 0.60.2", ] [[package]] @@ -4640,7 +4642,7 @@ dependencies = [ "security-framework", "security-framework-sys", "webpki-root-certs", - "windows-sys 0.61.2", + "windows-sys 0.60.2", ] [[package]] @@ -5094,7 +5096,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3a766e1110788c36f4fa1c2b71b387a7815aa65f88ce0229841826633d93723e" dependencies = [ "libc", - "windows-sys 0.61.2", + "windows-sys 0.60.2", ] [[package]] @@ -5456,7 +5458,17 @@ dependencies = [ "getrandom 0.4.2", "once_cell", "rustix", - "windows-sys 0.61.2", + "windows-sys 0.60.2", +] + +[[package]] +name = "terminal_size" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "230a1b821ccbd75b185820a1f1ff7b14d21da1e442e22c0863ea5f08771a8874" +dependencies = [ + "rustix", + "windows-sys 0.60.2", ] [[package]] @@ -5591,9 +5603,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.50.0" +version = "1.51.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "27ad5e34374e03cfffefc301becb44e9dc3c17584f414349ebe29ed26661822d" +checksum = "2bd1c4c0fc4a7ab90fc15ef6daaa3ec3b893f004f915f2392557ed23237820cd" dependencies = [ "bytes", "libc", @@ -5624,9 +5636,9 @@ dependencies = [ [[package]] name = "tokio-macros" -version = "2.6.1" +version = "2.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5c55a2eff8b69ce66c84f85e1da1c233edc36ceb85a2058d11b0d6a3c7e7569c" +checksum = "385a6cb71ab9ab790c5fe8d67f1645e6c450a7ce006a33de03daa956cf70a496" dependencies = [ "proc-macro2", "quote", @@ -6052,9 +6064,9 @@ dependencies = [ [[package]] name = "uuid" -version = "1.22.0" +version = "1.23.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a68d3c8f01c0cfa54a75291d83601161799e4a89a39e0929f4b0354d88757a37" +checksum = "5ac8b6f42ead25368cf5b098aeb3dc8a1a2c05a3eee8a9a1a68c640edbfc79d9" dependencies = [ "getrandom 0.4.2", "js-sys", @@ -6398,7 +6410,7 @@ version = "0.1.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22" dependencies = [ - "windows-sys 0.61.2", + "windows-sys 0.60.2", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 7b79f41..3354fc3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,7 +20,7 @@ repository = "https://git.rdrx.app/attune-system/attune" [workspace.dependencies] # Async runtime -tokio = { version = "1.50", features = ["full"] } +tokio = { version = "1.51", features = ["full"] } tokio-util = { version = "0.7", features = ["io"] } tokio-stream = { version = "0.1", features = ["sync"] } @@ -52,7 +52,7 @@ config = "0.15" chrono = { version = "0.4", features = ["serde"] } # UUID -uuid = { version = "1.22", features = ["v4", "serde"] } +uuid = { version = "1.23", features = ["v4", "serde"] } # Validation validator = { version = "0.20", features = ["derive"] } @@ -62,9 +62,9 @@ clap = { version = "4.6", features = ["derive"] } # Message queue / PubSub # RabbitMQ -lapin = "4.3" +lapin = "4.4" # Redis -redis = { version = "1.0", features = ["tokio-comp", "connection-manager"] } +redis = { version = "1.2", features = ["tokio-comp", "connection-manager"] } # JSON Schema schemars = { version = "1.2", features = ["chrono04"] } @@ -91,7 +91,7 @@ regex = "1.12" # HTTP client reqwest = { version = "0.13", features = ["json"] } reqwest-eventsource = "0.6" -hyper = { version = "1.8", features = ["full"] } +hyper = { version = "1.9", features = ["full"] } # File system utilities walkdir = "2.5" diff --git a/crates/cli/Cargo.toml b/crates/cli/Cargo.toml index 89d5e1c..3991175 100644 --- a/crates/cli/Cargo.toml +++ b/crates/cli/Cargo.toml @@ -59,6 +59,7 @@ sha2 = { workspace = true } colored = "3.1" comfy-table = { version = "7.2", features = ["custom_styling"] } dialoguer = "0.12" +terminal_size = "0.4" # Authentication jsonwebtoken = { workspace = true } diff --git a/crates/cli/README.md b/crates/cli/README.md index d4609c8..3cef2fb 100644 --- a/crates/cli/README.md +++ b/crates/cli/README.md @@ -175,11 +175,11 @@ attune action execute core.echo --param message="Hello World" --param count=3 # With JSON parameters attune action execute core.echo --params-json '{"message": "Hello", "count": 5}' -# Wait for completion -attune action execute core.long_task --wait +# Watch until completion +attune action execute core.long_task --watch -# Wait with custom timeout (default 300 seconds) -attune action execute core.long_task --wait --timeout 600 +# Watch with custom timeout (default 300 seconds) +attune action execute core.long_task --watch --timeout 600 ``` ## Rule Management @@ -588,4 +588,4 @@ Key dependencies: - `colored`: Terminal colors - `comfy-table`: Table formatting - `dialoguer`: Interactive prompts -- `indicatif`: Progress indicators (for future use) \ No newline at end of file +- `indicatif`: Progress indicators (for future use) diff --git a/crates/cli/src/commands/action.rs b/crates/cli/src/commands/action.rs index c08af3c..804e545 100644 --- a/crates/cli/src/commands/action.rs +++ b/crates/cli/src/commands/action.rs @@ -68,17 +68,17 @@ pub enum ActionCommands { #[arg(long, conflicts_with = "param")] params_json: Option, - /// Wait for execution to complete + /// Watch execution until it completes #[arg(short, long)] - wait: bool, + watch: bool, - /// Timeout in seconds when waiting (default: 300) - #[arg(long, default_value = "300", requires = "wait")] + /// Timeout in seconds when watching (default: 300) + #[arg(long, default_value = "300", requires = "watch")] timeout: u64, /// Notifier WebSocket base URL (e.g. ws://localhost:8081). /// Derived from --api-url automatically when not set. - #[arg(long, requires = "wait")] + #[arg(long, requires = "watch")] notifier_url: Option, }, } @@ -186,7 +186,7 @@ pub async fn handle_action_command( action_ref, param, params_json, - wait, + watch, timeout, notifier_url, } => { @@ -196,7 +196,7 @@ pub async fn handle_action_command( params_json, profile, api_url, - wait, + watch, timeout, notifier_url, output_format, @@ -307,7 +307,7 @@ async fn handle_show( if let Some(params) = action.param_schema { if !params.is_null() { output::print_section("Parameters Schema"); - println!("{}", serde_json::to_string_pretty(¶ms)?); + output::print_schema(¶ms)?; } } } @@ -428,7 +428,7 @@ async fn handle_execute( params_json: Option, profile: &Option, api_url: &Option, - wait: bool, + watch: bool, timeout: u64, notifier_url: Option, output_format: OutputFormat, @@ -468,7 +468,7 @@ async fn handle_execute( let path = "/executions/execute".to_string(); let execution: Execution = client.post(&path, &request).await?; - if !wait { + if !watch { match output_format { OutputFormat::Json | OutputFormat::Yaml => { output::print_output(&execution, output_format)?; @@ -492,22 +492,22 @@ async fn handle_execute( )); } - let verbose = matches!(output_format, OutputFormat::Table); - let watch_task = if verbose { - Some(spawn_execution_output_watch( - ApiClient::from_config(&config, api_url), - execution.id, - verbose, - )) - } else { - None - }; + let interactive_wait = true; + let stream_live_logs = true; + let debug_wait = false; + let watch_task = Some(spawn_execution_output_watch( + ApiClient::from_config(&config, api_url), + execution.id, + interactive_wait, + stream_live_logs, + debug_wait, + )); let summary = wait_for_execution(WaitOptions { execution_id: execution.id, timeout_secs: timeout, api_client: &mut client, notifier_ws_url: notifier_url, - verbose, + verbose: debug_wait, }) .await?; let suppress_final_stdout = watch_task diff --git a/crates/cli/src/main.rs b/crates/cli/src/main.rs index a1f5100..20c7ac5 100644 --- a/crates/cli/src/main.rs +++ b/crates/cli/src/main.rs @@ -124,17 +124,17 @@ enum Commands { #[arg(long, conflicts_with = "param")] params_json: Option, - /// Wait for execution to complete + /// Watch execution until it completes #[arg(short, long)] - wait: bool, + watch: bool, - /// Timeout in seconds when waiting (default: 300) - #[arg(long, default_value = "300", requires = "wait")] + /// Timeout in seconds when watching (default: 300) + #[arg(long, default_value = "300", requires = "watch")] timeout: u64, /// Notifier WebSocket base URL (e.g. ws://localhost:8081). /// Derived from --api-url automatically when not set. - #[arg(long, requires = "wait")] + #[arg(long, requires = "watch")] notifier_url: Option, }, } @@ -243,7 +243,7 @@ async fn main() { action_ref, param, params_json, - wait, + watch, timeout, notifier_url, } => { @@ -254,7 +254,7 @@ async fn main() { action_ref, param, params_json, - wait, + watch, timeout, notifier_url, }, diff --git a/crates/cli/src/output.rs b/crates/cli/src/output.rs index 3660ed6..b97ff19 100644 --- a/crates/cli/src/output.rs +++ b/crates/cli/src/output.rs @@ -4,6 +4,7 @@ use colored::Colorize; use comfy_table::{modifiers::UTF8_ROUND_CORNERS, presets::UTF8_FULL, Cell, Color, Table}; use serde::Serialize; use std::fmt::Display; +use terminal_size::{terminal_size, Width}; /// Output format for CLI commands #[derive(Debug, Clone, Copy, ValueEnum, PartialEq)] @@ -88,14 +89,69 @@ pub fn add_header(table: &mut Table, headers: Vec<&str>) { pub fn print_key_value_table(pairs: Vec<(&str, String)>) { let mut table = create_table(); add_header(&mut table, vec!["Key", "Value"]); + let width = terminal_width(); + let key_width = pairs + .iter() + .map(|(key, _)| display_width(key)) + .max() + .unwrap_or(3) + .clamp(8, 18); + let value_width = width.saturating_sub(key_width + 9).max(20); for (key, value) in pairs { - table.add_row(vec![Cell::new(key).fg(Color::Yellow), Cell::new(value)]); + table.add_row(vec![ + Cell::new(wrap_text(key, key_width)).fg(Color::Yellow), + Cell::new(wrap_text(&value, value_width)), + ]); } println!("{}", table); } +/// Print a schema in a readable multi-line format instead of a raw JSON dump. +pub fn print_schema(schema: &serde_json::Value) -> Result<()> { + if let Some(properties) = schema.as_object() { + if properties.values().all(|value| value.is_object()) { + let width = terminal_width(); + let content_width = width.saturating_sub(4).max(24); + let mut names = properties.keys().collect::>(); + names.sort(); + + for (index, name) in names.into_iter().enumerate() { + if index > 0 { + println!(); + } + + println!("{}", name.bold()); + if let Some(definition) = properties.get(name).and_then(|value| value.as_object()) { + print_schema_field("Type", &schema_type_label(definition), content_width); + + if let Some(default) = definition.get("default") { + print_schema_field("Default", &compact_json(default), content_width); + } + + if let Some(description) = definition + .get("description") + .and_then(|value| value.as_str()) + { + print_schema_field("Description", description, content_width); + } + + let constraints = schema_constraints(definition); + if !constraints.is_empty() { + print_schema_field("Constraints", &constraints.join(", "), content_width); + } + } + } + + return Ok(()); + } + } + + println!("{}", serde_yaml_ng::to_string(schema)?); + Ok(()) +} + /// Print a simple list pub fn print_list(items: Vec) { for item in items { @@ -137,6 +193,146 @@ pub fn truncate(s: &str, max_len: usize) -> String { } } +fn terminal_width() -> usize { + terminal_size() + .map(|(Width(width), _)| width as usize) + .filter(|width| *width > 20) + .unwrap_or(100) +} + +fn display_width(value: &str) -> usize { + value.chars().count() +} + +fn wrap_text(value: &str, width: usize) -> String { + let width = width.max(1); + let mut wrapped = Vec::new(); + + for paragraph in value.split('\n') { + if paragraph.is_empty() { + wrapped.push(String::new()); + continue; + } + + let mut line = String::new(); + for word in paragraph.split_whitespace() { + if line.is_empty() { + append_wrapped_word(&mut wrapped, &mut line, word, width); + continue; + } + + if display_width(&line) + 1 + display_width(word) <= width { + line.push(' '); + line.push_str(word); + } else { + wrapped.push(line); + line = String::new(); + append_wrapped_word(&mut wrapped, &mut line, word, width); + } + } + + if !line.is_empty() { + wrapped.push(line); + } + } + + wrapped.join("\n") +} + +fn append_wrapped_word( + lines: &mut Vec, + current_line: &mut String, + word: &str, + width: usize, +) { + if display_width(word) <= width { + current_line.push_str(word); + return; + } + + let mut chunk = String::new(); + for ch in word.chars() { + chunk.push(ch); + if display_width(&chunk) >= width { + if current_line.is_empty() { + lines.push(std::mem::take(&mut chunk)); + } else { + lines.push(std::mem::take(current_line)); + lines.push(std::mem::take(&mut chunk)); + } + } + } + + if !chunk.is_empty() { + current_line.push_str(&chunk); + } +} + +fn schema_type_label(definition: &serde_json::Map) -> String { + match definition.get("type") { + Some(serde_json::Value::String(kind)) => kind.clone(), + Some(serde_json::Value::Array(kinds)) => kinds + .iter() + .filter_map(|value| value.as_str()) + .collect::>() + .join(" | "), + _ => "any".to_string(), + } +} + +fn schema_constraints(definition: &serde_json::Map) -> Vec { + let mut constraints = Vec::new(); + + if let Some(values) = definition.get("enum").and_then(|value| value.as_array()) { + constraints.push(format!( + "enum: {}", + values + .iter() + .map(compact_json) + .collect::>() + .join(", ") + )); + } + + for key in [ + "minimum", + "maximum", + "minLength", + "maxLength", + "pattern", + "format", + ] { + if let Some(value) = definition.get(key) { + constraints.push(format!("{key}: {}", compact_json(value))); + } + } + + constraints +} + +fn compact_json(value: &serde_json::Value) -> String { + serde_json::to_string(value).unwrap_or_else(|_| value.to_string()) +} + +fn print_schema_field(label: &str, value: &str, width: usize) { + let indent = " "; + let label_prefix = format!("{indent}{label}: "); + let continuation = " ".repeat(label_prefix.chars().count()); + let wrapped = wrap_text( + value, + width.saturating_sub(label_prefix.chars().count()).max(12), + ); + let mut lines = wrapped.lines(); + + if let Some(first_line) = lines.next() { + println!("{label_prefix}{first_line}"); + } + + for line in lines { + println!("{continuation}{line}"); + } +} + /// Format a timestamp in a human-readable way pub fn format_timestamp(timestamp: &str) -> String { // Try to parse and format nicely, otherwise return as-is diff --git a/crates/cli/src/wait.rs b/crates/cli/src/wait.rs index 7e7623c..e10e9dd 100644 --- a/crates/cli/src/wait.rs +++ b/crates/cli/src/wait.rs @@ -10,15 +10,19 @@ //! - [`wait_for_execution`] – the single entry point use anyhow::Result; +use chrono::{DateTime, Utc}; +use colored::Colorize; use futures::{SinkExt, StreamExt}; use reqwest_eventsource::{Event as SseEvent, EventSource}; use serde::{Deserialize, Serialize}; -use std::collections::HashMap; +use std::collections::{BTreeMap, HashMap, VecDeque}; +use std::io::{self, IsTerminal, Write}; use std::sync::{ atomic::{AtomicBool, AtomicU64, Ordering}, - Arc, + Arc, Mutex, }; -use std::time::{Duration, Instant}; +use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; +use terminal_size::{terminal_size, Width}; use tokio_tungstenite::{connect_async, tungstenite::Message}; use crate::client::ApiClient; @@ -121,6 +125,8 @@ struct RestExecution { status: String, result: Option, created: String, + #[serde(default)] + started_at: Option, updated: String, } @@ -137,6 +143,9 @@ struct ExecutionListItem { action_ref: String, status: String, #[serde(default)] + started_at: Option, + updated: String, + #[serde(default)] workflow_task: Option, } @@ -165,7 +174,10 @@ struct StreamWatchConfig { token: String, execution_id: i64, prefix: Option, - verbose: bool, + debug: bool, + emit_output: bool, + task_id: Option, + live_renderer: Option, delivered_output: Arc, root_stdout_completed: Option>, } @@ -176,6 +188,280 @@ struct StreamLogTask { config: StreamWatchConfig, } +const MAX_TASK_TAIL_LINES: usize = 4; +const RENDER_TICK: Duration = Duration::from_millis(120); +const SPINNER_FRAMES: &[&str] = &["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"]; + +#[derive(Debug, Clone)] +struct LiveTaskState { + label: String, + task_name: String, + is_root: bool, + is_iterated: bool, + action_ref: String, + status: String, + started_at: Option>, + finished_at: Option>, + stderr_lines: VecDeque, + stdout_lines: VecDeque, +} + +#[derive(Debug, Clone)] +struct LiveTaskUpdate { + id: i64, + label: String, + task_name: String, + is_root: bool, + is_iterated: bool, + action_ref: String, + status: String, + started_at: Option>, + finished_at: Option>, +} + +#[derive(Debug, Default)] +struct IteratedTaskSummary { + task_name: String, + pending: usize, + running: usize, + completed: usize, + failed: usize, + first_started_at: Option>, + first_seen_id: i64, +} + +#[derive(Debug, Default)] +struct LiveRendererState { + tasks: BTreeMap, + rendered_lines: usize, +} + +#[derive(Clone)] +struct LiveRenderer { + enabled: bool, + state: Arc>, + stop: Arc, +} + +impl LiveRenderer { + fn new(show_progress: bool) -> Self { + Self { + enabled: show_progress && io::stderr().is_terminal(), + state: Arc::new(Mutex::new(LiveRendererState::default())), + stop: Arc::new(AtomicBool::new(false)), + } + } + + fn enabled(&self) -> bool { + self.enabled + } + + fn spawn(&self) -> Option> { + if !self.enabled { + return None; + } + + let renderer = self.clone(); + Some(tokio::spawn(async move { + loop { + renderer.render(false); + if renderer.stop.load(Ordering::Relaxed) { + renderer.render(true); + break; + } + tokio::time::sleep(RENDER_TICK).await; + } + })) + } + + fn stop(&self) { + if self.enabled { + self.stop.store(true, Ordering::Relaxed); + } + } + + fn upsert_task(&self, update: LiveTaskUpdate) { + if !self.enabled { + return; + } + + let LiveTaskUpdate { + id, + label, + task_name, + is_root, + is_iterated, + action_ref, + status, + started_at, + finished_at, + } = update; + + let mut state = self.state.lock().expect("live renderer poisoned"); + let entry = state.tasks.entry(id).or_insert_with(|| LiveTaskState { + label: label.clone(), + task_name: task_name.clone(), + is_root, + is_iterated, + action_ref: action_ref.clone(), + status: status.clone(), + started_at, + finished_at, + stderr_lines: VecDeque::new(), + stdout_lines: VecDeque::new(), + }); + entry.label = label; + entry.task_name = task_name; + entry.is_root = is_root; + entry.is_iterated = is_iterated; + entry.action_ref = action_ref; + entry.status = status.clone(); + entry.started_at = started_at.or(entry.started_at); + entry.finished_at = if is_terminal(&status) { + finished_at + } else { + None + }; + if should_clear_task_tail(&status) { + entry.stderr_lines.clear(); + entry.stdout_lines.clear(); + } + } + + fn push_line(&self, id: i64, stream_name: &str, line: String) { + if !self.enabled || line.is_empty() { + return; + } + + let mut state = self.state.lock().expect("live renderer poisoned"); + if let Some(task) = state.tasks.get_mut(&id) { + if should_clear_task_tail(&task.status) { + task.stderr_lines.clear(); + task.stdout_lines.clear(); + return; + } + + let target_lines = if stream_name == "stdout" { + &mut task.stdout_lines + } else { + &mut task.stderr_lines + }; + target_lines.push_back(truncate_log_line(&line)); + while target_lines.len() > MAX_TASK_TAIL_LINES { + target_lines.pop_front(); + } + } + } + + fn render(&self, force: bool) { + if !self.enabled { + return; + } + + let mut state = self.state.lock().expect("live renderer poisoned"); + if state.tasks.is_empty() && !force { + return; + } + + let now = Instant::now(); + let width = current_terminal_width(); + let has_child_tasks = state.tasks.values().any(|task| !task.is_root); + let iterated_summaries = build_iterated_summaries(&state.tasks); + let mut lines = Vec::new(); + let mut summary_by_name = HashMap::new(); + for summary in &iterated_summaries { + summary_by_name.insert(summary.task_name.as_str(), summary); + } + + let mut items = state + .tasks + .iter() + .filter_map(|(id, task)| { + if task.is_root && has_child_tasks { + return None; + } + if task.is_iterated { + return should_render_iterated_task(task).then(|| RenderItem { + group_started_at: summary_by_name + .get(task.task_name.as_str()) + .and_then(|summary| summary.first_started_at.as_ref()) + .map(DateTime::timestamp_millis), + group_id: summary_by_name + .get(task.task_name.as_str()) + .map_or(*id, |summary| summary.first_seen_id), + within_group_rank: 1, + started_at: task.started_at.as_ref().map(DateTime::timestamp_millis), + id: *id, + kind: RenderItemKind::Task(task), + }); + } + + Some(RenderItem { + group_started_at: task.started_at.as_ref().map(DateTime::timestamp_millis), + group_id: *id, + within_group_rank: 0, + started_at: task.started_at.as_ref().map(DateTime::timestamp_millis), + id: *id, + kind: RenderItemKind::Task(task), + }) + }) + .collect::>(); + + items.extend(iterated_summaries.iter().map(|summary| { + RenderItem { + group_started_at: summary + .first_started_at + .as_ref() + .map(DateTime::timestamp_millis), + group_id: summary.first_seen_id, + within_group_rank: 0, + started_at: summary + .first_started_at + .as_ref() + .map(DateTime::timestamp_millis), + id: summary.first_seen_id, + kind: RenderItemKind::IteratedSummary(summary), + } + })); + items.sort_by(render_item_cmp); + + for item in items { + match item.kind { + RenderItemKind::Task(task) => lines.extend(render_task_lines(task, now, width)), + RenderItemKind::IteratedSummary(summary) => { + lines.push(render_iterated_summary_line(summary, now, width)) + } + } + } + + let mut stderr = io::stderr().lock(); + if state.rendered_lines > 0 { + let _ = write!(stderr, "\x1b[{}F\x1b[J", state.rendered_lines); + } + for line in &lines { + let _ = writeln!(stderr, "{line}"); + } + let _ = stderr.flush(); + state.rendered_lines = lines.len(); + } +} + +#[derive(Clone, Copy)] +struct RenderItem<'a> { + group_started_at: Option, + group_id: i64, + within_group_rank: u8, + started_at: Option, + id: i64, + kind: RenderItemKind<'a>, +} + +#[derive(Clone, Copy)] +enum RenderItemKind<'a> { + Task(&'a LiveTaskState), + IteratedSummary(&'a IteratedTaskSummary), +} + impl From for ExecutionSummary { fn from(e: RestExecution) -> Self { Self { @@ -254,7 +540,9 @@ pub async fn wait_for_execution(opts: WaitOptions<'_>) -> Result OutputWatchTask { let delivered_output = Arc::new(AtomicBool::new(false)); let root_stdout_completed = Arc::new(AtomicBool::new(false)); @@ -264,13 +552,15 @@ pub fn spawn_execution_output_watch( if let Err(err) = watch_execution_output( &mut client, execution_id, - verbose, + show_progress, + stream_logs, + debug, delivered_output_for_task, root_stdout_completed_for_task, ) .await { - if verbose { + if debug { eprintln!(" [watch] {}", err); } } @@ -286,20 +576,49 @@ pub fn spawn_execution_output_watch( async fn watch_execution_output( client: &mut ApiClient, execution_id: i64, - verbose: bool, + show_progress: bool, + stream_logs: bool, + debug: bool, delivered_output: Arc, root_stdout_completed: Arc, ) -> Result<()> { let base_url = client.base_url().to_string(); + let live_renderer = LiveRenderer::new(show_progress); + let render_handle = live_renderer.spawn(); + let plain_progress = show_progress && !live_renderer.enabled(); let mut root_watch: Option = None; let mut children: HashMap = HashMap::new(); loop { let execution: RestExecution = client.get(&format!("/executions/{}", execution_id)).await?; + let root_started_at = execution + .started_at + .as_deref() + .and_then(parse_api_timestamp); + let root_finished_at = if is_terminal(&execution.status) { + parse_api_timestamp(&execution.updated) + } else { + None + }; - if root_watch - .as_ref() - .is_none_or(|state| streams_need_restart(&state.stream_handles)) + if live_renderer.enabled() { + live_renderer.upsert_task(LiveTaskUpdate { + id: execution.id, + label: format!("execution#{}", execution.id), + task_name: execution.action_ref.clone(), + is_root: true, + is_iterated: false, + action_ref: execution.action_ref.clone(), + status: execution.status.clone(), + started_at: root_started_at, + finished_at: root_finished_at, + }); + } + + if stream_logs + && root_watch + .as_ref() + .is_none_or(|state| streams_need_restart(&state.stream_handles)) { if let Some(token) = client.auth_token().map(str::to_string) { match root_watch.as_mut() { @@ -310,7 +629,10 @@ async fn watch_execution_output( token, execution_id, prefix: None, - verbose, + debug, + emit_output: !live_renderer.enabled(), + task_id: live_renderer.enabled().then_some(execution_id), + live_renderer: live_renderer.enabled().then_some(live_renderer.clone()), delivered_output: delivered_output.clone(), root_stdout_completed: Some(root_stdout_completed.clone()), }, @@ -321,8 +643,13 @@ async fn watch_execution_output( base_url: base_url.clone(), token, execution_id, - verbose, + debug, prefix: None, + emit_output: !live_renderer.enabled(), + task_id: live_renderer.enabled().then_some(execution_id), + live_renderer: live_renderer + .enabled() + .then_some(live_renderer.clone()), delivered_output: delivered_output.clone(), root_stdout_completed: Some(root_stdout_completed.clone()), }), @@ -338,25 +665,63 @@ async fn watch_execution_output( for child in child_items { let label = format_task_label(&child.workflow_task, &child.action_ref, child.id); + let task_name = child + .workflow_task + .as_ref() + .map(|task| task.task_name.clone()) + .unwrap_or_else(|| label.clone()); + let is_iterated = child + .workflow_task + .as_ref() + .and_then(|task| task.task_index) + .is_some(); + let started_at = child.started_at.as_deref().and_then(parse_api_timestamp); + let finished_at = if is_terminal(&child.status) { + parse_api_timestamp(&child.updated) + } else { + None + }; let entry = children.entry(child.id).or_insert_with(|| { - if verbose { + if plain_progress { eprintln!(" [{}] started ({})", label, child.action_ref); } - let stream_handles = client - .auth_token() - .map(str::to_string) - .map(|token| { - spawn_execution_log_streams(StreamWatchConfig { - base_url: base_url.clone(), - token, - execution_id: child.id, - prefix: Some(label.clone()), - verbose, - delivered_output: delivered_output.clone(), - root_stdout_completed: None, + if live_renderer.enabled() { + live_renderer.upsert_task(LiveTaskUpdate { + id: child.id, + label: label.clone(), + task_name: task_name.clone(), + is_root: false, + is_iterated, + action_ref: child.action_ref.clone(), + status: child.status.clone(), + started_at, + finished_at, + }); + } + let stream_handles = if stream_logs && should_stream_logs(&child) { + client + .auth_token() + .map(str::to_string) + .map(|token| { + spawn_execution_log_streams(StreamWatchConfig { + base_url: base_url.clone(), + token, + execution_id: child.id, + prefix: Some(label.clone()), + debug, + emit_output: !live_renderer.enabled(), + task_id: Some(child.id), + live_renderer: live_renderer + .enabled() + .then_some(live_renderer.clone()), + delivered_output: delivered_output.clone(), + root_stdout_completed: None, + }) }) - }) - .unwrap_or_default(); + .unwrap_or_default() + } else { + Vec::new() + }; ChildWatchState { label, status: child.status.clone(), @@ -368,18 +733,38 @@ async fn watch_execution_output( if entry.status != child.status { entry.status = child.status.clone(); } + if live_renderer.enabled() { + live_renderer.upsert_task(LiveTaskUpdate { + id: child.id, + label: entry.label.clone(), + task_name: task_name.clone(), + is_root: false, + is_iterated, + action_ref: child.action_ref.clone(), + status: child.status.clone(), + started_at, + finished_at, + }); + } let child_is_terminal = is_terminal(&entry.status); - if !child_is_terminal && streams_need_restart(&entry.stream_handles) { + if stream_logs + && should_stream_logs(&child) + && !child_is_terminal + && streams_need_restart(&entry.stream_handles) + { if let Some(token) = client.auth_token().map(str::to_string) { - restart_finished_streams( + ensure_streams_running( &mut entry.stream_handles, &StreamWatchConfig { base_url: base_url.clone(), token, execution_id: child.id, prefix: Some(entry.label.clone()), - verbose, + debug, + emit_output: !live_renderer.enabled(), + task_id: Some(child.id), + live_renderer: live_renderer.enabled().then_some(live_renderer.clone()), delivered_output: delivered_output.clone(), root_stdout_completed: None, }, @@ -389,7 +774,7 @@ async fn watch_execution_output( if !entry.announced_terminal && is_terminal(&child.status) { entry.announced_terminal = true; - if verbose { + if plain_progress { eprintln!(" [{}] {}", entry.label, child.status); } } @@ -410,6 +795,46 @@ async fn watch_execution_output( wait_for_stream_handles(child.stream_handles).await; } + if live_renderer.enabled() { + if let Ok(final_children) = list_child_executions(client, execution_id).await { + for child in final_children { + let label = format_task_label(&child.workflow_task, &child.action_ref, child.id); + let task_name = child + .workflow_task + .as_ref() + .map(|task| task.task_name.clone()) + .unwrap_or_else(|| label.clone()); + let is_iterated = child + .workflow_task + .as_ref() + .and_then(|task| task.task_index) + .is_some(); + let started_at = child.started_at.as_deref().and_then(parse_api_timestamp); + let finished_at = if is_terminal(&child.status) { + parse_api_timestamp(&child.updated) + } else { + None + }; + live_renderer.upsert_task(LiveTaskUpdate { + id: child.id, + label, + task_name, + is_root: false, + is_iterated, + action_ref: child.action_ref, + status: child.status, + started_at, + finished_at, + }); + } + } + } + + live_renderer.stop(); + if let Some(handle) = render_handle { + let _ = handle.await; + } + Ok(()) } @@ -433,7 +858,10 @@ fn spawn_execution_log_streams(config: StreamWatchConfig) -> Vec bool { handles.is_empty() || handles.iter().any(|handle| handle.handle.is_finished()) } +fn ensure_streams_running(handles: &mut Vec, config: &StreamWatchConfig) { + if handles.is_empty() { + *handles = spawn_execution_log_streams(config.clone()); + return; + } + + restart_finished_streams(handles, config); +} + fn restart_finished_streams(handles: &mut [StreamWatchHandle], config: &StreamWatchConfig) { for stream in handles.iter_mut() { if stream.handle.is_finished() { @@ -465,7 +902,10 @@ fn restart_finished_streams(handles: &mut [StreamWatchHandle], config: &StreamWa token: config.token.clone(), execution_id: config.execution_id, prefix: config.prefix.clone(), - verbose: config.verbose, + debug: config.debug, + emit_output: config.emit_output, + task_id: config.task_id, + live_renderer: config.live_renderer.clone(), delivered_output: config.delivered_output.clone(), root_stdout_completed: completion_flag, }, @@ -492,6 +932,7 @@ async fn list_child_executions( loop { let path = format!("/executions?parent={execution_id}&page={page}&per_page={PER_PAGE}"); let mut page_items: Vec = client.get_paginated(&path).await?; + page_items.sort_by_key(|item| item.id); let page_len = page_items.len(); all_children.append(&mut page_items); @@ -855,7 +1296,10 @@ async fn stream_execution_log(task: StreamLogTask) { token, execution_id, prefix, - verbose, + debug, + emit_output, + task_id, + live_renderer, delivered_output, root_stdout_completed, }, @@ -869,7 +1313,7 @@ async fn stream_execution_log(task: StreamLogTask) { )) { Ok(url) => url, Err(err) => { - if verbose { + if debug { eprintln!(" [watch] failed to build stream URL: {}", err); } return; @@ -895,17 +1339,34 @@ async fn stream_execution_log(task: StreamLogTask) { if !message.data.is_empty() { delivered_output.store(true, Ordering::Relaxed); } - print_stream_chunk(prefix.as_deref(), &message.data, &mut carry); + let lines = consume_stream_chunk(&message.data, &mut carry); + emit_stream_lines( + prefix.as_deref(), + stream_name, + task_id, + &live_renderer, + emit_output, + lines, + ); } "done" => { if let Some(flag) = &root_stdout_completed { flag.store(true, Ordering::Relaxed); } - flush_stream_chunk(prefix.as_deref(), &mut carry); + if let Some(line) = take_remaining_stream_chunk(&mut carry) { + emit_stream_lines( + prefix.as_deref(), + stream_name, + task_id, + &live_renderer, + emit_output, + vec![line], + ); + } break; } "error" => { - if verbose && !message.data.is_empty() { + if debug && !message.data.is_empty() { eprintln!(" [watch] {}", message.data); } break; @@ -913,8 +1374,20 @@ async fn stream_execution_log(task: StreamLogTask) { _ => {} }, Err(err) => { - flush_stream_chunk(prefix.as_deref(), &mut carry); - if verbose { + if let Some(line) = take_remaining_stream_chunk(&mut carry) { + emit_stream_lines( + prefix.as_deref(), + stream_name, + task_id, + &live_renderer, + emit_output, + vec![line], + ); + } + if is_benign_stream_end_error(&err) { + break; + } + if debug { eprintln!( " [watch] stream error for execution {}: {}", execution_id, err @@ -925,12 +1398,26 @@ async fn stream_execution_log(task: StreamLogTask) { } } - flush_stream_chunk(prefix.as_deref(), &mut carry); + if let Some(line) = take_remaining_stream_chunk(&mut carry) { + emit_stream_lines( + prefix.as_deref(), + stream_name, + task_id, + &live_renderer, + emit_output, + vec![line], + ); + } event_source.close(); } -fn print_stream_chunk(prefix: Option<&str>, chunk: &str, carry: &mut String) { +fn is_benign_stream_end_error(err: &reqwest_eventsource::Error) -> bool { + err.to_string().contains("Stream ended") +} + +fn consume_stream_chunk(chunk: &str, carry: &mut String) -> Vec { carry.push_str(chunk); + let mut lines = Vec::new(); while let Some(idx) = carry.find('\n') { let mut line = carry.drain(..=idx).collect::(); @@ -940,26 +1427,285 @@ fn print_stream_chunk(prefix: Option<&str>, chunk: &str, carry: &mut String) { if line.ends_with('\r') { line.pop(); } + lines.push(line); + } - if let Some(prefix) = prefix { - eprintln!("[{}] {}", prefix, line); - } else { - eprintln!("{}", line); + lines +} + +fn take_remaining_stream_chunk(carry: &mut String) -> Option { + if carry.is_empty() { + return None; + } + let line = carry.clone(); + carry.clear(); + Some(line) +} + +fn emit_stream_lines( + prefix: Option<&str>, + _stream_name: &str, + task_id: Option, + live_renderer: &Option, + emit_output: bool, + lines: Vec, +) { + if lines.is_empty() { + return; + } + + if let (Some(renderer), Some(task_id)) = (live_renderer.as_ref(), task_id) { + for line in lines { + renderer.push_line(task_id, _stream_name, line); + } + return; + } + + if emit_output { + for line in lines { + if let Some(prefix) = prefix { + eprintln!("[{}] {}", prefix, line); + } else { + eprintln!("{}", line); + } } } } -fn flush_stream_chunk(prefix: Option<&str>, carry: &mut String) { - if carry.is_empty() { - return; +fn should_clear_task_tail(status: &str) -> bool { + matches!(status.to_lowercase().as_str(), "completed" | "succeeded") +} + +fn build_iterated_summaries(tasks: &BTreeMap) -> Vec { + let mut summaries: BTreeMap = BTreeMap::new(); + + for (id, task) in tasks.iter().filter(|(_, task)| task.is_iterated) { + let summary = + summaries + .entry(task.task_name.clone()) + .or_insert_with(|| IteratedTaskSummary { + task_name: task.task_name.clone(), + first_seen_id: *id, + ..Default::default() + }); + + if *id < summary.first_seen_id { + summary.first_seen_id = *id; + } + match (&summary.first_started_at, &task.started_at) { + (None, Some(started_at)) => summary.first_started_at = Some(*started_at), + (Some(current), Some(started_at)) if started_at < current => { + summary.first_started_at = Some(*started_at); + } + _ => {} + } + + match normalized_task_state(&task.status) { + TaskStateBucket::Pending => summary.pending += 1, + TaskStateBucket::Running => summary.running += 1, + TaskStateBucket::Completed => summary.completed += 1, + TaskStateBucket::Failed => summary.failed += 1, + } } - if let Some(prefix) = prefix { - eprintln!("[{}] {}", prefix, carry); + summaries.into_values().collect() +} + +fn render_iterated_summary_line( + summary: &IteratedTaskSummary, + now: Instant, + width: usize, +) -> String { + let (icon, icon_width) = if summary.failed > 0 { + ("✗".red().bold().to_string(), 1) + } else if summary.running == 0 && summary.pending == 0 { + ("✓".green().bold().to_string(), 1) } else { - eprintln!("{}", carry); + (spinner_frame(now).cyan().to_string(), 1) + }; + let left = format!( + "{}: {} running, {} pending, {} completed, {} failed", + summary.task_name, summary.running, summary.pending, summary.completed, summary.failed + ); + format_row(&icon, icon_width, &left, None, None, width) +} + +fn render_item_cmp(left: &RenderItem<'_>, right: &RenderItem<'_>) -> std::cmp::Ordering { + render_sort_tuple(left) + .cmp(&render_sort_tuple(right)) + .then_with(|| left.started_at.cmp(&right.started_at)) + .then_with(|| left.id.cmp(&right.id)) +} + +fn render_sort_tuple(item: &RenderItem<'_>) -> (bool, i64, i64, u8) { + ( + item.group_started_at.is_none(), + item.group_started_at.unwrap_or(i64::MAX), + item.group_id, + item.within_group_rank, + ) +} + +fn should_render_iterated_task(task: &LiveTaskState) -> bool { + normalized_task_state(&task.status) == TaskStateBucket::Running && task.started_at.is_some() +} + +fn render_task_lines(task: &LiveTaskState, now: Instant, width: usize) -> Vec { + let elapsed = task + .started_at + .map(|started_at| { + let ended_at = task.finished_at.unwrap_or_else(Utc::now); + format_elapsed( + ended_at + .signed_duration_since(started_at) + .to_std() + .unwrap_or_default(), + ) + }) + .unwrap_or_else(|| "--:--.-".to_string()); + let status = task.status.to_lowercase(); + let (icon, icon_width) = match status.as_str() { + "completed" | "succeeded" => ("✓".green().bold().to_string(), 1), + "failed" => ("✗".red().bold().to_string(), 1), + "cancelled" | "canceled" => ("○".bright_black().to_string(), 1), + "timeout" | "timed_out" => ("◷".yellow().bold().to_string(), 1), + _ => (spinner_frame(now).cyan().to_string(), 1), + }; + + let left = format!("{} {}", task.label, task.status); + let mut lines = vec![format_row( + &icon, + icon_width, + &left, + Some(&format!("[{}]", task.action_ref)), + Some(&elapsed), + width, + )]; + for line in task.stderr_lines.iter().chain(task.stdout_lines.iter()) { + lines.push(format!( + " {}", + truncate_to_width(line, width.saturating_sub(4)) + )); + } + lines +} + +fn spinner_frame(_now: Instant) -> &'static str { + let frame = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_millis() + / RENDER_TICK.as_millis(); + let frame = frame as usize % SPINNER_FRAMES.len(); + SPINNER_FRAMES[frame] +} + +fn format_elapsed(duration: Duration) -> String { + let total_tenths = (duration.as_secs_f64() * 10.0).round() as u64; + let total = total_tenths / 10; + let tenths = total_tenths % 10; + let hours = total / 3600; + let minutes = (total % 3600) / 60; + let seconds = total % 60; + if hours > 0 { + format!("{hours:02}:{minutes:02}:{seconds:02}.{tenths}") + } else { + format!("{minutes:02}:{seconds:02}.{tenths}") + } +} + +fn truncate_log_line(line: &str) -> String { + truncate_to_width(line, 120) +} + +fn format_row( + icon: &str, + icon_width: usize, + left: &str, + right_prefix: Option<&str>, + right: Option<&str>, + width: usize, +) -> String { + let min_gap = 2; + let right_prefix_width = right_prefix.map_or(0, display_width); + let right_width = right.map_or(0, display_width); + let right_total_width = + right_prefix_width + usize::from(right_prefix.is_some() && right.is_some()) + right_width; + let reserved = icon_width + 1 + min_gap + right_total_width; + let available_left = width.saturating_sub(reserved).max(10); + let left = truncate_to_width(left, available_left); + let left_width = display_width(&left); + + if right_prefix.is_some() || right.is_some() { + let gap = width + .saturating_sub(icon_width + 1 + left_width + right_total_width) + .max(min_gap); + let mut row = format!("{icon} {left}{}", " ".repeat(gap)); + if let Some(right_prefix) = right_prefix { + row.push_str(right_prefix); + } + if let Some(right) = right { + if right_prefix.is_some() { + row.push(' '); + } + row.push_str(&right.bright_black().to_string()); + } + row + } else { + format!("{icon} {left}") + } +} + +fn current_terminal_width() -> usize { + terminal_size() + .map(|(Width(width), _)| width as usize) + .filter(|width| *width > 20) + .unwrap_or(100) +} + +fn display_width(value: &str) -> usize { + value.chars().count() +} + +fn truncate_to_width(value: &str, width: usize) -> String { + if display_width(value) <= width { + return value.to_string(); + } + + value + .chars() + .take(width.saturating_sub(1)) + .collect::() + + "…" +} + +fn should_stream_logs(execution: &ExecutionListItem) -> bool { + execution.started_at.is_some() +} + +fn parse_api_timestamp(value: &str) -> Option> { + DateTime::parse_from_rfc3339(value) + .ok() + .map(|timestamp| timestamp.with_timezone(&Utc)) +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum TaskStateBucket { + Pending, + Running, + Completed, + Failed, +} + +fn normalized_task_state(status: &str) -> TaskStateBucket { + match status { + "requested" | "scheduling" | "scheduled" => TaskStateBucket::Pending, + "completed" | "succeeded" => TaskStateBucket::Completed, + "failed" | "timeout" | "timed_out" | "cancelled" | "canceled" | "abandoned" => { + TaskStateBucket::Failed + } + _ => TaskStateBucket::Running, } - carry.clear(); } #[cfg(test)] @@ -1034,6 +1780,62 @@ mod tests { assert_eq!(extract_stdout(&result).as_deref(), Some("hello world")); } + #[test] + fn test_should_clear_task_tail() { + assert!(should_clear_task_tail("completed")); + assert!(should_clear_task_tail("succeeded")); + assert!(!should_clear_task_tail("failed")); + assert!(!should_clear_task_tail("running")); + } + + #[test] + fn test_push_line_ignores_late_output_for_successful_task() { + let renderer = LiveRenderer { + enabled: true, + state: Arc::new(Mutex::new(LiveRendererState::default())), + stop: Arc::new(AtomicBool::new(false)), + }; + + renderer.upsert_task(LiveTaskUpdate { + id: 7, + label: "task".to_string(), + task_name: "task".to_string(), + is_root: false, + is_iterated: false, + action_ref: "core.echo".to_string(), + status: "completed".to_string(), + started_at: None, + finished_at: Some(Utc::now()), + }); + renderer.push_line(7, "stderr", "should not persist".to_string()); + + let state = renderer.state.lock().expect("live renderer poisoned"); + let task = state.tasks.get(&7).expect("task exists"); + assert!(task.stderr_lines.is_empty()); + assert!(task.stdout_lines.is_empty()); + } + + #[test] + fn test_render_task_lines_places_stdout_after_stderr() { + let task = LiveTaskState { + label: "task".to_string(), + task_name: "task".to_string(), + is_root: false, + is_iterated: false, + action_ref: "core.echo".to_string(), + status: "running".to_string(), + started_at: None, + finished_at: None, + stderr_lines: VecDeque::from(vec!["stderr line".to_string()]), + stdout_lines: VecDeque::from(vec!["stdout line".to_string()]), + }; + + let lines = render_task_lines(&task, Instant::now(), 100); + + assert!(lines[1].contains("stderr line")); + assert!(lines[2].contains("stdout line")); + } + #[test] fn test_format_task_label() { let workflow_task = Some(WorkflowTaskMetadata { @@ -1046,4 +1848,95 @@ mod tests { ); assert_eq!(format_task_label(&None, "core.echo", 42), "core.echo#42"); } + + #[test] + fn test_is_benign_stream_end_error_message() { + let err = reqwest_eventsource::Error::StreamEnded; + assert!(is_benign_stream_end_error(&err)); + } + + #[test] + fn test_consume_stream_chunk_splits_lines() { + let mut carry = String::new(); + let lines = consume_stream_chunk("one\ntwo", &mut carry); + assert_eq!(lines, vec!["one".to_string()]); + assert_eq!(carry, "two"); + + let lines = consume_stream_chunk("\nthree\n", &mut carry); + assert_eq!(lines, vec!["two".to_string(), "three".to_string()]); + assert!(carry.is_empty()); + } + + #[test] + fn test_format_elapsed() { + assert_eq!(format_elapsed(Duration::from_millis(5300)), "00:05.3"); + assert_eq!(format_elapsed(Duration::from_millis(65100)), "01:05.1"); + assert_eq!(format_elapsed(Duration::from_millis(3665100)), "01:01:05.1"); + } + + #[test] + fn test_truncate_to_width() { + assert_eq!(truncate_to_width("abcdef", 4), "abc…"); + assert_eq!(truncate_to_width("abc", 4), "abc"); + } + + #[tokio::test] + async fn test_ensure_streams_running_spawns_for_empty_handles() { + let mut handles = Vec::new(); + let config = StreamWatchConfig { + base_url: "not a url".to_string(), + token: "token".to_string(), + execution_id: 42, + prefix: Some("process_items[1]".to_string()), + debug: false, + emit_output: false, + task_id: Some(42), + live_renderer: None, + delivered_output: Arc::new(AtomicBool::new(false)), + root_stdout_completed: None, + }; + + ensure_streams_running(&mut handles, &config); + + assert_eq!(handles.len(), 2); + + wait_for_stream_handles(handles).await; + } + + #[test] + fn test_root_task_is_hidden_when_child_tasks_exist() { + let renderer = LiveRenderer { + enabled: true, + state: Arc::new(Mutex::new(LiveRendererState::default())), + stop: Arc::new(AtomicBool::new(false)), + }; + + renderer.upsert_task(LiveTaskUpdate { + id: 1, + label: "execution#1".to_string(), + task_name: "core.workflow".to_string(), + is_root: true, + is_iterated: false, + action_ref: "core.workflow".to_string(), + status: "running".to_string(), + started_at: None, + finished_at: None, + }); + renderer.upsert_task(LiveTaskUpdate { + id: 2, + label: "task_a".to_string(), + task_name: "task_a".to_string(), + is_root: false, + is_iterated: false, + action_ref: "core.echo".to_string(), + status: "running".to_string(), + started_at: None, + finished_at: None, + }); + + renderer.render(false); + + let state = renderer.state.lock().expect("live renderer poisoned"); + assert_eq!(state.rendered_lines, 1); + } } diff --git a/crates/cli/tests/README.md b/crates/cli/tests/README.md index 3d92410..e4026b7 100644 --- a/crates/cli/tests/README.md +++ b/crates/cli/tests/README.md @@ -109,7 +109,7 @@ cargo test --package attune-cli --tests -- --test-threads=1 - ✅ Execute with multiple parameters - ✅ Execute with JSON parameters - ✅ Execute without parameters -- ✅ Execute with --wait flag +- ✅ Execute with --watch flag - ✅ Execute with --async flag - ✅ List actions by pack - ✅ Invalid parameter formats @@ -287,4 +287,4 @@ For more information: - [CLI Usage Guide](../README.md) - [CLI Profile Management](../../../docs/cli-profiles.md) - [API Documentation](../../../docs/api-*.md) -- [Main Project README](../../../README.md) \ No newline at end of file +- [Main Project README](../../../README.md) diff --git a/crates/cli/tests/test_actions.rs b/crates/cli/tests/test_actions.rs index 5c18463..f410afe 100644 --- a/crates/cli/tests/test_actions.rs +++ b/crates/cli/tests/test_actions.rs @@ -324,7 +324,7 @@ async fn test_action_execute_wait_for_completion() { .arg("core.echo") .arg("--param") .arg("message=test") - .arg("--wait"); + .arg("--watch"); cmd.assert() .success() @@ -476,7 +476,7 @@ async fn test_action_execute_async_flag() { .arg("action") .arg("execute") .arg("core.long_running"); - // Note: default behavior is async (no --wait), so no --async flag needed + // Note: default behavior is async (no --watch), so no --async flag needed cmd.assert() .success() diff --git a/crates/common/src/pack_registry/installer.rs b/crates/common/src/pack_registry/installer.rs index 8c00090..ca25d31 100644 --- a/crates/common/src/pack_registry/installer.rs +++ b/crates/common/src/pack_registry/installer.rs @@ -493,7 +493,16 @@ impl PackInstaller { })?; let normalized_host = host.to_ascii_lowercase(); - if normalized_host == "localhost" { + // Whether the host is explicitly trusted via the allowlist. Explicitly allowlisted hosts + // bypass private-IP checks so that local/private registries (e.g. a self-hosted Gitea) + // can be used in development or air-gapped environments. + let host_is_allowlisted = self + .allowed_remote_hosts + .as_ref() + .map(|set| set.contains(&normalized_host)) + .unwrap_or(false); + + if normalized_host == "localhost" && !host_is_allowlisted { return Err(Error::validation(format!( "Remote URL host is not allowed: {}", host @@ -509,12 +518,14 @@ impl PackInstaller { } } - if let Some(ip) = parsed.host().and_then(|host| match host { - url::Host::Ipv4(ip) => Some(IpAddr::V4(ip)), - url::Host::Ipv6(ip) => Some(IpAddr::V6(ip)), - url::Host::Domain(_) => None, - }) { - ensure_public_ip(ip)?; + if !host_is_allowlisted { + if let Some(ip) = parsed.host().and_then(|host| match host { + url::Host::Ipv4(ip) => Some(IpAddr::V4(ip)), + url::Host::Ipv6(ip) => Some(IpAddr::V6(ip)), + url::Host::Domain(_) => None, + }) { + ensure_public_ip(ip)?; + } } let port = parsed.port_or_known_default().ok_or_else(|| { @@ -528,7 +539,9 @@ impl PackInstaller { let mut saw_address = false; for addr in resolved { saw_address = true; - ensure_public_ip(addr.ip())?; + if !host_is_allowlisted { + ensure_public_ip(addr.ip())?; + } } if !saw_address { @@ -557,7 +570,13 @@ impl PackInstaller { fn validate_remote_host(&self, host: &str) -> Result<()> { let normalized_host = host.to_ascii_lowercase(); - if normalized_host == "localhost" { + let host_is_allowlisted = self + .allowed_remote_hosts + .as_ref() + .map(|set| set.contains(&normalized_host)) + .unwrap_or(false); + + if normalized_host == "localhost" && !host_is_allowlisted { return Err(Error::validation(format!( "Remote host is not allowed: {}", host @@ -995,6 +1014,36 @@ mod tests { assert!(hosts.contains("cdn.example.com")); } + #[tokio::test] + async fn test_validate_remote_url_allows_allowlisted_localhost() { + let temp_dir = std::env::temp_dir().join("attune-test"); + let config = PackRegistryConfig { + allowed_source_hosts: vec!["localhost".to_string()], + allow_http: true, + ..Default::default() + }; + let installer = PackInstaller::new(&temp_dir, Some(config)).await.unwrap(); + + installer + .validate_remote_url("http://localhost:3000/example/repo.git") + .await + .unwrap(); + } + + #[test] + fn test_validate_remote_host_allows_allowlisted_localhost() { + let installer = PackInstaller { + temp_dir: std::env::temp_dir().join("attune-test"), + registry_client: None, + verify_checksums: false, + allow_http: false, + allowed_remote_hosts: Some(HashSet::from(["localhost".to_string()])), + progress_callback: None, + }; + + installer.validate_remote_host("localhost").unwrap(); + } + #[test] fn test_extract_git_host_from_scp_style_source() { assert_eq!( diff --git a/crates/worker/src/service.rs b/crates/worker/src/service.rs index e899704..06a4fae 100644 --- a/crates/worker/src/service.rs +++ b/crates/worker/src/service.rs @@ -170,16 +170,11 @@ impl WorkerService { // Initialize worker registration let registration = Arc::new(RwLock::new(WorkerRegistration::new(pool.clone(), &config))); - // Initialize artifact manager (legacy, for stdout/stderr log storage) - // nosemgrep: rust.actix.path-traversal.tainted-path.tainted-path -- Worker artifact/config directories come from trusted process configuration, not request data. - let artifact_base_dir = std::path::PathBuf::from( - config - .worker - .as_ref() - .and_then(|w| w.name.clone()) - .map(|name| format!("/tmp/attune/artifacts/{}", name)) - .unwrap_or_else(|| "/tmp/attune/artifacts".to_string()), - ); + // Initialize artifact manager for execution stdout/stderr/result storage. + // This must use the shared artifacts_dir so the API log streaming endpoints + // and artifact download routes can see the same files the worker writes. + // nosemgrep: rust.actix.path-traversal.tainted-path.tainted-path -- Artifact storage root is a trusted deployment configuration value. + let artifact_base_dir = std::path::PathBuf::from(&config.artifacts_dir); let artifact_manager = ArtifactManager::new(artifact_base_dir); artifact_manager.initialize().await?; diff --git a/deny.toml b/deny.toml index ea20db6..bf844ae 100644 --- a/deny.toml +++ b/deny.toml @@ -24,7 +24,6 @@ allow = [ "Unicode-3.0", "Zlib", "CC0-1.0", - "OpenSSL", "BSL-1.0", "MIT-0", "CDLA-Permissive-2.0", @@ -36,12 +35,36 @@ wildcards = "allow" highlight = "all" deny = [] skip = [ - "winnow@0.6.26", - "winnow@0.7.15", - "windows_x86_64_msvc@0.42.2", - "windows_x86_64_msvc@0.48.5", - "windows_x86_64_msvc@0.52.6", - "windows_x86_64_msvc@0.53.1", + "base64", + "core-foundation", + "cpufeatures", + "darling", + "darling_core", + "darling_macro", + "foldhash", + "getrandom", + "hashbrown", + "nom", + "r-efi", + "rand", + "rand_chacha", + "rand_core", + "reqwest", + "thiserror", + "thiserror-impl", + "wasm-streams", + "webpki-roots", + "windows-sys", + "windows-targets", + "windows_aarch64_gnullvm", + "windows_aarch64_msvc", + "windows_i686_gnu", + "windows_i686_gnullvm", + "windows_i686_msvc", + "windows_x86_64_gnu", + "windows_x86_64_gnullvm", + "windows_x86_64_msvc", + "winnow" ] skip-tree = [] diff --git a/docker-compose.yaml b/docker-compose.yaml index a834007..17b157d 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -282,7 +282,53 @@ services: args: SERVICE: executor BUILDKIT_INLINE_CACHE: 1 - container_name: attune-executor + container_name: attune-executor-1 + environment: + RUST_LOG: info + ATTUNE_CONFIG: /opt/attune/config/config.yaml + ATTUNE__SECURITY__JWT_SECRET: ${JWT_SECRET:-docker-dev-secret-change-in-production} + ATTUNE__SECURITY__ENCRYPTION_KEY: ${ENCRYPTION_KEY:-docker-dev-encryption-key-please-change-in-production-32plus} + ATTUNE__DATABASE__URL: postgresql://attune:attune@postgres:5432/attune + ATTUNE__MESSAGE_QUEUE__URL: amqp://attune:attune@rabbitmq:5672 + ATTUNE__REDIS__URL: redis://redis:6379 + ATTUNE__WORKER__WORKER_TYPE: container + volumes: + - ${ATTUNE_DOCKER_CONFIG_PATH:-./config.docker.yaml}:/opt/attune/config/config.yaml:ro + - packs_data:/opt/attune/packs:ro + - ./packs.dev:/opt/attune/packs.dev:rw + - artifacts_data:/opt/attune/artifacts:ro + - executor_logs:/opt/attune/logs + depends_on: + init-packs: + condition: service_completed_successfully + init-user: + condition: service_completed_successfully + migrations: + condition: service_completed_successfully + postgres: + condition: service_healthy + rabbitmq: + condition: service_healthy + redis: + condition: service_healthy + healthcheck: + test: ["CMD-SHELL", "kill -0 1 || exit 1"] + interval: 30s + timeout: 10s + retries: 3 + start_period: 20s + networks: + - attune-network + restart: unless-stopped + + executor-2: + build: + context: . + dockerfile: docker/Dockerfile.optimized + args: + SERVICE: executor + BUILDKIT_INLINE_CACHE: 1 + container_name: attune-executor-2 environment: RUST_LOG: info ATTUNE_CONFIG: /opt/attune/config/config.yaml diff --git a/docs/cli/cli.md b/docs/cli/cli.md index cd91265..b8bda1a 100644 --- a/docs/cli/cli.md +++ b/docs/cli/cli.md @@ -133,11 +133,11 @@ attune action execute core.echo --param message="Hello" --param count=3 # With JSON parameters attune action execute core.echo --params-json '{"message": "Hello", "count": 5}' -# Wait for completion -attune action execute core.long_task --wait +# Watch until completion +attune action execute core.long_task --watch -# Wait with timeout -attune action execute core.long_task --wait --timeout 600 +# Watch with timeout +attune action execute core.long_task --watch --timeout 600 ``` ### Rule Management @@ -548,4 +548,4 @@ Potential future features: - [Main README](../README.md) - [API Documentation](api-overview.md) - [Pack Development](packs.md) -- [Configuration Guide](configuration.md) \ No newline at end of file +- [Configuration Guide](configuration.md)