Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix cancel issue between Query Frontend and Query Scheduler #5113

Merged
merged 12 commits into from
Jan 13, 2022
17 changes: 11 additions & 6 deletions pkg/lokifrontend/frontend/v2/frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,15 +220,20 @@ enqueueAgain:
return nil, httpgrpc.Errorf(http.StatusInternalServerError, "failed to enqueue request")
}

fmt.Println("waiting on either cancel or response", "cancelCh", cancelCh)
select {
case <-ctx.Done():
if cancelCh != nil {
select {
case cancelCh <- freq.queryID:
// cancellation sent.
default:
// failed to cancel, ignore.
}
// NOTE(kavi): I think we don't need a buffered channel.
kavirajk marked this conversation as resolved.
Show resolved Hide resolved
// Let it block until the worker receives it. We don't want to exit RoundTripGRPC without cancelling the frontend worker.
cancelCh <- freq.queryID

// select {
// case cancelCh <- freq.queryID:
// cancellation sent.
// default:
// // failed to cancel, ignore.
// }
}
return nil, ctx.Err()

Expand Down
3 changes: 3 additions & 0 deletions pkg/lokifrontend/frontend/v2/frontend_scheduler_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package v2

import (
"context"
"fmt"
"net/http"
"sync"
"time"
Expand Down Expand Up @@ -276,6 +277,7 @@ func (w *frontendSchedulerWorker) schedulerLoop(loop schedulerpb.SchedulerForFro
return nil

case req := <-w.requestCh:
fmt.Println("Got the request", req.queryID)
err := loop.Send(&schedulerpb.FrontendToScheduler{
Type: schedulerpb.ENQUEUE,
QueryID: req.queryID,
Expand Down Expand Up @@ -325,6 +327,7 @@ func (w *frontendSchedulerWorker) schedulerLoop(loop schedulerpb.SchedulerForFro
}

case reqID := <-w.cancelCh:
fmt.Println("Got the cancel as well", reqID)
kavirajk marked this conversation as resolved.
Show resolved Hide resolved
err := loop.Send(&schedulerpb.FrontendToScheduler{
Type: schedulerpb.CANCEL,
QueryID: reqID,
Expand Down
44 changes: 44 additions & 0 deletions pkg/lokifrontend/frontend/v2/frontend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,50 @@ func TestFrontendCancellation(t *testing.T) {
})
}

// Bug: If the frontend workers are busy, a cancellation sent by the query frontend may not reach
// all of the frontend workers (assuming we run multiple frontend workers).
func TestFrontendWorkerCancellation(t *testing.T) {
cyriltovena marked this conversation as resolved.
Show resolved Hide resolved
f, ms := setupFrontend(t, nil)

// fmt.Println("workers count", f.schedulerWorkers.getWorkersCount(), "max-concurrency per worker", f.cfg.WorkerConcurrency)

cyriltovena marked this conversation as resolved.
Show resolved Hide resolved
ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
defer cancel()

// send multiple requests > maxconcurrency of scheduler. So that it keeps all the frontend worker busy in serving requests.
reqCount := testFrontendWorkerConcurrency + 5
var wg sync.WaitGroup
for i := 0; i < reqCount; i++ {
wg.Add(1)
go func() {
defer wg.Done()
resp, err := f.RoundTripGRPC(user.InjectOrgID(ctx, "test"), &httpgrpc.HTTPRequest{})
require.EqualError(t, err, context.DeadlineExceeded.Error())
require.Nil(t, resp)
}()
}

wg.Wait()

// We wait a bit to make sure scheduler receives the cancellation request.
// 2 * reqCount because for every request, should also be corresponding cancel request
test.Poll(t, time.Second, 2*reqCount, func() interface{} {
ms.mu.Lock()
defer ms.mu.Unlock()

return len(ms.msgs)
})

ms.checkWithLock(func() {
require.Equal(t, 2*reqCount, len(ms.msgs))

// require.True(t, ms.msgs[0].Type == schedulerpb.ENQUEUE)
// require.True(t, ms.msgs[1].Type == schedulerpb.CANCEL)
// require.True(t, ms.msgs[0].QueryID == ms.msgs[1].QueryID)
// fmt.Println(ms.msgs[0].QueryID, ms.msgs[1].QueryID)
})
}

func TestFrontendFailedCancellation(t *testing.T) {
f, ms := setupFrontend(t, nil)

Expand Down