基于 goroutine 和 channel 的生产和消费工具,可以启动一个生产者
和可配置数量的多个消费者
,支持取消、超时,自动销毁。可以捕获消费者panic事件,影响消费者数量。
go get -u github.com/smokezl/go-worker-pool
import "github.com/smokezl/go-worker-pool"
worker := go_worker_pool.NewGoWorker(ctx, &GoWorkerConfig{
Timeout: 10 * time.Minute,
WorkerNum: 20,
// Sync 表示是否需要调用 waitGroup
Sync: false,
})
// 生产者逻辑
go func() {
// 不要忘记关闭生产者通道
defer worker.CloseItemChan()
for i := 0; i < 10; i++ {
var arr []int
for j := 0; j < 100; j++ {
arr = append(arr, j)
}
err = worker.IterationProducer(arr)
if err != nil {
// err
return
}
}
}()
worker.RegisterFinishFunc(func() {
// 注册退出执行函数
// worker执行完退出或销毁时触发
})
worker.RegisterErrFunc(func(err error) {
// 注册错误执行函数
// worker出错(超时或者panic时触发)
})
// 消费者逻辑
worker.IterationConsumer(func(ctx context.Context, item interface{}) {
num := item.(int)
// 执行消费者代码
})
// 生产者逻辑
go func() {
// 不要忘记关闭生产者通道
defer worker.CloseItemChan()
for i := 0; i < 100; i++ {
err = worker.PushProducerItem(i)
if err != nil {
// err
return
}
}
}()
// 消费者逻辑同上