runtime: improve findrunnable() scaling
Before this CL, the work-stealing loop in findrunnable() has the
following behavior (sketched in brief below):

- In the first 3 iterations, attempt to steal Gs from other Ps' p.runq
(i.e. Gs not at the head of other Ps' queues).

- In the final (4th) iteration, also attempt to steal from p.runnext
(i.e. Gs at the head of other Ps' queues), sleeping for 3us before each
such stealing attempt.

Note that the number of loop iterations is arbitrary:
golang#16476 (comment)
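
A minimal, standalone sketch of that pass structure (illustrative only,
not code from this CL; the stealRunNextG name and the i > 2 test are
taken from the removed loop in the diff below):

    package main

    import "fmt"

    func main() {
            // Pre-CL structure: 4 passes over the other Ps. Only the final
            // pass may steal p.runnext, and it sleeps ~3us before each such
            // attempt.
            for i := 0; i < 4; i++ {
                    stealRunNextG := i > 2
                    fmt.Printf("pass %d: may steal runnext: %v\n", i+1, stealRunNextG)
            }
    }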

This approach causes findrunnable() to scale poorly with GOMAXPROCS for
two reasons:

- The amount of time per iteration is linear in GOMAXPROCS.

- The amount of time spent sleeping in the last iteration is also linear
in GOMAXPROCS (a rough worst-case calculation follows).
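
A rough, illustrative worst-case calculation (not taken from this CL),
assuming the final iteration visits every other P and sleeps 3us before
each runnext steal attempt:

    package main

    import "fmt"

    func main() {
            // Pre-CL worst case for the final work-stealing pass alone:
            // ~3us of sleep per runnext steal attempt, one attempt per
            // other P, i.e. roughly 3us * (GOMAXPROCS - 1).
            for _, procs := range []int{4, 16, 64, 256} {
                    fmt.Printf("GOMAXPROCS=%-4d worst-case sleep ~%dus\n", procs, 3*(procs-1))
            }
    }

At GOMAXPROCS=64 that is already roughly 190us of sleeping inside a
single findrunnable() call, which is the kind of growth the 4us cap
below avoids.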

This CL changes the work-stealing loop to take at most 4us + 1 iteration
through all Ps, without any sleeping, by allowing the loop to spin iff
there is a p.runnext that will be, but is not yet, stealable. (The
amount of time to wait before stealing runnext is empirically bumped up
from 3us to 4us.)
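
The eligibility rule can be illustrated outside the runtime. The
following standalone sketch uses a hypothetical helper (runnextAction)
and time.Time values; the real code in the diff below uses nanotime(),
p.runnext, and the new p.runnextready field, with the same 4us
constant:

    package main

    import (
            "fmt"
            "time"
    )

    // stealDelay mirrors the 4us back-off described above.
    const stealDelay = 4 * time.Microsecond

    // runnextAction is a hypothetical, simplified model of the runqgrab
    // runnext logic: start is when the stealer began its scan, ready is
    // when the victim's runnext G was made runnable, now is the current
    // time.
    func runnextAction(start, ready, now time.Time) string {
            if ready.After(start) {
                    // Readied after the scan started: ineligible, so the
                    // stealer cannot end up spinning forever.
                    return "give up"
            }
            if now.Before(ready.Add(stealDelay)) {
                    // Eligible soon: report "retry" instead of sleeping.
                    return "retry"
            }
            return "steal"
    }

    func main() {
            us := time.Microsecond
            base := time.Now()
            fmt.Println(runnextAction(base, base.Add(1*us), base.Add(2*us)))  // give up
            fmt.Println(runnextAction(base.Add(2*us), base, base.Add(1*us)))  // retry
            fmt.Println(runnextAction(base.Add(2*us), base, base.Add(10*us))) // steal
    }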

Updates golang#28808
nixprime committed Mar 24, 2020
1 parent 6a9d850 commit 7336478
Showing 3 changed files with 78 additions and 71 deletions.
2 changes: 1 addition & 1 deletion src/runtime/export_test.go
@@ -107,7 +107,7 @@ func RunSchedLocalQueueStealTest() {
gs[j].sig = 0
runqput(p1, &gs[j], false)
}
gp := runqsteal(p2, p1, true)
gp, _ := runqsteal(p2, p1, 0)
s := 0
if gp != nil {
s++
144 changes: 75 additions & 69 deletions src/runtime/proc.go
@@ -2174,6 +2174,9 @@ top:
// Steal work from other P's.
procs := uint32(gomaxprocs)
ranTimer := false
if procs == 1 {
goto stop
}
// If number of spinning M's >= number of busy P's, block.
// This is necessary to prevent excessive CPU consumption
// when GOMAXPROCS>>1 but the program parallelism is low.
@@ -2184,47 +2187,55 @@ top:
_g_.m.spinning = true
atomic.Xadd(&sched.nmspinning, 1)
}
for i := 0; i < 4; i++ {
for {
retry := false
for enum := stealOrder.start(fastrand()); !enum.done(); enum.next() {
if sched.gcwaiting != 0 {
goto top
}
stealRunNextG := i > 2 // first look for ready queues with more than 1 g
p2 := allp[enum.position()]
if _p_ == p2 {
continue
}
if gp := runqsteal(_p_, p2, stealRunNextG); gp != nil {
gp, retryP := runqsteal(_p_, p2, now)
if gp != nil {
return gp, false
}

// Consider stealing timers from p2.
// This call to checkTimers is the only place where
// we hold a lock on a different P's timers.
// Lock contention can be a problem here, so avoid
// grabbing the lock if p2 is running and not marked
// for preemption. If p2 is running and not being
// preempted we assume it will handle its own timers.
if i > 2 && shouldStealTimers(p2) {
tnow, w, ran := checkTimers(p2, now)
now = tnow
if w != 0 && (pollUntil == 0 || w < pollUntil) {
pollUntil = w
}
if ran {
// Running the timers may have
// made an arbitrary number of G's
// ready and added them to this P's
// local run queue. That invalidates
// the assumption of runqsteal
// that it always has room to add
// stolen G's. So check now if there
// is a local G to run.
if gp, inheritTime := runqget(_p_); gp != nil {
return gp, inheritTime
}
ranTimer = true
if retryP {
retry = true
}
}
if !retry {
break
}
}
for enum := stealOrder.start(fastrand()); !enum.done(); enum.next() {
if sched.gcwaiting != 0 {
goto top
}
p2 := allp[enum.position()]
if _p_ == p2 {
continue
}
// This call to checkTimers is the only place where
// we hold a lock on a different P's timers.
// Lock contention can be a problem here, so avoid
// grabbing the lock if p2 is running and not marked
// for preemption. If p2 is running and not being
// preempted we assume it will handle its own timers.
if shouldStealTimers(p2) {
tnow, w, ran := checkTimers(p2, now)
now = tnow
if w != 0 && (pollUntil == 0 || w < pollUntil) {
pollUntil = w
}
if ran {
// Running the timers may have made an arbitrary number of G's
// ready and added them to this P's local run queue.
if gp, inheritTime := runqget(_p_); gp != nil {
return gp, inheritTime
}
ranTimer = true
}
}
}
@@ -5023,6 +5034,7 @@ func runqput(_p_ *p, gp *g, next bool) {
}

if next {
atomic.Store64(&_p_.runnextready, uint64(nanotime()))
retryNext:
oldnext := _p_.runnext
if !_p_.runnext.cas(oldnext, guintptr(unsafe.Pointer(gp))) {
@@ -5154,46 +5166,39 @@ func runqget(_p_ *p) (gp *g, inheritTime bool) {

// Grabs a batch of goroutines from _p_'s runnable queue into batch.
// Batch is a ring buffer starting at batchHead.
// Returns number of grabbed goroutines.
// Returns number of grabbed goroutines, and true if caller should retry later.
// Can be executed by any P.
func runqgrab(_p_ *p, batch *[256]guintptr, batchHead uint32, stealRunNextG bool) uint32 {
func runqgrab(_p_ *p, batch *[256]guintptr, batchHead uint32, start int64) (uint32, bool) {
for {
h := atomic.LoadAcq(&_p_.runqhead) // load-acquire, synchronize with other consumers
t := atomic.LoadAcq(&_p_.runqtail) // load-acquire, synchronize with the producer
n := t - h
n = n - n/2
if n == 0 {
if stealRunNextG {
// Try to steal from _p_.runnext.
if next := _p_.runnext; next != 0 {
if _p_.status == _Prunning {
// Sleep to ensure that _p_ isn't about to run the g
// we are about to steal.
// The important use case here is when the g running
// on _p_ ready()s another g and then almost
// immediately blocks. Instead of stealing runnext
// in this window, back off to give _p_ a chance to
// schedule runnext. This will avoid thrashing gs
// between different Ps.
// A sync chan send/recv takes ~50ns as of time of
// writing, so 3us gives ~50x overshoot.
if GOOS != "windows" {
usleep(3)
} else {
// On windows system timer granularity is
// 1-15ms, which is way too much for this
// optimization. So just yield.
osyield()
}
}
if !_p_.runnext.cas(next, 0) {
continue
}
batch[batchHead%uint32(len(batch))] = next
return 1
}
// Try to steal from _p_.runnext.
next := _p_.runnext
if next == 0 {
return 0, false
}
// If next became ready after start, it is ineligible for stealing
// (to prevent findrunnable() from spinning forever).
nextready := int64(atomic.Load64(&_p_.runnextready))
if start < nextready {
return 0, false
}
return 0
// Otherwise, next is only stealable some time after it is readied.
// This ensures that when one G readies another and then
// immediately blocks or exits, its P has time to schedule the
// second G. A sync chan send/recv takes 50ns as of this time of
// writing, so 4us gives us ~80x overshoot.
if nanotime() < (nextready + 4000) {
return 0, true
}
if !_p_.runnext.cas(next, 0) {
continue
}
batch[batchHead%uint32(len(batch))] = next
return 1, false
}
if n > uint32(len(_p_.runq)/2) { // read inconsistent h and t
continue
@@ -5203,31 +5208,32 @@ func runqgrab(_p_ *p, batch *[256]guintptr, batchHead uint32, stealRunNextG bool
batch[(batchHead+i)%uint32(len(batch))] = g
}
if atomic.CasRel(&_p_.runqhead, h, h+n) { // cas-release, commits consume
return n
return n, false
}
}
}

// Steal half of elements from local runnable queue of p2
// and put onto local runnable queue of p.
// Returns one of the stolen elements (or nil if failed).
func runqsteal(_p_, p2 *p, stealRunNextG bool) *g {
// Returns one of the stolen elements (or nil if failed), and true if caller
// should retry later.
func runqsteal(_p_, p2 *p, start int64) (*g, bool) {
t := _p_.runqtail
n := runqgrab(p2, &_p_.runq, t, stealRunNextG)
n, retry := runqgrab(p2, &_p_.runq, t, start)
if n == 0 {
return nil
return nil, retry
}
n--
gp := _p_.runq[(t+n)%uint32(len(_p_.runq))].ptr()
if n == 0 {
return gp
return gp, false
}
h := atomic.LoadAcq(&_p_.runqhead) // load-acquire, synchronize with consumers
if t-h+n >= uint32(len(_p_.runq)) {
throw("runqsteal: runq overflow")
}
atomic.StoreRel(&_p_.runqtail, t+n) // store-release, makes the item available for consumption
return gp
return gp, false
}

// A gQueue is a dequeue of Gs linked through g.schedlink. A G can only
3 changes: 2 additions & 1 deletion src/runtime/runtime2.go
@@ -580,7 +580,8 @@ type p struct {
// unit and eliminates the (potentially large) scheduling
// latency that otherwise arises from adding the ready'd
// goroutines to the end of the run queue.
runnext guintptr
runnext guintptr
runnextready uint64 // nanotime() when runnext readied

// Available G's (status == Gdead)
gFree struct {
