pusher

Version: v1.0.2
Published: Nov 24, 2025 License: MIT Imports: 8 Imported by: 0

README

pusher

pusher is a Go library for simple, quick load testing. See the examples folder for more.

The library is built around the Worker concept: the entity that does the work. Key elements:

  • Worker: the main character
  • Target: the function we want to test
  • Gossiper: an interface for listening to task events
  • Gossip: a task lifecycle event
  • Offer: an option applied when hiring a Worker

Quick Start

package main

import (
	"context"
	"time"

	"github.com/therenotomorrow/pusher"
)

func main() {
	target := func(_ context.Context) (pusher.Result, error) {
		return time.Now(), nil // time.Time satisfies pusher.Result via its String method
	}

	// run the target at 50 RPS for 1 minute
	_ = pusher.Work(50, time.Minute, target)
}

Slow Start

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/therenotomorrow/pusher"
)

func main() {
	// Your function to test
	target := func(ctx context.Context) (pusher.Result, error) {
		select {
		case <-ctx.Done():
			return nil, ctx.Err()
		default:
		}

		now := time.Now().UTC()
		if now.Second()%2 == 0 {
			time.Sleep(100 * time.Millisecond)
		}

		return Result(now.String()), nil
	}

	// Your gossip listeners, one per lifecycle stage
	observers := make([]*Observer, 0)

	gossipers := make([]pusher.Gossiper, 0)
	for _, when := range []pusher.When{pusher.Canceled, pusher.BeforeTarget, pusher.AfterTarget} {
		observer := &Observer{done: make(chan struct{}), when: when, count: 0}

		observers = append(observers, observer)
		gossipers = append(gossipers, observer)
	}

	// run the target at 100 RPS for 1 minute with at most 10 concurrent requests
	// and attach 3 listeners to collect statistics
	err := pusher.Work(
		100,                              // rps
		time.Minute,                      // duration
		target,                           // target
		pusher.WithOvertime(10),          // concurrent limit
		pusher.WithGossips(gossipers...), // our gossipers
	)

	// report the returned error and the per-stage counts
	fmt.Println("We're done with error:", err)
	fmt.Println("Canceled:", observers[0].count)
	fmt.Println("Received:", observers[1].count)
	fmt.Println("Processed:", observers[2].count)

	// Output:
	// We're done with error: context deadline exceeded
	// Canceled: 256
	// Received: 5744
	// Processed: 5744
}

type Result string

func (r Result) String() string {
	return string(r)
}

type Observer struct {
	done  chan struct{}
	when  pusher.When
	count int
}

func (o *Observer) Listen(_ context.Context, _ *pusher.Worker, gossips <-chan *pusher.Gossip) {
	defer close(o.done)

	for gossip := range gossips {
		if gossip.When == o.when {
			o.count++
		}
	}
}

func (o *Observer) Stop() {
	<-o.done
}

Development

System Requirements

go version
# go version go1.25.2 or higher

just --version
# just 1.42.4 or higher (https://just.systems/)

Download sources

PROJECT_ROOT=pusher
git clone https://github.com/therenotomorrow/pusher.git "$PROJECT_ROOT"
cd "$PROJECT_ROOT"

Setup dependencies

# install dependencies
go mod tidy

# check code integrity
just code test # see other recipes by calling `just`

# setup safe development (optional)
git config --local core.hooksPath .githooks

Project Structure

pusher/
├── config.go   # Configuration and functional options
├── errors.go   # Error definitions
├── gossip.go   # Event system and telemetry
├── pusher.go   # Main API and high-level functions
└── worker.go   # Worker implementation and execution logic

Testing

# run quick checks
just test smoke # or just test

# run with coverage
just test cover

License

MIT License. See the LICENSE file for details.

Documentation

Overview

Package pusher provides tools for load testing by repeatedly calling a given target function.

Index

Constants

const (
	// ErrInvalidRPS is returned when the provided requests-per-second (RPS) value is not positive.
	ErrInvalidRPS = ex.Error("invalid rps")

	// ErrMissingTarget is returned when a Worker is hired without a Target function.
	ErrMissingTarget = ex.Error("target is missing")

	// ErrWorkerIsBusy is returned when Work is called on a Worker that is already running.
	ErrWorkerIsBusy = ex.Error("worker is busy")

	// ErrInvalidOvertime is returned when Work is called with a negative WithOvertime value.
	ErrInvalidOvertime = ex.Error("invalid overtime")
)
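
Because these errors are declared as constants, ex.Error must have a string underlying type. The sketch below shows how such sentinel values can be matched with errors.Is. The Error type, the validate helper, and the wrapping with %w are assumptions made for illustration; whether pusher wraps its errors before returning them is not stated here, but errors.Is matches both the bare and the wrapped form.

```go
package main

import (
	"errors"
	"fmt"
)

// Error is a hypothetical stand-in for ex.Error: a string type that
// implements the error interface, so its values can be constants.
type Error string

func (e Error) Error() string { return string(e) }

// ErrInvalidRPS reuses the documented name and message for illustration only.
const ErrInvalidRPS = Error("invalid rps")

// validate is a hypothetical check that wraps the sentinel with context.
func validate(rps int) error {
	if rps <= 0 {
		return fmt.Errorf("rps=%d: %w", rps, ErrInvalidRPS)
	}
	return nil
}

// isInvalidRPS reports whether err is, or wraps, ErrInvalidRPS.
func isInvalidRPS(err error) bool {
	return errors.Is(err, ErrInvalidRPS)
}

func main() {
	fmt.Println(isInvalidRPS(validate(0)))  // true
	fmt.Println(isInvalidRPS(validate(50))) // false
}
```

With the real package the same check would read errors.Is(err, pusher.ErrInvalidRPS).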

Variables

This section is empty.

Functions

func Farm

func Farm(rps int, duration time.Duration, workers []*Worker) error

Farm runs a set of pre-configured workers in parallel. It uses an errgroup to manage their lifecycle, ensuring that if one worker fails, the context is canceled for all.
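
The "one fails, all are canceled" behaviour described above can be approximated with the standard library alone. This is a minimal sketch, not Farm's implementation: the job type and the runAll and demo helpers are hypothetical names, and sync.WaitGroup plus sync.Once stand in for the errgroup that Farm is documented to use.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"
)

// job is a hypothetical stand-in for a worker's blocking run method.
type job func(ctx context.Context) error

// runAll starts every job in its own goroutine, cancels the shared context
// as soon as one job fails, and returns the first error it recorded.
func runAll(parent context.Context, jobs []job) error {
	ctx, cancel := context.WithCancel(parent)
	defer cancel()

	var (
		wg    sync.WaitGroup
		once  sync.Once
		first error
	)
	for _, j := range jobs {
		wg.Add(1)
		go func(j job) {
			defer wg.Done()
			if err := j(ctx); err != nil {
				once.Do(func() {
					first = err
					cancel() // stop the siblings
				})
			}
		}(j)
	}
	wg.Wait()
	return first
}

var errBoom = errors.New("boom")

// demo runs a failing job next to one that waits for cancellation. It returns
// the group's error message and whether the sibling observed the cancel.
func demo() (string, bool) {
	sawCancel := false
	failing := func(context.Context) error { return errBoom }
	waiting := func(ctx context.Context) error {
		select {
		case <-ctx.Done():
			sawCancel = true
			return ctx.Err()
		case <-time.After(5 * time.Second):
			return nil
		}
	}
	err := runAll(context.Background(), []job{failing, waiting})
	return err.Error(), sawCancel
}

func main() {
	msg, sawCancel := demo()
	fmt.Println(msg, sawCancel) // boom true
}
```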

func Force

func Force(rps int, duration time.Duration, target Target, offers ...Offer) func(amount int) error

Force is a high-level wrapper that creates a specified number of workers with the same configuration and runs them as a Farm. Be careful: overtime will be populated by all workers at once. Contact the author if you need different behaviour.

func Work

func Work(rps int, duration time.Duration, target Target, offers ...Offer) error

Work is a convenience wrapper that creates and runs a single Worker for a specified duration.

Types

type Config

type Config struct {
	Ident       string
	Listeners   []Gossiper
	Overtime    int
	WLBCapacity int
	Busy        bool
}

Config is a public copy of the Worker internals.

type Gossip

type Gossip struct {
	Result Result
	Error  error
	When   When
}

Gossip represents a telemetry event generated during a Worker's operation. It contains the result, an error, and the task lifecycle stage.

func (*Gossip) AfterTarget

func (g *Gossip) AfterTarget() bool

AfterTarget returns true if the Gossip event occurred after the target execution.

func (*Gossip) BeforeTarget

func (g *Gossip) BeforeTarget() bool

BeforeTarget returns true if the Gossip event occurred before the target execution.

func (*Gossip) Canceled

func (g *Gossip) Canceled() bool

Canceled returns true if the Gossip event represents a canceled task.

func (*Gossip) String

func (g *Gossip) String() string

type Gossiper

type Gossiper interface {
	// Listen runs in its own goroutine and processes events from the gossip channel.
	Listen(ctx context.Context, worker *Worker, gossips <-chan *Gossip)

	// Stop is called to gracefully shut down the listener and flush any buffered data.
	Stop()
}

Gossiper defines the interface for listeners that process Gossip events. This allows plugging in various metric collectors, loggers, or reporters.
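
As an illustration of a metric collector, here is a sketch of a listener that counts failed target calls. The When, Gossip, and Worker types are redeclared locally so the example compiles without the module; real code would use the pusher types instead. Two assumptions are baked in: that the AfterTarget gossip carries the error returned by the target, and that the gossip channel is closed when the run ends. FailureCounter, feed, and demo are hypothetical names.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Local stand-ins mirroring the documented shapes (not the real pusher types).
type When string

const (
	BeforeTarget When = "before-target"
	AfterTarget  When = "after-target"
)

type Gossip struct {
	Result fmt.Stringer
	Error  error
	When   When
}

type Worker struct{}

// FailureCounter tallies AfterTarget events and how many carried an error.
type FailureCounter struct {
	done     chan struct{}
	total    int
	failures int
}

func NewFailureCounter() *FailureCounter {
	return &FailureCounter{done: make(chan struct{})}
}

// Listen drains the channel until it is closed, then signals completion.
func (f *FailureCounter) Listen(_ context.Context, _ *Worker, gossips <-chan *Gossip) {
	defer close(f.done)

	for g := range gossips {
		if g.When != AfterTarget {
			continue
		}
		f.total++
		if g.Error != nil {
			f.failures++
		}
	}
}

// Stop blocks until Listen has processed every event.
func (f *FailureCounter) Stop() { <-f.done }

// feed drives the listener by hand, standing in for a Worker.
func feed(events []*Gossip) (failures, total int) {
	fc := NewFailureCounter()
	ch := make(chan *Gossip)
	go fc.Listen(context.Background(), nil, ch)
	for _, e := range events {
		ch <- e
	}
	close(ch)
	fc.Stop() // safe to read the counters after this returns
	return fc.failures, fc.total
}

func demo() (failures, total int) {
	return feed([]*Gossip{
		{When: BeforeTarget},
		{When: AfterTarget},
		{When: AfterTarget, Error: errors.New("timeout")},
	})
}

func main() {
	failures, total := demo()
	fmt.Printf("%d of %d calls failed\n", failures, total)
}
```

Reading the counters only after Stop returns avoids a data race, because Stop waits for the Listen goroutine to finish.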

type Offer

type Offer func(w *Worker)

Offer is a functional option for configuring a Worker. This pattern allows for flexible and extensible Worker initialization.
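
For readers new to the functional options pattern, the sketch below shows the general idea with local stand-in types. The worker struct, its fields, the default value of 1, and the lowercase helper names are all hypothetical; they are not pusher's internals.

```go
package main

import "fmt"

// worker is a hypothetical stand-in with two configurable fields.
type worker struct {
	ident    string
	overtime int
}

// option has the same shape as Offer: a function that mutates the worker.
type option func(w *worker)

// withOvertime returns an option that sets the concurrency limit.
func withOvertime(limit int) option {
	return func(w *worker) { w.overtime = limit }
}

// hire sets defaults first, then applies each option in order,
// so a later option overrides an earlier one.
func hire(ident string, opts ...option) *worker {
	w := &worker{ident: ident, overtime: 1} // the default of 1 is an assumption
	for _, opt := range opts {
		opt(w)
	}
	return w
}

func main() {
	fmt.Println(hire("a").overtime)                                    // 1
	fmt.Println(hire("b", withOvertime(10)).overtime)                  // 10
	fmt.Println(hire("c", withOvertime(10), withOvertime(3)).overtime) // 3
}
```

The appeal of the pattern is that new options can be added later without changing the signature of the constructor.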

func WithGossips

func WithGossips(listeners ...Gossiper) Offer

WithGossips configures a Worker with a set of event listeners.

func WithOvertime

func WithOvertime(limit int) Offer

WithOvertime sets the maximum number of concurrently executing tasks that a Worker can run. It acts as a concurrency limiter.

type Result

type Result interface{ fmt.Stringer }

Result represents the outcome of a single Target execution.
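
Since Result only requires a String method, any type with one qualifies. The sketch below defines a hypothetical response type carrying a status code and a latency; the type, its fields, and the describe helper are illustrative and not part of pusher.

```go
package main

import (
	"fmt"
	"time"
)

// response is a hypothetical result carrying a status code and a latency.
type response struct {
	Status  int
	Elapsed time.Duration
}

// String makes response satisfy fmt.Stringer, and therefore Result.
func (r response) String() string {
	return fmt.Sprintf("status=%d elapsed=%s", r.Status, r.Elapsed)
}

// Compile-time check that response implements fmt.Stringer.
var _ fmt.Stringer = response{}

// describe builds a response from plain integers and renders it.
func describe(status, millis int) string {
	r := response{Status: status, Elapsed: time.Duration(millis) * time.Millisecond}
	return r.String()
}

func main() {
	fmt.Println(describe(200, 1500)) // status=200 elapsed=1.5s
}
```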

type Target

type Target func(ctx context.Context) (Result, error)

Target is a function that performs the work to be tested. It receives a context for cancellation and must return a Result and an error.

type When

type When string

When defines the stage of a task's lifecycle at which a Gossip event is generated.

const (
	// BeforeTarget is the moment just before the Target function is called.
	BeforeTarget When = "before-target"

	// AfterTarget is the moment just after the Target function returns.
	AfterTarget When = "after-target"

	// Canceled indicates that a scheduled task was skipped because the concurrency
	// limit was reached.
	Canceled When = "canceled"
)

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

Worker is the core entity that generates a load by repeatedly calling the Target function at a specified rate (RPS) and concurrency limit.

func Hire

func Hire(ident string, target Target, offers ...Offer) *Worker

Hire creates and configures a new Worker instance using functional options.

func (*Worker) Config

func (w *Worker) Config() Config

Config returns the public copy of Worker internals.

func (*Worker) String

func (w *Worker) String() string

func (*Worker) Work

func (w *Worker) Work(ctx context.Context, rps int) error

Work starts the load generation loop. It's a blocking method that runs until the provided context is canceled. It generates requests at the specified RPS, respecting the concurrency limit.
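
One common way to build a loop like this is a ticker for pacing plus a buffered channel used as a semaphore. The sketch below is an illustration under that assumption and is not taken from pusher's source; pace and demo are hypothetical names. A tick that finds every slot busy is counted as skipped, which matches the documented meaning of the Canceled stage.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// pace calls target roughly rps times per second until ctx is done.
// At most limit calls run at once; a tick that finds every slot busy
// is counted as skipped instead of waiting for a free slot.
func pace(ctx context.Context, rps, limit int, target func()) (started, skipped int) {
	ticker := time.NewTicker(time.Second / time.Duration(rps))
	defer ticker.Stop()

	slots := make(chan struct{}, limit) // semaphore with one token per slot
	var wg sync.WaitGroup
	defer wg.Wait() // let in-flight calls finish before returning

	for {
		select {
		case <-ctx.Done():
			return started, skipped
		case <-ticker.C:
			select {
			case slots <- struct{}{}: // a slot is free: run the call
				started++
				wg.Add(1)
				go func() {
					defer wg.Done()
					defer func() { <-slots }() // release the slot
					target()
				}()
			default: // every slot is busy: skip this tick
				skipped++
			}
		}
	}
}

// demo paces a slow target with a single slot, so only the first tick
// can start a call and the remaining ticks are skipped.
func demo() (started, skipped int) {
	ctx, cancel := context.WithTimeout(context.Background(), 300*time.Millisecond)
	defer cancel()

	slow := func() { time.Sleep(500 * time.Millisecond) }
	return pace(ctx, 100, 1, slow)
}

func main() {
	started, skipped := demo()
	fmt.Println("started:", started, "skipped:", skipped)
}
```

Skipping rather than queueing keeps the request rate honest: a slow target shows up as skipped ticks instead of a growing backlog.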

Directories

Path Synopsis
farm command
force command
hire command
metrics command
work command
