-
Notifications
You must be signed in to change notification settings - Fork 68
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
ring/util.go: make WaitRingStability more responsive
The current method polls the ring member states at every second. However given: 1) large number of members 2) member state changing after stability reached from one valid state to another e.g. JOINING->ACTIVE. Some members will change their state after stability, earlier then others. Since polling takes 1 second, the remaining members may notice this quite late, thus starting a new 2 second minStability interval. In testing this comes up frequently, so for 20 members , reaching stability for all members can be over 30 seconds. This patch changes the logic so that instead of polling, the algorithm notifies WaitRingStability as soon as possible about changes, thus no time is wasted by waiting when the stability is _already_ lost. For 20 members it reduced the time from 30s to ~7s to reach stability. It is possible to achieve the same result by reducing the polling period to 10ms - at least in the 20 member case, but being notified seems less arbitrary. Signed-off-by: György Krajcsovits <gyorgy.krajcsovits@grafana.com>
- Loading branch information
Showing
5 changed files
with
176 additions
and
10 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,54 @@ | ||
package ring | ||
|
||
import ( | ||
"fmt" | ||
"sync" | ||
) | ||
|
||
type Watcher interface { | ||
Register() (<-chan interface{}, error) | ||
UnRegister() | ||
notify() | ||
} | ||
|
||
type watcher struct { | ||
mtx sync.Mutex | ||
notifier chan interface{} | ||
} | ||
|
||
func (s *watcher) Register() (<-chan interface{}, error) { | ||
s.mtx.Lock() | ||
defer s.mtx.Unlock() | ||
|
||
if s.notifier != nil { | ||
return nil, fmt.Errorf("only one watcher allowed at a time") | ||
} | ||
s.notifier = make(chan interface{}) | ||
return s.notifier, nil | ||
} | ||
|
||
func (s *watcher) UnRegister() { | ||
s.mtx.Lock() | ||
defer s.mtx.Unlock() | ||
if s.notifier != nil { | ||
close(s.notifier) | ||
} | ||
s.notifier = nil | ||
} | ||
|
||
// Handles notifying listeners about state change in the Ring | ||
// Assumes that the callee will check that there is meaningfull change in the states | ||
func (s *watcher) notify() { | ||
s.mtx.Lock() | ||
defer s.mtx.Unlock() | ||
if s.notifier == nil { | ||
return | ||
} | ||
|
||
select { | ||
case s.notifier <- 1: // notify | ||
return | ||
default: // already notified - throttle | ||
return | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,103 @@ | ||
package ring | ||
|
||
import ( | ||
"github.com/stretchr/testify/assert" | ||
"sync" | ||
"testing" | ||
) | ||
|
||
func TestRegister(t *testing.T) { | ||
w := watcher{} | ||
|
||
ch, err := w.Register() | ||
defer w.UnRegister() | ||
|
||
assert.IsType(t, make(<-chan interface{}), ch, "should return a channel") | ||
assert.NoError(t, err, "should not have error") | ||
} | ||
|
||
func TestDoubleRegister(t *testing.T) { | ||
w := watcher{} | ||
_, regErr := w.Register() | ||
defer w.UnRegister() | ||
ch, err := w.Register() | ||
|
||
assert.NoError(t, regErr, "should be able to register") | ||
assert.Nil(t, ch, "should not return channel") | ||
assert.Error(t, err, "should have an error") | ||
} | ||
|
||
func TestRegisterAfterUnregister(t *testing.T) { | ||
w := watcher{} | ||
_, regErr := w.Register() | ||
w.UnRegister() | ||
ch, err := w.Register() | ||
defer w.UnRegister() | ||
|
||
assert.NoError(t, regErr, "should be able to register") | ||
assert.IsType(t, make(<-chan interface{}), ch, "should return a channel") | ||
assert.NoError(t, err, "should not have error") | ||
} | ||
|
||
func TestUnRegisterNone(t *testing.T) { | ||
w := watcher{} | ||
w.UnRegister() // does not block, crash | ||
} | ||
|
||
func TestNotifyNoRegister(t *testing.T) { | ||
w := watcher{} | ||
w.notify() // does not block, crash | ||
} | ||
|
||
func TestNotify(t *testing.T) { | ||
w := watcher{} | ||
ch, regErr := w.Register() | ||
defer w.UnRegister() | ||
|
||
received := make(chan interface{}) | ||
var wg sync.WaitGroup | ||
wg.Add(1) | ||
go func() { | ||
<-ch | ||
received <- 1 | ||
wg.Done() | ||
}() | ||
loop: | ||
for { | ||
select { | ||
case <-received: | ||
break loop | ||
default: | ||
w.notify() | ||
} | ||
} | ||
wg.Wait() | ||
assert.NoError(t, regErr, "should be able to register") | ||
} | ||
|
||
func TestNotifyNeverBlocks(t *testing.T) { | ||
w := watcher{} | ||
_, regErr := w.Register() | ||
defer w.UnRegister() | ||
|
||
var wg sync.WaitGroup | ||
wg.Add(1) | ||
go func() { | ||
w.notify() | ||
w.notify() | ||
w.notify() | ||
wg.Done() | ||
}() | ||
wg.Wait() | ||
assert.NoError(t, regErr, "should be able to register") | ||
} | ||
|
||
func TestUnRegisterStopsNotify(t *testing.T) { | ||
w := watcher{} | ||
ch, regErr := w.Register() | ||
w.UnRegister() | ||
_, ok := <-ch | ||
|
||
assert.NoError(t, regErr, "should be able to register") | ||
assert.False(t, ok, "Channel should be closed") | ||
} |