From d1dbf22cd19e6b50318b120bc9fb17dab5fb007c Mon Sep 17 00:00:00 2001 From: florijan Date: Mon, 4 Dec 2017 14:05:59 +0100 Subject: [PATCH] Prepare ConcurrentIdMapper for distributed Reviewers: dgleich Reviewed By: dgleich Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D1017 --- src/database/graph_db.hpp | 20 +++---- src/database/graph_db_accessor.cpp | 12 ++-- src/storage/concurrent_id_mapper.hpp | 60 ++++--------------- src/storage/concurrent_id_mapper_master.hpp | 47 +++++++++++++++ src/storage/concurrent_id_mapper_worker.hpp | 36 +++++++++++ ...er.cpp => concurrent_id_mapper_master.cpp} | 30 ++++++---- 6 files changed, 126 insertions(+), 79 deletions(-) create mode 100644 src/storage/concurrent_id_mapper_master.hpp create mode 100644 src/storage/concurrent_id_mapper_worker.hpp rename tests/unit/{concurrent_id_mapper.cpp => concurrent_id_mapper_master.cpp} (72%) diff --git a/src/database/graph_db.hpp b/src/database/graph_db.hpp index 29c1ee5d8..f9b7d17f3 100644 --- a/src/database/graph_db.hpp +++ b/src/database/graph_db.hpp @@ -17,6 +17,7 @@ #include "storage/deferred_deleter.hpp" #include "storage/edge.hpp" #include "storage/garbage_collector.hpp" +#include "storage/concurrent_id_mapper_master.hpp" #include "storage/vertex.hpp" #include "transactions/engine.hpp" #include "utils/scheduler.hpp" @@ -116,17 +117,16 @@ class GraphDb { DeferredDeleter> vertex_version_list_deleter_; DeferredDeleter> edge_version_list_deleter_; - // unique object stores + // Id to value mappers. // TODO this should be also garbage collected - ConcurrentIdMapper - labels_; - ConcurrentIdMapper - edge_types_; - ConcurrentIdMapper - properties_; + std::unique_ptr> labels_{ + new MasterConcurrentIdMapper}; + std::unique_ptr> + edge_types_{ + new MasterConcurrentIdMapper}; + std::unique_ptr> + properties_{ + new MasterConcurrentIdMapper}; // indexes KeyIndex labels_index_; diff --git a/src/database/graph_db_accessor.cpp b/src/database/graph_db_accessor.cpp index 9aa7d0c23..329f90956 100644 --- a/src/database/graph_db_accessor.cpp +++ b/src/database/graph_db_accessor.cpp @@ -327,37 +327,37 @@ void GraphDbAccessor::RemoveEdge(EdgeAccessor &edge_accessor, GraphDbTypes::Label GraphDbAccessor::Label(const std::string &label_name) { DCHECK(!commited_ && !aborted_) << "Accessor committed or aborted"; - return db_.labels_.insert_value(label_name); + return db_.labels_->value_to_id(label_name); } const std::string &GraphDbAccessor::LabelName( const GraphDbTypes::Label label) const { DCHECK(!commited_ && !aborted_) << "Accessor committed or aborted"; - return db_.labels_.value_by_id(label); + return db_.labels_->id_to_value(label); } GraphDbTypes::EdgeType GraphDbAccessor::EdgeType( const std::string &edge_type_name) { DCHECK(!commited_ && !aborted_) << "Accessor committed or aborted"; - return db_.edge_types_.insert_value(edge_type_name); + return db_.edge_types_->value_to_id(edge_type_name); } const std::string &GraphDbAccessor::EdgeTypeName( const GraphDbTypes::EdgeType edge_type) const { DCHECK(!commited_ && !aborted_) << "Accessor committed or aborted"; - return db_.edge_types_.value_by_id(edge_type); + return db_.edge_types_->id_to_value(edge_type); } GraphDbTypes::Property GraphDbAccessor::Property( const std::string &property_name) { DCHECK(!commited_ && !aborted_) << "Accessor committed or aborted"; - return db_.properties_.insert_value(property_name); + return db_.properties_->value_to_id(property_name); } const std::string &GraphDbAccessor::PropertyName( const GraphDbTypes::Property property) const { DCHECK(!commited_ && !aborted_) << "Accessor committed or aborted"; - return db_.properties_.value_by_id(property); + return db_.properties_->id_to_value(property); } int64_t GraphDbAccessor::Counter(const std::string &name) { diff --git a/src/storage/concurrent_id_mapper.hpp b/src/storage/concurrent_id_mapper.hpp index f96761c57..0f9fe46b8 100644 --- a/src/storage/concurrent_id_mapper.hpp +++ b/src/storage/concurrent_id_mapper.hpp @@ -1,57 +1,17 @@ #pragma once -#include "glog/logging.h" - -#include "data_structures/concurrent/concurrent_map.hpp" - /** - * @brief Implements injection between values and ids. Safe to use - * concurrently. - * @TParam TIds - Id type which to use - * @TParam TIdPrimitive - Primitive type which defines storage size (uint16_t, - * uint32_t, etc.) - * @TParam TRecord - Value type for which to define bijection + * Defines an interface for mapping IDs to values and vice-versa. The interface + * is necessary because in the distributed system implementations are different + * for the master (single source of truth) and worker (must query master). + * Both implementations must be concurrent. + * + * @TParam TId - ID type. Must expose `::TStorage`. + * @TParam TRecord - Value type. */ -template +template class ConcurrentIdMapper { public: - /** - * Thread safe insert value and get a unique id for it. Calling this method - * with the same value from any number of threads will always return the same - * id, i.e. mapping between values and ids will always be consistent. - */ - TId insert_value(const TValue &value) { - auto value_to_id_acc = value_to_id_.access(); - auto found = value_to_id_acc.find(value); - TId inserted_id(0); - if (found == value_to_id_acc.end()) { - TIdPrimitive new_id = id_.fetch_add(1); - DCHECK(new_id < std::numeric_limits::max()) - << "Number of used ids overflowed our container"; - auto insert_result = value_to_id_acc.insert(value, TId(new_id)); - // After we tried to insert value with our id we either got our id, or the - // id created by the thread which succesfully inserted (value, id) pair - inserted_id = insert_result.first->second; - } else { - inserted_id = found->second; - } - auto id_to_value_acc = id_to_value_.access(); - // We have to try to insert the inserted_id and value even if we are not the - // one who assigned id because we have to make sure that after this method - // returns that both mappings between id->value and value->id exist. - id_to_value_acc.insert(inserted_id, value); - return inserted_id; - } - - const TValue &value_by_id(const TId &id) const { - const auto id_to_value_acc = id_to_value_.access(); - auto result = id_to_value_acc.find(id); - DCHECK(result != id_to_value_acc.end()); - return result->second; - } - - private: - ConcurrentMap id_to_value_; - ConcurrentMap value_to_id_; - std::atomic id_{0}; + virtual TId value_to_id(const TValue &value) = 0; + virtual const TValue &id_to_value(const TId &id) = 0; }; diff --git a/src/storage/concurrent_id_mapper_master.hpp b/src/storage/concurrent_id_mapper_master.hpp new file mode 100644 index 000000000..6f05fcd05 --- /dev/null +++ b/src/storage/concurrent_id_mapper_master.hpp @@ -0,0 +1,47 @@ +#pragma once + +#include "glog/logging.h" + +#include "data_structures/concurrent/concurrent_map.hpp" +#include "storage/concurrent_id_mapper.hpp" + +/** Master implementation of ConcurrentIdMapper. */ +template +class MasterConcurrentIdMapper : public ConcurrentIdMapper { + using StorageT = typename TId::StorageT; + public: + TId value_to_id(const TValue &value) override { + auto value_to_id_acc = value_to_id_.access(); + auto found = value_to_id_acc.find(value); + TId inserted_id(0); + if (found == value_to_id_acc.end()) { + StorageT new_id = id_.fetch_add(1); + DCHECK(new_id < std::numeric_limits::max()) + << "Number of used ids overflowed our container"; + auto insert_result = value_to_id_acc.insert(value, TId(new_id)); + // After we tried to insert value with our id we either got our id, or the + // id created by the thread which succesfully inserted (value, id) pair + inserted_id = insert_result.first->second; + } else { + inserted_id = found->second; + } + auto id_to_value_acc = id_to_value_.access(); + // We have to try to insert the inserted_id and value even if we are not the + // one who assigned id because we have to make sure that after this method + // returns that both mappings between id->value and value->id exist. + id_to_value_acc.insert(inserted_id, value); + return inserted_id; + } + + const TValue &id_to_value(const TId &id) override { + const auto id_to_value_acc = id_to_value_.access(); + auto result = id_to_value_acc.find(id); + DCHECK(result != id_to_value_acc.end()); + return result->second; + } + + private: + ConcurrentMap value_to_id_; + ConcurrentMap id_to_value_; + std::atomic id_{0}; +}; diff --git a/src/storage/concurrent_id_mapper_worker.hpp b/src/storage/concurrent_id_mapper_worker.hpp new file mode 100644 index 000000000..fc46d54a9 --- /dev/null +++ b/src/storage/concurrent_id_mapper_worker.hpp @@ -0,0 +1,36 @@ +#pragma once + +#include "glog/logging.h" + +#include "data_structures/concurrent/concurrent_map.hpp" +#include "storage/concurrent_id_mapper.hpp" + +/** Worker implementation of ConcurrentIdMapper. */ +template +class WorkerConcurrentIdMapper : public ConcurrentIdMapper { + public: + TId value_to_id(const TValue &value) override { + auto accessor = value_to_id_cache_.accessor(); + auto found = accessor.find(value); + if (found != accessor.end()) return found.second; + // TODO make an RPC call to get the ID for value + TId id; + accessor.insert(value, id); + return id; + } + + const TValue &id_to_value(const TId &id) override { + auto accessor = id_to_value_cache_.accessor(); + auto found = accessor.find(id); + if (found != accessor.end()) return found.second; + // TODO make an RPC call to get the value for ID + TValue value; + return accessor.insert(id, value).second.second; + } + + private: + // Sources of truth for the mappings are on the master, not on this worker. We + // keep the caches. + ConcurrentMap value_to_id_cache_; + ConcurrentMap id_to_value_cache_; +}; diff --git a/tests/unit/concurrent_id_mapper.cpp b/tests/unit/concurrent_id_mapper_master.cpp similarity index 72% rename from tests/unit/concurrent_id_mapper.cpp rename to tests/unit/concurrent_id_mapper_master.cpp index 0c447e9a6..54e2a061f 100644 --- a/tests/unit/concurrent_id_mapper.cpp +++ b/tests/unit/concurrent_id_mapper_master.cpp @@ -4,32 +4,36 @@ #include "gmock/gmock.h" #include "gtest/gtest.h" -#include "storage/concurrent_id_mapper.hpp" +#include "database/graph_db_datatypes.hpp" +#include "storage/concurrent_id_mapper_master.hpp" const int THREAD_NUM = 20; const int VALUE_MAX = 50; +using Id = GraphDbTypes::Label; +using Mapper = MasterConcurrentIdMapper; + TEST(ConcurrentIdMapper, SameValueGivesSameId) { - ConcurrentIdMapper mapper; - EXPECT_EQ(mapper.insert_value(1), mapper.insert_value(1)); + Mapper mapper; + EXPECT_EQ(mapper.value_to_id(1), mapper.value_to_id(1)); } TEST(ConcurrentIdMapper, IdToValue) { - ConcurrentIdMapper mapper; + Mapper mapper; auto value = 1; - auto id = mapper.insert_value(value); - EXPECT_EQ(value, mapper.value_by_id(id)); + auto id = mapper.value_to_id(value); + EXPECT_EQ(value, mapper.id_to_value(id)); } TEST(ConcurrentIdMapper, TwoValuesTwoIds) { - ConcurrentIdMapper mapper; - EXPECT_NE(mapper.insert_value(1), mapper.insert_value(2)); + Mapper mapper; + EXPECT_NE(mapper.value_to_id(1), mapper.value_to_id(2)); } TEST(ConcurrentIdMapper, SameIdReturnedMultipleThreads) { std::vector threads; - ConcurrentIdMapper mapper; - std::vector> thread_value_ids(THREAD_NUM); + Mapper mapper; + std::vector> thread_value_ids(THREAD_NUM); std::atomic current_value{0}; std::atomic current_value_insertion_count{0}; @@ -42,10 +46,10 @@ TEST(ConcurrentIdMapper, SameIdReturnedMultipleThreads) { int last = -1; while (current_value <= VALUE_MAX) { while (last == current_value) continue; - auto id = mapper.insert_value(current_value.load()); + auto id = mapper.value_to_id(current_value.load()); thread_value_ids[i].push_back(id); // Also check that reverse mapping exists after method exits - EXPECT_EQ(mapper.value_by_id(id), current_value.load()); + EXPECT_EQ(mapper.id_to_value(id), current_value.load()); last = current_value; current_value_insertion_count.fetch_add(1); } @@ -69,6 +73,6 @@ TEST(ConcurrentIdMapper, SameIdReturnedMultipleThreads) { EXPECT_EQ(thread_value_ids[i], thread_value_ids[j]); // Each value should have a unique id - std::set ids(thread_value_ids[0].begin(), thread_value_ids[0].end()); + std::set ids(thread_value_ids[0].begin(), thread_value_ids[0].end()); EXPECT_EQ(ids.size(), thread_value_ids[0].size()); }