Compare commits

..

2 Commits

Author SHA1 Message Date
d629da32fa sql migration rollup 2026-02-20 14:25:43 -06:00
a84c07082c sensors using keys 2026-02-20 14:11:06 -06:00
9 changed files with 410 additions and 261 deletions

View File

@@ -148,8 +148,42 @@ impl From<sqlx::Error> for ApiError {
match err { match err {
sqlx::Error::RowNotFound => ApiError::NotFound("Resource not found".to_string()), sqlx::Error::RowNotFound => ApiError::NotFound("Resource not found".to_string()),
sqlx::Error::Database(db_err) => { sqlx::Error::Database(db_err) => {
// Check for unique constraint violations // PostgreSQL error codes:
if let Some(constraint) = db_err.constraint() { // 23505 = unique_violation → 409 Conflict
// 23503 = foreign_key_violation → 422 Unprocessable Entity
// 23514 = check_violation → 422 Unprocessable Entity
// P0001 = raise_exception → 400 Bad Request (trigger-raised errors)
let pg_code = db_err.code().map(|c| c.to_string()).unwrap_or_default();
if pg_code == "23505" {
// Unique constraint violation — duplicate key
let detail = db_err
.constraint()
.map(|c| format!(" ({})", c))
.unwrap_or_default();
ApiError::Conflict(format!("Already exists{}", detail))
} else if pg_code == "23503" {
// Foreign key violation — the referenced row doesn't exist
let detail = db_err
.constraint()
.map(|c| format!(" ({})", c))
.unwrap_or_default();
ApiError::UnprocessableEntity(format!(
"Referenced entity does not exist{}",
detail
))
} else if pg_code == "23514" {
// CHECK constraint violation — value doesn't meet constraint
let detail = db_err
.constraint()
.map(|c| format!(": {}", c))
.unwrap_or_default();
ApiError::UnprocessableEntity(format!("Validation constraint failed{}", detail))
} else if pg_code == "P0001" {
// RAISE EXCEPTION from a trigger or function
// Extract the human-readable message from the exception
let msg = db_err.message().to_string();
ApiError::BadRequest(msg)
} else if let Some(constraint) = db_err.constraint() {
ApiError::Conflict(format!("Constraint violation: {}", constraint)) ApiError::Conflict(format!("Constraint violation: {}", constraint))
} else { } else {
ApiError::DatabaseError(format!("Database error: {}", db_err)) ApiError::DatabaseError(format!("Database error: {}", db_err))

View File

@@ -719,8 +719,13 @@ pub async fn update_sensor(
label: request.label, label: request.label,
description: request.description, description: request.description,
entrypoint: request.entrypoint, entrypoint: request.entrypoint,
runtime: None,
runtime_ref: None,
trigger: None,
trigger_ref: None,
enabled: request.enabled, enabled: request.enabled,
param_schema: request.param_schema, param_schema: request.param_schema,
config: None,
}; };
let sensor = SensorRepository::update(&state.db, existing_sensor.id, update_input).await?; let sensor = SensorRepository::update(&state.db, existing_sensor.id, update_input).await?;
@@ -799,8 +804,13 @@ pub async fn enable_sensor(
label: None, label: None,
description: None, description: None,
entrypoint: None, entrypoint: None,
runtime: None,
runtime_ref: None,
trigger: None,
trigger_ref: None,
enabled: Some(true), enabled: Some(true),
param_schema: None, param_schema: None,
config: None,
}; };
let sensor = SensorRepository::update(&state.db, existing_sensor.id, update_input).await?; let sensor = SensorRepository::update(&state.db, existing_sensor.id, update_input).await?;
@@ -840,8 +850,13 @@ pub async fn disable_sensor(
label: None, label: None,
description: None, description: None,
entrypoint: None, entrypoint: None,
runtime: None,
runtime_ref: None,
trigger: None,
trigger_ref: None,
enabled: Some(false), enabled: Some(false),
param_schema: None, param_schema: None,
config: None,
}; };
let sensor = SensorRepository::update(&state.db, existing_sensor.id, update_input).await?; let sensor = SensorRepository::update(&state.db, existing_sensor.id, update_input).await?;

View File

@@ -23,7 +23,7 @@ use crate::repositories::runtime::{CreateRuntimeInput, RuntimeRepository};
use crate::repositories::trigger::{ use crate::repositories::trigger::{
CreateSensorInput, CreateTriggerInput, SensorRepository, TriggerRepository, CreateSensorInput, CreateTriggerInput, SensorRepository, TriggerRepository,
}; };
use crate::repositories::{Create, FindByRef}; use crate::repositories::{Create, FindById, FindByRef, Update};
/// Result of loading pack components into the database. /// Result of loading pack components into the database.
#[derive(Debug, Default)] #[derive(Debug, Default)]
@@ -514,6 +514,47 @@ impl<'a> PackComponentLoader<'a> {
.unwrap_or("native"); .unwrap_or("native");
let (sensor_runtime_id, sensor_runtime_ref) = self.resolve_runtime(runner_type).await?; let (sensor_runtime_id, sensor_runtime_ref) = self.resolve_runtime(runner_type).await?;
// Validate: if the runner_type suggests an interpreted runtime (not native)
// but we couldn't resolve it, or it resolved to a runtime with no
// execution_config, warn at registration time rather than failing
// opaquely at sensor startup with "Permission denied".
let is_native_runner = matches!(
runner_type.to_lowercase().as_str(),
"native" | "builtin" | "standalone"
);
if sensor_runtime_id == 0 && !is_native_runner {
let msg = format!(
"Sensor '{}' declares runner_type '{}' but no matching runtime \
was found in the database. The sensor will not be able to start. \
Ensure the core pack (with runtimes) is loaded before registering \
packs that depend on its runtimes.",
filename, runner_type
);
warn!("{}", msg);
result.warnings.push(msg);
} else if sensor_runtime_id != 0 && !is_native_runner {
// Verify the resolved runtime has a non-empty execution_config
if let Some(runtime) =
RuntimeRepository::find_by_id(self.pool, sensor_runtime_id).await?
{
let exec_config = runtime.parsed_execution_config();
if exec_config.interpreter.binary.is_empty()
|| exec_config.interpreter.binary == "native"
|| exec_config.interpreter.binary == "none"
{
let msg = format!(
"Sensor '{}' declares runner_type '{}' (resolved to runtime '{}') \
but that runtime has no interpreter configured in its \
execution_config. The sensor will fail to start. \
Check the runtime definition for '{}'.",
filename, runner_type, runtime.r#ref, runtime.r#ref
);
warn!("{}", msg);
result.warnings.push(msg);
}
}
}
let sensor_ref = match data.get("ref").and_then(|v| v.as_str()) { let sensor_ref = match data.get("ref").and_then(|v| v.as_str()) {
Some(r) => r.to_string(), Some(r) => r.to_string(),
None => { None => {
@@ -524,16 +565,6 @@ impl<'a> PackComponentLoader<'a> {
} }
}; };
// Check if sensor already exists
if let Some(existing) = SensorRepository::find_by_ref(self.pool, &sensor_ref).await? {
info!(
"Sensor '{}' already exists (ID: {}), skipping",
sensor_ref, existing.id
);
result.sensors_skipped += 1;
continue;
}
let name = extract_name_from_ref(&sensor_ref); let name = extract_name_from_ref(&sensor_ref);
let label = data let label = data
.get("label") .get("label")
@@ -570,6 +601,41 @@ impl<'a> PackComponentLoader<'a> {
.and_then(|v| serde_json::to_value(v).ok()) .and_then(|v| serde_json::to_value(v).ok())
.unwrap_or_else(|| serde_json::json!({})); .unwrap_or_else(|| serde_json::json!({}));
// Upsert: update existing sensors so re-registration corrects
// stale metadata (especially runtime assignments).
if let Some(existing) = SensorRepository::find_by_ref(self.pool, &sensor_ref).await? {
use crate::repositories::trigger::UpdateSensorInput;
let update_input = UpdateSensorInput {
label: Some(label),
description: Some(description),
entrypoint: Some(entrypoint),
runtime: Some(sensor_runtime_id),
runtime_ref: Some(sensor_runtime_ref.clone()),
trigger: Some(trigger_id.unwrap_or(existing.trigger)),
trigger_ref: Some(trigger_ref.unwrap_or(existing.trigger_ref.clone())),
enabled: Some(enabled),
param_schema,
config: Some(config),
};
match SensorRepository::update(self.pool, existing.id, update_input).await {
Ok(_) => {
info!(
"Updated sensor '{}' (ID: {}, runtime: {} → {})",
sensor_ref, existing.id, existing.runtime_ref, sensor_runtime_ref
);
result.sensors_loaded += 1;
}
Err(e) => {
let msg = format!("Failed to update sensor '{}': {}", sensor_ref, e);
warn!("{}", msg);
result.warnings.push(msg);
}
}
continue;
}
let input = CreateSensorInput { let input = CreateSensorInput {
r#ref: sensor_ref.clone(), r#ref: sensor_ref.clone(),
pack: Some(self.pack_id), pack: Some(self.pack_id),

View File

@@ -531,8 +531,13 @@ pub struct UpdateSensorInput {
pub label: Option<String>, pub label: Option<String>,
pub description: Option<String>, pub description: Option<String>,
pub entrypoint: Option<String>, pub entrypoint: Option<String>,
pub runtime: Option<Id>,
pub runtime_ref: Option<String>,
pub trigger: Option<Id>,
pub trigger_ref: Option<String>,
pub enabled: Option<bool>, pub enabled: Option<bool>,
pub param_schema: Option<JsonSchema>, pub param_schema: Option<JsonSchema>,
pub config: Option<JsonValue>,
} }
#[async_trait::async_trait] #[async_trait::async_trait]
@@ -688,6 +693,42 @@ impl Update for SensorRepository {
has_updates = true; has_updates = true;
} }
if let Some(runtime) = input.runtime {
if has_updates {
query.push(", ");
}
query.push("runtime = ");
query.push_bind(runtime);
has_updates = true;
}
if let Some(runtime_ref) = &input.runtime_ref {
if has_updates {
query.push(", ");
}
query.push("runtime_ref = ");
query.push_bind(runtime_ref);
has_updates = true;
}
if let Some(trigger) = input.trigger {
if has_updates {
query.push(", ");
}
query.push("trigger = ");
query.push_bind(trigger);
has_updates = true;
}
if let Some(trigger_ref) = &input.trigger_ref {
if has_updates {
query.push(", ");
}
query.push("trigger_ref = ");
query.push_bind(trigger_ref);
has_updates = true;
}
if let Some(param_schema) = &input.param_schema { if let Some(param_schema) = &input.param_schema {
if has_updates { if has_updates {
query.push(", "); query.push(", ");
@@ -697,6 +738,15 @@ impl Update for SensorRepository {
has_updates = true; has_updates = true;
} }
if let Some(config) = &input.config {
if has_updates {
query.push(", ");
}
query.push("config = ");
query.push_bind(config);
has_updates = true;
}
if !has_updates { if !has_updates {
// No updates requested, fetch and return existing entity // No updates requested, fetch and return existing entity
return Self::get_by_id(executor, id).await; return Self::get_by_id(executor, id).await;

View File

@@ -34,14 +34,10 @@ async fn test_create_sensor_minimal() {
.await .await
.unwrap(); .unwrap();
let runtime = RuntimeFixture::new_unique( let runtime = RuntimeFixture::new_unique(Some(pack.id), Some(pack.r#ref.clone()), "python3")
Some(pack.id), .create(&pool)
Some(pack.r#ref.clone()), .await
"python3", .unwrap();
)
.create(&pool)
.await
.unwrap();
// Create sensor // Create sensor
let sensor = SensorFixture::new_unique( let sensor = SensorFixture::new_unique(
@@ -85,14 +81,10 @@ async fn test_create_sensor_with_param_schema() {
.await .await
.unwrap(); .unwrap();
let runtime = RuntimeFixture::new_unique( let runtime = RuntimeFixture::new_unique(Some(pack.id), Some(pack.r#ref.clone()), "python3")
Some(pack.id), .create(&pool)
Some(pack.r#ref.clone()), .await
"python3", .unwrap();
)
.create(&pool)
.await
.unwrap();
let param_schema = json!({ let param_schema = json!({
"type": "object", "type": "object",
@@ -171,14 +163,10 @@ async fn test_create_sensor_duplicate_ref_fails() {
.await .await
.unwrap(); .unwrap();
let runtime = RuntimeFixture::new_unique( let runtime = RuntimeFixture::new_unique(Some(pack.id), Some(pack.r#ref.clone()), "python3")
Some(pack.id), .create(&pool)
Some(pack.r#ref.clone()), .await
"python3", .unwrap();
)
.create(&pool)
.await
.unwrap();
// Create first sensor // Create first sensor
let sensor_ref = format!("{}.duplicate_sensor", pack.r#ref); let sensor_ref = format!("{}.duplicate_sensor", pack.r#ref);
@@ -223,14 +211,10 @@ async fn test_create_sensor_invalid_ref_format_fails() {
.await .await
.unwrap(); .unwrap();
let runtime = RuntimeFixture::new_unique( let runtime = RuntimeFixture::new_unique(Some(pack.id), Some(pack.r#ref.clone()), "python3")
Some(pack.id), .create(&pool)
Some(pack.r#ref.clone()), .await
"python3", .unwrap();
)
.create(&pool)
.await
.unwrap();
// Try invalid ref formats // Try invalid ref formats
let invalid_refs = vec![ let invalid_refs = vec![
@@ -378,14 +362,10 @@ async fn test_find_by_id_exists() {
.await .await
.unwrap(); .unwrap();
let runtime = RuntimeFixture::new_unique( let runtime = RuntimeFixture::new_unique(Some(pack.id), Some(pack.r#ref.clone()), "python3")
Some(pack.id), .create(&pool)
Some(pack.r#ref.clone()), .await
"python3", .unwrap();
)
.create(&pool)
.await
.unwrap();
let sensor = SensorFixture::new_unique( let sensor = SensorFixture::new_unique(
Some(pack.id), Some(pack.id),
@@ -433,14 +413,10 @@ async fn test_get_by_id_exists() {
.await .await
.unwrap(); .unwrap();
let runtime = RuntimeFixture::new_unique( let runtime = RuntimeFixture::new_unique(Some(pack.id), Some(pack.r#ref.clone()), "python3")
Some(pack.id), .create(&pool)
Some(pack.r#ref.clone()), .await
"python3", .unwrap();
)
.create(&pool)
.await
.unwrap();
let sensor = SensorFixture::new_unique( let sensor = SensorFixture::new_unique(
Some(pack.id), Some(pack.id),
@@ -484,14 +460,10 @@ async fn test_find_by_ref_exists() {
.await .await
.unwrap(); .unwrap();
let runtime = RuntimeFixture::new_unique( let runtime = RuntimeFixture::new_unique(Some(pack.id), Some(pack.r#ref.clone()), "python3")
Some(pack.id), .create(&pool)
Some(pack.r#ref.clone()), .await
"python3", .unwrap();
)
.create(&pool)
.await
.unwrap();
let sensor = SensorFixture::new_unique( let sensor = SensorFixture::new_unique(
Some(pack.id), Some(pack.id),
@@ -540,14 +512,10 @@ async fn test_get_by_ref_exists() {
.await .await
.unwrap(); .unwrap();
let runtime = RuntimeFixture::new_unique( let runtime = RuntimeFixture::new_unique(Some(pack.id), Some(pack.r#ref.clone()), "python3")
Some(pack.id), .create(&pool)
Some(pack.r#ref.clone()), .await
"python3", .unwrap();
)
.create(&pool)
.await
.unwrap();
let sensor = SensorFixture::new_unique( let sensor = SensorFixture::new_unique(
Some(pack.id), Some(pack.id),
@@ -593,14 +561,10 @@ async fn test_list_all_sensors() {
.await .await
.unwrap(); .unwrap();
let runtime = RuntimeFixture::new_unique( let runtime = RuntimeFixture::new_unique(Some(pack.id), Some(pack.r#ref.clone()), "python3")
Some(pack.id), .create(&pool)
Some(pack.r#ref.clone()), .await
"python3", .unwrap();
)
.create(&pool)
.await
.unwrap();
// Create multiple sensors // Create multiple sensors
let _sensor1 = SensorFixture::new_unique( let _sensor1 = SensorFixture::new_unique(
@@ -668,14 +632,10 @@ async fn test_update_label() {
.await .await
.unwrap(); .unwrap();
let runtime = RuntimeFixture::new_unique( let runtime = RuntimeFixture::new_unique(Some(pack.id), Some(pack.r#ref.clone()), "python3")
Some(pack.id), .create(&pool)
Some(pack.r#ref.clone()), .await
"python3", .unwrap();
)
.create(&pool)
.await
.unwrap();
let sensor = SensorFixture::new_unique( let sensor = SensorFixture::new_unique(
Some(pack.id), Some(pack.id),
@@ -724,14 +684,10 @@ async fn test_update_description() {
.await .await
.unwrap(); .unwrap();
let runtime = RuntimeFixture::new_unique( let runtime = RuntimeFixture::new_unique(Some(pack.id), Some(pack.r#ref.clone()), "python3")
Some(pack.id), .create(&pool)
Some(pack.r#ref.clone()), .await
"python3", .unwrap();
)
.create(&pool)
.await
.unwrap();
let sensor = SensorFixture::new_unique( let sensor = SensorFixture::new_unique(
Some(pack.id), Some(pack.id),
@@ -772,14 +728,10 @@ async fn test_update_entrypoint() {
.await .await
.unwrap(); .unwrap();
let runtime = RuntimeFixture::new_unique( let runtime = RuntimeFixture::new_unique(Some(pack.id), Some(pack.r#ref.clone()), "python3")
Some(pack.id), .create(&pool)
Some(pack.r#ref.clone()), .await
"python3", .unwrap();
)
.create(&pool)
.await
.unwrap();
let sensor = SensorFixture::new_unique( let sensor = SensorFixture::new_unique(
Some(pack.id), Some(pack.id),
@@ -820,14 +772,10 @@ async fn test_update_enabled_status() {
.await .await
.unwrap(); .unwrap();
let runtime = RuntimeFixture::new_unique( let runtime = RuntimeFixture::new_unique(Some(pack.id), Some(pack.r#ref.clone()), "python3")
Some(pack.id), .create(&pool)
Some(pack.r#ref.clone()), .await
"python3", .unwrap();
)
.create(&pool)
.await
.unwrap();
let sensor = SensorFixture::new_unique( let sensor = SensorFixture::new_unique(
Some(pack.id), Some(pack.id),
@@ -883,14 +831,10 @@ async fn test_update_param_schema() {
.await .await
.unwrap(); .unwrap();
let runtime = RuntimeFixture::new_unique( let runtime = RuntimeFixture::new_unique(Some(pack.id), Some(pack.r#ref.clone()), "python3")
Some(pack.id), .create(&pool)
Some(pack.r#ref.clone()), .await
"python3", .unwrap();
)
.create(&pool)
.await
.unwrap();
let sensor = SensorFixture::new_unique( let sensor = SensorFixture::new_unique(
Some(pack.id), Some(pack.id),
@@ -941,14 +885,10 @@ async fn test_update_multiple_fields() {
.await .await
.unwrap(); .unwrap();
let runtime = RuntimeFixture::new_unique( let runtime = RuntimeFixture::new_unique(Some(pack.id), Some(pack.r#ref.clone()), "python3")
Some(pack.id), .create(&pool)
Some(pack.r#ref.clone()), .await
"python3", .unwrap();
)
.create(&pool)
.await
.unwrap();
let sensor = SensorFixture::new_unique( let sensor = SensorFixture::new_unique(
Some(pack.id), Some(pack.id),
@@ -969,6 +909,7 @@ async fn test_update_multiple_fields() {
entrypoint: Some("sensors/multi.py".to_string()), entrypoint: Some("sensors/multi.py".to_string()),
enabled: Some(false), enabled: Some(false),
param_schema: Some(json!({"type": "object"})), param_schema: Some(json!({"type": "object"})),
..Default::default()
}; };
let updated = SensorRepository::update(&pool, sensor.id, input) let updated = SensorRepository::update(&pool, sensor.id, input)
@@ -996,14 +937,10 @@ async fn test_update_no_changes() {
.await .await
.unwrap(); .unwrap();
let runtime = RuntimeFixture::new_unique( let runtime = RuntimeFixture::new_unique(Some(pack.id), Some(pack.r#ref.clone()), "python3")
Some(pack.id), .create(&pool)
Some(pack.r#ref.clone()), .await
"python3", .unwrap();
)
.create(&pool)
.await
.unwrap();
let sensor = SensorFixture::new_unique( let sensor = SensorFixture::new_unique(
Some(pack.id), Some(pack.id),
@@ -1066,14 +1003,10 @@ async fn test_delete_existing_sensor() {
.await .await
.unwrap(); .unwrap();
let runtime = RuntimeFixture::new_unique( let runtime = RuntimeFixture::new_unique(Some(pack.id), Some(pack.r#ref.clone()), "python3")
Some(pack.id), .create(&pool)
Some(pack.r#ref.clone()), .await
"python3", .unwrap();
)
.create(&pool)
.await
.unwrap();
let sensor = SensorFixture::new_unique( let sensor = SensorFixture::new_unique(
Some(pack.id), Some(pack.id),
@@ -1120,14 +1053,10 @@ async fn test_delete_sensor_when_pack_deleted() {
.await .await
.unwrap(); .unwrap();
let runtime = RuntimeFixture::new_unique( let runtime = RuntimeFixture::new_unique(Some(pack.id), Some(pack.r#ref.clone()), "python3")
Some(pack.id), .create(&pool)
Some(pack.r#ref.clone()), .await
"python3", .unwrap();
)
.create(&pool)
.await
.unwrap();
let sensor = SensorFixture::new_unique( let sensor = SensorFixture::new_unique(
Some(pack.id), Some(pack.id),
@@ -1167,14 +1096,10 @@ async fn test_delete_sensor_when_trigger_deleted() {
.await .await
.unwrap(); .unwrap();
let runtime = RuntimeFixture::new_unique( let runtime = RuntimeFixture::new_unique(Some(pack.id), Some(pack.r#ref.clone()), "python3")
Some(pack.id), .create(&pool)
Some(pack.r#ref.clone()), .await
"python3", .unwrap();
)
.create(&pool)
.await
.unwrap();
let sensor = SensorFixture::new_unique( let sensor = SensorFixture::new_unique(
Some(pack.id), Some(pack.id),
@@ -1214,14 +1139,10 @@ async fn test_delete_sensor_when_runtime_deleted() {
.await .await
.unwrap(); .unwrap();
let runtime = RuntimeFixture::new_unique( let runtime = RuntimeFixture::new_unique(Some(pack.id), Some(pack.r#ref.clone()), "python3")
Some(pack.id), .create(&pool)
Some(pack.r#ref.clone()), .await
"python3", .unwrap();
)
.create(&pool)
.await
.unwrap();
let sensor = SensorFixture::new_unique( let sensor = SensorFixture::new_unique(
Some(pack.id), Some(pack.id),
@@ -1270,14 +1191,10 @@ async fn test_find_by_trigger() {
.await .await
.unwrap(); .unwrap();
let runtime = RuntimeFixture::new_unique( let runtime = RuntimeFixture::new_unique(Some(pack.id), Some(pack.r#ref.clone()), "python3")
Some(pack.id), .create(&pool)
Some(pack.r#ref.clone()), .await
"python3", .unwrap();
)
.create(&pool)
.await
.unwrap();
// Create sensors for trigger1 // Create sensors for trigger1
let sensor1 = SensorFixture::new_unique( let sensor1 = SensorFixture::new_unique(
@@ -1364,14 +1281,10 @@ async fn test_find_enabled() {
.await .await
.unwrap(); .unwrap();
let runtime = RuntimeFixture::new_unique( let runtime = RuntimeFixture::new_unique(Some(pack.id), Some(pack.r#ref.clone()), "python3")
Some(pack.id), .create(&pool)
Some(pack.r#ref.clone()), .await
"python3", .unwrap();
)
.create(&pool)
.await
.unwrap();
// Create enabled sensor // Create enabled sensor
let enabled_sensor = SensorFixture::new_unique( let enabled_sensor = SensorFixture::new_unique(
@@ -1424,14 +1337,10 @@ async fn test_find_enabled_empty() {
.await .await
.unwrap(); .unwrap();
let runtime = RuntimeFixture::new_unique( let runtime = RuntimeFixture::new_unique(Some(pack.id), Some(pack.r#ref.clone()), "python3")
Some(pack.id), .create(&pool)
Some(pack.r#ref.clone()), .await
"python3", .unwrap();
)
.create(&pool)
.await
.unwrap();
// Create only disabled sensor // Create only disabled sensor
let disabled = SensorFixture::new_unique( let disabled = SensorFixture::new_unique(
@@ -1477,23 +1386,15 @@ async fn test_find_by_pack() {
.await .await
.unwrap(); .unwrap();
let runtime1 = RuntimeFixture::new_unique( let runtime1 = RuntimeFixture::new_unique(Some(pack1.id), Some(pack1.r#ref.clone()), "python3")
Some(pack1.id), .create(&pool)
Some(pack1.r#ref.clone()), .await
"python3", .unwrap();
)
.create(&pool)
.await
.unwrap();
let runtime2 = RuntimeFixture::new_unique( let runtime2 = RuntimeFixture::new_unique(Some(pack2.id), Some(pack2.r#ref.clone()), "nodejs")
Some(pack2.id), .create(&pool)
Some(pack2.r#ref.clone()), .await
"nodejs", .unwrap();
)
.create(&pool)
.await
.unwrap();
// Create sensors for pack1 // Create sensors for pack1
let sensor1 = SensorFixture::new_unique( let sensor1 = SensorFixture::new_unique(
@@ -1580,14 +1481,10 @@ async fn test_created_timestamp_set_automatically() {
.await .await
.unwrap(); .unwrap();
let runtime = RuntimeFixture::new_unique( let runtime = RuntimeFixture::new_unique(Some(pack.id), Some(pack.r#ref.clone()), "python3")
Some(pack.id), .create(&pool)
Some(pack.r#ref.clone()), .await
"python3", .unwrap();
)
.create(&pool)
.await
.unwrap();
let before = chrono::Utc::now(); let before = chrono::Utc::now();
@@ -1625,14 +1522,10 @@ async fn test_updated_timestamp_changes_on_update() {
.await .await
.unwrap(); .unwrap();
let runtime = RuntimeFixture::new_unique( let runtime = RuntimeFixture::new_unique(Some(pack.id), Some(pack.r#ref.clone()), "python3")
Some(pack.id), .create(&pool)
Some(pack.r#ref.clone()), .await
"python3", .unwrap();
)
.create(&pool)
.await
.unwrap();
let sensor = SensorFixture::new_unique( let sensor = SensorFixture::new_unique(
Some(pack.id), Some(pack.id),
@@ -1679,14 +1572,10 @@ async fn test_updated_timestamp_unchanged_on_read() {
.await .await
.unwrap(); .unwrap();
let runtime = RuntimeFixture::new_unique( let runtime = RuntimeFixture::new_unique(Some(pack.id), Some(pack.r#ref.clone()), "python3")
Some(pack.id), .create(&pool)
Some(pack.r#ref.clone()), .await
"python3", .unwrap();
)
.create(&pool)
.await
.unwrap();
let sensor = SensorFixture::new_unique( let sensor = SensorFixture::new_unique(
Some(pack.id), Some(pack.id),
@@ -1733,14 +1622,10 @@ async fn test_param_schema_complex_structure() {
.await .await
.unwrap(); .unwrap();
let runtime = RuntimeFixture::new_unique( let runtime = RuntimeFixture::new_unique(Some(pack.id), Some(pack.r#ref.clone()), "python3")
Some(pack.id), .create(&pool)
Some(pack.r#ref.clone()), .await
"python3", .unwrap();
)
.create(&pool)
.await
.unwrap();
let complex_schema = json!({ let complex_schema = json!({
"type": "object", "type": "object",
@@ -1811,14 +1696,10 @@ async fn test_param_schema_can_be_null() {
.await .await
.unwrap(); .unwrap();
let runtime = RuntimeFixture::new_unique( let runtime = RuntimeFixture::new_unique(Some(pack.id), Some(pack.r#ref.clone()), "python3")
Some(pack.id), .create(&pool)
Some(pack.r#ref.clone()), .await
"python3", .unwrap();
)
.create(&pool)
.await
.unwrap();
let sensor = SensorFixture::new_unique( let sensor = SensorFixture::new_unique(
Some(pack.id), Some(pack.id),

View File

@@ -232,6 +232,11 @@ impl SensorManager {
let exec_config = runtime.parsed_execution_config(); let exec_config = runtime.parsed_execution_config();
let rt_name = runtime.name.to_lowercase(); let rt_name = runtime.name.to_lowercase();
info!(
"Sensor {} runtime details: id={}, ref='{}', name='{}', execution_config={}",
sensor.r#ref, runtime.id, runtime.r#ref, runtime.name, runtime.execution_config
);
// Resolve the interpreter: check for a virtualenv/node_modules first, // Resolve the interpreter: check for a virtualenv/node_modules first,
// then fall back to the system interpreter. // then fall back to the system interpreter.
let pack_dir = std::path::PathBuf::from(&self.inner.packs_base_dir).join(pack_ref); let pack_dir = std::path::PathBuf::from(&self.inner.packs_base_dir).join(pack_ref);
@@ -255,8 +260,13 @@ impl SensorManager {
|| interpreter_binary == "none"; || interpreter_binary == "none";
info!( info!(
"Sensor {} runtime={} interpreter={} native={}", "Sensor {} runtime={} (ref={}) interpreter='{}' native={} env_dir_exists={}",
sensor.r#ref, rt_name, interpreter_binary, is_native sensor.r#ref,
rt_name,
runtime.r#ref,
interpreter_binary,
is_native,
env_dir.exists()
); );
info!("Starting standalone sensor process: {}", sensor_script); info!("Starting standalone sensor process: {}", sensor_script);
@@ -289,8 +299,8 @@ impl SensorManager {
// Build the command: use the interpreter for non-native runtimes, // Build the command: use the interpreter for non-native runtimes,
// execute the script directly for native binaries. // execute the script directly for native binaries.
let mut cmd = if is_native { let (spawn_binary, mut cmd) = if is_native {
Command::new(&sensor_script) (sensor_script.clone(), Command::new(&sensor_script))
} else { } else {
let resolved_interpreter = let resolved_interpreter =
exec_config.resolve_interpreter_with_env(&pack_dir, env_dir_opt); exec_config.resolve_interpreter_with_env(&pack_dir, env_dir_opt);
@@ -299,15 +309,49 @@ impl SensorManager {
resolved_interpreter.display(), resolved_interpreter.display(),
sensor.r#ref sensor.r#ref
); );
let mut c = Command::new(resolved_interpreter); let binary_str = resolved_interpreter.display().to_string();
let mut c = Command::new(&resolved_interpreter);
// Pass any extra interpreter args (e.g., -u for unbuffered Python) // Pass any extra interpreter args (e.g., -u for unbuffered Python)
for arg in &exec_config.interpreter.args { for arg in &exec_config.interpreter.args {
c.arg(arg); c.arg(arg);
} }
c.arg(&sensor_script); c.arg(&sensor_script);
c (binary_str, c)
}; };
// Log the full command for diagnostics
info!(
"Spawning sensor {}: binary='{}' is_native={} script='{}'",
sensor.r#ref, spawn_binary, is_native, sensor_script
);
// Pre-flight check: verify the binary exists and is accessible
let spawn_path = std::path::Path::new(&spawn_binary);
if spawn_path.is_absolute() || spawn_path.components().count() > 1 {
// Absolute or relative path with directory component — check it directly
match std::fs::metadata(spawn_path) {
Ok(meta) => {
use std::os::unix::fs::PermissionsExt;
let mode = meta.permissions().mode();
let is_exec = mode & 0o111 != 0;
if !is_exec {
error!(
"Binary '{}' exists but is not executable (mode: {:o}). \
Sensor runtime ref='{}', execution_config interpreter='{}'.",
spawn_binary, mode, runtime.r#ref, interpreter_binary
);
}
}
Err(e) => {
error!(
"Cannot access binary '{}': {}. \
Sensor runtime ref='{}', execution_config interpreter='{}'.",
spawn_binary, e, runtime.r#ref, interpreter_binary
);
}
}
}
// Start the standalone sensor with token and configuration // Start the standalone sensor with token and configuration
// Pass sensor ref (e.g., "core.interval_timer_sensor") for proper identification // Pass sensor ref (e.g., "core.interval_timer_sensor") for proper identification
let mut child = cmd let mut child = cmd
@@ -323,7 +367,19 @@ impl SensorManager {
.stdout(Stdio::piped()) .stdout(Stdio::piped())
.stderr(Stdio::piped()) .stderr(Stdio::piped())
.spawn() .spawn()
.map_err(|e| anyhow!("Failed to start standalone sensor process: {}", e))?; .map_err(|e| {
anyhow!(
"Failed to start sensor process for '{}': {} \
(binary='{}', is_native={}, runtime_ref='{}', \
interpreter_config='{}')",
sensor.r#ref,
e,
spawn_binary,
is_native,
runtime.r#ref,
interpreter_binary
)
})?;
// Get stdout and stderr for logging (standalone sensors output JSON logs to stdout) // Get stdout and stderr for logging (standalone sensors output JSON logs to stdout)
let stdout = child let stdout = child

View File

@@ -27,7 +27,7 @@ CREATE TABLE key (
-- Constraints -- Constraints
CONSTRAINT key_ref_lowercase CHECK (ref = LOWER(ref)), CONSTRAINT key_ref_lowercase CHECK (ref = LOWER(ref)),
CONSTRAINT key_ref_format CHECK (ref ~ '^([^.]+\.)?[^.]+$') CONSTRAINT key_ref_format CHECK (ref ~ '^[^.]+(\.[^.]+)*$')
); );
-- Unique index on owner_type, owner, name -- Unique index on owner_type, owner, name

View File

@@ -528,6 +528,8 @@ class PackLoader:
label = EXCLUDED.label, label = EXCLUDED.label,
description = EXCLUDED.description, description = EXCLUDED.description,
entrypoint = EXCLUDED.entrypoint, entrypoint = EXCLUDED.entrypoint,
runtime = EXCLUDED.runtime,
runtime_ref = EXCLUDED.runtime_ref,
trigger = EXCLUDED.trigger, trigger = EXCLUDED.trigger,
trigger_ref = EXCLUDED.trigger_ref, trigger_ref = EXCLUDED.trigger_ref,
enabled = EXCLUDED.enabled, enabled = EXCLUDED.enabled,

View File

@@ -0,0 +1,45 @@
# Fix: Python Sensor "Permission Denied" — Stale Runtime Assignment
**Date**: 2026-02-20
## Problem
Python-based sensors failed to start with `Permission denied (os error 13)`.
## Root Cause
The sensor's runtime in the database pointed to `core.builtin` (empty `execution_config`) instead of `core.python`. This caused `is_native=true`, making the sensor manager try to execute the `.py` script directly — which fails without the execute bit.
The stale assignment persisted because the pack component loader **skipped** existing sensors on re-registration instead of updating them. Once a sensor was created with the wrong runtime, there was no way to correct it short of deleting the pack entirely.
**DB evidence**: `SELECT runtime, runtime_ref FROM sensor``runtime=4, runtime_ref=core.builtin` (should be `runtime=3, runtime_ref=core.python`).
## Changes
### 1. Sensor upsert on re-registration (`crates/common/src/pack_registry/loader.rs`)
- Changed `load_sensors` from skip-if-exists to upsert: existing sensors are updated with fresh metadata from the YAML (runtime, entrypoint, trigger, config, etc.)
- Re-registering a pack now corrects stale runtime assignments
### 2. `UpdateSensorInput` extended (`crates/common/src/repositories/trigger.rs`)
- Added `runtime`, `runtime_ref`, `trigger`, `trigger_ref`, and `config` fields so the update path can correct all sensor metadata
- Updated all callsites in `crates/api/src/routes/triggers.rs` and tests
### 3. Registration-time validation (`crates/common/src/pack_registry/loader.rs`)
- Warns if a non-native `runner_type` (e.g., `python`) resolves to runtime ID 0 (not found)
- Warns if the resolved runtime has empty/missing `execution_config`
### 4. Sensor manager diagnostics (`crates/sensor/src/sensor_manager.rs`)
- Logs full runtime details (id, ref, name, raw `execution_config` JSON)
- Logs `env_dir_exists` status and resolved interpreter path
- Pre-flight check: verifies binary exists and has execute permission before spawn
- Error message includes binary path, `is_native` flag, and runtime ref
### 5. Python loader consistency (`scripts/load_core_pack.py`)
- Added `runtime` and `runtime_ref` to sensor `ON CONFLICT DO UPDATE` clause
## Verification
After rebuilding and re-registering the pack with `force=true`:
- Sensor runtime corrected: `core.builtin``core.python`
- Sensor started successfully with venv interpreter at `/opt/attune/runtime_envs/python_example/python/bin/python3`
- Counter sensor fully operational (RabbitMQ connected, rules bootstrapped)