From 82780306991a4fc63bbaad72770166adc3a7bf0e Mon Sep 17 00:00:00 2001 From: David Culbreth Date: Thu, 2 Apr 2026 09:17:21 -0500 Subject: [PATCH] fixing tests, making clippy happy --- crates/cli/src/wait.rs | 173 ++++++++++-------- crates/common/src/pack_registry/installer.rs | 2 +- crates/worker/src/runtime/local.rs | 4 + crates/worker/src/runtime/process.rs | 20 ++ crates/worker/src/runtime/process_executor.rs | 2 + .../worker/tests/dependency_isolation_test.rs | 2 + crates/worker/tests/log_truncation_test.rs | 6 + crates/worker/tests/security_tests.rs | 16 ++ 8 files changed, 144 insertions(+), 81 deletions(-) diff --git a/crates/cli/src/wait.rs b/crates/cli/src/wait.rs index 57caa37..7e7623c 100644 --- a/crates/cli/src/wait.rs +++ b/crates/cli/src/wait.rs @@ -159,6 +159,23 @@ struct StreamWatchHandle { handle: tokio::task::JoinHandle<()>, } +#[derive(Clone)] +struct StreamWatchConfig { + base_url: String, + token: String, + execution_id: i64, + prefix: Option, + verbose: bool, + delivered_output: Arc, + root_stdout_completed: Option>, +} + +struct StreamLogTask { + stream_name: &'static str, + offset: Arc, + config: StreamWatchConfig, +} + impl From for ExecutionSummary { fn from(e: RestExecution) -> Self { Self { @@ -288,25 +305,27 @@ async fn watch_execution_output( match root_watch.as_mut() { Some(state) => restart_finished_streams( &mut state.stream_handles, - &base_url, - token, - execution_id, - None, - verbose, - delivered_output.clone(), - Some(root_stdout_completed.clone()), + &StreamWatchConfig { + base_url: base_url.clone(), + token, + execution_id, + prefix: None, + verbose, + delivered_output: delivered_output.clone(), + root_stdout_completed: Some(root_stdout_completed.clone()), + }, ), None => { root_watch = Some(RootWatchState { - stream_handles: spawn_execution_log_streams( - &base_url, + stream_handles: spawn_execution_log_streams(StreamWatchConfig { + base_url: base_url.clone(), token, execution_id, - None, verbose, - delivered_output.clone(), - Some(root_stdout_completed.clone()), - ), + prefix: None, + delivered_output: delivered_output.clone(), + root_stdout_completed: Some(root_stdout_completed.clone()), + }), }); } } @@ -327,15 +346,15 @@ async fn watch_execution_output( .auth_token() .map(str::to_string) .map(|token| { - spawn_execution_log_streams( - &base_url, + spawn_execution_log_streams(StreamWatchConfig { + base_url: base_url.clone(), token, - child.id, - Some(label.clone()), + execution_id: child.id, + prefix: Some(label.clone()), verbose, - delivered_output.clone(), - None, - ) + delivered_output: delivered_output.clone(), + root_stdout_completed: None, + }) }) .unwrap_or_default(); ChildWatchState { @@ -355,13 +374,15 @@ async fn watch_execution_output( if let Some(token) = client.auth_token().map(str::to_string) { restart_finished_streams( &mut entry.stream_handles, - &base_url, - token, - child.id, - Some(entry.label.clone()), - verbose, - delivered_output.clone(), - None, + &StreamWatchConfig { + base_url: base_url.clone(), + token, + execution_id: child.id, + prefix: Some(entry.label.clone()), + verbose, + delivered_output: delivered_output.clone(), + root_stdout_completed: None, + }, ); } } @@ -392,37 +413,31 @@ async fn watch_execution_output( Ok(()) } -fn spawn_execution_log_streams( - base_url: &str, - token: String, - execution_id: i64, - prefix: Option, - verbose: bool, - delivered_output: Arc, - root_stdout_completed: Option>, -) -> Vec { +fn spawn_execution_log_streams(config: StreamWatchConfig) -> Vec { ["stdout", "stderr"] .into_iter() .map(|stream_name| { let offset = Arc::new(AtomicU64::new(0)); let completion_flag = if stream_name == "stdout" { - root_stdout_completed.clone() + config.root_stdout_completed.clone() } else { None }; StreamWatchHandle { stream_name, - handle: tokio::spawn(stream_execution_log( - base_url.to_string(), - token.clone(), - execution_id, + handle: tokio::spawn(stream_execution_log(StreamLogTask { stream_name, - prefix.clone(), - verbose, - offset.clone(), - delivered_output.clone(), - completion_flag, - )), + offset: offset.clone(), + config: StreamWatchConfig { + base_url: config.base_url.clone(), + token: config.token.clone(), + execution_id: config.execution_id, + prefix: config.prefix.clone(), + verbose: config.verbose, + delivered_output: config.delivered_output.clone(), + root_stdout_completed: completion_flag, + }, + })), offset, } }) @@ -433,35 +448,28 @@ fn streams_need_restart(handles: &[StreamWatchHandle]) -> bool { handles.is_empty() || handles.iter().any(|handle| handle.handle.is_finished()) } -fn restart_finished_streams( - handles: &mut Vec, - base_url: &str, - token: String, - execution_id: i64, - prefix: Option, - verbose: bool, - delivered_output: Arc, - root_stdout_completed: Option>, -) { +fn restart_finished_streams(handles: &mut [StreamWatchHandle], config: &StreamWatchConfig) { for stream in handles.iter_mut() { if stream.handle.is_finished() { let offset = stream.offset.clone(); let completion_flag = if stream.stream_name == "stdout" { - root_stdout_completed.clone() + config.root_stdout_completed.clone() } else { None }; - stream.handle = tokio::spawn(stream_execution_log( - base_url.to_string(), - token.clone(), - execution_id, - stream.stream_name, - prefix.clone(), - verbose, + stream.handle = tokio::spawn(stream_execution_log(StreamLogTask { + stream_name: stream.stream_name, offset, - delivered_output.clone(), - completion_flag, - )); + config: StreamWatchConfig { + base_url: config.base_url.clone(), + token: config.token.clone(), + execution_id: config.execution_id, + prefix: config.prefix.clone(), + verbose: config.verbose, + delivered_output: config.delivered_output.clone(), + root_stdout_completed: completion_flag, + }, + })); } } } @@ -837,17 +845,22 @@ fn format_task_label( } } -async fn stream_execution_log( - base_url: String, - token: String, - execution_id: i64, - stream_name: &'static str, - prefix: Option, - verbose: bool, - offset: Arc, - delivered_output: Arc, - root_stdout_completed: Option>, -) { +async fn stream_execution_log(task: StreamLogTask) { + let StreamLogTask { + stream_name, + offset, + config: + StreamWatchConfig { + base_url, + token, + execution_id, + prefix, + verbose, + delivered_output, + root_stdout_completed, + }, + } = task; + let mut stream_url = match url::Url::parse(&format!( "{}/api/v1/executions/{}/logs/{}/stream", base_url.trim_end_matches('/'), @@ -913,7 +926,7 @@ async fn stream_execution_log( } flush_stream_chunk(prefix.as_deref(), &mut carry); - let _ = event_source.close(); + event_source.close(); } fn print_stream_chunk(prefix: Option<&str>, chunk: &str, carry: &mut String) { diff --git a/crates/common/src/pack_registry/installer.rs b/crates/common/src/pack_registry/installer.rs index 92bf04b..8c00090 100644 --- a/crates/common/src/pack_registry/installer.rs +++ b/crates/common/src/pack_registry/installer.rs @@ -856,7 +856,7 @@ fn extract_git_host(raw_url: &str) -> Option { fn archive_filename_from_url(url: &Url) -> String { let raw_name = url .path_segments() - .and_then(|segments| segments.filter(|segment| !segment.is_empty()).next_back()) + .and_then(|mut segments| segments.rfind(|segment| !segment.is_empty())) .unwrap_or("archive.bin"); let sanitized: String = raw_name diff --git a/crates/worker/src/runtime/local.rs b/crates/worker/src/runtime/local.rs index b6965f8..02445aa 100644 --- a/crates/worker/src/runtime/local.rs +++ b/crates/worker/src/runtime/local.rs @@ -200,6 +200,8 @@ mod tests { selected_runtime_version: None, max_stdout_bytes: 10 * 1024 * 1024, max_stderr_bytes: 10 * 1024 * 1024, + stdout_log_path: None, + stderr_log_path: None, parameter_delivery: ParameterDelivery::default(), parameter_format: ParameterFormat::default(), output_format: OutputFormat::default(), @@ -233,6 +235,8 @@ mod tests { selected_runtime_version: None, max_stdout_bytes: 10 * 1024 * 1024, max_stderr_bytes: 10 * 1024 * 1024, + stdout_log_path: None, + stderr_log_path: None, parameter_delivery: ParameterDelivery::default(), parameter_format: ParameterFormat::default(), output_format: OutputFormat::default(), diff --git a/crates/worker/src/runtime/process.rs b/crates/worker/src/runtime/process.rs index ceada55..4f87dd6 100644 --- a/crates/worker/src/runtime/process.rs +++ b/crates/worker/src/runtime/process.rs @@ -1146,6 +1146,8 @@ mod tests { selected_runtime_version: None, max_stdout_bytes: 1024, max_stderr_bytes: 1024, + stdout_log_path: None, + stderr_log_path: None, parameter_delivery: ParameterDelivery::default(), parameter_format: ParameterFormat::default(), output_format: OutputFormat::default(), @@ -1181,6 +1183,8 @@ mod tests { selected_runtime_version: None, max_stdout_bytes: 1024, max_stderr_bytes: 1024, + stdout_log_path: None, + stderr_log_path: None, parameter_delivery: ParameterDelivery::default(), parameter_format: ParameterFormat::default(), output_format: OutputFormat::default(), @@ -1216,6 +1220,8 @@ mod tests { selected_runtime_version: None, max_stdout_bytes: 1024, max_stderr_bytes: 1024, + stdout_log_path: None, + stderr_log_path: None, parameter_delivery: ParameterDelivery::default(), parameter_format: ParameterFormat::default(), output_format: OutputFormat::default(), @@ -1307,6 +1313,8 @@ mod tests { selected_runtime_version: None, max_stdout_bytes: 1024 * 1024, max_stderr_bytes: 1024 * 1024, + stdout_log_path: None, + stderr_log_path: None, parameter_delivery: ParameterDelivery::default(), parameter_format: ParameterFormat::default(), output_format: OutputFormat::default(), @@ -1366,6 +1374,8 @@ mod tests { selected_runtime_version: None, max_stdout_bytes: 1024 * 1024, max_stderr_bytes: 1024 * 1024, + stdout_log_path: None, + stderr_log_path: None, parameter_delivery: ParameterDelivery::default(), parameter_format: ParameterFormat::default(), output_format: OutputFormat::default(), @@ -1445,6 +1455,8 @@ mod tests { selected_runtime_version: None, max_stdout_bytes: 1024 * 1024, max_stderr_bytes: 1024 * 1024, + stdout_log_path: None, + stderr_log_path: None, parameter_delivery: ParameterDelivery::default(), parameter_format: ParameterFormat::default(), output_format: OutputFormat::default(), @@ -1487,6 +1499,8 @@ mod tests { selected_runtime_version: None, max_stdout_bytes: 1024 * 1024, max_stderr_bytes: 1024 * 1024, + stdout_log_path: None, + stderr_log_path: None, parameter_delivery: ParameterDelivery::default(), parameter_format: ParameterFormat::default(), output_format: OutputFormat::default(), @@ -1534,6 +1548,8 @@ mod tests { selected_runtime_version: None, max_stdout_bytes: 1024 * 1024, max_stderr_bytes: 1024 * 1024, + stdout_log_path: None, + stderr_log_path: None, parameter_delivery: ParameterDelivery::default(), parameter_format: ParameterFormat::default(), output_format: OutputFormat::default(), @@ -1585,6 +1601,8 @@ mod tests { selected_runtime_version: None, max_stdout_bytes: 1024 * 1024, max_stderr_bytes: 1024 * 1024, + stdout_log_path: None, + stderr_log_path: None, parameter_delivery: ParameterDelivery::default(), parameter_format: ParameterFormat::default(), output_format: OutputFormat::default(), @@ -1694,6 +1712,8 @@ mod tests { selected_runtime_version: None, max_stdout_bytes: 1024 * 1024, max_stderr_bytes: 1024 * 1024, + stdout_log_path: None, + stderr_log_path: None, parameter_delivery: ParameterDelivery::default(), parameter_format: ParameterFormat::default(), output_format: OutputFormat::default(), diff --git a/crates/worker/src/runtime/process_executor.rs b/crates/worker/src/runtime/process_executor.rs index 89cbdb3..2238b14 100644 --- a/crates/worker/src/runtime/process_executor.rs +++ b/crates/worker/src/runtime/process_executor.rs @@ -734,6 +734,8 @@ mod tests { 1024 * 1024, OutputFormat::Text, Some(cancel_token), + None, + None, ) .await .unwrap(); diff --git a/crates/worker/tests/dependency_isolation_test.rs b/crates/worker/tests/dependency_isolation_test.rs index cde0d1f..0619d30 100644 --- a/crates/worker/tests/dependency_isolation_test.rs +++ b/crates/worker/tests/dependency_isolation_test.rs @@ -86,6 +86,8 @@ fn make_context(action_ref: &str, entry_point: &str, runtime_name: &str) -> Exec selected_runtime_version: None, max_stdout_bytes: 10 * 1024 * 1024, max_stderr_bytes: 10 * 1024 * 1024, + stdout_log_path: None, + stderr_log_path: None, parameter_delivery: ParameterDelivery::default(), parameter_format: ParameterFormat::default(), output_format: OutputFormat::default(), diff --git a/crates/worker/tests/log_truncation_test.rs b/crates/worker/tests/log_truncation_test.rs index a8b32a8..47cdbfd 100644 --- a/crates/worker/tests/log_truncation_test.rs +++ b/crates/worker/tests/log_truncation_test.rs @@ -80,6 +80,8 @@ fn make_python_context( selected_runtime_version: None, max_stdout_bytes, max_stderr_bytes, + stdout_log_path: None, + stderr_log_path: None, parameter_delivery: attune_worker::runtime::ParameterDelivery::default(), parameter_format: attune_worker::runtime::ParameterFormat::default(), output_format: attune_worker::runtime::OutputFormat::default(), @@ -164,6 +166,8 @@ done selected_runtime_version: None, max_stdout_bytes: 400, // Small limit max_stderr_bytes: 1024, + stdout_log_path: None, + stderr_log_path: None, parameter_delivery: attune_worker::runtime::ParameterDelivery::default(), parameter_format: attune_worker::runtime::ParameterFormat::default(), output_format: attune_worker::runtime::OutputFormat::default(), @@ -329,6 +333,8 @@ async fn test_shell_process_runtime_truncation() { selected_runtime_version: None, max_stdout_bytes: 500, max_stderr_bytes: 1024, + stdout_log_path: None, + stderr_log_path: None, parameter_delivery: attune_worker::runtime::ParameterDelivery::default(), parameter_format: attune_worker::runtime::ParameterFormat::default(), output_format: attune_worker::runtime::OutputFormat::default(), diff --git a/crates/worker/tests/security_tests.rs b/crates/worker/tests/security_tests.rs index d2d355b..876f570 100644 --- a/crates/worker/tests/security_tests.rs +++ b/crates/worker/tests/security_tests.rs @@ -112,6 +112,8 @@ print(json.dumps(result)) selected_runtime_version: None, max_stdout_bytes: 10 * 1024 * 1024, max_stderr_bytes: 10 * 1024 * 1024, + stdout_log_path: None, + stderr_log_path: None, parameter_delivery: attune_worker::runtime::ParameterDelivery::default(), parameter_format: attune_worker::runtime::ParameterFormat::default(), output_format: attune_worker::runtime::OutputFormat::Json, @@ -207,6 +209,8 @@ echo "SECURITY_PASS: Secrets not in inherited environment and accessible via mer selected_runtime_version: None, max_stdout_bytes: 10 * 1024 * 1024, max_stderr_bytes: 10 * 1024 * 1024, + stdout_log_path: None, + stderr_log_path: None, parameter_delivery: attune_worker::runtime::ParameterDelivery::default(), parameter_format: attune_worker::runtime::ParameterFormat::default(), output_format: attune_worker::runtime::OutputFormat::default(), @@ -272,6 +276,8 @@ print(json.dumps({'secret_a': secrets.get('secret_a')})) selected_runtime_version: None, max_stdout_bytes: 10 * 1024 * 1024, max_stderr_bytes: 10 * 1024 * 1024, + stdout_log_path: None, + stderr_log_path: None, parameter_delivery: attune_worker::runtime::ParameterDelivery::default(), parameter_format: attune_worker::runtime::ParameterFormat::default(), output_format: attune_worker::runtime::OutputFormat::Json, @@ -318,6 +324,8 @@ print(json.dumps({ selected_runtime_version: None, max_stdout_bytes: 10 * 1024 * 1024, max_stderr_bytes: 10 * 1024 * 1024, + stdout_log_path: None, + stderr_log_path: None, parameter_delivery: attune_worker::runtime::ParameterDelivery::default(), parameter_format: attune_worker::runtime::ParameterFormat::default(), output_format: attune_worker::runtime::OutputFormat::Json, @@ -373,6 +381,8 @@ print("ok") selected_runtime_version: None, max_stdout_bytes: 10 * 1024 * 1024, max_stderr_bytes: 10 * 1024 * 1024, + stdout_log_path: None, + stderr_log_path: None, parameter_delivery: attune_worker::runtime::ParameterDelivery::default(), parameter_format: attune_worker::runtime::ParameterFormat::default(), output_format: attune_worker::runtime::OutputFormat::default(), @@ -425,6 +435,8 @@ fi selected_runtime_version: None, max_stdout_bytes: 10 * 1024 * 1024, max_stderr_bytes: 10 * 1024 * 1024, + stdout_log_path: None, + stderr_log_path: None, parameter_delivery: attune_worker::runtime::ParameterDelivery::default(), parameter_format: attune_worker::runtime::ParameterFormat::default(), output_format: attune_worker::runtime::OutputFormat::default(), @@ -507,6 +519,8 @@ echo "PASS: No secrets in environment" selected_runtime_version: None, max_stdout_bytes: 10 * 1024 * 1024, max_stderr_bytes: 10 * 1024 * 1024, + stdout_log_path: None, + stderr_log_path: None, parameter_delivery: attune_worker::runtime::ParameterDelivery::default(), parameter_format: attune_worker::runtime::ParameterFormat::default(), output_format: attune_worker::runtime::OutputFormat::default(), @@ -588,6 +602,8 @@ print(json.dumps({"leaked": leaked})) selected_runtime_version: None, max_stdout_bytes: 10 * 1024 * 1024, max_stderr_bytes: 10 * 1024 * 1024, + stdout_log_path: None, + stderr_log_path: None, parameter_delivery: attune_worker::runtime::ParameterDelivery::default(), parameter_format: attune_worker::runtime::ParameterFormat::default(), output_format: attune_worker::runtime::OutputFormat::Json,