Increase scheduler worker cancellation chan cap
With the previous implementation, if the worker was busy talking to the
scheduler, we didn't push the cancellation, leaving that query running.

When a query is cancelled, all of its subqueries are cancelled at the same
time, so this was most likely happening constantly: the first subquery
scheduled on a given worker was cancelled, but the rest were not, because
the worker was busy cancelling the first one.
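
To make the failure mode concrete, here is a minimal, self-contained sketch
(hypothetical code, not the Mimir implementation): with an unbuffered
cancellation channel, the frontend's non-blocking send drops every
cancellation that arrives while the worker goroutine is occupied, whereas a
buffered channel absorbs them until the worker gets back to its loop.

package main

import "fmt"

// sendCancellations mimics the frontend side: it attempts a non-blocking send
// of each query ID on cancelCh and counts how many were dropped because the
// (hypothetical) worker was not reading the channel at that moment.
func sendCancellations(cancelCh chan uint64, queryIDs []uint64) (dropped int) {
	for _, id := range queryIDs {
		select {
		case cancelCh <- id:
			// Cancellation handed over to the worker (or buffered).
		default:
			// Worker busy and no buffer space left: the cancellation is lost.
			dropped++
		}
	}
	return dropped
}

func main() {
	ids := []uint64{1, 2, 3, 4, 5}
	fmt.Println(sendCancellations(make(chan uint64), ids))       // unbuffered: 5 dropped
	fmt.Println(sendCancellations(make(chan uint64, 1000), ids)) // buffered: 0 dropped
}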

Also removed the `<-ctx.Done()` escape point when waiting for the
enqueueing ACK, and modified the enqueueing method to ensure that it
always sends a response.
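
Dropping that `<-ctx.Done()` branch is only safe if the enqueue path is
guaranteed to send exactly one result on the request's enqueue channel,
whatever happens (hence the new `default:` branch in schedulerLoop). The
sketch below illustrates that contract with hypothetical names
(frontendRequest, reportEnqueueResult); only waitForResponse and failed
mirror identifiers from the diff.

package main

import "fmt"

type enqueueStatus int

const (
	waitForResponse enqueueStatus = iota
	failed
)

type frontendRequest struct {
	enqueue chan enqueueStatus // capacity 1, so the sender never blocks
}

// reportEnqueueResult always sends exactly one status, so the caller may block
// on <-req.enqueue without also selecting on ctx.Done().
func reportEnqueueResult(req *frontendRequest, accepted bool) {
	if accepted {
		req.enqueue <- waitForResponse
		return
	}
	// Any other outcome (error, unknown scheduler status, ...) still answers.
	req.enqueue <- failed
}

func main() {
	req := &frontendRequest{enqueue: make(chan enqueueStatus, 1)}
	reportEnqueueResult(req, false)
	fmt.Println(<-req.enqueue == failed) // true: the caller is never left hanging
}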

Fixes: #740
Inspired by: grafana/loki#5113

Signed-off-by: Oleg Zaytsev <mail@olegzaytsev.com>
colega committed Jan 13, 2022
1 parent e2e2e10 commit cf1eb2a
Showing 3 changed files with 64 additions and 11 deletions.
12 changes: 3 additions & 9 deletions pkg/frontend/v2/frontend.go
@@ -196,21 +196,14 @@ func (f *Frontend) RoundTripGRPC(ctx context.Context, req *httpgrpc.HTTPRequest)
retries := f.cfg.WorkerConcurrency + 1 // To make sure we hit at least two different schedulers.

enqueueAgain:
var cancelCh chan<- uint64
select {
case <-ctx.Done():
return nil, ctx.Err()

case f.requestsCh <- freq:
// Enqueued, let's wait for response.
}

var cancelCh chan<- uint64

select {
case <-ctx.Done():
return nil, ctx.Err()

case enqRes := <-freq.enqueue:
enqRes := <-freq.enqueue
if enqRes.status == waitForResponse {
cancelCh = enqRes.cancelCh
break // go wait for response.
@@ -232,6 +225,7 @@ enqueueAgain:
// cancellation sent.
default:
// failed to cancel, ignore.
level.Warn(f.log).Log("msg", "failed to send cancellation request to scheduler, queue full")
}
}
return nil, ctx.Err()
14 changes: 12 additions & 2 deletions pkg/frontend/v2/frontend_scheduler_worker.go
@@ -26,7 +26,13 @@ import (
"github.com/grafana/mimir/pkg/util"
)

const schedulerAddressLabel = "scheduler_address"
const (
schedulerAddressLabel = "scheduler_address"
// schedulerWorkerCancelChanCapacity should be at least as big as the number of sub-queries issued by a single query
// per scheduler (after splitting and sharding) in order to allow all of them to be cancelled while the scheduler worker is busy.
// Since the channel holds uint64, this is 8KB per scheduler worker.
schedulerWorkerCancelChanCapacity = 1000
)

type frontendSchedulerWorkers struct {
services.Service
@@ -197,7 +203,7 @@ func newFrontendSchedulerWorker(conn *grpc.ClientConn, schedulerAddr string, fro
schedulerAddr: schedulerAddr,
frontendAddr: frontendAddr,
requestCh: requestCh,
cancelCh: make(chan uint64),
cancelCh: make(chan uint64, schedulerWorkerCancelChanCapacity),
enqueuedRequests: enqueuedRequests,
}
w.ctx, w.cancel = context.WithCancel(context.Background())
@@ -331,6 +337,10 @@ func (w *frontendSchedulerWorker) schedulerLoop(loop schedulerpb.SchedulerForFro
Body: []byte("too many outstanding requests"),
},
}

default:
level.Error(w.log).Log("msg", "unknown response status from the scheduler", "status", resp.Status, "queryID", req.queryID)
req.enqueue <- enqueueResult{status: failed}
}

case reqID := <-w.cancelCh:
49 changes: 49 additions & 0 deletions pkg/frontend/v2/frontend_test.go
@@ -228,6 +228,55 @@ func TestFrontendCancellation(t *testing.T) {
})
}

// If frontend workers are busy, a cancellation issued by the query frontend may not reach
// all the frontend workers, and thus may not reach the scheduler either.
// Issue: https://github.com/grafana/mimir/issues/740
func TestFrontendWorkerCancellation(t *testing.T) {
f, ms := setupFrontend(t, nil, nil)

ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
defer cancel()

// Send more requests than the scheduler worker concurrency, so that all frontend workers are kept busy serving requests.
reqCount := testFrontendWorkerConcurrency + 5
var wg sync.WaitGroup
for i := 0; i < reqCount; i++ {
wg.Add(1)
go func() {
defer wg.Done()
resp, err := f.RoundTripGRPC(user.InjectOrgID(ctx, "test"), &httpgrpc.HTTPRequest{})
require.EqualError(t, err, context.DeadlineExceeded.Error())
require.Nil(t, resp)
}()
}

wg.Wait()

// We wait a bit to make sure the scheduler receives the cancellation requests.
// 2 * reqCount because every request should also have a corresponding cancel request.
test.Poll(t, 5*time.Second, 2*reqCount, func() interface{} {
ms.mu.Lock()
defer ms.mu.Unlock()

return len(ms.msgs)
})

ms.checkWithLock(func() {
require.Equal(t, 2*reqCount, len(ms.msgs))
msgTypeCounts := map[schedulerpb.FrontendToSchedulerType]int{}
for _, msg := range ms.msgs {
msgTypeCounts[msg.Type]++
}
expectedMsgTypeCounts := map[schedulerpb.FrontendToSchedulerType]int{
schedulerpb.ENQUEUE: reqCount,
schedulerpb.CANCEL: reqCount,
}
require.Equalf(t, expectedMsgTypeCounts, msgTypeCounts,
"Should receive %d enqueue (%d) requests, and %d cancel (%d) requests.", reqCount, schedulerpb.ENQUEUE, reqCount, schedulerpb.CANCEL,
)
})
}

func TestFrontendFailedCancellation(t *testing.T) {
f, ms := setupFrontend(t, nil, nil)
