Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix cancel issue between Query Frontend and Query Schdeduler #5113

Merged
merged 12 commits into from
Jan 13, 2022
17 changes: 11 additions & 6 deletions pkg/lokifrontend/frontend/v2/frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,15 +220,20 @@ enqueueAgain:
return nil, httpgrpc.Errorf(http.StatusInternalServerError, "failed to enqueue request")
}

fmt.Println("waiting on either cancel or response", "cancelCh", cancelCh)
select {
case <-ctx.Done():
if cancelCh != nil {
select {
case cancelCh <- freq.queryID:
// cancellation sent.
default:
// failed to cancel, ignore.
}
// NOTE(kavi): I think we don't need buffer channel.
kavirajk marked this conversation as resolved.
Show resolved Hide resolved
// Let it block until it's workers receives it, We don't want to exist RoundTripGRPC without cancelling the downstream request started by frontend workers.
f.schedulerWorkers.sendRequestCancel(freq.queryID, cancelCh)

// select {
// case cancelCh <- freq.queryID:
// cancellation sent.
// default:
// // failed to cancel, ignore.
// }
}
return nil, ctx.Err()

Expand Down
17 changes: 17 additions & 0 deletions pkg/lokifrontend/frontend/v2/frontend_scheduler_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package v2

import (
"context"
"fmt"
"net/http"
"sync"
"time"
Expand Down Expand Up @@ -146,6 +147,20 @@ func (f *frontendSchedulerWorkers) getWorkersCount() int {
return len(f.workers)
}

// sendRequestCancel sends cancellation to the "already scheduled" frontendRequest.
// It will make sure the frontend worker that is responsible for the `reqID`
// receives the cancel signal.
func (f *frontendSchedulerWorkers) sendRequestCancel(reqID uint64, cancelCh chan<- uint64) {
f.mu.Lock()
defer f.mu.Unlock()
cyriltovena marked this conversation as resolved.
Show resolved Hide resolved

// There can be the case, where Frontend is running without any Frontend workers,
// sending cancel shoudn't block in those cases.
if len(f.workers) > 0 {
cancelCh <- reqID
}
}

func (f *frontendSchedulerWorkers) connectToScheduler(ctx context.Context, address string) (*grpc.ClientConn, error) {
// Because we only use single long-running method, it doesn't make sense to inject user ID, send over tracing or add metrics.
opts, err := f.cfg.GRPCClientConfig.DialOption(nil, nil)
Expand Down Expand Up @@ -276,6 +291,7 @@ func (w *frontendSchedulerWorker) schedulerLoop(loop schedulerpb.SchedulerForFro
return nil

case req := <-w.requestCh:
fmt.Println("Got the request", req.queryID)
err := loop.Send(&schedulerpb.FrontendToScheduler{
Type: schedulerpb.ENQUEUE,
QueryID: req.queryID,
Expand Down Expand Up @@ -325,6 +341,7 @@ func (w *frontendSchedulerWorker) schedulerLoop(loop schedulerpb.SchedulerForFro
}

case reqID := <-w.cancelCh:
fmt.Println("Got the cancel as well", reqID)
kavirajk marked this conversation as resolved.
Show resolved Hide resolved
err := loop.Send(&schedulerpb.FrontendToScheduler{
Type: schedulerpb.CANCEL,
QueryID: reqID,
Expand Down
39 changes: 39 additions & 0 deletions pkg/lokifrontend/frontend/v2/frontend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,45 @@ func TestFrontendCancellation(t *testing.T) {
})
}

// Bug: If FrontendWorkers are busy, cancellation passed by Query frontend may not reach
// all the frontend workers. (assumming we run multiple frontend workers)
func TestFrontendWorkerCancellation(t *testing.T) {
cyriltovena marked this conversation as resolved.
Show resolved Hide resolved
f, ms := setupFrontend(t, nil)

// fmt.Println("workers count", f.schedulerWorkers.getWorkersCount(), "max-concurrency per worker", f.cfg.WorkerConcurrency)

cyriltovena marked this conversation as resolved.
Show resolved Hide resolved
ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
defer cancel()

// send multiple requests > maxconcurrency of scheduler. So that it keeps all the frontend worker busy in serving requests.
reqCount := testFrontendWorkerConcurrency + 5
var wg sync.WaitGroup
for i := 0; i < reqCount; i++ {
wg.Add(1)
go func() {
defer wg.Done()
resp, err := f.RoundTripGRPC(user.InjectOrgID(ctx, "test"), &httpgrpc.HTTPRequest{})
require.EqualError(t, err, context.DeadlineExceeded.Error())
require.Nil(t, resp)
}()
}

wg.Wait()

// We wait a bit to make sure scheduler receives the cancellation request.
// 2 * reqCount because for every request, should also be corresponding cancel request
test.Poll(t, 5*time.Second, 2*reqCount, func() interface{} {
ms.mu.Lock()
defer ms.mu.Unlock()

return len(ms.msgs)
})

ms.checkWithLock(func() {
require.Equal(t, 2*reqCount, len(ms.msgs))
})
}

func TestFrontendFailedCancellation(t *testing.T) {
f, ms := setupFrontend(t, nil)

Expand Down