Add RPC to concurrent ID mapper

Summary:
The distributed ID mapper is not yet utilised in GraphDb as those
changes are in D1060. Depending on landing order it will be added.

Reviewers: dgleich, mislav.bradac

Reviewed By: dgleich

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1064
This commit is contained in:
florijan 2017-12-15 10:48:21 +01:00
parent 0eb30db8db
commit 37722e54b3
14 changed files with 332 additions and 70 deletions

View File

@ -36,6 +36,8 @@ set(memgraph_src_files
query/plan/rule_based_planner.cpp query/plan/rule_based_planner.cpp
query/plan/variable_start_planner.cpp query/plan/variable_start_planner.cpp
query/typed_value.cpp query/typed_value.cpp
storage/concurrent_id_mapper_master.cpp
storage/concurrent_id_mapper_worker.cpp
storage/edge_accessor.cpp storage/edge_accessor.cpp
storage/locking/record_lock.cpp storage/locking/record_lock.cpp
storage/property_value.cpp storage/property_value.cpp

View File

@ -69,6 +69,11 @@ Client::Client(messaging::System &system, const std::string &address,
writer_(system, address, port, kProtocolStreamPrefix + name), writer_(system, address, port, kProtocolStreamPrefix + name),
stream_(system.Open(utils::RandomString(20))) {} stream_(system.Open(utils::RandomString(20))) {}
// Convenience overload: takes the server location as a NetworkEndpoint and
// delegates to the (address, port) constructor using endpoint.address() and
// endpoint.port().
Client::Client(messaging::System &system,
const io::network::NetworkEndpoint &endpoint,
const std::string &name)
: Client(system, endpoint.address(), endpoint.port(), name) {}
// Because of the way Call is implemented it can fail without reporting (it will // Because of the way Call is implemented it can fail without reporting (it will
// just block indefinitely). This is why you always need to provide reasonable // just block indefinitely). This is why you always need to provide reasonable
// timeout when calling it. // timeout when calling it.

View File

@ -3,6 +3,7 @@
#include <type_traits> #include <type_traits>
#include "communication/messaging/distributed.hpp" #include "communication/messaging/distributed.hpp"
#include "io/network/network_endpoint.hpp"
namespace communication::rpc { namespace communication::rpc {
@ -17,6 +18,8 @@ class Client {
public: public:
Client(messaging::System &system, const std::string &address, uint16_t port, Client(messaging::System &system, const std::string &address, uint16_t port,
const std::string &name); const std::string &name);
// Same as the constructor above, with the address and port supplied as a
// single NetworkEndpoint.
Client(messaging::System &system,
const io::network::NetworkEndpoint &endpoint, const std::string &name);
// Call function can initiate only one request at the time. Function blocks // Call function can initiate only one request at the time. Function blocks
// until there is a response or timeout was reached. If timeout was reached // until there is a response or timeout was reached. If timeout was reached

View File

@ -13,6 +13,7 @@
#include "mvcc/version_list.hpp" #include "mvcc/version_list.hpp"
#include "storage/concurrent_id_mapper.hpp" #include "storage/concurrent_id_mapper.hpp"
#include "storage/concurrent_id_mapper_master.hpp" #include "storage/concurrent_id_mapper_master.hpp"
#include "storage/concurrent_id_mapper_single_node.hpp"
#include "storage/deferred_deleter.hpp" #include "storage/deferred_deleter.hpp"
#include "storage/edge.hpp" #include "storage/edge.hpp"
#include "storage/garbage_collector.hpp" #include "storage/garbage_collector.hpp"
@ -119,14 +120,19 @@ class GraphDb {
// Id to value mappers. // Id to value mappers.
// TODO this should be also garbage collected // TODO this should be also garbage collected
std::unique_ptr<ConcurrentIdMapper<GraphDbTypes::Label, std::string>> labels_{ std::unique_ptr<storage::ConcurrentIdMapper<GraphDbTypes::Label, std::string>>
new MasterConcurrentIdMapper<GraphDbTypes::Label, std::string>}; labels_{new storage::SingleNodeConcurrentIdMapper<GraphDbTypes::Label,
std::unique_ptr<ConcurrentIdMapper<GraphDbTypes::EdgeType, std::string>> std::string>};
std::unique_ptr<
storage::ConcurrentIdMapper<GraphDbTypes::EdgeType, std::string>>
edge_types_{ edge_types_{
new MasterConcurrentIdMapper<GraphDbTypes::EdgeType, std::string>}; new storage::SingleNodeConcurrentIdMapper<GraphDbTypes::EdgeType,
std::unique_ptr<ConcurrentIdMapper<GraphDbTypes::Property, std::string>> std::string>};
std::unique_ptr<
storage::ConcurrentIdMapper<GraphDbTypes::Property, std::string>>
properties_{ properties_{
new MasterConcurrentIdMapper<GraphDbTypes::Property, std::string>}; new storage::SingleNodeConcurrentIdMapper<GraphDbTypes::Property,
std::string>};
// indexes // indexes
KeyIndex<GraphDbTypes::Label, Vertex> labels_index_; KeyIndex<GraphDbTypes::Label, Vertex> labels_index_;

View File

@ -5,6 +5,7 @@
#include "utils/total_ordering.hpp" #include "utils/total_ordering.hpp"
namespace GraphDbTypes { namespace GraphDbTypes {
template <typename TSpecificType> template <typename TSpecificType>
class Common : TotalOrdering<TSpecificType> { class Common : TotalOrdering<TSpecificType> {
public: public:
@ -24,6 +25,12 @@ class Common : TotalOrdering<TSpecificType> {
size_t operator()(const TSpecificType &t) const { return hash(t.storage_); } size_t operator()(const TSpecificType &t) const { return hash(t.storage_); }
}; };
/**
 * Required for cereal serialization. Hands the underlying storage_ value to
 * the given archive; storage_ is the only state that is passed to it.
 *
 * @tparam Archive type of the archive, invoked as archive(storage_).
 */
template <class Archive>
void serialize(Archive &archive) {
archive(storage_);
}
private: private:
StorageT storage_{0}; StorageT storage_{0};
}; };

View File

@ -1,5 +1,7 @@
#pragma once #pragma once
namespace storage {
/** /**
* Defines an interface for mapping IDs to values and vice-versa. The interface * Defines an interface for mapping IDs to values and vice-versa. The interface
* is necessary because in the distributed system implementations are different * is necessary because in the distributed system implementations are different
@ -15,3 +17,4 @@ class ConcurrentIdMapper {
virtual TId value_to_id(const TValue &value) = 0; virtual TId value_to_id(const TValue &value) = 0;
virtual const TValue &id_to_value(const TId &id) = 0; virtual const TValue &id_to_value(const TId &id) = 0;
}; };
} // namespace storage

View File

@ -0,0 +1,49 @@
#include "glog/logging.h"
#include "database/graph_db_datatypes.hpp"
#include "storage/concurrent_id_mapper_master.hpp"
#include "storage/concurrent_id_mapper_rpc_messages.hpp"
namespace storage {
namespace {

/**
 * Registers the RPC handlers that expose the given master mapper on the given
 * RPC server.
 *
 * Only declared for the general case. The definitions are the explicit
 * specializations generated by ID_VALUE_RPC_CALLS, because the request and
 * response message types differ per TId.
 *
 * @param mapper mapper the handlers forward to. It is captured by reference,
 *     so it must outlive the handlers registered on rpc_server.
 * @param rpc_server server on which the handlers are registered.
 */
template <typename TId>
void RegisterRpc(MasterConcurrentIdMapper<TId> &mapper,
                 communication::rpc::Server &rpc_server);

// Generates the RegisterRpc specialization for one GraphDbTypes type. Two
// handlers are registered:
//   <type>IdRpc - answers with mapper.value_to_id(req.member)
//   Id<type>Rpc - answers with mapper.id_to_value(req.member)
#define ID_VALUE_RPC_CALLS(type)                                              \
  template <>                                                                 \
  void RegisterRpc<type>(MasterConcurrentIdMapper<type> & mapper,             \
                         communication::rpc::Server & rpc_server) {           \
    rpc_server.Register<type##IdRpc>([&mapper](const type##IdReq &req) {      \
      return std::make_unique<type##IdRes>(mapper.value_to_id(req.member));   \
    });                                                                       \
    rpc_server.Register<Id##type##Rpc>([&mapper](const Id##type##Req &req) {  \
      return std::make_unique<Id##type##Res>(mapper.id_to_value(req.member)); \
    });                                                                       \
  }

using namespace GraphDbTypes;
ID_VALUE_RPC_CALLS(Label)
ID_VALUE_RPC_CALLS(EdgeType)
ID_VALUE_RPC_CALLS(Property)

// Undefine the macro that was actually defined above, so that it does not
// stay visible in the rest of this translation unit.
#undef ID_VALUE_RPC_CALLS

}  // namespace
/**
 * Creates the master mapper and starts serving it over RPC.
 *
 * The RPC server is constructed on the given messaging system under the
 * kConcurrentIdMapperRpc name.
 *
 * @param system messaging system the RPC server is created on.
 */
template <typename TId>
MasterConcurrentIdMapper<TId>::MasterConcurrentIdMapper(
communication::messaging::System &system)
: rpc_server_(system, kConcurrentIdMapperRpc) {
// Handlers are registered first; the server is started only afterwards.
RegisterRpc(*this, rpc_server_);
rpc_server_.Start();
}
/**
 * Shuts the RPC server down. The registered handlers hold a reference to this
 * mapper (see RegisterRpc), which is why the server is stopped here.
 */
template <typename TId>
MasterConcurrentIdMapper<TId>::~MasterConcurrentIdMapper() {
rpc_server_.Shutdown();
}
// Explicit instantiations: the member definitions live in this .cpp file, so
// the three supported TId types are instantiated here.
template class MasterConcurrentIdMapper<Label>;
template class MasterConcurrentIdMapper<EdgeType>;
template class MasterConcurrentIdMapper<Property>;
} // namespace storage

View File

@ -1,47 +1,24 @@
#pragma once #pragma once
#include "glog/logging.h" #include <experimental/optional>
#include "communication/messaging/distributed.hpp"
#include "communication/rpc/rpc.hpp"
#include "data_structures/concurrent/concurrent_map.hpp" #include "data_structures/concurrent/concurrent_map.hpp"
#include "storage/concurrent_id_mapper.hpp" #include "storage/concurrent_id_mapper_single_node.hpp"
namespace storage {
/** Master implementation of ConcurrentIdMapper. */ /** Master implementation of ConcurrentIdMapper. */
template <typename TId, typename TValue> template <typename TId>
class MasterConcurrentIdMapper : public ConcurrentIdMapper<TId, TValue> { class MasterConcurrentIdMapper
using StorageT = typename TId::StorageT; : public SingleNodeConcurrentIdMapper<TId, std::string> {
public:
TId value_to_id(const TValue &value) override {
auto value_to_id_acc = value_to_id_.access();
auto found = value_to_id_acc.find(value);
TId inserted_id(0);
if (found == value_to_id_acc.end()) {
StorageT new_id = id_.fetch_add(1);
DCHECK(new_id < std::numeric_limits<StorageT>::max())
<< "Number of used ids overflowed our container";
auto insert_result = value_to_id_acc.insert(value, TId(new_id));
// After we tried to insert value with our id we either got our id, or the
// id created by the thread which succesfully inserted (value, id) pair
inserted_id = insert_result.first->second;
} else {
inserted_id = found->second;
}
auto id_to_value_acc = id_to_value_.access();
// We have to try to insert the inserted_id and value even if we are not the
// one who assigned id because we have to make sure that after this method
// returns that both mappings between id->value and value->id exist.
id_to_value_acc.insert(inserted_id, value);
return inserted_id;
}
const TValue &id_to_value(const TId &id) override { public:
const auto id_to_value_acc = id_to_value_.access(); MasterConcurrentIdMapper(communication::messaging::System &system);
auto result = id_to_value_acc.find(id); ~MasterConcurrentIdMapper();
DCHECK(result != id_to_value_acc.end());
return result->second;
}
private: private:
ConcurrentMap<TValue, TId> value_to_id_; communication::rpc::Server rpc_server_;
ConcurrentMap<TId, TValue> id_to_value_;
std::atomic<StorageT> id_{0};
}; };
} // namespace storage

View File

@ -0,0 +1,45 @@
#pragma once
#include <chrono>
#include "communication/rpc/rpc.hpp"
#include "database/graph_db_datatypes.hpp"
#include "transactions/commit_log.hpp"
#include "transactions/snapshot.hpp"
#include "transactions/type.hpp"
#include "utils/rpc_pimp.hpp"
namespace storage {
/** Name under which the ID mapper RPC server and its clients are created. */
const std::string kConcurrentIdMapperRpc = "ConcurrentIdMapper";

/**
 * Timeout passed to every ID mapper RPC call.
 *
 * Written with an explicit std::chrono type instead of the `300ms` literal so
 * that this header compiles without a std::chrono_literals using-directive
 * leaking in from some other header. The type and value are unchanged.
 */
const std::chrono::milliseconds kConcurrentIdMapperRpcTimeout{300};
// Declares the RPC messages and RPC types for one GraphDbTypes type:
//   <type>IdReq (std::string)        / <type>IdRes (GraphDbTypes::<type>),
//     paired as <type>IdRpc  - value -> id lookup
//   Id<type>Req (GraphDbTypes::<type>) / Id<type>Res (std::string),
//     paired as Id<type>Rpc  - id -> value lookup
// NOTE(review): RPC_SINGLE_MEMBER_MESSAGE comes from utils/rpc_pimp.hpp and is
// not visible here; presumably it declares a message with a single `member`
// field of the given type - confirm against that header.
#define ID_VALUE_RPC(type) \
RPC_SINGLE_MEMBER_MESSAGE(type##IdReq, std::string); \
RPC_SINGLE_MEMBER_MESSAGE(type##IdRes, GraphDbTypes::type); \
using type##IdRpc = \
communication::rpc::RequestResponse<type##IdReq, type##IdRes>; \
RPC_SINGLE_MEMBER_MESSAGE(Id##type##Req, GraphDbTypes::type); \
RPC_SINGLE_MEMBER_MESSAGE(Id##type##Res, std::string); \
using Id##type##Rpc = \
communication::rpc::RequestResponse<Id##type##Req, Id##type##Res>;
// One set of messages per supported ID type.
ID_VALUE_RPC(Label)
ID_VALUE_RPC(EdgeType)
ID_VALUE_RPC(Property)
// The macro is a local helper; do not let it escape this header.
#undef ID_VALUE_RPC
} // namespace storage
// Registers the four request/response messages of one GraphDbTypes type with
// cereal via CEREAL_REGISTER_TYPE. Invoked outside of namespace storage, hence
// the fully qualified storage:: names.
#define ID_VALUE_REGISTER_CEREAL_TYPE(type) \
CEREAL_REGISTER_TYPE(storage::type##IdReq); \
CEREAL_REGISTER_TYPE(storage::type##IdRes); \
CEREAL_REGISTER_TYPE(storage::Id##type##Req); \
CEREAL_REGISTER_TYPE(storage::Id##type##Res);
// One registration set per supported ID type.
ID_VALUE_REGISTER_CEREAL_TYPE(Label)
ID_VALUE_REGISTER_CEREAL_TYPE(EdgeType)
ID_VALUE_REGISTER_CEREAL_TYPE(Property)
// The macro is a local helper; do not let it escape this header.
#undef ID_VALUE_REGISTER_CEREAL_TYPE

View File

@ -0,0 +1,49 @@
#pragma once
#include <atomic>
#include <limits>
#include "glog/logging.h"
#include "data_structures/concurrent/concurrent_map.hpp"
#include "storage/concurrent_id_mapper.hpp"
namespace storage {
/**
 * ConcurrentIdMapper implementation that keeps both directions of the mapping
 * (value -> id and id -> value) in local concurrent maps and hands out new IDs
 * from a local atomic counter.
 */
template <typename TId, typename TValue>
class SingleNodeConcurrentIdMapper : public ConcurrentIdMapper<TId, TValue> {
  using StorageT = typename TId::StorageT;

 public:
  /**
   * Returns the ID mapped to the given value, assigning a new ID if the value
   * has not been seen yet.
   *
   * On return both mappings (value -> id and id -> value) exist.
   */
  TId value_to_id(const TValue &value) override {
    auto forward_acc = value_to_id_.access();
    auto existing = forward_acc.find(value);
    TId resolved_id(0);
    if (existing != forward_acc.end()) {
      resolved_id = existing->second;
    } else {
      StorageT fresh_id = id_.fetch_add(1);
      DCHECK(fresh_id < std::numeric_limits<StorageT>::max())
          << "Number of used ids overflowed our container";
      auto insertion = forward_acc.insert(value, TId(fresh_id));
      // The insert can lose a race against another thread. In that case the
      // entry holds the ID of the thread that successfully inserted the
      // (value, id) pair, and that is the ID we have to use.
      resolved_id = insertion.first->second;
    }

    // The reverse mapping is inserted even when this call did not assign the
    // ID, so that id -> value is guaranteed to exist once we return.
    auto reverse_acc = id_to_value_.access();
    reverse_acc.insert(resolved_id, value);
    return resolved_id;
  }

  /**
   * Returns the value mapped to the given ID. The ID is expected to be known;
   * this is only verified with a DCHECK.
   */
  const TValue &id_to_value(const TId &id) override {
    const auto reverse_acc = id_to_value_.access();
    auto entry = reverse_acc.find(id);
    DCHECK(entry != reverse_acc.end());
    return entry->second;
  }

 private:
  // value -> id
  ConcurrentMap<TValue, TId> value_to_id_;
  // id -> value
  ConcurrentMap<TId, TValue> id_to_value_;
  // Next ID to hand out.
  std::atomic<StorageT> id_{0};
};
} // namespace storage

View File

@ -0,0 +1,63 @@
#include "glog/logging.h"
#include "concurrent_id_mapper_worker.hpp"
#include "database/graph_db_datatypes.hpp"
#include "storage/concurrent_id_mapper_rpc_messages.hpp"
namespace storage {
// Generates, for one GraphDbTypes type, the two WorkerConcurrentIdMapper
// specializations that talk to the master:
//   RpcValueToId - calls <type>IdRpc with the value, returns response->member
//   RpcIdToValue - calls Id<type>Rpc with the id, returns response->member
// Both use kConcurrentIdMapperRpcTimeout and CHECK that a response was
// obtained, so a missing response aborts the process.
#define ID_VALUE_RPC_CALLS(type) \
template <> \
type WorkerConcurrentIdMapper<type>::RpcValueToId( \
const std::string &value) { \
auto response = \
rpc_client_.Call<type##IdRpc>(kConcurrentIdMapperRpcTimeout, value); \
CHECK(response) << ("Failed to obtain " #type " from master"); \
return response->member; \
} \
\
template <> \
std::string WorkerConcurrentIdMapper<type>::RpcIdToValue(type id) { \
auto response = \
rpc_client_.Call<Id##type##Rpc>(kConcurrentIdMapperRpcTimeout, id); \
CHECK(response) << ("Failed to obtain " #type " value from master"); \
return response->member; \
}
// Brings Label, EdgeType and Property into scope for the macro invocations
// and for the explicit instantiations at the end of this file.
using namespace GraphDbTypes;
ID_VALUE_RPC_CALLS(Label)
ID_VALUE_RPC_CALLS(EdgeType)
ID_VALUE_RPC_CALLS(Property)
// The macro is a local helper; undefine it once the specializations exist.
#undef ID_VALUE_RPC_CALLS
/**
 * Creates a worker mapper whose RPC client targets the given master endpoint
 * under the kConcurrentIdMapperRpc name.
 *
 * @param system messaging system the RPC client is created on.
 * @param master_endpoint endpoint the RPC client is pointed at.
 */
template <typename TId>
WorkerConcurrentIdMapper<TId>::WorkerConcurrentIdMapper(
communication::messaging::System &system,
const io::network::NetworkEndpoint &master_endpoint)
: rpc_client_(system, master_endpoint, kConcurrentIdMapperRpc) {}
/**
 * Returns the ID for the given value. The local cache is consulted first; on
 * a miss the ID is obtained through RpcValueToId and stored in the cache.
 */
template <typename TId>
TId WorkerConcurrentIdMapper<TId>::value_to_id(const std::string &value) {
  auto cache = value_to_id_cache_.access();
  auto hit = cache.find(value);
  if (hit == cache.end()) {
    // Not cached yet: ask over RPC and remember the answer.
    TId fetched = RpcValueToId(value);
    cache.insert(value, fetched);
    return fetched;
  }
  return hit->second;
}
/**
 * Returns the value for the given ID. The local cache is consulted first; on
 * a miss the value is obtained through RpcIdToValue and stored in the cache.
 * The returned reference refers to the cached entry.
 */
template <typename TId>
const std::string &WorkerConcurrentIdMapper<TId>::id_to_value(const TId &id) {
  auto cache = id_to_value_cache_.access();
  auto hit = cache.find(id);
  if (hit == cache.end()) {
    // Not cached yet: ask over RPC, then return the entry that ends up in the
    // cache (ours, or the one already present if the insert did not succeed).
    std::string fetched = RpcIdToValue(id);
    auto inserted = cache.insert(id, fetched);
    return inserted.first->second;
  }
  return hit->second;
}
// Explicit instantiations: the member definitions live in this .cpp file, so
// the three supported TId types are instantiated here.
template class WorkerConcurrentIdMapper<Label>;
template class WorkerConcurrentIdMapper<EdgeType>;
template class WorkerConcurrentIdMapper<Property>;
} // namespace storage

View File

@ -1,36 +1,36 @@
#pragma once #pragma once
#include "glog/logging.h" #include "communication/messaging/distributed.hpp"
#include "communication/rpc/rpc.hpp"
#include "data_structures/concurrent/concurrent_map.hpp" #include "data_structures/concurrent/concurrent_map.hpp"
#include "io/network/network_endpoint.hpp"
#include "storage/concurrent_id_mapper.hpp" #include "storage/concurrent_id_mapper.hpp"
/** Worker implementation of ConcurrentIdMapper. */ namespace storage {
template <typename TId, typename TValue>
class WorkerConcurrentIdMapper : public ConcurrentIdMapper<TId, TValue> {
public:
TId value_to_id(const TValue &value) override {
auto accessor = value_to_id_cache_.accessor();
auto found = accessor.find(value);
if (found != accessor.end()) return found.second;
// TODO make an RPC call to get the ID for value
TId id;
accessor.insert(value, id);
return id;
}
const TValue &id_to_value(const TId &id) override { /** Worker implementation of ConcurrentIdMapper. */
auto accessor = id_to_value_cache_.accessor(); template <typename TId>
auto found = accessor.find(id); class WorkerConcurrentIdMapper : public ConcurrentIdMapper<TId, std::string> {
if (found != accessor.end()) return found.second; // Makes an appropriate RPC call for the current TId type and the given value.
// TODO make an RPC call to get the value for ID TId RpcValueToId(const std::string &value);
TValue value;
return accessor.insert(id, value).second.second; // Makes an appropriate RPC call for the current TId type and the given value.
} std::string RpcIdToValue(TId id);
public:
WorkerConcurrentIdMapper(communication::messaging::System &system,
const io::network::NetworkEndpoint &master_endpoint);
TId value_to_id(const std::string &value) override;
const std::string &id_to_value(const TId &id) override;
private: private:
// Sources of truth for the mappings are on the master, not on this worker. We // Sources of truth for the mappings are on the master, not on this worker. We
// keep the caches. // keep the caches.
ConcurrentMap<TValue, TId> value_to_id_cache_; ConcurrentMap<std::string, TId> value_to_id_cache_;
ConcurrentMap<TId, TValue> id_to_value_cache_; ConcurrentMap<TId, std::string> id_to_value_cache_;
// Communication to the concurrent ID master.
mutable communication::rpc::Client rpc_client_;
}; };
} // namespace storage

View File

@ -0,0 +1,53 @@
#include <experimental/optional>
#include "gtest/gtest.h"
#include "communication/messaging/distributed.hpp"
#include "database/graph_db_datatypes.hpp"
#include "storage/concurrent_id_mapper_master.hpp"
#include "storage/concurrent_id_mapper_worker.hpp"
using namespace communication::messaging;
using namespace storage;
using namespace GraphDbTypes;
/**
 * Typed fixture that wires one master mapper and one worker mapper together,
 * each on its own messaging System bound to 127.0.0.1 with port 0.
 */
template <typename TId>
class DistributedConcurrentIdMapperTest : public ::testing::Test {
// Declared before the System members because their initializers use it.
const std::string kLocal{"127.0.0.1"};
protected:
System master_system_{kLocal, 0};
// Optionals so the mappers can be created in SetUp and destroyed in TearDown.
std::experimental::optional<MasterConcurrentIdMapper<TId>> master_mapper_;
System worker_system_{kLocal, 0};
std::experimental::optional<WorkerConcurrentIdMapper<TId>> worker_mapper_;
// Creates the master mapper first, then a worker pointed at the master
// system's endpoint.
void SetUp() override {
master_mapper_.emplace(master_system_);
worker_mapper_.emplace(worker_system_, master_system_.endpoint());
}
// Destroys each mapper before shutting down the System it was created on;
// the worker side is torn down before the master side.
void TearDown() override {
worker_mapper_ = std::experimental::nullopt;
worker_system_.Shutdown();
master_mapper_ = std::experimental::nullopt;
master_system_.Shutdown();
}
};
using namespace GraphDbTypes;

// The ID types the typed test suite below is instantiated for.
using GraphDbTestTypes = ::testing::Types<Label, EdgeType, Property>;
TYPED_TEST_CASE(DistributedConcurrentIdMapperTest, GraphDbTestTypes);
// Checks that master and worker agree on the mapping in both directions,
// whichever side sees a value first.
TYPED_TEST(DistributedConcurrentIdMapperTest, Basic) {
  auto &master_side = this->master_mapper_.value();
  auto &worker_side = this->worker_mapper_.value();

  // A value first introduced on the master is resolvable from the worker.
  auto master_assigned = master_side.value_to_id("v1");
  EXPECT_EQ(worker_side.id_to_value(master_assigned), "v1");
  EXPECT_EQ(worker_side.value_to_id("v1"), master_assigned);

  // A value first requested through the worker is resolvable on the master.
  auto worker_requested = worker_side.value_to_id("v2");
  EXPECT_EQ(master_side.id_to_value(worker_requested), "v2");
  EXPECT_EQ(master_side.value_to_id("v2"), worker_requested);

  // Distinct values get distinct IDs.
  EXPECT_NE(master_assigned, worker_requested);
}

View File

@ -5,13 +5,13 @@
#include "gtest/gtest.h" #include "gtest/gtest.h"
#include "database/graph_db_datatypes.hpp" #include "database/graph_db_datatypes.hpp"
#include "storage/concurrent_id_mapper_master.hpp" #include "storage/concurrent_id_mapper_single_node.hpp"
const int THREAD_NUM = 20; const int THREAD_NUM = 20;
const int VALUE_MAX = 50; const int VALUE_MAX = 50;
using Id = GraphDbTypes::Label; using Id = GraphDbTypes::Label;
using Mapper = MasterConcurrentIdMapper<Id, int>; using Mapper = storage::SingleNodeConcurrentIdMapper<Id, int>;
TEST(ConcurrentIdMapper, SameValueGivesSameId) { TEST(ConcurrentIdMapper, SameValueGivesSameId) {
Mapper mapper; Mapper mapper;