Strata

Published: Mar 10, 2026 · License: Apache-2.0

Four-tier data library for Go — L1 (memory) → L2 (Redis) → L3 (PostgreSQL) → L4 (distributed ledger/gossip) behind a single API.


Overview

Strata removes the boilerplate of cache management from Go services. You define a schema once and call Get, Set, Delete, or Search. Strata automatically routes reads through L1 → L2 → L3, propagates writes, evicts stale entries, and keeps a cluster of server instances consistent via Redis pub/sub invalidation.

The optional L4 distributed sync layer adds a peer-to-peer gossip network with cryptographically signed, hash-chained records — providing an immutable audit trail, cross-node publishing, quorum-confirmed records, and revocation support, with no centralised coordinator required.

Get(ctx, "players", id, &dest)
  │
  ├─► L1 hit?  → return immediately   (~100 ns)
  ├─► L2 hit?  → populate L1 → return (~500 μs)
  └─► L3 hit?  → populate L2+L1 → return (~5 ms)
               └─► not found → ErrNotFound

L4 (optional, independent of the read path above):
  Publish(appID, nodeID, payload) → gossip to peers → quorum confirm → immutable record
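The read cascade above is plain read-through caching; the routing logic can be sketched without the library (tiers modelled as maps, all names ours):

```go
package main

import (
	"errors"
	"fmt"
)

var ErrNotFound = errors.New("not found")

// tier is the smallest interface a cache level needs for read-through.
type tier interface {
	Get(key string) (string, error)
	Set(key, val string)
}

type mapTier map[string]string

func (m mapTier) Get(key string) (string, error) {
	if v, ok := m[key]; ok {
		return v, nil
	}
	return "", ErrNotFound
}
func (m mapTier) Set(key, val string) { m[key] = val }

// readThrough walks tiers fastest-first; on a hit it backfills every
// faster tier that missed, mirroring the L1 → L2 → L3 cascade.
func readThrough(tiers []tier, key string) (string, error) {
	for i, t := range tiers {
		v, err := t.Get(key)
		if err == nil {
			for j := 0; j < i; j++ { // populate the tiers above the hit
				tiers[j].Set(key, v)
			}
			return v, nil
		}
	}
	return "", ErrNotFound
}

func main() {
	l1, l2, l3 := mapTier{}, mapTier{}, mapTier{"p1": "andrew"}
	v, _ := readThrough([]tier{l1, l2, l3}, "p1")
	fmt.Println(v, l1["p1"], l2["p1"]) // the L3 hit backfills L2 and L1
}
```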

Table of Contents

  1. Installation
  2. Quick Start
  3. Schema Definition
  4. Core Operations
  5. Query Builder
  6. Transactions
  7. Cache Control
  8. Write Modes
  9. Schema Migration
  10. Encryption
  11. Observability
  12. Configuration Reference
  13. Error Reference
  14. Architecture Notes
  15. L4 — Distributed Sync Layer
  16. Vector Search (L3 Semantic Layer)
  17. Contributing

Installation

go get github.com/AndrewDonelson/strata

Requirements: Go 1.21+, PostgreSQL 14+ with pgvector extension (optional — required only for vector search), Redis 6+


Quick Start

package main

import (
    "context"
    "log"
    "time"

    "github.com/AndrewDonelson/strata"
)

type Player struct {
    ID        string    `strata:"primary_key"`
    Username  string    `strata:"unique,index"`
    Email     string    `strata:"index,nullable"`
    Level     int       `strata:"default:1"`
    CreatedAt time.Time `strata:"auto_now_add"`
    UpdatedAt time.Time `strata:"auto_now"`
}

func main() {
    ctx := context.Background()

    // 1. Create the data store
    ds, err := strata.NewDataStore(strata.Config{
        PostgresDSN: "postgres://user:pass@localhost:5432/mydb",
        RedisAddr:   "localhost:6379",
    })
    if err != nil {
        log.Fatal(err)
    }
    defer ds.Close()

    // 2. Register schemas (once, at startup)
    err = ds.Register(strata.Schema{
        Name:  "players",
        Model: &Player{},
        L1:    strata.MemPolicy{TTL: 60 * time.Second, MaxEntries: 50_000},
        L2:    strata.RedisPolicy{TTL: 30 * time.Minute},
        L3:    strata.PostgresPolicy{},
    })
    if err != nil {
        log.Fatal(err)
    }

    // 3. Run migrations
    if err := ds.Migrate(ctx); err != nil {
        log.Fatal(err)
    }

    // 4. Use it
    player := &Player{ID: "p1", Username: "andrew", Level: 1}
    if err := ds.Set(ctx, "players", "p1", player); err != nil {
        log.Fatal(err)
    }

    // Typed retrieval — no type assertion needed
    p, err := strata.GetTyped[Player](ctx, ds, "players", "p1")
    if err != nil {
        log.Fatal(err)
    }
    log.Printf("player: %+v", p)
}

Schema Definition

A Schema binds a Go struct to three cache tiers and optionally to a PostgreSQL table.

type Schema struct {
    Name      string         // collection/table name; derived from struct name if empty
    Model     any            // pointer to a struct
    L1        MemPolicy      // in-memory cache settings
    L2        RedisPolicy    // Redis cache settings
    L3        PostgresPolicy // Postgres persistence settings
    WriteMode WriteMode      // WriteThrough (default), WriteBehind, WriteThroughL1Async
    Indexes   []Index        // additional database indexes
    Hooks     SchemaHooks    // lifecycle callbacks
}

Register schemas at application startup before any data operations:

err := ds.Register(strata.Schema{
    Name:  "sessions",
    Model: &Session{},
    L1:    strata.MemPolicy{TTL: 5 * time.Minute, MaxEntries: 100_000},
    L2:    strata.RedisPolicy{TTL: 4 * time.Hour},
    // No L3 — sessions are ephemeral, Redis is source of truth
})
Struct Tags

Control column behaviour in PostgreSQL and caching behaviour with strata struct tags.

Tag Effect
primary_key marks the identity field used in Get/Set routing (required)
unique adds UNIQUE constraint in Postgres
index creates a non-unique database index
nullable column is NULL-able (default: NOT NULL)
omit_cache field excluded from L1 and L2 — stored in Postgres only
omit_l1 field excluded from L1 only; still cached in L2
vector marks field as a vector(N) column in Postgres; implies omit_cache (never in L1/L2); field type must be pgvector.Vector; dimension is set automatically from Config.EmbeddingProvider.Dimensions()
default:X generates DEFAULT X in the DDL
auto_now_add set to time.Now() on first insert, never updated
auto_now set to time.Now() on every write
encrypted AES-256-GCM encrypted at rest (requires EncryptionKey in Config)
- field is ignored entirely
type User struct {
    ID           string    `strata:"primary_key"`
    Email        string    `strata:"unique,index"`
    PasswordHash string    `strata:"omit_cache"`       // only stored in Postgres
    APIKey       string    `strata:"encrypted"`        // encrypted at rest
    Role         string    `strata:"default:viewer"`
    Notes        string    `strata:"nullable"`
    CreatedAt    time.Time `strata:"auto_now_add"`
    UpdatedAt    time.Time `strata:"auto_now"`
    Internal     string    `strata:"-"`                // not persisted at all
}

Supported Go → Postgres type mappings:

Go type PostgreSQL type
string TEXT
int, int32, int64 BIGINT
float32, float64 DOUBLE PRECISION
bool BOOLEAN
time.Time TIMESTAMPTZ
[]byte BYTEA
pgvector.Vector vector(N) (requires pgvector extension)
struct / map / slice JSONB
Cache Policies
type MemPolicy struct {
    TTL        time.Duration  // 0 = never expire
    MaxEntries int            // 0 = unlimited (per shard — 256 shards total)
    Eviction   EvictionPolicy // EvictLRU (default), EvictLFU, EvictFIFO
}

type RedisPolicy struct {
    TTL       time.Duration  // 0 = never expire
    KeyPrefix string         // optional; defaults to schema name
}

type PostgresPolicy struct {
    TableName   string // optional; defaults to schema name
    ReadReplica string // optional DSN for a read-only replica
    PartitionBy string // optional column for table partitioning
}
Indexes

Extra database indexes are declared alongside the schema:

strata.Schema{
    Name:  "events",
    Model: &Event{},
    Indexes: []strata.Index{
        {Fields: []string{"user_id"}, Name: "idx_events_user"},
        {Fields: []string{"user_id", "created_at"}, Unique: false},
        {Fields: []string{"trace_id"}, Unique: true},
    },
}
Lifecycle Hooks
type SchemaHooks struct {
    BeforeSet    func(ctx context.Context, value any) error
    AfterSet     func(ctx context.Context, value any)
    BeforeGet    func(ctx context.Context, id string)
    AfterGet     func(ctx context.Context, value any)
    OnEvict      func(ctx context.Context, key string, value any)
    OnWriteError func(ctx context.Context, key string, err error)
}
  • BeforeSet — validate or mutate the value before any write; return a non-nil error to abort.
  • AfterSet — post-write notification (e.g. emit a domain event).
  • BeforeGet — log or trace the read.
  • AfterGet — populate computed fields.
  • OnEvict — called when L1 evicts an entry.
  • OnWriteError — called when a write-behind write exhausts its retries.

Core Operations

Get

Reads a record by primary key. Cascade: L1 → L2 → L3. Each cache miss populates the tiers above it.

// Generic form — preferred
p, err := strata.GetTyped[Player](ctx, ds, "players", "p123")
if errors.Is(err, strata.ErrNotFound) {
    // record does not exist
}

// Non-generic form with destination pointer
var p Player
err := ds.Get(ctx, "players", "p123", &p)
Set

Writes a record. The tier order depends on the schema's WriteMode (see Write Modes).

player := &Player{ID: "p1", Username: "andrew", Level: 5}
err := ds.Set(ctx, "players", "p1", player)
SetMany

Writes multiple records in one call using a map[string]any of id → value:

err := ds.SetMany(ctx, "players", map[string]any{
    "p1": &Player{ID: "p1", Username: "a"},
    "p2": &Player{ID: "p2", Username: "b"},
})

Internally SetMany uses a Redis pipeline for L2 and PostgreSQL COPY for L3.

Delete

Removes a record from all three tiers:

err := ds.Delete(ctx, "players", "p1")

Search

Runs a query (see Query Builder) against PostgreSQL (L3) by default. Each matching record is individually populated into L1 and L2 as a side-effect.

// Non-generic form
var results []Player
err := ds.Search(ctx, "players", strata.Q().Where("level > $1", 10).Limit(50).Build().Ptr(), &results)

// Generic form
players, err := strata.SearchTyped[Player](ctx, ds, "players",
    strata.Q().Where("level > $1", 10).OrderBy("created_at").Desc().Limit(50).Build().Ptr())
SearchCached

Caches the entire result set under a composite key derived from the query. Subsequent calls with the same *Query return the cached slice until the L2 TTL expires.

var leaderboard []Player
err := ds.SearchCached(ctx, "players",
    strata.Q().OrderBy("level").Desc().Limit(100).Build().Ptr(),
    &leaderboard)
Exists & Count
ok, err := ds.Exists(ctx, "players", "p1")

n, err := ds.Count(ctx, "players", strata.Q().Where("level >= $1", 50).Build().Ptr())

Count always hits L3.


Query Builder

The fluent Q() builder constructs Query values without struct literals:

q := strata.Q().
    Where("region = $1 AND level > $2", "eu-west", 10).
    OrderBy("score").
    Desc().
    Limit(25).
    Offset(50).
    Fields("id", "username", "score").
    Build()

// Force tiers
strata.Q().Where("id = $1", id).ForceL3().Build() // bypass all caches
strata.Q().Where("id = $1", id).ForceL2().Build() // skip L1 only

Query fields at a glance:

Field Type Description
Where string Parameterised SQL WHERE clause
Args []any Positional arguments for WHERE ($1, $2, …)
OrderBy string Column name to sort by
Desc bool Descending sort
Limit int Max rows (0 = use default: 100)
Offset int Rows to skip
Fields []string Column projection (empty = all)
ForceL3 bool Skip L1 and L2 entirely
ForceL2 bool Skip L1 only

Transactions

Tx queues Set and Delete operations and commits them atomically to L3. Caches (L1 + L2) are updated only after a successful commit.

err := ds.Tx(ctx).
    Set("players", "p1", &Player{ID: "p1", Level: 99}).
    Set("scores", "p1", &Score{PlayerID: "p1", Points: 9999}).
    Delete("sessions", "old-session-id").
    Commit()
if errors.Is(err, strata.ErrTxFailed) {
    // all operations were rolled back
}

Cache Control

Invalidate

Removes a single key from L1 and L2 without touching L3. The next Get will re-fetch from Postgres and repopulate the caches.

err := ds.Invalidate(ctx, "players", "p1")
WarmCache

Pre-loads up to limit records from L3 into L1 and L2. Use at startup to avoid cold-cache spikes.

// load first 10,000 active players
err := ds.WarmCache(ctx, "players", 10_000)
// 0 = load all rows
err = ds.WarmCache(ctx, "config", 0)
FlushDirty

Forces all pending write-behind entries to be written to L3 immediately. Called automatically by Close().

err := ds.FlushDirty(ctx)

Write Modes

Set per-schema or globally via Config.DefaultWriteMode.

Mode L3 L2 L1 Latency Durability
WriteThrough (default) sync sync sync highest maximum — L3 is written before returning
WriteThroughL1Async sync sync lazy medium L3 + L2 durable; L1 populated on next read
WriteBehind async async immediate lowest L1 written first; L3 durable within flush interval
strata.Schema{
    Name:      "leaderboard",
    Model:     &Score{},
    WriteMode: strata.WriteBehind,          // high-frequency score updates
    L1:        strata.MemPolicy{TTL: 10 * time.Second},
    L2:        strata.RedisPolicy{TTL: time.Minute},
    L3:        strata.PostgresPolicy{},
}

Write-behind tuning (Config fields):

Field Default Purpose
WriteBehindFlushInterval 500 ms how often the dirty buffer flushes
WriteBehindFlushThreshold 100 flush immediately when dirty count hits this
WriteBehindMaxRetry 5 max L3 retries before OnWriteError hook fires
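The three knobs combine in Config like this (a config fragment; the values are illustrative, not recommendations):

```go
ds, err := strata.NewDataStore(strata.Config{
    PostgresDSN:               os.Getenv("POSTGRES_DSN"),
    RedisAddr:                 os.Getenv("REDIS_ADDR"),
    DefaultWriteMode:          strata.WriteBehind,
    WriteBehindFlushInterval:  250 * time.Millisecond, // flush twice as often as the default
    WriteBehindFlushThreshold: 500,                    // or as soon as 500 entries are dirty
    WriteBehindMaxRetry:       3,                      // then give up and fire OnWriteError
})
```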

Schema Migration

Strata manages its own DDL. Migrations are additive-only (new tables, new columns, new indexes) and idempotent.

// Apply all pending migrations for all registered schemas
err := ds.Migrate(ctx)

// Apply SQL files from a directory (files must be named NNN_description.sql)
err = ds.MigrateFrom(ctx, "./migrations")

// Inspect migration state
records, err := ds.MigrationStatus(ctx)
for _, r := range records {
    fmt.Printf("%-30s applied: %s\n", r.FileName, r.AppliedAt.Format(time.RFC3339))
}

Migrate is safe to call on every startup — it only acts when something has changed. Migration state is persisted in the _strata_migrations table.

Note: Destructive changes (drop column, rename column, change type) must be handled via manual SQL files in MigrateFrom. Strata will never drop or rename columns automatically.


Encryption

Enable field-level AES-256-GCM encryption for any field tagged encrypted.

// Generate a 32-byte key with crypto/rand and store it in a secrets manager.
key := make([]byte, 32)
if _, err := rand.Read(key); err != nil {
    log.Fatal(err)
}

ds, err := strata.NewDataStore(strata.Config{
    PostgresDSN:   "...",
    RedisAddr:     "...",
    EncryptionKey: key, // enables the built-in AES256GCM encryptor
})

Fields tagged encrypted are:

  • Encrypted with AES-256-GCM (random nonce per write) before being written to Postgres.
  • Decrypted transparently on reads from L3.
  • Cached as plaintext in L1 and L2 (only the L3 copy holds ciphertext), so cached reads stay as fast as possible.

Only string fields support the encrypted tag today.


Observability

Stats
s := ds.Stats()
fmt.Printf("gets=%d  sets=%d  deletes=%d  errors=%d  l1_entries=%d  dirty=%d\n",
    s.Gets, s.Sets, s.Deletes, s.Errors, s.L1Entries, s.DirtyCount)
Field Type Description
Gets int64 Total Get calls
Sets int64 Total Set calls
Deletes int64 Total Delete calls
Errors int64 Total errors
L1Entries int64 Current L1 entry count
DirtyCount int64 Write-behind entries pending flush
Logger

Implement the Logger interface to integrate with any logging library:

type Logger interface {
    Info(msg string, keysAndValues ...any)
    Warn(msg string, keysAndValues ...any)
    Error(msg string, keysAndValues ...any)
    Debug(msg string, keysAndValues ...any)
}

Example — wrap log/slog:

type slogAdapter struct{ l *slog.Logger }

func (a slogAdapter) Info(msg string, kv ...any)  { a.l.Info(msg, kv...) }
func (a slogAdapter) Warn(msg string, kv ...any)  { a.l.Warn(msg, kv...) }
func (a slogAdapter) Error(msg string, kv ...any) { a.l.Error(msg, kv...) }
func (a slogAdapter) Debug(msg string, kv ...any) { a.l.Debug(msg, kv...) }

ds, _ := strata.NewDataStore(strata.Config{
    Logger: slogAdapter{slog.Default()},
})
Metrics

Implement MetricsRecorder (defined in internal/metrics) to emit counters and histograms to Prometheus, Datadog, etc. Pass nil or omit the field for a no-op recorder.

Codec

Swap the serialisation format used for L1 and L2:

import "github.com/AndrewDonelson/strata/internal/codec"

ds, _ := strata.NewDataStore(strata.Config{
    Codec: codec.MsgPack{}, // faster than JSON; default is codec.JSON{}
})

Configuration Reference

type Config struct {
    // ── Connections ──────────────────────────────────────────────────
    PostgresDSN   string   // "postgres://user:pass@host:5432/db?sslmode=disable"
    RedisAddr     string   // "localhost:6379"
    RedisPassword string
    RedisDB       int

    // ── Pool sizes ───────────────────────────────────────────────────
    L1Pool L1PoolConfig // MaxEntries int (per-shard limit, 256 shards); Eviction EvictionPolicy (EvictLRU | EvictLFU | EvictFIFO)
    L2Pool L2PoolConfig // PoolSize int; DialTimeout, ReadTimeout, WriteTimeout time.Duration
    L3Pool L3PoolConfig // MaxConns, MinConns int32; MaxConnLifetime, MaxConnIdleTime time.Duration

    // ── TTL defaults (overridden per schema) ─────────────────────────
    DefaultL1TTL time.Duration // default: 5m
    DefaultL2TTL time.Duration // default: 30m

    // ── Write behaviour ──────────────────────────────────────────────
    DefaultWriteMode          WriteMode     // default: WriteThrough
    WriteBehindFlushInterval  time.Duration // default: 500ms
    WriteBehindFlushThreshold int           // default: 100
    WriteBehindMaxRetry       int           // default: 5

    // ── Invalidation ─────────────────────────────────────────────────
    InvalidationChannel string // Redis pub/sub channel; default: "strata:invalidate"

    // ── Pluggable components ─────────────────────────────────────────
    Codec   codec.Codec           // default: codec.JSON{}
    Metrics metrics.MetricsRecorder // default: no-op
    Logger  Logger                // default: no-op

    // ── Encryption ───────────────────────────────────────────────────
    EncryptionKey []byte // must be exactly 32 bytes; nil = disabled
}

Minimal valid config (only PostgresDSN and RedisAddr are required; all other fields have sensible defaults):

strata.Config{
    PostgresDSN: os.Getenv("POSTGRES_DSN"),
    RedisAddr:   os.Getenv("REDIS_ADDR"),
}

Error Reference

All errors are exported sentinel values compatible with errors.Is:

// Schema
strata.ErrSchemaNotFound    // schema name not registered
strata.ErrSchemaDuplicate   // Register called twice with same name
strata.ErrNoPrimaryKey      // struct has no primary_key tag
strata.ErrInvalidModel      // nil or non-pointer model
strata.ErrMissingPrimaryKey // value passed to Set has empty/zero PK

// Data
strata.ErrNotFound     // record does not exist in any tier
strata.ErrDecodeFailed // codec or encryption decode error
strata.ErrEncodeFailed // codec or encryption encode error

// Infrastructure
strata.ErrL1Unavailable // in-memory store not initialised
strata.ErrL2Unavailable // Redis unavailable
strata.ErrL3Unavailable // Postgres unavailable
strata.ErrUnavailable   // all tiers unavailable

// Transaction
strata.ErrTxFailed  // Commit returned a Postgres error (rolled back)
strata.ErrTxTimeout // transaction deadline exceeded

// Config
strata.ErrInvalidConfig // missing required fields

// Hook
strata.ErrHookPanic // BeforeSet/BeforeGet hook panicked (recovered)

// Write-behind
strata.ErrWriteBehindMaxRetry // dirty entry exceeded max retry count

// Vector Search
strata.ErrNoVectorField            // schema has no strata:"vector" field
strata.ErrPgvectorExtensionMissing // CREATE EXTENSION vector not installed
strata.ErrVectorDimensionMismatch  // provider dimension changed since last migration
strata.ErrInvalidIndexType         // IVFFlat/HNSW index on non-vector field
strata.ErrTopKInvalid              // topK < 1
strata.ErrNoEmbeddingProvider      // Config.EmbeddingProvider is nil
strata.ErrEmbeddingModelChanged    // provider model ID changed since last migration
strata.ErrReEmbedAlreadyRunning    // another ReEmbed is already in progress
strata.ErrReEmbedTextFieldMissing  // textFieldName not found in model struct
strata.ErrInvalidTagForType        // strata:"vector" on a non-pgvector.Vector field
strata.ErrEmptyVectorQuery         // blank query string passed to VectorSearch
strata.ErrNilContext               // nil context passed to VectorSearch or ReEmbed

Architecture Notes

L1 — Sharded In-Memory Store

L1 uses 256 independent shards (FNV-32a hash → shard index) each protected by its own sync.RWMutex. This eliminates global lock contention under high concurrency. Eviction (LRU, LFU, or FIFO) runs per-shard. TTL expiry is checked lazily on read plus a background sweep every 30 seconds.

MaxEntries in MemPolicy is the limit per shard. For a total limit of ~50 000, set MaxEntries: 200.
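The shard routing can be sketched with the standard library (only FNV-32a is stated above; the modulo reduction is our assumption):

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// shardIndex hashes a key with FNV-32a and reduces it to one of the
// 256 independent shards. (Sketch; the real reduction may differ.)
func shardIndex(key string) int {
	h := fnv.New32a()
	h.Write([]byte(key))
	return int(h.Sum32() % 256)
}

func main() {
	fmt.Println(shardIndex("players:p1"), shardIndex("players:p2"))
}
```

With 256 shards, MaxEntries: 200 yields 256 × 200 = 51 200 entries in total.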

L2 — Redis

Strata accepts any redis.UniversalClient (standalone, Sentinel, or Cluster). Keys follow the format strata:{schema}:{id}. Batch operations use a Redis pipeline for single round-trip performance.

L3 — PostgreSQL

Strata uses pgxpool for connection pooling. SetMany uses the PostgreSQL COPY protocol for bulk inserts. Upsert is INSERT … ON CONFLICT DO UPDATE. Read replica connections (PostgresPolicy.ReadReplica) are used for Search and Count queries.

When pgvector is installed, L3 also stores vector(N) columns and executes ANN similarity queries via VectorSearch. Vector fields are never held in L1 or L2 — they are L3-only by design.

L4 — Distributed Gossip Ledger

L4 is integrated into the Strata write path, not a standalone module. When Config.L4.Enabled = true and a schema opts in via L4.Enabled: true, every confirmed L3 write automatically triggers an L4 publish — the calling code never needs to know.

Each node maintains its own local copy of all records it receives; there is no leader and no central store. Records form a per-AppID hash chain: each record's Hash is computed over prevHash|appID|uuid|payload|timestamp using SHA-256 and signed with Ed25519 (NodeSig). Quorum confirmation advances a record from pending to confirmed; a revocation tombstone is gossiped immediately to all connected peers.

In peer mode, records reside in an in-memory store and are lost on restart. In ledger mode, records are persisted to a per-node BoltDB file (DataDir/l4.db). The optional APIServer exposes query access over HTTP.

Cross-Instance Invalidation

Every write publishes a JSON message to the strata:invalidate Redis channel:

{"schema": "players", "id": "p1", "op": "set"}

Every running instance (including the writer) subscribes to this channel and removes the affected L1 entry on receipt. This keeps the L1 caches of all servers in a cluster consistent within ~50 ms of any write.


L4 — Distributed Sync Layer

L4 is an optional, integrated peer-to-peer sync layer built into the Strata write path. When enabled at both the Config level and the Schema level, every successful L3 (PostgreSQL) write is automatically published to a distributed, Ed25519-signed, hash-chained ledger — no extra code required.

The write flow with L4 enabled:

Set(ctx, schema, id, value)
  │
  ├─► L3 write (PostgreSQL)  ← confirmed first
  ├─► L4 Publish             ← automatic after L3 success
  ├─► L2 write (Redis)
  └─► L1 write (in-memory)

For WriteBehind schemas, L4 is synced after the dirty-queue flush confirms the L3 write — never before.
For WriteThrough and WriteThroughL1Async, L4 is synced synchronously in the same Set call.

L4 is disabled by default. Schemas that don't set L4.Enabled: true are completely unaffected.

Quick Start
1. Enable L4 globally in Config
ds, err := strata.NewDataStore(strata.Config{
    PostgresDSN: "...",
    RedisAddr:   "...",
    L4: strata.L4Config{
        Enabled:        true,
        Mode:           "ledger",       // "peer" (in-memory) or "ledger" (BoltDB-backed)
        Port:           7743,
        DataDir:        "/var/lib/myapp/l4",
        Quorum:         3,
        BootstrapPeers: []string{"10.0.0.2:7743", "10.0.0.3:7743"},
    },
})
2. Opt schemas into L4 sync
err = ds.Register(strata.Schema{
    Name:  "leaderboard",
    Model: &LeaderboardEntry{},
    L4: strata.L4Policy{
        Enabled:     true,
        AppID:       "bounty-hunters",  // L4 namespace; defaults to schema name
        SyncDeletes: true,              // Delete() → L4 Revoke()
    },
})
3. Just use Strata normally — L4 syncs automatically
// This upserts to PostgreSQL AND publishes to the L4 distributed ledger.
err = ds.Set(ctx, "leaderboard", entry.Callsign, &LeaderboardEntry{
    Callsign: "vox",
    XP:       14_500,
    Rank:     3,
    Bounty:   2_850,
    Credits:  73_200,
})

// Delete → PostgreSQL delete + L4 revocation (because SyncDeletes: true)
err = ds.Delete(ctx, "leaderboard", "vox")
Example — Subspace Bounty Hunter Leaderboard

A public ledger of player stats that any node in the network can query and verify:

type LeaderboardEntry struct {
    Callsign string `strata:"primary_key"`
    XP       int64
    Rank     int
    Bounty   int64
    Credits  int64
    UpdatedAt time.Time `strata:"auto_now"`
}

// Config
ds, _ := strata.NewDataStore(strata.Config{
    PostgresDSN: os.Getenv("PG_DSN"),
    RedisAddr:   "localhost:6379",
    L4: strata.L4Config{
        Enabled: true,
        Mode:    "ledger",
        Port:    7743,
        Quorum:  2,
    },
})

// Schema — opt into L4
_ = ds.Register(strata.Schema{
    Name:  "leaderboard",
    Model: &LeaderboardEntry{},
    L4:    strata.L4Policy{Enabled: true, AppID: "bounty-hunters"},
})
_ = ds.Migrate(ctx)

// Update a player — L4 automatically gets a cryptographically signed, hash-chained record
_ = ds.Set(ctx, "leaderboard", "vox", &LeaderboardEntry{
    Callsign: "vox",
    XP:       14_500,
    Rank:     3,
    Bounty:   2_850,
    Credits:  73_200,
})

Any network peer can now verify the full history of every leaderboard change — even offline nodes that rejoin later will sync the ledger automatically.

L4Config — global layer configuration
type L4Config struct {
    Enabled        bool          // false = L4 is entirely inactive (default)
    Mode           string        // "peer" (in-memory) or "ledger" (BoltDB-backed)
    Port           int           // TCP listen port; default 7743
    DataDir        string        // BoltDB data directory for ledger mode; default "/var/lib/strata/l4"
    SyncInterval   time.Duration // gossip sync frequency; default 30s
    MaxPeers       int           // max simultaneous peer connections; default 50
    Quorum         int           // confirmations needed for pending → confirmed; default 3
    BootstrapPeers []string      // "host:port" peer addresses to dial on startup
    DNSSeed        string        // DNS seed hostname for peer discovery
    NodeKeyPath    string        // path to load/persist the Ed25519 node private key (optional)
}
L4Policy — per-schema opt-in
type L4Policy struct {
    Enabled     bool   // false = no L4 sync for this schema (default)
    AppID       string // L4 namespace the records are published under; defaults to schema Name
    SyncDeletes bool   // if true, Delete() issues an L4 Revoke; if false, L4 records persist
}

Schemas with L4.Enabled = false (the default) are completely unaffected by the global L4Config.

Accessing L4 Records Directly

You can query the L4 layer independently at any time — useful for building audit UIs, ledger explorers, or cross-node sync checks.

Note: Direct access imports github.com/AndrewDonelson/strata/internal/l4; under Go's internal-package rule, that import compiles only for code living inside the strata module itself.

import "github.com/AndrewDonelson/strata/internal/l4"

// Get the running layer (the DataStore wires this internally)
// For external queries, construct your own read-only layer pointed at the same DataDir:
store, _ := l4.NewBoltStore("/var/lib/myapp/l4")
defer store.Close()

// Retrieve the last 50 records for the bounty-hunters app
records, _ := store.Latest("bounty-hunters", 50)
for _, r := range records {
    fmt.Printf("%s  rank=%v  credits=%v  confirmed=%v\n",
        r.UUID, r.Payload["rank"], r.Payload["credits"], r.Confirmed)
}

// Subscribe to live record events (via a full l4.Layer):
layer.Subscribe("bounty-hunters", func(rec l4.L4Record) {
    fmt.Printf("new record: %s  status=%s\n", rec.UUID, rec.Status)
})
Record Lifecycle
Set(ctx, schema, id, value)
  └─► L3 write succeeds
        └─► L4 Publish(appID, nodeID, payload)
              └─► status = "pending"
              └─► gossips to peers
                    └─► Quorum confirmations → status = "confirmed"

Delete(ctx, schema, id)  [when SyncDeletes = true]
  └─► L3 delete succeeds
        └─► L4 Revoke(appID, id)
              └─► status = "revoked"
              └─► gossips to peers

Record status constants:

l4.StatusPending   = "pending"   // published, awaiting quorum
l4.StatusConfirmed = "confirmed" // quorum reached
l4.StatusRevoked   = "revoked"   // record revoked (schema Delete with SyncDeletes)
HTTP API Server (ledger mode)

An optional HTTP server can be started alongside the ledger to serve external queries. Build it with the running l4.Layer:

import "github.com/AndrewDonelson/strata/internal/l4"

// nodeID and layer come from the running L4 node (construction elided).
store, _ := l4.NewBoltStore("/var/lib/myapp/l4")
transport := l4.NewTCPTransport(nodeID, 50, nil)
api := l4.NewAPIServer(layer, store, transport)
go api.Listen(":7744")
// Graceful shutdown:
_ = api.Shutdown(context.Background())
Method Path Returns
GET /query/{uuid}?app_id=<appID> L4Record JSON (200) or 404
GET /peers []L4Peer JSON
POST /sync {"height": N}

Rate limited to 100 req/min per IP. Access-Control-Allow-Origin: * on all responses.

L4 Errors
l4.ErrL4Disabled        // Config.L4.Enabled = false; all l4 ops return this
l4.ErrInvalidL4Mode     // Mode must be "peer" or "ledger"
l4.ErrInvalidQuorum     // Quorum < 1
l4.ErrAlreadyPublished  // record already exists for this UUID+AppID
l4.ErrAlreadyRevoked    // record is already revoked
l4.ErrNotFound          // no record found
l4.ErrInvalidSignature  // Ed25519 signature check failed
l4.ErrChainBreak        // hash chain integrity violated
l4.ErrNoPeers           // no connected peers
l4.ErrQuorumNotMet      // record is still pending (not enough confirmations)
l4.ErrStoreUnavailable  // store unavailable in peer mode

L4 sync errors inside Set/Delete are logged and swallowed — they never cause the Strata operation to fail. The source of truth is always L3.

Testing with L4
// Unit tests — disable L4 globally (zero overhead)
ds, _ := strata.NewDataStore(strata.Config{
    PostgresDSN: "...",
    // L4 not set → Enabled defaults to false
})

// Integration tests — peer mode with in-memory transport (no ports)
ds, _ := strata.NewDataStore(strata.Config{
    PostgresDSN: testDSN,
    L4: strata.L4Config{
        Enabled: true,
        Mode:    "peer",
        Quorum:  1,
    },
})
_ = ds.Register(strata.Schema{
    Name:  "leaderboard",
    Model: &LeaderboardEntry{},
    L4:    strata.L4Policy{Enabled: true},
})

Vector Search (L3 Semantic Layer)

Strata supports first-class approximate nearest-neighbour (ANN) search via the pgvector Postgres extension. Strata owns the full embedding lifecycle — it calls your provider, stores vectors, indexes them, and detects model changes automatically.

Prerequisites
  1. Install the pgvector extension on your Postgres server:
CREATE EXTENSION IF NOT EXISTS vector;
  2. Add the Go dependency:
go get github.com/pgvector/pgvector-go
  3. Set Config.EmbeddingProvider — no dimension numbers needed in application code.
Embedding Provider
// Built-in: Ollama (local / on-prem)
prov := strata.NewOllamaProvider("http://localhost:11434", "nomic-embed-text")
// Dimension auto-detected from the first Embed() call and cached.

// Built-in: OpenAI-compatible APIs
prov := strata.NewOpenAIProvider(os.Getenv("OPENAI_API_KEY"), "text-embedding-3-small")

Both implement the EmbeddingProvider interface:

type EmbeddingProvider interface {
    Embed(ctx context.Context, text string) (pgvector.Vector, error)
    Dimensions() int   // called once at Register time to size the Postgres column
    ModelID() string   // stored in strata_schema_meta; change triggers ErrEmbeddingModelChanged
}
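For unit tests you usually want Embed without a network round-trip. A deterministic stand-in can be hashed from the text (sketch only; the real interface returns pgvector.Vector, so an adapter would wrap the slice):

```go
package main

import (
	"crypto/sha256"
	"fmt"
)

// fakeEmbed is a deterministic test double for an embedding call: it
// recycles the 32 SHA-256 bytes of the text into a fixed-dimension
// vector with components in [0, 1]. Same text always yields the same vector.
func fakeEmbed(text string, dims int) []float32 {
	sum := sha256.Sum256([]byte(text))
	v := make([]float32, dims)
	for i := range v {
		v[i] = float32(sum[i%len(sum)]) / 255
	}
	return v
}

func main() {
	a := fakeEmbed("How do I get a refund?", 8)
	b := fakeEmbed("How do I get a refund?", 8)
	fmt.Println(len(a), a[0] == b[0]) // same text → same vector
}
```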
Vector Schema
import pgvector "github.com/pgvector/pgvector-go"

type FAQ struct {
    ID         string          `strata:"primary_key"`
    Question   string
    CustomerID string
    Embedding  pgvector.Vector `strata:"vector"` // L3-only; never in L1/L2
}

ds, _ := strata.NewDataStore(strata.Config{
    PostgresDSN:       os.Getenv("POSTGRES_DSN"),
    EmbeddingProvider: strata.NewOllamaProvider("http://localhost:11434", "nomic-embed-text"),
})

ds.Register(strata.Schema{
    Name:  "faq",
    Model: &FAQ{},
    Indexes: []strata.Index{
        {
            Fields:         []string{"embedding"},
            Type:           strata.IndexHNSW,
            M:              16,
            EfConstruction: 64,
            DistanceFunc:   "cosine",
        },
    },
})
ds.Migrate(ctx) // creates vector(768) column + HNSW index; checks extension

Migrate() also creates the internal strata_schema_meta table which tracks the model ID and dimension for each vector schema. If you switch models, Migrate() returns ErrEmbeddingModelChanged and you call ReEmbed explicitly — no silent data corruption.

VectorSearch

Pass a plain text query — Strata embeds it internally:

results, err := ds.VectorSearch(ctx, "faq", "How do I get a refund?", 10,
    map[string]any{"customer_id": "cust-42"},  // optional column filters
)
for _, r := range results {
    faq := r.Value.(*FAQ)
    fmt.Printf("score=%.4f  %s\n", r.Score, faq.Question)
}

SimilarityResult fields:

type SimilarityResult struct {
    ID    string   // primary key of the matching record
    Score float64  // cosine similarity: 1.0 = identical, 0.0 = orthogonal
    Value any      // fully-hydrated model struct (*FAQ in the example above)
}

Results are automatically back-filled into L2 and L1 after the query (vector field stripped before cache storage — by design).

ReEmbed — Model Migration

When you switch to a new embedding model:

// 1. Update Config.EmbeddingProvider to the new model.
// 2. ds.Migrate(ctx) will return ErrEmbeddingModelChanged — this is expected.
// 3. Re-embed all existing records:
err := ds.ReEmbed(ctx, "faq", "Question")  // textFieldName is the struct field to re-embed

ReEmbed is resumable — if interrupted (e.g. context cancel), a subsequent call picks up from where it left off. Progress is tracked in strata_schema_meta.reembed_progress.

Vector Index Types
Constant              Postgres index   Tuning fields
strata.IndexDefault   btree            (none; empty-string constant, sequential scan without an index)
strata.IndexIVFFlat   ivfflat          Lists (default 100)
strata.IndexHNSW      hnsw             M (default 16), EfConstruction (default 64)
strata.IndexTrigram   gin + pg_trgm    (none; trigram text similarity)

Distance functions (DistanceFunc field): "cosine" (default), "l2", "ip" (inner product).

Rule of thumb: HNSW for high-recall production workloads; IVFFlat for faster index build time on large tables (set Lists ≈ sqrt(row_count)).

Vector Errors

See Error Reference for the full list. Key ones:

Error                         When
ErrPgvectorExtensionMissing   CREATE EXTENSION vector has not been run
ErrEmbeddingModelChanged      provider ModelID() differs from the stored value; call ReEmbed
ErrVectorDimensionMismatch    provider Dimensions() changed; update your model or provider
ErrNoEmbeddingProvider        Config.EmbeddingProvider is nil
ErrReEmbedAlreadyRunning      a ReEmbed is already in progress for this schema

Contributing

  1. Fork the repository and create a feature branch.
  2. Write tests first — the TDD plan in STRATA_TDD.md drives all development.
  3. Run go test -race ./... — all tests must pass.
  4. Run go vet ./... — no warnings.
  5. Open a pull request with a clear description.

Running tests:

# Unit tests (no external dependencies)
go test -race ./...

# With verbose output
go test -race -v ./...

# Benchmarks
go test -bench=. -benchmem ./...

Integration tests (require Docker with Postgres and Redis) are tagged integration and not run by default:

STRATA_POSTGRES_DSN="postgres://..." STRATA_REDIS_ADDR="localhost:6379" \
  go test -tags integration -race ./...

Strata — built by Nlaak Studios and released as open-source software.

Documentation

Overview

Package strata provides a three-tier auto-caching data library unifying in-memory (L1), Redis (L2), and PostgreSQL (L3) behind a single API, with an optional L4 distributed sync layer.

Index

Constants

This section is empty.

Variables

var (
	ErrSchemaNotFound    = errors.New("strata: schema not registered")
	ErrSchemaDuplicate   = errors.New("strata: schema already registered")
	ErrNoPrimaryKey      = errors.New("strata: struct has no primary_key field")
	ErrInvalidModel      = errors.New("strata: model must be a non-nil pointer to a struct")
	ErrMissingPrimaryKey = errors.New("strata: value is missing primary key")
)

Schema errors

var (
	ErrNotFound     = errors.New("strata: record not found")
	ErrDecodeFailed = errors.New("strata: failed to decode stored value")
	ErrEncodeFailed = errors.New("strata: failed to encode value for storage")
)

Data errors

var (
	ErrL1Unavailable = errors.New("strata: L1 memory store unavailable")
	ErrL2Unavailable = errors.New("strata: L2 Redis unavailable")
	ErrL3Unavailable = errors.New("strata: L3 Postgres unavailable")
	ErrUnavailable   = errors.New("strata: all tiers unavailable")
)

Infrastructure errors

var (
	ErrTxFailed  = errors.New("strata: transaction failed")
	ErrTxTimeout = errors.New("strata: transaction timeout")
)

Transaction errors

var (
	ErrNoVectorField            = errors.New("strata: schema has no vector field")
	ErrPgvectorExtensionMissing = errors.New("strata: pgvector extension not installed; run: CREATE EXTENSION IF NOT EXISTS vector;")
	ErrVectorDimensionMismatch  = errors.New("strata: vector dimension does not match index")
	ErrInvalidIndexType         = errors.New("strata: invalid index type for field")
	ErrTopKInvalid              = errors.New("strata: topK must be >= 1")
	ErrNoEmbeddingProvider      = errors.New("strata: EmbeddingProvider is required for vector schemas; set Config.EmbeddingProvider to NewOllamaProvider or NewOpenAIProvider")
	ErrEmbeddingModelChanged    = errors.New("strata: embedding model has changed since last migration; call ds.ReEmbed() to migrate")
	ErrReEmbedAlreadyRunning    = errors.New("strata: re-embed migration already in progress")
	ErrReEmbedTextFieldMissing  = errors.New("strata: specified text field not found in schema model")
	ErrInvalidTagForType        = errors.New("strata: strata:\"vector\" tag requires field type pgvector.Vector")
	ErrEmptyVectorQuery         = errors.New("strata: vector query string must not be empty")
	ErrNilContext               = errors.New("strata: context must not be nil")
)

Vector / pgvector errors

var (
	// BuildDate is the date and time the binary was built.
	// Set by: -ldflags "-X 'github.com/AndrewDonelson/strata.BuildDate=2026.02.28-1750'"
	BuildDate = "0000.00.00-0000"

	// BuildEnv is the target environment for this build.
	// Set by: -ldflags "-X 'github.com/AndrewDonelson/strata.BuildEnv=dev'"
	BuildEnv = "dev"
)

Build-time variables injected via -ldflags by the Makefile. Defaults represent an unversioned local development build.

BuildDate format : YYYY.MM.DD-HHMM  (24-hour clock)
BuildEnv  values : dev | qa | prod

Full version example: "2026.02.28-1750-dev"

var (
	ErrHookPanic = errors.New("strata: hook panicked")
)

Hook errors

var (
	ErrInsufficientFunds = errors.New("strata: insufficient funds")
)

Domain errors

var (
	ErrInvalidConfig = errors.New("strata: invalid configuration")
)

Config errors

var (
	ErrWriteBehindMaxRetry = errors.New("strata: write-behind exceeded max retries")
)

Write-behind errors

Functions

func GetTyped

func GetTyped[T any](ctx context.Context, ds *DataStore, schemaName, id string) (*T, error)

GetTyped is a generic convenience wrapper around Get.

func Q

func Q() *queryBuilder

Q returns a new fluent query builder.

func SearchTyped

func SearchTyped[T any](ctx context.Context, ds *DataStore, schemaName string, q *Query) ([]T, error)

SearchTyped is a generic convenience wrapper around Search.

func Version

func Version() string

Version returns the full version string in the form "YYYY.MM.DD-HHMM-env", e.g. "2026.02.28-1750-dev".

Types

type AES256GCM

type AES256GCM struct {
	// contains filtered or unexported fields
}

AES256GCM implements AES-256-GCM authenticated encryption.

func NewAES256GCM

func NewAES256GCM(key []byte) (*AES256GCM, error)

NewAES256GCM creates an AES-256-GCM encryptor from a 32-byte key.

func (*AES256GCM) Decrypt

func (e *AES256GCM) Decrypt(ciphertext []byte) ([]byte, error)

Decrypt decrypts ciphertext produced by Encrypt.

func (*AES256GCM) Encrypt

func (e *AES256GCM) Encrypt(plaintext []byte) ([]byte, error)

Encrypt encrypts plaintext using AES-256-GCM with a random nonce. Output: nonce (12 bytes) || ciphertext.

type Codec

type Codec = codec.Codec

type Config

type Config struct {
	// DSNs
	PostgresDSN   string
	RedisAddr     string
	RedisPassword string
	RedisDB       int

	// Pool sizes
	L1Pool L1PoolConfig
	L2Pool L2PoolConfig
	L3Pool L3PoolConfig

	// TTLs
	DefaultL1TTL time.Duration
	DefaultL2TTL time.Duration

	// Write behaviour
	DefaultWriteMode          WriteMode
	WriteBehindFlushInterval  time.Duration
	WriteBehindFlushThreshold int
	WriteBehindMaxRetry       int

	// Invalidation
	InvalidationChannel string

	// L4 distributed peer sync (optional; schemas opt-in via Schema.L4.Enabled)
	L4 L4Config

	// Optional overrideable components
	Codec   codec.Codec
	Clock   clock.Clock
	Metrics metrics.MetricsRecorder
	Logger  Logger

	// Encryption key (must be 32 bytes for AES-256-GCM; nil = disabled).
	EncryptionKey []byte

	// EmbeddingProvider — required when any registered schema has a strata:"vector" field.
	// If nil and a vector schema is registered, ds.Migrate() returns ErrNoEmbeddingProvider.
	// Use NewOllamaProvider or NewOpenAIProvider to create a provider.
	EmbeddingProvider EmbeddingProvider
}

Config contains all DataStore configuration.

type DataStore

type DataStore struct {
	// contains filtered or unexported fields
}

DataStore is the main entry-point for the Strata library.

func NewDataStore

func NewDataStore(cfg Config) (*DataStore, error)

NewDataStore creates and initialises a DataStore from the provided Config.

func (*DataStore) Close

func (ds *DataStore) Close() error

Close gracefully shuts down the DataStore.

func (*DataStore) Count

func (ds *DataStore) Count(ctx context.Context, schemaName string, q *Query) (int64, error)

Count returns the number of rows matching q (nil q = all rows).

func (*DataStore) Delete

func (ds *DataStore) Delete(ctx context.Context, schemaName, id string) error

Delete removes a record from all tiers.

func (*DataStore) Exists

func (ds *DataStore) Exists(ctx context.Context, schemaName, id string) (bool, error)

Exists returns true if a record exists in any tier.

func (*DataStore) FlushDirty

func (ds *DataStore) FlushDirty(ctx context.Context) error

FlushDirty blocks until all write-behind entries are flushed to L3.

func (*DataStore) Get

func (ds *DataStore) Get(ctx context.Context, schemaName, id string, dest any) error

Get fetches the record with the given id into dest. dest must be a pointer to the model type of the registered schema.

func (*DataStore) Invalidate

func (ds *DataStore) Invalidate(ctx context.Context, schemaName, id string) error

Invalidate removes a key from all cache tiers and publishes an invalidation event.

func (*DataStore) InvalidateAll

func (ds *DataStore) InvalidateAll(ctx context.Context, schemaName string) error

InvalidateAll flushes all cached entries for schemaName across L1 and L2.

func (*DataStore) Migrate

func (ds *DataStore) Migrate(ctx context.Context) error

Migrate applies all pending schema changes to PostgreSQL (idempotent).

func (*DataStore) MigrateFrom

func (ds *DataStore) MigrateFrom(ctx context.Context, dir string) error

MigrateFrom applies SQL migration files from dir in NNN_description.sql order.

func (*DataStore) MigrationStatus

func (ds *DataStore) MigrationStatus(ctx context.Context) ([]MigrationRecord, error)

MigrationStatus returns current migration state.

func (*DataStore) ReEmbed

func (ds *DataStore) ReEmbed(ctx context.Context, schemaName, textFieldName string) error

ReEmbed walks all L3 records for schemaName in batches of 100, re-embedding the textFieldName field using the current EmbeddingProvider. It is resumable: if interrupted, a subsequent call picks up where it left off using the reembed_progress counter in strata_schema_meta.

On completion it updates strata_schema_meta with the new model ID and dimension. If the dimension changed, it also rebuilds the vector index.

Returns ErrReEmbedAlreadyRunning if another migration is in progress, ErrReEmbedTextFieldMissing if textFieldName does not match any struct field, or a summary error if some records failed to embed.

func (*DataStore) Register

func (ds *DataStore) Register(s Schema) error

Register compiles and stores a Schema definition. Returns ErrInvalidTagForType if a strata:"vector" tag is applied to a non-pgvector.Vector field, or ErrInvalidIndexType if an IVFFlat/HNSW index is applied to a non-vector field.

func (*DataStore) Search

func (ds *DataStore) Search(ctx context.Context, schemaName string, q *Query, destSlice any) error

Search runs q against L3 and returns the results in destSlice (pointer to slice).

func (*DataStore) SearchCached

func (ds *DataStore) SearchCached(ctx context.Context, schemaName string, q *Query, destSlice any) error

SearchCached runs q against L3; caches list result in L2 by SQL fingerprint.

func (*DataStore) Set

func (ds *DataStore) Set(ctx context.Context, schemaName, id string, value any) error

Set stores value under schemaName with the given id.

func (*DataStore) SetMany

func (ds *DataStore) SetMany(ctx context.Context, schemaName string, pairs map[string]any) error

SetMany stores multiple id→value pairs for the given schema.

func (*DataStore) Stats

func (ds *DataStore) Stats() Stats

Stats returns a snapshot of operational metrics.

func (*DataStore) Tx

func (ds *DataStore) Tx(ctx context.Context) *Tx

Tx returns a new transaction bound to ctx.

func (*DataStore) VectorSearch

func (ds *DataStore) VectorSearch(
	ctx context.Context,
	schemaName string,
	query string,
	topK int,
	filters map[string]any,
) ([]SimilarityResult, error)

VectorSearch performs an approximate nearest-neighbour search over the vector field of the named schema, using the configured EmbeddingProvider to embed the plain-text query string internally.

topK must be ≥ 1; filters maps column names to equality values (AND logic). Results are ordered by cosine similarity (highest first) and are back-filled into L2 then L1 before returning.

Returns ErrTopKInvalid, ErrSchemaNotFound, ErrNoVectorField, ErrNoEmbeddingProvider, ErrPgvectorExtensionMissing, or ErrEmptyVectorQuery for invalid inputs.

func (*DataStore) WarmCache

func (ds *DataStore) WarmCache(ctx context.Context, schemaName string, limit int) error

WarmCache pre-loads up to limit records from L3 into L1 and L2. If limit <= 0, all rows are loaded.

type EmbeddingProvider

type EmbeddingProvider interface {
	// Embed converts text to a vector. The returned vector length must equal Dimensions().
	Embed(ctx context.Context, text string) (pgvector.Vector, error)

	// Dimensions returns the number of dimensions this provider produces.
	// Strata calls this once at schema Register time to size the Postgres column.
	// Implementations should cache the value after the first resolution.
	Dimensions() int

	// ModelID returns a stable identifier for the current embedding model
	// (e.g. "nomic-embed-text", "text-embedding-3-small").
	// Strata stores this in strata_schema_meta and compares it on every Migrate()
	// to detect model changes that require re-embedding.
	ModelID() string
}

EmbeddingProvider converts text to a fixed-size float32 vector. Strata calls it once per VectorSearch query and once per record during ReEmbed. Implementations must be safe for concurrent use.

func NewOllamaProvider

func NewOllamaProvider(baseURL, modelName string) EmbeddingProvider

NewOllamaProvider creates an OllamaProvider that embeds text using the model running at baseURL/api/embed. The dimension is resolved on the first Embed() or Dimensions() call and cached for subsequent calls.

func NewOpenAIProvider

func NewOpenAIProvider(apiKey, modelName string) EmbeddingProvider

NewOpenAIProvider creates an OpenAIProvider using the official embeddings endpoint.

type Encryptor

type Encryptor interface {
	Encrypt(plaintext []byte) ([]byte, error)
	Decrypt(ciphertext []byte) ([]byte, error)
}

Encryptor encrypts and decrypts field values for fields tagged with "encrypted".

type EvictionPolicy

type EvictionPolicy int

EvictionPolicy determines which L1 entry is evicted when MaxEntries is reached.

const (
	EvictLRU EvictionPolicy = iota
	EvictLFU
	EvictFIFO
)

type Index

type Index struct {
	Fields         []string // column names
	Unique         bool
	Name           string    // optional; auto-generated if empty
	Type           IndexType // IndexDefault (btree) | IndexIVFFlat | IndexHNSW | IndexTrigram
	Lists          int       // IVFFlat: number of lists (default: 100)
	M              int       // HNSW: max connections per layer (default: 16)
	EfConstruction int       // HNSW: build-time search width (default: 64)
	DistanceFunc   string    // "cosine" | "l2" | "ip" (default: "cosine")
}

Index defines a database index on one or more columns.

type IndexType

type IndexType string

IndexType determines what kind of database index is created.

const (
	IndexDefault IndexType = ""        // standard btree
	IndexIVFFlat IndexType = "ivfflat" // pgvector IVFFlat ANN index
	IndexHNSW    IndexType = "hnsw"    // pgvector HNSW ANN index
	IndexTrigram IndexType = "gin"     // pg_trgm GIN index for text search
)

type L1PoolConfig

type L1PoolConfig struct {
	MaxEntries int
	Eviction   EvictionPolicy
}

L1PoolConfig configures the in-memory L1 cache tier.

type L2PoolConfig

type L2PoolConfig struct {
	PoolSize     int
	DialTimeout  time.Duration
	ReadTimeout  time.Duration
	WriteTimeout time.Duration
}

L2PoolConfig configures the Redis L2 cache tier client.

type L3PoolConfig

type L3PoolConfig struct {
	MaxConns        int32
	MinConns        int32
	MaxConnLifetime time.Duration
	MaxConnIdleTime time.Duration
}

L3PoolConfig configures the PostgreSQL L3 connection pool.

type L4Config

type L4Config struct {
	Enabled        bool          // false = L4 is entirely inactive (default)
	Mode           string        // "peer" (in-memory) or "ledger" (BoltDB-backed)
	Port           int           // TCP listen port; default 7743
	DataDir        string        // BoltDB directory for ledger mode; default "/var/lib/strata/l4"
	SyncInterval   time.Duration // gossip sync frequency; default 30s
	MaxPeers       int           // max simultaneous peer connections; default 50
	Quorum         int           // confirmations needed for pending → confirmed; default 3
	BootstrapPeers []string      // "host:port" addresses to dial on startup
	DNSSeed        string        // DNS seed hostname for peer discovery
	NodeKeyPath    string        // path to load/persist the Ed25519 node private key
}

L4Config configures the optional L4 distributed peer-to-peer sync layer. Set Enabled = true to activate; choose Mode and set Port/DataDir/Quorum as needed. Individual schemas opt-in via Schema.L4.Enabled.

type L4Policy

type L4Policy struct {
	Enabled     bool   // false = no L4 sync for this schema (default)
	AppID       string // L4 namespace; defaults to schema Name
	SyncDeletes bool   // if true, Delete() → L4 Revoke(); false = L4 record is left as-is
}

L4Policy configures optional L4 distributed-ledger sync for a schema. When Enabled is true, every successful L3 write is automatically published to the L4 peer layer. Deletes are revoked if SyncDeletes is also true.

type Logger

type Logger interface {
	Info(msg string, keysAndValues ...any)
	Warn(msg string, keysAndValues ...any)
	Error(msg string, keysAndValues ...any)
	Debug(msg string, keysAndValues ...any)
}

Logger is the logging interface used internally by Strata. Implement this to route logs to zap, slog, logrus, etc.

type MemPolicy

type MemPolicy struct {
	TTL        time.Duration
	MaxEntries int
	Eviction   EvictionPolicy
}

MemPolicy configures L1 in-memory cache behavior for a schema.

type MetricsRecorder

type MetricsRecorder = metrics.MetricsRecorder

Re-export types so callers only import this package.

type MigrationRecord

type MigrationRecord struct {
	ID        int
	Schema    string
	FileName  string
	AppliedAt time.Time
}

MigrationRecord describes a single applied migration.

type OllamaProvider

type OllamaProvider struct {
	// contains filtered or unexported fields
}

OllamaProvider calls a local Ollama instance to generate embeddings. Dimensions() is auto-detected by calling Embed() once on startup and cached.

Example:

strata.NewOllamaProvider("http://cqai:11434", "nomic-embed-text")

func (*OllamaProvider) Dimensions

func (p *OllamaProvider) Dimensions() int

Dimensions implements EmbeddingProvider. Calls Embed("") once to resolve the dimension, then caches it.

func (*OllamaProvider) Embed

func (p *OllamaProvider) Embed(ctx context.Context, text string) (pgvector.Vector, error)

Embed implements EmbeddingProvider.

func (*OllamaProvider) ModelID

func (p *OllamaProvider) ModelID() string

ModelID implements EmbeddingProvider.

type OpenAIProvider

type OpenAIProvider struct {
	// contains filtered or unexported fields
}

OpenAIProvider calls the OpenAI embeddings API (or compatible endpoints).

Example:

strata.NewOpenAIProvider(os.Getenv("OPENAI_API_KEY"), "text-embedding-3-small")

func (*OpenAIProvider) Dimensions

func (p *OpenAIProvider) Dimensions() int

Dimensions implements EmbeddingProvider. Returns the known dimension for well-known models without an API call, falling back to a real Embed() call for unknown models.

func (*OpenAIProvider) Embed

func (p *OpenAIProvider) Embed(ctx context.Context, text string) (pgvector.Vector, error)

Embed implements EmbeddingProvider.

func (*OpenAIProvider) ModelID

func (p *OpenAIProvider) ModelID() string

ModelID implements EmbeddingProvider.

type PostgresPolicy

type PostgresPolicy struct {
	TableName   string
	ReadReplica string
	PartitionBy string
}

PostgresPolicy configures L3 Postgres persistence for a schema.

type Query

type Query struct {
	Where   string
	Args    []any
	OrderBy string
	Desc    bool
	Limit   int
	Offset  int
	Fields  []string
	ForceL3 bool
	ForceL2 bool
}

Query specifies search parameters for the Search and SearchCached operations.

func (Query) ToSQL

func (q Query) ToSQL(table string, columns []string, defaultLimit int) (string, []any)

ToSQL converts a Query into a SQL SELECT statement.

type RedisPolicy

type RedisPolicy struct {
	TTL       time.Duration
	KeyPrefix string
}

RedisPolicy configures L2 Redis cache behavior for a schema.

type Schema

type Schema struct {
	Name      string
	Model     any
	L1        MemPolicy
	L2        RedisPolicy
	L3        PostgresPolicy
	L4        L4Policy
	WriteMode WriteMode
	Indexes   []Index
	Hooks     SchemaHooks
}

Schema defines one data collection and its caching policy.

type SchemaHooks

type SchemaHooks struct {
	BeforeSet    func(ctx context.Context, value any) error
	AfterSet     func(ctx context.Context, value any)
	BeforeGet    func(ctx context.Context, id string)
	AfterGet     func(ctx context.Context, value any)
	OnEvict      func(ctx context.Context, key string, value any)
	OnWriteError func(ctx context.Context, key string, err error)
}

SchemaHooks provides optional lifecycle callbacks.

type SimilarityResult

type SimilarityResult struct {
	// ID is the primary-key value of the matching record.
	ID string
	// Score is the cosine similarity (0.0 = orthogonal, 1.0 = identical).
	// Computed as 1 − cosine_distance, assuming unit-normalised vectors.
	Score float64
	// Value is the fully-hydrated model struct (same type as the schema Model).
	// The embedding/vector field IS populated when results come directly from L3.
	Value any
}

SimilarityResult is one item returned by VectorSearch.

type Stats

type Stats struct {
	Gets       int64
	Sets       int64
	Deletes    int64
	Errors     int64
	DirtyCount int64
	L1Entries  int64
}

Stats is the snapshot returned by DataStore.Stats().

type Tx

type Tx struct {
	// contains filtered or unexported fields
}

Tx is a lightweight transaction helper that queues L3 operations and updates caches on commit.

func (*Tx) Commit

func (tx *Tx) Commit() error

Commit executes all queued operations inside a single L3 transaction.

func (*Tx) Delete

func (tx *Tx) Delete(schemaName, id string) *Tx

Delete queues a delete operation in the transaction.

func (*Tx) Set

func (tx *Tx) Set(schemaName, id string, value any) *Tx

Set queues a set operation in the transaction.

type WriteMode

type WriteMode int

WriteMode controls how writes flow through the three tiers.

const (
	WriteThrough        WriteMode = iota // L3 -> L2 -> L1, maximum safety
	WriteBehind                          // L1 immediately, L2/L3 async, max performance
	WriteThroughL1Async                  // L3+L2 sync, L1 lazily on next read
)

Directories

Path Synopsis
internal
clock
Package clock provides a testable clock interface for TTL calculations.
codec
Package codec provides encode/decode interfaces for cache serialization.
l1
Package l1 provides a sharded, concurrent in-memory cache with TTL and eviction.
l2
Package l2 provides the Redis tier cache adapter.
l3
Package l3 provides the PostgreSQL persistence tier adapter.
l4
Package l4 provides the optional distributed ledger sync layer for Strata.
metrics
Package metrics provides the MetricsRecorder interface and a noop implementation.
Package l4 re-exports the Strata L4 distributed sync layer for first-party consumers (such as LADL) that need direct access to the ledger primitives.
