Skip to content

Commit

Permalink
add loop queue (#53)
Browse files Browse the repository at this point in the history
* add loop queue

* add loop queue

* fix the bugs

add loop queue

move the worker queue to directory

按照新的接口实现 lifo 队列

添加新接口的环形队列实现

rename the slice queue

修复了 unlock

使用 queue 管理 goWorkerWithFunc

使用 dequeue 判断队列

add remainder

增加测试文件

循环队列需要一个空闲位

* remove interface{}

* Refine the logic of sync.Pool

* Add flowcharts of ants into READMEs

* Add the installation about ants v2

* Renew the functional options in READMEs

* Renew English and Chinese flowcharts

* rename package name

移动 worker queue 位置

worker queue 都修改为私有接口

考虑到性能问题,把 interface{} 改回到  *goworker

* 修改 releaseExpiry 和 releaseAll

* remove files

* fix some bug
  • Loading branch information
KevinBaiSg authored and panjf2000 committed Oct 9, 2019
1 parent 0efbda3 commit f0e2392
Show file tree
Hide file tree
Showing 6 changed files with 493 additions and 39 deletions.
62 changes: 23 additions & 39 deletions pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type Pool struct {
expiryDuration time.Duration

// workers is a slice that store the available workers.
workers []*goWorker
workers workerQueue

// release is used to notice the pool to closed itself.
release int32
Expand Down Expand Up @@ -82,35 +82,21 @@ func (p *Pool) periodicallyPurge() {
heartbeat := time.NewTicker(p.expiryDuration)
defer heartbeat.Stop()

var expiredWorkers []*goWorker
for range heartbeat.C {
if atomic.LoadInt32(&p.release) == CLOSED {
break
}
currentTime := time.Now()

p.lock.Lock()
idleWorkers := p.workers
n := len(idleWorkers)
var i int
for i = 0; i < n && currentTime.Sub(idleWorkers[i].recycleTime) > p.expiryDuration; i++ {
}
expiredWorkers = append(expiredWorkers[:0], idleWorkers[:i]...)
if i > 0 {
m := copy(idleWorkers, idleWorkers[i:])
for i = m; i < n; i++ {
idleWorkers[i] = nil
}
p.workers = idleWorkers[:m]
}
stream := p.workers.releaseExpiry(p.expiryDuration)
p.lock.Unlock()

// Notify obsolete workers to stop.
// This notification must be outside the p.lock, since w.task
// may be blocking and may consume a lot of time if many workers
// are located on non-local CPUs.
for i, w := range expiredWorkers {
for w := range stream {
w.task <- nil
expiredWorkers[i] = nil
}

// There might be a situation that all workers have been cleaned up(no any worker is running)
Expand Down Expand Up @@ -156,8 +142,11 @@ func NewPool(size int, options ...Option) (*Pool, error) {
},
}
if opts.PreAlloc {
p.workers = make([]*goWorker, 0, size)
p.workers = newQueue(loopQueueType, size)
} else {
p.workers = newQueue(stackType, 0)
}

p.cond = sync.NewCond(p.lock)

// Start a goroutine to clean up expired workers periodically.
Expand All @@ -166,7 +155,7 @@ func NewPool(size int, options ...Option) (*Pool, error) {
return p, nil
}

//---------------------------------------------------------------------------
// ---------------------------------------------------------------------------

// Submit submits a task to this pool.
func (p *Pool) Submit(task func()) error {
Expand Down Expand Up @@ -209,17 +198,12 @@ func (p *Pool) Release() {
p.once.Do(func() {
atomic.StoreInt32(&p.release, 1)
p.lock.Lock()
idleWorkers := p.workers
for i, w := range idleWorkers {
w.task <- nil
idleWorkers[i] = nil
}
p.workers = nil
p.workers.releaseAll()
p.lock.Unlock()
})
}

//---------------------------------------------------------------------------
// ---------------------------------------------------------------------------

// incRunning increases the number of the currently running goroutines.
func (p *Pool) incRunning() {
Expand All @@ -240,12 +224,9 @@ func (p *Pool) retrieveWorker() *goWorker {
}

p.lock.Lock()
idleWorkers := p.workers
n := len(idleWorkers) - 1
if n >= 0 {
w = idleWorkers[n]
idleWorkers[n] = nil
p.workers = idleWorkers[:n]

w = p.workers.dequeue()
if w != nil {
p.lock.Unlock()
} else if p.Running() < p.Cap() {
p.lock.Unlock()
Expand All @@ -268,13 +249,12 @@ func (p *Pool) retrieveWorker() *goWorker {
spawnWorker()
return w
}
l := len(p.workers) - 1
if l < 0 {

w = p.workers.dequeue()
if w == nil {
goto Reentry
}
w = p.workers[l]
p.workers[l] = nil
p.workers = p.workers[:l]

p.lock.Unlock()
}
return w
Expand All @@ -287,7 +267,11 @@ func (p *Pool) revertWorker(worker *goWorker) bool {
}
worker.recycleTime = time.Now()
p.lock.Lock()
p.workers = append(p.workers, worker)

err := p.workers.enqueue(worker)
if err != nil {
return false
}

// Notify the invoker stuck in 'retrieveWorker()' of there is an available worker in the worker queue.
p.cond.Signal()
Expand Down
137 changes: 137 additions & 0 deletions worker_loop_queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
package ants

import "time"

type loopQueue struct {
items []*goWorker
expiry []*goWorker
head int
tail int
remainder int
}

func newLoopQueue(size int) *loopQueue {
if size <= 0 {
return nil
}

wq := loopQueue{
items: make([]*goWorker, size+1),
expiry: make([]*goWorker, 0),
head: 0,
tail: 0,
remainder: size + 1,
}

return &wq
}

func (wq *loopQueue) len() int {
if wq.remainder == 0 {
return 0
}

return (wq.tail - wq.head + wq.remainder) % wq.remainder
}

func (wq *loopQueue) cap() int {
if wq.remainder == 0 {
return 0
}
return wq.remainder - 1
}

func (wq *loopQueue) isEmpty() bool {
return wq.tail == wq.head
}

func (wq *loopQueue) enqueue(worker *goWorker) error {
if wq.remainder == 0 {
return ErrQueueLengthIsZero
}
if (wq.tail+1)%wq.remainder == wq.head {
return ErrQueueIsFull
}

wq.items[wq.tail] = worker
wq.tail = (wq.tail + 1) % wq.remainder

return nil
}

func (wq *loopQueue) dequeue() *goWorker {
if wq.len() == 0 {
return nil
}

w := wq.items[wq.head]
wq.head = (wq.head + 1) % wq.remainder

return w
}

func (wq *loopQueue) releaseExpiry(duration time.Duration) chan *goWorker {
stream := make(chan *goWorker)

if wq.len() == 0 {
close(stream)
return stream
}

wq.expiry = wq.expiry[:0]
expiryTime := time.Now().Add(-duration)

for wq.head != wq.tail {
if expiryTime.After(wq.items[wq.head].recycleTime) {
wq.expiry = append(wq.expiry, wq.items[wq.head])
wq.head = (wq.head + 1) % wq.remainder
continue
}
break
}

go func() {
defer close(stream)

for i := 0; i < len(wq.expiry); i++ {
stream <- wq.expiry[i]
}
}()

return stream
}

//func (wq *LoopQueue)search(compareTime time.Time, l, r int) int {
// if l == r {
// if wq.items[l].recycleTime.After(compareTime) {
// return -1
// } else {
// return l
// }
// }
//
// c := cap(wq.items)
// mid := ((r-l+c)/2 + l) % c
// if mid == l {
// return wq.search(compareTime, l, l)
// } else if wq.items[mid].recycleTime.After(compareTime) {
// return wq.search(compareTime, l, mid-1)
// } else {
// return wq.search(compareTime, mid+1, r)
// }
//}

func (wq *loopQueue) releaseAll() {
if wq.len() == 0 {
return
}

for wq.head != wq.tail {
wq.items[wq.head].task <- nil
wq.head = (wq.head + 1) % wq.remainder
}
wq.items = wq.items[:0]
wq.remainder = 0
wq.head = 0
wq.tail = 0
}
73 changes: 73 additions & 0 deletions worker_loop_queue_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package ants

import (
"testing"
"time"
)

func TestNewLoopQueue(t *testing.T) {
size := 100
q := newLoopQueue(size)
if q.len() != 0 {
t.Fatalf("Len error")
}

if q.cap() != size {
t.Fatalf("Cap error")
}

if !q.isEmpty() {
t.Fatalf("IsEmpty error")
}

if q.dequeue() != nil {
t.Fatalf("Dequeue error")
}
}

func TestLoopQueue(t *testing.T) {
size := 10
q := newLoopQueue(size)

for i := 0; i < 5; i++ {
err := q.enqueue(&goWorker{recycleTime: time.Now()})
if err != nil {
break
}
}

if q.len() != 5 {
t.Fatalf("Len error")
}

v := q.dequeue()
t.Log(v)

if q.len() != 4 {
t.Fatalf("Len error")
}

time.Sleep(time.Second)

for i := 0; i < 6; i++ {
err := q.enqueue(&goWorker{recycleTime: time.Now()})
if err != nil {
break
}
}

if q.len() != 10 {
t.Fatalf("Len error")
}

err := q.enqueue(&goWorker{recycleTime: time.Now()})
if err == nil {
t.Fatalf("Enqueue error")
}

q.releaseExpiry(time.Second)

if q.len() != 6 {
t.Fatalf("Len error: %d", q.len())
}
}
39 changes: 39 additions & 0 deletions worker_queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package ants

import (
"errors"
"time"
)

var (
ErrQueueIsFull = errors.New("the queue is full")
ErrQueueLengthIsZero = errors.New("the queue length is zero")
)

type workerQueue interface {
len() int
cap() int
isEmpty() bool
enqueue(worker *goWorker) error
dequeue() *goWorker
releaseExpiry(duration time.Duration) chan *goWorker
releaseAll()
}

type queueType int

const (
stackType queueType = 1 << iota
loopQueueType
)

func newQueue(qType queueType, size int) workerQueue {
switch qType {
case stackType:
return newWorkerStack(size)
case loopQueueType:
return newLoopQueue(size)
default:
return newWorkerStack(size)
}
}
Loading

0 comments on commit f0e2392

Please sign in to comment.