queue

package
v1.1.2
Published: Jan 13, 2026 License: Apache-2.0 Imports: 20 Imported by: 0

README

PostgreSQL Task Queue

The queue package provides a PostgreSQL-backed task queue with support for delayed tasks, retries with exponential backoff, and periodic tickers. It enables reliable background job processing through both a direct Go API and HTTP interfaces.

To test the package:

git clone https://github.com/mutablelogic/go-pg
cd go-pg
make tests

Docker must be installed to run the integration tests, which start a PostgreSQL server in a container. A command-line client is included for testing:

git clone https://github.com/mutablelogic/go-pg
cd go-pg
make cmd/pgqueue

This places a pgqueue binary in the build folder which you can use as a server or client. To run the server on localhost, port 8080:

build/pgqueue run postgres://localhost:5432/postgres

To use the client:

build/pgqueue queues
build/pgqueue tasks --status=new

Run build/pgqueue --help for more information and to understand the available commands and settings.

Architecture

The package is organized into four main components:

Manager (this package folder)

The core component that provides direct access to queue operations. It wraps a connection pool and exposes methods for managing queues, tasks, and tickers.

import "github.com/mutablelogic/go-pg/pkg/queue"

// Create a manager with namespace isolation
mgr, err := queue.New(ctx, pool, queue.WithNamespace("myapp"))

Schema (schema/)

Defines all data types, request/response structures, and SQL queries. Each resource type has its own file containing:

  • Structs - Go types representing queue objects (Queue, Task, Ticker)
  • Meta types - Parameters for creating/updating resources
  • SQL generation - Methods that produce parameterized SQL queries

HTTP Handler (httphandler/)

Provides REST API endpoints for all queue operations. Register handlers with an http.ServeMux:

import "github.com/mutablelogic/go-pg/pkg/queue/httphandler"

httphandler.RegisterBackendHandlers(mux, "/api", mgr)

HTTP Client (httpclient/)

A typed client for consuming the REST API from Go applications:

import "github.com/mutablelogic/go-pg/pkg/queue/httpclient"

client, err := httpclient.New("http://localhost:8080/api")
queues, err := client.ListQueues(ctx)

Core Concepts

Queues

Queues hold tasks and define retry behavior:

ttl := 24 * time.Hour
retries := uint64(3)
retryDelay := time.Minute

queue, err := mgr.RegisterQueue(ctx, schema.QueueMeta{
    Queue:      "emails",
    TTL:        &ttl,        // Task expiration
    Retries:    &retries,    // Max retry attempts
    RetryDelay: &retryDelay, // Base delay (exponential backoff)
})
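
The RetryDelay comment mentions exponential backoff but does not give the formula. As a rough illustration only, the sketch below assumes the delay doubles after each failed attempt. The names retryDelay and baseDelay are hypothetical and not part of the package, and the real schedule may differ.

```go
package main

import (
	"fmt"
	"time"
)

// baseDelay mirrors the RetryDelay value used in the queue example.
const baseDelay = time.Minute

// retryDelay returns the wait before retry number attempt (0-based),
// assuming the delay doubles each time. The formula is an assumption.
func retryDelay(base time.Duration, attempt uint) time.Duration {
	return base << attempt
}

func main() {
	for attempt := uint(0); attempt < 3; attempt++ {
		fmt.Printf("retry %d after %s\n", attempt+1, retryDelay(baseDelay, attempt))
	}
}
```

Under this assumption, three retries with a one-minute base would wait one, two, and four minutes.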

Tasks

Tasks progress through a lifecycle:

Status Description
new Newly created, waiting to be processed
retry Failed but has retries remaining, waiting for backoff delay
retained Locked by a worker for processing
released Finished successfully
failed Exhausted all retries
expired TTL exceeded before completion

Tasks with status released, failed, or expired are automatically cleaned up when the queue's TTL expires.
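
One way to read the status table is as a small state machine. The sketch below encodes the transitions the descriptions seem to imply. The transition set and the canTransition helper are assumptions made for illustration; the package may permit other moves.

```go
package main

import "fmt"

// transitions lists the status changes implied by the status table.
// The exact set is an assumption; the package may allow others.
var transitions = map[string][]string{
	"new":      {"retained", "expired"},
	"retry":    {"retained", "expired"},
	"retained": {"released", "retry", "failed", "expired"},
}

// canTransition reports whether a task may move from one status to another.
func canTransition(from, to string) bool {
	for _, next := range transitions[from] {
		if next == to {
			return true
		}
	}
	return false
}

func main() {
	fmt.Println(canTransition("new", "retained"))      // true
	fmt.Println(canTransition("released", "retained")) // false
}
```

The statuses released, failed, and expired have no outgoing transitions here, which matches their being eligible for cleanup.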

Create and process tasks:

// Create a task
task, err := mgr.CreateTask(ctx, "emails", schema.TaskMeta{
    Payload: map[string]any{"to": "user@example.com"},
})

// Create a delayed task
delayedAt := time.Now().Add(time.Hour)
task, err := mgr.CreateTask(ctx, "emails", schema.TaskMeta{
    Payload:   map[string]any{"to": "user@example.com"},
    DelayedAt: &delayedAt,
})

// Worker: get next available task from specific queue
task, err := mgr.NextTask(ctx, "worker-1", "emails")

// Worker: get next available task from any queue
task, err := mgr.NextTask(ctx, "worker-1")

// Worker: get next available task from multiple queues
task, err := mgr.NextTask(ctx, "worker-1", "emails", "notifications")

// Release task (success)
mgr.ReleaseTask(ctx, task.Id, true, nil, &status)

// Release task (failure - will retry with backoff)
mgr.ReleaseTask(ctx, task.Id, false, errPayload, &status)

Tickers

Tickers generate tasks on a schedule:

period := time.Hour
ticker, err := mgr.RegisterTicker(ctx, schema.TickerMeta{
    Ticker:  "hourly-report",
    Queue:   "reports",
    Period:  &period,
    Payload: map[string]any{"type": "hourly"},
})

// Process matured tickers, polling once per second (example period)
mgr.RunTickerLoop(ctx, time.Second, func(ctx context.Context, ticker *schema.Ticker) error {
    _, err := mgr.CreateTask(ctx, ticker.Queue, schema.TaskMeta{
        Payload: ticker.Payload,
    })
    return err
})
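
The text does not define when a ticker counts as "matured". A plausible reading, assumed here, is that a ticker is due once a full period has elapsed since it last fired. The helper names matured and sampleDue are hypothetical.

```go
package main

import (
	"fmt"
	"time"
)

// matured reports whether a ticker with the given period is due at now,
// given when it last fired. A zero lastRun means it has never fired.
// This rule is an assumption, not the package's documented behavior.
func matured(lastRun time.Time, period time.Duration, now time.Time) bool {
	if lastRun.IsZero() {
		return true
	}
	return !now.Before(lastRun.Add(period))
}

// sampleDue evaluates an hourly ticker that last fired the given
// number of minutes before a fixed reference time.
func sampleDue(minutesAgo int) bool {
	now := time.Date(2026, 1, 13, 12, 0, 0, 0, time.UTC)
	last := now.Add(-time.Duration(minutesAgo) * time.Minute)
	return matured(last, time.Hour, now)
}

func main() {
	fmt.Println(sampleDue(30)) // false: only half the period has elapsed
	fmt.Println(sampleDue(60)) // true: a full period has elapsed
}
```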

Namespaces

Each manager operates within a namespace for multi-tenant isolation:

appMgr, _ := queue.New(ctx, pool, queue.WithNamespace("app"))
adminMgr, _ := queue.New(ctx, pool, queue.WithNamespace("admin"))
// Queues and tasks are completely independent

A special pgqueue system namespace is automatically created and used for internal operations like expired task cleanup. This namespace should not be used by applications.

REST API Endpoints

All endpoints are prefixed with the configured path (e.g., /api):

Namespace

Method Path Description
GET {prefix}/namespace List all namespaces

Queue

Method Path Description
GET {prefix}/queue List queues
POST {prefix}/queue Create queue
GET {prefix}/queue/{name} Get queue by name
PATCH {prefix}/queue/{name} Update queue
DELETE {prefix}/queue/{name} Delete queue

Task

Method Path Description
GET {prefix}/task List tasks (filter: ?queue=, ?status=, ?offset=, ?limit=)
POST {prefix}/task Create task
PUT {prefix}/task Retain next task from any queue (requires ?worker=)
PUT {prefix}/task/{queue} Retain next task from specific queue (requires ?worker=)
PATCH {prefix}/task/{id} Release task with result ({"fail": bool, "result": any})

Ticker

Method Path Description
GET {prefix}/ticker List tickers
POST {prefix}/ticker Create ticker
GET {prefix}/ticker/{name} Get ticker by name
PATCH {prefix}/ticker/{name} Update ticker
DELETE {prefix}/ticker/{name} Delete ticker
GET {prefix}/ticker/next SSE stream of matured tickers

Metrics

Method Path Description
GET {prefix}/metrics Prometheus metrics

The metrics endpoint exposes the queue_tasks gauge, labelled by namespace, queue, and status.
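
For callers that do not use the typed client, the Task endpoints listed above can be driven with plain HTTP. The sketch below only builds the request URL for retaining a task and the JSON body for releasing one. The helper names retainURL and releaseBody are hypothetical, and the body shape is taken from the {"fail": bool, "result": any} description.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/url"
)

// retainURL builds the URL for "PUT {prefix}/task/{queue}?worker=...".
func retainURL(prefix, queue, worker string) string {
	query := url.Values{"worker": {worker}}
	return prefix + "/task/" + url.PathEscape(queue) + "?" + query.Encode()
}

// releaseBody builds the JSON body for "PATCH {prefix}/task/{id}".
func releaseBody(fail bool, result any) (string, error) {
	data, err := json.Marshal(struct {
		Fail   bool `json:"fail"`
		Result any  `json:"result"`
	}{Fail: fail, Result: result})
	return string(data), err
}

func main() {
	fmt.Println(retainURL("http://localhost:8080/api", "emails", "worker-1"))
	body, err := releaseBody(true, map[string]string{"msg": "failed"})
	if err != nil {
		panic(err)
	}
	fmt.Println(body)
}
```

The resulting strings can be passed to http.NewRequest with the PUT and PATCH methods respectively.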

CLI Commands

# Namespace operations
pgqueue namespaces                 # List namespaces

# Queue operations
pgqueue queues                     # List queues
pgqueue queue myqueue              # Get queue details
pgqueue create-queue myqueue       # Create queue
pgqueue update-queue myqueue       # Update queue
pgqueue delete-queue myqueue       # Delete queue

# Task operations
pgqueue tasks                      # List all tasks
pgqueue tasks --queue=myqueue      # Filter by queue
pgqueue tasks --status=new         # Filter by status
pgqueue create-task myqueue        # Create task
pgqueue retain-task --worker=worker1 myqueue  # Retain next task from specific queue
pgqueue retain-task --worker=worker1          # Retain next task from any queue
pgqueue retain-task --worker=worker1 queue1 queue2  # Retain from multiple queues
pgqueue complete-task 123          # Release task (success)
pgqueue complete-task 123 --error '{"msg":"failed"}'  # Release task (failure)

# Ticker operations
pgqueue tickers                    # List tickers
pgqueue ticker myticker            # Get ticker details
pgqueue create-ticker myticker     # Create ticker
pgqueue update-ticker myticker     # Update ticker
pgqueue delete-ticker myticker     # Delete ticker
pgqueue next-ticker                # Stream matured tickers (SSE)

# Server
pgqueue run postgres://...         # Run HTTP server

Dependencies

  • github.com/mutablelogic/go-pg - PostgreSQL connection pool
  • github.com/mutablelogic/go-server - HTTP utilities
  • github.com/mutablelogic/go-client - HTTP client

Documentation

Overview

Package queue provides a PostgreSQL-backed task queue with support for delayed tasks, retries with exponential backoff, and periodic tickers.

Manager

Create a manager with namespace isolation:

mgr, err := queue.New(ctx, pool, queue.WithNamespace("myapp"))
if err != nil {
	panic(err)
}

Queues

Register queues that define retry behavior:

ttl := 24 * time.Hour
retries := uint64(3)
retryDelay := time.Minute

queue, err := mgr.RegisterQueue(ctx, schema.QueueMeta{
	Queue:      "emails",
	TTL:        &ttl,
	Retries:    &retries,
	RetryDelay: &retryDelay,
})

Tasks

Create and process tasks:

// Create a task
task, err := mgr.CreateTask(ctx, "emails", schema.TaskMeta{
	Payload: map[string]any{"to": "user@example.com"},
})

// Retain next task from a specific queue
task, err := mgr.NextTask(ctx, "worker-1", "emails")

// Retain next task from any queue
task, err := mgr.NextTask(ctx, "worker-1")

// Retain next task from multiple queues
task, err := mgr.NextTask(ctx, "worker-1", "emails", "notifications")

// Release task (success)
mgr.ReleaseTask(ctx, task.Id, true, nil, &status)

// Release task (failure - will retry with backoff)
mgr.ReleaseTask(ctx, task.Id, false, errPayload, &status)

WorkerPool

Use WorkerPool for concurrent task processing:

pool, err := queue.NewWorkerPool(mgr,
	queue.WithWorkers(4),
	queue.WithWorkerName("worker-1"),
)

// Register queue handlers
pool.RegisterQueue(ctx, schema.QueueMeta{Queue: "emails"}, func(ctx context.Context, task *schema.Task) error {
	// Process task
	return nil
})

// Run blocks until context is cancelled
err = pool.Run(ctx)

Tickers

Register periodic tickers:

interval := time.Hour
ticker, err := mgr.RegisterTicker(ctx, schema.TickerMeta{
	Ticker:   "hourly-report",
	Interval: &interval,
})

Subpackages

  • schema: Data types, request/response structures, and SQL generation
  • httphandler: REST API handlers for all queue operations
  • httpclient: Typed Go client for the REST API

Index

Constants

This section is empty.

Variables

var (
	ErrInvalidNamespace  = errors.New("namespace must not be empty")
	ErrReservedNamespace = errors.New("namespace is reserved for system use")
	ErrInvalidWorkers    = errors.New("workers must be >= 1")
	ErrInvalidPeriod     = errors.New("period must be >= 1ms")
)

Functions

This section is empty.

Types

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

func New

func New(ctx context.Context, conn pg.PoolConn, opts ...Opt) (*Manager, error)

New creates a new queue manager. Use WithNamespace to set the namespace for all queue operations.

func (*Manager) CleanQueue

func (manager *Manager) CleanQueue(ctx context.Context, name string) ([]schema.Task, error)

CleanQueue removes stale tasks from a queue and returns the removed tasks.

func (*Manager) Conn

func (manager *Manager) Conn() pg.PoolConn

func (*Manager) CreateTask

func (manager *Manager) CreateTask(ctx context.Context, queue string, meta schema.TaskMeta) (*schema.Task, error)

CreateTask creates a new task, and returns it.

func (*Manager) DeleteQueue

func (manager *Manager) DeleteQueue(ctx context.Context, name string) (*schema.Queue, error)

DeleteQueue deletes an existing queue and returns the deleted queue.

func (*Manager) DeleteTicker

func (manager *Manager) DeleteTicker(ctx context.Context, name string) (*schema.Ticker, error)

DeleteTicker deletes an existing ticker, and returns the deleted ticker.

func (*Manager) GetQueue

func (manager *Manager) GetQueue(ctx context.Context, name string) (*schema.Queue, error)

GetQueue returns a queue by name.

func (*Manager) GetTicker

func (manager *Manager) GetTicker(ctx context.Context, name string) (*schema.Ticker, error)

GetTicker returns a ticker by name.

func (*Manager) ListNamespaces

func (manager *Manager) ListNamespaces(ctx context.Context, req schema.NamespaceListRequest) (*schema.NamespaceList, error)

ListNamespaces returns all distinct namespaces from the queue table.

func (*Manager) ListQueueStatuses

func (manager *Manager) ListQueueStatuses(ctx context.Context) ([]schema.QueueStatus, error)

ListQueueStatuses returns the status breakdown for all queues in the namespace.

func (*Manager) ListQueues

func (manager *Manager) ListQueues(ctx context.Context, req schema.QueueListRequest) (*schema.QueueList, error)

ListQueues returns all queues in a namespace as a list.

func (*Manager) ListTasks

func (manager *Manager) ListTasks(ctx context.Context, req schema.TaskListRequest) (*schema.TaskList, error)

ListTasks returns all tasks in a namespace as a list, with optional filtering.

func (*Manager) ListTickers

func (manager *Manager) ListTickers(ctx context.Context, req schema.TickerListRequest) (*schema.TickerList, error)

ListTickers returns all tickers in a namespace as a list.

func (*Manager) Namespace

func (manager *Manager) Namespace() string

func (*Manager) NextTask

func (manager *Manager) NextTask(ctx context.Context, worker string, queues ...string) (*schema.Task, error)

NextTask retains a task from any of the specified queues, and returns it. If queues is empty, tasks from any queue are considered. Returns nil if there is no task to retain.
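
Because NextTask returns nil when nothing is available, a polling worker has to treat a nil task as "queue empty" rather than as an error. The sketch below shows that control flow against an in-memory fake. The task type, the taskSource interface, and the simplified ReleaseTask signature are local stand-ins and assumptions, not the package's types.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// task and taskSource are local stand-ins for schema.Task and the
// Manager methods; they are assumptions made to keep the sketch runnable.
type task struct {
	ID      uint64
	Payload string
}

type taskSource interface {
	NextTask(ctx context.Context, worker string, queues ...string) (*task, error)
	ReleaseTask(ctx context.Context, id uint64, success bool) error
}

// memQueue is an in-memory fake that hands out tasks in order.
type memQueue struct {
	pending  []*task
	released map[uint64]bool
}

func (q *memQueue) NextTask(ctx context.Context, worker string, queues ...string) (*task, error) {
	if len(q.pending) == 0 {
		return nil, nil // nothing to retain
	}
	t := q.pending[0]
	q.pending = q.pending[1:]
	return t, nil
}

func (q *memQueue) ReleaseTask(ctx context.Context, id uint64, success bool) error {
	q.released[id] = success
	return nil
}

// drain retains and releases tasks until NextTask returns nil.
func drain(ctx context.Context, src taskSource, handle func(*task) error) (int, error) {
	n := 0
	for {
		t, err := src.NextTask(ctx, "worker-1", "emails")
		if err != nil {
			return n, err
		}
		if t == nil {
			return n, nil // queue is empty; a real worker would wait and poll again
		}
		if err := src.ReleaseTask(ctx, t.ID, handle(t) == nil); err != nil {
			return n, err
		}
		n++
	}
}

// runSample drains two tasks, failing the one with an empty payload,
// and returns the processed count and the per-task success flags.
func runSample() (int, map[uint64]bool) {
	q := &memQueue{
		pending:  []*task{{ID: 1, Payload: "hello"}, {ID: 2}},
		released: map[uint64]bool{},
	}
	n, _ := drain(context.Background(), q, func(t *task) error {
		if t.Payload == "" {
			return errors.New("empty payload")
		}
		return nil
	})
	return n, q.released
}

func main() {
	n, released := runSample()
	fmt.Println(n, released[1], released[2])
}
```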

func (*Manager) NextTicker

func (manager *Manager) NextTicker(ctx context.Context) (*schema.Ticker, error)

NextTicker returns the next matured ticker, or nil if none has matured.

func (*Manager) NextTickerNs

func (manager *Manager) NextTickerNs(ctx context.Context, namespace string) (*schema.Ticker, error)

NextTickerNs returns the next matured ticker in a namespace, or nil if none has matured.

func (*Manager) RegisterQueue

func (manager *Manager) RegisterQueue(ctx context.Context, meta schema.QueueMeta) (*schema.Queue, error)

RegisterQueue creates a new queue, or updates an existing queue, and returns it.

func (*Manager) RegisterQueueWorker added in v1.1.1

func (manager *Manager) RegisterQueueWorker(ctx context.Context, meta schema.QueueMeta, worker Worker) (*schema.Queue, error)

RegisterQueueWorker registers a queue with its worker and creates/updates it in the database.

func (*Manager) RegisterTicker

func (manager *Manager) RegisterTicker(ctx context.Context, meta schema.TickerMeta) (*schema.Ticker, error)

RegisterTicker creates a new ticker, or updates an existing ticker, and returns it.

func (*Manager) RegisterTickerNs

func (manager *Manager) RegisterTickerNs(ctx context.Context, namespace string, meta schema.TickerMeta) (*schema.Ticker, error)

RegisterTickerNs creates a new ticker, or updates an existing ticker, in the given namespace, and returns it.

func (*Manager) RegisterTickerWorker added in v1.1.1

func (manager *Manager) RegisterTickerWorker(ctx context.Context, meta schema.TickerMeta, worker Worker) (*schema.Ticker, error)

RegisterTickerWorker registers a ticker with its worker and creates/updates it in the database.

func (*Manager) ReleaseTask

func (manager *Manager) ReleaseTask(ctx context.Context, task uint64, success bool, result any, status *string) (*schema.Task, error)

ReleaseTask releases a task from a queue and returns it. It can optionally set the status.

func (*Manager) Run

func (manager *Manager) Run(ctx context.Context) error

Run starts the cleanup ticker and all registered task and ticker workers, blocking until context is cancelled or an error occurs.

func (*Manager) RunTaskLoop

func (manager *Manager) RunTaskLoop(ctx context.Context, handler TaskHandler, queues ...string) error

RunTaskLoop runs a task loop with N concurrent workers. This is a low-level API - most users should use RegisterQueueWorker + Run() instead.

Each worker calls the handler function when a task is received. The handler is responsible for:

  • Processing the task
  • Calling ReleaseTask to mark the task as completed or failed
  • Returning an error if the task processing failed

Note: The handler runs in a background goroutine, so any error it returns is recorded in OpenTelemetry spans but is not propagated to the caller.

Blocks until context is cancelled. If queues is empty, tasks from any queue are considered. The worker name (from WithWorkerName) will have @N appended for each concurrent worker (e.g., "hostname@1"). The number of workers is set via WithWorkers option.
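
The worker naming rule can be shown in a few lines. The helper workerNames is hypothetical, and numbering from 1 is an assumption based on the "hostname@1" example.

```go
package main

import "fmt"

// workerNames returns one name per concurrent worker in the "name@N"
// form. Starting the count at 1 is an assumption.
func workerNames(base string, n int) []string {
	names := make([]string, 0, n)
	for i := 1; i <= n; i++ {
		names = append(names, fmt.Sprintf("%s@%d", base, i))
	}
	return names
}

func main() {
	fmt.Println(workerNames("hostname", 3)) // [hostname@1 hostname@2 hostname@3]
}
```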

func (*Manager) RunTickerLoop

func (manager *Manager) RunTickerLoop(ctx context.Context, period time.Duration, handler TickerHandler) error

RunTickerLoop runs a loop to process matured tickers with a single worker. The handler is called for each matured ticker. Blocks until context is cancelled or an error occurs.

func (*Manager) RunTickerLoopChan added in v1.1.1

func (manager *Manager) RunTickerLoopChan(ctx context.Context, ch chan<- *schema.Ticker, period time.Duration) error

RunTickerLoopChan runs a loop to process matured tickers, sending them to a channel. Use this for streaming scenarios where you need direct channel access. The caller owns the channel and must close it when done. Blocks until context is cancelled or an error occurs.
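
The ownership rule (the loop only sends, the caller closes) is easy to get wrong, so a generic sketch may help. Here produce stands in for the loop and ticker for schema.Ticker. Both are local assumptions, and the real loop's internals are not shown in this documentation.

```go
package main

import (
	"context"
	"fmt"
)

// ticker is a local stand-in for schema.Ticker.
type ticker struct{ Name string }

// produce plays the role of the loop: it only sends on ch and never closes it.
func produce(ctx context.Context, ch chan<- *ticker, names []string) error {
	for _, name := range names {
		select {
		case ch <- &ticker{Name: name}:
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	return nil
}

// collect runs the producer in a goroutine, closes the channel once the
// producer has returned (the caller owns it), and gathers what was sent.
func collect(names []string) []string {
	ch := make(chan *ticker)
	go func() {
		defer close(ch) // safe: the only sender has finished
		_ = produce(context.Background(), ch, names)
	}()
	var got []string
	for t := range ch {
		got = append(got, t.Name)
	}
	return got
}

func main() {
	fmt.Println(collect([]string{"hourly-report", "daily-digest"}))
}
```

Closing only after the sending function has returned avoids a send on a closed channel.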

func (*Manager) RunTickerLoopNs

func (manager *Manager) RunTickerLoopNs(ctx context.Context, namespace string, period time.Duration, handler TickerHandler) error

RunTickerLoopNs runs a loop to process matured tickers in a specific namespace with a single worker. The handler is called for each matured ticker. Blocks until context is cancelled or an error occurs.

func (*Manager) RunTickerLoopNsChan added in v1.1.1

func (manager *Manager) RunTickerLoopNsChan(ctx context.Context, namespace string, ch chan<- *schema.Ticker, period time.Duration) error

RunTickerLoopNsChan runs a loop to process matured tickers in a namespace, sending them to a channel. Use this for streaming scenarios where you need direct channel access. The caller owns the channel and must close it when done. Blocks until context is cancelled or an error occurs.

func (*Manager) UpdateQueue

func (manager *Manager) UpdateQueue(ctx context.Context, name string, meta schema.QueueMeta) (*schema.Queue, error)

UpdateQueue updates an existing queue, and returns it.

func (*Manager) UpdateTicker

func (manager *Manager) UpdateTicker(ctx context.Context, name string, meta schema.TickerMeta) (*schema.Ticker, error)

UpdateTicker updates an existing ticker, and returns it.

type Opt

type Opt func(*opts) error

Opt is a functional option for worker pool, queue, and ticker configuration.

func WithNamespace added in v1.1.1

func WithNamespace(name string) Opt

WithNamespace sets the namespace used to scope all queue operations. The namespace cannot be empty or use the reserved system namespace.

func WithPeriod

func WithPeriod(d time.Duration) Opt

WithPeriod sets the polling period for a ticker. Returns ErrInvalidPeriod if d < 1ms.

func WithTracer added in v1.1.1

func WithTracer(tracer trace.Tracer) Opt

WithTracer sets the tracer used for tracing operations.

func WithWorkerName

func WithWorkerName(name string) Opt

WithWorkerName sets the worker name used to identify this worker instance. Defaults to the hostname if not specified.

func WithWorkers

func WithWorkers(n int) Opt

WithWorkers sets the number of concurrent workers for the worker pool. The worker pool uses a shared pool of workers to process tasks from any registered queue. Returns ErrInvalidWorkers if n < 1.
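
The Opt type follows the functional options pattern, where each option validates its argument and may return an error. The sketch below reproduces that pattern in isolation. The opts fields and the lower-case names are assumptions (the real struct is unexported), while the error messages and the reserved pgqueue namespace come from this page.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Error values mirroring those listed under Variables.
var (
	errInvalidNamespace  = errors.New("namespace must not be empty")
	errReservedNamespace = errors.New("namespace is reserved for system use")
	errInvalidWorkers    = errors.New("workers must be >= 1")
	errInvalidPeriod     = errors.New("period must be >= 1ms")
)

// opts and its fields are assumptions; the real struct is unexported.
type opts struct {
	namespace string
	workers   int
	period    time.Duration
}

type opt func(*opts) error

func withNamespace(name string) opt {
	return func(o *opts) error {
		switch name {
		case "":
			return errInvalidNamespace
		case "pgqueue": // system namespace named in the README
			return errReservedNamespace
		}
		o.namespace = name
		return nil
	}
}

func withWorkers(n int) opt {
	return func(o *opts) error {
		if n < 1 {
			return errInvalidWorkers
		}
		o.workers = n
		return nil
	}
}

func withPeriod(d time.Duration) opt {
	return func(o *opts) error {
		if d < time.Millisecond {
			return errInvalidPeriod
		}
		o.period = d
		return nil
	}
}

// apply runs each option in order and stops at the first error.
func apply(options ...opt) (*opts, error) {
	o := &opts{}
	for _, fn := range options {
		if err := fn(o); err != nil {
			return nil, err
		}
	}
	return o, nil
}

func main() {
	o, err := apply(withNamespace("myapp"), withWorkers(4), withPeriod(time.Second))
	fmt.Println(o.namespace, o.workers, o.period, err)
	_, err = apply(withNamespace("pgqueue"))
	fmt.Println(err)
}
```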

type TaskHandler

type TaskHandler func(context.Context, *schema.Task) error

TaskHandler processes a task. Return nil on success, or an error to fail the task.

type TickerHandler

type TickerHandler func(context.Context, *schema.Ticker) error

TickerHandler processes a ticker. Return nil on success, or an error on failure.

type Worker added in v1.1.1

type Worker interface {
	Run(ctx context.Context, payload json.RawMessage) error
}

Worker processes a task or ticker payload. Return nil on success, or an error to indicate failure.
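
A Worker receives its payload as raw JSON, so a typical implementation decodes it into a struct first. The sketch below declares a local copy of the interface so that it runs on its own. The emailWorker type and its payload shape are hypothetical.

```go
package main

import (
	"context"
	"encoding/json"
	"fmt"
)

// worker mirrors the Worker interface shown above.
type worker interface {
	Run(ctx context.Context, payload json.RawMessage) error
}

// emailWorker is a hypothetical worker that records the recipients it sees.
type emailWorker struct {
	sent []string
}

func (w *emailWorker) Run(ctx context.Context, payload json.RawMessage) error {
	var msg struct {
		To string `json:"to"`
	}
	if err := json.Unmarshal(payload, &msg); err != nil {
		return fmt.Errorf("decode payload: %w", err)
	}
	if msg.To == "" {
		return fmt.Errorf("payload has no recipient")
	}
	w.sent = append(w.sent, msg.To)
	return nil
}

// runPayload feeds one raw JSON payload to a fresh emailWorker.
func runPayload(raw string) ([]string, error) {
	w := &emailWorker{}
	var iface worker = w
	err := iface.Run(context.Background(), json.RawMessage(raw))
	return w.sent, err
}

func main() {
	sent, err := runPayload(`{"to":"user@example.com"}`)
	fmt.Println(sent, err)
	_, err = runPayload(`{}`)
	fmt.Println(err)
}
```

Returning an error for a malformed or incomplete payload signals failure, as the interface documentation describes.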

Directories

Path Synopsis
httpclient Package httpclient provides a typed Go client for consuming the PostgreSQL queue management REST API.
httphandler Package httphandler provides HTTP handlers for the queue package.
