From 220663aaf5a99e85019e33f0593a31bf57521dc5 Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Mon, 2 Aug 2021 19:42:21 -0700 Subject: [PATCH 1/8] fix(ipld): fix hang in code on race fix hang in code due to potential race, mark this code in need of replacement --- ipldutil/traverser.go | 7 +++++++ ipldutil/traverser_test.go | 17 +++++++++++++++++ 2 files changed, 24 insertions(+) diff --git a/ipldutil/traverser.go b/ipldutil/traverser.go index 85aa9c6d..12756d1e 100644 --- a/ipldutil/traverser.go +++ b/ipldutil/traverser.go @@ -10,6 +10,12 @@ import ( "github.com/ipld/go-ipld-prime/traversal/selector" ) +/* TODO: This traverser creates an extra go-routine and is quite complicated, in order to give calling code control of +a selector traversal. If it were implemented inside of go-ipld-primes traversal library, with access to private functions, +it could be done without an extra go-routine, avoiding the possibility of races and simplifying implementation. This has +been documented here: https://github.com/ipld/go-ipld-prime/issues/213 -- and when this issue is implemented, this traverser +can go away */ + var defaultVisitor traversal.AdvVisitFn = func(traversal.Progress, ipld.Node, traversal.VisitReason) error { return nil } // ContextCancelError is a sentinel that indicates the passed in context @@ -137,6 +143,7 @@ func (t *traverser) writeDone(err error) { func (t *traverser) start() { select { case <-t.ctx.Done(): + close(t.stopped) return case t.awaitRequest <- struct{}{}: } diff --git a/ipldutil/traverser_test.go b/ipldutil/traverser_test.go index 54ba4acd..1015254e 100644 --- a/ipldutil/traverser_test.go +++ b/ipldutil/traverser_test.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "testing" + "time" blocks "github.com/ipfs/go-block-format" ipld "github.com/ipld/go-ipld-prime" @@ -21,6 +22,22 @@ import ( func TestTraverser(t *testing.T) { ctx := context.Background() + t.Run("started with shutdown context, then shutdown", func(t *testing.T) { + cancelledCtx, cancel := context.WithCancel(ctx) + cancel() + testdata := testutil.NewTestIPLDTree() + ssb := builder.NewSelectorSpecBuilder(basicnode.Prototype.Any) + sel := ssb.ExploreRecursive(selector.RecursionLimitNone(), ssb.ExploreAll(ssb.ExploreRecursiveEdge())).Node() + traverser := TraversalBuilder{ + Root: testdata.RootNodeLnk, + Selector: sel, + }.Start(cancelledCtx) + timeoutCtx, cancel := context.WithTimeout(ctx, 2*time.Second) + defer cancel() + traverser.Shutdown(timeoutCtx) + require.NoError(t, timeoutCtx.Err()) + }) + t.Run("traverses correctly, simple struct", func(t *testing.T) { testdata := testutil.NewTestIPLDTree() ssb := builder.NewSelectorSpecBuilder(basicnode.Prototype.Any) From 77628ecd57dd55f8233142602dc7bd3bd6367dd3 Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Mon, 2 Aug 2021 19:43:51 -0700 Subject: [PATCH 2/8] feat(requestmanager): add CancelRequest method add a cancel request that waits for the request to fully complete. add test that verifies no race. --- graphsync.go | 3 ++ impl/graphsync.go | 7 +++- requestmanager/executor/executor.go | 10 ++--- requestmanager/requestmanager.go | 51 ++++++++++++++++++------ requestmanager/requestmanager_test.go | 57 ++++++++++++++++++++++++++- 5 files changed, 108 insertions(+), 20 deletions(-) diff --git a/graphsync.go b/graphsync.go index e3076145..6f5a8d91 100644 --- a/graphsync.go +++ b/graphsync.go @@ -369,4 +369,7 @@ type GraphExchange interface { // CancelResponse cancels an in progress response CancelResponse(peer.ID, RequestID) error + + // CancelRequest cancels an in progress response + CancelRequest(RequestID) error } diff --git a/impl/graphsync.go b/impl/graphsync.go index 76d6b7dc..8e4cb3c8 100644 --- a/impl/graphsync.go +++ b/impl/graphsync.go @@ -195,7 +195,7 @@ func (gs *GraphSync) RegisterIncomingRequestHook(hook graphsync.OnIncomingReques return gs.incomingRequestHooks.Register(hook) } -// RegisterIncomingRequestHook adds a hook that runs when a new incoming request is added +// RegisterIncomingRequestQueuedHook adds a hook that runs when a new incoming request is added // to the responder's task queue. func (gs *GraphSync) RegisterIncomingRequestQueuedHook(hook graphsync.OnIncomingRequestQueuedHook) graphsync.UnregisterHookFunc { return gs.incomingRequestQueuedHooks.Register(hook) @@ -296,6 +296,11 @@ func (gs *GraphSync) CancelResponse(p peer.ID, requestID graphsync.RequestID) er return gs.responseManager.CancelResponse(p, requestID) } +// CancelRequest cancels an in progress request +func (gs *GraphSync) CancelRequest(requestID graphsync.RequestID) error { + return gs.requestManager.CancelRequest(requestID) +} + type graphSyncReceiver GraphSync func (gsr *graphSyncReceiver) graphSync() *GraphSync { diff --git a/requestmanager/executor/executor.go b/requestmanager/executor/executor.go index bef1274e..b442b290 100644 --- a/requestmanager/executor/executor.go +++ b/requestmanager/executor/executor.go @@ -38,7 +38,7 @@ type ExecutionEnv struct { type RequestExecution struct { Ctx context.Context P peer.ID - NetworkError chan error + TerminalError chan error Request gsmsg.GraphSyncRequest LastResponse *atomic.Value DoNotSendCids *cid.Set @@ -54,7 +54,7 @@ func (ee ExecutionEnv) Start(re RequestExecution) (chan graphsync.ResponseProgre inProgressErr: make(chan error), ctx: re.Ctx, p: re.P, - networkError: re.NetworkError, + terminalError: re.TerminalError, request: re.Request, lastResponse: re.LastResponse, doNotSendCids: re.DoNotSendCids, @@ -73,7 +73,7 @@ type requestExecutor struct { inProgressErr chan error ctx context.Context p peer.ID - networkError chan error + terminalError chan error request gsmsg.GraphSyncRequest lastResponse *atomic.Value nodeStyleChooser traversal.LinkTargetNodePrototypeChooser @@ -153,9 +153,9 @@ func (re *requestExecutor) run() { } } select { - case networkError := <-re.networkError: + case terminalError := <-re.terminalError: select { - case re.inProgressErr <- networkError: + case re.inProgressErr <- terminalError: case <-re.env.Ctx.Done(): } default: diff --git a/requestmanager/requestmanager.go b/requestmanager/requestmanager.go index cfff354b..c51f0c09 100644 --- a/requestmanager/requestmanager.go +++ b/requestmanager/requestmanager.go @@ -43,11 +43,12 @@ type inProgressRequestStatus struct { startTime time.Time cancelFn func() p peer.ID - networkError chan error + terminalError chan error resumeMessages chan []graphsync.ExtensionData pauseMessages chan struct{} paused bool lastResponse atomic.Value + onTerminated []chan error } // PeerHandler is an interface that can send requests to peers @@ -234,8 +235,10 @@ func (rm *RequestManager) singleErrorResponse(err error) (chan graphsync.Respons } type cancelRequestMessage struct { - requestID graphsync.RequestID - isPause bool + requestID graphsync.RequestID + isPause bool + onTerminated chan error + terminalError error } func (rm *RequestManager) cancelRequest(requestID graphsync.RequestID, @@ -244,7 +247,7 @@ func (rm *RequestManager) cancelRequest(requestID graphsync.RequestID, cancelMessageChannel := rm.messages for cancelMessageChannel != nil || incomingResponses != nil || incomingErrors != nil { select { - case cancelMessageChannel <- &cancelRequestMessage{requestID, false}: + case cancelMessageChannel <- &cancelRequestMessage{requestID, false, nil, nil}: cancelMessageChannel = nil // clear out any remaining responses, in case and "incoming reponse" // messages get processed before our cancel message @@ -262,6 +265,12 @@ func (rm *RequestManager) cancelRequest(requestID graphsync.RequestID, } } +// CancelRequest cancels the given request ID and waits for the request to terminate +func (rm *RequestManager) CancelRequest(requestID graphsync.RequestID) error { + terminated := make(chan error, 1) + return rm.sendSyncMessage(&cancelRequestMessage{requestID, false, terminated, graphsync.RequestContextCancelledErr{}}, terminated) +} + type processResponseMessage struct { p peer.ID responses []gsmsg.GraphSyncResponse @@ -374,9 +383,9 @@ func (nrm *newRequestMessage) setupRequest(requestID graphsync.RequestID, rm *Re p := nrm.p resumeMessages := make(chan []graphsync.ExtensionData, 1) pauseMessages := make(chan struct{}, 1) - networkError := make(chan error, 1) + terminalError := make(chan error, 1) requestStatus := &inProgressRequestStatus{ - ctx: ctx, startTime: time.Now(), cancelFn: cancel, p: p, resumeMessages: resumeMessages, pauseMessages: pauseMessages, networkError: networkError, + ctx: ctx, startTime: time.Now(), cancelFn: cancel, p: p, resumeMessages: resumeMessages, pauseMessages: pauseMessages, terminalError: terminalError, } lastResponse := &requestStatus.lastResponse lastResponse.Store(gsmsg.NewResponse(request.ID(), graphsync.RequestAcknowledged)) @@ -392,7 +401,7 @@ func (nrm *newRequestMessage) setupRequest(requestID graphsync.RequestID, rm *Re Ctx: ctx, P: p, Request: request, - NetworkError: networkError, + TerminalError: terminalError, LastResponse: lastResponse, DoNotSendCids: doNotSendCids, NodePrototypeChooser: hooksResult.CustomChooser, @@ -421,6 +430,14 @@ func (trm *terminateRequestMessage) handle(rm *RequestManager) { } delete(rm.inProgressRequestStatuses, trm.requestID) rm.asyncLoader.CleanupRequest(trm.requestID) + if ok { + for _, onTerminated := range ipr.onTerminated { + select { + case <-rm.ctx.Done(): + case onTerminated <- nil: + } + } + } } func (crm *cancelRequestMessage) handle(rm *RequestManager) { @@ -429,6 +446,16 @@ func (crm *cancelRequestMessage) handle(rm *RequestManager) { return } + if crm.onTerminated != nil { + inProgressRequestStatus.onTerminated = append(inProgressRequestStatus.onTerminated, crm.onTerminated) + } + if crm.terminalError != nil { + select { + case inProgressRequestStatus.terminalError <- crm.terminalError: + default: + } + } + rm.sendRequest(inProgressRequestStatus.p, gsmsg.CancelRequest(crm.requestID)) if crm.isPause { inProgressRequestStatus.paused = true @@ -488,8 +515,8 @@ func (rm *RequestManager) processExtensionsForResponse(p peer.ID, response gsmsg } responseError := rm.generateResponseErrorFromStatus(graphsync.RequestFailedUnknown) select { - case requestStatus.networkError <- responseError: - case <-requestStatus.ctx.Done(): + case requestStatus.terminalError <- responseError: + default: } rm.sendRequest(p, gsmsg.CancelRequest(response.RequestID())) requestStatus.cancelFn() @@ -505,8 +532,8 @@ func (rm *RequestManager) processTerminations(responses []gsmsg.GraphSyncRespons requestStatus := rm.inProgressRequestStatuses[response.RequestID()] responseError := rm.generateResponseErrorFromStatus(response.Status()) select { - case requestStatus.networkError <- responseError: - case <-requestStatus.ctx.Done(): + case requestStatus.terminalError <- responseError: + default: } requestStatus.cancelFn() } @@ -542,7 +569,7 @@ func (rm *RequestManager) processBlockHooks(p peer.ID, response graphsync.Respon _, isPause := result.Err.(hooks.ErrPaused) select { case <-rm.ctx.Done(): - case rm.messages <- &cancelRequestMessage{response.RequestID(), isPause}: + case rm.messages <- &cancelRequestMessage{response.RequestID(), isPause, nil, nil}: } } return result.Err diff --git a/requestmanager/requestmanager_test.go b/requestmanager/requestmanager_test.go index 6c514a43..8a78dbf5 100644 --- a/requestmanager/requestmanager_test.go +++ b/requestmanager/requestmanager_test.go @@ -215,6 +215,59 @@ func TestCancelRequestInProgress(t *testing.T) { _, ok := errors[0].(graphsync.RequestContextCancelledErr) require.True(t, ok) } +func TestCancelRequestImperativeNoMoreBlocks(t *testing.T) { + ctx := context.Background() + td := newTestData(ctx, t) + requestCtx, cancel := context.WithTimeout(ctx, time.Second) + defer cancel() + peers := testutil.GeneratePeers(1) + + postCancel := make(chan struct{}, 1) + loadPostCancel := make(chan struct{}, 1) + td.fal.OnAsyncLoad(func(graphsync.RequestID, ipld.Link, <-chan types.AsyncLoadResult) { + select { + case <-postCancel: + loadPostCancel <- struct{}{} + default: + } + }) + + _, returnedErrorChan1 := td.requestManager.SendRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector()) + + requestRecords := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1) + + go func() { + firstBlocks := td.blockChain.Blocks(0, 3) + firstMetadata := encodedMetadataForBlocks(t, firstBlocks, true) + firstResponses := []gsmsg.GraphSyncResponse{ + gsmsg.NewResponse(requestRecords[0].gsr.ID(), graphsync.PartialResponse, firstMetadata), + } + td.requestManager.ProcessResponses(peers[0], firstResponses, firstBlocks) + td.fal.SuccessResponseOn(requestRecords[0].gsr.ID(), firstBlocks) + }() + fmt.Println("her") + + err := td.requestManager.CancelRequest(requestRecords[0].gsr.ID()) + require.NoError(t, err) + postCancel <- struct{}{} + + rr := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0] + + require.True(t, rr.gsr.IsCancel()) + require.Equal(t, requestRecords[0].gsr.ID(), rr.gsr.ID()) + + errors := testutil.CollectErrors(requestCtx, t, returnedErrorChan1) + require.Len(t, errors, 1) + _, ok := errors[0].(graphsync.RequestContextCancelledErr) + require.True(t, ok) + fmt.Println("here") + select { + case <-loadPostCancel: + t.Fatalf("Loaded block after cancel") + case <-requestCtx.Done(): + } + fmt.Println("here2") +} func TestCancelManagerExitsGracefully(t *testing.T) { ctx := context.Background() @@ -375,13 +428,13 @@ func TestDisconnectNotification(t *testing.T) { select { case <-networkErrors: t.Fatal("should not fire network error when unrelated peer disconnects") - default: + default: } // Disconnect the target peer, should fire a network error td.requestManager.Disconnected(targetPeer) select { - case p:= <-networkErrors: + case p := <-networkErrors: require.Equal(t, p, targetPeer) default: t.Fatal("should fire network error when peer disconnects") From 8521bb661fe20c7d61f5d75e487852f020fcadd5 Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Mon, 2 Aug 2021 19:56:48 -0700 Subject: [PATCH 3/8] refactor(graphsync): rename RequestContextCancelledErr rename RequestContextCancelledErr to reflect reason for err --- graphsync.go | 9 +++++---- requestmanager/requestmanager.go | 2 +- requestmanager/requestmanager_test.go | 4 ++-- requestmanager/responsecollector.go | 4 ++-- 4 files changed, 10 insertions(+), 9 deletions(-) diff --git a/graphsync.go b/graphsync.go index 6f5a8d91..22585e8e 100644 --- a/graphsync.go +++ b/graphsync.go @@ -94,11 +94,12 @@ const ( RequestCancelled = ResponseStatusCode(35) ) -// RequestContextCancelledErr is an error message received on the error channel when the request context given by the user is cancelled/times out -type RequestContextCancelledErr struct{} +// RequestClientCancelledErr is an error message received on the error channel when the request is cancelled on by the client code, +// either by closing the passed request context or calling CancelRequest +type RequestClientCancelledErr struct{} -func (e RequestContextCancelledErr) Error() string { - return "Request Context Cancelled" +func (e RequestClientCancelledErr) Error() string { + return "Request Cancelled By Client" } // RequestFailedBusyErr is an error message received on the error channel when the peer is busy diff --git a/requestmanager/requestmanager.go b/requestmanager/requestmanager.go index c51f0c09..ec8505cb 100644 --- a/requestmanager/requestmanager.go +++ b/requestmanager/requestmanager.go @@ -268,7 +268,7 @@ func (rm *RequestManager) cancelRequest(requestID graphsync.RequestID, // CancelRequest cancels the given request ID and waits for the request to terminate func (rm *RequestManager) CancelRequest(requestID graphsync.RequestID) error { terminated := make(chan error, 1) - return rm.sendSyncMessage(&cancelRequestMessage{requestID, false, terminated, graphsync.RequestContextCancelledErr{}}, terminated) + return rm.sendSyncMessage(&cancelRequestMessage{requestID, false, terminated, graphsync.RequestClientCancelledErr{}}, terminated) } type processResponseMessage struct { diff --git a/requestmanager/requestmanager_test.go b/requestmanager/requestmanager_test.go index 8a78dbf5..f3c5f73c 100644 --- a/requestmanager/requestmanager_test.go +++ b/requestmanager/requestmanager_test.go @@ -212,7 +212,7 @@ func TestCancelRequestInProgress(t *testing.T) { errors := testutil.CollectErrors(requestCtx, t, returnedErrorChan1) require.Len(t, errors, 1) - _, ok := errors[0].(graphsync.RequestContextCancelledErr) + _, ok := errors[0].(graphsync.RequestClientCancelledErr) require.True(t, ok) } func TestCancelRequestImperativeNoMoreBlocks(t *testing.T) { @@ -258,7 +258,7 @@ func TestCancelRequestImperativeNoMoreBlocks(t *testing.T) { errors := testutil.CollectErrors(requestCtx, t, returnedErrorChan1) require.Len(t, errors, 1) - _, ok := errors[0].(graphsync.RequestContextCancelledErr) + _, ok := errors[0].(graphsync.RequestClientCancelledErr) require.True(t, ok) fmt.Println("here") select { diff --git a/requestmanager/responsecollector.go b/requestmanager/responsecollector.go index 062ca47c..f348c61c 100644 --- a/requestmanager/responsecollector.go +++ b/requestmanager/responsecollector.go @@ -85,7 +85,7 @@ func (rc *responseCollector) collectResponses( case <-requestCtx.Done(): select { case <-rc.ctx.Done(): - case returnedErrors <- graphsync.RequestContextCancelledErr{}: + case returnedErrors <- graphsync.RequestClientCancelledErr{}: } return case err, ok := <-incomingErrors: @@ -97,7 +97,7 @@ func (rc *responseCollector) collectResponses( case <-requestCtx.Done(): select { case <-rc.ctx.Done(): - case returnedErrors <- graphsync.RequestContextCancelledErr{}: + case returnedErrors <- graphsync.RequestClientCancelledErr{}: } default: } From e4ae94181254c7ab2f9a5871b8d2449ce40cdd9f Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Mon, 2 Aug 2021 19:57:34 -0700 Subject: [PATCH 4/8] fix(requestmanager): handle edge case for terminated notification --- requestmanager/requestmanager.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/requestmanager/requestmanager.go b/requestmanager/requestmanager.go index ec8505cb..659997e0 100644 --- a/requestmanager/requestmanager.go +++ b/requestmanager/requestmanager.go @@ -443,6 +443,12 @@ func (trm *terminateRequestMessage) handle(rm *RequestManager) { func (crm *cancelRequestMessage) handle(rm *RequestManager) { inProgressRequestStatus, ok := rm.inProgressRequestStatuses[crm.requestID] if !ok { + if crm.onTerminated != nil { + select { + case crm.onTerminated <- errors.New("request not found"): + case <-rm.ctx.Done(): + } + } return } From d2f164ceebd1ab7179b875635dc77886df55a9e4 Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Mon, 2 Aug 2021 20:02:44 -0700 Subject: [PATCH 5/8] fix(impl): finish rename --- impl/graphsync_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/impl/graphsync_test.go b/impl/graphsync_test.go index b2baa1c0..5bc92e8f 100644 --- a/impl/graphsync_test.go +++ b/impl/graphsync_test.go @@ -617,7 +617,7 @@ func TestNetworkDisconnect(t *testing.T) { testutil.AssertReceive(ctx, t, networkError, &err, "should receive network error") testutil.AssertReceive(ctx, t, errChan, &err, "should receive an error") - require.EqualError(t, err, graphsync.RequestContextCancelledErr{}.Error()) + require.EqualError(t, err, graphsync.RequestClientCancelledErr{}.Error()) testutil.AssertReceive(ctx, t, receiverError, &err, "should receive an error on receiver side") } @@ -653,7 +653,7 @@ func TestConnectFail(t *testing.T) { var err error testutil.AssertReceive(ctx, t, reqNetworkError, &err, "should receive network error") testutil.AssertReceive(ctx, t, errChan, &err, "should receive an error") - require.EqualError(t, err, graphsync.RequestContextCancelledErr{}.Error()) + require.EqualError(t, err, graphsync.RequestClientCancelledErr{}.Error()) } func TestGraphsyncRoundTripAlternatePersistenceAndNodes(t *testing.T) { From fad302069a6f0cdf3f31f01e28661ac2d7d48995 Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Tue, 3 Aug 2021 01:29:10 -0700 Subject: [PATCH 6/8] refactor(requestmanager): add ctx to cancel request add a context that can timeout a cancel request if it doesn't finish in a set time --- graphsync.go | 2 +- impl/graphsync.go | 4 ++-- requestmanager/requestmanager.go | 12 +++++++----- requestmanager/requestmanager_test.go | 5 +++-- 4 files changed, 13 insertions(+), 10 deletions(-) diff --git a/graphsync.go b/graphsync.go index 22585e8e..d6a5b0b1 100644 --- a/graphsync.go +++ b/graphsync.go @@ -372,5 +372,5 @@ type GraphExchange interface { CancelResponse(peer.ID, RequestID) error // CancelRequest cancels an in progress response - CancelRequest(RequestID) error + CancelRequest(context.Context, RequestID) error } diff --git a/impl/graphsync.go b/impl/graphsync.go index 8e4cb3c8..ce16c734 100644 --- a/impl/graphsync.go +++ b/impl/graphsync.go @@ -297,8 +297,8 @@ func (gs *GraphSync) CancelResponse(p peer.ID, requestID graphsync.RequestID) er } // CancelRequest cancels an in progress request -func (gs *GraphSync) CancelRequest(requestID graphsync.RequestID) error { - return gs.requestManager.CancelRequest(requestID) +func (gs *GraphSync) CancelRequest(ctx context.Context, requestID graphsync.RequestID) error { + return gs.requestManager.CancelRequest(ctx, requestID) } type graphSyncReceiver GraphSync diff --git a/requestmanager/requestmanager.go b/requestmanager/requestmanager.go index 659997e0..43e2987d 100644 --- a/requestmanager/requestmanager.go +++ b/requestmanager/requestmanager.go @@ -266,9 +266,9 @@ func (rm *RequestManager) cancelRequest(requestID graphsync.RequestID, } // CancelRequest cancels the given request ID and waits for the request to terminate -func (rm *RequestManager) CancelRequest(requestID graphsync.RequestID) error { +func (rm *RequestManager) CancelRequest(ctx context.Context, requestID graphsync.RequestID) error { terminated := make(chan error, 1) - return rm.sendSyncMessage(&cancelRequestMessage{requestID, false, terminated, graphsync.RequestClientCancelledErr{}}, terminated) + return rm.sendSyncMessage(&cancelRequestMessage{requestID, false, terminated, graphsync.RequestClientCancelledErr{}}, terminated, ctx.Done()) } type processResponseMessage struct { @@ -297,7 +297,7 @@ type unpauseRequestMessage struct { // Can also send extensions with unpause func (rm *RequestManager) UnpauseRequest(requestID graphsync.RequestID, extensions ...graphsync.ExtensionData) error { response := make(chan error, 1) - return rm.sendSyncMessage(&unpauseRequestMessage{requestID, extensions, response}, response) + return rm.sendSyncMessage(&unpauseRequestMessage{requestID, extensions, response}, response, nil) } type pauseRequestMessage struct { @@ -308,10 +308,10 @@ type pauseRequestMessage struct { // PauseRequest pauses an in progress request (may take 1 or more blocks to process) func (rm *RequestManager) PauseRequest(requestID graphsync.RequestID) error { response := make(chan error, 1) - return rm.sendSyncMessage(&pauseRequestMessage{requestID, response}, response) + return rm.sendSyncMessage(&pauseRequestMessage{requestID, response}, response, nil) } -func (rm *RequestManager) sendSyncMessage(message requestManagerMessage, response chan error) error { +func (rm *RequestManager) sendSyncMessage(message requestManagerMessage, response chan error, done <-chan struct{}) error { select { case <-rm.ctx.Done(): return errors.New("Context Cancelled") @@ -320,6 +320,8 @@ func (rm *RequestManager) sendSyncMessage(message requestManagerMessage, respons select { case <-rm.ctx.Done(): return errors.New("Context Cancelled") + case <-done: + return errors.New("Context Cancelled") case err := <-response: return err } diff --git a/requestmanager/requestmanager_test.go b/requestmanager/requestmanager_test.go index f3c5f73c..73f1ed49 100644 --- a/requestmanager/requestmanager_test.go +++ b/requestmanager/requestmanager_test.go @@ -245,9 +245,10 @@ func TestCancelRequestImperativeNoMoreBlocks(t *testing.T) { td.requestManager.ProcessResponses(peers[0], firstResponses, firstBlocks) td.fal.SuccessResponseOn(requestRecords[0].gsr.ID(), firstBlocks) }() - fmt.Println("her") - err := td.requestManager.CancelRequest(requestRecords[0].gsr.ID()) + timeoutCtx, timeoutCancel := context.WithTimeout(ctx, time.Second) + defer timeoutCancel() + err := td.requestManager.CancelRequest(timeoutCtx, requestRecords[0].gsr.ID()) require.NoError(t, err) postCancel <- struct{}{} From 2c8f175d470319ce75504baf550918791f27889f Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Tue, 3 Aug 2021 10:36:19 -0700 Subject: [PATCH 7/8] refactor(requestmanager): fix pr comments --- requestmanager/requestmanager.go | 2 ++ requestmanager/requestmanager_test.go | 2 -- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/requestmanager/requestmanager.go b/requestmanager/requestmanager.go index 43e2987d..9af6c715 100644 --- a/requestmanager/requestmanager.go +++ b/requestmanager/requestmanager.go @@ -315,6 +315,8 @@ func (rm *RequestManager) sendSyncMessage(message requestManagerMessage, respons select { case <-rm.ctx.Done(): return errors.New("Context Cancelled") + case <-done: + return errors.New("Context Cancelled") case rm.messages <- message: } select { diff --git a/requestmanager/requestmanager_test.go b/requestmanager/requestmanager_test.go index 73f1ed49..6ef5b77d 100644 --- a/requestmanager/requestmanager_test.go +++ b/requestmanager/requestmanager_test.go @@ -261,13 +261,11 @@ func TestCancelRequestImperativeNoMoreBlocks(t *testing.T) { require.Len(t, errors, 1) _, ok := errors[0].(graphsync.RequestClientCancelledErr) require.True(t, ok) - fmt.Println("here") select { case <-loadPostCancel: t.Fatalf("Loaded block after cancel") case <-requestCtx.Done(): } - fmt.Println("here2") } func TestCancelManagerExitsGracefully(t *testing.T) { From 4a7ecf8fc738345fb8162103e16c11d1d7e14e8c Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Tue, 3 Aug 2021 10:40:09 -0700 Subject: [PATCH 8/8] fix(graphsync): update comment --- graphsync.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/graphsync.go b/graphsync.go index d6a5b0b1..69392119 100644 --- a/graphsync.go +++ b/graphsync.go @@ -371,6 +371,6 @@ type GraphExchange interface { // CancelResponse cancels an in progress response CancelResponse(peer.ID, RequestID) error - // CancelRequest cancels an in progress response + // CancelRequest cancels an in progress request CancelRequest(context.Context, RequestID) error }