From 1261e445edec559d6cf1fda2422ba1ccb41e022d Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Fri, 13 Nov 2020 16:06:45 +0100 Subject: [PATCH 1/2] Only append tailed entries if needed. This avoid to doing allocations on slices when there is no tailers. We could potentially pooled the slice too but the tailing code is using the stream in an async fashion so we can't put it back. Signed-off-by: Cyril Tovena --- pkg/chunkenc/dumb_chunk.go | 5 ++ pkg/ingester/instance_test.go | 2 +- pkg/ingester/stream.go | 17 +++++-- pkg/ingester/stream_test.go | 92 +++++++++++++++++++++++++++++++++++ 4 files changed, 112 insertions(+), 4 deletions(-) diff --git a/pkg/chunkenc/dumb_chunk.go b/pkg/chunkenc/dumb_chunk.go index f8977925ffeb..089105c321a8 100644 --- a/pkg/chunkenc/dumb_chunk.go +++ b/pkg/chunkenc/dumb_chunk.go @@ -19,6 +19,11 @@ func NewDumbChunk() Chunk { return &dumbChunk{} } +// NewDumbChunk returns a new chunk that isn't very good. +func NewDumbChunkWithArray(entries []logproto.Entry) Chunk { + return &dumbChunk{entries: entries} +} + type dumbChunk struct { entries []logproto.Entry } diff --git a/pkg/ingester/instance_test.go b/pkg/ingester/instance_test.go index 73290c5e6c34..f11e688ff6b1 100644 --- a/pkg/ingester/instance_test.go +++ b/pkg/ingester/instance_test.go @@ -240,7 +240,7 @@ func Test_SeriesQuery(t *testing.T) { } func entries(n int, t time.Time) []logproto.Entry { - var result []logproto.Entry + result := make([]logproto.Entry, 0, n) for i := 0; i < n; i++ { result = append(result, logproto.Entry{Timestamp: t, Line: fmt.Sprintf("hello %d", i)}) t = t.Add(time.Nanosecond) diff --git a/pkg/ingester/stream.go b/pkg/ingester/stream.go index 30e2f07ccd12..dde44a050bc2 100644 --- a/pkg/ingester/stream.go +++ b/pkg/ingester/stream.go @@ -122,7 +122,15 @@ func (s *stream) Push(ctx context.Context, entries []logproto.Entry, synchronize _, lastChunkTimestamp = s.chunks[len(s.chunks)-1].chunk.Bounds() } - storedEntries := []logproto.Entry{} + s.tailerMtx.RLock() + hasTailers := len(s.tailers) != 0 + s.tailerMtx.RUnlock() + + var storedEntries []logproto.Entry + if hasTailers { + storedEntries = make([]logproto.Entry, 0, len(entries)) + } + failedEntriesWithError := []entryWithError{} // Don't fail on the first append error - if samples are sent out of order, @@ -165,9 +173,12 @@ func (s *stream) Push(ctx context.Context, entries []logproto.Entry, synchronize failedEntriesWithError = append(failedEntriesWithError, entryWithError{&entries[i], err}) } else { // send only stored entries to tailers - storedEntries = append(storedEntries, entries[i]) + if hasTailers { + storedEntries = append(storedEntries, entries[i]) + } lastChunkTimestamp = entries[i].Timestamp - s.lastLine = line{ts: lastChunkTimestamp, content: entries[i].Line} + s.lastLine.ts = lastChunkTimestamp + s.lastLine.content = entries[i].Line } chunk.lastUpdated = time.Now() } diff --git a/pkg/ingester/stream_test.go b/pkg/ingester/stream_test.go index c9402035eb2a..efa59d3359f5 100644 --- a/pkg/ingester/stream_test.go +++ b/pkg/ingester/stream_test.go @@ -15,6 +15,7 @@ import ( "github.com/weaveworks/common/httpgrpc" "github.com/grafana/loki/pkg/chunkenc" + "github.com/grafana/loki/pkg/iter" "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/logql/log" ) @@ -140,3 +141,94 @@ func TestStreamIterator(t *testing.T) { } } + +func Benchmark_PushStream(b *testing.B) { + ls := labels.Labels{ + labels.Label{Name: "namespace", Value: "loki-dev"}, + labels.Label{Name: "cluster", Value: "dev-us-central1"}, + labels.Label{Name: "job", Value: "loki-dev/ingester"}, + labels.Label{Name: "container", Value: "ingester"}, + } + s := newStream(&Config{}, model.Fingerprint(0), ls, func() chunkenc.Chunk { + return &noopChunk{} + }) + t, err := newTailer("foo", `{namespace="loki-dev"}`, &fakeTailServer{}) + require.NoError(b, err) + + go t.loop() + defer t.close() + + s.tailers[1] = t + ctx := context.Background() + e := entries(100, time.Now()) + b.ResetTimer() + b.ReportAllocs() + + for n := 0; n < b.N; n++ { + require.NoError(b, s.Push(ctx, e, 0, 0)) + } +} + +type noopChunk struct { +} + +func (c *noopChunk) Bounds() (time.Time, time.Time) { + return time.Time{}, time.Time{} +} + +func (c *noopChunk) SpaceFor(_ *logproto.Entry) bool { + return true +} + +func (c *noopChunk) Append(entry *logproto.Entry) error { + return nil +} + +func (c *noopChunk) Size() int { + return 0 +} + +// UncompressedSize implements Chunk. +func (c *noopChunk) UncompressedSize() int { + return c.Size() +} + +// CompressedSize implements Chunk. +func (c *noopChunk) CompressedSize() int { + return 0 +} + +// Utilization implements Chunk +func (c *noopChunk) Utilization() float64 { + return 0 +} + +func (c *noopChunk) Encoding() chunkenc.Encoding { return chunkenc.EncNone } + +func (c *noopChunk) Iterator(_ context.Context, from, through time.Time, direction logproto.Direction, _ log.StreamPipeline) (iter.EntryIterator, error) { + return nil, nil +} + +func (c *noopChunk) SampleIterator(_ context.Context, from, through time.Time, _ log.StreamSampleExtractor) iter.SampleIterator { + return nil +} + +func (c *noopChunk) Bytes() ([]byte, error) { + return nil, nil +} + +func (c *noopChunk) BytesWith(_ []byte) ([]byte, error) { + return nil, nil +} + +func (c *noopChunk) Blocks(_ time.Time, _ time.Time) []chunkenc.Block { + return nil +} + +func (c *noopChunk) BlockCount() int { + return 0 +} + +func (c *noopChunk) Close() error { + return nil +} From bc4406391b381700603eba95dc92c334149fa586 Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Fri, 13 Nov 2020 16:09:15 +0100 Subject: [PATCH 2/2] cleanup Signed-off-by: Cyril Tovena --- pkg/chunkenc/dumb_chunk.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/pkg/chunkenc/dumb_chunk.go b/pkg/chunkenc/dumb_chunk.go index 089105c321a8..f8977925ffeb 100644 --- a/pkg/chunkenc/dumb_chunk.go +++ b/pkg/chunkenc/dumb_chunk.go @@ -19,11 +19,6 @@ func NewDumbChunk() Chunk { return &dumbChunk{} } -// NewDumbChunk returns a new chunk that isn't very good. -func NewDumbChunkWithArray(entries []logproto.Entry) Chunk { - return &dumbChunk{entries: entries} -} - type dumbChunk struct { entries []logproto.Entry }