From ef33c794a57afe194cbd615bddd420ebd4731909 Mon Sep 17 00:00:00 2001 From: Adin Schmahmann Date: Tue, 3 Mar 2020 16:41:35 -0500 Subject: [PATCH] added support for independent client and server protocols --- dht.go | 14 ++++++++------ dht_net.go | 2 +- opts/options.go | 40 ++++++++++++++++++++++++++++++++++++---- subscriber_notifee.go | 8 ++++---- 4 files changed, 49 insertions(+), 15 deletions(-) diff --git a/dht.go b/dht.go index 4a79361df..16a7178aa 100644 --- a/dht.go +++ b/dht.go @@ -74,7 +74,7 @@ type IpfsDHT struct { stripedPutLocks [256]sync.Mutex - protocols []protocol.ID // DHT protocols + protocols, clientProtocols []protocol.ID // DHT protocols auto bool mode mode @@ -113,9 +113,10 @@ func New(ctx context.Context, h host.Host, options ...opts.Option) (*IpfsDHT, er if err := cfg.Apply(append([]opts.Option{opts.Defaults}, options...)...); err != nil { return nil, err } - if cfg.DisjointPaths == 0 { - cfg.DisjointPaths = cfg.BucketSize / 2 + if err := opts.UnsetDefaults(&cfg); err != nil { + return nil, err } + dht := makeDHT(ctx, h, cfg) dht.autoRefresh = cfg.RoutingTable.AutoRefresh dht.rtRefreshPeriod = cfg.RoutingTable.RefreshPeriod @@ -207,6 +208,7 @@ func makeDHT(ctx context.Context, h host.Host, cfg opts.Options) *IpfsDHT { rng: rand.New(rand.NewSource(rand.Int63())), routingTable: rt, protocols: cfg.Protocols, + clientProtocols: cfg.ClientProtocols, bucketSize: cfg.BucketSize, alpha: cfg.Concurrency, d: cfg.DisjointPaths, @@ -541,9 +543,9 @@ func (dht *IpfsDHT) Close() error { return dht.proc.Close() } -func (dht *IpfsDHT) protocolStrs() []string { - pstrs := make([]string, len(dht.protocols)) - for idx, proto := range dht.protocols { +func (dht *IpfsDHT) clientProtocolStrs() []string { + pstrs := make([]string, len(dht.clientProtocols)) + for idx, proto := range dht.clientProtocols { pstrs[idx] = string(proto) } diff --git a/dht_net.go b/dht_net.go index 1eada2550..91a3d1406 100644 --- a/dht_net.go +++ b/dht_net.go @@ -318,7 +318,7 @@ func (ms *messageSender) prep(ctx context.Context) error { return nil } - nstr, err := ms.dht.host.NewStream(ctx, ms.p, ms.dht.protocols...) + nstr, err := ms.dht.host.NewStream(ctx, ms.p, ms.dht.clientProtocols...) if err != nil { return err } diff --git a/opts/options.go b/opts/options.go index 5d6c9d0d6..a6b4ca675 100644 --- a/opts/options.go +++ b/opts/options.go @@ -15,8 +15,9 @@ import ( const ProtocolDHTOld protocol.ID = "/ipfs/dht" var ( - ProtocolDHT protocol.ID = "/ipfs/kad/1.0.0" - DefaultProtocols = []protocol.ID{ProtocolDHT} + ProtocolDHT protocol.ID = "/ipfs/kad/2.0.0" + DefaultProtocols = []protocol.ID{ProtocolDHT, "/ipfs/kad/1.0.0"} + DefaultClientProtocols = []protocol.ID{ProtocolDHT} ) // ModeOpt describes what mode the dht should operate in @@ -38,6 +39,7 @@ type Options struct { Validator record.Validator Mode ModeOpt Protocols []protocol.ID + ClientProtocols []protocol.ID BucketSize int DisjointPaths int Concurrency int @@ -73,7 +75,6 @@ var Defaults = func(o *Options) error { "pk": record.PublicKeyValidator{}, } o.Datastore = dssync.MutexWrap(ds.NewMapDatastore()) - o.Protocols = DefaultProtocols o.EnableProviders = true o.EnableValues = true @@ -89,6 +90,26 @@ var Defaults = func(o *Options) error { return nil } +// UnsetDefaults sets default DHT options. It is applied after Defaults and any options passed to the constructor in +// order to allow for defaults that are based on other set options. +func UnsetDefaults(o *Options) error { + if len(o.ClientProtocols) == 0 && len(o.Protocols) == 0 { + o.Protocols = DefaultProtocols + o.ClientProtocols = DefaultClientProtocols + } + + // If no client protocols set, use the server protocols as client protocols + if len(o.ClientProtocols) == 0 { + o.ClientProtocols = append(o.Protocols[:0:0], o.Protocols...) + } + + if o.DisjointPaths == 0 { + o.DisjointPaths = o.BucketSize / 2 + } + + return nil +} + // RoutingTableLatencyTolerance sets the maximum acceptable latency for peers // in the routing table's cluster. func RoutingTableLatencyTolerance(latency time.Duration) Option { @@ -180,7 +201,7 @@ func NamespacedValidator(ns string, v record.Validator) Option { } } -// Protocols sets the protocols for the DHT +// Protocols sets the protocols the DHT may respond to queries with // // Defaults to dht.DefaultProtocols func Protocols(protocols ...protocol.ID) Option { @@ -190,6 +211,17 @@ func Protocols(protocols ...protocol.ID) Option { } } +// ClientProtocols sets the protocols the DHT uses to initiate connections and queries +// +// Defaults to dht.DefaultClientProtocols. If Protocols have been set the to the non-default protocols, +// then ClientProtocols defaults to match Protocols. +func ClientProtocols(protocols ...protocol.ID) Option { + return func(o *Options) error { + o.ClientProtocols = protocols + return nil + } +} + // BucketSize configures the bucket size (k in the Kademlia paper) of the routing table. // // The default value is 20. diff --git a/subscriber_notifee.go b/subscriber_notifee.go index 0b204aebf..a69149c1c 100644 --- a/subscriber_notifee.go +++ b/subscriber_notifee.go @@ -61,7 +61,7 @@ func newSubscriberNotifiee(dht *IpfsDHT) (*subscriberNotifee, error) { dht.plk.Lock() defer dht.plk.Unlock() for _, p := range dht.host.Network().Peers() { - protos, err := dht.peerstore.SupportsProtocols(p, dht.protocolStrs()...) + protos, err := dht.peerstore.SupportsProtocols(p, dht.clientProtocolStrs()...) if err != nil { return nil, fmt.Errorf("could not check peerstore for protocol support: err: %s", err) } @@ -124,7 +124,7 @@ func handlePeerIdentificationCompletedEvent(dht *IpfsDHT, evt interface{}) { } // if the peer supports the DHT protocol, add it to our RT and kick a refresh if needed - protos, err := dht.peerstore.SupportsProtocols(e.Peer, dht.protocolStrs()...) + protos, err := dht.peerstore.SupportsProtocols(e.Peer, dht.clientProtocolStrs()...) if err != nil { logger.Errorf("could not check peerstore for protocol support: err: %s", err) return @@ -143,7 +143,7 @@ func handlePeerProtocolsUpdatedEvent(dht *IpfsDHT, evt interface{}) { return } - protos, err := dht.peerstore.SupportsProtocols(e.Peer, dht.protocolStrs()...) + protos, err := dht.peerstore.SupportsProtocols(e.Peer, dht.clientProtocolStrs()...) if err != nil { logger.Errorf("could not check peerstore for protocol support: err: %s", err) return @@ -195,7 +195,7 @@ func fixLowPeers(dht *IpfsDHT) { // Passively add peers we already know about for _, p := range dht.host.Network().Peers() { // Don't bother probing, we do that on connect. - protos, err := dht.peerstore.SupportsProtocols(p, dht.protocolStrs()...) + protos, err := dht.peerstore.SupportsProtocols(p, dht.clientProtocolStrs()...) if err == nil && len(protos) != 0 { dht.Update(dht.Context(), p) }