Skip to content

Commit

Permalink
swarm: remove unnecessary reqno for pending request tracking (#2460)
Browse files Browse the repository at this point in the history
* swarm: remove unnecessary reqno for pending request tracking

* use struct{} instead of bool for addrs map

* use struct{} for pendingRequests map
  • Loading branch information
sukunrt authored Aug 16, 2023
1 parent 9edef5a commit 4090dcc
Showing 1 changed file with 25 additions and 49 deletions.
74 changes: 25 additions & 49 deletions p2p/net/swarm/dial_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,6 @@ type addrDial struct {
conn *Conn
// err is the err on dialing the address
err error
// requests is the list of pendRequests interested in this dial
// the value in the slice is the request number assigned to this request by the dialWorker
requests []int
// dialed indicates whether we have triggered the dial to the address
dialed bool
// createdAt is the time this struct was created
Expand All @@ -74,13 +71,9 @@ type dialWorker struct {
peer peer.ID
// reqch is used to send dial requests to the worker. close reqch to end the worker loop
reqch <-chan dialRequest
// reqno is the request number used to track different dialRequests for a peer.
// Each incoming request is assigned a reqno. This reqno is used in pendingRequests and in
// addrDial objects in trackedDials to track this request
reqno int
// pendingRequests maps reqno to the pendRequest object for a dialRequest
pendingRequests map[int]*pendRequest
// trackedDials tracks dials to the peers addresses. An entry here is used to ensure that
// pendingRequests is the set of pendingRequests
pendingRequests map[*pendRequest]struct{}
// trackedDials tracks dials to the peer's addresses. An entry here is used to ensure that
// we dial an address at most once
trackedDials map[string]*addrDial
// resch is used to receive response for dials to the peers addresses.
Expand All @@ -101,7 +94,7 @@ func newDialWorker(s *Swarm, p peer.ID, reqch <-chan dialRequest, cl Clock) *dia
s: s,
peer: p,
reqch: reqch,
pendingRequests: make(map[int]*pendRequest),
pendingRequests: make(map[*pendRequest]struct{}),
trackedDials: make(map[string]*addrDial),
resch: make(chan dialResult),
cl: cl,
Expand Down Expand Up @@ -232,10 +225,8 @@ loop:
continue loop
}

// The request has some pending or new dials. We assign this request a request number.
// This value of w.reqno is used to track this request in all the structures
w.reqno++
w.pendingRequests[w.reqno] = pr
// The request has some pending or new dials
w.pendingRequests[pr] = struct{}{}

for _, ad := range tojoin {
if !ad.dialed {
Expand All @@ -253,7 +244,6 @@ loop:
}
}
// add the request to the addrDial
ad.requests = append(ad.requests, w.reqno)
}

if len(todial) > 0 {
Expand All @@ -263,7 +253,6 @@ loop:
w.trackedDials[string(a.Bytes())] = &addrDial{
addr: a,
ctx: req.ctx,
requests: []int{w.reqno},
createdAt: now,
}
dq.Add(network.AddrDelay{Addr: a, Delay: addrDelay[string(a.Bytes())]})
Expand Down Expand Up @@ -328,20 +317,14 @@ loop:
continue loop
}

// request succeeded, respond to all pending requests
for _, reqno := range ad.requests {
pr, ok := w.pendingRequests[reqno]
if !ok {
// some other dial for this request succeeded before this one
continue
for pr := range w.pendingRequests {
if _, ok := pr.addrs[string(ad.addr.Bytes())]; ok {
pr.req.resch <- dialResponse{conn: conn}
delete(w.pendingRequests, pr)
}
pr.req.resch <- dialResponse{conn: conn}
delete(w.pendingRequests, reqno)
}

ad.conn = conn
ad.requests = nil

if !w.connected {
w.connected = true
if w.s.metricsTracer != nil {
Expand Down Expand Up @@ -375,33 +358,26 @@ loop:
// dispatches an error to a specific addr dial
func (w *dialWorker) dispatchError(ad *addrDial, err error) {
ad.err = err
for _, reqno := range ad.requests {
pr, ok := w.pendingRequests[reqno]
if !ok {
// some other dial for this request succeeded before this one
continue
}

for pr := range w.pendingRequests {
// accumulate the error
pr.err.recordErr(ad.addr, err)

delete(pr.addrs, string(ad.addr.Bytes()))
if len(pr.addrs) == 0 {
// all addrs have erred, dispatch dial error
// but first do a last one check in case an acceptable connection has landed from
// a simultaneous dial that started later and added new acceptable addrs
c, _ := w.s.bestAcceptableConnToPeer(pr.req.ctx, w.peer)
if c != nil {
pr.req.resch <- dialResponse{conn: c}
} else {
pr.req.resch <- dialResponse{err: pr.err}
if _, ok := pr.addrs[string(ad.addr.Bytes())]; ok {
pr.err.recordErr(ad.addr, err)
delete(pr.addrs, string(ad.addr.Bytes()))
if len(pr.addrs) == 0 {
// all addrs have erred, dispatch dial error
// but first do a last one check in case an acceptable connection has landed from
// a simultaneous dial that started later and added new acceptable addrs
c, _ := w.s.bestAcceptableConnToPeer(pr.req.ctx, w.peer)
if c != nil {
pr.req.resch <- dialResponse{conn: c}
} else {
pr.req.resch <- dialResponse{err: pr.err}
}
delete(w.pendingRequests, pr)
}
delete(w.pendingRequests, reqno)
}
}

ad.requests = nil

// if it was a backoff, clear the address dial so that it doesn't inhibit new dial requests.
// this is necessary to support active listen scenarios, where a new dial comes in while
// another dial is in progress, and needs to do a direct connection without inhibitions from
Expand Down

0 comments on commit 4090dcc

Please sign in to comment.