Skip to content

Commit

Permalink
feat(validators): implied pauses
Browse files Browse the repository at this point in the history
causes datalimit exceeded and requires finalization to leave request paused, regardless of where
LeaveRequestPaused is set
  • Loading branch information
hannahhoward committed May 11, 2022
1 parent e37b172 commit e5f267b
Show file tree
Hide file tree
Showing 8 changed files with 60 additions and 42 deletions.
8 changes: 5 additions & 3 deletions impl/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,8 @@ func (m *manager) processValidationUpdate(ctx context.Context, chid datatransfer
if chst.Status() == datatransfer.Finalizing {
messageType = types.CompleteMessage
}
response, msgErr := message.ValidationResultResponse(messageType, chst.TransferID(), result, err)
response, msgErr := message.ValidationResultResponse(messageType, chst.TransferID(), result, err,
result.LeaveRequestPaused(chst))
if msgErr != nil {
return nil, nil, msgErr
}
Expand All @@ -409,8 +410,9 @@ func (m *manager) handleTransportUpdate(
resultErr error,
) error {

pauseRequest := result.LeaveRequestPaused(chst)
// resume channel as needed, sending the response message immediately and returning
if resultErr == nil && result.Accepted && !result.LeaveRequestPaused {
if resultErr == nil && result.Accepted && !pauseRequest {
if chst.Status().IsResponderPaused() && !chst.Status().InFinalization() {
return m.transport.(datatransfer.PauseableTransport).ResumeChannel(ctx, response, chst.ChannelID())
}
Expand All @@ -430,7 +432,7 @@ func (m *manager) handleTransportUpdate(
}

// pause the channel as needed
if result.LeaveRequestPaused && !chst.Status().IsResponderPaused() && !chst.Status().InFinalization() {
if pauseRequest && !chst.Status().IsResponderPaused() && !chst.Status().InFinalization() {
return m.transport.(datatransfer.PauseableTransport).PauseChannel(ctx, chst.ChannelID())
}

Expand Down
17 changes: 9 additions & 8 deletions impl/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1135,12 +1135,12 @@ func TestRoundTripCancelledRequest(t *testing.T) {
var chid datatransfer.ChannelID
if data.isPull {
sv.ExpectSuccessPull()
sv.StubResult(datatransfer.ValidationResult{Accepted: true, LeaveRequestPaused: true})
sv.StubResult(datatransfer.ValidationResult{Accepted: true, ForcePause: true})
require.NoError(t, dt1.RegisterVoucherType(&testutil.FakeDTType{}, sv))
chid, err = dt2.OpenPullDataChannel(ctx, host1.ID(), &voucher, rootCid, gsData.AllSelector)
} else {
sv.ExpectSuccessPush()
sv.StubResult(datatransfer.ValidationResult{Accepted: true, LeaveRequestPaused: true})
sv.StubResult(datatransfer.ValidationResult{Accepted: true, ForcePause: true})
require.NoError(t, dt2.RegisterVoucherType(&testutil.FakeDTType{}, sv))
chid, err = dt1.OpenPushDataChannel(ctx, host2.ID(), &voucher, rootCid, gsData.AllSelector)
}
Expand Down Expand Up @@ -1200,7 +1200,7 @@ func (r *retrievalRevalidator) ValidatePush(
vr := datatransfer.ValidationResult{
Accepted: true,
RequiresFinalization: r.requiresFinalization,
LeaveRequestPaused: r.leavePausedInitially,
ForcePause: r.leavePausedInitially,
VoucherResult: r.initialVoucherResult,
}
if len(r.pausePoints) > r.providerPausePoint {
Expand All @@ -1220,7 +1220,7 @@ func (r *retrievalRevalidator) ValidatePull(
vr := datatransfer.ValidationResult{
Accepted: true,
RequiresFinalization: r.requiresFinalization,
LeaveRequestPaused: r.leavePausedInitially,
ForcePause: r.leavePausedInitially,
VoucherResult: r.initialVoucherResult,
}
if len(r.pausePoints) > r.providerPausePoint {
Expand Down Expand Up @@ -1379,6 +1379,7 @@ func TestSimulatedRetrievalFlow(t *testing.T) {
dt1.SendVoucherResult(ctx, chid, testutil.NewFakeDTType())
}
if event.Code == datatransfer.BeginFinalizing {
sv.requiresFinalization = false
dt1.SendVoucherResult(ctx, chid, finalVoucherResult)
}
if event.Code == datatransfer.Error {
Expand Down Expand Up @@ -2008,8 +2009,7 @@ func TestMultipleMessagesInExtension(t *testing.T) {
gsData := testutil.NewGraphsyncTestingData(ctx, t, nil, nil)
host1 := gsData.Host1 // initiator, data sender

root, origBytes := LoadRandomData(ctx, t, gsData.DagService1, 256000)
gsData.OrigBytes = origBytes
root := gsData.LoadUnixFSFile(t, false)
rootCid := root.(cidlink.Link).Cid
tp1 := gsData.SetupGSTransportHost1()
tp2 := gsData.SetupGSTransportHost2()
Expand Down Expand Up @@ -2108,7 +2108,8 @@ func TestMultipleMessagesInExtension(t *testing.T) {
providerFinished <- struct{}{}
}
if event.Code == datatransfer.NewVoucher && channelState.Queued() > 0 {
dt1.UpdateValidationStatus(ctx, chid, sv.nextStatus())
vs := sv.nextStatus()
dt1.UpdateValidationStatus(ctx, chid, vs)
}
if event.Code == datatransfer.DataLimitExceeded {
if nextVoucherResult < len(pausePoints) {
Expand All @@ -2117,6 +2118,7 @@ func TestMultipleMessagesInExtension(t *testing.T) {
}
}
if event.Code == datatransfer.BeginFinalizing {
sv.requiresFinalization = false
dt1.SendVoucherResult(ctx, chid, finalVoucherResult)
}
})
Expand Down Expand Up @@ -2245,7 +2247,6 @@ func TestMultipleParallelTransfers(t *testing.T) {
return
}
if event.Code == datatransfer.Error {
fmt.Println(event.Message)
errChan <- struct{}{}
}
if channelState.Status() == datatransfer.Completed {
Expand Down
42 changes: 19 additions & 23 deletions impl/receiving_requests.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package impl

import (
"context"
"fmt"

"github.com/ipfs/go-cid"
"github.com/ipld/go-ipld-prime"
Expand All @@ -24,13 +23,13 @@ func (m *manager) receiveNewRequest(chid datatransfer.ChannelID, incoming datatr
result, err := m.acceptRequest(chid, incoming)

// generate a response message
msg, msgErr := message.ValidationResultResponse(types.NewMessage, incoming.TransferID(), result, err)
msg, msgErr := message.ValidationResultResponse(types.NewMessage, incoming.TransferID(), result, err, result.ForcePause)
if msgErr != nil {
return nil, msgErr
}

// return the response message and any errors
return msg, m.requestError(result, err, false)
return msg, m.requestError(result, err, result.ForcePause)
}

// acceptRequest performs processing (including validation) on a new incoming request
Expand Down Expand Up @@ -113,84 +112,84 @@ func (m *manager) receiveRestartRequest(chid datatransfer.ChannelID, incoming da
log.Infof("channel %s: received restart request", chid)

// process the restart message, including validations
result, err := m.restartRequest(chid, incoming)
stayPaused, result, err := m.restartRequest(chid, incoming)

// generate a response message
msg, msgErr := message.ValidationResultResponse(types.RestartMessage, incoming.TransferID(), result, err)
msg, msgErr := message.ValidationResultResponse(types.RestartMessage, incoming.TransferID(), result, err, stayPaused)
if msgErr != nil {
return nil, msgErr
}

// return the response message and any errors
return msg, m.requestError(result, err, false)
return msg, m.requestError(result, err, result.ForcePause)
}

// restartRequest performs processing (including validation) on a incoming restart request
func (m *manager) restartRequest(chid datatransfer.ChannelID,
incoming datatransfer.Request) (datatransfer.ValidationResult, error) {
incoming datatransfer.Request) (bool, datatransfer.ValidationResult, error) {

// restart requests are invalid if we the initiator
// (the responder must send a "restart existing channel request")
initiator := chid.Initiator
if m.peerID == initiator {
return datatransfer.ValidationResult{}, xerrors.New("initiator cannot be manager peer for a restart request")
return false, datatransfer.ValidationResult{}, xerrors.New("initiator cannot be manager peer for a restart request")
}

// valide that the request parameters match the original request
// TODO: not sure this is needed -- the request parameters cannot change,
// so perhaps the solution is just to ignore them in the message
if err := m.validateRestartRequest(context.Background(), initiator, chid, incoming); err != nil {
return datatransfer.ValidationResult{}, xerrors.Errorf("restart request for channel %s failed validation: %w", chid, err)
return false, datatransfer.ValidationResult{}, xerrors.Errorf("restart request for channel %s failed validation: %w", chid, err)
}

// read the channel state
chst, err := m.channels.GetByID(context.TODO(), chid)
if err != nil {
return datatransfer.ValidationResult{}, err
return false, datatransfer.ValidationResult{}, err
}

// perform a revalidation against the last voucher
result, err := m.validateRestart(chst)
stayPaused := result.LeaveRequestPaused(chst)

// if an error occurred during validation return
if err != nil {
return result, err
return stayPaused, result, err
}

// if the request is now rejected, error the channel
if !result.Accepted {
return result, m.recordRejectedValidationEvents(chid, result)
return stayPaused, result, m.recordRejectedValidationEvents(chid, result)
}

// record the restart events
if err := m.channels.Restart(chid); err != nil {
return result, xerrors.Errorf("failed to restart channel %s: %w", chid, err)
return stayPaused, result, xerrors.Errorf("failed to restart channel %s: %w", chid, err)
}

// record validation events
if err := m.recordAcceptedValidationEvents(chst, result); err != nil {
return result, err
return stayPaused, result, err
}

// configure the transport
voucher, err := m.decodeVoucher(incoming)
if err != nil {
return result, err
return stayPaused, result, err
}
processor, has := m.transportConfigurers.Processor(voucher.Type())
if has {
transportConfigurer := processor.(datatransfer.TransportConfigurer)
transportConfigurer(chid, voucher, m.transport)
}
m.dataTransferNetwork.Protect(initiator, chid.String())
return result, nil
return stayPaused, result, nil
}

// processUpdateVoucher handles an incoming request message with an updated voucher
func (m *manager) processUpdateVoucher(chid datatransfer.ChannelID, request datatransfer.Request) (datatransfer.Response, error) {
// decode the voucher and save it on the channel
vouch, err := m.decodeVoucher(request)
fmt.Println(err)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -223,19 +222,16 @@ func (m *manager) receiveUpdateRequest(chid datatransfer.ChannelID, request data
// ErrPause / ErrResume based off the validation result
// TODO: get away from using ErrPause/ErrResume to indicate pause resume,
// which would remove the need for most of this method
func (m *manager) requestError(result datatransfer.ValidationResult, resultErr error, handleResumes bool) error {
func (m *manager) requestError(result datatransfer.ValidationResult, resultErr error, stayPaused bool) error {
if resultErr != nil {
return resultErr
}
if !result.Accepted {
return datatransfer.ErrRejected
}
if result.LeaveRequestPaused {
if stayPaused {
return datatransfer.ErrPause
}
if handleResumes {
return datatransfer.ErrResume
}
return nil
}

Expand All @@ -255,7 +251,7 @@ func (m *manager) recordAcceptedValidationEvents(chst datatransfer.ChannelState,
chid := chst.ChannelID()

// pause or resume the request as neccesary
if result.LeaveRequestPaused {
if result.LeaveRequestPaused(chst) {
if !chst.Status().IsResponderPaused() {
err := m.channels.PauseResponder(chid)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions impl/responding_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func TestDataTransferResponding(t *testing.T) {
"new push request pauses": {
configureValidator: func(sv *testutil.StubbedValidator) {
sv.ExpectSuccessPush()
sv.StubResult(datatransfer.ValidationResult{Accepted: true, LeaveRequestPaused: true, VoucherResult: testutil.NewFakeDTType()})
sv.StubResult(datatransfer.ValidationResult{Accepted: true, ForcePause: true, VoucherResult: testutil.NewFakeDTType()})
},
verify: func(t *testing.T, h *receiverHarness) {
h.network.Delegate.ReceiveRequest(h.ctx, h.peers[1], h.pushRequest)
Expand Down Expand Up @@ -204,7 +204,7 @@ func TestDataTransferResponding(t *testing.T) {
"new pull request pauses": {
configureValidator: func(sv *testutil.StubbedValidator) {
sv.ExpectSuccessPull()
sv.StubResult(datatransfer.ValidationResult{Accepted: true, LeaveRequestPaused: true})
sv.StubResult(datatransfer.ValidationResult{Accepted: true, ForcePause: true})
},
verify: func(t *testing.T, h *receiverHarness) {
response, err := h.transport.EventHandler.OnRequestReceived(channelID(h.id, h.peers), h.pullRequest)
Expand Down
23 changes: 20 additions & 3 deletions manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ type ValidationResult struct {
// VoucherResult provides information to the other party about what happened
// with the voucher
VoucherResult
// LeaveRequestPaused indicates whether the request should stay paused
// even if the request was accepted
LeaveRequestPaused bool
// ForcePause indicates whether the request should be paused, regardless
// of data limit and finalization status
ForcePause bool
// DataLimit specifies how much data this voucher is good for. When the amount
// of data specified is reached (or shortly after), the request will pause
// pending revalidation. 0 indicates no limit.
Expand All @@ -28,6 +28,23 @@ type ValidationResult struct {
RequiresFinalization bool
}

// LeaveRequestPaused indicates whether all conditions are met to resume a request
func (vr ValidationResult) LeaveRequestPaused(chst ChannelState) bool {
if vr.ForcePause {
return true
}
if vr.RequiresFinalization && chst.Status().InFinalization() {
return true
}
var limitFactor uint64
if chst.IsPull() {
limitFactor = chst.Queued()
} else {
limitFactor = chst.Received()
}
return vr.DataLimit != 0 && limitFactor >= vr.DataLimit
}

// RequestValidator is an interface implemented by the client of the
// data transfer module to validate requests
type RequestValidator interface {
Expand Down
1 change: 1 addition & 0 deletions message/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ var VoucherRequest = message1_1.VoucherRequest

// DEPRECATED: Use ValidationResultResponse
var RestartResponse = message1_1.RestartResponse

var ValidationResultResponse = message1_1.ValidationResultResponse

// DEPRECATED: Use ValidationResultResponse
Expand Down
5 changes: 3 additions & 2 deletions message/message1_1prime/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,8 @@ func ValidationResultResponse(
messageType types.MessageType,
id datatransfer.TransferID,
validationResult datatransfer.ValidationResult,
validationErr error) (datatransfer.Response, error) {
validationErr error,
paused bool) (datatransfer.Response, error) {
voucherResultType := datatransfer.EmptyTypeIdentifier
if validationResult.VoucherResult != nil {
voucherResultType = validationResult.VoucherResult.Type()
Expand All @@ -125,7 +126,7 @@ func ValidationResultResponse(
// Validation errors vs rejections
RequestAccepted: validationErr == nil && validationResult.Accepted,
MessageType: uint64(messageType),
Paused: validationResult.LeaveRequestPaused,
Paused: paused,
TransferId: uint64(id),
VoucherTypeIdentifier: voucherResultType,
VoucherResultPtr: &vnode,
Expand Down
2 changes: 1 addition & 1 deletion network/libp2p_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func TestMessageSendAndReceive(t *testing.T) {
accepted := false
id := datatransfer.TransferID(rand.Int31())
voucherResult := testutil.NewFakeDTType()
response, err := message.ValidationResultResponse(types.NewMessage, id, datatransfer.ValidationResult{Accepted: accepted, VoucherResult: voucherResult}, nil)
response, err := message.ValidationResultResponse(types.NewMessage, id, datatransfer.ValidationResult{Accepted: accepted, VoucherResult: voucherResult}, nil, false)
require.NoError(t, err)
require.NoError(t, dtnet2.SendMessage(ctx, host1.ID(), response))

Expand Down

0 comments on commit e5f267b

Please sign in to comment.