feat(wal): Benchmark and improve WAL writes using Reset. (#13272)
cyriltovena authored Jun 24, 2024
1 parent 05176e4 commit debb5f2
Showing 5 changed files with 206 additions and 63 deletions.
5 changes: 5 additions & 0 deletions pkg/storage/wal/index/buffer.go
@@ -86,6 +86,11 @@ func (fw *BufferWriter) Close() error {
 	return nil
 }
 
+func (fw *BufferWriter) Reset() {
+	fw.pos = 0
+	fw.buf.Reset()
+}
+
 func (fw *BufferWriter) Remove() error {
 	return nil
 }
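Reset rewinds the position cursor and truncates the backing buffer without freeing it, which is what makes the writer reusable across segments. A minimal, self-contained sketch of the same pattern, assuming a bytes.Buffer-backed writer with a pos cursor as in this diff (everything else is illustrative):

```go
package main

import (
	"bytes"
	"fmt"
)

// BufferWriter is a minimal analogue of the one in this diff: a
// bytes.Buffer plus a position cursor. Field names mirror the patch;
// the rest is an illustrative sketch, not Loki's implementation.
type BufferWriter struct {
	buf bytes.Buffer
	pos int
}

func (fw *BufferWriter) Write(p []byte) (int, error) {
	n, err := fw.buf.Write(p)
	fw.pos += n
	return n, err
}

// Reset rewinds the cursor and truncates the buffer while keeping the
// underlying allocation, so the next write cycle reuses the same memory.
func (fw *BufferWriter) Reset() {
	fw.pos = 0
	fw.buf.Reset()
}

func main() {
	fw := &BufferWriter{}
	for i := 0; i < 3; i++ {
		fw.Write([]byte("segment data"))
		fmt.Println(fw.pos, fw.buf.Cap()) // capacity is retained across Resets
		fw.Reset()
	}
}
```

bytes.Buffer.Reset keeps the underlying array, so subsequent write cycles reuse the earlier allocation instead of growing a fresh buffer each time.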
50 changes: 31 additions & 19 deletions pkg/storage/wal/index/index.go
@@ -118,8 +118,6 @@ type PostingsEncoder func(*encoding.Encbuf, []uint32) error
 // Writer implements the IndexWriter interface for the standard
 // serialization format.
 type Writer struct {
-	ctx context.Context
-
 	// For the main index file.
 	f *BufferWriter
 
@@ -197,9 +195,8 @@ func NewTOCFromByteSlice(bs ByteSlice) (*TOC, error) {
 
 // NewWriter returns a new Writer to the given filename. It serializes data in format version 2.
 // It uses the given encoder to encode each postings list.
-func NewWriterWithEncoder(ctx context.Context, encoder PostingsEncoder) (*Writer, error) {
+func NewWriterWithEncoder(encoder PostingsEncoder) (*Writer, error) {
 	iw := &Writer{
-		ctx: ctx,
 		f:   NewBufferWriter(),
 		fP:  NewBufferWriter(),
 		fPO: NewBufferWriter(),
@@ -222,8 +219,8 @@ func NewWriterWithEncoder(ctx context.Context, encoder PostingsEncoder) (*Writer
 
 // NewWriter creates a new index writer using the default encoder. See
 // NewWriterWithEncoder.
-func NewWriter(ctx context.Context) (*Writer, error) {
-	return NewWriterWithEncoder(ctx, EncodePostingsRaw)
+func NewWriter() (*Writer, error) {
+	return NewWriterWithEncoder(EncodePostingsRaw)
 }
 
 func (w *Writer) write(bufs ...[]byte) error {
@@ -242,15 +239,36 @@ func (w *Writer) Buffer() ([]byte, io.Closer, error) {
 	return w.f.Buffer()
 }
 
+func (w *Writer) Reset() error {
+	w.f.Reset()
+	w.fP.Reset()
+	w.fPO.Reset()
+	w.buf1.Reset()
+	w.buf2.Reset()
+	w.stage = idxStageNone
+	w.toc = TOC{}
+	w.postingsStart = 0
+	w.numSymbols = 0
+	w.symbols = nil
+	w.symbolFile = nil
+	w.lastSymbol = ""
+	w.symbolCache = make(map[string]symbolCacheEntry, 1<<8)
+	w.labelIndexes = w.labelIndexes[:0]
+	w.labelNames = make(map[string]uint64, 1<<8)
+	w.lastSeries = nil
+	w.lastSeriesRef = 0
+	w.lastChunkRef = 0
+	w.cntPO = 0
+	w.crc32.Reset()
+	if err := w.writeMeta(); err != nil {
+		return err
+	}
+	return nil
+}
+
 // ensureStage handles transitions between write stages and ensures that IndexWriter
 // methods are called in an order valid for the implementation.
 func (w *Writer) ensureStage(s indexWriterStage) error {
-	select {
-	case <-w.ctx.Done():
-		return w.ctx.Err()
-	default:
-	}
-
 	if w.stage == s {
 		return nil
 	}
@@ -691,7 +709,6 @@ func (w *Writer) writePostingsOffsetTable() error {
 	if err := w.fPO.Remove(); err != nil {
 		return err
 	}
-	w.fPO = nil
 
 	err = w.writeLengthAndHash(startPos)
 	if err != nil {
@@ -854,11 +871,7 @@ func (w *Writer) writePostingsToTmpFiles() error {
 				}
 			}
 		}
-		select {
-		case <-w.ctx.Done():
-			return w.ctx.Err()
-		default:
-		}
+
 	}
 	return nil
 }
@@ -936,7 +949,6 @@ func (w *Writer) writePostings() error {
 	if err := w.fP.Remove(); err != nil {
 		return err
 	}
-	w.fP = nil
 	return nil
 }

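Dropping the stored context and rewinding every field in Reset turns index.Writer into a long-lived, recyclable object: allocate once, rewind between segments instead of rebuilding. A sketch of the intended call pattern, assuming only the index.Writer methods visible in this diff (NewWriter, Close, Buffer, Reset); addSeriesTo and flush are hypothetical stand-ins:

```go
// writeAll encodes a series of batches with one reusable index writer.
// The index.Writer methods are those shown in this diff; the batch and
// flush callbacks are assumed for illustration.
func writeAll(batches []func(w *index.Writer) error, flush func([]byte) error) error {
	w, err := index.NewWriter() // buffers and caches allocated once
	if err != nil {
		return err
	}
	for _, addSeriesTo := range batches {
		if err := addSeriesTo(w); err != nil { // AddSymbol/AddSeries calls
			return err
		}
		if err := w.Close(); err != nil {
			return err
		}
		buf, closer, err := w.Buffer()
		if err != nil {
			return err
		}
		if err := flush(buf); err != nil { // hypothetical: persist the encoded index
			return err
		}
		if err := closer.Close(); err != nil {
			return err
		}
		// Reset rewinds every buffer and field, then re-writes the header
		// via writeMeta, leaving the writer in idxStageNone for the next batch.
		if err := w.Reset(); err != nil {
			return err
		}
	}
	return nil
}
```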
20 changes: 10 additions & 10 deletions pkg/storage/wal/index/index_test.go
@@ -135,7 +135,7 @@ func (m mockIndex) Series(ref storage.SeriesRef, builder *labels.ScratchBuilder,
 
 func TestIndexRW_Create_Open(t *testing.T) {
 	// An empty index must still result in a readable file.
-	iw, err := NewWriter(context.Background())
+	iw, err := NewWriter()
 	require.NoError(t, err)
 	require.NoError(t, iw.Close())
 
@@ -160,7 +160,7 @@ func TestIndexRW_Postings(t *testing.T) {
 			labels: labels.FromStrings("a", "1", "b", strconv.Itoa(i)),
 		})
 	}
-	ir, buf, _ := createReader(ctx, t, input)
+	ir, buf, _ := createReader(t, input)
 
 	p, err := ir.Postings(ctx, "a", "1")
 	require.NoError(t, err)
@@ -271,7 +271,7 @@ func TestPostingsMany(t *testing.T) {
 			labels: labels.FromStrings("i", v, "foo", "bar"),
 		})
 	}
-	ir, _, symbols := createReader(ctx, t, input)
+	ir, _, symbols := createReader(t, input)
 
 	cases := []struct {
 		in []string
@@ -353,7 +353,7 @@ func TestPersistence_index_e2e(t *testing.T) {
 		})
 	}
 
-	ir, _, _ := createReader(ctx, t, input)
+	ir, _, _ := createReader(t, input)
 
 	// Population procedure as done by compaction.
 	var (
@@ -435,7 +435,7 @@ func TestPersistence_index_e2e(t *testing.T) {
 }
 
 func TestWriter_ShouldReturnErrorOnSeriesWithDuplicatedLabelNames(t *testing.T) {
-	w, err := NewWriter(context.Background())
+	w, err := NewWriter()
 	require.NoError(t, err)
 
 	require.NoError(t, w.AddSymbol("__name__"))
@@ -523,7 +523,7 @@ func BenchmarkReader_ShardedPostings(b *testing.B) {
 			labels: labels.FromStrings("const", fmt.Sprintf("%10d", 1), "unique", fmt.Sprintf("%10d", i)),
 		})
 	}
-	ir, _, _ := createReader(ctx, b, input)
+	ir, _, _ := createReader(b, input)
 	b.ResetTimer()
 
 	for n := 0; n < b.N; n++ {
@@ -540,7 +540,7 @@ func TestDecoder_Postings_WrongInput(t *testing.T) {
 }
 
 func TestChunksRefOrdering(t *testing.T) {
-	idx, err := NewWriter(context.Background())
+	idx, err := NewWriter()
 	require.NoError(t, err)
 
 	require.NoError(t, idx.AddSymbol("1"))
@@ -558,7 +558,7 @@ func TestChunksRefOrdering(t *testing.T) {
 }
 
 func TestChunksTimeOrdering(t *testing.T) {
-	idx, err := NewWriter(context.Background())
+	idx, err := NewWriter()
 	require.NoError(t, err)
 
 	require.NoError(t, idx.AddSymbol("1"))
@@ -585,10 +585,10 @@ func TestChunksTimeOrdering(t *testing.T) {
 
 // createFileReader creates a temporary index file. It writes the provided input to this file.
 // It returns a Reader for this file, the file's name, and the symbol map.
-func createReader(ctx context.Context, tb testing.TB, input indexWriterSeriesSlice) (*Reader, []byte, map[string]struct{}) {
+func createReader(tb testing.TB, input indexWriterSeriesSlice) (*Reader, []byte, map[string]struct{}) {
 	tb.Helper()
 
-	iw, err := NewWriter(ctx)
+	iw, err := NewWriter()
 	require.NoError(tb, err)
 
 	symbols := map[string]struct{}{}
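The commit title mentions benchmarking; a micro-benchmark along these lines (hypothetical, not part of this commit's test file) would surface the allocation savings of reusing one writer via Reset instead of constructing a new one per iteration:

```go
// BenchmarkWriter_Reset is a hypothetical benchmark: it reuses a single
// Writer across iterations. AddSymbol and Close are the existing methods
// used by the tests above; Reset is the method added in this commit.
func BenchmarkWriter_Reset(b *testing.B) {
	w, err := NewWriter()
	require.NoError(b, err)
	b.ReportAllocs()
	b.ResetTimer()
	for n := 0; n < b.N; n++ {
		require.NoError(b, w.AddSymbol("foo"))
		require.NoError(b, w.Close())
		// Reset rewinds the buffers in place, so steady-state iterations
		// should report near-zero allocations.
		require.NoError(b, w.Reset())
	}
}
```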
79 changes: 49 additions & 30 deletions pkg/storage/wal/segment.go
@@ -8,8 +8,7 @@ import (
 	"fmt"
 	"io"
 	"sort"
-
-	"github.com/dolthub/swiss"
+	"sync"
 
 	"github.com/prometheus/prometheus/model/labels"
 	"github.com/prometheus/prometheus/storage"
@@ -24,8 +23,15 @@ import (
 
 // LOKW is the magic number for the Loki WAL format.
 var (
-	magicNumber = uint32(0x4C4F4B57)
-	magicBuf    [4]byte
+	magicNumber       = uint32(0x4C4F4B57)
+	magicBuf          [4]byte
+	streamSegmentPool = sync.Pool{
+		New: func() interface{} {
+			return &streamSegment{
+				entries: make([]*logproto.Entry, 0, 4096),
+			}
+		},
+	}
 )
 
 func init() {
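streamSegmentPool seeds each segment with a 4096-capacity entries slice, so steady-state ingestion stops allocating per stream. A minimal, self-contained sketch of the pattern (only the 4096 capacity comes from the diff; the types are illustrative):

```go
package main

import (
	"fmt"
	"sync"
)

type entry struct{ line string }

type segment struct{ entries []*entry }

// segmentPool mirrors streamSegmentPool above: New seeds each segment
// with a large entries slice whose capacity survives pool round-trips.
var segmentPool = sync.Pool{
	New: func() interface{} {
		return &segment{entries: make([]*entry, 0, 4096)}
	},
}

func main() {
	s := segmentPool.Get().(*segment)
	s.entries = append(s.entries, &entry{line: "hello"})
	s.entries = s.entries[:0] // truncate, keep capacity
	segmentPool.Put(s)
	// Likely 4096: the earlier allocation is reused (sync.Pool makes no
	// hard guarantee; under GC pressure Get may fall back to New).
	fmt.Println(cap(segmentPool.Get().(*segment).entries))
}
```

Because a pooled segment may arrive holding stale contents, Append below calls s.Reset() immediately after Get.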
@@ -37,9 +43,10 @@ type streamID struct {
 }
 
 type SegmentWriter struct {
-	streams   *swiss.Map[streamID, *streamSegment]
+	streams   map[streamID]*streamSegment
 	buf1      encoding.Encbuf
 	inputSize int64
+	idxWriter *index.Writer
 }
 
 type streamSegment struct {
@@ -49,12 +56,21 @@ type streamSegment struct {
 	maxt     int64
 }
 
+func (s *streamSegment) Reset() {
+	s.entries = s.entries[:0]
+}
+
 // NewWalSegmentWriter creates a new WalSegmentWriter.
-func NewWalSegmentWriter() *SegmentWriter {
-	return &SegmentWriter{
-		streams: swiss.NewMap[streamID, *streamSegment](64),
-		buf1:    encoding.EncWith(make([]byte, 0, 4)),
-	}
+func NewWalSegmentWriter() (*SegmentWriter, error) {
+	idxWriter, err := index.NewWriter()
+	if err != nil {
+		return nil, err
+	}
+	return &SegmentWriter{
+		streams:   make(map[streamID]*streamSegment, 64),
+		buf1:      encoding.EncWith(make([]byte, 0, 4)),
+		idxWriter: idxWriter,
+	}, nil
 }
 
 // Labels are passed a string `{foo="bar",baz="qux"}` `{foo="foo",baz="foo"}`. labels.Labels => Symbols foo, baz , qux
@@ -66,22 +82,18 @@ func (b *SegmentWriter) Append(tenantID, labelsString string, lbls labels.Labels
 		b.inputSize += int64(len(e.Line))
 	}
 	id := streamID{labels: labelsString, tenant: tenantID}
-	s, ok := b.streams.Get(id)
+	s, ok := b.streams[id]
 	if !ok {
 		if lbls.Get(tsdb.TenantLabel) == "" {
 			lbls = labels.NewBuilder(lbls).Set(tsdb.TenantLabel, tenantID).Labels()
 		}
-		s = &streamSegment{
-			// todo: should be pooled.
-			// prometheus bucketed pool
-			// https://pkg.go.dev/github.com/prometheus/prometheus/util/pool
-			entries:  make([]*logproto.Entry, 0, 64),
-			lbls:     lbls,
-			tenantID: tenantID,
-		}
+		s = streamSegmentPool.Get().(*streamSegment)
+		s.Reset()
+		s.lbls = lbls
+		s.tenantID = tenantID
 		s.maxt = entries[len(entries)-1].Timestamp.UnixNano()
 		s.entries = append(s.entries, entries...)
-		b.streams.Put(id, s)
+		b.streams[id] = s
 		return
 	}
 
@@ -105,22 +117,25 @@ func (b *SegmentWriter) WriteTo(w io.Writer) (int64, error) {
 func (b *SegmentWriter) WriteTo(w io.Writer) (int64, error) {
 	var (
 		total   int64
-		streams = make([]*streamSegment, 0, b.streams.Count())
+		streams = make([]*streamSegment, 0, len(b.streams))
 	)
 
 	// Collect all streams and sort them by tenantID and labels.
-	b.streams.Iter(func(k streamID, v *streamSegment) bool {
-		streams = append(streams, v)
-		return false
-	})
+	for _, s := range b.streams {
+		if len(s.entries) == 0 {
+			continue
+		}
+		streams = append(streams, s)
+	}
 
 	sort.Slice(streams, func(i, j int) bool {
 		if streams[i].tenantID != streams[j].tenantID {
 			return streams[i].tenantID < streams[j].tenantID
 		}
 		return labels.Compare(streams[i].lbls, streams[j].lbls) < 0
 	})
 
-	idxw, err := index.NewWriter(context.TODO())
+	err := b.idxWriter.Reset()
 	if err != nil {
 		return total, err
 	}
@@ -143,7 +158,7 @@ func (b *SegmentWriter) WriteTo(w io.Writer) (int64, error) {
 
 	// Add symbols
 	for _, symbol := range symbols {
-		if err := idxw.AddSymbol(symbol); err != nil {
+		if err := b.idxWriter.AddSymbol(symbol); err != nil {
 			return total, err
 		}
 	}
@@ -163,7 +178,7 @@ func (b *SegmentWriter) WriteTo(w io.Writer) (int64, error) {
 	if err != nil {
 		return total, err
 	}
-	err = idxw.AddSeries(storage.SeriesRef(i), s.lbls, chunks.Meta{
+	err = b.idxWriter.AddSeries(storage.SeriesRef(i), s.lbls, chunks.Meta{
 		MinTime: s.entries[0].Timestamp.UnixNano(),
 		MaxTime: s.entries[len(s.entries)-1].Timestamp.UnixNano(),
 		Ref:     chunks.NewChunkRef(uint64(total), uint64(n)),
@@ -175,11 +190,11 @@ func (b *SegmentWriter) WriteTo(w io.Writer) (int64, error) {
 
 	}
 
-	if err := idxw.Close(); err != nil {
+	if err := b.idxWriter.Close(); err != nil {
 		return total, err
 	}
 
-	buf, closer, err := idxw.Buffer()
+	buf, closer, err := b.idxWriter.Buffer()
 	if err != nil {
 		return total, err
 	}
@@ -226,7 +241,11 @@ func (s *streamSegment) WriteTo(w io.Writer) (n int64, err error) {
 // Reset clears the writer.
 // After calling Reset, the writer can be reused.
 func (b *SegmentWriter) Reset() {
-	b.streams.Clear()
+	for _, s := range b.streams {
+		s := s
+		streamSegmentPool.Put(s)
+	}
+	b.streams = make(map[streamID]*streamSegment, 64)
 	b.buf1.Reset()
 	b.inputSize = 0
 }
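Taken together: NewWalSegmentWriter now returns an error because it builds its reusable index.Writer up front, and Reset recycles both the stream segments (back into streamSegmentPool) and, on the next WriteTo, the index buffers. A hypothetical flush loop showing the cycle, using only the SegmentWriter methods in this diff; appendBatch is an assumed carrier type:

```go
// appendBatch is a hypothetical carrier for one Append call's arguments.
type appendBatch struct {
	tenantID, labelsString string
	lbls                   labels.Labels
	entries                []*logproto.Entry
}

// flushLoop drains batches, writes one segment per batch, and reuses the
// writer: Reset returns streamSegments to streamSegmentPool, and the next
// WriteTo rewinds the embedded index writer via idxWriter.Reset().
func flushLoop(w *SegmentWriter, out io.Writer, batches <-chan appendBatch) error {
	for b := range batches {
		w.Append(b.tenantID, b.labelsString, b.lbls, b.entries)
		if _, err := w.WriteTo(out); err != nil {
			return err
		}
		w.Reset() // pooled segments and retained buffer capacity carry over
	}
	return nil
}
```

The design trades a small amount of bookkeeping (every field of both writers must be rewound correctly) for removing per-segment allocations from the hot write path, which is what the commit title's benchmark targets.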