Skip to content

Commit

Permalink
p2p/discv4: revert gotreply handler change from #8661 (#9119) (#9195)
Browse files Browse the repository at this point in the history
The handler had race conditions in the candidates processing goroutine.
  • Loading branch information
battlmonstr committed Jan 11, 2024
1 parent d079008 commit cb5b75a
Showing 1 changed file with 18 additions and 61 deletions.
79 changes: 18 additions & 61 deletions p2p/discover/v4_udp.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,13 @@ import (
"time"

lru "github.com/hashicorp/golang-lru/v2"
"github.com/ledgerwatch/log/v3"

"github.com/ledgerwatch/erigon/common/debug"
"github.com/ledgerwatch/erigon/crypto"
"github.com/ledgerwatch/erigon/p2p/discover/v4wire"
"github.com/ledgerwatch/erigon/p2p/enode"
"github.com/ledgerwatch/erigon/p2p/netutil"
"github.com/ledgerwatch/log/v3"
)

// Errors
Expand Down Expand Up @@ -610,74 +611,30 @@ func (t *UDPv4) loop() {
}()

case r := <-t.gotreply:

type matchCandidate struct {
el *list.Element
errc chan error
}

var matchCandidates []matchCandidate

mutex.Lock()
for el := plist.Front(); el != nil; el = el.Next() {
p := el.Value.(*replyMatcher)
if p.from == r.from && p.ptype == r.data.Kind() && p.ip.Equal(r.ip) {
candidate := matchCandidate{el, p.errc}
p.errc = make(chan error, 1)
matchCandidates = append(matchCandidates, candidate)
}
}
mutex.Unlock()

if len(matchCandidates) == 0 {
// if there are no matched candidates try again matching against
// ip & port to handle node key changes
func() {
mutex.Lock()
for el := plist.Front(); el != nil; el = el.Next() {
p := el.Value.(*replyMatcher)
if p.ptype == r.data.Kind() && p.ip.Equal(r.ip) && p.port == r.port {
candidate := matchCandidate{el, p.errc}
p.errc = make(chan error, 1)
matchCandidates = append(matchCandidates, candidate)
}
}
mutex.Unlock()

if len(matchCandidates) == 0 {
r.matched <- false
}
}
defer mutex.Unlock()

go func(r reply) {
var matched bool // whether any replyMatcher considered the reply acceptable.
for _, candidate := range matchCandidates {
p := candidate.el.Value.(*replyMatcher)
ok, requestDone := p.callback(r.data)
matched = matched || ok
p.reply = r.data

// Remove the matcher if callback indicates that all replies have been received.
if requestDone {
mutex.Lock()
plist.Remove(candidate.el)
mutex.Unlock()
candidate.errc <- nil
listUpdate <- candidate.el
} else {
select {
case err := <-p.errc:
candidate.errc <- err
default:
p.errc = candidate.errc
for el := plist.Front(); el != nil; el = el.Next() {
p := el.Value.(*replyMatcher)
if (p.ptype == r.data.Kind()) && p.ip.Equal(r.ip) && (p.port == r.port) {
ok, requestDone := p.callback(r.data)
matched = matched || ok
p.reply = r.data
// Remove the matcher if callback indicates that all replies have been received.
if requestDone {
p.errc <- nil
plist.Remove(el)
listUpdate <- el
}
// Reset the continuous timeout counter (time drift detection)
contTimeouts = 0
}
}

r.matched <- matched
}(r)
}()

// Reset the continuous timeout counter (time drift detection)
contTimeouts = 0
case key := <-t.gotkey:
go func() {
if key, err := v4wire.DecodePubkey(crypto.S256(), key); err == nil {
Expand Down

0 comments on commit cb5b75a

Please sign in to comment.