Skip to content

Commit

Permalink
Improving tests to cover new exposed parameters
Browse files Browse the repository at this point in the history
  • Loading branch information
OlivierCazade committed Jun 23, 2022
1 parent 87fe2c4 commit d90a2e4
Showing 1 changed file with 6 additions and 0 deletions.
6 changes: 6 additions & 0 deletions pkg/pipeline/ingest/ingest_kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ parameters:
groupBalancers: ["rackAffinity"]
decoder:
type: json
batchMaxLen: 1000
commitInterval: 1000
`

func initNewIngestKafka(t *testing.T, configTemplate string) Ingester {
Expand All @@ -85,6 +87,8 @@ func Test_NewIngestKafka1(t *testing.T) {
require.Equal(t, "FirstOffset", ingestKafka.kafkaParams.StartOffset)
require.Equal(t, 2, len(ingestKafka.kafkaReader.Config().GroupBalancers))
require.Equal(t, int64(300), ingestKafka.kafkaParams.BatchReadTimeout)
require.Equal(t, int(500), ingestKafka.batchMaxLength)
require.Equal(t, time.Duration(500)*time.Millisecond, ingestKafka.kafkaReader.Config().CommitInterval)
}

func Test_NewIngestKafka2(t *testing.T) {
Expand All @@ -97,6 +101,8 @@ func Test_NewIngestKafka2(t *testing.T) {
require.Equal(t, "LastOffset", ingestKafka.kafkaParams.StartOffset)
require.Equal(t, 1, len(ingestKafka.kafkaReader.Config().GroupBalancers))
require.Equal(t, defaultBatchReadTimeout, ingestKafka.kafkaParams.BatchReadTimeout)
require.Equal(t, int(1000), ingestKafka.batchMaxLength)
require.Equal(t, time.Duration(1000)*time.Millisecond, ingestKafka.kafkaReader.Config().CommitInterval)
}

func removeTimestamp(receivedEntries []config.GenericMap) {
Expand Down

0 comments on commit d90a2e4

Please sign in to comment.