Documentation
Overview
Package pipeline provides composable stages for building concurrent data processing pipelines using channels. It handles context cancellation, error propagation, and goroutine lifecycle management automatically.
Index
- func Aggregate[T any](name string, p *Pipeline, in <-chan T, out chan<- []T)
- func Batch[In any, Out any](name string, p *Pipeline, batcher func(context.Context, []In) (*Out, error), ...)
- func Expand[In any, Out any](name string, p *Pipeline, ...)
- func FanIn[T any](name string, p *Pipeline, out chan<- T, in ...<-chan T)
- func FanOut[T any](name string, p *Pipeline, in <-chan T, out ...chan<- T)
- func FanOutRoundRobin[T any](name string, p *Pipeline, in <-chan T, out ...chan<- T)
- func Filter[T any](name string, p *Pipeline, filter func(context.Context, T) (bool, error), ...)
- func Flatten[T any](name string, p *Pipeline, in <-chan []T, out chan<- T)
- func Limit[T any](name string, p *Pipeline, n int, in <-chan T, out chan<- T)
- func ParallelTransform[In any, Out any](name string, p *Pipeline, workers int, ...)
- func Reduce[T any, Acc any](name string, p *Pipeline, initial Acc, ...)
- func Sink[T any](name string, p *Pipeline, in <-chan T) iter.Seq2[T, error]
- func Source[T any](name string, p *Pipeline, seq iter.Seq2[T, error], out chan<- T)
- func SourceSlice[T any](name string, p *Pipeline, seq iter.Seq[T], out chan<- T)
- func Split[T any](name string, p *Pipeline, selector func(context.Context, T) int, in <-chan T, ...)
- func Transform[In any, Out any](name string, p *Pipeline, transformer func(context.Context, In) (*Out, error), ...)
- type Pipeline
Constants
This section is empty.
Variables
This section is empty.
Functions
func Aggregate
func Aggregate[T any](name string, p *Pipeline, in <-chan T, out chan<- []T)
Aggregate consumes all values from the input channel and sends the collected slice of values as a single item on the output channel.
func Batch
func Batch[In any, Out any](name string, p *Pipeline, batcher func(context.Context, []In) (*Out, error), batchSize int, in <-chan In, out chan<- Out)
Batch groups incoming values into fixed-size batches, passes each batch to the batcher function, and forwards the resulting value to the output channel. Any items remaining when the input channel closes are processed as a final, possibly smaller batch. The batcher must return a non-nil pointer when err is nil; otherwise the stage panics.
func Expand added in v1.6.0
func Expand[In any, Out any](name string, p *Pipeline, expander func(context.Context, In) iter.Seq2[Out, error], in <-chan In, out chan<- Out)
Expand reads values from the input channel, applies the expander function to each value, and forwards every item from the returned iterator to the output channel. Processing continues until the context is done or the input channel is closed. Because the expander returns an iterator rather than a slice, output items are produced lazily, avoiding loading all expanded items into memory at once.
func FanIn
func FanIn[T any](name string, p *Pipeline, out chan<- T, in ...<-chan T)
FanIn merges multiple input channels into a single output channel, forwarding all values from each input until the context is done or all inputs are closed.
func FanOut
func FanOut[T any](name string, p *Pipeline, in <-chan T, out ...chan<- T)
FanOut distributes items from a single input channel to multiple output channels, sending each item to all output channels.
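The broadcast rule (every subscriber sees every item) can be sketched in plain Go, leaving out the context handling FanOut provides:

```go
package main

import "fmt"

// fanOut broadcasts every value from in to all output channels and
// closes each output once the input is drained.
func fanOut(in <-chan int, out ...chan<- int) {
	defer func() {
		for _, ch := range out {
			close(ch)
		}
	}()
	for v := range in {
		for _, ch := range out { // every subscriber gets every item
			ch <- v
		}
	}
}

func main() {
	in := make(chan int)
	a, b := make(chan int, 4), make(chan int, 4) // buffered so the broadcast never blocks here
	go func() { in <- 1; in <- 2; close(in) }()
	fanOut(in, a, b)

	var got []int
	for v := range a {
		got = append(got, v)
	}
	for v := range b {
		got = append(got, v)
	}
	fmt.Println(got) // [1 2 1 2]
}
```

Note that a slow consumer on any output channel stalls the broadcast for all of them, which is why buffering (or dedicated consumers) matters with this shape.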
func FanOutRoundRobin
func FanOutRoundRobin[T any](name string, p *Pipeline, in <-chan T, out ...chan<- T)
FanOutRoundRobin distributes items from a single input channel across multiple output channels in round-robin order, sending each item to exactly one output channel. Panics if no output channels are provided.
func Filter
func Filter[T any](name string, p *Pipeline, filter func(context.Context, T) (bool, error), in <-chan T, out chan<- T)
Filter reads values from the input channel, applies the filter predicate, and forwards only the values that satisfy it to the output channel. It respects context cancellation and stops processing if the predicate returns an error.
func Flatten added in v1.4.0
func Flatten[T any](name string, p *Pipeline, in <-chan []T, out chan<- T)
Flatten takes an input channel of slices and emits each element of each slice as an individual item on the output channel. It continues until the input channel is closed or the context is cancelled.
func Limit
func Limit[T any](name string, p *Pipeline, n int, in <-chan T, out chan<- T)
Limit reads values from the input channel, forwards at most n values to the output channel, and then returns. It respects context cancellation while reading and forwarding values.
func ParallelTransform
func ParallelTransform[In any, Out any](name string, p *Pipeline, workers int, transformer func(context.Context, In) (*Out, error), in <-chan In, out chan<- Out)
ParallelTransform applies the transformer function to values read from the input channel using a fixed number of concurrent workers, forwarding successful results to the output channel until the context is done or the input channel is closed. The transformer must return a non-nil pointer when err is nil; otherwise the stage panics.
func Reduce added in v1.5.0
func Reduce[T any, Acc any](name string, p *Pipeline, initial Acc, reducer func(context.Context, Acc, T) (Acc, error), in <-chan T, out chan<- Acc)
Reduce processes values from the input channel incrementally using a reducer function, combining them with an accumulator. This allows aggregating results as they come in without keeping all values in memory. The reducer function takes the current accumulator and the next value, and returns the updated accumulator. The final accumulated result is sent to the output channel.
func Sink added in v1.3.0
func Sink[T any](name string, p *Pipeline, in <-chan T) iter.Seq2[T, error]
Sink returns an iterator that yields values from the input channel. The iterator continues until the channel is closed or the pipeline's context is cancelled. If the context is cancelled, the iterator yields the zero value of T together with the context's error.
func Source added in v1.1.0
func Source[T any](name string, p *Pipeline, seq iter.Seq2[T, error], out chan<- T)
Source reads values from an iterator that can return errors. It sends values to the output channel until the iterator is exhausted, an error occurs, or the context is done. If the iterator returns an error, the pipeline is cancelled.
func SourceSlice added in v1.2.0
func SourceSlice[T any](name string, p *Pipeline, seq iter.Seq[T], out chan<- T)
SourceSlice reads values from the provided iterator and sends them to the output channel until the iterator is exhausted or the context is done.
func Split
func Split[T any](name string, p *Pipeline, selector func(context.Context, T) int, in <-chan T, out ...chan<- T)
Split routes each value read from the input channel to exactly one of the provided output channels, as determined by the selector function. The selector must return a valid index into out; the stage panics if it does not.
func Transform
func Transform[In any, Out any](name string, p *Pipeline, transformer func(context.Context, In) (*Out, error), in <-chan In, out chan<- Out)
Transform reads values from the input channel, applies the transformer function, and forwards successful results to the output channel until the context is done or the input channel is closed. The transformer must return a non-nil pointer when err is nil; otherwise the stage panics.
Types
type Pipeline
type Pipeline struct {
// contains filtered or unexported fields
}
Pipeline coordinates concurrent processing stages, managing their lifecycle and propagating errors and cancellation signals across all stages.
func WithPipeline
WithPipeline creates a new Pipeline and a derived context for coordinating pipeline stages. The returned context is cancelled when any stage encounters an error. Use the returned Pipeline to register stages and wait for completion.