shiva

package module
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 7, 2025 License: Apache-2.0 Imports: 15 Imported by: 0

README

Shiva

Final Fantasy - Shiva

Go Reference Go Report Card License Release Go Version Build Status Coverage Status

Shiva is a Go library/module for working with Kafka. Shiva provides friendly higher level APIs for consuming and producing messages with Kafka. Under the hood Shiva uses the official Confluent Kafka Go client (https://github.com/confluentinc/confluent-kafka-go). Some Go developers are very much opposed to using CGO, and unfortunately, if you are dead set on avoiding CGO, this library may not be for you as it uses confluent-kafka-go, which is a wrapper around librdkafka.

Shiva has a number of features that aim to make working with Kafka in Go easy:

  • High-level and flexible Consumer API for consuming messages from Kafka
  • High-level API for producing messages synchronously and asynchronously to Kafka.
  • Support for OpenTelemetry tracing and metrics
  • Built-in support for dead letter processing when a message cannot be processed
  • Separates the concerns of Kafka from processing messages via the Handler interface

Where Does the Name Shiva Come From?

Shiva is a frequently recurring Ice-elemental summon in the Final Fantasy series. Although enjoying regular appearances throughout the series, Shiva, like most of the popular summonable entities, has not been given a significant back story, being simply described as the "Ice Queen". As naming things can be quite hard, I've started naming my libraries and packages based on video game lore and universes.

Quickstart

Add shiva as a dependency

go get github.com/jkratz55/shiva

The following examples demonstrate using OpenTelemetry for tracing and metrics (via Prometheus) along with utilizing many of the hooks Shiva offers to invoke code on events. Depending on the use case you may not need all the features being shown in the examples below.

Consumer
package main

import (
	"context"
	"fmt"
	"log/slog"
	"net/http"
	"os"
	"time"

	"go.opentelemetry.io/otel/exporters/prometheus"
	"go.opentelemetry.io/otel/sdk/metric"

	"github.com/prometheus/client_golang/prometheus/promhttp"

	"github.com/jkratz55/shiva"
	"github.com/jkratz55/shiva/shivaotel"
)

// ExampleHandler is a Handler implementation that just prints out the message
// key. In the real world you'd add your code/logic to process the message.
// ExampleHandler is a trivial Handler implementation that prints the key of
// every message it receives. In the real world you'd replace this with your
// actual processing logic.
type ExampleHandler struct{}

// Handle simulates a small amount of work and then prints the message key.
// Returning nil tells the Consumer the message was processed successfully.
func (e ExampleHandler) Handle(ctx context.Context, msg shiva.Message) error {
	// Pretend processing takes roughly 10ms.
	time.Sleep(10 * time.Millisecond)

	fmt.Println(msg.Key)
	return nil
}

// DeadLetterHandler is a simple and silly example that simply logs the failed
// message. In some circumstances this may be fine if you have no need to
// re-process the message, but generally you will probably want to save the
// message to a database, disk, or publish it to a retry or dead letter topic
// so it can be retried.
// DeadLetterHandler is a deliberately simplistic example that does nothing
// more than log the message that failed processing. That can be acceptable
// when failed messages never need to be reprocessed, but in practice you
// will usually want to persist the message (database, disk) or publish it
// to a retry or dead letter topic so it can be retried later.
type DeadLetterHandler struct {
	logger *slog.Logger
}

// NewDeadLetterHandler returns a DeadLetterHandler that writes to l.
func NewDeadLetterHandler(l *slog.Logger) *DeadLetterHandler {
	return &DeadLetterHandler{logger: l}
}

// Handle logs the failed message along with the error the Handler returned.
func (d DeadLetterHandler) Handle(ctx context.Context, msg shiva.Message, err error) {
	attrs := []any{
		slog.String("err", err.Error()),
		slog.Group("kafka",
			slog.Any("msg", msg)),
	}
	d.logger.Error("Something went wrong", attrs...)
}

// ConsumerHooks is a type that holds a reference to a logger and has methods
// conforming to all the hooks we care about from the Consumer. We could, of course,
// have had standalone functions, but this makes it cleaner and potentially more
// re-usable.
// ConsumerHooks bundles the logging callbacks wired into the Consumer via
// the WithOnErr, WithOnOffsetsCommitted, WithOnAssigned, and WithOnRevoked
// options. We could, of course, have had standalone functions, but grouping
// them on one type makes the wiring cleaner and potentially more re-usable.
type ConsumerHooks struct {
	logger *slog.Logger
}

// NewConsumerHooks returns a ConsumerHooks that logs hook events to l.
func NewConsumerHooks(l *slog.Logger) *ConsumerHooks {
	return &ConsumerHooks{
		logger: l,
	}
}

// OnErr logs errors surfaced by the Kafka Consumer.
func (ch *ConsumerHooks) OnErr(err error) {
	ch.logger.Error("Kafka Consumer Error",
		slog.String("err", err.Error()))
}

// OnOffsetsCommitted logs the outcome of an offset commit. A non-nil err
// means the commit failed for one or more partitions; any offsets that were
// reported back are still logged afterwards.
func (ch *ConsumerHooks) OnOffsetsCommitted(offsets shiva.TopicPartitions, err error) {
	if err != nil {
		ch.logger.Error("Failed to commit offsets for one or more partitions",
			slog.String("err", err.Error()))
	}

	for _, offset := range offsets {
		ch.logger.Info("Offsets committed to Kafka",
			slog.Group("kafka",
				slog.String("topic", offset.Topic),
				slog.Int("partition", offset.Partition),
				slog.Int64("offset", offset.Offset)))
	}
}

// OnAssigned logs each topic/partition assigned to the Consumer after a
// rebalance.
func (ch *ConsumerHooks) OnAssigned(partitions shiva.TopicPartitions) {
	ch.logger.Info("A rebalance event occurred for consumer group")
	for _, partition := range partitions {
		ch.logger.Info("Consumer was assigned a topic/partition",
			slog.Group("kafka",
				slog.String("topic", partition.Topic),
				slog.Int("partition", partition.Partition)))
	}
}

// OnRevoked logs each topic/partition revoked from the Consumer during a
// rebalance.
func (ch *ConsumerHooks) OnRevoked(partitions shiva.TopicPartitions) {
	ch.logger.Info("A rebalance event occurred for consumer group")
	for _, partition := range partitions {
		// Fixed log message: was "Brokers revoked assigned for topic/partition".
		ch.logger.Info("Broker revoked assignment for topic/partition",
			slog.Group("kafka",
				slog.String("topic", partition.Topic),
				slog.Int("partition", partition.Partition)))
	}
}

func main() {

	// JSON structured logger writing to stderr at debug level; shared by the
	// dead letter handler and all consumer hooks below.
	logger := slog.New(slog.NewJSONHandler(os.Stderr, &slog.HandlerOptions{
		AddSource: true,
		Level:     slog.LevelDebug,
	}))

	// Setup exporter to expose Consumer metrics to Prometheus
	promExporter, err := prometheus.New()
	if err != nil {
		panic(err)
	}
	defer promExporter.Shutdown(context.Background())

	meterProvider := metric.NewMeterProvider(metric.WithReader(promExporter))
	defer meterProvider.Shutdown(context.Background())

	// Setup Kafka configuration. Post-processing acknowledgment means offsets
	// are stored only after the Handler returns (at-least-once delivery).
	kafkaConfig := shiva.KafkaConfig{
		BootstrapServers:       []string{"localhost:9092"},
		GroupID:                "shiva-test",
		AutoOffsetReset:        shiva.Earliest,
		AcknowledgmentStrategy: shiva.AcknowledgmentStrategyPostProcessing,
	}

	var handler shiva.Handler
	handler = &ExampleHandler{}
	hooks := NewConsumerHooks(logger)
	dlHandler := NewDeadLetterHandler(logger)

	// Callback that is invoked whenever the Retry middleware encounters an error
	retryOnErr := func(err error) {
		fmt.Println(err)
	}

	// Initialize the ConsumerTelemetryProvider so we get metrics from the Consumer
	telemetryProvider, err := shivaotel.NewConsumerTelemetryProvider(
		shivaotel.WithMeterProvider(meterProvider))
	if err != nil {
		panic(err)
	}

	// Wrap the ExampleHandler with Retry middleware and only retry if errors are
	// marked as retryable.
	// NOTE(review): handler is both passed to shiva.Retry AND applied again by
	// the returned value — it appears to be wrapped twice. Verify shiva.Retry's
	// signature; this likely should be either shiva.Retry(handler, ...) or
	// shiva.Retry(...)(handler), not both.
	handler = shiva.Retry(handler,
		shiva.WithMaxAttempts(5),
		shiva.WithOnError(retryOnErr),
		shiva.WithRetryableErrorsOnly(true))(handler)

	// Initialize the Consumer with options for dead letter processing, and hooks/callbacks.
	// NewConsumer subscribes to the topic but does not start polling; the
	// consumer joins the group only once Run is called below.
	consumer, err := shiva.NewConsumer(kafkaConfig, "test", handler,
		shiva.WithOnOffsetsCommitted(hooks.OnOffsetsCommitted),
		shiva.WithOnErr(hooks.OnErr),
		shiva.WithOnAssigned(hooks.OnAssigned),
		shiva.WithOnRevoked(hooks.OnRevoked),
		shiva.WithDeadLetterHandler(dlHandler),
		shiva.WithConsumerTelemetryProvider(telemetryProvider),
		shiva.WithName("test-consumer"))
	if err != nil {
		panic(err)
	}

	// Start an http server for Prometheus to scrape
	promServer := http.Server{
		Addr:    ":8082",
		Handler: promhttp.Handler(),
	}
	go func() {
		// Don't ignore the error for real code
		_ = promServer.ListenAndServe()
	}()

	// Run the consumer. Run blocks until Close is called or a fatal error
	// occurs, so it is started on its own goroutine.
	go func() {
		err := consumer.Run()
		if err != nil {
			panic(err)
		}
	}()

	// Block forever, don't do this in real code
	select {}
}

Producer
package main

import (
	"context"
	"log/slog"
	"net/http"
	"os"
	"time"

	"github.com/google/uuid"
	"github.com/prometheus/client_golang/prometheus/promhttp"
	"go.opentelemetry.io/otel/exporters/prometheus"
	"go.opentelemetry.io/otel/sdk/metric"

	"github.com/jkratz55/shiva"
	"github.com/jkratz55/shiva/shivaotel"
)

func main() {
	// JSON structured logger writing to stderr at debug level.
	logger := slog.New(slog.NewJSONHandler(os.Stderr, &slog.HandlerOptions{
		AddSource: true,
		Level:     slog.LevelDebug,
	}))

	// Setup exporter to expose Producer metrics to Prometheus
	promExporter, err := prometheus.New()
	if err != nil {
		panic(err)
	}
	defer promExporter.Shutdown(context.Background())

	meterProvider := metric.NewMeterProvider(metric.WithReader(promExporter))
	defer meterProvider.Shutdown(context.Background())

	// AckLeader: a produce is considered complete once the partition leader
	// has acknowledged it (replicas are not waited on).
	kafkaConfig := shiva.KafkaConfig{
		BootstrapServers: []string{"localhost:9092"},
		RequiredAcks:     shiva.AckLeader,
	}

	// Initialize the ProducerTelemetryProvider so we get metrics from the Producer
	telemetryProvider, err := shivaotel.NewProducerTelemetryProvider(
		shivaotel.WithMeterProvider(meterProvider))
	if err != nil {
		panic(err)
	}

	producer, err := shiva.NewProducer(kafkaConfig,
		shiva.WithProducerTelemetryProvider(telemetryProvider))
	if err != nil {
		panic(err)
	}
	// Flush any in-flight messages (waiting up to 30s) before closing the
	// producer on exit.
	defer func() {
		producer.Flush(time.Second * 30)
		producer.Close()
	}()

	// Produce 100k messages synchronously: M() starts a message builder and
	// Produce blocks until the delivery report is received from Kafka.
	for i := 0; i < 100000; i++ {
		err := producer.M().
			Topic("test").
			Key(uuid.New().String()).
			Value("Hello World!").
			Produce(context.Background())
		if err != nil {
			logger.Error("Failed to produce message",
				slog.String("err", err.Error()))
		}
	}

	// Start an http server for Prometheus to scrape
	promServer := http.Server{
		Addr:    ":8082",
		Handler: promhttp.Handler(),
	}
	go func() {
		// Don't ignore the error for real code
		_ = promServer.ListenAndServe()
	}()

	// Block forever, don't do this in real code
	select {}
}

Documentation

Overview

Package shiva is a thin abstraction over the official Confluent Kafka Go client providing a simple api for consuming and producing messages.

Index

Constants

View Source
const (

	// UnlimitedRetries is the maximum integer value for the platform.
	//
	// Technically, UnlimitedRetries is not unlimited but from a practical perspective it operates
	// as if it were infinite.
	UnlimitedRetries = math.MaxInt
)

Variables

This section is empty.

Functions

func IsNil

func IsNil(v any) bool

IsNil checks if the v is nil

IsNil is useful as it handles the gotcha case of interfaces where an interface is only considered nil if both the type and value are nil. If the type is known but the value is nil, a simple `if v != nil` or `if v == nil` check can produce unexpected results.

func IsRetryable

func IsRetryable(err error) bool

IsRetryable determines if the given error can and/or should be retried.

func Ptr

func Ptr[T any](v T) *T

Ptr returns a pointer to the instance passed in.

func StringPtr

func StringPtr(s string) *string

StringPtr returns a pointer to the string passed in. This is useful since the official confluent-kafka-go library uses pointers to strings in many places like topics and Go doesn't allow you to take the address of a string in a single statement.

func WrapAsRetryable

func WrapAsRetryable(e error) error

WrapAsRetryable wraps an error in a RetryableError, marking it as retryable.

Types

type AcknowledgmentStrategy

type AcknowledgmentStrategy string

AcknowledgmentStrategy defines the strategy for acknowledging messages.

const (
	// AcknowledgmentStrategyNone indicates messages are not acknowledged.
	AcknowledgmentStrategyNone AcknowledgmentStrategy = "none"

	// AcknowledgmentStrategyPreProcessing indicates messages are acknowledged after they've been
	// polled from Kafka but before the message is passed to the Handler to be processed. This is
	// often referred to as `at most once` delivery.
	AcknowledgmentStrategyPreProcessing AcknowledgmentStrategy = "pre-processing"

	// AcknowledgmentStrategyPostProcessing indicates messages are acknowledged only after the Handler
	// returns. This is often referred to as `at least once` delivery.
	AcknowledgmentStrategyPostProcessing AcknowledgmentStrategy = "post-processing"
)

func ParseAcknowledgmentStrategy

func ParseAcknowledgmentStrategy(strategy string) (AcknowledgmentStrategy, error)

ParseAcknowledgmentStrategy parses a string into an AcknowledgmentStrategy or returns an error if unsupported.

func (*AcknowledgmentStrategy) String

func (a *AcknowledgmentStrategy) String() string

func (*AcknowledgmentStrategy) UnmarshalText

func (a *AcknowledgmentStrategy) UnmarshalText(text []byte) error

UnmarshalText converts the provided text bytes into an AcknowledgmentStrategy value or returns an error if invalid.

type AutoOffsetReset

type AutoOffsetReset string
const (
	Earliest AutoOffsetReset = "earliest"
	Latest   AutoOffsetReset = "latest"
)

func ParseAutoOffsetReset

func ParseAutoOffsetReset(s string) (AutoOffsetReset, error)

func (*AutoOffsetReset) String

func (a *AutoOffsetReset) String() string

func (*AutoOffsetReset) UnmarshalText

func (a *AutoOffsetReset) UnmarshalText(text []byte) error

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer provides a high-level API for consuming messages from Kafka.

Consumer is a wrapper around the official Confluent Kafka GO client, which is a wrapper around librdkafka.

The zero-value for Consumer is not usable. Instances of Consumer should always be created/initialized using NewConsumer.

func NewConsumer

func NewConsumer(conf KafkaConfig, topic string, handler Handler, opts ...ConsumerOption) (*Consumer, error)

NewConsumer creates and initializes a Consumer instance.

NewConsumer initializes a Consumer and subscribes to the provided topic. NewConsumer does not start polling from Kafka, thus after initialization the Consumer will not join the consumer group and have topics/partitions assigned.

func (*Consumer) Assignment

func (c *Consumer) Assignment() (TopicPartitions, error)

Assignment returns the topics and partitions currently assigned to the Consumer.

Note that until the Consumer is running and polling, the Consumer won't have joined the group and won't have any assignments.

func (*Consumer) Close

func (c *Consumer) Close()

Close stops the Consumer and releases the underlying resources.

After Close the Consumer is not usable.

func (*Consumer) Commit

func (c *Consumer) Commit() (TopicPartitions, error)

Commit commits the highest currently stored offsets for each topic/partition back to Kafka.

func (*Consumer) Committed

func (c *Consumer) Committed(timeout time.Duration) (TopicPartitions, error)

Committed queries the Kafka brokers to get the latest committed offsets for the topics/partitions assigned to the Consumer.

func (*Consumer) GetWatermarkOffsets

func (c *Consumer) GetWatermarkOffsets() (map[string]Watermark, error)

GetWatermarkOffsets gets the lowest and highest offsets for each partition currently assigned to the Consumer.

The watermarks are returned as a map where the key is topic|partition.

Note that GetWatermarkOffsets return the watermarks based on the local client state. This is fine for most use-cases but if you need the absolute most up-to-date watermarks, use QueryWatermarkOffsets instead as it queries the Kafka brokers.

func (*Consumer) IsClosed

func (c *Consumer) IsClosed() bool

IsClosed indicates if the Consumer is closed.

func (*Consumer) IsRunning

func (c *Consumer) IsRunning() bool

IsRunning indicates if the Consumer is running.

func (*Consumer) Lag

func (c *Consumer) Lag() (map[string]int64, error)

Lag returns the current lag for each topic/partition assigned to the Consumer.

The lag is returned as a map[topic|partition]offset.

func (*Consumer) Pause

func (c *Consumer) Pause() error

Pause consumption for the topics and partitions currently assigned to the Consumer.

Note that messages already enqueued on the consumer's Event channel will NOT be purged by this call and will still be processed. Additionally, if a rebalance is triggered, the Consumer will resume fetching/polling from Kafka as the pause state is not persisted between rebalances.

func (*Consumer) Position

func (c *Consumer) Position() (TopicPartitions, error)

Position returns the next Offset the Consumer will read from for each topic/partition assigned to the Consumer.

Note that until the Consumer is running and polling, the Consumer won't have joined the group and won't have any assignments. Additionally, the offsets returned are based on the local state of the client and not what has been committed to Kafka.

func (*Consumer) QueryWatermarkOffsets

func (c *Consumer) QueryWatermarkOffsets(ctx context.Context) (map[string]Watermark, error)

QueryWatermarkOffsets fetches the lowest and highest offsets for each partition currently assigned to the Consumer from the Kafka brokers.

The watermarks are returned as a map where the key is topic|partition.

func (*Consumer) Resume

func (c *Consumer) Resume() error

Resume resumes consuming the topics/partitions previously paused that are assigned to the Consumer instance.

Note that calling Resume on topics/partitions that are not paused is a no-op.

func (*Consumer) Run

func (c *Consumer) Run() error

Run polls events from Kafka until either Close is called or the Consumer encounters a fatal error from which it cannot recover.

Run is blocking and will almost always be called in a separate goroutine. When Close is invoked or a fatal error occurs, the Consumer will attempt to commit any uncommitted offsets back to the Kafka brokers. If the Consumer encountered a fatal error or failed to commit offsets while closing a non-nil error value will be returned.

func (*Consumer) Subscription

func (c *Consumer) Subscription() ([]string, error)

Subscription returns the current topic the Consumer is subscribed to.

type ConsumerOption

type ConsumerOption interface {
	// contains filtered or unexported methods
}

func WithConsumerTelemetryProvider

func WithConsumerTelemetryProvider(provider ConsumerTelemetryProvider) ConsumerOption

WithConsumerTelemetryProvider sets an implementation of ConsumerTelemetryProvider the Consumer will use to record metrics/telemetry.

func WithDeadLetterHandler

func WithDeadLetterHandler(dlh DeadLetterHandler) ConsumerOption

WithDeadLetterHandler sets a DeadLetterHandler that is invoked by the Consumer when the Handler returns an error signaling it failed to process the message.

func WithOnAssigned

func WithOnAssigned(fn func(tps TopicPartitions)) ConsumerOption

WithOnAssigned sets a callback that will be invoked by the Consumer when a rebalance occurs and topics/partitions are assigned.

func WithOnOffsetsCommitted

func WithOnOffsetsCommitted(fn func(offsets TopicPartitions, err error)) ConsumerOption

WithOnOffsetsCommitted sets a callback that will be invoked by the Consumer when offsets are committed back to the Kafka brokers.

func WithOnRevoked

func WithOnRevoked(fn func(tps TopicPartitions)) ConsumerOption

WithOnRevoked sets a callback that will be invoked by the Consumer when a rebalance occurs and topics/partitions are revoked.

func WithStats

func WithStats(interval time.Duration, fn func(stats map[string]any)) ConsumerOption

WithStats enables statistics for the underlying Confluent Kafka / librdkafka client at the provided interval. On the provided interval stats will be fetched, and fn will be invoked with the results.

type ConsumerTelemetryProvider

type ConsumerTelemetryProvider interface {
	RecordMessageProcessed(handler string, topic string)
	RecordRebalance(handler string, groupId string)
	RecordHandlerError(handler string, topic string)
	RecordKafkaError(handler string, topic string, code int)
	RecordHandlerExecutionDuration(handler string, topic string, dur time.Duration)
	RecordLag(handler string, groupId string, topic string, partition string, lag int64)
	Trace(msg Message) (context.Context, func(err error))
}

type DeadLetterHandler

type DeadLetterHandler interface {
	Handle(ctx context.Context, msg Message, err error)
}

A DeadLetterHandler handles messages that could not be successfully processed by a Consumer.

DeadLetterHandler is invoked by the Consumer when the Handler returns an error from processing a Message from Kafka. Implementations of DeadLetterHandler should perform any actions required before the Consumer proceeds to the next Message. This can include logging, instrumentation, persisting the message to a database, or publishing the message to a dead letter topic to retry later.

If the DeadLetterHandler encounters an error handling an unprocessable message it should handle that error if possible and provide any diagnostics such as logging or instrumentation. Handle does not return an error as it is meaningless to the Consumer as it will move on the next event regardless.

type DeadLetterHandlerFunc

type DeadLetterHandlerFunc func(ctx context.Context, msg Message, err error)

The DeadLetterHandlerFunc type is an adapter to allow the use of ordinary functions as a DeadLetterHandler

func (DeadLetterHandlerFunc) Handle

func (d DeadLetterHandlerFunc) Handle(ctx context.Context, msg Message, err error)

type DeliveryReport

type DeliveryReport struct {
	Error     error
	ErrorCode int
	Topic     string
	Partition int
	Offset    int64
	Opaque    interface{}
}

DeliveryReport represents the result of producing a message to Kafka.

You must always check the Error field. If the value of Error is non-nil then the message was not delivered. On error scenarios the ErrorCode will be non-zero if an error code was available.

type Handler

type Handler interface {
	Handle(ctx context.Context, msg Message) error
}

A Handler handles and processes message from Kafka.

When the Consumer receives a message from Kafka it invokes the Handler to handle and process the Message. Implementations of Handler should perform any business rules and logic required for the given Message.

If the Handler fails to process a Message, it can return a non-nil error value to signal to the Consumer it failed to process the Message. The Consumer will then invoke the DeadLetterHandler if one is configured. A nil error value will always be interpreted as the Handler successfully processed the Message.

func WrapHandler

func WrapHandler(h Handler, mw ...Middleware) Handler

WrapHandler applies a chain of Middleware to a Handler, wrapping it in the order provided.

type HandlerFunc

type HandlerFunc func(ctx context.Context, msg Message) error

A HandlerFunc type is an adapter to allow the use of ordinary functions as a Handler.

func (HandlerFunc) Handle

func (f HandlerFunc) Handle(ctx context.Context, msg Message) error
type Header struct {
	Key   string
	Value []byte
}

Header represents a single Kafka message header.

Message headers are made up of a list of Header elements, retaining their original insert order and allowing for duplicate Keys.

Key is a human-readable string identifying the header. Value is the key's binary value, Kafka does not put any restrictions on the format of the Value, but it should be made relatively compact. The value may be a byte array, empty, or nil.

type KafkaConfig

type KafkaConfig struct {
	// The Kafka brokers addresses used to establish the initial connection to
	// Kafka. This is a required field.
	//
	// Applies To: Consumer, Producer
	BootstrapServers []string `env:"SHIVA_KAFKA_BOOTSTRAP_SERVERS,required"`

	// The ID of the consumer group to join. This is a required field when using
	// Consumer.
	//
	// Applies To: Consumer
	GroupID string `env:"SHIVA_KAFKA_GROUP_ID"`

	// Client group session and failure detection timeout. The consumer sends
	// periodic heartbeats to indicate its liveness to the broker. If no heart
	// beats are received by the broker for a group member within the session
	// timeout, the broker will remove the consumer from the group and trigger
	// a rebalance.
	//
	// The unit is milliseconds with a default of 45000 (45 seconds).
	//
	// Applies To: Consumer
	SessionTimeout time.Duration `env:"SHIVA_KAFKA_SESSION_TIMEOUT,default=45s"`

	// Interval at which the consumer sends heartbeats to the broker. The default
	// is 3000 (3 seconds).
	//
	// Applies To: Consumer
	HeartbeatInterval time.Duration `env:"SHIVA_KAFKA_HEARTBEAT_TIMEOUT,default=3s"`

	// The interval between committing offsets back to Kafka brokers. The default
	// is 5000ms (5 seconds).
	//
	// Applies To: Consumer
	CommitInterval time.Duration `env:"SHIVA_KAFKA_COMMIT_INTERVAL,default=5s"`

	// The amount of time to wait for an event when polling from Kafka. The default
	// 100ms.
	//
	// Applies To: Consumer
	PollTimeout time.Duration `env:"SHIVA_KAFKA_POLL_TIMEOUT,default=100ms"`

	// Configures the behavior when there are no stored offsets found for the
	// Consumer group for topic/partition.
	//
	// The default is latest which means that the consumer will start reading
	// from the latest message in the topic/partition.
	//
	// Applies To: Consumer
	AutoOffsetReset AutoOffsetReset `env:"SHIVA_KAFKA_AUTO_OFFSET_RESET,default=earliest"`

	// Configures when the Consumer acknowledges the message. The possible values
	// are:
	//
	//  none - Messages will not be acknowledged and no offsets will be tracked
	//  pre-processing - Messages will be acknowledged when received but before
	//                   the Handler has been invoked. This follows the at-most-once
	//                   delivery model.
	//  post-processing - Messages will be acknowledged only after the Handler has
	//                    returned, regardless if it succeeded or returned an error.
	//                    This follows the at-least-once delivery model.
	//
	// The default is post-processing.
	AcknowledgmentStrategy AcknowledgmentStrategy `env:"SHIVA_KAFKA_CONSUMER_ACK_STRATEGY,default=post-processing"`

	// The maximum size for a message. The default is 1048576 (1MB).
	//
	// Applies To: Consumer, Producer
	MessageMaxBytes int `env:"SHIVA_KAFKA_MESSAGE_MAX_BYTES,default=1048576"`

	// Maximum amount of data the broker shall return for a Fetch request. Messages
	// are fetched in batches by the consumer. The default is 52428800 (50MB).
	//
	// Applies To: Consumer
	MaxFetchBytes int `env:"KAFKA_MAX_FETCH_BYTES, default=52428800"`

	// The security protocol used to communicate with the brokers.
	//
	// Valid values are: plaintext, ssl, sasl_plaintext, sasl_ssl.
	//
	// Applies To: Consumer, Producer
	SecurityProtocol SecurityProtocol `env:"SHIVA_KAFKA_SECURITY_PROTOCOL,default=plaintext"`

	// The location of the certificate authority file used to verify the brokers.
	//
	// Applies To: Consumer, Producer
	CertificateAuthorityLocation string `env:"SHIVA_KAFKA_CERT_AUTHORITY_LOCATION"`

	// The location of the client certificate used to authenticate with the brokers.
	//
	// Applies To: Consumer, Producer
	CertificateLocation string `env:"SHIVA_KAFKA_CERT_LOCATION"`

	// The location of the key for the client certificate.
	//
	// Applies To: Consumer, Producer
	CertificateKeyLocation string `env:"SHIVA_KAFKA_CERT_KEY_LOCATION"`

	// The password for the key used for the client certificate.
	//
	// Applies To: Consumer, Producer
	CertificateKeyPassword string `env:"SHIVA_KAFKA_CERT_KEY_PASSWORD"`

	// Skip TLS verification when using SSL or SASL_SSL.
	//
	// Applies To: Consumer, Producer
	SkipTlsVerification bool `env:"SHIVA_KAFKA_SKIP_TLS_VERIFICATION,default=false"`

	// The SASL mechanism to use for SASL authentication.
	//
	// Applies To: Consumer, Producer
	SASLMechanism SaslMechanism `env:"SHIVA_KAFKA_SASL_MECHANISM,default=PLAIN"`

	// The username for authenticating with SASL.
	//
	// Applies To: Consumer, Producer
	SASLUsername string `env:"SHIVA_KAFKA_SASL_USERNAME"`

	// The password for authenticating with SASL.
	//
	// Applies To: Consumer, Producer
	SASLPassword string `env:"SHIVA_KAFKA_SASL_PASSWORD"`

	// The number of acknowledgements the producer requires the leader to have
	// received before considering a request complete. The default value is
	// AckAll ("all"), which will wait for all in-sync replicas to acknowledge
	// before proceeding.
	//
	// Applies To: Producer
	RequiredAcks RequiredAck `env:"SHIVA_KAFKA_PRODUCER_REQUIRED_ACKS, default=all"`

	// When set to `true`, the producer will ensure that messages are successfully
	// produced exactly once and in the original produce order.
	//
	// The following configuration properties are adjusted automatically (if not modified
	// by the user) when idempotence is enabled: `max.in.flight.requests.per.connection=5`
	// (must be less than or equal to 5), `retries=INT32_MAX` (must be greater than 0),
	// `acks=all`, `queuing.strategy=fifo`. Producer instantiation will fail if user-supplied
	// configuration is incompatible.
	//
	// Applies To: Producer
	Idempotence bool `env:"SHIVA_KAFKA_PRODUCER_IDEMPOTENCE, default=false"`

	// The transactional ID to use for messages produced by the producer. This
	// is used to ensure that messages are produced atomically and in order. This
	// is required when using transactions.
	//
	// Applies To: Producer
	TransactionID string `env:"SHIVA_KAFKA_PRODUCER_TRANSACTION_ID"`
}

KafkaConfig represents the configuration settings for a Kafka client, including connection, authentication, and behavior options.

func ConfigFromEnv

func ConfigFromEnv() (*KafkaConfig, error)

ConfigFromEnv loads the configuration for the Kafka client from the environment.

type Logger

type Logger interface {
	Debug(msg string, kvs ...interface{})
	Info(msg string, kvs ...interface{})
	Warn(msg string, kvs ...interface{})
	Error(msg string, kvs ...interface{})
}

type Message

type Message struct {
	Topic     string
	Partition int
	Offset    int64
	Key       []byte
	Value     []byte
	Timestamp time.Time
	Headers   []Header
	Opaque    interface{}
}

Message represents a single message from Kafka.

type MessageBuilder

type MessageBuilder struct {
	// contains filtered or unexported fields
}

MessageBuilder represents a Kafka message and provides a fluent API for constructing a message to be produced to Kafka.

func (*MessageBuilder) Err

func (m *MessageBuilder) Err() error

Err returns the last error that occurred while building the message or nil if there were no errors.

func (*MessageBuilder) Header

func (m *MessageBuilder) Header(key string, value []byte) *MessageBuilder

Header adds a header to the message.

func (*MessageBuilder) JSON

func (m *MessageBuilder) JSON(v any) *MessageBuilder

JSON serializes the provided value to JSON and sets it as the value for the message.

func (*MessageBuilder) Key

Key sets the key for the message.

func (*MessageBuilder) Message

func (m *MessageBuilder) Message() (Message, error)

Message builds and returns a *kafka.Message instance from the Confluent Kafka library.

func (*MessageBuilder) Opaque

func (m *MessageBuilder) Opaque(o interface{}) *MessageBuilder

Opaque sets the opaque value for the message.

func (*MessageBuilder) Produce

func (m *MessageBuilder) Produce(ctx context.Context) error

Produce produces a message to Kafka and waits for the delivery report.

This method is blocking and will wait until the delivery report is received from Kafka.

func (*MessageBuilder) ProduceAsync

func (m *MessageBuilder) ProduceAsync(ctx context.Context, deliveryCh chan DeliveryReport) error

ProduceAsync produces a message to Kafka asynchronously and returns immediately if the message was enqueued successfully, otherwise returns an error. The delivery report is delivered on the provided channel. If the channel is nil then ProduceAsync operates as fire-and-forget.

func (*MessageBuilder) Topic

func (m *MessageBuilder) Topic(t string) *MessageBuilder

Topic sets the topic for the message.

func (*MessageBuilder) Value

func (m *MessageBuilder) Value(v interface{}) *MessageBuilder

Value sets the value for the message.

The behavior of Value varies based on the type of v:

[]byte - uses value as is
string - cast the value as []byte
encoding.BinaryMarshaler - invokes MarshalBinary and uses the resulting []byte

If v is not any of the above types Value will attempt to marshal v as JSON.

type Middleware

type Middleware func(Handler) Handler

Middleware is a function type that wraps a Handler to enable intercepting a Handler.

func ChainMiddlewares

func ChainMiddlewares(hs ...Middleware) Middleware

ChainMiddlewares chains multiple Middleware functions into a single Middleware by applying them in reverse order.

type NopConsumerTelemetryProvider

type NopConsumerTelemetryProvider struct {
}

func (NopConsumerTelemetryProvider) RecordHandlerError

func (n NopConsumerTelemetryProvider) RecordHandlerError(_ string, _ string)

func (NopConsumerTelemetryProvider) RecordHandlerExecutionDuration

func (n NopConsumerTelemetryProvider) RecordHandlerExecutionDuration(_ string, _ string, _ time.Duration)

func (NopConsumerTelemetryProvider) RecordKafkaError

func (n NopConsumerTelemetryProvider) RecordKafkaError(_ string, _ string, _ int)

func (NopConsumerTelemetryProvider) RecordLag

func (n NopConsumerTelemetryProvider) RecordLag(_ string, _ string, _ string, _ string, _ int64)

func (NopConsumerTelemetryProvider) RecordMessageProcessed

func (n NopConsumerTelemetryProvider) RecordMessageProcessed(_ string, _ string)

func (NopConsumerTelemetryProvider) RecordRebalance

func (n NopConsumerTelemetryProvider) RecordRebalance(_ string, _ string)

func (NopConsumerTelemetryProvider) Trace

func (n NopConsumerTelemetryProvider) Trace(_ Message) (context.Context, func(err error))

type NopLogger

type NopLogger struct{}

NopLogger is a no-operation logger that implements logging methods without performing any actual logging.

func NewNopLogger

func NewNopLogger() *NopLogger

func (NopLogger) Debug

func (n NopLogger) Debug(_ string, _ ...interface{})

func (NopLogger) Error

func (n NopLogger) Error(_ string, _ ...interface{})

func (NopLogger) Info

func (n NopLogger) Info(_ string, _ ...interface{})

func (NopLogger) Warn

func (n NopLogger) Warn(_ string, _ ...interface{})

type NopProducerTelemetryProvider

type NopProducerTelemetryProvider struct{}

func (NopProducerTelemetryProvider) RecordDeliveryEnqueueError

func (n NopProducerTelemetryProvider) RecordDeliveryEnqueueError(topic string)

func (NopProducerTelemetryProvider) RecordDeliveryError

func (n NopProducerTelemetryProvider) RecordDeliveryError(topic string)

func (NopProducerTelemetryProvider) RecordKafkaError

func (n NopProducerTelemetryProvider) RecordKafkaError(code int)

func (NopProducerTelemetryProvider) RecordMessageDelivered

func (n NopProducerTelemetryProvider) RecordMessageDelivered(topic string)

func (NopProducerTelemetryProvider) Trace

func (n NopProducerTelemetryProvider) Trace(ctx context.Context, msg *kafka.Message) (context.Context, func(err error))

func (NopProducerTelemetryProvider) TraceDelivery

func (n NopProducerTelemetryProvider) TraceDelivery(ctx context.Context) (context.Context, func(err error))

type Option

type Option interface {
	// contains filtered or unexported methods
}

func WithLogger

func WithLogger(logger Logger) Option

WithLogger sets a Logger to be used for a component.

func WithName

func WithName(name string) Option

WithName sets a name or identifier for a producer or consumer.

func WithOnErr

func WithOnErr(fn func(err error)) Option

WithOnErr sets a callback invoked on an error for a component.

type Producer

type Producer struct {
	// contains filtered or unexported fields
}

func NewProducer

func NewProducer(conf KafkaConfig, opts ...ProducerOption) (*Producer, error)

func (*Producer) Close

func (p *Producer) Close()

Close stops the producer and releases any resources. A Producer is not usable after this method is called.

func (*Producer) Flush

func (p *Producer) Flush(timeout time.Duration) int

Flush waits for outstanding messages and requests to complete delivery. It runs until the number of outstanding events reaches zero or the timeout is exceeded, and returns the number of events still un-flushed.

func (*Producer) IsClosed

func (p *Producer) IsClosed() bool

IsClosed returns true if the producer has been closed, otherwise false.

func (*Producer) Len

func (p *Producer) Len() int

Len returns the number of messages and requests waiting to be transmitted to the broker as well as delivery reports queued for the application.

func (*Producer) M

func (p *Producer) M() *MessageBuilder

M returns a MessageBuilder that provides a fluent API for building and sending a message.

func (*Producer) Produce

func (p *Producer) Produce(ctx context.Context, m Message) error

Produce produces a message to Kafka synchronously. The message will be queued for transmission to the Kafka brokers, and Produce will block until the message has been delivered to the Kafka brokers or an error has occurred.

Produce will return an error under the following conditions:

1. The message could not be queued for delivery.
2. The message could not be delivered to Kafka.
3. The context was canceled or exceeded its deadline.

It is important to note that the underlying Confluent Kafka GO client does not use or respect the context. If the context is canceled or exceeds its deadline, it does not interrupt the delivery of the message. Instead, it simply aborts waiting on the delivery report from Kafka. Generally speaking, the context is mainly used for tracing purposes.

func (*Producer) ProduceAsync

func (p *Producer) ProduceAsync(ctx context.Context, m Message, deliveryCh chan DeliveryReport) error

ProduceAsync produces a message to Kafka asynchronously. The message will be transmitted to the Kafka brokers, and the delivery report will be sent to the provided delivery channel.

ProduceAsync returns immediately after the message has been queued to be transmitted to the Kafka brokers. The delivery report will be sent to the provided delivery channel when the message has been successfully transmitted to the Kafka brokers or an error has occurred.

ProduceAsync returns an error only if the message could not be queued for delivery but does not indicate whether the message was successfully transmitted to the Kafka brokers. The caller must check the delivery report to determine if the message was successfully transmitted to the Kafka brokers.

Note providing a nil delivery channel will cause the delivery report to be discarded effectively making ProduceAsync a fire and forget operation.

The context is used for tracing purposes. The context being canceled or exceeding its deadline has no effect. This is because the underlying Confluent Kafka GO client does not use or respect the context.

func (*Producer) Purge

func (p *Producer) Purge() error

Purge removes all messages within librdkafka's internal queue waiting to be transmitted to the Kafka brokers.

Note that Purge does not remove any messages that have already been delivered to the Kafka brokers. This method should be used with extreme caution unless you don't care if messages are delivered or not.

func (*Producer) Transactional

func (p *Producer) Transactional(ctx context.Context, messages []Message) error

type ProducerOption

type ProducerOption interface {
	// contains filtered or unexported methods
}

func WithProducerTelemetryProvider

func WithProducerTelemetryProvider(provider ProducerTelemetryProvider) ProducerOption

WithProducerTelemetryProvider sets an implementation of ProducerTelemetryProvider the Producer will use to record metrics/telemetry.

type ProducerTelemetryProvider

type ProducerTelemetryProvider interface {
	RecordMessageDelivered(topic string)
	RecordDeliveryError(topic string)
	RecordKafkaError(code int)
	RecordDeliveryEnqueueError(topic string)
	Trace(ctx context.Context, msg *kafka.Message) (context.Context, func(err error))
	TraceDelivery(ctx context.Context) (context.Context, func(err error))
}

type RequiredAck

type RequiredAck string
const (
	// AckNone disables acknowledgements from the brokers. The producer will not
	// wait for any acknowledgement from the broker and the broker does not wait
	// for the message to be written before it responds.
	AckNone RequiredAck = "none"

	// AckLeader ensures the leader broker must receive the record and successfully
	// write it to its local log before responding.
	AckLeader RequiredAck = "leader"

	// AckAll ensures the leader and all in-sync replicas must receive the record
	// and successfully write it to their local log before responding.
	AckAll RequiredAck = "all"
)

func ParseRequiredAck

func ParseRequiredAck(s string) (RequiredAck, error)

func (*RequiredAck) UnmarshalText

func (a *RequiredAck) UnmarshalText(text []byte) error

type RetryHandler

type RetryHandler struct {
	// contains filtered or unexported fields
}

RetryHandler is a middleware that wraps a Handler to provide retry logic with configurable behavior and backoff.

func Retry

func Retry(next Handler, opts ...RetryOption) *RetryHandler

Retry is a middleware for a Handler that will retry processing messages when the Handler implementation returns an error up to the max attempts. By default, Retry will make at most five attempts but that can be configured by passing one or many RetryOption. An exponential backoff is used between attempts starting with the initial delay to give the system a chance to recover for transient errors. The default initial delay is 500ms with a max delay of 10s.

By default, all errors will be retried. However, using the WithRetryableErrorsOnly option the middleware can be configured to only retry errors marked as retryable which can be achieved by wrapping error that should be retried with the WrapAsRetryable func.

Retry will return all errors if the max attempts are exhausted without success, but once the operation succeeds the previous errors are discarded. It is recommended to either handle logging and instrumentation within the Handler implementation or utilize the WithOnError RetryOption, which is a callback invoked whenever an error is returned from the Handler.

A panic will occur if next is nil.

func (*RetryHandler) Handle

func (r *RetryHandler) Handle(ctx context.Context, msg Message) error

type RetryOption

type RetryOption func(*retryConfig)

RetryOption defines a functional option type used to configure retry behavior for the Retry mechanism.

func WithInitialDelay

func WithInitialDelay(initialDelay time.Duration) RetryOption

WithInitialDelay sets the initial delay before the first retry.

func WithMaxAttempts

func WithMaxAttempts(maxAttempts int) RetryOption

WithMaxAttempts sets the maximum number of attempts for the operation before giving up.

Panics if maxAttempts is less than 1.

func WithMaxDelay

func WithMaxDelay(maxDelay time.Duration) RetryOption

WithMaxDelay sets the max delay between retries effectively capping the max time to delay between attempts.

func WithOnError

func WithOnError(onError func(err error)) RetryOption

WithOnError sets a callback function that gets invoked when the next Handler in the chain returns an error. Passing nil is a no-op.

func WithRetryableErrorsOnly

func WithRetryableErrorsOnly(retryableErrorsOnly bool) RetryOption

WithRetryableErrorsOnly restricts retries to only occur for retryable errors when set to true.

type RetryableError

type RetryableError struct {
	// contains filtered or unexported fields
}

func (RetryableError) Error

func (r RetryableError) Error() string

func (RetryableError) IsRetryable

func (r RetryableError) IsRetryable() bool

func (RetryableError) Unwrap

func (r RetryableError) Unwrap() error

type SaslMechanism

type SaslMechanism string
const (
	Plain       SaslMechanism = "PLAIN"
	ScramSha256 SaslMechanism = "SCRAM-SHA-256"
	ScramSha512 SaslMechanism = "SCRAM-SHA-512"
)

func ParseSaslMechanism

func ParseSaslMechanism(s string) (SaslMechanism, error)

func (*SaslMechanism) String

func (sm *SaslMechanism) String() string

func (*SaslMechanism) UnmarshalText

func (sm *SaslMechanism) UnmarshalText(b []byte) error

type SecurityProtocol

type SecurityProtocol string
const (
	Plaintext     SecurityProtocol = "plaintext"
	Ssl           SecurityProtocol = "ssl"
	SaslPlaintext SecurityProtocol = "sasl_plaintext"
	SaslSsl       SecurityProtocol = "sasl_ssl"
)

func ParseSecurityProtocol

func ParseSecurityProtocol(s string) (SecurityProtocol, error)

func (*SecurityProtocol) String

func (sp *SecurityProtocol) String() string

func (*SecurityProtocol) UnmarshalText

func (sp *SecurityProtocol) UnmarshalText(b []byte) error

type SlogAdapter

type SlogAdapter struct {
	// contains filtered or unexported fields
}

SlogAdapter is a structured logging adapter wrapping a slog.Logger. It provides methods for logging at different levels with key-value pairs.

func NewSlogAdapter

func NewSlogAdapter(logger *slog.Logger) *SlogAdapter

NewSlogAdapter initializes a new instance of SlogAdapter wrapping the provided slog.Logger.

Panics if logger is nil

func (SlogAdapter) Debug

func (s SlogAdapter) Debug(msg string, kvs ...interface{})

func (SlogAdapter) Error

func (s SlogAdapter) Error(msg string, kvs ...interface{})

func (SlogAdapter) Info

func (s SlogAdapter) Info(msg string, kvs ...interface{})

func (SlogAdapter) Warn

func (s SlogAdapter) Warn(msg string, kvs ...interface{})

type Stats

type Stats = kafka.Stats

Stats represents statistics from Confluent Kafka / librdkafka client.

type StdLogAdapter

type StdLogAdapter struct {
	// contains filtered or unexported fields
}

func NewStdLogAdapter

func NewStdLogAdapter(logger *log.Logger) *StdLogAdapter

func (StdLogAdapter) Debug

func (s StdLogAdapter) Debug(msg string, kvs ...interface{})

func (StdLogAdapter) Error

func (s StdLogAdapter) Error(msg string, kvs ...interface{})

func (StdLogAdapter) Info

func (s StdLogAdapter) Info(msg string, kvs ...interface{})

func (StdLogAdapter) Warn

func (s StdLogAdapter) Warn(msg string, kvs ...interface{})

type TopicPartition

type TopicPartition struct {
	Topic     string
	Partition int
	Offset    int64
}

TopicPartition represents a topic and partition along with its offset.

type TopicPartitions

type TopicPartitions []TopicPartition

TopicPartitions represents a slice of TopicPartition values.

type Watermark

type Watermark struct {
	Low  int64
	High int64
}

Watermark is a data structure representing the lowest and highest offsets for a topic/partition.

Directories

Path Synopsis
examples
consumer command
otel command
populate command
producer command

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL