diff --git a/etl/buffers.go b/etl/buffers.go
index b73ecb5f4..5d0c2e4e7 100644
--- a/etl/buffers.go
+++ b/etl/buffers.go
@@ -48,6 +48,8 @@ type Buffer interface {
 	Get(i int, keyBuf, valBuf []byte) ([]byte, []byte)
 	Len() int
 	Reset()
+	SizeLimit() int
+	Prealloc(predictKeysAmount, predictDataAmount int)
 	Write(io.Writer) error
 	Sort()
 	CheckFlushSize() bool
@@ -147,11 +149,18 @@ func (b *sortableBuffer) Get(i int, keyBuf, valBuf []byte) ([]byte, []byte) {
 	return keyBuf, valBuf
 }
 
+func (b *sortableBuffer) Prealloc(predictKeysAmount, predictDataSize int) {
+	b.lens = make([]int, 0, predictKeysAmount)
+	b.offsets = make([]int, 0, predictKeysAmount)
+	b.data = make([]byte, 0, predictDataSize)
+}
+
 func (b *sortableBuffer) Reset() {
 	b.offsets = b.offsets[:0]
 	b.lens = b.lens[:0]
 	b.data = b.data[:0]
 }
+func (b *sortableBuffer) SizeLimit() int { return b.optimalSize }
 func (b *sortableBuffer) Sort() {
 	if sort.IsSorted(b) {
 		return
@@ -206,9 +215,8 @@ func (b *appendSortableBuffer) Put(k, v []byte) {
 	b.entries[string(k)] = stored
 }
 
-func (b *appendSortableBuffer) Size() int {
-	return b.size
-}
+func (b *appendSortableBuffer) Size() int      { return b.size }
+func (b *appendSortableBuffer) SizeLimit() int { return b.optimalSize }
 
 func (b *appendSortableBuffer) Len() int {
 	return len(b.entries)
@@ -238,6 +246,10 @@ func (b *appendSortableBuffer) Reset() {
 	b.entries = make(map[string][]byte)
 	b.size = 0
 }
+func (b *appendSortableBuffer) Prealloc(predictKeysAmount, predictDataSize int) {
+	b.entries = make(map[string][]byte, predictKeysAmount)
+	b.sortedBuf = make([]sortableBufferEntry, 0, predictKeysAmount*2)
+}
 
 func (b *appendSortableBuffer) Write(w io.Writer) error {
 	var numBuf [binary.MaxVarintLen64]byte
@@ -299,9 +311,8 @@ func (b *oldestEntrySortableBuffer) Put(k, v []byte) {
 	b.entries[string(k)] = common.Copy(v)
 }
 
-func (b *oldestEntrySortableBuffer) Size() int {
-	return b.size
-}
+func (b *oldestEntrySortableBuffer) Size() int      { return b.size }
+func (b *oldestEntrySortableBuffer) SizeLimit() int { return b.optimalSize }
 
 func (b *oldestEntrySortableBuffer) Len() int {
 	return len(b.entries)
@@ -332,6 +343,10 @@ func (b *oldestEntrySortableBuffer) Reset() {
 	b.entries = make(map[string][]byte)
 	b.size = 0
 }
+func (b *oldestEntrySortableBuffer) Prealloc(predictKeysAmount, predictDataSize int) {
+	b.entries = make(map[string][]byte, predictKeysAmount)
+	b.sortedBuf = make([]sortableBufferEntry, 0, predictKeysAmount*2)
+}
 
 func (b *oldestEntrySortableBuffer) Write(w io.Writer) error {
 	var numBuf [binary.MaxVarintLen64]byte
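
Note on the Buffer changes above: SizeLimit() exposes the flush threshold a buffer was constructed with, so the collector can build a replacement buffer of the same size, and Prealloc() sizes the backing storage up front so a recycled buffer doesn't have to grow from zero again. A minimal usage sketch, assuming the package's existing exported constructor etl.NewSortableBuffer and the etl.BufferOptimalSize default; the key/data amounts are illustrative:

package main

import (
	"fmt"

	"github.com/ledgerwatch/erigon-lib/etl"
)

func main() {
	// NewSortableBuffer/BufferOptimalSize are the package's existing API;
	// SizeLimit and Prealloc are the two methods this diff adds to Buffer.
	buf := etl.NewSortableBuffer(etl.BufferOptimalSize)

	// Pre-size for an expected batch: ~1k keys, ~64KB of payload. For a
	// recycled buffer the collector passes prevLen/8 and prevSize/8.
	buf.Prealloc(1024, 64*1024)

	buf.Put([]byte("key"), []byte("value"))
	fmt.Println("entries:", buf.Len(), "flush threshold:", buf.SizeLimit())
}
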
diff --git a/etl/collector.go b/etl/collector.go
index e33a05f7d..d72ddecd0 100644
--- a/etl/collector.go
+++ b/etl/collector.go
@@ -18,7 +18,6 @@ package etl
 
 import (
 	"bytes"
-	"container/heap"
 	"encoding/hex"
 	"errors"
 	"fmt"
@@ -27,6 +26,7 @@ import (
 	"path/filepath"
 	"time"
 
+	"github.com/c2h5oh/datasize"
 	"github.com/ledgerwatch/log/v3"
 
 	"github.com/ledgerwatch/erigon-lib/common"
@@ -108,18 +108,24 @@ func (c *Collector) flushBuffer(canStoreInRam bool) error {
 	if c.buf.Len() == 0 {
 		return nil
 	}
+
 	var provider dataProvider
-	c.buf.Sort()
 	if canStoreInRam && len(c.dataProviders) == 0 {
+		c.buf.Sort()
 		provider = KeepInRAM(c.buf)
 		c.allFlushed = true
 	} else {
+		fullBuf := c.buf
+		prevLen, prevSize := fullBuf.Len(), fullBuf.SizeLimit()
+		c.buf = getBufferByType(c.bufType, datasize.ByteSize(c.buf.SizeLimit()))
+
 		doFsync := !c.autoClean /* is critical collector */
 		var err error
-		provider, err = FlushToDisk(c.logPrefix, c.buf, c.tmpdir, doFsync, c.logLvl)
+		provider, err = FlushToDisk(c.logPrefix, fullBuf, c.tmpdir, doFsync, c.logLvl)
 		if err != nil {
 			return err
 		}
+		c.buf.Prealloc(prevLen/8, prevSize/8)
 	}
 	if provider != nil {
 		c.dataProviders = append(c.dataProviders, provider)
@@ -127,6 +133,18 @@ func (c *Collector) flushBuffer(canStoreInRam bool) error {
 	return nil
}
 
+// Flush - an optional method (usually users don't need to call it) - forcing sort+flush of the current buffer.
+// It triggers a background sort and flush, reducing the RAM held by the collector.
+// It's useful when working with many collectors: to trigger a background sort for all of them.
+func (c *Collector) Flush() error {
+	if !c.allFlushed {
+		if e := c.flushBuffer(false); e != nil {
+			return e
+		}
+	}
+	return nil
+}
+
 func (c *Collector) Load(db kv.RwTx, toBucket string, loadFunc LoadFunc, args TransformArgs) error {
 	if c.autoClean {
 		defer c.Close()
@@ -261,12 +279,17 @@ func (c *Collector) Close() {
 // The subsequent iterations pop the heap again and load up the provider associated with it to get the next element after processing LoadFunc.
 // this continues until all providers have reached their EOF.
 func mergeSortFiles(logPrefix string, providers []dataProvider, loadFunc simpleLoadFunc, args TransformArgs) error {
+	for _, provider := range providers {
+		if err := provider.Wait(); err != nil {
+			return err
+		}
+	}
+
 	h := &Heap{}
-	heap.Init(h)
+	heapInit(h)
 	for i, provider := range providers {
 		if key, value, err := provider.Next(nil, nil); err == nil {
-			he := HeapElem{key, value, i}
-			heap.Push(h, he)
+			heapPush(h, &HeapElem{key, value, i})
 		} else /* we must have at least one entry per file */ {
 			eee := fmt.Errorf("%s: error reading first readers: n=%d current=%d provider=%s err=%w",
 				logPrefix, len(providers), i, provider, err)
@@ -280,14 +303,14 @@ func mergeSortFiles(logPrefix string, providers []dataProvider, loadFunc simpleL
 			return err
 		}
 
-		element := (heap.Pop(h)).(HeapElem)
+		element := heapPop(h)
 		provider := providers[element.TimeIdx]
 		err := loadFunc(element.Key, element.Value)
 		if err != nil {
 			return err
 		}
 		if element.Key, element.Value, err = provider.Next(element.Key[:0], element.Value[:0]); err == nil {
-			heap.Push(h, element)
+			heapPush(h, element)
 		} else if !errors.Is(err, io.EOF) {
 			return fmt.Errorf("%s: error while reading next element from disk: %w", logPrefix, err)
 		}
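
The collector changes above decouple filling from flushing: flushBuffer hands the full buffer to FlushToDisk, immediately swaps in a fresh buffer pre-sized to 1/8 of the previous batch, and keeps collecting while the old buffer sorts and writes in the background; mergeSortFiles then Wait()s on all providers before merging. Flush() exposes that as an API, which pays off when several collectors can overlap their sorts. A minimal sketch of that pattern, assuming the package's existing NewCollector/Collect API; names, paths, and keys are illustrative:

package main

import (
	"github.com/ledgerwatch/erigon-lib/etl"
)

func main() {
	tmpdir := "/tmp/etl-demo" // illustrative path
	accounts := etl.NewCollector("accounts", tmpdir, etl.NewSortableBuffer(etl.BufferOptimalSize))
	defer accounts.Close()
	storage := etl.NewCollector("storage", tmpdir, etl.NewSortableBuffer(etl.BufferOptimalSize))
	defer storage.Close()

	_ = accounts.Collect([]byte("acc1"), []byte("v1"))
	_ = storage.Collect([]byte("slot1"), []byte("v2"))

	// Kick off background sort+flush for both collectors now: the two sorts
	// run concurrently in their errgroup goroutines instead of serially
	// inside each later Load call, which joins via Wait().
	if err := accounts.Flush(); err != nil {
		panic(err)
	}
	if err := storage.Flush(); err != nil {
		panic(err)
	}
}
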
diff --git a/etl/dataprovider.go b/etl/dataprovider.go
index baab747b9..a142f37f8 100644
--- a/etl/dataprovider.go
+++ b/etl/dataprovider.go
@@ -24,17 +24,20 @@ import (
 	"os"
 
 	"github.com/ledgerwatch/log/v3"
+	"golang.org/x/sync/errgroup"
 )
 
 type dataProvider interface {
 	Next(keyBuf, valBuf []byte) ([]byte, []byte, error)
-	Dispose() uint64 // Safe for repeated call, doesn't return error - means defer-friendly
+	Dispose()    // Safe for repeated call, doesn't return error - means defer-friendly
+	Wait() error // join point for async providers
 }
 
 type fileDataProvider struct {
 	file       *os.File
 	reader     io.Reader
 	byteReader io.ByteReader // Different interface to the same object as reader
+	wg         *errgroup.Group
 }
 
 // FlushToDisk - `doFsync` is true only for 'critical' collectors (which should not loose).
@@ -43,35 +46,39 @@ func FlushToDisk(logPrefix string, b Buffer, tmpdir string, doFsync bool, lvl lo
 		return nil, nil
 	}
 
-	// if we are going to create files in the system temp dir, we don't need any
-	// subfolders.
-	if tmpdir != "" {
-		if err := os.MkdirAll(tmpdir, 0755); err != nil {
-			return nil, err
+	provider := &fileDataProvider{reader: nil, wg: &errgroup.Group{}}
+	provider.wg.Go(func() error {
+		b.Sort()
+
+		// if we are going to create files in the system temp dir, we don't need any
+		// subfolders.
+		if tmpdir != "" {
+			if err := os.MkdirAll(tmpdir, 0755); err != nil {
+				return err
+			}
 		}
-	}
 
-	bufferFile, err := os.CreateTemp(tmpdir, "erigon-sortable-buf-")
-	if err != nil {
-		return nil, err
-	}
-	if doFsync {
-		defer bufferFile.Sync() //nolint:errcheck
-	}
+		bufferFile, err := os.CreateTemp(tmpdir, "erigon-sortable-buf-")
+		if err != nil {
+			return err
+		}
+		provider.file = bufferFile
 
-	w := bufio.NewWriterSize(bufferFile, BufIOSize)
-	defer w.Flush() //nolint:errcheck
+		if doFsync {
+			defer bufferFile.Sync() //nolint:errcheck
+		}
 
-	defer func() {
-		b.Reset() // run it after buf.flush and file.sync
-		log.Log(lvl, fmt.Sprintf("[%s] Flushed buffer file", logPrefix), "name", bufferFile.Name())
-	}()
+		w := bufio.NewWriterSize(bufferFile, BufIOSize)
+		defer w.Flush() //nolint:errcheck
 
-	if err = b.Write(w); err != nil {
-		return nil, fmt.Errorf("error writing entries to disk: %w", err)
-	}
+		if err = b.Write(w); err != nil {
+			return fmt.Errorf("error writing entries to disk: %w", err)
+		}
+		log.Log(lvl, fmt.Sprintf("[%s] Flushed buffer file", logPrefix), "name", bufferFile.Name())
+		return nil
+	})
 
-	return &fileDataProvider{file: bufferFile, reader: nil}, nil
+	return provider, nil
 }
 
 func (p *fileDataProvider) Next(keyBuf, valBuf []byte) ([]byte, []byte, error) {
@@ -88,14 +95,14 @@ func (p *fileDataProvider) Next(keyBuf, valBuf []byte) ([]byte, []byte, error) {
 	return readElementFromDisk(p.reader, p.byteReader, keyBuf, valBuf)
 }
 
-func (p *fileDataProvider) Dispose() uint64 {
-	info, _ := os.Stat(p.file.Name())
-	_ = p.file.Close()
-	_ = os.Remove(p.file.Name())
-	if info == nil {
-		return 0
+func (p *fileDataProvider) Wait() error { return p.wg.Wait() }
+func (p *fileDataProvider) Dispose() {
+	if p.file != nil { //invariant: safe to call multiple times
+		p.Wait()
+		_ = p.file.Close()
+		_ = os.Remove(p.file.Name())
+		p.file = nil
 	}
-	return uint64(info.Size())
 }
 
 func (p *fileDataProvider) String() string {
@@ -161,9 +168,8 @@ func (p *memoryDataProvider) Next(keyBuf, valBuf []byte) ([]byte, []byte, error)
 	return key, value, nil
 }
 
-func (p *memoryDataProvider) Dispose() uint64 {
-	return 0 /* doesn't take space on disk */
-}
+func (p *memoryDataProvider) Wait() error { return nil }
+func (p *memoryDataProvider) Dispose()    {}
 
 func (p *memoryDataProvider) String() string {
 	return fmt.Sprintf("%T(buffer.Len: %d)", p, p.buffer.Len())
diff --git a/etl/etl_test.go b/etl/etl_test.go
index 78873f311..18ab3dc48 100644
--- a/etl/etl_test.go
+++ b/etl/etl_test.go
@@ -188,6 +188,8 @@ func TestFileDataProviders(t *testing.T) {
 	for _, p := range collector.dataProviders {
 		fp, ok := p.(*fileDataProvider)
 		assert.True(t, ok)
+		err := fp.Wait()
+		require.NoError(t, err)
 		_, err = os.Stat(fp.file.Name())
 		assert.NoError(t, err)
 	}
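
The dataprovider change above is the heart of the async flush: the constructor returns a fileDataProvider immediately, an errgroup goroutine does the sort and write, Wait() is the join point (called by mergeSortFiles and the test before touching the file), and Dispose() stays idempotent. A self-contained sketch of the same pattern reduced to its essentials; the asyncFile type and all names here are hypothetical, not part of the package:

package main

import (
	"fmt"
	"os"

	"golang.org/x/sync/errgroup"
)

// asyncFile mirrors fileDataProvider: construction returns immediately
// while an errgroup goroutine does the slow work in the background.
type asyncFile struct {
	file *os.File
	wg   *errgroup.Group
}

func newAsyncFile(payload []byte) *asyncFile {
	p := &asyncFile{wg: &errgroup.Group{}}
	p.wg.Go(func() error {
		f, err := os.CreateTemp("", "demo-") // hypothetical temp-file prefix
		if err != nil {
			return err
		}
		p.file = f // published by the goroutine; readers must Wait() first
		_, err = f.Write(payload)
		return err
	})
	return p
}

// Wait is the join point: any error from the background work surfaces here.
func (p *asyncFile) Wait() error { return p.wg.Wait() }

// Dispose mirrors the diff's invariant: safe to call multiple times,
// and it joins the background work before releasing the file.
func (p *asyncFile) Dispose() {
	if p.file != nil {
		_ = p.Wait()
		_ = p.file.Close()
		_ = os.Remove(p.file.Name())
		p.file = nil
	}
}

func main() {
	p := newAsyncFile([]byte("hello"))
	if err := p.Wait(); err != nil { // join before touching p.file
		fmt.Println("background flush failed:", err)
	}
	p.Dispose()
	p.Dispose() // second call is a no-op
}
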
diff --git a/etl/heap.go b/etl/heap.go
index 804412101..03eea253c 100644
--- a/etl/heap.go
+++ b/etl/heap.go
@@ -27,35 +27,96 @@ type HeapElem struct {
 }
 
 type Heap struct {
-	elems []HeapElem
+	elems []*HeapElem
 }
 
-func (h Heap) Len() int {
+func (h *Heap) Len() int {
 	return len(h.elems)
 }
 
-func (h Heap) Less(i, j int) bool {
+func (h *Heap) Less(i, j int) bool {
 	if c := bytes.Compare(h.elems[i].Key, h.elems[j].Key); c != 0 {
 		return c < 0
 	}
 	return h.elems[i].TimeIdx < h.elems[j].TimeIdx
 }
 
-func (h Heap) Swap(i, j int) {
+func (h *Heap) Swap(i, j int) {
 	h.elems[i], h.elems[j] = h.elems[j], h.elems[i]
 }
 
-func (h *Heap) Push(x interface{}) {
-	// Push and Pop use pointer receivers because they modify the slice's length,
-	// not just its contents.
-	h.elems = append(h.elems, x.(HeapElem))
+func (h *Heap) Push(x *HeapElem) {
+	h.elems = append(h.elems, x)
 }
 
-func (h *Heap) Pop() interface{} {
+func (h *Heap) Pop() *HeapElem {
 	old := h.elems
-	n := len(old)
-	x := old[n-1]
-	old[n-1] = HeapElem{}
-	h.elems = old[0 : n-1]
+	n := len(old) - 1
+	x := old[n]
+	//old[n].Key, old[n].Value, old[n].TimeIdx = nil, nil, 0
+	old[n] = nil
+	h.elems = old[0:n]
 	return x
 }
+
+// ------ Copy-Paste of `container/heap/heap.go` without interface conversion
+
+// Init establishes the heap invariants required by the other routines in this package.
+// Init is idempotent with respect to the heap invariants
+// and may be called whenever the heap invariants may have been invalidated.
+// The complexity is O(n) where n = h.Len().
+func heapInit(h *Heap) {
+	// heapify
+	n := h.Len()
+	for i := n/2 - 1; i >= 0; i-- {
+		down(h, i, n)
+	}
+}
+
+// Push pushes the element x onto the heap.
+// The complexity is O(log n) where n = h.Len().
+func heapPush(h *Heap, x *HeapElem) {
+	h.Push(x)
+	up(h, h.Len()-1)
+}
+
+// Pop removes and returns the minimum element (according to Less) from the heap.
+// The complexity is O(log n) where n = h.Len().
+// Pop is equivalent to Remove(h, 0).
+func heapPop(h *Heap) *HeapElem {
+	n := h.Len() - 1
+	h.Swap(0, n)
+	down(h, 0, n)
+	return h.Pop()
+}
+
+func up(h *Heap, j int) {
+	for {
+		i := (j - 1) / 2 // parent
+		if i == j || !h.Less(j, i) {
+			break
+		}
+		h.Swap(i, j)
+		j = i
+	}
+}
+
+func down(h *Heap, i0, n int) bool {
+	i := i0
+	for {
+		j1 := 2*i + 1
+		if j1 >= n || j1 < 0 { // j1 < 0 after int overflow
+			break
+		}
+		j := j1 // left child
+		if j2 := j1 + 1; j2 < n && h.Less(j2, j1) {
+			j = j2 // = 2*i + 2  // right child
+		}
+		if !h.Less(j, i) {
+			break
+		}
+		h.Swap(i, j)
+		i = j
+	}
+	return i > i0
+}
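
The copied heap routines above exist to avoid container/heap's interface{}-based API: with a concrete *HeapElem element type, Push/Pop no longer box each element into an interface value (one allocation per operation in the merge loop), and the Less/Swap calls can be devirtualized and inlined. A self-contained sketch of the same monomorphization trick on a trivial int heap (illustrative only, not part of the package):

package main

import "fmt"

// intHeap uses the same trick as heap.go: a concrete element type plus
// monomorphized push/pop helpers instead of container/heap's interface{}
// ones, so there is no per-operation boxing allocation.
type intHeap struct{ elems []int }

func (h *intHeap) Len() int           { return len(h.elems) }
func (h *intHeap) Less(i, j int) bool { return h.elems[i] < h.elems[j] }
func (h *intHeap) Swap(i, j int)      { h.elems[i], h.elems[j] = h.elems[j], h.elems[i] }

func push(h *intHeap, x int) {
	h.elems = append(h.elems, x)
	for j := h.Len() - 1; j > 0; { // sift up
		i := (j - 1) / 2 // parent
		if !h.Less(j, i) {
			break
		}
		h.Swap(i, j)
		j = i
	}
}

func pop(h *intHeap) int {
	n := h.Len() - 1
	h.Swap(0, n)
	for i := 0; ; { // sift the new root down over [0, n)
		j := 2*i + 1 // left child
		if j >= n {
			break
		}
		if j2 := j + 1; j2 < n && h.Less(j2, j) {
			j = j2 // right child is smaller
		}
		if !h.Less(j, i) {
			break
		}
		h.Swap(i, j)
		i = j
	}
	x := h.elems[n]
	h.elems = h.elems[:n]
	return x
}

func main() {
	h := &intHeap{}
	for _, v := range []int{5, 2, 9, 1} {
		push(h, v)
	}
	for h.Len() > 0 {
		fmt.Print(pop(h), " ") // prints: 1 2 5 9
	}
}
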