Skip to content

Commit

Permalink
Fix kafka metadata to HTTP headers conversion of invalid characters (#…
Browse files Browse the repository at this point in the history
…3511)

Signed-off-by: Anton Troshin <anton@diagrid.io>
  • Loading branch information
antontroshin authored Aug 29, 2024
1 parent e53cf34 commit 8eb716e
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 3 deletions.
7 changes: 4 additions & 3 deletions common/component/kafka/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package kafka
import (
"errors"
"fmt"
"net/url"
"strconv"
"sync"
"time"
Expand Down Expand Up @@ -128,7 +129,7 @@ func (consumer *consumer) doBulkCallback(session sarama.ConsumerGroupSession,
messages []*sarama.ConsumerMessage, handler BulkEventHandler, topic string,
) error {
consumer.k.logger.Debugf("Processing Kafka bulk message: %s", topic)
messageValues := make([]KafkaBulkMessageEntry, (len(messages)))
messageValues := make([]KafkaBulkMessageEntry, len(messages))

for i, message := range messages {
if message != nil {
Expand Down Expand Up @@ -205,14 +206,14 @@ func GetEventMetadata(message *sarama.ConsumerMessage) map[string]string {
if message != nil {
metadata := make(map[string]string, len(message.Headers)+5)
if message.Key != nil {
metadata[keyMetadataKey] = string(message.Key)
metadata[keyMetadataKey] = url.QueryEscape(string(message.Key))
}
metadata[offsetMetadataKey] = strconv.FormatInt(message.Offset, 10)
metadata[topicMetadataKey] = message.Topic
metadata[timestampMetadataKey] = strconv.FormatInt(message.Timestamp.UnixMilli(), 10)
metadata[partitionMetadataKey] = strconv.FormatInt(int64(message.Partition), 10)
for _, header := range message.Headers {
metadata[string(header.Key)] = string(header.Value)
metadata[string(header.Key)] = url.QueryEscape(string(header.Value))
}
return metadata
}
Expand Down
28 changes: 28 additions & 0 deletions common/component/kafka/metadata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package kafka

import (
"fmt"
"net/url"
"strconv"
"testing"
"time"
Expand Down Expand Up @@ -556,4 +557,31 @@ func TestGetEventMetadata(t *testing.T) {
act := GetEventMetadata(nil)
require.Nil(t, act)
})

t.Run("key with invalid value escaped", func(t *testing.T) {
keyValue := "key1\xFF"
escapedKeyValue := url.QueryEscape(keyValue)

m := sarama.ConsumerMessage{
Headers: nil, Timestamp: ts, Key: []byte(keyValue), Value: []byte("MyValue"), Partition: 0, Offset: 123, Topic: "TestTopic",
}
act := GetEventMetadata(&m)
require.Equal(t, escapedKeyValue, act[keyMetadataKey])
})

t.Run("header with invalid value escaped", func(t *testing.T) {
headerKey := "key1"
headerValue := "value1\xFF"
escapedHeaderValue := url.QueryEscape(headerValue)

headers := []*sarama.RecordHeader{
{Key: []byte(headerKey), Value: []byte(headerValue)},
}
m := sarama.ConsumerMessage{
Headers: headers, Timestamp: ts, Key: []byte("MyKey"), Value: []byte("MyValue"), Partition: 0, Offset: 123, Topic: "TestTopic",
}
act := GetEventMetadata(&m)
require.Len(t, act, 6)
require.Equal(t, escapedHeaderValue, act[headerKey])
})
}

0 comments on commit 8eb716e

Please sign in to comment.