workers

package
v0.1.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 28, 2026 License: MIT Imports: 12 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type BaseWorker

type BaseWorker struct {
	// contains filtered or unexported fields
}

func NewBaseWorker

func NewBaseWorker(name string, nc *nats.Conn, js nats.JetStreamContext, stream, consumer, subject string) *BaseWorker

func (*BaseWorker) HealthCheck

func (w *BaseWorker) HealthCheck() error

func (*BaseWorker) Name

func (w *BaseWorker) Name() string

func (*BaseWorker) Stop

func (w *BaseWorker) Stop(ctx context.Context) error

type CommandWorker

type CommandWorker struct {
	*BaseWorker
}

func NewCommandWorker

func NewCommandWorker(nc *nats.Conn, js nats.JetStreamContext) *CommandWorker

func (*CommandWorker) Start

func (w *CommandWorker) Start(ctx context.Context) error

type EntityRegistry

type EntityRegistry struct {
	// contains filtered or unexported fields
}

EntityRegistry maintains an in-memory set of registered entity IDs This prevents excessive database reads for telemetry validation

func NewEntityRegistry

func NewEntityRegistry(db *sql.DB) (*EntityRegistry, error)

NewEntityRegistry creates a new entity registry and loads existing entities from DB

func (*EntityRegistry) Count

func (r *EntityRegistry) Count() int

Count returns the number of registered entities

func (*EntityRegistry) GetAll

func (r *EntityRegistry) GetAll() []string

GetAll returns all registered entity IDs

func (*EntityRegistry) InitializeKVStoreFromDB

func (r *EntityRegistry) InitializeKVStoreFromDB(kv nats.KeyValue) error

InitializeKVStoreFromDB ensures all entities in the database have a corresponding KV entry This is called on boot to populate the KV store with initial entity states

func (*EntityRegistry) IsRegistered

func (r *EntityRegistry) IsRegistered(entityID string) bool

IsRegistered checks if an entity_id is in the registry

func (*EntityRegistry) LoadFromDB

func (r *EntityRegistry) LoadFromDB() error

LoadFromDB loads all entity IDs from the database into memory

func (*EntityRegistry) Register

func (r *EntityRegistry) Register(entityID string)

Register adds an entity_id to the registry

func (*EntityRegistry) Unregister

func (r *EntityRegistry) Unregister(entityID string)

Unregister removes an entity_id from the registry

type EntityWorker

type EntityWorker struct {
	*BaseWorker
}

func NewEntityWorker

func NewEntityWorker(nc *nats.Conn, js nats.JetStreamContext) *EntityWorker

func (*EntityWorker) Start

func (w *EntityWorker) Start(ctx context.Context) error

type EventWorker

type EventWorker struct {
	*BaseWorker
	// contains filtered or unexported fields
}

func NewEventWorker

func NewEventWorker(nc *nats.Conn, js nats.JetStreamContext, db *sql.DB, registry *EntityRegistry) *EventWorker

func (*EventWorker) Start

func (w *EventWorker) Start(ctx context.Context) error

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

func NewManager

func NewManager(natsClient *embeddednats.EmbeddedNATS, db *sql.DB) (*Manager, error)

NewManager creates a worker manager with database and KV store access

func (*Manager) GetJetStream added in v0.0.12

func (m *Manager) GetJetStream() nats.JetStreamContext

GetJetStream returns the JetStream context

func (*Manager) GetKeyValue added in v0.0.12

func (m *Manager) GetKeyValue() nats.KeyValue

GetKeyValue returns the KV store

func (*Manager) GetRegistry added in v0.0.12

func (m *Manager) GetRegistry() *EntityRegistry

GetRegistry returns the entity registry

func (*Manager) GetWorkers added in v0.0.12

func (m *Manager) GetWorkers() []Worker

GetWorkers returns all workers for status monitoring

func (*Manager) HealthCheck added in v0.0.18

func (m *Manager) HealthCheck() error

HealthCheck returns the health status of the worker manager.

func (*Manager) Name added in v0.0.18

func (m *Manager) Name() string

Name returns the service name for logging.

func (*Manager) Start

func (m *Manager) Start(ctx context.Context) error

func (*Manager) Stop

func (m *Manager) Stop(ctx context.Context) error

type TelemetryWorker

type TelemetryWorker struct {
	*BaseWorker
	// contains filtered or unexported fields
}

TelemetryWorker processes telemetry messages and maintains global entity state

func NewTelemetryWorker

func NewTelemetryWorker(nc *nats.Conn, js nats.JetStreamContext, db *sql.DB, kv nats.KeyValue, registry *EntityRegistry) *TelemetryWorker

NewTelemetryWorker creates a new telemetry worker with database and KV store access

func (*TelemetryWorker) Start

func (w *TelemetryWorker) Start(ctx context.Context) error

type Worker

type Worker interface {
	Start(ctx context.Context) error
	Stop(ctx context.Context) error
	Name() string
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL