hub

package module
v0.0.0-...-9a4bc85 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 29, 2025 License: Apache-2.0 Imports: 11 Imported by: 0

README

Hub

A Go library that embeds a NATS server and provides high-level abstractions for messaging, JetStream, Key-Value Store, and Object Store operations.

Features

  • Embedded NATS Server - Run a full NATS server in-process
  • Volatile Messaging - Standard publish/subscribe and request-reply patterns
  • Persistent Messaging - JetStream with durable/ephemeral subscriptions
  • Key-Value Store - Distributed key-value storage with TTL support
  • Object Store - Large object storage with metadata
  • Clustering - Built-in support for NATS clustering

Installation

go get git.sr.ht/~hyper/hub

Quick Start

package main

import (
    "fmt"
    "log"
    
    "git.sr.ht/~hyper/hub"
)

func main() {
    // Create hub with default options
    opts, err := hub.DefaultNodeOptions()
    if err != nil {
        log.Fatal(err)
    }
    
    h, err := hub.NewHub(opts)
    if err != nil {
        log.Fatal(err)
    }
    defer h.Shutdown()
    
    // Subscribe to messages
    cancel, err := h.SubscribeVolatileViaFanout("greetings", 
        func(subject string, msg []byte) ([]byte, bool) {
            fmt.Printf("Received: %s\n", string(msg))
            return nil, false
        }, 
        func(err error) {
            log.Printf("Error: %v", err)
        })
    if err != nil {
        log.Fatal(err)
    }
    defer cancel()
    
    // Publish a message
    err = h.PublishVolatile("greetings", []byte("Hello, World!"))
    if err != nil {
        log.Fatal(err)
    }
}

Usage

Configuration Options

Hub provides three default configurations:

Basic Node (Full Features)
opts, err := hub.DefaultNodeOptions()
  • Full JetStream support
  • Clustering capability
  • Best for: standalone servers, development
Gateway Node (Network Bridge)
// import "net/url"
opts, err := hub.DefaultGatewayOptions()
remoteGatewayURL, _ := url.Parse("nats://remote:7222")
opts.GatewayRoutes = []struct {
    Name string
    URL  *url.URL
}{
    {Name: "remote", URL: remoteGatewayURL},
}
  • Connects different networks
  • Best for: multi-datacenter, network isolation
Edge Node (Lightweight)
// import "net/url"
opts, err := hub.DefaultEdgeOptions()
hubURL, _ := url.Parse("nats://central:7422")
opts.LeafNodeRoutes = []*url.URL{hubURL}
  • No JetStream (uses central hub)
  • Minimal resource usage
  • Best for: IoT devices, edge computing
Messaging Patterns
1. Volatile Publish/Subscribe
// Subscribe
cancel, _ := h.SubscribeVolatileViaFanout("events", handler, errorHandler)
defer cancel()

// Publish
h.PublishVolatile("events", []byte("data"))
2. Queue Groups (Load Balancing)
// Multiple workers - only one receives each message
cancel, _ := h.SubscribeVolatileViaQueue("tasks", "workers", handler, errorHandler)

h.PublishVolatile("tasks", []byte("task data"))
3. Request/Reply
// Responder
h.SubscribeVolatileViaFanout("service", func(subject string, msg []byte) ([]byte, bool) {
    return []byte("response"), true // true = send response
}, errorHandler)

// Requester (import "time")
response, _ := h.RequestVolatile("service", []byte("request"), 5*time.Second)
4. Persistent Messaging (JetStream)
// Create stream (import "time")
config := &hub.PersistentConfig{
    Subjects: []string{"orders.>"},
    MaxMsgs:  10000,
    MaxAge:   24 * time.Hour,
}
h.CreateOrUpdatePersistent(config)

// Durable consumer (survives restart)
cancel, _ := h.SubscribePersistentViaDurable("consumer-1", "orders.new", 
    func(subject string, msg []byte) ([]byte, bool, bool) {
        // Process message
        return nil, false, true // last bool = ACK
    }, errorHandler)

// Publish persistent message
h.PublishPersistent("orders.new", []byte("order data"))
5. Key-Value Store
// Create KV store (import "time")
kvConfig := hub.KeyValueStoreConfig{
    Bucket:  "settings",
    TTL:     30 * 24 * time.Hour,
}
h.CreateOrUpdateKeyValueStore(kvConfig)

// Store and retrieve
h.PutToKeyValueStore("settings", "user:123", []byte(`{"theme":"dark"}`))
data, revision, _ := h.GetFromKeyValueStore("settings", "user:123")
6. Object Store
// Create object store
objConfig := hub.ObjectStoreConfig{
    Bucket:   "files",
    MaxBytes: hub.NewSizeFromGigabytes(10),
}
h.CreateObjectStore(objConfig)

// Store object with metadata
metadata := map[string]string{"type": "document"}
h.PutToObjectStore("files", "doc.pdf", fileData, metadata)

// Retrieve object
data, _ := h.GetFromObjectStore("files", "doc.pdf")

API Reference

Hub Management
  • NewHub(opts *Options) (*Hub, error) - Create and start hub
  • Shutdown() - Gracefully shutdown
Volatile Messaging
  • SubscribeVolatileViaFanout(subject, handler, errHandler) - Subscribe (all receive)
  • SubscribeVolatileViaQueue(subject, queue, handler, errHandler) - Queue subscribe (one receives)
  • PublishVolatile(subject, msg) - Publish message
  • RequestVolatile(subject, msg, timeout) - Request with response
Persistent Messaging (JetStream)
  • CreateOrUpdatePersistent(config) - Create/update stream
  • SubscribePersistentViaDurable(id, subject, handler, errHandler) - Durable consumer
  • SubscribePersistentViaEphemeral(subject, handler, errHandler) - Ephemeral consumer
  • PublishPersistent(subject, msg) - Publish to stream
Key-Value Store
  • CreateOrUpdateKeyValueStore(config) - Create/update KV store
  • GetFromKeyValueStore(bucket, key) - Get value
  • PutToKeyValueStore(bucket, key, value) - Store value
  • UpdateToKeyValueStore(bucket, key, value, revision) - Update with version check
  • DeleteFromKeyValueStore(bucket, key) - Delete key
  • PurgeKeyValueStore(bucket, key) - Purge all revisions of a key
Object Store
  • CreateObjectStore(config) - Create object store
  • GetFromObjectStore(bucket, key) - Get object
  • PutToObjectStore(bucket, key, data, metadata) - Store object
  • DeleteFromObjectStore(bucket, key) - Delete object

Handler Signatures

// Volatile handler
func(subject string, msg []byte) (response []byte, sendReply bool)

// Persistent handler
func(subject string, msg []byte) (response []byte, sendReply bool, ack bool)

// Error handler
func(error)

Examples

Clustering
// import "net/url"
// Node 1
opts1, _ := hub.DefaultNodeOptions()
opts1.Name = "node1"
h1, _ := hub.NewHub(opts1)

// Node 2
opts2, _ := hub.DefaultNodeOptions()
opts2.Name = "node2"
opts2.Routes = []*url.URL{{Host: "node1:6222"}}
h2, _ := hub.NewHub(opts2)
Edge Computing
// import "net/url"
// Central hub
hubOpts, _ := hub.DefaultNodeOptions()
hubOpts.Name = "central"
central, _ := hub.NewHub(hubOpts)

// Edge node
edgeOpts, _ := hub.DefaultEdgeOptions()
edgeOpts.LeafNodeRoutes = []*url.URL{{Host: "central:7422"}}
edge, _ := hub.NewHub(edgeOpts)

Pattern Selection Guide

Pattern Persistence Use Case
Volatile Pub/Sub Real-time events, notifications
Queue Groups Load balancing, task distribution
Request/Reply RPC, service calls
JetStream Event sourcing, audit logs
Key-Value Configuration, cache
Object Store File storage, large data

Requirements

  • Go 1.24 or later
  • Linux, macOS, or Windows

License

Apache License 2.0 - see LICENSE file for details.

Documentation

Index

Constants

View Source
const (
	SizeBytes     Size = 1
	SizeKilobytes      = 1024 * SizeBytes
	SizeMegabytes      = 1024 * SizeKilobytes
	SizeGigabytes      = 1024 * SizeMegabytes
	SizeTerabytes      = 1024 * SizeGigabytes
	SizePetabytes      = 1024 * SizeTerabytes
	SizeExabytes       = 1024 * SizePetabytes
)
View Source
const HubClusterName = "hubstream"
View Source
const HubIDFile = "hub.id"

Variables

This section is empty.

Functions

This section is empty.

Types

type AuthMethod

type AuthMethod func(username string, password string, token string) bool

type CustomAuthenticator

type CustomAuthenticator struct {
	// contains filtered or unexported fields
}

func NewCustomAuthenticator

func NewCustomAuthenticator(authMethod AuthMethod) *CustomAuthenticator

func (*CustomAuthenticator) Check

type Hub

type Hub struct {
	// contains filtered or unexported fields
}

func NewHub

func NewHub(opt *Options) (*Hub, error)

func (*Hub) CreateObjectStore

func (h *Hub) CreateObjectStore(config ObjectStoreConfig) error

func (*Hub) CreateOrUpdateKeyValueStore

func (h *Hub) CreateOrUpdateKeyValueStore(config KeyValueStoreConfig) error

func (*Hub) CreateOrUpdatePersistent

func (h *Hub) CreateOrUpdatePersistent(cfg *PersistentConfig) error

func (*Hub) DeleteFromKeyValueStore

func (h *Hub) DeleteFromKeyValueStore(bucket, key string) error

func (*Hub) DeleteFromObjectStore

func (h *Hub) DeleteFromObjectStore(bucket, key string) error

func (*Hub) GetFromKeyValueStore

func (h *Hub) GetFromKeyValueStore(bucket, key string) ([]byte, uint64, error)

func (*Hub) GetFromObjectStore

func (h *Hub) GetFromObjectStore(bucket, key string) ([]byte, error)

func (*Hub) PublishPersistent

func (h *Hub) PublishPersistent(subject string, msg []byte) error

func (*Hub) PublishVolatile

func (h *Hub) PublishVolatile(subject string, msg []byte) error

func (*Hub) PurgeKeyValueStore

func (h *Hub) PurgeKeyValueStore(bucket, key string) error

func (*Hub) PutToKeyValueStore

func (h *Hub) PutToKeyValueStore(bucket, key string, value []byte) (uint64, error)

func (*Hub) PutToObjectStore

func (h *Hub) PutToObjectStore(bucket, key string, data []byte, metadata map[string]string) error

func (*Hub) RequestVolatile

func (h *Hub) RequestVolatile(subject string, msg []byte, timeout time.Duration) ([]byte, error)

func (*Hub) Shutdown

func (h *Hub) Shutdown()

func (*Hub) SubscribePersistentViaDurable

func (h *Hub) SubscribePersistentViaDurable(subscriberID string, subject string, handler func(subject string, msg []byte) (response []byte, reply bool, ack bool), errHandler func(error)) (cancel func(), err error)

func (*Hub) SubscribePersistentViaEphemeral

func (h *Hub) SubscribePersistentViaEphemeral(subject string, handler func(subject string, msg []byte) (response []byte, reply bool, ack bool), errHandler func(error)) (cancel func(), err error)

func (*Hub) SubscribeVolatileViaFanout

func (h *Hub) SubscribeVolatileViaFanout(subject string, handler func(subject string, msg []byte) ([]byte, bool), errHandler func(error)) (cancel func(), err error)

func (*Hub) SubscribeVolatileViaQueue

func (h *Hub) SubscribeVolatileViaQueue(subject, queue string, handler func(subject string, msg []byte) ([]byte, bool), errHandler func(error)) (cancel func(), err error)

func (*Hub) UpdateToKeyValueStore

func (h *Hub) UpdateToKeyValueStore(bucket, key string, value []byte, expectedRevision uint64) (uint64, error)

type KeyValueStoreConfig

type KeyValueStoreConfig struct {
	Bucket       string
	Description  string
	MaxValueSize Size
	TTL          time.Duration
	MaxBytes     Size
	Replicas     int
}

type ObjectStoreConfig

type ObjectStoreConfig struct {
	Bucket      string            `json:"bucket"`
	Description string            `json:"description,omitempty"`
	TTL         time.Duration     `json:"max_age,omitempty"`
	MaxBytes    Size              `json:"max_bytes,omitempty"`
	Replicas    int               `json:"num_replicas,omitempty"`
	Metadata    map[string]string `json:"metadata,omitempty"`
}

type Options

type Options struct {
	Name               string
	Host               string
	Port               int
	AuthorizationToken string
	MaxPayload         Size

	Routes []*url.URL

	ClusterHost         string
	ClusterPort         int
	ClusterUsername     string
	ClusterPassword     string
	ClusterConnPoolSize int
	ClusterPingInterval time.Duration

	GatewayHost     string
	GatewayPort     int
	GatewayUsername string
	GatewayPassword string
	GatewayRoutes   []struct {
		Name string
		URL  *url.URL
	}

	LeafNodeHost     string
	LeafNodePort     int
	LeafNodeUsername string
	LeafNodePassword string
	LeafNodeRoutes   []*url.URL

	JetstreamMaxMemory    Size
	JetstreamMaxStorage   Size
	StreamMaxBufferedMsgs int
	StreamMaxBufferedSize int64
	StoreDir              string
	SyncInterval          time.Duration
	SyncAlways            bool

	LogFile      string
	LogSizeLimit int64
	LogMaxFiles  int64
	Syslog       bool
	RemoteSyslog string

	ClientAuthenticationMethod AuthMethod
	RouterAuthenticationMethod AuthMethod
}

func DefaultEdgeOptions

func DefaultEdgeOptions() (*Options, error)

func DefaultGatewayOptions

func DefaultGatewayOptions() (*Options, error)

func DefaultNodeOptions

func DefaultNodeOptions() (*Options, error)

type PersistentConfig

type PersistentConfig struct {
	// Description is an optional description of the stream.
	Description string

	// Subjects is a list of subjects that the stream is listening on.
	// Wildcards are supported. Subjects cannot be set if the stream is
	// created as a mirror.
	Subjects []string

	// Retention defines the message retention policy for the stream.
	// Defaults to LimitsPolicy.
	Retention nats.RetentionPolicy

	// MaxConsumers specifies the maximum number of consumers allowed for
	// the stream.
	MaxConsumers int

	// MaxMsgs is the maximum number of messages the stream will store.
	// After reaching the limit, stream adheres to the discard policy.
	// If not set, server default is -1 (unlimited).
	MaxMsgs int64

	// MaxBytes is the maximum total size of messages the stream will store.
	// After reaching the limit, stream adheres to the discard policy.
	// If not set, server default is -1 (unlimited).
	MaxBytes int64

	// MaxAge is the maximum age of messages that the stream will retain.
	MaxAge time.Duration

	// MaxMsgsPerSubject is the maximum number of messages per subject that
	// the stream will retain.
	MaxMsgsPerSubject int64

	// MaxMsgSize is the maximum size of any single message in the stream.
	MaxMsgSize Size

	// Replicas is the number of stream replicas in clustered JetStream.
	// Defaults to 1, maximum is 5.
	Replicas int

	// NoAck is a flag to disable acknowledging messages received by this
	// stream.
	//
	// If set to true, publish methods from the JetStream client will not
	// work as expected, since they rely on acknowledgements. Core NATS
	// publish methods should be used instead. Note that this will make
	// message delivery less reliable.
	NoAck bool

	// Duplicates is the window within which to track duplicate messages.
	// If not set, server default is 2 minutes.
	Duplicates time.Duration

	// Metadata is an optional set of key/value pairs that can be used to
	// store additional information about the stream.
	Metadata map[string]string
}

type Size

type Size int64

func NewSizeFromBytes

func NewSizeFromBytes(bytes int64) Size

func NewSizeFromExabytes

func NewSizeFromExabytes(eb float64) Size

func NewSizeFromGigabytes

func NewSizeFromGigabytes(gb float64) Size

func NewSizeFromKilobytes

func NewSizeFromKilobytes(kb float64) Size

func NewSizeFromMegabytes

func NewSizeFromMegabytes(mb float64) Size

func NewSizeFromPetabytes

func NewSizeFromPetabytes(pb float64) Size

func NewSizeFromTerabytes

func NewSizeFromTerabytes(tb float64) Size

func (Size) Bytes

func (s Size) Bytes() int64

func (Size) Exabytes

func (s Size) Exabytes() float64

func (Size) Gigabytes

func (s Size) Gigabytes() float64

func (Size) Kilobytes

func (s Size) Kilobytes() float64

func (Size) MarshalText

func (s Size) MarshalText() ([]byte, error)

func (Size) Megabytes

func (s Size) Megabytes() float64

func (Size) Petabytes

func (s Size) Petabytes() float64

func (Size) String

func (s Size) String() string

func (Size) Terabytes

func (s Size) Terabytes() float64

func (*Size) UnmarshalText

func (s *Size) UnmarshalText(text []byte) error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL