diff --git a/.github/workflows/gateway-conformance.yml b/.github/workflows/gateway-conformance.yml index 724df4a..0ae6141 100644 --- a/.github/workflows/gateway-conformance.yml +++ b/.github/workflows/gateway-conformance.yml @@ -18,13 +18,21 @@ jobs: # 2. Download the gateway-conformance fixtures - name: Download gateway-conformance fixtures - uses: ipfs/gateway-conformance/.github/actions/extract-fixtures@v0.0 + uses: ipfs/gateway-conformance/.github/actions/extract-fixtures@v0.2 with: output: fixtures # 3. Populate the Kubo gateway with the gateway-conformance fixtures - name: Import fixtures - run: find ./fixtures -name '*.car' -exec ipfs dag import --pin-roots=false {} \; + run: | + # Import car files + find ./fixtures -name '*.car' -exec ipfs dag import --pin-roots=false {} \; + + # Import dnslink records + # the IPFS_NS_MAP env will be used by the daemon + export IPFS_NS_MAP=$(cat "./fixtures/dnslinks.json" | jq -r '.subdomains | to_entries | map("\(.key).example.com:\(.value)") | join(",")') + export IPFS_NS_MAP="$(cat "./fixtures/dnslinks.json" | jq -r '.domains | to_entries | map("\(.key):\(.value)") | join(",")'),${IPFS_NS_MAP}" + echo "IPFS_NS_MAP=${IPFS_NS_MAP}" >> $GITHUB_ENV # 4. Build the bifrost-gateway - name: Setup Go @@ -49,13 +57,14 @@ jobs: # 6. Run the gateway-conformance tests - name: Run gateway-conformance tests - uses: ipfs/gateway-conformance/.github/actions/test@v0.0 + uses: ipfs/gateway-conformance/.github/actions/test@v0.2 with: gateway-url: http://127.0.0.1:8081 json: output.json xml: output.xml html: output.html markdown: output.md + specs: +trustless-gateway,-trustless-ipns-gateway,+path-gateway,-path-ipns-gateway,+subdomain-gateway,-subdomain-ipns-gateway,+dnslink-gateway args: -skip 'TestGatewayCar/GET_response_for_application/vnd.ipld.car/Header_Content-Length' # 7. Upload the results diff --git a/.github/workflows/stale.yml b/.github/workflows/stale.yml index 668bd07..16d65d7 100644 --- a/.github/workflows/stale.yml +++ b/.github/workflows/stale.yml @@ -2,7 +2,11 @@ name: Close and mark stale issue on: schedule: - - cron: '0 0 * * *' + - cron: '0 0 * * *' + +permissions: + issues: write + pull-requests: write jobs: stale: diff --git a/Dockerfile b/Dockerfile index 4975cb7..7f42e6e 100644 --- a/Dockerfile +++ b/Dockerfile @@ -21,7 +21,6 @@ RUN --mount=target=. \ CGO_ENABLED=0 GOOS=$TARGETOS GOARCH=$TARGETARCH go build -o $GOPATH/bin/bifrost-gateway #------------------------------------------------------ -#FROM busybox:1.34.1-glibc FROM alpine:3.18 # This runs bifrost-gateway diff --git a/blockstore.go b/blockstore.go index f18dae8..571b436 100644 --- a/blockstore.go +++ b/blockstore.go @@ -70,7 +70,7 @@ func (e *exchangeBsWrapper) Close() error { // gatewayError translates underlying blockstore error into one that gateway code will return as HTTP 502 or 504 // it also makes sure Retry-After hint from remote blockstore will be passed to HTTP client, if present. func gatewayError(err error) error { - if errors.Is(err, &gateway.ErrorResponse{}) || + if errors.Is(err, &gateway.ErrorStatusCode{}) || errors.Is(err, &gateway.ErrorRetryAfter{}) { // already correct error return err diff --git a/blockstore_caboose.go b/blockstore_caboose.go index c62177d..e2bdb29 100644 --- a/blockstore_caboose.go +++ b/blockstore_caboose.go @@ -103,5 +103,7 @@ func newCabooseBlockStore(orchestrator, loggingEndpoint string, cdns *cachedDNS) DoValidation: true, PoolRefresh: caboose.DefaultPoolRefreshInterval, SaturnClient: saturnRetrievalClient, + + ComplianceCidPeriod: int64(1), }) } diff --git a/go.mod b/go.mod index 5d74097..f299d74 100644 --- a/go.mod +++ b/go.mod @@ -4,23 +4,22 @@ go 1.19 require ( github.com/cskr/pubsub v1.0.2 - github.com/filecoin-saturn/caboose v0.0.3-0.20230710211206-784216f946e7 - github.com/gogo/protobuf v1.3.2 + github.com/filecoin-saturn/caboose v0.0.3 github.com/hashicorp/golang-lru/v2 v2.0.1 - github.com/ipfs/boxo v0.8.2-0.20230530204310-0dcbaccc3745 + github.com/ipfs/boxo v0.10.2-0.20230627114119-8424cf4e020a github.com/ipfs/go-block-format v0.1.2 - github.com/ipfs/go-cid v0.4.0 - github.com/ipfs/go-ipld-format v0.4.1-0.20230530195241-c3da01c74a06 + github.com/ipfs/go-cid v0.4.1 + github.com/ipfs/go-ipld-format v0.5.0 github.com/ipfs/go-log/v2 v2.5.1 - github.com/ipfs/go-unixfsnode v1.6.0 + github.com/ipfs/go-unixfsnode v1.7.1 github.com/ipld/go-codec-dagpb v1.6.0 github.com/ipld/go-ipld-prime v0.20.0 github.com/libp2p/go-libp2p v0.26.3 - github.com/libp2p/go-libp2p-kad-dht v0.21.1 - github.com/libp2p/go-libp2p-routing-helpers v0.4.0 + github.com/libp2p/go-libp2p-kad-dht v0.23.0 + github.com/libp2p/go-libp2p-routing-helpers v0.7.0 github.com/mitchellh/go-server-timing v1.0.1 - github.com/multiformats/go-multicodec v0.8.1 - github.com/multiformats/go-multihash v0.2.1 + github.com/multiformats/go-multicodec v0.9.0 + github.com/multiformats/go-multihash v0.2.3 github.com/prometheus/client_golang v1.15.1 github.com/rs/dnscache v0.0.0-20211102005908-e0241e321417 github.com/spf13/cobra v1.6.1 @@ -31,6 +30,7 @@ require ( go.opentelemetry.io/otel/sdk v1.14.0 go.opentelemetry.io/otel/trace v1.14.0 go.uber.org/atomic v1.10.0 + go.uber.org/multierr v1.9.0 go.uber.org/zap v1.24.0 golang.org/x/sync v0.1.0 ) @@ -59,6 +59,7 @@ require ( github.com/go-logr/stdr v1.2.2 // indirect github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0 // indirect github.com/godbus/dbus/v5 v5.1.0 // indirect + github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/gddo v0.0.0-20180823221919-9d8ff1c67be5 // indirect github.com/golang/mock v1.6.0 // indirect github.com/golang/protobuf v1.5.3 // indirect @@ -83,7 +84,6 @@ require ( github.com/ipfs/go-ipfs-util v0.0.2 // indirect github.com/ipfs/go-ipld-cbor v0.0.6 // indirect github.com/ipfs/go-ipld-legacy v0.2.1 // indirect - github.com/ipfs/go-ipns v0.3.0 // indirect github.com/ipfs/go-libipfs v0.6.0 // indirect github.com/ipfs/go-log v1.0.5 // indirect github.com/ipfs/go-merkledag v0.10.1-0.20230601163447-eceea556e7d4 // indirect @@ -121,7 +121,7 @@ require ( github.com/multiformats/go-multiaddr v0.8.0 // indirect github.com/multiformats/go-multiaddr-dns v0.3.1 // indirect github.com/multiformats/go-multiaddr-fmt v0.1.0 // indirect - github.com/multiformats/go-multibase v0.1.1 // indirect + github.com/multiformats/go-multibase v0.2.0 // indirect github.com/multiformats/go-multistream v0.4.1 // indirect github.com/multiformats/go-varint v0.0.7 // indirect github.com/onsi/ginkgo/v2 v2.5.1 // indirect @@ -168,7 +168,6 @@ require ( go.opentelemetry.io/proto/otlp v0.19.0 // indirect go.uber.org/dig v1.15.0 // indirect go.uber.org/fx v1.18.2 // indirect - go.uber.org/multierr v1.9.0 // indirect golang.org/x/crypto v0.6.0 // indirect golang.org/x/exp v0.0.0-20230213192124-5e25df0256eb // indirect golang.org/x/mod v0.7.0 // indirect @@ -177,6 +176,7 @@ require ( golang.org/x/text v0.7.0 // indirect golang.org/x/tools v0.5.0 // indirect golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect + gonum.org/v1/gonum v0.11.0 // indirect google.golang.org/genproto v0.0.0-20230110181048-76db0878b65f // indirect google.golang.org/grpc v1.53.0 // indirect google.golang.org/protobuf v1.30.0 // indirect diff --git a/go.sum b/go.sum index bf60ef6..730d9c2 100644 --- a/go.sum +++ b/go.sum @@ -114,8 +114,8 @@ github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7 github.com/felixge/httpsnoop v1.0.0/go.mod h1:3+D9sFq0ahK/JeJPhCBUV1xlf4/eIYrUQaxulT0VzX8= github.com/felixge/httpsnoop v1.0.3 h1:s/nj+GCswXYzN5v2DpNMuMQYe+0DDwt5WVCU6CWBdXk= github.com/felixge/httpsnoop v1.0.3/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= -github.com/filecoin-saturn/caboose v0.0.3-0.20230710211206-784216f946e7 h1:w03hR6D3tjry84MVACydAFjRBcrmyNDPf5WuN0MC9OA= -github.com/filecoin-saturn/caboose v0.0.3-0.20230710211206-784216f946e7/go.mod h1:CSXZMijzD8z0Q/K5JQPeDZnb1o50MBK9VGZhOleqIDk= +github.com/filecoin-saturn/caboose v0.0.3 h1:VcfNc3J6qwCAdxKIVkkCglG0ll1qL043S2qcChmUr6o= +github.com/filecoin-saturn/caboose v0.0.3/go.mod h1:CSXZMijzD8z0Q/K5JQPeDZnb1o50MBK9VGZhOleqIDk= github.com/flynn/go-shlex v0.0.0-20150515145356-3f9db97f8568/go.mod h1:xEzjJPgXI435gkrCt3MPfRiAkVrwSbHsst4LCFVfpJc= github.com/flynn/noise v1.0.0 h1:DlTHqmzmvcEiKj+4RYo/imoswx/4r6iBlCMfVtrMXpQ= github.com/flynn/noise v1.0.0/go.mod h1:xbMo+0i6+IGbYdJhF31t2eR1BIU0CYc12+BNAKwUTag= @@ -272,8 +272,8 @@ github.com/inconshreveable/mousetrap v1.0.1 h1:U3uMjPSQEBMNp1lFxmllqCPM6P5u/Xq7P github.com/inconshreveable/mousetrap v1.0.1/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= github.com/ipfs/bbloom v0.0.4 h1:Gi+8EGJ2y5qiD5FbsbpX/TMNcJw8gSqr7eyjHa4Fhvs= github.com/ipfs/bbloom v0.0.4/go.mod h1:cS9YprKXpoZ9lT0n/Mw/a6/aFV6DTjTLYHeA+gyqMG0= -github.com/ipfs/boxo v0.8.2-0.20230530204310-0dcbaccc3745 h1:Q7akmTTtmoHMMYr+wWieIMT91gKva0qqQHzZ+yU/KxY= -github.com/ipfs/boxo v0.8.2-0.20230530204310-0dcbaccc3745/go.mod h1:kpvAPB7sKBGIbjw0ZEonxGLGQV80SE8u/PPizTlhXYQ= +github.com/ipfs/boxo v0.10.2-0.20230627114119-8424cf4e020a h1:Yiy9cuIbazoTzjcRer4NNWeSv3qPJfsZUX9rHQEBe08= +github.com/ipfs/boxo v0.10.2-0.20230627114119-8424cf4e020a/go.mod h1:eVXIkLOG+fTJSuXtkANlwLllV1CEayOZnbDClK8ZOQY= github.com/ipfs/go-bitfield v1.1.0 h1:fh7FIo8bSwaJEh6DdTWbCeZ1eqOaOkKFI74SCnsWbGA= github.com/ipfs/go-bitfield v1.1.0/go.mod h1:paqf1wjq/D2BBmzfTVFlJQ9IlFOZpg422HL0HqsGWHU= github.com/ipfs/go-bitswap v0.11.0 h1:j1WVvhDX1yhG32NTC9xfxnqycqYIlhzEzLXG/cU1HyQ= @@ -286,8 +286,8 @@ github.com/ipfs/go-cid v0.0.1/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUP github.com/ipfs/go-cid v0.0.3/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM= github.com/ipfs/go-cid v0.0.5/go.mod h1:plgt+Y5MnOey4vO4UlUazGqdbEXuFYitED67FexhXog= github.com/ipfs/go-cid v0.0.6/go.mod h1:6Ux9z5e+HpkQdckYoX1PG/6xqKspzlEIR5SDmgqgC/I= -github.com/ipfs/go-cid v0.4.0 h1:a4pdZq0sx6ZSxbCizebnKiMCx/xI/aBBFlB73IgH4rA= -github.com/ipfs/go-cid v0.4.0/go.mod h1:uQHwDeX4c6CtyrFwdqyhpNcxVewur1M7l7fNU7LKwZk= +github.com/ipfs/go-cid v0.4.1 h1:A/T3qGvxi4kpKWWcPC/PgbvDA2bjVLO7n4UeVwnbs/s= +github.com/ipfs/go-cid v0.4.1/go.mod h1:uQHwDeX4c6CtyrFwdqyhpNcxVewur1M7l7fNU7LKwZk= github.com/ipfs/go-datastore v0.5.0/go.mod h1:9zhEApYMTl17C8YDp7JmU7sQZi2/wqiYh73hakZ90Bk= github.com/ipfs/go-datastore v0.6.0 h1:JKyz+Gvz1QEZw0LsX1IBn+JFCJQH4SJVFtM4uWU0Myk= github.com/ipfs/go-datastore v0.6.0/go.mod h1:rt5M3nNbSO/8q1t4LNkLyUwRs8HupMeN/8O4Vn9YAT8= @@ -315,12 +315,10 @@ github.com/ipfs/go-ipfs-util v0.0.2/go.mod h1:CbPtkWJzjLdEcezDns2XYaehFVNXG9zrdr github.com/ipfs/go-ipld-cbor v0.0.6 h1:pYuWHyvSpIsOOLw4Jy7NbBkCyzLDcl64Bf/LZW7eBQ0= github.com/ipfs/go-ipld-cbor v0.0.6/go.mod h1:ssdxxaLJPXH7OjF5V4NSjBbcfh+evoR4ukuru0oPXMA= github.com/ipfs/go-ipld-format v0.0.1/go.mod h1:kyJtbkDALmFHv3QR6et67i35QzO3S0dCDnkOJhcZkms= -github.com/ipfs/go-ipld-format v0.4.1-0.20230530195241-c3da01c74a06 h1:veXief2ep+niDh2HuhDrOk0+W2iqn4QdIcmI+8MD9bc= -github.com/ipfs/go-ipld-format v0.4.1-0.20230530195241-c3da01c74a06/go.mod h1:ImdZqJQaEouMjCvqCe0ORUS+uoBmf7Hf+EO/jh+nk3M= +github.com/ipfs/go-ipld-format v0.5.0 h1:WyEle9K96MSrvr47zZHKKcDxJ/vlpET6PSiQsAFO+Ds= +github.com/ipfs/go-ipld-format v0.5.0/go.mod h1:ImdZqJQaEouMjCvqCe0ORUS+uoBmf7Hf+EO/jh+nk3M= github.com/ipfs/go-ipld-legacy v0.2.1 h1:mDFtrBpmU7b//LzLSypVrXsD8QxkEWxu5qVxN99/+tk= github.com/ipfs/go-ipld-legacy v0.2.1/go.mod h1:782MOUghNzMO2DER0FlBR94mllfdCJCkTtDtPM51otM= -github.com/ipfs/go-ipns v0.3.0 h1:ai791nTgVo+zTuq2bLvEGmWP1M0A6kGTXUsgv/Yq67A= -github.com/ipfs/go-ipns v0.3.0/go.mod h1:3cLT2rbvgPZGkHJoPO1YMJeh6LtkxopCkKFcio/wE24= github.com/ipfs/go-libipfs v0.6.0 h1:3FuckAJEm+zdHbHbf6lAyk0QUzc45LsFcGw102oBCZM= github.com/ipfs/go-libipfs v0.6.0/go.mod h1:UjjDIuehp2GzlNP0HEr5I9GfFT7zWgst+YfpUEIThtw= github.com/ipfs/go-log v1.0.5 h1:2dOuUCB1Z7uoczMWgAyDck5JLb72zHzrMnGnCNNbvY8= @@ -334,8 +332,8 @@ github.com/ipfs/go-metrics-interface v0.0.1 h1:j+cpbjYvu4R8zbleSs36gvB7jR+wsL2fG github.com/ipfs/go-metrics-interface v0.0.1/go.mod h1:6s6euYU4zowdslK0GKHmqaIZ3j/b/tL7HTWtJ4VPgWY= github.com/ipfs/go-peertaskqueue v0.8.1 h1:YhxAs1+wxb5jk7RvS0LHdyiILpNmRIRnZVztekOF0pg= github.com/ipfs/go-unixfs v0.4.5 h1:wj8JhxvV1G6CD7swACwSKYa+NgtdWC1RUit+gFnymDU= -github.com/ipfs/go-unixfsnode v1.6.0 h1:JOSA02yaLylRNi2rlB4ldPr5VcZhcnaIVj5zNLcOjDo= -github.com/ipfs/go-unixfsnode v1.6.0/go.mod h1:PVfoyZkX1B34qzT3vJO4nsLUpRCyhnMuHBznRcXirlk= +github.com/ipfs/go-unixfsnode v1.7.1 h1:RRxO2b6CSr5UQ/kxnGzaChTjp5LWTdf3Y4n8ANZgB/s= +github.com/ipfs/go-unixfsnode v1.7.1/go.mod h1:PVfoyZkX1B34qzT3vJO4nsLUpRCyhnMuHBznRcXirlk= github.com/ipfs/go-verifcid v0.0.2 h1:XPnUv0XmdH+ZIhLGKg6U2vaPaRDXb9urMyNVCE7uvTs= github.com/ipfs/go-verifcid v0.0.2/go.mod h1:40cD9x1y4OWnFXbLNJYRe7MpNvWlMn3LZAG5Wb4xnPU= github.com/ipld/go-car v0.6.0 h1:d5QrGLnHAxiNLHor+DKGrLdqnM0dQJh2whfSXRDq6J0= @@ -397,14 +395,14 @@ github.com/libp2p/go-libp2p v0.26.3 h1:6g/psubqwdaBqNNoidbRKSTBEYgaOuKBhHl8Q5tO+ github.com/libp2p/go-libp2p v0.26.3/go.mod h1:x75BN32YbwuY0Awm2Uix4d4KOz+/4piInkp4Wr3yOo8= github.com/libp2p/go-libp2p-asn-util v0.2.0 h1:rg3+Os8jbnO5DxkC7K/Utdi+DkY3q/d1/1q+8WeNAsw= github.com/libp2p/go-libp2p-asn-util v0.2.0/go.mod h1:WoaWxbHKBymSN41hWSq/lGKJEca7TNm58+gGJi2WsLI= -github.com/libp2p/go-libp2p-kad-dht v0.21.1 h1:xpfp8/t9+X2ip1l8Umap1/UGNnJ3RHJgKGAEsnRAlTo= -github.com/libp2p/go-libp2p-kad-dht v0.21.1/go.mod h1:Oy8wvbdjpB70eS5AaFaI68tOtrdo3KylTvXDjikxqFo= +github.com/libp2p/go-libp2p-kad-dht v0.23.0 h1:sxE6LxLopp79eLeV695n7+c77V/Vn4AMF28AdM/XFqM= +github.com/libp2p/go-libp2p-kad-dht v0.23.0/go.mod h1:oO5N308VT2msnQI6qi5M61wzPmJYg7Tr9e16m5n7uDU= github.com/libp2p/go-libp2p-kbucket v0.5.0 h1:g/7tVm8ACHDxH29BGrpsQlnNeu+6OF1A9bno/4/U1oA= github.com/libp2p/go-libp2p-kbucket v0.5.0/go.mod h1:zGzGCpQd78b5BNTDGHNDLaTt9aDK/A02xeZp9QeFC4U= github.com/libp2p/go-libp2p-record v0.2.0 h1:oiNUOCWno2BFuxt3my4i1frNrt7PerzB3queqa1NkQ0= github.com/libp2p/go-libp2p-record v0.2.0/go.mod h1:I+3zMkvvg5m2OcSdoL0KPljyJyvNDFGKX7QdlpYUcwk= -github.com/libp2p/go-libp2p-routing-helpers v0.4.0 h1:b7y4aixQ7AwbqYfcOQ6wTw8DQvuRZeTAA0Od3YYN5yc= -github.com/libp2p/go-libp2p-routing-helpers v0.4.0/go.mod h1:dYEAgkVhqho3/YKxfOEGdFMIcWfAFNlZX8iAIihYA2E= +github.com/libp2p/go-libp2p-routing-helpers v0.7.0 h1:sirOYVD0wGWjkDwHZvinunIpaqPLBXkcnXApVHwZFGA= +github.com/libp2p/go-libp2p-routing-helpers v0.7.0/go.mod h1:R289GUxUMzRXIbWGSuUUTPrlVJZ3Y/pPz495+qgXJX8= github.com/libp2p/go-libp2p-testing v0.12.0 h1:EPvBb4kKMWO29qP4mZGyhVzUyR25dvfUIK5WDu6iPUA= github.com/libp2p/go-msgio v0.3.0 h1:mf3Z8B1xcFN314sWX+2vOTShIE0Mmn2TXn3YCUQGNj0= github.com/libp2p/go-msgio v0.3.0/go.mod h1:nyRM819GmVaF9LX3l03RMh10QdOroF++NBbxAb0mmDM= @@ -474,16 +472,16 @@ github.com/multiformats/go-multiaddr-fmt v0.1.0 h1:WLEFClPycPkp4fnIzoFoV9FVd49/e github.com/multiformats/go-multiaddr-fmt v0.1.0/go.mod h1:hGtDIW4PU4BqJ50gW2quDuPVjyWNZxToGUh/HwTZYJo= github.com/multiformats/go-multibase v0.0.1/go.mod h1:bja2MqRZ3ggyXtZSEDKpl0uO/gviWFaSteVbWT51qgs= github.com/multiformats/go-multibase v0.0.3/go.mod h1:5+1R4eQrT3PkYZ24C3W2Ue2tPwIdYQD509ZjSb5y9Oc= -github.com/multiformats/go-multibase v0.1.1 h1:3ASCDsuLX8+j4kx58qnJ4YFq/JWTJpCyDW27ztsVTOI= -github.com/multiformats/go-multibase v0.1.1/go.mod h1:ZEjHE+IsUrgp5mhlEAYjMtZwK1k4haNkcaPg9aoe1a8= -github.com/multiformats/go-multicodec v0.8.1 h1:ycepHwavHafh3grIbR1jIXnKCsFm0fqsfEOsJ8NtKE8= -github.com/multiformats/go-multicodec v0.8.1/go.mod h1:L3QTQvMIaVBkXOXXtVmYE+LI16i14xuaojr/H7Ai54k= +github.com/multiformats/go-multibase v0.2.0 h1:isdYCVLvksgWlMW9OZRYJEa9pZETFivncJHmHnnd87g= +github.com/multiformats/go-multibase v0.2.0/go.mod h1:bFBZX4lKCA/2lyOFSAoKH5SS6oPyjtnzK/XTFDPkNuk= +github.com/multiformats/go-multicodec v0.9.0 h1:pb/dlPnzee/Sxv/j4PmkDRxCOi3hXTz3IbPKOXWJkmg= +github.com/multiformats/go-multicodec v0.9.0/go.mod h1:L3QTQvMIaVBkXOXXtVmYE+LI16i14xuaojr/H7Ai54k= github.com/multiformats/go-multihash v0.0.1/go.mod h1:w/5tugSrLEbWqlcgJabL3oHFKTwfvkofsjW2Qa1ct4U= github.com/multiformats/go-multihash v0.0.8/go.mod h1:YSLudS+Pi8NHE7o6tb3D8vrpKa63epEDmG8nTduyAew= github.com/multiformats/go-multihash v0.0.10/go.mod h1:YSLudS+Pi8NHE7o6tb3D8vrpKa63epEDmG8nTduyAew= github.com/multiformats/go-multihash v0.0.13/go.mod h1:VdAWLKTwram9oKAatUcLxBNUjdtcVwxObEQBtRfuyjc= -github.com/multiformats/go-multihash v0.2.1 h1:aem8ZT0VA2nCHHk7bPJ1BjUbHNciqZC/d16Vve9l108= -github.com/multiformats/go-multihash v0.2.1/go.mod h1:WxoMcYG85AZVQUyRyo9s4wULvW5qrI9vb2Lt6evduFc= +github.com/multiformats/go-multihash v0.2.3 h1:7Lyc8XfX/IY2jWb/gI7JP+o7JEq9hOa7BFvVU9RSh+U= +github.com/multiformats/go-multihash v0.2.3/go.mod h1:dXgKXCXjBzdscBLk9JkjINiEsCKRVch90MdaGiKsvSM= github.com/multiformats/go-multistream v0.4.1 h1:rFy0Iiyn3YT0asivDUIR05leAdwZq3de4741sbiSdfo= github.com/multiformats/go-multistream v0.4.1/go.mod h1:Mz5eykRVAjJWckE2U78c6xqdtyNUEhKSM0Lwar2p77Q= github.com/multiformats/go-varint v0.0.1/go.mod h1:3Ls8CIEsrijN6+B7PbrXRPxHRPuXSrVKRY101jdMZYE= @@ -939,6 +937,8 @@ golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8T golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 h1:H2TDz8ibqkAF6YGhCdN3jS9O0/s90v0rJh3X/OLHEUk= golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8= +gonum.org/v1/gonum v0.11.0 h1:f1IJhK4Km5tBJmaiJXtk/PkL4cdVX6J+tGiM187uT5E= +gonum.org/v1/gonum v0.11.0/go.mod h1:fSG4YDCxxUZQJ7rKsQrj0gMOg00Il0Z96/qMA4bVQhA= google.golang.org/api v0.0.0-20180910000450-7ca32eb868bf/go.mod h1:4mhQ8q/RsB7i+udVvVy5NUi08OU8ZlA0gRVgrF7VFY0= google.golang.org/api v0.0.0-20181030000543-1d582fd0359e/go.mod h1:4mhQ8q/RsB7i+udVvVy5NUi08OU8ZlA0gRVgrF7VFY0= google.golang.org/api v0.1.0/go.mod h1:UGEZY7KEX120AnNLIHFMKIo4obdJhkp2tPbaPlQx13Y= diff --git a/handlers.go b/handlers.go index a4beb75..4cdc046 100644 --- a/handlers.go +++ b/handlers.go @@ -92,7 +92,7 @@ func makeGatewayHandler(bs bstore.Blockstore, kuboRPC []string, port int, blockC blockService := blockservice.New(cacheBlockStore, exch) // Creates the gateway with the block service and the routing. - gwAPI, err = gateway.NewBlocksGateway(blockService, gateway.WithValueStore(routing)) + gwAPI, err = gateway.NewBlocksBackend(blockService, gateway.WithValueStore(routing)) if err != nil { return nil, err } @@ -118,7 +118,7 @@ func makeGatewayHandler(bs bstore.Blockstore, kuboRPC []string, port int, blockC // Note: in the future we may want to make this more configurable. noDNSLink := false - publicGateways := map[string]*gateway.Specification{ + publicGateways := map[string]*gateway.PublicGateway{ "localhost": { Paths: []string{"/ipfs", "/ipns"}, NoDNSLink: noDNSLink, @@ -135,7 +135,7 @@ func makeGatewayHandler(bs bstore.Blockstore, kuboRPC []string, port int, blockC // If we're doing tests, ensure the right public gateways are enabled. if os.Getenv("GATEWAY_CONFORMANCE_TEST") == "true" { - publicGateways["example.com"] = &gateway.Specification{ + publicGateways["example.com"] = &gateway.PublicGateway{ Paths: []string{"/ipfs", "/ipns"}, NoDNSLink: noDNSLink, DeserializedResponses: true, @@ -149,21 +149,19 @@ func makeGatewayHandler(bs bstore.Blockstore, kuboRPC []string, port int, blockC gwHandler := gateway.NewHandler(gwConf, gwAPI) ipfsHandler := withHTTPMetrics(gwHandler, "ipfs") - //ipnsHandler := withHTTPMetrics(gwHandler, "ipns") - ipnsHandler := gwHandler + ipnsHandler := withHTTPMetrics(gwHandler, "ipns") mux := http.NewServeMux() mux.Handle("/ipfs/", ipfsHandler) mux.Handle("/ipns/", ipnsHandler) // TODO: below is legacy which we want to remove, measuring this separately // allows us to decide when is the time to do it. - //legacyKuboRpcHandler := withHTTPMetrics(newKuboRPCHandler(kuboRPC), "legacyKuboRpc") - legacyKuboRpcHandler := newKuboRPCHandler(kuboRPC) + legacyKuboRpcHandler := withHTTPMetrics(newKuboRPCHandler(kuboRPC), "legacyKuboRpc") mux.Handle("/api/v0/", legacyKuboRpcHandler) // Construct the HTTP handler for the gateway. handler := withConnect(mux) - handler = http.Handler(gateway.WithHostname(gwConf, gwAPI, handler)) + handler = http.Handler(gateway.NewHostnameHandler(gwConf, gwAPI, handler)) handler = servertiming.Middleware(handler, nil) // Add logging. diff --git a/lib/blockstore_cache.go b/lib/blockstore_cache.go index 7204361..a41778c 100644 --- a/lib/blockstore_cache.go +++ b/lib/blockstore_cache.go @@ -3,8 +3,6 @@ package lib import ( "context" "errors" - "sync/atomic" - "time" "github.com/ipfs/go-cid" format "github.com/ipfs/go-ipld-format" @@ -23,39 +21,42 @@ const DefaultCacheBlockStoreSize = 1024 var cacheLog = golog.Logger("cache/block") -var cacheHitsMetric = prometheus.NewCounter(prometheus.CounterOpts{ - Namespace: "ipfs", - Subsystem: "http", - Name: "blockstore_cache_hit", - Help: "The number of global block cache hits.", -}) - -var cacheRequestsMetric = prometheus.NewCounter(prometheus.CounterOpts{ - Namespace: "ipfs", - Subsystem: "http", - Name: "blockstore_cache_requests", - Help: "The number of global block cache requests.", -}) - -func init() { - prometheus.Register(cacheHitsMetric) - prometheus.Register(cacheRequestsMetric) -} - func NewCacheBlockStore(size int) (blockstore.Blockstore, error) { c, err := lru.New2Q[string, []byte](size) if err != nil { return nil, err } - cbs := cacheBlockStore{ + cacheHitsMetric := prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: "ipfs", + Subsystem: "http", + Name: "blockstore_cache_hit", + Help: "The number of global block cache hits.", + }) + + cacheRequestsMetric := prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: "ipfs", + Subsystem: "http", + Name: "blockstore_cache_requests", + Help: "The number of global block cache requests.", + }) + + err = prometheus.Register(cacheHitsMetric) + if err != nil { + return nil, err + } + + err = prometheus.Register(cacheRequestsMetric) + if err != nil { + return nil, err + } + + return &cacheBlockStore{ cache: c, rehash: uatomic.NewBool(false), cacheHitsMetric: cacheHitsMetric, cacheRequestsMetric: cacheRequestsMetric, - putDeficit: atomic.Int32{}, - } - return &cbs, nil + }, nil } type cacheBlockStore struct { @@ -63,8 +64,6 @@ type cacheBlockStore struct { rehash *uatomic.Bool cacheHitsMetric prometheus.Counter cacheRequestsMetric prometheus.Counter - - putDeficit atomic.Int32 } func (l *cacheBlockStore) DeleteBlock(ctx context.Context, c cid.Cid) error { @@ -77,7 +76,6 @@ func (l *cacheBlockStore) Has(ctx context.Context, c cid.Cid) (bool, error) { } func (l *cacheBlockStore) Get(ctx context.Context, c cid.Cid) (blocks.Block, error) { - l.putDeficit.Add(-1) l.cacheRequestsMetric.Add(1) blkData, found := l.cache.Get(string(c.Hash())) @@ -124,11 +122,6 @@ func (l *cacheBlockStore) Put(ctx context.Context, blk blocks.Block) error { func (l *cacheBlockStore) PutMany(ctx context.Context, blks []blocks.Block) error { for _, b := range blks { - new := l.putDeficit.Add(1) - if new > 0 { - time.Sleep(time.Millisecond * time.Duration(new)) - } - if err := l.Put(ctx, b); err != nil { return err } diff --git a/lib/graph_gateway.go b/lib/graph_gateway.go index e437bbe..a0da38a 100644 --- a/lib/graph_gateway.go +++ b/lib/graph_gateway.go @@ -13,8 +13,6 @@ import ( "sync" "time" - "golang.org/x/sync/semaphore" - "github.com/filecoin-saturn/caboose" "github.com/ipfs/boxo/blockservice" blockstore "github.com/ipfs/boxo/blockstore" @@ -29,12 +27,15 @@ import ( ipfspath "github.com/ipfs/boxo/path" blocks "github.com/ipfs/go-block-format" "github.com/ipfs/go-cid" + format "github.com/ipfs/go-ipld-format" golog "github.com/ipfs/go-log/v2" routinghelpers "github.com/libp2p/go-libp2p-routing-helpers" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/routing" "github.com/multiformats/go-multicodec" + "github.com/multiformats/go-multihash" "github.com/prometheus/client_golang/prometheus" + "go.uber.org/multierr" ) var graphLog = golog.Logger("backend/graph") @@ -86,6 +87,14 @@ type Notifier interface { NotifyNewBlocks(ctx context.Context, blocks ...blocks.Block) error } +// notifiersForRootCid is used for reducing lock contention by only notifying +// exchanges related to the same content root CID +type notifiersForRootCid struct { + lk sync.RWMutex + deleted int8 + notifiers []Notifier +} + type GraphGateway struct { fetcher CarFetcher blockFetcher exchange.Fetcher @@ -93,7 +102,8 @@ type GraphGateway struct { namesys namesys.NameSystem bstore blockstore.Blockstore - metrics *GraphGatewayMetrics + notifiers sync.Map // cid -> notifiersForRootCid + metrics *GraphGatewayMetrics } type GraphGatewayMetrics struct { @@ -153,6 +163,7 @@ func NewGraphGatewayBackend(f CarFetcher, blockFetcher exchange.Fetcher, opts .. routing: vs, namesys: ns, bstore: bs, + notifiers: sync.Map{}, metrics: registerGraphGatewayMetrics(), }, nil } @@ -242,12 +253,16 @@ func registerGraphGatewayMetrics() *GraphGatewayMetrics { } } -var cacheLimiter = semaphore.NewWeighted(4096) -var cachePool = sync.Pool{ - New: func() any { - bs, _ := NewCacheBlockStore(384) - return bs - }, +func (api *GraphGateway) getRootOfPath(path string) string { + pth, err := ipfspath.ParsePath(path) + if err != nil { + return path + } + if pth.IsJustAKey() { + return pth.Segments()[0] + } else { + return pth.Segments()[1] + } } /* @@ -261,8 +276,36 @@ Implementation iteration plan: */ func (api *GraphGateway) loadRequestIntoSharedBlockstoreAndBlocksGateway(ctx context.Context, path string) (gateway.IPFSBackend, func(), error) { - bstore := cachePool.Get().(blockstore.Blockstore) - exch := newBlockExchange(bstore, api.blockFetcher) + bstore := api.bstore + carFetchingExch := newInboundBlockExchange() + doneWithFetcher := make(chan struct{}, 1) + exch := &handoffExchange{ + startingExchange: carFetchingExch, + followupExchange: &blockFetcherExchWrapper{api.blockFetcher}, + bstore: bstore, + handoffCh: doneWithFetcher, + metrics: api.metrics, + } + + notifierKey := api.getRootOfPath(path) + var notifier *notifiersForRootCid + for { + notifiers, _ := api.notifiers.LoadOrStore(notifierKey, ¬ifiersForRootCid{notifiers: []Notifier{}}) + if n, ok := notifiers.(*notifiersForRootCid); ok { + n.lk.Lock() + // could have been deleted after our load. try again. + if n.deleted != 0 { + n.lk.Unlock() + continue + } + notifier = n + n.notifiers = append(n.notifiers, exch) + n.lk.Unlock() + break + } else { + return nil, nil, errors.New("failed to get notifier") + } + } go func(metrics *GraphGatewayMetrics) { defer func() { @@ -300,9 +343,6 @@ func (api *GraphGateway) loadRequestIntoSharedBlockstoreAndBlocksGateway(ctx con blk, rdErr := cr.Next() select { case blkCh <- blockRead{blk, rdErr}: - if rdErr != nil { - return - } case <-cbCtx.Done(): return } @@ -330,29 +370,62 @@ func (api *GraphGateway) loadRequestIntoSharedBlockstoreAndBlocksGateway(ctx con return blkRead.err } if blkRead.block != nil { - if err := bstore.PutMany(ctx, []blocks.Block{blkRead.block}); err != nil { + if err := bstore.Put(ctx, blkRead.block); err != nil { return err } metrics.carBlocksFetchedMetric.Inc() - exch.NotifyNewBlocks(ctx, blkRead.block) + api.notifyOngoingRequests(ctx, notifierKey, blkRead.block) } } }) if err != nil { graphLog.Infow("car Fetch failed", "path", path, "error", err) } - if err := exch.Close(); err != nil { + if err := carFetchingExch.Close(); err != nil { graphLog.Errorw("carFetchingExch.Close()", "error", err) } + doneWithFetcher <- struct{}{} + close(doneWithFetcher) }(api.metrics) bserv := blockservice.New(bstore, exch) - blkgw, err := gateway.NewBlocksGateway(bserv) + blkgw, err := gateway.NewBlocksBackend(bserv) if err != nil { return nil, nil, err } - return blkgw, func() {}, nil + return blkgw, func() { + notifier.lk.Lock() + for i, e := range notifier.notifiers { + if e == exch { + notifier.notifiers = append(notifier.notifiers[0:i], notifier.notifiers[i+1:]...) + break + } + } + if len(notifier.notifiers) == 0 { + notifier.deleted = 1 + api.notifiers.Delete(notifierKey) + } + notifier.lk.Unlock() + }, nil +} + +func (api *GraphGateway) notifyOngoingRequests(ctx context.Context, key string, blks ...blocks.Block) { + if notifiers, ok := api.notifiers.Load(key); ok { + notifier, ok := notifiers.(*notifiersForRootCid) + if !ok { + graphLog.Errorw("notifyOngoingRequests failed", "key", key, "error", "could not get notifiersForRootCid") + return + } + notifier.lk.RLock() + for _, n := range notifier.notifiers { + err := n.NotifyNewBlocks(ctx, blks...) + if err != nil { + graphLog.Errorw("notifyOngoingRequests failed", "key", key, "error", err) + } + } + notifier.lk.RUnlock() + } } type fileCloseWrapper struct { @@ -395,11 +468,6 @@ func wrapNodeWithClose[T files.Node](node T, closeFn func()) (T, error) { } func (api *GraphGateway) Get(ctx context.Context, path gateway.ImmutablePath, byteRanges ...gateway.ByteRange) (gateway.ContentPathMetadata, *gateway.GetResponse, error) { - if err := cacheLimiter.Acquire(ctx, 1); err != nil { - return gateway.ContentPathMetadata{}, nil, err - } - defer cacheLimiter.Release(1) - rangeCount := len(byteRanges) api.metrics.carParamsMetric.With(prometheus.Labels{"dagScope": "entity", "entityRanges": strconv.Itoa(rangeCount)}).Inc() @@ -447,11 +515,6 @@ func (api *GraphGateway) Get(ctx context.Context, path gateway.ImmutablePath, by } func (api *GraphGateway) GetAll(ctx context.Context, path gateway.ImmutablePath) (gateway.ContentPathMetadata, files.Node, error) { - if err := cacheLimiter.Acquire(ctx, 1); err != nil { - return gateway.ContentPathMetadata{}, nil, err - } - defer cacheLimiter.Release(1) - api.metrics.carParamsMetric.With(prometheus.Labels{"dagScope": "all", "entityRanges": "0"}).Inc() blkgw, closeFn, err := api.loadRequestIntoSharedBlockstoreAndBlocksGateway(ctx, path.String()+"?format=car&dag-scope=all") if err != nil { @@ -469,11 +532,6 @@ func (api *GraphGateway) GetAll(ctx context.Context, path gateway.ImmutablePath) } func (api *GraphGateway) GetBlock(ctx context.Context, path gateway.ImmutablePath) (gateway.ContentPathMetadata, files.File, error) { - if err := cacheLimiter.Acquire(ctx, 1); err != nil { - return gateway.ContentPathMetadata{}, nil, err - } - defer cacheLimiter.Release(1) - api.metrics.carParamsMetric.With(prometheus.Labels{"dagScope": "block", "entityRanges": "0"}).Inc() // TODO: if path is `/ipfs/cid`, we should use ?format=raw blkgw, closeFn, err := api.loadRequestIntoSharedBlockstoreAndBlocksGateway(ctx, path.String()+"?format=car&dag-scope=block") @@ -492,11 +550,6 @@ func (api *GraphGateway) GetBlock(ctx context.Context, path gateway.ImmutablePat } func (api *GraphGateway) Head(ctx context.Context, path gateway.ImmutablePath) (gateway.ContentPathMetadata, files.Node, error) { - if err := cacheLimiter.Acquire(ctx, 1); err != nil { - return gateway.ContentPathMetadata{}, nil, err - } - defer cacheLimiter.Release(1) - api.metrics.carParamsMetric.With(prometheus.Labels{"dagScope": "entity", "entityRanges": "1"}).Inc() // TODO: we probably want to move this either to boxo, or at least to loadRequestIntoSharedBlockstoreAndBlocksGateway @@ -529,19 +582,14 @@ func (api *GraphGateway) ResolvePath(ctx context.Context, path gateway.Immutable return blkgw.ResolvePath(ctx, path) } -func (api *GraphGateway) GetCAR(ctx context.Context, path gateway.ImmutablePath) (gateway.ContentPathMetadata, io.ReadCloser, <-chan error, error) { - if err := cacheLimiter.Acquire(ctx, 1); err != nil { - return gateway.ContentPathMetadata{}, nil, nil, err - } - defer cacheLimiter.Release(1) - +func (api *GraphGateway) GetCAR(ctx context.Context, path gateway.ImmutablePath, params gateway.CarParams) (gateway.ContentPathMetadata, io.ReadCloser, error) { api.metrics.carParamsMetric.With(prometheus.Labels{"dagScope": "all", "entityRanges": "0"}).Inc() blkgw, closeFn, err := api.loadRequestIntoSharedBlockstoreAndBlocksGateway(ctx, path.String()+"?format=car&dag-scope=all") if err != nil { - return gateway.ContentPathMetadata{}, nil, nil, err + return gateway.ContentPathMetadata{}, nil, err } defer closeFn() - return blkgw.GetCAR(ctx, path) + return blkgw.GetCAR(ctx, path, params) } func (api *GraphGateway) IsCached(ctx context.Context, path ifacepath.Path) bool { @@ -551,13 +599,13 @@ func (api *GraphGateway) IsCached(ctx context.Context, path ifacepath.Path) bool // TODO: This is copy-paste from blocks gateway, maybe share code func (api *GraphGateway) GetIPNSRecord(ctx context.Context, c cid.Cid) ([]byte, error) { if api.routing == nil { - return nil, gateway.NewErrorResponse(errors.New("IPNS Record responses are not supported by this gateway"), http.StatusNotImplemented) + return nil, gateway.NewErrorStatusCode(errors.New("IPNS Record responses are not supported by this gateway"), http.StatusNotImplemented) } // Fails fast if the CID is not an encoded Libp2p Key, avoids wasteful // round trips to the remote routing provider. if multicodec.Code(c.Type()) != multicodec.Libp2pKey { - return nil, gateway.NewErrorResponse(errors.New("cid codec must be libp2p-key"), http.StatusBadRequest) + return nil, gateway.NewErrorStatusCode(errors.New("cid codec must be libp2p-key"), http.StatusBadRequest) } // The value store expects the key itself to be encoded as a multihash. @@ -595,7 +643,7 @@ func (api *GraphGateway) ResolveMutable(ctx context.Context, p ifacepath.Path) ( } return imPath, nil default: - return gateway.ImmutablePath{}, gateway.NewErrorResponse(fmt.Errorf("unsupported path namespace: %s", p.Namespace()), http.StatusNotImplemented) + return gateway.ImmutablePath{}, gateway.NewErrorStatusCode(fmt.Errorf("unsupported path namespace: %s", p.Namespace()), http.StatusNotImplemented) } } @@ -609,81 +657,189 @@ func (api *GraphGateway) GetDNSLinkRecord(ctx context.Context, hostname string) return ifacepath.New(p.String()), err } - return nil, gateway.NewErrorResponse(errors.New("not implemented"), http.StatusNotImplemented) + return nil, gateway.NewErrorStatusCode(errors.New("not implemented"), http.StatusNotImplemented) } var _ gateway.IPFSBackend = (*GraphGateway)(nil) -type blockingExchange struct { - notify chan struct{} - nl sync.Mutex - - bstore blockstore.Blockstore - f exchange.Fetcher +type inboundBlockExchange struct { + ps BlockPubSub } -func newBlockExchange(bstore blockstore.Blockstore, fetcher exchange.Fetcher) *blockingExchange { - return &blockingExchange{ - notify: make(chan struct{}), - bstore: bstore, - f: fetcher, +func newInboundBlockExchange() *inboundBlockExchange { + return &inboundBlockExchange{ + ps: NewBlockPubSub(), } } -func (b *blockingExchange) GetBlock(ctx context.Context, c cid.Cid) (blocks.Block, error) { - <-b.notify - +func (i *inboundBlockExchange) GetBlock(ctx context.Context, c cid.Cid) (blocks.Block, error) { + blk, more := <-i.ps.Subscribe(ctx, c.Hash()) if err := ctx.Err(); err != nil { return nil, err } + if !more { + return nil, format.ErrNotFound{Cid: c} + } + return blk, nil +} + +func (i *inboundBlockExchange) GetBlocks(ctx context.Context, cids []cid.Cid) (<-chan blocks.Block, error) { + mhMap := make(map[string]struct{}) + for _, c := range cids { + mhMap[string(c.Hash())] = struct{}{} + } + mhs := make([]multihash.Multihash, 0, len(mhMap)) + for k := range mhMap { + mhs = append(mhs, multihash.Multihash(k)) + } + return i.ps.Subscribe(ctx, mhs...), nil +} + +func (i *inboundBlockExchange) NotifyNewBlocks(ctx context.Context, blocks ...blocks.Block) error { + // TODO: handle context cancellation and/or blockage here + i.ps.Publish(blocks...) + return nil +} + +func (i *inboundBlockExchange) Close() error { + i.ps.Shutdown() + return nil +} + +var _ exchange.Interface = (*inboundBlockExchange)(nil) - if blk, err := b.bstore.Get(ctx, c); err == nil { +type handoffExchange struct { + startingExchange, followupExchange exchange.Interface + bstore blockstore.Blockstore + handoffCh <-chan struct{} + metrics *GraphGatewayMetrics +} + +func (f *handoffExchange) GetBlock(ctx context.Context, c cid.Cid) (blocks.Block, error) { + blkCh, err := f.startingExchange.GetBlocks(ctx, []cid.Cid{c}) + if err != nil { + return nil, err + } + blk, ok := <-blkCh + if ok { return blk, nil } - blk, err := b.f.GetBlock(ctx, c) - if err == nil && blk != nil { - b.bstore.Put(ctx, blk) + + select { + case <-f.handoffCh: + graphLog.Debugw("switching to backup block fetcher", "cid", c) + f.metrics.blockRecoveryAttemptMetric.Inc() + return f.followupExchange.GetBlock(ctx, c) + case <-ctx.Done(): + return nil, ctx.Err() } - return blk, err } -func (b *blockingExchange) GetBlocks(ctx context.Context, cids []cid.Cid) (<-chan blocks.Block, error) { - ch := make(chan blocks.Block) +func (f *handoffExchange) GetBlocks(ctx context.Context, cids []cid.Cid) (<-chan blocks.Block, error) { + blkCh, err := f.startingExchange.GetBlocks(ctx, cids) + if err != nil { + return nil, err + } - go func(ctx context.Context, cids []cid.Cid) { - for _, c := range cids { - blk, err := b.GetBlock(ctx, c) - if ctx.Err() != nil { - return + retCh := make(chan blocks.Block) + + go func() { + cs := cid.NewSet() + for cs.Len() < len(cids) { + blk, ok := <-blkCh + if !ok { + break + } + select { + case retCh <- blk: + cs.Add(blk.Cid()) + case <-ctx.Done(): } - if err == nil { - ch <- blk + } + + for cs.Len() < len(cids) { + select { + case <-ctx.Done(): + return + case <-f.handoffCh: + var newCidArr []cid.Cid + for _, c := range cids { + if !cs.Has(c) { + blk, _ := f.bstore.Get(ctx, c) + if blk != nil { + select { + case retCh <- blk: + cs.Add(blk.Cid()) + case <-ctx.Done(): + return + } + } else { + newCidArr = append(newCidArr, c) + } + } + } + + if len(newCidArr) == 0 { + return + } + + graphLog.Debugw("needed to use use a backup fetcher for cids", "cids", newCidArr) + f.metrics.blockRecoveryAttemptMetric.Add(float64(len(newCidArr))) + fch, err := f.followupExchange.GetBlocks(ctx, newCidArr) + if err != nil { + graphLog.Errorw("error getting blocks from followupExchange", "error", err) + return + } + for cs.Len() < len(cids) { + blk, ok := <-fch + if !ok { + return + } + select { + case retCh <- blk: + cs.Add(blk.Cid()) + case <-ctx.Done(): + return + } + } } } - }(ctx, cids) + }() + return retCh, nil +} - return ch, nil +func (f *handoffExchange) NotifyNewBlocks(ctx context.Context, blocks ...blocks.Block) error { + err1 := f.startingExchange.NotifyNewBlocks(ctx, blocks...) + err2 := f.followupExchange.NotifyNewBlocks(ctx, blocks...) + return multierr.Combine(err1, err2) } -func (b *blockingExchange) NotifyNewBlocks(ctx context.Context, blocks ...blocks.Block) error { - b.nl.Lock() - defer b.nl.Unlock() - on := b.notify - if on == nil { - return nil - } - b.notify = make(chan struct{}) - close(on) +func (f *handoffExchange) Close() error { + err1 := f.startingExchange.Close() + err2 := f.followupExchange.Close() + return multierr.Combine(err1, err2) +} + +var _ exchange.Interface = (*handoffExchange)(nil) + +type blockFetcherExchWrapper struct { + f exchange.Fetcher +} + +func (b *blockFetcherExchWrapper) GetBlock(ctx context.Context, c cid.Cid) (blocks.Block, error) { + return b.f.GetBlock(ctx, c) +} + +func (b *blockFetcherExchWrapper) GetBlocks(ctx context.Context, cids []cid.Cid) (<-chan blocks.Block, error) { + return b.f.GetBlocks(ctx, cids) +} + +func (b *blockFetcherExchWrapper) NotifyNewBlocks(ctx context.Context, blocks ...blocks.Block) error { return nil } -func (b *blockingExchange) Close() error { - b.nl.Lock() - on := b.notify - b.notify = nil - close(on) - b.nl.Unlock() +func (b *blockFetcherExchWrapper) Close() error { return nil } -var _ exchange.Interface = (*blockingExchange)(nil) +var _ exchange.Interface = (*blockFetcherExchWrapper)(nil) diff --git a/routing.go b/routing.go index 5a941d5..286c5dd 100644 --- a/routing.go +++ b/routing.go @@ -13,10 +13,7 @@ import ( "strings" "time" - "github.com/gogo/protobuf/proto" "github.com/ipfs/boxo/ipns" - ipns_pb "github.com/ipfs/boxo/ipns/pb" - "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/routing" "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" ) @@ -80,13 +77,12 @@ func (ps *proxyRouting) SearchValue(ctx context.Context, k string, opts ...routi } func (ps *proxyRouting) fetch(ctx context.Context, key string) (rb []byte, err error) { - key = strings.TrimPrefix(key, "/ipns/") - id, err := peer.IDFromBytes([]byte(key)) + name, err := ipns.NameFromRoutingKey([]byte(key)) if err != nil { return nil, err } - key = "/ipns/" + peer.ToCid(id).String() + key = "/ipns/" + name.String() urlStr := fmt.Sprintf("%s/api/v0/dht/get?arg=%s", ps.getRandomKuboURL(), key) req, err := http.NewRequestWithContext(ctx, http.MethodPost, urlStr, nil) @@ -142,13 +138,12 @@ func (ps *proxyRouting) fetch(ctx context.Context, key string) (rb []byte, err e return nil, err } - var entry ipns_pb.IpnsEntry - err = proto.Unmarshal(rb, &entry) + entry, err := ipns.UnmarshalRecord(rb) if err != nil { return nil, err } - err = ipns.ValidateWithPeerID(id, &entry) + err = ipns.ValidateWithName(entry, name) if err != nil { return nil, err } diff --git a/version.json b/version.json index 49ed23d..0f40898 100644 --- a/version.json +++ b/version.json @@ -1,3 +1,3 @@ { - "version": "v0.0.12" -} + "version": "v0.0.13" +} \ No newline at end of file