Skip to content

Commit

Permalink
Signal to index provider to skip announcements (#1457)
Browse files Browse the repository at this point in the history
* fix: signal to index provider to skip announcements

* fix: ensure multihash lister skip error is of type ipld.ErrNotExists

---------

Co-authored-by: LexLuthr <lexluthr@protocol.ai>
  • Loading branch information
dirkmc and LexLuthr authored May 23, 2023
1 parent 1a7b193 commit 0de1161
Showing 1 changed file with 96 additions and 37 deletions.
133 changes: 96 additions & 37 deletions indexprovider/wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,19 @@ package indexprovider

import (
"context"
"database/sql"
"errors"
"fmt"
"io/fs"
"math"
"net/url"
"os"
"path/filepath"

"github.com/filecoin-project/dagstore/index"
"github.com/ipfs/go-datastore"
"github.com/ipld/go-ipld-prime"

gfm_storagemarket "github.com/filecoin-project/boost-gfm/storagemarket"
"github.com/filecoin-project/boost/db"
"github.com/filecoin-project/boost/markets/idxprov"
Expand All @@ -27,7 +33,7 @@ import (
"github.com/ipni/index-provider/engine/xproviders"
"github.com/ipni/index-provider/metadata"
"github.com/libp2p/go-libp2p/core/crypto"
host "github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"go.uber.org/fx"
)
Expand Down Expand Up @@ -264,6 +270,16 @@ func (w *Wrapper) IndexerAnnounceAllDeals(ctx context.Context) error {
return merr
}

// ErrStringSkipAdIngest is the marker text that skipError puts at the start
// of an error message to tell the indexer that a piece should be skipped.
//
// While ingesting cids for each piece, if there is an error the indexer
// checks if the error contains the string "content not found":
//   - if so, the indexer skips the piece and continues ingestion
//   - if not, the indexer pauses ingestion
var ErrStringSkipAdIngest = "content not found"

func skipError(err error) error {
return fmt.Errorf("%s: %s: %w", ErrStringSkipAdIngest, err.Error(), ipld.ErrNotExists{})
}

func (w *Wrapper) IndexerAnnounceLatest(ctx context.Context) (cid.Cid, error) {
e, ok := w.prov.(*engine.Engine)
if !ok {
Expand Down Expand Up @@ -299,42 +315,7 @@ func (w *Wrapper) Start(ctx context.Context) {
log.Errorw("failed to migrate dagstore indices for Boost deals", "err", err)
}

w.prov.RegisterMultihashLister(func(ctx context.Context, pid peer.ID, contextID []byte) (provider.MultihashIterator, error) {
provideF := func(pieceCid cid.Cid) (provider.MultihashIterator, error) {
ii, err := w.dagStore.GetIterableIndexForPiece(pieceCid)
if err != nil {
return nil, fmt.Errorf("failed to get iterable index: %w", err)
}

mhi, err := provider.CarMultihashIterator(ii)
if err != nil {
return nil, fmt.Errorf("failed to get mhiterator: %w", err)
}
return mhi, nil
}

// convert context ID to proposal Cid
proposalCid, err := cid.Cast(contextID)
if err != nil {
return nil, fmt.Errorf("failed to cast context ID to a cid")
}

// go from proposal cid -> piece cid by looking up deal in boost and if we can't find it there -> then markets
// check Boost deals DB
pds, boostErr := w.dealsDB.BySignedProposalCID(ctx, proposalCid)
if boostErr == nil {
pieceCid := pds.ClientDealProposal.Proposal.PieceCID
return provideF(pieceCid)
}

// check in legacy markets
md, legacyErr := w.legacyProv.GetLocalDeal(proposalCid)
if legacyErr == nil {
return provideF(md.Proposal.PieceCID)
}

return nil, fmt.Errorf("failed to look up deal in Boost, err=%s and Legacy Markets, err=%s", boostErr, legacyErr)
})
w.prov.RegisterMultihashLister(w.MultihashLister)

runCtx, runCancel := context.WithCancel(context.Background())
w.stop = runCancel
Expand All @@ -352,6 +333,84 @@ func (w *Wrapper) Start(ctx context.Context) {
}()
}

// MultihashLister is the callback registered with the index provider via
// RegisterMultihashLister. Given the context ID of an advertisement, it
// returns an iterator over the multihashes of the piece belonging to the
// corresponding deal.
//
// The context ID is cast to the deal's proposal cid. The deal is looked up
// first in the boost deals database and, if it is not found there, in the
// legacy markets deal store. The piece cid of the deal is then used to fetch
// the piece's iterable index from the dagstore.
//
// The kind of error returned signals to the indexer how to proceed:
//   - an error built by skipError (bad context ID, deal not found in either
//     database, piece index not found, or unreadable index) means the piece
//     should be skipped and ingestion should continue
//   - any other error (a failure accessing one of the databases or the
//     dagstore) means ingestion should pause until the problem is fixed
//
// The prov parameter is not used.
func (w *Wrapper) MultihashLister(ctx context.Context, prov peer.ID, contextID []byte) (provider.MultihashIterator, error) {
	// provideF returns the multihash iterator for the piece of a deal that
	// has already been located. proposalCid is only used for logging.
	provideF := func(proposalCid cid.Cid, pieceCid cid.Cid) (provider.MultihashIterator, error) {
		ii, err := w.dagStore.GetIterableIndexForPiece(pieceCid)
		if err != nil {
			e := fmt.Errorf("failed to get iterable index: %w", err)
			if errors.Is(err, index.ErrNotFound) || errors.Is(err, fs.ErrNotExist) {
				// If it's a not found error, skip over this piece and continue ingesting
				log.Infow("skipping ingestion: piece not found", "piece", pieceCid, "propCid", proposalCid, "err", e)
				return nil, skipError(e)
			}

			// Some other error, pause ingestion
			log.Infow("pausing ingestion: error getting piece", "piece", pieceCid, "propCid", proposalCid, "err", e)
			return nil, e
		}

		mhi, err := provider.CarMultihashIterator(ii)
		if err != nil {
			// Bad index, skip over this piece and continue ingesting
			err = fmt.Errorf("failed to get mhiterator: %w", err)
			log.Infow("skipping ingestion", "piece", pieceCid, "propCid", proposalCid, "err", err)
			return nil, skipError(err)
		}

		// No "err" field here: err is always nil once this point is reached.
		log.Debugw("returning piece iterator", "piece", pieceCid, "propCid", proposalCid)
		return mhi, nil
	}

	// convert context ID to proposal Cid
	proposalCid, err := cid.Cast(contextID)
	if err != nil {
		// Bad contextID, skip over this piece and continue ingesting.
		// Keep the cause of the cast failure in the message, and log the raw
		// context ID: proposalCid is undefined when the cast fails.
		err = fmt.Errorf("failed to cast context ID to a cid: %w", err)
		log.Infow("skipping ingestion", "contextID", contextID, "err", err)
		return nil, skipError(err)
	}

	// Look up deal by proposal cid in the boost database.
	// If we can't find it there check legacy markets DB.
	pds, boostErr := w.dealsDB.BySignedProposalCID(ctx, proposalCid)
	if boostErr == nil {
		// Found the deal, get an iterator over the piece
		pieceCid := pds.ClientDealProposal.Proposal.PieceCID
		return provideF(proposalCid, pieceCid)
	}

	// Check if it's a "not found" error
	if !errors.Is(boostErr, sql.ErrNoRows) {
		// It's not a "not found" error: there was a problem accessing the
		// database. Pause ingestion until the user can fix the DB.
		e := fmt.Errorf("getting deal with proposal cid %s from boost database: %w", proposalCid, boostErr)
		log.Infow("pausing ingestion", "proposalCid", proposalCid, "err", e)
		return nil, e
	}

	// Deal was not found in boost DB - check in legacy markets
	md, legacyErr := w.legacyProv.GetLocalDeal(proposalCid)
	if legacyErr == nil {
		// Found the deal, get an iterator over the piece
		return provideF(proposalCid, md.Proposal.PieceCID)
	}

	// Check if it's a "not found" error
	if !errors.Is(legacyErr, datastore.ErrNotFound) {
		// It's not a "not found" error: there was a problem accessing the
		// legacy database. Pause ingestion until the user can fix the legacy DB.
		e := fmt.Errorf("getting deal with proposal cid %s from Legacy Markets: %w", proposalCid, legacyErr)
		log.Infow("pausing ingestion", "proposalCid", proposalCid, "err", e)
		return nil, e
	}

	// The deal was not found in the boost or legacy database.
	// Skip this deal and continue ingestion.
	err = fmt.Errorf("deal with proposal cid %s not found", proposalCid)
	log.Infow("skipping ingestion", "proposalCid", proposalCid, "err", err)
	return nil, skipError(err)
}

func (w *Wrapper) AnnounceBoostDeal(ctx context.Context, deal *types.ProviderDealState) (cid.Cid, error) {
// Filter out deals that should not be announced
if !deal.AnnounceToIPNI {
Expand Down

0 comments on commit 0de1161

Please sign in to comment.