Fix distributed index creation
Summary: During the creation of indexes there could be a case in which a vertex contains a label/property but is not a part of index after index building completes. This happens if vertices are being inserted while the index is being built. Reviewers: buda, msantl Reviewed By: msantl Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D1484
This commit is contained in:
parent
e67b06ab61
commit
90519f0beb
@ -154,6 +154,28 @@ void GraphDbAccessor::BuildIndex(storage::Label label,
|
||||
"exists.");
|
||||
}
|
||||
|
||||
std::experimental::optional<std::vector<utils::Future<bool>>>
|
||||
index_rpc_completions;
|
||||
|
||||
// Notify all workers to create the index
|
||||
if (db_.type() == GraphDb::Type::DISTRIBUTED_MASTER) {
|
||||
index_rpc_completions.emplace(db_.index_rpc_clients().GetCreateIndexFutures(
|
||||
label, property, this->db_.WorkerId()));
|
||||
}
|
||||
|
||||
if (index_rpc_completions) {
|
||||
// Wait first, check later - so that every thread finishes and none
|
||||
// terminates - this can probably be optimized in case we fail early so that
|
||||
// we notify other workers to stop building indexes
|
||||
for (auto &index_built : *index_rpc_completions) index_built.wait();
|
||||
for (auto &index_built : *index_rpc_completions) {
|
||||
if (!index_built.get()) {
|
||||
db_.storage().label_property_index_.DeleteIndex(key);
|
||||
throw IndexCreationOnWorkerException("Index exists on a worker");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Everything that happens after the line above ended will be added to the
|
||||
// index automatically, but we still have to add to index everything that
|
||||
// happened earlier. We have to first wait for every transaction that
|
||||
@ -179,14 +201,12 @@ void GraphDbAccessor::BuildIndex(storage::Label label,
|
||||
// CreateIndex.
|
||||
GraphDbAccessor dba(db_);
|
||||
|
||||
std::experimental::optional<std::vector<utils::Future<bool>>>
|
||||
index_rpc_completions;
|
||||
|
||||
// Notify all workers to start building an index if we are the master since
|
||||
// Notify all workers to start populating an index if we are the master since
|
||||
// they don't have to wait anymore
|
||||
if (db_.type() == GraphDb::Type::DISTRIBUTED_MASTER) {
|
||||
index_rpc_completions.emplace(db_.index_rpc_clients().GetBuildIndexFutures(
|
||||
label, property, transaction_id(), this->db_.WorkerId()));
|
||||
index_rpc_completions.emplace(
|
||||
db_.index_rpc_clients().GetPopulateIndexFutures(
|
||||
label, property, dba.transaction_id(), this->db_.WorkerId()));
|
||||
}
|
||||
|
||||
// Add transaction to the build_tx_in_progress as this transaction doesn't
|
||||
|
@ -15,11 +15,17 @@ cpp<#
|
||||
|
||||
(lcp:capnp-import 'storage "/storage/serialization.capnp")
|
||||
|
||||
(lcp:define-rpc build-index
|
||||
(lcp:define-rpc populate-index
|
||||
(:request
|
||||
((label "storage::Label" :capnp-type "Storage.Common")
|
||||
(property "storage::Property" :capnp-type "Storage.Common")
|
||||
(tx-id "tx::TransactionId" :capnp-type "UInt64")))
|
||||
(:response ()))
|
||||
|
||||
(lcp:define-rpc create-index
|
||||
(:request
|
||||
((label "storage::Label" :capnp-type "Storage.Common")
|
||||
(property "storage::Property" :capnp-type "Storage.Common")))
|
||||
(:response ()))
|
||||
|
||||
(lcp:pop-namespace) ;; distributed
|
||||
|
@ -7,26 +7,22 @@ namespace distributed {
|
||||
IndexRpcServer::IndexRpcServer(database::GraphDb &db,
|
||||
communication::rpc::Server &server)
|
||||
: db_(db), rpc_server_(server) {
|
||||
rpc_server_.Register<BuildIndexRpc>(
|
||||
rpc_server_.Register<CreateIndexRpc>(
|
||||
[this](const auto &req_reader, auto *res_builder) {
|
||||
BuildIndexReq req;
|
||||
CreateIndexReq req;
|
||||
req.Load(req_reader);
|
||||
database::LabelPropertyIndex::Key key{req.label, req.property};
|
||||
db_.storage().label_property_index_.CreateIndex(key);
|
||||
});
|
||||
|
||||
rpc_server_.Register<PopulateIndexRpc>(
|
||||
[this](const auto &req_reader, auto *res_builder) {
|
||||
PopulateIndexReq req;
|
||||
req.Load(req_reader);
|
||||
database::LabelPropertyIndex::Key key{req.label, req.property};
|
||||
database::GraphDbAccessor dba(db_, req.tx_id);
|
||||
|
||||
if (db_.storage().label_property_index_.CreateIndex(key) == false) {
|
||||
// If we are a distributed worker we just have to wait till the index
|
||||
// (which should be in progress of being created) is created so that
|
||||
// our return guarantess that the index has been built - this assumes
|
||||
// that no worker thread that is creating an index will fail
|
||||
while (!dba.LabelPropertyIndexExists(key.label_, key.property_)) {
|
||||
// TODO reconsider this constant, currently rule-of-thumb chosen
|
||||
std::this_thread::sleep_for(std::chrono::microseconds(100));
|
||||
}
|
||||
} else {
|
||||
dba.PopulateIndex(key);
|
||||
dba.EnableIndex(key);
|
||||
}
|
||||
dba.PopulateIndex(key);
|
||||
dba.EnableIndex(key);
|
||||
});
|
||||
}
|
||||
|
||||
|
@ -84,15 +84,27 @@ class IndexRpcClients {
|
||||
public:
|
||||
explicit IndexRpcClients(RpcWorkerClients &clients) : clients_(clients) {}
|
||||
|
||||
auto GetBuildIndexFutures(const storage::Label &label,
|
||||
const storage::Property &property,
|
||||
tx::TransactionId transaction_id, int worker_id) {
|
||||
auto GetPopulateIndexFutures(const storage::Label &label,
|
||||
const storage::Property &property,
|
||||
tx::TransactionId transaction_id,
|
||||
int worker_id) {
|
||||
return clients_.ExecuteOnWorkers<bool>(
|
||||
worker_id,
|
||||
[label, property, transaction_id](
|
||||
int worker_id, communication::rpc::ClientPool &client_pool) {
|
||||
return static_cast<bool>(client_pool.Call<PopulateIndexRpc>(
|
||||
label, property, transaction_id));
|
||||
});
|
||||
}
|
||||
|
||||
auto GetCreateIndexFutures(const storage::Label &label,
|
||||
const storage::Property &property, int worker_id) {
|
||||
return clients_.ExecuteOnWorkers<bool>(
|
||||
worker_id,
|
||||
[label, property](int worker_id,
|
||||
communication::rpc::ClientPool &client_pool) {
|
||||
return static_cast<bool>(
|
||||
client_pool.Call<BuildIndexRpc>(label, property, transaction_id));
|
||||
client_pool.Call<CreateIndexRpc>(label, property));
|
||||
});
|
||||
}
|
||||
|
||||
|
@ -52,8 +52,9 @@ class Cluster {
|
||||
|
||||
void Stop() {
|
||||
interpreter_ = nullptr;
|
||||
master_ = nullptr;
|
||||
auto t = std::thread([this]() { master_ = nullptr; });
|
||||
workers_.clear();
|
||||
if (t.joinable()) t.join();
|
||||
}
|
||||
|
||||
~Cluster() {
|
||||
|
@ -174,6 +174,45 @@ TEST_F(DistributedGraphDb, BuildIndexDistributed) {
|
||||
}
|
||||
}
|
||||
|
||||
TEST_F(DistributedGraphDb, BuildIndexConcurrentInsert) {
|
||||
storage::Label label;
|
||||
storage::Property property;
|
||||
|
||||
GraphDbAccessor dba0{master()};
|
||||
label = dba0.Label("label");
|
||||
property = dba0.Property("property");
|
||||
|
||||
int cnt = 0;
|
||||
auto add_vertex = [label, property, &cnt](GraphDbAccessor &dba) {
|
||||
auto vertex = dba.InsertVertex();
|
||||
vertex.add_label(label);
|
||||
vertex.PropsSet(property, ++cnt);
|
||||
};
|
||||
dba0.Commit();
|
||||
|
||||
auto worker_insert = std::thread([this, &add_vertex]() {
|
||||
for (int i = 0; i < 10000; ++i) {
|
||||
GraphDbAccessor dba1{worker(1)};
|
||||
add_vertex(dba1);
|
||||
dba1.Commit();
|
||||
}
|
||||
});
|
||||
|
||||
std::this_thread::sleep_for(0.5s);
|
||||
{
|
||||
GraphDbAccessor dba{master()};
|
||||
dba.BuildIndex(label, property);
|
||||
EXPECT_TRUE(dba.LabelPropertyIndexExists(label, property));
|
||||
}
|
||||
|
||||
worker_insert.join();
|
||||
{
|
||||
GraphDbAccessor dba{worker(1)};
|
||||
EXPECT_TRUE(dba.LabelPropertyIndexExists(label, property));
|
||||
EXPECT_EQ(CountIterable(dba.Vertices(label, property, false)), 10000);
|
||||
}
|
||||
}
|
||||
|
||||
TEST_F(DistributedGraphDb, WorkerOwnedDbAccessors) {
|
||||
GraphDbAccessor dba_w1(worker(1));
|
||||
auto v = dba_w1.InsertVertex();
|
||||
|
Loading…
Reference in New Issue
Block a user