Pitfall Resolution Action Plan
Date: 2024-01-02
Status: Action Plan - Ready for Implementation
Related Document: StackStorm-Pitfalls-Analysis.md
Overview
This document provides a detailed, step-by-step action plan for resolving the critical issues identified in the StackStorm pitfalls analysis. Each issue has been broken down into concrete implementation tasks with estimated effort and dependencies.
Critical Issues Summary
| ID | Issue | Severity | Estimated Effort | Priority |
|---|---|---|---|---|
| P3 | Limited Language Ecosystem Support | ⚠️ Moderate | 5-7 days | P2 |
| P4 | Dependency Hell & System Coupling | 🔴 Critical | 7-10 days | P1 |
| P5 | Insecure Secret Passing | 🔴 Critical | 3-5 days | P0 |
| P6 | Log Storage Size Limits | ⚠️ Moderate | 3-4 days | P1 |
| P7 | Policy Execution Ordering | 🔴 Critical | 4-6 days | P0 |
Total Estimated Effort: 22-32 days (4.5-6.5 weeks)
PHASE 1: CRITICAL CORRECTNESS & SECURITY FIXES
Priority: P0 (BLOCKING)
Estimated Time: 7-11 days
Must Complete Before: Any production deployment or public testing
PHASE 1A: POLICY EXECUTION ORDERING FIX
Priority: P0 (BLOCKING)
Estimated Time: 4-6 days
Must Complete Before: Any production deployment
P7.1: Implement Execution Queue Manager
Files to Create:
crates/executor/src/execution_queue.rs
crates/executor/tests/execution_queue_tests.rs
Task P7.1.1: Create ExecutionQueueManager Module
New File: crates/executor/src/execution_queue.rs
//! Execution Queue Manager
//!
//! Manages FIFO queues of delayed executions per action to ensure
//! proper ordering when policy limits are enforced.
use std::collections::{HashMap, VecDeque};
use std::sync::Arc;
use tokio::sync::{Mutex, Notify};
use chrono::{DateTime, Utc};
use tracing::{debug, info};
/// Manages FIFO queues of delayed executions per action
pub struct ExecutionQueueManager {
/// Queue per action_id
queues: Arc<Mutex<HashMap<i64, ActionQueue>>>,
}
struct ActionQueue {
/// FIFO queue of waiting execution IDs with enqueue time
waiting: VecDeque<QueueEntry>,
/// Notify when slot becomes available
notify: Arc<Notify>,
/// Current running count
running_count: u32,
/// Concurrency limit for this action
limit: u32,
}
struct QueueEntry {
execution_id: i64,
enqueued_at: DateTime<Utc>,
}
impl ExecutionQueueManager {
pub fn new() -> Self {
Self {
queues: Arc::new(Mutex::new(HashMap::new())),
}
}
/// Set concurrency limit for an action
pub async fn set_limit(&self, action_id: i64, limit: u32) {
let mut queues = self.queues.lock().await;
queues.entry(action_id)
.or_insert_with(|| ActionQueue::new(limit))
.limit = limit;
}
/// Enqueue an execution (returns position in queue)
pub async fn enqueue(&self, action_id: i64, execution_id: i64) -> usize {
let mut queues = self.queues.lock().await;
let queue = queues.entry(action_id)
.or_insert_with(|| ActionQueue::new(1));
let entry = QueueEntry {
execution_id,
enqueued_at: Utc::now(),
};
queue.waiting.push_back(entry);
let position = queue.waiting.len();
info!("Execution {} enqueued for action {} at position {}",
execution_id, action_id, position);
position
}
/// Wait for turn (blocks until this execution can proceed)
/// Wait for turn (blocks until this execution can proceed)
pub async fn wait_for_turn(&self, action_id: i64, execution_id: i64) -> Result<(), String> {
    loop {
        let mut queues = self.queues.lock().await;
        let queue = queues.get_mut(&action_id)
            .ok_or_else(|| format!("No queue for action {}", action_id))?;
        // Are we at the front AND is there capacity?
        if let Some(front) = queue.waiting.front() {
            if front.execution_id == execution_id && queue.running_count < queue.limit {
                // It's our turn!
                queue.waiting.pop_front();
                queue.running_count += 1;
                info!("Execution {} proceeding (running: {}/{})",
                    execution_id, queue.running_count, queue.limit);
                return Ok(());
            }
        }
        // Not our turn. Register interest *before* releasing the lock so a
        // `notify_waiters` call in `complete` cannot slip in between the
        // check and the wait (missed-wakeup race).
        let notify = queue.notify.clone();
        let notified = notify.notified();
        tokio::pin!(notified);
        notified.as_mut().enable();
        drop(queues);
        debug!("Execution {} waiting for notification", execution_id);
        notified.await;
    }
}
/// Mark execution as complete (frees up slot)
pub async fn complete(&self, action_id: i64, execution_id: i64) {
    let mut queues = self.queues.lock().await;
    if let Some(queue) = queues.get_mut(&action_id) {
        queue.running_count = queue.running_count.saturating_sub(1);
        info!("Execution {} completed for action {} (running: {}/{})",
            execution_id, action_id, queue.running_count, queue.limit);
        // Wake *all* waiters; each re-checks whether it is at the front with
        // capacity. `notify_one` could wake a non-front waiter and strand
        // the queue.
        queue.notify.notify_waiters();
    }
}
/// Get queue stats for monitoring
pub async fn get_queue_stats(&self, action_id: i64) -> QueueStats {
let queues = self.queues.lock().await;
if let Some(queue) = queues.get(&action_id) {
let avg_wait_time = if !queue.waiting.is_empty() {
let now = Utc::now();
let total_wait: i64 = queue.waiting.iter()
.map(|e| (now - e.enqueued_at).num_seconds())
.sum();
Some(total_wait as f64 / queue.waiting.len() as f64)
} else {
None
};
QueueStats {
waiting: queue.waiting.len(),
running: queue.running_count as usize,
limit: queue.limit as usize,
avg_wait_time_seconds: avg_wait_time,
}
} else {
QueueStats::default()
}
}
}
impl ActionQueue {
fn new(limit: u32) -> Self {
Self {
waiting: VecDeque::new(),
notify: Arc::new(Notify::new()),
running_count: 0,
limit,
}
}
}
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct QueueStats {
pub waiting: usize,
pub running: usize,
pub limit: usize,
pub avg_wait_time_seconds: Option<f64>,
}
impl Default for QueueStats {
fn default() -> Self {
Self {
waiting: 0,
running: 0,
limit: 1,
avg_wait_time_seconds: None,
}
}
}
Acceptance Criteria:
- ExecutionQueueManager created with FIFO queue per action
- Enqueue method adds to back of queue
- wait_for_turn blocks until execution is at front AND capacity available
- complete method decrements counter and notifies waiting executions
- Unit tests verify FIFO ordering
- Thread-safe with tokio::sync::Mutex
Estimated Time: 6 hours
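The admission rule enforced above (an execution proceeds only when it is at the front of the queue AND a slot is free) can be checked synchronously before touching any async machinery. The following Python model is a sketch of that rule only; the class and method names are illustrative, not part of the codebase.

```python
from collections import deque

class QueueModel:
    """Synchronous model of the admission rule: an execution may start
    only when it is at the front of the FIFO queue AND a slot is free."""
    def __init__(self, limit):
        self.waiting = deque()
        self.running = 0
        self.limit = limit

    def enqueue(self, execution_id):
        self.waiting.append(execution_id)
        return len(self.waiting)  # 1-based queue position

    def try_start(self, execution_id):
        if self.waiting and self.waiting[0] == execution_id and self.running < self.limit:
            self.waiting.popleft()
            self.running += 1
            return True
        return False

    def complete(self):
        self.running = max(0, self.running - 1)

q = QueueModel(limit=1)
for eid in (101, 102, 103):
    q.enqueue(eid)

started = []
# 102 cannot jump the queue even though a slot is free
assert not q.try_start(102)
assert q.try_start(101)
started.append(101)
q.complete()
assert q.try_start(102)
started.append(102)
```

Both conditions matter: dropping the front-of-queue check reintroduces the exact ordering bug this phase fixes.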
Task P7.1.2: Integrate with PolicyEnforcer
File: crates/executor/src/policy_enforcer.rs
// Add field to PolicyEnforcer
pub struct PolicyEnforcer {
pool: PgPool,
queue_manager: Arc<ExecutionQueueManager>, // ← NEW
global_policy: ExecutionPolicy,
pack_policies: HashMap<Id, ExecutionPolicy>,
action_policies: HashMap<Id, ExecutionPolicy>,
}
impl PolicyEnforcer {
pub fn new(pool: PgPool, queue_manager: Arc<ExecutionQueueManager>) -> Self {
Self {
pool,
queue_manager,
global_policy: ExecutionPolicy::default(),
pack_policies: HashMap::new(),
action_policies: HashMap::new(),
}
}
/// Enforce policies with queueing support
pub async fn enforce_and_wait(
&self,
action_id: Id,
execution_id: Id,
pack_id: Option<Id>,
) -> Result<()> {
// Check if policy would be violated
if let Some(violation) = self.check_policies(action_id, pack_id).await? {
match violation {
PolicyViolation::ConcurrencyLimitExceeded { limit, .. } => {
// Set limit in queue manager
self.queue_manager.set_limit(action_id, limit).await;
// Enqueue and wait for turn
let position = self.queue_manager.enqueue(action_id, execution_id).await;
info!("Execution {} queued at position {} due to concurrency limit",
execution_id, position);
self.queue_manager.wait_for_turn(action_id, execution_id)
.await
.map_err(|e| anyhow::anyhow!(e))?;
info!("Execution {} proceeding after queue wait", execution_id);
}
PolicyViolation::RateLimitExceeded { .. } => {
// Rate limit: retry with backoff
self.retry_with_backoff(action_id, pack_id, 60).await?;
}
_ => {
return Err(anyhow::anyhow!("Policy violation: {}", violation));
}
}
}
Ok(())
}
async fn retry_with_backoff(
&self,
action_id: Id,
pack_id: Option<Id>,
max_wait_seconds: u32,
) -> Result<()> {
let start = Utc::now();
let mut backoff_seconds = 1;
loop {
tokio::time::sleep(tokio::time::Duration::from_secs(backoff_seconds)).await;
if self.check_policies(action_id, pack_id).await?.is_none() {
return Ok(());
}
if (Utc::now() - start).num_seconds() > max_wait_seconds as i64 {
return Err(anyhow::anyhow!("Policy compliance timeout"));
}
backoff_seconds = (backoff_seconds * 2).min(10);
}
}
}
Acceptance Criteria:
- PolicyEnforcer has queue_manager field
- enforce_and_wait method uses queue for concurrency limits
- Rate limits use exponential backoff
- Tests verify integration
Estimated Time: 4 hours
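The backoff loop above doubles the sleep and caps it at 10 seconds. This sketch reproduces only the delay sequence (it ignores the policy re-check between sleeps); the helper name is made up for illustration.

```python
def backoff_schedule(max_wait_seconds, cap=10):
    """Delays produced by the doubling-with-cap loop in retry_with_backoff."""
    delays, elapsed, delay = [], 0, 1
    while elapsed < max_wait_seconds:
        delays.append(delay)
        elapsed += delay
        delay = min(delay * 2, cap)
    return delays

# With a 60 s budget the worker sleeps 1, 2, 4, 8, then 10 s at a time.
schedule = backoff_schedule(60)
```

The cap keeps the worst-case extra latency after a slot frees up to about 10 seconds, while the doubling keeps the re-check load low.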
Task P7.1.3: Update Scheduler to Use Queue
File: crates/executor/src/scheduler.rs
// Update ExecutionScheduler to include policy enforcement
pub struct ExecutionScheduler {
pool: PgPool,
publisher: Arc<Publisher>,
consumer: Arc<Consumer>,
policy_enforcer: Arc<PolicyEnforcer>, // ← NEW
}
impl ExecutionScheduler {
pub fn new(
pool: PgPool,
publisher: Arc<Publisher>,
consumer: Arc<Consumer>,
policy_enforcer: Arc<PolicyEnforcer>,
) -> Self {
Self {
pool,
publisher,
consumer,
policy_enforcer,
}
}
async fn process_execution_requested(
pool: &PgPool,
publisher: &Publisher,
policy_enforcer: &PolicyEnforcer,
envelope: &MessageEnvelope<ExecutionRequestedPayload>,
) -> Result<()> {
let execution_id = envelope.payload.execution_id;
let execution = ExecutionRepository::find_by_id(pool, execution_id).await?
.ok_or_else(|| anyhow::anyhow!("Execution not found"))?;
let action = Self::get_action_for_execution(pool, &execution).await?;
// Enforce policies with queueing - this may block
policy_enforcer.enforce_and_wait(
action.id,
execution_id,
Some(action.pack),
).await?;
// Now proceed with scheduling
let worker = Self::select_worker(pool, &action).await?;
// ... rest of scheduling logic
}
}
Acceptance Criteria:
- Scheduler calls enforce_and_wait before scheduling
- Blocked executions wait in queue
- Executions proceed in FIFO order
Estimated Time: 3 hours
Task P7.1.4: Add Completion Notification
File: crates/worker/src/executor.rs
// Update ActionExecutor to notify queue on completion
pub struct ActionExecutor {
pool: PgPool,
runtime_registry: RuntimeRegistry,
artifact_manager: ArtifactManager,
secret_manager: SecretManager,
publisher: Arc<Publisher>, // ← NEW: for notifications
}
async fn handle_execution_success(
&self,
execution_id: i64,
action_id: i64,
result: &ExecutionResult,
) -> Result<()> {
// Update database
let input = UpdateExecutionInput {
status: Some(ExecutionStatus::Completed),
result: result.result.clone(),
executor: None,
};
ExecutionRepository::update(&self.pool, execution_id, input).await?;
// Notify queue manager via message queue
let payload = ExecutionCompletedPayload {
execution_id,
action_id,
status: ExecutionStatus::Completed,
};
let envelope = MessageEnvelope::new(
MessageType::ExecutionCompleted,
payload,
).with_source("worker");
self.publisher.publish_envelope(&envelope).await?;
Ok(())
}
async fn handle_execution_failure(&self, execution_id: i64, action_id: i64, error: String) -> Result<()> {
// Similar notification on failure
// ...
}
Acceptance Criteria:
- Worker publishes ExecutionCompleted message
- Includes action_id for queue lookup
- Published on both success and failure
Estimated Time: 3 hours
Task P7.1.5: Add Executor Completion Listener
File: crates/executor/src/completion_listener.rs (new file)
//! Listens for execution completion messages and updates queue
use anyhow::Result;
use attune_common::mq::{Consumer, MessageEnvelope};
use std::sync::Arc;
use tracing::{debug, error, info};
use super::execution_queue::ExecutionQueueManager;
#[derive(Debug, Clone, serde::Deserialize)]
pub struct ExecutionCompletedPayload {
pub execution_id: i64,
pub action_id: i64,
pub status: String,
}
pub struct CompletionListener {
consumer: Arc<Consumer>,
queue_manager: Arc<ExecutionQueueManager>,
}
impl CompletionListener {
pub fn new(consumer: Arc<Consumer>, queue_manager: Arc<ExecutionQueueManager>) -> Self {
Self {
consumer,
queue_manager,
}
}
pub async fn start(&self) -> Result<()> {
info!("Starting execution completion listener");
let queue_manager = self.queue_manager.clone();
self.consumer.consume_with_handler(
move |envelope: MessageEnvelope<ExecutionCompletedPayload>| {
let queue_manager = queue_manager.clone();
async move {
debug!("Received execution completed: {:?}", envelope.payload);
queue_manager.complete(
envelope.payload.action_id,
envelope.payload.execution_id,
).await;
Ok(())
}
}
).await?;
Ok(())
}
}
Acceptance Criteria:
- Listener subscribes to execution.completed messages
- Calls queue_manager.complete on each message
- Error handling and logging
Estimated Time: 2 hours
Task P7.1.6: Add Queue Monitoring API
File: crates/api/src/routes/actions.rs
/// GET /api/v1/actions/:id/queue-stats
#[utoipa::path(
get,
path = "/api/v1/actions/{id}/queue-stats",
params(
("id" = i64, Path, description = "Action ID")
),
responses(
(status = 200, description = "Queue statistics", body = ApiResponse<QueueStats>),
(status = 404, description = "Action not found")
),
tag = "actions"
)]
async fn get_action_queue_stats(
State(state): State<Arc<AppState>>,
Extension(user): Extension<Identity>,
Path(action_id): Path<i64>,
) -> Result<Json<ApiResponse<QueueStats>>> {
// Verify action exists
ActionRepository::find_by_id(&state.pool, action_id)
.await?
.ok_or_else(|| Error::not_found("Action", "id", action_id.to_string()))?;
let stats = state.queue_manager.get_queue_stats(action_id).await;
Ok(Json(ApiResponse::success(stats)))
}
Acceptance Criteria:
- API endpoint returns queue stats
- Stats include waiting, running, limit, avg wait time
- Requires authentication
- OpenAPI documentation
Estimated Time: 2 hours
Task P7.1.7: Integration Tests
New File: crates/executor/tests/execution_queue_tests.rs
#[tokio::test]
async fn test_fifo_ordering() {
// Test that three executions with limit=1 execute in order
}
#[tokio::test]
async fn test_concurrent_enqueue() {
// Test 100 concurrent enqueues maintain order
}
#[tokio::test]
async fn test_completion_releases_slot() {
// Test that completing execution wakes next in queue
}
#[tokio::test]
async fn test_queue_stats() {
// Test stats accurately reflect queue state
}
Acceptance Criteria:
- FIFO ordering verified with multiple executions
- Concurrent operations safe and ordered
- Completion notification works end-to-end
- Stats API accurate
Estimated Time: 6 hours
P7 Subtotal: 4-6 days (26 hours estimated)
PHASE 1B: SECURITY CRITICAL - SECRET PASSING FIX
Priority: P0 (BLOCKING)
Estimated Time: 3-5 days
Must Complete Before: Any production deployment or public testing
P5.1: Implement Stdin-Based Secret Injection
Files to Modify:
crates/worker/src/runtime/mod.rs
crates/worker/src/runtime/python.rs
crates/worker/src/runtime/shell.rs
crates/worker/src/executor.rs
crates/worker/src/secrets.rs
Task P5.1.1: Enhance ExecutionContext Structure
File: crates/worker/src/runtime/mod.rs
// Add new field to ExecutionContext
pub struct ExecutionContext {
// ... existing fields ...
/// Secrets to be passed via stdin (NOT environment variables)
pub secrets: HashMap<String, String>,
/// Secret injection method
pub secret_method: SecretInjectionMethod,
}
pub enum SecretInjectionMethod {
/// Pass secrets via stdin as JSON
Stdin,
/// Write secrets to temporary file and pass file path
TempFile,
/// DEPRECATED: Use environment variables (insecure)
#[deprecated(note = "Insecure - secrets visible in process table")]
EnvironmentVariables,
}
Acceptance Criteria:
- ExecutionContext includes secrets field
- SecretInjectionMethod enum defined
- Default method is Stdin
- All tests pass
Estimated Time: 1 hour
Task P5.1.2: Update SecretManager to Not Add Secrets to Env
File: crates/worker/src/secrets.rs
// REMOVE or deprecate this method
// pub fn prepare_secret_env(...) -> HashMap<String, String>
// ADD new method
impl SecretManager {
/// Prepare secrets for secure injection
/// Returns secrets as a HashMap, ready to be passed via stdin
pub fn prepare_secrets(&self, secrets: &HashMap<String, String>)
-> HashMap<String, String> {
// Just return as-is, no env var prefix needed
secrets.clone()
}
/// Serialize secrets to JSON for stdin injection
pub fn serialize_secrets_for_stdin(
&self,
secrets: &HashMap<String, String>,
parameters: &HashMap<String, serde_json::Value>,
) -> Result<String> {
let payload = serde_json::json!({
"secrets": secrets,
"parameters": parameters,
});
serde_json::to_string(&payload)
.map_err(|e| Error::Internal(format!("Failed to serialize secrets: {}", e)))
}
}
Acceptance Criteria:
- prepare_secret_env method removed or deprecated
- New serialize_secrets_for_stdin method added
- Unit tests pass
- Secrets no longer added to environment variables
Estimated Time: 2 hours
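For reference, serialize_secrets_for_stdin emits a single JSON object with two top-level keys. This Python sketch mirrors that shape so runtimes and tests can agree on the contract; the exact field layout is this plan's assumption, not an existing wire format.

```python
import json

def serialize_for_stdin(secrets, parameters):
    """Mirror of the Rust serializer: one JSON object, two keys."""
    return json.dumps({"secrets": secrets, "parameters": parameters})

payload = serialize_for_stdin({"api_key": "s3cr3t"}, {"region": "us-east-1"})
decoded = json.loads(payload)
```

Keeping parameters and secrets in one document means the runtime performs exactly one stdin write before closing the pipe.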
Task P5.1.3: Update Executor to Pass Secrets Separately
File: crates/worker/src/executor.rs
async fn prepare_execution_context(
&self,
execution: &Execution,
action: &Action,
) -> Result<ExecutionContext> {
// ... existing parameter extraction ...
// Fetch secrets but DO NOT add to env
let secrets = match self.secret_manager.fetch_secrets_for_action(action).await {
Ok(secrets) => secrets,
Err(e) => {
warn!("Failed to fetch secrets for action {}: {}", action.r#ref, e);
HashMap::new()
}
};
// Environment variables - NO SECRETS!
let mut env = HashMap::new();
env.insert("ATTUNE_EXECUTION_ID".to_string(), execution.id.to_string());
env.insert("ATTUNE_ACTION_REF".to_string(), execution.action_ref.clone());
// ... other non-secret env vars ...
// DO NOT DO THIS ANYMORE:
// env.extend(secret_env); ← REMOVED
let context = ExecutionContext {
execution_id: execution.id,
action_ref: execution.action_ref.clone(),
parameters,
env,
secrets, // ← NEW: Passed separately
secret_method: SecretInjectionMethod::Stdin, // ← NEW
timeout,
working_dir: None,
entry_point,
code: None,
code_path: None,
};
Ok(context)
}
Acceptance Criteria:
- Secrets fetched but not added to env
- Secrets passed in ExecutionContext.secrets field
- Environment variables contain no SECRET_ prefixed keys
- Tests verify env vars don't contain secrets
Estimated Time: 1 hour
Task P5.1.4: Implement Stdin Injection in Python Runtime
File: crates/worker/src/runtime/python.rs
async fn execute_python_code(
&self,
script: String,
context: &ExecutionContext,
) -> RuntimeResult<ExecutionResult> {
let start = Instant::now();
// Build command
let mut cmd = Command::new(&self.python_path);
cmd.arg("-c")
.arg(&script)
.stdin(Stdio::piped()) // ← CHANGED from Stdio::null()
.stdout(Stdio::piped())
.stderr(Stdio::piped());
// Add environment variables (NO SECRETS)
for (key, value) in &context.env {
cmd.env(key, value);
}
// Spawn process
let mut child = cmd.spawn()?;
// Write secrets and parameters to stdin as JSON
if let Some(mut stdin) = child.stdin.take() {
let stdin_data = serde_json::json!({
"secrets": context.secrets,
"parameters": context.parameters,
});
let stdin_bytes = serde_json::to_vec(&stdin_data)?;
stdin.write_all(&stdin_bytes).await
.map_err(|e| RuntimeError::ProcessError(format!("Failed to write to stdin: {}", e)))?;
// Close stdin to signal EOF
drop(stdin);
}
// Wait for execution with timeout
let output = if let Some(timeout_secs) = context.timeout {
match timeout(Duration::from_secs(timeout_secs), child.wait_with_output()).await {
Ok(output) => output?,
Err(_) => {
let _ = child.kill().await;
return Ok(ExecutionResult::failure(
-1,
String::new(),
format!("Execution timed out after {} seconds", timeout_secs),
start.elapsed().as_millis() as u64,
));
}
}
} else {
child.wait_with_output().await?
};
// ... rest of result processing ...
}
Acceptance Criteria:
- Stdin is piped, not null
- Secrets and parameters written to stdin as JSON
- Stdin closed after writing
- No secrets in environment variables
- Tests verify stdin content
Estimated Time: 3 hours
Task P5.1.5: Update Python Wrapper Script Generator
File: crates/worker/src/runtime/python.rs
fn generate_wrapper_script(&self, context: &ExecutionContext) -> RuntimeResult<String> {
    // NEW: Don't embed parameters in the script; read them from stdin
    let wrapper = format!(
        r#"#!/usr/bin/env python3
import sys
import json
import traceback

def main():
    try:
        # Read input data from stdin
        input_data = json.load(sys.stdin)
        secrets = input_data.get('secrets', {{}})
        parameters = input_data.get('parameters', {{}})

        # Make secrets available as a dict (not env vars)
        # Actions access via: secrets['api_key']

        # Execute the action code
        action_code = '''{}'''

        # Create namespace with secrets and parameters
        namespace = {{
            '__name__': '__main__',
            'secrets': secrets,
            'parameters': parameters,
        }}
        exec(action_code, namespace)

        # Look for entry point function
        if '{}' in namespace:
            result = namespace['{}'](**parameters)
        elif 'run' in namespace:
            result = namespace['run'](parameters=parameters, secrets=secrets)
        elif 'main' in namespace:
            result = namespace['main'](parameters=parameters, secrets=secrets)
        else:
            result = {{'status': 'no_entry_point'}}

        # Output result as JSON
        if result is not None:
            print(json.dumps({{'result': result, 'status': 'success'}}))
        else:
            print(json.dumps({{'status': 'success'}}))
        sys.exit(0)
    except Exception as e:
        error_info = {{
            'status': 'error',
            'error': str(e),
            'error_type': type(e).__name__,
            'traceback': traceback.format_exc()
        }}
        print(json.dumps(error_info), file=sys.stderr)
        sys.exit(1)

if __name__ == '__main__':
    main()
"#,
        context.code.as_deref().unwrap_or(""),
        context.entry_point,
        context.entry_point
    );
    Ok(wrapper)
}
Acceptance Criteria:
- Wrapper reads from stdin, not embedded JSON
- Secrets available as dict in action code
- Parameters passed to entry point function
- Error handling maintains security
- Tests verify wrapper works correctly
Estimated Time: 2 hours
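From a pack author's perspective, the contract above means an action never reads os.environ for secrets; it receives them as a plain dict. A minimal action compatible with the wrapper's `run(parameters=..., secrets=...)` fallback might look like this (the action logic and field names are purely illustrative):

```python
import io
import json

def run(parameters, secrets):
    # Secrets arrive as a plain dict, never via environment variables.
    token = secrets["api_key"]
    return {"target": parameters["host"], "auth": bool(token)}

# Simulate what the wrapper does: read the stdin JSON, dispatch to run()
fake_stdin = io.StringIO(json.dumps({
    "secrets": {"api_key": "s3cr3t"},
    "parameters": {"host": "db01"},
}))
data = json.load(fake_stdin)
result = run(parameters=data["parameters"], secrets=data["secrets"])
```

The simulation is the same code path the wrapper takes, so a pack can be smoke-tested without a worker.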
Task P5.1.6: Implement Stdin Injection in Shell Runtime
File: crates/worker/src/runtime/shell.rs
fn generate_wrapper_script(&self, context: &ExecutionContext) -> RuntimeResult<String> {
let mut script = String::new();
script.push_str("#!/bin/bash\n");
script.push_str("set -e\n\n");
// Read secrets and parameters from stdin
script.push_str("# Read input from stdin\n");
script.push_str("INPUT_JSON=$(cat)\n\n");
// Parse JSON and extract secrets
script.push_str("# Extract secrets (requires jq)\n");
script.push_str("if command -v jq &> /dev/null; then\n");
script.push_str(" # Export each secret as SECRET_NAME\n");
script.push_str(" for key in $(echo \"$INPUT_JSON\" | jq -r '.secrets | keys[]'); do\n");
script.push_str(" value=$(echo \"$INPUT_JSON\" | jq -r \".secrets[\\\"$key\\\"]\")\n");
script.push_str(" export \"SECRET_${key}\"=\"$value\"\n");
script.push_str(" done\n");
script.push_str(" \n");
script.push_str(" # Export each parameter as PARAM_NAME\n");
script.push_str(" for key in $(echo \"$INPUT_JSON\" | jq -r '.parameters | keys[]'); do\n");
script.push_str(" value=$(echo \"$INPUT_JSON\" | jq -r \".parameters[\\\"$key\\\"]\")\n");
script.push_str(" export \"PARAM_${key}\"=\"$value\"\n");
script.push_str(" done\n");
script.push_str("else\n");
script.push_str(" echo 'ERROR: jq is required for shell actions' >&2\n");
script.push_str(" exit 1\n");
script.push_str("fi\n\n");
// Add the action code
script.push_str("# Action code\n");
if let Some(code) = &context.code {
script.push_str(code);
}
Ok(script)
}
async fn execute_shell_code(
&self,
script: String,
context: &ExecutionContext,
) -> RuntimeResult<ExecutionResult> {
// ... similar stdin injection as Python runtime ...
}
Acceptance Criteria:
- Shell wrapper reads from stdin
- Requires jq for JSON parsing
- Secrets exported as SECRET_ variables (injected at runtime from stdin, so they do not appear in the worker's process environment; note that child processes spawned by the script still inherit them)
- Parameters exported as PARAM_ variables
- Tests verify functionality
Estimated Time: 3 hours
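The jq loops above amount to a flattening of the stdin payload into prefixed variables. This Python sketch shows the transformation the shell wrapper performs (the SECRET_/PARAM_ prefixes come from the plan; the helper itself is hypothetical):

```python
import json

def to_shell_vars(stdin_json):
    """Flatten the stdin payload into SECRET_*/PARAM_* shell variables."""
    data = json.loads(stdin_json)
    out = {}
    for key, value in data.get("secrets", {}).items():
        out[f"SECRET_{key}"] = str(value)
    for key, value in data.get("parameters", {}).items():
        out[f"PARAM_{key}"] = str(value)
    return out

shell_vars = to_shell_vars('{"secrets": {"token": "t0k"}, "parameters": {"count": 3}}')
```

As the jq version does with `-r`, non-string values are stringified, since shell variables are untyped.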
Task P5.1.7: Security Testing
New File: crates/worker/tests/security_test.rs
#[tokio::test]
async fn test_secrets_not_in_process_env() {
// Create execution with secrets
// Spawn process
// Read /proc/{pid}/environ
// Assert secrets not present
}
#[tokio::test]
async fn test_secrets_not_visible_in_ps() {
// Similar test using ps command output
}
#[tokio::test]
async fn test_action_can_read_secrets_from_stdin() {
// Action successfully accesses secrets
}
Acceptance Criteria:
- Test verifies secrets not in /proc/pid/environ
- Test verifies secrets not in ps output
- Test verifies action can read secrets
- All security tests pass
Estimated Time: 4 hours
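The first two security tests can be prototyped outside the Rust suite. This sketch spawns a child process, feeds it a secret via stdin only, and has the child confirm the secret is absent from its own environment; the variable names and payload shape are illustrative.

```python
import json
import subprocess
import sys

secret = "hunter2-do-not-leak"
child_code = (
    "import json, os, sys\n"
    "data = json.load(sys.stdin)\n"
    "value = data['secrets']['api_key']\n"
    "leaked = any(value in v for v in os.environ.values())\n"
    "print(json.dumps({'leaked': leaked, 'got': value}))\n"
)
proc = subprocess.run(
    [sys.executable, "-c", child_code],
    input=json.dumps({"secrets": {"api_key": secret}}),
    capture_output=True,
    text=True,
)
report = json.loads(proc.stdout)
```

The Rust tests additionally inspect /proc/{pid}/environ and ps output, which this in-process check cannot cover.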
Task P5.1.8: Documentation
New File: docs/secure-secret-handling.md
Content should include:
- Architecture decision: why stdin vs env vars
- How to access secrets in Python actions
- How to access secrets in Shell actions
- Security best practices for pack developers
- Migration guide from old env var approach
Estimated Time: 2 hours
P5 Subtotal: 3-5 days (18 hours estimated)
PHASE 1 TOTAL: 7-11 days (44 hours estimated for P7 + P5)
PHASE 2: DEPENDENCY ISOLATION
Priority: P1 (HIGH)
Estimated Time: 7-10 days
Depends On: None (can run in parallel with P5)
P4.1: Implement Per-Pack Virtual Environments
Task P4.1.1: Add Pack Storage Directory Structure
New File: crates/common/src/pack_storage.rs
/// Manages pack storage on filesystem
pub struct PackStorage {
base_dir: PathBuf, // /var/lib/attune/packs
}
impl PackStorage {
pub fn get_pack_dir(&self, pack_ref: &str) -> PathBuf {
self.base_dir.join(pack_ref)
}
pub fn get_pack_venv_dir(&self, pack_ref: &str) -> PathBuf {
self.get_pack_dir(pack_ref).join(".venv")
}
pub fn get_pack_code_dir(&self, pack_ref: &str) -> PathBuf {
self.get_pack_dir(pack_ref).join("actions")
}
pub async fn initialize_pack_dir(&self, pack_ref: &str) -> Result<()> {
// Create directory structure
}
}
Estimated Time: 2 hours
Task P4.1.2: Implement Python Venv Manager
New File: crates/worker/src/runtime/venv.rs
pub struct VenvManager {
python_path: PathBuf,
venv_base: PathBuf,
}
impl VenvManager {
pub async fn create_venv(&self, pack_ref: &str) -> Result<PathBuf> {
let venv_path = self.venv_base.join(pack_ref).join(".venv");
if venv_path.exists() {
return Ok(venv_path);
}
// Create venv using: python3 -m venv {path}
let output = Command::new(&self.python_path)
.arg("-m")
.arg("venv")
.arg(&venv_path)
.output()
.await?;
if !output.status.success() {
return Err(Error::Internal(format!(
"Failed to create venv: {}",
String::from_utf8_lossy(&output.stderr)
)));
}
Ok(venv_path)
}
pub fn get_venv_python(&self, pack_ref: &str) -> PathBuf {
self.venv_base
.join(pack_ref)
.join(".venv/bin/python")
}
pub async fn install_requirements(
&self,
pack_ref: &str,
requirements: &[String],
) -> Result<()> {
let python = self.get_venv_python(pack_ref);
// Write requirements.txt
let req_file = self.venv_base
.join(pack_ref)
.join("requirements.txt");
tokio::fs::write(&req_file, requirements.join("\n")).await?;
// Install: python -m pip install -r requirements.txt
let output = Command::new(&python)
.arg("-m")
.arg("pip")
.arg("install")
.arg("-r")
.arg(&req_file)
.output()
.await?;
if !output.status.success() {
return Err(Error::Internal(format!(
"Failed to install requirements: {}",
String::from_utf8_lossy(&output.stderr)
)));
}
Ok(())
}
}
Estimated Time: 4 hours
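The venv layout VenvManager expects can be exercised directly with the standard library. This sketch uses venv.EnvBuilder (without pip, for speed) and assumes a POSIX `bin/` layout unless on Windows; the pack name is made up.

```python
import sys
import tempfile
import venv
from pathlib import Path

with tempfile.TemporaryDirectory() as tmp:
    venv_path = Path(tmp) / "demo_pack" / ".venv"
    # Equivalent of `python3 -m venv {path}`; with_pip=False keeps it fast.
    venv.EnvBuilder(with_pip=False).create(venv_path)
    bin_dir = "Scripts" if sys.platform == "win32" else "bin"
    python_name = "python.exe" if sys.platform == "win32" else "python"
    interpreter_exists = (venv_path / bin_dir / python_name).exists()
```

Note the Rust get_venv_python hard-codes `.venv/bin/python`; if Windows workers are ever in scope, the layout difference above has to be handled there too.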
Task P4.1.3: Update PythonRuntime to Use Venv
File: crates/worker/src/runtime/python.rs
pub struct PythonRuntime {
system_python: PathBuf, // For venv creation
venv_manager: VenvManager,
work_dir: PathBuf,
}
impl PythonRuntime {
async fn get_python_for_action(
&self,
action: &Action,
pack_ref: &str,
) -> RuntimeResult<PathBuf> {
// Check if venv exists for this pack
let venv_python = self.venv_manager.get_venv_python(pack_ref);
if venv_python.exists() {
return Ok(venv_python);
}
// Create venv if it doesn't exist
warn!("No venv found for pack {}, creating...", pack_ref);
let venv_path = self.venv_manager.create_venv(pack_ref).await
.map_err(|e| RuntimeError::SetupError(e.to_string()))?;
Ok(venv_path.join("bin/python"))
}
}
Estimated Time: 3 hours
Task P4.1.4: Add Pack Installation Endpoint
File: crates/api/src/routes/packs.rs
/// POST /api/v1/packs/:ref/install
async fn install_pack(
State(state): State<Arc<AppState>>,
Extension(user): Extension<Identity>,
Path(pack_ref): Path<String>,
) -> Result<Json<ApiResponse<InstallationStatus>>> {
// Trigger pack installation
// - Create venv
// - Install dependencies from pack.runtime_deps
// - Update pack status
}
/// GET /api/v1/packs/:ref/installation-status
async fn get_installation_status(
State(state): State<Arc<AppState>>,
Path(pack_ref): Path<String>,
) -> Result<Json<ApiResponse<InstallationStatus>>> {
// Return installation status
}
Estimated Time: 4 hours
Task P4.1.5: Database Schema Updates
New Migration: migrations/20240103000001_add_pack_installation_status.sql
-- Add installation tracking to pack table
ALTER TABLE attune.pack
ADD COLUMN installation_status TEXT DEFAULT 'not_installed',
ADD COLUMN installed_at TIMESTAMPTZ,
ADD COLUMN installation_log TEXT;
-- Create index
CREATE INDEX idx_pack_installation_status
ON attune.pack(installation_status);
Estimated Time: 1 hour
Task P4.1.6: Background Worker for Pack Installation
New File: crates/worker/src/pack_installer.rs
pub struct PackInstaller {
pool: PgPool,
venv_manager: VenvManager,
pack_storage: PackStorage,
}
impl PackInstaller {
pub async fn install_pack(&self, pack_id: i64) -> Result<()> {
// Load pack from DB
// Parse runtime_deps
// Create venv
// Install dependencies
// Update installation status
// Log installation output
}
pub async fn uninstall_pack(&self, pack_id: i64) -> Result<()> {
// Remove venv
// Remove pack files
// Update status
}
}
Estimated Time: 6 hours
P4 Subtotal: 7-10 days (20 hours estimated)
PHASE 3: LANGUAGE ECOSYSTEM SUPPORT
Priority: P2 (MEDIUM)
Estimated Time: 5-7 days
Depends On: P4 (needs pack storage structure)
P3.1: Implement Pack Installation Service
Task P3.1.1: Pack Metadata Schema
Update: crates/common/src/models.rs
/// Pack dependency specification
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PackDependencies {
pub python: Option<PythonDependencies>,
pub nodejs: Option<NodeJsDependencies>,
pub system: Option<Vec<String>>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PythonDependencies {
pub version: Option<String>, // ">=3.9,<4.0"
pub packages: Vec<String>, // ["requests>=2.28.0", "boto3"]
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeJsDependencies {
pub version: Option<String>,
pub packages: Vec<String>,
}
Estimated Time: 2 hours
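Serialized with serde's defaults, the PackDependencies struct above would round-trip JSON like the following; the sample content is invented for illustration, and absent ecosystems appear as null.

```python
import json

pack_deps = {
    "python": {"version": ">=3.9,<4.0", "packages": ["requests>=2.28.0", "boto3"]},
    "nodejs": None,
    "system": ["jq"],
}
roundtrip = json.loads(json.dumps(pack_deps))
```

Keeping each ecosystem optional lets a pure-Python pack omit the nodejs section entirely.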
Task P3.1.2: Node.js Runtime with npm Support
New File: crates/worker/src/runtime/nodejs.rs
pub struct NodeJsRuntime {
node_path: PathBuf,
npm_path: PathBuf,
work_dir: PathBuf,
}
impl NodeJsRuntime {
async fn install_npm_packages(
&self,
pack_ref: &str,
packages: &[String],
) -> RuntimeResult<()> {
// Create package.json
// Run npm install
}
async fn execute_nodejs_code(
&self,
script: String,
context: &ExecutionContext,
) -> RuntimeResult<ExecutionResult> {
// Similar to Python runtime
// Pass secrets via stdin
}
}
Estimated Time: 8 hours
Task P3.1.3: Runtime Detection Enhancement
File: crates/worker/src/runtime/mod.rs
/// Runtime registry with smarter detection
impl RuntimeRegistry {
pub fn get_runtime_for_action(
&self,
action: &Action,
runtime_info: &Runtime,
) -> RuntimeResult<&dyn Runtime> {
// Use action.runtime field to select runtime
// Fall back to file extension detection
// Validate runtime is installed and ready
}
}
Estimated Time: 3 hours
Task P3.1.4: Pack Upload and Extraction
New File: crates/api/src/routes/pack_upload.rs
/// POST /api/v1/packs/upload
/// Accepts .zip or .tar.gz with pack files
async fn upload_pack(
multipart: Multipart,
) -> Result<Json<ApiResponse<Pack>>> {
// Extract archive
// Parse pack.yaml
// Store pack metadata in DB
// Store pack files in pack storage
// Trigger installation
}
Estimated Time: 6 hours
P3 Subtotal: 5-7 days (19 hours estimated)
PHASE 4: LOG SIZE LIMITS
Priority: P1 (HIGH)
Estimated Time: 3-4 days
Depends On: None (can run in parallel)
P6.1: Implement Streaming Log Collection
Task P6.1.1: Add Configuration for Log Limits
File: crates/common/src/config.rs
#[derive(Debug, Clone, Deserialize)]
pub struct WorkerConfig {
// ... existing fields ...
#[serde(default)]
pub log_limits: LogLimits,
}
#[derive(Debug, Clone, Deserialize)]
pub struct LogLimits {
#[serde(default = "default_max_stdout")]
pub max_stdout_size: usize, // 10MB
#[serde(default = "default_max_stderr")]
pub max_stderr_size: usize, // 10MB
#[serde(default = "default_truncate")]
pub truncate_on_exceed: bool, // true
}
fn default_max_stdout() -> usize { 10 * 1024 * 1024 }
fn default_max_stderr() -> usize { 10 * 1024 * 1024 }
fn default_truncate() -> bool { true }
Estimated Time: 1 hour
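With the serde defaults above, operators could override limits in the worker config file. A hypothetical TOML fragment (the table names depend on how `WorkerConfig` is nested in the top-level config, which is an assumption here):

```toml
# Hypothetical worker config fragment
[worker.log_limits]
max_stdout_size = 10485760    # 10 MiB
max_stderr_size = 10485760    # 10 MiB
truncate_on_exceed = true
```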
Task P6.1.2: Implement Streaming Log Writer
New File: crates/worker/src/log_writer.rs
use tokio::fs::File;
use tokio::io::AsyncWriteExt;
pub struct BoundedLogWriter {
    file: File,
    current_size: usize,
    max_size: usize,
    truncated: bool,
}
impl BoundedLogWriter {
    pub fn new(file: File, max_size: usize) -> Self {
        Self { file, current_size: 0, max_size, truncated: false }
    }
pub async fn write_line(&mut self, line: &str) -> Result<()> {
if self.truncated {
return Ok(());
}
let line_size = line.len() + 1; // +1 for newline
if self.current_size + line_size > self.max_size {
// Write truncation notice
let notice = format!(
"\n[TRUNCATED: Log exceeded {} MB limit]\n",
self.max_size / 1024 / 1024
);
self.file.write_all(notice.as_bytes()).await?;
self.truncated = true;
return Ok(());
}
self.file.write_all(line.as_bytes()).await?;
self.file.write_all(b"\n").await?;
self.current_size += line_size;
Ok(())
}
}
Estimated Time: 2 hours
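The size-bounding logic above can be exercised without tokio by a synchronous in-memory variant, which is how the unit tests could verify truncation behavior (`BoundedBufWriter` is a test-only name, not part of the planned module):

```rust
use std::io::Write;

/// Synchronous, in-memory sketch of the same size-bounding logic.
struct BoundedBufWriter {
    buf: Vec<u8>,
    current_size: usize,
    max_size: usize,
    truncated: bool,
}

impl BoundedBufWriter {
    fn new(max_size: usize) -> Self {
        Self { buf: Vec::new(), current_size: 0, max_size, truncated: false }
    }

    fn write_line(&mut self, line: &str) -> std::io::Result<()> {
        if self.truncated {
            return Ok(()); // drop everything after the truncation notice
        }
        let line_size = line.len() + 1; // +1 for newline
        if self.current_size + line_size > self.max_size {
            writeln!(self.buf, "[TRUNCATED: log exceeded {} byte limit]", self.max_size)?;
            self.truncated = true;
            return Ok(());
        }
        writeln!(self.buf, "{line}")?;
        self.current_size += line_size;
        Ok(())
    }
}

fn main() -> std::io::Result<()> {
    let mut w = BoundedBufWriter::new(16);
    w.write_line("hello")?;    // 6 bytes, fits
    w.write_line("world")?;    // 12 bytes total, fits
    w.write_line("overflow")?; // would reach 21 bytes, triggers truncation
    assert!(w.truncated);
    assert_eq!(w.current_size, 12); // notice is not counted toward the cap
    Ok(())
}
```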
Task P6.1.3: Update Python Runtime with Streaming
File: crates/worker/src/runtime/python.rs
async fn execute_with_streaming(
&self,
mut cmd: Command,
execution_id: i64,
log_limits: &LogLimits,
) -> RuntimeResult<ExecutionResult> {
let mut child = cmd.spawn()?;
    // Stream stdout and stderr concurrently so neither pipe backs up
    let stdout = child.stdout.take().expect("stdout was piped");
    let stderr = child.stderr.take().expect("stderr was piped");
    let stdout_handle = tokio::spawn(stream_to_file(
        stdout, execution_id, "stdout", log_limits.max_stdout_size,
    ));
    let stderr_handle = tokio::spawn(stream_to_file(
        stderr, execution_id, "stderr", log_limits.max_stderr_size,
    ));
    // Wait for the process to complete
    let status = child.wait().await?;
    // Then wait for both log streams to drain
    let stdout_result = stdout_handle.await??;
    let stderr_result = stderr_handle.await??;
    // Build ExecutionResult from status, stdout_result, stderr_result
}
async fn stream_to_file(
    stream: impl AsyncRead + Unpin,
    execution_id: i64,
    stream_name: &str,
    max_size: usize,
) -> Result<StreamResult> {
let log_path = format!("/var/lib/attune/artifacts/execution_{}/{}.log",
execution_id, stream_name);
let file = File::create(&log_path).await?;
let mut writer = BoundedLogWriter::new(file, max_size);
let reader = BufReader::new(stream);
let mut lines = reader.lines();
while let Some(line) = lines.next_line().await? {
writer.write_line(&line).await?;
}
Ok(StreamResult {
total_size: writer.current_size,
truncated: writer.truncated,
})
}
Estimated Time: 6 hours
Task P6.1.4: Update Artifacts Manager
File: crates/worker/src/artifacts.rs
// Add method to read logs with pagination
pub async fn read_log_page(
&self,
execution_id: i64,
stream: &str, // "stdout" or "stderr"
offset: usize,
limit: usize,
) -> Result<LogPage> {
let log_path = self.get_execution_dir(execution_id)
.join(format!("{}.log", stream));
// Read file with offset/limit
// Return LogPage with content and metadata
}
pub struct LogPage {
pub content: String,
pub offset: usize,
pub total_size: usize,
pub truncated: bool,
}
Estimated Time: 2 hours
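A minimal offset/limit read over a log file needs only `std::fs` and a seek; the real method would resolve paths through the artifacts manager and return the richer `LogPage` struct (the free function here is a simplified sketch):

```rust
use std::fs::{self, File};
use std::io::{self, Read, Seek, SeekFrom};

/// Read up to `limit` bytes of a log file starting at `offset`.
/// Returns the chunk plus the file's total size so callers can page.
fn read_log_page(path: &str, offset: u64, limit: u64) -> io::Result<(String, u64)> {
    let total_size = fs::metadata(path)?.len();
    let mut file = File::open(path)?;
    file.seek(SeekFrom::Start(offset.min(total_size)))?;
    let mut chunk = Vec::new();
    file.take(limit).read_to_end(&mut chunk)?;
    // A page boundary may split a UTF-8 sequence, so decode lossily
    Ok((String::from_utf8_lossy(&chunk).into_owned(), total_size))
}

fn main() -> io::Result<()> {
    let path = std::env::temp_dir().join("page_demo.log");
    fs::write(&path, "line one\nline two\nline three\n")?;
    // "line one\n" is 9 bytes, so offset 9 / limit 9 yields the second line
    let (page, total) = read_log_page(path.to_str().unwrap(), 9, 9)?;
    assert_eq!(page, "line two\n");
    assert_eq!(total, 29);
    fs::remove_file(&path)?;
    Ok(())
}
```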
Task P6.1.5: Add Log Retrieval API Endpoint
File: crates/api/src/routes/executions.rs
/// GET /api/v1/executions/:id/logs/:stream?offset=0&limit=1000
async fn get_execution_logs(
State(state): State<Arc<AppState>>,
Path((exec_id, stream)): Path<(i64, String)>,
Query(params): Query<LogQueryParams>,
) -> Result<Json<ApiResponse<LogPage>>> {
// Read logs from artifact storage
// Return paginated results
}
Estimated Time: 3 hours
P6 Subtotal: 3-4 days (14 hours estimated)
Testing Strategy
Unit Tests
- Secret injection methods (stdin, temp file)
- Venv creation and management
- Dependency installation
- Log size limiting
- Log streaming
Integration Tests
- End-to-end action execution with secrets
- Pack installation workflow
- Multiple packs with different Python versions
- Large log output handling
- Concurrent executions
Security Tests
- Secrets not visible in process table
- Secrets not in /proc/pid/environ
- Secrets not in command line args
- Actions can successfully access secrets
- Venv isolation between packs
Performance Tests
- Log streaming with high throughput
- Many concurrent executions
- Large pack installations
- Memory usage with large logs
Rollout Plan
Phase 1: Development (Weeks 1-3)
- Implement all features in feature branches
- Unit testing for each component
- Code review for security-critical changes
Phase 2: Integration Testing (Week 4)
- Merge all feature branches
- Run full integration test suite
- Security audit of secret handling
- Performance testing
Phase 3: Beta Testing (Week 5)
- Deploy to staging environment
- Internal testing with real packs
- Gather feedback
- Fix critical issues
Phase 4: Production Release (Week 6)
- Final security review
- Documentation complete
- Deploy to production
- Monitor for issues
Success Criteria
Must Have (v1.0)
- ✅ Secrets not visible in process table
- ✅ Per-pack Python virtual environments
- ✅ Pack installation with dependency management
- ✅ Log size limits enforced
- ✅ All security tests passing
- ✅ Documentation complete
Nice to Have (v1.1)
- Multiple Python version support
- Node.js runtime fully implemented
- Log streaming API
- Container-based runtimes
- Pack marketplace
Risk Mitigation
Risk: Breaking Existing Packs
Mitigation:
- Maintain backward compatibility mode
- Provide migration guide
- Gradual rollout with opt-in
Risk: Performance Degradation
Mitigation:
- Performance benchmarks before/after
- Optimize hot paths
- Add caching where appropriate
Risk: Security Vulnerabilities
Mitigation:
- External security audit
- Penetration testing
- Bug bounty program
Risk: Complex Dependency Resolution
Mitigation:
- Start with simple requirements.txt
- Document dependency conflicts
- Provide troubleshooting guide
Resources Needed
Development
- 1-2 senior Rust engineers (3-5 weeks)
- Security consultant (1 week review)
- Technical writer (documentation)
Infrastructure
- Staging environment for testing
- CI/CD pipeline updates
- Log storage (S3 or similar)
Tools
- Security scanning tools
- Performance profiling tools
- Load testing framework
Next Steps
- Review and Approve Plan - Team meeting to review
- Create GitHub Issues - One per major task
- Assign Owners - Who owns each phase?
- Set Milestones - Weekly checkpoints
- Begin Phase 1 - Security fixes first!
Status: Ready for Review
Last Updated: 2024-01-02
Next Review: After team approval