Documentation
Overview
Package gather provides worker pools, pipelines, and middleware.
Functions
func Workers
func Workers[IN any, OUT any](ctx context.Context, in <-chan IN, handler HandlerFunc[IN, OUT], opts ...Opt) <-chan OUT
Workers starts multiple goroutines that read jobs from in, process them with handler, and forward results to out. It returns out.
Concurrency and resource use:
- Spawns N worker goroutines (configured with opts) plus a small, constant number of internal coordinators (O(1)). No goroutines are created per job.
- Backpressure is applied by the capacities of in/out (unbuffered channels block).
- The buffer size of the internal and out channels can be configured with opts (see WithBufferSize).
Lifecycle:
- When in is closed and all jobs are drained/processed, out is closed.
- If ctx is canceled, workers stop early and out is closed after in-flight jobs exit.
- The out channel MUST be drained to avoid a deadlock.
Ordering:
- By default, results are NOT guaranteed to preserve input order.
- Use WithOrderPreserved to guarantee that results are sent in input order.
Errors:
- If handler returns an error, the job is NOT sent to out.
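The concurrency, lifecycle, and error rules above can be sketched with the standard library alone. This is not gather's implementation: `handlerFunc`, `workers`, `squareEvens`, and `run` are hypothetical names, and the handler signature is a simplified assumption, since the real HandlerFunc may take other arguments such as a Scope.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sort"
	"sync"
)

// handlerFunc is a simplified, hypothetical stand-in for gather.HandlerFunc;
// the real type may take additional arguments (for example a Scope).
type handlerFunc[IN, OUT any] func(ctx context.Context, in IN) (OUT, error)

// workers is a stdlib-only sketch of the documented behavior, not gather's code.
func workers[IN, OUT any](ctx context.Context, in <-chan IN, h handlerFunc[IN, OUT], n int) <-chan OUT {
	out := make(chan OUT) // unbuffered: a slow consumer applies backpressure
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for {
				select {
				case <-ctx.Done():
					return // canceled: stop early
				case job, ok := <-in:
					if !ok {
						return // in is closed and drained
					}
					res, err := h(ctx, job)
					if err != nil {
						continue // failed jobs are not forwarded to out
					}
					select {
					case out <- res:
					case <-ctx.Done():
						return
					}
				}
			}
		}()
	}
	// A single coordinator closes out once every worker has exited.
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// squareEvens squares even numbers and rejects odd ones.
func squareEvens(_ context.Context, n int) (int, error) {
	if n%2 != 0 {
		return 0, errors.New("odd input")
	}
	return n * n, nil
}

// run feeds 1..6 through three workers and returns the sorted results.
func run() []int {
	in := make(chan int)
	go func() {
		defer close(in) // closing in triggers a graceful shutdown
		for i := 1; i <= 6; i++ {
			in <- i
		}
	}()
	var got []int
	for v := range workers[int, int](context.Background(), in, squareEvens, 3) {
		got = append(got, v) // out must be drained
	}
	sort.Ints(got) // result order is not guaranteed, so sort for display
	return got
}

func main() {
	fmt.Println(run()) // [4 16 36]
}
```

The odd inputs never reach out because their handler calls return an error, and the range loop ends only because out is closed after in has been closed and drained.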
Types
type HandlerFunc
HandlerFunc is the function used to handle a single request sent to a worker. Error handling is done inside the handler. The user can:
- cancel the context if an immediate shutdown is needed
- shut down gracefully: the user controls the generator, so closing the in channel lets all downstream stages finish
- send errors to their own error channel (which could be processed by another Workers stage)
- use scope for retries, the ReplyTo pattern, etc.
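As an illustration of the error-channel option in the list above, here is a minimal sketch that uses only the standard library. The handler signature is a simplified assumption (the real HandlerFunc signature is not shown here), and `parseWithErrChan` and `collect` are hypothetical names.

```go
package main

import (
	"context"
	"fmt"
	"strconv"
)

// parseWithErrChan returns a handler that converts strings to ints and
// reports failures on errCh instead of stopping the pipeline.
// The signature is a hypothetical simplification of HandlerFunc.
func parseWithErrChan(errCh chan<- error) func(ctx context.Context, s string) (int, error) {
	return func(ctx context.Context, s string) (int, error) {
		n, err := strconv.Atoi(s)
		if err != nil {
			select {
			case errCh <- fmt.Errorf("parse %q: %w", s, err):
			case <-ctx.Done(): // do not block forever during shutdown
			}
			return 0, err
		}
		return n, nil
	}
}

// collect calls the handler for each input and returns the parsed values
// together with the number of errors seen on the error channel.
func collect(inputs []string) (vals []int, errCount int) {
	errCh := make(chan error)
	done := make(chan struct{})
	go func() { // separate consumer, so the handler is not stuck on errors
		defer close(done)
		for range errCh {
			errCount++
		}
	}()
	h := parseWithErrChan(errCh)
	for _, s := range inputs {
		if n, err := h(context.Background(), s); err == nil {
			vals = append(vals, n)
		}
	}
	close(errCh)
	<-done // wait until the consumer has counted every error
	return vals, errCount
}

func main() {
	vals, errCount := collect([]string{"1", "x", "3"})
	fmt.Println(vals, errCount) // [1 3] 1
}
```

In a real pipeline the error consumer could itself be another worker pool, as the list suggests.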
type Middleware
type Middleware[IN any, OUT any] func(next HandlerFunc[IN, OUT]) HandlerFunc[IN, OUT]
Middleware wraps a HandlerFunc and returns a new HandlerFunc. The returned handler may run logic before and/or after invoking next, or choose not to call next (short-circuit).
Middleware is composable: multiple middlewares can be applied in order.
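A short-circuiting middleware might look like the sketch below. The types are redeclared locally so that the example is self-contained, and the `HandlerFunc` signature used here is an assumption rather than the package's actual definition; `rejectEmpty`, `length`, and `apply` are hypothetical names.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Simplified local stand-ins; the real HandlerFunc signature may differ.
type HandlerFunc[IN, OUT any] func(ctx context.Context, in IN) (OUT, error)
type Middleware[IN, OUT any] func(next HandlerFunc[IN, OUT]) HandlerFunc[IN, OUT]

var errEmpty = errors.New("empty input")

// rejectEmpty short-circuits: it never calls next for empty strings.
func rejectEmpty(next HandlerFunc[string, int]) HandlerFunc[string, int] {
	return func(ctx context.Context, s string) (int, error) {
		if s == "" {
			return 0, errEmpty
		}
		return next(ctx, s)
	}
}

// length is the handler being wrapped.
func length(_ context.Context, s string) (int, error) { return len(s), nil }

// apply wraps length with rejectEmpty and handles a single input.
func apply(s string) (int, error) {
	var mw Middleware[string, int] = rejectEmpty
	return mw(length)(context.Background(), s)
}

func main() {
	fmt.Println(apply("gather")) // 6 <nil>
	_, err := apply("")
	fmt.Println(err) // empty input
}
```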
func Chain
func Chain[IN any, OUT any](mws ...Middleware[IN, OUT]) Middleware[IN, OUT]
Chain composes middlewares so that they execute in FIFO order: the first middleware passed is the outermost one and runs first.
Chain(m1,m2,m3)(h) == m1(m2(m3(h)))
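One way to satisfy this identity is to wrap the handler from the last middleware to the first. The sketch below is a possible implementation under locally declared types, not necessarily how gather implements Chain; `chain`, `tag`, and `trace` are hypothetical names and the `HandlerFunc` signature is assumed.

```go
package main

import (
	"context"
	"fmt"
	"strings"
)

// Simplified local stand-ins; the real HandlerFunc signature may differ.
type HandlerFunc[IN, OUT any] func(ctx context.Context, in IN) (OUT, error)
type Middleware[IN, OUT any] func(next HandlerFunc[IN, OUT]) HandlerFunc[IN, OUT]

// chain satisfies chain(m1, m2, m3)(h) == m1(m2(m3(h))).
func chain[IN, OUT any](mws ...Middleware[IN, OUT]) Middleware[IN, OUT] {
	return func(h HandlerFunc[IN, OUT]) HandlerFunc[IN, OUT] {
		// Wrap from last to first so that mws[0] ends up outermost.
		for i := len(mws) - 1; i >= 0; i-- {
			h = mws[i](h)
		}
		return h
	}
}

// tag returns a middleware that records its name before and after next runs.
func tag(name string, log *[]string) Middleware[string, string] {
	return func(next HandlerFunc[string, string]) HandlerFunc[string, string] {
		return func(ctx context.Context, s string) (string, error) {
			*log = append(*log, name+":before")
			out, err := next(ctx, s)
			*log = append(*log, name+":after")
			return out, err
		}
	}
}

// trace runs a three-middleware chain once and returns the call order.
func trace() string {
	var log []string
	h := chain(tag("m1", &log), tag("m2", &log), tag("m3", &log))(
		func(_ context.Context, s string) (string, error) {
			log = append(log, "handler")
			return strings.ToUpper(s), nil
		})
	_, _ = h(context.Background(), "job")
	return strings.Join(log, " ")
}

func main() {
	fmt.Println(trace())
	// m1:before m2:before m3:before handler m3:after m2:after m1:after
}
```

The "before" logic runs in the order the middlewares are listed, and the "after" logic unwinds in reverse.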
type Opt
type Opt func(w *workerOpts)
Opt is an option used to configure Workers.
func WithBufferSize
WithBufferSize sets the buffer size for the internal and output channels.
Channels are unbuffered by default.
func WithOrderPreserved
func WithOrderPreserved() Opt
WithOrderPreserved preserves the order of input to output. The workers keep running, but results are held back from out until the "next" result in input order is ready to send.
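The idea of holding results back until the "next" one is ready can be illustrated with a small reorder buffer. This is a sketch of the general technique using only the standard library; it is not taken from gather's source, and `indexed`, `reorder`, and `demo` are hypothetical names.

```go
package main

import "fmt"

// indexed pairs a result with the position of its job in the input.
type indexed[T any] struct {
	seq int
	val T
}

// reorder forwards values in ascending seq order, starting at 0.
// Results that arrive early are parked in a map until their turn comes.
func reorder[T any](in <-chan indexed[T]) <-chan T {
	out := make(chan T)
	go func() {
		defer close(out)
		pending := make(map[int]T)
		next := 0
		for r := range in {
			pending[r.seq] = r.val
			// Flush every consecutive result that is now ready.
			for {
				v, ok := pending[next]
				if !ok {
					break
				}
				delete(pending, next)
				out <- v
				next++
			}
		}
	}()
	return out
}

// demo feeds results in a scrambled completion order and collects the output.
func demo() []string {
	in := make(chan indexed[string])
	go func() {
		defer close(in)
		for _, r := range []indexed[string]{{2, "c"}, {0, "a"}, {1, "b"}, {3, "d"}} {
			in <- r
		}
	}()
	var got []string
	for v := range reorder[string](in) {
		got = append(got, v)
	}
	return got
}

func main() {
	fmt.Println(demo()) // [a b c d]
}
```

Result "c" finishes first but is parked until "a" and "b" have been sent, which is the trade-off of preserving order: one slow job delays every result behind it.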
func WithPanicOnNilChannel
func WithPanicOnNilChannel() Opt
WithPanicOnNilChannel makes Workers panic when a nil channel is passed to it. By default, Workers immediately closes the out channel and returns.
func WithWorkerSize
WithWorkerSize sets the number of concurrent workers. Each worker consumes one job at a time.
Defaults to 1.
type Scope
type Scope[IN any] struct {
	// contains filtered or unexported fields
}
Scope is a per-request utility passed to handlers and middleware.
func (*Scope[IN]) RetryAfter
RetryAfter retries the request after the "delay" time has passed.
Only one retry can be done at a time for each job. Any extra retries are ignored.
Retries must be executed directly in the handler (not from other goroutines).
Directories
| Path | Synopsis |
|---|---|
| examples | |
| chain (command) | |
| errchan (command) | This example is a pipeline in which all 3 stages send errors to a separate error channel. This pattern might be useful to unblock workers while a separate goroutine (or worker pool) prepares a list of errors for an API response. |
| futureideas/seq (command) | The seq package would allow you to work with gather without using channels directly. The example uses a slice as input and then iterates through an iter.Seq. Note that you can break out of the loop at any time without having to manually cancel the context or close any channel. |
| futureideas/sharding (command) | This example shows how the experimental shard package can be used. |
| pipe (command) | |
| pipeline (command) | |
| retries (command) | This example shows how scope.RetryAfter can be used by a middleware to implement retry logic with a maximum number of retries. |
| internal | |
| op | Package op provides math operations. |
| seq | Package seq wraps gather in iter.Seq, which can be used to abstract away channels. |
| shard | Package shard multiplies stages into shards. In most cases, just use gather.Workers. |
