From 24857cc1cf01a08a2590ade4d71822d8e73b360b Mon Sep 17 00:00:00 2001 From: Dominik Gleich Date: Thu, 25 Jan 2018 17:19:33 +0100 Subject: [PATCH] Support distributed (label, property) indexes Summary: Call workers buildindex Merge branch 'master' into setup_distributed_index Use ExecuteOnWorkers api Merge branch 'master' into setup_distributed_index Improve test Merge branch 'master' into setup_distributed_index Finish test Reviewers: florijan, teon.banek Reviewed By: florijan Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D1134 --- src/communication/rpc/messages-inl.hpp | 6 ++ src/database/graph_db.cpp | 16 ++++- src/database/graph_db.hpp | 4 ++ src/database/graph_db_accessor.cpp | 68 ++++++++++++++++--- src/database/graph_db_accessor.hpp | 25 ++++++- src/database/indexes/index_common.hpp | 2 +- src/database/indexes/key_index.hpp | 22 ++---- src/database/indexes/label_property_index.hpp | 38 ++++------- src/database/storage.hpp | 5 ++ src/distributed/index_rpc_messages.hpp | 33 +++++++++ src/distributed/index_rpc_server.hpp | 43 ++++++++++++ src/distributed/remote_data_rpc_server.hpp | 3 +- tests/unit/database_label_property_index.cpp | 12 ++++ tests/unit/distributed_graph_db.cpp | 46 +++++++++++++ tests/unit/durability.cpp | 6 +- 15 files changed, 270 insertions(+), 59 deletions(-) create mode 100644 src/distributed/index_rpc_messages.hpp create mode 100644 src/distributed/index_rpc_server.hpp diff --git a/src/communication/rpc/messages-inl.hpp b/src/communication/rpc/messages-inl.hpp index c2c51e40b..56413c086 100644 --- a/src/communication/rpc/messages-inl.hpp +++ b/src/communication/rpc/messages-inl.hpp @@ -3,6 +3,7 @@ #include "boost/serialization/export.hpp" #include "distributed/coordination_rpc_messages.hpp" +#include "distributed/index_rpc_messages.hpp" #include "distributed/plan_rpc_messages.hpp" #include "distributed/remote_data_rpc_messages.hpp" #include "distributed/remote_pull_produce_rpc_messages.hpp" @@ -56,3 +57,8 @@ BOOST_CLASS_EXPORT(distributed::RemotePullResData); BOOST_CLASS_EXPORT(distributed::RemotePullRes); BOOST_CLASS_EXPORT(distributed::EndRemotePullReq); BOOST_CLASS_EXPORT(distributed::EndRemotePullRes); + +// Distributed indexes +BOOST_CLASS_EXPORT(distributed::BuildIndexReq); +BOOST_CLASS_EXPORT(distributed::BuildIndexRes); +BOOST_CLASS_EXPORT(distributed::IndexLabelPropertyTx); diff --git a/src/database/graph_db.cpp b/src/database/graph_db.cpp index fee7a2148..bbda3dbb1 100644 --- a/src/database/graph_db.cpp +++ b/src/database/graph_db.cpp @@ -4,6 +4,7 @@ #include "database/graph_db.hpp" #include "distributed/coordination_master.hpp" #include "distributed/coordination_worker.hpp" +#include "distributed/index_rpc_server.hpp" #include "distributed/plan_consumer.hpp" #include "distributed/plan_dispatcher.hpp" #include "distributed/remote_data_rpc_clients.hpp" @@ -98,6 +99,12 @@ class SingleNode : public PrivateBase { distributed::RemoteDataRpcClients &remote_data_clients() override { LOG(FATAL) << "Remote data clients not available in single-node."; } + distributed::PlanDispatcher &plan_dispatcher() override { + LOG(FATAL) << "Plan Dispatcher not available in single-node."; + } + distributed::PlanConsumer &plan_consumer() override { + LOG(FATAL) << "Plan Consumer not available in single-node."; + } }; #define IMPL_DISTRIBUTED_GETTERS \ @@ -133,6 +140,8 @@ class Master : public PrivateBase { distributed::RemoteDataRpcClients remote_data_clients_{coordination_}; distributed::PlanDispatcher plan_dispatcher_{coordination_}; distributed::RemotePullRpcClients remote_pull_clients_{coordination_}; + distributed::RpcWorkerClients index_rpc_clients_{coordination_, + distributed::kIndexRpcName}; }; class Worker : public PrivateBase { @@ -148,7 +157,7 @@ class Worker : public PrivateBase { IMPL_DISTRIBUTED_GETTERS distributed::PlanConsumer &plan_consumer() override { return plan_consumer_; } distributed::RemoteProduceRpcServer &remote_produce_server() override { - return remote_produce_server_; + return remote_produce_server(); } communication::rpc::System system_{config_.worker_endpoint}; @@ -163,6 +172,7 @@ class Worker : public PrivateBase { distributed::PlanConsumer plan_consumer_{system_}; distributed::RemoteProduceRpcServer remote_produce_server_{*this, system_, plan_consumer_}; + distributed::IndexRpcServer index_rpc_server_{*this, system_}; }; #undef IMPL_GETTERS @@ -276,6 +286,10 @@ io::network::Endpoint Master::GetEndpoint(int worker_id) { ->coordination_.GetEndpoint(worker_id); } +distributed::RpcWorkerClients &Master::GetIndexRpcClients() { + return dynamic_cast(impl_.get())->index_rpc_clients_; +} + Worker::Worker(Config config) : PublicBase(std::make_unique(config)) {} diff --git a/src/database/graph_db.hpp b/src/database/graph_db.hpp index 5e56ec128..2c0897306 100644 --- a/src/database/graph_db.hpp +++ b/src/database/graph_db.hpp @@ -6,6 +6,7 @@ #include "database/counters.hpp" #include "database/storage.hpp" #include "database/storage_gc.hpp" +#include "distributed/rpc_worker_clients.hpp" #include "durability/wal.hpp" #include "io/network/endpoint.hpp" #include "storage/concurrent_id_mapper.hpp" @@ -172,6 +173,9 @@ class Master : public MasterBase { /** Gets the endpoint of the worker with the given id. */ // TODO make const once Coordination::GetEndpoint is const. io::network::Endpoint GetEndpoint(int worker_id); + + /** Gets the index rpc workers*/ + distributed::RpcWorkerClients &GetIndexRpcClients(); }; class Worker : public impl::PublicBase { diff --git a/src/database/graph_db_accessor.cpp b/src/database/graph_db_accessor.cpp index d46da8d0d..30293997f 100644 --- a/src/database/graph_db_accessor.cpp +++ b/src/database/graph_db_accessor.cpp @@ -1,7 +1,11 @@ +#include +#include + #include "glog/logging.h" #include "database/graph_db_accessor.hpp" #include "database/state_delta.hpp" +#include "distributed/index_rpc_messages.hpp" #include "storage/edge.hpp" #include "storage/edge_accessor.hpp" #include "storage/vertex.hpp" @@ -120,6 +124,8 @@ EdgeAccessor GraphDbAccessor::FindEdgeChecked(gid::Gid gid, void GraphDbAccessor::BuildIndex(storage::Label label, storage::Property property) { DCHECK(!commited_ && !aborted_) << "Accessor committed or aborted"; + DCHECK(db_.type() != GraphDb::Type::DISTRIBUTED_WORKER) + << "BuildIndex invoked on worker"; db_.storage().index_build_tx_in_progress_.access().insert(transaction_.id_); @@ -163,6 +169,23 @@ void GraphDbAccessor::BuildIndex(storage::Label label, // CreateIndex. GraphDbAccessor dba(db_); + std::experimental::optional>> + index_rpc_completions; + + // Notify all workers to start building an index if we are the master since + // they don't have to wait anymore + if (db_.type() == GraphDb::Type::DISTRIBUTED_MASTER) { + auto &rpc_clients = MasterGraphDb().GetIndexRpcClients(); + + index_rpc_completions.emplace(rpc_clients.ExecuteOnWorkers( + this->db_.WorkerId(), + [label, property, this](communication::rpc::Client &client) { + return client.Call( + distributed::IndexLabelPropertyTx{ + label, property, transaction_id()}) != nullptr; + })); + } + // Add transaction to the build_tx_in_progress as this transaction doesn't // change data and shouldn't block other parallel index creations auto read_transaction_id = dba.transaction().id_; @@ -175,25 +198,52 @@ void GraphDbAccessor::BuildIndex(storage::Label label, DCHECK(removed) << "Index building (read) transaction should be inside set"; }); - for (auto vertex : dba.Vertices(label, false)) { - db_.storage().label_property_index_.UpdateOnLabelProperty( - vertex.address().local(), vertex.current_); + dba.PopulateIndex(key); + + // Check if all workers sucesfully built their indexes and after this we can + // set the index as built + if (index_rpc_completions) { + // Wait first, check later - so that every thread finishes and none + // terminates - this can probably be optimized in case we fail early so that + // we notify other workers to stop building indexes + for (auto &index_built : *index_rpc_completions) index_built.wait(); + for (auto &index_built : *index_rpc_completions) { + if (!index_built.get()) { + db_.storage().label_property_index_.DeleteIndex(key); + throw IndexCreationOnWorkerException("Index exists on a worker"); + } + } } + + dba.EnableIndex(key); + dba.Commit(); +} + +void GraphDbAccessor::EnableIndex(const LabelPropertyIndex::Key &key) { // Commit transaction as we finished applying method on newest visible // records. Write that transaction's ID to the WAL as the index has been // built at this point even if this DBA's transaction aborts for some // reason. - auto wal_build_index_tx_id = dba.transaction_id(); - dba.Commit(); - wal().Emplace(database::StateDelta::BuildIndex( - wal_build_index_tx_id, LabelName(label), PropertyName(property))); + auto wal_build_index_tx_id = transaction_id(); + wal().Emplace(database::StateDelta::BuildIndex(wal_build_index_tx_id, + LabelName(key.label_), + PropertyName(key.property_))); // After these two operations we are certain that everything is contained in - // the index under the assumption that this transaction contained no - // vertex/edge insert/update before this method was invoked. + // the index under the assumption that the original index creation transaction + // contained no vertex/edge insert/update before this method was invoked. db_.storage().label_property_index_.IndexFinishedBuilding(key); } +void GraphDbAccessor::PopulateIndex(const LabelPropertyIndex::Key &key) { + for (auto vertex : Vertices(key.label_, false)) { + if (vertex.PropsAt(key.property_).type() == PropertyValue::Type::Null) + continue; + db_.storage().label_property_index_.UpdateOnLabelProperty( + vertex.address().local(), vertex.current_); + } +} + void GraphDbAccessor::UpdateLabelIndices(storage::Label label, const VertexAccessor &vertex_accessor, const Vertex *const vertex) { diff --git a/src/database/graph_db_accessor.hpp b/src/database/graph_db_accessor.hpp index bac191f09..6d3424761 100644 --- a/src/database/graph_db_accessor.hpp +++ b/src/database/graph_db_accessor.hpp @@ -24,6 +24,11 @@ class IndexExistsException : public utils::BasicException { using utils::BasicException::BasicException; }; +/** Thrown when creating an index which already exists. */ +class IndexCreationOnWorkerException : public utils::BasicException { + using utils::BasicException::BasicException; +}; + /** * An accessor for the database object: exposes functions for operating on the * database. All the functions in this class should be self-sufficient: for @@ -379,6 +384,12 @@ class GraphDbAccessor { */ void BuildIndex(storage::Label label, storage::Property property); + /// Populates index with vertices containing the key + void PopulateIndex(const LabelPropertyIndex::Key &key); + + /// Writes Index (key) creation to wal, marks it as ready for usage + void EnableIndex(const LabelPropertyIndex::Key &key); + /** * @brief - Returns true if the given label+property index already exists and * is ready for use. @@ -549,12 +560,13 @@ class GraphDbAccessor { bool commited_{false}; bool aborted_{false}; - std::experimental::optional> remote_vertices_; + std::experimental::optional> + remote_vertices_; std::experimental::optional> remote_edges_; /** Casts the transaction engine to SingleNodeEngine and returns it. If the * engine is a WorkerEngine (and not SingleNode nor Master), a call to this - * function will crash MG. */ + * method will crash MG. */ tx::SingleNodeEngine &SingleNodeEngine() { auto *single_node_engine = dynamic_cast(&db_.tx_engine()); @@ -563,6 +575,15 @@ class GraphDbAccessor { return *single_node_engine; } + /** Casts the GraphDb to MasterGraphDb and returns it. If the + * GraphDb is not a MasterGraphDb, a call to this method will crash MG. */ + Master &MasterGraphDb() { + auto *master_graph_db = dynamic_cast(&db_); + DCHECK(master_graph_db) + << "Asked for Master Graph db on a distributed worker or single node"; + return *master_graph_db; + } + /** * Insert this vertex into corresponding label and label+property (if it * exists) index. diff --git a/src/database/indexes/index_common.hpp b/src/database/indexes/index_common.hpp index 2d43ed3fd..65485467a 100644 --- a/src/database/indexes/index_common.hpp +++ b/src/database/indexes/index_common.hpp @@ -139,7 +139,7 @@ static auto GetVlists( */ template static void Refresh( - ConcurrentMap *> &indices, + ConcurrentMap>> &indices, const tx::Snapshot &snapshot, tx::Engine &engine, const std::function &exists) { // iterate over all the indices diff --git a/src/database/indexes/key_index.hpp b/src/database/indexes/key_index.hpp index 058e54f9d..2f6198810 100644 --- a/src/database/indexes/key_index.hpp +++ b/src/database/indexes/key_index.hpp @@ -27,15 +27,7 @@ class KeyIndex { KeyIndex(KeyIndex &&other) = delete; KeyIndex &operator=(const KeyIndex &other) = delete; KeyIndex &operator=(KeyIndex &&other) = delete; - /** - * @brief - Clear all indexes so that we don't leak memory. - */ - ~KeyIndex() { - for (auto key_indices_pair : indices_.access()) { - // Delete skiplist because we created it with a new operator. - delete key_indices_pair.second; - } - } + /** * @brief - Add record, vlist, if new, to TKey specific storage. * @param key - TKey index to update. @@ -159,14 +151,10 @@ class KeyIndex { // Avoid excessive new/delete by first checking if it exists. auto iter = access.find(key); if (iter == access.end()) { - auto skiplist = new SkipList; - auto ret = access.insert(key, skiplist); - // In case some other insert managed to create new skiplist we shouldn't - // leak memory and should delete this one accordingly. - if (ret.second == false) delete skiplist; - return ret.first->second; + auto ret = access.insert(key, std::make_unique>()); + return ret.first->second.get(); } - return iter->second; + return iter->second.get(); } /** @@ -196,6 +184,6 @@ class KeyIndex { return e->edge_type_ == edge_type; } - ConcurrentMap *> indices_; + ConcurrentMap>> indices_; }; } // namespace database diff --git a/src/database/indexes/label_property_index.hpp b/src/database/indexes/label_property_index.hpp index 05cbddb8f..b09d07a1d 100644 --- a/src/database/indexes/label_property_index.hpp +++ b/src/database/indexes/label_property_index.hpp @@ -31,16 +31,6 @@ class LabelPropertyIndex { LabelPropertyIndex &operator=(const LabelPropertyIndex &other) = delete; LabelPropertyIndex &operator=(LabelPropertyIndex &&other) = delete; - /** - * @brief - Clear all indices so that we don't leak memory. - */ - ~LabelPropertyIndex() { - for (auto key_indices_pair : indices_.access()) { - // Delete skiplist because we created it with a new operator. - delete key_indices_pair.second; - } - } - /** * @brief - Contain Label + property, to be used as an index key. */ @@ -74,14 +64,15 @@ class LabelPropertyIndex { auto iter = access.find(key); if (iter != access.end()) return false; - auto skiplist = new SkipList; - auto ret = access.insert(key, skiplist); - // Avoid multithreaded memory leak if we don't delete skiplist and fail the - // insert (some other thread already inserted) - if (ret.second == false) delete skiplist; + auto ret = access.insert(key, std::make_unique>()); return ret.second; } + /** + * Returns if it succeded in deleting the index and freeing the index memory + */ + void DeleteIndex(const Key &key) { indices_.access().remove(key); } + /** * @brief - Notify that the index has been populated with everything it should * be populated with, and can be used from this moment forward without missing @@ -100,7 +91,7 @@ class LabelPropertyIndex { void UpdateOnLabelProperty(mvcc::VersionList *const vlist, const Vertex *const vertex) { const auto &labels = vertex->labels_; - for (auto index : indices_.access()) { + for (auto &index : indices_.access()) { // Vertex has the given label if (std::find(labels.begin(), labels.end(), index.first.label_) == labels.end()) @@ -124,7 +115,7 @@ class LabelPropertyIndex { void UpdateOnLabel(storage::Label label, mvcc::VersionList *const vlist, const Vertex *const vertex) { - for (auto index : indices_.access()) { + for (auto &index : indices_.access()) { if (index.first.label_ != label) continue; auto prop = vertex->properties_.at(index.first.property_); if (prop.type() != PropertyValue::Type::Null) { @@ -146,7 +137,7 @@ class LabelPropertyIndex { mvcc::VersionList *const vlist, const Vertex *const vertex) { const auto &labels = vertex->labels_; - for (auto index : indices_.access()) { + for (auto &index : indices_.access()) { if (index.first.property_ != property) continue; if (std::find(labels.begin(), labels.end(), index.first.label_) != labels.end()) { @@ -268,10 +259,9 @@ class LabelPropertyIndex { auto access = GetKeyStorage(key)->access(); // create the iterator startpoint based on the lower bound - auto start_iter = lower - ? access.find_or_larger(make_index_bound( - lower, lower.value().IsInclusive())) - : access.begin(); + auto start_iter = lower ? access.find_or_larger(make_index_bound( + lower, lower.value().IsInclusive())) + : access.begin(); // a function that defines if an entry staisfies the filtering predicate. // since we already handled the lower bound, we only need to deal with the @@ -513,7 +503,7 @@ class LabelPropertyIndex { auto access = indices_.access(); auto iter = access.find(key); if (iter == access.end()) return nullptr; - return iter->second; + return iter->second.get(); } /** @@ -539,7 +529,7 @@ class LabelPropertyIndex { return !IndexEntry::Less(prop, value) && !IndexEntry::Less(value, prop); } - ConcurrentMap *> indices_; + ConcurrentMap>> indices_; ConcurrentSet ready_for_use_; }; } // namespace database diff --git a/src/database/storage.hpp b/src/database/storage.hpp index e57b389c5..5169c4671 100644 --- a/src/database/storage.hpp +++ b/src/database/storage.hpp @@ -9,6 +9,10 @@ #include "storage/types.hpp" #include "storage/vertex.hpp" +namespace distributed { +class IndexRpcServer; +}; + namespace database { /** A data structure containing the main data members of a graph database. */ @@ -36,6 +40,7 @@ class Storage { private: friend class GraphDbAccessor; friend class StorageGc; + friend class distributed::IndexRpcServer; gid::Generator vertex_generator_; gid::Generator edge_generator_; diff --git a/src/distributed/index_rpc_messages.hpp b/src/distributed/index_rpc_messages.hpp new file mode 100644 index 000000000..3a9aef3c7 --- /dev/null +++ b/src/distributed/index_rpc_messages.hpp @@ -0,0 +1,33 @@ +#pragma once + +#include +#include + +#include "communication/rpc/messages.hpp" +#include "distributed/serialization.hpp" + +namespace distributed { +const std::string kIndexRpcName = "IndexRpc"; + +struct IndexLabelPropertyTx { + storage::Label label; + storage::Property property; + tx::transaction_id_t tx_id; + + private: + friend class boost::serialization::access; + + template + void serialize(TArchive &ar, unsigned int) { + ar &label; + ar &property; + ar &tx_id; + } +}; + +RPC_SINGLE_MEMBER_MESSAGE(BuildIndexReq, IndexLabelPropertyTx); +RPC_NO_MEMBER_MESSAGE(BuildIndexRes); + +using BuildIndexRpc = + communication::rpc::RequestResponse; +} // namespace distributed diff --git a/src/distributed/index_rpc_server.hpp b/src/distributed/index_rpc_server.hpp new file mode 100644 index 000000000..111436733 --- /dev/null +++ b/src/distributed/index_rpc_server.hpp @@ -0,0 +1,43 @@ +#pragma once + +#include "database/graph_db.hpp" +#include "database/graph_db_accessor.hpp" +#include "distributed/index_rpc_messages.hpp" + +using namespace database; + +namespace distributed { + +class IndexRpcServer { + public: + IndexRpcServer(database::GraphDb &db, communication::rpc::System &system) + : db_(db), system_(system) { + rpc_server_.Register([this](const BuildIndexReq &req) { + + LabelPropertyIndex::Key key{req.member.label, req.member.property}; + GraphDbAccessor dba(db_, req.member.tx_id); + + if (db_.storage().label_property_index_.CreateIndex(key) == false) { + // If we are a distributed worker we just have to wait till the index + // (which should be in progress of being created) is created so that our + // return guarantess that the index has been built - this assumes that + // no worker thread that is creating an index will fail + while (!dba.LabelPropertyIndexExists(key.label_, key.property_)) { + // TODO reconsider this constant, currently rule-of-thumb chosen + std::this_thread::sleep_for(std::chrono::microseconds(100)); + } + } else { + dba.PopulateIndex(key); + dba.EnableIndex(key); + } + return std::make_unique(); + }); + } + + private: + database::GraphDb &db_; + communication::rpc::System &system_; + communication::rpc::Server rpc_server_{system_, kIndexRpcName}; +}; + +} // namespace distributed diff --git a/src/distributed/remote_data_rpc_server.hpp b/src/distributed/remote_data_rpc_server.hpp index 49a099358..dbe83a8a1 100644 --- a/src/distributed/remote_data_rpc_server.hpp +++ b/src/distributed/remote_data_rpc_server.hpp @@ -16,8 +16,7 @@ class RemoteDataRpcServer { // locks (not sure what the gain would be). But have some way of cache // invalidation. public: - RemoteDataRpcServer(database::GraphDb &db, - communication::rpc::System &system) + RemoteDataRpcServer(database::GraphDb &db, communication::rpc::System &system) : db_(db), system_(system) { rpc_server_.Register([this](const RemoteVertexReq &req) { database::GraphDbAccessor dba(db_, req.member.tx_id); diff --git a/tests/unit/database_label_property_index.cpp b/tests/unit/database_label_property_index.cpp index 47587b012..544932989 100644 --- a/tests/unit/database_label_property_index.cpp +++ b/tests/unit/database_label_property_index.cpp @@ -67,6 +67,18 @@ TEST(LabelPropertyIndex, CreateIndex) { EXPECT_EQ(index.CreateIndex(key), false); } +TEST(LabelPropertyIndex, DeleteIndex) { + SingleNode db; + GraphDbAccessor accessor(db); + LabelPropertyIndex::Key key(accessor.Label("test"), + accessor.Property("test2")); + LabelPropertyIndex index; + EXPECT_EQ(index.CreateIndex(key), true); + EXPECT_EQ(index.CreateIndex(key), false); + index.DeleteIndex(key); + EXPECT_EQ(index.CreateIndex(key), true); +} + TEST(LabelPropertyIndex, IndexExistance) { SingleNode db; GraphDbAccessor accessor(db); diff --git a/tests/unit/distributed_graph_db.cpp b/tests/unit/distributed_graph_db.cpp index f075114bc..78c9ca93a 100644 --- a/tests/unit/distributed_graph_db.cpp +++ b/tests/unit/distributed_graph_db.cpp @@ -300,3 +300,49 @@ TEST_F(DistributedGraphDbTest, RemotePullProduceRpc) { } // TODO EndRemotePull test + +TEST_F(DistributedGraphDbTest, BuildIndexDistributed) { + using GraphDbAccessor = database::GraphDbAccessor; + storage::Label label; + storage::Property property; + + { + GraphDbAccessor dba0{master()}; + label = dba0.Label("label"); + property = dba0.Property("property"); + auto tx_id = dba0.transaction_id(); + + GraphDbAccessor dba1{worker1(), tx_id}; + GraphDbAccessor dba2{worker2(), tx_id}; + auto add_vertex = [label, property](GraphDbAccessor &dba) { + auto vertex = dba.InsertVertex(); + vertex.add_label(label); + vertex.PropsSet(property, 1); + }; + for (int i = 0; i < 100; ++i) add_vertex(dba0); + for (int i = 0; i < 50; ++i) add_vertex(dba1); + for (int i = 0; i < 300; ++i) add_vertex(dba2); + dba0.Commit(); + } + + { + GraphDbAccessor dba{master()}; + dba.BuildIndex(label, property); + EXPECT_TRUE(dba.LabelPropertyIndexExists(label, property)); + EXPECT_EQ(CountIterable(dba.Vertices(label, property, false)), 100); + } + + GraphDbAccessor dba_master{master()}; + + { + GraphDbAccessor dba{worker1(), dba_master.transaction_id()}; + EXPECT_TRUE(dba.LabelPropertyIndexExists(label, property)); + EXPECT_EQ(CountIterable(dba.Vertices(label, property, false)), 50); + } + + { + GraphDbAccessor dba{worker2(), dba_master.transaction_id()}; + EXPECT_TRUE(dba.LabelPropertyIndexExists(label, property)); + EXPECT_EQ(CountIterable(dba.Vertices(label, property, false)), 300); + } +} diff --git a/tests/unit/durability.cpp b/tests/unit/durability.cpp index 98be8afe5..940d6eb5e 100644 --- a/tests/unit/durability.cpp +++ b/tests/unit/durability.cpp @@ -364,11 +364,11 @@ TEST_F(Durability, WalEncoding) { EXPECT_EQ(deltas[6].transaction_id(), 1); // The next two deltas are the BuildIndex internal transactions. EXPECT_EQ(deltas[7].type(), Type::TRANSACTION_BEGIN); - EXPECT_EQ(deltas[8].type(), Type::TRANSACTION_COMMIT); - EXPECT_EQ(deltas[9].type(), Type::BUILD_INDEX); - auto index_name = deltas[9].IndexName(); + EXPECT_EQ(deltas[8].type(), Type::BUILD_INDEX); + auto index_name = deltas[8].IndexName(); EXPECT_EQ(index_name.first, "l1"); EXPECT_EQ(index_name.second, "p1"); + EXPECT_EQ(deltas[9].type(), Type::TRANSACTION_COMMIT); EXPECT_EQ(deltas[10].type(), Type::TRANSACTION_COMMIT); EXPECT_EQ(deltas[10].transaction_id(), 1); }