Skip to content

Commit

Permalink
refact etcd
Browse files Browse the repository at this point in the history
  • Loading branch information
dxyinme committed Dec 30, 2023
1 parent 9c71fd6 commit 049bed1
Showing 1 changed file with 12 additions and 23 deletions.
35 changes: 12 additions & 23 deletions driver/etcddriver.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,12 @@ func (e *EtcdDriver) putKeyWithLease(ctx context.Context, key, val string) (clie
e.lease = etcdDefaultLease
}

subCtx, cancel := context.WithTimeout(ctx, etcdBusinessTimeout)
defer cancel()
resp, err := e.cli.Grant(subCtx, e.lease)
resp, err := e.cli.Grant(ctx, e.lease)
if err != nil {
return 0, err
}
//注册服务并绑定租约
_, err = e.cli.Put(subCtx, key, val, clientv3.WithLease(resp.ID))
_, err = e.cli.Put(ctx, key, val, clientv3.WithLease(resp.ID))
if err != nil {
return 0, err
}
Expand Down Expand Up @@ -120,15 +118,14 @@ func (e *EtcdDriver) getServices() []string {
return addrs
}

func (e *EtcdDriver) keepAlive(ctx context.Context, nodeID string) (<-chan *clientv3.LeaseKeepAliveResponse, error) {
func (e *EtcdDriver) createLease(ctx context.Context, nodeID string) (*clientv3.LeaseKeepAliveResponse, error) {
var err error
e.leaseID, err = e.putKeyWithLease(ctx, nodeID, nodeID)
if err != nil {
e.logger.Errorf("putKeyWithLease error: %v", err)
return nil, err
}

return e.cli.KeepAlive(ctx, e.leaseID)
return e.cli.KeepAliveOnce(ctx, e.leaseID)
}

func (e *EtcdDriver) revoke(ctx context.Context) {
Expand All @@ -140,7 +137,7 @@ func (e *EtcdDriver) revoke(ctx context.Context) {

func (e *EtcdDriver) heartBeat(ctx context.Context) {
label:
leaseCh, err := e.keepAlive(ctx, e.nodeID)
_, err := e.createLease(ctx, e.nodeID)
if err != nil {
e.logger.Errorf("keep alive error, %v", err)
return
Expand All @@ -152,24 +149,16 @@ label:
e.logger.Infof("driver stopped")
return
}
case _, ok := <-leaseCh:
{
// if lease timeout, goto top of
// this function to keepalive
if !ok {
goto label
}
}
case <-time.After(etcdBusinessTimeout):
{
e.logger.Errorf("ectd cli keepalive timeout")
return
}
case <-time.After(time.Duration(e.lease/2) * (time.Second)):
case <-time.After(time.Duration(e.lease) * (time.Second) / 2):
{
// if near to nodes time,
// renew the lease
goto label
resp, err := e.cli.KeepAliveOnce(ctx, e.leaseID)
if err != nil {
e.logger.Errorf("keep alive error: %v", err)
goto label
}
e.logger.Infof("leaseID=%0x", resp.ID)
}
}
}
Expand Down

0 comments on commit 049bed1

Please sign in to comment.