Skip to content

Commit

Permalink
etl: do sort and file flush in another goroutine (ledgerwatch#1052)
Browse files Browse the repository at this point in the history
- if provider is in-memory: do sort+flush in same goroutine
- if provider is file-based: do sort+flush in another goroutine, and
Load method will wait for unfinished goroutines and return an error if one
happened inside a goroutine. Also, in this case, do a prealloc of the new buffer
with `prevBufSize/8` size — because the previous buffer can't be re-used in this
case.

Reason: E4 has 8 etl collectors running at the same time (for
domains/history/inverted_indices) and `sort.Stable` is something of a
bottleneck.
  • Loading branch information
AskAlexSharov authored and blxdyx committed Sep 13, 2023
1 parent 207659a commit 00a18fc
Show file tree
Hide file tree
Showing 5 changed files with 169 additions and 62 deletions.
27 changes: 21 additions & 6 deletions etl/buffers.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ type Buffer interface {
Get(i int, keyBuf, valBuf []byte) ([]byte, []byte)
Len() int
Reset()
SizeLimit() int
Prealloc(predictKeysAmount, predictDataAmount int)
Write(io.Writer) error
Sort()
CheckFlushSize() bool
Expand Down Expand Up @@ -147,11 +149,18 @@ func (b *sortableBuffer) Get(i int, keyBuf, valBuf []byte) ([]byte, []byte) {
return keyBuf, valBuf
}

// Prealloc replaces the buffer's internal storage with freshly allocated
// slices sized by the given capacity hints. It is used instead of Reset when
// the previous storage cannot be reused — e.g. after the old buffer was handed
// off to a background flush goroutine (see Collector.flushBuffer).
// predictKeysAmount sizes the per-entry offsets/lens slices;
// predictDataSize sizes the raw key/value byte storage.
func (b *sortableBuffer) Prealloc(predictKeysAmount, predictDataSize int) {
b.lens = make([]int, 0, predictKeysAmount)
b.offsets = make([]int, 0, predictKeysAmount)
b.data = make([]byte, 0, predictDataSize)
}

func (b *sortableBuffer) Reset() {
b.offsets = b.offsets[:0]
b.lens = b.lens[:0]
b.data = b.data[:0]
}
func (b *sortableBuffer) SizeLimit() int { return b.optimalSize }
func (b *sortableBuffer) Sort() {
if sort.IsSorted(b) {
return
Expand Down Expand Up @@ -206,9 +215,8 @@ func (b *appendSortableBuffer) Put(k, v []byte) {
b.entries[string(k)] = stored
}

func (b *appendSortableBuffer) Size() int {
return b.size
}
// Size returns the number of bytes currently held in the buffer.
func (b *appendSortableBuffer) Size() int { return b.size }
// SizeLimit returns the configured flush threshold (optimal size) in bytes.
func (b *appendSortableBuffer) SizeLimit() int { return b.optimalSize }

func (b *appendSortableBuffer) Len() int {
return len(b.entries)
Expand Down Expand Up @@ -238,6 +246,10 @@ func (b *appendSortableBuffer) Reset() {
b.entries = make(map[string][]byte)
b.size = 0
}
// Prealloc replaces the buffer's internal storage with freshly allocated
// containers sized by the given capacity hint. Used instead of Reset when the
// previous storage cannot be reused (it may still be owned by a background
// flush goroutine). predictDataSize is unused here: map values are allocated
// per entry, so only the key-count hint applies.
func (b *appendSortableBuffer) Prealloc(predictKeysAmount, predictDataSize int) {
b.entries = make(map[string][]byte, predictKeysAmount)
b.sortedBuf = make([]sortableBufferEntry, 0, predictKeysAmount*2)
}

func (b *appendSortableBuffer) Write(w io.Writer) error {
var numBuf [binary.MaxVarintLen64]byte
Expand Down Expand Up @@ -299,9 +311,8 @@ func (b *oldestEntrySortableBuffer) Put(k, v []byte) {
b.entries[string(k)] = common.Copy(v)
}

func (b *oldestEntrySortableBuffer) Size() int {
return b.size
}
// Size returns the number of bytes currently held in the buffer.
func (b *oldestEntrySortableBuffer) Size() int { return b.size }
// SizeLimit returns the configured flush threshold (optimal size) in bytes.
func (b *oldestEntrySortableBuffer) SizeLimit() int { return b.optimalSize }

func (b *oldestEntrySortableBuffer) Len() int {
return len(b.entries)
Expand Down Expand Up @@ -332,6 +343,10 @@ func (b *oldestEntrySortableBuffer) Reset() {
b.entries = make(map[string][]byte)
b.size = 0
}
// Prealloc replaces the buffer's internal storage with freshly allocated
// containers sized by the given capacity hint. Used instead of Reset when the
// previous storage cannot be reused (it may still be owned by a background
// flush goroutine). predictDataSize is unused here: map values are allocated
// per entry, so only the key-count hint applies.
func (b *oldestEntrySortableBuffer) Prealloc(predictKeysAmount, predictDataSize int) {
b.entries = make(map[string][]byte, predictKeysAmount)
b.sortedBuf = make([]sortableBufferEntry, 0, predictKeysAmount*2)
}

func (b *oldestEntrySortableBuffer) Write(w io.Writer) error {
var numBuf [binary.MaxVarintLen64]byte
Expand Down
39 changes: 31 additions & 8 deletions etl/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package etl

import (
"bytes"
"container/heap"
"encoding/hex"
"errors"
"fmt"
Expand All @@ -27,6 +26,7 @@ import (
"path/filepath"
"time"

"github.com/c2h5oh/datasize"
"github.com/ledgerwatch/log/v3"

"github.com/ledgerwatch/erigon-lib/common"
Expand Down Expand Up @@ -108,25 +108,43 @@ func (c *Collector) flushBuffer(canStoreInRam bool) error {
if c.buf.Len() == 0 {
return nil
}

var provider dataProvider
c.buf.Sort()
if canStoreInRam && len(c.dataProviders) == 0 {
c.buf.Sort()
provider = KeepInRAM(c.buf)
c.allFlushed = true
} else {
fullBuf := c.buf
prevLen, prevSize := fullBuf.Len(), fullBuf.SizeLimit()
c.buf = getBufferByType(c.bufType, datasize.ByteSize(c.buf.SizeLimit()))

doFsync := !c.autoClean /* is critical collector */
var err error
provider, err = FlushToDisk(c.logPrefix, c.buf, c.tmpdir, doFsync, c.logLvl)
provider, err = FlushToDisk(c.logPrefix, fullBuf, c.tmpdir, doFsync, c.logLvl)
if err != nil {
return err
}
c.buf.Prealloc(prevLen/8, prevSize/8)
}
if provider != nil {
c.dataProviders = append(c.dataProviders, provider)
}
return nil
}

// Flush is an optional method (usually the user doesn't need to call it) that
// forces sort+flush of the current buffer.
// It triggers the background sort and flush, reducing RAM held by the collector.
// It is useful when working with many collectors: call it on each one to start
// their background sorts concurrently.
func (c *Collector) Flush() error {
if !c.allFlushed {
if e := c.flushBuffer(false); e != nil {
return e
}
}
return nil
}

func (c *Collector) Load(db kv.RwTx, toBucket string, loadFunc LoadFunc, args TransformArgs) error {
if c.autoClean {
defer c.Close()
Expand Down Expand Up @@ -261,12 +279,17 @@ func (c *Collector) Close() {
// The subsequent iterations pop the heap again and load up the provider associated with it to get the next element after processing LoadFunc.
// this continues until all providers have reached their EOF.
func mergeSortFiles(logPrefix string, providers []dataProvider, loadFunc simpleLoadFunc, args TransformArgs) error {
for _, provider := range providers {
if err := provider.Wait(); err != nil {
return err
}
}

h := &Heap{}
heap.Init(h)
heapInit(h)
for i, provider := range providers {
if key, value, err := provider.Next(nil, nil); err == nil {
he := HeapElem{key, value, i}
heap.Push(h, he)
heapPush(h, &HeapElem{key, value, i})
} else /* we must have at least one entry per file */ {
eee := fmt.Errorf("%s: error reading first readers: n=%d current=%d provider=%s err=%w",
logPrefix, len(providers), i, provider, err)
Expand All @@ -280,14 +303,14 @@ func mergeSortFiles(logPrefix string, providers []dataProvider, loadFunc simpleL
return err
}

element := (heap.Pop(h)).(HeapElem)
element := heapPop(h)
provider := providers[element.TimeIdx]
err := loadFunc(element.Key, element.Value)
if err != nil {
return err
}
if element.Key, element.Value, err = provider.Next(element.Key[:0], element.Value[:0]); err == nil {
heap.Push(h, element)
heapPush(h, element)
} else if !errors.Is(err, io.EOF) {
return fmt.Errorf("%s: error while reading next element from disk: %w", logPrefix, err)
}
Expand Down
74 changes: 40 additions & 34 deletions etl/dataprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,20 @@ import (
"os"

"github.com/ledgerwatch/log/v3"
"golang.org/x/sync/errgroup"
)

// dataProvider is a sorted stream of key/value entries, backed either by RAM
// (memoryDataProvider) or by a temporary file that may still be written by a
// background goroutine (fileDataProvider); call Wait before reading the latter.
type dataProvider interface {
Next(keyBuf, valBuf []byte) ([]byte, []byte, error)
Dispose() // Safe for repeated call, doesn't return error - means defer-friendly
Wait() error // join point for async providers
}

// fileDataProvider streams sorted entries from a temporary file. The file is
// produced asynchronously: wg completes when the background sort+flush
// goroutine has finished (or failed), so readers must Wait before Next.
type fileDataProvider struct {
file *os.File
reader io.Reader
byteReader io.ByteReader // Different interface to the same object as reader
wg *errgroup.Group
}

// FlushToDisk - `doFsync` is true only for 'critical' collectors (which must not lose data).
Expand All @@ -43,35 +46,39 @@ func FlushToDisk(logPrefix string, b Buffer, tmpdir string, doFsync bool, lvl lo
return nil, nil
}

// if we are going to create files in the system temp dir, we don't need any
// subfolders.
if tmpdir != "" {
if err := os.MkdirAll(tmpdir, 0755); err != nil {
return nil, err
provider := &fileDataProvider{reader: nil, wg: &errgroup.Group{}}
provider.wg.Go(func() error {
b.Sort()

// if we are going to create files in the system temp dir, we don't need any
// subfolders.
if tmpdir != "" {
if err := os.MkdirAll(tmpdir, 0755); err != nil {
return err
}
}
}

bufferFile, err := os.CreateTemp(tmpdir, "erigon-sortable-buf-")
if err != nil {
return nil, err
}
if doFsync {
defer bufferFile.Sync() //nolint:errcheck
}
bufferFile, err := os.CreateTemp(tmpdir, "erigon-sortable-buf-")
if err != nil {
return err
}
provider.file = bufferFile

w := bufio.NewWriterSize(bufferFile, BufIOSize)
defer w.Flush() //nolint:errcheck
if doFsync {
defer bufferFile.Sync() //nolint:errcheck
}

defer func() {
b.Reset() // run it after buf.flush and file.sync
log.Log(lvl, fmt.Sprintf("[%s] Flushed buffer file", logPrefix), "name", bufferFile.Name())
}()
w := bufio.NewWriterSize(bufferFile, BufIOSize)
defer w.Flush() //nolint:errcheck

if err = b.Write(w); err != nil {
return nil, fmt.Errorf("error writing entries to disk: %w", err)
}
if err = b.Write(w); err != nil {
return fmt.Errorf("error writing entries to disk: %w", err)
}
log.Log(lvl, fmt.Sprintf("[%s] Flushed buffer file", logPrefix), "name", bufferFile.Name())
return nil
})

return &fileDataProvider{file: bufferFile, reader: nil}, nil
return provider, nil
}

func (p *fileDataProvider) Next(keyBuf, valBuf []byte) ([]byte, []byte, error) {
Expand All @@ -88,14 +95,14 @@ func (p *fileDataProvider) Next(keyBuf, valBuf []byte) ([]byte, []byte, error) {
return readElementFromDisk(p.reader, p.byteReader, keyBuf, valBuf)
}

func (p *fileDataProvider) Dispose() uint64 {
info, _ := os.Stat(p.file.Name())
_ = p.file.Close()
_ = os.Remove(p.file.Name())
if info == nil {
return 0
func (p *fileDataProvider) Wait() error { return p.wg.Wait() }
func (p *fileDataProvider) Dispose() {
if p.file != nil { //invariant: safe to call multiple times
p.Wait()
_ = p.file.Close()
_ = os.Remove(p.file.Name())
p.file = nil
}
return uint64(info.Size())
}

func (p *fileDataProvider) String() string {
Expand Down Expand Up @@ -161,9 +168,8 @@ func (p *memoryDataProvider) Next(keyBuf, valBuf []byte) ([]byte, []byte, error)
return key, value, nil
}

func (p *memoryDataProvider) Dispose() uint64 {
return 0 /* doesn't take space on disk */
}
// Wait is a no-op: an in-memory provider is filled synchronously.
func (p *memoryDataProvider) Wait() error { return nil }
// Dispose is a no-op: an in-memory provider takes no space on disk.
func (p *memoryDataProvider) Dispose() {}

func (p *memoryDataProvider) String() string {
return fmt.Sprintf("%T(buffer.Len: %d)", p, p.buffer.Len())
Expand Down
2 changes: 2 additions & 0 deletions etl/etl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,8 @@ func TestFileDataProviders(t *testing.T) {
for _, p := range collector.dataProviders {
fp, ok := p.(*fileDataProvider)
assert.True(t, ok)
err := fp.Wait()
require.NoError(t, err)
_, err = os.Stat(fp.file.Name())
assert.NoError(t, err)
}
Expand Down
Loading

0 comments on commit 00a18fc

Please sign in to comment.