From 7ee3604eb1e97f4e126bac2cb30c0d56c5222294 Mon Sep 17 00:00:00 2001 From: David Culbreth Date: Wed, 25 Feb 2026 23:40:50 -0600 Subject: [PATCH] [WIP] change capture --- AGENTS.md | 14 +- docker-compose.yaml | 2 +- docs/plans/timescaledb-entity-history.md | 252 +++++++++ ...60226100000_entity_history_timescaledb.sql | 487 ++++++++++++++++++ 4 files changed, 748 insertions(+), 7 deletions(-) create mode 100644 docs/plans/timescaledb-entity-history.md create mode 100644 migrations/20260226100000_entity_history_timescaledb.sql diff --git a/AGENTS.md b/AGENTS.md index 12dcc47..f8f1eb5 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -19,7 +19,7 @@ When this project reaches v1.0 or gets its first production deployment, this sec ## Languages & Core Technologies - **Primary Language**: Rust 2021 edition -- **Database**: PostgreSQL 14+ (primary data store + LISTEN/NOTIFY pub/sub) +- **Database**: PostgreSQL 16+ with TimescaleDB 2.17+ (primary data store + LISTEN/NOTIFY pub/sub + time-series history) - **Message Queue**: RabbitMQ 3.12+ (via lapin) - **Cache**: Redis 7.0+ (optional) - **Web UI**: TypeScript + React 19 + Vite @@ -70,7 +70,7 @@ attune/ - **Default user**: `test@attune.local` / `TestPass123!` (auto-created) **Services**: -- **Infrastructure**: postgres, rabbitmq, redis +- **Infrastructure**: postgres (TimescaleDB), rabbitmq, redis - **Init** (run-once): migrations, init-user, init-packs - **Application**: api (8080), executor, worker-{shell,python,node,full}, sensor, notifier (8081), web (3000) @@ -211,8 +211,9 @@ Enforcement created → Execution scheduled → Worker executes Action - **Enums**: PostgreSQL enum types mapped with `#[sqlx(type_name = "...")]` - **Workflow Tasks**: Stored as JSONB in `execution.workflow_task` (consolidated from separate table 2026-01-27) - **FK ON DELETE Policy**: Historical records (executions, events, enforcements) use `ON DELETE SET NULL` so they survive entity deletion while preserving text ref fields (`action_ref`, `trigger_ref`, 
etc.) for auditing. Pack-owned entities (actions, triggers, sensors, rules, runtimes) use `ON DELETE CASCADE` from pack. Workflow executions cascade-delete with their workflow definition. +- **Entity History Tracking (TimescaleDB)**: Append-only `_history` hypertables track field-level changes to `execution`, `worker`, `enforcement`, and `event` tables. Populated by PostgreSQL `AFTER INSERT OR UPDATE OR DELETE` triggers — no Rust code changes needed for recording. Uses JSONB diff format (`old_values`/`new_values`) with a `changed_fields TEXT[]` column for efficient filtering. Worker heartbeat-only updates are excluded. See `docs/plans/timescaledb-entity-history.md` for full design. - **Nullable FK Fields**: `rule.action` and `rule.trigger` are nullable (`Option` in Rust) — a rule with NULL action/trigger is non-functional but preserved for traceability. `execution.action`, `execution.parent`, `execution.enforcement`, and `event.source` are also nullable. -**Table Count**: 18 tables total in the schema (including `runtime_version`) +**Table Count**: 22 tables total in the schema (including `runtime_version` and 4 `*_history` hypertables) - **Pack Component Loading Order**: Runtimes → Triggers → Actions → Sensors (dependency order). Both `PackComponentLoader` (Rust) and `load_core_pack.py` (Python) follow this order. ### Pack File Loading & Action Execution @@ -480,6 +481,7 @@ When reporting, ask: "Should I fix this first or continue with [original task]?" 14. **REMEMBER** to regenerate SQLx metadata after schema-related changes: `cargo sqlx prepare` 15. **REMEMBER** packs are volumes - update with restart, not rebuild 16. **REMEMBER** to build pack binaries separately: `./scripts/build-pack-binaries.sh` +17. 
**REMEMBER** when adding mutable columns to `execution`, `worker`, `enforcement`, or `event`, add a corresponding `IS DISTINCT FROM` check to the entity's history trigger function in the TimescaleDB migration ## Deployment - **Target**: Distributed deployment with separate service instances @@ -490,9 +492,9 @@ When reporting, ask: "Should I fix this first or continue with [original task]?" - **Web UI**: Static files served separately or via API service ## Current Development Status -- ✅ **Complete**: Database migrations (18 tables), API service (most endpoints), common library, message queue infrastructure, repository layer, JWT auth, CLI tool, Web UI (basic + workflow builder), Executor service (core functionality), Worker service (shell/Python execution), Runtime version data model, constraint matching, worker version selection pipeline, version verification at startup, per-version environment isolation +- ✅ **Complete**: Database migrations (22 tables), API service (most endpoints), common library, message queue infrastructure, repository layer, JWT auth, CLI tool, Web UI (basic + workflow builder), Executor service (core functionality), Worker service (shell/Python execution), Runtime version data model, constraint matching, worker version selection pipeline, version verification at startup, per-version environment isolation, TimescaleDB entity history tracking (execution, worker, enforcement, event) - 🔄 **In Progress**: Sensor service, advanced workflow features, Python runtime dependency management, API/UI endpoints for runtime version management -- 📋 **Planned**: Notifier service, execution policies, monitoring, pack registry system +- 📋 **Planned**: Notifier service, execution policies, monitoring, pack registry system, history API endpoints & UI, continuous aggregates for dashboards ## Quick Reference @@ -605,7 +607,7 @@ When updating, be surgical - modify only the affected sections rather than rewri 
|docs/migrations:{workflow-task-execution-consolidation.md} |docs/packs:{PACK_TESTING.md,QUICKREF-git-installation.md,core-pack-integration.md,pack-install-testing.md,pack-installation-git.md,pack-registry-cicd.md,pack-registry-spec.md,pack-structure.md,pack-testing-framework.md} |docs/performance:{QUICKREF-performance-optimization.md,log-size-limits.md,performance-analysis-workflow-lists.md,performance-before-after-results.md,performance-context-cloning-diagram.md} -|docs/plans:{schema-per-test-refactor.md} +|docs/plans:{schema-per-test-refactor.md,timescaledb-entity-history.md} |docs/sensors:{CHECKLIST-sensor-worker-registration.md,COMPLETION-sensor-worker-registration.md,SUMMARY-database-driven-detection.md,database-driven-runtime-detection.md,native-runtime.md,sensor-authentication-overview.md,sensor-interface.md,sensor-lifecycle-management.md,sensor-runtime.md,sensor-service-setup.md,sensor-worker-registration.md} |docs/testing:{e2e-test-plan.md,running-tests.md,schema-per-test.md,test-user-setup.md,testing-authentication.md,testing-dashboard-rules.md,testing-status.md} |docs/web-ui:{web-ui-pack-testing.md,websocket-usage.md} diff --git a/docker-compose.yaml b/docker-compose.yaml index d4ccb11..3e8f043 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -16,7 +16,7 @@ services: # ============================================================================ postgres: - image: postgres:16-alpine + image: timescale/timescaledb:2.17.2-pg16 container_name: attune-postgres environment: POSTGRES_USER: attune diff --git a/docs/plans/timescaledb-entity-history.md b/docs/plans/timescaledb-entity-history.md new file mode 100644 index 0000000..7c8b1f8 --- /dev/null +++ b/docs/plans/timescaledb-entity-history.md @@ -0,0 +1,252 @@ +# TimescaleDB Entity History Tracking + +## Overview + +This plan describes the addition of **TimescaleDB-backed history tables** to track field-level changes on key operational entities in Attune. 
The goal is to provide an immutable audit log and time-series analytics for status transitions and other field changes, without modifying existing operational tables or application code. + +## Motivation + +Currently, when a field changes on an operational table (e.g., `execution.status` moves from `requested` → `running`), the row is updated in place and only the current state is retained. The `updated` timestamp is bumped, but there is no record of: + +- What the previous value was +- When each transition occurred +- How long an entity spent in each state +- Historical trends (e.g., failure rate over time, execution throughput per hour) + +This data is essential for operational dashboards, debugging, SLA tracking, and capacity planning. + +## Technology Choice: TimescaleDB + +[TimescaleDB](https://www.timescale.com/) is a PostgreSQL extension that adds time-series capabilities: + +- **Hypertables**: Automatic time-based partitioning (chunks by hour/day/week) +- **Compression**: 10-20x storage reduction on aged-out chunks +- **Retention policies**: Automatic data expiry +- **Continuous aggregates**: Auto-refreshing materialized views for dashboard rollups +- **`time_bucket()` function**: Efficient time-series grouping + +It runs as an extension inside the existing PostgreSQL instance — no additional infrastructure. + +## Design Decisions + +### Separate history tables, not hypertable conversions + +The operational tables (`execution`, `worker`, `enforcement`, `event`) will **NOT** be converted to hypertables. Reasons: + +1. **UNIQUE constraints on hypertables must include the time partitioning column** — this would break `worker.name UNIQUE`, PK references, etc. +2. **Foreign keys INTO hypertables are not supported** — `execution.parent` self-references `execution(id)`, `enforcement` references `rule`, etc. +3. **UPDATE-heavy tables are a poor fit for hypertables** — hypertables are optimized for append-only INSERT workloads. 
+ +Instead, each tracked entity gets a companion `
_history` hypertable that receives append-only change records. + +### JSONB diff format (not full row snapshots) + +Each history row captures only the fields that changed, stored as JSONB: + +- **Compact**: A status change is `{"status": "running"}`, not a copy of the entire row including large `result`/`config` JSONB blobs. +- **Schema-decoupled**: Adding a column to the source table requires no changes to the history table structure — only a new `IS DISTINCT FROM` check in the trigger function. +- **Answering "what changed?"**: Directly readable without diffing two full snapshots. + +A `changed_fields TEXT[]` column enables efficient partial indexes and GIN-indexed queries for filtering by field name. + +### PostgreSQL triggers for population + +History rows are written by `AFTER INSERT OR UPDATE OR DELETE` triggers on the operational tables. This ensures: + +- Every change is captured regardless of which service (API, executor, worker) made it. +- No Rust application code changes are needed for recording. +- It's impossible to miss a change path. + +### Worker heartbeats excluded + +`worker.last_heartbeat` is updated frequently by the heartbeat loop and is high-volume/low-value for history purposes. The trigger function explicitly excludes pure heartbeat-only updates. If heartbeat analytics are needed later, a dedicated lightweight table can be added. 
+ +## Tracked Entities + +| Entity | History Table | `entity_ref` Source | Excluded Fields | +|--------|--------------|---------------------|-----------------| +| `execution` | `execution_history` | `action_ref` | *(none)* | +| `worker` | `worker_history` | `name` | `last_heartbeat` (when sole change) | +| `enforcement` | `enforcement_history` | `rule_ref` | *(none)* | +| `event` | `event_history` | `trigger_ref` | *(none)* | + +## Table Schema + +All four history tables share the same structure: + +```sql +CREATE TABLE _history ( + time TIMESTAMPTZ NOT NULL DEFAULT NOW(), + operation TEXT NOT NULL, -- 'INSERT', 'UPDATE', 'DELETE' + entity_id BIGINT NOT NULL, -- PK of the source row + entity_ref TEXT, -- denormalized ref/name for JOIN-free queries + changed_fields TEXT[] NOT NULL DEFAULT '{}', + old_values JSONB, -- previous values of changed fields + new_values JSONB -- new values of changed fields +); +``` + +Column details: + +| Column | Purpose | +|--------|---------| +| `time` | Hypertable partitioning dimension; when the change occurred | +| `operation` | `INSERT`, `UPDATE`, or `DELETE` | +| `entity_id` | The source row's `id` (conceptual FK, not enforced on hypertable) | +| `entity_ref` | Denormalized human-readable identifier for efficient filtering | +| `changed_fields` | Array of field names that changed — enables partial indexes and GIN queries | +| `old_values` | JSONB of previous field values (NULL for INSERT) | +| `new_values` | JSONB of new field values (NULL for DELETE) | + +## Hypertable Configuration + +| History Table | Chunk Interval | Rationale | +|---------------|---------------|-----------| +| `execution_history` | 1 day | Highest expected volume | +| `enforcement_history` | 1 day | Correlated with execution volume | +| `event_history` | 1 day | Can be high volume from active sensors | +| `worker_history` | 7 days | Low volume (status changes are infrequent) | + +## Indexes + +Each history table gets: + +1. 
**Entity lookup**: `(entity_id, time DESC)` — "show me history for entity X" +2. **Status change filter**: Partial index on `time DESC` where `'status' = ANY(changed_fields)` — "show me all status changes" +3. **Field filter**: GIN index on `changed_fields` — flexible field-based queries +4. **Ref-based lookup**: `(entity_ref, time DESC)` — "show me all execution history for action `core.http_request`" + +## Trigger Functions + +Each tracked table gets a dedicated trigger function that: + +1. On `INSERT`: Records the operation with key initial field values in `new_values`. +2. On `DELETE`: Records the operation with entity identifiers. +3. On `UPDATE`: Checks each mutable field with `IS DISTINCT FROM`. If any fields changed, records the old and new values. If nothing changed, no history row is written. + +### Fields tracked per entity + +**execution**: `status`, `result`, `executor`, `workflow_task`, `env_vars` + +**worker**: `name`, `status`, `capabilities`, `meta`, `host`, `port` (excludes `last_heartbeat` when it's the only change) + +**enforcement**: `status`, `payload` + +**event**: `config`, `payload` + +## Compression Policies + +Applied after data leaves the "hot" query window: + +| History Table | Compress After | `segmentby` | `orderby` | +|---------------|---------------|-------------|-----------| +| `execution_history` | 7 days | `entity_id` | `time DESC` | +| `worker_history` | 7 days | `entity_id` | `time DESC` | +| `enforcement_history` | 7 days | `entity_id` | `time DESC` | +| `event_history` | 7 days | `entity_id` | `time DESC` | + +`segmentby = entity_id` ensures that "show me history for entity X" queries are fast even on compressed chunks. 
+ +## Retention Policies + +| History Table | Retain For | Rationale | +|---------------|-----------|-----------| +| `execution_history` | 90 days | Primary operational data | +| `enforcement_history` | 90 days | Tied to execution lifecycle | +| `event_history` | 30 days | High volume, less long-term value | +| `worker_history` | 180 days | Low volume, useful for capacity trends | + +## Continuous Aggregates (Future) + +These are not part of the initial migration but are natural follow-ons: + +```sql +-- Execution status transitions per hour (for dashboards) +CREATE MATERIALIZED VIEW execution_status_transitions_hourly +WITH (timescaledb.continuous) AS +SELECT + time_bucket('1 hour', time) AS bucket, + entity_ref AS action_ref, + new_values->>'status' AS new_status, + COUNT(*) AS transition_count +FROM execution_history +WHERE 'status' = ANY(changed_fields) +GROUP BY bucket, entity_ref, new_values->>'status' +WITH NO DATA; + +-- Event volume per hour by trigger (for throughput monitoring) +CREATE MATERIALIZED VIEW event_volume_hourly +WITH (timescaledb.continuous) AS +SELECT + time_bucket('1 hour', time) AS bucket, + entity_ref AS trigger_ref, + COUNT(*) AS event_count +FROM event_history +WHERE operation = 'INSERT' +GROUP BY bucket, entity_ref +WITH NO DATA; +``` + +## Infrastructure Changes + +### Docker Compose + +Change the PostgreSQL image from `postgres:16-alpine` to `timescale/timescaledb:latest-pg16` (or a pinned version like `timescale/timescaledb:2.17.2-pg16`). + +No other infrastructure changes are needed — TimescaleDB is a drop-in extension. + +### Local Development + +For local development (non-Docker), TimescaleDB must be installed as a PostgreSQL extension. On macOS: `brew install timescaledb`. On Linux: follow [TimescaleDB install docs](https://docs.timescale.com/self-hosted/latest/install/). + +### Testing + +The schema-per-test isolation pattern works with TimescaleDB. 
The `timescaledb` extension is database-level (created once via `CREATE EXTENSION`), and hypertables in different schemas are independent. The test schema setup requires no changes — `create_hypertable()` operates within the active `search_path`. + +### SQLx Compatibility + +No special SQLx support is needed. History tables are standard PostgreSQL tables from SQLx's perspective. `INSERT`, `SELECT`, `time_bucket()`, and array operators all work as regular SQL. TimescaleDB-specific DDL (`create_hypertable`, `add_compression_policy`, etc.) runs in migrations only. + +## Implementation Scope + +### Phase 1 (this migration) +- [ ] `CREATE EXTENSION IF NOT EXISTS timescaledb` +- [ ] Create four `_history` tables +- [ ] Convert to hypertables with `create_hypertable()` +- [ ] Create indexes (entity lookup, status change filter, GIN on changed_fields, ref lookup) +- [ ] Create trigger functions for `execution`, `worker`, `enforcement`, `event` +- [ ] Attach triggers to operational tables +- [ ] Configure compression policies +- [ ] Configure retention policies + +### Phase 2 (future — API & UI) +- [ ] History repository in `crates/common/src/repositories/` +- [ ] API endpoints (e.g., `GET /api/v1/executions/:id/history`) +- [ ] Web UI history panel on entity detail pages +- [ ] Continuous aggregates for dashboards + +### Phase 3 (future — analytics) +- [ ] Dashboard widgets showing execution throughput, failure rates, worker health trends +- [ ] Configurable retention periods via admin settings +- [ ] Export/archival to external storage before retention expiry + +## Risks & Mitigations + +| Risk | Mitigation | +|------|-----------| +| Trigger overhead on hot paths | Triggers are lightweight (JSONB build + single INSERT into an append-optimized hypertable). Benchmark if execution throughput exceeds 1K/sec. | +| Storage growth | Compression (7-day delay) + retention policies bound storage automatically. 
| +| JSONB query performance | Partial indexes on `changed_fields` avoid full scans. Continuous aggregates pre-compute hot queries. | +| Schema drift (new columns not tracked) | When adding mutable columns to tracked tables, add a corresponding `IS DISTINCT FROM` check to the trigger function. Document this in the pitfalls section of AGENTS.md. | +| Test compatibility | TimescaleDB extension is database-level; schema-per-test isolation is unaffected. Verify in CI. | + +## Docker Image Pinning + +For reproducibility, pin the TimescaleDB image version rather than using `latest`: + +```yaml +postgres: + image: timescale/timescaledb:2.17.2-pg16 +``` + +Update the pin periodically as new stable versions are released. \ No newline at end of file diff --git a/migrations/20260226100000_entity_history_timescaledb.sql b/migrations/20260226100000_entity_history_timescaledb.sql new file mode 100644 index 0000000..f576395 --- /dev/null +++ b/migrations/20260226100000_entity_history_timescaledb.sql @@ -0,0 +1,487 @@ +-- Migration: TimescaleDB Entity History Tracking +-- Description: Creates append-only history hypertables for execution, worker, enforcement, +-- and event tables. Uses JSONB diff format to track field-level changes via +-- PostgreSQL triggers. See docs/plans/timescaledb-entity-history.md for full design. 
+-- Version: 20260226100000 + +-- ============================================================================ +-- EXTENSION +-- ============================================================================ + +CREATE EXTENSION IF NOT EXISTS timescaledb; + +-- ============================================================================ +-- HISTORY TABLES +-- ============================================================================ + +-- ---------------------------------------------------------------------------- +-- execution_history +-- ---------------------------------------------------------------------------- + +CREATE TABLE execution_history ( + time TIMESTAMPTZ NOT NULL DEFAULT NOW(), + operation TEXT NOT NULL, + entity_id BIGINT NOT NULL, + entity_ref TEXT, + changed_fields TEXT[] NOT NULL DEFAULT '{}', + old_values JSONB, + new_values JSONB +); + +SELECT create_hypertable('execution_history', 'time', + chunk_time_interval => INTERVAL '1 day'); + +CREATE INDEX idx_execution_history_entity + ON execution_history (entity_id, time DESC); + +CREATE INDEX idx_execution_history_entity_ref + ON execution_history (entity_ref, time DESC); + +CREATE INDEX idx_execution_history_status_changes + ON execution_history (time DESC) + WHERE 'status' = ANY(changed_fields); + +CREATE INDEX idx_execution_history_changed_fields + ON execution_history USING GIN (changed_fields); + +COMMENT ON TABLE execution_history IS 'Append-only history of field-level changes to the execution table (TimescaleDB hypertable)'; +COMMENT ON COLUMN execution_history.time IS 'When the change occurred (hypertable partitioning dimension)'; +COMMENT ON COLUMN execution_history.operation IS 'INSERT, UPDATE, or DELETE'; +COMMENT ON COLUMN execution_history.entity_id IS 'execution.id of the changed row'; +COMMENT ON COLUMN execution_history.entity_ref IS 'Denormalized action_ref for JOIN-free queries'; +COMMENT ON COLUMN execution_history.changed_fields IS 'Array of field names that changed (empty for 
INSERT/DELETE)'; +COMMENT ON COLUMN execution_history.old_values IS 'Previous values of changed fields (NULL for INSERT)'; +COMMENT ON COLUMN execution_history.new_values IS 'New values of changed fields (NULL for DELETE)'; + +-- ---------------------------------------------------------------------------- +-- worker_history +-- ---------------------------------------------------------------------------- + +CREATE TABLE worker_history ( + time TIMESTAMPTZ NOT NULL DEFAULT NOW(), + operation TEXT NOT NULL, + entity_id BIGINT NOT NULL, + entity_ref TEXT, + changed_fields TEXT[] NOT NULL DEFAULT '{}', + old_values JSONB, + new_values JSONB +); + +SELECT create_hypertable('worker_history', 'time', + chunk_time_interval => INTERVAL '7 days'); + +CREATE INDEX idx_worker_history_entity + ON worker_history (entity_id, time DESC); + +CREATE INDEX idx_worker_history_entity_ref + ON worker_history (entity_ref, time DESC); + +CREATE INDEX idx_worker_history_status_changes + ON worker_history (time DESC) + WHERE 'status' = ANY(changed_fields); + +CREATE INDEX idx_worker_history_changed_fields + ON worker_history USING GIN (changed_fields); + +COMMENT ON TABLE worker_history IS 'Append-only history of field-level changes to the worker table (TimescaleDB hypertable)'; +COMMENT ON COLUMN worker_history.entity_ref IS 'Denormalized worker name for JOIN-free queries'; + +-- ---------------------------------------------------------------------------- +-- enforcement_history +-- ---------------------------------------------------------------------------- + +CREATE TABLE enforcement_history ( + time TIMESTAMPTZ NOT NULL DEFAULT NOW(), + operation TEXT NOT NULL, + entity_id BIGINT NOT NULL, + entity_ref TEXT, + changed_fields TEXT[] NOT NULL DEFAULT '{}', + old_values JSONB, + new_values JSONB +); + +SELECT create_hypertable('enforcement_history', 'time', + chunk_time_interval => INTERVAL '1 day'); + +CREATE INDEX idx_enforcement_history_entity + ON enforcement_history (entity_id, time 
DESC); + +CREATE INDEX idx_enforcement_history_entity_ref + ON enforcement_history (entity_ref, time DESC); + +CREATE INDEX idx_enforcement_history_status_changes + ON enforcement_history (time DESC) + WHERE 'status' = ANY(changed_fields); + +CREATE INDEX idx_enforcement_history_changed_fields + ON enforcement_history USING GIN (changed_fields); + +COMMENT ON TABLE enforcement_history IS 'Append-only history of field-level changes to the enforcement table (TimescaleDB hypertable)'; +COMMENT ON COLUMN enforcement_history.entity_ref IS 'Denormalized rule_ref for JOIN-free queries'; + +-- ---------------------------------------------------------------------------- +-- event_history +-- ---------------------------------------------------------------------------- + +CREATE TABLE event_history ( + time TIMESTAMPTZ NOT NULL DEFAULT NOW(), + operation TEXT NOT NULL, + entity_id BIGINT NOT NULL, + entity_ref TEXT, + changed_fields TEXT[] NOT NULL DEFAULT '{}', + old_values JSONB, + new_values JSONB +); + +SELECT create_hypertable('event_history', 'time', + chunk_time_interval => INTERVAL '1 day'); + +CREATE INDEX idx_event_history_entity + ON event_history (entity_id, time DESC); + +CREATE INDEX idx_event_history_entity_ref + ON event_history (entity_ref, time DESC); + +CREATE INDEX idx_event_history_changed_fields + ON event_history USING GIN (changed_fields); + +COMMENT ON TABLE event_history IS 'Append-only history of field-level changes to the event table (TimescaleDB hypertable)'; +COMMENT ON COLUMN event_history.entity_ref IS 'Denormalized trigger_ref for JOIN-free queries'; + +-- ============================================================================ +-- TRIGGER FUNCTIONS +-- ============================================================================ + +-- ---------------------------------------------------------------------------- +-- execution history trigger +-- Tracked fields: status, result, executor, workflow_task, env_vars +-- 
---------------------------------------------------------------------------- + +CREATE OR REPLACE FUNCTION record_execution_history() +RETURNS TRIGGER AS $$ +DECLARE + changed TEXT[] := '{}'; + old_vals JSONB := '{}'; + new_vals JSONB := '{}'; +BEGIN + IF TG_OP = 'INSERT' THEN + INSERT INTO execution_history (time, operation, entity_id, entity_ref, changed_fields, old_values, new_values) + VALUES (NOW(), 'INSERT', NEW.id, NEW.action_ref, '{}', NULL, + jsonb_build_object( + 'status', NEW.status, + 'action_ref', NEW.action_ref, + 'executor', NEW.executor, + 'parent', NEW.parent, + 'enforcement', NEW.enforcement + )); + RETURN NEW; + END IF; + + IF TG_OP = 'DELETE' THEN + INSERT INTO execution_history (time, operation, entity_id, entity_ref, changed_fields, old_values, new_values) + VALUES (NOW(), 'DELETE', OLD.id, OLD.action_ref, '{}', NULL, NULL); + RETURN OLD; + END IF; + + -- UPDATE: detect which fields changed + IF OLD.status IS DISTINCT FROM NEW.status THEN + changed := changed || 'status'; + old_vals := old_vals || jsonb_build_object('status', OLD.status); + new_vals := new_vals || jsonb_build_object('status', NEW.status); + END IF; + + IF OLD.result IS DISTINCT FROM NEW.result THEN + changed := changed || 'result'; + old_vals := old_vals || jsonb_build_object('result', OLD.result); + new_vals := new_vals || jsonb_build_object('result', NEW.result); + END IF; + + IF OLD.executor IS DISTINCT FROM NEW.executor THEN + changed := changed || 'executor'; + old_vals := old_vals || jsonb_build_object('executor', OLD.executor); + new_vals := new_vals || jsonb_build_object('executor', NEW.executor); + END IF; + + IF OLD.workflow_task IS DISTINCT FROM NEW.workflow_task THEN + changed := changed || 'workflow_task'; + old_vals := old_vals || jsonb_build_object('workflow_task', OLD.workflow_task); + new_vals := new_vals || jsonb_build_object('workflow_task', NEW.workflow_task); + END IF; + + IF OLD.env_vars IS DISTINCT FROM NEW.env_vars THEN + changed := changed || 
'env_vars'; + old_vals := old_vals || jsonb_build_object('env_vars', OLD.env_vars); + new_vals := new_vals || jsonb_build_object('env_vars', NEW.env_vars); + END IF; + + -- Only record if something actually changed + IF array_length(changed, 1) > 0 THEN + INSERT INTO execution_history (time, operation, entity_id, entity_ref, changed_fields, old_values, new_values) + VALUES (NOW(), 'UPDATE', NEW.id, NEW.action_ref, changed, old_vals, new_vals); + END IF; + + RETURN NEW; +END; +$$ LANGUAGE plpgsql; + +COMMENT ON FUNCTION record_execution_history() IS 'Records field-level changes to execution table in execution_history hypertable'; + +-- ---------------------------------------------------------------------------- +-- worker history trigger +-- Tracked fields: name, status, capabilities, meta, host, port +-- Excludes: last_heartbeat when it is the only field that changed +-- ---------------------------------------------------------------------------- + +CREATE OR REPLACE FUNCTION record_worker_history() +RETURNS TRIGGER AS $$ +DECLARE + changed TEXT[] := '{}'; + old_vals JSONB := '{}'; + new_vals JSONB := '{}'; +BEGIN + IF TG_OP = 'INSERT' THEN + INSERT INTO worker_history (time, operation, entity_id, entity_ref, changed_fields, old_values, new_values) + VALUES (NOW(), 'INSERT', NEW.id, NEW.name, '{}', NULL, + jsonb_build_object( + 'name', NEW.name, + 'worker_type', NEW.worker_type, + 'worker_role', NEW.worker_role, + 'status', NEW.status, + 'host', NEW.host, + 'port', NEW.port + )); + RETURN NEW; + END IF; + + IF TG_OP = 'DELETE' THEN + INSERT INTO worker_history (time, operation, entity_id, entity_ref, changed_fields, old_values, new_values) + VALUES (NOW(), 'DELETE', OLD.id, OLD.name, '{}', NULL, NULL); + RETURN OLD; + END IF; + + -- UPDATE: detect which fields changed + IF OLD.name IS DISTINCT FROM NEW.name THEN + changed := changed || 'name'; + old_vals := old_vals || jsonb_build_object('name', OLD.name); + new_vals := new_vals || jsonb_build_object('name', 
NEW.name); + END IF; + + IF OLD.status IS DISTINCT FROM NEW.status THEN + changed := changed || 'status'; + old_vals := old_vals || jsonb_build_object('status', OLD.status); + new_vals := new_vals || jsonb_build_object('status', NEW.status); + END IF; + + IF OLD.capabilities IS DISTINCT FROM NEW.capabilities THEN + changed := changed || 'capabilities'; + old_vals := old_vals || jsonb_build_object('capabilities', OLD.capabilities); + new_vals := new_vals || jsonb_build_object('capabilities', NEW.capabilities); + END IF; + + IF OLD.meta IS DISTINCT FROM NEW.meta THEN + changed := changed || 'meta'; + old_vals := old_vals || jsonb_build_object('meta', OLD.meta); + new_vals := new_vals || jsonb_build_object('meta', NEW.meta); + END IF; + + IF OLD.host IS DISTINCT FROM NEW.host THEN + changed := changed || 'host'; + old_vals := old_vals || jsonb_build_object('host', OLD.host); + new_vals := new_vals || jsonb_build_object('host', NEW.host); + END IF; + + IF OLD.port IS DISTINCT FROM NEW.port THEN + changed := changed || 'port'; + old_vals := old_vals || jsonb_build_object('port', OLD.port); + new_vals := new_vals || jsonb_build_object('port', NEW.port); + END IF; + + -- Only record if something besides last_heartbeat changed. + -- Pure heartbeat-only updates are excluded to avoid high-volume noise. + IF array_length(changed, 1) > 0 THEN + INSERT INTO worker_history (time, operation, entity_id, entity_ref, changed_fields, old_values, new_values) + VALUES (NOW(), 'UPDATE', NEW.id, NEW.name, changed, old_vals, new_vals); + END IF; + + RETURN NEW; +END; +$$ LANGUAGE plpgsql; + +COMMENT ON FUNCTION record_worker_history() IS 'Records field-level changes to worker table in worker_history hypertable. 
Excludes heartbeat-only updates.';

-- ----------------------------------------------------------------------------
-- enforcement history trigger
-- Tracked fields: status, payload
-- ----------------------------------------------------------------------------

CREATE OR REPLACE FUNCTION record_enforcement_history()
RETURNS TRIGGER AS $$
DECLARE
    changed  TEXT[] := '{}';
    old_vals JSONB  := '{}';
    new_vals JSONB  := '{}';
BEGIN
    IF TG_OP = 'INSERT' THEN
        -- New row: no prior state, so old_values is NULL and new_values holds
        -- a snapshot of the row's identifying fields. payload is not included
        -- in the snapshot (it is only diffed on UPDATE) — presumably to keep
        -- history rows small; confirm against the design doc.
        INSERT INTO enforcement_history (time, operation, entity_id, entity_ref, changed_fields, old_values, new_values)
        VALUES (NOW(), 'INSERT', NEW.id, NEW.rule_ref, '{}', NULL,
                jsonb_build_object(
                    'rule_ref',    NEW.rule_ref,
                    'trigger_ref', NEW.trigger_ref,
                    'status',      NEW.status,
                    'condition',   NEW.condition,
                    'event',       NEW.event
                ));
        RETURN NEW;
    END IF;

    IF TG_OP = 'DELETE' THEN
        -- FIX: previously both old_values and new_values were NULL here, so the
        -- audit trail could not show what was deleted. Snapshot the row's final
        -- state into old_values (mirroring the INSERT snapshot field set);
        -- new_values stays NULL because the row no longer exists.
        INSERT INTO enforcement_history (time, operation, entity_id, entity_ref, changed_fields, old_values, new_values)
        VALUES (NOW(), 'DELETE', OLD.id, OLD.rule_ref, '{}',
                jsonb_build_object(
                    'rule_ref',    OLD.rule_ref,
                    'trigger_ref', OLD.trigger_ref,
                    'status',      OLD.status,
                    'condition',   OLD.condition,
                    'event',       OLD.event
                ),
                NULL);
        RETURN OLD;
    END IF;

    -- UPDATE: record only the fields that actually changed.
    -- IS DISTINCT FROM treats NULL <-> value transitions as changes.
    IF OLD.status IS DISTINCT FROM NEW.status THEN
        changed  := changed  || 'status';
        old_vals := old_vals || jsonb_build_object('status', OLD.status);
        new_vals := new_vals || jsonb_build_object('status', NEW.status);
    END IF;

    IF OLD.payload IS DISTINCT FROM NEW.payload THEN
        changed  := changed  || 'payload';
        old_vals := old_vals || jsonb_build_object('payload', OLD.payload);
        new_vals := new_vals || jsonb_build_object('payload', NEW.payload);
    END IF;

    -- Skip no-op updates (an UPDATE that rewrote identical values) so the
    -- hypertable only grows when a tracked field actually changed.
    IF array_length(changed, 1) > 0 THEN
        INSERT INTO enforcement_history (time, operation, entity_id, entity_ref, changed_fields, old_values, new_values)
        VALUES (NOW(), 'UPDATE', NEW.id, NEW.rule_ref, changed, old_vals, new_vals);
    END IF;

    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

COMMENT ON FUNCTION record_enforcement_history() IS 'Records field-level changes to enforcement table in enforcement_history hypertable';

-- ----------------------------------------------------------------------------
-- event history trigger
-- Tracked fields: config, payload
-- ----------------------------------------------------------------------------

CREATE OR REPLACE FUNCTION record_event_history()
RETURNS TRIGGER AS $$
DECLARE
    changed  TEXT[] := '{}';
    old_vals JSONB  := '{}';
    new_vals JSONB  := '{}';
BEGIN
    IF TG_OP = 'INSERT' THEN
        -- Snapshot of identity/reference fields only; config and payload are
        -- diffed on UPDATE rather than snapshotted here.
        INSERT INTO event_history (time, operation, entity_id, entity_ref, changed_fields, old_values, new_values)
        VALUES (NOW(), 'INSERT', NEW.id, NEW.trigger_ref, '{}', NULL,
                jsonb_build_object(
                    'trigger_ref', NEW.trigger_ref,
                    'source',      NEW.source,
                    'source_ref',  NEW.source_ref,
                    'rule',        NEW.rule,
                    'rule_ref',    NEW.rule_ref
                ));
        RETURN NEW;
    END IF;

    IF TG_OP = 'DELETE' THEN
        -- FIX: previously old_values was NULL on DELETE, losing the deleted
        -- row's final state. Snapshot the same field set as the INSERT branch
        -- into old_values; new_values stays NULL.
        INSERT INTO event_history (time, operation, entity_id, entity_ref, changed_fields, old_values, new_values)
        VALUES (NOW(), 'DELETE', OLD.id, OLD.trigger_ref, '{}',
                jsonb_build_object(
                    'trigger_ref', OLD.trigger_ref,
                    'source',      OLD.source,
                    'source_ref',  OLD.source_ref,
                    'rule',        OLD.rule,
                    'rule_ref',    OLD.rule_ref
                ),
                NULL);
        RETURN OLD;
    END IF;

    -- UPDATE: record only the fields that actually changed.
    IF OLD.config IS DISTINCT FROM NEW.config THEN
        changed  := changed  || 'config';
        old_vals := old_vals || jsonb_build_object('config', OLD.config);
        new_vals := new_vals || jsonb_build_object('config', NEW.config);
    END IF;

    IF OLD.payload IS DISTINCT FROM NEW.payload THEN
        changed  := changed  || 'payload';
        old_vals := old_vals || jsonb_build_object('payload', OLD.payload);
        new_vals := new_vals || jsonb_build_object('payload', NEW.payload);
    END IF;

    -- Skip no-op updates so history only records real changes.
    IF array_length(changed, 1) > 0 THEN
        INSERT INTO event_history (time, operation, entity_id, entity_ref, changed_fields, old_values, new_values)
        VALUES (NOW(), 'UPDATE', NEW.id, NEW.trigger_ref, changed, old_vals, new_vals);
    END IF;

    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

COMMENT ON FUNCTION record_event_history() IS 'Records field-level changes to event table in event_history hypertable';

-- ============================================================================
-- ATTACH TRIGGERS TO OPERATIONAL TABLES
-- ============================================================================
-- AFTER row-level triggers: one history record per affected row, written only
-- when the triggering statement itself succeeds.

CREATE TRIGGER execution_history_trigger
    AFTER INSERT OR UPDATE OR DELETE ON execution
    FOR EACH ROW
    EXECUTE FUNCTION record_execution_history();

CREATE TRIGGER worker_history_trigger
    AFTER INSERT OR UPDATE OR DELETE ON worker
    FOR EACH ROW
    EXECUTE FUNCTION record_worker_history();

CREATE TRIGGER enforcement_history_trigger
    AFTER INSERT OR UPDATE OR DELETE ON enforcement
    FOR EACH ROW
    EXECUTE FUNCTION record_enforcement_history();

CREATE TRIGGER event_history_trigger
    AFTER INSERT OR UPDATE OR DELETE ON event
    FOR EACH ROW
    EXECUTE FUNCTION record_event_history();

-- ============================================================================
-- COMPRESSION POLICIES
-- ============================================================================
-- Segment compressed chunks by entity_id (the dominant query filter) and order
-- by time DESC; chunks older than 7 days are compressed in the background.

ALTER TABLE execution_history SET (
    timescaledb.compress,
    timescaledb.compress_segmentby = 'entity_id',
    timescaledb.compress_orderby = 'time DESC'
);
SELECT add_compression_policy('execution_history', INTERVAL '7 days');

ALTER TABLE worker_history SET (
    timescaledb.compress,
    timescaledb.compress_segmentby = 'entity_id',
    timescaledb.compress_orderby = 'time DESC'
);
SELECT add_compression_policy('worker_history', INTERVAL '7 days');

ALTER TABLE enforcement_history SET (
    timescaledb.compress,
    timescaledb.compress_segmentby = 'entity_id',
    timescaledb.compress_orderby = 'time DESC'
);
SELECT add_compression_policy('enforcement_history', INTERVAL '7 days');

ALTER TABLE event_history SET (
    timescaledb.compress,
    timescaledb.compress_segmentby = 'entity_id',
    timescaledb.compress_orderby = 'time DESC'
);
SELECT add_compression_policy('event_history', INTERVAL '7 days');

-- ============================================================================
-- RETENTION POLICIES
-- ============================================================================
-- Chunks wholly older than the interval are dropped by the background job.
-- Intervals differ per table (event history is highest-volume, worker history
-- is kept longest) — see docs/plans/timescaledb-entity-history.md.

SELECT add_retention_policy('execution_history', INTERVAL '90 days');
SELECT add_retention_policy('enforcement_history', INTERVAL '90 days');
SELECT add_retention_policy('event_history', INTERVAL '30 days');
SELECT add_retention_policy('worker_history', INTERVAL '180 days');