diff --git a/examples/futex.cpp b/examples/futex.cpp new file mode 100644 index 000000000..af0bc3371 --- /dev/null +++ b/examples/futex.cpp @@ -0,0 +1,58 @@ +#include <iostream> +#include <thread> +#include <chrono> +#include <mutex> +#include <vector> +#include <cassert> +#include <random> + +#include "threading/sync/futex.hpp" +#include "debug/log.hpp" + +Futex futex; +int x = 0; + +void test_lock(int id) +{ + std::random_device rd; + std::mt19937 gen(rd()); + std::uniform_int_distribution<> dis(0, 1000); + + for(int i = 0; i < 100000; ++i) + { + // uncomment sleeps and LOG_DEBUGs to test high contention + + LOG_DEBUG("Acquiring Futex (" << id << ")"); + + { + std::unique_lock<Futex> guard(futex); + x++; + + std::this_thread::sleep_for(std::chrono::milliseconds(dis(gen))); + + LOG_DEBUG("Critical section no. " << i << " (" << id << ")"); + assert(x == 1); + + x--; + } + + LOG_DEBUG("Non Critical section... (" << id << ")"); + std::this_thread::sleep_for(std::chrono::milliseconds(dis(gen))); + } +} + +int main(void) +{ + constexpr int N = 128; + + std::vector<std::thread> threads; + + for(int i = 0; i < N; ++i) + threads.push_back(std::thread(test_lock, i)); + + for(auto& thread : threads){ + thread.join(); + } + + return 0; +} diff --git a/threading/sync/futex.hpp b/threading/sync/futex.hpp new file mode 100644 index 000000000..6c208c021 --- /dev/null +++ b/threading/sync/futex.hpp @@ -0,0 +1,125 @@ +#pragma once + +#include <stdint.h> +#include <errno.h> +#include <atomic> + +#include "lock_timeout_error.hpp" +#include "utils/cpu_relax.hpp" +#include "utils/sys.hpp" + +class Futex +{ + using futex_t = int32_t; + + union mutex_t + { + std::atomic<futex_t> all {0}; + + struct + { + std::atomic<uint8_t> locked; + std::atomic<uint8_t> contended; + } state; + }; + + enum Contension : futex_t + { + UNCONTENDED = 0x0000, + CONTENDED = 0x0100 + }; + + enum State : futex_t + { + UNLOCKED = 0x0000, + LOCKED = 0x0001, + UNLOCKED_CONTENDED = UNLOCKED | CONTENDED, + LOCKED_CONTENDED = LOCKED | CONTENDED + }; + + static constexpr size_t LOCK_RETRIES = 256; + static constexpr size_t UNLOCK_RETRIES = 512; + +public: + Futex() + { + static_assert(sizeof(mutex_t) == sizeof(futex_t), + "Atomic futex should be the same size as non_atomic"); + } + + bool try_lock() + { + return mutex.state.locked.exchange(LOCKED, std::memory_order_acquire) + == UNLOCKED; + } + + void lock(const struct timespec* timeout = nullptr) + { + // try to fast lock a few times before going to sleep + for(size_t i = 0; i < LOCK_RETRIES; ++i) + { + if(try_lock()) + return; + + // we failed, chill a bit + cpu_relax(); + } + + // the lock is contended, go to sleep. when someone + // wakes you up, try taking the lock again + while(mutex.all.exchange(LOCKED_CONTENDED, std::memory_order_acquire) + & LOCKED) + { + auto status = futex_wait(LOCKED_CONTENDED, timeout); + + // check if we woke up because of a timeout + if(status == -1 && errno == ETIMEDOUT) + throw LockTimeoutError("Lock timeout"); + } + } + + void unlock() + { + futex_t state = LOCKED; + + // if we're locked and uncontended, try to unlock the mutex before + // it becomes contended + if(mutex.all.load(std::memory_order_acquire) == LOCKED && + mutex.all.compare_exchange_strong(state, UNLOCKED, + std::memory_order_release, + std::memory_order_relaxed)) + return; + + // we are contended, just release the lock + mutex.state.locked.store(UNLOCKED, std::memory_order_seq_cst); + + // spin and hope someone takes a lock so we don't have to wake up + // anyone because that's quite expensive + for(size_t i = 0; i < UNLOCK_RETRIES; ++i) + { + if(mutex.state.locked.load(std::memory_order_acquire) & LOCKED) + return; + + cpu_relax(); + } + + // we need to wake someone up + mutex.state.contended.store(UNCONTENDED, std::memory_order_release); + futex_wake(LOCKED); + } + +private: + mutex_t mutex; + + int futex_wait(int value, const struct timespec* timeout = nullptr) + { + return sys::futex(&mutex.all, FUTEX_WAIT_PRIVATE, value, + timeout, nullptr, 0); + } + + void futex_wake(int value) + { + sys::futex(&mutex.all, FUTEX_WAKE_PRIVATE, value, nullptr, nullptr, 0); + } +}; + diff --git a/threading/sync/lock_timeout_error.hpp b/threading/sync/lock_timeout_error.hpp new file mode 100644 index 000000000..9bb3fb78a --- /dev/null +++ b/threading/sync/lock_timeout_error.hpp @@ -0,0 +1,9 @@ +#pragma once + +#include <stdexcept> + +class LockTimeoutError : public std::runtime_error +{ +public: + using runtime_error::runtime_error; +}; diff --git a/utils/cpu_relax.hpp b/utils/cpu_relax.hpp new file mode 100644 index 000000000..40cb4497a --- /dev/null +++ b/utils/cpu_relax.hpp @@ -0,0 +1,9 @@ +#pragma once + +/* @brief improves contention in spinlocks + * hints the processor that we're in a spinlock and not doing much + */ +inline void cpu_relax() +{ + asm("PAUSE"); +} diff --git a/utils/sys.hpp b/utils/sys.hpp new file mode 100644 index 000000000..8e4ff35a2 --- /dev/null +++ b/utils/sys.hpp @@ -0,0 +1,17 @@ +#pragma once + +#include <sys/syscall.h> +#include <linux/futex.h> +#include <unistd.h> +#include <sys/time.h> + +namespace sys +{ + +inline int futex(void* addr1, int op, int val1, const struct timespec* timeout, + void* addr2, int val3) +{ + return syscall(SYS_futex, addr1, op, val1, timeout, addr2, val3); +} + +}