-
Notifications
You must be signed in to change notification settings - Fork 112
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix(dot/network): fix data race and add t.Parallel
to all tests
#2105
Changes from 41 commits
d05e39e
0611f92
1e03e39
1009d6c
dd4f9c7
f6d51f0
589c4a9
04b417a
9fa0bf0
034c4db
b4ee352
9bbc83b
894e218
8dc1d05
ae518e8
56b20e0
5879434
03f77aa
bbaa958
0518139
ec8dece
59cc124
a43421c
e7cf32f
7e8cc89
1a2a1b3
a459fe4
982d910
19e503d
15b4433
fade1ed
5869657
ea4509d
74df15f
3a8a47c
37046e1
1fc3b73
3bf902a
290fd1b
fa16436
6ca2cfd
dd9a674
c84d611
c46a26d
c8f7deb
5e70577
f1b9a3f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||
---|---|---|---|---|
|
@@ -7,6 +7,7 @@ import ( | |||
"errors" | ||||
"io" | ||||
"math/big" | ||||
"sync" | ||||
"testing" | ||||
|
||||
"github.com/ChainSafe/gossamer/dot/types" | ||||
|
@@ -18,15 +19,19 @@ import ( | |||
) | ||||
|
||||
type testStreamHandler struct { | ||||
sync.Mutex | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit Maybe name the mutex so we know what it is protecting? Is it the |
||||
|
||||
messages map[peer.ID][]Message | ||||
decoder messageDecoder | ||||
exit bool | ||||
|
||||
eofCh chan struct{} | ||||
} | ||||
|
||||
func newTestStreamHandler(decoder messageDecoder) *testStreamHandler { | ||||
return &testStreamHandler{ | ||||
messages: make(map[peer.ID][]Message), | ||||
decoder: decoder, | ||||
eofCh: make(chan struct{}), | ||||
} | ||||
} | ||||
|
||||
|
@@ -42,9 +47,11 @@ func (s *testStreamHandler) handleStream(stream libp2pnetwork.Stream) { | |||
} | ||||
|
||||
func (s *testStreamHandler) handleMessage(stream libp2pnetwork.Stream, msg Message) error { | ||||
|
||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||
s.Lock() | ||||
defer s.Unlock() | ||||
msgs := s.messages[stream.Conn().RemotePeer()] | ||||
s.messages[stream.Conn().RemotePeer()] = append(msgs, msg) | ||||
|
||||
announceHandshake := &BlockAnnounceHandshake{ | ||||
BestBlockNumber: 0, | ||||
} | ||||
|
@@ -69,13 +76,10 @@ func (s *testStreamHandler) readStream(stream libp2pnetwork.Stream, | |||
peer peer.ID, decoder messageDecoder, handler messageHandler) { | ||||
msgBytes := make([]byte, maxBlockResponseSize) | ||||
|
||||
defer func() { | ||||
s.exit = true | ||||
}() | ||||
|
||||
for { | ||||
tot, err := readStream(stream, msgBytes) | ||||
if errors.Is(err, io.EOF) { | ||||
s.eofCh <- struct{}{} | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you close it instead? I doubt you would get |
||||
return | ||||
} else if err != nil { | ||||
logger.Debugf("failed to read from stream using protocol %s: %s", stream.Protocol(), err) | ||||
|
@@ -100,6 +104,14 @@ func (s *testStreamHandler) readStream(stream libp2pnetwork.Stream, | |||
} | ||||
} | ||||
|
||||
func (s *testStreamHandler) messagesFrom(peerID peer.ID) (messages []Message, ok bool) { | ||||
s.Lock() | ||||
defer s.Unlock() | ||||
|
||||
messages, ok = s.messages[peerID] | ||||
return messages, ok | ||||
} | ||||
|
||||
var starting, _ = variadic.NewUint64OrHash(uint64(1)) | ||||
|
||||
var one = uint32(1) | ||||
|
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -4,7 +4,7 @@ | |||||
package network | ||||||
|
||||||
import ( | ||||||
"fmt" | ||||||
"regexp" | ||||||
"testing" | ||||||
"time" | ||||||
|
||||||
|
@@ -50,11 +50,10 @@ func mustNewMultiAddr(s string) (a ma.Multiaddr) { | |||||
func TestExternalAddrsPublicIP(t *testing.T) { | ||||||
t.Parallel() | ||||||
|
||||||
port := availablePort(t) | ||||||
config := &Config{ | ||||||
BasePath: t.TempDir(), | ||||||
PublicIP: "10.0.5.2", | ||||||
Port: port, | ||||||
Port: availablePort(t), | ||||||
NoBootstrap: true, | ||||||
NoMDNS: true, | ||||||
} | ||||||
|
@@ -65,6 +64,8 @@ func TestExternalAddrsPublicIP(t *testing.T) { | |||||
privateIPs, err := newPrivateIPFilters() | ||||||
require.NoError(t, err) | ||||||
|
||||||
multiAddrRegex := regexp.MustCompile("^/ip4/(127.0.0.1|10.0.5.2)/tcp/[0-9]+$") | ||||||
|
||||||
for i, addr := range addrInfo.Addrs { | ||||||
switch i { | ||||||
case len(addrInfo.Addrs) - 1: | ||||||
|
@@ -73,16 +74,14 @@ func TestExternalAddrsPublicIP(t *testing.T) { | |||||
default: | ||||||
require.False(t, privateIPs.AddrBlocked(addr)) | ||||||
} | ||||||
} | ||||||
|
||||||
expected := []ma.Multiaddr{ | ||||||
mustNewMultiAddr(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", port)), | ||||||
mustNewMultiAddr(fmt.Sprintf("/ip4/10.0.5.2/tcp/%d", port)), | ||||||
require.True(t, multiAddrRegex.MatchString(addr.String())) | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit you can use |
||||||
} | ||||||
assert.Equal(t, addrInfo.Addrs, expected) | ||||||
} | ||||||
|
||||||
func TestExternalAddrsPublicDNS(t *testing.T) { | ||||||
t.Parallel() | ||||||
|
||||||
config := &Config{ | ||||||
BasePath: t.TempDir(), | ||||||
PublicDNS: "alice", | ||||||
|
@@ -99,7 +98,6 @@ func TestExternalAddrsPublicDNS(t *testing.T) { | |||||
mustNewMultiAddr("/dns/alice/tcp/7001"), | ||||||
} | ||||||
assert.Equal(t, addrInfo.Addrs, expected) | ||||||
|
||||||
} | ||||||
|
||||||
// test host connect method | ||||||
|
@@ -222,7 +220,7 @@ func TestSend(t *testing.T) { | |||||
|
||||||
time.Sleep(TestMessageTimeout) | ||||||
|
||||||
msg, ok := handler.messages[nodeA.host.id()] | ||||||
msg, ok := handler.messagesFrom(nodeA.host.id()) | ||||||
require.True(t, ok) | ||||||
require.Equal(t, 1, len(msg)) | ||||||
require.Equal(t, testBlockReqMessage, msg[0]) | ||||||
|
@@ -273,24 +271,32 @@ func TestExistingStream(t *testing.T) { | |||||
require.NoError(t, err) | ||||||
|
||||||
time.Sleep(TestMessageTimeout) | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can we refactor this test to not use sleeps anymore? Now that you have |
||||||
require.NotNil(t, handlerB.messages[nodeA.host.id()], "node B timeout waiting for message from node A") | ||||||
|
||||||
messages, ok := handlerB.messagesFrom(nodeA.host.id()) | ||||||
require.True(t, ok, "node B timed out waiting for message from node A") | ||||||
assert.Len(t, messages, 1) | ||||||
|
||||||
// node A uses the stream to send a second message | ||||||
err = nodeA.host.writeToStream(stream, testBlockReqMessage) | ||||||
require.NoError(t, err) | ||||||
require.NotNil(t, handlerB.messages[nodeA.host.id()], "node B timeout waiting for message from node A") | ||||||
|
||||||
_, ok = handlerB.messagesFrom(nodeA.host.id()) | ||||||
require.True(t, ok, "node B timed out waiting for message from node A") | ||||||
|
||||||
// node B opens the stream to send the first message | ||||||
stream, err = nodeB.host.send(addrInfoA.ID, nodeB.host.protocolID, testBlockReqMessage) | ||||||
require.NoError(t, err) | ||||||
|
||||||
time.Sleep(TestMessageTimeout) | ||||||
require.NotNil(t, handlerA.messages[nodeB.host.id()], "node A timeout waiting for message from node B") | ||||||
_, ok = handlerA.messagesFrom(nodeB.host.id()) | ||||||
require.True(t, ok, "node A timed out waiting for message from node B") | ||||||
|
||||||
// node B uses the stream to send a second message | ||||||
err = nodeB.host.writeToStream(stream, testBlockReqMessage) | ||||||
require.NoError(t, err) | ||||||
require.NotNil(t, handlerA.messages[nodeB.host.id()], "node A timeout waiting for message from node B") | ||||||
|
||||||
_, ok = handlerA.messagesFrom(nodeB.host.id()) | ||||||
require.True(t, ok, "node A timed out waiting for message from node B") | ||||||
} | ||||||
|
||||||
func TestStreamCloseMetadataCleanup(t *testing.T) { | ||||||
|
@@ -531,9 +537,9 @@ func TestStreamCloseEOF(t *testing.T) { | |||||
|
||||||
nodeB := createTestService(t, configB) | ||||||
nodeB.noGossip = true | ||||||
|
||||||
handler := newTestStreamHandler(testBlockRequestMessageDecoder) | ||||||
nodeB.host.registerStreamHandler(nodeB.host.protocolID, handler.handleStream) | ||||||
require.False(t, handler.exit) | ||||||
|
||||||
addrInfoB := nodeB.host.addrInfo() | ||||||
err := nodeA.host.connect(addrInfoB) | ||||||
|
@@ -548,14 +554,20 @@ func TestStreamCloseEOF(t *testing.T) { | |||||
|
||||||
stream, err := nodeA.host.send(addrInfoB.ID, nodeB.host.protocolID, testBlockReqMessage) | ||||||
require.NoError(t, err) | ||||||
require.False(t, handler.exit) | ||||||
|
||||||
err = stream.Close() | ||||||
require.NoError(t, err) | ||||||
|
||||||
time.Sleep(TestBackoffTimeout) | ||||||
timeout := time.NewTimer(TestBackoffTimeout) | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit rename to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't mind this naming. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I guess it could be a timeout timer as well 😄 |
||||||
|
||||||
require.True(t, handler.exit) | ||||||
select { | ||||||
case <-timeout.C: | ||||||
t.Fatal("stream handler does not exit after stream closed") | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit
Suggested change
|
||||||
case <-handler.eofCh: | ||||||
if !timeout.Stop() { | ||||||
<-timeout.C | ||||||
} | ||||||
} | ||||||
} | ||||||
|
||||||
// Test to check the nodes connection by peer set manager | ||||||
|
@@ -678,6 +690,7 @@ func TestPeerReputation(t *testing.T) { | |||||
nodeB.noGossip = true | ||||||
|
||||||
addrInfoB := nodeB.host.addrInfo() | ||||||
|
||||||
nodeA.host.h.Peerstore().AddAddrs(addrInfoB.ID, addrInfoB.Addrs, peerstore.PermanentAddrTTL) | ||||||
nodeA.host.cm.peerSetHandler.AddPeer(0, addrInfoB.ID) | ||||||
|
||||||
|
@@ -694,6 +707,9 @@ func TestPeerReputation(t *testing.T) { | |||||
time.Sleep(100 * time.Millisecond) | ||||||
|
||||||
rep, err := nodeA.host.cm.peerSetHandler.PeerReputation(addrInfoB.ID) | ||||||
|
||||||
const zeroReputation int32 = 0 | ||||||
EclesioMeloJunior marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
|
||||||
require.NoError(t, err) | ||||||
require.Greater(t, rep, int32(0)) | ||||||
require.Greater(t, rep, zeroReputation) | ||||||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -264,6 +264,7 @@ func closeOutboundStream(info *notificationsProtocol, peerID peer.ID, stream lib | |
) | ||
|
||
info.outboundHandshakeData.Delete(peerID) | ||
fmt.Println(">>>>>> closed outbound stream") | ||
_ = stream.Close() | ||
} | ||
|
||
|
@@ -289,12 +290,12 @@ func (s *Service) sendData(peer peer.ID, hs Handshake, info *notificationsProtoc | |
} | ||
|
||
if s.host.messageCache != nil && s.host.messageCache.exists(peer, msg) { | ||
logger.Tracef("message has already been sent, ignoring: peer=%s msg=%s", peer, msg) | ||
logger.Debugf("message has already been sent, ignoring: peer=%s msg=%s", peer, msg) | ||
return | ||
} | ||
|
||
// we've completed the handshake with the peer, send message directly | ||
logger.Tracef("sending message to peer %s using protocol %s: %s", peer, info.protocolID, msg) | ||
logger.Debugf("sending message to peer %s using protocol %s: %s", peer, info.protocolID, msg) | ||
if err := s.host.writeToStream(stream, msg); err != nil { | ||
logger.Debugf("failed to send message to peer %s: %s", peer, err) | ||
|
||
|
@@ -310,7 +311,7 @@ func (s *Service) sendData(peer peer.ID, hs Handshake, info *notificationsProtoc | |
} | ||
} | ||
|
||
logger.Tracef("successfully sent message on protocol %s to peer %s: message=", info.protocolID, peer, msg) | ||
logger.Debugf("successfully sent message on protocol %s to peer %s: message=", info.protocolID, peer, msg) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why the level change here 🤔 ? (applies to above as well) |
||
s.host.cm.peerSetHandler.ReportPeer(peerset.ReputationChange{ | ||
Value: peerset.GossipSuccessValue, | ||
Reason: peerset.GossipSuccessReason, | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit I think you can have this unexported