Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simpler new chunk key v12 #5054

Merged
merged 28 commits into from
Jan 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
90fe9c2
starts hacking new chunk paths
owen-d Nov 10, 2021
c634d38
Create new chunk key for S3; update parsing and add basic test and be…
JordanRushing Nov 24, 2021
d6b8616
Finish plumbing through the chunk.ExternalKey ->
cstyan Dec 2, 2021
ebab2ac
Clean up lint failures
JordanRushing Dec 2, 2021
25b631a
Clean up chunk.go and fix tests
JordanRushing Dec 2, 2021
7fda43b
Break SchemaConfig.ExternalKey() into smaller functions
JordanRushing Dec 2, 2021
db54175
Quickly fix variable name in SchemaConfig.newerExternalKey()
JordanRushing Dec 2, 2021
abee82d
Clean up ExternalKey conditional logic; add better comments; add tests
JordanRushing Dec 3, 2021
ca95847
Fix a bug where we are prepending userID to legacy Chunk keys but nev…
JordanRushing Dec 8, 2021
b753541
Add small SchemaConfig test
JordanRushing Dec 8, 2021
1cc0986
Update docs and CHANGELOG.md
JordanRushing Dec 8, 2021
50911ba
Merge branch 'main' into new-chunk-external-key-v12
JordanRushing Dec 8, 2021
1645a49
Correctly return an error when failing to parse a chunk external key
JordanRushing Dec 8, 2021
a3ae0ea
Revert IdentityEncoder to Base64Encoder after testing
JordanRushing Dec 14, 2021
823a63f
Merge branch 'main' into new-chunk-external-key-v12
JordanRushing Dec 27, 2021
2679054
Fix assignment in test for linter
JordanRushing Dec 27, 2021
aa3dc28
Remove leftover comments from development
JordanRushing Dec 27, 2021
d300f55
Change v12 version comment format; remove redundant logic in `ParseEx…
JordanRushing Dec 27, 2021
8b582f6
Change remaining v12 comment style
JordanRushing Dec 27, 2021
0a089c7
Merge remote-tracking branch 'upstream/main' into new-chunk-external-…
JordanRushing Jan 4, 2022
d3d2fc3
Pass chunk.SchemaConfig to new parallel chunk client
JordanRushing Jan 4, 2022
24219ba
Simplify chunk external key prefixes for schema v12
JordanRushing Jan 5, 2022
1d1dab9
Fix broken benchmark; add benchmarks for old parsing conditional
JordanRushing Jan 6, 2022
e23c97a
Remove unneeded lines from upgrading doc
JordanRushing Jan 6, 2022
9bb4960
Add benchmarks for root chunk external key parsing functions
JordanRushing Jan 6, 2022
8ee35f8
memoizes schema number calculations
owen-d Jan 6, 2022
8c833be
Merge pull request #3 from owen-d/pull/5054
JordanRushing Jan 6, 2022
fcb10a6
Resolve linter issue with PeriodConfig receiver name
JordanRushing Jan 6, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
* [4904](https://github.com/grafana/loki/pull/4904) **bboreham**: Fixes rare race condition that could crash an ingester.
* [4942](https://github.com/grafana/loki/pull/4942) **cyriltovena**: Allow to disable HTTP/2 for GCS.
* [4876](https://github.com/grafana/loki/pull/4876) **trevorwhitney**: Docs: add simple, scalable example using docker-compose
* [4857](https://github.com/grafana/loki/pull/4857) **jordanrushing**: New schema v12 changes chunk key structure

# 2.4.1 (2021/11/07)

Expand Down
13 changes: 9 additions & 4 deletions cmd/migrate/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,8 @@ func main() {
syncRanges := calcSyncRanges(parsedFrom.UnixNano(), parsedTo.UnixNano(), shardByNs.Nanoseconds())
log.Printf("With a shard duration of %v, %v ranges have been calculated.\n", shardByNs, len(syncRanges))

cm := newChunkMover(ctx, s, d, *source, *dest, matchers, *batch)
// Pass dest schema config, the destination determines the new chunk external keys using potentially a different schema config.
cm := newChunkMover(ctx, destConfig.SchemaConfig.SchemaConfig, s, d, *source, *dest, matchers, *batch)
syncChan := make(chan *syncRange)
errorChan := make(chan error)
statsChan := make(chan stats)
Expand Down Expand Up @@ -264,6 +265,7 @@ type stats struct {

type chunkMover struct {
ctx context.Context
schema chunk.SchemaConfig
source storage.Store
dest storage.Store
sourceUser string
Expand All @@ -272,9 +274,10 @@ type chunkMover struct {
batch int
}

func newChunkMover(ctx context.Context, source, dest storage.Store, sourceUser, destUser string, matchers []*labels.Matcher, batch int) *chunkMover {
func newChunkMover(ctx context.Context, s chunk.SchemaConfig, source, dest storage.Store, sourceUser, destUser string, matchers []*labels.Matcher, batch int) *chunkMover {
cm := &chunkMover{
ctx: ctx,
schema: s,
source: source,
dest: dest,
sourceUser: sourceUser,
Expand Down Expand Up @@ -319,9 +322,11 @@ func (m *chunkMover) moveChunks(ctx context.Context, threadID int, syncRangeCh <
chks := make([]chunk.Chunk, 0, len(chunks))

// FetchChunks requires chunks to be ordered by external key.
sort.Slice(chunks, func(l, m int) bool { return chunks[l].ExternalKey() < chunks[m].ExternalKey() })
sort.Slice(chunks, func(x, y int) bool {
return m.schema.ExternalKey(chunks[x]) < m.schema.ExternalKey(chunks[y])
})
for _, chk := range chunks {
key := chk.ExternalKey()
key := m.schema.ExternalKey(chk)
keys = append(keys, key)
chks = append(chks, chk)
}
Expand Down
7 changes: 6 additions & 1 deletion pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -668,11 +668,16 @@ func (i *Ingester) GetChunkIDs(ctx context.Context, req *logproto.GetChunkIDsReq
return nil, err
}

// todo (Callum) ingester should maybe store the whole schema config?
s := chunk.SchemaConfig{
Configs: i.periodicConfigs,
}

// build the response
resp := logproto.GetChunkIDsResponse{ChunkIDs: []string{}}
for _, chunks := range chunksGroups {
for _, chk := range chunks {
resp.ChunkIDs = append(resp.ChunkIDs, chk.ExternalKey())
resp.ChunkIDs = append(resp.ChunkIDs, s.ExternalKey(chk))
}
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ func (t *Loki) initStore() (_ services.Service, err error) {
}
// Use AsyncStore to query both ingesters local store and chunk store for store queries.
// Only queriers should use the AsyncStore, it should never be used in ingesters.
chunkStore = loki_storage.NewAsyncStore(chunkStore, t.ingesterQuerier,
chunkStore = loki_storage.NewAsyncStore(chunkStore, t.Cfg.SchemaConfig.SchemaConfig, t.ingesterQuerier,
calculateAsyncStoreQueryIngestersWithin(t.Cfg.Querier.QueryIngestersWithin, boltdbShipperMinIngesterQueryStoreDuration),
)
case t.Cfg.isModuleEnabled(All):
Expand Down
12 changes: 7 additions & 5 deletions pkg/storage/async_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,15 @@ type IngesterQuerier interface {
// It should never be used in ingesters otherwise it would start spiraling around doing queries over and over again to other ingesters.
type AsyncStore struct {
chunk.Store
scfg chunk.SchemaConfig
ingesterQuerier IngesterQuerier
queryIngestersWithin time.Duration
}

func NewAsyncStore(store chunk.Store, querier IngesterQuerier, queryIngestersWithin time.Duration) *AsyncStore {
func NewAsyncStore(store chunk.Store, scfg chunk.SchemaConfig, querier IngesterQuerier, queryIngestersWithin time.Duration) *AsyncStore {
return &AsyncStore{
Store: store,
scfg: scfg,
ingesterQuerier: querier,
queryIngestersWithin: queryIngestersWithin,
}
Expand Down Expand Up @@ -87,7 +89,7 @@ func (a *AsyncStore) GetChunkRefs(ctx context.Context, userID string, from, thro
}

func (a *AsyncStore) mergeIngesterAndStoreChunks(userID string, storeChunks [][]chunk.Chunk, fetchers []*chunk.Fetcher, ingesterChunkIDs []string) ([][]chunk.Chunk, []*chunk.Fetcher, error) {
ingesterChunkIDs = filterDuplicateChunks(storeChunks, ingesterChunkIDs)
ingesterChunkIDs = filterDuplicateChunks(a.scfg, storeChunks, ingesterChunkIDs)
level.Debug(util_log.Logger).Log("msg", "post-filtering ingester chunks", "count", len(ingesterChunkIDs))

fetcherToChunksGroupIdx := make(map[*chunk.Fetcher]int, len(fetchers))
Expand All @@ -105,7 +107,7 @@ func (a *AsyncStore) mergeIngesterAndStoreChunks(userID string, storeChunks [][]
// ToDo(Sandeep) possible optimization: Keep the chunk fetcher reference handy after first call since it is expected to stay the same.
fetcher := a.Store.GetChunkFetcher(chk.Through)
if fetcher == nil {
return nil, nil, fmt.Errorf("got a nil fetcher for chunk %s", chk.ExternalKey())
return nil, nil, fmt.Errorf("got a nil fetcher for chunk %s", a.scfg.ExternalKey(chk))
}

if _, ok := fetcherToChunksGroupIdx[fetcher]; !ok {
Expand All @@ -121,13 +123,13 @@ func (a *AsyncStore) mergeIngesterAndStoreChunks(userID string, storeChunks [][]
return storeChunks, fetchers, nil
}

func filterDuplicateChunks(storeChunks [][]chunk.Chunk, ingesterChunkIDs []string) []string {
func filterDuplicateChunks(scfg chunk.SchemaConfig, storeChunks [][]chunk.Chunk, ingesterChunkIDs []string) []string {
filteredChunkIDs := make([]string, 0, len(ingesterChunkIDs))
seen := make(map[string]struct{}, len(storeChunks))

for i := range storeChunks {
for j := range storeChunks[i] {
seen[storeChunks[i][j].ExternalKey()] = struct{}{}
seen[scfg.ExternalKey(storeChunks[i][j])] = struct{}{}
}
}

Expand Down
43 changes: 32 additions & 11 deletions pkg/storage/async_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,23 @@ func buildMockChunkRef(t *testing.T, num int) []chunk.Chunk {
now := time.Now()
var chunks []chunk.Chunk

s := chunk.SchemaConfig{
Configs: []chunk.PeriodConfig{
{
From: chunk.DayTime{Time: 0},
Schema: "v11",
RowShards: 16,
},
},
}

for i := 0; i < num; i++ {
chk := newChunk(buildTestStreams(fooLabelsWithName, timeRange{
from: now.Add(time.Duration(i) * time.Minute),
to: now.Add(time.Duration(i+1) * time.Minute),
}))

chunkRef, err := chunk.ParseExternalKey(chk.UserID, chk.ExternalKey())
chunkRef, err := chunk.ParseExternalKey(chk.UserID, s.ExternalKey(chk))
require.NoError(t, err)

chunks = append(chunks, chunkRef)
Expand All @@ -80,6 +90,17 @@ func buildMockFetchers(num int) []*chunk.Fetcher {
func TestAsyncStore_mergeIngesterAndStoreChunks(t *testing.T) {
testChunks := buildMockChunkRef(t, 10)
fetchers := buildMockFetchers(3)

s := chunk.SchemaConfig{
Configs: []chunk.PeriodConfig{
{
From: chunk.DayTime{Time: 0},
Schema: "v11",
RowShards: 16,
},
},
}

for _, tc := range []struct {
name string
storeChunks [][]chunk.Chunk
Expand All @@ -101,7 +122,7 @@ func TestAsyncStore_mergeIngesterAndStoreChunks(t *testing.T) {
},
{
name: "no chunks from querier",
ingesterChunkIDs: convertChunksToChunkIDs(testChunks),
ingesterChunkIDs: convertChunksToChunkIDs(s, testChunks),
ingesterFetcher: fetchers[0],
expectedChunks: [][]chunk.Chunk{testChunks},
expectedFetchers: fetchers[0:1],
Expand All @@ -112,7 +133,7 @@ func TestAsyncStore_mergeIngesterAndStoreChunks(t *testing.T) {
testChunks[0:5],
},
storeFetcher: fetchers[0:1],
ingesterChunkIDs: convertChunksToChunkIDs(testChunks[5:]),
ingesterChunkIDs: convertChunksToChunkIDs(s, testChunks[5:]),
ingesterFetcher: fetchers[1],
expectedChunks: [][]chunk.Chunk{
testChunks[0:5],
Expand All @@ -127,7 +148,7 @@ func TestAsyncStore_mergeIngesterAndStoreChunks(t *testing.T) {
testChunks[5:],
},
storeFetcher: fetchers[0:2],
ingesterChunkIDs: convertChunksToChunkIDs(testChunks[5:]),
ingesterChunkIDs: convertChunksToChunkIDs(s, testChunks[5:]),
ingesterFetcher: fetchers[2],
expectedChunks: [][]chunk.Chunk{
testChunks[0:5],
Expand All @@ -142,7 +163,7 @@ func TestAsyncStore_mergeIngesterAndStoreChunks(t *testing.T) {
testChunks[2:5],
},
storeFetcher: fetchers[0:2],
ingesterChunkIDs: convertChunksToChunkIDs(testChunks[5:]),
ingesterChunkIDs: convertChunksToChunkIDs(s, testChunks[5:]),
ingesterFetcher: fetchers[1],
expectedChunks: [][]chunk.Chunk{
testChunks[0:2],
Expand All @@ -157,7 +178,7 @@ func TestAsyncStore_mergeIngesterAndStoreChunks(t *testing.T) {
testChunks[2:5],
},
storeFetcher: fetchers[0:2],
ingesterChunkIDs: convertChunksToChunkIDs(testChunks[5:]),
ingesterChunkIDs: convertChunksToChunkIDs(s, testChunks[5:]),
ingesterFetcher: fetchers[2],
expectedChunks: [][]chunk.Chunk{
testChunks[0:2],
Expand All @@ -172,7 +193,7 @@ func TestAsyncStore_mergeIngesterAndStoreChunks(t *testing.T) {
testChunks[0:5],
},
storeFetcher: fetchers[0:1],
ingesterChunkIDs: convertChunksToChunkIDs(append(testChunks[5:], testChunks[5:]...)),
ingesterChunkIDs: convertChunksToChunkIDs(s, append(testChunks[5:], testChunks[5:]...)),
ingesterFetcher: fetchers[0],
expectedChunks: [][]chunk.Chunk{
testChunks,
Expand All @@ -188,7 +209,7 @@ func TestAsyncStore_mergeIngesterAndStoreChunks(t *testing.T) {
ingesterQuerier := newIngesterQuerierMock()
ingesterQuerier.On("GetChunkIDs", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(tc.ingesterChunkIDs, nil)

asyncStore := NewAsyncStore(store, ingesterQuerier, 0)
asyncStore := NewAsyncStore(store, chunk.SchemaConfig{}, ingesterQuerier, 0)

chunks, fetchers, err := asyncStore.GetChunkRefs(context.Background(), "fake", model.Now(), model.Now(), nil)
require.NoError(t, err)
Expand Down Expand Up @@ -245,7 +266,7 @@ func TestAsyncStore_QueryIngestersWithin(t *testing.T) {
ingesterQuerier := newIngesterQuerierMock()
ingesterQuerier.On("GetChunkIDs", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]string{}, nil)

asyncStore := NewAsyncStore(store, ingesterQuerier, tc.queryIngestersWithin)
asyncStore := NewAsyncStore(store, chunk.SchemaConfig{}, ingesterQuerier, tc.queryIngestersWithin)

_, _, err := asyncStore.GetChunkRefs(context.Background(), "fake", tc.queryFrom, tc.queryThrough, nil)
require.NoError(t, err)
Expand All @@ -259,10 +280,10 @@ func TestAsyncStore_QueryIngestersWithin(t *testing.T) {
}
}

func convertChunksToChunkIDs(chunks []chunk.Chunk) []string {
func convertChunksToChunkIDs(s chunk.SchemaConfig, chunks []chunk.Chunk) []string {
var chunkIDs []string
for _, chk := range chunks {
chunkIDs = append(chunkIDs, chk.ExternalKey())
chunkIDs = append(chunkIDs, s.ExternalKey(chk))
}

return chunkIDs
Expand Down
28 changes: 17 additions & 11 deletions pkg/storage/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ func NewChunkMetrics(r prometheus.Registerer, maxBatchSize int) *ChunkMetrics {
// chunks with the next chunk from the next batch and added it to the next iteration. In this case the boundaries of the batch
// is reduced to non-overlapping chunks boundaries.
type batchChunkIterator struct {
schemas chunk.SchemaConfig
chunks lazyChunks
batchSize int
lastOverlapping []*LazyChunk
Expand All @@ -95,6 +96,7 @@ type batchChunkIterator struct {
// newBatchChunkIterator creates a new batch iterator with the given batchSize.
func newBatchChunkIterator(
ctx context.Context,
s chunk.SchemaConfig,
chunks []*LazyChunk,
batchSize int,
direction logproto.Direction,
Expand All @@ -109,6 +111,7 @@ func newBatchChunkIterator(
matchers = removeMatchersByName(matchers, labels.MetricName, astmapper.ShardLabel)
res := &batchChunkIterator{
batchSize: batchSize,
schemas: s,
metrics: metrics,
matchers: matchers,
start: start,
Expand Down Expand Up @@ -279,7 +282,7 @@ func (it *batchChunkIterator) nextBatch() (res *chunkBatch) {
}
}
// download chunk for this batch.
chksBySeries, err := fetchChunkBySeries(it.ctx, it.metrics, batch, it.matchers, it.chunkFilterer)
chksBySeries, err := fetchChunkBySeries(it.ctx, it.schemas, it.metrics, batch, it.matchers, it.chunkFilterer)
if err != nil {
return &chunkBatch{err: err}
}
Expand Down Expand Up @@ -312,6 +315,7 @@ type logBatchIterator struct {

func newLogBatchIterator(
ctx context.Context,
schemas chunk.SchemaConfig,
metrics *ChunkMetrics,
chunks []*LazyChunk,
batchSize int,
Expand All @@ -326,7 +330,7 @@ func newLogBatchIterator(
pipeline: pipeline,
ctx: ctx,
cancel: cancel,
batchChunkIterator: newBatchChunkIterator(ctx, chunks, batchSize, direction, start, end, metrics, matchers, chunkFilterer),
batchChunkIterator: newBatchChunkIterator(ctx, schemas, chunks, batchSize, direction, start, end, metrics, matchers, chunkFilterer),
}, nil
}

Expand Down Expand Up @@ -451,6 +455,7 @@ type sampleBatchIterator struct {

func newSampleBatchIterator(
ctx context.Context,
schemas chunk.SchemaConfig,
metrics *ChunkMetrics,
chunks []*LazyChunk,
batchSize int,
Expand All @@ -464,7 +469,7 @@ func newSampleBatchIterator(
extractor: extractor,
ctx: ctx,
cancel: cancel,
batchChunkIterator: newBatchChunkIterator(ctx, chunks, batchSize, logproto.FORWARD, start, end, metrics, matchers, chunkFilterer),
batchChunkIterator: newBatchChunkIterator(ctx, schemas, chunks, batchSize, logproto.FORWARD, start, end, metrics, matchers, chunkFilterer),
}, nil
}

Expand Down Expand Up @@ -585,6 +590,7 @@ func removeMatchersByName(matchers []*labels.Matcher, names ...string) []*labels

func fetchChunkBySeries(
ctx context.Context,
s chunk.SchemaConfig,
metrics *ChunkMetrics,
chunks []*LazyChunk,
matchers []*labels.Matcher,
Expand All @@ -594,7 +600,7 @@ func fetchChunkBySeries(

// Make sure the initial chunks are loaded. This is not one chunk
// per series, but rather a chunk per non-overlapping iterator.
if err := loadFirstChunks(ctx, chksBySeries); err != nil {
if err := loadFirstChunks(ctx, s, chksBySeries); err != nil {
return nil, err
}

Expand All @@ -610,7 +616,7 @@ func fetchChunkBySeries(
}

// Finally we load all chunks not already loaded
if err := fetchLazyChunks(ctx, allChunks); err != nil {
if err := fetchLazyChunks(ctx, s, allChunks); err != nil {
return nil, err
}
metrics.chunks.WithLabelValues(statusMatched).Add(float64(len(allChunks)))
Expand Down Expand Up @@ -655,7 +661,7 @@ outer:
return chks
}

func fetchLazyChunks(ctx context.Context, chunks []*LazyChunk) error {
func fetchLazyChunks(ctx context.Context, s chunk.SchemaConfig, chunks []*LazyChunk) error {
var (
totalChunks int64
start = time.Now()
Expand Down Expand Up @@ -687,9 +693,9 @@ func fetchLazyChunks(ctx context.Context, chunks []*LazyChunk) error {
index := make(map[string]*LazyChunk, len(chunks))

// FetchChunks requires chunks to be ordered by external key.
sort.Slice(chunks, func(i, j int) bool { return chunks[i].Chunk.ExternalKey() < chunks[j].Chunk.ExternalKey() })
sort.Slice(chunks, func(i, j int) bool { return s.ExternalKey(chunks[i].Chunk) < s.ExternalKey(chunks[j].Chunk) })
for _, chk := range chunks {
key := chk.Chunk.ExternalKey()
key := s.ExternalKey(chk.Chunk)
keys = append(keys, key)
chks = append(chks, chk.Chunk)
index[key] = chk
Expand All @@ -708,7 +714,7 @@ func fetchLazyChunks(ctx context.Context, chunks []*LazyChunk) error {
}
// assign fetched chunk by key as FetchChunks doesn't guarantee the order.
for _, chk := range chks {
index[chk.ExternalKey()].Chunk = chk
index[s.ExternalKey(chk)].Chunk = chk
}

errChan <- nil
Expand Down Expand Up @@ -742,7 +748,7 @@ func isInvalidChunkError(err error) bool {
return false
}

func loadFirstChunks(ctx context.Context, chks map[model.Fingerprint][][]*LazyChunk) error {
func loadFirstChunks(ctx context.Context, s chunk.SchemaConfig, chks map[model.Fingerprint][][]*LazyChunk) error {
var toLoad []*LazyChunk
for _, lchks := range chks {
for _, lchk := range lchks {
Expand All @@ -752,7 +758,7 @@ func loadFirstChunks(ctx context.Context, chks map[model.Fingerprint][][]*LazyCh
toLoad = append(toLoad, lchk[0])
}
}
return fetchLazyChunks(ctx, toLoad)
return fetchLazyChunks(ctx, s, toLoad)
}

func partitionBySeriesChunks(chunks []*LazyChunk) map[model.Fingerprint][][]*LazyChunk {
Expand Down
Loading