Documentation
¶
Overview ¶
Package sqlview provides SQL-backed StateView projections with transactional event processing.
sqlview bridges the gap between eskit's StateView pattern and SQL databases. Each projection's Evolve function receives a *sql.Tx, ensuring that read model updates are atomic. Setup and Reset functions receive a *sql.DB for DDL operations.
Level 1: Simple (in-memory) ¶
Use eskit.StateView directly with closures over your data structures. Good for prototyping, tests, and small datasets.
Level 2: SQL (this package) ¶
Use sqlview.Config to define projections that write to SQL tables. sqlview.From() wraps the config into an eskit.StateView with TX management. Good for production: queryable, survives restarts, rebuildable.
Atomic Checkpoint ¶
For non-idempotent projections, set CheckpointInTx: true on the Config. This saves the subscription checkpoint within the same transaction as Evolve, preventing double processing on crash recovery.
Example ¶
var BalanceView = sqlview.Config[domain.Event]{
Name: "account-balance",
EventTypes: []string{"MoneyDeposited", "MoneyWithdrawn"},
Setup: func(ctx context.Context, db *sql.DB) error {
_, err := db.ExecContext(ctx, `CREATE TABLE IF NOT EXISTS balances (
account_id TEXT PRIMARY KEY, balance INTEGER NOT NULL DEFAULT 0)`)
return err
},
Evolve: func(ctx context.Context, tx *sql.Tx, event eskit.Event[domain.Event]) error {
switch e := event.Data.(type) {
case domain.MoneyDeposited:
_, err := tx.ExecContext(ctx, `INSERT INTO balances ...`, ...)
return err
}
return nil
},
}
view := sqlview.From(db, BalanceView)
sub := subscription.FromStateView(view, reader, checkpoint)
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
Types ¶
type Config ¶
type Config[E any] struct { // Name identifies this projection (used for checkpoint tracking). Name string // EventTypes lists the event types this view cares about. EventTypes []string // Setup initializes the read model (e.g., CREATE TABLE IF NOT EXISTS). // Called once when the subscription starts. Optional. Setup func(ctx context.Context, db *sql.DB) error // Reset clears the read model for rebuild (e.g., DELETE FROM table). // Called before replaying from position 0. Optional. Reset func(ctx context.Context, db *sql.DB) error // Evolve applies a single event to the read model within a transaction. // The transaction is committed automatically if Evolve returns nil. Evolve func(ctx context.Context, tx *sql.Tx, event eskit.Event[E]) error // CheckpointInTx saves the subscription checkpoint within the same // transaction as Evolve, making them atomic. This prevents double // processing of non-idempotent side effects on crash recovery. // Requires a "checkpoints" table with (name TEXT PRIMARY KEY, position INTEGER, updated_at DATETIME). CheckpointInTx bool }
Config defines a SQL-backed projection.
Each field maps to the corresponding StateView field, but with SQL-specific signatures: Setup and Reset receive *sql.DB, Evolve receives *sql.Tx.