From 87bebf03f1262f67c2b233b49966338dccf7d710 Mon Sep 17 00:00:00 2001
From: Andrew Gillis
Date: Wed, 10 Jan 2024 02:32:49 -0800
Subject: [PATCH] Do not save empty advertisements to mirror (#2471)

Empty advertisements should not be saved to the mirror since they do not
help index ingestion. Advertisements may have no multihash entries when:

1. The ad removes a context ID - When ingesting ads, the indexer does not
   read removal ads from the mirror since there is no entry data to get.
2. The ad only updates metadata or provider information - The indexer
   stores the metadata, but does not read the mirror since there are no
   entries.
3. The ad was deleted by a removal ad later in the chain - Once an ad is
   known to be deleted, that ad will never be read from the mirror since
   its content is deleted.
4. The publisher is not serving entries data - This is treated like a
   no-content ad, as in case 2, but means that at some point in the past
   the ad had entry data. There is, or should be, a pending unpublished
   removal later in the chain. So, do not save this ad in the mirror:
   either it will end up being deleted later, or the publisher is only
   temporarily not serving entries and should be queried by another
   indexer using the mirror. In either case the advertisement should not
   be written to the mirror.

GC also removes any empty (no-content) advertisements from the mirror, and
reindexing must not repopulate them.

Other changes:
- GC always logs the number of indexes removed
- Always delete HAMT entries, which are never written to the mirror
---
 internal/ingest/ingest.go     | 107 +++++++++++++++-------------------
 internal/ingest/linksystem.go |  76 ++++++++++++------------
 ipni-gc/reaper/reaper.go      |   4 +-
 3 files changed, 86 insertions(+), 101 deletions(-)

diff --git a/internal/ingest/ingest.go b/internal/ingest/ingest.go
index 4d8a64f42..fe231016d 100644
--- a/internal/ingest/ingest.go
+++ b/internal/ingest/ingest.go
@@ -576,7 +576,7 @@ func (ing *Ingester) MarkAdProcessed(publisher peer.ID, adCid cid.Cid) error {
 	return ing.markAdProcessed(publisher, adCid, false, false)
 }
 
-func (ing *Ingester) markAdProcessed(publisher peer.ID, adCid cid.Cid, frozen, keep bool) error {
+func (ing *Ingester) markAdProcessed(publisher peer.ID, adCid cid.Cid, frozen, mirrored bool) error {
 	cidStr := adCid.String()
 	ctx := context.Background()
 	if frozen {
@@ -591,7 +591,7 @@
 		return err
 	}
 
-	if !keep || frozen {
+	if !mirrored || frozen {
 		// This ad is processed, so remove it from the datastore.
 		err = ing.dsTmp.Delete(ctx, datastore.NewKey(cidStr))
 		if err != nil {
@@ -1158,24 +1158,11 @@ func (ing *Ingester) ingestWorkerLogic(ctx context.Context, provider peer.ID, as
 			"adCid", ai.cid,
 			"progress", fmt.Sprintf("%d of %d", count, total))
 
-		keep := ing.mirror.canWrite()
-		if markErr := ing.markAdProcessed(assignment.publisher, ai.cid, frozen, keep); markErr != nil {
+		if markErr := ing.markAdProcessed(assignment.publisher, ai.cid, frozen, false); markErr != nil {
 			log.Errorw("Failed to mark ad as processed", "err", markErr)
 		}
-		if !frozen && keep {
-			overwrite := ai.resync && ing.overwriteMirrorOnResync
-			// Write the advertisement to a CAR file, but omit the entries.
-			carInfo, err := ing.mirror.write(ctx, ai.cid, true, overwrite)
-			if err != nil {
-				if !errors.Is(err, fs.ErrExist) {
-					// Log the error, but do not return. Continue on to save the procesed ad.
-					log.Errorw("Cannot write advertisement to CAR file", "err", err)
-				}
-				// else car file already exists
-			} else {
-				log.Infow("Wrote CAR for skipped advertisement", "path", carInfo.Path, "size", carInfo.Size)
-			}
-		}
+		// Do not write removed ads to mirror. They are not read during
+		// indexing, and those deleted in the future are removed by GC.
 
 		// Distribute the atProcessedEvent notices to waiting Sync calls.
 		ing.inEvents <- adProcessedEvent{
@@ -1192,56 +1179,58 @@ func (ing *Ingester) ingestWorkerLogic(ctx context.Context, provider peer.ID, as
 			"progress", fmt.Sprintf("%d of %d", count, total),
 			"lag", lag)
 
-		err := ing.ingestAd(ctx, assignment.publisher, ai.cid, ai.resync, frozen, lag, headProvider)
-		if err == nil {
-			// No error at all, this ad was processed successfully.
-			stats.Record(context.Background(), metrics.AdIngestSuccessCount.M(1))
-		}
-
-		var adIngestErr adIngestError
-		if errors.As(err, &adIngestErr) {
-			switch adIngestErr.state {
-			case adIngestDecodingErr, adIngestMalformedErr, adIngestEntryChunkErr, adIngestContentNotFound:
-				// These error cases are permanent. If retried later the same
-				// error will happen. So log and drop this error.
-				log.Errorw("Skipping ad because of a permanent error", "adCid", ai.cid, "err", err, "errKind", adIngestErr.state)
-				stats.Record(context.Background(), metrics.AdIngestSkippedCount.M(1))
-				err = nil
-			}
-			stats.RecordWithOptions(context.Background(),
-				stats.WithMeasurements(metrics.AdIngestErrorCount.M(1)),
-				stats.WithTags(tag.Insert(metrics.ErrKind, string(adIngestErr.state))))
-		} else if err != nil {
-			stats.RecordWithOptions(context.Background(),
-				stats.WithMeasurements(metrics.AdIngestErrorCount.M(1)),
-				stats.WithTags(tag.Insert(metrics.ErrKind, "other error")))
-		}
-
+		hasEnts, err := ing.ingestAd(ctx, assignment.publisher, ai.cid, ai.resync, frozen, lag, headProvider)
 		if err != nil {
-			errText := err.Error()
-			if errors.Is(err, errInternal) {
-				errText = errInternal.Error()
+			var adIngestErr adIngestError
+			if errors.As(err, &adIngestErr) {
+				switch adIngestErr.state {
+				case adIngestDecodingErr, adIngestMalformedErr, adIngestEntryChunkErr, adIngestContentNotFound:
+					// These error cases are permanent. If retried later the same
+					// error will happen. So log and drop this error.
+					log.Errorw("Skipping ad because of a permanent error", "adCid", ai.cid, "err", err, "errKind", adIngestErr.state)
+					stats.Record(context.Background(), metrics.AdIngestSkippedCount.M(1))
+					err = nil
+				}
+				stats.RecordWithOptions(context.Background(),
+					stats.WithMeasurements(metrics.AdIngestErrorCount.M(1)),
+					stats.WithTags(tag.Insert(metrics.ErrKind, string(adIngestErr.state))))
+			} else {
+				stats.RecordWithOptions(context.Background(),
+					stats.WithMeasurements(metrics.AdIngestErrorCount.M(1)),
+					stats.WithTags(tag.Insert(metrics.ErrKind, "other error")))
 			}
-			ing.reg.SetLastError(provider, fmt.Errorf("error while ingesting ad %s: %s", ai.cid, errText))
-			log.Errorw("Error while ingesting ad. Bailing early, not ingesting later ads.", "adCid", ai.cid, "err", err, "adsLeftToProcess", i+1)
-			// Tell anyone waiting that the sync finished for this head because
-			// of error. TODO(mm) would be better to propagate the error.
-			ing.inEvents <- adProcessedEvent{
-				publisher: assignment.publisher,
-				headAdCid: headAdCid,
-				adCid:     ai.cid,
-				err:       err,
+
+			// If err is still not nil, then this is a non-permanent type of error.
+			if err != nil {
+				errText := err.Error()
+				if errors.Is(err, errInternal) {
+					errText = errInternal.Error()
+				}
+				ing.reg.SetLastError(provider, fmt.Errorf("error while ingesting ad %s: %s", ai.cid, errText))
+				log.Errorw("Error while ingesting ad. Bailing early, not ingesting later ads.", "adCid", ai.cid, "err", err, "adsLeftToProcess", i+1)
+				// Tell anyone waiting that the sync finished for this head because
+				// of error. TODO(mm) would be better to propagate the error.
+				ing.inEvents <- adProcessedEvent{
+					publisher: assignment.publisher,
+					headAdCid: headAdCid,
+					adCid:     ai.cid,
+					err:       err,
+				}
+				return
 			}
-			return
+		} else {
+			// No error at all, this ad was processed successfully.
+			stats.Record(context.Background(), metrics.AdIngestSuccessCount.M(1))
 		}
+
 		ing.reg.SetLastError(provider, nil)
 
-		keep := ing.mirror.canWrite()
-		if markErr := ing.markAdProcessed(assignment.publisher, ai.cid, frozen, keep); markErr != nil {
+		mirrored := hasEnts && ing.mirror.canWrite()
+		if markErr := ing.markAdProcessed(assignment.publisher, ai.cid, frozen, mirrored); markErr != nil {
 			log.Errorw("Failed to mark ad as processed", "err", markErr)
 		}
-		if !frozen && keep {
+		if !frozen && mirrored {
 			overwrite := ai.resync && ing.overwriteMirrorOnResync
 			carInfo, err := ing.mirror.write(ctx, ai.cid, false, overwrite)
 			if err != nil {
diff --git a/internal/ingest/linksystem.go b/internal/ingest/linksystem.go
index b02c34ee5..6fd71109a 100644
--- a/internal/ingest/linksystem.go
+++ b/internal/ingest/linksystem.go
@@ -138,7 +138,7 @@ func verifyAdvertisement(n ipld.Node, reg *registry.Registry) (peer.ID, error) {
 // source of the indexed content, the provider is where content can be
 // retrieved from. It is the provider ID that needs to be stored by the
 // indexer.
-func (ing *Ingester) ingestAd(ctx context.Context, publisherID peer.ID, adCid cid.Cid, resync, frozen bool, lag int, headProvider peer.AddrInfo) error {
+func (ing *Ingester) ingestAd(ctx context.Context, publisherID peer.ID, adCid cid.Cid, resync, frozen bool, lag int, headProvider peer.AddrInfo) (bool, error) {
 	log := log.With("publisher", publisherID, "adCid", adCid)
 
 	ad, err := ing.loadAd(adCid)
@@ -147,7 +147,7 @@ func (ing *Ingester) ingestAd(ctx context.Context, publisherID peer.ID, adCid ci
 		log.Errorw("Failed to load advertisement, skipping", "err", err)
 		// The ad cannot be loaded, so we cannot process it. Return nil so that
 		// the ad is marked as processed and is removed from the datastore.
-		return nil
+		return false, nil
 	}
 
 	stats.Record(ctx, metrics.IngestChange.M(1))
@@ -164,17 +164,15 @@ func (ing *Ingester) ingestAd(ctx context.Context, publisherID peer.ID, adCid ci
 		stats.Record(ctx, metrics.AdIngestLatency.M(elapsedMsec))
 		log.Infow("Finished syncing advertisement", "elapsed", elapsed.String(), "multihashes", mhCount)
 
-		if mhCount == 0 {
-			return
-		}
-
-		// Record multihashes rate per provider.
-		elapsed = now.Sub(entsSyncStart)
-		ing.ingestRates.Update(string(headProvider.ID), uint64(mhCount), elapsed)
+		if mhCount != 0 {
+			// Record multihashes rate per provider.
+			elapsed = now.Sub(entsSyncStart)
+			ing.ingestRates.Update(string(headProvider.ID), uint64(mhCount), elapsed)
 
-		// Record how long entries sync took.
-		elapsedMsec = float64(elapsed.Nanoseconds()) / 1e6
-		stats.Record(ctx, metrics.EntriesSyncLatency.M(elapsedMsec))
+			// Record how long entries sync took.
+			elapsedMsec = float64(elapsed.Nanoseconds()) / 1e6
+			stats.Record(ctx, metrics.EntriesSyncLatency.M(elapsedMsec))
+		}
 	}()
 
 	// Since all advertisements in an assignment have the same provider,
@@ -205,11 +203,11 @@ func (ing *Ingester) ingestAd(ctx context.Context, publisherID peer.ID, adCid ci
 	var extendedProviders *registry.ExtendedProviders
 	if ad.ExtendedProvider != nil {
 		if ad.IsRm {
-			return adIngestError{adIngestIndexerErr, fmt.Errorf("rm ads can not have extended providers")}
+			return false, adIngestError{adIngestIndexerErr, fmt.Errorf("rm ads can not have extended providers")}
 		}
 
 		if len(ad.ContextID) == 0 && ad.ExtendedProvider.Override {
-			return adIngestError{adIngestIndexerErr, fmt.Errorf("override can not be set on extended provider without context id")}
+			return false, adIngestError{adIngestIndexerErr, fmt.Errorf("override can not be set on extended provider without context id")}
 		}
 
 		extendedProviders = &registry.ExtendedProviders{
@@ -222,7 +220,7 @@ func (ing *Ingester) ingestAd(ctx context.Context, publisherID peer.ID, adCid ci
 		for _, ep := range ad.ExtendedProvider.Providers {
 			epID, err := peer.Decode(ep.ID)
 			if err != nil {
-				return adIngestError{adIngestRegisterProviderErr, fmt.Errorf("could not decode extended provider id: %w", err)}
+				return false, adIngestError{adIngestRegisterProviderErr, fmt.Errorf("could not decode extended provider id: %w", err)}
 			}
 
 			eProvs = append(eProvs, registry.ExtendedProviderInfo{
@@ -259,7 +257,7 @@ func (ing *Ingester) ingestAd(ctx context.Context, publisherID peer.ID, adCid ci
 		// to the chain in the future may have a valid address than can be
 		// used, allowing all the previous ads without valid addresses to be
 		// processed.
-		return adIngestError{adIngestRegisterProviderErr, fmt.Errorf("could not update provider info: %w", err)}
+		return false, adIngestError{adIngestRegisterProviderErr, fmt.Errorf("could not update provider info: %w", err)}
 	}
 
 	log = log.With("contextID", base64.StdEncoding.EncodeToString(ad.ContextID))
@@ -268,18 +266,18 @@ func (ing *Ingester) ingestAd(ctx context.Context, publisherID peer.ID, adCid ci
 		log.Infow("Advertisement is for removal by context id")
 		err = ing.indexer.RemoveProviderContext(providerID, ad.ContextID)
 		if err != nil {
-			return adIngestError{adIngestIndexerErr, fmt.Errorf("%w: failed to remove provider context: %w", errInternal, err)}
+			return false, adIngestError{adIngestIndexerErr, fmt.Errorf("%w: failed to remove provider context: %w", errInternal, err)}
 		}
-		return nil
+		return false, nil
 	}
 
 	if len(ad.Metadata) == 0 {
 		// If the ad has no metadata and no entries, then the ad is only for
 		// updating provider addresses. Otherwise it is an error.
 		if ad.Entries != schema.NoEntries {
-			return adIngestError{adIngestMalformedErr, fmt.Errorf("advertisement missing metadata")}
+			return false, adIngestError{adIngestMalformedErr, fmt.Errorf("advertisement missing metadata")}
 		}
-		return nil
+		return false, nil
 	}
 
 	// If advertisement has no entries, then it is for updating metadata only.
@@ -298,14 +296,14 @@ func (ing *Ingester) ingestAd(ctx context.Context, publisherID peer.ID, adCid ci
 		}
 		err = ing.indexer.Put(value)
 		if err != nil {
-			return adIngestError{adIngestIndexerErr, fmt.Errorf("%w: failed to update metadata: %w", errInternal, err)}
+			return false, adIngestError{adIngestIndexerErr, fmt.Errorf("%w: failed to update metadata: %w", errInternal, err)}
 		}
-		return nil
+		return false, nil
 	}
 
 	entriesCid := ad.Entries.(cidlink.Link).Cid
 	if entriesCid == cid.Undef {
-		return adIngestError{adIngestMalformedErr, errors.New("advertisement entries link is undefined")}
+		return false, adIngestError{adIngestMalformedErr, errors.New("advertisement entries link is undefined")}
 	}
 
 	if ing.syncTimeout != 0 {
@@ -319,10 +317,11 @@ func (ing *Ingester) ingestAd(ctx context.Context, publisherID peer.ID, adCid ci
 	// If using a CAR reader, then try to get the advertisement CAR file first.
 	if ing.mirror.canRead() {
 		mhCount, err = ing.ingestEntriesFromCar(ctx, ad, providerID, adCid, entriesCid, log)
+		hasEnts := mhCount != 0
 		// If entries data successfully read from CAR file.
 		if err == nil {
 			ing.mhsFromMirror.Add(uint64(mhCount))
-			return nil
+			return hasEnts, nil
 		}
 		if !errors.Is(err, fs.ErrNotExist) {
 			var adIngestErr adIngestError
@@ -330,12 +329,12 @@ func (ing *Ingester) ingestAd(ctx context.Context, publisherID peer.ID, adCid ci
 				switch adIngestErr.state {
 				case adIngestIndexerErr:
 					// Could not store multihashes in core, so stop trying to index ad.
-					return err
+					return hasEnts, err
 				case adIngestContentNotFound:
 					// No entries data in CAR file. Entries data deleted later
 					// in chain unknown to this indexer, or publisher not
 					// serving entries data.
-					return err
+					return hasEnts, err
 				}
 			}
 			log.Errorw("Cannot get advertisement from car store", "err", err)
@@ -363,14 +362,14 @@ func (ing *Ingester) ingestAd(ctx context.Context, publisherID peer.ID, adCid ci
 			errors.Is(err, ipld.ErrNotExists{}),
 			strings.Contains(msg, "content not found"),
 			strings.Contains(msg, "graphsync request failed to complete: skip"):
-			return adIngestError{adIngestContentNotFound, wrappedErr}
+			return false, adIngestError{adIngestContentNotFound, wrappedErr}
 		}
-		return adIngestError{adIngestSyncEntriesErr, wrappedErr}
+		return false, adIngestError{adIngestSyncEntriesErr, wrappedErr}
 	}
 
 	node, err := ing.loadNode(entriesCid, basicnode.Prototype.Any)
 	if err != nil {
-		return adIngestError{adIngestIndexerErr, fmt.Errorf("failed to load first entry after sync: %w", err)}
+		return false, adIngestError{adIngestIndexerErr, fmt.Errorf("failed to load first entry after sync: %w", err)}
 	}
 
 	if isHAMT(node) {
@@ -378,7 +377,7 @@ func (ing *Ingester) ingestAd(ctx context.Context, publisherID peer.ID, adCid ci
 	} else {
 		mhCount, err = ing.ingestEntriesFromPublisher(ctx, ad, publisherID, providerID, entriesCid, log)
 	}
-	return err
+	return mhCount != 0, err
 }
 
 func (ing *Ingester) ingestHamtFromPublisher(ctx context.Context, ad schema.Advertisement, publisherID, providerID peer.ID, entsCid cid.Cid, log *zap.SugaredLogger) (int, error) {
@@ -393,16 +392,15 @@ func (ing *Ingester) ingestHamtFromPublisher(ctx context.Context, ad schema.Adve
 	gatherCids := func(_ peer.ID, c cid.Cid, _ dagsync.SegmentSyncActions) {
 		hamtCids = append(hamtCids, c)
 	}
-	if !ing.mirror.canWrite() {
-		defer func() {
-			for _, c := range hamtCids {
-				err := ing.dsTmp.Delete(ctx, datastore.NewKey(c.String()))
-				if err != nil {
-					log.Errorw("Error deleting HAMT cid from datastore", "cid", c, "err", err)
-				}
+	// HAMT entries do not get written to mirror, so delete them.
+	defer func() {
+		for _, c := range hamtCids {
+			err := ing.dsTmp.Delete(ctx, datastore.NewKey(c.String()))
+			if err != nil {
+				log.Errorw("Error deleting HAMT cid from datastore", "cid", c, "err", err)
 			}
-		}()
-	}
+		}
+	}()
 
 	// Load the CID as HAMT root node.
 	hn, err := ing.loadHamt(entsCid)
diff --git a/ipni-gc/reaper/reaper.go b/ipni-gc/reaper/reaper.go
index d43f30e38..50406fc82 100644
--- a/ipni-gc/reaper/reaper.go
+++ b/ipni-gc/reaper/reaper.go
@@ -997,9 +997,7 @@ func (s *scythe) removeEntries(ctx context.Context, adCid cid.Cid) error {
 		}
 	}
 
 	newRemoved := s.stats.IndexesRemoved - prevRemoved
-	if newRemoved != 0 {
-		log.Infow("Removed indexes in removed ad", "adCid", adCid, "count", newRemoved, "total", s.stats.IndexesRemoved, "source", source)
-	}
+	log.Infow("Removed indexes in removed ad", "adCid", adCid, "count", newRemoved, "total", s.stats.IndexesRemoved, "source", source)
 	return nil
 }
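
Note for reviewers (illustrative only, not part of the patch): the mirror-write decision this change implements can be summarized by the sketch below. The helper name shouldMirror and its placement are hypothetical; the patch itself computes the same condition inline as mirrored := hasEnts && ing.mirror.canWrite(), guarded by !frozen, before calling ing.mirror.write.

package ingest

// shouldMirror is a hypothetical helper summarizing when an advertisement is
// written to the mirror after this change: only when ingestion found multihash
// entries, the indexer is not frozen, and the mirror is writable. Removal ads,
// metadata/address-only ads, and ads whose entries were deleted or are not
// being served all report hasEnts == false and are therefore never mirrored.
func shouldMirror(hasEnts, frozen, mirrorWritable bool) bool {
	return hasEnts && !frozen && mirrorWritable
}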