Skip to content

Commit

Permalink
A RedLock implementation using EVAL for locking. (#33)
Browse files Browse the repository at this point in the history
This commit is based on sewenew's recipes branch and implements and
tests the redlock algorithm.

Thanks go to sewenew for the many suggestions and help with this PR.
  • Loading branch information
wingunder authored and sewenew committed Nov 8, 2019
1 parent 439ac20 commit 15e2926
Show file tree
Hide file tree
Showing 6 changed files with 655 additions and 13 deletions.
140 changes: 136 additions & 4 deletions src/sw/redis++/recipes/redlock.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#include "redlock.h"
#include <cassert>
#include <thread>

namespace sw {

Expand Down Expand Up @@ -53,7 +54,7 @@ std::chrono::milliseconds RedMutex::try_lock(const std::string &val,
bool RedMutex::try_lock(const std::string &val,
const std::chrono::time_point<std::chrono::system_clock> &tp) {
try {
try_lock(val, _ttl(tp));
try_lock(val, RedLockUtils::ttl(tp));
} catch (const Error &err) {
return false;
}
Expand All @@ -79,7 +80,7 @@ bool RedMutex::_extend_lock_master(Redis &master,
auto tx = master.transaction(true);
auto r = tx.redis();
try {
auto ttl = _ttl(tp);
auto ttl = RedLockUtils::ttl(tp);

r.watch(_resource);

Expand Down Expand Up @@ -143,7 +144,7 @@ bool RedMutex::_try_lock_master(Redis &master,
return master.set(_resource, val, ttl, UpdateType::NOT_EXIST);
}

std::chrono::milliseconds RedMutex::_ttl(const SysTime &tp) const {
std::chrono::milliseconds RedLockUtils::ttl(const SysTime &tp) {
auto cur = std::chrono::system_clock::now();
auto ttl = tp - cur;
if (ttl.count() < 0) {
Expand All @@ -153,7 +154,7 @@ std::chrono::milliseconds RedMutex::_ttl(const SysTime &tp) const {
return std::chrono::duration_cast<std::chrono::milliseconds>(ttl);
}

std::string RedLock::_lock_id() const {
std::string RedLockUtils::lock_id() {
std::random_device dev;
std::mt19937 random_gen(dev());
int range = 10 + 26 + 26 - 1;
Expand All @@ -175,6 +176,137 @@ std::string RedLock::_lock_id() const {

return id;
}

RedLockMutexVessel::RedLockMutexVessel(Redis& instance) :
RedLockMutexVessel({instance})
{
}

RedLockMutexVessel::RedLockMutexVessel(std::initializer_list<std::reference_wrapper<Redis>> instances) :
_instances(instances.begin(), instances.end())
{
}

bool RedLockMutexVessel::_extend_lock_instance(Redis& instance,
const std::string& resource,
const std::string& random_string,
const std::chrono::milliseconds& ttl)
{
const static std::string script = R"(
if redis.call("GET",KEYS[1]) == ARGV[1] then
return redis.call("pexpire",KEYS[1],ARGV[2])
else
return 0
end
)";
auto result = instance.eval<long long>(script, {resource}, {random_string, std::to_string(ttl.count())});
return result != 0;
}

void RedLockMutexVessel::_unlock_instance(Redis& instance,
const std::string& resource,
const std::string& random_string)
{
const std::string script = R"(
if redis.call("GET",KEYS[1]) == ARGV[1] then
return redis.call("del",KEYS[1])
else
return 0
end
)";
instance.eval<long long>(script, {resource}, {random_string});
}

bool RedLockMutexVessel::_lock_instance(Redis& instance,
const std::string& resource,
const std::string& random_string,
const std::chrono::milliseconds& ttl)
{
const auto result = instance.set(resource, random_string, ttl, UpdateType::NOT_EXIST);
return result;
}

RedLockMutexVessel::LockInfo RedLockMutexVessel::lock(const std::string& resource,
const std::string& random_string,
const std::chrono::milliseconds& ttl,
int retry_count,
const std::chrono::milliseconds& retry_delay,
double clock_drift_factor)
{
LockInfo lock_info = {false, std::chrono::steady_clock::now(), ttl, resource, random_string};

for (int i=0; i<retry_count; i++) {
int num_locked = 0;
for (auto& instance : _instances) {
if (_lock_instance(instance, lock_info.resource, lock_info.random_string, ttl)) {
num_locked++;
}
}

const auto drift = std::chrono::milliseconds(int(ttl.count() * clock_drift_factor) + 2);
lock_info.time_remaining = std::chrono::duration_cast<std::chrono::milliseconds>
(lock_info.startTime + ttl - std::chrono::steady_clock::now() - drift);

if (lock_info.time_remaining.count() <= 0) {
unlock(lock_info);
break; // The should not retry after the TTL expiration.
}
if (num_locked >= _quorum()) {
lock_info.locked = true;
break; // We have the lock.
}

unlock(lock_info);

// Sleep, only if it's _not_ the last attempt.
if (i != retry_count - 1) {
std::this_thread::sleep_for(std::chrono::milliseconds(std::rand() * retry_delay.count() / RAND_MAX));
}
}
return lock_info;
}

RedLockMutexVessel::LockInfo RedLockMutexVessel::extend_lock(const RedLockMutexVessel::LockInfo& lock_info,
const std::chrono::milliseconds& ttl,
double clock_drift_factor)
{
if (lock_info.locked) {
LockInfo extended_lock_info = {false, std::chrono::steady_clock::now(), ttl, lock_info.resource, lock_info.random_string};
const auto time_remaining = std::chrono::duration_cast<std::chrono::milliseconds>
(lock_info.startTime + lock_info.time_remaining - extended_lock_info.startTime);

if (time_remaining.count() > 0) {
int num_locked = 0;
for (auto& instance : _instances) {
if (_extend_lock_instance(instance, lock_info.resource, lock_info.random_string, ttl)) {
num_locked++;
}
}
const auto drift = std::chrono::milliseconds(int(ttl.count() * clock_drift_factor) + 2);
extended_lock_info.time_remaining = std::chrono::duration_cast<std::chrono::milliseconds>
(extended_lock_info.startTime + ttl - std::chrono::steady_clock::now() - drift);

if (num_locked >= _quorum() && extended_lock_info.time_remaining.count() > 0) {
extended_lock_info.locked = true;
}
else {
unlock(lock_info);
}
}
return extended_lock_info;
}
else {
return lock_info;
}
}

void RedLockMutexVessel::unlock(const LockInfo& lock_info)
{
for (auto& instance : _instances) {
_unlock_instance(instance, lock_info.resource, lock_info.random_string);
}
}

}

}
152 changes: 145 additions & 7 deletions src/sw/redis++/recipes/redlock.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,15 @@ namespace sw {

namespace redis {

class RedLockUtils {
public:
using SysTime = std::chrono::time_point<std::chrono::system_clock>;

static std::chrono::milliseconds ttl(const SysTime &tp);

static std::string lock_id();
};

class RedMutex {
public:
// Lock with a single Redis master.
Expand Down Expand Up @@ -74,20 +83,17 @@ class RedMutex {
return _masters.size() / 2 + 1;
}

using SysTime = std::chrono::time_point<std::chrono::system_clock>;

std::chrono::milliseconds _ttl(const SysTime &tp) const;

using RedisRef = std::reference_wrapper<Redis>;

std::vector<RedisRef> _masters;

std::string _resource;
};

template <typename RedisInstance>
class RedLock {
public:
RedLock(RedMutex &mut, std::defer_lock_t) : _mut(mut), _lock_val(_lock_id()) {}
RedLock(RedisInstance &mut, std::defer_lock_t) : _mut(mut), _lock_val(RedLockUtils::lock_id()) {}

~RedLock() {
if (_owned) {
Expand Down Expand Up @@ -134,15 +140,147 @@ class RedLock {
}

private:
std::string _lock_id() const;

RedMutex &_mut;
RedisInstance &_mut;

bool _owned = false;

std::string _lock_val;
};

class RedLockMutexVessel
{
public:

// This class does _not_ implement RedMutexInterface, as it gives
// the user the ability to use an instance of it for multiple resources.
// More than one resource can thus be locked and tracked with a single
// instantiation of this class.

explicit RedLockMutexVessel(Redis& instance);
explicit RedLockMutexVessel(std::initializer_list<std::reference_wrapper<Redis>> instances);

RedLockMutexVessel(const RedLockMutexVessel &) = delete;
RedLockMutexVessel& operator=(const RedLockMutexVessel &) = delete;

RedLockMutexVessel(RedLockMutexVessel &&) = delete;
RedLockMutexVessel& operator=(RedLockMutexVessel &&) = delete;

~RedLockMutexVessel() = default;

// The LockInfo struct can be used for chaining a lock to
// one or more extend_locks and finally to an unlock.
// All the lock's information is contained, so multiple
// locks could be handled with a single RedLockMutexVessel instance.
struct LockInfo {
bool locked;
std::chrono::time_point<std::chrono::steady_clock> startTime;
std::chrono::milliseconds time_remaining;
std::string resource;
std::string random_string;
};

// RedLockMutexVessel::lock will (re)try to get a lock until either:
// - it gets a lock on (n/2)+1 of the instances.
// - a period of TTL elapsed.
// - the number of retries was reached.
// - an exception was thrown.
LockInfo lock(const std::string& resource,
const std::string& random_string,
const std::chrono::milliseconds& ttl,
int retry_count = 3,
const std::chrono::milliseconds& retry_delay = std::chrono::milliseconds(200),
double clock_drift_factor = 0.01);

// RedLockMutexVessel::extend_lock is exactly the same as RedLockMutexVessel::lock,
// but needs LockInfo from a previously acquired lock.
LockInfo extend_lock(const LockInfo& lock_info,
const std::chrono::milliseconds& ttl,
double clock_drift_factor = 0.01);

// RedLockMutexVessel::unlock unlocks all locked instances,
// that was locked with LockInfo,
void unlock(const LockInfo& lock_info);

private:

bool _lock_instance(Redis& instance,
const std::string& resource,
const std::string& random_string,
const std::chrono::milliseconds& ttl);

bool _extend_lock_instance(Redis& instance,
const std::string& resource,
const std::string& random_string,
const std::chrono::milliseconds& ttl);

void _unlock_instance(Redis& instance,
const std::string& resource,
const std::string& random_string);

int _quorum() const {
return _instances.size() / 2 + 1;
}

std::vector<std::reference_wrapper<Redis>> _instances;
};

class RedLockMutex
{
public:
explicit RedLockMutex(Redis& instance, const std::string& resource) :
_redlock_mutex(instance), _resource(resource) {}

explicit RedLockMutex(std::initializer_list<std::reference_wrapper<Redis>> instances,
const std::string &resource) :
_redlock_mutex(instances), _resource(resource) {}

RedLockMutex(const RedLockMutex &) = delete;
RedLockMutex& operator=(const RedLockMutex &) = delete;

RedLockMutex(RedLockMutex &&) = delete;
RedLockMutex& operator=(RedLockMutex &&) = delete;

virtual ~RedLockMutex() = default;

std::chrono::milliseconds try_lock(const std::string& random_string,
const std::chrono::milliseconds& ttl)
{
const auto lock_info = _redlock_mutex.lock(_resource, random_string, ttl, 1);
if (!lock_info.locked) {
throw Error("failed to lock: " + _resource);
}
return lock_info.time_remaining;
}

bool try_lock(const std::string &random_string,
const std::chrono::time_point<std::chrono::system_clock> &tp)
{
const auto lock_info = _redlock_mutex.lock(_resource, random_string, RedLockUtils::ttl(tp), 1);
return lock_info.locked;
}

bool extend_lock(const std::string &random_string,
const std::chrono::time_point<std::chrono::system_clock> &tp)
{
const auto ttl = RedLockUtils::ttl(tp);
const RedLockMutexVessel::LockInfo lock_info =
{true, std::chrono::steady_clock::now(), ttl, _resource, random_string};
const auto result = _redlock_mutex.extend_lock(lock_info, ttl);
return result.locked;
}

void unlock(const std::string &random_string)
{
_redlock_mutex.unlock({true, std::chrono::steady_clock::now(),
std::chrono::milliseconds(0), _resource, random_string});
}

private:
RedLockMutexVessel _redlock_mutex;
const std::string _resource;
};

}

}
Expand Down
17 changes: 15 additions & 2 deletions test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,16 @@ find_path(HIREDIS_HEADER hiredis)
target_include_directories(${PROJECT_NAME} PUBLIC ${HIREDIS_HEADER})

find_library(HIREDIS_STATIC_LIB libhiredis.a)
target_link_libraries(${PROJECT_NAME} ${HIREDIS_STATIC_LIB})

find_package (OpenSSL)

if (OPENSSL_FOUND)
target_compile_definitions(${PROJECT_NAME} PUBLIC -DUSE_OPENSSL)
include_directories(${OPENSSL_INCLUDE_DIR})
target_link_libraries(${PROJECT_NAME} ${HIREDIS_STATIC_LIB} ${OPENSSL_CRYPTO_LIBRARY})
else (OPENSSL_FOUND)
target_link_libraries(${PROJECT_NAME} ${HIREDIS_STATIC_LIB})
endif (OPENSSL_FOUND)

# redis++ dependency
target_include_directories(${PROJECT_NAME} PUBLIC ../src)
Expand All @@ -30,4 +39,8 @@ ENDIF(CMAKE_SYSTEM_NAME MATCHES "(Solaris|SunOS)" )

find_package(Threads REQUIRED)

target_link_libraries(${PROJECT_NAME} ${REDIS_PLUS_PLUS_LIB} ${CMAKE_THREAD_LIBS_INIT})
if (OPENSSL_FOUND)
target_link_libraries(${PROJECT_NAME} ${REDIS_PLUS_PLUS_LIB} ${CMAKE_THREAD_LIBS_INIT} ${OPENSSL_CRYPTO_LIBRARY})
else (OPENSSL_FOUND)
target_link_libraries(${PROJECT_NAME} ${REDIS_PLUS_PLUS_LIB} ${CMAKE_THREAD_LIBS_INIT})
endif (OPENSSL_FOUND)
Loading

0 comments on commit 15e2926

Please sign in to comment.