Documentation
¶
Overview ¶
Package pipeline provides workflow execution for AI agent orchestration. conditions.go implements conditional expression evaluation for workflow steps.
Package pipeline provides workflow execution for AI agent orchestration. executor.go implements the core execution engine for running workflows.
Package pipeline provides workflow execution for AI agent orchestration. loops.go implements loop constructs for workflow steps: for-each, while, and times loops.
Package pipeline provides workflow execution for AI agent orchestration. notify.go implements notifications for pipeline events.
Package pipeline provides workflow execution for AI agent orchestration. robot.go implements the --robot-pipeline-* APIs for machine-readable output.
Package pipeline provides workflow execution for AI agent orchestration. variables.go implements variable substitution and output parsing for workflows.
Index ¶
- Constants
- Variables
- func CancelPipeline(runID string)
- func CleanupStates(projectDir string, olderThan time.Duration) (int, error)
- func ClearLoopVars(state *ExecutionState, varName string)
- func ClearPipelineRegistry()
- func EvaluateCondition(condition string, sub *Substitutor) (skip bool, err error)
- func Execute(ctx context.Context, p Pipeline) error
- func GenerateRunID() string
- func IsValidAgentType(t string) bool
- func LoadAndValidate(path string) (*Workflow, ValidationResult, error)
- func NormalizeAgentType(t string) string
- func ParseBool(s string) bool
- func ParseInt(s string, defaultVal int) int
- func ParsePipelineVars(varsJSON string) (map[string]interface{}, error)
- func PrintPipelineCancel(runID string) int
- func PrintPipelineList() int
- func PrintPipelineRun(opts PipelineRunOptions) int
- func PrintPipelineStatus(runID string) int
- func RegisterPipeline(exec *PipelineExecution)
- func SaveState(projectDir string, state *ExecutionState) error
- func SetLoopVars(state *ExecutionState, varName string, item interface{}, index, total int)
- func ShouldNotify(settings WorkflowSettings, event NotificationEvent) bool
- func StoreStepOutput(state *ExecutionState, stepID string, output string, parsedData interface{})
- func UpdatePipelineFromState(runID string, state *ExecutionState)
- func ValidateCondition(condition string) []string
- func ValidateVarRefs(template string, availableVars []string) []string
- type ConditionEvaluator
- type ConditionResult
- type DependencyError
- type DependencyGraph
- func (g *DependencyGraph) ExecutedCount() int
- func (g *DependencyGraph) GetDependencies(id string) []string
- func (g *DependencyGraph) GetDependents(id string) []string
- func (g *DependencyGraph) GetFailedDependencies(id string) []string
- func (g *DependencyGraph) GetReadySteps() []string
- func (g *DependencyGraph) GetStep(id string) (*Step, bool)
- func (g *DependencyGraph) HasFailedDependency(id string) bool
- func (g *DependencyGraph) IsExecuted(id string) bool
- func (g *DependencyGraph) IsFailed(id string) bool
- func (g *DependencyGraph) MarkExecuted(id string) error
- func (g *DependencyGraph) MarkFailed(id string) error
- func (g *DependencyGraph) Resolve() ExecutionPlan
- func (g *DependencyGraph) Size() int
- func (g *DependencyGraph) Validate() []DependencyError
- type Duration
- type ErrLoopBreak
- type ErrLoopContinue
- type ErrMaxIterations
- type ErrorAction
- type ExecutionError
- type ExecutionPlan
- type ExecutionState
- type ExecutionStatus
- type Executor
- func (e *Executor) Cancel()
- func (e *Executor) GetState() *ExecutionState
- func (e *Executor) Resume(ctx context.Context, workflow *Workflow, prior *ExecutionState, ...) (*ExecutionState, error)
- func (e *Executor) Run(ctx context.Context, workflow *Workflow, vars map[string]interface{}, ...) (*ExecutionState, error)
- func (e *Executor) SetNotifier(n *Notifier)
- func (e *Executor) Validate(workflow *Workflow) ValidationResult
- type ExecutorConfig
- type LoopConfig
- type LoopContext
- type LoopControl
- type LoopExecutor
- type LoopResult
- type NotificationChannel
- type NotificationEvent
- type NotificationPayload
- type Notifier
- type NotifierConfig
- type OutputParse
- type OutputParser
- type ParallelGroupResult
- type ParseError
- type Pipeline
- type PipelineCancelOutput
- type PipelineExecution
- type PipelineHints
- type PipelineListOutput
- type PipelineProgress
- type PipelineRunOptions
- type PipelineRunOutput
- type PipelineStatusOutput
- type PipelineStep
- type PipelineSummary
- type ProgressEvent
- type RobotResponse
- type RoutingStrategy
- type Stage
- type Step
- type StepError
- type StepResult
- type SubstitutionError
- type Substitutor
- type ValidationResult
- type VarDef
- type VarType
- type VariableContext
- type WaitCondition
- type Workflow
- type WorkflowSettings
Constants ¶
const ( ErrCodeInvalidFlag = "INVALID_FLAG" ErrCodeSessionNotFound = "SESSION_NOT_FOUND" ErrCodeInternalError = "INTERNAL_ERROR" )
Robot error codes
const DefaultMaxIterations = 100
DefaultMaxIterations is the default safety limit for loops
const MinProgressInterval = 100 * time.Millisecond
MinProgressInterval is the minimum allowed progress interval to prevent ticker panics. time.NewTicker requires a positive duration.
const SchemaVersion = "2.0"
SchemaVersion is the current workflow schema version
Variables ¶
var AgentTypeAliases = map[string]string{
"claude": "claude",
"cc": "claude",
"claude-code": "claude",
"codex": "codex",
"cod": "codex",
"openai": "codex",
"gemini": "gemini",
"gmi": "gemini",
"google": "gemini",
}
AgentTypeAliases maps various agent type names to canonical forms
Functions ¶
func CancelPipeline ¶
func CancelPipeline(runID string)
CancelPipeline cancels a running pipeline by run ID (exported for REST API)
func CleanupStates ¶
CleanupStates removes pipeline state files older than the provided duration. Returns the number of deleted state files.
func ClearLoopVars ¶
func ClearLoopVars(state *ExecutionState, varName string)
ClearLoopVars removes loop context variables from execution state.
func ClearPipelineRegistry ¶
func ClearPipelineRegistry()
ClearPipelineRegistry clears the pipeline registry (for testing)
func EvaluateCondition ¶
func EvaluateCondition(condition string, sub *Substitutor) (skip bool, err error)
EvaluateCondition is a convenience function that evaluates a condition string using a substitutor. Returns true if the step should be SKIPPED. This maintains backward compatibility with the original evaluateCondition API.
func GenerateRunID ¶
func GenerateRunID() string
GenerateRunID creates a unique run ID using timestamp and random bytes (exported)
func IsValidAgentType ¶
IsValidAgentType checks if the given agent type is recognized. Case-insensitive: "Claude", "CLAUDE", "claude" are all valid.
func LoadAndValidate ¶
func LoadAndValidate(path string) (*Workflow, ValidationResult, error)
LoadAndValidate is a convenience function that parses and validates a workflow file
func NormalizeAgentType ¶
NormalizeAgentType converts agent type aliases to canonical form. Case-insensitive: "Claude", "CLAUDE", "claude" all normalize to "claude".
func ParsePipelineVars ¶
ParsePipelineVars parses JSON variable string into map
func PrintPipelineCancel ¶
PrintPipelineCancel cancels a running pipeline
func PrintPipelineList ¶
func PrintPipelineList() int
PrintPipelineList outputs all tracked pipelines
func PrintPipelineRun ¶
func PrintPipelineRun(opts PipelineRunOptions) int
PrintPipelineRun starts a pipeline and returns status
func PrintPipelineStatus ¶
PrintPipelineStatus outputs the status of a running/completed pipeline
func RegisterPipeline ¶
func RegisterPipeline(exec *PipelineExecution)
RegisterPipeline registers a pipeline execution (exported for CLI)
func SaveState ¶
func SaveState(projectDir string, state *ExecutionState) error
SaveState persists the execution state to .ntm/pipelines/<run-id>.json.
func SetLoopVars ¶
func SetLoopVars(state *ExecutionState, varName string, item interface{}, index, total int)
SetLoopVars sets loop context variables in the execution state.
func ShouldNotify ¶
func ShouldNotify(settings WorkflowSettings, event NotificationEvent) bool
ShouldNotify checks if a notification should be sent for the given event.
func StoreStepOutput ¶
func StoreStepOutput(state *ExecutionState, stepID string, output string, parsedData interface{})
StoreStepOutput stores a step's output in the execution state for variable access. Note: Caller must hold any necessary locks on state.Variables if used concurrently.
func UpdatePipelineFromState ¶
func UpdatePipelineFromState(runID string, state *ExecutionState)
UpdatePipelineFromState updates a registered pipeline from execution state (exported for CLI)
func ValidateCondition ¶
ValidateCondition checks if a condition expression has valid syntax. Returns a list of issues found (empty if valid).
func ValidateVarRefs ¶
ValidateVarRefs validates that all variable references in a string are valid. Returns a list of invalid references.
Types ¶
type ConditionEvaluator ¶
type ConditionEvaluator struct {
// contains filtered or unexported fields
}
ConditionEvaluator evaluates conditional expressions for workflow steps. It supports boolean, equality, comparison, contains, and logical operators.
func NewConditionEvaluator ¶
func NewConditionEvaluator(sub *Substitutor) *ConditionEvaluator
NewConditionEvaluator creates a new condition evaluator with the given substitutor.
func (*ConditionEvaluator) Evaluate ¶
func (e *ConditionEvaluator) Evaluate(condition string) (ConditionResult, error)
Evaluate evaluates a condition expression. Returns ConditionResult with Value=true if the condition is met (step should run), Value=false if condition is not met (step should be skipped).
type ConditionResult ¶
type ConditionResult struct {
Value bool // True if condition is met (step should RUN)
Skip bool // True if step should be skipped (inverse of Value)
Reason string // Human-readable explanation
}
ConditionResult contains the result of condition evaluation.
type DependencyError ¶
type DependencyError struct {
Type string `json:"type"` // cycle, missing_dep, unreachable
Steps []string `json:"steps"` // affected step IDs
Message string `json:"message"`
}
DependencyError represents an error in the dependency graph
func (DependencyError) Error ¶
func (e DependencyError) Error() string
type DependencyGraph ¶
type DependencyGraph struct {
// contains filtered or unexported fields
}
DependencyGraph represents step dependencies for execution ordering
func NewDependencyGraph ¶
func NewDependencyGraph(workflow *Workflow) *DependencyGraph
NewDependencyGraph creates a dependency graph from workflow steps
func (*DependencyGraph) ExecutedCount ¶
func (g *DependencyGraph) ExecutedCount() int
ExecutedCount returns the number of executed steps.
func (*DependencyGraph) GetDependencies ¶
func (g *DependencyGraph) GetDependencies(id string) []string
GetDependencies returns the dependencies for a step
func (*DependencyGraph) GetDependents ¶
func (g *DependencyGraph) GetDependents(id string) []string
GetDependents returns steps that depend on the given step
func (*DependencyGraph) GetFailedDependencies ¶
func (g *DependencyGraph) GetFailedDependencies(id string) []string
GetFailedDependencies returns the list of failed dependencies for a step
func (*DependencyGraph) GetReadySteps ¶
func (g *DependencyGraph) GetReadySteps() []string
GetReadySteps returns steps that are ready to execute
func (*DependencyGraph) GetStep ¶
func (g *DependencyGraph) GetStep(id string) (*Step, bool)
GetStep returns a step by ID
func (*DependencyGraph) HasFailedDependency ¶
func (g *DependencyGraph) HasFailedDependency(id string) bool
HasFailedDependency returns true if any of the step's dependencies failed
func (*DependencyGraph) IsExecuted ¶
func (g *DependencyGraph) IsExecuted(id string) bool
IsExecuted returns whether a step has been executed
func (*DependencyGraph) IsFailed ¶
func (g *DependencyGraph) IsFailed(id string) bool
IsFailed returns whether a step has failed
func (*DependencyGraph) MarkExecuted ¶
func (g *DependencyGraph) MarkExecuted(id string) error
MarkExecuted marks a step as executed
func (*DependencyGraph) MarkFailed ¶
func (g *DependencyGraph) MarkFailed(id string) error
MarkFailed marks a step as failed (for CONTINUE mode dependency tracking)
func (*DependencyGraph) Resolve ¶
func (g *DependencyGraph) Resolve() ExecutionPlan
Resolve performs topological sort and returns execution plan
func (*DependencyGraph) Size ¶
func (g *DependencyGraph) Size() int
Size returns the number of steps in the graph
func (*DependencyGraph) Validate ¶
func (g *DependencyGraph) Validate() []DependencyError
Validate checks the dependency graph for errors
type Duration ¶
Duration is a wrapper for time.Duration that supports YAML/TOML/JSON parsing
func DefaultStepTimeout ¶
func DefaultStepTimeout() Duration
DefaultStepTimeout returns the default timeout for a step
func (Duration) MarshalText ¶
MarshalText implements encoding.TextMarshaler for Duration
func (*Duration) UnmarshalText ¶
UnmarshalText implements encoding.TextUnmarshaler for Duration
type ErrLoopBreak ¶
type ErrLoopBreak struct {
Reason string
}
ErrLoopBreak is returned when a loop is exited via break control.
func (*ErrLoopBreak) Error ¶
func (e *ErrLoopBreak) Error() string
type ErrLoopContinue ¶
type ErrLoopContinue struct{}
ErrLoopContinue is returned when an iteration should be skipped.
func (*ErrLoopContinue) Error ¶
func (e *ErrLoopContinue) Error() string
type ErrMaxIterations ¶
type ErrMaxIterations struct {
Limit int
}
ErrMaxIterations is returned when the max iterations limit is reached.
func (*ErrMaxIterations) Error ¶
func (e *ErrMaxIterations) Error() string
type ErrorAction ¶
type ErrorAction string
ErrorAction defines how to handle step errors
const ( ErrorActionFail ErrorAction = "fail" // Wait for all, report all errors ErrorActionFailFast ErrorAction = "fail_fast" // Cancel remaining on first error ErrorActionContinue ErrorAction = "continue" // Ignore errors, continue workflow ErrorActionRetry ErrorAction = "retry" // Retry failed steps )
type ExecutionError ¶
type ExecutionError struct {
StepID string `json:"step_id,omitempty"`
Type string `json:"type"`
Message string `json:"message"`
Timestamp time.Time `json:"timestamp"`
Fatal bool `json:"fatal"`
}
ExecutionError represents an error that occurred during execution
type ExecutionPlan ¶
type ExecutionPlan struct {
Order []string `json:"order"` // Step IDs in execution order
Levels [][]string `json:"levels"` // Parallelizable levels
Errors []DependencyError `json:"errors,omitempty"`
Valid bool `json:"valid"`
}
ExecutionPlan contains the resolved execution order
func ResolveWorkflow ¶
func ResolveWorkflow(workflow *Workflow) ExecutionPlan
ResolveWorkflow is a convenience function to create a graph and resolve it
type ExecutionState ¶
type ExecutionState struct {
RunID string `json:"run_id"`
WorkflowID string `json:"workflow_id"`
WorkflowFile string `json:"workflow_file,omitempty"`
Session string `json:"session,omitempty"`
Status ExecutionStatus `json:"status"`
StartedAt time.Time `json:"started_at"`
UpdatedAt time.Time `json:"updated_at"`
FinishedAt time.Time `json:"finished_at,omitempty"`
CurrentStep string `json:"current_step,omitempty"`
Steps map[string]StepResult `json:"steps"`
Variables map[string]interface{} `json:"variables"` // Runtime variables including step outputs
Errors []ExecutionError `json:"errors,omitempty"`
}
ExecutionState contains the complete state of a workflow execution
func LoadState ¶
func LoadState(projectDir, runID string) (*ExecutionState, error)
LoadState loads execution state from .ntm/pipelines/<run-id>.json.
type ExecutionStatus ¶
type ExecutionStatus string
ExecutionStatus represents the current state of workflow execution
const ( StatusPending ExecutionStatus = "pending" StatusRunning ExecutionStatus = "running" StatusPaused ExecutionStatus = "paused" StatusCompleted ExecutionStatus = "completed" StatusFailed ExecutionStatus = "failed" StatusCancelled ExecutionStatus = "cancelled" StatusSkipped ExecutionStatus = "skipped" )
type Executor ¶
type Executor struct {
// contains filtered or unexported fields
}
Executor runs workflows with full orchestration support
func NewExecutor ¶
func NewExecutor(config ExecutorConfig) *Executor
NewExecutor creates a new workflow executor
func (*Executor) GetState ¶
func (e *Executor) GetState() *ExecutionState
GetState returns the current execution state (for monitoring)
func (*Executor) Resume ¶
func (e *Executor) Resume(ctx context.Context, workflow *Workflow, prior *ExecutionState, progress chan<- ProgressEvent) (*ExecutionState, error)
Resume continues execution from a previously persisted state.
func (*Executor) Run ¶
func (e *Executor) Run(ctx context.Context, workflow *Workflow, vars map[string]interface{}, progress chan<- ProgressEvent) (*ExecutionState, error)
Run executes a workflow with the given initial variables. Returns the final execution state and any fatal error. Progress events are sent to the provided channel if non-nil.
func (*Executor) SetNotifier ¶
SetNotifier sets the notifier for sending notifications on workflow events.
func (*Executor) Validate ¶
func (e *Executor) Validate(workflow *Workflow) ValidationResult
Validate validates a workflow without executing it
type ExecutorConfig ¶
type ExecutorConfig struct {
Session string // Required: tmux session name
ProjectDir string // Optional: project root for .ntm state
WorkflowFile string // Optional: workflow file path for state persistence
DefaultTimeout time.Duration // Default step timeout (default: 5m)
GlobalTimeout time.Duration // Maximum workflow runtime (default: 30m)
ProgressInterval time.Duration // Interval for progress updates (default: 1s)
DryRun bool // If true, validate but don't execute
Verbose bool // Enable verbose logging
RunID string // Optional: pre-generated run ID (if empty, one is generated)
}
ExecutorConfig configures the executor behavior
func DefaultExecutorConfig ¶
func DefaultExecutorConfig(session string) ExecutorConfig
DefaultExecutorConfig returns sensible defaults
type LoopConfig ¶
type LoopConfig struct {
// For-each loop: iterate over array
Items string `yaml:"items,omitempty" toml:"items,omitempty" json:"items,omitempty"` // Expression for array (e.g., ${vars.files})
As string `yaml:"as,omitempty" toml:"as,omitempty" json:"as,omitempty"` // Loop variable name (default: "item")
// While loop: repeat until condition is false
While string `yaml:"while,omitempty" toml:"while,omitempty" json:"while,omitempty"` // Condition expression
// Times loop: repeat N times
Times int `yaml:"times,omitempty" toml:"times,omitempty" json:"times,omitempty"` // Number of iterations
// Safety and timing
MaxIterations int `yaml:"max_iterations,omitempty" toml:"max_iterations,omitempty" json:"max_iterations,omitempty"` // Safety limit (default: 100, required for while loops)
Delay Duration `yaml:"delay,omitempty" toml:"delay,omitempty" json:"delay,omitempty"` // Delay between iterations
// Result collection
Collect string `yaml:"collect,omitempty" toml:"collect,omitempty" json:"collect,omitempty"` // Variable name to store array of results
// Steps to execute per iteration
Steps []Step `yaml:"steps,omitempty" toml:"steps,omitempty" json:"steps,omitempty"`
}
LoopConfig defines loop iteration settings for for-each, while, and times loops
type LoopContext ¶
type LoopContext struct {
VarName string // The "as" variable name
Item interface{} // Current item value
Index int // 0-based iteration index
Count int // Total number of items
First bool // True if first iteration
Last bool // True if last iteration
}
LoopContext holds the current state of loop iteration
type LoopControl ¶
type LoopControl string
LoopControl defines special control flow within loops
const ( LoopControlNone LoopControl = "" // Normal execution LoopControlBreak LoopControl = "break" // Exit loop early LoopControlContinue LoopControl = "continue" // Skip to next iteration )
type LoopExecutor ¶
type LoopExecutor struct {
// contains filtered or unexported fields
}
LoopExecutor handles execution of loop constructs within workflows.
func NewLoopExecutor ¶
func NewLoopExecutor(executor *Executor) *LoopExecutor
NewLoopExecutor creates a new loop executor for the given workflow executor.
func (*LoopExecutor) ExecuteLoop ¶
func (le *LoopExecutor) ExecuteLoop(ctx context.Context, step *Step, workflow *Workflow) LoopResult
ExecuteLoop executes a loop step and returns the aggregated result.
type LoopResult ¶
type LoopResult struct {
Status ExecutionStatus
Iterations int
Results []StepResult // Individual iteration results
Collected []interface{} // Collected outputs if Collect is specified
Error *StepError
BreakReason string // Non-empty if loop exited via break
FinishedAt time.Time
}
LoopResult contains the result of loop execution.
type NotificationChannel ¶
type NotificationChannel string
NotificationChannel represents a channel for sending notifications.
const ( ChannelDesktop NotificationChannel = "desktop" ChannelWebhook NotificationChannel = "webhook" ChannelMail NotificationChannel = "mail" )
type NotificationEvent ¶
type NotificationEvent string
NotificationEvent represents a pipeline event to notify about.
const ( NotifyStarted NotificationEvent = "started" NotifyCompleted NotificationEvent = "completed" NotifyFailed NotificationEvent = "failed" NotifyCancelled NotificationEvent = "cancelled" NotifyStepError NotificationEvent = "step_error" )
type NotificationPayload ¶
type NotificationPayload struct {
Event NotificationEvent `json:"event"`
WorkflowName string `json:"workflow_name"`
RunID string `json:"run_id"`
Session string `json:"session,omitempty"`
Status ExecutionStatus `json:"status"`
Duration time.Duration `json:"duration,omitempty"`
StepsTotal int `json:"steps_total"`
StepsDone int `json:"steps_done"`
StepsFailed int `json:"steps_failed"`
Error string `json:"error,omitempty"`
FailedStep string `json:"failed_step,omitempty"`
Timestamp time.Time `json:"timestamp"`
}
NotificationPayload contains data for a notification.
func BuildPayloadFromState ¶
func BuildPayloadFromState(state *ExecutionState, workflow *Workflow, event NotificationEvent) NotificationPayload
BuildPayloadFromState creates a NotificationPayload from execution state.
type Notifier ¶
type Notifier struct {
// contains filtered or unexported fields
}
Notifier sends notifications for pipeline events.
func NewNotifier ¶
func NewNotifier(cfg NotifierConfig) *Notifier
NewNotifier creates a new notifier with the given configuration.
func NewNotifierFromSettings ¶
func NewNotifierFromSettings(settings WorkflowSettings, mailClient *agentmail.Client, projectKey, agentName string) *Notifier
NewNotifierFromSettings creates a notifier from workflow settings.
type NotifierConfig ¶
type NotifierConfig struct {
Channels []string
WebhookURL string
MailRecipient string
MailClient *agentmail.Client
ProjectKey string
AgentName string
}
NotifierConfig configures the notifier.
type OutputParse ¶
type OutputParse struct {
Type string `yaml:"type,omitempty" toml:"type,omitempty" json:"type,omitempty"` // none, json, yaml, lines, first_line, regex
Pattern string `yaml:"pattern,omitempty" toml:"pattern,omitempty" json:"pattern,omitempty"` // For regex type
}
OutputParse defines how to parse step output
func (*OutputParse) UnmarshalText ¶
func (o *OutputParse) UnmarshalText(text []byte) error
UnmarshalText allows OutputParse to be specified as a simple string
type OutputParser ¶
type OutputParser struct{}
OutputParser handles parsing of step outputs into structured data.
func NewOutputParser ¶
func NewOutputParser() *OutputParser
NewOutputParser creates a new output parser.
func (*OutputParser) Parse ¶
func (p *OutputParser) Parse(output string, config OutputParse) (interface{}, error)
Parse parses output according to the parse configuration.
type ParallelGroupResult ¶
type ParallelGroupResult struct {
Completed []StepResult `json:"completed"`
Failed []StepResult `json:"failed,omitempty"`
Partial bool `json:"partial"` // Some succeeded, some failed
}
ParallelGroupResult contains results from a parallel execution group
type ParseError ¶
type ParseError struct {
File string `json:"file,omitempty"`
Line int `json:"line,omitempty"`
Field string `json:"field,omitempty"`
Message string `json:"message"`
Hint string `json:"hint,omitempty"`
}
ParseError represents a validation or parsing error with location info
func (ParseError) Error ¶
func (e ParseError) Error() string
type PipelineCancelOutput ¶
type PipelineCancelOutput struct {
RobotResponse
RunID string `json:"run_id"`
Status string `json:"status"`
Message string `json:"message,omitempty"`
}
PipelineCancelOutput is the response for --robot-pipeline-cancel
type PipelineExecution ¶
type PipelineExecution struct {
RunID string `json:"run_id"`
WorkflowID string `json:"workflow_id"`
Session string `json:"session"`
Status string `json:"status"`
StartedAt time.Time `json:"started_at"`
FinishedAt *time.Time `json:"finished_at,omitempty"`
CurrentStep string `json:"current_step,omitempty"`
Progress PipelineProgress `json:"progress"`
Steps map[string]PipelineStep `json:"steps"`
Error string `json:"error,omitempty"`
// contains filtered or unexported fields
}
PipelineExecution tracks a running pipeline
func GetAllPipelines ¶
func GetAllPipelines() []*PipelineExecution
GetAllPipelines returns all tracked pipelines (exported for CLI)
func GetPipelineExecution ¶
func GetPipelineExecution(runID string) *PipelineExecution
GetPipelineExecution returns a pipeline by run ID (exported for CLI)
type PipelineHints ¶
type PipelineHints struct {
Summary string `json:"summary"`
NextAction string `json:"next_action,omitempty"`
StatusCmd string `json:"status_cmd,omitempty"`
CancelCmd string `json:"cancel_cmd,omitempty"`
Suggestions []string `json:"suggestions,omitempty"`
}
PipelineHints provides guidance for AI agents
type PipelineListOutput ¶
type PipelineListOutput struct {
RobotResponse
Pipelines []PipelineSummary `json:"pipelines"`
AgentHints *PipelineHints `json:"_agent_hints,omitempty"`
}
PipelineListOutput is the response for --robot-pipeline-list
type PipelineProgress ¶
type PipelineProgress struct {
Completed int `json:"completed"`
Running int `json:"running"`
Pending int `json:"pending"`
Failed int `json:"failed"`
Skipped int `json:"skipped"`
Total int `json:"total"`
Percent float64 `json:"percent"`
}
PipelineProgress tracks overall progress
type PipelineRunOptions ¶
type PipelineRunOptions struct {
WorkflowFile string // Path to workflow YAML/TOML file
Session string // Tmux session name
Variables map[string]interface{} // Runtime variables
DryRun bool // Validate without executing
Background bool // Run in background
}
PipelineRunOptions configures a pipeline run
type PipelineRunOutput ¶
type PipelineRunOutput struct {
RobotResponse
RunID string `json:"run_id"`
WorkflowID string `json:"workflow_id"`
Session string `json:"session"`
Status string `json:"status"`
DryRun bool `json:"dry_run,omitempty"`
Progress PipelineProgress `json:"progress,omitempty"`
AgentHints *PipelineHints `json:"_agent_hints,omitempty"`
}
PipelineRunOutput is the response for --robot-pipeline-run
type PipelineStatusOutput ¶
type PipelineStatusOutput struct {
RobotResponse
RunID string `json:"run_id"`
WorkflowID string `json:"workflow_id"`
Session string `json:"session"`
Status string `json:"status"`
StartedAt string `json:"started_at"`
FinishedAt string `json:"finished_at,omitempty"`
DurationMs int64 `json:"duration_ms,omitempty"`
CurrentStep string `json:"current_step,omitempty"`
Progress PipelineProgress `json:"progress"`
Steps map[string]PipelineStep `json:"steps"`
Error string `json:"error,omitempty"`
AgentHints *PipelineHints `json:"_agent_hints,omitempty"`
}
PipelineStatusOutput is the response for --robot-pipeline=run-id
type PipelineStep ¶
type PipelineStep struct {
ID string `json:"id"`
Status string `json:"status"`
Agent string `json:"agent,omitempty"`
PaneUsed string `json:"pane_used,omitempty"`
StartedAt string `json:"started_at,omitempty"`
FinishedAt string `json:"finished_at,omitempty"`
DurationMs int64 `json:"duration_ms,omitempty"`
OutputLines int `json:"output_lines,omitempty"`
Error string `json:"error,omitempty"`
}
PipelineStep represents step status in pipeline output
type PipelineSummary ¶
type PipelineSummary struct {
RunID string `json:"run_id"`
WorkflowID string `json:"workflow_id"`
Session string `json:"session"`
Status string `json:"status"`
StartedAt string `json:"started_at"`
FinishedAt string `json:"finished_at,omitempty"`
Progress PipelineProgress `json:"progress"`
}
PipelineSummary is a brief summary for listing
type ProgressEvent ¶
type ProgressEvent struct {
Type string `json:"type"` // step_start, step_complete, step_error, parallel_start, workflow_complete
StepID string `json:"step_id,omitempty"`
Message string `json:"message"`
Progress float64 `json:"progress"` // 0.0 - 1.0
Timestamp time.Time `json:"timestamp"`
}
ProgressEvent is emitted during workflow execution for monitoring
type RobotResponse ¶
type RobotResponse struct {
Success bool `json:"success"`
Timestamp string `json:"timestamp"`
Error string `json:"error,omitempty"`
ErrorCode string `json:"error_code,omitempty"`
Hint string `json:"hint,omitempty"`
}
RobotResponse is the base structure for robot command outputs
func NewErrorResponse ¶
func NewErrorResponse(err error, code string, hint string) RobotResponse
NewErrorResponse creates an error robot response
func NewRobotResponse ¶
func NewRobotResponse(success bool) RobotResponse
NewRobotResponse creates a new robot response
type RoutingStrategy ¶
type RoutingStrategy string
RoutingStrategy defines how to select an agent for a step
const ( RouteLeastLoaded RoutingStrategy = "least-loaded" RouteFirstAvailable RoutingStrategy = "first-available" RouteRoundRobin RoutingStrategy = "round-robin" )
type Step ¶
type Step struct {
// Identity
ID string `yaml:"id" toml:"id" json:"id"` // Required, unique identifier
Name string `yaml:"name,omitempty" toml:"name,omitempty" json:"name,omitempty"` // Human-readable name
// Agent selection (choose one)
Agent string `yaml:"agent,omitempty" toml:"agent,omitempty" json:"agent,omitempty"` // Agent type: claude, codex, gemini
Pane int `yaml:"pane,omitempty" toml:"pane,omitempty" json:"pane,omitempty"` // Specific pane index
Route RoutingStrategy `yaml:"route,omitempty" toml:"route,omitempty" json:"route,omitempty"` // Routing strategy
// Prompt (choose one)
Prompt string `yaml:"prompt,omitempty" toml:"prompt,omitempty" json:"prompt,omitempty"`
PromptFile string `yaml:"prompt_file,omitempty" toml:"prompt_file,omitempty" json:"prompt_file,omitempty"`
// Wait configuration
Wait WaitCondition `yaml:"wait,omitempty" toml:"wait,omitempty" json:"wait,omitempty"` // completion, idle, time, none
Timeout Duration `yaml:"timeout,omitempty" toml:"timeout,omitempty" json:"timeout,omitempty"`
// Dependencies
DependsOn []string `yaml:"depends_on,omitempty" toml:"depends_on,omitempty" json:"depends_on,omitempty"`
// Error handling
OnError ErrorAction `yaml:"on_error,omitempty" toml:"on_error,omitempty" json:"on_error,omitempty"`
RetryCount int `yaml:"retry_count,omitempty" toml:"retry_count,omitempty" json:"retry_count,omitempty"`
RetryDelay Duration `yaml:"retry_delay,omitempty" toml:"retry_delay,omitempty" json:"retry_delay,omitempty"`
RetryBackoff string `yaml:"retry_backoff,omitempty" toml:"retry_backoff,omitempty" json:"retry_backoff,omitempty"` // linear, exponential, none
// Conditionals
When string `yaml:"when,omitempty" toml:"when,omitempty" json:"when,omitempty"` // Skip if evaluates to false
// Output handling
OutputVar string `yaml:"output_var,omitempty" toml:"output_var,omitempty" json:"output_var,omitempty"` // Store output in variable
OutputParse OutputParse `yaml:"output_parse,omitempty" toml:"output_parse,omitempty" json:"output_parse,omitempty"` // none, json, yaml, lines, first_line, regex
// Parallel execution (mutually exclusive with Prompt)
Parallel []Step `yaml:"parallel,omitempty" toml:"parallel,omitempty" json:"parallel,omitempty"`
// Loop execution
Loop *LoopConfig `yaml:"loop,omitempty" toml:"loop,omitempty" json:"loop,omitempty"`
// Loop control: break or continue (only valid inside loops)
LoopControl LoopControl `yaml:"loop_control,omitempty" toml:"loop_control,omitempty" json:"loop_control,omitempty"`
}
Step represents a single step in the workflow
type StepError ¶
type StepError struct {
Type string `json:"type"` // timeout, agent_error, crash, validation, routing, send, capture
Message string `json:"message"`
Details string `json:"details,omitempty"` // Full error output
PaneOutput string `json:"pane_output,omitempty"` // Last N lines from pane for debugging
AgentState string `json:"agent_state,omitempty"` // Agent state at time of error
Attempt int `json:"attempt,omitempty"` // Which retry attempt
Timestamp time.Time `json:"timestamp"`
}
StepError contains detailed error information for a failed step
type StepResult ¶
type StepResult struct {
StepID string `json:"step_id"`
Status ExecutionStatus `json:"status"`
StartedAt time.Time `json:"started_at,omitempty"`
FinishedAt time.Time `json:"finished_at,omitempty"`
PaneUsed string `json:"pane_used,omitempty"`
AgentType string `json:"agent_type,omitempty"`
Output string `json:"output,omitempty"`
ParsedData interface{} `json:"parsed_data,omitempty"` // Result of output_parse
Error *StepError `json:"error,omitempty"`
SkipReason string `json:"skip_reason,omitempty"` // If skipped due to 'when' condition
Attempts int `json:"attempts,omitempty"` // Number of retry attempts
}
StepResult contains the result of executing a step
type SubstitutionError ¶
type SubstitutionError struct {
VarRef string // The variable reference that failed
Message string // Error description
}
SubstitutionError represents an error during variable substitution
func (*SubstitutionError) Error ¶
func (e *SubstitutionError) Error() string
type Substitutor ¶
type Substitutor struct {
// contains filtered or unexported fields
}
Substitutor handles variable substitution in workflow prompts and conditions. It supports multiple variable types: vars, steps, env, and context variables.
func NewSubstitutor ¶
func NewSubstitutor(state *ExecutionState, session, workflow string) *Substitutor
NewSubstitutor creates a new substitutor with the given execution context.
func (*Substitutor) Substitute ¶
func (s *Substitutor) Substitute(template string) (string, error)
Substitute replaces all ${...} variable references in the template string. Returns the substituted string and any errors encountered.
func (*Substitutor) SubstituteStrict ¶
func (s *Substitutor) SubstituteStrict(template string) (string, error)
SubstituteStrict is like Substitute but returns an error if any variable is undefined.
type ValidationResult ¶
type ValidationResult struct {
Valid bool `json:"valid"`
Errors []ParseError `json:"errors,omitempty"`
Warnings []ParseError `json:"warnings,omitempty"`
}
ValidationResult contains the result of validating a workflow
func Validate ¶
func Validate(w *Workflow) ValidationResult
Validate validates a workflow and returns all errors found
type VarDef ¶
type VarDef struct {
Description string `yaml:"description,omitempty" toml:"description,omitempty" json:"description,omitempty"`
Required bool `yaml:"required,omitempty" toml:"required,omitempty" json:"required,omitempty"`
Default interface{} `yaml:"default,omitempty" toml:"default,omitempty" json:"default,omitempty"`
Type VarType `yaml:"type,omitempty" toml:"type,omitempty" json:"type,omitempty"` // string, number, boolean, array
}
VarDef defines a workflow variable with optional default and type info
type VariableContext ¶
type VariableContext struct {
Vars map[string]interface{}
Steps map[string]StepResult
Session string
RunID string
Workflow string
}
VariableContext provides access to workflow variables
func (*VariableContext) EvaluateString ¶
func (vc *VariableContext) EvaluateString(s string) string
EvaluateString evaluates all variable references in a string
func (*VariableContext) GetVariable ¶
func (vc *VariableContext) GetVariable(ref string) (interface{}, bool)
GetVariable retrieves a variable by reference path
func (*VariableContext) SetVariable ¶
func (vc *VariableContext) SetVariable(name string, value interface{})
SetVariable sets a variable value
type WaitCondition ¶
type WaitCondition string
WaitCondition defines when a step is considered complete
const ( WaitCompletion WaitCondition = "completion" // Wait for agent to return to idle WaitIdle WaitCondition = "idle" // Same as completion WaitTime WaitCondition = "time" // Wait for specified timeout only WaitNone WaitCondition = "none" // Fire and forget )
type Workflow ¶
type Workflow struct {
// Metadata
SchemaVersion string `yaml:"schema_version" toml:"schema_version" json:"schema_version"`
Name string `yaml:"name" toml:"name" json:"name"`
Description string `yaml:"description,omitempty" toml:"description,omitempty" json:"description,omitempty"`
Version string `yaml:"version,omitempty" toml:"version,omitempty" json:"version,omitempty"`
// Variable definitions
Vars map[string]VarDef `yaml:"vars,omitempty" toml:"vars,omitempty" json:"vars,omitempty"`
// Global settings
Settings WorkflowSettings `yaml:"settings,omitempty" toml:"settings,omitempty" json:"settings,omitempty"`
// Step definitions
Steps []Step `yaml:"steps" toml:"steps" json:"steps"`
}
Workflow represents a complete workflow definition loaded from YAML/TOML
type WorkflowSettings ¶
type WorkflowSettings struct {
Timeout Duration `yaml:"timeout,omitempty" toml:"timeout,omitempty" json:"timeout,omitempty"` // Global timeout (e.g., "30m")
OnError ErrorAction `yaml:"on_error,omitempty" toml:"on_error,omitempty" json:"on_error,omitempty"` // fail, continue
NotifyOnComplete bool `yaml:"notify_on_complete,omitempty" toml:"notify_on_complete,omitempty" json:"notify_on_complete,omitempty"`
NotifyOnError bool `yaml:"notify_on_error,omitempty" toml:"notify_on_error,omitempty" json:"notify_on_error,omitempty"`
NotifyChannels []string `yaml:"notify_channels,omitempty" toml:"notify_channels,omitempty" json:"notify_channels,omitempty"` // desktop, webhook, mail
WebhookURL string `yaml:"webhook_url,omitempty" toml:"webhook_url,omitempty" json:"webhook_url,omitempty"`
MailRecipient string `yaml:"mail_recipient,omitempty" toml:"mail_recipient,omitempty" json:"mail_recipient,omitempty"`
}
WorkflowSettings contains global workflow configuration
func DefaultWorkflowSettings ¶
func DefaultWorkflowSettings() WorkflowSettings
DefaultWorkflowSettings returns sensible defaults for workflow settings