Documentation ¶
Overview ¶
Package sqlitestore provides SQLite-backed checkpoint storage for subscriptions.
SQLiteNotifier implements subscription.StoreNotifier for SQLite stores.
Two strategies are supported:
- Callback-based (default): The store calls Signal() after each append. This is instant and works with both :memory: and file-based databases.
- Polling-based: Polls PRAGMA data_version to detect external writes. Useful when multiple processes write to the same file-based database.
For single-process use (the common case), the callback-based strategy is recommended. The Store's WithStoreNotifier option wires this up automatically.
Package sqlitestore provides a SQLite-backed EventStore implementation.
Index ¶
- Constants
- type CheckpointStore
- type GlobalEvent
- type GlobalReaderOption
- type NotifierOption
- type Option
- func WithCodec[E any](c codec.Codec) Option[E]
- func WithCodecRegistry[E any](r *codec.Registry) Option[E]
- func WithRegistry[E any](reg *eskit.EventRegistry) Option[E]
- func WithStoreNotifier[E any](n *SQLiteNotifier) Option[E]
- func WithUpcasters[E any](u *eskit.UpcasterRegistry) Option[E]
- func WithWriteCodec[E any](c codec.Codec) Option[E]
- type SQLiteChangeRelay
- type SQLiteChangeRelayOption
- type SQLiteGlobalReader
- type SQLiteNotifier
- type SnapshotStore
- func (s *SnapshotStore[S]) Invalidate(ctx context.Context, streamID string) error
- func (s *SnapshotStore[S]) InvalidateAll(ctx context.Context) error
- func (s *SnapshotStore[S]) LoadSnapshot(ctx context.Context, streamID string) (*eskit.Snapshot[S], error)
- func (s *SnapshotStore[S]) SaveSnapshot(ctx context.Context, snapshot eskit.Snapshot[S]) error
- type Store
- func (s *Store[E]) Append(ctx context.Context, streamID string, expectedVersion int, events []E, ...) ([]eskit.Event[E], error)
- func (s *Store[E]) AppendWithOptions(ctx context.Context, streamID string, expectedVersion int, events []E, ...) ([]eskit.Event[E], error)
- func (s *Store[E]) Archive(ctx context.Context, streamID string, target eskit.EventStore[E]) error
- func (s *Store[E]) ArchiveStream(ctx context.Context, streamID string) error
- func (s *Store[E]) Close() error
- func (s *Store[E]) DB() *sql.DB
- func (s *Store[E]) Delete(ctx context.Context, streamID string) error
- func (s *Store[E]) DeleteStream(ctx context.Context, streamID string) error
- func (s *Store[E]) IsTombstoned(ctx context.Context, streamID string) (*eskit.Tombstone, error)
- func (s *Store[E]) LatestSequence(ctx context.Context) (uint64, error)
- func (s *Store[E]) Load(ctx context.Context, streamID string) ([]eskit.Event[E], error)
- func (s *Store[E]) LoadFrom(ctx context.Context, streamID string, fromVersion int) ([]eskit.Event[E], error)
- func (s *Store[E]) LoadRaw(ctx context.Context, streamID string) ([]*eskit.RawEvent, error)
- func (s *Store[E]) LoadRawWithOptions(ctx context.Context, streamID string, opts eskit.LoadOptions) ([]*eskit.RawEvent, error)
- func (s *Store[E]) LoadWithOptions(ctx context.Context, streamID string, opts eskit.LoadOptions) ([]eskit.Event[E], error)
- func (s *Store[E]) ReadFrom(ctx context.Context, fromSequence uint64, limit int) ([]eskit.Event[E], error)
- func (s *Store[E]) ReadFromWithOptions(ctx context.Context, fromSequence uint64, limit int, opts eskit.LoadOptions) ([]eskit.Event[E], error)
- func (s *Store[E]) Restore(ctx context.Context, streamID string, source eskit.EventStore[E]) error
- func (s *Store[E]) RestoreStream(ctx context.Context, streamID string) error
- func (s *Store[E]) StreamStatus(ctx context.Context, streamID string) (eskit.StreamState, error)
- func (s *Store[E]) Tombstone(ctx context.Context, streamID string, reason string) error
- func (s *Store[E]) TombstoneStream(ctx context.Context, streamID string) error
- func (s *Store[E]) Truncate(ctx context.Context) error
- type SubscriptionAdapter
Constants ¶
const (
	// DefaultPollInterval is the default polling interval for data_version changes.
	DefaultPollInterval = 50 * time.Millisecond
)
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type CheckpointStore ¶
type CheckpointStore struct {
// contains filtered or unexported fields
}
CheckpointStore is a durable, SQLite-backed checkpoint for event subscriptions. It tracks each consumer's last processed global sequence number, enabling crash recovery without reprocessing the entire event stream.
Thread-safe: all methods are safe for concurrent use (SQLite serializes writes).
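The crash-recovery behaviour described above can be illustrated with a minimal sketch. CheckpointStore's methods are not listed in this documentation, so the example uses a hypothetical in-memory map (memCheckpoint) and a hypothetical process function in place of the real API; only the resume-after-last-sequence idea is taken from the description.

```go
package main

import "fmt"

// memCheckpoint is a hypothetical in-memory stand-in for a durable
// checkpoint table: consumer name -> last processed global sequence.
type memCheckpoint map[string]uint64

// process handles every event after the consumer's checkpoint, records
// progress after each one, and returns the sequences it handled.
func process(cp memCheckpoint, consumer string, log []uint64) []uint64 {
	var handled []uint64
	last := cp[consumer] // 0 means nothing has been processed yet
	for _, seq := range log {
		if seq <= last {
			continue // already processed before the restart
		}
		handled = append(handled, seq)
		cp[consumer] = seq
	}
	return handled
}

func main() {
	cp := memCheckpoint{}
	fmt.Println(process(cp, "emailer", []uint64{1, 2, 3}))       // [1 2 3]
	fmt.Println(process(cp, "emailer", []uint64{1, 2, 3, 4, 5})) // [4 5]
}
```

The second call stands for a restarted consumer: it skips sequences 1 to 3 because the checkpoint already covers them.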
func NewCheckpointStore ¶
func NewCheckpointStore(db *sql.DB) (*CheckpointStore, error)
NewCheckpointStore creates a SQLite-backed checkpoint store. Creates the checkpoints table if it doesn't exist.
type GlobalEvent ¶
type GlobalEvent[E any] struct {
	// GlobalSequence is the store-wide monotonic position.
	GlobalSequence uint64
	// StreamID identifies which stream this event belongs to.
	StreamID string
	// EventType is the string name of the event (e.g., "OrderCreated").
	EventType string
	// Version is the per-stream version number.
	Version int
	// Data is the domain event payload.
	Data E
	// Timestamp is when the event was stored.
	Timestamp time.Time
}
GlobalEvent is a store-wide event with a global sequence number. Compatible with subscription.GlobalEvent but decoupled to avoid import cycles.
type GlobalReaderOption ¶
type GlobalReaderOption[E any] func(*SQLiteGlobalReader[E])
GlobalReaderOption configures a SQLiteGlobalReader.
func WithGlobalReaderCodecRegistry ¶
func WithGlobalReaderCodecRegistry[E any](reg *codec.Registry) GlobalReaderOption[E]
WithGlobalReaderCodecRegistry sets the codec registry for reading events with different codecs.
func WithGlobalReaderRegistry ¶
func WithGlobalReaderRegistry[E any](reg *eskit.EventRegistry) GlobalReaderOption[E]
WithGlobalReaderRegistry enables type registry for heterogeneous global reads.
type NotifierOption ¶
type NotifierOption func(*SQLiteNotifier)
NotifierOption configures an SQLiteNotifier.
func WithPollInterval ¶
func WithPollInterval(d time.Duration) NotifierOption
WithPollInterval sets the polling interval for data_version checks. Only relevant when Start() is called for polling mode.
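NotifierOption follows the functional-options pattern. The sketch below shows how such an option overrides a default; poller, pollerOption, withInterval and newPoller are hypothetical names for illustration, and only the 50ms default is taken from DefaultPollInterval.

```go
package main

import (
	"fmt"
	"time"
)

// defaultPollInterval mirrors the documented DefaultPollInterval.
const defaultPollInterval = 50 * time.Millisecond

// poller is a hypothetical stand-in for SQLiteNotifier.
type poller struct {
	interval time.Duration
}

// pollerOption plays the role of NotifierOption.
type pollerOption func(*poller)

// withInterval plays the role of WithPollInterval.
func withInterval(d time.Duration) pollerOption {
	return func(p *poller) { p.interval = d }
}

// newPoller applies the defaults first, then each option in order.
func newPoller(opts ...pollerOption) *poller {
	p := &poller{interval: defaultPollInterval}
	for _, opt := range opts {
		opt(p)
	}
	return p
}

func main() {
	fmt.Println(newPoller().interval)                                    // 50ms
	fmt.Println(newPoller(withInterval(200 * time.Millisecond)).interval) // 200ms
}
```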
type Option ¶
Option configures a SQLite store.
func WithCodec ¶
func WithCodec[E any](c codec.Codec) Option[E]
WithCodec sets a custom codec for event serialization and registers it for reads. By default, events are serialized as JSON using encoding/json. Use this to plug in CBOR, msgpack, or any custom format. For multi-codec migration, use WithWriteCodec and WithCodecRegistry instead.
func WithCodecRegistry ¶
func WithCodecRegistry[E any](r *codec.Registry) Option[E]
WithCodecRegistry sets the registry used to look up codecs when reading events. Each event stores which codec was used to serialize it; the registry maps codec names back to implementations for deserialization. If not set, a default registry with JSON, JSONiter, and CBOR is used.
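The name-to-codec lookup described above might look roughly like the following. This is a sketch, not the codec package's implementation: decodeFunc, registry and stored are hypothetical types, and only the standard library's JSON decoder is registered.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// decodeFunc is a hypothetical codec reduced to its decode half.
type decodeFunc func(data []byte, v any) error

// registry maps the codec name stored with each event to a decoder.
type registry map[string]decodeFunc

// stored is a hypothetical persisted row: the payload plus the name of
// the codec that wrote it.
type stored struct {
	codec string
	data  []byte
}

// decode looks up the decoder by name and fails if none is registered.
func (r registry) decode(s stored, v any) error {
	dec, ok := r[s.codec]
	if !ok {
		return fmt.Errorf("no codec registered for %q", s.codec)
	}
	return dec(s.data, v)
}

func main() {
	reg := registry{"json": json.Unmarshal}

	var order struct{ ID string }
	err := reg.decode(stored{codec: "json", data: []byte(`{"ID":"o-1"}`)}, &order)
	fmt.Println(order.ID, err) // o-1 <nil>

	// An event written with a codec that is not registered cannot be read.
	fmt.Println(reg.decode(stored{codec: "cbor"}, &order))
}
```

Keeping the codec name next to each payload is what lets old and new formats coexist in one store during a migration.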
func WithRegistry ¶
func WithRegistry[E any](reg *eskit.EventRegistry) Option[E]
WithRegistry enables type registry for heterogeneous event deserialization.
func WithStoreNotifier ¶
func WithStoreNotifier[E any](n *SQLiteNotifier) Option[E]
WithStoreNotifier wires up an SQLiteNotifier to signal after each Append. This provides instant notification without polling.
func WithUpcasters ¶
func WithUpcasters[E any](u *eskit.UpcasterRegistry) Option[E]
WithUpcasters enables event upcasting for schema evolution during Load.
type SQLiteChangeRelay ¶
type SQLiteChangeRelay struct {
// contains filtered or unexported fields
}
SQLiteChangeRelay broadcasts projection changes across server instances using a shared SQLite table. Unlike PostgreSQL's LISTEN/NOTIFY, SQLite has no built-in pub/sub mechanism, so this relay polls a shared eskit_changes table for new entries.
Flow:
- Server A processes event → OnChange fires → Broadcast() inserts row + notifies locally
- Server B polls the table → sees new row from A → forwards to its local ChangeNotifier
- SSE handlers on all servers get notified
Safe for concurrent use.
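The flow above can be sketched with an in-memory slice standing in for the shared table. Everything here is hypothetical (row, table, relay, and in particular the origin field used to skip a relay's own rows); the documentation does not say how the real relay tells its own rows apart.

```go
package main

import "fmt"

// row is one entry in a hypothetical shared changes table.
type row struct {
	id     int
	origin string // assumption: each row records which instance wrote it
	change string
}

// table stands in for the shared SQLite table.
type table struct {
	rows []row
}

// relay is a hypothetical stand-in for SQLiteChangeRelay.
type relay struct {
	name     string
	shared   *table
	cursor   int      // highest row id already seen
	received []string // what the local notifier has been told
}

// broadcast inserts a row for other instances and notifies locally at once.
func (r *relay) broadcast(change string) {
	id := len(r.shared.rows) + 1
	r.shared.rows = append(r.shared.rows, row{id: id, origin: r.name, change: change})
	r.received = append(r.received, change)
}

// poll forwards rows written by other instances since the last poll.
func (r *relay) poll() {
	for _, rw := range r.shared.rows {
		if rw.id <= r.cursor {
			continue
		}
		r.cursor = rw.id
		if rw.origin != r.name { // own rows were already delivered locally
			r.received = append(r.received, rw.change)
		}
	}
}

func main() {
	shared := &table{}
	a := &relay{name: "A", shared: shared}
	b := &relay{name: "B", shared: shared}

	a.broadcast("orders") // server A processes an event
	b.poll()              // server B sees the new row
	a.poll()              // A skips its own row

	fmt.Println(a.received, b.received) // [orders] [orders]
}
```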
func NewSQLiteChangeRelay ¶
func NewSQLiteChangeRelay(db *sql.DB, notifier *eskit.ChangeNotifier, opts ...SQLiteChangeRelayOption) *SQLiteChangeRelay
NewSQLiteChangeRelay creates a relay that bridges ChangeNotifier across instances using a shared SQLite changes table.
The notifier receives all changes detected by this relay (from other instances). Call Start() to begin polling.
func (*SQLiteChangeRelay) Broadcast ¶
func (r *SQLiteChangeRelay) Broadcast(change eskit.Change)
Broadcast sends a change notification to all instances by inserting a row into the eskit_changes table. Also notifies the local ChangeNotifier directly for immediate delivery to SSE handlers on this instance.
The INSERT is best-effort: errors are silently ignored since SSE handlers will catch up on the next successful notification.
func (*SQLiteChangeRelay) Close ¶
func (r *SQLiteChangeRelay) Close() error
Close stops the relay. Safe to call multiple times.
func (*SQLiteChangeRelay) Start ¶
func (r *SQLiteChangeRelay) Start(ctx context.Context)
Start begins polling for changes from other instances and forwarding them to the local ChangeNotifier. Blocks until ctx is cancelled or Close is called. Auto-creates the eskit_changes table if it does not exist.
func (*SQLiteChangeRelay) Wait ¶
func (r *SQLiteChangeRelay) Wait()
Wait blocks until the relay's poll loop has fully stopped.
type SQLiteChangeRelayOption ¶
type SQLiteChangeRelayOption func(*SQLiteChangeRelay)
SQLiteChangeRelayOption configures a SQLiteChangeRelay.
func WithRelayPollInterval ¶
func WithRelayPollInterval(d time.Duration) SQLiteChangeRelayOption
WithRelayPollInterval sets the interval between polls for new changes from other instances. Default: 100ms.
func WithRelayPruneInterval ¶
func WithRelayPruneInterval(d time.Duration) SQLiteChangeRelayOption
WithRelayPruneInterval sets the interval between prune operations that remove old change entries. Default: 60s.
func WithRelayRetention ¶
func WithRelayRetention(n int) SQLiteChangeRelayOption
WithRelayRetention sets the maximum number of change entries to retain in the table. Older entries are pruned periodically to prevent unbounded growth. Default: 1000.
type SQLiteGlobalReader ¶
type SQLiteGlobalReader[E any] struct {
	// contains filtered or unexported fields
}
SQLiteGlobalReader implements subscription.GlobalReader for SQLite. It reads events by global sequence (the events.id column).
func NewSQLiteGlobalReader ¶
func NewSQLiteGlobalReader[E any](db *sql.DB, opts ...GlobalReaderOption[E]) *SQLiteGlobalReader[E]
NewSQLiteGlobalReader creates a global reader backed by a sql.DB.
func (*SQLiteGlobalReader[E]) LatestSequence ¶
func (r *SQLiteGlobalReader[E]) LatestSequence(ctx context.Context) (uint64, error)
LatestSequence returns the highest global sequence in the store, or 0 if empty.
func (*SQLiteGlobalReader[E]) ReadFrom ¶
func (r *SQLiteGlobalReader[E]) ReadFrom(ctx context.Context, fromSequence uint64, limit int) ([]GlobalEvent[E], error)
ReadFrom returns events starting from the given global sequence (inclusive), up to limit.
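Because fromSequence is inclusive, a caller paging through the store has to resume one past the last sequence it read. The sketch below shows that loop against an in-memory log; event, readFrom and drain are hypothetical helpers that mimic the documented contract, not the reader's implementation.

```go
package main

import "fmt"

// event is a hypothetical, trimmed-down GlobalEvent.
type event struct {
	GlobalSequence uint64
}

// readFrom mimics the documented contract on an in-memory log:
// events with sequence >= from, at most limit of them.
func readFrom(log []event, from uint64, limit int) []event {
	var out []event
	for _, e := range log {
		if e.GlobalSequence >= from && len(out) < limit {
			out = append(out, e)
		}
	}
	return out
}

// drain pages through the log and returns every sequence it saw.
func drain(log []event, pageSize int) []uint64 {
	var seen []uint64
	from := uint64(0)
	for {
		page := readFrom(log, from, pageSize)
		if len(page) == 0 {
			return seen
		}
		for _, e := range page {
			seen = append(seen, e.GlobalSequence)
		}
		// from is inclusive, so resume one past the last event read.
		from = page[len(page)-1].GlobalSequence + 1
	}
}

func main() {
	// Gaps in the sequence are fine; the loop only relies on ordering.
	log := []event{{1}, {2}, {4}, {5}, {7}}
	fmt.Println(drain(log, 2)) // [1 2 4 5 7]
}
```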
type SQLiteNotifier ¶
type SQLiteNotifier struct {
// contains filtered or unexported fields
}
SQLiteNotifier detects database changes via callback or PRAGMA data_version polling. Safe for concurrent use.
func NewNotifier ¶
func NewNotifier(db *sql.DB, opts ...NotifierOption) *SQLiteNotifier
NewNotifier creates an SQLiteNotifier. If db is non-nil, Start() enables PRAGMA data_version polling for external writes. Signal() can always be called directly for instant notification.
func (*SQLiteNotifier) Close ¶
func (n *SQLiteNotifier) Close() error
Close stops the notifier and closes all listener channels.
func (*SQLiteNotifier) Notify ¶
func (n *SQLiteNotifier) Notify(ctx context.Context) <-chan uint64
Notify returns a channel that receives the latest global sequence when new events are appended. Implements subscription.StoreNotifier.
func (*SQLiteNotifier) Signal ¶
func (n *SQLiteNotifier) Signal(sequence uint64)
Signal notifies all listeners that new events are available at the given sequence. Call this after appending events. Non-blocking.
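One common way to make such a signal non-blocking is a one-slot buffered channel per listener that always holds the latest value. The sketch below assumes that design; notifier, listen and signal are hypothetical names, and the real SQLiteNotifier may work differently.

```go
package main

import (
	"fmt"
	"sync"
)

// notifier is a hypothetical stand-in for SQLiteNotifier.
type notifier struct {
	mu        sync.Mutex
	listeners []chan uint64
}

// listen registers a listener with a one-slot buffer.
func (n *notifier) listen() <-chan uint64 {
	ch := make(chan uint64, 1)
	n.mu.Lock()
	n.listeners = append(n.listeners, ch)
	n.mu.Unlock()
	return ch
}

// signal delivers seq to every listener without blocking. If a listener
// still holds an unread value, that stale value is replaced so the
// listener sees the latest sequence when it next reads.
func (n *notifier) signal(seq uint64) {
	n.mu.Lock()
	defer n.mu.Unlock()
	for _, ch := range n.listeners {
		select {
		case ch <- seq:
		default:
			// Buffer full: drop the stale value, then store the new one.
			select {
			case <-ch:
			default:
			}
			select {
			case ch <- seq:
			default:
			}
		}
	}
}

func main() {
	n := &notifier{}
	ch := n.listen()
	n.signal(1)
	n.signal(2)
	n.signal(3) // none of these block, even though nobody is reading yet
	fmt.Println(<-ch) // 3
}
```

Coalescing to the latest sequence is safe here because a subscriber only needs to know how far the store has advanced, not every intermediate value.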
func (*SQLiteNotifier) Start ¶
func (n *SQLiteNotifier) Start(ctx context.Context)
Start begins polling PRAGMA data_version for external database changes. Blocks until ctx is cancelled or Close is called. Optional: not needed if all writes go through a store configured with WithStoreNotifier.
func (*SQLiteNotifier) Wait ¶
func (n *SQLiteNotifier) Wait()
Wait blocks until the notifier has fully stopped.
type SnapshotStore ¶
type SnapshotStore[S any] struct {
	// contains filtered or unexported fields
}
SnapshotStore is a SQLite-backed eskit.SnapshotStore implementation. Stores snapshots as JSON with schema versioning and timestamps.
func NewSnapshotStore ¶
func NewSnapshotStore[S any](db *sql.DB) (*SnapshotStore[S], error)
NewSnapshotStore creates a SQLite-backed snapshot store. Automatically creates the snapshots table with schema_version and created_at columns.
func (*SnapshotStore[S]) Invalidate ¶
func (s *SnapshotStore[S]) Invalidate(ctx context.Context, streamID string) error
Invalidate deletes the snapshot for a single stream.
func (*SnapshotStore[S]) InvalidateAll ¶
func (s *SnapshotStore[S]) InvalidateAll(ctx context.Context) error
InvalidateAll deletes all snapshots.
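func (*SnapshotStore[S]) LoadSnapshot ¶
func (s *SnapshotStore[S]) LoadSnapshot(ctx context.Context, streamID string) (*eskit.Snapshot[S], error)
func (*SnapshotStore[S]) SaveSnapshot ¶
func (s *SnapshotStore[S]) SaveSnapshot(ctx context.Context, snapshot eskit.Snapshot[S]) error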
type Store ¶
type Store[E any] struct {
	// contains filtered or unexported fields
}
Store is a SQLite-backed event store. Events are serialized as JSON by default; see WithCodec for other formats.
func New ¶
New creates a new SQLite event store. The dsn is a SQLite connection string (e.g., "file:events.db" or ":memory:" for testing).
func (*Store[E]) AppendWithOptions ¶
func (s *Store[E]) AppendWithOptions(ctx context.Context, streamID string, expectedVersion int, events []E, opts eskit.AppendOptions) ([]eskit.Event[E], error)
AppendWithOptions persists events with idempotency and custom timestamp support.
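func (*Store[E]) ArchiveStream ¶
func (s *Store[E]) ArchiveStream(ctx context.Context, streamID string) error
ArchiveStream marks a stream as archived. Future appends are rejected.
func (*Store[E]) Delete ¶
func (s *Store[E]) Delete(ctx context.Context, streamID string) error
Delete permanently removes all events for a stream. Returns ErrStreamNotFound if the stream does not exist. Also removes associated snapshots and tombstones.
func (*Store[E]) DeleteStream ¶
func (s *Store[E]) DeleteStream(ctx context.Context, streamID string) error
DeleteStream permanently removes all events in a stream.
func (*Store[E]) IsTombstoned ¶
func (s *Store[E]) IsTombstoned(ctx context.Context, streamID string) (*eskit.Tombstone, error)
IsTombstoned checks whether a stream has been tombstoned. Returns nil, nil if the stream is not tombstoned.
func (*Store[E]) LatestSequence ¶
func (s *Store[E]) LatestSequence(ctx context.Context) (uint64, error)
LatestSequence returns the highest global sequence (rowid) in the store, or 0 if empty.
func (*Store[E]) LoadRaw ¶
func (s *Store[E]) LoadRaw(ctx context.Context, streamID string) ([]*eskit.RawEvent, error)
LoadRaw loads events without deserializing the Data field. Returns RawEvent values that can be selectively decoded on demand. This is significantly faster when you only need metadata or a subset of events.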
func (*Store[E]) LoadRawWithOptions ¶
func (s *Store[E]) LoadRawWithOptions(ctx context.Context, streamID string, opts eskit.LoadOptions) ([]*eskit.RawEvent, error)
LoadRawWithOptions loads raw events with optional filtering.
func (*Store[E]) LoadWithOptions ¶
func (s *Store[E]) LoadWithOptions(ctx context.Context, streamID string, opts eskit.LoadOptions) ([]eskit.Event[E], error)
LoadWithOptions loads events with server-side filtering (event types, version range, limit).
func (*Store[E]) ReadFrom ¶
func (s *Store[E]) ReadFrom(ctx context.Context, fromSequence uint64, limit int) ([]eskit.Event[E], error)
ReadFrom implements GlobalReader — reads events across all streams by global sequence. Uses SQLite's rowid (aliased as id via INTEGER PRIMARY KEY AUTOINCREMENT) as global sequence.
func (*Store[E]) ReadFromWithOptions ¶
func (s *Store[E]) ReadFromWithOptions(ctx context.Context, fromSequence uint64, limit int, opts eskit.LoadOptions) ([]eskit.Event[E], error)
ReadFromWithOptions reads global events with optional event type filtering.
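func (*Store[E]) Restore ¶
func (s *Store[E]) Restore(ctx context.Context, streamID string, source eskit.EventStore[E]) error
Restore moves an archived stream back from the source store to the primary.
func (*Store[E]) RestoreStream ¶
func (s *Store[E]) RestoreStream(ctx context.Context, streamID string) error
RestoreStream brings an archived stream back to active state.
func (*Store[E]) StreamStatus ¶
func (s *Store[E]) StreamStatus(ctx context.Context, streamID string) (eskit.StreamState, error)
StreamStatus returns the current lifecycle state of a stream.
func (*Store[E]) Tombstone ¶
func (s *Store[E]) Tombstone(ctx context.Context, streamID string, reason string) error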
Tombstone marks a stream as deleted. Future Append calls return ErrStreamDeleted. Load returns empty. The tombstone record remains for audit.
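The three documented effects (appends rejected, loads empty, record kept for audit) can be illustrated with a small in-memory model. memStore and its methods are hypothetical, and errStreamDeleted only stands in for ErrStreamDeleted; whether the real store keeps or removes the underlying events is not stated here, so the sketch simply hides them.

```go
package main

import (
	"errors"
	"fmt"
)

// errStreamDeleted stands in for the package's ErrStreamDeleted.
var errStreamDeleted = errors.New("stream deleted")

// memStore is a hypothetical in-memory model of the tombstone rules.
type memStore struct {
	events     map[string][]string
	tombstones map[string]string // streamID -> reason, kept for audit
}

func newMemStore() *memStore {
	return &memStore{
		events:     make(map[string][]string),
		tombstones: make(map[string]string),
	}
}

// appendEvent rejects writes to tombstoned streams.
func (s *memStore) appendEvent(streamID, ev string) error {
	if _, dead := s.tombstones[streamID]; dead {
		return errStreamDeleted
	}
	s.events[streamID] = append(s.events[streamID], ev)
	return nil
}

// load returns nothing for tombstoned streams.
func (s *memStore) load(streamID string) []string {
	if _, dead := s.tombstones[streamID]; dead {
		return nil
	}
	return s.events[streamID]
}

// tombstone records the reason; the record itself is never removed here.
func (s *memStore) tombstone(streamID, reason string) {
	s.tombstones[streamID] = reason
}

func main() {
	s := newMemStore()
	_ = s.appendEvent("order-1", "OrderCreated")
	s.tombstone("order-1", "GDPR erasure request")

	fmt.Println(len(s.load("order-1")))                  // 0
	fmt.Println(s.appendEvent("order-1", "OrderPaid"))   // stream deleted
	fmt.Println(s.tombstones["order-1"])                 // GDPR erasure request
}
```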
func (*Store[E]) TombstoneStream ¶
func (s *Store[E]) TombstoneStream(ctx context.Context, streamID string) error
TombstoneStream marks a stream as deleted. Future appends are rejected.
type SubscriptionAdapter ¶
type SubscriptionAdapter[E any] struct {
	// contains filtered or unexported fields
}
SubscriptionAdapter adapts SQLiteGlobalReader into a subscription.GlobalReader. This bridges the type gap between sqlitestore.GlobalEvent and subscription.GlobalEvent.
func NewSubscriptionAdapter ¶
func NewSubscriptionAdapter[E any](reader *SQLiteGlobalReader[E]) *SubscriptionAdapter[E]
NewSubscriptionAdapter wraps a SQLiteGlobalReader for use with the subscription system.
func (*SubscriptionAdapter[E]) LatestSequence ¶
func (a *SubscriptionAdapter[E]) LatestSequence(ctx context.Context) (uint64, error)
LatestSequence delegates to the underlying reader.
func (*SubscriptionAdapter[E]) ReadFrom ¶
func (a *SubscriptionAdapter[E]) ReadFrom(ctx context.Context, fromSequence uint64, limit int) ([]subscription.GlobalEvent[E], error)
ReadFrom reads events and converts sqlitestore.GlobalEvent to subscription.GlobalEvent.