From b5d6bb22433c657ee81c05ef83897c48067821a4 Mon Sep 17 00:00:00 2001 From: David Culbreth Date: Wed, 11 Mar 2026 11:21:28 -0500 Subject: [PATCH] more polish on workflows --- crates/api/src/dto/workflow.rs | 27 --- crates/api/src/routes/workflows.rs | 7 - crates/api/tests/helpers.rs | 1 - crates/api/tests/pack_workflow_tests.rs | 2 - crates/api/tests/workflow_tests.rs | 37 +--- crates/cli/src/commands/workflow.rs | 12 - crates/common/src/models.rs | 1 - crates/common/src/pack_registry/loader.rs | 2 - crates/common/src/repositories/workflow.rs | 54 +---- crates/common/src/workflow/registrar.rs | 2 - crates/executor/src/scheduler.rs | 139 ++++++++---- crates/executor/src/workflow/registrar.rs | 2 - migrations/20250101000006_workflow_system.sql | 3 - .../models/ApiResponse_WorkflowResponse.ts | 5 - web/src/api/models/CreateWorkflowRequest.ts | 5 - .../PaginatedResponse_WorkflowSummary.ts | 5 - web/src/api/models/UpdateWorkflowRequest.ts | 5 - web/src/api/models/WorkflowResponse.ts | 5 - web/src/api/models/WorkflowSummary.ts | 5 - web/src/api/services/WorkflowsService.ts | 18 -- .../components/workflows/WorkflowEdges.tsx | 4 +- .../workflows/WorkflowInputsPanel.tsx | 129 ++++++++++- web/src/hooks/useWorkflows.ts | 2 - web/src/pages/actions/WorkflowBuilderPage.tsx | 209 +++++++++++------- web/src/types/workflow.ts | 7 - 25 files changed, 366 insertions(+), 322 deletions(-) diff --git a/crates/api/src/dto/workflow.rs b/crates/api/src/dto/workflow.rs index 2f4002b..bda99cd 100644 --- a/crates/api/src/dto/workflow.rs +++ b/crates/api/src/dto/workflow.rs @@ -49,9 +49,6 @@ pub struct SaveWorkflowFileRequest { #[schema(example = json!(["deployment", "automation"]))] pub tags: Option>, - /// Whether the workflow is enabled - #[schema(example = true)] - pub enabled: Option, } /// Request DTO for creating a new workflow @@ -97,9 +94,6 @@ pub struct CreateWorkflowRequest { #[schema(example = json!(["incident", "slack", "approval"]))] pub tags: Option>, - /// Whether the workflow is enabled - #[schema(example = true)] - pub enabled: Option, } /// Request DTO for updating a workflow @@ -135,9 +129,6 @@ pub struct UpdateWorkflowRequest { #[schema(example = json!(["incident", "slack", "approval", "automation"]))] pub tags: Option>, - /// Whether the workflow is enabled - #[schema(example = true)] - pub enabled: Option, } /// Response DTO for workflow information @@ -187,10 +178,6 @@ pub struct WorkflowResponse { #[schema(example = json!(["incident", "slack", "approval"]))] pub tags: Vec, - /// Whether the workflow is enabled - #[schema(example = true)] - pub enabled: bool, - /// Creation timestamp #[schema(example = "2024-01-13T10:30:00Z")] pub created: DateTime, @@ -231,10 +218,6 @@ pub struct WorkflowSummary { #[schema(example = json!(["incident", "slack", "approval"]))] pub tags: Vec, - /// Whether the workflow is enabled - #[schema(example = true)] - pub enabled: bool, - /// Creation timestamp #[schema(example = "2024-01-13T10:30:00Z")] pub created: DateTime, @@ -259,7 +242,6 @@ impl From for WorkflowRespo out_schema: workflow.out_schema, definition: workflow.definition, tags: workflow.tags, - enabled: workflow.enabled, created: workflow.created, updated: workflow.updated, } @@ -277,7 +259,6 @@ impl From for WorkflowSumma description: workflow.description, version: workflow.version, tags: workflow.tags, - enabled: workflow.enabled, created: workflow.created, updated: workflow.updated, } @@ -291,10 +272,6 @@ pub struct WorkflowSearchParams { #[param(example = "incident,approval")] pub tags: Option, - /// Filter by enabled status - #[param(example = true)] - pub enabled: Option, - /// Search term for label/description (case-insensitive) #[param(example = "incident")] pub search: Option, @@ -320,7 +297,6 @@ mod tests { out_schema: None, definition: serde_json::json!({"tasks": []}), tags: None, - enabled: None, }; assert!(req.validate().is_err()); @@ -338,7 +314,6 @@ mod tests { out_schema: None, definition: serde_json::json!({"tasks": []}), tags: Some(vec!["test".to_string()]), - enabled: Some(true), }; assert!(req.validate().is_ok()); @@ -354,7 +329,6 @@ mod tests { out_schema: None, definition: None, tags: None, - enabled: None, }; // Should be valid even with all None values @@ -365,7 +339,6 @@ mod tests { fn test_workflow_search_params() { let params = WorkflowSearchParams { tags: Some("incident,approval".to_string()), - enabled: Some(true), search: Some("response".to_string()), pack_ref: Some("core".to_string()), }; diff --git a/crates/api/src/routes/workflows.rs b/crates/api/src/routes/workflows.rs index 4e5dd05..9a0d7ba 100644 --- a/crates/api/src/routes/workflows.rs +++ b/crates/api/src/routes/workflows.rs @@ -66,7 +66,6 @@ pub async fn list_workflows( let filters = WorkflowSearchFilters { pack: None, pack_ref: search_params.pack_ref.clone(), - enabled: search_params.enabled, tags, search: search_params.search.clone(), limit: pagination.limit(), @@ -113,7 +112,6 @@ pub async fn list_workflows_by_pack( let filters = WorkflowSearchFilters { pack: None, pack_ref: Some(pack_ref), - enabled: None, tags: None, search: None, limit: pagination.limit(), @@ -208,7 +206,6 @@ pub async fn create_workflow( out_schema: request.out_schema.clone(), definition: request.definition, tags: request.tags.clone().unwrap_or_default(), - enabled: request.enabled.unwrap_or(true), }; let workflow = WorkflowDefinitionRepository::create(&state.db, workflow_input).await?; @@ -275,7 +272,6 @@ pub async fn update_workflow( out_schema: request.out_schema.clone(), definition: request.definition, tags: request.tags, - enabled: request.enabled, }; let workflow = @@ -408,7 +404,6 @@ pub async fn save_workflow_file( out_schema: request.out_schema.clone(), definition: definition_json, tags: request.tags.clone().unwrap_or_default(), - enabled: request.enabled.unwrap_or(true), }; let workflow = WorkflowDefinitionRepository::create(&state.db, workflow_input).await?; @@ -489,7 +484,6 @@ pub async fn update_workflow_file( out_schema: request.out_schema.clone(), definition: Some(definition_json), tags: request.tags, - enabled: request.enabled, }; let workflow = @@ -647,7 +641,6 @@ fn build_action_yaml(pack_ref: &str, request: &SaveWorkflowFileRequest) -> Strin lines.push(format!("description: \"{}\"", desc.replace('"', "\\\""))); } } - lines.push("enabled: true".to_string()); lines.push(format!( "workflow_file: workflows/{}.workflow.yaml", request.name diff --git a/crates/api/tests/helpers.rs b/crates/api/tests/helpers.rs index 11640a5..ddd4279 100644 --- a/crates/api/tests/helpers.rs +++ b/crates/api/tests/helpers.rs @@ -551,7 +551,6 @@ pub async fn create_test_workflow( ] }), tags: vec!["test".to_string()], - enabled: true, }; Ok(WorkflowDefinitionRepository::create(pool, input).await?) diff --git a/crates/api/tests/pack_workflow_tests.rs b/crates/api/tests/pack_workflow_tests.rs index c01a225..7bd6c4e 100644 --- a/crates/api/tests/pack_workflow_tests.rs +++ b/crates/api/tests/pack_workflow_tests.rs @@ -22,7 +22,6 @@ ref: {}.example_workflow label: Example Workflow description: A test workflow for integration testing version: "1.0.0" -enabled: true parameters: message: type: string @@ -46,7 +45,6 @@ ref: {}.another_workflow label: Another Workflow description: Second test workflow version: "1.0.0" -enabled: false tasks: - name: task1 action: core.noop diff --git a/crates/api/tests/workflow_tests.rs b/crates/api/tests/workflow_tests.rs index 9afaa61..69c7676 100644 --- a/crates/api/tests/workflow_tests.rs +++ b/crates/api/tests/workflow_tests.rs @@ -46,8 +46,7 @@ async fn test_create_workflow_success() { } ] }, - "tags": ["test", "automation"], - "enabled": true + "tags": ["test", "automation"] }), ctx.token(), ) @@ -60,7 +59,6 @@ async fn test_create_workflow_success() { assert_eq!(body["data"]["ref"], "test-pack.test_workflow"); assert_eq!(body["data"]["label"], "Test Workflow"); assert_eq!(body["data"]["version"], "1.0.0"); - assert_eq!(body["data"]["enabled"], true); assert!(body["data"]["tags"].as_array().unwrap().len() == 2); } @@ -85,7 +83,6 @@ async fn test_create_workflow_duplicate_ref() { out_schema: None, definition: json!({"tasks": []}), tags: vec![], - enabled: true, }; WorkflowDefinitionRepository::create(&ctx.pool, input) .await @@ -152,7 +149,6 @@ async fn test_get_workflow_by_ref() { out_schema: None, definition: json!({"tasks": [{"name": "task1"}]}), tags: vec!["test".to_string()], - enabled: true, }; WorkflowDefinitionRepository::create(&ctx.pool, input) .await @@ -206,7 +202,6 @@ async fn test_list_workflows() { out_schema: None, definition: json!({"tasks": []}), tags: vec!["test".to_string()], - enabled: i % 2 == 1, // Odd ones enabled }; WorkflowDefinitionRepository::create(&ctx.pool, input) .await @@ -256,7 +251,6 @@ async fn test_list_workflows_by_pack() { out_schema: None, definition: json!({"tasks": []}), tags: vec![], - enabled: true, }; WorkflowDefinitionRepository::create(&ctx.pool, input) .await @@ -275,7 +269,6 @@ async fn test_list_workflows_by_pack() { out_schema: None, definition: json!({"tasks": []}), tags: vec![], - enabled: true, }; WorkflowDefinitionRepository::create(&ctx.pool, input) .await @@ -308,14 +301,14 @@ async fn test_list_workflows_with_filters() { let pack_name = unique_pack_name(); let pack = create_test_pack(&ctx.pool, &pack_name).await.unwrap(); - // Create workflows with different tags and enabled status + // Create workflows with different tags let workflows = vec![ - ("workflow1", vec!["incident", "approval"], true), - ("workflow2", vec!["incident"], false), - ("workflow3", vec!["automation"], true), + ("workflow1", vec!["incident", "approval"]), + ("workflow2", vec!["incident"]), + ("workflow3", vec!["automation"]), ]; - for (ref_name, tags, enabled) in workflows { + for (ref_name, tags) in workflows { let input = CreateWorkflowDefinitionInput { r#ref: format!("test-pack.{}", ref_name), pack: pack.id, @@ -327,24 +320,12 @@ async fn test_list_workflows_with_filters() { out_schema: None, definition: json!({"tasks": []}), tags: tags.iter().map(|s| s.to_string()).collect(), - enabled, }; WorkflowDefinitionRepository::create(&ctx.pool, input) .await .unwrap(); } - // Filter by enabled (and pack_ref for isolation) - let response = ctx - .get( - &format!("/api/v1/workflows?enabled=true&pack_ref={}", pack_name), - ctx.token(), - ) - .await - .unwrap(); - let body: Value = response.json().await.unwrap(); - assert_eq!(body["data"].as_array().unwrap().len(), 2); - // Filter by tag (and pack_ref for isolation) let response = ctx .get( @@ -387,7 +368,6 @@ async fn test_update_workflow() { out_schema: None, definition: json!({"tasks": []}), tags: vec!["test".to_string()], - enabled: true, }; WorkflowDefinitionRepository::create(&ctx.pool, input) .await @@ -400,8 +380,7 @@ async fn test_update_workflow() { json!({ "label": "Updated Label", "description": "Updated description", - "version": "1.1.0", - "enabled": false + "version": "1.1.0" }), ctx.token(), ) @@ -414,7 +393,6 @@ async fn test_update_workflow() { assert_eq!(body["data"]["label"], "Updated Label"); assert_eq!(body["data"]["description"], "Updated description"); assert_eq!(body["data"]["version"], "1.1.0"); - assert_eq!(body["data"]["enabled"], false); } #[tokio::test] @@ -455,7 +433,6 @@ async fn test_delete_workflow() { out_schema: None, definition: json!({"tasks": []}), tags: vec![], - enabled: true, }; WorkflowDefinitionRepository::create(&ctx.pool, input) .await diff --git a/crates/cli/src/commands/workflow.rs b/crates/cli/src/commands/workflow.rs index 717e82c..3f40a9a 100644 --- a/crates/cli/src/commands/workflow.rs +++ b/crates/cli/src/commands/workflow.rs @@ -86,9 +86,6 @@ struct ActionYaml { #[serde(default)] tags: Option>, - /// Whether the action is enabled - #[serde(default)] - enabled: Option, } // ── API DTOs ──────────────────────────────────────────────────────────── @@ -109,8 +106,6 @@ struct SaveWorkflowFileRequest { out_schema: Option, #[serde(skip_serializing_if = "Option::is_none")] tags: Option>, - #[serde(skip_serializing_if = "Option::is_none")] - enabled: Option, } #[derive(Debug, Serialize, Deserialize)] @@ -127,7 +122,6 @@ struct WorkflowResponse { out_schema: Option, definition: serde_json::Value, tags: Vec, - enabled: bool, created: String, updated: String, } @@ -142,7 +136,6 @@ struct WorkflowSummary { description: Option, version: String, tags: Vec, - enabled: bool, created: String, updated: String, } @@ -281,7 +274,6 @@ async fn handle_upload( param_schema: action.parameters.clone(), out_schema: action.output.clone(), tags: action.tags.clone(), - enabled: action.enabled, }; // ── 6. Print progress ─────────────────────────────────────────────── @@ -357,7 +349,6 @@ async fn handle_upload( response.tags.join(", ") }, ), - ("Enabled", output::format_bool(response.enabled)), ]); } } @@ -414,7 +405,6 @@ async fn handle_list( "Pack", "Label", "Version", - "Enabled", "Tags", ], ); @@ -426,7 +416,6 @@ async fn handle_list( wf.pack_ref.clone(), output::truncate(&wf.label, 30), wf.version.clone(), - output::format_bool(wf.enabled), if wf.tags.is_empty() { "-".to_string() } else { @@ -478,7 +467,6 @@ async fn handle_show( .unwrap_or_else(|| "-".to_string()), ), ("Version", workflow.version.clone()), - ("Enabled", output::format_bool(workflow.enabled)), ( "Tags", if workflow.tags.is_empty() { diff --git a/crates/common/src/models.rs b/crates/common/src/models.rs index dcc7fb6..15f9ac8 100644 --- a/crates/common/src/models.rs +++ b/crates/common/src/models.rs @@ -1385,7 +1385,6 @@ pub mod workflow { pub out_schema: Option, pub definition: JsonDict, pub tags: Vec, - pub enabled: bool, pub created: DateTime, pub updated: DateTime, } diff --git a/crates/common/src/pack_registry/loader.rs b/crates/common/src/pack_registry/loader.rs index ae2b1dd..d325a53 100644 --- a/crates/common/src/pack_registry/loader.rs +++ b/crates/common/src/pack_registry/loader.rs @@ -1131,7 +1131,6 @@ impl<'a> PackComponentLoader<'a> { out_schema, definition: Some(definition_json), tags: Some(tags), - enabled: Some(true), }; WorkflowDefinitionRepository::update(self.pool, existing.id, update_input).await?; @@ -1159,7 +1158,6 @@ impl<'a> PackComponentLoader<'a> { out_schema, definition: definition_json, tags, - enabled: true, }; let created = WorkflowDefinitionRepository::create(self.pool, create_input).await?; diff --git a/crates/common/src/repositories/workflow.rs b/crates/common/src/repositories/workflow.rs index 63374ae..0e5dc4d 100644 --- a/crates/common/src/repositories/workflow.rs +++ b/crates/common/src/repositories/workflow.rs @@ -20,8 +20,6 @@ pub struct WorkflowSearchFilters { pub pack: Option, /// Filter by pack reference pub pack_ref: Option, - /// Filter by enabled status - pub enabled: Option, /// Filter by tags (OR across tags — matches if any tag is present) pub tags: Option>, /// Text search across label and description (case-insensitive substring) @@ -62,7 +60,6 @@ pub struct CreateWorkflowDefinitionInput { pub out_schema: Option, pub definition: JsonDict, pub tags: Vec, - pub enabled: bool, } #[derive(Debug, Clone, Default)] @@ -74,7 +71,6 @@ pub struct UpdateWorkflowDefinitionInput { pub out_schema: Option, pub definition: Option, pub tags: Option>, - pub enabled: Option, } #[async_trait::async_trait] @@ -84,7 +80,7 @@ impl FindById for WorkflowDefinitionRepository { E: Executor<'e, Database = Postgres> + 'e, { sqlx::query_as::<_, WorkflowDefinition>( - "SELECT id, ref, pack, pack_ref, label, description, version, param_schema, out_schema, definition, tags, enabled, created, updated + "SELECT id, ref, pack, pack_ref, label, description, version, param_schema, out_schema, definition, tags, created, updated FROM workflow_definition WHERE id = $1" ) @@ -102,7 +98,7 @@ impl FindByRef for WorkflowDefinitionRepository { E: Executor<'e, Database = Postgres> + 'e, { sqlx::query_as::<_, WorkflowDefinition>( - "SELECT id, ref, pack, pack_ref, label, description, version, param_schema, out_schema, definition, tags, enabled, created, updated + "SELECT id, ref, pack, pack_ref, label, description, version, param_schema, out_schema, definition, tags, created, updated FROM workflow_definition WHERE ref = $1" ) @@ -120,7 +116,7 @@ impl List for WorkflowDefinitionRepository { E: Executor<'e, Database = Postgres> + 'e, { sqlx::query_as::<_, WorkflowDefinition>( - "SELECT id, ref, pack, pack_ref, label, description, version, param_schema, out_schema, definition, tags, enabled, created, updated + "SELECT id, ref, pack, pack_ref, label, description, version, param_schema, out_schema, definition, tags, created, updated FROM workflow_definition ORDER BY created DESC LIMIT 1000" @@ -141,9 +137,9 @@ impl Create for WorkflowDefinitionRepository { { sqlx::query_as::<_, WorkflowDefinition>( "INSERT INTO workflow_definition - (ref, pack, pack_ref, label, description, version, param_schema, out_schema, definition, tags, enabled) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) - RETURNING id, ref, pack, pack_ref, label, description, version, param_schema, out_schema, definition, tags, enabled, created, updated" + (ref, pack, pack_ref, label, description, version, param_schema, out_schema, definition, tags) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) + RETURNING id, ref, pack, pack_ref, label, description, version, param_schema, out_schema, definition, tags, created, updated" ) .bind(&input.r#ref) .bind(input.pack) @@ -155,7 +151,6 @@ impl Create for WorkflowDefinitionRepository { .bind(&input.out_schema) .bind(&input.definition) .bind(&input.tags) - .bind(input.enabled) .fetch_one(executor) .await .map_err(Into::into) @@ -219,20 +214,12 @@ impl Update for WorkflowDefinitionRepository { query.push("tags = ").push_bind(tags); has_updates = true; } - if let Some(enabled) = input.enabled { - if has_updates { - query.push(", "); - } - query.push("enabled = ").push_bind(enabled); - has_updates = true; - } - if !has_updates { return Self::get_by_id(executor, id).await; } query.push(", updated = NOW() WHERE id = ").push_bind(id); - query.push(" RETURNING id, ref, pack, pack_ref, label, description, version, param_schema, out_schema, definition, tags, enabled, created, updated"); + query.push(" RETURNING id, ref, pack, pack_ref, label, description, version, param_schema, out_schema, definition, tags, created, updated"); query .build_query_as::() @@ -269,7 +256,7 @@ impl WorkflowDefinitionRepository { where E: Executor<'e, Database = Postgres> + Copy + 'e, { - let select_cols = "id, ref, pack, pack_ref, label, description, version, param_schema, out_schema, definition, tags, enabled, created, updated"; + let select_cols = "id, ref, pack, pack_ref, label, description, version, param_schema, out_schema, definition, tags, created, updated"; let mut qb: QueryBuilder<'_, Postgres> = QueryBuilder::new(format!("SELECT {select_cols} FROM workflow_definition")); @@ -301,9 +288,6 @@ impl WorkflowDefinitionRepository { if let Some(ref pack_ref) = filters.pack_ref { push_condition!("pack_ref = ", pack_ref.clone()); } - if let Some(enabled) = filters.enabled { - push_condition!("enabled = ", enabled); - } if let Some(ref tags) = filters.tags { if !tags.is_empty() { // Use PostgreSQL array overlap operator: tags && ARRAY[...] @@ -359,7 +343,7 @@ impl WorkflowDefinitionRepository { E: Executor<'e, Database = Postgres> + 'e, { sqlx::query_as::<_, WorkflowDefinition>( - "SELECT id, ref, pack, pack_ref, label, description, version, param_schema, out_schema, definition, tags, enabled, created, updated + "SELECT id, ref, pack, pack_ref, label, description, version, param_schema, out_schema, definition, tags, created, updated FROM workflow_definition WHERE pack = $1 ORDER BY label" @@ -379,7 +363,7 @@ impl WorkflowDefinitionRepository { E: Executor<'e, Database = Postgres> + 'e, { sqlx::query_as::<_, WorkflowDefinition>( - "SELECT id, ref, pack, pack_ref, label, description, version, param_schema, out_schema, definition, tags, enabled, created, updated + "SELECT id, ref, pack, pack_ref, label, description, version, param_schema, out_schema, definition, tags, created, updated FROM workflow_definition WHERE pack_ref = $1 ORDER BY label" @@ -403,29 +387,13 @@ impl WorkflowDefinitionRepository { Ok(result.0) } - /// Find all enabled workflows - pub async fn find_enabled<'e, E>(executor: E) -> Result> - where - E: Executor<'e, Database = Postgres> + 'e, - { - sqlx::query_as::<_, WorkflowDefinition>( - "SELECT id, ref, pack, pack_ref, label, description, version, param_schema, out_schema, definition, tags, enabled, created, updated - FROM workflow_definition - WHERE enabled = true - ORDER BY label" - ) - .fetch_all(executor) - .await - .map_err(Into::into) - } - /// Find workflows by tag pub async fn find_by_tag<'e, E>(executor: E, tag: &str) -> Result> where E: Executor<'e, Database = Postgres> + 'e, { sqlx::query_as::<_, WorkflowDefinition>( - "SELECT id, ref, pack, pack_ref, label, description, version, param_schema, out_schema, definition, tags, enabled, created, updated + "SELECT id, ref, pack, pack_ref, label, description, version, param_schema, out_schema, definition, tags, created, updated FROM workflow_definition WHERE $1 = ANY(tags) ORDER BY label" diff --git a/crates/common/src/workflow/registrar.rs b/crates/common/src/workflow/registrar.rs index fcd61e8..af7d7e4 100644 --- a/crates/common/src/workflow/registrar.rs +++ b/crates/common/src/workflow/registrar.rs @@ -379,7 +379,6 @@ impl WorkflowRegistrar { out_schema: workflow.output.clone(), definition, tags: workflow.tags.clone(), - enabled: true, }; let created = WorkflowDefinitionRepository::create(&self.pool, input).await?; @@ -411,7 +410,6 @@ impl WorkflowRegistrar { out_schema: workflow.output.clone(), definition: Some(definition), tags: Some(workflow.tags.clone()), - enabled: Some(true), }; let updated = WorkflowDefinitionRepository::update(&self.pool, *workflow_id, input).await?; diff --git a/crates/executor/src/scheduler.rs b/crates/executor/src/scheduler.rs index 365595f..dbb6923 100644 --- a/crates/executor/src/scheduler.rs +++ b/crates/executor/src/scheduler.rs @@ -286,21 +286,6 @@ impl ExecutionScheduler { ) })?; - if !workflow_def.enabled { - warn!( - "Workflow '{}' is disabled, failing execution {}", - workflow_def.r#ref, execution.id - ); - let mut fail = execution.clone(); - fail.status = ExecutionStatus::Failed; - fail.result = Some(serde_json::json!({ - "error": format!("Workflow '{}' is disabled", workflow_def.r#ref), - "succeeded": false, - })); - ExecutionRepository::update(pool, fail.id, fail.into()).await?; - return Ok(()); - } - // Parse workflow definition JSON into the strongly-typed struct let definition: WorkflowDefinition = serde_json::from_value(workflow_def.definition.clone()).map_err(|e| { @@ -900,35 +885,61 @@ impl ExecutionScheduler { return Ok(()); } - // Cancelled workflow: don't dispatch new tasks, but check whether all - // running children have now finished. When none remain, finalize the - // parent execution as Cancelled so it doesn't stay stuck in "Canceling". - if workflow_execution.status == ExecutionStatus::Cancelled { - let running = Self::count_running_workflow_children( - pool, - workflow_execution_id, - &workflow_execution.completed_tasks, - &workflow_execution.failed_tasks, - ) - .await?; - - if running == 0 { - info!( - "Cancelled workflow_execution {} has no more running children, \ - finalizing parent execution {} as Cancelled", - workflow_execution_id, workflow_execution.execution - ); - Self::finalize_cancelled_workflow( - pool, + let parent_execution = ExecutionRepository::find_by_id(pool, workflow_execution.execution) + .await? + .ok_or_else(|| { + anyhow::anyhow!( + "Parent execution {} not found for workflow_execution {}", workflow_execution.execution, + workflow_execution_id + ) + })?; + + // Cancellation must be a hard stop for workflow orchestration. Once + // either the workflow record, the parent execution, or the completed + // child itself is in a cancellation state, do not evaluate transitions, + // release more with_items siblings, or dispatch any successor tasks. + if Self::should_halt_workflow_advancement( + workflow_execution.status, + parent_execution.status, + execution.status, + ) { + if workflow_execution.status == ExecutionStatus::Cancelled { + let running = Self::count_running_workflow_children( + pool, workflow_execution_id, + &workflow_execution.completed_tasks, + &workflow_execution.failed_tasks, ) .await?; + + if running == 0 { + info!( + "Cancelled workflow_execution {} has no more running children, \ + finalizing parent execution {} as Cancelled", + workflow_execution_id, workflow_execution.execution + ); + Self::finalize_cancelled_workflow( + pool, + workflow_execution.execution, + workflow_execution_id, + ) + .await?; + } else { + debug!( + "Workflow_execution {} is cancelling/cancelled with {} running children, \ + skipping advancement", + workflow_execution_id, running + ); + } } else { debug!( - "Cancelled workflow_execution {} still has {} running children, \ - waiting for them to finish", - workflow_execution_id, running + "Workflow_execution {} advancement halted due to cancellation state \ + (workflow: {:?}, parent: {:?}, child: {:?})", + workflow_execution_id, + workflow_execution.status, + parent_execution.status, + execution.status ); } @@ -1116,17 +1127,6 @@ impl ExecutionScheduler { } } - // Load the parent execution for context - let parent_execution = ExecutionRepository::find_by_id(pool, workflow_execution.execution) - .await? - .ok_or_else(|| { - anyhow::anyhow!( - "Parent execution {} not found for workflow_execution {}", - workflow_execution.execution, - workflow_execution_id - ) - })?; - // ----------------------------------------------------------------- // Rebuild the WorkflowContext from persisted state + completed task // results so that successor task inputs can be rendered. @@ -1414,6 +1414,23 @@ impl ExecutionScheduler { Ok(count) } + fn should_halt_workflow_advancement( + workflow_status: ExecutionStatus, + parent_status: ExecutionStatus, + child_status: ExecutionStatus, + ) -> bool { + matches!( + workflow_status, + ExecutionStatus::Canceling | ExecutionStatus::Cancelled + ) || matches!( + parent_status, + ExecutionStatus::Canceling | ExecutionStatus::Cancelled + ) || matches!( + child_status, + ExecutionStatus::Canceling | ExecutionStatus::Cancelled + ) + } + /// Finalize a cancelled workflow by updating the parent `execution` record /// to `Cancelled`. The `workflow_execution` record is already `Cancelled` /// (set by `cancel_workflow_children`); this only touches the parent. @@ -1918,4 +1935,28 @@ mod tests { assert_eq!(update.status, Some(ExecutionStatus::Scheduled)); assert_eq!(update.worker, Some(99)); } + + #[test] + fn test_workflow_advancement_halts_for_any_cancellation_state() { + assert!(ExecutionScheduler::should_halt_workflow_advancement( + ExecutionStatus::Running, + ExecutionStatus::Canceling, + ExecutionStatus::Completed + )); + assert!(ExecutionScheduler::should_halt_workflow_advancement( + ExecutionStatus::Cancelled, + ExecutionStatus::Running, + ExecutionStatus::Failed + )); + assert!(ExecutionScheduler::should_halt_workflow_advancement( + ExecutionStatus::Running, + ExecutionStatus::Running, + ExecutionStatus::Cancelled + )); + assert!(!ExecutionScheduler::should_halt_workflow_advancement( + ExecutionStatus::Running, + ExecutionStatus::Running, + ExecutionStatus::Failed + )); + } } diff --git a/crates/executor/src/workflow/registrar.rs b/crates/executor/src/workflow/registrar.rs index 07e8d77..5a9ee92 100644 --- a/crates/executor/src/workflow/registrar.rs +++ b/crates/executor/src/workflow/registrar.rs @@ -379,7 +379,6 @@ impl WorkflowRegistrar { out_schema: workflow.output.clone(), definition, tags: workflow.tags.clone(), - enabled: true, }; let created = WorkflowDefinitionRepository::create(&self.pool, input).await?; @@ -411,7 +410,6 @@ impl WorkflowRegistrar { out_schema: workflow.output.clone(), definition: Some(definition), tags: Some(workflow.tags.clone()), - enabled: Some(true), }; let updated = WorkflowDefinitionRepository::update(&self.pool, *workflow_id, input).await?; diff --git a/migrations/20250101000006_workflow_system.sql b/migrations/20250101000006_workflow_system.sql index 1c689a2..254ebd4 100644 --- a/migrations/20250101000006_workflow_system.sql +++ b/migrations/20250101000006_workflow_system.sql @@ -26,14 +26,12 @@ CREATE TABLE workflow_definition ( out_schema JSONB, definition JSONB NOT NULL, tags TEXT[] DEFAULT '{}', - enabled BOOLEAN DEFAULT true NOT NULL, created TIMESTAMPTZ DEFAULT NOW() NOT NULL, updated TIMESTAMPTZ DEFAULT NOW() NOT NULL ); -- Indexes CREATE INDEX idx_workflow_def_pack ON workflow_definition(pack); -CREATE INDEX idx_workflow_def_enabled ON workflow_definition(enabled); CREATE INDEX idx_workflow_def_ref ON workflow_definition(ref); CREATE INDEX idx_workflow_def_tags ON workflow_definition USING gin(tags); @@ -137,7 +135,6 @@ SELECT wd.ref as workflow_ref, wd.label, wd.version, - wd.enabled, a.id as action_id, a.ref as action_ref, a.pack as pack_id, diff --git a/web/src/api/models/ApiResponse_WorkflowResponse.ts b/web/src/api/models/ApiResponse_WorkflowResponse.ts index 07c0119..2691c08 100644 --- a/web/src/api/models/ApiResponse_WorkflowResponse.ts +++ b/web/src/api/models/ApiResponse_WorkflowResponse.ts @@ -22,10 +22,6 @@ export type ApiResponse_WorkflowResponse = { * Workflow description */ description?: string | null; - /** - * Whether the workflow is enabled - */ - enabled: boolean; /** * Workflow ID */ @@ -72,4 +68,3 @@ export type ApiResponse_WorkflowResponse = { */ message?: string | null; }; - diff --git a/web/src/api/models/CreateWorkflowRequest.ts b/web/src/api/models/CreateWorkflowRequest.ts index 42ec7e8..eaca4ee 100644 --- a/web/src/api/models/CreateWorkflowRequest.ts +++ b/web/src/api/models/CreateWorkflowRequest.ts @@ -14,10 +14,6 @@ export type CreateWorkflowRequest = { * Workflow description */ description?: string | null; - /** - * Whether the workflow is enabled - */ - enabled?: boolean | null; /** * Human-readable label */ @@ -47,4 +43,3 @@ export type CreateWorkflowRequest = { */ version: string; }; - diff --git a/web/src/api/models/PaginatedResponse_WorkflowSummary.ts b/web/src/api/models/PaginatedResponse_WorkflowSummary.ts index 17fd948..f8c4934 100644 --- a/web/src/api/models/PaginatedResponse_WorkflowSummary.ts +++ b/web/src/api/models/PaginatedResponse_WorkflowSummary.ts @@ -19,10 +19,6 @@ export type PaginatedResponse_WorkflowSummary = { * Workflow description */ description?: string | null; - /** - * Whether the workflow is enabled - */ - enabled: boolean; /** * Workflow ID */ @@ -57,4 +53,3 @@ export type PaginatedResponse_WorkflowSummary = { */ pagination: PaginationMeta; }; - diff --git a/web/src/api/models/UpdateWorkflowRequest.ts b/web/src/api/models/UpdateWorkflowRequest.ts index 61dd732..c984cdf 100644 --- a/web/src/api/models/UpdateWorkflowRequest.ts +++ b/web/src/api/models/UpdateWorkflowRequest.ts @@ -14,10 +14,6 @@ export type UpdateWorkflowRequest = { * Workflow description */ description?: string | null; - /** - * Whether the workflow is enabled - */ - enabled?: boolean | null; /** * Human-readable label */ @@ -39,4 +35,3 @@ export type UpdateWorkflowRequest = { */ version?: string | null; }; - diff --git a/web/src/api/models/WorkflowResponse.ts b/web/src/api/models/WorkflowResponse.ts index 7c2d366..6e784ac 100644 --- a/web/src/api/models/WorkflowResponse.ts +++ b/web/src/api/models/WorkflowResponse.ts @@ -18,10 +18,6 @@ export type WorkflowResponse = { * Workflow description */ description?: string | null; - /** - * Whether the workflow is enabled - */ - enabled: boolean; /** * Workflow ID */ @@ -63,4 +59,3 @@ export type WorkflowResponse = { */ version: string; }; - diff --git a/web/src/api/models/WorkflowSummary.ts b/web/src/api/models/WorkflowSummary.ts index 3047eae..17e7cfb 100644 --- a/web/src/api/models/WorkflowSummary.ts +++ b/web/src/api/models/WorkflowSummary.ts @@ -14,10 +14,6 @@ export type WorkflowSummary = { * Workflow description */ description?: string | null; - /** - * Whether the workflow is enabled - */ - enabled: boolean; /** * Workflow ID */ @@ -47,4 +43,3 @@ export type WorkflowSummary = { */ version: string; }; - diff --git a/web/src/api/services/WorkflowsService.ts b/web/src/api/services/WorkflowsService.ts index e3bba47..6029d2d 100644 --- a/web/src/api/services/WorkflowsService.ts +++ b/web/src/api/services/WorkflowsService.ts @@ -57,7 +57,6 @@ export class WorkflowsService { page, pageSize, tags, - enabled, search, packRef, }: { @@ -73,10 +72,6 @@ export class WorkflowsService { * Filter by tag(s) - comma-separated list */ tags?: string | null, - /** - * Filter by enabled status - */ - enabled?: boolean | null, /** * Search term for label/description (case-insensitive) */ @@ -93,7 +88,6 @@ export class WorkflowsService { 'page': page, 'page_size': pageSize, 'tags': tags, - 'enabled': enabled, 'search': search, 'pack_ref': packRef, }, @@ -125,10 +119,6 @@ export class WorkflowsService { * Workflow description */ description?: string | null; - /** - * Whether the workflow is enabled - */ - enabled: boolean; /** * Workflow ID */ @@ -216,10 +206,6 @@ export class WorkflowsService { * Workflow description */ description?: string | null; - /** - * Whether the workflow is enabled - */ - enabled: boolean; /** * Workflow ID */ @@ -308,10 +294,6 @@ export class WorkflowsService { * Workflow description */ description?: string | null; - /** - * Whether the workflow is enabled - */ - enabled: boolean; /** * Workflow ID */ diff --git a/web/src/components/workflows/WorkflowEdges.tsx b/web/src/components/workflows/WorkflowEdges.tsx index 2dfdb4e..009cd8d 100644 --- a/web/src/components/workflows/WorkflowEdges.tsx +++ b/web/src/components/workflows/WorkflowEdges.tsx @@ -63,7 +63,9 @@ const ARROW_LENGTH = 12; const ARROW_HALF_WIDTH = 5; const ARROW_DIRECTION_LOOKBACK_PX = 10; const ARROW_DIRECTION_SAMPLES = 48; -const ARROW_SHAFT_OVERLAP_PX = 2; +// Keep a small amount of shaft under the arrowhead so sample-based trimming +// does not leave a visible gap on simple bezier edges without waypoints. +const ARROW_SHAFT_OVERLAP_PX = 4; /** Color for each edge type (alias for shared constant) */ const EDGE_COLORS = EDGE_TYPE_COLORS; diff --git a/web/src/components/workflows/WorkflowInputsPanel.tsx b/web/src/components/workflows/WorkflowInputsPanel.tsx index 9d6cbb1..a8a0ca9 100644 --- a/web/src/components/workflows/WorkflowInputsPanel.tsx +++ b/web/src/components/workflows/WorkflowInputsPanel.tsx @@ -1,11 +1,29 @@ import { useState } from "react"; -import { Pencil, Plus, X, LogIn, LogOut } from "lucide-react"; +import { + Pencil, + Plus, + X, + LogIn, + LogOut, + SlidersHorizontal, +} from "lucide-react"; import SchemaBuilder from "@/components/common/SchemaBuilder"; -import type { ParamDefinition } from "@/types/workflow"; +import type { CancellationPolicy, ParamDefinition } from "@/types/workflow"; +import { CANCELLATION_POLICY_LABELS } from "@/types/workflow"; interface WorkflowInputsPanelProps { + label: string; + version: string; + description: string; + tags: string[]; + cancellationPolicy: CancellationPolicy; parameters: Record; output: Record; + onLabelChange: (label: string) => void; + onVersionChange: (version: string) => void; + onDescriptionChange: (description: string) => void; + onTagsChange: (tags: string[]) => void; + onCancellationPolicyChange: (policy: CancellationPolicy) => void; onParametersChange: (parameters: Record) => void; onOutputChange: (output: Record) => void; } @@ -82,8 +100,18 @@ function ParamSummaryList({ } export default function WorkflowInputsPanel({ + label, + version, + description, + tags, + cancellationPolicy, parameters, output, + onLabelChange, + onVersionChange, + onDescriptionChange, + onTagsChange, + onCancellationPolicyChange, onParametersChange, onOutputChange, }: WorkflowInputsPanelProps) { @@ -123,8 +151,103 @@ export default function WorkflowInputsPanel({ <>
- {/* Input Parameters */}
+
+ +

+ Workflow +

+
+
+
+ + onLabelChange(e.target.value)} + className="w-full px-2.5 py-2 border border-gray-300 rounded-md text-sm focus:ring-2 focus:ring-blue-500 focus:border-blue-500" + placeholder="Workflow Label" + /> +
+ +
+ + onDescriptionChange(e.target.value)} + className="w-full px-2.5 py-2 border border-gray-300 rounded-md text-sm focus:ring-2 focus:ring-blue-500 focus:border-blue-500" + placeholder="Workflow description" + /> +
+ +
+
+ + onVersionChange(e.target.value)} + className="w-full px-2.5 py-2 border border-gray-300 rounded-md text-sm font-mono focus:ring-2 focus:ring-blue-500 focus:border-blue-500" + placeholder="1.0.0" + /> +
+ +
+ + + onTagsChange( + e.target.value + .split(",") + .map((tag) => tag.trim()) + .filter(Boolean), + ) + } + className="w-full px-2.5 py-2 border border-gray-300 rounded-md text-sm focus:ring-2 focus:ring-blue-500 focus:border-blue-500" + placeholder="tag-one, tag-two" + /> +
+
+ +
+ + +
+
+
+ + {/* Input Parameters */} +
diff --git a/web/src/hooks/useWorkflows.ts b/web/src/hooks/useWorkflows.ts index 4cdc82d..8b03bea 100644 --- a/web/src/hooks/useWorkflows.ts +++ b/web/src/hooks/useWorkflows.ts @@ -10,7 +10,6 @@ interface WorkflowsQueryParams { pageSize?: number; packRef?: string; tags?: string; - enabled?: boolean; search?: string; } @@ -23,7 +22,6 @@ export function useWorkflows(params?: WorkflowsQueryParams) { page: params?.page || 1, pageSize: params?.pageSize || 50, tags: params?.tags, - enabled: params?.enabled, search: params?.search, packRef: params?.packRef, }); diff --git a/web/src/pages/actions/WorkflowBuilderPage.tsx b/web/src/pages/actions/WorkflowBuilderPage.tsx index bb0f12e..22793e4 100644 --- a/web/src/pages/actions/WorkflowBuilderPage.tsx +++ b/web/src/pages/actions/WorkflowBuilderPage.tsx @@ -15,6 +15,7 @@ import { ExternalLink, Copy, Check, + PanelLeftClose, } from "lucide-react"; import SearchableSelect from "@/components/common/SearchableSelect"; import yaml from "js-yaml"; @@ -40,7 +41,6 @@ import type { WorkflowBuilderState, PaletteAction, TransitionPreset, - CancellationPolicy, } from "@/types/workflow"; import { generateUniqueTaskName, @@ -54,7 +54,6 @@ import { removeTaskFromTransitions, renameTaskInTransitions, findStartingTaskIds, - CANCELLATION_POLICY_LABELS, } from "@/types/workflow"; const INITIAL_STATE: WorkflowBuilderState = { @@ -68,10 +67,13 @@ const INITIAL_STATE: WorkflowBuilderState = { vars: {}, tasks: [], tags: [], - enabled: true, cancellationPolicy: "allow_finish", }; +const ACTIONS_SIDEBAR_WIDTH = 256; +const WORKFLOW_OPTIONS_DEFAULT_WIDTH = 360; +const WORKFLOW_OPTIONS_STORAGE_KEY = "workflow-builder-options-width"; + export default function WorkflowBuilderPage() { const navigate = useNavigate(); const { ref: editRef } = useParams<{ ref?: string }>(); @@ -104,10 +106,19 @@ export default function WorkflowBuilderPage() { const [initialized, setInitialized] = useState(false); const [showYamlPreview, setShowYamlPreview] = useState(false); const [sidebarTab, setSidebarTab] = useState<"actions" | "inputs">("actions"); + const [workflowOptionsWidth, setWorkflowOptionsWidth] = useState(() => { + if (typeof window === "undefined") { + return WORKFLOW_OPTIONS_DEFAULT_WIDTH; + } + const saved = window.localStorage.getItem(WORKFLOW_OPTIONS_STORAGE_KEY); + const parsed = saved ? Number(saved) : NaN; + return Number.isFinite(parsed) ? parsed : WORKFLOW_OPTIONS_DEFAULT_WIDTH; + }); const [highlightedTransition, setHighlightedTransition] = useState<{ taskId: string; transitionIndex: number; } | null>(null); + const [isResizingSidebar, setIsResizingSidebar] = useState(false); // Start-node warning toast state const [startWarningVisible, setStartWarningVisible] = useState(false); @@ -261,6 +272,71 @@ export default function WorkflowBuilderPage() { return null; }, [state.tasks, startingTaskIds]); + const getMaxWorkflowOptionsWidth = useCallback(() => { + if (typeof window === "undefined") { + return WORKFLOW_OPTIONS_DEFAULT_WIDTH; + } + return Math.max( + ACTIONS_SIDEBAR_WIDTH, + Math.floor(window.innerWidth * 0.5), + ); + }, []); + + const clampWorkflowOptionsWidth = useCallback( + (width: number) => + Math.min( + Math.max(Math.round(width), ACTIONS_SIDEBAR_WIDTH), + getMaxWorkflowOptionsWidth(), + ), + [getMaxWorkflowOptionsWidth], + ); + + useEffect(() => { + setWorkflowOptionsWidth((prev) => clampWorkflowOptionsWidth(prev)); + }, [clampWorkflowOptionsWidth]); + + useEffect(() => { + const handleResize = () => { + setWorkflowOptionsWidth((prev) => clampWorkflowOptionsWidth(prev)); + }; + + window.addEventListener("resize", handleResize); + return () => window.removeEventListener("resize", handleResize); + }, [clampWorkflowOptionsWidth]); + + useEffect(() => { + window.localStorage.setItem( + WORKFLOW_OPTIONS_STORAGE_KEY, + String(workflowOptionsWidth), + ); + }, [workflowOptionsWidth]); + + useEffect(() => { + if (!isResizingSidebar) return; + + const handleMouseMove = (event: MouseEvent) => { + setWorkflowOptionsWidth(clampWorkflowOptionsWidth(event.clientX)); + }; + + const handleMouseUp = () => { + setIsResizingSidebar(false); + document.body.style.cursor = ""; + document.body.style.userSelect = ""; + }; + + document.body.style.cursor = "col-resize"; + document.body.style.userSelect = "none"; + window.addEventListener("mousemove", handleMouseMove); + window.addEventListener("mouseup", handleMouseUp); + + return () => { + document.body.style.cursor = ""; + document.body.style.userSelect = ""; + window.removeEventListener("mousemove", handleMouseMove); + window.removeEventListener("mouseup", handleMouseUp); + }; + }, [isResizingSidebar, clampWorkflowOptionsWidth]); + // Render-phase state adjustment: detect warning key changes for immediate // show/hide without refs or synchronous setState inside effects. const warningKey = startNodeWarning @@ -475,7 +551,6 @@ export default function WorkflowBuilderPage() { out_schema: Object.keys(state.output).length > 0 ? state.output : undefined, tags: state.tags.length > 0 ? state.tags : undefined, - enabled: state.enabled, }, }); } else { @@ -493,7 +568,6 @@ export default function WorkflowBuilderPage() { out_schema: Object.keys(state.output).length > 0 ? state.output : undefined, tags: state.tags.length > 0 ? state.tags : undefined, - enabled: state.enabled, }; try { await saveWorkflowFile.mutateAsync(fileData); @@ -635,6 +709,8 @@ export default function WorkflowBuilderPage() { const isSaving = saveWorkflowFile.isPending || updateWorkflowFile.isPending; const isExecuting = requestExecution.isPending; + const sidebarWidth = + sidebarTab === "inputs" ? workflowOptionsWidth : ACTIONS_SIDEBAR_WIDTH; if (isEditing && workflowLoading) { return ( @@ -675,7 +751,7 @@ export default function WorkflowBuilderPage() { disabled={isEditing} /> - / + / {/* Workflow name */} - - {/* Label */} - updateMetadata({ label: e.target.value })} - className="px-2 py-1.5 border border-gray-300 rounded text-sm focus:ring-2 focus:ring-blue-500 focus:border-blue-500 flex-1 min-w-[160px] max-w-[300px]" - placeholder="Workflow Label" - /> - - {/* Version */} - updateMetadata({ version: e.target.value })} - className="px-2 py-1.5 border border-gray-300 rounded text-sm font-mono focus:ring-2 focus:ring-blue-500 focus:border-blue-500 w-20" - placeholder="1.0.0" - /> + + {state.label || "Untitled workflow"} +
@@ -819,59 +880,6 @@ export default function WorkflowBuilderPage() {
- {/* Description row (collapsible) */} -
- updateMetadata({ description: e.target.value })} - className="flex-1 px-2 py-1 border border-gray-200 rounded text-xs text-gray-600 focus:ring-1 focus:ring-blue-500 focus:border-blue-500" - placeholder="Workflow description (optional)" - /> -
- - updateMetadata({ - tags: e.target.value - .split(",") - .map((t) => t.trim()) - .filter(Boolean), - }) - } - className="px-2 py-1 border border-gray-200 rounded text-xs text-gray-600 focus:ring-1 focus:ring-blue-500 focus:border-blue-500 w-40" - placeholder="Tags (comma-sep)" - /> - - -
-
{/* Validation errors panel */} @@ -987,8 +995,11 @@ export default function WorkflowBuilderPage() { ) : ( <> - {/* Left sidebar: tabbed Actions / Inputs */} -
+ {/* Left sidebar: tabbed Actions / Workflow Options */} +
{/* Tab header */}