Skip to content

Commit

Permalink
fix(proto): handle V3 member metadata and empty owned partitions
Browse files Browse the repository at this point in the history
Signed-off-by: Dominic Evans <dominic.evans@uk.ibm.com>
  • Loading branch information
dnwe committed Aug 21, 2023
1 parent 96c37d1 commit e9bd1b8
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 8 deletions.
60 changes: 52 additions & 8 deletions consumer_group_members.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ type ConsumerGroupMemberMetadata struct {
Topics []string
UserData []byte
OwnedPartitions []*OwnedPartition
GenerationID int32
RackID *string
}

func (m *ConsumerGroupMemberMetadata) encode(pe packetEncoder) error {
Expand All @@ -22,6 +24,27 @@ func (m *ConsumerGroupMemberMetadata) encode(pe packetEncoder) error {
return err
}

if m.Version >= 1 {
if err := pe.putArrayLength(len(m.OwnedPartitions)); err != nil {
return err
}
for _, op := range m.OwnedPartitions {
if err := op.encode(pe); err != nil {
return err
}
}
}

if m.Version >= 2 {
pe.putInt32(m.GenerationID)
}

if m.Version >= 3 {
if err := pe.putNullableString(m.RackID); err != nil {
return err
}
}

return nil
}

Expand All @@ -48,18 +71,29 @@ func (m *ConsumerGroupMemberMetadata) decode(pd packetDecoder) (err error) {
}
return err
}
if n == 0 {
return nil
}
m.OwnedPartitions = make([]*OwnedPartition, n)
for i := 0; i < n; i++ {
m.OwnedPartitions[i] = &OwnedPartition{}
if err := m.OwnedPartitions[i].decode(pd); err != nil {
return err
if n > 0 {
m.OwnedPartitions = make([]*OwnedPartition, n)
for i := 0; i < n; i++ {
m.OwnedPartitions[i] = &OwnedPartition{}
if err := m.OwnedPartitions[i].decode(pd); err != nil {
return err
}
}
}
}

if m.Version >= 2 {
if m.GenerationID, err = pd.getInt32(); err != nil {
return err
}
}

if m.Version >= 3 {
if m.RackID, err = pd.getNullableString(); err != nil {
return err
}
}

return nil
}

Expand All @@ -68,6 +102,16 @@ type OwnedPartition struct {
Partitions []int32
}

func (m *OwnedPartition) encode(pe packetEncoder) error {
if err := pe.putString(m.Topic); err != nil {
return err
}
if err := pe.putInt32Array(m.Partitions); err != nil {
return err
}
return nil
}

func (m *OwnedPartition) decode(pd packetDecoder) (err error) {
if m.Topic, err = pd.getString(); err != nil {
return err
Expand Down
17 changes: 17 additions & 0 deletions consumer_group_members_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,16 @@ var (
0, 0, 0, 3, 0x01, 0x02, 0x03, // Userdata
0, 0, 0, 0, // OwnedPartitions KIP-429
}

groupMemberMetadataV3NilOwned = []byte{
0, 3, // Version
0, 0, 0, 1, // Topic array length
0, 3, 'o', 'n', 'e', // Topic one
0, 0, 0, 3, 0x01, 0x02, 0x03, // Userdata
0, 0, 0, 0, // OwnedPartitions KIP-429
0, 0, 0, 64, // GenerationID
0, 4, 'r', 'a', 'c', 'k', // RackID
}
)

func TestConsumerGroupMemberMetadata(t *testing.T) {
Expand Down Expand Up @@ -77,6 +87,13 @@ func TestConsumerGroupMemberMetadataV1Decode(t *testing.T) {
}
}

func TestConsumerGroupMemberMetadataV3Decode(t *testing.T) {
meta := new(ConsumerGroupMemberMetadata)
if err := decode(groupMemberMetadataV3NilOwned, meta, nil); err != nil {
t.Error("Failed to decode V3 data", err)
}
}

func TestConsumerGroupMemberAssignment(t *testing.T) {
amt := &ConsumerGroupMemberAssignment{
Version: 0,
Expand Down

0 comments on commit e9bd1b8

Please sign in to comment.