Documentation
¶
Overview ¶
Package queue provides a PostgreSQL-backed task queue with support for delayed tasks, retries with exponential backoff, and periodic tickers.
Manager ¶
Create a manager with namespace isolation:
mgr, err := queue.New(ctx, pool, queue.WithNamespace("myapp"))
if err != nil {
panic(err)
}
Queues ¶
Register queues that define retry behavior:
ttl := 24 * time.Hour
retries := uint64(3)
retryDelay := time.Minute
queue, err := mgr.RegisterQueue(ctx, schema.QueueMeta{
Queue: "emails",
TTL: &ttl,
Retries: &retries,
RetryDelay: &retryDelay,
})
Tasks ¶
Create and process tasks:
// Create a task
task, err := mgr.CreateTask(ctx, "emails", schema.TaskMeta{
Payload: map[string]any{"to": "[email protected]"},
})
// Retain next task from a specific queue
task, err := mgr.NextTask(ctx, "worker-1", "emails")
// Retain next task from any queue
task, err := mgr.NextTask(ctx, "worker-1")
// Retain next task from multiple queues
task, err := mgr.NextTask(ctx, "worker-1", "emails", "notifications")
// Release task (success)
mgr.ReleaseTask(ctx, task.Id, true, nil, &status)
// Release task (failure - will retry with backoff)
mgr.ReleaseTask(ctx, task.Id, false, errPayload, &status)
WorkerPool ¶
Use WorkerPool for concurrent task processing:
pool, err := queue.NewWorkerPool(mgr,
queue.WithWorkers(4),
queue.WithWorkerName("worker-1"),
)
// Register queue handlers
pool.RegisterQueue(ctx, schema.QueueMeta{Queue: "emails"}, func(ctx context.Context, task *schema.Task) error {
// Process task
return nil
})
// Run blocks until context is cancelled
err = pool.Run(ctx)
Tickers ¶
Register periodic tickers:
interval := time.Hour
ticker, err := mgr.RegisterTicker(ctx, schema.TickerMeta{
Ticker: "hourly-report",
Interval: &interval,
})
Subpackages ¶
- schema: Data types, request/response structures, and SQL generation
- httphandler: REST API handlers for all queue operations
- httpclient: Typed Go client for the REST API
Index ¶
- Variables
- type Manager
- func (manager *Manager) CleanQueue(ctx context.Context, name string) ([]schema.Task, error)
- func (manager *Manager) Conn() pg.PoolConn
- func (manager *Manager) CreateTask(ctx context.Context, queue string, meta schema.TaskMeta) (*schema.Task, error)
- func (manager *Manager) DeleteQueue(ctx context.Context, name string) (*schema.Queue, error)
- func (manager *Manager) DeleteTicker(ctx context.Context, name string) (*schema.Ticker, error)
- func (manager *Manager) GetQueue(ctx context.Context, name string) (*schema.Queue, error)
- func (manager *Manager) GetTicker(ctx context.Context, name string) (*schema.Ticker, error)
- func (manager *Manager) ListNamespaces(ctx context.Context, req schema.NamespaceListRequest) (*schema.NamespaceList, error)
- func (manager *Manager) ListQueueStatuses(ctx context.Context) ([]schema.QueueStatus, error)
- func (manager *Manager) ListQueues(ctx context.Context, req schema.QueueListRequest) (*schema.QueueList, error)
- func (manager *Manager) ListTasks(ctx context.Context, req schema.TaskListRequest) (*schema.TaskList, error)
- func (manager *Manager) ListTickers(ctx context.Context, req schema.TickerListRequest) (*schema.TickerList, error)
- func (manager *Manager) Namespace() string
- func (manager *Manager) NextTask(ctx context.Context, worker string, queues ...string) (*schema.Task, error)
- func (manager *Manager) NextTicker(ctx context.Context) (*schema.Ticker, error)
- func (manager *Manager) NextTickerNs(ctx context.Context, namespace string) (*schema.Ticker, error)
- func (manager *Manager) RegisterQueue(ctx context.Context, meta schema.QueueMeta) (*schema.Queue, error)
- func (manager *Manager) RegisterQueueWorker(ctx context.Context, meta schema.QueueMeta, worker Worker) (*schema.Queue, error)
- func (manager *Manager) RegisterTicker(ctx context.Context, meta schema.TickerMeta) (*schema.Ticker, error)
- func (manager *Manager) RegisterTickerNs(ctx context.Context, namespace string, meta schema.TickerMeta) (*schema.Ticker, error)
- func (manager *Manager) RegisterTickerWorker(ctx context.Context, meta schema.TickerMeta, worker Worker) (*schema.Ticker, error)
- func (manager *Manager) ReleaseTask(ctx context.Context, task uint64, success bool, result any, status *string) (*schema.Task, error)
- func (manager *Manager) Run(ctx context.Context) error
- func (manager *Manager) RunTaskLoop(ctx context.Context, handler TaskHandler, queues ...string) error
- func (manager *Manager) RunTickerLoop(ctx context.Context, period time.Duration, handler TickerHandler) error
- func (manager *Manager) RunTickerLoopChan(ctx context.Context, ch chan<- *schema.Ticker, period time.Duration) error
- func (manager *Manager) RunTickerLoopNs(ctx context.Context, namespace string, period time.Duration, ...) error
- func (manager *Manager) RunTickerLoopNsChan(ctx context.Context, namespace string, ch chan<- *schema.Ticker, ...) error
- func (manager *Manager) UpdateQueue(ctx context.Context, name string, meta schema.QueueMeta) (*schema.Queue, error)
- func (manager *Manager) UpdateTicker(ctx context.Context, name string, meta schema.TickerMeta) (*schema.Ticker, error)
- type Opt
- type TaskHandler
- type TickerHandler
- type Worker
Constants ¶
This section is empty.
Variables ¶
Functions ¶
This section is empty.
Types ¶
type Manager ¶
type Manager struct {
// contains filtered or unexported fields
}
func New ¶
New creates a new queue manager. Use WithNamespace to set the namespace for all queue operations.
func (*Manager) CleanQueue ¶
CleanQueue removes stale tasks from a queue, and returns the tasks removed
func (*Manager) CreateTask ¶
func (manager *Manager) CreateTask(ctx context.Context, queue string, meta schema.TaskMeta) (*schema.Task, error)
CreateTask creates a new task, and returns it.
func (*Manager) DeleteQueue ¶
DeleteQueue deletes an existing queue, and returns it
func (*Manager) DeleteTicker ¶
DeleteTicker deletes an existing ticker, and returns the deleted ticker.
func (*Manager) ListNamespaces ¶
func (manager *Manager) ListNamespaces(ctx context.Context, req schema.NamespaceListRequest) (*schema.NamespaceList, error)
ListNamespaces returns all distinct namespaces from the queue table
func (*Manager) ListQueueStatuses ¶
ListQueueStatuses returns the status breakdown for all queues in the namespace
func (*Manager) ListQueues ¶
func (manager *Manager) ListQueues(ctx context.Context, req schema.QueueListRequest) (*schema.QueueList, error)
ListQueues returns all queues in a namespace as a list
func (*Manager) ListTasks ¶
func (manager *Manager) ListTasks(ctx context.Context, req schema.TaskListRequest) (*schema.TaskList, error)
ListTasks returns all tasks in a namespace as a list, with optional filtering
func (*Manager) ListTickers ¶
func (manager *Manager) ListTickers(ctx context.Context, req schema.TickerListRequest) (*schema.TickerList, error)
ListTickers returns all tickers in a namespace as a list
func (*Manager) NextTask ¶
func (manager *Manager) NextTask(ctx context.Context, worker string, queues ...string) (*schema.Task, error)
NextTask retains a task from any of the specified queues, and returns it. If queues is empty, tasks from any queue are considered. Returns nil if there is no task to retain.
func (*Manager) NextTicker ¶
NextTicker returns the next matured ticker, or nil
func (*Manager) NextTickerNs ¶
NextTickerNs returns the next matured ticker in a namespace, or nil
func (*Manager) RegisterQueue ¶
func (manager *Manager) RegisterQueue(ctx context.Context, meta schema.QueueMeta) (*schema.Queue, error)
RegisterQueue creates a new queue, or updates an existing queue, and returns it.
func (*Manager) RegisterQueueWorker ¶ added in v1.1.1
func (manager *Manager) RegisterQueueWorker(ctx context.Context, meta schema.QueueMeta, worker Worker) (*schema.Queue, error)
RegisterQueueWorker registers a queue with its worker and creates/updates it in the database.
func (*Manager) RegisterTicker ¶
func (manager *Manager) RegisterTicker(ctx context.Context, meta schema.TickerMeta) (*schema.Ticker, error)
RegisterTicker creates a new ticker, or updates an existing ticker, and returns it.
func (*Manager) RegisterTickerNs ¶
func (manager *Manager) RegisterTickerNs(ctx context.Context, namespace string, meta schema.TickerMeta) (*schema.Ticker, error)
RegisterTicker creates a new ticker, or updates an existing ticker, and returns it.
func (*Manager) RegisterTickerWorker ¶ added in v1.1.1
func (manager *Manager) RegisterTickerWorker(ctx context.Context, meta schema.TickerMeta, worker Worker) (*schema.Ticker, error)
RegisterTickerWorker registers a ticker with its worker and creates/updates it in the database.
func (*Manager) ReleaseTask ¶
func (manager *Manager) ReleaseTask(ctx context.Context, task uint64, success bool, result any, status *string) (*schema.Task, error)
ReleaseTask releases a task from a queue, and returns it. Can optionally set the status
func (*Manager) Run ¶
Run starts the cleanup ticker and all registered task and ticker workers, blocking until context is cancelled or an error occurs.
func (*Manager) RunTaskLoop ¶
func (manager *Manager) RunTaskLoop(ctx context.Context, handler TaskHandler, queues ...string) error
RunTaskLoop runs a task loop with N concurrent workers. This is a low-level API - most users should use RegisterQueueWorker + Run() instead.
Each worker calls the handler function when a task is received. The handler is responsible for:
- Processing the task
- Calling ReleaseTask to mark the task as completed or failed
- Returning an error if the task processing failed
Note: The handler runs in a background goroutine, so any error returned is already tracked in OpenTelemetry spans but won't propagate to the caller.
Blocks until context is cancelled. If queues is empty, tasks from any queue are considered. The worker name (from WithWorkerName) will have @N appended for each concurrent worker (e.g., "hostname@1"). The number of workers is set via WithWorkers option.
func (*Manager) RunTickerLoop ¶
func (manager *Manager) RunTickerLoop(ctx context.Context, period time.Duration, handler TickerHandler) error
RunTickerLoop runs a loop to process matured tickers with a single worker. The handler is called for each matured ticker. Blocks until context is cancelled or an error occurs.
func (*Manager) RunTickerLoopChan ¶ added in v1.1.1
func (manager *Manager) RunTickerLoopChan(ctx context.Context, ch chan<- *schema.Ticker, period time.Duration) error
RunTickerLoopChan runs a loop to process matured tickers, sending them to a channel. Use this for streaming scenarios where you need direct channel access. The caller owns the channel and must close it when done. Blocks until context is cancelled or an error occurs.
func (*Manager) RunTickerLoopNs ¶
func (manager *Manager) RunTickerLoopNs(ctx context.Context, namespace string, period time.Duration, handler TickerHandler) error
RunTickerLoopNs runs a loop to process matured tickers in a specific namespace with a single worker. The handler is called for each matured ticker. Blocks until context is cancelled or an error occurs.
func (*Manager) RunTickerLoopNsChan ¶ added in v1.1.1
func (manager *Manager) RunTickerLoopNsChan(ctx context.Context, namespace string, ch chan<- *schema.Ticker, period time.Duration) error
RunTickerLoopNsChan runs a loop to process matured tickers in a namespace, sending them to a channel. Use this for streaming scenarios where you need direct channel access. The caller owns the channel and must close it when done. Blocks until context is cancelled or an error occurs.
type Opt ¶
type Opt func(*opts) error
Opt is a functional option for worker pool, queue, and ticker configuration.
func WithNamespace ¶ added in v1.1.1
WithNamespace sets the namespace used to scope all queue operations. The namespace cannot be empty or use the reserved system namespace.
func WithPeriod ¶
WithPeriod sets the polling period for a ticker. Returns ErrInvalidPeriod if d < 1ms.
func WithTracer ¶ added in v1.1.1
WithTracer sets the tracer used for tracing operations.
func WithWorkerName ¶
WithWorkerName sets the worker name used to identify this worker instance. Defaults to the hostname if not specified.
func WithWorkers ¶
WithWorkers sets the number of concurrent workers for the worker pool. The worker pool uses a shared pool of workers to process tasks from any registered queue. Returns ErrInvalidWorkers if n < 1.
type TaskHandler ¶
TaskHandler processes a task. Return nil on success, or an error to fail the task.
type TickerHandler ¶
TickerHandler processes a ticker. Return nil on success, or an error on failure.
Source Files
¶
Directories
¶
| Path | Synopsis |
|---|---|
|
Package httpclient provides a typed Go client for consuming the PostgreSQL queue management REST API.
|
Package httpclient provides a typed Go client for consuming the PostgreSQL queue management REST API. |
|
Package httphandler provides HTTP handlers for the queue package.
|
Package httphandler provides HTTP handlers for the queue package. |