Skip to content

Commit

Permalink
[exporter/kafka] Allow configuring the acknowledgement behaviour (#6301)
Browse files Browse the repository at this point in the history
* feat(exporter/kafka): allow configuring the acknowledgement behaviour

* doc(exporter/kafka): add additional context to the RequiredAcks parameter

* chore(exporter/kafka): validate RequiredAcks option

* chore(container.image.name): add changelog entry
  • Loading branch information
secustor committed Dec 28, 2021
1 parent 6350778 commit 6b42249
Show file tree
Hide file tree
Showing 7 changed files with 25 additions and 2 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
- `datadogexporter`: Add compatibility with ECS Fargate semantic conventions (#6670)
- `k8s_observer`: discover k8s.node endpoints (#6820)
- `redisreceiver`: Add missing description fields to keyspace metrics (#6940)
- `kafkaexporter`: Allow controlling Kafka acknowledgment behaviour (#6301)

## 🛑 Breaking changes 🛑

Expand Down
3 changes: 3 additions & 0 deletions exporter/kafkaexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ The following settings can be optionally configured:
User should calculate this as `num_seconds * requests_per_second` where:
- `num_seconds` is the number of seconds to buffer in case of a backend outage
- `requests_per_second` is the average number of requests per seconds.
- `producer`
- `max_message_bytes` (default = 1000000) the maximum permitted size of a message in bytes
- `required_acks` (default = 1) controls when a message is regarded as transmitted. https://pkg.go.dev/github.com/Shopify/sarama@v1.30.0#RequiredAcks

Example configuration:

Expand Down
13 changes: 13 additions & 0 deletions exporter/kafkaexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@
package kafkaexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/kafkaexporter"

import (
"fmt"
"time"

"github.com/Shopify/sarama"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/exporter/exporterhelper"
)
Expand Down Expand Up @@ -67,6 +69,14 @@ type Metadata struct {
type Producer struct {
// Maximum message bytes the producer will accept to produce.
MaxMessageBytes int `mapstructure:"max_message_bytes"`

// RequiredAcks Number of acknowledgements required to assume that a message has been sent.
// https://pkg.go.dev/github.com/Shopify/sarama@v1.30.0#RequiredAcks
// The options are:
// 0 -> NoResponse. doesn't send any response
// 1 -> WaitForLocal. waits for only the local commit to succeed before responding ( default )
// -1 -> WaitForAll. waits for all in-sync replicas to commit before responding.
RequiredAcks sarama.RequiredAcks `mapstructure:"required_acks"`
}

// MetadataRetry defines retry configuration for Metadata.
Expand All @@ -83,5 +93,8 @@ var _ config.Exporter = (*Config)(nil)

// Validate checks if the exporter configuration is valid
func (cfg *Config) Validate() error {
if cfg.Producer.RequiredAcks < -1 || cfg.Producer.RequiredAcks > 1 {
return fmt.Errorf("producer.required_acks has to be between -1 and 1. configured value %v", cfg.Producer.RequiredAcks)
}
return nil
}
2 changes: 2 additions & 0 deletions exporter/kafkaexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"testing"
"time"

"github.com/Shopify/sarama"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component/componenttest"
Expand Down Expand Up @@ -72,6 +73,7 @@ func TestLoadConfig(t *testing.T) {
},
Producer: Producer{
MaxMessageBytes: 10000000,
RequiredAcks: sarama.WaitForAll,
},
}, c)
}
4 changes: 4 additions & 0 deletions exporter/kafkaexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"time"

"github.com/Shopify/sarama"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/consumer"
Expand All @@ -39,6 +40,8 @@ const (
defaultMetadataFull = true
// default max.message.bytes for the producer
defaultProducerMaxMessageBytes = 1000000
// default required_acks for the producer
defaultProducerRequiredAcks = sarama.WaitForLocal
)

// FactoryOption applies changes to kafkaExporterFactory.
Expand Down Expand Up @@ -91,6 +94,7 @@ func createDefaultConfig() config.Exporter {
},
Producer: Producer{
MaxMessageBytes: defaultProducerMaxMessageBytes,
RequiredAcks: defaultProducerRequiredAcks,
},
}
}
Expand Down
3 changes: 1 addition & 2 deletions exporter/kafkaexporter/kafka_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,7 @@ func newSaramaProducer(config Config) (sarama.SyncProducer, error) {
// These setting are required by the sarama.SyncProducer implementation.
c.Producer.Return.Successes = true
c.Producer.Return.Errors = true
// Wait only the local commit to succeed before responding.
c.Producer.RequiredAcks = sarama.WaitForLocal
c.Producer.RequiredAcks = config.Producer.RequiredAcks
// Because sarama does not accept a Context for every message, set the Timeout here.
c.Producer.Timeout = config.Timeout
c.Metadata.Full = config.Metadata.Full
Expand Down
1 change: 1 addition & 0 deletions exporter/kafkaexporter/testdata/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ exporters:
max: 15
producer:
max_message_bytes: 10000000
required_acks: -1 # WaitForAll
timeout: 10s
auth:
plain_text:
Expand Down

0 comments on commit 6b42249

Please sign in to comment.