Support distributed (label, property) indexes

Summary:
Call workers buildindex

Merge branch 'master' into setup_distributed_index

Use ExecuteOnWorkers api

Merge branch 'master' into setup_distributed_index

Improve test

Merge branch 'master' into setup_distributed_index

Finish test

Reviewers: florijan, teon.banek

Reviewed By: florijan

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1134
This commit is contained in:
Dominik Gleich 2018-01-25 17:19:33 +01:00
parent ccefae4002
commit 24857cc1cf
15 changed files with 270 additions and 59 deletions

View File

@ -3,6 +3,7 @@
#include "boost/serialization/export.hpp" #include "boost/serialization/export.hpp"
#include "distributed/coordination_rpc_messages.hpp" #include "distributed/coordination_rpc_messages.hpp"
#include "distributed/index_rpc_messages.hpp"
#include "distributed/plan_rpc_messages.hpp" #include "distributed/plan_rpc_messages.hpp"
#include "distributed/remote_data_rpc_messages.hpp" #include "distributed/remote_data_rpc_messages.hpp"
#include "distributed/remote_pull_produce_rpc_messages.hpp" #include "distributed/remote_pull_produce_rpc_messages.hpp"
@ -56,3 +57,8 @@ BOOST_CLASS_EXPORT(distributed::RemotePullResData);
BOOST_CLASS_EXPORT(distributed::RemotePullRes); BOOST_CLASS_EXPORT(distributed::RemotePullRes);
BOOST_CLASS_EXPORT(distributed::EndRemotePullReq); BOOST_CLASS_EXPORT(distributed::EndRemotePullReq);
BOOST_CLASS_EXPORT(distributed::EndRemotePullRes); BOOST_CLASS_EXPORT(distributed::EndRemotePullRes);
// Distributed indexes
BOOST_CLASS_EXPORT(distributed::BuildIndexReq);
BOOST_CLASS_EXPORT(distributed::BuildIndexRes);
BOOST_CLASS_EXPORT(distributed::IndexLabelPropertyTx);

View File

@ -4,6 +4,7 @@
#include "database/graph_db.hpp" #include "database/graph_db.hpp"
#include "distributed/coordination_master.hpp" #include "distributed/coordination_master.hpp"
#include "distributed/coordination_worker.hpp" #include "distributed/coordination_worker.hpp"
#include "distributed/index_rpc_server.hpp"
#include "distributed/plan_consumer.hpp" #include "distributed/plan_consumer.hpp"
#include "distributed/plan_dispatcher.hpp" #include "distributed/plan_dispatcher.hpp"
#include "distributed/remote_data_rpc_clients.hpp" #include "distributed/remote_data_rpc_clients.hpp"
@ -98,6 +99,12 @@ class SingleNode : public PrivateBase {
distributed::RemoteDataRpcClients &remote_data_clients() override { distributed::RemoteDataRpcClients &remote_data_clients() override {
LOG(FATAL) << "Remote data clients not available in single-node."; LOG(FATAL) << "Remote data clients not available in single-node.";
} }
distributed::PlanDispatcher &plan_dispatcher() override {
LOG(FATAL) << "Plan Dispatcher not available in single-node.";
}
distributed::PlanConsumer &plan_consumer() override {
LOG(FATAL) << "Plan Consumer not available in single-node.";
}
}; };
#define IMPL_DISTRIBUTED_GETTERS \ #define IMPL_DISTRIBUTED_GETTERS \
@ -133,6 +140,8 @@ class Master : public PrivateBase {
distributed::RemoteDataRpcClients remote_data_clients_{coordination_}; distributed::RemoteDataRpcClients remote_data_clients_{coordination_};
distributed::PlanDispatcher plan_dispatcher_{coordination_}; distributed::PlanDispatcher plan_dispatcher_{coordination_};
distributed::RemotePullRpcClients remote_pull_clients_{coordination_}; distributed::RemotePullRpcClients remote_pull_clients_{coordination_};
distributed::RpcWorkerClients index_rpc_clients_{coordination_,
distributed::kIndexRpcName};
}; };
class Worker : public PrivateBase { class Worker : public PrivateBase {
@ -148,7 +157,7 @@ class Worker : public PrivateBase {
IMPL_DISTRIBUTED_GETTERS IMPL_DISTRIBUTED_GETTERS
distributed::PlanConsumer &plan_consumer() override { return plan_consumer_; } distributed::PlanConsumer &plan_consumer() override { return plan_consumer_; }
distributed::RemoteProduceRpcServer &remote_produce_server() override { distributed::RemoteProduceRpcServer &remote_produce_server() override {
return remote_produce_server_; return remote_produce_server();
} }
communication::rpc::System system_{config_.worker_endpoint}; communication::rpc::System system_{config_.worker_endpoint};
@ -163,6 +172,7 @@ class Worker : public PrivateBase {
distributed::PlanConsumer plan_consumer_{system_}; distributed::PlanConsumer plan_consumer_{system_};
distributed::RemoteProduceRpcServer remote_produce_server_{*this, system_, distributed::RemoteProduceRpcServer remote_produce_server_{*this, system_,
plan_consumer_}; plan_consumer_};
distributed::IndexRpcServer index_rpc_server_{*this, system_};
}; };
#undef IMPL_GETTERS #undef IMPL_GETTERS
@ -276,6 +286,10 @@ io::network::Endpoint Master::GetEndpoint(int worker_id) {
->coordination_.GetEndpoint(worker_id); ->coordination_.GetEndpoint(worker_id);
} }
distributed::RpcWorkerClients &Master::GetIndexRpcClients() {
return dynamic_cast<impl::Master *>(impl_.get())->index_rpc_clients_;
}
Worker::Worker(Config config) Worker::Worker(Config config)
: PublicBase(std::make_unique<impl::Worker>(config)) {} : PublicBase(std::make_unique<impl::Worker>(config)) {}

View File

@ -6,6 +6,7 @@
#include "database/counters.hpp" #include "database/counters.hpp"
#include "database/storage.hpp" #include "database/storage.hpp"
#include "database/storage_gc.hpp" #include "database/storage_gc.hpp"
#include "distributed/rpc_worker_clients.hpp"
#include "durability/wal.hpp" #include "durability/wal.hpp"
#include "io/network/endpoint.hpp" #include "io/network/endpoint.hpp"
#include "storage/concurrent_id_mapper.hpp" #include "storage/concurrent_id_mapper.hpp"
@ -172,6 +173,9 @@ class Master : public MasterBase {
/** Gets the endpoint of the worker with the given id. */ /** Gets the endpoint of the worker with the given id. */
// TODO make const once Coordination::GetEndpoint is const. // TODO make const once Coordination::GetEndpoint is const.
io::network::Endpoint GetEndpoint(int worker_id); io::network::Endpoint GetEndpoint(int worker_id);
/** Gets the index rpc workers*/
distributed::RpcWorkerClients &GetIndexRpcClients();
}; };
class Worker : public impl::PublicBase { class Worker : public impl::PublicBase {

View File

@ -1,7 +1,11 @@
#include <functional>
#include <future>
#include "glog/logging.h" #include "glog/logging.h"
#include "database/graph_db_accessor.hpp" #include "database/graph_db_accessor.hpp"
#include "database/state_delta.hpp" #include "database/state_delta.hpp"
#include "distributed/index_rpc_messages.hpp"
#include "storage/edge.hpp" #include "storage/edge.hpp"
#include "storage/edge_accessor.hpp" #include "storage/edge_accessor.hpp"
#include "storage/vertex.hpp" #include "storage/vertex.hpp"
@ -120,6 +124,8 @@ EdgeAccessor GraphDbAccessor::FindEdgeChecked(gid::Gid gid,
void GraphDbAccessor::BuildIndex(storage::Label label, void GraphDbAccessor::BuildIndex(storage::Label label,
storage::Property property) { storage::Property property) {
DCHECK(!commited_ && !aborted_) << "Accessor committed or aborted"; DCHECK(!commited_ && !aborted_) << "Accessor committed or aborted";
DCHECK(db_.type() != GraphDb::Type::DISTRIBUTED_WORKER)
<< "BuildIndex invoked on worker";
db_.storage().index_build_tx_in_progress_.access().insert(transaction_.id_); db_.storage().index_build_tx_in_progress_.access().insert(transaction_.id_);
@ -163,6 +169,23 @@ void GraphDbAccessor::BuildIndex(storage::Label label,
// CreateIndex. // CreateIndex.
GraphDbAccessor dba(db_); GraphDbAccessor dba(db_);
std::experimental::optional<std::vector<std::future<bool>>>
index_rpc_completions;
// Notify all workers to start building an index if we are the master since
// they don't have to wait anymore
if (db_.type() == GraphDb::Type::DISTRIBUTED_MASTER) {
auto &rpc_clients = MasterGraphDb().GetIndexRpcClients();
index_rpc_completions.emplace(rpc_clients.ExecuteOnWorkers<bool>(
this->db_.WorkerId(),
[label, property, this](communication::rpc::Client &client) {
return client.Call<distributed::BuildIndexRpc>(
distributed::IndexLabelPropertyTx{
label, property, transaction_id()}) != nullptr;
}));
}
// Add transaction to the build_tx_in_progress as this transaction doesn't // Add transaction to the build_tx_in_progress as this transaction doesn't
// change data and shouldn't block other parallel index creations // change data and shouldn't block other parallel index creations
auto read_transaction_id = dba.transaction().id_; auto read_transaction_id = dba.transaction().id_;
@ -175,25 +198,52 @@ void GraphDbAccessor::BuildIndex(storage::Label label,
DCHECK(removed) << "Index building (read) transaction should be inside set"; DCHECK(removed) << "Index building (read) transaction should be inside set";
}); });
for (auto vertex : dba.Vertices(label, false)) { dba.PopulateIndex(key);
db_.storage().label_property_index_.UpdateOnLabelProperty(
vertex.address().local(), vertex.current_); // Check if all workers sucesfully built their indexes and after this we can
// set the index as built
if (index_rpc_completions) {
// Wait first, check later - so that every thread finishes and none
// terminates - this can probably be optimized in case we fail early so that
// we notify other workers to stop building indexes
for (auto &index_built : *index_rpc_completions) index_built.wait();
for (auto &index_built : *index_rpc_completions) {
if (!index_built.get()) {
db_.storage().label_property_index_.DeleteIndex(key);
throw IndexCreationOnWorkerException("Index exists on a worker");
}
}
} }
dba.EnableIndex(key);
dba.Commit();
}
void GraphDbAccessor::EnableIndex(const LabelPropertyIndex::Key &key) {
// Commit transaction as we finished applying method on newest visible // Commit transaction as we finished applying method on newest visible
// records. Write that transaction's ID to the WAL as the index has been // records. Write that transaction's ID to the WAL as the index has been
// built at this point even if this DBA's transaction aborts for some // built at this point even if this DBA's transaction aborts for some
// reason. // reason.
auto wal_build_index_tx_id = dba.transaction_id(); auto wal_build_index_tx_id = transaction_id();
dba.Commit(); wal().Emplace(database::StateDelta::BuildIndex(wal_build_index_tx_id,
wal().Emplace(database::StateDelta::BuildIndex( LabelName(key.label_),
wal_build_index_tx_id, LabelName(label), PropertyName(property))); PropertyName(key.property_)));
// After these two operations we are certain that everything is contained in // After these two operations we are certain that everything is contained in
// the index under the assumption that this transaction contained no // the index under the assumption that the original index creation transaction
// vertex/edge insert/update before this method was invoked. // contained no vertex/edge insert/update before this method was invoked.
db_.storage().label_property_index_.IndexFinishedBuilding(key); db_.storage().label_property_index_.IndexFinishedBuilding(key);
} }
void GraphDbAccessor::PopulateIndex(const LabelPropertyIndex::Key &key) {
for (auto vertex : Vertices(key.label_, false)) {
if (vertex.PropsAt(key.property_).type() == PropertyValue::Type::Null)
continue;
db_.storage().label_property_index_.UpdateOnLabelProperty(
vertex.address().local(), vertex.current_);
}
}
void GraphDbAccessor::UpdateLabelIndices(storage::Label label, void GraphDbAccessor::UpdateLabelIndices(storage::Label label,
const VertexAccessor &vertex_accessor, const VertexAccessor &vertex_accessor,
const Vertex *const vertex) { const Vertex *const vertex) {

View File

@ -24,6 +24,11 @@ class IndexExistsException : public utils::BasicException {
using utils::BasicException::BasicException; using utils::BasicException::BasicException;
}; };
/** Thrown when creating an index which already exists. */
class IndexCreationOnWorkerException : public utils::BasicException {
using utils::BasicException::BasicException;
};
/** /**
* An accessor for the database object: exposes functions for operating on the * An accessor for the database object: exposes functions for operating on the
* database. All the functions in this class should be self-sufficient: for * database. All the functions in this class should be self-sufficient: for
@ -379,6 +384,12 @@ class GraphDbAccessor {
*/ */
void BuildIndex(storage::Label label, storage::Property property); void BuildIndex(storage::Label label, storage::Property property);
/// Populates index with vertices containing the key
void PopulateIndex(const LabelPropertyIndex::Key &key);
/// Writes Index (key) creation to wal, marks it as ready for usage
void EnableIndex(const LabelPropertyIndex::Key &key);
/** /**
* @brief - Returns true if the given label+property index already exists and * @brief - Returns true if the given label+property index already exists and
* is ready for use. * is ready for use.
@ -549,12 +560,13 @@ class GraphDbAccessor {
bool commited_{false}; bool commited_{false};
bool aborted_{false}; bool aborted_{false};
std::experimental::optional<distributed::RemoteCache<Vertex>> remote_vertices_; std::experimental::optional<distributed::RemoteCache<Vertex>>
remote_vertices_;
std::experimental::optional<distributed::RemoteCache<Edge>> remote_edges_; std::experimental::optional<distributed::RemoteCache<Edge>> remote_edges_;
/** Casts the transaction engine to SingleNodeEngine and returns it. If the /** Casts the transaction engine to SingleNodeEngine and returns it. If the
* engine is a WorkerEngine (and not SingleNode nor Master), a call to this * engine is a WorkerEngine (and not SingleNode nor Master), a call to this
* function will crash MG. */ * method will crash MG. */
tx::SingleNodeEngine &SingleNodeEngine() { tx::SingleNodeEngine &SingleNodeEngine() {
auto *single_node_engine = auto *single_node_engine =
dynamic_cast<tx::SingleNodeEngine *>(&db_.tx_engine()); dynamic_cast<tx::SingleNodeEngine *>(&db_.tx_engine());
@ -563,6 +575,15 @@ class GraphDbAccessor {
return *single_node_engine; return *single_node_engine;
} }
/** Casts the GraphDb to MasterGraphDb and returns it. If the
* GraphDb is not a MasterGraphDb, a call to this method will crash MG. */
Master &MasterGraphDb() {
auto *master_graph_db = dynamic_cast<Master *>(&db_);
DCHECK(master_graph_db)
<< "Asked for Master Graph db on a distributed worker or single node";
return *master_graph_db;
}
/** /**
* Insert this vertex into corresponding label and label+property (if it * Insert this vertex into corresponding label and label+property (if it
* exists) index. * exists) index.

View File

@ -139,7 +139,7 @@ static auto GetVlists(
*/ */
template <class TKey, class TIndexEntry, class TRecord> template <class TKey, class TIndexEntry, class TRecord>
static void Refresh( static void Refresh(
ConcurrentMap<TKey, SkipList<TIndexEntry> *> &indices, ConcurrentMap<TKey, std::unique_ptr<SkipList<TIndexEntry>>> &indices,
const tx::Snapshot &snapshot, tx::Engine &engine, const tx::Snapshot &snapshot, tx::Engine &engine,
const std::function<bool(const TKey &, const TIndexEntry &)> &exists) { const std::function<bool(const TKey &, const TIndexEntry &)> &exists) {
// iterate over all the indices // iterate over all the indices

View File

@ -27,15 +27,7 @@ class KeyIndex {
KeyIndex(KeyIndex &&other) = delete; KeyIndex(KeyIndex &&other) = delete;
KeyIndex &operator=(const KeyIndex &other) = delete; KeyIndex &operator=(const KeyIndex &other) = delete;
KeyIndex &operator=(KeyIndex &&other) = delete; KeyIndex &operator=(KeyIndex &&other) = delete;
/**
* @brief - Clear all indexes so that we don't leak memory.
*/
~KeyIndex() {
for (auto key_indices_pair : indices_.access()) {
// Delete skiplist because we created it with a new operator.
delete key_indices_pair.second;
}
}
/** /**
* @brief - Add record, vlist, if new, to TKey specific storage. * @brief - Add record, vlist, if new, to TKey specific storage.
* @param key - TKey index to update. * @param key - TKey index to update.
@ -159,14 +151,10 @@ class KeyIndex {
// Avoid excessive new/delete by first checking if it exists. // Avoid excessive new/delete by first checking if it exists.
auto iter = access.find(key); auto iter = access.find(key);
if (iter == access.end()) { if (iter == access.end()) {
auto skiplist = new SkipList<IndexEntry>; auto ret = access.insert(key, std::make_unique<SkipList<IndexEntry>>());
auto ret = access.insert(key, skiplist); return ret.first->second.get();
// In case some other insert managed to create new skiplist we shouldn't
// leak memory and should delete this one accordingly.
if (ret.second == false) delete skiplist;
return ret.first->second;
} }
return iter->second; return iter->second.get();
} }
/** /**
@ -196,6 +184,6 @@ class KeyIndex {
return e->edge_type_ == edge_type; return e->edge_type_ == edge_type;
} }
ConcurrentMap<TKey, SkipList<IndexEntry> *> indices_; ConcurrentMap<TKey, std::unique_ptr<SkipList<IndexEntry>>> indices_;
}; };
} // namespace database } // namespace database

View File

@ -31,16 +31,6 @@ class LabelPropertyIndex {
LabelPropertyIndex &operator=(const LabelPropertyIndex &other) = delete; LabelPropertyIndex &operator=(const LabelPropertyIndex &other) = delete;
LabelPropertyIndex &operator=(LabelPropertyIndex &&other) = delete; LabelPropertyIndex &operator=(LabelPropertyIndex &&other) = delete;
/**
* @brief - Clear all indices so that we don't leak memory.
*/
~LabelPropertyIndex() {
for (auto key_indices_pair : indices_.access()) {
// Delete skiplist because we created it with a new operator.
delete key_indices_pair.second;
}
}
/** /**
* @brief - Contain Label + property, to be used as an index key. * @brief - Contain Label + property, to be used as an index key.
*/ */
@ -74,14 +64,15 @@ class LabelPropertyIndex {
auto iter = access.find(key); auto iter = access.find(key);
if (iter != access.end()) return false; if (iter != access.end()) return false;
auto skiplist = new SkipList<IndexEntry>; auto ret = access.insert(key, std::make_unique<SkipList<IndexEntry>>());
auto ret = access.insert(key, skiplist);
// Avoid multithreaded memory leak if we don't delete skiplist and fail the
// insert (some other thread already inserted)
if (ret.second == false) delete skiplist;
return ret.second; return ret.second;
} }
/**
* Returns if it succeded in deleting the index and freeing the index memory
*/
void DeleteIndex(const Key &key) { indices_.access().remove(key); }
/** /**
* @brief - Notify that the index has been populated with everything it should * @brief - Notify that the index has been populated with everything it should
* be populated with, and can be used from this moment forward without missing * be populated with, and can be used from this moment forward without missing
@ -100,7 +91,7 @@ class LabelPropertyIndex {
void UpdateOnLabelProperty(mvcc::VersionList<Vertex> *const vlist, void UpdateOnLabelProperty(mvcc::VersionList<Vertex> *const vlist,
const Vertex *const vertex) { const Vertex *const vertex) {
const auto &labels = vertex->labels_; const auto &labels = vertex->labels_;
for (auto index : indices_.access()) { for (auto &index : indices_.access()) {
// Vertex has the given label // Vertex has the given label
if (std::find(labels.begin(), labels.end(), index.first.label_) == if (std::find(labels.begin(), labels.end(), index.first.label_) ==
labels.end()) labels.end())
@ -124,7 +115,7 @@ class LabelPropertyIndex {
void UpdateOnLabel(storage::Label label, void UpdateOnLabel(storage::Label label,
mvcc::VersionList<Vertex> *const vlist, mvcc::VersionList<Vertex> *const vlist,
const Vertex *const vertex) { const Vertex *const vertex) {
for (auto index : indices_.access()) { for (auto &index : indices_.access()) {
if (index.first.label_ != label) continue; if (index.first.label_ != label) continue;
auto prop = vertex->properties_.at(index.first.property_); auto prop = vertex->properties_.at(index.first.property_);
if (prop.type() != PropertyValue::Type::Null) { if (prop.type() != PropertyValue::Type::Null) {
@ -146,7 +137,7 @@ class LabelPropertyIndex {
mvcc::VersionList<Vertex> *const vlist, mvcc::VersionList<Vertex> *const vlist,
const Vertex *const vertex) { const Vertex *const vertex) {
const auto &labels = vertex->labels_; const auto &labels = vertex->labels_;
for (auto index : indices_.access()) { for (auto &index : indices_.access()) {
if (index.first.property_ != property) continue; if (index.first.property_ != property) continue;
if (std::find(labels.begin(), labels.end(), index.first.label_) != if (std::find(labels.begin(), labels.end(), index.first.label_) !=
labels.end()) { labels.end()) {
@ -268,10 +259,9 @@ class LabelPropertyIndex {
auto access = GetKeyStorage(key)->access(); auto access = GetKeyStorage(key)->access();
// create the iterator startpoint based on the lower bound // create the iterator startpoint based on the lower bound
auto start_iter = lower auto start_iter = lower ? access.find_or_larger(make_index_bound(
? access.find_or_larger(make_index_bound( lower, lower.value().IsInclusive()))
lower, lower.value().IsInclusive())) : access.begin();
: access.begin();
// a function that defines if an entry staisfies the filtering predicate. // a function that defines if an entry staisfies the filtering predicate.
// since we already handled the lower bound, we only need to deal with the // since we already handled the lower bound, we only need to deal with the
@ -513,7 +503,7 @@ class LabelPropertyIndex {
auto access = indices_.access(); auto access = indices_.access();
auto iter = access.find(key); auto iter = access.find(key);
if (iter == access.end()) return nullptr; if (iter == access.end()) return nullptr;
return iter->second; return iter->second.get();
} }
/** /**
@ -539,7 +529,7 @@ class LabelPropertyIndex {
return !IndexEntry::Less(prop, value) && !IndexEntry::Less(value, prop); return !IndexEntry::Less(prop, value) && !IndexEntry::Less(value, prop);
} }
ConcurrentMap<Key, SkipList<IndexEntry> *> indices_; ConcurrentMap<Key, std::unique_ptr<SkipList<IndexEntry>>> indices_;
ConcurrentSet<Key> ready_for_use_; ConcurrentSet<Key> ready_for_use_;
}; };
} // namespace database } // namespace database

View File

@ -9,6 +9,10 @@
#include "storage/types.hpp" #include "storage/types.hpp"
#include "storage/vertex.hpp" #include "storage/vertex.hpp"
namespace distributed {
class IndexRpcServer;
};
namespace database { namespace database {
/** A data structure containing the main data members of a graph database. */ /** A data structure containing the main data members of a graph database. */
@ -36,6 +40,7 @@ class Storage {
private: private:
friend class GraphDbAccessor; friend class GraphDbAccessor;
friend class StorageGc; friend class StorageGc;
friend class distributed::IndexRpcServer;
gid::Generator vertex_generator_; gid::Generator vertex_generator_;
gid::Generator edge_generator_; gid::Generator edge_generator_;

View File

@ -0,0 +1,33 @@
#pragma once
#include <memory>
#include <string>
#include "communication/rpc/messages.hpp"
#include "distributed/serialization.hpp"
namespace distributed {
const std::string kIndexRpcName = "IndexRpc";
struct IndexLabelPropertyTx {
storage::Label label;
storage::Property property;
tx::transaction_id_t tx_id;
private:
friend class boost::serialization::access;
template <class TArchive>
void serialize(TArchive &ar, unsigned int) {
ar &label;
ar &property;
ar &tx_id;
}
};
RPC_SINGLE_MEMBER_MESSAGE(BuildIndexReq, IndexLabelPropertyTx);
RPC_NO_MEMBER_MESSAGE(BuildIndexRes);
using BuildIndexRpc =
communication::rpc::RequestResponse<BuildIndexReq, BuildIndexRes>;
} // namespace distributed

View File

@ -0,0 +1,43 @@
#pragma once
#include "database/graph_db.hpp"
#include "database/graph_db_accessor.hpp"
#include "distributed/index_rpc_messages.hpp"
using namespace database;
namespace distributed {
class IndexRpcServer {
public:
IndexRpcServer(database::GraphDb &db, communication::rpc::System &system)
: db_(db), system_(system) {
rpc_server_.Register<BuildIndexRpc>([this](const BuildIndexReq &req) {
LabelPropertyIndex::Key key{req.member.label, req.member.property};
GraphDbAccessor dba(db_, req.member.tx_id);
if (db_.storage().label_property_index_.CreateIndex(key) == false) {
// If we are a distributed worker we just have to wait till the index
// (which should be in progress of being created) is created so that our
// return guarantess that the index has been built - this assumes that
// no worker thread that is creating an index will fail
while (!dba.LabelPropertyIndexExists(key.label_, key.property_)) {
// TODO reconsider this constant, currently rule-of-thumb chosen
std::this_thread::sleep_for(std::chrono::microseconds(100));
}
} else {
dba.PopulateIndex(key);
dba.EnableIndex(key);
}
return std::make_unique<BuildIndexRes>();
});
}
private:
database::GraphDb &db_;
communication::rpc::System &system_;
communication::rpc::Server rpc_server_{system_, kIndexRpcName};
};
} // namespace distributed

View File

@ -16,8 +16,7 @@ class RemoteDataRpcServer {
// locks (not sure what the gain would be). But have some way of cache // locks (not sure what the gain would be). But have some way of cache
// invalidation. // invalidation.
public: public:
RemoteDataRpcServer(database::GraphDb &db, RemoteDataRpcServer(database::GraphDb &db, communication::rpc::System &system)
communication::rpc::System &system)
: db_(db), system_(system) { : db_(db), system_(system) {
rpc_server_.Register<RemoteVertexRpc>([this](const RemoteVertexReq &req) { rpc_server_.Register<RemoteVertexRpc>([this](const RemoteVertexReq &req) {
database::GraphDbAccessor dba(db_, req.member.tx_id); database::GraphDbAccessor dba(db_, req.member.tx_id);

View File

@ -67,6 +67,18 @@ TEST(LabelPropertyIndex, CreateIndex) {
EXPECT_EQ(index.CreateIndex(key), false); EXPECT_EQ(index.CreateIndex(key), false);
} }
TEST(LabelPropertyIndex, DeleteIndex) {
SingleNode db;
GraphDbAccessor accessor(db);
LabelPropertyIndex::Key key(accessor.Label("test"),
accessor.Property("test2"));
LabelPropertyIndex index;
EXPECT_EQ(index.CreateIndex(key), true);
EXPECT_EQ(index.CreateIndex(key), false);
index.DeleteIndex(key);
EXPECT_EQ(index.CreateIndex(key), true);
}
TEST(LabelPropertyIndex, IndexExistance) { TEST(LabelPropertyIndex, IndexExistance) {
SingleNode db; SingleNode db;
GraphDbAccessor accessor(db); GraphDbAccessor accessor(db);

View File

@ -300,3 +300,49 @@ TEST_F(DistributedGraphDbTest, RemotePullProduceRpc) {
} }
// TODO EndRemotePull test // TODO EndRemotePull test
TEST_F(DistributedGraphDbTest, BuildIndexDistributed) {
using GraphDbAccessor = database::GraphDbAccessor;
storage::Label label;
storage::Property property;
{
GraphDbAccessor dba0{master()};
label = dba0.Label("label");
property = dba0.Property("property");
auto tx_id = dba0.transaction_id();
GraphDbAccessor dba1{worker1(), tx_id};
GraphDbAccessor dba2{worker2(), tx_id};
auto add_vertex = [label, property](GraphDbAccessor &dba) {
auto vertex = dba.InsertVertex();
vertex.add_label(label);
vertex.PropsSet(property, 1);
};
for (int i = 0; i < 100; ++i) add_vertex(dba0);
for (int i = 0; i < 50; ++i) add_vertex(dba1);
for (int i = 0; i < 300; ++i) add_vertex(dba2);
dba0.Commit();
}
{
GraphDbAccessor dba{master()};
dba.BuildIndex(label, property);
EXPECT_TRUE(dba.LabelPropertyIndexExists(label, property));
EXPECT_EQ(CountIterable(dba.Vertices(label, property, false)), 100);
}
GraphDbAccessor dba_master{master()};
{
GraphDbAccessor dba{worker1(), dba_master.transaction_id()};
EXPECT_TRUE(dba.LabelPropertyIndexExists(label, property));
EXPECT_EQ(CountIterable(dba.Vertices(label, property, false)), 50);
}
{
GraphDbAccessor dba{worker2(), dba_master.transaction_id()};
EXPECT_TRUE(dba.LabelPropertyIndexExists(label, property));
EXPECT_EQ(CountIterable(dba.Vertices(label, property, false)), 300);
}
}

View File

@ -364,11 +364,11 @@ TEST_F(Durability, WalEncoding) {
EXPECT_EQ(deltas[6].transaction_id(), 1); EXPECT_EQ(deltas[6].transaction_id(), 1);
// The next two deltas are the BuildIndex internal transactions. // The next two deltas are the BuildIndex internal transactions.
EXPECT_EQ(deltas[7].type(), Type::TRANSACTION_BEGIN); EXPECT_EQ(deltas[7].type(), Type::TRANSACTION_BEGIN);
EXPECT_EQ(deltas[8].type(), Type::TRANSACTION_COMMIT); EXPECT_EQ(deltas[8].type(), Type::BUILD_INDEX);
EXPECT_EQ(deltas[9].type(), Type::BUILD_INDEX); auto index_name = deltas[8].IndexName();
auto index_name = deltas[9].IndexName();
EXPECT_EQ(index_name.first, "l1"); EXPECT_EQ(index_name.first, "l1");
EXPECT_EQ(index_name.second, "p1"); EXPECT_EQ(index_name.second, "p1");
EXPECT_EQ(deltas[9].type(), Type::TRANSACTION_COMMIT);
EXPECT_EQ(deltas[10].type(), Type::TRANSACTION_COMMIT); EXPECT_EQ(deltas[10].type(), Type::TRANSACTION_COMMIT);
EXPECT_EQ(deltas[10].transaction_id(), 1); EXPECT_EQ(deltas[10].transaction_id(), 1);
} }