diff --git a/go.mod b/go.mod index 323a437..cd31926 100644 --- a/go.mod +++ b/go.mod @@ -31,7 +31,6 @@ require ( go.opentelemetry.io/otel/sdk v1.14.0 go.opentelemetry.io/otel/trace v1.14.0 go.uber.org/atomic v1.10.0 - go.uber.org/multierr v1.9.0 go.uber.org/zap v1.24.0 ) @@ -168,6 +167,7 @@ require ( go.opentelemetry.io/proto/otlp v0.19.0 // indirect go.uber.org/dig v1.15.0 // indirect go.uber.org/fx v1.18.2 // indirect + go.uber.org/multierr v1.9.0 // indirect golang.org/x/crypto v0.6.0 // indirect golang.org/x/exp v0.0.0-20230213192124-5e25df0256eb // indirect golang.org/x/mod v0.7.0 // indirect diff --git a/handlers.go b/handlers.go index ce91c9f..a4beb75 100644 --- a/handlers.go +++ b/handlers.go @@ -149,14 +149,16 @@ func makeGatewayHandler(bs bstore.Blockstore, kuboRPC []string, port int, blockC gwHandler := gateway.NewHandler(gwConf, gwAPI) ipfsHandler := withHTTPMetrics(gwHandler, "ipfs") - ipnsHandler := withHTTPMetrics(gwHandler, "ipns") + //ipnsHandler := withHTTPMetrics(gwHandler, "ipns") + ipnsHandler := gwHandler mux := http.NewServeMux() mux.Handle("/ipfs/", ipfsHandler) mux.Handle("/ipns/", ipnsHandler) // TODO: below is legacy which we want to remove, measuring this separately // allows us to decide when is the time to do it. - legacyKuboRpcHandler := withHTTPMetrics(newKuboRPCHandler(kuboRPC), "legacyKuboRpc") + //legacyKuboRpcHandler := withHTTPMetrics(newKuboRPCHandler(kuboRPC), "legacyKuboRpc") + legacyKuboRpcHandler := newKuboRPCHandler(kuboRPC) mux.Handle("/api/v0/", legacyKuboRpcHandler) // Construct the HTTP handler for the gateway. diff --git a/lib/blockstore_cache.go b/lib/blockstore_cache.go index a41778c..86186cd 100644 --- a/lib/blockstore_cache.go +++ b/lib/blockstore_cache.go @@ -21,36 +21,31 @@ const DefaultCacheBlockStoreSize = 1024 var cacheLog = golog.Logger("cache/block") +var cacheHitsMetric = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: "ipfs", + Subsystem: "http", + Name: "blockstore_cache_hit", + Help: "The number of global block cache hits.", +}) + +var cacheRequestsMetric = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: "ipfs", + Subsystem: "http", + Name: "blockstore_cache_requests", + Help: "The number of global block cache requests.", +}) + +func init() { + prometheus.Register(cacheHitsMetric) + prometheus.Register(cacheRequestsMetric) +} + func NewCacheBlockStore(size int) (blockstore.Blockstore, error) { c, err := lru.New2Q[string, []byte](size) if err != nil { return nil, err } - cacheHitsMetric := prometheus.NewCounter(prometheus.CounterOpts{ - Namespace: "ipfs", - Subsystem: "http", - Name: "blockstore_cache_hit", - Help: "The number of global block cache hits.", - }) - - cacheRequestsMetric := prometheus.NewCounter(prometheus.CounterOpts{ - Namespace: "ipfs", - Subsystem: "http", - Name: "blockstore_cache_requests", - Help: "The number of global block cache requests.", - }) - - err = prometheus.Register(cacheHitsMetric) - if err != nil { - return nil, err - } - - err = prometheus.Register(cacheRequestsMetric) - if err != nil { - return nil, err - } - return &cacheBlockStore{ cache: c, rehash: uatomic.NewBool(false), diff --git a/lib/graph_gateway.go b/lib/graph_gateway.go index d4684cc..c7222f7 100644 --- a/lib/graph_gateway.go +++ b/lib/graph_gateway.go @@ -27,15 +27,12 @@ import ( ipfspath "github.com/ipfs/boxo/path" blocks "github.com/ipfs/go-block-format" "github.com/ipfs/go-cid" - format "github.com/ipfs/go-ipld-format" golog "github.com/ipfs/go-log/v2" routinghelpers "github.com/libp2p/go-libp2p-routing-helpers" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/routing" "github.com/multiformats/go-multicodec" - "github.com/multiformats/go-multihash" "github.com/prometheus/client_golang/prometheus" - "go.uber.org/multierr" ) var graphLog = golog.Logger("backend/graph") @@ -87,14 +84,6 @@ type Notifier interface { NotifyNewBlocks(ctx context.Context, blocks ...blocks.Block) error } -// notifiersForRootCid is used for reducing lock contention by only notifying -// exchanges related to the same content root CID -type notifiersForRootCid struct { - lk sync.RWMutex - deleted int8 - notifiers []Notifier -} - type GraphGateway struct { fetcher CarFetcher blockFetcher exchange.Fetcher @@ -102,8 +91,7 @@ type GraphGateway struct { namesys namesys.NameSystem bstore blockstore.Blockstore - notifiers sync.Map // cid -> notifiersForRootCid - metrics *GraphGatewayMetrics + metrics *GraphGatewayMetrics } type GraphGatewayMetrics struct { @@ -163,7 +151,6 @@ func NewGraphGatewayBackend(f CarFetcher, blockFetcher exchange.Fetcher, opts .. routing: vs, namesys: ns, bstore: bs, - notifiers: sync.Map{}, metrics: registerGraphGatewayMetrics(), }, nil } @@ -253,18 +240,6 @@ func registerGraphGatewayMetrics() *GraphGatewayMetrics { } } -func (api *GraphGateway) getRootOfPath(path string) string { - pth, err := ipfspath.ParsePath(path) - if err != nil { - return path - } - if pth.IsJustAKey() { - return pth.Segments()[0] - } else { - return pth.Segments()[1] - } -} - /* Implementation iteration plan: @@ -276,36 +251,11 @@ Implementation iteration plan: */ func (api *GraphGateway) loadRequestIntoSharedBlockstoreAndBlocksGateway(ctx context.Context, path string) (gateway.IPFSBackend, func(), error) { - bstore := api.bstore - carFetchingExch := newInboundBlockExchange() - doneWithFetcher := make(chan struct{}, 1) - exch := &handoffExchange{ - startingExchange: carFetchingExch, - followupExchange: &blockFetcherExchWrapper{api.blockFetcher}, - bstore: bstore, - handoffCh: doneWithFetcher, - metrics: api.metrics, - } - - notifierKey := api.getRootOfPath(path) - var notifier *notifiersForRootCid - for { - notifiers, _ := api.notifiers.LoadOrStore(notifierKey, ¬ifiersForRootCid{notifiers: []Notifier{}}) - if n, ok := notifiers.(*notifiersForRootCid); ok { - n.lk.Lock() - // could have been deleted after our load. try again. - if n.deleted != 0 { - n.lk.Unlock() - continue - } - notifier = n - n.notifiers = append(n.notifiers, exch) - n.lk.Unlock() - break - } else { - return nil, nil, errors.New("failed to get notifier") - } + bstore, err := NewCacheBlockStore(1024) + if err != nil { + return nil, nil, err } + exch := newBlockExchange(bstore, api.blockFetcher) go func(metrics *GraphGatewayMetrics) { defer func() { @@ -374,18 +324,16 @@ func (api *GraphGateway) loadRequestIntoSharedBlockstoreAndBlocksGateway(ctx con return err } metrics.carBlocksFetchedMetric.Inc() - api.notifyOngoingRequests(ctx, notifierKey, blkRead.block) + exch.NotifyNewBlocks(ctx, blkRead.block) } } }) if err != nil { graphLog.Infow("car Fetch failed", "path", path, "error", err) } - if err := carFetchingExch.Close(); err != nil { + if err := exch.Close(); err != nil { graphLog.Errorw("carFetchingExch.Close()", "error", err) } - doneWithFetcher <- struct{}{} - close(doneWithFetcher) }(api.metrics) bserv := blockservice.New(bstore, exch) @@ -394,38 +342,7 @@ func (api *GraphGateway) loadRequestIntoSharedBlockstoreAndBlocksGateway(ctx con return nil, nil, err } - return blkgw, func() { - notifier.lk.Lock() - for i, e := range notifier.notifiers { - if e == exch { - notifier.notifiers = append(notifier.notifiers[0:i], notifier.notifiers[i+1:]...) - break - } - } - if len(notifier.notifiers) == 0 { - notifier.deleted = 1 - api.notifiers.Delete(notifierKey) - } - notifier.lk.Unlock() - }, nil -} - -func (api *GraphGateway) notifyOngoingRequests(ctx context.Context, key string, blks ...blocks.Block) { - if notifiers, ok := api.notifiers.Load(key); ok { - notifier, ok := notifiers.(*notifiersForRootCid) - if !ok { - graphLog.Errorw("notifyOngoingRequests failed", "key", key, "error", "could not get notifiersForRootCid") - return - } - notifier.lk.RLock() - for _, n := range notifier.notifiers { - err := n.NotifyNewBlocks(ctx, blks...) - if err != nil { - graphLog.Errorw("notifyOngoingRequests failed", "key", key, "error", err) - } - } - notifier.lk.RUnlock() - } + return blkgw, func() {}, nil } type fileCloseWrapper struct { @@ -662,184 +579,72 @@ func (api *GraphGateway) GetDNSLinkRecord(ctx context.Context, hostname string) var _ gateway.IPFSBackend = (*GraphGateway)(nil) -type inboundBlockExchange struct { - ps BlockPubSub -} - -func newInboundBlockExchange() *inboundBlockExchange { - return &inboundBlockExchange{ - ps: NewBlockPubSub(), - } -} +type blockingExchange struct { + notify chan struct{} + nl sync.Mutex -func (i *inboundBlockExchange) GetBlock(ctx context.Context, c cid.Cid) (blocks.Block, error) { - blk, more := <-i.ps.Subscribe(ctx, c.Hash()) - if err := ctx.Err(); err != nil { - return nil, err - } - if !more { - return nil, format.ErrNotFound{Cid: c} - } - return blk, nil + bstore blockstore.Blockstore + f exchange.Fetcher } -func (i *inboundBlockExchange) GetBlocks(ctx context.Context, cids []cid.Cid) (<-chan blocks.Block, error) { - mhMap := make(map[string]struct{}) - for _, c := range cids { - mhMap[string(c.Hash())] = struct{}{} - } - mhs := make([]multihash.Multihash, 0, len(mhMap)) - for k := range mhMap { - mhs = append(mhs, multihash.Multihash(k)) +func newBlockExchange(bstore blockstore.Blockstore, fetcher exchange.Fetcher) *blockingExchange { + return &blockingExchange{ + notify: make(chan struct{}), + bstore: bstore, + f: fetcher, } - return i.ps.Subscribe(ctx, mhs...), nil -} - -func (i *inboundBlockExchange) NotifyNewBlocks(ctx context.Context, blocks ...blocks.Block) error { - // TODO: handle context cancellation and/or blockage here - i.ps.Publish(blocks...) - return nil } -func (i *inboundBlockExchange) Close() error { - i.ps.Shutdown() - return nil -} - -var _ exchange.Interface = (*inboundBlockExchange)(nil) - -type handoffExchange struct { - startingExchange, followupExchange exchange.Interface - bstore blockstore.Blockstore - handoffCh <-chan struct{} - metrics *GraphGatewayMetrics -} +func (b *blockingExchange) GetBlock(ctx context.Context, c cid.Cid) (blocks.Block, error) { + <-b.notify -func (f *handoffExchange) GetBlock(ctx context.Context, c cid.Cid) (blocks.Block, error) { - blkCh, err := f.startingExchange.GetBlocks(ctx, []cid.Cid{c}) - if err != nil { + if err := ctx.Err(); err != nil { return nil, err } - blk, ok := <-blkCh - if ok { - return blk, nil - } - select { - case <-f.handoffCh: - graphLog.Debugw("switching to backup block fetcher", "cid", c) - f.metrics.blockRecoveryAttemptMetric.Inc() - return f.followupExchange.GetBlock(ctx, c) - case <-ctx.Done(): - return nil, ctx.Err() + if blk, err := b.bstore.Get(ctx, c); err == nil { + return blk, nil } + return b.f.GetBlock(ctx, c) } -func (f *handoffExchange) GetBlocks(ctx context.Context, cids []cid.Cid) (<-chan blocks.Block, error) { - blkCh, err := f.startingExchange.GetBlocks(ctx, cids) - if err != nil { - return nil, err - } - - retCh := make(chan blocks.Block) +func (b *blockingExchange) GetBlocks(ctx context.Context, cids []cid.Cid) (<-chan blocks.Block, error) { + ch := make(chan blocks.Block) - go func() { - cs := cid.NewSet() - for cs.Len() < len(cids) { - blk, ok := <-blkCh - if !ok { - break - } - select { - case retCh <- blk: - cs.Add(blk.Cid()) - case <-ctx.Done(): - } - } - - for cs.Len() < len(cids) { - select { - case <-ctx.Done(): + go func(ctx context.Context, cids []cid.Cid) { + for _, c := range cids { + blk, err := b.GetBlock(ctx, c) + if ctx.Err() != nil { return - case <-f.handoffCh: - var newCidArr []cid.Cid - for _, c := range cids { - if !cs.Has(c) { - blk, _ := f.bstore.Get(ctx, c) - if blk != nil { - select { - case retCh <- blk: - cs.Add(blk.Cid()) - case <-ctx.Done(): - return - } - } else { - newCidArr = append(newCidArr, c) - } - } - } - - if len(newCidArr) == 0 { - return - } - - graphLog.Debugw("needed to use use a backup fetcher for cids", "cids", newCidArr) - f.metrics.blockRecoveryAttemptMetric.Add(float64(len(newCidArr))) - fch, err := f.followupExchange.GetBlocks(ctx, newCidArr) - if err != nil { - graphLog.Errorw("error getting blocks from followupExchange", "error", err) - return - } - for cs.Len() < len(cids) { - blk, ok := <-fch - if !ok { - return - } - select { - case retCh <- blk: - cs.Add(blk.Cid()) - case <-ctx.Done(): - return - } - } + } + if err == nil { + ch <- blk } } - }() - return retCh, nil -} - -func (f *handoffExchange) NotifyNewBlocks(ctx context.Context, blocks ...blocks.Block) error { - err1 := f.startingExchange.NotifyNewBlocks(ctx, blocks...) - err2 := f.followupExchange.NotifyNewBlocks(ctx, blocks...) - return multierr.Combine(err1, err2) -} + }(ctx, cids) -func (f *handoffExchange) Close() error { - err1 := f.startingExchange.Close() - err2 := f.followupExchange.Close() - return multierr.Combine(err1, err2) + return ch, nil } -var _ exchange.Interface = (*handoffExchange)(nil) - -type blockFetcherExchWrapper struct { - f exchange.Fetcher -} - -func (b *blockFetcherExchWrapper) GetBlock(ctx context.Context, c cid.Cid) (blocks.Block, error) { - return b.f.GetBlock(ctx, c) -} - -func (b *blockFetcherExchWrapper) GetBlocks(ctx context.Context, cids []cid.Cid) (<-chan blocks.Block, error) { - return b.f.GetBlocks(ctx, cids) -} - -func (b *blockFetcherExchWrapper) NotifyNewBlocks(ctx context.Context, blocks ...blocks.Block) error { +func (b *blockingExchange) NotifyNewBlocks(ctx context.Context, blocks ...blocks.Block) error { + b.nl.Lock() + defer b.nl.Unlock() + on := b.notify + if on == nil { + return nil + } + b.notify = make(chan struct{}) + close(on) return nil } -func (b *blockFetcherExchWrapper) Close() error { +func (b *blockingExchange) Close() error { + b.nl.Lock() + on := b.notify + b.notify = nil + close(on) + b.nl.Unlock() return nil } -var _ exchange.Interface = (*blockFetcherExchWrapper)(nil) +var _ exchange.Interface = (*blockingExchange)(nil)