diff --git a/dht.go b/dht.go index c5676d39b..91302cc08 100644 --- a/dht.go +++ b/dht.go @@ -36,6 +36,8 @@ import ( var logger = logging.Logger("dht") +// BaseConnMgrScore is the base of the score set on the connection manager "kbucket" tag. +// It is added with the common prefix length between two peer IDs. const BaseConnMgrScore = 5 type mode int @@ -606,22 +608,22 @@ func (dht *IpfsDHT) getMode() mode { return dht.mode } -// Context return dht's context +// Context returns the DHT's context. func (dht *IpfsDHT) Context() context.Context { return dht.ctx } -// Process return dht's process +// Process returns the DHT's process. func (dht *IpfsDHT) Process() goprocess.Process { return dht.proc } -// RoutingTable return dht's routingTable +// RoutingTable returns the DHT's routingTable. func (dht *IpfsDHT) RoutingTable() *kb.RoutingTable { return dht.routingTable } -// Close calls Process Close +// Close calls Process Close. func (dht *IpfsDHT) Close() error { return dht.proc.Close() } @@ -630,18 +632,22 @@ func mkDsKey(s string) ds.Key { return ds.NewKey(base32.RawStdEncoding.EncodeToString([]byte(s))) } +// PeerID returns the DHT node's Peer ID. func (dht *IpfsDHT) PeerID() peer.ID { return dht.self } +// PeerKey returns a DHT key, converted from the DHT node's Peer ID. func (dht *IpfsDHT) PeerKey() []byte { return kb.ConvertPeerID(dht.self) } +// Host returns the libp2p host this DHT is operating with. func (dht *IpfsDHT) Host() host.Host { return dht.host } +// Ping sends a ping message to the passed peer and waits for a response. func (dht *IpfsDHT) Ping(ctx context.Context, p peer.ID) error { req := pb.NewMessage(pb.Message_PING, nil, 0) resp, err := dht.sendRequest(ctx, p, req) diff --git a/dht_bootstrap.go b/dht_bootstrap.go index fb3961dc9..0f9e27bdc 100644 --- a/dht_bootstrap.go +++ b/dht_bootstrap.go @@ -3,17 +3,17 @@ package dht import ( "context" "fmt" - "github.com/libp2p/go-libp2p-core/peer" "time" multierror "github.com/hashicorp/go-multierror" process "github.com/jbenet/goprocess" processctx "github.com/jbenet/goprocess/context" + "github.com/libp2p/go-libp2p-core/peer" kbucket "github.com/libp2p/go-libp2p-kbucket" "github.com/multiformats/go-multiaddr" - _ "github.com/multiformats/go-multiaddr-dns" ) +// DefaultBootstrapPeers is a set of public DHT bootstrap peers provided by libp2p. var DefaultBootstrapPeers []multiaddr.Multiaddr // Minimum number of peers in the routing table. If we drop below this and we diff --git a/dht_net.go b/dht_net.go index 0fea86ecb..9bb1aed6b 100644 --- a/dht_net.go +++ b/dht_net.go @@ -24,6 +24,8 @@ import ( var dhtReadMessageTimeout = 10 * time.Second var dhtStreamIdleTimeout = 1 * time.Minute + +// ErrReadTimeout is an error that occurs when no message is read within the timeout period. var ErrReadTimeout = fmt.Errorf("timed out reading response") // The Protobuf writer performs multiple small writes when writing a message. @@ -111,7 +113,7 @@ func (dht *IpfsDHT) handleNewMessage(s network.Stream) bool { err = req.Unmarshal(msgbytes) r.ReleaseMsg(msgbytes) if err != nil { - logger.Debugf("error unmarshalling message: %#v", err) + logger.Debugf("error unmarshaling message: %#v", err) _ = stats.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(metrics.KeyMessageType, "UNKNOWN")}, metrics.ReceivedMessages.M(1), diff --git a/dht_options.go b/dht_options.go index 1cdde18fb..1e15031d6 100644 --- a/dht_options.go +++ b/dht_options.go @@ -2,11 +2,11 @@ package dht import ( "fmt" - "github.com/ipfs/go-ipns" "time" ds "github.com/ipfs/go-datastore" dssync "github.com/ipfs/go-datastore/sync" + "github.com/ipfs/go-ipns" "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" @@ -27,6 +27,7 @@ const ( ModeServer ) +// DefaultPrefix is the application specific prefix attached to all DHT protocols by default. const DefaultPrefix protocol.ID = "/ipfs" // Options is a structure containing all the options that can be used when constructing a DHT. @@ -322,7 +323,7 @@ func DisableProviders() Option { } } -// DisableProviders disables storing and retrieving value records (including +// DisableValues disables storing and retrieving value records (including // public keys). // // Defaults to enabled. diff --git a/dht_test.go b/dht_test.go index 76331951f..666a4a2a4 100644 --- a/dht_test.go +++ b/dht_test.go @@ -1356,9 +1356,8 @@ func TestClientModeFindPeer(t *testing.T) { func minInt(a, b int) int { if a < b { return a - } else { - return b } + return b } func TestFindPeerQueryMinimal(t *testing.T) { diff --git a/events.go b/events.go index 38ec6ca6c..9a1ff3c17 100644 --- a/events.go +++ b/events.go @@ -11,11 +11,13 @@ import ( kbucket "github.com/libp2p/go-libp2p-kbucket" ) +// KeyKadID contains the Kademlia key in string and binary form. type KeyKadID struct { Key string Kad kbucket.ID } +// NewKeyKadID creates a KeyKadID from a string Kademlia ID. func NewKeyKadID(k string) *KeyKadID { return &KeyKadID{ Key: k, @@ -23,11 +25,13 @@ func NewKeyKadID(k string) *KeyKadID { } } +// PeerKadID contains a libp2p Peer ID and a binary Kademlia ID. type PeerKadID struct { Peer peer.ID Kad kbucket.ID } +// NewPeerKadID creates a PeerKadID from a libp2p Peer ID. func NewPeerKadID(p peer.ID) *PeerKadID { return &PeerKadID{ Peer: p, @@ -35,6 +39,7 @@ func NewPeerKadID(p peer.ID) *PeerKadID { } } +// NewPeerKadIDSlice creates a slice of PeerKadID from the passed slice of libp2p Peer IDs. func NewPeerKadIDSlice(p []peer.ID) []*PeerKadID { r := make([]*PeerKadID, len(p)) for i := range p { @@ -43,14 +48,16 @@ func NewPeerKadIDSlice(p []peer.ID) []*PeerKadID { return r } +// OptPeerKadID returns a pointer to a PeerKadID or nil if the passed Peer ID is it's default value. func OptPeerKadID(p peer.ID) *PeerKadID { if p == "" { return nil - } else { - return NewPeerKadID(p) } + return NewPeerKadID(p) } +// NewLookupEvent creates a LookupEvent automatically converting the node +// libp2p Peer ID to a PeerKadID and the string Kademlia key to a KeyKadID. func NewLookupEvent( node peer.ID, id uuid.UUID, @@ -86,6 +93,7 @@ type LookupEvent struct { Terminate *LookupTerminateEvent } +// NewLookupUpdateEvent creates a new lookup update event, automatically converting the passed peer IDs to peer Kad IDs. func NewLookupUpdateEvent( cause peer.ID, source peer.ID, @@ -127,6 +135,7 @@ type LookupTerminateEvent struct { Reason LookupTerminationReason } +// NewLookupTerminateEvent creates a new lookup termination event with a given reason. func NewLookupTerminateEvent(reason LookupTerminationReason) *LookupTerminateEvent { return &LookupTerminateEvent{Reason: reason} } @@ -134,6 +143,7 @@ func NewLookupTerminateEvent(reason LookupTerminationReason) *LookupTerminateEve // LookupTerminationReason captures reasons for terminating a lookup. type LookupTerminationReason int +// MarshalJSON returns the JSON encoding of the passed lookup termination reason. func (r LookupTerminationReason) MarshalJSON() ([]byte, error) { return json.Marshal(r.String()) } @@ -220,7 +230,7 @@ func RegisterForLookupEvents(ctx context.Context) (context.Context, <-chan *Look return context.WithValue(ctx, routingLookupKey{}, ech), ch } -// Number of events to buffer. +// LookupEventBufferSize is the number of events to buffer. var LookupEventBufferSize = 16 // PublishLookupEvent publishes a query event to the query event channel diff --git a/lookup.go b/lookup.go index a696df4cf..057390c49 100644 --- a/lookup.go +++ b/lookup.go @@ -66,8 +66,8 @@ func (lk loggableKeyBytes) String() string { return k } -// Kademlia 'node lookup' operation. Returns a channel of the K closest peers -// to the given key +// GetClosestPeers is a Kademlia 'node lookup' operation. Returns a channel of +// the K closest peers to the given key. // // If the context is canceled, this function will return the context error along // with the closest K peers it has found so far. diff --git a/protocol.go b/protocol.go index 855592e25..a68f01c19 100644 --- a/protocol.go +++ b/protocol.go @@ -7,6 +7,6 @@ import ( var ( // ProtocolDHT is the default DHT protocol. ProtocolDHT protocol.ID = "/ipfs/kad/1.0.0" - // DefualtProtocols spoken by the DHT. + // DefaultProtocols spoken by the DHT. DefaultProtocols = []protocol.ID{ProtocolDHT} ) diff --git a/query.go b/query.go index d65c5cab1..597cd196b 100644 --- a/query.go +++ b/query.go @@ -294,30 +294,31 @@ func (q *query) run() { // spawnQuery starts one query, if an available heard peer is found func (q *query) spawnQuery(ctx context.Context, cause peer.ID, ch chan<- *queryUpdate) { - if peers := q.queryPeers.GetSortedHeard(); len(peers) == 0 { + peers := q.queryPeers.GetSortedHeard() + if len(peers) == 0 { return - } else { - PublishLookupEvent(ctx, - NewLookupEvent( - q.dht.self, - q.id, - q.key, - NewLookupUpdateEvent( - cause, - q.queryPeers.GetReferrer(peers[0]), - nil, // heard - []peer.ID{peers[0]}, // waiting - nil, // queried - nil, // unreachable - ), - nil, - nil, - ), - ) - q.queryPeers.SetState(peers[0], qpeerset.PeerWaiting) - q.waitGroup.Add(1) - go q.queryPeer(ctx, ch, peers[0]) } + + PublishLookupEvent(ctx, + NewLookupEvent( + q.dht.self, + q.id, + q.key, + NewLookupUpdateEvent( + cause, + q.queryPeers.GetReferrer(peers[0]), + nil, // heard + []peer.ID{peers[0]}, // waiting + nil, // queried + nil, // unreachable + ), + nil, + nil, + ), + ) + q.queryPeers.SetState(peers[0], qpeerset.PeerWaiting) + q.waitGroup.Add(1) + go q.queryPeer(ctx, ch, peers[0]) } func (q *query) isReadyToTerminate() (bool, LookupTerminationReason) { @@ -353,20 +354,20 @@ func (q *query) isStarvationTermination() bool { func (q *query) terminate(ctx context.Context, cancel context.CancelFunc, reason LookupTerminationReason) { if q.terminated { return - } else { - PublishLookupEvent(ctx, - NewLookupEvent( - q.dht.self, - q.id, - q.key, - nil, - nil, - NewLookupTerminateEvent(reason), - ), - ) - cancel() // abort outstanding queries - q.terminated = true } + + PublishLookupEvent(ctx, + NewLookupEvent( + q.dht.self, + q.id, + q.key, + nil, + nil, + NewLookupTerminateEvent(reason), + ), + ) + cancel() // abort outstanding queries + q.terminated = true } // queryPeer queries a single peer and reports its findings on the channel. diff --git a/records.go b/records.go index 0677b5c40..adb28ce7d 100644 --- a/records.go +++ b/records.go @@ -15,6 +15,8 @@ type pubkrs struct { err error } +// GetPublicKey gets the public key when given a Peer ID. It will extract from +// the Peer ID if inlined or ask the node it belongs to or ask the DHT. func (dht *IpfsDHT) GetPublicKey(ctx context.Context, p peer.ID) (ci.PubKey, error) { logger.Debugf("getPublicKey for: %s", p) @@ -77,7 +79,7 @@ func (dht *IpfsDHT) getPublicKeyFromDHT(ctx context.Context, p peer.ID) (ci.PubK pubk, err := ci.UnmarshalPublicKey(val) if err != nil { - logger.Errorf("Could not unmarshall public key retrieved from DHT for %v", p) + logger.Errorf("Could not unmarshal public key retrieved from DHT for %v", p) return nil, err } @@ -109,7 +111,7 @@ func (dht *IpfsDHT) getPublicKeyFromNode(ctx context.Context, p peer.ID) (ci.Pub pubk, err := ci.UnmarshalPublicKey(record.GetValue()) if err != nil { - logger.Errorf("Could not unmarshall public key for %v", p) + logger.Errorf("Could not unmarshal public key for %v", p) return nil, err } diff --git a/routing.go b/routing.go index 6808284a1..4d0f077c5 100644 --- a/routing.go +++ b/routing.go @@ -132,6 +132,7 @@ func (dht *IpfsDHT) GetValue(ctx context.Context, key string, opts ...routing.Op return best, nil } +// SearchValue searches for the value corresponding to given Key and streams the results. func (dht *IpfsDHT) SearchValue(ctx context.Context, key string, opts ...routing.Option) (<-chan []byte, error) { if !dht.enableValues { return nil, routing.ErrNotSupported @@ -476,7 +477,7 @@ func (dht *IpfsDHT) makeProvRecord(key []byte) (*pb.Message, error) { // // only share WAN-friendly addresses ?? // pi.Addrs = addrutil.WANShareableAddrs(pi.Addrs) if len(pi.Addrs) < 1 { - return nil, fmt.Errorf("no known addresses for self. cannot put provider.") + return nil, fmt.Errorf("no known addresses for self, cannot put provider") } pmes := pb.NewMessage(pb.Message_ADD_PROVIDER, key, 0)