diff --git a/dht.go b/dht.go index 7e22a31f..950d7fff 100644 --- a/dht.go +++ b/dht.go @@ -8,6 +8,16 @@ import ( "sync" "time" + "github.com/benbjohnson/clock" + "github.com/plprobelab/go-kademlia/coord" + "github.com/plprobelab/go-kademlia/events/scheduler/simplescheduler" + "github.com/plprobelab/go-kademlia/kad" + "github.com/plprobelab/go-kademlia/key" + "github.com/plprobelab/go-kademlia/libp2p" + kadquery "github.com/plprobelab/go-kademlia/query" + "github.com/plprobelab/go-kademlia/routing/triert" + "github.com/plprobelab/go-kademlia/util" + "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" @@ -160,6 +170,11 @@ type IpfsDHT struct { // addrFilter is used to filter the addresses we put into the peer store. // Mostly used to filter out localhost and local addresses. addrFilter func([]ma.Multiaddr) []ma.Multiaddr + + // ------ go-kademlia ------ + + coordinator *coord.Coordinator[key.Key256, ma.Multiaddr] + queryWaiters map[kadquery.QueryID]chan<- kad.Response[key.Key256, ma.Multiaddr] } // Assert that IPFS assumptions about interfaces aren't broken. These aren't a @@ -249,9 +264,56 @@ func New(ctx context.Context, h host.Host, options ...Option) (*IpfsDHT, error) dht.runFixLowPeersLoop() } + rt, err := triert.New[key.Key256](newPeerID(h.ID()).Key(), nil) + if err != nil { + return nil, err + } + sched := simplescheduler.NewSimpleScheduler(clock.New()) + ep := libp2p.NewLibp2pEndpoint(ctx, h, sched) + c, _ := coord.NewCoordinator[key.Key256, ma.Multiaddr](ep, rt, nil) + + dht.coordinator = c + dht.queryWaiters = make(map[kadquery.QueryID]chan<- kad.Response[key.Key256, ma.Multiaddr]) + return dht, nil } +func (d *IpfsDHT) Start(ctx context.Context) { + ctx, span := util.StartSpan(ctx, "IpfsDHT.Start") + defer span.End() + go d.loop(ctx) +} + +func (d *IpfsDHT) loop(ctx context.Context) { + ctx, span := util.StartSpan(ctx, "IpfsDHT.loop") + defer span.End() + + kadEvents := d.coordinator.Start(ctx) + for { + select { + case <-ctx.Done(): + return + case ev := <-kadEvents: + switch tev := ev.(type) { + case *coord.KademliaOutboundQueryProgressedEvent[key.Key256, ma.Multiaddr]: + // TODO: locking + ch, ok := d.queryWaiters[tev.QueryID] + if !ok { + // we have lost the query waiter somehow + d.coordinator.StopQuery(ctx, tev.QueryID) + continue + } + + // notify the waiter + ch <- tev.Response + + default: + panic(fmt.Sprintf("unexpected event: %T", tev)) + } + } + } +} + // NewDHT creates a new DHT object with the given peer as the 'local' host. // IpfsDHT's initialized with this function will respond to DHT requests, // whereas IpfsDHT's initialized with NewDHTClient will not. @@ -412,7 +474,6 @@ func makeRoutingTable(dht *IpfsDHT, cfg dhtcfg.Config, maxLastSuccessfulOutbound df, err := peerdiversity.NewFilter(dht.rtPeerDiversityFilter, "rt/diversity", func(p peer.ID) int { return kb.CommonPrefixLen(dht.selfKey, kb.ConvertPeerID(p)) }) - if err != nil { return nil, fmt.Errorf("failed to construct peer diversity filter: %w", err) } diff --git a/go_kademlia.go b/go_kademlia.go new file mode 100644 index 00000000..7a678b7f --- /dev/null +++ b/go_kademlia.go @@ -0,0 +1,65 @@ +package dht + +import ( + "github.com/libp2p/go-libp2p/core/peer" + "github.com/multiformats/go-multiaddr" + mh "github.com/multiformats/go-multihash" + mhreg "github.com/multiformats/go-multihash/core" + + "github.com/plprobelab/go-kademlia/kad" + "github.com/plprobelab/go-kademlia/key" +) + +type peerID struct { + peer.ID +} + +var _ kad.NodeID[key.Key256] = (*peerID)(nil) + +func newPeerID(p peer.ID) *peerID { + return &peerID{p} +} + +func (id peerID) Key() key.Key256 { + hasher, _ := mhreg.GetHasher(mh.SHA2_256) + hasher.Write([]byte(id.ID)) + return key.NewKey256(hasher.Sum(nil)) +} + +func (id peerID) NodeID() kad.NodeID[key.Key256] { + return &id +} + +func (id peerID) String() string { + return id.ID.String() +} + +type addrInfo struct { + peer.AddrInfo +} + +var _ kad.NodeInfo[key.Key256, multiaddr.Multiaddr] = (*addrInfo)(nil) + +func newAddrInfo(ai peer.AddrInfo) *addrInfo { + return &addrInfo{ + AddrInfo: ai, + } +} + +func (ai addrInfo) Key() key.Key256 { + return newPeerID(ai.AddrInfo.ID).Key() +} + +func (ai addrInfo) ID() kad.NodeID[key.Key256] { + return newPeerID(ai.AddrInfo.ID) +} + +func (ai addrInfo) Addresses() []multiaddr.Multiaddr { + addrs := make([]multiaddr.Multiaddr, len(ai.Addrs)) + copy(addrs, ai.Addrs) + return addrs +} + +func (ai addrInfo) String() string { + return ai.AddrInfo.ID.String() +} diff --git a/pb/helpers.go b/pb/helpers.go new file mode 100644 index 00000000..2315feba --- /dev/null +++ b/pb/helpers.go @@ -0,0 +1,69 @@ +package dht_pb + +import ( + "errors" + + pb "github.com/ipfs/boxo/bitswap/message/pb" + + "github.com/libp2p/go-libp2p/core/peer" + "github.com/multiformats/go-multiaddr" + + "github.com/plprobelab/go-kademlia/kad" + "github.com/plprobelab/go-kademlia/key" +) + +var ErrNoValidAddresses = errors.New("no valid addresses") + +var ( + _ kad.Request[key.Key256, multiaddr.Multiaddr] = (*pb.Message)(nil) + _ kad.Response[key.Key256, multiaddr.Multiaddr] = (*pb.Message)(nil) +) + +func (m *Message) Target() key.Key256 { + p, err := peer.IDFromBytes(m.GetKey()) + if err != nil { + return key.ZeroKey256() + } + return key.NewKey256([]byte(p.String())) +} + +func (m *Message) EmptyResponse() kad.Response[key.Key256, multiaddr.Multiaddr] { + return &Message{} +} + +func (m *Message) CloserNodes() []kad.NodeInfo[key.Key256, multiaddr.Multiaddr] { + closerPeers := m.GetCloserPeers() + if closerPeers == nil { + return []kad.NodeInfo[key.Key256, multiaddr.Multiaddr]{} + } + return ParsePeers(closerPeers) +} + +func ParsePeers(pbps []*Message_Peer) []kad.NodeInfo[key.Key256, multiaddr.Multiaddr] { + peers := make([]kad.NodeInfo[key.Key256, multiaddr.Multiaddr], 0, len(pbps)) + for _, p := range pbps { + pi, err := PBPeerToPeerInfo(p) + if err == nil { + peers = append(peers, pi) + } + } + return peers +} + +func PBPeerToPeerInfo(pbp *Message_Peer) (*peer.AddrInfo, error) { + addrs := make([]multiaddr.Multiaddr, 0, len(pbp.Addrs)) + for _, a := range pbp.Addrs { + addr, err := multiaddr.NewMultiaddrBytes(a) + if err == nil { + addrs = append(addrs, addr) + } + } + if len(addrs) == 0 { + return nil, ErrNoValidAddresses + } + + return kad.NewAddrInfo(peer.AddrInfo{ + ID: peer.ID(pbp.Id), + Addrs: addrs, + }), nil +} diff --git a/routing.go b/routing.go index 20490041..9a7f998f 100644 --- a/routing.go +++ b/routing.go @@ -8,6 +8,15 @@ import ( "sync" "time" + "github.com/multiformats/go-multiaddr" + + pb "github.com/libp2p/go-libp2p-kad-dht/pb" + + "github.com/plprobelab/go-kademlia/kad" + "github.com/plprobelab/go-kademlia/key" + "github.com/plprobelab/go-kademlia/network/address" + kadquery "github.com/plprobelab/go-kademlia/query" + "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/peerstore" @@ -204,7 +213,8 @@ func (dht *IpfsDHT) SearchValue(ctx context.Context, key string, opts ...routing } func (dht *IpfsDHT) searchValueQuorum(ctx context.Context, key string, valCh <-chan recvdVal, stopCh chan struct{}, - out chan<- []byte, nvals int) ([]byte, map[peer.ID]struct{}, bool) { + out chan<- []byte, nvals int, +) ([]byte, map[peer.ID]struct{}, bool) { numResponses := 0 return dht.processValues(ctx, key, valCh, func(ctx context.Context, v recvdVal, better bool) bool { @@ -226,7 +236,8 @@ func (dht *IpfsDHT) searchValueQuorum(ctx context.Context, key string, valCh <-c } func (dht *IpfsDHT) processValues(ctx context.Context, key string, vals <-chan recvdVal, - newVal func(ctx context.Context, v recvdVal, better bool) bool) (best []byte, peersWithBest map[peer.ID]struct{}, aborted bool) { + newVal func(ctx context.Context, v recvdVal, better bool) bool, +) (best []byte, peersWithBest map[peer.ID]struct{}, aborted bool) { loop: for { if aborted { @@ -372,7 +383,6 @@ func (dht *IpfsDHT) getValues(ctx context.Context, key string, stopQuery chan st } }, ) - if err != nil { return } @@ -577,7 +587,6 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key multihash lookupRes, err := dht.runLookupWithFollowup(ctx, string(key), func(ctx context.Context, p peer.ID) ([]*peer.AddrInfo, error) { - // For DHT query command routing.PublishQueryEvent(ctx, &routing.QueryEvent{ Type: routing.SendingQuery, @@ -641,6 +650,11 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key multihash } } +func (d *IpfsDHT) registerQueryWaiter(queryID kadquery.QueryID, ch chan<- kad.Response[key.Key256, multiaddr.Multiaddr]) { + // TODO: locking + d.queryWaiters[queryID] = ch +} + // FindPeer searches for a peer with given ID. func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (_ peer.AddrInfo, err error) { ctx, span := internal.StartSpan(ctx, "IpfsDHT.FindPeer", trace.WithAttributes(attribute.Stringer("PeerID", id))) @@ -657,6 +671,47 @@ func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (_ peer.AddrInfo, return pi, nil } + var queryID kadquery.QueryID = "testquery" // TODO: randomize to support multiple queries + + msg := pb.NewMessage(pb.Message_FIND_NODE, []byte(id), 0) + msg.EmptyResponse() + err = dht.coordinator.StartQuery(ctx, queryID, address.ProtocolID(d.protocols[0]), msg) + if err != nil { + return peer.AddrInfo{}, fmt.Errorf("failed to start query: %w", err) + } + + ch := make(chan kad.Response[key.Key256, multiaddr.Multiaddr]) + dht.registerQueryWaiter(queryID, ch) + + // wait for query to finish + for { + select { + case <-ctx.Done(): + return peer.AddrInfo{}, ctx.Err() + case resp, ok := <-ch: + if !ok { + // channel was closed, so query can't progress + dht.coordinator.StopQuery(ctx, queryID) + return peer.AddrInfo{}, fmt.Errorf("query was unexpectedly stopped") + } + + // For DHT query command + routing.PublishQueryEvent(ctx, &routing.QueryEvent{ + Type: routing.SendingQuery, + ID: p, + }) + + println("IpfsHandler.FindNode: got FindNode response") + for _, found := range resp.CloserNodes() { + if key.Equal(found.ID().Key(), newPeerID(id).Key()) { + // found the node we were looking for + dht.coordinator.StopQuery(ctx, queryID) + return found, nil + } + } + } + } + lookupRes, err := dht.runLookupWithFollowup(ctx, string(id), func(ctx context.Context, p peer.ID) ([]*peer.AddrInfo, error) { // For DHT query command @@ -690,7 +745,6 @@ func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (_ peer.AddrInfo, return dht.host.Network().Connectedness(id) == network.Connected }, ) - if err != nil { return peer.AddrInfo{}, err }