diff --git a/AGENTS.md b/AGENTS.md
index eb69f35..ee6cab6 100644
--- a/AGENTS.md
+++ b/AGENTS.md
@@ -218,10 +218,10 @@ Completion listener advances workflow → Schedules successor tasks → Complete
- **FK ON DELETE Policy**: Historical records (executions) use `ON DELETE SET NULL` so they survive entity deletion while preserving text ref fields (`action_ref`, `trigger_ref`, etc.) for auditing. The `event`, `enforcement`, and `execution` tables are TimescaleDB hypertables, so they **cannot be the target of FK constraints** — `enforcement.event`, `execution.enforcement`, `inquiry.execution`, `workflow_execution.execution`, `execution.parent`, and `execution.original_execution` are plain BIGINT columns (no FK) and may become dangling references if the referenced row is deleted. Pack-owned entities (actions, triggers, sensors, rules, runtimes) use `ON DELETE CASCADE` from pack. Workflow executions cascade-delete with their workflow definition.
- **Event Table (TimescaleDB Hypertable)**: The `event` table is a TimescaleDB hypertable partitioned on `created` (1-day chunks). Events are **immutable after insert** — there is no `updated` column, no update trigger, and no `Update` repository impl. The `Event` model has no `updated` field. Compression is segmented by `trigger_ref` (after 7 days) and retention is 90 days. The `event_volume_hourly` continuous aggregate queries the `event` table directly.
- **Enforcement Table (TimescaleDB Hypertable)**: The `enforcement` table is a TimescaleDB hypertable partitioned on `created` (1-day chunks). Enforcements are updated **exactly once** — the executor sets `status` from `created` to `processed` or `disabled` within ~1 second of creation, well before the 7-day compression window. The `resolved_at` column (nullable `TIMESTAMPTZ`) records when this transition occurred; it is `NULL` while status is `created`. There is no `updated` column. Compression is segmented by `rule_ref` (after 7 days) and retention is 90 days. The `enforcement_volume_hourly` continuous aggregate queries the `enforcement` table directly.
-- **Execution Table (TimescaleDB Hypertable)**: The `execution` table is a TimescaleDB hypertable partitioned on `created` (1-day chunks). Executions are updated **~4 times** during their lifecycle (requested → scheduled → running → completed/failed), completing within at most ~1 day — well before the 7-day compression window. The `updated` column and its BEFORE UPDATE trigger are preserved (used by timeout monitor and UI). Compression is segmented by `action_ref` (after 7 days) and retention is 90 days. The `execution_volume_hourly` continuous aggregate queries the execution hypertable directly. The `execution_history` hypertable (field-level diffs) and its continuous aggregates (`execution_status_hourly`, `execution_throughput_hourly`) are preserved alongside — they serve complementary purposes (change tracking vs. volume monitoring).
-- **Entity History Tracking (TimescaleDB)**: Append-only `*_history` hypertables track field-level changes to `execution` and `worker` tables. Populated by PostgreSQL `AFTER INSERT OR UPDATE OR DELETE` triggers — no Rust code changes needed for recording. Uses JSONB diff format (`old_values`/`new_values`) with a `changed_fields TEXT[]` column for efficient filtering. Worker heartbeat-only updates are excluded. There are **no `event_history` or `enforcement_history` tables** — events are immutable and enforcements have a single deterministic status transition, so both tables are hypertables themselves. See `docs/plans/timescaledb-entity-history.md` for full design.
+- **Execution Table (TimescaleDB Hypertable)**: The `execution` table is a TimescaleDB hypertable partitioned on `created` (1-day chunks). Executions are updated **~4 times** during their lifecycle (requested → scheduled → running → completed/failed), completing within at most ~1 day — well before the 7-day compression window. The `updated` column and its BEFORE UPDATE trigger are preserved (used by timeout monitor and UI). The `started_at` column (nullable `TIMESTAMPTZ`) records when the worker picked up the execution (status → `running`); it is `NULL` until then. **Duration** in the UI is computed as `updated - started_at` (not `updated - created`) so that queue/scheduling wait time is excluded. Compression is segmented by `action_ref` (after 7 days) and retention is 90 days. The `execution_volume_hourly` continuous aggregate queries the execution hypertable directly. The `execution_history` hypertable (field-level diffs) and its continuous aggregates (`execution_status_hourly`, `execution_throughput_hourly`) are preserved alongside — they serve complementary purposes (change tracking vs. volume monitoring).
+- **Entity History Tracking (TimescaleDB)**: Append-only `*_history` hypertables track field-level changes to `execution` and `worker` tables. Populated by PostgreSQL `AFTER INSERT OR UPDATE OR DELETE` triggers — no Rust code changes needed for recording. Uses JSONB diff format (`old_values`/`new_values`) with a `changed_fields TEXT[]` column for efficient filtering. Worker heartbeat-only updates are excluded. There are **no `event_history` or `enforcement_history` tables** — events are immutable and enforcements have a single deterministic status transition, so both tables are hypertables themselves. See `docs/plans/timescaledb-entity-history.md` for full design. The execution history trigger tracks: `status`, `result`, `executor`, `workflow_task`, `env_vars`, `started_at`.
- **History Large-Field Guardrails**: The `execution` history trigger stores a compact **digest summary** instead of the full value for the `result` column (which can be arbitrarily large). The digest is produced by the `_jsonb_digest_summary(JSONB)` helper function and has the shape `{"digest": "md5:<hash>", "size": <bytes>, "type": "<json type>"}`. This preserves change-detection semantics while avoiding history table bloat. The full result is always available on the live `execution` row. When adding new large JSONB columns to history triggers, use `_jsonb_digest_summary()` instead of storing the raw value.
-- **Nullable FK Fields**: `rule.action` and `rule.trigger` are nullable (`Option` in Rust) — a rule with NULL action/trigger is non-functional but preserved for traceability. `execution.action`, `execution.parent`, `execution.enforcement`, and `event.source` are also nullable. `enforcement.event` is nullable but has no FK constraint (event is a hypertable). `execution.enforcement` is nullable but has no FK constraint (enforcement is a hypertable). All FK columns on the execution table (`action`, `parent`, `original_execution`, `enforcement`, `executor`, `workflow_def`) have no FK constraints (execution is a hypertable). `inquiry.execution` and `workflow_execution.execution` also have no FK constraints. `enforcement.resolved_at` is nullable — `None` while status is `created`, set when resolved.
+- **Nullable FK Fields**: `rule.action` and `rule.trigger` are nullable (`Option` in Rust) — a rule with NULL action/trigger is non-functional but preserved for traceability. `execution.action`, `execution.parent`, `execution.enforcement`, `execution.started_at`, and `event.source` are also nullable. `enforcement.event` is nullable but has no FK constraint (event is a hypertable). `execution.enforcement` is nullable but has no FK constraint (enforcement is a hypertable). All FK columns on the execution table (`action`, `parent`, `original_execution`, `enforcement`, `executor`, `workflow_def`) have no FK constraints (execution is a hypertable). `inquiry.execution` and `workflow_execution.execution` also have no FK constraints. `enforcement.resolved_at` is nullable — `None` while status is `created`, set when resolved. `execution.started_at` is nullable — `None` until the worker sets status to `running`.
**Table Count**: 20 tables total in the schema (including `runtime_version`, the 2 `*_history` hypertables, and the `event`, `enforcement`, and `execution` hypertables)
**Migration Count**: 9 migrations (`000001` through `000009`) — see `migrations/` directory
- **Pack Component Loading Order**: Runtimes → Triggers → Actions → Sensors (dependency order). Both `PackComponentLoader` (Rust) and `load_core_pack.py` (Python) follow this order.
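The digest-summary guardrail above can be sketched outside SQL. The Python stand-in below is illustrative only: the function name mirrors the SQL helper, but the compact-JSON canonicalization and the exact field contents are assumptions, not taken from the migrations.

```python
import hashlib
import json

def jsonb_digest_summary(value):
    """Approximate _jsonb_digest_summary(JSONB) in Python.

    ASSUMPTION: hashes the compact, key-sorted JSON text form; the SQL
    helper's real canonicalization may differ.
    """
    text = json.dumps(value, separators=(",", ":"), sort_keys=True)
    json_type = (
        "object" if isinstance(value, dict)
        else "array" if isinstance(value, list)
        else "string" if isinstance(value, str)
        else "boolean" if isinstance(value, bool)
        else "null" if value is None
        else "number"
    )
    return {
        "digest": "md5:" + hashlib.md5(text.encode("utf-8")).hexdigest(),
        "size": len(text),
        "type": json_type,
    }
```

The summary is stable for equal inputs, so change detection still works by comparing digests rather than full values.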
@@ -229,12 +229,12 @@ Completion listener advances workflow → Schedules successor tasks → Complete
### Workflow Execution Orchestration
- **Detection**: The `ExecutionScheduler` checks `action.workflow_def.is_some()` before dispatching to a worker. Workflow actions are orchestrated by the executor, not sent to workers.
- **Orchestration Flow**: Scheduler loads the `WorkflowDefinition`, builds a `TaskGraph`, creates a `workflow_execution` record, marks the parent execution as Running, builds an initial `WorkflowContext` from execution parameters and workflow vars, then dispatches entry-point tasks as child executions via MQ with rendered inputs.
-- **Template Resolution**: Task inputs are rendered through `WorkflowContext.render_json()` before dispatching. Supports `{{ parameters.x }}`, `{{ item }}`, `{{ index }}`, `{{ number_list }}` (direct variable), `{{ task.task_name.field }}`, and function expressions. **Type-preserving**: pure template expressions like `"{{ item }}"` preserve the JSON type (integer `5` stays as `5`, not string `"5"`). Mixed expressions like `"Sleeping for {{ item }} seconds"` remain strings.
+- **Template Resolution**: Task inputs are rendered through `WorkflowContext.render_json()` before dispatching. Uses the expression engine for full operator/function support inside `{{ }}`. Canonical namespaces: `parameters`, `workflow` (mutable vars), `task` (results), `config` (pack config), `keystore` (secrets), `item`, `index`, `system`. Backward-compat aliases: `vars`/`variables` → `workflow`, `tasks` → `task`, bare names → `workflow` fallback. **Type-preserving**: pure template expressions like `"{{ item }}"` preserve the JSON type (integer `5` stays as `5`, not string `"5"`). Mixed expressions like `"Sleeping for {{ item }} seconds"` remain strings.
- **Function Expressions**: `{{ result() }}` returns the last completed task's result. `{{ result().field.subfield }}` navigates into it. `{{ succeeded() }}`, `{{ failed() }}`, `{{ timed_out() }}` return booleans. These are evaluated by `WorkflowContext.try_evaluate_function_call()`.
-- **Publish Directives**: Transition `publish` directives (e.g., `number_list: "{{ result().data.items }}"`) are evaluated when a transition fires. Published variables are persisted to the `workflow_execution.variables` column and available to subsequent tasks. Uses type-preserving rendering so arrays/numbers/booleans retain their types.
+- **Publish Directives**: Transition `publish` directives (e.g., `number_list: "{{ result().data.items }}"`) are evaluated when a transition fires. Published variables are persisted to the `workflow_execution.variables` column and available to subsequent tasks via the `workflow` namespace (e.g., `{{ workflow.number_list }}`). Uses type-preserving rendering so arrays/numbers/booleans retain their types.
- **Child Task Dispatch**: Each workflow task becomes a child execution with the task's actual action ref (e.g., `core.echo`), `workflow_task` metadata linking it to the `workflow_execution` record, and a parent reference to the workflow execution. Child executions re-enter the normal scheduling pipeline, so nested workflows work recursively.
- **with_items Expansion**: Tasks declaring `with_items: "{{ expr }}"` are expanded into child executions. The expression is resolved via the `WorkflowContext` to produce a JSON array, then each item gets its own child execution with `item`/`index` set on the context and `task_index` in `WorkflowTaskMetadata`. Completion tracking waits for ALL sibling items to finish before marking the task as completed/failed and advancing the workflow.
-- **with_items Concurrency Limiting**: When a task declares `concurrency: N`, ALL child execution records are created in the database up front (with fully-rendered inputs), but only the first `N` are published to the message queue. The remaining children stay at `Requested` status in the DB. As each item completes, `advance_workflow` counts in-flight siblings (`scheduling`/`scheduled`/`running`), calculates free slots (`concurrency - in_flight`), and calls `publish_pending_with_items_children()` which queries for `Requested`-status siblings ordered by `task_index` and publishes them. The DB `status = 'requested'` query is the authoritative source of undispatched items — no auxiliary state in workflow variables needed. The task is only marked complete when all siblings reach a terminal state. Without a `concurrency` value, all items are dispatched at once (previous behavior).
+- **with_items Concurrency Limiting**: ALL child execution records are created in the database up front (with fully-rendered inputs), but only the first `N` are published to the message queue where `N` is the task's `concurrency` value (**default: 1**, i.e. serial execution). The remaining children stay at `Requested` status in the DB. As each item completes, `advance_workflow` counts in-flight siblings (`scheduling`/`scheduled`/`running`), calculates free slots (`concurrency - in_flight`), and calls `publish_pending_with_items_children()` which queries for `Requested`-status siblings ordered by `task_index` and publishes them. The DB `status = 'requested'` query is the authoritative source of undispatched items — no auxiliary state in workflow variables needed. The task is only marked complete when all siblings reach a terminal state. To run all items in parallel, explicitly set `concurrency` to the list length or a suitably large number.
- **Advancement**: The `CompletionListener` detects when a completed execution has `workflow_task` metadata and calls `ExecutionScheduler::advance_workflow()`. The scheduler rebuilds the `WorkflowContext` from persisted `workflow_execution.variables` plus all completed child execution results, sets `last_task_outcome`, evaluates transitions (succeeded/failed/always/timed_out/custom with context-based condition evaluation), processes publish directives, schedules successor tasks with rendered inputs, and completes the workflow when all tasks are done.
- **Transition Evaluation**: `succeeded()`, `failed()`, `timed_out()`, and `always` (no condition) are supported. Custom conditions are evaluated via `WorkflowContext.evaluate_condition()` with fallback to fire-on-success if evaluation fails.
- **Legacy Coordinator**: The prototype `WorkflowCoordinator` in `crates/executor/src/workflow/coordinator.rs` is bypassed — it has hardcoded schema prefixes and is not integrated with the MQ pipeline.
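The sliding-window dispatch described in the `with_items` concurrency bullet can be sketched as follows. All names here are hypothetical stand-ins for the Rust scheduler logic; the status sets mirror the ones listed above, and the sibling list stands in for the authoritative `status = 'requested'` DB query.

```python
IN_FLIGHT = {"scheduling", "scheduled", "running"}

def free_slots(concurrency, sibling_statuses):
    """How many pending with_items children may be published now.

    concurrency defaults to 1 (serial) when the task omits it.
    """
    in_flight = sum(1 for s in sibling_statuses if s in IN_FLIGHT)
    return max(0, (concurrency or 1) - in_flight)

def next_to_publish(siblings, concurrency):
    """Pick 'requested' siblings in task_index order, up to free slots.

    siblings: list of (task_index, status) tuples -- a stand-in for the
    repository query over Requested-status children.
    """
    slots = free_slots(concurrency, [s for _, s in siblings])
    pending = sorted(i for i, s in siblings if s == "requested")
    return pending[:slots]
```

Because the DB status is the only bookkeeping, re-running this computation on every completion is idempotent: already-published children are no longer `requested` and drop out of the candidate set.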
@@ -316,7 +316,7 @@ Completion listener advances workflow → Schedules successor tasks → Complete
- **Available at**: `http://localhost:8080` (dev), `/api-spec/openapi.json` for spec
### Common Library (`crates/common`)
-- **Modules**: `models`, `repositories`, `db`, `config`, `error`, `mq`, `crypto`, `utils`, `workflow`, `pack_registry`, `template_resolver`, `version_matching`, `runtime_detection`
+- **Modules**: `models`, `repositories`, `db`, `config`, `error`, `mq`, `crypto`, `utils`, `workflow` (includes `expression` sub-module), `pack_registry`, `template_resolver`, `version_matching`, `runtime_detection`
- **Exports**: Commonly used types re-exported from `lib.rs`
- **Repository Layer**: All DB access goes through repositories in `repositories/`
- **Message Queue**: Abstractions in `mq/` for RabbitMQ communication
@@ -338,6 +338,84 @@ Rule `action_params` support Jinja2-style `{{ source.path }}` templates resolved
- **Integration**: `crates/executor/src/event_processor.rs` calls `resolve_templates()` in `create_enforcement()`
- **IMPORTANT**: The old `trigger.payload.*` syntax was renamed to `event.payload.*` — the payload data comes from the Event, not the Trigger
+### Workflow Expression Engine
+Workflow templates (`{{ expr }}`) support a full expression language for evaluating conditions, computing values, and transforming data. The engine is in `crates/common/src/workflow/expression/` (tokenizer → parser → evaluator) and is integrated into `WorkflowContext` via the `EvalContext` trait.
+
+**Canonical Namespaces** — all data inside `{{ }}` expressions is organised into well-defined, non-overlapping namespaces:
+
+| Namespace | Example | Description |
+|-----------|---------|-------------|
+| `parameters` | `{{ parameters.url }}` | Immutable workflow input parameters |
+| `workflow` | `{{ workflow.counter }}` | Mutable workflow-scoped variables (set via `publish`) |
+| `task` | `{{ task.fetch.result.data }}` | Completed task results keyed by task name |
+| `config` | `{{ config.api_token }}` | Pack configuration values (read-only) |
+| `keystore` | `{{ keystore.secret_key }}` | Encrypted secrets from the key store (read-only) |
+| `item` | `{{ item }}` / `{{ item.name }}` | Current element in a `with_items` loop |
+| `index` | `{{ index }}` | Zero-based iteration index in a `with_items` loop |
+| `system` | `{{ system.workflow_start }}` | System-provided variables |
+
+Backward-compatible aliases (kept for existing workflow definitions):
+- `vars` / `variables` → same as `workflow`
+- `tasks` → same as `task`
+- Bare variable names (e.g. `{{ my_var }}`) resolve against the `workflow` variable store as a last-resort fallback.
+
+**IMPORTANT**: New workflow definitions should always use the canonical namespace names. The `config` and `keystore` namespaces are populated by the scheduler from the pack's `config` JSONB column and decrypted `key` table entries respectively. If not populated, they resolve to `null`.
+
+**Operators** (lowest to highest precedence):
+1. `or` — logical OR (short-circuit)
+2. `and` — logical AND (short-circuit)
+3. `not` — logical NOT (unary)
+4. `==`, `!=`, `<`, `>`, `<=`, `>=`, `in` — comparison & membership
+5. `+`, `-` — addition/subtraction (also string/array concatenation for `+`)
+6. `*`, `/`, `%` — multiplication, division, modulo
+7. Unary `-` — negation
+8. `.field`, `[index]`, `(args)` — postfix access & function calls
+
+**Type Rules**:
+- **No implicit type coercion**: `"3" == 3` → `false`, `"hello" + 5` → error
+- **Int/float cross-comparison allowed**: `3 == 3.0` → `true`
+- **Integer preservation**: `2 + 3` → `5` (int), `2 + 1.5` → `3.5` (float), `10 / 4` → `2.5` (float), `10 / 5` → `2` (int)
+- **Python-like truthiness**: `null`, `false`, `0`, `""`, `[]`, `{}` are falsy
+- **Deep equality**: `==`/`!=` recursively compare objects and arrays
+- **Negative indexing**: `arr[-1]` returns last element
+
+**Built-in Functions**:
+- Type conversion: `string(v)`, `number(v)`, `int(v)`, `bool(v)`
+- Introspection: `type_of(v)`, `length(v)`, `keys(obj)`, `values(obj)`
+- Math: `abs(n)`, `floor(n)`, `ceil(n)`, `round(n)`, `min(a,b)`, `max(a,b)`, `sum(arr)`
+- String: `lower(s)`, `upper(s)`, `trim(s)`, `split(s, sep)`, `join(arr, sep)`, `replace(s, old, new)`, `starts_with(s, prefix)`, `ends_with(s, suffix)`, `match(pattern, s)` (regex)
+- Collection: `contains(haystack, needle)`, `reversed(v)`, `sort(arr)`, `unique(arr)`, `flat(arr)`, `zip(a, b)`, `range(n)` / `range(start, end)`, `slice(v, start, end)`, `index_of(haystack, needle)`, `count(haystack, needle)`, `merge(obj_a, obj_b)`, `chunks(arr, size)`
+- Workflow: `result()`, `succeeded()`, `failed()`, `timed_out()` (resolved via `EvalContext` trait)
+
+**Usage in Conditions** (`when:` on transitions):
+```
+when: "succeeded() and result().code == 200"
+when: "length(workflow.items) > 3 and \"admin\" in workflow.roles"
+when: "not failed()"
+when: "result().status == \"ok\" or result().status == \"accepted\""
+when: "config.retries > 0"
+```
+
+**Usage in Templates** (`{{ expr }}`):
+```
+input:
+ count: "{{ length(workflow.items) }}"
+ greeting: "{{ parameters.first + \" \" + parameters.last }}"
+ doubled: "{{ parameters.x * 2 }}"
+ names: "{{ join(sort(keys(workflow.data)), \", \") }}"
+ auth: "Bearer {{ keystore.api_key }}"
+ endpoint: "{{ config.base_url + \"/api/v1\" }}"
+ prev_output: "{{ task.fetch.result.data.id }}"
+```
+
+**Implementation Files**:
+- `crates/common/src/workflow/expression/mod.rs` — module entry point, `eval_expression()`, `parse_expression()`
+- `crates/common/src/workflow/expression/tokenizer.rs` — lexer
+- `crates/common/src/workflow/expression/parser.rs` — recursive-descent parser
+- `crates/common/src/workflow/expression/evaluator.rs` — AST evaluator, `EvalContext` trait, built-in functions
+- `crates/common/src/workflow/expression/ast.rs` — AST node types (`Expr`, `BinaryOp`, `UnaryOp`)
+- `crates/executor/src/workflow/context.rs` — `WorkflowContext` implements `EvalContext`
+
### Web UI (`web/`)
- **Generated Client**: OpenAPI client auto-generated from API spec
- Run: `npm run generate:api` (requires API running on :8080)
@@ -519,7 +597,7 @@ When reporting, ask: "Should I fix this first or continue with [original task]?"
- **Web UI**: Static files served separately or via API service
## Current Development Status
-- ✅ **Complete**: Database migrations (20 tables, 10 migration files), API service (most endpoints), common library, message queue infrastructure, repository layer, JWT auth, CLI tool, Web UI (basic + workflow builder), Executor service (core functionality + workflow orchestration), Worker service (shell/Python execution), Runtime version data model, constraint matching, worker version selection pipeline, version verification at startup, per-version environment isolation, TimescaleDB entity history tracking (execution, worker), Event, enforcement, and execution tables as TimescaleDB hypertables (time-series with retention/compression), History API endpoints (generic + entity-specific with pagination & filtering), History UI panels on entity detail pages (execution), TimescaleDB continuous aggregates (6 hourly rollup views with auto-refresh policies), Analytics API endpoints (7 endpoints under `/api/v1/analytics/` — dashboard, execution status/throughput/failure-rate, event volume, worker status, enforcement volume), Analytics dashboard widgets (bar charts, stacked status charts, failure rate ring gauge, time range selector), Workflow execution orchestration (scheduler detects workflow actions, creates child task executions, completion listener advances workflow via transitions), Workflow template resolution (type-preserving `{{ }}` rendering in task inputs), Workflow `with_items` expansion (parallel child executions per item), Workflow `with_items` concurrency limiting (sliding-window dispatch with pending items stored in workflow variables), Workflow `publish` directive processing (variable propagation between tasks), Workflow function expressions (`result()`, `succeeded()`, `failed()`, `timed_out()`)
+- ✅ **Complete**: Database migrations (20 tables, 10 migration files), API service (most endpoints), common library, message queue infrastructure, repository layer, JWT auth, CLI tool, Web UI (basic + workflow builder), Executor service (core functionality + workflow orchestration), Worker service (shell/Python execution), Runtime version data model, constraint matching, worker version selection pipeline, version verification at startup, per-version environment isolation, TimescaleDB entity history tracking (execution, worker), Event, enforcement, and execution tables as TimescaleDB hypertables (time-series with retention/compression), History API endpoints (generic + entity-specific with pagination & filtering), History UI panels on entity detail pages (execution), TimescaleDB continuous aggregates (6 hourly rollup views with auto-refresh policies), Analytics API endpoints (7 endpoints under `/api/v1/analytics/` — dashboard, execution status/throughput/failure-rate, event volume, worker status, enforcement volume), Analytics dashboard widgets (bar charts, stacked status charts, failure rate ring gauge, time range selector), Workflow execution orchestration (scheduler detects workflow actions, creates child task executions, completion listener advances workflow via transitions), Workflow template resolution (type-preserving `{{ }}` rendering in task inputs), Workflow `with_items` expansion (parallel child executions per item), Workflow `with_items` concurrency limiting (sliding-window dispatch with pending items tracked via the DB `requested` status), Workflow `publish` directive processing (variable propagation between tasks), Workflow function expressions (`result()`, `succeeded()`, `failed()`, `timed_out()`), Workflow expression engine (full arithmetic/comparison/boolean/membership operators, 30+ built-in functions, recursive-descent parser), Canonical workflow namespaces (`parameters`, `workflow`, `task`, `config`, `keystore`, `item`, `index`, `system`)
- 🔄 **In Progress**: Sensor service, advanced workflow features (nested workflow context propagation), Python runtime dependency management, API/UI endpoints for runtime version management
- 📋 **Planned**: Notifier service, execution policies, monitoring, pack registry system, configurable retention periods via admin settings, export/archival to external storage
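The type rules from the expression-engine section above (no implicit coercion, int/float cross-comparison, integer-preserving division, Python-like truthiness, deep equality) can be sketched as a minimal Python model. This illustrates the documented semantics only; it is not the Rust evaluator, and booleans are treated as distinct from numbers here by assumption.

```python
def truthy(v):
    """Python-like truthiness: null, false, 0, "", [], {} are falsy."""
    return bool(v)

def eq(a, b):
    """Deep equality with no implicit coercion: "3" == 3 is false,
    but int/float cross-comparison (3 == 3.0) is allowed."""
    if isinstance(a, bool) != isinstance(b, bool):
        return False  # assumption: bools never equal plain numbers
    if isinstance(a, (int, float)) and isinstance(b, (int, float)):
        return a == b
    if type(a) is not type(b):
        return False
    if isinstance(a, list):
        return len(a) == len(b) and all(eq(x, y) for x, y in zip(a, b))
    if isinstance(a, dict):
        return a.keys() == b.keys() and all(eq(a[k], b[k]) for k in a)
    return a == b

def div(a, b):
    """Integer-preserving division: 10 / 5 -> 2 (int), 10 / 4 -> 2.5."""
    if isinstance(a, int) and isinstance(b, int) and a % b == 0:
        return a // b
    return a / b
```

A mixed expression such as `2 + 1.5` would promote to float under these rules, while `2 + 3` stays an integer, matching the "Integer preservation" bullet.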
diff --git a/crates/api/src/dto/event.rs b/crates/api/src/dto/event.rs
index 085a2f7..ef405ce 100644
--- a/crates/api/src/dto/event.rs
+++ b/crates/api/src/dto/event.rs
@@ -319,6 +319,10 @@ pub struct EnforcementQueryParams {
#[param(example = "core.webhook")]
pub trigger_ref: Option<String>,
+ /// Filter by rule reference
+ #[param(example = "core.on_webhook")]
+ pub rule_ref: Option<String>,
+
/// Page number (1-indexed)
#[serde(default = "default_page")]
#[param(example = 1, minimum = 1)]
diff --git a/crates/api/src/dto/execution.rs b/crates/api/src/dto/execution.rs
index 2e0d030..a765858 100644
--- a/crates/api/src/dto/execution.rs
+++ b/crates/api/src/dto/execution.rs
@@ -7,6 +7,7 @@ use utoipa::{IntoParams, ToSchema};
use attune_common::models::enums::ExecutionStatus;
use attune_common::models::execution::WorkflowTaskMetadata;
+use attune_common::repositories::execution::ExecutionWithRefs;
/// Request DTO for creating a manual execution
#[derive(Debug, Clone, Deserialize, ToSchema)]
@@ -63,6 +64,12 @@ pub struct ExecutionResponse {
#[schema(value_type = Object, example = json!({"message_id": "1234567890.123456"}))]
pub result: Option<JsonValue>,
+ /// When the execution actually started running (worker picked it up).
+ /// Null if the execution hasn't started running yet.
+ #[serde(skip_serializing_if = "Option::is_none")]
+ #[schema(example = "2024-01-13T10:31:00Z", nullable = true)]
+ pub started_at: Option<DateTime<Utc>>,
+
/// Workflow task metadata (only populated for workflow task executions)
#[serde(skip_serializing_if = "Option::is_none")]
#[schema(value_type = Option, nullable = true)]
@@ -108,6 +115,12 @@ pub struct ExecutionSummary {
#[schema(example = "core.timer")]
pub trigger_ref: Option,
+ /// When the execution actually started running (worker picked it up).
+ /// Null if the execution hasn't started running yet.
+ #[serde(skip_serializing_if = "Option::is_none")]
+ #[schema(example = "2024-01-13T10:31:00Z", nullable = true)]
+ pub started_at: Option<DateTime<Utc>>,
+
/// Workflow task metadata (only populated for workflow task executions)
#[serde(skip_serializing_if = "Option::is_none")]
#[schema(value_type = Option, nullable = true)]
@@ -207,6 +220,7 @@ impl From<Execution> for ExecutionResponse {
result: execution
.result
.map(|r| serde_json::to_value(r).unwrap_or(JsonValue::Null)),
+ started_at: execution.started_at,
workflow_task: execution.workflow_task,
created: execution.created,
updated: execution.updated,
@@ -225,6 +239,7 @@ impl From<Execution> for ExecutionSummary {
enforcement: execution.enforcement,
rule_ref: None, // Populated separately via enforcement lookup
trigger_ref: None, // Populated separately via enforcement lookup
+ started_at: execution.started_at,
workflow_task: execution.workflow_task,
created: execution.created,
updated: execution.updated,
@@ -232,6 +247,26 @@ impl From for ExecutionSummary {
}
}
+/// Convert from the joined query result (execution + enforcement refs).
+/// `rule_ref` and `trigger_ref` are already populated from the SQL JOIN.
+impl From<ExecutionWithRefs> for ExecutionSummary {
+ fn from(row: ExecutionWithRefs) -> Self {
+ Self {
+ id: row.id,
+ action_ref: row.action_ref,
+ status: row.status,
+ parent: row.parent,
+ enforcement: row.enforcement,
+ rule_ref: row.rule_ref,
+ trigger_ref: row.trigger_ref,
+ started_at: row.started_at,
+ workflow_task: row.workflow_task,
+ created: row.created,
+ updated: row.updated,
+ }
+ }
+}
+
fn default_page() -> u32 {
1
}
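The new `started_at` field above feeds the duration rule documented in AGENTS.md: UI duration is `updated - started_at`, so queue and scheduling wait is excluded. A small Python sketch with epoch-second timestamps; the function names are hypothetical.

```python
def execution_duration_seconds(started_at, updated):
    """UI duration: updated - started_at. None until the worker has
    picked the execution up (started_at is still null)."""
    if started_at is None:
        return None
    return updated - started_at

def queue_wait_seconds(created, started_at):
    """Time spent waiting before the worker started running the task."""
    if started_at is None:
        return None
    return started_at - created
```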
diff --git a/crates/api/src/routes/actions.rs b/crates/api/src/routes/actions.rs
index f95d55b..d087557 100644
--- a/crates/api/src/routes/actions.rs
+++ b/crates/api/src/routes/actions.rs
@@ -11,10 +11,10 @@ use std::sync::Arc;
use validator::Validate;
use attune_common::repositories::{
- action::{ActionRepository, CreateActionInput, UpdateActionInput},
+ action::{ActionRepository, ActionSearchFilters, CreateActionInput, UpdateActionInput},
pack::PackRepository,
queue_stats::QueueStatsRepository,
- Create, Delete, FindByRef, List, Update,
+ Create, Delete, FindByRef, Update,
};
use crate::{
@@ -47,21 +47,20 @@ pub async fn list_actions(
RequireAuth(_user): RequireAuth,
Query(pagination): Query<PaginationParams>,
) -> ApiResult {
- // Get all actions (we'll implement pagination in repository later)
- let actions = ActionRepository::list(&state.db).await?;
+ // All filtering and pagination happen in a single SQL query.
+ let filters = ActionSearchFilters {
+ pack: None,
+ query: None,
+ limit: pagination.limit(),
+ offset: pagination.offset(),
+ };
- // Calculate pagination
- let total = actions.len() as u64;
- let start = ((pagination.page - 1) * pagination.limit()) as usize;
- let end = (start + pagination.limit() as usize).min(actions.len());
+ let result = ActionRepository::list_search(&state.db, &filters).await?;
- // Get paginated slice
- let paginated_actions: Vec<ActionSummary> = actions[start..end]
- .iter()
- .map(|a| ActionSummary::from(a.clone()))
- .collect();
+ let paginated_actions: Vec<ActionSummary> =
+ result.rows.into_iter().map(ActionSummary::from).collect();
- let response = PaginatedResponse::new(paginated_actions, &pagination, total);
+ let response = PaginatedResponse::new(paginated_actions, &pagination, result.total);
Ok((StatusCode::OK, Json(response)))
}
@@ -92,21 +91,20 @@ pub async fn list_actions_by_pack(
.await?
.ok_or_else(|| ApiError::NotFound(format!("Pack '{}' not found", pack_ref)))?;
- // Get actions for this pack
- let actions = ActionRepository::find_by_pack(&state.db, pack.id).await?;
+ // All filtering and pagination happen in a single SQL query.
+ let filters = ActionSearchFilters {
+ pack: Some(pack.id),
+ query: None,
+ limit: pagination.limit(),
+ offset: pagination.offset(),
+ };
- // Calculate pagination
- let total = actions.len() as u64;
- let start = ((pagination.page - 1) * pagination.limit()) as usize;
- let end = (start + pagination.limit() as usize).min(actions.len());
+ let result = ActionRepository::list_search(&state.db, &filters).await?;
- // Get paginated slice
- let paginated_actions: Vec<ActionSummary> = actions[start..end]
- .iter()
- .map(|a| ActionSummary::from(a.clone()))
- .collect();
+ let paginated_actions: Vec<ActionSummary> =
+ result.rows.into_iter().map(ActionSummary::from).collect();
- let response = PaginatedResponse::new(paginated_actions, &pagination, total);
+ let response = PaginatedResponse::new(paginated_actions, &pagination, result.total);
Ok((StatusCode::OK, Json(response)))
}
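The refactor above pushes filtering and pagination into a single SQL query via `limit`/`offset` on the search filters. A minimal sketch of the page-to-offset arithmetic, assuming 1-indexed pages; the clamping defaults are illustrative and not taken from the repo.

```python
def page_to_limit_offset(page, page_size, default_size=20, max_size=100):
    """Derive SQL LIMIT/OFFSET from 1-indexed page parameters.

    ASSUMPTION: page_size is clamped to [1, max_size] and missing
    values fall back to defaults; the real handlers may differ.
    """
    size = min(max(1, page_size or default_size), max_size)
    page = max(1, page or 1)
    return size, (page - 1) * size

def total_pages(total, limit):
    """Ceiling division for the paginated response envelope."""
    return (total + limit - 1) // limit if total else 0
```

Doing this in SQL means `result.total` comes from a `COUNT(*)` over the filtered set, so the response envelope stays correct even when only one page of rows is fetched.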
diff --git a/crates/api/src/routes/events.rs b/crates/api/src/routes/events.rs
index bf82b4b..a2bf704 100644
--- a/crates/api/src/routes/events.rs
+++ b/crates/api/src/routes/events.rs
@@ -16,9 +16,12 @@ use validator::Validate;
use attune_common::{
mq::{EventCreatedPayload, MessageEnvelope, MessageType},
repositories::{
- event::{CreateEventInput, EnforcementRepository, EventRepository},
+ event::{
+ CreateEventInput, EnforcementRepository, EnforcementSearchFilters, EventRepository,
+ EventSearchFilters,
+ },
trigger::TriggerRepository,
- Create, FindById, FindByRef, List,
+ Create, FindById, FindByRef,
},
};
@@ -220,53 +223,27 @@ pub async fn list_events(
State(state): State<Arc<AppState>>,
Query(query): Query,
) -> ApiResult {
- // Get events based on filters
- let events = if let Some(trigger_id) = query.trigger {
- // Filter by trigger ID
- EventRepository::find_by_trigger(&state.db, trigger_id).await?
- } else if let Some(trigger_ref) = &query.trigger_ref {
- // Filter by trigger reference
- EventRepository::find_by_trigger_ref(&state.db, trigger_ref).await?
- } else {
- // Get all events
- EventRepository::list(&state.db).await?
+ // All filtering and pagination happen in a single SQL query.
+ let filters = EventSearchFilters {
+ trigger: query.trigger,
+ trigger_ref: query.trigger_ref.clone(),
+ source: query.source,
+ rule_ref: query.rule_ref.clone(),
+ limit: query.limit(),
+ offset: query.offset(),
};
- // Apply additional filters in memory
- let mut filtered_events = events;
+ let result = EventRepository::search(&state.db, &filters).await?;
- if let Some(source_id) = query.source {
- filtered_events.retain(|e| e.source == Some(source_id));
- }
+ let paginated_events: Vec<EventSummary> =
+ result.rows.into_iter().map(EventSummary::from).collect();
- if let Some(rule_ref) = &query.rule_ref {
- let rule_ref_lower = rule_ref.to_lowercase();
- filtered_events.retain(|e| {
- e.rule_ref
- .as_ref()
- .map(|r| r.to_lowercase().contains(&rule_ref_lower))
- .unwrap_or(false)
- });
- }
-
- // Calculate pagination
- let total = filtered_events.len() as u64;
- let start = query.offset() as usize;
- let end = (start + query.limit() as usize).min(filtered_events.len());
-
- // Get paginated slice
- let paginated_events: Vec<EventSummary> = filtered_events[start..end]
- .iter()
- .map(|event| EventSummary::from(event.clone()))
- .collect();
-
- // Convert query params to pagination params for response
let pagination_params = PaginationParams {
page: query.page,
page_size: query.per_page,
};
- let response = PaginatedResponse::new(paginated_events, &pagination_params, total);
+ let response = PaginatedResponse::new(paginated_events, &pagination_params, result.total);
Ok((StatusCode::OK, Json(response)))
}
@@ -319,46 +296,32 @@ pub async fn list_enforcements(
State(state): State<Arc<AppState>>,
Query(query): Query,
) -> ApiResult {
- // Get enforcements based on filters
- let enforcements = if let Some(status) = query.status {
- // Filter by status
- EnforcementRepository::find_by_status(&state.db, status).await?
- } else if let Some(rule_id) = query.rule {
- // Filter by rule ID
- EnforcementRepository::find_by_rule(&state.db, rule_id).await?
- } else if let Some(event_id) = query.event {
- // Filter by event ID
- EnforcementRepository::find_by_event(&state.db, event_id).await?
- } else {
- // Get all enforcements
- EnforcementRepository::list(&state.db).await?
+ // All filtering and pagination happen in a single SQL query.
+ // Filters are combinable (AND), not mutually exclusive.
+ let filters = EnforcementSearchFilters {
+ status: query.status,
+ rule: query.rule,
+ event: query.event,
+ trigger_ref: query.trigger_ref.clone(),
+ rule_ref: query.rule_ref.clone(),
+ limit: query.limit(),
+ offset: query.offset(),
};
- // Apply additional filters in memory
- let mut filtered_enforcements = enforcements;
+ let result = EnforcementRepository::search(&state.db, &filters).await?;
- if let Some(trigger_ref) = &query.trigger_ref {
- filtered_enforcements.retain(|e| e.trigger_ref == *trigger_ref);
- }
-
- // Calculate pagination
- let total = filtered_enforcements.len() as u64;
- let start = query.offset() as usize;
- let end = (start + query.limit() as usize).min(filtered_enforcements.len());
-
- // Get paginated slice
- let paginated_enforcements: Vec<EnforcementSummary> = filtered_enforcements[start..end]
- .iter()
- .map(|enforcement| EnforcementSummary::from(enforcement.clone()))
+ let paginated_enforcements: Vec<EnforcementSummary> = result
+ .rows
+ .into_iter()
+ .map(EnforcementSummary::from)
.collect();
- // Convert query params to pagination params for response
let pagination_params = PaginationParams {
page: query.page,
page_size: query.per_page,
};
- let response = PaginatedResponse::new(paginated_enforcements, &pagination_params, total);
+ let response = PaginatedResponse::new(paginated_enforcements, &pagination_params, result.total);
Ok((StatusCode::OK, Json(response)))
}
diff --git a/crates/api/src/routes/executions.rs b/crates/api/src/routes/executions.rs
index 1b65b1e..85bc004 100644
--- a/crates/api/src/routes/executions.rs
+++ b/crates/api/src/routes/executions.rs
@@ -18,9 +18,10 @@ use attune_common::models::enums::ExecutionStatus;
use attune_common::mq::{ExecutionRequestedPayload, MessageEnvelope, MessageType};
use attune_common::repositories::{
action::ActionRepository,
- execution::{CreateExecutionInput, ExecutionRepository},
- Create, EnforcementRepository, FindById, FindByRef, List,
+ execution::{CreateExecutionInput, ExecutionRepository, ExecutionSearchFilters},
+ Create, FindById, FindByRef,
};
+use sqlx::Row;
use crate::{
auth::middleware::RequireAuth,
@@ -125,117 +126,37 @@ pub async fn list_executions(
RequireAuth(_user): RequireAuth,
Query(query): Query,
) -> ApiResult {
- // Get executions based on filters
- let executions = if let Some(status) = query.status {
- // Filter by status
- ExecutionRepository::find_by_status(&state.db, status).await?
- } else if let Some(enforcement_id) = query.enforcement {
- // Filter by enforcement
- ExecutionRepository::find_by_enforcement(&state.db, enforcement_id).await?
- } else {
- // Get all executions
- ExecutionRepository::list(&state.db).await?
+ // All filtering, pagination, and the enforcement JOIN happen in a single
+ // SQL query — no in-memory filtering or post-fetch lookups.
+ let filters = ExecutionSearchFilters {
+ status: query.status,
+ action_ref: query.action_ref.clone(),
+ pack_name: query.pack_name.clone(),
+ rule_ref: query.rule_ref.clone(),
+ trigger_ref: query.trigger_ref.clone(),
+ executor: query.executor,
+ result_contains: query.result_contains.clone(),
+ enforcement: query.enforcement,
+ parent: query.parent,
+ top_level_only: query.top_level_only == Some(true),
+ limit: query.limit(),
+ offset: query.offset(),
};
- // Apply additional filters in memory (could be optimized with database queries)
- let mut filtered_executions = executions;
+ let result = ExecutionRepository::search(&state.db, &filters).await?;
- if let Some(action_ref) = &query.action_ref {
- filtered_executions.retain(|e| e.action_ref == *action_ref);
- }
-
- if let Some(pack_name) = &query.pack_name {
- filtered_executions.retain(|e| {
- // action_ref format is "pack.action"
- e.action_ref.starts_with(&format!("{}.", pack_name))
- });
- }
-
- if let Some(result_search) = &query.result_contains {
- let search_lower = result_search.to_lowercase();
- filtered_executions.retain(|e| {
- if let Some(result) = &e.result {
- // Convert result to JSON string and search case-insensitively
- let result_str = serde_json::to_string(result).unwrap_or_default();
- result_str.to_lowercase().contains(&search_lower)
- } else {
- false
- }
- });
- }
-
- if let Some(parent_id) = query.parent {
- filtered_executions.retain(|e| e.parent == Some(parent_id));
- }
-
- if query.top_level_only == Some(true) {
- filtered_executions.retain(|e| e.parent.is_none());
- }
-
- if let Some(executor_id) = query.executor {
- filtered_executions.retain(|e| e.executor == Some(executor_id));
- }
-
- // Fetch enforcements for all executions to populate rule_ref and trigger_ref
- let enforcement_ids: Vec<i64> = filtered_executions
- .iter()
- .filter_map(|e| e.enforcement)
+ let paginated_executions: Vec<ExecutionSummary> = result
+ .rows
+ .into_iter()
+ .map(ExecutionSummary::from)
.collect();
- let enforcement_map: std::collections::HashMap<i64, Enforcement> = if !enforcement_ids.is_empty() {
- let enforcements = EnforcementRepository::list(&state.db).await?;
- enforcements.into_iter().map(|enf| (enf.id, enf)).collect()
- } else {
- std::collections::HashMap::new()
- };
-
- // Filter by rule_ref if specified
- if let Some(rule_ref) = &query.rule_ref {
- filtered_executions.retain(|e| {
- e.enforcement
- .and_then(|enf_id| enforcement_map.get(&enf_id))
- .map(|enf| enf.rule_ref == *rule_ref)
- .unwrap_or(false)
- });
- }
-
- // Filter by trigger_ref if specified
- if let Some(trigger_ref) = &query.trigger_ref {
- filtered_executions.retain(|e| {
- e.enforcement
- .and_then(|enf_id| enforcement_map.get(&enf_id))
- .map(|enf| enf.trigger_ref == *trigger_ref)
- .unwrap_or(false)
- });
- }
-
- // Calculate pagination
- let total = filtered_executions.len() as u64;
- let start = query.offset() as usize;
- let end = (start + query.limit() as usize).min(filtered_executions.len());
-
- // Get paginated slice and populate rule_ref/trigger_ref from enforcements
- let paginated_executions: Vec<ExecutionSummary> = filtered_executions[start..end]
- .iter()
- .map(|e| {
- let mut summary = ExecutionSummary::from(e.clone());
- if let Some(enf_id) = e.enforcement {
- if let Some(enforcement) = enforcement_map.get(&enf_id) {
- summary.rule_ref = Some(enforcement.rule_ref.clone());
- summary.trigger_ref = Some(enforcement.trigger_ref.clone());
- }
- }
- summary
- })
- .collect();
-
- // Convert query params to pagination params for response
let pagination_params = PaginationParams {
page: query.page,
page_size: query.per_page,
};
- let response = PaginatedResponse::new(paginated_executions, &pagination_params, total);
+ let response = PaginatedResponse::new(paginated_executions, &pagination_params, result.total);
Ok((StatusCode::OK, Json(response)))
}
@@ -310,21 +231,23 @@ pub async fn list_executions_by_status(
}
};
- // Get executions by status
- let executions = ExecutionRepository::find_by_status(&state.db, status).await?;
+ // Use the search method for SQL-side filtering + pagination.
+ let filters = ExecutionSearchFilters {
+ status: Some(status),
+ limit: pagination.limit(),
+ offset: pagination.offset(),
+ ..Default::default()
+ };
- // Calculate pagination
- let total = executions.len() as u64;
- let start = ((pagination.page - 1) * pagination.limit()) as usize;
- let end = (start + pagination.limit() as usize).min(executions.len());
+ let result = ExecutionRepository::search(&state.db, &filters).await?;
- // Get paginated slice
- let paginated_executions: Vec<ExecutionSummary> = executions[start..end]
- .iter()
- .map(|e| ExecutionSummary::from(e.clone()))
+ let paginated_executions: Vec<ExecutionSummary> = result
+ .rows
+ .into_iter()
+ .map(ExecutionSummary::from)
.collect();
- let response = PaginatedResponse::new(paginated_executions, &pagination, total);
+ let response = PaginatedResponse::new(paginated_executions, &pagination, result.total);
Ok((StatusCode::OK, Json(response)))
}
@@ -350,21 +273,23 @@ pub async fn list_executions_by_enforcement(
Path(enforcement_id): Path<i64>,
Query(pagination): Query<PaginationParams>,
) -> ApiResult {
- // Get executions by enforcement
- let executions = ExecutionRepository::find_by_enforcement(&state.db, enforcement_id).await?;
+ // Use the search method for SQL-side filtering + pagination.
+ let filters = ExecutionSearchFilters {
+ enforcement: Some(enforcement_id),
+ limit: pagination.limit(),
+ offset: pagination.offset(),
+ ..Default::default()
+ };
- // Calculate pagination
- let total = executions.len() as u64;
- let start = ((pagination.page - 1) * pagination.limit()) as usize;
- let end = (start + pagination.limit() as usize).min(executions.len());
+ let result = ExecutionRepository::search(&state.db, &filters).await?;
- // Get paginated slice
- let paginated_executions: Vec<ExecutionSummary> = executions[start..end]
- .iter()
- .map(|e| ExecutionSummary::from(e.clone()))
+ let paginated_executions: Vec<ExecutionSummary> = result
+ .rows
+ .into_iter()
+ .map(ExecutionSummary::from)
.collect();
- let response = PaginatedResponse::new(paginated_executions, &pagination, total);
+ let response = PaginatedResponse::new(paginated_executions, &pagination, result.total);
Ok((StatusCode::OK, Json(response)))
}
@@ -384,34 +309,37 @@ pub async fn get_execution_stats(
State(state): State<Arc<AppState>>,
RequireAuth(_user): RequireAuth,
) -> ApiResult {
- // Get all executions (limited by repository to 1000)
- let executions = ExecutionRepository::list(&state.db).await?;
+ // Use a single SQL query with COUNT + GROUP BY instead of fetching all rows.
+ let rows = sqlx::query(
+ "SELECT status::text AS status, COUNT(*) AS cnt FROM execution GROUP BY status",
+ )
+ .fetch_all(&state.db)
+ .await?;
- // Calculate statistics
- let total = executions.len();
- let completed = executions
- .iter()
- .filter(|e| e.status == attune_common::models::enums::ExecutionStatus::Completed)
- .count();
- let failed = executions
- .iter()
- .filter(|e| e.status == attune_common::models::enums::ExecutionStatus::Failed)
- .count();
- let running = executions
- .iter()
- .filter(|e| e.status == attune_common::models::enums::ExecutionStatus::Running)
- .count();
- let pending = executions
- .iter()
- .filter(|e| {
- matches!(
- e.status,
- attune_common::models::enums::ExecutionStatus::Requested
- | attune_common::models::enums::ExecutionStatus::Scheduling
- | attune_common::models::enums::ExecutionStatus::Scheduled
- )
- })
- .count();
+ let mut completed: i64 = 0;
+ let mut failed: i64 = 0;
+ let mut running: i64 = 0;
+ let mut pending: i64 = 0;
+ let mut cancelled: i64 = 0;
+ let mut timeout: i64 = 0;
+ let mut abandoned: i64 = 0;
+ let mut total: i64 = 0;
+
+ for row in &rows {
+ let status: &str = row.get("status");
+ let cnt: i64 = row.get("cnt");
+ total += cnt;
+ match status {
+ "completed" => completed = cnt,
+ "failed" => failed = cnt,
+ "running" => running = cnt,
+ "requested" | "scheduling" | "scheduled" => pending += cnt,
+ "cancelled" | "canceling" => cancelled += cnt,
+ "timeout" => timeout = cnt,
+ "abandoned" => abandoned = cnt,
+ _ => {}
+ }
+ }
let stats = serde_json::json!({
"total": total,
@@ -419,9 +347,9 @@ pub async fn get_execution_stats(
"failed": failed,
"running": running,
"pending": pending,
- "cancelled": executions.iter().filter(|e| e.status == attune_common::models::enums::ExecutionStatus::Cancelled).count(),
- "timeout": executions.iter().filter(|e| e.status == attune_common::models::enums::ExecutionStatus::Timeout).count(),
- "abandoned": executions.iter().filter(|e| e.status == attune_common::models::enums::ExecutionStatus::Abandoned).count(),
+ "cancelled": cancelled,
+ "timeout": timeout,
+ "abandoned": abandoned,
});
let response = ApiResponse::new(stats);
diff --git a/crates/api/src/routes/inquiries.rs b/crates/api/src/routes/inquiries.rs
index e13b548..db9024c 100644
--- a/crates/api/src/routes/inquiries.rs
+++ b/crates/api/src/routes/inquiries.rs
@@ -14,8 +14,10 @@ use attune_common::{
mq::{InquiryRespondedPayload, MessageEnvelope, MessageType},
repositories::{
execution::ExecutionRepository,
- inquiry::{CreateInquiryInput, InquiryRepository, UpdateInquiryInput},
- Create, Delete, FindById, List, Update,
+ inquiry::{
+ CreateInquiryInput, InquiryRepository, InquirySearchFilters, UpdateInquiryInput,
+ },
+ Create, Delete, FindById, Update,
},
};
@@ -51,45 +53,30 @@ pub async fn list_inquiries(
State(state): State<Arc<AppState>>,
Query(query): Query,
) -> ApiResult {
- // Get inquiries based on filters
- let inquiries = if let Some(status) = query.status {
- // Filter by status
- InquiryRepository::find_by_status(&state.db, status).await?
- } else if let Some(execution_id) = query.execution {
- // Filter by execution
- InquiryRepository::find_by_execution(&state.db, execution_id).await?
- } else {
- // Get all inquiries
- InquiryRepository::list(&state.db).await?
+ // All filtering and pagination happen in a single SQL query.
+ // Filters are combinable (AND), not mutually exclusive.
+ let limit = query.limit.unwrap_or(50).min(500) as u32;
+ let offset = query.offset.unwrap_or(0) as u32;
+
+ let filters = InquirySearchFilters {
+ status: query.status,
+ execution: query.execution,
+ assigned_to: query.assigned_to,
+ limit,
+ offset,
};
- // Apply additional filters in memory
- let mut filtered_inquiries = inquiries;
+ let result = InquiryRepository::search(&state.db, &filters).await?;
- if let Some(assigned_to) = query.assigned_to {
- filtered_inquiries.retain(|i| i.assigned_to == Some(assigned_to));
- }
+ let paginated_inquiries: Vec<InquirySummary> =
+ result.rows.into_iter().map(InquirySummary::from).collect();
- // Calculate pagination
- let total = filtered_inquiries.len() as u64;
- let offset = query.offset.unwrap_or(0);
- let limit = query.limit.unwrap_or(50).min(500);
- let start = offset;
- let end = (start + limit).min(filtered_inquiries.len());
-
- // Get paginated slice
- let paginated_inquiries: Vec<InquirySummary> = filtered_inquiries[start..end]
- .iter()
- .map(|inquiry| InquirySummary::from(inquiry.clone()))
- .collect();
-
- // Convert to pagination params for response
let pagination_params = PaginationParams {
- page: (offset / limit.max(1)) as u32 + 1,
- page_size: limit as u32,
+ page: (offset / limit.max(1)) + 1,
+ page_size: limit,
};
- let response = PaginatedResponse::new(paginated_inquiries, &pagination_params, total);
+ let response = PaginatedResponse::new(paginated_inquiries, &pagination_params, result.total);
Ok((StatusCode::OK, Json(response)))
}
@@ -161,20 +148,21 @@ pub async fn list_inquiries_by_status(
}
};
- let inquiries = InquiryRepository::find_by_status(&state.db, status).await?;
+ // Use the search method for SQL-side filtering + pagination.
+ let filters = InquirySearchFilters {
+ status: Some(status),
+ execution: None,
+ assigned_to: None,
+ limit: pagination.limit(),
+ offset: pagination.offset(),
+ };
- // Calculate pagination
- let total = inquiries.len() as u64;
- let start = ((pagination.page - 1) * pagination.limit()) as usize;
- let end = (start + pagination.limit() as usize).min(inquiries.len());
+ let result = InquiryRepository::search(&state.db, &filters).await?;
- // Get paginated slice
- let paginated_inquiries: Vec<InquirySummary> = inquiries[start..end]
- .iter()
- .map(|inquiry| InquirySummary::from(inquiry.clone()))
- .collect();
+ let paginated_inquiries: Vec<InquirySummary> =
+ result.rows.into_iter().map(InquirySummary::from).collect();
- let response = PaginatedResponse::new(paginated_inquiries, &pagination, total);
+ let response = PaginatedResponse::new(paginated_inquiries, &pagination, result.total);
Ok((StatusCode::OK, Json(response)))
}
@@ -209,20 +197,21 @@ pub async fn list_inquiries_by_execution(
ApiError::NotFound(format!("Execution with ID {} not found", execution_id))
})?;
- let inquiries = InquiryRepository::find_by_execution(&state.db, execution_id).await?;
+ // Use the search method for SQL-side filtering + pagination.
+ let filters = InquirySearchFilters {
+ status: None,
+ execution: Some(execution_id),
+ assigned_to: None,
+ limit: pagination.limit(),
+ offset: pagination.offset(),
+ };
- // Calculate pagination
- let total = inquiries.len() as u64;
- let start = ((pagination.page - 1) * pagination.limit()) as usize;
- let end = (start + pagination.limit() as usize).min(inquiries.len());
+ let result = InquiryRepository::search(&state.db, &filters).await?;
- // Get paginated slice
- let paginated_inquiries: Vec<InquirySummary> = inquiries[start..end]
- .iter()
- .map(|inquiry| InquirySummary::from(inquiry.clone()))
- .collect();
+ let paginated_inquiries: Vec<InquirySummary> =
+ result.rows.into_iter().map(InquirySummary::from).collect();
- let response = PaginatedResponse::new(paginated_inquiries, &pagination, total);
+ let response = PaginatedResponse::new(paginated_inquiries, &pagination, result.total);
Ok((StatusCode::OK, Json(response)))
}
diff --git a/crates/api/src/routes/keys.rs b/crates/api/src/routes/keys.rs
index 1d42e29..7163072 100644
--- a/crates/api/src/routes/keys.rs
+++ b/crates/api/src/routes/keys.rs
@@ -13,10 +13,10 @@ use validator::Validate;
use attune_common::models::OwnerType;
use attune_common::repositories::{
action::ActionRepository,
- key::{CreateKeyInput, KeyRepository, UpdateKeyInput},
+ key::{CreateKeyInput, KeyRepository, KeySearchFilters, UpdateKeyInput},
pack::PackRepository,
trigger::SensorRepository,
- Create, Delete, FindByRef, List, Update,
+ Create, Delete, FindByRef, Update,
};
use crate::auth::RequireAuth;
@@ -46,40 +46,24 @@ pub async fn list_keys(
State(state): State<Arc<AppState>>,
Query(query): Query,
) -> ApiResult {
- // Get keys based on filters
- let keys = if let Some(owner_type) = query.owner_type {
- // Filter by owner type
- KeyRepository::find_by_owner_type(&state.db, owner_type).await?
- } else {
- // Get all keys
- KeyRepository::list(&state.db).await?
+ // All filtering and pagination happen in a single SQL query.
+ let filters = KeySearchFilters {
+ owner_type: query.owner_type,
+ owner: query.owner.clone(),
+ limit: query.limit(),
+ offset: query.offset(),
};
- // Apply additional filters in memory
- let mut filtered_keys = keys;
+ let result = KeyRepository::search(&state.db, &filters).await?;
- if let Some(owner) = &query.owner {
- filtered_keys.retain(|k| k.owner.as_ref() == Some(owner));
- }
+ let paginated_keys: Vec<KeySummary> = result.rows.into_iter().map(KeySummary::from).collect();
- // Calculate pagination
- let total = filtered_keys.len() as u64;
- let start = query.offset() as usize;
- let end = (start + query.limit() as usize).min(filtered_keys.len());
-
- // Get paginated slice (values redacted in summary)
- let paginated_keys: Vec<KeySummary> = filtered_keys[start..end]
- .iter()
- .map(|key| KeySummary::from(key.clone()))
- .collect();
-
- // Convert query params to pagination params for response
let pagination_params = PaginationParams {
page: query.page,
page_size: query.per_page,
};
- let response = PaginatedResponse::new(paginated_keys, &pagination_params, total);
+ let response = PaginatedResponse::new(paginated_keys, &pagination_params, result.total);
Ok((StatusCode::OK, Json(response)))
}
diff --git a/crates/api/src/routes/rules.rs b/crates/api/src/routes/rules.rs
index b56f5b0..ca1ed1b 100644
--- a/crates/api/src/routes/rules.rs
+++ b/crates/api/src/routes/rules.rs
@@ -17,9 +17,9 @@ use attune_common::mq::{
use attune_common::repositories::{
action::ActionRepository,
pack::PackRepository,
- rule::{CreateRuleInput, RuleRepository, UpdateRuleInput},
+ rule::{CreateRuleInput, RuleRepository, RuleSearchFilters, UpdateRuleInput},
trigger::TriggerRepository,
- Create, Delete, FindByRef, List, Update,
+ Create, Delete, FindByRef, Update,
};
use crate::{
@@ -50,21 +50,21 @@ pub async fn list_rules(
RequireAuth(_user): RequireAuth,
Query(pagination): Query<PaginationParams>,
) -> ApiResult {
- // Get all rules
- let rules = RuleRepository::list(&state.db).await?;
+ let filters = RuleSearchFilters {
+ pack: None,
+ action: None,
+ trigger: None,
+ enabled: None,
+ limit: pagination.limit(),
+ offset: pagination.offset(),
+ };
- // Calculate pagination
- let total = rules.len() as u64;
- let start = ((pagination.page - 1) * pagination.limit()) as usize;
- let end = (start + pagination.limit() as usize).min(rules.len());
+ let result = RuleRepository::list_search(&state.db, &filters).await?;
- // Get paginated slice
- let paginated_rules: Vec<RuleSummary> = rules[start..end]
- .iter()
- .map(|r| RuleSummary::from(r.clone()))
- .collect();
+ let paginated_rules: Vec<RuleSummary> =
+ result.rows.into_iter().map(RuleSummary::from).collect();
- let response = PaginatedResponse::new(paginated_rules, &pagination, total);
+ let response = PaginatedResponse::new(paginated_rules, &pagination, result.total);
Ok((StatusCode::OK, Json(response)))
}
@@ -85,21 +85,21 @@ pub async fn list_enabled_rules(
RequireAuth(_user): RequireAuth,
Query(pagination): Query<PaginationParams>,
) -> ApiResult {
- // Get enabled rules
- let rules = RuleRepository::find_enabled(&state.db).await?;
+ let filters = RuleSearchFilters {
+ pack: None,
+ action: None,
+ trigger: None,
+ enabled: Some(true),
+ limit: pagination.limit(),
+ offset: pagination.offset(),
+ };
- // Calculate pagination
- let total = rules.len() as u64;
- let start = ((pagination.page - 1) * pagination.limit()) as usize;
- let end = (start + pagination.limit() as usize).min(rules.len());
+ let result = RuleRepository::list_search(&state.db, &filters).await?;
- // Get paginated slice
- let paginated_rules: Vec<RuleSummary> = rules[start..end]
- .iter()
- .map(|r| RuleSummary::from(r.clone()))
- .collect();
+ let paginated_rules: Vec<RuleSummary> =
+ result.rows.into_iter().map(RuleSummary::from).collect();
- let response = PaginatedResponse::new(paginated_rules, &pagination, total);
+ let response = PaginatedResponse::new(paginated_rules, &pagination, result.total);
Ok((StatusCode::OK, Json(response)))
}
@@ -130,21 +130,21 @@ pub async fn list_rules_by_pack(
.await?
.ok_or_else(|| ApiError::NotFound(format!("Pack '{}' not found", pack_ref)))?;
- // Get rules for this pack
- let rules = RuleRepository::find_by_pack(&state.db, pack.id).await?;
+ let filters = RuleSearchFilters {
+ pack: Some(pack.id),
+ action: None,
+ trigger: None,
+ enabled: None,
+ limit: pagination.limit(),
+ offset: pagination.offset(),
+ };
- // Calculate pagination
- let total = rules.len() as u64;
- let start = ((pagination.page - 1) * pagination.limit()) as usize;
- let end = (start + pagination.limit() as usize).min(rules.len());
+ let result = RuleRepository::list_search(&state.db, &filters).await?;
- // Get paginated slice
- let paginated_rules: Vec<RuleSummary> = rules[start..end]
- .iter()
- .map(|r| RuleSummary::from(r.clone()))
- .collect();
+ let paginated_rules: Vec<RuleSummary> =
+ result.rows.into_iter().map(RuleSummary::from).collect();
- let response = PaginatedResponse::new(paginated_rules, &pagination, total);
+ let response = PaginatedResponse::new(paginated_rules, &pagination, result.total);
Ok((StatusCode::OK, Json(response)))
}
@@ -175,21 +175,21 @@ pub async fn list_rules_by_action(
.await?
.ok_or_else(|| ApiError::NotFound(format!("Action '{}' not found", action_ref)))?;
- // Get rules for this action
- let rules = RuleRepository::find_by_action(&state.db, action.id).await?;
+ let filters = RuleSearchFilters {
+ pack: None,
+ action: Some(action.id),
+ trigger: None,
+ enabled: None,
+ limit: pagination.limit(),
+ offset: pagination.offset(),
+ };
- // Calculate pagination
- let total = rules.len() as u64;
- let start = ((pagination.page - 1) * pagination.limit()) as usize;
- let end = (start + pagination.limit() as usize).min(rules.len());
+ let result = RuleRepository::list_search(&state.db, &filters).await?;
- // Get paginated slice
- let paginated_rules: Vec<RuleSummary> = rules[start..end]
- .iter()
- .map(|r| RuleSummary::from(r.clone()))
- .collect();
+ let paginated_rules: Vec<RuleSummary> =
+ result.rows.into_iter().map(RuleSummary::from).collect();
- let response = PaginatedResponse::new(paginated_rules, &pagination, total);
+ let response = PaginatedResponse::new(paginated_rules, &pagination, result.total);
Ok((StatusCode::OK, Json(response)))
}
@@ -220,21 +220,21 @@ pub async fn list_rules_by_trigger(
.await?
.ok_or_else(|| ApiError::NotFound(format!("Trigger '{}' not found", trigger_ref)))?;
- // Get rules for this trigger
- let rules = RuleRepository::find_by_trigger(&state.db, trigger.id).await?;
+ let filters = RuleSearchFilters {
+ pack: None,
+ action: None,
+ trigger: Some(trigger.id),
+ enabled: None,
+ limit: pagination.limit(),
+ offset: pagination.offset(),
+ };
- // Calculate pagination
- let total = rules.len() as u64;
- let start = ((pagination.page - 1) * pagination.limit()) as usize;
- let end = (start + pagination.limit() as usize).min(rules.len());
+ let result = RuleRepository::list_search(&state.db, &filters).await?;
- // Get paginated slice
- let paginated_rules: Vec<RuleSummary> = rules[start..end]
- .iter()
- .map(|r| RuleSummary::from(r.clone()))
- .collect();
+ let paginated_rules: Vec<RuleSummary> =
+ result.rows.into_iter().map(RuleSummary::from).collect();
- let response = PaginatedResponse::new(paginated_rules, &pagination, total);
+ let response = PaginatedResponse::new(paginated_rules, &pagination, result.total);
Ok((StatusCode::OK, Json(response)))
}
diff --git a/crates/api/src/routes/triggers.rs b/crates/api/src/routes/triggers.rs
index 2cd46d4..fe96590 100644
--- a/crates/api/src/routes/triggers.rs
+++ b/crates/api/src/routes/triggers.rs
@@ -14,10 +14,10 @@ use attune_common::repositories::{
pack::PackRepository,
runtime::RuntimeRepository,
trigger::{
- CreateSensorInput, CreateTriggerInput, SensorRepository, TriggerRepository,
- UpdateSensorInput, UpdateTriggerInput,
+ CreateSensorInput, CreateTriggerInput, SensorRepository, SensorSearchFilters,
+ TriggerRepository, TriggerSearchFilters, UpdateSensorInput, UpdateTriggerInput,
},
- Create, Delete, FindByRef, List, Update,
+ Create, Delete, FindByRef, Update,
};
use crate::{
@@ -54,21 +54,19 @@ pub async fn list_triggers(
RequireAuth(_user): RequireAuth,
Query(pagination): Query<PaginationParams>,
) -> ApiResult {
- // Get all triggers
- let triggers = TriggerRepository::list(&state.db).await?;
+ let filters = TriggerSearchFilters {
+ pack: None,
+ enabled: None,
+ limit: pagination.limit(),
+ offset: pagination.offset(),
+ };
- // Calculate pagination
- let total = triggers.len() as u64;
- let start = ((pagination.page - 1) * pagination.limit()) as usize;
- let end = (start + pagination.limit() as usize).min(triggers.len());
+ let result = TriggerRepository::list_search(&state.db, &filters).await?;
- // Get paginated slice
- let paginated_triggers: Vec<TriggerSummary> = triggers[start..end]
- .iter()
- .map(|t| TriggerSummary::from(t.clone()))
- .collect();
+ let paginated_triggers: Vec<TriggerSummary> =
+ result.rows.into_iter().map(TriggerSummary::from).collect();
- let response = PaginatedResponse::new(paginated_triggers, &pagination, total);
+ let response = PaginatedResponse::new(paginated_triggers, &pagination, result.total);
Ok((StatusCode::OK, Json(response)))
}
@@ -89,21 +87,19 @@ pub async fn list_enabled_triggers(
RequireAuth(_user): RequireAuth,
Query(pagination): Query<PaginationParams>,
) -> ApiResult {
- // Get enabled triggers
- let triggers = TriggerRepository::find_enabled(&state.db).await?;
+ let filters = TriggerSearchFilters {
+ pack: None,
+ enabled: Some(true),
+ limit: pagination.limit(),
+ offset: pagination.offset(),
+ };
- // Calculate pagination
- let total = triggers.len() as u64;
- let start = ((pagination.page - 1) * pagination.limit()) as usize;
- let end = (start + pagination.limit() as usize).min(triggers.len());
+ let result = TriggerRepository::list_search(&state.db, &filters).await?;
- // Get paginated slice
- let paginated_triggers: Vec<TriggerSummary> = triggers[start..end]
- .iter()
- .map(|t| TriggerSummary::from(t.clone()))
- .collect();
+ let paginated_triggers: Vec<TriggerSummary> =
+ result.rows.into_iter().map(TriggerSummary::from).collect();
- let response = PaginatedResponse::new(paginated_triggers, &pagination, total);
+ let response = PaginatedResponse::new(paginated_triggers, &pagination, result.total);
Ok((StatusCode::OK, Json(response)))
}
@@ -134,21 +130,19 @@ pub async fn list_triggers_by_pack(
.await?
.ok_or_else(|| ApiError::NotFound(format!("Pack '{}' not found", pack_ref)))?;
- // Get triggers for this pack
- let triggers = TriggerRepository::find_by_pack(&state.db, pack.id).await?;
+ let filters = TriggerSearchFilters {
+ pack: Some(pack.id),
+ enabled: None,
+ limit: pagination.limit(),
+ offset: pagination.offset(),
+ };
- // Calculate pagination
- let total = triggers.len() as u64;
- let start = ((pagination.page - 1) * pagination.limit()) as usize;
- let end = (start + pagination.limit() as usize).min(triggers.len());
+ let result = TriggerRepository::list_search(&state.db, &filters).await?;
- // Get paginated slice
- let paginated_triggers: Vec<TriggerSummary> = triggers[start..end]
- .iter()
- .map(|t| TriggerSummary::from(t.clone()))
- .collect();
+ let paginated_triggers: Vec<TriggerSummary> =
+ result.rows.into_iter().map(TriggerSummary::from).collect();
- let response = PaginatedResponse::new(paginated_triggers, &pagination, total);
+ let response = PaginatedResponse::new(paginated_triggers, &pagination, result.total);
Ok((StatusCode::OK, Json(response)))
}
@@ -438,21 +432,20 @@ pub async fn list_sensors(
RequireAuth(_user): RequireAuth,
Query(pagination): Query<PaginationParams>,
) -> ApiResult {
- // Get all sensors
- let sensors = SensorRepository::list(&state.db).await?;
+ let filters = SensorSearchFilters {
+ pack: None,
+ trigger: None,
+ enabled: None,
+ limit: pagination.limit(),
+ offset: pagination.offset(),
+ };
- // Calculate pagination
- let total = sensors.len() as u64;
- let start = ((pagination.page - 1) * pagination.limit()) as usize;
- let end = (start + pagination.limit() as usize).min(sensors.len());
+ let result = SensorRepository::list_search(&state.db, &filters).await?;
- // Get paginated slice
- let paginated_sensors: Vec<SensorSummary> = sensors[start..end]
- .iter()
- .map(|s| SensorSummary::from(s.clone()))
- .collect();
+ let paginated_sensors: Vec<SensorSummary> =
+ result.rows.into_iter().map(SensorSummary::from).collect();
- let response = PaginatedResponse::new(paginated_sensors, &pagination, total);
+ let response = PaginatedResponse::new(paginated_sensors, &pagination, result.total);
Ok((StatusCode::OK, Json(response)))
}
@@ -473,21 +466,20 @@ pub async fn list_enabled_sensors(
RequireAuth(_user): RequireAuth,
Query(pagination): Query,
) -> ApiResult {
- // Get enabled sensors
- let sensors = SensorRepository::find_enabled(&state.db).await?;
+ let filters = SensorSearchFilters {
+ pack: None,
+ trigger: None,
+ enabled: Some(true),
+ limit: pagination.limit(),
+ offset: pagination.offset(),
+ };
- // Calculate pagination
- let total = sensors.len() as u64;
- let start = ((pagination.page - 1) * pagination.limit()) as usize;
- let end = (start + pagination.limit() as usize).min(sensors.len());
+ let result = SensorRepository::list_search(&state.db, &filters).await?;
- // Get paginated slice
- let paginated_sensors: Vec<SensorSummary> = sensors[start..end]
- .iter()
- .map(|s| SensorSummary::from(s.clone()))
- .collect();
+ let paginated_sensors: Vec<SensorSummary> =
+ result.rows.into_iter().map(SensorSummary::from).collect();
- let response = PaginatedResponse::new(paginated_sensors, &pagination, total);
+ let response = PaginatedResponse::new(paginated_sensors, &pagination, result.total);
Ok((StatusCode::OK, Json(response)))
}
@@ -518,21 +510,20 @@ pub async fn list_sensors_by_pack(
.await?
.ok_or_else(|| ApiError::NotFound(format!("Pack '{}' not found", pack_ref)))?;
- // Get sensors for this pack
- let sensors = SensorRepository::find_by_pack(&state.db, pack.id).await?;
+ let filters = SensorSearchFilters {
+ pack: Some(pack.id),
+ trigger: None,
+ enabled: None,
+ limit: pagination.limit(),
+ offset: pagination.offset(),
+ };
- // Calculate pagination
- let total = sensors.len() as u64;
- let start = ((pagination.page - 1) * pagination.limit()) as usize;
- let end = (start + pagination.limit() as usize).min(sensors.len());
+ let result = SensorRepository::list_search(&state.db, &filters).await?;
- // Get paginated slice
- let paginated_sensors: Vec<SensorSummary> = sensors[start..end]
- .iter()
- .map(|s| SensorSummary::from(s.clone()))
- .collect();
+ let paginated_sensors: Vec<SensorSummary> =
+ result.rows.into_iter().map(SensorSummary::from).collect();
- let response = PaginatedResponse::new(paginated_sensors, &pagination, total);
+ let response = PaginatedResponse::new(paginated_sensors, &pagination, result.total);
Ok((StatusCode::OK, Json(response)))
}
@@ -563,21 +554,20 @@ pub async fn list_sensors_by_trigger(
.await?
.ok_or_else(|| ApiError::NotFound(format!("Trigger '{}' not found", trigger_ref)))?;
- // Get sensors for this trigger
- let sensors = SensorRepository::find_by_trigger(&state.db, trigger.id).await?;
+ let filters = SensorSearchFilters {
+ pack: None,
+ trigger: Some(trigger.id),
+ enabled: None,
+ limit: pagination.limit(),
+ offset: pagination.offset(),
+ };
- // Calculate pagination
- let total = sensors.len() as u64;
- let start = ((pagination.page - 1) * pagination.limit()) as usize;
- let end = (start + pagination.limit() as usize).min(sensors.len());
+ let result = SensorRepository::list_search(&state.db, &filters).await?;
- // Get paginated slice
- let paginated_sensors: Vec<SensorSummary> = sensors[start..end]
- .iter()
- .map(|s| SensorSummary::from(s.clone()))
- .collect();
+ let paginated_sensors: Vec<SensorSummary> =
+ result.rows.into_iter().map(SensorSummary::from).collect();
- let response = PaginatedResponse::new(paginated_sensors, &pagination, total);
+ let response = PaginatedResponse::new(paginated_sensors, &pagination, result.total);
Ok((StatusCode::OK, Json(response)))
}
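
The handlers above all follow one shape: build a filters struct from `pagination`, call the repository's `list_search`, and hand `result.total` to `PaginatedResponse`. The page-to-offset arithmetic that `pagination.offset()` presumably performs (the helper appears in the diff; the exact formula here is an assumption) can be sketched standalone, with 1-based pages:

```rust
/// Illustrative sketch of 1-based page → SQL OFFSET conversion; the name
/// `offset_for` and the formula are assumptions, not from the codebase.
fn offset_for(page: u32, limit: u32) -> u32 {
    // saturating_sub guards against a malformed page=0 underflowing.
    page.saturating_sub(1) * limit
}

fn main() {
    // Page 1 starts at row 0; page 3 with limit 25 starts at row 50.
    assert_eq!(offset_for(1, 25), 0);
    assert_eq!(offset_for(3, 25), 50);
    assert_eq!(offset_for(0, 25), 0);
}
```

Pushing this into `LIMIT`/`OFFSET` means the database, not the handler, does the slicing, so `result.total` must come from the separate COUNT query rather than `rows.len()`.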
diff --git a/crates/api/src/routes/workflows.rs b/crates/api/src/routes/workflows.rs
index c200e91..951a644 100644
--- a/crates/api/src/routes/workflows.rs
+++ b/crates/api/src/routes/workflows.rs
@@ -16,8 +16,9 @@ use attune_common::repositories::{
pack::PackRepository,
workflow::{
CreateWorkflowDefinitionInput, UpdateWorkflowDefinitionInput, WorkflowDefinitionRepository,
+ WorkflowSearchFilters,
},
- Create, Delete, FindByRef, List, Update,
+ Create, Delete, FindByRef, Update,
};
use crate::{
@@ -54,64 +55,30 @@ pub async fn list_workflows(
// Validate search params
search_params.validate()?;
- // Get workflows based on filters
- let mut workflows = if let Some(tags_str) = &search_params.tags {
- // Filter by tags
- let tags: Vec<&str> = tags_str.split(',').map(|s| s.trim()).collect();
- let mut results = Vec::new();
- for tag in tags {
- let mut tag_results = WorkflowDefinitionRepository::find_by_tag(&state.db, tag).await?;
- results.append(&mut tag_results);
- }
- // Remove duplicates by ID
- results.sort_by_key(|w| w.id);
- results.dedup_by_key(|w| w.id);
- results
- } else if search_params.enabled == Some(true) {
- // Filter by enabled status (only return enabled workflows)
- WorkflowDefinitionRepository::find_enabled(&state.db).await?
- } else {
- // Get all workflows
- WorkflowDefinitionRepository::list(&state.db).await?
+ // Parse comma-separated tags into a Vec if provided
+ let tags = search_params.tags.as_ref().map(|t| {
+ t.split(',')
+ .map(|s| s.trim().to_string())
+ .collect::<Vec<String>>()
+ });
+
+ // All filtering and pagination happen in a single SQL query.
+ let filters = WorkflowSearchFilters {
+ pack: None,
+ pack_ref: search_params.pack_ref.clone(),
+ enabled: search_params.enabled,
+ tags,
+ search: search_params.search.clone(),
+ limit: pagination.limit(),
+ offset: pagination.offset(),
};
- // Apply enabled filter if specified and not already filtered by it
- if let Some(enabled) = search_params.enabled {
- if search_params.tags.is_some() {
- // If we filtered by tags, also apply enabled filter
- workflows.retain(|w| w.enabled == enabled);
- }
- }
+ let result = WorkflowDefinitionRepository::list_search(&state.db, &filters).await?;
- // Apply search filter if provided
- if let Some(search_term) = &search_params.search {
- let search_lower = search_term.to_lowercase();
- workflows.retain(|w| {
- w.label.to_lowercase().contains(&search_lower)
- || w.description
- .as_ref()
- .map(|d| d.to_lowercase().contains(&search_lower))
- .unwrap_or(false)
- });
- }
+ let paginated_workflows: Vec<WorkflowSummary> =
+ result.rows.into_iter().map(WorkflowSummary::from).collect();
- // Apply pack_ref filter if provided
- if let Some(pack_ref) = &search_params.pack_ref {
- workflows.retain(|w| w.pack_ref == *pack_ref);
- }
-
- // Calculate pagination
- let total = workflows.len() as u64;
- let start = ((pagination.page - 1) * pagination.limit()) as usize;
- let end = (start + pagination.limit() as usize).min(workflows.len());
-
- // Get paginated slice
- let paginated_workflows: Vec<WorkflowSummary> = workflows[start..end]
- .iter()
- .map(|w| WorkflowSummary::from(w.clone()))
- .collect();
-
- let response = PaginatedResponse::new(paginated_workflows, &pagination, total);
+ let response = PaginatedResponse::new(paginated_workflows, &pagination, result.total);
Ok((StatusCode::OK, Json(response)))
}
@@ -138,25 +105,27 @@ pub async fn list_workflows_by_pack(
Query(pagination): Query,
) -> ApiResult {
// Verify pack exists
- let pack = PackRepository::find_by_ref(&state.db, &pack_ref)
+ let _pack = PackRepository::find_by_ref(&state.db, &pack_ref)
.await?
.ok_or_else(|| ApiError::NotFound(format!("Pack '{}' not found", pack_ref)))?;
- // Get workflows for this pack
- let workflows = WorkflowDefinitionRepository::find_by_pack(&state.db, pack.id).await?;
+ // All filtering and pagination happen in a single SQL query.
+ let filters = WorkflowSearchFilters {
+ pack: None,
+ pack_ref: Some(pack_ref),
+ enabled: None,
+ tags: None,
+ search: None,
+ limit: pagination.limit(),
+ offset: pagination.offset(),
+ };
- // Calculate pagination
- let total = workflows.len() as u64;
- let start = ((pagination.page - 1) * pagination.limit()) as usize;
- let end = (start + pagination.limit() as usize).min(workflows.len());
+ let result = WorkflowDefinitionRepository::list_search(&state.db, &filters).await?;
- // Get paginated slice
- let paginated_workflows: Vec<WorkflowSummary> = workflows[start..end]
- .iter()
- .map(|w| WorkflowSummary::from(w.clone()))
- .collect();
+ let paginated_workflows: Vec<WorkflowSummary> =
+ result.rows.into_iter().map(WorkflowSummary::from).collect();
- let response = PaginatedResponse::new(paginated_workflows, &pagination, total);
+ let response = PaginatedResponse::new(paginated_workflows, &pagination, result.total);
Ok((StatusCode::OK, Json(response)))
}
diff --git a/crates/common/src/models.rs b/crates/common/src/models.rs
index 01cba9a..03a069d 100644
--- a/crates/common/src/models.rs
+++ b/crates/common/src/models.rs
@@ -1104,6 +1104,11 @@ pub mod execution {
pub status: ExecutionStatus,
pub result: Option,
+ /// When the execution actually started running (worker picked it up).
+ /// Set when status transitions to `Running`. Used to compute accurate
+ /// duration that excludes queue/scheduling wait time.
+ pub started_at: Option<DateTime<Utc>>,
+
/// Workflow task metadata (only populated for workflow task executions)
///
/// Provides direct access to workflow orchestration state without JOINs.
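
Given the `started_at` semantics above, a duration computed as `updated - started_at` excludes queue wait, while `updated - created` includes it. A dependency-free sketch of that distinction (plain epoch milliseconds stand in for the real `DateTime<Utc>` fields; the function name is illustrative):

```rust
/// Run duration that excludes queue/scheduling wait, per the `started_at`
/// doc comment. Falls back to `created` for rows written before the column
/// existed. Names and the fallback policy are assumptions for illustration.
fn run_duration_ms(created_ms: i64, started_at_ms: Option<i64>, updated_ms: i64) -> i64 {
    // Prefer started_at (worker pickup); fall back to created (enqueue time).
    let start = started_at_ms.unwrap_or(created_ms);
    updated_ms - start
}

fn main() {
    // Enqueued at t=0, picked up at t=500, finished at t=2000:
    // the 500ms queue wait is excluded.
    assert_eq!(run_duration_ms(0, Some(500), 2000), 1500);
    // Legacy row with no started_at falls back to total wall time.
    assert_eq!(run_duration_ms(0, None, 2000), 2000);
}
```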
diff --git a/crates/common/src/repositories/action.rs b/crates/common/src/repositories/action.rs
index 1d87881..21a170f 100644
--- a/crates/common/src/repositories/action.rs
+++ b/crates/common/src/repositories/action.rs
@@ -8,6 +8,26 @@ use sqlx::{Executor, Postgres, QueryBuilder};
use super::{Create, Delete, FindById, FindByRef, List, Repository, Update};
+/// Filters for [`ActionRepository::list_search`].
+///
+/// All fields are optional and combinable (AND). Pagination is always applied.
+#[derive(Debug, Clone, Default)]
+pub struct ActionSearchFilters {
+ /// Filter by pack ID
+ pub pack: Option<Id>,
+ /// Text search across ref, label, description (case-insensitive)
+ pub query: Option<String>,
+ pub limit: u32,
+ pub offset: u32,
+}
+
+/// Result of [`ActionRepository::list_search`].
+#[derive(Debug)]
+pub struct ActionSearchResult {
+ pub rows: Vec<Action>,
+ pub total: u64,
+}
+
/// Repository for Action operations
pub struct ActionRepository;
@@ -287,6 +307,92 @@ impl Delete for ActionRepository {
}
impl ActionRepository {
+ /// Search actions with all filters pushed into SQL.
+ ///
+ /// All filter fields are combinable (AND). Pagination is server-side.
+ pub async fn list_search<'e, E>(
+ db: E,
+ filters: &ActionSearchFilters,
+ ) -> Result<ActionSearchResult>
+ where
+ E: Executor<'e, Database = Postgres> + Copy + 'e,
+ {
+ let select_cols = "id, ref, pack, pack_ref, label, description, entrypoint, runtime, runtime_version_constraint, param_schema, out_schema, workflow_def, is_adhoc, created, updated";
+
+ let mut qb: QueryBuilder<'_, Postgres> =
+ QueryBuilder::new(format!("SELECT {select_cols} FROM action"));
+ let mut count_qb: QueryBuilder<'_, Postgres> =
+ QueryBuilder::new("SELECT COUNT(*) FROM action");
+
+ let mut has_where = false;
+
+ macro_rules! push_condition {
+ ($cond_prefix:expr, $value:expr) => {{
+ if !has_where {
+ qb.push(" WHERE ");
+ count_qb.push(" WHERE ");
+ has_where = true;
+ } else {
+ qb.push(" AND ");
+ count_qb.push(" AND ");
+ }
+ qb.push($cond_prefix);
+ qb.push_bind($value.clone());
+ count_qb.push($cond_prefix);
+ count_qb.push_bind($value);
+ }};
+ }
+
+ if let Some(pack_id) = filters.pack {
+ push_condition!("pack = ", pack_id);
+ }
+ if let Some(ref query) = filters.query {
+ let pattern = format!("%{}%", query.to_lowercase());
+ // Search needs an OR across multiple columns, wrapped in parens
+ if !has_where {
+ qb.push(" WHERE ");
+ count_qb.push(" WHERE ");
+ has_where = true;
+ } else {
+ qb.push(" AND ");
+ count_qb.push(" AND ");
+ }
+ qb.push("(LOWER(ref) LIKE ");
+ qb.push_bind(pattern.clone());
+ qb.push(" OR LOWER(label) LIKE ");
+ qb.push_bind(pattern.clone());
+ qb.push(" OR LOWER(description) LIKE ");
+ qb.push_bind(pattern.clone());
+ qb.push(")");
+
+ count_qb.push("(LOWER(ref) LIKE ");
+ count_qb.push_bind(pattern.clone());
+ count_qb.push(" OR LOWER(label) LIKE ");
+ count_qb.push_bind(pattern.clone());
+ count_qb.push(" OR LOWER(description) LIKE ");
+ count_qb.push_bind(pattern);
+ count_qb.push(")");
+ }
+
+ // Suppress unused-assignment warning from the macro's last expansion.
+ let _ = has_where;
+
+ // Count
+ let total: i64 = count_qb.build_query_scalar().fetch_one(db).await?;
+ let total = total.max(0) as u64;
+
+ // Data query
+ qb.push(" ORDER BY ref ASC");
+ qb.push(" LIMIT ");
+ qb.push_bind(filters.limit as i64);
+ qb.push(" OFFSET ");
+ qb.push_bind(filters.offset as i64);
+
+ let rows: Vec<Action> = qb.build_query_as().fetch_all(db).await?;
+
+ Ok(ActionSearchResult { rows, total })
+ }
+
/// Find actions by pack ID
pub async fn find_by_pack<'e, E>(executor: E, pack_id: Id) -> Result>
where
diff --git a/crates/common/src/repositories/event.rs b/crates/common/src/repositories/event.rs
index 20316ad..60a0706 100644
--- a/crates/common/src/repositories/event.rs
+++ b/crates/common/src/repositories/event.rs
@@ -15,6 +15,56 @@ use sqlx::{Executor, Postgres, QueryBuilder};
use super::{Create, Delete, FindById, List, Repository, Update};
+// ============================================================================
+// Event Search
+// ============================================================================
+
+/// Filters for [`EventRepository::search`].
+///
+/// All fields are optional. When set, the corresponding WHERE clause is added.
+/// Pagination is always applied.
+#[derive(Debug, Clone, Default)]
+pub struct EventSearchFilters {
+ pub trigger: Option<Id>,
+ pub trigger_ref: Option<String>,
+ pub source: Option<Id>,
+ pub rule_ref: Option<String>,
+ pub limit: u32,
+ pub offset: u32,
+}
+
+/// Result of [`EventRepository::search`].
+#[derive(Debug)]
+pub struct EventSearchResult {
+ pub rows: Vec<Event>,
+ pub total: u64,
+}
+
+// ============================================================================
+// Enforcement Search
+// ============================================================================
+
+/// Filters for [`EnforcementRepository::search`].
+///
+/// All fields are optional and combinable. Pagination is always applied.
+#[derive(Debug, Clone, Default)]
+pub struct EnforcementSearchFilters {
+ pub rule: Option<Id>,
+ pub event: Option<Id>,
+ pub status: Option,
+ pub trigger_ref: Option<String>,
+ pub rule_ref: Option<String>,
+ pub limit: u32,
+ pub offset: u32,
+}
+
+/// Result of [`EnforcementRepository::search`].
+#[derive(Debug)]
+pub struct EnforcementSearchResult {
+ pub rows: Vec<Enforcement>,
+ pub total: u64,
+}
+
/// Repository for Event operations
pub struct EventRepository;
@@ -173,6 +223,75 @@ impl EventRepository {
Ok(events)
}
+
+ /// Search events with all filters pushed into SQL.
+ ///
+ /// Builds a dynamic query so that every filter, pagination, and the total
+ /// count are handled in the database — no in-memory filtering or slicing.
+ pub async fn search<'e, E>(db: E, filters: &EventSearchFilters) -> Result<EventSearchResult>
+ where
+ E: Executor<'e, Database = Postgres> + Copy + 'e,
+ {
+ let select_cols = "id, trigger, trigger_ref, config, payload, source, source_ref, rule, rule_ref, created";
+
+ let mut qb: QueryBuilder<'_, Postgres> =
+ QueryBuilder::new(format!("SELECT {select_cols} FROM event"));
+ let mut count_qb: QueryBuilder<'_, Postgres> =
+ QueryBuilder::new("SELECT COUNT(*) FROM event");
+
+ let mut has_where = false;
+
+ macro_rules! push_condition {
+ ($cond_prefix:expr, $value:expr) => {{
+ if !has_where {
+ qb.push(" WHERE ");
+ count_qb.push(" WHERE ");
+ has_where = true;
+ } else {
+ qb.push(" AND ");
+ count_qb.push(" AND ");
+ }
+ qb.push($cond_prefix);
+ qb.push_bind($value.clone());
+ count_qb.push($cond_prefix);
+ count_qb.push_bind($value);
+ }};
+ }
+
+ if let Some(trigger_id) = filters.trigger {
+ push_condition!("trigger = ", trigger_id);
+ }
+ if let Some(ref trigger_ref) = filters.trigger_ref {
+ push_condition!("trigger_ref = ", trigger_ref.clone());
+ }
+ if let Some(source_id) = filters.source {
+ push_condition!("source = ", source_id);
+ }
+ if let Some(ref rule_ref) = filters.rule_ref {
+ push_condition!(
+ "LOWER(rule_ref) LIKE ",
+ format!("%{}%", rule_ref.to_lowercase())
+ );
+ }
+
+ // Suppress unused-assignment warning from the macro's last expansion.
+ let _ = has_where;
+
+ // Count
+ let total: i64 = count_qb.build_query_scalar().fetch_one(db).await?;
+ let total = total.max(0) as u64;
+
+ // Data query
+ qb.push(" ORDER BY created DESC");
+ qb.push(" LIMIT ");
+ qb.push_bind(filters.limit as i64);
+ qb.push(" OFFSET ");
+ qb.push_bind(filters.offset as i64);
+
+ let rows: Vec<Event> = qb.build_query_as().fetch_all(db).await?;
+
+ Ok(EventSearchResult { rows, total })
+ }
}
// ============================================================================
@@ -425,4 +544,75 @@ impl EnforcementRepository {
Ok(enforcements)
}
+
+ /// Search enforcements with all filters pushed into SQL.
+ ///
+ /// All filter fields are combinable (AND). Pagination is server-side.
+ pub async fn search<'e, E>(
+ db: E,
+ filters: &EnforcementSearchFilters,
+ ) -> Result<EnforcementSearchResult>
+ where
+ E: Executor<'e, Database = Postgres> + Copy + 'e,
+ {
+ let select_cols = "id, rule, rule_ref, trigger_ref, config, event, status, payload, condition, conditions, created, resolved_at";
+
+ let mut qb: QueryBuilder<'_, Postgres> =
+ QueryBuilder::new(format!("SELECT {select_cols} FROM enforcement"));
+ let mut count_qb: QueryBuilder<'_, Postgres> =
+ QueryBuilder::new("SELECT COUNT(*) FROM enforcement");
+
+ let mut has_where = false;
+
+ macro_rules! push_condition {
+ ($cond_prefix:expr, $value:expr) => {{
+ if !has_where {
+ qb.push(" WHERE ");
+ count_qb.push(" WHERE ");
+ has_where = true;
+ } else {
+ qb.push(" AND ");
+ count_qb.push(" AND ");
+ }
+ qb.push($cond_prefix);
+ qb.push_bind($value.clone());
+ count_qb.push($cond_prefix);
+ count_qb.push_bind($value);
+ }};
+ }
+
+ if let Some(status) = &filters.status {
+ push_condition!("status = ", status.clone());
+ }
+ if let Some(rule_id) = filters.rule {
+ push_condition!("rule = ", rule_id);
+ }
+ if let Some(event_id) = filters.event {
+ push_condition!("event = ", event_id);
+ }
+ if let Some(ref trigger_ref) = filters.trigger_ref {
+ push_condition!("trigger_ref = ", trigger_ref.clone());
+ }
+ if let Some(ref rule_ref) = filters.rule_ref {
+ push_condition!("rule_ref = ", rule_ref.clone());
+ }
+
+ // Suppress unused-assignment warning from the macro's last expansion.
+ let _ = has_where;
+
+ // Count
+ let total: i64 = count_qb.build_query_scalar().fetch_one(db).await?;
+ let total = total.max(0) as u64;
+
+ // Data query
+ qb.push(" ORDER BY created DESC");
+ qb.push(" LIMIT ");
+ qb.push_bind(filters.limit as i64);
+ qb.push(" OFFSET ");
+ qb.push_bind(filters.offset as i64);
+
+ let rows: Vec<Enforcement> = qb.build_query_as().fetch_all(db).await?;
+
+ Ok(EnforcementSearchResult { rows, total })
+ }
}
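
The `push_condition!` macro repeated across these repositories keeps the data query's and COUNT query's WHERE clauses in lockstep. The same bookkeeping, stripped of sqlx and bind values, looks like this (names are illustrative; the real macro calls `push_bind` on two `QueryBuilder`s instead of inlining condition text):

```rust
/// Mirror one condition into both SQL strings, emitting " WHERE " for the
/// first condition and " AND " for every later one. A simplified stand-in
/// for the push_condition! macro; the real code binds values via sqlx.
fn append_condition(sql: &mut String, count_sql: &mut String, has_where: &mut bool, cond: &str) {
    let joiner = if *has_where { " AND " } else { " WHERE " };
    *has_where = true;
    sql.push_str(joiner);
    sql.push_str(cond);
    count_sql.push_str(joiner);
    count_sql.push_str(cond);
}

fn main() {
    let mut sql = String::from("SELECT id FROM event");
    let mut count = String::from("SELECT COUNT(*) FROM event");
    let mut has_where = false;
    append_condition(&mut sql, &mut count, &mut has_where, "trigger = $1");
    append_condition(&mut sql, &mut count, &mut has_where, "source = $2");
    assert_eq!(sql, "SELECT id FROM event WHERE trigger = $1 AND source = $2");
    assert_eq!(count, "SELECT COUNT(*) FROM event WHERE trigger = $1 AND source = $2");
}
```

Because both builders receive identical clause text and binds in the same order, `total` is guaranteed to count exactly the rows the data query paginates over.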
diff --git a/crates/common/src/repositories/execution.rs b/crates/common/src/repositories/execution.rs
index 64bab7f..224d6a3 100644
--- a/crates/common/src/repositories/execution.rs
+++ b/crates/common/src/repositories/execution.rs
@@ -1,11 +1,71 @@
//! Execution repository for database operations
+use chrono::{DateTime, Utc};
+
use crate::models::{enums::ExecutionStatus, execution::*, Id, JsonDict};
use crate::Result;
use sqlx::{Executor, Postgres, QueryBuilder};
use super::{Create, Delete, FindById, List, Repository, Update};
+/// Filters for the [`ExecutionRepository::search`] query-builder method.
+///
+/// Every field is optional. When set, the corresponding `WHERE` clause is
+/// appended to the query. Pagination (`limit`/`offset`) is always applied.
+///
+/// Filters that involve the `enforcement` table (`rule_ref`, `trigger_ref`)
+/// match against the `LEFT JOIN enforcement` that the query always includes.
+#[derive(Debug, Clone, Default)]
+pub struct ExecutionSearchFilters {
+ pub status: Option<ExecutionStatus>,
+ pub action_ref: Option<String>,
+ pub pack_name: Option<String>,
+ pub rule_ref: Option<String>,
+ pub trigger_ref: Option<String>,
+ pub executor: Option<Id>,
+ pub result_contains: Option<String>,
+ pub enforcement: Option<Id>,
+ pub parent: Option<Id>,
+ pub top_level_only: bool,
+ pub limit: u32,
+ pub offset: u32,
+}
+
+/// Result of [`ExecutionRepository::search`].
+///
+/// Includes the matching rows *and* the total count (before LIMIT/OFFSET)
+/// so the caller can build pagination metadata without a second round-trip.
+#[derive(Debug)]
+pub struct ExecutionSearchResult {
+ pub rows: Vec<ExecutionWithRefs>,
+ pub total: u64,
+}
+
+/// An execution row with optional `rule_ref` / `trigger_ref` populated from
+/// the joined `enforcement` table. This avoids a separate in-memory lookup.
+#[derive(Debug, Clone, sqlx::FromRow)]
+pub struct ExecutionWithRefs {
+ // — execution columns (same order as SELECT_COLUMNS) —
+ pub id: Id,
+ pub action: Option<Id>,
+ pub action_ref: String,
+ pub config: Option<JsonDict>,
+ pub env_vars: Option<JsonDict>,
+ pub parent: Option<Id>,
+ pub enforcement: Option<Id>,
+ pub executor: Option<Id>,
+ pub status: ExecutionStatus,
+ pub result: Option<JsonDict>,
+ pub started_at: Option<DateTime<Utc>>,
+ #[sqlx(json, default)]
+ pub workflow_task: Option,
+ pub created: DateTime,
+ pub updated: DateTime,
+ // — joined from enforcement —
+ pub rule_ref: Option<String>,
+ pub trigger_ref: Option<String>,
+}
+
/// Column list for SELECT queries on the execution table.
///
/// Defined once to avoid drift between queries and the `Execution` model.
@@ -13,7 +73,7 @@ use super::{Create, Delete, FindById, List, Repository, Update};
/// are NOT in the Rust struct, so `SELECT *` must never be used.
pub const SELECT_COLUMNS: &str = "\
id, action, action_ref, config, env_vars, parent, enforcement, \
- executor, status, result, workflow_task, created, updated";
+ executor, status, result, started_at, workflow_task, created, updated";
pub struct ExecutionRepository;
@@ -43,6 +103,7 @@ pub struct UpdateExecutionInput {
pub status: Option,
pub result: Option,
pub executor: Option,
+ pub started_at: Option<DateTime<Utc>>,
pub workflow_task: Option,
}
@@ -52,6 +113,7 @@ impl From for UpdateExecutionInput {
status: Some(execution.status),
result: execution.result,
executor: execution.executor,
+ started_at: execution.started_at,
workflow_task: execution.workflow_task,
}
}
@@ -146,6 +208,13 @@ impl Update for ExecutionRepository {
query.push("executor = ").push_bind(executor_id);
has_updates = true;
}
+ if let Some(started_at) = input.started_at {
+ if has_updates {
+ query.push(", ");
+ }
+ query.push("started_at = ").push_bind(started_at);
+ has_updates = true;
+ }
if let Some(workflow_task) = &input.workflow_task {
if has_updates {
query.push(", ");
@@ -239,4 +308,141 @@ impl ExecutionRepository {
.await
.map_err(Into::into)
}
+
+ /// Search executions with all filters pushed into SQL.
+ ///
+ /// Builds a dynamic query with only the WHERE clauses that apply,
+ /// an always-present LEFT JOIN on `enforcement` (used both by the
+ /// `rule_ref`/`trigger_ref` filters and to populate those columns),
+ /// and proper LIMIT/OFFSET so pagination is server-side.
+ ///
+ /// Returns both the matching page of rows and the total count.
+ pub async fn search<'e, E>(
+ db: E,
+ filters: &ExecutionSearchFilters,
+ ) -> Result<ExecutionSearchResult>
+ where
+ E: Executor<'e, Database = Postgres> + Copy + 'e,
+ {
+ // We always LEFT JOIN enforcement so we can return rule_ref/trigger_ref
+ // on every row without a second round-trip.
+ let prefixed_select = SELECT_COLUMNS
+ .split(", ")
+ .map(|col| format!("e.{col}"))
+ .collect::<Vec<String>>()
+ .join(", ");
+
+ let select_clause = format!(
+ "{prefixed_select}, enf.rule_ref AS rule_ref, enf.trigger_ref AS trigger_ref"
+ );
+
+ let from_clause = "FROM execution e LEFT JOIN enforcement enf ON e.enforcement = enf.id";
+
+ // ── Build WHERE clauses ──────────────────────────────────────────
+ let mut conditions: Vec<String> = Vec::new();
+
+ // `conditions` only tracks how many clauses have been added, so the
+ // macros below know whether to emit " WHERE " or " AND ". Each clause
+ // is pushed into both builders (values via push_bind) so the data
+ // query and the COUNT query share identical WHERE text and bind order.
+
+ // ── Use QueryBuilder for the data query ──────────────────────────
+ let mut qb: QueryBuilder<'_, Postgres> =
+ QueryBuilder::new(format!("SELECT {select_clause} {from_clause}"));
+ let mut count_qb: QueryBuilder<'_, Postgres> =
+ QueryBuilder::new(format!("SELECT COUNT(*) AS total {from_clause}"));
+
+ // Helper: append the same condition to both builders.
+ // The bound value is cloned because push_bind takes ownership.
+ macro_rules! push_condition {
+ ($cond_prefix:expr, $value:expr) => {{
+ let needs_where = conditions.is_empty();
+ conditions.push(String::new()); // just to track count
+ if needs_where {
+ qb.push(" WHERE ");
+ count_qb.push(" WHERE ");
+ } else {
+ qb.push(" AND ");
+ count_qb.push(" AND ");
+ }
+ qb.push($cond_prefix);
+ qb.push_bind($value.clone());
+ count_qb.push($cond_prefix);
+ count_qb.push_bind($value);
+ }};
+ }
+
+ macro_rules! push_raw_condition {
+ ($cond:expr) => {{
+ let needs_where = conditions.is_empty();
+ conditions.push(String::new());
+ if needs_where {
+ qb.push(concat!(" WHERE ", $cond));
+ count_qb.push(concat!(" WHERE ", $cond));
+ } else {
+ qb.push(concat!(" AND ", $cond));
+ count_qb.push(concat!(" AND ", $cond));
+ }
+ }};
+ }
+
+ if let Some(status) = &filters.status {
+ push_condition!("e.status = ", status.clone());
+ }
+ if let Some(action_ref) = &filters.action_ref {
+ push_condition!("e.action_ref = ", action_ref.clone());
+ }
+ if let Some(pack_name) = &filters.pack_name {
+ let pattern = format!("{pack_name}.%");
+ push_condition!("e.action_ref LIKE ", pattern);
+ }
+ if let Some(enforcement_id) = filters.enforcement {
+ push_condition!("e.enforcement = ", enforcement_id);
+ }
+ if let Some(parent_id) = filters.parent {
+ push_condition!("e.parent = ", parent_id);
+ }
+ if filters.top_level_only {
+ push_raw_condition!("e.parent IS NULL");
+ }
+ if let Some(executor_id) = filters.executor {
+ push_condition!("e.executor = ", executor_id);
+ }
+ if let Some(rule_ref) = &filters.rule_ref {
+ push_condition!("enf.rule_ref = ", rule_ref.clone());
+ }
+ if let Some(trigger_ref) = &filters.trigger_ref {
+ push_condition!("enf.trigger_ref = ", trigger_ref.clone());
+ }
+ if let Some(search) = &filters.result_contains {
+ let pattern = format!("%{}%", search.to_lowercase());
+ push_condition!("LOWER(e.result::text) LIKE ", pattern);
+ }
+
+ // ── COUNT query ──────────────────────────────────────────────────
+ let total: i64 = count_qb
+ .build_query_scalar()
+ .fetch_one(db)
+ .await?;
+ let total = total.max(0) as u64;
+
+ // ── Data query with ORDER BY + pagination ────────────────────────
+ qb.push(" ORDER BY e.created DESC");
+ qb.push(" LIMIT ");
+ qb.push_bind(filters.limit as i64);
+ qb.push(" OFFSET ");
+ qb.push_bind(filters.offset as i64);
+
+ let rows: Vec = qb
+ .build_query_as()
+ .fetch_all(db)
+ .await?;
+
+ Ok(ExecutionSearchResult { rows, total })
+ }
}
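
One caveat with the `LIKE` filters (`result_contains` here, `rule_ref` and `query` in the event and action searches): user input is wrapped in `%…%` without escaping `LIKE` metacharacters, so a search for `50%` matches more rows than intended. A possible escaping helper, offered as a suggestion rather than part of this diff (it assumes pairing with `LIKE $n ESCAPE '\'` in the SQL):

```rust
/// Escape %, _ and the escape character itself so user input is matched
/// literally inside a LIKE pattern. Suggested addition, not in the diff;
/// pair with `... LIKE $n ESCAPE '\'` on the SQL side.
fn escape_like(input: &str) -> String {
    let mut out = String::with_capacity(input.len());
    for ch in input.chars() {
        if matches!(ch, '%' | '_' | '\\') {
            out.push('\\');
        }
        out.push(ch);
    }
    out
}

fn main() {
    assert_eq!(escape_like("50%"), "50\\%");
    assert_eq!(escape_like("a_b"), "a\\_b");
    // Build the contains-pattern the way the search methods do.
    let pattern = format!("%{}%", escape_like("50%").to_lowercase());
    assert_eq!(pattern, "%50\\%%");
}
```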
diff --git a/crates/common/src/repositories/inquiry.rs b/crates/common/src/repositories/inquiry.rs
index 972bbf9..87b7f74 100644
--- a/crates/common/src/repositories/inquiry.rs
+++ b/crates/common/src/repositories/inquiry.rs
@@ -7,6 +7,25 @@ use sqlx::{Executor, Postgres, QueryBuilder};
use super::{Create, Delete, FindById, List, Repository, Update};
+/// Filters for [`InquiryRepository::search`].
+///
+/// All fields are optional and combinable (AND). Pagination is always applied.
+#[derive(Debug, Clone, Default)]
+pub struct InquirySearchFilters {
+ pub status: Option,
+ pub execution: Option<Id>,
+ pub assigned_to: Option<Id>,
+ pub limit: u32,
+ pub offset: u32,
+}
+
+/// Result of [`InquiryRepository::search`].
+#[derive(Debug)]
+pub struct InquirySearchResult {
+ pub rows: Vec<Inquiry>,
+ pub total: u64,
+}
+
pub struct InquiryRepository;
impl Repository for InquiryRepository {
@@ -157,4 +176,66 @@ impl InquiryRepository {
"SELECT id, execution, prompt, response_schema, assigned_to, status, response, timeout_at, responded_at, created, updated FROM inquiry WHERE execution = $1 ORDER BY created DESC"
).bind(execution_id).fetch_all(executor).await.map_err(Into::into)
}
+
+ /// Search inquiries with all filters pushed into SQL.
+ ///
+ /// All filter fields are combinable (AND). Pagination is server-side.
+ pub async fn search<'e, E>(db: E, filters: &InquirySearchFilters) -> Result<InquirySearchResult>
+ where
+ E: Executor<'e, Database = Postgres> + Copy + 'e,
+ {
+ let select_cols = "id, execution, prompt, response_schema, assigned_to, status, response, timeout_at, responded_at, created, updated";
+
+ let mut qb: QueryBuilder<'_, Postgres> =
+ QueryBuilder::new(format!("SELECT {select_cols} FROM inquiry"));
+ let mut count_qb: QueryBuilder<'_, Postgres> =
+ QueryBuilder::new("SELECT COUNT(*) FROM inquiry");
+
+ let mut has_where = false;
+
+ macro_rules! push_condition {
+ ($cond_prefix:expr, $value:expr) => {{
+ if !has_where {
+ qb.push(" WHERE ");
+ count_qb.push(" WHERE ");
+ has_where = true;
+ } else {
+ qb.push(" AND ");
+ count_qb.push(" AND ");
+ }
+ qb.push($cond_prefix);
+ qb.push_bind($value.clone());
+ count_qb.push($cond_prefix);
+ count_qb.push_bind($value);
+ }};
+ }
+
+ if let Some(status) = &filters.status {
+ push_condition!("status = ", status.clone());
+ }
+ if let Some(execution_id) = filters.execution {
+ push_condition!("execution = ", execution_id);
+ }
+ if let Some(assigned_to) = filters.assigned_to {
+ push_condition!("assigned_to = ", assigned_to);
+ }
+
+ // Suppress unused-assignment warning from the macro's last expansion.
+ let _ = has_where;
+
+ // Count
+ let total: i64 = count_qb.build_query_scalar().fetch_one(db).await?;
+ let total = total.max(0) as u64;
+
+ // Data query
+ qb.push(" ORDER BY created DESC");
+ qb.push(" LIMIT ");
+ qb.push_bind(filters.limit as i64);
+ qb.push(" OFFSET ");
+ qb.push_bind(filters.offset as i64);
+
+ let rows: Vec<Inquiry> = qb.build_query_as().fetch_all(db).await?;
+
+ Ok(InquirySearchResult { rows, total })
+ }
}
diff --git a/crates/common/src/repositories/key.rs b/crates/common/src/repositories/key.rs
index cd87597..ff88161 100644
--- a/crates/common/src/repositories/key.rs
+++ b/crates/common/src/repositories/key.rs
@@ -6,6 +6,24 @@ use sqlx::{Executor, Postgres, QueryBuilder};
use super::{Create, Delete, FindById, List, Repository, Update};
+/// Filters for [`KeyRepository::search`].
+///
+/// All fields are optional and combinable (AND). Pagination is always applied.
+#[derive(Debug, Clone, Default)]
+pub struct KeySearchFilters {
+ pub owner_type: Option<String>,
+ pub owner: Option<String>,
+ pub limit: u32,
+ pub offset: u32,
+}
+
+/// Result of [`KeyRepository::search`].
+#[derive(Debug)]
+pub struct KeySearchResult {
+ pub rows: Vec<Key>,
+ pub total: u64,
+}
+
pub struct KeyRepository;
impl Repository for KeyRepository {
@@ -165,4 +183,63 @@ impl KeyRepository {
"SELECT id, ref, owner_type, owner, owner_identity, owner_pack, owner_pack_ref, owner_action, owner_action_ref, owner_sensor, owner_sensor_ref, name, encrypted, encryption_key_hash, value, created, updated FROM key WHERE owner_type = $1 ORDER BY ref ASC"
).bind(owner_type).fetch_all(executor).await.map_err(Into::into)
}
+
+ /// Search keys with all filters pushed into SQL.
+ ///
+ /// All filter fields are combinable (AND). Pagination is server-side.
+ pub async fn search<'e, E>(db: E, filters: &KeySearchFilters) -> Result<KeySearchResult>
+ where
+ E: Executor<'e, Database = Postgres> + Copy + 'e,
+ {
+ let select_cols = "id, ref, owner_type, owner, owner_identity, owner_pack, owner_pack_ref, owner_action, owner_action_ref, owner_sensor, owner_sensor_ref, name, encrypted, encryption_key_hash, value, created, updated";
+
+ let mut qb: QueryBuilder<'_, Postgres> =
+ QueryBuilder::new(format!("SELECT {select_cols} FROM key"));
+ let mut count_qb: QueryBuilder<'_, Postgres> =
+ QueryBuilder::new("SELECT COUNT(*) FROM key");
+
+ let mut has_where = false;
+
+ macro_rules! push_condition {
+ ($cond_prefix:expr, $value:expr) => {{
+ if !has_where {
+ qb.push(" WHERE ");
+ count_qb.push(" WHERE ");
+ has_where = true;
+ } else {
+ qb.push(" AND ");
+ count_qb.push(" AND ");
+ }
+ qb.push($cond_prefix);
+ qb.push_bind($value.clone());
+ count_qb.push($cond_prefix);
+ count_qb.push_bind($value);
+ }};
+ }
+
+ if let Some(ref owner_type) = filters.owner_type {
+ push_condition!("owner_type = ", owner_type.clone());
+ }
+ if let Some(ref owner) = filters.owner {
+ push_condition!("owner = ", owner.clone());
+ }
+
+ // Suppress unused-assignment warning from the macro's last expansion.
+ let _ = has_where;
+
+ // Count
+ let total: i64 = count_qb.build_query_scalar().fetch_one(db).await?;
+ let total = total.max(0) as u64;
+
+ // Data query
+ qb.push(" ORDER BY ref ASC");
+ qb.push(" LIMIT ");
+ qb.push_bind(filters.limit as i64);
+ qb.push(" OFFSET ");
+ qb.push_bind(filters.offset as i64);
+
+ let rows: Vec<Key> = qb.build_query_as().fetch_all(db).await?;
+
+ Ok(KeySearchResult { rows, total })
+ }
}
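Reviewer note: the `push_condition!` pattern used throughout this patch keeps the data query and its COUNT twin in lockstep, so `total` always agrees with the filtered row set. A std-only sketch of the same idea, with plain `String`s standing in for sqlx's `QueryBuilder` (`build_queries` is an illustrative helper, not part of the patch):

```rust
/// Build a data query and its COUNT twin with identical WHERE clauses.
/// Each present filter becomes one numbered bind placeholder; pagination
/// is appended only to the data query, mirroring the repository methods.
fn build_queries(owner_type: Option<&str>, owner: Option<&str>) -> (String, String) {
    let mut conditions: Vec<String> = Vec::new();

    if owner_type.is_some() {
        conditions.push(format!("owner_type = ${}", conditions.len() + 1));
    }
    if owner.is_some() {
        conditions.push(format!("owner = ${}", conditions.len() + 1));
    }

    let mut data = String::from("SELECT * FROM key");
    let mut count = String::from("SELECT COUNT(*) FROM key");
    if !conditions.is_empty() {
        let clause = format!(" WHERE {}", conditions.join(" AND "));
        data.push_str(&clause);
        count.push_str(&clause); // same filters on both queries
    }
    // Pagination applies only to the data query.
    data.push_str(&format!(
        " ORDER BY ref ASC LIMIT ${} OFFSET ${}",
        conditions.len() + 1,
        conditions.len() + 2
    ));
    (data, count)
}
```

Because both queries are built from the same condition list, adding a new filter cannot desynchronize the count from the page contents.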
diff --git a/crates/common/src/repositories/rule.rs b/crates/common/src/repositories/rule.rs
index 66dc2d2..42cf10d 100644
--- a/crates/common/src/repositories/rule.rs
+++ b/crates/common/src/repositories/rule.rs
@@ -8,6 +8,30 @@ use sqlx::{Executor, Postgres, QueryBuilder};
use super::{Create, Delete, FindById, FindByRef, List, Repository, Update};
+/// Filters for [`RuleRepository::list_search`].
+///
+/// All fields are optional and combinable (AND). Pagination is always applied.
+#[derive(Debug, Clone, Default)]
+pub struct RuleSearchFilters {
+ /// Filter by pack ID
+ pub pack: Option<Id>,
+ /// Filter by action ID
+ pub action: Option<Id>,
+ /// Filter by trigger ID
+ pub trigger: Option<Id>,
+ /// Filter by enabled status
+ pub enabled: Option<bool>,
+ pub limit: u32,
+ pub offset: u32,
+}
+
+/// Result of [`RuleRepository::list_search`].
+#[derive(Debug)]
+pub struct RuleSearchResult {
+ pub rows: Vec<Rule>,
+ pub total: u64,
+}
+
/// Input for restoring an ad-hoc rule during pack reinstallation.
/// Unlike `CreateRuleInput`, action and trigger IDs are optional because
/// the referenced entities may not exist yet or may have been removed.
@@ -275,6 +299,71 @@ impl Delete for RuleRepository {
}
impl RuleRepository {
+ /// Search rules with all filters pushed into SQL.
+ ///
+ /// All filter fields are combinable (AND). Pagination is server-side.
+ pub async fn list_search<'e, E>(db: E, filters: &RuleSearchFilters) -> Result<RuleSearchResult>
+ where
+ E: Executor<'e, Database = Postgres> + Copy + 'e,
+ {
+ let select_cols = "id, ref, pack, pack_ref, label, description, action, action_ref, trigger, trigger_ref, conditions, action_params, trigger_params, enabled, is_adhoc, created, updated";
+
+ let mut qb: QueryBuilder<'_, Postgres> =
+ QueryBuilder::new(format!("SELECT {select_cols} FROM rule"));
+ let mut count_qb: QueryBuilder<'_, Postgres> =
+ QueryBuilder::new("SELECT COUNT(*) FROM rule");
+
+ let mut has_where = false;
+
+ macro_rules! push_condition {
+ ($cond_prefix:expr, $value:expr) => {{
+ if !has_where {
+ qb.push(" WHERE ");
+ count_qb.push(" WHERE ");
+ has_where = true;
+ } else {
+ qb.push(" AND ");
+ count_qb.push(" AND ");
+ }
+ qb.push($cond_prefix);
+ qb.push_bind($value.clone());
+ count_qb.push($cond_prefix);
+ count_qb.push_bind($value);
+ }};
+ }
+
+ if let Some(pack_id) = filters.pack {
+ push_condition!("pack = ", pack_id);
+ }
+ if let Some(action_id) = filters.action {
+ push_condition!("action = ", action_id);
+ }
+ if let Some(trigger_id) = filters.trigger {
+ push_condition!("trigger = ", trigger_id);
+ }
+ if let Some(enabled) = filters.enabled {
+ push_condition!("enabled = ", enabled);
+ }
+
+ // Suppress unused-assignment warning from the macro's last expansion.
+ let _ = has_where;
+
+ // Count
+ let total: i64 = count_qb.build_query_scalar().fetch_one(db).await?;
+ let total = total.max(0) as u64;
+
+ // Data query
+ qb.push(" ORDER BY ref ASC");
+ qb.push(" LIMIT ");
+ qb.push_bind(filters.limit as i64);
+ qb.push(" OFFSET ");
+ qb.push_bind(filters.offset as i64);
+
+ let rows: Vec<Rule> = qb.build_query_as().fetch_all(db).await?;
+
+ Ok(RuleSearchResult { rows, total })
+ }
+
/// Find rules by pack ID
pub async fn find_by_pack<'e, E>(executor: E, pack_id: Id) -> Result<Vec<Rule>>
where
diff --git a/crates/common/src/repositories/trigger.rs b/crates/common/src/repositories/trigger.rs
index e11a4b6..20eb2ba 100644
--- a/crates/common/src/repositories/trigger.rs
+++ b/crates/common/src/repositories/trigger.rs
@@ -9,6 +9,56 @@ use sqlx::{Executor, Postgres, QueryBuilder};
use super::{Create, Delete, FindById, FindByRef, List, Repository, Update};
+// ============================================================================
+// Trigger Search
+// ============================================================================
+
+/// Filters for [`TriggerRepository::list_search`].
+///
+/// All fields are optional and combinable (AND). Pagination is always applied.
+#[derive(Debug, Clone, Default)]
+pub struct TriggerSearchFilters {
+ /// Filter by pack ID
+ pub pack: Option<Id>,
+ /// Filter by enabled status
+ pub enabled: Option<bool>,
+ pub limit: u32,
+ pub offset: u32,
+}
+
+/// Result of [`TriggerRepository::list_search`].
+#[derive(Debug)]
+pub struct TriggerSearchResult {
+ pub rows: Vec<Trigger>,
+ pub total: u64,
+}
+
+// ============================================================================
+// Sensor Search
+// ============================================================================
+
+/// Filters for [`SensorRepository::list_search`].
+///
+/// All fields are optional and combinable (AND). Pagination is always applied.
+#[derive(Debug, Clone, Default)]
+pub struct SensorSearchFilters {
+ /// Filter by pack ID
+ pub pack: Option<Id>,
+ /// Filter by trigger ID
+ pub trigger: Option<Id>,
+ /// Filter by enabled status
+ pub enabled: Option<bool>,
+ pub limit: u32,
+ pub offset: u32,
+}
+
+/// Result of [`SensorRepository::list_search`].
+#[derive(Debug)]
+pub struct SensorSearchResult {
+ pub rows: Vec<Sensor>,
+ pub total: u64,
+}
+
/// Repository for Trigger operations
pub struct TriggerRepository;
@@ -251,6 +301,68 @@ impl Delete for TriggerRepository {
}
impl TriggerRepository {
+ /// Search triggers with all filters pushed into SQL.
+ ///
+ /// All filter fields are combinable (AND). Pagination is server-side.
+ pub async fn list_search<'e, E>(
+ db: E,
+ filters: &TriggerSearchFilters,
+ ) -> Result<TriggerSearchResult>
+ where
+ E: Executor<'e, Database = Postgres> + Copy + 'e,
+ {
+ let select_cols = "id, ref, pack, pack_ref, label, description, enabled, param_schema, out_schema, webhook_enabled, webhook_key, webhook_config, is_adhoc, created, updated";
+
+ let mut qb: QueryBuilder<'_, Postgres> =
+ QueryBuilder::new(format!("SELECT {select_cols} FROM trigger"));
+ let mut count_qb: QueryBuilder<'_, Postgres> =
+ QueryBuilder::new("SELECT COUNT(*) FROM trigger");
+
+ let mut has_where = false;
+
+ macro_rules! push_condition {
+ ($cond_prefix:expr, $value:expr) => {{
+ if !has_where {
+ qb.push(" WHERE ");
+ count_qb.push(" WHERE ");
+ has_where = true;
+ } else {
+ qb.push(" AND ");
+ count_qb.push(" AND ");
+ }
+ qb.push($cond_prefix);
+ qb.push_bind($value.clone());
+ count_qb.push($cond_prefix);
+ count_qb.push_bind($value);
+ }};
+ }
+
+ if let Some(pack_id) = filters.pack {
+ push_condition!("pack = ", pack_id);
+ }
+ if let Some(enabled) = filters.enabled {
+ push_condition!("enabled = ", enabled);
+ }
+
+ // Suppress unused-assignment warning from the macro's last expansion.
+ let _ = has_where;
+
+ // Count
+ let total: i64 = count_qb.build_query_scalar().fetch_one(db).await?;
+ let total = total.max(0) as u64;
+
+ // Data query
+ qb.push(" ORDER BY ref ASC");
+ qb.push(" LIMIT ");
+ qb.push_bind(filters.limit as i64);
+ qb.push(" OFFSET ");
+ qb.push_bind(filters.offset as i64);
+
+ let rows: Vec<Trigger> = qb.build_query_as().fetch_all(db).await?;
+
+ Ok(TriggerSearchResult { rows, total })
+ }
+
/// Find triggers by pack ID
pub async fn find_by_pack<'e, E>(executor: E, pack_id: Id) -> Result<Vec<Trigger>>
where
@@ -795,6 +907,71 @@ impl Delete for SensorRepository {
}
impl SensorRepository {
+ /// Search sensors with all filters pushed into SQL.
+ ///
+ /// All filter fields are combinable (AND). Pagination is server-side.
+ pub async fn list_search<'e, E>(
+ db: E,
+ filters: &SensorSearchFilters,
+ ) -> Result<SensorSearchResult>
+ where
+ E: Executor<'e, Database = Postgres> + Copy + 'e,
+ {
+ let select_cols = "id, ref, pack, pack_ref, label, description, entrypoint, runtime, runtime_ref, runtime_version_constraint, trigger, trigger_ref, enabled, param_schema, config, created, updated";
+
+ let mut qb: QueryBuilder<'_, Postgres> =
+ QueryBuilder::new(format!("SELECT {select_cols} FROM sensor"));
+ let mut count_qb: QueryBuilder<'_, Postgres> =
+ QueryBuilder::new("SELECT COUNT(*) FROM sensor");
+
+ let mut has_where = false;
+
+ macro_rules! push_condition {
+ ($cond_prefix:expr, $value:expr) => {{
+ if !has_where {
+ qb.push(" WHERE ");
+ count_qb.push(" WHERE ");
+ has_where = true;
+ } else {
+ qb.push(" AND ");
+ count_qb.push(" AND ");
+ }
+ qb.push($cond_prefix);
+ qb.push_bind($value.clone());
+ count_qb.push($cond_prefix);
+ count_qb.push_bind($value);
+ }};
+ }
+
+ if let Some(pack_id) = filters.pack {
+ push_condition!("pack = ", pack_id);
+ }
+ if let Some(trigger_id) = filters.trigger {
+ push_condition!("trigger = ", trigger_id);
+ }
+ if let Some(enabled) = filters.enabled {
+ push_condition!("enabled = ", enabled);
+ }
+
+ // Suppress unused-assignment warning from the macro's last expansion.
+ let _ = has_where;
+
+ // Count
+ let total: i64 = count_qb.build_query_scalar().fetch_one(db).await?;
+ let total = total.max(0) as u64;
+
+ // Data query
+ qb.push(" ORDER BY ref ASC");
+ qb.push(" LIMIT ");
+ qb.push_bind(filters.limit as i64);
+ qb.push(" OFFSET ");
+ qb.push_bind(filters.offset as i64);
+
+ let rows: Vec<Sensor> = qb.build_query_as().fetch_all(db).await?;
+
+ Ok(SensorSearchResult { rows, total })
+ }
+
/// Find sensors by trigger ID
pub async fn find_by_trigger<'e, E>(executor: E, trigger_id: Id) -> Result<Vec<Sensor>>
where
diff --git a/crates/common/src/repositories/workflow.rs b/crates/common/src/repositories/workflow.rs
index 29ba26e..63374ae 100644
--- a/crates/common/src/repositories/workflow.rs
+++ b/crates/common/src/repositories/workflow.rs
@@ -6,6 +6,37 @@ use sqlx::{Executor, Postgres, QueryBuilder};
use super::{Create, Delete, FindById, FindByRef, List, Repository, Update};
+// ============================================================================
+// Workflow Definition Search
+// ============================================================================
+
+/// Filters for [`WorkflowDefinitionRepository::list_search`].
+///
+/// All fields are optional and combinable (AND). Pagination is always applied.
+/// Tag filtering uses `ANY(tags)` for each tag (OR across tags, AND with other filters).
+#[derive(Debug, Clone, Default)]
+pub struct WorkflowSearchFilters {
+ /// Filter by pack ID
+ pub pack: Option<Id>,
+ /// Filter by pack reference
+ pub pack_ref: Option<String>,
+ /// Filter by enabled status
+ pub enabled: Option<bool>,
+ /// Filter by tags (OR across tags — matches if any tag is present)
+ pub tags: Option<Vec<String>>,
+ /// Text search across label and description (case-insensitive substring)
+ pub search: Option<String>,
+ pub limit: u32,
+ pub offset: u32,
+}
+
+/// Result of [`WorkflowDefinitionRepository::list_search`].
+#[derive(Debug)]
+pub struct WorkflowSearchResult {
+ pub rows: Vec<WorkflowDefinition>,
+ pub total: u64,
+}
+
// ============================================================================
// WORKFLOW DEFINITION REPOSITORY
// ============================================================================
@@ -226,6 +257,102 @@ impl Delete for WorkflowDefinitionRepository {
}
impl WorkflowDefinitionRepository {
+ /// Search workflow definitions with all filters pushed into SQL.
+ ///
+ /// All filter fields are combinable (AND). Pagination is server-side.
+ /// Tags use an OR match — a workflow matches if it contains ANY of the
+ /// requested tags (via `tags && ARRAY[...]`).
+ pub async fn list_search<'e, E>(
+ db: E,
+ filters: &WorkflowSearchFilters,
+ ) -> Result<WorkflowSearchResult>
+ where
+ E: Executor<'e, Database = Postgres> + Copy + 'e,
+ {
+ let select_cols = "id, ref, pack, pack_ref, label, description, version, param_schema, out_schema, definition, tags, enabled, created, updated";
+
+ let mut qb: QueryBuilder<'_, Postgres> =
+ QueryBuilder::new(format!("SELECT {select_cols} FROM workflow_definition"));
+ let mut count_qb: QueryBuilder<'_, Postgres> =
+ QueryBuilder::new("SELECT COUNT(*) FROM workflow_definition");
+
+ let mut has_where = false;
+
+ macro_rules! push_condition {
+ ($cond_prefix:expr, $value:expr) => {{
+ if !has_where {
+ qb.push(" WHERE ");
+ count_qb.push(" WHERE ");
+ has_where = true;
+ } else {
+ qb.push(" AND ");
+ count_qb.push(" AND ");
+ }
+ qb.push($cond_prefix);
+ qb.push_bind($value.clone());
+ count_qb.push($cond_prefix);
+ count_qb.push_bind($value);
+ }};
+ }
+
+ if let Some(pack_id) = filters.pack {
+ push_condition!("pack = ", pack_id);
+ }
+ if let Some(ref pack_ref) = filters.pack_ref {
+ push_condition!("pack_ref = ", pack_ref.clone());
+ }
+ if let Some(enabled) = filters.enabled {
+ push_condition!("enabled = ", enabled);
+ }
+ if let Some(ref tags) = filters.tags {
+ if !tags.is_empty() {
+ // Use PostgreSQL array overlap operator: tags && ARRAY[...]
+ push_condition!("tags && ", tags.clone());
+ }
+ }
+ if let Some(ref search) = filters.search {
+ let pattern = format!("%{}%", search.to_lowercase());
+ // Search needs an OR across multiple columns, wrapped in parens
+ if !has_where {
+ qb.push(" WHERE ");
+ count_qb.push(" WHERE ");
+ has_where = true;
+ } else {
+ qb.push(" AND ");
+ count_qb.push(" AND ");
+ }
+ qb.push("(LOWER(label) LIKE ");
+ qb.push_bind(pattern.clone());
+ qb.push(" OR LOWER(COALESCE(description, '')) LIKE ");
+ qb.push_bind(pattern.clone());
+ qb.push(")");
+
+ count_qb.push("(LOWER(label) LIKE ");
+ count_qb.push_bind(pattern.clone());
+ count_qb.push(" OR LOWER(COALESCE(description, '')) LIKE ");
+ count_qb.push_bind(pattern);
+ count_qb.push(")");
+ }
+
+ // Suppress unused-assignment warning from the macro's last expansion.
+ let _ = has_where;
+
+ // Count
+ let total: i64 = count_qb.build_query_scalar().fetch_one(db).await?;
+ let total = total.max(0) as u64;
+
+ // Data query
+ qb.push(" ORDER BY label ASC");
+ qb.push(" LIMIT ");
+ qb.push_bind(filters.limit as i64);
+ qb.push(" OFFSET ");
+ qb.push_bind(filters.offset as i64);
+
+ let rows: Vec<WorkflowDefinition> = qb.build_query_as().fetch_all(db).await?;
+
+ Ok(WorkflowSearchResult { rows, total })
+ }
+
/// Find all workflows for a specific pack by pack ID
pub async fn find_by_pack<'e, E>(executor: E, pack_id: Id) -> Result<Vec<WorkflowDefinition>>
where
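Reviewer note on two details of the workflow search, sketched std-only (`like_pattern` and `tags_overlap` are illustrative helpers, not part of the patch): the text filter lowercases and wraps the user string for `LIKE`, which means `%` and `_` in the input behave as wildcards rather than literals, and the tag filter matches when the arrays share any element, mirroring Postgres's `&&` overlap operator.

```rust
/// Pattern bound to `LOWER(label) LIKE $n`. `%` and `_` in `input` are
/// not escaped, so they act as LIKE wildcards, not literal characters.
fn like_pattern(input: &str) -> String {
    format!("%{}%", input.to_lowercase())
}

/// OR-across-tags semantics of `tags && ARRAY[...]`: true when the
/// workflow's tags share at least one element with the requested tags.
fn tags_overlap(workflow_tags: &[&str], requested: &[&str]) -> bool {
    workflow_tags.iter().any(|t| requested.contains(t))
}
```

Escaping `%`/`_` before binding would be a possible follow-up if literal matching is wanted.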
diff --git a/crates/common/src/workflow/expression/ast.rs b/crates/common/src/workflow/expression/ast.rs
new file mode 100644
index 0000000..170c1e8
--- /dev/null
+++ b/crates/common/src/workflow/expression/ast.rs
@@ -0,0 +1,112 @@
+//! # Expression AST
+//!
+//! Defines the abstract syntax tree nodes produced by the parser and consumed
+//! by the evaluator.
+
+use std::fmt;
+
+/// A binary operator connecting two sub-expressions.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum BinaryOp {
+ // Arithmetic
+ Add,
+ Sub,
+ Mul,
+ Div,
+ Mod,
+ // Comparison
+ Eq,
+ Ne,
+ Lt,
+ Gt,
+ Le,
+ Ge,
+ // Logical
+ And,
+ Or,
+ // Membership
+ In,
+}
+
+impl fmt::Display for BinaryOp {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ match self {
+ BinaryOp::Add => write!(f, "+"),
+ BinaryOp::Sub => write!(f, "-"),
+ BinaryOp::Mul => write!(f, "*"),
+ BinaryOp::Div => write!(f, "/"),
+ BinaryOp::Mod => write!(f, "%"),
+ BinaryOp::Eq => write!(f, "=="),
+ BinaryOp::Ne => write!(f, "!="),
+ BinaryOp::Lt => write!(f, "<"),
+ BinaryOp::Gt => write!(f, ">"),
+ BinaryOp::Le => write!(f, "<="),
+ BinaryOp::Ge => write!(f, ">="),
+ BinaryOp::And => write!(f, "and"),
+ BinaryOp::Or => write!(f, "or"),
+ BinaryOp::In => write!(f, "in"),
+ }
+ }
+}
+
+/// A unary operator applied to a single sub-expression.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum UnaryOp {
+ /// Arithmetic negation: `-x`
+ Neg,
+ /// Logical negation: `not x`
+ Not,
+}
+
+impl fmt::Display for UnaryOp {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ match self {
+ UnaryOp::Neg => write!(f, "-"),
+ UnaryOp::Not => write!(f, "not"),
+ }
+ }
+}
+
+/// An expression AST node.
+#[derive(Debug, Clone, PartialEq)]
+pub enum Expr {
+ /// A literal JSON value: number, string, bool, or null.
+ Literal(serde_json::Value),
+
+ /// An array literal: `[expr, expr, ...]`
+ Array(Vec<Expr>),
+
+ /// A variable reference by name (e.g., `x`, `parameters`, `item`).
+ Ident(String),
+
+ /// Binary operation: `left op right`
+ BinaryOp {
+ op: BinaryOp,
+ left: Box<Expr>,
+ right: Box<Expr>,
+ },
+
+ /// Unary operation: `op operand`
+ UnaryOp {
+ op: UnaryOp,
+ operand: Box<Expr>,
+ },
+
+ /// Property access: `expr.field`
+ DotAccess {
+ object: Box<Expr>,
+ field: String,
+ },
+
+ /// Index/bracket access: `expr[index_expr]`
+ IndexAccess {
+ object: Box<Expr>,
+ index: Box<Expr>,
+ },
+
+ /// Function call: `name(arg1, arg2, ...)`
+ FunctionCall {
+ name: String,
+ args: Vec<Expr>,
+ },
+}
diff --git a/crates/common/src/workflow/expression/evaluator.rs b/crates/common/src/workflow/expression/evaluator.rs
new file mode 100644
index 0000000..406c1a7
--- /dev/null
+++ b/crates/common/src/workflow/expression/evaluator.rs
@@ -0,0 +1,1316 @@
+//! # Expression Evaluator
+//!
+//! Walks the AST and produces a `JsonValue` result.
+
+use super::ast::{BinaryOp, Expr, UnaryOp};
+use regex::Regex;
+use serde_json::{json, Value as JsonValue};
+use thiserror::Error;
+
+/// Result type for evaluation operations.
+pub type EvalResult<T = JsonValue> = Result<T, EvalError>;
+
+/// Errors that can occur during expression evaluation.
+#[derive(Debug, Error)]
+pub enum EvalError {
+ #[error("Variable not found: {0}")]
+ VariableNotFound(String),
+
+ #[error("Type error: {0}")]
+ TypeError(String),
+
+ #[error("Division by zero")]
+ DivisionByZero,
+
+ #[error("Index out of bounds: {0}")]
+ IndexOutOfBounds(String),
+
+ #[error("Unknown function: {0}")]
+ UnknownFunction(String),
+
+ #[error("Wrong number of arguments for {0}: expected {1}, got {2}")]
+ WrongArgCount(String, String, usize),
+
+ #[error("Parse error: {0}")]
+ ParseError(String),
+
+ #[error("Regex error: {0}")]
+ RegexError(String),
+}
+
+/// Trait for resolving variables and workflow-specific functions from
+/// the execution context.
+pub trait EvalContext {
+ /// Resolve a top-level variable name to its JSON value.
+ fn resolve_variable(&self, name: &str) -> EvalResult;
+
+ /// Try to call a workflow-specific function (e.g., `result()`, `succeeded()`).
+ /// Return `Ok(Some(value))` if handled, `Ok(None)` if not recognized.
+ fn call_workflow_function(
+ &self,
+ name: &str,
+ args: &[JsonValue],
+ ) -> EvalResult<Option<JsonValue>>;
+}
+
+/// Evaluate an AST expression against the given context.
+pub fn eval(expr: &Expr, ctx: &dyn EvalContext) -> EvalResult {
+ match expr {
+ Expr::Literal(v) => Ok(v.clone()),
+
+ Expr::Array(elements) => {
+ let mut arr = Vec::with_capacity(elements.len());
+ for elem in elements {
+ arr.push(eval(elem, ctx)?);
+ }
+ Ok(JsonValue::Array(arr))
+ }
+
+ Expr::Ident(name) => ctx.resolve_variable(name),
+
+ Expr::BinaryOp { op, left, right } => {
+ // Short-circuit for `and` / `or`
+ if *op == BinaryOp::And {
+ let lv = eval(left, ctx)?;
+ if !is_truthy(&lv) {
+ return Ok(json!(false));
+ }
+ let rv = eval(right, ctx)?;
+ return Ok(json!(is_truthy(&rv)));
+ }
+ if *op == BinaryOp::Or {
+ let lv = eval(left, ctx)?;
+ if is_truthy(&lv) {
+ return Ok(json!(true));
+ }
+ let rv = eval(right, ctx)?;
+ return Ok(json!(is_truthy(&rv)));
+ }
+
+ let lv = eval(left, ctx)?;
+ let rv = eval(right, ctx)?;
+ eval_binary_op(*op, &lv, &rv)
+ }
+
+ Expr::UnaryOp { op, operand } => {
+ let v = eval(operand, ctx)?;
+ eval_unary_op(*op, &v)
+ }
+
+ Expr::DotAccess { object, field } => {
+ let obj = eval(object, ctx)?;
+ dot_access(&obj, field)
+ }
+
+ Expr::IndexAccess { object, index } => {
+ let obj = eval(object, ctx)?;
+ let idx = eval(index, ctx)?;
+ index_access(&obj, &idx)
+ }
+
+ Expr::FunctionCall { name, args } => {
+ // First, try workflow-specific functions (result(), succeeded(), etc.).
+ // Arguments are evaluated eagerly before dispatch.
+ let evaluated_args: Vec<JsonValue> = args
+ .iter()
+ .map(|a| eval(a, ctx))
+ .collect::<EvalResult<Vec<JsonValue>>>()?;
+
+ if let Some(val) = ctx.call_workflow_function(name, &evaluated_args)? {
+ return Ok(val);
+ }
+
+ // Built-in functions
+ eval_builtin_function(name, &evaluated_args)
+ }
+ }
+}
+
+// ---------------------------------------------------------------
+// Truthiness
+// ---------------------------------------------------------------
+
+/// Determine if a JSON value is "truthy" (Python-like semantics).
+pub fn is_truthy(v: &JsonValue) -> bool {
+ match v {
+ JsonValue::Null => false,
+ JsonValue::Bool(b) => *b,
+ JsonValue::Number(n) => {
+ if let Some(i) = n.as_i64() {
+ i != 0
+ } else if let Some(f) = n.as_f64() {
+ f != 0.0
+ } else {
+ true
+ }
+ }
+ JsonValue::String(s) => !s.is_empty(),
+ JsonValue::Array(a) => !a.is_empty(),
+ JsonValue::Object(o) => !o.is_empty(),
+ }
+}
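The truthiness rules above (null, false, 0, "", and empty collections are falsy) can be mirrored std-only with a minimal JSON-like enum. `Val` and `truthy` are illustrative; the real function operates on `serde_json::Value`:

```rust
/// Minimal JSON-like value for illustrating the Python-like truthiness
/// rules used by the evaluator.
enum Val {
    Null,
    Bool(bool),
    Int(i64),
    Str(String),
    List(Vec<Val>),
}

fn truthy(v: &Val) -> bool {
    match v {
        Val::Null => false,
        Val::Bool(b) => *b,
        Val::Int(i) => *i != 0,        // 0 is falsy
        Val::Str(s) => !s.is_empty(),  // "" is falsy
        Val::List(l) => !l.is_empty(), // [] is falsy
    }
}
```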
+
+// ---------------------------------------------------------------
+// Binary operations
+// ---------------------------------------------------------------
+
+fn eval_binary_op(op: BinaryOp, left: &JsonValue, right: &JsonValue) -> EvalResult {
+ match op {
+ // Arithmetic
+ BinaryOp::Add => eval_add(left, right),
+ BinaryOp::Sub => eval_arithmetic(left, right, |a, b| a - b, |a, b| a - b, "-"),
+ BinaryOp::Mul => eval_arithmetic(left, right, |a, b| a * b, |a, b| a * b, "*"),
+ BinaryOp::Div => eval_div(left, right),
+ BinaryOp::Mod => eval_mod(left, right),
+
+ // Comparison
+ BinaryOp::Eq => Ok(json!(json_eq(left, right))),
+ BinaryOp::Ne => Ok(json!(!json_eq(left, right))),
+ BinaryOp::Lt => eval_ordering(left, right, |o| o == std::cmp::Ordering::Less),
+ BinaryOp::Gt => eval_ordering(left, right, |o| o == std::cmp::Ordering::Greater),
+ BinaryOp::Le => eval_ordering(left, right, |o| o != std::cmp::Ordering::Greater),
+ BinaryOp::Ge => eval_ordering(left, right, |o| o != std::cmp::Ordering::Less),
+
+ // Membership
+ BinaryOp::In => eval_in(left, right),
+
+ // And/Or handled in eval() with short-circuit
+ BinaryOp::And | BinaryOp::Or => unreachable!(),
+ }
+}
+
+fn eval_add(left: &JsonValue, right: &JsonValue) -> EvalResult {
+ // String concatenation
+ if left.is_string() && right.is_string() {
+ let l = left.as_str().unwrap();
+ let r = right.as_str().unwrap();
+ return Ok(json!(format!("{}{}", l, r)));
+ }
+
+ // Array concatenation
+ if left.is_array() && right.is_array() {
+ let mut result = left.as_array().unwrap().clone();
+ result.extend(right.as_array().unwrap().iter().cloned());
+ return Ok(JsonValue::Array(result));
+ }
+
+ // Numeric addition
+ eval_arithmetic(left, right, |a, b| a + b, |a, b| a + b, "+")
+}
+
+fn eval_arithmetic(
+ left: &JsonValue,
+ right: &JsonValue,
+ int_op: impl Fn(i64, i64) -> i64,
+ float_op: impl Fn(f64, f64) -> f64,
+ op_name: &str,
+) -> EvalResult {
+ match (as_numeric(left), as_numeric(right)) {
+ (Some(NumericValue::Int(a)), Some(NumericValue::Int(b))) => Ok(json!(int_op(a, b))),
+ (Some(a), Some(b)) => Ok(json!(float_op(a.as_f64(), b.as_f64()))),
+ _ => Err(EvalError::TypeError(format!(
+ "Cannot apply '{}' to {} and {}",
+ op_name,
+ type_name(left),
+ type_name(right)
+ ))),
+ }
+}
+
+fn eval_div(left: &JsonValue, right: &JsonValue) -> EvalResult {
+ match (as_numeric(left), as_numeric(right)) {
+ (Some(_), Some(b)) if b.as_f64() == 0.0 => Err(EvalError::DivisionByZero),
+ (Some(NumericValue::Int(a)), Some(NumericValue::Int(b))) => {
+ // Integer division stays integer if divisible
+ if a % b == 0 {
+ Ok(json!(a / b))
+ } else {
+ Ok(json!(a as f64 / b as f64))
+ }
+ }
+ (Some(a), Some(b)) => Ok(json!(a.as_f64() / b.as_f64())),
+ _ => Err(EvalError::TypeError(format!(
+ "Cannot apply '/' to {} and {}",
+ type_name(left),
+ type_name(right)
+ ))),
+ }
+}
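`eval_div` keeps integer division integral only when it is exact, promoting to float otherwise, and rejects a zero divisor first. A std-only sketch of that rule, with an illustrative `Num` enum standing in for the evaluator's numeric handling:

```rust
/// Division rule: int / int stays an integer when evenly divisible,
/// otherwise promotes to float; a zero divisor is an error (None here,
/// DivisionByZero in the evaluator).
enum Num {
    Int(i64),
    Float(f64),
}

fn div_ints(a: i64, b: i64) -> Option<Num> {
    if b == 0 {
        return None;
    }
    if a % b == 0 {
        Some(Num::Int(a / b))
    } else {
        Some(Num::Float(a as f64 / b as f64))
    }
}
```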
+
+fn eval_mod(left: &JsonValue, right: &JsonValue) -> EvalResult {
+ match (as_numeric(left), as_numeric(right)) {
+ (Some(_), Some(b)) if b.as_f64() == 0.0 => Err(EvalError::DivisionByZero),
+ (Some(NumericValue::Int(a)), Some(NumericValue::Int(b))) => Ok(json!(a % b)),
+ (Some(a), Some(b)) => Ok(json!(a.as_f64() % b.as_f64())),
+ _ => Err(EvalError::TypeError(format!(
+ "Cannot apply '%' to {} and {}",
+ type_name(left),
+ type_name(right)
+ ))),
+ }
+}
+
+// ---------------------------------------------------------------
+// Comparison helpers
+// ---------------------------------------------------------------
+
+/// Deep equality that allows int/float cross-comparison.
+fn json_eq(a: &JsonValue, b: &JsonValue) -> bool {
+ match (a, b) {
+ (JsonValue::Null, JsonValue::Null) => true,
+ (JsonValue::Bool(a), JsonValue::Bool(b)) => a == b,
+ (JsonValue::Number(_), JsonValue::Number(_)) => {
+ // Allow int/float comparison
+ match (as_numeric(a), as_numeric(b)) {
+ (Some(a), Some(b)) => a.as_f64() == b.as_f64(),
+ _ => false,
+ }
+ }
+ (JsonValue::String(a), JsonValue::String(b)) => a == b,
+ (JsonValue::Array(a), JsonValue::Array(b)) => {
+ if a.len() != b.len() {
+ return false;
+ }
+ a.iter().zip(b.iter()).all(|(x, y)| json_eq(x, y))
+ }
+ (JsonValue::Object(a), JsonValue::Object(b)) => {
+ if a.len() != b.len() {
+ return false;
+ }
+ a.iter()
+ .all(|(k, v)| b.get(k).map_or(false, |bv| json_eq(v, bv)))
+ }
+ // Different types (other than number cross-compare) are never equal
+ _ => false,
+ }
+}
+
+fn eval_ordering(
+ left: &JsonValue,
+ right: &JsonValue,
+ predicate: impl Fn(std::cmp::Ordering) -> bool,
+) -> EvalResult {
+ // Number comparison (int/float cross-allowed)
+ if let (Some(a), Some(b)) = (as_numeric(left), as_numeric(right)) {
+ let af = a.as_f64();
+ let bf = b.as_f64();
+ let ord = af.partial_cmp(&bf).unwrap_or(std::cmp::Ordering::Equal);
+ return Ok(json!(predicate(ord)));
+ }
+
+ // String comparison
+ if let (Some(a), Some(b)) = (left.as_str(), right.as_str()) {
+ return Ok(json!(predicate(a.cmp(b))));
+ }
+
+ // List comparison (lexicographic)
+ if let (Some(a), Some(b)) = (left.as_array(), right.as_array()) {
+ let ord = compare_arrays(a, b)?;
+ return Ok(json!(predicate(ord)));
+ }
+
+ Err(EvalError::TypeError(format!(
+ "Cannot compare {} and {} with ordering operators",
+ type_name(left),
+ type_name(right)
+ )))
+}
+
+fn compare_arrays(a: &[JsonValue], b: &[JsonValue]) -> EvalResult<std::cmp::Ordering> {
+ for (x, y) in a.iter().zip(b.iter()) {
+ if let (Some(xn), Some(yn)) = (as_numeric(x), as_numeric(y)) {
+ let ord = xn
+ .as_f64()
+ .partial_cmp(&yn.as_f64())
+ .unwrap_or(std::cmp::Ordering::Equal);
+ if ord != std::cmp::Ordering::Equal {
+ return Ok(ord);
+ }
+ } else if let (Some(xs), Some(ys)) = (x.as_str(), y.as_str()) {
+ let ord = xs.cmp(ys);
+ if ord != std::cmp::Ordering::Equal {
+ return Ok(ord);
+ }
+ } else {
+ return Err(EvalError::TypeError(
+ "Cannot compare heterogeneous array elements for ordering".to_string(),
+ ));
+ }
+ }
+ Ok(a.len().cmp(&b.len()))
+}
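For homogeneous numeric arrays, the ordering above (element-wise first, then by length when one array is a prefix of the other) coincides with Rust's built-in lexicographic slice ordering, as this std-only check illustrates (`lex_cmp` is an illustrative wrapper):

```rust
use std::cmp::Ordering;

/// Lexicographic comparison for integer slices: element-wise, then by
/// length when one slice is a prefix of the other. This is the same
/// rule compare_arrays applies to homogeneous numeric JSON arrays.
fn lex_cmp(a: &[i64], b: &[i64]) -> Ordering {
    a.cmp(b)
}
```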
+
+fn eval_in(needle: &JsonValue, haystack: &JsonValue) -> EvalResult {
+ match haystack {
+ JsonValue::Array(arr) => Ok(json!(arr.iter().any(|item| json_eq(needle, item)))),
+ JsonValue::Object(obj) => {
+ if let Some(key) = needle.as_str() {
+ Ok(json!(obj.contains_key(key)))
+ } else {
+ Err(EvalError::TypeError(
+ "Only string keys can be tested for membership in objects".to_string(),
+ ))
+ }
+ }
+ JsonValue::String(s) => {
+ if let Some(sub) = needle.as_str() {
+ Ok(json!(s.contains(sub)))
+ } else {
+ Err(EvalError::TypeError(
+ "Only strings can be tested for substring membership".to_string(),
+ ))
+ }
+ }
+ _ => Err(EvalError::TypeError(format!(
+ "'in' requires array, object, or string on right side, got {}",
+ type_name(haystack)
+ ))),
+ }
+}
+
+// ---------------------------------------------------------------
+// Unary operations
+// ---------------------------------------------------------------
+
+fn eval_unary_op(op: UnaryOp, val: &JsonValue) -> EvalResult {
+ match op {
+ UnaryOp::Neg => {
+ if let Some(n) = as_numeric(val) {
+ match n {
+ NumericValue::Int(i) => Ok(json!(-i)),
+ NumericValue::Float(f) => Ok(json!(-f)),
+ }
+ } else {
+ Err(EvalError::TypeError(format!(
+ "Cannot negate {}",
+ type_name(val)
+ )))
+ }
+ }
+ UnaryOp::Not => Ok(json!(!is_truthy(val))),
+ }
+}
+
+// ---------------------------------------------------------------
+// Property / index access
+// ---------------------------------------------------------------
+
+fn dot_access(obj: &JsonValue, field: &str) -> EvalResult {
+ match obj {
+ JsonValue::Object(map) => map
+ .get(field)
+ .cloned()
+ .ok_or_else(|| EvalError::VariableNotFound(format!("field '{}'", field))),
+ _ => Err(EvalError::TypeError(format!(
+ "Cannot access property '{}' on {}",
+ field,
+ type_name(obj)
+ ))),
+ }
+}
+
+fn index_access(obj: &JsonValue, index: &JsonValue) -> EvalResult {
+ match obj {
+ JsonValue::Array(arr) => {
+ if let Some(i) = index.as_i64() {
+ let i = if i < 0 {
+ // Negative indexing
+ (arr.len() as i64 + i) as usize
+ } else {
+ i as usize
+ };
+ arr.get(i)
+ .cloned()
+ .ok_or_else(|| EvalError::IndexOutOfBounds(format!("{}", i)))
+ } else {
+ Err(EvalError::TypeError(
+ "Array index must be an integer".to_string(),
+ ))
+ }
+ }
+ JsonValue::Object(map) => {
+ if let Some(key) = index.as_str() {
+ map.get(key)
+ .cloned()
+ .ok_or_else(|| EvalError::VariableNotFound(format!("key '{}'", key)))
+ } else {
+ Err(EvalError::TypeError(
+ "Object key must be a string".to_string(),
+ ))
+ }
+ }
+ JsonValue::String(s) => {
+ if let Some(i) = index.as_i64() {
+ let chars: Vec<char> = s.chars().collect();
+ let i = if i < 0 {
+ (chars.len() as i64 + i) as usize
+ } else {
+ i as usize
+ };
+ chars
+ .get(i)
+ .map(|c| json!(c.to_string()))
+ .ok_or_else(|| EvalError::IndexOutOfBounds(format!("{}", i)))
+ } else {
+ Err(EvalError::TypeError(
+ "String index must be an integer".to_string(),
+ ))
+ }
+ }
+ _ => Err(EvalError::TypeError(format!(
+ "Cannot index into {}",
+ type_name(obj)
+ ))),
+ }
+}
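The negative-indexing rule in `index_access` (`-1` is the last element; anything past either end is out of bounds) can be sketched std-only. `normalize_index` is an illustrative helper, not the patch's code:

```rust
/// Resolve a possibly-negative index against a length: -1 means the last
/// element. Returns None when the index falls outside the collection,
/// which the evaluator reports as IndexOutOfBounds.
fn normalize_index(i: i64, len: usize) -> Option<usize> {
    let idx = if i < 0 { len as i64 + i } else { i };
    if idx >= 0 && (idx as usize) < len {
        Some(idx as usize)
    } else {
        None
    }
}
```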
+
+// ---------------------------------------------------------------
+// Built-in functions
+// ---------------------------------------------------------------
+
+fn eval_builtin_function(name: &str, args: &[JsonValue]) -> EvalResult {
+ match name {
+ // -- Type conversion --
+ "string" => {
+ expect_args(name, args, 1)?;
+ Ok(json!(value_to_string(&args[0])))
+ }
+ "number" => {
+ expect_args(name, args, 1)?;
+ to_number(&args[0])
+ }
+ "int" => {
+ expect_args(name, args, 1)?;
+ to_int(&args[0])
+ }
+ "bool" => {
+ expect_args(name, args, 1)?;
+ Ok(json!(is_truthy(&args[0])))
+ }
+
+ // -- Introspection --
+ "type_of" => {
+ expect_args(name, args, 1)?;
+ Ok(json!(type_name(&args[0])))
+ }
+ "length" => {
+ expect_args(name, args, 1)?;
+ fn_length(&args[0])
+ }
+ "keys" => {
+ expect_args(name, args, 1)?;
+ fn_keys(&args[0])
+ }
+ "values" => {
+ expect_args(name, args, 1)?;
+ fn_values(&args[0])
+ }
+
+ // -- Math --
+ "abs" => {
+ expect_args(name, args, 1)?;
+ fn_abs(&args[0])
+ }
+ "floor" => {
+ expect_args(name, args, 1)?;
+ fn_floor(&args[0])
+ }
+ "ceil" => {
+ expect_args(name, args, 1)?;
+ fn_ceil(&args[0])
+ }
+ "round" => {
+ expect_args(name, args, 1)?;
+ fn_round(&args[0])
+ }
+ "min" => {
+ expect_args(name, args, 2)?;
+ fn_min(&args[0], &args[1])
+ }
+ "max" => {
+ expect_args(name, args, 2)?;
+ fn_max(&args[0], &args[1])
+ }
+ "sum" => {
+ expect_args(name, args, 1)?;
+ fn_sum(&args[0])
+ }
+
+ // -- String --
+ "lower" => {
+ expect_args(name, args, 1)?;
+ fn_lower(&args[0])
+ }
+ "upper" => {
+ expect_args(name, args, 1)?;
+ fn_upper(&args[0])
+ }
+ "trim" => {
+ expect_args(name, args, 1)?;
+ fn_trim(&args[0])
+ }
+ "split" => {
+ expect_args(name, args, 2)?;
+ fn_split(&args[0], &args[1])
+ }
+ "join" => {
+ expect_args(name, args, 2)?;
+ fn_join(&args[0], &args[1])
+ }
+ "replace" => {
+ expect_args(name, args, 3)?;
+ fn_replace(&args[0], &args[1], &args[2])
+ }
+ "starts_with" => {
+ expect_args(name, args, 2)?;
+ fn_starts_with(&args[0], &args[1])
+ }
+ "ends_with" => {
+ expect_args(name, args, 2)?;
+ fn_ends_with(&args[0], &args[1])
+ }
+ "match" => {
+ expect_args(name, args, 2)?;
+ fn_match(&args[0], &args[1])
+ }
+
+ // -- Collections --
+ "contains" => {
+ expect_args(name, args, 2)?;
+ eval_in(&args[1], &args[0])
+ }
+ "reversed" => {
+ expect_args(name, args, 1)?;
+ fn_reversed(&args[0])
+ }
+ "sort" => {
+ expect_args(name, args, 1)?;
+ fn_sort(&args[0])
+ }
+ "unique" => {
+ expect_args(name, args, 1)?;
+ fn_unique(&args[0])
+ }
+ "flat" => {
+ expect_args(name, args, 1)?;
+ fn_flat(&args[0])
+ }
+ "zip" => {
+ expect_args(name, args, 2)?;
+ fn_zip(&args[0], &args[1])
+ }
+ "range" => {
+ if args.len() == 1 {
+ fn_range_1(&args[0])
+ } else if args.len() == 2 {
+ fn_range_2(&args[0], &args[1])
+ } else {
+ Err(EvalError::WrongArgCount(
+ name.to_string(),
+ "1 or 2".to_string(),
+ args.len(),
+ ))
+ }
+ }
+ "slice" => {
+ if args.len() == 2 {
+ fn_slice(&args[0], &args[1], &JsonValue::Null)
+ } else if args.len() == 3 {
+ fn_slice(&args[0], &args[1], &args[2])
+ } else {
+ Err(EvalError::WrongArgCount(
+ name.to_string(),
+ "2 or 3".to_string(),
+ args.len(),
+ ))
+ }
+ }
+ "index_of" => {
+ expect_args(name, args, 2)?;
+ fn_index_of(&args[0], &args[1])
+ }
+ "count" => {
+ expect_args(name, args, 2)?;
+ fn_count(&args[0], &args[1])
+ }
+ "merge" => {
+ expect_args(name, args, 2)?;
+ fn_merge(&args[0], &args[1])
+ }
+ "chunks" => {
+ expect_args(name, args, 2)?;
+ fn_chunks(&args[0], &args[1])
+ }
+
+ _ => Err(EvalError::UnknownFunction(name.to_string())),
+ }
+}
+
+fn expect_args(name: &str, args: &[JsonValue], expected: usize) -> EvalResult<()> {
+ if args.len() != expected {
+ Err(EvalError::WrongArgCount(
+ name.to_string(),
+ expected.to_string(),
+ args.len(),
+ ))
+ } else {
+ Ok(())
+ }
+}
+
+// ---------------------------------------------------------------
+// Numeric helpers
+// ---------------------------------------------------------------
+
+#[derive(Debug, Clone, Copy)]
+enum NumericValue {
+ Int(i64),
+ Float(f64),
+}
+
+impl NumericValue {
+ fn as_f64(self) -> f64 {
+ match self {
+ NumericValue::Int(i) => i as f64,
+ NumericValue::Float(f) => f,
+ }
+ }
+}
+
+fn as_numeric(v: &JsonValue) -> Option<NumericValue> {
+ if let Some(i) = v.as_i64() {
+ Some(NumericValue::Int(i))
+ } else if let Some(f) = v.as_f64() {
+ Some(NumericValue::Float(f))
+ } else {
+ None
+ }
+}
+
+fn type_name(v: &JsonValue) -> &'static str {
+ match v {
+ JsonValue::Null => "null",
+ JsonValue::Bool(_) => "bool",
+ JsonValue::Number(_) => "number",
+ JsonValue::String(_) => "string",
+ JsonValue::Array(_) => "array",
+ JsonValue::Object(_) => "object",
+ }
+}
+
+fn value_to_string(v: &JsonValue) -> String {
+ match v {
+ JsonValue::String(s) => s.clone(),
+ JsonValue::Null => "null".to_string(),
+ JsonValue::Bool(b) => b.to_string(),
+ JsonValue::Number(n) => n.to_string(),
+ other => serde_json::to_string(other).unwrap_or_default(),
+ }
+}
+
+// ---------------------------------------------------------------
+// Type conversion functions
+// ---------------------------------------------------------------
+
+fn to_number(v: &JsonValue) -> EvalResult {
+ match v {
+ JsonValue::Number(_) => Ok(v.clone()),
+ JsonValue::String(s) => {
+ if let Ok(f) = s.parse::<f64>() {
+ Ok(json!(f))
+ } else {
+ Err(EvalError::TypeError(format!(
+ "Cannot convert string '{}' to number",
+ s
+ )))
+ }
+ }
+ JsonValue::Bool(b) => Ok(json!(if *b { 1.0 } else { 0.0 })),
+ _ => Err(EvalError::TypeError(format!(
+ "Cannot convert {} to number",
+ type_name(v)
+ ))),
+ }
+}
+
+fn to_int(v: &JsonValue) -> EvalResult {
+ match v {
+ JsonValue::Number(n) => {
+ if let Some(i) = n.as_i64() {
+ Ok(json!(i))
+ } else if let Some(f) = n.as_f64() {
+ Ok(json!(f as i64))
+ } else {
+ Err(EvalError::TypeError("Cannot convert number to int".to_string()))
+ }
+ }
+ JsonValue::String(s) => {
+ // Try integer first, then float truncation
+ if let Ok(i) = s.parse::<i64>() {
+ Ok(json!(i))
+ } else if let Ok(f) = s.parse::<f64>() {
+ Ok(json!(f as i64))
+ } else {
+ Err(EvalError::TypeError(format!(
+ "Cannot convert string '{}' to int",
+ s
+ )))
+ }
+ }
+ JsonValue::Bool(b) => Ok(json!(if *b { 1 } else { 0 })),
+ _ => Err(EvalError::TypeError(format!(
+ "Cannot convert {} to int",
+ type_name(v)
+ ))),
+ }
+}
+
+// ---------------------------------------------------------------
+// Introspection functions
+// ---------------------------------------------------------------
+
+fn fn_length(v: &JsonValue) -> EvalResult {
+ match v {
+ // Character count, consistent with the char-based string indexing
+ JsonValue::String(s) => Ok(json!(s.chars().count())),
+ JsonValue::Array(a) => Ok(json!(a.len())),
+ JsonValue::Object(o) => Ok(json!(o.len())),
+ _ => Err(EvalError::TypeError(format!(
+ "length() requires string, array, or object, got {}",
+ type_name(v)
+ ))),
+ }
+}
+
+fn fn_keys(v: &JsonValue) -> EvalResult {
+ match v {
+ JsonValue::Object(obj) => {
+ let keys: Vec<JsonValue> = obj.keys().map(|k| json!(k)).collect();
+ Ok(JsonValue::Array(keys))
+ }
+ _ => Err(EvalError::TypeError(format!(
+ "keys() requires object, got {}",
+ type_name(v)
+ ))),
+ }
+}
+
+fn fn_values(v: &JsonValue) -> EvalResult {
+ match v {
+ JsonValue::Object(obj) => {
+ let values: Vec<JsonValue> = obj.values().cloned().collect();
+ Ok(JsonValue::Array(values))
+ }
+ _ => Err(EvalError::TypeError(format!(
+ "values() requires object, got {}",
+ type_name(v)
+ ))),
+ }
+}
+
+// ---------------------------------------------------------------
+// Math functions
+// ---------------------------------------------------------------
+
+fn fn_abs(v: &JsonValue) -> EvalResult {
+ match as_numeric(v) {
+ Some(NumericValue::Int(i)) => Ok(json!(i.abs())),
+ Some(NumericValue::Float(f)) => Ok(json!(f.abs())),
+ None => Err(EvalError::TypeError(format!(
+ "abs() requires number, got {}",
+ type_name(v)
+ ))),
+ }
+}
+
+fn fn_floor(v: &JsonValue) -> EvalResult {
+ match as_numeric(v) {
+ Some(NumericValue::Int(i)) => Ok(json!(i)),
+ Some(NumericValue::Float(f)) => Ok(json!(f.floor() as i64)),
+ None => Err(EvalError::TypeError(format!(
+ "floor() requires number, got {}",
+ type_name(v)
+ ))),
+ }
+}
+
+fn fn_ceil(v: &JsonValue) -> EvalResult {
+ match as_numeric(v) {
+ Some(NumericValue::Int(i)) => Ok(json!(i)),
+ Some(NumericValue::Float(f)) => Ok(json!(f.ceil() as i64)),
+ None => Err(EvalError::TypeError(format!(
+ "ceil() requires number, got {}",
+ type_name(v)
+ ))),
+ }
+}
+
+fn fn_round(v: &JsonValue) -> EvalResult {
+ match as_numeric(v) {
+ Some(NumericValue::Int(i)) => Ok(json!(i)),
+ Some(NumericValue::Float(f)) => Ok(json!(f.round() as i64)),
+ None => Err(EvalError::TypeError(format!(
+ "round() requires number, got {}",
+ type_name(v)
+ ))),
+ }
+}
+
+fn fn_min(a: &JsonValue, b: &JsonValue) -> EvalResult {
+ match (as_numeric(a), as_numeric(b)) {
+ (Some(NumericValue::Int(x)), Some(NumericValue::Int(y))) => Ok(json!(x.min(y))),
+ (Some(x), Some(y)) => Ok(json!(x.as_f64().min(y.as_f64()))),
+ _ => {
+ // String min
+ if let (Some(sa), Some(sb)) = (a.as_str(), b.as_str()) {
+ Ok(json!(sa.min(sb)))
+ } else {
+ Err(EvalError::TypeError(
+ "min() requires two numbers or two strings".to_string(),
+ ))
+ }
+ }
+ }
+}
+
+fn fn_max(a: &JsonValue, b: &JsonValue) -> EvalResult {
+ match (as_numeric(a), as_numeric(b)) {
+ (Some(NumericValue::Int(x)), Some(NumericValue::Int(y))) => Ok(json!(x.max(y))),
+ (Some(x), Some(y)) => Ok(json!(x.as_f64().max(y.as_f64()))),
+ _ => {
+ if let (Some(sa), Some(sb)) = (a.as_str(), b.as_str()) {
+ Ok(json!(sa.max(sb)))
+ } else {
+ Err(EvalError::TypeError(
+ "max() requires two numbers or two strings".to_string(),
+ ))
+ }
+ }
+ }
+}
+
+fn fn_sum(v: &JsonValue) -> EvalResult {
+ match v {
+ JsonValue::Array(arr) => {
+ let mut has_float = false;
+ let mut int_sum: i64 = 0;
+ let mut float_sum: f64 = 0.0;
+
+ for item in arr {
+ match as_numeric(item) {
+ Some(NumericValue::Int(i)) => {
+ int_sum += i;
+ float_sum += i as f64;
+ }
+ Some(NumericValue::Float(f)) => {
+ has_float = true;
+ float_sum += f;
+ }
+ None => {
+ return Err(EvalError::TypeError(format!(
+ "sum() requires array of numbers, found {}",
+ type_name(item)
+ )));
+ }
+ }
+ }
+
+ if has_float {
+ Ok(json!(float_sum))
+ } else {
+ Ok(json!(int_sum))
+ }
+ }
+ _ => Err(EvalError::TypeError(format!(
+ "sum() requires array, got {}",
+ type_name(v)
+ ))),
+ }
+}
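+
+// Illustrative check of sum()'s numeric promotion — a sketch assuming
+// `EvalError` derives `Debug`: all-int arrays stay integer, and any float
+// element promotes the result to float.
+#[cfg(test)]
+mod sum_promotion_tests {
+ use super::*;
+
+ #[test]
+ fn sum_preserves_int_unless_float_present() {
+ assert_eq!(fn_sum(&json!([1, 2, 3])).unwrap(), json!(6));
+ assert_eq!(fn_sum(&json!([1, 2.5])).unwrap(), json!(3.5));
+ }
+}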
+
+// ---------------------------------------------------------------
+// String functions
+// ---------------------------------------------------------------
+
+fn fn_lower(v: &JsonValue) -> EvalResult {
+ require_string("lower", v).map(|s| json!(s.to_lowercase()))
+}
+
+fn fn_upper(v: &JsonValue) -> EvalResult {
+ require_string("upper", v).map(|s| json!(s.to_uppercase()))
+}
+
+fn fn_trim(v: &JsonValue) -> EvalResult {
+ require_string("trim", v).map(|s| json!(s.trim()))
+}
+
+fn fn_split(s: &JsonValue, sep: &JsonValue) -> EvalResult