Skip to content

Commit

Permalink
Fix kafka ingest test by removing received timestamp
Browse files Browse the repository at this point in the history
  • Loading branch information
OlivierCazade committed Jun 22, 2022
1 parent c418a01 commit ac34ff8
Showing 1 changed file with 14 additions and 0 deletions.
14 changes: 14 additions & 0 deletions pkg/pipeline/ingest/ingest_kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,12 @@ func Test_NewIngestKafka2(t *testing.T) {
require.Equal(t, defaultBatchReadTimeout, ingestKafka.kafkaParams.BatchReadTimeout)
}

func removeTimestamp(receivedEntries []config.GenericMap) {
for _, entry := range receivedEntries {
delete(entry, "TimeReceived")
}
}

func Test_IngestKafka(t *testing.T) {
newIngest := initNewIngestKafka(t, testConfig1)
ingestKafka := newIngest.(*ingestKafka)
Expand All @@ -124,6 +130,10 @@ func Test_IngestKafka(t *testing.T) {
// wait for the data to have been processed
receivedEntries := <-ingestOutput

// we remove timestamp for test stability
// Timereceived field is tested in the decodeJson tests
removeTimestamp(receivedEntries)

require.Equal(t, 3, len(receivedEntries))
require.Equal(t, test.DeserializeJSONToMap(t, record1), receivedEntries[0])
require.Equal(t, test.DeserializeJSONToMap(t, record2), receivedEntries[1])
Expand Down Expand Up @@ -175,6 +185,10 @@ func Test_KafkaListener(t *testing.T) {
// wait for the data to have been processed
receivedEntries := <-ingestOutput

// we remove timestamp for test stability
// Timereceived field is tested in the decodeJson tests
removeTimestamp(receivedEntries)

require.Equal(t, 1, len(receivedEntries))
require.Equal(t, test.DeserializeJSONToMap(t, string(fakeRecord)), receivedEntries[0])
}

0 comments on commit ac34ff8

Please sign in to comment.