From 750a5322fa656e9952ce030c6adcb282eee646c8 Mon Sep 17 00:00:00 2001 From: Adin Schmahmann Date: Wed, 11 Dec 2019 17:25:45 -0500 Subject: [PATCH] feat(dht): provider records use multihashes instead of CIDs --- dht.go | 7 +- dht_test.go | 33 +++++++-- go.mod | 1 + handlers.go | 23 ++++--- providers/providers.go | 33 ++++----- providers/providers_test.go | 132 +++++++++++++++++++----------------- routing.go | 28 +++++--- 7 files changed, 146 insertions(+), 111 deletions(-) diff --git a/dht.go b/dht.go index 2ee2894eb..20186e687 100644 --- a/dht.go +++ b/dht.go @@ -374,11 +374,12 @@ func (dht *IpfsDHT) findPeerSingle(ctx context.Context, p peer.ID, id peer.ID) ( } } -func (dht *IpfsDHT) findProvidersSingle(ctx context.Context, p peer.ID, key cid.Cid) (*pb.Message, error) { - eip := logger.EventBegin(ctx, "findProvidersSingle", p, key) +func (dht *IpfsDHT) findProvidersSingle(ctx context.Context, p peer.ID, keyCid cid.Cid) (*pb.Message, error) { + eip := logger.EventBegin(ctx, "findProvidersSingle", p, keyCid) defer eip.Done() - pmes := pb.NewMessage(pb.Message_GET_PROVIDERS, key.Bytes(), 0) + key := keyCid.Hash() + pmes := pb.NewMessage(pb.Message_GET_PROVIDERS, key, 0) resp, err := dht.sendRequest(ctx, p, pmes) switch err { case nil: diff --git a/dht_test.go b/dht_test.go index 8458e948c..21c8b8be6 100644 --- a/dht_test.go +++ b/dht_test.go @@ -3,6 +3,7 @@ package dht import ( "bytes" "context" + "encoding/binary" "errors" "fmt" "math/rand" @@ -15,6 +16,7 @@ import ( "github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/peerstore" "github.com/libp2p/go-libp2p-core/routing" + "github.com/multiformats/go-multihash" "github.com/multiformats/go-multistream" "golang.org/x/xerrors" @@ -42,8 +44,26 @@ func init() { for i := 0; i < 100; i++ { v := fmt.Sprintf("%d -- value", i) - mhv := u.Hash([]byte(v)) - testCaseCids = append(testCaseCids, cid.NewCidV0(mhv)) + var newCid cid.Cid + switch i % 3 { + case 0: + mhv := u.Hash([]byte(v)) + newCid = cid.NewCidV0(mhv) + case 1: + mhv := u.Hash([]byte(v)) + newCid = cid.NewCidV1(cid.DagCBOR, mhv) + case 2: + rawMh := make([]byte, 12) + binary.PutUvarint(rawMh, cid.Raw) + binary.PutUvarint(rawMh[1:], 10) + copy(rawMh[2:], []byte(v)[:10]) + _, mhv, err := multihash.MHFromBytes(rawMh) + if err != nil { + panic(err) + } + newCid = cid.NewCidV1(cid.Raw, mhv) + } + testCaseCids = append(testCaseCids, newCid) } } @@ -593,7 +613,7 @@ func TestLocalProvides(t *testing.T) { for _, c := range testCaseCids { for i := 0; i < 3; i++ { - provs := dhts[i].providers.GetProviders(ctx, c) + provs := dhts[i].providers.GetProviders(ctx, c.Hash()) if len(provs) > 0 { t.Fatal("shouldnt know this") } @@ -1330,7 +1350,7 @@ func TestClientModeConnect(t *testing.T) { c := testCaseCids[0] p := peer.ID("TestPeer") - a.providers.AddProvider(ctx, c, p) + a.providers.AddProvider(ctx, c.Hash(), p) time.Sleep(time.Millisecond * 5) // just in case... provs, err := b.FindProviders(ctx, c) @@ -1504,6 +1524,7 @@ func TestFindClosestPeers(t *testing.T) { func TestProvideDisabled(t *testing.T) { k := testCaseCids[0] + kHash := k.Hash() for i := 0; i < 3; i++ { enabledA := (i & 0x1) > 0 enabledB := (i & 0x2) > 0 @@ -1544,7 +1565,7 @@ func TestProvideDisabled(t *testing.T) { if err != routing.ErrNotSupported { t.Fatal("get should have failed on node B") } - provs := dhtB.providers.GetProviders(ctx, k) + provs := dhtB.providers.GetProviders(ctx, kHash) if len(provs) != 0 { t.Fatal("node B should not have found local providers") } @@ -1560,7 +1581,7 @@ func TestProvideDisabled(t *testing.T) { t.Fatal("node A should not have found providers") } } - provAddrs := dhtA.providers.GetProviders(ctx, k) + provAddrs := dhtA.providers.GetProviders(ctx, kHash) if len(provAddrs) != 0 { t.Fatal("node A should not have found local providers") } diff --git a/go.mod b/go.mod index a2adca197..4990f1f1b 100644 --- a/go.mod +++ b/go.mod @@ -25,6 +25,7 @@ require ( github.com/multiformats/go-base32 v0.0.3 github.com/multiformats/go-multiaddr v0.2.0 github.com/multiformats/go-multiaddr-dns v0.2.0 + github.com/multiformats/go-multihash v0.0.10 github.com/multiformats/go-multistream v0.1.0 github.com/stretchr/testify v1.4.0 go.opencensus.io v0.22.2 diff --git a/handlers.go b/handlers.go index 23292ce80..918d881cf 100644 --- a/handlers.go +++ b/handlers.go @@ -12,8 +12,9 @@ import ( "github.com/libp2p/go-libp2p-core/peerstore" pstore "github.com/libp2p/go-libp2p-peerstore" + mh "github.com/multiformats/go-multihash" + "github.com/gogo/protobuf/proto" - "github.com/ipfs/go-cid" ds "github.com/ipfs/go-datastore" u "github.com/ipfs/go-ipfs-util" pb "github.com/libp2p/go-libp2p-kad-dht/pb" @@ -317,26 +318,26 @@ func (dht *IpfsDHT) handleGetProviders(ctx context.Context, p peer.ID, pmes *pb. logger.SetTag(ctx, "peer", p) resp := pb.NewMessage(pmes.GetType(), pmes.GetKey(), pmes.GetClusterLevel()) - c, err := cid.Cast([]byte(pmes.GetKey())) + _, h, err := mh.MHFromBytes(pmes.GetKey()) if err != nil { return nil, err } - logger.SetTag(ctx, "key", c) + logger.SetTag(ctx, "key", h) // debug logging niceness. - reqDesc := fmt.Sprintf("%s handleGetProviders(%s, %s): ", dht.self, p, c) + reqDesc := fmt.Sprintf("%s handleGetProviders(%s, %s): ", dht.self, p, h) logger.Debugf("%s begin", reqDesc) defer logger.Debugf("%s end", reqDesc) // check if we have this value, to add ourselves as provider. - has, err := dht.datastore.Has(convertToDsKey(c.Bytes())) + has, err := dht.datastore.Has(convertToDsKey(h)) if err != nil && err != ds.ErrNotFound { logger.Debugf("unexpected datastore error: %v\n", err) has = false } // setup providers - providers := dht.providers.GetProviders(ctx, c) + providers := dht.providers.GetProviders(ctx, h) if has { providers = append(providers, dht.self) logger.Debugf("%s have the value. added self as provider", reqDesc) @@ -366,13 +367,13 @@ func (dht *IpfsDHT) handleAddProvider(ctx context.Context, p peer.ID, pmes *pb.M defer func() { logger.FinishWithErr(ctx, _err) }() logger.SetTag(ctx, "peer", p) - c, err := cid.Cast([]byte(pmes.GetKey())) + _, h, err := mh.MHFromBytes(pmes.GetKey()) if err != nil { return nil, err } - logger.SetTag(ctx, "key", c) + logger.SetTag(ctx, "key", h) - logger.Debugf("%s adding %s as a provider for '%s'\n", dht.self, p, c) + logger.Debugf("%s adding %s as a provider for '%s'\n", dht.self, p, h) // add provider should use the address given in the message pinfos := pb.PBPeersToPeerInfos(pmes.GetProviderPeers()) @@ -389,12 +390,12 @@ func (dht *IpfsDHT) handleAddProvider(ctx context.Context, p peer.ID, pmes *pb.M continue } - logger.Debugf("received provider %s for %s (addrs: %s)", p, c, pi.Addrs) + logger.Debugf("received provider %s for %s (addrs: %s)", p, h, pi.Addrs) if pi.ID != dht.self { // don't add own addrs. // add the received addresses to our peerstore. dht.peerstore.AddAddrs(pi.ID, pi.Addrs, peerstore.ProviderAddrTTL) } - dht.providers.AddProvider(ctx, c, p) + dht.providers.AddProvider(ctx, h, p) } return nil, nil diff --git a/providers/providers.go b/providers/providers.go index 2f98fc144..5f9a8fc93 100644 --- a/providers/providers.go +++ b/providers/providers.go @@ -9,8 +9,9 @@ import ( "github.com/libp2p/go-libp2p-core/peer" + mh "github.com/multiformats/go-multihash" + lru "github.com/hashicorp/golang-lru/simplelru" - cid "github.com/ipfs/go-cid" ds "github.com/ipfs/go-datastore" autobatch "github.com/ipfs/go-datastore/autobatch" dsq "github.com/ipfs/go-datastore/query" @@ -47,12 +48,12 @@ type providerSet struct { } type addProv struct { - k cid.Cid + k mh.Multihash val peer.ID } type getProv struct { - k cid.Cid + k mh.Multihash resp chan []peer.ID } @@ -76,15 +77,15 @@ func NewProviderManager(ctx context.Context, local peer.ID, dstore ds.Batching) const providersKeyPrefix = "/providers/" -func mkProvKey(k cid.Cid) string { - return providersKeyPrefix + base32.RawStdEncoding.EncodeToString(k.Bytes()) +func mkProvKey(k mh.Multihash) string { + return providersKeyPrefix + base32.RawStdEncoding.EncodeToString(k) } func (pm *ProviderManager) Process() goprocess.Process { return pm.proc } -func (pm *ProviderManager) providersForKey(k cid.Cid) ([]peer.ID, error) { +func (pm *ProviderManager) providersForKey(k mh.Multihash) ([]peer.ID, error) { pset, err := pm.getProvSet(k) if err != nil { return nil, err @@ -92,8 +93,8 @@ func (pm *ProviderManager) providersForKey(k cid.Cid) ([]peer.ID, error) { return pset.providers, nil } -func (pm *ProviderManager) getProvSet(k cid.Cid) (*providerSet, error) { - cached, ok := pm.providers.Get(k) +func (pm *ProviderManager) getProvSet(k mh.Multihash) (*providerSet, error) { + cached, ok := pm.providers.Get(string(k)) if ok { return cached.(*providerSet), nil } @@ -104,13 +105,13 @@ func (pm *ProviderManager) getProvSet(k cid.Cid) (*providerSet, error) { } if len(pset.providers) > 0 { - pm.providers.Add(k, pset) + pm.providers.Add(string(k), pset) } return pset, nil } -func loadProvSet(dstore ds.Datastore, k cid.Cid) (*providerSet, error) { +func loadProvSet(dstore ds.Datastore, k mh.Multihash) (*providerSet, error) { res, err := dstore.Query(dsq.Query{Prefix: mkProvKey(k)}) if err != nil { return nil, err @@ -174,20 +175,20 @@ func readTimeValue(data []byte) (time.Time, error) { return time.Unix(0, nsec), nil } -func (pm *ProviderManager) addProv(k cid.Cid, p peer.ID) error { +func (pm *ProviderManager) addProv(k mh.Multihash, p peer.ID) error { now := time.Now() - if provs, ok := pm.providers.Get(k); ok { + if provs, ok := pm.providers.Get(string(k)); ok { provs.(*providerSet).setVal(p, now) } // else not cached, just write through return writeProviderEntry(pm.dstore, k, p, now) } -func mkProvKeyFor(k cid.Cid, p peer.ID) string { +func mkProvKeyFor(k mh.Multihash, p peer.ID) string { return mkProvKey(k) + "/" + base32.RawStdEncoding.EncodeToString([]byte(p)) } -func writeProviderEntry(dstore ds.Datastore, k cid.Cid, p peer.ID, t time.Time) error { +func writeProviderEntry(dstore ds.Datastore, k mh.Multihash, p peer.ID, t time.Time) error { dsk := mkProvKeyFor(k, p) buf := make([]byte, 16) @@ -300,7 +301,7 @@ func (pm *ProviderManager) run(proc goprocess.Process) { } // AddProvider adds a provider. -func (pm *ProviderManager) AddProvider(ctx context.Context, k cid.Cid, val peer.ID) { +func (pm *ProviderManager) AddProvider(ctx context.Context, k mh.Multihash, val peer.ID) { prov := &addProv{ k: k, val: val, @@ -313,7 +314,7 @@ func (pm *ProviderManager) AddProvider(ctx context.Context, k cid.Cid, val peer. // GetProviders returns the set of providers for the given key. // This method _does not_ copy the set. Do not modify it. -func (pm *ProviderManager) GetProviders(ctx context.Context, k cid.Cid) []peer.ID { +func (pm *ProviderManager) GetProviders(ctx context.Context, k mh.Multihash) []peer.ID { gp := &getProv{ k: k, resp: make(chan []peer.ID, 1), // buffered to prevent sender from blocking diff --git a/providers/providers_test.go b/providers/providers_test.go index 58756c55c..1d0d094a8 100644 --- a/providers/providers_test.go +++ b/providers/providers_test.go @@ -10,7 +10,8 @@ import ( "github.com/libp2p/go-libp2p-core/peer" - cid "github.com/ipfs/go-cid" + mh "github.com/multiformats/go-multihash" + ds "github.com/ipfs/go-datastore" dsq "github.com/ipfs/go-datastore/query" dssync "github.com/ipfs/go-datastore/sync" @@ -26,7 +27,7 @@ func TestProviderManager(t *testing.T) { mid := peer.ID("testing") p := NewProviderManager(ctx, mid, dssync.MutexWrap(ds.NewMapDatastore())) - a := cid.NewCidV0(u.Hash([]byte("test"))) + a := u.Hash([]byte("test")) p.AddProvider(ctx, a, peer.ID("testingprovider")) // Not cached @@ -56,14 +57,14 @@ func TestProvidersDatastore(t *testing.T) { defer p.proc.Close() friend := peer.ID("friend") - var cids []cid.Cid + var mhs []mh.Multihash for i := 0; i < 100; i++ { - c := cid.NewCidV0(u.Hash([]byte(fmt.Sprint(i)))) - cids = append(cids, c) - p.AddProvider(ctx, c, friend) + h := u.Hash([]byte(fmt.Sprint(i))) + mhs = append(mhs, h) + p.AddProvider(ctx, h, friend) } - for _, c := range cids { + for _, c := range mhs { resp := p.GetProviders(ctx, c) if len(resp) != 1 { t.Fatal("Could not retrieve provider.") @@ -77,7 +78,7 @@ func TestProvidersDatastore(t *testing.T) { func TestProvidersSerialization(t *testing.T) { dstore := dssync.MutexWrap(ds.NewMapDatastore()) - k := cid.NewCidV0(u.Hash(([]byte("my key!")))) + k := u.Hash(([]byte("my key!"))) p1 := peer.ID("peer one") p2 := peer.ID("peer two") pt1 := time.Now() @@ -135,26 +136,26 @@ func TestProvidesExpire(t *testing.T) { p := NewProviderManager(ctx, mid, ds) peers := []peer.ID{"a", "b"} - var cids []cid.Cid + var mhs []mh.Multihash for i := 0; i < 10; i++ { - c := cid.NewCidV0(u.Hash([]byte(fmt.Sprint(i)))) - cids = append(cids, c) + h := u.Hash([]byte(fmt.Sprint(i))) + mhs = append(mhs, h) } - for _, c := range cids[:5] { - p.AddProvider(ctx, c, peers[0]) - p.AddProvider(ctx, c, peers[1]) + for _, h := range mhs[:5] { + p.AddProvider(ctx, h, peers[0]) + p.AddProvider(ctx, h, peers[1]) } time.Sleep(time.Second / 4) - for _, c := range cids[5:] { - p.AddProvider(ctx, c, peers[0]) - p.AddProvider(ctx, c, peers[1]) + for _, h := range mhs[5:] { + p.AddProvider(ctx, h, peers[0]) + p.AddProvider(ctx, h, peers[1]) } - for _, c := range cids { - out := p.GetProviders(ctx, c) + for _, h := range mhs { + out := p.GetProviders(ctx, h) if len(out) != 2 { t.Fatal("expected providers to still be there") } @@ -162,15 +163,15 @@ func TestProvidesExpire(t *testing.T) { time.Sleep(3 * time.Second / 8) - for _, c := range cids[:5] { - out := p.GetProviders(ctx, c) + for _, h := range mhs[:5] { + out := p.GetProviders(ctx, h) if len(out) > 0 { t.Fatal("expected providers to be cleaned up, got: ", out) } } - for _, c := range cids[5:] { - out := p.GetProviders(ctx, c) + for _, h := range mhs[5:] { + out := p.GetProviders(ctx, h) if len(out) != 2 { t.Fatal("expected providers to still be there") } @@ -201,30 +202,34 @@ func TestProvidesExpire(t *testing.T) { var _ = ioutil.NopCloser var _ = os.DevNull -/* This can be used for profiling. Keeping it commented out for now to avoid incurring extra CI time +// TestLargeProvidersSet can be used for profiling. +// The datastore can be switched to levelDB by uncommenting the section below and the import above func TestLargeProvidersSet(t *testing.T) { + t.Skip("This can be used for profiling. Skipping it for now to avoid incurring extra CI time") old := lruCacheSize lruCacheSize = 10 defer func() { lruCacheSize = old }() - dirn, err := ioutil.TempDir("", "provtest") - if err != nil { - t.Fatal(err) - } - - opts := &lds.Options{ - NoSync: true, - Compression: 1, - } - lds, err := lds.NewDatastore(dirn, opts) - if err != nil { - t.Fatal(err) - } - _ = lds + dstore := ds.NewMapDatastore() - defer func() { - os.RemoveAll(dirn) - }() + //dirn, err := ioutil.TempDir("", "provtest") + //if err != nil { + // t.Fatal(err) + //} + // + //opts := &lds.Options{ + // NoSync: true, + // Compression: 1, + //} + //lds, err := lds.NewDatastore(dirn, opts) + //if err != nil { + // t.Fatal(err) + //} + //dstore = lds + // + //defer func() { + // os.RemoveAll(dirn) + //}() ctx := context.Background() var peers []peer.ID @@ -233,28 +238,27 @@ func TestLargeProvidersSet(t *testing.T) { } mid := peer.ID("myself") - p := NewProviderManager(ctx, mid, lds) + p := NewProviderManager(ctx, mid, dstore) defer p.proc.Close() - var cids []cid.Cid + var mhs []mh.Multihash for i := 0; i < 1000; i++ { - c := cid.NewCidV0(u.Hash([]byte(fmt.Sprint(i)))) - cids = append(cids, c) + h := u.Hash([]byte(fmt.Sprint(i))) + mhs = append(mhs, h) for _, pid := range peers { - p.AddProvider(ctx, c, pid) + p.AddProvider(ctx, h, pid) } } for i := 0; i < 5; i++ { start := time.Now() - for _, c := range cids { - _ = p.GetProviders(ctx, c) + for _, h := range mhs { + _ = p.GetProviders(ctx, h) } elapsed := time.Since(start) fmt.Printf("query %f ms\n", elapsed.Seconds()*1000) } } -*/ func TestUponCacheMissProvidersAreReadFromDatastore(t *testing.T) { old := lruCacheSize @@ -264,20 +268,20 @@ func TestUponCacheMissProvidersAreReadFromDatastore(t *testing.T) { defer cancel() p1, p2 := peer.ID("a"), peer.ID("b") - c1 := cid.NewCidV1(cid.DagCBOR, u.Hash([]byte("1"))) - c2 := cid.NewCidV1(cid.DagCBOR, u.Hash([]byte("2"))) + h1 := u.Hash([]byte("1")) + h2 := u.Hash([]byte("2")) pm := NewProviderManager(ctx, p1, dssync.MutexWrap(ds.NewMapDatastore())) // add provider - pm.AddProvider(ctx, c1, p1) - // make the cached provider for c1 go to datastore - pm.AddProvider(ctx, c2, p1) + pm.AddProvider(ctx, h1, p1) + // make the cached provider for h1 go to datastore + pm.AddProvider(ctx, h2, p1) // now just offloaded record should be brought back and joined with p2 - pm.AddProvider(ctx, c1, p2) + pm.AddProvider(ctx, h1, p2) - c1Provs := pm.GetProviders(ctx, c1) - if len(c1Provs) != 2 { - t.Fatalf("expected c1 to be provided by 2 peers, is by %d", len(c1Provs)) + h1Provs := pm.GetProviders(ctx, h1) + if len(h1Provs) != 2 { + t.Fatalf("expected h1 to be provided by 2 peers, is by %d", len(h1Provs)) } } @@ -286,18 +290,18 @@ func TestWriteUpdatesCache(t *testing.T) { defer cancel() p1, p2 := peer.ID("a"), peer.ID("b") - c1 := cid.NewCidV1(cid.DagCBOR, u.Hash([]byte("1"))) + h1 := u.Hash([]byte("1")) pm := NewProviderManager(ctx, p1, dssync.MutexWrap(ds.NewMapDatastore())) // add provider - pm.AddProvider(ctx, c1, p1) + pm.AddProvider(ctx, h1, p1) // force into the cache - pm.GetProviders(ctx, c1) + pm.GetProviders(ctx, h1) // add a second provider - pm.AddProvider(ctx, c1, p2) + pm.AddProvider(ctx, h1, p2) - c1Provs := pm.GetProviders(ctx, c1) + c1Provs := pm.GetProviders(ctx, h1) if len(c1Provs) != 2 { - t.Fatalf("expected c1 to be provided by 2 peers, is by %d", len(c1Provs)) + t.Fatalf("expected h1 to be provided by 2 peers, is by %d", len(c1Provs)) } } diff --git a/routing.go b/routing.go index a796f3e6c..c2fc69a46 100644 --- a/routing.go +++ b/routing.go @@ -13,6 +13,8 @@ import ( "github.com/libp2p/go-libp2p-core/peerstore" "github.com/libp2p/go-libp2p-core/routing" + mh "github.com/multiformats/go-multihash" + "github.com/ipfs/go-cid" u "github.com/ipfs/go-ipfs-util" logging "github.com/ipfs/go-log" @@ -412,11 +414,12 @@ func (dht *IpfsDHT) getValues(ctx context.Context, key string, nvals int) (<-cha // locations of the value, similarly to Coral and Mainline DHT. // Provide makes this node announce that it can provide a value for the given key -func (dht *IpfsDHT) Provide(ctx context.Context, key cid.Cid, brdcst bool) (err error) { +func (dht *IpfsDHT) Provide(ctx context.Context, keyCid cid.Cid, brdcst bool) (err error) { if !dht.enableProviders { return routing.ErrNotSupported } - eip := logger.EventBegin(ctx, "Provide", key, logging.LoggableMap{"broadcast": brdcst}) + eip := logger.EventBegin(ctx, "Provide", keyCid, logging.LoggableMap{"broadcast": brdcst}) + key := keyCid.Hash() defer func() { if err != nil { eip.SetError(err) @@ -451,7 +454,7 @@ func (dht *IpfsDHT) Provide(ctx context.Context, key cid.Cid, brdcst bool) (err defer cancel() } - peers, err := dht.GetClosestPeers(closerCtx, key.KeyString()) + peers, err := dht.GetClosestPeers(closerCtx, string(key)) if err != nil { return err } @@ -476,7 +479,7 @@ func (dht *IpfsDHT) Provide(ctx context.Context, key cid.Cid, brdcst bool) (err wg.Wait() return nil } -func (dht *IpfsDHT) makeProvRecord(skey cid.Cid) (*pb.Message, error) { +func (dht *IpfsDHT) makeProvRecord(key mh.Multihash) (*pb.Message, error) { pi := peer.AddrInfo{ ID: dht.self, Addrs: dht.host.Addrs(), @@ -488,7 +491,7 @@ func (dht *IpfsDHT) makeProvRecord(skey cid.Cid) (*pb.Message, error) { return nil, fmt.Errorf("no known addresses for self. cannot put provider.") } - pmes := pb.NewMessage(pb.Message_ADD_PROVIDER, skey.Bytes(), 0) + pmes := pb.NewMessage(pb.Message_ADD_PROVIDER, key, 0) pmes.ProviderPeers = pb.RawPeerInfosToPBPeers([]peer.AddrInfo{pi}) return pmes, nil } @@ -521,10 +524,12 @@ func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key cid.Cid, count i return peerOut } -func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key cid.Cid, count int, peerOut chan peer.AddrInfo) { - defer logger.EventBegin(ctx, "findProvidersAsync", key).Done() +func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, keyCid cid.Cid, count int, peerOut chan peer.AddrInfo) { + defer logger.EventBegin(ctx, "findProvidersAsync", keyCid).Done() defer close(peerOut) + key := keyCid.Hash() + ps := peer.NewLimitedSet(count) provs := dht.providers.GetProviders(ctx, key) for _, p := range provs { @@ -545,7 +550,7 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key cid.Cid, } } - peers := dht.routingTable.NearestPeers(kb.ConvertKey(key.KeyString()), AlphaValue) + peers := dht.routingTable.NearestPeers(kb.ConvertKey(string(key)), AlphaValue) if len(peers) == 0 { routing.PublishQueryEvent(ctx, &routing.QueryEvent{ Type: routing.QueryError, @@ -556,12 +561,12 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key cid.Cid, // setup the Query parent := ctx - query := dht.newQuery(key.KeyString(), func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) { + query := dht.newQuery(string(key), func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) { routing.PublishQueryEvent(parent, &routing.QueryEvent{ Type: routing.SendingQuery, ID: p, }) - pmes, err := dht.findProvidersSingle(ctx, p, key) + pmes, err := dht.findProvidersSingle(ctx, p, keyCid) if err != nil { return nil, err } @@ -624,7 +629,8 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key cid.Cid, } // refresh the k-bucket containing this key after the query is run - dht.routingTable.BucketForID(kb.ConvertKey(key.KeyString())).ResetRefreshedAt(time.Now()) + dht.routingTable.BucketForID(kb.ConvertKey(string(key))).ResetRefreshedAt(time.Now()) + } // FindPeer searches for a peer with given ID.