-
Notifications
You must be signed in to change notification settings - Fork 112
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix(dot/network): fix data race and add t.Parallel
to all tests
#2105
Changes from 26 commits
d05e39e
0611f92
1e03e39
1009d6c
dd4f9c7
f6d51f0
589c4a9
04b417a
9fa0bf0
034c4db
b4ee352
9bbc83b
894e218
8dc1d05
ae518e8
56b20e0
5879434
03f77aa
bbaa958
0518139
ec8dece
59cc124
a43421c
e7cf32f
7e8cc89
1a2a1b3
a459fe4
982d910
19e503d
15b4433
fade1ed
5869657
ea4509d
74df15f
3a8a47c
37046e1
1fc3b73
3bf902a
290fd1b
fa16436
6ca2cfd
dd9a674
c84d611
c46a26d
c8f7deb
5e70577
f1b9a3f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -7,6 +7,7 @@ import ( | |
"errors" | ||
"io" | ||
"math/big" | ||
"sync" | ||
"testing" | ||
|
||
"github.com/ChainSafe/gossamer/dot/types" | ||
|
@@ -18,15 +19,21 @@ import ( | |
) | ||
|
||
type testStreamHandler struct { | ||
sync.Mutex | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit Maybe name the mutex so we know what it is protecting? Is it the |
||
|
||
messages map[peer.ID][]Message | ||
decoder messageDecoder | ||
exit bool | ||
|
||
// exitChan is a notify chan that the readStream | ||
// works and sucessfuly returns | ||
EclesioMeloJunior marked this conversation as resolved.
Show resolved
Hide resolved
EclesioMeloJunior marked this conversation as resolved.
Show resolved
Hide resolved
|
||
exitChan chan struct{} | ||
} | ||
|
||
func newTestStreamHandler(decoder messageDecoder) *testStreamHandler { | ||
return &testStreamHandler{ | ||
messages: make(map[peer.ID][]Message), | ||
decoder: decoder, | ||
exitChan: make(chan struct{}), | ||
} | ||
} | ||
|
||
|
@@ -42,6 +49,9 @@ func (s *testStreamHandler) handleStream(stream libp2pnetwork.Stream) { | |
} | ||
|
||
func (s *testStreamHandler) handleMessage(stream libp2pnetwork.Stream, msg Message) error { | ||
s.Lock() | ||
defer s.Unlock() | ||
|
||
msgs := s.messages[stream.Conn().RemotePeer()] | ||
s.messages[stream.Conn().RemotePeer()] = append(msgs, msg) | ||
announceHandshake := &BlockAnnounceHandshake{ | ||
|
@@ -69,7 +79,7 @@ func (s *testStreamHandler) readStream(stream libp2pnetwork.Stream, | |
msgBytes := make([]byte, maxBlockResponseSize) | ||
|
||
defer func() { | ||
s.exit = true | ||
close(s.exitChan) | ||
EclesioMeloJunior marked this conversation as resolved.
Show resolved
Hide resolved
|
||
}() | ||
|
||
for { | ||
|
@@ -99,6 +109,14 @@ func (s *testStreamHandler) readStream(stream libp2pnetwork.Stream, | |
} | ||
} | ||
|
||
func (s *testStreamHandler) messagesFrom(peerID peer.ID) (messages []Message, ok bool) { | ||
s.Lock() | ||
defer s.Unlock() | ||
|
||
messages, ok = s.messages[peerID] | ||
return messages, ok | ||
} | ||
|
||
var starting, _ = variadic.NewUint64OrHash(uint64(1)) | ||
|
||
var one = uint32(1) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -83,6 +83,8 @@ func TestExternalAddrsPublicIP(t *testing.T) { | |
} | ||
|
||
func TestExternalAddrsPublicDNS(t *testing.T) { | ||
t.Parallel() | ||
|
||
config := &Config{ | ||
BasePath: t.TempDir(), | ||
PublicDNS: "alice", | ||
|
@@ -99,7 +101,6 @@ func TestExternalAddrsPublicDNS(t *testing.T) { | |
mustNewMultiAddr("/dns/alice/tcp/7001"), | ||
} | ||
assert.Equal(t, addrInfo.Addrs, expected) | ||
|
||
} | ||
|
||
// test host connect method | ||
|
@@ -222,7 +223,7 @@ func TestSend(t *testing.T) { | |
|
||
time.Sleep(TestMessageTimeout) | ||
|
||
msg, ok := handler.messages[nodeA.host.id()] | ||
msg, ok := handler.messagesFrom(nodeA.host.id()) | ||
require.True(t, ok) | ||
require.Equal(t, 1, len(msg)) | ||
require.Equal(t, testBlockReqMessage, msg[0]) | ||
|
@@ -273,24 +274,32 @@ func TestExistingStream(t *testing.T) { | |
require.NoError(t, err) | ||
|
||
time.Sleep(TestMessageTimeout) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can we refactor this test to not use sleeps anymore? Now that you have |
||
require.NotNil(t, handlerB.messages[nodeA.host.id()], "node B timeout waiting for message from node A") | ||
|
||
messages, _ := handlerB.messagesFrom(nodeA.host.id()) | ||
assert.Len(t, messages, 1) | ||
require.NotNil(t, messages, "node B timeout waiting for message from node A") | ||
EclesioMeloJunior marked this conversation as resolved.
Show resolved
Hide resolved
EclesioMeloJunior marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
// node A uses the stream to send a second message | ||
err = nodeA.host.writeToStream(stream, testBlockReqMessage) | ||
require.NoError(t, err) | ||
require.NotNil(t, handlerB.messages[nodeA.host.id()], "node B timeout waiting for message from node A") | ||
|
||
messages, _ = handlerB.messagesFrom(nodeA.host.id()) | ||
require.NotNil(t, messages, "node B timeout waiting for message from node A") | ||
|
||
// node B opens the stream to send the first message | ||
stream, err = nodeB.host.send(addrInfoA.ID, nodeB.host.protocolID, testBlockReqMessage) | ||
require.NoError(t, err) | ||
|
||
time.Sleep(TestMessageTimeout) | ||
require.NotNil(t, handlerA.messages[nodeB.host.id()], "node A timeout waiting for message from node B") | ||
messages, _ = handlerA.messagesFrom(nodeB.host.id()) | ||
require.NotNil(t, messages, "node A timeout waiting for message from node B") | ||
EclesioMeloJunior marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
// node B uses the stream to send a second message | ||
err = nodeB.host.writeToStream(stream, testBlockReqMessage) | ||
require.NoError(t, err) | ||
require.NotNil(t, handlerA.messages[nodeB.host.id()], "node A timeout waiting for message from node B") | ||
|
||
messages, _ = handlerA.messagesFrom(nodeB.host.id()) | ||
require.NotNil(t, messages, "node A timeout waiting for message from node B") | ||
EclesioMeloJunior marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
|
||
func TestStreamCloseMetadataCleanup(t *testing.T) { | ||
|
@@ -533,7 +542,6 @@ func TestStreamCloseEOF(t *testing.T) { | |
nodeB.noGossip = true | ||
handler := newTestStreamHandler(testBlockRequestMessageDecoder) | ||
nodeB.host.registerStreamHandler(nodeB.host.protocolID, handler.handleStream) | ||
require.False(t, handler.exit) | ||
|
||
addrInfoB := nodeB.host.addrInfo() | ||
err := nodeA.host.connect(addrInfoB) | ||
|
@@ -548,14 +556,15 @@ func TestStreamCloseEOF(t *testing.T) { | |
|
||
stream, err := nodeA.host.send(addrInfoB.ID, nodeB.host.protocolID, testBlockReqMessage) | ||
require.NoError(t, err) | ||
require.False(t, handler.exit) | ||
|
||
err = stream.Close() | ||
require.NoError(t, err) | ||
|
||
time.Sleep(TestBackoffTimeout) | ||
|
||
require.True(t, handler.exit) | ||
select { | ||
case <-time.After(time.Millisecond * 500): | ||
EclesioMeloJunior marked this conversation as resolved.
Show resolved
Hide resolved
|
||
require.Fail(t, "stream handler does not exit after stream closed") | ||
EclesioMeloJunior marked this conversation as resolved.
Show resolved
Hide resolved
|
||
case <-handler.exitChan: | ||
} | ||
} | ||
|
||
// Test to check the nodes connection by peer set manager | ||
|
Original file line number | Diff line number | Diff line change | ||||||||
---|---|---|---|---|---|---|---|---|---|---|
|
@@ -146,6 +146,9 @@ func NewService(cfg *Config) (*Service, error) { | |||||||||
} | ||||||||||
bufPool := newSizedBufferPool(preAllocateInPool, poolSize) | ||||||||||
|
||||||||||
const cleanupStreamInterval = time.Minute | ||||||||||
streamManager := newStreamManager(ctx, cleanupStreamInterval) | ||||||||||
|
||||||||||
network := &Service{ | ||||||||||
ctx: ctx, | ||||||||||
cancel: cancel, | ||||||||||
|
@@ -163,7 +166,7 @@ func NewService(cfg *Config) (*Service, error) { | |||||||||
telemetryInterval: cfg.telemetryInterval, | ||||||||||
closeCh: make(chan struct{}), | ||||||||||
bufPool: bufPool, | ||||||||||
streamManager: newStreamManager(ctx), | ||||||||||
streamManager: streamManager, | ||||||||||
blockResponseBuf: make([]byte, maxBlockResponseSize), | ||||||||||
telemetry: cfg.Telemetry, | ||||||||||
} | ||||||||||
|
@@ -282,8 +285,7 @@ func (s *Service) Start() error { | |||||||||
} | ||||||||||
|
||||||||||
go s.logPeerCount() | ||||||||||
go s.publishNetworkTelemetry(s.closeCh) | ||||||||||
go s.sentBlockIntervalTelemetry() | ||||||||||
go s.publishTelemetry(s.closeCh) | ||||||||||
s.streamManager.start() | ||||||||||
|
||||||||||
return nil | ||||||||||
|
@@ -384,47 +386,56 @@ func (s *Service) logPeerCount() { | |||||||||
} | ||||||||||
} | ||||||||||
|
||||||||||
func (s *Service) publishNetworkTelemetry(done <-chan struct{}) { | ||||||||||
func (s *Service) publishTelemetry(done <-chan struct{}) { | ||||||||||
EclesioMeloJunior marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||
ticker := time.NewTicker(s.telemetryInterval) | ||||||||||
defer ticker.Stop() | ||||||||||
|
||||||||||
for { | ||||||||||
s.sentBandwidthTelemetry() | ||||||||||
err := s.sentBlockIntervalTelemetry() | ||||||||||
if err != nil { | ||||||||||
logger.Warnf("failed to sent block interval telemetry: %s", err) | ||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit
Suggested change
|
||||||||||
continue | ||||||||||
EclesioMeloJunior marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||
} | ||||||||||
|
||||||||||
select { | ||||||||||
case <-done: | ||||||||||
return | ||||||||||
|
||||||||||
case <-ticker.C: | ||||||||||
o := s.host.bwc.GetBandwidthTotals() | ||||||||||
s.telemetry.SendMessage(telemetry.NewBandwidth(o.RateIn, o.RateOut, s.host.peerCount())) | ||||||||||
} | ||||||||||
} | ||||||||||
} | ||||||||||
|
||||||||||
func (s *Service) sentBlockIntervalTelemetry() { | ||||||||||
for { | ||||||||||
best, err := s.blockState.BestBlockHeader() | ||||||||||
if err != nil { | ||||||||||
continue | ||||||||||
} | ||||||||||
bestHash := best.Hash() | ||||||||||
func (s *Service) sentBandwidthTelemetry() { | ||||||||||
o := s.host.bwc.GetBandwidthTotals() | ||||||||||
s.telemetry.SendMessage(telemetry.NewBandwidth(o.RateIn, o.RateOut, s.host.peerCount())) | ||||||||||
Comment on lines
+420
to
+421
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit rename
Suggested change
|
||||||||||
} | ||||||||||
|
||||||||||
finalised, err := s.blockState.GetHighestFinalisedHeader() | ||||||||||
if err != nil { | ||||||||||
continue | ||||||||||
} | ||||||||||
finalizedHash := finalised.Hash() | ||||||||||
func (s *Service) sentBlockIntervalTelemetry() (err error) { | ||||||||||
EclesioMeloJunior marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||
best, err := s.blockState.BestBlockHeader() | ||||||||||
if err != nil { | ||||||||||
return err | ||||||||||
EclesioMeloJunior marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||
} | ||||||||||
|
||||||||||
s.telemetry.SendMessage(telemetry.NewBlockInterval( | ||||||||||
&bestHash, | ||||||||||
best.Number, | ||||||||||
&finalizedHash, | ||||||||||
finalised.Number, | ||||||||||
big.NewInt(int64(s.transactionHandler.TransactionsCount())), | ||||||||||
big.NewInt(0), // TODO: (ed) determine where to get used_state_cache_size (#1501) | ||||||||||
)) | ||||||||||
bestHash := best.Hash() | ||||||||||
|
||||||||||
time.Sleep(s.telemetryInterval) | ||||||||||
finalised, err := s.blockState.GetHighestFinalisedHeader() | ||||||||||
if err != nil { | ||||||||||
return err | ||||||||||
EclesioMeloJunior marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||
} | ||||||||||
|
||||||||||
finalizedHash := finalised.Hash() | ||||||||||
|
||||||||||
s.telemetry.SendMessage(telemetry.NewBlockInterval( | ||||||||||
&bestHash, | ||||||||||
best.Number, | ||||||||||
&finalizedHash, | ||||||||||
finalised.Number, | ||||||||||
big.NewInt(int64(s.transactionHandler.TransactionsCount())), | ||||||||||
big.NewInt(0), // TODO: (ed) determine where to get used_state_cache_size (#1501) | ||||||||||
)) | ||||||||||
Comment on lines
+439
to
+446
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit it would be nice to split this in two, one instruction for |
||||||||||
|
||||||||||
return nil | ||||||||||
} | ||||||||||
|
||||||||||
func (s *Service) handleConn(conn libp2pnetwork.Conn) { | ||||||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit I think you can have this unexported