Skip to content

Commit

Permalink
changes as per review
Browse files Browse the repository at this point in the history
  • Loading branch information
aarshkshah1992 committed Oct 14, 2019
1 parent d5346f7 commit fe3ea46
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 27 deletions.
19 changes: 11 additions & 8 deletions dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,20 +96,23 @@ func New(ctx context.Context, h host.Host, options ...opts.Option) (*IpfsDHT, er
}

// set seeder, snapshotter & fallback peers if not set
if cfg.Persistence.Seeder == nil {
cfg.Persistence.Seeder = persist.NewRandomSeeder(h, persist.DefaultRndSeederTarget)
seeder := cfg.Persistence.Seeder
if seeder == nil {
seeder = persist.NewRandomSeeder(h, persist.DefaultRndSeederTarget)
}
if cfg.Persistence.Snapshotter == nil {
snapshotter := cfg.Persistence.Snapshotter
if snapshotter == nil {
s, err := persist.NewDatastoreSnapshotter(cfg.Datastore, persist.DefaultSnapshotNS)
// should never happen
if err != nil {
logger.Error("failed to initialize the default datastore backed snapshotter")
panic(err)
}
cfg.Persistence.Snapshotter = s
snapshotter = s
}

if len(cfg.Persistence.FallbackPeers) == 0 {
cfg.Persistence.FallbackPeers = DefaultBootstrapPeerIDs
cfg.Persistence.FallbackPeers = getDefaultBootstrapPeerIDs()
}

dht := makeDHT(ctx, h, &cfg, cfg.BucketSize)
Expand All @@ -122,17 +125,17 @@ func New(ctx context.Context, h host.Host, options ...opts.Option) (*IpfsDHT, er
})

// fetch the last snapshot & feed it to the seeder
candidates, err := cfg.Persistence.Snapshotter.Load()
candidates, err := snapshotter.Load()
if err != nil {
logger.Warningf("error while loading snapshot of DHT routing table: %s, cannot seed dht", err)
} else if err := cfg.Persistence.Seeder.Seed(dht.routingTable, candidates, cfg.Persistence.FallbackPeers); err != nil {
} else if err := seeder.Seed(dht.routingTable, candidates, cfg.Persistence.FallbackPeers); err != nil {
logger.Warningf("error while seeding candidates to the routing table: %s", err)
}

// schedule periodic snapshots
sproc := periodicproc.Tick(cfg.Persistence.SnapshotInterval, func(proc goprocess.Process) {
logger.Debugf("storing snapshot of DHT routing table")
err := cfg.Persistence.Snapshotter.Store(dht.routingTable)
err := snapshotter.Store(dht.routingTable)
if err != nil {
logger.Warningf("error while storing snapshot of DHT routing table snapshot: %s", err)
}
Expand Down
14 changes: 14 additions & 0 deletions dht_bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"sync"
"time"

"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/routing"
"github.com/multiformats/go-multiaddr"
_ "github.com/multiformats/go-multiaddr-dns"
Expand Down Expand Up @@ -41,6 +42,19 @@ func init() {
}
}

// getDefaultBootstrapPeerIDs extracts the peer IDs from DefaultBootstrapPeers.
// Addresses that do not contain a valid p2p component are logged and skipped,
// so the result may be shorter than DefaultBootstrapPeers.
func getDefaultBootstrapPeerIDs() []peer.ID {
	// Pre-size to the known upper bound to avoid repeated append growth.
	defaultBootstrapPeerIDs := make([]peer.ID, 0, len(DefaultBootstrapPeers))
	for i := range DefaultBootstrapPeers {
		info, err := peer.AddrInfoFromP2pAddr(DefaultBootstrapPeers[i])
		if err != nil {
			// Best-effort: skip the malformed address rather than failing bootstrap entirely.
			logger.Errorf("failed to get peerID for peer with multiaddress %s: error is %s", DefaultBootstrapPeers[i].String(), err)
			continue
		}
		defaultBootstrapPeerIDs = append(defaultBootstrapPeerIDs, info.ID)
	}
	return defaultBootstrapPeerIDs
}

// Bootstrap runs cfg.Queries bootstrap queries every cfg.BucketPeriod.
func (dht *IpfsDHT) Bootstrap(ctx context.Context) error {
triggerBootstrapFnc := func() {
Expand Down
23 changes: 12 additions & 11 deletions dht_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,30 +12,31 @@ import (
"testing"
"time"

"github.com/ipfs/go-datastore"
"github.com/libp2p/go-libp2p-core/peerstore"
"github.com/libp2p/go-libp2p-core/routing"
"github.com/libp2p/go-libp2p-kad-dht/persist"
"github.com/multiformats/go-multistream"

"golang.org/x/xerrors"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

opts "github.com/libp2p/go-libp2p-kad-dht/opts"
pb "github.com/libp2p/go-libp2p-kad-dht/pb"
"github.com/libp2p/go-libp2p-kad-dht/persist"

"github.com/ipfs/go-cid"
u "github.com/ipfs/go-ipfs-util"
"github.com/libp2p/go-libp2p-core/peer"
kb "github.com/libp2p/go-libp2p-kbucket"
"github.com/libp2p/go-libp2p-record"
swarmt "github.com/libp2p/go-libp2p-swarm/testing"
"github.com/libp2p/go-libp2p-testing/ci"
travisci "github.com/libp2p/go-libp2p-testing/ci/travis"
bhost "github.com/libp2p/go-libp2p/p2p/host/basic"

ma "github.com/multiformats/go-multiaddr"
"github.com/multiformats/go-multistream"

"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
u "github.com/ipfs/go-ipfs-util"

"golang.org/x/xerrors"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

var testCaseCids []cid.Cid
Expand Down
4 changes: 4 additions & 0 deletions persist/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,13 @@ import (
var logSeed = log.Logger("dht/seeder")
var logSnapshot = log.Logger("dht/snapshot")

// A Seeder provides the ability to seed/fill a routing table with peers obtained from the latest snapshot
// and a set of fallback peers.
type Seeder interface {
// Seed takes an optional set of candidates from a snapshot (or nil if none could be loaded),
// and a set of fallback peers, and it seeds a routing table instance with working peers.
// Note: This works only if the DHT uses a persistent peerstore across restarts,
// because we can recover metadata for peers obtained from a snapshot only if the peerstore was/is persistent.
Seed(into *kbucket.RoutingTable, candidates []peer.ID, fallback []peer.ID) error
}

Expand Down
15 changes: 7 additions & 8 deletions persist/seeder.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,19 @@ import (
"github.com/ipfs/go-todocounter"

"github.com/libp2p/go-libp2p-core/host"
inet "github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-kbucket"
inet "github.com/libp2p/go-libp2p-net"
)

// SeedDialGracePeriod is the grace period for one dial attempt
var SeedDialGracePeriod = 5 * time.Second

// TotalSeedDialGracePeriod is the total grace period for a group of dial attempts
var TotalSeedDialGracePeriod = 30 * time.Second
// SeederDialTimeout is the total grace period for a group of dial attempts
var SeederDialTimeout = 30 * time.Second

// NSimultaneousDial is the number of peers we will dial simultaneously
var NSimultaneousDial = 50
// SeederConcurrentDials is the number of peers we will dial simultaneously
var SeederConcurrentDials = 50

var ErrPartialSeed = errors.New("routing table seeded partially")

Expand Down Expand Up @@ -106,11 +106,11 @@ func (rs *randomSeeder) Seed(into *kbucket.RoutingTable, candidates []peer.ID, f
}

resCh := make(chan result) // dial results.
ctx, cancel := context.WithTimeout(context.Background(), TotalSeedDialGracePeriod)
ctx, cancel := context.WithTimeout(context.Background(), SeederDialTimeout)
defer cancel()

// start dialing
semaphore := make(chan struct{}, NSimultaneousDial)
semaphore := make(chan struct{}, SeederConcurrentDials)
go func(peers []peer.ID) {
for _, p := range peers {
semaphore <- struct{}{}
Expand Down Expand Up @@ -155,6 +155,5 @@ func (rs *randomSeeder) Seed(into *kbucket.RoutingTable, candidates []peer.ID, f
return attemptSeedWithPeers(fallback)
}

// Seeding succeeded with the candidates alone; the fallback peers were not needed.
return nil
}

0 comments on commit fe3ea46

Please sign in to comment.