Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Increase scheduler worker cancellation chan cap #741

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@
* [BUGFIX] Ruler: fix formatting of rule groups in `/ruler/rule_groups` endpoint. #655
* [BUGFIX] Querier: Disable query scheduler SRV DNS lookup. #689
* [BUGFIX] Query-frontend: fix API error messages that were mentioning Prometheus `--enable-feature=promql-negative-offset` and `--enable-feature=promql-at-modifier` flags. #688
* [BUGFIX] Query-frontend: worker's cancellation channels are now buffered to ensure that all request cancellations are properly handled. #741

### Mixin (changes since `grafana/cortex-jsonnet` `1.9.0`)

Expand Down
12 changes: 3 additions & 9 deletions pkg/frontend/v2/frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,21 +196,14 @@ func (f *Frontend) RoundTripGRPC(ctx context.Context, req *httpgrpc.HTTPRequest)
retries := f.cfg.WorkerConcurrency + 1 // To make sure we hit at least two different schedulers.

enqueueAgain:
var cancelCh chan<- uint64
select {
case <-ctx.Done():
return nil, ctx.Err()

case f.requestsCh <- freq:
// Enqueued, let's wait for response.
}

var cancelCh chan<- uint64

select {
case <-ctx.Done():
return nil, ctx.Err()

case enqRes := <-freq.enqueue:
enqRes := <-freq.enqueue
colega marked this conversation as resolved.
Show resolved Hide resolved
if enqRes.status == waitForResponse {
cancelCh = enqRes.cancelCh
break // go wait for response.
Expand All @@ -232,6 +225,7 @@ enqueueAgain:
// cancellation sent.
default:
// failed to cancel, ignore.
level.Warn(f.log).Log("msg", "failed to send cancellation request to scheduler, queue full")
}
}
return nil, ctx.Err()
Expand Down
13 changes: 11 additions & 2 deletions pkg/frontend/v2/frontend_scheduler_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,12 @@ import (
"github.com/grafana/mimir/pkg/util"
)

const schedulerAddressLabel = "scheduler_address"
// schedulerAddressLabel names the scheduler-address attribute attached to
// worker-related diagnostics (presumably logs/metrics — confirm at usage sites).
const schedulerAddressLabel = "scheduler_address"

// schedulerWorkerCancelChanCapacity should be at least as big as the number of
// sub-queries issued by a single query per scheduler (after splitting and
// sharding), so that all of them can be canceled while the scheduler worker is
// busy.
const schedulerWorkerCancelChanCapacity = 1000

type frontendSchedulerWorkers struct {
services.Service
Expand Down Expand Up @@ -197,7 +202,7 @@ func newFrontendSchedulerWorker(conn *grpc.ClientConn, schedulerAddr string, fro
schedulerAddr: schedulerAddr,
frontendAddr: frontendAddr,
requestCh: requestCh,
cancelCh: make(chan uint64),
cancelCh: make(chan uint64, schedulerWorkerCancelChanCapacity),
enqueuedRequests: enqueuedRequests,
}
w.ctx, w.cancel = context.WithCancel(context.Background())
Expand Down Expand Up @@ -331,6 +336,10 @@ func (w *frontendSchedulerWorker) schedulerLoop(loop schedulerpb.SchedulerForFro
Body: []byte("too many outstanding requests"),
},
}

default:
level.Error(w.log).Log("msg", "unknown response status from the scheduler", "resp", resp, "queryID", req.queryID)
req.enqueue <- enqueueResult{status: failed}
}

case reqID := <-w.cancelCh:
Expand Down
49 changes: 49 additions & 0 deletions pkg/frontend/v2/frontend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,55 @@ func TestFrontendCancellation(t *testing.T) {
})
}

// When the frontendWorker that processed the request is busy (processing a new
// request or cancelling a previous one) we still need to make sure that the
// cancellation reaches the scheduler at some point.
// Issue: https://github.com/grafana/mimir/issues/740
func TestFrontendWorkerCancellation(t *testing.T) {
	f, ms := setupFrontend(t, nil, nil)

	ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
	defer cancel()

	// Send more requests than the scheduler's max concurrency, so that all frontend
	// workers are kept busy serving requests while the context deadline expires.
	reqCount := testFrontendWorkerConcurrency + 5
	var wg sync.WaitGroup
	for i := 0; i < reqCount; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			resp, err := f.RoundTripGRPC(user.InjectOrgID(ctx, "test"), &httpgrpc.HTTPRequest{})
			// Don't use require.* here: require calls t.FailNow(), which must only be
			// invoked from the goroutine running the test function. t.Errorf is
			// goroutine-safe.
			if err == nil || err.Error() != context.DeadlineExceeded.Error() {
				t.Errorf("expected error %v, got %v", context.DeadlineExceeded, err)
			}
			if resp != nil {
				t.Errorf("expected nil response, got %v", resp)
			}
		}()
	}

	wg.Wait()

	// Wait a bit to make sure the scheduler receives the cancellation requests.
	// 2 * reqCount because every enqueued request should be followed by a
	// corresponding cancel request.
	test.Poll(t, 5*time.Second, 2*reqCount, func() interface{} {
		ms.mu.Lock()
		defer ms.mu.Unlock()

		return len(ms.msgs)
	})

	ms.checkWithLock(func() {
		require.Equal(t, 2*reqCount, len(ms.msgs))
		msgTypeCounts := map[schedulerpb.FrontendToSchedulerType]int{}
		for _, msg := range ms.msgs {
			msgTypeCounts[msg.Type]++
		}
		expectedMsgTypeCounts := map[schedulerpb.FrontendToSchedulerType]int{
			schedulerpb.ENQUEUE: reqCount,
			schedulerpb.CANCEL:  reqCount,
		}
		require.Equalf(t, expectedMsgTypeCounts, msgTypeCounts,
			"Should receive %d enqueue (%d) requests, and %d cancel (%d) requests.", reqCount, schedulerpb.ENQUEUE, reqCount, schedulerpb.CANCEL,
		)
	})
}

func TestFrontendFailedCancellation(t *testing.T) {
f, ms := setupFrontend(t, nil, nil)

Expand Down