From cf1eb2ae9d15d16470a63c60b8134ab4dd494420 Mon Sep 17 00:00:00 2001 From: Oleg Zaytsev Date: Thu, 13 Jan 2022 15:27:39 +0100 Subject: [PATCH 1/6] Increase scheduler worker cancellation chan cap With previous implementation, if worker was busy talking to scheduler, we didn't push the cancellation, keeping that query running. When cancelling a query, all its subqueries are cancelled at the same time, so this was most likely happening all the time (first subquery scheduled on this worker was canceled, the rest were not because worker was busy cancelling the first one). Also removed the `<-ctx.Done()` escape point when waiting for the enqueueing ACK and modified the enqueueing method to ensure that it always responds something. Fixes: https://github.com/grafana/mimir/issues/740 Inspired by: https://github.com/grafana/loki/pull/5113 Signed-off-by: Oleg Zaytsev --- pkg/frontend/v2/frontend.go | 12 ++--- pkg/frontend/v2/frontend_scheduler_worker.go | 14 +++++- pkg/frontend/v2/frontend_test.go | 49 ++++++++++++++++++++ 3 files changed, 64 insertions(+), 11 deletions(-) diff --git a/pkg/frontend/v2/frontend.go b/pkg/frontend/v2/frontend.go index 2fc6f5037a1..82e325939ae 100644 --- a/pkg/frontend/v2/frontend.go +++ b/pkg/frontend/v2/frontend.go @@ -196,21 +196,14 @@ func (f *Frontend) RoundTripGRPC(ctx context.Context, req *httpgrpc.HTTPRequest) retries := f.cfg.WorkerConcurrency + 1 // To make sure we hit at least two different schedulers. enqueueAgain: + var cancelCh chan<- uint64 select { case <-ctx.Done(): return nil, ctx.Err() case f.requestsCh <- freq: // Enqueued, let's wait for response. - } - - var cancelCh chan<- uint64 - - select { - case <-ctx.Done(): - return nil, ctx.Err() - - case enqRes := <-freq.enqueue: + enqRes := <-freq.enqueue if enqRes.status == waitForResponse { cancelCh = enqRes.cancelCh break // go wait for response. @@ -232,6 +225,7 @@ enqueueAgain: // cancellation sent. default: // failed to cancel, ignore. 
+ level.Warn(f.log).Log("msg", "failed to send cancellation request to scheduler, queue full") } } return nil, ctx.Err() diff --git a/pkg/frontend/v2/frontend_scheduler_worker.go b/pkg/frontend/v2/frontend_scheduler_worker.go index 9a93054c69b..edfb57e43ef 100644 --- a/pkg/frontend/v2/frontend_scheduler_worker.go +++ b/pkg/frontend/v2/frontend_scheduler_worker.go @@ -26,7 +26,13 @@ import ( "github.com/grafana/mimir/pkg/util" ) -const schedulerAddressLabel = "scheduler_address" +const ( + schedulerAddressLabel = "scheduler_address" + // schedulerWorkerCancelChanCapacity should be at least as big as the number of sub-queries issued by a single query + // per scheduler (after splitting and sharding) in order to allow all of them being canceled while scheduler worker is busy. + // Since the channel holds uint64, this is 8KB per scheduler worker. + schedulerWorkerCancelChanCapacity = 1000 +) type frontendSchedulerWorkers struct { services.Service @@ -197,7 +203,7 @@ func newFrontendSchedulerWorker(conn *grpc.ClientConn, schedulerAddr string, fro schedulerAddr: schedulerAddr, frontendAddr: frontendAddr, requestCh: requestCh, - cancelCh: make(chan uint64), + cancelCh: make(chan uint64, schedulerWorkerCancelChanCapacity), enqueuedRequests: enqueuedRequests, } w.ctx, w.cancel = context.WithCancel(context.Background()) @@ -331,6 +337,10 @@ func (w *frontendSchedulerWorker) schedulerLoop(loop schedulerpb.SchedulerForFro Body: []byte("too many outstanding requests"), }, } + + default: + level.Error(w.log).Log("msg", "unknown response status from the scheduler", "status", resp.Status, "queryID", req.queryID) + req.enqueue <- enqueueResult{status: failed} } case reqID := <-w.cancelCh: diff --git a/pkg/frontend/v2/frontend_test.go b/pkg/frontend/v2/frontend_test.go index d71ac56dc69..095185b8bf3 100644 --- a/pkg/frontend/v2/frontend_test.go +++ b/pkg/frontend/v2/frontend_test.go @@ -228,6 +228,55 @@ func TestFrontendCancellation(t *testing.T) { }) } +// If FrontendWorkers are 
busy, cancellation passed by Query frontend may not reach +// all the frontend workers thus not reaching the scheduler as well. +// Issue: https://github.com/grafana/mimir/issues/740 +func TestFrontendWorkerCancellation(t *testing.T) { + f, ms := setupFrontend(t, nil, nil) + + ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond) + defer cancel() + + // send multiple requests > maxconcurrency of scheduler. So that it keeps all the frontend worker busy in serving requests. + reqCount := testFrontendWorkerConcurrency + 5 + var wg sync.WaitGroup + for i := 0; i < reqCount; i++ { + wg.Add(1) + go func() { + defer wg.Done() + resp, err := f.RoundTripGRPC(user.InjectOrgID(ctx, "test"), &httpgrpc.HTTPRequest{}) + require.EqualError(t, err, context.DeadlineExceeded.Error()) + require.Nil(t, resp) + }() + } + + wg.Wait() + + // We wait a bit to make sure scheduler receives the cancellation request. + // 2 * reqCount because for every request, should also be corresponding cancel request + test.Poll(t, 5*time.Second, 2*reqCount, func() interface{} { + ms.mu.Lock() + defer ms.mu.Unlock() + + return len(ms.msgs) + }) + + ms.checkWithLock(func() { + require.Equal(t, 2*reqCount, len(ms.msgs)) + msgTypeCounts := map[schedulerpb.FrontendToSchedulerType]int{} + for _, msg := range ms.msgs { + msgTypeCounts[msg.Type]++ + } + expectedMsgTypeCounts := map[schedulerpb.FrontendToSchedulerType]int{ + schedulerpb.ENQUEUE: reqCount, + schedulerpb.CANCEL: reqCount, + } + require.Equalf(t, expectedMsgTypeCounts, msgTypeCounts, + "Should receive %d enqueue (%d) requests, and %d cancel (%d) requests.", reqCount, schedulerpb.ENQUEUE, reqCount, schedulerpb.CANCEL, + ) + }) +} + func TestFrontendFailedCancellation(t *testing.T) { f, ms := setupFrontend(t, nil, nil) From 73e2bea5a89ea9c77f97f42f2ccdc12282e98bd5 Mon Sep 17 00:00:00 2001 From: Oleg Zaytsev Date: Thu, 13 Jan 2022 15:32:32 +0100 Subject: [PATCH 2/6] Update CHANGELOG.md Signed-off-by: Oleg Zaytsev --- 
CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 122cca48d51..83fcae23b1b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -200,6 +200,7 @@ * [BUGFIX] Ruler: fix formatting of rule groups in `/ruler/rule_groups` endpoint. #655 * [BUGFIX] Querier: Disable query scheduler SRV DNS lookup. #689 * [BUGFIX] Query-frontend: fix API error messages that were mentioning Prometheus `--enable-feature=promql-negative-offset` and `--enable-feature=promql-at-modifier` flags. #688 +* [BUGFIX] Query-frontend: worker's cancellation channels are now buffered to ensure that all request cancellations are properly handled. #741 ### Mixin (changes since `grafana/cortex-jsonnet` `1.9.0`) From 75e2790eee6840965f2334adbb75763099cf2f05 Mon Sep 17 00:00:00 2001 From: Oleg Zaytsev Date: Thu, 13 Jan 2022 16:55:33 +0100 Subject: [PATCH 3/6] Remove comment about chan memory usage MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Peter Štibraný --- pkg/frontend/v2/frontend_scheduler_worker.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/frontend/v2/frontend_scheduler_worker.go b/pkg/frontend/v2/frontend_scheduler_worker.go index edfb57e43ef..6b8268a1282 100644 --- a/pkg/frontend/v2/frontend_scheduler_worker.go +++ b/pkg/frontend/v2/frontend_scheduler_worker.go @@ -30,7 +30,6 @@ const ( schedulerAddressLabel = "scheduler_address" // schedulerWorkerCancelChanCapacity should be at least as big as the number of sub-queries issued by a single query // per scheduler (after splitting and sharding) in order to allow all of them being canceled while scheduler worker is busy. - // Since the channel holds uint64, this is 8KB per scheduler worker. 
schedulerWorkerCancelChanCapacity = 1000 ) From a4b9111caacb27c9404c7aaf4feb35428a4cbfa7 Mon Sep 17 00:00:00 2001 From: Oleg Zaytsev Date: Thu, 13 Jan 2022 18:00:30 +0100 Subject: [PATCH 4/6] Update test comment MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Peter Štibraný --- pkg/frontend/v2/frontend_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/frontend/v2/frontend_test.go b/pkg/frontend/v2/frontend_test.go index 095185b8bf3..8cccb691e43 100644 --- a/pkg/frontend/v2/frontend_test.go +++ b/pkg/frontend/v2/frontend_test.go @@ -228,8 +228,8 @@ func TestFrontendCancellation(t *testing.T) { }) } -// If FrontendWorkers are busy, cancellation passed by Query frontend may not reach -// all the frontend workers thus not reaching the scheduler as well. +// When the frontendWorker that processed the request is busy (processing a new request or cancelling a previous one) +// we still need to make sure that the cancellation reaches the scheduler at some point. 
// Issue: https://github.com/grafana/mimir/issues/740 func TestFrontendWorkerCancellation(t *testing.T) { f, ms := setupFrontend(t, nil, nil) From 196a91493adbafca3a74e71de004f5ff0767b891 Mon Sep 17 00:00:00 2001 From: Oleg Zaytsev Date: Fri, 14 Jan 2022 09:12:29 +0100 Subject: [PATCH 5/6] Add resp.Error to the log when response is unknown Signed-off-by: Oleg Zaytsev --- pkg/frontend/v2/frontend_scheduler_worker.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/frontend/v2/frontend_scheduler_worker.go b/pkg/frontend/v2/frontend_scheduler_worker.go index 6b8268a1282..2480b43b427 100644 --- a/pkg/frontend/v2/frontend_scheduler_worker.go +++ b/pkg/frontend/v2/frontend_scheduler_worker.go @@ -338,7 +338,7 @@ func (w *frontendSchedulerWorker) schedulerLoop(loop schedulerpb.SchedulerForFro } default: - level.Error(w.log).Log("msg", "unknown response status from the scheduler", "status", resp.Status, "queryID", req.queryID) + level.Error(w.log).Log("msg", "unknown response status from the scheduler", "status", resp.Status, "error", resp.Error, "queryID", req.queryID) req.enqueue <- enqueueResult{status: failed} } From cc7ee62b0468bf4a429056405d5d53c980b5ae87 Mon Sep 17 00:00:00 2001 From: Oleg Zaytsev Date: Fri, 14 Jan 2022 09:21:38 +0100 Subject: [PATCH 6/6] Log the entire unknown response Signed-off-by: Oleg Zaytsev --- pkg/frontend/v2/frontend_scheduler_worker.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/frontend/v2/frontend_scheduler_worker.go b/pkg/frontend/v2/frontend_scheduler_worker.go index 2480b43b427..a6a131f3537 100644 --- a/pkg/frontend/v2/frontend_scheduler_worker.go +++ b/pkg/frontend/v2/frontend_scheduler_worker.go @@ -338,7 +338,7 @@ func (w *frontendSchedulerWorker) schedulerLoop(loop schedulerpb.SchedulerForFro } default: - level.Error(w.log).Log("msg", "unknown response status from the scheduler", "status", resp.Status, "error", resp.Error, "queryID", req.queryID) + level.Error(w.log).Log("msg", 
"unknown response status from the scheduler", "resp", resp, "queryID", req.queryID) req.enqueue <- enqueueResult{status: failed} }