diff --git a/pkg/pipeline/ingest/ingest_kafka_test.go b/pkg/pipeline/ingest/ingest_kafka_test.go index f0a33a78b..8bedec5d1 100644 --- a/pkg/pipeline/ingest/ingest_kafka_test.go +++ b/pkg/pipeline/ingest/ingest_kafka_test.go @@ -64,6 +64,8 @@ parameters: groupBalancers: ["rackAffinity"] decoder: type: json + batchMaxLen: 1000 + commitInterval: 1000 ` func initNewIngestKafka(t *testing.T, configTemplate string) Ingester { @@ -85,6 +87,8 @@ func Test_NewIngestKafka1(t *testing.T) { require.Equal(t, "FirstOffset", ingestKafka.kafkaParams.StartOffset) require.Equal(t, 2, len(ingestKafka.kafkaReader.Config().GroupBalancers)) require.Equal(t, int64(300), ingestKafka.kafkaParams.BatchReadTimeout) + require.Equal(t, int(500), ingestKafka.batchMaxLength) + require.Equal(t, time.Duration(500)*time.Millisecond, ingestKafka.kafkaReader.Config().CommitInterval) } func Test_NewIngestKafka2(t *testing.T) { @@ -97,6 +101,8 @@ func Test_NewIngestKafka2(t *testing.T) { require.Equal(t, "LastOffset", ingestKafka.kafkaParams.StartOffset) require.Equal(t, 1, len(ingestKafka.kafkaReader.Config().GroupBalancers)) require.Equal(t, defaultBatchReadTimeout, ingestKafka.kafkaParams.BatchReadTimeout) + require.Equal(t, int(1000), ingestKafka.batchMaxLength) + require.Equal(t, time.Duration(1000)*time.Millisecond, ingestKafka.kafkaReader.Config().CommitInterval) } func removeTimestamp(receivedEntries []config.GenericMap) {