diff --git a/beacon/light/canonical.go b/beacon/light/canonical.go
deleted file mode 100644
index 50c70e1ffae4..000000000000
--- a/beacon/light/canonical.go
+++ /dev/null
@@ -1,140 +0,0 @@
-// Copyright 2023 The go-ethereum Authors
-// This file is part of the go-ethereum library.
-//
-// The go-ethereum library is free software: you can redistribute it and/or modify
-// it under the terms of the GNU Lesser General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-//
-// The go-ethereum library is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU Lesser General Public License for more details.
-//
-// You should have received a copy of the GNU Lesser General Public License
-// along with the go-ethereum library. If not, see .
-
-package light
-
-import (
- "encoding/binary"
- "fmt"
-
- "github.com/ethereum/go-ethereum/common/lru"
- "github.com/ethereum/go-ethereum/ethdb"
- "github.com/ethereum/go-ethereum/log"
-)
-
-// canonicalStore stores instances of the given type in a database and caches
-// them in memory, associated with a continuous range of period numbers.
-// Note: canonicalStore is not thread safe and it is the caller's responsibility
-// to avoid concurrent access.
-type canonicalStore[T any] struct {
- db ethdb.KeyValueStore
- keyPrefix []byte
- periods Range
- cache *lru.Cache[uint64, T]
- encode func(T) ([]byte, error)
- decode func([]byte) (T, error)
-}
-
-// newCanonicalStore creates a new canonicalStore and loads all keys associated
-// with the keyPrefix in order to determine the ranges available in the database.
-func newCanonicalStore[T any](db ethdb.KeyValueStore, keyPrefix []byte,
- encode func(T) ([]byte, error), decode func([]byte) (T, error)) *canonicalStore[T] {
- cs := &canonicalStore[T]{
- db: db,
- keyPrefix: keyPrefix,
- encode: encode,
- decode: decode,
- cache: lru.NewCache[uint64, T](100),
- }
- var (
- iter = db.NewIterator(keyPrefix, nil)
- kl = len(keyPrefix)
- )
- for iter.Next() {
- if len(iter.Key()) != kl+8 {
- log.Warn("Invalid key length in the canonical chain database", "key", fmt.Sprintf("%#x", iter.Key()))
- continue
- }
- period := binary.BigEndian.Uint64(iter.Key()[kl : kl+8])
- if cs.periods.Start == 0 {
- cs.periods.Start = period
- } else if cs.periods.End != period {
- log.Warn("Gap in the canonical chain database")
- break // continuity guaranteed
- }
- cs.periods.End = period + 1
- }
- iter.Release()
- return cs
-}
-
-// databaseKey returns the database key belonging to the given period.
-func (cs *canonicalStore[T]) databaseKey(period uint64) []byte {
- var (
- kl = len(cs.keyPrefix)
- key = make([]byte, kl+8)
- )
- copy(key[:kl], cs.keyPrefix)
- binary.BigEndian.PutUint64(key[kl:], period)
- return key
-}
-
-// add adds the given item to the database. It also ensures that the range remains
-// continuous. Can be used either with a batch or database backend.
-func (cs *canonicalStore[T]) add(backend ethdb.KeyValueWriter, period uint64, value T) error {
- if !cs.periods.CanExpand(period) {
- return fmt.Errorf("period expansion is not allowed, first: %d, next: %d, period: %d", cs.periods.Start, cs.periods.End, period)
- }
- enc, err := cs.encode(value)
- if err != nil {
- return err
- }
- if err := backend.Put(cs.databaseKey(period), enc); err != nil {
- return err
- }
- cs.cache.Add(period, value)
- cs.periods.Expand(period)
- return nil
-}
-
-// deleteFrom removes items starting from the given period.
-func (cs *canonicalStore[T]) deleteFrom(batch ethdb.Batch, fromPeriod uint64) (deleted Range) {
- if fromPeriod >= cs.periods.End {
- return
- }
- if fromPeriod < cs.periods.Start {
- fromPeriod = cs.periods.Start
- }
- deleted = Range{Start: fromPeriod, End: cs.periods.End}
- for period := fromPeriod; period < cs.periods.End; period++ {
- batch.Delete(cs.databaseKey(period))
- cs.cache.Remove(period)
- }
- if fromPeriod > cs.periods.Start {
- cs.periods.End = fromPeriod
- } else {
- cs.periods = Range{}
- }
- return
-}
-
-// get returns the item at the given period or the null value of the given type
-// if no item is present.
-// Note: get is thread safe in itself and therefore can be called either with
-// locked or unlocked chain mutex.
-func (cs *canonicalStore[T]) get(period uint64) (value T, ok bool) {
- if value, ok = cs.cache.Get(period); ok {
- return
- }
- if enc, err := cs.db.Get(cs.databaseKey(period)); err == nil {
- if v, err := cs.decode(enc); err == nil {
- value, ok = v, true
- } else {
- log.Error("Error decoding canonical store value", "error", err)
- }
- }
- return
-}
diff --git a/beacon/light/canonicalstore.go b/beacon/light/canonicalstore.go
new file mode 100644
index 000000000000..c3ecb5b57bc5
--- /dev/null
+++ b/beacon/light/canonicalstore.go
@@ -0,0 +1,252 @@
+// Copyright 2023 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see .
+
+package light
+
+import (
+ "encoding/binary"
+ "fmt"
+
+ "github.com/ethereum/go-ethereum/beacon/types"
+ "github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/common/lru"
+ "github.com/ethereum/go-ethereum/core/rawdb"
+ "github.com/ethereum/go-ethereum/ethdb"
+ "github.com/ethereum/go-ethereum/log"
+ "github.com/ethereum/go-ethereum/rlp"
+)
+
+func determineRange(db ethdb.Iteratee, keyPrefix []byte) Range {
+ var (
+ iter = db.NewIterator(keyPrefix, nil)
+ keyLength = len(keyPrefix)
+ p Range
+ )
+ defer iter.Release()
+ for iter.Next() {
+ if len(iter.Key()) != keyLength+8 {
+ log.Warn("Invalid key length in the canonical chain database", "key", fmt.Sprintf("%#x", iter.Key()))
+ continue
+ }
+ period := binary.BigEndian.Uint64(iter.Key()[keyLength : keyLength+8])
+ if p.Start == 0 {
+ p.Start = period
+ } else if p.End != period {
+ log.Warn("Gap in the canonical chain database")
+ break // continuity guaranteed
+ }
+ p.End = period + 1
+ }
+ return p
+}
+
+// fixedCommitteeRootsStore stores fixedCommitteeRoots
+type fixedCommitteeRootsStore struct {
+ periods Range
+ cache *lru.Cache[uint64, common.Hash]
+}
+
+// newFixedCommitteeRootsStore creates a new fixedCommitteeRootsStore and
+// verifies the continuity of the range in database.
+func newFixedCommitteeRootsStore(db ethdb.Iteratee) *fixedCommitteeRootsStore {
+ return &fixedCommitteeRootsStore{
+ cache: lru.NewCache[uint64, common.Hash](100),
+ periods: determineRange(db, rawdb.FixedCommitteeRootKey),
+ }
+}
+
+// databaseKey returns the database key belonging to the given period.
+func (cs *fixedCommitteeRootsStore) databaseKey(period uint64) []byte {
+ return binary.BigEndian.AppendUint64(rawdb.FixedCommitteeRootKey, period)
+}
+
+// add adds the given item to the database. It also ensures that the range remains
+// continuous. Can be used either with a batch or database backend.
+func (cs *fixedCommitteeRootsStore) add(backend ethdb.KeyValueWriter, period uint64, value common.Hash) error {
+ if !cs.periods.CanExpand(period) {
+ return fmt.Errorf("period expansion is not allowed, first: %d, next: %d, period: %d", cs.periods.Start, cs.periods.End, period)
+ }
+ if err := backend.Put(cs.databaseKey(period), value[:]); err != nil {
+ return err
+ }
+ cs.cache.Add(period, value)
+ cs.periods.Expand(period)
+ return nil
+}
+
+// deleteFrom removes items starting from the given period.
+func (cs *fixedCommitteeRootsStore) deleteFrom(db ethdb.KeyValueWriter, fromPeriod uint64) (deleted Range) {
+ keepRange, deleteRange := cs.periods.Split(fromPeriod)
+ deleteRange.Each(func(period uint64) {
+ db.Delete(cs.databaseKey(period))
+ cs.cache.Remove(period)
+ })
+ cs.periods = keepRange
+ return deleteRange
+}
+
+// get returns the item at the given period or the null value of the given type
+// if no item is present.
+// Note: get is thread safe in itself and therefore can be called either with
+// locked or unlocked chain mutex.
+func (cs *fixedCommitteeRootsStore) get(backend ethdb.KeyValueReader, period uint64) (value common.Hash, ok bool) {
+ if value, ok = cs.cache.Get(period); ok {
+ return
+ }
+ enc, err := backend.Get(cs.databaseKey(period))
+ if err != nil {
+ return common.Hash{}, false
+ }
+ if len(enc) != common.HashLength {
+ log.Error("Error decoding canonical store value", "error", "incorrect length for committee root entry in the database")
+ }
+ return common.BytesToHash(enc), true
+}
+
+// serializedSyncCommitteeStore stores SerializedSyncCommittee
+type serializedSyncCommitteeStore struct {
+ periods Range
+ cache *lru.Cache[uint64, *types.SerializedSyncCommittee]
+}
+
+// newSerializedSyncCommitteSTore creates a new serializedSyncCommitteeStore and
+// verifies the continuity of the range in database.
+func newSerializedSyncCommitteSTore(db ethdb.Iteratee) *serializedSyncCommitteeStore {
+ return &serializedSyncCommitteeStore{
+ cache: lru.NewCache[uint64, *types.SerializedSyncCommittee](100),
+ periods: determineRange(db, rawdb.SyncCommitteeKey),
+ }
+}
+
+// databaseKey returns the database key belonging to the given period.
+func (cs *serializedSyncCommitteeStore) databaseKey(period uint64) []byte {
+ return binary.BigEndian.AppendUint64(rawdb.SyncCommitteeKey, period)
+}
+
+// add adds the given item to the database. It also ensures that the range remains
+// continuous. Can be used either with a batch or database backend.
+func (cs *serializedSyncCommitteeStore) add(db ethdb.KeyValueWriter, period uint64, value *types.SerializedSyncCommittee) error {
+ if !cs.periods.CanExpand(period) {
+ return fmt.Errorf("period expansion is not allowed, first: %d, next: %d, period: %d", cs.periods.Start, cs.periods.End, period)
+ }
+ if err := db.Put(cs.databaseKey(period), value[:]); err != nil {
+ return err
+ }
+ cs.cache.Add(period, value)
+ cs.periods.Expand(period)
+ return nil
+}
+
+// deleteFrom removes items starting from the given period.
+func (cs *serializedSyncCommitteeStore) deleteFrom(db ethdb.KeyValueWriter, fromPeriod uint64) (deleted Range) {
+ keepRange, deleteRange := cs.periods.Split(fromPeriod)
+ deleteRange.Each(func(period uint64) {
+ db.Delete(cs.databaseKey(period))
+ cs.cache.Remove(period)
+ })
+ cs.periods = keepRange
+ return deleteRange
+}
+
+// get returns the item at the given period or the null value of the given type
+// if no item is present.
+// Note: get is thread safe in itself and therefore can be called either with
+// locked or unlocked chain mutex.
+func (cs *serializedSyncCommitteeStore) get(db ethdb.KeyValueReader, period uint64) (value *types.SerializedSyncCommittee, ok bool) {
+ if value, ok = cs.cache.Get(period); ok {
+ return
+ }
+ enc, err := db.Get(cs.databaseKey(period))
+ if err != nil {
+ return nil, false
+ }
+ if len(enc) != types.SerializedSyncCommitteeSize {
+ log.Error("Error decoding canonical store value", "error", "incorrect length for serialized committee entry in the database")
+ return nil, false
+ }
+ committee := new(types.SerializedSyncCommittee)
+ copy(committee[:], enc)
+ return committee, true
+}
+
+// updatesStore stores lightclient updates
+type updatesStore struct {
+ periods Range
+ cache *lru.Cache[uint64, *types.LightClientUpdate]
+}
+
+// newUpdatesStore creates a new updatesStore and
+// verifies the continuity of the range in database.
+func newUpdatesStore(db ethdb.Iteratee) *updatesStore {
+ return &updatesStore{
+ cache: lru.NewCache[uint64, *types.LightClientUpdate](100),
+ periods: determineRange(db, rawdb.BestUpdateKey),
+ }
+}
+
+// databaseKey returns the database key belonging to the given period.
+func (cs *updatesStore) databaseKey(period uint64) []byte {
+ return binary.BigEndian.AppendUint64(rawdb.BestUpdateKey, period)
+}
+
+// add adds the given item to the database. It also ensures that the range remains
+// continuous. Can be used either with a batch or database backend.
+func (cs *updatesStore) add(db ethdb.KeyValueWriter, period uint64, update *types.LightClientUpdate) error {
+ if !cs.periods.CanExpand(period) {
+ return fmt.Errorf("period expansion is not allowed, first: %d, next: %d, period: %d", cs.periods.Start, cs.periods.End, period)
+ }
+ enc, err := rlp.EncodeToBytes(update)
+ if err != nil {
+ return err
+ }
+ if err := db.Put(cs.databaseKey(period), enc); err != nil {
+ return err
+ }
+ cs.cache.Add(period, update)
+ cs.periods.Expand(period)
+ return nil
+}
+
+// deleteFrom removes items starting from the given period.
+func (cs *updatesStore) deleteFrom(db ethdb.KeyValueWriter, fromPeriod uint64) (deleted Range) {
+ keepRange, deleteRange := cs.periods.Split(fromPeriod)
+ deleteRange.Each(func(period uint64) {
+ db.Delete(cs.databaseKey(period))
+ cs.cache.Remove(period)
+ })
+ cs.periods = keepRange
+ return deleteRange
+}
+
+// get returns the item at the given period or the null value of the given type
+// if no item is present.
+// Note: get is thread safe in itself and therefore can be called either with
+// locked or unlocked chain mutex.
+func (cs *updatesStore) get(db ethdb.KeyValueReader, period uint64) (value *types.LightClientUpdate, ok bool) {
+ if value, ok = cs.cache.Get(period); ok {
+ return
+ }
+ enc, err := db.Get(cs.databaseKey(period))
+ if err != nil {
+ return nil, false
+ }
+ update := new(types.LightClientUpdate)
+ if err := rlp.DecodeBytes(enc, update); err != nil {
+ log.Error("Error decoding canonical store value", "error", err)
+ return nil, false
+ }
+ return update, true
+}
diff --git a/beacon/light/committee_chain.go b/beacon/light/committee_chain.go
index 937453bff9d6..2516888620d6 100644
--- a/beacon/light/committee_chain.go
+++ b/beacon/light/committee_chain.go
@@ -28,10 +28,8 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/lru"
"github.com/ethereum/go-ethereum/common/mclock"
- "github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/log"
- "github.com/ethereum/go-ethereum/rlp"
)
var (
@@ -67,9 +65,9 @@ type CommitteeChain struct {
// with each other and with committeeCache.
chainmu sync.RWMutex
db ethdb.KeyValueStore
- updates *canonicalStore[*types.LightClientUpdate]
- committees *canonicalStore[*types.SerializedSyncCommittee]
- fixedCommitteeRoots *canonicalStore[common.Hash]
+ updates *updatesStore
+ committees *serializedSyncCommitteeStore
+ fixedCommitteeRoots *fixedCommitteeRootsStore
committeeCache *lru.Cache[uint64, syncCommittee] // cache deserialized committees
clock mclock.Clock // monotonic clock (simulated clock in tests)
@@ -90,42 +88,10 @@ func NewCommitteeChain(db ethdb.KeyValueStore, config *types.ChainConfig, signer
// newCommitteeChain creates a new CommitteeChain with the option of replacing the
// clock source and signature verification for testing purposes.
func newCommitteeChain(db ethdb.KeyValueStore, config *types.ChainConfig, signerThreshold int, enforceTime bool, sigVerifier committeeSigVerifier, clock mclock.Clock, unixNano func() int64) *CommitteeChain {
- var (
- fixedCommitteeRootEncoder = func(root common.Hash) ([]byte, error) {
- return root[:], nil
- }
- fixedCommitteeRootDecoder = func(enc []byte) (root common.Hash, err error) {
- if len(enc) != common.HashLength {
- return common.Hash{}, errors.New("incorrect length for committee root entry in the database")
- }
- return common.BytesToHash(enc), nil
- }
- committeeEncoder = func(committee *types.SerializedSyncCommittee) ([]byte, error) {
- return committee[:], nil
- }
- committeeDecoder = func(enc []byte) (*types.SerializedSyncCommittee, error) {
- if len(enc) == types.SerializedSyncCommitteeSize {
- committee := new(types.SerializedSyncCommittee)
- copy(committee[:], enc)
- return committee, nil
- }
- return nil, errors.New("incorrect length for serialized committee entry in the database")
- }
- updateEncoder = func(update *types.LightClientUpdate) ([]byte, error) {
- return rlp.EncodeToBytes(update)
- }
- updateDecoder = func(enc []byte) (*types.LightClientUpdate, error) {
- update := new(types.LightClientUpdate)
- if err := rlp.DecodeBytes(enc, update); err != nil {
- return nil, err
- }
- return update, nil
- }
- )
s := &CommitteeChain{
- fixedCommitteeRoots: newCanonicalStore[common.Hash](db, rawdb.FixedCommitteeRootKey, fixedCommitteeRootEncoder, fixedCommitteeRootDecoder),
- committees: newCanonicalStore[*types.SerializedSyncCommittee](db, rawdb.SyncCommitteeKey, committeeEncoder, committeeDecoder),
- updates: newCanonicalStore[*types.LightClientUpdate](db, rawdb.BestUpdateKey, updateEncoder, updateDecoder),
+ fixedCommitteeRoots: newFixedCommitteeRootsStore(db),
+ committees: newSerializedSyncCommitteSTore(db),
+ updates: newUpdatesStore(db),
committeeCache: lru.NewCache[uint64, syncCommittee](10),
db: db,
sigVerifier: sigVerifier,
@@ -146,7 +112,7 @@ func newCommitteeChain(db ethdb.KeyValueStore, config *types.ChainConfig, signer
}
// roll back invalid updates (might be necessary if forks have been changed since last time)
for !s.updates.periods.IsEmpty() {
- update, ok := s.updates.get(s.updates.periods.End - 1)
+ update, ok := s.updates.get(db, s.updates.periods.End-1)
if !ok {
log.Error("Sync committee update missing", "period", s.updates.periods.End-1)
s.Reset()
@@ -378,7 +344,7 @@ func (s *CommitteeChain) InsertUpdate(update *types.LightClientUpdate, nextCommi
}
oldRoot := s.getCommitteeRoot(period + 1)
reorg := oldRoot != (common.Hash{}) && oldRoot != update.NextSyncCommitteeRoot
- if oldUpdate, ok := s.updates.get(period); ok && !update.Score().BetterThan(oldUpdate.Score()) {
+ if oldUpdate, ok := s.updates.get(s.db, period); ok && !update.Score().BetterThan(oldUpdate.Score()) {
// a better or equal update already exists; no changes, only fail if new one tried to reorg
if reorg {
return ErrCannotReorg
@@ -470,10 +436,10 @@ func (s *CommitteeChain) rollback(period uint64) error {
// proven by a previous update or both. It returns an empty hash if the committee
// root is unknown.
func (s *CommitteeChain) getCommitteeRoot(period uint64) common.Hash {
- if root, ok := s.fixedCommitteeRoots.get(period); ok || period == 0 {
+ if root, ok := s.fixedCommitteeRoots.get(s.db, period); ok || period == 0 {
return root
}
- if update, ok := s.updates.get(period - 1); ok {
+ if update, ok := s.updates.get(s.db, period-1); ok {
return update.NextSyncCommitteeRoot
}
return common.Hash{}
@@ -484,7 +450,7 @@ func (s *CommitteeChain) getSyncCommittee(period uint64) (syncCommittee, error)
if c, ok := s.committeeCache.Get(period); ok {
return c, nil
}
- if sc, ok := s.committees.get(period); ok {
+ if sc, ok := s.committees.get(s.db, period); ok {
c, err := s.sigVerifier.deserializeSyncCommittee(sc)
if err != nil {
return nil, fmt.Errorf("Sync committee #%d deserialization error: %v", period, err)
diff --git a/beacon/light/range.go b/beacon/light/range.go
index e9427ed3826d..11b5dc8b13d4 100644
--- a/beacon/light/range.go
+++ b/beacon/light/range.go
@@ -52,3 +52,27 @@ func (a *Range) Expand(period uint64) {
a.End++
}
}
+
+// Split splits the range into two ranges. The 'fromPeriod' will be the first
+// element in the second range (if present).
+// The original range is unchanged by this operation
+func (a *Range) Split(fromPeriod uint64) (Range, Range) {
+ if fromPeriod <= a.Start {
+ // First range empty, everything in second range,
+ return Range{}, Range{a.Start, a.End}
+ }
+ if fromPeriod > a.End {
+ // Second range empty, everything in first range,
+ return Range{a.Start, a.End}, Range{}
+ }
+ x := Range{a.Start, fromPeriod}
+ y := Range{fromPeriod, a.End}
+ return x, y
+}
+
+// Each invokes the supplied function fn once per period in range
+func (a *Range) Each(fn func(uint64)) {
+ for p := a.Start; p < a.End; p++ {
+ fn(p)
+ }
+}