Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix races in tests #370

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions impl/initiating_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,9 +205,13 @@ func TestDataTransferInitiating(t *testing.T) {
require.Equal(t, h.transport.ClosedChannels[0], channelID)

require.Eventually(t, func() bool {
h.network.SentMessagesLk.Lock()
defer h.network.SentMessagesLk.Unlock()
return len(h.network.SentMessages) == 2
}, 5*time.Second, 200*time.Millisecond)
h.network.SentMessagesLk.Lock()
cancelMessage := h.network.SentMessages[1].Message
h.network.SentMessagesLk.Unlock()
require.False(t, cancelMessage.IsUpdate())
require.False(t, cancelMessage.IsPaused())
require.True(t, cancelMessage.IsRequest())
Expand Down Expand Up @@ -261,10 +265,14 @@ func TestDataTransferInitiating(t *testing.T) {
require.Equal(t, h.transport.ClosedChannels[0], channelID)

require.Eventually(t, func() bool {
h.network.SentMessagesLk.Lock()
defer h.network.SentMessagesLk.Unlock()
return len(h.network.SentMessages) == 1
}, 5*time.Second, 200*time.Millisecond)

h.network.SentMessagesLk.Lock()
cancelMessage := h.network.SentMessages[0].Message
h.network.SentMessagesLk.Unlock()
require.False(t, cancelMessage.IsUpdate())
require.False(t, cancelMessage.IsPaused())
require.True(t, cancelMessage.IsRequest())
Expand Down
2 changes: 2 additions & 0 deletions impl/responding_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,8 @@ func TestDataTransferResponding(t *testing.T) {
h.network.Delegate.ReceiveRequest(h.ctx, h.peers[1], h.pushRequest)
_, err := h.transport.EventHandler.OnRequestReceived(channelID(h.id, h.peers), h.cancelUpdate)
require.NoError(t, err)
h.transport.TransportLk.Lock()
defer h.transport.TransportLk.Unlock()
require.Len(t, h.transport.CleanedUpChannels, 1)
require.Equal(t, channelID(h.id, h.peers), h.transport.CleanedUpChannels[0])
},
Expand Down
44 changes: 34 additions & 10 deletions itest/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"fmt"
"math/rand"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -1431,6 +1432,7 @@ func TestSimulatedRetrievalFlow(t *testing.T) {
require.NoError(t, err)
testutil.StartAndWaitForReady(ctx, t, dt2)
var chid datatransfer.ChannelID
var chidLk sync.Mutex
errChan := make(chan struct{}, 2)
clientPausePoint := 0
clientFinished := make(chan struct{}, 1)
Expand Down Expand Up @@ -1471,7 +1473,9 @@ func TestSimulatedRetrievalFlow(t *testing.T) {
timer := time.NewTimer(config.unpauseResponderDelay)
go func() {
<-timer.C
chidLk.Lock()
_ = dt1.ResumeDataTransferChannel(ctx, chid)
chidLk.Unlock()
}()
}
if event.Code == datatransfer.NewVoucher && channelState.Queued() > 0 {
Expand All @@ -1496,7 +1500,9 @@ func TestSimulatedRetrievalFlow(t *testing.T) {

require.NoError(t, dt1.RegisterVoucherType(testutil.TestVoucherType, sv))

chidLk.Lock()
chid, err = dt2.OpenPullDataChannel(ctx, host1.ID(), voucher, rootCid, selectorparse.CommonSelector_ExploreAllRecursively)
chidLk.Unlock()
require.NoError(t, err)

for providerFinished != nil || clientFinished != nil {
Expand Down Expand Up @@ -2129,6 +2135,7 @@ func TestMultipleMessagesInExtension(t *testing.T) {
testutil.StartAndWaitForReady(ctx, t, dt2)

var chid datatransfer.ChannelID
var chidLk sync.Mutex
errChan := make(chan struct{}, 2)

clientPausePoint := 0
Expand All @@ -2155,13 +2162,15 @@ func TestMultipleMessagesInExtension(t *testing.T) {
finalVoucherResult := testutil.NewTestTypedVoucher()

dt2.SubscribeToEvents(func(event datatransfer.Event, channelState datatransfer.ChannelState) {
chidLk.Lock()
currentChid := chid
chidLk.Unlock()
if event.Code == datatransfer.Error {
errChan <- struct{}{}
}
// Here we verify reception of voucherResults by the client
if event.Code == datatransfer.NewVoucherResult {
voucherResult := channelState.LastVoucherResult()
require.NoError(t, err)

// If this voucher result is the response voucher no action is needed
// we just know that the provider has accepted the transfer and is sending blocks
Expand All @@ -2174,15 +2183,15 @@ func TestMultipleMessagesInExtension(t *testing.T) {
// to revalidate and unpause the transfer
if clientPausePoint < 5 {
if voucherResult.Equals(voucherResults[clientPausePoint]) {
_ = dt2.SendVoucher(ctx, chid, testutil.NewTestTypedVoucher())
_ = dt2.SendVoucher(ctx, currentChid, testutil.NewTestTypedVoucher())
clientPausePoint++
}
}

// If this voucher result is the final voucher result we need
// to send a new voucher to unpause the provider and complete the transfer
if voucherResult.Equals(finalVoucherResult) {
_ = dt2.SendVoucher(ctx, chid, testutil.NewTestTypedVoucher())
_ = dt2.SendVoucher(ctx, currentChid, testutil.NewTestTypedVoucher())
}
}

Expand All @@ -2200,6 +2209,9 @@ func TestMultipleMessagesInExtension(t *testing.T) {
initialVoucherResult: &respVoucher,
}
dt1.SubscribeToEvents(func(event datatransfer.Event, channelState datatransfer.ChannelState) {
chidLk.Lock()
currentChid := chid
chidLk.Unlock()
if event.Code == datatransfer.Error {
errChan <- struct{}{}
}
Expand All @@ -2208,23 +2220,25 @@ func TestMultipleMessagesInExtension(t *testing.T) {
}
if event.Code == datatransfer.NewVoucher && channelState.Queued() > 0 {
vs := sv.nextStatus()
dt1.UpdateValidationStatus(ctx, chid, vs)
dt1.UpdateValidationStatus(ctx, currentChid, vs)
}
if event.Code == datatransfer.DataLimitExceeded {
if nextVoucherResult < len(pausePoints) {
dt1.SendVoucherResult(ctx, chid, voucherResults[nextVoucherResult])
dt1.SendVoucherResult(ctx, currentChid, voucherResults[nextVoucherResult])
nextVoucherResult++
}
}
if event.Code == datatransfer.BeginFinalizing {
sv.requiresFinalization = false
dt1.SendVoucherResult(ctx, chid, finalVoucherResult)
dt1.SendVoucherResult(ctx, currentChid, finalVoucherResult)
}
})
require.NoError(t, dt1.RegisterVoucherType(testutil.TestVoucherType, sv))

voucher := testutil.NewTestTypedVoucherWith("applesauce")
chidLk.Lock()
chid, err = dt2.OpenPullDataChannel(ctx, host1.ID(), voucher, rootCid, selectorparse.CommonSelector_ExploreAllRecursively)
chidLk.Unlock()
require.NoError(t, err)

// Expect the client to receive a response voucher, the provider to complete the transfer and
Expand Down Expand Up @@ -2299,10 +2313,14 @@ func TestMultipleParallelTransfers(t *testing.T) {
clientFinished := make(chan struct{}, 1)

var chid datatransfer.ChannelID
var chidLk sync.Mutex
chidReceived := make(chan struct{})
dt2.SubscribeToEvents(func(event datatransfer.Event, channelState datatransfer.ChannelState) {
<-chidReceived
if chid != channelState.ChannelID() {
chidLk.Lock()
currentChid := chid
chidLk.Unlock()
if currentChid != channelState.ChannelID() {
return
}
if event.Code == datatransfer.Error {
Expand All @@ -2324,7 +2342,7 @@ func TestMultipleParallelTransfers(t *testing.T) {
// If this voucher result is the final voucher result we need
// to send a new voucher to unpause the provider and complete the transfer
if voucherResult.Equals(finalVoucherResult) {
_ = dt2.SendVoucher(ctx, chid, testutil.NewTestTypedVoucher())
_ = dt2.SendVoucher(ctx, currentChid, testutil.NewTestTypedVoucher())
}
}

Expand All @@ -2336,7 +2354,10 @@ func TestMultipleParallelTransfers(t *testing.T) {
providerFinished := make(chan struct{}, 1)
dt1.SubscribeToEvents(func(event datatransfer.Event, channelState datatransfer.ChannelState) {
<-chidReceived
if chid != channelState.ChannelID() {
chidLk.Lock()
currentChid := chid
chidLk.Unlock()
if currentChid != channelState.ChannelID() {
return
}
if event.Code == datatransfer.Error {
Expand All @@ -2346,15 +2367,18 @@ func TestMultipleParallelTransfers(t *testing.T) {
providerFinished <- struct{}{}
}
if event.Code == datatransfer.BeginFinalizing {
dt1.SendVoucherResult(ctx, chid, finalVoucherResult)
dt1.SendVoucherResult(ctx, currentChid, finalVoucherResult)
}
})

root, origBytes := LoadRandomData(ctx, t, gsData.DagService1, size)
rootCid := root.(cidlink.Link).Cid

voucher := testutil.NewTestTypedVoucher()
chidLk.Lock()
var err error
chid, err = dt2.OpenPullDataChannel(ctx, host1.ID(), voucher, rootCid, selectorparse.CommonSelector_ExploreAllRecursively)
chidLk.Unlock()
require.NoError(t, err)
close(chidReceived)
// Expect the client to receive a response voucher, the provider to complete the transfer and
Expand Down
16 changes: 16 additions & 0 deletions testutil/faketransport.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package testutil
import (
"context"
"errors"
"sync"

"github.com/ipld/go-ipld-prime"
"github.com/ipld/go-ipld-prime/datamodel"
Expand All @@ -29,6 +30,7 @@ type ResumedChannel struct {

// FakeTransport is a fake transport with mocked results
type FakeTransport struct {
TransportLk sync.Mutex
OpenedChannels []OpenedChannel
OpenChannelErr error
ClosedChannels []datatransfer.ChannelID
Expand All @@ -54,18 +56,24 @@ func NewFakeTransport() *FakeTransport {
// request is push or pull -- OpenChannel is called by the party that is
// intending to receive data
func (ft *FakeTransport) OpenChannel(ctx context.Context, dataSender peer.ID, channelID datatransfer.ChannelID, root ipld.Link, stor datamodel.Node, channel datatransfer.ChannelState, msg datatransfer.Message) error {
ft.TransportLk.Lock()
defer ft.TransportLk.Unlock()
ft.OpenedChannels = append(ft.OpenedChannels, OpenedChannel{dataSender, channelID, root, stor, channel, msg})
return ft.OpenChannelErr
}

// CloseChannel closes the given channel
func (ft *FakeTransport) CloseChannel(ctx context.Context, chid datatransfer.ChannelID) error {
ft.TransportLk.Lock()
defer ft.TransportLk.Unlock()
ft.ClosedChannels = append(ft.ClosedChannels, chid)
return ft.CloseChannelErr
}

// SetEventHandler sets the handler for events on channels
func (ft *FakeTransport) SetEventHandler(events datatransfer.EventsHandler) error {
ft.TransportLk.Lock()
defer ft.TransportLk.Unlock()
ft.EventHandler = events
return ft.SetEventHandlerErr
}
Expand All @@ -76,18 +84,24 @@ func (ft *FakeTransport) Shutdown(ctx context.Context) error {

// PauseChannel paused the given channel ID
func (ft *FakeTransport) PauseChannel(ctx context.Context, chid datatransfer.ChannelID) error {
ft.TransportLk.Lock()
defer ft.TransportLk.Unlock()
ft.PausedChannels = append(ft.PausedChannels, chid)
return ft.PauseChannelErr
}

// ResumeChannel resumes the given channel
func (ft *FakeTransport) ResumeChannel(ctx context.Context, msg datatransfer.Message, chid datatransfer.ChannelID) error {
ft.TransportLk.Lock()
defer ft.TransportLk.Unlock()
ft.ResumedChannels = append(ft.ResumedChannels, ResumedChannel{chid, msg})
return ft.ResumeChannelErr
}

// CleanupChannel cleans up the given channel
func (ft *FakeTransport) CleanupChannel(chid datatransfer.ChannelID) {
ft.TransportLk.Lock()
defer ft.TransportLk.Unlock()
ft.CleanedUpChannels = append(ft.CleanedUpChannels, chid)
}

Expand All @@ -97,6 +111,8 @@ func RecordCustomizedTransfer() datatransfer.TransportOption {
if !ok {
return errors.New("incorrect transport")
}
ft.TransportLk.Lock()
defer ft.TransportLk.Unlock()
ft.CustomizedTransfers = append(ft.CustomizedTransfers, chid)
return nil
}
Expand Down
10 changes: 7 additions & 3 deletions testutil/testnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package testutil

import (
"context"
"sync"

"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
Expand All @@ -19,9 +20,10 @@ type FakeSentMessage struct {
// FakeNetwork is a network that satisfies the DataTransferNetwork interface but
// does not actually do anything
type FakeNetwork struct {
PeerID peer.ID
SentMessages []FakeSentMessage
Delegate network.Receiver
PeerID peer.ID
SentMessagesLk sync.Mutex
SentMessages []FakeSentMessage
Delegate network.Receiver
}

// NewFakeNetwork returns a new fake data transfer network instance
Expand All @@ -33,6 +35,8 @@ var _ network.DataTransferNetwork = (*FakeNetwork)(nil)

// SendMessage sends a GraphSync message to a peer.
func (fn *FakeNetwork) SendMessage(ctx context.Context, p peer.ID, m datatransfer.Message) error {
fn.SentMessagesLk.Lock()
defer fn.SentMessagesLk.Unlock()
fn.SentMessages = append(fn.SentMessages, FakeSentMessage{p, m})
return nil
}
Expand Down
7 changes: 6 additions & 1 deletion testutil/testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
"fmt"
"sync"
"testing"

blocks "github.com/ipfs/go-block-format"
Expand All @@ -24,12 +25,16 @@ var blockGenerator = blocksutil.NewBlockGenerator()

// var prioritySeq int
var seedSeq int64
var seedLock sync.Mutex

// RandomBytes returns a byte array of the given size with random values.
func RandomBytes(n int64) []byte {
data := new(bytes.Buffer)
random.WritePseudoRandomBytes(n, data, seedSeq) // nolint: gosec,errcheck
seedLock.Lock()
currentSeedSeq := seedSeq
seedSeq++
seedLock.Unlock()
random.WritePseudoRandomBytes(n, data, currentSeedSeq) // nolint: gosec,errcheck
return data.Bytes()
}

Expand Down
Loading