kafkalinker

package
v1.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 7, 2025 License: MIT Imports: 17 Imported by: 0

Documentation

Overview

Package kafkalinker provides a Kafka pubsub implementation of the component.Linker interface. It is intended to be used by loaders that link components in different address-spaces over a Kafka broker.

By default, the linker uses the following Kafka configuration:

  • bootstrap.servers: <required>
  • group.id: <required>
  • auto.offset.reset: earliest

Importing the package registers flags with the loader package - always call loader.ParseFlags at the beginning of main() to parse the standard command-line flags for the loader package.

See the variable declarations below and init() function for the list of flags registered by this package.

The linked Kafka topics and subscriptions use github.com/IBM/sarama as the underlying Kafka driver.

New returns a component.Linker that links topics and subscriptions using the global Config as the Sarama configuration. That global config is settable from command-line flags and environment variables. For example:

$ export KAFKA_CONFIG=ChannelBufferSize=100;ChannelBufferSize=2

$ go run github.com/example/my-system/cmd/demo -kafka-config 'Metadata.Full=true' -kafka-config 'Consumer.Offset.Initial=sarama.OffsetOldest'

Index

Constants

This section is empty.

Variables

View Source
var (
	// Brokers is the list of Kafka brokers to connect to.
	Brokers []string // -kafka-brokers | $KAFKA_BROKERS

	// Config is the Kafka config from type sarama.Config to be read from cmd flags
	// or env vars.
	Config = MinimalConfig() // -kafka-config | $KAFKA_CONFIG
)

Functions

func MinimalConfig

func MinimalConfig() *sarama.Config

MinimalConfig returns a *sarama.Config instance with basic settings for connecting to a Kafka cluster. It makes the following modifications to the initial configuration provided by kafkapubsub.MinimalConfig:

  • ClientID: Set to the program name for identification purposes.
  • Kafka Version: Upgraded to V3.2.1.0 from the default V0.11.0.0.
  • Consumer Offsets: Configured to start reading from the earliest available messages (OffsetOldest).

The underlying kafkapubsub.MinimalConfig() also applies these settings:

- Headers Support: Enabled by setting Kafka version to V0.11.0.0. - Producer Returns: Configured to return successes for SyncProducer.

Types

type BrokersFlag

type BrokersFlag []string

BrokersFlag implements flag.Value for a comma-separated list of Kafka brokers.

func (*BrokersFlag) Get

func (f *BrokersFlag) Get() any

Get returns the wrapped list of brokers as a []string.

func (*BrokersFlag) Set

func (f *BrokersFlag) Set(s string) error

Set splits a comma-separated value string into individual broker addresses, updating the wrapped list of brokers.

func (*BrokersFlag) String

func (f *BrokersFlag) String() string

String returns a comma-separated string representation of the list of brokers.

type ConfigFlag

type ConfigFlag sarama.Config

ConfigFlag enhances the Sarama Config by integrating with command-line tools to dynamically set Kafka configuration options. This type allows flexible and reflective manipulation of configuration fields.

func (*ConfigFlag) Set

func (f *ConfigFlag) Set(s string) error

Set assigns a key=value pair to the pointer sarama.Config using reflection.

It supports fields with basic types: string, bool, signed and unsigned integers, floats, []byte, time.Duration, and types that implement encoding.TextUnmarshaler. The sarama.Config.Version field is uniquely supported by parsing according to sarama.KafkaVersion.

The key input must adhere to these rules:

  • Each segment of the path should refer to a field within a struct at the corresponding level.
  • Fields that are not exported cannot be accessed.
  • Pointers must be allocated (non-nil) to allow dereferencing. If a pointer is nil, it indicates the absence of the underlying struct, and the function will not be able to find and set the field.

Attempting to set unsupported types will result in a non-nil error.

If the value "nil" is provided, the field will be set to the zero value for its type, effectively clearing or resetting the field. The function returns an error if the key is not found, is not settable, or the value type is unsupported.

func (*ConfigFlag) String

func (f *ConfigFlag) String() string

The func implements the flag.Value interface, returning a string representation of the flag's default value.

If the config differs from the minimal config, it indicates that. Otherwise, it confirms that the configuration is indeed the default.

type KafkaLinker

type KafkaLinker struct {
	*kafkapubsub.URLOpener
	InstanceId string            // correlates to a Kafka consumer-group
	Aspects    map[string]string // map[aspect]topic
	Interests  map[string]string // map[interest]topic
}

KafkaLinker creates a new kafka-based component.Linker, such that all targets are linked using the same config provided in the given opener.

The provided instance string uniquely identifies an instance of the component intended to use this linker such that all interests opened with the same instance shall cooperate consuming the subscription (i.e., it the name of the consumer-group).

func New

func New(group string, aspects, interests map[string]string) (KafkaLinker, error)

New creates a component.Linker that links components over Kafka, using the global configuration variables registered by this package with the loader package.

The provided group name essentially is a consumer-group in Kafka terminology. The given aspect/interest maps correlate between named targets and their effective topic in the Kafka broker.

func (KafkaLinker) LinkAspect

func (l KafkaLinker) LinkAspect(ctx context.Context, aspect string) (target *pubsub.Topic, err error)

func (KafkaLinker) LinkInterest

func (l KafkaLinker) LinkInterest(ctx context.Context, interest string) (target *pubsub.Subscription, err error)

func (KafkaLinker) OpenAspect

func (l KafkaLinker) OpenAspect(ctx context.Context, topic string) (*pubsub.Topic, error)

OpenAspect opens a topic to the given topic directly, as opposed to looking up the topic name in the preconfigured aspects map.

func (KafkaLinker) OpenInterest

func (l KafkaLinker) OpenInterest(ctx context.Context, topic string) (*pubsub.Subscription, error)

OpenInterest opens a subscription to the given topic directly, as opposed to looking up the topic name in the preconfigured interests map.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL