Skip to content

Commit

Permalink
Merge pull request #93 from grafana/memberlist-no-denormalize
Browse files Browse the repository at this point in the history
Memberlist/Ring: do not normalise tokens on subsequent Merge().
  • Loading branch information
Tyler Reid authored Dec 16, 2021
2 parents 84c00da + 84360ae commit 1f49a70
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 2 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
* [ENHANCEMENT] Replace go-kit/kit/log with go-kit/log. #52
* [ENHANCEMENT] Add spanlogger package. #42
* [ENHANCEMENT] Add runutil.CloseWithLogOnErr function. #58
* [ENHANCEMENT] Optimise memberlist receive path when used as a backing store for rings with a large number of members. #76 #77 #84 #91
* [ENHANCEMENT] Optimise memberlist receive path when used as a backing store for rings with a large number of members. #76 #77 #84 #91 #93
* [ENHANCEMENT] Memberlist: prepare the data to send on the write before starting counting the elapsed time for `-memberlist.packet-write-timeout`, in order to reduce chances we hit the timeout when sending a packet to other node. #89
* [ENHANCEMENT] flagext: for cases such as `DeprecatedFlag()` that need a logger, add RegisterFlagsWithLogger. #80
* [ENHANCEMENT] Added option to BasicLifecycler to keep instance in the ring when stopping. #97
Expand Down
42 changes: 42 additions & 0 deletions ring/merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,26 @@ func TestMergeLeft(t *testing.T) {
}
}

// Not normalised because it contains duplicate and unsorted tokens.
firstRingNotNormalised := func() *Desc {
return &Desc{
Ingesters: map[string]InstanceDesc{
"Ing 1": {Addr: "addr1", Timestamp: now, State: ACTIVE, Tokens: []uint32{30, 40, 40, 50}},
"Ing 2": {Addr: "addr2", Timestamp: now, State: JOINING, Tokens: []uint32{20, 10, 5, 10, 20, 100, 200, 100}},
},
}
}

secondRing := func() *Desc {
return &Desc{
Ingesters: map[string]InstanceDesc{
"Ing 2": {Addr: "addr2", Timestamp: now, State: LEFT},
},
}
}

// Not normalised because it contains a LEFT ingester with tokens.
secondRingNotNormalised := func() *Desc {
return &Desc{
Ingesters: map[string]InstanceDesc{
"Ing 2": {Addr: "addr2", Timestamp: now, State: LEFT, Tokens: []uint32{5, 10, 20, 100, 200}},
Expand Down Expand Up @@ -300,6 +319,17 @@ func TestMergeLeft(t *testing.T) {
},
}, ch)
}
{
// Should yield same result when RHS is not normalised.
our, ch := merge(firstRing(), secondRingNotNormalised())
assert.Equal(t, expectedFirstSecondMerge(), our)
assert.Equal(t, &Desc{
Ingesters: map[string]InstanceDesc{
"Ing 2": {Addr: "addr2", Timestamp: now, State: LEFT},
},
}, ch)

}

{ // idempotency: (no change after applying same ring again)
our, ch := merge(expectedFirstSecondMerge(), secondRing())
Expand All @@ -317,6 +347,18 @@ func TestMergeLeft(t *testing.T) {
},
}, ch)
}
{
// Should yield same result when RHS is not normalised.
our, ch := merge(secondRing(), firstRingNotNormalised())
assert.Equal(t, expectedFirstSecondMerge(), our)
// when merging first into second ring, only "Ing 1" is new
assert.Equal(t, &Desc{
Ingesters: map[string]InstanceDesc{
"Ing 1": {Addr: "addr1", Timestamp: now, State: ACTIVE, Tokens: []uint32{30, 40, 50}},
},
}, ch)

}

{ // associativity: Merge(Merge(first, second), third) == Merge(first, Merge(second, third))
our1, _ := merge(firstRing(), secondRing())
Expand Down
7 changes: 6 additions & 1 deletion ring/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,12 @@ func (i *InstanceDesc) IsReady(now time.Time, heartbeatTimeout time.Duration) er
//
// This method is part of memberlist.Mergeable interface, and is only used by gossiping ring.
//
// The receiver must be normalised, that is, the token lists must sorted and not contain
// duplicates. The function guarantees that the receiver will be left in this normalised state,
// so multiple subsequent Merge calls are valid usage.
//
// The Mergeable passed as the parameter does not need to be normalised.
//
// Note: This method modifies d and mergeable to reduce allocations and copies.
func (d *Desc) Merge(mergeable memberlist.Mergeable, localCAS bool) (memberlist.Mergeable, error) {
return d.mergeWithTime(mergeable, localCAS, time.Now())
Expand All @@ -195,7 +201,6 @@ func (d *Desc) mergeWithTime(mergeable memberlist.Mergeable, localCAS bool, now
return nil, nil
}

normalizeIngestersMap(d)
normalizeIngestersMap(other)

thisIngesterMap := d.Ingesters
Expand Down

0 comments on commit 1f49a70

Please sign in to comment.