Merge pull request #15 from testinprod-io/upstream-v2.44.0-f166e7d
Upstream sync - v2.44.0
ImTei authored Jul 18, 2023
2 parents 5340344 + b91c78e commit 97f822d
Showing 80 changed files with 2,287 additions and 2,177 deletions.
2 changes: 1 addition & 1 deletion Makefile
@@ -29,7 +29,7 @@ $(GOBINREL):

$(GOBINREL)/protoc: | $(GOBINREL)
$(eval PROTOC_TMP := $(shell mktemp -d))
-curl -sSL https://github.com/protocolbuffers/protobuf/releases/download/v22.2/protoc-22.2-$(PROTOC_OS)-$(ARCH).zip -o "$(PROTOC_TMP)/protoc.zip"
+curl -sSL https://github.com/protocolbuffers/protobuf/releases/download/v22.3/protoc-22.3-$(PROTOC_OS)-$(ARCH).zip -o "$(PROTOC_TMP)/protoc.zip"
cd "$(PROTOC_TMP)" && unzip protoc.zip
cp "$(PROTOC_TMP)/bin/protoc" "$(GOBIN)"
mkdir -p "$(PROTOC_INCLUDE)"
17 changes: 14 additions & 3 deletions chain/chain_config.go
@@ -474,6 +474,9 @@ type BorConfig struct {
CalcuttaBlock *big.Int `json:"calcuttaBlock"` // Calcutta switch block (nil = no fork, 0 = already on calcutta)
JaipurBlock *big.Int `json:"jaipurBlock"` // Jaipur switch block (nil = no fork, 0 = already on jaipur)
DelhiBlock *big.Int `json:"delhiBlock"` // Delhi switch block (nil = no fork, 0 = already on delhi)

+IndoreBlock *big.Int `json:"indoreBlock"` // Indore switch block (nil = no fork, 0 = already on indore)
+StateSyncConfirmationDelay map[string]uint64 `json:"stateSyncConfirmationDelay"` // StateSync Confirmation Delay, in seconds, to calculate `to`
}

// String implements the stringer interface, returning the consensus engine details.
@@ -482,11 +485,11 @@ func (b *BorConfig) String() string {
}

func (c *BorConfig) CalculateProducerDelay(number uint64) uint64 {
-return c.sprintSize(c.ProducerDelay, number)
+return borKeyValueConfigHelper(c.ProducerDelay, number)
}

func (c *BorConfig) CalculateSprint(number uint64) uint64 {
-return c.sprintSize(c.Sprint, number)
+return borKeyValueConfigHelper(c.Sprint, number)
}

func (c *BorConfig) CalculateBackupMultiplier(number uint64) uint64 {
@@ -513,6 +516,14 @@ func (c *BorConfig) IsOnCalcutta(number *big.Int) bool {
return numEqual(c.CalcuttaBlock, number)
}

+func (c *BorConfig) IsIndore(number uint64) bool {
+return isForked(c.IndoreBlock, number)
+}
+
+func (c *BorConfig) CalculateStateSyncDelay(number uint64) uint64 {
+return borKeyValueConfigHelper(c.StateSyncConfirmationDelay, number)
+}
+
func (c *BorConfig) calcConfig(field map[string]uint64, number uint64) uint64 {
keys := sortMapKeys(field)
for i := 0; i < len(keys)-1; i++ {
@@ -525,7 +536,7 @@ func (c *BorConfig) calcConfig(field map[string]uint64, number uint64) uint64 {
return field[keys[len(keys)-1]]
}

-func (c *BorConfig) sprintSize(field map[string]uint64, number uint64) uint64 {
+func borKeyValueConfigHelper(field map[string]uint64, number uint64) uint64 {
keys := sortMapKeys(field)
for i := 0; i < len(keys)-1; i++ {
valUint, _ := strconv.ParseUint(keys[i], 10, 64)
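Note on the rename: `sprintSize` was sprint-specific, while `borKeyValueConfigHelper` reflects that the same lookup serves any of BorConfig's block-keyed maps (sprint sizes, producer delays, and now state-sync confirmation delays). Keys are stringified starting block numbers; the value returned is the one whose key range contains the queried block. A minimal, self-contained sketch of those semantics — `sortMapKeys` is inlined here for illustration, and the schedule values are hypothetical:

```go
package main

import (
	"fmt"
	"sort"
	"strconv"
)

// sortMapKeys returns the map's keys sorted by numeric value.
// Inlined for illustration; erigon-lib has its own helper.
func sortMapKeys(m map[string]uint64) []string {
	keys := make([]string, 0, len(m))
	for k := range m {
		keys = append(keys, k)
	}
	sort.Slice(keys, func(i, j int) bool {
		a, _ := strconv.ParseUint(keys[i], 10, 64)
		b, _ := strconv.ParseUint(keys[j], 10, 64)
		return a < b
	})
	return keys
}

// borKeyValueConfigHelper picks the value whose starting block is the
// largest one not exceeding number.
func borKeyValueConfigHelper(field map[string]uint64, number uint64) uint64 {
	keys := sortMapKeys(field)
	for i := 0; i < len(keys)-1; i++ {
		valUint, _ := strconv.ParseUint(keys[i], 10, 64)
		valUintNext, _ := strconv.ParseUint(keys[i+1], 10, 64)
		if number >= valUint && number < valUintNext {
			return field[keys[i]]
		}
	}
	return field[keys[len(keys)-1]]
}

func main() {
	// Hypothetical sprint schedule: 64 until block 38189056, then 16.
	sprint := map[string]uint64{"0": 64, "38189056": 16}
	fmt.Println(borKeyValueConfigHelper(sprint, 100))      // 64
	fmt.Println(borKeyValueConfigHelper(sprint, 40000000)) // 16
}
```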
34 changes: 27 additions & 7 deletions common/dbg/experiments.go
@@ -193,25 +193,45 @@ func BigRwTxKb() uint {
}

var (
-slowCommit time.Duration
-getSlowCommit sync.Once
+slowCommit time.Duration
+slowCommitOnce sync.Once
)

func SlowCommit() time.Duration {
-getSlowCommit.Do(func() {
-v, _ := os.LookupEnv("DEBUG_SLOW_COMMIT_MS")
+slowCommitOnce.Do(func() {
+v, _ := os.LookupEnv("SLOW_COMMIT")
if v != "" {
-i, err := strconv.Atoi(v)
+var err error
+slowCommit, err = time.ParseDuration(v)
if err != nil {
panic(err)
}
-slowCommit = time.Duration(i) * time.Millisecond
-log.Info("[Experiment]", "DEBUG_BIG_RW_TX_KB", slowCommit)
+log.Info("[Experiment]", "SLOW_COMMIT", slowCommit.String())
}
})
return slowCommit
}

+var (
+slowTx time.Duration
+slowTxOnce sync.Once
+)
+
+func SlowTx() time.Duration {
+slowTxOnce.Do(func() {
+v, _ := os.LookupEnv("SLOW_TX")
+if v != "" {
+var err error
+slowTx, err = time.ParseDuration(v)
+if err != nil {
+panic(err)
+}
+log.Info("[Experiment]", "SLOW_TX", slowTx.String())
+}
+})
+return slowTx
+}

var (
stopBeforeStage string
stopBeforeStageFlag sync.Once
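Two behavioral changes here are worth noting: the env vars were renamed (`DEBUG_SLOW_COMMIT_MS` → `SLOW_COMMIT`, and the mislabeled `DEBUG_BIG_RW_TX_KB` log key is fixed), and values are now parsed with `time.ParseDuration`, so they take Go duration strings such as `250ms` or `2s` rather than bare millisecond counts. A hypothetical caller, assuming the `github.com/ledgerwatch/erigon-lib/common/dbg` import path — the variables must be set before the first call, since the parsed value is cached behind a `sync.Once`:

```go
package main

import (
	"os"
	"time"

	"github.com/ledgerwatch/erigon-lib/common/dbg"
)

func main() {
	// Durations use time.ParseDuration syntax now, not raw milliseconds.
	// Set before the first SlowCommit/SlowTx call: results are cached via sync.Once.
	os.Setenv("SLOW_COMMIT", "250ms")
	os.Setenv("SLOW_TX", "1s")

	if d := dbg.SlowCommit(); d > 0 {
		time.Sleep(d) // simulate the extra commit latency
	}
	if d := dbg.SlowTx(); d > 0 {
		time.Sleep(d) // simulate the extra per-tx latency
	}
}
```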
105 changes: 105 additions & 0 deletions common/dbg/leak_detector.go
@@ -0,0 +1,105 @@
package dbg

import (
"fmt"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/ledgerwatch/log/v3"
)

// LeakDetector - use it to find which resources were created but not closed (leaked).
// It periodically logs resources that have been alive longer than 1 minute, together with their creation stack traces.
// For example, db transactions can call Add/Del from Begin/Commit/Rollback methods.
type LeakDetector struct {
enabled atomic.Bool
slowThreshold atomic.Pointer[time.Duration]
autoIncrement atomic.Uint64

list map[uint64]LeakDetectorItem
listLock sync.Mutex
}

type LeakDetectorItem struct {
stack string
started time.Time
}

func NewLeakDetector(name string, slowThreshold time.Duration) *LeakDetector {
enabled := slowThreshold > 0
if !enabled {
return nil
}
d := &LeakDetector{list: map[uint64]LeakDetectorItem{}}
d.SetSlowThreshold(slowThreshold)

if enabled {
go func() {
logEvery := time.NewTicker(60 * time.Second)
defer logEvery.Stop()

for {
select {
case <-logEvery.C:
if list := d.slowList(); len(list) > 0 {
log.Info(fmt.Sprintf("[dbg.%s] long living resources", name), "list", strings.Join(d.slowList(), ", "))
}
}
}
}()
}
return d
}

func (d *LeakDetector) slowList() (res []string) {
if d == nil || !d.Enabled() {
return res
}
slowThreshold := *d.slowThreshold.Load()

d.listLock.Lock()
defer d.listLock.Unlock()
i := 0
for key, value := range d.list {
living := time.Since(value.started)
if living > slowThreshold {
res = append(res, fmt.Sprintf("%d(%s): %s", key, living, value.stack))
}
i++
if i > 10 { // protect logs from too much output
break
}
}
return res
}

func (d *LeakDetector) Del(id uint64) {
if d == nil || !d.Enabled() {
return
}
d.listLock.Lock()
defer d.listLock.Unlock()
delete(d.list, id)
}
func (d *LeakDetector) Add() uint64 {
if d == nil || !d.Enabled() {
return 0
}
ac := LeakDetectorItem{
stack: StackSkip(2),
started: time.Now(),
}
id := d.autoIncrement.Add(1)
d.listLock.Lock()
defer d.listLock.Unlock()
d.list[id] = ac
return id
}

func (d *LeakDetector) Enabled() bool { return d.enabled.Load() }
func (d *LeakDetector) SetSlowThreshold(t time.Duration) {
d.slowThreshold.Store(&t)
d.enabled.Store(t > 0)
}
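As the type comment suggests, the intended usage is `Add` on acquire and `Del` on release; anything still in the map past the threshold gets its creation stack logged every 60 seconds. A hypothetical integration — the `Tx`/`Begin` names are invented for illustration, and note that the nil detector returned for a zero threshold is safe to use, since every method checks `d == nil`:

```go
package main

import (
	"time"

	"github.com/ledgerwatch/erigon-lib/common/dbg"
)

// txLeakDetector is shared by all transactions. A zero threshold would
// disable it (NewLeakDetector returns nil, and the nil receiver is safe
// for Add/Del/Enabled).
var txLeakDetector = dbg.NewLeakDetector("tx", time.Minute)

type Tx struct {
	leakID uint64 // handle returned by Add, passed back to Del
}

func Begin() *Tx {
	return &Tx{leakID: txLeakDetector.Add()} // records the creation stack
}

func (tx *Tx) Rollback() {
	txLeakDetector.Del(tx.leakID) // released: no longer a leak suspect
}

func main() {
	tx := Begin()
	defer tx.Rollback()
	// If Rollback were never called, the detector would log this tx's
	// creation stack once it lived past the 1-minute threshold.
}
```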
3 changes: 3 additions & 0 deletions common/dbg/log_panic.go
@@ -24,3 +24,6 @@ import (
func Stack() string {
return stack2.Trace().TrimBelow(stack2.Caller(1)).String()
}
+func StackSkip(skip int) string {
+return stack2.Trace().TrimBelow(stack2.Caller(skip)).String()
+}
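`StackSkip` generalizes `Stack`: `skip` selects how many frames above `StackSkip`'s own frame the trace should begin at, so helper functions can hide themselves from the recorded trace. `LeakDetector.Add` passes 2, making the stored stack start at Add's caller rather than inside the detector. A small sketch, assuming the same import path as above:

```go
package main

import (
	"fmt"

	"github.com/ledgerwatch/erigon-lib/common/dbg"
)

func newResource() string {
	// skip=2 trims away StackSkip's and newResource's own frames:
	// the recorded trace begins at whoever called newResource (here: main).
	return dbg.StackSkip(2)
}

func main() {
	fmt.Println(newResource())
}
```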
18 changes: 10 additions & 8 deletions compress/compress.go
@@ -69,9 +69,10 @@ type Compressor struct {
Ratio CompressionRatio
lvl log.Lvl
trace bool
+logger log.Logger
}

-func NewCompressor(ctx context.Context, logPrefix, outputFile, tmpDir string, minPatternScore uint64, workers int, lvl log.Lvl) (*Compressor, error) {
+func NewCompressor(ctx context.Context, logPrefix, outputFile, tmpDir string, minPatternScore uint64, workers int, lvl log.Lvl, logger log.Logger) (*Compressor, error) {
dir2.MustExist(tmpDir)
dir, fileName := filepath.Split(outputFile)
tmpOutFilePath := filepath.Join(dir, fileName) + ".tmp"
@@ -92,11 +93,11 @@ func NewCompressor(ctx context.Context, logPrefix, outputFile, tmpDir string, mi
wg.Add(workers)
suffixCollectors := make([]*etl.Collector, workers)
for i := 0; i < workers; i++ {
-collector := etl.NewCollector(logPrefix+"_dict", tmpDir, etl.NewSortableBuffer(etl.BufferOptimalSize/2))
+collector := etl.NewCollector(logPrefix+"_dict", tmpDir, etl.NewSortableBuffer(etl.BufferOptimalSize/2), logger)
collector.LogLvl(lvl)

suffixCollectors[i] = collector
-go processSuperstring(superstrings, collector, minPatternScore, wg)
+go processSuperstring(superstrings, collector, minPatternScore, wg, logger)
}

return &Compressor{
@@ -111,6 +112,7 @@ func NewCompressor(ctx context.Context, logPrefix, outputFile, tmpDir string, mi
suffixCollectors: suffixCollectors,
lvl: lvl,
wg: wg,
logger: logger,
}, nil
}

@@ -179,10 +181,10 @@ func (c *Compressor) Compress() error {
c.wg.Wait()

if c.lvl < log.LvlTrace {
-log.Log(c.lvl, fmt.Sprintf("[%s] BuildDict start", c.logPrefix), "workers", c.workers)
+c.logger.Log(c.lvl, fmt.Sprintf("[%s] BuildDict start", c.logPrefix), "workers", c.workers)
}
t := time.Now()
-db, err := DictionaryBuilderFromCollectors(c.ctx, compressLogPrefix, c.tmpDir, c.suffixCollectors, c.lvl)
+db, err := DictionaryBuilderFromCollectors(c.ctx, compressLogPrefix, c.tmpDir, c.suffixCollectors, c.lvl, c.logger)
if err != nil {

return err
Expand All @@ -195,11 +197,11 @@ func (c *Compressor) Compress() error {
}
defer os.Remove(c.tmpOutFilePath)
if c.lvl < log.LvlTrace {
-log.Log(c.lvl, fmt.Sprintf("[%s] BuildDict", c.logPrefix), "took", time.Since(t))
+c.logger.Log(c.lvl, fmt.Sprintf("[%s] BuildDict", c.logPrefix), "took", time.Since(t))
}

t = time.Now()
-if err := reducedict(c.ctx, c.trace, c.logPrefix, c.tmpOutFilePath, c.uncompressedFile, c.workers, db, c.lvl); err != nil {
+if err := reducedict(c.ctx, c.trace, c.logPrefix, c.tmpOutFilePath, c.uncompressedFile, c.workers, db, c.lvl, c.logger); err != nil {
return err
}

Expand All @@ -213,7 +215,7 @@ func (c *Compressor) Compress() error {

_, fName := filepath.Split(c.outputFile)
if c.lvl < log.LvlTrace {
-log.Log(c.lvl, fmt.Sprintf("[%s] Compress", c.logPrefix), "took", time.Since(t), "ratio", c.Ratio, "file", fName)
+c.logger.Log(c.lvl, fmt.Sprintf("[%s] Compress", c.logPrefix), "took", time.Since(t), "ratio", c.Ratio, "file", fName)
}
return nil
}
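All compressor entry points now take an explicit `log.Logger` instead of relying on the package-global logger. A hedged sketch of the updated constructor call — the file paths and parameter values are made up, and `AddWord`/`Compress`/`Close` are assumed to be the package's existing API as exercised by its tests:

```go
package main

import (
	"context"

	"github.com/ledgerwatch/erigon-lib/compress"
	"github.com/ledgerwatch/log/v3"
)

func main() {
	logger := log.New() // now threaded through instead of the global logger

	c, err := compress.NewCompressor(context.Background(),
		"example",      // logPrefix shown in log lines
		"/tmp/out.seg", // outputFile
		"/tmp",         // tmpDir
		100,            // minPatternScore
		1,              // workers
		log.LvlDebug,   // lvl
		logger,         // the parameter added by this change
	)
	if err != nil {
		panic(err)
	}
	defer c.Close()

	if err := c.AddWord([]byte("hello")); err != nil { // assumed existing API
		panic(err)
	}
	if err := c.Compress(); err != nil {
		panic(err)
	}
}
```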
3 changes: 2 additions & 1 deletion compress/compress_fuzz_test.go
@@ -29,6 +29,7 @@ import (
)

func FuzzCompress(f *testing.F) {
+logger := log.New()
f.Fuzz(func(t *testing.T, x []byte, pos []byte, workers int8) {
t.Helper()
t.Parallel()
Expand All @@ -51,7 +52,7 @@ func FuzzCompress(f *testing.F) {
ctx := context.Background()
tmpDir := t.TempDir()
file := filepath.Join(tmpDir, fmt.Sprintf("compressed-%d", rand.Int31()))
-c, err := NewCompressor(ctx, t.Name(), file, tmpDir, 2, int(workers), log.LvlDebug)
+c, err := NewCompressor(ctx, t.Name(), file, tmpDir, 2, int(workers), log.LvlDebug, logger)
if err != nil {
t.Fatal(err)
}
6 changes: 4 additions & 2 deletions compress/compress_test.go
@@ -30,9 +30,10 @@ import (
)

func TestCompressEmptyDict(t *testing.T) {
+logger := log.New()
tmpDir := t.TempDir()
file := filepath.Join(tmpDir, "compressed")
-c, err := NewCompressor(context.Background(), t.Name(), file, tmpDir, 100, 1, log.LvlDebug)
+c, err := NewCompressor(context.Background(), t.Name(), file, tmpDir, 100, 1, log.LvlDebug, logger)
if err != nil {
t.Fatal(err)
}
@@ -78,10 +79,11 @@ func checksum(file string) uint32 {

func prepareDict(t *testing.T) *Decompressor {
t.Helper()
+logger := log.New()
tmpDir := t.TempDir()
file := filepath.Join(tmpDir, "compressed")
t.Name()
-c, err := NewCompressor(context.Background(), t.Name(), file, tmpDir, 1, 2, log.LvlDebug)
+c, err := NewCompressor(context.Background(), t.Name(), file, tmpDir, 1, 2, log.LvlDebug, logger)
if err != nil {
t.Fatal(err)
}
5 changes: 2 additions & 3 deletions compress/decompress.go
@@ -143,13 +143,12 @@ func SetDecompressionTableCondensity(fromBitSize int) {
condensePatternTableBitThreshold = fromBitSize
}

-func NewDecompressor(compressedFilePath string) (*Decompressor, error) {
+func NewDecompressor(compressedFilePath string) (d *Decompressor, err error) {
_, fName := filepath.Split(compressedFilePath)
-d := &Decompressor{
+d = &Decompressor{
filePath: compressedFilePath,
fileName: fName,
}
-var err error
defer func() {
if rec := recover(); rec != nil {
err = fmt.Errorf("decompressing file: %s, %+v, trace: %s", compressedFilePath, rec, dbg.Stack())
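The switch from `(*Decompressor, error)` to the named results `(d *Decompressor, err error)` is what lets the deferred `recover` handler replace the function's return values; with unnamed results, the assignment inside the deferred closure would have nowhere to go. The pattern in isolation:

```go
package main

import "fmt"

// mustNotPanic converts a panic in body into an ordinary error.
// The named return value err is required: the deferred closure
// assigns to it after the normal return path has been abandoned.
func mustNotPanic(body func()) (err error) {
	defer func() {
		if rec := recover(); rec != nil {
			err = fmt.Errorf("recovered: %v", rec)
		}
	}()
	body()
	return nil
}

func main() {
	err := mustNotPanic(func() { panic("boom") })
	fmt.Println(err) // recovered: boom
}
```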
9 changes: 6 additions & 3 deletions compress/decompress_test.go
@@ -31,10 +31,11 @@ import (

func prepareLoremDict(t *testing.T) *Decompressor {
t.Helper()
+logger := log.New()
tmpDir := t.TempDir()
file := filepath.Join(tmpDir, "compressed")
t.Name()
-c, err := NewCompressor(context.Background(), t.Name(), file, tmpDir, 1, 2, log.LvlDebug)
+c, err := NewCompressor(context.Background(), t.Name(), file, tmpDir, 1, 2, log.LvlDebug, logger)
if err != nil {
t.Fatal(err)
}
@@ -100,10 +101,11 @@ func TestDecompressMatchOK(t *testing.T) {

func prepareStupidDict(t *testing.T, size int) *Decompressor {
t.Helper()
+logger := log.New()
tmpDir := t.TempDir()
file := filepath.Join(tmpDir, "compressed2")
t.Name()
-c, err := NewCompressor(context.Background(), t.Name(), file, tmpDir, 1, 2, log.LvlDebug)
+c, err := NewCompressor(context.Background(), t.Name(), file, tmpDir, 1, 2, log.LvlDebug, logger)
if err != nil {
t.Fatal(err)
}
@@ -214,10 +216,11 @@ func TestDecompressMatchPrefix(t *testing.T) {

func prepareLoremDictUncompressed(t *testing.T) *Decompressor {
t.Helper()
+logger := log.New()
tmpDir := t.TempDir()
file := filepath.Join(tmpDir, "compressed")
t.Name()
-c, err := NewCompressor(context.Background(), t.Name(), file, tmpDir, 1, 2, log.LvlDebug)
+c, err := NewCompressor(context.Background(), t.Name(), file, tmpDir, 1, 2, log.LvlDebug, logger)
if err != nil {
t.Fatal(err)
}