Documentation
¶
Index ¶
- type BaseWorker
- type CommandWorker
- type EntityRegistry
- func (r *EntityRegistry) Count() int
- func (r *EntityRegistry) GetAll() []string
- func (r *EntityRegistry) InitializeKVStoreFromDB(kv nats.KeyValue) error
- func (r *EntityRegistry) IsRegistered(entityID string) bool
- func (r *EntityRegistry) LoadFromDB() error
- func (r *EntityRegistry) Register(entityID string)
- func (r *EntityRegistry) Unregister(entityID string)
- type EntityWorker
- type EventWorker
- type Manager
- func (m *Manager) GetJetStream() nats.JetStreamContext
- func (m *Manager) GetKeyValue() nats.KeyValue
- func (m *Manager) GetRegistry() *EntityRegistry
- func (m *Manager) GetWorkers() []Worker
- func (m *Manager) HealthCheck() error
- func (m *Manager) Name() string
- func (m *Manager) Start(ctx context.Context) error
- func (m *Manager) Stop(ctx context.Context) error
- type TelemetryWorker
- type Worker
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type BaseWorker ¶
type BaseWorker struct {
// contains filtered or unexported fields
}
func NewBaseWorker ¶
func NewBaseWorker(name string, nc *nats.Conn, js nats.JetStreamContext, stream, consumer, subject string) *BaseWorker
func (*BaseWorker) HealthCheck ¶
func (w *BaseWorker) HealthCheck() error
func (*BaseWorker) Name ¶
func (w *BaseWorker) Name() string
type CommandWorker ¶
type CommandWorker struct {
*BaseWorker
}
func NewCommandWorker ¶
func NewCommandWorker(nc *nats.Conn, js nats.JetStreamContext) *CommandWorker
type EntityRegistry ¶
type EntityRegistry struct {
// contains filtered or unexported fields
}
EntityRegistry maintains an in-memory set of registered entity IDs This prevents excessive database reads for telemetry validation
func NewEntityRegistry ¶
func NewEntityRegistry(db *sql.DB) (*EntityRegistry, error)
NewEntityRegistry creates a new entity registry and loads existing entities from DB
func (*EntityRegistry) Count ¶
func (r *EntityRegistry) Count() int
Count returns the number of registered entities
func (*EntityRegistry) GetAll ¶
func (r *EntityRegistry) GetAll() []string
GetAll returns all registered entity IDs
func (*EntityRegistry) InitializeKVStoreFromDB ¶
func (r *EntityRegistry) InitializeKVStoreFromDB(kv nats.KeyValue) error
InitializeKVStoreFromDB ensures all entities in the database have a corresponding KV entry This is called on boot to populate the KV store with initial entity states
func (*EntityRegistry) IsRegistered ¶
func (r *EntityRegistry) IsRegistered(entityID string) bool
IsRegistered checks if an entity_id is in the registry
func (*EntityRegistry) LoadFromDB ¶
func (r *EntityRegistry) LoadFromDB() error
LoadFromDB loads all entity IDs from the database into memory
func (*EntityRegistry) Register ¶
func (r *EntityRegistry) Register(entityID string)
Register adds an entity_id to the registry
func (*EntityRegistry) Unregister ¶
func (r *EntityRegistry) Unregister(entityID string)
Unregister removes an entity_id from the registry
type EntityWorker ¶
type EntityWorker struct {
*BaseWorker
}
func NewEntityWorker ¶
func NewEntityWorker(nc *nats.Conn, js nats.JetStreamContext) *EntityWorker
type EventWorker ¶
type EventWorker struct {
*BaseWorker
// contains filtered or unexported fields
}
func NewEventWorker ¶
func NewEventWorker(nc *nats.Conn, js nats.JetStreamContext, db *sql.DB, registry *EntityRegistry) *EventWorker
type Manager ¶
type Manager struct {
// contains filtered or unexported fields
}
func NewManager ¶
func NewManager(natsClient *embeddednats.EmbeddedNATS, db *sql.DB) (*Manager, error)
NewManager creates a worker manager with database and KV store access
func (*Manager) GetJetStream ¶ added in v0.0.12
func (m *Manager) GetJetStream() nats.JetStreamContext
GetJetStream returns the JetStream context
func (*Manager) GetKeyValue ¶ added in v0.0.12
GetKeyValue returns the KV store
func (*Manager) GetRegistry ¶ added in v0.0.12
func (m *Manager) GetRegistry() *EntityRegistry
GetRegistry returns the entity registry
func (*Manager) GetWorkers ¶ added in v0.0.12
GetWorkers returns all workers for status monitoring
func (*Manager) HealthCheck ¶ added in v0.0.18
HealthCheck returns the health status of the worker manager.
type TelemetryWorker ¶
type TelemetryWorker struct {
*BaseWorker
// contains filtered or unexported fields
}
TelemetryWorker processes telemetry messages and maintains global entity state
func NewTelemetryWorker ¶
func NewTelemetryWorker(nc *nats.Conn, js nats.JetStreamContext, db *sql.DB, kv nats.KeyValue, registry *EntityRegistry) *TelemetryWorker
NewTelemetryWorker creates a new telemetry worker with database and KV store access