From 5d2e3df37d2422e73c087be61d114ed6ce39c6ef Mon Sep 17 00:00:00 2001 From: Adin Schmahmann Date: Tue, 17 Dec 2019 17:25:11 -0500 Subject: [PATCH] make all key types loggable --- dht.go | 11 +++++------ go.sum | 6 ------ lookup.go | 18 +++++++++++++++--- lookup_test.go | 8 +++++++- routing.go | 21 +++++++++++---------- 5 files changed, 38 insertions(+), 26 deletions(-) diff --git a/dht.go b/dht.go index 7ab11c554..c2c3f9abd 100644 --- a/dht.go +++ b/dht.go @@ -24,7 +24,6 @@ import ( "github.com/libp2p/go-libp2p-kad-dht/providers" "github.com/gogo/protobuf/proto" - "github.com/ipfs/go-cid" ds "github.com/ipfs/go-datastore" logging "github.com/ipfs/go-log" "github.com/jbenet/goprocess" @@ -33,6 +32,7 @@ import ( record "github.com/libp2p/go-libp2p-record" recpb "github.com/libp2p/go-libp2p-record/pb" "github.com/multiformats/go-base32" + "github.com/multiformats/go-multihash" ) var logger = logging.Logger("dht") @@ -374,18 +374,17 @@ func (dht *IpfsDHT) findPeerSingle(ctx context.Context, p peer.ID, id peer.ID) ( } } -func (dht *IpfsDHT) findProvidersSingle(ctx context.Context, p peer.ID, key cid.Cid) (*pb.Message, error) { - eip := logger.EventBegin(ctx, "findProvidersSingle", p, key) +func (dht *IpfsDHT) findProvidersSingle(ctx context.Context, p peer.ID, key multihash.Multihash) (*pb.Message, error) { + eip := logger.EventBegin(ctx, "findProvidersSingle", p, multihashLoggableKey(key)) defer eip.Done() - keyMH := key.Hash() - pmes := pb.NewMessage(pb.Message_GET_PROVIDERS, keyMH, 0) + pmes := pb.NewMessage(pb.Message_GET_PROVIDERS, key, 0) resp, err := dht.sendRequest(ctx, p, pmes) switch err { case nil: return resp, nil case ErrReadTimeout: - logger.Warningf("read timeout: %s %s", p.Pretty(), keyMH) + logger.Warningf("read timeout: %s %s", p.Pretty(), key) fallthrough default: eip.SetError(err) diff --git a/go.sum b/go.sum index c2f35654f..d5492936c 100644 --- a/go.sum +++ b/go.sum @@ -163,8 +163,6 @@ github.com/libp2p/go-flow-metrics v0.0.2 h1:U5TvqfoyR6GVRM+bC15Ux1ltar1kbj6Zw6xO github.com/libp2p/go-flow-metrics v0.0.2/go.mod h1:HeoSNUrOJVK1jEpDqVEiUOIXqhbnS27omG0uWU5slZs= github.com/libp2p/go-flow-metrics v0.0.3 h1:8tAs/hSdNvUiLgtlSy3mxwxWP4I9y/jlkPFT7epKdeM= github.com/libp2p/go-flow-metrics v0.0.3/go.mod h1:HeoSNUrOJVK1jEpDqVEiUOIXqhbnS27omG0uWU5slZs= -github.com/libp2p/go-libp2p v0.4.2 h1:p0cthB0jDNHO4gH2HzS8/nAMMXbfUlFHs0jwZ4U+F2g= -github.com/libp2p/go-libp2p v0.4.2/go.mod h1:MNmgUxUw5pMsdOzMlT0EE7oKjRasl+WyVwM0IBlpKgQ= github.com/libp2p/go-libp2p v0.5.0 h1:/nnb5mc2TK6TwknECsWIkfCwMTHv0AXbvzxlnVivfeg= github.com/libp2p/go-libp2p v0.5.0/go.mod h1:Os7a5Z3B+ErF4v7zgIJ7nBHNu2LYt8ZMLkTQUB3G/wA= github.com/libp2p/go-libp2p-autonat v0.1.1 h1:WLBZcIRsjZlWdAZj9CiBSvU2wQXoUOiS1Zk1tM7DTJI= @@ -189,8 +187,6 @@ github.com/libp2p/go-libp2p-core v0.3.0 h1:F7PqduvrztDtFsAa/bcheQ3azmNo+Nq7m8hQY github.com/libp2p/go-libp2p-core v0.3.0/go.mod h1:ACp3DmS3/N64c2jDzcV429ukDpicbL6+TrrxANBjPGw= github.com/libp2p/go-libp2p-crypto v0.1.0 h1:k9MFy+o2zGDNGsaoZl0MA3iZ75qXxr9OOoAZF+sD5OQ= github.com/libp2p/go-libp2p-crypto v0.1.0/go.mod h1:sPUokVISZiy+nNuTTH/TY+leRSxnFj/2GLjtOTW90hI= -github.com/libp2p/go-libp2p-discovery v0.1.0 h1:j+R6cokKcGbnZLf4kcNwpx6mDEUPF3N6SrqMymQhmvs= -github.com/libp2p/go-libp2p-discovery v0.1.0/go.mod h1:4F/x+aldVHjHDHuX85x1zWoFTGElt8HnoDzwkFZm29g= github.com/libp2p/go-libp2p-discovery v0.2.0 h1:1p3YSOq7VsgaL+xVHPi8XAmtGyas6D2J6rWBEfz/aiY= github.com/libp2p/go-libp2p-discovery v0.2.0/go.mod h1:s4VGaxYMbw4+4+tsoQTqh7wfxg97AEdo4GYBt6BadWg= github.com/libp2p/go-libp2p-kbucket v0.2.3 h1:XtNfN4WUy0cfeJoJgWCf1lor4Pp3kBkFJ9vQ+Zs+VUM= @@ -268,8 +264,6 @@ github.com/libp2p/go-tcp-transport v0.1.0 h1:IGhowvEqyMFknOar4FWCKSWE0zL36UFKQti github.com/libp2p/go-tcp-transport v0.1.0/go.mod h1:oJ8I5VXryj493DEJ7OsBieu8fcg2nHGctwtInJVpipc= github.com/libp2p/go-tcp-transport v0.1.1 h1:yGlqURmqgNA2fvzjSgZNlHcsd/IulAnKM8Ncu+vlqnw= github.com/libp2p/go-tcp-transport v0.1.1/go.mod h1:3HzGvLbx6etZjnFlERyakbaYPdfjg2pWP97dFZworkY= -github.com/libp2p/go-ws-transport v0.1.2 h1:VnxQcLfSGtqupqPpBNu8fUiCv+IN1RJ2BcVqQEM+z8E= -github.com/libp2p/go-ws-transport v0.1.2/go.mod h1:dsh2Ld8F+XNmzpkaAijmg5Is+e9l6/1tK/6VFOdN69Y= github.com/libp2p/go-ws-transport v0.2.0 h1:MJCw2OrPA9+76YNRvdo1wMnSOxb9Bivj6sVFY1Xrj6w= github.com/libp2p/go-ws-transport v0.2.0/go.mod h1:9BHJz/4Q5A9ludYWKoGCFC5gUElzlHoKzu0yY9p/klM= github.com/libp2p/go-yamux v1.2.2 h1:s6J6o7+ajoQMjHe7BEnq+EynOj5D2EoG8CuQgL3F2vg= diff --git a/lookup.go b/lookup.go index ee1552360..48c23e205 100644 --- a/lookup.go +++ b/lookup.go @@ -13,6 +13,8 @@ import ( pb "github.com/libp2p/go-libp2p-kad-dht/pb" kb "github.com/libp2p/go-libp2p-kbucket" notif "github.com/libp2p/go-libp2p-routing/notifications" + "github.com/multiformats/go-base32" + "github.com/multiformats/go-multihash" ) func tryFormatLoggableKey(k string) (string, error) { @@ -33,11 +35,15 @@ func tryFormatLoggableKey(k string) (string, error) { cstr = k } + var encStr string c, err := cid.Cast([]byte(cstr)) - if err != nil { - return "", fmt.Errorf("loggableKey could not cast key to a CID: %x %v", k, err) + if err == nil { + encStr = c.String() + } else { + encStr = base32.RawStdEncoding.EncodeToString([]byte(cstr)) } - return fmt.Sprintf("/%s/%s", proto, c.String()), nil + + return fmt.Sprintf("/%s/%s", proto, encStr), nil } func loggableKey(k string) logging.LoggableMap { @@ -53,6 +59,12 @@ func loggableKey(k string) logging.LoggableMap { } } +func multihashLoggableKey(mh multihash.Multihash) logging.LoggableMap { + return logging.LoggableMap{ + "multihash": base32.RawStdEncoding.EncodeToString(mh), + } +} + // Kademlia 'node lookup' operation. Returns a channel of the K closest peers // to the given key func (dht *IpfsDHT) GetClosestPeers(ctx context.Context, key string) (<-chan peer.ID, error) { diff --git a/lookup_test.go b/lookup_test.go index 5dc5f29fa..6e29a8ea2 100644 --- a/lookup_test.go +++ b/lookup_test.go @@ -28,9 +28,15 @@ func TestLoggableKey(t *testing.T) { t.Error("expected cid to be formatted as a loggable key") } - for _, s := range []string{"bla bla", "/bla", "/bla/asdf", ""} { + for _, s := range []string{"/bla", ""} { if _, err := tryFormatLoggableKey(s); err == nil { t.Errorf("expected to fail formatting: %s", s) } } + + for _, s := range []string{"bla bla", "/bla/asdf"} { + if _, err := tryFormatLoggableKey(s); err != nil { + t.Errorf("expected to be formatable: %s", s) + } + } } diff --git a/routing.go b/routing.go index de8c265e0..69a6f35ef 100644 --- a/routing.go +++ b/routing.go @@ -19,6 +19,7 @@ import ( pb "github.com/libp2p/go-libp2p-kad-dht/pb" kb "github.com/libp2p/go-libp2p-kbucket" record "github.com/libp2p/go-libp2p-record" + "github.com/multiformats/go-multihash" ) // asyncQueryBuffer is the size of buffered channels in async queries. This @@ -415,8 +416,8 @@ func (dht *IpfsDHT) Provide(ctx context.Context, key cid.Cid, brdcst bool) (err if !dht.enableProviders { return routing.ErrNotSupported } - eip := logger.EventBegin(ctx, "Provide", key, logging.LoggableMap{"broadcast": brdcst}) keyMH := key.Hash() + eip := logger.EventBegin(ctx, "Provide", multihashLoggableKey(keyMH), logging.LoggableMap{"broadcast": brdcst}) defer func() { if err != nil { eip.SetError(err) @@ -515,19 +516,19 @@ func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key cid.Cid, count i return peerOut } - logger.Event(ctx, "findProviders", key) + keyMH := key.Hash() + logger.Event(ctx, "findProviders", multihashLoggableKey(keyMH)) - go dht.findProvidersAsyncRoutine(ctx, key, count, peerOut) + go dht.findProvidersAsyncRoutine(ctx, keyMH, count, peerOut) return peerOut } -func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key cid.Cid, count int, peerOut chan peer.AddrInfo) { - defer logger.EventBegin(ctx, "findProvidersAsync", key).Done() +func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key multihash.Multihash, count int, peerOut chan peer.AddrInfo) { + defer logger.EventBegin(ctx, "findProvidersAsync", multihashLoggableKey(key)).Done() defer close(peerOut) - keyMH := key.Hash() ps := peer.NewLimitedSet(count) - provs := dht.providers.GetProviders(ctx, keyMH) + provs := dht.providers.GetProviders(ctx, key) for _, p := range provs { // NOTE: Assuming that this list of peers is unique if ps.TryAdd(p) { @@ -546,7 +547,7 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key cid.Cid, } } - peers := dht.routingTable.NearestPeers(kb.ConvertKey(string(keyMH)), AlphaValue) + peers := dht.routingTable.NearestPeers(kb.ConvertKey(string(key)), AlphaValue) if len(peers) == 0 { routing.PublishQueryEvent(ctx, &routing.QueryEvent{ Type: routing.QueryError, @@ -557,7 +558,7 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key cid.Cid, // setup the Query parent := ctx - query := dht.newQuery(string(keyMH), func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) { + query := dht.newQuery(string(key), func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) { routing.PublishQueryEvent(parent, &routing.QueryEvent{ Type: routing.SendingQuery, ID: p, @@ -625,7 +626,7 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key cid.Cid, } // refresh the cpl for this key after the query is run - dht.routingTable.ResetCplRefreshedAtForID(kb.ConvertKey(string(keyMH)), time.Now()) + dht.routingTable.ResetCplRefreshedAtForID(kb.ConvertKey(string(key)), time.Now()) } // FindPeer searches for a peer with given ID.