From fe3ea46fcd6b4d7d37c2203f6c9c77e24f974746 Mon Sep 17 00:00:00 2001
From: Aarsh Shah
Date: Mon, 14 Oct 2019 21:54:00 +0800
Subject: [PATCH] changes as per review

---
 dht.go                | 19 +++++++++++--------
 dht_bootstrap.go      | 14 ++++++++++++++
 dht_test.go           | 23 ++++++++++++-----------
 persist/interfaces.go |  4 ++++
 persist/seeder.go     | 15 +++++++--------
 5 files changed, 48 insertions(+), 27 deletions(-)

diff --git a/dht.go b/dht.go
index b565ca951..3a1b0e567 100644
--- a/dht.go
+++ b/dht.go
@@ -96,20 +96,23 @@ func New(ctx context.Context, h host.Host, options ...opts.Option) (*IpfsDHT, er
 	}
 
 	// set seeder, snapshotter & fallback peers if not set
-	if cfg.Persistence.Seeder == nil {
-		cfg.Persistence.Seeder = persist.NewRandomSeeder(h, persist.DefaultRndSeederTarget)
+	seeder := cfg.Persistence.Seeder
+	if seeder == nil {
+		seeder = persist.NewRandomSeeder(h, persist.DefaultRndSeederTarget)
 	}
-	if cfg.Persistence.Snapshotter == nil {
+	snapshotter := cfg.Persistence.Snapshotter
+	if snapshotter == nil {
 		s, err := persist.NewDatastoreSnapshotter(cfg.Datastore, persist.DefaultSnapshotNS)
 		// should never happen
 		if err != nil {
 			logger.Error("failed to initialize the default datastore backed snapshotter")
 			panic(err)
 		}
-		cfg.Persistence.Snapshotter = s
+		snapshotter = s
 	}
+
 	if len(cfg.Persistence.FallbackPeers) == 0 {
-		cfg.Persistence.FallbackPeers = DefaultBootstrapPeerIDs
+		cfg.Persistence.FallbackPeers = getDefaultBootstrapPeerIDs()
 	}
 
 	dht := makeDHT(ctx, h, &cfg, cfg.BucketSize)
@@ -122,17 +125,17 @@ func New(ctx context.Context, h host.Host, options ...opts.Option) (*IpfsDHT, er
 	})
 
 	// fetch the last snapshot & feed it to the seeder
-	candidates, err := cfg.Persistence.Snapshotter.Load()
+	candidates, err := snapshotter.Load()
 	if err != nil {
 		logger.Warningf("error while loading snapshot of DHT routing table: %s, cannot seed dht", err)
-	} else if err := cfg.Persistence.Seeder.Seed(dht.routingTable, candidates, cfg.Persistence.FallbackPeers); err != nil {
+	} else if err := seeder.Seed(dht.routingTable, candidates, cfg.Persistence.FallbackPeers); err != nil {
 		logger.Warningf("error while seeding candidates to the routing table: %s", err)
 	}
 
 	// schedule periodic snapshots
 	sproc := periodicproc.Tick(cfg.Persistence.SnapshotInterval, func(proc goprocess.Process) {
 		logger.Debugf("storing snapshot of DHT routing table")
-		err := cfg.Persistence.Snapshotter.Store(dht.routingTable)
+		err := snapshotter.Store(dht.routingTable)
 		if err != nil {
 			logger.Warningf("error while storing snapshot of DHT routing table snapshot: %s", err)
 		}
diff --git a/dht_bootstrap.go b/dht_bootstrap.go
index 6e40c597a..00fae813d 100644
--- a/dht_bootstrap.go
+++ b/dht_bootstrap.go
@@ -7,6 +7,7 @@ import (
 	"sync"
 	"time"
 
+	"github.com/libp2p/go-libp2p-core/peer"
 	"github.com/libp2p/go-libp2p-core/routing"
 	"github.com/multiformats/go-multiaddr"
 	_ "github.com/multiformats/go-multiaddr-dns"
@@ -41,6 +42,19 @@ func init() {
 	}
 }
 
+func getDefaultBootstrapPeerIDs() []peer.ID {
+	var defaultBootstrapPeerIDs []peer.ID
+	for i := range DefaultBootstrapPeers {
+		info, err := peer.AddrInfoFromP2pAddr(DefaultBootstrapPeers[i])
+		if err != nil {
+			logger.Errorf("failed to get peer ID for bootstrap peer with multiaddress %s: %s", DefaultBootstrapPeers[i].String(), err)
+			continue
+		}
+		defaultBootstrapPeerIDs = append(defaultBootstrapPeerIDs, info.ID)
+	}
+	return defaultBootstrapPeerIDs
+}
+
 // BootstrapConfig runs cfg.Queries bootstrap queries every cfg.BucketPeriod.
 func (dht *IpfsDHT) Bootstrap(ctx context.Context) error {
 	triggerBootstrapFnc := func() {
diff --git a/dht_test.go b/dht_test.go
index 4bf92279c..51290dfb1 100644
--- a/dht_test.go
+++ b/dht_test.go
@@ -12,22 +12,12 @@ import (
 	"testing"
 	"time"
 
-	"github.com/ipfs/go-datastore"
 	"github.com/libp2p/go-libp2p-core/peerstore"
 	"github.com/libp2p/go-libp2p-core/routing"
-	"github.com/libp2p/go-libp2p-kad-dht/persist"
-	"github.com/multiformats/go-multistream"
-
-	"golang.org/x/xerrors"
-
-	"github.com/stretchr/testify/assert"
-	"github.com/stretchr/testify/require"
-
 	opts "github.com/libp2p/go-libp2p-kad-dht/opts"
 	pb "github.com/libp2p/go-libp2p-kad-dht/pb"
+	"github.com/libp2p/go-libp2p-kad-dht/persist"
 
-	"github.com/ipfs/go-cid"
-	u "github.com/ipfs/go-ipfs-util"
 	"github.com/libp2p/go-libp2p-core/peer"
 	kb "github.com/libp2p/go-libp2p-kbucket"
 	"github.com/libp2p/go-libp2p-record"
@@ -35,7 +25,18 @@ import (
 	"github.com/libp2p/go-libp2p-testing/ci"
 	travisci "github.com/libp2p/go-libp2p-testing/ci/travis"
 	bhost "github.com/libp2p/go-libp2p/p2p/host/basic"
+	ma "github.com/multiformats/go-multiaddr"
+	"github.com/multiformats/go-multistream"
+
+	"github.com/ipfs/go-cid"
+	"github.com/ipfs/go-datastore"
+	u "github.com/ipfs/go-ipfs-util"
+
+	"golang.org/x/xerrors"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
 )
 
 var testCaseCids []cid.Cid
diff --git a/persist/interfaces.go b/persist/interfaces.go
index 5874b8f7c..eac3ebc9a 100644
--- a/persist/interfaces.go
+++ b/persist/interfaces.go
@@ -10,9 +10,13 @@ import (
 var logSeed = log.Logger("dht/seeder")
 var logSnapshot = log.Logger("dht/snapshot")
 
+// A Seeder provides the ability to seed/fill a routing table with peers obtained from the latest snapshot
+// and a set of fallback peers.
 type Seeder interface {
 	// Seed takes an optional set of candidates from a snapshot (or nil if none could be loaded),
 	// and a set of fallback peers, and it seeds a routing table instance with working peers.
+	// Note: This works only if the DHT uses a persistent peerstore across restarts,
+	// because we can recover metadata for peers obtained from a snapshot only if the peerstore is persistent.
 	Seed(into *kbucket.RoutingTable, candidates []peer.ID, fallback []peer.ID) error
 }
 
diff --git a/persist/seeder.go b/persist/seeder.go
index b9c975cb6..0d03e6d31 100644
--- a/persist/seeder.go
+++ b/persist/seeder.go
@@ -9,19 +9,19 @@ import (
 	"github.com/ipfs/go-todocounter"
 	"github.com/libp2p/go-libp2p-core/host"
+	inet "github.com/libp2p/go-libp2p-core/network"
 	"github.com/libp2p/go-libp2p-core/peer"
 	"github.com/libp2p/go-libp2p-kbucket"
-	inet "github.com/libp2p/go-libp2p-net"
 )
 
 // SeedDialGracePeriod is the grace period for one dial attempt
 var SeedDialGracePeriod = 5 * time.Second
 
-// TotalSeedDialGracePeriod is the total grace period for a group of dial attempts
-var TotalSeedDialGracePeriod = 30 * time.Second
+// SeederDialTimeout is the total grace period for a group of dial attempts
+var SeederDialTimeout = 30 * time.Second
 
-// NSimultaneousDial is the number of peers we will dial simultaneously
-var NSimultaneousDial = 50
+// SeederConcurrentDials is the number of peers we will dial simultaneously
+var SeederConcurrentDials = 50
 
 var ErrPartialSeed = errors.New("routing table seeded partially")
@@ -106,11 +106,11 @@ func (rs *randomSeeder) Seed(into *kbucket.RoutingTable, candidates []peer.ID, f
 	}
 
 	resCh := make(chan result) // dial results.
-	ctx, cancel := context.WithTimeout(context.Background(), TotalSeedDialGracePeriod)
+	ctx, cancel := context.WithTimeout(context.Background(), SeederDialTimeout)
 	defer cancel()
 
 	// start dialing
-	semaphore := make(chan struct{}, NSimultaneousDial)
+	semaphore := make(chan struct{}, SeederConcurrentDials)
 	go func(peers []peer.ID) {
 		for _, p := range peers {
 			semaphore <- struct{}{}
@@ -155,6 +155,5 @@ func (rs *randomSeeder) Seed(into *kbucket.RoutingTable, candidates []peer.ID, f
 		return attemptSeedWithPeers(fallback)
 	}
 
-	// There is a God after all
 	return nil
 }
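
For context on the seeder changes above: randomSeeder.Seed caps in-flight dials with a buffered channel used as a counting semaphore (SeederConcurrentDials slots) and bounds the whole group of dials with a single context.WithTimeout (SeederDialTimeout). The sketch below is a minimal, self-contained illustration of that pattern, not code from this patch; dialPeer and the peers slice are hypothetical stand-ins for host.Connect and the candidate list.

// A minimal sketch of the bounded-dial pattern used in randomSeeder.Seed.
// dialPeer and peers are illustrative stand-ins, not part of the library.
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

const (
	seederConcurrentDials = 50               // mirrors SeederConcurrentDials
	seederDialTimeout     = 30 * time.Second // mirrors SeederDialTimeout
)

// dialPeer simulates a dial attempt that respects context cancellation.
func dialPeer(ctx context.Context, p string) error {
	select {
	case <-time.After(100 * time.Millisecond): // pretend dial latency
		return nil
	case <-ctx.Done():
		return ctx.Err() // group timeout fired before this dial finished
	}
}

func main() {
	peers := []string{"peerA", "peerB", "peerC"} // illustrative candidates

	// One timeout bounds the entire group of dials.
	ctx, cancel := context.WithTimeout(context.Background(), seederDialTimeout)
	defer cancel()

	// A buffered channel acts as a counting semaphore: sends block once
	// seederConcurrentDials dials are in flight.
	semaphore := make(chan struct{}, seederConcurrentDials)

	var wg sync.WaitGroup
	for _, p := range peers {
		semaphore <- struct{}{} // acquire a slot
		wg.Add(1)
		go func(p string) {
			defer wg.Done()
			defer func() { <-semaphore }() // release the slot
			if err := dialPeer(ctx, p); err != nil {
				fmt.Printf("dial to %s failed: %v\n", p, err)
				return
			}
			fmt.Printf("dialed %s\n", p)
		}(p)
	}
	wg.Wait()
}

The buffered-channel semaphore keeps the concurrency limit a one-line tweak with no worker-pool machinery, and the shared context guarantees stragglers are abandoned once the group timeout elapses.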