sqlitestore

Version: v0.0.0-...-860e172
Published: Mar 3, 2026 License: MIT Imports: 17 Imported by: 0

Documentation

Overview

Package sqlitestore provides SQLite-backed checkpoint storage for subscriptions.

SQLiteNotifier implements subscription.StoreNotifier for SQLite stores.

Two strategies are supported:

  1. Callback-based (default): The store calls Signal() after each append. This is instant and works with both :memory: and file-based databases.
  2. Polling-based: Polls PRAGMA data_version to detect external writes. Useful when multiple processes write to the same file-based database.

For single-process use (the common case), callback-based is recommended. The Store's WithStoreNotifier option wires this up automatically.

Package sqlitestore provides a SQLite-backed EventStore implementation.

Index

Constants

const (
	// DefaultPollInterval is the default polling interval for data_version changes.
	DefaultPollInterval = 50 * time.Millisecond
)

Variables

This section is empty.

Functions

This section is empty.

Types

type CheckpointStore

type CheckpointStore struct {
	// contains filtered or unexported fields
}

CheckpointStore is a durable, SQLite-backed checkpoint for event subscriptions. It tracks each consumer's last processed global sequence number, enabling crash recovery without reprocessing the entire event stream.

Thread-safe: all methods are safe for concurrent use (SQLite serializes writes).

func NewCheckpointStore

func NewCheckpointStore(db *sql.DB) (*CheckpointStore, error)

NewCheckpointStore creates a SQLite-backed checkpoint store. Creates the checkpoints table if it doesn't exist.

func (*CheckpointStore) Load

func (c *CheckpointStore) Load(ctx context.Context, consumerID string) (uint64, error)

Load returns the last processed sequence for a consumer. Returns 0 if new.

func (*CheckpointStore) Save

func (c *CheckpointStore) Save(ctx context.Context, consumerID string, sequence uint64) error

Save persists the consumer's position atomically.

type GlobalEvent

type GlobalEvent[E any] struct {
	// GlobalSequence is the store-wide monotonic position.
	GlobalSequence uint64
	// StreamID identifies which stream this event belongs to.
	StreamID string
	// EventType is the string name of the event (e.g., "OrderCreated").
	EventType string
	// Version is the per-stream version number.
	Version int
	// Data is the domain event payload.
	Data E
	// Timestamp is when the event was stored.
	Timestamp time.Time
}

GlobalEvent is a store-wide event with a global sequence number. Compatible with subscription.GlobalEvent but decoupled to avoid import cycles.

type GlobalReaderOption

type GlobalReaderOption[E any] func(*SQLiteGlobalReader[E])

GlobalReaderOption configures a SQLiteGlobalReader.

func WithGlobalReaderCodecRegistry

func WithGlobalReaderCodecRegistry[E any](reg *codec.Registry) GlobalReaderOption[E]

WithGlobalReaderCodecRegistry sets the codec registry for reading events with different codecs.

func WithGlobalReaderRegistry

func WithGlobalReaderRegistry[E any](reg *eskit.EventRegistry) GlobalReaderOption[E]

WithGlobalReaderRegistry enables a type registry for heterogeneous global reads.

type NotifierOption

type NotifierOption func(*SQLiteNotifier)

NotifierOption configures an SQLiteNotifier.

func WithPollInterval

func WithPollInterval(d time.Duration) NotifierOption

WithPollInterval sets the polling interval for data_version checks. Only relevant when Start() is called for polling mode.

type Option

type Option[E any] func(*Store[E])

Option configures a SQLite store.

func WithCodec

func WithCodec[E any](c codec.Codec) Option[E]

WithCodec sets a custom codec for event serialization and registers it for reads. By default, events are serialized as JSON using encoding/json. Use this to plug in CBOR, msgpack, or any custom format. For multi-codec migration, use WithWriteCodec and WithCodecRegistry instead.

func WithCodecRegistry

func WithCodecRegistry[E any](r *codec.Registry) Option[E]

WithCodecRegistry sets the registry used to look up codecs when reading events. Each event stores which codec was used to serialize it; the registry maps codec names back to implementations for deserialization. If not set, a default registry with JSON, JSONiter, and CBOR is used.

func WithRegistry

func WithRegistry[E any](reg *eskit.EventRegistry) Option[E]

WithRegistry enables a type registry for heterogeneous event deserialization.

func WithStoreNotifier

func WithStoreNotifier[E any](n *SQLiteNotifier) Option[E]

WithStoreNotifier wires up an SQLiteNotifier to signal after each Append. This provides instant notification without polling.

func WithUpcasters

func WithUpcasters[E any](u *eskit.UpcasterRegistry) Option[E]

WithUpcasters enables event upcasting for schema evolution during Load.

func WithWriteCodec

func WithWriteCodec[E any](c codec.Codec) Option[E]

WithWriteCodec sets the codec used for writing NEW events. Existing events are read using the codec stored in each event's codec column. Must be used with WithCodecRegistry to ensure the write codec is available for reads.
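
The multi-codec migration described above relies on each stored event recording the name of the codec that wrote it, with reads dispatching through a registry. The sketch below illustrates that idea using only the standard library. The Codec interface, registry type, and row struct are hypothetical and are not the codec package's actual types; XML stands in for a second format such as CBOR.

```go
package main

import (
	"encoding/json"
	"encoding/xml"
	"fmt"
)

// Codec is a hypothetical interface; the real codec.Codec may differ.
type Codec interface {
	Name() string
	Marshal(v any) ([]byte, error)
	Unmarshal(data []byte, v any) error
}

type jsonCodec struct{}

func (jsonCodec) Name() string                    { return "json" }
func (jsonCodec) Marshal(v any) ([]byte, error)   { return json.Marshal(v) }
func (jsonCodec) Unmarshal(b []byte, v any) error { return json.Unmarshal(b, v) }

type xmlCodec struct{}

func (xmlCodec) Name() string                    { return "xml" }
func (xmlCodec) Marshal(v any) ([]byte, error)   { return xml.Marshal(v) }
func (xmlCodec) Unmarshal(b []byte, v any) error { return xml.Unmarshal(b, v) }

// registry maps the codec name stored with each event back to a codec.
type registry map[string]Codec

// row models a stored event: the payload plus the name of the codec that wrote it.
type row struct {
	codec string
	data  []byte
}

type OrderCreated struct {
	ID string `json:"id" xml:"id"`
}

// write serializes v with the current write codec and records the codec name.
func write(c Codec, v any) (row, error) {
	b, err := c.Marshal(v)
	if err != nil {
		return row{}, err
	}
	return row{codec: c.Name(), data: b}, nil
}

// read looks up the codec recorded on the row and decodes with it.
func read(reg registry, r row, v any) error {
	c, ok := reg[r.codec]
	if !ok {
		return fmt.Errorf("codec %q is not registered", r.codec)
	}
	return c.Unmarshal(r.data, v)
}

func main() {
	reg := registry{"json": jsonCodec{}, "xml": xmlCodec{}}
	oldRow, _ := write(jsonCodec{}, OrderCreated{ID: "o-1"}) // written before the migration
	newRow, _ := write(xmlCodec{}, OrderCreated{ID: "o-2"})  // written with the new write codec
	for _, r := range []row{oldRow, newRow} {
		var e OrderCreated
		if err := read(reg, r, &e); err != nil {
			fmt.Println("read failed:", err)
			continue
		}
		fmt.Println(r.codec, e.ID)
	}
}
```

The failure branch in read shows why the write codec needs to be present in the registry: a row whose codec name is unknown cannot be decoded.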

type SQLiteChangeRelay

type SQLiteChangeRelay struct {
	// contains filtered or unexported fields
}

SQLiteChangeRelay broadcasts projection changes across server instances using a shared SQLite table. Unlike PostgreSQL's LISTEN/NOTIFY, SQLite has no built-in pub/sub mechanism, so this relay polls a shared eskit_changes table for new entries.

Flow:

  1. Server A processes event → OnChange fires → Broadcast() inserts row + notifies locally
  2. Server B polls the table → sees new row from A → forwards to its local ChangeNotifier
  3. SSE handlers on all servers get notified

Safe for concurrent use.
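
The flow above can be pictured with an in-memory table in place of eskit_changes. This is a sketch under assumptions, not the relay's implementation: changeTable and relay are hypothetical types, and tagging each row with an origin so that an instance skips its own rows when polling is an assumption about how duplicate local delivery would be avoided.

```go
package main

import (
	"fmt"
	"sync"
)

// change models one row in a shared changes table.
type change struct {
	ID     int64
	Origin string // instance that inserted the row (assumed column)
	Topic  string
}

// changeTable is an in-memory stand-in for the shared SQLite table.
type changeTable struct {
	mu   sync.Mutex
	rows []change
}

func (t *changeTable) insert(origin, topic string) {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.rows = append(t.rows, change{ID: int64(len(t.rows) + 1), Origin: origin, Topic: topic})
}

// since returns all rows with an ID greater than last, in insertion order.
func (t *changeTable) since(last int64) []change {
	t.mu.Lock()
	defer t.mu.Unlock()
	var out []change
	for _, c := range t.rows {
		if c.ID > last {
			out = append(out, c)
		}
	}
	return out
}

// relay is a hypothetical single-goroutine model of one server instance.
type relay struct {
	id        string
	table     *changeTable
	lastSeen  int64
	delivered []string // stands in for the local ChangeNotifier
}

// broadcast inserts a row for other instances and notifies locally at once.
func (r *relay) broadcast(topic string) {
	r.table.insert(r.id, topic)
	r.delivered = append(r.delivered, topic)
}

// pollOnce forwards rows written by other instances since the last poll.
func (r *relay) pollOnce() {
	for _, c := range r.table.since(r.lastSeen) {
		r.lastSeen = c.ID
		if c.Origin == r.id {
			continue // already delivered locally by broadcast
		}
		r.delivered = append(r.delivered, c.Topic)
	}
}

func main() {
	shared := &changeTable{}
	a := &relay{id: "A", table: shared}
	b := &relay{id: "B", table: shared}
	a.broadcast("orders")
	a.pollOnce()
	b.pollOnce()
	fmt.Println(a.delivered, b.delivered) // prints "[orders] [orders]"
}
```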

func NewSQLiteChangeRelay

func NewSQLiteChangeRelay(db *sql.DB, notifier *eskit.ChangeNotifier, opts ...SQLiteChangeRelayOption) *SQLiteChangeRelay

NewSQLiteChangeRelay creates a relay that bridges ChangeNotifier across instances using a shared SQLite changes table.

The notifier receives all changes detected by this relay (from other instances). Call Start() to begin polling.

func (*SQLiteChangeRelay) Broadcast

func (r *SQLiteChangeRelay) Broadcast(change eskit.Change)

Broadcast sends a change notification to all instances by inserting a row into the eskit_changes table. Also notifies the local ChangeNotifier directly for immediate delivery to SSE handlers on this instance.

The INSERT is best-effort: errors are silently ignored since SSE handlers will catch up on the next successful notification.

func (*SQLiteChangeRelay) Close

func (r *SQLiteChangeRelay) Close() error

Close stops the relay. Safe to call multiple times.

func (*SQLiteChangeRelay) Start

func (r *SQLiteChangeRelay) Start(ctx context.Context)

Start begins polling for changes from other instances and forwarding them to the local ChangeNotifier. Blocks until ctx is cancelled or Close is called. Auto-creates the eskit_changes table if it does not exist.

func (*SQLiteChangeRelay) Wait

func (r *SQLiteChangeRelay) Wait()

Wait blocks until the relay's poll loop has fully stopped.

type SQLiteChangeRelayOption

type SQLiteChangeRelayOption func(*SQLiteChangeRelay)

SQLiteChangeRelayOption configures a SQLiteChangeRelay.

func WithRelayPollInterval

func WithRelayPollInterval(d time.Duration) SQLiteChangeRelayOption

WithRelayPollInterval sets the interval between polls for new changes from other instances. Default: 100ms.

func WithRelayPruneInterval

func WithRelayPruneInterval(d time.Duration) SQLiteChangeRelayOption

WithRelayPruneInterval sets the interval between prune operations that remove old change entries. Default: 60s.

func WithRelayRetention

func WithRelayRetention(n int) SQLiteChangeRelayOption

WithRelayRetention sets the maximum number of change entries to retain in the table. Older entries are pruned periodically to prevent unbounded growth. Default: 1000.
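
Retention-based pruning amounts to keeping only the newest n entries. The sketch below shows that rule on a slice of ascending row IDs; prune is a hypothetical helper, and how the relay actually expresses this in SQL is not stated in this documentation.

```go
package main

import "fmt"

// prune keeps the newest retain entries of ids, which must be in ascending order.
// Assumption: a negative retain is treated as zero.
func prune(ids []int64, retain int) []int64 {
	if retain < 0 {
		retain = 0
	}
	if len(ids) <= retain {
		return ids
	}
	return ids[len(ids)-retain:]
}

func main() {
	ids := []int64{1, 2, 3, 4, 5}
	fmt.Println(prune(ids, 2))    // prints "[4 5]"
	fmt.Println(prune(ids, 1000)) // prints "[1 2 3 4 5]"
}
```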

type SQLiteGlobalReader

type SQLiteGlobalReader[E any] struct {
	// contains filtered or unexported fields
}

SQLiteGlobalReader implements subscription.GlobalReader for SQLite. It reads events by global sequence (the events.id column).

func NewSQLiteGlobalReader

func NewSQLiteGlobalReader[E any](db *sql.DB, opts ...GlobalReaderOption[E]) *SQLiteGlobalReader[E]

NewSQLiteGlobalReader creates a global reader backed by a sql.DB.

func (*SQLiteGlobalReader[E]) LatestSequence

func (r *SQLiteGlobalReader[E]) LatestSequence(ctx context.Context) (uint64, error)

LatestSequence returns the highest global sequence in the store, or 0 if empty.

func (*SQLiteGlobalReader[E]) ReadFrom

func (r *SQLiteGlobalReader[E]) ReadFrom(ctx context.Context, fromSequence uint64, limit int) ([]GlobalEvent[E], error)

ReadFrom returns events starting from the given global sequence (inclusive), up to limit.

type SQLiteNotifier

type SQLiteNotifier struct {
	// contains filtered or unexported fields
}

SQLiteNotifier detects database changes via callback or PRAGMA data_version polling. Safe for concurrent use.

func NewNotifier

func NewNotifier(db *sql.DB, opts ...NotifierOption) *SQLiteNotifier

NewNotifier creates an SQLiteNotifier. If db is non-nil, Start() enables PRAGMA data_version polling for external writes. Signal() can always be called directly for instant notification.

func (*SQLiteNotifier) Close

func (n *SQLiteNotifier) Close() error

Close stops the notifier and closes all listener channels.

func (*SQLiteNotifier) Notify

func (n *SQLiteNotifier) Notify(ctx context.Context) <-chan uint64

Notify returns a channel that receives the latest global sequence when new events are appended. Implements subscription.StoreNotifier.

func (*SQLiteNotifier) Signal

func (n *SQLiteNotifier) Signal(sequence uint64)

Signal notifies all listeners that new events are available at the given sequence. Call this after appending events. Non-blocking.

func (*SQLiteNotifier) Start

func (n *SQLiteNotifier) Start(ctx context.Context)

Start begins polling PRAGMA data_version for external database changes. Blocks until ctx is cancelled or Close is called. Optional: not needed if all writes go through a store configured with WithStoreNotifier.

func (*SQLiteNotifier) Wait

func (n *SQLiteNotifier) Wait()

Wait blocks until the notifier has fully stopped.

type SnapshotStore

type SnapshotStore[S any] struct {
	// contains filtered or unexported fields
}

SnapshotStore is a SQLite-backed eskit.SnapshotStore implementation. Stores snapshots as JSON with schema versioning and timestamps.

func NewSnapshotStore

func NewSnapshotStore[S any](db *sql.DB) (*SnapshotStore[S], error)

NewSnapshotStore creates a SQLite-backed snapshot store. Automatically creates the snapshots table with schema_version and created_at columns.

func (*SnapshotStore[S]) Invalidate

func (s *SnapshotStore[S]) Invalidate(ctx context.Context, streamID string) error

Invalidate deletes the snapshot for a single stream.

func (*SnapshotStore[S]) InvalidateAll

func (s *SnapshotStore[S]) InvalidateAll(ctx context.Context) error

InvalidateAll deletes all snapshots.

func (*SnapshotStore[S]) LoadSnapshot

func (s *SnapshotStore[S]) LoadSnapshot(ctx context.Context, streamID string) (*eskit.Snapshot[S], error)

func (*SnapshotStore[S]) SaveSnapshot

func (s *SnapshotStore[S]) SaveSnapshot(ctx context.Context, snapshot eskit.Snapshot[S]) error

type Store

type Store[E any] struct {
	// contains filtered or unexported fields
}

Store is a SQLite-backed event store. Events are serialized as JSON by default; see WithCodec for other formats.

func New

func New[E any](dsn string, opts ...Option[E]) (*Store[E], error)

New creates a new SQLite event store. The dsn is a SQLite connection string (e.g., "file:events.db" or ":memory:" for testing).

func NewFromDB

func NewFromDB[E any](db *sql.DB, opts ...Option[E]) (*Store[E], error)

NewFromDB wraps an existing *sql.DB connection.

func (*Store[E]) Append

func (s *Store[E]) Append(ctx context.Context, streamID string, expectedVersion int, events []E, metadata ...eskit.Metadata) ([]eskit.Event[E], error)

func (*Store[E]) AppendWithOptions

func (s *Store[E]) AppendWithOptions(ctx context.Context, streamID string, expectedVersion int, events []E, opts eskit.AppendOptions) ([]eskit.Event[E], error)

AppendWithOptions persists events with idempotency and custom timestamp support.

func (*Store[E]) Archive

func (s *Store[E]) Archive(ctx context.Context, streamID string, target eskit.EventStore[E]) error

Archive moves a stream to the target store and tombstones the primary.

func (*Store[E]) ArchiveStream

func (s *Store[E]) ArchiveStream(ctx context.Context, streamID string) error

ArchiveStream marks a stream as archived. Future appends are rejected.

func (*Store[E]) Close

func (s *Store[E]) Close() error

Close closes the underlying database connection.

func (*Store[E]) DB

func (s *Store[E]) DB() *sql.DB

DB returns the underlying database connection for advanced use cases.

func (*Store[E]) Delete

func (s *Store[E]) Delete(ctx context.Context, streamID string) error

Delete permanently removes all events for a stream. Returns ErrStreamNotFound if the stream does not exist. Also removes associated snapshots and tombstones.

func (*Store[E]) DeleteStream

func (s *Store[E]) DeleteStream(ctx context.Context, streamID string) error

DeleteStream permanently removes all events in a stream.

func (*Store[E]) IsTombstoned

func (s *Store[E]) IsTombstoned(ctx context.Context, streamID string) (*eskit.Tombstone, error)

IsTombstoned checks if a stream has been tombstoned. Returns nil, nil if the stream is not tombstoned.

func (*Store[E]) LatestSequence

func (s *Store[E]) LatestSequence(ctx context.Context) (uint64, error)

LatestSequence returns the highest global sequence (rowid) in the store, or 0 if empty.

func (*Store[E]) Load

func (s *Store[E]) Load(ctx context.Context, streamID string) ([]eskit.Event[E], error)

func (*Store[E]) LoadFrom

func (s *Store[E]) LoadFrom(ctx context.Context, streamID string, fromVersion int) ([]eskit.Event[E], error)

func (*Store[E]) LoadRaw

func (s *Store[E]) LoadRaw(ctx context.Context, streamID string) ([]*eskit.RawEvent, error)

LoadRaw loads events without deserializing the Data field. Returns RawEvent values that can be selectively decoded on demand. This is significantly faster when you only need metadata or a subset of events.

func (*Store[E]) LoadRawWithOptions

func (s *Store[E]) LoadRawWithOptions(ctx context.Context, streamID string, opts eskit.LoadOptions) ([]*eskit.RawEvent, error)

LoadRawWithOptions loads raw events with optional filtering.

func (*Store[E]) LoadWithOptions

func (s *Store[E]) LoadWithOptions(ctx context.Context, streamID string, opts eskit.LoadOptions) ([]eskit.Event[E], error)

LoadWithOptions loads events with server-side filtering (event types, version range, limit).

func (*Store[E]) ReadFrom

func (s *Store[E]) ReadFrom(ctx context.Context, fromSequence uint64, limit int) ([]eskit.Event[E], error)

ReadFrom implements GlobalReader — reads events across all streams by global sequence. Uses SQLite's rowid (aliased as id via INTEGER PRIMARY KEY AUTOINCREMENT) as global sequence.

func (*Store[E]) ReadFromWithOptions

func (s *Store[E]) ReadFromWithOptions(ctx context.Context, fromSequence uint64, limit int, opts eskit.LoadOptions) ([]eskit.Event[E], error)

ReadFromWithOptions reads global events with optional event type filtering.

func (*Store[E]) Restore

func (s *Store[E]) Restore(ctx context.Context, streamID string, source eskit.EventStore[E]) error

Restore moves an archived stream back from the source store to the primary.

func (*Store[E]) RestoreStream

func (s *Store[E]) RestoreStream(ctx context.Context, streamID string) error

RestoreStream brings an archived stream back to active state.

func (*Store[E]) StreamStatus

func (s *Store[E]) StreamStatus(ctx context.Context, streamID string) (eskit.StreamState, error)

StreamStatus returns the current lifecycle state of a stream.

func (*Store[E]) Tombstone

func (s *Store[E]) Tombstone(ctx context.Context, streamID string, reason string) error

Tombstone marks a stream as deleted. Future Append calls return ErrStreamDeleted. Load returns empty. The tombstone record remains for audit.

func (*Store[E]) TombstoneStream

func (s *Store[E]) TombstoneStream(ctx context.Context, streamID string) error

TombstoneStream marks a stream as deleted. Future appends are rejected.

func (*Store[E]) Truncate

func (s *Store[E]) Truncate(ctx context.Context) error

Truncate removes all events from the store and resets the autoincrement sequence. Intended for test isolation.

type SubscriptionAdapter

type SubscriptionAdapter[E any] struct {
	// contains filtered or unexported fields
}

SubscriptionAdapter adapts SQLiteGlobalReader into a subscription.GlobalReader. This bridges the type gap between sqlitestore.GlobalEvent and subscription.GlobalEvent.

func NewSubscriptionAdapter

func NewSubscriptionAdapter[E any](reader *SQLiteGlobalReader[E]) *SubscriptionAdapter[E]

NewSubscriptionAdapter wraps a SQLiteGlobalReader for use with the subscription system.

func (*SubscriptionAdapter[E]) LatestSequence

func (a *SubscriptionAdapter[E]) LatestSequence(ctx context.Context) (uint64, error)

LatestSequence delegates to the underlying reader.

func (*SubscriptionAdapter[E]) ReadFrom

func (a *SubscriptionAdapter[E]) ReadFrom(ctx context.Context, fromSequence uint64, limit int) ([]subscription.GlobalEvent[E], error)

ReadFrom reads events and converts sqlitestore.GlobalEvent to subscription.GlobalEvent.
