From 37722e54b35b445f5dda77ff4ca33967db893a87 Mon Sep 17 00:00:00 2001 From: florijan Date: Fri, 15 Dec 2017 10:48:21 +0100 Subject: [PATCH] Add RPC to concurrent ID mapper Summary: The distributed ID mapper is not yet utilised in GraphDb as those changes are in D1060. Depending on landing order it will be added. Reviewers: dgleich, mislav.bradac Reviewed By: dgleich Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D1064 --- src/CMakeLists.txt | 2 + src/communication/rpc/rpc.cpp | 5 ++ src/communication/rpc/rpc.hpp | 3 + src/database/graph_db.hpp | 18 ++++-- src/database/graph_db_datatypes.hpp | 7 +++ src/storage/concurrent_id_mapper.hpp | 3 + src/storage/concurrent_id_mapper_master.cpp | 49 +++++++++++++++ src/storage/concurrent_id_mapper_master.hpp | 51 +++++---------- .../concurrent_id_mapper_rpc_messages.hpp | 45 +++++++++++++ .../concurrent_id_mapper_single_node.hpp | 49 +++++++++++++++ src/storage/concurrent_id_mapper_worker.cpp | 63 +++++++++++++++++++ src/storage/concurrent_id_mapper_worker.hpp | 50 +++++++-------- .../unit/concurrent_id_mapper_distributed.cpp | 53 ++++++++++++++++ ...p => concurrent_id_mapper_single_node.cpp} | 4 +- 14 files changed, 332 insertions(+), 70 deletions(-) create mode 100644 src/storage/concurrent_id_mapper_master.cpp create mode 100644 src/storage/concurrent_id_mapper_rpc_messages.hpp create mode 100644 src/storage/concurrent_id_mapper_single_node.hpp create mode 100644 src/storage/concurrent_id_mapper_worker.cpp create mode 100644 tests/unit/concurrent_id_mapper_distributed.cpp rename tests/unit/{concurrent_id_mapper_master.cpp => concurrent_id_mapper_single_node.cpp} (95%) diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 4ec1e39f4..dff8140db 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -36,6 +36,8 @@ set(memgraph_src_files query/plan/rule_based_planner.cpp query/plan/variable_start_planner.cpp query/typed_value.cpp + storage/concurrent_id_mapper_master.cpp + storage/concurrent_id_mapper_worker.cpp storage/edge_accessor.cpp storage/locking/record_lock.cpp storage/property_value.cpp diff --git a/src/communication/rpc/rpc.cpp b/src/communication/rpc/rpc.cpp index 2bcbbfcd5..bb1c25252 100644 --- a/src/communication/rpc/rpc.cpp +++ b/src/communication/rpc/rpc.cpp @@ -69,6 +69,11 @@ Client::Client(messaging::System &system, const std::string &address, writer_(system, address, port, kProtocolStreamPrefix + name), stream_(system.Open(utils::RandomString(20))) {} +Client::Client(messaging::System &system, + const io::network::NetworkEndpoint &endpoint, + const std::string &name) + : Client(system, endpoint.address(), endpoint.port(), name) {} + // Because of the way Call is implemented it can fail without reporting (it will // just block indefinately). This is why you always need to provide reasonable // timeout when calling it. diff --git a/src/communication/rpc/rpc.hpp b/src/communication/rpc/rpc.hpp index 98b186762..48e4cd9a6 100644 --- a/src/communication/rpc/rpc.hpp +++ b/src/communication/rpc/rpc.hpp @@ -3,6 +3,7 @@ #include #include "communication/messaging/distributed.hpp" +#include "io/network/network_endpoint.hpp" namespace communication::rpc { @@ -17,6 +18,8 @@ class Client { public: Client(messaging::System &system, const std::string &address, uint16_t port, const std::string &name); + Client(messaging::System &system, + const io::network::NetworkEndpoint &endpoint, const std::string &name); // Call function can initiate only one request at the time. Function blocks // until there is a response or timeout was reached. If timeout was reached diff --git a/src/database/graph_db.hpp b/src/database/graph_db.hpp index e3dc37fb6..fd5adc2a8 100644 --- a/src/database/graph_db.hpp +++ b/src/database/graph_db.hpp @@ -13,6 +13,7 @@ #include "mvcc/version_list.hpp" #include "storage/concurrent_id_mapper.hpp" #include "storage/concurrent_id_mapper_master.hpp" +#include "storage/concurrent_id_mapper_single_node.hpp" #include "storage/deferred_deleter.hpp" #include "storage/edge.hpp" #include "storage/garbage_collector.hpp" @@ -119,14 +120,19 @@ class GraphDb { // Id to value mappers. // TODO this should be also garbage collected - std::unique_ptr> labels_{ - new MasterConcurrentIdMapper}; - std::unique_ptr> + std::unique_ptr> + labels_{new storage::SingleNodeConcurrentIdMapper}; + std::unique_ptr< + storage::ConcurrentIdMapper> edge_types_{ - new MasterConcurrentIdMapper}; - std::unique_ptr> + new storage::SingleNodeConcurrentIdMapper}; + std::unique_ptr< + storage::ConcurrentIdMapper> properties_{ - new MasterConcurrentIdMapper}; + new storage::SingleNodeConcurrentIdMapper}; // indexes KeyIndex labels_index_; diff --git a/src/database/graph_db_datatypes.hpp b/src/database/graph_db_datatypes.hpp index 9f88a8ee0..aadbf6d6d 100644 --- a/src/database/graph_db_datatypes.hpp +++ b/src/database/graph_db_datatypes.hpp @@ -5,6 +5,7 @@ #include "utils/total_ordering.hpp" namespace GraphDbTypes { + template class Common : TotalOrdering { public: @@ -24,6 +25,12 @@ class Common : TotalOrdering { size_t operator()(const TSpecificType &t) const { return hash(t.storage_); } }; + /** Required for cereal serialization. */ + template + void serialize(Archive &archive) { + archive(storage_); + } + private: StorageT storage_{0}; }; diff --git a/src/storage/concurrent_id_mapper.hpp b/src/storage/concurrent_id_mapper.hpp index 0f9fe46b8..93952ad7d 100644 --- a/src/storage/concurrent_id_mapper.hpp +++ b/src/storage/concurrent_id_mapper.hpp @@ -1,5 +1,7 @@ #pragma once +namespace storage { + /** * Defines an interface for mapping IDs to values and vice-versa. The interface * is necessary because in the distributed system implementations are different @@ -15,3 +17,4 @@ class ConcurrentIdMapper { virtual TId value_to_id(const TValue &value) = 0; virtual const TValue &id_to_value(const TId &id) = 0; }; +} // namespace storage diff --git a/src/storage/concurrent_id_mapper_master.cpp b/src/storage/concurrent_id_mapper_master.cpp new file mode 100644 index 000000000..ee01b2448 --- /dev/null +++ b/src/storage/concurrent_id_mapper_master.cpp @@ -0,0 +1,49 @@ +#include "glog/logging.h" + +#include "database/graph_db_datatypes.hpp" +#include "storage/concurrent_id_mapper_master.hpp" +#include "storage/concurrent_id_mapper_rpc_messages.hpp" + +namespace storage { + +namespace { +template +void RegisterRpc(MasterConcurrentIdMapper &mapper, + communication::rpc::Server &rpc_server); +#define ID_VALUE_RPC_CALLS(type) \ + template <> \ + void RegisterRpc(MasterConcurrentIdMapper & mapper, \ + communication::rpc::Server & rpc_server) { \ + rpc_server.Register([&mapper](const type##IdReq &req) { \ + return std::make_unique(mapper.value_to_id(req.member)); \ + }); \ + rpc_server.Register([&mapper](const Id##type##Req &req) { \ + return std::make_unique(mapper.id_to_value(req.member)); \ + }); \ + } + +using namespace GraphDbTypes; +ID_VALUE_RPC_CALLS(Label) +ID_VALUE_RPC_CALLS(EdgeType) +ID_VALUE_RPC_CALLS(Property) +#undef ID_VALUE_RPC +} + +template +MasterConcurrentIdMapper::MasterConcurrentIdMapper( + communication::messaging::System &system) + : rpc_server_(system, kConcurrentIdMapperRpc) { + RegisterRpc(*this, rpc_server_); + rpc_server_.Start(); +} + +template +MasterConcurrentIdMapper::~MasterConcurrentIdMapper() { + rpc_server_.Shutdown(); +} + +template class MasterConcurrentIdMapper