From e5322262f6fbeb2f7f3ca0ae2040b9b581cb6bec Mon Sep 17 00:00:00 2001 From: Patrick Assuied Date: Wed, 4 Sep 2024 21:16:20 -0700 Subject: [PATCH 1/4] Addressed issue in Kafka-pubsub for avro null messages (#3531) Signed-off-by: Patrick Assuied --- common/component/kafka/kafka.go | 10 +++++++--- common/component/kafka/kafka_test.go | 9 ++++++++- 2 files changed, 15 insertions(+), 4 deletions(-) diff --git a/common/component/kafka/kafka.go b/common/component/kafka/kafka.go index 8b2be83123..3f3bfca8c4 100644 --- a/common/component/kafka/kafka.go +++ b/common/component/kafka/kafka.go @@ -14,6 +14,7 @@ limitations under the License. package kafka import ( + "bytes" "context" "encoding/binary" "errors" @@ -263,7 +264,9 @@ func getSchemaSubject(topic string) string { } func (k *Kafka) DeserializeValue(message *sarama.ConsumerMessage, config SubscriptionHandlerConfig) ([]byte, error) { - // Null Data is valid and a tombstone record. It shouldn't be serialized + // Null Data is valid and a tombstone record. + // It shouldn't be going through schema validation and decoding + // Instead directly convert to JSON `null` if message.Value == nil { return []byte("null"), nil } @@ -354,8 +357,9 @@ func (k *Kafka) getSchemaRegistyClient() (srclient.ISchemaRegistryClient, error) } func (k *Kafka) SerializeValue(topic string, data []byte, metadata map[string]string) ([]byte, error) { - // Null Data is valid and a tombstone record. It shouldn't be serialized - if data == nil { + // Null Data is valid and a tombstone record. + // It should be converted to NULL and not go through schema validation & encoding + if bytes.Equal(data, []byte("null")) || data == nil { return nil, nil } diff --git a/common/component/kafka/kafka_test.go b/common/component/kafka/kafka_test.go index f60c2ad236..2e9ac3fbe7 100644 --- a/common/component/kafka/kafka_test.go +++ b/common/component/kafka/kafka_test.go @@ -219,7 +219,14 @@ func TestSerializeValueCachingDisabled(t *testing.T) { require.NoError(t, err) }) - t.Run("value published null, no error", func(t *testing.T) { + t.Run("value published 'null', no error", func(t *testing.T) { + act, err := k.SerializeValue("my-topic", []byte("null"), map[string]string{"valueSchemaType": "Avro"}) + + require.Nil(t, act) + require.NoError(t, err) + }) + + t.Run("value published nil, no error", func(t *testing.T) { act, err := k.SerializeValue("my-topic", nil, map[string]string{"valueSchemaType": "Avro"}) require.Nil(t, act) From 9ea3fee247f9220828e8d6a656a967b8eeda0a98 Mon Sep 17 00:00:00 2001 From: Patrick Assuied Date: Thu, 5 Sep 2024 08:01:58 -0700 Subject: [PATCH 2/4] Resolving a weird edge case in case of a poison pill message being retried, followed by a pod restart (#3532) Signed-off-by: Patrick Assuied --- common/component/kafka/consumer.go | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/common/component/kafka/consumer.go b/common/component/kafka/consumer.go index a05e611707..4b30ed58b8 100644 --- a/common/component/kafka/consumer.go +++ b/common/component/kafka/consumer.go @@ -67,6 +67,15 @@ func (consumer *consumer) ConsumeClaim(session sarama.ConsumerGroupSession, clai } else { for { select { + // Should return when `session.Context()` is done. + // If not, will raise `ErrRebalanceInProgress` or `read tcp :: i/o timeout` when kafka rebalance. see: + // https://github.com/IBM/sarama/issues/1192 + // Make sure the check for session context done happens before the next message is processed. + // There is a possibility that the pod takes some time to shutdown and in case of a poison pill message, the `retry` would get interrupted (as expected), + // but the next message would be processed as a result, + // therefore dropping the poison pill message regardless of resiliency policy. + case <-session.Context().Done(): + return nil case message, ok := <-claim.Messages(): if !ok { return nil @@ -88,11 +97,6 @@ func (consumer *consumer) ConsumeClaim(session sarama.ConsumerGroupSession, clai consumer.k.logger.Errorf("Error processing Kafka message: %s/%d/%d [key=%s]. Error: %v.", message.Topic, message.Partition, message.Offset, asBase64String(message.Key), err) } } - // Should return when `session.Context()` is done. - // If not, will raise `ErrRebalanceInProgress` or `read tcp :: i/o timeout` when kafka rebalance. see: - // https://github.com/IBM/sarama/issues/1192 - case <-session.Context().Done(): - return nil } } } From 4c538165900300ac01a0ad7f2fbd5f62af27dd3c Mon Sep 17 00:00:00 2001 From: Ryan Despain <166053905+soisyourface@users.noreply.github.com> Date: Mon, 9 Sep 2024 08:27:10 -0600 Subject: [PATCH 3/4] Binding AWS Kinesis - reuse client credentials (#3509) Signed-off-by: arr Co-authored-by: arr Co-authored-by: Yaron Schneider --- bindings/aws/kinesis/kinesis.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/bindings/aws/kinesis/kinesis.go b/bindings/aws/kinesis/kinesis.go index 8f403e3a2f..94bf590bb4 100644 --- a/bindings/aws/kinesis/kinesis.go +++ b/bindings/aws/kinesis/kinesis.go @@ -23,7 +23,6 @@ import ( "time" "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/aws/credentials" "github.com/aws/aws-sdk-go/aws/request" "github.com/aws/aws-sdk-go/service/kinesis" "github.com/cenkalti/backoff/v4" @@ -129,7 +128,7 @@ func (a *AWSKinesis) Init(ctx context.Context, metadata bindings.Metadata) error if m.KinesisConsumerMode == SharedThroughput { kclConfig := config.NewKinesisClientLibConfigWithCredential(m.ConsumerName, m.StreamName, m.Region, m.ConsumerName, - credentials.NewStaticCredentials(m.AccessKey, m.SecretKey, "")) + client.Config.Credentials) a.workerConfig = kclConfig } From 181592079f3ef198ee246d8f8e365e3a76726cb2 Mon Sep 17 00:00:00 2001 From: Loong Dai Date: Mon, 9 Sep 2024 23:45:10 +0800 Subject: [PATCH 4/4] CI: correct upgrade lint path (#3527) Signed-off-by: Loong --- Makefile | 1 + 1 file changed, 1 insertion(+) diff --git a/Makefile b/Makefile index cbbcfa9641..aef12f0691 100644 --- a/Makefile +++ b/Makefile @@ -100,6 +100,7 @@ verify-linter-version: echo "[!] Yours: $(INSTALLED_LINT_VERSION)"; \ echo "[!] Theirs: $(GH_LINT_VERSION)"; \ echo "[!] Upgrade: curl -sSfL https://github.com/raw/golangci/golangci-lint/master/install.sh | sh -s -- -b $(go env GOPATH)/bin $(GH_LINT_VERSION)"; \ + GOLANGCI_LINT=$(go env GOPATH)/bin/$(GOLANGCI_LINT) sleep 3; \ fi;