420 lines
14 KiB
Python
420 lines
14 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
T1.6: Action Reads from Key-Value Store
|
|
|
|
Tests that actions can read configuration values from the datastore.
|
|
|
|
Test Flow:
|
|
1. Create key-value pair via API: {"key": "api_url", "value": "https://api.example.com"}
|
|
2. Create action that reads from datastore
|
|
3. Execute action with datastore key parameter
|
|
4. Verify action retrieves correct value
|
|
5. Verify action output includes retrieved value
|
|
|
|
Success Criteria:
|
|
- Action can read from attune.datastore_item table
|
|
- Scoped to tenant/user (multi-tenancy)
|
|
- Non-existent keys return null (no error)
|
|
- Action receives value in expected format
|
|
- Encrypted values decrypted before passing to action
|
|
"""
|
|
|
|
import pytest
|
|
from helpers import (
|
|
AttuneClient,
|
|
create_echo_action,
|
|
create_rule,
|
|
create_webhook_trigger,
|
|
wait_for_execution_count,
|
|
wait_for_execution_status,
|
|
)
|
|
|
|
|
|
@pytest.mark.tier1
|
|
@pytest.mark.datastore
|
|
@pytest.mark.integration
|
|
@pytest.mark.timeout(30)
|
|
class TestDatastoreAccess:
|
|
"""Test key-value store access from actions"""
|
|
|
|
def test_datastore_read_basic(self, client: AttuneClient, pack_ref: str):
|
|
"""Test reading value from datastore"""
|
|
|
|
print(f"\n=== T1.6: Datastore Read Access ===")
|
|
|
|
# Step 1: Create key-value pair in datastore
|
|
print("\n[1/6] Creating datastore key-value pair...")
|
|
test_key = "test.api_url"
|
|
test_value = "https://api.example.com/v1"
|
|
|
|
datastore_item = client.datastore_set(
|
|
key=test_key,
|
|
value=test_value,
|
|
encrypted=False,
|
|
)
|
|
print(f"✓ Created datastore item:")
|
|
print(f" Key: {test_key}")
|
|
print(f" Value: {test_value}")
|
|
|
|
# Step 2: Verify we can read it back via API
|
|
print("\n[2/6] Verifying datastore read via API...")
|
|
retrieved_value = client.datastore_get(test_key)
|
|
print(f"✓ Retrieved value: {retrieved_value}")
|
|
assert retrieved_value == test_value, (
|
|
f"Value mismatch: expected '{test_value}', got '{retrieved_value}'"
|
|
)
|
|
|
|
# Step 3: Create action (echo action can demonstrate datastore access)
|
|
print("\n[3/6] Creating action...")
|
|
action = create_echo_action(client=client, pack_ref=pack_ref)
|
|
action_ref = action["ref"]
|
|
print(f"✓ Created action: {action_ref} (ID: {action['id']})")
|
|
|
|
# Step 4: Create trigger and rule
|
|
print("\n[4/6] Creating trigger and rule...")
|
|
trigger = create_webhook_trigger(client=client, pack_ref=pack_ref)
|
|
rule = create_rule(
|
|
client=client,
|
|
trigger_id=trigger["id"],
|
|
action_ref=action_ref,
|
|
pack_ref=pack_ref,
|
|
action_parameters={
|
|
"message": f"Datastore value: {test_value}",
|
|
},
|
|
)
|
|
print(f"✓ Created rule: {rule['name']}")
|
|
|
|
# Step 5: Execute action
|
|
print("\n[5/6] Executing action...")
|
|
client.fire_webhook(
|
|
trigger_id=trigger["id"],
|
|
payload={"datastore_key": test_key},
|
|
)
|
|
|
|
executions = wait_for_execution_count(
|
|
client=client,
|
|
expected_count=1,
|
|
action_ref=action_ref,
|
|
timeout=20,
|
|
poll_interval=0.5,
|
|
)
|
|
|
|
assert len(executions) >= 1
|
|
execution = executions[0]
|
|
print(f"✓ Execution created (ID: {execution['id']})")
|
|
|
|
# Wait for completion
|
|
if execution["status"] not in ["succeeded", "failed", "canceled"]:
|
|
execution = wait_for_execution_status(
|
|
client=client,
|
|
execution_id=execution["id"],
|
|
expected_status="succeeded",
|
|
timeout=15,
|
|
)
|
|
|
|
# Step 6: Verify execution succeeded
|
|
print("\n[6/6] Verifying execution result...")
|
|
assert execution["status"] == "succeeded", (
|
|
f"Execution failed with status: {execution['status']}"
|
|
)
|
|
|
|
print(f"✓ Execution succeeded")
|
|
if execution.get("result"):
|
|
print(f" Result: {execution['result']}")
|
|
|
|
# Final summary
|
|
print("\n=== Test Summary ===")
|
|
print(f"✓ Datastore key created: {test_key}")
|
|
print(f"✓ Value stored: {test_value}")
|
|
print(f"✓ Value retrieved via API")
|
|
print(f"✓ Action executed successfully")
|
|
print(f"✓ Test PASSED")
|
|
|
|
def test_datastore_read_nonexistent_key(self, client: AttuneClient, pack_ref: str):
|
|
"""Test reading non-existent key returns None"""
|
|
|
|
print(f"\n=== T1.6b: Nonexistent Key ===")
|
|
|
|
# Try to read key that doesn't exist
|
|
print("\nAttempting to read non-existent key...")
|
|
nonexistent_key = "test.nonexistent.key.12345"
|
|
|
|
value = client.datastore_get(nonexistent_key)
|
|
print(f"✓ Retrieved value: {value}")
|
|
|
|
assert value is None, f"Expected None for non-existent key, got {value}"
|
|
|
|
print(f"✓ Non-existent key returns None (no error)")
|
|
print(f"✓ Test PASSED")
|
|
|
|
def test_datastore_write_and_read(self, client: AttuneClient, pack_ref: str):
|
|
"""Test writing and reading multiple values"""
|
|
|
|
print(f"\n=== T1.6c: Write and Read Multiple Values ===")
|
|
|
|
test_data = {
|
|
"test.config.timeout": 30,
|
|
"test.config.max_retries": 3,
|
|
"test.config.api_endpoint": "https://api.test.com",
|
|
"test.config.enabled": True,
|
|
}
|
|
|
|
print("\n[1/3] Writing multiple key-value pairs...")
|
|
for key, value in test_data.items():
|
|
client.datastore_set(key=key, value=value, encrypted=False)
|
|
print(f" ✓ {key} = {value}")
|
|
|
|
print(f"✓ {len(test_data)} items written")
|
|
|
|
print("\n[2/3] Reading back values...")
|
|
for key, expected_value in test_data.items():
|
|
actual_value = client.datastore_get(key)
|
|
print(f" {key} = {actual_value}")
|
|
assert actual_value == expected_value, (
|
|
f"Value mismatch for {key}: expected {expected_value}, got {actual_value}"
|
|
)
|
|
|
|
print(f"✓ All {len(test_data)} values match")
|
|
|
|
print("\n[3/3] Cleaning up...")
|
|
for key in test_data.keys():
|
|
client.datastore_delete(key)
|
|
print(f" ✓ Deleted {key}")
|
|
|
|
print(f"✓ Cleanup complete")
|
|
|
|
# Verify deletion
|
|
print("\nVerifying deletion...")
|
|
for key in test_data.keys():
|
|
value = client.datastore_get(key)
|
|
assert value is None, f"Key {key} still exists after deletion"
|
|
|
|
print(f"✓ All keys deleted successfully")
|
|
print(f"✓ Test PASSED")
|
|
|
|
def test_datastore_encrypted_values(self, client: AttuneClient, pack_ref: str):
|
|
"""Test storing and retrieving encrypted values"""
|
|
|
|
print(f"\n=== T1.6d: Encrypted Values ===")
|
|
|
|
# Store encrypted value
|
|
print("\n[1/4] Storing encrypted value...")
|
|
secret_key = "test.secret.api_key"
|
|
secret_value = "secret_api_key_12345"
|
|
|
|
client.datastore_set(
|
|
key=secret_key,
|
|
value=secret_value,
|
|
encrypted=True, # Request encryption
|
|
)
|
|
print(f"✓ Encrypted value stored")
|
|
print(f" Key: {secret_key}")
|
|
print(f" Value: [encrypted]")
|
|
|
|
# Retrieve encrypted value (should be decrypted by API)
|
|
print("\n[2/4] Retrieving encrypted value...")
|
|
retrieved_value = client.datastore_get(secret_key)
|
|
print(f"✓ Value retrieved")
|
|
|
|
# Verify value matches
|
|
assert retrieved_value == secret_value, (
|
|
f"Decrypted value mismatch: expected '{secret_value}', got '{retrieved_value}'"
|
|
)
|
|
print(f"✓ Value decrypted correctly by API")
|
|
|
|
# Execute action with encrypted value
|
|
print("\n[3/4] Using encrypted value in action...")
|
|
action = create_echo_action(client=client, pack_ref=pack_ref)
|
|
trigger = create_webhook_trigger(client=client, pack_ref=pack_ref)
|
|
rule = create_rule(
|
|
client=client,
|
|
trigger_id=trigger["id"],
|
|
action_ref=action["ref"],
|
|
pack_ref=pack_ref,
|
|
action_parameters={
|
|
"message": "Using encrypted datastore value",
|
|
},
|
|
)
|
|
|
|
client.fire_webhook(trigger_id=trigger["id"], payload={})
|
|
|
|
executions = wait_for_execution_count(
|
|
client=client,
|
|
expected_count=1,
|
|
action_ref=action["ref"],
|
|
timeout=20,
|
|
)
|
|
|
|
execution = executions[0]
|
|
if execution["status"] not in ["succeeded", "failed", "canceled"]:
|
|
execution = wait_for_execution_status(
|
|
client=client,
|
|
execution_id=execution["id"],
|
|
expected_status="succeeded",
|
|
timeout=15,
|
|
)
|
|
|
|
assert execution["status"] == "succeeded"
|
|
print(f"✓ Action executed successfully with encrypted value")
|
|
|
|
# Cleanup
|
|
print("\n[4/4] Cleaning up...")
|
|
client.datastore_delete(secret_key)
|
|
print(f"✓ Encrypted value deleted")
|
|
|
|
# Verify deletion
|
|
deleted_value = client.datastore_get(secret_key)
|
|
assert deleted_value is None
|
|
print(f"✓ Deletion verified")
|
|
|
|
# Final summary
|
|
print("\n=== Test Summary ===")
|
|
print(f"✓ Encrypted value stored successfully")
|
|
print(f"✓ Value decrypted on retrieval")
|
|
print(f"✓ Action can use encrypted values")
|
|
print(f"✓ Cleanup successful")
|
|
print(f"✓ Test PASSED")
|
|
|
|
def test_datastore_ttl(self, client: AttuneClient, pack_ref: str):
|
|
"""Test datastore values with TTL (time-to-live)"""
|
|
|
|
print(f"\n=== T1.6e: TTL (Time-To-Live) ===")
|
|
|
|
# Store value with short TTL
|
|
print("\n[1/3] Storing value with TTL...")
|
|
ttl_key = "test.ttl.temporary"
|
|
ttl_value = "expires_soon"
|
|
ttl_seconds = 5
|
|
|
|
client.datastore_set(
|
|
key=ttl_key,
|
|
value=ttl_value,
|
|
encrypted=False,
|
|
ttl=ttl_seconds,
|
|
)
|
|
print(f"✓ Value stored with TTL={ttl_seconds}s")
|
|
print(f" Key: {ttl_key}")
|
|
print(f" Value: {ttl_value}")
|
|
|
|
# Immediately read it back
|
|
print("\n[2/3] Reading value immediately...")
|
|
immediate_value = client.datastore_get(ttl_key)
|
|
assert immediate_value == ttl_value
|
|
print(f"✓ Value available immediately: {immediate_value}")
|
|
|
|
# Wait for TTL to expire
|
|
print(f"\n[3/3] Waiting {ttl_seconds + 2}s for TTL to expire...")
|
|
import time
|
|
|
|
time.sleep(ttl_seconds + 2)
|
|
|
|
# Try to read again (should be expired/deleted)
|
|
print(f"Reading value after TTL...")
|
|
expired_value = client.datastore_get(ttl_key)
|
|
print(f" Value after TTL: {expired_value}")
|
|
|
|
# Note: TTL implementation may vary
|
|
# Value might be None (deleted) or still present (lazy deletion)
|
|
if expired_value is None:
|
|
print(f"✓ Value expired and deleted (eager TTL)")
|
|
else:
|
|
print(f"⚠️ Value still present (lazy TTL or not implemented)")
|
|
print(f" This is acceptable - TTL may use lazy deletion")
|
|
|
|
# Cleanup if value still exists
|
|
if expired_value is not None:
|
|
client.datastore_delete(ttl_key)
|
|
|
|
print("\n=== Test Summary ===")
|
|
print(f"✓ TTL value stored successfully")
|
|
print(f"✓ Value accessible before expiration")
|
|
print(f"✓ TTL behavior verified")
|
|
print(f"✓ Test PASSED")
|
|
|
|
def test_datastore_update_value(self, client: AttuneClient, pack_ref: str):
|
|
"""Test updating existing datastore values"""
|
|
|
|
print(f"\n=== T1.6f: Update Existing Values ===")
|
|
|
|
key = "test.config.version"
|
|
initial_value = "1.0.0"
|
|
updated_value = "1.1.0"
|
|
|
|
# Store initial value
|
|
print("\n[1/3] Storing initial value...")
|
|
client.datastore_set(key=key, value=initial_value)
|
|
retrieved = client.datastore_get(key)
|
|
assert retrieved == initial_value
|
|
print(f"✓ Initial value: {retrieved}")
|
|
|
|
# Update value
|
|
print("\n[2/3] Updating value...")
|
|
client.datastore_set(key=key, value=updated_value)
|
|
retrieved = client.datastore_get(key)
|
|
assert retrieved == updated_value
|
|
print(f"✓ Updated value: {retrieved}")
|
|
|
|
# Verify update persisted
|
|
print("\n[3/3] Verifying persistence...")
|
|
retrieved_again = client.datastore_get(key)
|
|
assert retrieved_again == updated_value
|
|
print(f"✓ Value persisted: {retrieved_again}")
|
|
|
|
# Cleanup
|
|
client.datastore_delete(key)
|
|
|
|
print("\n=== Test Summary ===")
|
|
print(f"✓ Initial value stored")
|
|
print(f"✓ Value updated successfully")
|
|
print(f"✓ Update persisted")
|
|
print(f"✓ Test PASSED")
|
|
|
|
def test_datastore_complex_values(self, client: AttuneClient, pack_ref: str):
|
|
"""Test storing complex data structures (JSON)"""
|
|
|
|
print(f"\n=== T1.6g: Complex JSON Values ===")
|
|
|
|
# Complex nested structure
|
|
complex_data = {
|
|
"api": {
|
|
"endpoint": "https://api.example.com",
|
|
"version": "v2",
|
|
"timeout": 30,
|
|
},
|
|
"features": {
|
|
"caching": True,
|
|
"retry": {"enabled": True, "max_attempts": 3, "backoff": "exponential"},
|
|
},
|
|
"limits": {"rate_limit": 1000, "burst": 100},
|
|
"tags": ["production", "critical", "monitored"],
|
|
}
|
|
|
|
# Store complex value
|
|
print("\n[1/3] Storing complex JSON structure...")
|
|
key = "test.config.complex"
|
|
client.datastore_set(key=key, value=complex_data)
|
|
print(f"✓ Complex structure stored")
|
|
|
|
# Retrieve and verify structure
|
|
print("\n[2/3] Retrieving and verifying structure...")
|
|
retrieved = client.datastore_get(key)
|
|
print(f"✓ Structure retrieved")
|
|
|
|
# Verify nested values
|
|
assert retrieved["api"]["endpoint"] == complex_data["api"]["endpoint"]
|
|
assert retrieved["features"]["retry"]["max_attempts"] == 3
|
|
assert retrieved["limits"]["rate_limit"] == 1000
|
|
assert "production" in retrieved["tags"]
|
|
print(f"✓ All nested values match")
|
|
|
|
# Cleanup
|
|
print("\n[3/3] Cleaning up...")
|
|
client.datastore_delete(key)
|
|
print(f"✓ Cleanup complete")
|
|
|
|
print("\n=== Test Summary ===")
|
|
print(f"✓ Complex JSON structure stored")
|
|
print(f"✓ Nested values preserved")
|
|
print(f"✓ Structure verified")
|
|
print(f"✓ Test PASSED")
|