pipeline

package
v1.7.0 Latest
Warning

This package is not in the latest version of its module.

Published: Feb 2, 2026 License: MIT Imports: 27 Imported by: 0

Documentation

Overview

Package pipeline provides workflow execution for AI agent orchestration. Its source files divide the work as follows:

conditions.go implements conditional expression evaluation for workflow steps.

executor.go implements the core execution engine for running workflows.

loops.go implements loop constructs for workflow steps: for-each, while, and times loops.

notify.go implements notifications for pipeline events.

robot.go implements the --robot-pipeline-* APIs for machine-readable output.

variables.go implements variable substitution and output parsing for workflows.


Constants

const (
	ErrCodeInvalidFlag     = "INVALID_FLAG"
	ErrCodeSessionNotFound = "SESSION_NOT_FOUND"
	ErrCodeInternalError   = "INTERNAL_ERROR"
)

Robot error codes

const DefaultMaxIterations = 100

DefaultMaxIterations is the default safety limit for loops

const MinProgressInterval = 100 * time.Millisecond

MinProgressInterval is the minimum allowed progress interval to prevent ticker panics. time.NewTicker requires a positive duration.
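
The following is a minimal sketch of how a caller might guard a configured interval before creating a ticker. The helper name clampProgressInterval is hypothetical and is not part of this package; only the constant value comes from the documentation above.

```go
package main

import (
	"fmt"
	"time"
)

// MinProgressInterval mirrors the documented constant.
const MinProgressInterval = 100 * time.Millisecond

// clampProgressInterval is a hypothetical helper: it raises any interval below
// the minimum (including zero and negative values) to MinProgressInterval.
func clampProgressInterval(d time.Duration) time.Duration {
	if d < MinProgressInterval {
		return MinProgressInterval
	}
	return d
}

func main() {
	// time.NewTicker panics on a non-positive duration, so clamp first.
	ticker := time.NewTicker(clampProgressInterval(0))
	defer ticker.Stop()

	fmt.Println(clampProgressInterval(0), clampProgressInterval(2*time.Second))
}
```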

const SchemaVersion = "2.0"

SchemaVersion is the current workflow schema version

Variables

var AgentTypeAliases = map[string]string{
	"claude":      "claude",
	"cc":          "claude",
	"claude-code": "claude",
	"codex":       "codex",
	"cod":         "codex",
	"openai":      "codex",
	"gemini":      "gemini",
	"gmi":         "gemini",
	"google":      "gemini",
}

AgentTypeAliases maps various agent type names to canonical forms

Functions

func CancelPipeline

func CancelPipeline(runID string)

CancelPipeline cancels a running pipeline by run ID (exported for REST API)

func CleanupStates

func CleanupStates(projectDir string, olderThan time.Duration) (int, error)

CleanupStates removes pipeline state files older than the provided duration. Returns the number of deleted state files.

func ClearLoopVars

func ClearLoopVars(state *ExecutionState, varName string)

ClearLoopVars removes loop context variables from execution state.

func ClearPipelineRegistry

func ClearPipelineRegistry()

ClearPipelineRegistry clears the pipeline registry (for testing)

func EvaluateCondition

func EvaluateCondition(condition string, sub *Substitutor) (skip bool, err error)

EvaluateCondition is a convenience function that evaluates a condition string using a substitutor. Returns true if the step should be SKIPPED. This maintains backward compatibility with the original evaluateCondition API.

func Execute

func Execute(ctx context.Context, p Pipeline) error

Execute runs the pipeline stages sequentially

func GenerateRunID

func GenerateRunID() string

GenerateRunID creates a unique run ID using a timestamp and random bytes (exported)
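
As a rough illustration of combining a timestamp with random bytes, the sketch below builds an ID in one plausible shape. The "run-" prefix, the timestamp layout, the 4-byte suffix, and the fallback path are all assumptions; the actual format produced by GenerateRunID is not documented here.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
	"time"
)

// generateRunID is a hypothetical stand-in for GenerateRunID.
// The ID layout used here is an assumption made for illustration.
func generateRunID() string {
	stamp := time.Now().UTC().Format("20060102-150405")
	suffix := make([]byte, 4)
	if _, err := rand.Read(suffix); err != nil {
		// Assumed fallback: derive the suffix from the clock if the RNG fails.
		return fmt.Sprintf("run-%s-%08x", stamp, uint32(time.Now().UnixNano()))
	}
	return fmt.Sprintf("run-%s-%s", stamp, hex.EncodeToString(suffix))
}

func main() {
	fmt.Println(generateRunID())
}
```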

func IsValidAgentType

func IsValidAgentType(t string) bool

IsValidAgentType checks if the given agent type is recognized. Case-insensitive: "Claude", "CLAUDE", "claude" are all valid.

func LoadAndValidate

func LoadAndValidate(path string) (*Workflow, ValidationResult, error)

LoadAndValidate is a convenience function that parses and validates a workflow file

func NormalizeAgentType

func NormalizeAgentType(t string) string

NormalizeAgentType converts agent type aliases to canonical form. Case-insensitive: "Claude", "CLAUDE", "claude" all normalize to "claude".
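
A minimal sketch of case-insensitive alias lookup, using the alias table listed under Variables. The lowercase function names are hypothetical, and returning unknown input unchanged is an assumption; the package does not document what NormalizeAgentType does with unrecognized types.

```go
package main

import (
	"fmt"
	"strings"
)

// agentTypeAliases copies the documented AgentTypeAliases table.
var agentTypeAliases = map[string]string{
	"claude": "claude", "cc": "claude", "claude-code": "claude",
	"codex": "codex", "cod": "codex", "openai": "codex",
	"gemini": "gemini", "gmi": "gemini", "google": "gemini",
}

// normalizeAgentType lowercases the input before the lookup.
// Assumption: unrecognized types are returned unchanged.
func normalizeAgentType(t string) string {
	if canonical, ok := agentTypeAliases[strings.ToLower(t)]; ok {
		return canonical
	}
	return t
}

// isValidAgentType reports whether the input is a known name or alias.
func isValidAgentType(t string) bool {
	_, ok := agentTypeAliases[strings.ToLower(t)]
	return ok
}

func main() {
	for _, name := range []string{"CLAUDE", "cc", "OpenAI", "llama"} {
		fmt.Println(name, normalizeAgentType(name), isValidAgentType(name))
	}
}
```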

func ParseBool

func ParseBool(s string) bool

ParseBool parses a string as a boolean

func ParseInt

func ParseInt(s string, defaultVal int) int

ParseInt parses a string as an integer, falling back to defaultVal
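
The two parsing helpers above could be built on strconv roughly as follows. This is a sketch under assumptions: the set of strings the real ParseBool accepts is not documented, so the version here simply defers to strconv.ParseBool and treats anything unparseable as false.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parseBool returns true only for inputs strconv.ParseBool accepts as true
// ("1", "t", "true", and so on). Assumption: invalid input yields false.
func parseBool(s string) bool {
	b, err := strconv.ParseBool(strings.TrimSpace(s))
	return err == nil && b
}

// parseInt returns defaultVal when s is not a valid base-10 integer.
func parseInt(s string, defaultVal int) int {
	n, err := strconv.Atoi(strings.TrimSpace(s))
	if err != nil {
		return defaultVal
	}
	return n
}

func main() {
	fmt.Println(parseBool("true"), parseBool("nope"))
	fmt.Println(parseInt("42", 0), parseInt("many", 7))
}
```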

func ParsePipelineVars

func ParsePipelineVars(varsJSON string) (map[string]interface{}, error)

ParsePipelineVars parses a JSON variable string into a map

func PrintPipelineCancel

func PrintPipelineCancel(runID string) int

PrintPipelineCancel cancels a running pipeline

func PrintPipelineList

func PrintPipelineList() int

PrintPipelineList outputs all tracked pipelines

func PrintPipelineRun

func PrintPipelineRun(opts PipelineRunOptions) int

PrintPipelineRun starts a pipeline and returns status

func PrintPipelineStatus

func PrintPipelineStatus(runID string) int

PrintPipelineStatus outputs the status of a running/completed pipeline

func RegisterPipeline

func RegisterPipeline(exec *PipelineExecution)

RegisterPipeline registers a pipeline execution (exported for CLI)

func SaveState

func SaveState(projectDir string, state *ExecutionState) error

SaveState persists the execution state to .ntm/pipelines/<run-id>.json.
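
To make the on-disk layout concrete, here is a self-contained sketch of writing and reading a state file at .ntm/pipelines/<run-id>.json. The state struct is a trimmed, hypothetical stand-in for ExecutionState, and the file permissions and indentation are assumptions rather than documented behavior.

```go
package main

import (
	"encoding/json"
	"fmt"
	"os"
	"path/filepath"
)

// state is a hypothetical, trimmed stand-in for ExecutionState.
type state struct {
	RunID  string `json:"run_id"`
	Status string `json:"status"`
}

// statePath builds <projectDir>/.ntm/pipelines/<run-id>.json.
func statePath(projectDir, runID string) string {
	return filepath.Join(projectDir, ".ntm", "pipelines", runID+".json")
}

func saveState(projectDir string, s *state) error {
	path := statePath(projectDir, s.RunID)
	if err := os.MkdirAll(filepath.Dir(path), 0o755); err != nil {
		return err
	}
	data, err := json.MarshalIndent(s, "", "  ")
	if err != nil {
		return err
	}
	return os.WriteFile(path, data, 0o644)
}

func loadState(projectDir, runID string) (*state, error) {
	data, err := os.ReadFile(statePath(projectDir, runID))
	if err != nil {
		return nil, err
	}
	var s state
	if err := json.Unmarshal(data, &s); err != nil {
		return nil, err
	}
	return &s, nil
}

// roundTrip saves a state into a temporary project directory and loads it back.
func roundTrip() (*state, error) {
	dir, err := os.MkdirTemp("", "pipeline-state")
	if err != nil {
		return nil, err
	}
	defer os.RemoveAll(dir)
	if err := saveState(dir, &state{RunID: "run-1", Status: "running"}); err != nil {
		return nil, err
	}
	return loadState(dir, "run-1")
}

func main() {
	s, err := roundTrip()
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(s.RunID, s.Status)
}
```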

func SetLoopVars

func SetLoopVars(state *ExecutionState, varName string, item interface{}, index, total int)

SetLoopVars sets loop context variables in the execution state.

func ShouldNotify

func ShouldNotify(settings WorkflowSettings, event NotificationEvent) bool

ShouldNotify checks if a notification should be sent for the given event.

func StoreStepOutput

func StoreStepOutput(state *ExecutionState, stepID string, output string, parsedData interface{})

StoreStepOutput stores a step's output in the execution state for variable access. Note: Caller must hold any necessary locks on state.Variables if used concurrently.

func UpdatePipelineFromState

func UpdatePipelineFromState(runID string, state *ExecutionState)

UpdatePipelineFromState updates a registered pipeline from execution state (exported for CLI)

func ValidateCondition

func ValidateCondition(condition string) []string

ValidateCondition checks if a condition expression has valid syntax. Returns a list of issues found (empty if valid).

func ValidateVarRefs

func ValidateVarRefs(template string, availableVars []string) []string

ValidateVarRefs validates that all variable references in a string are valid. Returns a list of invalid references.
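
One way such a check could work is to extract every ${...} reference and compare it with the list of available names. The sketch below assumes the ${name} syntax seen in the LoopConfig comments (for example ${vars.files}) and exact-match lookup; the real function may accept richer reference forms.

```go
package main

import (
	"fmt"
	"regexp"
)

// varRef matches ${...} references such as ${vars.files}.
var varRef = regexp.MustCompile(`\$\{([^}]+)\}`)

// invalidRefs is a hypothetical stand-in for ValidateVarRefs. It returns each
// referenced name that is not present in available (exact match only).
func invalidRefs(template string, available []string) []string {
	known := make(map[string]bool, len(available))
	for _, name := range available {
		known[name] = true
	}
	var invalid []string
	for _, m := range varRef.FindAllStringSubmatch(template, -1) {
		if !known[m[1]] {
			invalid = append(invalid, m[1])
		}
	}
	return invalid
}

func main() {
	tmpl := "lint ${vars.files} then report to ${vars.missing}"
	fmt.Println(invalidRefs(tmpl, []string{"vars.files"}))
}
```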

Types

type ConditionEvaluator

type ConditionEvaluator struct {
	// contains filtered or unexported fields
}

ConditionEvaluator evaluates conditional expressions for workflow steps. It supports boolean, equality, comparison, contains, and logical operators.

func NewConditionEvaluator

func NewConditionEvaluator(sub *Substitutor) *ConditionEvaluator

NewConditionEvaluator creates a new condition evaluator with the given substitutor.

func (*ConditionEvaluator) Evaluate

func (e *ConditionEvaluator) Evaluate(condition string) (ConditionResult, error)

Evaluate evaluates a condition expression. Returns ConditionResult with Value=true if the condition is met (step should run), Value=false if condition is not met (step should be skipped).

type ConditionResult

type ConditionResult struct {
	Value  bool   // True if condition is met (step should RUN)
	Skip   bool   // True if step should be skipped (inverse of Value)
	Reason string // Human-readable explanation
}

ConditionResult contains the result of condition evaluation.

type DependencyError

type DependencyError struct {
	Type    string   `json:"type"`  // cycle, missing_dep, unreachable
	Steps   []string `json:"steps"` // affected step IDs
	Message string   `json:"message"`
}

DependencyError represents an error in the dependency graph

func (DependencyError) Error

func (e DependencyError) Error() string

type DependencyGraph

type DependencyGraph struct {
	// contains filtered or unexported fields
}

DependencyGraph represents step dependencies for execution ordering

func NewDependencyGraph

func NewDependencyGraph(workflow *Workflow) *DependencyGraph

NewDependencyGraph creates a dependency graph from workflow steps

func (*DependencyGraph) ExecutedCount

func (g *DependencyGraph) ExecutedCount() int

ExecutedCount returns the number of executed steps.

func (*DependencyGraph) GetDependencies

func (g *DependencyGraph) GetDependencies(id string) []string

GetDependencies returns the dependencies for a step

func (*DependencyGraph) GetDependents

func (g *DependencyGraph) GetDependents(id string) []string

GetDependents returns steps that depend on the given step

func (*DependencyGraph) GetFailedDependencies

func (g *DependencyGraph) GetFailedDependencies(id string) []string

GetFailedDependencies returns the list of failed dependencies for a step

func (*DependencyGraph) GetReadySteps

func (g *DependencyGraph) GetReadySteps() []string

GetReadySteps returns steps that are ready to execute

func (*DependencyGraph) GetStep

func (g *DependencyGraph) GetStep(id string) (*Step, bool)

GetStep returns a step by ID

func (*DependencyGraph) HasFailedDependency

func (g *DependencyGraph) HasFailedDependency(id string) bool

HasFailedDependency returns true if any of the step's dependencies failed

func (*DependencyGraph) IsExecuted

func (g *DependencyGraph) IsExecuted(id string) bool

IsExecuted returns whether a step has been executed

func (*DependencyGraph) IsFailed

func (g *DependencyGraph) IsFailed(id string) bool

IsFailed returns whether a step has failed

func (*DependencyGraph) MarkExecuted

func (g *DependencyGraph) MarkExecuted(id string) error

MarkExecuted marks a step as executed

func (*DependencyGraph) MarkFailed

func (g *DependencyGraph) MarkFailed(id string) error

MarkFailed marks a step as failed (for CONTINUE mode dependency tracking)

func (*DependencyGraph) Resolve

func (g *DependencyGraph) Resolve() ExecutionPlan

Resolve performs topological sort and returns execution plan

func (*DependencyGraph) Size

func (g *DependencyGraph) Size() int

Size returns the number of steps in the graph

func (*DependencyGraph) Validate

func (g *DependencyGraph) Validate() []DependencyError

Validate checks the dependency graph for errors

type Duration

type Duration struct {
	time.Duration
}

Duration is a wrapper for time.Duration that supports YAML/TOML/JSON parsing

func DefaultStepTimeout

func DefaultStepTimeout() Duration

DefaultStepTimeout returns the default timeout for a step

func (Duration) MarshalText

func (d Duration) MarshalText() ([]byte, error)

MarshalText implements encoding.TextMarshaler for Duration

func (*Duration) UnmarshalText

func (d *Duration) UnmarshalText(text []byte) error

UnmarshalText implements encoding.TextUnmarshaler for Duration
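
A wrapper of this kind is typically implemented by delegating to time.ParseDuration and time.Duration.String. The sketch below shows that pattern and exercises it through encoding/json; the stepConfig type is hypothetical, and the real Duration may accept additional input formats.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// Duration mirrors the documented wrapper type.
type Duration struct {
	time.Duration
}

// MarshalText renders the duration in Go's standard form, e.g. "1m30s".
func (d Duration) MarshalText() ([]byte, error) {
	return []byte(d.Duration.String()), nil
}

// UnmarshalText parses strings such as "90s" or "5m" via time.ParseDuration.
func (d *Duration) UnmarshalText(text []byte) error {
	parsed, err := time.ParseDuration(string(text))
	if err != nil {
		return err
	}
	d.Duration = parsed
	return nil
}

// stepConfig is a hypothetical struct used only to demonstrate decoding.
type stepConfig struct {
	Timeout Duration `json:"timeout"`
}

func main() {
	var cfg stepConfig
	if err := json.Unmarshal([]byte(`{"timeout":"90s"}`), &cfg); err != nil {
		fmt.Println("error:", err)
		return
	}
	out, err := json.Marshal(cfg)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(cfg.Timeout.Duration, string(out))
}
```

Because encoding/json honors encoding.TextMarshaler and encoding.TextUnmarshaler for JSON strings, the same two methods cover JSON without a separate UnmarshalJSON.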

type ErrLoopBreak

type ErrLoopBreak struct {
	Reason string
}

ErrLoopBreak is returned when a loop is exited via break control.

func (*ErrLoopBreak) Error

func (e *ErrLoopBreak) Error() string

type ErrLoopContinue

type ErrLoopContinue struct{}

ErrLoopContinue is returned when an iteration should be skipped.

func (*ErrLoopContinue) Error

func (e *ErrLoopContinue) Error() string

type ErrMaxIterations

type ErrMaxIterations struct {
	Limit int
}

ErrMaxIterations is returned when the max iterations limit is reached.

func (*ErrMaxIterations) Error

func (e *ErrMaxIterations) Error() string
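
Since all three loop errors use pointer receivers, callers can tell them apart with errors.As, even when the error has been wrapped. The sketch below redeclares the types locally so it runs on its own; the Error message strings and the classify helper are assumptions for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// Local mirrors of the documented error types. Message text is assumed.
type ErrLoopBreak struct{ Reason string }

func (e *ErrLoopBreak) Error() string { return "loop break: " + e.Reason }

type ErrLoopContinue struct{}

func (e *ErrLoopContinue) Error() string { return "loop continue" }

type ErrMaxIterations struct{ Limit int }

func (e *ErrMaxIterations) Error() string {
	return fmt.Sprintf("max iterations (%d) reached", e.Limit)
}

// classify is a hypothetical helper that maps a loop error to a short label.
func classify(err error) string {
	var brk *ErrLoopBreak
	var cont *ErrLoopContinue
	var limit *ErrMaxIterations
	switch {
	case err == nil:
		return "ok"
	case errors.As(err, &brk):
		return "break: " + brk.Reason
	case errors.As(err, &cont):
		return "continue"
	case errors.As(err, &limit):
		return fmt.Sprintf("stopped at limit %d", limit.Limit)
	default:
		return "failure: " + err.Error()
	}
}

func main() {
	wrapped := fmt.Errorf("iteration 3: %w", &ErrLoopBreak{Reason: "tests passed"})
	fmt.Println(classify(wrapped))
	fmt.Println(classify(&ErrMaxIterations{Limit: 100}))
}
```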

type ErrorAction

type ErrorAction string

ErrorAction defines how to handle step errors

const (
	ErrorActionFail     ErrorAction = "fail"      // Wait for all, report all errors
	ErrorActionFailFast ErrorAction = "fail_fast" // Cancel remaining on first error
	ErrorActionContinue ErrorAction = "continue"  // Ignore errors, continue workflow
	ErrorActionRetry    ErrorAction = "retry"     // Retry failed steps
)

type ExecutionError

type ExecutionError struct {
	StepID    string    `json:"step_id,omitempty"`
	Type      string    `json:"type"`
	Message   string    `json:"message"`
	Timestamp time.Time `json:"timestamp"`
	Fatal     bool      `json:"fatal"`
}

ExecutionError represents an error that occurred during execution

type ExecutionPlan

type ExecutionPlan struct {
	Order  []string          `json:"order"`  // Step IDs in execution order
	Levels [][]string        `json:"levels"` // Parallelizable levels
	Errors []DependencyError `json:"errors,omitempty"`
	Valid  bool              `json:"valid"`
}

ExecutionPlan contains the resolved execution order
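
The relationship between Order and Levels can be illustrated with a level-by-level variant of Kahn's topological sort. This is a sketch, not the package's implementation: the plan type, the resolve function, and the alphabetical ordering within a level are assumptions, and error reporting is reduced to the Valid flag.

```go
package main

import (
	"fmt"
	"sort"
)

// plan mirrors the Order, Levels, and Valid fields of ExecutionPlan.
type plan struct {
	Order  []string
	Levels [][]string
	Valid  bool
}

// resolve takes a map from step ID to the IDs it depends on. Steps in the same
// level have no dependencies on each other and could run in parallel.
func resolve(deps map[string][]string) plan {
	remaining := make(map[string]int, len(deps)) // unmet dependency counts
	dependents := make(map[string][]string)      // reverse edges
	for id, ds := range deps {
		remaining[id] = len(ds)
		for _, d := range ds {
			dependents[d] = append(dependents[d], id)
		}
	}

	var p plan
	var level []string
	for id, n := range remaining {
		if n == 0 {
			level = append(level, id)
		}
	}
	for len(level) > 0 {
		sort.Strings(level) // deterministic output within a level
		p.Levels = append(p.Levels, level)
		p.Order = append(p.Order, level...)
		var next []string
		for _, id := range level {
			for _, dep := range dependents[id] {
				remaining[dep]--
				if remaining[dep] == 0 {
					next = append(next, dep)
				}
			}
		}
		level = next
	}
	// Steps left unscheduled indicate a cycle or a missing dependency.
	p.Valid = len(p.Order) == len(deps)
	return p
}

func main() {
	p := resolve(map[string][]string{
		"build":  nil,
		"test":   {"build"},
		"lint":   {"build"},
		"deploy": {"test", "lint"},
	})
	fmt.Println(p.Levels, p.Valid)
}
```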

func ResolveWorkflow

func ResolveWorkflow(workflow *Workflow) ExecutionPlan

ResolveWorkflow is a convenience function to create a graph and resolve it

type ExecutionState

type ExecutionState struct {
	RunID        string                 `json:"run_id"`
	WorkflowID   string                 `json:"workflow_id"`
	WorkflowFile string                 `json:"workflow_file,omitempty"`
	Session      string                 `json:"session,omitempty"`
	Status       ExecutionStatus        `json:"status"`
	StartedAt    time.Time              `json:"started_at"`
	UpdatedAt    time.Time              `json:"updated_at"`
	FinishedAt   time.Time              `json:"finished_at,omitempty"`
	CurrentStep  string                 `json:"current_step,omitempty"`
	Steps        map[string]StepResult  `json:"steps"`
	Variables    map[string]interface{} `json:"variables"` // Runtime variables including step outputs
	Errors       []ExecutionError       `json:"errors,omitempty"`
}

ExecutionState contains the complete state of a workflow execution

func LoadState

func LoadState(projectDir, runID string) (*ExecutionState, error)

LoadState loads execution state from .ntm/pipelines/<run-id>.json.

type ExecutionStatus

type ExecutionStatus string

ExecutionStatus represents the current state of workflow execution

const (
	StatusPending   ExecutionStatus = "pending"
	StatusRunning   ExecutionStatus = "running"
	StatusPaused    ExecutionStatus = "paused"
	StatusCompleted ExecutionStatus = "completed"
	StatusFailed    ExecutionStatus = "failed"
	StatusCancelled ExecutionStatus = "cancelled"
	StatusSkipped   ExecutionStatus = "skipped"
)

type Executor

type Executor struct {
	// contains filtered or unexported fields
}

Executor runs workflows with full orchestration support

func NewExecutor

func NewExecutor(config ExecutorConfig) *Executor

NewExecutor creates a new workflow executor

func (*Executor) Cancel

func (e *Executor) Cancel()

Cancel cancels the current execution

func (*Executor) GetState

func (e *Executor) GetState() *ExecutionState

GetState returns the current execution state (for monitoring)

func (*Executor) Resume

func (e *Executor) Resume(ctx context.Context, workflow *Workflow, prior *ExecutionState, progress chan<- ProgressEvent) (*ExecutionState, error)

Resume continues execution from a previously persisted state.

func (*Executor) Run

func (e *Executor) Run(ctx context.Context, workflow *Workflow, vars map[string]interface{}, progress chan<- ProgressEvent) (*ExecutionState, error)

Run executes a workflow with the given initial variables. Returns the final execution state and any fatal error. Progress events are sent to the provided channel if non-nil.

func (*Executor) SetNotifier

func (e *Executor) SetNotifier(n *Notifier)

SetNotifier sets the notifier for sending notifications on workflow events.

func (*Executor) Validate

func (e *Executor) Validate(workflow *Workflow) ValidationResult

Validate validates a workflow without executing it

type ExecutorConfig

type ExecutorConfig struct {
	Session          string        // Required: tmux session name
	ProjectDir       string        // Optional: project root for .ntm state
	WorkflowFile     string        // Optional: workflow file path for state persistence
	DefaultTimeout   time.Duration // Default step timeout (default: 5m)
	GlobalTimeout    time.Duration // Maximum workflow runtime (default: 30m)
	ProgressInterval time.Duration // Interval for progress updates (default: 1s)
	DryRun           bool          // If true, validate but don't execute
	Verbose          bool          // Enable verbose logging
	RunID            string        // Optional: pre-generated run ID (if empty, one is generated)
}

ExecutorConfig configures the executor behavior

func DefaultExecutorConfig

func DefaultExecutorConfig(session string) ExecutorConfig

DefaultExecutorConfig returns sensible defaults

type LoopConfig

type LoopConfig struct {
	// For-each loop: iterate over array
	Items string `yaml:"items,omitempty" toml:"items,omitempty" json:"items,omitempty"` // Expression for array (e.g., ${vars.files})
	As    string `yaml:"as,omitempty" toml:"as,omitempty" json:"as,omitempty"`          // Loop variable name (default: "item")

	// While loop: repeat until condition is false
	While string `yaml:"while,omitempty" toml:"while,omitempty" json:"while,omitempty"` // Condition expression

	// Times loop: repeat N times
	Times int `yaml:"times,omitempty" toml:"times,omitempty" json:"times,omitempty"` // Number of iterations

	// Safety and timing
	MaxIterations int      `yaml:"max_iterations,omitempty" toml:"max_iterations,omitempty" json:"max_iterations,omitempty"` // Safety limit (default: 100, required for while loops)
	Delay         Duration `yaml:"delay,omitempty" toml:"delay,omitempty" json:"delay,omitempty"`                            // Delay between iterations

	// Result collection
	Collect string `yaml:"collect,omitempty" toml:"collect,omitempty" json:"collect,omitempty"` // Variable name to store array of results

	// Steps to execute per iteration
	Steps []Step `yaml:"steps,omitempty" toml:"steps,omitempty" json:"steps,omitempty"`
}

LoopConfig defines loop iteration settings for for-each, while, and times loops

type LoopContext

type LoopContext struct {
	VarName string      // The "as" variable name
	Item    interface{} // Current item value
	Index   int         // 0-based iteration index
	Count   int         // Total number of items
	First   bool        // True if first iteration
	Last    bool        // True if last iteration
}

LoopContext holds the current state of loop iteration
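
For a for-each loop, the fields above could be populated as in the following sketch. The loopContexts helper is hypothetical; how the package actually builds and exposes the context to steps is not shown in this documentation.

```go
package main

import "fmt"

// LoopContext mirrors the documented struct.
type LoopContext struct {
	VarName string
	Item    interface{}
	Index   int
	Count   int
	First   bool
	Last    bool
}

// loopContexts builds one context per item, with a 0-based Index and
// First/Last flags derived from the position in the slice.
func loopContexts(varName string, items []interface{}) []LoopContext {
	out := make([]LoopContext, 0, len(items))
	for i, item := range items {
		out = append(out, LoopContext{
			VarName: varName,
			Item:    item,
			Index:   i,
			Count:   len(items),
			First:   i == 0,
			Last:    i == len(items)-1,
		})
	}
	return out
}

func main() {
	for _, c := range loopContexts("file", []interface{}{"a.go", "b.go", "c.go"}) {
		fmt.Printf("%s=%v index=%d/%d first=%t last=%t\n",
			c.VarName, c.Item, c.Index, c.Count, c.First, c.Last)
	}
}
```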

type LoopControl

type LoopControl string

LoopControl defines special control flow within loops

const (
	LoopControlNone     LoopControl = ""         // Normal execution
	LoopControlBreak    LoopControl = "break"    // Exit loop early
	LoopControlContinue LoopControl = "continue" // Skip to next iteration
)

type LoopExecutor

type LoopExecutor struct {
	// contains filtered or unexported fields
}

LoopExecutor handles execution of loop constructs within workflows.

func NewLoopExecutor

func NewLoopExecutor(executor *Executor) *LoopExecutor

NewLoopExecutor creates a new loop executor for the given workflow executor.

func (*LoopExecutor) ExecuteLoop

func (le *LoopExecutor) ExecuteLoop(ctx context.Context, step *Step, workflow *Workflow) LoopResult

ExecuteLoop executes a loop step and returns the aggregated result.

type LoopResult

type LoopResult struct {
	Status      ExecutionStatus
	Iterations  int
	Results     []StepResult  // Individual iteration results
	Collected   []interface{} // Collected outputs if Collect is specified
	Error       *StepError
	BreakReason string // Non-empty if loop exited via break
	FinishedAt  time.Time
}

LoopResult contains the result of loop execution.

type NotificationChannel

type NotificationChannel string

NotificationChannel represents a channel for sending notifications.

const (
	ChannelDesktop NotificationChannel = "desktop"
	ChannelWebhook NotificationChannel = "webhook"
	ChannelMail    NotificationChannel = "mail"
)

type NotificationEvent

type NotificationEvent string

NotificationEvent represents a pipeline event to notify about.

const (
	NotifyStarted   NotificationEvent = "started"
	NotifyCompleted NotificationEvent = "completed"
	NotifyFailed    NotificationEvent = "failed"
	NotifyCancelled NotificationEvent = "cancelled"
	NotifyStepError NotificationEvent = "step_error"
)

type NotificationPayload

type NotificationPayload struct {
	Event        NotificationEvent `json:"event"`
	WorkflowName string            `json:"workflow_name"`
	RunID        string            `json:"run_id"`
	Session      string            `json:"session,omitempty"`
	Status       ExecutionStatus   `json:"status"`
	Duration     time.Duration     `json:"duration,omitempty"`
	StepsTotal   int               `json:"steps_total"`
	StepsDone    int               `json:"steps_done"`
	StepsFailed  int               `json:"steps_failed"`
	Error        string            `json:"error,omitempty"`
	FailedStep   string            `json:"failed_step,omitempty"`
	Timestamp    time.Time         `json:"timestamp"`
}

NotificationPayload contains data for a notification.

func BuildPayloadFromState

func BuildPayloadFromState(state *ExecutionState, workflow *Workflow, event NotificationEvent) NotificationPayload

BuildPayloadFromState creates a NotificationPayload from execution state.

type Notifier

type Notifier struct {
	// contains filtered or unexported fields
}

Notifier sends notifications for pipeline events.

func NewNotifier

func NewNotifier(cfg NotifierConfig) *Notifier

NewNotifier creates a new notifier with the given configuration.

func NewNotifierFromSettings

func NewNotifierFromSettings(settings WorkflowSettings, mailClient *agentmail.Client, projectKey, agentName string) *Notifier

NewNotifierFromSettings creates a notifier from workflow settings.

func (*Notifier) Notify

func (n *Notifier) Notify(ctx context.Context, payload NotificationPayload) error

Notify sends a notification to all configured channels.

type NotifierConfig

type NotifierConfig struct {
	Channels      []string
	WebhookURL    string
	MailRecipient string
	MailClient    *agentmail.Client
	ProjectKey    string
	AgentName     string
}

NotifierConfig configures the notifier.

type OutputParse

type OutputParse struct {
	Type    string `yaml:"type,omitempty" toml:"type,omitempty" json:"type,omitempty"`          // none, json, yaml, lines, first_line, regex
	Pattern string `yaml:"pattern,omitempty" toml:"pattern,omitempty" json:"pattern,omitempty"` // For regex type
}

OutputParse defines how to parse step output

func (*OutputParse) UnmarshalText

func (o *OutputParse) UnmarshalText(text []byte) error

UnmarshalText allows OutputParse to be specified as a simple string

type OutputParser

type OutputParser struct{}

OutputParser handles parsing of step outputs into structured data.

func NewOutputParser

func NewOutputParser() *OutputParser

NewOutputParser creates a new output parser.

func (*OutputParser) Parse

func (p *OutputParser) Parse(output string, config OutputParse) (interface{}, error)

Parse parses output according to the parse configuration.
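
A dispatcher over the documented parse types might look like the sketch below. It covers none, json, lines, first_line, and regex using only the standard library; yaml is omitted because it needs a third-party decoder. The returned shapes (for example a []string of submatches for regex) are assumptions, not documented behavior.

```go
package main

import (
	"encoding/json"
	"fmt"
	"regexp"
	"strings"
)

// OutputParse mirrors the documented struct (tags omitted for brevity).
type OutputParse struct {
	Type    string
	Pattern string
}

// parse is a hypothetical stand-in for (*OutputParser).Parse.
func parse(output string, cfg OutputParse) (interface{}, error) {
	switch cfg.Type {
	case "", "none":
		return output, nil
	case "json":
		var v interface{}
		if err := json.Unmarshal([]byte(output), &v); err != nil {
			return nil, err
		}
		return v, nil
	case "lines":
		return strings.Split(strings.TrimSpace(output), "\n"), nil
	case "first_line":
		line, _, _ := strings.Cut(strings.TrimSpace(output), "\n")
		return line, nil
	case "regex":
		re, err := regexp.Compile(cfg.Pattern)
		if err != nil {
			return nil, err
		}
		// Full match followed by capture groups; nil when nothing matches.
		return re.FindStringSubmatch(output), nil
	default:
		return nil, fmt.Errorf("unsupported parse type %q", cfg.Type)
	}
}

func main() {
	v, err := parse("coverage: 87%\nok\n", OutputParse{Type: "regex", Pattern: `coverage: (\d+)%`})
	fmt.Println(v, err)
}
```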

type ParallelGroupResult

type ParallelGroupResult struct {
	Completed []StepResult `json:"completed"`
	Failed    []StepResult `json:"failed,omitempty"`
	Partial   bool         `json:"partial"` // Some succeeded, some failed
}

ParallelGroupResult contains results from a parallel execution group

type ParseError

type ParseError struct {
	File    string `json:"file,omitempty"`
	Line    int    `json:"line,omitempty"`
	Field   string `json:"field,omitempty"`
	Message string `json:"message"`
	Hint    string `json:"hint,omitempty"`
}

ParseError represents a validation or parsing error with location info

func (ParseError) Error

func (e ParseError) Error() string

type Pipeline

type Pipeline struct {
	Session string
	Stages  []Stage
}

Pipeline represents a sequence of stages

type PipelineCancelOutput

type PipelineCancelOutput struct {
	RobotResponse
	RunID   string `json:"run_id"`
	Status  string `json:"status"`
	Message string `json:"message,omitempty"`
}

PipelineCancelOutput is the response for --robot-pipeline-cancel

type PipelineExecution

type PipelineExecution struct {
	RunID       string                  `json:"run_id"`
	WorkflowID  string                  `json:"workflow_id"`
	Session     string                  `json:"session"`
	Status      string                  `json:"status"`
	StartedAt   time.Time               `json:"started_at"`
	FinishedAt  *time.Time              `json:"finished_at,omitempty"`
	CurrentStep string                  `json:"current_step,omitempty"`
	Progress    PipelineProgress        `json:"progress"`
	Steps       map[string]PipelineStep `json:"steps"`
	Error       string                  `json:"error,omitempty"`
	// contains filtered or unexported fields
}

PipelineExecution tracks a running pipeline

func GetAllPipelines

func GetAllPipelines() []*PipelineExecution

GetAllPipelines returns all tracked pipelines (exported for CLI)

func GetPipelineExecution

func GetPipelineExecution(runID string) *PipelineExecution

GetPipelineExecution returns a pipeline by run ID (exported for CLI)

type PipelineHints

type PipelineHints struct {
	Summary     string   `json:"summary"`
	NextAction  string   `json:"next_action,omitempty"`
	StatusCmd   string   `json:"status_cmd,omitempty"`
	CancelCmd   string   `json:"cancel_cmd,omitempty"`
	Suggestions []string `json:"suggestions,omitempty"`
}

PipelineHints provides guidance for AI agents

type PipelineListOutput

type PipelineListOutput struct {
	RobotResponse
	Pipelines  []PipelineSummary `json:"pipelines"`
	AgentHints *PipelineHints    `json:"_agent_hints,omitempty"`
}

PipelineListOutput is the response for --robot-pipeline-list

type PipelineProgress

type PipelineProgress struct {
	Completed int     `json:"completed"`
	Running   int     `json:"running"`
	Pending   int     `json:"pending"`
	Failed    int     `json:"failed"`
	Skipped   int     `json:"skipped"`
	Total     int     `json:"total"`
	Percent   float64 `json:"percent"`
}

PipelineProgress tracks overall progress

type PipelineRunOptions

type PipelineRunOptions struct {
	WorkflowFile string                 // Path to workflow YAML/TOML file
	Session      string                 // Tmux session name
	Variables    map[string]interface{} // Runtime variables
	DryRun       bool                   // Validate without executing
	Background   bool                   // Run in background
}

PipelineRunOptions configures a pipeline run

type PipelineRunOutput

type PipelineRunOutput struct {
	RobotResponse
	RunID      string           `json:"run_id"`
	WorkflowID string           `json:"workflow_id"`
	Session    string           `json:"session"`
	Status     string           `json:"status"`
	DryRun     bool             `json:"dry_run,omitempty"`
	Progress   PipelineProgress `json:"progress,omitempty"`
	AgentHints *PipelineHints   `json:"_agent_hints,omitempty"`
}

PipelineRunOutput is the response for --robot-pipeline-run

type PipelineStatusOutput

type PipelineStatusOutput struct {
	RobotResponse
	RunID       string                  `json:"run_id"`
	WorkflowID  string                  `json:"workflow_id"`
	Session     string                  `json:"session"`
	Status      string                  `json:"status"`
	StartedAt   string                  `json:"started_at"`
	FinishedAt  string                  `json:"finished_at,omitempty"`
	DurationMs  int64                   `json:"duration_ms,omitempty"`
	CurrentStep string                  `json:"current_step,omitempty"`
	Progress    PipelineProgress        `json:"progress"`
	Steps       map[string]PipelineStep `json:"steps"`
	Error       string                  `json:"error,omitempty"`
	AgentHints  *PipelineHints          `json:"_agent_hints,omitempty"`
}

PipelineStatusOutput is the response for --robot-pipeline=<run-id>

type PipelineStep

type PipelineStep struct {
	ID          string `json:"id"`
	Status      string `json:"status"`
	Agent       string `json:"agent,omitempty"`
	PaneUsed    string `json:"pane_used,omitempty"`
	StartedAt   string `json:"started_at,omitempty"`
	FinishedAt  string `json:"finished_at,omitempty"`
	DurationMs  int64  `json:"duration_ms,omitempty"`
	OutputLines int    `json:"output_lines,omitempty"`
	Error       string `json:"error,omitempty"`
}

PipelineStep represents step status in pipeline output

type PipelineSummary

type PipelineSummary struct {
	RunID      string           `json:"run_id"`
	WorkflowID string           `json:"workflow_id"`
	Session    string           `json:"session"`
	Status     string           `json:"status"`
	StartedAt  string           `json:"started_at"`
	FinishedAt string           `json:"finished_at,omitempty"`
	Progress   PipelineProgress `json:"progress"`
}

PipelineSummary is a brief summary for listing

type ProgressEvent

type ProgressEvent struct {
	Type      string    `json:"type"` // step_start, step_complete, step_error, parallel_start, workflow_complete
	StepID    string    `json:"step_id,omitempty"`
	Message   string    `json:"message"`
	Progress  float64   `json:"progress"` // 0.0 - 1.0
	Timestamp time.Time `json:"timestamp"`
}

ProgressEvent is emitted during workflow execution for monitoring

type RobotResponse

type RobotResponse struct {
	Success   bool   `json:"success"`
	Timestamp string `json:"timestamp"`
	Error     string `json:"error,omitempty"`
	ErrorCode string `json:"error_code,omitempty"`
	Hint      string `json:"hint,omitempty"`
}

RobotResponse is the base structure for robot command outputs

func NewErrorResponse

func NewErrorResponse(err error, code string, hint string) RobotResponse

NewErrorResponse creates an error robot response

func NewRobotResponse

func NewRobotResponse(success bool) RobotResponse

NewRobotResponse creates a new robot response
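
The output types in this package embed RobotResponse, so with encoding/json its fields appear at the top level of the encoded object alongside the command-specific fields. The sketch below demonstrates that with local copies of two documented structs; the RFC 3339 timestamp format, the helper names, and the sample error are assumptions.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"time"
)

// ErrCodeInternalError mirrors the documented constant.
const ErrCodeInternalError = "INTERNAL_ERROR"

// RobotResponse and PipelineCancelOutput mirror the documented structs.
type RobotResponse struct {
	Success   bool   `json:"success"`
	Timestamp string `json:"timestamp"`
	Error     string `json:"error,omitempty"`
	ErrorCode string `json:"error_code,omitempty"`
	Hint      string `json:"hint,omitempty"`
}

type PipelineCancelOutput struct {
	RobotResponse
	RunID   string `json:"run_id"`
	Status  string `json:"status"`
	Message string `json:"message,omitempty"`
}

// newErrorResponse is a hypothetical stand-in for NewErrorResponse.
// Assumption: timestamps are UTC in RFC 3339 form.
func newErrorResponse(err error, code, hint string) RobotResponse {
	return RobotResponse{
		Success:   false,
		Timestamp: time.Now().UTC().Format(time.RFC3339),
		Error:     err.Error(),
		ErrorCode: code,
		Hint:      hint,
	}
}

// cancelFailure builds a sample failed-cancel response for illustration.
func cancelFailure(runID string) PipelineCancelOutput {
	return PipelineCancelOutput{
		RobotResponse: newErrorResponse(errors.New("example failure"), ErrCodeInternalError, "retry later"),
		RunID:         runID,
		Status:        "failed",
	}
}

// toMap encodes v as JSON and decodes it into a generic map for inspection.
func toMap(v interface{}) (map[string]interface{}, error) {
	data, err := json.Marshal(v)
	if err != nil {
		return nil, err
	}
	var m map[string]interface{}
	err = json.Unmarshal(data, &m)
	return m, err
}

func main() {
	data, err := json.MarshalIndent(cancelFailure("run-1"), "", "  ")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(string(data))
}
```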

type RoutingStrategy

type RoutingStrategy string

RoutingStrategy defines how to select an agent for a step

const (
	RouteLeastLoaded    RoutingStrategy = "least-loaded"
	RouteFirstAvailable RoutingStrategy = "first-available"
	RouteRoundRobin     RoutingStrategy = "round-robin"
)

type Stage

type Stage struct {
	AgentType string
	Prompt    string
	Model     string // Optional
}

Stage represents a step in the pipeline

type Step

type Step struct {
	// Identity
	ID   string `yaml:"id" toml:"id" json:"id"`                                     // Required, unique identifier
	Name string `yaml:"name,omitempty" toml:"name,omitempty" json:"name,omitempty"` // Human-readable name

	// Agent selection (choose one)
	Agent string          `yaml:"agent,omitempty" toml:"agent,omitempty" json:"agent,omitempty"` // Agent type: claude, codex, gemini
	Pane  int             `yaml:"pane,omitempty" toml:"pane,omitempty" json:"pane,omitempty"`    // Specific pane index
	Route RoutingStrategy `yaml:"route,omitempty" toml:"route,omitempty" json:"route,omitempty"` // Routing strategy

	// Prompt (choose one)
	Prompt     string `yaml:"prompt,omitempty" toml:"prompt,omitempty" json:"prompt,omitempty"`
	PromptFile string `yaml:"prompt_file,omitempty" toml:"prompt_file,omitempty" json:"prompt_file,omitempty"`

	// Wait configuration
	Wait    WaitCondition `yaml:"wait,omitempty" toml:"wait,omitempty" json:"wait,omitempty"` // completion, idle, time, none
	Timeout Duration      `yaml:"timeout,omitempty" toml:"timeout,omitempty" json:"timeout,omitempty"`

	// Dependencies
	DependsOn []string `yaml:"depends_on,omitempty" toml:"depends_on,omitempty" json:"depends_on,omitempty"`

	// Error handling
	OnError      ErrorAction `yaml:"on_error,omitempty" toml:"on_error,omitempty" json:"on_error,omitempty"`
	RetryCount   int         `yaml:"retry_count,omitempty" toml:"retry_count,omitempty" json:"retry_count,omitempty"`
	RetryDelay   Duration    `yaml:"retry_delay,omitempty" toml:"retry_delay,omitempty" json:"retry_delay,omitempty"`
	RetryBackoff string      `yaml:"retry_backoff,omitempty" toml:"retry_backoff,omitempty" json:"retry_backoff,omitempty"` // linear, exponential, none

	// Conditionals
	When string `yaml:"when,omitempty" toml:"when,omitempty" json:"when,omitempty"` // Skip if evaluates to false

	// Output handling
	OutputVar   string      `yaml:"output_var,omitempty" toml:"output_var,omitempty" json:"output_var,omitempty"`       // Store output in variable
	OutputParse OutputParse `yaml:"output_parse,omitempty" toml:"output_parse,omitempty" json:"output_parse,omitempty"` // none, json, yaml, lines, first_line, regex

	// Parallel execution (mutually exclusive with Prompt)
	Parallel []Step `yaml:"parallel,omitempty" toml:"parallel,omitempty" json:"parallel,omitempty"`

	// Loop execution
	Loop *LoopConfig `yaml:"loop,omitempty" toml:"loop,omitempty" json:"loop,omitempty"`

	// Loop control: break or continue (only valid inside loops)
	LoopControl LoopControl `yaml:"loop_control,omitempty" toml:"loop_control,omitempty" json:"loop_control,omitempty"`
}

Step represents a single step in the workflow

type StepError

type StepError struct {
	Type       string    `json:"type"` // timeout, agent_error, crash, validation, routing, send, capture
	Message    string    `json:"message"`
	Details    string    `json:"details,omitempty"`     // Full error output
	PaneOutput string    `json:"pane_output,omitempty"` // Last N lines from pane for debugging
	AgentState string    `json:"agent_state,omitempty"` // Agent state at time of error
	Attempt    int       `json:"attempt,omitempty"`     // Which retry attempt
	Timestamp  time.Time `json:"timestamp"`
}

StepError contains detailed error information for a failed step

type StepResult

type StepResult struct {
	StepID     string          `json:"step_id"`
	Status     ExecutionStatus `json:"status"`
	StartedAt  time.Time       `json:"started_at,omitempty"`
	FinishedAt time.Time       `json:"finished_at,omitempty"`
	PaneUsed   string          `json:"pane_used,omitempty"`
	AgentType  string          `json:"agent_type,omitempty"`
	Output     string          `json:"output,omitempty"`
	ParsedData interface{}     `json:"parsed_data,omitempty"` // Result of output_parse
	Error      *StepError      `json:"error,omitempty"`
	SkipReason string          `json:"skip_reason,omitempty"` // If skipped due to 'when' condition
	Attempts   int             `json:"attempts,omitempty"`    // Number of retry attempts
}

StepResult contains the result of executing a step

type SubstitutionError

type SubstitutionError struct {
	VarRef  string // The variable reference that failed
	Message string // Error description
}

SubstitutionError represents an error during variable substitution

func (*SubstitutionError) Error

func (e *SubstitutionError) Error() string

type Substitutor

type Substitutor struct {
	// contains filtered or unexported fields
}

Substitutor handles variable substitution in workflow prompts and conditions. It supports multiple variable types: vars, steps, env, and context variables.

func NewSubstitutor

func NewSubstitutor(state *ExecutionState, session, workflow string) *Substitutor

NewSubstitutor creates a new substitutor with the given execution context.

func (*Substitutor) Substitute

func (s *Substitutor) Substitute(template string) (string, error)

Substitute replaces all ${...} variable references in the template string. Returns the substituted string and any errors encountered.

func (*Substitutor) SubstituteStrict

func (s *Substitutor) SubstituteStrict(template string) (string, error)

SubstituteStrict is like Substitute but returns an error if any variable is undefined.
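
The difference between the lenient and strict variants can be illustrated with a small self-contained sketch. It is not the package's implementation: the function substitute, the flat map of variables, and the choice to leave unknown references untouched in lenient mode are all assumptions made for illustration.

```go
package main

import (
	"fmt"
	"regexp"
)

// refPattern matches ${...} references with a non-empty body.
var refPattern = regexp.MustCompile(`\$\{[^}]+\}`)

// substitute is a hypothetical stand-in for Substitute/SubstituteStrict.
// In lenient mode an unknown reference is left in place (an assumption);
// in strict mode the first unknown reference is reported as an error.
func substitute(template string, vars map[string]string, strict bool) (string, error) {
	var firstErr error
	out := refPattern.ReplaceAllStringFunc(template, func(m string) string {
		name := m[2 : len(m)-1] // strip the leading "${" and trailing "}"
		if v, ok := vars[name]; ok {
			return v
		}
		if strict && firstErr == nil {
			firstErr = fmt.Errorf("undefined variable %q", name)
		}
		return m
	})
	if firstErr != nil {
		return "", firstErr
	}
	return out, nil
}

func main() {
	vars := map[string]string{"vars.target": "tests"}

	lenient, err := substitute("run ${vars.target} on ${missing}", vars, false)
	fmt.Println(lenient, err)

	_, err = substitute("run ${vars.target} on ${missing}", vars, true)
	fmt.Println(err)
}
```

Strict mode is the safer choice for prompts sent to an agent, since an unresolved reference would otherwise reach the agent as literal text.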

type ValidationResult

type ValidationResult struct {
	Valid    bool         `json:"valid"`
	Errors   []ParseError `json:"errors,omitempty"`
	Warnings []ParseError `json:"warnings,omitempty"`
}

ValidationResult contains the result of validating a workflow

func Validate

func Validate(w *Workflow) ValidationResult

Validate validates a workflow and returns all errors found
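
Returning all errors, rather than stopping at the first, lets a workflow author fix a file in one pass. The listing does not say which checks Validate performs, so the following is only a sketch of the collect-everything pattern using one plausible check: that every depends_on entry names an existing step. The types and the function checkDependencies are hypothetical, and the ID field is assumed because the top of the Step struct is not shown here.

```go
package main

import "fmt"

// step is a minimal stand-in for Step. The ID field is an assumption.
type step struct {
	ID        string
	DependsOn []string
}

// checkDependencies is a hypothetical check, not the package's Validate.
// It reports every depends_on entry that names an unknown step instead
// of returning at the first problem.
func checkDependencies(steps []step) []string {
	known := make(map[string]bool, len(steps))
	for _, s := range steps {
		known[s.ID] = true
	}
	var problems []string
	for _, s := range steps {
		for _, dep := range s.DependsOn {
			if !known[dep] {
				problems = append(problems,
					fmt.Sprintf("step %q depends on unknown step %q", s.ID, dep))
			}
		}
	}
	return problems
}

func main() {
	steps := []step{
		{ID: "build"},
		{ID: "test", DependsOn: []string{"build", "lint"}},
		{ID: "deploy", DependsOn: []string{"tset"}},
	}
	for _, p := range checkDependencies(steps) {
		fmt.Println(p)
	}
}
```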

type VarDef

type VarDef struct {
	Description string      `yaml:"description,omitempty" toml:"description,omitempty" json:"description,omitempty"`
	Required    bool        `yaml:"required,omitempty" toml:"required,omitempty" json:"required,omitempty"`
	Default     interface{} `yaml:"default,omitempty" toml:"default,omitempty" json:"default,omitempty"`
	Type        VarType     `yaml:"type,omitempty" toml:"type,omitempty" json:"type,omitempty"` // string, number, boolean, array
}

VarDef defines a workflow variable with optional default and type info

type VarType

type VarType string

VarType represents the type of a workflow variable

const (
	VarTypeString  VarType = "string"
	VarTypeNumber  VarType = "number"
	VarTypeBoolean VarType = "boolean"
	VarTypeArray   VarType = "array"
)
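
A declared VarType is only useful if supplied values are checked against it. The sketch below shows one way such a check might look. The function matchesType is hypothetical, the VarType declarations are copied locally so the example compiles on its own, and the set of Go types accepted as a number is an assumption about what a YAML, TOML, or JSON decoder would produce.

```go
package main

import "fmt"

// Local copies of the declarations above, so the sketch is self-contained.
type VarType string

const (
	VarTypeString  VarType = "string"
	VarTypeNumber  VarType = "number"
	VarTypeBoolean VarType = "boolean"
	VarTypeArray   VarType = "array"
)

// matchesType is a hypothetical helper, not part of package pipeline.
// It reports whether v has a Go type compatible with t. An empty or
// unknown t is treated as "untyped" and accepts anything (an assumption).
func matchesType(v interface{}, t VarType) bool {
	switch t {
	case VarTypeString:
		_, ok := v.(string)
		return ok
	case VarTypeNumber:
		switch v.(type) {
		case int, int64, float64:
			return true
		}
		return false
	case VarTypeBoolean:
		_, ok := v.(bool)
		return ok
	case VarTypeArray:
		_, ok := v.([]interface{})
		return ok
	}
	return true
}

func main() {
	fmt.Println(matchesType("main", VarTypeString))
	fmt.Println(matchesType("3", VarTypeNumber))
	fmt.Println(matchesType([]interface{}{"a", "b"}, VarTypeArray))
}
```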

type VariableContext

type VariableContext struct {
	Vars     map[string]interface{}
	Steps    map[string]StepResult
	Session  string
	RunID    string
	Workflow string
}

VariableContext provides access to workflow variables

func (*VariableContext) EvaluateString

func (vc *VariableContext) EvaluateString(s string) string

EvaluateString evaluates all variable references in a string

func (*VariableContext) GetVariable

func (vc *VariableContext) GetVariable(ref string) (interface{}, bool)

GetVariable retrieves a variable by reference path

func (*VariableContext) SetVariable

func (vc *VariableContext) SetVariable(name string, value interface{})

SetVariable sets a variable value
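
GetVariable takes a "reference path", but the path syntax is not spelled out in this listing. As a sketch only, assuming dotted paths of the form vars.<name> and steps.<id>.output, a lookup could be structured as below. The types varContext and stepResult and the method get are hypothetical stand-ins, not the package's own definitions.

```go
package main

import (
	"fmt"
	"strings"
)

// stepResult and varContext are trimmed stand-ins for StepResult and
// VariableContext.
type stepResult struct{ Output string }

type varContext struct {
	Vars  map[string]interface{}
	Steps map[string]stepResult
}

// get resolves a dotted reference. The two supported path shapes,
// "vars.<name>" and "steps.<id>.output", are assumptions.
func (vc *varContext) get(ref string) (interface{}, bool) {
	parts := strings.Split(ref, ".")
	switch {
	case len(parts) == 2 && parts[0] == "vars":
		v, ok := vc.Vars[parts[1]]
		return v, ok
	case len(parts) == 3 && parts[0] == "steps" && parts[2] == "output":
		r, ok := vc.Steps[parts[1]]
		if !ok {
			return nil, false
		}
		return r.Output, true
	}
	return nil, false
}

func main() {
	vc := &varContext{
		Vars:  map[string]interface{}{"branch": "main"},
		Steps: map[string]stepResult{"build": {Output: "ok"}},
	}
	fmt.Println(vc.get("vars.branch"))
	fmt.Println(vc.get("steps.build.output"))
	fmt.Println(vc.get("steps.deploy.output"))
}
```

Returning a second boolean, as GetVariable does, lets callers distinguish a variable that is unset from one that is set to nil or an empty string.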

type WaitCondition

type WaitCondition string

WaitCondition defines when a step is considered complete

const (
	WaitCompletion WaitCondition = "completion" // Wait for agent to return to idle
	WaitIdle       WaitCondition = "idle"       // Same as completion
	WaitTime       WaitCondition = "time"       // Wait for specified timeout only
	WaitNone       WaitCondition = "none"       // Fire and forget
)

type Workflow

type Workflow struct {
	// Metadata
	SchemaVersion string `yaml:"schema_version" toml:"schema_version" json:"schema_version"`
	Name          string `yaml:"name" toml:"name" json:"name"`
	Description   string `yaml:"description,omitempty" toml:"description,omitempty" json:"description,omitempty"`
	Version       string `yaml:"version,omitempty" toml:"version,omitempty" json:"version,omitempty"`

	// Variable definitions
	Vars map[string]VarDef `yaml:"vars,omitempty" toml:"vars,omitempty" json:"vars,omitempty"`

	// Global settings
	Settings WorkflowSettings `yaml:"settings,omitempty" toml:"settings,omitempty" json:"settings,omitempty"`

	// Step definitions
	Steps []Step `yaml:"steps" toml:"steps" json:"steps"`
}

Workflow represents a complete workflow definition loaded from YAML/TOML

func ParseFile

func ParseFile(path string) (*Workflow, error)

ParseFile parses a workflow file (YAML or TOML) and returns the workflow

func ParseString

func ParseString(content string, format string) (*Workflow, error)

ParseString parses a workflow from a string (auto-detects format)
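
Since ParseFile accepts both YAML and TOML, something must decide which decoder to use. How the package does this is not documented here; a common approach is to dispatch on the file extension, sketched below. The function detectFormat and the format names "yaml" and "toml" are assumptions for illustration.

```go
package main

import (
	"fmt"
	"path/filepath"
	"strings"
)

// detectFormat is a hypothetical helper, not part of package pipeline.
// It maps a file extension to a format name, ignoring case.
func detectFormat(path string) (string, error) {
	ext := filepath.Ext(path)
	switch strings.ToLower(ext) {
	case ".yaml", ".yml":
		return "yaml", nil
	case ".toml":
		return "toml", nil
	default:
		return "", fmt.Errorf("unsupported workflow file extension %q", ext)
	}
}

func main() {
	for _, p := range []string{"deploy.yaml", "review.TOML", "notes.txt"} {
		format, err := detectFormat(p)
		fmt.Println(p, format, err)
	}
}
```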

type WorkflowSettings

type WorkflowSettings struct {
	Timeout          Duration    `yaml:"timeout,omitempty" toml:"timeout,omitempty" json:"timeout,omitempty"`    // Global timeout (e.g., "30m")
	OnError          ErrorAction `yaml:"on_error,omitempty" toml:"on_error,omitempty" json:"on_error,omitempty"` // fail, continue
	NotifyOnComplete bool        `yaml:"notify_on_complete,omitempty" toml:"notify_on_complete,omitempty" json:"notify_on_complete,omitempty"`
	NotifyOnError    bool        `yaml:"notify_on_error,omitempty" toml:"notify_on_error,omitempty" json:"notify_on_error,omitempty"`
	NotifyChannels   []string    `yaml:"notify_channels,omitempty" toml:"notify_channels,omitempty" json:"notify_channels,omitempty"` // desktop, webhook, mail
	WebhookURL       string      `yaml:"webhook_url,omitempty" toml:"webhook_url,omitempty" json:"webhook_url,omitempty"`
	MailRecipient    string      `yaml:"mail_recipient,omitempty" toml:"mail_recipient,omitempty" json:"mail_recipient,omitempty"`
}

WorkflowSettings contains global workflow configuration

func DefaultWorkflowSettings

func DefaultWorkflowSettings() WorkflowSettings

DefaultWorkflowSettings returns sensible defaults for workflow settings
