Advanced workflows in NebulaFlow combine several execution patterns to handle complex scenarios. This guide covers subflows, loops, parallel execution, conditional branching, and hybrid execution models.
Subflows are reusable workflow components that encapsulate a graph of nodes. They work like functions in a programming language: you define them once and reuse them wherever they are needed.
Subflow Definition
├── Inputs (SubflowInput nodes)
│   ├── port1: string
│   ├── port2: number
│   └── ...
├── Internal Graph
│   ├── Node A
│   ├── Node B
│   └── Node C
└── Outputs (SubflowOutput nodes)
    ├── result1: string
    └── result2: number
Subflows execute as a unit:
Subflows maintain their own execution state:
interface SubflowDefinitionDTO {
  id: string
  title: string
  version: string
  inputs: SubflowPortDTO[]
  outputs: SubflowPortDTO[]
  graph: SubflowGraphDTO
}
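As an illustration of how typed input ports could be checked before a subflow runs, here is a minimal sketch. The `PortSketch` shape and the `validateInputs` helper are hypothetical and are not part of NebulaFlow; the real `SubflowPortDTO` may carry different fields.

```typescript
// Hypothetical port shape, modeled on the "port1: string, port2: number"
// diagram above. The real SubflowPortDTO may differ.
interface PortSketch {
  name: string;
  type: "string" | "number";
}

// Returns a list of human-readable problems; an empty list means the
// supplied values satisfy every declared input port.
function validateInputs(
  ports: PortSketch[],
  values: Record<string, unknown>
): string[] {
  const problems: string[] = [];
  for (const port of ports) {
    const value = values[port.name];
    if (value === undefined) {
      problems.push(`missing input: ${port.name}`);
    } else if (typeof value !== port.type) {
      problems.push(`wrong type for ${port.name}: expected ${port.type}`);
    }
  }
  return problems;
}

const subflowPorts: PortSketch[] = [
  { name: "port1", type: "string" },
  { name: "port2", type: "number" },
];
const inputProblems = validateInputs(subflowPorts, { port1: "x", port2: "3" });
```

Here `inputProblems` holds a single entry, because `port2` was supplied as a string rather than a number.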
NebulaFlow provides two loop nodes:
- Loop Start (NodeType.LOOP_START) - Begins a loop iteration
- Loop End (NodeType.LOOP_END) - Ends a loop iteration
interface LoopStartNode {
  type: NodeType.LOOP_START
  data: {
    iterations: number
    loopVariable: string
    overrideIterations?: boolean
    loopMode?: 'fixed' | 'while-variable-not-empty'
    collectionVariable?: string
    maxSafeIterations?: number
  }
}
Properties:
- iterations: Number of iterations to run (for fixed mode)
- loopVariable: Variable name for the current iteration
- overrideIterations: Allow runtime override of iteration count
- loopMode: Execution mode
  - 'fixed': Run for a fixed number of iterations
  - 'while-variable-not-empty': Run while a variable is not empty
- collectionVariable: Variable containing collection to iterate over (for while mode)
- maxSafeIterations: Safety limit to prevent infinite loops
Runs a fixed number of times:
Loop Start (iterations=5, loopVariable='i')
└── Body nodes
Loop End
Example:
Runs while a variable is not empty:
Loop Start (loopMode='while-variable-not-empty', collectionVariable='items')
└── Body nodes
Loop End
Example:
Loop variables are accessible within the loop body using ${loopVariable} syntax:
Loop Start (loopVariable='i')
└── LLM Node: "Process item ${i}"
Loop End
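The `${loopVariable}` substitution can be pictured with a small sketch. The `interpolate` helper below is hypothetical and only illustrates the idea. Two things are assumptions rather than documented behavior: unknown placeholders are left untouched, and the loop counter starts at 0.

```typescript
// Hypothetical helper: replaces ${name} placeholders with values from `vars`.
// Placeholders with no matching variable are left as-is (an assumption).
function interpolate(template: string, vars: Record<string, string>): string {
  return template.replace(
    /\$\{(\w+)\}/g,
    (match: string, name: string): string => {
      const known = Object.prototype.hasOwnProperty.call(vars, name);
      const value = known ? vars[name] : undefined;
      return value ?? match;
    }
  );
}

// Simulate three iterations of a loop whose loopVariable is 'i'.
// Starting the counter at 0 is an assumption for this sketch.
const prompts: string[] = [];
for (let i = 0; i < 3; i++) {
  prompts.push(interpolate("Process item ${i}", { i: String(i) }));
}
```

Each entry in `prompts` is the node text with `${i}` replaced by that iteration's value.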
To prevent infinite loops, you can set a maximum iteration limit:
{
  iterations: 100,
  maxSafeIterations: 1000 // Safety limit
}
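To make the interaction between loop mode and the safety limit concrete, here is a minimal simulation. `runLoop` is a hypothetical function, not the NebulaFlow engine. It assumes that 'fixed' is the default mode, that the cap defaults to 1000, and that hitting the cap raises an error. The real engine may stop differently.

```typescript
type LoopMode = "fixed" | "while-variable-not-empty";

interface LoopData {
  iterations: number;
  loopVariable: string;
  loopMode?: LoopMode;
  collectionVariable?: string;
  maxSafeIterations?: number;
}

// Runs `body` according to the loop settings and returns the iteration count.
// Assumptions: default mode 'fixed', default cap 1000, cap enforced by throwing.
function runLoop(
  data: LoopData,
  vars: Map<string, string>,
  body: (iteration: number, vars: Map<string, string>) => void
): number {
  const cap = data.maxSafeIterations ?? 1000;
  const mode: LoopMode = data.loopMode ?? "fixed";
  let count = 0;
  const shouldContinue = (): boolean => {
    if (mode === "fixed") return count < data.iterations;
    const name = data.collectionVariable ?? "";
    return (vars.get(name) ?? "") !== "";
  };
  while (shouldContinue()) {
    if (count >= cap) {
      throw new Error(`Loop exceeded maxSafeIterations (${cap})`);
    }
    vars.set(data.loopVariable, String(count)); // expose the loop variable
    body(count, vars);
    count++;
  }
  return count;
}

// A while-mode loop that drains a comma-separated 'items' variable.
const loopVars = new Map<string, string>([["items", "a,b,c"]]);
const seen: string[] = [];
const ran = runLoop(
  {
    iterations: 0,
    loopVariable: "i",
    loopMode: "while-variable-not-empty",
    collectionVariable: "items",
  },
  loopVars,
  (_i, v) => {
    const parts = (v.get("items") ?? "").split(",");
    seen.push(parts[0] ?? "");
    v.set("items", parts.slice(1).join(","));
  }
);
```

In this sketch the body is responsible for consuming the collection. If it never empties the variable, the safety limit is the only thing that ends the loop.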
Allow runtime override of iteration count:
{
  iterations: 10,
  overrideIterations: true // Can be overridden at runtime
}
NebulaFlow uses a parallel execution engine that:
The execution engine uses Kahn’s algorithm for topological sorting:
1. Compute in-degree for each node
2. Queue nodes with zero in-degree
3. Process queue:
   - Execute node
   - Decrease in-degree of neighbors
   - Add nodes with zero in-degree to queue
4. Repeat until all nodes processed
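The steps above can be sketched as a standalone function. This is a generic implementation of Kahn's algorithm, not NebulaFlow's internal code. The `Edge` shape with `source` and `target` fields is an assumption, and edges are assumed to reference only known node ids.

```typescript
// Assumed edge shape for this sketch.
interface Edge {
  source: string;
  target: string;
}

// Returns node ids in a valid execution order, or throws if the graph
// contains a cycle (some nodes never reach in-degree zero).
function topologicalOrder(nodeIds: string[], edges: Edge[]): string[] {
  // 1. Compute in-degree for each node.
  const inDegree = new Map<string, number>();
  const neighbors = new Map<string, string[]>();
  for (const id of nodeIds) {
    inDegree.set(id, 0);
    neighbors.set(id, []);
  }
  for (const { source, target } of edges) {
    neighbors.get(source)?.push(target);
    inDegree.set(target, (inDegree.get(target) ?? 0) + 1);
  }

  // 2. Queue nodes with zero in-degree.
  const queue = nodeIds.filter((id) => inDegree.get(id) === 0);
  const order: string[] = [];

  // 3-4. Process the queue until it is exhausted.
  for (let head = 0; head < queue.length; head++) {
    const id = queue[head] as string;
    order.push(id); // a real engine would execute the node here
    for (const next of neighbors.get(id) ?? []) {
      const remaining = (inDegree.get(next) ?? 0) - 1;
      inDegree.set(next, remaining);
      if (remaining === 0) queue.push(next);
    }
  }

  if (order.length !== nodeIds.length) {
    throw new Error("Graph contains a cycle");
  }
  return order;
}

const demoEdges: Edge[] = [
  { source: "A", target: "C" },
  { source: "B", target: "C" },
  { source: "A", target: "D" },
  { source: "B", target: "D" },
  { source: "C", target: "E" },
  { source: "D", target: "E" },
];
const order = topologicalOrder(["A", "B", "C", "D", "E"], demoEdges);
```

The cycle check at the end matters for workflows: a plain topological sort cannot order a graph with a back edge, so looping constructs need separate handling.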
Nodes are grouped into parallel steps (waves):
Step 0: [Node A, Node B] (no dependencies)
Step 1: [Node C, Node D] (depend on A and B)
Step 2: [Node E] (depends on C and D)
Nodes within the same step can execute in parallel.
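Grouping nodes into waves is a level-by-level variant of the same in-degree bookkeeping. The sketch below is an illustration under assumptions: `computeParallelSteps` is a hypothetical name, the `Edge` shape is assumed, and nodes that sit on a cycle are simply left out of the result.

```typescript
// Assumed edge shape for this sketch.
interface Edge {
  source: string;
  target: string;
}

// Groups nodes into waves: every node in wave N depends only on nodes
// in earlier waves, so nodes within one wave can run concurrently.
function computeParallelSteps(nodeIds: string[], edges: Edge[]): string[][] {
  const inDegree = new Map<string, number>();
  for (const id of nodeIds) inDegree.set(id, 0);
  for (const e of edges) {
    inDegree.set(e.target, (inDegree.get(e.target) ?? 0) + 1);
  }

  const steps: string[][] = [];
  let current = nodeIds.filter((id) => inDegree.get(id) === 0);
  while (current.length > 0) {
    steps.push(current);
    const next: string[] = [];
    for (const id of current) {
      for (const e of edges) {
        if (e.source !== id) continue;
        const remaining = (inDegree.get(e.target) ?? 0) - 1;
        inDegree.set(e.target, remaining);
        if (remaining === 0) next.push(e.target);
      }
    }
    current = next;
  }
  return steps;
}

const waves = computeParallelSteps(
  ["A", "B", "C", "D", "E"],
  [
    { source: "A", target: "C" },
    { source: "B", target: "C" },
    { source: "A", target: "D" },
    { source: "B", target: "D" },
    { source: "C", target: "E" },
    { source: "D", target: "E" },
  ]
);
```

For the example graph, `waves` matches the three steps listed above: A and B, then C and D, then E.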
Limit total concurrent nodes:
const options = {
  concurrency: 10 // Max 10 nodes at once
}
Limit concurrent nodes by type:
const options = {
  perType: {
    [NodeType.LLM]: 8, // Max 8 LLM nodes
    [NodeType.CLI]: 8, // Max 8 CLI nodes
    // List each node type once; to lower the LLM limit, change the value above (e.g. to 4)
  }
}
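How a global limit and per-type limits might combine can be shown with a small scheduling sketch. Everything here is hypothetical: `NodeKind` stands in for `NodeType`, and `selectRunnable` is not a NebulaFlow function. It assumes that a node may start only when both the global limit and its type's limit have room.

```typescript
// Stand-in for NodeType; the real enum values are not shown in this guide.
type NodeKind = "llm" | "cli" | "other";

interface ReadyNode {
  id: string;
  type: NodeKind;
}

interface Limits {
  concurrency: number;
  perType: Partial<Record<NodeKind, number>>;
}

// Picks which ready nodes may start now without exceeding either limit.
function selectRunnable(
  ready: ReadyNode[],
  running: ReadyNode[],
  limits: Limits
): ReadyNode[] {
  const activeByType = new Map<NodeKind, number>();
  for (const n of running) {
    activeByType.set(n.type, (activeByType.get(n.type) ?? 0) + 1);
  }
  let active = running.length;
  const picked: ReadyNode[] = [];
  for (const node of ready) {
    if (active >= limits.concurrency) break; // global limit reached
    const typeCap = limits.perType[node.type] ?? Infinity;
    const typeActive = activeByType.get(node.type) ?? 0;
    if (typeActive >= typeCap) continue; // this type is saturated
    picked.push(node);
    activeByType.set(node.type, typeActive + 1);
    active++;
  }
  return picked;
}

const readyNodes: ReadyNode[] = [
  { id: "l1", type: "llm" },
  { id: "l2", type: "llm" },
  { id: "c1", type: "cli" },
  { id: "c2", type: "cli" },
  { id: "c3", type: "cli" },
];
const started = selectRunnable(readyNodes, [], {
  concurrency: 3,
  perType: { llm: 1 },
});
```

With a global limit of 3 and an LLM limit of 1, the sketch starts `l1`, skips `l2`, and fills the remaining slots with `c1` and `c2`.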
Default limits:
Cancels all in-flight tasks on first error:
const options = {
  onError: 'fail-fast'
}
Use when: Errors should stop the entire workflow
Continues other subgraphs on error:
const options = {
  onError: 'continue-subgraph'
}
Use when: Independent branches should continue on error
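The difference between the two policies can be illustrated by asking which pending nodes are abandoned after a failure. The sketch below rests on an assumption about 'continue-subgraph': nodes downstream of the failed node are skipped while unrelated branches keep running. `nodesToSkip` and the `Edge` shape are hypothetical.

```typescript
type OnError = "fail-fast" | "continue-subgraph";

// Assumed edge shape for this sketch.
interface Edge {
  source: string;
  target: string;
}

// Returns the pending nodes that should not run after `failedId` fails.
function nodesToSkip(
  failedId: string,
  pending: string[],
  edges: Edge[],
  onError: OnError
): string[] {
  // fail-fast: nothing else runs.
  if (onError === "fail-fast") return [...pending];

  // continue-subgraph (assumed semantics): skip only nodes reachable
  // from the failed node; independent branches are unaffected.
  const reachable = new Set<string>();
  const stack: string[] = [failedId];
  while (stack.length > 0) {
    const id = stack.pop() as string;
    for (const e of edges) {
      if (e.source === id && !reachable.has(e.target)) {
        reachable.add(e.target);
        stack.push(e.target);
      }
    }
  }
  return pending.filter((id) => reachable.has(id));
}

const policyEdges: Edge[] = [
  { source: "A", target: "B" },
  { source: "B", target: "C" },
  { source: "X", target: "Y" },
];
const skippedFailFast = nodesToSkip("A", ["B", "C", "Y"], policyEdges, "fail-fast");
const skippedContinue = nodesToSkip("A", ["B", "C", "Y"], policyEdges, "continue-subgraph");
```

When `A` fails, the fail-fast policy abandons `B`, `C`, and `Y`, while the continue-subgraph policy abandons only `B` and `C` and lets the independent `X → Y` branch finish.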
The engine identifies branch subgraphs for conditional execution:
IF Node
├── True Branch: [Node A, Node B]
└── False Branch: [Node C, Node D]
Each branch can execute independently once the IF condition is evaluated.
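A minimal sketch of branch selection follows. It assumes the decision for each IF node is recorded as 'true' or 'false', which mirrors the `decisions` field of `ParallelOptions` later in this guide. The `IfBranches` shape and the `activeNodes` helper are hypothetical.

```typescript
type Decision = "true" | "false";

// Hypothetical description of the two subgraphs behind an IF node.
interface IfBranches {
  ifNodeId: string;
  trueBranch: string[];
  falseBranch: string[];
}

// Returns the nodes that become eligible to run once the IF node's
// decision is known; returns an empty list while it is still pending.
function activeNodes(
  branches: IfBranches,
  decisions: Partial<Record<string, Decision>>
): string[] {
  const decision = decisions[branches.ifNodeId];
  if (decision === undefined) return [];
  return decision === "true" ? branches.trueBranch : branches.falseBranch;
}

const ifBranches: IfBranches = {
  ifNodeId: "if-1",
  trueBranch: ["Node A", "Node B"],
  falseBranch: ["Node C", "Node D"],
};
const pendingBranch = activeNodes(ifBranches, {});
const takenBranch = activeNodes(ifBranches, { "if-1": "false" });
```

Until the condition is evaluated, neither branch is scheduled. Afterwards only the selected branch's nodes are eligible.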
The engine performs parallel analysis to:
For workflows with loops, NebulaFlow uses hybrid execution:
1. Analyze graph structure
2. Compute parallel steps
3. Execute step 0 (parallel)
4. Wait for step 0 completion
5. Execute step 1 (parallel)
6. Repeat until complete
The If/Else Node (NodeType.IF_ELSE) enables conditional execution:
IF Node
├── True Path → [Node A, Node B]
└── False Path → [Node C, Node D]
The If/Else node evaluates the input value as a boolean:
Once the condition is evaluated:
Nodes can accept multiple inputs:
Node A ──┐
Node B ──┤ → Node D
Node C ──┘
Supported by:
Nodes can produce multiple outputs:
         ┌→ Node B
Node A ──┤
         └→ Node C
Supported by:
The engine tracks input edge counts and completion status:
The engine tracks output distribution:
NebulaFlow supports both execution models:
Sequential:
Parallel:
For complex workflows, NebulaFlow uses a hybrid approach:
For workflows with loops:
Parallel Step 0: [Node A, Node B]
Parallel Step 1: [Loop Start]
Sequential Step 2: [Loop Body - iterations]
Parallel Step 3: [Loop End, Node C]
For workflows without loops:
executeWorkflowParallel(nodes, edges, callbacks, {
  concurrency: 10,
  perType: { [NodeType.LLM]: 8, [NodeType.CLI]: 8 }
})
For workflows with loops:
// Automatically detects loops and uses hybrid execution
executeWorkflowParallel(nodes, edges, callbacks, {
  concurrency: 10,
  perType: { [NodeType.LLM]: 8, [NodeType.CLI]: 8 }
})
For workflows with unsupported nodes:
// Falls back to sequential execution
// Triggered by NEBULAFLOW_DISABLE_HYBRID_PARALLEL env var
// Conservative (safe)
const conservative = {
  concurrency: 5,
  perType: { [NodeType.LLM]: 3, [NodeType.CLI]: 3 }
}
// Aggressive (fast)
const aggressive = {
  concurrency: 20,
  perType: { [NodeType.LLM]: 10, [NodeType.CLI]: 10 }
}
// Balanced (recommended)
const balanced = {
  concurrency: 10,
  perType: { [NodeType.LLM]: 8, [NodeType.CLI]: 8 }
}
// Critical workflow (fail-fast)
const critical = {
  onError: 'fail-fast'
}
// Non-critical workflow (continue on error)
const nonCritical = {
  onError: 'continue-subgraph'
}
Use case: Process multiple items independently
Input Node (items)
└── Loop Start (iterations=items.length)
└── LLM Node (process item)
└── Loop End
└── Accumulator Node (collect results)
Use case: Different processing paths based on condition
Input Node
└── IF Node (condition)
├── True: [LLM A, CLI B] (parallel)
└── False: [LLM C, CLI D] (parallel)
└── Preview Node (combine results)
Use case: Reusable processing logic
Main Workflow
└── Subflow Node (Data Processor v1)
├── Input: Raw Data
└── Output: Processed Data
└── Subflow Node (Data Processor v1) // Reuse same subflow
├── Input: More Data
└── Output: More Processed Data
Use case: Parallel processing within loops
Loop Start (iterations=10)
└── Parallel Step: [LLM A, CLI B] // Process in parallel
Loop End
└── Accumulator (collect all results)
# Disable hybrid parallel execution (fallback to sequential)
export NEBULAFLOW_DISABLE_HYBRID_PARALLEL=1
# Set default concurrency
export NEBULAFLOW_DEFAULT_CONCURRENCY=10
# Set per-type limits
export NEBULAFLOW_LLM_CONCURRENCY=8
export NEBULAFLOW_CLI_CONCURRENCY=8
# Set maximum safe iterations (default: 1000)
export NEBULAFLOW_MAX_ITERATIONS=1000
# Enable loop iteration override
export NEBULAFLOW_ALLOW_ITERATION_OVERRIDE=1
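The following sketch shows one way these variables could be read into a typed settings object. The variable names come from the list above, but `readEnv` and the `EnvSettings` shape are hypothetical. It also assumes that only the value "1" enables a flag and that invalid numbers fall back to "unset". Neither behavior is documented here.

```typescript
// Hypothetical settings shape for this sketch.
interface EnvSettings {
  hybridParallel: boolean;
  concurrency: number | undefined;
  llmConcurrency: number | undefined;
  cliConcurrency: number | undefined;
  maxIterations: number;
  allowIterationOverride: boolean;
}

// Parses a positive integer; anything else is treated as "unset".
function parsePositiveInt(raw: string | undefined): number | undefined {
  if (raw === undefined) return undefined;
  const n = Number.parseInt(raw, 10);
  return Number.isInteger(n) && n > 0 ? n : undefined;
}

// `env` is passed in explicitly so the function is easy to test;
// in Node.js a caller would typically pass process.env.
function readEnv(env: Record<string, string | undefined>): EnvSettings {
  return {
    hybridParallel: env["NEBULAFLOW_DISABLE_HYBRID_PARALLEL"] !== "1",
    concurrency: parsePositiveInt(env["NEBULAFLOW_DEFAULT_CONCURRENCY"]),
    llmConcurrency: parsePositiveInt(env["NEBULAFLOW_LLM_CONCURRENCY"]),
    cliConcurrency: parsePositiveInt(env["NEBULAFLOW_CLI_CONCURRENCY"]),
    // 1000 is the default stated in the comment above.
    maxIterations: parsePositiveInt(env["NEBULAFLOW_MAX_ITERATIONS"]) ?? 1000,
    allowIterationOverride: env["NEBULAFLOW_ALLOW_ITERATION_OVERRIDE"] === "1",
  };
}

const settings = readEnv({
  NEBULAFLOW_DISABLE_HYBRID_PARALLEL: "1",
  NEBULAFLOW_LLM_CONCURRENCY: "8",
});
```

In this example hybrid execution is disabled, the LLM limit is 8, and the iteration cap falls back to 1000 because the variable is not set.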
interface ParallelOptions {
  concurrency?: number
  perType?: Partial<Record<NodeType, number>>
  onError?: 'fail-fast' | 'continue-subgraph'
  seeds?: {
    outputs?: Record<string, string>
    decisions?: Record<string, 'true' | 'false'>
    variables?: Record<string, string>
  }
  pause?: { isPaused: () => boolean }
}
interface LoopConfig {
  iterations: number
  loopVariable: string
  overrideIterations?: boolean
  loopMode?: 'fixed' | 'while-variable-not-empty'
  collectionVariable?: string
  maxSafeIterations?: number
}
Monitor execution with detailed logs:
// Extension logs show:
// - Parallel step computation
// - Node execution status
// - Error handling
// - Loop iteration tracking
Track execution metrics:
Problem: Nodes not executing in parallel
Problem: Too many concurrent nodes
Problem: Infinite loop
Problem: Loop variable not accessible
- Reference the variable with the ${loopVariable} syntax
Problem: Subflow not found
Problem: Port connection issues
Speedup potential:
Resource usage:
Fixed iterations:
While loops:
Reusable subflows:
Symptoms: Workflow runs slower than expected
Causes:
Solutions:
Symptoms: Workflow never completes
Causes:
Solutions:
Symptoms: Subflow node shows error
Causes:
Solutions:
Symptoms: Cannot connect nodes to subflow
Causes:
Solutions:
Meaning: Loop nodes present in workflow
Action: Workflow will use hybrid execution automatically
Meaning: Loop hit safety limit
Action: Review loop logic or increase maxSafeIterations
Meaning: Subflow ID invalid or deleted
Action: Refresh subflow list or recreate subflow