diff --git a/CHANGELOG.md b/CHANGELOG.md index f52ab2ec52..aabd99c941 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,6 +21,7 @@ * [CHANGE] Some files and directories created by Mimir components on local disk now have stricter permissions, and are only readable by owner, but not group or others. #4394 * [CHANGE] Compactor: compactor will no longer try to compact blocks that are already marked for deletion. Previously compactor would consider blocks marked for deletion within `-compactor.deletion-delay / 2` period as eligible for compaction. #4328 * [CHANGE] Memberlist: forward only changes, not entire original message. #4419 +* [CHANGE] Memberlist: don't accept old tombstones as incoming change, and don't forward such messages to other gossip members. #4420 * [ENHANCEMENT] Add timeout for waiting on compactor to become ACTIVE in the ring. #4262 * [ENHANCEMENT] Reduce memory used by streaming queries, particularly in ruler. #4341 * [ENHANCEMENT] Ring: allow experimental configuration of disabling of heartbeat timeouts by setting the relevant configuration value to zero. Applies to the following: #4342 diff --git a/pkg/ring/kv/memberlist/memberlist_client.go b/pkg/ring/kv/memberlist/memberlist_client.go index 0121509887..01b3b88fad 100644 --- a/pkg/ring/kv/memberlist/memberlist_client.go +++ b/pkg/ring/kv/memberlist/memberlist_client.go @@ -1201,6 +1201,17 @@ func (m *KV) mergeValueForKey(key string, incomingValue Mergeable, casVersion ui total, removed := result.RemoveTombstones(limit) m.storeTombstones.WithLabelValues(key).Set(float64(total)) m.storeRemovedTombstones.WithLabelValues(key).Add(float64(removed)) + + // Remove tombstones from change too. If change turns out to be empty after this, + // we don't need to change local value either! + // + // Note that "result" and "change" may actually be the same Mergeable. That is why we + // call RemoveTombstones on "result" first, so that we get the correct metrics. Calling + // RemoveTombstones twice with same limit should be noop. + change.RemoveTombstones(limit) + if len(change.MergeContent()) == 0 { + return nil, 0, nil + } } newVersion := curr.version + 1 diff --git a/pkg/ring/kv/memberlist/memberlist_client_test.go b/pkg/ring/kv/memberlist/memberlist_client_test.go index 150db4896d..25630cded3 100644 --- a/pkg/ring/kv/memberlist/memberlist_client_test.go +++ b/pkg/ring/kv/memberlist/memberlist_client_test.go @@ -84,8 +84,19 @@ func (d *data) MergeContent() []string { return out } -func (d *data) RemoveTombstones(limit time.Time) (_, _ int) { - // nothing to do +// This method deliberately ignores zero limit, so that tests can observe LEFT state as well. +func (d *data) RemoveTombstones(limit time.Time) (total, removed int) { + for n, m := range d.Members { + if m.State == LEFT { + if time.Unix(m.Timestamp, 0).Before(limit) { + // remove it + delete(d.Members, n) + removed++ + } else { + total++ + } + } + } return } @@ -1137,6 +1148,95 @@ func TestNotifyMsgResendsOnlyChanges(t *testing.T) { }}, d) } +func TestSendingOldTombstoneShouldNotForwardMessage(t *testing.T) { + codec := dataCodec{} + + cfg := KVConfig{} + // We will be checking for number of messages in the broadcast queue, so make sure to use known retransmit factor. + cfg.RetransmitMult = 1 + cfg.LeftIngestersTimeout = 5 * time.Minute + cfg.Codecs = append(cfg.Codecs, codec) + + kv := NewKV(cfg, log.NewNopLogger()) + require.NoError(t, services.StartAndAwaitRunning(context.Background(), kv)) + defer services.StopAndAwaitTerminated(context.Background(), kv) //nolint:errcheck + + client, err := NewClient(kv, codec) + require.NoError(t, err) + + now := time.Now() + + // No broadcast messages from KV at the beginning. + require.Equal(t, 0, len(kv.GetBroadcasts(0, math.MaxInt32))) + + for _, tc := range []struct { + name string + valueBeforeSend *data // value in KV store before sending messsage + msgToSend *data + valueAfterSend *data // value in KV store after sending message + broadcastMessage *data // broadcasted change, if not nil + }{ + // These tests follow each other (end state of KV in state is starting point in the next state). + { + name: "old tombstone, empty KV", + valueBeforeSend: nil, + msgToSend: &data{Members: map[string]member{"instance": {Timestamp: now.Unix() - int64(2*cfg.LeftIngestersTimeout.Seconds()), State: LEFT}}}, + valueAfterSend: nil, // no change to KV + broadcastMessage: nil, // no new message + }, + + { + name: "recent tombstone, empty KV", + valueBeforeSend: nil, + msgToSend: &data{Members: map[string]member{"instance": {Timestamp: now.Unix(), State: LEFT}}}, + broadcastMessage: &data{Members: map[string]member{"instance": {Timestamp: now.Unix(), State: LEFT}}}, + valueAfterSend: &data{Members: map[string]member{"instance": {Timestamp: now.Unix(), State: LEFT, Tokens: []uint32{}}}}, + }, + + { + name: "old tombstone, KV contains tombstone already", + valueBeforeSend: &data{Members: map[string]member{"instance": {Timestamp: now.Unix(), State: LEFT, Tokens: []uint32{}}}}, + msgToSend: &data{Members: map[string]member{"instance": {Timestamp: now.Unix() - 10, State: LEFT}}}, + broadcastMessage: nil, + valueAfterSend: &data{Members: map[string]member{"instance": {Timestamp: now.Unix(), State: LEFT, Tokens: []uint32{}}}}, + }, + + { + name: "fresh tombstone, KV contains tombstone already", + valueBeforeSend: &data{Members: map[string]member{"instance": {Timestamp: now.Unix(), State: LEFT, Tokens: []uint32{}}}}, + msgToSend: &data{Members: map[string]member{"instance": {Timestamp: now.Unix() + 10, State: LEFT}}}, + broadcastMessage: &data{Members: map[string]member{"instance": {Timestamp: now.Unix() + 10, State: LEFT}}}, + valueAfterSend: &data{Members: map[string]member{"instance": {Timestamp: now.Unix() + 10, State: LEFT, Tokens: []uint32{}}}}, + }, + } { + t.Run(tc.name, func(t *testing.T) { + d := getData(t, client, key) + if tc.valueBeforeSend == nil { + require.True(t, d == nil || len(d.Members) == 0) + } else { + require.Equal(t, tc.valueBeforeSend, d, "valueBeforeSend") + } + + kv.NotifyMsg(marshalKeyValuePair(t, key, codec, tc.msgToSend)) + + bs := kv.GetBroadcasts(0, math.MaxInt32) + if tc.broadcastMessage == nil { + require.Equal(t, 0, len(bs), "expected no broadcast message") + } else { + require.Equal(t, 1, len(bs), "expected broadcast message") + require.Equal(t, tc.broadcastMessage, decodeDataFromMarshalledKeyValuePair(t, bs[0], key, codec)) + } + + d = getData(t, client, key) + if tc.valueAfterSend == nil { + require.True(t, d == nil || len(d.Members) == 0) + } else { + require.Equal(t, tc.valueAfterSend, d, "valueAfterSend") + } + }) + } +} + func decodeDataFromMarshalledKeyValuePair(t *testing.T, marshalledKVP []byte, key string, codec dataCodec) *data { kvp := KeyValuePair{} require.NoError(t, kvp.Unmarshal(marshalledKVP))