forked from cosmos/cosmos-sdk
-
Notifications
You must be signed in to change notification settings - Fork 11
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
# This is a combination of 5 commits.
# This is the 1st commit message: Enable locking kv store # The commit message #2 will be skipped: # Fix some lock orderings # The commit message #3 will be skipped: # Fix minor typo # The commit message #4 will be skipped: # Ensure that writes happen in a deterministic order. # # Ensure that reads are also done all the time, remove this if it doesn't impact gas. # The commit message #5 will be skipped: # Remove locking for now for lockingkv.Get/Has
- Loading branch information
Showing
3 changed files
with
325 additions
and
3 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,253 @@ | ||
package lockingkv | ||
|
||
import ( | ||
"cosmossdk.io/store/cachekv" | ||
"cosmossdk.io/store/tracekv" | ||
storetypes "cosmossdk.io/store/types" | ||
"golang.org/x/exp/slices" | ||
"io" | ||
"sort" | ||
"sync" | ||
) | ||
|
||
var _ storetypes.CacheKVStore = &Store{} | ||
var _ storetypes.LockingCacheWrapper = &Store{} | ||
var _ storetypes.CacheKVStore = &lockedkv{} | ||
|
||
func NewStore(parent storetypes.KVStore) *Store { | ||
return &Store{ | ||
parent: parent, | ||
locks: &sync.Map{}, | ||
} | ||
} | ||
|
||
type lockAndValue struct { | ||
lock sync.Mutex | ||
value []byte | ||
} | ||
|
||
type Store struct { | ||
parent storetypes.KVStore | ||
locks *sync.Map /* map from string key to lockAndValue. */ | ||
} | ||
|
||
// getSortedKeys returns the keys of the map in sorted order. | ||
func getSortedKeys[R interface { | ||
~[]K | ||
sort.Interface | ||
}, K comparable, V any](m map[K]V) []K { | ||
keys := make([]K, 0, len(m)) | ||
for k := range m { | ||
keys = append(keys, k) | ||
} | ||
sort.Sort(R(keys)) | ||
return keys | ||
} | ||
|
||
func (s *Store) Write() { | ||
values := make(map[string]*lockAndValue) | ||
s.locks.Range(func(key, value any) bool { | ||
lv := value.(*lockAndValue) | ||
values[key.(string)] = lv | ||
return true | ||
}) | ||
|
||
// We need to make the mutations to the parent in a deterministic order to ensure a deterministic hash. | ||
for _, sortedKey := range getSortedKeys[sort.StringSlice](values) { | ||
lv := values[sortedKey] | ||
lv.lock.Lock() | ||
defer lv.lock.Unlock() | ||
if lv.value == nil { | ||
s.parent.Delete([]byte(sortedKey)) | ||
} else { | ||
s.parent.Set([]byte(sortedKey), lv.value) | ||
} | ||
} | ||
} | ||
|
||
func (s *Store) GetStoreType() storetypes.StoreType { | ||
return s.parent.GetStoreType() | ||
} | ||
|
||
func (s *Store) CacheWrap() storetypes.CacheWrap { | ||
return cachekv.NewStore(s) | ||
} | ||
|
||
func (s *Store) CacheWrapWithTrace(w io.Writer, tc storetypes.TraceContext) storetypes.CacheWrap { | ||
return cachekv.NewStore(tracekv.NewStore(s, w, tc)) | ||
} | ||
|
||
func (s *Store) CacheWrapWithLocks(keys [][]byte) storetypes.CacheWrap { | ||
stringKeys := make([]string, len(keys)) | ||
for i, key := range keys { | ||
stringKeys[i] = string(key) | ||
} | ||
// Ensure that we always operate in a deterministic ordering when acquiring locks to prevent deadlock. | ||
slices.Sort(stringKeys) | ||
for _, stringKey := range stringKeys { | ||
// If we created this instance holding the lock otherwise attempt to acquire the lock on the previous instance. | ||
lv := &lockAndValue{ | ||
// TODO: Does this reading into the parent store impact gas calculations? If not let us only do it if it | ||
// is required. | ||
value: s.parent.Get([]byte(stringKey)), | ||
} | ||
lv.lock.Lock() | ||
v, loaded := s.locks.LoadOrStore(stringKey, lv) | ||
if loaded { | ||
lv = v.(*lockAndValue) | ||
lv.lock.Lock() | ||
} | ||
} | ||
|
||
return &lockedkv{ | ||
parent: s, | ||
sortedKeys: stringKeys, | ||
mutations: make(map[string][]byte), | ||
} | ||
} | ||
|
||
func (s *Store) Get(key []byte) []byte { | ||
stringKey := string(key) | ||
v, loaded := s.locks.Load(stringKey) | ||
if loaded { | ||
lv := v.(*lockAndValue) | ||
// Do we need to lock here?, currently causes deadlock due to lockedkv calls s.parent.Get(...) | ||
//lv.lock.Lock() | ||
//defer lv.lock.Unlock() | ||
return lv.value | ||
} | ||
|
||
return s.parent.Get(key) | ||
} | ||
|
||
func (s *Store) Has(key []byte) bool { | ||
// TODO: Can we see if we only use this store during check state and then we wouldn't have to implement get/set/... | ||
stringKey := string(key) | ||
v, loaded := s.locks.Load(stringKey) | ||
if loaded { | ||
lv := v.(*lockAndValue) | ||
// Do we need to lock here?, currently causes deadlock due to lockedkv calls s.parent.Has(...) | ||
//lv.lock.Lock() | ||
//defer lv.lock.Unlock() | ||
return lv.value != nil | ||
} | ||
return s.parent.Has(key) | ||
} | ||
|
||
func (s *Store) Set(key, value []byte) { | ||
stringKey := string(key) | ||
v, loaded := s.locks.LoadOrStore(stringKey, &lockAndValue{ | ||
value: value, | ||
}) | ||
if loaded { | ||
lv := v.(*lockAndValue) | ||
lv.lock.Lock() | ||
defer lv.lock.Unlock() | ||
lv.value = value | ||
} | ||
} | ||
|
||
func (s *Store) Delete(key []byte) { | ||
s.Set(key, nil) | ||
} | ||
|
||
func (s *Store) Iterator(start, end []byte) storetypes.Iterator { | ||
panic("This store does not support iteration.") | ||
} | ||
|
||
func (s *Store) ReverseIterator(start, end []byte) storetypes.Iterator { | ||
panic("This store does not support iteration.") | ||
} | ||
|
||
func (s *Store) writeAndUnlock(keys []string, mutations map[string][]byte) { | ||
for _, key := range keys { | ||
v, ok := s.locks.Load(key) | ||
if !ok { | ||
panic("Key not found") | ||
} | ||
lv := v.(*lockAndValue) | ||
|
||
// Update the value if it was mutated while the lock was held. | ||
if newValue, wasMutated := mutations[key]; wasMutated { | ||
lv.value = newValue | ||
} | ||
|
||
// Unlock the row lock. | ||
lv.lock.Unlock() | ||
} | ||
} | ||
|
||
type lockedkv struct { | ||
parent *Store | ||
|
||
sortedKeys []string | ||
mutations map[string][]byte | ||
} | ||
|
||
func (s *lockedkv) Write() { | ||
s.parent.writeAndUnlock(s.sortedKeys, s.mutations) | ||
} | ||
|
||
func (s *lockedkv) GetStoreType() storetypes.StoreType { | ||
return s.parent.GetStoreType() | ||
} | ||
|
||
func (s *lockedkv) CacheWrap() storetypes.CacheWrap { | ||
return cachekv.NewStore(s) | ||
} | ||
|
||
func (s *lockedkv) CacheWrapWithTrace(w io.Writer, tc storetypes.TraceContext) storetypes.CacheWrap { | ||
return cachekv.NewStore(tracekv.NewStore(s, w, tc)) | ||
} | ||
|
||
func (s *lockedkv) Get(key []byte) []byte { | ||
if key == nil { | ||
panic("nil key") | ||
} | ||
stringKey := string(key) | ||
|
||
if value, ok := s.mutations[stringKey]; ok { | ||
return value | ||
} | ||
|
||
return s.parent.Get(key) | ||
} | ||
|
||
func (s *lockedkv) Has(key []byte) bool { | ||
if key == nil { | ||
panic("nil key") | ||
} | ||
stringKey := string(key) | ||
|
||
if value, ok := s.mutations[stringKey]; ok { | ||
return value != nil | ||
} | ||
|
||
return s.parent.Has(key) | ||
} | ||
|
||
func (s *lockedkv) Set(key, value []byte) { | ||
if key == nil { | ||
panic("nil key") | ||
} | ||
stringKey := string(key) | ||
|
||
i := sort.SearchStrings(s.sortedKeys, stringKey) | ||
if i < len(s.sortedKeys) && s.sortedKeys[i] != stringKey { | ||
panic("Setting value without locking being held for key, did you mean to use CacheWrapWithLockedKeys(keys)?") | ||
} | ||
|
||
s.mutations[stringKey] = value | ||
} | ||
|
||
func (s *lockedkv) Delete(key []byte) { | ||
s.Set(key, nil) | ||
} | ||
|
||
func (s *lockedkv) Iterator(start, end []byte) storetypes.Iterator { | ||
panic("This store does not support iteration.") | ||
} | ||
|
||
func (s *lockedkv) ReverseIterator(start, end []byte) storetypes.Iterator { | ||
panic("This store does not support iteration.") | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters