diff --git a/docs/operations/observability.md b/docs/operations/observability.md index fcbe10f7346c..20548a76ac1d 100644 --- a/docs/operations/observability.md +++ b/docs/operations/observability.md @@ -24,22 +24,26 @@ The Loki Distributors expose the following metrics: The Loki Ingesters expose the following metrics: -| Metric Name | Metric Type | Description | -| ----------------------------------------- | ----------- | ------------------------------------------------------------------------------------------- | -| `cortex_ingester_flush_queue_length` | Gauge | The total number of series pending in the flush queue. | -| `loki_ingester_chunk_age_seconds` | Histogram | Distribution of chunk ages when flushed. | -| `loki_ingester_chunk_encode_time_seconds` | Histogram | Distribution of chunk encode times. | -| `loki_ingester_chunk_entries` | Histogram | Distribution of entires per-chunk when flushed. | -| `loki_ingester_chunk_size_bytes` | Histogram | Distribution of chunk sizes when flushed. | -| `loki_ingester_chunk_stored_bytes_total` | Counter | Total bytes stored in chunks per tenant. | -| `loki_ingester_chunks_created_total` | Counter | The total number of chunks created in the ingester. | -| `loki_ingester_chunks_flushed_total` | Counter | The total number of chunks flushed by the ingester. | -| `loki_ingester_chunks_stored_total` | Counter | Total stored chunks per tenant. | -| `loki_ingester_received_chunks` | Counter | The total number of chunks sent by this ingester whilst joining during the handoff process. | -| `loki_ingester_samples_per_chunk` | Histogram | The number of samples in a chunk. | -| `loki_ingester_sent_chunks` | Counter | The total number of chunks sent by this ingester whilst leaving during the handoff process. | -| `loki_ingester_streams_created_total` | Counter | The total number of streams created per tenant. | -| `loki_ingester_streams_removed_total` | Counter | The total number of streams removed per tenant. | +| Metric Name | Metric Type | Description | +| -------------------------------------------- | ----------- | ------------------------------------------------------------------------------------------- | +| `cortex_ingester_flush_queue_length` | Gauge | The total number of series pending in the flush queue. | +| `cortex_chunk_store_index_entries_per_chunk` | Histogram | Number of index entries written to storage per chunk. | +| `loki_ingester_memory_chunks` | Gauge | The total number of chunks in memory. | +| `loki_ingester_memory_streams` | Gauge | The total number of streams in memory. | +| `loki_ingester_chunk_age_seconds` | Histogram | Distribution of chunk ages when flushed. | +| `loki_ingester_chunk_encode_time_seconds` | Histogram | Distribution of chunk encode times. | +| `loki_ingester_chunk_entries` | Histogram | Distribution of lines per-chunk when flushed. | +| `loki_ingester_chunk_size_bytes` | Histogram | Distribution of chunk sizes when flushed. | +| `loki_ingester_chunk_utilization` | Histogram | Distribution of chunk utilization (filled uncompressed bytes vs maximum uncompressed bytes) when flushed. | +| `loki_ingester_chunk_compression_ratio` | Histogram | Distribution of chunk compression ratio when flushed. | +| `loki_ingester_chunk_stored_bytes_total` | Counter | Total bytes stored in chunks per tenant. | +| `loki_ingester_chunks_created_total` | Counter | The total number of chunks created in the ingester. | +| `loki_ingester_chunks_stored_total` | Counter | Total stored chunks per tenant. | +| `loki_ingester_received_chunks` | Counter | The total number of chunks sent by this ingester whilst joining during the handoff process. | +| `loki_ingester_samples_per_chunk` | Histogram | The number of samples in a chunk. | +| `loki_ingester_sent_chunks` | Counter | The total number of chunks sent by this ingester whilst leaving during the handoff process. | +| `loki_ingester_streams_created_total` | Counter | The total number of streams created per tenant. | +| `loki_ingester_streams_removed_total` | Counter | The total number of streams removed per tenant. | Promtail exposes these metrics: diff --git a/pkg/chunkenc/dumb_chunk.go b/pkg/chunkenc/dumb_chunk.go index 89a3b4128fee..edb154b050b8 100644 --- a/pkg/chunkenc/dumb_chunk.go +++ b/pkg/chunkenc/dumb_chunk.go @@ -50,6 +50,16 @@ func (c *dumbChunk) Size() int { return len(c.entries) } +// UncompressedSize implements Chunk. +func (c *dumbChunk) UncompressedSize() int { + return c.Size() +} + +// Utilization implements Chunk +func (c *dumbChunk) Utilization() float64 { + return float64(len(c.entries)) / float64(tmpNumEntries) +} + // Returns an iterator that goes from _most_ recent to _least_ recent (ie, // backwards). func (c *dumbChunk) Iterator(from, through time.Time, direction logproto.Direction, _ logql.Filter) (iter.EntryIterator, error) { diff --git a/pkg/chunkenc/facade.go b/pkg/chunkenc/facade.go index ef3801d1dc22..8556b0fdd044 100644 --- a/pkg/chunkenc/facade.go +++ b/pkg/chunkenc/facade.go @@ -52,7 +52,23 @@ func (Facade) Encoding() encoding.Encoding { return GzipLogChunk } +// Utilization implements encoding.Chunk. +func (f Facade) Utilization() float64 { + return f.c.Utilization() +} + // LokiChunk returns the chunkenc.Chunk. func (f Facade) LokiChunk() Chunk { return f.c } + +// UncompressedSize is a helper function to hide the type assertion kludge when wanting the uncompressed size of the Cortex interface encoding.Chunk. +func UncompressedSize(c encoding.Chunk) (int, bool) { + f, ok := c.(*Facade) + + if !ok { + return 0, false + } + + return f.c.UncompressedSize(), true +} diff --git a/pkg/chunkenc/gzip.go b/pkg/chunkenc/gzip.go index 3d42dd5b5189..075373b40c9f 100644 --- a/pkg/chunkenc/gzip.go +++ b/pkg/chunkenc/gzip.go @@ -61,7 +61,8 @@ type block struct { mint, maxt int64 - offset int // The offset of the block in the chunk. + offset int // The offset of the block in the chunk. + uncompressedSize int // Total uncompressed size in bytes when the chunk is cut. } // This block holds the un-compressed entries. Once it has enough data, this is @@ -313,6 +314,28 @@ func (c *MemChunk) SpaceFor(*logproto.Entry) bool { return len(c.blocks) < blocksPerChunk } +// UncompressedSize implements Chunk. +func (c *MemChunk) UncompressedSize() int { + size := 0 + + if !c.head.isEmpty() { + size += c.head.size + } + + for _, b := range c.blocks { + size += b.uncompressedSize + } + + return size +} + +// Utilization implements Chunk. It is the bytes used as a percentage of the +func (c *MemChunk) Utilization() float64 { + size := c.UncompressedSize() + + return float64(size) / float64(blocksPerChunk*c.blockSize) +} + // Append implements Chunk. func (c *MemChunk) Append(entry *logproto.Entry) error { entryTimestamp := entry.Timestamp.UnixNano() @@ -352,10 +375,11 @@ func (c *MemChunk) cut() error { } c.blocks = append(c.blocks, block{ - b: b, - numEntries: len(c.head.entries), - mint: c.head.mint, - maxt: c.head.maxt, + b: b, + numEntries: len(c.head.entries), + mint: c.head.mint, + maxt: c.head.maxt, + uncompressedSize: c.head.size, }) c.head.entries = c.head.entries[:0] diff --git a/pkg/chunkenc/interface.go b/pkg/chunkenc/interface.go index 08ee8cf3034c..1f8d405e8b4b 100644 --- a/pkg/chunkenc/interface.go +++ b/pkg/chunkenc/interface.go @@ -50,6 +50,8 @@ type Chunk interface { Iterator(from, through time.Time, direction logproto.Direction, filter logql.Filter) (iter.EntryIterator, error) Size() int Bytes() ([]byte, error) + Utilization() float64 + UncompressedSize() int } // CompressionWriter is the writer that compresses the data passed to it. diff --git a/pkg/ingester/flush.go b/pkg/ingester/flush.go index d45c11bea637..29f45a9a2813 100644 --- a/pkg/ingester/flush.go +++ b/pkg/ingester/flush.go @@ -22,9 +22,18 @@ import ( ) var ( + chunkUtilization = promauto.NewHistogram(prometheus.HistogramOpts{ + Name: "loki_ingester_chunk_utilization", + Help: "Distribution of stored chunk utilization (when stored).", + Buckets: prometheus.LinearBuckets(0, 0.2, 6), + }) + memoryChunks = promauto.NewGauge(prometheus.GaugeOpts{ + Name: "loki_ingester_memory_chunks", + Help: "The total number of chunks in memory.", + }) chunkEntries = promauto.NewHistogram(prometheus.HistogramOpts{ Name: "loki_ingester_chunk_entries", - Help: "Distribution of stored chunk entries (when stored).", + Help: "Distribution of stored lines per chunk (when stored).", Buckets: prometheus.ExponentialBuckets(200, 2, 9), // biggest bucket is 200*2^(9-1) = 51200 }) chunkSize = promauto.NewHistogram(prometheus.HistogramOpts{ @@ -32,6 +41,11 @@ var ( Help: "Distribution of stored chunk sizes (when stored).", Buckets: prometheus.ExponentialBuckets(10000, 2, 7), // biggest bucket is 10000*2^(7-1) = 640000 (~640KB) }) + chunkCompressionRatio = promauto.NewHistogram(prometheus.HistogramOpts{ + Name: "loki_ingester_chunk_compression_ratio", + Help: "Compression ratio of chunks (when stored).", + Buckets: prometheus.LinearBuckets(1, 1.5, 6), + }) chunksPerTenant = promauto.NewCounterVec(prometheus.CounterOpts{ Name: "loki_ingester_chunks_stored_total", Help: "Total stored chunks per tenant.", @@ -234,6 +248,7 @@ func (i *Ingester) shouldFlushChunk(chunk *chunkDesc) bool { func (i *Ingester) removeFlushedChunks(instance *instance, stream *stream) { now := time.Now() + prevNumChunks := len(stream.chunks) for len(stream.chunks) > 0 { if stream.chunks[0].flushed.IsZero() || now.Sub(stream.chunks[0].flushed) < i.cfg.RetainPeriod { break @@ -242,11 +257,13 @@ func (i *Ingester) removeFlushedChunks(instance *instance, stream *stream) { stream.chunks[0].chunk = nil // erase reference so the chunk can be garbage-collected stream.chunks = stream.chunks[1:] } + memoryChunks.Sub(float64(prevNumChunks - len(stream.chunks))) if len(stream.chunks) == 0 { delete(instance.streams, stream.fp) instance.index.Delete(client.FromLabelAdaptersToLabels(stream.labels), stream.fp) instance.streamsRemovedTotal.Inc() + memoryStreams.Dec() } } @@ -292,9 +309,17 @@ func (i *Ingester) flushChunks(ctx context.Context, fp model.Fingerprint, labelP continue } + compressedSize := float64(len(byt)) + uncompressedSize, ok := chunkenc.UncompressedSize(wc.Data) + + if ok { + chunkCompressionRatio.Observe(float64(uncompressedSize) / compressedSize) + } + + chunkUtilization.Observe(wc.Data.Utilization()) chunkEntries.Observe(float64(numEntries)) - chunkSize.Observe(float64(len(byt))) - sizePerTenant.Add(float64(len(byt))) + chunkSize.Observe(compressedSize) + sizePerTenant.Add(compressedSize) countPerTenant.Inc() firstTime, _ := cs[i].chunk.Bounds() chunkAge.Observe(time.Since(firstTime).Seconds()) diff --git a/pkg/ingester/instance.go b/pkg/ingester/instance.go index 18c386bc379f..a10fe6e9a4e2 100644 --- a/pkg/ingester/instance.go +++ b/pkg/ingester/instance.go @@ -32,6 +32,10 @@ var ( ) var ( + memoryStreams = promauto.NewGauge(prometheus.GaugeOpts{ + Name: "loki_ingester_memory_streams", + Help: "The total number of streams in memory.", + }) streamsCreatedTotal = promauto.NewCounterVec(prometheus.CounterOpts{ Namespace: "loki", Name: "ingester_streams_created_total", @@ -89,10 +93,16 @@ func (i *instance) consumeChunk(ctx context.Context, labels []client.LabelAdapte i.index.Add(labels, fp) i.streams[fp] = stream i.streamsCreatedTotal.Inc() + memoryStreams.Inc() i.addTailersToNewStream(stream) } - return stream.consumeChunk(ctx, chunk) + err := stream.consumeChunk(ctx, chunk) + if err == nil { + memoryChunks.Inc() + } + + return err } func (i *instance) Push(ctx context.Context, req *logproto.PushRequest) error { @@ -113,10 +123,13 @@ func (i *instance) Push(ctx context.Context, req *logproto.PushRequest) error { continue } + prevNumChunks := len(stream.chunks) if err := stream.Push(ctx, s.Entries); err != nil { appendErr = err continue } + + memoryChunks.Add(float64(len(stream.chunks) - prevNumChunks)) } return appendErr @@ -136,6 +149,7 @@ func (i *instance) getOrCreateStream(labels []client.LabelAdapter) (*stream, err stream = newStream(fp, labels, i.blockSize) i.index.Add(labels, fp) i.streams[fp] = stream + memoryStreams.Inc() i.streamsCreatedTotal.Inc() i.addTailersToNewStream(stream) diff --git a/pkg/ingester/stream.go b/pkg/ingester/stream.go index 9e8f857f3cc3..095f5137cbd9 100644 --- a/pkg/ingester/stream.go +++ b/pkg/ingester/stream.go @@ -25,11 +25,6 @@ var ( Name: "ingester_chunks_created_total", Help: "The total number of chunks created in the ingester.", }) - chunksFlushedTotal = prometheus.NewCounter(prometheus.CounterOpts{ - Namespace: "loki", - Name: "ingester_chunks_flushed_total", - Help: "The total number of chunks flushed by the ingester.", - }) samplesPerChunk = prometheus.NewHistogram(prometheus.HistogramOpts{ Namespace: "loki", Subsystem: "ingester", @@ -42,7 +37,6 @@ var ( func init() { prometheus.MustRegister(chunksCreatedTotal) - prometheus.MustRegister(chunksFlushedTotal) prometheus.MustRegister(samplesPerChunk) } diff --git a/production/loki-mixin/dashboards.libsonnet b/production/loki-mixin/dashboards.libsonnet index dce926923763..4129bdd25f47 100644 --- a/production/loki-mixin/dashboards.libsonnet +++ b/production/loki-mixin/dashboards.libsonnet @@ -92,7 +92,7 @@ local utils = import "mixin-utils/utils.libsonnet"; ) .addPanel( g.panel('Chunks per series') + - g.queryPanel('sum(loki_ingester_memory_chunks{cluster="$cluster", job="$namespace/ingester"}) / sum(loki_ingester_memory_series{job="$namespace/ingester"})', 'chunks'), + g.queryPanel('sum(loki_ingester_memory_chunks{cluster="$cluster", job="$namespace/ingester"}) / sum(loki_ingester_memory_streams{job="$namespace/ingester"})', 'chunks'), ) ) .addRow( @@ -111,19 +111,19 @@ local utils = import "mixin-utils/utils.libsonnet"; g.row('Flush Stats') .addPanel( g.panel('Size') + - g.latencyPanel('loki_ingester_chunk_length', '{cluster="$cluster", job="$namespace/ingester"}', multiplier='1') + + g.latencyPanel('loki_ingester_chunk_entries', '{cluster="$cluster", job="$namespace/ingester"}', multiplier='1') + { yaxes: g.yaxes('short') }, ) .addPanel( g.panel('Entries') + - g.queryPanel('sum(rate(loki_chunk_store_index_entries_per_chunk_sum{cluster="$cluster", job="$namespace/ingester"}[5m])) / sum(rate(loki_chunk_store_index_entries_per_chunk_count{cluster="$cluster", job="$namespace/ingester"}[5m]))', 'entries'), + g.queryPanel('sum(rate(cortex_chunk_store_index_entries_per_chunk_sum{cluster="$cluster", job="$namespace/ingester"}[5m])) / sum(rate(cortex_chunk_store_index_entries_per_chunk_count{cluster="$cluster", job="$namespace/ingester"}[5m]))', 'entries'), ), ) .addRow( g.row('Flush Stats') .addPanel( g.panel('Queue Length') + - g.queryPanel('loki_ingester_flush_queue_length{cluster="$cluster", job="$namespace/ingester"}', '{{instance}}'), + g.queryPanel('cortex_ingester_flush_queue_length{cluster="$cluster", job="$namespace/ingester"}', '{{instance}}'), ) .addPanel( g.panel('Flush Rate') +