gosteady

package module
v0.1.0 Latest
Published: Jan 14, 2026 License: MIT Imports: 8 Imported by: 0

README

Gosteady

Deterministic, testable token-bucket rate limiting for Go, with optional upstream-aware dynamic tuning.

This package wraps golang.org/x/time/rate but avoids real time in tests by using an injected clock (utils.Clock).

Features

  • Token-bucket limiting (requests per minute + burst)
  • context.Context cancellation support
  • Thread-safe public API
  • Deterministic tests via injected clock + fake timers
  • Optional dynamic adjustment using upstream rate limit state
  • Minimal logger interface (adapter-friendly)

Install

go get codeberg.org/joydx/gosteady

Import:

import "codeberg.org/joydx/gosteady"

Quick start

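// DefaultLimiterConfig returns a value and WithRequestsLimitPerMinute has a
// pointer receiver, so the config must live in an addressable variable first.
cfg := gosteady.DefaultLimiterConfig()

// The setter returns *LimiterConfig, which is what NewLimiter expects.
l := gosteady.NewLimiter(cfg.WithRequestsLimitPerMinute(60)) // ~1 request/second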

for {
    if err := l.WaitForAllowance(ctx); err != nil {
        return err
    }

    // perform upstream request...
}

Semantics: does WaitForAllowance consume a token?

Yes. If WaitForAllowance returns nil, capacity has been consumed for that operation. If the context is canceled while waiting, the reservation is canceled and does not affect future scheduling.

Configuration

LimiterConfig:

  • RequestsLimitPerMinute: steady-state maximum under nominal conditions
  • MinRequestsPerMinute: floor when adjusting down (defaults to 1 if <= 0)
  • Burst: token bucket burst size (defaults to 1 if <= 0)
  • AdjustEveryMillis: minimum interval between adjustments (defaults to 10s if <= 0)

Dynamic adjustment with upstream state

If an upstream API provides:

  • total allowed in the current window
  • remaining
  • reset time

…you can feed it into AdjustRateByState:

state := gosteady.LimiterState{
    Status:         "nominal",
    TotalLimit:     100,
    RemainingLimit: 30,
    ResetTime:      &resetTime,
}

l.AdjustRateByState(state)

If upstream says you are limited:

l.AdjustRateByState(gosteady.LimiterState{Status: "limited"})

The limiter then clamps down to MinRequestsPerMinute as a conservative safety measure.

Hard wait until upstream reset

If you get a definitive reset timestamp (e.g., from a Retry-After or reset header), you can hard-wait:

if err := l.WaitUntilReset(ctx, &resetTime); err != nil {
    return err
}

Logging

The logger interface is intentionally small:

type Logger interface {
    Debug(msg string, kv ...any)
    Info(msg string, kv ...any)
    Warn(msg string, kv ...any)
    Error(msg string, kv ...any)
}

The limiter emits an info log when the RPM changes:

  • message: limiter rate updated
  • keys: prev_rpm, new_rpm, burst

Testing

Tests use utils.FakeClock to:

  • advance time deterministically
  • assert that operations truly block until timers fire
  • avoid flakiness from goroutine scheduling

Deterministic blocking assertions

When a call should block, tests can wait until the limiter has actually created a timer before advancing the fake clock:

done := make(chan error, 1)
go func() { done <- l.WaitForAllowance(ctx) }()

// Avoid “advance before timer exists” no-op.
fc.WaitForTimersCreated(1)
fc.Advance(1*time.Second + 1*time.Nanosecond)

if err := <-done; err != nil {
    t.Fatal(err)
}

Documentation

Types

type Limiter

type Limiter struct {
	// contains filtered or unexported fields
}

Limiter provides token-bucket rate limiting with optional dynamic adjustment based on upstream rate limit state.

Thread-safe: all public methods are safe for concurrent use.

func NewLimiter

func NewLimiter(cfg *LimiterConfig) *Limiter

NewLimiter constructs a Limiter and initializes the internal rate limiter. If cfg is nil, DefaultLimiterConfig() is used.

func (*Limiter) AdjustRateByState

func (l *Limiter) AdjustRateByState(state LimiterState)

AdjustRateByState adjusts the limiter rate based on upstream limit state.

The goal is to avoid exhausting the upstream window by computing a "safe" rate derived from RemainingLimit and time until ResetTime, then clamping to [MinRequestsPerMinute, RequestsLimitPerMinute].

It is throttled by cfg.AdjustEveryMillis (default 10s) to avoid thrashing.

func (*Limiter) CurrentRPM

func (l *Limiter) CurrentRPM() int

CurrentRPM returns the currently configured rate limit in requests per minute.

func (*Limiter) WaitForAllowance

func (l *Limiter) WaitForAllowance(ctx context.Context) error

WaitForAllowance blocks until a token is available or ctx is done.

This should be called before making an upstream request.

func (*Limiter) WaitUntilReset

func (l *Limiter) WaitUntilReset(ctx context.Context, reset *time.Time) error

WaitUntilReset performs a hard wait until the provided reset time. This is useful when upstream returns a definitive "rate limited until X".

If reset is nil or in the past, WaitUntilReset returns nil immediately. It respects ctx cancellation.

func (*Limiter) WithClock

func (l *Limiter) WithClock(clock utils.Clock) *Limiter

WithClock replaces the clock (useful for tests).

func (*Limiter) WithLogger

func (l *Limiter) WithLogger(logger Logger) *Limiter

WithLogger replaces the logger.

type LimiterConfig

type LimiterConfig struct {
	// RequestsLimitPerMinute is the maximum steady-state rate this limiter will
	// allow under nominal conditions.
	RequestsLimitPerMinute int `json:"requests_limit_per_minute" yaml:"requests_limit_per_minute" mapstructure:"requests_limit_per_minute"`

	// MinRequestsPerMinute is a safety floor when dynamically adjusting rate.
	// If <= 0, defaults to 1.
	MinRequestsPerMinute int `json:"min_requests_per_minute" yaml:"min_requests_per_minute" mapstructure:"min_requests_per_minute"`

	// Burst is the token bucket burst size. Lower bursts avoid spiky traffic.
	// If <= 0, a conservative default is used (1).
	Burst int `json:"burst" yaml:"burst" mapstructure:"burst"`

	// AdjustEveryMillis is the minimum interval, in milliseconds, between
	// adjustments, to avoid thrashing. If zero, defaults to 10 seconds.
	AdjustEveryMillis int `json:"adjust_every_millis" yaml:"adjust_every_millis" mapstructure:"adjust_every_millis"`
}

func DefaultLimiterConfig

func DefaultLimiterConfig() LimiterConfig

func (*LimiterConfig) Ref

func (c *LimiterConfig) Ref() string

func (*LimiterConfig) VarMerge

func (c *LimiterConfig) VarMerge(namespace string)

func (*LimiterConfig) WithRequestsLimitPerMinute

func (c *LimiterConfig) WithRequestsLimitPerMinute(limit int) *LimiterConfig

type LimiterState

type LimiterState struct {
	TotalLimit     int
	RemainingLimit int
	ResetTime      *time.Time
	Status         string
}

LimiterState describes the upstream rate limit situation.

TotalLimit and RemainingLimit are counts for the current rate-limit window. ResetTime is when the window resets (upstream-provided time). Status is a coarse state string used by callers/adapters:

  • "nominal": normal operation
  • "limited": upstream says we are limited (e.g., 429/403 rate limit)
  • "waiting": we should wait until reset time
  • "error": upstream error parsing/unknown
  • "passive": no upstream info available

type Logger

type Logger interface {
	Debug(msg string, kv ...any)
	Info(msg string, kv ...any)
	Warn(msg string, kv ...any)
	Error(msg string, kv ...any)
}

Logger is intentionally minimal and test-friendly. Plug in zap/slog/logrus adapters as needed.
