diff --git a/CHANGELOG.md b/CHANGELOG.md
index 7f50d722e..adcb9417d 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -24,3 +24,4 @@
 * [ENHANCEMENT] flagext: for cases such as `DeprecatedFlag()` that need a logger, add RegisterFlagsWithLogger. #80
 * [BUGFIX] spanlogger: Support multiple tenant IDs. #59
 * [BUGFIX] Memberlist: fixed corrupted packets when sending compound messages with more than 255 messages or messages bigger than 64KB. #85
+* [ENHANCEMENT] ring/util.go: make WaitRingStability more responsive. #96
diff --git a/ring/ring.go b/ring/ring.go
index 63e3a547c..bb2c0390d 100644
--- a/ring/ring.go
+++ b/ring/ring.go
@@ -193,6 +193,9 @@ type Ring struct {
 	// If set to nil, no caching is done (used by tests, and subrings).
 	shuffledSubringCache map[subringCacheKey]*Ring
 
+	// statesWatcher is notified by updateRingState whenever the ring changes.
+	statesWatcher watcher
+
 	memberOwnershipGaugeVec *prometheus.GaugeVec
 	numMembersGaugeVec      *prometheus.GaugeVec
 	totalTokensGauge        prometheus.Gauge
@@ -328,6 +331,13 @@ func (r *Ring) updateRingState(ringDesc *Desc) {
 	}
 
 	rc := prevRing.RingCompare(ringDesc)
+
+	// Wake the registered watcher on any difference, including states- and
+	// timestamps-only ones; the watcher decides whether the change matters.
+	if rc != Equal {
+		r.statesWatcher.notify()
+	}
+
 	if rc == Equal || rc == EqualButStatesAndTimestamps {
 		// No need to update tokens or zones. Only states and timestamps
 		// have changed. (If Equal, nothing has changed, but that doesn't happen
diff --git a/ring/util.go b/ring/util.go
index a836aa2fc..281363c5b 100644
--- a/ring/util.go
+++ b/ring/util.go
@@ -102,27 +102,60 @@ func WaitRingStability(ctx context.Context, r *Ring, op Operation, minStability,
 
 	// Get the initial ring state.
 	ringLastState, _ := r.GetAllHealthy(op) // nolint:errcheck
-	ringLastStateTs := time.Now()
 
-	const pollingFrequency = time.Second
-	pollingTicker := time.NewTicker(pollingFrequency)
-	defer pollingTicker.Stop()
+	// stabilityTimer measures how long the ring has gone without a change:
+	// every observed change restarts it, and it expiring means the ring is stable.
+	stabilityTimer := time.NewTimer(minStability)
+	defer stabilityTimer.Stop()
+
+	// Only one watcher may be registered at a time. If the slot is taken,
+	// stateWatch stays nil (a nil channel is never ready) and changes are
+	// picked up when the timer fires instead.
+	stateWatch, err := r.statesWatcher.Register()
+	if err == nil {
+		// Release only a registration this call owns: UnRegister closes the
+		// channel of whoever currently holds the slot.
+		defer r.statesWatcher.UnRegister()
+	}
 
 	for {
 		select {
 		case <-ctx.Done():
 			return ctx.Err()
-		case <-pollingTicker.C:
+		case <-stabilityTimer.C:
 			// We ignore the error because in case of error it will return an empty
 			// replication set which we use to compare with the previous state.
 			currRingState, _ := r.GetAllHealthy(op) // nolint:errcheck
-
 			if HasReplicationSetChanged(ringLastState, currRingState) {
 				ringLastState = currRingState
-				ringLastStateTs = time.Now()
-			} else if time.Since(ringLastStateTs) >= minStability {
+				// The timer has fired and this case drained its channel, so it
+				// can be re-armed directly.
+				stabilityTimer.Reset(minStability)
+			} else {
 				return nil
 			}
+		case _, ok := <-stateWatch:
+			if !ok {
+				// The channel was closed under us; stop selecting on it.
+				stateWatch = nil
+				continue
+			}
+			// Notifications fire for every ring difference, not only ones that
+			// affect the healthy set for op, so restart the stability window
+			// only when that set actually changed.
+			currRingState, _ := r.GetAllHealthy(op) // nolint:errcheck
+			if !HasReplicationSetChanged(ringLastState, currRingState) {
+				continue
+			}
+			ringLastState = currRingState
+			if !stabilityTimer.Stop() {
+				// The timer already fired; drop the stale tick before re-arming.
+				select {
+				case <-stabilityTimer.C:
+				default:
+				}
+			}
+			stabilityTimer.Reset(minStability)
 		}
 	}
 }
diff --git a/ring/watcher.go b/ring/watcher.go
new file mode 100644
index 000000000..2058d3802
--- /dev/null
+++ b/ring/watcher.go
@@ -0,0 +1,66 @@
+package ring
+
+import (
+	"fmt"
+	"sync"
+)
+
+// Watcher delivers change notifications to at most one registered listener.
+type Watcher interface {
+	// Register claims the single listener slot and returns the channel on which
+	// notifications arrive. It returns an error if the slot is already taken.
+	Register() (<-chan interface{}, error)
+	// UnRegister frees the listener slot and closes the channel Register handed
+	// out. It does nothing when no listener is registered.
+	UnRegister()
+	notify()
+}
+
+// watcher is the Watcher implementation used by Ring. Its zero value is ready
+// to use; all state is guarded by mtx.
+type watcher struct {
+	mtx      sync.Mutex
+	notifier chan interface{} // nil while no listener is registered
+}
+
+// Register implements Watcher.
+func (s *watcher) Register() (<-chan interface{}, error) {
+	s.mtx.Lock()
+	defer s.mtx.Unlock()
+
+	if s.notifier != nil {
+		return nil, fmt.Errorf("only one watcher allowed at a time")
+	}
+	// One slot of buffering lets notify park a signal while the listener is busy
+	// elsewhere; on an unbuffered channel that signal would be silently dropped.
+	s.notifier = make(chan interface{}, 1)
+	return s.notifier, nil
+}
+
+// UnRegister implements Watcher.
+func (s *watcher) UnRegister() {
+	s.mtx.Lock()
+	defer s.mtx.Unlock()
+
+	if s.notifier != nil {
+		close(s.notifier)
+	}
+	s.notifier = nil
+}
+
+// notify signals the registered listener, if any, that the ring state changed.
+// It never blocks: when a signal is already pending, this one is folded into it.
+// The caller of Register is expected to check whether the change is meaningful.
+func (s *watcher) notify() {
+	s.mtx.Lock()
+	defer s.mtx.Unlock()
+
+	if s.notifier == nil {
+		return
+	}
+
+	select {
+	case s.notifier <- 1: // notify
+	default: // a signal is already pending - coalesce
+	}
+}
diff --git a/ring/watcher_test.go b/ring/watcher_test.go
new file mode 100644
index 000000000..1f81021d1
--- /dev/null
+++ b/ring/watcher_test.go
@@ -0,0 +1,120 @@
+package ring
+
+import (
+	"sync"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+)
+
+func TestRegister(t *testing.T) {
+	w := watcher{}
+
+	ch, err := w.Register()
+	defer w.UnRegister()
+
+	assert.IsType(t, make(<-chan interface{}), ch, "should return a channel")
+	assert.NoError(t, err, "should not have error")
+}
+
+func TestDoubleRegister(t *testing.T) {
+	w := watcher{}
+	_, regErr := w.Register()
+	defer w.UnRegister()
+	ch, err := w.Register()
+
+	assert.NoError(t, regErr, "should be able to register")
+	assert.Nil(t, ch, "should not return channel")
+	assert.Error(t, err, "should have an error")
+}
+
+func TestRegisterAfterUnregister(t *testing.T) {
+	w := watcher{}
+	_, regErr := w.Register()
+	w.UnRegister()
+	ch, err := w.Register()
+	defer w.UnRegister()
+
+	assert.NoError(t, regErr, "should be able to register")
+	assert.IsType(t, make(<-chan interface{}), ch, "should return a channel")
+	assert.NoError(t, err, "should not have error")
+}
+
+func TestUnRegisterNone(t *testing.T) {
+	w := watcher{}
+	w.UnRegister() // does not block, crash
+}
+
+func TestNotifyNoRegister(t *testing.T) {
+	w := watcher{}
+	w.notify() // does not block, crash
+}
+
+// TestNotify checks that a notification sent while nobody is receiving is
+// kept for the listener instead of being dropped.
+func TestNotify(t *testing.T) {
+	w := watcher{}
+	ch, regErr := w.Register()
+	defer w.UnRegister()
+	assert.NoError(t, regErr, "should be able to register")
+
+	w.notify()
+
+	select {
+	case _, ok := <-ch:
+		assert.True(t, ok, "channel should still be open")
+	default:
+		t.Fatal("notification sent before the listener was receiving got lost")
+	}
+}
+
+// TestNotifyCoalesces checks that a burst of notifications leaves exactly one
+// pending signal.
+func TestNotifyCoalesces(t *testing.T) {
+	w := watcher{}
+	ch, regErr := w.Register()
+	defer w.UnRegister()
+	assert.NoError(t, regErr, "should be able to register")
+
+	w.notify()
+	w.notify()
+	w.notify()
+
+	pending := 0
+	for drained := false; !drained; {
+		select {
+		case <-ch:
+			pending++
+		default:
+			drained = true
+		}
+	}
+	assert.Equal(t, 1, pending, "a burst should leave exactly one pending notification")
+}
+
+func TestNotifyNeverBlocks(t *testing.T) {
+	w := watcher{}
+	_, regErr := w.Register()
+	defer w.UnRegister()
+
+	var wg sync.WaitGroup
+	wg.Add(1)
+	go func() {
+		w.notify()
+		w.notify()
+		w.notify()
+		wg.Done()
+	}()
+	wg.Wait()
+	assert.NoError(t, regErr, "should be able to register")
+}
+
+func TestUnRegisterStopsNotify(t *testing.T) {
+	w := watcher{}
+	ch, regErr := w.Register()
+	w.UnRegister()
+	_, ok := <-ch
+
+	assert.NoError(t, regErr, "should be able to register")
+	assert.False(t, ok, "Channel should be closed")
+}