Files
attune/tests/helpers/client_wrapper.py
2026-02-04 17:46:30 -06:00

1188 lines
41 KiB
Python

"""
Wrapper for Generated API Client
This module provides a backward-compatible wrapper around the auto-generated
OpenAPI client, maintaining the same interface as the original AttuneClient
while using the generated client internally.
This allows tests to gradually migrate to the generated client without
requiring immediate changes to all test code.
"""
import os
from typing import Any, Optional
from generated_client import AuthenticatedClient, Client
from generated_client.api.actions import (
create_action as gen_create_action,
)
from generated_client.api.actions import (
delete_action as gen_delete_action,
)
from generated_client.api.actions import (
get_action as gen_get_action,
)
from generated_client.api.actions import (
list_actions as gen_list_actions,
)
from generated_client.api.auth import (
login as gen_login,
)
from generated_client.api.auth import (
register as gen_register,
)
from generated_client.api.enforcements import list_enforcements as gen_list_enforcements
from generated_client.api.events import list_events as gen_list_events
from generated_client.api.executions import (
get_execution as gen_get_execution,
)
from generated_client.api.executions import (
list_executions as gen_list_executions,
)
from generated_client.api.health import health as gen_health
from generated_client.api.inquiries import (
list_inquiries as gen_list_inquiries,
)
from generated_client.api.inquiries import (
respond_to_inquiry as gen_respond_inquiry,
)
from generated_client.api.packs import (
create_pack as gen_create_pack,
)
from generated_client.api.packs import (
delete_pack as gen_delete_pack,
)
from generated_client.api.packs import (
get_pack as gen_get_pack,
)
from generated_client.api.packs import (
list_packs as gen_list_packs,
)
from generated_client.api.packs import (
register_pack as gen_register_pack,
)
from generated_client.api.rules import (
create_rule as gen_create_rule,
)
from generated_client.api.rules import (
delete_rule as gen_delete_rule,
)
from generated_client.api.rules import (
disable_rule as gen_disable_rule,
)
from generated_client.api.rules import (
enable_rule as gen_enable_rule,
)
from generated_client.api.rules import (
get_rule as gen_get_rule,
)
from generated_client.api.rules import (
list_rules as gen_list_rules,
)
from generated_client.api.secrets import (
create_key as gen_create_key,
)
from generated_client.api.secrets import (
delete_key as gen_delete_key,
)
from generated_client.api.secrets import (
get_key as gen_get_key,
)
from generated_client.api.secrets import (
list_keys as gen_list_keys,
)
from generated_client.api.secrets import (
update_key as gen_update_key,
)
from generated_client.api.sensors import (
create_sensor as gen_create_sensor,
)
from generated_client.api.sensors import (
delete_sensor as gen_delete_sensor,
)
from generated_client.api.sensors import (
get_sensor as gen_get_sensor,
)
from generated_client.api.sensors import (
list_sensors as gen_list_sensors,
)
from generated_client.api.triggers import (
create_trigger as gen_create_trigger,
)
from generated_client.api.triggers import (
delete_trigger as gen_delete_trigger,
)
from generated_client.api.triggers import (
get_trigger as gen_get_trigger,
)
from generated_client.api.triggers import (
list_triggers as gen_list_triggers,
)
from generated_client.api.webhooks import receive_webhook as gen_receive_webhook
from generated_client.models.create_action_request import CreateActionRequest
from generated_client.models.create_key_request import CreateKeyRequest
from generated_client.models.create_pack_request import CreatePackRequest
from generated_client.models.create_rule_request import CreateRuleRequest
from generated_client.models.create_sensor_request import CreateSensorRequest
from generated_client.models.create_trigger_request import CreateTriggerRequest
from generated_client.models.inquiry_respond_request import InquiryRespondRequest
from generated_client.models.login_request import LoginRequest
from generated_client.models.register_pack_request import RegisterPackRequest
from generated_client.models.register_request import RegisterRequest
from generated_client.models.update_key_request import UpdateKeyRequest
def to_dict(obj: Any) -> Any:
"""Convert Pydantic model to dict recursively"""
if obj is None:
return None
if hasattr(obj, "to_dict"):
return obj.to_dict()
if isinstance(obj, dict):
return obj
if isinstance(obj, list):
return [to_dict(item) for item in obj]
return obj
class AttuneClient:
"""
Backward-compatible wrapper for generated Attune API client
This class wraps the auto-generated OpenAPI client to maintain
compatibility with existing test code while using type-safe
generated API calls internally.
"""
def __init__(
self,
base_url: Optional[str] = None,
timeout: int = 30,
auto_login: bool = True,
):
"""
Initialize Attune client wrapper
Args:
base_url: API base URL (defaults to ATTUNE_API_URL env var)
timeout: Request timeout in seconds
auto_login: Whether to auto-login with default credentials
"""
self.base_url = base_url or os.getenv("ATTUNE_API_URL", "http://localhost:8080")
self.timeout = timeout
self.auto_login_flag = auto_login
# Initialize unauthenticated clients
# Note: Generated API functions include full paths like "/api/v1/packs"
# so base_url should just be the host
self.client = Client(
base_url=self.base_url,
timeout=float(timeout),
verify_ssl=False,
)
# Auth client (same as regular client since paths are in generated code)
self.auth_base_client = Client(
base_url=self.base_url,
timeout=float(timeout),
verify_ssl=False,
)
# Will be set after login
self.auth_client: Optional[AuthenticatedClient] = None
self.access_token: Optional[str] = None
self.refresh_token: Optional[str] = None
self.user_info: Optional[dict] = None
# Default credentials
self.default_login = os.getenv("TEST_USER_LOGIN", "test@attune.local")
self.default_password = os.getenv("TEST_USER_PASSWORD", "TestPass123!")
# Auto-login if requested
if self.auto_login_flag:
self.login()
def _get_client(self) -> AuthenticatedClient:
"""Get authenticated client (raises if not logged in)"""
if not self.auth_client:
raise Exception("Not authenticated. Please login first.")
return self.auth_client
def register(
self,
login: str,
password: str,
display_name: Optional[str] = None,
) -> dict:
"""
Register a new user
Args:
login: User login/email
password: User password
display_name: User display name
Returns:
dict: User information
"""
request = RegisterRequest(
login=login,
password=password,
display_name=display_name or login,
)
response = gen_register.sync(client=self.auth_base_client, body=request)
if response:
result = to_dict(response)
if isinstance(result, dict) and "data" in result:
return result["data"]
return result
raise Exception("Registration failed")
def login(
self,
login: Optional[str] = None,
password: Optional[str] = None,
) -> dict:
"""
Login and obtain access token
Args:
login: User login (defaults to TEST_USER_LOGIN)
password: User password (defaults to TEST_USER_PASSWORD)
Returns:
dict: Login response with tokens
"""
login_email = login or self.default_login
login_password = password or self.default_password
request = LoginRequest(
login=login_email,
password=login_password,
)
response = gen_login.sync(client=self.auth_base_client, body=request)
if response:
result = to_dict(response)
if isinstance(result, dict) and "data" in result:
data = result["data"]
self.access_token = data.get("access_token")
self.refresh_token = data.get("refresh_token")
self.user_info = data.get("user")
# Create authenticated client
# Note: base_url should just be host since generated API includes full paths
self.auth_client = AuthenticatedClient(
base_url=self.base_url,
token=self.access_token,
timeout=float(self.timeout),
verify_ssl=False,
)
return data
raise Exception("Login failed")
def logout(self):
"""Logout and clear tokens"""
self.auth_client = None
self.access_token = None
self.refresh_token = None
self.user_info = None
def health(self) -> dict:
"""Check API health"""
response = gen_health.sync(client=self.auth_base_client)
return to_dict(response) if response else {}
# ========================================================================
# Packs
# ========================================================================
def list_packs(self, **params) -> list[dict]:
"""List all packs"""
response = gen_list_packs.sync(client=self._get_client())
if response:
result = to_dict(response)
if isinstance(result, dict) and "data" in result:
return result["data"]
return []
def get_pack(self, pack_id: int) -> dict:
"""Get pack by ID - Note: API uses ref, so need to lookup by ref"""
# The generated API uses ref, not ID
# This is a compatibility shim - we need to list and find by ID
packs = self.list_packs()
for pack in packs:
if pack.get("id") == pack_id:
return self.get_pack_by_ref(pack["ref"])
raise Exception(f"Pack {pack_id} not found")
def get_pack_by_ref(self, ref: str) -> Optional[dict]:
"""Get pack by reference"""
response = gen_get_pack.sync(ref=ref, client=self._get_client())
if response:
result = to_dict(response)
if isinstance(result, dict) and "data" in result:
return result["data"]
return None
def create_pack(
self,
ref: str,
label: str,
description: Optional[str] = None,
version: str = "1.0.0",
author: Optional[str] = None,
**kwargs,
) -> dict:
"""Create a new pack"""
request = CreatePackRequest(
ref=ref,
label=label,
description=description,
version=version,
author=author,
)
response = gen_create_pack.sync(client=self._get_client(), body=request)
if response:
result = to_dict(response)
if isinstance(result, dict) and "data" in result:
return result["data"]
return result
raise Exception("Failed to create pack")
def register_pack(
self, path: str, skip_tests: bool = True, force: bool = False
) -> dict:
"""Register a pack from filesystem path
Args:
path: Path to pack directory
skip_tests: Skip running pack tests during registration (default: True)
force: Force registration even if tests fail (default: False)
"""
# Use direct HTTP request to avoid generated client schema mismatch
payload = {"path": path, "skip_tests": skip_tests, "force": force}
response = self.post("/api/v1/packs/register", json=payload)
if response.status_code in (200, 201):
data = response.json()
if isinstance(data, dict) and "data" in data:
return data["data"]
return data
raise Exception(
f"Failed to register pack: {response.status_code} {response.text}"
)
def reload_pack(self, pack_id: int) -> dict:
"""Reload a pack"""
# Not implemented in wrapper yet
raise NotImplementedError("reload_pack not yet implemented")
def delete_pack(self, pack_id: int):
"""Delete a pack"""
# Need to get pack ref first
pack = self.get_pack(pack_id)
if pack:
gen_delete_pack.sync(ref=pack["ref"], client=self._get_client())
# ========================================================================
# Actions
# ========================================================================
def list_actions(self, **params) -> list[dict]:
"""List all actions"""
response = gen_list_actions.sync(
client=self._get_client(),
pack=params.get("pack"),
)
if response:
result = to_dict(response)
if isinstance(result, dict) and "data" in result:
return result["data"]
return []
def get_action(self, action_id: int) -> dict:
"""Get action by ID - needs ref lookup"""
actions = self.list_actions()
for action in actions:
if action.get("id") == action_id:
return self.get_action_by_ref(action["ref"])
raise Exception(f"Action {action_id} not found")
def get_action_by_ref(self, ref: str) -> Optional[dict]:
"""Get action by reference"""
response = gen_get_action.sync(ref=ref, client=self._get_client())
if response:
result = to_dict(response)
if isinstance(result, dict) and "data" in result:
return result["data"]
return None
def create_action(
self,
ref: Optional[str] = None,
label: Optional[str] = None,
pack_ref: Optional[str] = None,
entrypoint: Optional[str] = None,
description: Optional[str] = None,
param_schema: Optional[dict] = None,
out_schema: Optional[dict] = None,
runtime_ref: Optional[str] = None,
# Legacy arguments for backward compatibility
name: Optional[str] = None,
runner_type: Optional[str] = None,
**kwargs,
) -> dict:
"""Create a new action
Supports both new-style (ref, label, pack_ref, entrypoint)
and legacy-style (pack_ref, name, runner_type, entrypoint) arguments.
"""
# Handle legacy-style arguments
if pack_ref and name:
# Build ref from pack_ref and name
ref = f"{pack_ref}.{name}"
label = name.replace("_", " ").title()
# Map legacy runner_type to runtime_ref
if runner_type and not runtime_ref:
runtime_ref = f"core.action.{runner_type}"
# Validate required fields
if not ref or not label or not pack_ref or not entrypoint:
raise ValueError(
"Missing required arguments: ref, label, pack_ref, and entrypoint "
"(or pack_ref, name, and entrypoint)"
)
# Use plain POST request instead of generated client to handle API schema changes
payload = {
"ref": ref,
"label": label,
"pack_ref": pack_ref,
"entrypoint": entrypoint,
"description": description or f"Action: {label}",
}
if param_schema:
payload["param_schema"] = param_schema
if out_schema:
payload["out_schema"] = out_schema
if runtime_ref:
payload["runtime_ref"] = runtime_ref
response = self._request("POST", "/api/v1/actions", json=payload)
if response.status_code in (200, 201):
data = response.json()
if "data" in data:
return data["data"]
return data
raise Exception(
f"Failed to create action: {response.status_code} {response.text}"
)
def delete_action(self, action_id: int):
"""Delete an action"""
action = self.get_action(action_id)
if action:
gen_delete_action.sync(ref=action["ref"], client=self._get_client())
# ========================================================================
# Triggers
# ========================================================================
def list_triggers(self, **params) -> list[dict]:
"""List all triggers"""
response = gen_list_triggers.sync(client=self._get_client())
if response:
result = to_dict(response)
if isinstance(result, dict) and "data" in result:
return result["data"]
return []
def get_trigger(self, trigger_id: int) -> dict:
"""Get trigger by ID"""
triggers = self.list_triggers()
for trigger in triggers:
if trigger.get("id") == trigger_id:
return self.get_trigger_by_ref(trigger["ref"])
raise Exception(f"Trigger {trigger_id} not found")
def get_trigger_by_ref(self, ref: str) -> Optional[dict]:
"""Get trigger by reference"""
response = gen_get_trigger.sync(ref=ref, client=self._get_client())
if response:
result = to_dict(response)
if isinstance(result, dict) and "data" in result:
return result["data"]
return None
def create_trigger(
self,
ref: Optional[str] = None,
label: Optional[str] = None,
pack_ref: Optional[str] = None,
description: Optional[str] = None,
param_schema: Optional[dict] = None,
out_schema: Optional[dict] = None,
# Legacy arguments for backward compatibility
name: Optional[str] = None,
trigger_type: Optional[str] = None,
parameters: Optional[dict] = None,
**kwargs,
) -> dict:
"""Create a new trigger
Supports both new-style (ref, label, pack_ref) and legacy-style (pack_ref, name) arguments.
"""
# Handle legacy-style arguments
if pack_ref and name:
# Build ref from pack_ref and name
ref = f"{pack_ref}.{name}"
label = name.replace("_", " ").title()
if parameters:
param_schema = parameters
# Validate required fields
if not ref or not label or not pack_ref:
raise ValueError(
"Missing required arguments: ref, label, and pack_ref (or pack_ref and name)"
)
# Use plain POST request instead of generated client to handle API schema changes
payload = {
"ref": ref,
"label": label,
"pack_ref": pack_ref,
}
if description:
payload["description"] = description
if param_schema:
payload["param_schema"] = param_schema
if out_schema:
payload["out_schema"] = out_schema
response = self._request("POST", "/api/v1/triggers", json=payload)
if response.status_code in (200, 201):
data = response.json()
if "data" in data:
return data["data"]
return data
raise Exception(
f"Failed to create trigger: {response.status_code} {response.text}"
)
def delete_trigger(self, trigger_id: int):
"""Delete a trigger"""
trigger = self.get_trigger(trigger_id)
if trigger:
gen_delete_trigger.sync(ref=trigger["ref"], client=self._get_client())
def enable_webhook(
self,
trigger_ref: Optional[str] = None,
trigger_id: Optional[int] = None,
) -> dict:
"""Enable webhooks for a trigger
Supports both trigger_ref and trigger_id arguments for backward compatibility.
"""
# Handle legacy trigger_id argument
if trigger_id and not trigger_ref:
trigger = self.get_trigger(trigger_id)
if trigger:
trigger_ref = trigger["ref"]
else:
raise Exception(f"Trigger {trigger_id} not found")
if not trigger_ref:
raise ValueError("Either trigger_ref or trigger_id must be provided")
response = self._request(
"POST", f"/api/v1/triggers/{trigger_ref}/webhooks/enable"
)
if response.status_code in (200, 201):
data = response.json()
if "data" in data:
return data["data"]
return data
raise Exception(
f"Failed to enable webhook: {response.status_code} {response.text}"
)
def disable_webhook(
self,
trigger_ref: Optional[str] = None,
trigger_id: Optional[int] = None,
) -> dict:
"""Disable webhooks for a trigger
Supports both trigger_ref and trigger_id arguments for backward compatibility.
"""
# Handle legacy trigger_id argument
if trigger_id and not trigger_ref:
trigger = self.get_trigger(trigger_id)
if trigger:
trigger_ref = trigger["ref"]
else:
raise Exception(f"Trigger {trigger_id} not found")
if not trigger_ref:
raise ValueError("Either trigger_ref or trigger_id must be provided")
response = self._request(
"POST", f"/api/v1/triggers/{trigger_ref}/webhooks/disable"
)
if response.status_code in (200, 201):
data = response.json()
if "data" in data:
return data["data"]
return data
raise Exception(
f"Failed to disable webhook: {response.status_code} {response.text}"
)
def fire_webhook(
self,
trigger_ref: Optional[str] = None,
payload: Optional[dict] = None,
trigger_id: Optional[int] = None,
auto_enable: bool = True,
) -> dict:
"""Fire a webhook trigger
Supports both trigger_ref and trigger_id arguments for backward compatibility.
Args:
trigger_ref: Trigger reference
payload: Webhook payload
trigger_id: Trigger ID (legacy)
auto_enable: Automatically enable webhooks if not enabled (default: True)
"""
# Handle legacy trigger_id argument
if trigger_id and not trigger_ref:
trigger = self.get_trigger(trigger_id)
if trigger:
trigger_ref = trigger["ref"]
else:
raise Exception(f"Trigger {trigger_id} not found")
if not trigger_ref:
raise ValueError("Either trigger_ref or trigger_id must be provided")
# Get the trigger to check if webhooks are enabled
trigger = self.get_trigger_by_ref(trigger_ref)
if not trigger:
raise Exception(f"Trigger {trigger_ref} not found")
# Enable webhooks if not enabled and auto_enable is True
if not trigger.get("webhook_enabled") and auto_enable:
trigger = self.enable_webhook(trigger_ref=trigger_ref)
# Check if we have a webhook_key now
if not trigger.get("webhook_key"):
raise Exception(f"Trigger {trigger_ref} does not have a webhook_key")
# Use plain POST request instead of generated client to handle API response structure
webhook_key = trigger["webhook_key"]
response = self._request(
"POST",
f"/api/v1/webhooks/{webhook_key}",
json={"payload": payload},
)
if response.status_code in (200, 201):
data = response.json()
if "data" in data:
return data["data"]
return data
raise Exception(
f"Failed to fire webhook: {response.status_code} {response.text}"
)
# ========================================================================
# Sensors
# ========================================================================
def list_sensors(self, **params) -> list[dict]:
"""List all sensors"""
response = gen_list_sensors.sync(client=self._get_client())
if response:
result = to_dict(response)
if isinstance(result, dict) and "data" in result:
return result["data"]
return []
def get_sensor(self, sensor_id: int) -> dict:
"""Get sensor by ID"""
sensors = self.list_sensors()
for sensor in sensors:
if sensor.get("id") == sensor_id:
response = gen_get_sensor.sync(
ref=sensor.get("ref", str(sensor_id)), client=self._get_client()
)
if response:
result = to_dict(response)
if isinstance(result, dict) and "data" in result:
return result["data"]
raise Exception(f"Sensor {sensor_id} not found")
def create_sensor(
self,
trigger_id: int,
enabled: bool = True,
parameters: Optional[dict] = None,
**kwargs,
) -> dict:
"""Create a new sensor"""
# Get trigger to obtain trigger_ref
trigger = self.get_trigger(trigger_id)
trigger_ref = trigger.get("ref")
# Extract required fields from kwargs or use defaults
ref = kwargs.get("ref", f"sensor_{trigger_id}")
pack_ref = kwargs.get("pack_ref", "core")
runtime_ref = kwargs.get("runtime_ref", "python3")
label = kwargs.get("label", f"Sensor for {trigger_ref}")
entrypoint = kwargs.get("entrypoint", "internal://sensor")
description = kwargs.get("description", f"Sensor for trigger {trigger_ref}")
param_schema = kwargs.get("param_schema")
config = kwargs.get("config")
request = CreateSensorRequest(
ref=ref,
trigger_ref=trigger_ref,
pack_ref=pack_ref,
runtime_ref=runtime_ref,
label=label,
entrypoint=entrypoint,
description=description,
param_schema=param_schema,
config=config,
enabled=enabled,
)
response = gen_create_sensor.sync(client=self._get_client(), body=request)
if response:
result = to_dict(response)
if isinstance(result, dict) and "data" in result:
return result["data"]
return result
# Get more detailed error information
try:
error_response = self._request(
"POST", "/api/v1/sensors", json=request.to_dict()
)
error_msg = f"Failed to create sensor: {error_response.status_code} - {error_response.text}"
except Exception as e:
error_msg = f"Failed to create sensor: {str(e)}"
raise Exception(error_msg)
def delete_sensor(self, sensor_id: int):
"""Delete a sensor"""
sensor = self.get_sensor(sensor_id)
if sensor:
gen_delete_sensor.sync(
ref=sensor.get("ref", str(sensor_id)), client=self._get_client()
)
# ========================================================================
# Rules
# ========================================================================
def list_rules(self, **params) -> list[dict]:
"""List all rules"""
response = gen_list_rules.sync(client=self._get_client())
if response:
result = to_dict(response)
if isinstance(result, dict) and "data" in result:
return result["data"]
return []
def get_rule(self, rule_id: int) -> dict:
"""Get rule by ID"""
rules = self.list_rules()
for rule in rules:
if rule.get("id") == rule_id:
response = gen_get_rule.sync(
ref=rule.get("ref", str(rule_id)), client=self._get_client()
)
if response:
result = to_dict(response)
if isinstance(result, dict) and "data" in result:
return result["data"]
raise Exception(f"Rule {rule_id} not found")
def create_rule(
self,
ref: Optional[str] = None,
label: Optional[str] = None,
pack_ref: Optional[str] = None,
trigger_ref: Optional[str] = None,
action_ref: Optional[str] = None,
enabled: bool = True,
description: Optional[str] = None,
criteria: Optional[str] = None,
action_parameters: Optional[dict] = None,
# Legacy arguments for backward compatibility
name: Optional[str] = None,
trigger_id: Optional[int] = None,
**kwargs,
) -> dict:
"""Create a new rule
Supports both new-style (ref, label, pack_ref, trigger_ref, action_ref)
and legacy-style (name, pack_ref, trigger_id, action_ref) arguments.
"""
# Handle legacy-style arguments
if pack_ref and name:
# Build ref from pack_ref and name
ref = f"{pack_ref}.{name}"
label = name.replace("_", " ").title()
# If trigger_id is provided, get the trigger to find trigger_ref
if trigger_id and not trigger_ref:
trigger = self.get_trigger(trigger_id)
if trigger:
trigger_ref = trigger["ref"]
else:
raise Exception(f"Trigger {trigger_id} not found")
# Validate required fields
if not ref or not label or not pack_ref or not trigger_ref or not action_ref:
raise ValueError(
"Missing required arguments: ref, label, pack_ref, trigger_ref, and action_ref "
"(or pack_ref, name, trigger_id, and action_ref)"
)
# Use plain POST request instead of generated client to handle API schema changes
payload = {
"ref": ref,
"label": label,
"pack_ref": pack_ref,
"trigger_ref": trigger_ref,
"action_ref": action_ref,
"enabled": enabled,
"description": description or f"Rule: {label}",
}
if criteria:
payload["criteria"] = criteria
if action_parameters:
payload["action_parameters"] = action_parameters
response = self._request("POST", "/api/v1/rules", json=payload)
if response.status_code in (200, 201):
data = response.json()
if "data" in data:
return data["data"]
return data
raise Exception(
f"Failed to create rule: {response.status_code} {response.text}"
)
def update_rule(self, rule_id: int, **kwargs) -> dict:
"""Update a rule"""
raise NotImplementedError("update_rule not yet implemented")
def enable_rule(self, rule_id: int) -> dict:
"""Enable a rule"""
rule = self.get_rule(rule_id)
response = gen_enable_rule.sync(
ref=rule.get("ref", str(rule_id)), client=self._get_client()
)
if response:
result = to_dict(response)
if isinstance(result, dict) and "data" in result:
return result["data"]
return {}
def disable_rule(self, rule_id: int) -> dict:
"""Disable a rule"""
rule = self.get_rule(rule_id)
response = gen_disable_rule.sync(
ref=rule.get("ref", str(rule_id)), client=self._get_client()
)
if response:
result = to_dict(response)
if isinstance(result, dict) and "data" in result:
return result["data"]
return {}
def delete_rule(self, rule_id: int):
"""Delete a rule"""
rule = self.get_rule(rule_id)
if rule:
gen_delete_rule.sync(
ref=rule.get("ref", str(rule_id)), client=self._get_client()
)
# ========================================================================
# Events
# ========================================================================
def list_events(self, **params) -> list[dict]:
"""List all events"""
# Map trigger_id to trigger for backward compatibility
trigger = params.get("trigger_id") or params.get("trigger")
response = gen_list_events.sync(
client=self._get_client(),
trigger=trigger,
trigger_ref=params.get("trigger_ref"),
source=params.get("source"),
page=params.get("page"),
per_page=params.get("limit"),
)
if response:
result = to_dict(response)
if isinstance(result, dict) and "data" in result:
return result["data"]
return []
def get_event(self, event_id: int) -> dict:
"""Get event by ID"""
# Events don't have a get-by-ref endpoint, need to list and filter
events = self.list_events(limit=1000)
for event in events:
if event.get("id") == event_id:
return event
raise Exception(f"Event {event_id} not found")
# ========================================================================
# Enforcements
# ========================================================================
def list_enforcements(self, **params) -> list[dict]:
"""List all enforcements"""
response = gen_list_enforcements.sync(
client=self._get_client(),
rule_id=params.get("rule_id"),
limit=params.get("limit"),
offset=params.get("offset"),
)
if response:
result = to_dict(response)
if isinstance(result, dict) and "data" in result:
return result["data"]
return []
def get_enforcement(self, enforcement_id: int) -> dict:
"""Get enforcement by ID"""
enforcements = self.list_enforcements(limit=1000)
for enforcement in enforcements:
if enforcement.get("id") == enforcement_id:
return enforcement
raise Exception(f"Enforcement {enforcement_id} not found")
# ========================================================================
# Executions
# ========================================================================
def list_executions(self, **params) -> list[dict]:
"""List all executions"""
response = gen_list_executions.sync(
client=self._get_client(),
action_ref=params.get("action_ref"),
status=params.get("status"),
limit=params.get("limit"),
offset=params.get("offset"),
)
if response:
result = to_dict(response)
if isinstance(result, dict) and "data" in result:
return result["data"]
return []
def get_execution(self, execution_id: int) -> dict:
"""Get execution by ID"""
response = gen_get_execution.sync(
ref=str(execution_id), client=self._get_client()
)
if response:
result = to_dict(response)
if isinstance(result, dict) and "data" in result:
return result["data"]
raise Exception(f"Execution {execution_id} not found")
def cancel_execution(self, execution_id: int) -> dict:
"""Cancel an execution"""
raise NotImplementedError("cancel_execution not yet implemented")
# ========================================================================
# Inquiries
# ========================================================================
def list_inquiries(self, **params) -> list[dict]:
"""List all inquiries"""
response = gen_list_inquiries.sync(
client=self._get_client(),
status=params.get("status"),
limit=params.get("limit"),
offset=params.get("offset"),
)
if response:
result = to_dict(response)
if isinstance(result, dict) and "data" in result:
return result["data"]
return []
def get_inquiry(self, inquiry_id: int) -> dict:
"""Get inquiry by ID"""
inquiries = self.list_inquiries(limit=1000)
for inquiry in inquiries:
if inquiry.get("id") == inquiry_id:
return inquiry
raise Exception(f"Inquiry {inquiry_id} not found")
def respond_to_inquiry(self, inquiry_id: int, response_data: dict) -> dict:
"""Respond to an inquiry"""
request = InquiryRespondRequest(response=response_data)
response = gen_respond_inquiry.sync(
ref=str(inquiry_id), client=self._get_client(), body=request
)
if response:
result = to_dict(response)
if isinstance(result, dict) and "data" in result:
return result["data"]
return {}
# ========================================================================
# Secrets (Datastore/Keys)
# ========================================================================
def list_secrets(self, **params) -> list[dict]:
"""List all secrets/keys"""
response = gen_list_keys.sync(client=self._get_client())
if response:
result = to_dict(response)
if isinstance(result, dict) and "data" in result:
return result["data"]
return []
def get_secret(self, key_name: str, **params) -> Optional[dict]:
"""Get secret by key name"""
response = gen_get_key.sync(ref=key_name, client=self._get_client())
if response:
result = to_dict(response)
if isinstance(result, dict) and "data" in result:
return result["data"]
return None
def datastore_get(self, key: str, **params) -> Optional[str]:
"""Get value from datastore"""
secret = self.get_secret(key)
return secret.get("value") if secret else None
def datastore_set(self, key: str, value: str, **params) -> dict:
"""Set value in datastore"""
# Check if key exists
existing = self.get_secret(key)
if existing:
# Update existing
request = UpdateKeyRequest(value=value)
response = gen_update_key.sync(
ref=key, client=self._get_client(), body=request
)
else:
# Create new
request = CreateKeyRequest(
key=key,
value=value,
scope=params.get("scope", "user"),
)
response = gen_create_key.sync(client=self._get_client(), body=request)
if response:
result = to_dict(response)
if isinstance(result, dict) and "data" in result:
return result["data"]
return result
return {}
def datastore_delete(self, key: str, **params):
"""Delete value from datastore"""
gen_delete_key.sync(ref=key, client=self._get_client())
def create_secret(self, key: str, value: str, **params) -> dict:
"""Create a new secret"""
request = CreateKeyRequest(
key=key,
value=value,
scope=params.get("scope", "user"),
)
response = gen_create_key.sync(client=self._get_client(), body=request)
if response:
result = to_dict(response)
if isinstance(result, dict) and "data" in result:
return result["data"]
return result
return {}
def update_secret(self, key_id: int, **params) -> dict:
"""Update a secret"""
# Need to get key ref first
secrets = self.list_secrets()
for secret in secrets:
if secret.get("id") == key_id:
request = UpdateKeyRequest(value=params.get("value"))
response = gen_update_key.sync(
ref=secret["key"], client=self._get_client(), body=request
)
if response:
result = to_dict(response)
if isinstance(result, dict) and "data" in result:
return result["data"]
return {}
def delete_secret(self, key_id: int):
"""Delete a secret"""
secrets = self.list_secrets()
for secret in secrets:
if secret.get("id") == key_id:
gen_delete_key.sync(ref=secret["key"], client=self._get_client())
return
# ========================================================================
# Compatibility helpers
# ========================================================================
def _request(self, method: str, path: str, **kwargs):
"""Raw request method for backward compatibility"""
# This is for any edge cases that need raw access
client = self._get_client()
url = f"{path}"
response = client.get_httpx_client().request(method, url, **kwargs)
return response
def get(self, path: str, **kwargs):
"""GET request"""
return self._request("GET", path, **kwargs)
def post(self, path: str, **kwargs):
"""POST request"""
return self._request("POST", path, **kwargs)
def put(self, path: str, **kwargs):
"""PUT request"""
return self._request("PUT", path, **kwargs)
def patch(self, path: str, **kwargs):
"""PATCH request"""
return self._request("PATCH", path, **kwargs)
def delete(self, path: str, **kwargs):
"""DELETE request"""
return self._request("DELETE", path, **kwargs)