Skip to content

Commit

Permalink
Merge pull request #24 from thriee/hotfix/keepalive_leaseRespChan_blo…
Browse files Browse the repository at this point in the history
…cking

修复keeplive时leasech阻塞问题
  • Loading branch information
libi committed Aug 4, 2022
2 parents d7aaa5e + d02aac2 commit b4f2f6d
Showing 1 changed file with 43 additions and 16 deletions.
59 changes: 43 additions & 16 deletions driver/etcd/etcd_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@ package etcd

import (
"context"
"github.com/google/uuid"
"github.com/libi/dcron/driver"
"go.etcd.io/etcd/api/v3/mvccpb"
"go.etcd.io/etcd/client/v3"
"log"
"sync"
"time"

"github.com/google/uuid"
"github.com/libi/dcron/driver"
"go.etcd.io/etcd/api/v3/mvccpb"
clientv3 "go.etcd.io/etcd/client/v3"
)

var _ driver.Driver = &EtcdDriver{}
Expand All @@ -24,6 +25,7 @@ type EtcdDriver struct {
lease int64
serverList map[string]map[string]string
lock sync.RWMutex
leaseID clientv3.LeaseID
}

//NewEtcdDriver ...
Expand Down Expand Up @@ -146,27 +148,52 @@ func (e *EtcdDriver) Ping() error {
return nil
}

func (e *EtcdDriver) SetHeartBeat(nodeID string) {
leaseID, err := e.putKeyWithLease(nodeID, nodeID)
func (e *EtcdDriver) keepAlive(ctx context.Context, nodeID string) (<-chan *clientv3.LeaseKeepAliveResponse, error) {
var err error
e.leaseID, err = e.putKeyWithLease(nodeID, nodeID)
if err != nil {
log.Printf("putKeyWithLease error: %v", err)
return
return nil, err
}

leaseRespChan, err := e.cli.KeepAlive(context.Background(), leaseID)
return e.cli.KeepAlive(ctx, e.leaseID)
}

func (e *EtcdDriver) revoke() {
_, err := e.cli.Lease.Revoke(context.Background(), e.leaseID)
if err != nil {
log.Printf("keepalive error:%v", err)
log.Printf("lease revoke error: %v", err)
}
}

func (e *EtcdDriver) SetHeartBeat(nodeID string) {
leaseCh, err := e.keepAlive(context.Background(), nodeID)
if err != nil {
log.Printf("setHeartBeat error: %v", err)
return
}
select {
case resp := <-leaseRespChan:
if resp == nil {
log.Printf("ectd cli keepalive unexpected nil")
go func() {
defer func() {
err := recover()
if err != nil {
log.Printf("keepAlive panic: %v", err)
return
}
}()
for {
select {
case _, ok := <-leaseCh:
if !ok {
e.revoke()
e.SetHeartBeat(nodeID)
return
}
case <-time.After(businessTimeout):
log.Printf("ectd cli keepalive timeout")
return
}
}
case <-time.After(businessTimeout):
log.Printf("ectd cli keepalive timeout")
}
}()
}

// SetTimeout set etcd lease timeout
Expand Down

0 comments on commit b4f2f6d

Please sign in to comment.