From 357bc936256c8aee65668670e276c3f52cc43d1d Mon Sep 17 00:00:00 2001
From: Joshua Kim <20001595+joshua-kim@users.noreply.github.com>
Date: Wed, 7 Feb 2024 10:50:47 -0500
Subject: [PATCH] refactor chain manager subnets

Signed-off-by: Joshua Kim <20001595+joshua-kim@users.noreply.github.com>
---
 chains/manager.go      |  54 +++----------
 chains/subnets.go      |  94 +++++++++++++++++++++
 chains/subnets_test.go | 182 +++++++++++++++++++++++++++++++++++++++++
 node/node.go           |  98 +++++++++++-----------
 4 files changed, 339 insertions(+), 89 deletions(-)
 create mode 100644 chains/subnets.go
 create mode 100644 chains/subnets_test.go

diff --git a/chains/manager.go b/chains/manager.go
index 48f2fee35704..25f6d2f53d1c 100644
--- a/chains/manager.go
+++ b/chains/manager.go
@@ -256,10 +256,7 @@ type manager struct {
 	chainCreatorShutdownCh chan struct{}
 	chainCreatorExited     sync.WaitGroup
 
-	subnetsLock sync.RWMutex
-	// Key: Subnet's ID
-	// Value: Subnet description
-	subnets map[ids.ID]subnets.Subnet
+	subnets *Subnets
 
 	chainsLock sync.Mutex
 	// Key: Chain's ID
@@ -271,13 +268,13 @@ type manager struct {
 }
 
 // New returns a new Manager
-func New(config *ManagerConfig) Manager {
+func New(config *ManagerConfig, subnets *Subnets) Manager {
 	return &manager{
 		Aliaser:                ids.NewAliaser(),
 		ManagerConfig:          *config,
 		stakingSigner:          config.StakingTLSCert.PrivateKey.(crypto.Signer),
 		stakingCert:            staking.CertificateFromX509(config.StakingTLSCert.Leaf),
-		subnets:                make(map[ids.ID]subnets.Subnet),
+		subnets:                subnets,
 		chains:                 make(map[ids.ID]handler.Handler),
 		chainsQueue:            buffer.NewUnboundedBlockingDeque[ChainParameters](initialQueueSize),
 		unblockChainCreatorCh:  make(chan struct{}),
@@ -288,25 +285,14 @@ func New(config *ManagerConfig) Manager {
 // QueueChainCreation queues a chain creation request
 // Invariant: Tracked Subnet must be checked before calling this function
 func (m *manager) QueueChainCreation(chainParams ChainParameters) {
-	m.subnetsLock.Lock()
-	subnetID := chainParams.SubnetID
-	sb, exists := m.subnets[subnetID]
-	if !exists {
-		sbConfig, ok := m.SubnetConfigs[subnetID]
-		if !ok {
-			// default to primary subnet config
-			sbConfig = m.SubnetConfigs[constants.PrimaryNetworkID]
-		}
-		sb = subnets.New(m.NodeID, sbConfig)
-		m.subnets[chainParams.SubnetID] = sb
-	}
+	_ = m.subnets.Add(chainParams.SubnetID)
+	sb, _ := m.subnets.Get(chainParams.SubnetID)
 	addedChain := sb.AddChain(chainParams.ID)
-	m.subnetsLock.Unlock()
 
 	if !addedChain {
 		m.Log.Debug("skipping chain creation",
 			zap.String("reason", "chain already staged"),
-			zap.Stringer("subnetID", subnetID),
+			zap.Stringer("subnetID", chainParams.SubnetID),
 			zap.Stringer("chainID", chainParams.ID),
 			zap.Stringer("vmID", chainParams.VMID),
 		)
@@ -316,7 +302,7 @@ func (m *manager) QueueChainCreation(chainParams ChainParameters) {
 	if ok := m.chainsQueue.PushRight(chainParams); !ok {
 		m.Log.Warn("skipping chain creation",
 			zap.String("reason", "couldn't enqueue chain"),
-			zap.Stringer("subnetID", subnetID),
+			zap.Stringer("subnetID", chainParams.SubnetID),
 			zap.Stringer("chainID", chainParams.ID),
 			zap.Stringer("vmID", chainParams.VMID),
 		)
@@ -334,9 +320,7 @@ func (m *manager) createChain(chainParams ChainParameters) {
 		zap.Stringer("vmID", chainParams.VMID),
 	)
 
-	m.subnetsLock.RLock()
-	sb := m.subnets[chainParams.SubnetID]
-	m.subnetsLock.RUnlock()
+	sb, _ := m.subnets.Get(chainParams.SubnetID)
 
 	// Note: buildChain builds all chain's relevant objects (notably engine and handler)
 	// but does not start their operations. Starting of the handler (which could potentially
@@ -1307,22 +1291,9 @@ func (m *manager) IsBootstrapped(id ids.ID) bool {
 	return chain.Context().State.Get().State == snow.NormalOp
 }
 
-func (m *manager) subnetsNotBootstrapped() []ids.ID {
-	m.subnetsLock.RLock()
-	defer m.subnetsLock.RUnlock()
-
-	subnetsBootstrapping := make([]ids.ID, 0, len(m.subnets))
-	for subnetID, subnet := range m.subnets {
-		if !subnet.IsBootstrapped() {
-			subnetsBootstrapping = append(subnetsBootstrapping, subnetID)
-		}
-	}
-	return subnetsBootstrapping
-}
-
 func (m *manager) registerBootstrappedHealthChecks() error {
 	bootstrappedCheck := health.CheckerFunc(func(context.Context) (interface{}, error) {
-		subnetIDs := m.subnetsNotBootstrapped()
+		subnetIDs := m.subnets.Bootstrapping()
 		if len(subnetIDs) != 0 {
 			return subnetIDs, errNotBootstrapped
 		}
@@ -1367,16 +1338,11 @@ func (m *manager) registerBootstrappedHealthChecks() error {
 func (m *manager) StartChainCreator(platformParams ChainParameters) error {
 	// Get the Primary Network's subnet config. If it wasn't registered, then we
 	// throw a fatal error.
-	sbConfig, ok := m.SubnetConfigs[constants.PrimaryNetworkID]
+	sb, ok := m.subnets.Get(constants.PrimaryNetworkID)
 	if !ok {
 		return errNoPrimaryNetworkConfig
 	}
-
-	sb := subnets.New(m.NodeID, sbConfig)
-	m.subnetsLock.Lock()
-	m.subnets[platformParams.SubnetID] = sb
 	sb.AddChain(platformParams.ID)
-	m.subnetsLock.Unlock()
 
 	// The P-chain is created synchronously to ensure that `VM.Initialize` has
 	// finished before returning from this function. This is required because
diff --git a/chains/subnets.go b/chains/subnets.go
new file mode 100644
index 000000000000..5017ba76fffe
--- /dev/null
+++ b/chains/subnets.go
@@ -0,0 +1,94 @@
+// Copyright (C) 2019-2024, Ava Labs, Inc. All rights reserved.
+// See the file LICENSE for licensing terms.
+
+package chains
+
+import (
+	"errors"
+	"sync"
+
+	"github.com/ava-labs/avalanchego/ids"
+	"github.com/ava-labs/avalanchego/subnets"
+	"github.com/ava-labs/avalanchego/utils/constants"
+)
+
+// ErrNoPrimaryNetworkConfig is returned by NewSubnets when the provided
+// configs do not contain an entry for the primary network.
+var ErrNoPrimaryNetworkConfig = errors.New("no subnet config for primary network found")
+
+// NewSubnets returns a Subnets that already contains the primary network.
+// Returns ErrNoPrimaryNetworkConfig if [configs] has no primary network entry,
+// because that entry is the fallback config used by Add for other subnets.
+func NewSubnets(
+	nodeID ids.NodeID,
+	configs map[ids.ID]subnets.Config,
+) (*Subnets, error) {
+	if _, ok := configs[constants.PrimaryNetworkID]; !ok {
+		return nil, ErrNoPrimaryNetworkConfig
+	}
+
+	s := &Subnets{
+		nodeID:  nodeID,
+		configs: configs,
+		subnets: make(map[ids.ID]subnets.Subnet),
+	}
+
+	_ = s.Add(constants.PrimaryNetworkID)
+	return s, nil
+}
+
+// Subnets holds the currently running subnets on this node
+type Subnets struct {
+	nodeID  ids.NodeID
+	configs map[ids.ID]subnets.Config
+
+	// lock guards subnets; the read-only accessors take it in shared mode
+	lock    sync.RWMutex
+	subnets map[ids.ID]subnets.Subnet
+}
+
+// Add a subnet that is being run on this node. Returns true if the subnet was
+// added, and false if it was already present.
+func (s *Subnets) Add(subnetID ids.ID) bool {
+	s.lock.Lock()
+	defer s.lock.Unlock()
+
+	if _, ok := s.subnets[subnetID]; ok {
+		return false
+	}
+
+	// Default to the primary network config if a subnet config was not
+	// specified
+	config, ok := s.configs[subnetID]
+	if !ok {
+		config = s.configs[constants.PrimaryNetworkID]
+	}
+
+	s.subnets[subnetID] = subnets.New(s.nodeID, config)
+	return true
+}
+
+// Get returns a subnet if it is being run on this node
+func (s *Subnets) Get(subnetID ids.ID) (subnets.Subnet, bool) {
+	s.lock.RLock()
+	defer s.lock.RUnlock()
+
+	subnet, ok := s.subnets[subnetID]
+	return subnet, ok
+}
+
+// Bootstrapping returns subnets that have any chains that are still
+// bootstrapping.
+func (s *Subnets) Bootstrapping() []ids.ID {
+	s.lock.RLock()
+	defer s.lock.RUnlock()
+
+	subnetsBootstrapping := make([]ids.ID, 0, len(s.subnets))
+	for subnetID, subnet := range s.subnets {
+		if !subnet.IsBootstrapped() {
+			subnetsBootstrapping = append(subnetsBootstrapping, subnetID)
+		}
+	}
+
+	return subnetsBootstrapping
+}
diff --git a/chains/subnets_test.go b/chains/subnets_test.go
new file mode 100644
index 000000000000..3e1dab49a238
--- /dev/null
+++ b/chains/subnets_test.go
@@ -0,0 +1,182 @@
+// Copyright (C) 2019-2024, Ava Labs, Inc. All rights reserved.
+// See the file LICENSE for licensing terms.
+
+package chains
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/require"
+
+	"github.com/ava-labs/avalanchego/ids"
+	"github.com/ava-labs/avalanchego/subnets"
+	"github.com/ava-labs/avalanchego/utils/constants"
+)
+
+func TestNewSubnets(t *testing.T) {
+	require := require.New(t)
+	config := map[ids.ID]subnets.Config{
+		constants.PrimaryNetworkID: {},
+	}
+
+	subnets, err := NewSubnets(ids.EmptyNodeID, config)
+	require.NoError(err)
+
+	subnet, ok := subnets.Get(constants.PrimaryNetworkID)
+	require.True(ok)
+	require.Equal(config[constants.PrimaryNetworkID], subnet.Config())
+}
+
+func TestNewSubnets_NoPrimaryNetworkConfig(t *testing.T) {
+	require := require.New(t)
+	config := map[ids.ID]subnets.Config{}
+
+	_, err := NewSubnets(ids.EmptyNodeID, config)
+	require.ErrorIs(err, ErrNoPrimaryNetworkConfig)
+}
+
+func TestSubnetsAdd(t *testing.T) {
+	testSubnetID := ids.GenerateTestID()
+
+	type add struct {
+		subnetID ids.ID
+		want     bool
+	}
+
+	tests := []struct {
+		name string
+		adds []add
+	}{
+		{
+			name: "adding duplicate subnet is a noop",
+			adds: []add{
+				{
+					subnetID: testSubnetID,
+					want:     true,
+				},
+				{
+					subnetID: testSubnetID,
+				},
+			},
+		},
+		{
+			name: "adding unique subnets succeeds",
+			adds: []add{
+				{
+					subnetID: ids.GenerateTestID(),
+					want:     true,
+				},
+				{
+					subnetID: ids.GenerateTestID(),
+					want:     true,
+				},
+				{
+					subnetID: ids.GenerateTestID(),
+					want:     true,
+				},
+			},
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			require := require.New(t)
+			config := map[ids.ID]subnets.Config{
+				constants.PrimaryNetworkID: {},
+			}
+			subnets, err := NewSubnets(ids.EmptyNodeID, config)
+			require.NoError(err)
+
+			for _, add := range tt.adds {
+				got := subnets.Add(add.subnetID)
+				require.Equal(add.want, got)
+
+				// The subnet must be retrievable whether this call added it
+				// or it was already present from an earlier add.
+				_, ok := subnets.Get(add.subnetID)
+				require.True(ok)
+			}
+		})
+	}
+}
+
+func TestSubnetConfigs(t *testing.T) {
+	testSubnetID := ids.GenerateTestID()
+
+	tests := []struct {
+		name     string
+		config   map[ids.ID]subnets.Config
+		subnetID ids.ID
+		want     subnets.Config
+	}{
+		{
+			name: "default to primary network config",
+			config: map[ids.ID]subnets.Config{
+				constants.PrimaryNetworkID: {},
+			},
+			subnetID: testSubnetID,
+			want:     subnets.Config{},
+		},
+		{
+			name: "use subnet config",
+			config: map[ids.ID]subnets.Config{
+				constants.PrimaryNetworkID: {},
+				testSubnetID: {
+					GossipConfig: subnets.GossipConfig{
+						AcceptedFrontierValidatorSize: 123456789,
+					},
+				},
+			},
+			subnetID: testSubnetID,
+			want: subnets.Config{
+				GossipConfig: subnets.GossipConfig{
+					AcceptedFrontierValidatorSize: 123456789,
+				},
+			},
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			require := require.New(t)
+
+			subnets, err := NewSubnets(ids.EmptyNodeID, tt.config)
+			require.NoError(err)
+
+			ok := subnets.Add(tt.subnetID)
+			require.True(ok)
+
+			subnet, ok := subnets.Get(tt.subnetID)
+			require.True(ok)
+
+			require.Equal(tt.want, subnet.Config())
+		})
+	}
+}
+
+func TestSubnetsBootstrapping(t *testing.T) {
+	require := require.New(t)
+
+	config := map[ids.ID]subnets.Config{
+		constants.PrimaryNetworkID: {},
+	}
+
+	subnets, err := NewSubnets(ids.EmptyNodeID, config)
+	require.NoError(err)
+
+	subnetID := ids.GenerateTestID()
+	chainID := ids.GenerateTestID()
+
+	subnets.Add(subnetID)
+	subnet, ok := subnets.Get(subnetID)
+	require.True(ok)
+
+	// Start bootstrapping
+	subnet.AddChain(chainID)
+	bootstrapping := subnets.Bootstrapping()
+	require.Contains(bootstrapping, subnetID)
+
+	// Finish bootstrapping
+	subnet.Bootstrapped(chainID)
+	require.Empty(subnets.Bootstrapping())
+}
diff --git a/node/node.go b/node/node.go
index d7ef070fd434..6f87f54d65bb 100644
--- a/node/node.go
+++ b/node/node.go
@@ -387,6 +387,7 @@ type Node struct {
 	// Specifies how much disk usage each peer can cause before
 	// we rate-limit them.
 	diskTargeter tracker.Targeter
+	subnets      *chains.Subnets
 }
 
 /*
@@ -1117,51 +1118,58 @@ func (n *Node) initChainManager(avaxAssetID ids.ID) error {
 		return fmt.Errorf("couldn't initialize chain router: %w", err)
 	}
 
-	n.chainManager = chains.New(&chains.ManagerConfig{
-		SybilProtectionEnabled:                  n.Config.SybilProtectionEnabled,
-		StakingTLSCert:                          n.Config.StakingTLSCert,
-		StakingBLSKey:                           n.Config.StakingSigningKey,
-		Log:                                     n.Log,
-		LogFactory:                              n.LogFactory,
-		VMManager:                               n.VMManager,
-		BlockAcceptorGroup:                      n.BlockAcceptorGroup,
-		TxAcceptorGroup:                         n.TxAcceptorGroup,
-		VertexAcceptorGroup:                     n.VertexAcceptorGroup,
-		DB:                                      n.DB,
-		MsgCreator:                              n.msgCreator,
-		Router:                                  n.chainRouter,
-		Net:                                     n.Net,
-		Validators:                              n.vdrs,
-		PartialSyncPrimaryNetwork:               n.Config.PartialSyncPrimaryNetwork,
-		NodeID:                                  n.ID,
-		NetworkID:                               n.Config.NetworkID,
-		Server:                                  n.APIServer,
-		Keystore:                                n.keystore,
-		AtomicMemory:                            n.sharedMemory,
-		AVAXAssetID:                             avaxAssetID,
-		XChainID:                                xChainID,
-		CChainID:                                cChainID,
-		CriticalChains:                          criticalChains,
-		TimeoutManager:                          n.timeoutManager,
-		Health:                                  n.health,
-		ShutdownNodeFunc:                        n.Shutdown,
-		MeterVMEnabled:                          n.Config.MeterVMEnabled,
-		Metrics:                                 n.MetricsGatherer,
-		SubnetConfigs:                           n.Config.SubnetConfigs,
-		ChainConfigs:                            n.Config.ChainConfigs,
-		FrontierPollFrequency:                   n.Config.FrontierPollFrequency,
-		ConsensusAppConcurrency:                 n.Config.ConsensusAppConcurrency,
-		BootstrapMaxTimeGetAncestors:            n.Config.BootstrapMaxTimeGetAncestors,
-		BootstrapAncestorsMaxContainersSent:     n.Config.BootstrapAncestorsMaxContainersSent,
-		BootstrapAncestorsMaxContainersReceived: n.Config.BootstrapAncestorsMaxContainersReceived,
-		ApricotPhase4Time:                       version.GetApricotPhase4Time(n.Config.NetworkID),
-		ApricotPhase4MinPChainHeight:            version.ApricotPhase4MinPChainHeight[n.Config.NetworkID],
-		ResourceTracker:                         n.resourceTracker,
-		StateSyncBeacons:                        n.Config.StateSyncIDs,
-		TracingEnabled:                          n.Config.TraceConfig.Enabled,
-		Tracer:                                  n.tracer,
-		ChainDataDir:                            n.Config.ChainDataDir,
-	})
+	n.subnets, err = chains.NewSubnets(n.ID, n.Config.SubnetConfigs)
+	if err != nil {
+		return fmt.Errorf("failed to initialize subnets: %w", err)
+	}
+	n.chainManager = chains.New(
+		&chains.ManagerConfig{
+			SybilProtectionEnabled:                  n.Config.SybilProtectionEnabled,
+			StakingTLSCert:                          n.Config.StakingTLSCert,
+			StakingBLSKey:                           n.Config.StakingSigningKey,
+			Log:                                     n.Log,
+			LogFactory:                              n.LogFactory,
+			VMManager:                               n.VMManager,
+			BlockAcceptorGroup:                      n.BlockAcceptorGroup,
+			TxAcceptorGroup:                         n.TxAcceptorGroup,
+			VertexAcceptorGroup:                     n.VertexAcceptorGroup,
+			DB:                                      n.DB,
+			MsgCreator:                              n.msgCreator,
+			Router:                                  n.chainRouter,
+			Net:                                     n.Net,
+			Validators:                              n.vdrs,
+			PartialSyncPrimaryNetwork:               n.Config.PartialSyncPrimaryNetwork,
+			NodeID:                                  n.ID,
+			NetworkID:                               n.Config.NetworkID,
+			Server:                                  n.APIServer,
+			Keystore:                                n.keystore,
+			AtomicMemory:                            n.sharedMemory,
+			AVAXAssetID:                             avaxAssetID,
+			XChainID:                                xChainID,
+			CChainID:                                cChainID,
+			CriticalChains:                          criticalChains,
+			TimeoutManager:                          n.timeoutManager,
+			Health:                                  n.health,
+			ShutdownNodeFunc:                        n.Shutdown,
+			MeterVMEnabled:                          n.Config.MeterVMEnabled,
+			Metrics:                                 n.MetricsGatherer,
+			SubnetConfigs:                           n.Config.SubnetConfigs,
+			ChainConfigs:                            n.Config.ChainConfigs,
+			FrontierPollFrequency:                   n.Config.FrontierPollFrequency,
+			ConsensusAppConcurrency:                 n.Config.ConsensusAppConcurrency,
+			BootstrapMaxTimeGetAncestors:            n.Config.BootstrapMaxTimeGetAncestors,
+			BootstrapAncestorsMaxContainersSent:     n.Config.BootstrapAncestorsMaxContainersSent,
+			BootstrapAncestorsMaxContainersReceived: n.Config.BootstrapAncestorsMaxContainersReceived,
+			ApricotPhase4Time:                       version.GetApricotPhase4Time(n.Config.NetworkID),
+			ApricotPhase4MinPChainHeight:            version.ApricotPhase4MinPChainHeight[n.Config.NetworkID],
+			ResourceTracker:                         n.resourceTracker,
+			StateSyncBeacons:                        n.Config.StateSyncIDs,
+			TracingEnabled:                          n.Config.TraceConfig.Enabled,
+			Tracer:                                  n.tracer,
+			ChainDataDir:                            n.Config.ChainDataDir,
+		},
+		n.subnets,
+	)
 
 	// Notify the API server when new chains are created
 	n.chainManager.AddRegistrant(n.APIServer)