Fix Loki Chunks Dashboard #1126

Merged · 13 commits · Oct 8, 2019
Changes from 12 commits
36 changes: 20 additions & 16 deletions docs/operations/observability.md
@@ -24,22 +24,26 @@ The Loki Distributors expose the following metrics:

The Loki Ingesters expose the following metrics:

| Metric Name | Metric Type | Description |
| ----------------------------------------- | ----------- | ------------------------------------------------------------------------------------------- |
| `cortex_ingester_flush_queue_length` | Gauge | The total number of series pending in the flush queue. |
| `loki_ingester_chunk_age_seconds` | Histogram | Distribution of chunk ages when flushed. |
| `loki_ingester_chunk_encode_time_seconds` | Histogram | Distribution of chunk encode times. |
| `loki_ingester_chunk_entries` | Histogram | Distribution of entires per-chunk when flushed. |
| `loki_ingester_chunk_size_bytes` | Histogram | Distribution of chunk sizes when flushed. |
| `loki_ingester_chunk_stored_bytes_total` | Counter | Total bytes stored in chunks per tenant. |
| `loki_ingester_chunks_created_total` | Counter | The total number of chunks created in the ingester. |
| `loki_ingester_chunks_flushed_total` | Counter | The total number of chunks flushed by the ingester. |
| `loki_ingester_chunks_stored_total` | Counter | Total stored chunks per tenant. |
| `loki_ingester_received_chunks` | Counter | The total number of chunks sent by this ingester whilst joining during the handoff process. |
| `loki_ingester_samples_per_chunk` | Histogram | The number of samples in a chunk. |
| `loki_ingester_sent_chunks` | Counter | The total number of chunks sent by this ingester whilst leaving during the handoff process. |
| `loki_ingester_streams_created_total` | Counter | The total number of streams created per tenant. |
| `loki_ingester_streams_removed_total` | Counter | The total number of streams removed per tenant. |
| Metric Name | Metric Type | Description |
| -------------------------------------------- | ----------- | ------------------------------------------------------------------------------------------- |
| `cortex_ingester_flush_queue_length` | Gauge | The total number of series pending in the flush queue. |
| `cortex_chunk_store_index_entries_per_chunk` | Histogram | Number of index entries written to storage per chunk. |
| `loki_ingester_memory_chunks` | Gauge | The total number of chunks in memory. |
| `loki_ingester_memory_streams` | Gauge | The total number of streams in memory. |
| `loki_ingester_chunk_age_seconds` | Histogram | Distribution of chunk ages when flushed. |
| `loki_ingester_chunk_encode_time_seconds` | Histogram | Distribution of chunk encode times. |
| `loki_ingester_chunk_entries` | Histogram | Distribution of lines per-chunk when flushed. |
| `loki_ingester_chunk_size_bytes` | Histogram | Distribution of chunk sizes when flushed. |
| `loki_ingester_chunk_utilization` | Histogram | Distribution of chunk utilization when flushed. |
| `loki_ingester_chunk_compression_ratio` | Histogram | Distribution of chunk compression ratio when flushed. |
| `loki_ingester_chunk_stored_bytes_total` | Counter | Total bytes stored in chunks per tenant. |
| `loki_ingester_chunks_created_total` | Counter | The total number of chunks created in the ingester. |
| `loki_ingester_chunks_stored_total` | Counter | Total stored chunks per tenant. |
| `loki_ingester_received_chunks` | Counter | The total number of chunks sent by this ingester whilst joining during the handoff process. |
| `loki_ingester_samples_per_chunk` | Histogram | The number of samples in a chunk. |
| `loki_ingester_sent_chunks` | Counter | The total number of chunks sent by this ingester whilst leaving during the handoff process. |
| `loki_ingester_streams_created_total` | Counter | The total number of streams created per tenant. |
| `loki_ingester_streams_removed_total` | Counter | The total number of streams removed per tenant. |
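
For orientation, a minimal, self-contained sketch of how a histogram such as `loki_ingester_chunk_utilization` is declared and exposed with the Prometheus Go client. The metric name, help text, and bucket layout mirror the definition added in `pkg/ingester/flush.go` further down in this diff; the `main` scaffolding, the `Observe` call, and the port are purely illustrative.

```go
package main

import (
	"net/http"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

// chunkUtilization mirrors the histogram added in this PR: six linear
// buckets starting at 0 with a width of 0.2, i.e. 0, 0.2, 0.4, 0.6, 0.8, 1.0.
var chunkUtilization = promauto.NewHistogram(prometheus.HistogramOpts{
	Name:    "loki_ingester_chunk_utilization",
	Help:    "Distribution of stored chunk utilization (when stored).",
	Buckets: prometheus.LinearBuckets(0, 0.2, 6),
})

func main() {
	// Record a flushed chunk that used 45% of its capacity (made-up value).
	chunkUtilization.Observe(0.45)

	// Expose the default registry for scraping; the port is illustrative.
	http.Handle("/metrics", promhttp.Handler())
	_ = http.ListenAndServe(":3100", nil)
}
```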

Promtail exposes these metrics:

10 changes: 10 additions & 0 deletions pkg/chunkenc/dumb_chunk.go
@@ -50,6 +50,16 @@ func (c *dumbChunk) Size() int {
return len(c.entries)
}

// UncompressedSize implements Chunk.
func (c *dumbChunk) UncompressedSize() int {
return c.Size()
}

// Utilization implements Chunk
func (c *dumbChunk) Utilization() float64 {
return float64(len(c.entries)) / float64(tmpNumEntries)
}

// Returns an iterator that goes from _most_ recent to _least_ recent (ie,
// backwards).
func (c *dumbChunk) Iterator(from, through time.Time, direction logproto.Direction, _ logql.Filter) (iter.EntryIterator, error) {
16 changes: 16 additions & 0 deletions pkg/chunkenc/facade.go
@@ -52,7 +52,23 @@ func (Facade) Encoding() encoding.Encoding {
return GzipLogChunk
}

// Utilization implements encoding.Chunk.
func (f Facade) Utilization() float64 {
return f.c.Utilization()
}

// LokiChunk returns the chunkenc.Chunk.
func (f Facade) LokiChunk() Chunk {
return f.c
}

// UncompressedSize is a helper function to hide the type assertion kludge when wanting the uncompressed size of the Cortex interface encoding.Chunk.
func UncompressedSize(c encoding.Chunk) (int, bool) {
f, ok := c.(*Facade)

if !ok {
return 0, false
}

return f.c.UncompressedSize(), true
}
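
A short usage sketch of the helper above, assuming the 2019 import paths for Loki and Cortex: the boolean return lets callers skip the ratio calculation when the chunk is not wrapped in a `Facade`, which is exactly how `pkg/ingester/flush.go` consumes it later in this diff. The `compressionRatio` wrapper itself is illustrative and not part of the PR.

```go
package example

import (
	"github.com/cortexproject/cortex/pkg/chunk/encoding"
	"github.com/grafana/loki/pkg/chunkenc"
)

// compressionRatio returns the uncompressed/compressed ratio for a chunk, or
// false when the chunk is not a chunkenc.Facade or the payload is empty.
func compressionRatio(c encoding.Chunk, compressed []byte) (float64, bool) {
	uncompressed, ok := chunkenc.UncompressedSize(c)
	if !ok || len(compressed) == 0 {
		return 0, false
	}
	return float64(uncompressed) / float64(len(compressed)), true
}
```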
34 changes: 29 additions & 5 deletions pkg/chunkenc/gzip.go
@@ -61,7 +61,8 @@ type block struct {

mint, maxt int64

offset int // The offset of the block in the chunk.
offset int // The offset of the block in the chunk.
uncompressedSize int // Total uncompressed size in bytes when the chunk is cut.
}

// This block holds the un-compressed entries. Once it has enough data, this is
@@ -313,6 +314,28 @@ func (c *MemChunk) SpaceFor(*logproto.Entry) bool {
return len(c.blocks) < blocksPerChunk
}

// UncompressedSize implements Chunk.
func (c *MemChunk) UncompressedSize() int {
size := 0

if !c.head.isEmpty() {
size += c.head.size
}

for _, b := range c.blocks {
size += b.uncompressedSize
}

return size
}

// Utilization implements Chunk. It is the bytes used as a percentage of the total bytes the chunk can hold.
func (c *MemChunk) Utilization() float64 {
size := c.UncompressedSize()

return float64(size) / float64(blocksPerChunk*c.blockSize)
}

// Append implements Chunk.
func (c *MemChunk) Append(entry *logproto.Entry) error {
entryTimestamp := entry.Timestamp.UnixNano()
@@ -352,10 +375,11 @@ func (c *MemChunk) cut() error {
}

c.blocks = append(c.blocks, block{
b: b,
numEntries: len(c.head.entries),
mint: c.head.mint,
maxt: c.head.maxt,
b: b,
numEntries: len(c.head.entries),
mint: c.head.mint,
maxt: c.head.maxt,
uncompressedSize: c.head.size,
})

c.head.entries = c.head.entries[:0]
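
To make the `Utilization` arithmetic above concrete, a tiny standalone example; the `blocksPerChunk` and `blockSize` values are assumptions chosen for illustration, not read from this diff.

```go
package main

import "fmt"

func main() {
	const (
		blocksPerChunk = 10        // assumed value, named after the constant used in gzip.go
		blockSize      = 256 << 10 // 256 KiB per block, also assumed
	)

	// A chunk currently holding 1 MiB of uncompressed log data...
	uncompressedSize := 1 << 20

	// ...is 40% "full": Utilization = UncompressedSize / (blocksPerChunk * blockSize).
	utilization := float64(uncompressedSize) / float64(blocksPerChunk*blockSize)
	fmt.Printf("utilization = %.2f\n", utilization) // prints 0.40
}
```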
2 changes: 2 additions & 0 deletions pkg/chunkenc/interface.go
@@ -50,6 +50,8 @@ type Chunk interface {
Iterator(from, through time.Time, direction logproto.Direction, filter logql.Filter) (iter.EntryIterator, error)
Size() int
Bytes() ([]byte, error)
Utilization() float64
UncompressedSize() int
}

// CompressionWriter is the writer that compresses the data passed to it.
31 changes: 28 additions & 3 deletions pkg/ingester/flush.go
@@ -22,16 +22,30 @@ import (
)

var (
chunkUtilization = promauto.NewHistogram(prometheus.HistogramOpts{
Name: "loki_ingester_chunk_utilization",
Help: "Distribution of stored chunk utilization (when stored).",
Buckets: prometheus.LinearBuckets(0, 0.2, 6),
})
memoryChunks = promauto.NewGauge(prometheus.GaugeOpts{
Name: "loki_ingester_memory_chunks",
Help: "The total number of chunks in memory.",
})
chunkEntries = promauto.NewHistogram(prometheus.HistogramOpts{
Name: "loki_ingester_chunk_entries",
Help: "Distribution of stored chunk entries (when stored).",
Help: "Distribution of stored lines per chunk (when stored).",
Buckets: prometheus.ExponentialBuckets(200, 2, 9), // biggest bucket is 200*2^(9-1) = 51200
})
chunkSize = promauto.NewHistogram(prometheus.HistogramOpts{
Name: "loki_ingester_chunk_size_bytes",
Help: "Distribution of stored chunk sizes (when stored).",
Buckets: prometheus.ExponentialBuckets(10000, 2, 7), // biggest bucket is 10000*2^(7-1) = 640000 (~640KB)
})
chunkCompressionRatio = promauto.NewHistogram(prometheus.HistogramOpts{
Name: "loki_ingester_chunk_compression_ratio",
Help: "Compression ratio of chunks (when stored).",
Buckets: prometheus.LinearBuckets(1, 1.5, 6),
})
chunksPerTenant = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "loki_ingester_chunks_stored_total",
Help: "Total stored chunks per tenant.",
@@ -234,6 +248,7 @@ func (i *Ingester) shouldFlushChunk(chunk *chunkDesc) bool {
func (i *Ingester) removeFlushedChunks(instance *instance, stream *stream) {
now := time.Now()

prevNumChunks := len(stream.chunks)
for len(stream.chunks) > 0 {
if stream.chunks[0].flushed.IsZero() || now.Sub(stream.chunks[0].flushed) < i.cfg.RetainPeriod {
break
@@ -242,11 +257,13 @@ func (i *Ingester) removeFlushedChunks(instance *instance, stream *stream) {
stream.chunks[0].chunk = nil // erase reference so the chunk can be garbage-collected
stream.chunks = stream.chunks[1:]
}
memoryChunks.Sub(float64(prevNumChunks - len(stream.chunks)))

if len(stream.chunks) == 0 {
delete(instance.streams, stream.fp)
instance.index.Delete(client.FromLabelAdaptersToLabels(stream.labels), stream.fp)
instance.streamsRemovedTotal.Inc()
memoryStreams.Dec()
}
}

@@ -292,9 +309,17 @@ func (i *Ingester) flushChunks(ctx context.Context, fp model.Fingerprint, labelP
continue
}

compressedSize := float64(len(byt))
uncompressedSize, ok := chunkenc.UncompressedSize(wc.Data)

if ok {
chunkCompressionRatio.Observe(float64(uncompressedSize) / compressedSize)
}

chunkUtilization.Observe(wc.Data.Utilization())
chunkEntries.Observe(float64(numEntries))
chunkSize.Observe(float64(len(byt)))
sizePerTenant.Add(float64(len(byt)))
chunkSize.Observe(compressedSize)
sizePerTenant.Add(compressedSize)
countPerTenant.Inc()
firstTime, _ := cs[i].chunk.Bounds()
chunkAge.Observe(time.Since(firstTime).Seconds())
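
As a sanity check on the new compression-ratio histogram (the sizes below are made up): `prometheus.LinearBuckets(1, 1.5, 6)` yields upper bounds 1, 2.5, 4, 5.5, 7, and 8.5, so a chunk with 1 MiB of raw log data that compresses to 200 KiB records a ratio of about 5.1 and is counted in the ≤5.5 bucket.

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	// Upper bounds of the histogram added in flush.go:
	// LinearBuckets(1, 1.5, 6) => [1 2.5 4 5.5 7 8.5].
	fmt.Println(prometheus.LinearBuckets(1, 1.5, 6))

	// Made-up example sizes: 1 MiB uncompressed, 200 KiB after compression.
	uncompressed, compressed := float64(1<<20), float64(200<<10)
	fmt.Printf("ratio = %.2f\n", uncompressed/compressed) // 5.12 -> falls in the <=5.5 bucket
}
```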
16 changes: 15 additions & 1 deletion pkg/ingester/instance.go
@@ -32,6 +32,10 @@ var (
)

var (
memoryStreams = promauto.NewGauge(prometheus.GaugeOpts{
Name: "loki_ingester_memory_streams",
Help: "The total number of streams in memory.",
})
streamsCreatedTotal = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "loki",
Name: "ingester_streams_created_total",
@@ -89,10 +93,16 @@ func (i *instance) consumeChunk(ctx context.Context, labels []client.LabelAdapte
i.index.Add(labels, fp)
i.streams[fp] = stream
i.streamsCreatedTotal.Inc()
memoryStreams.Inc()
i.addTailersToNewStream(stream)
}

return stream.consumeChunk(ctx, chunk)
err := stream.consumeChunk(ctx, chunk)
if err == nil {
memoryChunks.Inc()
}

return err
}

func (i *instance) Push(ctx context.Context, req *logproto.PushRequest) error {
@@ -113,10 +123,13 @@ func (i *instance) Push(ctx context.Context, req *logproto.PushRequest) error {
continue
}

prevNumChunks := len(stream.chunks)
if err := stream.Push(ctx, s.Entries); err != nil {
appendErr = err
continue
}

memoryChunks.Add(float64(len(stream.chunks) - prevNumChunks))
}

return appendErr
@@ -136,6 +149,7 @@ func (i *instance) getOrCreateStream(labels []client.LabelAdapter) (*stream, err
stream = newStream(fp, labels, i.blockSize)
i.index.Add(labels, fp)
i.streams[fp] = stream
memoryStreams.Inc()
i.streamsCreatedTotal.Inc()
i.addTailersToNewStream(stream)

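
The `memoryChunks` and `memoryStreams` updates above follow a delta pattern: snapshot the count before the operation, then move the gauge by the observed difference on success, so the gauge stays correct whether the push created zero, one, or several chunks. A generic sketch of that pattern, with the gauge, the slice, and the callback all illustrative stand-ins for the PR's types:

```go
package example

import "github.com/prometheus/client_golang/prometheus"

// trackDelta snapshots the length of items, runs op, and on success adjusts
// the gauge by the change in length (negative deltas shrink the gauge).
func trackDelta(gauge prometheus.Gauge, items *[]int, op func() error) error {
	prev := len(*items)
	if err := op(); err != nil {
		return err // mirrors instance.Push, which skips the gauge update on error
	}
	gauge.Add(float64(len(*items) - prev))
	return nil
}
```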
6 changes: 0 additions & 6 deletions pkg/ingester/stream.go
@@ -25,11 +25,6 @@ var (
Name: "ingester_chunks_created_total",
Help: "The total number of chunks created in the ingester.",
})
chunksFlushedTotal = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "loki",
Name: "ingester_chunks_flushed_total",
Help: "The total number of chunks flushed by the ingester.",
})
samplesPerChunk = prometheus.NewHistogram(prometheus.HistogramOpts{
Namespace: "loki",
Subsystem: "ingester",
@@ -42,7 +37,6 @@ var (

func init() {
prometheus.MustRegister(chunksCreatedTotal)
prometheus.MustRegister(chunksFlushedTotal)
prometheus.MustRegister(samplesPerChunk)
}

8 changes: 4 additions & 4 deletions production/loki-mixin/dashboards.libsonnet
@@ -92,7 +92,7 @@ local utils = import "mixin-utils/utils.libsonnet";
)
.addPanel(
g.panel('Chunks per series') +
g.queryPanel('sum(loki_ingester_memory_chunks{cluster="$cluster", job="$namespace/ingester"}) / sum(loki_ingester_memory_series{job="$namespace/ingester"})', 'chunks'),
g.queryPanel('sum(loki_ingester_memory_chunks{cluster="$cluster", job="$namespace/ingester"}) / sum(loki_ingester_memory_streams{job="$namespace/ingester"})', 'chunks'),
)
)
.addRow(
@@ -111,19 +111,19 @@ local utils = import "mixin-utils/utils.libsonnet";
g.row('Flush Stats')
.addPanel(
g.panel('Size') +
g.latencyPanel('loki_ingester_chunk_length', '{cluster="$cluster", job="$namespace/ingester"}', multiplier='1') +
g.latencyPanel('loki_ingester_chunk_entries', '{cluster="$cluster", job="$namespace/ingester"}', multiplier='1') +
{ yaxes: g.yaxes('short') },
)
.addPanel(
g.panel('Entries') +
g.queryPanel('sum(rate(loki_chunk_store_index_entries_per_chunk_sum{cluster="$cluster", job="$namespace/ingester"}[5m])) / sum(rate(loki_chunk_store_index_entries_per_chunk_count{cluster="$cluster", job="$namespace/ingester"}[5m]))', 'entries'),
g.queryPanel('sum(rate(cortex_chunk_store_index_entries_per_chunk_sum{cluster="$cluster", job="$namespace/ingester"}[5m])) / sum(rate(cortex_chunk_store_index_entries_per_chunk_count{cluster="$cluster", job="$namespace/ingester"}[5m]))', 'entries'),
),
)
.addRow(
g.row('Flush Stats')
.addPanel(
g.panel('Queue Length') +
g.queryPanel('loki_ingester_flush_queue_length{cluster="$cluster", job="$namespace/ingester"}', '{{instance}}'),
g.queryPanel('cortex_ingester_flush_queue_length{cluster="$cluster", job="$namespace/ingester"}', '{{instance}}'),
)
.addPanel(
g.panel('Flush Rate') +