bbq

Version: v0.0.0-...-4239029
Published: Nov 30, 2024 | License: MIT | Imports: 4 | Imported by: 1

README

BBQ 🥩

BBQ is a thread-safe bounded queue with batch reads/writes, timeouts and iterators for streaming data processing.

Installation

To install BBQ, run:

go get github.com/tetsuo/bbq

Example

package main

import (
	"fmt"

	"github.com/tetsuo/bbq"
)

func main() {
	// Create a queue with size 16
	queue := bbq.New[int](16)

	// Producer:
	go func() {
		for i := range 100 {
			_, err := queue.Write(i)
			if err != nil {
				fmt.Println("Write error:", err)
				return
			}
		}
		queue.Close() // Close the queue after writing
	}()

	// Consumer:
	buffer := make([]int, 4) // Batch size of 4
	for {
		n, err := queue.Read(buffer)
		if err != nil {
			if err == bbq.ErrQueueClosed {
				fmt.Println("Queue closed")
				break
			}
			fmt.Println("Read error:", err)
			continue
		}
		fmt.Println("Read:", buffer[:n])
	}
}

API

Creating a Queue
q := bbq.New[int](size)

Creates a new BBQ instance with the specified size. A size that is not already a power of two is rounded up to the next one for optimal performance.
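The rounding rule can be illustrated with a short sketch. This is one common way to compute the next power of two using the standard math/bits package; nextPow2 is a hypothetical helper, not part of bbq, and the library may compute its size differently. A frequent motivation for power-of-two sizes in ring buffers is that a position can be mapped to a slot with a bit mask instead of a modulo, which the loop below checks.

```go
package main

import (
	"fmt"
	"math/bits"
)

// nextPow2 returns the smallest power of two that is >= n.
// Hypothetical helper for illustration; values below 1 are treated as 1.
func nextPow2(n int) int {
	if n <= 1 {
		return 1
	}
	return 1 << bits.Len(uint(n-1))
}

func main() {
	for _, n := range []int{1, 10, 16, 17} {
		size := nextPow2(n)
		// With a power-of-two size, pos & (size-1) equals pos % size.
		fmt.Println(n, size, 37&(size-1) == 37%size)
	}
}
```

Under this sketch, a requested size of 10 yields a capacity of 16, so Size() may report more than the value passed to New.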

Writing to the Queue
n, err := q.Write(items...)

Adds one or more items to the queue, blocking if the queue is full until space becomes available or the queue is closed. Returns the number of items written or an ErrQueueClosed error.

Reading from the Queue
Read
n, err := q.Read(buffer)

Reads up to len(buffer) items from the queue, blocking if the queue is empty until data becomes available or the queue is closed. Returns the number of items read, or ErrQueueClosed once the queue has been closed and drained.

ReadUntil
n, err := q.ReadUntil(buffer, timeout)

Reads exactly len(buffer) items or until the specified timeout elapses, returning early if data becomes available. Returns ErrQueueClosed if the queue is closed and fully drained.

Iterators
Stream Items
for item := range q.Items() {
	fmt.Println(item)
}

Provides an iterator to stream individual items from the queue.

Stream Batches
for batch := range q.Slices(4) {
	fmt.Println(batch)
}

Streams batches of up to maxItems items from the queue (4 in the snippet above).

Stream Batches with Timeout
for batch := range q.SlicesWhen(4, time.Second) {
	fmt.Println(batch)
}

Streams batches of exactly the requested size, or smaller batches when the timeout expires first.

Managing the Queue
Close the Queue
q.Close()

Prevents further writes while allowing the consumer to drain remaining data. Once the buffer is fully drained, operations will return ErrQueueClosed.

Inspecting the Queue
q.Size()       // Total size of the queue
q.Available()  // Remaining space for writes
q.Used()       // Items currently in the queue
q.IsFull()     // Checks if the queue is full
q.IsEmpty()    // Checks if the queue is empty

Piping Between Queues
n, err := src.Pipe(dst)

Transfers items from src to dst. If dst is closed, src is closed as well; if src is closed, dst is left open. Returns the number of items written to dst in the final operation, or an error if one of the queues is closed.

License

MIT License. See LICENSE for details.

Documentation

Index

Constants

This section is empty.

Variables

var (
	// ErrQueueClosed is returned when operations are attempted on a closed queue.
	ErrQueueClosed = errors.New("bbq: operation on closed queue")
	// ErrInvalidSize is returned when a new size is invalid (e.g., smaller than the current size).
	ErrInvalidSize = errors.New("bbq: new size must be greater than the current size")
)

Functions

This section is empty.

Types

type BBQ

type BBQ[T any] struct {
	// contains filtered or unexported fields
}

BBQ is a thread-safe bounded queue that supports batch reads/writes and timeouts.

func New

func New[T any](size int) *BBQ[T]

New creates a new BBQ instance with the specified size, rounding the size up to the next power of two if it is not already one.

func (*BBQ[T]) Available

func (e *BBQ[T]) Available() int

Available returns the remaining space for new items, indicating how many more can be buffered without blocking.
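How Available relates to Size and Used can be sketched with two monotonically increasing counters, a bookkeeping scheme often used in ring buffers. The counters type and its fields are hypothetical; the relation Available = Size - Used is an assumption inferred from the method descriptions, and bbq's internals are not shown here.

```go
package main

import "fmt"

// counters is a hypothetical bookkeeping struct for a bounded queue.
type counters struct {
	size  uint64 // capacity of the queue
	write uint64 // total number of items ever written
	read  uint64 // total number of items ever read
}

// Used is the number of items written but not yet read.
func (c *counters) Used() int { return int(c.write - c.read) }

// Available is the space left before a write would have to wait.
func (c *counters) Available() int { return int(c.size) - c.Used() }

// IsFull reports whether no space is left.
func (c *counters) IsFull() bool { return c.Used() == int(c.size) }

// IsEmpty reports whether every written item has been read.
func (c *counters) IsEmpty() bool { return c.write == c.read }

func main() {
	c := &counters{size: 16}
	c.write += 10 // pretend ten items were written
	c.read += 4   // and four of them were read back
	fmt.Println(c.Used(), c.Available(), c.IsFull(), c.IsEmpty())
}
```

In a concurrent queue these values are snapshots: another goroutine may read or write between the call and the moment the result is used.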

func (*BBQ[T]) Close

func (e *BBQ[T]) Close()

Close closes the queue, preventing further writes while allowing the consumer to drain remaining data.

func (*BBQ[T]) IsClosed

func (e *BBQ[T]) IsClosed() bool

IsClosed returns true if the queue is closed.

func (*BBQ[T]) IsEmpty

func (e *BBQ[T]) IsEmpty() bool

IsEmpty returns true if the queue is empty.

func (*BBQ[T]) IsFull

func (e *BBQ[T]) IsFull() bool

IsFull returns true if the queue is full.

func (*BBQ[T]) Items

func (e *BBQ[T]) Items() iter.Seq[T]

Items returns an iterator to stream individual items from the queue.

func (*BBQ[T]) Pipe

func (e *BBQ[T]) Pipe(dest *BBQ[T]) (int, error)

Pipe transfers items from source to dest, closing source if dest is closed while keeping dest open. Returns the number of items written to the destination in the final operation, or an error if one of the queues is closed.

func (*BBQ[T]) Read

func (e *BBQ[T]) Read(b []T) (int, error)

Read reads up to len(b) items from the queue, blocking if the queue is empty until data becomes available or the queue is closed. Returns the number of items read or ErrQueueClosed if the queue has been closed.

Example:

buffer := make([]T, 10)
n, err := queue.Read(buffer)
if err != nil {
    // Handle error (e.g., queue is closed).
}
fmt.Println("Got items:", buffer[:n])

func (*BBQ[T]) ReadUntil

func (e *BBQ[T]) ReadUntil(b []T, timeout time.Duration) (n int, err error)

ReadUntil reads exactly len(b) items or until the specified timeout elapses, returning early if data becomes available. Returns ErrQueueClosed if the queue is closed and fully drained.

func (*BBQ[T]) Size

func (e *BBQ[T]) Size() int

Size returns the total size of the queue.

func (*BBQ[T]) Slices

func (e *BBQ[T]) Slices(maxItems int) iter.Seq[[]T]

Slices returns an iterator to stream batches of items (up to maxItems) from the queue.

  • If maxItems is less than or equal to 0, or exceeds the buffer size, it defaults to the buffer size.

func (*BBQ[T]) SlicesWhen

func (e *BBQ[T]) SlicesWhen(requiredItems int, timeout time.Duration) iter.Seq[[]T]

SlicesWhen returns an iterator to stream batches of exactly requiredItems items, or smaller batches when the timeout expires first.

  • If requiredItems is less than or equal to 0, or exceeds the buffer size, it defaults to the buffer size.
  • A timeout of 0 disables the timeout.

func (*BBQ[T]) Used

func (e *BBQ[T]) Used() int

Used returns the number of items currently in the queue.

func (*BBQ[T]) Write

func (e *BBQ[T]) Write(items ...T) (int, error)

Write adds one or more items to the queue, blocking if the queue is full until space becomes available or the queue is closed. Returns the number of items written or an ErrQueueClosed error.

Example:

n, err := q.Write(item1, item2, item3)
if err != nil {
    // Handle error (e.g., queue is closed).
}
