From bec39170672acbc4f3f3770702f8d3ffc56db8c7 Mon Sep 17 00:00:00 2001 From: Stephen Buttolph Date: Fri, 3 May 2024 18:44:38 -0400 Subject: [PATCH] Cleanup compression metrics --- message/messages.go | 106 ++++++++++++++++++++++---------------------- 1 file changed, 54 insertions(+), 52 deletions(-) diff --git a/message/messages.go b/message/messages.go index a1d0601d4988..05220222dda7 100644 --- a/message/messages.go +++ b/message/messages.go @@ -9,23 +9,32 @@ import ( "time" "github.com/prometheus/client_golang/prometheus" - "go.uber.org/zap" "google.golang.org/protobuf/proto" "github.com/ava-labs/avalanchego/ids" "github.com/ava-labs/avalanchego/proto/pb/p2p" + "github.com/ava-labs/avalanchego/utils" "github.com/ava-labs/avalanchego/utils/compression" "github.com/ava-labs/avalanchego/utils/constants" "github.com/ava-labs/avalanchego/utils/logging" - "github.com/ava-labs/avalanchego/utils/metric" "github.com/ava-labs/avalanchego/utils/timer/mockable" - "github.com/ava-labs/avalanchego/utils/wrappers" +) + +const ( + typeLabel = "type" + opLabel = "op" + directionLabel = "direction" + + compressionLabel = "compression" + decompressionLabel = "decompression" ) var ( _ InboundMessage = (*inboundMessage)(nil) _ OutboundMessage = (*outboundMessage)(nil) + metricLabels = []string{typeLabel, opLabel, directionLabel} + errUnknownCompressionType = errors.New("message is compressed with an unknown compression type") ) @@ -131,9 +140,9 @@ func (m *outboundMessage) BytesSavedCompression() int { type msgBuilder struct { log logging.Logger - zstdCompressor compression.Compressor - zstdCompressTimeMetrics map[Op]metric.Averager - zstdDecompressTimeMetrics map[Op]metric.Averager + zstdCompressor compression.Compressor + count *prometheus.CounterVec // type + op + direction + duration *prometheus.CounterVec // type + op + direction maxMessageTimeout time.Duration } @@ -152,31 +161,30 @@ func newMsgBuilder( mb := &msgBuilder{ log: log, - zstdCompressor: zstdCompressor, - zstdCompressTimeMetrics: make(map[Op]metric.Averager, len(ExternalOps)), - zstdDecompressTimeMetrics: make(map[Op]metric.Averager, len(ExternalOps)), + zstdCompressor: zstdCompressor, + count: prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: namespace, + Name: "compressed_count", + Help: "number of compressed messages", + }, + metricLabels, + ), + duration: prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: namespace, + Name: "compressed_duration", + Help: "time spent handling compressed messages", + }, + metricLabels, + ), maxMessageTimeout: maxMessageTimeout, } - - errs := wrappers.Errs{} - for _, op := range ExternalOps { - mb.zstdCompressTimeMetrics[op] = metric.NewAveragerWithErrs( - namespace, - fmt.Sprintf("zstd_%s_compress_time", op), - fmt.Sprintf("time (in ns) to compress %s messages with zstd", op), - metrics, - &errs, - ) - mb.zstdDecompressTimeMetrics[op] = metric.NewAveragerWithErrs( - namespace, - fmt.Sprintf("zstd_%s_decompress_time", op), - fmt.Sprintf("time (in ns) to decompress %s messages with zstd", op), - metrics, - &errs, - ) - } - return mb, errs.Err + return mb, utils.Err( + metrics.Register(mb.count), + metrics.Register(mb.duration), + ) } func (mb *msgBuilder) marshal( @@ -200,9 +208,8 @@ func (mb *msgBuilder) marshal( // This recursive packing allows us to avoid an extra compression on/off // field in the message. var ( - startTime = time.Now() - compressedMsg p2p.Message - opToCompressTimeMetrics map[Op]metric.Averager + startTime = time.Now() + compressedMsg p2p.Message ) switch compressionType { case compression.TypeNone: @@ -217,7 +224,6 @@ func (mb *msgBuilder) marshal( CompressedZstd: compressedBytes, }, } - opToCompressTimeMetrics = mb.zstdCompressTimeMetrics default: return nil, 0, 0, errUnknownCompressionType } @@ -228,15 +234,13 @@ func (mb *msgBuilder) marshal( } compressTook := time.Since(startTime) - if compressTimeMetric, ok := opToCompressTimeMetrics[op]; ok { - compressTimeMetric.Observe(float64(compressTook)) - } else { - // Should never happen - mb.log.Warn("no compression metric found for op", - zap.Stringer("op", op), - zap.Stringer("compressionType", compressionType), - ) + labels := prometheus.Labels{ + typeLabel: compressionType.String(), + opLabel: op.String(), + directionLabel: compressionLabel, } + mb.count.With(labels).Inc() + mb.duration.With(labels).Add(float64(compressTook)) bytesSaved := len(uncompressedMsgBytes) - len(compressedMsgBytes) return compressedMsgBytes, bytesSaved, op, nil @@ -250,14 +254,12 @@ func (mb *msgBuilder) unmarshal(b []byte) (*p2p.Message, int, Op, error) { // Figure out what compression type, if any, was used to compress the message. var ( - opToDecompressTimeMetrics map[Op]metric.Averager - compressor compression.Compressor - compressedBytes []byte - zstdCompressed = m.GetCompressedZstd() + compressor compression.Compressor + compressedBytes []byte + zstdCompressed = m.GetCompressedZstd() ) switch { case len(zstdCompressed) > 0: - opToDecompressTimeMetrics = mb.zstdDecompressTimeMetrics compressor = mb.zstdCompressor compressedBytes = zstdCompressed default: @@ -284,14 +286,14 @@ func (mb *msgBuilder) unmarshal(b []byte) (*p2p.Message, int, Op, error) { if err != nil { return nil, 0, 0, err } - if decompressTimeMetric, ok := opToDecompressTimeMetrics[op]; ok { - decompressTimeMetric.Observe(float64(decompressTook)) - } else { - // Should never happen - mb.log.Warn("no decompression metric found for op", - zap.Stringer("op", op), - ) + + labels := prometheus.Labels{ + typeLabel: compression.TypeZstd.String(), + opLabel: op.String(), + directionLabel: decompressionLabel, } + mb.count.With(labels).Inc() + mb.duration.With(labels).Add(float64(decompressTook)) return m, bytesSavedCompression, op, nil }