16 KiB
Inquiry Handling - Human-in-the-Loop Workflows
Overview
Inquiry handling enables human-in-the-loop workflows in Attune, allowing action executions to pause and wait for human input, approval, or decisions before continuing. This is essential for workflows that require manual intervention, approval gates, or interactive decision-making.
Architecture
Components
- Action - Returns a result containing an inquiry request
- Worker - Executes action and returns result with
__inquirymarker - Executor (Completion Listener) - Detects inquiry request and creates inquiry
- Inquiry Record - Database record tracking the inquiry state
- API - Endpoints for users to view and respond to inquiries
- Executor (Inquiry Handler) - Listens for responses and resumes executions
- Notifier - Sends real-time notifications about inquiry events
Message Flow
Action Execution → Worker completes → ExecutionCompleted message →
Completion Listener detects __inquiry → Creates Inquiry record →
Publishes InquiryCreated message → Notifier alerts users →
User responds via API → API publishes InquiryResponded message →
Inquiry Handler receives message → Updates execution with response →
Execution continues/completes
Inquiry Request Format
Action Result with Inquiry
Actions can request human input by including an __inquiry key in their result:
{
"__inquiry": {
"prompt": "Approve deployment to production?",
"response_schema": {
"type": "object",
"properties": {
"approved": {"type": "boolean"},
"comments": {"type": "string"}
},
"required": ["approved"]
},
"assigned_to": 123,
"timeout_seconds": 3600
},
"deployment_plan": {
"target": "production",
"version": "v2.5.0"
}
}
Inquiry Fields
| Field | Type | Required | Description |
|---|---|---|---|
prompt |
string | Yes | Question/prompt text displayed to user |
response_schema |
JSON Schema | No | Schema defining expected response format |
assigned_to |
integer | No | Identity ID of user assigned to respond |
timeout_seconds |
integer | No | Seconds from creation until inquiry times out |
Creating Inquiries
From Python Actions
def run(deployment_plan):
# Validate deployment plan
validate_plan(deployment_plan)
# Request human approval
return {
"__inquiry": {
"prompt": f"Approve deployment of {deployment_plan['version']} to production?",
"response_schema": {
"type": "object",
"properties": {
"approved": {"type": "boolean"},
"reason": {"type": "string"}
},
"required": ["approved"]
},
"timeout_seconds": 7200 # 2 hours
},
"plan": deployment_plan
}
From JavaScript Actions
async function run(config) {
// Prepare deployment
const plan = await prepareDeploy(config);
// Request approval with assigned user
return {
__inquiry: {
prompt: `Deploy ${plan.serviceName} to ${plan.environment}?`,
response_schema: {
type: "object",
properties: {
approved: { type: "boolean" },
comments: { type: "string" }
}
},
assigned_to: config.approver_id,
timeout_seconds: 3600
},
deployment: plan
};
}
Inquiry Lifecycle
Status Flow
pending → responded (user provides response)
pending → timeout (timeout_at expires)
pending → cancelled (manual cancellation)
Database Schema
CREATE TABLE attune.inquiry (
id BIGSERIAL PRIMARY KEY,
execution BIGINT NOT NULL REFERENCES attune.execution(id),
prompt TEXT NOT NULL,
response_schema JSONB,
assigned_to BIGINT REFERENCES attune.identity(id),
status attune.inquiry_status_enum NOT NULL DEFAULT 'pending',
response JSONB,
timeout_at TIMESTAMPTZ,
responded_at TIMESTAMPTZ,
created TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
API Endpoints
List Inquiries
GET /api/v1/inquiries
Query parameters:
status- Filter by status (pending, responded, timeout, cancelled)execution- Filter by execution IDassigned_to- Filter by assigned user IDpage,per_page- Pagination
Response:
{
"data": [
{
"id": 1,
"execution": 123,
"prompt": "Approve deployment?",
"assigned_to": 5,
"status": "pending",
"has_response": false,
"timeout_at": "2024-01-15T12:00:00Z",
"created": "2024-01-15T10:00:00Z"
}
],
"total": 1,
"page": 1,
"per_page": 50,
"pages": 1
}
Get Inquiry Details
GET /api/v1/inquiries/:id
Response:
{
"data": {
"id": 1,
"execution": 123,
"prompt": "Approve deployment to production?",
"response_schema": {
"type": "object",
"properties": {
"approved": {"type": "boolean"}
}
},
"assigned_to": 5,
"status": "pending",
"response": null,
"timeout_at": "2024-01-15T12:00:00Z",
"responded_at": null,
"created": "2024-01-15T10:00:00Z",
"updated": "2024-01-15T10:00:00Z"
}
}
Respond to Inquiry
POST /api/v1/inquiries/:id/respond
Request body:
{
"response": {
"approved": true,
"comments": "LGTM - all tests passed"
}
}
Response:
{
"data": {
"id": 1,
"execution": 123,
"status": "responded",
"response": {
"approved": true,
"comments": "LGTM - all tests passed"
},
"responded_at": "2024-01-15T10:30:00Z"
},
"message": "Response submitted successfully"
}
Cancel Inquiry
POST /api/v1/inquiries/:id/cancel
Cancels a pending inquiry (admin/system use).
Message Queue Events
InquiryCreated
Published when an inquiry is created.
Routing key: inquiry.created
Payload:
{
"inquiry_id": 1,
"execution_id": 123,
"prompt": "Approve deployment?",
"response_schema": {...},
"assigned_to": 5,
"timeout_at": "2024-01-15T12:00:00Z"
}
InquiryResponded
Published when a user responds to an inquiry.
Routing key: inquiry.responded
Payload:
{
"inquiry_id": 1,
"execution_id": 123,
"response": {
"approved": true
},
"responded_by": 5,
"responded_at": "2024-01-15T10:30:00Z"
}
Executor Service Integration
Completion Listener
The completion listener detects inquiry requests in execution results:
// Check if execution result contains an inquiry request
if let Some(result) = &exec.result {
if InquiryHandler::has_inquiry_request(result) {
// Create inquiry and publish InquiryCreated message
InquiryHandler::create_inquiry_from_result(
pool,
publisher,
execution_id,
result,
).await?;
}
}
Inquiry Handler
The inquiry handler processes inquiry responses:
// Listen for InquiryResponded messages
consumer.consume_with_handler(|envelope: MessageEnvelope<InquiryRespondedPayload>| {
async move {
// Update execution with inquiry response
Self::resume_execution_with_response(
pool,
publisher,
execution,
inquiry,
response,
).await?;
}
}).await?;
Timeout Checker
A background task periodically checks for expired inquiries:
// Run every 60 seconds
InquiryHandler::timeout_check_loop(pool, 60).await;
This updates pending inquiries to timeout status when timeout_at is exceeded.
Access Control
Assignment Enforcement
If an inquiry has assigned_to set, only that user can respond:
if let Some(assigned_to) = inquiry.assigned_to {
if assigned_to != user_id {
return Err(ApiError::Forbidden("Not authorized to respond"));
}
}
RBAC Integration (Future)
Future versions will integrate with RBAC for:
- Permission to respond to inquiries
- Permission to cancel inquiries
- Visibility filtering based on roles
Timeout Handling
Automatic Timeout
Inquiries with timeout_at set are automatically marked as timed out:
UPDATE attune.inquiry
SET status = 'timeout', updated = NOW()
WHERE status = 'pending'
AND timeout_at IS NOT NULL
AND timeout_at < NOW();
Timeout Behavior
When an inquiry times out:
- Status changes to
timeout - Execution remains in current state
- Optional: Publish timeout event
- Optional: Resume execution with timeout indicator
Real-Time Notifications
WebSocket Integration
The Notifier service sends real-time notifications for inquiry events:
// Subscribe to inquiry notifications
ws.send(JSON.stringify({
type: "subscribe",
filters: {
entity_type: "inquiry",
user_id: 5
}
}));
// Receive notification
{
"id": 123,
"entity_type": "inquiry",
"entity": "1",
"activity": "created",
"content": {
"prompt": "Approve deployment?",
"assigned_to": 5
}
}
Notification Triggers
- inquiry.created - New inquiry created
- inquiry.responded - Inquiry received response
- inquiry.timeout - Inquiry timed out
- inquiry.cancelled - Inquiry was cancelled
Use Cases
Deployment Approval
def deploy_to_production(config):
# Prepare deployment
plan = prepare_deployment(config)
# Request approval
return {
"__inquiry": {
"prompt": f"Approve deployment of {config['service']} v{config['version']}?",
"response_schema": {
"type": "object",
"properties": {
"approved": {"type": "boolean"},
"rollback_plan": {"type": "string"}
}
},
"assigned_to": get_on_call_engineer(),
"timeout_seconds": 1800 # 30 minutes
},
"deployment_plan": plan
}
Data Validation
def validate_data_import(data):
# Check for anomalies
anomalies = detect_anomalies(data)
if anomalies:
return {
"__inquiry": {
"prompt": f"Found {len(anomalies)} anomalies. Continue import?",
"response_schema": {
"type": "object",
"properties": {
"continue": {"type": "boolean"},
"exclude_records": {"type": "array", "items": {"type": "integer"}}
}
},
"timeout_seconds": 3600
},
"anomalies": anomalies
}
# No anomalies, proceed normally
return import_data(data)
Configuration Review
def update_firewall_rules(rules):
# Analyze impact
impact = analyze_impact(rules)
if impact["severity"] == "high":
return {
"__inquiry": {
"prompt": "High-impact firewall changes detected. Approve?",
"response_schema": {
"type": "object",
"properties": {
"approved": {"type": "boolean"},
"review_notes": {"type": "string"}
}
},
"assigned_to": get_security_team_lead(),
"timeout_seconds": 7200
},
"impact_analysis": impact,
"proposed_rules": rules
}
# Low impact, apply immediately
return apply_rules(rules)
Best Practices
1. Clear Prompts
Write clear, actionable prompts:
✅ Good: "Approve deployment of api-service v2.1.0 to production?" ❌ Bad: "Continue?"
2. Reasonable Timeouts
Set appropriate timeout values:
- Critical decisions: 30-60 minutes
- Routine approvals: 2-4 hours
- Non-urgent reviews: 24-48 hours
3. Response Schemas
Define clear response schemas to validate user input:
{
"type": "object",
"properties": {
"approved": {
"type": "boolean",
"description": "Whether to approve the action"
},
"comments": {
"type": "string",
"description": "Optional comments explaining the decision"
}
},
"required": ["approved"]
}
4. Assignment
Assign inquiries to specific users for accountability:
{
"__inquiry": {
"prompt": "...",
"assigned_to": get_on_call_user_id()
}
}
5. Context Information
Include relevant context in the action result:
return {
"__inquiry": {
"prompt": "Approve deployment?"
},
"deployment_details": {
"service": "api",
"version": "v2.1.0",
"changes": ["Added new endpoint", "Fixed bug #123"],
"tests_passed": True,
"ci_build_url": "https://ci.example.com/builds/456"
}
}
Troubleshooting
Inquiry Not Created
Problem: Action completes but no inquiry is created.
Check:
- Action result contains
__inquirykey - Completion listener is running
- Check executor logs for errors
- Verify inquiry table exists
Execution Not Resuming
Problem: User responds but execution doesn't continue.
Check:
- InquiryResponded message was published (check API logs)
- Inquiry handler is running and consuming messages
- Check executor logs for errors processing response
- Verify execution exists and is in correct state
Timeout Not Working
Problem: Inquiries not timing out automatically.
Check:
- Timeout checker loop is running
timeout_atis set correctly in inquiry record- Check system time/timezone configuration
- Review executor logs for timeout check errors
Response Rejected
Problem: API rejects inquiry response.
Check:
- Inquiry is still in
pendingstatus - Inquiry hasn't timed out
- User is authorized (if
assigned_tois set) - Response matches
response_schema(when validation is enabled)
Performance Considerations
Database Indexes
Ensure these indexes exist for efficient inquiry queries:
CREATE INDEX idx_inquiry_status ON attune.inquiry(status);
CREATE INDEX idx_inquiry_assigned_status ON attune.inquiry(assigned_to, status);
CREATE INDEX idx_inquiry_timeout_at ON attune.inquiry(timeout_at) WHERE timeout_at IS NOT NULL;
Message Queue
- Use separate consumer for inquiry responses
- Set appropriate prefetch count (10-20)
- Enable message acknowledgment
Timeout Checking
- Run timeout checker every 60-120 seconds
- Use batched updates for efficiency
- Monitor for long-running timeout queries
Security
Input Validation
Always validate inquiry responses:
// TODO: Validate response against response_schema
if let Some(schema) = &inquiry.response_schema {
validate_json_schema(&request.response, schema)?;
}
Authorization
Verify user permissions:
// Check assignment
if let Some(assigned_to) = inquiry.assigned_to {
if assigned_to != user.id {
return Err(ApiError::Forbidden("Not authorized"));
}
}
// Future: Check RBAC permissions
if !user.has_permission("inquiry:respond") {
return Err(ApiError::Forbidden("Missing permission"));
}
Audit Trail
All inquiry responses are logged:
- Who responded
- When they responded
- What they responded with
- Original inquiry context
Future Enhancements
Planned Features
- Multi-step Approvals - Chain multiple inquiries for approval workflows
- Conditional Resumption - Resume execution differently based on response
- Inquiry Templates - Reusable inquiry definitions
- Bulk Operations - Approve/reject multiple inquiries at once
- Escalation - Auto-reassign if no response within timeframe
- Reminder Notifications - Alert users of pending inquiries
- Response Validation - Validate responses against JSON schema
- Inquiry History - View history of all inquiries for an execution chain
Integration Opportunities
- Slack/Teams - Respond to inquiries via chat
- Email - Send inquiry notifications and accept email responses
- Mobile Apps - Native mobile inquiry interface
- External Systems - Webhook integration for external approval systems