From 6b42249c51f8f63bc7df0b8e208f3f9714508c6f Mon Sep 17 00:00:00 2001 From: Sebastian Poxhofer Date: Tue, 28 Dec 2021 15:32:24 +0100 Subject: [PATCH] [exporter/kafka] Allow configuring the acknowledgement behaviour (#6301) * feat(exporter/kafka): allow configuring the acknowledgement behaviour * doc(exporter/kafka): add additional context to the RequiredAcks parameter * chore(exporter/kafka): validate RequiredAcks option * chore(container.image.name): add changelog entry --- CHANGELOG.md | 1 + exporter/kafkaexporter/README.md | 3 +++ exporter/kafkaexporter/config.go | 13 +++++++++++++ exporter/kafkaexporter/config_test.go | 2 ++ exporter/kafkaexporter/factory.go | 4 ++++ exporter/kafkaexporter/kafka_exporter.go | 3 +-- exporter/kafkaexporter/testdata/config.yaml | 1 + 7 files changed, 25 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d74b9a3ae31a..c17f717681fe 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ - `datadogexporter`: Add compatibility with ECS Fargate semantic conventions (#6670) - `k8s_observer`: discover k8s.node endpoints (#6820) - `redisreceiver`: Add missing description fields to keyspace metrics (#6940) +- `kafkaexporter`: Allow controlling Kafka acknowledgment behaviour (#6301) ## 🛑 Breaking changes 🛑 diff --git a/exporter/kafkaexporter/README.md b/exporter/kafkaexporter/README.md index c36b14bbb886..56c73842889e 100644 --- a/exporter/kafkaexporter/README.md +++ b/exporter/kafkaexporter/README.md @@ -62,6 +62,9 @@ The following settings can be optionally configured: User should calculate this as `num_seconds * requests_per_second` where: - `num_seconds` is the number of seconds to buffer in case of a backend outage - `requests_per_second` is the average number of requests per seconds. +- `producer` + - `max_message_bytes` (default = 1000000) the maximum permitted size of a message in bytes + - `required_acks` (default = 1) controls when a message is regarded as transmitted. https://pkg.go.dev/github.com/Shopify/sarama@v1.30.0#RequiredAcks Example configuration: diff --git a/exporter/kafkaexporter/config.go b/exporter/kafkaexporter/config.go index ed544c460701..aa33e9def59f 100644 --- a/exporter/kafkaexporter/config.go +++ b/exporter/kafkaexporter/config.go @@ -15,8 +15,10 @@ package kafkaexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/kafkaexporter" import ( + "fmt" "time" + "github.com/Shopify/sarama" "go.opentelemetry.io/collector/config" "go.opentelemetry.io/collector/exporter/exporterhelper" ) @@ -67,6 +69,14 @@ type Metadata struct { type Producer struct { // Maximum message bytes the producer will accept to produce. MaxMessageBytes int `mapstructure:"max_message_bytes"` + + // RequiredAcks Number of acknowledgements required to assume that a message has been sent. + // https://pkg.go.dev/github.com/Shopify/sarama@v1.30.0#RequiredAcks + // The options are: + // 0 -> NoResponse. doesn't send any response + // 1 -> WaitForLocal. waits for only the local commit to succeed before responding ( default ) + // -1 -> WaitForAll. waits for all in-sync replicas to commit before responding. + RequiredAcks sarama.RequiredAcks `mapstructure:"required_acks"` } // MetadataRetry defines retry configuration for Metadata. @@ -83,5 +93,8 @@ var _ config.Exporter = (*Config)(nil) // Validate checks if the exporter configuration is valid func (cfg *Config) Validate() error { + if cfg.Producer.RequiredAcks < -1 || cfg.Producer.RequiredAcks > 1 { + return fmt.Errorf("producer.required_acks has to be between -1 and 1. configured value %v", cfg.Producer.RequiredAcks) + } return nil } diff --git a/exporter/kafkaexporter/config_test.go b/exporter/kafkaexporter/config_test.go index 6d70f8b74fd6..0fde9a8035bf 100644 --- a/exporter/kafkaexporter/config_test.go +++ b/exporter/kafkaexporter/config_test.go @@ -19,6 +19,7 @@ import ( "testing" "time" + "github.com/Shopify/sarama" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component/componenttest" @@ -72,6 +73,7 @@ func TestLoadConfig(t *testing.T) { }, Producer: Producer{ MaxMessageBytes: 10000000, + RequiredAcks: sarama.WaitForAll, }, }, c) } diff --git a/exporter/kafkaexporter/factory.go b/exporter/kafkaexporter/factory.go index 289b7bfdb2a0..5209198a5ad9 100644 --- a/exporter/kafkaexporter/factory.go +++ b/exporter/kafkaexporter/factory.go @@ -18,6 +18,7 @@ import ( "context" "time" + "github.com/Shopify/sarama" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config" "go.opentelemetry.io/collector/consumer" @@ -39,6 +40,8 @@ const ( defaultMetadataFull = true // default max.message.bytes for the producer defaultProducerMaxMessageBytes = 1000000 + // default required_acks for the producer + defaultProducerRequiredAcks = sarama.WaitForLocal ) // FactoryOption applies changes to kafkaExporterFactory. @@ -91,6 +94,7 @@ func createDefaultConfig() config.Exporter { }, Producer: Producer{ MaxMessageBytes: defaultProducerMaxMessageBytes, + RequiredAcks: defaultProducerRequiredAcks, }, } } diff --git a/exporter/kafkaexporter/kafka_exporter.go b/exporter/kafkaexporter/kafka_exporter.go index 153241ed5d51..9a03f117cb13 100644 --- a/exporter/kafkaexporter/kafka_exporter.go +++ b/exporter/kafkaexporter/kafka_exporter.go @@ -128,8 +128,7 @@ func newSaramaProducer(config Config) (sarama.SyncProducer, error) { // These setting are required by the sarama.SyncProducer implementation. c.Producer.Return.Successes = true c.Producer.Return.Errors = true - // Wait only the local commit to succeed before responding. - c.Producer.RequiredAcks = sarama.WaitForLocal + c.Producer.RequiredAcks = config.Producer.RequiredAcks // Because sarama does not accept a Context for every message, set the Timeout here. c.Producer.Timeout = config.Timeout c.Metadata.Full = config.Metadata.Full diff --git a/exporter/kafkaexporter/testdata/config.yaml b/exporter/kafkaexporter/testdata/config.yaml index c1cef85b16f0..8c0885165c5a 100644 --- a/exporter/kafkaexporter/testdata/config.yaml +++ b/exporter/kafkaexporter/testdata/config.yaml @@ -10,6 +10,7 @@ exporters: max: 15 producer: max_message_bytes: 10000000 + required_acks: -1 # WaitForAll timeout: 10s auth: plain_text: