fixing tests, making clippy happy
Some checks failed
CI / Rustfmt (push) Successful in 19s
CI / Cargo Audit & Deny (push) Successful in 33s
CI / Security Blocking Checks (push) Successful in 5s
CI / Web Advisory Checks (push) Successful in 28s
CI / Web Blocking Checks (push) Successful in 52s
Publish Images / Resolve Publish Metadata (push) Successful in 0s
CI / Security Advisory Checks (push) Successful in 23s
CI / Clippy (push) Successful in 2m4s
Publish Images / Publish Docker Dist Bundle (push) Successful in 4s
Publish Images / Publish web (amd64) (push) Successful in 45s
Publish Images / Publish web (arm64) (push) Successful in 3m32s
CI / Tests (push) Failing after 8m25s
Publish Images / Build Rust Bundles (arm64) (push) Successful in 12m12s
Publish Images / Build Rust Bundles (amd64) (push) Successful in 12m39s
Publish Images / Publish agent (amd64) (push) Successful in 26s
Publish Images / Publish executor (amd64) (push) Successful in 40s
Publish Images / Publish api (amd64) (push) Successful in 30s
Publish Images / Publish notifier (amd64) (push) Successful in 41s
Publish Images / Publish agent (arm64) (push) Successful in 52s
Publish Images / Publish api (arm64) (push) Successful in 1m56s
Publish Images / Publish executor (arm64) (push) Successful in 1m57s
Publish Images / Publish notifier (arm64) (push) Successful in 1m50s
Publish Images / Publish manifest attune/agent (push) Successful in 15s
Publish Images / Publish manifest attune/api (push) Failing after 30s
Publish Images / Publish manifest attune/executor (push) Successful in 42s
Publish Images / Publish manifest attune/web (push) Failing after 17s
Publish Images / Publish manifest attune/notifier (push) Failing after 14m44s

This commit is contained in:
2026-04-02 09:17:21 -05:00
parent b34617ded1
commit 8278030699
8 changed files with 144 additions and 81 deletions

View File

@@ -159,6 +159,23 @@ struct StreamWatchHandle {
handle: tokio::task::JoinHandle<()>,
}
#[derive(Clone)]
struct StreamWatchConfig {
base_url: String,
token: String,
execution_id: i64,
prefix: Option<String>,
verbose: bool,
delivered_output: Arc<AtomicBool>,
root_stdout_completed: Option<Arc<AtomicBool>>,
}
struct StreamLogTask {
stream_name: &'static str,
offset: Arc<AtomicU64>,
config: StreamWatchConfig,
}
impl From<RestExecution> for ExecutionSummary {
fn from(e: RestExecution) -> Self {
Self {
@@ -288,25 +305,27 @@ async fn watch_execution_output(
match root_watch.as_mut() {
Some(state) => restart_finished_streams(
&mut state.stream_handles,
&base_url,
token,
execution_id,
None,
verbose,
delivered_output.clone(),
Some(root_stdout_completed.clone()),
&StreamWatchConfig {
base_url: base_url.clone(),
token,
execution_id,
prefix: None,
verbose,
delivered_output: delivered_output.clone(),
root_stdout_completed: Some(root_stdout_completed.clone()),
},
),
None => {
root_watch = Some(RootWatchState {
stream_handles: spawn_execution_log_streams(
&base_url,
stream_handles: spawn_execution_log_streams(StreamWatchConfig {
base_url: base_url.clone(),
token,
execution_id,
None,
verbose,
delivered_output.clone(),
Some(root_stdout_completed.clone()),
),
prefix: None,
delivered_output: delivered_output.clone(),
root_stdout_completed: Some(root_stdout_completed.clone()),
}),
});
}
}
@@ -327,15 +346,15 @@ async fn watch_execution_output(
.auth_token()
.map(str::to_string)
.map(|token| {
spawn_execution_log_streams(
&base_url,
spawn_execution_log_streams(StreamWatchConfig {
base_url: base_url.clone(),
token,
child.id,
Some(label.clone()),
execution_id: child.id,
prefix: Some(label.clone()),
verbose,
delivered_output.clone(),
None,
)
delivered_output: delivered_output.clone(),
root_stdout_completed: None,
})
})
.unwrap_or_default();
ChildWatchState {
@@ -355,13 +374,15 @@ async fn watch_execution_output(
if let Some(token) = client.auth_token().map(str::to_string) {
restart_finished_streams(
&mut entry.stream_handles,
&base_url,
token,
child.id,
Some(entry.label.clone()),
verbose,
delivered_output.clone(),
None,
&StreamWatchConfig {
base_url: base_url.clone(),
token,
execution_id: child.id,
prefix: Some(entry.label.clone()),
verbose,
delivered_output: delivered_output.clone(),
root_stdout_completed: None,
},
);
}
}
@@ -392,37 +413,31 @@ async fn watch_execution_output(
Ok(())
}
fn spawn_execution_log_streams(
base_url: &str,
token: String,
execution_id: i64,
prefix: Option<String>,
verbose: bool,
delivered_output: Arc<AtomicBool>,
root_stdout_completed: Option<Arc<AtomicBool>>,
) -> Vec<StreamWatchHandle> {
fn spawn_execution_log_streams(config: StreamWatchConfig) -> Vec<StreamWatchHandle> {
["stdout", "stderr"]
.into_iter()
.map(|stream_name| {
let offset = Arc::new(AtomicU64::new(0));
let completion_flag = if stream_name == "stdout" {
root_stdout_completed.clone()
config.root_stdout_completed.clone()
} else {
None
};
StreamWatchHandle {
stream_name,
handle: tokio::spawn(stream_execution_log(
base_url.to_string(),
token.clone(),
execution_id,
handle: tokio::spawn(stream_execution_log(StreamLogTask {
stream_name,
prefix.clone(),
verbose,
offset.clone(),
delivered_output.clone(),
completion_flag,
)),
offset: offset.clone(),
config: StreamWatchConfig {
base_url: config.base_url.clone(),
token: config.token.clone(),
execution_id: config.execution_id,
prefix: config.prefix.clone(),
verbose: config.verbose,
delivered_output: config.delivered_output.clone(),
root_stdout_completed: completion_flag,
},
})),
offset,
}
})
@@ -433,35 +448,28 @@ fn streams_need_restart(handles: &[StreamWatchHandle]) -> bool {
handles.is_empty() || handles.iter().any(|handle| handle.handle.is_finished())
}
fn restart_finished_streams(
handles: &mut Vec<StreamWatchHandle>,
base_url: &str,
token: String,
execution_id: i64,
prefix: Option<String>,
verbose: bool,
delivered_output: Arc<AtomicBool>,
root_stdout_completed: Option<Arc<AtomicBool>>,
) {
fn restart_finished_streams(handles: &mut [StreamWatchHandle], config: &StreamWatchConfig) {
for stream in handles.iter_mut() {
if stream.handle.is_finished() {
let offset = stream.offset.clone();
let completion_flag = if stream.stream_name == "stdout" {
root_stdout_completed.clone()
config.root_stdout_completed.clone()
} else {
None
};
stream.handle = tokio::spawn(stream_execution_log(
base_url.to_string(),
token.clone(),
execution_id,
stream.stream_name,
prefix.clone(),
verbose,
stream.handle = tokio::spawn(stream_execution_log(StreamLogTask {
stream_name: stream.stream_name,
offset,
delivered_output.clone(),
completion_flag,
));
config: StreamWatchConfig {
base_url: config.base_url.clone(),
token: config.token.clone(),
execution_id: config.execution_id,
prefix: config.prefix.clone(),
verbose: config.verbose,
delivered_output: config.delivered_output.clone(),
root_stdout_completed: completion_flag,
},
}));
}
}
}
@@ -837,17 +845,22 @@ fn format_task_label(
}
}
async fn stream_execution_log(
base_url: String,
token: String,
execution_id: i64,
stream_name: &'static str,
prefix: Option<String>,
verbose: bool,
offset: Arc<AtomicU64>,
delivered_output: Arc<AtomicBool>,
root_stdout_completed: Option<Arc<AtomicBool>>,
) {
async fn stream_execution_log(task: StreamLogTask) {
let StreamLogTask {
stream_name,
offset,
config:
StreamWatchConfig {
base_url,
token,
execution_id,
prefix,
verbose,
delivered_output,
root_stdout_completed,
},
} = task;
let mut stream_url = match url::Url::parse(&format!(
"{}/api/v1/executions/{}/logs/{}/stream",
base_url.trim_end_matches('/'),
@@ -913,7 +926,7 @@ async fn stream_execution_log(
}
flush_stream_chunk(prefix.as_deref(), &mut carry);
let _ = event_source.close();
event_source.close();
}
fn print_stream_chunk(prefix: Option<&str>, chunk: &str, carry: &mut String) {

View File

@@ -856,7 +856,7 @@ fn extract_git_host(raw_url: &str) -> Option<String> {
fn archive_filename_from_url(url: &Url) -> String {
let raw_name = url
.path_segments()
.and_then(|segments| segments.filter(|segment| !segment.is_empty()).next_back())
.and_then(|mut segments| segments.rfind(|segment| !segment.is_empty()))
.unwrap_or("archive.bin");
let sanitized: String = raw_name

View File

@@ -200,6 +200,8 @@ mod tests {
selected_runtime_version: None,
max_stdout_bytes: 10 * 1024 * 1024,
max_stderr_bytes: 10 * 1024 * 1024,
stdout_log_path: None,
stderr_log_path: None,
parameter_delivery: ParameterDelivery::default(),
parameter_format: ParameterFormat::default(),
output_format: OutputFormat::default(),
@@ -233,6 +235,8 @@ mod tests {
selected_runtime_version: None,
max_stdout_bytes: 10 * 1024 * 1024,
max_stderr_bytes: 10 * 1024 * 1024,
stdout_log_path: None,
stderr_log_path: None,
parameter_delivery: ParameterDelivery::default(),
parameter_format: ParameterFormat::default(),
output_format: OutputFormat::default(),

View File

@@ -1146,6 +1146,8 @@ mod tests {
selected_runtime_version: None,
max_stdout_bytes: 1024,
max_stderr_bytes: 1024,
stdout_log_path: None,
stderr_log_path: None,
parameter_delivery: ParameterDelivery::default(),
parameter_format: ParameterFormat::default(),
output_format: OutputFormat::default(),
@@ -1181,6 +1183,8 @@ mod tests {
selected_runtime_version: None,
max_stdout_bytes: 1024,
max_stderr_bytes: 1024,
stdout_log_path: None,
stderr_log_path: None,
parameter_delivery: ParameterDelivery::default(),
parameter_format: ParameterFormat::default(),
output_format: OutputFormat::default(),
@@ -1216,6 +1220,8 @@ mod tests {
selected_runtime_version: None,
max_stdout_bytes: 1024,
max_stderr_bytes: 1024,
stdout_log_path: None,
stderr_log_path: None,
parameter_delivery: ParameterDelivery::default(),
parameter_format: ParameterFormat::default(),
output_format: OutputFormat::default(),
@@ -1307,6 +1313,8 @@ mod tests {
selected_runtime_version: None,
max_stdout_bytes: 1024 * 1024,
max_stderr_bytes: 1024 * 1024,
stdout_log_path: None,
stderr_log_path: None,
parameter_delivery: ParameterDelivery::default(),
parameter_format: ParameterFormat::default(),
output_format: OutputFormat::default(),
@@ -1366,6 +1374,8 @@ mod tests {
selected_runtime_version: None,
max_stdout_bytes: 1024 * 1024,
max_stderr_bytes: 1024 * 1024,
stdout_log_path: None,
stderr_log_path: None,
parameter_delivery: ParameterDelivery::default(),
parameter_format: ParameterFormat::default(),
output_format: OutputFormat::default(),
@@ -1445,6 +1455,8 @@ mod tests {
selected_runtime_version: None,
max_stdout_bytes: 1024 * 1024,
max_stderr_bytes: 1024 * 1024,
stdout_log_path: None,
stderr_log_path: None,
parameter_delivery: ParameterDelivery::default(),
parameter_format: ParameterFormat::default(),
output_format: OutputFormat::default(),
@@ -1487,6 +1499,8 @@ mod tests {
selected_runtime_version: None,
max_stdout_bytes: 1024 * 1024,
max_stderr_bytes: 1024 * 1024,
stdout_log_path: None,
stderr_log_path: None,
parameter_delivery: ParameterDelivery::default(),
parameter_format: ParameterFormat::default(),
output_format: OutputFormat::default(),
@@ -1534,6 +1548,8 @@ mod tests {
selected_runtime_version: None,
max_stdout_bytes: 1024 * 1024,
max_stderr_bytes: 1024 * 1024,
stdout_log_path: None,
stderr_log_path: None,
parameter_delivery: ParameterDelivery::default(),
parameter_format: ParameterFormat::default(),
output_format: OutputFormat::default(),
@@ -1585,6 +1601,8 @@ mod tests {
selected_runtime_version: None,
max_stdout_bytes: 1024 * 1024,
max_stderr_bytes: 1024 * 1024,
stdout_log_path: None,
stderr_log_path: None,
parameter_delivery: ParameterDelivery::default(),
parameter_format: ParameterFormat::default(),
output_format: OutputFormat::default(),
@@ -1694,6 +1712,8 @@ mod tests {
selected_runtime_version: None,
max_stdout_bytes: 1024 * 1024,
max_stderr_bytes: 1024 * 1024,
stdout_log_path: None,
stderr_log_path: None,
parameter_delivery: ParameterDelivery::default(),
parameter_format: ParameterFormat::default(),
output_format: OutputFormat::default(),

View File

@@ -734,6 +734,8 @@ mod tests {
1024 * 1024,
OutputFormat::Text,
Some(cancel_token),
None,
None,
)
.await
.unwrap();

View File

@@ -86,6 +86,8 @@ fn make_context(action_ref: &str, entry_point: &str, runtime_name: &str) -> Exec
selected_runtime_version: None,
max_stdout_bytes: 10 * 1024 * 1024,
max_stderr_bytes: 10 * 1024 * 1024,
stdout_log_path: None,
stderr_log_path: None,
parameter_delivery: ParameterDelivery::default(),
parameter_format: ParameterFormat::default(),
output_format: OutputFormat::default(),

View File

@@ -80,6 +80,8 @@ fn make_python_context(
selected_runtime_version: None,
max_stdout_bytes,
max_stderr_bytes,
stdout_log_path: None,
stderr_log_path: None,
parameter_delivery: attune_worker::runtime::ParameterDelivery::default(),
parameter_format: attune_worker::runtime::ParameterFormat::default(),
output_format: attune_worker::runtime::OutputFormat::default(),
@@ -164,6 +166,8 @@ done
selected_runtime_version: None,
max_stdout_bytes: 400, // Small limit
max_stderr_bytes: 1024,
stdout_log_path: None,
stderr_log_path: None,
parameter_delivery: attune_worker::runtime::ParameterDelivery::default(),
parameter_format: attune_worker::runtime::ParameterFormat::default(),
output_format: attune_worker::runtime::OutputFormat::default(),
@@ -329,6 +333,8 @@ async fn test_shell_process_runtime_truncation() {
selected_runtime_version: None,
max_stdout_bytes: 500,
max_stderr_bytes: 1024,
stdout_log_path: None,
stderr_log_path: None,
parameter_delivery: attune_worker::runtime::ParameterDelivery::default(),
parameter_format: attune_worker::runtime::ParameterFormat::default(),
output_format: attune_worker::runtime::OutputFormat::default(),

View File

@@ -112,6 +112,8 @@ print(json.dumps(result))
selected_runtime_version: None,
max_stdout_bytes: 10 * 1024 * 1024,
max_stderr_bytes: 10 * 1024 * 1024,
stdout_log_path: None,
stderr_log_path: None,
parameter_delivery: attune_worker::runtime::ParameterDelivery::default(),
parameter_format: attune_worker::runtime::ParameterFormat::default(),
output_format: attune_worker::runtime::OutputFormat::Json,
@@ -207,6 +209,8 @@ echo "SECURITY_PASS: Secrets not in inherited environment and accessible via mer
selected_runtime_version: None,
max_stdout_bytes: 10 * 1024 * 1024,
max_stderr_bytes: 10 * 1024 * 1024,
stdout_log_path: None,
stderr_log_path: None,
parameter_delivery: attune_worker::runtime::ParameterDelivery::default(),
parameter_format: attune_worker::runtime::ParameterFormat::default(),
output_format: attune_worker::runtime::OutputFormat::default(),
@@ -272,6 +276,8 @@ print(json.dumps({'secret_a': secrets.get('secret_a')}))
selected_runtime_version: None,
max_stdout_bytes: 10 * 1024 * 1024,
max_stderr_bytes: 10 * 1024 * 1024,
stdout_log_path: None,
stderr_log_path: None,
parameter_delivery: attune_worker::runtime::ParameterDelivery::default(),
parameter_format: attune_worker::runtime::ParameterFormat::default(),
output_format: attune_worker::runtime::OutputFormat::Json,
@@ -318,6 +324,8 @@ print(json.dumps({
selected_runtime_version: None,
max_stdout_bytes: 10 * 1024 * 1024,
max_stderr_bytes: 10 * 1024 * 1024,
stdout_log_path: None,
stderr_log_path: None,
parameter_delivery: attune_worker::runtime::ParameterDelivery::default(),
parameter_format: attune_worker::runtime::ParameterFormat::default(),
output_format: attune_worker::runtime::OutputFormat::Json,
@@ -373,6 +381,8 @@ print("ok")
selected_runtime_version: None,
max_stdout_bytes: 10 * 1024 * 1024,
max_stderr_bytes: 10 * 1024 * 1024,
stdout_log_path: None,
stderr_log_path: None,
parameter_delivery: attune_worker::runtime::ParameterDelivery::default(),
parameter_format: attune_worker::runtime::ParameterFormat::default(),
output_format: attune_worker::runtime::OutputFormat::default(),
@@ -425,6 +435,8 @@ fi
selected_runtime_version: None,
max_stdout_bytes: 10 * 1024 * 1024,
max_stderr_bytes: 10 * 1024 * 1024,
stdout_log_path: None,
stderr_log_path: None,
parameter_delivery: attune_worker::runtime::ParameterDelivery::default(),
parameter_format: attune_worker::runtime::ParameterFormat::default(),
output_format: attune_worker::runtime::OutputFormat::default(),
@@ -507,6 +519,8 @@ echo "PASS: No secrets in environment"
selected_runtime_version: None,
max_stdout_bytes: 10 * 1024 * 1024,
max_stderr_bytes: 10 * 1024 * 1024,
stdout_log_path: None,
stderr_log_path: None,
parameter_delivery: attune_worker::runtime::ParameterDelivery::default(),
parameter_format: attune_worker::runtime::ParameterFormat::default(),
output_format: attune_worker::runtime::OutputFormat::default(),
@@ -588,6 +602,8 @@ print(json.dumps({"leaked": leaked}))
selected_runtime_version: None,
max_stdout_bytes: 10 * 1024 * 1024,
max_stderr_bytes: 10 * 1024 * 1024,
stdout_log_path: None,
stderr_log_path: None,
parameter_delivery: attune_worker::runtime::ParameterDelivery::default(),
parameter_format: attune_worker::runtime::ParameterFormat::default(),
output_format: attune_worker::runtime::OutputFormat::Json,