Documentation
Overview ¶
Package middleware provides production-grade middleware for Vango applications.
This package includes:
- OpenTelemetry distributed tracing middleware
- Prometheus metrics middleware
- Recovery and logging utilities
OpenTelemetry Middleware ¶
The OpenTelemetry middleware automatically traces every Vango event, providing distributed tracing across your application. Traces include session ID, event type, route, and patch counts.
app := vango.NewApp(
	vango.WithMiddleware(
		middleware.OpenTelemetry(),
	),
)
Configure with options:
middleware.OpenTelemetry(
	middleware.WithTracerName("my-app"),
	middleware.WithIncludeUserID(true),
	middleware.WithEventFilter(func(ctx server.Ctx) bool {
		return ctx.Path() != "/healthz"
	}),
)
Prometheus Metrics ¶
The Prometheus middleware collects metrics about your Vango application:
- vango_sessions_active: Current number of active sessions
- vango_sessions_detached: Current number of detached sessions
- vango_session_resume_success_total: Total successful resumes
- vango_session_resume_failed_total{reason}: Total failed resumes
- vango_patch_bytes{route}: Patch bytes histogram
- vango_patch_mismatch_total{reason}: Patch mismatch counter
- vango_resource_duration_ms{resource}: Resource duration histogram
- vango_resource_error_total{resource}: Resource error counter
- vango_action_duration_ms{action}: Action duration histogram
- vango_action_error_total{action}: Action error counter
- vango_schema_mismatch_total{reason}: Schema mismatch counter
- vango_persist_write_rejected_total{reason,stable_id}: Persist reject counter
- vango_persist_bytes_total{stable_id}: Persist bytes counter
- vango_session_persist_bytes: Session persist bytes histogram
- vango_cold_deploy_ack_total{source}: Cold deploy ack counter
- vango_events_total: Total events processed by type
- vango_event_duration_seconds: Event processing duration histogram
- vango_patches_sent_total: Total patches sent to clients
app := vango.NewApp(
	vango.WithMiddleware(
		middleware.Prometheus(),
	),
)
Then expose metrics on a separate port:
http.Handle("/metrics", promhttp.Handler())
go http.ListenAndServe(":9090", nil)
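The snippet above registers the handler on http.DefaultServeMux, so anything else registered on the default mux is also reachable on the metrics port. A minimal sketch of one alternative, a dedicated mux that serves only /metrics, follows. It uses only the standard library; metricsHandler is a hypothetical stand-in for promhttp.Handler(), and newMetricsMux and statusFor are illustrative helpers, not part of this package.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// metricsHandler is a hypothetical stand-in for promhttp.Handler().
func metricsHandler() http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprintln(w, "# metrics would be rendered here")
	})
}

// newMetricsMux returns a mux that serves only /metrics, so routes
// registered on http.DefaultServeMux are not exposed on the metrics port.
func newMetricsMux(h http.Handler) *http.ServeMux {
	mux := http.NewServeMux()
	mux.Handle("/metrics", h)
	return mux
}

// statusFor performs an in-memory GET against h and returns the status code.
func statusFor(h http.Handler, path string) int {
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, path, nil))
	return rec.Code
}

func main() {
	mux := newMetricsMux(metricsHandler())
	fmt.Println(statusFor(mux, "/metrics"), statusFor(mux, "/other")) // 200 404
	// In a real program: go http.ListenAndServe(":9090", mux)
}
```

Passing the mux explicitly instead of nil is the only change needed in the ListenAndServe call.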
Context Propagation ¶
Both middlewares inject trace context into ctx.StdContext(), allowing database drivers and HTTP clients to inherit the trace:
func MyHandler(ctx server.Ctx) error {
	// Database call inherits trace context
	row := db.QueryRowContext(ctx.StdContext(), "SELECT ...")
	if err := row.Err(); err != nil {
		return err
	}
	// HTTP call inherits trace context
	req, err := http.NewRequestWithContext(ctx.StdContext(), "GET", url, nil)
	if err != nil {
		return err
	}
	_ = req // send with your http.Client
	return nil
}
Phase 13: Production Hardening & Observability ¶
This package was introduced in Phase 13 to provide production-grade observability for Vango applications.
Index ¶
- func AttachPrometheus(s *server.Server, r *router.Router, opts ...MetricsOption) router.Middleware
- func OpenTelemetry(opts ...OTelOption) router.Middleware
- func Prometheus(opts ...MetricsOption) router.Middleware
- func RecordEviction()
- func RecordPatches(count int)
- func RecordReconnect()
- func RecordResume()
- func RecordResumeFailed(reason string)
- func RecordSessionCreate()
- func RecordSessionDestroy(memoryBytes int64)
- func RecordSessionDetach()
- func RecordSessionReattach()
- func RecordWebSocketError(errorType string)
- func SpanFromContext(ctx server.Ctx) trace.Span
- func TraceContext(ctx server.Ctx) context.Context
- type Collector
- func (c *Collector) RecordActionDuration(action string, ms int64)
- func (c *Collector) RecordActionError(action string)
- func (c *Collector) RecordColdDeployAck(source string)
- func (c *Collector) RecordPatchBytes(route string, bytes int)
- func (c *Collector) RecordPatchMismatch(reason string)
- func (c *Collector) RecordPersistBytes(stableID string, bytes int64)
- func (c *Collector) RecordPersistWriteRejected(reason, stableID string)
- func (c *Collector) RecordResourceDuration(resource string, ms int64)
- func (c *Collector) RecordResourceError(resource string)
- func (c *Collector) RecordResumeFailure(reason string)
- func (c *Collector) RecordResumeSuccess()
- func (c *Collector) RecordSchemaMismatch(reason string)
- func (c *Collector) RecordSessionActive(delta int)
- func (c *Collector) RecordSessionDetached(delta int)
- func (c *Collector) RecordSessionPersistBytes(sessionID string, bytes int64)
- func (c *Collector) RecordSignalWriteViolation(operation, reason string)
- type MetricsConfig
- type MetricsOption
- type OTelConfig
- type OTelOption
Functions ¶
func AttachPrometheus ¶
func AttachPrometheus(s *server.Server, r *router.Router, opts ...MetricsOption) router.Middleware
AttachPrometheus registers the Prometheus middleware on the router and configures the server to emit runtime metrics to the Prometheus collector.
func OpenTelemetry ¶
func OpenTelemetry(opts ...OTelOption) router.Middleware
OpenTelemetry creates middleware that traces every Vango event.
The middleware:
- Creates a span for each event with type, target, and session ID
- Injects trace context into ctx.StdContext() for downstream calls
- Records errors and sets span status
- Records patch count as a span attribute
Example:
app := vango.NewApp(
	vango.WithMiddleware(
		middleware.OpenTelemetry(
			middleware.WithTracerName("my-app"),
			middleware.WithIncludeUserID(true),
		),
	),
)
The tracer uses the global OpenTelemetry tracer provider. Configure it in your main() before starting the server:
tp := sdktrace.NewTracerProvider(
	sdktrace.WithBatcher(exporter),
	sdktrace.WithResource(resource.NewWithAttributes(
		semconv.SchemaURL,
		semconv.ServiceName("my-app"),
	)),
)
otel.SetTracerProvider(tp)
func Prometheus ¶
func Prometheus(opts ...MetricsOption) router.Middleware
Prometheus creates middleware that collects Prometheus metrics for Vango events.
Metrics collected:
- vango_signal_write_violation_total: Counter of invalid signal write attempts (low-cardinality labels: operation, reason)
- vango_events_total: Counter of events by path and status
- vango_event_duration_seconds: Histogram of event processing duration
- vango_event_errors_total: Counter of event errors by path and error type
- vango_patches_sent_total: Counter of patches sent (when RecordPatches is called)
- vango_patch_bytes: Histogram of patch frame sizes by route
- vango_patch_mismatch_total: Counter of patch mismatches by reason
- vango_resource_duration_ms: Histogram of resource durations by resource
- vango_resource_error_total: Counter of resource errors by resource
- vango_action_duration_ms: Histogram of action durations by action
- vango_action_error_total: Counter of action errors by action
- vango_schema_mismatch_total: Counter of schema mismatches by reason
- vango_persist_write_rejected_total: Counter of rejected persisted writes by reason and stable ID
- vango_persist_bytes_total: Counter of persisted bytes by stable ID
- vango_session_persist_bytes: Histogram of persisted bytes per session
- vango_cold_deploy_ack_total: Counter of cold deploy acknowledgments by source
- vango_sessions_active: Gauge of active sessions (when session hooks are used)
- vango_sessions_detached: Gauge of detached sessions
- vango_session_memory_bytes: Histogram of session memory usage
- vango_websocket_errors_total: Counter of WebSocket errors
- vango_reconnects_total: Counter of session reconnections
Example:
app := vango.NewApp(
	vango.WithMiddleware(
		middleware.Prometheus(
			middleware.WithNamespace("myapp"),
		),
	),
)

// Expose metrics endpoint
http.Handle("/metrics", promhttp.Handler())
func RecordPatches ¶
func RecordPatches(count int)
RecordPatches records the number of patches sent. Call this from your server code when patches are sent to clients.
func RecordReconnect ¶
func RecordReconnect()
RecordReconnect records a session reconnection. Call this when a detached session is successfully resumed.
func RecordResumeFailed ¶
func RecordResumeFailed(reason string)
RecordResumeFailed records a failed resume attempt.
func RecordSessionCreate ¶
func RecordSessionCreate()
RecordSessionCreate records a new session creation.
func RecordSessionDestroy ¶
func RecordSessionDestroy(memoryBytes int64)
RecordSessionDestroy records a session destruction.
func RecordSessionDetach ¶
func RecordSessionDetach()
RecordSessionDetach records a session becoming detached.
func RecordSessionReattach ¶
func RecordSessionReattach()
RecordSessionReattach records a detached session being reattached.
func RecordWebSocketError ¶
func RecordWebSocketError(errorType string)
RecordWebSocketError records a WebSocket error.
func SpanFromContext ¶
func SpanFromContext(ctx server.Ctx) trace.Span
SpanFromContext retrieves the current trace span from the context. Returns nil if no span is available.
Example:
func MyHandler(ctx server.Ctx) error {
	if span := middleware.SpanFromContext(ctx); span != nil {
		span.SetAttributes(attribute.Int("my.count", 42))
	}
	return nil
}
func TraceContext ¶
func TraceContext(ctx server.Ctx) context.Context
TraceContext returns the trace context from the Ctx. Use it to propagate trace context to external services.
Example:
func MyHandler(ctx server.Ctx) error {
	traceCtx := middleware.TraceContext(ctx)
	req, err := http.NewRequestWithContext(traceCtx, "GET", url, nil)
	if err != nil {
		return err
	}
	_ = req // send with your http.Client
	return nil
}
Types ¶
type Collector ¶
type Collector struct {
	// contains filtered or unexported fields
}
Collector exposes the package's metrics for use in custom registrations, so that Vango metrics can be collected alongside other application metrics.
func GetMetrics ¶
func GetMetrics() *Collector
GetMetrics returns the global metrics collector. Returns nil if Prometheus middleware has not been initialized.
func (*Collector) RecordActionDuration ¶
func (c *Collector) RecordActionDuration(action string, ms int64)
RecordActionDuration records the duration of an action in milliseconds.
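func (*Collector) RecordActionError ¶
func (c *Collector) RecordActionError(action string)
RecordActionError records an action error.
func (*Collector) RecordColdDeployAck ¶
func (c *Collector) RecordColdDeployAck(source string)
RecordColdDeployAck records a cold deploy acknowledgment.
func (*Collector) RecordPatchBytes ¶
func (c *Collector) RecordPatchBytes(route string, bytes int)
RecordPatchBytes records patch bytes by route.
func (*Collector) RecordPatchMismatch ¶
func (c *Collector) RecordPatchMismatch(reason string)
RecordPatchMismatch records a patch mismatch by reason.
func (*Collector) RecordPersistBytes ¶
func (c *Collector) RecordPersistBytes(stableID string, bytes int64)
RecordPersistBytes records persisted bytes by stable ID.
func (*Collector) RecordPersistWriteRejected ¶
func (c *Collector) RecordPersistWriteRejected(reason, stableID string)
RecordPersistWriteRejected records a rejected persist write by reason and stable ID.
func (*Collector) RecordResourceDuration ¶
func (c *Collector) RecordResourceDuration(resource string, ms int64)
RecordResourceDuration records the duration of a resource in milliseconds.
func (*Collector) RecordResourceError ¶
func (c *Collector) RecordResourceError(resource string)
RecordResourceError records a resource error.
func (*Collector) RecordResumeFailure ¶
func (c *Collector) RecordResumeFailure(reason string)
RecordResumeFailure records a failed resume attempt.
func (*Collector) RecordResumeSuccess ¶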
func (c *Collector) RecordResumeSuccess()
RecordResumeSuccess records a successful resume.
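func (*Collector) RecordSchemaMismatch ¶
func (c *Collector) RecordSchemaMismatch(reason string)
RecordSchemaMismatch records a schema mismatch by reason.
func (*Collector) RecordSessionActive ¶
func (c *Collector) RecordSessionActive(delta int)
RecordSessionActive adjusts the active session count by delta.
func (*Collector) RecordSessionDetached ¶
func (c *Collector) RecordSessionDetached(delta int)
RecordSessionDetached adjusts the detached session count by delta.
func (*Collector) RecordSessionPersistBytes ¶
func (c *Collector) RecordSessionPersistBytes(sessionID string, bytes int64)
RecordSessionPersistBytes records persisted bytes per session.
func (*Collector) RecordSignalWriteViolation ¶
func (c *Collector) RecordSignalWriteViolation(operation, reason string)
RecordSignalWriteViolation records an invalid signal write attempt.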
type MetricsConfig ¶
type MetricsConfig struct {
	// Namespace is the metrics namespace (default: "vango").
	Namespace string

	// Subsystem is the metrics subsystem (default: "").
	Subsystem string

	// ConstLabels are constant labels added to all metrics.
	ConstLabels prometheus.Labels

	// Buckets are the histogram buckets for event duration.
	// Default: prometheus.DefBuckets
	Buckets []float64

	// Registry is the Prometheus registry to use.
	// Default: prometheus.DefaultRegisterer
	Registry prometheus.Registerer
}
MetricsConfig configures the Prometheus metrics middleware.
type MetricsOption ¶
type MetricsOption func(*MetricsConfig)
MetricsOption configures the Prometheus metrics middleware.
func WithBuckets ¶
func WithBuckets(buckets []float64) MetricsOption
WithBuckets sets the histogram buckets.
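Bucket boundaries are upper bounds, and for the event duration histogram they are in seconds. When the default buckets do not match an application's latency profile, an exponential series is a common choice. The sketch below builds one with the standard library only; exponentialBuckets is a hypothetical helper. The Prometheus client library provides prometheus.ExponentialBuckets with the same parameters, which would normally be used instead.

```go
package main

import "fmt"

// exponentialBuckets returns count upper bounds, starting at start and
// multiplying by factor at each step. It returns nil for arguments that
// cannot produce a strictly increasing series of positive bounds.
func exponentialBuckets(start, factor float64, count int) []float64 {
	if start <= 0 || factor <= 1 || count < 1 {
		return nil
	}
	buckets := make([]float64, count)
	for i := range buckets {
		buckets[i] = start
		start *= factor
	}
	return buckets
}

func main() {
	// Eleven buckets from 1ms upward, doubling each time (values in seconds).
	fmt.Println(exponentialBuckets(0.001, 2, 11))
}
```

The resulting slice can be passed to middleware.WithBuckets.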
func WithConstLabels ¶
func WithConstLabels(labels prometheus.Labels) MetricsOption
WithConstLabels sets constant labels for all metrics.
func WithNamespace ¶
func WithNamespace(namespace string) MetricsOption
WithNamespace sets the metrics namespace.
func WithRegistry ¶
func WithRegistry(registry prometheus.Registerer) MetricsOption
WithRegistry sets the Prometheus registry.
func WithSubsystem ¶
func WithSubsystem(subsystem string) MetricsOption
WithSubsystem sets the metrics subsystem.
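MetricsOption follows the functional options pattern: each option is a function that mutates the config, and the constructor applies them in order over a set of defaults. The sketch below mirrors that shape with local types so it runs on its own. The config, option, withNamespace, withSubsystem, and newConfig names are hypothetical; how Prometheus applies options internally is not shown in this documentation, so the ordering behavior here is an assumption of the pattern, not a statement about the package.

```go
package main

import "fmt"

// config mirrors two fields of MetricsConfig for illustration only.
type config struct {
	Namespace string
	Subsystem string
}

// option has the same shape as MetricsOption.
type option func(*config)

func withNamespace(ns string) option { return func(c *config) { c.Namespace = ns } }
func withSubsystem(s string) option  { return func(c *config) { c.Subsystem = s } }

// newConfig starts from defaults and applies options in order, so a
// later option overrides an earlier one that sets the same field.
func newConfig(opts ...option) config {
	c := config{Namespace: "vango"} // documented default namespace
	for _, opt := range opts {
		opt(&c)
	}
	return c
}

func main() {
	fmt.Printf("%+v\n", newConfig())
	fmt.Printf("%+v\n", newConfig(withNamespace("myapp"), withSubsystem("web")))
}
```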
type OTelConfig ¶
type OTelConfig struct {
	// TracerName is the name of the tracer (default: "vango").
	TracerName string

	// IncludeUserID includes the user ID in traces if available.
	// May contain sensitive information - disabled by default.
	IncludeUserID bool

	// IncludeRoute includes the current route in traces.
	// Enabled by default.
	IncludeRoute bool

	// Filter determines which events to trace.
	// Return true to trace the event, false to skip.
	// If nil, all events are traced.
	Filter func(ctx server.Ctx) bool

	// AttributeExtractor extracts custom attributes from the context.
	// Called for each traced event.
	AttributeExtractor func(ctx server.Ctx) []attribute.KeyValue
	// contains filtered or unexported fields
}
OTelConfig configures the OpenTelemetry middleware.
type OTelOption ¶
type OTelOption func(*OTelConfig)
OTelOption configures the OpenTelemetry middleware.
func WithAttributeExtractor ¶
func WithAttributeExtractor(extractor func(ctx server.Ctx) []attribute.KeyValue) OTelOption
WithAttributeExtractor sets a custom attribute extractor.
func WithEventFilter ¶
func WithEventFilter(filter func(ctx server.Ctx) bool) OTelOption
WithEventFilter sets a filter function for events.
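The filter returns true to trace an event and false to skip it, as described on OTelConfig.Filter. When more than one path should be excluded, the predicate can be built from a list. The sketch below shows one way to do that; pathCtx is a hypothetical stand-in for the single server.Ctx method used here (Path, as in the overview example), and skipPaths and fakeCtx are illustrative names.

```go
package main

import (
	"fmt"
	"strings"
)

// pathCtx is a hypothetical stand-in for the part of server.Ctx used here.
type pathCtx interface{ Path() string }

// skipPaths returns a filter that traces every event except those whose
// path matches one of the exact paths or starts with one of the prefixes.
// A return value of true means "trace this event".
func skipPaths(exact, prefixes []string) func(pathCtx) bool {
	skip := make(map[string]struct{}, len(exact))
	for _, p := range exact {
		skip[p] = struct{}{}
	}
	return func(ctx pathCtx) bool {
		p := ctx.Path()
		if _, ok := skip[p]; ok {
			return false
		}
		for _, pre := range prefixes {
			if strings.HasPrefix(p, pre) {
				return false
			}
		}
		return true
	}
}

// fakeCtx is a test double that reports a fixed path.
type fakeCtx string

func (f fakeCtx) Path() string { return string(f) }

func main() {
	filter := skipPaths([]string{"/healthz", "/readyz"}, []string{"/internal/"})
	for _, p := range []string{"/healthz", "/internal/debug", "/dashboard"} {
		fmt.Println(p, filter(fakeCtx(p)))
	}
}
```

With the real type, the returned function would take server.Ctx and be passed to middleware.WithEventFilter.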
func WithIncludeRoute ¶
func WithIncludeRoute(include bool) OTelOption
WithIncludeRoute enables or disables including the route in traces.
func WithIncludeUserID ¶
func WithIncludeUserID(include bool) OTelOption
WithIncludeUserID enables or disables including the user ID in traces.
func WithTracerName ¶
func WithTracerName(name string) OTelOption
WithTracerName sets the tracer name.