Extract io/network into mg-io library

Reviewers: buda, dgleich, mferencevic

Reviewed By: mferencevic

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1411
This commit is contained in:
Teon Banek 2018-05-30 13:00:25 +02:00
parent 5b7947cc18
commit c7b6cae526
63 changed files with 590 additions and 625 deletions

View File

@ -2,6 +2,7 @@
# add memgraph sub libraries
add_subdirectory(utils)
add_subdirectory(io)
# all memgraph src files
set(memgraph_src_files
@ -40,10 +41,6 @@ set(memgraph_src_files
durability/recovery.cpp
durability/snapshooter.cpp
durability/wal.cpp
io/network/addrinfo.cpp
io/network/endpoint.cpp
io/network/socket.cpp
io/network/utils.cpp
query/common.cpp
query/repl.cpp
query/frontend/ast/ast.cpp
@ -70,8 +67,6 @@ set(memgraph_src_files
storage/property_value_store.cpp
storage/record_accessor.cpp
storage/vertex_accessor.cpp
threading/sync/rwlock.cpp
threading/thread.cpp
transactions/engine_master.cpp
transactions/engine_single_node.cpp
transactions/engine_worker.cpp
@ -132,7 +127,8 @@ string(TOLOWER ${CMAKE_BUILD_TYPE} lower_build_type)
set(MEMGRAPH_ALL_LIBS stdc++fs Threads::Threads fmt cppitertools
antlr_opencypher_parser_lib dl glog gflags capnp kj
${Boost_IOSTREAMS_LIBRARY_RELEASE}
${Boost_SERIALIZATION_LIBRARY_RELEASE})
${Boost_SERIALIZATION_LIBRARY_RELEASE}
mg-utils mg-io)
if (USE_LTALLOC)
list(APPEND MEMGRAPH_ALL_LIBS ltalloc)
@ -146,7 +142,7 @@ endif()
# STATIC library used by memgraph executables
add_library(memgraph_lib STATIC ${memgraph_src_files})
target_link_libraries(memgraph_lib ${MEMGRAPH_ALL_LIBS} mg-utils)
target_link_libraries(memgraph_lib ${MEMGRAPH_ALL_LIBS})
add_dependencies(memgraph_lib generate_opencypher_parser)
add_dependencies(memgraph_lib generate_lcp)
add_dependencies(memgraph_lib generate_capnp)

View File

@ -17,7 +17,6 @@
#include "communication/buffer.hpp"
#include "database/graph_db.hpp"
#include "query/interpreter.hpp"
#include "threading/sync/spinlock.hpp"
#include "transactions/transaction.hpp"
#include "utils/exceptions.hpp"

View File

@ -13,8 +13,8 @@
#include "communication/session.hpp"
#include "io/network/epoll.hpp"
#include "io/network/socket.hpp"
#include "threading/sync/spinlock.hpp"
#include "utils/thread.hpp"
#include "utils/thread/sync.hpp"
namespace communication {
@ -50,7 +50,7 @@ class Listener {
utils::ThreadSetName(fmt::format("{} timeout", service_name));
while (alive_) {
{
std::unique_lock<SpinLock> guard(lock_);
std::unique_lock<utils::SpinLock> guard(lock_);
for (auto &session : sessions_) {
if (session->TimedOut()) {
LOG(WARNING) << service_name << " session associated with "
@ -86,7 +86,7 @@ class Listener {
* @param connection socket which should be added to the event pool
*/
void AddConnection(io::network::Socket &&connection) {
std::unique_lock<SpinLock> guard(lock_);
std::unique_lock<utils::SpinLock> guard(lock_);
// Set connection options.
// The socket is left to be a blocking socket, but when `Read` is called
@ -202,7 +202,7 @@ class Listener {
// https://idea.popcount.org/2017-03-20-epoll-is-fundamentally-broken-22/
epoll_.Delete(session.socket().fd());
std::unique_lock<SpinLock> guard(lock_);
std::unique_lock<utils::SpinLock> guard(lock_);
auto it = std::find_if(sessions_.begin(), sessions_.end(),
[&](const auto &l) { return l.get() == &session; });
@ -220,7 +220,7 @@ class Listener {
TSessionData &data_;
SpinLock lock_;
utils::SpinLock lock_;
std::vector<std::unique_ptr<SessionHandler>> sessions_;
std::thread thread_;

View File

@ -12,8 +12,8 @@
#include "communication/buffer.hpp"
#include "io/network/socket.hpp"
#include "io/network/stream_buffer.hpp"
#include "threading/sync/spinlock.hpp"
#include "utils/exceptions.hpp"
#include "utils/thread/sync.hpp"
namespace communication {
@ -126,7 +126,7 @@ class Session {
* different threads in the network stack.
*/
bool TimedOut() {
std::unique_lock<SpinLock> guard(lock_);
std::unique_lock<utils::SpinLock> guard(lock_);
return last_event_time_ + std::chrono::seconds(inactivity_timeout_sec_) <
std::chrono::steady_clock::now();
}
@ -138,7 +138,7 @@ class Session {
private:
void RefreshLastEventTime() {
std::unique_lock<SpinLock> guard(lock_);
std::unique_lock<utils::SpinLock> guard(lock_);
last_event_time_ = std::chrono::steady_clock::now();
}
@ -155,7 +155,7 @@ class Session {
// Time of the last event and associated lock.
std::chrono::time_point<std::chrono::steady_clock> last_event_time_{
std::chrono::steady_clock::now()};
SpinLock lock_;
utils::SpinLock lock_;
const int inactivity_timeout_sec_;
};
} // namespace communication

View File

@ -3,8 +3,6 @@
#include <atomic>
#include "glog/logging.h"
#include "threading/sync/lockable.hpp"
#include "threading/sync/spinlock.hpp"
/**
* A sequentially ordered non-unique lock-free concurrent collection of bits.

View File

@ -6,14 +6,12 @@
#include <type_traits>
#include "glog/logging.h"
#include "data_structures/concurrent/skiplist_gc.hpp"
#include "utils/crtp.hpp"
#include "utils/placeholder.hpp"
#include "utils/random/fast_binomial.hpp"
#include "threading/sync/lockable.hpp"
#include "threading/sync/spinlock.hpp"
#include "data_structures/concurrent/skiplist_gc.hpp"
#include "utils/thread/sync.hpp"
/**
* computes the height for the new node from the interval [1...H]
@ -102,8 +100,8 @@ static thread_local utils::random::FastBinomial<> rnd;
* @tparam lock_t Lock type used when locking is needed during the creation
* and deletion of nodes.
*/
template <class T, size_t H = 32, class lock_t = SpinLock>
class SkipList : private Lockable<lock_t> {
template <class T, size_t H = 32, class lock_t = utils::SpinLock>
class SkipList : private utils::Lockable<lock_t> {
public:
/** @brief Wrapper class for flags used in the implementation
*
@ -134,7 +132,7 @@ class SkipList : private Lockable<lock_t> {
std::atomic<uint8_t> flags{0};
};
class Node : Lockable<lock_t> {
class Node : utils::Lockable<lock_t> {
public:
friend class SkipList;

View File

@ -12,8 +12,8 @@
#include "data_structures/concurrent/push_queue.hpp"
#include "threading/sync/spinlock.hpp"
#include "utils/executor.hpp"
#include "utils/thread/sync.hpp"
DECLARE_int32(skiplist_gc_interval);

View File

@ -9,7 +9,7 @@
#include "glog/logging.h"
#include "threading/sync/spinlock.hpp"
#include "utils/thread/sync.hpp"
/**
* A thread-safe ring buffer. Multi-producer, multi-consumer. Producers get
@ -43,7 +43,7 @@ class RingBuffer {
void emplace(TArgs &&... args) {
while (true) {
{
std::lock_guard<SpinLock> guard(lock_);
std::lock_guard<utils::SpinLock> guard(lock_);
if (size_ < capacity_) {
buffer_[write_pos_++] = TElement(std::forward<TArgs>(args)...);
write_pos_ %= capacity_;
@ -64,7 +64,7 @@ class RingBuffer {
* empty, nullopt is returned.
*/
std::experimental::optional<TElement> pop() {
std::lock_guard<SpinLock> guard(lock_);
std::lock_guard<utils::SpinLock> guard(lock_);
if (size_ == 0) return std::experimental::nullopt;
size_--;
std::experimental::optional<TElement> result(
@ -75,7 +75,7 @@ class RingBuffer {
/** Removes all elements from the buffer. */
void clear() {
std::lock_guard<SpinLock> guard(lock_);
std::lock_guard<utils::SpinLock> guard(lock_);
read_pos_ = 0;
write_pos_ = 0;
size_ = 0;
@ -84,7 +84,7 @@ class RingBuffer {
private:
int capacity_;
TElement *buffer_;
SpinLock lock_;
utils::SpinLock lock_;
int read_pos_{0};
int write_pos_{0};
int size_{0};

View File

@ -14,6 +14,7 @@
#include "storage/vertex.hpp"
#include "transactions/engine.hpp"
#include "utils/scheduler.hpp"
#include "utils/timer.hpp"
namespace database {

View File

@ -72,7 +72,7 @@ ProduceRpcServer::OngoingProduce::PullOneFromCursor() {
}
} catch (const mvcc::SerializationError &) {
cursor_state_ = PullState::SERIALIZATION_ERROR;
} catch (const LockTimeoutException &) {
} catch (const utils::LockTimeoutException &) {
cursor_state_ = PullState::LOCK_TIMEOUT_ERROR;
} catch (const RecordDeletedError &) {
cursor_state_ = PullState::UPDATE_DELETED_ERROR;

View File

@ -11,9 +11,8 @@
#include "distributed/transactional_cache_cleaner_rpc_messages.hpp"
#include "storage/types.hpp"
#include "transactions/transaction.hpp"
#include "threading/thread_pool.hpp"
#include "utils/future.hpp"
#include "utils/thread.hpp"
namespace distributed {
@ -76,7 +75,7 @@ class RpcWorkerClients {
Coordination &coordination_;
std::unordered_map<int, communication::rpc::ClientPool> client_pools_;
std::mutex lock_;
threading::ThreadPool thread_pool_;
utils::ThreadPool thread_pool_;
};
/** Wrapper class around a RPC call to build indices.

View File

@ -4,6 +4,7 @@
#include "distributed/updates_rpc_clients.hpp"
#include "query/exceptions.hpp"
#include "utils/thread/sync.hpp"
namespace distributed {
@ -15,7 +16,7 @@ void RaiseIfRemoteError(UpdateResult result) {
case UpdateResult::SERIALIZATION_ERROR:
throw mvcc::SerializationError();
case UpdateResult::LOCK_TIMEOUT_ERROR:
throw LockTimeoutException(
throw utils::LockTimeoutException(
"Remote LockTimeoutError during edge creation");
case UpdateResult::UPDATE_DELETED_ERROR:
throw RecordDeletedError();

View File

@ -3,7 +3,7 @@
#include "glog/logging.h"
#include "distributed/updates_rpc_server.hpp"
#include "threading/sync/lock_timeout_exception.hpp"
#include "utils/thread/sync.hpp"
namespace distributed {
@ -13,7 +13,7 @@ UpdateResult UpdatesRpcServer::TransactionUpdates<TRecordAccessor>::Emplace(
auto gid = std::is_same<TRecordAccessor, VertexAccessor>::value
? delta.vertex_id
: delta.edge_id;
std::lock_guard<SpinLock> guard{lock_};
std::lock_guard<utils::SpinLock> guard{lock_};
auto found = deltas_.find(gid);
if (found == deltas_.end()) {
found =
@ -52,7 +52,7 @@ UpdateResult UpdatesRpcServer::TransactionUpdates<TRecordAccessor>::Emplace(
// return UpdateResult::SERIALIZATION_ERROR;
// } catch (const RecordDeletedError &) {
// return UpdateResult::UPDATE_DELETED_ERROR;
// } catch (const LockTimeoutException &) {
// } catch (const utils::LockTimeoutException &) {
// return UpdateResult::LOCK_TIMEOUT_ERROR;
// }
return UpdateResult::DONE;
@ -66,7 +66,7 @@ gid::Gid UpdatesRpcServer::TransactionUpdates<TRecordAccessor>::CreateVertex(
auto result = db_accessor_.InsertVertex();
for (auto &label : labels) result.add_label(label);
for (auto &kv : properties) result.PropsSet(kv.first, kv.second);
std::lock_guard<SpinLock> guard{lock_};
std::lock_guard<utils::SpinLock> guard{lock_};
deltas_.emplace(result.gid(),
std::make_pair(result, std::vector<database::StateDelta>{}));
return result.gid();
@ -80,7 +80,7 @@ gid::Gid UpdatesRpcServer::TransactionUpdates<TRecordAccessor>::CreateEdge(
storage::VertexAddress(from, db.WorkerId()));
auto to_addr = db.storage().LocalizedAddressIfPossible(to);
auto edge = db_accessor_.InsertOnlyEdge(from_addr, to_addr, edge_type);
std::lock_guard<SpinLock> guard{lock_};
std::lock_guard<utils::SpinLock> guard{lock_};
deltas_.emplace(edge.gid(),
std::make_pair(edge, std::vector<database::StateDelta>{}));
return edge.gid();
@ -88,7 +88,7 @@ gid::Gid UpdatesRpcServer::TransactionUpdates<TRecordAccessor>::CreateEdge(
template <typename TRecordAccessor>
UpdateResult UpdatesRpcServer::TransactionUpdates<TRecordAccessor>::Apply() {
std::lock_guard<SpinLock> guard{lock_};
std::lock_guard<utils::SpinLock> guard{lock_};
for (auto &kv : deltas_) {
auto &record_accessor = kv.second.first;
// We need to reconstruct the record as in the meantime some local
@ -164,7 +164,7 @@ UpdateResult UpdatesRpcServer::TransactionUpdates<TRecordAccessor>::Apply() {
return UpdateResult::SERIALIZATION_ERROR;
} catch (const RecordDeletedError &) {
return UpdateResult::UPDATE_DELETED_ERROR;
} catch (const LockTimeoutException &) {
} catch (const utils::LockTimeoutException &) {
return UpdateResult::LOCK_TIMEOUT_ERROR;
}
}

View File

@ -16,8 +16,8 @@
#include "storage/gid.hpp"
#include "storage/types.hpp"
#include "storage/vertex_accessor.hpp"
#include "threading/sync/spinlock.hpp"
#include "transactions/type.hpp"
#include "utils/thread/sync.hpp"
namespace distributed {
@ -62,7 +62,7 @@ class UpdatesRpcServer {
gid::Gid, std::pair<TRecordAccessor, std::vector<database::StateDelta>>>
deltas_;
// Multiple workers might be sending remote updates concurrently.
SpinLock lock_;
utils::SpinLock lock_;
// Helper method specialized for [Vertex|Edge]Accessor.
TRecordAccessor FindAccessor(gid::Gid gid);

10
src/io/CMakeLists.txt Normal file
View File

@ -0,0 +1,10 @@
set(io_src_files
network/addrinfo.cpp
network/endpoint.cpp
network/socket.cpp
network/utils.cpp)
add_library(mg-io STATIC ${io_src_files})
target_link_libraries(mg-io stdc++fs Threads::Threads fmt glog mg-utils)
# TODO: Remove this dependency when we switch to capnp
target_link_libraries(mg-io ${Boost_SERIALIZATION_LIBRARY_RELEASE})

View File

@ -19,7 +19,6 @@
#include "glog/logging.h"
#include "io/network/addrinfo.hpp"
#include "threading/sync/cpu_relax.hpp"
#include "utils/likely.hpp"
namespace io::network {

View File

@ -2,7 +2,6 @@
#include "storage/gid.hpp"
#include "storage/locking/record_lock.hpp"
#include "threading/sync/lockable.hpp"
#include "transactions/transaction.hpp"
#include "utils/exceptions.hpp"

View File

@ -162,7 +162,7 @@ AstTreeStorage Interpreter::QueryToAst(const StrippedQuery &stripped,
// stripped query -> AST
auto parser = [&] {
// Be careful about unlocking since parser can throw.
std::unique_lock<SpinLock> guard(antlr_lock_);
std::unique_lock<utils::SpinLock> guard(antlr_lock_);
return std::make_unique<frontend::opencypher::Parser>(
stripped.original_query());
}();
@ -178,7 +178,7 @@ AstTreeStorage Interpreter::QueryToAst(const StrippedQuery &stripped,
// stripped query -> AST
auto parser = [&] {
// Be careful about unlocking since parser can throw.
std::unique_lock<SpinLock> guard(antlr_lock_);
std::unique_lock<utils::SpinLock> guard(antlr_lock_);
try {
return std::make_unique<frontend::opencypher::Parser>(stripped.query());
} catch (const SyntaxException &e) {

View File

@ -11,7 +11,7 @@
#include "query/interpret/frame.hpp"
#include "query/plan/distributed.hpp"
#include "query/plan/operator.hpp"
#include "threading/sync/spinlock.hpp"
#include "utils/thread/sync.hpp"
#include "utils/timer.hpp"
DECLARE_int32(query_plan_cache_ttl);
@ -173,7 +173,7 @@ class Interpreter {
// can remove this lock. This will probably never happen since antlr
// developers introduce more bugs in each version. Fortunately, we have cache
// so this lock probably won't impact performance much...
SpinLock antlr_lock_;
utils::SpinLock antlr_lock_;
// Optional, not null only in a distributed master.
distributed::PlanDispatcher *plan_dispatcher_{nullptr};

View File

@ -30,6 +30,7 @@
#include "utils/algorithm.hpp"
#include "utils/exceptions.hpp"
#include "utils/hashing/fnv.hpp"
#include "utils/thread/sync.hpp"
DEFINE_HIDDEN_int32(remote_pull_sleep_micros, 10,
"Sleep between remote result pulling in microseconds");
@ -3266,7 +3267,7 @@ class RemotePuller {
throw mvcc::SerializationError(
"Serialization error occured during PullRemote !");
case distributed::PullState::LOCK_TIMEOUT_ERROR:
throw LockTimeoutException(
throw utils::LockTimeoutException(
"LockTimeout error occured during PullRemote !");
case distributed::PullState::UPDATE_DELETED_ERROR:
throw QueryRuntimeException(
@ -3528,7 +3529,7 @@ class SynchronizeCursor : public Cursor {
"Failed to perform remote accumulate due to "
"RecordDeletedError");
case distributed::PullState::LOCK_TIMEOUT_ERROR:
throw LockTimeoutException(
throw utils::LockTimeoutException(
"Failed to perform remote accumulate due to "
"LockTimeoutException");
case distributed::PullState::RECONSTRUCTION_ERROR:
@ -3566,7 +3567,7 @@ class SynchronizeCursor : public Cursor {
throw QueryRuntimeException(
"Failed to apply deferred updates due to RecordDeletedError");
case distributed::UpdateResult::LOCK_TIMEOUT_ERROR:
throw LockTimeoutException(
throw utils::LockTimeoutException(
"Failed to apply deferred update due to LockTimeoutException");
case distributed::UpdateResult::DONE:
break;

View File

@ -9,6 +9,7 @@
#include "query/exceptions.hpp"
#include "storage/dynamic_graph_partitioner/vertex_migrator.hpp"
#include "utils/flag_validation.hpp"
#include "utils/thread/sync.hpp"
DEFINE_VALIDATED_int32(
dgp_improvement_threshold, 10,
@ -52,7 +53,7 @@ void DynamicGraphPartitioner::Run() {
throw query::QueryRuntimeException(
"Failed to apply deferred updates due to RecordDeletedError");
case distributed::UpdateResult::LOCK_TIMEOUT_ERROR:
throw LockTimeoutException(
throw utils::LockTimeoutException(
"Failed to apply deferred update due to LockTimeoutException");
case distributed::UpdateResult::DONE:
break;

View File

@ -6,9 +6,9 @@
#include <stack>
#include <unordered_set>
#include "threading/sync/lock_timeout_exception.hpp"
#include "transactions/engine.hpp"
#include "utils/on_scope_exit.hpp"
#include "utils/thread/sync.hpp"
#include "utils/timer.hpp"
namespace {
@ -102,7 +102,7 @@ LockStatus RecordLock::Lock(const tx::Transaction &tx, tx::Engine &engine) {
// Message could be incorrect. Transaction could be aborted because it was
// running for too long time, but that is unlikely and it is not very
// important which exception (and message) we throw here.
throw LockTimeoutException(
throw utils::LockTimeoutException(
"Transaction was aborted since it was oldest in a lock cycle");
}
if (TryLock(tx.id_)) {
@ -119,10 +119,10 @@ LockStatus RecordLock::Lock(const tx::Transaction &tx, tx::Engine &engine) {
it->second = owner;
abort_oldest_tx_in_lock_cycle();
}
cpu_relax();
utils::CpuRelax();
}
throw LockTimeoutException(fmt::format(
throw utils::LockTimeoutException(fmt::format(
"Transaction locked for more than {} seconds", kTimeout.count()));
}

View File

@ -4,9 +4,7 @@
#include <chrono>
#include <unordered_set>
#include "data_structures/concurrent/concurrent_map.hpp"
#include "storage/locking/lock_status.hpp"
#include "threading/sync/futex.hpp"
#include "transactions/type.hpp"
namespace tx {

View File

@ -8,7 +8,7 @@
#include "storage/edge.hpp"
#include "storage/record_accessor.hpp"
#include "storage/vertex.hpp"
#include "threading/sync/lock_timeout_exception.hpp"
#include "utils/thread/sync.hpp"
using database::StateDelta;
@ -214,7 +214,7 @@ void RecordAccessor<TRecord>::SendDelta(
case distributed::UpdateResult::UPDATE_DELETED_ERROR:
throw RecordDeletedError();
case distributed::UpdateResult::LOCK_TIMEOUT_ERROR:
throw LockTimeoutException("Lock timeout on remote worker");
throw utils::LockTimeoutException("Lock timeout on remote worker");
}
}

View File

@ -1,23 +0,0 @@
#pragma once
#include <unistd.h>
#include <atomic>
class CasLock {
public:
void lock() {
bool locked = false;
while (!lock_flag.compare_exchange_weak(
locked, true, std::memory_order_release, std::memory_order_relaxed)) {
usleep(250);
}
}
void unlock() { lock_flag.store(0, std::memory_order_release); }
bool locked() { return lock_flag.load(std::memory_order_relaxed); }
private:
std::atomic<bool> lock_flag;
};

View File

@ -1,11 +0,0 @@
#pragma once
/* @brief improves contention in spinlocks
* hints the processor that we're in a spinlock and not doing much
*/
inline void cpu_relax() {
// if IBMPower
// HMT_very_low()
// http://stackoverflow.com/questions/5425506/equivalent-of-x86-pause-instruction-for-ppc
asm("PAUSE");
}

View File

@ -1,150 +0,0 @@
#pragma once
#include <errno.h>
#include <stdint.h>
#include <atomic>
#include <linux/futex.h>
#include <sys/syscall.h>
#include <sys/types.h>
#include <unistd.h>
#include "glog/logging.h"
#include "threading/sync/cpu_relax.hpp"
#include "threading/sync/lock_timeout_exception.hpp"
namespace sys {
inline int futex(void *addr1, int op, int val1, const struct timespec *timeout,
void *addr2, int val3) {
return syscall(SYS_futex, addr1, op, val1, timeout, addr2, val3);
};
} // namespace sys
class Futex {
using futex_t = uint32_t;
using flag_t = uint8_t;
/* @brief Data structure for implementing fast mutexes
*
* This structure is 4B wide, as required for futex system call where
* the last two bytes are used for two flags - contended and locked,
* respectively. Memory layout for the structure looks like this:
*
* all
* |---------------------------------|
* 00000000 00000000 0000000C 0000000L
* |------| |------|
* contended locked
*
* L marks the locked bit
* C marks the contended bit
*/
union mutex_t {
std::atomic<futex_t> all{0};
struct {
std::atomic<flag_t> locked;
std::atomic<flag_t> contended;
} state;
};
enum Contention : futex_t { UNCONTENDED = 0x0000, CONTENDED = 0x0100 };
enum State : futex_t {
UNLOCKED = 0x0000,
LOCKED = 0x0001,
UNLOCKED_CONTENDED = UNLOCKED | CONTENDED, // 0x0100
LOCKED_CONTENDED = LOCKED | CONTENDED // 0x0101
};
static constexpr size_t LOCK_RETRIES = 100;
static constexpr size_t UNLOCK_RETRIES = 200;
public:
Futex() {
static_assert(sizeof(mutex_t) == sizeof(futex_t),
"Atomic futex should be the same size as non_atomic");
}
bool try_lock() {
// we took the lock if we stored the LOCKED state and previous
// state was UNLOCKED
return mutex.state.locked.exchange(LOCKED, std::memory_order_seq_cst) ==
UNLOCKED;
}
void lock(const struct timespec *timeout = nullptr) {
// try to fast lock a few times before going to sleep
for (size_t i = 0; i < LOCK_RETRIES; ++i) {
// try to lock and exit if we succeed
if (try_lock()) return;
// we failed, chill a bit
relax();
}
// the lock is contended, go to sleep. when someone
// wakes you up, try taking the lock again
while (mutex.all.exchange(LOCKED_CONTENDED, std::memory_order_seq_cst) &
LOCKED) {
// wait in the kernel for someone to wake us up when unlocking
auto status = futex_wait(LOCKED_CONTENDED, timeout);
// check if we woke up because of a timeout
if (status == -1 && errno == ETIMEDOUT)
throw LockTimeoutException("Lock timeout");
}
}
void unlock() {
futex_t state = LOCKED;
// if we're locked and uncontended, try to unlock the mutex before
// it becomes contended
if (mutex.all.load(std::memory_order_seq_cst) == LOCKED &&
mutex.all.compare_exchange_strong(state, UNLOCKED,
std::memory_order_seq_cst,
std::memory_order_seq_cst))
return;
// we are contended, just release the lock
mutex.state.locked.store(UNLOCKED, std::memory_order_seq_cst);
// spin and hope someone takes a lock so we don't have to wake up
// anyone because that's quite expensive
for (size_t i = 0; i < UNLOCK_RETRIES; ++i) {
// if someone took the lock, we're ok
if (is_locked(std::memory_order_seq_cst)) return;
relax();
}
// store that we are becoming uncontended
mutex.state.contended.store(UNCONTENDED, std::memory_order_seq_cst);
// we need to wake someone up
futex_wake(LOCKED);
}
bool is_locked(std::memory_order order = std::memory_order_seq_cst) const {
return mutex.state.locked.load(order);
}
bool is_contended(std::memory_order order = std::memory_order_seq_cst) const {
return mutex.state.contended.load(order);
}
private:
mutex_t mutex;
int futex_wait(int value, const struct timespec *timeout = nullptr) {
return sys::futex(&mutex.all, FUTEX_WAIT_PRIVATE, value, timeout, nullptr,
0);
}
void futex_wake(int value) {
sys::futex(&mutex.all, FUTEX_WAKE_PRIVATE, value, nullptr, nullptr, 0);
}
void relax() { cpu_relax(); }
};

View File

@ -1,8 +0,0 @@
#pragma once
#include "utils/exceptions.hpp"
class LockTimeoutException : public utils::BasicException {
public:
using utils::BasicException::BasicException;
};

View File

@ -1,31 +0,0 @@
#pragma once
#include <mutex>
#include "spinlock.hpp"
/**
* @class Lockable
*
* @brief
* Lockable is used as an custom implementation of a mutex mechanism. It is
* implemented as a wrapper around std::lock_guard and std::unique_guard with
* a default lock called Spinlock.
*
* @tparam lock_t type of lock to be used (default = Spinlock)
*/
template <class lock_t = SpinLock>
class Lockable {
public:
using lock_type = lock_t;
std::lock_guard<lock_t> acquire_guard() const {
return std::lock_guard<lock_t>(lock);
}
std::unique_lock<lock_t> acquire_unique() const {
return std::unique_lock<lock_t>(lock);
}
mutable lock_t lock;
};

View File

@ -1,46 +0,0 @@
/// @file
#pragma once
#include <pthread.h>
#include "glog/logging.h"
namespace threading {
/// By passing the appropriate parameter to the `RWLock` constructor, it is
/// possible to control the behavior of `RWLock` while shared lock is held. If
/// the priority is set to `READ`, new shared (read) locks can be obtained even
/// though there is a thread waiting for an exclusive (write) lock, which can
/// lead to writer starvation. If the priority is set to `WRITE`, readers will
/// be blocked from obtaining new shared locks while there are writers waiting,
/// which can lead to reader starvation.
enum RWLockPriority { READ, WRITE };
/// A wrapper around `pthread_rwlock_t`, useful because it is not possible to
/// choose read or write priority for `std::shared_mutex`.
class RWLock {
public:
RWLock(const RWLock &) = delete;
RWLock &operator=(const RWLock &) = delete;
RWLock(RWLock &&) = delete;
RWLock &operator=(RWLock &&) = delete;
/// Construct a RWLock object with chosen priority. See comment above
/// `RWLockPriority` for details.
explicit RWLock(RWLockPriority priority);
~RWLock();
bool try_lock();
void lock();
void unlock();
bool try_lock_shared();
void lock_shared();
void unlock_shared();
private:
pthread_rwlock_t lock_ = PTHREAD_RWLOCK_INITIALIZER;
};
} // namespace threading

View File

@ -1,34 +0,0 @@
#pragma once
#include <unistd.h>
#include <atomic>
#include <chrono>
#include "threading/sync/cpu_relax.hpp"
#include "utils/timer.hpp"
/**
* @class SpinLock
*
* @brief
* Spinlock is used as an locking mechanism based on an atomic flag and
* waiting loops. It uses the cpu_relax "asm pause" command to optimize wasted
* time while the threads are waiting.
*
*/
class SpinLock {
public:
void lock() { // Before was memory_order_acquire
while (lock_flag_.test_and_set()) {
cpu_relax();
}
}
// Before was memory_order_release
void unlock() { lock_flag_.clear(); }
bool try_lock() { return !lock_flag_.test_and_set(); }
private:
// guaranteed by standard to be lock free!
mutable std::atomic_flag lock_flag_ = ATOMIC_FLAG_INIT;
};

View File

@ -1,39 +0,0 @@
#pragma once
#include <unistd.h>
#include <atomic>
#include <chrono>
#include <stdexcept>
#include "threading/sync/lock_timeout_exception.hpp"
template <size_t microseconds = 250>
class TimedSpinLock {
public:
TimedSpinLock(std::chrono::seconds expiration) : expiration_(expiration) {}
void lock() {
using clock = std::chrono::high_resolution_clock;
auto start = clock::now();
while (!lock_flag.test_and_set(std::memory_order_acquire)) {
// how long have we been locked? if we exceeded the expiration
// time, throw an exception and stop being blocked because this
// might be a deadlock!
if (clock::now() - start > expiration_)
throw LockTimeoutException("This lock has expired");
usleep(microseconds);
}
}
void unlock() { lock_flag.clear(std::memory_order_release); }
private:
std::chrono::milliseconds expiration_;
// guaranteed by standard to be lock free!
std::atomic_flag lock_flag = ATOMIC_FLAG_INIT;
};

View File

@ -1,12 +0,0 @@
#include "glog/logging.h"
#include "thread.hpp"
Thread::Thread(Thread &&other) {
DCHECK(thread_id == UNINITIALIZED) << "Thread was initialized before.";
thread_id = other.thread_id;
thread = std::move(other.thread);
}
void Thread::join() { return thread.join(); }
std::atomic<unsigned> Thread::thread_counter{1};

View File

@ -1,39 +0,0 @@
#pragma once
#include <atomic>
#include <thread>
class Thread {
static std::atomic<unsigned> thread_counter;
public:
static size_t count(std::memory_order order = std::memory_order_seq_cst) {
return thread_counter.load(order);
}
static constexpr unsigned UNINITIALIZED = -1;
static constexpr unsigned MAIN_THREAD = 0;
template <class F>
explicit Thread(F f) {
thread_id = thread_counter.fetch_add(1, std::memory_order_acq_rel);
thread = std::thread([this, f]() { start_thread(f); });
}
Thread() = default;
Thread(const Thread &) = delete;
Thread(Thread &&other);
void join();
private:
unsigned thread_id = UNINITIALIZED;
std::thread thread;
template <class F, class... Args>
void start_thread(F &&f) {
// this_thread::id = thread_id;
f();
}
};

View File

@ -1,82 +0,0 @@
#pragma once
/// @file
#include <condition_variable>
#include <functional>
#include <memory>
#include <mutex>
#include <queue>
#include <stdexcept>
#include <thread>
#include <vector>
#include "glog/logging.h"
#include "utils/future.hpp"
namespace threading {
/// A thread pool for asynchronous task execution. Supports tasks that produce
/// return values by returning `utils::Future` objects.
class ThreadPool {
public:
/// Creates a thread pool with the given number of threads.
explicit ThreadPool(size_t threads) {
for (size_t i = 0; i < threads; ++i)
workers_.emplace_back([this] {
while (true) {
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(mutex_);
cvar_.wait(lock, [this] { return stop_ || !tasks_.empty(); });
if (stop_ && tasks_.empty()) return;
task = std::move(tasks_.front());
tasks_.pop();
}
task();
}
});
}
ThreadPool(const ThreadPool &) = delete;
ThreadPool(ThreadPool &&) = delete;
ThreadPool &operator=(const ThreadPool &) = delete;
ThreadPool &operator=(ThreadPool &&) = delete;
/// Runs the given callable with the given args, asynchronously. This function
/// immediately returns an `utils::Future` with the result, to be
/// consumed when ready.
template <class TCallable, class... TArgs>
auto Run(TCallable &&callable, TArgs &&... args) {
auto task = std::make_shared<
std::packaged_task<std::result_of_t<TCallable(TArgs...)>()>>(std::bind(
std::forward<TCallable>(callable), std::forward<TArgs>(args)...));
auto res = utils::make_future(task->get_future());
std::unique_lock<std::mutex> lock(mutex_);
CHECK(!stop_) << "ThreadPool::Run called on stopped ThreadPool.";
tasks_.emplace([task]() { (*task)(); });
lock.unlock();
cvar_.notify_one();
return res;
}
~ThreadPool() {
std::unique_lock<std::mutex> lock(mutex_);
stop_ = true;
lock.unlock();
cvar_.notify_all();
for (std::thread &worker : workers_) {
if (worker.joinable()) worker.join();
}
}
private:
std::vector<std::thread> workers_;
std::queue<std::function<void()>> tasks_;
std::mutex mutex_;
std::condition_variable cvar_;
bool stop_{false};
};
} // namespace threading

View File

@ -5,7 +5,6 @@
#include <vector>
#include "data_structures/concurrent/concurrent_map.hpp"
#include "threading/sync/spinlock.hpp"
#include "transactions/commit_log.hpp"
#include "transactions/transaction.hpp"
#include "transactions/type.hpp"

View File

@ -14,7 +14,7 @@ SingleNodeEngine::SingleNodeEngine(durability::WriteAheadLog *wal)
Transaction *SingleNodeEngine::Begin() {
VLOG(11) << "[Tx] Starting transaction " << counter_ + 1;
std::lock_guard<SpinLock> guard(lock_);
std::lock_guard<utils::SpinLock> guard(lock_);
TransactionId id{++counter_};
auto t = new Transaction(id, active_, *this);
@ -27,7 +27,7 @@ Transaction *SingleNodeEngine::Begin() {
}
CommandId SingleNodeEngine::Advance(TransactionId id) {
std::lock_guard<SpinLock> guard(lock_);
std::lock_guard<utils::SpinLock> guard(lock_);
auto it = store_.find(id);
DCHECK(it != store_.end())
@ -43,7 +43,7 @@ CommandId SingleNodeEngine::Advance(TransactionId id) {
}
CommandId SingleNodeEngine::UpdateCommand(TransactionId id) {
std::lock_guard<SpinLock> guard(lock_);
std::lock_guard<utils::SpinLock> guard(lock_);
auto it = store_.find(id);
DCHECK(it != store_.end())
<< "Transaction::advance on non-existing transaction";
@ -52,7 +52,7 @@ CommandId SingleNodeEngine::UpdateCommand(TransactionId id) {
void SingleNodeEngine::Commit(const Transaction &t) {
VLOG(11) << "[Tx] Commiting transaction " << t.id_;
std::lock_guard<SpinLock> guard(lock_);
std::lock_guard<utils::SpinLock> guard(lock_);
clog_.set_committed(t.id_);
active_.remove(t.id_);
if (wal_) {
@ -63,7 +63,7 @@ void SingleNodeEngine::Commit(const Transaction &t) {
void SingleNodeEngine::Abort(const Transaction &t) {
VLOG(11) << "[Tx] Aborting transaction " << t.id_;
std::lock_guard<SpinLock> guard(lock_);
std::lock_guard<utils::SpinLock> guard(lock_);
clog_.set_aborted(t.id_);
active_.remove(t.id_);
if (wal_) {
@ -77,7 +77,7 @@ CommitLog::Info SingleNodeEngine::Info(TransactionId tx) const {
}
Snapshot SingleNodeEngine::GlobalGcSnapshot() {
std::lock_guard<SpinLock> guard(lock_);
std::lock_guard<utils::SpinLock> guard(lock_);
// No active transactions.
if (active_.size() == 0) {
@ -93,20 +93,20 @@ Snapshot SingleNodeEngine::GlobalGcSnapshot() {
}
Snapshot SingleNodeEngine::GlobalActiveTransactions() {
std::lock_guard<SpinLock> guard(lock_);
std::lock_guard<utils::SpinLock> guard(lock_);
Snapshot active_transactions = active_;
return active_transactions;
}
TransactionId SingleNodeEngine::LocalLast() const {
std::lock_guard<SpinLock> guard(lock_);
std::lock_guard<utils::SpinLock> guard(lock_);
return counter_;
}
TransactionId SingleNodeEngine::GlobalLast() const { return LocalLast(); }
TransactionId SingleNodeEngine::LocalOldestActive() const {
std::lock_guard<SpinLock> guard(lock_);
std::lock_guard<utils::SpinLock> guard(lock_);
return active_.empty() ? counter_ + 1 : active_.front();
}
@ -116,14 +116,14 @@ void SingleNodeEngine::GarbageCollectCommitLog(TransactionId tx_id) {
void SingleNodeEngine::LocalForEachActiveTransaction(
std::function<void(Transaction &)> f) {
std::lock_guard<SpinLock> guard(lock_);
std::lock_guard<utils::SpinLock> guard(lock_);
for (auto transaction : active_) {
f(*store_.find(transaction)->second);
}
}
Transaction *SingleNodeEngine::RunningTransaction(TransactionId tx_id) {
std::lock_guard<SpinLock> guard(lock_);
std::lock_guard<utils::SpinLock> guard(lock_);
auto found = store_.find(tx_id);
CHECK(found != store_.end())
<< "Can't return snapshot for an inactive transaction";
@ -131,7 +131,7 @@ Transaction *SingleNodeEngine::RunningTransaction(TransactionId tx_id) {
}
void SingleNodeEngine::EnsureNextIdGreater(TransactionId tx_id) {
std::lock_guard<SpinLock> guard(lock_);
std::lock_guard<utils::SpinLock> guard(lock_);
counter_ = std::max(tx_id, counter_);
}

View File

@ -5,11 +5,11 @@
#include <unordered_map>
#include "durability/wal.hpp"
#include "threading/sync/spinlock.hpp"
#include "transactions/commit_log.hpp"
#include "transactions/engine.hpp"
#include "transactions/transaction.hpp"
#include "utils/exceptions.hpp"
#include "utils/thread/sync.hpp"
namespace tx {
@ -51,7 +51,7 @@ class SingleNodeEngine : public Engine {
CommitLog clog_;
std::unordered_map<TransactionId, std::unique_ptr<Transaction>> store_;
Snapshot active_;
mutable SpinLock lock_;
mutable utils::SpinLock lock_;
// Optional. If present, the Engine will write tx Begin/Commit/Abort
// atomically (while under lock).
durability::WriteAheadLog *wal_{nullptr};

View File

@ -7,8 +7,8 @@
#include "glog/logging.h"
#include "storage/locking/lock_status.hpp"
#include "storage/locking/record_lock.hpp"
#include "threading/sync/spinlock.hpp"
#include "transactions/type.hpp"
#include "utils/thread/sync.hpp"
namespace tx {
@ -69,7 +69,7 @@ class LockStore {
// same time. IMPORTANT: This guard must come after LockHolder construction,
// as that potentially takes a long time and this guard only needs to
// protect locks_ update.
std::lock_guard<SpinLock> guard{locks_lock_};
std::lock_guard<utils::SpinLock> guard{locks_lock_};
locks_.emplace_back(std::move(holder));
if (!locks_.back().active()) {
locks_.pop_back();
@ -77,7 +77,7 @@ class LockStore {
}
private:
SpinLock locks_lock_;
utils::SpinLock locks_lock_;
std::vector<LockHolder> locks_;
};
} // namespace tx

View File

@ -8,8 +8,6 @@
#include "data_structures/concurrent/concurrent_map.hpp"
#include "storage/locking/record_lock.hpp"
#include "threading/sync/lockable.hpp"
#include "threading/sync/spinlock.hpp"
#include "transactions/lock_store.hpp"
#include "transactions/snapshot.hpp"
#include "transactions/type.hpp"

View File

@ -2,6 +2,8 @@ set(utils_src_files
demangle.cpp
file.cpp
signals.cpp
thread.cpp
thread/sync.cpp
watchdog.cpp)
add_library(mg-utils STATIC ${utils_src_files})

View File

@ -13,7 +13,6 @@
#include "storage/property_value.hpp"
#include "storage/types.hpp"
#include "storage/vertex_accessor.hpp"
#include "threading/sync/lock_timeout_exception.hpp"
namespace utils {

52
src/utils/thread.cpp Normal file
View File

@ -0,0 +1,52 @@
#include "utils/thread.hpp"
#include <sys/prctl.h>
#include <glog/logging.h>
namespace utils {
void ThreadSetName(const std::string &name) {
CHECK(name.size() <= 16) << "Thread name '" << name << "'too long";
LOG_IF(WARNING, prctl(PR_SET_NAME, name.c_str()) != 0)
<< "Couldn't set thread name: " << name << "!";
}
Thread::Thread(Thread &&other) {
DCHECK(thread_id == UNINITIALIZED) << "Thread was initialized before.";
thread_id = other.thread_id;
thread = std::move(other.thread);
}
void Thread::join() { return thread.join(); }
std::atomic<unsigned> Thread::thread_counter{1};
ThreadPool::ThreadPool(size_t threads) {
for (size_t i = 0; i < threads; ++i)
workers_.emplace_back([this] {
while (true) {
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(mutex_);
cvar_.wait(lock, [this] { return stop_ || !tasks_.empty(); });
if (stop_ && tasks_.empty()) return;
task = std::move(tasks_.front());
tasks_.pop();
}
task();
}
});
}
ThreadPool::~ThreadPool() {
std::unique_lock<std::mutex> lock(mutex_);
stop_ = true;
lock.unlock();
cvar_.notify_all();
for (std::thread &worker : workers_) {
if (worker.joinable()) worker.join();
}
}
} // namespace utils

View File

@ -1,21 +1,99 @@
/// @file
#pragma once
#include <sys/prctl.h>
#include <atomic>
#include <condition_variable>
#include <functional>
#include <memory>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <vector>
#include <glog/logging.h>
#include "utils/future.hpp"
namespace utils {
/**
* This function sets the thread name of the calling thread.
* Beware, the name length limit is 16 characters!
*/
inline void ThreadSetName(const std::string &name) {
CHECK(name.size() <= 16) << "Thread name '" << name << "'too long";
LOG_IF(WARNING, prctl(PR_SET_NAME, name.c_str()) != 0)
<< "Couldn't set thread name: " << name << "!";
}
/// This function sets the thread name of the calling thread.
/// Beware, the name length limit is 16 characters!
void ThreadSetName(const std::string &name);
class Thread {
static std::atomic<unsigned> thread_counter;
public:
static size_t count(std::memory_order order = std::memory_order_seq_cst) {
return thread_counter.load(order);
}
static constexpr unsigned UNINITIALIZED = -1;
static constexpr unsigned MAIN_THREAD = 0;
template <class F>
explicit Thread(F f) {
thread_id = thread_counter.fetch_add(1, std::memory_order_acq_rel);
thread = std::thread([this, f]() { start_thread(f); });
}
Thread() = default;
Thread(const Thread &) = delete;
Thread(Thread &&other);
void join();
private:
unsigned thread_id = UNINITIALIZED;
std::thread thread;
template <class F, class... Args>
void start_thread(F &&f) {
// this_thread::id = thread_id;
f();
}
};
/// A thread pool for asynchronous task execution. Supports tasks that produce
/// return values by returning `utils::Future` objects.
class ThreadPool final {
public:
/// Creates a thread pool with the given number of threads.
explicit ThreadPool(size_t threads);
~ThreadPool();
ThreadPool(const ThreadPool &) = delete;
ThreadPool(ThreadPool &&) = delete;
ThreadPool &operator=(const ThreadPool &) = delete;
ThreadPool &operator=(ThreadPool &&) = delete;
/// Runs the given callable with the given args, asynchronously. This function
/// immediately returns an `utils::Future` with the result, to be
/// consumed when ready.
template <class TCallable, class... TArgs>
auto Run(TCallable &&callable, TArgs &&... args) {
auto task = std::make_shared<
std::packaged_task<std::result_of_t<TCallable(TArgs...)>()>>(std::bind(
std::forward<TCallable>(callable), std::forward<TArgs>(args)...));
auto res = utils::make_future(task->get_future());
std::unique_lock<std::mutex> lock(mutex_);
CHECK(!stop_) << "ThreadPool::Run called on stopped ThreadPool.";
tasks_.emplace([task]() { (*task)(); });
lock.unlock();
cvar_.notify_one();
return res;
}
private:
std::vector<std::thread> workers_;
std::queue<std::function<void()>> tasks_;
std::mutex mutex_;
std::condition_variable cvar_;
bool stop_{false};
};
}; // namespace utils

View File

@ -1,6 +1,19 @@
#include "threading/sync/rwlock.hpp"
#include "utils/thread/sync.hpp"
namespace threading {
#include <linux/futex.h>
#include <sys/syscall.h>
#include <sys/types.h>
#include <unistd.h>
namespace sys {
inline int futex(void *addr1, int op, int val1, const struct timespec *timeout,
void *addr2, int val3) {
return syscall(SYS_futex, addr1, op, val1, timeout, addr2, val3);
};
} // namespace sys
namespace utils {
RWLock::RWLock(RWLockPriority priority) {
int err;
@ -87,4 +100,65 @@ void RWLock::unlock_shared() {
}
}
} // namespace threading
void Futex::lock(const struct timespec *timeout) {
// try to fast lock a few times before going to sleep
for (size_t i = 0; i < LOCK_RETRIES; ++i) {
// try to lock and exit if we succeed
if (try_lock()) return;
// we failed, chill a bit
relax();
}
// the lock is contended, go to sleep. when someone
// wakes you up, try taking the lock again
while (mutex.all.exchange(LOCKED_CONTENDED, std::memory_order_seq_cst) &
LOCKED) {
// wait in the kernel for someone to wake us up when unlocking
auto status = futex_wait(LOCKED_CONTENDED, timeout);
// check if we woke up because of a timeout
if (status == -1 && errno == ETIMEDOUT)
throw LockTimeoutException("Lock timeout");
}
}
void Futex::unlock() {
futex_t state = LOCKED;
// if we're locked and uncontended, try to unlock the mutex before
// it becomes contended
if (mutex.all.load(std::memory_order_seq_cst) == LOCKED &&
mutex.all.compare_exchange_strong(state, UNLOCKED,
std::memory_order_seq_cst,
std::memory_order_seq_cst))
return;
// we are contended, just release the lock
mutex.state.locked.store(UNLOCKED, std::memory_order_seq_cst);
// spin and hope someone takes a lock so we don't have to wake up
// anyone because that's quite expensive
for (size_t i = 0; i < UNLOCK_RETRIES; ++i) {
// if someone took the lock, we're ok
if (is_locked(std::memory_order_seq_cst)) return;
relax();
}
// store that we are becoming uncontended
mutex.state.contended.store(UNCONTENDED, std::memory_order_seq_cst);
// we need to wake someone up
futex_wake(LOCKED);
}
int Futex::futex_wait(int value, const struct timespec *timeout) {
return sys::futex(&mutex.all, FUTEX_WAIT_PRIVATE, value, timeout, nullptr, 0);
}
void Futex::futex_wake(int value) {
sys::futex(&mutex.all, FUTEX_WAKE_PRIVATE, value, nullptr, nullptr, 0);
}
} // namespace utils

235
src/utils/thread/sync.hpp Normal file
View File

@ -0,0 +1,235 @@
/// @file
#pragma once
#include <pthread.h>
#include <unistd.h>
#include <atomic>
#include <cerrno>
#include <chrono>
#include <cstdint>
#include <mutex>
#include "utils/exceptions.hpp"
namespace utils {
/// Improves contention in spinlocks by hinting the processor that we're in a
/// spinlock and not doing much.
inline void CpuRelax() {
// if IBMPower
// HMT_very_low()
// http://stackoverflow.com/questions/5425506/equivalent-of-x86-pause-instruction-for-ppc
asm("PAUSE");
}
class LockTimeoutException : public BasicException {
public:
using BasicException::BasicException;
};
class CasLock {
public:
void lock() {
bool locked = false;
while (!lock_flag.compare_exchange_weak(
locked, true, std::memory_order_release, std::memory_order_relaxed)) {
usleep(250);
}
}
void unlock() { lock_flag.store(0, std::memory_order_release); }
bool locked() { return lock_flag.load(std::memory_order_relaxed); }
private:
std::atomic<bool> lock_flag;
};
/// Spinlock is used as a locking mechanism based on an atomic flag and waiting
/// loops.
///
/// It uses the CpuRelax "asm pause" command to optimize wasted time while the
/// threads are waiting.
class SpinLock {
public:
void lock() { // Before was memory_order_acquire
while (lock_flag_.test_and_set()) {
CpuRelax();
}
}
// Before was memory_order_release
void unlock() { lock_flag_.clear(); }
bool try_lock() { return !lock_flag_.test_and_set(); }
private:
// guaranteed by standard to be lock free!
mutable std::atomic_flag lock_flag_ = ATOMIC_FLAG_INIT;
};
template <size_t microseconds = 250>
class TimedSpinLock {
public:
TimedSpinLock(std::chrono::seconds expiration) : expiration_(expiration) {}
void lock() {
using clock = std::chrono::high_resolution_clock;
auto start = clock::now();
while (!lock_flag.test_and_set(std::memory_order_acquire)) {
// how long have we been locked? if we exceeded the expiration
// time, throw an exception and stop being blocked because this
// might be a deadlock!
if (clock::now() - start > expiration_)
throw LockTimeoutException("This lock has expired");
usleep(microseconds);
}
}
void unlock() { lock_flag.clear(std::memory_order_release); }
private:
std::chrono::milliseconds expiration_;
// guaranteed by standard to be lock free!
std::atomic_flag lock_flag = ATOMIC_FLAG_INIT;
};
/// By passing the appropriate parameter to the `RWLock` constructor, it is
/// possible to control the behavior of `RWLock` while shared lock is held. If
/// the priority is set to `READ`, new shared (read) locks can be obtained even
/// though there is a thread waiting for an exclusive (write) lock, which can
/// lead to writer starvation. If the priority is set to `WRITE`, readers will
/// be blocked from obtaining new shared locks while there are writers waiting,
/// which can lead to reader starvation.
enum RWLockPriority { READ, WRITE };
/// A wrapper around `pthread_rwlock_t`, useful because it is not possible to
/// choose read or write priority for `std::shared_mutex`.
class RWLock {
public:
RWLock(const RWLock &) = delete;
RWLock &operator=(const RWLock &) = delete;
RWLock(RWLock &&) = delete;
RWLock &operator=(RWLock &&) = delete;
/// Construct a RWLock object with chosen priority. See comment above
/// `RWLockPriority` for details.
explicit RWLock(RWLockPriority priority);
~RWLock();
bool try_lock();
void lock();
void unlock();
bool try_lock_shared();
void lock_shared();
void unlock_shared();
private:
pthread_rwlock_t lock_ = PTHREAD_RWLOCK_INITIALIZER;
};
/// Lockable is used as an custom implementation of a mutex mechanism.
///
/// It is implemented as a wrapper around std::lock_guard and std::unique_guard
/// with a default lock called Spinlock.
///
/// @tparam lock_t type of lock to be used (default = Spinlock)
template <class lock_t = SpinLock>
class Lockable {
public:
using lock_type = lock_t;
std::lock_guard<lock_t> acquire_guard() const {
return std::lock_guard<lock_t>(lock);
}
std::unique_lock<lock_t> acquire_unique() const {
return std::unique_lock<lock_t>(lock);
}
mutable lock_t lock;
};
class Futex {
using futex_t = uint32_t;
using flag_t = uint8_t;
/// Data structure for implementing fast mutexes
///
/// This structure is 4B wide, as required for futex system call where
/// the last two bytes are used for two flags - contended and locked,
/// respectively. Memory layout for the structure looks like this:
///
/// all
/// |---------------------------------|
/// 00000000 00000000 0000000C 0000000L
/// |------| |------|
/// contended locked
///
/// L marks the locked bit
/// C marks the contended bit
union mutex_t {
std::atomic<futex_t> all{0};
struct {
std::atomic<flag_t> locked;
std::atomic<flag_t> contended;
} state;
};
enum Contention : futex_t { UNCONTENDED = 0x0000, CONTENDED = 0x0100 };
enum State : futex_t {
UNLOCKED = 0x0000,
LOCKED = 0x0001,
UNLOCKED_CONTENDED = UNLOCKED | CONTENDED, // 0x0100
LOCKED_CONTENDED = LOCKED | CONTENDED // 0x0101
};
static constexpr size_t LOCK_RETRIES = 100;
static constexpr size_t UNLOCK_RETRIES = 200;
public:
Futex() {
static_assert(sizeof(mutex_t) == sizeof(futex_t),
"Atomic futex should be the same size as non_atomic");
}
bool try_lock() {
// we took the lock if we stored the LOCKED state and previous
// state was UNLOCKED
return mutex.state.locked.exchange(LOCKED, std::memory_order_seq_cst) ==
UNLOCKED;
}
void lock(const struct timespec *timeout = nullptr);
void unlock();
bool is_locked(std::memory_order order = std::memory_order_seq_cst) const {
return mutex.state.locked.load(order);
}
bool is_contended(std::memory_order order = std::memory_order_seq_cst) const {
return mutex.state.contended.load(order);
}
private:
mutex_t mutex;
int futex_wait(int value, const struct timespec *timeout = nullptr);
void futex_wake(int value);
void relax() { CpuRelax(); }
};
} // namespace utils

View File

@ -3,9 +3,11 @@
#include <random>
#include <thread>
#include "threading/sync/futex.hpp"
#include <glog/logging.h>
Futex futex;
#include "utils/thread/sync.hpp"
utils::Futex futex;
int x = 0;
/**
@ -19,7 +21,7 @@ void test_lock(int) {
// TODO: create long running test
for (int i = 0; i < 5; ++i) {
{
std::unique_lock<Futex> guard(futex);
std::unique_lock<utils::Futex> guard(futex);
x++;
std::this_thread::sleep_for(std::chrono::milliseconds(dis(gen)));
CHECK(x == 1) << "Other thread shouldn't be able to "

View File

@ -6,16 +6,16 @@
#include "glog/logging.h"
#include "threading/sync/spinlock.hpp"
#include "utils/thread/sync.hpp"
int x = 0;
SpinLock lock;
utils::SpinLock lock;
void test_lock() {
using namespace std::literals;
{
std::unique_lock<SpinLock> guard(lock);
std::unique_lock<utils::SpinLock> guard(lock);
x++;
std::this_thread::sleep_for(25ms);

View File

@ -7,7 +7,7 @@
#include "stats/stats.hpp"
#include "stats/stats_rpc_messages.hpp"
#include "threading/sync/rwlock.hpp"
#include "utils/thread/sync.hpp"
#include "long_running_common.hpp"
@ -25,7 +25,7 @@ std::atomic<int64_t> num_cards;
std::atomic<int64_t> num_transactions;
std::atomic<int64_t> max_tx_id;
threading::RWLock world_lock(threading::RWLockPriority::WRITE);
utils::RWLock world_lock(utils::RWLockPriority::WRITE);
DEFINE_string(config, "", "test config");
@ -201,12 +201,12 @@ class CardFraudClient : public TestClient {
void AnalyticStep() {
std::this_thread::sleep_for(
std::chrono::milliseconds(config_["analytic"]["query_interval_ms"]));
std::shared_lock<threading::RWLock> lock(world_lock);
std::shared_lock<utils::RWLock> lock(world_lock);
GetCompromisedPosInc(config_["analytic"]["pos_limit"]);
}
void WorkerStep() {
std::shared_lock<threading::RWLock> lock(world_lock);
std::shared_lock<utils::RWLock> lock(world_lock);
bool is_fraud = UniformDouble(0, 1) < config_["fraud_probability"];
int64_t pos_id = UniformInt(0, num_pos - 1);
@ -243,7 +243,7 @@ class CardFraudClient : public TestClient {
void CleanupStep() {
if (num_transactions >= config_["cleanup"]["tx_hi"].get<int64_t>()) {
LOG(INFO) << "Trying to obtain world lock...";
std::unique_lock<threading::RWLock> lock(world_lock);
std::unique_lock<utils::RWLock> lock(world_lock);
int64_t id_limit = max_tx_id - config_["cleanup"]["tx_lo"].get<int>() + 1;
LOG(INFO) << "Transaction cleanup started, deleting transactions "
"with ids less than "

View File

@ -75,7 +75,7 @@ std::pair<communication::bolt::QueryData, int> ExecuteNTimesTillSuccess(
utils::Timer t;
std::chrono::microseconds to_sleep(rand_dist_(pseudo_rand_gen_));
while (t.Elapsed() < to_sleep) {
cpu_relax();
utils::CpuRelax();
}
}
}

View File

@ -8,7 +8,6 @@
#include "long_running_common.hpp"
#include "stats/stats.hpp"
#include "stats/stats_rpc_messages.hpp"
#include "threading/sync/rwlock.hpp"
class Graph500BfsClient : public TestClient {
public:

View File

@ -36,7 +36,7 @@ class TestClient {
virtual ~TestClient() {}
auto ConsumeStats() {
std::unique_lock<SpinLock> guard(lock_);
std::unique_lock<utils::SpinLock> guard(lock_);
auto stats = stats_;
stats_.clear();
return stats;
@ -76,7 +76,7 @@ class TestClient {
auto metadata = result.metadata;
metadata["wall_time"] = wall_time.count();
{
std::unique_lock<SpinLock> guard(lock_);
std::unique_lock<utils::SpinLock> guard(lock_);
if (query_name != "") {
stats_[query_name].push_back(std::move(metadata));
} else {
@ -88,7 +88,7 @@ class TestClient {
return result;
}
SpinLock lock_;
utils::SpinLock lock_;
std::unordered_map<std::string,
std::vector<std::map<std::string, DecodedValue>>>
stats_;

View File

@ -14,7 +14,6 @@
#include <json/json.hpp>
#include "io/network/utils.hpp"
#include "threading/sync/spinlock.hpp"
#include "utils/algorithm.hpp"
#include "utils/timer.hpp"

View File

@ -4,9 +4,9 @@
#include <gflags/gflags.h>
#include <glog/logging.h>
#include "threading/sync/spinlock.hpp"
#include "utils/algorithm.hpp"
#include "utils/string.hpp"
#include "utils/thread/sync.hpp"
#include "utils/timer.hpp"
#include "common.hpp"
@ -47,7 +47,7 @@ void ExecuteQueries(const std::vector<std::string> &queries,
std::ostream &ostream) {
std::vector<std::thread> threads;
SpinLock spinlock;
utils::SpinLock spinlock;
uint64_t last = 0;
std::vector<std::map<std::string, DecodedValue>> metadata;
@ -67,7 +67,7 @@ void ExecuteQueries(const std::vector<std::string> &queries,
while (true) {
uint64_t pos;
{
std::lock_guard<SpinLock> lock(spinlock);
std::lock_guard<utils::SpinLock> lock(spinlock);
if (last == queries.size()) {
break;
}

View File

@ -51,7 +51,7 @@ int main(int argc, char *argv[]) {
{"card", rint(kCardCount)},
{"tx", tx_counter++}});
CheckResults(res, {{1}}, "Transaction creation");
} catch (LockTimeoutException &) {
} catch (utils::LockTimeoutException &) {
--i;
} catch (mvcc::SerializationError &) {
--i;

View File

@ -145,15 +145,9 @@ target_link_libraries(${test_prefix}mvcc_one_transaction memgraph_lib)
add_unit_test(mvcc_parallel_update.cpp)
target_link_libraries(${test_prefix}mvcc_parallel_update memgraph_lib)
add_unit_test(network_endpoint.cpp)
target_link_libraries(${test_prefix}network_endpoint memgraph_lib)
add_unit_test(network_timeouts.cpp)
target_link_libraries(${test_prefix}network_timeouts memgraph_lib)
add_unit_test(network_utils.cpp)
target_link_libraries(${test_prefix}network_utils memgraph_lib)
add_unit_test(property_value_store.cpp)
target_link_libraries(${test_prefix}property_value_store memgraph_lib)
@ -199,18 +193,12 @@ target_link_libraries(${test_prefix}raft_storage memgraph_lib)
add_unit_test(record_edge_vertex_accessor.cpp)
target_link_libraries(${test_prefix}record_edge_vertex_accessor memgraph_lib)
add_unit_test(ring_buffer.cpp)
target_link_libraries(${test_prefix}ring_buffer memgraph_lib)
add_unit_test(rpc.cpp)
target_link_libraries(${test_prefix}rpc memgraph_lib)
add_unit_test(rpc_worker_clients.cpp)
target_link_libraries(${test_prefix}rpc_worker_clients memgraph_lib)
add_unit_test(rwlock.cpp)
target_link_libraries(${test_prefix}rwlock memgraph_lib)
add_unit_test(serialization.cpp)
target_link_libraries(${test_prefix}serialization memgraph_lib)
@ -241,9 +229,6 @@ target_link_libraries(${test_prefix}storage_address memgraph_lib)
add_unit_test(stripped.cpp)
target_link_libraries(${test_prefix}stripped memgraph_lib)
add_unit_test(thread_pool.cpp)
target_link_libraries(${test_prefix}thread_pool memgraph_lib)
add_unit_test(transaction_engine_distributed.cpp)
target_link_libraries(${test_prefix}transaction_engine_distributed memgraph_lib)
@ -253,6 +238,19 @@ target_link_libraries(${test_prefix}transaction_engine_single_node memgraph_lib)
add_unit_test(typed_value.cpp)
target_link_libraries(${test_prefix}typed_value memgraph_lib)
# Test data structures
add_unit_test(ring_buffer.cpp)
target_link_libraries(${test_prefix}ring_buffer mg-utils)
# Test mg-io
add_unit_test(network_endpoint.cpp)
target_link_libraries(${test_prefix}network_endpoint mg-io)
add_unit_test(network_utils.cpp)
target_link_libraries(${test_prefix}network_utils mg-io)
# Test mg-utils
add_unit_test(utils_demangle.cpp)
@ -270,12 +268,18 @@ target_link_libraries(${test_prefix}utils_on_scope_exit mg-utils)
add_unit_test(utils_scheduler.cpp)
target_link_libraries(${test_prefix}utils_scheduler mg-utils)
add_unit_test(utils_rwlock.cpp)
target_link_libraries(${test_prefix}utils_rwlock mg-utils)
add_unit_test(utils_signals.cpp)
target_link_libraries(${test_prefix}utils_signals mg-utils)
add_unit_test(utils_string.cpp)
target_link_libraries(${test_prefix}utils_string mg-utils)
add_unit_test(utils_thread_pool.cpp)
target_link_libraries(${test_prefix}utils_thread_pool mg-utils)
add_unit_test(utils_timestamp.cpp)
target_link_libraries(${test_prefix}utils_timestamp mg-utils)

View File

@ -814,7 +814,7 @@ TEST_F(Durability, SequentialRecovery) {
auto v = dba.FindVertex(random_int(kNumVertices), false);
try {
v.PropsSet(dba.Property("prop"), random_int(100));
} catch (LockTimeoutException &) {
} catch (utils::LockTimeoutException &) {
} catch (mvcc::SerializationError &) {
}
dba.InsertVertex();

View File

@ -4,9 +4,9 @@
#include "mvcc/record.hpp"
#include "mvcc/version.hpp"
#include "mvcc/version_list.hpp"
#include "threading/sync/lock_timeout_exception.hpp"
#include "transactions/engine_single_node.hpp"
#include "transactions/transaction.hpp"
#include "utils/thread/sync.hpp"
#include "mvcc_gc_common.hpp"
@ -23,7 +23,7 @@ TEST(MVCC, Deadlock) {
version_list1.update(*t1);
version_list2.update(*t2);
EXPECT_THROW(version_list1.update(*t2), LockTimeoutException);
EXPECT_THROW(version_list1.update(*t2), utils::LockTimeoutException);
}
// TODO Gleich: move this test to mvcc_gc???

View File

@ -4,7 +4,6 @@
#include "mvcc/record.hpp"
#include "mvcc/version.hpp"
#include "mvcc/version_list.hpp"
#include "threading/sync/lock_timeout_exception.hpp"
#include "transactions/engine_single_node.hpp"
#include "transactions/transaction.hpp"

View File

@ -4,7 +4,7 @@
#include "gtest/gtest.h"
#include "data_structures/ring_buffer.hpp"
#include "threading/sync/spinlock.hpp"
#include "utils/thread/sync.hpp"
TEST(RingBuffer, MultithreadedUsage) {
auto test_f = [](int producer_count, int elems_per_producer,
@ -12,7 +12,7 @@ TEST(RingBuffer, MultithreadedUsage) {
int consumer_sleep_ms) {
std::unordered_set<int> consumed;
SpinLock consumed_lock;
utils::SpinLock consumed_lock;
RingBuffer<int> buffer{20};
std::vector<std::thread> producers;
@ -34,7 +34,7 @@ TEST(RingBuffer, MultithreadedUsage) {
while (true) {
std::this_thread::sleep_for(
std::chrono::milliseconds(consumer_sleep_ms));
std::lock_guard<SpinLock> guard(consumed_lock);
std::lock_guard<utils::SpinLock> guard(consumed_lock);
if (consumed.size() == elem_total_count) break;
auto value = buffer.pop();
if (value) consumed.emplace(*value);

View File

@ -4,13 +4,13 @@
#include "glog/logging.h"
#include "gtest/gtest.h"
#include "threading/sync/rwlock.hpp"
#include "utils/thread/sync.hpp"
#include "utils/timer.hpp"
using namespace std::chrono_literals;
using threading::RWLock;
using threading::RWLockPriority;
using utils::RWLock;
using utils::RWLockPriority;
TEST(RWLock, MultipleReaders) {
RWLock rwlock(RWLockPriority::READ);

View File

@ -5,12 +5,12 @@
#include "gmock/gmock.h"
#include "gtest/gtest.h"
#include "threading/thread_pool.hpp"
#include "utils/future.hpp"
#include "utils/thread.hpp"
#include "utils/timer.hpp"
TEST(ThreadPool, RunMany) {
threading::ThreadPool tp(10);
utils::ThreadPool tp(10);
const int kResults = 10000;
std::vector<utils::Future<int>> results;
for (int i = 0; i < kResults; ++i) {
@ -26,7 +26,7 @@ TEST(ThreadPool, EnsureParallel) {
using namespace std::chrono_literals;
const int kSize = 10;
threading::ThreadPool tp(kSize);
utils::ThreadPool tp(kSize);
std::vector<utils::Future<void>> results;
utils::Timer t;