fix(dot/network): fix data race and add t.Parallel to all tests #2105
Conversation
An early review, sorry I couldn't hold myself as I got excited with all this test parallelism 💯 😄
Blocked by PR #2106.
Co-authored-by: Quentin McGaw <quentin.mcgaw@gmail.com>
…ssamer into eclesio/net-race-conditions
Codecov Report

```
@@            Coverage Diff             @@
##      development    #2105      +/-   ##
===========================================
+ Coverage      57.40%   60.06%    +2.65%
===========================================
  Files            217      214        -3
  Lines          28587    27807      -780
===========================================
+ Hits           16411    16701      +290
+ Misses         10487     9381     -1106
- Partials        1689     1725       +36
===========================================
```
A few comments but overall looks good, great work! Network is a good place to remove race conditions, so good stuff.
```go
for i := 0; i < 5; i++ {
	nodeA.GossipMessage(announceMessage)
	time.Sleep(time.Millisecond * 10)
}
require.Equal(t, 6, len(handler.messages[nodeA.host.id()]))

time.Sleep(time.Millisecond * 200)
```
seconding this question
dot/network/stream_manager.go (Outdated)

```go
ctx           context.Context
streamDataMap *sync.Map // map[string]*streamData
```
SYNC.MAP??? Probably out of scope, I would think. If we really are anti `sync.Map`, maybe we should open an issue/epic to remove them from the codebase?
```go
defer h.writersWG.Done()

select {
```
I'm a bit confused what the purpose of these two selects is; why can't they just be one? Would you mind explaining, just for my own learning?
Basically, the first select prevents later goroutines from writing to the already closed channel; the second handles the case where the close is triggered while we are blocked trying to write to a full channel. This code is a way to gracefully close a channel that is being used by other goroutines, since closing a channel while another goroutine is writing to it causes a race condition.
Ooofff, this is bad. You don't have any guarantee `h.closeCh` won't get closed between the two select blocks (and hence closing `h.actionQueue`). Please change somehow.
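A minimal sketch of the race and one common fix, using hypothetical names (`safeQueue`, `TrySend`) rather than the PR's actual code: instead of two separate selects, a mutex plus a `closed` flag makes "check closed, then send" atomic with respect to the close, so there is no gap for the close to sneak into.

```go
package main

import (
	"fmt"
	"sync"
)

// safeQueue wraps a channel so that concurrent senders and a single
// Close call cannot race: the mutex makes the closed check and the
// send one atomic step, closing the gap that exists between two
// separate select statements.
type safeQueue struct {
	mu     sync.Mutex
	closed bool
	ch     chan int
}

func newSafeQueue(size int) *safeQueue {
	return &safeQueue{ch: make(chan int, size)}
}

// TrySend reports whether the value was enqueued; it returns false
// once the queue has been closed.
func (q *safeQueue) TrySend(v int) bool {
	q.mu.Lock()
	defer q.mu.Unlock()
	if q.closed {
		return false
	}
	q.ch <- v // safe: Close cannot run while we hold the lock
	return true
}

// Close marks the queue closed and closes the channel exactly once.
func (q *safeQueue) Close() {
	q.mu.Lock()
	defer q.mu.Unlock()
	if !q.closed {
		q.closed = true
		close(q.ch)
	}
}

func main() {
	q := newSafeQueue(8)
	fmt.Println(q.TrySend(1)) // true
	q.Close()
	fmt.Println(q.TrySend(2)) // false: send after close is rejected
}
```

Note the trade-off: `TrySend` holds the lock while sending, so the buffer must be sized so senders don't block for long; if senders can block indefinitely, the lock would also delay `Close`, and a `context`/done-channel in the send path would be needed as well.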
dot/peerset/peerstate.go (Outdated)

```go
@@ -212,15 +215,16 @@ func (ps *PeersState) sortedPeers(idx int) peer.IDSlice {

func (ps *PeersState) addReputation(pid peer.ID, change ReputationChange) (
	newReputation Reputation, err error) {
	ps.mu.Lock()
	defer ps.mu.Unlock()

	n, err := ps.getNode(pid)
```
we don't need to lock before this call?
The `PeersState` struct causes a race condition in the `network` package because the `getNode` function accesses the `ps.nodes` map at the same time `addReputation` updates it. I added a `Mutex` to the `PeersState` struct so `getNode` locks while reading, and after `getNode` executes, `addReputation` locks to write. It is not possible to take the lock before calling `getNode`, since `getNode` also locks and would starve waiting for an unlock that would never be called. Makes sense? 😅
However, the previous implementation was not good, so I updated it to use a `RWMutex` instead and removed the `getNode` call from `addReputation`. Now `addReputation` uses `Lock`/`Unlock` and `getNode` uses `RLock`/`RUnlock`, so `getNode` will block if another goroutine called `addReputation` first.
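A simplified sketch of the pattern described above, with hypothetical types (a `map[string]int` stands in for `ps.nodes`): reads take `RLock` so they can run concurrently, the write path takes the exclusive `Lock` and inlines the lookup instead of calling the read-locking getter, which would deadlock because `sync.RWMutex` is not reentrant.

```go
package main

import (
	"fmt"
	"sync"
)

// peersState is a hypothetical, simplified version of the struct
// discussed above: mu guards the nodes map.
type peersState struct {
	mu    sync.RWMutex
	nodes map[string]int // peer ID -> reputation
}

// getNode takes the read lock, so many readers may proceed at once.
func (ps *peersState) getNode(pid string) (int, bool) {
	ps.mu.RLock()
	defer ps.mu.RUnlock()
	rep, ok := ps.nodes[pid]
	return rep, ok
}

// addReputation takes the exclusive write lock. The lookup is inlined:
// calling getNode here would try to RLock while we hold Lock and
// deadlock, since sync.RWMutex is not reentrant.
func (ps *peersState) addReputation(pid string, change int) int {
	ps.mu.Lock()
	defer ps.mu.Unlock()
	ps.nodes[pid] += change
	return ps.nodes[pid]
}

func main() {
	ps := &peersState{nodes: map[string]int{"peerA": 10}}
	fmt.Println(ps.addReputation("peerA", 5)) // 15
	rep, _ := ps.getNode("peerA")
	fmt.Println(rep) // 15
}
```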
```go
@@ -105,7 +105,9 @@ type Config struct {
	SlotDuration time.Duration

	Telemetry telemetry.Client
	Metrics   metrics.IntervalConfig

	MessageCacheTTL time.Duration
```
nit: I think you can have this unexported.
```go
@@ -18,15 +19,19 @@ import (
)

type testStreamHandler struct {
	sync.Mutex
```
nit: maybe name the mutex so we know what it is protecting? Is it the `messages` map? Or something else?
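A sketch of what the nit suggests, with hypothetical field names: instead of embedding an anonymous `sync.Mutex`, name the field and place it directly above the data it guards, with a comment stating what it protects.

```go
package main

import (
	"fmt"
	"sync"
)

// testStreamHandler here is a hypothetical illustration, not the
// PR's actual struct: a named mutex documents what it guards.
type testStreamHandler struct {
	messagesMu sync.Mutex          // protects messages
	messages   map[string][]string // peer ID -> received messages
}

func (h *testStreamHandler) addMessage(peer, msg string) {
	h.messagesMu.Lock()
	defer h.messagesMu.Unlock()
	h.messages[peer] = append(h.messages[peer], msg)
}

func (h *testStreamHandler) count(peer string) int {
	h.messagesMu.Lock()
	defer h.messagesMu.Unlock()
	return len(h.messages[peer])
}

func main() {
	h := &testStreamHandler{messages: map[string][]string{}}
	h.addMessage("peerA", "hello")
	fmt.Println(h.count("peerA")) // 1
}
```

An embedded `sync.Mutex` also exports `Lock`/`Unlock` on the struct itself, which lets callers outside the package lock it; a named unexported field avoids that too.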
```go
@@ -42,9 +47,11 @@ func (s *testStreamHandler) handleStream(stream libp2pnetwork.Stream) {
}

func (s *testStreamHandler) handleMessage(stream libp2pnetwork.Stream, msg Message) error {
```
```go
for {
	tot, err := readStream(stream, msgBytes)
	if errors.Is(err, io.EOF) {
		s.eofCh <- struct{}{}
```
Can you close it instead? I doubt you would get `EOF` twice, right?
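A sketch of the suggestion (the `eofCh` name is a stand-in for the handler's channel): closing a channel is the idiomatic way to signal a one-shot event such as EOF, because a receive on a closed channel never blocks and any number of waiters observe it.

```go
package main

import "fmt"

func main() {
	eofCh := make(chan struct{})

	go func() {
		// ... read loop elided; on io.EOF, signal once:
		close(eofCh) // instead of eofCh <- struct{}{}
	}()

	<-eofCh // returns as soon as the channel is closed
	fmt.Println("eof observed")

	// A second receive also returns immediately, so multiple
	// waiters (or repeated checks) are safe; a send would instead
	// block forever once the single receiver is gone.
	<-eofCh
	fmt.Println("still closed")
}
```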
```go
expected := []ma.Multiaddr{
	mustNewMultiAddr(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", port)),
	mustNewMultiAddr(fmt.Sprintf("/ip4/10.0.5.2/tcp/%d", port)),
require.True(t, multiAddrRegex.MatchString(addr.String()))
```
nit: you can use `require.Regexp(t, "myregex", "mystring")`.
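For reference, `require.Regexp` wraps the same match the test does by hand. A runnable stdlib sketch of the underlying check, with a hypothetical pattern in the spirit of the test's `multiAddrRegex`:

```go
package main

import (
	"fmt"
	"regexp"
)

// multiAddrPattern is a hypothetical stand-in for the test's
// multiAddrRegex; require.Regexp(t, pattern, s) performs this
// match-and-fail-with-message in a single call, and also accepts a
// precompiled *regexp.Regexp.
var multiAddrPattern = regexp.MustCompile(`^/ip4/\d{1,3}(\.\d{1,3}){3}/tcp/\d+$`)

func main() {
	addr := "/ip4/127.0.0.1/tcp/7001"
	fmt.Println(multiAddrPattern.MatchString(addr)) // true
}
```

The advantage over `require.True(t, rx.MatchString(s))` is the failure message: `require.Regexp` prints both the pattern and the string that failed to match.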
```go
h.writerWGMutex.Lock()
h.writersWG.Add(1)
h.writerWGMutex.Unlock()
```
You should add to the writers wait group before launching the goroutine. And also, the mutex protecting the waitgroup looks sketchy; is it really needed?
```go
@@ -131,11 +157,11 @@ func (h *Handler) Start(ctx context.Context) {
// SortedPeers return chan for sorted connected peer in the peerSet.
func (h *Handler) SortedPeers(setIdx int) chan peer.IDSlice {
	resultPeersCh := make(chan peer.IDSlice)
	h.actionQueue <- action{
	h.setActionQueue(action{
```
perhaps rename to `addToActionQueue`?
```go
h.writerWGMutex.Lock()
h.writersWG.Wait()
h.writerWGMutex.Unlock()
```
A mutex for the wait group looks very strange, try to find another way 🤔 ?
```go
@@ -171,6 +175,9 @@ func (ps *PeersState) peerStatus(set int, peerID peer.ID) string {

// peers return the list of all the peers we know of.
func (ps *PeersState) peers() []peer.ID {
	ps.mu.RLock()
```
perhaps give `mu` a name so we know what it's protecting? I guess `peerStateMutex`??
```go
@@ -273,24 +271,32 @@ func TestExistingStream(t *testing.T) {
	require.NoError(t, err)

	time.Sleep(TestMessageTimeout)
```
can we refactor this test to not use sleeps anymore? Now that you have `testStreamHandler`, you can write messages into an internal channel from `testStreamHandler.handleMessage` and just listen on that channel.
```go
err = stream.Close()
require.NoError(t, err)

time.Sleep(TestBackoffTimeout)
timeout := time.NewTimer(TestBackoffTimeout)
```
I don't mind this naming.
```go
@@ -237,35 +237,36 @@ func TestBroadcastMessages(t *testing.T) {
	// simulate message sent from core service
	nodeA.GossipMessage(anounceMessage)
	time.Sleep(time.Second * 2)
```
can we remove sleep?
```go
// Only one message will be sent.
// All 5 messages will be sent since cache is disabled.
for i := 0; i < 5; i++ {
	nodeA.GossipMessage(announceMessage)
	time.Sleep(time.Millisecond * 10)
```
why do we need to sleep here?
```go
addrInfoB := nodeB.host.addrInfo()
err := nodeA.host.connect(addrInfoB)
// retry connect if "failed to dial" error
if failedToDial(err) {
```
why would this error out on connect? I see this multiple times in this file. I would assume this thing should be ready? Otherwise, we should listen on some sort of internal ready channel to know that it's ready.
@EclesioMeloJunior can you resolve the conflicts?
Of course, once I finish PR #2267, which this one depends on.
@EclesioMeloJunior can you change this to draft if you don't intend to merge this.
Changes
- dot/network package

Tests
- On Apple silicon processors

Issues

Primary Reviewer