Merge pull request #50 from treid314/remove-ring-global-metrics
Remove ring global metrics
Tyler Reid authored Oct 7, 2021
2 parents f3d1d9e + 232a3da commit 4c56601
Showing 3 changed files with 108 additions and 110 deletions.
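
Note on the API change: Ring and Lifecycler metrics now register against a caller-supplied prometheus.Registerer instead of the process-global default registry, with the ring name baked in as a "name" const label at construction time. That allows several rings in one binary, or in one test, without duplicate-registration panics. Below is a minimal sketch of the new wiring, assuming the ring.New signature visible in the hunk headers further down; the ring name, KV key, and config population are illustrative, not part of this commit.

```go
package main

import (
	"github.com/go-kit/log"
	"github.com/prometheus/client_golang/prometheus"

	"github.com/grafana/dskit/ring"
)

func main() {
	// Caller-owned registry; nothing lands on prometheus.DefaultRegisterer
	// anymore, so two rings (or two tests) no longer collide.
	reg := prometheus.NewRegistry()

	var cfg ring.Config
	// ... populate cfg (KV store, heartbeat timeout, replication factor) ...

	// New threads reg through to NewWithStoreClientAndStrategy; every ring
	// series is emitted with a name="ingester" const label.
	r, err := ring.New(cfg, "ingester", "ring", log.NewNopLogger(), reg)
	if err != nil {
		panic(err)
	}
	_ = r // start it via its dskit service before use
}
```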
39 changes: 10 additions & 29 deletions ring/lifecycler.go
@@ -14,7 +14,6 @@ import (
"github.com/pkg/errors"
perrors "github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"go.uber.org/atomic"

"github.com/grafana/dskit/flagext"
@@ -23,26 +22,6 @@ import (
dstime "github.com/grafana/dskit/time"
)

var (
consulHeartbeats = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "member_consul_heartbeats_total",
Help: "The total number of heartbeats sent to consul.",
}, []string{"name"})
tokensOwned = promauto.NewGaugeVec(prometheus.GaugeOpts{
Name: "member_ring_tokens_owned",
Help: "The number of tokens owned in the ring.",
}, []string{"name"})
tokensToOwn = promauto.NewGaugeVec(prometheus.GaugeOpts{
Name: "member_ring_tokens_to_own",
Help: "The number of tokens to own in the ring.",
}, []string{"name"})
shutdownDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
Name: "shutdown_duration_seconds",
Help: "Duration (in seconds) of shutdown procedure (ie transfer or flush).",
Buckets: prometheus.ExponentialBuckets(10, 2, 8), // Biggest bucket is 10*2^(8-1) = 1280, or ~21 mins.
}, []string{"op", "status", "name"})
)

// LifecyclerConfig is the config to build a Lifecycler.
type LifecyclerConfig struct {
RingConfig Config `yaml:"ring"`
@@ -145,7 +124,8 @@ type Lifecycler struct {
healthyInstancesCount int
zonesCount int

logger log.Logger
lifecyclerMetrics *LifecyclerMetrics
logger log.Logger
}

// NewLifecycler creates new Lifecycler. It must be started via StartAsync.
@@ -191,10 +171,11 @@ func NewLifecycler(cfg LifecyclerConfig, flushTransferer FlushTransferer, ringNa
Zone: zone,
actorChan: make(chan func()),
state: PENDING,
lifecyclerMetrics: NewLifecyclerMetrics(ringName, reg),
logger: logger,
}

tokensToOwn.WithLabelValues(l.RingName).Set(float64(cfg.NumTokens))
l.lifecyclerMetrics.tokensToOwn.Set(float64(cfg.NumTokens))

l.BasicService = services.
NewBasicService(nil, l.loop, l.stopping).
@@ -322,7 +303,7 @@ func (i *Lifecycler) getTokens() Tokens {
}

func (i *Lifecycler) setTokens(tokens Tokens) {
tokensOwned.WithLabelValues(i.RingName).Set(float64(len(tokens)))
i.lifecyclerMetrics.tokensOwned.Set(float64(len(tokens)))

i.stateMtx.Lock()
defer i.stateMtx.Unlock()
@@ -473,7 +454,7 @@ func (i *Lifecycler) loop(ctx context.Context) error {
}

case <-heartbeatTickerChan:
consulHeartbeats.WithLabelValues(i.RingName).Inc()
i.lifecyclerMetrics.consulHeartbeats.Inc()
if err := i.updateConsul(context.Background()); err != nil {
level.Error(i.logger).Log("msg", "failed to write to the KV store, sleeping", "ring", i.RingName, "err", err)
}
@@ -520,7 +501,7 @@ heartbeatLoop:
for {
select {
case <-heartbeatTickerChan:
consulHeartbeats.WithLabelValues(i.RingName).Inc()
i.lifecyclerMetrics.consulHeartbeats.Inc()
if err := i.updateConsul(context.Background()); err != nil {
level.Error(i.logger).Log("msg", "failed to write to the KV store, sleeping", "ring", i.RingName, "err", err)
}
@@ -851,17 +832,17 @@ func (i *Lifecycler) processShutdown(ctx context.Context) {
level.Info(i.logger).Log("msg", "transfers are disabled")
} else {
level.Error(i.logger).Log("msg", "failed to transfer chunks to another instance", "ring", i.RingName, "err", err)
shutdownDuration.WithLabelValues("transfer", "fail", i.RingName).Observe(time.Since(transferStart).Seconds())
i.lifecyclerMetrics.shutdownDuration.WithLabelValues("transfer", "fail").Observe(time.Since(transferStart).Seconds())
}
} else {
flushRequired = false
shutdownDuration.WithLabelValues("transfer", "success", i.RingName).Observe(time.Since(transferStart).Seconds())
i.lifecyclerMetrics.shutdownDuration.WithLabelValues("transfer", "success").Observe(time.Since(transferStart).Seconds())
}

if flushRequired {
flushStart := time.Now()
i.flushTransferer.Flush()
shutdownDuration.WithLabelValues("flush", "success", i.RingName).Observe(time.Since(flushStart).Seconds())
i.lifecyclerMetrics.shutdownDuration.WithLabelValues("flush", "success").Observe(time.Since(flushStart).Seconds())
}

// Sleep so the shutdownDuration metric can be collected.
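
The lifecycler call-site pattern, before and after (both lines are taken from the diff above; secs stands in for the time.Since(...).Seconds() expression):

```go
// Before: the ring name was a variable label on a package-global vec.
shutdownDuration.WithLabelValues("transfer", "fail", i.RingName).Observe(secs)

// After: the name is a ConstLabel baked in by NewLifecyclerMetrics, so call
// sites pass only op and status. The exposed series are unchanged, e.g.
// shutdown_duration_seconds_bucket{le="10", name="ingester", op="transfer", status="fail"}
i.lifecyclerMetrics.shutdownDuration.WithLabelValues("transfer", "fail").Observe(secs)
```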
40 changes: 40 additions & 0 deletions ring/lifecycler_metrics.go
@@ -0,0 +1,40 @@
package ring

import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)

type LifecyclerMetrics struct {
consulHeartbeats prometheus.Counter
tokensOwned prometheus.Gauge
tokensToOwn prometheus.Gauge
shutdownDuration *prometheus.HistogramVec
}

func NewLifecyclerMetrics(ringName string, reg prometheus.Registerer) *LifecyclerMetrics {
return &LifecyclerMetrics{
consulHeartbeats: promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "member_consul_heartbeats_total",
Help: "The total number of heartbeats sent to consul.",
ConstLabels: prometheus.Labels{"name": ringName},
}),
tokensOwned: promauto.With(reg).NewGauge(prometheus.GaugeOpts{
Name: "member_ring_tokens_owned",
Help: "The number of tokens owned in the ring.",
ConstLabels: prometheus.Labels{"name": ringName},
}),
tokensToOwn: promauto.With(reg).NewGauge(prometheus.GaugeOpts{
Name: "member_ring_tokens_to_own",
Help: "The number of tokens to own in the ring.",
ConstLabels: prometheus.Labels{"name": ringName},
}),
shutdownDuration: promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
Name: "shutdown_duration_seconds",
Help: "Duration (in seconds) of shutdown procedure (ie transfer or flush).",
Buckets: prometheus.ExponentialBuckets(10, 2, 8), // Biggest bucket is 10*2^(8-1) = 1280, or ~21 mins.
ConstLabels: prometheus.Labels{"name": ringName},
}, []string{"op", "status"}),
}

}
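
A hedged usage sketch of the new constructor, written as a hypothetical test that is not part of this commit. The struct fields are unexported, so this only compiles inside package ring:

```go
package ring

import (
	"testing"

	"github.com/prometheus/client_golang/prometheus"
)

func TestLifecyclerMetricsSketch(t *testing.T) {
	// One isolated registry per test; the old package-level promauto vars
	// forced every lifecycler in a process onto the same global vectors.
	reg := prometheus.NewRegistry()
	m := NewLifecyclerMetrics("ingester", reg)

	m.tokensToOwn.Set(128)   // what NewLifecycler does with cfg.NumTokens
	m.consulHeartbeats.Inc() // one increment per heartbeat tick

	// Every family carries the ring name as a const label, e.g.
	// member_consul_heartbeats_total{name="ingester"} 1
	families, err := reg.Gather()
	if err != nil || len(families) == 0 {
		t.Fatal("expected gathered metric families")
	}
}
```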
139 changes: 58 additions & 81 deletions ring/ring.go
@@ -15,6 +15,7 @@ import (
"github.com/go-kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

"github.com/grafana/dskit/kv"
shardUtil "github.com/grafana/dskit/ring/shard"
@@ -47,7 +48,6 @@ const (

// ReadRing represents the read interface to the ring.
type ReadRing interface {
prometheus.Collector

// Get returns n (or more) instances which form the replicas for the given key.
// bufDescs, bufHosts and bufZones are slices to be overwritten for the return value
@@ -193,11 +193,12 @@ type Ring struct {
// If set to nil, no caching is done (used by tests, and subrings).
shuffledSubringCache map[subringCacheKey]*Ring

memberOwnershipDesc *prometheus.Desc
numMembersDesc *prometheus.Desc
totalTokensDesc *prometheus.Desc
numTokensDesc *prometheus.Desc
oldestTimestampDesc *prometheus.Desc
memberOwnershipGaugeVec *prometheus.GaugeVec
numMembersGaugeVec *prometheus.GaugeVec
totalTokensGauge prometheus.Gauge
numTokensGaugeVec *prometheus.GaugeVec
oldestTimestampGaugeVec *prometheus.GaugeVec
metricsUpdateTicker *time.Ticker

logger log.Logger
}
@@ -221,10 +222,10 @@ func New(cfg Config, name, key string, logger log.Logger, reg prometheus.Registe
return nil, err
}

return NewWithStoreClientAndStrategy(cfg, name, key, store, NewDefaultReplicationStrategy(), logger)
return NewWithStoreClientAndStrategy(cfg, name, key, store, NewDefaultReplicationStrategy(), reg, logger)
}

func NewWithStoreClientAndStrategy(cfg Config, name, key string, store kv.Client, strategy ReplicationStrategy, logger log.Logger) (*Ring, error) {
func NewWithStoreClientAndStrategy(cfg Config, name, key string, store kv.Client, strategy ReplicationStrategy, reg prometheus.Registerer, logger log.Logger) (*Ring, error) {
if cfg.ReplicationFactor <= 0 {
return nil, fmt.Errorf("ReplicationFactor must be greater than zero: %d", cfg.ReplicationFactor)
}
@@ -236,40 +237,34 @@ func NewWithStoreClientAndStrategy(cfg Config, name, key string, store kv.Client
strategy: strategy,
ringDesc: &Desc{},
shuffledSubringCache: map[subringCacheKey]*Ring{},
memberOwnershipDesc: prometheus.NewDesc(
"ring_member_ownership_percent",
"The percent ownership of the ring by member",
[]string{"member"},
map[string]string{"name": name},
),
numMembersDesc: prometheus.NewDesc(
"ring_members",
"Number of members in the ring",
[]string{"state"},
map[string]string{"name": name},
),
totalTokensDesc: prometheus.NewDesc(
"ring_tokens_total",
"Number of tokens in the ring",
nil,
map[string]string{"name": name},
),
numTokensDesc: prometheus.NewDesc(
"ring_tokens_owned",
"The number of tokens in the ring owned by the member",
[]string{"member"},
map[string]string{"name": name},
),
oldestTimestampDesc: prometheus.NewDesc(
"ring_oldest_member_timestamp",
"Timestamp of the oldest member in the ring.",
[]string{"state"},
map[string]string{"name": name},
),
memberOwnershipGaugeVec: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
Name: "ring_member_ownership_percent",
Help: "The percent ownership of the ring by member",
ConstLabels: map[string]string{"name": name}},
[]string{"member"}),
numMembersGaugeVec: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
Name: "ring_members",
Help: "Number of members in the ring",
ConstLabels: map[string]string{"name": name}},
[]string{"state"}),
totalTokensGauge: promauto.With(reg).NewGauge(prometheus.GaugeOpts{
Name: "ring_tokens_total",
Help: "Number of tokens in the ring",
ConstLabels: map[string]string{"name": name}}),
numTokensGaugeVec: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
Name: "ring_tokens_owned",
Help: "The number of tokens in the ring owned by the member",
ConstLabels: map[string]string{"name": name}},
[]string{"member"}),
oldestTimestampGaugeVec: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
Name: "ring_oldest_member_timestamp",
Help: "Timestamp of the oldest member in the ring.",
ConstLabels: map[string]string{"name": name}},
[]string{"state"}),
logger: logger,
}

r.Service = services.NewBasicService(r.starting, r.loop, nil).WithName(fmt.Sprintf("%s ring client", name))
r.Service = services.NewBasicService(r.starting, r.loop, r.stopping).WithName(fmt.Sprintf("%s ring client", name))
return r, nil
}

@@ -287,6 +282,14 @@ func (r *Ring) starting(ctx context.Context) error {
}

r.updateRingState(value.(*Desc))

// Start metrics update ticker, and give it a function to update the ring metrics.
r.metricsUpdateTicker = time.NewTicker(10 * time.Second)
go func() {
for range r.metricsUpdateTicker.C {
r.updateRingMetrics()
}
}()
return nil
}

@@ -303,6 +306,14 @@ func (r *Ring) loop(ctx context.Context) error {
return nil
}

func (r *Ring) stopping(_ error) error {
// Stop Metrics ticker.
if r.metricsUpdateTicker != nil {
r.metricsUpdateTicker.Stop()
}
return nil
}

func (r *Ring) updateRingState(ringDesc *Desc) {
r.mtx.RLock()
prevRing := r.ringDesc
@@ -523,15 +534,6 @@ func (r *Ring) GetReplicationSetForOperation(op Operation) (ReplicationSet, erro
}, nil
}

// Describe implements prometheus.Collector.
func (r *Ring) Describe(ch chan<- *prometheus.Desc) {
ch <- r.memberOwnershipDesc
ch <- r.numMembersDesc
ch <- r.totalTokensDesc
ch <- r.oldestTimestampDesc
ch <- r.numTokensDesc
}

// countTokens returns the number of tokens and tokens within the range for each instance.
// The ring read lock must be already taken when calling this function.
func (r *Ring) countTokens() (map[string]uint32, map[string]uint32) {
@@ -563,25 +565,15 @@ func (r *Ring) countTokens() (map[string]uint32, map[string]uint32) {
return numTokens, owned
}

// Collect implements prometheus.Collector.
func (r *Ring) Collect(ch chan<- prometheus.Metric) {
// updateRingMetrics updates ring metrics.
func (r *Ring) updateRingMetrics() {
r.mtx.RLock()
defer r.mtx.RUnlock()

numTokens, ownedRange := r.countTokens()
for id, totalOwned := range ownedRange {
ch <- prometheus.MustNewConstMetric(
r.memberOwnershipDesc,
prometheus.GaugeValue,
float64(totalOwned)/float64(math.MaxUint32),
id,
)
ch <- prometheus.MustNewConstMetric(
r.numTokensDesc,
prometheus.GaugeValue,
float64(numTokens[id]),
id,
)
r.memberOwnershipGaugeVec.WithLabelValues(id).Set(float64(totalOwned) / float64(math.MaxUint32))
r.numTokensGaugeVec.WithLabelValues(id).Set(float64(numTokens[id]))
}

numByState := map[string]int{}
@@ -605,27 +597,12 @@ func (r *Ring) Collect(ch chan<- prometheus.Metric) {
}

for state, count := range numByState {
ch <- prometheus.MustNewConstMetric(
r.numMembersDesc,
prometheus.GaugeValue,
float64(count),
state,
)
r.numMembersGaugeVec.WithLabelValues(state).Set(float64(count))
}
for state, timestamp := range oldestTimestampByState {
ch <- prometheus.MustNewConstMetric(
r.oldestTimestampDesc,
prometheus.GaugeValue,
float64(timestamp),
state,
)
}

ch <- prometheus.MustNewConstMetric(
r.totalTokensDesc,
prometheus.GaugeValue,
float64(len(r.ringTokens)),
)
r.oldestTimestampGaugeVec.WithLabelValues(state).Set(float64(timestamp))
}
r.totalTokensGauge.Set(float64(len(r.ringTokens)))
}

// ShuffleShard returns a subring for the provided identifier (eg. a tenant ID)
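
Design note: with the prometheus.Collector implementation removed, the ring's series are now plain gauges pushed by the 10-second ticker started in starting and stopped in stopping, so scraped values may lag reality by up to ~10 seconds instead of being computed at scrape time. (time.Ticker.Stop does not close the ticker's channel, so after stopping, the update goroutine parks rather than exits.) A hypothetical read-side helper, assuming reg is the registry handed to ring.New and the ring's service has been started:

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

// printRingTokens is a hypothetical helper (not in dskit) that reads the
// push-updated gauge off the registry handed to ring.New.
func printRingTokens(reg *prometheus.Registry) {
	families, err := reg.Gather()
	if err != nil {
		panic(err)
	}
	for _, mf := range families {
		if mf.GetName() != "ring_tokens_total" {
			continue
		}
		for _, m := range mf.GetMetric() {
			// Reflects the last pass of updateRingMetrics, not scrape-time state.
			fmt.Printf("%s = %g\n", mf.GetName(), m.GetGauge().GetValue())
		}
	}
}

func main() {
	printRingTokens(prometheus.NewRegistry()) // empty registry: prints nothing
}
```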
