diff --git a/crates/api/src/dto/artifact.rs b/crates/api/src/dto/artifact.rs index ac6b908..875830a 100644 --- a/crates/api/src/dto/artifact.rs +++ b/crates/api/src/dto/artifact.rs @@ -105,6 +105,9 @@ pub struct UpdateArtifactRequest { /// Updated content type pub content_type: Option, + /// Updated execution ID (re-links artifact to a different execution) + pub execution: Option, + /// Updated structured data (replaces existing data entirely) pub data: Option, } diff --git a/crates/api/src/routes/actions.rs b/crates/api/src/routes/actions.rs index d087557..6e49e11 100644 --- a/crates/api/src/routes/actions.rs +++ b/crates/api/src/routes/actions.rs @@ -237,6 +237,9 @@ pub async fn update_action( runtime_version_constraint: request.runtime_version_constraint, param_schema: request.param_schema, out_schema: request.out_schema, + parameter_delivery: None, + parameter_format: None, + output_format: None, }; let action = ActionRepository::update(&state.db, existing_action.id, update_input).await?; diff --git a/crates/api/src/routes/artifacts.rs b/crates/api/src/routes/artifacts.rs index 0852023..1fcb491 100644 --- a/crates/api/src/routes/artifacts.rs +++ b/crates/api/src/routes/artifacts.rs @@ -8,6 +8,7 @@ //! - Progress append for progress-type artifacts (streaming updates) //! - Listing artifacts by execution //! - Version history and retrieval +//! - Upsert-and-upload: create-or-reuse an artifact by ref and upload a version in one call use axum::{ body::Body, @@ -20,7 +21,9 @@ use axum::{ use std::sync::Arc; use tracing::warn; -use attune_common::models::enums::{ArtifactType, ArtifactVisibility}; +use attune_common::models::enums::{ + ArtifactType, ArtifactVisibility, OwnerType, RetentionPolicyType, +}; use attune_common::repositories::{ artifact::{ ArtifactRepository, ArtifactSearchFilters, ArtifactVersionRepository, CreateArtifactInput, @@ -251,6 +254,7 @@ pub async fn update_artifact( description: request.description, content_type: request.content_type, size_bytes: None, // Managed by version creation trigger + execution: request.execution.map(Some), data: request.data, }; @@ -970,6 +974,282 @@ pub async fn delete_version( )) } +// ============================================================================ +// Upsert-and-upload by ref +// ============================================================================ + +/// Upload a file version to an artifact identified by ref, creating the artifact if it does not +/// already exist. +/// +/// This is the recommended way for actions to produce versioned file artifacts. The caller +/// provides the artifact ref and file content in a single multipart request. The server: +/// +/// 1. Looks up the artifact by `ref`. +/// 2. If not found, creates it using the metadata fields in the multipart body. +/// 3. If found, optionally updates the `execution` link to the current execution. +/// 4. Uploads the file bytes as a new version (version number is auto-assigned). +/// +/// **Multipart fields:** +/// - `file` (required) — the binary file content +/// - `ref` (required for creation) — artifact reference (ignored if artifact already exists) +/// - `scope` — owner scope: `system`, `pack`, `action`, `sensor`, `rule` (default: `action`) +/// - `owner` — owner identifier (default: empty string) +/// - `type` — artifact type: `file_text`, `file_image`, etc. (default: `file_text`) +/// - `visibility` — `public` or `private` (default: type-aware server default) +/// - `name` — human-readable name +/// - `description` — optional description +/// - `content_type` — MIME type (default: auto-detected from multipart or `application/octet-stream`) +/// - `execution` — execution ID to link this artifact to (updates existing artifacts too) +/// - `retention_policy` — `versions`, `days`, `hours`, `minutes` (default: `versions`) +/// - `retention_limit` — limit value (default: `10`) +/// - `created_by` — who created this version +/// - `meta` — JSON metadata for this version +#[utoipa::path( + post, + path = "/api/v1/artifacts/ref/{ref}/versions/upload", + tag = "artifacts", + params(("ref" = String, Path, description = "Artifact reference (created if not found)")), + request_body(content = String, content_type = "multipart/form-data"), + responses( + (status = 201, description = "Version created (artifact may have been created too)", body = inline(ApiResponse)), + (status = 400, description = "Missing file field or invalid metadata"), + (status = 413, description = "File too large"), + ), + security(("bearer_auth" = [])) +)] +pub async fn upload_version_by_ref( + RequireAuth(_user): RequireAuth, + State(state): State>, + Path(artifact_ref): Path, + mut multipart: Multipart, +) -> ApiResult { + // 50 MB limit + const MAX_FILE_SIZE: usize = 50 * 1024 * 1024; + + // Collect all multipart fields + let mut file_data: Option> = None; + let mut file_content_type: Option = None; + let mut content_type_field: Option = None; + let mut meta: Option = None; + let mut created_by: Option = None; + + // Artifact-creation metadata (used only when creating a new artifact) + let mut scope: Option = None; + let mut owner: Option = None; + let mut artifact_type: Option = None; + let mut visibility: Option = None; + let mut name: Option = None; + let mut description: Option = None; + let mut execution: Option = None; + let mut retention_policy: Option = None; + let mut retention_limit: Option = None; + + while let Some(field) = multipart + .next_field() + .await + .map_err(|e| ApiError::BadRequest(format!("Multipart error: {}", e)))? + { + let field_name = field.name().unwrap_or("").to_string(); + match field_name.as_str() { + "file" => { + file_content_type = field.content_type().map(|s| s.to_string()); + let bytes = field + .bytes() + .await + .map_err(|e| ApiError::BadRequest(format!("Failed to read file: {}", e)))?; + if bytes.len() > MAX_FILE_SIZE { + return Err(ApiError::BadRequest(format!( + "File exceeds maximum size of {} bytes", + MAX_FILE_SIZE + ))); + } + file_data = Some(bytes.to_vec()); + } + "content_type" => { + let t = field.text().await.unwrap_or_default(); + if !t.is_empty() { + content_type_field = Some(t); + } + } + "meta" => { + let t = field.text().await.unwrap_or_default(); + if !t.is_empty() { + meta = + Some(serde_json::from_str(&t).map_err(|e| { + ApiError::BadRequest(format!("Invalid meta JSON: {}", e)) + })?); + } + } + "created_by" => { + let t = field.text().await.unwrap_or_default(); + if !t.is_empty() { + created_by = Some(t); + } + } + "scope" => { + scope = Some(field.text().await.unwrap_or_default()); + } + "owner" => { + owner = Some(field.text().await.unwrap_or_default()); + } + "type" => { + artifact_type = Some(field.text().await.unwrap_or_default()); + } + "visibility" => { + visibility = Some(field.text().await.unwrap_or_default()); + } + "name" => { + name = Some(field.text().await.unwrap_or_default()); + } + "description" => { + description = Some(field.text().await.unwrap_or_default()); + } + "execution" => { + execution = Some(field.text().await.unwrap_or_default()); + } + "retention_policy" => { + retention_policy = Some(field.text().await.unwrap_or_default()); + } + "retention_limit" => { + retention_limit = Some(field.text().await.unwrap_or_default()); + } + _ => { /* skip unknown fields */ } + } + } + + let file_bytes = file_data.ok_or_else(|| { + ApiError::BadRequest("Missing required 'file' field in multipart upload".to_string()) + })?; + + // Parse execution ID + let execution_id: Option = match &execution { + Some(s) if !s.is_empty() => Some( + s.parse::() + .map_err(|_| ApiError::BadRequest(format!("Invalid execution ID: '{}'", s)))?, + ), + _ => None, + }; + + // Upsert: find existing artifact or create a new one + let artifact = match ArtifactRepository::find_by_ref(&state.db, &artifact_ref).await? { + Some(existing) => { + // Update execution link if a new execution ID was provided + if execution_id.is_some() && execution_id != existing.execution { + let update_input = UpdateArtifactInput { + r#ref: None, + scope: None, + owner: None, + r#type: None, + visibility: None, + retention_policy: None, + retention_limit: None, + name: None, + description: None, + content_type: None, + size_bytes: None, + execution: execution_id.map(Some), + data: None, + }; + ArtifactRepository::update(&state.db, existing.id, update_input).await? + } else { + existing + } + } + None => { + // Parse artifact type + let a_type: ArtifactType = match &artifact_type { + Some(t) => serde_json::from_value(serde_json::Value::String(t.clone())) + .map_err(|_| ApiError::BadRequest(format!("Invalid artifact type: '{}'", t)))?, + None => ArtifactType::FileText, + }; + + // Parse scope + let a_scope: OwnerType = match &scope { + Some(s) if !s.is_empty() => { + serde_json::from_value(serde_json::Value::String(s.clone())) + .map_err(|_| ApiError::BadRequest(format!("Invalid scope: '{}'", s)))? + } + _ => OwnerType::Action, + }; + + // Parse visibility with type-aware default + let a_visibility: ArtifactVisibility = match &visibility { + Some(v) if !v.is_empty() => { + serde_json::from_value(serde_json::Value::String(v.clone())) + .map_err(|_| ApiError::BadRequest(format!("Invalid visibility: '{}'", v)))? + } + _ => { + if a_type == ArtifactType::Progress { + ArtifactVisibility::Public + } else { + ArtifactVisibility::Private + } + } + }; + + // Parse retention + let a_retention_policy: RetentionPolicyType = match &retention_policy { + Some(rp) if !rp.is_empty() => { + serde_json::from_value(serde_json::Value::String(rp.clone())).map_err(|_| { + ApiError::BadRequest(format!("Invalid retention_policy: '{}'", rp)) + })? + } + _ => RetentionPolicyType::Versions, + }; + let a_retention_limit: i32 = match &retention_limit { + Some(rl) if !rl.is_empty() => rl.parse::().map_err(|_| { + ApiError::BadRequest(format!("Invalid retention_limit: '{}'", rl)) + })?, + _ => 10, + }; + + let create_input = CreateArtifactInput { + r#ref: artifact_ref.clone(), + scope: a_scope, + owner: owner.unwrap_or_default(), + r#type: a_type, + visibility: a_visibility, + retention_policy: a_retention_policy, + retention_limit: a_retention_limit, + name: name.filter(|s| !s.is_empty()), + description: description.filter(|s| !s.is_empty()), + content_type: content_type_field + .clone() + .or_else(|| file_content_type.clone()), + execution: execution_id, + data: None, + }; + + ArtifactRepository::create(&state.db, create_input).await? + } + }; + + // Resolve content type: explicit field > multipart header > fallback + let resolved_ct = content_type_field + .or(file_content_type) + .unwrap_or_else(|| "application/octet-stream".to_string()); + + let version_input = CreateArtifactVersionInput { + artifact: artifact.id, + content_type: Some(resolved_ct), + content: Some(file_bytes), + content_json: None, + file_path: None, + meta, + created_by, + }; + + let version = ArtifactVersionRepository::create(&state.db, version_input).await?; + + Ok(( + StatusCode::CREATED, + Json(ApiResponse::with_message( + ArtifactVersionResponse::from(version), + "Version uploaded successfully", + )), + )) +} + // ============================================================================ // Helpers // ============================================================================ @@ -1219,6 +1499,10 @@ pub fn routes() -> Router> { .delete(delete_artifact), ) .route("/artifacts/ref/{ref}", get(get_artifact_by_ref)) + .route( + "/artifacts/ref/{ref}/versions/upload", + post(upload_version_by_ref), + ) // Progress / data .route("/artifacts/{id}/progress", post(append_progress)) .route( diff --git a/crates/api/src/routes/packs.rs b/crates/api/src/routes/packs.rs index 565078d..6fb0a74 100644 --- a/crates/api/src/routes/packs.rs +++ b/crates/api/src/routes/packs.rs @@ -14,10 +14,7 @@ use validator::Validate; use attune_common::models::pack_test::PackTestResult; use attune_common::mq::{MessageEnvelope, MessageType, PackRegisteredPayload}; use attune_common::repositories::{ - action::ActionRepository, pack::{CreatePackInput, UpdatePackInput}, - rule::{RestoreRuleInput, RuleRepository}, - trigger::TriggerRepository, Create, Delete, FindById, FindByRef, PackRepository, PackTestRepository, Pagination, Update, }; use attune_common::workflow::{PackWorkflowService, PackWorkflowServiceConfig}; @@ -732,85 +729,100 @@ async fn register_pack_internal( .and_then(|v| v.as_str()) .map(|s| s.to_string()); - // Ad-hoc rules to restore after pack reinstallation - let mut saved_adhoc_rules: Vec = Vec::new(); + // Extract common metadata fields used for both create and update + let conf_schema = pack_yaml + .get("config_schema") + .and_then(|v| serde_json::to_value(v).ok()) + .unwrap_or_else(|| serde_json::json!({})); + let meta = pack_yaml + .get("metadata") + .and_then(|v| serde_json::to_value(v).ok()) + .unwrap_or_else(|| serde_json::json!({})); + let tags: Vec = pack_yaml + .get("keywords") + .and_then(|v| v.as_sequence()) + .map(|seq| { + seq.iter() + .filter_map(|v| v.as_str().map(|s| s.to_string())) + .collect() + }) + .unwrap_or_default(); + let runtime_deps: Vec = pack_yaml + .get("runtime_deps") + .and_then(|v| v.as_sequence()) + .map(|seq| { + seq.iter() + .filter_map(|v| v.as_str().map(|s| s.to_string())) + .collect() + }) + .unwrap_or_default(); + let dependencies: Vec = pack_yaml + .get("dependencies") + .and_then(|v| v.as_sequence()) + .map(|seq| { + seq.iter() + .filter_map(|v| v.as_str().map(|s| s.to_string())) + .collect() + }) + .unwrap_or_default(); - // Check if pack already exists - if !force { - if PackRepository::exists_by_ref(&state.db, &pack_ref).await? { + // Check if pack already exists — update in place to preserve IDs + let existing_pack = PackRepository::find_by_ref(&state.db, &pack_ref).await?; + + let is_new_pack; + + let pack = if let Some(existing) = existing_pack { + if !force { return Err(ApiError::Conflict(format!( "Pack '{}' already exists. Use force=true to reinstall.", pack_ref ))); } + + // Update existing pack in place — preserves pack ID and all child entity IDs + let update_input = UpdatePackInput { + label: Some(label), + description: Some(description.unwrap_or_default()), + version: Some(version.clone()), + conf_schema: Some(conf_schema), + config: None, // preserve user-set config + meta: Some(meta), + tags: Some(tags), + runtime_deps: Some(runtime_deps), + dependencies: Some(dependencies), + is_standard: None, + installers: None, + }; + + let updated = PackRepository::update(&state.db, existing.id, update_input).await?; + tracing::info!( + "Updated existing pack '{}' (ID: {}) in place", + pack_ref, + updated.id + ); + is_new_pack = false; + updated } else { - // Delete existing pack if force is true, preserving ad-hoc (user-created) rules - if let Some(existing_pack) = PackRepository::find_by_ref(&state.db, &pack_ref).await? { - // Save ad-hoc rules before deletion — CASCADE on pack FK would destroy them - saved_adhoc_rules = RuleRepository::find_adhoc_by_pack(&state.db, existing_pack.id) - .await - .unwrap_or_default(); - if !saved_adhoc_rules.is_empty() { - tracing::info!( - "Preserving {} ad-hoc rule(s) during reinstall of pack '{}'", - saved_adhoc_rules.len(), - pack_ref - ); - } + // Create new pack + let pack_input = CreatePackInput { + r#ref: pack_ref.clone(), + label, + description, + version: version.clone(), + conf_schema, + config: serde_json::json!({}), + meta, + tags, + runtime_deps, + dependencies, + is_standard: false, + installers: serde_json::json!({}), + }; - PackRepository::delete(&state.db, existing_pack.id).await?; - tracing::info!("Deleted existing pack '{}' for forced reinstall", pack_ref); - } - } - - // Create pack input - let pack_input = CreatePackInput { - r#ref: pack_ref.clone(), - label, - description, - version: version.clone(), - conf_schema: pack_yaml - .get("config_schema") - .and_then(|v| serde_json::to_value(v).ok()) - .unwrap_or_else(|| serde_json::json!({})), - config: serde_json::json!({}), - meta: pack_yaml - .get("metadata") - .and_then(|v| serde_json::to_value(v).ok()) - .unwrap_or_else(|| serde_json::json!({})), - tags: pack_yaml - .get("keywords") - .and_then(|v| v.as_sequence()) - .map(|seq| { - seq.iter() - .filter_map(|v| v.as_str().map(|s| s.to_string())) - .collect() - }) - .unwrap_or_default(), - runtime_deps: pack_yaml - .get("runtime_deps") - .and_then(|v| v.as_sequence()) - .map(|seq| { - seq.iter() - .filter_map(|v| v.as_str().map(|s| s.to_string())) - .collect() - }) - .unwrap_or_default(), - dependencies: pack_yaml - .get("dependencies") - .and_then(|v| v.as_sequence()) - .map(|seq| { - seq.iter() - .filter_map(|v| v.as_str().map(|s| s.to_string())) - .collect() - }) - .unwrap_or_default(), - is_standard: false, - installers: serde_json::json!({}), + is_new_pack = true; + PackRepository::create(&state.db, pack_input).await? }; - let pack = PackRepository::create(&state.db, pack_input).await?; - // Auto-sync workflows after pack creation let packs_base_dir = PathBuf::from(&state.config.packs_base_dir); let service_config = PackWorkflowServiceConfig { @@ -850,14 +862,18 @@ async fn register_pack_internal( match component_loader.load_all(&pack_path).await { Ok(load_result) => { tracing::info!( - "Pack '{}' components loaded: {} runtimes, {} triggers, {} actions, {} sensors ({} skipped, {} warnings)", + "Pack '{}' components loaded: {} created, {} updated, {} skipped, {} removed, {} warnings \ + (runtimes: {}/{}, triggers: {}/{}, actions: {}/{}, sensors: {}/{})", pack.r#ref, - load_result.runtimes_loaded, - load_result.triggers_loaded, - load_result.actions_loaded, - load_result.sensors_loaded, + load_result.total_loaded(), + load_result.total_updated(), load_result.total_skipped(), - load_result.warnings.len() + load_result.removed, + load_result.warnings.len(), + load_result.runtimes_loaded, load_result.runtimes_updated, + load_result.triggers_loaded, load_result.triggers_updated, + load_result.actions_loaded, load_result.actions_updated, + load_result.sensors_loaded, load_result.sensors_updated, ); for warning in &load_result.warnings { tracing::warn!("Pack component warning: {}", warning); @@ -873,122 +889,9 @@ async fn register_pack_internal( } } - // Restore ad-hoc rules that were saved before pack deletion, and - // re-link any rules from other packs whose action/trigger FKs were - // set to NULL when the old pack's entities were cascade-deleted. - { - // Phase 1: Restore saved ad-hoc rules - if !saved_adhoc_rules.is_empty() { - let mut restored = 0u32; - for saved_rule in &saved_adhoc_rules { - // Resolve action and trigger IDs by ref (they may have been recreated) - let action_id = ActionRepository::find_by_ref(&state.db, &saved_rule.action_ref) - .await - .ok() - .flatten() - .map(|a| a.id); - let trigger_id = TriggerRepository::find_by_ref(&state.db, &saved_rule.trigger_ref) - .await - .ok() - .flatten() - .map(|t| t.id); - - let input = RestoreRuleInput { - r#ref: saved_rule.r#ref.clone(), - pack: pack.id, - pack_ref: pack.r#ref.clone(), - label: saved_rule.label.clone(), - description: saved_rule.description.clone(), - action: action_id, - action_ref: saved_rule.action_ref.clone(), - trigger: trigger_id, - trigger_ref: saved_rule.trigger_ref.clone(), - conditions: saved_rule.conditions.clone(), - action_params: saved_rule.action_params.clone(), - trigger_params: saved_rule.trigger_params.clone(), - enabled: saved_rule.enabled, - }; - - match RuleRepository::restore_rule(&state.db, input).await { - Ok(rule) => { - restored += 1; - if rule.action.is_none() || rule.trigger.is_none() { - tracing::warn!( - "Restored ad-hoc rule '{}' with unresolved references \ - (action: {}, trigger: {})", - rule.r#ref, - if rule.action.is_some() { - "linked" - } else { - "NULL" - }, - if rule.trigger.is_some() { - "linked" - } else { - "NULL" - }, - ); - } - } - Err(e) => { - tracing::warn!( - "Failed to restore ad-hoc rule '{}': {}", - saved_rule.r#ref, - e - ); - } - } - } - tracing::info!( - "Restored {}/{} ad-hoc rule(s) for pack '{}'", - restored, - saved_adhoc_rules.len(), - pack.r#ref - ); - } - - // Phase 2: Re-link rules from other packs whose action/trigger FKs - // were set to NULL when the old pack's entities were cascade-deleted - let new_actions = ActionRepository::find_by_pack(&state.db, pack.id) - .await - .unwrap_or_default(); - let new_triggers = TriggerRepository::find_by_pack(&state.db, pack.id) - .await - .unwrap_or_default(); - - for action in &new_actions { - match RuleRepository::relink_action_by_ref(&state.db, &action.r#ref, action.id).await { - Ok(count) if count > 0 => { - tracing::info!("Re-linked {} rule(s) to action '{}'", count, action.r#ref); - } - Err(e) => { - tracing::warn!( - "Failed to re-link rules to action '{}': {}", - action.r#ref, - e - ); - } - _ => {} - } - } - - for trigger in &new_triggers { - match RuleRepository::relink_trigger_by_ref(&state.db, &trigger.r#ref, trigger.id).await - { - Ok(count) if count > 0 => { - tracing::info!("Re-linked {} rule(s) to trigger '{}'", count, trigger.r#ref); - } - Err(e) => { - tracing::warn!( - "Failed to re-link rules to trigger '{}': {}", - trigger.r#ref, - e - ); - } - _ => {} - } - } - } + // Since entities are now updated in place (IDs preserved), ad-hoc rules + // and cross-pack FK references survive reinstallation automatically. + // No need to save/restore rules or re-link FKs. // Set up runtime environments for the pack's actions. // This creates virtualenvs, installs dependencies, etc. based on each @@ -1199,8 +1102,11 @@ async fn register_pack_internal( let test_passed = result.status == "passed"; if !test_passed && !force { - // Tests failed and force is not set - rollback pack creation - let _ = PackRepository::delete(&state.db, pack.id).await; + // Tests failed and force is not set — only delete if we just created this pack. + // If we updated an existing pack, deleting would destroy the original. + if is_new_pack { + let _ = PackRepository::delete(&state.db, pack.id).await; + } return Err(ApiError::BadRequest(format!( "Pack registration failed: tests did not pass. Use force=true to register anyway." ))); @@ -1217,7 +1123,9 @@ async fn register_pack_internal( tracing::warn!("Failed to execute tests for pack '{}': {}", pack.r#ref, e); // If tests can't be executed and force is not set, fail the registration if !force { - let _ = PackRepository::delete(&state.db, pack.id).await; + if is_new_pack { + let _ = PackRepository::delete(&state.db, pack.id).await; + } return Err(ApiError::BadRequest(format!( "Pack registration failed: could not execute tests. Error: {}. Use force=true to register anyway.", e diff --git a/crates/api/src/routes/workflows.rs b/crates/api/src/routes/workflows.rs index 951a644..d73ac06 100644 --- a/crates/api/src/routes/workflows.rs +++ b/crates/api/src/routes/workflows.rs @@ -669,6 +669,9 @@ async fn update_companion_action( runtime_version_constraint: None, param_schema: param_schema.cloned(), out_schema: out_schema.cloned(), + parameter_delivery: None, + parameter_format: None, + output_format: None, }; ActionRepository::update(db, action.id, update_input) @@ -731,6 +734,9 @@ async fn ensure_companion_action( runtime_version_constraint: None, param_schema: param_schema.cloned(), out_schema: out_schema.cloned(), + parameter_delivery: None, + parameter_format: None, + output_format: None, }; ActionRepository::update(db, action.id, update_input) diff --git a/crates/common/src/pack_registry/loader.rs b/crates/common/src/pack_registry/loader.rs index df99a5e..5a8e0f9 100644 --- a/crates/common/src/pack_registry/loader.rs +++ b/crates/common/src/pack_registry/loader.rs @@ -9,6 +9,11 @@ //! 2. Triggers (no dependencies) //! 3. Actions (depend on runtime) //! 4. Sensors (depend on triggers and runtime) +//! +//! All loaders use **upsert** semantics: if an entity with the same ref already +//! exists it is updated in place (preserving its database ID); otherwise a new +//! row is created. After loading, entities that belong to the pack but whose +//! refs are no longer present in the YAML files are deleted. use std::collections::HashMap; use std::path::Path; @@ -18,34 +23,47 @@ use tracing::{info, warn}; use crate::error::{Error, Result}; use crate::models::Id; -use crate::repositories::action::ActionRepository; -use crate::repositories::runtime::{CreateRuntimeInput, RuntimeRepository}; -use crate::repositories::runtime_version::{CreateRuntimeVersionInput, RuntimeVersionRepository}; -use crate::repositories::trigger::{ - CreateSensorInput, CreateTriggerInput, SensorRepository, TriggerRepository, +use crate::repositories::action::{ActionRepository, UpdateActionInput}; +use crate::repositories::runtime::{CreateRuntimeInput, RuntimeRepository, UpdateRuntimeInput}; +use crate::repositories::runtime_version::{ + CreateRuntimeVersionInput, RuntimeVersionRepository, UpdateRuntimeVersionInput, }; -use crate::repositories::{Create, FindById, FindByRef, Update}; +use crate::repositories::trigger::{ + CreateSensorInput, CreateTriggerInput, SensorRepository, TriggerRepository, UpdateSensorInput, + UpdateTriggerInput, +}; +use crate::repositories::{Create, Delete, FindById, FindByRef, Update}; use crate::version_matching::extract_version_components; /// Result of loading pack components into the database. #[derive(Debug, Default)] pub struct PackLoadResult { - /// Number of runtimes loaded + /// Number of runtimes created pub runtimes_loaded: usize, - /// Number of runtimes skipped (already exist) + /// Number of runtimes updated (already existed) + pub runtimes_updated: usize, + /// Number of runtimes skipped due to errors pub runtimes_skipped: usize, - /// Number of triggers loaded + /// Number of triggers created pub triggers_loaded: usize, - /// Number of triggers skipped (already exist) + /// Number of triggers updated + pub triggers_updated: usize, + /// Number of triggers skipped pub triggers_skipped: usize, - /// Number of actions loaded + /// Number of actions created pub actions_loaded: usize, - /// Number of actions skipped (already exist) + /// Number of actions updated + pub actions_updated: usize, + /// Number of actions skipped pub actions_skipped: usize, - /// Number of sensors loaded + /// Number of sensors created pub sensors_loaded: usize, - /// Number of sensors skipped (already exist) + /// Number of sensors updated + pub sensors_updated: usize, + /// Number of sensors skipped pub sensors_skipped: usize, + /// Number of stale entities removed + pub removed: usize, /// Warnings encountered during loading pub warnings: Vec, } @@ -58,6 +76,10 @@ impl PackLoadResult { pub fn total_skipped(&self) -> usize { self.runtimes_skipped + self.triggers_skipped + self.actions_skipped + self.sensors_skipped } + + pub fn total_updated(&self) -> usize { + self.runtimes_updated + self.triggers_updated + self.actions_updated + self.sensors_updated + } } /// Loads pack components (triggers, actions, sensors) from YAML files on disk @@ -79,9 +101,10 @@ impl<'a> PackComponentLoader<'a> { /// Load all components from the pack directory. /// - /// Reads triggers, actions, and sensors from their respective subdirectories - /// and registers them in the database. Components that already exist (by ref) - /// are skipped. + /// Uses upsert semantics: entities that already exist (by ref) are updated + /// in place, preserving their database IDs. New entities are created. + /// After loading, entities that belong to the pack but are no longer + /// present in the YAML files are removed. pub async fn load_all(&self, pack_dir: &Path) -> Result { let mut result = PackLoadResult::default(); @@ -92,43 +115,60 @@ impl<'a> PackComponentLoader<'a> { ); // 1. Load runtimes first (no dependencies) - self.load_runtimes(pack_dir, &mut result).await?; + let runtime_refs = self.load_runtimes(pack_dir, &mut result).await?; // 2. Load triggers (no dependencies) - let trigger_ids = self.load_triggers(pack_dir, &mut result).await?; + let (trigger_ids, trigger_refs) = self.load_triggers(pack_dir, &mut result).await?; // 3. Load actions (depend on runtime) - self.load_actions(pack_dir, &mut result).await?; + let action_refs = self.load_actions(pack_dir, &mut result).await?; // 4. Load sensors (depend on triggers and runtime) - self.load_sensors(pack_dir, &trigger_ids, &mut result) + let sensor_refs = self + .load_sensors(pack_dir, &trigger_ids, &mut result) .await?; + // 5. Clean up entities that are no longer in the pack's YAML files + self.cleanup_removed_entities( + &runtime_refs, + &trigger_refs, + &action_refs, + &sensor_refs, + &mut result, + ) + .await; + info!( - "Pack '{}' component loading complete: {} loaded, {} skipped, {} warnings", + "Pack '{}' component loading complete: {} created, {} updated, {} skipped, {} removed, {} warnings", self.pack_ref, result.total_loaded(), + result.total_updated(), result.total_skipped(), + result.removed, result.warnings.len() ); Ok(result) } - /// Load trigger definitions from `pack_dir/triggers/*.yaml`. - /// - /// Returns a map of trigger ref -> trigger ID for use by sensor loading. /// Load runtime definitions from `pack_dir/runtimes/*.yaml`. /// /// Runtimes define how actions and sensors are executed (interpreter, /// environment setup, dependency management). They are loaded first /// since actions reference them. - async fn load_runtimes(&self, pack_dir: &Path, result: &mut PackLoadResult) -> Result<()> { + /// + /// Returns the set of runtime refs that were loaded (for cleanup). + async fn load_runtimes( + &self, + pack_dir: &Path, + result: &mut PackLoadResult, + ) -> Result> { let runtimes_dir = pack_dir.join("runtimes"); + let mut loaded_refs = Vec::new(); if !runtimes_dir.exists() { info!("No runtimes directory found for pack '{}'", self.pack_ref); - return Ok(()); + return Ok(loaded_refs); } let yaml_files = read_yaml_files(&runtimes_dir)?; @@ -153,16 +193,6 @@ impl<'a> PackComponentLoader<'a> { } }; - // Check if runtime already exists - if let Some(existing) = RuntimeRepository::find_by_ref(self.pool, &runtime_ref).await? { - info!( - "Runtime '{}' already exists (ID: {}), skipping", - runtime_ref, existing.id - ); - result.runtimes_skipped += 1; - continue; - } - let name = data .get("name") .and_then(|v| v.as_str()) @@ -188,6 +218,35 @@ impl<'a> PackComponentLoader<'a> { .and_then(|v| serde_json::to_value(v).ok()) .unwrap_or_else(|| serde_json::json!({})); + // Check if runtime already exists — update in place if so + if let Some(existing) = RuntimeRepository::find_by_ref(self.pool, &runtime_ref).await? { + let update_input = UpdateRuntimeInput { + description, + name: Some(name), + distributions: Some(distributions), + installation, + execution_config: Some(execution_config), + }; + + match RuntimeRepository::update(self.pool, existing.id, update_input).await { + Ok(_) => { + info!("Updated runtime '{}' (ID: {})", runtime_ref, existing.id); + result.runtimes_updated += 1; + + // Also upsert version entries + self.load_runtime_versions(&data, existing.id, &runtime_ref, result) + .await; + } + Err(e) => { + let msg = format!("Failed to update runtime '{}': {}", runtime_ref, e); + warn!("{}", msg); + result.warnings.push(msg); + } + } + loaded_refs.push(runtime_ref); + continue; + } + let input = CreateRuntimeInput { r#ref: runtime_ref.clone(), pack: Some(self.pack_id), @@ -203,6 +262,7 @@ impl<'a> PackComponentLoader<'a> { Ok(rt) => { info!("Created runtime '{}' (ID: {})", runtime_ref, rt.id); result.runtimes_loaded += 1; + loaded_refs.push(runtime_ref.clone()); // Load version entries from the optional `versions` array self.load_runtime_versions(&data, rt.id, &runtime_ref, result) @@ -214,10 +274,11 @@ impl<'a> PackComponentLoader<'a> { if let sqlx::Error::Database(ref inner) = db_err { if inner.is_unique_violation() { info!( - "Runtime '{}' already exists (concurrent creation), skipping", + "Runtime '{}' already exists (concurrent creation), treating as update", runtime_ref ); - result.runtimes_skipped += 1; + loaded_refs.push(runtime_ref); + result.runtimes_updated += 1; continue; } } @@ -229,29 +290,13 @@ impl<'a> PackComponentLoader<'a> { } } - Ok(()) + Ok(loaded_refs) } - /// Load version entries from the `versions` array in a runtime YAML. + /// Load runtime version entries from a runtime's YAML `versions` array. /// - /// Each entry in the array describes a specific version of the runtime - /// with its own `execution_config` and `distributions`. Example: - /// - /// ```yaml - /// versions: - /// - version: "3.12" - /// is_default: true - /// execution_config: - /// interpreter: - /// binary: python3.12 - /// ... - /// distributions: - /// verification: - /// commands: - /// - binary: python3.12 - /// args: ["--version"] - /// ... - /// ``` + /// Uses upsert: existing versions (by runtime + version string) are updated, + /// new versions are created. async fn load_runtime_versions( &self, data: &serde_yaml_ng::Value, @@ -270,6 +315,9 @@ impl<'a> PackComponentLoader<'a> { runtime_ref ); + // Collect version strings we loaded so we can clean up removed versions + let mut loaded_versions = Vec::new(); + for entry in versions { let version_str = match entry.get("version").and_then(|v| v.as_str()) { Some(v) => v.to_string(), @@ -284,21 +332,6 @@ impl<'a> PackComponentLoader<'a> { } }; - // Check if this version already exists - if let Ok(Some(_existing)) = RuntimeVersionRepository::find_by_runtime_and_version( - self.pool, - runtime_id, - &version_str, - ) - .await - { - info!( - "Version '{}' for runtime '{}' already exists, skipping", - version_str, runtime_ref - ); - continue; - } - let (version_major, version_minor, version_patch) = extract_version_components(&version_str); @@ -322,6 +355,47 @@ impl<'a> PackComponentLoader<'a> { .and_then(|v| serde_json::to_value(v).ok()) .unwrap_or_else(|| serde_json::json!({})); + // Check if this version already exists — update in place if so + if let Ok(Some(existing)) = RuntimeVersionRepository::find_by_runtime_and_version( + self.pool, + runtime_id, + &version_str, + ) + .await + { + let update_input = UpdateRuntimeVersionInput { + version: None, // version string doesn't change + version_major: Some(version_major), + version_minor: Some(version_minor), + version_patch: Some(version_patch), + execution_config: Some(execution_config), + distributions: Some(distributions), + is_default: Some(is_default), + available: None, // preserve current availability — verification sets this + verified_at: None, + meta: Some(meta), + }; + + match RuntimeVersionRepository::update(self.pool, existing.id, update_input).await { + Ok(_) => { + info!( + "Updated version '{}' for runtime '{}' (ID: {})", + version_str, runtime_ref, existing.id + ); + } + Err(e) => { + let msg = format!( + "Failed to update version '{}' for runtime '{}': {}", + version_str, runtime_ref, e + ); + warn!("{}", msg); + result.warnings.push(msg); + } + } + loaded_versions.push(version_str); + continue; + } + let input = CreateRuntimeVersionInput { runtime: runtime_id, runtime_ref: runtime_ref.to_string(), @@ -342,6 +416,7 @@ impl<'a> PackComponentLoader<'a> { "Created version '{}' for runtime '{}' (ID: {})", version_str, runtime_ref, rv.id ); + loaded_versions.push(version_str); } Err(e) => { // Check for unique constraint violation (race condition) @@ -352,6 +427,7 @@ impl<'a> PackComponentLoader<'a> { "Version '{}' for runtime '{}' already exists (concurrent), skipping", version_str, runtime_ref ); + loaded_versions.push(version_str); continue; } } @@ -365,19 +441,44 @@ impl<'a> PackComponentLoader<'a> { } } } + + // Clean up versions that are no longer in the YAML + if let Ok(existing_versions) = + RuntimeVersionRepository::find_by_runtime(self.pool, runtime_id).await + { + for existing in existing_versions { + if !loaded_versions.contains(&existing.version) { + info!( + "Removing stale version '{}' for runtime '{}'", + existing.version, runtime_ref + ); + if let Err(e) = RuntimeVersionRepository::delete(self.pool, existing.id).await { + warn!( + "Failed to delete stale version '{}' for runtime '{}': {}", + existing.version, runtime_ref, e + ); + } + } + } + } } + /// Load trigger definitions from `pack_dir/triggers/*.yaml`. + /// + /// Returns a map of trigger ref -> trigger ID for use by sensor loading, + /// and the list of loaded trigger refs for cleanup. async fn load_triggers( &self, pack_dir: &Path, result: &mut PackLoadResult, - ) -> Result> { + ) -> Result<(HashMap, Vec)> { let triggers_dir = pack_dir.join("triggers"); let mut trigger_ids = HashMap::new(); + let mut loaded_refs = Vec::new(); if !triggers_dir.exists() { info!("No triggers directory found for pack '{}'", self.pack_ref); - return Ok(trigger_ids); + return Ok((trigger_ids, loaded_refs)); } let yaml_files = read_yaml_files(&triggers_dir)?; @@ -402,17 +503,6 @@ impl<'a> PackComponentLoader<'a> { } }; - // Check if trigger already exists - if let Some(existing) = TriggerRepository::find_by_ref(self.pool, &trigger_ref).await? { - info!( - "Trigger '{}' already exists (ID: {}), skipping", - trigger_ref, existing.id - ); - trigger_ids.insert(trigger_ref, existing.id); - result.triggers_skipped += 1; - continue; - } - let name = extract_name_from_ref(&trigger_ref); let label = data .get("label") @@ -439,6 +529,32 @@ impl<'a> PackComponentLoader<'a> { .get("output") .and_then(|v| serde_json::to_value(v).ok()); + // Check if trigger already exists — update in place if so + if let Some(existing) = TriggerRepository::find_by_ref(self.pool, &trigger_ref).await? { + let update_input = UpdateTriggerInput { + label: Some(label), + description: Some(description), + enabled: Some(enabled), + param_schema, + out_schema, + }; + + match TriggerRepository::update(self.pool, existing.id, update_input).await { + Ok(_) => { + info!("Updated trigger '{}' (ID: {})", trigger_ref, existing.id); + result.triggers_updated += 1; + } + Err(e) => { + let msg = format!("Failed to update trigger '{}': {}", trigger_ref, e); + warn!("{}", msg); + result.warnings.push(msg); + } + } + trigger_ids.insert(trigger_ref.clone(), existing.id); + loaded_refs.push(trigger_ref); + continue; + } + let input = CreateTriggerInput { r#ref: trigger_ref.clone(), pack: Some(self.pack_id), @@ -454,7 +570,8 @@ impl<'a> PackComponentLoader<'a> { match TriggerRepository::create(self.pool, input).await { Ok(trigger) => { info!("Created trigger '{}' (ID: {})", trigger_ref, trigger.id); - trigger_ids.insert(trigger_ref, trigger.id); + trigger_ids.insert(trigger_ref.clone(), trigger.id); + loaded_refs.push(trigger_ref); result.triggers_loaded += 1; } Err(e) => { @@ -465,16 +582,23 @@ impl<'a> PackComponentLoader<'a> { } } - Ok(trigger_ids) + Ok((trigger_ids, loaded_refs)) } /// Load action definitions from `pack_dir/actions/*.yaml`. - async fn load_actions(&self, pack_dir: &Path, result: &mut PackLoadResult) -> Result<()> { + /// + /// Returns the list of loaded action refs for cleanup. + async fn load_actions( + &self, + pack_dir: &Path, + result: &mut PackLoadResult, + ) -> Result> { let actions_dir = pack_dir.join("actions"); + let mut loaded_refs = Vec::new(); if !actions_dir.exists() { info!("No actions directory found for pack '{}'", self.pack_ref); - return Ok(()); + return Ok(loaded_refs); } let yaml_files = read_yaml_files(&actions_dir)?; @@ -499,16 +623,6 @@ impl<'a> PackComponentLoader<'a> { } }; - // Check if action already exists - if let Some(existing) = ActionRepository::find_by_ref(self.pool, &action_ref).await? { - info!( - "Action '{}' already exists (ID: {}), skipping", - action_ref, existing.id - ); - result.actions_skipped += 1; - continue; - } - let name = extract_name_from_ref(&action_ref); let label = data .get("label") @@ -544,9 +658,6 @@ impl<'a> PackComponentLoader<'a> { .get("output") .and_then(|v| serde_json::to_value(v).ok()); - // Read optional fields for parameter delivery/format and output format. - // The database has defaults (stdin, json, text), so we only set these - // in the INSERT if the YAML specifies them. let parameter_delivery = data .get("parameter_delivery") .and_then(|v| v.as_str()) @@ -571,6 +682,36 @@ impl<'a> PackComponentLoader<'a> { .and_then(|v| v.as_str()) .map(|s| s.to_string()); + // Check if action already exists — update in place if so + if let Some(existing) = ActionRepository::find_by_ref(self.pool, &action_ref).await? { + let update_input = UpdateActionInput { + label: Some(label), + description: Some(description), + entrypoint: Some(entrypoint), + runtime: runtime_id, + runtime_version_constraint: Some(runtime_version_constraint), + param_schema, + out_schema, + parameter_delivery: Some(parameter_delivery), + parameter_format: Some(parameter_format), + output_format: Some(output_format), + }; + + match ActionRepository::update(self.pool, existing.id, update_input).await { + Ok(_) => { + info!("Updated action '{}' (ID: {})", action_ref, existing.id); + result.actions_updated += 1; + } + Err(e) => { + let msg = format!("Failed to update action '{}': {}", action_ref, e); + warn!("{}", msg); + result.warnings.push(msg); + } + } + loaded_refs.push(action_ref); + continue; + } + // Use raw SQL to include parameter_delivery, parameter_format, // output_format which are not in CreateActionInput let create_result = sqlx::query_scalar::<_, i64>( @@ -604,6 +745,7 @@ impl<'a> PackComponentLoader<'a> { match create_result { Ok(id) => { info!("Created action '{}' (ID: {})", action_ref, id); + loaded_refs.push(action_ref); result.actions_loaded += 1; } Err(e) => { @@ -611,10 +753,11 @@ impl<'a> PackComponentLoader<'a> { if let sqlx::Error::Database(ref db_err) = e { if db_err.is_unique_violation() { info!( - "Action '{}' already exists (concurrent creation), skipping", + "Action '{}' already exists (concurrent creation), treating as update", action_ref ); - result.actions_skipped += 1; + loaded_refs.push(action_ref); + result.actions_updated += 1; continue; } } @@ -625,21 +768,24 @@ impl<'a> PackComponentLoader<'a> { } } - Ok(()) + Ok(loaded_refs) } /// Load sensor definitions from `pack_dir/sensors/*.yaml`. + /// + /// Returns the list of loaded sensor refs for cleanup. async fn load_sensors( &self, pack_dir: &Path, trigger_ids: &HashMap, result: &mut PackLoadResult, - ) -> Result<()> { + ) -> Result> { let sensors_dir = pack_dir.join("sensors"); + let mut loaded_refs = Vec::new(); if !sensors_dir.exists() { info!("No sensors directory found for pack '{}'", self.pack_ref); - return Ok(()); + return Ok(loaded_refs); } let yaml_files = read_yaml_files(&sensors_dir)?; @@ -758,8 +904,6 @@ impl<'a> PackComponentLoader<'a> { // Upsert: update existing sensors so re-registration corrects // stale metadata (especially runtime assignments). if let Some(existing) = SensorRepository::find_by_ref(self.pool, &sensor_ref).await? { - use crate::repositories::trigger::UpdateSensorInput; - let update_input = UpdateSensorInput { label: Some(label), description: Some(description), @@ -780,7 +924,7 @@ impl<'a> PackComponentLoader<'a> { "Updated sensor '{}' (ID: {}, runtime: {} → {})", sensor_ref, existing.id, existing.runtime_ref, sensor_runtime_ref ); - result.sensors_loaded += 1; + result.sensors_updated += 1; } Err(e) => { let msg = format!("Failed to update sensor '{}': {}", sensor_ref, e); @@ -788,6 +932,7 @@ impl<'a> PackComponentLoader<'a> { result.warnings.push(msg); } } + loaded_refs.push(sensor_ref); continue; } @@ -811,6 +956,7 @@ impl<'a> PackComponentLoader<'a> { match SensorRepository::create(self.pool, input).await { Ok(sensor) => { info!("Created sensor '{}' (ID: {})", sensor_ref, sensor.id); + loaded_refs.push(sensor_ref); result.sensors_loaded += 1; } Err(e) => { @@ -821,7 +967,7 @@ impl<'a> PackComponentLoader<'a> { } } - Ok(()) + Ok(loaded_refs) } /// Resolve a runtime ID from a runner type string (e.g., "shell", "python", "native"). @@ -917,11 +1063,116 @@ impl<'a> PackComponentLoader<'a> { } } } + + /// Remove entities that belong to this pack but whose refs are no longer + /// present in the pack's YAML files. + /// + /// This handles the case where an action/trigger/sensor/runtime was removed + /// from the pack between versions. Ad-hoc (user-created) entities are never + /// removed. + async fn cleanup_removed_entities( + &self, + runtime_refs: &[String], + trigger_refs: &[String], + action_refs: &[String], + sensor_refs: &[String], + result: &mut PackLoadResult, + ) { + // Clean up sensors first (they depend on triggers/runtimes) + match SensorRepository::delete_by_pack_excluding(self.pool, self.pack_id, sensor_refs).await + { + Ok(count) => { + if count > 0 { + info!( + "Removed {} stale sensor(s) from pack '{}'", + count, self.pack_ref + ); + result.removed += count as usize; + } + } + Err(e) => { + warn!( + "Failed to clean up stale sensors for pack '{}': {}", + self.pack_ref, e + ); + } + } + + // Clean up actions (ad-hoc preserved) + match ActionRepository::delete_non_adhoc_by_pack_excluding( + self.pool, + self.pack_id, + action_refs, + ) + .await + { + Ok(count) => { + if count > 0 { + info!( + "Removed {} stale action(s) from pack '{}'", + count, self.pack_ref + ); + result.removed += count as usize; + } + } + Err(e) => { + warn!( + "Failed to clean up stale actions for pack '{}': {}", + self.pack_ref, e + ); + } + } + + // Clean up triggers (ad-hoc preserved) + match TriggerRepository::delete_non_adhoc_by_pack_excluding( + self.pool, + self.pack_id, + trigger_refs, + ) + .await + { + Ok(count) => { + if count > 0 { + info!( + "Removed {} stale trigger(s) from pack '{}'", + count, self.pack_ref + ); + result.removed += count as usize; + } + } + Err(e) => { + warn!( + "Failed to clean up stale triggers for pack '{}': {}", + self.pack_ref, e + ); + } + } + + // Clean up runtimes last (actions/sensors may reference them) + match RuntimeRepository::delete_by_pack_excluding(self.pool, self.pack_id, runtime_refs) + .await + { + Ok(count) => { + if count > 0 { + info!( + "Removed {} stale runtime(s) from pack '{}'", + count, self.pack_ref + ); + result.removed += count as usize; + } + } + Err(e) => { + warn!( + "Failed to clean up stale runtimes for pack '{}': {}", + self.pack_ref, e + ); + } + } + } } -/// Read all `.yaml` and `.yml` files from a directory, sorted by filename. -/// -/// Returns a Vec of (filename, content) pairs. +/// Read all YAML files from a directory, returning `(filename, content)` pairs +/// sorted by filename for deterministic ordering. fn read_yaml_files(dir: &Path) -> Result> { let mut files = Vec::new(); diff --git a/crates/common/src/repositories/action.rs b/crates/common/src/repositories/action.rs index 21a170f..cb3fbbb 100644 --- a/crates/common/src/repositories/action.rs +++ b/crates/common/src/repositories/action.rs @@ -8,6 +8,11 @@ use sqlx::{Executor, Postgres, QueryBuilder}; use super::{Create, Delete, FindById, FindByRef, List, Repository, Update}; +/// Columns selected in all Action queries. Must match the `Action` model's `FromRow` fields. +pub const ACTION_COLUMNS: &str = "id, ref, pack, pack_ref, label, description, entrypoint, \ + runtime, runtime_version_constraint, param_schema, out_schema, workflow_def, is_adhoc, \ + parameter_delivery, parameter_format, output_format, created, updated"; + /// Filters for [`ActionRepository::list_search`]. /// /// All fields are optional and combinable (AND). Pagination is always applied. @@ -65,6 +70,9 @@ pub struct UpdateActionInput { pub runtime_version_constraint: Option>, pub param_schema: Option, pub out_schema: Option, + pub parameter_delivery: Option, + pub parameter_format: Option, + pub output_format: Option, } #[async_trait::async_trait] @@ -73,15 +81,10 @@ impl FindById for ActionRepository { where E: Executor<'e, Database = Postgres> + 'e, { - let action = sqlx::query_as::<_, Action>( - r#" - SELECT id, ref, pack, pack_ref, label, description, entrypoint, - runtime, runtime_version_constraint, - param_schema, out_schema, workflow_def, is_adhoc, created, updated - FROM action - WHERE id = $1 - "#, - ) + let action = sqlx::query_as::<_, Action>(&format!( + "SELECT {} FROM action WHERE id = $1", + ACTION_COLUMNS + )) .bind(id) .fetch_optional(executor) .await?; @@ -96,15 +99,10 @@ impl FindByRef for ActionRepository { where E: Executor<'e, Database = Postgres> + 'e, { - let action = sqlx::query_as::<_, Action>( - r#" - SELECT id, ref, pack, pack_ref, label, description, entrypoint, - runtime, runtime_version_constraint, - param_schema, out_schema, workflow_def, is_adhoc, created, updated - FROM action - WHERE ref = $1 - "#, - ) + let action = sqlx::query_as::<_, Action>(&format!( + "SELECT {} FROM action WHERE ref = $1", + ACTION_COLUMNS + )) .bind(ref_str) .fetch_optional(executor) .await?; @@ -119,15 +117,10 @@ impl List for ActionRepository { where E: Executor<'e, Database = Postgres> + 'e, { - let actions = sqlx::query_as::<_, Action>( - r#" - SELECT id, ref, pack, pack_ref, label, description, entrypoint, - runtime, runtime_version_constraint, - param_schema, out_schema, workflow_def, is_adhoc, created, updated - FROM action - ORDER BY ref ASC - "#, - ) + let actions = sqlx::query_as::<_, Action>(&format!( + "SELECT {} FROM action ORDER BY ref ASC", + ACTION_COLUMNS + )) .fetch_all(executor) .await?; @@ -155,16 +148,15 @@ impl Create for ActionRepository { } // Try to insert - database will enforce uniqueness constraint - let action = sqlx::query_as::<_, Action>( + let action = sqlx::query_as::<_, Action>(&format!( r#" INSERT INTO action (ref, pack, pack_ref, label, description, entrypoint, runtime, runtime_version_constraint, param_schema, out_schema, is_adhoc) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) - RETURNING id, ref, pack, pack_ref, label, description, entrypoint, - runtime, runtime_version_constraint, - param_schema, out_schema, workflow_def, is_adhoc, created, updated + RETURNING {} "#, - ) + ACTION_COLUMNS + )) .bind(&input.r#ref) .bind(input.pack) .bind(&input.pack_ref) @@ -267,6 +259,33 @@ impl Update for ActionRepository { has_updates = true; } + if let Some(parameter_delivery) = &input.parameter_delivery { + if has_updates { + query.push(", "); + } + query.push("parameter_delivery = "); + query.push_bind(parameter_delivery); + has_updates = true; + } + + if let Some(parameter_format) = &input.parameter_format { + if has_updates { + query.push(", "); + } + query.push("parameter_format = "); + query.push_bind(parameter_format); + has_updates = true; + } + + if let Some(output_format) = &input.output_format { + if has_updates { + query.push(", "); + } + query.push("output_format = "); + query.push_bind(output_format); + has_updates = true; + } + if !has_updates { // No updates requested, fetch and return existing action return Self::find_by_id(executor, id) @@ -276,7 +295,7 @@ impl Update for ActionRepository { query.push(", updated = NOW() WHERE id = "); query.push_bind(id); - query.push(" RETURNING id, ref, pack, pack_ref, label, description, entrypoint, runtime, runtime_version_constraint, param_schema, out_schema, workflow_def, is_adhoc, created, updated"); + query.push(&format!(" RETURNING {}", ACTION_COLUMNS)); let action = query .build_query_as::() @@ -317,10 +336,8 @@ impl ActionRepository { where E: Executor<'e, Database = Postgres> + Copy + 'e, { - let select_cols = "id, ref, pack, pack_ref, label, description, entrypoint, runtime, runtime_version_constraint, param_schema, out_schema, workflow_def, is_adhoc, created, updated"; - let mut qb: QueryBuilder<'_, Postgres> = - QueryBuilder::new(format!("SELECT {select_cols} FROM action")); + QueryBuilder::new(format!("SELECT {} FROM action", ACTION_COLUMNS)); let mut count_qb: QueryBuilder<'_, Postgres> = QueryBuilder::new("SELECT COUNT(*) FROM action"); @@ -398,16 +415,10 @@ impl ActionRepository { where E: Executor<'e, Database = Postgres> + 'e, { - let actions = sqlx::query_as::<_, Action>( - r#" - SELECT id, ref, pack, pack_ref, label, description, entrypoint, - runtime, runtime_version_constraint, - param_schema, out_schema, workflow_def, is_adhoc, created, updated - FROM action - WHERE pack = $1 - ORDER BY ref ASC - "#, - ) + let actions = sqlx::query_as::<_, Action>(&format!( + "SELECT {} FROM action WHERE pack = $1 ORDER BY ref ASC", + ACTION_COLUMNS + )) .bind(pack_id) .fetch_all(executor) .await?; @@ -420,16 +431,10 @@ impl ActionRepository { where E: Executor<'e, Database = Postgres> + 'e, { - let actions = sqlx::query_as::<_, Action>( - r#" - SELECT id, ref, pack, pack_ref, label, description, entrypoint, - runtime, runtime_version_constraint, - param_schema, out_schema, workflow_def, is_adhoc, created, updated - FROM action - WHERE runtime = $1 - ORDER BY ref ASC - "#, - ) + let actions = sqlx::query_as::<_, Action>(&format!( + "SELECT {} FROM action WHERE runtime = $1 ORDER BY ref ASC", + ACTION_COLUMNS + )) .bind(runtime_id) .fetch_all(executor) .await?; @@ -443,16 +448,10 @@ impl ActionRepository { E: Executor<'e, Database = Postgres> + 'e, { let search_pattern = format!("%{}%", query.to_lowercase()); - let actions = sqlx::query_as::<_, Action>( - r#" - SELECT id, ref, pack, pack_ref, label, description, entrypoint, - runtime, runtime_version_constraint, - param_schema, out_schema, workflow_def, is_adhoc, created, updated - FROM action - WHERE LOWER(ref) LIKE $1 OR LOWER(label) LIKE $1 OR LOWER(description) LIKE $1 - ORDER BY ref ASC - "#, - ) + let actions = sqlx::query_as::<_, Action>(&format!( + "SELECT {} FROM action WHERE LOWER(ref) LIKE $1 OR LOWER(label) LIKE $1 OR LOWER(description) LIKE $1 ORDER BY ref ASC", + ACTION_COLUMNS + )) .bind(&search_pattern) .fetch_all(executor) .await?; @@ -465,16 +464,10 @@ impl ActionRepository { where E: Executor<'e, Database = Postgres> + 'e, { - let actions = sqlx::query_as::<_, Action>( - r#" - SELECT id, ref, pack, pack_ref, label, description, entrypoint, - runtime, runtime_version_constraint, - param_schema, out_schema, workflow_def, is_adhoc, created, updated - FROM action - WHERE workflow_def IS NOT NULL - ORDER BY ref ASC - "#, - ) + let actions = sqlx::query_as::<_, Action>(&format!( + "SELECT {} FROM action WHERE workflow_def IS NOT NULL ORDER BY ref ASC", + ACTION_COLUMNS + )) .fetch_all(executor) .await?; @@ -489,15 +482,10 @@ impl ActionRepository { where E: Executor<'e, Database = Postgres> + 'e, { - let action = sqlx::query_as::<_, Action>( - r#" - SELECT id, ref, pack, pack_ref, label, description, entrypoint, - runtime, runtime_version_constraint, - param_schema, out_schema, workflow_def, is_adhoc, created, updated - FROM action - WHERE workflow_def = $1 - "#, - ) + let action = sqlx::query_as::<_, Action>(&format!( + "SELECT {} FROM action WHERE workflow_def = $1", + ACTION_COLUMNS + )) .bind(workflow_def_id) .fetch_optional(executor) .await?; @@ -505,6 +493,36 @@ impl ActionRepository { Ok(action) } + /// Delete non-adhoc actions belonging to a pack whose refs are NOT in the given set. + /// + /// Used during pack reinstallation to clean up actions that were removed + /// from the pack's YAML files. Ad-hoc (user-created) actions are preserved. + pub async fn delete_non_adhoc_by_pack_excluding<'e, E>( + executor: E, + pack_id: Id, + keep_refs: &[String], + ) -> Result + where + E: Executor<'e, Database = Postgres> + 'e, + { + let result = if keep_refs.is_empty() { + sqlx::query("DELETE FROM action WHERE pack = $1 AND is_adhoc = false") + .bind(pack_id) + .execute(executor) + .await? + } else { + sqlx::query( + "DELETE FROM action WHERE pack = $1 AND is_adhoc = false AND ref != ALL($2)", + ) + .bind(pack_id) + .bind(keep_refs) + .execute(executor) + .await? + }; + + Ok(result.rows_affected()) + } + /// Link an action to a workflow definition pub async fn link_workflow_def<'e, E>( executor: E, @@ -514,16 +532,15 @@ impl ActionRepository { where E: Executor<'e, Database = Postgres> + 'e, { - let action = sqlx::query_as::<_, Action>( + let action = sqlx::query_as::<_, Action>(&format!( r#" UPDATE action SET workflow_def = $2, updated = NOW() WHERE id = $1 - RETURNING id, ref, pack, pack_ref, label, description, entrypoint, - runtime, runtime_version_constraint, - param_schema, out_schema, workflow_def, is_adhoc, created, updated + RETURNING {} "#, - ) + ACTION_COLUMNS + )) .bind(action_id) .bind(workflow_def_id) .fetch_one(executor) diff --git a/crates/common/src/repositories/artifact.rs b/crates/common/src/repositories/artifact.rs index 82dc733..7af2721 100644 --- a/crates/common/src/repositories/artifact.rs +++ b/crates/common/src/repositories/artifact.rs @@ -52,6 +52,7 @@ pub struct UpdateArtifactInput { pub description: Option, pub content_type: Option, pub size_bytes: Option, + pub execution: Option>, pub data: Option, } @@ -189,6 +190,15 @@ impl Update for ArtifactRepository { push_field!(&input.description, "description"); push_field!(&input.content_type, "content_type"); push_field!(input.size_bytes, "size_bytes"); + // execution is Option> — outer Option = "was field provided?", + // inner Option = nullable column value + if let Some(exec_val) = input.execution { + if has_updates { + query.push(", "); + } + query.push("execution = ").push_bind(exec_val); + has_updates = true; + } push_field!(&input.data, "data"); if !has_updates { diff --git a/crates/common/src/repositories/runtime.rs b/crates/common/src/repositories/runtime.rs index bee16f8..ef58ca6 100644 --- a/crates/common/src/repositories/runtime.rs +++ b/crates/common/src/repositories/runtime.rs @@ -284,6 +284,34 @@ impl RuntimeRepository { Ok(runtime) } + /// Delete runtimes belonging to a pack whose refs are NOT in the given set. + /// + /// Used during pack reinstallation to clean up runtimes that were removed + /// from the pack's YAML files. Associated runtime_version rows are + /// cascade-deleted by the FK constraint. + pub async fn delete_by_pack_excluding<'e, E>( + executor: E, + pack_id: Id, + keep_refs: &[String], + ) -> Result + where + E: Executor<'e, Database = Postgres> + 'e, + { + let result = if keep_refs.is_empty() { + sqlx::query("DELETE FROM runtime WHERE pack = $1") + .bind(pack_id) + .execute(executor) + .await? + } else { + sqlx::query("DELETE FROM runtime WHERE pack = $1 AND ref != ALL($2)") + .bind(pack_id) + .bind(keep_refs) + .execute(executor) + .await? + }; + + Ok(result.rows_affected()) + } } // ============================================================================ diff --git a/crates/common/src/repositories/trigger.rs b/crates/common/src/repositories/trigger.rs index 20eb2ba..407fbf3 100644 --- a/crates/common/src/repositories/trigger.rs +++ b/crates/common/src/repositories/trigger.rs @@ -301,6 +301,36 @@ impl Delete for TriggerRepository { } impl TriggerRepository { + /// Delete non-adhoc triggers belonging to a pack whose refs are NOT in the given set. + /// + /// Used during pack reinstallation to clean up triggers that were removed + /// from the pack's YAML files. Ad-hoc (user-created) triggers are preserved. + pub async fn delete_non_adhoc_by_pack_excluding<'e, E>( + executor: E, + pack_id: Id, + keep_refs: &[String], + ) -> Result + where + E: Executor<'e, Database = Postgres> + 'e, + { + let result = if keep_refs.is_empty() { + sqlx::query("DELETE FROM trigger WHERE pack = $1 AND is_adhoc = false") + .bind(pack_id) + .execute(executor) + .await? + } else { + sqlx::query( + "DELETE FROM trigger WHERE pack = $1 AND is_adhoc = false AND ref != ALL($2)", + ) + .bind(pack_id) + .bind(keep_refs) + .execute(executor) + .await? + }; + + Ok(result.rows_affected()) + } + /// Search triggers with all filters pushed into SQL. /// /// All filter fields are combinable (AND). Pagination is server-side. @@ -907,6 +937,34 @@ impl Delete for SensorRepository { } impl SensorRepository { + /// Delete non-adhoc sensors belonging to a pack whose refs are NOT in the given set. + /// + /// Used during pack reinstallation to clean up sensors that were removed + /// from the pack's YAML files. + pub async fn delete_by_pack_excluding<'e, E>( + executor: E, + pack_id: Id, + keep_refs: &[String], + ) -> Result + where + E: Executor<'e, Database = Postgres> + 'e, + { + let result = if keep_refs.is_empty() { + sqlx::query("DELETE FROM sensor WHERE pack = $1") + .bind(pack_id) + .execute(executor) + .await? + } else { + sqlx::query("DELETE FROM sensor WHERE pack = $1 AND ref != ALL($2)") + .bind(pack_id) + .bind(keep_refs) + .execute(executor) + .await? + }; + + Ok(result.rows_affected()) + } + /// Search sensors with all filters pushed into SQL. /// /// All filter fields are combinable (AND). Pagination is server-side. diff --git a/crates/common/src/workflow/registrar.rs b/crates/common/src/workflow/registrar.rs index 7ebe557..b00b306 100644 --- a/crates/common/src/workflow/registrar.rs +++ b/crates/common/src/workflow/registrar.rs @@ -257,6 +257,9 @@ impl WorkflowRegistrar { runtime_version_constraint: None, param_schema: workflow.parameters.clone(), out_schema: workflow.output.clone(), + parameter_delivery: None, + parameter_format: None, + output_format: None, }; ActionRepository::update(&self.pool, action.id, update_input).await?; diff --git a/crates/common/tests/action_repository_tests.rs b/crates/common/tests/action_repository_tests.rs index 4066462..07d32da 100644 --- a/crates/common/tests/action_repository_tests.rs +++ b/crates/common/tests/action_repository_tests.rs @@ -196,11 +196,7 @@ async fn test_update_action() { let update = UpdateActionInput { label: Some("Updated Label".to_string()), description: Some("Updated description".to_string()), - entrypoint: None, - runtime: None, - runtime_version_constraint: None, - param_schema: None, - out_schema: None, + ..Default::default() }; let updated = ActionRepository::update(&pool, action.id, update) diff --git a/crates/common/tests/repository_artifact_tests.rs b/crates/common/tests/repository_artifact_tests.rs index ccb4816..45aaeb9 100644 --- a/crates/common/tests/repository_artifact_tests.rs +++ b/crates/common/tests/repository_artifact_tests.rs @@ -263,6 +263,7 @@ async fn test_update_artifact_all_fields() { content_type: Some("image/png".to_string()), size_bytes: Some(12345), data: Some(serde_json::json!({"key": "value"})), + execution: None, }; let updated = ArtifactRepository::update(&pool, created.id, update_input.clone()) diff --git a/crates/executor/src/workflow/registrar.rs b/crates/executor/src/workflow/registrar.rs index 5d31abf..2efaa67 100644 --- a/crates/executor/src/workflow/registrar.rs +++ b/crates/executor/src/workflow/registrar.rs @@ -259,6 +259,9 @@ impl WorkflowRegistrar { runtime_version_constraint: None, param_schema: workflow.parameters.clone(), out_schema: workflow.output.clone(), + parameter_delivery: None, + parameter_format: None, + output_format: None, }; ActionRepository::update(&self.pool, action.id, update_input).await?; diff --git a/web/src/components/layout/MainLayout.tsx b/web/src/components/layout/MainLayout.tsx index b1a4261..01c4ba9 100644 --- a/web/src/components/layout/MainLayout.tsx +++ b/web/src/components/layout/MainLayout.tsx @@ -16,8 +16,6 @@ import { SquareAsterisk, KeyRound, Home, - Paperclip, - FolderOpenDot, FolderArchive, } from "lucide-react";