diff --git a/slasher/db/iface/interface.go b/slasher/db/iface/interface.go index f25c9efde5d6..3c03919bbe47 100644 --- a/slasher/db/iface/interface.go +++ b/slasher/db/iface/interface.go @@ -28,7 +28,7 @@ type ReadOnlyDatabase interface { LatestIndexedAttestationsTargetEpoch(ctx context.Context) (uint64, error) // MinMaxSpan related methods. - EpochSpansMap(ctx context.Context, epoch uint64) (map[uint64]detectionTypes.Span, error) + EpochSpansMap(ctx context.Context, epoch uint64) (map[uint64]detectionTypes.Span, bool, error) EpochSpanByValidatorIndex(ctx context.Context, validatorIdx uint64, epoch uint64) (detectionTypes.Span, error) EpochsSpanByValidatorsIndices(ctx context.Context, validatorIndices []uint64, maxEpoch uint64) (map[uint64]map[uint64]detectionTypes.Span, error) diff --git a/slasher/db/kv/kv.go b/slasher/db/kv/kv.go index ebbee08995b0..2a5647e96a9d 100644 --- a/slasher/db/kv/kv.go +++ b/slasher/db/kv/kv.go @@ -85,7 +85,7 @@ func NewKVStore(dirPath string, cfg *Config) (*Store, error) { return nil, err } kv := &Store{db: boltDB, databasePath: datafile} - kv.enableSpanCache(true) + kv.EnableSpanCache(true) spanCache, err := cache.NewEpochSpansCache(cfg.SpanCacheSize, persistSpanMapsOnEviction(kv)) if err != nil { return nil, errors.Wrap(err, "could not create new cache") diff --git a/slasher/db/kv/spanner.go b/slasher/db/kv/spanner.go index afda47a47a83..991679c19a2a 100644 --- a/slasher/db/kv/spanner.go +++ b/slasher/db/kv/spanner.go @@ -100,15 +100,17 @@ func marshalSpan(span types.Span) []byte { // EpochSpansMap accepts epoch and returns the corresponding spans map epoch=>spans // for slashing detection. This function reads spans from cache if caching is -// enabled and the epoch key exists. Returns nil if the span map +// enabled and the epoch key exists. +// Returns span maps, retrieved from cache bool, +// and error in case of db error. returns empty map if the span map // for this validator index does not exist. -func (db *Store) EpochSpansMap(ctx context.Context, epoch uint64) (map[uint64]types.Span, error) { +func (db *Store) EpochSpansMap(ctx context.Context, epoch uint64) (map[uint64]types.Span, bool, error) { ctx, span := trace.StartSpan(ctx, "slasherDB.EpochSpansMap") defer span.End() if db.spanCacheEnabled { spanMap, ok := db.spanCache.Get(epoch) if ok { - return spanMap, nil + return spanMap, true, nil } } @@ -135,7 +137,7 @@ func (db *Store) EpochSpansMap(ctx context.Context, epoch uint64) (map[uint64]ty if spanMap == nil { spanMap = make(map[uint64]types.Span) } - return spanMap, err + return spanMap, false, err } // EpochSpanByValidatorIndex accepts validator index and epoch returns the corresponding spans @@ -199,6 +201,9 @@ func (db *Store) EpochsSpanByValidatorsIndices(ctx context.Context, validatorInd valSpans := make(map[uint64]types.Span, len(validatorIndices)) for _, v := range validatorIndices { enc := epochBucket.Get(bytesutil.Bytes8(v)) + if enc == nil { + continue + } value, err := unmarshalSpan(ctx, enc) if err != nil { return err @@ -306,7 +311,8 @@ func (db *Store) SaveEpochSpansMap(ctx context.Context, epoch uint64, spanMap ma }) } -func (db *Store) enableSpanCache(enable bool) { +// EnableSpanCache used to enable or disable span map cache in tests. +func (db *Store) EnableSpanCache(enable bool) { db.spanCacheEnabled = enable } @@ -316,8 +322,8 @@ func (db *Store) SaveCachedSpansMaps(ctx context.Context) error { ctx, span := trace.StartSpan(ctx, "slasherDB.SaveCachedSpansMaps") defer span.End() if db.spanCacheEnabled { - db.enableSpanCache(false) - defer db.enableSpanCache(true) + db.EnableSpanCache(false) + defer db.EnableSpanCache(true) for epoch := lowestObservedEpoch; epoch <= highestObservedEpoch; epoch++ { spanMap, ok := db.spanCache.Get(epoch) if ok { @@ -382,10 +388,10 @@ func (db *Store) findOrLoadEpochInCache(ctx context.Context, epoch uint64) (map[ return spanMap, nil } - db.enableSpanCache(false) - defer db.enableSpanCache(true) + db.EnableSpanCache(false) + defer db.EnableSpanCache(true) // If the epoch we want isn't in the cache, load it in. - spanForEpoch, err := db.EpochSpansMap(ctx, epoch) + spanForEpoch, _, err := db.EpochSpansMap(ctx, epoch) if err != nil { return make(map[uint64]types.Span), errors.Wrap(err, "failed to get span map for epoch") } diff --git a/slasher/db/kv/spanner_test.go b/slasher/db/kv/spanner_test.go index 3858a106084d..95dcc8d7ef26 100644 --- a/slasher/db/kv/spanner_test.go +++ b/slasher/db/kv/spanner_test.go @@ -55,7 +55,7 @@ func TestValidatorSpanMap_NilDB(t *testing.T) { ctx := context.Background() validatorIdx := uint64(1) - vsm, err := db.EpochSpansMap(ctx, validatorIdx) + vsm, _, err := db.EpochSpansMap(ctx, validatorIdx) if err != nil { t.Fatalf("Nil EpochSpansMap should not return error: %v", err) } @@ -76,7 +76,7 @@ func TestStore_SaveSpans(t *testing.T) { if err != nil { t.Fatalf("Save validator span map failed: %v", err) } - sm, err := db.EpochSpansMap(ctx, tt.epoch) + sm, _, err := db.EpochSpansMap(ctx, tt.epoch) if err != nil { t.Fatalf("Failed to get validator span map: %v", err) } @@ -108,7 +108,7 @@ func TestStore_SaveCachedSpans(t *testing.T) { } // wait for value to pass through cache buffers time.Sleep(time.Millisecond * 10) - sm, err := db.EpochSpansMap(ctx, tt.epoch) + sm, _, err := db.EpochSpansMap(ctx, tt.epoch) if err != nil { t.Fatalf("Failed to get validator span map: %v", err) } @@ -141,7 +141,7 @@ func TestStore_DeleteEpochSpans(t *testing.T) { } for _, tt := range spanTests { - sm, err := db.EpochSpansMap(ctx, tt.epoch) + sm, _, err := db.EpochSpansMap(ctx, tt.epoch) if err != nil { t.Fatalf("Failed to get validator span map: %v", err) } @@ -152,7 +152,7 @@ func TestStore_DeleteEpochSpans(t *testing.T) { if err != nil { t.Fatalf("Delete validator span map error: %v", err) } - sm, err = db.EpochSpansMap(ctx, tt.epoch) + sm, _, err = db.EpochSpansMap(ctx, tt.epoch) if err != nil { t.Fatal(err) } @@ -178,7 +178,7 @@ func TestValidatorSpanMap_DeletesOnCacheSavesToDB(t *testing.T) { // Wait for value to pass through cache buffers. time.Sleep(time.Millisecond * 10) for _, tt := range spanTests { - spanMap, err := db.EpochSpansMap(ctx, tt.epoch) + spanMap, _, err := db.EpochSpansMap(ctx, tt.epoch) if err != nil { t.Fatalf("Failed to get validator span map: %v", err) } @@ -190,13 +190,13 @@ func TestValidatorSpanMap_DeletesOnCacheSavesToDB(t *testing.T) { t.Fatalf("Delete validator span map error: %v", err) } // Wait for value to pass through cache buffers. - db.enableSpanCache(false) + db.EnableSpanCache(false) time.Sleep(time.Millisecond * 10) - spanMap, err = db.EpochSpansMap(ctx, tt.epoch) + spanMap, _, err = db.EpochSpansMap(ctx, tt.epoch) if err != nil { t.Fatal(err) } - db.enableSpanCache(true) + db.EnableSpanCache(true) if !reflect.DeepEqual(spanMap, tt.spanMap) { t.Errorf("Expected validator span map to be deleted, received: %v", spanMap) } @@ -226,7 +226,7 @@ func TestValidatorSpanMap_SaveOnEvict(t *testing.T) { // Wait for value to pass through cache buffers. time.Sleep(time.Millisecond * 1000) for i := uint64(0); i < 6; i++ { - sm, err := db.EpochSpansMap(ctx, i) + sm, _, err := db.EpochSpansMap(ctx, i) if err != nil { t.Fatalf("Failed to get validator span map: %v", err) } @@ -256,7 +256,7 @@ func TestValidatorSpanMap_SaveCachedSpansMaps(t *testing.T) { } db.spanCache.Clear() for _, tt := range spanTests { - sm, err := db.EpochSpansMap(ctx, tt.epoch) + sm, _, err := db.EpochSpansMap(ctx, tt.epoch) if err != nil { t.Fatalf("Failed to get validator span map: %v", err) } diff --git a/slasher/db/testing/setup_db.go b/slasher/db/testing/setup_db.go index 4136e37284c7..121a6d6ea897 100644 --- a/slasher/db/testing/setup_db.go +++ b/slasher/db/testing/setup_db.go @@ -25,6 +25,7 @@ func SetupSlasherDB(t testing.TB, spanCacheEnabled bool) *kv.Store { } cfg := &kv.Config{} db, err := slasherDB.NewDB(p, cfg) + db.EnableSpanCache(spanCacheEnabled) if err != nil { t.Fatalf("Failed to instantiate DB: %v", err) } diff --git a/slasher/detection/attestations/spanner.go b/slasher/detection/attestations/spanner.go index 40025fccdebc..9d2833c1a513 100644 --- a/slasher/detection/attestations/spanner.go +++ b/slasher/detection/attestations/spanner.go @@ -67,11 +67,11 @@ func (s *SpanDetector) DetectSlashingsForAttestation( ) } - spanMap, err := s.slasherDB.EpochSpansMap(ctx, sourceEpoch) + spanMap, _, err := s.slasherDB.EpochSpansMap(ctx, sourceEpoch) if err != nil { return nil, err } - targetSpanMap, err := s.slasherDB.EpochSpansMap(ctx, targetEpoch) + targetSpanMap, _, err := s.slasherDB.EpochSpansMap(ctx, targetEpoch) if err != nil { return nil, err } @@ -152,7 +152,7 @@ func (s *SpanDetector) saveSigBytes(ctx context.Context, att *ethpb.IndexedAttes ctx, traceSpan := trace.StartSpan(ctx, "spanner.saveSigBytes") defer traceSpan.End() target := att.Data.Target.Epoch - spanMap, err := s.slasherDB.EpochSpansMap(ctx, target) + spanMap, _, err := s.slasherDB.EpochSpansMap(ctx, target) if err != nil { return err } @@ -193,17 +193,37 @@ func (s *SpanDetector) updateMinSpan(ctx context.Context, att *ethpb.IndexedAtte } valIndices := make([]uint64, len(att.AttestingIndices)) copy(valIndices, att.AttestingIndices) - lowestEpoch := source - epochLookback - if int(lowestEpoch) <= 0 { - lowestEpoch = 0 - } latestMinSpanDistanceObserved.Set(float64(att.Data.Target.Epoch - att.Data.Source.Epoch)) - for epoch := source - 1; epoch >= lowestEpoch; epoch-- { - spanMap, err := s.slasherDB.EpochSpansMap(ctx, epoch) + // the for loop tries to update min span using cache for as long as there + // is a relevant cached epoch. when there is no such epoch in cache batch + // db read and write is used. + spanMap := make(map[uint64]types.Span) + epochsSpansMap := make(map[uint64]map[uint64]types.Span) + epoch := source - 1 + useCache := true + useDb := false + var err error + for ; epoch >= 0; epoch-- { + if useCache { + spanMap, useCache, err = s.slasherDB.EpochSpansMap(ctx, epoch) + } + // Should happen once when cache is exhausted. + if !useCache && !useDb { + epochsSpansMap, err = s.slasherDB.EpochsSpanByValidatorsIndices(ctx, valIndices, epoch) + useDb = true + } if err != nil { return err } + if useDb { + spanMap = epochsSpansMap[epoch] + if spanMap == nil { + spanMap = make(map[uint64]types.Span) + epochsSpansMap[epoch] = spanMap + } + } + indices := valIndices[:0] for _, idx := range valIndices { span := spanMap[idx] @@ -219,13 +239,19 @@ func (s *SpanDetector) updateMinSpan(ctx context.Context, att *ethpb.IndexedAtte indices = append(indices, idx) } } - if err := s.slasherDB.SaveEpochSpansMap(ctx, epoch, spanMap); err != nil { - return err - } - if len(indices) == 0 { - break + copy(valIndices, indices) + if useCache { + if err := s.slasherDB.SaveEpochSpansMap(ctx, epoch, spanMap); err != nil { + return err + } } - if epoch == 0 { + if len(indices) == 0 || epoch == 0 { + if useDb { + // should happen once when finishing update to all epochs and all indices. + if err := s.slasherDB.SaveEpochsSpanByValidatorsIndices(ctx, epochsSpansMap); err != nil { + return err + } + } break } } @@ -243,7 +269,7 @@ func (s *SpanDetector) updateMaxSpan(ctx context.Context, att *ethpb.IndexedAtte valIndices := make([]uint64, len(att.AttestingIndices)) copy(valIndices, att.AttestingIndices) for epoch := source + 1; epoch < target; epoch++ { - spanMap, err := s.slasherDB.EpochSpansMap(ctx, epoch) + spanMap, _, err := s.slasherDB.EpochSpansMap(ctx, epoch) if err != nil { return err } diff --git a/slasher/detection/attestations/spanner_test.go b/slasher/detection/attestations/spanner_test.go index 2202b7d58d16..11f5b8f8ce8d 100644 --- a/slasher/detection/attestations/spanner_test.go +++ b/slasher/detection/attestations/spanner_test.go @@ -825,7 +825,7 @@ func TestNewSpanDetector_UpdateSpans(t *testing.T) { t.Fatal(err) } for epoch := range tt.want { - sm, err := sd.slasherDB.EpochSpansMap(ctx, uint64(epoch)) + sm, _, err := sd.slasherDB.EpochSpansMap(ctx, uint64(epoch)) if err != nil { t.Fatalf("Failed to read from slasherDB: %v", err) }