diff --git a/src/database/graph_db.cpp b/src/database/graph_db.cpp index fa833e4c2..bb5dce37f 100644 --- a/src/database/graph_db.cpp +++ b/src/database/graph_db.cpp @@ -326,7 +326,8 @@ class Worker : public PrivateBase { distributed::UpdatesRpcClients updates_clients_{rpc_worker_clients_}; distributed::DataManager data_manager_{*this, data_clients_}; distributed::WorkerTransactionalCacheCleaner cache_cleaner_{ - tx_engine_, server_, produce_server_, updates_server_, data_manager_}; + tx_engine_, &wal(), server_, + produce_server_, updates_server_, data_manager_}; distributed::DurabilityRpcServer durability_rpc_server_{*this, server_}; distributed::ClusterDiscoveryWorker cluster_discovery_{ server_, coordination_, rpc_worker_clients_.GetClientPool(0)}; diff --git a/src/distributed/rpc_worker_clients.hpp b/src/distributed/rpc_worker_clients.hpp index 9fd4cc55a..a55b14ca6 100644 --- a/src/distributed/rpc_worker_clients.hpp +++ b/src/distributed/rpc_worker_clients.hpp @@ -130,11 +130,12 @@ class OngoingProduceJoinerRpcClients { OngoingProduceJoinerRpcClients(RpcWorkerClients &clients) : clients_(clients) {} - void JoinOngoingProduces(tx::TransactionId tx_id) { + void JoinOngoingProduces(tx::TransactionId tx_id, bool committed) { auto futures = clients_.ExecuteOnWorkers<void>( - 0, [tx_id](int worker_id, communication::rpc::ClientPool &client_pool) { - auto result = - client_pool.Call<distributed::WaitOnTransactionEndRpc>(tx_id); + 0, [tx_id, committed](int worker_id, + communication::rpc::ClientPool &client_pool) { + auto result = client_pool.Call<distributed::WaitOnTransactionEndRpc>( + tx_id, committed); CHECK(result) << "[WaitOnTransactionEndRpc] failed to notify that transaction " << tx_id << " ended"; diff --git a/src/distributed/transactional_cache_cleaner.hpp b/src/distributed/transactional_cache_cleaner.hpp index 98e6007fd..f45566009 100644 --- a/src/distributed/transactional_cache_cleaner.hpp +++ b/src/distributed/transactional_cache_cleaner.hpp @@ -65,20 +65,32 @@ class WorkerTransactionalCacheCleaner : public TransactionalCacheCleaner { public: template <class... T> WorkerTransactionalCacheCleaner(tx::WorkerEngine &tx_engine, + durability::WriteAheadLog *wal, communication::rpc::Server &server, ProduceRpcServer &produce_server, T &... caches) : TransactionalCacheCleaner(tx_engine, caches...), + wal_(wal), rpc_server_(server), produce_server_(produce_server) { Register(tx_engine); - rpc_server_.Register<WaitOnTransactionEndRpc>([this](const auto &req_reader, - auto *res_builder) { - produce_server_.FinishAndClearOngoingProducePlans(req_reader.getMember()); - }); + rpc_server_.Register<WaitOnTransactionEndRpc>( + [this](const auto &req_reader, auto *res_builder) { + auto tx_id = req_reader.getTxId(); + auto committed = req_reader.getCommitted(); + produce_server_.FinishAndClearOngoingProducePlans(tx_id); + if (wal_) { + if (committed) { + wal_->Emplace(database::StateDelta::TxCommit(tx_id)); + } else { + wal_->Emplace(database::StateDelta::TxAbort(tx_id)); + } + } + }); } private: + durability::WriteAheadLog *wal_; communication::rpc::Server &rpc_server_; ProduceRpcServer &produce_server_; }; diff --git a/src/distributed/transactional_cache_cleaner_rpc_messages.lcp b/src/distributed/transactional_cache_cleaner_rpc_messages.lcp index 7580bd8e3..8cb6d65b4 100644 --- a/src/distributed/transactional_cache_cleaner_rpc_messages.lcp +++ b/src/distributed/transactional_cache_cleaner_rpc_messages.lcp @@ -11,7 +11,8 @@ cpp<# (lcp:capnp-namespace "distributed") (lcp:define-rpc wait-on-transaction-end - (:request ((member "tx::TransactionId" :capnp-type "UInt64"))) + (:request ((tx-id "tx::TransactionId" :capnp-type "UInt64") + (committed :bool))) (:response ())) (lcp:pop-namespace) diff --git a/src/transactions/engine_master.cpp b/src/transactions/engine_master.cpp index 6a081b893..68602f583 100644 --- a/src/transactions/engine_master.cpp +++ b/src/transactions/engine_master.cpp @@ -86,12 +86,12 @@ MasterEngine::MasterEngine(communication::rpc::Server &server, } void MasterEngine::Commit(const Transaction &t) { - ongoing_produce_joiner_.JoinOngoingProduces(t.id_); + ongoing_produce_joiner_.JoinOngoingProduces(t.id_, true); SingleNodeEngine::Commit(t); } void MasterEngine::Abort(const Transaction &t) { - ongoing_produce_joiner_.JoinOngoingProduces(t.id_); + ongoing_produce_joiner_.JoinOngoingProduces(t.id_, false); SingleNodeEngine::Abort(t); } diff --git a/tests/unit/CMakeLists.txt b/tests/unit/CMakeLists.txt index 1dcdd1be5..168776812 100644 --- a/tests/unit/CMakeLists.txt +++ b/tests/unit/CMakeLists.txt @@ -88,6 +88,9 @@ target_link_libraries(${test_prefix}distributed_data_exchange memgraph_lib kvsto add_unit_test(distributed_durability.cpp) target_link_libraries(${test_prefix}distributed_durability memgraph_lib kvstore_dummy_lib) +add_unit_test(distributed_dynamic_graph_partitioner.cpp) +target_link_libraries(${test_prefix}distributed_dynamic_graph_partitioner memgraph_lib kvstore_dummy_lib) + add_unit_test(distributed_gc.cpp) target_link_libraries(${test_prefix}distributed_gc memgraph_lib kvstore_dummy_lib) @@ -106,6 +109,9 @@ target_link_libraries(${test_prefix}distributed_serialization memgraph_lib kvsto add_unit_test(distributed_updates.cpp) target_link_libraries(${test_prefix}distributed_updates memgraph_lib kvstore_dummy_lib) +add_unit_test(distributed_vertex_migrator.cpp) +target_link_libraries(${test_prefix}distributed_vertex_migrator memgraph_lib kvstore_dummy_lib) + add_unit_test(durability.cpp) target_link_libraries(${test_prefix}durability memgraph_lib kvstore_dummy_lib) diff --git a/tests/unit/distributed_bfs.cpp b/tests/unit/distributed_bfs.cpp index 173b1ff22..9ab941aef 100644 --- a/tests/unit/distributed_bfs.cpp +++ b/tests/unit/distributed_bfs.cpp @@ -27,6 +27,7 @@ class BfsTest : public DistributedGraphDbTest { } public: + BfsTest() : DistributedGraphDbTest("bfs") {} std::vector<storage::VertexAddress> vertices; std::map<std::pair<int, int>, storage::EdgeAddress> edges; }; diff --git a/tests/unit/distributed_common.hpp b/tests/unit/distributed_common.hpp index 387ab3d2d..acf8cdc40 100644 --- a/tests/unit/distributed_common.hpp +++ b/tests/unit/distributed_common.hpp @@ -145,10 +145,20 @@ class DistributedGraphDbTest : public ::testing::Test { return std::distance(edges.begin(), edges.end()); }; - fs::path tmp_dir_ = fs::temp_directory_path() / - ("MG_test_unit_durability" + std::to_string(getpid())); + fs::path tmp_dir_{fs::temp_directory_path() / "MG_test_unit_durability"}; + + public: + // Each test has to specify its own durability suffix to avoid conflicts + DistributedGraphDbTest() = delete; + + DistributedGraphDbTest(const std::string &dir_suffix) + : dir_suffix_(dir_suffix) { + tmp_dir_ = + fs::temp_directory_path() / ("MG_test_unit_durability_" + dir_suffix_); + } private: + std::string dir_suffix_{""}; std::unique_ptr<database::Master> master_; std::vector<std::unique_ptr<WorkerInThread>> workers_; }; diff --git a/tests/unit/distributed_data_exchange.cpp b/tests/unit/distributed_data_exchange.cpp index c475fd5ed..6cc136786 100644 --- a/tests/unit/distributed_data_exchange.cpp +++ b/tests/unit/distributed_data_exchange.cpp @@ -11,7 +11,12 @@ using namespace database; using namespace std::literals::chrono_literals; -TEST_F(DistributedGraphDbTest, RemoteDataGetting) { +class DistributedDataExchangeTest : public DistributedGraphDbTest { + public: + DistributedDataExchangeTest() : DistributedGraphDbTest("data_exchange") {} +}; + +TEST_F(DistributedDataExchangeTest, RemoteDataGetting) { // Only old data is visible remotely, so create and commit some data. gid::Gid v1_id, v2_id, e1_id; @@ -64,7 +69,7 @@ TEST_F(DistributedGraphDbTest, RemoteDataGetting) { } } -TEST_F(DistributedGraphDbTest, RemoteExpansion) { +TEST_F(DistributedDataExchangeTest, RemoteExpansion) { // Model (v1)-->(v2), where each vertex is on one worker. auto from = InsertVertex(worker(1)); auto to = InsertVertex(worker(2)); @@ -90,7 +95,7 @@ TEST_F(DistributedGraphDbTest, RemoteExpansion) { } } -TEST_F(DistributedGraphDbTest, VertexCountsEqual) { +TEST_F(DistributedDataExchangeTest, VertexCountsEqual) { for (int i = 0; i < 5; ++i) InsertVertex(master()); for (int i = 0; i < 7; ++i) InsertVertex(worker(1)); for (int i = 0; i < 9; ++i) InsertVertex(worker(2)); @@ -115,7 +120,7 @@ TEST_F(DistributedGraphDbTest, VertexCountsEqual) { } } -TEST_F(DistributedGraphDbTest, VertexCountsTransactional) { +TEST_F(DistributedDataExchangeTest, VertexCountsTransactional) { { GraphDbAccessor accessor(master()); InsertVertex(master()); diff --git a/tests/unit/distributed_durability.cpp b/tests/unit/distributed_durability.cpp index b422c5f18..d25edb1ed 100644 --- a/tests/unit/distributed_durability.cpp +++ b/tests/unit/distributed_durability.cpp @@ -1,28 +1,50 @@ +#include "experimental/filesystem" + #include "distributed_common.hpp" #include "database/graph_db_accessor.hpp" +#include "durability/paths.hpp" #include "durability/snapshooter.hpp" +#include "utils/string.hpp" class DistributedDurability : public DistributedGraphDbTest { public: + DistributedDurability() : DistributedGraphDbTest("distributed") {} void AddVertices() { AddVertex(master(), "master"); AddVertex(worker(1), "worker1"); AddVertex(worker(2), "worker2"); } + void CheckVertices(int expected_count) { CheckVertex(master(), expected_count, "master"); CheckVertex(worker(1), expected_count, "worker1"); CheckVertex(worker(2), expected_count, "worker2"); } + void RestartWithRecovery() { - ShutDown(); + DistributedGraphDbTest::ShutDown(); Initialize([](database::Config config) { config.db_recover_on_startup = true; return config; }); } + void RestartWithWal() { + DistributedGraphDbTest::ShutDown(); + Initialize([](database::Config config) { + config.durability_enabled = true; + return config; + }); + } + + void FlushAllWal() { + // TODO(buda): Extend this when we have a fully durable mode + master().wal().Flush(); + worker(1).wal().Flush(); + worker(2).wal().Flush(); + } + private: void AddVertex(database::GraphDb &db, const std::string &label) { database::GraphDbAccessor dba(db); @@ -115,3 +137,52 @@ TEST_F(DistributedDurability, RecoveryFailure) { ::testing::FLAGS_gtest_death_test_style = "threadsafe"; EXPECT_DEATH(RestartWithRecovery(), "worker failed to recover"); } + +std::vector<fs::path> DirFiles(fs::path dir) { + std::vector<fs::path> files; + if (fs::exists(dir)) + for (auto &file : fs::directory_iterator(dir)) files.push_back(file.path()); + return files; +} + +void CheckDeltas(fs::path wal_dir, database::StateDelta::Type op) { + // Equal to worker count + auto wal_files = DirFiles(wal_dir); + ASSERT_EQ(wal_files.size(), 3); + HashedFileReader reader; + for (auto worker_wal : wal_files) { + ASSERT_TRUE(reader.Open(worker_wal)); + communication::bolt::Decoder<HashedFileReader> decoder{reader}; + std::vector<database::StateDelta> deltas; + while (true) { + auto delta = database::StateDelta::Decode(reader, decoder); + if (delta) { + deltas.emplace_back(*delta); + } else { + break; + } + } + reader.Close(); + ASSERT_GE(deltas.size(), 1); + // In case of master there is also an state delta with transaction beginning + EXPECT_EQ(deltas[deltas.size() > 1 ? 1 : 0].type, op); + } +} + +TEST_F(DistributedDurability, WriteCommittedTx) { + RestartWithWal(); + database::GraphDbAccessor dba(master()); + dba.Commit(); + FlushAllWal(); + CheckDeltas(tmp_dir_ / durability::kWalDir, + database::StateDelta::Type::TRANSACTION_COMMIT); +} + +TEST_F(DistributedDurability, WriteAbortedTx) { + RestartWithWal(); + database::GraphDbAccessor dba(master()); + dba.Abort(); + FlushAllWal(); + CheckDeltas(tmp_dir_ / durability::kWalDir, + database::StateDelta::Type::TRANSACTION_ABORT); +} diff --git a/tests/unit/distributed_dynamic_graph_partitioner.cpp b/tests/unit/distributed_dynamic_graph_partitioner.cpp index 8ccad6220..c5a8f865e 100644 --- a/tests/unit/distributed_dynamic_graph_partitioner.cpp +++ b/tests/unit/distributed_dynamic_graph_partitioner.cpp @@ -15,7 +15,13 @@ using namespace database; DECLARE_int32(dgp_max_batch_size); -TEST_F(DistributedGraphDbTest, CountLabels) { +class DistributedDynamicGraphPartitionerTest : public DistributedGraphDbTest { + public: + DistributedDynamicGraphPartitionerTest() + : DistributedGraphDbTest("dynamic_graph_partitioner") {} +}; + +TEST_F(DistributedDynamicGraphPartitionerTest, CountLabels) { auto va = InsertVertex(master()); auto vb = InsertVertex(worker(1)); auto vc = InsertVertex(worker(2)); @@ -37,7 +43,7 @@ TEST_F(DistributedGraphDbTest, CountLabels) { EXPECT_EQ(count_labels[worker(2).WorkerId()], 4 + 6); } -TEST_F(DistributedGraphDbTest, FindMigrationsMoveVertex) { +TEST_F(DistributedDynamicGraphPartitionerTest, FindMigrationsMoveVertex) { auto va = InsertVertex(master()); auto vb = InsertVertex(worker(1)); @@ -54,7 +60,7 @@ TEST_F(DistributedGraphDbTest, FindMigrationsMoveVertex) { EXPECT_EQ(migrations[0].second, worker(1).WorkerId()); } -TEST_F(DistributedGraphDbTest, FindMigrationsNoChange) { +TEST_F(DistributedDynamicGraphPartitionerTest, FindMigrationsNoChange) { InsertVertex(master()); InsertVertex(worker(1)); InsertVertex(worker(2)); @@ -67,7 +73,7 @@ TEST_F(DistributedGraphDbTest, FindMigrationsNoChange) { EXPECT_EQ(migrations.size(), 0); } -TEST_F(DistributedGraphDbTest, FindMigrationsMultipleAndLimit) { +TEST_F(DistributedDynamicGraphPartitionerTest, FindMigrationsMultipleAndLimit) { auto va = InsertVertex(master()); auto vb = InsertVertex(master()); auto vc = InsertVertex(worker(1)); @@ -96,7 +102,7 @@ TEST_F(DistributedGraphDbTest, FindMigrationsMultipleAndLimit) { } } -TEST_F(DistributedGraphDbTest, Run) { +TEST_F(DistributedDynamicGraphPartitionerTest, Run) { // Emulate a bipartite graph with lots of connections on the left, and right // side, and some connections between the halfs std::vector<storage::VertexAddress> left; diff --git a/tests/unit/distributed_gc.cpp b/tests/unit/distributed_gc.cpp index 53fd5b5c0..44c742727 100644 --- a/tests/unit/distributed_gc.cpp +++ b/tests/unit/distributed_gc.cpp @@ -2,7 +2,12 @@ #include "distributed_common.hpp" -TEST_F(DistributedGraphDbTest, GarbageCollect) { +class DistributedGcTest : public DistributedGraphDbTest { + public: + DistributedGcTest() : DistributedGraphDbTest("gc") {} +}; + +TEST_F(DistributedGcTest, GarbageCollect) { database::GraphDbAccessor dba{master()}; auto tx = dba.transaction_id(); dba.Commit(); @@ -33,7 +38,7 @@ TEST_F(DistributedGraphDbTest, GarbageCollect) { EXPECT_EQ(worker(2).tx_engine().Info(tx_last).is_committed(), true); } -TEST_F(DistributedGraphDbTest, GarbageCollectBlocked) { +TEST_F(DistributedGcTest, GarbageCollectBlocked) { database::GraphDbAccessor dba{master()}; auto tx = dba.transaction_id(); dba.Commit(); diff --git a/tests/unit/distributed_graph_db.cpp b/tests/unit/distributed_graph_db.cpp index b93f5bab2..cbd102671 100644 --- a/tests/unit/distributed_graph_db.cpp +++ b/tests/unit/distributed_graph_db.cpp @@ -30,7 +30,12 @@ using namespace distributed; using namespace database; using namespace std::literals::chrono_literals; -TEST_F(DistributedGraphDbTest, Coordination) { +class DistributedGraphDb : public DistributedGraphDbTest { + public: + DistributedGraphDb() : DistributedGraphDbTest("distributed_graph") {} +}; + +TEST_F(DistributedGraphDb, Coordination) { EXPECT_NE(master().endpoint().port(), 0); EXPECT_NE(worker(1).endpoint().port(), 0); EXPECT_NE(worker(2).endpoint().port(), 0); @@ -43,7 +48,7 @@ TEST_F(DistributedGraphDbTest, Coordination) { EXPECT_EQ(worker(2).GetEndpoint(1), worker(1).endpoint()); } -TEST_F(DistributedGraphDbTest, TxEngine) { +TEST_F(DistributedGraphDb, TxEngine) { auto *tx1 = master_tx_engine().Begin(); auto *tx2 = master_tx_engine().Begin(); EXPECT_EQ(tx2->snapshot().size(), 1); @@ -60,7 +65,7 @@ template <typename TType> using mapper_vec = std::vector<std::reference_wrapper<storage::ConcurrentIdMapper<TType>>>; -TEST_F(DistributedGraphDbTest, StorageTypes) { +TEST_F(DistributedGraphDb, StorageTypes) { auto test_mappers = [](auto mappers, auto ids) { for (size_t i = 0; i < mappers.size(); ++i) { ids.emplace_back( @@ -89,7 +94,7 @@ TEST_F(DistributedGraphDbTest, StorageTypes) { std::vector<storage::Property>{}); } -TEST_F(DistributedGraphDbTest, Counters) { +TEST_F(DistributedGraphDb, Counters) { EXPECT_EQ(master().counters().Get("a"), 0); EXPECT_EQ(worker(1).counters().Get("a"), 1); EXPECT_EQ(worker(2).counters().Get("a"), 2); @@ -99,7 +104,7 @@ TEST_F(DistributedGraphDbTest, Counters) { EXPECT_EQ(master().counters().Get("b"), 2); } -TEST_F(DistributedGraphDbTest, DispatchPlan) { +TEST_F(DistributedGraphDb, DispatchPlan) { auto kRPCWaitTime = 600ms; int64_t plan_id = 5; SymbolTable symbol_table; @@ -124,7 +129,7 @@ TEST_F(DistributedGraphDbTest, DispatchPlan) { EXPECT_DEATH(check_for_worker(worker(1)), "Missing plan*"); } -TEST_F(DistributedGraphDbTest, BuildIndexDistributed) { +TEST_F(DistributedGraphDb, BuildIndexDistributed) { storage::Label label; storage::Property property; @@ -169,7 +174,7 @@ TEST_F(DistributedGraphDbTest, BuildIndexDistributed) { } } -TEST_F(DistributedGraphDbTest, WorkerOwnedDbAccessors) { +TEST_F(DistributedGraphDb, WorkerOwnedDbAccessors) { GraphDbAccessor dba_w1(worker(1)); auto v = dba_w1.InsertVertex(); auto prop = dba_w1.Property("p"); diff --git a/tests/unit/distributed_interpretation.cpp b/tests/unit/distributed_interpretation.cpp index d879ca72b..a18b38374 100644 --- a/tests/unit/distributed_interpretation.cpp +++ b/tests/unit/distributed_interpretation.cpp @@ -23,6 +23,8 @@ using namespace std::literals::chrono_literals; class DistributedInterpretationTest : public DistributedGraphDbTest { protected: + DistributedInterpretationTest() : DistributedGraphDbTest("interpretation") {} + void SetUp() override { DistributedGraphDbTest::SetUp(); interpreter_.emplace(master()); diff --git a/tests/unit/distributed_query_plan.cpp b/tests/unit/distributed_query_plan.cpp index 3a9e7e8ff..7f3fea5f3 100644 --- a/tests/unit/distributed_query_plan.cpp +++ b/tests/unit/distributed_query_plan.cpp @@ -32,7 +32,12 @@ using namespace distributed; using namespace database; using namespace std::literals::chrono_literals; -TEST_F(DistributedGraphDbTest, PullProduceRpc) { +class DistributedQueryPlan : public DistributedGraphDbTest { + protected: + DistributedQueryPlan() : DistributedGraphDbTest("query_plan") {} +}; + +TEST_F(DistributedQueryPlan, PullProduceRpc) { GraphDbAccessor dba{master()}; Context ctx{dba}; SymbolGenerator symbol_generator{ctx.symbol_table_}; @@ -96,7 +101,7 @@ TEST_F(DistributedGraphDbTest, PullProduceRpc) { } } -TEST_F(DistributedGraphDbTest, PullProduceRpcWithGraphElements) { +TEST_F(DistributedQueryPlan, PullProduceRpcWithGraphElements) { // Create some data on the master and both workers. Eeach edge (3 of them) and // vertex (6 of them) will be uniquely identified with their worker id and // sequence ID, so we can check we retrieved all. @@ -191,7 +196,7 @@ TEST_F(DistributedGraphDbTest, PullProduceRpcWithGraphElements) { check_result(2, future_w2_results.get().frames); } -TEST_F(DistributedGraphDbTest, Synchronize) { +TEST_F(DistributedQueryPlan, Synchronize) { auto from = InsertVertex(worker(1)); auto to = InsertVertex(worker(2)); InsertEdge(from, to, "et"); @@ -245,7 +250,7 @@ TEST_F(DistributedGraphDbTest, Synchronize) { // TODO test without advance command? } -TEST_F(DistributedGraphDbTest, Create) { +TEST_F(DistributedQueryPlan, Create) { // Query: UNWIND range(0, 1000) as x CREATE () auto &db = master(); GraphDbAccessor dba{db}; @@ -267,7 +272,7 @@ TEST_F(DistributedGraphDbTest, Create) { EXPECT_GT(VertexCount(worker(2)), 200); } -TEST_F(DistributedGraphDbTest, PullRemoteOrderBy) { +TEST_F(DistributedQueryPlan, PullRemoteOrderBy) { // Create some data on the master and both workers. storage::Property prop; { @@ -327,6 +332,8 @@ TEST_F(DistributedGraphDbTest, PullRemoteOrderBy) { class DistributedTransactionTimeout : public DistributedGraphDbTest { protected: + DistributedTransactionTimeout() + : DistributedGraphDbTest("transaction_timeout") {} int QueryExecutionTimeSec(int) override { return 1; } }; diff --git a/tests/unit/distributed_updates.cpp b/tests/unit/distributed_updates.cpp index 24ec0f9aa..cfdc100f9 100644 --- a/tests/unit/distributed_updates.cpp +++ b/tests/unit/distributed_updates.cpp @@ -13,6 +13,8 @@ class DistributedUpdateTest : public DistributedGraphDbTest { protected: + DistributedUpdateTest() : DistributedGraphDbTest("update") {} + std::unique_ptr<database::GraphDbAccessor> dba1; std::unique_ptr<database::GraphDbAccessor> dba2; storage::Label label; @@ -66,7 +68,13 @@ TEST_F(DistributedUpdateTest, UpdateApply) { #undef EXPECT_LABEL -TEST_F(DistributedGraphDbTest, CreateVertex) { +class DistributedGraphDbSimpleUpdatesTest : public DistributedGraphDbTest { + public: + DistributedGraphDbSimpleUpdatesTest() + : DistributedGraphDbTest("simple_updates") {} +}; + +TEST_F(DistributedGraphDbSimpleUpdatesTest, CreateVertex) { gid::Gid gid; { database::GraphDbAccessor dba{worker(1)}; @@ -81,7 +89,7 @@ TEST_F(DistributedGraphDbTest, CreateVertex) { } } -TEST_F(DistributedGraphDbTest, CreateVertexWithUpdate) { +TEST_F(DistributedGraphDbSimpleUpdatesTest, CreateVertexWithUpdate) { gid::Gid gid; storage::Property prop; { @@ -101,7 +109,7 @@ TEST_F(DistributedGraphDbTest, CreateVertexWithUpdate) { } } -TEST_F(DistributedGraphDbTest, CreateVertexWithData) { +TEST_F(DistributedGraphDbSimpleUpdatesTest, CreateVertexWithData) { gid::Gid gid; storage::Label l1; storage::Label l2; @@ -135,7 +143,7 @@ TEST_F(DistributedGraphDbTest, CreateVertexWithData) { // Checks if expiring a local record for a local update before applying a remote // update delta causes a problem -TEST_F(DistributedGraphDbTest, UpdateVertexRemoteAndLocal) { +TEST_F(DistributedGraphDbSimpleUpdatesTest, UpdateVertexRemoteAndLocal) { gid::Gid gid; storage::Label l1; storage::Label l2; @@ -161,7 +169,7 @@ TEST_F(DistributedGraphDbTest, UpdateVertexRemoteAndLocal) { } } -TEST_F(DistributedGraphDbTest, AddSameLabelRemoteAndLocal) { +TEST_F(DistributedGraphDbSimpleUpdatesTest, AddSameLabelRemoteAndLocal) { auto v_address = InsertVertex(worker(1)); { database::GraphDbAccessor dba0{master()}; @@ -182,7 +190,7 @@ TEST_F(DistributedGraphDbTest, AddSameLabelRemoteAndLocal) { } } -TEST_F(DistributedGraphDbTest, IndexGetsUpdatedRemotely) { +TEST_F(DistributedGraphDbSimpleUpdatesTest, IndexGetsUpdatedRemotely) { storage::VertexAddress v_remote = InsertVertex(worker(1)); storage::Label label; { @@ -200,7 +208,7 @@ TEST_F(DistributedGraphDbTest, IndexGetsUpdatedRemotely) { } } -TEST_F(DistributedGraphDbTest, DeleteVertexRemoteCommit) { +TEST_F(DistributedGraphDbSimpleUpdatesTest, DeleteVertexRemoteCommit) { auto v_address = InsertVertex(worker(1)); database::GraphDbAccessor dba0{master()}; database::GraphDbAccessor dba1{worker(1), dba0.transaction_id()}; @@ -212,7 +220,7 @@ TEST_F(DistributedGraphDbTest, DeleteVertexRemoteCommit) { EXPECT_FALSE(dba1.FindVertexOptional(v_address.gid(), true)); } -TEST_F(DistributedGraphDbTest, DeleteVertexRemoteBothDelete) { +TEST_F(DistributedGraphDbSimpleUpdatesTest, DeleteVertexRemoteBothDelete) { auto v_address = InsertVertex(worker(1)); { database::GraphDbAccessor dba0{master()}; @@ -227,7 +235,7 @@ TEST_F(DistributedGraphDbTest, DeleteVertexRemoteBothDelete) { } } -TEST_F(DistributedGraphDbTest, DeleteVertexRemoteStillConnected) { +TEST_F(DistributedGraphDbSimpleUpdatesTest, DeleteVertexRemoteStillConnected) { auto v_address = InsertVertex(worker(1)); auto e_address = InsertEdge(v_address, v_address, "edge"); @@ -258,6 +266,8 @@ TEST_F(DistributedGraphDbTest, DeleteVertexRemoteStillConnected) { class DistributedDetachDeleteTest : public DistributedGraphDbTest { protected: + DistributedDetachDeleteTest() : DistributedGraphDbTest("detach_delete") {} + storage::VertexAddress w1_a; storage::VertexAddress w1_b; storage::VertexAddress w2_a; @@ -352,6 +362,8 @@ TEST_F(DistributedDetachDeleteTest, TwoVerticesSameWorkers) { class DistributedEdgeCreateTest : public DistributedGraphDbTest { protected: + DistributedEdgeCreateTest() : DistributedGraphDbTest("edge_create") {} + storage::VertexAddress w1_a; storage::VertexAddress w1_b; storage::VertexAddress w2_a; @@ -474,6 +486,8 @@ TEST_F(DistributedEdgeCreateTest, RemoteRemoteCycle) { class DistributedEdgeRemoveTest : public DistributedGraphDbTest { protected: + DistributedEdgeRemoveTest() : DistributedGraphDbTest("edge_remove") {} + storage::VertexAddress from_addr; storage::VertexAddress to_addr; storage::EdgeAddress edge_addr; diff --git a/tests/unit/distributed_vertex_migrator.cpp b/tests/unit/distributed_vertex_migrator.cpp index 5796d0124..00851e5e5 100644 --- a/tests/unit/distributed_vertex_migrator.cpp +++ b/tests/unit/distributed_vertex_migrator.cpp @@ -15,14 +15,19 @@ using namespace database; DECLARE_bool(generate_vertex_ids); DECLARE_bool(generate_edge_ids); +class DistributedVertexMigratorTest : public DistributedGraphDbTest { + public: + DistributedVertexMigratorTest() : DistributedGraphDbTest("vertex_migrator") {} +}; + // Check if the auto-generated gid property is unchanged after migration -TEST_F(DistributedGraphDbTest, VertexEdgeGidSaved) { +TEST_F(DistributedVertexMigratorTest, VertexEdgeGidSaved) { FLAGS_generate_vertex_ids = true; FLAGS_generate_edge_ids = true; // Fill master so that the ids are not the same on master and worker 1 for (int i = 0; i < 10; ++i) { auto va = InsertVertex(master()); - auto ea = InsertEdge(va, va, "edge"); + InsertEdge(va, va, "edge"); } auto va = InsertVertex(master()); @@ -71,7 +76,7 @@ TEST_F(DistributedGraphDbTest, VertexEdgeGidSaved) { // Checks if two connected nodes from master will be transfered to worker 1 and // if edge from vertex on the worker 2 will now point to worker 1 after transfer -TEST_F(DistributedGraphDbTest, SomeTransfer) { +TEST_F(DistributedVertexMigratorTest, SomeTransfer) { auto va = InsertVertex(master()); auto vb = InsertVertex(master()); auto vc = InsertVertex(worker(2)); @@ -109,7 +114,7 @@ TEST_F(DistributedGraphDbTest, SomeTransfer) { // Check if cycle edge is transfered only once since it's contained in both in // and out edges of a vertex and if not handled correctly could cause problems -TEST_F(DistributedGraphDbTest, EdgeCycle) { +TEST_F(DistributedVertexMigratorTest, EdgeCycle) { auto va = InsertVertex(master()); InsertEdge(va, va, "edge"); { @@ -132,7 +137,7 @@ TEST_F(DistributedGraphDbTest, EdgeCycle) { EXPECT_EQ(EdgeCount(worker(1)), 1); } -TEST_F(DistributedGraphDbTest, TransferLabelsAndProperties) { +TEST_F(DistributedVertexMigratorTest, TransferLabelsAndProperties) { { database::GraphDbAccessor dba(master()); auto va = dba.InsertVertex();