diff --git a/snow/networking/handler/handler.go b/snow/networking/handler/handler.go index f1966adc4dc4..9388d2d66be6 100644 --- a/snow/networking/handler/handler.go +++ b/snow/networking/handler/handler.go @@ -165,11 +165,25 @@ func New( return nil, fmt.Errorf("initializing handler metrics errored with: %w", err) } cpuTracker := resourceTracker.CPUTracker() - h.syncMessageQueue, err = NewMessageQueue(h.ctx, h.validators, cpuTracker, "handler") + h.syncMessageQueue, err = NewMessageQueue( + h.ctx.Log, + h.ctx.SubnetID, + h.validators, + cpuTracker, + "handler", + h.ctx.Registerer, + ) if err != nil { return nil, fmt.Errorf("initializing sync message queue errored with: %w", err) } - h.asyncMessageQueue, err = NewMessageQueue(h.ctx, h.validators, cpuTracker, "handler_async") + h.asyncMessageQueue, err = NewMessageQueue( + h.ctx.Log, + h.ctx.SubnetID, + h.validators, + cpuTracker, + "handler_async", + h.ctx.Registerer, + ) if err != nil { return nil, fmt.Errorf("initializing async message queue errored with: %w", err) } diff --git a/snow/networking/handler/message_queue.go b/snow/networking/handler/message_queue.go index f17cfc1a2e92..4d632c62d77e 100644 --- a/snow/networking/handler/message_queue.go +++ b/snow/networking/handler/message_queue.go @@ -13,10 +13,10 @@ import ( "github.com/ava-labs/avalanchego/ids" "github.com/ava-labs/avalanchego/message" "github.com/ava-labs/avalanchego/proto/pb/p2p" - "github.com/ava-labs/avalanchego/snow" "github.com/ava-labs/avalanchego/snow/networking/tracker" "github.com/ava-labs/avalanchego/snow/validators" "github.com/ava-labs/avalanchego/utils/buffer" + "github.com/ava-labs/avalanchego/utils/logging" "github.com/ava-labs/avalanchego/utils/timer/mockable" ) @@ -60,7 +60,8 @@ type messageQueue struct { clock mockable.Clock metrics messageQueueMetrics - ctx *snow.ConsensusContext + log logging.Logger + subnetID ids.ID // Validator set for the chain associated with this vdrs validators.Manager // Tracks CPU utilization of each node @@ -75,20 +76,23 @@ type messageQueue struct { } func NewMessageQueue( - ctx *snow.ConsensusContext, + log logging.Logger, + subnetID ids.ID, vdrs validators.Manager, cpuTracker tracker.Tracker, metricsNamespace string, + reg prometheus.Registerer, ) (MessageQueue, error) { m := &messageQueue{ - ctx: ctx, + log: log, + subnetID: subnetID, vdrs: vdrs, cpuTracker: cpuTracker, cond: sync.NewCond(&sync.Mutex{}), nodeToUnprocessedMsgs: make(map[ids.NodeID]int), msgAndCtxs: buffer.NewUnboundedDeque[*msgAndContext](1 /*=initSize*/), } - return m, m.metrics.initialize(metricsNamespace, ctx.Registerer) + return m, m.metrics.initialize(metricsNamespace, reg) } func (m *messageQueue) Push(ctx context.Context, msg Message) { @@ -137,7 +141,7 @@ func (m *messageQueue) Pop() (context.Context, Message, bool) { i := 0 for { if i == n { - m.ctx.Log.Debug("canPop is false for all unprocessed messages", + m.log.Debug("canPop is false for all unprocessed messages", zap.Int("numMessages", n), ) } @@ -212,21 +216,21 @@ func (m *messageQueue) canPop(msg message.InboundMessage) bool { // the number of nodes with unprocessed messages. baseMaxCPU := 1 / float64(len(m.nodeToUnprocessedMsgs)) nodeID := msg.NodeID() - weight := m.vdrs.GetWeight(m.ctx.SubnetID, nodeID) + weight := m.vdrs.GetWeight(m.subnetID, nodeID) var portionWeight float64 - if totalVdrsWeight, err := m.vdrs.TotalWeight(m.ctx.SubnetID); err != nil { + if totalVdrsWeight, err := m.vdrs.TotalWeight(m.subnetID); err != nil { // The sum of validator weights should never overflow, but if they do, // we treat portionWeight as 0. - m.ctx.Log.Error("failed to get total weight of validators", - zap.Stringer("subnetID", m.ctx.SubnetID), + m.log.Error("failed to get total weight of validators", + zap.Stringer("subnetID", m.subnetID), zap.Error(err), ) } else if totalVdrsWeight == 0 { // The sum of validator weights should never be 0, but handle that case // for completeness here to avoid divide by 0. - m.ctx.Log.Warn("validator set is empty", - zap.Stringer("subnetID", m.ctx.SubnetID), + m.log.Warn("validator set is empty", + zap.Stringer("subnetID", m.subnetID), ) } else { portionWeight = float64(weight) / float64(totalVdrsWeight) diff --git a/snow/networking/handler/message_queue_test.go b/snow/networking/handler/message_queue_test.go index 577a4686faa0..a74ffcfb4469 100644 --- a/snow/networking/handler/message_queue_test.go +++ b/snow/networking/handler/message_queue_test.go @@ -8,6 +8,7 @@ import ( "testing" "time" + "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/require" "go.uber.org/mock/gomock" @@ -15,21 +16,27 @@ import ( "github.com/ava-labs/avalanchego/message" "github.com/ava-labs/avalanchego/proto/pb/p2p" "github.com/ava-labs/avalanchego/snow/networking/tracker" - "github.com/ava-labs/avalanchego/snow/snowtest" "github.com/ava-labs/avalanchego/snow/validators" + "github.com/ava-labs/avalanchego/utils/constants" + "github.com/ava-labs/avalanchego/utils/logging" ) func TestQueue(t *testing.T) { ctrl := gomock.NewController(t) require := require.New(t) cpuTracker := tracker.NewMockTracker(ctrl) - snowCtx := snowtest.Context(t, snowtest.CChainID) - ctx := snowtest.ConsensusContext(snowCtx) vdrs := validators.NewManager() vdr1ID, vdr2ID := ids.GenerateTestNodeID(), ids.GenerateTestNodeID() - require.NoError(vdrs.AddStaker(ctx.SubnetID, vdr1ID, nil, ids.Empty, 1)) - require.NoError(vdrs.AddStaker(ctx.SubnetID, vdr2ID, nil, ids.Empty, 1)) - mIntf, err := NewMessageQueue(ctx, vdrs, cpuTracker, "") + require.NoError(vdrs.AddStaker(constants.PrimaryNetworkID, vdr1ID, nil, ids.Empty, 1)) + require.NoError(vdrs.AddStaker(constants.PrimaryNetworkID, vdr2ID, nil, ids.Empty, 1)) + mIntf, err := NewMessageQueue( + logging.NoLog{}, + constants.PrimaryNetworkID, + vdrs, + cpuTracker, + "", + prometheus.NewRegistry(), + ) require.NoError(err) u := mIntf.(*messageQueue) currentTime := time.Now() diff --git a/vms/avm/vm.go b/vms/avm/vm.go index ab05b053b393..6a455132c1a1 100644 --- a/vms/avm/vm.go +++ b/vms/avm/vm.go @@ -15,6 +15,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "go.uber.org/zap" + "github.com/ava-labs/avalanchego/api/metrics" "github.com/ava-labs/avalanchego/cache" "github.com/ava-labs/avalanchego/database" "github.com/ava-labs/avalanchego/database/versiondb" @@ -33,7 +34,6 @@ import ( "github.com/ava-labs/avalanchego/version" "github.com/ava-labs/avalanchego/vms/avm/block" "github.com/ava-labs/avalanchego/vms/avm/config" - "github.com/ava-labs/avalanchego/vms/avm/metrics" "github.com/ava-labs/avalanchego/vms/avm/network" "github.com/ava-labs/avalanchego/vms/avm/state" "github.com/ava-labs/avalanchego/vms/avm/txs" @@ -47,6 +47,7 @@ import ( blockbuilder "github.com/ava-labs/avalanchego/vms/avm/block/builder" blockexecutor "github.com/ava-labs/avalanchego/vms/avm/block/executor" extensions "github.com/ava-labs/avalanchego/vms/avm/fxs" + avmmetrics "github.com/ava-labs/avalanchego/vms/avm/metrics" txexecutor "github.com/ava-labs/avalanchego/vms/avm/txs/executor" xmempool "github.com/ava-labs/avalanchego/vms/avm/txs/mempool" ) @@ -66,7 +67,7 @@ type VM struct { config.Config - metrics metrics.Metrics + metrics avmmetrics.Metrics avax.AddressManager ids.Aliaser @@ -173,16 +174,15 @@ func (vm *VM) Initialize( zap.Reflect("config", avmConfig), ) - registerer := prometheus.NewRegistry() - if err := ctx.Metrics.Register("", registerer); err != nil { + vm.registerer, err = metrics.MakeAndRegister(ctx.Metrics, "") + if err != nil { return err } - vm.registerer = registerer vm.connectedPeers = make(map[ids.NodeID]*version.Application) // Initialize metrics as soon as possible - vm.metrics, err = metrics.New(registerer) + vm.metrics, err = avmmetrics.New(vm.registerer) if err != nil { return fmt.Errorf("failed to initialize metrics: %w", err) } diff --git a/vms/platformvm/vm.go b/vms/platformvm/vm.go index 565960cff599..efbfe0fa5453 100644 --- a/vms/platformvm/vm.go +++ b/vms/platformvm/vm.go @@ -12,9 +12,9 @@ import ( "time" "github.com/gorilla/rpc/v2" - "github.com/prometheus/client_golang/prometheus" "go.uber.org/zap" + "github.com/ava-labs/avalanchego/api/metrics" "github.com/ava-labs/avalanchego/cache" "github.com/ava-labs/avalanchego/codec" "github.com/ava-labs/avalanchego/codec/linearcodec" @@ -35,7 +35,6 @@ import ( "github.com/ava-labs/avalanchego/vms/platformvm/block" "github.com/ava-labs/avalanchego/vms/platformvm/config" "github.com/ava-labs/avalanchego/vms/platformvm/fx" - "github.com/ava-labs/avalanchego/vms/platformvm/metrics" "github.com/ava-labs/avalanchego/vms/platformvm/network" "github.com/ava-labs/avalanchego/vms/platformvm/reward" "github.com/ava-labs/avalanchego/vms/platformvm/state" @@ -47,6 +46,7 @@ import ( snowmanblock "github.com/ava-labs/avalanchego/snow/engine/snowman/block" blockbuilder "github.com/ava-labs/avalanchego/vms/platformvm/block/builder" blockexecutor "github.com/ava-labs/avalanchego/vms/platformvm/block/executor" + platformvmmetrics "github.com/ava-labs/avalanchego/vms/platformvm/metrics" txexecutor "github.com/ava-labs/avalanchego/vms/platformvm/txs/executor" pmempool "github.com/ava-labs/avalanchego/vms/platformvm/txs/mempool" pvalidators "github.com/ava-labs/avalanchego/vms/platformvm/validators" @@ -65,7 +65,7 @@ type VM struct { *network.Network validators.State - metrics metrics.Metrics + metrics platformvmmetrics.Metrics // Used to get time. Useful for faking time during tests. clock mockable.Clock @@ -113,13 +113,13 @@ func (vm *VM) Initialize( } chainCtx.Log.Info("using VM execution config", zap.Reflect("config", execConfig)) - registerer := prometheus.NewRegistry() - if err := chainCtx.Metrics.Register("", registerer); err != nil { + registerer, err := metrics.MakeAndRegister(chainCtx.Metrics, "") + if err != nil { return err } // Initialize metrics as soon as possible - vm.metrics, err = metrics.New(registerer) + vm.metrics, err = platformvmmetrics.New(registerer) if err != nil { return fmt.Errorf("failed to initialize metrics: %w", err) }