""" AttuneClient Helper Provides a high-level client for interacting with the Attune API during end-to-end testing. """ import os import time from typing import Any, Dict, List, Optional import psycopg2 import requests from requests.adapters import HTTPAdapter from requests.models import Response from urllib3.util.retry import Retry class AttuneClient: """High-level client for Attune API testing""" def __init__( self, base_url: Optional[str] = None, timeout: int = 30, auto_login: bool = True, ): """ Initialize Attune API client Args: base_url: API base URL (defaults to ATTUNE_API_URL env var or localhost) timeout: Default request timeout in seconds auto_login: Automatically login if token not present """ self.base_url = ( base_url or os.getenv("ATTUNE_API_URL", "http://localhost:8080") ).rstrip("/") self.timeout = timeout self.auto_login = auto_login self.session = requests.Session() self.token: Optional[str] = None self.user_id: Optional[int] = None self.tenant_id: Optional[int] = None # Configure retry strategy for flaky network conditions retry_strategy = Retry( total=3, backoff_factor=0.5, status_forcelist=[429, 500, 502, 503, 504], allowed_methods=[ "HEAD", "GET", "PUT", "DELETE", "OPTIONS", "TRACE", "POST", ], ) adapter = HTTPAdapter(max_retries=retry_strategy) self.session.mount("http://", adapter) self.session.mount("https://", adapter) # ======================================================================== # Authentication # ======================================================================== def register( self, login: str = "test@attune.local", password: str = "TestPass123!", display_name: str = "Test User", ) -> Dict[str, Any]: """ Register a new user Args: login: User login (email or username) password: User password display_name: User's display name Returns: Registration response with user data """ response = self.session.post( f"{self.base_url}/auth/register", json={ "login": login, "password": password, "display_name": display_name, }, timeout=self.timeout, ) response.raise_for_status() return response.json() def login( self, login: str = "test@attune.local", password: str = "TestPass123!", create_if_missing: bool = True, ) -> str: """ Authenticate and get JWT token Args: login: User login password: User password create_if_missing: Auto-register if user doesn't exist Returns: JWT access token """ try: response = self.session.post( f"{self.base_url}/auth/login", json={"login": login, "password": password}, timeout=self.timeout, ) response.raise_for_status() data = response.json() self.token = data["data"]["access_token"] self.user_id = data["data"].get("user_id") self.tenant_id = data["data"].get("tenant_id") # Update session headers self.session.headers.update({"Authorization": f"Bearer {self.token}"}) return self.token except requests.exceptions.HTTPError as e: if e.response.status_code in [401, 404] and create_if_missing: # User doesn't exist, try to register self.register(login, password) # Retry login return self.login(login, password, create_if_missing=False) raise def logout(self): """Clear authentication token""" self.token = None self.user_id = None self.tenant_id = None if "Authorization" in self.session.headers: del self.session.headers["Authorization"] # ======================================================================== # Core Request Methods # ======================================================================== def _request( self, method: str, path: str, auto_auth: bool = True, **kwargs ) -> Dict[str, Any]: """ Make authenticated API request Args: method: HTTP method path: API path (relative to base_url) auto_auth: Automatically login if not authenticated **kwargs: Additional request arguments Returns: JSON response data """ # Auto-login if needed if ( auto_auth and not self.token and path not in ["/auth/login", "/auth/register"] ): if self.auto_login: self.login() else: raise RuntimeError( "Not authenticated. Call login() first or set auto_login=True" ) # Build full URL url = f"{self.base_url}{path}" # Set default timeout if not provided if "timeout" not in kwargs: kwargs["timeout"] = self.timeout # Make request response = self.session.request(method, url, **kwargs) response.raise_for_status() # Parse JSON response return response.json() def get(self, path: str, **kwargs) -> Dict[str, Any]: """GET request""" return self._request("GET", path, **kwargs) def post(self, path: str, **kwargs) -> Dict[str, Any]: """POST request""" return self._request("POST", path, **kwargs) def put(self, path: str, **kwargs) -> Dict[str, Any]: """PUT request""" return self._request("PUT", path, **kwargs) def patch(self, path: str, **kwargs) -> Dict[str, Any]: """PATCH request""" return self._request("PATCH", path, **kwargs) def delete(self, path: str, **kwargs) -> Dict[str, Any]: """DELETE request""" return self._request("DELETE", path, **kwargs) # ======================================================================== # Health Check # ======================================================================== def health(self) -> Dict[str, Any]: """Check API health""" return self.get("/health", auto_auth=False) # ======================================================================== # Pack Management # ======================================================================== def list_packs(self) -> List[Dict[str, Any]]: """List all packs""" response = self.get("/api/v1/packs") return response["data"] def get_pack(self, pack_id: int) -> Dict[str, Any]: """Get pack by ID""" response = self.get(f"/api/v1/packs/{pack_id}") return response["data"] def get_pack_by_ref(self, pack_ref: str) -> Optional[Dict[str, Any]]: """Get pack by reference (namespace.name)""" packs = self.list_packs() for pack in packs: if pack["ref"] == pack_ref: return pack return None def create_pack( self, pack_data: Dict[str, Any] = None, ref: str = None, label: str = None, version: str = "1.0.0", description: str = None, conf_schema: Dict[str, Any] = None, config: Dict[str, Any] = None, meta: Dict[str, Any] = None, tags: List[str] = None, **kwargs, ) -> Dict[str, Any]: """ Create a new pack Args: pack_data: Dict containing pack data (alternative to keyword args) ref: Unique reference identifier (e.g., "slack", "aws") label: Human-readable label version: Pack version (default: "1.0.0") description: Pack description conf_schema: Configuration schema (JSON Schema) config: Pack configuration values meta: Pack metadata tags: Tags for categorization Returns: Created pack data """ # If pack_data dict is provided, use it as base if pack_data: payload = { "ref": pack_data.get("ref", ref), "label": pack_data.get("label") or pack_data.get("name") or pack_data.get("ref", ref), "version": pack_data.get("version", version), "conf_schema": pack_data.get("conf_schema", {}), "config": pack_data.get("config", {}), "meta": pack_data.get("meta", {}), "tags": pack_data.get("tags", []), } if "description" in pack_data: payload["description"] = pack_data["description"] else: # Use keyword arguments payload = { "ref": ref, "label": label or kwargs.get("name") or ref, "version": version, "conf_schema": conf_schema or {}, "config": config or {}, "meta": meta or {}, "tags": tags or [], } if description: payload["description"] = description response = self.post("/api/v1/packs", json=payload) return response["data"] def register_pack( self, pack_dir: str, skip_tests: bool = True, force: bool = False ) -> Dict[str, Any]: """ Register pack from directory Args: pack_dir: Path to pack directory containing pack.yaml skip_tests: Skip running pack tests during registration (default: True) force: Force re-registration if pack already exists (default: False) Returns: Created pack data """ response = self.post( "/api/v1/packs/register", json={"path": pack_dir, "skip_tests": skip_tests, "force": force}, ) return response["data"] def reload_pack(self, pack_id: int) -> Dict[str, Any]: """Reload pack (refresh metadata and actions)""" response = self.post(f"/api/v1/packs/{pack_id}/reload") return response["data"] def delete_pack(self, pack_id: int) -> Dict[str, Any]: """Delete pack""" return self.delete(f"/api/v1/packs/{pack_id}") # ======================================================================== # Runtime Management # ======================================================================== def list_runtimes(self) -> List[Dict[str, Any]]: """ List all runtimes Returns: List of runtimes """ response = self.get("/api/v1/runtimes") return response["data"] # ======================================================================== # Action Management # ======================================================================== def list_actions(self, pack_ref: Optional[str] = None) -> List[Dict[str, Any]]: """ List actions Args: pack_ref: Optional filter by pack reference Returns: List of actions """ params = {} if pack_ref: params["pack_ref"] = pack_ref response = self.get("/api/v1/actions", params=params) return response["data"] def get_action(self, action_id: int) -> Dict[str, Any]: """Get action by ID""" response = self.get(f"/api/v1/actions/{action_id}") return response["data"] def get_action_by_ref(self, action_ref: str) -> Optional[Dict[str, Any]]: """Get action by reference (pack.action_name)""" actions = self.list_actions() for action in actions: if action["ref"] == action_ref: return action return None def create_action( self, pack_ref: str = None, name: str = None, runner_type: str = None, entrypoint: str = "", param_schema: Optional[Dict[str, Any]] = None, ref: str = None, label: str = None, description: str = None, runtime: int = None, runtime_ref: str = None, out_schema: Optional[Dict[str, Any]] = None, **kwargs, ) -> Dict[str, Any]: """ Create action Args: pack_ref: Pack reference (namespace.pack_name) name: Action name (legacy, maps to label) runner_type: Runner type (legacy, maps to runtime_ref) entrypoint: Entry point (e.g., actions/echo.py) param_schema: JSON Schema for parameters ref: Unique reference identifier (preferred) label: Human-readable label description: Action description runtime: Runtime ID (preferred over runtime_ref) runtime_ref: Runtime reference (e.g., "python3") out_schema: Output schema (JSON Schema) **kwargs: Additional action fields Returns: Created action data """ # Handle legacy parameters if not ref and name: # Generate ref from pack_ref and name if pack_ref and "." not in name: ref = f"{pack_ref}.{name}" else: ref = name if not label: label = name or ref or "Unnamed Action" if not description: description = label # Convert runner_type to runtime ID if needed if not runtime and not runtime_ref: runtime_ref = runner_type or "python3" if not runtime and runtime_ref: # Try to look up runtime by reference # If endpoint doesn't exist, runtime field is optional, so we can skip it try: runtimes = self.list_runtimes() for rt in runtimes: if rt.get("ref") == runtime_ref: runtime = rt["id"] break # If not found, try common mappings if not runtime: if runtime_ref in ["python3", "python"]: runtime_ref = "core.action.python3" elif runtime_ref in ["shell", "bash"]: runtime_ref = "core.action.shell" elif runtime_ref == "http": runtime_ref = "core.action.http" # Try lookup again with mapped ref for rt in runtimes: if rt.get("ref") == runtime_ref: runtime = rt["id"] break except Exception as e: # Runtime endpoint doesn't exist or failed - runtime is optional, so continue without it pass payload = { "ref": ref, "pack_ref": pack_ref, "label": label, "description": description, "entrypoint": entrypoint or f"actions/{name or 'action'}.py", } if runtime: payload["runtime"] = runtime if param_schema: payload["param_schema"] = param_schema if out_schema: payload["out_schema"] = out_schema # Merge any additional kwargs payload.update(kwargs) response = self.post("/api/v1/actions", json=payload) return response["data"] def delete_action(self, action_id: int) -> Dict[str, Any]: """Delete action""" return self.delete(f"/api/v1/actions/{action_id}") # ======================================================================== # Trigger Management # ======================================================================== def list_triggers(self) -> List[Dict[str, Any]]: """List all triggers""" response = self.get("/api/v1/triggers") return response["data"] def get_trigger(self, trigger_id: int) -> Dict[str, Any]: """Get trigger by ID""" response = self.get(f"/api/v1/triggers/{trigger_id}") return response["data"] def create_trigger( self, pack_ref: str = None, name: str = None, trigger_type: str = None, ref: str = None, label: str = None, description: str = None, param_schema: Optional[Dict[str, Any]] = None, out_schema: Optional[Dict[str, Any]] = None, enabled: bool = True, parameters: Optional[Dict[str, Any]] = None, **kwargs, ) -> Dict[str, Any]: """ Create trigger Args: pack_ref: Pack reference (optional) name: Trigger name (legacy, maps to label) trigger_type: Type (legacy, not used in API) ref: Unique reference identifier (e.g., "core.webhook") label: Human-readable label description: Trigger description param_schema: Parameter schema (JSON Schema) out_schema: Output schema (JSON Schema) enabled: Whether the trigger is enabled parameters: Trigger-specific parameters (legacy, not used) **kwargs: Additional trigger fields Returns: Created trigger data """ # Handle legacy name/trigger_type parameters if not ref and name: # Generate ref from pack_ref and name if pack_ref and "." not in name: ref = f"{pack_ref}.{name}" else: ref = name if not label: label = name or ref payload = { "ref": ref, "label": label, "enabled": enabled, } if pack_ref: payload["pack_ref"] = pack_ref if description: payload["description"] = description if param_schema: payload["param_schema"] = param_schema if out_schema: payload["out_schema"] = out_schema # Merge any additional kwargs payload.update(kwargs) response = self.post("/api/v1/triggers", json=payload) return response["data"] def delete_trigger(self, trigger_id: int) -> Dict[str, Any]: """Delete trigger""" return self.delete(f"/api/v1/triggers/{trigger_id}") def fire_webhook(self, trigger_id: int, payload: Dict[str, Any]) -> Dict[str, Any]: """ Fire webhook trigger Args: trigger_id: Webhook trigger ID payload: Webhook payload data Returns: Created event data """ response = self.post(f"/api/v1/webhooks/{trigger_id}", json=payload) return response["data"] # ======================================================================== # Sensor Management # ======================================================================== def list_sensors(self) -> List[Dict[str, Any]]: """List all sensors""" response = self.get("/api/v1/sensors") return response["data"] def get_sensor(self, sensor_id: int) -> Dict[str, Any]: """Get sensor by ID""" response = self.get(f"/api/v1/sensors/{sensor_id}") return response["data"] def create_sensor( self, ref: str = None, trigger_id: int = None, trigger_ref: str = None, label: str = None, description: str = "", entrypoint: str = "internal://timer", runtime_ref: str = "python3", pack_ref: str = None, enabled: bool = True, config: Optional[Dict[str, Any]] = None, **kwargs, ) -> Dict[str, Any]: """ Create sensor (using direct SQL until API endpoint exists) Args: ref: Unique reference (e.g., "pack.sensor_name") trigger_id: Trigger ID this sensor monitors trigger_ref: Trigger reference label: Human-readable label description: Sensor description entrypoint: Entry point (default: internal://timer for timers) runtime_ref: Runtime reference (default: python3) pack_ref: Pack reference enabled: Whether sensor is enabled config: Sensor configuration (e.g., {"interval": 5, "unit": "seconds"}) Returns: Created sensor data """ # Get database connection from environment db_url = os.environ.get( "DATABASE_URL", "postgresql://postgres:postgres@localhost:5432/attune" ) conn = psycopg2.connect(db_url) cur = conn.cursor() try: # Map short runtime names to full refs (core.action.*) runtime_ref_map = { "python3": "core.action.python3", "nodejs": "core.action.nodejs", "shell": "core.action.shell", } full_runtime_ref = runtime_ref_map.get(runtime_ref, runtime_ref) # Get runtime ID cur.execute( "SELECT id FROM attune.runtime WHERE ref = %s", (full_runtime_ref,) ) runtime_row = cur.fetchone() if not runtime_row: raise ValueError(f"Runtime not found: {full_runtime_ref}") runtime_id = runtime_row[0] # Get pack ID if pack_ref is provided pack_id = None if pack_ref: cur.execute("SELECT id FROM attune.pack WHERE ref = %s", (pack_ref,)) pack_row = cur.fetchone() if pack_row: pack_id = pack_row[0] # Convert config to JSON import json config_json = json.dumps(config) if config else None # Insert sensor cur.execute( """ INSERT INTO attune.sensor (ref, pack, pack_ref, label, description, entrypoint, runtime, runtime_ref, trigger, trigger_ref, enabled, config) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s::jsonb) RETURNING id, ref, pack, pack_ref, label, description, entrypoint, runtime, runtime_ref, trigger, trigger_ref, enabled, config, created, updated """, ( ref, pack_id, pack_ref, label, description, entrypoint, runtime_id, full_runtime_ref, trigger_id, trigger_ref, enabled, config_json, ), ) row = cur.fetchone() conn.commit() # Convert to dict sensor = { "id": row[0], "ref": row[1], "pack": row[2], "pack_ref": row[3], "label": row[4], "description": row[5], "entrypoint": row[6], "runtime": row[7], "runtime_ref": row[8], "trigger": row[9], "trigger_ref": row[10], "enabled": row[11], "config": row[12], "created": row[13].isoformat() if row[13] else None, "updated": row[14].isoformat() if row[14] else None, } return sensor finally: cur.close() conn.close() def delete_sensor(self, sensor_id: int) -> Dict[str, Any]: """Delete sensor""" return self.delete(f"/api/v1/sensors/{sensor_id}") # ======================================================================== # Rule Management # ======================================================================== def list_rules(self) -> List[Dict[str, Any]]: """List all rules""" response = self.get("/api/v1/rules") return response["data"] def get_rule(self, rule_id: int) -> Dict[str, Any]: """Get rule by ID""" response = self.get(f"/api/v1/rules/{rule_id}") return response["data"] def create_rule( self, name: str = None, pack_ref: str = None, trigger_id: int = None, action_ref: str = None, enabled: bool = True, criteria: Optional[str] = None, action_parameters: Optional[Dict[str, Any]] = None, ref: str = None, label: str = None, description: str = None, trigger_ref: str = None, conditions: Optional[Dict[str, Any]] = None, action_params: Optional[Dict[str, Any]] = None, trigger_params: Optional[Dict[str, Any]] = None, **kwargs, ) -> Dict[str, Any]: """ Create rule Args: name: Rule name (legacy, maps to label) pack_ref: Pack reference trigger_id: Trigger ID (legacy, converted to trigger_ref) action_ref: Action reference to execute enabled: Whether rule is enabled criteria: Optional Jinja2 criteria expression (legacy, maps to conditions) action_parameters: Parameters to pass to action (legacy, maps to action_params) ref: Unique reference identifier (e.g., "mypack.notify_on_error") label: Human-readable label description: Rule description trigger_ref: Trigger reference (preferred over trigger_id) conditions: Conditions for rule evaluation (JSON Logic) action_params: Parameters to pass to action trigger_params: Parameters for trigger configuration **kwargs: Additional rule fields Returns: Created rule data """ # Handle legacy parameters if not ref and name: # Generate ref from pack_ref and name if pack_ref and "." not in name: ref = f"{pack_ref}.{name}" else: ref = name if not label: label = name or ref or "Unnamed Rule" if not description: description = label # Convert trigger_id to trigger_ref if needed if not trigger_ref and trigger_id: trigger = self.get_trigger(trigger_id) trigger_ref = trigger["ref"] # Map legacy fields to new schema if not conditions and criteria: # Simple string criteria - wrap in basic condition conditions = {"criteria": criteria} if not action_params: action_params = action_parameters or {} if not trigger_params: trigger_params = {} payload = { "ref": ref, "pack_ref": pack_ref, "label": label, "description": description, "action_ref": action_ref, "trigger_ref": trigger_ref, "conditions": conditions or {}, "action_params": action_params, "trigger_params": trigger_params, "enabled": enabled, } # Merge any additional kwargs payload.update(kwargs) response = self.post("/api/v1/rules", json=payload) return response["data"] def update_rule(self, rule_id: int, **kwargs) -> Dict[str, Any]: """Update rule""" response = self.patch(f"/api/v1/rules/{rule_id}", json=kwargs) return response["data"] def enable_rule(self, rule_id: int) -> Dict[str, Any]: """Enable rule""" return self.update_rule(rule_id, enabled=True) def disable_rule(self, rule_id: int) -> Dict[str, Any]: """Disable rule""" return self.update_rule(rule_id, enabled=False) def delete_rule(self, rule_id: int) -> Dict[str, Any]: """Delete rule""" return self.delete(f"/api/v1/rules/{rule_id}") # ======================================================================== # Event Management # ======================================================================== def list_events( self, trigger_id: Optional[int] = None, limit: int = 100, offset: int = 0 ) -> List[Dict[str, Any]]: """ List events Args: trigger_id: Optional filter by trigger ID limit: Maximum number of events to return offset: Pagination offset Returns: List of events """ params = {"limit": limit, "offset": offset} if trigger_id: params["trigger_id"] = trigger_id response = self.get("/api/v1/events", params=params) return response["data"] def get_event(self, event_id: int) -> Dict[str, Any]: """Get event by ID""" response = self.get(f"/api/v1/events/{event_id}") return response["data"] # ======================================================================== # Enforcement Management # ======================================================================== def list_enforcements( self, rule_id: Optional[int] = None, limit: int = 100, offset: int = 0 ) -> List[Dict[str, Any]]: """List enforcements""" params = {"limit": limit, "offset": offset} if rule_id: params["rule_id"] = rule_id response = self.get("/api/v1/enforcements", params=params) return response["data"] def get_enforcement(self, enforcement_id: int) -> Dict[str, Any]: """Get enforcement by ID""" response = self.get(f"/api/v1/enforcements/{enforcement_id}") return response["data"] # ======================================================================== # Execution Management # ======================================================================== def list_executions( self, action_ref: Optional[str] = None, status: Optional[str] = None, enforcement_id: Optional[int] = None, limit: int = 100, offset: int = 0, ) -> List[Dict[str, Any]]: """ List executions Args: action_ref: Optional filter by action reference status: Optional filter by status enforcement_id: Optional filter by enforcement ID limit: Maximum number of executions offset: Pagination offset Returns: List of executions """ params = {"limit": limit, "offset": offset} if action_ref: params["action_ref"] = action_ref if status: params["status"] = status if enforcement_id: params["enforcement"] = enforcement_id response = self.get("/api/v1/executions", params=params) return response["data"] def get_execution(self, execution_id: int) -> Dict[str, Any]: """Get execution by ID""" response = self.get(f"/api/v1/executions/{execution_id}") return response["data"] def cancel_execution(self, execution_id: int) -> Dict[str, Any]: """Cancel running execution""" response = self.post(f"/api/v1/executions/{execution_id}/cancel") return response["data"] # ======================================================================== # Inquiry Management # ======================================================================== def list_inquiries( self, status: Optional[str] = None, limit: int = 100, offset: int = 0 ) -> List[Dict[str, Any]]: """List inquiries""" params = {"limit": limit, "offset": offset} if status: params["status"] = status response = self.get("/api/v1/inquiries", params=params) return response["data"] def get_inquiry(self, inquiry_id: int) -> Dict[str, Any]: """Get inquiry by ID""" response = self.get(f"/api/v1/inquiries/{inquiry_id}") return response["data"] def respond_to_inquiry( self, inquiry_id: int, response_data: Dict[str, Any] ) -> Dict[str, Any]: """ Respond to inquiry Args: inquiry_id: Inquiry ID response_data: Response data (structure depends on inquiry type) Returns: Updated inquiry data """ response = self.post( f"/api/v1/inquiries/{inquiry_id}/respond", json=response_data ) return response["data"] # ======================================================================== # Datastore (Key-Value Store) # ======================================================================== def datastore_get(self, key: str) -> Optional[Any]: """ Get value from datastore Args: key: Datastore key Returns: Value or None if not found """ try: response = self.get(f"/api/v1/datastore/{key}") return response["data"]["value"] except requests.exceptions.HTTPError as e: if e.response.status_code == 404: return None raise def datastore_set( self, key: str, value: Any, encrypted: bool = False, ttl: Optional[int] = None ) -> Dict[str, Any]: """ Set value in datastore Args: key: Datastore key value: Value to store encrypted: Whether to encrypt value ttl: Time-to-live in seconds Returns: Created datastore item """ payload = { "key": key, "value": value, "encrypted": encrypted, } if ttl: payload["ttl"] = ttl response = self.post("/api/v1/datastore", json=payload) return response["data"] def datastore_delete(self, key: str) -> Dict[str, Any]: """Delete key from datastore""" return self.delete(f"/api/v1/datastore/{key}") # ======================================================================== # Secrets Management # ======================================================================== def list_secrets(self) -> List[Dict[str, Any]]: """List all secrets (values are encrypted)""" response = self.get("/api/v1/secrets") return response["data"] def get_secret(self, key: str) -> Optional[str]: """ Get secret value Args: key: Secret key Returns: Decrypted secret value or None if not found """ try: response = self.get(f"/api/v1/secrets/{key}") return response["data"]["value"] except requests.exceptions.HTTPError as e: if e.response.status_code == 404: return None raise def create_secret( self, key: str = None, value: str = None, name: str = None, encrypted: bool = True, owner_type: str = "system", owner: str = None, owner_identity: int = None, owner_pack: int = None, owner_pack_ref: str = None, owner_action: int = None, owner_action_ref: str = None, owner_sensor: int = None, owner_sensor_ref: str = None, **kwargs, ) -> Dict[str, Any]: """ Create secret (stored as a key in the API) Args: key: Secret key/reference (e.g., "github_token") value: Secret value (will be encrypted if encrypted=True) name: Human-readable name for the key encrypted: Whether to encrypt the value (default: True) owner_type: Type of owner (system, identity, pack, action, sensor) owner: Optional owner string identifier owner_identity: Optional owner identity ID owner_pack: Optional owner pack ID owner_pack_ref: Optional owner pack reference owner_action: Optional owner action ID owner_action_ref: Optional owner action reference owner_sensor: Optional owner sensor ID owner_sensor_ref: Optional owner sensor reference Returns: Created secret metadata """ # Handle legacy kwargs for backwards compatibility if "description" in kwargs: # Ignore description as it's not in the actual API pass payload = { "ref": key, "name": name or key, "value": value, "encrypted": encrypted, "owner_type": owner_type, } # Add optional owner fields if owner: payload["owner"] = owner if owner_identity: payload["owner_identity"] = owner_identity if owner_pack: payload["owner_pack"] = owner_pack if owner_pack_ref: payload["owner_pack_ref"] = owner_pack_ref if owner_action: payload["owner_action"] = owner_action if owner_action_ref: payload["owner_action_ref"] = owner_action_ref if owner_sensor: payload["owner_sensor"] = owner_sensor if owner_sensor_ref: payload["owner_sensor_ref"] = owner_sensor_ref response = self.post("/api/v1/keys", json=payload) return response["data"] def update_secret(self, key: str, value: str) -> Dict[str, Any]: """Update secret value""" response = self.put(f"/api/v1/secrets/{key}", json={"value": value}) return response["data"] def delete_secret(self, key: str) -> Dict[str, Any]: """Delete secret""" return self.delete(f"/api/v1/secrets/{key}")