diff --git a/beacon/light/canonical.go b/beacon/light/canonical.go deleted file mode 100644 index 50c70e1ffae4..000000000000 --- a/beacon/light/canonical.go +++ /dev/null @@ -1,140 +0,0 @@ -// Copyright 2023 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see . - -package light - -import ( - "encoding/binary" - "fmt" - - "github.com/ethereum/go-ethereum/common/lru" - "github.com/ethereum/go-ethereum/ethdb" - "github.com/ethereum/go-ethereum/log" -) - -// canonicalStore stores instances of the given type in a database and caches -// them in memory, associated with a continuous range of period numbers. -// Note: canonicalStore is not thread safe and it is the caller's responsibility -// to avoid concurrent access. -type canonicalStore[T any] struct { - db ethdb.KeyValueStore - keyPrefix []byte - periods Range - cache *lru.Cache[uint64, T] - encode func(T) ([]byte, error) - decode func([]byte) (T, error) -} - -// newCanonicalStore creates a new canonicalStore and loads all keys associated -// with the keyPrefix in order to determine the ranges available in the database. -func newCanonicalStore[T any](db ethdb.KeyValueStore, keyPrefix []byte, - encode func(T) ([]byte, error), decode func([]byte) (T, error)) *canonicalStore[T] { - cs := &canonicalStore[T]{ - db: db, - keyPrefix: keyPrefix, - encode: encode, - decode: decode, - cache: lru.NewCache[uint64, T](100), - } - var ( - iter = db.NewIterator(keyPrefix, nil) - kl = len(keyPrefix) - ) - for iter.Next() { - if len(iter.Key()) != kl+8 { - log.Warn("Invalid key length in the canonical chain database", "key", fmt.Sprintf("%#x", iter.Key())) - continue - } - period := binary.BigEndian.Uint64(iter.Key()[kl : kl+8]) - if cs.periods.Start == 0 { - cs.periods.Start = period - } else if cs.periods.End != period { - log.Warn("Gap in the canonical chain database") - break // continuity guaranteed - } - cs.periods.End = period + 1 - } - iter.Release() - return cs -} - -// databaseKey returns the database key belonging to the given period. -func (cs *canonicalStore[T]) databaseKey(period uint64) []byte { - var ( - kl = len(cs.keyPrefix) - key = make([]byte, kl+8) - ) - copy(key[:kl], cs.keyPrefix) - binary.BigEndian.PutUint64(key[kl:], period) - return key -} - -// add adds the given item to the database. It also ensures that the range remains -// continuous. Can be used either with a batch or database backend. -func (cs *canonicalStore[T]) add(backend ethdb.KeyValueWriter, period uint64, value T) error { - if !cs.periods.CanExpand(period) { - return fmt.Errorf("period expansion is not allowed, first: %d, next: %d, period: %d", cs.periods.Start, cs.periods.End, period) - } - enc, err := cs.encode(value) - if err != nil { - return err - } - if err := backend.Put(cs.databaseKey(period), enc); err != nil { - return err - } - cs.cache.Add(period, value) - cs.periods.Expand(period) - return nil -} - -// deleteFrom removes items starting from the given period. -func (cs *canonicalStore[T]) deleteFrom(batch ethdb.Batch, fromPeriod uint64) (deleted Range) { - if fromPeriod >= cs.periods.End { - return - } - if fromPeriod < cs.periods.Start { - fromPeriod = cs.periods.Start - } - deleted = Range{Start: fromPeriod, End: cs.periods.End} - for period := fromPeriod; period < cs.periods.End; period++ { - batch.Delete(cs.databaseKey(period)) - cs.cache.Remove(period) - } - if fromPeriod > cs.periods.Start { - cs.periods.End = fromPeriod - } else { - cs.periods = Range{} - } - return -} - -// get returns the item at the given period or the null value of the given type -// if no item is present. -// Note: get is thread safe in itself and therefore can be called either with -// locked or unlocked chain mutex. -func (cs *canonicalStore[T]) get(period uint64) (value T, ok bool) { - if value, ok = cs.cache.Get(period); ok { - return - } - if enc, err := cs.db.Get(cs.databaseKey(period)); err == nil { - if v, err := cs.decode(enc); err == nil { - value, ok = v, true - } else { - log.Error("Error decoding canonical store value", "error", err) - } - } - return -} diff --git a/beacon/light/canonicalstore.go b/beacon/light/canonicalstore.go new file mode 100644 index 000000000000..c3ecb5b57bc5 --- /dev/null +++ b/beacon/light/canonicalstore.go @@ -0,0 +1,252 @@ +// Copyright 2023 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package light + +import ( + "encoding/binary" + "fmt" + + "github.com/ethereum/go-ethereum/beacon/types" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/lru" + "github.com/ethereum/go-ethereum/core/rawdb" + "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/rlp" +) + +func determineRange(db ethdb.Iteratee, keyPrefix []byte) Range { + var ( + iter = db.NewIterator(keyPrefix, nil) + keyLength = len(keyPrefix) + p Range + ) + defer iter.Release() + for iter.Next() { + if len(iter.Key()) != keyLength+8 { + log.Warn("Invalid key length in the canonical chain database", "key", fmt.Sprintf("%#x", iter.Key())) + continue + } + period := binary.BigEndian.Uint64(iter.Key()[keyLength : keyLength+8]) + if p.Start == 0 { + p.Start = period + } else if p.End != period { + log.Warn("Gap in the canonical chain database") + break // continuity guaranteed + } + p.End = period + 1 + } + return p +} + +// fixedCommitteeRootsStore stores fixedCommitteeRoots +type fixedCommitteeRootsStore struct { + periods Range + cache *lru.Cache[uint64, common.Hash] +} + +// newFixedCommitteeRootsStore creates a new fixedCommitteeRootsStore and +// verifies the continuity of the range in database. +func newFixedCommitteeRootsStore(db ethdb.Iteratee) *fixedCommitteeRootsStore { + return &fixedCommitteeRootsStore{ + cache: lru.NewCache[uint64, common.Hash](100), + periods: determineRange(db, rawdb.FixedCommitteeRootKey), + } +} + +// databaseKey returns the database key belonging to the given period. +func (cs *fixedCommitteeRootsStore) databaseKey(period uint64) []byte { + return binary.BigEndian.AppendUint64(rawdb.FixedCommitteeRootKey, period) +} + +// add adds the given item to the database. It also ensures that the range remains +// continuous. Can be used either with a batch or database backend. +func (cs *fixedCommitteeRootsStore) add(backend ethdb.KeyValueWriter, period uint64, value common.Hash) error { + if !cs.periods.CanExpand(period) { + return fmt.Errorf("period expansion is not allowed, first: %d, next: %d, period: %d", cs.periods.Start, cs.periods.End, period) + } + if err := backend.Put(cs.databaseKey(period), value[:]); err != nil { + return err + } + cs.cache.Add(period, value) + cs.periods.Expand(period) + return nil +} + +// deleteFrom removes items starting from the given period. +func (cs *fixedCommitteeRootsStore) deleteFrom(db ethdb.KeyValueWriter, fromPeriod uint64) (deleted Range) { + keepRange, deleteRange := cs.periods.Split(fromPeriod) + deleteRange.Each(func(period uint64) { + db.Delete(cs.databaseKey(period)) + cs.cache.Remove(period) + }) + cs.periods = keepRange + return deleteRange +} + +// get returns the item at the given period or the null value of the given type +// if no item is present. +// Note: get is thread safe in itself and therefore can be called either with +// locked or unlocked chain mutex. +func (cs *fixedCommitteeRootsStore) get(backend ethdb.KeyValueReader, period uint64) (value common.Hash, ok bool) { + if value, ok = cs.cache.Get(period); ok { + return + } + enc, err := backend.Get(cs.databaseKey(period)) + if err != nil { + return common.Hash{}, false + } + if len(enc) != common.HashLength { + log.Error("Error decoding canonical store value", "error", "incorrect length for committee root entry in the database") + } + return common.BytesToHash(enc), true +} + +// serializedSyncCommitteeStore stores SerializedSyncCommittee +type serializedSyncCommitteeStore struct { + periods Range + cache *lru.Cache[uint64, *types.SerializedSyncCommittee] +} + +// newSerializedSyncCommitteSTore creates a new serializedSyncCommitteeStore and +// verifies the continuity of the range in database. +func newSerializedSyncCommitteSTore(db ethdb.Iteratee) *serializedSyncCommitteeStore { + return &serializedSyncCommitteeStore{ + cache: lru.NewCache[uint64, *types.SerializedSyncCommittee](100), + periods: determineRange(db, rawdb.SyncCommitteeKey), + } +} + +// databaseKey returns the database key belonging to the given period. +func (cs *serializedSyncCommitteeStore) databaseKey(period uint64) []byte { + return binary.BigEndian.AppendUint64(rawdb.SyncCommitteeKey, period) +} + +// add adds the given item to the database. It also ensures that the range remains +// continuous. Can be used either with a batch or database backend. +func (cs *serializedSyncCommitteeStore) add(db ethdb.KeyValueWriter, period uint64, value *types.SerializedSyncCommittee) error { + if !cs.periods.CanExpand(period) { + return fmt.Errorf("period expansion is not allowed, first: %d, next: %d, period: %d", cs.periods.Start, cs.periods.End, period) + } + if err := db.Put(cs.databaseKey(period), value[:]); err != nil { + return err + } + cs.cache.Add(period, value) + cs.periods.Expand(period) + return nil +} + +// deleteFrom removes items starting from the given period. +func (cs *serializedSyncCommitteeStore) deleteFrom(db ethdb.KeyValueWriter, fromPeriod uint64) (deleted Range) { + keepRange, deleteRange := cs.periods.Split(fromPeriod) + deleteRange.Each(func(period uint64) { + db.Delete(cs.databaseKey(period)) + cs.cache.Remove(period) + }) + cs.periods = keepRange + return deleteRange +} + +// get returns the item at the given period or the null value of the given type +// if no item is present. +// Note: get is thread safe in itself and therefore can be called either with +// locked or unlocked chain mutex. +func (cs *serializedSyncCommitteeStore) get(db ethdb.KeyValueReader, period uint64) (value *types.SerializedSyncCommittee, ok bool) { + if value, ok = cs.cache.Get(period); ok { + return + } + enc, err := db.Get(cs.databaseKey(period)) + if err != nil { + return nil, false + } + if len(enc) != types.SerializedSyncCommitteeSize { + log.Error("Error decoding canonical store value", "error", "incorrect length for serialized committee entry in the database") + return nil, false + } + committee := new(types.SerializedSyncCommittee) + copy(committee[:], enc) + return committee, true +} + +// updatesStore stores lightclient updates +type updatesStore struct { + periods Range + cache *lru.Cache[uint64, *types.LightClientUpdate] +} + +// newUpdatesStore creates a new updatesStore and +// verifies the continuity of the range in database. +func newUpdatesStore(db ethdb.Iteratee) *updatesStore { + return &updatesStore{ + cache: lru.NewCache[uint64, *types.LightClientUpdate](100), + periods: determineRange(db, rawdb.BestUpdateKey), + } +} + +// databaseKey returns the database key belonging to the given period. +func (cs *updatesStore) databaseKey(period uint64) []byte { + return binary.BigEndian.AppendUint64(rawdb.BestUpdateKey, period) +} + +// add adds the given item to the database. It also ensures that the range remains +// continuous. Can be used either with a batch or database backend. +func (cs *updatesStore) add(db ethdb.KeyValueWriter, period uint64, update *types.LightClientUpdate) error { + if !cs.periods.CanExpand(period) { + return fmt.Errorf("period expansion is not allowed, first: %d, next: %d, period: %d", cs.periods.Start, cs.periods.End, period) + } + enc, err := rlp.EncodeToBytes(update) + if err != nil { + return err + } + if err := db.Put(cs.databaseKey(period), enc); err != nil { + return err + } + cs.cache.Add(period, update) + cs.periods.Expand(period) + return nil +} + +// deleteFrom removes items starting from the given period. +func (cs *updatesStore) deleteFrom(db ethdb.KeyValueWriter, fromPeriod uint64) (deleted Range) { + keepRange, deleteRange := cs.periods.Split(fromPeriod) + deleteRange.Each(func(period uint64) { + db.Delete(cs.databaseKey(period)) + cs.cache.Remove(period) + }) + cs.periods = keepRange + return deleteRange +} + +// get returns the item at the given period or the null value of the given type +// if no item is present. +// Note: get is thread safe in itself and therefore can be called either with +// locked or unlocked chain mutex. +func (cs *updatesStore) get(db ethdb.KeyValueReader, period uint64) (value *types.LightClientUpdate, ok bool) { + if value, ok = cs.cache.Get(period); ok { + return + } + enc, err := db.Get(cs.databaseKey(period)) + if err != nil { + return nil, false + } + update := new(types.LightClientUpdate) + if err := rlp.DecodeBytes(enc, update); err != nil { + log.Error("Error decoding canonical store value", "error", err) + return nil, false + } + return update, true +} diff --git a/beacon/light/committee_chain.go b/beacon/light/committee_chain.go index 937453bff9d6..2516888620d6 100644 --- a/beacon/light/committee_chain.go +++ b/beacon/light/committee_chain.go @@ -28,10 +28,8 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/lru" "github.com/ethereum/go-ethereum/common/mclock" - "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/log" - "github.com/ethereum/go-ethereum/rlp" ) var ( @@ -67,9 +65,9 @@ type CommitteeChain struct { // with each other and with committeeCache. chainmu sync.RWMutex db ethdb.KeyValueStore - updates *canonicalStore[*types.LightClientUpdate] - committees *canonicalStore[*types.SerializedSyncCommittee] - fixedCommitteeRoots *canonicalStore[common.Hash] + updates *updatesStore + committees *serializedSyncCommitteeStore + fixedCommitteeRoots *fixedCommitteeRootsStore committeeCache *lru.Cache[uint64, syncCommittee] // cache deserialized committees clock mclock.Clock // monotonic clock (simulated clock in tests) @@ -90,42 +88,10 @@ func NewCommitteeChain(db ethdb.KeyValueStore, config *types.ChainConfig, signer // newCommitteeChain creates a new CommitteeChain with the option of replacing the // clock source and signature verification for testing purposes. func newCommitteeChain(db ethdb.KeyValueStore, config *types.ChainConfig, signerThreshold int, enforceTime bool, sigVerifier committeeSigVerifier, clock mclock.Clock, unixNano func() int64) *CommitteeChain { - var ( - fixedCommitteeRootEncoder = func(root common.Hash) ([]byte, error) { - return root[:], nil - } - fixedCommitteeRootDecoder = func(enc []byte) (root common.Hash, err error) { - if len(enc) != common.HashLength { - return common.Hash{}, errors.New("incorrect length for committee root entry in the database") - } - return common.BytesToHash(enc), nil - } - committeeEncoder = func(committee *types.SerializedSyncCommittee) ([]byte, error) { - return committee[:], nil - } - committeeDecoder = func(enc []byte) (*types.SerializedSyncCommittee, error) { - if len(enc) == types.SerializedSyncCommitteeSize { - committee := new(types.SerializedSyncCommittee) - copy(committee[:], enc) - return committee, nil - } - return nil, errors.New("incorrect length for serialized committee entry in the database") - } - updateEncoder = func(update *types.LightClientUpdate) ([]byte, error) { - return rlp.EncodeToBytes(update) - } - updateDecoder = func(enc []byte) (*types.LightClientUpdate, error) { - update := new(types.LightClientUpdate) - if err := rlp.DecodeBytes(enc, update); err != nil { - return nil, err - } - return update, nil - } - ) s := &CommitteeChain{ - fixedCommitteeRoots: newCanonicalStore[common.Hash](db, rawdb.FixedCommitteeRootKey, fixedCommitteeRootEncoder, fixedCommitteeRootDecoder), - committees: newCanonicalStore[*types.SerializedSyncCommittee](db, rawdb.SyncCommitteeKey, committeeEncoder, committeeDecoder), - updates: newCanonicalStore[*types.LightClientUpdate](db, rawdb.BestUpdateKey, updateEncoder, updateDecoder), + fixedCommitteeRoots: newFixedCommitteeRootsStore(db), + committees: newSerializedSyncCommitteSTore(db), + updates: newUpdatesStore(db), committeeCache: lru.NewCache[uint64, syncCommittee](10), db: db, sigVerifier: sigVerifier, @@ -146,7 +112,7 @@ func newCommitteeChain(db ethdb.KeyValueStore, config *types.ChainConfig, signer } // roll back invalid updates (might be necessary if forks have been changed since last time) for !s.updates.periods.IsEmpty() { - update, ok := s.updates.get(s.updates.periods.End - 1) + update, ok := s.updates.get(db, s.updates.periods.End-1) if !ok { log.Error("Sync committee update missing", "period", s.updates.periods.End-1) s.Reset() @@ -378,7 +344,7 @@ func (s *CommitteeChain) InsertUpdate(update *types.LightClientUpdate, nextCommi } oldRoot := s.getCommitteeRoot(period + 1) reorg := oldRoot != (common.Hash{}) && oldRoot != update.NextSyncCommitteeRoot - if oldUpdate, ok := s.updates.get(period); ok && !update.Score().BetterThan(oldUpdate.Score()) { + if oldUpdate, ok := s.updates.get(s.db, period); ok && !update.Score().BetterThan(oldUpdate.Score()) { // a better or equal update already exists; no changes, only fail if new one tried to reorg if reorg { return ErrCannotReorg @@ -470,10 +436,10 @@ func (s *CommitteeChain) rollback(period uint64) error { // proven by a previous update or both. It returns an empty hash if the committee // root is unknown. func (s *CommitteeChain) getCommitteeRoot(period uint64) common.Hash { - if root, ok := s.fixedCommitteeRoots.get(period); ok || period == 0 { + if root, ok := s.fixedCommitteeRoots.get(s.db, period); ok || period == 0 { return root } - if update, ok := s.updates.get(period - 1); ok { + if update, ok := s.updates.get(s.db, period-1); ok { return update.NextSyncCommitteeRoot } return common.Hash{} @@ -484,7 +450,7 @@ func (s *CommitteeChain) getSyncCommittee(period uint64) (syncCommittee, error) if c, ok := s.committeeCache.Get(period); ok { return c, nil } - if sc, ok := s.committees.get(period); ok { + if sc, ok := s.committees.get(s.db, period); ok { c, err := s.sigVerifier.deserializeSyncCommittee(sc) if err != nil { return nil, fmt.Errorf("Sync committee #%d deserialization error: %v", period, err) diff --git a/beacon/light/range.go b/beacon/light/range.go index e9427ed3826d..11b5dc8b13d4 100644 --- a/beacon/light/range.go +++ b/beacon/light/range.go @@ -52,3 +52,27 @@ func (a *Range) Expand(period uint64) { a.End++ } } + +// Split splits the range into two ranges. The 'fromPeriod' will be the first +// element in the second range (if present). +// The original range is unchanged by this operation +func (a *Range) Split(fromPeriod uint64) (Range, Range) { + if fromPeriod <= a.Start { + // First range empty, everything in second range, + return Range{}, Range{a.Start, a.End} + } + if fromPeriod > a.End { + // Second range empty, everything in first range, + return Range{a.Start, a.End}, Range{} + } + x := Range{a.Start, fromPeriod} + y := Range{fromPeriod, a.End} + return x, y +} + +// Each invokes the supplied function fn once per period in range +func (a *Range) Each(fn func(uint64)) { + for p := a.Start; p < a.End; p++ { + fn(p) + } +}