Skip to content

Commit

Permalink
refactor chain manager subnets
Browse files Browse the repository at this point in the history
Signed-off-by: Joshua Kim <20001595+joshua-kim@users.noreply.github.com>
  • Loading branch information
joshua-kim committed Feb 7, 2024
1 parent c523cee commit a625386
Show file tree
Hide file tree
Showing 4 changed files with 335 additions and 101 deletions.
73 changes: 17 additions & 56 deletions chains/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,11 @@ var (
// Bootstrapping prefixes for ChainVMs
ChainBootstrappingDBPrefix = []byte("bs")

errUnknownVMType = errors.New("the vm should have type avalanche.DAGVM or snowman.ChainVM")
errCreatePlatformVM = errors.New("attempted to create a chain running the PlatformVM")
errNotBootstrapped = errors.New("subnets not bootstrapped")
errNoPrimaryNetworkConfig = errors.New("no subnet config for primary network found")
errPartialSyncAsAValidator = errors.New("partial sync should not be configured for a validator")
errUnknownVMType = errors.New("the vm should have type avalanche.DAGVM or snowman.ChainVM")
errCreatePlatformVM = errors.New("attempted to create a chain running the PlatformVM")
errNotBootstrapped = errors.New("subnets not bootstrapped")
errPrimaryNetworkNotRunning = errors.New("node is not running the primary network")
errPartialSyncAsAValidator = errors.New("partial sync should not be configured for a validator")

fxs = map[ids.ID]fx.Factory{
secp256k1fx.ID: &secp256k1fx.Factory{},
Expand Down Expand Up @@ -256,10 +256,7 @@ type manager struct {
chainCreatorShutdownCh chan struct{}
chainCreatorExited sync.WaitGroup

subnetsLock sync.RWMutex
// Key: Subnet's ID
// Value: Subnet description
subnets map[ids.ID]subnets.Subnet
subnets *Subnets

chainsLock sync.Mutex
// Key: Chain's ID
Expand All @@ -271,13 +268,13 @@ type manager struct {
}

// New returns a new Manager
func New(config *ManagerConfig) Manager {
func New(config *ManagerConfig, subnets *Subnets) Manager {
return &manager{
Aliaser: ids.NewAliaser(),
ManagerConfig: *config,
stakingSigner: config.StakingTLSCert.PrivateKey.(crypto.Signer),
stakingCert: staking.CertificateFromX509(config.StakingTLSCert.Leaf),
subnets: make(map[ids.ID]subnets.Subnet),
subnets: subnets,
chains: make(map[ids.ID]handler.Handler),
chainsQueue: buffer.NewUnboundedBlockingDeque[ChainParameters](initialQueueSize),
unblockChainCreatorCh: make(chan struct{}),
Expand All @@ -288,25 +285,11 @@ func New(config *ManagerConfig) Manager {
// QueueChainCreation queues a chain creation request
// Invariant: Tracked Subnet must be checked before calling this function
func (m *manager) QueueChainCreation(chainParams ChainParameters) {
m.subnetsLock.Lock()
subnetID := chainParams.SubnetID
sb, exists := m.subnets[subnetID]
if !exists {
sbConfig, ok := m.SubnetConfigs[subnetID]
if !ok {
// default to primary subnet config
sbConfig = m.SubnetConfigs[constants.PrimaryNetworkID]
}
sb = subnets.New(m.NodeID, sbConfig)
m.subnets[chainParams.SubnetID] = sb
}
addedChain := sb.AddChain(chainParams.ID)
m.subnetsLock.Unlock()

if !addedChain {
_ = m.subnets.Add(chainParams.SubnetID)
if sb, _ := m.subnets.Get(chainParams.SubnetID); !sb.AddChain(chainParams.ID) {
m.Log.Debug("skipping chain creation",
zap.String("reason", "chain already staged"),
zap.Stringer("subnetID", subnetID),
zap.Stringer("subnetID", chainParams.SubnetID),
zap.Stringer("chainID", chainParams.ID),
zap.Stringer("vmID", chainParams.VMID),
)
Expand All @@ -316,7 +299,7 @@ func (m *manager) QueueChainCreation(chainParams ChainParameters) {
if ok := m.chainsQueue.PushRight(chainParams); !ok {
m.Log.Warn("skipping chain creation",
zap.String("reason", "couldn't enqueue chain"),
zap.Stringer("subnetID", subnetID),
zap.Stringer("subnetID", chainParams.SubnetID),
zap.Stringer("chainID", chainParams.ID),
zap.Stringer("vmID", chainParams.VMID),
)
Expand All @@ -334,9 +317,7 @@ func (m *manager) createChain(chainParams ChainParameters) {
zap.Stringer("vmID", chainParams.VMID),
)

m.subnetsLock.RLock()
sb := m.subnets[chainParams.SubnetID]
m.subnetsLock.RUnlock()
sb, _ := m.subnets.Get(chainParams.SubnetID)

// Note: buildChain builds all chain's relevant objects (notably engine and handler)
// but does not start their operations. Starting of the handler (which could potentially
Expand Down Expand Up @@ -1307,23 +1288,9 @@ func (m *manager) IsBootstrapped(id ids.ID) bool {
return chain.Context().State.Get().State == snow.NormalOp
}

func (m *manager) subnetsNotBootstrapped() []ids.ID {
m.subnetsLock.RLock()
defer m.subnetsLock.RUnlock()

subnetsBootstrapping := make([]ids.ID, 0, len(m.subnets))
for subnetID, subnet := range m.subnets {
if !subnet.IsBootstrapped() {
subnetsBootstrapping = append(subnetsBootstrapping, subnetID)
}
}
return subnetsBootstrapping
}

func (m *manager) registerBootstrappedHealthChecks() error {
bootstrappedCheck := health.CheckerFunc(func(context.Context) (interface{}, error) {
subnetIDs := m.subnetsNotBootstrapped()
if len(subnetIDs) != 0 {
if subnetIDs := m.subnets.Bootstrapping(); len(subnetIDs) != 0 {
return subnetIDs, errNotBootstrapped
}
return []ids.ID{}, nil
Expand Down Expand Up @@ -1365,18 +1332,12 @@ func (m *manager) registerBootstrappedHealthChecks() error {

// Starts chain creation loop to process queued chains
func (m *manager) StartChainCreator(platformParams ChainParameters) error {
// Get the Primary Network's subnet config. If it wasn't registered, then we
// throw a fatal error.
sbConfig, ok := m.SubnetConfigs[constants.PrimaryNetworkID]
// Add the P-Chain to the Primary Network
sb, ok := m.subnets.Get(constants.PrimaryNetworkID)
if !ok {
return errNoPrimaryNetworkConfig
return errPrimaryNetworkNotRunning
}

sb := subnets.New(m.NodeID, sbConfig)
m.subnetsLock.Lock()
m.subnets[platformParams.SubnetID] = sb
sb.AddChain(platformParams.ID)
m.subnetsLock.Unlock()

// The P-chain is created synchronously to ensure that `VM.Initialize` has
// finished before returning from this function. This is required because
Expand Down
87 changes: 87 additions & 0 deletions chains/subnets.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
// Copyright (C) 2019-2024, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.

package chains

import (
"errors"
"sync"

"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/subnets"
"github.com/ava-labs/avalanchego/utils/constants"
)

var ErrNoPrimaryNetworkConfig = errors.New("no subnet config for primary network found")

func NewSubnets(
nodeID ids.NodeID,
configs map[ids.ID]subnets.Config,
) (*Subnets, error) {
if _, ok := configs[constants.PrimaryNetworkID]; !ok {
return nil, ErrNoPrimaryNetworkConfig
}

s := &Subnets{
nodeID: nodeID,
configs: configs,
subnets: make(map[ids.ID]subnets.Subnet),
}

_ = s.Add(constants.PrimaryNetworkID)
return s, nil
}

// Subnets holds the currently running subnets on this node
type Subnets struct {
nodeID ids.NodeID
configs map[ids.ID]subnets.Config

lock sync.Mutex
subnets map[ids.ID]subnets.Subnet
}

// Add a subnet that is being run on this node
func (s *Subnets) Add(subnetID ids.ID) bool {
s.lock.Lock()
defer s.lock.Unlock()

if _, ok := s.subnets[subnetID]; ok {
return false
}

// Default to the primary network config if a subnet config was not
// specified
config, ok := s.configs[subnetID]
if !ok {
config = s.configs[constants.PrimaryNetworkID]
}

s.subnets[subnetID] = subnets.New(s.nodeID, config)
return true
}

// Get returns a subnet if it is being run on this node
func (s *Subnets) Get(subnetID ids.ID) (subnets.Subnet, bool) {
s.lock.Lock()
defer s.lock.Unlock()

subnet, ok := s.subnets[subnetID]
return subnet, ok
}

// Bootstrapping returns subnets that have any chains that are still
// bootstrapping.
func (s *Subnets) Bootstrapping() []ids.ID {
s.lock.Lock()
defer s.lock.Unlock()

subnetsBootstrapping := make([]ids.ID, 0, len(s.subnets))
for subnetID, subnet := range s.subnets {
if !subnet.IsBootstrapped() {
subnetsBootstrapping = append(subnetsBootstrapping, subnetID)
}
}

return subnetsBootstrapping
}
Loading

0 comments on commit a625386

Please sign in to comment.