From b365037e232d5bde78c5a24a68d5beae52a05bfa Mon Sep 17 00:00:00 2001 From: Matej Ferencevic <matej.ferencevic@memgraph.io> Date: Tue, 16 Jul 2019 10:32:06 +0200 Subject: [PATCH] Cleanup locks Summary: Move RWLock and replace exceptions with `CHECK`s Reviewers: mtomic, teon.banek Reviewed By: mtomic Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D2205 --- src/utils/CMakeLists.txt | 1 - src/utils/rw_lock.hpp | 124 +++++++++++++ src/utils/thread/sync.cpp | 164 ------------------ src/utils/thread/sync.hpp | 134 -------------- tests/concurrent/CMakeLists.txt | 3 - tests/concurrent/futex.cpp | 46 ----- .../clients/card_fraud_client.cpp | 4 +- tests/unit/utils_rwlock.cpp | 27 ++- 8 files changed, 138 insertions(+), 365 deletions(-) create mode 100644 src/utils/rw_lock.hpp delete mode 100644 src/utils/thread/sync.cpp delete mode 100644 tests/concurrent/futex.cpp diff --git a/src/utils/CMakeLists.txt b/src/utils/CMakeLists.txt index 72769eb83..ee8d2dac8 100644 --- a/src/utils/CMakeLists.txt +++ b/src/utils/CMakeLists.txt @@ -4,7 +4,6 @@ set(utils_src_files memory.cpp signals.cpp thread.cpp - thread/sync.cpp uuid.cpp watchdog.cpp) diff --git a/src/utils/rw_lock.hpp b/src/utils/rw_lock.hpp new file mode 100644 index 000000000..f8ba3e758 --- /dev/null +++ b/src/utils/rw_lock.hpp @@ -0,0 +1,124 @@ +/// @file +#pragma once + +#include <pthread.h> +#include <unistd.h> + +#include <cerrno> + +#include <glog/logging.h> + +namespace utils { + +/// A wrapper around `pthread_rwlock_t`, useful because it is not possible to +/// choose read or write priority for `std::shared_mutex`. +class RWLock { + public: + /// By passing the appropriate parameter to the `RWLock` constructor, it is + /// possible to control the behavior of `RWLock` while shared lock is held. If + /// the priority is set to `READ`, new shared (read) locks can be obtained + /// even though there is a thread waiting for an exclusive (write) lock, which + /// can lead to writer starvation. If the priority is set to `WRITE`, readers + /// will be blocked from obtaining new shared locks while there are writers + /// waiting, which can lead to reader starvation. + enum class Priority { READ, WRITE }; + + /// Construct a RWLock object with chosen priority. See comment above + /// `RWLockPriority` for details. + explicit RWLock(Priority priority) { + pthread_rwlockattr_t attr; + + CHECK(pthread_rwlockattr_init(&attr) == 0) + << "Couldn't initialize utils::RWLock!"; + + switch (priority) { + case Priority::READ: + pthread_rwlockattr_setkind_np(&attr, PTHREAD_RWLOCK_PREFER_READER_NP); + break; + case Priority::WRITE: + // There is also `PTHREAD_RWLOCK_PREFER_WRITER_NP` but it is not + // providing the desired behavior. + // + // From `man 7 pthread_rwlockattr_setkind_np`: + // "Setting the value read-write lock kind to + // PTHREAD_RWLOCK_PREFER_WRITER_NP results in the same behavior as + // setting the value to PTHREAD_RWLOCK_PREFER_READER_NP. As long as a + // reader thread holds the lock, the thread holding a write lock will be + // starved. Setting the lock kind to + // PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP allows writers to run, + // but, as the name implies a writer may not lock recursively." + // + // For this reason, `RWLock` should not be used recursively. + pthread_rwlockattr_setkind_np( + &attr, PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP); + break; + } + + CHECK(pthread_rwlock_init(&lock_, &attr) == 0) + << "Couldn't initialize utils::RWLock!"; + pthread_rwlockattr_destroy(&attr); + } + + RWLock(const RWLock &) = delete; + RWLock &operator=(const RWLock &) = delete; + RWLock(RWLock &&) = delete; + RWLock &operator=(RWLock &&) = delete; + + ~RWLock() { pthread_rwlock_destroy(&lock_); } + + void lock() { + CHECK(pthread_rwlock_wrlock(&lock_) == 0) << "Couldn't lock utils::RWLock!"; + } + + bool try_lock() { + int err = pthread_rwlock_trywrlock(&lock_); + if (err == 0) return true; + CHECK(err == EBUSY) << "Couldn't try lock utils::RWLock!"; + return false; + } + + void unlock() { + CHECK(pthread_rwlock_unlock(&lock_) == 0) + << "Couldn't unlock utils::RWLock!"; + } + + void lock_shared() { + int err; + while (true) { + err = pthread_rwlock_rdlock(&lock_); + if (err == 0) { + return; + } else if (err == EAGAIN) { + continue; + } else { + LOG(FATAL) << "Couldn't lock shared utils::RWLock!"; + } + } + } + + bool try_lock_shared() { + int err; + while (true) { + err = pthread_rwlock_tryrdlock(&lock_); + if (err == 0) { + return true; + } else if (err == EBUSY) { + return false; + } else if (err == EAGAIN) { + continue; + } else { + LOG(FATAL) << "Couldn't try lock shared utils::RWLock!"; + } + } + } + + void unlock_shared() { + CHECK(pthread_rwlock_unlock(&lock_) == 0) + << "Couldn't unlock shared utils::RWLock!"; + } + + private: + pthread_rwlock_t lock_ = PTHREAD_RWLOCK_INITIALIZER; +}; + +} // namespace utils diff --git a/src/utils/thread/sync.cpp b/src/utils/thread/sync.cpp deleted file mode 100644 index 0e7ab2bfd..000000000 --- a/src/utils/thread/sync.cpp +++ /dev/null @@ -1,164 +0,0 @@ -#include "utils/thread/sync.hpp" - -#include <linux/futex.h> -#include <sys/syscall.h> -#include <sys/types.h> -#include <unistd.h> - -namespace sys { -inline int futex(void *addr1, int op, int val1, const struct timespec *timeout, - void *addr2, int val3) { - return syscall(SYS_futex, addr1, op, val1, timeout, addr2, val3); -}; - -} // namespace sys - -namespace utils { - -RWLock::RWLock(RWLockPriority priority) { - int err; - pthread_rwlockattr_t attr; - - err = pthread_rwlockattr_init(&attr); - if (err != 0) { - throw std::system_error(err, std::system_category()); - } - - switch (priority) { - case RWLockPriority::READ: - pthread_rwlockattr_setkind_np(&attr, PTHREAD_RWLOCK_PREFER_READER_NP); - break; - case RWLockPriority::WRITE: - /* There is also `PTHREAD_RWLOCK_PREFER_WRITER_NP` but it is not - * providing the desired behavior. - * - * From `man 7 pthread_rwlockattr_setkind_np`: - * "Setting the value read-write lock kind to - * PTHREAD_RWLOCK_PREFER_WRITER_NP results in the same behavior as - * setting the value to PTHREAD_RWLOCK_PREFER_READER_NP. As long as a - * reader thread holds the lock, the thread holding a write lock will be - * starved. Setting the lock kind to - * PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP allows writers to run, - * but, as the name implies a writer may not lock recursively." - * - * For this reason, `RWLock` should not be used recursively. - * */ - pthread_rwlockattr_setkind_np( - &attr, PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP); - break; - } - - err = pthread_rwlock_init(&lock_, &attr); - pthread_rwlockattr_destroy(&attr); - - if (err != 0) { - throw std::system_error(err, std::system_category()); - } -} - -RWLock::~RWLock() { pthread_rwlock_destroy(&lock_); } - -void RWLock::lock() { - int err = pthread_rwlock_wrlock(&lock_); - if (err != 0) { - throw std::system_error(err, std::system_category()); - } -} - -bool RWLock::try_lock() { - int err = pthread_rwlock_trywrlock(&lock_); - if (err == 0) return true; - if (err == EBUSY) return false; - throw std::system_error(err, std::system_category()); -} - -void RWLock::unlock() { - int err = pthread_rwlock_unlock(&lock_); - if (err != 0) { - throw std::system_error(err, std::system_category()); - } -} - -void RWLock::lock_shared() { - int err = pthread_rwlock_rdlock(&lock_); - if (err != 0) { - throw std::system_error(err, std::system_category()); - } -} - -bool RWLock::try_lock_shared() { - int err = pthread_rwlock_tryrdlock(&lock_); - if (err == 0) return true; - if (err == EBUSY) return false; - throw std::system_error(err, std::system_category()); -} - -void RWLock::unlock_shared() { - int err = pthread_rwlock_unlock(&lock_); - if (err != 0) { - throw std::system_error(err, std::system_category()); - } -} - -void Futex::lock(const struct timespec *timeout) { - // try to fast lock a few times before going to sleep - for (size_t i = 0; i < LOCK_RETRIES; ++i) { - // try to lock and exit if we succeed - if (try_lock()) return; - - // we failed, chill a bit - relax(); - } - - // the lock is contended, go to sleep. when someone - // wakes you up, try taking the lock again - while (mutex.all.exchange(LOCKED_CONTENDED, std::memory_order_seq_cst) & - LOCKED) { - // wait in the kernel for someone to wake us up when unlocking - auto status = futex_wait(LOCKED_CONTENDED, timeout); - - // check if we woke up because of a timeout - if (status == -1 && errno == ETIMEDOUT) - throw LockTimeoutException("Lock timeout"); - } -} - -void Futex::unlock() { - futex_t state = LOCKED; - - // if we're locked and uncontended, try to unlock the mutex before - // it becomes contended - if (mutex.all.load(std::memory_order_seq_cst) == LOCKED && - mutex.all.compare_exchange_strong(state, UNLOCKED, - std::memory_order_seq_cst, - std::memory_order_seq_cst)) - return; - - // we are contended, just release the lock - mutex.state.locked.store(UNLOCKED, std::memory_order_seq_cst); - - // spin and hope someone takes a lock so we don't have to wake up - // anyone because that's quite expensive - for (size_t i = 0; i < UNLOCK_RETRIES; ++i) { - // if someone took the lock, we're ok - if (is_locked(std::memory_order_seq_cst)) return; - - relax(); - } - - // store that we are becoming uncontended - mutex.state.contended.store(UNCONTENDED, std::memory_order_seq_cst); - - // we need to wake someone up - futex_wake(LOCKED); -} - -int Futex::futex_wait(int value, const struct timespec *timeout) { - return sys::futex(&mutex.all, FUTEX_WAIT_PRIVATE, value, timeout, nullptr, 0); -} - -void Futex::futex_wake(int value) { - sys::futex(&mutex.all, FUTEX_WAKE_PRIVATE, value, nullptr, nullptr, 0); -} - -} // namespace utils diff --git a/src/utils/thread/sync.hpp b/src/utils/thread/sync.hpp index 03496bd32..c5b8de8fc 100644 --- a/src/utils/thread/sync.hpp +++ b/src/utils/thread/sync.hpp @@ -1,11 +1,6 @@ /// @file #pragma once -#include <pthread.h> -#include <unistd.h> - -#include <atomic> -#include <cstdint> #include <mutex> #include "utils/exceptions.hpp" @@ -27,61 +22,6 @@ class LockTimeoutException : public BasicException { using BasicException::BasicException; }; -class CasLock { - public: - void lock() { - bool locked = false; - - while (!lock_flag.compare_exchange_weak( - locked, true, std::memory_order_release, std::memory_order_relaxed)) { - usleep(250); - } - } - - void unlock() { lock_flag.store(0, std::memory_order_release); } - - bool locked() { return lock_flag.load(std::memory_order_relaxed); } - - private: - std::atomic<bool> lock_flag; -}; - -/// By passing the appropriate parameter to the `RWLock` constructor, it is -/// possible to control the behavior of `RWLock` while shared lock is held. If -/// the priority is set to `READ`, new shared (read) locks can be obtained even -/// though there is a thread waiting for an exclusive (write) lock, which can -/// lead to writer starvation. If the priority is set to `WRITE`, readers will -/// be blocked from obtaining new shared locks while there are writers waiting, -/// which can lead to reader starvation. -enum RWLockPriority { READ, WRITE }; - -/// A wrapper around `pthread_rwlock_t`, useful because it is not possible to -/// choose read or write priority for `std::shared_mutex`. -class RWLock { - public: - RWLock(const RWLock &) = delete; - RWLock &operator=(const RWLock &) = delete; - RWLock(RWLock &&) = delete; - RWLock &operator=(RWLock &&) = delete; - - /// Construct a RWLock object with chosen priority. See comment above - /// `RWLockPriority` for details. - explicit RWLock(RWLockPriority priority); - - ~RWLock(); - - bool try_lock(); - void lock(); - void unlock(); - - bool try_lock_shared(); - void lock_shared(); - void unlock_shared(); - - private: - pthread_rwlock_t lock_ = PTHREAD_RWLOCK_INITIALIZER; -}; - /// Lockable is used as an custom implementation of a mutex mechanism. /// /// It is implemented as a wrapper around std::lock_guard and std::unique_guard @@ -104,78 +44,4 @@ class Lockable { mutable lock_t lock; }; -class Futex { - using futex_t = uint32_t; - using flag_t = uint8_t; - - /// Data structure for implementing fast mutexes - /// - /// This structure is 4B wide, as required for futex system call where - /// the last two bytes are used for two flags - contended and locked, - /// respectively. Memory layout for the structure looks like this: - /// - /// all - /// |---------------------------------| - /// 00000000 00000000 0000000C 0000000L - /// |------| |------| - /// contended locked - /// - /// L marks the locked bit - /// C marks the contended bit - union mutex_t { - std::atomic<futex_t> all{0}; - - struct { - std::atomic<flag_t> locked; - std::atomic<flag_t> contended; - } state; - }; - - enum Contention : futex_t { UNCONTENDED = 0x0000, CONTENDED = 0x0100 }; - - enum State : futex_t { - UNLOCKED = 0x0000, - LOCKED = 0x0001, - UNLOCKED_CONTENDED = UNLOCKED | CONTENDED, // 0x0100 - LOCKED_CONTENDED = LOCKED | CONTENDED // 0x0101 - }; - - static constexpr size_t LOCK_RETRIES = 100; - static constexpr size_t UNLOCK_RETRIES = 200; - - public: - Futex() { - static_assert(sizeof(mutex_t) == sizeof(futex_t), - "Atomic futex should be the same size as non_atomic"); - } - - bool try_lock() { - // we took the lock if we stored the LOCKED state and previous - // state was UNLOCKED - return mutex.state.locked.exchange(LOCKED, std::memory_order_seq_cst) == - UNLOCKED; - } - - void lock(const struct timespec *timeout = nullptr); - - void unlock(); - - bool is_locked(std::memory_order order = std::memory_order_seq_cst) const { - return mutex.state.locked.load(order); - } - - bool is_contended(std::memory_order order = std::memory_order_seq_cst) const { - return mutex.state.contended.load(order); - } - - private: - mutex_t mutex; - - int futex_wait(int value, const struct timespec *timeout = nullptr); - - void futex_wake(int value); - - void relax() { CpuRelax(); } -}; - } // namespace utils diff --git a/tests/concurrent/CMakeLists.txt b/tests/concurrent/CMakeLists.txt index 4cc5b5ddd..403f9e5eb 100644 --- a/tests/concurrent/CMakeLists.txt +++ b/tests/concurrent/CMakeLists.txt @@ -29,9 +29,6 @@ target_link_libraries(${test_prefix}dynamic_bitset_set mg-single-node kvstore_du add_concurrent_test(dynamic_bitset_set_n.cpp) target_link_libraries(${test_prefix}dynamic_bitset_set_n mg-single-node kvstore_dummy_lib) -add_concurrent_test(futex.cpp) -target_link_libraries(${test_prefix}futex mg-single-node kvstore_dummy_lib) - add_concurrent_test(network_read_hang.cpp) target_link_libraries(${test_prefix}network_read_hang mg-single-node kvstore_dummy_lib) diff --git a/tests/concurrent/futex.cpp b/tests/concurrent/futex.cpp deleted file mode 100644 index e44a453b7..000000000 --- a/tests/concurrent/futex.cpp +++ /dev/null @@ -1,46 +0,0 @@ -#include <chrono> -#include <mutex> -#include <random> -#include <thread> - -#include <glog/logging.h> - -#include "utils/thread/sync.hpp" - -utils::Futex futex; -int x = 0; - -/** - * @param thread id - */ -void test_lock(int) { - std::random_device rd; - std::mt19937 gen(rd()); - std::uniform_int_distribution<> dis(0, 1000); - - // TODO: create long running test - for (int i = 0; i < 5; ++i) { - { - std::unique_lock<utils::Futex> guard(futex); - x++; - std::this_thread::sleep_for(std::chrono::milliseconds(dis(gen))); - CHECK(x == 1) << "Other thread shouldn't be able to " - "change the value of x"; - x--; - } - std::this_thread::sleep_for(std::chrono::milliseconds(dis(gen))); - } -} - -int main(void) { - constexpr int N = 16; - std::vector<std::thread> threads; - - for (int i = 0; i < N; ++i) threads.push_back(std::thread(test_lock, i)); - - for (auto& thread : threads) { - thread.join(); - } - - return 0; -} diff --git a/tests/macro_benchmark/clients/card_fraud_client.cpp b/tests/macro_benchmark/clients/card_fraud_client.cpp index 7559b700e..2e4e2b2b6 100644 --- a/tests/macro_benchmark/clients/card_fraud_client.cpp +++ b/tests/macro_benchmark/clients/card_fraud_client.cpp @@ -8,7 +8,7 @@ #include "communication/rpc/client.hpp" #include "stats/stats.hpp" #include "stats/stats_rpc_messages.hpp" -#include "utils/thread/sync.hpp" +#include "utils/rw_lock.hpp" #include "long_running_common.hpp" @@ -17,7 +17,7 @@ std::atomic<int64_t> num_cards; std::atomic<int64_t> num_transactions; std::atomic<int64_t> max_tx_id; -utils::RWLock world_lock(utils::RWLockPriority::WRITE); +utils::RWLock world_lock(utils::RWLock::Priority::WRITE); DEFINE_string(config, "", "test config"); diff --git a/tests/unit/utils_rwlock.cpp b/tests/unit/utils_rwlock.cpp index a45b56b74..57d854c7b 100644 --- a/tests/unit/utils_rwlock.cpp +++ b/tests/unit/utils_rwlock.cpp @@ -4,22 +4,19 @@ #include "glog/logging.h" #include "gtest/gtest.h" -#include "utils/thread/sync.hpp" +#include "utils/rw_lock.hpp" #include "utils/timer.hpp" using namespace std::chrono_literals; -using utils::RWLock; -using utils::RWLockPriority; - TEST(RWLock, MultipleReaders) { - RWLock rwlock(RWLockPriority::READ); + utils::RWLock rwlock(utils::RWLock::Priority::READ); std::vector<std::thread> threads; utils::Timer timer; for (int i = 0; i < 3; ++i) { threads.push_back(std::thread([&rwlock] { - std::shared_lock<RWLock> lock(rwlock); + std::shared_lock<utils::RWLock> lock(rwlock); std::this_thread::sleep_for(100ms); })); } @@ -33,13 +30,13 @@ TEST(RWLock, MultipleReaders) { } TEST(RWLock, SingleWriter) { - RWLock rwlock(RWLockPriority::READ); + utils::RWLock rwlock(utils::RWLock::Priority::READ); std::vector<std::thread> threads; utils::Timer timer; for (int i = 0; i < 3; ++i) { threads.push_back(std::thread([&rwlock] { - std::unique_lock<RWLock> lock(rwlock); + std::unique_lock<utils::RWLock> lock(rwlock); std::this_thread::sleep_for(100ms); })); } @@ -59,19 +56,19 @@ TEST(RWLock, ReadPriority) { * - Thread 2 successfuly acquires a shared lock at T = 60ms, even though * there's a writer waiting. */ - RWLock rwlock(RWLockPriority::READ); + utils::RWLock rwlock(utils::RWLock::Priority::READ); rwlock.lock_shared(); bool first = true; std::thread t1([&rwlock, &first] { std::this_thread::sleep_for(30ms); - std::unique_lock<RWLock> lock(rwlock); + std::unique_lock<utils::RWLock> lock(rwlock); EXPECT_FALSE(first); }); std::thread t2([&rwlock, &first] { std::this_thread::sleep_for(60ms); - std::shared_lock<RWLock> lock(rwlock); + std::shared_lock<utils::RWLock> lock(rwlock); EXPECT_TRUE(first); first = false; }); @@ -89,20 +86,20 @@ TEST(RWLock, WritePriority) { * - Thread 2 tries to acquire a shared lock at T = 60ms, but it is not able * to because of write priority. */ - RWLock rwlock(RWLockPriority::WRITE); + utils::RWLock rwlock(utils::RWLock::Priority::WRITE); rwlock.lock_shared(); bool first = true; std::thread t1([&rwlock, &first] { std::this_thread::sleep_for(30ms); - std::unique_lock<RWLock> lock(rwlock); + std::unique_lock<utils::RWLock> lock(rwlock); EXPECT_TRUE(first); first = false; }); std::thread t2([&rwlock, &first] { std::this_thread::sleep_for(60ms); - std::shared_lock<RWLock> lock(rwlock); + std::shared_lock<utils::RWLock> lock(rwlock); EXPECT_FALSE(first); }); @@ -114,7 +111,7 @@ TEST(RWLock, WritePriority) { } TEST(RWLock, TryLock) { - RWLock rwlock(RWLockPriority::WRITE); + utils::RWLock rwlock(utils::RWLock::Priority::WRITE); rwlock.lock(); std::thread t1([&rwlock] { EXPECT_FALSE(rwlock.try_lock()); });