Skip to content

Commit

Permalink
Memberlist ignore old tombstones (#4420)
Browse files Browse the repository at this point in the history
* Memberlist KV will no longer consider old tombstones as a "change" and will not gossip about them.

Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>

* CHANGELOG.md

Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>
  • Loading branch information
pstibrany authored Aug 16, 2021
1 parent 399c860 commit 090988c
Show file tree
Hide file tree
Showing 3 changed files with 114 additions and 2 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
* [CHANGE] Some files and directories created by Mimir components on local disk now have stricter permissions, and are only readable by owner, but not group or others. #4394
* [CHANGE] Compactor: compactor will no longer try to compact blocks that are already marked for deletion. Previously compactor would consider blocks marked for deletion within `-compactor.deletion-delay / 2` period as eligible for compaction. #4328
* [CHANGE] Memberlist: forward only changes, not entire original message. #4419
* [CHANGE] Memberlist: don't accept old tombstones as incoming change, and don't forward such messages to other gossip members. #4420
* [ENHANCEMENT] Add timeout for waiting on compactor to become ACTIVE in the ring. #4262
* [ENHANCEMENT] Reduce memory used by streaming queries, particularly in ruler. #4341
* [ENHANCEMENT] Ring: allow experimental configuration of disabling of heartbeat timeouts by setting the relevant configuration value to zero. Applies to the following: #4342
Expand Down
11 changes: 11 additions & 0 deletions pkg/ring/kv/memberlist/memberlist_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1201,6 +1201,17 @@ func (m *KV) mergeValueForKey(key string, incomingValue Mergeable, casVersion ui
total, removed := result.RemoveTombstones(limit)
m.storeTombstones.WithLabelValues(key).Set(float64(total))
m.storeRemovedTombstones.WithLabelValues(key).Add(float64(removed))

// Remove tombstones from change too. If change turns out to be empty after this,
// we don't need to change local value either!
//
// Note that "result" and "change" may actually be the same Mergeable. That is why we
// call RemoveTombstones on "result" first, so that we get the correct metrics. Calling
// RemoveTombstones twice with same limit should be noop.
change.RemoveTombstones(limit)
if len(change.MergeContent()) == 0 {
return nil, 0, nil
}
}

newVersion := curr.version + 1
Expand Down
104 changes: 102 additions & 2 deletions pkg/ring/kv/memberlist/memberlist_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,19 @@ func (d *data) MergeContent() []string {
return out
}

func (d *data) RemoveTombstones(limit time.Time) (_, _ int) {
// nothing to do
// This method deliberately ignores zero limit, so that tests can observe LEFT state as well.
func (d *data) RemoveTombstones(limit time.Time) (total, removed int) {
for n, m := range d.Members {
if m.State == LEFT {
if time.Unix(m.Timestamp, 0).Before(limit) {
// remove it
delete(d.Members, n)
removed++
} else {
total++
}
}
}
return
}

Expand Down Expand Up @@ -1137,6 +1148,95 @@ func TestNotifyMsgResendsOnlyChanges(t *testing.T) {
}}, d)
}

func TestSendingOldTombstoneShouldNotForwardMessage(t *testing.T) {
codec := dataCodec{}

cfg := KVConfig{}
// We will be checking for number of messages in the broadcast queue, so make sure to use known retransmit factor.
cfg.RetransmitMult = 1
cfg.LeftIngestersTimeout = 5 * time.Minute
cfg.Codecs = append(cfg.Codecs, codec)

kv := NewKV(cfg, log.NewNopLogger())
require.NoError(t, services.StartAndAwaitRunning(context.Background(), kv))
defer services.StopAndAwaitTerminated(context.Background(), kv) //nolint:errcheck

client, err := NewClient(kv, codec)
require.NoError(t, err)

now := time.Now()

// No broadcast messages from KV at the beginning.
require.Equal(t, 0, len(kv.GetBroadcasts(0, math.MaxInt32)))

for _, tc := range []struct {
name string
valueBeforeSend *data // value in KV store before sending messsage
msgToSend *data
valueAfterSend *data // value in KV store after sending message
broadcastMessage *data // broadcasted change, if not nil
}{
// These tests follow each other (end state of KV in state is starting point in the next state).
{
name: "old tombstone, empty KV",
valueBeforeSend: nil,
msgToSend: &data{Members: map[string]member{"instance": {Timestamp: now.Unix() - int64(2*cfg.LeftIngestersTimeout.Seconds()), State: LEFT}}},
valueAfterSend: nil, // no change to KV
broadcastMessage: nil, // no new message
},

{
name: "recent tombstone, empty KV",
valueBeforeSend: nil,
msgToSend: &data{Members: map[string]member{"instance": {Timestamp: now.Unix(), State: LEFT}}},
broadcastMessage: &data{Members: map[string]member{"instance": {Timestamp: now.Unix(), State: LEFT}}},
valueAfterSend: &data{Members: map[string]member{"instance": {Timestamp: now.Unix(), State: LEFT, Tokens: []uint32{}}}},
},

{
name: "old tombstone, KV contains tombstone already",
valueBeforeSend: &data{Members: map[string]member{"instance": {Timestamp: now.Unix(), State: LEFT, Tokens: []uint32{}}}},
msgToSend: &data{Members: map[string]member{"instance": {Timestamp: now.Unix() - 10, State: LEFT}}},
broadcastMessage: nil,
valueAfterSend: &data{Members: map[string]member{"instance": {Timestamp: now.Unix(), State: LEFT, Tokens: []uint32{}}}},
},

{
name: "fresh tombstone, KV contains tombstone already",
valueBeforeSend: &data{Members: map[string]member{"instance": {Timestamp: now.Unix(), State: LEFT, Tokens: []uint32{}}}},
msgToSend: &data{Members: map[string]member{"instance": {Timestamp: now.Unix() + 10, State: LEFT}}},
broadcastMessage: &data{Members: map[string]member{"instance": {Timestamp: now.Unix() + 10, State: LEFT}}},
valueAfterSend: &data{Members: map[string]member{"instance": {Timestamp: now.Unix() + 10, State: LEFT, Tokens: []uint32{}}}},
},
} {
t.Run(tc.name, func(t *testing.T) {
d := getData(t, client, key)
if tc.valueBeforeSend == nil {
require.True(t, d == nil || len(d.Members) == 0)
} else {
require.Equal(t, tc.valueBeforeSend, d, "valueBeforeSend")
}

kv.NotifyMsg(marshalKeyValuePair(t, key, codec, tc.msgToSend))

bs := kv.GetBroadcasts(0, math.MaxInt32)
if tc.broadcastMessage == nil {
require.Equal(t, 0, len(bs), "expected no broadcast message")
} else {
require.Equal(t, 1, len(bs), "expected broadcast message")
require.Equal(t, tc.broadcastMessage, decodeDataFromMarshalledKeyValuePair(t, bs[0], key, codec))
}

d = getData(t, client, key)
if tc.valueAfterSend == nil {
require.True(t, d == nil || len(d.Members) == 0)
} else {
require.Equal(t, tc.valueAfterSend, d, "valueAfterSend")
}
})
}
}

func decodeDataFromMarshalledKeyValuePair(t *testing.T, marshalledKVP []byte, key string, codec dataCodec) *data {
kvp := KeyValuePair{}
require.NoError(t, kvp.Unmarshal(marshalledKVP))
Expand Down

0 comments on commit 090988c

Please sign in to comment.