diff --git a/src/sw/redis++/recipes/redlock.cpp b/src/sw/redis++/recipes/redlock.cpp index acbd6499..5d39ebdc 100644 --- a/src/sw/redis++/recipes/redlock.cpp +++ b/src/sw/redis++/recipes/redlock.cpp @@ -16,6 +16,7 @@ #include "redlock.h" #include +#include namespace sw { @@ -53,7 +54,7 @@ std::chrono::milliseconds RedMutex::try_lock(const std::string &val, bool RedMutex::try_lock(const std::string &val, const std::chrono::time_point &tp) { try { - try_lock(val, _ttl(tp)); + try_lock(val, RedLockUtils::ttl(tp)); } catch (const Error &err) { return false; } @@ -79,7 +80,7 @@ bool RedMutex::_extend_lock_master(Redis &master, auto tx = master.transaction(true); auto r = tx.redis(); try { - auto ttl = _ttl(tp); + auto ttl = RedLockUtils::ttl(tp); r.watch(_resource); @@ -143,7 +144,7 @@ bool RedMutex::_try_lock_master(Redis &master, return master.set(_resource, val, ttl, UpdateType::NOT_EXIST); } -std::chrono::milliseconds RedMutex::_ttl(const SysTime &tp) const { +std::chrono::milliseconds RedLockUtils::ttl(const SysTime &tp) { auto cur = std::chrono::system_clock::now(); auto ttl = tp - cur; if (ttl.count() < 0) { @@ -153,7 +154,7 @@ std::chrono::milliseconds RedMutex::_ttl(const SysTime &tp) const { return std::chrono::duration_cast(ttl); } -std::string RedLock::_lock_id() const { +std::string RedLockUtils::lock_id() { std::random_device dev; std::mt19937 random_gen(dev()); int range = 10 + 26 + 26 - 1; @@ -175,6 +176,137 @@ std::string RedLock::_lock_id() const { return id; } + +RedLockMutexVessel::RedLockMutexVessel(Redis& instance) : + RedLockMutexVessel({instance}) +{ +} + +RedLockMutexVessel::RedLockMutexVessel(std::initializer_list> instances) : + _instances(instances.begin(), instances.end()) +{ +} + +bool RedLockMutexVessel::_extend_lock_instance(Redis& instance, + const std::string& resource, + const std::string& random_string, + const std::chrono::milliseconds& ttl) +{ + const static std::string script = R"( +if redis.call("GET",KEYS[1]) == ARGV[1] then + return redis.call("pexpire",KEYS[1],ARGV[2]) +else + return 0 +end +)"; + auto result = instance.eval(script, {resource}, {random_string, std::to_string(ttl.count())}); + return result != 0; +} + +void RedLockMutexVessel::_unlock_instance(Redis& instance, + const std::string& resource, + const std::string& random_string) +{ + const std::string script = R"( +if redis.call("GET",KEYS[1]) == ARGV[1] then + return redis.call("del",KEYS[1]) +else + return 0 +end +)"; + instance.eval(script, {resource}, {random_string}); +} + +bool RedLockMutexVessel::_lock_instance(Redis& instance, + const std::string& resource, + const std::string& random_string, + const std::chrono::milliseconds& ttl) +{ + const auto result = instance.set(resource, random_string, ttl, UpdateType::NOT_EXIST); + return result; +} + +RedLockMutexVessel::LockInfo RedLockMutexVessel::lock(const std::string& resource, + const std::string& random_string, + const std::chrono::milliseconds& ttl, + int retry_count, + const std::chrono::milliseconds& retry_delay, + double clock_drift_factor) +{ + LockInfo lock_info = {false, std::chrono::steady_clock::now(), ttl, resource, random_string}; + + for (int i=0; i + (lock_info.startTime + ttl - std::chrono::steady_clock::now() - drift); + + if (lock_info.time_remaining.count() <= 0) { + unlock(lock_info); + break; // The should not retry after the TTL expiration. + } + if (num_locked >= _quorum()) { + lock_info.locked = true; + break; // We have the lock. + } + + unlock(lock_info); + + // Sleep, only if it's _not_ the last attempt. + if (i != retry_count - 1) { + std::this_thread::sleep_for(std::chrono::milliseconds(std::rand() * retry_delay.count() / RAND_MAX)); + } + } + return lock_info; +} + +RedLockMutexVessel::LockInfo RedLockMutexVessel::extend_lock(const RedLockMutexVessel::LockInfo& lock_info, + const std::chrono::milliseconds& ttl, + double clock_drift_factor) +{ + if (lock_info.locked) { + LockInfo extended_lock_info = {false, std::chrono::steady_clock::now(), ttl, lock_info.resource, lock_info.random_string}; + const auto time_remaining = std::chrono::duration_cast + (lock_info.startTime + lock_info.time_remaining - extended_lock_info.startTime); + + if (time_remaining.count() > 0) { + int num_locked = 0; + for (auto& instance : _instances) { + if (_extend_lock_instance(instance, lock_info.resource, lock_info.random_string, ttl)) { + num_locked++; + } + } + const auto drift = std::chrono::milliseconds(int(ttl.count() * clock_drift_factor) + 2); + extended_lock_info.time_remaining = std::chrono::duration_cast + (extended_lock_info.startTime + ttl - std::chrono::steady_clock::now() - drift); + + if (num_locked >= _quorum() && extended_lock_info.time_remaining.count() > 0) { + extended_lock_info.locked = true; + } + else { + unlock(lock_info); + } + } + return extended_lock_info; + } + else { + return lock_info; + } +} + +void RedLockMutexVessel::unlock(const LockInfo& lock_info) +{ + for (auto& instance : _instances) { + _unlock_instance(instance, lock_info.resource, lock_info.random_string); + } +} + } } diff --git a/src/sw/redis++/recipes/redlock.h b/src/sw/redis++/recipes/redlock.h index 405fe281..3aae9d0b 100644 --- a/src/sw/redis++/recipes/redlock.h +++ b/src/sw/redis++/recipes/redlock.h @@ -28,6 +28,15 @@ namespace sw { namespace redis { +class RedLockUtils { +public: + using SysTime = std::chrono::time_point; + + static std::chrono::milliseconds ttl(const SysTime &tp); + + static std::string lock_id(); +}; + class RedMutex { public: // Lock with a single Redis master. @@ -74,10 +83,6 @@ class RedMutex { return _masters.size() / 2 + 1; } - using SysTime = std::chrono::time_point; - - std::chrono::milliseconds _ttl(const SysTime &tp) const; - using RedisRef = std::reference_wrapper; std::vector _masters; @@ -85,9 +90,10 @@ class RedMutex { std::string _resource; }; +template class RedLock { public: - RedLock(RedMutex &mut, std::defer_lock_t) : _mut(mut), _lock_val(_lock_id()) {} + RedLock(RedisInstance &mut, std::defer_lock_t) : _mut(mut), _lock_val(RedLockUtils::lock_id()) {} ~RedLock() { if (_owned) { @@ -134,15 +140,147 @@ class RedLock { } private: - std::string _lock_id() const; - RedMutex &_mut; + RedisInstance &_mut; bool _owned = false; std::string _lock_val; }; +class RedLockMutexVessel +{ +public: + + // This class does _not_ implement RedMutexInterface, as it gives + // the user the ability to use an instance of it for multiple resources. + // More than one resource can thus be locked and tracked with a single + // instantiation of this class. + + explicit RedLockMutexVessel(Redis& instance); + explicit RedLockMutexVessel(std::initializer_list> instances); + + RedLockMutexVessel(const RedLockMutexVessel &) = delete; + RedLockMutexVessel& operator=(const RedLockMutexVessel &) = delete; + + RedLockMutexVessel(RedLockMutexVessel &&) = delete; + RedLockMutexVessel& operator=(RedLockMutexVessel &&) = delete; + + ~RedLockMutexVessel() = default; + + // The LockInfo struct can be used for chaining a lock to + // one or more extend_locks and finally to an unlock. + // All the lock's information is contained, so multiple + // locks could be handled with a single RedLockMutexVessel instance. + struct LockInfo { + bool locked; + std::chrono::time_point startTime; + std::chrono::milliseconds time_remaining; + std::string resource; + std::string random_string; + }; + + // RedLockMutexVessel::lock will (re)try to get a lock until either: + // - it gets a lock on (n/2)+1 of the instances. + // - a period of TTL elapsed. + // - the number of retries was reached. + // - an exception was thrown. + LockInfo lock(const std::string& resource, + const std::string& random_string, + const std::chrono::milliseconds& ttl, + int retry_count = 3, + const std::chrono::milliseconds& retry_delay = std::chrono::milliseconds(200), + double clock_drift_factor = 0.01); + + // RedLockMutexVessel::extend_lock is exactly the same as RedLockMutexVessel::lock, + // but needs LockInfo from a previously acquired lock. + LockInfo extend_lock(const LockInfo& lock_info, + const std::chrono::milliseconds& ttl, + double clock_drift_factor = 0.01); + + // RedLockMutexVessel::unlock unlocks all locked instances, + // that was locked with LockInfo, + void unlock(const LockInfo& lock_info); + +private: + + bool _lock_instance(Redis& instance, + const std::string& resource, + const std::string& random_string, + const std::chrono::milliseconds& ttl); + + bool _extend_lock_instance(Redis& instance, + const std::string& resource, + const std::string& random_string, + const std::chrono::milliseconds& ttl); + + void _unlock_instance(Redis& instance, + const std::string& resource, + const std::string& random_string); + + int _quorum() const { + return _instances.size() / 2 + 1; + } + + std::vector> _instances; +}; + +class RedLockMutex +{ +public: + explicit RedLockMutex(Redis& instance, const std::string& resource) : + _redlock_mutex(instance), _resource(resource) {} + + explicit RedLockMutex(std::initializer_list> instances, + const std::string &resource) : + _redlock_mutex(instances), _resource(resource) {} + + RedLockMutex(const RedLockMutex &) = delete; + RedLockMutex& operator=(const RedLockMutex &) = delete; + + RedLockMutex(RedLockMutex &&) = delete; + RedLockMutex& operator=(RedLockMutex &&) = delete; + + virtual ~RedLockMutex() = default; + + std::chrono::milliseconds try_lock(const std::string& random_string, + const std::chrono::milliseconds& ttl) + { + const auto lock_info = _redlock_mutex.lock(_resource, random_string, ttl, 1); + if (!lock_info.locked) { + throw Error("failed to lock: " + _resource); + } + return lock_info.time_remaining; + } + + bool try_lock(const std::string &random_string, + const std::chrono::time_point &tp) + { + const auto lock_info = _redlock_mutex.lock(_resource, random_string, RedLockUtils::ttl(tp), 1); + return lock_info.locked; + } + + bool extend_lock(const std::string &random_string, + const std::chrono::time_point &tp) + { + const auto ttl = RedLockUtils::ttl(tp); + const RedLockMutexVessel::LockInfo lock_info = + {true, std::chrono::steady_clock::now(), ttl, _resource, random_string}; + const auto result = _redlock_mutex.extend_lock(lock_info, ttl); + return result.locked; + } + + void unlock(const std::string &random_string) + { + _redlock_mutex.unlock({true, std::chrono::steady_clock::now(), + std::chrono::milliseconds(0), _resource, random_string}); + } + +private: + RedLockMutexVessel _redlock_mutex; + const std::string _resource; +}; + } } diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 861e6ff8..e7414cd2 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -17,7 +17,16 @@ find_path(HIREDIS_HEADER hiredis) target_include_directories(${PROJECT_NAME} PUBLIC ${HIREDIS_HEADER}) find_library(HIREDIS_STATIC_LIB libhiredis.a) -target_link_libraries(${PROJECT_NAME} ${HIREDIS_STATIC_LIB}) + +find_package (OpenSSL) + +if (OPENSSL_FOUND) + target_compile_definitions(${PROJECT_NAME} PUBLIC -DUSE_OPENSSL) + include_directories(${OPENSSL_INCLUDE_DIR}) + target_link_libraries(${PROJECT_NAME} ${HIREDIS_STATIC_LIB} ${OPENSSL_CRYPTO_LIBRARY}) +else (OPENSSL_FOUND) + target_link_libraries(${PROJECT_NAME} ${HIREDIS_STATIC_LIB}) +endif (OPENSSL_FOUND) # redis++ dependency target_include_directories(${PROJECT_NAME} PUBLIC ../src) @@ -30,4 +39,8 @@ ENDIF(CMAKE_SYSTEM_NAME MATCHES "(Solaris|SunOS)" ) find_package(Threads REQUIRED) -target_link_libraries(${PROJECT_NAME} ${REDIS_PLUS_PLUS_LIB} ${CMAKE_THREAD_LIBS_INIT}) +if (OPENSSL_FOUND) + target_link_libraries(${PROJECT_NAME} ${REDIS_PLUS_PLUS_LIB} ${CMAKE_THREAD_LIBS_INIT} ${OPENSSL_CRYPTO_LIBRARY}) +else (OPENSSL_FOUND) + target_link_libraries(${PROJECT_NAME} ${REDIS_PLUS_PLUS_LIB} ${CMAKE_THREAD_LIBS_INIT}) +endif (OPENSSL_FOUND) diff --git a/test/src/sw/redis++/redlock_test.h b/test/src/sw/redis++/redlock_test.h new file mode 100644 index 00000000..d2af3322 --- /dev/null +++ b/test/src/sw/redis++/redlock_test.h @@ -0,0 +1,47 @@ +/************************************************************************** + Copyright (c) 2019 + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + *************************************************************************/ + +#ifndef REDISPLUSPLUS_TEST_REDLOCK_TEST_H +#define REDISPLUSPLUS_TEST_REDLOCK_TEST_H + +#include + +namespace sw { + +namespace redis { + +namespace test { + +template +class RedLockTest { +public: + explicit RedLockTest(RedisInstance &instance) : _redis(instance) {} + + void run(); + +private: + RedisInstance &_redis; +}; + +} // namespace test + +} // namespace redis + +} // namespace sw + +#include "redlock_test.hpp" + +#endif // end REDISPLUSPLUS_TEST_REDLOCK_TEST_H diff --git a/test/src/sw/redis++/redlock_test.hpp b/test/src/sw/redis++/redlock_test.hpp new file mode 100644 index 00000000..b7fa604f --- /dev/null +++ b/test/src/sw/redis++/redlock_test.hpp @@ -0,0 +1,306 @@ +/************************************************************************** + Copyright (c) 2019 + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + *************************************************************************/ + +#ifndef REDISPLUSPLUS_REDLOCK_CMDS_TEST_HPP +#define REDISPLUSPLUS_REDLOCK_CMDS_TEST_HPP + +#include +#include +#include +#include + +namespace sw { + +namespace redis { + +namespace test { + +class RandomBufferInterface +{ +public: + virtual ~RandomBufferInterface() {} + virtual std::string get_updated_string() = 0; +}; + +#ifdef USE_OPENSSL +#include + +template +class RandomBuffer : public RandomBufferInterface +{ +public: + RandomBuffer() : + _in_idx(0), + _out_idx(1) + { + int random_num = std::rand(); + const auto rn_size = sizeof(random_num); + for (size_t i=0; i= rn_size) ? rn_size : N - i); + random_num = std::rand(); + } + RC4_set_key(&_key, N, _data[0]); + } + + std::string get_updated_string() { + RC4(&_key, N, _data[_in_idx], _data[_out_idx]); + // Swap the in and out buffers. + if (_in_idx == 0) { + _in_idx = 1; + _out_idx = 0; + } + else { + _in_idx = 0; + _out_idx = 1; + } + return std::string((char*)(_data[_in_idx]), N); + } + +private: + uint8_t _in_idx; + uint8_t _out_idx; + uint8_t _data[2][N]; + RC4_KEY _key; +}; + +#else // !USE_OPENSSL + +template +class RandomBuffer : public sw::redis::RandomBufferInterface +{ +public: + std::string get_updated_string() { + return RedLockMutexVessel::lock_id(); + } +}; + +#endif // USE_OPENSSL + +template <> +void RedLockTest::run() { + // Not applicable. +} + +template <> +void RedLockTest::run() { + std::srand(std::time(nullptr)); + RandomBuffer<> random_buffer; + + RedLockMutexVessel redlock(std::ref(_redis)); + + const auto resource = test_key(RedLockUtils::lock_id()); + const auto random_string = random_buffer.get_updated_string(); + const std::chrono::milliseconds ttl(200); + + + // Test if we can obtain a lock. + { + const auto lock_info = redlock.lock(resource, random_string, ttl); + if (lock_info.locked) { + redlock.unlock(lock_info); + } + else { + REDIS_ASSERT(0, "unable to obtain a lock"); + } + } + + const int n = 1000; + auto start = std::chrono::system_clock::now(); + // Use a 2 second TTL for the multi lock tests, as + // getting all these locks might take long. + const std::chrono::milliseconds multi_lock_ttl(2000); + // Test if we can obtain n locks with 1 RedLockMutexVessel instance. + { + std::queue lock_infoList; + for (int i=0; i diff = end-start; + std::cout << "Time to lock and unlock " << n << " simultaneous locks with RedLockMutexVessel: " << diff.count() << " s" << std::endl;; + + start = std::chrono::system_clock::now(); + // Test if we can obtain n locks with a n RedLockMutex instances. + { + std::queue mutex_list; + for (int i=0; i tp = std::chrono::system_clock::now() + multi_lock_ttl; + if (!(mutex_list.back()->try_lock(random_string, tp))) { + std::cout << "Num locks = " << i << std::endl;; + REDIS_ASSERT(0, "unable to obtain a lock"); + } + } + while (mutex_list.size() != 0) { + mutex_list.front()->unlock(random_string); + delete(mutex_list.front()); + mutex_list.pop(); + } + } + end = std::chrono::system_clock::now(); + diff = end-start; + std::cout << "Time to lock and unlock " << n << " simultaneous locks with RedLockMutex: " << diff.count() << " s" << std::endl;; + + start = std::chrono::system_clock::now(); + // Test if we can obtain a n locks with a n RedMutex instances. + { + std::queue mutex_list; + for (int i=0; i tp = std::chrono::system_clock::now() + multi_lock_ttl; + mutex_list.push(new RedMutex(std::ref(_redis), RedLockUtils::lock_id())); + if (!(mutex_list.back()->try_lock(random_string, tp))) { + REDIS_ASSERT(0, "unable to obtain a lock"); + } + } + while (mutex_list.size() != 0) { + mutex_list.front()->unlock(random_string); + delete(mutex_list.front()); + mutex_list.pop(); + } + } + end = std::chrono::system_clock::now(); + diff = end-start; + std::cout << "Time to lock and unlock " << n << " simultaneous locks with RedMutex: " << diff.count() << " s" << std::endl;; + + // Test if the lock fails if we try to lock a key, after + // a lock was already obtained. + { + const auto lock_info = redlock.lock(resource, random_string, ttl); + if (lock_info.locked) { + const auto lock_infoFail = redlock.lock(resource, random_string, ttl, 1); + if (lock_infoFail.locked) { + redlock.unlock(lock_infoFail); + REDIS_ASSERT(0, "managed to get lock from redlock, while redlock was locked"); + } + redlock.unlock(lock_info); + } + else { + REDIS_ASSERT(0, "unable to obtain a lock"); + } + } + + // Test if the lock succeeds if we're trying to obtain a lock, + // after the TTL expired of the original lock, when the original + // lock was not unlocked. + { + const auto lock_info = redlock.lock(resource, random_string, ttl); + if (lock_info.locked) { + // Sleep TTL duration + 50 mSec. + std::this_thread::sleep_for(ttl + std::chrono::milliseconds(50)); + const auto lock_info_ok = redlock.lock(resource, random_string, ttl); + if (lock_info_ok.locked) { + redlock.unlock(lock_info_ok); + } + else { + redlock.unlock(lock_info); + REDIS_ASSERT(0, "redlock lock was not automatically released after TTL expired"); + } + } + else { + REDIS_ASSERT(0, "unable to obtain a lock"); + } + } + + // Test if the lock succeeds if we're trying to obtain a lock, + // after 2/3 of TTL expired of the original lock. The new lock + // should have enough time to aquire a lock, if we give it 20 + // retries. + { + const auto lock_info = redlock.lock(resource, random_string, ttl); + if (lock_info.locked) { + // Sleep 2/3 of the TTL duration. + std::this_thread::sleep_for(ttl * 2 / 3); + // We'll now try to get a lock with for TTL/2 mSec, retrying every mSec. + const auto lock_info_ok = redlock.lock(resource, random_string, ttl, ttl.count() / 2, std::chrono::milliseconds(1)); + if (lock_info_ok.locked) { + redlock.unlock(lock_info_ok); + } + else { + redlock.unlock(lock_info); + REDIS_ASSERT(0, "redlock lock retry machanism failed"); + } + } + else { + REDIS_ASSERT(0, "unable to obtain a lock"); + } + } + + // Test if the lock is extendable. + { + const auto lock_info = redlock.lock(resource, random_string, ttl); + if (lock_info.locked) { + // We'll sleep 2/3 of the ttl. + std::this_thread::sleep_for(ttl * 2 / 3); + // Now we have 1/3 of the ttl left, so we extend it by ttl. + const auto lock_info_ext = redlock.extend_lock(lock_info, ttl); + if (lock_info_ext.locked) { + // We'll sleep 2/3 of the ttl. + std::this_thread::sleep_for(ttl * 2 / 3); + // Now we have 1/3 of the ttl left, if the extend_lock worked, + // so locking should fail if we do a single lock (no retrying). + const auto lock_infoTime = redlock.lock(resource, random_string, ttl, 0); + if (lock_infoTime.locked) { + redlock.unlock(lock_infoTime); + REDIS_ASSERT(0, "redlock extend_lock failed to extend the timeout"); + } + else { + redlock.unlock(lock_info_ext); + } + } + else { + redlock.unlock(lock_info); + REDIS_ASSERT(0, "redlock extend_lock failed, although the lock exists"); + } + } + else { + REDIS_ASSERT(0, "unable to obtain a lock"); + } + } + + // Locking should fail, on duplicate instances. + { + // We now use the same instance twice, which is expected to fail on locking. + RedLockMutexVessel redlock_2_identical_instances({std::ref(_redis), std::ref(_redis)}); + const auto lock_info = redlock_2_identical_instances.lock(resource, random_string, ttl); + if (lock_info.locked) { + redlock_2_identical_instances.unlock(lock_info); + REDIS_ASSERT(0, "redlock managed to lock the same instance twice"); + } + else { + redlock_2_identical_instances.unlock(lock_info); + } + } + +} + +} // namespace test + +} // namespace redis + +} // namespace sw + +#endif // REDISPLUSPLUS_REDLOCK_CMDS_TEST_HPP diff --git a/test/src/sw/redis++/test_main.cpp b/test/src/sw/redis++/test_main.cpp index dd2880d1..b9b0c726 100644 --- a/test/src/sw/redis++/test_main.cpp +++ b/test/src/sw/redis++/test_main.cpp @@ -20,6 +20,7 @@ #include #include #include "sanity_test.h" +#include "redlock_test.h" #include "connection_cmds_test.h" #include "keys_cmds_test.h" #include "string_cmds_test.h" @@ -219,6 +220,11 @@ void run_test(const sw::redis::ConnectionOptions &opts) { std::cout << "Pass connection commands tests" << std::endl; + sw::redis::test::RedLockTest redlock_test(instance); + redlock_test.run(); + + std::cout << "Pass redlock tests" << std::endl; + sw::redis::test::KeysCmdTest keys_test(instance); keys_test.run();