Skip to content

Commit

Permalink
feat: shared cache for manual peering (#114)
Browse files Browse the repository at this point in the history
  • Loading branch information
hacdias authored Apr 22, 2024
1 parent c45ce99 commit 1710b56
Show file tree
Hide file tree
Showing 8 changed files with 412 additions and 87 deletions.
29 changes: 29 additions & 0 deletions docs/environment-variables.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
- [`RAINBOW_GC_INTERVAL`](#rainbow_gc_interval)
- [`RAINBOW_GC_THRESHOLD`](#rainbow_gc_threshold)
- [`RAINBOW_IPNS_MAX_CACHE_TTL`](#rainbow_ipns_max_cache_ttl)
- [`RAINBOW_PEERING`](#rainbow_peering)
- [`RAINBOW_PEERING_SHARED_CACHE`](#rainbow_peering_shared_cache)
- [Logging](#logging)
- [`GOLOG_LOG_LEVEL`](#golog_log_level)
- [`GOLOG_LOG_FMT`](#golog_log_fmt)
Expand Down Expand Up @@ -90,6 +92,33 @@ with [DNSLink](https://dnslink.dev/).

Default: No upper bound, [TTL from IPNS Record](https://specs.ipfs.tech/ipns/ipns-record/#ttl-uint64) or [TTL from DNSLink](https://datatracker.ietf.org/doc/html/rfc2181#section-8) used as-is.

### `RAINBOW_PEERING`

A comma-separated list of [multiaddresses](https://docs.libp2p.io/concepts/fundamentals/addressing/) of peers to stay connected to.


If `RAINBOW_SEED` is set and `/p2p/rainbow-seed/N` value is found here, Rainbow
will replace it with a valid `/p2p/` for a peer ID generated from same seed
and index `N`.

Default: not set (no peering)


### `RAINBOW_PEERING_SHARED_CACHE`

Enable sharing of local cache to peers safe-listed with `RAINBOW_PEERING`.

Once enabled, Rainbow will respond to [Bitswap](https://docs.ipfs.tech/concepts/bitswap/)
queries from these safelisted peers, serving locally cached blocks if requested.

The main use case for this feature is scaling and load balancing acrosss a
fleet of rainbow, or other bitswap-capable IPFS services. Cache sharing allows
clustered services to check if any of the other instances has a requested CID.
This saves resources as data cached on other instance can be fetched internally
(e.g. LAN) rather than externally (WAN, p2p).

Default: `false` (no cache sharing)

## Logging

### `GOLOG_LOG_LEVEL`
Expand Down
4 changes: 3 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ require (
github.com/ipfs-shipyard/nopfs v0.0.12
github.com/ipfs-shipyard/nopfs/ipfs v0.13.2-0.20231024163508-120e0c51ee3a
github.com/ipfs/boxo v0.19.1-0.20240418055150-eeea41458735
github.com/ipfs/go-block-format v0.2.0
github.com/ipfs/go-cid v0.4.1
github.com/ipfs/go-datastore v0.6.0
github.com/ipfs/go-ds-badger4 v0.1.5
Expand Down Expand Up @@ -90,18 +91,19 @@ require (
github.com/huin/goupnp v1.3.0 // indirect
github.com/ipfs/bbloom v0.0.4 // indirect
github.com/ipfs/go-bitfield v1.1.0 // indirect
github.com/ipfs/go-block-format v0.2.0 // indirect
github.com/ipfs/go-blockservice v0.5.2 // indirect
github.com/ipfs/go-ipfs-blockstore v1.3.1 // indirect
github.com/ipfs/go-ipfs-ds-help v1.1.1 // indirect
github.com/ipfs/go-ipfs-exchange-interface v0.2.1 // indirect
github.com/ipfs/go-ipfs-pq v0.0.3 // indirect
github.com/ipfs/go-ipfs-redirects-file v0.1.1 // indirect
github.com/ipfs/go-ipfs-util v0.0.3 // indirect
github.com/ipfs/go-ipld-cbor v0.1.0 // indirect
github.com/ipfs/go-ipld-format v0.6.0 // indirect
github.com/ipfs/go-ipld-legacy v0.2.1 // indirect
github.com/ipfs/go-log v1.0.5 // indirect
github.com/ipfs/go-merkledag v0.11.0 // indirect
github.com/ipfs/go-peertaskqueue v0.8.1 // indirect
github.com/ipfs/go-verifcid v0.0.3 // indirect
github.com/ipld/go-car v0.6.2 // indirect
github.com/ipld/go-car/v2 v2.13.1 // indirect
Expand Down
71 changes: 0 additions & 71 deletions handler_test.go
Original file line number Diff line number Diff line change
@@ -1,84 +1,13 @@
package main

import (
"bytes"
"context"
"net/http"
"net/http/httptest"
"testing"

chunker "github.com/ipfs/boxo/chunker"
"github.com/ipfs/boxo/ipld/merkledag"
"github.com/ipfs/boxo/ipld/unixfs/importer/balanced"
uih "github.com/ipfs/boxo/ipld/unixfs/importer/helpers"
util "github.com/ipfs/boxo/util"
"github.com/ipfs/go-cid"
ic "github.com/libp2p/go-libp2p/core/crypto"
"github.com/multiformats/go-multicodec"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func mustTestNode(t *testing.T, cfg Config) *Node {
cfg.DataDir = t.TempDir()
cfg.BlockstoreType = "flatfs"
cfg.DHTRouting = DHTStandard
cfg.RoutingV1Endpoints = []string{cidContactEndpoint}

ctx := context.Background()

sr := util.NewTimeSeededRand()
sk, _, err := ic.GenerateKeyPairWithReader(ic.Ed25519, 2048, sr)
require.NoError(t, err)

cdns := newCachedDNS(dnsCacheRefreshInterval)

t.Cleanup(func() {
_ = cdns.Close()
})

gnd, err := Setup(ctx, cfg, sk, cdns)
require.NoError(t, err)
return gnd
}

func mustTestServer(t *testing.T, cfg Config) (*httptest.Server, *Node) {
gnd := mustTestNode(t, cfg)

handler, err := setupGatewayHandler(cfg, gnd)
if err != nil {
require.NoError(t, err)
}

ts := httptest.NewServer(handler)

return ts, gnd
}

func mustAddFile(t *testing.T, gnd *Node, content []byte) cid.Cid {
dsrv := merkledag.NewDAGService(gnd.bsrv)

// Create a UnixFS graph from our file, parameters described here but can be visualized at https://dag.ipfs.tech/
ufsImportParams := uih.DagBuilderParams{
Maxlinks: uih.DefaultLinksPerBlock, // Default max of 174 links per block
RawLeaves: true, // Leave the actual file bytes untouched instead of wrapping them in a dag-pb protobuf wrapper
CidBuilder: cid.V1Builder{ // Use CIDv1 for all links
Codec: uint64(multicodec.DagPb),
MhType: uint64(multicodec.Sha2_256), // Use SHA2-256 as the hash function
MhLength: -1, // Use the default hash length for the given hash function (in this case 256 bits)
},
Dagserv: dsrv,
NoCopy: false,
}
ufsBuilder, err := ufsImportParams.New(chunker.NewSizeSplitter(bytes.NewReader(content), chunker.DefaultBlockSize)) // Split the file up into fixed sized 256KiB chunks
require.NoError(t, err)

nd, err := balanced.Layout(ufsBuilder) // Arrange the graph with a balanced layout
require.NoError(t, err)

return nd.Cid()
}

func TestTrustless(t *testing.T) {
t.Parallel()

Expand Down
5 changes: 5 additions & 0 deletions keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
crand "crypto/rand"
"crypto/sha256"
"errors"
"fmt"
"io"

libp2p "github.com/libp2p/go-libp2p/core/crypto"
Expand Down Expand Up @@ -43,3 +44,7 @@ func deriveKey(b58secret string, info []byte) (libp2p.PrivKey, error) {
key := ed25519.NewKeyFromSeed(keySeed)
return libp2p.UnmarshalEd25519PrivateKey(key)
}

func deriveKeyInfo(index int) []byte {
return []byte(fmt.Sprintf("rainbow-%d", index))
}
51 changes: 48 additions & 3 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ import (
"os"
"os/signal"
"path/filepath"
"regexp"
"runtime"
"strconv"
"strings"
"sync"
"syscall"
Expand Down Expand Up @@ -205,6 +207,12 @@ Generate an identity seed and launch a gateway:
EnvVars: []string{"RAINBOW_PEERING"},
Usage: "Multiaddresses of peers to stay connected to (comma-separated)",
},
&cli.BoolFlag{
Name: "peering-shared-cache",
Value: false,
EnvVars: []string{"RAINBOW_PEERING_SHARED_CACHE"},
Usage: "Enable sharing of local cache to peers safe-listed with --peering. Rainbow will respond to Bitswap queries from these peers, serving locally cached data as needed.",
},
&cli.StringFlag{
Name: "blockstore",
Value: "flatfs",
Expand Down Expand Up @@ -250,6 +258,8 @@ share the same seed as long as the indexes are different.
}

app.Action = func(cctx *cli.Context) error {
fmt.Printf("Starting %s %s\n", name, version)

ddir := cctx.String("datadir")
cdns := newCachedDNS(dnsCacheRefreshInterval)
defer cdns.Close()
Expand Down Expand Up @@ -280,8 +290,8 @@ share the same seed as long as the indexes are different.

index := cctx.Int("seed-index")
if len(seed) > 0 && index >= 0 {
fmt.Println("Deriving identity from seed")
priv, err = deriveKey(seed, []byte(fmt.Sprintf("rainbow-%d", index)))
fmt.Printf("Deriving identity from seed[%d]\n", index)
priv, err = deriveKey(seed, deriveKeyInfo(index))
} else {
fmt.Println("Setting identity from libp2p.key")
keyFile := filepath.Join(secretsDir, "libp2p.key")
Expand All @@ -293,6 +303,15 @@ share the same seed as long as the indexes are different.

var peeringAddrs []peer.AddrInfo
for _, maStr := range cctx.StringSlice("peering") {
if len(seed) > 0 && index >= 0 {
maStr, err = replaceRainbowSeedWithPeer(maStr, seed)
if err != nil {
return err
}
} else if rainbowSeedRegex.MatchString(maStr) {
return fmt.Errorf("unable to peer with %q without defining --seed-index of this instance first", maStr)
}

ai, err := peer.AddrInfoFromString(maStr)
if err != nil {
return err
Expand All @@ -318,6 +337,7 @@ share the same seed as long as the indexes are different.
IpnsMaxCacheTTL: cctx.Duration("ipns-max-cache-ttl"),
DenylistSubs: cctx.StringSlice("denylists"),
Peering: peeringAddrs,
PeeringCache: cctx.Bool("peering-shared-cache"),
GCInterval: cctx.Duration("gc-interval"),
GCThreshold: cctx.Float64("gc-threshold"),
}
Expand All @@ -342,7 +362,6 @@ share the same seed as long as the indexes are different.
Handler: handler,
}

fmt.Printf("Starting %s %s\n", name, version)
pid, err := peer.IDFromPublicKey(priv.GetPublic())
if err != nil {
return err
Expand Down Expand Up @@ -484,3 +503,29 @@ func printIfListConfigured(message string, list []string) {
fmt.Printf(message+"%v\n", strings.Join(list, ", "))
}
}

var rainbowSeedRegex = regexp.MustCompile(`/p2p/rainbow-seed/(\d+)`)

func replaceRainbowSeedWithPeer(addr string, seed string) (string, error) {
match := rainbowSeedRegex.FindStringSubmatch(addr)
if len(match) != 2 {
return addr, nil
}

index, err := strconv.Atoi(match[1])
if err != nil {
return "", err
}

priv, err := deriveKey(seed, deriveKeyInfo(index))
if err != nil {
return "", err
}

pid, err := peer.IDFromPublicKey(priv.GetPublic())
if err != nil {
return "", err
}

return strings.Replace(addr, match[0], "/p2p/"+pid.String(), 1), nil
}
Loading

0 comments on commit 1710b56

Please sign in to comment.