From 0de116161b1d4a51a8cc029e6457fdc63261218f Mon Sep 17 00:00:00 2001 From: dirkmc Date: Tue, 23 May 2023 10:57:40 +0200 Subject: [PATCH] Signal to index provider to skip announcements (#1457) * fix: signal to index provider to skip announcements * fix: ensure multihash lister skip error is of type ipld.ErrNotExists --------- Co-authored-by: LexLuthr --- indexprovider/wrapper.go | 133 ++++++++++++++++++++++++++++----------- 1 file changed, 96 insertions(+), 37 deletions(-) diff --git a/indexprovider/wrapper.go b/indexprovider/wrapper.go index fc25cb2c9..14e7e0c05 100644 --- a/indexprovider/wrapper.go +++ b/indexprovider/wrapper.go @@ -2,13 +2,19 @@ package indexprovider import ( "context" + "database/sql" "errors" "fmt" + "io/fs" "math" "net/url" "os" "path/filepath" + "github.com/filecoin-project/dagstore/index" + "github.com/ipfs/go-datastore" + "github.com/ipld/go-ipld-prime" + gfm_storagemarket "github.com/filecoin-project/boost-gfm/storagemarket" "github.com/filecoin-project/boost/db" "github.com/filecoin-project/boost/markets/idxprov" @@ -27,7 +33,7 @@ import ( "github.com/ipni/index-provider/engine/xproviders" "github.com/ipni/index-provider/metadata" "github.com/libp2p/go-libp2p/core/crypto" - host "github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/peer" "go.uber.org/fx" ) @@ -264,6 +270,16 @@ func (w *Wrapper) IndexerAnnounceAllDeals(ctx context.Context) error { return merr } +// While ingesting cids for each piece, if there is an error the indexer +// checks if the error contains the string "content not found": +// - if so, the indexer skips the piece and continues ingestion +// - if not, the indexer pauses ingestion +var ErrStringSkipAdIngest = "content not found" + +func skipError(err error) error { + return fmt.Errorf("%s: %s: %w", ErrStringSkipAdIngest, err.Error(), ipld.ErrNotExists{}) +} + func (w *Wrapper) IndexerAnnounceLatest(ctx context.Context) (cid.Cid, error) { e, ok := w.prov.(*engine.Engine) if !ok { @@ -299,42 +315,7 @@ func (w *Wrapper) Start(ctx context.Context) { log.Errorw("failed to migrate dagstore indices for Boost deals", "err", err) } - w.prov.RegisterMultihashLister(func(ctx context.Context, pid peer.ID, contextID []byte) (provider.MultihashIterator, error) { - provideF := func(pieceCid cid.Cid) (provider.MultihashIterator, error) { - ii, err := w.dagStore.GetIterableIndexForPiece(pieceCid) - if err != nil { - return nil, fmt.Errorf("failed to get iterable index: %w", err) - } - - mhi, err := provider.CarMultihashIterator(ii) - if err != nil { - return nil, fmt.Errorf("failed to get mhiterator: %w", err) - } - return mhi, nil - } - - // convert context ID to proposal Cid - proposalCid, err := cid.Cast(contextID) - if err != nil { - return nil, fmt.Errorf("failed to cast context ID to a cid") - } - - // go from proposal cid -> piece cid by looking up deal in boost and if we can't find it there -> then markets - // check Boost deals DB - pds, boostErr := w.dealsDB.BySignedProposalCID(ctx, proposalCid) - if boostErr == nil { - pieceCid := pds.ClientDealProposal.Proposal.PieceCID - return provideF(pieceCid) - } - - // check in legacy markets - md, legacyErr := w.legacyProv.GetLocalDeal(proposalCid) - if legacyErr == nil { - return provideF(md.Proposal.PieceCID) - } - - return nil, fmt.Errorf("failed to look up deal in Boost, err=%s and Legacy Markets, err=%s", boostErr, legacyErr) - }) + w.prov.RegisterMultihashLister(w.MultihashLister) runCtx, runCancel := context.WithCancel(context.Background()) w.stop = runCancel @@ -352,6 +333,84 @@ func (w *Wrapper) Start(ctx context.Context) { }() } +func (w *Wrapper) MultihashLister(ctx context.Context, prov peer.ID, contextID []byte) (provider.MultihashIterator, error) { + provideF := func(proposalCid cid.Cid, pieceCid cid.Cid) (provider.MultihashIterator, error) { + ii, err := w.dagStore.GetIterableIndexForPiece(pieceCid) + if err != nil { + e := fmt.Errorf("failed to get iterable index: %w", err) + if errors.Is(err, index.ErrNotFound) || errors.Is(err, fs.ErrNotExist) { + // If it's a not found error, skip over this piece and continue ingesting + log.Infow("skipping ingestion: piece not found", "piece", pieceCid, "propCid", proposalCid, "err", e) + return nil, skipError(e) + } + + // Some other error, pause ingestion + log.Infow("pausing ingestion: error getting piece", "piece", pieceCid, "propCid", proposalCid, "err", e) + return nil, e + } + + mhi, err := provider.CarMultihashIterator(ii) + if err != nil { + // Bad index, skip over this piece and continue ingesting + err = fmt.Errorf("failed to get mhiterator: %w", err) + log.Infow("skipping ingestion", "piece", pieceCid, "propCid", proposalCid, "err", err) + return nil, skipError(err) + } + + log.Debugw("returning piece iterator", "piece", pieceCid, "propCid", proposalCid, "err", err) + return mhi, nil + } + + // convert context ID to proposal Cid + proposalCid, err := cid.Cast(contextID) + if err != nil { + // Bad contextID, skip over this piece and continue ingesting + err = fmt.Errorf("failed to cast context ID to a cid") + log.Infow("skipping ingestion", "proposalCid", proposalCid, "err", err) + return nil, skipError(err) + } + + // Look up deal by proposal cid in the boost database. + // If we can't find it there check legacy markets DB. + pds, boostErr := w.dealsDB.BySignedProposalCID(ctx, proposalCid) + if boostErr == nil { + // Found the deal, get an iterator over the piece + pieceCid := pds.ClientDealProposal.Proposal.PieceCID + return provideF(proposalCid, pieceCid) + } + + // Check if it's a "not found" error + if !errors.Is(boostErr, sql.ErrNoRows) { + // It's not a "not found" error: there was a problem accessing the + // database. Pause ingestion until the user can fix the DB. + e := fmt.Errorf("getting deal with proposal cid %s from boost database: %w", proposalCid, boostErr) + log.Infow("pausing ingestion", "proposalCid", proposalCid, "err", e) + return nil, e + } + + // Deal was not found in boost DB - check in legacy markets + md, legacyErr := w.legacyProv.GetLocalDeal(proposalCid) + if legacyErr == nil { + // Found the deal, get an interator over the piece + return provideF(proposalCid, md.Proposal.PieceCID) + } + + // Check if it's a "not found" error + if !errors.Is(legacyErr, datastore.ErrNotFound) { + // It's not a "not found" error: there was a problem accessing the + // legacy database. Pause ingestion until the user can fix the legacy DB. + e := fmt.Errorf("getting deal with proposal cid %s from Legacy Markets: %w", proposalCid, legacyErr) + log.Infow("pausing ingestion", "proposalCid", proposalCid, "err", e) + return nil, e + } + + // The deal was not found in the boost or legacy database. + // Skip this deal and continue ingestion. + err = fmt.Errorf("deal with proposal cid %s not found", proposalCid) + log.Infow("skipping ingestion", "proposalCid", proposalCid, "err", err) + return nil, skipError(err) +} + func (w *Wrapper) AnnounceBoostDeal(ctx context.Context, deal *types.ProviderDealState) (cid.Cid, error) { // Filter out deals that should not be announced if !deal.AnnounceToIPNI {