Skip to content

Commit

Permalink
core/bootstrap: cleaned up bootstrapping
Browse files Browse the repository at this point in the history
Moved it to its own package to isolate scope.
  • Loading branch information
jbenet committed Jan 20, 2015
1 parent b872aeb commit aae09ae
Show file tree
Hide file tree
Showing 3 changed files with 179 additions and 112 deletions.
203 changes: 148 additions & 55 deletions core/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package core

import (
"errors"
"fmt"
"math/rand"
"sync"
"time"
Expand All @@ -16,116 +17,208 @@ import (

context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
goprocess "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess"
periodicproc "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/periodic"
)

// ErrNotEnoughBootstrapPeers signals that we do not have enough bootstrap
// peers to bootstrap correctly.
var ErrNotEnoughBootstrapPeers = errors.New("not enough bootstrap peers to bootstrap")

const (
period = 30 * time.Second // how often to check connection status
connectiontimeout time.Duration = period / 3 // duration to wait when attempting to connect
recoveryThreshold = 4 // attempt to bootstrap if connection count falls below this value
numDHTBootstrapQueries = 15 // number of DHT queries to execute
// BootstrapPeriod governs the periodic interval at which the node will
// attempt to bootstrap. The bootstrap process is not very expensive, so
// this threshold can afford to be small (<=30s).
BootstrapPeriod = 30 * time.Second

// BootstrapPeerThreshold governs the node Bootstrap process. If the node
// has less open connections than this number, it will open connections
// to the bootstrap nodes. From there, the routing system should be able
// to use the connections to the bootstrap nodes to connect to even more
// peers. Routing systems like the IpfsDHT do so in their own Bootstrap
// process, which issues random queries to find more peers.
BootstrapPeerThreshold = 4

// BootstrapConnectionTimeout determines how long to wait for a bootstrap
// connection attempt before cancelling it.
BootstrapConnectionTimeout time.Duration = BootstrapPeriod / 3
)

func superviseConnections(parent context.Context,
h host.Host,
route *dht.IpfsDHT, // TODO depend on abstract interface for testing purposes
store peer.Peerstore,
peers []peer.PeerInfo) error {
// nodeBootstrapper is a small object used to bootstrap an IpfsNode.
type nodeBootstrapper struct {
node *IpfsNode
}

var dhtAlreadyBootstrapping bool
// TryToBootstrap starts IpfsNode bootstrapping. This function will run an
// initial bootstrapping phase before exiting: connect to several bootstrap
// nodes. This allows callers to call this function synchronously to:
// - check if an error occurrs (bootstrapping unsuccessful)
// - wait before starting services which require the node to be bootstrapped
//
// If bootstrapping initially fails, Bootstrap() will try again for a total of
// three times, before giving up completely. Note that in environments where a
// node may be initialized offline, as normal operation, BootstrapForever()
// should be used instead.
//
// Note: this function could be much cleaner if we were to relax the constraint
// that we want to exit **after** we have performed initial bootstrapping (and are
// thus connected to nodes). The constraint may not be that useful in practice.
// Consider cases when we initialize the node while disconnected from the internet.
// We don't want this launch to fail... want to continue launching the node, hoping
// that bootstrapping will work in the future if we get connected.
func (nb *nodeBootstrapper) TryToBootstrap(ctx context.Context, peers []peer.PeerInfo) error {
n := nb.node

// TODO what bootstrapping should happen if there is no DHT? i.e. we could
// continue connecting to our bootstrap peers, but for what purpose? for now
// simply exit without connecting to any of them. When we introduce another
// routing system that uses bootstrap peers we can change this.
dht, ok := n.Routing.(*dht.IpfsDHT)
if !ok {
return nil
}

for {
ctx, _ := context.WithTimeout(parent, connectiontimeout)
// TODO get config from disk so |peers| always reflects the latest
// information
if err := bootstrap(ctx, h, route, store, peers); err != nil {
log.Error(err)
for i := 0; i < 3; i++ {
if err := bootstrapRound(ctx, n.PeerHost, dht, n.Peerstore, peers); err != nil {
return err
}
}

if !dhtAlreadyBootstrapping {
dhtAlreadyBootstrapping = true // only call dht.Bootstrap once.
if _, err := route.Bootstrap(); err != nil {
log.Error(err)
}
// at this point we have done at least one round of initial bootstrap.
// we're ready to kick off dht bootstrapping.
dbproc, err := dht.Bootstrap(ctx)
if err != nil {
return err
}

// kick off the node's periodic bootstrapping
proc := periodicproc.Tick(BootstrapPeriod, func(worker goprocess.Process) {
if err := bootstrapRound(ctx, n.PeerHost, dht, n.Peerstore, peers); err != nil {
log.Error(err)
}
})

// add dht bootstrap proc as a child, so it is closed automatically when we are.
proc.AddChild(dbproc)

// we were given a context. instead of returning proc for the caller
// to manage, for now we just close the proc when context is done.
go func() {
<-ctx.Done()
proc.Close()
}()
return nil
}

select {
case <-parent.Done():
return parent.Err()
case <-time.Tick(period):
// BootstrapForever starts IpfsNode bootstrapping. Unlike TryToBootstrap(),
// BootstrapForever() will run indefinitely (until its context is cancelled).
// This is particularly useful for the daemon and other services, which may
// be started offline and will come online at a future date.
//
// TODO: check offline --to--> online case works well and doesn't hurt perf.
// We may still be dialing. We should check network config.
func (nb *nodeBootstrapper) BootstrapForever(ctx context.Context, peers []peer.PeerInfo) error {
for {
if err := nb.TryToBootstrap(ctx, peers); err == nil {
return nil
}
}
return nil
}

func bootstrap(ctx context.Context,
h host.Host,
r *dht.IpfsDHT,
ps peer.Peerstore,
func bootstrapRound(ctx context.Context,
host host.Host,
route *dht.IpfsDHT,
peerstore peer.Peerstore,
bootstrapPeers []peer.PeerInfo) error {

connectedPeers := h.Network().Peers()
if len(connectedPeers) >= recoveryThreshold {
log.Event(ctx, "bootstrapSkip", h.ID())
log.Debugf("%s core bootstrap skipped -- connected to %d (> %d) nodes",
h.ID(), len(connectedPeers), recoveryThreshold)
ctx, _ = context.WithTimeout(ctx, BootstrapConnectionTimeout)

// determine how many bootstrap connections to open
connectedPeers := host.Network().Peers()
if len(connectedPeers) >= BootstrapPeerThreshold {
log.Event(ctx, "bootstrapSkip", host.ID())
log.Debugf("%s core bootstrap skipped -- connected to %d (> %d) nodes",
host.ID(), len(connectedPeers), BootstrapPeerThreshold)
return nil
}
numCxnsToCreate := recoveryThreshold - len(connectedPeers)

log.Event(ctx, "bootstrapStart", h.ID())
log.Debugf("%s core bootstrapping to %d more nodes", h.ID(), numCxnsToCreate)
numCxnsToCreate := BootstrapPeerThreshold - len(connectedPeers)

// filter out bootstrap nodes we are already connected to
var notConnected []peer.PeerInfo
for _, p := range bootstrapPeers {
if h.Network().Connectedness(p.ID) != inet.Connected {
if host.Network().Connectedness(p.ID) != inet.Connected {
notConnected = append(notConnected, p)
}
}

// if not connected to all bootstrap peer candidates
if len(notConnected) > 0 {
var randomSubset = randomSubsetOfPeers(notConnected, numCxnsToCreate)
log.Debugf("%s bootstrapping to %d nodes: %s", h.ID(), numCxnsToCreate, randomSubset)
if err := connect(ctx, ps, r, randomSubset); err != nil {
log.Event(ctx, "bootstrapError", h.ID(), lgbl.Error(err))
log.Errorf("%s bootstrap error: %s", h.ID(), err)
return err
}
// if connected to all bootstrap peer candidates, exit
if len(notConnected) < 1 {
log.Debugf("%s no more bootstrap peers to create %d connections", host.ID(), numCxnsToCreate)
return ErrNotEnoughBootstrapPeers
}

// connect to a random susbset of bootstrap candidates
var randomSubset = randomSubsetOfPeers(notConnected, numCxnsToCreate)
log.Event(ctx, "bootstrapStart", host.ID())
log.Debugf("%s bootstrapping to %d nodes: %s", host.ID(), numCxnsToCreate, randomSubset)
if err := bootstrapConnect(ctx, peerstore, route, randomSubset); err != nil {
log.Event(ctx, "bootstrapError", host.ID(), lgbl.Error(err))
log.Errorf("%s bootstrap error: %s", host.ID(), err)
return err
}
return nil
}

func connect(ctx context.Context, ps peer.Peerstore, r *dht.IpfsDHT, peers []peer.PeerInfo) error {
func bootstrapConnect(ctx context.Context,
ps peer.Peerstore,
route *dht.IpfsDHT,
peers []peer.PeerInfo) error {
if len(peers) < 1 {
return errors.New("bootstrap set empty")
return ErrNotEnoughBootstrapPeers
}

errs := make(chan error, len(peers))
var wg sync.WaitGroup
for _, p := range peers {

// performed asynchronously because when performed synchronously, if
// one `Connect` call hangs, subsequent calls are more likely to
// fail/abort due to an expiring context.
// Also, performed asynchronously for dial speed.

wg.Add(1)
go func(p peer.PeerInfo) {
defer wg.Done()
log.Event(ctx, "bootstrapDial", r.LocalPeer(), p.ID)
log.Debugf("%s bootstrapping to %s", r.LocalPeer(), p.ID)
log.Event(ctx, "bootstrapDial", route.LocalPeer(), p.ID)
log.Debugf("%s bootstrapping to %s", route.LocalPeer(), p.ID)

ps.AddAddresses(p.ID, p.Addrs)
err := r.Connect(ctx, p.ID)
err := route.Connect(ctx, p.ID)
if err != nil {
log.Event(ctx, "bootstrapFailed", p.ID)
log.Criticalf("failed to bootstrap with %v: %s", p.ID, err)
log.Errorf("failed to bootstrap with %v: %s", p.ID, err)
errs <- err
return
}
log.Event(ctx, "bootstrapSuccess", p.ID)
log.Infof("bootstrapped with %v", p.ID)
}(p)
}
wg.Wait()

// our failure condition is when no connection attempt succeeded.
// So drain the errs channel, counting the results.
close(errs)
count := 0
var err error
for err = range errs {
if err != nil {
count++
}
}
if count == len(peers) {
return fmt.Errorf("failed to bootstrap. %s", err)
}
return nil
}

Expand Down
22 changes: 2 additions & 20 deletions core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,30 +297,12 @@ func (n *IpfsNode) Resolve(k util.Key) (*merkledag.Node, error) {
func (n *IpfsNode) Bootstrap(ctx context.Context, peers []peer.PeerInfo) error {

// TODO what should return value be when in offlineMode?

if n.Routing == nil {
return nil
}

// TODO what bootstrapping should happen if there is no DHT? i.e. we could
// continue connecting to our bootstrap peers, but for what purpose?
dhtRouting, ok := n.Routing.(*dht.IpfsDHT)
if !ok {
return nil
}

// TODO consider moving connection supervision into the Network. We've
// discussed improvements to this Node constructor. One improvement
// would be to make the node configurable, allowing clients to inject
// an Exchange, Network, or Routing component and have the constructor
// manage the wiring. In that scenario, this dangling function is a bit
// awkward.

// spin off the node's connection supervisor.
// TODO, clean up how this thing works. Make the superviseConnections thing
// work like the DHT.Bootstrap.
go superviseConnections(ctx, n.PeerHost, dhtRouting, n.Peerstore, peers)
return nil
nb := nodeBootstrapper{n}
return nb.TryToBootstrap(ctx, peers)
}

func (n *IpfsNode) loadID() error {
Expand Down
66 changes: 29 additions & 37 deletions routing/dht/dht_bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
goprocess "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess"
periodicproc "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/periodic"
)

// DefaultBootstrapQueries specifies how many queries to run,
Expand Down Expand Up @@ -54,9 +55,9 @@ const DefaultBootstrapTimeout = time.Duration(10 * time.Second)
// and connected to at least a few nodes.
//
// Like PeriodicBootstrap, Bootstrap returns a process, so the user can stop it.
func (dht *IpfsDHT) Bootstrap() (goprocess.Process, error) {
func (dht *IpfsDHT) Bootstrap(ctx context.Context) (goprocess.Process, error) {

if err := dht.runBootstrap(dht.Context(), DefaultBootstrapQueries); err != nil {
if err := dht.runBootstrap(ctx, DefaultBootstrapQueries); err != nil {
return nil, err
}

Expand All @@ -79,41 +80,32 @@ func (dht *IpfsDHT) BootstrapOnSignal(queries int, signal <-chan time.Time) (gop
return nil, fmt.Errorf("invalid signal: %v", signal)
}

proc := goprocess.Go(func(worker goprocess.Process) {
defer log.Debug("dht bootstrapper shutting down")
for {
select {
case <-worker.Closing():
return

case <-signal:
// it would be useful to be able to send out signals of when we bootstrap, too...
// maybe this is a good case for whole module event pub/sub?

ctx := dht.Context()
if err := dht.runBootstrap(ctx, queries); err != nil {
log.Error(err)
// A bootstrapping error is important to notice but not fatal.
// maybe the client should be able to consume these errors,
// though I dont have a clear use case in mind-- what **could**
// the client do if one of the bootstrap calls fails?
//
// This is also related to the core's bootstrap failures.
// superviseConnections should perhaps allow clients to detect
// bootstrapping problems.
//
// Anyway, passing errors could be done with a bootstrapper object.
// this would imply the client should be able to consume a lot of
// other non-fatal dht errors too. providing this functionality
// should be done correctly DHT-wide.
// NB: whatever the design, clients must ensure they drain errors!
// This pattern is common to many things, perhaps long-running services
// should have something like an ErrStream that allows clients to consume
// periodic errors and take action. It should allow the user to also
// ignore all errors with something like an ErrStreamDiscard. We should
// study what other systems do for ideas.
}
}
proc := periodicproc.Ticker(signal, func(worker goprocess.Process) {
// it would be useful to be able to send out signals of when we bootstrap, too...
// maybe this is a good case for whole module event pub/sub?

ctx := dht.Context()
if err := dht.runBootstrap(ctx, queries); err != nil {
log.Error(err)
// A bootstrapping error is important to notice but not fatal.
// maybe the client should be able to consume these errors,
// though I dont have a clear use case in mind-- what **could**
// the client do if one of the bootstrap calls fails?
//
// This is also related to the core's bootstrap failures.
// superviseConnections should perhaps allow clients to detect
// bootstrapping problems.
//
// Anyway, passing errors could be done with a bootstrapper object.
// this would imply the client should be able to consume a lot of
// other non-fatal dht errors too. providing this functionality
// should be done correctly DHT-wide.
// NB: whatever the design, clients must ensure they drain errors!
// This pattern is common to many things, perhaps long-running services
// should have something like an ErrStream that allows clients to consume
// periodic errors and take action. It should allow the user to also
// ignore all errors with something like an ErrStreamDiscard. We should
// study what other systems do for ideas.
}
})

Expand Down

0 comments on commit aae09ae

Please sign in to comment.