Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: linting fixes #578

Merged
merged 6 commits into from
Apr 9, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ import (

var logger = logging.Logger("dht")

// BaseConnMgrScore is the base of the score set on the connection manager "kbucket" tag.
// It is added with the common prefix length between two peer IDs.
const BaseConnMgrScore = 5

type mode int
Expand Down Expand Up @@ -606,22 +608,22 @@ func (dht *IpfsDHT) getMode() mode {
return dht.mode
}

// Context return dht's context
// Context returns the DHT's context.
func (dht *IpfsDHT) Context() context.Context {
return dht.ctx
}

// Process return dht's process
// Process returns the DHT's process.
func (dht *IpfsDHT) Process() goprocess.Process {
return dht.proc
}

// RoutingTable return dht's routingTable
// RoutingTable returns the DHT's routingTable.
func (dht *IpfsDHT) RoutingTable() *kb.RoutingTable {
return dht.routingTable
}

// Close calls Process Close
// Close calls Process Close.
func (dht *IpfsDHT) Close() error {
return dht.proc.Close()
}
Expand All @@ -630,18 +632,22 @@ func mkDsKey(s string) ds.Key {
return ds.NewKey(base32.RawStdEncoding.EncodeToString([]byte(s)))
}

// PeerID returns the DHT node's Peer ID.
func (dht *IpfsDHT) PeerID() peer.ID {
return dht.self
}

// PeerKey returns a DHT key, converted from the DHT node's Peer ID.
func (dht *IpfsDHT) PeerKey() []byte {
return kb.ConvertPeerID(dht.self)
}

// Host returns the libp2p host this DHT is operating with.
func (dht *IpfsDHT) Host() host.Host {
return dht.host
}

// Ping sends a ping message to the passed peer and waits for a response.
func (dht *IpfsDHT) Ping(ctx context.Context, p peer.ID) error {
req := pb.NewMessage(pb.Message_PING, nil, 0)
resp, err := dht.sendRequest(ctx, p, req)
Expand Down
4 changes: 2 additions & 2 deletions dht_bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,17 @@ package dht
import (
"context"
"fmt"
"github.com/libp2p/go-libp2p-core/peer"
"time"

multierror "github.com/hashicorp/go-multierror"
process "github.com/jbenet/goprocess"
processctx "github.com/jbenet/goprocess/context"
"github.com/libp2p/go-libp2p-core/peer"
kbucket "github.com/libp2p/go-libp2p-kbucket"
"github.com/multiformats/go-multiaddr"
_ "github.com/multiformats/go-multiaddr-dns"
)

// DefaultBootstrapPeers is a set of public DHT bootstrap peers provided by libp2p.
var DefaultBootstrapPeers []multiaddr.Multiaddr

// Minimum number of peers in the routing table. If we drop below this and we
Expand Down
4 changes: 3 additions & 1 deletion dht_net.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (

var dhtReadMessageTimeout = 10 * time.Second
var dhtStreamIdleTimeout = 1 * time.Minute

// ErrReadTimeout is an error that occurs when no message is read within the timeout period.
var ErrReadTimeout = fmt.Errorf("timed out reading response")

// The Protobuf writer performs multiple small writes when writing a message.
Expand Down Expand Up @@ -111,7 +113,7 @@ func (dht *IpfsDHT) handleNewMessage(s network.Stream) bool {
err = req.Unmarshal(msgbytes)
r.ReleaseMsg(msgbytes)
if err != nil {
logger.Debugf("error unmarshalling message: %#v", err)
logger.Debugf("error unmarshaling message: %#v", err)
_ = stats.RecordWithTags(ctx,
[]tag.Mutator{tag.Upsert(metrics.KeyMessageType, "UNKNOWN")},
metrics.ReceivedMessages.M(1),
Expand Down
5 changes: 3 additions & 2 deletions dht_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@ package dht

import (
"fmt"
"github.com/ipfs/go-ipns"
"time"

ds "github.com/ipfs/go-datastore"
dssync "github.com/ipfs/go-datastore/sync"
"github.com/ipfs/go-ipns"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
Expand All @@ -27,6 +27,7 @@ const (
ModeServer
)

// DefaultPrefix is the application specific prefix attached to all DHT protocols by default.
const DefaultPrefix protocol.ID = "/ipfs"

// Options is a structure containing all the options that can be used when constructing a DHT.
Expand Down Expand Up @@ -322,7 +323,7 @@ func DisableProviders() Option {
}
}

// DisableProviders disables storing and retrieving value records (including
// DisableValues disables storing and retrieving value records (including
// public keys).
//
// Defaults to enabled.
Expand Down
3 changes: 1 addition & 2 deletions dht_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1356,9 +1356,8 @@ func TestClientModeFindPeer(t *testing.T) {
func minInt(a, b int) int {
if a < b {
return a
} else {
return b
}
return b
}

func TestFindPeerQueryMinimal(t *testing.T) {
Expand Down
16 changes: 13 additions & 3 deletions events.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,30 +11,35 @@ import (
kbucket "github.com/libp2p/go-libp2p-kbucket"
)

// KeyKadID contains the Kademlia key in string and binary form.
type KeyKadID struct {
Key string
Kad kbucket.ID
}

// NewKeyKadID creates a KeyKadID from a string Kademlia ID.
func NewKeyKadID(k string) *KeyKadID {
return &KeyKadID{
Key: k,
Kad: kbucket.ConvertKey(k),
}
}

// PeerKadID contains a libp2p Peer ID and a binary Kademlia ID.
type PeerKadID struct {
Peer peer.ID
Kad kbucket.ID
}

// NewPeerKadID creates a PeerKadID from a libp2p Peer ID.
func NewPeerKadID(p peer.ID) *PeerKadID {
return &PeerKadID{
Peer: p,
Kad: kbucket.ConvertPeerID(p),
}
}

// NewPeerKadIDSlice creates a slice of PeerKadID from the passed slice of libp2p Peer IDs.
func NewPeerKadIDSlice(p []peer.ID) []*PeerKadID {
r := make([]*PeerKadID, len(p))
for i := range p {
Expand All @@ -43,14 +48,16 @@ func NewPeerKadIDSlice(p []peer.ID) []*PeerKadID {
return r
}

// OptPeerKadID returns a pointer to a PeerKadID or nil if the passed Peer ID is it's default value.
func OptPeerKadID(p peer.ID) *PeerKadID {
if p == "" {
return nil
} else {
return NewPeerKadID(p)
}
return NewPeerKadID(p)
}

// NewLookupEvent creates a LookupEvent automatically converting the node
// libp2p Peer ID to a PeerKadID and the string Kademlia key to a KeyKadID.
func NewLookupEvent(
node peer.ID,
id uuid.UUID,
Expand Down Expand Up @@ -86,6 +93,7 @@ type LookupEvent struct {
Terminate *LookupTerminateEvent
}

// NewLookupUpdateEvent creates a new lookup update event, automatically converting the passed peer IDs to peer Kad IDs.
func NewLookupUpdateEvent(
cause peer.ID,
source peer.ID,
Expand Down Expand Up @@ -127,13 +135,15 @@ type LookupTerminateEvent struct {
Reason LookupTerminationReason
}

// NewLookupTerminateEvent creates a new lookup termination event with a given reason.
func NewLookupTerminateEvent(reason LookupTerminationReason) *LookupTerminateEvent {
return &LookupTerminateEvent{Reason: reason}
}

// LookupTerminationReason captures reasons for terminating a lookup.
type LookupTerminationReason int

// MarshalJSON returns the JSON encoding of the passed lookup termination reason.
func (r LookupTerminationReason) MarshalJSON() ([]byte, error) {
return json.Marshal(r.String())
}
Expand Down Expand Up @@ -220,7 +230,7 @@ func RegisterForLookupEvents(ctx context.Context) (context.Context, <-chan *Look
return context.WithValue(ctx, routingLookupKey{}, ech), ch
}

// Number of events to buffer.
// LookupEventBufferSize is the number of events to buffer.
var LookupEventBufferSize = 16

// PublishLookupEvent publishes a query event to the query event channel
Expand Down
4 changes: 2 additions & 2 deletions lookup.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ func (lk loggableKeyBytes) String() string {
return k
}

// Kademlia 'node lookup' operation. Returns a channel of the K closest peers
// to the given key
// GetClosestPeers is a Kademlia 'node lookup' operation. Returns a channel of
// the K closest peers to the given key.
//
// If the context is canceled, this function will return the context error along
// with the closest K peers it has found so far.
Expand Down
2 changes: 1 addition & 1 deletion protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,6 @@ import (
var (
// ProtocolDHT is the default DHT protocol.
ProtocolDHT protocol.ID = "/ipfs/kad/1.0.0"
// DefualtProtocols spoken by the DHT.
// DefaultProtocols spoken by the DHT.
DefaultProtocols = []protocol.ID{ProtocolDHT}
)
71 changes: 36 additions & 35 deletions query.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,30 +294,31 @@ func (q *query) run() {

// spawnQuery starts one query, if an available heard peer is found
func (q *query) spawnQuery(ctx context.Context, cause peer.ID, ch chan<- *queryUpdate) {
if peers := q.queryPeers.GetSortedHeard(); len(peers) == 0 {
peers := q.queryPeers.GetSortedHeard()
if len(peers) == 0 {
return
} else {
PublishLookupEvent(ctx,
NewLookupEvent(
q.dht.self,
q.id,
q.key,
NewLookupUpdateEvent(
cause,
q.queryPeers.GetReferrer(peers[0]),
nil, // heard
[]peer.ID{peers[0]}, // waiting
nil, // queried
nil, // unreachable
),
nil,
nil,
),
)
q.queryPeers.SetState(peers[0], qpeerset.PeerWaiting)
q.waitGroup.Add(1)
go q.queryPeer(ctx, ch, peers[0])
}

PublishLookupEvent(ctx,
NewLookupEvent(
q.dht.self,
q.id,
q.key,
NewLookupUpdateEvent(
cause,
q.queryPeers.GetReferrer(peers[0]),
nil, // heard
[]peer.ID{peers[0]}, // waiting
nil, // queried
nil, // unreachable
),
nil,
nil,
),
)
q.queryPeers.SetState(peers[0], qpeerset.PeerWaiting)
q.waitGroup.Add(1)
go q.queryPeer(ctx, ch, peers[0])
}

func (q *query) isReadyToTerminate() (bool, LookupTerminationReason) {
Expand Down Expand Up @@ -353,20 +354,20 @@ func (q *query) isStarvationTermination() bool {
func (q *query) terminate(ctx context.Context, cancel context.CancelFunc, reason LookupTerminationReason) {
if q.terminated {
return
} else {
PublishLookupEvent(ctx,
NewLookupEvent(
q.dht.self,
q.id,
q.key,
nil,
nil,
NewLookupTerminateEvent(reason),
),
)
cancel() // abort outstanding queries
q.terminated = true
}

PublishLookupEvent(ctx,
NewLookupEvent(
q.dht.self,
q.id,
q.key,
nil,
nil,
NewLookupTerminateEvent(reason),
),
)
cancel() // abort outstanding queries
q.terminated = true
}

// queryPeer queries a single peer and reports its findings on the channel.
Expand Down
6 changes: 4 additions & 2 deletions records.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ type pubkrs struct {
err error
}

// GetPublicKey gets the public key when given a Peer ID. It will extract from
// the Peer ID if inlined or ask the node it belongs to or ask the DHT.
func (dht *IpfsDHT) GetPublicKey(ctx context.Context, p peer.ID) (ci.PubKey, error) {
logger.Debugf("getPublicKey for: %s", p)

Expand Down Expand Up @@ -77,7 +79,7 @@ func (dht *IpfsDHT) getPublicKeyFromDHT(ctx context.Context, p peer.ID) (ci.PubK

pubk, err := ci.UnmarshalPublicKey(val)
if err != nil {
logger.Errorf("Could not unmarshall public key retrieved from DHT for %v", p)
logger.Errorf("Could not unmarshal public key retrieved from DHT for %v", p)
return nil, err
}

Expand Down Expand Up @@ -109,7 +111,7 @@ func (dht *IpfsDHT) getPublicKeyFromNode(ctx context.Context, p peer.ID) (ci.Pub

pubk, err := ci.UnmarshalPublicKey(record.GetValue())
if err != nil {
logger.Errorf("Could not unmarshall public key for %v", p)
logger.Errorf("Could not unmarshal public key for %v", p)
return nil, err
}

Expand Down
3 changes: 2 additions & 1 deletion routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ func (dht *IpfsDHT) GetValue(ctx context.Context, key string, opts ...routing.Op
return best, nil
}

// SearchValue searches for the value corresponding to given Key and streams the results.
func (dht *IpfsDHT) SearchValue(ctx context.Context, key string, opts ...routing.Option) (<-chan []byte, error) {
if !dht.enableValues {
return nil, routing.ErrNotSupported
Expand Down Expand Up @@ -476,7 +477,7 @@ func (dht *IpfsDHT) makeProvRecord(key []byte) (*pb.Message, error) {
// // only share WAN-friendly addresses ??
// pi.Addrs = addrutil.WANShareableAddrs(pi.Addrs)
if len(pi.Addrs) < 1 {
return nil, fmt.Errorf("no known addresses for self. cannot put provider.")
return nil, fmt.Errorf("no known addresses for self, cannot put provider")
}

pmes := pb.NewMessage(pb.Message_ADD_PROVIDER, key, 0)
Expand Down