diff --git a/pkg/pipeline/ingest/ingest_kafka_test.go b/pkg/pipeline/ingest/ingest_kafka_test.go index c190c2764..f0a33a78b 100644 --- a/pkg/pipeline/ingest/ingest_kafka_test.go +++ b/pkg/pipeline/ingest/ingest_kafka_test.go @@ -99,6 +99,12 @@ func Test_NewIngestKafka2(t *testing.T) { require.Equal(t, defaultBatchReadTimeout, ingestKafka.kafkaParams.BatchReadTimeout) } +func removeTimestamp(receivedEntries []config.GenericMap) { + for _, entry := range receivedEntries { + delete(entry, "TimeReceived") + } +} + func Test_IngestKafka(t *testing.T) { newIngest := initNewIngestKafka(t, testConfig1) ingestKafka := newIngest.(*ingestKafka) @@ -124,6 +130,10 @@ func Test_IngestKafka(t *testing.T) { // wait for the data to have been processed receivedEntries := <-ingestOutput + // we remove timestamp for test stability + // Timereceived field is tested in the decodeJson tests + removeTimestamp(receivedEntries) + require.Equal(t, 3, len(receivedEntries)) require.Equal(t, test.DeserializeJSONToMap(t, record1), receivedEntries[0]) require.Equal(t, test.DeserializeJSONToMap(t, record2), receivedEntries[1]) @@ -175,6 +185,10 @@ func Test_KafkaListener(t *testing.T) { // wait for the data to have been processed receivedEntries := <-ingestOutput + // we remove timestamp for test stability + // Timereceived field is tested in the decodeJson tests + removeTimestamp(receivedEntries) + require.Equal(t, 1, len(receivedEntries)) require.Equal(t, test.DeserializeJSONToMap(t, string(fakeRecord)), receivedEntries[0]) }