Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle context cancellation in some of the querier downstream requests #5080

Merged
merged 24 commits into from
Jan 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
0f0dc81
Fix deadlock in disconnecting querier
cyriltovena Jan 6, 2022
4c41d39
todo
kavirajk Jan 6, 2022
e28892c
Add more logs to queriers cancellation
cyriltovena Jan 7, 2022
61abcbd
new span
cyriltovena Jan 7, 2022
f5352bb
Fixe log
cyriltovena Jan 7, 2022
d1da6df
Exit earlier for batch iterator
cyriltovena Jan 7, 2022
f559d0b
Add missing return
kavirajk Jan 7, 2022
a0ad6a4
Merge remote-tracking branch 'upstream/main' into cancelation-issue-w…
cyriltovena Jan 7, 2022
49d4d73
Add store context cancellation
cyriltovena Jan 7, 2022
7d8dde6
Merge branch 'fix-race-shutdown' into cancelation-issue-with-querier
cyriltovena Jan 7, 2022
499325f
Fixes a possible cancellation issue
cyriltovena Jan 7, 2022
3e3979d
Merge branch 'fix-deadlock-limited' into cancelation-issue-with-querier
cyriltovena Jan 7, 2022
f7c5998
Rmove code to find issues
cyriltovena Jan 7, 2022
f66d788
Add splitMiddleware back to handler
kavirajk Jan 7, 2022
538ab07
Remove query split
kavirajk Jan 7, 2022
f4a0684
Merge branch 'main' into cancelation-issue-with-querier
kavirajk Jan 8, 2022
7ed4d02
Remove split middleware and context cancel check for chunks
kavirajk Jan 8, 2022
87d80d0
Handle context cancel properly on `getChunk()` via select.
kavirajk Jan 8, 2022
faae173
Use context in getChunk without starting new goroutine
kavirajk Jan 9, 2022
fd5befa
Just normal ctx.Err() check instead of using Done channel
kavirajk Jan 9, 2022
3bb15a9
Merge branch 'main' into cancelation-issue-with-querier
kavirajk Jan 10, 2022
78db2fd
Remove debug logs
kavirajk Jan 10, 2022
7ab8056
Update pkg/querier/http.go
kavirajk Jan 10, 2022
592c7b9
Remove unused imports
kavirajk Jan 10, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion pkg/storage/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -504,7 +504,7 @@ func (it *sampleBatchIterator) Sample() logproto.Sample {

func (it *sampleBatchIterator) Next() bool {
// for loop to avoid recursion
for {
for it.ctx.Err() == nil {
if it.curr != nil && it.curr.Next() {
return true
}
Expand All @@ -527,6 +527,7 @@ func (it *sampleBatchIterator) Next() bool {
return false
}
}
return false
}

// newChunksIterator creates an iterator over a set of lazychunks.
Expand Down
3 changes: 3 additions & 0 deletions pkg/storage/chunk/chunk_store_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,9 @@ func (c *Fetcher) worker() {
// FetchChunks fetches a set of chunks from cache and store. Note that the keys passed in must be
// lexicographically sorted, while the returned chunks are not in the same order as the passed in chunks.
func (c *Fetcher) FetchChunks(ctx context.Context, chunks []Chunk, keys []string) ([]Chunk, error) {
if ctx.Err() != nil {
return nil, ctx.Err()
}
log, ctx := spanlogger.New(ctx, "ChunkStore.FetchChunks")
defer log.Span.Finish()

Expand Down
5 changes: 5 additions & 0 deletions pkg/storage/chunk/objectclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,11 @@ func (o *Client) GetChunks(ctx context.Context, chunks []chunk.Chunk) ([]chunk.C
}

func (o *Client) getChunk(ctx context.Context, decodeContext *chunk.DecodeContext, c chunk.Chunk) (chunk.Chunk, error) {

if ctx.Err() != nil {
return chunk.Chunk{}, ctx.Err()
}

key := o.schema.ExternalKey(c)
if o.keyEncoder != nil {
key = o.keyEncoder(key)
Expand Down
4 changes: 4 additions & 0 deletions pkg/storage/chunk/util/parallel_chunk_fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ func GetParallelChunks(ctx context.Context, maxParallel int, chunks []chunk.Chun
defer log.Finish()
log.LogFields(otlog.Int("requested", len(chunks)))

if ctx.Err() != nil {
return nil, ctx.Err()
}

queuedChunks := make(chan chunk.Chunk)

go func() {
Expand Down