pipeline

package module
v1.7.0 Latest
Published: Jan 15, 2026 License: MIT Imports: 7 Imported by: 0

README

pipeline

A Go package for building concurrent data processing pipelines using channels.

Overview

pipeline provides a set of composable stages for processing data streams concurrently. It handles context cancellation, error propagation, and goroutine lifecycle management automatically.

All pipeline stages require a name parameter as their first argument. These names are used to create trace regions for performance analysis using Go's runtime/trace package. When creating a pipeline with WithPipeline, you also provide a name for the overall pipeline task.
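For example, you can capture an execution trace with the standard library's runtime/trace package and then inspect the named pipeline task and stage regions with go tool trace. A minimal sketch (error handling abbreviated; uses os, log, and runtime/trace):

// Write an execution trace to trace.out, then view it with:
//   go tool trace trace.out
f, err := os.Create("trace.out")
if err != nil {
    log.Fatal(err)
}
defer f.Close()

if err := trace.Start(f); err != nil {
    log.Fatal(err)
}
defer trace.Stop()

// ... build and run the pipeline as usual ...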

Installation

go get github.com/schraf/pipeline

Usage

Basic Example

package main

import (
	"context"
	"fmt"
	"slices"

	"github.com/schraf/pipeline"
)

func main() {
	p, _ := pipeline.WithPipeline(context.Background(), "example")

	// Define some data to process
	data := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}

	// Create channels
	in := make(chan int, len(data))
	out := make(chan int, len(data))

	// Source: feed data into the pipeline from a slice
	pipeline.SourceSlice("source", p, slices.Values(data), in)

	// Transform: multiply by 2
	pipeline.Transform("multiply", p, func(ctx context.Context, x int) (*int, error) {
		result := x * 2
		return &result, nil
	}, in, out)

	// Read results
	for v, err := range pipeline.Sink("sink", p, out) {
		if err != nil {
			panic(err)
		}

		fmt.Println(v)
	}

	// Wait for all stages to finish and surface the first error, if any
	if err := p.Wait(); err != nil {
		panic(err)
	}
}

Pipeline Stages

Source

Starts a pipeline from a fallible iterator (iter.Seq2[T, error]). If the iterator returns an error, the pipeline is cancelled.

// Example: a custom fallible iterator, e.g. one that reads lines from
// a file or rows from a database and can return an error.
var myIterator iter.Seq2[string, error] = func(yield func(string, error) bool) {
    for _, s := range []string{"a", "b", "c"} {
        if !yield(s, nil) {
            return
        }
    }
}

out := make(chan string, 100)
pipeline.Source("my-source", p, myIterator, out)

SourceSlice

Starts a pipeline from a simple, error-free iterator (iter.Seq[T]), which is useful for in-memory slices.

data := []int{1, 2, 3, 4, 5}
out := make(chan int, 5)

// The iterator can be created from a slice using slices.Values
pipeline.SourceSlice("source", p, slices.Values(data), out)

Sink

The "opposite" of a source, Sink takes an input channel and returns an iter.Seq2[T, error] iterator. This allows consuming values from a pipeline using a standard for...range loop. If the pipeline is cancelled, the iterator will yield an error.

// Given 'out' is a channel from a pipeline stage...
it := pipeline.Sink("sink", p, out)

// You can now iterate over the results.
for v, err := range it {
    if err != nil {
        // This can happen if the pipeline is cancelled.
        log.Fatalf("iterator error: %v", err)
    }
    fmt.Println(v)
}

Transform

Applies a transformation function to each value:

pipeline.Transform("transform", p, func(ctx context.Context, x int) (*int, error) {
    result := x * 2
    return &result, nil
}, in, out)

Filter

Filters values based on a predicate:

pipeline.Filter("filter", p, func(ctx context.Context, x int) (bool, error) {
    return x%2 == 0, nil
}, in, out)

Batch

Groups values into fixed-size batches; any items remaining when the input channel closes are processed as a final, smaller batch:

pipeline.Batch("batch", p, func(ctx context.Context, batch []int) (*int, error) {
    sum := 0
    for _, v := range batch {
        sum += v
    }
    return &sum, nil
}, 3, in, out)

ParallelTransform

Applies transformation with concurrent workers:

pipeline.ParallelTransform("parallel-transform", p, 5, func(ctx context.Context, x int) (*int, error) {
    result := x * 2
    return &result, nil
}, in, out)

FanIn

Merges multiple input channels into one:

pipeline.FanIn("fan-in", p, out, in1, in2, in3)

FanOut

Distributes values to multiple output channels (broadcast):

pipeline.FanOut("fan-out", p, in, out1, out2, out3)

FanOutRoundRobin

Distributes values round-robin style:

pipeline.FanOutRoundRobin("fan-out-round-robin", p, in, out1, out2, out3)

Limit

Limits the number of values passed through:

pipeline.Limit("limit", p, 10, in, out)

Split

Routes values to different output channels based on a selector. The selector must return a valid index into the output channels; an out-of-range index panics:

pipeline.Split("split", p, func(ctx context.Context, x int) int {
    return (x - 1) % 3
}, in, out1, out2, out3)

Aggregate

Collects all values into a single slice:

// 'out' carries slices (e.g. chan []int); a single slice containing
// all collected values is sent once the input channel closes.
pipeline.Aggregate("aggregate", p, in, out)

Reduce

Processes values incrementally using a reducer function, combining them with an accumulator. This allows aggregating results as they come in without keeping all values in memory:

pipeline.Reduce("reduce", p, 0, func(ctx context.Context, acc int, x int) (int, error) {
    return acc + x, nil
}, in, out)

Flatten

Takes an input channel of slices and emits each element of each slice as an individual item on the output channel:

// 'in' carries slices (e.g. chan []int); each element is emitted
// individually on 'out'.
pipeline.Flatten("flatten", p, in, out)

Expand

Takes individual items from an input channel and, for each one, emits multiple items of another type. The expander function returns an iterator (iter.Seq2[Out, error]) of output items for each input, enabling lazy evaluation and avoiding loading all expanded items into memory at once:

pipeline.Expand("expand", p, func(ctx context.Context, x int) iter.Seq2[string, error] {
    return func(yield func(string, error) bool) {
        // stop if the consumer breaks out of the iteration
        if !yield(fmt.Sprintf("%d", x), nil) {
            return
        }
        yield(fmt.Sprintf("%d", x*2), nil)
    }
}, in, out)
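
Combining Stages

Stages compose by sharing channels. The sketch below uses only the functions documented above (stage names are illustrative) to shard work across two Transform stages with FanOutRoundRobin and merge the results with FanIn. As in the basic example, it assumes each stage closes its output channel when it finishes:

in := make(chan int, 10)
shardA := make(chan int, 10)
shardB := make(chan int, 10)
outA := make(chan int, 10)
outB := make(chan int, 10)
merged := make(chan int, 10)

// the same transformer serves both shards
double := func(ctx context.Context, x int) (*int, error) {
    result := x * 2
    return &result, nil
}

pipeline.SourceSlice("source", p, slices.Values([]int{1, 2, 3, 4}), in)
pipeline.FanOutRoundRobin("shard", p, in, shardA, shardB)
pipeline.Transform("double-a", p, double, shardA, outA)
pipeline.Transform("double-b", p, double, shardB, outB)
pipeline.FanIn("merge", p, merged, outA, outB)

for v, err := range pipeline.Sink("sink", p, merged) {
    if err != nil {
        panic(err)
    }
    fmt.Println(v)
}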

Error Handling

The pipeline automatically cancels all stages when an error occurs. The first error encountered is returned by Wait():

if err := p.Wait(); err != nil {
    log.Fatal(err)
}
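
For instance (a sketch with illustrative stage and channel names), a transformer that returns an error cancels the whole pipeline, and Wait reports that first error:

pipeline.Transform("validate", p, func(ctx context.Context, x int) (*int, error) {
    if x < 0 {
        return nil, fmt.Errorf("negative value: %d", x)
    }
    return &x, nil
}, in, out)

if err := p.Wait(); err != nil {
    log.Println("pipeline failed:", err) // the first error, e.g. "negative value: -3"
}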

Requirements

  • Go 1.24.0 or later

License

See LICENSE file for details.

Documentation

Overview

Package pipeline provides composable stages for building concurrent data processing pipelines using channels. It handles context cancellation, error propagation, and goroutine lifecycle management automatically.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Aggregate

func Aggregate[T any](name string, p *Pipeline, in <-chan T, out chan<- []T)

Aggregate consumes all values from the input channel and sends the collected slice of values as a single item on the output channel.

func Batch

func Batch[In any, Out any](name string, p *Pipeline, batcher func(context.Context, []In) (*Out, error), batchSize int, in <-chan In, out chan<- Out)

Batch groups incoming values into fixed-size batches, passes each batch to the batcher function, and forwards the resulting value to the output channel. Any remaining items after the input channel closes are processed as a final batch. The batcher must return a non-nil pointer when err is nil; otherwise, a panic occurs.

func Expand added in v1.6.0

func Expand[In any, Out any](name string, p *Pipeline, expander func(context.Context, In) iter.Seq2[Out, error], in <-chan In, out chan<- Out)

Expand reads values from the input channel, applies the expander function to each value, and forwards all items from the returned iterator to the output channel. For each input item, the expander returns an iterator of output items, which are all sent to the output channel. Processing continues until the context is done or the input channel is closed. This allows for lazy evaluation and avoids loading all expanded items into memory at once.

func FanIn

func FanIn[T any](name string, p *Pipeline, out chan<- T, in ...<-chan T)

FanIn merges multiple input channels into a single output channel, forwarding all values from each input until the context is done or all inputs are closed.

func FanOut

func FanOut[T any](name string, p *Pipeline, in <-chan T, out ...chan<- T)

FanOut distributes items from a single input channel to multiple output channels, sending each item to all output channels.

func FanOutRoundRobin

func FanOutRoundRobin[T any](name string, p *Pipeline, in <-chan T, out ...chan<- T)

FanOutRoundRobin distributes items from a single input channel to multiple output channels using round-robin distribution, sending each item to only one output channel. Panics if no output channels are provided.

func Filter

func Filter[T any](name string, p *Pipeline, filter func(context.Context, T) (bool, error), in <-chan T, out chan<- T)

Filter reads values from the input channel, applies the filter predicate, and forwards only values that satisfy the predicate to the output channel. It respects context cancellation and stops processing on error.

func Flatten added in v1.4.0

func Flatten[T any](name string, p *Pipeline, in <-chan []T, out chan<- T)

Flatten takes an input channel of slices and emits each element of each slice as an individual item on the output channel. It continues until the input channel is closed or the context is cancelled.

func Limit

func Limit[T any](name string, p *Pipeline, n int, in <-chan T, out chan<- T)

Limit reads values from the input channel, forwards at most n values to the output channel, and then returns. It respects context cancellation while reading and forwarding values.

func ParallelTransform

func ParallelTransform[In any, Out any](name string, p *Pipeline, workers int, transformer func(context.Context, In) (*Out, error), in <-chan In, out chan<- Out)

ParallelTransform applies the transformer function to values read from the input channel using a fixed number of concurrent workers, forwarding successful results to the output channel until the context is done or the input channel is closed. The transformer must return a non-nil pointer when err is nil; otherwise, a panic occurs.

func Reduce added in v1.5.0

func Reduce[T any, Acc any](name string, p *Pipeline, initial Acc, reducer func(context.Context, Acc, T) (Acc, error), in <-chan T, out chan<- Acc)

Reduce processes values from the input channel incrementally using a reducer function, combining them with an accumulator. This allows aggregating results as they come in without keeping all values in memory. The reducer function takes the current accumulator and the next value, and returns the updated accumulator. The final accumulated result is sent to the output channel.

func Sink added in v1.3.0

func Sink[T any](name string, p *Pipeline, in <-chan T) iter.Seq2[T, error]

Sink returns an iterator that yields values from an input channel. The iterator continues until the channel is closed or the pipeline's context is cancelled. If the context is cancelled, the iterator yields a zero value for T and the context's error.

func Source added in v1.1.0

func Source[T any](name string, p *Pipeline, seq iter.Seq2[T, error], out chan<- T)

Source reads values from an iterator that can return errors. It sends values to the output channel until the iterator is exhausted, an error occurs, or the context is done. If the iterator returns an error, the pipeline is cancelled.

func SourceSlice added in v1.2.0

func SourceSlice[T any](name string, p *Pipeline, seq iter.Seq[T], out chan<- T)

SourceSlice reads values from the provided iterator and sends them to the output channel until the iterator is exhausted or the context is done.

func Split

func Split[T any](name string, p *Pipeline, selector func(context.Context, T) int, in <-chan T, out ...chan<- T)

Split routes each value read from the input channel to exactly one of the provided output channels, as determined by the selector function. The selector must return a valid index into the out slice. Panics if the selector returns an invalid index.

func Transform

func Transform[In any, Out any](name string, p *Pipeline, transformer func(context.Context, In) (*Out, error), in <-chan In, out chan<- Out)

Transform reads values from the input channel, applies the transformer function, and forwards successful results to the output channel until the context is done or the input channel is closed. The transformer must return a non-nil pointer when err is nil; otherwise, a panic occurs.

Types

type Pipeline

type Pipeline struct {
	// contains filtered or unexported fields
}

Pipeline coordinates concurrent processing stages, managing their lifecycle and propagating errors and cancellation signals across all stages.

func WithPipeline

func WithPipeline(ctx context.Context, name string) (*Pipeline, context.Context)

WithPipeline creates a new Pipeline and a derived context for coordinating pipeline stages. The returned context is cancelled when any stage encounters an error. Use the returned Pipeline to register stages and wait for completion.
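
A minimal usage sketch (stage and channel names are illustrative; most stages are elided):

p, ctx := pipeline.WithPipeline(context.Background(), "ingest")

in := make(chan string, 8)
pipeline.SourceSlice("fetch", p, slices.Values([]string{"a", "b"}), in)
// ... register more stages with p; pass ctx to auxiliary calls
// (e.g. database queries) so they stop if any stage fails ...

if err := p.Wait(); err != nil {
    log.Fatal(err)
}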

func (*Pipeline) Wait

func (p *Pipeline) Wait() error

Wait blocks until all registered stages complete and returns the first error encountered by any stage, or nil if all stages completed successfully.
