Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix cancel issue between Query Frontend and Query Scheduler #5113

Merged
merged 12 commits into from
Jan 13, 2022
11 changes: 3 additions & 8 deletions pkg/lokifrontend/frontend/v2/frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,21 +192,15 @@ func (f *Frontend) RoundTripGRPC(ctx context.Context, req *httpgrpc.HTTPRequest)
retries := f.cfg.WorkerConcurrency + 1 // To make sure we hit at least two different schedulers.

enqueueAgain:
var cancelCh chan<- uint64
select {
case <-ctx.Done():
return nil, ctx.Err()

case f.requestsCh <- freq:
// Enqueued, let's wait for response.
}

var cancelCh chan<- uint64

select {
case <-ctx.Done():
return nil, ctx.Err()
enqRes := <-freq.enqueue

case enqRes := <-freq.enqueue:
if enqRes.status == waitForResponse {
cancelCh = enqRes.cancelCh
break // go wait for response.
Expand All @@ -228,6 +222,7 @@ enqueueAgain:
// cancellation sent.
default:
// failed to cancel, ignore.
level.Warn(f.log).Log("msg", "failed to send cancellation request to scheduler, queue full")
}
}
return nil, ctx.Err()
Expand Down
5 changes: 4 additions & 1 deletion pkg/lokifrontend/frontend/v2/frontend_scheduler_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,8 @@ func newFrontendSchedulerWorker(conn *grpc.ClientConn, schedulerAddr string, fro
schedulerAddr: schedulerAddr,
frontendAddr: frontendAddr,
requestCh: requestCh,
cancelCh: make(chan uint64),
// Allow to enqueue enough cancellation requests. ~ 8MB memory size.
cancelCh: make(chan uint64, 1000000),
}
w.ctx, w.cancel = context.WithCancel(context.Background())

Expand Down Expand Up @@ -322,6 +323,8 @@ func (w *frontendSchedulerWorker) schedulerLoop(loop schedulerpb.SchedulerForFro
Body: []byte("too many outstanding requests"),
},
}
default:
req.enqueue <- enqueueResult{status: failed}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would add a warning here, or even an error: this shouldn't ever happen and indicates a bug.

}

case reqID := <-w.cancelCh:
Expand Down
38 changes: 38 additions & 0 deletions pkg/lokifrontend/frontend/v2/frontend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,44 @@ func TestFrontendCancellation(t *testing.T) {
})
}

// If FrontendWorkers are busy, cancellation passed by Query frontend may not reach
// all the frontend workers thus not reaching the scheduler as well.
// Issue: https://github.com/grafana/loki/issues/5132
func TestFrontendWorkerCancellation(t *testing.T) {
cyriltovena marked this conversation as resolved.
Show resolved Hide resolved
f, ms := setupFrontend(t, nil)

ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
defer cancel()

// send multiple requests > maxconcurrency of scheduler. So that it keeps all the frontend worker busy in serving requests.
reqCount := testFrontendWorkerConcurrency + 5
var wg sync.WaitGroup
for i := 0; i < reqCount; i++ {
wg.Add(1)
go func() {
defer wg.Done()
resp, err := f.RoundTripGRPC(user.InjectOrgID(ctx, "test"), &httpgrpc.HTTPRequest{})
require.EqualError(t, err, context.DeadlineExceeded.Error())
require.Nil(t, resp)
}()
}

wg.Wait()

// We wait a bit to make sure scheduler receives the cancellation request.
// 2 * reqCount because for every request, should also be corresponding cancel request
test.Poll(t, 5*time.Second, 2*reqCount, func() interface{} {
ms.mu.Lock()
defer ms.mu.Unlock()

return len(ms.msgs)
})

ms.checkWithLock(func() {
require.Equal(t, 2*reqCount, len(ms.msgs))
})
}

func TestFrontendFailedCancellation(t *testing.T) {
f, ms := setupFrontend(t, nil)

Expand Down