middleware

package
v0.0.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 10, 2026 License: MIT Imports: 13 Imported by: 0

Documentation

Overview

Package middleware provides production-grade middleware for Vango applications.

This package includes:

  • OpenTelemetry distributed tracing middleware
  • Prometheus metrics middleware
  • Recovery and logging utilities

OpenTelemetry Middleware

The OpenTelemetry middleware automatically traces every Vango event, providing distributed tracing across your application. Traces include session ID, event type, route, and patch counts.

app := vango.NewApp(
    vango.WithMiddleware(
        middleware.OpenTelemetry(),
    ),
)

Configure with options:

middleware.OpenTelemetry(
    middleware.WithTracerName("my-app"),
    middleware.WithIncludeUserID(true),
    middleware.WithEventFilter(func(ctx server.Ctx) bool {
        return ctx.Path() != "/healthz"
    }),
)

Prometheus Metrics

The Prometheus middleware collects metrics about your Vango application:

  • vango_sessions_active: Current number of active sessions

  • vango_sessions_detached: Current number of detached sessions

  • vango_session_resume_success_total: Total successful resumes

  • vango_session_resume_failed_total{reason}: Total failed resumes

  • vango_patch_bytes{route}: Patch bytes histogram

  • vango_patch_mismatch_total{reason}: Patch mismatch counter

  • vango_resource_duration_ms{resource}: Resource duration histogram

  • vango_resource_error_total{resource}: Resource error counter

  • vango_action_duration_ms{action}: Action duration histogram

  • vango_action_error_total{action}: Action error counter

  • vango_schema_mismatch_total{reason}: Schema mismatch counter

  • vango_persist_write_rejected_total{reason,stable_id}: Persist reject counter

  • vango_persist_bytes_total{stable_id}: Persist bytes counter

  • vango_session_persist_bytes: Session persist bytes histogram

  • vango_cold_deploy_ack_total{source}: Cold deploy ack counter

  • vango_events_total: Total events processed by type

  • vango_event_duration_seconds: Event processing duration histogram

  • vango_patches_sent_total: Total patches sent to clients

    app := vango.NewApp( vango.WithMiddleware( middleware.Prometheus(), ), )

Then expose metrics on a separate port:

http.Handle("/metrics", promhttp.Handler())
go http.ListenAndServe(":9090", nil)

Context Propagation

Both middlewares inject trace context into ctx.StdContext(), allowing database drivers and HTTP clients to inherit the trace:

func MyHandler(ctx server.Ctx) error {
    // Database call inherits trace context
    row := db.QueryRowContext(ctx.StdContext(), "SELECT ...")

    // HTTP call inherits trace context
    req, _ := http.NewRequestWithContext(ctx.StdContext(), "GET", url, nil)
    return nil
}

Phase 13: Production Hardening & Observability

This package was introduced in Phase 13 to provide production-grade observability for Vango applications.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func AttachPrometheus

func AttachPrometheus(s *server.Server, r *router.Router, opts ...MetricsOption) router.Middleware

AttachPrometheus registers the Prometheus middleware on the router and configures the server to emit runtime metrics to the Prometheus collector.

func OpenTelemetry

func OpenTelemetry(opts ...OTelOption) router.Middleware

OpenTelemetry creates middleware that traces every Vango event.

The middleware:

  • Creates a span for each event with type, target, and session ID
  • Injects trace context into ctx.StdContext() for downstream calls
  • Records errors and sets span status
  • Records patch count as a span attribute

Example:

app := vango.NewApp(
    vango.WithMiddleware(
        middleware.OpenTelemetry(
            middleware.WithTracerName("my-app"),
            middleware.WithIncludeUserID(true),
        ),
    ),
)

The tracer uses the global OpenTelemetry tracer provider. Configure it in your main() before starting the server:

tp := sdktrace.NewTracerProvider(
    sdktrace.WithBatcher(exporter),
    sdktrace.WithResource(resource.NewWithAttributes(
        semconv.SchemaURL,
        semconv.ServiceName("my-app"),
    )),
)
otel.SetTracerProvider(tp)

func Prometheus

func Prometheus(opts ...MetricsOption) router.Middleware

Prometheus creates middleware that collects Prometheus metrics for Vango events.

Metrics collected:

  • vango_signal_write_violation_total: Counter of invalid signal write attempts (low-cardinality labels: operation, reason)
  • vango_events_total: Counter of events by path and status
  • vango_event_duration_seconds: Histogram of event processing duration
  • vango_event_errors_total: Counter of event errors by path and error type
  • vango_patches_sent_total: Counter of patches sent (when RecordPatches is called)
  • vango_patch_bytes: Histogram of patch frame sizes by route
  • vango_patch_mismatch_total: Counter of patch mismatches by reason
  • vango_resource_duration_ms: Histogram of resource durations by resource
  • vango_resource_error_total: Counter of resource errors by resource
  • vango_action_duration_ms: Histogram of action durations by action
  • vango_action_error_total: Counter of action errors by action
  • vango_schema_mismatch_total: Counter of schema mismatches by reason
  • vango_persist_write_rejected_total: Counter of rejected persisted writes by reason and stable ID
  • vango_persist_bytes_total: Counter of persisted bytes by stable ID
  • vango_session_persist_bytes: Histogram of persisted bytes per session
  • vango_cold_deploy_ack_total: Counter of cold deploy acknowledgments by source
  • vango_sessions_active: Gauge of active sessions (when session hooks are used)
  • vango_sessions_detached: Gauge of detached sessions
  • vango_session_memory_bytes: Histogram of session memory usage
  • vango_websocket_errors_total: Counter of WebSocket errors
  • vango_reconnects_total: Counter of session reconnections

Example:

app := vango.NewApp(
    vango.WithMiddleware(
        middleware.Prometheus(
            middleware.WithNamespace("myapp"),
        ),
    ),
)

// Expose metrics endpoint
http.Handle("/metrics", promhttp.Handler())

func RecordEviction

func RecordEviction()

RecordEviction records an LRU session eviction.

func RecordPatches

func RecordPatches(count int)

RecordPatches records the number of patches sent. Call this from your server code when patches are sent to clients.

func RecordReconnect

func RecordReconnect()

RecordReconnect records a session reconnection. Call this when a detached session is successfully resumed.

func RecordResume

func RecordResume()

RecordResume records a successful session resume.

func RecordResumeFailed

func RecordResumeFailed(reason string)

RecordResumeFailed records a failed resume attempt.

func RecordSessionCreate

func RecordSessionCreate()

RecordSessionCreate records a new session creation.

func RecordSessionDestroy

func RecordSessionDestroy(memoryBytes int64)

RecordSessionDestroy records a session destruction.

func RecordSessionDetach

func RecordSessionDetach()

RecordSessionDetach records a session becoming detached.

func RecordSessionReattach

func RecordSessionReattach()

RecordSessionReattach records a detached session being reattached.

func RecordWebSocketError

func RecordWebSocketError(errorType string)

RecordWebSocketError records a WebSocket error.

func SpanFromContext

func SpanFromContext(ctx server.Ctx) trace.Span

SpanFromContext retrieves the current trace span from the context. Returns nil if no span is available.

Example:

func MyHandler(ctx server.Ctx) error {
    if span := middleware.SpanFromContext(ctx); span != nil {
        span.SetAttributes(attribute.Int("my.count", 42))
    }
    return nil
}

func TraceContext

func TraceContext(ctx server.Ctx) context.Context

TraceContext returns the trace context from the Ctx for propagation. Use this to propagate trace context to external services.

Example:

func MyHandler(ctx server.Ctx) error {
    traceCtx := middleware.TraceContext(ctx)
    req, _ := http.NewRequestWithContext(traceCtx, "GET", url, nil)
    return nil
}

Types

type Collector

type Collector struct {
	// contains filtered or unexported fields
}

Collector returns the metrics for use in custom registrations. This allows collecting Vango metrics alongside other application metrics.

func GetMetrics

func GetMetrics() *Collector

GetMetrics returns the global metrics collector. Returns nil if Prometheus middleware has not been initialized.

func (*Collector) RecordActionDuration

func (c *Collector) RecordActionDuration(action string, ms int64)

RecordActionDuration records action duration.

func (*Collector) RecordActionError

func (c *Collector) RecordActionError(action string)

RecordActionError records an action error.

func (*Collector) RecordColdDeployAck

func (c *Collector) RecordColdDeployAck(source string)

RecordColdDeployAck records a cold deploy acknowledgment.

func (*Collector) RecordPatchBytes

func (c *Collector) RecordPatchBytes(route string, bytes int)

RecordPatchBytes records patch bytes by route.

func (*Collector) RecordPatchMismatch

func (c *Collector) RecordPatchMismatch(reason string)

RecordPatchMismatch records a patch mismatch by reason.

func (*Collector) RecordPersistBytes

func (c *Collector) RecordPersistBytes(stableID string, bytes int64)

RecordPersistBytes records persisted bytes.

func (*Collector) RecordPersistWriteRejected

func (c *Collector) RecordPersistWriteRejected(reason, stableID string)

RecordPersistWriteRejected records a rejected persist write.

func (*Collector) RecordResourceDuration

func (c *Collector) RecordResourceDuration(resource string, ms int64)

RecordResourceDuration records resource duration.

func (*Collector) RecordResourceError

func (c *Collector) RecordResourceError(resource string)

RecordResourceError records a resource error.

func (*Collector) RecordResumeFailure

func (c *Collector) RecordResumeFailure(reason string)

RecordResumeFailure records a failed resume attempt.

func (*Collector) RecordResumeSuccess

func (c *Collector) RecordResumeSuccess()

RecordResumeSuccess records a successful resume.

func (*Collector) RecordSchemaMismatch

func (c *Collector) RecordSchemaMismatch(reason string)

RecordSchemaMismatch records a schema mismatch.

func (*Collector) RecordSessionActive

func (c *Collector) RecordSessionActive(delta int)

RecordSessionActive records active session deltas.

func (*Collector) RecordSessionDetached

func (c *Collector) RecordSessionDetached(delta int)

RecordSessionDetached records detached session deltas.

func (*Collector) RecordSessionPersistBytes

func (c *Collector) RecordSessionPersistBytes(sessionID string, bytes int64)

RecordSessionPersistBytes records persisted bytes per session.

func (*Collector) RecordSignalWriteViolation

func (c *Collector) RecordSignalWriteViolation(operation, reason string)

RecordSignalWriteViolation records an invalid signal write attempt.

type MetricsConfig

type MetricsConfig struct {
	// Namespace is the metrics namespace (default: "vango").
	Namespace string

	// Subsystem is the metrics subsystem (default: "").
	Subsystem string

	// ConstLabels are constant labels added to all metrics.
	ConstLabels prometheus.Labels

	// Buckets are the histogram buckets for event duration.
	// Default: prometheus.DefBuckets
	Buckets []float64

	// Registry is the Prometheus registry to use.
	// Default: prometheus.DefaultRegisterer
	Registry prometheus.Registerer
}

MetricsConfig configures the Prometheus metrics middleware.

type MetricsOption

type MetricsOption func(*MetricsConfig)

MetricsOption configures the Prometheus metrics middleware.

func WithBuckets

func WithBuckets(buckets []float64) MetricsOption

WithBuckets sets the histogram buckets.

func WithConstLabels

func WithConstLabels(labels prometheus.Labels) MetricsOption

WithConstLabels sets constant labels for all metrics.

func WithNamespace

func WithNamespace(namespace string) MetricsOption

WithNamespace sets the metrics namespace.

func WithRegistry

func WithRegistry(registry prometheus.Registerer) MetricsOption

WithRegistry sets the Prometheus registry.

func WithSubsystem

func WithSubsystem(subsystem string) MetricsOption

WithSubsystem sets the metrics subsystem.

type OTelConfig

type OTelConfig struct {
	// TracerName is the name of the tracer (default: "vango").
	TracerName string

	// IncludeUserID includes the user ID in traces if available.
	// May contain sensitive information - disabled by default.
	IncludeUserID bool

	// IncludeRoute includes the current route in traces.
	// Enabled by default.
	IncludeRoute bool

	// Filter determines which events to trace.
	// Return true to trace the event, false to skip.
	// If nil, all events are traced.
	Filter func(ctx server.Ctx) bool

	// AttributeExtractor extracts custom attributes from the context.
	// Called for each traced event.
	AttributeExtractor func(ctx server.Ctx) []attribute.KeyValue
	// contains filtered or unexported fields
}

OTelConfig configures the OpenTelemetry middleware.

type OTelOption

type OTelOption func(*OTelConfig)

OTelOption configures the OpenTelemetry middleware.

func WithAttributeExtractor

func WithAttributeExtractor(extractor func(ctx server.Ctx) []attribute.KeyValue) OTelOption

WithAttributeExtractor sets a custom attribute extractor.

func WithEventFilter

func WithEventFilter(filter func(ctx server.Ctx) bool) OTelOption

WithEventFilter sets a filter function for events.

func WithIncludeRoute

func WithIncludeRoute(include bool) OTelOption

WithIncludeRoute enables/disables including route in traces.

func WithIncludeUserID

func WithIncludeUserID(include bool) OTelOption

WithIncludeUserID enables including user ID in traces.

func WithTracerName

func WithTracerName(name string) OTelOption

WithTracerName sets the tracer name.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL