diff --git a/snow/networking/handler/handler.go b/snow/networking/handler/handler.go
index 92f1b169cc4d..f1966adc4dc4 100644
--- a/snow/networking/handler/handler.go
+++ b/snow/networking/handler/handler.go
@@ -367,7 +367,7 @@ func (h *handler) dispatchSync(ctx context.Context) {
 	for {
 		// Get the next message we should process. If the handler is shutting
 		// down, we may fail to pop a message.
-		ctx, msg, ok := h.popUnexpiredMsg(h.syncMessageQueue, h.metrics.expired)
+		ctx, msg, ok := h.popUnexpiredMsg(h.syncMessageQueue)
 		if !ok {
 			return
 		}
@@ -397,7 +397,7 @@ func (h *handler) dispatchAsync(ctx context.Context) {
 	for {
 		// Get the next message we should process. If the handler is shutting
 		// down, we may fail to pop a message.
-		ctx, msg, ok := h.popUnexpiredMsg(h.asyncMessageQueue, h.metrics.asyncExpired)
+		ctx, msg, ok := h.popUnexpiredMsg(h.asyncMessageQueue)
 		if !ok {
 			return
 		}
@@ -445,7 +445,7 @@ func (h *handler) dispatchChans(ctx context.Context) {
 func (h *handler) handleSyncMsg(ctx context.Context, msg Message) error {
 	var (
 		nodeID    = msg.NodeID()
-		op        = msg.Op()
+		op        = msg.Op().String()
 		body      = msg.Message()
 		startTime = h.clock.Time()
 		// Check if the chain is in normal operation at the start of message
@@ -455,13 +455,13 @@
 	if h.ctx.Log.Enabled(logging.Verbo) {
 		h.ctx.Log.Verbo("forwarding sync message to consensus",
 			zap.Stringer("nodeID", nodeID),
-			zap.Stringer("messageOp", op),
+			zap.String("messageOp", op),
 			zap.Stringer("message", body),
 		)
 	} else {
 		h.ctx.Log.Debug("forwarding sync message to consensus",
 			zap.Stringer("nodeID", nodeID),
-			zap.Stringer("messageOp", op),
+			zap.String("messageOp", op),
 		)
 	}
 	h.resourceTracker.StartProcessing(nodeID, startTime)
@@ -471,24 +471,28 @@
 		h.ctx.Lock.Unlock()
 
 		var (
-			endTime           = h.clock.Time()
-			messageHistograms = h.metrics.messages[op]
-			processingTime    = endTime.Sub(startTime)
-			msgHandlingTime   = endTime.Sub(lockAcquiredTime)
+			endTime      = h.clock.Time()
+			lockingTime  = lockAcquiredTime.Sub(startTime)
+			handlingTime = endTime.Sub(lockAcquiredTime)
 		)
 		h.resourceTracker.StopProcessing(nodeID, endTime)
-		messageHistograms.processingTime.Observe(float64(processingTime))
-		messageHistograms.msgHandlingTime.Observe(float64(msgHandlingTime))
+		h.metrics.lockingTime.Add(float64(lockingTime))
+		labels := prometheus.Labels{
+			opLabel: op,
+		}
+		h.metrics.messages.With(labels).Inc()
+		h.metrics.messageHandlingTime.With(labels).Add(float64(handlingTime))
+
 		msg.OnFinishedHandling()
 		h.ctx.Log.Debug("finished handling sync message",
-			zap.Stringer("messageOp", op),
+			zap.String("messageOp", op),
 		)
-		if processingTime > syncProcessingTimeWarnLimit && isNormalOp {
+		if lockingTime+handlingTime > syncProcessingTimeWarnLimit && isNormalOp {
 			h.ctx.Log.Warn("handling sync message took longer than expected",
-				zap.Duration("processingTime", processingTime),
-				zap.Duration("msgHandlingTime", msgHandlingTime),
+				zap.Duration("lockingTime", lockingTime),
+				zap.Duration("handlingTime", handlingTime),
 				zap.Stringer("nodeID", nodeID),
-				zap.Stringer("messageOp", op),
+				zap.String("messageOp", op),
 				zap.Stringer("message", body),
 			)
 		}
@@ -504,7 +508,7 @@
 		// drop the message.
 		h.ctx.Log.Debug("dropping sync message",
 			zap.String("reason", "uninitialized engine type"),
-			zap.Stringer("messageOp", op),
+			zap.String("messageOp", op),
 			zap.Stringer("currentEngineType", currentState.Type),
 			zap.Stringer("requestedEngineType", msg.EngineType),
 		)
@@ -534,7 +538,7 @@ func (h *handler) handleSyncMsg(ctx context.Context, msg Message) error {
 		// requested an Avalanche engine handle the message.
 		h.ctx.Log.Debug("dropping sync message",
 			zap.String("reason", "uninitialized engine state"),
-			zap.Stringer("messageOp", op),
+			zap.String("messageOp", op),
 			zap.Stringer("currentEngineType", currentState.Type),
 			zap.Stringer("requestedEngineType", msg.EngineType),
 			zap.Stringer("engineState", currentState.State),
@@ -787,36 +791,38 @@ func (h *handler) handleAsyncMsg(ctx context.Context, msg Message) {
 func (h *handler) executeAsyncMsg(ctx context.Context, msg Message) error {
 	var (
 		nodeID    = msg.NodeID()
-		op        = msg.Op()
+		op        = msg.Op().String()
 		body      = msg.Message()
 		startTime = h.clock.Time()
 	)
 	if h.ctx.Log.Enabled(logging.Verbo) {
 		h.ctx.Log.Verbo("forwarding async message to consensus",
 			zap.Stringer("nodeID", nodeID),
-			zap.Stringer("messageOp", op),
+			zap.String("messageOp", op),
 			zap.Stringer("message", body),
 		)
 	} else {
 		h.ctx.Log.Debug("forwarding async message to consensus",
 			zap.Stringer("nodeID", nodeID),
-			zap.Stringer("messageOp", op),
+			zap.String("messageOp", op),
 		)
 	}
 	h.resourceTracker.StartProcessing(nodeID, startTime)
 	defer func() {
 		var (
-			endTime           = h.clock.Time()
-			messageHistograms = h.metrics.messages[op]
-			processingTime    = endTime.Sub(startTime)
+			endTime      = h.clock.Time()
+			handlingTime = endTime.Sub(startTime)
 		)
 		h.resourceTracker.StopProcessing(nodeID, endTime)
-		// There is no lock grabbed here, so both metrics are identical
-		messageHistograms.processingTime.Observe(float64(processingTime))
-		messageHistograms.msgHandlingTime.Observe(float64(processingTime))
+		labels := prometheus.Labels{
+			opLabel: op,
+		}
+		h.metrics.messages.With(labels).Inc()
+		h.metrics.messageHandlingTime.With(labels).Add(float64(handlingTime))
+
 		msg.OnFinishedHandling()
 		h.ctx.Log.Debug("finished handling async message",
-			zap.Stringer("messageOp", op),
+			zap.String("messageOp", op),
 		)
 	}()
 
@@ -901,7 +907,7 @@
 // Any returned error is treated as fatal
 func (h *handler) handleChanMsg(msg message.InboundMessage) error {
 	var (
-		op        = msg.Op()
+		op        = msg.Op().String()
 		body      = msg.Message()
 		startTime = h.clock.Time()
 		// Check if the chain is in normal operation at the start of message
@@ -910,12 +916,12 @@
 	)
 	if h.ctx.Log.Enabled(logging.Verbo) {
 		h.ctx.Log.Verbo("forwarding chan message to consensus",
-			zap.Stringer("messageOp", op),
+			zap.String("messageOp", op),
 			zap.Stringer("message", body),
 		)
 	} else {
 		h.ctx.Log.Debug("forwarding chan message to consensus",
-			zap.Stringer("messageOp", op),
+			zap.String("messageOp", op),
 		)
 	}
 	h.ctx.Lock.Lock()
@@ -924,22 +930,26 @@
 		h.ctx.Lock.Unlock()
 
 		var (
-			endTime           = h.clock.Time()
-			messageHistograms = h.metrics.messages[op]
-			processingTime    = endTime.Sub(startTime)
-			msgHandlingTime   = endTime.Sub(lockAcquiredTime)
+			endTime      = h.clock.Time()
+			lockingTime  = lockAcquiredTime.Sub(startTime)
+			handlingTime = endTime.Sub(lockAcquiredTime)
 		)
-		messageHistograms.processingTime.Observe(float64(processingTime))
-		messageHistograms.msgHandlingTime.Observe(float64(msgHandlingTime))
+		h.metrics.lockingTime.Add(float64(lockingTime))
+		labels := prometheus.Labels{
+			opLabel: op,
+		}
+		h.metrics.messages.With(labels).Inc()
+		h.metrics.messageHandlingTime.With(labels).Add(float64(handlingTime))
+
 		msg.OnFinishedHandling()
 		h.ctx.Log.Debug("finished handling chan message",
-			zap.Stringer("messageOp", op),
+			zap.String("messageOp", op),
 		)
-		if processingTime > syncProcessingTimeWarnLimit && isNormalOp {
+		if lockingTime+handlingTime > syncProcessingTimeWarnLimit && isNormalOp {
 			h.ctx.Log.Warn("handling chan message took longer than expected",
-				zap.Duration("processingTime", processingTime),
-				zap.Duration("msgHandlingTime", msgHandlingTime),
-				zap.Stringer("messageOp", op),
+				zap.Duration("lockingTime", lockingTime),
+				zap.Duration("handlingTime", handlingTime),
+				zap.String("messageOp", op),
 				zap.Stringer("message", body),
 			)
 		}
@@ -974,10 +984,7 @@
 	}
 }
 
-func (h *handler) popUnexpiredMsg(
-	queue MessageQueue,
-	expired prometheus.Counter,
-) (context.Context, Message, bool) {
+func (h *handler) popUnexpiredMsg(queue MessageQueue) (context.Context, Message, bool) {
 	for {
 		// Get the next message we should process. If the handler is shutting
 		// down, we may fail to pop a message.
@@ -988,16 +995,19 @@
 
 		// If this message's deadline has passed, don't process it.
 		if expiration := msg.Expiration(); h.clock.Time().After(expiration) {
+			op := msg.Op().String()
 			h.ctx.Log.Debug("dropping message",
 				zap.String("reason", "timeout"),
 				zap.Stringer("nodeID", msg.NodeID()),
-				zap.Stringer("messageOp", msg.Op()),
+				zap.String("messageOp", op),
 			)
 			span := trace.SpanFromContext(ctx)
 			span.AddEvent("dropping message", trace.WithAttributes(
 				attribute.String("reason", "timeout"),
 			))
-			expired.Inc()
+			h.metrics.expired.With(prometheus.Labels{
+				opLabel: op,
+			}).Inc()
 			msg.OnFinishedHandling()
 			continue
 		}
diff --git a/snow/networking/handler/metrics.go b/snow/networking/handler/metrics.go
index efb6cf558d1e..cbf297c0f3d9 100644
--- a/snow/networking/handler/metrics.go
+++ b/snow/networking/handler/metrics.go
@@ -4,69 +4,54 @@
 package handler
 
 import (
-	"fmt"
-
 	"github.com/prometheus/client_golang/prometheus"
 
-	"github.com/ava-labs/avalanchego/message"
-	"github.com/ava-labs/avalanchego/utils/metric"
-	"github.com/ava-labs/avalanchego/utils/wrappers"
+	"github.com/ava-labs/avalanchego/utils"
 )
 
 type metrics struct {
-	expired      prometheus.Counter
-	asyncExpired prometheus.Counter
-	messages     map[message.Op]*messageProcessing
-}
-
-type messageProcessing struct {
-	processingTime  metric.Averager
-	msgHandlingTime metric.Averager
+	expired             *prometheus.CounterVec // op
+	messages            *prometheus.CounterVec // op
+	lockingTime         prometheus.Counter
+	messageHandlingTime *prometheus.CounterVec // op
 }
 
 func newMetrics(namespace string, reg prometheus.Registerer) (*metrics, error) {
-	errs := wrappers.Errs{}
-
-	expired := prometheus.NewCounter(prometheus.CounterOpts{
-		Namespace: namespace,
-		Name:      "expired",
-		Help:      "Incoming sync messages dropped because the message deadline expired",
-	})
-	asyncExpired := prometheus.NewCounter(prometheus.CounterOpts{
-		Namespace: namespace,
-		Name:      "async_expired",
-		Help:      "Incoming async messages dropped because the message deadline expired",
-	})
-	errs.Add(
-		reg.Register(expired),
-		reg.Register(asyncExpired),
-	)
-
-	messages := make(map[message.Op]*messageProcessing, len(message.ConsensusOps))
-	for _, op := range message.ConsensusOps {
-		opStr := op.String()
-		messageProcessing := &messageProcessing{
-			processingTime: metric.NewAveragerWithErrs(
-				namespace,
-				opStr,
-				"time (in ns) spent handling a "+opStr,
-				reg,
-				&errs,
-			),
-			msgHandlingTime: metric.NewAveragerWithErrs(
-				namespace,
-				opStr+"_msg_handling",
-				fmt.Sprintf("time (in ns) spent handling a %s after grabbing the lock", opStr),
-				reg,
-				&errs,
-			),
-		}
-		messages[op] = messageProcessing
+	m := &metrics{
+		expired: prometheus.NewCounterVec(
+			prometheus.CounterOpts{
+				Namespace: namespace,
+				Name:      "expired",
+				Help:      "messages dropped because the deadline expired",
+			},
+			opLabels,
+		),
+		messages: prometheus.NewCounterVec(
+			prometheus.CounterOpts{
+				Namespace: namespace,
+				Name:      "messages",
+				Help:      "messages handled",
+			},
+			opLabels,
+		),
+		messageHandlingTime: prometheus.NewCounterVec(
+			prometheus.CounterOpts{
+				Namespace: namespace,
+				Name:      "message_handling_time",
+				Help:      "time spent handling messages",
+			},
+			opLabels,
+		),
+		lockingTime: prometheus.NewCounter(prometheus.CounterOpts{
+			Namespace: namespace,
+			Name:      "locking_time",
+			Help:      "time spent acquiring the context lock",
+		}),
 	}
-
-	return &metrics{
-		expired:      expired,
-		asyncExpired: asyncExpired,
-		messages:     messages,
-	}, errs.Err
+	return m, utils.Err(
+		reg.Register(m.expired),
+		reg.Register(m.messages),
+		reg.Register(m.messageHandlingTime),
+		reg.Register(m.lockingTime),
+	)
 }
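
Note on opLabel / opLabels: the hunks above reference both names but this diff does not define them, so they presumably come from elsewhere in the same change. A minimal sketch of definitions consistent with their usage here (one "op" label name shared by the expired, messages, and message_handling_time CounterVecs); these lines are assumed, not taken from the diff:

// Hypothetical definitions, inferred from usage in handler.go and metrics.go.
const opLabel = "op"

var opLabels = []string{opLabel}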
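
Design note: the per-op metric.Averager pair is replaced by two plain counters, so the mean is no longer precomputed; it can be recovered as total handling time divided by message count (in PromQL, roughly rate(..._message_handling_time[5m]) / rate(..._messages[5m]) under whatever namespace prefix is in use). The single unlabeled lockingTime counter likewise accumulates lock-wait time across all ops. A minimal in-package sketch of the mean derivation, assuming the opLabel constant above; meanHandlingTime and the "chits" op name are illustrative, not part of this change:

package handler

import (
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

// meanHandlingTime derives the figure the old averager reported: total
// nanoseconds spent handling an op divided by the number of messages.
func meanHandlingTime(m *metrics, op string) time.Duration {
	labels := prometheus.Labels{opLabel: op}
	count := testutil.ToFloat64(m.messages.With(labels))
	if count == 0 {
		return 0
	}
	// handler.go adds float64(handlingTime) per message, i.e. nanoseconds,
	// so the quotient is a mean duration in nanoseconds.
	totalNs := testutil.ToFloat64(m.messageHandlingTime.With(labels))
	return time.Duration(totalNs / count)
}

// Example, e.g. in a test: meanHandlingTime(m, "chits")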