From 5b45b17fa610e69e505b7411acd8daacd5253680 Mon Sep 17 00:00:00 2001 From: David Culbreth Date: Tue, 10 Mar 2026 09:30:57 -0500 Subject: [PATCH] [wip] single runtime handling --- config.development.yaml | 1 + config.test.yaml | 1 + crates/api/src/authz.rs | 149 +++++ crates/api/src/dto/mod.rs | 6 + crates/api/src/dto/permission.rs | 65 +++ crates/api/src/lib.rs | 1 + crates/api/src/openapi.rs | 27 + crates/api/src/routes/actions.rs | 70 ++- crates/api/src/routes/auth.rs | 8 +- crates/api/src/routes/executions.rs | 112 +++- crates/api/src/routes/keys.rs | 135 ++++- crates/api/src/routes/mod.rs | 2 + crates/api/src/routes/packs.rs | 135 ++++- crates/api/src/routes/permissions.rs | 507 ++++++++++++++++++ crates/api/src/routes/rules.rs | 70 ++- crates/api/src/server.rs | 1 + crates/api/tests/helpers.rs | 45 ++ crates/api/tests/permissions_api_tests.rs | 178 ++++++ crates/common/src/config.rs | 6 + crates/common/src/lib.rs | 1 + crates/common/src/models.rs | 31 ++ crates/common/src/mq/messages.rs | 5 +- crates/common/src/pack_registry/loader.rs | 217 +++++++- crates/common/src/rbac.rs | 292 ++++++++++ crates/common/src/repositories/identity.rs | 66 ++- crates/executor/src/execution_manager.rs | 218 +++++++- crates/worker/src/executor.rs | 60 ++- crates/worker/src/runtime/local.rs | 40 +- crates/worker/src/runtime/mod.rs | 6 +- crates/worker/src/runtime/process.rs | 199 ++++++- crates/worker/src/runtime/process_executor.rs | 39 +- crates/worker/src/service.rs | 56 +- .../worker/tests/dependency_isolation_test.rs | 5 +- crates/worker/tests/log_truncation_test.rs | 6 +- crates/worker/tests/security_tests.rs | 7 +- docker-compose.yaml | 2 + docker/init-packs.sh | 59 ++ packs/core/permission_sets/admin.yaml | 36 ++ packs/core/permission_sets/editor.yaml | 24 + packs/core/permission_sets/executor.yaml | 20 + packs/core/permission_sets/viewer.yaml | 20 + packs/core/runtimes/shell.yaml | 4 + scripts/load_core_pack.py | 83 ++- 43 files changed, 2905 insertions(+), 110 deletions(-) create mode 100644 crates/api/src/authz.rs create mode 100644 crates/api/src/dto/permission.rs create mode 100644 crates/api/src/routes/permissions.rs create mode 100644 crates/api/tests/permissions_api_tests.rs create mode 100644 crates/common/src/rbac.rs create mode 100644 packs/core/permission_sets/admin.yaml create mode 100644 packs/core/permission_sets/editor.yaml create mode 100644 packs/core/permission_sets/executor.yaml create mode 100644 packs/core/permission_sets/viewer.yaml diff --git a/config.development.yaml b/config.development.yaml index ff4a8fa..f9972ca 100644 --- a/config.development.yaml +++ b/config.development.yaml @@ -46,6 +46,7 @@ security: jwt_refresh_expiration: 2592000 # 30 days encryption_key: test-encryption-key-32-chars-okay enable_auth: true + allow_self_registration: true # Packs directory (where pack action files are located) packs_base_dir: ./packs diff --git a/config.test.yaml b/config.test.yaml index eadeabe..5477e2f 100644 --- a/config.test.yaml +++ b/config.test.yaml @@ -48,6 +48,7 @@ security: jwt_refresh_expiration: 3600 # 1 hour encryption_key: test-encryption-key-32-chars-okay enable_auth: true + allow_self_registration: true # Test packs directory (use /tmp for tests to avoid permission issues) packs_base_dir: /tmp/attune-test-packs diff --git a/crates/api/src/authz.rs b/crates/api/src/authz.rs new file mode 100644 index 0000000..8eac627 --- /dev/null +++ b/crates/api/src/authz.rs @@ -0,0 +1,149 @@ +//! RBAC authorization service for API handlers. +//! +//! This module evaluates grants assigned to user identities via +//! `permission_set` and `permission_assignment`. + +use crate::{ + auth::{jwt::TokenType, middleware::AuthenticatedUser}, + middleware::ApiError, +}; +use attune_common::{ + rbac::{Action, AuthorizationContext, Grant, Resource}, + repositories::{ + identity::{IdentityRepository, PermissionSetRepository}, + FindById, + }, +}; +use sqlx::PgPool; + +#[derive(Debug, Clone)] +pub struct AuthorizationCheck { + pub resource: Resource, + pub action: Action, + pub context: AuthorizationContext, +} + +#[derive(Clone)] +pub struct AuthorizationService { + db: PgPool, +} + +impl AuthorizationService { + pub fn new(db: PgPool) -> Self { + Self { db } + } + + pub async fn authorize( + &self, + user: &AuthenticatedUser, + mut check: AuthorizationCheck, + ) -> Result<(), ApiError> { + // Non-access tokens are governed by dedicated scope checks in route logic. + // They are not evaluated through identity RBAC grants. + if user.claims.token_type != TokenType::Access { + return Ok(()); + } + + let identity_id = user.identity_id().map_err(|_| { + ApiError::Unauthorized("Invalid authentication subject in access token".to_string()) + })?; + + // Ensure identity exists and load identity attributes used by attribute constraints. + let identity = IdentityRepository::find_by_id(&self.db, identity_id) + .await? + .ok_or_else(|| ApiError::Unauthorized("Identity not found".to_string()))?; + + check.context.identity_id = identity_id; + check.context.identity_attributes = match identity.attributes { + serde_json::Value::Object(map) => map.into_iter().collect(), + _ => Default::default(), + }; + + let grants = self.load_effective_grants(identity_id).await?; + + let allowed = Self::is_allowed(&grants, check.resource, check.action, &check.context); + + if !allowed { + return Err(ApiError::Forbidden(format!( + "Insufficient permissions: {}:{}", + resource_name(check.resource), + action_name(check.action) + ))); + } + + Ok(()) + } + + pub async fn effective_grants(&self, user: &AuthenticatedUser) -> Result, ApiError> { + if user.claims.token_type != TokenType::Access { + return Ok(Vec::new()); + } + + let identity_id = user.identity_id().map_err(|_| { + ApiError::Unauthorized("Invalid authentication subject in access token".to_string()) + })?; + self.load_effective_grants(identity_id).await + } + + pub fn is_allowed( + grants: &[Grant], + resource: Resource, + action: Action, + context: &AuthorizationContext, + ) -> bool { + grants.iter().any(|g| g.allows(resource, action, context)) + } + + async fn load_effective_grants(&self, identity_id: i64) -> Result, ApiError> { + let permission_sets = + PermissionSetRepository::find_by_identity(&self.db, identity_id).await?; + + let mut grants = Vec::new(); + for permission_set in permission_sets { + let set_grants: Vec = + serde_json::from_value(permission_set.grants).map_err(|e| { + ApiError::InternalServerError(format!( + "Invalid grant schema in permission set '{}': {}", + permission_set.r#ref, e + )) + })?; + grants.extend(set_grants); + } + + Ok(grants) + } +} + +fn resource_name(resource: Resource) -> &'static str { + match resource { + Resource::Packs => "packs", + Resource::Actions => "actions", + Resource::Rules => "rules", + Resource::Triggers => "triggers", + Resource::Executions => "executions", + Resource::Events => "events", + Resource::Enforcements => "enforcements", + Resource::Inquiries => "inquiries", + Resource::Keys => "keys", + Resource::Artifacts => "artifacts", + Resource::Workflows => "workflows", + Resource::Webhooks => "webhooks", + Resource::Analytics => "analytics", + Resource::History => "history", + Resource::Identities => "identities", + Resource::Permissions => "permissions", + } +} + +fn action_name(action: Action) -> &'static str { + match action { + Action::Read => "read", + Action::Create => "create", + Action::Update => "update", + Action::Delete => "delete", + Action::Execute => "execute", + Action::Cancel => "cancel", + Action::Respond => "respond", + Action::Manage => "manage", + } +} diff --git a/crates/api/src/dto/mod.rs b/crates/api/src/dto/mod.rs index 2078ae1..3d19b5f 100644 --- a/crates/api/src/dto/mod.rs +++ b/crates/api/src/dto/mod.rs @@ -11,6 +11,7 @@ pub mod history; pub mod inquiry; pub mod key; pub mod pack; +pub mod permission; pub mod rule; pub mod trigger; pub mod webhook; @@ -48,6 +49,11 @@ pub use inquiry::{ }; pub use key::{CreateKeyRequest, KeyQueryParams, KeyResponse, KeySummary, UpdateKeyRequest}; pub use pack::{CreatePackRequest, PackResponse, PackSummary, UpdatePackRequest}; +pub use permission::{ + CreateIdentityRequest, CreatePermissionAssignmentRequest, IdentityResponse, IdentitySummary, + PermissionAssignmentResponse, PermissionSetQueryParams, PermissionSetSummary, + UpdateIdentityRequest, +}; pub use rule::{CreateRuleRequest, RuleResponse, RuleSummary, UpdateRuleRequest}; pub use trigger::{ CreateSensorRequest, CreateTriggerRequest, SensorResponse, SensorSummary, TriggerResponse, diff --git a/crates/api/src/dto/permission.rs b/crates/api/src/dto/permission.rs new file mode 100644 index 0000000..ce12db5 --- /dev/null +++ b/crates/api/src/dto/permission.rs @@ -0,0 +1,65 @@ +use serde::{Deserialize, Serialize}; +use serde_json::Value as JsonValue; +use utoipa::{IntoParams, ToSchema}; +use validator::Validate; + +#[derive(Debug, Clone, Deserialize, IntoParams)] +pub struct PermissionSetQueryParams { + #[serde(default)] + pub pack_ref: Option, +} + +#[derive(Debug, Clone, Serialize, ToSchema)] +pub struct IdentitySummary { + pub id: i64, + pub login: String, + pub display_name: Option, + pub attributes: JsonValue, +} + +pub type IdentityResponse = IdentitySummary; + +#[derive(Debug, Clone, Serialize, ToSchema)] +pub struct PermissionSetSummary { + pub id: i64, + pub r#ref: String, + pub pack_ref: Option, + pub label: Option, + pub description: Option, + pub grants: JsonValue, +} + +#[derive(Debug, Clone, Serialize, ToSchema)] +pub struct PermissionAssignmentResponse { + pub id: i64, + pub identity_id: i64, + pub permission_set_id: i64, + pub permission_set_ref: String, + pub created: chrono::DateTime, +} + +#[derive(Debug, Clone, Deserialize, ToSchema)] +pub struct CreatePermissionAssignmentRequest { + pub identity_id: Option, + pub identity_login: Option, + pub permission_set_ref: String, +} + +#[derive(Debug, Clone, Deserialize, Validate, ToSchema)] +pub struct CreateIdentityRequest { + #[validate(length(min = 3, max = 255))] + pub login: String, + #[validate(length(max = 255))] + pub display_name: Option, + #[validate(length(min = 8, max = 128))] + pub password: Option, + #[serde(default)] + pub attributes: JsonValue, +} + +#[derive(Debug, Clone, Deserialize, ToSchema)] +pub struct UpdateIdentityRequest { + pub display_name: Option, + pub password: Option, + pub attributes: Option, +} diff --git a/crates/api/src/lib.rs b/crates/api/src/lib.rs index b157549..64b7773 100644 --- a/crates/api/src/lib.rs +++ b/crates/api/src/lib.rs @@ -5,6 +5,7 @@ //! It is primarily used by the binary target and integration tests. pub mod auth; +pub mod authz; pub mod dto; pub mod middleware; pub mod openapi; diff --git a/crates/api/src/openapi.rs b/crates/api/src/openapi.rs index 56cea45..57db570 100644 --- a/crates/api/src/openapi.rs +++ b/crates/api/src/openapi.rs @@ -26,6 +26,10 @@ use crate::dto::{ PackWorkflowSyncResponse, PackWorkflowValidationResponse, RegisterPackRequest, UpdatePackRequest, WorkflowSyncResult, }, + permission::{ + CreateIdentityRequest, CreatePermissionAssignmentRequest, IdentityResponse, + IdentitySummary, PermissionAssignmentResponse, PermissionSetSummary, UpdateIdentityRequest, + }, rule::{CreateRuleRequest, RuleResponse, RuleSummary, UpdateRuleRequest}, trigger::{ CreateSensorRequest, CreateTriggerRequest, SensorResponse, SensorSummary, TriggerResponse, @@ -160,6 +164,17 @@ use crate::dto::{ crate::routes::keys::update_key, crate::routes::keys::delete_key, + // Permissions + crate::routes::permissions::list_identities, + crate::routes::permissions::get_identity, + crate::routes::permissions::create_identity, + crate::routes::permissions::update_identity, + crate::routes::permissions::delete_identity, + crate::routes::permissions::list_permission_sets, + crate::routes::permissions::list_identity_permissions, + crate::routes::permissions::create_permission_assignment, + crate::routes::permissions::delete_permission_assignment, + // Workflows crate::routes::workflows::list_workflows, crate::routes::workflows::list_workflows_by_pack, @@ -190,6 +205,8 @@ use crate::dto::{ ApiResponse, ApiResponse, ApiResponse, + ApiResponse, + ApiResponse, ApiResponse, ApiResponse, PaginatedResponse, @@ -202,6 +219,7 @@ use crate::dto::{ PaginatedResponse, PaginatedResponse, PaginatedResponse, + PaginatedResponse, PaginatedResponse, PaginationMeta, SuccessResponse, @@ -233,6 +251,15 @@ use crate::dto::{ attune_common::models::pack_test::PackTestSummary, PaginatedResponse, + // Permission DTOs + CreateIdentityRequest, + UpdateIdentityRequest, + IdentityResponse, + PermissionSetSummary, + PermissionAssignmentResponse, + CreatePermissionAssignmentRequest, + IdentitySummary, + // Action DTOs CreateActionRequest, UpdateActionRequest, diff --git a/crates/api/src/routes/actions.rs b/crates/api/src/routes/actions.rs index b8c94c1..497242a 100644 --- a/crates/api/src/routes/actions.rs +++ b/crates/api/src/routes/actions.rs @@ -10,6 +10,7 @@ use axum::{ use std::sync::Arc; use validator::Validate; +use attune_common::rbac::{Action, AuthorizationContext, Resource}; use attune_common::repositories::{ action::{ActionRepository, ActionSearchFilters, CreateActionInput, UpdateActionInput}, pack::PackRepository, @@ -19,6 +20,7 @@ use attune_common::repositories::{ use crate::{ auth::middleware::RequireAuth, + authz::{AuthorizationCheck, AuthorizationService}, dto::{ action::{ ActionResponse, ActionSummary, CreateActionRequest, QueueStatsResponse, @@ -153,7 +155,7 @@ pub async fn get_action( )] pub async fn create_action( State(state): State>, - RequireAuth(_user): RequireAuth, + RequireAuth(user): RequireAuth, Json(request): Json, ) -> ApiResult { // Validate request @@ -175,6 +177,26 @@ pub async fn create_action( .await? .ok_or_else(|| ApiError::NotFound(format!("Pack '{}' not found", request.pack_ref)))?; + if user.claims.token_type == crate::auth::jwt::TokenType::Access { + let identity_id = user + .identity_id() + .map_err(|_| ApiError::Unauthorized("Invalid user identity".to_string()))?; + let authz = AuthorizationService::new(state.db.clone()); + let mut ctx = AuthorizationContext::new(identity_id); + ctx.pack_ref = Some(pack.r#ref.clone()); + ctx.target_ref = Some(request.r#ref.clone()); + authz + .authorize( + &user, + AuthorizationCheck { + resource: Resource::Actions, + action: Action::Create, + context: ctx, + }, + ) + .await?; + } + // If runtime is specified, we could verify it exists (future enhancement) // For now, the database foreign key constraint will handle invalid runtime IDs @@ -219,7 +241,7 @@ pub async fn create_action( )] pub async fn update_action( State(state): State>, - RequireAuth(_user): RequireAuth, + RequireAuth(user): RequireAuth, Path(action_ref): Path, Json(request): Json, ) -> ApiResult { @@ -231,6 +253,27 @@ pub async fn update_action( .await? .ok_or_else(|| ApiError::NotFound(format!("Action '{}' not found", action_ref)))?; + if user.claims.token_type == crate::auth::jwt::TokenType::Access { + let identity_id = user + .identity_id() + .map_err(|_| ApiError::Unauthorized("Invalid user identity".to_string()))?; + let authz = AuthorizationService::new(state.db.clone()); + let mut ctx = AuthorizationContext::new(identity_id); + ctx.target_id = Some(existing_action.id); + ctx.target_ref = Some(existing_action.r#ref.clone()); + ctx.pack_ref = Some(existing_action.pack_ref.clone()); + authz + .authorize( + &user, + AuthorizationCheck { + resource: Resource::Actions, + action: Action::Update, + context: ctx, + }, + ) + .await?; + } + // Create update input let update_input = UpdateActionInput { label: request.label, @@ -269,7 +312,7 @@ pub async fn update_action( )] pub async fn delete_action( State(state): State>, - RequireAuth(_user): RequireAuth, + RequireAuth(user): RequireAuth, Path(action_ref): Path, ) -> ApiResult { // Check if action exists @@ -277,6 +320,27 @@ pub async fn delete_action( .await? .ok_or_else(|| ApiError::NotFound(format!("Action '{}' not found", action_ref)))?; + if user.claims.token_type == crate::auth::jwt::TokenType::Access { + let identity_id = user + .identity_id() + .map_err(|_| ApiError::Unauthorized("Invalid user identity".to_string()))?; + let authz = AuthorizationService::new(state.db.clone()); + let mut ctx = AuthorizationContext::new(identity_id); + ctx.target_id = Some(action.id); + ctx.target_ref = Some(action.r#ref.clone()); + ctx.pack_ref = Some(action.pack_ref.clone()); + authz + .authorize( + &user, + AuthorizationCheck { + resource: Resource::Actions, + action: Action::Delete, + context: ctx, + }, + ) + .await?; + } + // Delete the action let deleted = ActionRepository::delete(&state.db, action.id).await?; diff --git a/crates/api/src/routes/auth.rs b/crates/api/src/routes/auth.rs index 94ba402..1076791 100644 --- a/crates/api/src/routes/auth.rs +++ b/crates/api/src/routes/auth.rs @@ -152,6 +152,12 @@ pub async fn register( State(state): State, Json(payload): Json, ) -> Result>, ApiError> { + if !state.config.security.allow_self_registration { + return Err(ApiError::Forbidden( + "Self-service registration is disabled; identities must be provisioned by an administrator or identity provider".to_string(), + )); + } + // Validate request payload .validate() @@ -171,7 +177,7 @@ pub async fn register( // Hash password let password_hash = hash_password(&payload.password)?; - // Create identity with password hash + // Registration creates an identity only; permission assignments are managed separately. let input = CreateIdentityInput { login: payload.login.clone(), display_name: payload.display_name, diff --git a/crates/api/src/routes/executions.rs b/crates/api/src/routes/executions.rs index da95d87..d3b0ca2 100644 --- a/crates/api/src/routes/executions.rs +++ b/crates/api/src/routes/executions.rs @@ -10,6 +10,7 @@ use axum::{ routing::get, Json, Router, }; +use chrono::Utc; use futures::stream::{Stream, StreamExt}; use std::sync::Arc; use tokio_stream::wrappers::BroadcastStream; @@ -32,6 +33,7 @@ use sqlx::Row; use crate::{ auth::middleware::RequireAuth, + authz::{AuthorizationCheck, AuthorizationService}, dto::{ common::{PaginatedResponse, PaginationParams}, execution::{ @@ -42,6 +44,7 @@ use crate::{ middleware::{ApiError, ApiResult}, state::AppState, }; +use attune_common::rbac::{Action, AuthorizationContext, Resource}; /// Create a new execution (manual execution) /// @@ -61,7 +64,7 @@ use crate::{ )] pub async fn create_execution( State(state): State>, - RequireAuth(_user): RequireAuth, + RequireAuth(user): RequireAuth, Json(request): Json, ) -> ApiResult { // Validate that the action exists @@ -69,6 +72,42 @@ pub async fn create_execution( .await? .ok_or_else(|| ApiError::NotFound(format!("Action '{}' not found", request.action_ref)))?; + if user.claims.token_type == crate::auth::jwt::TokenType::Access { + let identity_id = user + .identity_id() + .map_err(|_| ApiError::Unauthorized("Invalid user identity".to_string()))?; + let authz = AuthorizationService::new(state.db.clone()); + + let mut action_ctx = AuthorizationContext::new(identity_id); + action_ctx.target_id = Some(action.id); + action_ctx.target_ref = Some(action.r#ref.clone()); + action_ctx.pack_ref = Some(action.pack_ref.clone()); + + authz + .authorize( + &user, + AuthorizationCheck { + resource: Resource::Actions, + action: Action::Execute, + context: action_ctx, + }, + ) + .await?; + + let mut execution_ctx = AuthorizationContext::new(identity_id); + execution_ctx.pack_ref = Some(action.pack_ref.clone()); + authz + .authorize( + &user, + AuthorizationCheck { + resource: Resource::Executions, + action: Action::Create, + context: execution_ctx, + }, + ) + .await?; + } + // Create execution input let execution_input = CreateExecutionInput { action: Some(action.id), @@ -440,9 +479,17 @@ pub async fn cancel_execution( ..Default::default() }; let updated = ExecutionRepository::update(&state.db, id, update).await?; + let delegated_to_executor = publish_status_change_to_executor( + publisher.as_deref(), + &execution, + ExecutionStatus::Cancelled, + "api-service", + ) + .await; - // Cascade to workflow children if this is a workflow execution - cancel_workflow_children(&state.db, publisher.as_deref(), id).await; + if !delegated_to_executor { + cancel_workflow_children(&state.db, publisher.as_deref(), id).await; + } let response = ApiResponse::new(ExecutionResponse::from(updated)); return Ok((StatusCode::OK, Json(response))); @@ -454,6 +501,13 @@ pub async fn cancel_execution( ..Default::default() }; let updated = ExecutionRepository::update(&state.db, id, update).await?; + let delegated_to_executor = publish_status_change_to_executor( + publisher.as_deref(), + &execution, + ExecutionStatus::Canceling, + "api-service", + ) + .await; // Send cancel request to the worker via MQ if let Some(worker_id) = execution.executor { @@ -465,8 +519,9 @@ pub async fn cancel_execution( ); } - // Cascade to workflow children if this is a workflow execution - cancel_workflow_children(&state.db, publisher.as_deref(), id).await; + if !delegated_to_executor { + cancel_workflow_children(&state.db, publisher.as_deref(), id).await; + } let response = ApiResponse::new(ExecutionResponse::from(updated)); Ok((StatusCode::OK, Json(response))) @@ -504,6 +559,53 @@ async fn send_cancel_to_worker(publisher: Option<&Publisher>, execution_id: i64, } } +async fn publish_status_change_to_executor( + publisher: Option<&Publisher>, + execution: &attune_common::models::Execution, + new_status: ExecutionStatus, + source: &str, +) -> bool { + let Some(publisher) = publisher else { + return false; + }; + + let new_status = match new_status { + ExecutionStatus::Requested => "requested", + ExecutionStatus::Scheduling => "scheduling", + ExecutionStatus::Scheduled => "scheduled", + ExecutionStatus::Running => "running", + ExecutionStatus::Completed => "completed", + ExecutionStatus::Failed => "failed", + ExecutionStatus::Canceling => "canceling", + ExecutionStatus::Cancelled => "cancelled", + ExecutionStatus::Timeout => "timeout", + ExecutionStatus::Abandoned => "abandoned", + }; + + let payload = attune_common::mq::ExecutionStatusChangedPayload { + execution_id: execution.id, + action_ref: execution.action_ref.clone(), + previous_status: format!("{:?}", execution.status).to_lowercase(), + new_status: new_status.to_string(), + changed_at: Utc::now(), + }; + + let envelope = MessageEnvelope::new(MessageType::ExecutionStatusChanged, payload) + .with_source(source) + .with_correlation_id(uuid::Uuid::new_v4()); + + if let Err(e) = publisher.publish_envelope(&envelope).await { + tracing::error!( + "Failed to publish status change for execution {} to executor: {}", + execution.id, + e + ); + return false; + } + + true +} + /// Resolve the [`CancellationPolicy`] for a workflow parent execution. /// /// Looks up the `workflow_execution` → `workflow_definition` chain and diff --git a/crates/api/src/routes/keys.rs b/crates/api/src/routes/keys.rs index e2f70bf..7cf2ec9 100644 --- a/crates/api/src/routes/keys.rs +++ b/crates/api/src/routes/keys.rs @@ -10,7 +10,6 @@ use axum::{ use std::sync::Arc; use validator::Validate; -use attune_common::models::OwnerType; use attune_common::repositories::{ action::ActionRepository, key::{CreateKeyInput, KeyRepository, KeySearchFilters, UpdateKeyInput}, @@ -18,9 +17,14 @@ use attune_common::repositories::{ trigger::SensorRepository, Create, Delete, FindByRef, Update, }; +use attune_common::{ + models::{key::Key, OwnerType}, + rbac::{Action, AuthorizationContext, Resource}, +}; -use crate::auth::RequireAuth; +use crate::auth::{jwt::TokenType, RequireAuth}; use crate::{ + authz::{AuthorizationCheck, AuthorizationService}, dto::{ common::{PaginatedResponse, PaginationParams}, key::{CreateKeyRequest, KeyQueryParams, KeyResponse, KeySummary, UpdateKeyRequest}, @@ -42,7 +46,7 @@ use crate::{ security(("bearer_auth" = [])) )] pub async fn list_keys( - _user: RequireAuth, + user: RequireAuth, State(state): State>, Query(query): Query, ) -> ApiResult { @@ -55,8 +59,33 @@ pub async fn list_keys( }; let result = KeyRepository::search(&state.db, &filters).await?; + let mut rows = result.rows; - let paginated_keys: Vec = result.rows.into_iter().map(KeySummary::from).collect(); + if user.0.claims.token_type == TokenType::Access { + let identity_id = user + .0 + .identity_id() + .map_err(|_| ApiError::Unauthorized("Invalid user identity".to_string()))?; + let authz = AuthorizationService::new(state.db.clone()); + let grants = authz.effective_grants(&user.0).await?; + + // Ensure the principal can read at least some key records. + let can_read_any_key = grants + .iter() + .any(|g| g.resource == Resource::Keys && g.actions.contains(&Action::Read)); + if !can_read_any_key { + return Err(ApiError::Forbidden( + "Insufficient permissions: keys:read".to_string(), + )); + } + + rows.retain(|key| { + let ctx = key_authorization_context(identity_id, key); + AuthorizationService::is_allowed(&grants, Resource::Keys, Action::Read, &ctx) + }); + } + + let paginated_keys: Vec = rows.into_iter().map(KeySummary::from).collect(); let pagination_params = PaginationParams { page: query.page, @@ -83,7 +112,7 @@ pub async fn list_keys( security(("bearer_auth" = [])) )] pub async fn get_key( - _user: RequireAuth, + user: RequireAuth, State(state): State>, Path(key_ref): Path, ) -> ApiResult { @@ -91,6 +120,26 @@ pub async fn get_key( .await? .ok_or_else(|| ApiError::NotFound(format!("Key '{}' not found", key_ref)))?; + if user.0.claims.token_type == TokenType::Access { + let identity_id = user + .0 + .identity_id() + .map_err(|_| ApiError::Unauthorized("Invalid user identity".to_string()))?; + let authz = AuthorizationService::new(state.db.clone()); + authz + .authorize( + &user.0, + AuthorizationCheck { + resource: Resource::Keys, + action: Action::Read, + context: key_authorization_context(identity_id, &key), + }, + ) + .await + // Hide unauthorized records behind 404 to reduce enumeration leakage. + .map_err(|_| ApiError::NotFound(format!("Key '{}' not found", key_ref)))?; + } + // Decrypt value if encrypted if key.encrypted { let encryption_key = state @@ -130,13 +179,37 @@ pub async fn get_key( security(("bearer_auth" = [])) )] pub async fn create_key( - _user: RequireAuth, + user: RequireAuth, State(state): State>, Json(request): Json, ) -> ApiResult { // Validate request request.validate()?; + if user.0.claims.token_type == TokenType::Access { + let identity_id = user + .0 + .identity_id() + .map_err(|_| ApiError::Unauthorized("Invalid user identity".to_string()))?; + let authz = AuthorizationService::new(state.db.clone()); + let mut ctx = AuthorizationContext::new(identity_id); + ctx.owner_identity_id = request.owner_identity; + ctx.owner_type = Some(request.owner_type); + ctx.encrypted = Some(request.encrypted); + ctx.target_ref = Some(request.r#ref.clone()); + + authz + .authorize( + &user.0, + AuthorizationCheck { + resource: Resource::Keys, + action: Action::Create, + context: ctx, + }, + ) + .await?; + } + // Check if key with same ref already exists if KeyRepository::find_by_ref(&state.db, &request.r#ref) .await? @@ -299,7 +372,7 @@ pub async fn create_key( security(("bearer_auth" = [])) )] pub async fn update_key( - _user: RequireAuth, + user: RequireAuth, State(state): State>, Path(key_ref): Path, Json(request): Json, @@ -312,6 +385,24 @@ pub async fn update_key( .await? .ok_or_else(|| ApiError::NotFound(format!("Key '{}' not found", key_ref)))?; + if user.0.claims.token_type == TokenType::Access { + let identity_id = user + .0 + .identity_id() + .map_err(|_| ApiError::Unauthorized("Invalid user identity".to_string()))?; + let authz = AuthorizationService::new(state.db.clone()); + authz + .authorize( + &user.0, + AuthorizationCheck { + resource: Resource::Keys, + action: Action::Update, + context: key_authorization_context(identity_id, &existing), + }, + ) + .await?; + } + // Handle value update with encryption let (value, encrypted, encryption_key_hash) = if let Some(new_value) = request.value { let should_encrypt = request.encrypted.unwrap_or(existing.encrypted); @@ -395,7 +486,7 @@ pub async fn update_key( security(("bearer_auth" = [])) )] pub async fn delete_key( - _user: RequireAuth, + user: RequireAuth, State(state): State>, Path(key_ref): Path, ) -> ApiResult { @@ -404,6 +495,24 @@ pub async fn delete_key( .await? .ok_or_else(|| ApiError::NotFound(format!("Key '{}' not found", key_ref)))?; + if user.0.claims.token_type == TokenType::Access { + let identity_id = user + .0 + .identity_id() + .map_err(|_| ApiError::Unauthorized("Invalid user identity".to_string()))?; + let authz = AuthorizationService::new(state.db.clone()); + authz + .authorize( + &user.0, + AuthorizationCheck { + resource: Resource::Keys, + action: Action::Delete, + context: key_authorization_context(identity_id, &key), + }, + ) + .await?; + } + // Delete the key let deleted = KeyRepository::delete(&state.db, key.id).await?; @@ -425,3 +534,13 @@ pub fn routes() -> Router> { get(get_key).put(update_key).delete(delete_key), ) } + +fn key_authorization_context(identity_id: i64, key: &Key) -> AuthorizationContext { + let mut ctx = AuthorizationContext::new(identity_id); + ctx.target_id = Some(key.id); + ctx.target_ref = Some(key.r#ref.clone()); + ctx.owner_identity_id = key.owner_identity; + ctx.owner_type = Some(key.owner_type); + ctx.encrypted = Some(key.encrypted); + ctx +} diff --git a/crates/api/src/routes/mod.rs b/crates/api/src/routes/mod.rs index 2ccf225..459f33d 100644 --- a/crates/api/src/routes/mod.rs +++ b/crates/api/src/routes/mod.rs @@ -11,6 +11,7 @@ pub mod history; pub mod inquiries; pub mod keys; pub mod packs; +pub mod permissions; pub mod rules; pub mod triggers; pub mod webhooks; @@ -27,6 +28,7 @@ pub use history::routes as history_routes; pub use inquiries::routes as inquiry_routes; pub use keys::routes as key_routes; pub use packs::routes as pack_routes; +pub use permissions::routes as permission_routes; pub use rules::routes as rule_routes; pub use triggers::routes as trigger_routes; pub use webhooks::routes as webhook_routes; diff --git a/crates/api/src/routes/packs.rs b/crates/api/src/routes/packs.rs index 5ac62eb..55fce10 100644 --- a/crates/api/src/routes/packs.rs +++ b/crates/api/src/routes/packs.rs @@ -13,6 +13,7 @@ use validator::Validate; use attune_common::models::pack_test::PackTestResult; use attune_common::mq::{MessageEnvelope, MessageType, PackRegisteredPayload}; +use attune_common::rbac::{Action, AuthorizationContext, Resource}; use attune_common::repositories::{ pack::{CreatePackInput, UpdatePackInput}, Create, Delete, FindById, FindByRef, PackRepository, PackTestRepository, Pagination, Update, @@ -21,6 +22,7 @@ use attune_common::workflow::{PackWorkflowService, PackWorkflowServiceConfig}; use crate::{ auth::middleware::RequireAuth, + authz::{AuthorizationCheck, AuthorizationService}, dto::{ common::{PaginatedResponse, PaginationParams}, pack::{ @@ -115,7 +117,7 @@ pub async fn get_pack( )] pub async fn create_pack( State(state): State>, - RequireAuth(_user): RequireAuth, + RequireAuth(user): RequireAuth, Json(request): Json, ) -> ApiResult { // Validate request @@ -129,6 +131,25 @@ pub async fn create_pack( ))); } + if user.claims.token_type == crate::auth::jwt::TokenType::Access { + let identity_id = user + .identity_id() + .map_err(|_| ApiError::Unauthorized("Invalid user identity".to_string()))?; + let authz = AuthorizationService::new(state.db.clone()); + let mut ctx = AuthorizationContext::new(identity_id); + ctx.target_ref = Some(request.r#ref.clone()); + authz + .authorize( + &user, + AuthorizationCheck { + resource: Resource::Packs, + action: Action::Create, + context: ctx, + }, + ) + .await?; + } + // Create pack input let pack_input = CreatePackInput { r#ref: request.r#ref, @@ -202,7 +223,7 @@ pub async fn create_pack( )] pub async fn update_pack( State(state): State>, - RequireAuth(_user): RequireAuth, + RequireAuth(user): RequireAuth, Path(pack_ref): Path, Json(request): Json, ) -> ApiResult { @@ -214,6 +235,26 @@ pub async fn update_pack( .await? .ok_or_else(|| ApiError::NotFound(format!("Pack '{}' not found", pack_ref)))?; + if user.claims.token_type == crate::auth::jwt::TokenType::Access { + let identity_id = user + .identity_id() + .map_err(|_| ApiError::Unauthorized("Invalid user identity".to_string()))?; + let authz = AuthorizationService::new(state.db.clone()); + let mut ctx = AuthorizationContext::new(identity_id); + ctx.target_id = Some(existing_pack.id); + ctx.target_ref = Some(existing_pack.r#ref.clone()); + authz + .authorize( + &user, + AuthorizationCheck { + resource: Resource::Packs, + action: Action::Update, + context: ctx, + }, + ) + .await?; + } + // Create update input let update_input = UpdatePackInput { label: request.label, @@ -284,7 +325,7 @@ pub async fn update_pack( )] pub async fn delete_pack( State(state): State>, - RequireAuth(_user): RequireAuth, + RequireAuth(user): RequireAuth, Path(pack_ref): Path, ) -> ApiResult { // Check if pack exists @@ -292,6 +333,26 @@ pub async fn delete_pack( .await? .ok_or_else(|| ApiError::NotFound(format!("Pack '{}' not found", pack_ref)))?; + if user.claims.token_type == crate::auth::jwt::TokenType::Access { + let identity_id = user + .identity_id() + .map_err(|_| ApiError::Unauthorized("Invalid user identity".to_string()))?; + let authz = AuthorizationService::new(state.db.clone()); + let mut ctx = AuthorizationContext::new(identity_id); + ctx.target_id = Some(pack.id); + ctx.target_ref = Some(pack.r#ref.clone()); + authz + .authorize( + &user, + AuthorizationCheck { + resource: Resource::Packs, + action: Action::Delete, + context: ctx, + }, + ) + .await?; + } + // Delete the pack from the database (cascades to actions, triggers, sensors, rules, etc. // Foreign keys on execution, event, enforcement, and rule tables use ON DELETE SET NULL // so historical records are preserved with their text ref fields intact.) @@ -475,6 +536,23 @@ pub async fn upload_pack( const MAX_PACK_SIZE: usize = 100 * 1024 * 1024; // 100 MB + if user.claims.token_type == crate::auth::jwt::TokenType::Access { + let identity_id = user + .identity_id() + .map_err(|_| ApiError::Unauthorized("Invalid user identity".to_string()))?; + let authz = AuthorizationService::new(state.db.clone()); + authz + .authorize( + &user, + AuthorizationCheck { + resource: Resource::Packs, + action: Action::Create, + context: AuthorizationContext::new(identity_id), + }, + ) + .await?; + } + let mut pack_bytes: Option> = None; let mut force = false; let mut skip_tests = false; @@ -649,6 +727,23 @@ pub async fn register_pack( // Validate request request.validate()?; + if user.claims.token_type == crate::auth::jwt::TokenType::Access { + let identity_id = user + .identity_id() + .map_err(|_| ApiError::Unauthorized("Invalid user identity".to_string()))?; + let authz = AuthorizationService::new(state.db.clone()); + authz + .authorize( + &user, + AuthorizationCheck { + resource: Resource::Packs, + action: Action::Create, + context: AuthorizationContext::new(identity_id), + }, + ) + .await?; + } + // Call internal registration logic let pack_id = register_pack_internal( state.clone(), @@ -1207,6 +1302,23 @@ pub async fn install_pack( tracing::info!("Installing pack from source: {}", request.source); + if user.claims.token_type == crate::auth::jwt::TokenType::Access { + let identity_id = user + .identity_id() + .map_err(|_| ApiError::Unauthorized("Invalid user identity".to_string()))?; + let authz = AuthorizationService::new(state.db.clone()); + authz + .authorize( + &user, + AuthorizationCheck { + resource: Resource::Packs, + action: Action::Create, + context: AuthorizationContext::new(identity_id), + }, + ) + .await?; + } + // Get user ID early to avoid borrow issues let user_id = user.identity_id().ok(); let user_sub = user.claims.sub.clone(); @@ -2247,6 +2359,23 @@ pub async fn register_packs_batch( RequireAuth(user): RequireAuth, Json(request): Json, ) -> ApiResult>> { + if user.claims.token_type == crate::auth::jwt::TokenType::Access { + let identity_id = user + .identity_id() + .map_err(|_| ApiError::Unauthorized("Invalid user identity".to_string()))?; + let authz = AuthorizationService::new(state.db.clone()); + authz + .authorize( + &user, + AuthorizationCheck { + resource: Resource::Packs, + action: Action::Create, + context: AuthorizationContext::new(identity_id), + }, + ) + .await?; + } + let start = std::time::Instant::now(); let mut registered = Vec::new(); let mut failed = Vec::new(); diff --git a/crates/api/src/routes/permissions.rs b/crates/api/src/routes/permissions.rs new file mode 100644 index 0000000..ebdd8d4 --- /dev/null +++ b/crates/api/src/routes/permissions.rs @@ -0,0 +1,507 @@ +use axum::{ + extract::{Path, Query, State}, + http::StatusCode, + response::IntoResponse, + routing::{delete, get, post}, + Json, Router, +}; +use std::sync::Arc; +use validator::Validate; + +use attune_common::{ + models::identity::{Identity, PermissionSet}, + rbac::{Action, AuthorizationContext, Resource}, + repositories::{ + identity::{ + CreateIdentityInput, CreatePermissionAssignmentInput, IdentityRepository, + PermissionAssignmentRepository, PermissionSetRepository, UpdateIdentityInput, + }, + Create, Delete, FindById, FindByRef, List, Update, + }, +}; + +use crate::{ + auth::hash_password, + auth::middleware::RequireAuth, + authz::{AuthorizationCheck, AuthorizationService}, + dto::{ + common::{PaginatedResponse, PaginationParams}, + ApiResponse, CreateIdentityRequest, CreatePermissionAssignmentRequest, IdentityResponse, + IdentitySummary, PermissionAssignmentResponse, PermissionSetQueryParams, + PermissionSetSummary, SuccessResponse, UpdateIdentityRequest, + }, + middleware::{ApiError, ApiResult}, + state::AppState, +}; + +#[utoipa::path( + get, + path = "/api/v1/identities", + tag = "permissions", + params(PaginationParams), + responses( + (status = 200, description = "List identities", body = PaginatedResponse) + ), + security(("bearer_auth" = [])) +)] +pub async fn list_identities( + State(state): State>, + RequireAuth(user): RequireAuth, + Query(query): Query, +) -> ApiResult { + authorize_permissions(&state, &user, Resource::Identities, Action::Read).await?; + + let identities = IdentityRepository::list(&state.db).await?; + let total = identities.len() as u64; + let start = query.offset() as usize; + let end = (start + query.limit() as usize).min(identities.len()); + let page_items = if start >= identities.len() { + Vec::new() + } else { + identities[start..end] + .iter() + .cloned() + .map(IdentitySummary::from) + .collect() + }; + + Ok(( + StatusCode::OK, + Json(PaginatedResponse::new(page_items, &query, total)), + )) +} + +#[utoipa::path( + get, + path = "/api/v1/identities/{id}", + tag = "permissions", + params( + ("id" = i64, Path, description = "Identity ID") + ), + responses( + (status = 200, description = "Identity details", body = inline(ApiResponse)), + (status = 404, description = "Identity not found") + ), + security(("bearer_auth" = [])) +)] +pub async fn get_identity( + State(state): State>, + RequireAuth(user): RequireAuth, + Path(identity_id): Path, +) -> ApiResult { + authorize_permissions(&state, &user, Resource::Identities, Action::Read).await?; + + let identity = IdentityRepository::find_by_id(&state.db, identity_id) + .await? + .ok_or_else(|| ApiError::NotFound(format!("Identity '{}' not found", identity_id)))?; + + Ok(( + StatusCode::OK, + Json(ApiResponse::new(IdentityResponse::from(identity))), + )) +} + +#[utoipa::path( + post, + path = "/api/v1/identities", + tag = "permissions", + request_body = CreateIdentityRequest, + responses( + (status = 201, description = "Identity created", body = inline(ApiResponse)), + (status = 409, description = "Identity already exists") + ), + security(("bearer_auth" = [])) +)] +pub async fn create_identity( + State(state): State>, + RequireAuth(user): RequireAuth, + Json(request): Json, +) -> ApiResult { + authorize_permissions(&state, &user, Resource::Identities, Action::Create).await?; + request.validate()?; + + let password_hash = match request.password { + Some(password) => Some(hash_password(&password)?), + None => None, + }; + + let identity = IdentityRepository::create( + &state.db, + CreateIdentityInput { + login: request.login, + display_name: request.display_name, + password_hash, + attributes: request.attributes, + }, + ) + .await?; + + Ok(( + StatusCode::CREATED, + Json(ApiResponse::new(IdentityResponse::from(identity))), + )) +} + +#[utoipa::path( + put, + path = "/api/v1/identities/{id}", + tag = "permissions", + params( + ("id" = i64, Path, description = "Identity ID") + ), + request_body = UpdateIdentityRequest, + responses( + (status = 200, description = "Identity updated", body = inline(ApiResponse)), + (status = 404, description = "Identity not found") + ), + security(("bearer_auth" = [])) +)] +pub async fn update_identity( + State(state): State>, + RequireAuth(user): RequireAuth, + Path(identity_id): Path, + Json(request): Json, +) -> ApiResult { + authorize_permissions(&state, &user, Resource::Identities, Action::Update).await?; + + IdentityRepository::find_by_id(&state.db, identity_id) + .await? + .ok_or_else(|| ApiError::NotFound(format!("Identity '{}' not found", identity_id)))?; + + let password_hash = match request.password { + Some(password) => Some(hash_password(&password)?), + None => None, + }; + + let identity = IdentityRepository::update( + &state.db, + identity_id, + UpdateIdentityInput { + display_name: request.display_name, + password_hash, + attributes: request.attributes, + }, + ) + .await?; + + Ok(( + StatusCode::OK, + Json(ApiResponse::new(IdentityResponse::from(identity))), + )) +} + +#[utoipa::path( + delete, + path = "/api/v1/identities/{id}", + tag = "permissions", + params( + ("id" = i64, Path, description = "Identity ID") + ), + responses( + (status = 200, description = "Identity deleted", body = inline(ApiResponse)), + (status = 404, description = "Identity not found") + ), + security(("bearer_auth" = [])) +)] +pub async fn delete_identity( + State(state): State>, + RequireAuth(user): RequireAuth, + Path(identity_id): Path, +) -> ApiResult { + authorize_permissions(&state, &user, Resource::Identities, Action::Delete).await?; + + let caller_identity_id = user + .identity_id() + .map_err(|_| ApiError::Unauthorized("Invalid user identity".to_string()))?; + if caller_identity_id == identity_id { + return Err(ApiError::BadRequest( + "Refusing to delete the currently authenticated identity".to_string(), + )); + } + + let deleted = IdentityRepository::delete(&state.db, identity_id).await?; + if !deleted { + return Err(ApiError::NotFound(format!( + "Identity '{}' not found", + identity_id + ))); + } + + Ok(( + StatusCode::OK, + Json(ApiResponse::new(SuccessResponse::new( + "Identity deleted successfully", + ))), + )) +} + +#[utoipa::path( + get, + path = "/api/v1/permissions/sets", + tag = "permissions", + params(PermissionSetQueryParams), + responses( + (status = 200, description = "List permission sets", body = Vec) + ), + security(("bearer_auth" = [])) +)] +pub async fn list_permission_sets( + State(state): State>, + RequireAuth(user): RequireAuth, + Query(query): Query, +) -> ApiResult { + authorize_permissions(&state, &user, Resource::Permissions, Action::Read).await?; + + let mut permission_sets = PermissionSetRepository::list(&state.db).await?; + if let Some(pack_ref) = &query.pack_ref { + permission_sets.retain(|ps| ps.pack_ref.as_deref() == Some(pack_ref.as_str())); + } + + let response: Vec = permission_sets + .into_iter() + .map(PermissionSetSummary::from) + .collect(); + + Ok((StatusCode::OK, Json(response))) +} + +#[utoipa::path( + get, + path = "/api/v1/identities/{id}/permissions", + tag = "permissions", + params( + ("id" = i64, Path, description = "Identity ID") + ), + responses( + (status = 200, description = "List permission assignments for an identity", body = Vec), + (status = 404, description = "Identity not found") + ), + security(("bearer_auth" = [])) +)] +pub async fn list_identity_permissions( + State(state): State>, + RequireAuth(user): RequireAuth, + Path(identity_id): Path, +) -> ApiResult { + authorize_permissions(&state, &user, Resource::Permissions, Action::Read).await?; + + IdentityRepository::find_by_id(&state.db, identity_id) + .await? + .ok_or_else(|| ApiError::NotFound(format!("Identity '{}' not found", identity_id)))?; + + let assignments = + PermissionAssignmentRepository::find_by_identity(&state.db, identity_id).await?; + let permission_sets = PermissionSetRepository::find_by_identity(&state.db, identity_id).await?; + + let permission_set_refs = permission_sets + .into_iter() + .map(|ps| (ps.id, ps.r#ref)) + .collect::>(); + + let response: Vec = assignments + .into_iter() + .filter_map(|assignment| { + permission_set_refs + .get(&assignment.permset) + .cloned() + .map(|permission_set_ref| PermissionAssignmentResponse { + id: assignment.id, + identity_id: assignment.identity, + permission_set_id: assignment.permset, + permission_set_ref, + created: assignment.created, + }) + }) + .collect(); + + Ok((StatusCode::OK, Json(response))) +} + +#[utoipa::path( + post, + path = "/api/v1/permissions/assignments", + tag = "permissions", + request_body = CreatePermissionAssignmentRequest, + responses( + (status = 201, description = "Permission assignment created", body = inline(ApiResponse)), + (status = 404, description = "Identity or permission set not found"), + (status = 409, description = "Assignment already exists") + ), + security(("bearer_auth" = [])) +)] +pub async fn create_permission_assignment( + State(state): State>, + RequireAuth(user): RequireAuth, + Json(request): Json, +) -> ApiResult { + authorize_permissions(&state, &user, Resource::Permissions, Action::Manage).await?; + + let identity = resolve_identity(&state, &request).await?; + let permission_set = + PermissionSetRepository::find_by_ref(&state.db, &request.permission_set_ref) + .await? + .ok_or_else(|| { + ApiError::NotFound(format!( + "Permission set '{}' not found", + request.permission_set_ref + )) + })?; + + let assignment = PermissionAssignmentRepository::create( + &state.db, + CreatePermissionAssignmentInput { + identity: identity.id, + permset: permission_set.id, + }, + ) + .await?; + + let response = PermissionAssignmentResponse { + id: assignment.id, + identity_id: assignment.identity, + permission_set_id: assignment.permset, + permission_set_ref: permission_set.r#ref, + created: assignment.created, + }; + + Ok((StatusCode::CREATED, Json(ApiResponse::new(response)))) +} + +#[utoipa::path( + delete, + path = "/api/v1/permissions/assignments/{id}", + tag = "permissions", + params( + ("id" = i64, Path, description = "Permission assignment ID") + ), + responses( + (status = 200, description = "Permission assignment deleted", body = inline(ApiResponse)), + (status = 404, description = "Assignment not found") + ), + security(("bearer_auth" = [])) +)] +pub async fn delete_permission_assignment( + State(state): State>, + RequireAuth(user): RequireAuth, + Path(assignment_id): Path, +) -> ApiResult { + authorize_permissions(&state, &user, Resource::Permissions, Action::Manage).await?; + + let existing = PermissionAssignmentRepository::find_by_id(&state.db, assignment_id) + .await? + .ok_or_else(|| { + ApiError::NotFound(format!( + "Permission assignment '{}' not found", + assignment_id + )) + })?; + + let deleted = PermissionAssignmentRepository::delete(&state.db, existing.id).await?; + if !deleted { + return Err(ApiError::NotFound(format!( + "Permission assignment '{}' not found", + assignment_id + ))); + } + + Ok(( + StatusCode::OK, + Json(ApiResponse::new(SuccessResponse::new( + "Permission assignment deleted successfully", + ))), + )) +} + +pub fn routes() -> Router> { + Router::new() + .route("/identities", get(list_identities).post(create_identity)) + .route( + "/identities/{id}", + get(get_identity) + .put(update_identity) + .delete(delete_identity), + ) + .route( + "/identities/{id}/permissions", + get(list_identity_permissions), + ) + .route("/permissions/sets", get(list_permission_sets)) + .route( + "/permissions/assignments", + post(create_permission_assignment), + ) + .route( + "/permissions/assignments/{id}", + delete(delete_permission_assignment), + ) +} + +async fn authorize_permissions( + state: &Arc, + user: &crate::auth::middleware::AuthenticatedUser, + resource: Resource, + action: Action, +) -> ApiResult<()> { + let identity_id = user + .identity_id() + .map_err(|_| ApiError::Unauthorized("Invalid user identity".to_string()))?; + let authz = AuthorizationService::new(state.db.clone()); + authz + .authorize( + user, + AuthorizationCheck { + resource, + action, + context: AuthorizationContext::new(identity_id), + }, + ) + .await +} + +async fn resolve_identity( + state: &Arc, + request: &CreatePermissionAssignmentRequest, +) -> ApiResult { + match (request.identity_id, request.identity_login.as_deref()) { + (Some(identity_id), None) => IdentityRepository::find_by_id(&state.db, identity_id) + .await? + .ok_or_else(|| ApiError::NotFound(format!("Identity '{}' not found", identity_id))), + (None, Some(identity_login)) => { + IdentityRepository::find_by_login(&state.db, identity_login) + .await? + .ok_or_else(|| { + ApiError::NotFound(format!("Identity '{}' not found", identity_login)) + }) + } + (Some(_), Some(_)) => Err(ApiError::BadRequest( + "Provide either identity_id or identity_login, not both".to_string(), + )), + (None, None) => Err(ApiError::BadRequest( + "Either identity_id or identity_login is required".to_string(), + )), + } +} + +impl From for IdentitySummary { + fn from(value: Identity) -> Self { + Self { + id: value.id, + login: value.login, + display_name: value.display_name, + attributes: value.attributes, + } + } +} + +impl From for PermissionSetSummary { + fn from(value: PermissionSet) -> Self { + Self { + id: value.id, + r#ref: value.r#ref, + pack_ref: value.pack_ref, + label: value.label, + description: value.description, + grants: value.grants, + } + } +} diff --git a/crates/api/src/routes/rules.rs b/crates/api/src/routes/rules.rs index a57f017..8e96a22 100644 --- a/crates/api/src/routes/rules.rs +++ b/crates/api/src/routes/rules.rs @@ -14,6 +14,7 @@ use validator::Validate; use attune_common::mq::{ MessageEnvelope, MessageType, RuleCreatedPayload, RuleDisabledPayload, RuleEnabledPayload, }; +use attune_common::rbac::{Action, AuthorizationContext, Resource}; use attune_common::repositories::{ action::ActionRepository, pack::PackRepository, @@ -24,6 +25,7 @@ use attune_common::repositories::{ use crate::{ auth::middleware::RequireAuth, + authz::{AuthorizationCheck, AuthorizationService}, dto::{ common::{PaginatedResponse, PaginationParams}, rule::{CreateRuleRequest, RuleResponse, RuleSummary, UpdateRuleRequest}, @@ -283,7 +285,7 @@ pub async fn get_rule( )] pub async fn create_rule( State(state): State>, - RequireAuth(_user): RequireAuth, + RequireAuth(user): RequireAuth, Json(request): Json, ) -> ApiResult { // Validate request @@ -317,6 +319,26 @@ pub async fn create_rule( ApiError::NotFound(format!("Trigger '{}' not found", request.trigger_ref)) })?; + if user.claims.token_type == crate::auth::jwt::TokenType::Access { + let identity_id = user + .identity_id() + .map_err(|_| ApiError::Unauthorized("Invalid user identity".to_string()))?; + let authz = AuthorizationService::new(state.db.clone()); + let mut ctx = AuthorizationContext::new(identity_id); + ctx.pack_ref = Some(pack.r#ref.clone()); + ctx.target_ref = Some(request.r#ref.clone()); + authz + .authorize( + &user, + AuthorizationCheck { + resource: Resource::Rules, + action: Action::Create, + context: ctx, + }, + ) + .await?; + } + // Validate trigger parameters against schema validate_trigger_params(&trigger, &request.trigger_params)?; @@ -392,7 +414,7 @@ pub async fn create_rule( )] pub async fn update_rule( State(state): State>, - RequireAuth(_user): RequireAuth, + RequireAuth(user): RequireAuth, Path(rule_ref): Path, Json(request): Json, ) -> ApiResult { @@ -404,6 +426,27 @@ pub async fn update_rule( .await? .ok_or_else(|| ApiError::NotFound(format!("Rule '{}' not found", rule_ref)))?; + if user.claims.token_type == crate::auth::jwt::TokenType::Access { + let identity_id = user + .identity_id() + .map_err(|_| ApiError::Unauthorized("Invalid user identity".to_string()))?; + let authz = AuthorizationService::new(state.db.clone()); + let mut ctx = AuthorizationContext::new(identity_id); + ctx.target_id = Some(existing_rule.id); + ctx.target_ref = Some(existing_rule.r#ref.clone()); + ctx.pack_ref = Some(existing_rule.pack_ref.clone()); + authz + .authorize( + &user, + AuthorizationCheck { + resource: Resource::Rules, + action: Action::Update, + context: ctx, + }, + ) + .await?; + } + // If action parameters are being updated, validate against the action's schema if let Some(ref action_params) = request.action_params { let action = ActionRepository::find_by_ref(&state.db, &existing_rule.action_ref) @@ -489,7 +532,7 @@ pub async fn update_rule( )] pub async fn delete_rule( State(state): State>, - RequireAuth(_user): RequireAuth, + RequireAuth(user): RequireAuth, Path(rule_ref): Path, ) -> ApiResult { // Check if rule exists @@ -497,6 +540,27 @@ pub async fn delete_rule( .await? .ok_or_else(|| ApiError::NotFound(format!("Rule '{}' not found", rule_ref)))?; + if user.claims.token_type == crate::auth::jwt::TokenType::Access { + let identity_id = user + .identity_id() + .map_err(|_| ApiError::Unauthorized("Invalid user identity".to_string()))?; + let authz = AuthorizationService::new(state.db.clone()); + let mut ctx = AuthorizationContext::new(identity_id); + ctx.target_id = Some(rule.id); + ctx.target_ref = Some(rule.r#ref.clone()); + ctx.pack_ref = Some(rule.pack_ref.clone()); + authz + .authorize( + &user, + AuthorizationCheck { + resource: Resource::Rules, + action: Action::Delete, + context: ctx, + }, + ) + .await?; + } + // Delete the rule let deleted = RuleRepository::delete(&state.db, rule.id).await?; diff --git a/crates/api/src/server.rs b/crates/api/src/server.rs index dfa4eef..4c0c94b 100644 --- a/crates/api/src/server.rs +++ b/crates/api/src/server.rs @@ -53,6 +53,7 @@ impl Server { .merge(routes::inquiry_routes()) .merge(routes::event_routes()) .merge(routes::key_routes()) + .merge(routes::permission_routes()) .merge(routes::workflow_routes()) .merge(routes::webhook_routes()) .merge(routes::history_routes()) diff --git a/crates/api/tests/helpers.rs b/crates/api/tests/helpers.rs index d90dded..11640a5 100644 --- a/crates/api/tests/helpers.rs +++ b/crates/api/tests/helpers.rs @@ -9,6 +9,10 @@ use attune_common::{ models::*, repositories::{ action::{ActionRepository, CreateActionInput}, + identity::{ + CreatePermissionAssignmentInput, CreatePermissionSetInput, + PermissionAssignmentRepository, PermissionSetRepository, + }, pack::{CreatePackInput, PackRepository}, trigger::{CreateTriggerInput, TriggerRepository}, workflow::{CreateWorkflowDefinitionInput, WorkflowDefinitionRepository}, @@ -246,6 +250,47 @@ impl TestContext { Ok(self) } + /// Create and authenticate a test user with identity + permission admin grants. + pub async fn with_admin_auth(mut self) -> Result { + let unique_id = uuid::Uuid::new_v4().to_string().replace("-", "")[..8].to_string(); + let login = format!("adminuser_{}", unique_id); + let token = self.create_test_user(&login).await?; + + let identity = attune_common::repositories::identity::IdentityRepository::find_by_login( + &self.pool, &login, + ) + .await? + .ok_or_else(|| format!("Failed to find newly created identity '{}'", login))?; + + let permset = PermissionSetRepository::create( + &self.pool, + CreatePermissionSetInput { + r#ref: "core.admin".to_string(), + pack: None, + pack_ref: None, + label: Some("Admin".to_string()), + description: Some("Test admin permission set".to_string()), + grants: json!([ + {"resource": "identities", "actions": ["read", "create", "update", "delete"]}, + {"resource": "permissions", "actions": ["read", "create", "update", "delete", "manage"]} + ]), + }, + ) + .await?; + + PermissionAssignmentRepository::create( + &self.pool, + CreatePermissionAssignmentInput { + identity: identity.id, + permset: permset.id, + }, + ) + .await?; + + self.token = Some(token); + Ok(self) + } + /// Create a test user and return access token async fn create_test_user(&self, login: &str) -> Result { // Register via API to get real token diff --git a/crates/api/tests/permissions_api_tests.rs b/crates/api/tests/permissions_api_tests.rs new file mode 100644 index 0000000..4a0308e --- /dev/null +++ b/crates/api/tests/permissions_api_tests.rs @@ -0,0 +1,178 @@ +use axum::http::StatusCode; +use helpers::*; +use serde_json::json; + +mod helpers; + +#[tokio::test] +#[ignore = "integration test — requires database"] +async fn test_identity_crud_and_permission_assignment_flow() { + let ctx = TestContext::new() + .await + .expect("Failed to create test context") + .with_admin_auth() + .await + .expect("Failed to create admin-authenticated test user"); + + let create_identity_response = ctx + .post( + "/api/v1/identities", + json!({ + "login": "managed_user", + "display_name": "Managed User", + "password": "ManagedPass123!", + "attributes": { + "department": "platform" + } + }), + ctx.token(), + ) + .await + .expect("Failed to create identity"); + + assert_eq!(create_identity_response.status(), StatusCode::CREATED); + + let created_identity: serde_json::Value = create_identity_response + .json() + .await + .expect("Failed to parse identity create response"); + let identity_id = created_identity["data"]["id"] + .as_i64() + .expect("Missing identity id"); + + let list_identities_response = ctx + .get("/api/v1/identities", ctx.token()) + .await + .expect("Failed to list identities"); + assert_eq!(list_identities_response.status(), StatusCode::OK); + + let identities_body: serde_json::Value = list_identities_response + .json() + .await + .expect("Failed to parse identities response"); + assert!(identities_body["data"] + .as_array() + .expect("Expected data array") + .iter() + .any(|item| item["login"] == "managed_user")); + + let update_identity_response = ctx + .put( + &format!("/api/v1/identities/{}", identity_id), + json!({ + "display_name": "Managed User Updated", + "attributes": { + "department": "security" + } + }), + ctx.token(), + ) + .await + .expect("Failed to update identity"); + assert_eq!(update_identity_response.status(), StatusCode::OK); + + let get_identity_response = ctx + .get(&format!("/api/v1/identities/{}", identity_id), ctx.token()) + .await + .expect("Failed to get identity"); + assert_eq!(get_identity_response.status(), StatusCode::OK); + + let identity_body: serde_json::Value = get_identity_response + .json() + .await + .expect("Failed to parse get identity response"); + assert_eq!( + identity_body["data"]["display_name"], + "Managed User Updated" + ); + assert_eq!( + identity_body["data"]["attributes"]["department"], + "security" + ); + + let permission_sets_response = ctx + .get("/api/v1/permissions/sets", ctx.token()) + .await + .expect("Failed to list permission sets"); + assert_eq!(permission_sets_response.status(), StatusCode::OK); + + let assignment_response = ctx + .post( + "/api/v1/permissions/assignments", + json!({ + "identity_id": identity_id, + "permission_set_ref": "core.admin" + }), + ctx.token(), + ) + .await + .expect("Failed to create permission assignment"); + assert_eq!(assignment_response.status(), StatusCode::CREATED); + + let assignment_body: serde_json::Value = assignment_response + .json() + .await + .expect("Failed to parse permission assignment response"); + let assignment_id = assignment_body["data"]["id"] + .as_i64() + .expect("Missing assignment id"); + assert_eq!(assignment_body["data"]["permission_set_ref"], "core.admin"); + + let list_assignments_response = ctx + .get( + &format!("/api/v1/identities/{}/permissions", identity_id), + ctx.token(), + ) + .await + .expect("Failed to list identity permissions"); + assert_eq!(list_assignments_response.status(), StatusCode::OK); + + let assignments_body: serde_json::Value = list_assignments_response + .json() + .await + .expect("Failed to parse identity permissions response"); + assert!(assignments_body + .as_array() + .expect("Expected array response") + .iter() + .any(|item| item["permission_set_ref"] == "core.admin")); + + let delete_assignment_response = ctx + .delete( + &format!("/api/v1/permissions/assignments/{}", assignment_id), + ctx.token(), + ) + .await + .expect("Failed to delete assignment"); + assert_eq!(delete_assignment_response.status(), StatusCode::OK); + + let delete_identity_response = ctx + .delete(&format!("/api/v1/identities/{}", identity_id), ctx.token()) + .await + .expect("Failed to delete identity"); + assert_eq!(delete_identity_response.status(), StatusCode::OK); + + let missing_identity_response = ctx + .get(&format!("/api/v1/identities/{}", identity_id), ctx.token()) + .await + .expect("Failed to fetch deleted identity"); + assert_eq!(missing_identity_response.status(), StatusCode::NOT_FOUND); +} + +#[tokio::test] +#[ignore = "integration test — requires database"] +async fn test_plain_authenticated_user_cannot_manage_identities() { + let ctx = TestContext::new() + .await + .expect("Failed to create test context") + .with_auth() + .await + .expect("Failed to authenticate plain test user"); + + let response = ctx + .get("/api/v1/identities", ctx.token()) + .await + .expect("Failed to call identities endpoint"); + + assert_eq!(response.status(), StatusCode::FORBIDDEN); +} diff --git a/crates/common/src/config.rs b/crates/common/src/config.rs index f94c38d..83c5378 100644 --- a/crates/common/src/config.rs +++ b/crates/common/src/config.rs @@ -295,6 +295,10 @@ pub struct SecurityConfig { /// Enable authentication #[serde(default = "default_true")] pub enable_auth: bool, + + /// Allow unauthenticated self-service user registration + #[serde(default)] + pub allow_self_registration: bool, } fn default_jwt_access_expiration() -> u64 { @@ -676,6 +680,7 @@ impl Default for SecurityConfig { jwt_refresh_expiration: default_jwt_refresh_expiration(), encryption_key: None, enable_auth: true, + allow_self_registration: false, } } } @@ -924,6 +929,7 @@ mod tests { jwt_refresh_expiration: 604800, encryption_key: Some("a".repeat(32)), enable_auth: true, + allow_self_registration: false, }, worker: None, sensor: None, diff --git a/crates/common/src/lib.rs b/crates/common/src/lib.rs index 442d79d..4872efa 100644 --- a/crates/common/src/lib.rs +++ b/crates/common/src/lib.rs @@ -15,6 +15,7 @@ pub mod models; pub mod mq; pub mod pack_environment; pub mod pack_registry; +pub mod rbac; pub mod repositories; pub mod runtime_detection; pub mod schema; diff --git a/crates/common/src/models.rs b/crates/common/src/models.rs index ef31f77..0c9b07c 100644 --- a/crates/common/src/models.rs +++ b/crates/common/src/models.rs @@ -430,6 +430,10 @@ pub mod runtime { #[serde(default)] pub interpreter: InterpreterConfig, + /// Strategy for inline code execution. + #[serde(default)] + pub inline_execution: InlineExecutionConfig, + /// Optional isolated environment configuration (venv, node_modules, etc.) #[serde(default)] pub environment: Option, @@ -449,6 +453,33 @@ pub mod runtime { pub env_vars: HashMap, } + /// Controls how inline code is materialized before execution. + #[derive(Debug, Clone, Serialize, Deserialize, Default)] + pub struct InlineExecutionConfig { + /// Whether inline code is passed directly to the interpreter or first + /// written to a temporary file. + #[serde(default)] + pub strategy: InlineExecutionStrategy, + + /// Optional extension for temporary inline files (e.g. ".sh"). + #[serde(default)] + pub extension: Option, + + /// When true, inline wrapper files export the merged input map as shell + /// environment variables (`PARAM_*` and bare names) before executing the + /// script body. + #[serde(default)] + pub inject_shell_helpers: bool, + } + + #[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Default)] + #[serde(rename_all = "snake_case")] + pub enum InlineExecutionStrategy { + #[default] + Direct, + TempFile, + } + /// Describes the interpreter binary and how it invokes action scripts. #[derive(Debug, Clone, Serialize, Deserialize)] pub struct InterpreterConfig { diff --git a/crates/common/src/mq/messages.rs b/crates/common/src/mq/messages.rs index 34c5946..a0e443e 100644 --- a/crates/common/src/mq/messages.rs +++ b/crates/common/src/mq/messages.rs @@ -481,9 +481,8 @@ pub struct PackRegisteredPayload { /// Payload for ExecutionCancelRequested message /// -/// Sent by the API to the worker that is running a specific execution, -/// instructing it to gracefully terminate the process (SIGINT, then SIGTERM -/// after a grace period). +/// Sent by the API or executor to the worker that is running a specific +/// execution, instructing it to terminate the process promptly. #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ExecutionCancelRequestedPayload { /// Execution ID to cancel diff --git a/crates/common/src/pack_registry/loader.rs b/crates/common/src/pack_registry/loader.rs index 5002e42..ae2b1dd 100644 --- a/crates/common/src/pack_registry/loader.rs +++ b/crates/common/src/pack_registry/loader.rs @@ -1,14 +1,15 @@ //! Pack Component Loader //! -//! Reads runtime, action, trigger, and sensor YAML definitions from a pack directory +//! Reads permission set, runtime, action, trigger, and sensor YAML definitions from a pack directory //! and registers them in the database. This is the Rust-native equivalent of //! the Python `load_core_pack.py` script used during init-packs. //! //! Components are loaded in dependency order: -//! 1. Runtimes (no dependencies) -//! 2. Triggers (no dependencies) -//! 3. Actions (depend on runtime; workflow actions also create workflow_definition records) -//! 4. Sensors (depend on triggers and runtime) +//! 1. Permission sets (no dependencies) +//! 2. Runtimes (no dependencies) +//! 3. Triggers (no dependencies) +//! 4. Actions (depend on runtime; workflow actions also create workflow_definition records) +//! 5. Sensors (depend on triggers and runtime) //! //! All loaders use **upsert** semantics: if an entity with the same ref already //! exists it is updated in place (preserving its database ID); otherwise a new @@ -38,6 +39,9 @@ use tracing::{debug, info, warn}; use crate::error::{Error, Result}; use crate::models::Id; use crate::repositories::action::{ActionRepository, UpdateActionInput}; +use crate::repositories::identity::{ + CreatePermissionSetInput, PermissionSetRepository, UpdatePermissionSetInput, +}; use crate::repositories::runtime::{CreateRuntimeInput, RuntimeRepository, UpdateRuntimeInput}; use crate::repositories::runtime_version::{ CreateRuntimeVersionInput, RuntimeVersionRepository, UpdateRuntimeVersionInput, @@ -56,6 +60,12 @@ use crate::workflow::parser::parse_workflow_yaml; /// Result of loading pack components into the database. #[derive(Debug, Default)] pub struct PackLoadResult { + /// Number of permission sets created + pub permission_sets_loaded: usize, + /// Number of permission sets updated + pub permission_sets_updated: usize, + /// Number of permission sets skipped + pub permission_sets_skipped: usize, /// Number of runtimes created pub runtimes_loaded: usize, /// Number of runtimes updated (already existed) @@ -88,15 +98,27 @@ pub struct PackLoadResult { impl PackLoadResult { pub fn total_loaded(&self) -> usize { - self.runtimes_loaded + self.triggers_loaded + self.actions_loaded + self.sensors_loaded + self.permission_sets_loaded + + self.runtimes_loaded + + self.triggers_loaded + + self.actions_loaded + + self.sensors_loaded } pub fn total_skipped(&self) -> usize { - self.runtimes_skipped + self.triggers_skipped + self.actions_skipped + self.sensors_skipped + self.permission_sets_skipped + + self.runtimes_skipped + + self.triggers_skipped + + self.actions_skipped + + self.sensors_skipped } pub fn total_updated(&self) -> usize { - self.runtimes_updated + self.triggers_updated + self.actions_updated + self.sensors_updated + self.permission_sets_updated + + self.runtimes_updated + + self.triggers_updated + + self.actions_updated + + self.sensors_updated } } @@ -132,22 +154,26 @@ impl<'a> PackComponentLoader<'a> { pack_dir.display() ); - // 1. Load runtimes first (no dependencies) + // 1. Load permission sets first (no dependencies) + let permission_set_refs = self.load_permission_sets(pack_dir, &mut result).await?; + + // 2. Load runtimes (no dependencies) let runtime_refs = self.load_runtimes(pack_dir, &mut result).await?; - // 2. Load triggers (no dependencies) + // 3. Load triggers (no dependencies) let (trigger_ids, trigger_refs) = self.load_triggers(pack_dir, &mut result).await?; - // 3. Load actions (depend on runtime) + // 4. Load actions (depend on runtime) let action_refs = self.load_actions(pack_dir, &mut result).await?; - // 4. Load sensors (depend on triggers and runtime) + // 5. Load sensors (depend on triggers and runtime) let sensor_refs = self .load_sensors(pack_dir, &trigger_ids, &mut result) .await?; - // 5. Clean up entities that are no longer in the pack's YAML files + // 6. Clean up entities that are no longer in the pack's YAML files self.cleanup_removed_entities( + &permission_set_refs, &runtime_refs, &trigger_refs, &action_refs, @@ -169,6 +195,146 @@ impl<'a> PackComponentLoader<'a> { Ok(result) } + /// Load permission set definitions from `pack_dir/permission_sets/*.yaml`. + /// + /// Permission sets are pack-scoped authorization metadata. Their `grants` + /// payload is stored verbatim and interpreted by the API authorization + /// layer at request time. + async fn load_permission_sets( + &self, + pack_dir: &Path, + result: &mut PackLoadResult, + ) -> Result> { + let permission_sets_dir = pack_dir.join("permission_sets"); + let mut loaded_refs = Vec::new(); + + if !permission_sets_dir.exists() { + info!( + "No permission_sets directory found for pack '{}'", + self.pack_ref + ); + return Ok(loaded_refs); + } + + let yaml_files = read_yaml_files(&permission_sets_dir)?; + info!( + "Found {} permission set definition(s) for pack '{}'", + yaml_files.len(), + self.pack_ref + ); + + for (filename, content) in &yaml_files { + let data: serde_yaml_ng::Value = serde_yaml_ng::from_str(content).map_err(|e| { + Error::validation(format!( + "Failed to parse permission set YAML {}: {}", + filename, e + )) + })?; + + let permission_set_ref = match data.get("ref").and_then(|v| v.as_str()) { + Some(r) => r.to_string(), + None => { + let msg = format!( + "Permission set YAML {} missing 'ref' field, skipping", + filename + ); + warn!("{}", msg); + result.warnings.push(msg); + result.permission_sets_skipped += 1; + continue; + } + }; + + let label = data + .get("label") + .and_then(|v| v.as_str()) + .map(|s| s.to_string()); + + let description = data + .get("description") + .and_then(|v| v.as_str()) + .map(|s| s.to_string()); + + let grants = data + .get("grants") + .and_then(|v| serde_json::to_value(v).ok()) + .unwrap_or_else(|| serde_json::json!([])); + + if !grants.is_array() { + let msg = format!( + "Permission set '{}' has non-array 'grants', skipping", + permission_set_ref + ); + warn!("{}", msg); + result.warnings.push(msg); + result.permission_sets_skipped += 1; + continue; + } + + if let Some(existing) = + PermissionSetRepository::find_by_ref(self.pool, &permission_set_ref).await? + { + let update_input = UpdatePermissionSetInput { + label, + description, + grants: Some(grants), + }; + + match PermissionSetRepository::update(self.pool, existing.id, update_input).await { + Ok(_) => { + info!( + "Updated permission set '{}' (ID: {})", + permission_set_ref, existing.id + ); + result.permission_sets_updated += 1; + } + Err(e) => { + let msg = format!( + "Failed to update permission set '{}': {}", + permission_set_ref, e + ); + warn!("{}", msg); + result.warnings.push(msg); + result.permission_sets_skipped += 1; + } + } + loaded_refs.push(permission_set_ref); + continue; + } + + let input = CreatePermissionSetInput { + r#ref: permission_set_ref.clone(), + pack: Some(self.pack_id), + pack_ref: Some(self.pack_ref.clone()), + label, + description, + grants, + }; + + match PermissionSetRepository::create(self.pool, input).await { + Ok(permission_set) => { + info!( + "Created permission set '{}' (ID: {})", + permission_set_ref, permission_set.id + ); + result.permission_sets_loaded += 1; + loaded_refs.push(permission_set_ref); + } + Err(e) => { + let msg = format!( + "Failed to create permission set '{}': {}", + permission_set_ref, e + ); + warn!("{}", msg); + result.warnings.push(msg); + result.permission_sets_skipped += 1; + } + } + } + + Ok(loaded_refs) + } + /// Load runtime definitions from `pack_dir/runtimes/*.yaml`. /// /// Runtimes define how actions and sensors are executed (interpreter, @@ -1308,12 +1474,37 @@ impl<'a> PackComponentLoader<'a> { /// removed. async fn cleanup_removed_entities( &self, + permission_set_refs: &[String], runtime_refs: &[String], trigger_refs: &[String], action_refs: &[String], sensor_refs: &[String], result: &mut PackLoadResult, ) { + match PermissionSetRepository::delete_by_pack_excluding( + self.pool, + self.pack_id, + permission_set_refs, + ) + .await + { + Ok(count) => { + if count > 0 { + info!( + "Removed {} stale permission set(s) from pack '{}'", + count, self.pack_ref + ); + result.removed += count as usize; + } + } + Err(e) => { + warn!( + "Failed to clean up stale permission sets for pack '{}': {}", + self.pack_ref, e + ); + } + } + // Clean up sensors first (they depend on triggers/runtimes) match SensorRepository::delete_by_pack_excluding(self.pool, self.pack_id, sensor_refs).await { diff --git a/crates/common/src/rbac.rs b/crates/common/src/rbac.rs new file mode 100644 index 0000000..9813dba --- /dev/null +++ b/crates/common/src/rbac.rs @@ -0,0 +1,292 @@ +//! Role-based access control (RBAC) model and evaluator. +//! +//! Permission sets store `grants` as a JSON array of [`Grant`]. +//! This module defines the canonical grant schema and matching logic. + +use crate::models::{ArtifactVisibility, Id, OwnerType}; +use serde::{Deserialize, Serialize}; +use serde_json::Value as JsonValue; +use std::collections::HashMap; + +#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Hash)] +#[serde(rename_all = "snake_case")] +pub enum Resource { + Packs, + Actions, + Rules, + Triggers, + Executions, + Events, + Enforcements, + Inquiries, + Keys, + Artifacts, + Workflows, + Webhooks, + Analytics, + History, + Identities, + Permissions, +} + +#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Hash)] +#[serde(rename_all = "snake_case")] +pub enum Action { + Read, + Create, + Update, + Delete, + Execute, + Cancel, + Respond, + Manage, +} + +#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)] +#[serde(rename_all = "snake_case")] +pub enum OwnerConstraint { + #[serde(rename = "self")] + SelfOnly, + Any, + None, +} + +#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)] +#[serde(rename_all = "snake_case")] +pub enum ExecutionScopeConstraint { + #[serde(rename = "self")] + SelfOnly, + Descendants, + Any, +} + +#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq)] +pub struct GrantConstraints { + #[serde(default, skip_serializing_if = "Option::is_none")] + pub pack_refs: Option>, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub owner: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub owner_types: Option>, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub visibility: Option>, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub execution_scope: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub refs: Option>, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub ids: Option>, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub encrypted: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub attributes: Option>, +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub struct Grant { + pub resource: Resource, + pub actions: Vec, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub constraints: Option, +} + +#[derive(Debug, Clone)] +pub struct AuthorizationContext { + pub identity_id: Id, + pub identity_attributes: HashMap, + pub target_id: Option, + pub target_ref: Option, + pub pack_ref: Option, + pub owner_identity_id: Option, + pub owner_type: Option, + pub visibility: Option, + pub encrypted: Option, + pub execution_owner_identity_id: Option, + pub execution_ancestor_identity_ids: Vec, +} + +impl AuthorizationContext { + pub fn new(identity_id: Id) -> Self { + Self { + identity_id, + identity_attributes: HashMap::new(), + target_id: None, + target_ref: None, + pack_ref: None, + owner_identity_id: None, + owner_type: None, + visibility: None, + encrypted: None, + execution_owner_identity_id: None, + execution_ancestor_identity_ids: Vec::new(), + } + } +} + +impl Grant { + pub fn allows(&self, resource: Resource, action: Action, ctx: &AuthorizationContext) -> bool { + self.resource == resource && self.actions.contains(&action) && self.constraints_match(ctx) + } + + fn constraints_match(&self, ctx: &AuthorizationContext) -> bool { + let Some(constraints) = &self.constraints else { + return true; + }; + + if let Some(pack_refs) = &constraints.pack_refs { + let Some(pack_ref) = &ctx.pack_ref else { + return false; + }; + if !pack_refs.contains(pack_ref) { + return false; + } + } + + if let Some(owner) = constraints.owner { + let owner_match = match owner { + OwnerConstraint::SelfOnly => ctx.owner_identity_id == Some(ctx.identity_id), + OwnerConstraint::Any => true, + OwnerConstraint::None => ctx.owner_identity_id.is_none(), + }; + if !owner_match { + return false; + } + } + + if let Some(owner_types) = &constraints.owner_types { + let Some(owner_type) = ctx.owner_type else { + return false; + }; + if !owner_types.contains(&owner_type) { + return false; + } + } + + if let Some(visibility) = &constraints.visibility { + let Some(target_visibility) = ctx.visibility else { + return false; + }; + if !visibility.contains(&target_visibility) { + return false; + } + } + + if let Some(execution_scope) = constraints.execution_scope { + let execution_match = match execution_scope { + ExecutionScopeConstraint::SelfOnly => { + ctx.execution_owner_identity_id == Some(ctx.identity_id) + } + ExecutionScopeConstraint::Descendants => { + ctx.execution_owner_identity_id == Some(ctx.identity_id) + || ctx + .execution_ancestor_identity_ids + .contains(&ctx.identity_id) + } + ExecutionScopeConstraint::Any => true, + }; + if !execution_match { + return false; + } + } + + if let Some(refs) = &constraints.refs { + let Some(target_ref) = &ctx.target_ref else { + return false; + }; + if !refs.contains(target_ref) { + return false; + } + } + + if let Some(ids) = &constraints.ids { + let Some(target_id) = ctx.target_id else { + return false; + }; + if !ids.contains(&target_id) { + return false; + } + } + + if let Some(encrypted) = constraints.encrypted { + let Some(target_encrypted) = ctx.encrypted else { + return false; + }; + if encrypted != target_encrypted { + return false; + } + } + + if let Some(attributes) = &constraints.attributes { + for (key, expected_value) in attributes { + let Some(actual_value) = ctx.identity_attributes.get(key) else { + return false; + }; + if actual_value != expected_value { + return false; + } + } + } + + true + } +} + +#[cfg(test)] +mod tests { + use super::*; + use serde_json::json; + + #[test] + fn grant_without_constraints_allows() { + let grant = Grant { + resource: Resource::Actions, + actions: vec![Action::Read], + constraints: None, + }; + let ctx = AuthorizationContext::new(42); + assert!(grant.allows(Resource::Actions, Action::Read, &ctx)); + assert!(!grant.allows(Resource::Actions, Action::Create, &ctx)); + } + + #[test] + fn key_constraint_owner_type_and_encrypted() { + let grant = Grant { + resource: Resource::Keys, + actions: vec![Action::Read], + constraints: Some(GrantConstraints { + owner_types: Some(vec![OwnerType::System]), + encrypted: Some(false), + ..Default::default() + }), + }; + + let mut ctx = AuthorizationContext::new(1); + ctx.owner_type = Some(OwnerType::System); + ctx.encrypted = Some(false); + assert!(grant.allows(Resource::Keys, Action::Read, &ctx)); + + ctx.encrypted = Some(true); + assert!(!grant.allows(Resource::Keys, Action::Read, &ctx)); + } + + #[test] + fn attributes_constraint_requires_exact_value_match() { + let grant = Grant { + resource: Resource::Packs, + actions: vec![Action::Read], + constraints: Some(GrantConstraints { + attributes: Some(HashMap::from([("team".to_string(), json!("platform"))])), + ..Default::default() + }), + }; + + let mut ctx = AuthorizationContext::new(1); + ctx.identity_attributes + .insert("team".to_string(), json!("platform")); + assert!(grant.allows(Resource::Packs, Action::Read, &ctx)); + + ctx.identity_attributes + .insert("team".to_string(), json!("infra")); + assert!(!grant.allows(Resource::Packs, Action::Read, &ctx)); + } +} diff --git a/crates/common/src/repositories/identity.rs b/crates/common/src/repositories/identity.rs index c3cb8b6..5341f4b 100644 --- a/crates/common/src/repositories/identity.rs +++ b/crates/common/src/repositories/identity.rs @@ -4,7 +4,7 @@ use crate::models::{identity::*, Id, JsonDict}; use crate::Result; use sqlx::{Executor, Postgres, QueryBuilder}; -use super::{Create, Delete, FindById, List, Repository, Update}; +use super::{Create, Delete, FindById, FindByRef, List, Repository, Update}; pub struct IdentityRepository; @@ -200,6 +200,22 @@ impl FindById for PermissionSetRepository { } } +#[async_trait::async_trait] +impl FindByRef for PermissionSetRepository { + async fn find_by_ref<'e, E>(executor: E, ref_str: &str) -> Result> + where + E: Executor<'e, Database = Postgres> + 'e, + { + sqlx::query_as::<_, PermissionSet>( + "SELECT id, ref, pack, pack_ref, label, description, grants, created, updated FROM permission_set WHERE ref = $1" + ) + .bind(ref_str) + .fetch_optional(executor) + .await + .map_err(Into::into) + } +} + #[async_trait::async_trait] impl List for PermissionSetRepository { async fn list<'e, E>(executor: E) -> Result> @@ -287,6 +303,54 @@ impl Delete for PermissionSetRepository { } } +impl PermissionSetRepository { + pub async fn find_by_identity<'e, E>(executor: E, identity_id: Id) -> Result> + where + E: Executor<'e, Database = Postgres> + 'e, + { + sqlx::query_as::<_, PermissionSet>( + "SELECT ps.id, ps.ref, ps.pack, ps.pack_ref, ps.label, ps.description, ps.grants, ps.created, ps.updated + FROM permission_set ps + INNER JOIN permission_assignment pa ON pa.permset = ps.id + WHERE pa.identity = $1 + ORDER BY ps.ref ASC", + ) + .bind(identity_id) + .fetch_all(executor) + .await + .map_err(Into::into) + } + + /// Delete permission sets belonging to a pack whose refs are NOT in the given set. + /// + /// Used during pack reinstallation to clean up permission sets that were + /// removed from the pack's metadata. Associated permission assignments are + /// cascade-deleted by the FK constraint. + pub async fn delete_by_pack_excluding<'e, E>( + executor: E, + pack_id: Id, + keep_refs: &[String], + ) -> Result + where + E: Executor<'e, Database = Postgres> + 'e, + { + let result = if keep_refs.is_empty() { + sqlx::query("DELETE FROM permission_set WHERE pack = $1") + .bind(pack_id) + .execute(executor) + .await? + } else { + sqlx::query("DELETE FROM permission_set WHERE pack = $1 AND ref != ALL($2)") + .bind(pack_id) + .bind(keep_refs) + .execute(executor) + .await? + }; + + Ok(result.rows_affected()) + } +} + // Permission Assignment Repository pub struct PermissionAssignmentRepository; diff --git a/crates/executor/src/execution_manager.rs b/crates/executor/src/execution_manager.rs index 8fcfdb4..4ba8086 100644 --- a/crates/executor/src/execution_manager.rs +++ b/crates/executor/src/execution_manager.rs @@ -21,13 +21,17 @@ use anyhow::Result; use attune_common::{ models::{enums::ExecutionStatus, Execution}, mq::{ - Consumer, ExecutionRequestedPayload, ExecutionStatusChangedPayload, MessageEnvelope, - MessageType, Publisher, + Consumer, ExecutionCancelRequestedPayload, ExecutionRequestedPayload, + ExecutionStatusChangedPayload, MessageEnvelope, MessageType, Publisher, }, repositories::{ - execution::{CreateExecutionInput, ExecutionRepository}, - Create, FindById, + execution::{CreateExecutionInput, ExecutionRepository, UpdateExecutionInput}, + workflow::{ + UpdateWorkflowExecutionInput, WorkflowDefinitionRepository, WorkflowExecutionRepository, + }, + Create, FindById, Update, }, + workflow::{CancellationPolicy, WorkflowDefinition}, }; use sqlx::PgPool; @@ -116,8 +120,18 @@ impl ExecutionManager { "Execution {} reached terminal state: {:?}, handling orchestration", execution_id, status ); + if status == ExecutionStatus::Cancelled { + Self::handle_workflow_cancellation(pool, publisher, &execution).await?; + } Self::handle_completion(pool, publisher, &execution).await?; } + ExecutionStatus::Canceling => { + debug!( + "Execution {} entered canceling state; checking for workflow child cancellation", + execution_id + ); + Self::handle_workflow_cancellation(pool, publisher, &execution).await?; + } ExecutionStatus::Running => { debug!( "Execution {} now running (worker has updated DB)", @@ -135,6 +149,202 @@ impl ExecutionManager { Ok(()) } + async fn handle_workflow_cancellation( + pool: &PgPool, + publisher: &Publisher, + execution: &Execution, + ) -> Result<()> { + let Some(_) = WorkflowExecutionRepository::find_by_execution(pool, execution.id).await? + else { + return Ok(()); + }; + + let policy = Self::resolve_cancellation_policy(pool, execution.id).await; + Self::cancel_workflow_children_with_policy(pool, publisher, execution.id, policy).await + } + + async fn resolve_cancellation_policy( + pool: &PgPool, + parent_execution_id: i64, + ) -> CancellationPolicy { + let wf_exec = + match WorkflowExecutionRepository::find_by_execution(pool, parent_execution_id).await { + Ok(Some(wf)) => wf, + _ => return CancellationPolicy::default(), + }; + + let wf_def = + match WorkflowDefinitionRepository::find_by_id(pool, wf_exec.workflow_def).await { + Ok(Some(def)) => def, + _ => return CancellationPolicy::default(), + }; + + match serde_json::from_value::(wf_def.definition) { + Ok(def) => def.cancellation_policy, + Err(e) => { + warn!( + "Failed to deserialize workflow definition for workflow_def {}: {}. Falling back to default cancellation policy.", + wf_exec.workflow_def, e + ); + CancellationPolicy::default() + } + } + } + + async fn cancel_workflow_children_with_policy( + pool: &PgPool, + publisher: &Publisher, + parent_execution_id: i64, + policy: CancellationPolicy, + ) -> Result<()> { + let children: Vec = sqlx::query_as::<_, Execution>(&format!( + "SELECT {} FROM execution WHERE parent = $1 AND status NOT IN ('completed', 'failed', 'timeout', 'cancelled', 'abandoned')", + attune_common::repositories::execution::SELECT_COLUMNS + )) + .bind(parent_execution_id) + .fetch_all(pool) + .await?; + + if children.is_empty() { + return Self::finalize_cancelled_workflow_if_idle(pool, parent_execution_id).await; + } + + info!( + "Executor cascading cancellation from workflow execution {} to {} child execution(s) with policy {:?}", + parent_execution_id, + children.len(), + policy, + ); + + for child in &children { + let child_id = child.id; + + if matches!( + child.status, + ExecutionStatus::Requested + | ExecutionStatus::Scheduling + | ExecutionStatus::Scheduled + ) { + let update = UpdateExecutionInput { + status: Some(ExecutionStatus::Cancelled), + result: Some(serde_json::json!({ + "error": "Cancelled: parent workflow execution was cancelled" + })), + ..Default::default() + }; + ExecutionRepository::update(pool, child_id, update).await?; + } else if matches!( + child.status, + ExecutionStatus::Running | ExecutionStatus::Canceling + ) { + match policy { + CancellationPolicy::CancelRunning => { + if child.status != ExecutionStatus::Canceling { + let update = UpdateExecutionInput { + status: Some(ExecutionStatus::Canceling), + ..Default::default() + }; + ExecutionRepository::update(pool, child_id, update).await?; + } + + if let Some(worker_id) = child.executor { + Self::send_cancel_to_worker(publisher, child_id, worker_id).await?; + } else { + warn!( + "Workflow child execution {} is {:?} but has no assigned worker", + child_id, child.status + ); + } + } + CancellationPolicy::AllowFinish => { + info!( + "AllowFinish policy: leaving running workflow child execution {} alone", + child_id + ); + } + } + } + + Box::pin(Self::cancel_workflow_children_with_policy( + pool, publisher, child_id, policy, + )) + .await?; + } + + if let Some(wf_exec) = + WorkflowExecutionRepository::find_by_execution(pool, parent_execution_id).await? + { + if !matches!( + wf_exec.status, + ExecutionStatus::Completed | ExecutionStatus::Failed | ExecutionStatus::Cancelled + ) { + let wf_update = UpdateWorkflowExecutionInput { + status: Some(ExecutionStatus::Cancelled), + error_message: Some( + "Cancelled: parent workflow execution was cancelled".to_string(), + ), + current_tasks: Some(vec![]), + ..Default::default() + }; + WorkflowExecutionRepository::update(pool, wf_exec.id, wf_update).await?; + } + } + + Self::finalize_cancelled_workflow_if_idle(pool, parent_execution_id).await + } + + async fn finalize_cancelled_workflow_if_idle( + pool: &PgPool, + parent_execution_id: i64, + ) -> Result<()> { + let still_running: Vec = sqlx::query_as::<_, Execution>(&format!( + "SELECT {} FROM execution WHERE parent = $1 AND status IN ('running', 'canceling', 'scheduling', 'scheduled', 'requested')", + attune_common::repositories::execution::SELECT_COLUMNS + )) + .bind(parent_execution_id) + .fetch_all(pool) + .await?; + + if still_running.is_empty() { + let update = UpdateExecutionInput { + status: Some(ExecutionStatus::Cancelled), + result: Some(serde_json::json!({ + "error": "Workflow cancelled", + "succeeded": false, + })), + ..Default::default() + }; + let _ = ExecutionRepository::update(pool, parent_execution_id, update).await?; + } + + Ok(()) + } + + async fn send_cancel_to_worker( + publisher: &Publisher, + execution_id: i64, + worker_id: i64, + ) -> Result<()> { + let payload = ExecutionCancelRequestedPayload { + execution_id, + worker_id, + }; + + let envelope = MessageEnvelope::new(MessageType::ExecutionCancelRequested, payload) + .with_source("executor-service") + .with_correlation_id(uuid::Uuid::new_v4()); + + publisher + .publish_envelope_with_routing( + &envelope, + "attune.executions", + &format!("execution.cancel.worker.{}", worker_id), + ) + .await?; + + Ok(()) + } + /// Parse execution status from string fn parse_execution_status(status: &str) -> Result { match status.to_lowercase().as_str() { diff --git a/crates/worker/src/executor.rs b/crates/worker/src/executor.rs index bc1544d..ed823b2 100644 --- a/crates/worker/src/executor.rs +++ b/crates/worker/src/executor.rs @@ -100,7 +100,7 @@ impl ActionExecutor { /// Execute an action for the given execution, with cancellation support. /// /// When the `cancel_token` is triggered, the running process receives - /// SIGINT → SIGTERM → SIGKILL with escalating grace periods. + /// SIGTERM → SIGKILL with a short grace period. pub async fn execute_with_cancel( &self, execution_id: i64, @@ -139,7 +139,7 @@ impl ActionExecutor { }; // Attach the cancellation token so the process executor can monitor it - context.cancel_token = Some(cancel_token); + context.cancel_token = Some(cancel_token.clone()); // Execute the action // Note: execute_action should rarely return Err - most failures should be @@ -181,7 +181,16 @@ impl ActionExecutor { execution_id, result.exit_code, result.error, is_success ); - if is_success { + let was_cancelled = cancel_token.is_cancelled() + || result + .error + .as_deref() + .is_some_and(|e| e.contains("cancelled")); + + if was_cancelled { + self.handle_execution_cancelled(execution_id, &result) + .await?; + } else if is_success { self.handle_execution_success(execution_id, &result).await?; } else { self.handle_execution_failure(execution_id, Some(&result), None) @@ -913,6 +922,51 @@ impl ActionExecutor { Ok(()) } + async fn handle_execution_cancelled( + &self, + execution_id: i64, + result: &ExecutionResult, + ) -> Result<()> { + let exec_dir = self.artifact_manager.get_execution_dir(execution_id); + let mut result_data = serde_json::json!({ + "succeeded": false, + "cancelled": true, + "exit_code": result.exit_code, + "duration_ms": result.duration_ms, + "error": result.error.clone().unwrap_or_else(|| "Execution cancelled by user".to_string()), + }); + + if !result.stdout.is_empty() { + result_data["stdout"] = serde_json::json!(result.stdout); + } + + if !result.stderr.trim().is_empty() { + let stderr_path = exec_dir.join("stderr.log"); + result_data["stderr_log"] = serde_json::json!(stderr_path.to_string_lossy()); + } + + if result.stdout_truncated { + result_data["stdout_truncated"] = serde_json::json!(true); + result_data["stdout_bytes_truncated"] = + serde_json::json!(result.stdout_bytes_truncated); + } + if result.stderr_truncated { + result_data["stderr_truncated"] = serde_json::json!(true); + result_data["stderr_bytes_truncated"] = + serde_json::json!(result.stderr_bytes_truncated); + } + + let input = UpdateExecutionInput { + status: Some(ExecutionStatus::Cancelled), + result: Some(result_data), + ..Default::default() + }; + + ExecutionRepository::update(&self.pool, execution_id, input).await?; + + Ok(()) + } + /// Update execution status async fn update_execution_status( &self, diff --git a/crates/worker/src/runtime/local.rs b/crates/worker/src/runtime/local.rs index 8690dd5..b6965f8 100644 --- a/crates/worker/src/runtime/local.rs +++ b/crates/worker/src/runtime/local.rs @@ -1,6 +1,6 @@ //! Local Runtime Module //! -//! Provides local execution capabilities by combining Process and Shell runtimes. +//! Provides local execution capabilities by combining Process and Native runtimes. //! This module serves as a facade for all local process-based execution. //! //! The `ProcessRuntime` is used for Python (and other interpreted languages), @@ -8,10 +8,11 @@ use super::native::NativeRuntime; use super::process::ProcessRuntime; -use super::shell::ShellRuntime; use super::{ExecutionContext, ExecutionResult, Runtime, RuntimeError, RuntimeResult}; use async_trait::async_trait; -use attune_common::models::runtime::{InterpreterConfig, RuntimeExecutionConfig}; +use attune_common::models::runtime::{ + InlineExecutionConfig, InlineExecutionStrategy, InterpreterConfig, RuntimeExecutionConfig, +}; use std::path::PathBuf; use tracing::{debug, info}; @@ -19,7 +20,7 @@ use tracing::{debug, info}; pub struct LocalRuntime { native: NativeRuntime, python: ProcessRuntime, - shell: ShellRuntime, + shell: ProcessRuntime, } impl LocalRuntime { @@ -34,6 +35,23 @@ impl LocalRuntime { args: vec![], file_extension: Some(".py".to_string()), }, + inline_execution: InlineExecutionConfig::default(), + environment: None, + dependencies: None, + env_vars: std::collections::HashMap::new(), + }; + + let shell_config = RuntimeExecutionConfig { + interpreter: InterpreterConfig { + binary: "/bin/bash".to_string(), + args: vec![], + file_extension: Some(".sh".to_string()), + }, + inline_execution: InlineExecutionConfig { + strategy: InlineExecutionStrategy::TempFile, + extension: Some(".sh".to_string()), + inject_shell_helpers: true, + }, environment: None, dependencies: None, env_vars: std::collections::HashMap::new(), @@ -47,7 +65,12 @@ impl LocalRuntime { PathBuf::from("/opt/attune/packs"), PathBuf::from("/opt/attune/runtime_envs"), ), - shell: ShellRuntime::new(), + shell: ProcessRuntime::new( + "shell".to_string(), + shell_config, + PathBuf::from("/opt/attune/packs"), + PathBuf::from("/opt/attune/runtime_envs"), + ), } } @@ -55,7 +78,7 @@ impl LocalRuntime { pub fn with_runtimes( native: NativeRuntime, python: ProcessRuntime, - shell: ShellRuntime, + shell: ProcessRuntime, ) -> Self { Self { native, @@ -76,7 +99,10 @@ impl LocalRuntime { ); Ok(&self.python) } else if self.shell.can_execute(context) { - debug!("Selected Shell runtime for action: {}", context.action_ref); + debug!( + "Selected Shell (ProcessRuntime) for action: {}", + context.action_ref + ); Ok(&self.shell) } else { Err(RuntimeError::RuntimeNotFound(format!( diff --git a/crates/worker/src/runtime/mod.rs b/crates/worker/src/runtime/mod.rs index dfe14a4..286f877 100644 --- a/crates/worker/src/runtime/mod.rs +++ b/crates/worker/src/runtime/mod.rs @@ -159,9 +159,9 @@ pub struct ExecutionContext { /// Format for output parsing pub output_format: OutputFormat, - /// Optional cancellation token for graceful process termination. - /// When triggered, the executor sends SIGINT → SIGTERM → SIGKILL - /// with escalating grace periods. + /// Optional cancellation token for process termination. + /// When triggered, the executor sends SIGTERM → SIGKILL + /// with a short grace period. pub cancel_token: Option, } diff --git a/crates/worker/src/runtime/process.rs b/crates/worker/src/runtime/process.rs index a658863..737ff8d 100644 --- a/crates/worker/src/runtime/process.rs +++ b/crates/worker/src/runtime/process.rs @@ -19,12 +19,18 @@ use super::{ process_executor, ExecutionContext, ExecutionResult, Runtime, RuntimeError, RuntimeResult, }; use async_trait::async_trait; -use attune_common::models::runtime::{EnvironmentConfig, RuntimeExecutionConfig}; +use attune_common::models::runtime::{ + EnvironmentConfig, InlineExecutionStrategy, RuntimeExecutionConfig, +}; use std::collections::HashMap; use std::path::{Path, PathBuf}; use tokio::process::Command; use tracing::{debug, error, info, warn}; +fn bash_single_quote_escape(s: &str) -> String { + s.replace('\'', "'\\''") +} + /// A generic runtime driven by `RuntimeExecutionConfig` from the database. /// /// Each `ProcessRuntime` instance corresponds to a row in the `runtime` table. @@ -437,6 +443,90 @@ impl ProcessRuntime { pub fn config(&self) -> &RuntimeExecutionConfig { &self.config } + + fn build_shell_inline_wrapper( + &self, + merged_parameters: &HashMap, + code: &str, + ) -> RuntimeResult { + let mut script = String::new(); + script.push_str("#!/bin/bash\n"); + script.push_str("set -e\n\n"); + + script.push_str("# Action parameters\n"); + for (key, value) in merged_parameters { + let value_str = match value { + serde_json::Value::String(s) => s.clone(), + serde_json::Value::Number(n) => n.to_string(), + serde_json::Value::Bool(b) => b.to_string(), + _ => serde_json::to_string(value)?, + }; + let escaped = bash_single_quote_escape(&value_str); + script.push_str(&format!( + "export PARAM_{}='{}'\n", + key.to_uppercase(), + escaped + )); + script.push_str(&format!("export {}='{}'\n", key, escaped)); + } + script.push('\n'); + script.push_str("# Action code\n"); + script.push_str(code); + + Ok(script) + } + + async fn materialize_inline_code( + &self, + execution_id: i64, + merged_parameters: &HashMap, + code: &str, + effective_config: &RuntimeExecutionConfig, + ) -> RuntimeResult<(PathBuf, bool)> { + let inline_dir = std::env::temp_dir().join("attune").join("inline_actions"); + tokio::fs::create_dir_all(&inline_dir).await.map_err(|e| { + RuntimeError::ExecutionFailed(format!( + "Failed to create inline action directory {}: {}", + inline_dir.display(), + e + )) + })?; + + let extension = effective_config + .inline_execution + .extension + .as_deref() + .unwrap_or(""); + let extension = if extension.is_empty() { + String::new() + } else if extension.starts_with('.') { + extension.to_string() + } else { + format!(".{}", extension) + }; + + let inline_path = inline_dir.join(format!("exec_{}{}", execution_id, extension)); + let inline_code = if effective_config.inline_execution.inject_shell_helpers { + self.build_shell_inline_wrapper(merged_parameters, code)? + } else { + code.to_string() + }; + + tokio::fs::write(&inline_path, inline_code) + .await + .map_err(|e| { + RuntimeError::ExecutionFailed(format!( + "Failed to write inline action file {}: {}", + inline_path.display(), + e + )) + })?; + + Ok(( + inline_path, + effective_config.inline_execution.inject_shell_helpers, + )) + } } #[async_trait] @@ -661,7 +751,7 @@ impl Runtime for ProcessRuntime { }; let prepared_params = parameter_passing::prepare_parameters(&merged_parameters, &mut env, param_config)?; - let parameters_stdin = prepared_params.stdin_content(); + let mut parameters_stdin = prepared_params.stdin_content(); // Determine working directory: use context override, or pack dir let working_dir = context @@ -677,6 +767,7 @@ impl Runtime for ProcessRuntime { }); // Build the command based on whether we have a file or inline code + let mut temp_inline_file: Option = None; let cmd = if let Some(ref code_path) = context.code_path { // File-based execution: interpreter [args] debug!("Executing file: {}", code_path.display()); @@ -688,13 +779,38 @@ impl Runtime for ProcessRuntime { &env, ) } else if let Some(ref code) = context.code { - // Inline code execution: interpreter -c - debug!("Executing inline code ({} bytes)", code.len()); - let mut cmd = process_executor::build_inline_command(&interpreter, code, &env); - if let Some(dir) = working_dir { - cmd.current_dir(dir); + match effective_config.inline_execution.strategy { + InlineExecutionStrategy::Direct => { + debug!("Executing inline code directly ({} bytes)", code.len()); + let mut cmd = process_executor::build_inline_command(&interpreter, code, &env); + if let Some(dir) = working_dir { + cmd.current_dir(dir); + } + cmd + } + InlineExecutionStrategy::TempFile => { + debug!("Executing inline code via temp file ({} bytes)", code.len()); + let (inline_path, consumes_parameters) = self + .materialize_inline_code( + context.execution_id, + &merged_parameters, + code, + effective_config, + ) + .await?; + if consumes_parameters { + parameters_stdin = None; + } + temp_inline_file = Some(inline_path.clone()); + process_executor::build_action_command( + &interpreter, + &effective_config.interpreter.args, + &inline_path, + working_dir, + &env, + ) + } } - cmd } else { // No code_path and no inline code — try treating entry_point as a file // relative to the pack's actions directory @@ -737,7 +853,7 @@ impl Runtime for ProcessRuntime { // Execute with streaming output capture (with optional cancellation support). // Secrets are already merged into parameters — no separate secrets arg needed. - process_executor::execute_streaming_cancellable( + let result = process_executor::execute_streaming_cancellable( cmd, &HashMap::new(), parameters_stdin, @@ -747,7 +863,13 @@ impl Runtime for ProcessRuntime { context.output_format, context.cancel_token.clone(), ) - .await + .await; + + if let Some(path) = temp_inline_file { + let _ = tokio::fs::remove_file(path).await; + } + + result } async fn setup(&self) -> RuntimeResult<()> { @@ -836,7 +958,8 @@ impl Runtime for ProcessRuntime { mod tests { use super::*; use attune_common::models::runtime::{ - DependencyConfig, EnvironmentConfig, InterpreterConfig, RuntimeExecutionConfig, + DependencyConfig, EnvironmentConfig, InlineExecutionConfig, InlineExecutionStrategy, + InterpreterConfig, RuntimeExecutionConfig, }; use attune_common::models::{OutputFormat, ParameterDelivery, ParameterFormat}; use std::collections::HashMap; @@ -849,6 +972,11 @@ mod tests { args: vec![], file_extension: Some(".sh".to_string()), }, + inline_execution: InlineExecutionConfig { + strategy: InlineExecutionStrategy::TempFile, + extension: Some(".sh".to_string()), + inject_shell_helpers: true, + }, environment: None, dependencies: None, env_vars: HashMap::new(), @@ -862,6 +990,7 @@ mod tests { args: vec!["-u".to_string()], file_extension: Some(".py".to_string()), }, + inline_execution: InlineExecutionConfig::default(), environment: Some(EnvironmentConfig { env_type: "virtualenv".to_string(), dir_name: ".venv".to_string(), @@ -1104,6 +1233,7 @@ mod tests { args: vec![], file_extension: Some(".py".to_string()), }, + inline_execution: InlineExecutionConfig::default(), environment: None, dependencies: None, env_vars: HashMap::new(), @@ -1183,6 +1313,53 @@ mod tests { assert!(result.stdout.contains("inline shell code")); } + #[tokio::test] + async fn test_execute_inline_code_with_merged_inputs() { + let temp_dir = TempDir::new().unwrap(); + + let runtime = ProcessRuntime::new( + "shell".to_string(), + make_shell_config(), + temp_dir.path().to_path_buf(), + temp_dir.path().join("runtime_envs"), + ); + + let context = ExecutionContext { + execution_id: 30, + action_ref: "adhoc.test_inputs".to_string(), + parameters: { + let mut map = HashMap::new(); + map.insert("name".to_string(), serde_json::json!("Alice")); + map + }, + env: HashMap::new(), + secrets: { + let mut map = HashMap::new(); + map.insert("api_key".to_string(), serde_json::json!("secret-123")); + map + }, + timeout: Some(10), + working_dir: None, + entry_point: "inline".to_string(), + code: Some("echo \"$name/$api_key/$PARAM_NAME/$PARAM_API_KEY\"".to_string()), + code_path: None, + runtime_name: Some("shell".to_string()), + runtime_config_override: None, + runtime_env_dir_suffix: None, + selected_runtime_version: None, + max_stdout_bytes: 1024 * 1024, + max_stderr_bytes: 1024 * 1024, + parameter_delivery: ParameterDelivery::default(), + parameter_format: ParameterFormat::default(), + output_format: OutputFormat::default(), + cancel_token: None, + }; + + let result = runtime.execute(context).await.unwrap(); + assert_eq!(result.exit_code, 0); + assert!(result.stdout.contains("Alice/secret-123/Alice/secret-123")); + } + #[tokio::test] async fn test_execute_entry_point_fallback() { let temp_dir = TempDir::new().unwrap(); diff --git a/crates/worker/src/runtime/process_executor.rs b/crates/worker/src/runtime/process_executor.rs index 08f711b..4df61d6 100644 --- a/crates/worker/src/runtime/process_executor.rs +++ b/crates/worker/src/runtime/process_executor.rs @@ -9,9 +9,8 @@ //! //! When a `CancellationToken` is provided, the executor monitors it alongside //! the running process. On cancellation: -//! 1. SIGINT is sent to the process (allows graceful shutdown) -//! 2. After a 10-second grace period, SIGTERM is sent if the process hasn't exited -//! 3. After another 5-second grace period, SIGKILL is sent as a last resort +//! 1. SIGTERM is sent to the process immediately +//! 2. After a 5-second grace period, SIGKILL is sent as a last resort use super::{BoundedLogWriter, ExecutionResult, OutputFormat, RuntimeResult}; use std::collections::HashMap; @@ -71,7 +70,7 @@ pub async fn execute_streaming( /// - Writing parameters (with secrets merged in) to stdin /// - Streaming stdout/stderr with bounded log collection /// - Timeout management -/// - Graceful cancellation via SIGINT → SIGTERM → SIGKILL escalation +/// - Prompt cancellation via SIGTERM → SIGKILL escalation /// - Output format parsing (JSON, YAML, JSONL, text) /// /// # Arguments @@ -199,35 +198,23 @@ pub async fn execute_streaming_cancellable( tokio::select! { result = timed_wait => (result, false), _ = token.cancelled() => { - // Cancellation requested — escalate signals to the child process. - info!("Cancel signal received, sending SIGINT to process"); + // Cancellation requested — terminate the child process promptly. + info!("Cancel signal received, sending SIGTERM to process"); if let Some(pid) = child_pid { - send_signal(pid, libc::SIGINT); + send_signal(pid, libc::SIGTERM); } - // Grace period: wait up to 10s for the process to exit after SIGINT. - match timeout(std::time::Duration::from_secs(10), child.wait()).await { + // Grace period: wait up to 5s for the process to exit after SIGTERM. + match timeout(std::time::Duration::from_secs(5), child.wait()).await { Ok(status) => (Ok(status), true), Err(_) => { - // Still alive — escalate to SIGTERM - warn!("Process did not exit after SIGINT + 10s grace period, sending SIGTERM"); + // Last resort — SIGKILL + warn!("Process did not exit after SIGTERM + 5s, sending SIGKILL"); if let Some(pid) = child_pid { - send_signal(pid, libc::SIGTERM); - } - - // Final grace period: wait up to 5s for SIGTERM - match timeout(std::time::Duration::from_secs(5), child.wait()).await { - Ok(status) => (Ok(status), true), - Err(_) => { - // Last resort — SIGKILL - warn!("Process did not exit after SIGTERM + 5s, sending SIGKILL"); - if let Some(pid) = child_pid { - send_signal(pid, libc::SIGKILL); - } - // Wait indefinitely for the SIGKILL to take effect - (Ok(child.wait().await), true) - } + send_signal(pid, libc::SIGKILL); } + // Wait indefinitely for the SIGKILL to take effect + (Ok(child.wait().await), true) } } } diff --git a/crates/worker/src/service.rs b/crates/worker/src/service.rs index 4cd81c4..5a6acd3 100644 --- a/crates/worker/src/service.rs +++ b/crates/worker/src/service.rs @@ -27,7 +27,7 @@ use attune_common::runtime_detection::runtime_in_filter; use chrono::Utc; use serde::{Deserialize, Serialize}; use sqlx::PgPool; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::path::PathBuf; use std::sync::Arc; use std::time::Duration; @@ -44,7 +44,6 @@ use crate::registration::WorkerRegistration; use crate::runtime::local::LocalRuntime; use crate::runtime::native::NativeRuntime; use crate::runtime::process::ProcessRuntime; -use crate::runtime::shell::ShellRuntime; use crate::runtime::RuntimeRegistry; use crate::secrets::SecretManager; use crate::version_verify; @@ -89,8 +88,11 @@ pub struct WorkerService { in_flight_tasks: Arc>>, /// Maps execution ID → CancellationToken for running processes. /// When a cancel request arrives, the token is triggered, causing - /// the process executor to send SIGINT → SIGTERM → SIGKILL. + /// the process executor to send SIGTERM → SIGKILL. cancel_tokens: Arc>>, + /// Tracks cancellation requests that arrived before the in-memory token + /// for an execution had been registered. + pending_cancellations: Arc>>, } impl WorkerService { @@ -263,9 +265,29 @@ impl WorkerService { if runtime_registry.list_runtimes().is_empty() { info!("No runtimes loaded from database, registering built-in defaults"); - // Shell runtime (always available) - runtime_registry.register(Box::new(ShellRuntime::new())); - info!("Registered built-in Shell runtime"); + // Shell runtime (always available) via generic ProcessRuntime + let shell_runtime = ProcessRuntime::new( + "shell".to_string(), + attune_common::models::runtime::RuntimeExecutionConfig { + interpreter: attune_common::models::runtime::InterpreterConfig { + binary: "/bin/bash".to_string(), + args: vec![], + file_extension: Some(".sh".to_string()), + }, + inline_execution: attune_common::models::runtime::InlineExecutionConfig { + strategy: attune_common::models::runtime::InlineExecutionStrategy::TempFile, + extension: Some(".sh".to_string()), + inject_shell_helpers: true, + }, + environment: None, + dependencies: None, + env_vars: std::collections::HashMap::new(), + }, + packs_base_dir.clone(), + runtime_envs_dir.clone(), + ); + runtime_registry.register(Box::new(shell_runtime)); + info!("Registered built-in shell ProcessRuntime"); // Native runtime (for compiled binaries) runtime_registry.register(Box::new(NativeRuntime::new())); @@ -379,6 +401,7 @@ impl WorkerService { execution_semaphore: Arc::new(Semaphore::new(max_concurrent_tasks)), in_flight_tasks: Arc::new(Mutex::new(JoinSet::new())), cancel_tokens: Arc::new(Mutex::new(HashMap::new())), + pending_cancellations: Arc::new(Mutex::new(HashSet::new())), }) } @@ -755,6 +778,7 @@ impl WorkerService { let semaphore = self.execution_semaphore.clone(); let in_flight = self.in_flight_tasks.clone(); let cancel_tokens = self.cancel_tokens.clone(); + let pending_cancellations = self.pending_cancellations.clone(); // Spawn the consumer loop as a background task so start() can return let handle = tokio::spawn(async move { @@ -768,6 +792,7 @@ impl WorkerService { let semaphore = semaphore.clone(); let in_flight = in_flight.clone(); let cancel_tokens = cancel_tokens.clone(); + let pending_cancellations = pending_cancellations.clone(); async move { let execution_id = envelope.payload.execution_id; @@ -794,6 +819,16 @@ impl WorkerService { let mut tokens = cancel_tokens.lock().await; tokens.insert(execution_id, cancel_token.clone()); } + { + let pending = pending_cancellations.lock().await; + if pending.contains(&execution_id) { + info!( + "Execution {} already had a pending cancel request; cancelling immediately", + execution_id + ); + cancel_token.cancel(); + } + } // Spawn the actual execution as a background task so this // handler returns immediately, acking the message and freeing @@ -819,6 +854,8 @@ impl WorkerService { // Remove the cancel token now that execution is done let mut tokens = cancel_tokens.lock().await; tokens.remove(&execution_id); + let mut pending = pending_cancellations.lock().await; + pending.remove(&execution_id); }); Ok(()) @@ -1060,6 +1097,7 @@ impl WorkerService { let consumer_for_task = consumer.clone(); let cancel_tokens = self.cancel_tokens.clone(); + let pending_cancellations = self.pending_cancellations.clone(); let queue_name_for_log = queue_name.clone(); let handle = tokio::spawn(async move { @@ -1071,11 +1109,17 @@ impl WorkerService { .consume_with_handler( move |envelope: MessageEnvelope| { let cancel_tokens = cancel_tokens.clone(); + let pending_cancellations = pending_cancellations.clone(); async move { let execution_id = envelope.payload.execution_id; info!("Received cancel request for execution {}", execution_id); + { + let mut pending = pending_cancellations.lock().await; + pending.insert(execution_id); + } + let tokens = cancel_tokens.lock().await; if let Some(token) = tokens.get(&execution_id) { info!("Triggering cancellation for execution {}", execution_id); diff --git a/crates/worker/tests/dependency_isolation_test.rs b/crates/worker/tests/dependency_isolation_test.rs index 03054f9..cde0d1f 100644 --- a/crates/worker/tests/dependency_isolation_test.rs +++ b/crates/worker/tests/dependency_isolation_test.rs @@ -9,7 +9,8 @@ //! This keeps the pack directory clean and read-only. use attune_common::models::runtime::{ - DependencyConfig, EnvironmentConfig, InterpreterConfig, RuntimeExecutionConfig, + DependencyConfig, EnvironmentConfig, InlineExecutionConfig, InterpreterConfig, + RuntimeExecutionConfig, }; use attune_worker::runtime::process::ProcessRuntime; use attune_worker::runtime::ExecutionContext; @@ -26,6 +27,7 @@ fn make_python_config() -> RuntimeExecutionConfig { args: vec!["-u".to_string()], file_extension: Some(".py".to_string()), }, + inline_execution: InlineExecutionConfig::default(), environment: Some(EnvironmentConfig { env_type: "virtualenv".to_string(), dir_name: ".venv".to_string(), @@ -59,6 +61,7 @@ fn make_shell_config() -> RuntimeExecutionConfig { args: vec![], file_extension: Some(".sh".to_string()), }, + inline_execution: InlineExecutionConfig::default(), environment: None, dependencies: None, env_vars: std::collections::HashMap::new(), diff --git a/crates/worker/tests/log_truncation_test.rs b/crates/worker/tests/log_truncation_test.rs index 6434790..629a8e3 100644 --- a/crates/worker/tests/log_truncation_test.rs +++ b/crates/worker/tests/log_truncation_test.rs @@ -3,7 +3,9 @@ //! Tests that verify stdout/stderr are properly truncated when they exceed //! configured size limits, preventing OOM issues with large output. -use attune_common::models::runtime::{InterpreterConfig, RuntimeExecutionConfig}; +use attune_common::models::runtime::{ + InlineExecutionConfig, InterpreterConfig, RuntimeExecutionConfig, +}; use attune_worker::runtime::process::ProcessRuntime; use attune_worker::runtime::{ExecutionContext, Runtime, ShellRuntime}; use std::collections::HashMap; @@ -17,6 +19,7 @@ fn make_python_process_runtime(packs_base_dir: PathBuf) -> ProcessRuntime { args: vec!["-u".to_string()], file_extension: Some(".py".to_string()), }, + inline_execution: InlineExecutionConfig::default(), environment: None, dependencies: None, env_vars: std::collections::HashMap::new(), @@ -270,6 +273,7 @@ async fn test_shell_process_runtime_truncation() { args: vec![], file_extension: Some(".sh".to_string()), }, + inline_execution: InlineExecutionConfig::default(), environment: None, dependencies: None, env_vars: std::collections::HashMap::new(), diff --git a/crates/worker/tests/security_tests.rs b/crates/worker/tests/security_tests.rs index fbfcbe5..e9ad3c3 100644 --- a/crates/worker/tests/security_tests.rs +++ b/crates/worker/tests/security_tests.rs @@ -3,7 +3,9 @@ //! These tests verify that secrets are NOT exposed in process environment //! or command-line arguments, ensuring secure secret passing via stdin. -use attune_common::models::runtime::{InterpreterConfig, RuntimeExecutionConfig}; +use attune_common::models::runtime::{ + InlineExecutionConfig, InterpreterConfig, RuntimeExecutionConfig, +}; use attune_worker::runtime::process::ProcessRuntime; use attune_worker::runtime::shell::ShellRuntime; use attune_worker::runtime::{ExecutionContext, Runtime}; @@ -18,6 +20,7 @@ fn make_python_process_runtime(packs_base_dir: PathBuf) -> ProcessRuntime { args: vec!["-u".to_string()], file_extension: Some(".py".to_string()), }, + inline_execution: InlineExecutionConfig::default(), environment: None, dependencies: None, env_vars: std::collections::HashMap::new(), @@ -440,6 +443,7 @@ echo "PASS: No secrets in environment" args: vec![], file_extension: Some(".sh".to_string()), }, + inline_execution: InlineExecutionConfig::default(), environment: None, dependencies: None, env_vars: std::collections::HashMap::new(), @@ -520,6 +524,7 @@ print(json.dumps({"leaked": leaked})) args: vec!["-u".to_string()], file_extension: Some(".py".to_string()), }, + inline_execution: InlineExecutionConfig::default(), environment: None, dependencies: None, env_vars: std::collections::HashMap::new(), diff --git a/docker-compose.yaml b/docker-compose.yaml index a7b973c..954c6aa 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -109,6 +109,8 @@ services: SOURCE_PACKS_DIR: /source/packs TARGET_PACKS_DIR: /opt/attune/packs LOADER_SCRIPT: /scripts/load_core_pack.py + DEFAULT_ADMIN_LOGIN: test@attune.local + DEFAULT_ADMIN_PERMISSION_SET_REF: core.admin command: ["/bin/sh", "/init-packs.sh"] depends_on: migrations: diff --git a/docker/init-packs.sh b/docker/init-packs.sh index 7540d62..1fceb28 100755 --- a/docker/init-packs.sh +++ b/docker/init-packs.sh @@ -26,6 +26,8 @@ TARGET_PACKS_DIR="${TARGET_PACKS_DIR:-/opt/attune/packs}" # Python loader script LOADER_SCRIPT="${LOADER_SCRIPT:-/scripts/load_core_pack.py}" +DEFAULT_ADMIN_LOGIN="${DEFAULT_ADMIN_LOGIN:-}" +DEFAULT_ADMIN_PERMISSION_SET_REF="${DEFAULT_ADMIN_PERMISSION_SET_REF:-core.admin}" echo "" echo -e "${BLUE}╔════════════════════════════════════════════════╗${NC}" @@ -205,6 +207,63 @@ else echo -e "${BLUE}ℹ${NC} You can manually load them later" fi +if [ -n "$DEFAULT_ADMIN_LOGIN" ] && [ "$LOADED_COUNT" -gt 0 ]; then + echo "" + echo -e "${BLUE}Bootstrapping local admin assignment...${NC}" + if python3 - < Dict[str, int]: + """Load permission set definitions from permission_sets/*.yaml.""" + print("\n→ Loading permission sets...") + + permission_sets_dir = self.pack_dir / "permission_sets" + if not permission_sets_dir.exists(): + print(" No permission_sets directory found") + return {} + + permission_set_ids = {} + cursor = self.conn.cursor() + + for yaml_file in sorted(permission_sets_dir.glob("*.yaml")): + permission_set_data = self.load_yaml(yaml_file) + if not permission_set_data: + continue + + ref = permission_set_data.get("ref") + if not ref: + print( + f" ⚠ Permission set YAML {yaml_file.name} missing 'ref' field, skipping" + ) + continue + + label = permission_set_data.get("label") + description = permission_set_data.get("description") + grants = permission_set_data.get("grants", []) + + if not isinstance(grants, list): + print( + f" ⚠ Permission set '{ref}' has non-array grants, skipping" + ) + continue + + cursor.execute( + """ + INSERT INTO permission_set ( + ref, pack, pack_ref, label, description, grants + ) + VALUES (%s, %s, %s, %s, %s, %s) + ON CONFLICT (ref) DO UPDATE SET + label = EXCLUDED.label, + description = EXCLUDED.description, + grants = EXCLUDED.grants, + updated = NOW() + RETURNING id + """, + ( + ref, + self.pack_id, + self.pack_ref, + label, + description, + json.dumps(grants), + ), + ) + + permission_set_id = cursor.fetchone()[0] + permission_set_ids[ref] = permission_set_id + print(f" ✓ Permission set '{ref}' (ID: {permission_set_id})") + + cursor.close() + return permission_set_ids + def upsert_triggers(self) -> Dict[str, int]: """Load trigger definitions""" print("\n→ Loading triggers...") @@ -708,11 +772,12 @@ class PackLoader: """Main loading process. Components are loaded in dependency order: - 1. Runtimes (no dependencies) - 2. Triggers (no dependencies) - 3. Actions (depend on runtime; workflow actions also create + 1. Permission sets (no dependencies) + 2. Runtimes (no dependencies) + 3. Triggers (no dependencies) + 4. Actions (depend on runtime; workflow actions also create workflow_definition records) - 4. Sensors (depend on triggers and runtime) + 5. Sensors (depend on triggers and runtime) """ print("=" * 60) print(f"Pack Loader - {self.pack_name}") @@ -727,7 +792,10 @@ class PackLoader: # Load pack metadata self.upsert_pack() - # Load runtimes first (actions and sensors depend on them) + # Load permission sets first (authorization metadata) + permission_set_ids = self.upsert_permission_sets() + + # Load runtimes (actions and sensors depend on them) runtime_ids = self.upsert_runtimes() # Load triggers @@ -746,6 +814,7 @@ class PackLoader: print(f"✓ Pack '{self.pack_name}' loaded successfully!") print("=" * 60) print(f" Pack ID: {self.pack_id}") + print(f" Permission sets: {len(permission_set_ids)}") print(f" Runtimes: {len(set(runtime_ids.values()))}") print(f" Triggers: {len(trigger_ids)}") print(f" Actions: {len(action_ids)}")