Node Specification#
Parent document: YAML Reference. Epic: DOC-002 (YAML Reference Modularization)
Overview#
Nodes are the fundamental building blocks of YAML agents. Each node represents a discrete step in the workflow that performs an action and returns state updates.
Basic Structure#
nodes:
  - name: string         # Required: unique node identifier

    # One of the following execution methods:
    run: string          # Inline Python code
    script: string       # Alias for run (GitLab CI style)
    uses: string         # Built-in or custom action
    steps: array         # Multi-step execution

    # Navigation (optional, replaces edges section):
    goto: string | array # Next node: string for unconditional, array for conditional

    # Additional options:
    fan_in: boolean      # Mark as fan-in node for parallel flows
    with: object         # Parameters for 'uses' actions
    output: string       # Key name for action result
Execution Methods#
Method 1: Inline Python Code (run:)#
- name: process_data
  run: |
    # Full Python with state access
    data = state["input"]
    processed = data.upper()
    count = len(data.split())

    # Must return a dict to update state
    return {
        "processed": processed,
        "word_count": count
    }
Available in execution context:
- `state` - Current state dictionary
- `variables` - Global variables from the YAML `variables:` section
- `secrets` - Secrets from the configured backend (see Secrets Actions)
- `json` - Python json module
- `requests` - Auto-imported if referenced
- `datetime` - Auto-imported if referenced
- `OpenAI` - Auto-imported if referenced
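A `run:` block behaves like a function of the current state whose returned dict is merged back into it. A minimal sketch of that contract (illustrative only, not the engine's actual merge logic):

```python
def execute_run_block(state, block):
    """Illustrative model: call the block with the state, merge the returned dict."""
    updates = block(state)
    if not isinstance(updates, dict):
        raise TypeError("a run: block must return a dict of state updates")
    return {**state, **updates}

# The process_data example above, written as a plain function:
def process_data(state):
    data = state["input"]
    return {"processed": data.upper(), "word_count": len(data.split())}

new_state = execute_run_block({"input": "hello world"}, process_data)
# new_state == {"input": "hello world", "processed": "HELLO WORLD", "word_count": 2}
```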
Method 2: Script (script:)#
Alias for run:, inspired by GitLab CI:
- name: process_data
  script: |
    result = state["value"] * 2
    return {"doubled": result}
Method 2b: Lua Code#
Execute Lua code instead of Python for cross-runtime compatibility with the Rust implementation.
- name: process_lua
  run: |
    -- lua
    local result = {}
    result.value = state.value * 2
    result.message = state.name .. "!"
    return result
Complete Lua documentation: Advanced Runtimes - Lua (includes sandbox details, cross-runtime compatibility, and a portable syntax guide).
Method 2c: Prolog Code#
Execute Prolog code for neurosymbolic AI workflows combining neural network outputs with symbolic reasoning.
- name: validate
  language: prolog
  run: |
    state(value, V),
    V2 is V * 2,
    return(result, V2).
Complete Prolog documentation: Advanced Runtimes - Prolog (includes CLP(FD), module pre-loading, and a runtime comparison).
Method 3: Built-in Actions (uses:)#
- name: call_api
  uses: http.get
  with:
    url: "{{ variables.api_url }}/data"
    headers:
      Authorization: "Bearer {{ secrets.api_key }}"
  output: api_response
Parameters:
- `uses:` - Action name (see Built-in Actions)
- `with:` - Action parameters (template-processed)
- `output:` - State key for result (optional)
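As a rough illustration of the template pass over `with:` values (assumption: the actual engine uses a full Jinja2/Tera renderer, not this regex), placeholders like `{{ variables.api_url }}` are resolved from context before the action runs:

```python
import re

def render(value, context):
    """Illustrative sketch: resolve {{ dotted.path }} placeholders from a context dict."""
    def sub(match):
        obj = context
        for key in match.group(1).strip().split("."):
            obj = obj[key]
        return str(obj)
    return re.sub(r"\{\{(.*?)\}\}", sub, value)

params = {
    "url": render("{{ variables.api_url }}/data",
                  {"variables": {"api_url": "https://api.example.com"}}),
}
# params["url"] == "https://api.example.com/data"
```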
Method 4: Expression#
For simple evaluations:
- name: check_count
  run:
    type: expression
    value: len(state.get("items", [])) > 0
    output_key: has_items
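An expression node can be modeled as a one-line evaluation against the state (illustrative sketch; the real runtime sandboxes evaluation, while plain `eval()` is used here only for demonstration):

```python
def eval_expression_node(state, value, output_key):
    # Evaluate one Python expression with `state` in scope and
    # store the result under output_key (simplified, unsandboxed).
    result = eval(value, {"state": state})
    return {**state, output_key: result}

s = eval_expression_node(
    {"items": [1, 2]},
    'len(state.get("items", [])) > 0',
    "has_items",
)
# s["has_items"] is True
```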
Method 5: Multi-Step (steps:)#
GitHub Actions-style sequential steps within a node:
- name: multi_step_process
  steps:
    - name: step1
      run: |
        return {"intermediate": state["input"] + " processed"}
    - name: step2
      uses: http.post
      with:
        url: https://api.example.com/submit
        json:
          data: "{{ state.intermediate }}"
    - name: step3
      run: |
        return {"final": f"Submitted: {state['intermediate']}"}
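The step sequencing can be modeled as a simple fold over the state (illustrative sketch, not the engine's code): each step's returned dict is merged into the state seen by the next step.

```python
def run_steps(state, steps):
    # Steps execute sequentially; each update is visible to later steps.
    for step in steps:
        state = {**state, **step(state)}
    return state

# step1 and step3 from the example above, as plain functions:
result = run_steps(
    {"input": "data"},
    [
        lambda s: {"intermediate": s["input"] + " processed"},
        lambda s: {"final": f"Submitted: {s['intermediate']}"},
    ],
)
# result["final"] == "Submitted: data processed"
```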
Method 6: While-Loop#
Execute a loop body repeatedly until a condition becomes false or max iterations is reached:
- name: refine_until_valid
  type: while_loop
  condition: "not state.get('is_valid', False)"  # Jinja2/Tera expression
  max_iterations: 10                             # Required: 1-1000
  body:
    - name: generate
      uses: llm.call
      with:
        model: gpt-4o
        messages:
          - role: user
            content: "Generate valid JSON for: {{ state.prompt }}"
      output: llm_response
    - name: validate
      run: |
        import json
        try:
            parsed = json.loads(state.get('llm_response', {}).get('content', '{}'))
            return {"parsed_result": parsed, "is_valid": True}
        except json.JSONDecodeError:
            return {"is_valid": False}
Required fields:
- `condition`: Jinja2 (Python) or Tera (Rust) expression evaluated before each iteration
- `max_iterations`: Safety guard (integer 1-1000) to prevent infinite loops
- `body`: List of nodes to execute sequentially on each iteration
Behavior:
- Evaluate `condition` before each iteration
- If true, execute all body nodes sequentially
- State from each iteration passes to the next
- Loop exits when `condition` is false or `max_iterations` is reached
- Final state passes to downstream nodes
Safety guards:
- `max_iterations` is required: YAML parsing fails if missing
- Range must be 1-1000 (validation error otherwise)
- Nested while-loops are NOT supported (validation error if attempted)
- Body execution errors propagate immediately (no automatic retry)
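The loop semantics can be sketched in plain Python (an illustrative model under the rules above, not the engine's implementation):

```python
def run_while_loop(state, condition, body, max_iterations):
    # Safety guard mirrors the YAML validation rule.
    if not 1 <= max_iterations <= 1000:
        raise ValueError("max_iterations must be in 1-1000")
    for _ in range(max_iterations):
        # Condition is evaluated before each iteration.
        if not condition(state):
            return state, "condition_false"
        # Body nodes run sequentially; each iteration's state feeds the next.
        for node in body:
            state = {**state, **node(state)}
    return state, "max_iterations_reached"

# A simple counter: increment count and accumulate the running sum.
final, reason = run_while_loop(
    {"count": 0, "sum": 0},
    condition=lambda s: s["count"] < 5,
    body=[lambda s: {"count": s["count"] + 1, "sum": s["sum"] + s["count"] + 1}],
    max_iterations=10,
)
# final == {"count": 5, "sum": 15}, reason == "condition_false"
```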
Events emitted:
`exit_reason` is either `"condition_false"` or `"max_iterations_reached"`.
Use cases:
LLM refinement until output passes validation
Data extraction with retry until all fields populated
Research agents that continue until sufficient sources found
Cross-runtime parity: The while-loop syntax works identically in both Python and Rust TEA implementations. The same YAML file produces identical results in both runtimes.
Simple counter example:
name: counter-demo

nodes:
  - name: count_loop
    type: while_loop
    condition: "state.count < 5"
    max_iterations: 10
    body:
      - name: increment
        run: |
          -- lua
          local count = state.count or 0
          local sum = state.sum or 0
          return { count = count + 1, sum = sum + count + 1 }

edges:
  - from: __start__
    to: count_loop
  - from: count_loop
    to: __end__
Result with initial state `{count: 0, sum: 0}`:

- Loop runs 5 iterations (count goes 0→1→2→3→4→5)
- Final state: `{count: 5, sum: 15}`
- Exit reason: `condition_false` (count is no longer < 5)
Method 7: Dynamic Parallel Fan-Out#
Execute branches in parallel over a runtime-evaluated collection, with built-in fan-in:
- name: process_items
  type: dynamic_parallel
  items: "{{ state.urls }}"  # Jinja2 expression → list at runtime
  item_var: url              # Variable name for each item (default: "item")
  index_var: idx             # Variable name for index (default: "index")
  max_concurrency: 5         # Optional: throttle parallel execution
  fail_fast: true            # Optional: cancel remaining on first failure
  action:                    # Option A: single action per item
    uses: http.get
    with:
      url: "{{ url }}"
    output: response
  output: all_responses      # Results collected here
Three execution modes (mutually exclusive):

- Action mode (`action:`): Execute a single action per item
- Steps mode (`steps:`): Execute sequential steps per item
- Subgraph mode (`subgraph:`): Load and execute an external YAML file per item
Required fields:
- `items`: Jinja2 expression that evaluates to an iterable at runtime
- One of: `action:`, `steps:`, or `subgraph:`
Optional fields:
- `item_var`: Name of the variable holding each item (default: `"item"`)
- `index_var`: Name of the variable holding the 0-based index (default: `"index"`)
- `max_concurrency`: Maximum parallel branches (default: unlimited)
- `fail_fast`: Stop remaining branches on first error (default: `false`)
- `output`: State key to store collected results (default: `"parallel_results"`)
Action mode example:
- name: fetch_all_urls
  type: dynamic_parallel
  items: "{{ state.urls }}"
  item_var: url
  max_concurrency: 10
  action:
    uses: http.get
    with:
      url: "{{ url }}"
    output: response
  output: responses
Steps mode example:
- name: process_documents
  type: dynamic_parallel
  items: "{{ state.documents }}"
  item_var: doc
  index_var: i
  steps:
    - name: extract
      uses: llm.call
      with:
        model: gpt-4o
        messages:
          - role: user
            content: "Extract key points from: {{ doc.content }}"
      output: extraction
    - name: summarize
      uses: llm.call
      with:
        model: gpt-4o
        messages:
          - role: user
            content: "Summarize: {{ state.extraction.content }}"
      output: summary
  output: processed_docs
Subgraph mode example:
- name: run_analysis_per_item
  type: dynamic_parallel
  items: "{{ state.data_sources }}"
  item_var: source
  max_concurrency: 3
  fail_fast: true
  subgraph: "./analysis-workflow.yaml"  # Supports local, s3://, gs://, az://, http://
  input:
    data_source: "{{ source }}"
    config: "{{ state.analysis_config }}"
  output: analysis_results
Behavior:
- Evaluate the `items` expression to get the collection
- For each item, spawn a parallel branch with `item_var` and `index_var` injected
- Execute branches concurrently (throttled by `max_concurrency` if set)
- Collect results from all branches into `output` as a list of `ParallelFlowResult`
- If `fail_fast: true`, cancel remaining branches on first failure
Result format:
Each result in the output list is a ParallelFlowResult with:
- `state`: The final state from that branch
- `source_node`: The dynamic_parallel node name
- `index`: The branch index (0-based)
Safety guards:
- `items` must evaluate to a list/iterable (runtime error otherwise)
- An empty `items` list results in empty output (no branches executed)
- `max_concurrency` must be a positive integer if specified
- Branch errors are captured in results unless `fail_fast` cancels execution
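The fan-out/fan-in mechanics can be approximated in plain Python with a thread pool (an illustrative sketch assuming dict-based states; `fail_fast` handling and the full `ParallelFlowResult` shape are omitted for brevity):

```python
from concurrent.futures import ThreadPoolExecutor

def dynamic_parallel(state, items, branch, item_var="item", index_var="index",
                     max_concurrency=None):
    # One branch per item; the item and its 0-based index are injected
    # into a copy of the state seen by that branch.
    def run_branch(pair):
        idx, item = pair
        branch_state = {**state, item_var: item, index_var: idx}
        return {"state": branch(branch_state), "index": idx}

    if not items:
        return []  # empty items -> no branches executed, empty output
    with ThreadPoolExecutor(max_workers=max_concurrency or len(items)) as pool:
        return list(pool.map(run_branch, enumerate(items)))

results = dynamic_parallel(
    {"prefix": ">"},
    ["a", "b", "c"],
    branch=lambda s: {**s, "out": s["prefix"] + s["item"].upper()},
    max_concurrency=2,
)
# [r["state"]["out"] for r in results] == [">A", ">B", ">C"]
```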
Use cases:
Process a batch of URLs, files, or API endpoints in parallel
Run the same analysis workflow on multiple data sources
Fan-out LLM calls across a list of prompts with rate limiting
Parallel document processing pipelines with controlled concurrency
Comparison with static parallel edges:

| Feature | Static Parallel (edges) | Dynamic Parallel |
|---|---|---|
| Branch count | Fixed at YAML parse time | Determined at runtime |
| Fan-in | Explicit fan-in node | Built-in, automatic |
| Item iteration | Manual | Automatic with `item_var`/`index_var` |
| Concurrency control | None | `max_concurrency` + `fail_fast` |
| Subgraph loading | Not supported | Supported with fsspec |
Fan-In Nodes#
For collecting results from parallel flows:
- name: aggregate
  fan_in: true
  run: |
    # parallel_results contains a list of states from the parallel flows
    all_data = [r.get("data") for r in parallel_results]
    combined = "\n".join(all_data)
    return {"combined_results": combined}
Fan-in nodes are used with parallel edges to collect and combine results from multiple concurrent execution paths.
See Also#
Navigation & Flow - Routing between nodes
Template Syntax - Variable interpolation
Advanced Runtimes - Lua and Prolog details
Actions Overview - Built-in action reference