gather

package module
v0.1.0
Published: Oct 17, 2025 License: MIT Imports: 5 Imported by: 0

README

  _____       _______ _    _ ______ _____  
 / ____|   /\|__   __| |  | |  ____|  __ \ 
| |  __   /  \  | |  | |__| | |__  | |__) |
| | |_ | / /\ \ | |  |  __  |  __| |  _  / 
| |__| |/ ____ \| |  | |  | | |____| | \ \ 
 \_____/_/    \_\_|  |_|  |_|______|_|  \_\
 


Gather is a lightweight channel-based concurrency library for Go. It helps you build worker pools, pipelines, and middleware.

Quick Example

opts := []gather.Opt{
    gather.WithWorkerSize(100),
    gather.WithBufferSize(10),
    gather.WithOrderPreserved(),
}

handler := func(ctx context.Context, in Foo, _ *gather.Scope[Foo]) (Bar, error) {
    // do work...
    return Bar{}, nil
}

out := gather.Workers(ctx, in, handler, opts...) 

for v := range out {
    // consume
}

Add to your project

go get github.com/jaredmtdev/gather

API at a glance

  • Workers: start a worker pool that consumes an input channel and returns an output channel
  • HandlerFunc: handles each job
  • Scope: tools available to a handler (e.g. retries)
  • Middleware: wrap handlers and other middleware
  • Chain: chains multiple middleware

Design Philosophy

Gather provides the glue: workers, pipelines, and middleware.
You design the concurrency patterns that fit your use case.

Simple

  • familiar middleware model (like "net/http")
  • decouples middleware plumbing from business logic
  • uses plain Go primitives: context, channels, and functions

Flexible

  • leaves retry and error handling decisions to you
  • lets you manage input/output channels directly
  • no global state
  • context-aware: honors cancellations, timeouts, and deadlines

Should I use Gather?

Use Gather if channels are unavoidable or if you need pipeline semantics that errgroup and sync.WaitGroup don't give you.

When Gather is a good fit

  • you need middleware like retries, circuit breakers, backpressure, timeouts, logging, etc.
  • multi-stage pipelines with fan-out/fan-in
  • optional ordering with a reorder gate
  • composability of stages

When a simpler tool is better

  • independent tasks, fail fast -> use errgroup
  • 100% CPU-bound work -> use sync.WaitGroup
  • jobs that complete instantly
    • simplicity / minimal memory -> use iter package
    • best execution time
      • can avoid channels -> use fixed workers processing batches (fastest)
      • want decoupling/backpressure/middleware -> use gather with multiple workers + batches
        • large enough batches amortize channel cost to almost nothing
  • background task waiting to close a channel -> use plain goroutine
  • simple generator -> use plain goroutine or iter.Seq

Getting Started

Building a worker pool

1: build your handler

This handles each item sent to the worker pool:

handler := func(ctx context.Context, in Foo, scope *gather.Scope[Foo]) (Bar, error) {
    // do work...
    return Bar{}, nil
}

The context can be cancelled at any time to shut down the worker pool. For a faster shutdown, it's important that the handler also honors context cancellation.

The in value and the worker pool's output can be any types. When the handler returns an error, the result is not sent to the output channel. If needed, the error can also be sent to an error channel (which you create) and then processed in a separate goroutine (which you define). The error return also comes in handy when building middleware.
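
A rough sketch of that error-channel pattern (errCh, process, Foo, and Bar are placeholders you would define yourself):

errCh := make(chan error, 100)

handler := func(ctx context.Context, in Foo, _ *gather.Scope[Foo]) (Bar, error) {
    res, err := process(ctx, in) // placeholder for the real work
    if err != nil {
        select {
        case errCh <- err: // hand the error off so the workers stay unblocked
        case <-ctx.Done():
        }
        return Bar{}, err
    }
    return res, nil
}

// process errors in a goroutine you own; close errCh once the pool is done
go func() {
    for err := range errCh {
        log.Println("job failed:", err)
    }
}()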

Optionally, make custom middleware to conveniently wrap around the handler:

wrappedHandler := logger(retries(rateLimiter(handler)))

// alternatively, chain the middleware:
mw := gather.Chain(rateLimiter, retries, logger)
wrappedHandler := mw(handler)

See examples/internal/samplemiddleware/ for more detailed examples on building middleware.
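
As a rough sketch of what one of those middleware could look like (Foo and Bar are the placeholder types from above; the log line is illustrative), a logger might be written as:

var logger gather.Middleware[Foo, Bar] = func(next gather.HandlerFunc[Foo, Bar]) gather.HandlerFunc[Foo, Bar] {
    return func(ctx context.Context, in Foo, scope *gather.Scope[Foo]) (Bar, error) {
        start := time.Now()
        out, err := next(ctx, in, scope) // run the wrapped handler
        log.Printf("handled %v in %s (err: %v)", in, time.Since(start), err)
        return out, err
    }
}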

The scope provides extra capabilities that may come in handy, such as retries or spawning new goroutines with a guarantee that those goroutines finish before the worker pool shuts down.
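
For example, a handler could ask the scope to re-queue a failed job after a delay (callService is a placeholder; real code would cap attempts, e.g. with a retry middleware as in examples/):

handler := func(ctx context.Context, in Foo, scope *gather.Scope[Foo]) (Bar, error) {
    res, err := callService(ctx, in) // placeholder for a flaky dependency
    if err != nil {
        // re-queue this job after one second; only one pending retry per job is honored
        scope.RetryAfter(ctx, in, time.Second)
        return Bar{}, err
    }
    return res, nil
}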

2: build your generator

You need a channel <-chan T (of any element type) whose values are received only by the worker pool.
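
A typical generator is just a goroutine that sends values and closes the channel when it's done (Foo{ID: i} is a placeholder value):

in := make(chan Foo)
go func() {
    defer close(in) // closing in lets the worker pool finish and close out
    for i := 0; i < 100; i++ {
        select {
        case in <- Foo{ID: i}:
        case <-ctx.Done(): // stop early if the pipeline is cancelled
            return
        }
    }
}()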

3: configure and run the worker pool

out := gather.Workers(ctx, in, handler, opts...)

This will return an output channel. The channel must be consumed or drained so the workers don't get blocked:

// consume output
for v := range out {
    fmt.Println(v)
}

// alternatively, drain output if it doesn't need to be consumed
for range out {
}

Notice the opts.... These options configure the worker pool. Look for any function starting with gather.With; you can configure things like the number of workers, channel buffer size, order preservation, etc.
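
For instance, the documented options can be combined inline (the sizes here are arbitrary):

out := gather.Workers(ctx, in, handler,
    gather.WithWorkerSize(8),    // 8 concurrent workers
    gather.WithBufferSize(16),   // buffer for internal and output channels
    gather.WithOrderPreserved(), // emit results in input order
)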

Building a pipeline

The above worker pools are a single stage of the pipeline. To build a pipeline, just build multiple worker pools and pass the output of one into the next:

out1 := gather.Workers(ctx, in, handler1, opts...) 
out2 := gather.Workers(ctx, out1, handler2, opts...) 
out3 := gather.Workers(ctx, out2, handler3, opts...) 
for range out3 {
    // drain
}

This is quite simple and gives full control! You can tune the configuration of each stage of the pipeline, and you can cancel the context at any stage to stop the entire pipeline.

Examples

Please see the examples/ folder for some simple examples.

Future Ideas

  • sharding across multiple channels
  • seq package: offer synchronous helpers that utilize iter.Seq and integrate nicely with Gather
  • include WithEventHook(hook func(Event)) Opt which can be used for logging/debugging

Documentation

Overview

Package gather - used to create worker pools, pipelines, and middleware

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Workers

func Workers[IN any, OUT any](
	ctx context.Context,
	in <-chan IN,
	handler HandlerFunc[IN, OUT],
	opts ...Opt,
) <-chan OUT

Workers starts multiple goroutines that read jobs from in, process them with handler, and forward results to out. It returns out.

Concurrency and resource use:

  • Spawns N worker goroutines (configured with opts) plus a small, constant number of internal coordinators (O(1)). No goroutines are created per job.
  • Backpressure is applied by the capacities of in/out (unbuffered channels block).
  • Buffer size of the internal and out channels can be configured with opts.

Lifecycle:

  • When in is closed and all jobs are drained/processed, out is closed.
  • If ctx is canceled, workers stop early and out is closed after in-flight jobs exit.
  • The out channel MUST be drained to avoid a deadlock.

Ordering:

  • By default, results are NOT guaranteed to preserve input order.
  • Use opts to guarantee preserved order.

Errors:

  • If handler returns an error, the job is NOT sent to out.
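
For reference, a hedged usage sketch of this lifecycle (in and handler as defined by the caller; the timeout and worker count are arbitrary):

ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()

out := gather.Workers(ctx, in, handler, gather.WithWorkerSize(4))

// out MUST be drained; it closes after in is closed and drained,
// or after ctx is cancelled and in-flight jobs have exited.
for range out {
}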

Types

type HandlerFunc

type HandlerFunc[IN any, OUT any] func(ctx context.Context, in IN, scope *Scope[IN]) (OUT, error)

HandlerFunc - function used to handle a single request sent to a worker. Error handling is done here. The user can:

  • cancel the context if needed for immediate shutdown
  • for graceful shutdown: the user controls the generator, so they can simply close the in channel and let all downstream stages finish
  • send the error to their own error channel (which could be processed by another Workers pool)
  • use scope for retries, the ReplyTo pattern, etc.

type Middleware

type Middleware[IN any, OUT any] func(next HandlerFunc[IN, OUT]) HandlerFunc[IN, OUT]

Middleware wraps a HandlerFunc and returns a new HandlerFunc. The returned handler may run logic before and/or after invoking next, or choose not to call next (short-circuit).

Middleware is composable: multiple middlewares can be applied in order.
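
As a rough sketch (Foo, Bar, validate, and errInvalid are placeholders, not part of the package), a short-circuiting middleware could look like:

var validator gather.Middleware[Foo, Bar] = func(next gather.HandlerFunc[Foo, Bar]) gather.HandlerFunc[Foo, Bar] {
    return func(ctx context.Context, in Foo, scope *gather.Scope[Foo]) (Bar, error) {
        if !validate(in) { // placeholder check
            return Bar{}, errInvalid // short-circuit: next is never called
        }
        return next(ctx, in, scope)
    }
}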

func Chain

func Chain[IN any, OUT any](mws ...Middleware[IN, OUT]) Middleware[IN, OUT]

Chain middleware together in FIFO execution order.

Chain(m1,m2,m3)(h) == m1(m2(m3(h)))

type Opt

type Opt func(w *workerOpts)

Opt - options used to configure Workers.

func WithBufferSize

func WithBufferSize(bufferSize int) Opt

WithBufferSize - set buffer size for the internal and output channels.

Uses unbuffered channels by default.

func WithOrderPreserved

func WithOrderPreserved() Opt

WithOrderPreserved - preserves input order in the output. The workers keep running, but results are blocked from being sent to out until the "next" result is ready to send.

func WithPanicOnNilChannel

func WithPanicOnNilChannel() Opt

WithPanicOnNilChannel - option to panic when a nil channel is sent to Workers. By default, Workers will immediately close the out channel and return.

func WithWorkerSize

func WithWorkerSize(workerSize int) Opt

WithWorkerSize - set the number of concurrent workers. Each worker consumes one job at a time.

Uses 1 by default.

type Scope

type Scope[IN any] struct {
	// contains filtered or unexported fields
}

Scope - a per-request utility passed to handlers and middleware.

func (*Scope[IN]) Retry

func (s *Scope[IN]) Retry(ctx context.Context, in IN)

Retry - retries the request immediately.

func (*Scope[IN]) RetryAfter

func (s *Scope[IN]) RetryAfter(ctx context.Context, in IN, delay time.Duration)

RetryAfter - retry the request after "delay" time passes.

Only one retry can be done at a time for each job. Any extra retries will be ignored.

Retries must be executed directly in the handler (not from other goroutines).

Directories

Path Synopsis
examples
chain command
errchan command
This example is a pipeline where all 3 stages send errors to a separate error channel. This pattern might be useful to unblock workers while a separate goroutine (or worker pool) prepares a list of errors for an API response.
futureideas/seq command
The seq package would allow you to work with gather without directly using channels. Below is an example of using a slice as input and then iterating through iter.Seq. Note that you can break the loop at any time without having to manually cancel the context or close any channel.
futureideas/sharding command
This example shows how the experimental shard package can be used.
pipe command
pipeline command
retries command
This example shows how scope.RetryAfter can be utilized by a middleware to implement retry logic with max retries.
internal
op
Package op - used for math operations
seq
Package seq - used to wrap iter.Seq around gather; this can be used to allow abstraction of channels
shard
Package shard - used to multiply stages into shards; in most cases, just use gather.Workers
