Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(dot/network): fix data race and add t.Parallel to all tests #2105

Closed
wants to merge 47 commits into from
Closed
Show file tree
Hide file tree
Changes from 41 commits
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
d05e39e
fix: remove race conditions on dot/network
EclesioMeloJunior Dec 6, 2021
0611f92
chore: use available ports queue and update all test to call t.Parall…
EclesioMeloJunior Dec 6, 2021
1e03e39
chore: resolve data races on dot/network
EclesioMeloJunior Dec 7, 2021
1009d6c
chore: add locks to global logger New
EclesioMeloJunior Dec 7, 2021
dd4f9c7
chore: resolve data race with stop channel to peerset
EclesioMeloJunior Dec 7, 2021
f6d51f0
chore: data race at peerstate nodes
EclesioMeloJunior Dec 7, 2021
589c4a9
chore: data race on cfg.SlotDuration
EclesioMeloJunior Dec 7, 2021
04b417a
chore: remove unused comment
EclesioMeloJunior Dec 7, 2021
9fa0bf0
chore: remove test log
EclesioMeloJunior Dec 9, 2021
034c4db
Merge branch 'eclesio/net-race-conditions' of github.com:ChainSafe/go…
EclesioMeloJunior Dec 9, 2021
b4ee352
Merge branch 'development' into eclesio/net-race-conditions
EclesioMeloJunior Dec 9, 2021
9bbc83b
Merge branch 'development' into eclesio/net-race-conditions
EclesioMeloJunior Jan 5, 2022
894e218
Merge branch 'development' into eclesio/net-race-conditions
EclesioMeloJunior Jan 6, 2022
8dc1d05
Merge branch 'development' into eclesio/net-race-conditions
EclesioMeloJunior Jan 6, 2022
ae518e8
Merge branch 'development' into eclesio/net-race-conditions
EclesioMeloJunior Jan 14, 2022
56b20e0
chore: remove unused `stopCh`
EclesioMeloJunior Jan 14, 2022
5879434
chore: resolve stream handler race condition
EclesioMeloJunior Jan 14, 2022
03f77aa
chore: resolve race at `TestExistingStream`
EclesioMeloJunior Jan 17, 2022
bbaa958
chore: resolve race at `(*PeersState).getNode()`
EclesioMeloJunior Jan 17, 2022
0518139
chore: fix data race at `TestStreamCloseEOF`
EclesioMeloJunior Jan 17, 2022
ec8dece
chore: add `MessageCacheTTL` at node config
EclesioMeloJunior Jan 17, 2022
59cc124
chore: split tests to avoid data race
EclesioMeloJunior Jan 17, 2022
a43421c
chore: introduce mutexes at peerset handler to avoide close a channel…
EclesioMeloJunior Jan 17, 2022
e7cf32f
feat: remove race conditions from `dot/network` package
EclesioMeloJunior Jan 17, 2022
7e8cc89
chore: remove `./test_data` from network pkg tests
EclesioMeloJunior Jan 17, 2022
1a2a1b3
Merge branch 'development' into eclesio/net-race-conditions
EclesioMeloJunior Jan 17, 2022
a459fe4
chore: fix typo
EclesioMeloJunior Jan 26, 2022
982d910
Merge branch 'development' into eclesio/net-race-conditions
EclesioMeloJunior Jan 26, 2022
19e503d
Merge branch 'eclesio/net-race-conditions' of github.com:ChainSafe/go…
EclesioMeloJunior Jan 26, 2022
15b4433
use timer :D
EclesioMeloJunior Jan 26, 2022
fade1ed
chore: test `ok` instead nil messages
EclesioMeloJunior Jan 26, 2022
5869657
chore: use a more meaningful name
EclesioMeloJunior Jan 26, 2022
ea4509d
chore: fix typo
EclesioMeloJunior Jan 26, 2022
74df15f
chore: improving code
EclesioMeloJunior Jan 26, 2022
3a8a47c
chore: remove availablePortQueue
EclesioMeloJunior Jan 27, 2022
37046e1
chore: fix problems with port number 0
EclesioMeloJunior Jan 28, 2022
1fc3b73
chore: use map to avoid inner loops
EclesioMeloJunior Jan 28, 2022
3bf902a
Merge branch 'development' into eclesio/net-race-conditions
EclesioMeloJunior Jan 28, 2022
290fd1b
chore: remove unneeded code
EclesioMeloJunior Jan 28, 2022
fa16436
chore: use availablePort(t) for tests
EclesioMeloJunior Jan 28, 2022
6ca2cfd
chore: replace available port by 0 at rpc integration tests
EclesioMeloJunior Jan 28, 2022
dd9a674
chore: using rwmutex in addPeerReputation and getNode
EclesioMeloJunior Jan 28, 2022
c84d611
chore: address comments
EclesioMeloJunior Jan 29, 2022
c46a26d
Merge branch 'development' into eclesio/net-race-conditions
EclesioMeloJunior Feb 2, 2022
c8f7deb
Merge branch 'development' into eclesio/net-race-conditions
EclesioMeloJunior May 30, 2022
5e70577
chore: fix lint
EclesioMeloJunior May 30, 2022
f1b9a3f
chore: just fall through the `select` case
EclesioMeloJunior May 30, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion dot/network/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,9 @@ type Config struct {
SlotDuration time.Duration

Telemetry telemetry.Client
Metrics metrics.IntervalConfig

MessageCacheTTL time.Duration
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit I think you can have this unexported

Metrics metrics.IntervalConfig
}

// build checks the configuration, sets up the private key for the network service,
Expand Down
24 changes: 18 additions & 6 deletions dot/network/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"errors"
"io"
"math/big"
"sync"
"testing"

"github.com/ChainSafe/gossamer/dot/types"
Expand All @@ -18,15 +19,19 @@ import (
)

type testStreamHandler struct {
sync.Mutex
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit Maybe name the mutex so we know what it is protecting? Is it the messages map? Or something else?


messages map[peer.ID][]Message
decoder messageDecoder
exit bool

eofCh chan struct{}
}

func newTestStreamHandler(decoder messageDecoder) *testStreamHandler {
return &testStreamHandler{
messages: make(map[peer.ID][]Message),
decoder: decoder,
eofCh: make(chan struct{}),
}
}

Expand All @@ -42,9 +47,11 @@ func (s *testStreamHandler) handleStream(stream libp2pnetwork.Stream) {
}

func (s *testStreamHandler) handleMessage(stream libp2pnetwork.Stream, msg Message) error {

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change

s.Lock()
defer s.Unlock()
msgs := s.messages[stream.Conn().RemotePeer()]
s.messages[stream.Conn().RemotePeer()] = append(msgs, msg)

announceHandshake := &BlockAnnounceHandshake{
BestBlockNumber: 0,
}
Expand All @@ -69,13 +76,10 @@ func (s *testStreamHandler) readStream(stream libp2pnetwork.Stream,
peer peer.ID, decoder messageDecoder, handler messageHandler) {
msgBytes := make([]byte, maxBlockResponseSize)

defer func() {
s.exit = true
}()

for {
tot, err := readStream(stream, msgBytes)
if errors.Is(err, io.EOF) {
s.eofCh <- struct{}{}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you close it instead? I doubt you would get EOF twice right?

return
} else if err != nil {
logger.Debugf("failed to read from stream using protocol %s: %s", stream.Protocol(), err)
Expand All @@ -100,6 +104,14 @@ func (s *testStreamHandler) readStream(stream libp2pnetwork.Stream,
}
}

func (s *testStreamHandler) messagesFrom(peerID peer.ID) (messages []Message, ok bool) {
s.Lock()
defer s.Unlock()

messages, ok = s.messages[peerID]
return messages, ok
}

var starting, _ = variadic.NewUint64OrHash(uint64(1))

var one = uint32(1)
Expand Down
2 changes: 1 addition & 1 deletion dot/network/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ func newHost(ctx context.Context, cfg *Config) (*host, error) {
return int64(1)
},
}
msgCache, err := newMessageCache(config, msgCacheTTL)
msgCache, err := newMessageCache(config, cfg.MessageCacheTTL)
if err != nil {
return nil, err
}
Expand Down
54 changes: 35 additions & 19 deletions dot/network/host_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
package network

import (
"fmt"
"regexp"
"testing"
"time"

Expand Down Expand Up @@ -50,11 +50,10 @@ func mustNewMultiAddr(s string) (a ma.Multiaddr) {
func TestExternalAddrsPublicIP(t *testing.T) {
t.Parallel()

port := availablePort(t)
config := &Config{
BasePath: t.TempDir(),
PublicIP: "10.0.5.2",
Port: port,
Port: availablePort(t),
NoBootstrap: true,
NoMDNS: true,
}
Expand All @@ -65,6 +64,8 @@ func TestExternalAddrsPublicIP(t *testing.T) {
privateIPs, err := newPrivateIPFilters()
require.NoError(t, err)

multiAddrRegex := regexp.MustCompile("^/ip4/(127.0.0.1|10.0.5.2)/tcp/[0-9]+$")

for i, addr := range addrInfo.Addrs {
switch i {
case len(addrInfo.Addrs) - 1:
Expand All @@ -73,16 +74,14 @@ func TestExternalAddrsPublicIP(t *testing.T) {
default:
require.False(t, privateIPs.AddrBlocked(addr))
}
}

expected := []ma.Multiaddr{
mustNewMultiAddr(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", port)),
mustNewMultiAddr(fmt.Sprintf("/ip4/10.0.5.2/tcp/%d", port)),
require.True(t, multiAddrRegex.MatchString(addr.String()))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit you can use require.Regexp(t, "myregex", "mystring")

}
assert.Equal(t, addrInfo.Addrs, expected)
}

func TestExternalAddrsPublicDNS(t *testing.T) {
t.Parallel()

config := &Config{
BasePath: t.TempDir(),
PublicDNS: "alice",
Expand All @@ -99,7 +98,6 @@ func TestExternalAddrsPublicDNS(t *testing.T) {
mustNewMultiAddr("/dns/alice/tcp/7001"),
}
assert.Equal(t, addrInfo.Addrs, expected)

}

// test host connect method
Expand Down Expand Up @@ -222,7 +220,7 @@ func TestSend(t *testing.T) {

time.Sleep(TestMessageTimeout)

msg, ok := handler.messages[nodeA.host.id()]
msg, ok := handler.messagesFrom(nodeA.host.id())
require.True(t, ok)
require.Equal(t, 1, len(msg))
require.Equal(t, testBlockReqMessage, msg[0])
Expand Down Expand Up @@ -273,24 +271,32 @@ func TestExistingStream(t *testing.T) {
require.NoError(t, err)

time.Sleep(TestMessageTimeout)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we refactor this test to not use sleeps anymore? Now that you have testStreamHandler, you can write messages into an internal channel from testStreamHandler.handleMessage and just listen on that channel.

require.NotNil(t, handlerB.messages[nodeA.host.id()], "node B timeout waiting for message from node A")

messages, ok := handlerB.messagesFrom(nodeA.host.id())
require.True(t, ok, "node B timed out waiting for message from node A")
assert.Len(t, messages, 1)

// node A uses the stream to send a second message
err = nodeA.host.writeToStream(stream, testBlockReqMessage)
require.NoError(t, err)
require.NotNil(t, handlerB.messages[nodeA.host.id()], "node B timeout waiting for message from node A")

_, ok = handlerB.messagesFrom(nodeA.host.id())
require.True(t, ok, "node B timed out waiting for message from node A")

// node B opens the stream to send the first message
stream, err = nodeB.host.send(addrInfoA.ID, nodeB.host.protocolID, testBlockReqMessage)
require.NoError(t, err)

time.Sleep(TestMessageTimeout)
require.NotNil(t, handlerA.messages[nodeB.host.id()], "node A timeout waiting for message from node B")
_, ok = handlerA.messagesFrom(nodeB.host.id())
require.True(t, ok, "node A timed out waiting for message from node B")

// node B uses the stream to send a second message
err = nodeB.host.writeToStream(stream, testBlockReqMessage)
require.NoError(t, err)
require.NotNil(t, handlerA.messages[nodeB.host.id()], "node A timeout waiting for message from node B")

_, ok = handlerA.messagesFrom(nodeB.host.id())
require.True(t, ok, "node A timed out waiting for message from node B")
}

func TestStreamCloseMetadataCleanup(t *testing.T) {
Expand Down Expand Up @@ -531,9 +537,9 @@ func TestStreamCloseEOF(t *testing.T) {

nodeB := createTestService(t, configB)
nodeB.noGossip = true

handler := newTestStreamHandler(testBlockRequestMessageDecoder)
nodeB.host.registerStreamHandler(nodeB.host.protocolID, handler.handleStream)
require.False(t, handler.exit)

addrInfoB := nodeB.host.addrInfo()
err := nodeA.host.connect(addrInfoB)
Expand All @@ -548,14 +554,20 @@ func TestStreamCloseEOF(t *testing.T) {

stream, err := nodeA.host.send(addrInfoB.ID, nodeB.host.protocolID, testBlockReqMessage)
require.NoError(t, err)
require.False(t, handler.exit)

err = stream.Close()
require.NoError(t, err)

time.Sleep(TestBackoffTimeout)
timeout := time.NewTimer(TestBackoffTimeout)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit rename to timer

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't mind this naming.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess it could be a timeout timer as well 😄


require.True(t, handler.exit)
select {
case <-timeout.C:
t.Fatal("stream handler does not exit after stream closed")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit

Suggested change
t.Fatal("stream handler does not exit after stream closed")
t.Fatal("stream handler did not exit after stream closed")

case <-handler.eofCh:
if !timeout.Stop() {
<-timeout.C
}
}
}

// Test to check the nodes connection by peer set manager
Expand Down Expand Up @@ -678,6 +690,7 @@ func TestPeerReputation(t *testing.T) {
nodeB.noGossip = true

addrInfoB := nodeB.host.addrInfo()

nodeA.host.h.Peerstore().AddAddrs(addrInfoB.ID, addrInfoB.Addrs, peerstore.PermanentAddrTTL)
nodeA.host.cm.peerSetHandler.AddPeer(0, addrInfoB.ID)

Expand All @@ -694,6 +707,9 @@ func TestPeerReputation(t *testing.T) {
time.Sleep(100 * time.Millisecond)

rep, err := nodeA.host.cm.peerSetHandler.PeerReputation(addrInfoB.ID)

const zeroReputation int32 = 0
EclesioMeloJunior marked this conversation as resolved.
Show resolved Hide resolved

require.NoError(t, err)
require.Greater(t, rep, int32(0))
require.Greater(t, rep, zeroReputation)
}
7 changes: 4 additions & 3 deletions dot/network/notifications.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,7 @@ func closeOutboundStream(info *notificationsProtocol, peerID peer.ID, stream lib
)

info.outboundHandshakeData.Delete(peerID)
fmt.Println(">>>>>> closed outbound stream")
_ = stream.Close()
}

Expand All @@ -289,12 +290,12 @@ func (s *Service) sendData(peer peer.ID, hs Handshake, info *notificationsProtoc
}

if s.host.messageCache != nil && s.host.messageCache.exists(peer, msg) {
logger.Tracef("message has already been sent, ignoring: peer=%s msg=%s", peer, msg)
logger.Debugf("message has already been sent, ignoring: peer=%s msg=%s", peer, msg)
return
}

// we've completed the handshake with the peer, send message directly
logger.Tracef("sending message to peer %s using protocol %s: %s", peer, info.protocolID, msg)
logger.Debugf("sending message to peer %s using protocol %s: %s", peer, info.protocolID, msg)
if err := s.host.writeToStream(stream, msg); err != nil {
logger.Debugf("failed to send message to peer %s: %s", peer, err)

Expand All @@ -310,7 +311,7 @@ func (s *Service) sendData(peer peer.ID, hs Handshake, info *notificationsProtoc
}
}

logger.Tracef("successfully sent message on protocol %s to peer %s: message=", info.protocolID, peer, msg)
logger.Debugf("successfully sent message on protocol %s to peer %s: message=", info.protocolID, peer, msg)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why the level change here 🤔 ? (applies to above as well)

s.host.cm.peerSetHandler.ReportPeer(peerset.ReputationChange{
Value: peerset.GossipSuccessValue,
Reason: peerset.GossipSuccessReason,
Expand Down
Loading