Skip to content

Commit

Permalink
Merge pull request #58 from libi/develop
Browse files Browse the repository at this point in the history
  • Loading branch information
libi committed May 25, 2023
2 parents 8ab8580 + bfc91dd commit 4616c7e
Show file tree
Hide file tree
Showing 32 changed files with 1,352 additions and 963 deletions.
15 changes: 8 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,17 @@ If use distributed-lock to implement it. I will depends on the system-time of ea

1. Create redisDriver instance, set the `ServiceName` and initialize `dcron`. The `ServiceName` will defined the same task unit.
```golang
drv, _ := redis.NewDriver(&redis.Options{
Host: "127.0.0.1:6379"
})
dcron := NewDcron("server1", drv)
redisCli := redis.NewClient(&redis.Options{
Addr: DefaultRedisAddr,
})
drv := driver.NewRedisDriver(redisCli)
dcron := NewDcron("server1", drv)
```
2. Use cron-language to add task, you should set the `TaskName`, the `TaskName` is the primary-key of each task.
```golang
dcron.AddFunc("test1","*/3 * * * *",func(){
fmt.Println("execute test1 task",time.Now().Format("15:04:05"))
})
dcron.AddFunc("test1","*/3 * * * *",func(){
fmt.Println("execute test1 task",time.Now().Format("15:04:05"))
})
```
3. Begin the task
```golang
Expand Down
17 changes: 10 additions & 7 deletions README_CN.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,19 @@ a lightweight distributed job scheduler library based on redis or etcd

1.创建redisDriver实例,指定服务名并初始化dcron。服务名为执行相同任务的单元。
```golang
drv, _ := redis.NewDriver(&redis.Options{
Host: "127.0.0.1:6379"
})
dcron := NewDcron("server1", drv)
redisCli := redis.NewClient(&redis.Options{
Addr: DefaultRedisAddr,
})
drv := driver.NewRedisDriver(redisCli)
dcron := NewDcron("server1", drv)
```
当然,如果你可以自己实现一个自定义的Driver也是可以的,只需要实现[DriverV2](driver/driver.go)接口即可。

2.使用cron语法添加任务,需要指定任务名。任务名作为任务的唯一标识,必须保证唯一。
```golang
dcron.AddFunc("test1","*/3 * * * *",func(){
fmt.Println("执行 test1 任务",time.Now().Format("15:04:05"))
})
dcron.AddFunc("test1","*/3 * * * *",func(){
fmt.Println("执行 test1 任务",time.Now().Format("15:04:05"))
})
```
3.开始任务。
```golang
Expand Down
44 changes: 17 additions & 27 deletions dcron.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package dcron

import (
"context"
"errors"
"log"
"os"
Expand All @@ -15,7 +16,7 @@ import (

const (
defaultReplicas = 50
defaultDuration = time.Second
defaultDuration = 3 * time.Second
)

const (
Expand All @@ -31,7 +32,7 @@ type Dcron struct {
jobsRWMut sync.Mutex

ServerName string
nodePool *NodePool
nodePool INodePool
running int32

logger dlog.Logger
Expand All @@ -47,34 +48,24 @@ type Dcron struct {
}

// NewDcron create a Dcron
func NewDcron(serverName string, driver driver.Driver, cronOpts ...cron.Option) *Dcron {
func NewDcron(serverName string, driver driver.DriverV2, cronOpts ...cron.Option) *Dcron {
dcron := newDcron(serverName)
dcron.crOptions = cronOpts
dcron.cr = cron.New(cronOpts...)
dcron.running = dcronStopped
var err error
dcron.nodePool, err = newNodePool(serverName, driver, dcron, dcron.nodeUpdateDuration, dcron.hashReplicas)
if err != nil {
dcron.logger.Errorf("ERR: %s", err.Error())
return nil
}
dcron.nodePool = NewNodePool(serverName, driver, dcron.nodeUpdateDuration, dcron.hashReplicas, dcron.logger)
return dcron
}

// NewDcronWithOption create a Dcron with Dcron Option
func NewDcronWithOption(serverName string, driver driver.Driver, dcronOpts ...Option) *Dcron {
func NewDcronWithOption(serverName string, driver driver.DriverV2, dcronOpts ...Option) *Dcron {
dcron := newDcron(serverName)
for _, opt := range dcronOpts {
opt(dcron)
}

dcron.cr = cron.New(dcron.crOptions...)
var err error
dcron.nodePool, err = newNodePool(serverName, driver, dcron, dcron.nodeUpdateDuration, dcron.hashReplicas)
if err != nil {
dcron.logger.Errorf("ERR: %s", err.Error())
return nil
}
dcron.nodePool = NewNodePool(serverName, driver, dcron.nodeUpdateDuration, dcron.hashReplicas, dcron.logger)

return dcron
}
Expand Down Expand Up @@ -112,7 +103,7 @@ func (d *Dcron) AddFunc(jobName, cronStr string, cmd func()) (err error) {
return d.addJob(jobName, cronStr, cmd, nil)
}
func (d *Dcron) addJob(jobName, cronStr string, cmd func(), job Job) (err error) {
d.logger.Infof("addJob '%s' : %s", jobName, cronStr)
d.logger.Infof("addJob '%s' : %s", jobName, cronStr)

d.jobsRWMut.Lock()
defer d.jobsRWMut.Unlock()
Expand Down Expand Up @@ -147,13 +138,7 @@ func (d *Dcron) Remove(jobName string) {
}

func (d *Dcron) allowThisNodeRun(jobName string) bool {
allowRunNode := d.nodePool.PickNodeByJobName(jobName)
d.logger.Infof("job '%s' running in node %s", jobName, allowRunNode)
if allowRunNode == "" {
d.logger.Errorf("node pool is empty")
return false
}
return d.nodePool.NodeID == allowRunNode
return d.nodePool.CheckJobAvailable(jobName)
}

// Start job
Expand All @@ -169,29 +154,33 @@ func (d *Dcron) Start() {
return
}
d.cr.Start()
d.logger.Infof("dcron started , nodeID is %s", d.nodePool.NodeID)
d.logger.Infof("dcron started , nodeID is %s", d.nodePool.GetNodeID())
} else {
d.logger.Infof("dcron have started")
}
}

// Run Job
func (d *Dcron) Run() {
// recover jobs before starting
if d.RecoverFunc != nil {
d.RecoverFunc(d)
}
if atomic.CompareAndSwapInt32(&d.running, dcronStopped, dcronRunning) {
if err := d.startNodePool(); err != nil {
atomic.StoreInt32(&d.running, dcronStopped)
return
}

d.logger.Infof("dcron running nodeID is %s", d.nodePool.NodeID)
d.logger.Infof("dcron running nodeID is %s", d.nodePool.GetNodeID())
d.cr.Run()
} else {
d.logger.Infof("dcron already running")
}
}

func (d *Dcron) startNodePool() error {
if err := d.nodePool.StartPool(); err != nil {
if err := d.nodePool.Start(context.Background()); err != nil {
d.logger.Errorf("dcron start node pool error %+v", err)
return err
}
Expand All @@ -201,6 +190,7 @@ func (d *Dcron) startNodePool() error {
// Stop job
func (d *Dcron) Stop() {
tick := time.NewTicker(time.Millisecond)
d.nodePool.Stop(context.Background())
for range tick.C {
if atomic.CompareAndSwapInt32(&d.running, dcronRunning, dcronStopped) {
d.cr.Stop()
Expand Down
166 changes: 79 additions & 87 deletions dcron_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,20 @@ import (
"fmt"
"log"
"os"
"sync"
"testing"
"time"

"github.com/go-redis/redis/v8"
"github.com/libi/dcron"
"github.com/libi/dcron/dlog"
RedisDriver "github.com/libi/dcron/driver/redis"
"github.com/libi/dcron/driver"
"github.com/robfig/cron/v3"
"github.com/stretchr/testify/require"
)

const (
DefaultRedisAddr = "127.0.0.1:6379"
)

type TestJob1 struct {
Expand All @@ -24,92 +30,25 @@ func (t TestJob1) Run() {

var testData = make(map[string]struct{})

func Test(t *testing.T) {
drv, err := RedisDriver.NewDriver(&redis.Options{
Addr: "127.0.0.1:6379",
})

if err != nil {
t.Error(err)
}
func TestMultiNodes(t *testing.T) {
wg := &sync.WaitGroup{}
wg.Add(3)

go runNode(t, drv)
go runNode(t, wg)
// 间隔1秒启动测试节点刷新逻辑
time.Sleep(time.Second)
go runNode(t, drv)
time.Sleep(time.Second * 2)
go runNode(t, drv)

//add recover
dcron2 := dcron.NewDcron("server2", drv, cron.WithChain(cron.Recover(cron.DefaultLogger)))
dcron2.Start()
dcron2.Stop()

//panic recover test
err = dcron2.AddFunc("s2 test1", "* * * * *", func() {
panic("panic test")
})
if err != nil {
t.Fatal("add func error")
}
err = dcron2.AddFunc("s2 test2", "* * * * *", func() {
t.Log("执行 service2 test2 任务", time.Now().Format("15:04:05"))
})
if err != nil {
t.Fatal("add func error")
}
err = dcron2.AddFunc("s2 test3", "* * * * *", func() {
t.Log("执行 service2 test3 任务", time.Now().Format("15:04:05"))
})
if err != nil {
t.Fatal("add func error")
}
dcron2.Start()

// set logger
logger := &dlog.StdLogger{
Log: log.New(os.Stdout, "[test_s3]", log.LstdFlags),
}
// wrap cron recover
rec := dcron.CronOptionChain(cron.Recover(cron.PrintfLogger(logger)))

// option test
dcron3 := dcron.NewDcronWithOption("server3", drv, rec,
dcron.WithLogger(logger),
dcron.WithHashReplicas(10),
dcron.WithNodeUpdateDuration(time.Second*10))

//panic recover test
err = dcron3.AddFunc("s3 test1", "* * * * *", func() {
t.Log("执行 server3 test1 任务,模拟 panic", time.Now().Format("15:04:05"))
panic("panic test")
})
if err != nil {
t.Fatal("add func error")
}

err = dcron3.AddFunc("s3 test2", "* * * * *", func() {
t.Log("执行 server3 test2 任务", time.Now().Format("15:04:05"))
})
if err != nil {
t.Fatal("add func error")
}
err = dcron3.AddFunc("s3 test3", "* * * * *", func() {
t.Log("执行 server3 test3 任务", time.Now().Format("15:04:05"))
})
if err != nil {
t.Fatal("add func error")
}
dcron3.Start()
go runNode(t, wg)
time.Sleep(time.Second)
go runNode(t, wg)

//测试120秒后退出
time.Sleep(120 * time.Second)
t.Log("testData", testData)
dcron2.Stop()
dcron3.Stop()
wg.Wait()
}

func runNode(t *testing.T, drv *RedisDriver.RedisDriver) {
func runNode(t *testing.T, wg *sync.WaitGroup) {
redisCli := redis.NewClient(&redis.Options{
Addr: DefaultRedisAddr,
})
drv := driver.NewRedisDriver(redisCli)
dcron := dcron.NewDcron("server1", drv)
//添加多个任务 启动多个节点时 任务会均匀分配给各个节点

Expand Down Expand Up @@ -147,17 +86,18 @@ func runNode(t *testing.T, drv *RedisDriver.RedisDriver) {

//移除测试
dcron.Remove("s1 test3")
<-time.After(120 * time.Second)
wg.Done()
dcron.Stop()
}

func Test_SecondsJob(t *testing.T) {
drv, err := RedisDriver.NewDriver(&redis.Options{
Addr: "127.0.0.1:6379",
redisCli := redis.NewClient(&redis.Options{
Addr: DefaultRedisAddr,
})
if err != nil {
t.Error(err)
}
drv := driver.NewRedisDriver(redisCli)
dcr := dcron.NewDcronWithOption(t.Name(), drv, dcron.CronOptionSeconds())
err = dcr.AddFunc("job1", "*/5 * * * * *", func() {
err := dcr.AddFunc("job1", "*/5 * * * * *", func() {
t.Log(time.Now())
})
if err != nil {
Expand All @@ -167,3 +107,55 @@ func Test_SecondsJob(t *testing.T) {
time.Sleep(15 * time.Second)
dcr.Stop()
}

func runSecondNode(id string, wg *sync.WaitGroup, runningTime time.Duration, t *testing.T) {
redisCli := redis.NewClient(&redis.Options{
Addr: DefaultRedisAddr,
})
drv := driver.NewRedisDriver(redisCli)
dcr := dcron.NewDcronWithOption(t.Name(), drv,
dcron.CronOptionSeconds(),
dcron.WithLogger(&dlog.StdLogger{
Log: log.New(os.Stdout, "["+id+"]", log.LstdFlags),
}),
dcron.CronOptionChain(cron.Recover(
cron.DefaultLogger,
)),
)
var err error
err = dcr.AddFunc("job1", "*/5 * * * * *", func() {
t.Log(time.Now())
})
require.Nil(t, err)
err = dcr.AddFunc("job2", "*/8 * * * * *", func() {
panic("test panic")
})
require.Nil(t, err)
err = dcr.AddFunc("job3", "*/2 * * * * *", func() {
t.Log("job3:", time.Now())
})
require.Nil(t, err)
dcr.Start()
<-time.After(runningTime)
dcr.Stop()
wg.Done()
}

func Test_SecondJobWithPanicAndMultiNodes(t *testing.T) {
wg := &sync.WaitGroup{}
wg.Add(5)
go runSecondNode("1", wg, 45*time.Second, t)
go runSecondNode("2", wg, 45*time.Second, t)
go runSecondNode("3", wg, 45*time.Second, t)
go runSecondNode("4", wg, 45*time.Second, t)
go runSecondNode("5", wg, 45*time.Second, t)
wg.Wait()
}

func Test_SecondJobWithStopAndSwapNode(t *testing.T) {
wg := &sync.WaitGroup{}
wg.Add(2)
go runSecondNode("1", wg, 60*time.Second, t)
go runSecondNode("2", wg, 20*time.Second, t)
wg.Wait()
}
Loading

0 comments on commit 4616c7e

Please sign in to comment.