From c5359575e51eee3a9d534dc38a3d87cf841c13cd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gy=C3=B6rgy=20Krajcsovits?= Date: Mon, 27 Dec 2021 16:37:10 +0100 Subject: [PATCH] ring/ring.go: reactive update of metrics MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When writing e2e tests for services using Ring, it is natural to wait until all tokens have been assigned. With the default 10s interval for updating metrics, this slows down tests considerably. This was not the case before PR 50, when metrics were collected on request. This commit updates the metrics whenever there is a change in the ring, i.e. when KV.Watch() is triggered. To avoid holding up the watch and to coalesce high-frequency changes, the metrics update is decoupled from the watch via a channel. A further optimization is to not hold the lock while calculating the metrics: only the Ring.ringDesc pointer is copied under the lock, and the state it points to is never modified, only replaced. Signed-off-by: György Krajcsovits --- ring/ring.go | 64 ++++++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 55 insertions(+), 9 deletions(-) diff --git a/ring/ring.go b/ring/ring.go index b9a11b83a..d8938de65 100644 --- a/ring/ring.go +++ b/ring/ring.go @@ -286,6 +286,8 @@ func (r *Ring) starting(ctx context.Context) error { func (r *Ring) loop(ctx context.Context) error { // Update the ring metrics at start of the main loop. r.updateRingMetrics() + + metricChan := make(chan struct{}, 1) go func() { // Start metrics update ticker to update the ring metrics. 
ticker := time.NewTicker(10 * time.Second) @@ -293,6 +295,8 @@ func (r *Ring) loop(ctx context.Context) error { for { select { + case <-metricChan: + r.updateRingMetrics() case <-ticker.C: r.updateRingMetrics() case <-ctx.Done(): @@ -307,13 +311,20 @@ func (r *Ring) loop(ctx context.Context) error { return true } - r.updateRingState(value.(*Desc)) + rc := r.updateRingState(value.(*Desc)) + if rc != Equal { + select { + case metricChan <- struct{}{}: + default: + } + } + return true }) return nil } -func (r *Ring) updateRingState(ringDesc *Desc) { +func (r *Ring) updateRingState(ringDesc *Desc) CompareResult { r.mtx.RLock() prevRing := r.ringDesc r.mtx.RUnlock() @@ -335,7 +346,7 @@ func (r *Ring) updateRingState(ringDesc *Desc) { r.mtx.Lock() r.ringDesc = ringDesc r.mtx.Unlock() - return + return rc } now := time.Now() @@ -356,6 +367,7 @@ func (r *Ring) updateRingState(ringDesc *Desc) { // Invalidate all cached subrings. r.shuffledSubringCache = make(map[subringCacheKey]*Ring) } + return rc } // Get returns n (or more) instances which form the replicas for the given key. @@ -564,12 +576,46 @@ func (r *Ring) countTokens() (map[string]uint32, map[string]uint32) { return numTokens, owned } -// updateRingMetrics updates ring metrics. +// countTokens returns the number of tokens and tokens within the range for each instance. +func countTokensUtil(ringDesc *Desc) (map[string]uint32, map[string]uint32) { + ringTokens := ringDesc.GetTokens() + ringInstanceByToken := ringDesc.getTokensInfo() + owned := map[string]uint32{} + numTokens := map[string]uint32{} + for i, token := range ringTokens { + var diff uint32 + + // Compute how many tokens are within the range. 
+ if i+1 == len(ringTokens) { + diff = (math.MaxUint32 - token) + ringTokens[0] + } else { + diff = ringTokens[i+1] - token + } + + info := ringInstanceByToken[token] + numTokens[info.InstanceID] = numTokens[info.InstanceID] + 1 + owned[info.InstanceID] = owned[info.InstanceID] + diff + } + + // Set to 0 the number of owned tokens by instances which don't have tokens yet. + for id := range ringDesc.Ingesters { + if _, ok := owned[id]; !ok { + owned[id] = 0 + numTokens[id] = 0 + } + } + + return numTokens, owned +} + func (r *Ring) updateRingMetrics() { - r.mtx.RLock() - defer r.mtx.RUnlock() + r.mtx.Lock() + // There is no copy here as the ringDesc object is not modified over time, + // just replaced. + ringDesc := r.ringDesc + r.mtx.Unlock() - numTokens, ownedRange := r.countTokens() + numTokens, ownedRange := countTokensUtil(ringDesc) for id, totalOwned := range ownedRange { r.memberOwnershipGaugeVec.WithLabelValues(id).Set(float64(totalOwned) / float64(math.MaxUint32)) r.numTokensGaugeVec.WithLabelValues(id).Set(float64(numTokens[id])) @@ -578,13 +624,13 @@ func (r *Ring) updateRingMetrics() { numByState := map[string]int{} oldestTimestampByState := map[string]int64{} - // Initialised to zero so we emit zero-metrics (instead of not emitting anything) + // Initialized to zero so we emit zero-metrics (instead of not emitting anything) for _, s := range []string{unhealthy, ACTIVE.String(), LEAVING.String(), PENDING.String(), JOINING.String()} { numByState[s] = 0 oldestTimestampByState[s] = 0 } - for _, instance := range r.ringDesc.Ingesters { + for _, instance := range ringDesc.Ingesters { s := instance.State.String() if !r.IsHealthy(&instance, Reporting, time.Now()) { s = unhealthy