Prepare ConcurrentIdMapper for distributed

Reviewers: dgleich

Reviewed By: dgleich

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1017
This commit is contained in:
florijan 2017-12-04 14:05:59 +01:00
parent e26456d5ad
commit d1dbf22cd1
6 changed files with 126 additions and 79 deletions

View File

@ -17,6 +17,7 @@
#include "storage/deferred_deleter.hpp"
#include "storage/edge.hpp"
#include "storage/garbage_collector.hpp"
#include "storage/concurrent_id_mapper_master.hpp"
#include "storage/vertex.hpp"
#include "transactions/engine.hpp"
#include "utils/scheduler.hpp"
@ -116,17 +117,16 @@ class GraphDb {
DeferredDeleter<mvcc::VersionList<Vertex>> vertex_version_list_deleter_;
DeferredDeleter<mvcc::VersionList<Edge>> edge_version_list_deleter_;
// unique object stores
// Id to value mappers.
// TODO this should be also garbage collected
ConcurrentIdMapper<GraphDbTypes::Label, GraphDbTypes::Label::StorageT,
std::string>
labels_;
ConcurrentIdMapper<GraphDbTypes::EdgeType, GraphDbTypes::EdgeType::StorageT,
std::string>
edge_types_;
ConcurrentIdMapper<GraphDbTypes::Property, GraphDbTypes::Property::StorageT,
std::string>
properties_;
std::unique_ptr<ConcurrentIdMapper<GraphDbTypes::Label, std::string>> labels_{
new MasterConcurrentIdMapper<GraphDbTypes::Label, std::string>};
std::unique_ptr<ConcurrentIdMapper<GraphDbTypes::EdgeType, std::string>>
edge_types_{
new MasterConcurrentIdMapper<GraphDbTypes::EdgeType, std::string>};
std::unique_ptr<ConcurrentIdMapper<GraphDbTypes::Property, std::string>>
properties_{
new MasterConcurrentIdMapper<GraphDbTypes::Property, std::string>};
// indexes
KeyIndex<GraphDbTypes::Label, Vertex> labels_index_;

View File

@ -327,37 +327,37 @@ void GraphDbAccessor::RemoveEdge(EdgeAccessor &edge_accessor,
GraphDbTypes::Label GraphDbAccessor::Label(const std::string &label_name) {
DCHECK(!commited_ && !aborted_) << "Accessor committed or aborted";
return db_.labels_.insert_value(label_name);
return db_.labels_->value_to_id(label_name);
}
const std::string &GraphDbAccessor::LabelName(
const GraphDbTypes::Label label) const {
DCHECK(!commited_ && !aborted_) << "Accessor committed or aborted";
return db_.labels_.value_by_id(label);
return db_.labels_->id_to_value(label);
}
GraphDbTypes::EdgeType GraphDbAccessor::EdgeType(
const std::string &edge_type_name) {
DCHECK(!commited_ && !aborted_) << "Accessor committed or aborted";
return db_.edge_types_.insert_value(edge_type_name);
return db_.edge_types_->value_to_id(edge_type_name);
}
const std::string &GraphDbAccessor::EdgeTypeName(
const GraphDbTypes::EdgeType edge_type) const {
DCHECK(!commited_ && !aborted_) << "Accessor committed or aborted";
return db_.edge_types_.value_by_id(edge_type);
return db_.edge_types_->id_to_value(edge_type);
}
GraphDbTypes::Property GraphDbAccessor::Property(
const std::string &property_name) {
DCHECK(!commited_ && !aborted_) << "Accessor committed or aborted";
return db_.properties_.insert_value(property_name);
return db_.properties_->value_to_id(property_name);
}
const std::string &GraphDbAccessor::PropertyName(
const GraphDbTypes::Property property) const {
DCHECK(!commited_ && !aborted_) << "Accessor committed or aborted";
return db_.properties_.value_by_id(property);
return db_.properties_->id_to_value(property);
}
int64_t GraphDbAccessor::Counter(const std::string &name) {

View File

@ -1,57 +1,17 @@
#pragma once
#include "glog/logging.h"
#include "data_structures/concurrent/concurrent_map.hpp"
/**
* @brief Implements injection between values and ids. Safe to use
* concurrently.
* @TParam TIds - Id type which to use
* @TParam TIdPrimitive - Primitive type which defines storage size (uint16_t,
* uint32_t, etc.)
* @TParam TRecord - Value type for which to define bijection
* Defines an interface for mapping IDs to values and vice-versa. The interface
* is necessary because in the distributed system implementations are different
* for the master (single source of truth) and worker (must query master).
* Both implementations must be concurrent.
*
* @TParam TId - ID type. Must expose `::TStorage`.
* @TParam TRecord - Value type.
*/
template <typename TId, typename TIdPrimitive, typename TValue>
template <typename TId, typename TValue>
class ConcurrentIdMapper {
public:
/**
* Thread safe insert value and get a unique id for it. Calling this method
* with the same value from any number of threads will always return the same
* id, i.e. mapping between values and ids will always be consistent.
*/
TId insert_value(const TValue &value) {
auto value_to_id_acc = value_to_id_.access();
auto found = value_to_id_acc.find(value);
TId inserted_id(0);
if (found == value_to_id_acc.end()) {
TIdPrimitive new_id = id_.fetch_add(1);
DCHECK(new_id < std::numeric_limits<TIdPrimitive>::max())
<< "Number of used ids overflowed our container";
auto insert_result = value_to_id_acc.insert(value, TId(new_id));
// After we tried to insert value with our id we either got our id, or the
// id created by the thread which succesfully inserted (value, id) pair
inserted_id = insert_result.first->second;
} else {
inserted_id = found->second;
}
auto id_to_value_acc = id_to_value_.access();
// We have to try to insert the inserted_id and value even if we are not the
// one who assigned id because we have to make sure that after this method
// returns that both mappings between id->value and value->id exist.
id_to_value_acc.insert(inserted_id, value);
return inserted_id;
}
const TValue &value_by_id(const TId &id) const {
const auto id_to_value_acc = id_to_value_.access();
auto result = id_to_value_acc.find(id);
DCHECK(result != id_to_value_acc.end());
return result->second;
}
private:
ConcurrentMap<TId, TValue> id_to_value_;
ConcurrentMap<TValue, TId> value_to_id_;
std::atomic<TIdPrimitive> id_{0};
virtual TId value_to_id(const TValue &value) = 0;
virtual const TValue &id_to_value(const TId &id) = 0;
};

View File

@ -0,0 +1,47 @@
#pragma once
#include "glog/logging.h"
#include "data_structures/concurrent/concurrent_map.hpp"
#include "storage/concurrent_id_mapper.hpp"
/** Master implementation of ConcurrentIdMapper. */
template <typename TId, typename TValue>
class MasterConcurrentIdMapper : public ConcurrentIdMapper<TId, TValue> {
using StorageT = typename TId::StorageT;
public:
TId value_to_id(const TValue &value) override {
auto value_to_id_acc = value_to_id_.access();
auto found = value_to_id_acc.find(value);
TId inserted_id(0);
if (found == value_to_id_acc.end()) {
StorageT new_id = id_.fetch_add(1);
DCHECK(new_id < std::numeric_limits<StorageT>::max())
<< "Number of used ids overflowed our container";
auto insert_result = value_to_id_acc.insert(value, TId(new_id));
// After we tried to insert value with our id we either got our id, or the
// id created by the thread which succesfully inserted (value, id) pair
inserted_id = insert_result.first->second;
} else {
inserted_id = found->second;
}
auto id_to_value_acc = id_to_value_.access();
// We have to try to insert the inserted_id and value even if we are not the
// one who assigned id because we have to make sure that after this method
// returns that both mappings between id->value and value->id exist.
id_to_value_acc.insert(inserted_id, value);
return inserted_id;
}
const TValue &id_to_value(const TId &id) override {
const auto id_to_value_acc = id_to_value_.access();
auto result = id_to_value_acc.find(id);
DCHECK(result != id_to_value_acc.end());
return result->second;
}
private:
ConcurrentMap<TValue, TId> value_to_id_;
ConcurrentMap<TId, TValue> id_to_value_;
std::atomic<StorageT> id_{0};
};

View File

@ -0,0 +1,36 @@
#pragma once
#include "glog/logging.h"
#include "data_structures/concurrent/concurrent_map.hpp"
#include "storage/concurrent_id_mapper.hpp"
/** Worker implementation of ConcurrentIdMapper. */
template <typename TId, typename TValue>
class WorkerConcurrentIdMapper : public ConcurrentIdMapper<TId, TValue> {
public:
TId value_to_id(const TValue &value) override {
auto accessor = value_to_id_cache_.accessor();
auto found = accessor.find(value);
if (found != accessor.end()) return found.second;
// TODO make an RPC call to get the ID for value
TId id;
accessor.insert(value, id);
return id;
}
const TValue &id_to_value(const TId &id) override {
auto accessor = id_to_value_cache_.accessor();
auto found = accessor.find(id);
if (found != accessor.end()) return found.second;
// TODO make an RPC call to get the value for ID
TValue value;
return accessor.insert(id, value).second.second;
}
private:
// Sources of truth for the mappings are on the master, not on this worker. We
// keep the caches.
ConcurrentMap<TValue, TId> value_to_id_cache_;
ConcurrentMap<TId, TValue> id_to_value_cache_;
};

View File

@ -4,32 +4,36 @@
#include "gmock/gmock.h"
#include "gtest/gtest.h"
#include "storage/concurrent_id_mapper.hpp"
#include "database/graph_db_datatypes.hpp"
#include "storage/concurrent_id_mapper_master.hpp"
const int THREAD_NUM = 20;
const int VALUE_MAX = 50;
using Id = GraphDbTypes::Label;
using Mapper = MasterConcurrentIdMapper<Id, int>;
TEST(ConcurrentIdMapper, SameValueGivesSameId) {
ConcurrentIdMapper<int, int, int> mapper;
EXPECT_EQ(mapper.insert_value(1), mapper.insert_value(1));
Mapper mapper;
EXPECT_EQ(mapper.value_to_id(1), mapper.value_to_id(1));
}
TEST(ConcurrentIdMapper, IdToValue) {
ConcurrentIdMapper<int, int, int> mapper;
Mapper mapper;
auto value = 1;
auto id = mapper.insert_value(value);
EXPECT_EQ(value, mapper.value_by_id(id));
auto id = mapper.value_to_id(value);
EXPECT_EQ(value, mapper.id_to_value(id));
}
TEST(ConcurrentIdMapper, TwoValuesTwoIds) {
ConcurrentIdMapper<int, int, int> mapper;
EXPECT_NE(mapper.insert_value(1), mapper.insert_value(2));
Mapper mapper;
EXPECT_NE(mapper.value_to_id(1), mapper.value_to_id(2));
}
TEST(ConcurrentIdMapper, SameIdReturnedMultipleThreads) {
std::vector<std::thread> threads;
ConcurrentIdMapper<int, int, int> mapper;
std::vector<std::vector<int>> thread_value_ids(THREAD_NUM);
Mapper mapper;
std::vector<std::vector<Id>> thread_value_ids(THREAD_NUM);
std::atomic<int> current_value{0};
std::atomic<int> current_value_insertion_count{0};
@ -42,10 +46,10 @@ TEST(ConcurrentIdMapper, SameIdReturnedMultipleThreads) {
int last = -1;
while (current_value <= VALUE_MAX) {
while (last == current_value) continue;
auto id = mapper.insert_value(current_value.load());
auto id = mapper.value_to_id(current_value.load());
thread_value_ids[i].push_back(id);
// Also check that reverse mapping exists after method exits
EXPECT_EQ(mapper.value_by_id(id), current_value.load());
EXPECT_EQ(mapper.id_to_value(id), current_value.load());
last = current_value;
current_value_insertion_count.fetch_add(1);
}
@ -69,6 +73,6 @@ TEST(ConcurrentIdMapper, SameIdReturnedMultipleThreads) {
EXPECT_EQ(thread_value_ids[i], thread_value_ids[j]);
// Each value should have a unique id
std::set<int> ids(thread_value_ids[0].begin(), thread_value_ids[0].end());
std::set<Id> ids(thread_value_ids[0].begin(), thread_value_ids[0].end());
EXPECT_EQ(ids.size(), thread_value_ids[0].size());
}