Merge pull request #11104 from vimalk78/clientv3-fixes
clientv3/concurrency: Added Mutex.TryLock()
xiang90 committed Sep 12, 2019
2 parents 2822da8 + 04ddfa8 commit 79b15a9
Showing 3 changed files with 167 additions and 24 deletions.
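In short: this commit adds a non-blocking Mutex.TryLock() to clientv3/concurrency. TryLock attempts the same single-RPC acquisition transaction as Lock(), but when another session already holds the lock it deletes its own waiter key and returns the new ErrLocked error instead of waiting. The shared transaction logic is factored out into a tryAcquire() helper used by both methods, and the integration tests are renamed (testMutex → testMutexLock) and extended with TryLock coverage.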
51 changes: 51 additions & 0 deletions clientv3/concurrency/example_mutex_test.go
@@ -23,6 +23,57 @@ import (
"go.etcd.io/etcd/clientv3/concurrency"
)

func ExampleMutex_TryLock() {
cli, err := clientv3.New(clientv3.Config{Endpoints: endpoints})
if err != nil {
log.Fatal(err)
}
defer cli.Close()

// create two separate sessions for lock competition
s1, err := concurrency.NewSession(cli)
if err != nil {
log.Fatal(err)
}
defer s1.Close()
m1 := concurrency.NewMutex(s1, "/my-lock/")

s2, err := concurrency.NewSession(cli)
if err != nil {
log.Fatal(err)
}
defer s2.Close()
m2 := concurrency.NewMutex(s2, "/my-lock/")

// acquire lock for s1
if err = m1.Lock(context.TODO()); err != nil {
log.Fatal(err)
}
fmt.Println("acquired lock for s1")

if err = m2.TryLock(context.TODO()); err == nil {
log.Fatal("should not acquire lock")
}
if err == concurrency.ErrLocked {
fmt.Println("cannot acquire lock for s2, as already locked in another session")
}

if err = m1.Unlock(context.TODO()); err != nil {
log.Fatal(err)
}
fmt.Println("released lock for s1")
if err = m2.TryLock(context.TODO()); err != nil {
log.Fatal(err)
}
fmt.Println("acquired lock for s2")

// Output:
// acquired lock for s1
// cannot acquire lock for s2, as already locked in another session
// released lock for s1
// acquired lock for s2
}
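The // Output: comment at the end makes this a verified Go example: go test runs ExampleMutex_TryLock and compares its stdout against those lines, so the example above also serves as a regression test for the ErrLocked path.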

func ExampleMutex_Lock() {
cli, err := clientv3.New(clientv3.Config{Endpoints: endpoints})
if err != nil {
70 changes: 53 additions & 17 deletions clientv3/concurrency/mutex.go
@@ -16,13 +16,17 @@ package concurrency

import (
"context"
"errors"
"fmt"
"sync"

v3 "go.etcd.io/etcd/clientv3"
pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
)

// ErrLocked is returned by TryLock when Mutex is already locked by another session.
var ErrLocked = errors.New("mutex: Locked by another session")

// Mutex implements the sync Locker interface with etcd
type Mutex struct {
s *Session
@@ -37,35 +41,44 @@ func NewMutex(s *Session, pfx string) *Mutex {
return &Mutex{s, pfx + "/", "", -1, nil}
}

// TryLock locks the mutex if it is not already locked by another session.
// If the lock is held by another session, TryLock returns ErrLocked immediately
// after attempting the necessary cleanup.
// The ctx argument is used for sending/receiving the Txn RPC.
func (m *Mutex) TryLock(ctx context.Context) error {
resp, err := m.tryAcquire(ctx)
if err != nil {
return err
}
// if no key exists under the prefix, or the key with the minimum create revision is ours, we already hold the lock
ownerKey := resp.Responses[1].GetResponseRange().Kvs
if len(ownerKey) == 0 || ownerKey[0].CreateRevision == m.myRev {
m.hdr = resp.Header
return nil
}
client := m.s.Client()
// Cannot lock, so delete the key
if _, err := client.Delete(ctx, m.myKey); err != nil {
return err
}
m.myKey = "\x00"
m.myRev = -1
return ErrLocked
}
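Note the failure path: unlike Lock, which leaves its key in place and waits, TryLock deletes its own key when another session holds the lock, so a failed attempt never lingers in the waiter queue. A minimal caller-side sketch of the intended use (withTryLock, doWork, and the prefix are illustrative assumptions, not part of this commit):

package lockdemo

import (
	"context"

	"go.etcd.io/etcd/clientv3/concurrency"
)

// withTryLock runs doWork only if the lock can be acquired without waiting.
// (Sketch: assumes an established *concurrency.Session; "/my-lock/" is an
// example prefix.)
func withTryLock(ctx context.Context, s *concurrency.Session, doWork func()) error {
	m := concurrency.NewMutex(s, "/my-lock/")
	if err := m.TryLock(ctx); err != nil {
		// concurrency.ErrLocked: another session holds the lock;
		// any other error is an RPC or context failure.
		return err
	}
	defer m.Unlock(ctx) // best-effort release of the lock key
	doWork()
	return nil
}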

// Lock locks the mutex with a cancelable context. If the context is canceled
// while trying to acquire the lock, the mutex tries to clean its stale lock entry.
func (m *Mutex) Lock(ctx context.Context) error {
- s := m.s
- client := m.s.Client()
-
- m.myKey = fmt.Sprintf("%s%x", m.pfx, s.Lease())
- cmp := v3.Compare(v3.CreateRevision(m.myKey), "=", 0)
- // put self in lock waiters via myKey; oldest waiter holds lock
- put := v3.OpPut(m.myKey, "", v3.WithLease(s.Lease()))
- // reuse key in case this session already holds the lock
- get := v3.OpGet(m.myKey)
- // fetch current holder to complete uncontended path with only one RPC
- getOwner := v3.OpGet(m.pfx, v3.WithFirstCreate()...)
- resp, err := client.Txn(ctx).If(cmp).Then(put, getOwner).Else(get, getOwner).Commit()
+ resp, err := m.tryAcquire(ctx)
if err != nil {
return err
}
- m.myRev = resp.Header.Revision
- if !resp.Succeeded {
- m.myRev = resp.Responses[0].GetResponseRange().Kvs[0].CreateRevision
- }
// if no key exists under the prefix, or the key with the minimum create revision is ours, we already hold the lock
ownerKey := resp.Responses[1].GetResponseRange().Kvs
if len(ownerKey) == 0 || ownerKey[0].CreateRevision == m.myRev {
m.hdr = resp.Header
return nil
}
-
+ client := m.s.Client()
// wait for deletion revisions prior to myKey
hdr, werr := waitDeletes(ctx, client, m.pfx, m.myRev-1)
// release lock key if wait failed
@@ -77,6 +90,29 @@ func (m *Mutex) Lock(ctx context.Context) error {
return werr
}

func (m *Mutex) tryAcquire(ctx context.Context) (*v3.TxnResponse, error) {
s := m.s
client := m.s.Client()

m.myKey = fmt.Sprintf("%s%x", m.pfx, s.Lease())
cmp := v3.Compare(v3.CreateRevision(m.myKey), "=", 0)
// put self in lock waiters via myKey; oldest waiter holds lock
put := v3.OpPut(m.myKey, "", v3.WithLease(s.Lease()))
// reuse key in case this session already holds the lock
get := v3.OpGet(m.myKey)
// fetch current holder to complete uncontended path with only one RPC
getOwner := v3.OpGet(m.pfx, v3.WithFirstCreate()...)
resp, err := client.Txn(ctx).If(cmp).Then(put, getOwner).Else(get, getOwner).Commit()
if err != nil {
return nil, err
}
m.myRev = resp.Header.Revision
if !resp.Succeeded {
m.myRev = resp.Responses[0].GetResponseRange().Kvs[0].CreateRevision
}
return resp, nil
}
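For reference, the getOwner read is what keeps the uncontended path to a single RPC: v3.WithFirstCreate() narrows a prefix Get to the key with the lowest CreateRevision, i.e. the oldest waiter, which under this protocol is the current holder. A standalone sketch of that query (printOwner and its arguments are illustrative, not part of the commit):

package lockdemo

import (
	"context"
	"fmt"

	v3 "go.etcd.io/etcd/clientv3"
)

// printOwner fetches the key under pfx with the lowest create revision,
// which the lock protocol guarantees is the current owner, if any.
func printOwner(ctx context.Context, cli *v3.Client, pfx string) error {
	resp, err := cli.Get(ctx, pfx, v3.WithFirstCreate()...)
	if err != nil {
		return err
	}
	if len(resp.Kvs) == 0 {
		fmt.Println("lock is not held")
		return nil
	}
	fmt.Printf("owner: %s (createRev=%d)\n", resp.Kvs[0].Key, resp.Kvs[0].CreateRevision)
	return nil
}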

func (m *Mutex) Unlock(ctx context.Context) error {
client := m.s.Client()
if _, err := client.Delete(ctx, m.myKey); err != nil {
70 changes: 63 additions & 7 deletions integration/v3_lock_test.go
@@ -23,30 +23,30 @@ import (

"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/clientv3/concurrency"
"go.etcd.io/etcd/contrib/recipes"
recipe "go.etcd.io/etcd/contrib/recipes"
"go.etcd.io/etcd/mvcc/mvccpb"
"go.etcd.io/etcd/pkg/testutil"
)

- func TestMutexSingleNode(t *testing.T) {
+ func TestMutexLockSingleNode(t *testing.T) {
clus := NewClusterV3(t, &ClusterConfig{Size: 3})
defer clus.Terminate(t)

var clients []*clientv3.Client
- testMutex(t, 5, makeSingleNodeClients(t, clus.cluster, &clients))
+ testMutexLock(t, 5, makeSingleNodeClients(t, clus.cluster, &clients))
closeClients(t, clients)
}

- func TestMutexMultiNode(t *testing.T) {
+ func TestMutexLockMultiNode(t *testing.T) {
clus := NewClusterV3(t, &ClusterConfig{Size: 3})
defer clus.Terminate(t)

var clients []*clientv3.Client
- testMutex(t, 5, makeMultiNodeClients(t, clus.cluster, &clients))
+ testMutexLock(t, 5, makeMultiNodeClients(t, clus.cluster, &clients))
closeClients(t, clients)
}

- func testMutex(t *testing.T, waiters int, chooseClient func() *clientv3.Client) {
+ func testMutexLock(t *testing.T, waiters int, chooseClient func() *clientv3.Client) {
// stream lock acquisitions
lockedC := make(chan *concurrency.Mutex)
for i := 0; i < waiters; i++ {
@@ -82,6 +82,62 @@ func testMutex(t *testing.T, waiters int, chooseClient func() *clientv3.Client)
}
}

func TestMutexTryLockSingleNode(t *testing.T) {
clus := NewClusterV3(t, &ClusterConfig{Size: 3})
defer clus.Terminate(t)

var clients []*clientv3.Client
testMutexTryLock(t, 5, makeSingleNodeClients(t, clus.cluster, &clients))
closeClients(t, clients)
}

func TestMutexTryLockMultiNode(t *testing.T) {
clus := NewClusterV3(t, &ClusterConfig{Size: 3})
defer clus.Terminate(t)

var clients []*clientv3.Client
testMutexTryLock(t, 5, makeMultiNodeClients(t, clus.cluster, &clients))
closeClients(t, clients)
}

func testMutexTryLock(t *testing.T, lockers int, chooseClient func() *clientv3.Client) {
lockedC := make(chan *concurrency.Mutex)
notlockedC := make(chan *concurrency.Mutex)
for i := 0; i < lockers; i++ {
go func() {
session, err := concurrency.NewSession(chooseClient())
if err != nil {
t.Error(err)
}
m := concurrency.NewMutex(session, "test-mutex-try-lock")
err = m.TryLock(context.TODO())
if err == nil {
lockedC <- m
} else if err == concurrency.ErrLocked {
notlockedC <- m
} else {
t.Errorf("Unexpected Error %v", err)
}
}()
}

timerC := time.After(time.Second)
select {
case <-lockedC:
for i := 0; i < lockers-1; i++ {
select {
case <-lockedC:
t.Fatalf("Multiple Mutes locked on same key")
case <-notlockedC:
case <-timerC:
t.Errorf("timed out waiting for lock")
}
}
case <-timerC:
t.Errorf("timed out waiting for lock")
}
}
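Note the accounting here: exactly one of the lockers must succeed. The outer select waits for the single success on lockedC; the inner loop then drains the remaining lockers-1 attempts, treating a second success as fatal and requiring the rest to arrive on notlockedC via ErrLocked. The winner never unlocks, so every other TryLock call must fail fast rather than block.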

// TestMutexSessionRelock ensures that acquiring the same lock with the same
// session will not result in deadlock.
func TestMutexSessionRelock(t *testing.T) {
@@ -219,7 +275,7 @@ func BenchmarkMutex4Waiters(b *testing.B) {
clus := NewClusterV3(nil, &ClusterConfig{Size: 3})
defer clus.Terminate(nil)
for i := 0; i < b.N; i++ {
- testMutex(nil, 4, func() *clientv3.Client { return clus.RandClient() })
+ testMutexLock(nil, 4, func() *clientv3.Client { return clus.RandClient() })
}
}

