Skip to content

Commit

Permalink
added support for independent client and server protocols
Browse files Browse the repository at this point in the history
  • Loading branch information
aschmahmann committed Mar 6, 2020
1 parent d7523e1 commit ef33c79
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 15 deletions.
14 changes: 8 additions & 6 deletions dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ type IpfsDHT struct {

stripedPutLocks [256]sync.Mutex

protocols []protocol.ID // DHT protocols
protocols, clientProtocols []protocol.ID // DHT protocols

auto bool
mode mode
Expand Down Expand Up @@ -113,9 +113,10 @@ func New(ctx context.Context, h host.Host, options ...opts.Option) (*IpfsDHT, er
if err := cfg.Apply(append([]opts.Option{opts.Defaults}, options...)...); err != nil {
return nil, err
}
if cfg.DisjointPaths == 0 {
cfg.DisjointPaths = cfg.BucketSize / 2
if err := opts.UnsetDefaults(&cfg); err != nil {
return nil, err
}

dht := makeDHT(ctx, h, cfg)
dht.autoRefresh = cfg.RoutingTable.AutoRefresh
dht.rtRefreshPeriod = cfg.RoutingTable.RefreshPeriod
Expand Down Expand Up @@ -207,6 +208,7 @@ func makeDHT(ctx context.Context, h host.Host, cfg opts.Options) *IpfsDHT {
rng: rand.New(rand.NewSource(rand.Int63())),
routingTable: rt,
protocols: cfg.Protocols,
clientProtocols: cfg.ClientProtocols,
bucketSize: cfg.BucketSize,
alpha: cfg.Concurrency,
d: cfg.DisjointPaths,
Expand Down Expand Up @@ -541,9 +543,9 @@ func (dht *IpfsDHT) Close() error {
return dht.proc.Close()
}

func (dht *IpfsDHT) protocolStrs() []string {
pstrs := make([]string, len(dht.protocols))
for idx, proto := range dht.protocols {
func (dht *IpfsDHT) clientProtocolStrs() []string {
pstrs := make([]string, len(dht.clientProtocols))
for idx, proto := range dht.clientProtocols {
pstrs[idx] = string(proto)
}

Expand Down
2 changes: 1 addition & 1 deletion dht_net.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ func (ms *messageSender) prep(ctx context.Context) error {
return nil
}

nstr, err := ms.dht.host.NewStream(ctx, ms.p, ms.dht.protocols...)
nstr, err := ms.dht.host.NewStream(ctx, ms.p, ms.dht.clientProtocols...)
if err != nil {
return err
}
Expand Down
40 changes: 36 additions & 4 deletions opts/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@ import (
const ProtocolDHTOld protocol.ID = "/ipfs/dht"

var (
ProtocolDHT protocol.ID = "/ipfs/kad/1.0.0"
DefaultProtocols = []protocol.ID{ProtocolDHT}
ProtocolDHT protocol.ID = "/ipfs/kad/2.0.0"
DefaultProtocols = []protocol.ID{ProtocolDHT, "/ipfs/kad/1.0.0"}
DefaultClientProtocols = []protocol.ID{ProtocolDHT}
)

// ModeOpt describes what mode the dht should operate in
Expand All @@ -38,6 +39,7 @@ type Options struct {
Validator record.Validator
Mode ModeOpt
Protocols []protocol.ID
ClientProtocols []protocol.ID
BucketSize int
DisjointPaths int
Concurrency int
Expand Down Expand Up @@ -73,7 +75,6 @@ var Defaults = func(o *Options) error {
"pk": record.PublicKeyValidator{},
}
o.Datastore = dssync.MutexWrap(ds.NewMapDatastore())
o.Protocols = DefaultProtocols
o.EnableProviders = true
o.EnableValues = true

Expand All @@ -89,6 +90,26 @@ var Defaults = func(o *Options) error {
return nil
}

// UnsetDefaults sets default DHT options. It is applied after Defaults and any options passed to the constructor in
// order to allow for defaults that are based on other set options.
func UnsetDefaults(o *Options) error {
if len(o.ClientProtocols) == 0 && len(o.Protocols) == 0 {
o.Protocols = DefaultProtocols
o.ClientProtocols = DefaultClientProtocols
}

// If no client protocols set, use the server protocols as client protocols
if len(o.ClientProtocols) == 0 {
o.ClientProtocols = append(o.Protocols[:0:0], o.Protocols...)
}

if o.DisjointPaths == 0 {
o.DisjointPaths = o.BucketSize / 2
}

return nil
}

// RoutingTableLatencyTolerance sets the maximum acceptable latency for peers
// in the routing table's cluster.
func RoutingTableLatencyTolerance(latency time.Duration) Option {
Expand Down Expand Up @@ -180,7 +201,7 @@ func NamespacedValidator(ns string, v record.Validator) Option {
}
}

// Protocols sets the protocols for the DHT
// Protocols sets the protocols the DHT may respond to queries with
//
// Defaults to dht.DefaultProtocols
func Protocols(protocols ...protocol.ID) Option {
Expand All @@ -190,6 +211,17 @@ func Protocols(protocols ...protocol.ID) Option {
}
}

// ClientProtocols sets the protocols the DHT uses to initiate connections and queries
//
// Defaults to dht.DefaultClientProtocols. If Protocols have been set the to the non-default protocols,
// then ClientProtocols defaults to match Protocols.
func ClientProtocols(protocols ...protocol.ID) Option {
return func(o *Options) error {
o.ClientProtocols = protocols
return nil
}
}

// BucketSize configures the bucket size (k in the Kademlia paper) of the routing table.
//
// The default value is 20.
Expand Down
8 changes: 4 additions & 4 deletions subscriber_notifee.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func newSubscriberNotifiee(dht *IpfsDHT) (*subscriberNotifee, error) {
dht.plk.Lock()
defer dht.plk.Unlock()
for _, p := range dht.host.Network().Peers() {
protos, err := dht.peerstore.SupportsProtocols(p, dht.protocolStrs()...)
protos, err := dht.peerstore.SupportsProtocols(p, dht.clientProtocolStrs()...)
if err != nil {
return nil, fmt.Errorf("could not check peerstore for protocol support: err: %s", err)
}
Expand Down Expand Up @@ -124,7 +124,7 @@ func handlePeerIdentificationCompletedEvent(dht *IpfsDHT, evt interface{}) {
}

// if the peer supports the DHT protocol, add it to our RT and kick a refresh if needed
protos, err := dht.peerstore.SupportsProtocols(e.Peer, dht.protocolStrs()...)
protos, err := dht.peerstore.SupportsProtocols(e.Peer, dht.clientProtocolStrs()...)
if err != nil {
logger.Errorf("could not check peerstore for protocol support: err: %s", err)
return
Expand All @@ -143,7 +143,7 @@ func handlePeerProtocolsUpdatedEvent(dht *IpfsDHT, evt interface{}) {
return
}

protos, err := dht.peerstore.SupportsProtocols(e.Peer, dht.protocolStrs()...)
protos, err := dht.peerstore.SupportsProtocols(e.Peer, dht.clientProtocolStrs()...)
if err != nil {
logger.Errorf("could not check peerstore for protocol support: err: %s", err)
return
Expand Down Expand Up @@ -195,7 +195,7 @@ func fixLowPeers(dht *IpfsDHT) {
// Passively add peers we already know about
for _, p := range dht.host.Network().Peers() {
// Don't bother probing, we do that on connect.
protos, err := dht.peerstore.SupportsProtocols(p, dht.protocolStrs()...)
protos, err := dht.peerstore.SupportsProtocols(p, dht.clientProtocolStrs()...)
if err == nil && len(protos) != 0 {
dht.Update(dht.Context(), p)
}
Expand Down

0 comments on commit ef33c79

Please sign in to comment.