Skip to content

Commit

Permalink
feat!: (dis)(e)nable DHT/Routing V1 endpoints independently
Browse files Browse the repository at this point in the history
allows to use both routing v1 and dht at the same time without having to compromise between them. Introduces new option (--dht-routing, enabled by default) and replaces --routing with --routing-v1-endpoints (allows more than one endpoint, defaults to cid.contact as before).

--seeds-peering automatically enables --dht-routing
  • Loading branch information
hacdias committed Apr 5, 2024
1 parent e0ac552 commit 347dedf
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 46 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/gateway-conformance.yml
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ jobs:
kuboNodeMultiaddr=$(ipfs --api=/ip4/127.0.0.1/tcp/5001 swarm addrs local --id | head -n 1)
# run gw
./rainbow --routing=http://127.0.0.1:8080 --peering=$kuboNodeMultiaddr &
./rainbow --routing-v1-endpoints=http://127.0.0.1:8080 --peering=$kuboNodeMultiaddr &
working-directory: rainbow

# 6. Run the gateway-conformance tests
Expand Down
2 changes: 2 additions & 0 deletions handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
func mustTestNode(t *testing.T, cfg Config) *Node {
cfg.DataDir = t.TempDir()
cfg.BlockstoreType = "flatfs"
cfg.DHTRouting = true
cfg.RoutingV1Endpoints = []string{cidContactEndpoint}

ctx := context.Background()

Expand Down
36 changes: 22 additions & 14 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,13 +93,13 @@ Generate an identity seed and launch a gateway:
Name: "seeds-peering",
Value: false,
EnvVars: []string{"RAINBOW_SEEDS_PEERING"},
Usage: "TODO (needs --seed and --seed-index). Automatically enables --dht-shared-host, incompatible with --routing",
Usage: "Automatic peering with peers with the same seed (requires --seed and --seed-index). Automatically enables --dht-routing and --dht-shared-host",
},
&cli.UintFlag{
Name: "seeds-peering-max-index",
Value: 100,
EnvVars: []string{"RAINBOW_SEEDS_PEERING_MAX_INDEX"},
Usage: "TODO",
Usage: "Largest index to derive automatic peering peer IDs for",
},
&cli.StringSliceFlag{
Name: "gateway-domains",
Expand Down Expand Up @@ -179,10 +179,17 @@ Generate an identity seed and launch a gateway:
EnvVars: []string{"RAINBOW_MAX_FD"},
Usage: "Maximum number of file descriptors. Defaults to 50% of the process' limit",
},
&cli.StringFlag{
Name: "routing",
Value: "",
Usage: "RoutingV1 Endpoint (otherwise Amino DHT and cid.contact is used)",
&cli.StringSliceFlag{
Name: "routing-v1-endpoints",
Value: cli.NewStringSlice(cidContactEndpoint),
EnvVars: []string{"RAINBOW_ROUTING_V1_ENDPOINTS"},
Usage: "Routing V1 endpoints to use for routing (comma-separated)",
},
&cli.BoolFlag{
Name: "dht-routing",
Value: true,
EnvVars: []string{"RAINBOW_DHT_ROUTING"},
Usage: "Use the Amino DHT for routing",
},
&cli.BoolFlag{
Name: "dht-shared-host",
Expand Down Expand Up @@ -308,14 +315,14 @@ share the same seed as long as the indexes are different.
return errors.New("--seed and --seed-index must be explicitly defined when --seeds-peering is enabled")
}

if cctx.String("routing") != "" {
return errors.New("--routing is incompatible with --seeds-peering: DHT needs to run in order to be able to find peers")
// If seeds-peering is enabled, automatically turn on DHT routing, as well
// as sharing the libp2p hosts. This allows the peer information (id and
// addresses) to be discoverable via the DHT without having to create a
// new DHT client instance just for this purpose.
err = cctx.Set("dht-routing", "true")
if err != nil {
return err
}

// If seeds-peering is enabled, automatically use the same libp2p host
// for the DHT. This allows the peer information (id and addresses) to
// be discoverable via the DHT without having to create a new DHT client
// instance just for this purpose.
err = cctx.Set("dht-shared-host", "true")
if err != nil {
return err
Expand Down Expand Up @@ -346,7 +353,8 @@ share the same seed as long as the indexes are different.
MaxMemory: cctx.Uint64("max-memory"),
MaxFD: cctx.Int("max-fd"),
InMemBlockCache: cctx.Int64("inmem-block-cache"),
RoutingV1: cctx.String("routing"),
RoutingV1Endpoints: cctx.StringSlice("routing-v1-endpoints"),
DHTRouting: cctx.Bool("dht-routing"),
DHTSharedHost: cctx.Bool("dht-shared-host"),
AcceleratedDHT: cctx.Bool("accelerated-dht"),
IpnsMaxCacheTTL: cctx.Duration("ipns-max-cache-ttl"),
Expand Down
65 changes: 34 additions & 31 deletions setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func init() {
identify.ActivationThresh = 1
}

const ipniFallbackEndpoint = "https://cid.contact"
const cidContactEndpoint = "https://cid.contact"

type Node struct {
vs routing.ValueStore
Expand Down Expand Up @@ -100,7 +100,8 @@ type Config struct {
GatewayDomains []string
SubdomainGatewayDomains []string
TrustlessGatewayDomains []string
RoutingV1 string
RoutingV1Endpoints []string
DHTRouting bool
DHTSharedHost bool
AcceleratedDHT bool
IpnsMaxCacheTTL time.Duration
Expand Down Expand Up @@ -159,9 +160,7 @@ func Setup(ctx context.Context, cfg Config, key crypto.PrivKey, dnsCache *cached
)
blkst = blockstore.NewIdStore(blkst)

var pr routing.PeerRouting
var vs routing.ValueStore
var cr routing.ContentRouting
var router routing.Routing

// Increase per-host connection pool since we are making lots of concurrent requests.
httpClient := &http.Client{
Expand All @@ -184,17 +183,21 @@ func Setup(ctx context.Context, cfg Config, key crypto.PrivKey, dnsCache *cached
}

opts = append(opts, libp2p.Routing(func(h host.Host) (routing.PeerRouting, error) {
if cfg.RoutingV1 != "" {
routingClient, err := delegatedHTTPContentRouter(cfg.RoutingV1, routingv1client.WithStreamResultsRequired(), routingv1client.WithHTTPClient(httpClient))
var routingV1Routers []routing.Routing
for _, endpoint := range cfg.RoutingV1Endpoints {
rv1Opts := []routingv1client.Option{routingv1client.WithHTTPClient(httpClient)}
if endpoint != cidContactEndpoint {
rv1Opts = append(rv1Opts, routingv1client.WithStreamResultsRequired())
}
httpClient, err := delegatedHTTPContentRouter(endpoint, rv1Opts...)
if err != nil {
return nil, err
}
pr = routingClient
vs = routingClient
cr = routingClient
} else {
// If there are no delegated routing endpoints run an accelerated Amino DHT client and send IPNI requests to cid.contact
routingV1Routers = append(routingV1Routers, httpClient)
}

var dhtRouter routing.Routing
if cfg.DHTRouting {
var dhtHost host.Host
if cfg.DHTSharedHost {
dhtHost = h
Expand All @@ -211,8 +214,6 @@ func Setup(ctx context.Context, cfg Config, key crypto.PrivKey, dnsCache *cached
}
}

var dhtRouter routing.Routing

standardClient, err := dht.New(ctx, dhtHost,
dht.Datastore(ds),
dht.BootstrapPeers(dht.GetDefaultBootstrapPeerAddrInfos()...),
Expand Down Expand Up @@ -243,36 +244,38 @@ func Setup(ctx context.Context, cfg Config, key crypto.PrivKey, dnsCache *cached
} else {
dhtRouter = standardClient
}
}

// we want to also use the default HTTP routers, so wrap the FullRT client
// in a parallel router that calls them in parallel
httpRouters, err := delegatedHTTPContentRouter(ipniFallbackEndpoint, routingv1client.WithHTTPClient(httpClient))
if err != nil {
return nil, err
}
if len(routingV1Routers) == 0 && dhtRouter == nil {
return nil, errors.New("no routers available")
}

if len(routingV1Routers) == 0 {
router = dhtRouter
} else {
routers := []*routinghelpers.ParallelRouter{
{
Router: dhtRouter,
ExecuteAfter: 0,
DoNotWaitForSearchValue: true,
IgnoreError: false,
},
{
}

for _, routingV1Router := range routingV1Routers {
routers = append(routers, &routinghelpers.ParallelRouter{
Timeout: 15 * time.Second,
Router: httpRouters,
Router: routingV1Router,
ExecuteAfter: 0,
DoNotWaitForSearchValue: true,
IgnoreError: true,
},
})
}
router := routinghelpers.NewComposableParallel(routers)

pr = router
vs = router
cr = router
router = routinghelpers.NewComposableParallel(routers)
}

return pr, nil
return router, nil
}))
h, err := libp2p.New(opts...)
if err != nil {
Expand All @@ -291,7 +294,7 @@ func Setup(ctx context.Context, cfg Config, key crypto.PrivKey, dnsCache *cached
}

bsctx := metri.CtxScope(ctx, "ipfs_bitswap")
bn := bsnet.NewFromIpfsHost(h, cr)
bn := bsnet.NewFromIpfsHost(h, router)
bswap := bsclient.New(bsctx, bn, blkst,
// default is 1 minute to search for a random live-want (1
// CID). I think we want to search for random live-wants more
Expand Down Expand Up @@ -346,7 +349,7 @@ func Setup(ctx context.Context, cfg Config, key crypto.PrivKey, dnsCache *cached
if cfg.IpnsMaxCacheTTL > 0 {
nsOptions = append(nsOptions, namesys.WithMaxCacheTTL(cfg.IpnsMaxCacheTTL))
}
ns, err := namesys.NewNameSystem(vs, nsOptions...)
ns, err := namesys.NewNameSystem(router, nsOptions...)
if err != nil {
return nil, err
}
Expand All @@ -366,7 +369,7 @@ func Setup(ctx context.Context, cfg Config, key crypto.PrivKey, dnsCache *cached
bsClient: bswap,
ns: ns,
ps: ps,
vs: vs,
vs: router,
bsrv: bsrv,
resolver: r,
bwc: bwc,
Expand Down
3 changes: 3 additions & 0 deletions setup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
)

func testSeedPeering(t *testing.T, n int) ([]crypto.PrivKey, []peer.ID, []*Node) {
t.Parallel()

cdns := newCachedDNS(dnsCacheRefreshInterval)
defer cdns.Close()

Expand Down Expand Up @@ -39,6 +41,7 @@ func testSeedPeering(t *testing.T, n int) ([]crypto.PrivKey, []peer.ID, []*Node)
BlockstoreType: "flatfs",
AcceleratedDHT: false,
DHTSharedHost: true,
DHTRouting: true,
}

// Add all remaining peers to the peering config.
Expand Down

0 comments on commit 347dedf

Please sign in to comment.