From 4a4348057c32d535fd1b3ff1aa455ac0b391f150 Mon Sep 17 00:00:00 2001 From: Adin Schmahmann Date: Thu, 12 Dec 2019 16:02:22 -0500 Subject: [PATCH] provider record keys can be an arbitrary byte array of at most 80 bytes instead of only a multihash --- dht.go | 10 +++++----- handlers.go | 30 ++++++++++++++---------------- providers/providers.go | 24 +++++++++++------------- routing.go | 35 ++++++++++++++++------------------- 4 files changed, 46 insertions(+), 53 deletions(-) diff --git a/dht.go b/dht.go index 20186e687..7ab11c554 100644 --- a/dht.go +++ b/dht.go @@ -374,18 +374,18 @@ func (dht *IpfsDHT) findPeerSingle(ctx context.Context, p peer.ID, id peer.ID) ( } } -func (dht *IpfsDHT) findProvidersSingle(ctx context.Context, p peer.ID, keyCid cid.Cid) (*pb.Message, error) { - eip := logger.EventBegin(ctx, "findProvidersSingle", p, keyCid) +func (dht *IpfsDHT) findProvidersSingle(ctx context.Context, p peer.ID, key cid.Cid) (*pb.Message, error) { + eip := logger.EventBegin(ctx, "findProvidersSingle", p, key) defer eip.Done() - key := keyCid.Hash() - pmes := pb.NewMessage(pb.Message_GET_PROVIDERS, key, 0) + keyMH := key.Hash() + pmes := pb.NewMessage(pb.Message_GET_PROVIDERS, keyMH, 0) resp, err := dht.sendRequest(ctx, p, pmes) switch err { case nil: return resp, nil case ErrReadTimeout: - logger.Warningf("read timeout: %s %s", p.Pretty(), key) + logger.Warningf("read timeout: %s %s", p.Pretty(), keyMH) fallthrough default: eip.SetError(err) diff --git a/handlers.go b/handlers.go index 918d881cf..fec98c51e 100644 --- a/handlers.go +++ b/handlers.go @@ -12,8 +12,6 @@ import ( "github.com/libp2p/go-libp2p-core/peerstore" pstore "github.com/libp2p/go-libp2p-peerstore" - mh "github.com/multiformats/go-multihash" - "github.com/gogo/protobuf/proto" ds "github.com/ipfs/go-datastore" u "github.com/ipfs/go-ipfs-util" @@ -318,26 +316,26 @@ func (dht *IpfsDHT) handleGetProviders(ctx context.Context, p peer.ID, pmes *pb.
logger.SetTag(ctx, "peer", p) resp := pb.NewMessage(pmes.GetType(), pmes.GetKey(), pmes.GetClusterLevel()) - _, h, err := mh.MHFromBytes(pmes.GetKey()) - if err != nil { - return nil, err + key := pmes.GetKey() + if len(key) > 80 { + return nil, fmt.Errorf("handleGetProviders key size too large") } - logger.SetTag(ctx, "key", h) + logger.SetTag(ctx, "key", key) // debug logging niceness. - reqDesc := fmt.Sprintf("%s handleGetProviders(%s, %s): ", dht.self, p, h) + reqDesc := fmt.Sprintf("%s handleGetProviders(%s, %s): ", dht.self, p, key) logger.Debugf("%s begin", reqDesc) defer logger.Debugf("%s end", reqDesc) // check if we have this value, to add ourselves as provider. - has, err := dht.datastore.Has(convertToDsKey(h)) + has, err := dht.datastore.Has(convertToDsKey(key)) if err != nil && err != ds.ErrNotFound { logger.Debugf("unexpected datastore error: %v\n", err) has = false } // setup providers - providers := dht.providers.GetProviders(ctx, h) + providers := dht.providers.GetProviders(ctx, key) if has { providers = append(providers, dht.self) logger.Debugf("%s have the value. 
added self as provider", reqDesc) @@ -367,13 +365,13 @@ func (dht *IpfsDHT) handleAddProvider(ctx context.Context, p peer.ID, pmes *pb.M defer func() { logger.FinishWithErr(ctx, _err) }() logger.SetTag(ctx, "peer", p) - _, h, err := mh.MHFromBytes(pmes.GetKey()) - if err != nil { - return nil, err + key := pmes.GetKey() + if len(key) > 80 { + return nil, fmt.Errorf("handleAddProviders key size too large") } - logger.SetTag(ctx, "key", h) + logger.SetTag(ctx, "key", key) - logger.Debugf("%s adding %s as a provider for '%s'\n", dht.self, p, h) + logger.Debugf("%s adding %s as a provider for '%s'\n", dht.self, p, key) // add provider should use the address given in the message pinfos := pb.PBPeersToPeerInfos(pmes.GetProviderPeers()) @@ -390,12 +388,12 @@ func (dht *IpfsDHT) handleAddProvider(ctx context.Context, p peer.ID, pmes *pb.M continue } - logger.Debugf("received provider %s for %s (addrs: %s)", p, h, pi.Addrs) + logger.Debugf("received provider %s for %s (addrs: %s)", p, key, pi.Addrs) if pi.ID != dht.self { // don't add own addrs. // add the received addresses to our peerstore. 
dht.peerstore.AddAddrs(pi.ID, pi.Addrs, peerstore.ProviderAddrTTL) } - dht.providers.AddProvider(ctx, h, p) + dht.providers.AddProvider(ctx, key, p) } return nil, nil diff --git a/providers/providers.go b/providers/providers.go index 5f9a8fc93..a446f87be 100644 --- a/providers/providers.go +++ b/providers/providers.go @@ -9,8 +9,6 @@ import ( "github.com/libp2p/go-libp2p-core/peer" - mh "github.com/multiformats/go-multihash" - lru "github.com/hashicorp/golang-lru/simplelru" ds "github.com/ipfs/go-datastore" autobatch "github.com/ipfs/go-datastore/autobatch" @@ -48,12 +46,12 @@ type providerSet struct { } type addProv struct { - k mh.Multihash + k []byte val peer.ID } type getProv struct { - k mh.Multihash + k []byte resp chan []peer.ID } @@ -77,7 +75,7 @@ func NewProviderManager(ctx context.Context, local peer.ID, dstore ds.Batching) const providersKeyPrefix = "/providers/" -func mkProvKey(k mh.Multihash) string { +func mkProvKey(k []byte) string { return providersKeyPrefix + base32.RawStdEncoding.EncodeToString(k) } @@ -85,7 +83,7 @@ func (pm *ProviderManager) Process() goprocess.Process { return pm.proc } -func (pm *ProviderManager) providersForKey(k mh.Multihash) ([]peer.ID, error) { +func (pm *ProviderManager) providersForKey(k []byte) ([]peer.ID, error) { pset, err := pm.getProvSet(k) if err != nil { return nil, err @@ -93,7 +91,7 @@ func (pm *ProviderManager) providersForKey(k mh.Multihash) ([]peer.ID, error) { return pset.providers, nil } -func (pm *ProviderManager) getProvSet(k mh.Multihash) (*providerSet, error) { +func (pm *ProviderManager) getProvSet(k []byte) (*providerSet, error) { cached, ok := pm.providers.Get(string(k)) if ok { return cached.(*providerSet), nil @@ -111,7 +109,7 @@ func (pm *ProviderManager) getProvSet(k mh.Multihash) (*providerSet, error) { return pset, nil } -func loadProvSet(dstore ds.Datastore, k mh.Multihash) (*providerSet, error) { +func loadProvSet(dstore ds.Datastore, k []byte) (*providerSet, error) { res, err := 
dstore.Query(dsq.Query{Prefix: mkProvKey(k)}) if err != nil { return nil, err @@ -175,7 +173,7 @@ func readTimeValue(data []byte) (time.Time, error) { return time.Unix(0, nsec), nil } -func (pm *ProviderManager) addProv(k mh.Multihash, p peer.ID) error { +func (pm *ProviderManager) addProv(k []byte, p peer.ID) error { now := time.Now() if provs, ok := pm.providers.Get(string(k)); ok { provs.(*providerSet).setVal(p, now) @@ -184,11 +182,11 @@ func (pm *ProviderManager) addProv(k mh.Multihash, p peer.ID) error { return writeProviderEntry(pm.dstore, k, p, now) } -func mkProvKeyFor(k mh.Multihash, p peer.ID) string { +func mkProvKeyFor(k []byte, p peer.ID) string { return mkProvKey(k) + "/" + base32.RawStdEncoding.EncodeToString([]byte(p)) } -func writeProviderEntry(dstore ds.Datastore, k mh.Multihash, p peer.ID, t time.Time) error { +func writeProviderEntry(dstore ds.Datastore, k []byte, p peer.ID, t time.Time) error { dsk := mkProvKeyFor(k, p) buf := make([]byte, 16) @@ -301,7 +299,7 @@ func (pm *ProviderManager) run(proc goprocess.Process) { } // AddProvider adds a provider. -func (pm *ProviderManager) AddProvider(ctx context.Context, k mh.Multihash, val peer.ID) { +func (pm *ProviderManager) AddProvider(ctx context.Context, k []byte, val peer.ID) { prov := &addProv{ k: k, val: val, @@ -314,7 +312,7 @@ func (pm *ProviderManager) AddProvider(ctx context.Context, k mh.Multihash, val // GetProviders returns the set of providers for the given key. // This method _does not_ copy the set. Do not modify it. 
-func (pm *ProviderManager) GetProviders(ctx context.Context, k mh.Multihash) []peer.ID { +func (pm *ProviderManager) GetProviders(ctx context.Context, k []byte) []peer.ID { gp := &getProv{ k: k, resp: make(chan []peer.ID, 1), // buffered to prevent sender from blocking diff --git a/routing.go b/routing.go index c2fc69a46..aba249e8a 100644 --- a/routing.go +++ b/routing.go @@ -13,8 +13,6 @@ import ( "github.com/libp2p/go-libp2p-core/peerstore" "github.com/libp2p/go-libp2p-core/routing" - mh "github.com/multiformats/go-multihash" - "github.com/ipfs/go-cid" u "github.com/ipfs/go-ipfs-util" logging "github.com/ipfs/go-log" @@ -414,12 +412,12 @@ func (dht *IpfsDHT) getValues(ctx context.Context, key string, nvals int) (<-cha // locations of the value, similarly to Coral and Mainline DHT. // Provide makes this node announce that it can provide a value for the given key -func (dht *IpfsDHT) Provide(ctx context.Context, keyCid cid.Cid, brdcst bool) (err error) { +func (dht *IpfsDHT) Provide(ctx context.Context, key cid.Cid, brdcst bool) (err error) { if !dht.enableProviders { return routing.ErrNotSupported } - eip := logger.EventBegin(ctx, "Provide", keyCid, logging.LoggableMap{"broadcast": brdcst}) - key := keyCid.Hash() + eip := logger.EventBegin(ctx, "Provide", key, logging.LoggableMap{"broadcast": brdcst}) + keyMH := key.Hash() defer func() { if err != nil { eip.SetError(err) @@ -428,7 +426,7 @@ func (dht *IpfsDHT) Provide(ctx context.Context, keyCid cid.Cid, brdcst bool) (e }() // add self locally - dht.providers.AddProvider(ctx, key, dht.self) + dht.providers.AddProvider(ctx, keyMH, dht.self) if !brdcst { return nil } @@ -454,12 +452,12 @@ func (dht *IpfsDHT) Provide(ctx context.Context, keyCid cid.Cid, brdcst bool) (e defer cancel() } - peers, err := dht.GetClosestPeers(closerCtx, string(key)) + peers, err := dht.GetClosestPeers(closerCtx, string(keyMH)) if err != nil { return err } - mes, err := dht.makeProvRecord(key) + mes, err := dht.makeProvRecord(keyMH) if 
err != nil { return err } @@ -469,7 +467,7 @@ func (dht *IpfsDHT) Provide(ctx context.Context, keyCid cid.Cid, brdcst bool) (e wg.Add(1) go func(p peer.ID) { defer wg.Done() - logger.Debugf("putProvider(%s, %s)", key, p) + logger.Debugf("putProvider(%s, %s)", keyMH, p) err := dht.sendMessage(ctx, p, mes) if err != nil { logger.Debug(err) @@ -479,7 +477,7 @@ func (dht *IpfsDHT) Provide(ctx context.Context, keyCid cid.Cid, brdcst bool) (e wg.Wait() return nil } -func (dht *IpfsDHT) makeProvRecord(key mh.Multihash) (*pb.Message, error) { +func (dht *IpfsDHT) makeProvRecord(key []byte) (*pb.Message, error) { pi := peer.AddrInfo{ ID: dht.self, Addrs: dht.host.Addrs(), @@ -524,14 +522,13 @@ func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key cid.Cid, count i return peerOut } -func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, keyCid cid.Cid, count int, peerOut chan peer.AddrInfo) { - defer logger.EventBegin(ctx, "findProvidersAsync", keyCid).Done() +func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key cid.Cid, count int, peerOut chan peer.AddrInfo) { + defer logger.EventBegin(ctx, "findProvidersAsync", key).Done() defer close(peerOut) - key := keyCid.Hash() - + keyMH := key.Hash() ps := peer.NewLimitedSet(count) - provs := dht.providers.GetProviders(ctx, key) + provs := dht.providers.GetProviders(ctx, keyMH) for _, p := range provs { // NOTE: Assuming that this list of peers is unique if ps.TryAdd(p) { @@ -550,7 +547,7 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, keyCid cid.Ci } } - peers := dht.routingTable.NearestPeers(kb.ConvertKey(string(key)), AlphaValue) + peers := dht.routingTable.NearestPeers(kb.ConvertKey(string(keyMH)), AlphaValue) if len(peers) == 0 { routing.PublishQueryEvent(ctx, &routing.QueryEvent{ Type: routing.QueryError, @@ -561,12 +558,12 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, keyCid cid.Ci // setup the Query parent := ctx - query := 
dht.newQuery(string(key), func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) { + query := dht.newQuery(string(keyMH), func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) { routing.PublishQueryEvent(parent, &routing.QueryEvent{ Type: routing.SendingQuery, ID: p, }) - pmes, err := dht.findProvidersSingle(ctx, p, keyCid) + pmes, err := dht.findProvidersSingle(ctx, p, key) if err != nil { return nil, err } @@ -629,7 +626,7 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, keyCid cid.Ci } // refresh the k-bucket containing this key after the query is run - dht.routingTable.BucketForID(kb.ConvertKey(string(key))).ResetRefreshedAt(time.Now()) + dht.routingTable.BucketForID(kb.ConvertKey(string(keyMH))).ResetRefreshedAt(time.Now()) }