Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

All changes to date including pause requests & start paused, along with new adds for cleanups and checking of execution #75

Merged
merged 20 commits into from
Jul 8, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
d62f894
WIP
hannahhoward Jun 28, 2020
3ed6495
feat(graphsync): pause/unpause requests
hannahhoward Jun 30, 2020
6a11065
fix(requestmanager): refactor executor
hannahhoward Jun 30, 2020
87f0c1f
feat(graphsync): support external request pauses
hannahhoward Jun 30, 2020
52f9c94
fix(lint): fix lint errors
hannahhoward Jun 30, 2020
7e942b4
feat(responsemanager): start requests paused
hannahhoward Jun 30, 2020
411f31f
feat(responsemanager): improve cancellation UX
hannahhoward Jul 4, 2020
fb74092
feat(requestmanager): process request cancelled status
hannahhoward Jul 4, 2020
cb15c41
feat(executor): refactor to remove loader
hannahhoward Jul 5, 2020
cce8318
fix(asyncloader): load requests synchronously when possible
hannahhoward Jul 6, 2020
8241d2f
fix(responsemanager): fix external pause
hannahhoward Jul 6, 2020
013825e
fix(responsemanager): do not delay complete listener
hannahhoward Jul 6, 2020
f9f3a4f
fix(responsemanager): fix context check
hannahhoward Jul 6, 2020
fc05cbe
fix(responsemanager): more precise cancel
hannahhoward Jul 6, 2020
e868862
fix(requestmanager): handle non processed pauses
hannahhoward Jul 6, 2020
8f62f29
refactor(responsemanager): handle cancels, correctly this time
hannahhoward Jul 6, 2020
642930f
fix(errors): remove regex cause it appears to be very slow
hannahhoward Jul 8, 2020
1c7a9a7
fix(traverser): fix race condition for shutdown
hannahhoward Jul 8, 2020
d592717
fix(deps): mod tidy
hannahhoward Jul 8, 2020
664461a
fix(executor): add back network error
hannahhoward Jul 8, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 53 additions & 0 deletions graphsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,45 @@ const (
RequestFailedLegal = ResponseStatusCode(33)
// RequestFailedContentNotFound means the respondent does not have the content.
RequestFailedContentNotFound = ResponseStatusCode(34)
// RequestCancelled means the responder was processing the request but decided to top, for whatever reason
RequestCancelled = ResponseStatusCode(35)
)

// RequestFailedBusyErr is an error message received on the error channel when the peer is busy
type RequestFailedBusyErr struct{}

func (e RequestFailedBusyErr) Error() string {
return "Request Failed - Peer Is Busy"
}

// RequestFailedContentNotFoundErr is an error message received on the error channel when the content is not found
type RequestFailedContentNotFoundErr struct{}

func (e RequestFailedContentNotFoundErr) Error() string {
return "Request Failed - Content Not Found"
}

// RequestFailedLegalErr is an error message received on the error channel when the request fails for legal reasons
type RequestFailedLegalErr struct{}

func (e RequestFailedLegalErr) Error() string {
return "Request Failed - For Legal Reasons"
}

// RequestFailedUnknownErr is an error message received on the error channel when the request fails for unknown reasons
type RequestFailedUnknownErr struct{}

func (e RequestFailedUnknownErr) Error() string {
return "Request Failed - Unknown Reason"
}

// RequestCancelledErr is an error message received on the error channel that indicates the responder cancelled a request
type RequestCancelledErr struct{}

func (e RequestCancelledErr) Error() string {
return "Request Failed - Responder Cancelled"
}

var (
// ErrExtensionAlreadyRegistered means a user extension can be registered only once
ErrExtensionAlreadyRegistered = errors.New("extension already registered")
Expand Down Expand Up @@ -158,6 +195,7 @@ type IncomingRequestHookActions interface {
UseLinkTargetNodeStyleChooser(traversal.LinkTargetNodeStyleChooser)
TerminateWithError(error)
ValidateRequest()
PauseResponse()
}

// OutgoingBlockHookActions are actions that an outgoing block hook can take to
Expand Down Expand Up @@ -187,6 +225,7 @@ type IncomingResponseHookActions interface {
type IncomingBlockHookActions interface {
TerminateWithError(error)
UpdateRequestWithExtensions(...ExtensionData)
PauseRequest()
}

// RequestUpdatedHookActions are actions that can be taken in a request updated hook to
Expand Down Expand Up @@ -236,6 +275,9 @@ type OnRequestUpdatedHook func(p peer.ID, request RequestData, updateRequest Req
// OnResponseCompletedListener provides a way to listen for when responder has finished serving a response
type OnResponseCompletedListener func(p peer.ID, request RequestData, status ResponseStatusCode)

// OnRequestorCancelledListener provides a way to listen for responses the requestor canncels
type OnRequestorCancelledListener func(p peer.ID, request RequestData)

// UnregisterHookFunc is a function call to unregister a hook that was previously registered
type UnregisterHookFunc func()

Expand Down Expand Up @@ -268,6 +310,17 @@ type GraphExchange interface {
// RegisterCompletedResponseListener adds a listener on the responder for completed responses
RegisterCompletedResponseListener(listener OnResponseCompletedListener) UnregisterHookFunc

// RegisterRequestorCancelledListener adds a listener on the responder for
// responses cancelled by the requestor
RegisterRequestorCancelledListener(listener OnRequestorCancelledListener) UnregisterHookFunc

// UnpauseRequest unpauses a request that was paused in a block hook based request ID
// Can also send extensions with unpause
UnpauseRequest(RequestID, ...ExtensionData) error

// PauseRequest pauses an in progress request (may take 1 or more blocks to process)
PauseRequest(RequestID) error

// UnpauseResponse unpauses a response that was paused in a block hook based on peer ID and request ID
// Can also send extensions with unpause
UnpauseResponse(peer.ID, RequestID, ...ExtensionData) error
Expand Down
102 changes: 61 additions & 41 deletions impl/graphsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,26 +29,27 @@ const maxRecursionDepth = 100
// GraphSync is an instance of a GraphSync exchange that implements
// the graphsync protocol.
type GraphSync struct {
network gsnet.GraphSyncNetwork
loader ipld.Loader
storer ipld.Storer
requestManager *requestmanager.RequestManager
responseManager *responsemanager.ResponseManager
asyncLoader *asyncloader.AsyncLoader
peerResponseManager *peerresponsemanager.PeerResponseManager
peerTaskQueue *peertaskqueue.PeerTaskQueue
peerManager *peermanager.PeerMessageManager
incomingRequestHooks *responderhooks.IncomingRequestHooks
outgoingBlockHooks *responderhooks.OutgoingBlockHooks
requestUpdatedHooks *responderhooks.RequestUpdatedHooks
completedResponseListeners *responderhooks.CompletedResponseListeners
incomingResponseHooks *requestorhooks.IncomingResponseHooks
outgoingRequestHooks *requestorhooks.OutgoingRequestHooks
incomingBlockHooks *requestorhooks.IncomingBlockHooks
persistenceOptions *persistenceoptions.PersistenceOptions
ctx context.Context
cancel context.CancelFunc
unregisterDefaultValidator graphsync.UnregisterHookFunc
network gsnet.GraphSyncNetwork
loader ipld.Loader
storer ipld.Storer
requestManager *requestmanager.RequestManager
responseManager *responsemanager.ResponseManager
asyncLoader *asyncloader.AsyncLoader
peerResponseManager *peerresponsemanager.PeerResponseManager
peerTaskQueue *peertaskqueue.PeerTaskQueue
peerManager *peermanager.PeerMessageManager
incomingRequestHooks *responderhooks.IncomingRequestHooks
outgoingBlockHooks *responderhooks.OutgoingBlockHooks
requestUpdatedHooks *responderhooks.RequestUpdatedHooks
completedResponseListeners *responderhooks.CompletedResponseListeners
requestorCancelledListeners *responderhooks.RequestorCancelledListeners
incomingResponseHooks *requestorhooks.IncomingResponseHooks
outgoingRequestHooks *requestorhooks.OutgoingRequestHooks
incomingBlockHooks *requestorhooks.IncomingBlockHooks
persistenceOptions *persistenceoptions.PersistenceOptions
ctx context.Context
cancel context.CancelFunc
unregisterDefaultValidator graphsync.UnregisterHookFunc
}

// Option defines the functional option type that can be used to configure
Expand Down Expand Up @@ -88,29 +89,31 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork,
outgoingBlockHooks := responderhooks.NewBlockHooks()
requestUpdatedHooks := responderhooks.NewUpdateHooks()
completedResponseListeners := responderhooks.NewCompletedResponseListeners()
responseManager := responsemanager.New(ctx, loader, peerResponseManager, peerTaskQueue, incomingRequestHooks, outgoingBlockHooks, requestUpdatedHooks, completedResponseListeners)
requestorCancelledListeners := responderhooks.NewRequestorCancelledListeners()
responseManager := responsemanager.New(ctx, loader, peerResponseManager, peerTaskQueue, incomingRequestHooks, outgoingBlockHooks, requestUpdatedHooks, completedResponseListeners, requestorCancelledListeners)
unregisterDefaultValidator := incomingRequestHooks.Register(selectorvalidator.SelectorValidator(maxRecursionDepth))
graphSync := &GraphSync{
network: network,
loader: loader,
storer: storer,
asyncLoader: asyncLoader,
requestManager: requestManager,
peerManager: peerManager,
persistenceOptions: persistenceOptions,
incomingRequestHooks: incomingRequestHooks,
outgoingBlockHooks: outgoingBlockHooks,
requestUpdatedHooks: requestUpdatedHooks,
completedResponseListeners: completedResponseListeners,
incomingResponseHooks: incomingResponseHooks,
outgoingRequestHooks: outgoingRequestHooks,
incomingBlockHooks: incomingBlockHooks,
peerTaskQueue: peerTaskQueue,
peerResponseManager: peerResponseManager,
responseManager: responseManager,
ctx: ctx,
cancel: cancel,
unregisterDefaultValidator: unregisterDefaultValidator,
network: network,
loader: loader,
storer: storer,
asyncLoader: asyncLoader,
requestManager: requestManager,
peerManager: peerManager,
persistenceOptions: persistenceOptions,
incomingRequestHooks: incomingRequestHooks,
outgoingBlockHooks: outgoingBlockHooks,
requestUpdatedHooks: requestUpdatedHooks,
completedResponseListeners: completedResponseListeners,
requestorCancelledListeners: requestorCancelledListeners,
incomingResponseHooks: incomingResponseHooks,
outgoingRequestHooks: outgoingRequestHooks,
incomingBlockHooks: incomingBlockHooks,
peerTaskQueue: peerTaskQueue,
peerResponseManager: peerResponseManager,
responseManager: responseManager,
ctx: ctx,
cancel: cancel,
unregisterDefaultValidator: unregisterDefaultValidator,
}

for _, option := range options {
Expand Down Expand Up @@ -177,6 +180,23 @@ func (gs *GraphSync) RegisterIncomingBlockHook(hook graphsync.OnIncomingBlockHoo
return gs.incomingBlockHooks.Register(hook)
}

// RegisterRequestorCancelledListener adds a listener on the responder for
// responses cancelled by the requestor
func (gs *GraphSync) RegisterRequestorCancelledListener(listener graphsync.OnRequestorCancelledListener) graphsync.UnregisterHookFunc {
return gs.requestorCancelledListeners.Register(listener)
}

// UnpauseRequest unpauses a request that was paused in a block hook based request ID
// Can also send extensions with unpause
func (gs *GraphSync) UnpauseRequest(requestID graphsync.RequestID, extensions ...graphsync.ExtensionData) error {
return gs.requestManager.UnpauseRequest(requestID, extensions...)
}

// PauseRequest pauses an in progress request (may take 1 or more blocks to process)
func (gs *GraphSync) PauseRequest(requestID graphsync.RequestID) error {
return gs.requestManager.PauseRequest(requestID)
}

// UnpauseResponse unpauses a response that was paused in a block hook based on peer ID and request ID
func (gs *GraphSync) UnpauseResponse(p peer.ID, requestID graphsync.RequestID, extensions ...graphsync.ExtensionData) error {
return gs.responseManager.UnpauseResponse(p, requestID, extensions...)
Expand Down
46 changes: 46 additions & 0 deletions impl/graphsync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,52 @@ func TestPauseResume(t *testing.T) {
require.Len(t, td.blockStore1, blockChainLength, "did not store all blocks")

}
func TestPauseResumeRequest(t *testing.T) {
// create network
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
defer cancel()
td := newGsTestData(ctx, t)

// initialize graphsync on first node to make requests
requestor := td.GraphSyncHost1()

// setup receiving peer to just record message coming in
blockChainLength := 100
blockSize := 100
blockChain := testutil.SetupBlockChain(ctx, t, td.loader2, td.storer2, uint64(blockSize), blockChainLength)

// initialize graphsync on second node to response to requests
_ = td.GraphSyncHost2()

stopPoint := 50
blocksReceived := 0
requestIDChan := make(chan graphsync.RequestID, 1)
requestor.RegisterIncomingBlockHook(func(p peer.ID, responseData graphsync.ResponseData, blockData graphsync.BlockData, hookActions graphsync.IncomingBlockHookActions) {
select {
case requestIDChan <- responseData.RequestID():
default:
}
blocksReceived++
if blocksReceived == stopPoint {
hookActions.PauseRequest()
}
})

progressChan, errChan := requestor.Request(ctx, td.host2.ID(), blockChain.TipLink, blockChain.Selector(), td.extension)

blockChain.VerifyResponseRange(ctx, progressChan, 0, stopPoint-1)
timer := time.NewTimer(100 * time.Millisecond)
testutil.AssertDoesReceiveFirst(t, timer.C, "should pause request", progressChan)

requestID := <-requestIDChan
err := requestor.UnpauseRequest(requestID, td.extensionUpdate)
require.NoError(t, err)

blockChain.VerifyRemainder(ctx, progressChan, stopPoint-1)
testutil.VerifyEmptyErrors(ctx, t, errChan)
require.Len(t, td.blockStore1, blockChainLength, "did not store all blocks")
}

func TestPauseResumeViaUpdate(t *testing.T) {
// create network
Expand Down
38 changes: 32 additions & 6 deletions ipldutil/traverser.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,14 @@ import (

var defaultVisitor traversal.AdvVisitFn = func(traversal.Progress, ipld.Node, traversal.VisitReason) error { return nil }

// ContextCancelError is a sentinel that indicates the passed in context
// was cancelled
type ContextCancelError struct{}

func (cp ContextCancelError) Error() string {
return "Context cancelled"
}

// TraversalBuilder defines parameters for an iterative traversal
type TraversalBuilder struct {
Root ipld.Link
Expand All @@ -31,6 +39,8 @@ type Traverser interface {
Advance(reader io.Reader) error
// Error errors the traversal by returning the given error as the result of the next IPLD load
Error(err error)
// Shutdown cancels the traversal
Shutdown(ctx context.Context)
}

type state struct {
Expand All @@ -47,16 +57,20 @@ type nextResponse struct {

// Start initiates the traversal (run in a go routine because the regular
// selector traversal expects a call back)
func (tb TraversalBuilder) Start(ctx context.Context) Traverser {
func (tb TraversalBuilder) Start(parentCtx context.Context) Traverser {
ctx, cancel := context.WithCancel(parentCtx)
t := &traverser{
parentCtx: parentCtx,
ctx: ctx,
cancel: cancel,
root: tb.Root,
selector: tb.Selector,
visitor: defaultVisitor,
chooser: defaultChooser,
awaitRequest: make(chan struct{}, 1),
stateChan: make(chan state, 1),
responses: make(chan nextResponse),
stopped: make(chan struct{}),
}
if tb.Visitor != nil {
t.visitor = tb.Visitor
Expand All @@ -71,7 +85,9 @@ func (tb TraversalBuilder) Start(ctx context.Context) Traverser {
// traverser is a class to perform a selector traversal that stops every time a new block is loaded
// and waits for manual input (in the form of advance or error)
type traverser struct {
parentCtx context.Context
ctx context.Context
cancel func()
root ipld.Link
selector ipld.Node
visitor traversal.AdvVisitFn
Expand All @@ -83,6 +99,7 @@ type traverser struct {
awaitRequest chan struct{}
stateChan chan state
responses chan nextResponse
stopped chan struct{}
}

func (t *traverser) checkState() {
Expand All @@ -91,7 +108,7 @@ func (t *traverser) checkState() {
select {
case <-t.ctx.Done():
t.isDone = true
t.completionErr = errors.New("Context cancelled")
t.completionErr = ContextCancelError{}
case newState := <-t.stateChan:
t.isDone = newState.isDone
t.completionErr = newState.completionErr
Expand All @@ -116,15 +133,16 @@ func (t *traverser) start() {
case t.awaitRequest <- struct{}{}:
}
go func() {
defer close(t.stopped)
loader := func(lnk ipld.Link, lnkCtx ipld.LinkContext) (io.Reader, error) {
select {
case <-t.ctx.Done():
return nil, errors.New("Context cancelled")
return nil, ContextCancelError{}
case t.stateChan <- state{false, nil, lnk, lnkCtx}:
}
select {
case <-t.ctx.Done():
return nil, errors.New("Context cancelled")
return nil, ContextCancelError{}
case response := <-t.responses:
return response.input, response.err
}
Expand Down Expand Up @@ -158,6 +176,14 @@ func (t *traverser) start() {
}()
}

func (t *traverser) Shutdown(ctx context.Context) {
t.cancel()
select {
case <-ctx.Done():
case <-t.stopped:
}
}

// IsComplete returns true if a traversal is complete
func (t *traverser) IsComplete() (bool, error) {
t.checkState()
Expand All @@ -179,12 +205,12 @@ func (t *traverser) Advance(reader io.Reader) error {
}
select {
case <-t.ctx.Done():
return errors.New("context cancelled")
return ContextCancelError{}
case t.awaitRequest <- struct{}{}:
}
select {
case <-t.ctx.Done():
return errors.New("context cancelled")
return ContextCancelError{}
case t.responses <- nextResponse{reader, nil}:
}
return nil
Expand Down
Loading