Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Do not save empty advertisements to mirror #2471

Merged
merged 1 commit into from
Jan 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 48 additions & 59 deletions internal/ingest/ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -576,7 +576,7 @@ func (ing *Ingester) MarkAdProcessed(publisher peer.ID, adCid cid.Cid) error {
return ing.markAdProcessed(publisher, adCid, false, false)
}

func (ing *Ingester) markAdProcessed(publisher peer.ID, adCid cid.Cid, frozen, keep bool) error {
func (ing *Ingester) markAdProcessed(publisher peer.ID, adCid cid.Cid, frozen, mirrored bool) error {
cidStr := adCid.String()
ctx := context.Background()
if frozen {
Expand All @@ -591,7 +591,7 @@ func (ing *Ingester) markAdProcessed(publisher peer.ID, adCid cid.Cid, frozen, k
return err
}

if !keep || frozen {
if !mirrored || frozen {
// This ad is processed, so remove it from the datastore.
err = ing.dsTmp.Delete(ctx, datastore.NewKey(cidStr))
if err != nil {
Expand Down Expand Up @@ -1158,24 +1158,11 @@ func (ing *Ingester) ingestWorkerLogic(ctx context.Context, provider peer.ID, as
"adCid", ai.cid,
"progress", fmt.Sprintf("%d of %d", count, total))

keep := ing.mirror.canWrite()
if markErr := ing.markAdProcessed(assignment.publisher, ai.cid, frozen, keep); markErr != nil {
if markErr := ing.markAdProcessed(assignment.publisher, ai.cid, frozen, false); markErr != nil {
log.Errorw("Failed to mark ad as processed", "err", markErr)
}
if !frozen && keep {
overwrite := ai.resync && ing.overwriteMirrorOnResync
// Write the advertisement to a CAR file, but omit the entries.
carInfo, err := ing.mirror.write(ctx, ai.cid, true, overwrite)
if err != nil {
if !errors.Is(err, fs.ErrExist) {
// Log the error, but do not return. Continue on to save the processed ad.
log.Errorw("Cannot write advertisement to CAR file", "err", err)
}
// else car file already exists
} else {
log.Infow("Wrote CAR for skipped advertisement", "path", carInfo.Path, "size", carInfo.Size)
}
}
// Do not write removed ads to mirror. They are not read during
// indexing, and those deleted in the future are removed by GC.

// Distribute the adProcessedEvent notices to waiting Sync calls.
ing.inEvents <- adProcessedEvent{
Expand All @@ -1192,56 +1179,58 @@ func (ing *Ingester) ingestWorkerLogic(ctx context.Context, provider peer.ID, as
"progress", fmt.Sprintf("%d of %d", count, total),
"lag", lag)

err := ing.ingestAd(ctx, assignment.publisher, ai.cid, ai.resync, frozen, lag, headProvider)
if err == nil {
// No error at all, this ad was processed successfully.
stats.Record(context.Background(), metrics.AdIngestSuccessCount.M(1))
}

var adIngestErr adIngestError
if errors.As(err, &adIngestErr) {
switch adIngestErr.state {
case adIngestDecodingErr, adIngestMalformedErr, adIngestEntryChunkErr, adIngestContentNotFound:
// These error cases are permanent. If retried later the same
// error will happen. So log and drop this error.
log.Errorw("Skipping ad because of a permanent error", "adCid", ai.cid, "err", err, "errKind", adIngestErr.state)
stats.Record(context.Background(), metrics.AdIngestSkippedCount.M(1))
err = nil
}
stats.RecordWithOptions(context.Background(),
stats.WithMeasurements(metrics.AdIngestErrorCount.M(1)),
stats.WithTags(tag.Insert(metrics.ErrKind, string(adIngestErr.state))))
} else if err != nil {
stats.RecordWithOptions(context.Background(),
stats.WithMeasurements(metrics.AdIngestErrorCount.M(1)),
stats.WithTags(tag.Insert(metrics.ErrKind, "other error")))
}

hasEnts, err := ing.ingestAd(ctx, assignment.publisher, ai.cid, ai.resync, frozen, lag, headProvider)
if err != nil {
errText := err.Error()
if errors.Is(err, errInternal) {
errText = errInternal.Error()
var adIngestErr adIngestError
if errors.As(err, &adIngestErr) {
switch adIngestErr.state {
case adIngestDecodingErr, adIngestMalformedErr, adIngestEntryChunkErr, adIngestContentNotFound:
// These error cases are permanent. If retried later the same
// error will happen. So log and drop this error.
log.Errorw("Skipping ad because of a permanent error", "adCid", ai.cid, "err", err, "errKind", adIngestErr.state)
stats.Record(context.Background(), metrics.AdIngestSkippedCount.M(1))
err = nil
}
stats.RecordWithOptions(context.Background(),
stats.WithMeasurements(metrics.AdIngestErrorCount.M(1)),
stats.WithTags(tag.Insert(metrics.ErrKind, string(adIngestErr.state))))
} else {
stats.RecordWithOptions(context.Background(),
stats.WithMeasurements(metrics.AdIngestErrorCount.M(1)),
stats.WithTags(tag.Insert(metrics.ErrKind, "other error")))
}
ing.reg.SetLastError(provider, fmt.Errorf("error while ingesting ad %s: %s", ai.cid, errText))
log.Errorw("Error while ingesting ad. Bailing early, not ingesting later ads.", "adCid", ai.cid, "err", err, "adsLeftToProcess", i+1)
// Tell anyone waiting that the sync finished for this head because
// of error. TODO(mm) would be better to propagate the error.
ing.inEvents <- adProcessedEvent{
publisher: assignment.publisher,
headAdCid: headAdCid,
adCid: ai.cid,
err: err,

// If err still not nil, then this is a non-permanent type of error.
if err != nil {
errText := err.Error()
if errors.Is(err, errInternal) {
errText = errInternal.Error()
}
ing.reg.SetLastError(provider, fmt.Errorf("error while ingesting ad %s: %s", ai.cid, errText))
log.Errorw("Error while ingesting ad. Bailing early, not ingesting later ads.", "adCid", ai.cid, "err", err, "adsLeftToProcess", i+1)
// Tell anyone waiting that the sync finished for this head because
// of error. TODO(mm) would be better to propagate the error.
ing.inEvents <- adProcessedEvent{
publisher: assignment.publisher,
headAdCid: headAdCid,
adCid: ai.cid,
err: err,
}
return
}
return
} else {
// No error at all, this ad was processed successfully.
stats.Record(context.Background(), metrics.AdIngestSuccessCount.M(1))
}

ing.reg.SetLastError(provider, nil)

keep := ing.mirror.canWrite()
if markErr := ing.markAdProcessed(assignment.publisher, ai.cid, frozen, keep); markErr != nil {
mirrored := hasEnts && ing.mirror.canWrite()
if markErr := ing.markAdProcessed(assignment.publisher, ai.cid, frozen, mirrored); markErr != nil {
log.Errorw("Failed to mark ad as processed", "err", markErr)
}

if !frozen && keep {
if !frozen && mirrored {
overwrite := ai.resync && ing.overwriteMirrorOnResync
carInfo, err := ing.mirror.write(ctx, ai.cid, false, overwrite)
if err != nil {
Expand Down
76 changes: 37 additions & 39 deletions internal/ingest/linksystem.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func verifyAdvertisement(n ipld.Node, reg *registry.Registry) (peer.ID, error) {
// source of the indexed content, the provider is where content can be
// retrieved from. It is the provider ID that needs to be stored by the
// indexer.
func (ing *Ingester) ingestAd(ctx context.Context, publisherID peer.ID, adCid cid.Cid, resync, frozen bool, lag int, headProvider peer.AddrInfo) error {
func (ing *Ingester) ingestAd(ctx context.Context, publisherID peer.ID, adCid cid.Cid, resync, frozen bool, lag int, headProvider peer.AddrInfo) (bool, error) {
log := log.With("publisher", publisherID, "adCid", adCid)

ad, err := ing.loadAd(adCid)
Expand All @@ -147,7 +147,7 @@ func (ing *Ingester) ingestAd(ctx context.Context, publisherID peer.ID, adCid ci
log.Errorw("Failed to load advertisement, skipping", "err", err)
// The ad cannot be loaded, so we cannot process it. Return nil so that
// the ad is marked as processed and is removed from the datastore.
return nil
return false, nil
}

stats.Record(ctx, metrics.IngestChange.M(1))
Expand All @@ -164,17 +164,15 @@ func (ing *Ingester) ingestAd(ctx context.Context, publisherID peer.ID, adCid ci
stats.Record(ctx, metrics.AdIngestLatency.M(elapsedMsec))
log.Infow("Finished syncing advertisement", "elapsed", elapsed.String(), "multihashes", mhCount)

if mhCount == 0 {
return
}

// Record multihashes rate per provider.
elapsed = now.Sub(entsSyncStart)
ing.ingestRates.Update(string(headProvider.ID), uint64(mhCount), elapsed)
if mhCount != 0 {
// Record multihashes rate per provider.
elapsed = now.Sub(entsSyncStart)
ing.ingestRates.Update(string(headProvider.ID), uint64(mhCount), elapsed)

// Record how long entries sync took.
elapsedMsec = float64(elapsed.Nanoseconds()) / 1e6
stats.Record(ctx, metrics.EntriesSyncLatency.M(elapsedMsec))
// Record how long entries sync took.
elapsedMsec = float64(elapsed.Nanoseconds()) / 1e6
stats.Record(ctx, metrics.EntriesSyncLatency.M(elapsedMsec))
}
}()

// Since all advertisements in an assignment have the same provider,
Expand Down Expand Up @@ -205,11 +203,11 @@ func (ing *Ingester) ingestAd(ctx context.Context, publisherID peer.ID, adCid ci
var extendedProviders *registry.ExtendedProviders
if ad.ExtendedProvider != nil {
if ad.IsRm {
return adIngestError{adIngestIndexerErr, fmt.Errorf("rm ads can not have extended providers")}
return false, adIngestError{adIngestIndexerErr, fmt.Errorf("rm ads can not have extended providers")}
}

if len(ad.ContextID) == 0 && ad.ExtendedProvider.Override {
return adIngestError{adIngestIndexerErr, fmt.Errorf("override can not be set on extended provider without context id")}
return false, adIngestError{adIngestIndexerErr, fmt.Errorf("override can not be set on extended provider without context id")}
}

extendedProviders = &registry.ExtendedProviders{
Expand All @@ -222,7 +220,7 @@ func (ing *Ingester) ingestAd(ctx context.Context, publisherID peer.ID, adCid ci
for _, ep := range ad.ExtendedProvider.Providers {
epID, err := peer.Decode(ep.ID)
if err != nil {
return adIngestError{adIngestRegisterProviderErr, fmt.Errorf("could not decode extended provider id: %w", err)}
return false, adIngestError{adIngestRegisterProviderErr, fmt.Errorf("could not decode extended provider id: %w", err)}
}

eProvs = append(eProvs, registry.ExtendedProviderInfo{
Expand Down Expand Up @@ -259,7 +257,7 @@ func (ing *Ingester) ingestAd(ctx context.Context, publisherID peer.ID, adCid ci
// to the chain in the future may have a valid address that can be
// used, allowing all the previous ads without valid addresses to be
// processed.
return adIngestError{adIngestRegisterProviderErr, fmt.Errorf("could not update provider info: %w", err)}
return false, adIngestError{adIngestRegisterProviderErr, fmt.Errorf("could not update provider info: %w", err)}
}

log = log.With("contextID", base64.StdEncoding.EncodeToString(ad.ContextID))
Expand All @@ -268,18 +266,18 @@ func (ing *Ingester) ingestAd(ctx context.Context, publisherID peer.ID, adCid ci
log.Infow("Advertisement is for removal by context id")
err = ing.indexer.RemoveProviderContext(providerID, ad.ContextID)
if err != nil {
return adIngestError{adIngestIndexerErr, fmt.Errorf("%w: failed to remove provider context: %w", errInternal, err)}
return false, adIngestError{adIngestIndexerErr, fmt.Errorf("%w: failed to remove provider context: %w", errInternal, err)}
}
return nil
return false, nil
}

if len(ad.Metadata) == 0 {
// If the ad has no metadata and no entries, then the ad is only for
// updating provider addresses. Otherwise it is an error.
if ad.Entries != schema.NoEntries {
return adIngestError{adIngestMalformedErr, fmt.Errorf("advertisement missing metadata")}
return false, adIngestError{adIngestMalformedErr, fmt.Errorf("advertisement missing metadata")}
}
return nil
return false, nil
}

// If advertisement has no entries, then it is for updating metadata only.
Expand All @@ -298,14 +296,14 @@ func (ing *Ingester) ingestAd(ctx context.Context, publisherID peer.ID, adCid ci
}
err = ing.indexer.Put(value)
if err != nil {
return adIngestError{adIngestIndexerErr, fmt.Errorf("%w: failed to update metadata: %w", errInternal, err)}
return false, adIngestError{adIngestIndexerErr, fmt.Errorf("%w: failed to update metadata: %w", errInternal, err)}
}
return nil
return false, nil
}

entriesCid := ad.Entries.(cidlink.Link).Cid
if entriesCid == cid.Undef {
return adIngestError{adIngestMalformedErr, errors.New("advertisement entries link is undefined")}
return false, adIngestError{adIngestMalformedErr, errors.New("advertisement entries link is undefined")}
}

if ing.syncTimeout != 0 {
Expand All @@ -319,23 +317,24 @@ func (ing *Ingester) ingestAd(ctx context.Context, publisherID peer.ID, adCid ci
// If using a CAR reader, then try to get the advertisement CAR file first.
if ing.mirror.canRead() {
mhCount, err = ing.ingestEntriesFromCar(ctx, ad, providerID, adCid, entriesCid, log)
hasEnts := mhCount != 0
// If entries data successfully read from CAR file.
if err == nil {
ing.mhsFromMirror.Add(uint64(mhCount))
return nil
return hasEnts, nil
}
if !errors.Is(err, fs.ErrNotExist) {
var adIngestErr adIngestError
if errors.As(err, &adIngestErr) {
switch adIngestErr.state {
case adIngestIndexerErr:
// Could not store multihashes in core, so stop trying to index ad.
return err
return hasEnts, err
case adIngestContentNotFound:
// No entries data in CAR file. Entries data deleted later
// in chain unknown to this indexer, or publisher not
// serving entries data.
return err
return hasEnts, err
}
}
log.Errorw("Cannot get advertisement from car store", "err", err)
Expand Down Expand Up @@ -363,22 +362,22 @@ func (ing *Ingester) ingestAd(ctx context.Context, publisherID peer.ID, adCid ci
errors.Is(err, ipld.ErrNotExists{}),
strings.Contains(msg, "content not found"),
strings.Contains(msg, "graphsync request failed to complete: skip"):
return adIngestError{adIngestContentNotFound, wrappedErr}
return false, adIngestError{adIngestContentNotFound, wrappedErr}
}
return adIngestError{adIngestSyncEntriesErr, wrappedErr}
return false, adIngestError{adIngestSyncEntriesErr, wrappedErr}
}

node, err := ing.loadNode(entriesCid, basicnode.Prototype.Any)
if err != nil {
return adIngestError{adIngestIndexerErr, fmt.Errorf("failed to load first entry after sync: %w", err)}
return false, adIngestError{adIngestIndexerErr, fmt.Errorf("failed to load first entry after sync: %w", err)}
}

if isHAMT(node) {
mhCount, err = ing.ingestHamtFromPublisher(ctx, ad, publisherID, providerID, entriesCid, log)
} else {
mhCount, err = ing.ingestEntriesFromPublisher(ctx, ad, publisherID, providerID, entriesCid, log)
}
return err
return mhCount != 0, err
}

func (ing *Ingester) ingestHamtFromPublisher(ctx context.Context, ad schema.Advertisement, publisherID, providerID peer.ID, entsCid cid.Cid, log *zap.SugaredLogger) (int, error) {
Expand All @@ -393,16 +392,15 @@ func (ing *Ingester) ingestHamtFromPublisher(ctx context.Context, ad schema.Adve
gatherCids := func(_ peer.ID, c cid.Cid, _ dagsync.SegmentSyncActions) {
hamtCids = append(hamtCids, c)
}
if !ing.mirror.canWrite() {
defer func() {
for _, c := range hamtCids {
err := ing.dsTmp.Delete(ctx, datastore.NewKey(c.String()))
if err != nil {
log.Errorw("Error deleting HAMT cid from datastore", "cid", c, "err", err)
}
// HAMT entries do not get written to mirror, so delete them.
defer func() {
for _, c := range hamtCids {
err := ing.dsTmp.Delete(ctx, datastore.NewKey(c.String()))
if err != nil {
log.Errorw("Error deleting HAMT cid from datastore", "cid", c, "err", err)
}
}()
}
}
}()

// Load the CID as HAMT root node.
hn, err := ing.loadHamt(entsCid)
Expand Down
4 changes: 1 addition & 3 deletions ipni-gc/reaper/reaper.go
Original file line number Diff line number Diff line change
Expand Up @@ -997,9 +997,7 @@ func (s *scythe) removeEntries(ctx context.Context, adCid cid.Cid) error {
}
}
newRemoved := s.stats.IndexesRemoved - prevRemoved
if newRemoved != 0 {
log.Infow("Removed indexes in removed ad", "adCid", adCid, "count", newRemoved, "total", s.stats.IndexesRemoved, "source", source)
}
log.Infow("Removed indexes in removed ad", "adCid", adCid, "count", newRemoved, "total", s.stats.IndexesRemoved, "source", source)
return nil
}

Expand Down