From 8af8c1af9c197fc6377ab009ab3164b86a42f65d Mon Sep 17 00:00:00 2001 From: David Culbreth Date: Mon, 23 Mar 2026 12:49:15 -0500 Subject: [PATCH] first iteration of agent-style worker and sensor containers. --- Makefile | 16 +- charts/attune/templates/NOTES.txt | 3 + charts/attune/templates/applications.yaml | 19 + charts/attune/values.yaml | 6 +- crates/api/src/routes/runtimes.rs | 1 + crates/common/src/agent_bootstrap.rs | 107 ++++ crates/common/src/agent_runtime_detection.rs | 306 ++++++++++ crates/common/src/lib.rs | 2 + crates/common/src/models.rs | 1 + crates/common/src/pack_registry/loader.rs | 12 + crates/common/src/repositories/runtime.rs | 35 +- crates/common/src/runtime_detection.rs | 200 ++----- crates/common/tests/helpers.rs | 1 + .../common/tests/repository_runtime_tests.rs | 2 + .../common/tests/repository_worker_tests.rs | 1 + crates/executor/src/scheduler.rs | 208 ++++++- .../tests/fifo_ordering_integration_test.rs | 1 + .../executor/tests/policy_enforcer_tests.rs | 1 + crates/sensor/Cargo.toml | 4 + crates/sensor/src/agent_main.rs | 79 +++ crates/sensor/src/lib.rs | 1 + crates/sensor/src/main.rs | 108 +--- crates/sensor/src/sensor_manager.rs | 170 +++++- .../sensor/src/sensor_worker_registration.rs | 57 ++ crates/sensor/src/startup.rs | 119 ++++ crates/worker/src/agent_main.rs | 122 +--- crates/worker/src/dynamic_runtime.rs | 43 +- crates/worker/src/env_setup.rs | 12 +- crates/worker/src/registration.rs | 79 +++ crates/worker/src/runtime_detect.rs | 548 +----------------- crates/worker/src/service.rs | 8 +- crates/worker/src/version_verify.rs | 4 +- docker-compose.agent.yaml | 4 +- docker-compose.yaml | 90 +-- docker/Dockerfile.agent | 37 +- docs/plans/sensor-agent-injection.md | 266 +++++++++ migrations/20250101000002_pack_system.sql | 3 + packs/core/runtimes/go.yaml | 1 + packs/core/runtimes/java.yaml | 1 + packs/core/runtimes/native.yaml | 1 + packs/core/runtimes/nodejs.yaml | 1 + packs/core/runtimes/perl.yaml | 1 + 
packs/core/runtimes/python.yaml | 1 + packs/core/runtimes/r.yaml | 3 +- packs/core/runtimes/ruby.yaml | 1 + packs/core/runtimes/shell.yaml | 1 + scripts/load_core_pack.py | 9 +- 47 files changed, 1667 insertions(+), 1029 deletions(-) create mode 100644 crates/common/src/agent_bootstrap.rs create mode 100644 crates/common/src/agent_runtime_detection.rs create mode 100644 crates/sensor/src/agent_main.rs create mode 100644 crates/sensor/src/startup.rs create mode 100644 docs/plans/sensor-agent-injection.md diff --git a/Makefile b/Makefile index f29d251..5d2b5ee 100644 --- a/Makefile +++ b/Makefile @@ -265,11 +265,15 @@ docker-build-worker-full: build-agent: @echo "Installing musl target (if not already installed)..." rustup target add x86_64-unknown-linux-musl 2>/dev/null || true - @echo "Building statically-linked agent binary..." - SQLX_OFFLINE=true cargo build --release --target x86_64-unknown-linux-musl --bin attune-agent + @echo "Building statically-linked worker and sensor agent binaries..." + SQLX_OFFLINE=true cargo build --release --target x86_64-unknown-linux-musl --bin attune-agent --bin attune-sensor-agent strip target/x86_64-unknown-linux-musl/release/attune-agent - @echo "✅ Agent binary built: target/x86_64-unknown-linux-musl/release/attune-agent" + strip target/x86_64-unknown-linux-musl/release/attune-sensor-agent + @echo "✅ Agent binaries built:" + @echo " - target/x86_64-unknown-linux-musl/release/attune-agent" + @echo " - target/x86_64-unknown-linux-musl/release/attune-sensor-agent" @ls -lh target/x86_64-unknown-linux-musl/release/attune-agent + @ls -lh target/x86_64-unknown-linux-musl/release/attune-sensor-agent docker-build-agent: @echo "Building agent Docker image (statically-linked binary)..." 
@@ -282,6 +286,12 @@ run-agent: run-agent-release: cargo run --bin attune-agent --release +run-sensor-agent: + cargo run --bin attune-sensor-agent + +run-sensor-agent-release: + cargo run --bin attune-sensor-agent --release + docker-up: @echo "Starting all services with Docker Compose..." docker compose up -d diff --git a/charts/attune/templates/NOTES.txt b/charts/attune/templates/NOTES.txt index 083382d..298f7bd 100644 --- a/charts/attune/templates/NOTES.txt +++ b/charts/attune/templates/NOTES.txt @@ -13,6 +13,9 @@ Each agent worker uses an init container to copy the statically-linked attune-agent binary into the worker pod via an emptyDir volume. The agent auto-detects available runtimes in the container and registers with Attune. +The default sensor deployment also uses the same injection pattern, copying +`attune-sensor-agent` into the pod before starting a stock runtime image. + To add more agent workers, append entries to `agentWorkers` in your values: agentWorkers: diff --git a/charts/attune/templates/applications.yaml b/charts/attune/templates/applications.yaml index b527f97..ce88240 100644 --- a/charts/attune/templates/applications.yaml +++ b/charts/attune/templates/applications.yaml @@ -304,7 +304,15 @@ spec: imagePullSecrets: {{- toYaml .Values.global.imagePullSecrets | nindent 8 }} {{- end }} + terminationGracePeriodSeconds: 45 initContainers: + - name: sensor-agent-loader + image: {{ include "attune.image" (dict "root" . "image" .Values.images.agent) }} + imagePullPolicy: {{ .Values.images.agent.pullPolicy }} + command: ["cp", "/usr/local/bin/attune-sensor-agent", "/opt/attune/agent/attune-sensor-agent"] + volumeMounts: + - name: agent-bin + mountPath: /opt/attune/agent - name: wait-for-schema image: postgres:16-alpine command: ["/bin/sh", "-ec"] @@ -333,6 +341,7 @@ spec: - name: sensor image: {{ include "attune.image" (dict "root" . 
"image" .Values.images.sensor) }} imagePullPolicy: {{ .Values.images.sensor.pullPolicy }} + command: ["/opt/attune/agent/attune-sensor-agent"] envFrom: - secretRef: name: {{ include "attune.secretName" . }} @@ -343,23 +352,33 @@ spec: value: {{ .Values.database.schema | quote }} - name: ATTUNE__WORKER__WORKER_TYPE value: container + - name: ATTUNE_SENSOR_RUNTIMES + value: {{ .Values.sensor.runtimes | quote }} - name: ATTUNE_API_URL value: http://{{ include "attune.apiServiceName" . }}:{{ .Values.api.service.port }} - name: ATTUNE_MQ_URL value: {{ include "attune.rabbitmqUrl" . | quote }} - name: ATTUNE_PACKS_BASE_DIR value: /opt/attune/packs + - name: RUST_LOG + value: {{ .Values.sensor.logLevel | quote }} resources: {{- toYaml .Values.sensor.resources | nindent 12 }} volumeMounts: + - name: agent-bin + mountPath: /opt/attune/agent + readOnly: true - name: config mountPath: /opt/attune/config.yaml subPath: config.yaml - name: packs mountPath: /opt/attune/packs + readOnly: true - name: runtime-envs mountPath: /opt/attune/runtime_envs volumes: + - name: agent-bin + emptyDir: {} - name: config configMap: name: {{ include "attune.fullname" . 
}}-config diff --git a/charts/attune/values.yaml b/charts/attune/values.yaml index 79d9603..46a215b 100644 --- a/charts/attune/values.yaml +++ b/charts/attune/values.yaml @@ -108,8 +108,8 @@ images: tag: "" pullPolicy: IfNotPresent sensor: - repository: attune-sensor - tag: "" + repository: nikolaik/python-nodejs + tag: python3.12-nodejs22-slim pullPolicy: IfNotPresent notifier: repository: attune-notifier @@ -166,6 +166,8 @@ worker: sensor: replicaCount: 1 + runtimes: shell,python,node,native + logLevel: debug resources: {} notifier: diff --git a/crates/api/src/routes/runtimes.rs b/crates/api/src/routes/runtimes.rs index 96aba08..c37d160 100644 --- a/crates/api/src/routes/runtimes.rs +++ b/crates/api/src/routes/runtimes.rs @@ -176,6 +176,7 @@ pub async fn create_runtime( pack_ref, description: request.description, name: request.name, + aliases: vec![], distributions: request.distributions, installation: request.installation, execution_config: request.execution_config, diff --git a/crates/common/src/agent_bootstrap.rs b/crates/common/src/agent_bootstrap.rs new file mode 100644 index 0000000..bcac5e5 --- /dev/null +++ b/crates/common/src/agent_bootstrap.rs @@ -0,0 +1,107 @@ +//! Shared bootstrap helpers for injected agent binaries. + +use crate::agent_runtime_detection::{ + detect_runtimes, format_as_env_value, print_detection_report_for_env, DetectedRuntime, +}; +use tracing::{info, warn}; + +#[derive(Debug, Clone)] +pub struct RuntimeBootstrapResult { + pub runtimes_override: Option, + pub detected_runtimes: Option>, +} + +/// Detect runtimes and populate the agent runtime environment variable when needed. +/// +/// This must run before the Tokio runtime starts because it may mutate process +/// environment variables. 
+pub fn bootstrap_runtime_env(env_var_name: &str) -> RuntimeBootstrapResult { + let runtimes_override = std::env::var(env_var_name).ok(); + let mut detected_runtimes = None; + + if let Some(ref override_value) = runtimes_override { + info!( + "{} already set (override): {}", + env_var_name, override_value + ); + info!("Running auto-detection for override-specified runtimes..."); + + let detected = detect_runtimes(); + let override_names: Vec<&str> = override_value.split(',').map(|s| s.trim()).collect(); + + let filtered: Vec<_> = detected + .into_iter() + .filter(|rt| { + let lower_name = rt.name.to_ascii_lowercase(); + override_names + .iter() + .any(|ov| ov.to_ascii_lowercase() == lower_name) + }) + .collect(); + + if filtered.is_empty() { + warn!( + "None of the override runtimes ({}) were found on this system", + override_value + ); + } else { + info!( + "Matched {} override runtime(s) to detected interpreters:", + filtered.len() + ); + for rt in &filtered { + match &rt.version { + Some(ver) => info!(" ✓ {} — {} ({})", rt.name, rt.path, ver), + None => info!(" ✓ {} — {}", rt.name, rt.path), + } + } + detected_runtimes = Some(filtered); + } + } else { + info!("No {} override — running auto-detection...", env_var_name); + + let detected = detect_runtimes(); + + if detected.is_empty() { + warn!("No runtimes detected! 
The agent may not be able to execute any work."); + } else { + info!("Detected {} runtime(s):", detected.len()); + for rt in &detected { + match &rt.version { + Some(ver) => info!(" ✓ {} — {} ({})", rt.name, rt.path, ver), + None => info!(" ✓ {} — {}", rt.name, rt.path), + } + } + + let runtime_csv = format_as_env_value(&detected); + info!("Setting {}={}", env_var_name, runtime_csv); + std::env::set_var(env_var_name, &runtime_csv); + detected_runtimes = Some(detected); + } + } + + RuntimeBootstrapResult { + runtimes_override, + detected_runtimes, + } +} + +pub fn print_detect_only_report(env_var_name: &str, result: &RuntimeBootstrapResult) { + if result.runtimes_override.is_some() { + info!("--detect-only: re-running detection to show what is available on this system..."); + println!( + "NOTE: {} is set — auto-detection was skipped during normal startup.", + env_var_name + ); + println!(" Showing what auto-detection would find on this system:"); + println!(); + + let detected = detect_runtimes(); + print_detection_report_for_env(env_var_name, &detected); + } else if let Some(ref detected) = result.detected_runtimes { + print_detection_report_for_env(env_var_name, detected); + } else { + let detected = detect_runtimes(); + print_detection_report_for_env(env_var_name, &detected); + } +} diff --git a/crates/common/src/agent_runtime_detection.rs b/crates/common/src/agent_runtime_detection.rs new file mode 100644 index 0000000..9d69b21 --- /dev/null +++ b/crates/common/src/agent_runtime_detection.rs @@ -0,0 +1,306 @@ +//! Runtime auto-detection for injected Attune agent binaries. +//! +//! This module probes the local system directly for well-known interpreters, +//! without requiring database access. + +use serde::{Deserialize, Serialize}; +use std::fmt; +use std::process::Command; +use tracing::{debug, info}; + +/// A runtime interpreter discovered on the local system. 
+#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct DetectedRuntime { + /// Canonical runtime name (for example, "python" or "node"). + pub name: String, + + /// Absolute path to the interpreter binary. + pub path: String, + + /// Version string if the version command succeeded. + pub version: Option, +} + +impl fmt::Display for DetectedRuntime { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match &self.version { + Some(v) => write!(f, "{} ({}, v{})", self.name, self.path, v), + None => write!(f, "{} ({})", self.name, self.path), + } + } +} + +struct RuntimeCandidate { + name: &'static str, + binaries: &'static [&'static str], + version_args: &'static [&'static str], + version_parser: VersionParser, +} + +enum VersionParser { + SemverLike, + JavaStyle, +} + +fn candidates() -> Vec { + vec![ + RuntimeCandidate { + name: "shell", + binaries: &["bash", "sh"], + version_args: &["--version"], + version_parser: VersionParser::SemverLike, + }, + RuntimeCandidate { + name: "python", + binaries: &["python3", "python"], + version_args: &["--version"], + version_parser: VersionParser::SemverLike, + }, + RuntimeCandidate { + name: "node", + binaries: &["node", "nodejs"], + version_args: &["--version"], + version_parser: VersionParser::SemverLike, + }, + RuntimeCandidate { + name: "ruby", + binaries: &["ruby"], + version_args: &["--version"], + version_parser: VersionParser::SemverLike, + }, + RuntimeCandidate { + name: "go", + binaries: &["go"], + version_args: &["version"], + version_parser: VersionParser::SemverLike, + }, + RuntimeCandidate { + name: "java", + binaries: &["java"], + version_args: &["-version"], + version_parser: VersionParser::JavaStyle, + }, + RuntimeCandidate { + name: "r", + binaries: &["Rscript"], + version_args: &["--version"], + version_parser: VersionParser::SemverLike, + }, + RuntimeCandidate { + name: "perl", + binaries: &["perl"], + version_args: &["--version"], + version_parser: VersionParser::SemverLike, + }, + ] +} + 
+/// Detect available runtimes by probing the local system. +pub fn detect_runtimes() -> Vec { + info!("Starting runtime auto-detection..."); + + let mut detected = Vec::new(); + + for candidate in candidates() { + match detect_single_runtime(&candidate) { + Some(runtime) => { + info!(" ✓ Detected: {}", runtime); + detected.push(runtime); + } + None => { + debug!(" ✗ Not found: {}", candidate.name); + } + } + } + + info!( + "Runtime auto-detection complete: found {} runtime(s): [{}]", + detected.len(), + detected + .iter() + .map(|r| r.name.as_str()) + .collect::>() + .join(", ") + ); + + detected +} + +fn detect_single_runtime(candidate: &RuntimeCandidate) -> Option { + for binary in candidate.binaries { + if let Some(path) = which_binary(binary) { + let version = get_version(&path, candidate.version_args, &candidate.version_parser); + + return Some(DetectedRuntime { + name: candidate.name.to_string(), + path, + version, + }); + } + } + + None +} + +fn which_binary(binary: &str) -> Option { + if binary == "bash" || binary == "sh" { + let absolute_path = format!("/bin/{}", binary); + if std::path::Path::new(&absolute_path).exists() { + return Some(absolute_path); + } + } + + match Command::new("which").arg(binary).output() { + Ok(output) if output.status.success() => { + let path = String::from_utf8_lossy(&output.stdout).trim().to_string(); + if path.is_empty() { + None + } else { + Some(path) + } + } + Ok(_) => None, + Err(e) => { + debug!("'which' command failed ({}), trying 'command -v'", e); + match Command::new("sh") + .args(["-c", &format!("command -v {}", binary)]) + .output() + { + Ok(output) if output.status.success() => { + let path = String::from_utf8_lossy(&output.stdout).trim().to_string(); + if path.is_empty() { + None + } else { + Some(path) + } + } + _ => None, + } + } + } +} + +fn get_version(binary_path: &str, version_args: &[&str], parser: &VersionParser) -> Option { + let output = match Command::new(binary_path).args(version_args).output() { + 
Ok(output) => output, + Err(e) => { + debug!("Failed to run version command for {}: {}", binary_path, e); + return None; + } + }; + + let stdout = String::from_utf8_lossy(&output.stdout); + let stderr = String::from_utf8_lossy(&output.stderr); + let combined = format!("{}{}", stdout, stderr); + + match parser { + VersionParser::SemverLike => parse_semver_like(&combined), + VersionParser::JavaStyle => parse_java_version(&combined), + } +} + +fn parse_semver_like(output: &str) -> Option { + let re = regex::Regex::new(r"(?:v|go)?(\d+\.\d+(?:\.\d+)?)").ok()?; + re.captures(output) + .and_then(|captures| captures.get(1).map(|m| m.as_str().to_string())) +} + +fn parse_java_version(output: &str) -> Option { + let quoted_re = regex::Regex::new(r#"version\s+"([^"]+)""#).ok()?; + if let Some(captures) = quoted_re.captures(output) { + return captures.get(1).map(|m| m.as_str().to_string()); + } + + parse_semver_like(output) +} + +pub fn format_as_env_value(runtimes: &[DetectedRuntime]) -> String { + runtimes + .iter() + .map(|r| r.name.as_str()) + .collect::>() + .join(",") +} + +pub fn print_detection_report_for_env(env_var_name: &str, runtimes: &[DetectedRuntime]) { + println!("=== Attune Agent Runtime Detection Report ==="); + println!(); + + if runtimes.is_empty() { + println!("No runtimes detected!"); + println!(); + println!("The agent could not find any supported interpreter binaries."); + println!("Ensure at least one of the following is installed and on PATH:"); + println!(" - bash / sh (shell scripts)"); + println!(" - python3 / python (Python scripts)"); + println!(" - node / nodejs (Node.js scripts)"); + println!(" - ruby (Ruby scripts)"); + println!(" - go (Go programs)"); + println!(" - java (Java programs)"); + println!(" - Rscript (R scripts)"); + println!(" - perl (Perl scripts)"); + } else { + println!("Detected {} runtime(s):", runtimes.len()); + println!(); + for rt in runtimes { + let version_str = rt.version.as_deref().unwrap_or("unknown version"); + 
println!(" ✓ {:<10} {} ({})", rt.name, rt.path, version_str); + } + } + + println!(); + println!("{}={}", env_var_name, format_as_env_value(runtimes)); + println!(); +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_parse_semver_like_python() { + assert_eq!( + parse_semver_like("Python 3.12.1"), + Some("3.12.1".to_string()) + ); + } + + #[test] + fn test_parse_semver_like_node() { + assert_eq!(parse_semver_like("v20.11.0"), Some("20.11.0".to_string())); + } + + #[test] + fn test_parse_semver_like_go() { + assert_eq!( + parse_semver_like("go version go1.22.0 linux/amd64"), + Some("1.22.0".to_string()) + ); + } + + #[test] + fn test_parse_java_version_openjdk() { + assert_eq!( + parse_java_version(r#"openjdk version "21.0.1" 2023-10-17"#), + Some("21.0.1".to_string()) + ); + } + + #[test] + fn test_format_as_env_value_multiple() { + let runtimes = vec![ + DetectedRuntime { + name: "shell".to_string(), + path: "/bin/bash".to_string(), + version: Some("5.2.15".to_string()), + }, + DetectedRuntime { + name: "python".to_string(), + path: "/usr/bin/python3".to_string(), + version: Some("3.12.1".to_string()), + }, + ]; + + assert_eq!(format_as_env_value(&runtimes), "shell,python"); + } +} diff --git a/crates/common/src/lib.rs b/crates/common/src/lib.rs index 4872efa..43fd040 100644 --- a/crates/common/src/lib.rs +++ b/crates/common/src/lib.rs @@ -6,6 +6,8 @@ //! - Configuration //! 
- Utilities +pub mod agent_bootstrap; +pub mod agent_runtime_detection; pub mod auth; pub mod config; pub mod crypto; diff --git a/crates/common/src/models.rs b/crates/common/src/models.rs index df81f53..4512bb8 100644 --- a/crates/common/src/models.rs +++ b/crates/common/src/models.rs @@ -776,6 +776,7 @@ pub mod runtime { pub pack_ref: Option, pub description: Option, pub name: String, + pub aliases: Vec, pub distributions: JsonDict, pub installation: Option, pub installers: JsonDict, diff --git a/crates/common/src/pack_registry/loader.rs b/crates/common/src/pack_registry/loader.rs index 105af9a..31c29c1 100644 --- a/crates/common/src/pack_registry/loader.rs +++ b/crates/common/src/pack_registry/loader.rs @@ -404,6 +404,16 @@ impl<'a> PackComponentLoader<'a> { .and_then(|v| serde_json::to_value(v).ok()) .unwrap_or_else(|| serde_json::json!({})); + let aliases: Vec = data + .get("aliases") + .and_then(|v| v.as_sequence()) + .map(|arr| { + arr.iter() + .filter_map(|v| v.as_str().map(|s| s.to_ascii_lowercase())) + .collect() + }) + .unwrap_or_default(); + // Check if runtime already exists — update in place if so if let Some(existing) = RuntimeRepository::find_by_ref(self.pool, &runtime_ref).await? { let update_input = UpdateRuntimeInput { @@ -418,6 +428,7 @@ impl<'a> PackComponentLoader<'a> { None => Patch::Clear, }), execution_config: Some(execution_config), + aliases: Some(aliases), ..Default::default() }; @@ -449,6 +460,7 @@ impl<'a> PackComponentLoader<'a> { distributions, installation, execution_config, + aliases, auto_detected: false, detection_config: serde_json::json!({}), }; diff --git a/crates/common/src/repositories/runtime.rs b/crates/common/src/repositories/runtime.rs index 350cdd3..91a2538 100644 --- a/crates/common/src/repositories/runtime.rs +++ b/crates/common/src/repositories/runtime.rs @@ -25,7 +25,7 @@ impl Repository for RuntimeRepository { /// Columns selected for all Runtime queries. 
Centralised here so that /// schema changes only need one update. -pub const SELECT_COLUMNS: &str = "id, ref, pack, pack_ref, description, name, \ +pub const SELECT_COLUMNS: &str = "id, ref, pack, pack_ref, description, name, aliases, \ distributions, installation, installers, execution_config, \ auto_detected, detection_config, \ created, updated"; @@ -38,6 +38,7 @@ pub struct CreateRuntimeInput { pub pack_ref: Option, pub description: Option, pub name: String, + pub aliases: Vec, pub distributions: JsonDict, pub installation: Option, pub execution_config: JsonDict, @@ -50,6 +51,7 @@ pub struct CreateRuntimeInput { pub struct UpdateRuntimeInput { pub description: Option>, pub name: Option, + pub aliases: Option>, pub distributions: Option, pub installation: Option>, pub execution_config: Option, @@ -113,10 +115,10 @@ impl Create for RuntimeRepository { E: Executor<'e, Database = Postgres> + 'e, { let query = format!( - "INSERT INTO runtime (ref, pack, pack_ref, description, name, \ + "INSERT INTO runtime (ref, pack, pack_ref, description, name, aliases, \ distributions, installation, installers, execution_config, \ auto_detected, detection_config) \ - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) \ + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12) \ RETURNING {}", SELECT_COLUMNS ); @@ -126,6 +128,7 @@ impl Create for RuntimeRepository { .bind(&input.pack_ref) .bind(&input.description) .bind(&input.name) + .bind(&input.aliases) .bind(&input.distributions) .bind(&input.installation) .bind(serde_json::json!({})) @@ -170,6 +173,15 @@ impl Update for RuntimeRepository { has_updates = true; } + if let Some(aliases) = &input.aliases { + if has_updates { + query.push(", "); + } + query.push("aliases = "); + query.push_bind(aliases.as_slice()); + has_updates = true; + } + if let Some(distributions) = &input.distributions { if has_updates { query.push(", "); @@ -286,6 +298,23 @@ impl RuntimeRepository { Ok(runtime) } + /// Find a runtime where the given 
alias appears in its `aliases` array. + /// Uses PostgreSQL's `@>` (array contains) operator with a GIN index. + pub async fn find_by_alias<'e, E>(executor: E, alias: &str) -> Result> + where + E: Executor<'e, Database = Postgres> + 'e, + { + let query = format!( + "SELECT {} FROM runtime WHERE aliases @> ARRAY[$1]::text[] LIMIT 1", + SELECT_COLUMNS + ); + let runtime = sqlx::query_as::<_, Runtime>(&query) + .bind(alias) + .fetch_optional(executor) + .await?; + Ok(runtime) + } + /// Delete runtimes belonging to a pack whose refs are NOT in the given set. /// /// Used during pack reinstallation to clean up runtimes that were removed diff --git a/crates/common/src/runtime_detection.rs b/crates/common/src/runtime_detection.rs index 630e16a..79122e7 100644 --- a/crates/common/src/runtime_detection.rs +++ b/crates/common/src/runtime_detection.rs @@ -6,8 +6,10 @@ //! 2. Config file specification (medium priority) //! 3. Database-driven detection with verification (lowest priority) //! -//! Also provides [`normalize_runtime_name`] for alias-aware runtime name -//! comparison across the codebase (worker filters, env setup, etc.). +//! Also provides alias-based matching functions ([`runtime_aliases_match_filter`] +//! and [`runtime_aliases_contain`]) for comparing runtime alias lists against +//! worker filters and capability strings. Aliases are declared per-runtime in +//! pack manifests, so no hardcoded alias table is needed here. use crate::config::Config; use crate::error::Result; @@ -19,51 +21,26 @@ use std::collections::HashMap; use std::process::Command; use tracing::{debug, info, warn}; -/// Normalize a runtime name to its canonical short form. +/// Check if a runtime's aliases overlap with a filter list. /// -/// This ensures that different ways of referring to the same runtime -/// (e.g., "node", "nodejs", "node.js") all resolve to a single canonical -/// name. 
Used by worker runtime filters and environment setup to match -/// database runtime names against short filter values. +/// The filter list comes from `ATTUNE_WORKER_RUNTIMES` (e.g., `["python", "shell"]`). +/// A runtime matches if any of its declared aliases appear in the filter list. +/// Comparison is case-insensitive. +pub fn runtime_aliases_match_filter(aliases: &[String], filter: &[String]) -> bool { + aliases.iter().any(|alias| { + let lower_alias = alias.to_ascii_lowercase(); + filter.iter().any(|f| f.to_ascii_lowercase() == lower_alias) + }) +} + +/// Check if a runtime's aliases contain a specific name. /// -/// The canonical names mirror the alias groups in -/// `PackComponentLoader::resolve_runtime`. -/// -/// # Examples -/// ``` -/// use attune_common::runtime_detection::normalize_runtime_name; -/// assert_eq!(normalize_runtime_name("node.js"), "node"); -/// assert_eq!(normalize_runtime_name("nodejs"), "node"); -/// assert_eq!(normalize_runtime_name("Python3"), "python"); -/// assert_eq!(normalize_runtime_name("Shell"), "shell"); -/// ``` -pub fn normalize_runtime_name(name: &str) -> String { +/// Used by the scheduler to check if a worker's capability string +/// (e.g., "python") matches a runtime's aliases (e.g., ["python", "python3"]). +/// Comparison is case-insensitive. +pub fn runtime_aliases_contain(aliases: &[String], name: &str) -> bool { let lower = name.to_ascii_lowercase(); - match lower.as_str() { - "node" | "nodejs" | "node.js" => "node".to_string(), - "python" | "python3" => "python".to_string(), - "bash" | "sh" | "shell" => "shell".to_string(), - "native" | "builtin" | "standalone" => "native".to_string(), - "ruby" | "rb" => "ruby".to_string(), - "go" | "golang" => "go".to_string(), - "java" | "jdk" | "openjdk" => "java".to_string(), - "perl" | "perl5" => "perl".to_string(), - "r" | "rscript" => "r".to_string(), - _ => lower, - } -} - -/// Check if a runtime name matches a filter entry, supporting common aliases. 
-/// -/// Both sides are lowercased and then normalized before comparison so that, -/// e.g., a filter value of `"node"` matches a database runtime name `"Node.js"`. -pub fn runtime_matches_filter(rt_name: &str, filter_entry: &str) -> bool { - normalize_runtime_name(rt_name) == normalize_runtime_name(filter_entry) -} - -/// Check if a runtime name matches any entry in a filter list. -pub fn runtime_in_filter(rt_name: &str, filter: &[String]) -> bool { - filter.iter().any(|f| runtime_matches_filter(rt_name, f)) + aliases.iter().any(|a| a.to_ascii_lowercase() == lower) } /// Runtime detection service @@ -335,125 +312,46 @@ mod tests { use serde_json::json; #[test] - fn test_normalize_runtime_name_node_variants() { - assert_eq!(normalize_runtime_name("node"), "node"); - assert_eq!(normalize_runtime_name("nodejs"), "node"); - assert_eq!(normalize_runtime_name("node.js"), "node"); + fn test_runtime_aliases_match_filter() { + let aliases = vec!["python".to_string(), "python3".to_string()]; + let filter = vec!["python".to_string(), "shell".to_string()]; + assert!(runtime_aliases_match_filter(&aliases, &filter)); + + let filter_no_match = vec!["node".to_string(), "ruby".to_string()]; + assert!(!runtime_aliases_match_filter(&aliases, &filter_no_match)); } #[test] - fn test_normalize_runtime_name_python_variants() { - assert_eq!(normalize_runtime_name("python"), "python"); - assert_eq!(normalize_runtime_name("python3"), "python"); + fn test_runtime_aliases_match_filter_case_insensitive() { + let aliases = vec!["Python".to_string(), "python3".to_string()]; + let filter = vec!["python".to_string()]; + assert!(runtime_aliases_match_filter(&aliases, &filter)); } #[test] - fn test_normalize_runtime_name_shell_variants() { - assert_eq!(normalize_runtime_name("shell"), "shell"); - assert_eq!(normalize_runtime_name("bash"), "shell"); - assert_eq!(normalize_runtime_name("sh"), "shell"); + fn test_runtime_aliases_match_filter_empty() { + let aliases: Vec = vec![]; + let filter = 
vec!["python".to_string()]; + assert!(!runtime_aliases_match_filter(&aliases, &filter)); + + let aliases = vec!["python".to_string()]; + let filter: Vec = vec![]; + assert!(!runtime_aliases_match_filter(&aliases, &filter)); } #[test] - fn test_normalize_runtime_name_native_variants() { - assert_eq!(normalize_runtime_name("native"), "native"); - assert_eq!(normalize_runtime_name("builtin"), "native"); - assert_eq!(normalize_runtime_name("standalone"), "native"); + fn test_runtime_aliases_contain() { + let aliases = vec!["ruby".to_string(), "rb".to_string()]; + assert!(runtime_aliases_contain(&aliases, "ruby")); + assert!(runtime_aliases_contain(&aliases, "rb")); + assert!(!runtime_aliases_contain(&aliases, "python")); } #[test] - fn test_normalize_runtime_name_ruby_variants() { - assert_eq!(normalize_runtime_name("ruby"), "ruby"); - assert_eq!(normalize_runtime_name("rb"), "ruby"); - } - - #[test] - fn test_normalize_runtime_name_go_variants() { - assert_eq!(normalize_runtime_name("go"), "go"); - assert_eq!(normalize_runtime_name("golang"), "go"); - } - - #[test] - fn test_normalize_runtime_name_java_variants() { - assert_eq!(normalize_runtime_name("java"), "java"); - assert_eq!(normalize_runtime_name("jdk"), "java"); - assert_eq!(normalize_runtime_name("openjdk"), "java"); - } - - #[test] - fn test_normalize_runtime_name_perl_variants() { - assert_eq!(normalize_runtime_name("perl"), "perl"); - assert_eq!(normalize_runtime_name("perl5"), "perl"); - } - - #[test] - fn test_normalize_runtime_name_r_variants() { - assert_eq!(normalize_runtime_name("r"), "r"); - assert_eq!(normalize_runtime_name("rscript"), "r"); - } - - #[test] - fn test_normalize_runtime_name_passthrough() { - assert_eq!(normalize_runtime_name("custom_runtime"), "custom_runtime"); - } - - #[test] - fn test_normalize_runtime_name_case_insensitive() { - assert_eq!(normalize_runtime_name("Node"), "node"); - assert_eq!(normalize_runtime_name("NodeJS"), "node"); - 
assert_eq!(normalize_runtime_name("Node.js"), "node"); - assert_eq!(normalize_runtime_name("Python"), "python"); - assert_eq!(normalize_runtime_name("Python3"), "python"); - assert_eq!(normalize_runtime_name("Shell"), "shell"); - assert_eq!(normalize_runtime_name("BASH"), "shell"); - assert_eq!(normalize_runtime_name("Ruby"), "ruby"); - assert_eq!(normalize_runtime_name("Go"), "go"); - assert_eq!(normalize_runtime_name("GoLang"), "go"); - assert_eq!(normalize_runtime_name("Java"), "java"); - assert_eq!(normalize_runtime_name("JDK"), "java"); - assert_eq!(normalize_runtime_name("Perl"), "perl"); - assert_eq!(normalize_runtime_name("R"), "r"); - assert_eq!(normalize_runtime_name("Custom_Runtime"), "custom_runtime"); - } - - #[test] - fn test_runtime_matches_filter() { - // Node.js DB name lowercased vs worker filter "node" - assert!(runtime_matches_filter("node.js", "node")); - assert!(runtime_matches_filter("node", "nodejs")); - assert!(runtime_matches_filter("nodejs", "node.js")); - // Exact match - assert!(runtime_matches_filter("shell", "shell")); - // No match - assert!(!runtime_matches_filter("python", "node")); - } - - #[test] - fn test_runtime_matches_filter_case_insensitive() { - // Database stores capitalized names (e.g., "Node.js", "Python") - // Worker capabilities store lowercase (e.g., "node", "python") - assert!(runtime_matches_filter("Node.js", "node")); - assert!(runtime_matches_filter("node", "Node.js")); - assert!(runtime_matches_filter("Python", "python")); - assert!(runtime_matches_filter("python", "Python")); - assert!(runtime_matches_filter("Shell", "shell")); - assert!(runtime_matches_filter("NODEJS", "node")); - assert!(runtime_matches_filter("Ruby", "ruby")); - assert!(runtime_matches_filter("ruby", "rb")); - assert!(runtime_matches_filter("Go", "golang")); - assert!(runtime_matches_filter("R", "rscript")); - assert!(runtime_matches_filter("Java", "jdk")); - assert!(runtime_matches_filter("Perl", "perl5")); - 
assert!(!runtime_matches_filter("Python", "node")); - } - - #[test] - fn test_runtime_in_filter() { - let filter = vec!["shell".to_string(), "node".to_string()]; - assert!(runtime_in_filter("shell", &filter)); - assert!(runtime_in_filter("node.js", &filter)); - assert!(runtime_in_filter("nodejs", &filter)); - assert!(!runtime_in_filter("python", &filter)); + fn test_runtime_aliases_contain_case_insensitive() { + let aliases = vec!["ruby".to_string(), "rb".to_string()]; + assert!(runtime_aliases_contain(&aliases, "Ruby")); + assert!(runtime_aliases_contain(&aliases, "RB")); } #[test] diff --git a/crates/common/tests/helpers.rs b/crates/common/tests/helpers.rs index d6207dd..1483510 100644 --- a/crates/common/tests/helpers.rs +++ b/crates/common/tests/helpers.rs @@ -961,6 +961,7 @@ impl RuntimeFixture { pack_ref: self.pack_ref, description: self.description, name: self.name, + aliases: vec![], distributions: self.distributions, installation: self.installation, execution_config: self.execution_config, diff --git a/crates/common/tests/repository_runtime_tests.rs b/crates/common/tests/repository_runtime_tests.rs index 68758be..cb76c19 100644 --- a/crates/common/tests/repository_runtime_tests.rs +++ b/crates/common/tests/repository_runtime_tests.rs @@ -64,6 +64,7 @@ impl RuntimeFixture { pack_ref: None, description: Some(format!("Test runtime {}", seq)), name, + aliases: vec![], distributions: json!({ "linux": { "supported": true, "versions": ["ubuntu20.04", "ubuntu22.04"] }, "darwin": { "supported": true, "versions": ["12", "13"] } @@ -95,6 +96,7 @@ impl RuntimeFixture { pack_ref: None, description: None, name, + aliases: vec![], distributions: json!({}), installation: None, execution_config: json!({ diff --git a/crates/common/tests/repository_worker_tests.rs b/crates/common/tests/repository_worker_tests.rs index c59a688..744a45e 100644 --- a/crates/common/tests/repository_worker_tests.rs +++ b/crates/common/tests/repository_worker_tests.rs @@ -574,6 +574,7 @@ async fn 
test_worker_with_runtime() { pack_ref: None, description: Some("Test runtime".to_string()), name: "test_runtime".to_string(), + aliases: vec![], distributions: json!({}), installation: None, execution_config: json!({ diff --git a/crates/executor/src/scheduler.rs b/crates/executor/src/scheduler.rs index dbb6923..8cb7ee5 100644 --- a/crates/executor/src/scheduler.rs +++ b/crates/executor/src/scheduler.rs @@ -13,8 +13,11 @@ use anyhow::Result; use attune_common::{ - models::{enums::ExecutionStatus, execution::WorkflowTaskMetadata, Action, Execution}, - mq::{Consumer, ExecutionRequestedPayload, MessageEnvelope, MessageType, Publisher}, + models::{enums::ExecutionStatus, execution::WorkflowTaskMetadata, Action, Execution, Runtime}, + mq::{ + Consumer, ExecutionCompletedPayload, ExecutionRequestedPayload, MessageEnvelope, + MessageType, Publisher, + }, repositories::{ action::ActionRepository, execution::{CreateExecutionInput, ExecutionRepository, UpdateExecutionInput}, @@ -24,7 +27,7 @@ use attune_common::{ }, Create, FindById, FindByRef, Update, }, - runtime_detection::runtime_matches_filter, + runtime_detection::runtime_aliases_contain, workflow::WorkflowDefinition, }; use chrono::Utc; @@ -205,7 +208,23 @@ impl ExecutionScheduler { } // Regular action: select appropriate worker (round-robin among compatible workers) - let worker = Self::select_worker(pool, &action, round_robin_counter).await?; + let worker = match Self::select_worker(pool, &action, round_robin_counter).await { + Ok(worker) => worker, + Err(err) if Self::is_unschedulable_error(&err) => { + Self::fail_unschedulable_execution( + pool, + publisher, + envelope, + execution_id, + action.id, + &action.r#ref, + &err.to_string(), + ) + .await?; + return Ok(()); + } + Err(err) => return Err(err), + }; info!( "Selected worker {} for execution {}", @@ -1561,7 +1580,7 @@ impl ExecutionScheduler { let compatible_workers: Vec<_> = if let Some(ref runtime) = runtime { workers .into_iter() - .filter(|w| 
Self::worker_supports_runtime(w, &runtime.name)) + .filter(|w| Self::worker_supports_runtime(w, runtime)) .collect() } else { workers @@ -1619,20 +1638,26 @@ impl ExecutionScheduler { /// Check if a worker supports a given runtime /// - /// This checks the worker's capabilities.runtimes array for the runtime name. - /// Falls back to checking the deprecated runtime column if capabilities are not set. - fn worker_supports_runtime(worker: &attune_common::models::Worker, runtime_name: &str) -> bool { - // First, try to parse capabilities and check runtimes array + /// This checks the worker's capabilities.runtimes array against the runtime's aliases. + /// If aliases are missing, fall back to the runtime's canonical name. + fn worker_supports_runtime(worker: &attune_common::models::Worker, runtime: &Runtime) -> bool { + let runtime_names = Self::runtime_capability_names(runtime); + + // Try to parse capabilities and check runtimes array if let Some(ref capabilities) = worker.capabilities { if let Some(runtimes) = capabilities.get("runtimes") { if let Some(runtime_array) = runtimes.as_array() { - // Check if any runtime in the array matches (alias-aware) + // Check if any runtime in the array matches via aliases for runtime_value in runtime_array { if let Some(runtime_str) = runtime_value.as_str() { - if runtime_matches_filter(runtime_name, runtime_str) { + if runtime_names + .iter() + .any(|candidate| candidate.eq_ignore_ascii_case(runtime_str)) + || runtime_aliases_contain(&runtime.aliases, runtime_str) + { debug!( - "Worker {} supports runtime '{}' via capabilities (matched '{}')", - worker.name, runtime_name, runtime_str + "Worker {} supports runtime '{}' via capabilities (matched '{}', candidates: {:?})", + worker.name, runtime.name, runtime_str, runtime_names ); return true; } @@ -1642,25 +1667,90 @@ impl ExecutionScheduler { } } - // Fallback: check deprecated runtime column - // This is kept for backward compatibility but should be removed in the future - if 
worker.runtime.is_some() { - debug!( - "Worker {} using deprecated runtime column for matching", - worker.name - ); - // Note: This fallback is incomplete because we'd need to look up the runtime name - // from the ID, which would require an async call. Since we're moving to capabilities, - // we'll just return false here and require workers to set capabilities properly. - } - debug!( - "Worker {} does not support runtime '{}'", - worker.name, runtime_name + "Worker {} does not support runtime '{}' (candidates: {:?})", + worker.name, runtime.name, runtime_names ); false } + fn runtime_capability_names(runtime: &Runtime) -> Vec { + let mut names: Vec = runtime + .aliases + .iter() + .map(|alias| alias.to_ascii_lowercase()) + .filter(|alias| !alias.is_empty()) + .collect(); + + let runtime_name = runtime.name.to_ascii_lowercase(); + if !runtime_name.is_empty() && !names.iter().any(|name| name == &runtime_name) { + names.push(runtime_name); + } + + names + } + + fn is_unschedulable_error(error: &anyhow::Error) -> bool { + let message = error.to_string(); + message.starts_with("No compatible workers found") + || message.starts_with("No action workers available") + || message.starts_with("No active workers available") + || message.starts_with("No workers with fresh heartbeats available") + } + + async fn fail_unschedulable_execution( + pool: &PgPool, + publisher: &Publisher, + envelope: &MessageEnvelope, + execution_id: i64, + action_id: i64, + action_ref: &str, + error_message: &str, + ) -> Result<()> { + let completed_at = Utc::now(); + let result = serde_json::json!({ + "error": "Execution is unschedulable", + "message": error_message, + "action_ref": action_ref, + "failed_by": "execution_scheduler", + "failed_at": completed_at.to_rfc3339(), + }); + + ExecutionRepository::update( + pool, + execution_id, + UpdateExecutionInput { + status: Some(ExecutionStatus::Failed), + result: Some(result.clone()), + ..Default::default() + }, + ) + .await?; + + let completed = 
MessageEnvelope::new( + MessageType::ExecutionCompleted, + ExecutionCompletedPayload { + execution_id, + action_id, + action_ref: action_ref.to_string(), + status: "failed".to_string(), + result: Some(result), + completed_at, + }, + ) + .with_correlation_id(envelope.correlation_id) + .with_source("attune-executor"); + + publisher.publish_envelope(&completed).await?; + + warn!( + "Execution {} marked failed as unschedulable: {}", + execution_id, error_message + ); + + Ok(()) + } + /// Check if a worker's heartbeat is fresh enough to schedule work /// /// A worker is considered fresh if its last heartbeat is within @@ -1826,6 +1916,70 @@ mod tests { // Real tests will require database and message queue setup } + #[test] + fn test_worker_supports_runtime_with_alias_match() { + let worker = create_test_worker("test-worker", 5); + let runtime = Runtime { + id: 1, + r#ref: "core.shell".to_string(), + pack: None, + pack_ref: Some("core".to_string()), + description: Some("Shell runtime".to_string()), + name: "Shell".to_string(), + aliases: vec!["shell".to_string(), "bash".to_string()], + distributions: serde_json::json!({}), + installation: None, + installers: serde_json::json!({}), + execution_config: serde_json::json!({}), + auto_detected: false, + detection_config: serde_json::json!({}), + created: Utc::now(), + updated: Utc::now(), + }; + + assert!(ExecutionScheduler::worker_supports_runtime( + &worker, &runtime + )); + } + + #[test] + fn test_worker_supports_runtime_falls_back_to_runtime_name_when_aliases_missing() { + let worker = create_test_worker("test-worker", 5); + let runtime = Runtime { + id: 1, + r#ref: "core.shell".to_string(), + pack: None, + pack_ref: Some("core".to_string()), + description: Some("Shell runtime".to_string()), + name: "Shell".to_string(), + aliases: vec![], + distributions: serde_json::json!({}), + installation: None, + installers: serde_json::json!({}), + execution_config: serde_json::json!({}), + auto_detected: false, + detection_config: 
serde_json::json!({}), + created: Utc::now(), + updated: Utc::now(), + }; + + assert!(ExecutionScheduler::worker_supports_runtime( + &worker, &runtime + )); + } + + #[test] + fn test_unschedulable_error_classification() { + assert!(ExecutionScheduler::is_unschedulable_error( + &anyhow::anyhow!( + "No compatible workers found for action: core.sleep (requires runtime: Shell)" + ) + )); + assert!(!ExecutionScheduler::is_unschedulable_error( + &anyhow::anyhow!("database temporarily unavailable") + )); + } + #[test] fn test_concurrency_limit_dispatch_count() { // Verify the dispatch_count calculation used by dispatch_with_items_task diff --git a/crates/executor/tests/fifo_ordering_integration_test.rs b/crates/executor/tests/fifo_ordering_integration_test.rs index bd4fb0a..6d36dd0 100644 --- a/crates/executor/tests/fifo_ordering_integration_test.rs +++ b/crates/executor/tests/fifo_ordering_integration_test.rs @@ -72,6 +72,7 @@ async fn _create_test_runtime(pool: &PgPool, suffix: &str) -> i64 { pack_ref: None, description: Some(format!("Test runtime {}", suffix)), name: format!("Python {}", suffix), + aliases: vec![], distributions: json!({"ubuntu": "python3"}), installation: Some(json!({"method": "apt"})), execution_config: json!({ diff --git a/crates/executor/tests/policy_enforcer_tests.rs b/crates/executor/tests/policy_enforcer_tests.rs index fcafcc8..8fcfdfd 100644 --- a/crates/executor/tests/policy_enforcer_tests.rs +++ b/crates/executor/tests/policy_enforcer_tests.rs @@ -67,6 +67,7 @@ async fn create_test_runtime(pool: &PgPool, suffix: &str) -> i64 { pack_ref: None, description: Some(format!("Test runtime {}", suffix)), name: format!("Python {}", suffix), + aliases: vec![], distributions: json!({"ubuntu": "python3"}), installation: Some(json!({"method": "apt"})), execution_config: json!({ diff --git a/crates/sensor/Cargo.toml b/crates/sensor/Cargo.toml index 37f7d02..d944d89 100644 --- a/crates/sensor/Cargo.toml +++ b/crates/sensor/Cargo.toml @@ -14,6 +14,10 @@ path 
= "src/lib.rs" name = "attune-sensor" path = "src/main.rs" +[[bin]] +name = "attune-sensor-agent" +path = "src/agent_main.rs" + [dependencies] attune-common = { path = "../common" } tokio = { workspace = true } diff --git a/crates/sensor/src/agent_main.rs b/crates/sensor/src/agent_main.rs new file mode 100644 index 0000000..7f05447 --- /dev/null +++ b/crates/sensor/src/agent_main.rs @@ -0,0 +1,79 @@ +//! Attune Universal Sensor Agent. + +use anyhow::Result; +use attune_common::agent_bootstrap::{bootstrap_runtime_env, print_detect_only_report}; +use attune_common::config::Config; +use attune_sensor::startup::{ + apply_sensor_name_override, init_tracing, log_config_details, run_sensor_service, + set_config_path, +}; +use clap::Parser; +use tracing::info; + +#[derive(Parser, Debug)] +#[command(name = "attune-sensor-agent")] +#[command( + version, + about = "Attune Universal Sensor Agent - Injected into runtime containers to auto-detect sensor runtimes" +)] +struct Args { + /// Path to configuration file (optional) + #[arg(short, long)] + config: Option, + + /// Sensor worker name override + #[arg(short, long)] + name: Option, + + /// Run runtime detection, print results, and exit + #[arg(long)] + detect_only: bool, +} + +fn main() -> Result<()> { + attune_common::auth::install_crypto_provider(); + init_tracing(tracing::Level::INFO); + + let args = Args::parse(); + + info!("Starting Attune Universal Sensor Agent"); + info!( + "Agent binary: attune-sensor-agent {}", + env!("CARGO_PKG_VERSION") + ); + + // Safe: no async runtime or worker threads are running yet. 
+ std::env::set_var("ATTUNE_SENSOR_AGENT_MODE", "true"); + std::env::set_var("ATTUNE_SENSOR_AGENT_BINARY_NAME", "attune-sensor-agent"); + std::env::set_var( + "ATTUNE_SENSOR_AGENT_BINARY_VERSION", + env!("CARGO_PKG_VERSION"), + ); + + let bootstrap = bootstrap_runtime_env("ATTUNE_SENSOR_RUNTIMES"); + + if args.detect_only { + print_detect_only_report("ATTUNE_SENSOR_RUNTIMES", &bootstrap); + return Ok(()); + } + + set_config_path(args.config.as_deref()); + + let runtime = tokio::runtime::Runtime::new()?; + runtime.block_on(async_main(args)) +} + +async fn async_main(args: Args) -> Result<()> { + let mut config = Config::load()?; + config.validate()?; + + if let Some(name) = args.name { + apply_sensor_name_override(&mut config, name); + } + + log_config_details(&config); + run_sensor_service(config, "Attune Sensor Agent is ready").await?; + info!("Attune Sensor Agent shutdown complete"); + + Ok(()) +} diff --git a/crates/sensor/src/lib.rs b/crates/sensor/src/lib.rs index 1dbd5d3..7ce89b8 100644 --- a/crates/sensor/src/lib.rs +++ b/crates/sensor/src/lib.rs @@ -8,6 +8,7 @@ pub mod rule_lifecycle_listener; pub mod sensor_manager; pub mod sensor_worker_registration; pub mod service; +pub mod startup; // Re-export template resolver from common crate pub mod template_resolver { diff --git a/crates/sensor/src/main.rs b/crates/sensor/src/main.rs index 2cf4bf3..f460c1c 100644 --- a/crates/sensor/src/main.rs +++ b/crates/sensor/src/main.rs @@ -1,15 +1,14 @@ //! Attune Sensor Service //! //! The Sensor Service monitors for trigger conditions and generates events. -//! It executes custom sensor code, manages sensor lifecycle, and publishes -//! events to the message queue for rule matching and enforcement creation. 
use anyhow::Result; use attune_common::config::Config; -use attune_sensor::service::SensorService; +use attune_sensor::startup::{ + init_tracing, log_config_details, run_sensor_service, set_config_path, +}; use clap::Parser; -use tokio::signal::unix::{signal, SignalKind}; -use tracing::{error, info}; +use tracing::info; #[derive(Parser, Debug)] #[command(name = "attune-sensor")] @@ -26,114 +25,23 @@ struct Args { #[tokio::main] async fn main() -> Result<()> { - // Install HMAC-only JWT crypto provider (must be before any token operations) attune_common::auth::install_crypto_provider(); let args = Args::parse(); - - // Initialize tracing with specified log level let log_level = args.log_level.parse().unwrap_or(tracing::Level::INFO); - tracing_subscriber::fmt() - .with_max_level(log_level) - .with_target(false) - .with_thread_ids(true) - .with_file(true) - .with_line_number(true) - .init(); + init_tracing(log_level); info!("Starting Attune Sensor Service"); info!("Version: {}", env!("CARGO_PKG_VERSION")); - // Load configuration - if let Some(config_path) = args.config { - info!("Loading configuration from: {}", config_path); - std::env::set_var("ATTUNE_CONFIG", config_path); - } + set_config_path(args.config.as_deref()); let config = Config::load()?; config.validate()?; - info!("Configuration loaded successfully"); - info!("Environment: {}", config.environment); - info!("Database: {}", mask_connection_string(&config.database.url)); - if let Some(ref mq_config) = config.message_queue { - info!("Message Queue: {}", mask_connection_string(&mq_config.url)); - } - - // Create and start sensor service - let service = SensorService::new(config).await?; - - info!("Sensor Service initialized successfully"); - - // Start the service (spawns background tasks and returns) - info!("Starting Sensor Service components..."); - service.start().await?; - - info!("Attune Sensor Service is ready"); - - // Setup signal handlers for graceful shutdown - let mut sigint = 
signal(SignalKind::interrupt())?; - let mut sigterm = signal(SignalKind::terminate())?; - - tokio::select! { - _ = sigint.recv() => { - info!("Received SIGINT signal"); - } - _ = sigterm.recv() => { - info!("Received SIGTERM signal"); - } - } - - info!("Shutting down gracefully..."); - - // Stop the service: deregister worker, stop sensors, clean up connections - if let Err(e) = service.stop().await { - error!("Error during shutdown: {}", e); - } - + log_config_details(&config); + run_sensor_service(config, "Attune Sensor Service is ready").await?; info!("Attune Sensor Service shutdown complete"); Ok(()) } - -/// Mask sensitive parts of connection strings for logging -fn mask_connection_string(url: &str) -> String { - if let Some(at_pos) = url.find('@') { - if let Some(proto_end) = url.find("://") { - let protocol = &url[..proto_end + 3]; - let host_and_path = &url[at_pos..]; - return format!("{}***:***{}", protocol, host_and_path); - } - } - "***:***@***".to_string() -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_mask_connection_string() { - let url = "postgresql://user:password@localhost:5432/attune"; - let masked = mask_connection_string(url); - assert!(!masked.contains("user")); - assert!(!masked.contains("password")); - assert!(masked.contains("@localhost")); - } - - #[test] - fn test_mask_connection_string_no_credentials() { - let url = "postgresql://localhost:5432/attune"; - let masked = mask_connection_string(url); - assert_eq!(masked, "***:***@***"); - } - - #[test] - fn test_mask_rabbitmq_connection() { - let url = "amqp://admin:secret@rabbitmq:5672/%2F"; - let masked = mask_connection_string(url); - assert!(!masked.contains("admin")); - assert!(!masked.contains("secret")); - assert!(masked.contains("@rabbitmq")); - } -} diff --git a/crates/sensor/src/sensor_manager.rs b/crates/sensor/src/sensor_manager.rs index 38a031b..8ab5c2f 100644 --- a/crates/sensor/src/sensor_manager.rs +++ b/crates/sensor/src/sensor_manager.rs @@ -11,7 +11,7 
@@ //! - Monitoring sensor health and restarting failed sensors use anyhow::{anyhow, Result}; -use attune_common::models::{Id, Sensor, Trigger}; +use attune_common::models::{runtime::RuntimeExecutionConfig, Id, Sensor, Trigger}; use attune_common::repositories::{FindById, List, RuntimeRepository}; use sqlx::{PgPool, Row}; @@ -162,6 +162,127 @@ impl SensorManager { Ok(enabled_sensors) } + async fn ensure_runtime_environment( + &self, + exec_config: &RuntimeExecutionConfig, + pack_dir: &std::path::Path, + env_dir: &std::path::Path, + ) -> Result<()> { + let env_cfg = match &exec_config.environment { + Some(cfg) if cfg.env_type != "none" => cfg, + _ => return Ok(()), + }; + + let vars = exec_config.build_template_vars_with_env(pack_dir, Some(env_dir)); + + if !env_dir.exists() { + if env_cfg.create_command.is_empty() { + return Err(anyhow!( + "Runtime environment '{}' requires create_command but none is configured", + env_cfg.env_type + )); + } + + if let Some(parent) = env_dir.parent() { + tokio::fs::create_dir_all(parent).await.map_err(|e| { + anyhow!( + "Failed to create runtime environment parent directory {}: {}", + parent.display(), + e + ) + })?; + } + + let resolved_cmd = + RuntimeExecutionConfig::resolve_command(&env_cfg.create_command, &vars); + let (program, args) = resolved_cmd + .split_first() + .ok_or_else(|| anyhow!("Empty create_command for runtime environment"))?; + + info!( + "Creating sensor runtime environment at {}: {:?}", + env_dir.display(), + resolved_cmd + ); + + let output = Command::new(program) + .args(args) + .current_dir(pack_dir) + .output() + .await + .map_err(|e| anyhow!("Failed to run create command '{}': {}", program, e))?; + + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr); + return Err(anyhow!( + "Runtime environment creation failed (exit {}): {}", + output.status.code().unwrap_or(-1), + stderr.trim() + )); + } + } + + let dep_cfg = match &exec_config.dependencies { + Some(cfg) => cfg, + None 
=> return Ok(()), + }; + + let manifest_path = pack_dir.join(&dep_cfg.manifest_file); + if !manifest_path.exists() || dep_cfg.install_command.is_empty() { + return Ok(()); + } + + let install_marker = env_dir.join(".attune_sensor_deps_installed"); + if install_marker.exists() { + return Ok(()); + } + + let resolved_cmd = RuntimeExecutionConfig::resolve_command(&dep_cfg.install_command, &vars); + let (program, args) = resolved_cmd + .split_first() + .ok_or_else(|| anyhow!("Empty install_command for runtime dependencies"))?; + + info!( + "Installing sensor runtime dependencies for {} using {:?}", + pack_dir.display(), + resolved_cmd + ); + + let output = Command::new(program) + .args(args) + .current_dir(pack_dir) + .output() + .await + .map_err(|e| { + anyhow!( + "Failed to run dependency install command '{}': {}", + program, + e + ) + })?; + + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr); + return Err(anyhow!( + "Runtime dependency installation failed (exit {}): {}", + output.status.code().unwrap_or(-1), + stderr.trim() + )); + } + + tokio::fs::write(&install_marker, b"ok") + .await + .map_err(|e| { + anyhow!( + "Failed to write dependency install marker {}: {}", + install_marker.display(), + e + ) + })?; + + Ok(()) + } + /// Start a sensor instance async fn start_sensor(&self, sensor: Sensor) -> Result<()> { info!("Starting sensor {} ({})", sensor.r#ref, sensor.id); @@ -231,6 +352,12 @@ impl SensorManager { let exec_config = runtime.parsed_execution_config(); let rt_name = runtime.name.to_lowercase(); + let runtime_env_suffix = runtime + .r#ref + .rsplit('.') + .next() + .filter(|suffix| !suffix.is_empty()) + .unwrap_or(&rt_name); info!( "Sensor {} runtime details: id={}, ref='{}', name='{}', execution_config={}", @@ -242,7 +369,19 @@ impl SensorManager { let pack_dir = std::path::PathBuf::from(&self.inner.packs_base_dir).join(pack_ref); let env_dir = std::path::PathBuf::from(&self.inner.runtime_envs_dir) .join(pack_ref) - 
.join(&rt_name); + .join(runtime_env_suffix); + if let Err(e) = self + .ensure_runtime_environment(&exec_config, &pack_dir, &env_dir) + .await + { + warn!( + "Failed to ensure sensor runtime environment for {} at {}: {}", + sensor.r#ref, + env_dir.display(), + e + ); + } + let env_dir_opt = if env_dir.exists() { Some(env_dir.as_path()) } else { @@ -354,15 +493,31 @@ impl SensorManager { // Start the standalone sensor with token and configuration // Pass sensor ref (e.g., "core.interval_timer_sensor") for proper identification - let mut child = cmd - .env("ATTUNE_API_URL", &self.inner.api_url) + cmd.env("ATTUNE_API_URL", &self.inner.api_url) .env("ATTUNE_API_TOKEN", &token_response.token) .env("ATTUNE_SENSOR_ID", sensor.id.to_string()) .env("ATTUNE_SENSOR_REF", &sensor.r#ref) .env("ATTUNE_SENSOR_TRIGGERS", &trigger_instances_json) .env("ATTUNE_MQ_URL", &self.inner.mq_url) .env("ATTUNE_MQ_EXCHANGE", "attune.events") - .env("ATTUNE_LOG_LEVEL", "info") + .env("ATTUNE_LOG_LEVEL", "info"); + + if !exec_config.env_vars.is_empty() { + let vars = exec_config.build_template_vars_with_env(&pack_dir, env_dir_opt); + for (key, value_template) in &exec_config.env_vars { + let resolved = attune_common::models::RuntimeExecutionConfig::resolve_template( + value_template, + &vars, + ); + debug!( + "Setting sensor runtime env var: {}={} (template: {})", + key, resolved, value_template + ); + cmd.env(key, resolved); + } + } + + let mut child = cmd .stdin(Stdio::null()) .stdout(Stdio::piped()) .stderr(Stdio::piped()) @@ -371,13 +526,14 @@ impl SensorManager { anyhow!( "Failed to start sensor process for '{}': {} \ (binary='{}', is_native={}, runtime_ref='{}', \ - interpreter_config='{}')", + interpreter_config='{}', env_dir='{}')", sensor.r#ref, e, spawn_binary, is_native, runtime.r#ref, - interpreter_binary + interpreter_binary, + env_dir.display() ) })?; diff --git a/crates/sensor/src/sensor_worker_registration.rs b/crates/sensor/src/sensor_worker_registration.rs index 
35e44ca..065ceed 100644 --- a/crates/sensor/src/sensor_worker_registration.rs +++ b/crates/sensor/src/sensor_worker_registration.rs @@ -15,6 +15,10 @@ use sqlx::{PgPool, Row}; use std::collections::HashMap; use tracing::{debug, info}; +const ATTUNE_SENSOR_AGENT_MODE_ENV: &str = "ATTUNE_SENSOR_AGENT_MODE"; +const ATTUNE_SENSOR_AGENT_BINARY_NAME_ENV: &str = "ATTUNE_SENSOR_AGENT_BINARY_NAME"; +const ATTUNE_SENSOR_AGENT_BINARY_VERSION_ENV: &str = "ATTUNE_SENSOR_AGENT_BINARY_VERSION"; + /// Sensor worker registration manager pub struct SensorWorkerRegistration { pool: PgPool, @@ -25,6 +29,33 @@ pub struct SensorWorkerRegistration { } impl SensorWorkerRegistration { + fn env_truthy(name: &str) -> bool { + std::env::var(name) + .ok() + .map(|value| matches!(value.trim().to_ascii_lowercase().as_str(), "1" | "true")) + .unwrap_or(false) + } + + fn inject_agent_capabilities(capabilities: &mut HashMap) { + if Self::env_truthy(ATTUNE_SENSOR_AGENT_MODE_ENV) { + capabilities.insert("agent_mode".to_string(), json!(true)); + } + + if let Ok(binary_name) = std::env::var(ATTUNE_SENSOR_AGENT_BINARY_NAME_ENV) { + let binary_name = binary_name.trim(); + if !binary_name.is_empty() { + capabilities.insert("agent_binary_name".to_string(), json!(binary_name)); + } + } + + if let Ok(binary_version) = std::env::var(ATTUNE_SENSOR_AGENT_BINARY_VERSION_ENV) { + let binary_version = binary_version.trim(); + if !binary_version.is_empty() { + capabilities.insert("agent_binary_version".to_string(), json!(binary_version)); + } + } + } + /// Create a new sensor worker registration manager pub fn new(pool: PgPool, config: &Config) -> Self { let worker_name = config @@ -67,6 +98,8 @@ impl SensorWorkerRegistration { json!(env!("CARGO_PKG_VERSION")), ); + Self::inject_agent_capabilities(&mut capabilities); + // Placeholder for runtimes (will be detected asynchronously) capabilities.insert("runtimes".to_string(), json!(Vec::::new())); @@ -351,4 +384,28 @@ mod tests { 
registration.deregister().await.unwrap(); } + + #[test] + fn test_inject_agent_capabilities_from_env() { + std::env::set_var(ATTUNE_SENSOR_AGENT_MODE_ENV, "1"); + std::env::set_var(ATTUNE_SENSOR_AGENT_BINARY_NAME_ENV, "attune-sensor-agent"); + std::env::set_var(ATTUNE_SENSOR_AGENT_BINARY_VERSION_ENV, "1.2.3"); + + let mut capabilities = HashMap::new(); + SensorWorkerRegistration::inject_agent_capabilities(&mut capabilities); + + assert_eq!(capabilities.get("agent_mode"), Some(&json!(true))); + assert_eq!( + capabilities.get("agent_binary_name"), + Some(&json!("attune-sensor-agent")) + ); + assert_eq!( + capabilities.get("agent_binary_version"), + Some(&json!("1.2.3")) + ); + + std::env::remove_var(ATTUNE_SENSOR_AGENT_MODE_ENV); + std::env::remove_var(ATTUNE_SENSOR_AGENT_BINARY_NAME_ENV); + std::env::remove_var(ATTUNE_SENSOR_AGENT_BINARY_VERSION_ENV); + } } diff --git a/crates/sensor/src/startup.rs b/crates/sensor/src/startup.rs new file mode 100644 index 0000000..89b3634 --- /dev/null +++ b/crates/sensor/src/startup.rs @@ -0,0 +1,119 @@ +use crate::service::SensorService; +use anyhow::Result; +use attune_common::config::{Config, SensorConfig}; +use tokio::signal::unix::{signal, SignalKind}; +use tracing::{error, info}; + +pub fn init_tracing(log_level: tracing::Level) { + tracing_subscriber::fmt() + .with_max_level(log_level) + .with_target(false) + .with_thread_ids(true) + .with_file(true) + .with_line_number(true) + .init(); +} + +pub fn set_config_path(config_path: Option<&str>) { + if let Some(config_path) = config_path { + info!("Loading configuration from: {}", config_path); + std::env::set_var("ATTUNE_CONFIG", config_path); + } +} + +pub fn apply_sensor_name_override(config: &mut Config, name: String) { + if let Some(ref mut sensor_config) = config.sensor { + sensor_config.worker_name = Some(name); + } else { + config.sensor = Some(SensorConfig { + worker_name: Some(name), + host: None, + capabilities: None, + max_concurrent_sensors: None, + 
heartbeat_interval: 30, + poll_interval: 30, + sensor_timeout: 30, + shutdown_timeout: 30, + }); + } +} + +pub fn log_config_details(config: &Config) { + info!("Configuration loaded successfully"); + info!("Environment: {}", config.environment); + info!("Database: {}", mask_connection_string(&config.database.url)); + if let Some(ref mq_config) = config.message_queue { + info!("Message Queue: {}", mask_connection_string(&mq_config.url)); + } +} + +pub async fn run_sensor_service(config: Config, ready_message: &str) -> Result<()> { + let service = SensorService::new(config).await?; + + info!("Sensor Service initialized successfully"); + info!("Starting Sensor Service components..."); + service.start().await?; + info!("{}", ready_message); + + let mut sigint = signal(SignalKind::interrupt())?; + let mut sigterm = signal(SignalKind::terminate())?; + + tokio::select! { + _ = sigint.recv() => { + info!("Received SIGINT signal"); + } + _ = sigterm.recv() => { + info!("Received SIGTERM signal"); + } + } + + info!("Shutting down gracefully..."); + + if let Err(e) = service.stop().await { + error!("Error during shutdown: {}", e); + } + + Ok(()) +} + +/// Mask sensitive parts of connection strings for logging. 
+pub fn mask_connection_string(url: &str) -> String { + if let Some(at_pos) = url.find('@') { + if let Some(proto_end) = url.find("://") { + let protocol = &url[..proto_end + 3]; + let host_and_path = &url[at_pos..]; + return format!("{}***:***{}", protocol, host_and_path); + } + } + "***:***@***".to_string() +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_mask_connection_string() { + let url = "postgresql://user:password@localhost:5432/attune"; + let masked = mask_connection_string(url); + assert!(!masked.contains("user")); + assert!(!masked.contains("password")); + assert!(masked.contains("@localhost")); + } + + #[test] + fn test_mask_connection_string_no_credentials() { + let url = "postgresql://localhost:5432/attune"; + let masked = mask_connection_string(url); + assert_eq!(masked, "***:***@***"); + } + + #[test] + fn test_mask_rabbitmq_connection() { + let url = "amqp://admin:secret@rabbitmq:5672/%2F"; + let masked = mask_connection_string(url); + assert!(!masked.contains("admin")); + assert!(!masked.contains("secret")); + assert!(masked.contains("@rabbitmq")); + } +} diff --git a/crates/worker/src/agent_main.rs b/crates/worker/src/agent_main.rs index fd0d63c..f448d1c 100644 --- a/crates/worker/src/agent_main.rs +++ b/crates/worker/src/agent_main.rs @@ -28,18 +28,20 @@ //! 
- `--detect-only` — Run runtime detection, print results, and exit use anyhow::Result; +use attune_common::agent_bootstrap::{bootstrap_runtime_env, print_detect_only_report}; use attune_common::config::Config; use clap::Parser; use tokio::signal::unix::{signal, SignalKind}; use tracing::{info, warn}; use attune_worker::dynamic_runtime::auto_register_detected_runtimes; -use attune_worker::runtime_detect::{detect_runtimes, print_detection_report}; +use attune_worker::runtime_detect::DetectedRuntime; use attune_worker::service::WorkerService; #[derive(Parser, Debug)] #[command(name = "attune-agent")] #[command( + version, about = "Attune Universal Worker Agent - Injected into any container to auto-detect and execute actions", long_about = "The Attune Agent automatically discovers available runtime interpreters \ in the current environment and registers as a worker capable of executing \ @@ -73,119 +75,19 @@ fn main() -> Result<()> { let args = Args::parse(); info!("Starting Attune Universal Worker Agent"); + info!("Agent binary: attune-agent {}", env!("CARGO_PKG_VERSION")); - // --- Phase 1: Runtime auto-detection (synchronous, before tokio runtime) --- - // - // All std::env::set_var calls MUST happen here, before we create the tokio - // runtime, to avoid undefined behavior from mutating the process environment - // while other threads are running. - // - // Check if the user has explicitly set ATTUNE_WORKER_RUNTIMES. If so, skip - // auto-detection and respect their override. Otherwise, probe the system for - // available interpreters. - let runtimes_override = std::env::var("ATTUNE_WORKER_RUNTIMES").ok(); + // Safe: no async runtime or worker threads are running yet. + std::env::set_var("ATTUNE_AGENT_MODE", "true"); + std::env::set_var("ATTUNE_AGENT_BINARY_NAME", "attune-agent"); + std::env::set_var("ATTUNE_AGENT_BINARY_VERSION", env!("CARGO_PKG_VERSION")); - // Holds the detected runtimes so we can pass them to WorkerService later. 
- // Populated in both branches: auto-detection and override (filtered to - // match the override list). - let mut agent_detected_runtimes: Option> = - None; - - if let Some(ref override_value) = runtimes_override { - info!( - "ATTUNE_WORKER_RUNTIMES already set (override): {}", - override_value - ); - - // Even with an explicit override, run detection so we can register - // the overridden runtimes in the database and advertise accurate - // capability metadata (binary paths, versions). Without this, the - // worker would accept work for runtimes that were never registered - // locally — e.g. ruby/go on a fresh deployment. - info!("Running auto-detection for override-specified runtimes..."); - let detected = detect_runtimes(); - - // Filter detected runtimes to only those matching the override list, - // so we don't register runtimes the user explicitly excluded. - let override_names: Vec<&str> = override_value.split(',').map(|s| s.trim()).collect(); - let filtered: Vec<_> = detected - .into_iter() - .filter(|rt| { - let normalized = attune_common::runtime_detection::normalize_runtime_name(&rt.name); - override_names.iter().any(|ov| { - attune_common::runtime_detection::normalize_runtime_name(ov) == normalized - }) - }) - .collect(); - - if filtered.is_empty() { - warn!( - "None of the override runtimes ({}) were found on this system! \ - The agent may not be able to execute any actions.", - override_value - ); - } else { - info!( - "Matched {} override runtime(s) to detected interpreters:", - filtered.len() - ); - for rt in &filtered { - match &rt.version { - Some(ver) => info!(" ✓ {} — {} ({})", rt.name, rt.path, ver), - None => info!(" ✓ {} — {}", rt.name, rt.path), - } - } - agent_detected_runtimes = Some(filtered); - } - } else { - info!("No ATTUNE_WORKER_RUNTIMES override — running auto-detection..."); - - let detected = detect_runtimes(); - - if detected.is_empty() { - warn!("No runtimes detected! 
The agent may not be able to execute any actions."); - } else { - info!("Detected {} runtime(s):", detected.len()); - for rt in &detected { - match &rt.version { - Some(ver) => info!(" ✓ {} — {} ({})", rt.name, rt.path, ver), - None => info!(" ✓ {} — {}", rt.name, rt.path), - } - } - - // Build comma-separated runtime list and set the env var so that - // Config::load() and WorkerService pick it up downstream. - let runtime_list: Vec<&str> = detected.iter().map(|r| r.name.as_str()).collect(); - let runtime_csv = runtime_list.join(","); - info!("Setting ATTUNE_WORKER_RUNTIMES={}", runtime_csv); - // Safe: no other threads are running yet (tokio runtime not started). - std::env::set_var("ATTUNE_WORKER_RUNTIMES", &runtime_csv); - - // Stash for Phase 2: pass to WorkerService for rich capability registration - agent_detected_runtimes = Some(detected); - } - } + let bootstrap = bootstrap_runtime_env("ATTUNE_WORKER_RUNTIMES"); + let agent_detected_runtimes = bootstrap.detected_runtimes.clone(); // --- Handle --detect-only (synchronous, no async runtime needed) --- if args.detect_only { - if runtimes_override.is_some() { - // User set an override, but --detect-only should show what's actually - // on this system regardless, so re-run detection. - info!( - "--detect-only: re-running detection to show what is available on this system..." 
- ); - println!("NOTE: ATTUNE_WORKER_RUNTIMES is set — auto-detection was skipped during normal startup."); - println!(" Showing what auto-detection would find on this system:"); - println!(); - let detected = detect_runtimes(); - print_detection_report(&detected); - } else if let Some(ref detected) = agent_detected_runtimes { - print_detection_report(detected); - } else { - // No detection ran (empty results), run it fresh - let detected = detect_runtimes(); - print_detection_report(&detected); - } + print_detect_only_report("ATTUNE_WORKER_RUNTIMES", &bootstrap); return Ok(()); } @@ -204,7 +106,7 @@ fn main() -> Result<()> { /// `runtime.block_on()` after all environment variable mutations are complete. async fn async_main( args: Args, - agent_detected_runtimes: Option>, + agent_detected_runtimes: Option>, ) -> Result<()> { // --- Phase 2: Load configuration --- let mut config = Config::load()?; diff --git a/crates/worker/src/dynamic_runtime.rs b/crates/worker/src/dynamic_runtime.rs index 60d5474..d40d6e9 100644 --- a/crates/worker/src/dynamic_runtime.rs +++ b/crates/worker/src/dynamic_runtime.rs @@ -9,8 +9,7 @@ //! //! For each detected runtime the agent found: //! -//! 1. **Look up by name** in the database using alias-aware matching -//! (via [`normalize_runtime_name`]). +//! 1. **Look up by name** in the database using alias-aware matching. //! 2. **If found** → already registered (either from a pack YAML or a previous //! agent run). Nothing to do. //! 3. 
**If not found** → search for a runtime *template* in loaded packs whose @@ -29,7 +28,7 @@ use attune_common::error::Result; use attune_common::models::runtime::Runtime; use attune_common::repositories::runtime::{CreateRuntimeInput, RuntimeRepository}; use attune_common::repositories::{Create, FindByRef, List}; -use attune_common::runtime_detection::normalize_runtime_name; + use serde_json::json; use sqlx::PgPool; use tracing::{debug, info, warn}; @@ -80,14 +79,17 @@ pub async fn auto_register_detected_runtimes( let mut registered_count = 0; for detected_rt in detected { - let canonical_name = normalize_runtime_name(&detected_rt.name); + let canonical_name = detected_rt.name.to_ascii_lowercase(); // Check if a runtime with a matching name already exists in the DB. - // We normalize both sides for alias-aware comparison. - // normalize_runtime_name lowercases internally, so no need to pre-lowercase. - let already_exists = existing_runtimes - .iter() - .any(|r| normalize_runtime_name(&r.name) == canonical_name); + // Primary: check if the detected name appears in any existing runtime's aliases. + // Secondary: check if the ref ends with the canonical name (e.g., "core.ruby"). 
+ let already_exists = existing_runtimes.iter().any(|r| { + // Primary: check if the detected name is in this runtime's aliases + r.aliases.iter().any(|a| a == &canonical_name) + // Secondary: check if the ref ends with the canonical name (e.g., "core.ruby") + || r.r#ref.ends_with(&format!(".{}", canonical_name)) + }); if already_exists { debug!( @@ -143,6 +145,7 @@ pub async fn auto_register_detected_runtimes( detected_rt.name, tmpl.r#ref )), name: tmpl.name.clone(), + aliases: tmpl.aliases.clone(), distributions: tmpl.distributions.clone(), installation: tmpl.installation.clone(), execution_config: build_execution_config_from_template(&tmpl, detected_rt), @@ -195,6 +198,7 @@ pub async fn auto_register_detected_runtimes( detected_rt.name, detected_rt.path )), name: capitalize_runtime_name(&canonical_name), + aliases: default_aliases(&canonical_name), distributions: build_minimal_distributions(detected_rt), installation: None, execution_config, @@ -285,7 +289,7 @@ fn build_execution_config_from_template( /// This provides enough information for `ProcessRuntime` to invoke the /// interpreter directly, without environment or dependency management. fn build_minimal_execution_config(detected: &DetectedRuntime) -> serde_json::Value { - let canonical = normalize_runtime_name(&detected.name); + let canonical = detected.name.to_ascii_lowercase(); let file_ext = default_file_extension(&canonical); let mut config = json!({ @@ -319,6 +323,23 @@ fn build_minimal_distributions(detected: &DetectedRuntime) -> serde_json::Value }) } +/// Default aliases for auto-detected runtimes that have no template. +/// These match what the core pack YAMLs declare but serve as fallback +/// when the template hasn't been loaded. 
+fn default_aliases(canonical_name: &str) -> Vec { + match canonical_name { + "shell" => vec!["shell".into(), "bash".into(), "sh".into()], + "python" => vec!["python".into(), "python3".into()], + "node" => vec!["node".into(), "nodejs".into(), "node.js".into()], + "ruby" => vec!["ruby".into(), "rb".into()], + "go" => vec!["go".into(), "golang".into()], + "java" => vec!["java".into(), "jdk".into(), "openjdk".into()], + "perl" => vec!["perl".into(), "perl5".into()], + "r" => vec!["r".into(), "rscript".into()], + _ => vec![canonical_name.to_string()], + } +} + /// Capitalize a runtime name for display (e.g., "ruby" → "Ruby", "r" → "R"). fn capitalize_runtime_name(name: &str) -> String { let mut chars = name.chars(); @@ -437,6 +458,7 @@ mod tests { pack_ref: Some("core".to_string()), description: Some("Ruby Runtime".to_string()), name: "Ruby".to_string(), + aliases: vec!["ruby".to_string(), "rb".to_string()], distributions: json!({}), installation: None, installers: json!({}), @@ -480,6 +502,7 @@ mod tests { pack_ref: Some("core".to_string()), description: None, name: "Python".to_string(), + aliases: vec!["python".to_string(), "python3".to_string()], distributions: json!({}), installation: None, installers: json!({}), diff --git a/crates/worker/src/env_setup.rs b/crates/worker/src/env_setup.rs index 53199e7..14a29f8 100644 --- a/crates/worker/src/env_setup.rs +++ b/crates/worker/src/env_setup.rs @@ -35,7 +35,7 @@ use attune_common::repositories::pack::PackRepository; use attune_common::repositories::runtime::RuntimeRepository; use attune_common::repositories::runtime_version::RuntimeVersionRepository; use attune_common::repositories::{FindById, List}; -use attune_common::runtime_detection::runtime_in_filter; +use attune_common::runtime_detection::runtime_aliases_match_filter; // Re-export the utility that the API also uses so callers can reach it from // either crate without adding a direct common dependency for this one function. 
@@ -207,7 +207,7 @@ pub async fn setup_environments_for_registered_pack( .iter() .filter(|name| { if let Some(filter) = runtime_filter { - runtime_in_filter(name, filter) + runtime_aliases_match_filter(&[name.to_string()], filter) } else { true } @@ -463,12 +463,12 @@ async fn process_runtime_for_pack( runtime_envs_dir: &Path, pack_result: &mut PackEnvSetupResult, ) { - // Apply worker runtime filter (alias-aware matching) + // Apply worker runtime filter (alias-aware matching via declared aliases) if let Some(filter) = runtime_filter { - if !runtime_in_filter(rt_name, filter) { + if !runtime_aliases_match_filter(&rt.aliases, filter) { debug!( - "Runtime '{}' not in worker filter, skipping for pack '{}'", - rt_name, pack_ref, + "Runtime '{}' not in worker filter (aliases: {:?}), skipping for pack '{}'", + rt_name, rt.aliases, pack_ref, ); return; } diff --git a/crates/worker/src/registration.rs b/crates/worker/src/registration.rs index 6456cbc..5bdf9fd 100644 --- a/crates/worker/src/registration.rs +++ b/crates/worker/src/registration.rs @@ -15,6 +15,10 @@ use tracing::{info, warn}; use crate::runtime_detect::DetectedRuntime; +const ATTUNE_AGENT_MODE_ENV: &str = "ATTUNE_AGENT_MODE"; +const ATTUNE_AGENT_BINARY_NAME_ENV: &str = "ATTUNE_AGENT_BINARY_NAME"; +const ATTUNE_AGENT_BINARY_VERSION_ENV: &str = "ATTUNE_AGENT_BINARY_VERSION"; + /// Worker registration manager pub struct WorkerRegistration { pool: PgPool, @@ -29,12 +33,60 @@ pub struct WorkerRegistration { } impl WorkerRegistration { + fn env_truthy(name: &str) -> bool { + std::env::var(name) + .ok() + .map(|value| matches!(value.trim().to_ascii_lowercase().as_str(), "1" | "true")) + .unwrap_or(false) + } + + fn inject_agent_capabilities(capabilities: &mut HashMap) { + if Self::env_truthy(ATTUNE_AGENT_MODE_ENV) { + capabilities.insert("agent_mode".to_string(), json!(true)); + } + + if let Ok(binary_name) = std::env::var(ATTUNE_AGENT_BINARY_NAME_ENV) { + let binary_name = binary_name.trim(); + if 
!binary_name.is_empty() { + capabilities.insert("agent_binary_name".to_string(), json!(binary_name)); + } + } + + if let Ok(binary_version) = std::env::var(ATTUNE_AGENT_BINARY_VERSION_ENV) { + let binary_version = binary_version.trim(); + if !binary_version.is_empty() { + capabilities.insert("agent_binary_version".to_string(), json!(binary_version)); + } + } + } + + fn legacy_worker_name() -> Option { + std::env::var("ATTUNE_WORKER_NAME") + .ok() + .map(|value| value.trim().to_string()) + .filter(|value| !value.is_empty()) + } + + fn legacy_worker_type() -> Option { + let value = std::env::var("ATTUNE_WORKER_TYPE").ok()?; + match value.trim().to_ascii_lowercase().as_str() { + "local" => Some(WorkerType::Local), + "remote" => Some(WorkerType::Remote), + "container" => Some(WorkerType::Container), + other => { + warn!("Ignoring unrecognized ATTUNE_WORKER_TYPE value: {}", other); + None + } + } + } + /// Create a new worker registration manager pub fn new(pool: PgPool, config: &Config) -> Self { let worker_name = config .worker .as_ref() .and_then(|w| w.name.clone()) + .or_else(Self::legacy_worker_name) .unwrap_or_else(|| { format!( "worker-{}", @@ -48,6 +100,7 @@ impl WorkerRegistration { .worker .as_ref() .and_then(|w| w.worker_type) + .or_else(Self::legacy_worker_type) .unwrap_or(WorkerType::Local); let worker_role = WorkerRole::Action; @@ -86,6 +139,8 @@ impl WorkerRegistration { json!(env!("CARGO_PKG_VERSION")), ); + Self::inject_agent_capabilities(&mut capabilities); + // Placeholder for runtimes (will be detected asynchronously) capabilities.insert("runtimes".to_string(), json!(Vec::::new())); @@ -461,4 +516,28 @@ mod tests { let value = json!(false); assert_eq!(value, false); } + + #[test] + fn test_inject_agent_capabilities_from_env() { + std::env::set_var(ATTUNE_AGENT_MODE_ENV, "TRUE"); + std::env::set_var(ATTUNE_AGENT_BINARY_NAME_ENV, "attune-agent"); + std::env::set_var(ATTUNE_AGENT_BINARY_VERSION_ENV, "1.2.3"); + + let mut capabilities = HashMap::new(); + 
WorkerRegistration::inject_agent_capabilities(&mut capabilities); + + assert_eq!(capabilities.get("agent_mode"), Some(&json!(true))); + assert_eq!( + capabilities.get("agent_binary_name"), + Some(&json!("attune-agent")) + ); + assert_eq!( + capabilities.get("agent_binary_version"), + Some(&json!("1.2.3")) + ); + + std::env::remove_var(ATTUNE_AGENT_MODE_ENV); + std::env::remove_var(ATTUNE_AGENT_BINARY_NAME_ENV); + std::env::remove_var(ATTUNE_AGENT_BINARY_VERSION_ENV); + } } diff --git a/crates/worker/src/runtime_detect.rs b/crates/worker/src/runtime_detect.rs index 5bb1e7b..708685a 100644 --- a/crates/worker/src/runtime_detect.rs +++ b/crates/worker/src/runtime_detect.rs @@ -1,544 +1,12 @@ -//! Runtime Auto-Detection Module -//! -//! Provides lightweight, database-free runtime detection for the Universal Worker Agent. -//! Unlike [`attune_common::runtime_detection::RuntimeDetector`] which queries the database -//! for runtime definitions and verification metadata, this module probes the local system -//! directly by checking for well-known interpreter binaries on PATH. -//! -//! This is designed for the agent entrypoint (`attune-agent`) which is injected into -//! arbitrary containers and must discover what runtimes are available without any -//! database connectivity at detection time. -//! -//! # Detection Strategy -//! -//! For each candidate runtime, the detector: -//! 1. Checks if a binary exists and is executable using `which`-style PATH lookup -//! 2. Optionally runs a version command (e.g., `python3 --version`) to capture the version -//! 3. Returns a list of [`DetectedRuntime`] structs with name, path, and version info -//! -//! # Supported Runtimes -//! -//! | Runtime | Binaries checked (in order) | Version command | -//! |----------|-------------------------------|-------------------------| -//! | shell | `bash`, `sh` | `bash --version` | -//! | python | `python3`, `python` | `python3 --version` | -//! | node | `node`, `nodejs` | `node --version` | -//! 
| ruby | `ruby` | `ruby --version` | -//! | go | `go` | `go version` | -//! | java | `java` | `java -version` | -//! | r | `Rscript` | `Rscript --version` | -//! | perl | `perl` | `perl --version` | +//! Compatibility wrapper around the shared agent runtime detection module. -use serde::{Deserialize, Serialize}; -use std::fmt; -use std::process::Command; -use tracing::{debug, info}; +pub use attune_common::agent_runtime_detection::{ + detect_runtimes, format_as_env_value, DetectedRuntime, +}; -/// A runtime interpreter discovered on the local system. -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct DetectedRuntime { - /// Canonical runtime name (e.g., "shell", "python", "node"). - /// These names align with the normalized names from - /// [`attune_common::runtime_detection::normalize_runtime_name`]. - pub name: String, - - /// Absolute path to the interpreter binary (as resolved by `which`). - pub path: String, - - /// Version string if a version check command succeeded (e.g., "3.12.1"). - pub version: Option, -} - -impl fmt::Display for DetectedRuntime { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match &self.version { - Some(v) => write!(f, "{} ({}, v{})", self.name, self.path, v), - None => write!(f, "{} ({})", self.name, self.path), - } - } -} - -/// A candidate runtime to probe for during detection. -struct RuntimeCandidate { - /// Canonical name for this runtime (used in ATTUNE_WORKER_RUNTIMES). - name: &'static str, - - /// Binary names to try, in priority order. The first one found wins. - binaries: &'static [&'static str], - - /// Arguments to pass to the binary to get a version string. - version_args: &'static [&'static str], - - /// How to extract the version from command output. - version_parser: VersionParser, -} - -/// Strategy for parsing version output from a command. -enum VersionParser { - /// Extract a version pattern like "X.Y.Z" from the combined stdout+stderr output. 
- /// This handles the common case where the version appears somewhere in the output - /// (e.g., "Python 3.12.1", "node v20.11.0", "go1.22.0"). - SemverLike, - - /// Java uses `-version` which writes to stderr, and the format is - /// `openjdk version "21.0.1"` or `java version "1.8.0_392"`. - JavaStyle, -} - -/// All candidate runtimes to probe, in detection order. -fn candidates() -> Vec { - vec![ - RuntimeCandidate { - name: "shell", - binaries: &["bash", "sh"], - version_args: &["--version"], - version_parser: VersionParser::SemverLike, - }, - RuntimeCandidate { - name: "python", - binaries: &["python3", "python"], - version_args: &["--version"], - version_parser: VersionParser::SemverLike, - }, - RuntimeCandidate { - name: "node", - binaries: &["node", "nodejs"], - version_args: &["--version"], - version_parser: VersionParser::SemverLike, - }, - RuntimeCandidate { - name: "ruby", - binaries: &["ruby"], - version_args: &["--version"], - version_parser: VersionParser::SemverLike, - }, - RuntimeCandidate { - name: "go", - binaries: &["go"], - version_args: &["version"], - version_parser: VersionParser::SemverLike, - }, - RuntimeCandidate { - name: "java", - binaries: &["java"], - version_args: &["-version"], - version_parser: VersionParser::JavaStyle, - }, - RuntimeCandidate { - name: "r", - binaries: &["Rscript"], - version_args: &["--version"], - version_parser: VersionParser::SemverLike, - }, - RuntimeCandidate { - name: "perl", - binaries: &["perl"], - version_args: &["--version"], - version_parser: VersionParser::SemverLike, - }, - ] -} - -/// Detect available runtimes by probing the local system for known interpreter binaries. -/// -/// This function performs synchronous subprocess calls (`std::process::Command`) since -/// it is a one-time startup operation. It checks each candidate runtime's binaries -/// in priority order using `which`-style PATH lookup, and optionally captures the -/// interpreter version. 
-/// -/// # Returns -/// -/// A vector of [`DetectedRuntime`] for each runtime that was found on the system. -/// The order matches the detection order (shell first, then python, node, etc.). -/// -/// # Example -/// -/// ```no_run -/// use attune_worker::runtime_detect::detect_runtimes; -/// -/// let runtimes = detect_runtimes(); -/// for rt in &runtimes { -/// println!("Found: {}", rt); -/// } -/// // Convert to ATTUNE_WORKER_RUNTIMES format -/// let names: Vec<&str> = runtimes.iter().map(|r| r.name.as_str()).collect(); -/// println!("ATTUNE_WORKER_RUNTIMES={}", names.join(",")); -/// ``` -pub fn detect_runtimes() -> Vec { - info!("Starting runtime auto-detection..."); - - let mut detected = Vec::new(); - - for candidate in candidates() { - match detect_single_runtime(&candidate) { - Some(runtime) => { - info!(" ✓ Detected: {}", runtime); - detected.push(runtime); - } - None => { - debug!(" ✗ Not found: {}", candidate.name); - } - } - } - - info!( - "Runtime auto-detection complete: found {} runtime(s): [{}]", - detected.len(), - detected - .iter() - .map(|r| r.name.as_str()) - .collect::>() - .join(", ") - ); - - detected -} - -/// Attempt to detect a single runtime by checking its candidate binaries. -fn detect_single_runtime(candidate: &RuntimeCandidate) -> Option { - for binary in candidate.binaries { - if let Some(path) = which_binary(binary) { - debug!( - "Found {} at {} (for runtime '{}')", - binary, path, candidate.name - ); - - // Attempt to get version info (non-fatal if it fails) - let version = get_version(&path, candidate.version_args, &candidate.version_parser); - - return Some(DetectedRuntime { - name: candidate.name.to_string(), - path, - version, - }); - } - } - - None -} - -/// Look up a binary on PATH, similar to the `which` command. -/// -/// Uses `which ` on the system to resolve the full path. -/// Returns `None` if the binary is not found or `which` fails. 
-fn which_binary(binary: &str) -> Option { - // First check well-known absolute paths for shell interpreters - // (these may not be on PATH in minimal containers) - if binary == "bash" || binary == "sh" { - let absolute_path = format!("/bin/{}", binary); - if std::path::Path::new(&absolute_path).exists() { - return Some(absolute_path); - } - } - - // Fall back to PATH lookup via `which` - match Command::new("which").arg(binary).output() { - Ok(output) => { - if output.status.success() { - let path = String::from_utf8_lossy(&output.stdout).trim().to_string(); - if !path.is_empty() { - Some(path) - } else { - None - } - } else { - None - } - } - Err(e) => { - // `which` itself not found — try `command -v` as fallback - debug!("'which' command failed ({}), trying 'command -v'", e); - match Command::new("sh") - .args(["-c", &format!("command -v {}", binary)]) - .output() - { - Ok(output) if output.status.success() => { - let path = String::from_utf8_lossy(&output.stdout).trim().to_string(); - if !path.is_empty() { - Some(path) - } else { - None - } - } - _ => None, - } - } - } -} - -/// Run a version command and parse the version string from the output. -fn get_version(binary_path: &str, version_args: &[&str], parser: &VersionParser) -> Option { - let output = match Command::new(binary_path).args(version_args).output() { - Ok(output) => output, - Err(e) => { - debug!("Failed to run version command for {}: {}", binary_path, e); - return None; - } - }; - - let stdout = String::from_utf8_lossy(&output.stdout); - let stderr = String::from_utf8_lossy(&output.stderr); - let combined = format!("{}{}", stdout, stderr); - - match parser { - VersionParser::SemverLike => parse_semver_like(&combined), - VersionParser::JavaStyle => parse_java_version(&combined), - } -} - -/// Extract a semver-like version (X.Y.Z or X.Y) from output text. 
-/// -/// Handles common patterns: -/// - "Python 3.12.1" -/// - "node v20.11.0" -/// - "go version go1.22.0 linux/amd64" -/// - "GNU bash, version 5.2.15(1)-release" -/// - "ruby 3.2.2 (2023-03-30 revision e51014f9c0)" -/// - "perl 5, version 36, subversion 0 (v5.36.0)" -fn parse_semver_like(output: &str) -> Option { - // Try to find a pattern like X.Y.Z or X.Y (with optional leading 'v') - // Also handle go's "go1.22.0" format - let re = regex::Regex::new(r"(?:v|go)?(\d+\.\d+(?:\.\d+)?)").ok()?; - - if let Some(captures) = re.captures(output) { - captures.get(1).map(|m| m.as_str().to_string()) - } else { - None - } -} - -/// Parse Java's peculiar version output format. -/// -/// Java writes to stderr and uses formats like: -/// - `openjdk version "21.0.1" 2023-10-17` -/// - `java version "1.8.0_392"` -fn parse_java_version(output: &str) -> Option { - // Look for version inside quotes first - let quoted_re = regex::Regex::new(r#"version\s+"([^"]+)""#).ok()?; - if let Some(captures) = quoted_re.captures(output) { - return captures.get(1).map(|m| m.as_str().to_string()); - } - - // Fall back to semver-like parsing - parse_semver_like(output) -} - -/// Format detected runtimes as a comma-separated string suitable for -/// the `ATTUNE_WORKER_RUNTIMES` environment variable. -/// -/// # Example -/// -/// ```no_run -/// use attune_worker::runtime_detect::{detect_runtimes, format_as_env_value}; -/// -/// let runtimes = detect_runtimes(); -/// let env_val = format_as_env_value(&runtimes); -/// // e.g., "shell,python,node" -/// ``` -pub fn format_as_env_value(runtimes: &[DetectedRuntime]) -> String { - runtimes - .iter() - .map(|r| r.name.as_str()) - .collect::>() - .join(",") -} - -/// Print a human-readable detection report to stdout. -/// -/// Used by the `--detect-only` flag to show detection results and exit. 
pub fn print_detection_report(runtimes: &[DetectedRuntime]) { - println!("=== Attune Agent Runtime Detection Report ==="); - println!(); - - if runtimes.is_empty() { - println!("No runtimes detected!"); - println!(); - println!("The agent could not find any supported interpreter binaries."); - println!("Ensure at least one of the following is installed and on PATH:"); - println!(" - bash / sh (shell scripts)"); - println!(" - python3 / python (Python scripts)"); - println!(" - node / nodejs (Node.js scripts)"); - println!(" - ruby (Ruby scripts)"); - println!(" - go (Go programs)"); - println!(" - java (Java programs)"); - println!(" - Rscript (R scripts)"); - println!(" - perl (Perl scripts)"); - } else { - println!("Detected {} runtime(s):", runtimes.len()); - println!(); - for rt in runtimes { - let version_str = rt.version.as_deref().unwrap_or("unknown version"); - println!(" ✓ {:<10} {} ({})", rt.name, rt.path, version_str); - } - } - - println!(); - println!("ATTUNE_WORKER_RUNTIMES={}", format_as_env_value(runtimes)); - println!(); -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_parse_semver_like_python() { - assert_eq!( - parse_semver_like("Python 3.12.1"), - Some("3.12.1".to_string()) - ); - } - - #[test] - fn test_parse_semver_like_node() { - assert_eq!(parse_semver_like("v20.11.0"), Some("20.11.0".to_string())); - } - - #[test] - fn test_parse_semver_like_go() { - assert_eq!( - parse_semver_like("go version go1.22.0 linux/amd64"), - Some("1.22.0".to_string()) - ); - } - - #[test] - fn test_parse_semver_like_bash() { - assert_eq!( - parse_semver_like("GNU bash, version 5.2.15(1)-release (x86_64-pc-linux-gnu)"), - Some("5.2.15".to_string()) - ); - } - - #[test] - fn test_parse_semver_like_ruby() { - assert_eq!( - parse_semver_like("ruby 3.2.2 (2023-03-30 revision e51014f9c0) [x86_64-linux]"), - Some("3.2.2".to_string()) - ); - } - - #[test] - fn test_parse_semver_like_two_part() { - assert_eq!( - parse_semver_like("SomeRuntime 1.5"), - 
Some("1.5".to_string()) - ); - } - - #[test] - fn test_parse_semver_like_no_match() { - assert_eq!(parse_semver_like("no version here"), None); - } - - #[test] - fn test_parse_java_version_openjdk() { - assert_eq!( - parse_java_version(r#"openjdk version "21.0.1" 2023-10-17"#), - Some("21.0.1".to_string()) - ); - } - - #[test] - fn test_parse_java_version_legacy() { - assert_eq!( - parse_java_version(r#"java version "1.8.0_392""#), - Some("1.8.0_392".to_string()) - ); - } - - #[test] - fn test_format_as_env_value_empty() { - let runtimes: Vec = vec![]; - assert_eq!(format_as_env_value(&runtimes), ""); - } - - #[test] - fn test_format_as_env_value_multiple() { - let runtimes = vec![ - DetectedRuntime { - name: "shell".to_string(), - path: "/bin/bash".to_string(), - version: Some("5.2.15".to_string()), - }, - DetectedRuntime { - name: "python".to_string(), - path: "/usr/bin/python3".to_string(), - version: Some("3.12.1".to_string()), - }, - DetectedRuntime { - name: "node".to_string(), - path: "/usr/bin/node".to_string(), - version: None, - }, - ]; - assert_eq!(format_as_env_value(&runtimes), "shell,python,node"); - } - - #[test] - fn test_detected_runtime_display_with_version() { - let rt = DetectedRuntime { - name: "python".to_string(), - path: "/usr/bin/python3".to_string(), - version: Some("3.12.1".to_string()), - }; - assert_eq!(format!("{}", rt), "python (/usr/bin/python3, v3.12.1)"); - } - - #[test] - fn test_detected_runtime_display_without_version() { - let rt = DetectedRuntime { - name: "shell".to_string(), - path: "/bin/bash".to_string(), - version: None, - }; - assert_eq!(format!("{}", rt), "shell (/bin/bash)"); - } - - #[test] - fn test_detect_runtimes_runs_without_panic() { - // This test verifies the detection logic doesn't panic, - // regardless of what's actually installed on the system. - let runtimes = detect_runtimes(); - // We should at least find a shell on any Unix system - // but we don't assert that since test environments vary. 
- let _ = runtimes; - } - - #[test] - fn test_which_binary_sh() { - // /bin/sh should exist on virtually all Unix systems - let result = which_binary("sh"); - assert!(result.is_some(), "Expected to find 'sh' on this system"); - } - - #[test] - fn test_which_binary_nonexistent() { - let result = which_binary("definitely_not_a_real_binary_xyz123"); - assert!(result.is_none()); - } - - #[test] - fn test_candidates_order() { - let c = candidates(); - assert_eq!(c[0].name, "shell"); - assert_eq!(c[1].name, "python"); - assert_eq!(c[2].name, "node"); - assert_eq!(c[3].name, "ruby"); - assert_eq!(c[4].name, "go"); - assert_eq!(c[5].name, "java"); - assert_eq!(c[6].name, "r"); - assert_eq!(c[7].name, "perl"); - } - - #[test] - fn test_candidates_binaries_priority() { - let c = candidates(); - // shell prefers bash over sh - assert_eq!(c[0].binaries, &["bash", "sh"]); - // python prefers python3 over python - assert_eq!(c[1].binaries, &["python3", "python"]); - // node prefers node over nodejs - assert_eq!(c[2].binaries, &["node", "nodejs"]); - } + attune_common::agent_runtime_detection::print_detection_report_for_env( + "ATTUNE_WORKER_RUNTIMES", + runtimes, + ); } diff --git a/crates/worker/src/service.rs b/crates/worker/src/service.rs index 99fadfc..deb9283 100644 --- a/crates/worker/src/service.rs +++ b/crates/worker/src/service.rs @@ -23,7 +23,7 @@ use attune_common::mq::{ MessageEnvelope, MessageType, PackRegisteredPayload, Publisher, PublisherConfig, }; use attune_common::repositories::{execution::ExecutionRepository, FindById}; -use attune_common::runtime_detection::runtime_in_filter; +use attune_common::runtime_detection::runtime_aliases_match_filter; use chrono::Utc; use serde::{Deserialize, Serialize}; use sqlx::PgPool; @@ -253,10 +253,10 @@ impl WorkerService { // Uses alias-aware matching so that e.g. filter "node" // matches DB runtime name "Node.js" (lowercased to "node.js"). 
if let Some(ref filter) = runtime_filter { - if !runtime_in_filter(&rt_name, filter) { + if !runtime_aliases_match_filter(&rt.aliases, filter) { debug!( - "Skipping runtime '{}' (not in ATTUNE_WORKER_RUNTIMES filter)", - rt_name + "Skipping runtime '{}' (aliases {:?} not in ATTUNE_WORKER_RUNTIMES filter)", + rt_name, rt.aliases ); continue; } diff --git a/crates/worker/src/version_verify.rs b/crates/worker/src/version_verify.rs index f53f10d..25de3ff 100644 --- a/crates/worker/src/version_verify.rs +++ b/crates/worker/src/version_verify.rs @@ -17,7 +17,7 @@ use tracing::{debug, info, warn}; use attune_common::models::RuntimeVersion; use attune_common::repositories::runtime_version::RuntimeVersionRepository; -use attune_common::runtime_detection::runtime_in_filter; +use attune_common::runtime_detection::runtime_aliases_match_filter; /// Result of verifying all runtime versions at startup. #[derive(Debug)] @@ -95,7 +95,7 @@ pub async fn verify_all_runtime_versions( .to_lowercase(); if let Some(filter) = runtime_filter { - if !runtime_in_filter(&rt_base_name, filter) { + if !runtime_aliases_match_filter(&[rt_base_name.to_string()], filter) { debug!( "Skipping version '{}' of runtime '{}' (not in worker runtime filter)", version.version, version.runtime_ref, diff --git a/docker-compose.agent.yaml b/docker-compose.agent.yaml index e521320..9916e73 100644 --- a/docker-compose.agent.yaml +++ b/docker-compose.agent.yaml @@ -8,10 +8,10 @@ # # Prerequisites: # The init-agent service (defined in docker-compose.yaml) must be present. -# It builds the statically-linked agent binary and populates the agent_bin volume. +# It builds the statically-linked worker and sensor agent binaries and populates the agent_bin volume. # # How it works: -# 1. init-agent builds a musl-static attune-agent binary and copies it to the agent_bin volume +# 1. init-agent builds musl-static injected agent binaries and copies them to the agent_bin volume # 2. 
Each worker service mounts agent_bin read-only and uses the agent as its entrypoint # 3. The agent auto-detects available runtimes in the container (python, ruby, node, etc.) # 4. No Dockerfile needed — just point at any container image with your desired runtime diff --git a/docker-compose.yaml b/docker-compose.yaml index c4e8022..9f236d6 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -126,8 +126,9 @@ services: restart: on-failure entrypoint: "" # Override Python image entrypoint - # Agent binary volume population (builds the statically-linked agent and copies it to a shared volume) - # Other containers can use the agent binary by mounting agent_bin volume and running /opt/attune/agent/attune-agent + # Agent binary volume population (builds the statically-linked worker and sensor agents) + # Other containers can use these binaries by mounting agent_bin and running + # /opt/attune/agent/attune-agent or /opt/attune/agent/attune-sensor-agent. init-agent: build: context: . @@ -142,7 +143,7 @@ services: [ "/bin/sh", "-c", - "cp /usr/local/bin/attune-agent /opt/attune/agent/attune-agent && chmod +x /opt/attune/agent/attune-agent && echo 'Agent binary copied successfully'", + "cp /usr/local/bin/attune-agent /opt/attune/agent/attune-agent && cp /usr/local/bin/attune-sensor-agent /opt/attune/agent/attune-sensor-agent && chmod +x /opt/attune/agent/attune-agent /opt/attune/agent/attune-sensor-agent && /usr/local/bin/attune-agent --version > /opt/attune/agent/attune-agent.version && /usr/local/bin/attune-sensor-agent --version > /opt/attune/agent/attune-sensor-agent.version && echo 'Agent binaries copied successfully'", ] restart: "no" networks: @@ -223,6 +224,8 @@ services: - api_logs:/opt/attune/logs - agent_bin:/opt/attune/agent:ro depends_on: + init-agent: + condition: service_completed_successfully init-packs: condition: service_completed_successfully init-user: @@ -294,19 +297,17 @@ services: # 
============================================================================ # Workers # ============================================================================ + # Default agent-based workers + # These use stock runtime images and inject the statically-linked attune-agent + # from the shared agent_bin volume instead of baking attune-worker into each image. worker-shell: - build: - context: . - dockerfile: docker/Dockerfile.worker.optimized - target: worker-base - args: - BUILDKIT_INLINE_CACHE: 1 + image: debian:bookworm-slim container_name: attune-worker-shell + entrypoint: ["/opt/attune/agent/attune-agent"] stop_grace_period: 45s environment: RUST_LOG: info ATTUNE_CONFIG: /opt/attune/config/config.yaml - ATTUNE_WORKER_RUNTIMES: shell ATTUNE_WORKER_TYPE: container ATTUNE_WORKER_NAME: worker-shell-01 ATTUNE__SECURITY__JWT_SECRET: ${JWT_SECRET:-docker-dev-secret-change-in-production} @@ -315,6 +316,7 @@ services: ATTUNE__MESSAGE_QUEUE__URL: amqp://attune:attune@rabbitmq:5672 ATTUNE_API_URL: http://attune-api:8080 volumes: + - agent_bin:/opt/attune/agent:ro - ${ATTUNE_DOCKER_CONFIG_PATH:-./config.docker.yaml}:/opt/attune/config/config.yaml:ro - packs_data:/opt/attune/packs:ro - ./packs.dev:/opt/attune/packs.dev:rw @@ -322,6 +324,8 @@ services: - artifacts_data:/opt/attune/artifacts - worker_shell_logs:/opt/attune/logs depends_on: + init-agent: + condition: service_completed_successfully init-packs: condition: service_completed_successfully init-user: @@ -333,7 +337,7 @@ services: rabbitmq: condition: service_healthy healthcheck: - test: ["CMD-SHELL", "pgrep -f attune-worker || exit 1"] + test: ["CMD-SHELL", "pgrep -f attune-agent || exit 1"] interval: 30s timeout: 10s retries: 3 @@ -342,20 +346,15 @@ services: - attune-network restart: unless-stopped - # Python worker - Shell + Python runtime + # Python worker - official Python image with agent auto-detection worker-python: - build: - context: . 
- dockerfile: docker/Dockerfile.worker.optimized - target: worker-python - args: - BUILDKIT_INLINE_CACHE: 1 + image: python:3.12-slim container_name: attune-worker-python + entrypoint: ["/opt/attune/agent/attune-agent"] stop_grace_period: 45s environment: RUST_LOG: info ATTUNE_CONFIG: /opt/attune/config/config.yaml - ATTUNE_WORKER_RUNTIMES: shell,python ATTUNE_WORKER_TYPE: container ATTUNE_WORKER_NAME: worker-python-01 ATTUNE__SECURITY__JWT_SECRET: ${JWT_SECRET:-docker-dev-secret-change-in-production} @@ -364,6 +363,7 @@ services: ATTUNE__MESSAGE_QUEUE__URL: amqp://attune:attune@rabbitmq:5672 ATTUNE_API_URL: http://attune-api:8080 volumes: + - agent_bin:/opt/attune/agent:ro - ${ATTUNE_DOCKER_CONFIG_PATH:-./config.docker.yaml}:/opt/attune/config/config.yaml:ro - packs_data:/opt/attune/packs:ro - ./packs.dev:/opt/attune/packs.dev:rw @@ -371,6 +371,8 @@ services: - artifacts_data:/opt/attune/artifacts - worker_python_logs:/opt/attune/logs depends_on: + init-agent: + condition: service_completed_successfully init-packs: condition: service_completed_successfully init-user: @@ -382,7 +384,7 @@ services: rabbitmq: condition: service_healthy healthcheck: - test: ["CMD-SHELL", "pgrep -f attune-worker || exit 1"] + test: ["CMD-SHELL", "pgrep -f attune-agent || exit 1"] interval: 30s timeout: 10s retries: 3 @@ -391,20 +393,15 @@ services: - attune-network restart: unless-stopped - # Node worker - Shell + Node.js runtime + # Node worker - official Node.js image with agent auto-detection worker-node: - build: - context: . 
- dockerfile: docker/Dockerfile.worker.optimized - target: worker-node - args: - BUILDKIT_INLINE_CACHE: 1 + image: node:22-slim container_name: attune-worker-node + entrypoint: ["/opt/attune/agent/attune-agent"] stop_grace_period: 45s environment: RUST_LOG: info ATTUNE_CONFIG: /opt/attune/config/config.yaml - ATTUNE_WORKER_RUNTIMES: shell,node ATTUNE_WORKER_TYPE: container ATTUNE_WORKER_NAME: worker-node-01 ATTUNE__SECURITY__JWT_SECRET: ${JWT_SECRET:-docker-dev-secret-change-in-production} @@ -413,6 +410,7 @@ services: ATTUNE__MESSAGE_QUEUE__URL: amqp://attune:attune@rabbitmq:5672 ATTUNE_API_URL: http://attune-api:8080 volumes: + - agent_bin:/opt/attune/agent:ro - ${ATTUNE_DOCKER_CONFIG_PATH:-./config.docker.yaml}:/opt/attune/config/config.yaml:ro - packs_data:/opt/attune/packs:ro - ./packs.dev:/opt/attune/packs.dev:rw @@ -420,6 +418,8 @@ services: - artifacts_data:/opt/attune/artifacts - worker_node_logs:/opt/attune/logs depends_on: + init-agent: + condition: service_completed_successfully init-packs: condition: service_completed_successfully init-user: @@ -431,7 +431,7 @@ services: rabbitmq: condition: service_healthy healthcheck: - test: ["CMD-SHELL", "pgrep -f attune-worker || exit 1"] + test: ["CMD-SHELL", "pgrep -f attune-agent || exit 1"] interval: 30s timeout: 10s retries: 3 @@ -440,19 +440,17 @@ services: - attune-network restart: unless-stopped - # Full worker - All runtimes (shell, python, node, native) + # Full worker - Python + Node image with manual native capability override worker-full: - build: - context: . 
- dockerfile: docker/Dockerfile.worker.optimized - target: worker-full - args: - BUILDKIT_INLINE_CACHE: 1 + image: nikolaik/python-nodejs:python3.12-nodejs22-slim container_name: attune-worker-full + entrypoint: ["/opt/attune/agent/attune-agent"] stop_grace_period: 45s environment: RUST_LOG: info ATTUNE_CONFIG: /opt/attune/config/config.yaml + # Keep native support enabled explicitly; the agent auto-detects interpreters + # but "native" is a capability flag rather than a discoverable interpreter. ATTUNE_WORKER_RUNTIMES: shell,python,node,native ATTUNE_WORKER_TYPE: container ATTUNE_WORKER_NAME: worker-full-01 @@ -462,6 +460,7 @@ services: ATTUNE__MESSAGE_QUEUE__URL: amqp://attune:attune@rabbitmq:5672 ATTUNE_API_URL: http://attune-api:8080 volumes: + - agent_bin:/opt/attune/agent:ro - ${ATTUNE_DOCKER_CONFIG_PATH:-./config.docker.yaml}:/opt/attune/config/config.yaml:ro - packs_data:/opt/attune/packs:ro - ./packs.dev:/opt/attune/packs.dev:rw @@ -469,6 +468,8 @@ services: - artifacts_data:/opt/attune/artifacts - worker_full_logs:/opt/attune/logs depends_on: + init-agent: + condition: service_completed_successfully init-packs: condition: service_completed_successfully init-user: @@ -480,7 +481,7 @@ services: rabbitmq: condition: service_healthy healthcheck: - test: ["CMD-SHELL", "pgrep -f attune-worker || exit 1"] + test: ["CMD-SHELL", "pgrep -f attune-agent || exit 1"] interval: 30s timeout: 10s retries: 3 @@ -489,18 +490,18 @@ services: - attune-network restart: unless-stopped + # Default sensor service now uses the injected sensor agent inside a stock runtime image. sensor: - build: - context: . 
- dockerfile: docker/Dockerfile.sensor.optimized - target: sensor-full - args: - BUILDKIT_INLINE_CACHE: 1 + image: nikolaik/python-nodejs:python3.12-nodejs22-slim container_name: attune-sensor + entrypoint: ["/opt/attune/agent/attune-sensor-agent"] stop_grace_period: 45s environment: RUST_LOG: debug ATTUNE_CONFIG: /opt/attune/config/config.yaml + # Keep native support enabled explicitly; interpreter auto-detection does + # not infer the synthetic "native" capability. + ATTUNE_SENSOR_RUNTIMES: shell,python,node,native ATTUNE__SECURITY__JWT_SECRET: ${JWT_SECRET:-docker-dev-secret-change-in-production} ATTUNE__SECURITY__ENCRYPTION_KEY: ${ENCRYPTION_KEY:-docker-dev-encryption-key-please-change-in-production-32plus} ATTUNE__DATABASE__URL: postgresql://attune:attune@postgres:5432/attune @@ -511,12 +512,15 @@ services: ATTUNE_MQ_URL: amqp://attune:attune@rabbitmq:5672 ATTUNE_PACKS_BASE_DIR: /opt/attune/packs volumes: + - agent_bin:/opt/attune/agent:ro - ${ATTUNE_DOCKER_CONFIG_PATH:-./config.docker.yaml}:/opt/attune/config/config.yaml:ro - packs_data:/opt/attune/packs:rw - ./packs.dev:/opt/attune/packs.dev:rw - runtime_envs:/opt/attune/runtime_envs - sensor_logs:/opt/attune/logs depends_on: + init-agent: + condition: service_completed_successfully init-packs: condition: service_completed_successfully init-user: diff --git a/docker/Dockerfile.agent b/docker/Dockerfile.agent index e273e78..a097fa9 100644 --- a/docker/Dockerfile.agent +++ b/docker/Dockerfile.agent @@ -1,8 +1,7 @@ -# Multi-stage Dockerfile for the Attune universal worker agent +# Multi-stage Dockerfile for the Attune injected agent binaries # -# Builds a statically-linked attune-agent binary using musl, suitable for -# injection into ANY container as a sidecar or init container. The binary -# has zero runtime dependencies — no glibc, no libssl, no shared libraries. 
+# Builds statically-linked `attune-agent` and `attune-sensor-agent` binaries +# using musl, suitable for injection into arbitrary runtime containers. # # Stages: # builder - Cross-compile with musl for a fully static binary @@ -24,8 +23,8 @@ # # volumes: # # - agent_binary:/shared # -# Note: The agent binary is part of the worker crate (--bin attune-agent). -# It connects to the Attune API and executes actions inside the target container. +# Note: `attune-agent` lives in the worker crate and `attune-sensor-agent` +# lives in the sensor crate. ARG RUST_VERSION=1.92 ARG DEBIAN_VERSION=bookworm @@ -71,13 +70,14 @@ COPY crates/cli/Cargo.toml ./crates/cli/Cargo.toml # Create minimal stub sources so cargo can resolve the workspace and fetch deps. # These are ONLY used for `cargo fetch` — never compiled. -# NOTE: The worker crate has TWO binary targets (attune-worker and attune-agent), -# so we create stubs for both to satisfy the workspace resolver. +# NOTE: The worker crate has TWO binary targets and the sensor crate now has +# two binary targets as well, so we create stubs for all of them. 
RUN mkdir -p crates/common/src && echo "" > crates/common/src/lib.rs && \ mkdir -p crates/api/src && echo "fn main(){}" > crates/api/src/main.rs && \ mkdir -p crates/executor/src && echo "fn main(){}" > crates/executor/src/main.rs && \ mkdir -p crates/executor/benches && echo "fn main(){}" > crates/executor/benches/context_clone.rs && \ mkdir -p crates/sensor/src && echo "fn main(){}" > crates/sensor/src/main.rs && \ + echo "fn main(){}" > crates/sensor/src/agent_main.rs && \ mkdir -p crates/core-timer-sensor/src && echo "fn main(){}" > crates/core-timer-sensor/src/main.rs && \ mkdir -p crates/worker/src && echo "fn main(){}" > crates/worker/src/main.rs && \ echo "fn main(){}" > crates/worker/src/agent_main.rs && \ @@ -97,22 +97,25 @@ RUN --mount=type=cache,target=/usr/local/cargo/registry,sharing=shared \ COPY migrations/ ./migrations/ COPY crates/ ./crates/ -# Build ONLY the attune-agent binary, statically linked with musl. +# Build the injected agent binaries, statically linked with musl. # Uses a dedicated cache ID (agent-target) so the musl target directory # doesn't collide with the glibc target cache used by other Dockerfiles. 
RUN --mount=type=cache,target=/usr/local/cargo/registry,sharing=shared \ --mount=type=cache,target=/usr/local/cargo/git,sharing=shared \ --mount=type=cache,id=agent-target,target=/build/target,sharing=locked \ - cargo build --release --target x86_64-unknown-linux-musl --bin attune-agent && \ - cp /build/target/x86_64-unknown-linux-musl/release/attune-agent /build/attune-agent + cargo build --release --target x86_64-unknown-linux-musl --bin attune-agent --bin attune-sensor-agent && \ + cp /build/target/x86_64-unknown-linux-musl/release/attune-agent /build/attune-agent && \ + cp /build/target/x86_64-unknown-linux-musl/release/attune-sensor-agent /build/attune-sensor-agent -# Strip the binary to minimize size -RUN strip /build/attune-agent +# Strip the binaries to minimize size +RUN strip /build/attune-agent && strip /build/attune-sensor-agent -# Verify the binary is statically linked and functional -RUN ls -lh /build/attune-agent && \ +# Verify the binaries are statically linked and functional +RUN ls -lh /build/attune-agent /build/attune-sensor-agent && \ file /build/attune-agent && \ - ldd /build/attune-agent 2>&1 || true + file /build/attune-sensor-agent && \ + ldd /build/attune-agent 2>&1 || true && \ + ldd /build/attune-sensor-agent 2>&1 || true # ============================================================================ # Stage 2: agent-binary - Minimal image with just the static binary @@ -122,6 +125,7 @@ RUN ls -lh /build/attune-agent && \ FROM scratch AS agent-binary COPY --from=builder /build/attune-agent /usr/local/bin/attune-agent +COPY --from=builder /build/attune-sensor-agent /usr/local/bin/attune-sensor-agent ENTRYPOINT ["/usr/local/bin/attune-agent"] @@ -149,5 +153,6 @@ ENTRYPOINT ["/usr/local/bin/attune-agent"] FROM busybox:1.36 AS agent-init COPY --from=builder /build/attune-agent /usr/local/bin/attune-agent +COPY --from=builder /build/attune-sensor-agent /usr/local/bin/attune-sensor-agent ENTRYPOINT ["/usr/local/bin/attune-agent"] diff --git 
a/docs/plans/sensor-agent-injection.md b/docs/plans/sensor-agent-injection.md new file mode 100644 index 0000000..195a1c1 --- /dev/null +++ b/docs/plans/sensor-agent-injection.md @@ -0,0 +1,266 @@ +# Sensor Agent Injection Plan + +## Overview + +The sensor service is positioned similarly to the worker service: it is a long-running process that dispatches sensor commands into underlying runtimes rather than containing runtime-specific logic in the service binary itself. The worker side now supports injected, statically-linked agent binaries that run inside arbitrary container images. This plan extends the same model to sensors. + +Goal: + +- Replace the pre-built `attune-sensor` container image in default deployments with an injected sensor agent binary running inside stock runtime images +- Reuse the existing runtime auto-detection and capability reporting model +- Preserve current sensor behavior, including runtime-based execution, registration, heartbeat, and graceful shutdown + +Non-goals: + +- Converging worker and sensor into a single binary +- Redesigning sensor scheduling or runtime execution semantics +- Removing existing `ATTUNE_SENSOR_RUNTIMES` overrides + +## Current State + +Relevant implementation points: + +- Sensor startup entrypoint: [crates/sensor/src/main.rs](/home/david/Codebase/attune/crates/sensor/src/main.rs) +- Sensor service orchestration: [crates/sensor/src/service.rs](/home/david/Codebase/attune/crates/sensor/src/service.rs) +- Sensor capability registration: [crates/sensor/src/sensor_worker_registration.rs](/home/david/Codebase/attune/crates/sensor/src/sensor_worker_registration.rs) +- Shared runtime detection: [crates/common/src/runtime_detection.rs](/home/david/Codebase/attune/crates/common/src/runtime_detection.rs) +- Current sensor container build: [docker/Dockerfile.sensor.optimized](/home/david/Codebase/attune/docker/Dockerfile.sensor.optimized) +- Existing worker-agent design reference: 
[docs/plans/universal-worker-agent.md](universal-worker-agent.md) + +Observations: + +- Sensors already use the same three-tier capability detection model as workers: + - `ATTUNE_SENSOR_RUNTIMES` + - config file capabilities + - database-driven verification +- The main missing piece is packaging and entrypoint behavior, not capability modeling +- The current sensor Compose service still depends on a pre-built Rust binary baked into the container image +- The sensor manager relies on shared runtime environment assumptions such as interpreter paths and `runtime_envs` compatibility + +## Proposed Architecture + +Introduce a dedicated injected binary, `attune-sensor-agent`, analogous to the existing `attune-agent` for workers. + +Responsibilities of `attune-sensor-agent`: + +- Probe the container for available interpreters before the Tokio runtime starts +- Respect `ATTUNE_SENSOR_RUNTIMES` as a hard override +- Populate `ATTUNE_SENSOR_RUNTIMES` automatically when unset +- Support `--detect-only` for diagnostics +- Load config and start `SensorService` + +This should remain a separate binary from `attune-agent`. + +Reasoning: + +- `attune-agent` is worker-specific today and boots `WorkerService` +- Sensor startup and runtime semantics are related but not identical +- A shared bootstrap library is useful; a single polymorphic agent binary is not necessary + +## Implementation Phases + +### Phase 1: Add Sensor Agent Binary + +Add a new binary target under the sensor crate, likely: + +- `name = "attune-sensor-agent"` +- `path = "src/agent_main.rs"` + +The new binary should mirror the startup shape of [crates/worker/src/agent_main.rs](../../crates/worker/src/agent_main.rs), but target sensors instead of workers. + +Expected behavior: + +1. Install the crypto provider +2. Initialize tracing +3. Parse CLI flags: + - `--config` + - `--name` + - `--detect-only` +4. Detect runtimes synchronously before Tokio starts +5. 
Set `ATTUNE_SENSOR_RUNTIMES` when auto-detection is used +6. Load config +7. Apply sensor name override if provided +8. Start `SensorService` +9. Handle SIGINT/SIGTERM and call `stop()` + +### Phase 2: Reuse and Extract Shared Bootstrap Logic + +Avoid duplicating the worker-agent detection/bootstrap code blindly. + +Extract shared pieces into a reusable location, likely one of: + +- `attune-common` +- a small shared helper module in `crates/common` +- a narrow internal library module used by both worker and sensor crates + +Candidate shared logic: + +- pre-Tokio runtime detection flow +- override handling +- `--detect-only` reporting +- environment mutation rules + +Keep service-specific startup separate: + +- worker agent starts `WorkerService` +- sensor agent starts `SensorService` + +### Phase 3: Docker Build Support for Injected Sensor Agent + +Extend the current agent binary build pipeline so the statically-linked sensor agent can be published into the same shared volume model used for workers. + +Options: + +- Extend [docker/Dockerfile.agent](../../docker/Dockerfile.agent) to build and copy both `attune-agent` and `attune-sensor-agent` +- Or add a sibling Dockerfile if the combined build becomes unclear + +Preferred outcome: + +- `init-agent` populates `/opt/attune/agent/attune-agent` +- `init-agent` also populates `/opt/attune/agent/attune-sensor-agent` + +Constraints: + +- Keep the binaries statically linked +- Preserve the existing API binary-serving flow from the `agent_bin` volume +- Do not break current worker agent consumers + +### Phase 4: Compose Integration for Sensor Agent Injection + +Replace the current `sensor` service in [docker-compose.yaml](../../docker-compose.yaml) with an agent-injected service. 
+ +Target shape: + +- stock runtime image instead of `docker/Dockerfile.sensor.optimized` +- `entrypoint: ["/opt/attune/agent/attune-sensor-agent"]` +- `depends_on.init-agent` +- same config, packs, runtime env, and log/artifact mounts as required + +Required environment variables must be preserved, especially: + +- `ATTUNE_CONFIG` +- `ATTUNE__DATABASE__URL` +- `ATTUNE__MESSAGE_QUEUE__URL` +- `ATTUNE_API_URL` +- `ATTUNE_MQ_URL` +- `ATTUNE_PACKS_BASE_DIR` + +Recommended default image strategy: + +- Use a stock image that includes the default runtimes the sensor service should expose +- Be conservative about path compatibility with worker-created runtime environments + +### Phase 5: Native Capability Handling + +Sensors have the same edge case as workers: `native` is a capability but not a discoverable interpreter. + +Implication: + +- Pure auto-detection can discover Python, Node, Shell, Ruby, etc. +- It cannot infer `native` safely from interpreter probing alone + +Plan: + +- Keep explicit `ATTUNE_SENSOR_RUNTIMES=...,native` for any default full-capability sensor image +- Revisit later only if native becomes a first-class explicit capability outside interpreter discovery + +### Phase 6: Runtime Environment Compatibility + +The current sensor image documents an important invariant: sensors and workers share `runtime_envs`, so interpreter paths must remain compatible. + +This must remain true after the migration. + +Validation criteria: + +- Python virtual environments created by workers remain usable by sensors +- Node runtime assumptions remain compatible across images +- No new symlink breakage due to mismatched interpreter installation paths + +If necessary, prefer stock images whose paths align with the worker fleet, or explicitly document where sensor and worker images are allowed to diverge. 
+ +### Phase 7: Documentation and Examples + +After implementation: + +- Update [docs/plans/universal-worker-agent.md](universal-worker-agent.md) with a sensor extension or cross-reference +- Update [docker-compose.yaml](../../docker-compose.yaml) +- Update [docker-compose.agent.yaml](../../docker-compose.agent.yaml) if it should also include sensor examples +- Add or update quick references for sensor agent injection + +The message should be clear: + +- Workers and sensors both support injected static agent binaries +- Runtime images are now decoupled from Rust service image builds + +## Recommended Implementation Order + +1. Add `attune-sensor-agent` binary and make it boot `SensorService` +2. Extract shared bootstrap logic from the worker-agent path +3. Extend the agent Docker build/init path to include the sensor agent binary +4. Replace the Compose `sensor` service with an injected sensor-agent container +5. Validate runtime detection and one end-to-end Python, Node, and native sensor path +6. 
Update docs and examples + +## Risks + +### Worker-Agent Coupling + +Risk: + +- Trying to reuse `attune-agent` directly for sensors will conflate worker and sensor startup semantics + +Mitigation: + +- Keep separate binaries with shared helper code only where it is truly generic + +### Native Capability Loss + +Risk: + +- Auto-detection does not capture `native` + +Mitigation: + +- Preserve explicit `ATTUNE_SENSOR_RUNTIMES` where native support is required + +### Runtime Path Mismatch + +Risk: + +- Switching to a stock image may reintroduce broken venv or interpreter path issues + +Mitigation: + +- Validate image interpreter paths against shared `runtime_envs` +- Prefer images that align with worker path conventions when possible + +### Missing Environment Contract + +Risk: + +- The sensor manager currently depends on env vars such as `ATTUNE_API_URL`, `ATTUNE_MQ_URL`, and `ATTUNE_PACKS_BASE_DIR` + +Mitigation: + +- Preserve these in the injected sensor container definition +- Avoid relying solely on config fields unless the code is updated accordingly + +## Validation Checklist + +- `attune-sensor-agent --detect-only` reports detected runtimes correctly +- `ATTUNE_SENSOR_RUNTIMES` override still takes precedence +- Sensor registration records expected runtime capabilities in the `worker` table +- Sensor heartbeat and deregistration still work +- Python-based sensors execute successfully +- Node-based sensors execute successfully +- Native sensors execute successfully when `native` is explicitly enabled +- Shared `runtime_envs` remain usable between workers and sensors +- `docker compose config` validates cleanly after Compose changes + +## Deliverables + +- New `attune-sensor-agent` binary target +- Shared bootstrap/runtime-detection helpers as needed +- Updated agent build/init pipeline producing a sensor agent binary +- Updated Compose deployment using injected sensor agent containers +- Documentation updates covering the sensor agent model diff --git 
a/migrations/20250101000002_pack_system.sql b/migrations/20250101000002_pack_system.sql index 875d765..6d903e2 100644 --- a/migrations/20250101000002_pack_system.sql +++ b/migrations/20250101000002_pack_system.sql @@ -96,6 +96,7 @@ CREATE TABLE runtime ( pack_ref TEXT, description TEXT, name TEXT NOT NULL, + aliases TEXT[] NOT NULL DEFAULT '{}'::text[], distributions JSONB NOT NULL, installation JSONB, @@ -158,6 +159,7 @@ CREATE INDEX idx_runtime_verification ON runtime USING GIN ((distributions->'ver CREATE INDEX idx_runtime_execution_config ON runtime USING GIN (execution_config); CREATE INDEX idx_runtime_auto_detected ON runtime(auto_detected); CREATE INDEX idx_runtime_detection_config ON runtime USING GIN (detection_config); +CREATE INDEX idx_runtime_aliases ON runtime USING GIN (aliases); -- Trigger CREATE TRIGGER update_runtime_updated @@ -169,6 +171,7 @@ CREATE TRIGGER update_runtime_updated COMMENT ON TABLE runtime IS 'Runtime environments for executing actions and sensors (unified)'; COMMENT ON COLUMN runtime.ref IS 'Unique runtime reference (format: pack.name, e.g., core.python)'; COMMENT ON COLUMN runtime.name IS 'Runtime name (e.g., "Python", "Node.js", "Shell")'; +COMMENT ON COLUMN runtime.aliases IS 'Lowercase alias names for this runtime (e.g., ["ruby", "rb"] for the Ruby runtime). Used for alias-aware matching during auto-detection and scheduling.'; COMMENT ON COLUMN runtime.distributions IS 'Runtime distribution metadata including verification commands, version requirements, and capabilities'; COMMENT ON COLUMN runtime.installation IS 'Installation requirements and instructions including package managers and setup steps'; COMMENT ON COLUMN runtime.installers IS 'Array of installer actions to create pack-specific runtime environments. 
Each installer defines commands to set up isolated environments (e.g., Python venv, npm install).'; diff --git a/packs/core/runtimes/go.yaml b/packs/core/runtimes/go.yaml index 16edfd2..5567e4d 100644 --- a/packs/core/runtimes/go.yaml +++ b/packs/core/runtimes/go.yaml @@ -1,6 +1,7 @@ ref: core.go pack_ref: core name: Go +aliases: [go, golang] description: Go runtime for compiling and running Go scripts and programs distributions: diff --git a/packs/core/runtimes/java.yaml b/packs/core/runtimes/java.yaml index 8cafccc..ffb3847 100644 --- a/packs/core/runtimes/java.yaml +++ b/packs/core/runtimes/java.yaml @@ -1,6 +1,7 @@ ref: core.java pack_ref: core name: Java +aliases: [java, jdk, openjdk] description: Java runtime for executing Java programs and scripts distributions: diff --git a/packs/core/runtimes/native.yaml b/packs/core/runtimes/native.yaml index bb69865..07ba9dd 100644 --- a/packs/core/runtimes/native.yaml +++ b/packs/core/runtimes/native.yaml @@ -1,6 +1,7 @@ ref: core.native pack_ref: core name: Native +aliases: [native, builtin, standalone] description: Native compiled runtime (Rust, Go, C, etc.) 
- executes binaries directly without an interpreter distributions: diff --git a/packs/core/runtimes/nodejs.yaml b/packs/core/runtimes/nodejs.yaml index 6748671..592c5af 100644 --- a/packs/core/runtimes/nodejs.yaml +++ b/packs/core/runtimes/nodejs.yaml @@ -1,6 +1,7 @@ ref: core.nodejs pack_ref: core name: Node.js +aliases: [node, nodejs, "node.js"] description: Node.js runtime for JavaScript-based actions and sensors distributions: diff --git a/packs/core/runtimes/perl.yaml b/packs/core/runtimes/perl.yaml index e1c9ecd..f36304b 100644 --- a/packs/core/runtimes/perl.yaml +++ b/packs/core/runtimes/perl.yaml @@ -1,6 +1,7 @@ ref: core.perl pack_ref: core name: Perl +aliases: [perl, perl5] description: Perl runtime for script execution with optional CPAN dependency management distributions: diff --git a/packs/core/runtimes/python.yaml b/packs/core/runtimes/python.yaml index b093b12..e787d0f 100644 --- a/packs/core/runtimes/python.yaml +++ b/packs/core/runtimes/python.yaml @@ -1,6 +1,7 @@ ref: core.python pack_ref: core name: Python +aliases: [python, python3] description: Python 3 runtime for actions and sensors with automatic environment management distributions: diff --git a/packs/core/runtimes/r.yaml b/packs/core/runtimes/r.yaml index 2194fe5..f93bed6 100644 --- a/packs/core/runtimes/r.yaml +++ b/packs/core/runtimes/r.yaml @@ -1,6 +1,7 @@ ref: core.r pack_ref: core name: R +aliases: [r, rscript] description: R runtime for statistical computing and data analysis scripts distributions: @@ -42,6 +43,6 @@ execution_config: install_command: - sh - "-c" - - "cd {pack_dir} && R_LIBS_USER={env_dir}/library Rscript -e \"if (file.exists('renv.lock')) renv::restore(library='{env_dir}/library', prompt=FALSE)\" 2>/dev/null || true" + - 'cd {pack_dir} && R_LIBS_USER={env_dir}/library Rscript -e "if (file.exists(''renv.lock'')) renv::restore(library=''{env_dir}/library'', prompt=FALSE)" 2>/dev/null || true' env_vars: R_LIBS_USER: "{env_dir}/library" diff --git 
a/packs/core/runtimes/ruby.yaml b/packs/core/runtimes/ruby.yaml index a8f90d2..fb9569b 100644 --- a/packs/core/runtimes/ruby.yaml +++ b/packs/core/runtimes/ruby.yaml @@ -1,6 +1,7 @@ ref: core.ruby pack_ref: core name: Ruby +aliases: [ruby, rb] description: Ruby runtime for script execution with automatic gem environment management distributions: diff --git a/packs/core/runtimes/shell.yaml b/packs/core/runtimes/shell.yaml index b7f3f98..2a0b64a 100644 --- a/packs/core/runtimes/shell.yaml +++ b/packs/core/runtimes/shell.yaml @@ -1,6 +1,7 @@ ref: core.shell pack_ref: core name: Shell +aliases: [shell, bash, sh] description: Shell (bash/sh) runtime for script execution - always available distributions: diff --git a/scripts/load_core_pack.py b/scripts/load_core_pack.py index 57fc2bc..4721a82 100755 --- a/scripts/load_core_pack.py +++ b/scripts/load_core_pack.py @@ -302,6 +302,7 @@ class PackLoader: name = runtime_data.get("name", ref.split(".")[-1]) description = runtime_data.get("description", "") + aliases = [alias.lower() for alias in runtime_data.get("aliases", [])] distributions = json.dumps(runtime_data.get("distributions", {})) installation = json.dumps(runtime_data.get("installation", {})) execution_config = json.dumps(runtime_data.get("execution_config", {})) @@ -310,12 +311,13 @@ class PackLoader: """ INSERT INTO runtime ( ref, pack, pack_ref, name, description, - distributions, installation, execution_config + aliases, distributions, installation, execution_config ) - VALUES (%s, %s, %s, %s, %s, %s, %s, %s) + VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s) ON CONFLICT (ref) DO UPDATE SET name = EXCLUDED.name, description = EXCLUDED.description, + aliases = EXCLUDED.aliases, distributions = EXCLUDED.distributions, installation = EXCLUDED.installation, execution_config = EXCLUDED.execution_config, @@ -328,6 +330,7 @@ class PackLoader: self.pack_ref, name, description, + aliases, distributions, installation, execution_config, @@ -338,6 +341,8 @@ class PackLoader: 
runtime_ids[ref] = runtime_id # Also index by lowercase name for easy lookup by runner_type runtime_ids[name.lower()] = runtime_id + for alias in aliases: + runtime_ids[alias] = runtime_id print(f" ✓ Runtime '{ref}' (ID: {runtime_id})") cursor.close()