Skip to content

Commit

Permalink
Merge branch 'main' into conversation
Browse files Browse the repository at this point in the history
  • Loading branch information
yaron2 authored Sep 9, 2024
2 parents 70a773b + 1815920 commit 934f744
Show file tree
Hide file tree
Showing 5 changed files with 26 additions and 11 deletions.
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ verify-linter-version:
echo "[!] Yours: $(INSTALLED_LINT_VERSION)"; \
echo "[!] Theirs: $(GH_LINT_VERSION)"; \
echo "[!] Upgrade: curl -sSfL https://github.com/raw/golangci/golangci-lint/master/install.sh | sh -s -- -b $(go env GOPATH)/bin $(GH_LINT_VERSION)"; \
GOLANGCI_LINT=$(go env GOPATH)/bin/$(GOLANGCI_LINT)
sleep 3; \
fi;

Expand Down
3 changes: 1 addition & 2 deletions bindings/aws/kinesis/kinesis.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/request"
"github.com/aws/aws-sdk-go/service/kinesis"
"github.com/cenkalti/backoff/v4"
Expand Down Expand Up @@ -129,7 +128,7 @@ func (a *AWSKinesis) Init(ctx context.Context, metadata bindings.Metadata) error
if m.KinesisConsumerMode == SharedThroughput {
kclConfig := config.NewKinesisClientLibConfigWithCredential(m.ConsumerName,
m.StreamName, m.Region, m.ConsumerName,
credentials.NewStaticCredentials(m.AccessKey, m.SecretKey, ""))
client.Config.Credentials)
a.workerConfig = kclConfig
}

Expand Down
14 changes: 9 additions & 5 deletions common/component/kafka/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,15 @@ func (consumer *consumer) ConsumeClaim(session sarama.ConsumerGroupSession, clai
} else {
for {
select {
// Should return when `session.Context()` is done.
// If not, will raise `ErrRebalanceInProgress` or `read tcp <ip>:<port>: i/o timeout` when kafka rebalance. see:
// https://github.com/IBM/sarama/issues/1192
// Make sure the check for session context done happens before the next message is processed.
// There is a possibility that the pod takes some time to shutdown and in case of a poison pill message, the `retry` would get interrupted (as expected),
// but the next message would be processed as a result,
// therefore dropping the poison pill message regardless of resiliency policy.
case <-session.Context().Done():
return nil
case message, ok := <-claim.Messages():
if !ok {
return nil
Expand All @@ -88,11 +97,6 @@ func (consumer *consumer) ConsumeClaim(session sarama.ConsumerGroupSession, clai
consumer.k.logger.Errorf("Error processing Kafka message: %s/%d/%d [key=%s]. Error: %v.", message.Topic, message.Partition, message.Offset, asBase64String(message.Key), err)
}
}
// Should return when `session.Context()` is done.
// If not, will raise `ErrRebalanceInProgress` or `read tcp <ip>:<port>: i/o timeout` when kafka rebalance. see:
// https://github.com/IBM/sarama/issues/1192
case <-session.Context().Done():
return nil
}
}
}
Expand Down
10 changes: 7 additions & 3 deletions common/component/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ limitations under the License.
package kafka

import (
"bytes"
"context"
"encoding/binary"
"errors"
Expand Down Expand Up @@ -263,7 +264,9 @@ func getSchemaSubject(topic string) string {
}

func (k *Kafka) DeserializeValue(message *sarama.ConsumerMessage, config SubscriptionHandlerConfig) ([]byte, error) {
// Null Data is valid and a tombstone record. It shouldn't be serialized
// Null Data is valid and a tombstone record.
// It shouldn't be going through schema validation and decoding
// Instead directly convert to JSON `null`
if message.Value == nil {
return []byte("null"), nil
}
Expand Down Expand Up @@ -354,8 +357,9 @@ func (k *Kafka) getSchemaRegistyClient() (srclient.ISchemaRegistryClient, error)
}

func (k *Kafka) SerializeValue(topic string, data []byte, metadata map[string]string) ([]byte, error) {
// Null Data is valid and a tombstone record. It shouldn't be serialized
if data == nil {
// Null Data is valid and a tombstone record.
// It should be converted to NULL and not go through schema validation & encoding
if bytes.Equal(data, []byte("null")) || data == nil {
return nil, nil
}

Expand Down
9 changes: 8 additions & 1 deletion common/component/kafka/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,14 @@ func TestSerializeValueCachingDisabled(t *testing.T) {
require.NoError(t, err)
})

t.Run("value published null, no error", func(t *testing.T) {
t.Run("value published 'null', no error", func(t *testing.T) {
act, err := k.SerializeValue("my-topic", []byte("null"), map[string]string{"valueSchemaType": "Avro"})

require.Nil(t, act)
require.NoError(t, err)
})

t.Run("value published nil, no error", func(t *testing.T) {
act, err := k.SerializeValue("my-topic", nil, map[string]string{"valueSchemaType": "Avro"})

require.Nil(t, act)
Expand Down

0 comments on commit 934f744

Please sign in to comment.