Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

clientv3/concurrency: Added Mutex.TryLock() #11104

Merged
merged 1 commit into from
Sep 12, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 51 additions & 0 deletions clientv3/concurrency/example_mutex_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,57 @@ import (
"go.etcd.io/etcd/clientv3/concurrency"
)

func ExampleMutex_TryLock() {
vimalk78 marked this conversation as resolved.
Show resolved Hide resolved
cli, err := clientv3.New(clientv3.Config{Endpoints: endpoints})
if err != nil {
log.Fatal(err)
}
defer cli.Close()

// create two separate sessions for lock competition
s1, err := concurrency.NewSession(cli)
if err != nil {
log.Fatal(err)
}
defer s1.Close()
m1 := concurrency.NewMutex(s1, "/my-lock/")

s2, err := concurrency.NewSession(cli)
if err != nil {
log.Fatal(err)
}
defer s2.Close()
m2 := concurrency.NewMutex(s2, "/my-lock/")

// acquire lock for s1
if err = m1.Lock(context.TODO()); err != nil {
log.Fatal(err)
}
fmt.Println("acquired lock for s1")

if err = m2.TryLock(context.TODO()); err == nil {
log.Fatal("should not acquire lock")
}
if err == concurrency.ErrLocked {
fmt.Println("cannot acquire lock for s2, as already locked in another session")
}

if err = m1.Unlock(context.TODO()); err != nil {
log.Fatal(err)
}
fmt.Println("released lock for s1")
if err = m2.TryLock(context.TODO()); err != nil {
log.Fatal(err)
}
fmt.Println("acquired lock for s2")

// Output:
// acquired lock for s1
// cannot acquire lock for s2, as already locked in another session
// released lock for s1
// acquired lock for s2
}

func ExampleMutex_Lock() {
cli, err := clientv3.New(clientv3.Config{Endpoints: endpoints})
if err != nil {
Expand Down
70 changes: 53 additions & 17 deletions clientv3/concurrency/mutex.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,17 @@ package concurrency

import (
"context"
"errors"
"fmt"
"sync"

v3 "go.etcd.io/etcd/clientv3"
pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
)

// ErrLocked is returned by TryLock when Mutex is already locked by another session.
var ErrLocked = errors.New("mutex: Locked by another session")

// Mutex implements the sync Locker interface with etcd
type Mutex struct {
s *Session
Expand All @@ -37,35 +41,44 @@ func NewMutex(s *Session, pfx string) *Mutex {
return &Mutex{s, pfx + "/", "", -1, nil}
}

// TryLock locks the mutex if not already locked by another session.
// If lock is held by another session, return immediately after attempting necessary cleanup
// The ctx argument is used for the sending/receiving Txn RPC.
func (m *Mutex) TryLock(ctx context.Context) error {
vimalk78 marked this conversation as resolved.
Show resolved Hide resolved
resp, err := m.tryAcquire(ctx)
if err != nil {
return err
}
// if no key on prefix / the minimum rev is key, already hold the lock
ownerKey := resp.Responses[1].GetResponseRange().Kvs
if len(ownerKey) == 0 || ownerKey[0].CreateRevision == m.myRev {
m.hdr = resp.Header
return nil
}
client := m.s.Client()
// Cannot lock, so delete the key
if _, err := client.Delete(ctx, m.myKey); err != nil {
vimalk78 marked this conversation as resolved.
Show resolved Hide resolved
return err
}
m.myKey = "\x00"
m.myRev = -1
return ErrLocked
}

// Lock locks the mutex with a cancelable context. If the context is canceled
// while trying to acquire the lock, the mutex tries to clean its stale lock entry.
func (m *Mutex) Lock(ctx context.Context) error {
s := m.s
client := m.s.Client()

m.myKey = fmt.Sprintf("%s%x", m.pfx, s.Lease())
cmp := v3.Compare(v3.CreateRevision(m.myKey), "=", 0)
// put self in lock waiters via myKey; oldest waiter holds lock
put := v3.OpPut(m.myKey, "", v3.WithLease(s.Lease()))
// reuse key in case this session already holds the lock
get := v3.OpGet(m.myKey)
// fetch current holder to complete uncontended path with only one RPC
getOwner := v3.OpGet(m.pfx, v3.WithFirstCreate()...)
resp, err := client.Txn(ctx).If(cmp).Then(put, getOwner).Else(get, getOwner).Commit()
resp, err := m.tryAcquire(ctx)
if err != nil {
return err
}
m.myRev = resp.Header.Revision
if !resp.Succeeded {
m.myRev = resp.Responses[0].GetResponseRange().Kvs[0].CreateRevision
}
// if no key on prefix / the minimum rev is key, already hold the lock
ownerKey := resp.Responses[1].GetResponseRange().Kvs
if len(ownerKey) == 0 || ownerKey[0].CreateRevision == m.myRev {
m.hdr = resp.Header
return nil
}

client := m.s.Client()
// wait for deletion revisions prior to myKey
hdr, werr := waitDeletes(ctx, client, m.pfx, m.myRev-1)
// release lock key if wait failed
Expand All @@ -77,6 +90,29 @@ func (m *Mutex) Lock(ctx context.Context) error {
return werr
}

func (m *Mutex) tryAcquire(ctx context.Context) (*v3.TxnResponse, error) {
s := m.s
client := m.s.Client()

m.myKey = fmt.Sprintf("%s%x", m.pfx, s.Lease())
cmp := v3.Compare(v3.CreateRevision(m.myKey), "=", 0)
// put self in lock waiters via myKey; oldest waiter holds lock
put := v3.OpPut(m.myKey, "", v3.WithLease(s.Lease()))
// reuse key in case this session already holds the lock
get := v3.OpGet(m.myKey)
// fetch current holder to complete uncontended path with only one RPC
getOwner := v3.OpGet(m.pfx, v3.WithFirstCreate()...)
resp, err := client.Txn(ctx).If(cmp).Then(put, getOwner).Else(get, getOwner).Commit()
if err != nil {
return nil, err
}
m.myRev = resp.Header.Revision
if !resp.Succeeded {
m.myRev = resp.Responses[0].GetResponseRange().Kvs[0].CreateRevision
}
return resp, nil
}

func (m *Mutex) Unlock(ctx context.Context) error {
client := m.s.Client()
if _, err := client.Delete(ctx, m.myKey); err != nil {
Expand Down
70 changes: 63 additions & 7 deletions integration/v3_lock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,30 +23,30 @@ import (

"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/clientv3/concurrency"
"go.etcd.io/etcd/contrib/recipes"
recipe "go.etcd.io/etcd/contrib/recipes"
"go.etcd.io/etcd/mvcc/mvccpb"
"go.etcd.io/etcd/pkg/testutil"
)

func TestMutexSingleNode(t *testing.T) {
func TestMutexLockSingleNode(t *testing.T) {
clus := NewClusterV3(t, &ClusterConfig{Size: 3})
defer clus.Terminate(t)

var clients []*clientv3.Client
testMutex(t, 5, makeSingleNodeClients(t, clus.cluster, &clients))
testMutexLock(t, 5, makeSingleNodeClients(t, clus.cluster, &clients))
closeClients(t, clients)
}

func TestMutexMultiNode(t *testing.T) {
func TestMutexLockMultiNode(t *testing.T) {
clus := NewClusterV3(t, &ClusterConfig{Size: 3})
defer clus.Terminate(t)

var clients []*clientv3.Client
testMutex(t, 5, makeMultiNodeClients(t, clus.cluster, &clients))
testMutexLock(t, 5, makeMultiNodeClients(t, clus.cluster, &clients))
closeClients(t, clients)
}

func testMutex(t *testing.T, waiters int, chooseClient func() *clientv3.Client) {
func testMutexLock(t *testing.T, waiters int, chooseClient func() *clientv3.Client) {
// stream lock acquisitions
lockedC := make(chan *concurrency.Mutex)
for i := 0; i < waiters; i++ {
Expand Down Expand Up @@ -82,6 +82,62 @@ func testMutex(t *testing.T, waiters int, chooseClient func() *clientv3.Client)
}
}

func TestMutexTryLockSingleNode(t *testing.T) {
clus := NewClusterV3(t, &ClusterConfig{Size: 3})
defer clus.Terminate(t)

var clients []*clientv3.Client
testMutexTryLock(t, 5, makeSingleNodeClients(t, clus.cluster, &clients))
closeClients(t, clients)
}

func TestMutexTryLockMultiNode(t *testing.T) {
clus := NewClusterV3(t, &ClusterConfig{Size: 3})
defer clus.Terminate(t)

var clients []*clientv3.Client
testMutexTryLock(t, 5, makeMultiNodeClients(t, clus.cluster, &clients))
closeClients(t, clients)
}

func testMutexTryLock(t *testing.T, lockers int, chooseClient func() *clientv3.Client) {
lockedC := make(chan *concurrency.Mutex)
notlockedC := make(chan *concurrency.Mutex)
for i := 0; i < lockers; i++ {
go func() {
session, err := concurrency.NewSession(chooseClient())
if err != nil {
t.Error(err)
}
m := concurrency.NewMutex(session, "test-mutex-try-lock")
err = m.TryLock(context.TODO())
if err == nil {
lockedC <- m
} else if err == concurrency.ErrLocked {
notlockedC <- m
} else {
t.Errorf("Unexpected Error %v", err)
}
}()
}

timerC := time.After(time.Second)
select {
case <-lockedC:
for i := 0; i < lockers-1; i++ {
select {
case <-lockedC:
t.Fatalf("Multiple Mutes locked on same key")
case <-notlockedC:
case <-timerC:
t.Errorf("timed out waiting for lock")
}
}
case <-timerC:
t.Errorf("timed out waiting for lock")
}
}

// TestMutexSessionRelock ensures that acquiring the same lock with the same
// session will not result in deadlock.
func TestMutexSessionRelock(t *testing.T) {
Expand Down Expand Up @@ -219,7 +275,7 @@ func BenchmarkMutex4Waiters(b *testing.B) {
clus := NewClusterV3(nil, &ClusterConfig{Size: 3})
defer clus.Terminate(nil)
for i := 0; i < b.N; i++ {
testMutex(nil, 4, func() *clientv3.Client { return clus.RandClient() })
testMutexLock(nil, 4, func() *clientv3.Client { return clus.RandClient() })
}
}

Expand Down