From c7b6cae526fe9ebf3c3c378b75fa53f0805f3dbd Mon Sep 17 00:00:00 2001 From: Teon Banek Date: Wed, 30 May 2018 13:00:25 +0200 Subject: [PATCH] Extract io/network into mg-io library Reviewers: buda, dgleich, mferencevic Reviewed By: mferencevic Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D1411 --- src/CMakeLists.txt | 12 +- src/communication/bolt/v1/session.hpp | 1 - src/communication/listener.hpp | 10 +- src/communication/session.hpp | 8 +- src/data_structures/bitset/dynamic_bitset.hpp | 2 - src/data_structures/concurrent/skiplist.hpp | 14 +- .../concurrent/skiplist_gc.hpp | 2 +- src/data_structures/ring_buffer.hpp | 10 +- src/database/storage_gc.hpp | 1 + src/distributed/produce_rpc_server.cpp | 2 +- src/distributed/rpc_worker_clients.hpp | 5 +- src/distributed/updates_rpc_clients.cpp | 3 +- src/distributed/updates_rpc_server.cpp | 14 +- src/distributed/updates_rpc_server.hpp | 4 +- src/io/CMakeLists.txt | 10 + src/io/network/socket.cpp | 1 - src/io/network/stream_listener.hpp | 0 src/mvcc/version_list.hpp | 1 - src/query/interpreter.cpp | 4 +- src/query/interpreter.hpp | 4 +- src/query/plan/operator.cpp | 7 +- src/storage/dynamic_graph_partitioner/dgp.cpp | 3 +- src/storage/locking/record_lock.cpp | 8 +- src/storage/locking/record_lock.hpp | 2 - src/storage/record_accessor.cpp | 4 +- src/threading/sync/caslock.hpp | 23 -- src/threading/sync/cpu_relax.hpp | 11 - src/threading/sync/futex.hpp | 150 ----------- src/threading/sync/lock_timeout_exception.hpp | 8 - src/threading/sync/lockable.hpp | 31 --- src/threading/sync/rwlock.hpp | 46 ---- src/threading/sync/spinlock.hpp | 34 --- src/threading/sync/timed_spinlock.hpp | 39 --- src/threading/thread.cpp | 12 - src/threading/thread.hpp | 39 --- src/threading/thread_pool.hpp | 82 ------ src/transactions/engine.hpp | 1 - src/transactions/engine_single_node.cpp | 24 +- src/transactions/engine_single_node.hpp | 4 +- src/transactions/lock_store.hpp | 6 +- src/transactions/transaction.hpp | 2 - src/utils/CMakeLists.txt | 2 + src/utils/random_graph_generator.hpp | 1 - src/utils/thread.cpp | 52 ++++ src/utils/thread.hpp | 100 +++++++- .../sync/rwlock.cpp => utils/thread/sync.cpp} | 80 +++++- src/utils/thread/sync.hpp | 235 ++++++++++++++++++ tests/concurrent/futex.cpp | 8 +- tests/concurrent/spinlock.cpp | 6 +- .../clients/card_fraud_client.cpp | 10 +- tests/macro_benchmark/clients/common.hpp | 2 +- .../macro_benchmark/clients/graph_500_bfs.cpp | 1 - .../clients/long_running_common.hpp | 6 +- .../macro_benchmark/clients/pokec_client.cpp | 1 - .../macro_benchmark/clients/query_client.cpp | 6 +- tests/manual/card_fraud_local.cpp | 2 +- tests/unit/CMakeLists.txt | 34 +-- tests/unit/durability.cpp | 2 +- tests/unit/mvcc.cpp | 4 +- tests/unit/mvcc_find_update_common.hpp | 1 - tests/unit/ring_buffer.cpp | 6 +- tests/unit/{rwlock.cpp => utils_rwlock.cpp} | 6 +- ...{thread_pool.cpp => utils_thread_pool.cpp} | 6 +- 63 files changed, 590 insertions(+), 625 deletions(-) create mode 100644 src/io/CMakeLists.txt delete mode 100644 src/io/network/stream_listener.hpp delete mode 100644 src/threading/sync/caslock.hpp delete mode 100644 src/threading/sync/cpu_relax.hpp delete mode 100644 src/threading/sync/futex.hpp delete mode 100644 src/threading/sync/lock_timeout_exception.hpp delete mode 100644 src/threading/sync/lockable.hpp delete mode 100644 src/threading/sync/rwlock.hpp delete mode 100644 src/threading/sync/spinlock.hpp delete mode 100644 src/threading/sync/timed_spinlock.hpp delete mode 100644 src/threading/thread.cpp delete mode 100644 src/threading/thread.hpp delete mode 100644 src/threading/thread_pool.hpp create mode 100644 src/utils/thread.cpp rename src/{threading/sync/rwlock.cpp => utils/thread/sync.cpp} (50%) create mode 100644 src/utils/thread/sync.hpp rename tests/unit/{rwlock.cpp => utils_rwlock.cpp} (97%) rename tests/unit/{thread_pool.cpp => utils_thread_pool.cpp} (89%) diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 8c36be9ea..87a5ae3a0 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -2,6 +2,7 @@ # add memgraph sub libraries add_subdirectory(utils) +add_subdirectory(io) # all memgraph src files set(memgraph_src_files @@ -40,10 +41,6 @@ set(memgraph_src_files durability/recovery.cpp durability/snapshooter.cpp durability/wal.cpp - io/network/addrinfo.cpp - io/network/endpoint.cpp - io/network/socket.cpp - io/network/utils.cpp query/common.cpp query/repl.cpp query/frontend/ast/ast.cpp @@ -70,8 +67,6 @@ set(memgraph_src_files storage/property_value_store.cpp storage/record_accessor.cpp storage/vertex_accessor.cpp - threading/sync/rwlock.cpp - threading/thread.cpp transactions/engine_master.cpp transactions/engine_single_node.cpp transactions/engine_worker.cpp @@ -132,7 +127,8 @@ string(TOLOWER ${CMAKE_BUILD_TYPE} lower_build_type) set(MEMGRAPH_ALL_LIBS stdc++fs Threads::Threads fmt cppitertools antlr_opencypher_parser_lib dl glog gflags capnp kj ${Boost_IOSTREAMS_LIBRARY_RELEASE} - ${Boost_SERIALIZATION_LIBRARY_RELEASE}) + ${Boost_SERIALIZATION_LIBRARY_RELEASE} + mg-utils mg-io) if (USE_LTALLOC) list(APPEND MEMGRAPH_ALL_LIBS ltalloc) @@ -146,7 +142,7 @@ endif() # STATIC library used by memgraph executables add_library(memgraph_lib STATIC ${memgraph_src_files}) -target_link_libraries(memgraph_lib ${MEMGRAPH_ALL_LIBS} mg-utils) +target_link_libraries(memgraph_lib ${MEMGRAPH_ALL_LIBS}) add_dependencies(memgraph_lib generate_opencypher_parser) add_dependencies(memgraph_lib generate_lcp) add_dependencies(memgraph_lib generate_capnp) diff --git a/src/communication/bolt/v1/session.hpp b/src/communication/bolt/v1/session.hpp index 99122106a..929376d9f 100644 --- a/src/communication/bolt/v1/session.hpp +++ b/src/communication/bolt/v1/session.hpp @@ -17,7 +17,6 @@ #include "communication/buffer.hpp" #include "database/graph_db.hpp" #include "query/interpreter.hpp" -#include "threading/sync/spinlock.hpp" #include "transactions/transaction.hpp" #include "utils/exceptions.hpp" diff --git a/src/communication/listener.hpp b/src/communication/listener.hpp index d0a9a61b3..1de89f6ab 100644 --- a/src/communication/listener.hpp +++ b/src/communication/listener.hpp @@ -13,8 +13,8 @@ #include "communication/session.hpp" #include "io/network/epoll.hpp" #include "io/network/socket.hpp" -#include "threading/sync/spinlock.hpp" #include "utils/thread.hpp" +#include "utils/thread/sync.hpp" namespace communication { @@ -50,7 +50,7 @@ class Listener { utils::ThreadSetName(fmt::format("{} timeout", service_name)); while (alive_) { { - std::unique_lock guard(lock_); + std::unique_lock guard(lock_); for (auto &session : sessions_) { if (session->TimedOut()) { LOG(WARNING) << service_name << " session associated with " @@ -86,7 +86,7 @@ class Listener { * @param connection socket which should be added to the event pool */ void AddConnection(io::network::Socket &&connection) { - std::unique_lock guard(lock_); + std::unique_lock guard(lock_); // Set connection options. // The socket is left to be a blocking socket, but when `Read` is called @@ -202,7 +202,7 @@ class Listener { // https://idea.popcount.org/2017-03-20-epoll-is-fundamentally-broken-22/ epoll_.Delete(session.socket().fd()); - std::unique_lock guard(lock_); + std::unique_lock guard(lock_); auto it = std::find_if(sessions_.begin(), sessions_.end(), [&](const auto &l) { return l.get() == &session; }); @@ -220,7 +220,7 @@ class Listener { TSessionData &data_; - SpinLock lock_; + utils::SpinLock lock_; std::vector> sessions_; std::thread thread_; diff --git a/src/communication/session.hpp b/src/communication/session.hpp index 0899a2034..2cbab652e 100644 --- a/src/communication/session.hpp +++ b/src/communication/session.hpp @@ -12,8 +12,8 @@ #include "communication/buffer.hpp" #include "io/network/socket.hpp" #include "io/network/stream_buffer.hpp" -#include "threading/sync/spinlock.hpp" #include "utils/exceptions.hpp" +#include "utils/thread/sync.hpp" namespace communication { @@ -126,7 +126,7 @@ class Session { * different threads in the network stack. */ bool TimedOut() { - std::unique_lock guard(lock_); + std::unique_lock guard(lock_); return last_event_time_ + std::chrono::seconds(inactivity_timeout_sec_) < std::chrono::steady_clock::now(); } @@ -138,7 +138,7 @@ class Session { private: void RefreshLastEventTime() { - std::unique_lock guard(lock_); + std::unique_lock guard(lock_); last_event_time_ = std::chrono::steady_clock::now(); } @@ -155,7 +155,7 @@ class Session { // Time of the last event and associated lock. std::chrono::time_point last_event_time_{ std::chrono::steady_clock::now()}; - SpinLock lock_; + utils::SpinLock lock_; const int inactivity_timeout_sec_; }; } // namespace communication diff --git a/src/data_structures/bitset/dynamic_bitset.hpp b/src/data_structures/bitset/dynamic_bitset.hpp index eaae65c39..5a0275fa5 100644 --- a/src/data_structures/bitset/dynamic_bitset.hpp +++ b/src/data_structures/bitset/dynamic_bitset.hpp @@ -3,8 +3,6 @@ #include #include "glog/logging.h" -#include "threading/sync/lockable.hpp" -#include "threading/sync/spinlock.hpp" /** * A sequentially ordered non-unique lock-free concurrent collection of bits. diff --git a/src/data_structures/concurrent/skiplist.hpp b/src/data_structures/concurrent/skiplist.hpp index f4609090a..c1ac7c03d 100644 --- a/src/data_structures/concurrent/skiplist.hpp +++ b/src/data_structures/concurrent/skiplist.hpp @@ -6,14 +6,12 @@ #include #include "glog/logging.h" + +#include "data_structures/concurrent/skiplist_gc.hpp" #include "utils/crtp.hpp" #include "utils/placeholder.hpp" #include "utils/random/fast_binomial.hpp" - -#include "threading/sync/lockable.hpp" -#include "threading/sync/spinlock.hpp" - -#include "data_structures/concurrent/skiplist_gc.hpp" +#include "utils/thread/sync.hpp" /** * computes the height for the new node from the interval [1...H] @@ -102,8 +100,8 @@ static thread_local utils::random::FastBinomial<> rnd; * @tparam lock_t Lock type used when locking is needed during the creation * and deletion of nodes. */ -template -class SkipList : private Lockable { +template +class SkipList : private utils::Lockable { public: /** @brief Wrapper class for flags used in the implementation * @@ -134,7 +132,7 @@ class SkipList : private Lockable { std::atomic flags{0}; }; - class Node : Lockable { + class Node : utils::Lockable { public: friend class SkipList; diff --git a/src/data_structures/concurrent/skiplist_gc.hpp b/src/data_structures/concurrent/skiplist_gc.hpp index f72f063d6..381109b99 100644 --- a/src/data_structures/concurrent/skiplist_gc.hpp +++ b/src/data_structures/concurrent/skiplist_gc.hpp @@ -12,8 +12,8 @@ #include "data_structures/concurrent/push_queue.hpp" -#include "threading/sync/spinlock.hpp" #include "utils/executor.hpp" +#include "utils/thread/sync.hpp" DECLARE_int32(skiplist_gc_interval); diff --git a/src/data_structures/ring_buffer.hpp b/src/data_structures/ring_buffer.hpp index a293f1b46..580366d65 100644 --- a/src/data_structures/ring_buffer.hpp +++ b/src/data_structures/ring_buffer.hpp @@ -9,7 +9,7 @@ #include "glog/logging.h" -#include "threading/sync/spinlock.hpp" +#include "utils/thread/sync.hpp" /** * A thread-safe ring buffer. Multi-producer, multi-consumer. Producers get @@ -43,7 +43,7 @@ class RingBuffer { void emplace(TArgs &&... args) { while (true) { { - std::lock_guard guard(lock_); + std::lock_guard guard(lock_); if (size_ < capacity_) { buffer_[write_pos_++] = TElement(std::forward(args)...); write_pos_ %= capacity_; @@ -64,7 +64,7 @@ class RingBuffer { * empty, nullopt is returned. */ std::experimental::optional pop() { - std::lock_guard guard(lock_); + std::lock_guard guard(lock_); if (size_ == 0) return std::experimental::nullopt; size_--; std::experimental::optional result( @@ -75,7 +75,7 @@ class RingBuffer { /** Removes all elements from the buffer. */ void clear() { - std::lock_guard guard(lock_); + std::lock_guard guard(lock_); read_pos_ = 0; write_pos_ = 0; size_ = 0; @@ -84,7 +84,7 @@ class RingBuffer { private: int capacity_; TElement *buffer_; - SpinLock lock_; + utils::SpinLock lock_; int read_pos_{0}; int write_pos_{0}; int size_{0}; diff --git a/src/database/storage_gc.hpp b/src/database/storage_gc.hpp index 241849224..990cb0255 100644 --- a/src/database/storage_gc.hpp +++ b/src/database/storage_gc.hpp @@ -14,6 +14,7 @@ #include "storage/vertex.hpp" #include "transactions/engine.hpp" #include "utils/scheduler.hpp" +#include "utils/timer.hpp" namespace database { diff --git a/src/distributed/produce_rpc_server.cpp b/src/distributed/produce_rpc_server.cpp index e7926113c..1b3d3a2e7 100644 --- a/src/distributed/produce_rpc_server.cpp +++ b/src/distributed/produce_rpc_server.cpp @@ -72,7 +72,7 @@ ProduceRpcServer::OngoingProduce::PullOneFromCursor() { } } catch (const mvcc::SerializationError &) { cursor_state_ = PullState::SERIALIZATION_ERROR; - } catch (const LockTimeoutException &) { + } catch (const utils::LockTimeoutException &) { cursor_state_ = PullState::LOCK_TIMEOUT_ERROR; } catch (const RecordDeletedError &) { cursor_state_ = PullState::UPDATE_DELETED_ERROR; diff --git a/src/distributed/rpc_worker_clients.hpp b/src/distributed/rpc_worker_clients.hpp index 01c5abc6a..5b7c5b043 100644 --- a/src/distributed/rpc_worker_clients.hpp +++ b/src/distributed/rpc_worker_clients.hpp @@ -11,9 +11,8 @@ #include "distributed/transactional_cache_cleaner_rpc_messages.hpp" #include "storage/types.hpp" #include "transactions/transaction.hpp" - -#include "threading/thread_pool.hpp" #include "utils/future.hpp" +#include "utils/thread.hpp" namespace distributed { @@ -76,7 +75,7 @@ class RpcWorkerClients { Coordination &coordination_; std::unordered_map client_pools_; std::mutex lock_; - threading::ThreadPool thread_pool_; + utils::ThreadPool thread_pool_; }; /** Wrapper class around a RPC call to build indices. diff --git a/src/distributed/updates_rpc_clients.cpp b/src/distributed/updates_rpc_clients.cpp index 0d29c5203..42e5f8ef7 100644 --- a/src/distributed/updates_rpc_clients.cpp +++ b/src/distributed/updates_rpc_clients.cpp @@ -4,6 +4,7 @@ #include "distributed/updates_rpc_clients.hpp" #include "query/exceptions.hpp" +#include "utils/thread/sync.hpp" namespace distributed { @@ -15,7 +16,7 @@ void RaiseIfRemoteError(UpdateResult result) { case UpdateResult::SERIALIZATION_ERROR: throw mvcc::SerializationError(); case UpdateResult::LOCK_TIMEOUT_ERROR: - throw LockTimeoutException( + throw utils::LockTimeoutException( "Remote LockTimeoutError during edge creation"); case UpdateResult::UPDATE_DELETED_ERROR: throw RecordDeletedError(); diff --git a/src/distributed/updates_rpc_server.cpp b/src/distributed/updates_rpc_server.cpp index 09e25a569..d3b0d9044 100644 --- a/src/distributed/updates_rpc_server.cpp +++ b/src/distributed/updates_rpc_server.cpp @@ -3,7 +3,7 @@ #include "glog/logging.h" #include "distributed/updates_rpc_server.hpp" -#include "threading/sync/lock_timeout_exception.hpp" +#include "utils/thread/sync.hpp" namespace distributed { @@ -13,7 +13,7 @@ UpdateResult UpdatesRpcServer::TransactionUpdates::Emplace( auto gid = std::is_same::value ? delta.vertex_id : delta.edge_id; - std::lock_guard guard{lock_}; + std::lock_guard guard{lock_}; auto found = deltas_.find(gid); if (found == deltas_.end()) { found = @@ -52,7 +52,7 @@ UpdateResult UpdatesRpcServer::TransactionUpdates::Emplace( // return UpdateResult::SERIALIZATION_ERROR; // } catch (const RecordDeletedError &) { // return UpdateResult::UPDATE_DELETED_ERROR; - // } catch (const LockTimeoutException &) { + // } catch (const utils::LockTimeoutException &) { // return UpdateResult::LOCK_TIMEOUT_ERROR; // } return UpdateResult::DONE; @@ -66,7 +66,7 @@ gid::Gid UpdatesRpcServer::TransactionUpdates::CreateVertex( auto result = db_accessor_.InsertVertex(); for (auto &label : labels) result.add_label(label); for (auto &kv : properties) result.PropsSet(kv.first, kv.second); - std::lock_guard guard{lock_}; + std::lock_guard guard{lock_}; deltas_.emplace(result.gid(), std::make_pair(result, std::vector{})); return result.gid(); @@ -80,7 +80,7 @@ gid::Gid UpdatesRpcServer::TransactionUpdates::CreateEdge( storage::VertexAddress(from, db.WorkerId())); auto to_addr = db.storage().LocalizedAddressIfPossible(to); auto edge = db_accessor_.InsertOnlyEdge(from_addr, to_addr, edge_type); - std::lock_guard guard{lock_}; + std::lock_guard guard{lock_}; deltas_.emplace(edge.gid(), std::make_pair(edge, std::vector{})); return edge.gid(); @@ -88,7 +88,7 @@ gid::Gid UpdatesRpcServer::TransactionUpdates::CreateEdge( template UpdateResult UpdatesRpcServer::TransactionUpdates::Apply() { - std::lock_guard guard{lock_}; + std::lock_guard guard{lock_}; for (auto &kv : deltas_) { auto &record_accessor = kv.second.first; // We need to reconstruct the record as in the meantime some local @@ -164,7 +164,7 @@ UpdateResult UpdatesRpcServer::TransactionUpdates::Apply() { return UpdateResult::SERIALIZATION_ERROR; } catch (const RecordDeletedError &) { return UpdateResult::UPDATE_DELETED_ERROR; - } catch (const LockTimeoutException &) { + } catch (const utils::LockTimeoutException &) { return UpdateResult::LOCK_TIMEOUT_ERROR; } } diff --git a/src/distributed/updates_rpc_server.hpp b/src/distributed/updates_rpc_server.hpp index de3bef334..dc81d9eb6 100644 --- a/src/distributed/updates_rpc_server.hpp +++ b/src/distributed/updates_rpc_server.hpp @@ -16,8 +16,8 @@ #include "storage/gid.hpp" #include "storage/types.hpp" #include "storage/vertex_accessor.hpp" -#include "threading/sync/spinlock.hpp" #include "transactions/type.hpp" +#include "utils/thread/sync.hpp" namespace distributed { @@ -62,7 +62,7 @@ class UpdatesRpcServer { gid::Gid, std::pair>> deltas_; // Multiple workers might be sending remote updates concurrently. - SpinLock lock_; + utils::SpinLock lock_; // Helper method specialized for [Vertex|Edge]Accessor. TRecordAccessor FindAccessor(gid::Gid gid); diff --git a/src/io/CMakeLists.txt b/src/io/CMakeLists.txt new file mode 100644 index 000000000..6c96a87ad --- /dev/null +++ b/src/io/CMakeLists.txt @@ -0,0 +1,10 @@ +set(io_src_files + network/addrinfo.cpp + network/endpoint.cpp + network/socket.cpp + network/utils.cpp) + +add_library(mg-io STATIC ${io_src_files}) +target_link_libraries(mg-io stdc++fs Threads::Threads fmt glog mg-utils) +# TODO: Remove this dependency when we switch to capnp +target_link_libraries(mg-io ${Boost_SERIALIZATION_LIBRARY_RELEASE}) diff --git a/src/io/network/socket.cpp b/src/io/network/socket.cpp index e8f427f96..b2ea57c2b 100644 --- a/src/io/network/socket.cpp +++ b/src/io/network/socket.cpp @@ -19,7 +19,6 @@ #include "glog/logging.h" #include "io/network/addrinfo.hpp" -#include "threading/sync/cpu_relax.hpp" #include "utils/likely.hpp" namespace io::network { diff --git a/src/io/network/stream_listener.hpp b/src/io/network/stream_listener.hpp deleted file mode 100644 index e69de29bb..000000000 diff --git a/src/mvcc/version_list.hpp b/src/mvcc/version_list.hpp index 96cfa7333..9f63e0fe5 100644 --- a/src/mvcc/version_list.hpp +++ b/src/mvcc/version_list.hpp @@ -2,7 +2,6 @@ #include "storage/gid.hpp" #include "storage/locking/record_lock.hpp" -#include "threading/sync/lockable.hpp" #include "transactions/transaction.hpp" #include "utils/exceptions.hpp" diff --git a/src/query/interpreter.cpp b/src/query/interpreter.cpp index 9245ed3fc..391541938 100644 --- a/src/query/interpreter.cpp +++ b/src/query/interpreter.cpp @@ -162,7 +162,7 @@ AstTreeStorage Interpreter::QueryToAst(const StrippedQuery &stripped, // stripped query -> AST auto parser = [&] { // Be careful about unlocking since parser can throw. - std::unique_lock guard(antlr_lock_); + std::unique_lock guard(antlr_lock_); return std::make_unique( stripped.original_query()); }(); @@ -178,7 +178,7 @@ AstTreeStorage Interpreter::QueryToAst(const StrippedQuery &stripped, // stripped query -> AST auto parser = [&] { // Be careful about unlocking since parser can throw. - std::unique_lock guard(antlr_lock_); + std::unique_lock guard(antlr_lock_); try { return std::make_unique(stripped.query()); } catch (const SyntaxException &e) { diff --git a/src/query/interpreter.hpp b/src/query/interpreter.hpp index 1c11d316f..2e6788a2f 100644 --- a/src/query/interpreter.hpp +++ b/src/query/interpreter.hpp @@ -11,7 +11,7 @@ #include "query/interpret/frame.hpp" #include "query/plan/distributed.hpp" #include "query/plan/operator.hpp" -#include "threading/sync/spinlock.hpp" +#include "utils/thread/sync.hpp" #include "utils/timer.hpp" DECLARE_int32(query_plan_cache_ttl); @@ -173,7 +173,7 @@ class Interpreter { // can remove this lock. This will probably never happen since antlr // developers introduce more bugs in each version. Fortunately, we have cache // so this lock probably won't impact performance much... - SpinLock antlr_lock_; + utils::SpinLock antlr_lock_; // Optional, not null only in a distributed master. distributed::PlanDispatcher *plan_dispatcher_{nullptr}; diff --git a/src/query/plan/operator.cpp b/src/query/plan/operator.cpp index 9ea19f02c..42ce06778 100644 --- a/src/query/plan/operator.cpp +++ b/src/query/plan/operator.cpp @@ -30,6 +30,7 @@ #include "utils/algorithm.hpp" #include "utils/exceptions.hpp" #include "utils/hashing/fnv.hpp" +#include "utils/thread/sync.hpp" DEFINE_HIDDEN_int32(remote_pull_sleep_micros, 10, "Sleep between remote result pulling in microseconds"); @@ -3266,7 +3267,7 @@ class RemotePuller { throw mvcc::SerializationError( "Serialization error occured during PullRemote !"); case distributed::PullState::LOCK_TIMEOUT_ERROR: - throw LockTimeoutException( + throw utils::LockTimeoutException( "LockTimeout error occured during PullRemote !"); case distributed::PullState::UPDATE_DELETED_ERROR: throw QueryRuntimeException( @@ -3528,7 +3529,7 @@ class SynchronizeCursor : public Cursor { "Failed to perform remote accumulate due to " "RecordDeletedError"); case distributed::PullState::LOCK_TIMEOUT_ERROR: - throw LockTimeoutException( + throw utils::LockTimeoutException( "Failed to perform remote accumulate due to " "LockTimeoutException"); case distributed::PullState::RECONSTRUCTION_ERROR: @@ -3566,7 +3567,7 @@ class SynchronizeCursor : public Cursor { throw QueryRuntimeException( "Failed to apply deferred updates due to RecordDeletedError"); case distributed::UpdateResult::LOCK_TIMEOUT_ERROR: - throw LockTimeoutException( + throw utils::LockTimeoutException( "Failed to apply deferred update due to LockTimeoutException"); case distributed::UpdateResult::DONE: break; diff --git a/src/storage/dynamic_graph_partitioner/dgp.cpp b/src/storage/dynamic_graph_partitioner/dgp.cpp index 956fb7da1..c4c5633ff 100644 --- a/src/storage/dynamic_graph_partitioner/dgp.cpp +++ b/src/storage/dynamic_graph_partitioner/dgp.cpp @@ -9,6 +9,7 @@ #include "query/exceptions.hpp" #include "storage/dynamic_graph_partitioner/vertex_migrator.hpp" #include "utils/flag_validation.hpp" +#include "utils/thread/sync.hpp" DEFINE_VALIDATED_int32( dgp_improvement_threshold, 10, @@ -52,7 +53,7 @@ void DynamicGraphPartitioner::Run() { throw query::QueryRuntimeException( "Failed to apply deferred updates due to RecordDeletedError"); case distributed::UpdateResult::LOCK_TIMEOUT_ERROR: - throw LockTimeoutException( + throw utils::LockTimeoutException( "Failed to apply deferred update due to LockTimeoutException"); case distributed::UpdateResult::DONE: break; diff --git a/src/storage/locking/record_lock.cpp b/src/storage/locking/record_lock.cpp index 98d5a691b..0eb8531ff 100644 --- a/src/storage/locking/record_lock.cpp +++ b/src/storage/locking/record_lock.cpp @@ -6,9 +6,9 @@ #include #include -#include "threading/sync/lock_timeout_exception.hpp" #include "transactions/engine.hpp" #include "utils/on_scope_exit.hpp" +#include "utils/thread/sync.hpp" #include "utils/timer.hpp" namespace { @@ -102,7 +102,7 @@ LockStatus RecordLock::Lock(const tx::Transaction &tx, tx::Engine &engine) { // Message could be incorrect. Transaction could be aborted because it was // running for too long time, but that is unlikely and it is not very // important which exception (and message) we throw here. - throw LockTimeoutException( + throw utils::LockTimeoutException( "Transaction was aborted since it was oldest in a lock cycle"); } if (TryLock(tx.id_)) { @@ -119,10 +119,10 @@ LockStatus RecordLock::Lock(const tx::Transaction &tx, tx::Engine &engine) { it->second = owner; abort_oldest_tx_in_lock_cycle(); } - cpu_relax(); + utils::CpuRelax(); } - throw LockTimeoutException(fmt::format( + throw utils::LockTimeoutException(fmt::format( "Transaction locked for more than {} seconds", kTimeout.count())); } diff --git a/src/storage/locking/record_lock.hpp b/src/storage/locking/record_lock.hpp index 7ecd12775..231554f73 100644 --- a/src/storage/locking/record_lock.hpp +++ b/src/storage/locking/record_lock.hpp @@ -4,9 +4,7 @@ #include #include -#include "data_structures/concurrent/concurrent_map.hpp" #include "storage/locking/lock_status.hpp" -#include "threading/sync/futex.hpp" #include "transactions/type.hpp" namespace tx { diff --git a/src/storage/record_accessor.cpp b/src/storage/record_accessor.cpp index 32fb27e3c..675edd303 100644 --- a/src/storage/record_accessor.cpp +++ b/src/storage/record_accessor.cpp @@ -8,7 +8,7 @@ #include "storage/edge.hpp" #include "storage/record_accessor.hpp" #include "storage/vertex.hpp" -#include "threading/sync/lock_timeout_exception.hpp" +#include "utils/thread/sync.hpp" using database::StateDelta; @@ -214,7 +214,7 @@ void RecordAccessor::SendDelta( case distributed::UpdateResult::UPDATE_DELETED_ERROR: throw RecordDeletedError(); case distributed::UpdateResult::LOCK_TIMEOUT_ERROR: - throw LockTimeoutException("Lock timeout on remote worker"); + throw utils::LockTimeoutException("Lock timeout on remote worker"); } } diff --git a/src/threading/sync/caslock.hpp b/src/threading/sync/caslock.hpp deleted file mode 100644 index ad1ac8689..000000000 --- a/src/threading/sync/caslock.hpp +++ /dev/null @@ -1,23 +0,0 @@ -#pragma once - -#include -#include - -class CasLock { - public: - void lock() { - bool locked = false; - - while (!lock_flag.compare_exchange_weak( - locked, true, std::memory_order_release, std::memory_order_relaxed)) { - usleep(250); - } - } - - void unlock() { lock_flag.store(0, std::memory_order_release); } - - bool locked() { return lock_flag.load(std::memory_order_relaxed); } - - private: - std::atomic lock_flag; -}; diff --git a/src/threading/sync/cpu_relax.hpp b/src/threading/sync/cpu_relax.hpp deleted file mode 100644 index 442af7044..000000000 --- a/src/threading/sync/cpu_relax.hpp +++ /dev/null @@ -1,11 +0,0 @@ -#pragma once - -/* @brief improves contention in spinlocks - * hints the processor that we're in a spinlock and not doing much - */ -inline void cpu_relax() { - // if IBMPower - // HMT_very_low() - // http://stackoverflow.com/questions/5425506/equivalent-of-x86-pause-instruction-for-ppc - asm("PAUSE"); -} diff --git a/src/threading/sync/futex.hpp b/src/threading/sync/futex.hpp deleted file mode 100644 index 3f200664a..000000000 --- a/src/threading/sync/futex.hpp +++ /dev/null @@ -1,150 +0,0 @@ -#pragma once - -#include -#include -#include - -#include -#include -#include -#include - -#include "glog/logging.h" -#include "threading/sync/cpu_relax.hpp" -#include "threading/sync/lock_timeout_exception.hpp" - -namespace sys { -inline int futex(void *addr1, int op, int val1, const struct timespec *timeout, - void *addr2, int val3) { - return syscall(SYS_futex, addr1, op, val1, timeout, addr2, val3); -}; -} // namespace sys - -class Futex { - using futex_t = uint32_t; - using flag_t = uint8_t; - - /* @brief Data structure for implementing fast mutexes - * - * This structure is 4B wide, as required for futex system call where - * the last two bytes are used for two flags - contended and locked, - * respectively. Memory layout for the structure looks like this: - * - * all - * |---------------------------------| - * 00000000 00000000 0000000C 0000000L - * |------| |------| - * contended locked - * - * L marks the locked bit - * C marks the contended bit - */ - union mutex_t { - std::atomic all{0}; - - struct { - std::atomic locked; - std::atomic contended; - } state; - }; - - enum Contention : futex_t { UNCONTENDED = 0x0000, CONTENDED = 0x0100 }; - - enum State : futex_t { - UNLOCKED = 0x0000, - LOCKED = 0x0001, - UNLOCKED_CONTENDED = UNLOCKED | CONTENDED, // 0x0100 - LOCKED_CONTENDED = LOCKED | CONTENDED // 0x0101 - }; - - static constexpr size_t LOCK_RETRIES = 100; - static constexpr size_t UNLOCK_RETRIES = 200; - - public: - Futex() { - static_assert(sizeof(mutex_t) == sizeof(futex_t), - "Atomic futex should be the same size as non_atomic"); - } - - bool try_lock() { - // we took the lock if we stored the LOCKED state and previous - // state was UNLOCKED - return mutex.state.locked.exchange(LOCKED, std::memory_order_seq_cst) == - UNLOCKED; - } - - void lock(const struct timespec *timeout = nullptr) { - // try to fast lock a few times before going to sleep - for (size_t i = 0; i < LOCK_RETRIES; ++i) { - // try to lock and exit if we succeed - if (try_lock()) return; - - // we failed, chill a bit - relax(); - } - - // the lock is contended, go to sleep. when someone - // wakes you up, try taking the lock again - while (mutex.all.exchange(LOCKED_CONTENDED, std::memory_order_seq_cst) & - LOCKED) { - // wait in the kernel for someone to wake us up when unlocking - auto status = futex_wait(LOCKED_CONTENDED, timeout); - - // check if we woke up because of a timeout - if (status == -1 && errno == ETIMEDOUT) - throw LockTimeoutException("Lock timeout"); - } - } - - void unlock() { - futex_t state = LOCKED; - - // if we're locked and uncontended, try to unlock the mutex before - // it becomes contended - if (mutex.all.load(std::memory_order_seq_cst) == LOCKED && - mutex.all.compare_exchange_strong(state, UNLOCKED, - std::memory_order_seq_cst, - std::memory_order_seq_cst)) - return; - - // we are contended, just release the lock - mutex.state.locked.store(UNLOCKED, std::memory_order_seq_cst); - - // spin and hope someone takes a lock so we don't have to wake up - // anyone because that's quite expensive - for (size_t i = 0; i < UNLOCK_RETRIES; ++i) { - // if someone took the lock, we're ok - if (is_locked(std::memory_order_seq_cst)) return; - - relax(); - } - - // store that we are becoming uncontended - mutex.state.contended.store(UNCONTENDED, std::memory_order_seq_cst); - - // we need to wake someone up - futex_wake(LOCKED); - } - - bool is_locked(std::memory_order order = std::memory_order_seq_cst) const { - return mutex.state.locked.load(order); - } - - bool is_contended(std::memory_order order = std::memory_order_seq_cst) const { - return mutex.state.contended.load(order); - } - - private: - mutex_t mutex; - - int futex_wait(int value, const struct timespec *timeout = nullptr) { - return sys::futex(&mutex.all, FUTEX_WAIT_PRIVATE, value, timeout, nullptr, - 0); - } - - void futex_wake(int value) { - sys::futex(&mutex.all, FUTEX_WAKE_PRIVATE, value, nullptr, nullptr, 0); - } - - void relax() { cpu_relax(); } -}; diff --git a/src/threading/sync/lock_timeout_exception.hpp b/src/threading/sync/lock_timeout_exception.hpp deleted file mode 100644 index 5fd0e2cb0..000000000 --- a/src/threading/sync/lock_timeout_exception.hpp +++ /dev/null @@ -1,8 +0,0 @@ -#pragma once - -#include "utils/exceptions.hpp" - -class LockTimeoutException : public utils::BasicException { - public: - using utils::BasicException::BasicException; -}; diff --git a/src/threading/sync/lockable.hpp b/src/threading/sync/lockable.hpp deleted file mode 100644 index b68edf1f7..000000000 --- a/src/threading/sync/lockable.hpp +++ /dev/null @@ -1,31 +0,0 @@ -#pragma once - -#include - -#include "spinlock.hpp" - -/** - * @class Lockable - * - * @brief - * Lockable is used as an custom implementation of a mutex mechanism. It is - * implemented as a wrapper around std::lock_guard and std::unique_guard with - * a default lock called Spinlock. - * - * @tparam lock_t type of lock to be used (default = Spinlock) - */ -template -class Lockable { - public: - using lock_type = lock_t; - - std::lock_guard acquire_guard() const { - return std::lock_guard(lock); - } - - std::unique_lock acquire_unique() const { - return std::unique_lock(lock); - } - - mutable lock_t lock; -}; diff --git a/src/threading/sync/rwlock.hpp b/src/threading/sync/rwlock.hpp deleted file mode 100644 index 2752b0b64..000000000 --- a/src/threading/sync/rwlock.hpp +++ /dev/null @@ -1,46 +0,0 @@ -/// @file -#pragma once - -#include - -#include "glog/logging.h" - -namespace threading { - -/// By passing the appropriate parameter to the `RWLock` constructor, it is -/// possible to control the behavior of `RWLock` while shared lock is held. If -/// the priority is set to `READ`, new shared (read) locks can be obtained even -/// though there is a thread waiting for an exclusive (write) lock, which can -/// lead to writer starvation. If the priority is set to `WRITE`, readers will -/// be blocked from obtaining new shared locks while there are writers waiting, -/// which can lead to reader starvation. -enum RWLockPriority { READ, WRITE }; - -/// A wrapper around `pthread_rwlock_t`, useful because it is not possible to -/// choose read or write priority for `std::shared_mutex`. -class RWLock { - public: - RWLock(const RWLock &) = delete; - RWLock &operator=(const RWLock &) = delete; - RWLock(RWLock &&) = delete; - RWLock &operator=(RWLock &&) = delete; - - /// Construct a RWLock object with chosen priority. See comment above - /// `RWLockPriority` for details. - explicit RWLock(RWLockPriority priority); - - ~RWLock(); - - bool try_lock(); - void lock(); - void unlock(); - - bool try_lock_shared(); - void lock_shared(); - void unlock_shared(); - - private: - pthread_rwlock_t lock_ = PTHREAD_RWLOCK_INITIALIZER; -}; - -} // namespace threading diff --git a/src/threading/sync/spinlock.hpp b/src/threading/sync/spinlock.hpp deleted file mode 100644 index b68f27a6e..000000000 --- a/src/threading/sync/spinlock.hpp +++ /dev/null @@ -1,34 +0,0 @@ -#pragma once - -#include -#include -#include - -#include "threading/sync/cpu_relax.hpp" -#include "utils/timer.hpp" - -/** - * @class SpinLock - * - * @brief - * Spinlock is used as an locking mechanism based on an atomic flag and - * waiting loops. It uses the cpu_relax "asm pause" command to optimize wasted - * time while the threads are waiting. - * - */ -class SpinLock { - public: - void lock() { // Before was memory_order_acquire - while (lock_flag_.test_and_set()) { - cpu_relax(); - } - } - // Before was memory_order_release - void unlock() { lock_flag_.clear(); } - - bool try_lock() { return !lock_flag_.test_and_set(); } - - private: - // guaranteed by standard to be lock free! - mutable std::atomic_flag lock_flag_ = ATOMIC_FLAG_INIT; -}; diff --git a/src/threading/sync/timed_spinlock.hpp b/src/threading/sync/timed_spinlock.hpp deleted file mode 100644 index bc9f8883b..000000000 --- a/src/threading/sync/timed_spinlock.hpp +++ /dev/null @@ -1,39 +0,0 @@ -#pragma once - -#include -#include -#include -#include - -#include "threading/sync/lock_timeout_exception.hpp" - -template -class TimedSpinLock { - public: - TimedSpinLock(std::chrono::seconds expiration) : expiration_(expiration) {} - - void lock() { - using clock = std::chrono::high_resolution_clock; - - auto start = clock::now(); - - while (!lock_flag.test_and_set(std::memory_order_acquire)) { - // how long have we been locked? if we exceeded the expiration - // time, throw an exception and stop being blocked because this - // might be a deadlock! - - if (clock::now() - start > expiration_) - throw LockTimeoutException("This lock has expired"); - - usleep(microseconds); - } - } - - void unlock() { lock_flag.clear(std::memory_order_release); } - - private: - std::chrono::milliseconds expiration_; - - // guaranteed by standard to be lock free! - std::atomic_flag lock_flag = ATOMIC_FLAG_INIT; -}; diff --git a/src/threading/thread.cpp b/src/threading/thread.cpp deleted file mode 100644 index b35da81bc..000000000 --- a/src/threading/thread.cpp +++ /dev/null @@ -1,12 +0,0 @@ -#include "glog/logging.h" -#include "thread.hpp" - -Thread::Thread(Thread &&other) { - DCHECK(thread_id == UNINITIALIZED) << "Thread was initialized before."; - thread_id = other.thread_id; - thread = std::move(other.thread); -} - -void Thread::join() { return thread.join(); } - -std::atomic Thread::thread_counter{1}; diff --git a/src/threading/thread.hpp b/src/threading/thread.hpp deleted file mode 100644 index d28bb94aa..000000000 --- a/src/threading/thread.hpp +++ /dev/null @@ -1,39 +0,0 @@ -#pragma once - -#include -#include - -class Thread { - static std::atomic thread_counter; - - public: - static size_t count(std::memory_order order = std::memory_order_seq_cst) { - return thread_counter.load(order); - } - - static constexpr unsigned UNINITIALIZED = -1; - static constexpr unsigned MAIN_THREAD = 0; - - template - explicit Thread(F f) { - thread_id = thread_counter.fetch_add(1, std::memory_order_acq_rel); - thread = std::thread([this, f]() { start_thread(f); }); - } - - Thread() = default; - Thread(const Thread &) = delete; - - Thread(Thread &&other); - - void join(); - - private: - unsigned thread_id = UNINITIALIZED; - std::thread thread; - - template - void start_thread(F &&f) { - // this_thread::id = thread_id; - f(); - } -}; diff --git a/src/threading/thread_pool.hpp b/src/threading/thread_pool.hpp deleted file mode 100644 index 68f5de4d6..000000000 --- a/src/threading/thread_pool.hpp +++ /dev/null @@ -1,82 +0,0 @@ -#pragma once -/// @file - -#include -#include -#include -#include -#include -#include -#include -#include - -#include "glog/logging.h" - -#include "utils/future.hpp" - -namespace threading { - -/// A thread pool for asynchronous task execution. Supports tasks that produce -/// return values by returning `utils::Future` objects. -class ThreadPool { - public: - /// Creates a thread pool with the given number of threads. - explicit ThreadPool(size_t threads) { - for (size_t i = 0; i < threads; ++i) - workers_.emplace_back([this] { - while (true) { - std::function task; - { - std::unique_lock lock(mutex_); - cvar_.wait(lock, [this] { return stop_ || !tasks_.empty(); }); - if (stop_ && tasks_.empty()) return; - task = std::move(tasks_.front()); - tasks_.pop(); - } - task(); - } - }); - } - - ThreadPool(const ThreadPool &) = delete; - ThreadPool(ThreadPool &&) = delete; - ThreadPool &operator=(const ThreadPool &) = delete; - ThreadPool &operator=(ThreadPool &&) = delete; - - /// Runs the given callable with the given args, asynchronously. This function - /// immediately returns an `utils::Future` with the result, to be - /// consumed when ready. - template - auto Run(TCallable &&callable, TArgs &&... args) { - auto task = std::make_shared< - std::packaged_task()>>(std::bind( - std::forward(callable), std::forward(args)...)); - - auto res = utils::make_future(task->get_future()); - - std::unique_lock lock(mutex_); - CHECK(!stop_) << "ThreadPool::Run called on stopped ThreadPool."; - tasks_.emplace([task]() { (*task)(); }); - lock.unlock(); - cvar_.notify_one(); - return res; - } - - ~ThreadPool() { - std::unique_lock lock(mutex_); - stop_ = true; - lock.unlock(); - cvar_.notify_all(); - for (std::thread &worker : workers_) { - if (worker.joinable()) worker.join(); - } - } - - private: - std::vector workers_; - std::queue> tasks_; - std::mutex mutex_; - std::condition_variable cvar_; - bool stop_{false}; -}; -} // namespace threading diff --git a/src/transactions/engine.hpp b/src/transactions/engine.hpp index 771cd1d23..04cb93ba3 100644 --- a/src/transactions/engine.hpp +++ b/src/transactions/engine.hpp @@ -5,7 +5,6 @@ #include #include "data_structures/concurrent/concurrent_map.hpp" -#include "threading/sync/spinlock.hpp" #include "transactions/commit_log.hpp" #include "transactions/transaction.hpp" #include "transactions/type.hpp" diff --git a/src/transactions/engine_single_node.cpp b/src/transactions/engine_single_node.cpp index dacc4c233..71dedfffa 100644 --- a/src/transactions/engine_single_node.cpp +++ b/src/transactions/engine_single_node.cpp @@ -14,7 +14,7 @@ SingleNodeEngine::SingleNodeEngine(durability::WriteAheadLog *wal) Transaction *SingleNodeEngine::Begin() { VLOG(11) << "[Tx] Starting transaction " << counter_ + 1; - std::lock_guard guard(lock_); + std::lock_guard guard(lock_); TransactionId id{++counter_}; auto t = new Transaction(id, active_, *this); @@ -27,7 +27,7 @@ Transaction *SingleNodeEngine::Begin() { } CommandId SingleNodeEngine::Advance(TransactionId id) { - std::lock_guard guard(lock_); + std::lock_guard guard(lock_); auto it = store_.find(id); DCHECK(it != store_.end()) @@ -43,7 +43,7 @@ CommandId SingleNodeEngine::Advance(TransactionId id) { } CommandId SingleNodeEngine::UpdateCommand(TransactionId id) { - std::lock_guard guard(lock_); + std::lock_guard guard(lock_); auto it = store_.find(id); DCHECK(it != store_.end()) << "Transaction::advance on non-existing transaction"; @@ -52,7 +52,7 @@ CommandId SingleNodeEngine::UpdateCommand(TransactionId id) { void SingleNodeEngine::Commit(const Transaction &t) { VLOG(11) << "[Tx] Commiting transaction " << t.id_; - std::lock_guard guard(lock_); + std::lock_guard guard(lock_); clog_.set_committed(t.id_); active_.remove(t.id_); if (wal_) { @@ -63,7 +63,7 @@ void SingleNodeEngine::Commit(const Transaction &t) { void SingleNodeEngine::Abort(const Transaction &t) { VLOG(11) << "[Tx] Aborting transaction " << t.id_; - std::lock_guard guard(lock_); + std::lock_guard guard(lock_); clog_.set_aborted(t.id_); active_.remove(t.id_); if (wal_) { @@ -77,7 +77,7 @@ CommitLog::Info SingleNodeEngine::Info(TransactionId tx) const { } Snapshot SingleNodeEngine::GlobalGcSnapshot() { - std::lock_guard guard(lock_); + std::lock_guard guard(lock_); // No active transactions. if (active_.size() == 0) { @@ -93,20 +93,20 @@ Snapshot SingleNodeEngine::GlobalGcSnapshot() { } Snapshot SingleNodeEngine::GlobalActiveTransactions() { - std::lock_guard guard(lock_); + std::lock_guard guard(lock_); Snapshot active_transactions = active_; return active_transactions; } TransactionId SingleNodeEngine::LocalLast() const { - std::lock_guard guard(lock_); + std::lock_guard guard(lock_); return counter_; } TransactionId SingleNodeEngine::GlobalLast() const { return LocalLast(); } TransactionId SingleNodeEngine::LocalOldestActive() const { - std::lock_guard guard(lock_); + std::lock_guard guard(lock_); return active_.empty() ? counter_ + 1 : active_.front(); } @@ -116,14 +116,14 @@ void SingleNodeEngine::GarbageCollectCommitLog(TransactionId tx_id) { void SingleNodeEngine::LocalForEachActiveTransaction( std::function f) { - std::lock_guard guard(lock_); + std::lock_guard guard(lock_); for (auto transaction : active_) { f(*store_.find(transaction)->second); } } Transaction *SingleNodeEngine::RunningTransaction(TransactionId tx_id) { - std::lock_guard guard(lock_); + std::lock_guard guard(lock_); auto found = store_.find(tx_id); CHECK(found != store_.end()) << "Can't return snapshot for an inactive transaction"; @@ -131,7 +131,7 @@ Transaction *SingleNodeEngine::RunningTransaction(TransactionId tx_id) { } void SingleNodeEngine::EnsureNextIdGreater(TransactionId tx_id) { - std::lock_guard guard(lock_); + std::lock_guard guard(lock_); counter_ = std::max(tx_id, counter_); } diff --git a/src/transactions/engine_single_node.hpp b/src/transactions/engine_single_node.hpp index 85d88c0f1..ddc09918a 100644 --- a/src/transactions/engine_single_node.hpp +++ b/src/transactions/engine_single_node.hpp @@ -5,11 +5,11 @@ #include #include "durability/wal.hpp" -#include "threading/sync/spinlock.hpp" #include "transactions/commit_log.hpp" #include "transactions/engine.hpp" #include "transactions/transaction.hpp" #include "utils/exceptions.hpp" +#include "utils/thread/sync.hpp" namespace tx { @@ -51,7 +51,7 @@ class SingleNodeEngine : public Engine { CommitLog clog_; std::unordered_map> store_; Snapshot active_; - mutable SpinLock lock_; + mutable utils::SpinLock lock_; // Optional. If present, the Engine will write tx Begin/Commit/Abort // atomically (while under lock). durability::WriteAheadLog *wal_{nullptr}; diff --git a/src/transactions/lock_store.hpp b/src/transactions/lock_store.hpp index 9400fb430..1cea9d451 100644 --- a/src/transactions/lock_store.hpp +++ b/src/transactions/lock_store.hpp @@ -7,8 +7,8 @@ #include "glog/logging.h" #include "storage/locking/lock_status.hpp" #include "storage/locking/record_lock.hpp" -#include "threading/sync/spinlock.hpp" #include "transactions/type.hpp" +#include "utils/thread/sync.hpp" namespace tx { @@ -69,7 +69,7 @@ class LockStore { // same time. IMPORTANT: This guard must come after LockHolder construction, // as that potentially takes a long time and this guard only needs to // protect locks_ update. - std::lock_guard guard{locks_lock_}; + std::lock_guard guard{locks_lock_}; locks_.emplace_back(std::move(holder)); if (!locks_.back().active()) { locks_.pop_back(); @@ -77,7 +77,7 @@ class LockStore { } private: - SpinLock locks_lock_; + utils::SpinLock locks_lock_; std::vector locks_; }; } // namespace tx diff --git a/src/transactions/transaction.hpp b/src/transactions/transaction.hpp index 9613023d2..68d57bbcc 100644 --- a/src/transactions/transaction.hpp +++ b/src/transactions/transaction.hpp @@ -8,8 +8,6 @@ #include "data_structures/concurrent/concurrent_map.hpp" #include "storage/locking/record_lock.hpp" -#include "threading/sync/lockable.hpp" -#include "threading/sync/spinlock.hpp" #include "transactions/lock_store.hpp" #include "transactions/snapshot.hpp" #include "transactions/type.hpp" diff --git a/src/utils/CMakeLists.txt b/src/utils/CMakeLists.txt index 1fc9c9d12..dc56d4c3e 100644 --- a/src/utils/CMakeLists.txt +++ b/src/utils/CMakeLists.txt @@ -2,6 +2,8 @@ set(utils_src_files demangle.cpp file.cpp signals.cpp + thread.cpp + thread/sync.cpp watchdog.cpp) add_library(mg-utils STATIC ${utils_src_files}) diff --git a/src/utils/random_graph_generator.hpp b/src/utils/random_graph_generator.hpp index 878ee3cee..41bbe307b 100644 --- a/src/utils/random_graph_generator.hpp +++ b/src/utils/random_graph_generator.hpp @@ -13,7 +13,6 @@ #include "storage/property_value.hpp" #include "storage/types.hpp" #include "storage/vertex_accessor.hpp" -#include "threading/sync/lock_timeout_exception.hpp" namespace utils { diff --git a/src/utils/thread.cpp b/src/utils/thread.cpp new file mode 100644 index 000000000..05c73e833 --- /dev/null +++ b/src/utils/thread.cpp @@ -0,0 +1,52 @@ +#include "utils/thread.hpp" + +#include + +#include + +namespace utils { + +void ThreadSetName(const std::string &name) { + CHECK(name.size() <= 16) << "Thread name '" << name << "'too long"; + LOG_IF(WARNING, prctl(PR_SET_NAME, name.c_str()) != 0) + << "Couldn't set thread name: " << name << "!"; +} + +Thread::Thread(Thread &&other) { + DCHECK(thread_id == UNINITIALIZED) << "Thread was initialized before."; + thread_id = other.thread_id; + thread = std::move(other.thread); +} + +void Thread::join() { return thread.join(); } + +std::atomic Thread::thread_counter{1}; + +ThreadPool::ThreadPool(size_t threads) { + for (size_t i = 0; i < threads; ++i) + workers_.emplace_back([this] { + while (true) { + std::function task; + { + std::unique_lock lock(mutex_); + cvar_.wait(lock, [this] { return stop_ || !tasks_.empty(); }); + if (stop_ && tasks_.empty()) return; + task = std::move(tasks_.front()); + tasks_.pop(); + } + task(); + } + }); +} + +ThreadPool::~ThreadPool() { + std::unique_lock lock(mutex_); + stop_ = true; + lock.unlock(); + cvar_.notify_all(); + for (std::thread &worker : workers_) { + if (worker.joinable()) worker.join(); + } +} + +} // namespace utils diff --git a/src/utils/thread.hpp b/src/utils/thread.hpp index cd3e50e6a..e0d7b226e 100644 --- a/src/utils/thread.hpp +++ b/src/utils/thread.hpp @@ -1,21 +1,99 @@ +/// @file #pragma once -#include - +#include +#include +#include +#include +#include +#include +#include #include +#include #include +#include "utils/future.hpp" + namespace utils { -/** - * This function sets the thread name of the calling thread. - * Beware, the name length limit is 16 characters! - */ -inline void ThreadSetName(const std::string &name) { - CHECK(name.size() <= 16) << "Thread name '" << name << "'too long"; - LOG_IF(WARNING, prctl(PR_SET_NAME, name.c_str()) != 0) - << "Couldn't set thread name: " << name << "!"; -} +/// This function sets the thread name of the calling thread. +/// Beware, the name length limit is 16 characters! +void ThreadSetName(const std::string &name); + +class Thread { + static std::atomic thread_counter; + + public: + static size_t count(std::memory_order order = std::memory_order_seq_cst) { + return thread_counter.load(order); + } + + static constexpr unsigned UNINITIALIZED = -1; + static constexpr unsigned MAIN_THREAD = 0; + + template + explicit Thread(F f) { + thread_id = thread_counter.fetch_add(1, std::memory_order_acq_rel); + thread = std::thread([this, f]() { start_thread(f); }); + } + + Thread() = default; + Thread(const Thread &) = delete; + + Thread(Thread &&other); + + void join(); + + private: + unsigned thread_id = UNINITIALIZED; + std::thread thread; + + template + void start_thread(F &&f) { + // this_thread::id = thread_id; + f(); + } +}; + +/// A thread pool for asynchronous task execution. Supports tasks that produce +/// return values by returning `utils::Future` objects. +class ThreadPool final { + public: + /// Creates a thread pool with the given number of threads. + explicit ThreadPool(size_t threads); + ~ThreadPool(); + + ThreadPool(const ThreadPool &) = delete; + ThreadPool(ThreadPool &&) = delete; + ThreadPool &operator=(const ThreadPool &) = delete; + ThreadPool &operator=(ThreadPool &&) = delete; + + /// Runs the given callable with the given args, asynchronously. This function + /// immediately returns an `utils::Future` with the result, to be + /// consumed when ready. + template + auto Run(TCallable &&callable, TArgs &&... args) { + auto task = std::make_shared< + std::packaged_task()>>(std::bind( + std::forward(callable), std::forward(args)...)); + + auto res = utils::make_future(task->get_future()); + + std::unique_lock lock(mutex_); + CHECK(!stop_) << "ThreadPool::Run called on stopped ThreadPool."; + tasks_.emplace([task]() { (*task)(); }); + lock.unlock(); + cvar_.notify_one(); + return res; + } + + private: + std::vector workers_; + std::queue> tasks_; + std::mutex mutex_; + std::condition_variable cvar_; + bool stop_{false}; +}; }; // namespace utils diff --git a/src/threading/sync/rwlock.cpp b/src/utils/thread/sync.cpp similarity index 50% rename from src/threading/sync/rwlock.cpp rename to src/utils/thread/sync.cpp index 70b37a002..0e7ab2bfd 100644 --- a/src/threading/sync/rwlock.cpp +++ b/src/utils/thread/sync.cpp @@ -1,6 +1,19 @@ -#include "threading/sync/rwlock.hpp" +#include "utils/thread/sync.hpp" -namespace threading { +#include +#include +#include +#include + +namespace sys { +inline int futex(void *addr1, int op, int val1, const struct timespec *timeout, + void *addr2, int val3) { + return syscall(SYS_futex, addr1, op, val1, timeout, addr2, val3); +}; + +} // namespace sys + +namespace utils { RWLock::RWLock(RWLockPriority priority) { int err; @@ -87,4 +100,65 @@ void RWLock::unlock_shared() { } } -} // namespace threading +void Futex::lock(const struct timespec *timeout) { + // try to fast lock a few times before going to sleep + for (size_t i = 0; i < LOCK_RETRIES; ++i) { + // try to lock and exit if we succeed + if (try_lock()) return; + + // we failed, chill a bit + relax(); + } + + // the lock is contended, go to sleep. when someone + // wakes you up, try taking the lock again + while (mutex.all.exchange(LOCKED_CONTENDED, std::memory_order_seq_cst) & + LOCKED) { + // wait in the kernel for someone to wake us up when unlocking + auto status = futex_wait(LOCKED_CONTENDED, timeout); + + // check if we woke up because of a timeout + if (status == -1 && errno == ETIMEDOUT) + throw LockTimeoutException("Lock timeout"); + } +} + +void Futex::unlock() { + futex_t state = LOCKED; + + // if we're locked and uncontended, try to unlock the mutex before + // it becomes contended + if (mutex.all.load(std::memory_order_seq_cst) == LOCKED && + mutex.all.compare_exchange_strong(state, UNLOCKED, + std::memory_order_seq_cst, + std::memory_order_seq_cst)) + return; + + // we are contended, just release the lock + mutex.state.locked.store(UNLOCKED, std::memory_order_seq_cst); + + // spin and hope someone takes a lock so we don't have to wake up + // anyone because that's quite expensive + for (size_t i = 0; i < UNLOCK_RETRIES; ++i) { + // if someone took the lock, we're ok + if (is_locked(std::memory_order_seq_cst)) return; + + relax(); + } + + // store that we are becoming uncontended + mutex.state.contended.store(UNCONTENDED, std::memory_order_seq_cst); + + // we need to wake someone up + futex_wake(LOCKED); +} + +int Futex::futex_wait(int value, const struct timespec *timeout) { + return sys::futex(&mutex.all, FUTEX_WAIT_PRIVATE, value, timeout, nullptr, 0); +} + +void Futex::futex_wake(int value) { + sys::futex(&mutex.all, FUTEX_WAKE_PRIVATE, value, nullptr, nullptr, 0); +} + +} // namespace utils diff --git a/src/utils/thread/sync.hpp b/src/utils/thread/sync.hpp new file mode 100644 index 000000000..5371f7b2f --- /dev/null +++ b/src/utils/thread/sync.hpp @@ -0,0 +1,235 @@ +/// @file +#pragma once + +#include +#include + +#include +#include +#include +#include +#include + +#include "utils/exceptions.hpp" + +namespace utils { + +/// Improves contention in spinlocks by hinting the processor that we're in a +/// spinlock and not doing much. +inline void CpuRelax() { + // if IBMPower + // HMT_very_low() + // http://stackoverflow.com/questions/5425506/equivalent-of-x86-pause-instruction-for-ppc + asm("PAUSE"); +} + +class LockTimeoutException : public BasicException { + public: + using BasicException::BasicException; +}; + +class CasLock { + public: + void lock() { + bool locked = false; + + while (!lock_flag.compare_exchange_weak( + locked, true, std::memory_order_release, std::memory_order_relaxed)) { + usleep(250); + } + } + + void unlock() { lock_flag.store(0, std::memory_order_release); } + + bool locked() { return lock_flag.load(std::memory_order_relaxed); } + + private: + std::atomic lock_flag; +}; + +/// Spinlock is used as a locking mechanism based on an atomic flag and waiting +/// loops. +/// +/// It uses the CpuRelax "asm pause" command to optimize wasted time while the +/// threads are waiting. +class SpinLock { + public: + void lock() { // Before was memory_order_acquire + while (lock_flag_.test_and_set()) { + CpuRelax(); + } + } + // Before was memory_order_release + void unlock() { lock_flag_.clear(); } + + bool try_lock() { return !lock_flag_.test_and_set(); } + + private: + // guaranteed by standard to be lock free! + mutable std::atomic_flag lock_flag_ = ATOMIC_FLAG_INIT; +}; + +template +class TimedSpinLock { + public: + TimedSpinLock(std::chrono::seconds expiration) : expiration_(expiration) {} + + void lock() { + using clock = std::chrono::high_resolution_clock; + + auto start = clock::now(); + + while (!lock_flag.test_and_set(std::memory_order_acquire)) { + // how long have we been locked? if we exceeded the expiration + // time, throw an exception and stop being blocked because this + // might be a deadlock! + + if (clock::now() - start > expiration_) + throw LockTimeoutException("This lock has expired"); + + usleep(microseconds); + } + } + + void unlock() { lock_flag.clear(std::memory_order_release); } + + private: + std::chrono::milliseconds expiration_; + + // guaranteed by standard to be lock free! + std::atomic_flag lock_flag = ATOMIC_FLAG_INIT; +}; + +/// By passing the appropriate parameter to the `RWLock` constructor, it is +/// possible to control the behavior of `RWLock` while shared lock is held. If +/// the priority is set to `READ`, new shared (read) locks can be obtained even +/// though there is a thread waiting for an exclusive (write) lock, which can +/// lead to writer starvation. If the priority is set to `WRITE`, readers will +/// be blocked from obtaining new shared locks while there are writers waiting, +/// which can lead to reader starvation. +enum RWLockPriority { READ, WRITE }; + +/// A wrapper around `pthread_rwlock_t`, useful because it is not possible to +/// choose read or write priority for `std::shared_mutex`. +class RWLock { + public: + RWLock(const RWLock &) = delete; + RWLock &operator=(const RWLock &) = delete; + RWLock(RWLock &&) = delete; + RWLock &operator=(RWLock &&) = delete; + + /// Construct a RWLock object with chosen priority. See comment above + /// `RWLockPriority` for details. + explicit RWLock(RWLockPriority priority); + + ~RWLock(); + + bool try_lock(); + void lock(); + void unlock(); + + bool try_lock_shared(); + void lock_shared(); + void unlock_shared(); + + private: + pthread_rwlock_t lock_ = PTHREAD_RWLOCK_INITIALIZER; +}; + +/// Lockable is used as an custom implementation of a mutex mechanism. +/// +/// It is implemented as a wrapper around std::lock_guard and std::unique_guard +/// with a default lock called Spinlock. +/// +/// @tparam lock_t type of lock to be used (default = Spinlock) +template +class Lockable { + public: + using lock_type = lock_t; + + std::lock_guard acquire_guard() const { + return std::lock_guard(lock); + } + + std::unique_lock acquire_unique() const { + return std::unique_lock(lock); + } + + mutable lock_t lock; +}; + +class Futex { + using futex_t = uint32_t; + using flag_t = uint8_t; + + /// Data structure for implementing fast mutexes + /// + /// This structure is 4B wide, as required for futex system call where + /// the last two bytes are used for two flags - contended and locked, + /// respectively. Memory layout for the structure looks like this: + /// + /// all + /// |---------------------------------| + /// 00000000 00000000 0000000C 0000000L + /// |------| |------| + /// contended locked + /// + /// L marks the locked bit + /// C marks the contended bit + union mutex_t { + std::atomic all{0}; + + struct { + std::atomic locked; + std::atomic contended; + } state; + }; + + enum Contention : futex_t { UNCONTENDED = 0x0000, CONTENDED = 0x0100 }; + + enum State : futex_t { + UNLOCKED = 0x0000, + LOCKED = 0x0001, + UNLOCKED_CONTENDED = UNLOCKED | CONTENDED, // 0x0100 + LOCKED_CONTENDED = LOCKED | CONTENDED // 0x0101 + }; + + static constexpr size_t LOCK_RETRIES = 100; + static constexpr size_t UNLOCK_RETRIES = 200; + + public: + Futex() { + static_assert(sizeof(mutex_t) == sizeof(futex_t), + "Atomic futex should be the same size as non_atomic"); + } + + bool try_lock() { + // we took the lock if we stored the LOCKED state and previous + // state was UNLOCKED + return mutex.state.locked.exchange(LOCKED, std::memory_order_seq_cst) == + UNLOCKED; + } + + void lock(const struct timespec *timeout = nullptr); + + void unlock(); + + bool is_locked(std::memory_order order = std::memory_order_seq_cst) const { + return mutex.state.locked.load(order); + } + + bool is_contended(std::memory_order order = std::memory_order_seq_cst) const { + return mutex.state.contended.load(order); + } + + private: + mutex_t mutex; + + int futex_wait(int value, const struct timespec *timeout = nullptr); + + void futex_wake(int value); + + void relax() { CpuRelax(); } +}; + +} // namespace utils diff --git a/tests/concurrent/futex.cpp b/tests/concurrent/futex.cpp index 1f9df9e1d..e44a453b7 100644 --- a/tests/concurrent/futex.cpp +++ b/tests/concurrent/futex.cpp @@ -3,9 +3,11 @@ #include #include -#include "threading/sync/futex.hpp" +#include -Futex futex; +#include "utils/thread/sync.hpp" + +utils::Futex futex; int x = 0; /** @@ -19,7 +21,7 @@ void test_lock(int) { // TODO: create long running test for (int i = 0; i < 5; ++i) { { - std::unique_lock guard(futex); + std::unique_lock guard(futex); x++; std::this_thread::sleep_for(std::chrono::milliseconds(dis(gen))); CHECK(x == 1) << "Other thread shouldn't be able to " diff --git a/tests/concurrent/spinlock.cpp b/tests/concurrent/spinlock.cpp index b8baeacfa..2dd1077a0 100644 --- a/tests/concurrent/spinlock.cpp +++ b/tests/concurrent/spinlock.cpp @@ -6,16 +6,16 @@ #include "glog/logging.h" -#include "threading/sync/spinlock.hpp" +#include "utils/thread/sync.hpp" int x = 0; -SpinLock lock; +utils::SpinLock lock; void test_lock() { using namespace std::literals; { - std::unique_lock guard(lock); + std::unique_lock guard(lock); x++; std::this_thread::sleep_for(25ms); diff --git a/tests/macro_benchmark/clients/card_fraud_client.cpp b/tests/macro_benchmark/clients/card_fraud_client.cpp index 7971a1def..3a70a64c9 100644 --- a/tests/macro_benchmark/clients/card_fraud_client.cpp +++ b/tests/macro_benchmark/clients/card_fraud_client.cpp @@ -7,7 +7,7 @@ #include "stats/stats.hpp" #include "stats/stats_rpc_messages.hpp" -#include "threading/sync/rwlock.hpp" +#include "utils/thread/sync.hpp" #include "long_running_common.hpp" @@ -25,7 +25,7 @@ std::atomic num_cards; std::atomic num_transactions; std::atomic max_tx_id; -threading::RWLock world_lock(threading::RWLockPriority::WRITE); +utils::RWLock world_lock(utils::RWLockPriority::WRITE); DEFINE_string(config, "", "test config"); @@ -201,12 +201,12 @@ class CardFraudClient : public TestClient { void AnalyticStep() { std::this_thread::sleep_for( std::chrono::milliseconds(config_["analytic"]["query_interval_ms"])); - std::shared_lock lock(world_lock); + std::shared_lock lock(world_lock); GetCompromisedPosInc(config_["analytic"]["pos_limit"]); } void WorkerStep() { - std::shared_lock lock(world_lock); + std::shared_lock lock(world_lock); bool is_fraud = UniformDouble(0, 1) < config_["fraud_probability"]; int64_t pos_id = UniformInt(0, num_pos - 1); @@ -243,7 +243,7 @@ class CardFraudClient : public TestClient { void CleanupStep() { if (num_transactions >= config_["cleanup"]["tx_hi"].get()) { LOG(INFO) << "Trying to obtain world lock..."; - std::unique_lock lock(world_lock); + std::unique_lock lock(world_lock); int64_t id_limit = max_tx_id - config_["cleanup"]["tx_lo"].get() + 1; LOG(INFO) << "Transaction cleanup started, deleting transactions " "with ids less than " diff --git a/tests/macro_benchmark/clients/common.hpp b/tests/macro_benchmark/clients/common.hpp index a932f7566..b7039b690 100644 --- a/tests/macro_benchmark/clients/common.hpp +++ b/tests/macro_benchmark/clients/common.hpp @@ -75,7 +75,7 @@ std::pair ExecuteNTimesTillSuccess( utils::Timer t; std::chrono::microseconds to_sleep(rand_dist_(pseudo_rand_gen_)); while (t.Elapsed() < to_sleep) { - cpu_relax(); + utils::CpuRelax(); } } } diff --git a/tests/macro_benchmark/clients/graph_500_bfs.cpp b/tests/macro_benchmark/clients/graph_500_bfs.cpp index 68053a373..0e6d80c34 100644 --- a/tests/macro_benchmark/clients/graph_500_bfs.cpp +++ b/tests/macro_benchmark/clients/graph_500_bfs.cpp @@ -8,7 +8,6 @@ #include "long_running_common.hpp" #include "stats/stats.hpp" #include "stats/stats_rpc_messages.hpp" -#include "threading/sync/rwlock.hpp" class Graph500BfsClient : public TestClient { public: diff --git a/tests/macro_benchmark/clients/long_running_common.hpp b/tests/macro_benchmark/clients/long_running_common.hpp index 11f5c49f4..40b94afb0 100644 --- a/tests/macro_benchmark/clients/long_running_common.hpp +++ b/tests/macro_benchmark/clients/long_running_common.hpp @@ -36,7 +36,7 @@ class TestClient { virtual ~TestClient() {} auto ConsumeStats() { - std::unique_lock guard(lock_); + std::unique_lock guard(lock_); auto stats = stats_; stats_.clear(); return stats; @@ -76,7 +76,7 @@ class TestClient { auto metadata = result.metadata; metadata["wall_time"] = wall_time.count(); { - std::unique_lock guard(lock_); + std::unique_lock guard(lock_); if (query_name != "") { stats_[query_name].push_back(std::move(metadata)); } else { @@ -88,7 +88,7 @@ class TestClient { return result; } - SpinLock lock_; + utils::SpinLock lock_; std::unordered_map>> stats_; diff --git a/tests/macro_benchmark/clients/pokec_client.cpp b/tests/macro_benchmark/clients/pokec_client.cpp index 9bd436b2f..f1c191eb2 100644 --- a/tests/macro_benchmark/clients/pokec_client.cpp +++ b/tests/macro_benchmark/clients/pokec_client.cpp @@ -14,7 +14,6 @@ #include #include "io/network/utils.hpp" -#include "threading/sync/spinlock.hpp" #include "utils/algorithm.hpp" #include "utils/timer.hpp" diff --git a/tests/macro_benchmark/clients/query_client.cpp b/tests/macro_benchmark/clients/query_client.cpp index fe82a57b9..5d28b04f6 100644 --- a/tests/macro_benchmark/clients/query_client.cpp +++ b/tests/macro_benchmark/clients/query_client.cpp @@ -4,9 +4,9 @@ #include #include -#include "threading/sync/spinlock.hpp" #include "utils/algorithm.hpp" #include "utils/string.hpp" +#include "utils/thread/sync.hpp" #include "utils/timer.hpp" #include "common.hpp" @@ -47,7 +47,7 @@ void ExecuteQueries(const std::vector &queries, std::ostream &ostream) { std::vector threads; - SpinLock spinlock; + utils::SpinLock spinlock; uint64_t last = 0; std::vector> metadata; @@ -67,7 +67,7 @@ void ExecuteQueries(const std::vector &queries, while (true) { uint64_t pos; { - std::lock_guard lock(spinlock); + std::lock_guard lock(spinlock); if (last == queries.size()) { break; } diff --git a/tests/manual/card_fraud_local.cpp b/tests/manual/card_fraud_local.cpp index 4685dc1cc..571704474 100644 --- a/tests/manual/card_fraud_local.cpp +++ b/tests/manual/card_fraud_local.cpp @@ -51,7 +51,7 @@ int main(int argc, char *argv[]) { {"card", rint(kCardCount)}, {"tx", tx_counter++}}); CheckResults(res, {{1}}, "Transaction creation"); - } catch (LockTimeoutException &) { + } catch (utils::LockTimeoutException &) { --i; } catch (mvcc::SerializationError &) { --i; diff --git a/tests/unit/CMakeLists.txt b/tests/unit/CMakeLists.txt index e5d888ac4..a4f5e45f8 100644 --- a/tests/unit/CMakeLists.txt +++ b/tests/unit/CMakeLists.txt @@ -145,15 +145,9 @@ target_link_libraries(${test_prefix}mvcc_one_transaction memgraph_lib) add_unit_test(mvcc_parallel_update.cpp) target_link_libraries(${test_prefix}mvcc_parallel_update memgraph_lib) -add_unit_test(network_endpoint.cpp) -target_link_libraries(${test_prefix}network_endpoint memgraph_lib) - add_unit_test(network_timeouts.cpp) target_link_libraries(${test_prefix}network_timeouts memgraph_lib) -add_unit_test(network_utils.cpp) -target_link_libraries(${test_prefix}network_utils memgraph_lib) - add_unit_test(property_value_store.cpp) target_link_libraries(${test_prefix}property_value_store memgraph_lib) @@ -199,18 +193,12 @@ target_link_libraries(${test_prefix}raft_storage memgraph_lib) add_unit_test(record_edge_vertex_accessor.cpp) target_link_libraries(${test_prefix}record_edge_vertex_accessor memgraph_lib) -add_unit_test(ring_buffer.cpp) -target_link_libraries(${test_prefix}ring_buffer memgraph_lib) - add_unit_test(rpc.cpp) target_link_libraries(${test_prefix}rpc memgraph_lib) add_unit_test(rpc_worker_clients.cpp) target_link_libraries(${test_prefix}rpc_worker_clients memgraph_lib) -add_unit_test(rwlock.cpp) -target_link_libraries(${test_prefix}rwlock memgraph_lib) - add_unit_test(serialization.cpp) target_link_libraries(${test_prefix}serialization memgraph_lib) @@ -241,9 +229,6 @@ target_link_libraries(${test_prefix}storage_address memgraph_lib) add_unit_test(stripped.cpp) target_link_libraries(${test_prefix}stripped memgraph_lib) -add_unit_test(thread_pool.cpp) -target_link_libraries(${test_prefix}thread_pool memgraph_lib) - add_unit_test(transaction_engine_distributed.cpp) target_link_libraries(${test_prefix}transaction_engine_distributed memgraph_lib) @@ -253,6 +238,19 @@ target_link_libraries(${test_prefix}transaction_engine_single_node memgraph_lib) add_unit_test(typed_value.cpp) target_link_libraries(${test_prefix}typed_value memgraph_lib) +# Test data structures + +add_unit_test(ring_buffer.cpp) +target_link_libraries(${test_prefix}ring_buffer mg-utils) + +# Test mg-io + +add_unit_test(network_endpoint.cpp) +target_link_libraries(${test_prefix}network_endpoint mg-io) +add_unit_test(network_utils.cpp) +target_link_libraries(${test_prefix}network_utils mg-io) + + # Test mg-utils add_unit_test(utils_demangle.cpp) @@ -270,12 +268,18 @@ target_link_libraries(${test_prefix}utils_on_scope_exit mg-utils) add_unit_test(utils_scheduler.cpp) target_link_libraries(${test_prefix}utils_scheduler mg-utils) +add_unit_test(utils_rwlock.cpp) +target_link_libraries(${test_prefix}utils_rwlock mg-utils) + add_unit_test(utils_signals.cpp) target_link_libraries(${test_prefix}utils_signals mg-utils) add_unit_test(utils_string.cpp) target_link_libraries(${test_prefix}utils_string mg-utils) +add_unit_test(utils_thread_pool.cpp) +target_link_libraries(${test_prefix}utils_thread_pool mg-utils) + add_unit_test(utils_timestamp.cpp) target_link_libraries(${test_prefix}utils_timestamp mg-utils) diff --git a/tests/unit/durability.cpp b/tests/unit/durability.cpp index 8c539b4ea..240ec980c 100644 --- a/tests/unit/durability.cpp +++ b/tests/unit/durability.cpp @@ -814,7 +814,7 @@ TEST_F(Durability, SequentialRecovery) { auto v = dba.FindVertex(random_int(kNumVertices), false); try { v.PropsSet(dba.Property("prop"), random_int(100)); - } catch (LockTimeoutException &) { + } catch (utils::LockTimeoutException &) { } catch (mvcc::SerializationError &) { } dba.InsertVertex(); diff --git a/tests/unit/mvcc.cpp b/tests/unit/mvcc.cpp index c9502d84a..8f9c5b147 100644 --- a/tests/unit/mvcc.cpp +++ b/tests/unit/mvcc.cpp @@ -4,9 +4,9 @@ #include "mvcc/record.hpp" #include "mvcc/version.hpp" #include "mvcc/version_list.hpp" -#include "threading/sync/lock_timeout_exception.hpp" #include "transactions/engine_single_node.hpp" #include "transactions/transaction.hpp" +#include "utils/thread/sync.hpp" #include "mvcc_gc_common.hpp" @@ -23,7 +23,7 @@ TEST(MVCC, Deadlock) { version_list1.update(*t1); version_list2.update(*t2); - EXPECT_THROW(version_list1.update(*t2), LockTimeoutException); + EXPECT_THROW(version_list1.update(*t2), utils::LockTimeoutException); } // TODO Gleich: move this test to mvcc_gc??? diff --git a/tests/unit/mvcc_find_update_common.hpp b/tests/unit/mvcc_find_update_common.hpp index d92060c53..82a66a4a7 100644 --- a/tests/unit/mvcc_find_update_common.hpp +++ b/tests/unit/mvcc_find_update_common.hpp @@ -4,7 +4,6 @@ #include "mvcc/record.hpp" #include "mvcc/version.hpp" #include "mvcc/version_list.hpp" -#include "threading/sync/lock_timeout_exception.hpp" #include "transactions/engine_single_node.hpp" #include "transactions/transaction.hpp" diff --git a/tests/unit/ring_buffer.cpp b/tests/unit/ring_buffer.cpp index de4c718c7..d3b746b6d 100644 --- a/tests/unit/ring_buffer.cpp +++ b/tests/unit/ring_buffer.cpp @@ -4,7 +4,7 @@ #include "gtest/gtest.h" #include "data_structures/ring_buffer.hpp" -#include "threading/sync/spinlock.hpp" +#include "utils/thread/sync.hpp" TEST(RingBuffer, MultithreadedUsage) { auto test_f = [](int producer_count, int elems_per_producer, @@ -12,7 +12,7 @@ TEST(RingBuffer, MultithreadedUsage) { int consumer_sleep_ms) { std::unordered_set consumed; - SpinLock consumed_lock; + utils::SpinLock consumed_lock; RingBuffer buffer{20}; std::vector producers; @@ -34,7 +34,7 @@ TEST(RingBuffer, MultithreadedUsage) { while (true) { std::this_thread::sleep_for( std::chrono::milliseconds(consumer_sleep_ms)); - std::lock_guard guard(consumed_lock); + std::lock_guard guard(consumed_lock); if (consumed.size() == elem_total_count) break; auto value = buffer.pop(); if (value) consumed.emplace(*value); diff --git a/tests/unit/rwlock.cpp b/tests/unit/utils_rwlock.cpp similarity index 97% rename from tests/unit/rwlock.cpp rename to tests/unit/utils_rwlock.cpp index 6410931d8..a45b56b74 100644 --- a/tests/unit/rwlock.cpp +++ b/tests/unit/utils_rwlock.cpp @@ -4,13 +4,13 @@ #include "glog/logging.h" #include "gtest/gtest.h" -#include "threading/sync/rwlock.hpp" +#include "utils/thread/sync.hpp" #include "utils/timer.hpp" using namespace std::chrono_literals; -using threading::RWLock; -using threading::RWLockPriority; +using utils::RWLock; +using utils::RWLockPriority; TEST(RWLock, MultipleReaders) { RWLock rwlock(RWLockPriority::READ); diff --git a/tests/unit/thread_pool.cpp b/tests/unit/utils_thread_pool.cpp similarity index 89% rename from tests/unit/thread_pool.cpp rename to tests/unit/utils_thread_pool.cpp index b58381007..1ad6166be 100644 --- a/tests/unit/thread_pool.cpp +++ b/tests/unit/utils_thread_pool.cpp @@ -5,12 +5,12 @@ #include "gmock/gmock.h" #include "gtest/gtest.h" -#include "threading/thread_pool.hpp" #include "utils/future.hpp" +#include "utils/thread.hpp" #include "utils/timer.hpp" TEST(ThreadPool, RunMany) { - threading::ThreadPool tp(10); + utils::ThreadPool tp(10); const int kResults = 10000; std::vector> results; for (int i = 0; i < kResults; ++i) { @@ -26,7 +26,7 @@ TEST(ThreadPool, EnsureParallel) { using namespace std::chrono_literals; const int kSize = 10; - threading::ThreadPool tp(kSize); + utils::ThreadPool tp(kSize); std::vector> results; utils::Timer t;