Documentation
¶
Overview ¶
Package kafkalinker provides a Kafka pubsub implementation of the component.Linker interface. It is intended to be used by loaders that link components in different address-spaces over a Kafka broker.
By default, the linker uses the following Kafka configuration:
- bootstrap.servers: <required>
- group.id: <required>
- auto.offset.reset: earliest
Importing the package registers flags with the loader package - always call loader.ParseFlags at the beginning of main() to parse the standard command-line flags for the loader package.
See the variable declarations below and init() function for the list of flags registered by this package.
The linked Kafka topics and subscriptions use github.com/IBM/sarama as the underlying Kafka driver.
New returns a component.Linker that links topics and subscriptions using the global Config as the Sarama configuration. That global config is settable from command-line flags and environment variables. For example:
$ export KAFKA_CONFIG=ChannelBufferSize=100;ChannelBufferSize=2 $ go run github.com/example/my-system/cmd/demo -kafka-config 'Metadata.Full=true' -kafka-config 'Consumer.Offset.Initial=sarama.OffsetOldest'
Index ¶
- Variables
- func MinimalConfig() *sarama.Config
- type BrokersFlag
- type ConfigFlag
- type KafkaLinker
- func (l KafkaLinker) LinkAspect(ctx context.Context, aspect string) (target *pubsub.Topic, err error)
- func (l KafkaLinker) LinkInterest(ctx context.Context, interest string) (target *pubsub.Subscription, err error)
- func (l KafkaLinker) OpenAspect(ctx context.Context, topic string) (*pubsub.Topic, error)
- func (l KafkaLinker) OpenInterest(ctx context.Context, topic string) (*pubsub.Subscription, error)
Constants ¶
This section is empty.
Variables ¶
var ( // Brokers is the list of Kafka brokers to connect to. Brokers []string // -kafka-brokers | $KAFKA_BROKERS // Config is the Kafka config from type sarama.Config to be read from cmd flags // or env vars. Config = MinimalConfig() // -kafka-config | $KAFKA_CONFIG )
Functions ¶
func MinimalConfig ¶
MinimalConfig returns a *sarama.Config instance with basic settings for connecting to a Kafka cluster. It makes the following modifications to the initial configuration provided by kafkapubsub.MinimalConfig:
- ClientID: Set to the program name for identification purposes.
- Kafka Version: Upgraded to V3.2.1.0 from the default V0.11.0.0.
- Consumer Offsets: Configured to start reading from the earliest available messages (OffsetOldest).
The underlying kafkapubsub.MinimalConfig() also applies these settings:
- Headers Support: Enabled by setting Kafka version to V0.11.0.0. - Producer Returns: Configured to return successes for SyncProducer.
Types ¶
type BrokersFlag ¶
type BrokersFlag []string
BrokersFlag implements flag.Value for a comma-separated list of Kafka brokers.
func (*BrokersFlag) Get ¶
func (f *BrokersFlag) Get() any
Get returns the wrapped list of brokers as a []string.
func (*BrokersFlag) Set ¶
func (f *BrokersFlag) Set(s string) error
Set splits a comma-separated value string into individual broker addresses, updating the wrapped list of brokers.
func (*BrokersFlag) String ¶
func (f *BrokersFlag) String() string
String returns a comma-separated string representation of the list of brokers.
type ConfigFlag ¶
ConfigFlag enhances the Sarama Config by integrating with command-line tools to dynamically set Kafka configuration options. This type allows flexible and reflective manipulation of configuration fields.
func (*ConfigFlag) Set ¶
func (f *ConfigFlag) Set(s string) error
Set assigns a key=value pair to the pointer sarama.Config using reflection.
It supports fields with basic types: string, bool, signed and unsigned integers, floats, []byte, time.Duration, and types that implement encoding.TextUnmarshaler. The sarama.Config.Version field is uniquely supported by parsing according to sarama.KafkaVersion.
The key input must adhere to these rules:
- Each segment of the path should refer to a field within a struct at the corresponding level.
- Fields that are not exported cannot be accessed.
- Pointers must be allocated (non-nil) to allow dereferencing. If a pointer is nil, it indicates the absence of the underlying struct, and the function will not be able to find and set the field.
Attempting to set unsupported types will result in a non-nil error.
If the value "nil" is provided, the field will be set to the zero value for its type, effectively clearing or resetting the field. The function returns an error if the key is not found, is not settable, or the value type is unsupported.
func (*ConfigFlag) String ¶
func (f *ConfigFlag) String() string
The func implements the flag.Value interface, returning a string representation of the flag's default value.
If the config differs from the minimal config, it indicates that. Otherwise, it confirms that the configuration is indeed the default.
type KafkaLinker ¶
type KafkaLinker struct {
*kafkapubsub.URLOpener
InstanceId string // correlates to a Kafka consumer-group
Aspects map[string]string // map[aspect]topic
Interests map[string]string // map[interest]topic
}
KafkaLinker creates a new kafka-based component.Linker, such that all targets are linked using the same config provided in the given opener.
The provided instance string uniquely identifies an instance of the component intended to use this linker such that all interests opened with the same instance shall cooperate consuming the subscription (i.e., it the name of the consumer-group).
func New ¶
func New(group string, aspects, interests map[string]string) (KafkaLinker, error)
New creates a component.Linker that links components over Kafka, using the global configuration variables registered by this package with the loader package.
The provided group name essentially is a consumer-group in Kafka terminology. The given aspect/interest maps correlate between named targets and their effective topic in the Kafka broker.
func (KafkaLinker) LinkAspect ¶
func (KafkaLinker) LinkInterest ¶
func (l KafkaLinker) LinkInterest(ctx context.Context, interest string) (target *pubsub.Subscription, err error)
func (KafkaLinker) OpenAspect ¶
OpenAspect opens a topic to the given topic directly, as opposed to looking up the topic name in the preconfigured aspects map.
func (KafkaLinker) OpenInterest ¶
func (l KafkaLinker) OpenInterest(ctx context.Context, topic string) (*pubsub.Subscription, error)
OpenInterest opens a subscription to the given topic directly, as opposed to looking up the topic name in the preconfigured interests map.