Skip to content

Commit

Permalink
ring/util: allow waiting for ring stability with good state transitions
Browse files Browse the repository at this point in the history
In some use cases, such as the store gateway, it is enough for the
user to know that the token distribution didn't change and it is not
important to know if some member changed between allowed states.

This patch implements the new WaitRingTokensStability function to
be able to wait for tokens stability and ignore allowed state changes.

The original motivation was that store gateway tests took a minute longer
then strictly necessary.

Signed-off-by: György Krajcsovits <gyorgy.krajcsovits@grafana.com>
  • Loading branch information
krajorama committed Dec 14, 2021
1 parent 2cdfa2f commit 63e9a9f
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 66 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,4 @@
* [ENHANCEMENT] flagext: for cases such as `DeprecatedFlag()` that need a logger, add RegisterFlagsWithLogger. #80
* [BUGFIX] spanlogger: Support multiple tenant IDs. #59
* [BUGFIX] Memberlist: fixed corrupted packets when sending compound messages with more than 255 messages or messages bigger than 64KB. #85
* [ENHANCEMENT] Add WaitRingTokensStability function to ring, to be able to wait on ring stability excluding allowed state transitions. #95
50 changes: 17 additions & 33 deletions ring/replication_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,35 +130,24 @@ func (r ReplicationSet) GetAddressesWithout(exclude string) []string {
// HasReplicationSetChanged returns true if two replications sets are the same (with possibly different timestamps),
// false if they differ in any way (number of instances, instance states, tokens, zones, ...).
func HasReplicationSetChanged(before, after ReplicationSet) bool {
beforeInstances := before.Instances
afterInstances := after.Instances

if len(beforeInstances) != len(afterInstances) {
return true
}

sort.Sort(ByAddr(beforeInstances))
sort.Sort(ByAddr(afterInstances))

for i := 0; i < len(beforeInstances); i++ {
b := beforeInstances[i]
a := afterInstances[i]

// Exclude the heartbeat timestamp from the comparison.
b.Timestamp = 0
a.Timestamp = 0

if !b.Equal(a) {
return true
}
}

return false
return hasReplicationSetChangedExcluding(before, after, func(i *InstanceDesc) {
i.Timestamp = 0
})
}

// HasReplicationSetChanged returns true if two replications sets are the same (with possibly different timestamps),
// false if they differ in any way (number of instances, instance states, tokens, zones, ...).
// HasReplicationSetChangedWithoutState returns true if two replications sets
// are the same (with possibly different timestamps and instance states),
// false if they differ in any other way (number of instances, tokens, zones, ...).
func HasReplicationSetChangedWithoutState(before, after ReplicationSet) bool {
return hasReplicationSetChangedExcluding(before, after, func(i *InstanceDesc) {
i.Timestamp = 0
i.State = PENDING
})
}

// Do comparison of replicasets, but apply a function first
// to be able to exclude (reset) some values
func hasReplicationSetChangedExcluding(before, after ReplicationSet, exclude func(*InstanceDesc)) bool {
beforeInstances := before.Instances
afterInstances := after.Instances

Expand All @@ -173,13 +162,8 @@ func HasReplicationSetChangedWithoutState(before, after ReplicationSet) bool {
b := beforeInstances[i]
a := afterInstances[i]

// Exclude the heartbeat timestamp from the comparison.
b.Timestamp = 0
a.Timestamp = 0

// Exclude the state
b.State = PENDING
a.State = PENDING
exclude(&a)
exclude(&b)

if !b.Equal(a) {
return true
Expand Down
57 changes: 57 additions & 0 deletions ring/replication_set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,3 +241,60 @@ func TestReplicationSet_Do(t *testing.T) {
})
}
}

func TestHasReplicationSetChangedWithoutStateIgnoresTimeStampAndState(t *testing.T) {
// Only testing difference to underlying Equal function
rs1 := ReplicationSet{
Instances: []InstanceDesc{
{Addr: "127.0.0.1"},
{Addr: "127.0.0.2"},
{Addr: "127.0.0.3"},
},
}
tests := map[string]struct {
rs ReplicationSet
expectHasReplicationSetChanged bool
expectHasReplicationSetChangedWithoutState bool
}{
"timestamp changed": {
ReplicationSet{
Instances: []InstanceDesc{
{Addr: "127.0.0.1", Timestamp: time.Hour.Microseconds()},
{Addr: "127.0.0.2"},
{Addr: "127.0.0.3"},
},
},
false,
false,
},
"state changed": {
ReplicationSet{
Instances: []InstanceDesc{
{Addr: "127.0.0.1", State: PENDING},
{Addr: "127.0.0.2"},
{Addr: "127.0.0.3"},
},
},
true,
false,
},
"more instances": {
ReplicationSet{
Instances: []InstanceDesc{
{Addr: "127.0.0.1"},
{Addr: "127.0.0.2"},
{Addr: "127.0.0.3"},
{Addr: "127.0.0.4"},
},
},
true,
true,
},
}
for testName, testData := range tests {
t.Run(testName, func(t *testing.T) {
assert.Equal(t, testData.expectHasReplicationSetChanged, HasReplicationSetChanged(rs1, testData.rs), "HasReplicationSetChanged wrong result")
assert.Equal(t, testData.expectHasReplicationSetChangedWithoutState, HasReplicationSetChangedWithoutState(rs1, testData.rs), "HasReplicationSetChangedWithoutState wrong result")
})
}
}
44 changes: 11 additions & 33 deletions ring/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,41 +96,19 @@ func WaitInstanceState(ctx context.Context, r ReadRing, instanceID string, state
// WaitRingStability monitors the ring topology for the provided operation and waits until it
// keeps stable for at least minStability.
func WaitRingStability(ctx context.Context, r *Ring, op Operation, minStability, maxWaiting time.Duration) error {
// Configure the max waiting time as a context deadline.
ctx, cancel := context.WithTimeout(ctx, maxWaiting)
defer cancel()

// Get the initial ring state.
ringLastState, _ := r.GetAllHealthy(op) // nolint:errcheck
ringLastStateTs := time.Now()

const pollingFrequency = time.Second
pollingTicker := time.NewTicker(pollingFrequency)
defer pollingTicker.Stop()

for {
select {
case <-ctx.Done():
return ctx.Err()
case <-pollingTicker.C:
// We ignore the error because in case of error it will return an empty
// replication set which we use to compare with the previous state.
currRingState, _ := r.GetAllHealthy(op) // nolint:errcheck

if HasReplicationSetChanged(ringLastState, currRingState) {
ringLastState = currRingState
ringLastStateTs = time.Now()
} else if time.Since(ringLastStateTs) >= minStability {
return nil
}
}
}
return waitStability(ctx, r, op, minStability, maxWaiting, HasReplicationSetChanged)
}

// WaitRingTokensStability waits for the Tokens in the Ring to be unchanged at
// least for minStability time period. This can be used to avoid wasting
// resources on moving data around due to multiple changes in the Ring.
// WaitRingTokensStability waits for the Ring to be unchanged at
// least for minStability time period, excluding transitioninig between
// allowed states (e.g. JOINING->ACTIVE if allowed by op).
// This can be used to avoid wasting resources on moving data around
// due to multiple changes in the Ring.
func WaitRingTokensStability(ctx context.Context, r *Ring, op Operation, minStability, maxWaiting time.Duration) error {
return waitStability(ctx, r, op, minStability, maxWaiting, HasReplicationSetChangedWithoutState)
}

func waitStability(ctx context.Context, r *Ring, op Operation, minStability, maxWaiting time.Duration, isChanged func(ReplicationSet, ReplicationSet) bool) error {
// Configure the max waiting time as a context deadline.
ctx, cancel := context.WithTimeout(ctx, maxWaiting)
defer cancel()
Expand All @@ -152,7 +130,7 @@ func WaitRingTokensStability(ctx context.Context, r *Ring, op Operation, minStab
// replication set which we use to compare with the previous state.
currRingState, _ := r.GetAllHealthy(op) // nolint:errcheck

if HasReplicationSetChangedWithoutState(ringLastState, currRingState) {
if isChanged(ringLastState, currRingState) {
ringLastState = currRingState
ringLastStateTs = time.Now()
} else if time.Since(ringLastStateTs) >= minStability {
Expand Down

0 comments on commit 63e9a9f

Please sign in to comment.