change capture

This commit is contained in:
2026-02-26 14:34:02 -06:00
parent 7ee3604eb1
commit b43495b26d
47 changed files with 5785 additions and 1525 deletions

View File

@@ -10,6 +10,7 @@ use sqlx::FromRow;
// Re-export common types
pub use action::*;
pub use entity_history::*;
pub use enums::*;
pub use event::*;
pub use execution::*;
@@ -1439,3 +1440,91 @@ pub mod pack_test {
pub last_test_passed: Option<bool>,
}
}
/// Entity history tracking models (TimescaleDB hypertables)
///
/// These models represent rows in the `<entity>_history` append-only hypertables
/// that track field-level changes to operational tables via PostgreSQL triggers.
pub mod entity_history {
    use super::*;

    /// A single history record capturing a field-level change to an entity.
    ///
    /// History records are append-only and populated by PostgreSQL triggers —
    /// they are never created or modified by application code.
    #[derive(Debug, Clone, Serialize, Deserialize, FromRow)]
    pub struct EntityHistoryRecord {
        /// When the change occurred (hypertable partitioning dimension)
        pub time: DateTime<Utc>,
        /// The operation that produced this record: `INSERT`, `UPDATE`, or `DELETE`
        pub operation: String,
        /// The primary key of the changed row in the source table
        pub entity_id: Id,
        /// Denormalized human-readable identifier (e.g., `action_ref`, `worker.name`, `rule_ref`, `trigger_ref`)
        pub entity_ref: Option<String>,
        /// Names of fields that changed in this operation (empty for INSERT/DELETE)
        pub changed_fields: Vec<String>,
        /// Previous values of the changed fields (NULL for INSERT)
        pub old_values: Option<JsonValue>,
        /// New values of the changed fields (NULL for DELETE)
        pub new_values: Option<JsonValue>,
    }

    /// Supported entity types that have history tracking.
    ///
    /// Each variant maps to a `<name>_history` hypertable in the database.
    #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
    #[serde(rename_all = "lowercase")]
    pub enum HistoryEntityType {
        Execution,
        Worker,
        Enforcement,
        Event,
    }

    impl HistoryEntityType {
        /// The canonical lowercase name of this entity type (used by `Display`).
        fn as_str(self) -> &'static str {
            match self {
                Self::Execution => "execution",
                Self::Worker => "worker",
                Self::Enforcement => "enforcement",
                Self::Event => "event",
            }
        }

        /// Returns the history table name for this entity type.
        pub fn table_name(&self) -> &'static str {
            match self {
                Self::Execution => "execution_history",
                Self::Worker => "worker_history",
                Self::Enforcement => "enforcement_history",
                Self::Event => "event_history",
            }
        }
    }

    impl std::fmt::Display for HistoryEntityType {
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            f.write_str(self.as_str())
        }
    }

    impl std::str::FromStr for HistoryEntityType {
        type Err = String;

        /// Case-insensitive parse of an entity type name.
        fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
            let normalized = s.to_lowercase();
            match normalized.as_str() {
                "execution" => Ok(Self::Execution),
                "worker" => Ok(Self::Worker),
                "enforcement" => Ok(Self::Enforcement),
                "event" => Ok(Self::Event),
                other => Err(format!(
                    "unknown history entity type '{}'; expected one of: execution, worker, enforcement, event",
                    other
                )),
            }
        }
    }
}

View File

@@ -191,9 +191,13 @@ impl RabbitMqConfig {
/// Queue configurations
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueuesConfig {
/// Events queue configuration
/// Events queue configuration (sensor catch-all, bound with `#`)
pub events: QueueConfig,
/// Executor events queue configuration (bound only to `event.created`)
#[serde(default = "default_executor_events_queue")]
pub executor_events: QueueConfig,
/// Executions queue configuration (legacy - to be deprecated)
pub executions: QueueConfig,
@@ -216,6 +220,15 @@ pub struct QueuesConfig {
pub notifications: QueueConfig,
}
/// Serde default for the executor-specific events queue: a durable,
/// non-exclusive, non-auto-delete queue with the well-known name.
fn default_executor_events_queue() -> QueueConfig {
    let name = String::from("attune.executor.events.queue");
    QueueConfig {
        name,
        durable: true,
        auto_delete: false,
        exclusive: false,
    }
}
impl Default for QueuesConfig {
fn default() -> Self {
Self {
@@ -225,6 +238,12 @@ impl Default for QueuesConfig {
exclusive: false,
auto_delete: false,
},
executor_events: QueueConfig {
name: "attune.executor.events.queue".to_string(),
durable: true,
exclusive: false,
auto_delete: false,
},
executions: QueueConfig {
name: "attune.executions.queue".to_string(),
durable: true,
@@ -567,6 +586,7 @@ mod tests {
fn test_default_queues() {
let queues = QueuesConfig::default();
assert_eq!(queues.events.name, "attune.events.queue");
assert_eq!(queues.executor_events.name, "attune.executor.events.queue");
assert_eq!(queues.executions.name, "attune.executions.queue");
assert_eq!(
queues.execution_completed.name,

View File

@@ -396,6 +396,11 @@ impl Connection {
None
};
// Declare executor-specific events queue (only receives event.created messages,
// unlike the sensor's catch-all events queue which is bound with `#`)
self.declare_queue_with_optional_dlx(&config.rabbitmq.queues.executor_events, dlx)
.await?;
// Declare executor queues
self.declare_queue_with_optional_dlx(&config.rabbitmq.queues.enforcements, dlx)
.await?;
@@ -444,6 +449,15 @@ impl Connection {
)
.await?;
// Bind executor events queue to only event.created routing key
// (the sensor's attune.events.queue uses `#` and gets all message types)
self.bind_queue(
&config.rabbitmq.queues.executor_events.name,
&config.rabbitmq.exchanges.events.name,
"event.created",
)
.await?;
info!("Executor infrastructure setup complete");
Ok(())
}

View File

@@ -190,8 +190,10 @@ pub mod exchanges {
/// Well-known queue names
pub mod queues {
/// Event processing queue
/// Event processing queue (sensor catch-all, bound with `#`)
pub const EVENTS: &str = "attune.events.queue";
/// Executor event processing queue (bound only to `event.created`)
pub const EXECUTOR_EVENTS: &str = "attune.executor.events.queue";
/// Execution request queue
pub const EXECUTIONS: &str = "attune.executions.queue";
/// Notification delivery queue

View File

@@ -0,0 +1,565 @@
//! Analytics repository for querying TimescaleDB continuous aggregates
//!
//! This module provides read-only query methods for the continuous aggregate
//! materialized views created in migration 000009_timescaledb_history. These views are
//! auto-refreshed by TimescaleDB policies and provide pre-computed hourly
//! rollups for dashboard widgets.
use chrono::{DateTime, Utc};
use serde::Serialize;
use sqlx::{Executor, FromRow, Postgres};
use crate::Result;
/// Repository for querying analytics continuous aggregates.
///
/// All methods are read-only. The underlying materialized views are
/// auto-refreshed by TimescaleDB continuous aggregate policies.
///
/// Stateless unit struct: every method is an associated function that takes
/// an explicit sqlx executor, so no connection state is held here.
pub struct AnalyticsRepository;
// ---------------------------------------------------------------------------
// Row types returned by aggregate queries
// ---------------------------------------------------------------------------
/// A single hourly bucket of execution status transitions.
///
/// Returned by the `execution_status_hourly*` queries; `action_ref` is
/// `None` for the all-actions aggregate variant.
#[derive(Debug, Clone, Serialize, FromRow)]
pub struct ExecutionStatusBucket {
    /// Start of the 1-hour bucket
    pub bucket: DateTime<Utc>,
    /// Action ref (e.g., "core.http_request"); NULL when grouped across all actions
    pub action_ref: Option<String>,
    /// The status that was transitioned to (e.g., "completed", "failed")
    pub new_status: Option<String>,
    /// Number of transitions in this bucket
    pub transition_count: i64,
}
/// A single hourly bucket of execution throughput (creations).
///
/// Returned by the `execution_throughput_hourly*` queries; `action_ref` is
/// `None` for the all-actions aggregate variant.
#[derive(Debug, Clone, Serialize, FromRow)]
pub struct ExecutionThroughputBucket {
    /// Start of the 1-hour bucket
    pub bucket: DateTime<Utc>,
    /// Action ref; NULL when grouped across all actions
    pub action_ref: Option<String>,
    /// Number of executions created in this bucket
    pub execution_count: i64,
}
/// A single hourly bucket of event volume.
///
/// Returned by the `event_volume_hourly*` queries; `trigger_ref` is `None`
/// for the all-triggers aggregate variant.
#[derive(Debug, Clone, Serialize, FromRow)]
pub struct EventVolumeBucket {
    /// Start of the 1-hour bucket
    pub bucket: DateTime<Utc>,
    /// Trigger ref; NULL when grouped across all triggers
    pub trigger_ref: Option<String>,
    /// Number of events created in this bucket
    pub event_count: i64,
}
/// A single hourly bucket of worker status transitions.
///
/// Returned by the `worker_status_hourly*` queries; `worker_name` is `None`
/// for the all-workers aggregate variant.
#[derive(Debug, Clone, Serialize, FromRow)]
pub struct WorkerStatusBucket {
    /// Start of the 1-hour bucket
    pub bucket: DateTime<Utc>,
    /// Worker name; NULL when grouped across all workers
    pub worker_name: Option<String>,
    /// The status transitioned to (e.g., "online", "offline")
    pub new_status: Option<String>,
    /// Number of transitions in this bucket
    pub transition_count: i64,
}
/// A single hourly bucket of enforcement volume.
///
/// Returned by the `enforcement_volume_hourly*` queries; `rule_ref` is
/// `None` for the all-rules aggregate variant.
#[derive(Debug, Clone, Serialize, FromRow)]
pub struct EnforcementVolumeBucket {
    /// Start of the 1-hour bucket
    pub bucket: DateTime<Utc>,
    /// Rule ref; NULL when grouped across all rules
    pub rule_ref: Option<String>,
    /// Number of enforcements created in this bucket
    pub enforcement_count: i64,
}
/// Aggregated failure rate over a time range.
///
/// Produced by [`AnalyticsRepository::execution_failure_rate`]. Both
/// `failed` and `timeout` transitions count toward the failure rate.
#[derive(Debug, Clone, Serialize)]
pub struct FailureRateSummary {
    /// Total status transitions to terminal states in the window
    pub total_terminal: i64,
    /// Number of transitions to "failed" status
    pub failed_count: i64,
    /// Number of transitions to "timeout" status
    pub timeout_count: i64,
    /// Number of transitions to "completed" status
    pub completed_count: i64,
    /// Failure rate as a percentage in the range 0.0–100.0
    /// (0.0 when there are no terminal transitions in the window)
    pub failure_rate_pct: f64,
}
// ---------------------------------------------------------------------------
// Query parameters
// ---------------------------------------------------------------------------
/// Common time-range parameters for analytics queries.
///
/// Both endpoints are inclusive. NOTE(review): `since <= until` is not
/// enforced here — callers constructing ranges by hand should verify it.
#[derive(Debug, Clone)]
pub struct AnalyticsTimeRange {
    /// Start of the query window (inclusive). Defaults to 24 hours ago.
    pub since: DateTime<Utc>,
    /// End of the query window (inclusive). Defaults to now.
    pub until: DateTime<Utc>,
}
impl Default for AnalyticsTimeRange {
fn default() -> Self {
let now = Utc::now();
Self {
since: now - chrono::Duration::hours(24),
until: now,
}
}
}
impl AnalyticsTimeRange {
    /// Create a range covering the last N hours from now.
    pub fn last_hours(hours: i64) -> Self {
        Self::trailing(chrono::Duration::hours(hours))
    }

    /// Create a range covering the last N days from now.
    pub fn last_days(days: i64) -> Self {
        Self::trailing(chrono::Duration::days(days))
    }

    /// Build a window that ends now and extends `span` into the past.
    fn trailing(span: chrono::Duration) -> Self {
        let until = Utc::now();
        Self {
            since: until - span,
            until,
        }
    }
}
// ---------------------------------------------------------------------------
// Repository implementation
// ---------------------------------------------------------------------------
impl AnalyticsRepository {
    // =======================================================================
    // Execution status transitions
    // =======================================================================

    /// Get execution status transitions per hour, aggregated across all actions.
    ///
    /// Returns one row per (bucket, new_status) pair, ordered by bucket ascending.
    pub async fn execution_status_hourly<'e, E>(
        executor: E,
        range: &AnalyticsTimeRange,
    ) -> Result<Vec<ExecutionStatusBucket>>
    where
        E: Executor<'e, Database = Postgres> + 'e,
    {
        // `NULL::text AS action_ref` keeps the row shape identical to the
        // per-action variant so both decode into ExecutionStatusBucket
        // (the cast gives the always-NULL column a concrete SQL type).
        // SUM re-aggregates the per-action rows of the continuous aggregate
        // into one row per (bucket, new_status).
        let rows = sqlx::query_as::<_, ExecutionStatusBucket>(
            r#"
            SELECT
                bucket,
                NULL::text AS action_ref,
                new_status,
                SUM(transition_count)::bigint AS transition_count
            FROM execution_status_hourly
            WHERE bucket >= $1 AND bucket <= $2
            GROUP BY bucket, new_status
            ORDER BY bucket ASC, new_status
            "#,
        )
        .bind(range.since)
        .bind(range.until)
        .fetch_all(executor)
        .await?;
        Ok(rows)
    }

    /// Get execution status transitions per hour for a specific action.
    pub async fn execution_status_hourly_by_action<'e, E>(
        executor: E,
        range: &AnalyticsTimeRange,
        action_ref: &str,
    ) -> Result<Vec<ExecutionStatusBucket>>
    where
        E: Executor<'e, Database = Postgres> + 'e,
    {
        // Rows in the aggregate are already per-action, so no re-grouping
        // is needed here — just filter on the requested action_ref.
        let rows = sqlx::query_as::<_, ExecutionStatusBucket>(
            r#"
            SELECT
                bucket,
                action_ref,
                new_status,
                transition_count
            FROM execution_status_hourly
            WHERE bucket >= $1 AND bucket <= $2 AND action_ref = $3
            ORDER BY bucket ASC, new_status
            "#,
        )
        .bind(range.since)
        .bind(range.until)
        .bind(action_ref)
        .fetch_all(executor)
        .await?;
        Ok(rows)
    }

    // =======================================================================
    // Execution throughput
    // =======================================================================

    /// Get execution creation throughput per hour, aggregated across all actions.
    pub async fn execution_throughput_hourly<'e, E>(
        executor: E,
        range: &AnalyticsTimeRange,
    ) -> Result<Vec<ExecutionThroughputBucket>>
    where
        E: Executor<'e, Database = Postgres> + 'e,
    {
        // Same NULL::text pattern as execution_status_hourly: preserve the
        // per-action row shape while summing across all actions per bucket.
        let rows = sqlx::query_as::<_, ExecutionThroughputBucket>(
            r#"
            SELECT
                bucket,
                NULL::text AS action_ref,
                SUM(execution_count)::bigint AS execution_count
            FROM execution_throughput_hourly
            WHERE bucket >= $1 AND bucket <= $2
            GROUP BY bucket
            ORDER BY bucket ASC
            "#,
        )
        .bind(range.since)
        .bind(range.until)
        .fetch_all(executor)
        .await?;
        Ok(rows)
    }

    /// Get execution creation throughput per hour for a specific action.
    pub async fn execution_throughput_hourly_by_action<'e, E>(
        executor: E,
        range: &AnalyticsTimeRange,
        action_ref: &str,
    ) -> Result<Vec<ExecutionThroughputBucket>>
    where
        E: Executor<'e, Database = Postgres> + 'e,
    {
        let rows = sqlx::query_as::<_, ExecutionThroughputBucket>(
            r#"
            SELECT
                bucket,
                action_ref,
                execution_count
            FROM execution_throughput_hourly
            WHERE bucket >= $1 AND bucket <= $2 AND action_ref = $3
            ORDER BY bucket ASC
            "#,
        )
        .bind(range.since)
        .bind(range.until)
        .bind(action_ref)
        .fetch_all(executor)
        .await?;
        Ok(rows)
    }

    // =======================================================================
    // Event volume
    // =======================================================================

    /// Get event creation volume per hour, aggregated across all triggers.
    pub async fn event_volume_hourly<'e, E>(
        executor: E,
        range: &AnalyticsTimeRange,
    ) -> Result<Vec<EventVolumeBucket>>
    where
        E: Executor<'e, Database = Postgres> + 'e,
    {
        let rows = sqlx::query_as::<_, EventVolumeBucket>(
            r#"
            SELECT
                bucket,
                NULL::text AS trigger_ref,
                SUM(event_count)::bigint AS event_count
            FROM event_volume_hourly
            WHERE bucket >= $1 AND bucket <= $2
            GROUP BY bucket
            ORDER BY bucket ASC
            "#,
        )
        .bind(range.since)
        .bind(range.until)
        .fetch_all(executor)
        .await?;
        Ok(rows)
    }

    /// Get event creation volume per hour for a specific trigger.
    pub async fn event_volume_hourly_by_trigger<'e, E>(
        executor: E,
        range: &AnalyticsTimeRange,
        trigger_ref: &str,
    ) -> Result<Vec<EventVolumeBucket>>
    where
        E: Executor<'e, Database = Postgres> + 'e,
    {
        let rows = sqlx::query_as::<_, EventVolumeBucket>(
            r#"
            SELECT
                bucket,
                trigger_ref,
                event_count
            FROM event_volume_hourly
            WHERE bucket >= $1 AND bucket <= $2 AND trigger_ref = $3
            ORDER BY bucket ASC
            "#,
        )
        .bind(range.since)
        .bind(range.until)
        .bind(trigger_ref)
        .fetch_all(executor)
        .await?;
        Ok(rows)
    }

    // =======================================================================
    // Worker health
    // =======================================================================

    /// Get worker status transitions per hour, aggregated across all workers.
    pub async fn worker_status_hourly<'e, E>(
        executor: E,
        range: &AnalyticsTimeRange,
    ) -> Result<Vec<WorkerStatusBucket>>
    where
        E: Executor<'e, Database = Postgres> + 'e,
    {
        let rows = sqlx::query_as::<_, WorkerStatusBucket>(
            r#"
            SELECT
                bucket,
                NULL::text AS worker_name,
                new_status,
                SUM(transition_count)::bigint AS transition_count
            FROM worker_status_hourly
            WHERE bucket >= $1 AND bucket <= $2
            GROUP BY bucket, new_status
            ORDER BY bucket ASC, new_status
            "#,
        )
        .bind(range.since)
        .bind(range.until)
        .fetch_all(executor)
        .await?;
        Ok(rows)
    }

    /// Get worker status transitions per hour for a specific worker.
    pub async fn worker_status_hourly_by_name<'e, E>(
        executor: E,
        range: &AnalyticsTimeRange,
        worker_name: &str,
    ) -> Result<Vec<WorkerStatusBucket>>
    where
        E: Executor<'e, Database = Postgres> + 'e,
    {
        let rows = sqlx::query_as::<_, WorkerStatusBucket>(
            r#"
            SELECT
                bucket,
                worker_name,
                new_status,
                transition_count
            FROM worker_status_hourly
            WHERE bucket >= $1 AND bucket <= $2 AND worker_name = $3
            ORDER BY bucket ASC, new_status
            "#,
        )
        .bind(range.since)
        .bind(range.until)
        .bind(worker_name)
        .fetch_all(executor)
        .await?;
        Ok(rows)
    }

    // =======================================================================
    // Enforcement volume
    // =======================================================================

    /// Get enforcement creation volume per hour, aggregated across all rules.
    pub async fn enforcement_volume_hourly<'e, E>(
        executor: E,
        range: &AnalyticsTimeRange,
    ) -> Result<Vec<EnforcementVolumeBucket>>
    where
        E: Executor<'e, Database = Postgres> + 'e,
    {
        let rows = sqlx::query_as::<_, EnforcementVolumeBucket>(
            r#"
            SELECT
                bucket,
                NULL::text AS rule_ref,
                SUM(enforcement_count)::bigint AS enforcement_count
            FROM enforcement_volume_hourly
            WHERE bucket >= $1 AND bucket <= $2
            GROUP BY bucket
            ORDER BY bucket ASC
            "#,
        )
        .bind(range.since)
        .bind(range.until)
        .fetch_all(executor)
        .await?;
        Ok(rows)
    }

    /// Get enforcement creation volume per hour for a specific rule.
    pub async fn enforcement_volume_hourly_by_rule<'e, E>(
        executor: E,
        range: &AnalyticsTimeRange,
        rule_ref: &str,
    ) -> Result<Vec<EnforcementVolumeBucket>>
    where
        E: Executor<'e, Database = Postgres> + 'e,
    {
        let rows = sqlx::query_as::<_, EnforcementVolumeBucket>(
            r#"
            SELECT
                bucket,
                rule_ref,
                enforcement_count
            FROM enforcement_volume_hourly
            WHERE bucket >= $1 AND bucket <= $2 AND rule_ref = $3
            ORDER BY bucket ASC
            "#,
        )
        .bind(range.since)
        .bind(range.until)
        .bind(rule_ref)
        .fetch_all(executor)
        .await?;
        Ok(rows)
    }

    // =======================================================================
    // Derived analytics
    // =======================================================================

    /// Compute the execution failure rate over a time range.
    ///
    /// Uses the `execution_status_hourly` aggregate to count terminal-state
    /// transitions (completed, failed, timeout) and derive the failure
    /// percentage. Both `failed` and `timeout` count as failures.
    pub async fn execution_failure_rate<'e, E>(
        executor: E,
        range: &AnalyticsTimeRange,
    ) -> Result<FailureRateSummary>
    where
        E: Executor<'e, Database = Postgres> + 'e,
    {
        // Query terminal-state transitions from the aggregate
        let rows = sqlx::query_as::<_, (Option<String>, i64)>(
            r#"
            SELECT
                new_status,
                SUM(transition_count)::bigint AS cnt
            FROM execution_status_hourly
            WHERE bucket >= $1 AND bucket <= $2
              AND new_status IN ('completed', 'failed', 'timeout')
            GROUP BY new_status
            "#,
        )
        .bind(range.since)
        .bind(range.until)
        .fetch_all(executor)
        .await?;
        // Statuses absent from the result set keep their zero default.
        let mut completed: i64 = 0;
        let mut failed: i64 = 0;
        let mut timeout: i64 = 0;
        for (status, count) in &rows {
            match status.as_deref() {
                Some("completed") => completed = *count,
                Some("failed") => failed = *count,
                Some("timeout") => timeout = *count,
                _ => {}
            }
        }
        let total_terminal = completed + failed + timeout;
        // Guard against division by zero: an empty window reports 0.0%.
        let failure_rate_pct = if total_terminal > 0 {
            ((failed + timeout) as f64 / total_terminal as f64) * 100.0
        } else {
            0.0
        };
        Ok(FailureRateSummary {
            total_terminal,
            failed_count: failed,
            timeout_count: timeout,
            completed_count: completed,
            failure_rate_pct,
        })
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Width of a range's window in whole hours.
    fn span_hours(range: &AnalyticsTimeRange) -> i64 {
        (range.until - range.since).num_hours()
    }

    #[test]
    fn test_analytics_time_range_default() {
        // The default window should span approximately 24 hours.
        assert!((span_hours(&AnalyticsTimeRange::default()) - 24).abs() <= 1);
    }

    #[test]
    fn test_analytics_time_range_last_hours() {
        assert!((span_hours(&AnalyticsTimeRange::last_hours(6)) - 6).abs() <= 1);
    }

    #[test]
    fn test_analytics_time_range_last_days() {
        let range = AnalyticsTimeRange::last_days(7);
        let days = (range.until - range.since).num_days();
        assert!((days - 7).abs() <= 1);
    }

    #[test]
    fn test_failure_rate_summary_zero_total() {
        // A window with no terminal transitions reports a 0.0% rate.
        let summary = FailureRateSummary {
            total_terminal: 0,
            completed_count: 0,
            failed_count: 0,
            timeout_count: 0,
            failure_rate_pct: 0.0,
        };
        assert_eq!(summary.failure_rate_pct, 0.0);
    }

    #[test]
    fn test_failure_rate_calculation() {
        // 80 completed, 15 failed, 5 timeout → 20% failure rate
        let (completed, failed, timeout) = (80_i64, 15_i64, 5_i64);
        let total = completed + failed + timeout;
        let rate = ((failed + timeout) as f64 / total as f64) * 100.0;
        assert!((rate - 20.0).abs() < 0.01);
    }
}

View File

@@ -0,0 +1,301 @@
//! Entity history repository for querying TimescaleDB history hypertables
//!
//! This module provides read-only query methods for the `<entity>_history` tables.
//! History records are written exclusively by PostgreSQL triggers — this repository
//! only reads them.
use chrono::{DateTime, Utc};
use sqlx::{Executor, Postgres, QueryBuilder};
use crate::models::entity_history::{EntityHistoryRecord, HistoryEntityType};
use crate::Result;
/// Repository for querying entity history hypertables.
///
/// All methods are read-only. History records are populated by PostgreSQL
/// `AFTER INSERT OR UPDATE OR DELETE` triggers on the operational tables.
///
/// Stateless unit struct: every method is an associated function that takes
/// an explicit sqlx executor, so no connection state is held here.
pub struct EntityHistoryRepository;
/// Query parameters for filtering history records.
///
/// All fields are optional; `None` means "do not filter on this field".
/// Use `effective_limit()` / `effective_offset()` rather than reading
/// `limit` / `offset` directly — they apply the defaults and bounds.
#[derive(Debug, Clone, Default)]
pub struct HistoryQueryParams {
    /// Filter by entity ID (e.g., execution.id)
    pub entity_id: Option<i64>,
    /// Filter by entity ref (e.g., action_ref, worker name)
    pub entity_ref: Option<String>,
    /// Filter by operation type: `INSERT`, `UPDATE`, or `DELETE`
    /// (matched case-insensitively — the value is uppercased before binding)
    pub operation: Option<String>,
    /// Only include records where this field was changed
    pub changed_field: Option<String>,
    /// Only include records at or after this time
    pub since: Option<DateTime<Utc>>,
    /// Only include records at or before this time
    pub until: Option<DateTime<Utc>>,
    /// Maximum number of records to return (default: 100, max: 1000)
    pub limit: Option<i64>,
    /// Offset for pagination (negative values are treated as 0)
    pub offset: Option<i64>,
}
impl HistoryQueryParams {
    /// Returns the effective limit, clamped to the range `[1, 1000]`.
    ///
    /// Defaults to 100 when `limit` is unset. Out-of-range values are
    /// clamped rather than rejected. Uses `i64::clamp` instead of the
    /// manual `.min(1000).max(1)` chain (clippy: `manual_clamp`);
    /// behavior is identical.
    pub fn effective_limit(&self) -> i64 {
        self.limit.unwrap_or(100).clamp(1, 1000)
    }

    /// Returns the effective offset (defaults to 0; never negative).
    pub fn effective_offset(&self) -> i64 {
        self.offset.unwrap_or(0).max(0)
    }
}
impl EntityHistoryRepository {
    /// Append the WHERE-clause filters from `params` to a query builder.
    ///
    /// Shared by [`Self::query`] and [`Self::count`] so the filter logic
    /// cannot drift between the two (it was previously duplicated verbatim).
    /// All filter values are bound as parameters, never interpolated.
    fn push_filters(qb: &mut QueryBuilder<'_, Postgres>, params: &HistoryQueryParams) {
        if let Some(entity_id) = params.entity_id {
            qb.push(" AND entity_id = ").push_bind(entity_id);
        }
        if let Some(ref entity_ref) = params.entity_ref {
            qb.push(" AND entity_ref = ").push_bind(entity_ref.clone());
        }
        if let Some(ref operation) = params.operation {
            // Normalize to uppercase so lowercase caller input still matches
            // the stored `INSERT`/`UPDATE`/`DELETE` values.
            qb.push(" AND operation = ")
                .push_bind(operation.to_uppercase());
        }
        if let Some(ref changed_field) = params.changed_field {
            // `$n = ANY(changed_fields)` tests array membership; the field
            // name is a bound parameter, not interpolated SQL.
            qb.push(" AND ")
                .push_bind(changed_field.clone())
                .push(" = ANY(changed_fields)");
        }
        if let Some(since) = params.since {
            qb.push(" AND time >= ").push_bind(since);
        }
        if let Some(until) = params.until {
            qb.push(" AND time <= ").push_bind(until);
        }
    }

    /// Query history records for a given entity type with optional filters.
    ///
    /// Results are ordered by `time DESC` (most recent first).
    pub async fn query<'e, E>(
        executor: E,
        entity_type: HistoryEntityType,
        params: &HistoryQueryParams,
    ) -> Result<Vec<EntityHistoryRecord>>
    where
        E: Executor<'e, Database = Postgres> + 'e,
    {
        // We must use format! for the table name since it can't be a bind parameter,
        // but HistoryEntityType::table_name() returns a known static str so this is safe.
        let table = entity_type.table_name();
        let mut qb: QueryBuilder<Postgres> =
            QueryBuilder::new(format!("SELECT time, operation, entity_id, entity_ref, changed_fields, old_values, new_values FROM {table} WHERE 1=1"));
        Self::push_filters(&mut qb, params);
        qb.push(" ORDER BY time DESC");
        qb.push(" LIMIT ").push_bind(params.effective_limit());
        qb.push(" OFFSET ").push_bind(params.effective_offset());
        let records = qb
            .build_query_as::<EntityHistoryRecord>()
            .fetch_all(executor)
            .await?;
        Ok(records)
    }

    /// Count history records for a given entity type with optional filters.
    ///
    /// Applies the same filters as [`Self::query`] (but no limit/offset).
    /// Useful for pagination metadata.
    pub async fn count<'e, E>(
        executor: E,
        entity_type: HistoryEntityType,
        params: &HistoryQueryParams,
    ) -> Result<i64>
    where
        E: Executor<'e, Database = Postgres> + 'e,
    {
        // Table name interpolation is safe for the same reason as in query().
        let table = entity_type.table_name();
        let mut qb: QueryBuilder<Postgres> =
            QueryBuilder::new(format!("SELECT COUNT(*) FROM {table} WHERE 1=1"));
        Self::push_filters(&mut qb, params);
        let row: (i64,) = qb.build_query_as().fetch_one(executor).await?;
        Ok(row.0)
    }

    /// Get history records for a specific entity by ID.
    ///
    /// Convenience method equivalent to `query()` with `entity_id` set.
    pub async fn find_by_entity_id<'e, E>(
        executor: E,
        entity_type: HistoryEntityType,
        entity_id: i64,
        limit: Option<i64>,
    ) -> Result<Vec<EntityHistoryRecord>>
    where
        E: Executor<'e, Database = Postgres> + 'e,
    {
        let params = HistoryQueryParams {
            entity_id: Some(entity_id),
            limit,
            ..Default::default()
        };
        Self::query(executor, entity_type, &params).await
    }

    /// Get only status-change history records for a specific entity.
    ///
    /// Filters to UPDATE operations where `changed_fields` includes `"status"`.
    pub async fn find_status_changes<'e, E>(
        executor: E,
        entity_type: HistoryEntityType,
        entity_id: i64,
        limit: Option<i64>,
    ) -> Result<Vec<EntityHistoryRecord>>
    where
        E: Executor<'e, Database = Postgres> + 'e,
    {
        let params = HistoryQueryParams {
            entity_id: Some(entity_id),
            operation: Some("UPDATE".to_string()),
            changed_field: Some("status".to_string()),
            limit,
            ..Default::default()
        };
        Self::query(executor, entity_type, &params).await
    }

    /// Get the most recent history record for a specific entity.
    ///
    /// Returns `None` when the entity has no history rows at all.
    pub async fn find_latest<'e, E>(
        executor: E,
        entity_type: HistoryEntityType,
        entity_id: i64,
    ) -> Result<Option<EntityHistoryRecord>>
    where
        E: Executor<'e, Database = Postgres> + 'e,
    {
        // query() orders by time DESC, so the first row is the latest.
        let records = Self::find_by_entity_id(executor, entity_type, entity_id, Some(1)).await?;
        Ok(records.into_iter().next())
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_history_query_params_defaults() {
        // Unset limit/offset fall back to 100 and 0 respectively.
        let params = HistoryQueryParams::default();
        assert_eq!(params.effective_limit(), 100);
        assert_eq!(params.effective_offset(), 0);
    }

    #[test]
    fn test_history_query_params_limit_cap() {
        // Limits above the cap are reduced to 1000.
        let params = HistoryQueryParams {
            limit: Some(5000),
            ..Default::default()
        };
        assert_eq!(params.effective_limit(), 1000);
    }

    #[test]
    fn test_history_query_params_limit_min() {
        // Non-positive limits are raised to the minimum of 1.
        let params = HistoryQueryParams {
            limit: Some(-10),
            ..Default::default()
        };
        assert_eq!(params.effective_limit(), 1);
    }

    #[test]
    fn test_history_query_params_offset_min() {
        // Negative offsets are treated as 0.
        let params = HistoryQueryParams {
            offset: Some(-5),
            ..Default::default()
        };
        assert_eq!(params.effective_offset(), 0);
    }

    #[test]
    fn test_history_entity_type_table_name() {
        // Every variant maps to its `<name>_history` hypertable.
        let expected = [
            (HistoryEntityType::Execution, "execution_history"),
            (HistoryEntityType::Worker, "worker_history"),
            (HistoryEntityType::Enforcement, "enforcement_history"),
            (HistoryEntityType::Event, "event_history"),
        ];
        for (entity, table) in expected {
            assert_eq!(entity.table_name(), table);
        }
    }

    #[test]
    fn test_history_entity_type_from_str() {
        // Parsing is case-insensitive.
        let cases = [
            ("execution", HistoryEntityType::Execution),
            ("Worker", HistoryEntityType::Worker),
            ("ENFORCEMENT", HistoryEntityType::Enforcement),
            ("event", HistoryEntityType::Event),
        ];
        for (input, expected) in cases {
            assert_eq!(input.parse::<HistoryEntityType>().unwrap(), expected);
        }
        assert!("unknown".parse::<HistoryEntityType>().is_err());
    }

    #[test]
    fn test_history_entity_type_display() {
        // Display renders the lowercase entity name.
        let cases = [
            (HistoryEntityType::Execution, "execution"),
            (HistoryEntityType::Worker, "worker"),
            (HistoryEntityType::Enforcement, "enforcement"),
            (HistoryEntityType::Event, "event"),
        ];
        for (entity, name) in cases {
            assert_eq!(entity.to_string(), name);
        }
    }
}

View File

@@ -28,7 +28,9 @@
use sqlx::{Executor, Postgres, Transaction};
pub mod action;
pub mod analytics;
pub mod artifact;
pub mod entity_history;
pub mod event;
pub mod execution;
pub mod identity;
@@ -46,7 +48,9 @@ pub mod workflow;
// Re-export repository types
pub use action::{ActionRepository, PolicyRepository};
pub use analytics::AnalyticsRepository;
pub use artifact::ArtifactRepository;
pub use entity_history::EntityHistoryRepository;
pub use event::{EnforcementRepository, EventRepository};
pub use execution::ExecutionRepository;
pub use identity::{IdentityRepository, PermissionAssignmentRepository, PermissionSetRepository};