Skip to content

Commit

Permalink
Slasher min span remove lookback (#5591)
Browse files Browse the repository at this point in the history
* batch db write and read
* fix nil handling
* Merge branch 'master' into batch_min_max_span
* remove commented code
* Merge branch 'master' into batch_min_max_span
* raul feedback
* Merge branch 'batch_min_max_span' of github.com:prysmaticlabs/prysm into batch_min_max_span
* Merge branch 'master' into batch_min_max_span
  • Loading branch information
shayzluf authored Apr 23, 2020
1 parent a78035d commit 37b68ba
Show file tree
Hide file tree
Showing 7 changed files with 73 additions and 40 deletions.
2 changes: 1 addition & 1 deletion slasher/db/iface/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ type ReadOnlyDatabase interface {
LatestIndexedAttestationsTargetEpoch(ctx context.Context) (uint64, error)

// MinMaxSpan related methods.
EpochSpansMap(ctx context.Context, epoch uint64) (map[uint64]detectionTypes.Span, error)
EpochSpansMap(ctx context.Context, epoch uint64) (map[uint64]detectionTypes.Span, bool, error)
EpochSpanByValidatorIndex(ctx context.Context, validatorIdx uint64, epoch uint64) (detectionTypes.Span, error)
EpochsSpanByValidatorsIndices(ctx context.Context, validatorIndices []uint64, maxEpoch uint64) (map[uint64]map[uint64]detectionTypes.Span, error)

Expand Down
2 changes: 1 addition & 1 deletion slasher/db/kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func NewKVStore(dirPath string, cfg *Config) (*Store, error) {
return nil, err
}
kv := &Store{db: boltDB, databasePath: datafile}
kv.enableSpanCache(true)
kv.EnableSpanCache(true)
spanCache, err := cache.NewEpochSpansCache(cfg.SpanCacheSize, persistSpanMapsOnEviction(kv))
if err != nil {
return nil, errors.Wrap(err, "could not create new cache")
Expand Down
26 changes: 16 additions & 10 deletions slasher/db/kv/spanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,15 +100,17 @@ func marshalSpan(span types.Span) []byte {

// EpochSpansMap accepts epoch and returns the corresponding spans map epoch=>spans
// for slashing detection. This function reads spans from cache if caching is
// enabled and the epoch key exists. Returns nil if the span map
// enabled and the epoch key exists.
// Returns span maps, retrieved from cache bool,
// and error in case of db error. returns empty map if the span map
// for this validator index does not exist.
func (db *Store) EpochSpansMap(ctx context.Context, epoch uint64) (map[uint64]types.Span, error) {
func (db *Store) EpochSpansMap(ctx context.Context, epoch uint64) (map[uint64]types.Span, bool, error) {
ctx, span := trace.StartSpan(ctx, "slasherDB.EpochSpansMap")
defer span.End()
if db.spanCacheEnabled {
spanMap, ok := db.spanCache.Get(epoch)
if ok {
return spanMap, nil
return spanMap, true, nil
}
}

Expand All @@ -135,7 +137,7 @@ func (db *Store) EpochSpansMap(ctx context.Context, epoch uint64) (map[uint64]ty
if spanMap == nil {
spanMap = make(map[uint64]types.Span)
}
return spanMap, err
return spanMap, false, err
}

// EpochSpanByValidatorIndex accepts validator index and epoch returns the corresponding spans
Expand Down Expand Up @@ -199,6 +201,9 @@ func (db *Store) EpochsSpanByValidatorsIndices(ctx context.Context, validatorInd
valSpans := make(map[uint64]types.Span, len(validatorIndices))
for _, v := range validatorIndices {
enc := epochBucket.Get(bytesutil.Bytes8(v))
if enc == nil {
continue
}
value, err := unmarshalSpan(ctx, enc)
if err != nil {
return err
Expand Down Expand Up @@ -306,7 +311,8 @@ func (db *Store) SaveEpochSpansMap(ctx context.Context, epoch uint64, spanMap ma
})
}

func (db *Store) enableSpanCache(enable bool) {
// EnableSpanCache used to enable or disable span map cache in tests.
func (db *Store) EnableSpanCache(enable bool) {
db.spanCacheEnabled = enable
}

Expand All @@ -316,8 +322,8 @@ func (db *Store) SaveCachedSpansMaps(ctx context.Context) error {
ctx, span := trace.StartSpan(ctx, "slasherDB.SaveCachedSpansMaps")
defer span.End()
if db.spanCacheEnabled {
db.enableSpanCache(false)
defer db.enableSpanCache(true)
db.EnableSpanCache(false)
defer db.EnableSpanCache(true)
for epoch := lowestObservedEpoch; epoch <= highestObservedEpoch; epoch++ {
spanMap, ok := db.spanCache.Get(epoch)
if ok {
Expand Down Expand Up @@ -382,10 +388,10 @@ func (db *Store) findOrLoadEpochInCache(ctx context.Context, epoch uint64) (map[
return spanMap, nil
}

db.enableSpanCache(false)
defer db.enableSpanCache(true)
db.EnableSpanCache(false)
defer db.EnableSpanCache(true)
// If the epoch we want isn't in the cache, load it in.
spanForEpoch, err := db.EpochSpansMap(ctx, epoch)
spanForEpoch, _, err := db.EpochSpansMap(ctx, epoch)
if err != nil {
return make(map[uint64]types.Span), errors.Wrap(err, "failed to get span map for epoch")
}
Expand Down
22 changes: 11 additions & 11 deletions slasher/db/kv/spanner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func TestValidatorSpanMap_NilDB(t *testing.T) {
ctx := context.Background()

validatorIdx := uint64(1)
vsm, err := db.EpochSpansMap(ctx, validatorIdx)
vsm, _, err := db.EpochSpansMap(ctx, validatorIdx)
if err != nil {
t.Fatalf("Nil EpochSpansMap should not return error: %v", err)
}
Expand All @@ -76,7 +76,7 @@ func TestStore_SaveSpans(t *testing.T) {
if err != nil {
t.Fatalf("Save validator span map failed: %v", err)
}
sm, err := db.EpochSpansMap(ctx, tt.epoch)
sm, _, err := db.EpochSpansMap(ctx, tt.epoch)
if err != nil {
t.Fatalf("Failed to get validator span map: %v", err)
}
Expand Down Expand Up @@ -108,7 +108,7 @@ func TestStore_SaveCachedSpans(t *testing.T) {
}
// wait for value to pass through cache buffers
time.Sleep(time.Millisecond * 10)
sm, err := db.EpochSpansMap(ctx, tt.epoch)
sm, _, err := db.EpochSpansMap(ctx, tt.epoch)
if err != nil {
t.Fatalf("Failed to get validator span map: %v", err)
}
Expand Down Expand Up @@ -141,7 +141,7 @@ func TestStore_DeleteEpochSpans(t *testing.T) {
}

for _, tt := range spanTests {
sm, err := db.EpochSpansMap(ctx, tt.epoch)
sm, _, err := db.EpochSpansMap(ctx, tt.epoch)
if err != nil {
t.Fatalf("Failed to get validator span map: %v", err)
}
Expand All @@ -152,7 +152,7 @@ func TestStore_DeleteEpochSpans(t *testing.T) {
if err != nil {
t.Fatalf("Delete validator span map error: %v", err)
}
sm, err = db.EpochSpansMap(ctx, tt.epoch)
sm, _, err = db.EpochSpansMap(ctx, tt.epoch)
if err != nil {
t.Fatal(err)
}
Expand All @@ -178,7 +178,7 @@ func TestValidatorSpanMap_DeletesOnCacheSavesToDB(t *testing.T) {
// Wait for value to pass through cache buffers.
time.Sleep(time.Millisecond * 10)
for _, tt := range spanTests {
spanMap, err := db.EpochSpansMap(ctx, tt.epoch)
spanMap, _, err := db.EpochSpansMap(ctx, tt.epoch)
if err != nil {
t.Fatalf("Failed to get validator span map: %v", err)
}
Expand All @@ -190,13 +190,13 @@ func TestValidatorSpanMap_DeletesOnCacheSavesToDB(t *testing.T) {
t.Fatalf("Delete validator span map error: %v", err)
}
// Wait for value to pass through cache buffers.
db.enableSpanCache(false)
db.EnableSpanCache(false)
time.Sleep(time.Millisecond * 10)
spanMap, err = db.EpochSpansMap(ctx, tt.epoch)
spanMap, _, err = db.EpochSpansMap(ctx, tt.epoch)
if err != nil {
t.Fatal(err)
}
db.enableSpanCache(true)
db.EnableSpanCache(true)
if !reflect.DeepEqual(spanMap, tt.spanMap) {
t.Errorf("Expected validator span map to be deleted, received: %v", spanMap)
}
Expand Down Expand Up @@ -226,7 +226,7 @@ func TestValidatorSpanMap_SaveOnEvict(t *testing.T) {
// Wait for value to pass through cache buffers.
time.Sleep(time.Millisecond * 1000)
for i := uint64(0); i < 6; i++ {
sm, err := db.EpochSpansMap(ctx, i)
sm, _, err := db.EpochSpansMap(ctx, i)
if err != nil {
t.Fatalf("Failed to get validator span map: %v", err)
}
Expand Down Expand Up @@ -256,7 +256,7 @@ func TestValidatorSpanMap_SaveCachedSpansMaps(t *testing.T) {
}
db.spanCache.Clear()
for _, tt := range spanTests {
sm, err := db.EpochSpansMap(ctx, tt.epoch)
sm, _, err := db.EpochSpansMap(ctx, tt.epoch)
if err != nil {
t.Fatalf("Failed to get validator span map: %v", err)
}
Expand Down
1 change: 1 addition & 0 deletions slasher/db/testing/setup_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ func SetupSlasherDB(t testing.TB, spanCacheEnabled bool) *kv.Store {
}
cfg := &kv.Config{}
db, err := slasherDB.NewDB(p, cfg)
db.EnableSpanCache(spanCacheEnabled)
if err != nil {
t.Fatalf("Failed to instantiate DB: %v", err)
}
Expand Down
58 changes: 42 additions & 16 deletions slasher/detection/attestations/spanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,11 @@ func (s *SpanDetector) DetectSlashingsForAttestation(
)
}

spanMap, err := s.slasherDB.EpochSpansMap(ctx, sourceEpoch)
spanMap, _, err := s.slasherDB.EpochSpansMap(ctx, sourceEpoch)
if err != nil {
return nil, err
}
targetSpanMap, err := s.slasherDB.EpochSpansMap(ctx, targetEpoch)
targetSpanMap, _, err := s.slasherDB.EpochSpansMap(ctx, targetEpoch)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -152,7 +152,7 @@ func (s *SpanDetector) saveSigBytes(ctx context.Context, att *ethpb.IndexedAttes
ctx, traceSpan := trace.StartSpan(ctx, "spanner.saveSigBytes")
defer traceSpan.End()
target := att.Data.Target.Epoch
spanMap, err := s.slasherDB.EpochSpansMap(ctx, target)
spanMap, _, err := s.slasherDB.EpochSpansMap(ctx, target)
if err != nil {
return err
}
Expand Down Expand Up @@ -193,17 +193,37 @@ func (s *SpanDetector) updateMinSpan(ctx context.Context, att *ethpb.IndexedAtte
}
valIndices := make([]uint64, len(att.AttestingIndices))
copy(valIndices, att.AttestingIndices)
lowestEpoch := source - epochLookback
if int(lowestEpoch) <= 0 {
lowestEpoch = 0
}
latestMinSpanDistanceObserved.Set(float64(att.Data.Target.Epoch - att.Data.Source.Epoch))

for epoch := source - 1; epoch >= lowestEpoch; epoch-- {
spanMap, err := s.slasherDB.EpochSpansMap(ctx, epoch)
// the for loop tries to update min span using cache for as long as there
// is a relevant cached epoch. when there is no such epoch in cache batch
// db read and write is used.
spanMap := make(map[uint64]types.Span)
epochsSpansMap := make(map[uint64]map[uint64]types.Span)
epoch := source - 1
useCache := true
useDb := false
var err error
for ; epoch >= 0; epoch-- {
if useCache {
spanMap, useCache, err = s.slasherDB.EpochSpansMap(ctx, epoch)
}
// Should happen once when cache is exhausted.
if !useCache && !useDb {
epochsSpansMap, err = s.slasherDB.EpochsSpanByValidatorsIndices(ctx, valIndices, epoch)
useDb = true
}
if err != nil {
return err
}
if useDb {
spanMap = epochsSpansMap[epoch]
if spanMap == nil {
spanMap = make(map[uint64]types.Span)
epochsSpansMap[epoch] = spanMap
}
}

indices := valIndices[:0]
for _, idx := range valIndices {
span := spanMap[idx]
Expand All @@ -219,13 +239,19 @@ func (s *SpanDetector) updateMinSpan(ctx context.Context, att *ethpb.IndexedAtte
indices = append(indices, idx)
}
}
if err := s.slasherDB.SaveEpochSpansMap(ctx, epoch, spanMap); err != nil {
return err
}
if len(indices) == 0 {
break
copy(valIndices, indices)
if useCache {
if err := s.slasherDB.SaveEpochSpansMap(ctx, epoch, spanMap); err != nil {
return err
}
}
if epoch == 0 {
if len(indices) == 0 || epoch == 0 {
if useDb {
// should happen once when finishing update to all epochs and all indices.
if err := s.slasherDB.SaveEpochsSpanByValidatorsIndices(ctx, epochsSpansMap); err != nil {
return err
}
}
break
}
}
Expand All @@ -243,7 +269,7 @@ func (s *SpanDetector) updateMaxSpan(ctx context.Context, att *ethpb.IndexedAtte
valIndices := make([]uint64, len(att.AttestingIndices))
copy(valIndices, att.AttestingIndices)
for epoch := source + 1; epoch < target; epoch++ {
spanMap, err := s.slasherDB.EpochSpansMap(ctx, epoch)
spanMap, _, err := s.slasherDB.EpochSpansMap(ctx, epoch)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion slasher/detection/attestations/spanner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -825,7 +825,7 @@ func TestNewSpanDetector_UpdateSpans(t *testing.T) {
t.Fatal(err)
}
for epoch := range tt.want {
sm, err := sd.slasherDB.EpochSpansMap(ctx, uint64(epoch))
sm, _, err := sd.slasherDB.EpochSpansMap(ctx, uint64(epoch))
if err != nil {
t.Fatalf("Failed to read from slasherDB: %v", err)
}
Expand Down

0 comments on commit 37b68ba

Please sign in to comment.