From b0c9da8d3c70b713b6b57082acf57799fcbb7c64 Mon Sep 17 00:00:00 2001 From: Will Date: Tue, 18 Jul 2023 20:08:21 +0000 Subject: [PATCH] Revert "Unshared blockstore (#150)" This reverts commit edb5cd4c4e0b68bf55c9b1a05f9d1317f54e404b. --- go.mod | 2 +- handlers.go | 6 +- lib/blockstore_cache.go | 43 +++--- lib/graph_gateway.go | 297 +++++++++++++++++++++++++++++++++------- 4 files changed, 273 insertions(+), 75 deletions(-) diff --git a/go.mod b/go.mod index 8781520..f86e35d 100644 --- a/go.mod +++ b/go.mod @@ -30,6 +30,7 @@ require ( go.opentelemetry.io/otel/sdk v1.14.0 go.opentelemetry.io/otel/trace v1.14.0 go.uber.org/atomic v1.10.0 + go.uber.org/multierr v1.9.0 go.uber.org/zap v1.24.0 ) @@ -166,7 +167,6 @@ require ( go.opentelemetry.io/proto/otlp v0.19.0 // indirect go.uber.org/dig v1.15.0 // indirect go.uber.org/fx v1.18.2 // indirect - go.uber.org/multierr v1.9.0 // indirect golang.org/x/crypto v0.6.0 // indirect golang.org/x/exp v0.0.0-20230213192124-5e25df0256eb // indirect golang.org/x/mod v0.7.0 // indirect diff --git a/handlers.go b/handlers.go index c83730b..4cdc046 100644 --- a/handlers.go +++ b/handlers.go @@ -149,16 +149,14 @@ func makeGatewayHandler(bs bstore.Blockstore, kuboRPC []string, port int, blockC gwHandler := gateway.NewHandler(gwConf, gwAPI) ipfsHandler := withHTTPMetrics(gwHandler, "ipfs") - //ipnsHandler := withHTTPMetrics(gwHandler, "ipns") - ipnsHandler := gwHandler + ipnsHandler := withHTTPMetrics(gwHandler, "ipns") mux := http.NewServeMux() mux.Handle("/ipfs/", ipfsHandler) mux.Handle("/ipns/", ipnsHandler) // TODO: below is legacy which we want to remove, measuring this separately // allows us to decide when is the time to do it. - //legacyKuboRpcHandler := withHTTPMetrics(newKuboRPCHandler(kuboRPC), "legacyKuboRpc") - legacyKuboRpcHandler := newKuboRPCHandler(kuboRPC) + legacyKuboRpcHandler := withHTTPMetrics(newKuboRPCHandler(kuboRPC), "legacyKuboRpc") mux.Handle("/api/v0/", legacyKuboRpcHandler) // Construct the HTTP handler for the gateway. diff --git a/lib/blockstore_cache.go b/lib/blockstore_cache.go index 86186cd..a41778c 100644 --- a/lib/blockstore_cache.go +++ b/lib/blockstore_cache.go @@ -21,31 +21,36 @@ const DefaultCacheBlockStoreSize = 1024 var cacheLog = golog.Logger("cache/block") -var cacheHitsMetric = prometheus.NewCounter(prometheus.CounterOpts{ - Namespace: "ipfs", - Subsystem: "http", - Name: "blockstore_cache_hit", - Help: "The number of global block cache hits.", -}) - -var cacheRequestsMetric = prometheus.NewCounter(prometheus.CounterOpts{ - Namespace: "ipfs", - Subsystem: "http", - Name: "blockstore_cache_requests", - Help: "The number of global block cache requests.", -}) - -func init() { - prometheus.Register(cacheHitsMetric) - prometheus.Register(cacheRequestsMetric) -} - func NewCacheBlockStore(size int) (blockstore.Blockstore, error) { c, err := lru.New2Q[string, []byte](size) if err != nil { return nil, err } + cacheHitsMetric := prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: "ipfs", + Subsystem: "http", + Name: "blockstore_cache_hit", + Help: "The number of global block cache hits.", + }) + + cacheRequestsMetric := prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: "ipfs", + Subsystem: "http", + Name: "blockstore_cache_requests", + Help: "The number of global block cache requests.", + }) + + err = prometheus.Register(cacheHitsMetric) + if err != nil { + return nil, err + } + + err = prometheus.Register(cacheRequestsMetric) + if err != nil { + return nil, err + } + return &cacheBlockStore{ cache: c, rehash: uatomic.NewBool(false), diff --git a/lib/graph_gateway.go b/lib/graph_gateway.go index d4b7f78..a0da38a 100644 --- a/lib/graph_gateway.go +++ b/lib/graph_gateway.go @@ -27,12 +27,15 @@ import ( ipfspath "github.com/ipfs/boxo/path" blocks "github.com/ipfs/go-block-format" "github.com/ipfs/go-cid" + format "github.com/ipfs/go-ipld-format" golog "github.com/ipfs/go-log/v2" routinghelpers "github.com/libp2p/go-libp2p-routing-helpers" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/routing" "github.com/multiformats/go-multicodec" + "github.com/multiformats/go-multihash" "github.com/prometheus/client_golang/prometheus" + "go.uber.org/multierr" ) var graphLog = golog.Logger("backend/graph") @@ -84,6 +87,14 @@ type Notifier interface { NotifyNewBlocks(ctx context.Context, blocks ...blocks.Block) error } +// notifiersForRootCid is used for reducing lock contention by only notifying +// exchanges related to the same content root CID +type notifiersForRootCid struct { + lk sync.RWMutex + deleted int8 + notifiers []Notifier +} + type GraphGateway struct { fetcher CarFetcher blockFetcher exchange.Fetcher @@ -91,7 +102,8 @@ type GraphGateway struct { namesys namesys.NameSystem bstore blockstore.Blockstore - metrics *GraphGatewayMetrics + notifiers sync.Map // cid -> notifiersForRootCid + metrics *GraphGatewayMetrics } type GraphGatewayMetrics struct { @@ -151,6 +163,7 @@ func NewGraphGatewayBackend(f CarFetcher, blockFetcher exchange.Fetcher, opts .. routing: vs, namesys: ns, bstore: bs, + notifiers: sync.Map{}, metrics: registerGraphGatewayMetrics(), }, nil } @@ -240,6 +253,18 @@ func registerGraphGatewayMetrics() *GraphGatewayMetrics { } } +func (api *GraphGateway) getRootOfPath(path string) string { + pth, err := ipfspath.ParsePath(path) + if err != nil { + return path + } + if pth.IsJustAKey() { + return pth.Segments()[0] + } else { + return pth.Segments()[1] + } +} + /* Implementation iteration plan: @@ -251,11 +276,36 @@ Implementation iteration plan: */ func (api *GraphGateway) loadRequestIntoSharedBlockstoreAndBlocksGateway(ctx context.Context, path string) (gateway.IPFSBackend, func(), error) { - bstore, err := NewCacheBlockStore(1024) - if err != nil { - return nil, nil, err + bstore := api.bstore + carFetchingExch := newInboundBlockExchange() + doneWithFetcher := make(chan struct{}, 1) + exch := &handoffExchange{ + startingExchange: carFetchingExch, + followupExchange: &blockFetcherExchWrapper{api.blockFetcher}, + bstore: bstore, + handoffCh: doneWithFetcher, + metrics: api.metrics, + } + + notifierKey := api.getRootOfPath(path) + var notifier *notifiersForRootCid + for { + notifiers, _ := api.notifiers.LoadOrStore(notifierKey, ¬ifiersForRootCid{notifiers: []Notifier{}}) + if n, ok := notifiers.(*notifiersForRootCid); ok { + n.lk.Lock() + // could have been deleted after our load. try again. + if n.deleted != 0 { + n.lk.Unlock() + continue + } + notifier = n + n.notifiers = append(n.notifiers, exch) + n.lk.Unlock() + break + } else { + return nil, nil, errors.New("failed to get notifier") + } } - exch := newBlockExchange(bstore, api.blockFetcher) go func(metrics *GraphGatewayMetrics) { defer func() { @@ -324,16 +374,18 @@ func (api *GraphGateway) loadRequestIntoSharedBlockstoreAndBlocksGateway(ctx con return err } metrics.carBlocksFetchedMetric.Inc() - exch.NotifyNewBlocks(ctx, blkRead.block) + api.notifyOngoingRequests(ctx, notifierKey, blkRead.block) } } }) if err != nil { graphLog.Infow("car Fetch failed", "path", path, "error", err) } - if err := exch.Close(); err != nil { + if err := carFetchingExch.Close(); err != nil { graphLog.Errorw("carFetchingExch.Close()", "error", err) } + doneWithFetcher <- struct{}{} + close(doneWithFetcher) }(api.metrics) bserv := blockservice.New(bstore, exch) @@ -342,7 +394,38 @@ func (api *GraphGateway) loadRequestIntoSharedBlockstoreAndBlocksGateway(ctx con return nil, nil, err } - return blkgw, func() {}, nil + return blkgw, func() { + notifier.lk.Lock() + for i, e := range notifier.notifiers { + if e == exch { + notifier.notifiers = append(notifier.notifiers[0:i], notifier.notifiers[i+1:]...) + break + } + } + if len(notifier.notifiers) == 0 { + notifier.deleted = 1 + api.notifiers.Delete(notifierKey) + } + notifier.lk.Unlock() + }, nil +} + +func (api *GraphGateway) notifyOngoingRequests(ctx context.Context, key string, blks ...blocks.Block) { + if notifiers, ok := api.notifiers.Load(key); ok { + notifier, ok := notifiers.(*notifiersForRootCid) + if !ok { + graphLog.Errorw("notifyOngoingRequests failed", "key", key, "error", "could not get notifiersForRootCid") + return + } + notifier.lk.RLock() + for _, n := range notifier.notifiers { + err := n.NotifyNewBlocks(ctx, blks...) + if err != nil { + graphLog.Errorw("notifyOngoingRequests failed", "key", key, "error", err) + } + } + notifier.lk.RUnlock() + } } type fileCloseWrapper struct { @@ -579,72 +662,184 @@ func (api *GraphGateway) GetDNSLinkRecord(ctx context.Context, hostname string) var _ gateway.IPFSBackend = (*GraphGateway)(nil) -type blockingExchange struct { - notify chan struct{} - nl sync.Mutex - - bstore blockstore.Blockstore - f exchange.Fetcher +type inboundBlockExchange struct { + ps BlockPubSub } -func newBlockExchange(bstore blockstore.Blockstore, fetcher exchange.Fetcher) *blockingExchange { - return &blockingExchange{ - notify: make(chan struct{}), - bstore: bstore, - f: fetcher, +func newInboundBlockExchange() *inboundBlockExchange { + return &inboundBlockExchange{ + ps: NewBlockPubSub(), } } -func (b *blockingExchange) GetBlock(ctx context.Context, c cid.Cid) (blocks.Block, error) { - <-b.notify - +func (i *inboundBlockExchange) GetBlock(ctx context.Context, c cid.Cid) (blocks.Block, error) { + blk, more := <-i.ps.Subscribe(ctx, c.Hash()) if err := ctx.Err(); err != nil { return nil, err } + if !more { + return nil, format.ErrNotFound{Cid: c} + } + return blk, nil +} + +func (i *inboundBlockExchange) GetBlocks(ctx context.Context, cids []cid.Cid) (<-chan blocks.Block, error) { + mhMap := make(map[string]struct{}) + for _, c := range cids { + mhMap[string(c.Hash())] = struct{}{} + } + mhs := make([]multihash.Multihash, 0, len(mhMap)) + for k := range mhMap { + mhs = append(mhs, multihash.Multihash(k)) + } + return i.ps.Subscribe(ctx, mhs...), nil +} + +func (i *inboundBlockExchange) NotifyNewBlocks(ctx context.Context, blocks ...blocks.Block) error { + // TODO: handle context cancellation and/or blockage here + i.ps.Publish(blocks...) + return nil +} + +func (i *inboundBlockExchange) Close() error { + i.ps.Shutdown() + return nil +} + +var _ exchange.Interface = (*inboundBlockExchange)(nil) + +type handoffExchange struct { + startingExchange, followupExchange exchange.Interface + bstore blockstore.Blockstore + handoffCh <-chan struct{} + metrics *GraphGatewayMetrics +} - if blk, err := b.bstore.Get(ctx, c); err == nil { +func (f *handoffExchange) GetBlock(ctx context.Context, c cid.Cid) (blocks.Block, error) { + blkCh, err := f.startingExchange.GetBlocks(ctx, []cid.Cid{c}) + if err != nil { + return nil, err + } + blk, ok := <-blkCh + if ok { return blk, nil } - return b.f.GetBlock(ctx, c) + + select { + case <-f.handoffCh: + graphLog.Debugw("switching to backup block fetcher", "cid", c) + f.metrics.blockRecoveryAttemptMetric.Inc() + return f.followupExchange.GetBlock(ctx, c) + case <-ctx.Done(): + return nil, ctx.Err() + } } -func (b *blockingExchange) GetBlocks(ctx context.Context, cids []cid.Cid) (<-chan blocks.Block, error) { - ch := make(chan blocks.Block) +func (f *handoffExchange) GetBlocks(ctx context.Context, cids []cid.Cid) (<-chan blocks.Block, error) { + blkCh, err := f.startingExchange.GetBlocks(ctx, cids) + if err != nil { + return nil, err + } - go func(ctx context.Context, cids []cid.Cid) { - for _, c := range cids { - blk, err := b.GetBlock(ctx, c) - if ctx.Err() != nil { - return + retCh := make(chan blocks.Block) + + go func() { + cs := cid.NewSet() + for cs.Len() < len(cids) { + blk, ok := <-blkCh + if !ok { + break + } + select { + case retCh <- blk: + cs.Add(blk.Cid()) + case <-ctx.Done(): } - if err == nil { - ch <- blk + } + + for cs.Len() < len(cids) { + select { + case <-ctx.Done(): + return + case <-f.handoffCh: + var newCidArr []cid.Cid + for _, c := range cids { + if !cs.Has(c) { + blk, _ := f.bstore.Get(ctx, c) + if blk != nil { + select { + case retCh <- blk: + cs.Add(blk.Cid()) + case <-ctx.Done(): + return + } + } else { + newCidArr = append(newCidArr, c) + } + } + } + + if len(newCidArr) == 0 { + return + } + + graphLog.Debugw("needed to use use a backup fetcher for cids", "cids", newCidArr) + f.metrics.blockRecoveryAttemptMetric.Add(float64(len(newCidArr))) + fch, err := f.followupExchange.GetBlocks(ctx, newCidArr) + if err != nil { + graphLog.Errorw("error getting blocks from followupExchange", "error", err) + return + } + for cs.Len() < len(cids) { + blk, ok := <-fch + if !ok { + return + } + select { + case retCh <- blk: + cs.Add(blk.Cid()) + case <-ctx.Done(): + return + } + } } } - }(ctx, cids) + }() + return retCh, nil +} - return ch, nil +func (f *handoffExchange) NotifyNewBlocks(ctx context.Context, blocks ...blocks.Block) error { + err1 := f.startingExchange.NotifyNewBlocks(ctx, blocks...) + err2 := f.followupExchange.NotifyNewBlocks(ctx, blocks...) + return multierr.Combine(err1, err2) } -func (b *blockingExchange) NotifyNewBlocks(ctx context.Context, blocks ...blocks.Block) error { - b.nl.Lock() - defer b.nl.Unlock() - on := b.notify - if on == nil { - return nil - } - b.notify = make(chan struct{}) - close(on) +func (f *handoffExchange) Close() error { + err1 := f.startingExchange.Close() + err2 := f.followupExchange.Close() + return multierr.Combine(err1, err2) +} + +var _ exchange.Interface = (*handoffExchange)(nil) + +type blockFetcherExchWrapper struct { + f exchange.Fetcher +} + +func (b *blockFetcherExchWrapper) GetBlock(ctx context.Context, c cid.Cid) (blocks.Block, error) { + return b.f.GetBlock(ctx, c) +} + +func (b *blockFetcherExchWrapper) GetBlocks(ctx context.Context, cids []cid.Cid) (<-chan blocks.Block, error) { + return b.f.GetBlocks(ctx, cids) +} + +func (b *blockFetcherExchWrapper) NotifyNewBlocks(ctx context.Context, blocks ...blocks.Block) error { return nil } -func (b *blockingExchange) Close() error { - b.nl.Lock() - on := b.notify - b.notify = nil - close(on) - b.nl.Unlock() +func (b *blockFetcherExchWrapper) Close() error { return nil } -var _ exchange.Interface = (*blockingExchange)(nil) +var _ exchange.Interface = (*blockFetcherExchWrapper)(nil)