collections

Version: v0.2.8
Published: Dec 1, 2025 | License: Apache-2.0 | Imports: 6 | Imported by: 0

README

collections

Golang Data Collections

Ring Buffer

The Ring[T] type provides a fixed-size ring buffer, implemented as a slice of type T. This was created because of edge-case bugs found in a free Ring implementation.

It uses views into a slice rather than doing offset arithmetic, which simplifies the implementation and improves performance. An incremental fuzzer test is used to search for unexpected edge cases by comparing against an unoptimized naive implementation.

To create a new ring buffer, use NewRing:

r := NewRing[*MyData](numElements)
if !r.PushBack(&MyData{...}) {
    // the ring buffer is full!
}

v, ok := r.PopFront()
if !ok {
    // no elements left in ring buffer
}

Peek operations are also provided, which look at the next element, or even an arbitrary index, without modifying the ring. The Len and Cap functions indicate the size and capacity of the ring.
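
The calls described above can be tried without the package by using a stand-in. The following is a minimal sketch, assuming a plain head/count layout with modular indexing (the package itself describes using slice views instead). The lowercase `ring` type, `newRing`, and the field names are hypothetical and only mirror the documented method names.

```go
package main

import "fmt"

// ring is a hypothetical stand-in for the documented Ring[T]. It uses simple
// modular indexing, not the slice-view technique the package describes.
type ring[T any] struct {
	buf   []T
	head  int // index of the first element
	count int // number of stored elements
}

func newRing[T any](size int) *ring[T] {
	return &ring[T]{buf: make([]T, size)}
}

func (r *ring[T]) Len() int { return r.count }
func (r *ring[T]) Cap() int { return len(r.buf) }

// PushBack appends e and reports false when the ring is full.
func (r *ring[T]) PushBack(e T) bool {
	if r.count == len(r.buf) {
		return false
	}
	r.buf[(r.head+r.count)%len(r.buf)] = e
	r.count++
	return true
}

// PeekFront returns the first element without removing it.
func (r *ring[T]) PeekFront() (T, bool) {
	var zero T
	if r.count == 0 {
		return zero, false
	}
	return r.buf[r.head], true
}

// PopFront removes and returns the first element.
func (r *ring[T]) PopFront() (T, bool) {
	var zero T
	if r.count == 0 {
		return zero, false
	}
	v := r.buf[r.head]
	r.buf[r.head] = zero // drop the reference so it can be collected
	r.head = (r.head + 1) % len(r.buf)
	r.count--
	return v, true
}

func main() {
	r := newRing[string](2)
	fmt.Println(r.PushBack("a"), r.PushBack("b"), r.PushBack("c"))
	v, ok := r.PeekFront()
	fmt.Println(v, ok, r.Len(), r.Cap())
	v, ok = r.PopFront()
	fmt.Println(v, ok, r.Len())
}
```

The third PushBack is rejected because the capacity is two, and PeekFront leaves the length unchanged while PopFront reduces it.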

StatefulNotifier

StatefulNotifier[T] acts as an atomic variable which allows waiting on a state change. The Load method atomically reads the current value and returns an update notifier which is signalled when the state changes. This allows for efficient wait loops by avoiding polling for updates.

There are multiple types of wait mechanisms, including Wait which blocks until an arbitrary condition is met, and Watch which turns the updates into a range-friendly sequence. Note that this is not an update queue, meaning that individual updates are not stored (only the last value is available). A receiver is guaranteed to receive the latest value, but is not guaranteed to see all intermediate updates if it hasn't kept up with the state changes.
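
To make the load-then-wait pattern concrete, here is a minimal sketch of one way such a notifier could work, assuming the update signal is a channel that is closed and replaced on every store. The `notifier` type, `newNotifier`, and `waitForFive` are hypothetical names for illustration and are not the package's implementation.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// notifier is a hypothetical stand-in for the documented StatefulNotifier[T].
type notifier[T any] struct {
	mu      sync.Mutex
	value   T
	updated chan struct{} // closed and replaced on every Store
}

func newNotifier[T any](initial T) *notifier[T] {
	return &notifier[T]{value: initial, updated: make(chan struct{})}
}

// Load returns the current value and a channel that is closed on the next Store.
func (n *notifier[T]) Load() (T, <-chan struct{}) {
	n.mu.Lock()
	defer n.mu.Unlock()
	return n.value, n.updated
}

// Store replaces the value and wakes every waiter by closing the old channel.
func (n *notifier[T]) Store(v T) {
	n.mu.Lock()
	defer n.mu.Unlock()
	n.value = v
	close(n.updated)
	n.updated = make(chan struct{})
}

// Wait re-checks the condition after each update instead of polling.
func (n *notifier[T]) Wait(ctx context.Context, fn func(T) bool) (T, error) {
	for {
		v, ch := n.Load()
		if fn(v) {
			return v, nil
		}
		select {
		case <-ch:
		case <-ctx.Done():
			var zero T
			return zero, ctx.Err()
		}
	}
}

// waitForFive stores 1..5 in the background and waits for a value of at least 5.
func waitForFive() (int, error) {
	n := newNotifier(0)
	go func() {
		for i := 1; i <= 5; i++ {
			n.Store(i)
		}
	}()
	return n.Wait(context.Background(), func(v int) bool { return v >= 5 })
}

func main() {
	fmt.Println(waitForFive())
}
```

The waiter may observe only some of the values 1 through 4, or none of them, which is the "latest value only" behaviour described above; it always ends up seeing 5.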

PubSub

Channel[T] provides a publish/subscribe channel. This is similar to a native chan T, except that it has infinite capacity and allows for multiple subscribers.

To create one, simply define a var ch Channel[*myValueType]. The Publish method adds a new value to the channel. To subscribe, use Subscribe to create a subscription object, which will call the callback in the background. Alternatively, use Watch to apply updates in the foreground, or Receive to return an iter.Seq which can be iterated over using range.

Note that updates are not persisted. As soon as all active subscribers have read a message, then it is no longer accessible and will be garbage collected. This means that if a channel is created and values are published before any subscribers have been created, then those values will disappear immediately.
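
The persistence rule above follows naturally from one possible design, sketched here under the assumption that messages live in a singly linked list and each subscriber holds a pointer to its own position. The `pubsub`, `node`, and `cursor` types are hypothetical and are not the package's implementation; they only show why a message becomes unreachable once every subscriber has moved past it.

```go
package main

import (
	"fmt"
	"sync"
)

// node is one slot in a singly linked list of messages.
type node[T any] struct {
	ready chan struct{} // closed once value and next have been filled in
	value T
	next  *node[T]
}

// pubsub is a hypothetical stand-in for the documented Channel[T].
// Its zero value is ready to use.
type pubsub[T any] struct {
	mu   sync.Mutex
	tail *node[T] // the empty node that the next Publish will fill
}

// cursor is one subscriber's position in the list.
type cursor[T any] struct{ at *node[T] }

// tailLocked returns the current empty tail node, creating it on first use.
// The caller must hold p.mu.
func (p *pubsub[T]) tailLocked() *node[T] {
	if p.tail == nil {
		p.tail = &node[T]{ready: make(chan struct{})}
	}
	return p.tail
}

// Publish fills the tail node, links a fresh tail, and wakes all subscribers.
func (p *pubsub[T]) Publish(v T) {
	p.mu.Lock()
	defer p.mu.Unlock()
	n := p.tailLocked()
	n.value = v
	n.next = &node[T]{ready: make(chan struct{})}
	p.tail = n.next
	close(n.ready)
}

// Subscribe starts at the tail, so earlier messages are never visible.
func (p *pubsub[T]) Subscribe() *cursor[T] {
	p.mu.Lock()
	defer p.mu.Unlock()
	return &cursor[T]{at: p.tailLocked()}
}

// Next blocks until the next message is published, then advances past it.
func (c *cursor[T]) Next() T {
	<-c.at.ready
	v := c.at.value
	c.at = c.at.next
	return v
}

func main() {
	var ch pubsub[string]
	ch.Publish("lost") // no subscribers yet, so nothing references this node
	a, b := ch.Subscribe(), ch.Subscribe()
	ch.Publish("x")
	ch.Publish("y")
	fmt.Println(a.Next(), a.Next(), b.Next(), b.Next())
}
```

In this sketch nothing holds the head of the list: only cursors and the tail pointer keep nodes alive, so capacity is unbounded while consumed messages are released to the garbage collector.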

Documentation

Overview

Package collections provides a collection of data structures.

Constants

This section is empty.

Variables

This section is empty.

Functions

func WaitAny added in v0.2.0

func WaitAny[T any, N NotifierLoader[T]](ctx context.Context, fn func(T) bool,
	notifiers ...N) (T, int)

WaitAny blocks until one of the given states matches the condition function, or else the context is canceled. It returns the value that satisfied the condition, along with the index of the notifier that was matched.

Note that, like Wait, WaitAny may miss intermediate updates if multiple updates occur quickly.

If the context was canceled, the value will be the zero value and the index will be -1.
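
The behaviour described for WaitAny can be sketched as follows. This is an illustration with the documented signature, not the package's implementation: it assumes `reflect.Select` is an acceptable way to wait on a variable number of channels, and the `cell` type, `newCell`, `demo`, and `demoCanceled` are hypothetical helpers that exist only to drive the example.

```go
package main

import (
	"context"
	"fmt"
	"reflect"
	"sync"
)

// NotifierLoader matches the interface documented by the package.
type NotifierLoader[T any] interface {
	Load() (T, <-chan struct{})
}

// cell is a hypothetical minimal notifier used only to drive the example.
type cell[T any] struct {
	mu sync.Mutex
	v  T
	ch chan struct{}
}

func newCell[T any](v T) *cell[T] { return &cell[T]{v: v, ch: make(chan struct{})} }

func (c *cell[T]) Load() (T, <-chan struct{}) {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.v, c.ch
}

func (c *cell[T]) Store(v T) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.v = v
	close(c.ch)
	c.ch = make(chan struct{})
}

// waitAny is a sketch with the documented WaitAny signature.
func waitAny[T any, N NotifierLoader[T]](ctx context.Context, fn func(T) bool, notifiers ...N) (T, int) {
	var zero T
	for {
		// Case 0 is the context; case i+1 is notifier i.
		cases := []reflect.SelectCase{{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ctx.Done())}}
		for i, n := range notifiers {
			v, ch := n.Load()
			if fn(v) {
				return v, i
			}
			cases = append(cases, reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ch)})
		}
		if chosen, _, _ := reflect.Select(cases); chosen == 0 {
			return zero, -1 // context canceled
		}
		// Some notifier changed: reload everything and re-check the condition.
	}
}

// demo waits on two cells while the second one is updated in the background.
func demo() (int, int) {
	a, b := newCell(0), newCell(0)
	go b.Store(7)
	return waitAny(context.Background(), func(v int) bool { return v > 0 }, a, b)
}

// demoCanceled waits with a context that is already canceled.
func demoCanceled() (int, int) {
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	return waitAny(ctx, func(v int) bool { return v > 0 }, newCell(0))
}

func main() {
	fmt.Println(demo())
	fmt.Println(demoCanceled())
}
```

Because every pass reloads all notifiers before blocking, an update that lands between Load and the select is not lost: its channel is already closed, so the select returns immediately.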

func WaitAnyMethod added in v0.2.1

func WaitAnyMethod[T any, V any](ctx context.Context,
	fn func(T) bool,
	method func(V) (T, <-chan struct{}),
	objs ...V) (T, int)

WaitAnyMethod is like WaitAny, but takes a list of objects along with a method signature that returns a value and a notifier channel. This allows it to be used with similar operations which have a different method name or by using `method` as an adapter function.

func WatchFutures added in v0.2.6

func WatchFutures[T any](ctx context.Context, futures ...*Future[T]) iter.Seq2[int, T]

WatchFutures returns an iterator over the future results. It will yield the index and value of the futures as they are set, until the context is cancelled or all futures have been received.

Types

type Channel added in v0.2.0

type Channel[T any] struct {
	// contains filtered or unexported fields
}

Channel is a publish/subscribe channel. It is similar to a Go channel with infinite capacity, with a couple of important differences.

1. Multiple receivers. There may be multiple receivers (or publishers), and all receivers get all messages.

2. Persistence. Messages are not persisted. If no receivers are listening when a message is published, it will be lost. When a receiver subscribes, it will only receive messages published after the subscription is created.

func (*Channel[T]) Close added in v0.2.4

func (c *Channel[T]) Close()

Close the channel. This will prevent any new values from being published, and will cause all subscribers to stop receiving values after the last message. For receive iterators, this will cause the iterator to terminate.

func (*Channel[T]) Publish added in v0.2.0

func (c *Channel[T]) Publish(value T)

Publish a new value to the channel. This value will be sent to all subscribers. Note that values are not persisted, so if no subscribers are listening when a value is published, it will be lost.

func (*Channel[T]) Receive added in v0.2.4

func (c *Channel[T]) Receive() iter.Seq[T]

Receive subscribes to updates on the channel and returns a sequence of values. The subscription is set up before the function returns, so it is safe to publish values immediately after calling Receive. The sequence may be infinite; it terminates only if the channel is closed.

func (*Channel[T]) Subscribe added in v0.2.0

func (c *Channel[T]) Subscribe(fn func(T)) *Subscription[T]

Subscribe is like Watch, but without the context. The subscription will run until it is canceled. The subscription is set up before the function returns, so it is safe to publish values immediately after calling Subscribe.

func (*Channel[T]) Watch added in v0.2.0

func (c *Channel[T]) Watch(ctx context.Context, fn func(T) error) error

Watch updates on the channel. The function will be called with each new value sent to the channel. If the function returns an error, the subscription will be canceled and the error will be returned. If the channel is closed, Watch will return nil.

type Future added in v0.2.6

type Future[T any] struct {
	// contains filtered or unexported fields
}

Future is a value that will be set at some point in the future. It is similar to a StatefulNotifier, but can only be set once.

func NewFuture added in v0.2.6

func NewFuture[T any]() *Future[T]

NewFuture creates a new Future.

func (*Future[T]) Done added in v0.2.6

func (f *Future[T]) Done() <-chan struct{}

Done returns a channel that is unblocked when the Future has been set.

func (*Future[T]) Get added in v0.2.6

func (f *Future[T]) Get(ctx context.Context) (T, error)

Get blocks until the value is available or the context is cancelled.

func (*Future[T]) Set added in v0.2.6

func (f *Future[T]) Set(value T) bool

Set sets the value of the Future. This unblocks any calls to Get. It returns false if the Future has already been set.
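
The set-once contract of Set, Done, and Get can be illustrated with a small stand-in. This is a sketch assuming a `sync.Once` guards the assignment and a closed channel signals completion; the `future` type, `newFuture`, and `demo` are hypothetical and not the package's implementation.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// future is a hypothetical stand-in for the documented Future[T].
type future[T any] struct {
	once  sync.Once
	done  chan struct{}
	value T
}

func newFuture[T any]() *future[T] {
	return &future[T]{done: make(chan struct{})}
}

// Set stores the value on the first call only and reports whether it did so.
func (f *future[T]) Set(v T) bool {
	set := false
	f.once.Do(func() {
		f.value = v
		close(f.done) // makes the value visible to everyone blocked in Get
		set = true
	})
	return set
}

// Done returns a channel that is closed once the value has been set.
func (f *future[T]) Done() <-chan struct{} { return f.done }

// Get blocks until the value is set or the context is canceled.
func (f *future[T]) Get(ctx context.Context) (T, error) {
	select {
	case <-f.done:
		return f.value, nil
	case <-ctx.Done():
		var zero T
		return zero, ctx.Err()
	}
}

// demo sets the future from another goroutine and waits for the result.
func demo() (string, error) {
	f := newFuture[string]()
	go f.Set("ready")
	return f.Get(context.Background())
}

func main() {
	fmt.Println(demo())
}
```

Writing the value before closing the channel is what makes the unsynchronized read in Get safe: a receive from a closed channel happens after the close.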

type NotifierLoader added in v0.2.1

type NotifierLoader[T any] interface {
	Load() (T, <-chan struct{})
}

NotifierLoader is an interface that provides a Load method that returns a value and a channel that will be closed when the value is updated.

type Ring

type Ring[T any] struct {
	// contains filtered or unexported fields
}

Ring is a fixed-size ring buffer that supports pushing and popping elements, as well as copying elements into a slice, and removing an element by index. The ring is implemented as a single slice, which is never reallocated.

Note that no synchronization is done. If the ring is accessed concurrently, it must be synchronized externally.

func NewRing

func NewRing[T any](fixedSize int) *Ring[T]

NewRing creates a new ring buffer with the given fixed size.

func (*Ring[T]) All added in v0.2.4

func (r *Ring[T]) All() iter.Seq[T]

All returns a sequence of all elements in the ring.

func (*Ring[T]) Cap

func (r *Ring[T]) Cap() int

Cap returns the fixed size of the ring. This is constant for the lifetime of the ring.

func (*Ring[T]) Copy

func (r *Ring[T]) Copy(out []T) int

Copy copies elements from the front of the ring into the out slice. It returns the number of elements copied. This does not consume elements from the ring.

func (*Ring[T]) Drop added in v0.2.7

func (r *Ring[T]) Drop(n int) int

Drop removes the first n elements from the ring. It returns the number of elements dropped. If n is greater than the number of elements in the ring, all elements are removed.

func (*Ring[T]) Len

func (r *Ring[T]) Len() int

Len returns the number of elements in the ring.

func (*Ring[T]) PeekFront added in v0.1.5

func (r *Ring[T]) PeekFront() (T, bool)

PeekFront returns the first element in the ring without removing it.

func (*Ring[T]) PeekIndex added in v0.1.5

func (r *Ring[T]) PeekIndex(i int) (T, bool)

PeekIndex returns the element at the given index without removing it. If the index is out of bounds, it returns false. The index is 0-based, with 0 being the first element in the ring. PeekIndex(0) is equivalent to PeekFront.

func (*Ring[T]) PopFront

func (r *Ring[T]) PopFront() (T, bool)

PopFront removes and returns the first element in the ring. If the ring is empty, it returns false.

func (*Ring[T]) PopIndex

func (r *Ring[T]) PopIndex(i int) (T, bool)

PopIndex removes and returns the element at the given index. This will require copying elements to maintain the ring structure, which has a time complexity of O(n) in the worst case.

If the index is out of bounds, it returns false. The index is 0-based, with 0 being the first element in the ring. PopIndex(0) is equivalent to PopFront.
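
The copying cost mentioned above can be seen in a small stand-in. This is a sketch assuming a head/count layout with modular indexing, which is not how the package describes its own implementation; the `ring` type, its fields, and the `at` helper are hypothetical. For simplicity it always shifts the later elements toward the front.

```go
package main

import "fmt"

// ring is a hypothetical stand-in for the documented Ring[T].
type ring[T any] struct {
	buf   []T
	head  int // index of the first element
	count int // number of stored elements
}

// at returns a pointer to the slot holding logical index i.
func (r *ring[T]) at(i int) *T { return &r.buf[(r.head+i)%len(r.buf)] }

// PushBack appends e and reports false when the ring is full.
func (r *ring[T]) PushBack(e T) bool {
	if r.count == len(r.buf) {
		return false
	}
	r.count++
	*r.at(r.count - 1) = e
	return true
}

// PopIndex removes and returns the element at logical index i.
func (r *ring[T]) PopIndex(i int) (T, bool) {
	var zero T
	if i < 0 || i >= r.count {
		return zero, false
	}
	v := *r.at(i)
	// Close the gap by moving every later element one slot toward the front.
	// This loop is the O(n) worst case.
	for j := i; j < r.count-1; j++ {
		*r.at(j) = *r.at(j + 1)
	}
	*r.at(r.count - 1) = zero // clear the vacated last slot
	r.count--
	return v, true
}

func main() {
	r := &ring[string]{buf: make([]string, 4)}
	for _, s := range []string{"a", "b", "c", "d"} {
		r.PushBack(s)
	}
	v, ok := r.PopIndex(1)
	fmt.Println(v, ok, r.count)
	for r.count > 0 {
		v, _ = r.PopIndex(0)
		fmt.Print(v, " ")
	}
	fmt.Println()
}
```

An implementation could halve the average cost by shifting whichever side of the gap is shorter, but the worst case stays linear in the number of elements.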

func (*Ring[T]) PushBack

func (r *Ring[T]) PushBack(e T) bool

PushBack adds the element to the ring. If the ring is full, it returns false.

func (*Ring[T]) PushBackAll added in v0.2.8

func (r *Ring[T]) PushBackAll(in []T) int

PushBackAll adds elements to the ring. It returns the number of elements copied, which may be less than len(in) if the ring is full.

func (*Ring[T]) Read added in v0.2.7

func (r *Ring[T]) Read(out []T) (int, error)

Read copies elements from the front of the ring into the out slice. It returns the number of elements copied and an error if the ring is empty. If the ring is a Ring[byte], then this implements io.Reader.

func (*Ring[T]) Reset

func (r *Ring[T]) Reset()

Reset removes all elements from the ring.

func (*Ring[T]) Resize added in v0.2.5

func (r *Ring[T]) Resize(newSize int) error

Resize changes the size of the ring. The new size must be greater than or equal to the current size.

func (*Ring[T]) Scan added in v0.1.7

func (r *Ring[T]) Scan(fn func(T) bool) (T, int)

Scan calls the given function for each element in the ring, in order. If the function returns true, then the value and index of the element are returned. If no match is found, then returns the zero value of T and -1.

func (*Ring[T]) Write added in v0.2.7

func (r *Ring[T]) Write(in []T) (int, error)

Write writes the elements to the ring from the in slice. It returns the number of elements written and an error if the ring is full. If the ring is a Ring[byte], then this implements io.Writer.
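
The remark about Ring[byte] relies on a general property of Go generics: a generic type with methods Read([]T) (int, error) and Write([]T) (int, error) satisfies io.Reader and io.Writer once T is instantiated as byte. The sketch below demonstrates this with a hypothetical bounded `queue` type, not the package's Ring. Returning io.EOF from an empty Read and the `errFull` value are assumptions, since the documentation only says that an error is returned.

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

// queue is a hypothetical, simplified stand-in for Ring[T]: a bounded FIFO
// backed by a slice, just enough to carry the two method signatures.
type queue[T any] struct {
	items []T
	limit int
}

// errFull is an assumed error value for a write that does not fit.
var errFull = errors.New("queue is full")

// Read has the documented signature Read(out []T) (int, error).
func (q *queue[T]) Read(out []T) (int, error) {
	if len(q.items) == 0 {
		return 0, io.EOF // assumption: the docs only say "an error"
	}
	n := copy(out, q.items)
	q.items = q.items[n:]
	return n, nil
}

// Write has the documented signature Write(in []T) (int, error).
func (q *queue[T]) Write(in []T) (int, error) {
	n := q.limit - len(q.items)
	if n > len(in) {
		n = len(in)
	}
	q.items = append(q.items, in[:n]...)
	if n < len(in) {
		return n, errFull
	}
	return n, nil
}

// Compile-time checks: the byte instantiation satisfies both io interfaces.
var (
	_ io.Reader = (*queue[byte])(nil)
	_ io.Writer = (*queue[byte])(nil)
)

func main() {
	q := &queue[byte]{limit: 16}
	fmt.Fprint(q, "hello")     // q is used as an io.Writer
	data, err := io.ReadAll(q) // q is used as an io.Reader
	fmt.Printf("%s %v\n", data, err)
}
```

Any other instantiation, such as `queue[int]`, still has Read and Write methods but does not satisfy the io interfaces, because their parameter type is no longer []byte.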

type StatefulNotifier added in v0.2.0

type StatefulNotifier[T any] struct {
	// contains filtered or unexported fields
}

StatefulNotifier holds a value and notifies listeners when the value is updated. Unlike a Channel, it keeps only the latest value, so a listener (calling Load) may not see all updates if multiple updates occur between calls to Load.

func NewStatefulNotifier added in v0.2.0

func NewStatefulNotifier[T any](initial T) *StatefulNotifier[T]

NewStatefulNotifier creates a new StatefulNotifier with the given initial value.

func (*StatefulNotifier[T]) Load added in v0.2.0

func (n *StatefulNotifier[T]) Load() (T, <-chan struct{})

Load returns the current value, along with a channel that will unblock when the value is updated.

func (*StatefulNotifier[T]) Store added in v0.2.0

func (n *StatefulNotifier[T]) Store(value T)

Store updates the value and unblocks any listeners.

func (*StatefulNotifier[T]) Update added in v0.2.2

func (n *StatefulNotifier[T]) Update(fn func(T) T) T

Update will atomically provide the current value to the update function and store the result of the function. Note that this will call the user's function with a lock held, so if the function blocks, then other calls to the notifier will block.
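
A read-modify-write such as a counter increment shows why the function runs under the lock. The sketch below is a stand-in, not the package's implementation: the `notifier` type, `newNotifier`, and `countTo` are hypothetical, and returning the new value from Update is an assumption because the documentation does not say which value is returned.

```go
package main

import (
	"fmt"
	"sync"
)

// notifier is a hypothetical stand-in for the documented StatefulNotifier[T].
type notifier[T any] struct {
	mu      sync.Mutex
	value   T
	updated chan struct{} // closed and replaced on every change
}

func newNotifier[T any](initial T) *notifier[T] {
	return &notifier[T]{value: initial, updated: make(chan struct{})}
}

// Update applies fn to the current value while holding the lock, so no other
// update can slip in between reading the old value and storing the new one.
func (n *notifier[T]) Update(fn func(T) T) T {
	n.mu.Lock()
	defer n.mu.Unlock()
	n.value = fn(n.value)
	close(n.updated)
	n.updated = make(chan struct{})
	return n.value // assumption: the new value is returned
}

// countTo has the given number of goroutines each add one through Update.
func countTo(workers int) int {
	n := newNotifier(0)
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			n.Update(func(v int) int { return v + 1 })
		}()
	}
	wg.Wait()
	// An identity update is used here simply to read the final value.
	return n.Update(func(v int) int { return v })
}

func main() {
	fmt.Println(countTo(100))
}
```

With a separate Load followed by Store, two goroutines could read the same old value and one increment would be lost. The cost is the one the documentation warns about: a slow or blocking function stalls every other caller.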

func (*StatefulNotifier[T]) Wait added in v0.2.0

func (n *StatefulNotifier[T]) Wait(ctx context.Context, fn func(T) bool) (T, error)

Wait blocks until the given condition function returns true or the context is canceled. It returns the value that satisfied the condition.

Note that Wait may miss intermediate updates if multiple updates occur quickly. If every update should be processed, use Channel instead.

func (*StatefulNotifier[T]) Watch added in v0.2.4

func (n *StatefulNotifier[T]) Watch(ctx context.Context) iter.Seq[T]

Watch returns an iterator which will yield the current value and any updates. Note that updates may be missed if multiple updates occur quickly. If all updates should be processed, use a Channel instead. If the context is cancelled, then the iterator terminates.

type Subscription added in v0.2.0

type Subscription[T any] struct {
	// contains filtered or unexported fields
}

Subscription is a subscription to a Channel. It will receive all values published to the channel until it is canceled.

func (*Subscription[T]) Cancel added in v0.2.0

func (s *Subscription[T]) Cancel()

Cancel the subscription. This will cause the subscription to stop receiving updates from the channel. Note that the subscription loop runs in the background, so there may be some latency between the cancel call and the subscription stopping.

func (*Subscription[T]) Done added in v0.2.5

func (s *Subscription[T]) Done() <-chan struct{}

Done returns a channel that will be closed when the subscription loop has finished.
