Documentation
¶
Overview ¶
Package natskvlease provides common utilities for NATS JetStream KeyValue-based distributed resource management, including distributed locks and leader election.
This package extracts shared functionality to eliminate code duplication between dlock (distributed locks) and leadelect (leader election) packages:
- Retry logic with exponential backoff for transient network errors
- KeyValue bucket creation and management
- Lease-based resource management with camping loops
The package is designed as an internal utility and should not be imported by external packages directly.
Index ¶
- Constants
- Variables
- func Retry[T any](ctx context.Context, fn func() (T, error)) (T, error)
- func RetryWithConfig[T any](ctx context.Context, fn func() (T, error), cfg RetryConfig) (T, error)
- func ValidateJetStreamEnabled(ctx context.Context, js jetstream.JetStream, logger *slog.Logger) error
- func ValidateNatsVersion(nc *nats.Conn) error
- type BucketConfig
- type KVHelper
- type KVOps
- func (o *KVOps) Create(ctx context.Context, key string, value []byte) (uint64, error)
- func (o *KVOps) Delete(ctx context.Context, key string) error
- func (o *KVOps) DeleteWithRevision(ctx context.Context, key string, revision uint64) error
- func (o *KVOps) Get(ctx context.Context, key string) (jetstream.KeyValueEntry, error)
- func (o *KVOps) KV() jetstream.KeyValue
- func (o *KVOps) Update(ctx context.Context, key string, value []byte, revision uint64) (uint64, error)
- func (o *KVOps) Watch(ctx context.Context, key string) (jetstream.KeyWatcher, error)
- type Lease
- func (l *Lease) Acquire(ctx context.Context) (bool, error)
- func (l *Lease) GetOps() *KVOps
- func (l *Lease) IsHeld() bool
- func (l *Lease) Release(ctx context.Context) error
- func (l *Lease) Renew(ctx context.Context) (bool, error)
- func (l *Lease) RunCamping(ctx context.Context) (bool, error)
- func (l *Lease) StopCamping()
- func (l *Lease) UpdateValue(val []byte)
- type LeaseCallbacks
- type LeaseConfig
- type LeaseManager
- type RetryConfig
Constants ¶
const ( // MinNatsMajorVersion is the minimum required NATS server major version. MinNatsMajorVersion = 2 // MinNatsMinorVersion is the minimum required NATS server minor version. MinNatsMinorVersion = 11 )
const ( // DefaultMaxRetries is the default maximum number of retry attempts. DefaultMaxRetries = 5 // DefaultMaxElapsedTime is the default maximum elapsed time for retries. DefaultMaxElapsedTime = 2 * time.Second )
const ( // DefaultBucketTTL is the default TTL for bucket keys. DefaultBucketTTL = 10 * time.Second )
Variables ¶
var ( ErrLeaseNotHeld = errors.New("lease not held") ErrLeaseExists = errors.New("lease already exists") ErrProviderClosed = errors.New("provider is closed") ErrProviderStopped = errors.New("provider is stopped") )
Common errors for lease operations.
var ErrNatsVersionNotSupported = errors.New("NATS server version must be 2.11.0 or higher and JetStream must be enabled")
ErrNatsVersionNotSupported is returned when the NATS server version is less than 2.11.0 or JetStream is not enabled.
Functions ¶
func Retry ¶
Retry executes the given function with exponential backoff retry logic. It retries on transient network errors (timeouts, connection refused, network errors) and returns immediately on permanent errors.
Example:
entry, err := Retry(ctx, func() (jetstream.KeyValueEntry, error) {
return kv.Get(ctx, key)
})
func RetryWithConfig ¶
RetryWithConfig executes the given function with configurable retry logic.
func ValidateJetStreamEnabled ¶
func ValidateJetStreamEnabled(ctx context.Context, js jetstream.JetStream, logger *slog.Logger) error
ValidateJetStreamEnabled verifies that JetStream is enabled for the given account. It logs a consistent message via logger (when non-nil) and returns the underlying error.
func ValidateNatsVersion ¶
ValidateNatsVersion checks if the NATS server version meets the minimum requirements (2.11.0+). Returns nil if version is valid, ErrNatsVersionNotSupported if version is too old, or a wrapped error if version parsing fails.
Types ¶
type BucketConfig ¶
type BucketConfig struct {
// Bucket is the name of the KeyValue bucket.
Bucket string
// TTL is the time-to-live for keys in the bucket.
// If zero, DefaultBucketTTL is used.
TTL time.Duration
// Storage is the storage type (Memory or File).
// Defaults to MemoryStorage.
Storage jetstream.StorageType
// Compression enables compression for the bucket.
Compression bool
}
BucketConfig holds configuration for creating a KeyValue bucket.
type KVHelper ¶
type KVHelper struct {
// contains filtered or unexported fields
}
KVHelper provides common KeyValue operations for NATS JetStream.
func NewKVHelper ¶
NewKVHelper creates a new KVHelper instance.
func (*KVHelper) GetOrCreateBucket ¶
func (h *KVHelper) GetOrCreateBucket(ctx context.Context, cfg BucketConfig) (jetstream.KeyValue, error)
GetOrCreateBucket retrieves an existing bucket or creates a new one. It uses retry logic to handle transient network errors.
type KVOps ¶
type KVOps struct {
// contains filtered or unexported fields
}
KVOps provides common KeyValue CRUD operations with retry logic.
func (*KVOps) DeleteWithRevision ¶
DeleteWithRevision deletes a key with optimistic locking using revision.
type Lease ¶
type Lease struct {
// contains filtered or unexported fields
}
Lease represents a distributed lease that can be acquired, renewed, and released.
func NewLease ¶
func NewLease(kv jetstream.KeyValue, cfg LeaseConfig) *Lease
NewLease creates a new Lease instance.
func (*Lease) Acquire ¶
Acquire attempts to acquire the lease. Returns true if successfully acquired, false if already held by another owner.
func (*Lease) Renew ¶
Renew attempts to renew the lease. Returns true if successfully renewed, false if lease was lost.
func (*Lease) RunCamping ¶
RunCamping starts a camping loop that maintains the lease. It acquires the lease and periodically renews it until the context is canceled. Returns true if lease was acquired, false otherwise.
func (*Lease) UpdateValue ¶
UpdateValue updates the value stored with the lease. The new value will be used in subsequent renewals.
type LeaseCallbacks ¶
type LeaseCallbacks struct {
// OnAcquired is called when the lease is successfully acquired.
OnAcquired func()
// OnLost is called when the lease is lost (not voluntarily released).
OnLost func()
// OnRenewed is called when the lease is successfully renewed.
OnRenewed func()
// OnReleased is called when the lease is voluntarily released.
OnReleased func()
}
LeaseCallbacks defines callbacks for lease state changes.
type LeaseConfig ¶
type LeaseConfig struct {
// Key is the unique identifier for the lease in the KV store.
Key string
// TTL is the time-to-live for the lease.
TTL time.Duration
// RenewRatio is the ratio of TTL at which to renew (e.g., 0.5 means renew at 50% of TTL).
RenewRatio float64
// Value is the value to store with the lease (owner identifier).
Value []byte
// IsOwner checks if the given value belongs to this lease holder.
IsOwner func(value []byte) bool
// Callbacks for lease state changes.
Callbacks LeaseCallbacks
// Logger for lease operations.
Logger *slog.Logger
}
LeaseConfig holds configuration for a lease.
type LeaseManager ¶
type LeaseManager struct {
// contains filtered or unexported fields
}
LeaseManager manages a lease with optional key watching for faster acquisition.
func NewLeaseManager ¶
func NewLeaseManager(kv jetstream.KeyValue, cfg LeaseConfig) *LeaseManager
NewLeaseManager creates a new LeaseManager instance.
func (*LeaseManager) IsHeld ¶
func (m *LeaseManager) IsHeld() bool
IsHeld returns true if the lease is currently held.
type RetryConfig ¶
RetryConfig holds configuration for retry operations.
func DefaultRetryConfig ¶
func DefaultRetryConfig() RetryConfig
DefaultRetryConfig returns the default retry configuration.