natskvlease

package
v0.0.0-...-0b8e112 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 2, 2026 License: MIT Imports: 13 Imported by: 0

Documentation

Overview

Package natskvlease provides common utilities for NATS JetStream KeyValue-based distributed resource management, including distributed locks and leader election.

This package extracts shared functionality to eliminate code duplication between dlock (distributed locks) and leadelect (leader election) packages:

  • Retry logic with exponential backoff for transient network errors
  • KeyValue bucket creation and management
  • Lease-based resource management with camping loops

The package is designed as an internal utility and should not be imported by external packages directly.

Index

Constants

View Source
const (
	// MinNatsMajorVersion is the minimum required NATS server major version.
	MinNatsMajorVersion = 2
	// MinNatsMinorVersion is the minimum required NATS server minor version.
	MinNatsMinorVersion = 11
)
View Source
const (
	// DefaultMaxRetries is the default maximum number of retry attempts.
	DefaultMaxRetries = 5

	// DefaultMaxElapsedTime is the default maximum elapsed time for retries.
	DefaultMaxElapsedTime = 2 * time.Second
)
View Source
const (
	// DefaultBucketTTL is the default TTL for bucket keys.
	DefaultBucketTTL = 10 * time.Second
)

Variables

View Source
var (
	ErrLeaseNotHeld    = errors.New("lease not held")
	ErrLeaseExists     = errors.New("lease already exists")
	ErrProviderClosed  = errors.New("provider is closed")
	ErrProviderStopped = errors.New("provider is stopped")
)

Common errors for lease operations.

View Source
var ErrNatsVersionNotSupported = errors.New("NATS server version must be 2.11.0 or higher and JetStream must be enabled")

ErrNatsVersionNotSupported is returned when the NATS server version is less than 2.11.0 or JetStream is not enabled.

Functions

func Retry

func Retry[T any](ctx context.Context, fn func() (T, error)) (T, error)

Retry executes the given function with exponential backoff retry logic. It retries on transient network errors (timeouts, connection refused, network errors) and returns immediately on permanent errors.

Example:

entry, err := Retry(ctx, func() (jetstream.KeyValueEntry, error) {
    return kv.Get(ctx, key)
})

func RetryWithConfig

func RetryWithConfig[T any](ctx context.Context, fn func() (T, error), cfg RetryConfig) (T, error)

RetryWithConfig executes the given function with configurable retry logic.

func ValidateJetStreamEnabled

func ValidateJetStreamEnabled(ctx context.Context, js jetstream.JetStream, logger *slog.Logger) error

ValidateJetStreamEnabled verifies that JetStream is enabled for the given account. It logs a consistent message via logger (when non-nil) and returns the underlying error.

func ValidateNatsVersion

func ValidateNatsVersion(nc *nats.Conn) error

ValidateNatsVersion checks if the NATS server version meets the minimum requirements (2.11.0+). Returns nil if version is valid, ErrNatsVersionNotSupported if version is too old, or a wrapped error if version parsing fails.

Types

type BucketConfig

type BucketConfig struct {
	// Bucket is the name of the KeyValue bucket.
	Bucket string

	// TTL is the time-to-live for keys in the bucket.
	// If zero, DefaultBucketTTL is used.
	TTL time.Duration

	// Storage is the storage type (Memory or File).
	// Defaults to MemoryStorage.
	Storage jetstream.StorageType

	// Compression enables compression for the bucket.
	Compression bool
}

BucketConfig holds configuration for creating a KeyValue bucket.

type KVHelper

type KVHelper struct {
	// contains filtered or unexported fields
}

KVHelper provides common KeyValue operations for NATS JetStream.

func NewKVHelper

func NewKVHelper(js jetstream.JetStream, logger *slog.Logger) *KVHelper

NewKVHelper creates a new KVHelper instance.

func (*KVHelper) GetOrCreateBucket

func (h *KVHelper) GetOrCreateBucket(ctx context.Context, cfg BucketConfig) (jetstream.KeyValue, error)

GetOrCreateBucket retrieves an existing bucket or creates a new one. It uses retry logic to handle transient network errors.

type KVOps

type KVOps struct {
	// contains filtered or unexported fields
}

KVOps provides common KeyValue CRUD operations with retry logic.

func NewKVOps

func NewKVOps(kv jetstream.KeyValue, logger *slog.Logger) *KVOps

NewKVOps creates a new KVOps instance.

func (*KVOps) Create

func (o *KVOps) Create(ctx context.Context, key string, value []byte) (uint64, error)

Create creates a new key-value pair. Fails if key already exists.

func (*KVOps) Delete

func (o *KVOps) Delete(ctx context.Context, key string) error

Delete deletes a key from the store.

func (*KVOps) DeleteWithRevision

func (o *KVOps) DeleteWithRevision(ctx context.Context, key string, revision uint64) error

DeleteWithRevision deletes a key with optimistic locking using revision.

func (*KVOps) Get

func (o *KVOps) Get(ctx context.Context, key string) (jetstream.KeyValueEntry, error)

Get retrieves a value from the KeyValue store.

func (*KVOps) KV

func (o *KVOps) KV() jetstream.KeyValue

KV returns the underlying KeyValue store.

func (*KVOps) Update

func (o *KVOps) Update(ctx context.Context, key string, value []byte, revision uint64) (uint64, error)

Update updates an existing key with optimistic locking using revision.

func (*KVOps) Watch

func (o *KVOps) Watch(ctx context.Context, key string) (jetstream.KeyWatcher, error)

Watch creates a watcher for changes to a specific key.

type Lease

type Lease struct {
	// contains filtered or unexported fields
}

Lease represents a distributed lease that can be acquired, renewed, and released.

func NewLease

func NewLease(kv jetstream.KeyValue, cfg LeaseConfig) *Lease

NewLease creates a new Lease instance.

func (*Lease) Acquire

func (l *Lease) Acquire(ctx context.Context) (bool, error)

Acquire attempts to acquire the lease. Returns true if successfully acquired, false if already held by another owner.

func (*Lease) GetOps

func (l *Lease) GetOps() *KVOps

GetOps returns the underlying KVOps instance.

func (*Lease) IsHeld

func (l *Lease) IsHeld() bool

IsHeld returns true if the lease is currently held.

func (*Lease) Release

func (l *Lease) Release(ctx context.Context) error

Release voluntarily releases the lease.

func (*Lease) Renew

func (l *Lease) Renew(ctx context.Context) (bool, error)

Renew attempts to renew the lease. Returns true if successfully renewed, false if lease was lost.

func (*Lease) RunCamping

func (l *Lease) RunCamping(ctx context.Context) (bool, error)

RunCamping starts a camping loop that maintains the lease. It acquires the lease and periodically renews it until the context is canceled. Returns true if lease was acquired, false otherwise.

func (*Lease) StopCamping

func (l *Lease) StopCamping()

StopCamping stops the camping loop.

func (*Lease) UpdateValue

func (l *Lease) UpdateValue(val []byte)

UpdateValue updates the value stored with the lease. The new value will be used in subsequent renewals.

type LeaseCallbacks

type LeaseCallbacks struct {
	// OnAcquired is called when the lease is successfully acquired.
	OnAcquired func()

	// OnLost is called when the lease is lost (not voluntarily released).
	OnLost func()

	// OnRenewed is called when the lease is successfully renewed.
	OnRenewed func()

	// OnReleased is called when the lease is voluntarily released.
	OnReleased func()
}

LeaseCallbacks defines callbacks for lease state changes.

type LeaseConfig

type LeaseConfig struct {
	// Key is the unique identifier for the lease in the KV store.
	Key string

	// TTL is the time-to-live for the lease.
	TTL time.Duration

	// RenewRatio is the ratio of TTL at which to renew (e.g., 0.5 means renew at 50% of TTL).
	RenewRatio float64

	// Value is the value to store with the lease (owner identifier).
	Value []byte

	// IsOwner checks if the given value belongs to this lease holder.
	IsOwner func(value []byte) bool

	// Callbacks for lease state changes.
	Callbacks LeaseCallbacks

	// Logger for lease operations.
	Logger *slog.Logger
}

LeaseConfig holds configuration for a lease.

type LeaseManager

type LeaseManager struct {
	// contains filtered or unexported fields
}

LeaseManager manages a lease with optional key watching for faster acquisition.

func NewLeaseManager

func NewLeaseManager(kv jetstream.KeyValue, cfg LeaseConfig) *LeaseManager

NewLeaseManager creates a new LeaseManager instance.

func (*LeaseManager) IsHeld

func (m *LeaseManager) IsHeld() bool

IsHeld returns true if the lease is currently held.

func (*LeaseManager) Start

func (m *LeaseManager) Start(ctx context.Context) error

Start begins the lease management loop. It continuously attempts to acquire and maintain the lease.

func (*LeaseManager) Stop

func (m *LeaseManager) Stop(ctx context.Context) error

Stop stops the lease management loop and releases the lease.

type RetryConfig

type RetryConfig struct {
	MaxRetries     uint
	MaxElapsedTime time.Duration
}

RetryConfig holds configuration for retry operations.

func DefaultRetryConfig

func DefaultRetryConfig() RetryConfig

DefaultRetryConfig returns the default retry configuration.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL