From a84c07082cfb7767642cd46859e59e0cd90fdb3d Mon Sep 17 00:00:00 2001 From: David Culbreth Date: Fri, 20 Feb 2026 14:11:06 -0600 Subject: [PATCH] sensors using keys --- crates/api/src/middleware/error.rs | 38 +- crates/api/src/routes/triggers.rs | 15 + crates/common/src/pack_registry/loader.rs | 88 ++++- crates/common/src/repositories/trigger.rs | 50 +++ .../common/tests/sensor_repository_tests.rs | 361 ++++++------------ crates/sensor/src/sensor_manager.rs | 70 +++- .../20260220000000_relax_key_ref_format.sql | 7 + scripts/load_core_pack.py | 2 + .../2026-02-20-sensor-runtime-upsert-fix.md | 45 +++ 9 files changed, 416 insertions(+), 260 deletions(-) create mode 100644 migrations/20260220000000_relax_key_ref_format.sql create mode 100644 work-summary/2026-02-20-sensor-runtime-upsert-fix.md diff --git a/crates/api/src/middleware/error.rs b/crates/api/src/middleware/error.rs index 0212654..f7efc29 100644 --- a/crates/api/src/middleware/error.rs +++ b/crates/api/src/middleware/error.rs @@ -148,8 +148,42 @@ impl From for ApiError { match err { sqlx::Error::RowNotFound => ApiError::NotFound("Resource not found".to_string()), sqlx::Error::Database(db_err) => { - // Check for unique constraint violations - if let Some(constraint) = db_err.constraint() { + // PostgreSQL error codes: + // 23505 = unique_violation → 409 Conflict + // 23503 = foreign_key_violation → 422 Unprocessable Entity + // 23514 = check_violation → 422 Unprocessable Entity + // P0001 = raise_exception → 400 Bad Request (trigger-raised errors) + let pg_code = db_err.code().map(|c| c.to_string()).unwrap_or_default(); + if pg_code == "23505" { + // Unique constraint violation — duplicate key + let detail = db_err + .constraint() + .map(|c| format!(" ({})", c)) + .unwrap_or_default(); + ApiError::Conflict(format!("Already exists{}", detail)) + } else if pg_code == "23503" { + // Foreign key violation — the referenced row doesn't exist + let detail = db_err + .constraint() + .map(|c| format!(" ({})", c)) + .unwrap_or_default(); + ApiError::UnprocessableEntity(format!( + "Referenced entity does not exist{}", + detail + )) + } else if pg_code == "23514" { + // CHECK constraint violation — value doesn't meet constraint + let detail = db_err + .constraint() + .map(|c| format!(": {}", c)) + .unwrap_or_default(); + ApiError::UnprocessableEntity(format!("Validation constraint failed{}", detail)) + } else if pg_code == "P0001" { + // RAISE EXCEPTION from a trigger or function + // Extract the human-readable message from the exception + let msg = db_err.message().to_string(); + ApiError::BadRequest(msg) + } else if let Some(constraint) = db_err.constraint() { ApiError::Conflict(format!("Constraint violation: {}", constraint)) } else { ApiError::DatabaseError(format!("Database error: {}", db_err)) diff --git a/crates/api/src/routes/triggers.rs b/crates/api/src/routes/triggers.rs index c5fb092..9f31247 100644 --- a/crates/api/src/routes/triggers.rs +++ b/crates/api/src/routes/triggers.rs @@ -719,8 +719,13 @@ pub async fn update_sensor( label: request.label, description: request.description, entrypoint: request.entrypoint, + runtime: None, + runtime_ref: None, + trigger: None, + trigger_ref: None, enabled: request.enabled, param_schema: request.param_schema, + config: None, }; let sensor = SensorRepository::update(&state.db, existing_sensor.id, update_input).await?; @@ -799,8 +804,13 @@ pub async fn enable_sensor( label: None, description: None, entrypoint: None, + runtime: None, + runtime_ref: None, + trigger: None, + trigger_ref: None, enabled: Some(true), param_schema: None, + config: None, }; let sensor = SensorRepository::update(&state.db, existing_sensor.id, update_input).await?; @@ -840,8 +850,13 @@ pub async fn disable_sensor( label: None, description: None, entrypoint: None, + runtime: None, + runtime_ref: None, + trigger: None, + trigger_ref: None, enabled: Some(false), param_schema: None, + config: None, }; let sensor = SensorRepository::update(&state.db, existing_sensor.id, update_input).await?; diff --git a/crates/common/src/pack_registry/loader.rs b/crates/common/src/pack_registry/loader.rs index 6bcbbfc..4c2072e 100644 --- a/crates/common/src/pack_registry/loader.rs +++ b/crates/common/src/pack_registry/loader.rs @@ -23,7 +23,7 @@ use crate::repositories::runtime::{CreateRuntimeInput, RuntimeRepository}; use crate::repositories::trigger::{ CreateSensorInput, CreateTriggerInput, SensorRepository, TriggerRepository, }; -use crate::repositories::{Create, FindByRef}; +use crate::repositories::{Create, FindById, FindByRef, Update}; /// Result of loading pack components into the database. #[derive(Debug, Default)] @@ -514,6 +514,47 @@ impl<'a> PackComponentLoader<'a> { .unwrap_or("native"); let (sensor_runtime_id, sensor_runtime_ref) = self.resolve_runtime(runner_type).await?; + // Validate: if the runner_type suggests an interpreted runtime (not native) + // but we couldn't resolve it, or it resolved to a runtime with no + // execution_config, warn at registration time rather than failing + // opaquely at sensor startup with "Permission denied". + let is_native_runner = matches!( + runner_type.to_lowercase().as_str(), + "native" | "builtin" | "standalone" + ); + if sensor_runtime_id == 0 && !is_native_runner { + let msg = format!( + "Sensor '{}' declares runner_type '{}' but no matching runtime \ + was found in the database. The sensor will not be able to start. \ + Ensure the core pack (with runtimes) is loaded before registering \ + packs that depend on its runtimes.", + filename, runner_type + ); + warn!("{}", msg); + result.warnings.push(msg); + } else if sensor_runtime_id != 0 && !is_native_runner { + // Verify the resolved runtime has a non-empty execution_config + if let Some(runtime) = + RuntimeRepository::find_by_id(self.pool, sensor_runtime_id).await? + { + let exec_config = runtime.parsed_execution_config(); + if exec_config.interpreter.binary.is_empty() + || exec_config.interpreter.binary == "native" + || exec_config.interpreter.binary == "none" + { + let msg = format!( + "Sensor '{}' declares runner_type '{}' (resolved to runtime '{}') \ + but that runtime has no interpreter configured in its \ + execution_config. The sensor will fail to start. \ + Check the runtime definition for '{}'.", + filename, runner_type, runtime.r#ref, runtime.r#ref + ); + warn!("{}", msg); + result.warnings.push(msg); + } + } + } + let sensor_ref = match data.get("ref").and_then(|v| v.as_str()) { Some(r) => r.to_string(), None => { @@ -524,16 +565,6 @@ impl<'a> PackComponentLoader<'a> { } }; - // Check if sensor already exists - if let Some(existing) = SensorRepository::find_by_ref(self.pool, &sensor_ref).await? { - info!( - "Sensor '{}' already exists (ID: {}), skipping", - sensor_ref, existing.id - ); - result.sensors_skipped += 1; - continue; - } - let name = extract_name_from_ref(&sensor_ref); let label = data .get("label") @@ -570,6 +601,41 @@ impl<'a> PackComponentLoader<'a> { .and_then(|v| serde_json::to_value(v).ok()) .unwrap_or_else(|| serde_json::json!({})); + // Upsert: update existing sensors so re-registration corrects + // stale metadata (especially runtime assignments). + if let Some(existing) = SensorRepository::find_by_ref(self.pool, &sensor_ref).await? { + use crate::repositories::trigger::UpdateSensorInput; + + let update_input = UpdateSensorInput { + label: Some(label), + description: Some(description), + entrypoint: Some(entrypoint), + runtime: Some(sensor_runtime_id), + runtime_ref: Some(sensor_runtime_ref.clone()), + trigger: Some(trigger_id.unwrap_or(existing.trigger)), + trigger_ref: Some(trigger_ref.unwrap_or(existing.trigger_ref.clone())), + enabled: Some(enabled), + param_schema, + config: Some(config), + }; + + match SensorRepository::update(self.pool, existing.id, update_input).await { + Ok(_) => { + info!( + "Updated sensor '{}' (ID: {}, runtime: {} → {})", + sensor_ref, existing.id, existing.runtime_ref, sensor_runtime_ref + ); + result.sensors_loaded += 1; + } + Err(e) => { + let msg = format!("Failed to update sensor '{}': {}", sensor_ref, e); + warn!("{}", msg); + result.warnings.push(msg); + } + } + continue; + } + let input = CreateSensorInput { r#ref: sensor_ref.clone(), pack: Some(self.pack_id), diff --git a/crates/common/src/repositories/trigger.rs b/crates/common/src/repositories/trigger.rs index 62d7509..d3afcc8 100644 --- a/crates/common/src/repositories/trigger.rs +++ b/crates/common/src/repositories/trigger.rs @@ -531,8 +531,13 @@ pub struct UpdateSensorInput { pub label: Option, pub description: Option, pub entrypoint: Option, + pub runtime: Option, + pub runtime_ref: Option, + pub trigger: Option, + pub trigger_ref: Option, pub enabled: Option, pub param_schema: Option, + pub config: Option, } #[async_trait::async_trait] @@ -688,6 +693,42 @@ impl Update for SensorRepository { has_updates = true; } + if let Some(runtime) = input.runtime { + if has_updates { + query.push(", "); + } + query.push("runtime = "); + query.push_bind(runtime); + has_updates = true; + } + + if let Some(runtime_ref) = &input.runtime_ref { + if has_updates { + query.push(", "); + } + query.push("runtime_ref = "); + query.push_bind(runtime_ref); + has_updates = true; + } + + if let Some(trigger) = input.trigger { + if has_updates { + query.push(", "); + } + query.push("trigger = "); + query.push_bind(trigger); + has_updates = true; + } + + if let Some(trigger_ref) = &input.trigger_ref { + if has_updates { + query.push(", "); + } + query.push("trigger_ref = "); + query.push_bind(trigger_ref); + has_updates = true; + } + if let Some(param_schema) = &input.param_schema { if has_updates { query.push(", "); @@ -697,6 +738,15 @@ impl Update for SensorRepository { has_updates = true; } + if let Some(config) = &input.config { + if has_updates { + query.push(", "); + } + query.push("config = "); + query.push_bind(config); + has_updates = true; + } + if !has_updates { // No updates requested, fetch and return existing entity return Self::get_by_id(executor, id).await; diff --git a/crates/common/tests/sensor_repository_tests.rs b/crates/common/tests/sensor_repository_tests.rs index eba4753..b13033d 100644 --- a/crates/common/tests/sensor_repository_tests.rs +++ b/crates/common/tests/sensor_repository_tests.rs @@ -34,14 +34,10 @@ async fn test_create_sensor_minimal() { .await .unwrap(); - let runtime = RuntimeFixture::new_unique( - Some(pack.id), - Some(pack.r#ref.clone()), - "python3", - ) - .create(&pool) - .await - .unwrap(); + let runtime = RuntimeFixture::new_unique(Some(pack.id), Some(pack.r#ref.clone()), "python3") + .create(&pool) + .await + .unwrap(); // Create sensor let sensor = SensorFixture::new_unique( @@ -85,14 +81,10 @@ async fn test_create_sensor_with_param_schema() { .await .unwrap(); - let runtime = RuntimeFixture::new_unique( - Some(pack.id), - Some(pack.r#ref.clone()), - "python3", - ) - .create(&pool) - .await - .unwrap(); + let runtime = RuntimeFixture::new_unique(Some(pack.id), Some(pack.r#ref.clone()), "python3") + .create(&pool) + .await + .unwrap(); let param_schema = json!({ "type": "object", @@ -171,14 +163,10 @@ async fn test_create_sensor_duplicate_ref_fails() { .await .unwrap(); - let runtime = RuntimeFixture::new_unique( - Some(pack.id), - Some(pack.r#ref.clone()), - "python3", - ) - .create(&pool) - .await - .unwrap(); + let runtime = RuntimeFixture::new_unique(Some(pack.id), Some(pack.r#ref.clone()), "python3") + .create(&pool) + .await + .unwrap(); // Create first sensor let sensor_ref = format!("{}.duplicate_sensor", pack.r#ref); @@ -223,14 +211,10 @@ async fn test_create_sensor_invalid_ref_format_fails() { .await .unwrap(); - let runtime = RuntimeFixture::new_unique( - Some(pack.id), - Some(pack.r#ref.clone()), - "python3", - ) - .create(&pool) - .await - .unwrap(); + let runtime = RuntimeFixture::new_unique(Some(pack.id), Some(pack.r#ref.clone()), "python3") + .create(&pool) + .await + .unwrap(); // Try invalid ref formats let invalid_refs = vec![ @@ -378,14 +362,10 @@ async fn test_find_by_id_exists() { .await .unwrap(); - let runtime = RuntimeFixture::new_unique( - Some(pack.id), - Some(pack.r#ref.clone()), - "python3", - ) - .create(&pool) - .await - .unwrap(); + let runtime = RuntimeFixture::new_unique(Some(pack.id), Some(pack.r#ref.clone()), "python3") + .create(&pool) + .await + .unwrap(); let sensor = SensorFixture::new_unique( Some(pack.id), @@ -433,14 +413,10 @@ async fn test_get_by_id_exists() { .await .unwrap(); - let runtime = RuntimeFixture::new_unique( - Some(pack.id), - Some(pack.r#ref.clone()), - "python3", - ) - .create(&pool) - .await - .unwrap(); + let runtime = RuntimeFixture::new_unique(Some(pack.id), Some(pack.r#ref.clone()), "python3") + .create(&pool) + .await + .unwrap(); let sensor = SensorFixture::new_unique( Some(pack.id), @@ -484,14 +460,10 @@ async fn test_find_by_ref_exists() { .await .unwrap(); - let runtime = RuntimeFixture::new_unique( - Some(pack.id), - Some(pack.r#ref.clone()), - "python3", - ) - .create(&pool) - .await - .unwrap(); + let runtime = RuntimeFixture::new_unique(Some(pack.id), Some(pack.r#ref.clone()), "python3") + .create(&pool) + .await + .unwrap(); let sensor = SensorFixture::new_unique( Some(pack.id), @@ -540,14 +512,10 @@ async fn test_get_by_ref_exists() { .await .unwrap(); - let runtime = RuntimeFixture::new_unique( - Some(pack.id), - Some(pack.r#ref.clone()), - "python3", - ) - .create(&pool) - .await - .unwrap(); + let runtime = RuntimeFixture::new_unique(Some(pack.id), Some(pack.r#ref.clone()), "python3") + .create(&pool) + .await + .unwrap(); let sensor = SensorFixture::new_unique( Some(pack.id), @@ -593,14 +561,10 @@ async fn test_list_all_sensors() { .await .unwrap(); - let runtime = RuntimeFixture::new_unique( - Some(pack.id), - Some(pack.r#ref.clone()), - "python3", - ) - .create(&pool) - .await - .unwrap(); + let runtime = RuntimeFixture::new_unique(Some(pack.id), Some(pack.r#ref.clone()), "python3") + .create(&pool) + .await + .unwrap(); // Create multiple sensors let _sensor1 = SensorFixture::new_unique( @@ -668,14 +632,10 @@ async fn test_update_label() { .await .unwrap(); - let runtime = RuntimeFixture::new_unique( - Some(pack.id), - Some(pack.r#ref.clone()), - "python3", - ) - .create(&pool) - .await - .unwrap(); + let runtime = RuntimeFixture::new_unique(Some(pack.id), Some(pack.r#ref.clone()), "python3") + .create(&pool) + .await + .unwrap(); let sensor = SensorFixture::new_unique( Some(pack.id), @@ -724,14 +684,10 @@ async fn test_update_description() { .await .unwrap(); - let runtime = RuntimeFixture::new_unique( - Some(pack.id), - Some(pack.r#ref.clone()), - "python3", - ) - .create(&pool) - .await - .unwrap(); + let runtime = RuntimeFixture::new_unique(Some(pack.id), Some(pack.r#ref.clone()), "python3") + .create(&pool) + .await + .unwrap(); let sensor = SensorFixture::new_unique( Some(pack.id), @@ -772,14 +728,10 @@ async fn test_update_entrypoint() { .await .unwrap(); - let runtime = RuntimeFixture::new_unique( - Some(pack.id), - Some(pack.r#ref.clone()), - "python3", - ) - .create(&pool) - .await - .unwrap(); + let runtime = RuntimeFixture::new_unique(Some(pack.id), Some(pack.r#ref.clone()), "python3") + .create(&pool) + .await + .unwrap(); let sensor = SensorFixture::new_unique( Some(pack.id), @@ -820,14 +772,10 @@ async fn test_update_enabled_status() { .await .unwrap(); - let runtime = RuntimeFixture::new_unique( - Some(pack.id), - Some(pack.r#ref.clone()), - "python3", - ) - .create(&pool) - .await - .unwrap(); + let runtime = RuntimeFixture::new_unique(Some(pack.id), Some(pack.r#ref.clone()), "python3") + .create(&pool) + .await + .unwrap(); let sensor = SensorFixture::new_unique( Some(pack.id), @@ -883,14 +831,10 @@ async fn test_update_param_schema() { .await .unwrap(); - let runtime = RuntimeFixture::new_unique( - Some(pack.id), - Some(pack.r#ref.clone()), - "python3", - ) - .create(&pool) - .await - .unwrap(); + let runtime = RuntimeFixture::new_unique(Some(pack.id), Some(pack.r#ref.clone()), "python3") + .create(&pool) + .await + .unwrap(); let sensor = SensorFixture::new_unique( Some(pack.id), @@ -941,14 +885,10 @@ async fn test_update_multiple_fields() { .await .unwrap(); - let runtime = RuntimeFixture::new_unique( - Some(pack.id), - Some(pack.r#ref.clone()), - "python3", - ) - .create(&pool) - .await - .unwrap(); + let runtime = RuntimeFixture::new_unique(Some(pack.id), Some(pack.r#ref.clone()), "python3") + .create(&pool) + .await + .unwrap(); let sensor = SensorFixture::new_unique( Some(pack.id), @@ -969,6 +909,7 @@ async fn test_update_multiple_fields() { entrypoint: Some("sensors/multi.py".to_string()), enabled: Some(false), param_schema: Some(json!({"type": "object"})), + ..Default::default() }; let updated = SensorRepository::update(&pool, sensor.id, input) @@ -996,14 +937,10 @@ async fn test_update_no_changes() { .await .unwrap(); - let runtime = RuntimeFixture::new_unique( - Some(pack.id), - Some(pack.r#ref.clone()), - "python3", - ) - .create(&pool) - .await - .unwrap(); + let runtime = RuntimeFixture::new_unique(Some(pack.id), Some(pack.r#ref.clone()), "python3") + .create(&pool) + .await + .unwrap(); let sensor = SensorFixture::new_unique( Some(pack.id), @@ -1066,14 +1003,10 @@ async fn test_delete_existing_sensor() { .await .unwrap(); - let runtime = RuntimeFixture::new_unique( - Some(pack.id), - Some(pack.r#ref.clone()), - "python3", - ) - .create(&pool) - .await - .unwrap(); + let runtime = RuntimeFixture::new_unique(Some(pack.id), Some(pack.r#ref.clone()), "python3") + .create(&pool) + .await + .unwrap(); let sensor = SensorFixture::new_unique( Some(pack.id), @@ -1120,14 +1053,10 @@ async fn test_delete_sensor_when_pack_deleted() { .await .unwrap(); - let runtime = RuntimeFixture::new_unique( - Some(pack.id), - Some(pack.r#ref.clone()), - "python3", - ) - .create(&pool) - .await - .unwrap(); + let runtime = RuntimeFixture::new_unique(Some(pack.id), Some(pack.r#ref.clone()), "python3") + .create(&pool) + .await + .unwrap(); let sensor = SensorFixture::new_unique( Some(pack.id), @@ -1167,14 +1096,10 @@ async fn test_delete_sensor_when_trigger_deleted() { .await .unwrap(); - let runtime = RuntimeFixture::new_unique( - Some(pack.id), - Some(pack.r#ref.clone()), - "python3", - ) - .create(&pool) - .await - .unwrap(); + let runtime = RuntimeFixture::new_unique(Some(pack.id), Some(pack.r#ref.clone()), "python3") + .create(&pool) + .await + .unwrap(); let sensor = SensorFixture::new_unique( Some(pack.id), @@ -1214,14 +1139,10 @@ async fn test_delete_sensor_when_runtime_deleted() { .await .unwrap(); - let runtime = RuntimeFixture::new_unique( - Some(pack.id), - Some(pack.r#ref.clone()), - "python3", - ) - .create(&pool) - .await - .unwrap(); + let runtime = RuntimeFixture::new_unique(Some(pack.id), Some(pack.r#ref.clone()), "python3") + .create(&pool) + .await + .unwrap(); let sensor = SensorFixture::new_unique( Some(pack.id), @@ -1270,14 +1191,10 @@ async fn test_find_by_trigger() { .await .unwrap(); - let runtime = RuntimeFixture::new_unique( - Some(pack.id), - Some(pack.r#ref.clone()), - "python3", - ) - .create(&pool) - .await - .unwrap(); + let runtime = RuntimeFixture::new_unique(Some(pack.id), Some(pack.r#ref.clone()), "python3") + .create(&pool) + .await + .unwrap(); // Create sensors for trigger1 let sensor1 = SensorFixture::new_unique( @@ -1364,14 +1281,10 @@ async fn test_find_enabled() { .await .unwrap(); - let runtime = RuntimeFixture::new_unique( - Some(pack.id), - Some(pack.r#ref.clone()), - "python3", - ) - .create(&pool) - .await - .unwrap(); + let runtime = RuntimeFixture::new_unique(Some(pack.id), Some(pack.r#ref.clone()), "python3") + .create(&pool) + .await + .unwrap(); // Create enabled sensor let enabled_sensor = SensorFixture::new_unique( @@ -1424,14 +1337,10 @@ async fn test_find_enabled_empty() { .await .unwrap(); - let runtime = RuntimeFixture::new_unique( - Some(pack.id), - Some(pack.r#ref.clone()), - "python3", - ) - .create(&pool) - .await - .unwrap(); + let runtime = RuntimeFixture::new_unique(Some(pack.id), Some(pack.r#ref.clone()), "python3") + .create(&pool) + .await + .unwrap(); // Create only disabled sensor let disabled = SensorFixture::new_unique( @@ -1477,23 +1386,15 @@ async fn test_find_by_pack() { .await .unwrap(); - let runtime1 = RuntimeFixture::new_unique( - Some(pack1.id), - Some(pack1.r#ref.clone()), - "python3", - ) - .create(&pool) - .await - .unwrap(); + let runtime1 = RuntimeFixture::new_unique(Some(pack1.id), Some(pack1.r#ref.clone()), "python3") + .create(&pool) + .await + .unwrap(); - let runtime2 = RuntimeFixture::new_unique( - Some(pack2.id), - Some(pack2.r#ref.clone()), - "nodejs", - ) - .create(&pool) - .await - .unwrap(); + let runtime2 = RuntimeFixture::new_unique(Some(pack2.id), Some(pack2.r#ref.clone()), "nodejs") + .create(&pool) + .await + .unwrap(); // Create sensors for pack1 let sensor1 = SensorFixture::new_unique( @@ -1580,14 +1481,10 @@ async fn test_created_timestamp_set_automatically() { .await .unwrap(); - let runtime = RuntimeFixture::new_unique( - Some(pack.id), - Some(pack.r#ref.clone()), - "python3", - ) - .create(&pool) - .await - .unwrap(); + let runtime = RuntimeFixture::new_unique(Some(pack.id), Some(pack.r#ref.clone()), "python3") + .create(&pool) + .await + .unwrap(); let before = chrono::Utc::now(); @@ -1625,14 +1522,10 @@ async fn test_updated_timestamp_changes_on_update() { .await .unwrap(); - let runtime = RuntimeFixture::new_unique( - Some(pack.id), - Some(pack.r#ref.clone()), - "python3", - ) - .create(&pool) - .await - .unwrap(); + let runtime = RuntimeFixture::new_unique(Some(pack.id), Some(pack.r#ref.clone()), "python3") + .create(&pool) + .await + .unwrap(); let sensor = SensorFixture::new_unique( Some(pack.id), @@ -1679,14 +1572,10 @@ async fn test_updated_timestamp_unchanged_on_read() { .await .unwrap(); - let runtime = RuntimeFixture::new_unique( - Some(pack.id), - Some(pack.r#ref.clone()), - "python3", - ) - .create(&pool) - .await - .unwrap(); + let runtime = RuntimeFixture::new_unique(Some(pack.id), Some(pack.r#ref.clone()), "python3") + .create(&pool) + .await + .unwrap(); let sensor = SensorFixture::new_unique( Some(pack.id), @@ -1733,14 +1622,10 @@ async fn test_param_schema_complex_structure() { .await .unwrap(); - let runtime = RuntimeFixture::new_unique( - Some(pack.id), - Some(pack.r#ref.clone()), - "python3", - ) - .create(&pool) - .await - .unwrap(); + let runtime = RuntimeFixture::new_unique(Some(pack.id), Some(pack.r#ref.clone()), "python3") + .create(&pool) + .await + .unwrap(); let complex_schema = json!({ "type": "object", @@ -1811,14 +1696,10 @@ async fn test_param_schema_can_be_null() { .await .unwrap(); - let runtime = RuntimeFixture::new_unique( - Some(pack.id), - Some(pack.r#ref.clone()), - "python3", - ) - .create(&pool) - .await - .unwrap(); + let runtime = RuntimeFixture::new_unique(Some(pack.id), Some(pack.r#ref.clone()), "python3") + .create(&pool) + .await + .unwrap(); let sensor = SensorFixture::new_unique( Some(pack.id), diff --git a/crates/sensor/src/sensor_manager.rs b/crates/sensor/src/sensor_manager.rs index c70f3c5..37ed782 100644 --- a/crates/sensor/src/sensor_manager.rs +++ b/crates/sensor/src/sensor_manager.rs @@ -232,6 +232,11 @@ impl SensorManager { let exec_config = runtime.parsed_execution_config(); let rt_name = runtime.name.to_lowercase(); + info!( + "Sensor {} runtime details: id={}, ref='{}', name='{}', execution_config={}", + sensor.r#ref, runtime.id, runtime.r#ref, runtime.name, runtime.execution_config + ); + // Resolve the interpreter: check for a virtualenv/node_modules first, // then fall back to the system interpreter. let pack_dir = std::path::PathBuf::from(&self.inner.packs_base_dir).join(pack_ref); @@ -255,8 +260,13 @@ impl SensorManager { || interpreter_binary == "none"; info!( - "Sensor {} runtime={} interpreter={} native={}", - sensor.r#ref, rt_name, interpreter_binary, is_native + "Sensor {} runtime={} (ref={}) interpreter='{}' native={} env_dir_exists={}", + sensor.r#ref, + rt_name, + runtime.r#ref, + interpreter_binary, + is_native, + env_dir.exists() ); info!("Starting standalone sensor process: {}", sensor_script); @@ -289,8 +299,8 @@ impl SensorManager { // Build the command: use the interpreter for non-native runtimes, // execute the script directly for native binaries. - let mut cmd = if is_native { - Command::new(&sensor_script) + let (spawn_binary, mut cmd) = if is_native { + (sensor_script.clone(), Command::new(&sensor_script)) } else { let resolved_interpreter = exec_config.resolve_interpreter_with_env(&pack_dir, env_dir_opt); @@ -299,15 +309,49 @@ impl SensorManager { resolved_interpreter.display(), sensor.r#ref ); - let mut c = Command::new(resolved_interpreter); + let binary_str = resolved_interpreter.display().to_string(); + let mut c = Command::new(&resolved_interpreter); // Pass any extra interpreter args (e.g., -u for unbuffered Python) for arg in &exec_config.interpreter.args { c.arg(arg); } c.arg(&sensor_script); - c + (binary_str, c) }; + // Log the full command for diagnostics + info!( + "Spawning sensor {}: binary='{}' is_native={} script='{}'", + sensor.r#ref, spawn_binary, is_native, sensor_script + ); + + // Pre-flight check: verify the binary exists and is accessible + let spawn_path = std::path::Path::new(&spawn_binary); + if spawn_path.is_absolute() || spawn_path.components().count() > 1 { + // Absolute or relative path with directory component — check it directly + match std::fs::metadata(spawn_path) { + Ok(meta) => { + use std::os::unix::fs::PermissionsExt; + let mode = meta.permissions().mode(); + let is_exec = mode & 0o111 != 0; + if !is_exec { + error!( + "Binary '{}' exists but is not executable (mode: {:o}). \ + Sensor runtime ref='{}', execution_config interpreter='{}'.", + spawn_binary, mode, runtime.r#ref, interpreter_binary + ); + } + } + Err(e) => { + error!( + "Cannot access binary '{}': {}. \ + Sensor runtime ref='{}', execution_config interpreter='{}'.", + spawn_binary, e, runtime.r#ref, interpreter_binary + ); + } + } + } + // Start the standalone sensor with token and configuration // Pass sensor ref (e.g., "core.interval_timer_sensor") for proper identification let mut child = cmd @@ -323,7 +367,19 @@ impl SensorManager { .stdout(Stdio::piped()) .stderr(Stdio::piped()) .spawn() - .map_err(|e| anyhow!("Failed to start standalone sensor process: {}", e))?; + .map_err(|e| { + anyhow!( + "Failed to start sensor process for '{}': {} \ + (binary='{}', is_native={}, runtime_ref='{}', \ + interpreter_config='{}')", + sensor.r#ref, + e, + spawn_binary, + is_native, + runtime.r#ref, + interpreter_binary + ) + })?; // Get stdout and stderr for logging (standalone sensors output JSON logs to stdout) let stdout = child diff --git a/migrations/20260220000000_relax_key_ref_format.sql b/migrations/20260220000000_relax_key_ref_format.sql new file mode 100644 index 0000000..1f33281 --- /dev/null +++ b/migrations/20260220000000_relax_key_ref_format.sql @@ -0,0 +1,7 @@ +-- Migration: Relax key ref format constraint +-- Description: Allow multi-segment dot-separated key refs (e.g., "pack.prefix.name") +-- The original constraint only allowed at most one dot: '^([^.]+\.)?[^.]+$' +-- Sensors create refs like "python_example.counter.rule_ref" which have multiple dots. + +ALTER TABLE key DROP CONSTRAINT key_ref_format; +ALTER TABLE key ADD CONSTRAINT key_ref_format CHECK (ref ~ '^[^.]+(\.[^.]+)*$'); diff --git a/scripts/load_core_pack.py b/scripts/load_core_pack.py index c7b3e37..13df027 100755 --- a/scripts/load_core_pack.py +++ b/scripts/load_core_pack.py @@ -528,6 +528,8 @@ class PackLoader: label = EXCLUDED.label, description = EXCLUDED.description, entrypoint = EXCLUDED.entrypoint, + runtime = EXCLUDED.runtime, + runtime_ref = EXCLUDED.runtime_ref, trigger = EXCLUDED.trigger, trigger_ref = EXCLUDED.trigger_ref, enabled = EXCLUDED.enabled, diff --git a/work-summary/2026-02-20-sensor-runtime-upsert-fix.md b/work-summary/2026-02-20-sensor-runtime-upsert-fix.md new file mode 100644 index 0000000..20fbc2a --- /dev/null +++ b/work-summary/2026-02-20-sensor-runtime-upsert-fix.md @@ -0,0 +1,45 @@ +# Fix: Python Sensor "Permission Denied" — Stale Runtime Assignment + +**Date**: 2026-02-20 + +## Problem + +Python-based sensors failed to start with `Permission denied (os error 13)`. + +## Root Cause + +The sensor's runtime in the database pointed to `core.builtin` (empty `execution_config`) instead of `core.python`. This caused `is_native=true`, making the sensor manager try to execute the `.py` script directly — which fails without the execute bit. + +The stale assignment persisted because the pack component loader **skipped** existing sensors on re-registration instead of updating them. Once a sensor was created with the wrong runtime, there was no way to correct it short of deleting the pack entirely. + +**DB evidence**: `SELECT runtime, runtime_ref FROM sensor` → `runtime=4, runtime_ref=core.builtin` (should be `runtime=3, runtime_ref=core.python`). + +## Changes + +### 1. Sensor upsert on re-registration (`crates/common/src/pack_registry/loader.rs`) +- Changed `load_sensors` from skip-if-exists to upsert: existing sensors are updated with fresh metadata from the YAML (runtime, entrypoint, trigger, config, etc.) +- Re-registering a pack now corrects stale runtime assignments + +### 2. `UpdateSensorInput` extended (`crates/common/src/repositories/trigger.rs`) +- Added `runtime`, `runtime_ref`, `trigger`, `trigger_ref`, and `config` fields so the update path can correct all sensor metadata +- Updated all callsites in `crates/api/src/routes/triggers.rs` and tests + +### 3. Registration-time validation (`crates/common/src/pack_registry/loader.rs`) +- Warns if a non-native `runner_type` (e.g., `python`) resolves to runtime ID 0 (not found) +- Warns if the resolved runtime has empty/missing `execution_config` + +### 4. Sensor manager diagnostics (`crates/sensor/src/sensor_manager.rs`) +- Logs full runtime details (id, ref, name, raw `execution_config` JSON) +- Logs `env_dir_exists` status and resolved interpreter path +- Pre-flight check: verifies binary exists and has execute permission before spawn +- Error message includes binary path, `is_native` flag, and runtime ref + +### 5. Python loader consistency (`scripts/load_core_pack.py`) +- Added `runtime` and `runtime_ref` to sensor `ON CONFLICT DO UPDATE` clause + +## Verification + +After rebuilding and re-registering the pack with `force=true`: +- Sensor runtime corrected: `core.builtin` → `core.python` +- Sensor started successfully with venv interpreter at `/opt/attune/runtime_envs/python_example/python/bin/python3` +- Counter sensor fully operational (RabbitMQ connected, rules bootstrapped) \ No newline at end of file