From 52747fc1f0255d0669883402f1cd25fb76d80973 Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Wed, 30 Oct 2019 17:36:30 -0700 Subject: [PATCH 1/5] feat: allow disabling value and provider storage/messages fixes #274 --- dht.go | 4 +++ dht_test.go | 66 ++++++++++++++++++++++++++++++++++++++++++++++--- handlers.go | 30 ++++++++++++++-------- opts/options.go | 36 ++++++++++++++++++++++----- records_test.go | 65 ++++++++++++++++++++++++++++++++++++++++++++++++ routing.go | 27 +++++++++++++++++++- 6 files changed, 207 insertions(+), 21 deletions(-) diff --git a/dht.go b/dht.go index 7494bd453..568965206 100644 --- a/dht.go +++ b/dht.go @@ -73,6 +73,8 @@ type IpfsDHT struct { triggerRtRefresh chan struct{} maxRecordAge time.Duration + + enableProviders, enableValues bool } // Assert that IPFS assumptions about interfaces aren't broken. These aren't a @@ -98,6 +100,8 @@ func New(ctx context.Context, h host.Host, options ...opts.Option) (*IpfsDHT, er dht.rtRefreshQueryTimeout = cfg.RoutingTable.RefreshQueryTimeout dht.maxRecordAge = cfg.MaxRecordAge + dht.enableProviders = cfg.EnableProviders + dht.enableValues = cfg.EnableValues // register for network notifs. dht.host.Network().Notify((*netNotifiee)(dht)) diff --git a/dht_test.go b/dht_test.go index b15e5b0cf..4e5d1ba29 100644 --- a/dht_test.go +++ b/dht_test.go @@ -107,13 +107,15 @@ func (testAtomicPutValidator) Select(_ string, bs [][]byte) (int, error) { return index, nil } -func setupDHT(ctx context.Context, t *testing.T, client bool) *IpfsDHT { +func setupDHT(ctx context.Context, t *testing.T, client bool, options ...opts.Option) *IpfsDHT { d, err := New( ctx, bhost.New(swarmt.GenSwarm(t, ctx, swarmt.OptDisableReuseport)), - opts.Client(client), - opts.NamespacedValidator("v", blankValidator{}), - opts.DisableAutoRefresh(), + append([]opts.Option{ + opts.Client(client), + opts.NamespacedValidator("v", blankValidator{}), + opts.DisableAutoRefresh(), + }, options...)..., ) if err != nil { t.Fatal(err) @@ -1419,6 +1421,62 @@ func TestFindClosestPeers(t *testing.T) { } } +func TestProvideDisabled(t *testing.T) { + k := testCaseCids[0] + for i := 0; i < 3; i++ { + enabledA := (i & 0x1) > 0 + enabledB := (i & 0x2) > 0 + t.Run(fmt.Sprintf("a=%v/b=%v", enabledA, enabledB), func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + dhtA := setupDHT(ctx, t, false, opts.EnableProviders(enabledA)) + dhtB := setupDHT(ctx, t, false, opts.EnableProviders(enabledB)) + + defer dhtA.Close() + defer dhtB.Close() + defer dhtA.host.Close() + defer dhtB.host.Close() + + connect(t, ctx, dhtA, dhtB) + + err := dhtB.Provide(ctx, k, true) + if enabledB { + if err != nil { + t.Fatal("put should have succeeded on node B", err) + } + } else { + if err != routing.ErrNotSupported { + t.Fatal("should not have put the value to node B", err) + } + _, err = dhtB.FindProviders(ctx, k) + if err != routing.ErrNotSupported { + t.Fatal("get should have failed on node B") + } + provs := dhtB.providers.GetProviders(ctx, k) + if len(provs) != 0 { + t.Fatal("node B should not have found local providers") + } + } + + provs, err := dhtA.FindProviders(ctx, k) + if enabledA { + if len(provs) != 0 { + t.Fatal("node A should not have found providers") + } + } else { + if err != routing.ErrNotSupported { + t.Fatal("node A should not have found providers") + } + } + provAddrs := dhtA.providers.GetProviders(ctx, k) + if len(provAddrs) != 0 { + t.Fatal("node A should not have found local providers") + } + }) + } +} + func TestGetSetPluggedProtocol(t *testing.T) { t.Run("PutValue/GetValue - same protocol", func(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) diff --git a/handlers.go b/handlers.go index 5ec03b8c7..23292ce80 100644 --- a/handlers.go +++ b/handlers.go @@ -26,21 +26,31 @@ type dhtHandler func(context.Context, peer.ID, *pb.Message) (*pb.Message, error) func (dht *IpfsDHT) handlerForMsgType(t pb.Message_MessageType) dhtHandler { switch t { - case pb.Message_GET_VALUE: - return dht.handleGetValue - case pb.Message_PUT_VALUE: - return dht.handlePutValue case pb.Message_FIND_NODE: return dht.handleFindPeer - case pb.Message_ADD_PROVIDER: - return dht.handleAddProvider - case pb.Message_GET_PROVIDERS: - return dht.handleGetProviders case pb.Message_PING: return dht.handlePing - default: - return nil } + + if dht.enableValues { + switch t { + case pb.Message_GET_VALUE: + return dht.handleGetValue + case pb.Message_PUT_VALUE: + return dht.handlePutValue + } + } + + if dht.enableProviders { + switch t { + case pb.Message_ADD_PROVIDER: + return dht.handleAddProvider + case pb.Message_GET_PROVIDERS: + return dht.handleGetProviders + } + } + + return nil } func (dht *IpfsDHT) handleGetValue(ctx context.Context, p peer.ID, pmes *pb.Message) (_ *pb.Message, err error) { diff --git a/opts/options.go b/opts/options.go index e0e524c17..5bdf733dc 100644 --- a/opts/options.go +++ b/opts/options.go @@ -21,12 +21,14 @@ var ( // Options is a structure containing all the options that can be used when constructing a DHT. type Options struct { - Datastore ds.Batching - Validator record.Validator - Client bool - Protocols []protocol.ID - BucketSize int - MaxRecordAge time.Duration + Datastore ds.Batching + Validator record.Validator + Client bool + Protocols []protocol.ID + BucketSize int + MaxRecordAge time.Duration + EnableProviders bool + EnableValues bool RoutingTable struct { RefreshQueryTimeout time.Duration @@ -56,6 +58,8 @@ var Defaults = func(o *Options) error { } o.Datastore = dssync.MutexWrap(ds.NewMapDatastore()) o.Protocols = DefaultProtocols + o.EnableProviders = true + o.EnableValues = true o.RoutingTable.RefreshQueryTimeout = 10 * time.Second o.RoutingTable.RefreshPeriod = 1 * time.Hour @@ -177,3 +181,23 @@ func DisableAutoRefresh() Option { return nil } } + +// EnableProviders enables storing and retrieving provider records. +// +// Defaults to true. +func EnableProviders(enable bool) Option { + return func(o *Options) error { + o.EnableProviders = enable + return nil + } +} + +// EnableValues enables storing and retrieving value records. +// +// Defaults to true. +func EnableValues(enable bool) Option { + return func(o *Options) error { + o.EnableValues = enable + return nil + } +} diff --git a/records_test.go b/records_test.go index 7083dcdd9..2594b0516 100644 --- a/records_test.go +++ b/records_test.go @@ -3,6 +3,7 @@ package dht import ( "context" "crypto/rand" + "fmt" "github.com/libp2p/go-libp2p-core/test" "testing" "time" @@ -13,6 +14,8 @@ import ( "github.com/libp2p/go-libp2p-core/routing" record "github.com/libp2p/go-libp2p-record" tnet "github.com/libp2p/go-libp2p-testing/net" + + dhtopt "github.com/libp2p/go-libp2p-kad-dht/opts" ) // Check that GetPublicKey() correctly extracts a public key @@ -305,3 +308,65 @@ func TestPubkeyGoodKeyFromDHTGoodKeyDirect(t *testing.T) { t.Fatal("got incorrect public key") } } + +func TestValuesDisabled(t *testing.T) { + for i := 0; i < 3; i++ { + enabledA := (i & 0x1) > 0 + enabledB := (i & 0x2) > 0 + t.Run(fmt.Sprintf("a=%v/b=%v", enabledA, enabledB), func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + dhtA := setupDHT(ctx, t, false, dhtopt.EnableValues(enabledA)) + dhtB := setupDHT(ctx, t, false, dhtopt.EnableValues(enabledB)) + + defer dhtA.Close() + defer dhtB.Close() + defer dhtA.host.Close() + defer dhtB.host.Close() + + connect(t, ctx, dhtA, dhtB) + + pubk := dhtB.peerstore.PubKey(dhtB.self) + pkbytes, err := pubk.Bytes() + if err != nil { + t.Fatal(err) + } + + pkkey := routing.KeyForPublicKey(dhtB.self) + err = dhtB.PutValue(ctx, pkkey, pkbytes) + if enabledB { + if err != nil { + t.Fatal("put should have succeeded on node B", err) + } + } else { + if err != routing.ErrNotSupported { + t.Fatal("should not have put the value to node B", err) + } + _, err = dhtB.GetValue(ctx, pkkey) + if err != routing.ErrNotSupported { + t.Fatal("get should have failed on node B") + } + rec, _ := dhtB.getLocal(pkkey) + if rec != nil { + t.Fatal("node B should not have found the value locally") + } + } + + _, err = dhtA.GetValue(ctx, pkkey) + if enabledA { + if err != routing.ErrNotFound { + t.Fatal("node A should not have found the value") + } + } else { + if err != routing.ErrNotSupported { + t.Fatal("node A should not have found the value") + } + } + rec, _ := dhtA.getLocal(pkkey) + if rec != nil { + t.Fatal("node A should not have found the value locally") + } + }) + } +} diff --git a/routing.go b/routing.go index 1cfe293de..a4beadf95 100644 --- a/routing.go +++ b/routing.go @@ -34,6 +34,10 @@ var asyncQueryBuffer = 10 // PutValue adds value corresponding to given Key. // This is the top level "Store" operation of the DHT func (dht *IpfsDHT) PutValue(ctx context.Context, key string, value []byte, opts ...routing.Option) (err error) { + if !dht.enableValues { + return routing.ErrNotSupported + } + eip := logger.EventBegin(ctx, "PutValue") defer func() { eip.Append(loggableKey(key)) @@ -110,6 +114,10 @@ type RecvdVal struct { // GetValue searches for the value corresponding to given Key. func (dht *IpfsDHT) GetValue(ctx context.Context, key string, opts ...routing.Option) (_ []byte, err error) { + if !dht.enableValues { + return nil, routing.ErrNotSupported + } + eip := logger.EventBegin(ctx, "GetValue") defer func() { eip.Append(loggableKey(key)) @@ -148,6 +156,10 @@ func (dht *IpfsDHT) GetValue(ctx context.Context, key string, opts ...routing.Op } func (dht *IpfsDHT) SearchValue(ctx context.Context, key string, opts ...routing.Option) (<-chan []byte, error) { + if !dht.enableValues { + return nil, routing.ErrNotSupported + } + var cfg routing.Options if err := cfg.Apply(opts...); err != nil { return nil, err @@ -250,8 +262,11 @@ func (dht *IpfsDHT) SearchValue(ctx context.Context, key string, opts ...routing // GetValues gets nvals values corresponding to the given key. func (dht *IpfsDHT) GetValues(ctx context.Context, key string, nvals int) (_ []RecvdVal, err error) { - eip := logger.EventBegin(ctx, "GetValues") + if !dht.enableValues { + return nil, routing.ErrNotSupported + } + eip := logger.EventBegin(ctx, "GetValues") eip.Append(loggableKey(key)) defer eip.Done() @@ -398,6 +413,9 @@ func (dht *IpfsDHT) getValues(ctx context.Context, key string, nvals int) (<-cha // Provide makes this node announce that it can provide a value for the given key func (dht *IpfsDHT) Provide(ctx context.Context, key cid.Cid, brdcst bool) (err error) { + if !dht.enableProviders { + return routing.ErrNotSupported + } eip := logger.EventBegin(ctx, "Provide", key, logging.LoggableMap{"broadcast": brdcst}) defer func() { if err != nil { @@ -477,6 +495,9 @@ func (dht *IpfsDHT) makeProvRecord(skey cid.Cid) (*pb.Message, error) { // FindProviders searches until the context expires. func (dht *IpfsDHT) FindProviders(ctx context.Context, c cid.Cid) ([]peer.AddrInfo, error) { + if !dht.enableProviders { + return nil, routing.ErrNotSupported + } var providers []peer.AddrInfo for p := range dht.FindProvidersAsync(ctx, c, dht.bucketSize) { providers = append(providers, p) @@ -488,6 +509,10 @@ func (dht *IpfsDHT) FindProviders(ctx context.Context, c cid.Cid) ([]peer.AddrIn // Peers will be returned on the channel as soon as they are found, even before // the search query completes. func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key cid.Cid, count int) <-chan peer.AddrInfo { + if !dht.enableProviders { + return nil + } + logger.Event(ctx, "findProviders", key) peerOut := make(chan peer.AddrInfo, count) From c2b72b250d76684d9ef7c003f95e33f8dc7ed59e Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Thu, 5 Dec 2019 19:08:18 -0500 Subject: [PATCH 2/5] doc(options): document that disabling values/providers should only be done on forked dhts --- dht.go | 3 +++ opts/options.go | 6 ++++++ 2 files changed, 9 insertions(+) diff --git a/dht.go b/dht.go index 568965206..de1377ecc 100644 --- a/dht.go +++ b/dht.go @@ -74,6 +74,9 @@ type IpfsDHT struct { maxRecordAge time.Duration + // Allows disabling dht subsystems. These should _only_ be set on + // "forked" DHTs (e.g., DHTs with custom protocols and/or private + // networks). enableProviders, enableValues bool } diff --git a/opts/options.go b/opts/options.go index 5bdf733dc..74529d290 100644 --- a/opts/options.go +++ b/opts/options.go @@ -185,6 +185,9 @@ func DisableAutoRefresh() Option { // EnableProviders enables storing and retrieving provider records. // // Defaults to true. +// +// WARNING: do not change this unless you're using a forked DHT (i.e., a private +// network and/or distinct DHT protocols with the `Protocols` option). func EnableProviders(enable bool) Option { return func(o *Options) error { o.EnableProviders = enable @@ -195,6 +198,9 @@ func EnableProviders(enable bool) Option { // EnableValues enables storing and retrieving value records. // // Defaults to true. +// +// WARNING: do not change this unless you're using a forked DHT (i.e., a private +// network and/or distinct DHT protocols with the `Protocols` option). func EnableValues(enable bool) Option { return func(o *Options) error { o.EnableValues = enable From ba86f518847c50fb689450691f52d180883af3a9 Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Fri, 6 Dec 2019 09:27:09 -0500 Subject: [PATCH 3/5] fix: return a closed channel from FindProvidersAsync when providers are disabled. --- routing.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/routing.go b/routing.go index a4beadf95..a796f3e6c 100644 --- a/routing.go +++ b/routing.go @@ -509,12 +509,13 @@ func (dht *IpfsDHT) FindProviders(ctx context.Context, c cid.Cid) ([]peer.AddrIn // Peers will be returned on the channel as soon as they are found, even before // the search query completes. func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key cid.Cid, count int) <-chan peer.AddrInfo { + peerOut := make(chan peer.AddrInfo, count) if !dht.enableProviders { - return nil + close(peerOut) + return peerOut } logger.Event(ctx, "findProviders", key) - peerOut := make(chan peer.AddrInfo, count) go dht.findProvidersAsyncRoutine(ctx, key, count, peerOut) return peerOut From 2a39785d3f2bf8a686066c44b1e2ca3a6cc7b85d Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Fri, 6 Dec 2019 09:47:22 -0500 Subject: [PATCH 4/5] fix(options): make the disable providers/values options consistent --- dht_test.go | 14 ++++++++++++-- opts/options.go | 16 ++++++++-------- records_test.go | 14 ++++++++++++-- 3 files changed, 32 insertions(+), 12 deletions(-) diff --git a/dht_test.go b/dht_test.go index 4e5d1ba29..5aa9182c6 100644 --- a/dht_test.go +++ b/dht_test.go @@ -1430,8 +1430,18 @@ func TestProvideDisabled(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - dhtA := setupDHT(ctx, t, false, opts.EnableProviders(enabledA)) - dhtB := setupDHT(ctx, t, false, opts.EnableProviders(enabledB)) + var ( + optsA, optsB []opts.Option + ) + if !enabledA { + optsA = append(optsA, opts.DisableProviders()) + } + if !enabledB { + optsB = append(optsB, opts.DisableProviders()) + } + + dhtA := setupDHT(ctx, t, false, optsA...) + dhtB := setupDHT(ctx, t, false, optsB...) defer dhtA.Close() defer dhtB.Close() diff --git a/opts/options.go b/opts/options.go index 74529d290..0920ed118 100644 --- a/opts/options.go +++ b/opts/options.go @@ -182,28 +182,28 @@ func DisableAutoRefresh() Option { } } -// EnableProviders enables storing and retrieving provider records. +// DisableProviders disables storing and retrieving provider records. // -// Defaults to true. +// Defaults to enabled. // // WARNING: do not change this unless you're using a forked DHT (i.e., a private // network and/or distinct DHT protocols with the `Protocols` option). -func EnableProviders(enable bool) Option { +func DisableProviders() Option { return func(o *Options) error { - o.EnableProviders = enable + o.EnableProviders = false return nil } } -// EnableValues enables storing and retrieving value records. +// DisableProviders disables storing and retrieving value records. // -// Defaults to true. +// Defaults to enabled. // // WARNING: do not change this unless you're using a forked DHT (i.e., a private // network and/or distinct DHT protocols with the `Protocols` option). -func EnableValues(enable bool) Option { +func DisableValues() Option { return func(o *Options) error { - o.EnableValues = enable + o.EnableValues = false return nil } } diff --git a/records_test.go b/records_test.go index 2594b0516..458de2b91 100644 --- a/records_test.go +++ b/records_test.go @@ -317,8 +317,18 @@ func TestValuesDisabled(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - dhtA := setupDHT(ctx, t, false, dhtopt.EnableValues(enabledA)) - dhtB := setupDHT(ctx, t, false, dhtopt.EnableValues(enabledB)) + var ( + optsA, optsB []dhtopt.Option + ) + if !enabledA { + optsA = append(optsA, dhtopt.DisableValues()) + } + if !enabledB { + optsB = append(optsB, dhtopt.DisableValues()) + } + + dhtA := setupDHT(ctx, t, false, optsA...) + dhtB := setupDHT(ctx, t, false, optsB...) defer dhtA.Close() defer dhtB.Close() From 5a048eaa57b21b8d97f337e40fd13be32e1683bd Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Fri, 6 Dec 2019 21:44:59 -0500 Subject: [PATCH 5/5] docs(options): document that DisableValues disables retrieving public keys --- opts/options.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/opts/options.go b/opts/options.go index 0920ed118..70d6db069 100644 --- a/opts/options.go +++ b/opts/options.go @@ -195,7 +195,8 @@ func DisableProviders() Option { } } -// DisableProviders disables storing and retrieving value records. +// DisableProviders disables storing and retrieving value records (including +// public keys). // // Defaults to enabled. //