Documentation
¶
Overview ¶
Package shiva is a thin abstraction over the official Confluent Kafka Go client providing a simple API for consuming and producing messages.
Index ¶
- Constants
- func IsNil(v any) bool
- func IsRetryable(err error) bool
- func Ptr[T any](v T) *T
- func StringPtr(s string) *string
- func WrapAsRetryable(e error) error
- type AcknowledgmentStrategy
- type AutoOffsetReset
- type Consumer
- func (c *Consumer) Assignment() (TopicPartitions, error)
- func (c *Consumer) Close()
- func (c *Consumer) Commit() (TopicPartitions, error)
- func (c *Consumer) Committed(timeout time.Duration) (TopicPartitions, error)
- func (c *Consumer) GetWatermarkOffsets() (map[string]Watermark, error)
- func (c *Consumer) IsClosed() bool
- func (c *Consumer) IsRunning() bool
- func (c *Consumer) Lag() (map[string]int64, error)
- func (c *Consumer) Pause() error
- func (c *Consumer) Position() (TopicPartitions, error)
- func (c *Consumer) QueryWatermarkOffsets(ctx context.Context) (map[string]Watermark, error)
- func (c *Consumer) Resume() error
- func (c *Consumer) Run() error
- func (c *Consumer) Subscription() ([]string, error)
- type ConsumerOption
- func WithConsumerTelemetryProvider(provider ConsumerTelemetryProvider) ConsumerOption
- func WithDeadLetterHandler(dlh DeadLetterHandler) ConsumerOption
- func WithOnAssigned(fn func(tps TopicPartitions)) ConsumerOption
- func WithOnOffsetsCommitted(fn func(offsets TopicPartitions, err error)) ConsumerOption
- func WithOnRevoked(fn func(tps TopicPartitions)) ConsumerOption
- func WithStats(interval time.Duration, fn func(stats map[string]any)) ConsumerOption
- type ConsumerTelemetryProvider
- type DeadLetterHandler
- type DeadLetterHandlerFunc
- type DeliveryReport
- type Handler
- type HandlerFunc
- type Header
- type KafkaConfig
- type Logger
- type Message
- type MessageBuilder
- func (m *MessageBuilder) Err() error
- func (m *MessageBuilder) Header(key string, value []byte) *MessageBuilder
- func (m *MessageBuilder) JSON(v any) *MessageBuilder
- func (m *MessageBuilder) Key(k string) *MessageBuilder
- func (m *MessageBuilder) Message() (Message, error)
- func (m *MessageBuilder) Opaque(o interface{}) *MessageBuilder
- func (m *MessageBuilder) Produce(ctx context.Context) error
- func (m *MessageBuilder) ProduceAsync(ctx context.Context, deliveryCh chan DeliveryReport) error
- func (m *MessageBuilder) Topic(t string) *MessageBuilder
- func (m *MessageBuilder) Value(v interface{}) *MessageBuilder
- type Middleware
- type NopConsumerTelemetryProvider
- func (n NopConsumerTelemetryProvider) RecordHandlerError(_ string, _ string)
- func (n NopConsumerTelemetryProvider) RecordHandlerExecutionDuration(_ string, _ string, _ time.Duration)
- func (n NopConsumerTelemetryProvider) RecordKafkaError(_ string, _ string, _ int)
- func (n NopConsumerTelemetryProvider) RecordLag(_ string, _ string, _ string, _ string, _ int64)
- func (n NopConsumerTelemetryProvider) RecordMessageProcessed(_ string, _ string)
- func (n NopConsumerTelemetryProvider) RecordRebalance(_ string, _ string)
- func (n NopConsumerTelemetryProvider) Trace(_ Message) (context.Context, func(err error))
- type NopLogger
- type NopProducerTelemetryProvider
- func (n NopProducerTelemetryProvider) RecordDeliveryEnqueueError(topic string)
- func (n NopProducerTelemetryProvider) RecordDeliveryError(topic string)
- func (n NopProducerTelemetryProvider) RecordKafkaError(code int)
- func (n NopProducerTelemetryProvider) RecordMessageDelivered(topic string)
- func (n NopProducerTelemetryProvider) Trace(ctx context.Context, msg *kafka.Message) (context.Context, func(err error))
- func (n NopProducerTelemetryProvider) TraceDelivery(ctx context.Context) (context.Context, func(err error))
- type Option
- type Producer
- func (p *Producer) Close()
- func (p *Producer) Flush(timeout time.Duration) int
- func (p *Producer) IsClosed() bool
- func (p *Producer) Len() int
- func (p *Producer) M() *MessageBuilder
- func (p *Producer) Produce(ctx context.Context, m Message) error
- func (p *Producer) ProduceAsync(ctx context.Context, m Message, deliveryCh chan DeliveryReport) error
- func (p *Producer) Purge() error
- func (p *Producer) Transactional(ctx context.Context, messages []Message) error
- type ProducerOption
- type ProducerTelemetryProvider
- type RequiredAck
- type RetryHandler
- type RetryOption
- type RetryableError
- type SaslMechanism
- type SecurityProtocol
- type SlogAdapter
- type Stats
- type StdLogAdapter
- type TopicPartition
- type TopicPartitions
- type Watermark
Constants ¶
const ( // UnlimitedRetries is the maximum integer value for the platform. // // Technically, UnlimitedRetries is not unlimited but from a practical perspective it operates // as if it were infinite. UnlimitedRetries = math.MaxInt )
Variables ¶
This section is empty.
Functions ¶
func IsNil ¶
IsNil checks if v is nil.
IsNil is useful as it handles the gotcha case of interfaces where an interface is only considered nil if both the type and value are nil. If the type is known but the value is nil, a simple `if v != nil` or `if v == nil` check can result in unexpected results.
func IsRetryable ¶
IsRetryable determines if the given error can and/or should be retried.
func StringPtr ¶
StringPtr returns a pointer to the string passed in. This is useful since the official confluent-kafka-go library uses pointers to strings in many places like topics and Go doesn't allow you to take the address of a string in a single statement.
func WrapAsRetryable ¶
WrapAsRetryable wraps an error in a RetryableError, marking it as retryable.
Types ¶
type AcknowledgmentStrategy ¶
type AcknowledgmentStrategy string
AcknowledgmentStrategy defines the strategy for acknowledging messages.
const ( // AcknowledgmentStrategyNone indicates messages are not acknowledged. AcknowledgmentStrategyNone AcknowledgmentStrategy = "none" // AcknowledgmentStrategyPreProcessing indicates messages are acknowledged after they've been // polled from Kafka but before the message is passed to the Handler to be processed. This is // often referred to as `at most once` delivery. AcknowledgmentStrategyPreProcessing AcknowledgmentStrategy = "pre-processing" // AcknowledgmentStrategyPostProcessing indicates messages are acknowledged only after the Handler // returns. This is often referred to as `at least once` delivery. AcknowledgmentStrategyPostProcessing AcknowledgmentStrategy = "post-processing" )
func ParseAcknowledgmentStrategy ¶
func ParseAcknowledgmentStrategy(strategy string) (AcknowledgmentStrategy, error)
ParseAcknowledgmentStrategy parses a string into an AcknowledgmentStrategy or returns an error if unsupported.
func (*AcknowledgmentStrategy) String ¶
func (a *AcknowledgmentStrategy) String() string
func (*AcknowledgmentStrategy) UnmarshalText ¶
func (a *AcknowledgmentStrategy) UnmarshalText(text []byte) error
UnmarshalText converts the provided text bytes into an AcknowledgmentStrategy value or returns an error if invalid.
type AutoOffsetReset ¶
type AutoOffsetReset string
const ( Earliest AutoOffsetReset = "earliest" Latest AutoOffsetReset = "latest" )
func ParseAutoOffsetReset ¶
func ParseAutoOffsetReset(s string) (AutoOffsetReset, error)
func (*AutoOffsetReset) String ¶
func (a *AutoOffsetReset) String() string
func (*AutoOffsetReset) UnmarshalText ¶
func (a *AutoOffsetReset) UnmarshalText(text []byte) error
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
Consumer provides a high-level API for consuming messages from Kafka.
Consumer is a wrapper around the official Confluent Kafka GO client, which is a wrapper around librdkafka.
The zero-value for Consumer is not usable. Instances of Consumer should always be created/initialized using NewConsumer.
func NewConsumer ¶
func NewConsumer(conf KafkaConfig, topic string, handler Handler, opts ...ConsumerOption) (*Consumer, error)
NewConsumer creates and initializes a Consumer instance.
NewConsumer initializes a Consumer and subscribes to the provided topic. NewConsumer does not start polling from Kafka, thus after initialization the Consumer will not join the consumer group and have topics/partitions assigned.
func (*Consumer) Assignment ¶
func (c *Consumer) Assignment() (TopicPartitions, error)
Assignment returns the topics and partitions currently assigned to the Consumer.
Note that until the Consumer is running and polling, the Consumer won't have joined the group and won't have any assignments.
func (*Consumer) Close ¶
func (c *Consumer) Close()
Close stops the Consumer and releases the underlying resources.
After Close the Consumer is not usable.
func (*Consumer) Commit ¶
func (c *Consumer) Commit() (TopicPartitions, error)
Commit commits the highest currently stored offsets for each topic/partition back to Kafka.
func (*Consumer) Committed ¶
func (c *Consumer) Committed(timeout time.Duration) (TopicPartitions, error)
Committed queries the Kafka brokers to get the latest committed offsets for the topics/partitions assigned to the Consumer.
func (*Consumer) GetWatermarkOffsets ¶
GetWatermarkOffsets gets the lowest and highest offsets for each partition currently assigned to the Consumer.
The watermarks are returned as a map where the key is topic|partition.
Note that GetWatermarkOffsets returns the watermarks based on the local client state. This is fine for most use-cases but if you need the absolute most up-to-date watermarks, use QueryWatermarkOffsets instead as it queries the Kafka brokers.
func (*Consumer) Lag ¶
Lag returns the current lag for each topic/partition assigned to the Consumer.
The lag is returned as a map[topic|partition]offset.
func (*Consumer) Pause ¶
Pause consumption for the topics and partitions currently assigned to the Consumer.
Note that messages already enqueued on the consumer's Event channel will NOT be purged by this call and will still be processed. Additionally, if a rebalance is triggered, the Consumer will resume fetching/polling from Kafka as the pause state is not persisted between rebalances.
func (*Consumer) Position ¶
func (c *Consumer) Position() (TopicPartitions, error)
Position returns the next Offset the Consumer will read from for each topic/partition assigned to the Consumer.
Note that until the Consumer is running and polling, the Consumer won't have joined the group and won't have any assignments. Additionally, the offsets returned are based on the local state of the client and not what has been committed to Kafka.
func (*Consumer) QueryWatermarkOffsets ¶
QueryWatermarkOffsets fetches the lowest and highest offsets for each partition currently assigned to the Consumer from the Kafka brokers.
The watermarks are returned as a map where the key is topic|partition.
func (*Consumer) Resume ¶
Resume resumes consuming the topics/partitions previously paused that are assigned to the Consumer instance.
Note that calling Resume on topics/partitions that are not paused is a no-op.
func (*Consumer) Run ¶
Run polls events from Kafka until either Close is called or the Consumer encounters a fatal error from which it cannot recover.
Run is blocking and will almost always be called in a separate goroutine. When Close is invoked or a fatal error occurs, the Consumer will attempt to commit any uncommitted offsets back to the Kafka brokers. If the Consumer encountered a fatal error or failed to commit offsets while closing a non-nil error value will be returned.
func (*Consumer) Subscription ¶
Subscription returns the current topic the Consumer is subscribed to.
type ConsumerOption ¶
type ConsumerOption interface {
// contains filtered or unexported methods
}
func WithConsumerTelemetryProvider ¶
func WithConsumerTelemetryProvider(provider ConsumerTelemetryProvider) ConsumerOption
WithConsumerTelemetryProvider sets an implementation of ConsumerTelemetryProvider the Consumer will use to record metrics/telemetry.
func WithDeadLetterHandler ¶
func WithDeadLetterHandler(dlh DeadLetterHandler) ConsumerOption
WithDeadLetterHandler sets a DeadLetterHandler that is invoked by the Consumer when the Handler returns an error signaling it failed to process the message.
func WithOnAssigned ¶
func WithOnAssigned(fn func(tps TopicPartitions)) ConsumerOption
WithOnAssigned sets a callback that will be invoked by the Consumer when a rebalance occurs and topics/partitions are assigned.
func WithOnOffsetsCommitted ¶
func WithOnOffsetsCommitted(fn func(offsets TopicPartitions, err error)) ConsumerOption
WithOnOffsetsCommitted sets a callback that will be invoked by the Consumer when offsets are committed back to the Kafka brokers.
func WithOnRevoked ¶
func WithOnRevoked(fn func(tps TopicPartitions)) ConsumerOption
WithOnRevoked sets a callback that will be invoked by the Consumer when a rebalance occurs and topics/partitions are revoked.
func WithStats ¶
func WithStats(interval time.Duration, fn func(stats map[string]any)) ConsumerOption
WithStats enables statistics for the underlying Confluent Kafka / librdkafka client at the provided interval. On the provided interval stats will be fetched, and fn will be invoked with the results.
type ConsumerTelemetryProvider ¶
type ConsumerTelemetryProvider interface {
RecordMessageProcessed(handler string, topic string)
RecordRebalance(handler string, groupId string)
RecordHandlerError(handler string, topic string)
RecordKafkaError(handler string, topic string, code int)
RecordHandlerExecutionDuration(handler string, topic string, dur time.Duration)
RecordLag(handler string, groupId string, topic string, partition string, lag int64)
Trace(msg Message) (context.Context, func(err error))
}
type DeadLetterHandler ¶
A DeadLetterHandler handles messages that could not be successfully processed by a Consumer.
DeadLetterHandler is invoked by the Consumer when the Handler returns an error from processing a Message from Kafka. Implementations of DeadLetterHandler should perform any actions required before the Consumer proceeds to the next Message. This can include logging, instrumentation, persisting the message to a database, or publishing the message to a dead letter topic to retry later.
If the DeadLetterHandler encounters an error handling an unprocessable message it should handle that error if possible and provide any diagnostics such as logging or instrumentation. Handle does not return an error as it is meaningless to the Consumer, which will move on to the next event regardless.
type DeadLetterHandlerFunc ¶
The DeadLetterHandlerFunc type is an adapter to allow the use of ordinary functions as a DeadLetterHandler
type DeliveryReport ¶
type DeliveryReport struct {
Error error
ErrorCode int
Topic string
Partition int
Offset int64
Opaque interface{}
}
DeliveryReport represents the result of producing a message to Kafka.
You must always check the Error field. If the value of Error is non-nil then the message was not delivered. On error scenarios the ErrorCode will be non-zero if an error code was available.
type Handler ¶
A Handler handles and processes message from Kafka.
When the Consumer receives a message from Kafka it invokes the Handler to handle and process the Message. Implementations of Handler should perform any business rules and logic required for the given Message.
If the Handler fails to process a Message, it can return a non-nil error value to signal to the Consumer it failed to process the Message. The Consumer will then invoke the DeadLetterHandler if one is configured. A nil error value will always be interpreted as the Handler successfully processed the Message.
func WrapHandler ¶
func WrapHandler(h Handler, mw ...Middleware) Handler
WrapHandler applies a chain of Middleware to a Handler, wrapping it in the order provided.
type HandlerFunc ¶
A HandlerFunc type is an adapter to allow the use of ordinary functions as a Handler.
type Header ¶
Header represents a single Kafka message header.
Message headers are made up of a list of Header elements, retaining their original insert order and allowing for duplicate Keys.
Key is a human-readable string identifying the header. Value is the key's binary value, Kafka does not put any restrictions on the format of the Value, but it should be made relatively compact. The value may be a byte array, empty, or nil.
type KafkaConfig ¶
type KafkaConfig struct {
// The Kafka brokers addresses used to establish the initial connection to
// Kafka. This is a required field.
//
// Applies To: Consumer, Producer
BootstrapServers []string `env:"SHIVA_KAFKA_BOOTSTRAP_SERVERS,required"`
// The ID of the consumer group to join. This is a required field when using
// Consumer.
//
// Applies To: Consumer
GroupID string `env:"SHIVA_KAFKA_GROUP_ID"`
// Client group session and failure detection timeout. The consumer sends
// periodic heartbeats to indicate its liveness to the broker. If no heart
// beats are received by the broker for a group member within the session
// timeout, the broker will remove the consumer from the group and trigger
// a rebalance.
//
// The unit is milliseconds with a default of 45000 (45 seconds).
//
// Applies To: Consumer
SessionTimeout time.Duration `env:"SHIVA_KAFKA_SESSION_TIMEOUT,default=45s"`
// Interval at which the consumer sends heartbeats to the broker. The default
// is 3000 (3 seconds).
//
// Applies To: Consumer
HeartbeatInterval time.Duration `env:"SHIVA_KAFKA_HEARTBEAT_TIMEOUT,default=3s"`
// The interval between committing offsets back to Kafka brokers. The default
// is 5000ms (5 seconds).
//
// Applies To: Consumer
CommitInterval time.Duration `env:"SHIVA_KAFKA_COMMIT_INTERVAL,default=5s"`
// The amount of time to wait for an event when polling from Kafka. The default
// 100ms.
//
// Applies To: Consumer
PollTimeout time.Duration `env:"SHIVA_KAFKA_POLL_TIMEOUT,default=100ms"`
// Configures the behavior when there are no stored offsets found for the
// Consumer group for topic/partition.
//
// The default is latest which means that the consumer will start reading
// from the latest message in the topic/partition.
//
// Applies To: Consumer
AutoOffsetReset AutoOffsetReset `env:"SHIVA_KAFKA_AUTO_OFFSET_RESET,default=earliest"`
// Configures when the Consumer acknowledges the message. The possible values
// are:
//
// none - Messages will not be acknowledged and no offsets will be tracked
// pre-processing - Messages will be acknowledged when received but before
// the Handler has been invoked. This follows the at-most-once
// delivery model.
// post-processing - Messages will be acknowledged only after the Handler has
// returned, regardless if it succeeded or returned an error.
// This follows the at-least-once delivery model.
//
// The default is post-processing.
AcknowledgmentStrategy AcknowledgmentStrategy `env:"SHIVA_KAFKA_CONSUMER_ACK_STRATEGY,default=post-processing"`
// The maximum size for a message. The default is 1048576 (1MB).
//
// Applies To: Consumer, Producer
MessageMaxBytes int `env:"SHIVA_KAFKA_MESSAGE_MAX_BYTES,default=1048576"`
// Maximum amount of data the broker shall return for a Fetch request. Messages
// are fetched in batches by the consumer. The default is 52428800 (50MB).
//
// Applies To: Consumer
MaxFetchBytes int `env:"KAFKA_MAX_FETCH_BYTES, default=52428800"`
// The security protocol used to communicate with the brokers.
//
// Valid values are: plaintext, ssl, sasl_plaintext, sasl_ssl.
//
// Applies To: Consumer, Producer
SecurityProtocol SecurityProtocol `env:"SHIVA_KAFKA_SECURITY_PROTOCOL,default=plaintext"`
// The location of the certificate authority file used to verify the brokers.
//
// Applies To: Consumer, Producer
CertificateAuthorityLocation string `env:"SHIVA_KAFKA_CERT_AUTHORITY_LOCATION"`
// The location of the client certificate used to authenticate with the brokers.
//
// Applies To: Consumer, Producer
CertificateLocation string `env:"SHIVA_KAFKA_CERT_LOCATION"`
// The location of the key for the client certificate.
//
// Applies To: Consumer, Producer
CertificateKeyLocation string `env:"SHIVA_KAFKA_CERT_KEY_LOCATION"`
// The password for the key used for the client certificate.
//
// Applies To: Consumer, Producer
CertificateKeyPassword string `env:"SHIVA_KAFKA_CERT_KEY_PASSWORD"`
// Skip TLS verification when using SSL or SASL_SSL.
//
// Applies To: Consumer, Producer
SkipTlsVerification bool `env:"SHIVA_KAFKA_SKIP_TLS_VERIFICATION,default=false"`
// The SASL mechanism to use for SASL authentication.
//
// Applies To: Consumer, Producer
SASLMechanism SaslMechanism `env:"SHIVA_KAFKA_SASL_MECHANISM,default=PLAIN"`
// The username for authenticating with SASL.
//
// Applies To: Consumer, Producer
SASLUsername string `env:"SHIVA_KAFKA_SASL_USERNAME"`
// The password for authenticating with SASL.
//
// Applies To: Consumer, Producer
SASLPassword string `env:"SHIVA_KAFKA_SASL_PASSWORD"`
// The number of acknowledgements the producer requires the leader to have
// received before considering a request complete. The default value is
// AckAll ("all"), which will wait for all in-sync replicas to acknowledge
// before proceeding.
//
// Applies To: Producer
RequiredAcks RequiredAck `env:"SHIVA_KAFKA_PRODUCER_REQUIRED_ACKS, default=all"`
// When set to `true`, the producer will ensure that messages are successfully
// produced exactly once and in the original produce order.
//
// The following configuration properties are adjusted automatically (if not modified
// by the user) when idempotence is enabled: `max.in.flight.requests.per.connection=5`
// (must be less than or equal to 5), `retries=INT32_MAX` (must be greater than 0),
// `acks=all`, `queuing.strategy=fifo`. Producer instantiation will fail if user-supplied
// configuration is incompatible.
//
// Applies To: Producer
Idempotence bool `env:"SHIVA_KAFKA_PRODUCER_IDEMPOTENCE, default=false"`
// The transactional ID to use for messages produced by the producer. This
// is used to ensure that messages are produced atomically and in order. This
// is required when using transactions.
//
// Applies To: Producer
TransactionID string `env:"SHIVA_KAFKA_PRODUCER_TRANSACTION_ID"`
}
KafkaConfig represents the configuration settings for a Kafka client, including connection, authentication, and behavior options.
func ConfigFromEnv ¶
func ConfigFromEnv() (*KafkaConfig, error)
ConfigFromEnv loads the configuration for the Kafka client from the environment.
type Message ¶
type Message struct {
Topic string
Partition int
Offset int64
Key []byte
Value []byte
Timestamp time.Time
Headers []Header
Opaque interface{}
}
Message represents a single message from Kafka.
type MessageBuilder ¶
type MessageBuilder struct {
// contains filtered or unexported fields
}
MessageBuilder represents a Kafka message and provides a fluent API for constructing a message to be produced to Kafka.
func (*MessageBuilder) Err ¶
func (m *MessageBuilder) Err() error
Err returns the last error that occurred while building the message or nil if there were no errors.
func (*MessageBuilder) Header ¶
func (m *MessageBuilder) Header(key string, value []byte) *MessageBuilder
Header adds a header to the message.
func (*MessageBuilder) JSON ¶
func (m *MessageBuilder) JSON(v any) *MessageBuilder
JSON serializes the provided value to JSON and sets it as the value for the message.
func (*MessageBuilder) Key ¶
func (m *MessageBuilder) Key(k string) *MessageBuilder
Key sets the key for the message.
func (*MessageBuilder) Message ¶
func (m *MessageBuilder) Message() (Message, error)
Message builds and returns a Message instance from the values configured on the MessageBuilder.
func (*MessageBuilder) Opaque ¶
func (m *MessageBuilder) Opaque(o interface{}) *MessageBuilder
Opaque sets the opaque value for the message.
func (*MessageBuilder) Produce ¶
func (m *MessageBuilder) Produce(ctx context.Context) error
Produce produces a message to Kafka and waits for the delivery report.
This method is blocking and will wait until the delivery report is received from Kafka.
func (*MessageBuilder) ProduceAsync ¶
func (m *MessageBuilder) ProduceAsync(ctx context.Context, deliveryCh chan DeliveryReport) error
ProduceAsync produces a message to Kafka asynchronously and returns immediately if the message was enqueued successfully, otherwise returns an error. The delivery report is delivered on the provided channel. If the channel is nil then ProduceAsync operates as fire-and-forget.
func (*MessageBuilder) Topic ¶
func (m *MessageBuilder) Topic(t string) *MessageBuilder
Topic sets the topic for the message.
func (*MessageBuilder) Value ¶
func (m *MessageBuilder) Value(v interface{}) *MessageBuilder
Value sets the value for the message.
The behavior of Value varies based on the type of v:
[]byte - uses the value as is; string - casts the value to []byte; encoding.BinaryMarshaler - invokes MarshalBinary and uses the resulting []byte.
If v is not any of the above types Value will attempt to marshal v as JSON.
type Middleware ¶
Middleware is a function type that wraps a Handler to enable intercepting a Handler.
func ChainMiddlewares ¶
func ChainMiddlewares(hs ...Middleware) Middleware
ChainMiddlewares chains multiple Middleware functions into a single Middleware by applying them in reverse order.
type NopConsumerTelemetryProvider ¶
type NopConsumerTelemetryProvider struct {
}
func (NopConsumerTelemetryProvider) RecordHandlerError ¶
func (n NopConsumerTelemetryProvider) RecordHandlerError(_ string, _ string)
func (NopConsumerTelemetryProvider) RecordHandlerExecutionDuration ¶
func (n NopConsumerTelemetryProvider) RecordHandlerExecutionDuration(_ string, _ string, _ time.Duration)
func (NopConsumerTelemetryProvider) RecordKafkaError ¶
func (n NopConsumerTelemetryProvider) RecordKafkaError(_ string, _ string, _ int)
func (NopConsumerTelemetryProvider) RecordMessageProcessed ¶
func (n NopConsumerTelemetryProvider) RecordMessageProcessed(_ string, _ string)
func (NopConsumerTelemetryProvider) RecordRebalance ¶
func (n NopConsumerTelemetryProvider) RecordRebalance(_ string, _ string)
type NopLogger ¶
type NopLogger struct{}
NopLogger is a no-operation logger that implements logging methods without performing any actual logging.
func NewNopLogger ¶
func NewNopLogger() *NopLogger
type NopProducerTelemetryProvider ¶
type NopProducerTelemetryProvider struct{}
func (NopProducerTelemetryProvider) RecordDeliveryEnqueueError ¶
func (n NopProducerTelemetryProvider) RecordDeliveryEnqueueError(topic string)
func (NopProducerTelemetryProvider) RecordDeliveryError ¶
func (n NopProducerTelemetryProvider) RecordDeliveryError(topic string)
func (NopProducerTelemetryProvider) RecordKafkaError ¶
func (n NopProducerTelemetryProvider) RecordKafkaError(code int)
func (NopProducerTelemetryProvider) RecordMessageDelivered ¶
func (n NopProducerTelemetryProvider) RecordMessageDelivered(topic string)
func (NopProducerTelemetryProvider) TraceDelivery ¶
type Option ¶
type Option interface {
// contains filtered or unexported methods
}
func WithLogger ¶
WithLogger sets a Logger to be used for a component.
type Producer ¶
type Producer struct {
// contains filtered or unexported fields
}
func NewProducer ¶
func NewProducer(conf KafkaConfig, opts ...ProducerOption) (*Producer, error)
func (*Producer) Close ¶
func (p *Producer) Close()
Close stops the producer and releases any resources. A Producer is not usable after this method is called.
func (*Producer) Flush ¶
Flush and wait for outstanding messages and requests to complete delivery. Runs until value reaches zero or timeout is exceeded. Returns the number of outstanding events still un-flushed.
func (*Producer) Len ¶
Len returns the number of messages and requests waiting to be transmitted to the broker as well as delivery reports queued for the application.
func (*Producer) M ¶
func (p *Producer) M() *MessageBuilder
M returns a MessageBuilder that provides a fluent API for building and sending a message.
func (*Producer) Produce ¶
Produce produces a message to Kafka synchronously. The message will be queued for transmission to the Kafka brokers, and Produce will block until the message has been delivered to the Kafka brokers or an error has occurred.
Produce will return an error under the following conditions:
1. The message could not be queued for delivery. 2. The message could not be delivered to Kafka. 3. The context was canceled or exceeded its deadline.
It is important to note that the underlying Confluent Kafka GO client does not use or respect the context. If the context is canceled or exceeds its deadline, it does not interrupt the delivery of the message. Instead, it simply aborts waiting on the delivery report from Kafka. Generally speaking, the context is mainly used for tracing purposes.
func (*Producer) ProduceAsync ¶
func (p *Producer) ProduceAsync(ctx context.Context, m Message, deliveryCh chan DeliveryReport) error
ProduceAsync produces a message to Kafka asynchronously. The message will be transmitted to the Kafka brokers, and the delivery report will be sent to the provided delivery channel.
ProduceAsync returns immediately after the message has been queued to be transmitted to the Kafka brokers. The delivery report will be sent to the provided delivery channel when the message has been successfully transmitted to the Kafka brokers or an error has occurred.
ProduceAsync returns an error only if the message could not be queued for delivery but does not indicate whether the message was successfully transmitted to the Kafka brokers. The caller must check the delivery report to determine if the message was successfully transmitted to the Kafka brokers.
Note providing a nil delivery channel will cause the delivery report to be discarded effectively making ProduceAsync a fire and forget operation.
The context is used for tracing purposes. The context being canceled or exceeding its deadline has no effect. This is because the underlying Confluent Kafka GO client does not use or respect the context.
func (*Producer) Purge ¶
Purge removes all messages within librdkafka's internal queue waiting to be transmitted to the Kafka brokers.
Note that Purge does not remove any messages that have already been delivered to the Kafka brokers. This method should be used with extreme caution unless you don't care if messages are delivered or not.
type ProducerOption ¶
type ProducerOption interface {
// contains filtered or unexported methods
}
func WithProducerTelemetryProvider ¶
func WithProducerTelemetryProvider(provider ProducerTelemetryProvider) ProducerOption
WithProducerTelemetryProvider sets an implementation of ProducerTelemetryProvider the Producer will use to record metrics/telemetry.
type ProducerTelemetryProvider ¶
type ProducerTelemetryProvider interface {
RecordMessageDelivered(topic string)
RecordDeliveryError(topic string)
RecordKafkaError(code int)
RecordDeliveryEnqueueError(topic string)
Trace(ctx context.Context, msg *kafka.Message) (context.Context, func(err error))
TraceDelivery(ctx context.Context) (context.Context, func(err error))
}
type RequiredAck ¶
type RequiredAck string
const ( // AckNone disables acknowledgements from the brokers. The producer will not // wait for any acknowledgement from the broker and the broker does not wait // for the message to be written before it responds. AckNone RequiredAck = "none" // AckLeader ensures the leader broker must receive the record and successfully // write it to its local log before responding. AckLeader RequiredAck = "leader" // AckAll ensures the leader and all in-sync replicas must receive the record // and successfully write it to their local log before responding. AckAll RequiredAck = "all" )
func ParseRequiredAck ¶
func ParseRequiredAck(s string) (RequiredAck, error)
func (*RequiredAck) UnmarshalText ¶
func (a *RequiredAck) UnmarshalText(text []byte) error
type RetryHandler ¶
type RetryHandler struct {
// contains filtered or unexported fields
}
RetryHandler is a middleware that wraps a Handler to provide retry logic with configurable behavior and backoff.
func Retry ¶
func Retry(next Handler, opts ...RetryOption) *RetryHandler
Retry is a middleware for a Handler that will retry processing messages when the Handler implementation returns an error up to the max attempts. By default, Retry will make at most five attempts but that can be configured by passing one or many RetryOption. An exponential backoff is used between attempts starting with the initial delay to give the system a chance to recover for transient errors. The default initial delay is 500ms with a max delay of 10s.
By default, all errors will be retried. However, using the WithRetryableErrorsOnly option the middleware can be configured to only retry errors marked as retryable, which can be achieved by wrapping errors that should be retried with the WrapAsRetryable func.
Retry will return all errors if the max attempts are exhausted without success, but once the operation succeeds the previous errors are discarded. It is recommended to either handle logging and instrumentation within the Handler implementation or utilize the WithOnError RetryOption, which is a callback invoked whenever an error is returned from the Handler.
A panic will occur if next is nil.
type RetryOption ¶
type RetryOption func(*retryConfig)
RetryOption defines a functional option type used to configure retry behavior for the Retry mechanism.
func WithInitialDelay ¶
func WithInitialDelay(initialDelay time.Duration) RetryOption
WithInitialDelay sets the initial delay before the first retry.
func WithMaxAttempts ¶
func WithMaxAttempts(maxAttempts int) RetryOption
WithMaxAttempts sets the maximum number of attempts for the operation before giving up.
Panics if maxAttempts is less than 1.
func WithMaxDelay ¶
func WithMaxDelay(maxDelay time.Duration) RetryOption
WithMaxDelay sets the max delay between retries effectively capping the max time to delay between attempts.
func WithOnError ¶
func WithOnError(onError func(err error)) RetryOption
WithOnError sets a callback function that gets invoked when the next Handler in the chain returns an error. Passing nil is a no-op.
func WithRetryableErrorsOnly ¶
func WithRetryableErrorsOnly(retryableErrorsOnly bool) RetryOption
WithRetryableErrorsOnly restricts retries to only occur for retryable errors when set to true.
type RetryableError ¶
type RetryableError struct {
// contains filtered or unexported fields
}
func (RetryableError) Error ¶
func (r RetryableError) Error() string
func (RetryableError) IsRetryable ¶
func (r RetryableError) IsRetryable() bool
func (RetryableError) Unwrap ¶
func (r RetryableError) Unwrap() error
type SaslMechanism ¶
type SaslMechanism string
const (
	Plain       SaslMechanism = "PLAIN"
	ScramSha256 SaslMechanism = "SCRAM-SHA-256"
	ScramSha512 SaslMechanism = "SCRAM-SHA-512"
)
func ParseSaslMechanism ¶
func ParseSaslMechanism(s string) (SaslMechanism, error)
func (*SaslMechanism) String ¶
func (sm *SaslMechanism) String() string
func (*SaslMechanism) UnmarshalText ¶
func (sm *SaslMechanism) UnmarshalText(b []byte) error
type SecurityProtocol ¶
type SecurityProtocol string
const (
	Plaintext     SecurityProtocol = "plaintext"
	Ssl           SecurityProtocol = "ssl"
	SaslPlaintext SecurityProtocol = "sasl_plaintext"
	SaslSsl       SecurityProtocol = "sasl_ssl"
)
func ParseSecurityProtocol ¶
func ParseSecurityProtocol(s string) (SecurityProtocol, error)
func (*SecurityProtocol) String ¶
func (sp *SecurityProtocol) String() string
func (*SecurityProtocol) UnmarshalText ¶
func (sp *SecurityProtocol) UnmarshalText(b []byte) error
type SlogAdapter ¶
type SlogAdapter struct {
// contains filtered or unexported fields
}
SlogAdapter is a structured logging adapter wrapping a slog.Logger. It provides methods for logging at different levels with key-value pairs.
func NewSlogAdapter ¶
func NewSlogAdapter(logger *slog.Logger) *SlogAdapter
NewSlogAdapter initializes a new instance of SlogAdapter wrapping the provided slog.Logger.
Panics if logger is nil.
func (SlogAdapter) Debug ¶
func (s SlogAdapter) Debug(msg string, kvs ...interface{})
func (SlogAdapter) Error ¶
func (s SlogAdapter) Error(msg string, kvs ...interface{})
func (SlogAdapter) Info ¶
func (s SlogAdapter) Info(msg string, kvs ...interface{})
func (SlogAdapter) Warn ¶
func (s SlogAdapter) Warn(msg string, kvs ...interface{})
type StdLogAdapter ¶
type StdLogAdapter struct {
// contains filtered or unexported fields
}
func NewStdLogAdapter ¶
func NewStdLogAdapter(logger *log.Logger) *StdLogAdapter
func (StdLogAdapter) Debug ¶
func (s StdLogAdapter) Debug(msg string, kvs ...interface{})
func (StdLogAdapter) Error ¶
func (s StdLogAdapter) Error(msg string, kvs ...interface{})
func (StdLogAdapter) Info ¶
func (s StdLogAdapter) Info(msg string, kvs ...interface{})
func (StdLogAdapter) Warn ¶
func (s StdLogAdapter) Warn(msg string, kvs ...interface{})
type TopicPartition ¶
TopicPartition represents a topic and partition along with its offset.
type TopicPartitions ¶
type TopicPartitions []TopicPartition
TopicPartitions represents an array of TopicPartition