artifact management

This commit is contained in:
2026-03-03 14:16:23 -06:00
parent 8299e5efcb
commit b54aa3ec26
15 changed files with 980 additions and 411 deletions

View File

@@ -9,6 +9,11 @@
//! 2. Triggers (no dependencies)
//! 3. Actions (depend on runtime)
//! 4. Sensors (depend on triggers and runtime)
//!
//! All loaders use **upsert** semantics: if an entity with the same ref already
//! exists it is updated in place (preserving its database ID); otherwise a new
//! row is created. After loading, entities that belong to the pack but whose
//! refs are no longer present in the YAML files are deleted.
use std::collections::HashMap;
use std::path::Path;
@@ -18,34 +23,47 @@ use tracing::{info, warn};
use crate::error::{Error, Result};
use crate::models::Id;
use crate::repositories::action::ActionRepository;
use crate::repositories::runtime::{CreateRuntimeInput, RuntimeRepository};
use crate::repositories::runtime_version::{CreateRuntimeVersionInput, RuntimeVersionRepository};
use crate::repositories::trigger::{
CreateSensorInput, CreateTriggerInput, SensorRepository, TriggerRepository,
use crate::repositories::action::{ActionRepository, UpdateActionInput};
use crate::repositories::runtime::{CreateRuntimeInput, RuntimeRepository, UpdateRuntimeInput};
use crate::repositories::runtime_version::{
CreateRuntimeVersionInput, RuntimeVersionRepository, UpdateRuntimeVersionInput,
};
use crate::repositories::{Create, FindById, FindByRef, Update};
use crate::repositories::trigger::{
CreateSensorInput, CreateTriggerInput, SensorRepository, TriggerRepository, UpdateSensorInput,
UpdateTriggerInput,
};
use crate::repositories::{Create, Delete, FindById, FindByRef, Update};
use crate::version_matching::extract_version_components;
/// Result of loading pack components into the database.
#[derive(Debug, Default)]
pub struct PackLoadResult {
/// Number of runtimes loaded
/// Number of runtimes created
pub runtimes_loaded: usize,
/// Number of runtimes skipped (already exist)
/// Number of runtimes updated (already existed)
pub runtimes_updated: usize,
/// Number of runtimes skipped due to errors
pub runtimes_skipped: usize,
/// Number of triggers loaded
/// Number of triggers created
pub triggers_loaded: usize,
/// Number of triggers skipped (already exist)
/// Number of triggers updated
pub triggers_updated: usize,
/// Number of triggers skipped
pub triggers_skipped: usize,
/// Number of actions loaded
/// Number of actions created
pub actions_loaded: usize,
/// Number of actions skipped (already exist)
/// Number of actions updated
pub actions_updated: usize,
/// Number of actions skipped
pub actions_skipped: usize,
/// Number of sensors loaded
/// Number of sensors created
pub sensors_loaded: usize,
/// Number of sensors skipped (already exist)
/// Number of sensors updated
pub sensors_updated: usize,
/// Number of sensors skipped
pub sensors_skipped: usize,
/// Number of stale entities removed
pub removed: usize,
/// Warnings encountered during loading
pub warnings: Vec<String>,
}
@@ -58,6 +76,10 @@ impl PackLoadResult {
pub fn total_skipped(&self) -> usize {
self.runtimes_skipped + self.triggers_skipped + self.actions_skipped + self.sensors_skipped
}
pub fn total_updated(&self) -> usize {
self.runtimes_updated + self.triggers_updated + self.actions_updated + self.sensors_updated
}
}
/// Loads pack components (triggers, actions, sensors) from YAML files on disk
@@ -79,9 +101,10 @@ impl<'a> PackComponentLoader<'a> {
/// Load all components from the pack directory.
///
/// Reads triggers, actions, and sensors from their respective subdirectories
/// and registers them in the database. Components that already exist (by ref)
/// are skipped.
/// Uses upsert semantics: entities that already exist (by ref) are updated
/// in place, preserving their database IDs. New entities are created.
/// After loading, entities that belong to the pack but are no longer
/// present in the YAML files are removed.
pub async fn load_all(&self, pack_dir: &Path) -> Result<PackLoadResult> {
let mut result = PackLoadResult::default();
@@ -92,43 +115,60 @@ impl<'a> PackComponentLoader<'a> {
);
// 1. Load runtimes first (no dependencies)
self.load_runtimes(pack_dir, &mut result).await?;
let runtime_refs = self.load_runtimes(pack_dir, &mut result).await?;
// 2. Load triggers (no dependencies)
let trigger_ids = self.load_triggers(pack_dir, &mut result).await?;
let (trigger_ids, trigger_refs) = self.load_triggers(pack_dir, &mut result).await?;
// 3. Load actions (depend on runtime)
self.load_actions(pack_dir, &mut result).await?;
let action_refs = self.load_actions(pack_dir, &mut result).await?;
// 4. Load sensors (depend on triggers and runtime)
self.load_sensors(pack_dir, &trigger_ids, &mut result)
let sensor_refs = self
.load_sensors(pack_dir, &trigger_ids, &mut result)
.await?;
// 5. Clean up entities that are no longer in the pack's YAML files
self.cleanup_removed_entities(
&runtime_refs,
&trigger_refs,
&action_refs,
&sensor_refs,
&mut result,
)
.await;
info!(
"Pack '{}' component loading complete: {} loaded, {} skipped, {} warnings",
"Pack '{}' component loading complete: {} created, {} updated, {} skipped, {} removed, {} warnings",
self.pack_ref,
result.total_loaded(),
result.total_updated(),
result.total_skipped(),
result.removed,
result.warnings.len()
);
Ok(result)
}
/// Load trigger definitions from `pack_dir/triggers/*.yaml`.
///
/// Returns a map of trigger ref -> trigger ID for use by sensor loading.
/// Load runtime definitions from `pack_dir/runtimes/*.yaml`.
///
/// Runtimes define how actions and sensors are executed (interpreter,
/// environment setup, dependency management). They are loaded first
/// since actions reference them.
async fn load_runtimes(&self, pack_dir: &Path, result: &mut PackLoadResult) -> Result<()> {
///
/// Returns the set of runtime refs that were loaded (for cleanup).
async fn load_runtimes(
&self,
pack_dir: &Path,
result: &mut PackLoadResult,
) -> Result<Vec<String>> {
let runtimes_dir = pack_dir.join("runtimes");
let mut loaded_refs = Vec::new();
if !runtimes_dir.exists() {
info!("No runtimes directory found for pack '{}'", self.pack_ref);
return Ok(());
return Ok(loaded_refs);
}
let yaml_files = read_yaml_files(&runtimes_dir)?;
@@ -153,16 +193,6 @@ impl<'a> PackComponentLoader<'a> {
}
};
// Check if runtime already exists
if let Some(existing) = RuntimeRepository::find_by_ref(self.pool, &runtime_ref).await? {
info!(
"Runtime '{}' already exists (ID: {}), skipping",
runtime_ref, existing.id
);
result.runtimes_skipped += 1;
continue;
}
let name = data
.get("name")
.and_then(|v| v.as_str())
@@ -188,6 +218,35 @@ impl<'a> PackComponentLoader<'a> {
.and_then(|v| serde_json::to_value(v).ok())
.unwrap_or_else(|| serde_json::json!({}));
// Check if runtime already exists — update in place if so
if let Some(existing) = RuntimeRepository::find_by_ref(self.pool, &runtime_ref).await? {
let update_input = UpdateRuntimeInput {
description,
name: Some(name),
distributions: Some(distributions),
installation,
execution_config: Some(execution_config),
};
match RuntimeRepository::update(self.pool, existing.id, update_input).await {
Ok(_) => {
info!("Updated runtime '{}' (ID: {})", runtime_ref, existing.id);
result.runtimes_updated += 1;
// Also upsert version entries
self.load_runtime_versions(&data, existing.id, &runtime_ref, result)
.await;
}
Err(e) => {
let msg = format!("Failed to update runtime '{}': {}", runtime_ref, e);
warn!("{}", msg);
result.warnings.push(msg);
}
}
loaded_refs.push(runtime_ref);
continue;
}
let input = CreateRuntimeInput {
r#ref: runtime_ref.clone(),
pack: Some(self.pack_id),
@@ -203,6 +262,7 @@ impl<'a> PackComponentLoader<'a> {
Ok(rt) => {
info!("Created runtime '{}' (ID: {})", runtime_ref, rt.id);
result.runtimes_loaded += 1;
loaded_refs.push(runtime_ref.clone());
// Load version entries from the optional `versions` array
self.load_runtime_versions(&data, rt.id, &runtime_ref, result)
@@ -214,10 +274,11 @@ impl<'a> PackComponentLoader<'a> {
if let sqlx::Error::Database(ref inner) = db_err {
if inner.is_unique_violation() {
info!(
"Runtime '{}' already exists (concurrent creation), skipping",
"Runtime '{}' already exists (concurrent creation), treating as update",
runtime_ref
);
result.runtimes_skipped += 1;
loaded_refs.push(runtime_ref);
result.runtimes_updated += 1;
continue;
}
}
@@ -229,29 +290,13 @@ impl<'a> PackComponentLoader<'a> {
}
}
Ok(())
Ok(loaded_refs)
}
/// Load version entries from the `versions` array in a runtime YAML.
/// Load runtime version entries from a runtime's YAML `versions` array.
///
/// Each entry in the array describes a specific version of the runtime
/// with its own `execution_config` and `distributions`. Example:
///
/// ```yaml
/// versions:
/// - version: "3.12"
/// is_default: true
/// execution_config:
/// interpreter:
/// binary: python3.12
/// ...
/// distributions:
/// verification:
/// commands:
/// - binary: python3.12
/// args: ["--version"]
/// ...
/// ```
/// Uses upsert: existing versions (by runtime + version string) are updated,
/// new versions are created.
async fn load_runtime_versions(
&self,
data: &serde_yaml_ng::Value,
@@ -270,6 +315,9 @@ impl<'a> PackComponentLoader<'a> {
runtime_ref
);
// Collect version strings we loaded so we can clean up removed versions
let mut loaded_versions = Vec::new();
for entry in versions {
let version_str = match entry.get("version").and_then(|v| v.as_str()) {
Some(v) => v.to_string(),
@@ -284,21 +332,6 @@ impl<'a> PackComponentLoader<'a> {
}
};
// Check if this version already exists
if let Ok(Some(_existing)) = RuntimeVersionRepository::find_by_runtime_and_version(
self.pool,
runtime_id,
&version_str,
)
.await
{
info!(
"Version '{}' for runtime '{}' already exists, skipping",
version_str, runtime_ref
);
continue;
}
let (version_major, version_minor, version_patch) =
extract_version_components(&version_str);
@@ -322,6 +355,47 @@ impl<'a> PackComponentLoader<'a> {
.and_then(|v| serde_json::to_value(v).ok())
.unwrap_or_else(|| serde_json::json!({}));
// Check if this version already exists — update in place if so
if let Ok(Some(existing)) = RuntimeVersionRepository::find_by_runtime_and_version(
self.pool,
runtime_id,
&version_str,
)
.await
{
let update_input = UpdateRuntimeVersionInput {
version: None, // version string doesn't change
version_major: Some(version_major),
version_minor: Some(version_minor),
version_patch: Some(version_patch),
execution_config: Some(execution_config),
distributions: Some(distributions),
is_default: Some(is_default),
available: None, // preserve current availability — verification sets this
verified_at: None,
meta: Some(meta),
};
match RuntimeVersionRepository::update(self.pool, existing.id, update_input).await {
Ok(_) => {
info!(
"Updated version '{}' for runtime '{}' (ID: {})",
version_str, runtime_ref, existing.id
);
}
Err(e) => {
let msg = format!(
"Failed to update version '{}' for runtime '{}': {}",
version_str, runtime_ref, e
);
warn!("{}", msg);
result.warnings.push(msg);
}
}
loaded_versions.push(version_str);
continue;
}
let input = CreateRuntimeVersionInput {
runtime: runtime_id,
runtime_ref: runtime_ref.to_string(),
@@ -342,6 +416,7 @@ impl<'a> PackComponentLoader<'a> {
"Created version '{}' for runtime '{}' (ID: {})",
version_str, runtime_ref, rv.id
);
loaded_versions.push(version_str);
}
Err(e) => {
// Check for unique constraint violation (race condition)
@@ -352,6 +427,7 @@ impl<'a> PackComponentLoader<'a> {
"Version '{}' for runtime '{}' already exists (concurrent), skipping",
version_str, runtime_ref
);
loaded_versions.push(version_str);
continue;
}
}
@@ -365,19 +441,44 @@ impl<'a> PackComponentLoader<'a> {
}
}
}
// Clean up versions that are no longer in the YAML
if let Ok(existing_versions) =
RuntimeVersionRepository::find_by_runtime(self.pool, runtime_id).await
{
for existing in existing_versions {
if !loaded_versions.contains(&existing.version) {
info!(
"Removing stale version '{}' for runtime '{}'",
existing.version, runtime_ref
);
if let Err(e) = RuntimeVersionRepository::delete(self.pool, existing.id).await {
warn!(
"Failed to delete stale version '{}' for runtime '{}': {}",
existing.version, runtime_ref, e
);
}
}
}
}
}
/// Load trigger definitions from `pack_dir/triggers/*.yaml`.
///
/// Returns a map of trigger ref -> trigger ID for use by sensor loading,
/// and the list of loaded trigger refs for cleanup.
async fn load_triggers(
&self,
pack_dir: &Path,
result: &mut PackLoadResult,
) -> Result<HashMap<String, Id>> {
) -> Result<(HashMap<String, Id>, Vec<String>)> {
let triggers_dir = pack_dir.join("triggers");
let mut trigger_ids = HashMap::new();
let mut loaded_refs = Vec::new();
if !triggers_dir.exists() {
info!("No triggers directory found for pack '{}'", self.pack_ref);
return Ok(trigger_ids);
return Ok((trigger_ids, loaded_refs));
}
let yaml_files = read_yaml_files(&triggers_dir)?;
@@ -402,17 +503,6 @@ impl<'a> PackComponentLoader<'a> {
}
};
// Check if trigger already exists
if let Some(existing) = TriggerRepository::find_by_ref(self.pool, &trigger_ref).await? {
info!(
"Trigger '{}' already exists (ID: {}), skipping",
trigger_ref, existing.id
);
trigger_ids.insert(trigger_ref, existing.id);
result.triggers_skipped += 1;
continue;
}
let name = extract_name_from_ref(&trigger_ref);
let label = data
.get("label")
@@ -439,6 +529,32 @@ impl<'a> PackComponentLoader<'a> {
.get("output")
.and_then(|v| serde_json::to_value(v).ok());
// Check if trigger already exists — update in place if so
if let Some(existing) = TriggerRepository::find_by_ref(self.pool, &trigger_ref).await? {
let update_input = UpdateTriggerInput {
label: Some(label),
description: Some(description),
enabled: Some(enabled),
param_schema,
out_schema,
};
match TriggerRepository::update(self.pool, existing.id, update_input).await {
Ok(_) => {
info!("Updated trigger '{}' (ID: {})", trigger_ref, existing.id);
result.triggers_updated += 1;
}
Err(e) => {
let msg = format!("Failed to update trigger '{}': {}", trigger_ref, e);
warn!("{}", msg);
result.warnings.push(msg);
}
}
trigger_ids.insert(trigger_ref.clone(), existing.id);
loaded_refs.push(trigger_ref);
continue;
}
let input = CreateTriggerInput {
r#ref: trigger_ref.clone(),
pack: Some(self.pack_id),
@@ -454,7 +570,8 @@ impl<'a> PackComponentLoader<'a> {
match TriggerRepository::create(self.pool, input).await {
Ok(trigger) => {
info!("Created trigger '{}' (ID: {})", trigger_ref, trigger.id);
trigger_ids.insert(trigger_ref, trigger.id);
trigger_ids.insert(trigger_ref.clone(), trigger.id);
loaded_refs.push(trigger_ref);
result.triggers_loaded += 1;
}
Err(e) => {
@@ -465,16 +582,23 @@ impl<'a> PackComponentLoader<'a> {
}
}
Ok(trigger_ids)
Ok((trigger_ids, loaded_refs))
}
/// Load action definitions from `pack_dir/actions/*.yaml`.
async fn load_actions(&self, pack_dir: &Path, result: &mut PackLoadResult) -> Result<()> {
///
/// Returns the list of loaded action refs for cleanup.
async fn load_actions(
&self,
pack_dir: &Path,
result: &mut PackLoadResult,
) -> Result<Vec<String>> {
let actions_dir = pack_dir.join("actions");
let mut loaded_refs = Vec::new();
if !actions_dir.exists() {
info!("No actions directory found for pack '{}'", self.pack_ref);
return Ok(());
return Ok(loaded_refs);
}
let yaml_files = read_yaml_files(&actions_dir)?;
@@ -499,16 +623,6 @@ impl<'a> PackComponentLoader<'a> {
}
};
// Check if action already exists
if let Some(existing) = ActionRepository::find_by_ref(self.pool, &action_ref).await? {
info!(
"Action '{}' already exists (ID: {}), skipping",
action_ref, existing.id
);
result.actions_skipped += 1;
continue;
}
let name = extract_name_from_ref(&action_ref);
let label = data
.get("label")
@@ -544,9 +658,6 @@ impl<'a> PackComponentLoader<'a> {
.get("output")
.and_then(|v| serde_json::to_value(v).ok());
// Read optional fields for parameter delivery/format and output format.
// The database has defaults (stdin, json, text), so we only set these
// in the INSERT if the YAML specifies them.
let parameter_delivery = data
.get("parameter_delivery")
.and_then(|v| v.as_str())
@@ -571,6 +682,36 @@ impl<'a> PackComponentLoader<'a> {
.and_then(|v| v.as_str())
.map(|s| s.to_string());
// Check if action already exists — update in place if so
if let Some(existing) = ActionRepository::find_by_ref(self.pool, &action_ref).await? {
let update_input = UpdateActionInput {
label: Some(label),
description: Some(description),
entrypoint: Some(entrypoint),
runtime: runtime_id,
runtime_version_constraint: Some(runtime_version_constraint),
param_schema,
out_schema,
parameter_delivery: Some(parameter_delivery),
parameter_format: Some(parameter_format),
output_format: Some(output_format),
};
match ActionRepository::update(self.pool, existing.id, update_input).await {
Ok(_) => {
info!("Updated action '{}' (ID: {})", action_ref, existing.id);
result.actions_updated += 1;
}
Err(e) => {
let msg = format!("Failed to update action '{}': {}", action_ref, e);
warn!("{}", msg);
result.warnings.push(msg);
}
}
loaded_refs.push(action_ref);
continue;
}
// Use raw SQL to include parameter_delivery, parameter_format,
// output_format which are not in CreateActionInput
let create_result = sqlx::query_scalar::<_, i64>(
@@ -604,6 +745,7 @@ impl<'a> PackComponentLoader<'a> {
match create_result {
Ok(id) => {
info!("Created action '{}' (ID: {})", action_ref, id);
loaded_refs.push(action_ref);
result.actions_loaded += 1;
}
Err(e) => {
@@ -611,10 +753,11 @@ impl<'a> PackComponentLoader<'a> {
if let sqlx::Error::Database(ref db_err) = e {
if db_err.is_unique_violation() {
info!(
"Action '{}' already exists (concurrent creation), skipping",
"Action '{}' already exists (concurrent creation), treating as update",
action_ref
);
result.actions_skipped += 1;
loaded_refs.push(action_ref);
result.actions_updated += 1;
continue;
}
}
@@ -625,21 +768,24 @@ impl<'a> PackComponentLoader<'a> {
}
}
Ok(())
Ok(loaded_refs)
}
/// Load sensor definitions from `pack_dir/sensors/*.yaml`.
///
/// Returns the list of loaded sensor refs for cleanup.
async fn load_sensors(
&self,
pack_dir: &Path,
trigger_ids: &HashMap<String, Id>,
result: &mut PackLoadResult,
) -> Result<()> {
) -> Result<Vec<String>> {
let sensors_dir = pack_dir.join("sensors");
let mut loaded_refs = Vec::new();
if !sensors_dir.exists() {
info!("No sensors directory found for pack '{}'", self.pack_ref);
return Ok(());
return Ok(loaded_refs);
}
let yaml_files = read_yaml_files(&sensors_dir)?;
@@ -758,8 +904,6 @@ impl<'a> PackComponentLoader<'a> {
// Upsert: update existing sensors so re-registration corrects
// stale metadata (especially runtime assignments).
if let Some(existing) = SensorRepository::find_by_ref(self.pool, &sensor_ref).await? {
use crate::repositories::trigger::UpdateSensorInput;
let update_input = UpdateSensorInput {
label: Some(label),
description: Some(description),
@@ -780,7 +924,7 @@ impl<'a> PackComponentLoader<'a> {
"Updated sensor '{}' (ID: {}, runtime: {} → {})",
sensor_ref, existing.id, existing.runtime_ref, sensor_runtime_ref
);
result.sensors_loaded += 1;
result.sensors_updated += 1;
}
Err(e) => {
let msg = format!("Failed to update sensor '{}': {}", sensor_ref, e);
@@ -788,6 +932,7 @@ impl<'a> PackComponentLoader<'a> {
result.warnings.push(msg);
}
}
loaded_refs.push(sensor_ref);
continue;
}
@@ -811,6 +956,7 @@ impl<'a> PackComponentLoader<'a> {
match SensorRepository::create(self.pool, input).await {
Ok(sensor) => {
info!("Created sensor '{}' (ID: {})", sensor_ref, sensor.id);
loaded_refs.push(sensor_ref);
result.sensors_loaded += 1;
}
Err(e) => {
@@ -821,7 +967,7 @@ impl<'a> PackComponentLoader<'a> {
}
}
Ok(())
Ok(loaded_refs)
}
/// Resolve a runtime ID from a runner type string (e.g., "shell", "python", "native").
@@ -917,11 +1063,116 @@ impl<'a> PackComponentLoader<'a> {
}
}
}
/// Remove entities that belong to this pack but whose refs are no longer
/// present in the pack's YAML files.
///
/// This handles the case where an action/trigger/sensor/runtime was removed
/// from the pack between versions. Ad-hoc (user-created) entities are never
/// removed.
async fn cleanup_removed_entities(
&self,
runtime_refs: &[String],
trigger_refs: &[String],
action_refs: &[String],
sensor_refs: &[String],
result: &mut PackLoadResult,
) {
// Clean up sensors first (they depend on triggers/runtimes)
match SensorRepository::delete_by_pack_excluding(self.pool, self.pack_id, sensor_refs).await
{
Ok(count) => {
if count > 0 {
info!(
"Removed {} stale sensor(s) from pack '{}'",
count, self.pack_ref
);
result.removed += count as usize;
}
}
Err(e) => {
warn!(
"Failed to clean up stale sensors for pack '{}': {}",
self.pack_ref, e
);
}
}
// Clean up actions (ad-hoc preserved)
match ActionRepository::delete_non_adhoc_by_pack_excluding(
self.pool,
self.pack_id,
action_refs,
)
.await
{
Ok(count) => {
if count > 0 {
info!(
"Removed {} stale action(s) from pack '{}'",
count, self.pack_ref
);
result.removed += count as usize;
}
}
Err(e) => {
warn!(
"Failed to clean up stale actions for pack '{}': {}",
self.pack_ref, e
);
}
}
// Clean up triggers (ad-hoc preserved)
match TriggerRepository::delete_non_adhoc_by_pack_excluding(
self.pool,
self.pack_id,
trigger_refs,
)
.await
{
Ok(count) => {
if count > 0 {
info!(
"Removed {} stale trigger(s) from pack '{}'",
count, self.pack_ref
);
result.removed += count as usize;
}
}
Err(e) => {
warn!(
"Failed to clean up stale triggers for pack '{}': {}",
self.pack_ref, e
);
}
}
// Clean up runtimes last (actions/sensors may reference them)
match RuntimeRepository::delete_by_pack_excluding(self.pool, self.pack_id, runtime_refs)
.await
{
Ok(count) => {
if count > 0 {
info!(
"Removed {} stale runtime(s) from pack '{}'",
count, self.pack_ref
);
result.removed += count as usize;
}
}
Err(e) => {
warn!(
"Failed to clean up stale runtimes for pack '{}': {}",
self.pack_ref, e
);
}
}
}
}
/// Read all `.yaml` and `.yml` files from a directory, sorted by filename.
///
/// Returns a Vec of (filename, content) pairs.
/// Read all YAML files from a directory, returning `(filename, content)` pairs
/// sorted by filename for deterministic ordering.
fn read_yaml_files(dir: &Path) -> Result<Vec<(String, String)>> {
let mut files = Vec::new();

View File

@@ -8,6 +8,11 @@ use sqlx::{Executor, Postgres, QueryBuilder};
use super::{Create, Delete, FindById, FindByRef, List, Repository, Update};
/// Columns selected in all Action queries. Must match the `Action` model's `FromRow` fields.
pub const ACTION_COLUMNS: &str = "id, ref, pack, pack_ref, label, description, entrypoint, \
runtime, runtime_version_constraint, param_schema, out_schema, workflow_def, is_adhoc, \
parameter_delivery, parameter_format, output_format, created, updated";
/// Filters for [`ActionRepository::list_search`].
///
/// All fields are optional and combinable (AND). Pagination is always applied.
@@ -65,6 +70,9 @@ pub struct UpdateActionInput {
pub runtime_version_constraint: Option<Option<String>>,
pub param_schema: Option<JsonSchema>,
pub out_schema: Option<JsonSchema>,
pub parameter_delivery: Option<String>,
pub parameter_format: Option<String>,
pub output_format: Option<String>,
}
#[async_trait::async_trait]
@@ -73,15 +81,10 @@ impl FindById for ActionRepository {
where
E: Executor<'e, Database = Postgres> + 'e,
{
let action = sqlx::query_as::<_, Action>(
r#"
SELECT id, ref, pack, pack_ref, label, description, entrypoint,
runtime, runtime_version_constraint,
param_schema, out_schema, workflow_def, is_adhoc, created, updated
FROM action
WHERE id = $1
"#,
)
let action = sqlx::query_as::<_, Action>(&format!(
"SELECT {} FROM action WHERE id = $1",
ACTION_COLUMNS
))
.bind(id)
.fetch_optional(executor)
.await?;
@@ -96,15 +99,10 @@ impl FindByRef for ActionRepository {
where
E: Executor<'e, Database = Postgres> + 'e,
{
let action = sqlx::query_as::<_, Action>(
r#"
SELECT id, ref, pack, pack_ref, label, description, entrypoint,
runtime, runtime_version_constraint,
param_schema, out_schema, workflow_def, is_adhoc, created, updated
FROM action
WHERE ref = $1
"#,
)
let action = sqlx::query_as::<_, Action>(&format!(
"SELECT {} FROM action WHERE ref = $1",
ACTION_COLUMNS
))
.bind(ref_str)
.fetch_optional(executor)
.await?;
@@ -119,15 +117,10 @@ impl List for ActionRepository {
where
E: Executor<'e, Database = Postgres> + 'e,
{
let actions = sqlx::query_as::<_, Action>(
r#"
SELECT id, ref, pack, pack_ref, label, description, entrypoint,
runtime, runtime_version_constraint,
param_schema, out_schema, workflow_def, is_adhoc, created, updated
FROM action
ORDER BY ref ASC
"#,
)
let actions = sqlx::query_as::<_, Action>(&format!(
"SELECT {} FROM action ORDER BY ref ASC",
ACTION_COLUMNS
))
.fetch_all(executor)
.await?;
@@ -155,16 +148,15 @@ impl Create for ActionRepository {
}
// Try to insert - database will enforce uniqueness constraint
let action = sqlx::query_as::<_, Action>(
let action = sqlx::query_as::<_, Action>(&format!(
r#"
INSERT INTO action (ref, pack, pack_ref, label, description, entrypoint,
runtime, runtime_version_constraint, param_schema, out_schema, is_adhoc)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
RETURNING id, ref, pack, pack_ref, label, description, entrypoint,
runtime, runtime_version_constraint,
param_schema, out_schema, workflow_def, is_adhoc, created, updated
RETURNING {}
"#,
)
ACTION_COLUMNS
))
.bind(&input.r#ref)
.bind(input.pack)
.bind(&input.pack_ref)
@@ -267,6 +259,33 @@ impl Update for ActionRepository {
has_updates = true;
}
if let Some(parameter_delivery) = &input.parameter_delivery {
if has_updates {
query.push(", ");
}
query.push("parameter_delivery = ");
query.push_bind(parameter_delivery);
has_updates = true;
}
if let Some(parameter_format) = &input.parameter_format {
if has_updates {
query.push(", ");
}
query.push("parameter_format = ");
query.push_bind(parameter_format);
has_updates = true;
}
if let Some(output_format) = &input.output_format {
if has_updates {
query.push(", ");
}
query.push("output_format = ");
query.push_bind(output_format);
has_updates = true;
}
if !has_updates {
// No updates requested, fetch and return existing action
return Self::find_by_id(executor, id)
@@ -276,7 +295,7 @@ impl Update for ActionRepository {
query.push(", updated = NOW() WHERE id = ");
query.push_bind(id);
query.push(" RETURNING id, ref, pack, pack_ref, label, description, entrypoint, runtime, runtime_version_constraint, param_schema, out_schema, workflow_def, is_adhoc, created, updated");
query.push(&format!(" RETURNING {}", ACTION_COLUMNS));
let action = query
.build_query_as::<Action>()
@@ -317,10 +336,8 @@ impl ActionRepository {
where
E: Executor<'e, Database = Postgres> + Copy + 'e,
{
let select_cols = "id, ref, pack, pack_ref, label, description, entrypoint, runtime, runtime_version_constraint, param_schema, out_schema, workflow_def, is_adhoc, created, updated";
let mut qb: QueryBuilder<'_, Postgres> =
QueryBuilder::new(format!("SELECT {select_cols} FROM action"));
QueryBuilder::new(format!("SELECT {} FROM action", ACTION_COLUMNS));
let mut count_qb: QueryBuilder<'_, Postgres> =
QueryBuilder::new("SELECT COUNT(*) FROM action");
@@ -398,16 +415,10 @@ impl ActionRepository {
where
E: Executor<'e, Database = Postgres> + 'e,
{
let actions = sqlx::query_as::<_, Action>(
r#"
SELECT id, ref, pack, pack_ref, label, description, entrypoint,
runtime, runtime_version_constraint,
param_schema, out_schema, workflow_def, is_adhoc, created, updated
FROM action
WHERE pack = $1
ORDER BY ref ASC
"#,
)
let actions = sqlx::query_as::<_, Action>(&format!(
"SELECT {} FROM action WHERE pack = $1 ORDER BY ref ASC",
ACTION_COLUMNS
))
.bind(pack_id)
.fetch_all(executor)
.await?;
@@ -420,16 +431,10 @@ impl ActionRepository {
where
E: Executor<'e, Database = Postgres> + 'e,
{
let actions = sqlx::query_as::<_, Action>(
r#"
SELECT id, ref, pack, pack_ref, label, description, entrypoint,
runtime, runtime_version_constraint,
param_schema, out_schema, workflow_def, is_adhoc, created, updated
FROM action
WHERE runtime = $1
ORDER BY ref ASC
"#,
)
let actions = sqlx::query_as::<_, Action>(&format!(
"SELECT {} FROM action WHERE runtime = $1 ORDER BY ref ASC",
ACTION_COLUMNS
))
.bind(runtime_id)
.fetch_all(executor)
.await?;
@@ -443,16 +448,10 @@ impl ActionRepository {
E: Executor<'e, Database = Postgres> + 'e,
{
let search_pattern = format!("%{}%", query.to_lowercase());
let actions = sqlx::query_as::<_, Action>(
r#"
SELECT id, ref, pack, pack_ref, label, description, entrypoint,
runtime, runtime_version_constraint,
param_schema, out_schema, workflow_def, is_adhoc, created, updated
FROM action
WHERE LOWER(ref) LIKE $1 OR LOWER(label) LIKE $1 OR LOWER(description) LIKE $1
ORDER BY ref ASC
"#,
)
let actions = sqlx::query_as::<_, Action>(&format!(
"SELECT {} FROM action WHERE LOWER(ref) LIKE $1 OR LOWER(label) LIKE $1 OR LOWER(description) LIKE $1 ORDER BY ref ASC",
ACTION_COLUMNS
))
.bind(&search_pattern)
.fetch_all(executor)
.await?;
@@ -465,16 +464,10 @@ impl ActionRepository {
where
E: Executor<'e, Database = Postgres> + 'e,
{
let actions = sqlx::query_as::<_, Action>(
r#"
SELECT id, ref, pack, pack_ref, label, description, entrypoint,
runtime, runtime_version_constraint,
param_schema, out_schema, workflow_def, is_adhoc, created, updated
FROM action
WHERE workflow_def IS NOT NULL
ORDER BY ref ASC
"#,
)
let actions = sqlx::query_as::<_, Action>(&format!(
"SELECT {} FROM action WHERE workflow_def IS NOT NULL ORDER BY ref ASC",
ACTION_COLUMNS
))
.fetch_all(executor)
.await?;
@@ -489,15 +482,10 @@ impl ActionRepository {
where
E: Executor<'e, Database = Postgres> + 'e,
{
let action = sqlx::query_as::<_, Action>(
r#"
SELECT id, ref, pack, pack_ref, label, description, entrypoint,
runtime, runtime_version_constraint,
param_schema, out_schema, workflow_def, is_adhoc, created, updated
FROM action
WHERE workflow_def = $1
"#,
)
let action = sqlx::query_as::<_, Action>(&format!(
"SELECT {} FROM action WHERE workflow_def = $1",
ACTION_COLUMNS
))
.bind(workflow_def_id)
.fetch_optional(executor)
.await?;
@@ -505,6 +493,36 @@ impl ActionRepository {
Ok(action)
}
/// Delete non-adhoc actions belonging to a pack whose refs are NOT in the given set.
///
/// Used during pack reinstallation to clean up actions that were removed
/// from the pack's YAML files. Ad-hoc (user-created) actions are preserved.
pub async fn delete_non_adhoc_by_pack_excluding<'e, E>(
executor: E,
pack_id: Id,
keep_refs: &[String],
) -> Result<u64>
where
E: Executor<'e, Database = Postgres> + 'e,
{
let result = if keep_refs.is_empty() {
sqlx::query("DELETE FROM action WHERE pack = $1 AND is_adhoc = false")
.bind(pack_id)
.execute(executor)
.await?
} else {
sqlx::query(
"DELETE FROM action WHERE pack = $1 AND is_adhoc = false AND ref != ALL($2)",
)
.bind(pack_id)
.bind(keep_refs)
.execute(executor)
.await?
};
Ok(result.rows_affected())
}
/// Link an action to a workflow definition
pub async fn link_workflow_def<'e, E>(
executor: E,
@@ -514,16 +532,15 @@ impl ActionRepository {
where
E: Executor<'e, Database = Postgres> + 'e,
{
let action = sqlx::query_as::<_, Action>(
let action = sqlx::query_as::<_, Action>(&format!(
r#"
UPDATE action
SET workflow_def = $2, updated = NOW()
WHERE id = $1
RETURNING id, ref, pack, pack_ref, label, description, entrypoint,
runtime, runtime_version_constraint,
param_schema, out_schema, workflow_def, is_adhoc, created, updated
RETURNING {}
"#,
)
ACTION_COLUMNS
))
.bind(action_id)
.bind(workflow_def_id)
.fetch_one(executor)

View File

@@ -52,6 +52,7 @@ pub struct UpdateArtifactInput {
pub description: Option<String>,
pub content_type: Option<String>,
pub size_bytes: Option<i64>,
pub execution: Option<Option<i64>>,
pub data: Option<serde_json::Value>,
}
@@ -189,6 +190,15 @@ impl Update for ArtifactRepository {
push_field!(&input.description, "description");
push_field!(&input.content_type, "content_type");
push_field!(input.size_bytes, "size_bytes");
// execution is Option<Option<i64>> — outer Option = "was field provided?",
// inner Option = nullable column value
if let Some(exec_val) = input.execution {
if has_updates {
query.push(", ");
}
query.push("execution = ").push_bind(exec_val);
has_updates = true;
}
push_field!(&input.data, "data");
if !has_updates {

View File

@@ -284,6 +284,34 @@ impl RuntimeRepository {
Ok(runtime)
}
/// Delete runtimes belonging to a pack whose refs are NOT in the given set.
///
/// Used during pack reinstallation to clean up runtimes that were removed
/// from the pack's YAML files. Associated runtime_version rows are
/// cascade-deleted by the FK constraint.
pub async fn delete_by_pack_excluding<'e, E>(
executor: E,
pack_id: Id,
keep_refs: &[String],
) -> Result<u64>
where
E: Executor<'e, Database = Postgres> + 'e,
{
let result = if keep_refs.is_empty() {
sqlx::query("DELETE FROM runtime WHERE pack = $1")
.bind(pack_id)
.execute(executor)
.await?
} else {
sqlx::query("DELETE FROM runtime WHERE pack = $1 AND ref != ALL($2)")
.bind(pack_id)
.bind(keep_refs)
.execute(executor)
.await?
};
Ok(result.rows_affected())
}
}
// ============================================================================

View File

@@ -301,6 +301,36 @@ impl Delete for TriggerRepository {
}
impl TriggerRepository {
/// Delete non-adhoc triggers belonging to a pack whose refs are NOT in the given set.
///
/// Used during pack reinstallation to clean up triggers that were removed
/// from the pack's YAML files. Ad-hoc (user-created) triggers are preserved.
pub async fn delete_non_adhoc_by_pack_excluding<'e, E>(
executor: E,
pack_id: Id,
keep_refs: &[String],
) -> Result<u64>
where
E: Executor<'e, Database = Postgres> + 'e,
{
let result = if keep_refs.is_empty() {
sqlx::query("DELETE FROM trigger WHERE pack = $1 AND is_adhoc = false")
.bind(pack_id)
.execute(executor)
.await?
} else {
sqlx::query(
"DELETE FROM trigger WHERE pack = $1 AND is_adhoc = false AND ref != ALL($2)",
)
.bind(pack_id)
.bind(keep_refs)
.execute(executor)
.await?
};
Ok(result.rows_affected())
}
/// Search triggers with all filters pushed into SQL.
///
/// All filter fields are combinable (AND). Pagination is server-side.
@@ -907,6 +937,34 @@ impl Delete for SensorRepository {
}
impl SensorRepository {
/// Delete non-adhoc sensors belonging to a pack whose refs are NOT in the given set.
///
/// Used during pack reinstallation to clean up sensors that were removed
/// from the pack's YAML files.
pub async fn delete_by_pack_excluding<'e, E>(
executor: E,
pack_id: Id,
keep_refs: &[String],
) -> Result<u64>
where
E: Executor<'e, Database = Postgres> + 'e,
{
let result = if keep_refs.is_empty() {
sqlx::query("DELETE FROM sensor WHERE pack = $1")
.bind(pack_id)
.execute(executor)
.await?
} else {
sqlx::query("DELETE FROM sensor WHERE pack = $1 AND ref != ALL($2)")
.bind(pack_id)
.bind(keep_refs)
.execute(executor)
.await?
};
Ok(result.rows_affected())
}
/// Search sensors with all filters pushed into SQL.
///
/// All filter fields are combinable (AND). Pagination is server-side.

View File

@@ -257,6 +257,9 @@ impl WorkflowRegistrar {
runtime_version_constraint: None,
param_schema: workflow.parameters.clone(),
out_schema: workflow.output.clone(),
parameter_delivery: None,
parameter_format: None,
output_format: None,
};
ActionRepository::update(&self.pool, action.id, update_input).await?;

View File

@@ -196,11 +196,7 @@ async fn test_update_action() {
let update = UpdateActionInput {
label: Some("Updated Label".to_string()),
description: Some("Updated description".to_string()),
entrypoint: None,
runtime: None,
runtime_version_constraint: None,
param_schema: None,
out_schema: None,
..Default::default()
};
let updated = ActionRepository::update(&pool, action.id, update)

View File

@@ -263,6 +263,7 @@ async fn test_update_artifact_all_fields() {
content_type: Some("image/png".to_string()),
size_bytes: Some(12345),
data: Some(serde_json::json!({"key": "value"})),
execution: None,
};
let updated = ArtifactRepository::update(&pool, created.id, update_input.clone())