Skip to content

Commit

Permalink
make all key types loggable
Browse files Browse the repository at this point in the history
  • Loading branch information
aschmahmann committed Dec 19, 2019
1 parent 3e24f35 commit 5d2e3df
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 26 deletions.
11 changes: 5 additions & 6 deletions dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"github.com/libp2p/go-libp2p-kad-dht/providers"

"github.com/gogo/protobuf/proto"
"github.com/ipfs/go-cid"
ds "github.com/ipfs/go-datastore"
logging "github.com/ipfs/go-log"
"github.com/jbenet/goprocess"
Expand All @@ -33,6 +32,7 @@ import (
record "github.com/libp2p/go-libp2p-record"
recpb "github.com/libp2p/go-libp2p-record/pb"
"github.com/multiformats/go-base32"
"github.com/multiformats/go-multihash"
)

var logger = logging.Logger("dht")
Expand Down Expand Up @@ -374,18 +374,17 @@ func (dht *IpfsDHT) findPeerSingle(ctx context.Context, p peer.ID, id peer.ID) (
}
}

func (dht *IpfsDHT) findProvidersSingle(ctx context.Context, p peer.ID, key cid.Cid) (*pb.Message, error) {
eip := logger.EventBegin(ctx, "findProvidersSingle", p, key)
func (dht *IpfsDHT) findProvidersSingle(ctx context.Context, p peer.ID, key multihash.Multihash) (*pb.Message, error) {
eip := logger.EventBegin(ctx, "findProvidersSingle", p, multihashLoggableKey(key))
defer eip.Done()

keyMH := key.Hash()
pmes := pb.NewMessage(pb.Message_GET_PROVIDERS, keyMH, 0)
pmes := pb.NewMessage(pb.Message_GET_PROVIDERS, key, 0)
resp, err := dht.sendRequest(ctx, p, pmes)
switch err {
case nil:
return resp, nil
case ErrReadTimeout:
logger.Warningf("read timeout: %s %s", p.Pretty(), keyMH)
logger.Warningf("read timeout: %s %s", p.Pretty(), key)
fallthrough
default:
eip.SetError(err)
Expand Down
6 changes: 0 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -163,8 +163,6 @@ github.com/libp2p/go-flow-metrics v0.0.2 h1:U5TvqfoyR6GVRM+bC15Ux1ltar1kbj6Zw6xO
github.com/libp2p/go-flow-metrics v0.0.2/go.mod h1:HeoSNUrOJVK1jEpDqVEiUOIXqhbnS27omG0uWU5slZs=
github.com/libp2p/go-flow-metrics v0.0.3 h1:8tAs/hSdNvUiLgtlSy3mxwxWP4I9y/jlkPFT7epKdeM=
github.com/libp2p/go-flow-metrics v0.0.3/go.mod h1:HeoSNUrOJVK1jEpDqVEiUOIXqhbnS27omG0uWU5slZs=
github.com/libp2p/go-libp2p v0.4.2 h1:p0cthB0jDNHO4gH2HzS8/nAMMXbfUlFHs0jwZ4U+F2g=
github.com/libp2p/go-libp2p v0.4.2/go.mod h1:MNmgUxUw5pMsdOzMlT0EE7oKjRasl+WyVwM0IBlpKgQ=
github.com/libp2p/go-libp2p v0.5.0 h1:/nnb5mc2TK6TwknECsWIkfCwMTHv0AXbvzxlnVivfeg=
github.com/libp2p/go-libp2p v0.5.0/go.mod h1:Os7a5Z3B+ErF4v7zgIJ7nBHNu2LYt8ZMLkTQUB3G/wA=
github.com/libp2p/go-libp2p-autonat v0.1.1 h1:WLBZcIRsjZlWdAZj9CiBSvU2wQXoUOiS1Zk1tM7DTJI=
Expand All @@ -189,8 +187,6 @@ github.com/libp2p/go-libp2p-core v0.3.0 h1:F7PqduvrztDtFsAa/bcheQ3azmNo+Nq7m8hQY
github.com/libp2p/go-libp2p-core v0.3.0/go.mod h1:ACp3DmS3/N64c2jDzcV429ukDpicbL6+TrrxANBjPGw=
github.com/libp2p/go-libp2p-crypto v0.1.0 h1:k9MFy+o2zGDNGsaoZl0MA3iZ75qXxr9OOoAZF+sD5OQ=
github.com/libp2p/go-libp2p-crypto v0.1.0/go.mod h1:sPUokVISZiy+nNuTTH/TY+leRSxnFj/2GLjtOTW90hI=
github.com/libp2p/go-libp2p-discovery v0.1.0 h1:j+R6cokKcGbnZLf4kcNwpx6mDEUPF3N6SrqMymQhmvs=
github.com/libp2p/go-libp2p-discovery v0.1.0/go.mod h1:4F/x+aldVHjHDHuX85x1zWoFTGElt8HnoDzwkFZm29g=
github.com/libp2p/go-libp2p-discovery v0.2.0 h1:1p3YSOq7VsgaL+xVHPi8XAmtGyas6D2J6rWBEfz/aiY=
github.com/libp2p/go-libp2p-discovery v0.2.0/go.mod h1:s4VGaxYMbw4+4+tsoQTqh7wfxg97AEdo4GYBt6BadWg=
github.com/libp2p/go-libp2p-kbucket v0.2.3 h1:XtNfN4WUy0cfeJoJgWCf1lor4Pp3kBkFJ9vQ+Zs+VUM=
Expand Down Expand Up @@ -268,8 +264,6 @@ github.com/libp2p/go-tcp-transport v0.1.0 h1:IGhowvEqyMFknOar4FWCKSWE0zL36UFKQti
github.com/libp2p/go-tcp-transport v0.1.0/go.mod h1:oJ8I5VXryj493DEJ7OsBieu8fcg2nHGctwtInJVpipc=
github.com/libp2p/go-tcp-transport v0.1.1 h1:yGlqURmqgNA2fvzjSgZNlHcsd/IulAnKM8Ncu+vlqnw=
github.com/libp2p/go-tcp-transport v0.1.1/go.mod h1:3HzGvLbx6etZjnFlERyakbaYPdfjg2pWP97dFZworkY=
github.com/libp2p/go-ws-transport v0.1.2 h1:VnxQcLfSGtqupqPpBNu8fUiCv+IN1RJ2BcVqQEM+z8E=
github.com/libp2p/go-ws-transport v0.1.2/go.mod h1:dsh2Ld8F+XNmzpkaAijmg5Is+e9l6/1tK/6VFOdN69Y=
github.com/libp2p/go-ws-transport v0.2.0 h1:MJCw2OrPA9+76YNRvdo1wMnSOxb9Bivj6sVFY1Xrj6w=
github.com/libp2p/go-ws-transport v0.2.0/go.mod h1:9BHJz/4Q5A9ludYWKoGCFC5gUElzlHoKzu0yY9p/klM=
github.com/libp2p/go-yamux v1.2.2 h1:s6J6o7+ajoQMjHe7BEnq+EynOj5D2EoG8CuQgL3F2vg=
Expand Down
18 changes: 15 additions & 3 deletions lookup.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
pb "github.com/libp2p/go-libp2p-kad-dht/pb"
kb "github.com/libp2p/go-libp2p-kbucket"
notif "github.com/libp2p/go-libp2p-routing/notifications"
"github.com/multiformats/go-base32"
"github.com/multiformats/go-multihash"
)

func tryFormatLoggableKey(k string) (string, error) {
Expand All @@ -33,11 +35,15 @@ func tryFormatLoggableKey(k string) (string, error) {
cstr = k
}

var encStr string
c, err := cid.Cast([]byte(cstr))
if err != nil {
return "", fmt.Errorf("loggableKey could not cast key to a CID: %x %v", k, err)
if err == nil {
encStr = c.String()
} else {
encStr = base32.RawStdEncoding.EncodeToString([]byte(cstr))
}
return fmt.Sprintf("/%s/%s", proto, c.String()), nil

return fmt.Sprintf("/%s/%s", proto, encStr), nil
}

func loggableKey(k string) logging.LoggableMap {
Expand All @@ -53,6 +59,12 @@ func loggableKey(k string) logging.LoggableMap {
}
}

func multihashLoggableKey(mh multihash.Multihash) logging.LoggableMap {
return logging.LoggableMap{
"multihash": base32.RawStdEncoding.EncodeToString(mh),
}
}

// Kademlia 'node lookup' operation. Returns a channel of the K closest peers
// to the given key
func (dht *IpfsDHT) GetClosestPeers(ctx context.Context, key string) (<-chan peer.ID, error) {
Expand Down
8 changes: 7 additions & 1 deletion lookup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,15 @@ func TestLoggableKey(t *testing.T) {
t.Error("expected cid to be formatted as a loggable key")
}

for _, s := range []string{"bla bla", "/bla", "/bla/asdf", ""} {
for _, s := range []string{"/bla", ""} {
if _, err := tryFormatLoggableKey(s); err == nil {
t.Errorf("expected to fail formatting: %s", s)
}
}

for _, s := range []string{"bla bla", "/bla/asdf"} {
if _, err := tryFormatLoggableKey(s); err != nil {
t.Errorf("expected to be formatable: %s", s)
}
}
}
21 changes: 11 additions & 10 deletions routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
pb "github.com/libp2p/go-libp2p-kad-dht/pb"
kb "github.com/libp2p/go-libp2p-kbucket"
record "github.com/libp2p/go-libp2p-record"
"github.com/multiformats/go-multihash"
)

// asyncQueryBuffer is the size of buffered channels in async queries. This
Expand Down Expand Up @@ -415,8 +416,8 @@ func (dht *IpfsDHT) Provide(ctx context.Context, key cid.Cid, brdcst bool) (err
if !dht.enableProviders {
return routing.ErrNotSupported
}
eip := logger.EventBegin(ctx, "Provide", key, logging.LoggableMap{"broadcast": brdcst})
keyMH := key.Hash()
eip := logger.EventBegin(ctx, "Provide", multihashLoggableKey(keyMH), logging.LoggableMap{"broadcast": brdcst})
defer func() {
if err != nil {
eip.SetError(err)
Expand Down Expand Up @@ -515,19 +516,19 @@ func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key cid.Cid, count i
return peerOut
}

logger.Event(ctx, "findProviders", key)
keyMH := key.Hash()
logger.Event(ctx, "findProviders", multihashLoggableKey(keyMH))

go dht.findProvidersAsyncRoutine(ctx, key, count, peerOut)
go dht.findProvidersAsyncRoutine(ctx, keyMH, count, peerOut)
return peerOut
}

func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key cid.Cid, count int, peerOut chan peer.AddrInfo) {
defer logger.EventBegin(ctx, "findProvidersAsync", key).Done()
func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key multihash.Multihash, count int, peerOut chan peer.AddrInfo) {
defer logger.EventBegin(ctx, "findProvidersAsync", multihashLoggableKey(key)).Done()
defer close(peerOut)

keyMH := key.Hash()
ps := peer.NewLimitedSet(count)
provs := dht.providers.GetProviders(ctx, keyMH)
provs := dht.providers.GetProviders(ctx, key)
for _, p := range provs {
// NOTE: Assuming that this list of peers is unique
if ps.TryAdd(p) {
Expand All @@ -546,7 +547,7 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key cid.Cid,
}
}

peers := dht.routingTable.NearestPeers(kb.ConvertKey(string(keyMH)), AlphaValue)
peers := dht.routingTable.NearestPeers(kb.ConvertKey(string(key)), AlphaValue)
if len(peers) == 0 {
routing.PublishQueryEvent(ctx, &routing.QueryEvent{
Type: routing.QueryError,
Expand All @@ -557,7 +558,7 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key cid.Cid,

// setup the Query
parent := ctx
query := dht.newQuery(string(keyMH), func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) {
query := dht.newQuery(string(key), func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) {
routing.PublishQueryEvent(parent, &routing.QueryEvent{
Type: routing.SendingQuery,
ID: p,
Expand Down Expand Up @@ -625,7 +626,7 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key cid.Cid,
}

// refresh the cpl for this key after the query is run
dht.routingTable.ResetCplRefreshedAtForID(kb.ConvertKey(string(keyMH)), time.Now())
dht.routingTable.ResetCplRefreshedAtForID(kb.ConvertKey(string(key)), time.Now())
}

// FindPeer searches for a peer with given ID.
Expand Down

0 comments on commit 5d2e3df

Please sign in to comment.