Compare commits
2 Commits
f9cfcf8f40
...
d629da32fa
| Author | SHA1 | Date | |
|---|---|---|---|
| d629da32fa | |||
| a84c07082c |
@@ -148,8 +148,42 @@ impl From<sqlx::Error> for ApiError {
|
||||
match err {
|
||||
sqlx::Error::RowNotFound => ApiError::NotFound("Resource not found".to_string()),
|
||||
sqlx::Error::Database(db_err) => {
|
||||
// Check for unique constraint violations
|
||||
if let Some(constraint) = db_err.constraint() {
|
||||
// PostgreSQL error codes:
|
||||
// 23505 = unique_violation → 409 Conflict
|
||||
// 23503 = foreign_key_violation → 422 Unprocessable Entity
|
||||
// 23514 = check_violation → 422 Unprocessable Entity
|
||||
// P0001 = raise_exception → 400 Bad Request (trigger-raised errors)
|
||||
let pg_code = db_err.code().map(|c| c.to_string()).unwrap_or_default();
|
||||
if pg_code == "23505" {
|
||||
// Unique constraint violation — duplicate key
|
||||
let detail = db_err
|
||||
.constraint()
|
||||
.map(|c| format!(" ({})", c))
|
||||
.unwrap_or_default();
|
||||
ApiError::Conflict(format!("Already exists{}", detail))
|
||||
} else if pg_code == "23503" {
|
||||
// Foreign key violation — the referenced row doesn't exist
|
||||
let detail = db_err
|
||||
.constraint()
|
||||
.map(|c| format!(" ({})", c))
|
||||
.unwrap_or_default();
|
||||
ApiError::UnprocessableEntity(format!(
|
||||
"Referenced entity does not exist{}",
|
||||
detail
|
||||
))
|
||||
} else if pg_code == "23514" {
|
||||
// CHECK constraint violation — value doesn't meet constraint
|
||||
let detail = db_err
|
||||
.constraint()
|
||||
.map(|c| format!(": {}", c))
|
||||
.unwrap_or_default();
|
||||
ApiError::UnprocessableEntity(format!("Validation constraint failed{}", detail))
|
||||
} else if pg_code == "P0001" {
|
||||
// RAISE EXCEPTION from a trigger or function
|
||||
// Extract the human-readable message from the exception
|
||||
let msg = db_err.message().to_string();
|
||||
ApiError::BadRequest(msg)
|
||||
} else if let Some(constraint) = db_err.constraint() {
|
||||
ApiError::Conflict(format!("Constraint violation: {}", constraint))
|
||||
} else {
|
||||
ApiError::DatabaseError(format!("Database error: {}", db_err))
|
||||
|
||||
@@ -719,8 +719,13 @@ pub async fn update_sensor(
|
||||
label: request.label,
|
||||
description: request.description,
|
||||
entrypoint: request.entrypoint,
|
||||
runtime: None,
|
||||
runtime_ref: None,
|
||||
trigger: None,
|
||||
trigger_ref: None,
|
||||
enabled: request.enabled,
|
||||
param_schema: request.param_schema,
|
||||
config: None,
|
||||
};
|
||||
|
||||
let sensor = SensorRepository::update(&state.db, existing_sensor.id, update_input).await?;
|
||||
@@ -799,8 +804,13 @@ pub async fn enable_sensor(
|
||||
label: None,
|
||||
description: None,
|
||||
entrypoint: None,
|
||||
runtime: None,
|
||||
runtime_ref: None,
|
||||
trigger: None,
|
||||
trigger_ref: None,
|
||||
enabled: Some(true),
|
||||
param_schema: None,
|
||||
config: None,
|
||||
};
|
||||
|
||||
let sensor = SensorRepository::update(&state.db, existing_sensor.id, update_input).await?;
|
||||
@@ -840,8 +850,13 @@ pub async fn disable_sensor(
|
||||
label: None,
|
||||
description: None,
|
||||
entrypoint: None,
|
||||
runtime: None,
|
||||
runtime_ref: None,
|
||||
trigger: None,
|
||||
trigger_ref: None,
|
||||
enabled: Some(false),
|
||||
param_schema: None,
|
||||
config: None,
|
||||
};
|
||||
|
||||
let sensor = SensorRepository::update(&state.db, existing_sensor.id, update_input).await?;
|
||||
|
||||
@@ -23,7 +23,7 @@ use crate::repositories::runtime::{CreateRuntimeInput, RuntimeRepository};
|
||||
use crate::repositories::trigger::{
|
||||
CreateSensorInput, CreateTriggerInput, SensorRepository, TriggerRepository,
|
||||
};
|
||||
use crate::repositories::{Create, FindByRef};
|
||||
use crate::repositories::{Create, FindById, FindByRef, Update};
|
||||
|
||||
/// Result of loading pack components into the database.
|
||||
#[derive(Debug, Default)]
|
||||
@@ -514,6 +514,47 @@ impl<'a> PackComponentLoader<'a> {
|
||||
.unwrap_or("native");
|
||||
let (sensor_runtime_id, sensor_runtime_ref) = self.resolve_runtime(runner_type).await?;
|
||||
|
||||
// Validate: if the runner_type suggests an interpreted runtime (not native)
|
||||
// but we couldn't resolve it, or it resolved to a runtime with no
|
||||
// execution_config, warn at registration time rather than failing
|
||||
// opaquely at sensor startup with "Permission denied".
|
||||
let is_native_runner = matches!(
|
||||
runner_type.to_lowercase().as_str(),
|
||||
"native" | "builtin" | "standalone"
|
||||
);
|
||||
if sensor_runtime_id == 0 && !is_native_runner {
|
||||
let msg = format!(
|
||||
"Sensor '{}' declares runner_type '{}' but no matching runtime \
|
||||
was found in the database. The sensor will not be able to start. \
|
||||
Ensure the core pack (with runtimes) is loaded before registering \
|
||||
packs that depend on its runtimes.",
|
||||
filename, runner_type
|
||||
);
|
||||
warn!("{}", msg);
|
||||
result.warnings.push(msg);
|
||||
} else if sensor_runtime_id != 0 && !is_native_runner {
|
||||
// Verify the resolved runtime has a non-empty execution_config
|
||||
if let Some(runtime) =
|
||||
RuntimeRepository::find_by_id(self.pool, sensor_runtime_id).await?
|
||||
{
|
||||
let exec_config = runtime.parsed_execution_config();
|
||||
if exec_config.interpreter.binary.is_empty()
|
||||
|| exec_config.interpreter.binary == "native"
|
||||
|| exec_config.interpreter.binary == "none"
|
||||
{
|
||||
let msg = format!(
|
||||
"Sensor '{}' declares runner_type '{}' (resolved to runtime '{}') \
|
||||
but that runtime has no interpreter configured in its \
|
||||
execution_config. The sensor will fail to start. \
|
||||
Check the runtime definition for '{}'.",
|
||||
filename, runner_type, runtime.r#ref, runtime.r#ref
|
||||
);
|
||||
warn!("{}", msg);
|
||||
result.warnings.push(msg);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let sensor_ref = match data.get("ref").and_then(|v| v.as_str()) {
|
||||
Some(r) => r.to_string(),
|
||||
None => {
|
||||
@@ -524,16 +565,6 @@ impl<'a> PackComponentLoader<'a> {
|
||||
}
|
||||
};
|
||||
|
||||
// Check if sensor already exists
|
||||
if let Some(existing) = SensorRepository::find_by_ref(self.pool, &sensor_ref).await? {
|
||||
info!(
|
||||
"Sensor '{}' already exists (ID: {}), skipping",
|
||||
sensor_ref, existing.id
|
||||
);
|
||||
result.sensors_skipped += 1;
|
||||
continue;
|
||||
}
|
||||
|
||||
let name = extract_name_from_ref(&sensor_ref);
|
||||
let label = data
|
||||
.get("label")
|
||||
@@ -570,6 +601,41 @@ impl<'a> PackComponentLoader<'a> {
|
||||
.and_then(|v| serde_json::to_value(v).ok())
|
||||
.unwrap_or_else(|| serde_json::json!({}));
|
||||
|
||||
// Upsert: update existing sensors so re-registration corrects
|
||||
// stale metadata (especially runtime assignments).
|
||||
if let Some(existing) = SensorRepository::find_by_ref(self.pool, &sensor_ref).await? {
|
||||
use crate::repositories::trigger::UpdateSensorInput;
|
||||
|
||||
let update_input = UpdateSensorInput {
|
||||
label: Some(label),
|
||||
description: Some(description),
|
||||
entrypoint: Some(entrypoint),
|
||||
runtime: Some(sensor_runtime_id),
|
||||
runtime_ref: Some(sensor_runtime_ref.clone()),
|
||||
trigger: Some(trigger_id.unwrap_or(existing.trigger)),
|
||||
trigger_ref: Some(trigger_ref.unwrap_or(existing.trigger_ref.clone())),
|
||||
enabled: Some(enabled),
|
||||
param_schema,
|
||||
config: Some(config),
|
||||
};
|
||||
|
||||
match SensorRepository::update(self.pool, existing.id, update_input).await {
|
||||
Ok(_) => {
|
||||
info!(
|
||||
"Updated sensor '{}' (ID: {}, runtime: {} → {})",
|
||||
sensor_ref, existing.id, existing.runtime_ref, sensor_runtime_ref
|
||||
);
|
||||
result.sensors_loaded += 1;
|
||||
}
|
||||
Err(e) => {
|
||||
let msg = format!("Failed to update sensor '{}': {}", sensor_ref, e);
|
||||
warn!("{}", msg);
|
||||
result.warnings.push(msg);
|
||||
}
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
let input = CreateSensorInput {
|
||||
r#ref: sensor_ref.clone(),
|
||||
pack: Some(self.pack_id),
|
||||
|
||||
@@ -531,8 +531,13 @@ pub struct UpdateSensorInput {
|
||||
pub label: Option<String>,
|
||||
pub description: Option<String>,
|
||||
pub entrypoint: Option<String>,
|
||||
pub runtime: Option<Id>,
|
||||
pub runtime_ref: Option<String>,
|
||||
pub trigger: Option<Id>,
|
||||
pub trigger_ref: Option<String>,
|
||||
pub enabled: Option<bool>,
|
||||
pub param_schema: Option<JsonSchema>,
|
||||
pub config: Option<JsonValue>,
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
@@ -688,6 +693,42 @@ impl Update for SensorRepository {
|
||||
has_updates = true;
|
||||
}
|
||||
|
||||
if let Some(runtime) = input.runtime {
|
||||
if has_updates {
|
||||
query.push(", ");
|
||||
}
|
||||
query.push("runtime = ");
|
||||
query.push_bind(runtime);
|
||||
has_updates = true;
|
||||
}
|
||||
|
||||
if let Some(runtime_ref) = &input.runtime_ref {
|
||||
if has_updates {
|
||||
query.push(", ");
|
||||
}
|
||||
query.push("runtime_ref = ");
|
||||
query.push_bind(runtime_ref);
|
||||
has_updates = true;
|
||||
}
|
||||
|
||||
if let Some(trigger) = input.trigger {
|
||||
if has_updates {
|
||||
query.push(", ");
|
||||
}
|
||||
query.push("trigger = ");
|
||||
query.push_bind(trigger);
|
||||
has_updates = true;
|
||||
}
|
||||
|
||||
if let Some(trigger_ref) = &input.trigger_ref {
|
||||
if has_updates {
|
||||
query.push(", ");
|
||||
}
|
||||
query.push("trigger_ref = ");
|
||||
query.push_bind(trigger_ref);
|
||||
has_updates = true;
|
||||
}
|
||||
|
||||
if let Some(param_schema) = &input.param_schema {
|
||||
if has_updates {
|
||||
query.push(", ");
|
||||
@@ -697,6 +738,15 @@ impl Update for SensorRepository {
|
||||
has_updates = true;
|
||||
}
|
||||
|
||||
if let Some(config) = &input.config {
|
||||
if has_updates {
|
||||
query.push(", ");
|
||||
}
|
||||
query.push("config = ");
|
||||
query.push_bind(config);
|
||||
has_updates = true;
|
||||
}
|
||||
|
||||
if !has_updates {
|
||||
// No updates requested, fetch and return existing entity
|
||||
return Self::get_by_id(executor, id).await;
|
||||
|
||||
@@ -34,11 +34,7 @@ async fn test_create_sensor_minimal() {
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let runtime = RuntimeFixture::new_unique(
|
||||
Some(pack.id),
|
||||
Some(pack.r#ref.clone()),
|
||||
"python3",
|
||||
)
|
||||
let runtime = RuntimeFixture::new_unique(Some(pack.id), Some(pack.r#ref.clone()), "python3")
|
||||
.create(&pool)
|
||||
.await
|
||||
.unwrap();
|
||||
@@ -85,11 +81,7 @@ async fn test_create_sensor_with_param_schema() {
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let runtime = RuntimeFixture::new_unique(
|
||||
Some(pack.id),
|
||||
Some(pack.r#ref.clone()),
|
||||
"python3",
|
||||
)
|
||||
let runtime = RuntimeFixture::new_unique(Some(pack.id), Some(pack.r#ref.clone()), "python3")
|
||||
.create(&pool)
|
||||
.await
|
||||
.unwrap();
|
||||
@@ -171,11 +163,7 @@ async fn test_create_sensor_duplicate_ref_fails() {
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let runtime = RuntimeFixture::new_unique(
|
||||
Some(pack.id),
|
||||
Some(pack.r#ref.clone()),
|
||||
"python3",
|
||||
)
|
||||
let runtime = RuntimeFixture::new_unique(Some(pack.id), Some(pack.r#ref.clone()), "python3")
|
||||
.create(&pool)
|
||||
.await
|
||||
.unwrap();
|
||||
@@ -223,11 +211,7 @@ async fn test_create_sensor_invalid_ref_format_fails() {
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let runtime = RuntimeFixture::new_unique(
|
||||
Some(pack.id),
|
||||
Some(pack.r#ref.clone()),
|
||||
"python3",
|
||||
)
|
||||
let runtime = RuntimeFixture::new_unique(Some(pack.id), Some(pack.r#ref.clone()), "python3")
|
||||
.create(&pool)
|
||||
.await
|
||||
.unwrap();
|
||||
@@ -378,11 +362,7 @@ async fn test_find_by_id_exists() {
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let runtime = RuntimeFixture::new_unique(
|
||||
Some(pack.id),
|
||||
Some(pack.r#ref.clone()),
|
||||
"python3",
|
||||
)
|
||||
let runtime = RuntimeFixture::new_unique(Some(pack.id), Some(pack.r#ref.clone()), "python3")
|
||||
.create(&pool)
|
||||
.await
|
||||
.unwrap();
|
||||
@@ -433,11 +413,7 @@ async fn test_get_by_id_exists() {
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let runtime = RuntimeFixture::new_unique(
|
||||
Some(pack.id),
|
||||
Some(pack.r#ref.clone()),
|
||||
"python3",
|
||||
)
|
||||
let runtime = RuntimeFixture::new_unique(Some(pack.id), Some(pack.r#ref.clone()), "python3")
|
||||
.create(&pool)
|
||||
.await
|
||||
.unwrap();
|
||||
@@ -484,11 +460,7 @@ async fn test_find_by_ref_exists() {
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let runtime = RuntimeFixture::new_unique(
|
||||
Some(pack.id),
|
||||
Some(pack.r#ref.clone()),
|
||||
"python3",
|
||||
)
|
||||
let runtime = RuntimeFixture::new_unique(Some(pack.id), Some(pack.r#ref.clone()), "python3")
|
||||
.create(&pool)
|
||||
.await
|
||||
.unwrap();
|
||||
@@ -540,11 +512,7 @@ async fn test_get_by_ref_exists() {
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let runtime = RuntimeFixture::new_unique(
|
||||
Some(pack.id),
|
||||
Some(pack.r#ref.clone()),
|
||||
"python3",
|
||||
)
|
||||
let runtime = RuntimeFixture::new_unique(Some(pack.id), Some(pack.r#ref.clone()), "python3")
|
||||
.create(&pool)
|
||||
.await
|
||||
.unwrap();
|
||||
@@ -593,11 +561,7 @@ async fn test_list_all_sensors() {
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let runtime = RuntimeFixture::new_unique(
|
||||
Some(pack.id),
|
||||
Some(pack.r#ref.clone()),
|
||||
"python3",
|
||||
)
|
||||
let runtime = RuntimeFixture::new_unique(Some(pack.id), Some(pack.r#ref.clone()), "python3")
|
||||
.create(&pool)
|
||||
.await
|
||||
.unwrap();
|
||||
@@ -668,11 +632,7 @@ async fn test_update_label() {
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let runtime = RuntimeFixture::new_unique(
|
||||
Some(pack.id),
|
||||
Some(pack.r#ref.clone()),
|
||||
"python3",
|
||||
)
|
||||
let runtime = RuntimeFixture::new_unique(Some(pack.id), Some(pack.r#ref.clone()), "python3")
|
||||
.create(&pool)
|
||||
.await
|
||||
.unwrap();
|
||||
@@ -724,11 +684,7 @@ async fn test_update_description() {
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let runtime = RuntimeFixture::new_unique(
|
||||
Some(pack.id),
|
||||
Some(pack.r#ref.clone()),
|
||||
"python3",
|
||||
)
|
||||
let runtime = RuntimeFixture::new_unique(Some(pack.id), Some(pack.r#ref.clone()), "python3")
|
||||
.create(&pool)
|
||||
.await
|
||||
.unwrap();
|
||||
@@ -772,11 +728,7 @@ async fn test_update_entrypoint() {
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let runtime = RuntimeFixture::new_unique(
|
||||
Some(pack.id),
|
||||
Some(pack.r#ref.clone()),
|
||||
"python3",
|
||||
)
|
||||
let runtime = RuntimeFixture::new_unique(Some(pack.id), Some(pack.r#ref.clone()), "python3")
|
||||
.create(&pool)
|
||||
.await
|
||||
.unwrap();
|
||||
@@ -820,11 +772,7 @@ async fn test_update_enabled_status() {
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let runtime = RuntimeFixture::new_unique(
|
||||
Some(pack.id),
|
||||
Some(pack.r#ref.clone()),
|
||||
"python3",
|
||||
)
|
||||
let runtime = RuntimeFixture::new_unique(Some(pack.id), Some(pack.r#ref.clone()), "python3")
|
||||
.create(&pool)
|
||||
.await
|
||||
.unwrap();
|
||||
@@ -883,11 +831,7 @@ async fn test_update_param_schema() {
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let runtime = RuntimeFixture::new_unique(
|
||||
Some(pack.id),
|
||||
Some(pack.r#ref.clone()),
|
||||
"python3",
|
||||
)
|
||||
let runtime = RuntimeFixture::new_unique(Some(pack.id), Some(pack.r#ref.clone()), "python3")
|
||||
.create(&pool)
|
||||
.await
|
||||
.unwrap();
|
||||
@@ -941,11 +885,7 @@ async fn test_update_multiple_fields() {
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let runtime = RuntimeFixture::new_unique(
|
||||
Some(pack.id),
|
||||
Some(pack.r#ref.clone()),
|
||||
"python3",
|
||||
)
|
||||
let runtime = RuntimeFixture::new_unique(Some(pack.id), Some(pack.r#ref.clone()), "python3")
|
||||
.create(&pool)
|
||||
.await
|
||||
.unwrap();
|
||||
@@ -969,6 +909,7 @@ async fn test_update_multiple_fields() {
|
||||
entrypoint: Some("sensors/multi.py".to_string()),
|
||||
enabled: Some(false),
|
||||
param_schema: Some(json!({"type": "object"})),
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let updated = SensorRepository::update(&pool, sensor.id, input)
|
||||
@@ -996,11 +937,7 @@ async fn test_update_no_changes() {
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let runtime = RuntimeFixture::new_unique(
|
||||
Some(pack.id),
|
||||
Some(pack.r#ref.clone()),
|
||||
"python3",
|
||||
)
|
||||
let runtime = RuntimeFixture::new_unique(Some(pack.id), Some(pack.r#ref.clone()), "python3")
|
||||
.create(&pool)
|
||||
.await
|
||||
.unwrap();
|
||||
@@ -1066,11 +1003,7 @@ async fn test_delete_existing_sensor() {
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let runtime = RuntimeFixture::new_unique(
|
||||
Some(pack.id),
|
||||
Some(pack.r#ref.clone()),
|
||||
"python3",
|
||||
)
|
||||
let runtime = RuntimeFixture::new_unique(Some(pack.id), Some(pack.r#ref.clone()), "python3")
|
||||
.create(&pool)
|
||||
.await
|
||||
.unwrap();
|
||||
@@ -1120,11 +1053,7 @@ async fn test_delete_sensor_when_pack_deleted() {
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let runtime = RuntimeFixture::new_unique(
|
||||
Some(pack.id),
|
||||
Some(pack.r#ref.clone()),
|
||||
"python3",
|
||||
)
|
||||
let runtime = RuntimeFixture::new_unique(Some(pack.id), Some(pack.r#ref.clone()), "python3")
|
||||
.create(&pool)
|
||||
.await
|
||||
.unwrap();
|
||||
@@ -1167,11 +1096,7 @@ async fn test_delete_sensor_when_trigger_deleted() {
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let runtime = RuntimeFixture::new_unique(
|
||||
Some(pack.id),
|
||||
Some(pack.r#ref.clone()),
|
||||
"python3",
|
||||
)
|
||||
let runtime = RuntimeFixture::new_unique(Some(pack.id), Some(pack.r#ref.clone()), "python3")
|
||||
.create(&pool)
|
||||
.await
|
||||
.unwrap();
|
||||
@@ -1214,11 +1139,7 @@ async fn test_delete_sensor_when_runtime_deleted() {
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let runtime = RuntimeFixture::new_unique(
|
||||
Some(pack.id),
|
||||
Some(pack.r#ref.clone()),
|
||||
"python3",
|
||||
)
|
||||
let runtime = RuntimeFixture::new_unique(Some(pack.id), Some(pack.r#ref.clone()), "python3")
|
||||
.create(&pool)
|
||||
.await
|
||||
.unwrap();
|
||||
@@ -1270,11 +1191,7 @@ async fn test_find_by_trigger() {
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let runtime = RuntimeFixture::new_unique(
|
||||
Some(pack.id),
|
||||
Some(pack.r#ref.clone()),
|
||||
"python3",
|
||||
)
|
||||
let runtime = RuntimeFixture::new_unique(Some(pack.id), Some(pack.r#ref.clone()), "python3")
|
||||
.create(&pool)
|
||||
.await
|
||||
.unwrap();
|
||||
@@ -1364,11 +1281,7 @@ async fn test_find_enabled() {
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let runtime = RuntimeFixture::new_unique(
|
||||
Some(pack.id),
|
||||
Some(pack.r#ref.clone()),
|
||||
"python3",
|
||||
)
|
||||
let runtime = RuntimeFixture::new_unique(Some(pack.id), Some(pack.r#ref.clone()), "python3")
|
||||
.create(&pool)
|
||||
.await
|
||||
.unwrap();
|
||||
@@ -1424,11 +1337,7 @@ async fn test_find_enabled_empty() {
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let runtime = RuntimeFixture::new_unique(
|
||||
Some(pack.id),
|
||||
Some(pack.r#ref.clone()),
|
||||
"python3",
|
||||
)
|
||||
let runtime = RuntimeFixture::new_unique(Some(pack.id), Some(pack.r#ref.clone()), "python3")
|
||||
.create(&pool)
|
||||
.await
|
||||
.unwrap();
|
||||
@@ -1477,20 +1386,12 @@ async fn test_find_by_pack() {
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let runtime1 = RuntimeFixture::new_unique(
|
||||
Some(pack1.id),
|
||||
Some(pack1.r#ref.clone()),
|
||||
"python3",
|
||||
)
|
||||
let runtime1 = RuntimeFixture::new_unique(Some(pack1.id), Some(pack1.r#ref.clone()), "python3")
|
||||
.create(&pool)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let runtime2 = RuntimeFixture::new_unique(
|
||||
Some(pack2.id),
|
||||
Some(pack2.r#ref.clone()),
|
||||
"nodejs",
|
||||
)
|
||||
let runtime2 = RuntimeFixture::new_unique(Some(pack2.id), Some(pack2.r#ref.clone()), "nodejs")
|
||||
.create(&pool)
|
||||
.await
|
||||
.unwrap();
|
||||
@@ -1580,11 +1481,7 @@ async fn test_created_timestamp_set_automatically() {
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let runtime = RuntimeFixture::new_unique(
|
||||
Some(pack.id),
|
||||
Some(pack.r#ref.clone()),
|
||||
"python3",
|
||||
)
|
||||
let runtime = RuntimeFixture::new_unique(Some(pack.id), Some(pack.r#ref.clone()), "python3")
|
||||
.create(&pool)
|
||||
.await
|
||||
.unwrap();
|
||||
@@ -1625,11 +1522,7 @@ async fn test_updated_timestamp_changes_on_update() {
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let runtime = RuntimeFixture::new_unique(
|
||||
Some(pack.id),
|
||||
Some(pack.r#ref.clone()),
|
||||
"python3",
|
||||
)
|
||||
let runtime = RuntimeFixture::new_unique(Some(pack.id), Some(pack.r#ref.clone()), "python3")
|
||||
.create(&pool)
|
||||
.await
|
||||
.unwrap();
|
||||
@@ -1679,11 +1572,7 @@ async fn test_updated_timestamp_unchanged_on_read() {
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let runtime = RuntimeFixture::new_unique(
|
||||
Some(pack.id),
|
||||
Some(pack.r#ref.clone()),
|
||||
"python3",
|
||||
)
|
||||
let runtime = RuntimeFixture::new_unique(Some(pack.id), Some(pack.r#ref.clone()), "python3")
|
||||
.create(&pool)
|
||||
.await
|
||||
.unwrap();
|
||||
@@ -1733,11 +1622,7 @@ async fn test_param_schema_complex_structure() {
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let runtime = RuntimeFixture::new_unique(
|
||||
Some(pack.id),
|
||||
Some(pack.r#ref.clone()),
|
||||
"python3",
|
||||
)
|
||||
let runtime = RuntimeFixture::new_unique(Some(pack.id), Some(pack.r#ref.clone()), "python3")
|
||||
.create(&pool)
|
||||
.await
|
||||
.unwrap();
|
||||
@@ -1811,11 +1696,7 @@ async fn test_param_schema_can_be_null() {
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let runtime = RuntimeFixture::new_unique(
|
||||
Some(pack.id),
|
||||
Some(pack.r#ref.clone()),
|
||||
"python3",
|
||||
)
|
||||
let runtime = RuntimeFixture::new_unique(Some(pack.id), Some(pack.r#ref.clone()), "python3")
|
||||
.create(&pool)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
@@ -232,6 +232,11 @@ impl SensorManager {
|
||||
let exec_config = runtime.parsed_execution_config();
|
||||
let rt_name = runtime.name.to_lowercase();
|
||||
|
||||
info!(
|
||||
"Sensor {} runtime details: id={}, ref='{}', name='{}', execution_config={}",
|
||||
sensor.r#ref, runtime.id, runtime.r#ref, runtime.name, runtime.execution_config
|
||||
);
|
||||
|
||||
// Resolve the interpreter: check for a virtualenv/node_modules first,
|
||||
// then fall back to the system interpreter.
|
||||
let pack_dir = std::path::PathBuf::from(&self.inner.packs_base_dir).join(pack_ref);
|
||||
@@ -255,8 +260,13 @@ impl SensorManager {
|
||||
|| interpreter_binary == "none";
|
||||
|
||||
info!(
|
||||
"Sensor {} runtime={} interpreter={} native={}",
|
||||
sensor.r#ref, rt_name, interpreter_binary, is_native
|
||||
"Sensor {} runtime={} (ref={}) interpreter='{}' native={} env_dir_exists={}",
|
||||
sensor.r#ref,
|
||||
rt_name,
|
||||
runtime.r#ref,
|
||||
interpreter_binary,
|
||||
is_native,
|
||||
env_dir.exists()
|
||||
);
|
||||
info!("Starting standalone sensor process: {}", sensor_script);
|
||||
|
||||
@@ -289,8 +299,8 @@ impl SensorManager {
|
||||
|
||||
// Build the command: use the interpreter for non-native runtimes,
|
||||
// execute the script directly for native binaries.
|
||||
let mut cmd = if is_native {
|
||||
Command::new(&sensor_script)
|
||||
let (spawn_binary, mut cmd) = if is_native {
|
||||
(sensor_script.clone(), Command::new(&sensor_script))
|
||||
} else {
|
||||
let resolved_interpreter =
|
||||
exec_config.resolve_interpreter_with_env(&pack_dir, env_dir_opt);
|
||||
@@ -299,15 +309,49 @@ impl SensorManager {
|
||||
resolved_interpreter.display(),
|
||||
sensor.r#ref
|
||||
);
|
||||
let mut c = Command::new(resolved_interpreter);
|
||||
let binary_str = resolved_interpreter.display().to_string();
|
||||
let mut c = Command::new(&resolved_interpreter);
|
||||
// Pass any extra interpreter args (e.g., -u for unbuffered Python)
|
||||
for arg in &exec_config.interpreter.args {
|
||||
c.arg(arg);
|
||||
}
|
||||
c.arg(&sensor_script);
|
||||
c
|
||||
(binary_str, c)
|
||||
};
|
||||
|
||||
// Log the full command for diagnostics
|
||||
info!(
|
||||
"Spawning sensor {}: binary='{}' is_native={} script='{}'",
|
||||
sensor.r#ref, spawn_binary, is_native, sensor_script
|
||||
);
|
||||
|
||||
// Pre-flight check: verify the binary exists and is accessible
|
||||
let spawn_path = std::path::Path::new(&spawn_binary);
|
||||
if spawn_path.is_absolute() || spawn_path.components().count() > 1 {
|
||||
// Absolute or relative path with directory component — check it directly
|
||||
match std::fs::metadata(spawn_path) {
|
||||
Ok(meta) => {
|
||||
use std::os::unix::fs::PermissionsExt;
|
||||
let mode = meta.permissions().mode();
|
||||
let is_exec = mode & 0o111 != 0;
|
||||
if !is_exec {
|
||||
error!(
|
||||
"Binary '{}' exists but is not executable (mode: {:o}). \
|
||||
Sensor runtime ref='{}', execution_config interpreter='{}'.",
|
||||
spawn_binary, mode, runtime.r#ref, interpreter_binary
|
||||
);
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
error!(
|
||||
"Cannot access binary '{}': {}. \
|
||||
Sensor runtime ref='{}', execution_config interpreter='{}'.",
|
||||
spawn_binary, e, runtime.r#ref, interpreter_binary
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Start the standalone sensor with token and configuration
|
||||
// Pass sensor ref (e.g., "core.interval_timer_sensor") for proper identification
|
||||
let mut child = cmd
|
||||
@@ -323,7 +367,19 @@ impl SensorManager {
|
||||
.stdout(Stdio::piped())
|
||||
.stderr(Stdio::piped())
|
||||
.spawn()
|
||||
.map_err(|e| anyhow!("Failed to start standalone sensor process: {}", e))?;
|
||||
.map_err(|e| {
|
||||
anyhow!(
|
||||
"Failed to start sensor process for '{}': {} \
|
||||
(binary='{}', is_native={}, runtime_ref='{}', \
|
||||
interpreter_config='{}')",
|
||||
sensor.r#ref,
|
||||
e,
|
||||
spawn_binary,
|
||||
is_native,
|
||||
runtime.r#ref,
|
||||
interpreter_binary
|
||||
)
|
||||
})?;
|
||||
|
||||
// Get stdout and stderr for logging (standalone sensors output JSON logs to stdout)
|
||||
let stdout = child
|
||||
|
||||
@@ -27,7 +27,7 @@ CREATE TABLE key (
|
||||
|
||||
-- Constraints
|
||||
CONSTRAINT key_ref_lowercase CHECK (ref = LOWER(ref)),
|
||||
CONSTRAINT key_ref_format CHECK (ref ~ '^([^.]+\.)?[^.]+$')
|
||||
CONSTRAINT key_ref_format CHECK (ref ~ '^[^.]+(\.[^.]+)*$')
|
||||
);
|
||||
|
||||
-- Unique index on owner_type, owner, name
|
||||
|
||||
@@ -528,6 +528,8 @@ class PackLoader:
|
||||
label = EXCLUDED.label,
|
||||
description = EXCLUDED.description,
|
||||
entrypoint = EXCLUDED.entrypoint,
|
||||
runtime = EXCLUDED.runtime,
|
||||
runtime_ref = EXCLUDED.runtime_ref,
|
||||
trigger = EXCLUDED.trigger,
|
||||
trigger_ref = EXCLUDED.trigger_ref,
|
||||
enabled = EXCLUDED.enabled,
|
||||
|
||||
45
work-summary/2026-02-20-sensor-runtime-upsert-fix.md
Normal file
45
work-summary/2026-02-20-sensor-runtime-upsert-fix.md
Normal file
@@ -0,0 +1,45 @@
|
||||
# Fix: Python Sensor "Permission Denied" — Stale Runtime Assignment
|
||||
|
||||
**Date**: 2026-02-20
|
||||
|
||||
## Problem
|
||||
|
||||
Python-based sensors failed to start with `Permission denied (os error 13)`.
|
||||
|
||||
## Root Cause
|
||||
|
||||
The sensor's runtime in the database pointed to `core.builtin` (empty `execution_config`) instead of `core.python`. This caused `is_native=true`, making the sensor manager try to execute the `.py` script directly — which fails without the execute bit.
|
||||
|
||||
The stale assignment persisted because the pack component loader **skipped** existing sensors on re-registration instead of updating them. Once a sensor was created with the wrong runtime, there was no way to correct it short of deleting the pack entirely.
|
||||
|
||||
**DB evidence**: `SELECT runtime, runtime_ref FROM sensor` → `runtime=4, runtime_ref=core.builtin` (should be `runtime=3, runtime_ref=core.python`).
|
||||
|
||||
## Changes
|
||||
|
||||
### 1. Sensor upsert on re-registration (`crates/common/src/pack_registry/loader.rs`)
|
||||
- Changed `load_sensors` from skip-if-exists to upsert: existing sensors are updated with fresh metadata from the YAML (runtime, entrypoint, trigger, config, etc.)
|
||||
- Re-registering a pack now corrects stale runtime assignments
|
||||
|
||||
### 2. `UpdateSensorInput` extended (`crates/common/src/repositories/trigger.rs`)
|
||||
- Added `runtime`, `runtime_ref`, `trigger`, `trigger_ref`, and `config` fields so the update path can correct all sensor metadata
|
||||
- Updated all callsites in `crates/api/src/routes/triggers.rs` and tests
|
||||
|
||||
### 3. Registration-time validation (`crates/common/src/pack_registry/loader.rs`)
|
||||
- Warns if a non-native `runner_type` (e.g., `python`) resolves to runtime ID 0 (not found)
|
||||
- Warns if the resolved runtime has empty/missing `execution_config`
|
||||
|
||||
### 4. Sensor manager diagnostics (`crates/sensor/src/sensor_manager.rs`)
|
||||
- Logs full runtime details (id, ref, name, raw `execution_config` JSON)
|
||||
- Logs `env_dir_exists` status and resolved interpreter path
|
||||
- Pre-flight check: verifies binary exists and has execute permission before spawn
|
||||
- Error message includes binary path, `is_native` flag, and runtime ref
|
||||
|
||||
### 5. Python loader consistency (`scripts/load_core_pack.py`)
|
||||
- Added `runtime` and `runtime_ref` to sensor `ON CONFLICT DO UPDATE` clause
|
||||
|
||||
## Verification
|
||||
|
||||
After rebuilding and re-registering the pack with `force=true`:
|
||||
- Sensor runtime corrected: `core.builtin` → `core.python`
|
||||
- Sensor started successfully with venv interpreter at `/opt/attune/runtime_envs/python_example/python/bin/python3`
|
||||
- Counter sensor fully operational (RabbitMQ connected, rules bootstrapped)
|
||||
Reference in New Issue
Block a user