Do not save empty advertisements to mirror (#2471)
Empty advertisements should not be saved to the mirror since they do not help index ingestion. Advertisements may have no multihash entries when:

1. The ad removes a context ID
  - When ingesting ads, the indexer does not read removal ads from the mirror since there is no entry data to get.
2. The ad only updates metadata or provider information
  - The indexer stores the metadata but does not read the mirror since there are no entries.
3. The ad was deleted by a removal ad later in the chain
  - Once an ad is known to be deleted, it will never be read from the mirror since its content has been removed.
4. The publisher is not serving entries data
  - This is treated like a no-content ad, as in case 2, but it means that at some point in the past the ad did have entry data. There is, or should be, a pending unpublished removal later in the chain. So do not save this ad in the mirror: either it will end up being deleted later anyway, or the publisher is only temporarily unavailable and another indexer using the mirror should query the publisher directly. In either case the advertisement should not be written to the mirror.

GC also removes any empty (no-content) advertisements from the mirror, and reindexing must not repopulate them.
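
For illustration, here is a minimal, self-contained Go sketch of the mirror-write rule this commit enforces. The helper `shouldMirror` is hypothetical (it does not exist in this repository); its inputs correspond to the `hasEnts` value now returned by `ingestAd`, the `frozen` flag, and `ing.mirror.canWrite()` in the diff below.

```go
package main

import "fmt"

// shouldMirror is a hypothetical helper, not part of this repository. It
// captures the rule introduced by this commit: an advertisement is written to
// the mirror only when it actually carried entry data, the indexer is not
// frozen, and the mirror is writable.
func shouldMirror(hasEntries, frozen, mirrorWritable bool) bool {
	return hasEntries && !frozen && mirrorWritable
}

func main() {
	// Removal ads, metadata-only updates, and ads whose entries were deleted
	// later in the chain all report hasEntries == false, so they are skipped.
	fmt.Println(shouldMirror(false, false, true)) // false: no-content ad is not mirrored
	fmt.Println(shouldMirror(true, false, true))  // true: ad with entries is mirrored
	fmt.Println(shouldMirror(true, true, true))   // false: a frozen indexer does not mirror
}
```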

Other changes:
- GC always logs the number of indexes removed
- Always delete HAMT entries from the temporary datastore, since HAMT entries are never written to the mirror
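
To make the second bullet concrete, here is a minimal sketch of the now-unconditional deferred cleanup, modeled on the ingestHamtFromPublisher change in the diff below. The helper name cleanupHamtCids is hypothetical; the datastore calls mirror those in the diff.

```go
package ingest

import (
	"context"
	"log"

	"github.com/ipfs/go-cid"
	"github.com/ipfs/go-datastore"
)

// cleanupHamtCids removes synced HAMT node blocks from the temporary
// datastore. Hypothetical helper: in the actual change this logic lives in a
// defer inside ingestHamtFromPublisher and now runs unconditionally, because
// HAMT entries are never written to the mirror.
func cleanupHamtCids(ctx context.Context, dsTmp datastore.Datastore, hamtCids []cid.Cid) {
	for _, c := range hamtCids {
		if err := dsTmp.Delete(ctx, datastore.NewKey(c.String())); err != nil {
			log.Printf("error deleting HAMT cid %s from datastore: %v", c, err)
		}
	}
}
```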
gammazero authored Jan 10, 2024
1 parent 0999b39 commit 87bebf0
Showing 3 changed files with 86 additions and 101 deletions.
107 changes: 48 additions & 59 deletions internal/ingest/ingest.go
@@ -576,7 +576,7 @@ func (ing *Ingester) MarkAdProcessed(publisher peer.ID, adCid cid.Cid) error {
return ing.markAdProcessed(publisher, adCid, false, false)
}

func (ing *Ingester) markAdProcessed(publisher peer.ID, adCid cid.Cid, frozen, keep bool) error {
func (ing *Ingester) markAdProcessed(publisher peer.ID, adCid cid.Cid, frozen, mirrored bool) error {
cidStr := adCid.String()
ctx := context.Background()
if frozen {
@@ -591,7 +591,7 @@ func (ing *Ingester) markAdProcessed(publisher peer.ID, adCid cid.Cid, frozen, k
return err
}

if !keep || frozen {
if !mirrored || frozen {
// This ad is processed, so remove it from the datastore.
err = ing.dsTmp.Delete(ctx, datastore.NewKey(cidStr))
if err != nil {
@@ -1158,24 +1158,11 @@ func (ing *Ingester) ingestWorkerLogic(ctx context.Context, provider peer.ID, as
"adCid", ai.cid,
"progress", fmt.Sprintf("%d of %d", count, total))

keep := ing.mirror.canWrite()
if markErr := ing.markAdProcessed(assignment.publisher, ai.cid, frozen, keep); markErr != nil {
if markErr := ing.markAdProcessed(assignment.publisher, ai.cid, frozen, false); markErr != nil {
log.Errorw("Failed to mark ad as processed", "err", markErr)
}
if !frozen && keep {
overwrite := ai.resync && ing.overwriteMirrorOnResync
// Write the advertisement to a CAR file, but omit the entries.
carInfo, err := ing.mirror.write(ctx, ai.cid, true, overwrite)
if err != nil {
if !errors.Is(err, fs.ErrExist) {
// Log the error, but do not return. Continue on to save the procesed ad.
log.Errorw("Cannot write advertisement to CAR file", "err", err)
}
// else car file already exists
} else {
log.Infow("Wrote CAR for skipped advertisement", "path", carInfo.Path, "size", carInfo.Size)
}
}
// Do not write removed ads to mirror. They are not read during
// indexing, and those deleted in the future are removed by GC.

// Distribute the atProcessedEvent notices to waiting Sync calls.
ing.inEvents <- adProcessedEvent{
@@ -1192,56 +1179,58 @@ func (ing *Ingester) ingestWorkerLogic(ctx context.Context, provider peer.ID, as
"progress", fmt.Sprintf("%d of %d", count, total),
"lag", lag)

err := ing.ingestAd(ctx, assignment.publisher, ai.cid, ai.resync, frozen, lag, headProvider)
if err == nil {
// No error at all, this ad was processed successfully.
stats.Record(context.Background(), metrics.AdIngestSuccessCount.M(1))
}

var adIngestErr adIngestError
if errors.As(err, &adIngestErr) {
switch adIngestErr.state {
case adIngestDecodingErr, adIngestMalformedErr, adIngestEntryChunkErr, adIngestContentNotFound:
// These error cases are permanent. If retried later the same
// error will happen. So log and drop this error.
log.Errorw("Skipping ad because of a permanent error", "adCid", ai.cid, "err", err, "errKind", adIngestErr.state)
stats.Record(context.Background(), metrics.AdIngestSkippedCount.M(1))
err = nil
}
stats.RecordWithOptions(context.Background(),
stats.WithMeasurements(metrics.AdIngestErrorCount.M(1)),
stats.WithTags(tag.Insert(metrics.ErrKind, string(adIngestErr.state))))
} else if err != nil {
stats.RecordWithOptions(context.Background(),
stats.WithMeasurements(metrics.AdIngestErrorCount.M(1)),
stats.WithTags(tag.Insert(metrics.ErrKind, "other error")))
}

hasEnts, err := ing.ingestAd(ctx, assignment.publisher, ai.cid, ai.resync, frozen, lag, headProvider)
if err != nil {
errText := err.Error()
if errors.Is(err, errInternal) {
errText = errInternal.Error()
var adIngestErr adIngestError
if errors.As(err, &adIngestErr) {
switch adIngestErr.state {
case adIngestDecodingErr, adIngestMalformedErr, adIngestEntryChunkErr, adIngestContentNotFound:
// These error cases are permanent. If retried later the same
// error will happen. So log and drop this error.
log.Errorw("Skipping ad because of a permanent error", "adCid", ai.cid, "err", err, "errKind", adIngestErr.state)
stats.Record(context.Background(), metrics.AdIngestSkippedCount.M(1))
err = nil
}
stats.RecordWithOptions(context.Background(),
stats.WithMeasurements(metrics.AdIngestErrorCount.M(1)),
stats.WithTags(tag.Insert(metrics.ErrKind, string(adIngestErr.state))))
} else {
stats.RecordWithOptions(context.Background(),
stats.WithMeasurements(metrics.AdIngestErrorCount.M(1)),
stats.WithTags(tag.Insert(metrics.ErrKind, "other error")))
}
ing.reg.SetLastError(provider, fmt.Errorf("error while ingesting ad %s: %s", ai.cid, errText))
log.Errorw("Error while ingesting ad. Bailing early, not ingesting later ads.", "adCid", ai.cid, "err", err, "adsLeftToProcess", i+1)
// Tell anyone waiting that the sync finished for this head because
// of error. TODO(mm) would be better to propagate the error.
ing.inEvents <- adProcessedEvent{
publisher: assignment.publisher,
headAdCid: headAdCid,
adCid: ai.cid,
err: err,

// If err still not nil, then this is a non-permanent type of error.
if err != nil {
errText := err.Error()
if errors.Is(err, errInternal) {
errText = errInternal.Error()
}
ing.reg.SetLastError(provider, fmt.Errorf("error while ingesting ad %s: %s", ai.cid, errText))
log.Errorw("Error while ingesting ad. Bailing early, not ingesting later ads.", "adCid", ai.cid, "err", err, "adsLeftToProcess", i+1)
// Tell anyone waiting that the sync finished for this head because
// of error. TODO(mm) would be better to propagate the error.
ing.inEvents <- adProcessedEvent{
publisher: assignment.publisher,
headAdCid: headAdCid,
adCid: ai.cid,
err: err,
}
return
}
return
} else {
// No error at all, this ad was processed successfully.
stats.Record(context.Background(), metrics.AdIngestSuccessCount.M(1))
}

ing.reg.SetLastError(provider, nil)

keep := ing.mirror.canWrite()
if markErr := ing.markAdProcessed(assignment.publisher, ai.cid, frozen, keep); markErr != nil {
mirrored := hasEnts && ing.mirror.canWrite()
if markErr := ing.markAdProcessed(assignment.publisher, ai.cid, frozen, mirrored); markErr != nil {
log.Errorw("Failed to mark ad as processed", "err", markErr)
}

if !frozen && keep {
if !frozen && mirrored {
overwrite := ai.resync && ing.overwriteMirrorOnResync
carInfo, err := ing.mirror.write(ctx, ai.cid, false, overwrite)
if err != nil {
76 changes: 37 additions & 39 deletions internal/ingest/linksystem.go
@@ -138,7 +138,7 @@ func verifyAdvertisement(n ipld.Node, reg *registry.Registry) (peer.ID, error) {
// source of the indexed content, the provider is where content can be
// retrieved from. It is the provider ID that needs to be stored by the
// indexer.
func (ing *Ingester) ingestAd(ctx context.Context, publisherID peer.ID, adCid cid.Cid, resync, frozen bool, lag int, headProvider peer.AddrInfo) error {
func (ing *Ingester) ingestAd(ctx context.Context, publisherID peer.ID, adCid cid.Cid, resync, frozen bool, lag int, headProvider peer.AddrInfo) (bool, error) {
log := log.With("publisher", publisherID, "adCid", adCid)

ad, err := ing.loadAd(adCid)
@@ -147,7 +147,7 @@ func (ing *Ingester) ingestAd(ctx context.Context, publisherID peer.ID, adCid ci
log.Errorw("Failed to load advertisement, skipping", "err", err)
// The ad cannot be loaded, so we cannot process it. Return nil so that
// the ad is marked as processed and is removed from the datastore.
return nil
return false, nil
}

stats.Record(ctx, metrics.IngestChange.M(1))
@@ -164,17 +164,15 @@ func (ing *Ingester) ingestAd(ctx context.Context, publisherID peer.ID, adCid ci
stats.Record(ctx, metrics.AdIngestLatency.M(elapsedMsec))
log.Infow("Finished syncing advertisement", "elapsed", elapsed.String(), "multihashes", mhCount)

if mhCount == 0 {
return
}

// Record multihashes rate per provider.
elapsed = now.Sub(entsSyncStart)
ing.ingestRates.Update(string(headProvider.ID), uint64(mhCount), elapsed)
if mhCount != 0 {
// Record multihashes rate per provider.
elapsed = now.Sub(entsSyncStart)
ing.ingestRates.Update(string(headProvider.ID), uint64(mhCount), elapsed)

// Record how long entries sync took.
elapsedMsec = float64(elapsed.Nanoseconds()) / 1e6
stats.Record(ctx, metrics.EntriesSyncLatency.M(elapsedMsec))
// Record how long entries sync took.
elapsedMsec = float64(elapsed.Nanoseconds()) / 1e6
stats.Record(ctx, metrics.EntriesSyncLatency.M(elapsedMsec))
}
}()

// Since all advertisements in an assignment have the same provider,
@@ -205,11 +203,11 @@ func (ing *Ingester) ingestAd(ctx context.Context, publisherID peer.ID, adCid ci
var extendedProviders *registry.ExtendedProviders
if ad.ExtendedProvider != nil {
if ad.IsRm {
return adIngestError{adIngestIndexerErr, fmt.Errorf("rm ads can not have extended providers")}
return false, adIngestError{adIngestIndexerErr, fmt.Errorf("rm ads can not have extended providers")}
}

if len(ad.ContextID) == 0 && ad.ExtendedProvider.Override {
return adIngestError{adIngestIndexerErr, fmt.Errorf("override can not be set on extended provider without context id")}
return false, adIngestError{adIngestIndexerErr, fmt.Errorf("override can not be set on extended provider without context id")}
}

extendedProviders = &registry.ExtendedProviders{
@@ -222,7 +220,7 @@ func (ing *Ingester) ingestAd(ctx context.Context, publisherID peer.ID, adCid ci
for _, ep := range ad.ExtendedProvider.Providers {
epID, err := peer.Decode(ep.ID)
if err != nil {
return adIngestError{adIngestRegisterProviderErr, fmt.Errorf("could not decode extended provider id: %w", err)}
return false, adIngestError{adIngestRegisterProviderErr, fmt.Errorf("could not decode extended provider id: %w", err)}
}

eProvs = append(eProvs, registry.ExtendedProviderInfo{
@@ -259,7 +257,7 @@ func (ing *Ingester) ingestAd(ctx context.Context, publisherID peer.ID, adCid ci
// to the chain in the future may have a valid address than can be
// used, allowing all the previous ads without valid addresses to be
// processed.
return adIngestError{adIngestRegisterProviderErr, fmt.Errorf("could not update provider info: %w", err)}
return false, adIngestError{adIngestRegisterProviderErr, fmt.Errorf("could not update provider info: %w", err)}
}

log = log.With("contextID", base64.StdEncoding.EncodeToString(ad.ContextID))
@@ -268,18 +266,18 @@ func (ing *Ingester) ingestAd(ctx context.Context, publisherID peer.ID, adCid ci
log.Infow("Advertisement is for removal by context id")
err = ing.indexer.RemoveProviderContext(providerID, ad.ContextID)
if err != nil {
return adIngestError{adIngestIndexerErr, fmt.Errorf("%w: failed to remove provider context: %w", errInternal, err)}
return false, adIngestError{adIngestIndexerErr, fmt.Errorf("%w: failed to remove provider context: %w", errInternal, err)}
}
return nil
return false, nil
}

if len(ad.Metadata) == 0 {
// If the ad has no metadata and no entries, then the ad is only for
// updating provider addresses. Otherwise it is an error.
if ad.Entries != schema.NoEntries {
return adIngestError{adIngestMalformedErr, fmt.Errorf("advertisement missing metadata")}
return false, adIngestError{adIngestMalformedErr, fmt.Errorf("advertisement missing metadata")}
}
return nil
return false, nil
}

// If advertisement has no entries, then it is for updating metadata only.
@@ -298,14 +296,14 @@ func (ing *Ingester) ingestAd(ctx context.Context, publisherID peer.ID, adCid ci
}
err = ing.indexer.Put(value)
if err != nil {
return adIngestError{adIngestIndexerErr, fmt.Errorf("%w: failed to update metadata: %w", errInternal, err)}
return false, adIngestError{adIngestIndexerErr, fmt.Errorf("%w: failed to update metadata: %w", errInternal, err)}
}
return nil
return false, nil
}

entriesCid := ad.Entries.(cidlink.Link).Cid
if entriesCid == cid.Undef {
return adIngestError{adIngestMalformedErr, errors.New("advertisement entries link is undefined")}
return false, adIngestError{adIngestMalformedErr, errors.New("advertisement entries link is undefined")}
}

if ing.syncTimeout != 0 {
@@ -319,23 +317,24 @@ func (ing *Ingester) ingestAd(ctx context.Context, publisherID peer.ID, adCid ci
// If using a CAR reader, then try to get the advertisement CAR file first.
if ing.mirror.canRead() {
mhCount, err = ing.ingestEntriesFromCar(ctx, ad, providerID, adCid, entriesCid, log)
hasEnts := mhCount != 0
// If entries data successfully read from CAR file.
if err == nil {
ing.mhsFromMirror.Add(uint64(mhCount))
return nil
return hasEnts, nil
}
if !errors.Is(err, fs.ErrNotExist) {
var adIngestErr adIngestError
if errors.As(err, &adIngestErr) {
switch adIngestErr.state {
case adIngestIndexerErr:
// Could not store multihashes in core, so stop trying to index ad.
return err
return hasEnts, err
case adIngestContentNotFound:
// No entries data in CAR file. Entries data deleted later
// in chain unknown to this indexer, or publisher not
// serving entries data.
return err
return hasEnts, err
}
}
log.Errorw("Cannot get advertisement from car store", "err", err)
@@ -363,22 +362,22 @@ func (ing *Ingester) ingestAd(ctx context.Context, publisherID peer.ID, adCid ci
errors.Is(err, ipld.ErrNotExists{}),
strings.Contains(msg, "content not found"),
strings.Contains(msg, "graphsync request failed to complete: skip"):
return adIngestError{adIngestContentNotFound, wrappedErr}
return false, adIngestError{adIngestContentNotFound, wrappedErr}
}
return adIngestError{adIngestSyncEntriesErr, wrappedErr}
return false, adIngestError{adIngestSyncEntriesErr, wrappedErr}
}

node, err := ing.loadNode(entriesCid, basicnode.Prototype.Any)
if err != nil {
return adIngestError{adIngestIndexerErr, fmt.Errorf("failed to load first entry after sync: %w", err)}
return false, adIngestError{adIngestIndexerErr, fmt.Errorf("failed to load first entry after sync: %w", err)}
}

if isHAMT(node) {
mhCount, err = ing.ingestHamtFromPublisher(ctx, ad, publisherID, providerID, entriesCid, log)
} else {
mhCount, err = ing.ingestEntriesFromPublisher(ctx, ad, publisherID, providerID, entriesCid, log)
}
return err
return mhCount != 0, err
}

func (ing *Ingester) ingestHamtFromPublisher(ctx context.Context, ad schema.Advertisement, publisherID, providerID peer.ID, entsCid cid.Cid, log *zap.SugaredLogger) (int, error) {
@@ -393,16 +392,15 @@ func (ing *Ingester) ingestHamtFromPublisher(ctx context.Context, ad schema.Adve
gatherCids := func(_ peer.ID, c cid.Cid, _ dagsync.SegmentSyncActions) {
hamtCids = append(hamtCids, c)
}
if !ing.mirror.canWrite() {
defer func() {
for _, c := range hamtCids {
err := ing.dsTmp.Delete(ctx, datastore.NewKey(c.String()))
if err != nil {
log.Errorw("Error deleting HAMT cid from datastore", "cid", c, "err", err)
}
// HAMT entries do not get written to mirror, so delete them.
defer func() {
for _, c := range hamtCids {
err := ing.dsTmp.Delete(ctx, datastore.NewKey(c.String()))
if err != nil {
log.Errorw("Error deleting HAMT cid from datastore", "cid", c, "err", err)
}
}()
}
}
}()

// Load the CID as HAMT root node.
hn, err := ing.loadHamt(entsCid)
4 changes: 1 addition & 3 deletions ipni-gc/reaper/reaper.go
@@ -997,9 +997,7 @@ func (s *scythe) removeEntries(ctx context.Context, adCid cid.Cid) error {
}
}
newRemoved := s.stats.IndexesRemoved - prevRemoved
if newRemoved != 0 {
log.Infow("Removed indexes in removed ad", "adCid", adCid, "count", newRemoved, "total", s.stats.IndexesRemoved, "source", source)
}
log.Infow("Removed indexes in removed ad", "adCid", adCid, "count", newRemoved, "total", s.stats.IndexesRemoved, "source", source)
return nil
}
