diff --git a/src/distributed/serialization.hpp b/src/distributed/serialization.hpp index c44e172f9..3d93d31f8 100644 --- a/src/distributed/serialization.hpp +++ b/src/distributed/serialization.hpp @@ -17,9 +17,13 @@ namespace impl { // global one, using the given worker_id. template void SaveAddress(TArchive &ar, TAddress address, int worker_id) { - auto gid = address.is_remote() ? address.gid() : address.local()->gid_; - ar << gid; - ar << worker_id; + if (address.is_local()) { + ar << address.local()->gid_; + ar << worker_id; + } else { + ar << address.gid(); + ar << address.worker_id(); + } }; // Saves the given properties into the given archive. diff --git a/tests/unit/distributed_common.hpp b/tests/unit/distributed_common.hpp index a3304d1db..92b1a1401 100644 --- a/tests/unit/distributed_common.hpp +++ b/tests/unit/distributed_common.hpp @@ -1,15 +1,14 @@ -#include +#include #include #include "database/graph_db.hpp" #include "transactions/engine_master.hpp" -template -using optional = std::experimental::optional; - class DistributedGraphDbTest : public ::testing::Test { const std::string kLocal = "127.0.0.1"; + const int kWorkerCount = 2; + class WorkerInThread { public: WorkerInThread(database::Config config) : worker_(config) { @@ -30,7 +29,7 @@ class DistributedGraphDbTest : public ::testing::Test { database::Config master_config; master_config.master_endpoint = {kLocal, 0}; - master_.emplace(master_config); + master_ = std::make_unique(master_config); std::this_thread::sleep_for(kInitTime); auto worker_config = [this](int worker_id) { @@ -41,30 +40,30 @@ class DistributedGraphDbTest : public ::testing::Test { return config; }; - worker1_.emplace(worker_config(1)); - std::this_thread::sleep_for(kInitTime); - worker2_.emplace(worker_config(2)); - std::this_thread::sleep_for(kInitTime); + for (int i = 0; i < kWorkerCount; ++i) { + workers_.emplace_back( + std::make_unique(worker_config(i + 1))); + std::this_thread::sleep_for(kInitTime); + } } void TearDown() override { // Kill master first because it will expect a shutdown response from the // workers. - master_ = std::experimental::nullopt; - - worker2_ = std::experimental::nullopt; - worker1_ = std::experimental::nullopt; + master_ = nullptr; + for (int i = kWorkerCount - 1; i >= 0; --i) workers_[i] = nullptr; } database::Master &master() { return *master_; } auto &master_tx_engine() { return dynamic_cast(master_->tx_engine()); } - database::Worker &worker1() { return worker1_->worker_; } - database::Worker &worker2() { return worker2_->worker_; } + + database::Worker &worker(int worker_id) { + return workers_[worker_id - 1]->worker_; + } private: - optional master_; - optional worker1_; - optional worker2_; + std::unique_ptr master_; + std::vector> workers_; }; diff --git a/tests/unit/distributed_data_exchange.cpp b/tests/unit/distributed_data_exchange.cpp new file mode 100644 index 000000000..e9d29e08b --- /dev/null +++ b/tests/unit/distributed_data_exchange.cpp @@ -0,0 +1,117 @@ +#include + +#include "gtest/gtest.h" + +#include "database/graph_db_accessor.hpp" +#include "storage/edge_accessor.hpp" +#include "storage/vertex_accessor.hpp" + +#include "distributed_common.hpp" + +using namespace database; + +TEST_F(DistributedGraphDbTest, RemoteDataGetting) { + // Only old data is visible remotely, so create and commit some data. + gid::Gid v1_id, v2_id, e1_id; + + { + GraphDbAccessor dba{master()}; + auto v1 = dba.InsertVertex(); + auto v2 = dba.InsertVertex(); + auto e1 = dba.InsertEdge(v1, v2, dba.EdgeType("et")); + + // Set some data so we see we're getting the right stuff. + v1.PropsSet(dba.Property("p1"), 42); + v1.add_label(dba.Label("label")); + v2.PropsSet(dba.Property("p2"), "value"); + e1.PropsSet(dba.Property("p3"), true); + + v1_id = v1.gid(); + v2_id = v2.gid(); + e1_id = e1.gid(); + + dba.Commit(); + } + + // The master must start a transaction before workers can work in it. + GraphDbAccessor master_dba{master()}; + + { + GraphDbAccessor w1_dba{worker(1), master_dba.transaction_id()}; + VertexAccessor v1_in_w1{{v1_id, 0}, w1_dba}; + EXPECT_NE(v1_in_w1.GetOld(), nullptr); + EXPECT_EQ(v1_in_w1.GetNew(), nullptr); + EXPECT_EQ(v1_in_w1.PropsAt(w1_dba.Property("p1")).Value(), 42); + EXPECT_TRUE(v1_in_w1.has_label(w1_dba.Label("label"))); + } + + { + GraphDbAccessor w2_dba{worker(2), master_dba.transaction_id()}; + VertexAccessor v2_in_w2{{v2_id, 0}, w2_dba}; + EXPECT_NE(v2_in_w2.GetOld(), nullptr); + EXPECT_EQ(v2_in_w2.GetNew(), nullptr); + EXPECT_EQ(v2_in_w2.PropsAt(w2_dba.Property("p2")).Value(), + "value"); + EXPECT_FALSE(v2_in_w2.has_label(w2_dba.Label("label"))); + + VertexAccessor v1_in_w2{{v1_id, 0}, w2_dba}; + EdgeAccessor e1_in_w2{{e1_id, 0}, w2_dba}; + EXPECT_EQ(e1_in_w2.from(), v1_in_w2); + EXPECT_EQ(e1_in_w2.to(), v2_in_w2); + EXPECT_EQ(e1_in_w2.EdgeType(), w2_dba.EdgeType("et")); + EXPECT_EQ(e1_in_w2.PropsAt(w2_dba.Property("p3")).Value(), true); + } +} + +TEST_F(DistributedGraphDbTest, RemoteExpansion) { + // Model (v1)-->(v2), where each vertex is on one worker. + std::vector v_ga; + { + GraphDbAccessor dba{master()}; + v_ga.emplace_back(GraphDbAccessor(worker(1), dba.transaction_id()) + .InsertVertex() + .GlobalAddress()); + v_ga.emplace_back(GraphDbAccessor(worker(2), dba.transaction_id()) + .InsertVertex() + .GlobalAddress()); + dba.Commit(); + } + { + GraphDbAccessor dba{master()}; + auto edge_type = dba.EdgeType("et"); + auto prop = dba.Property("prop"); + GraphDbAccessor dba1{worker(1), dba.transaction_id()}; + auto edge_ga = + dba1.InsertOnlyEdge(v_ga[0], v_ga[1], edge_type, + dba1.db().storage().EdgeGenerator().Next()) + .GlobalAddress(); + for (int i : {0, 1}) { + GraphDbAccessor dba_w{worker(i + 1), dba.transaction_id()}; + auto v = dba_w.FindVertexChecked(v_ga[i].gid(), false); + // Update the vertex to create a new record. + v.PropsSet(prop, 42); + auto &edges = i == 0 ? v.GetNew()->out_ : v.GetNew()->in_; + edges.emplace(v_ga[(i + 1) % 2], edge_ga, edge_type); + } + dba.Commit(); + } + { + // Expand on the master for three hops. Collect vertex gids. + GraphDbAccessor dba{master()}; + std::vector visited; + + auto expand = [&visited, &dba](auto &v) { + for (auto e : v.out()) return e.to(); + for (auto e : v.in()) return e.from(); + CHECK(false) << "No edge in vertex"; + }; + + // Do a few hops back and forth, all on the master. + VertexAccessor v{v_ga[0], dba}; + for (int i = 0; i < 5; ++i) { + v = expand(v); + EXPECT_FALSE(v.address().is_local()); + EXPECT_EQ(v.address(), v_ga[(i + 1) % 2]); + } + } +} diff --git a/tests/unit/distributed_graph_db.cpp b/tests/unit/distributed_graph_db.cpp index bc18228d8..eb5627ade 100644 --- a/tests/unit/distributed_graph_db.cpp +++ b/tests/unit/distributed_graph_db.cpp @@ -31,15 +31,15 @@ using namespace database; TEST_F(DistributedGraphDbTest, Coordination) { EXPECT_NE(master().endpoint().port(), 0); - EXPECT_NE(worker1().endpoint().port(), 0); - EXPECT_NE(worker2().endpoint().port(), 0); + EXPECT_NE(worker(1).endpoint().port(), 0); + EXPECT_NE(worker(2).endpoint().port(), 0); - EXPECT_EQ(master().GetEndpoint(1), worker1().endpoint()); - EXPECT_EQ(master().GetEndpoint(2), worker2().endpoint()); - EXPECT_EQ(worker1().GetEndpoint(0), master().endpoint()); - EXPECT_EQ(worker1().GetEndpoint(2), worker2().endpoint()); - EXPECT_EQ(worker2().GetEndpoint(0), master().endpoint()); - EXPECT_EQ(worker2().GetEndpoint(1), worker1().endpoint()); + EXPECT_EQ(master().GetEndpoint(1), worker(1).endpoint()); + EXPECT_EQ(master().GetEndpoint(2), worker(2).endpoint()); + EXPECT_EQ(worker(1).GetEndpoint(0), master().endpoint()); + EXPECT_EQ(worker(1).GetEndpoint(2), worker(2).endpoint()); + EXPECT_EQ(worker(2).GetEndpoint(0), master().endpoint()); + EXPECT_EQ(worker(2).GetEndpoint(1), worker(1).endpoint()); } TEST_F(DistributedGraphDbTest, TxEngine) { @@ -47,12 +47,12 @@ TEST_F(DistributedGraphDbTest, TxEngine) { auto *tx2 = master_tx_engine().Begin(); EXPECT_EQ(tx2->snapshot().size(), 1); EXPECT_EQ( - worker1().tx_engine().RunningTransaction(tx1->id_)->snapshot().size(), 0); - EXPECT_EQ(worker2().tx_engine().RunningTransaction(tx2->id_)->snapshot(), + worker(1).tx_engine().RunningTransaction(tx1->id_)->snapshot().size(), 0); + EXPECT_EQ(worker(2).tx_engine().RunningTransaction(tx2->id_)->snapshot(), tx2->snapshot()); ::testing::FLAGS_gtest_death_test_style = "threadsafe"; - EXPECT_DEATH(worker2().tx_engine().RunningTransaction(123), ""); + EXPECT_DEATH(worker(2).tx_engine().RunningTransaction(123), ""); } template @@ -75,82 +75,29 @@ TEST_F(DistributedGraphDbTest, StorageTypes) { }; test_mappers(mapper_vec{master().label_mapper(), - worker1().label_mapper(), - worker2().label_mapper()}, + worker(1).label_mapper(), + worker(2).label_mapper()}, std::vector{}); test_mappers(mapper_vec{master().edge_type_mapper(), - worker1().edge_type_mapper(), - worker2().edge_type_mapper()}, + worker(1).edge_type_mapper(), + worker(2).edge_type_mapper()}, std::vector{}); test_mappers(mapper_vec{master().property_mapper(), - worker1().property_mapper(), - worker2().property_mapper()}, + worker(1).property_mapper(), + worker(2).property_mapper()}, std::vector{}); } TEST_F(DistributedGraphDbTest, Counters) { EXPECT_EQ(master().counters().Get("a"), 0); - EXPECT_EQ(worker1().counters().Get("a"), 1); - EXPECT_EQ(worker2().counters().Get("a"), 2); + EXPECT_EQ(worker(1).counters().Get("a"), 1); + EXPECT_EQ(worker(2).counters().Get("a"), 2); - EXPECT_EQ(worker1().counters().Get("b"), 0); - EXPECT_EQ(worker2().counters().Get("b"), 1); + EXPECT_EQ(worker(1).counters().Get("b"), 0); + EXPECT_EQ(worker(2).counters().Get("b"), 1); EXPECT_EQ(master().counters().Get("b"), 2); } -TEST_F(DistributedGraphDbTest, RemoteDataGetting) { - // Only old data is visible remotely, so create and commit some data. - gid::Gid v1_id, v2_id, e1_id; - - { - GraphDbAccessor dba{master()}; - auto v1 = dba.InsertVertex(); - auto v2 = dba.InsertVertex(); - auto e1 = dba.InsertEdge(v1, v2, dba.EdgeType("et")); - - // Set some data so we see we're getting the right stuff. - v1.PropsSet(dba.Property("p1"), 42); - v1.add_label(dba.Label("label")); - v2.PropsSet(dba.Property("p2"), "value"); - e1.PropsSet(dba.Property("p3"), true); - - v1_id = v1.gid(); - v2_id = v2.gid(); - e1_id = e1.gid(); - - dba.Commit(); - } - - // The master must start a transaction before workers can work in it. - GraphDbAccessor master_dba{master()}; - - { - GraphDbAccessor w1_dba{worker1(), master_dba.transaction_id()}; - VertexAccessor v1_in_w1{{v1_id, 0}, w1_dba}; - EXPECT_NE(v1_in_w1.GetOld(), nullptr); - EXPECT_EQ(v1_in_w1.GetNew(), nullptr); - EXPECT_EQ(v1_in_w1.PropsAt(w1_dba.Property("p1")).Value(), 42); - EXPECT_TRUE(v1_in_w1.has_label(w1_dba.Label("label"))); - } - - { - GraphDbAccessor w2_dba{worker2(), master_dba.transaction_id()}; - VertexAccessor v2_in_w2{{v2_id, 0}, w2_dba}; - EXPECT_NE(v2_in_w2.GetOld(), nullptr); - EXPECT_EQ(v2_in_w2.GetNew(), nullptr); - EXPECT_EQ(v2_in_w2.PropsAt(w2_dba.Property("p2")).Value(), - "value"); - EXPECT_FALSE(v2_in_w2.has_label(w2_dba.Label("label"))); - - VertexAccessor v1_in_w2{{v1_id, 0}, w2_dba}; - EdgeAccessor e1_in_w2{{e1_id, 0}, w2_dba}; - EXPECT_EQ(e1_in_w2.from(), v1_in_w2); - EXPECT_EQ(e1_in_w2.to(), v2_in_w2); - EXPECT_EQ(e1_in_w2.EdgeType(), w2_dba.EdgeType("et")); - EXPECT_EQ(e1_in_w2.PropsAt(w2_dba.Property("p3")).Value(), true); - } -} - TEST_F(DistributedGraphDbTest, DispatchPlan) { auto kRPCWaitTime = 600ms; int64_t plan_id = 5; @@ -168,8 +115,8 @@ TEST_F(DistributedGraphDbTest, DispatchPlan) { EXPECT_EQ(cached.symbol_table.max_position(), symbol_table.max_position()); EXPECT_EQ(cached.symbol_table.table(), symbol_table.table()); }; - check_for_worker(worker1()); - check_for_worker(worker2()); + check_for_worker(worker(1)); + check_for_worker(worker(2)); } TEST_F(DistributedGraphDbTest, RemotePullProduceRpc) { @@ -234,10 +181,8 @@ TEST_F(DistributedGraphDbTest, RemotePullProduceRpc) { auto tx1_batch2 = remote_pull(dba_1, worker_id).get(); expect_second_batch(tx1_batch2); } - master().remote_pull_clients().EndAllRemotePulls(dba_1.transaction_id(), - plan_id); - master().remote_pull_clients().EndAllRemotePulls(dba_2.transaction_id(), - plan_id); + for (auto tx_id : {dba_1.transaction_id(), dba_2.transaction_id()}) + master().remote_pull_clients().EndAllRemotePulls(tx_id, plan_id); } TEST_F(DistributedGraphDbTest, RemotePullProduceRpcWithGraphElements) { @@ -257,9 +202,9 @@ TEST_F(DistributedGraphDbTest, RemotePullProduceRpcWithGraphElements) { e12.PropsSet(prop, worker_id * 10 + 2); }; create_data(dba, 0); - GraphDbAccessor dba_w1{worker1(), dba.transaction_id()}; + GraphDbAccessor dba_w1{worker(1), dba.transaction_id()}; create_data(dba_w1, 1); - GraphDbAccessor dba_w2{worker2(), dba.transaction_id()}; + GraphDbAccessor dba_w2{worker(2), dba.transaction_id()}; create_data(dba_w2, 2); dba.Commit(); } @@ -347,8 +292,8 @@ TEST_F(DistributedGraphDbTest, BuildIndexDistributed) { property = dba0.Property("property"); auto tx_id = dba0.transaction_id(); - GraphDbAccessor dba1{worker1(), tx_id}; - GraphDbAccessor dba2{worker2(), tx_id}; + GraphDbAccessor dba1{worker(1), tx_id}; + GraphDbAccessor dba2{worker(2), tx_id}; auto add_vertex = [label, property](GraphDbAccessor &dba) { auto vertex = dba.InsertVertex(); vertex.add_label(label); @@ -370,27 +315,27 @@ TEST_F(DistributedGraphDbTest, BuildIndexDistributed) { GraphDbAccessor dba_master{master()}; { - GraphDbAccessor dba{worker1(), dba_master.transaction_id()}; + GraphDbAccessor dba{worker(1), dba_master.transaction_id()}; EXPECT_TRUE(dba.LabelPropertyIndexExists(label, property)); EXPECT_EQ(CountIterable(dba.Vertices(label, property, false)), 50); } { - GraphDbAccessor dba{worker2(), dba_master.transaction_id()}; + GraphDbAccessor dba{worker(2), dba_master.transaction_id()}; EXPECT_TRUE(dba.LabelPropertyIndexExists(label, property)); EXPECT_EQ(CountIterable(dba.Vertices(label, property, false)), 300); } } TEST_F(DistributedGraphDbTest, WorkerOwnedDbAccessors) { - GraphDbAccessor dba_w1(worker1()); + GraphDbAccessor dba_w1(worker(1)); auto v = dba_w1.InsertVertex(); auto prop = dba_w1.Property("p"); v.PropsSet(prop, 42); auto v_ga = v.GlobalAddress(); dba_w1.Commit(); - GraphDbAccessor dba_w2(worker2()); + GraphDbAccessor dba_w2(worker(2)); VertexAccessor v_in_w2{v_ga, dba_w2}; EXPECT_EQ(v_in_w2.PropsAt(prop).Value(), 42); } diff --git a/tests/unit/distributed_updates.cpp b/tests/unit/distributed_updates.cpp index d0c5a0b7b..090eba153 100644 --- a/tests/unit/distributed_updates.cpp +++ b/tests/unit/distributed_updates.cpp @@ -5,12 +5,12 @@ #include "distributed_common.hpp" TEST_F(DistributedGraphDbTest, RemoteUpdateLocalVisibility) { - database::GraphDbAccessor dba_tx1{worker1()}; + database::GraphDbAccessor dba_tx1{worker(1)}; auto v = dba_tx1.InsertVertex(); auto v_ga = v.GlobalAddress(); dba_tx1.Commit(); - database::GraphDbAccessor dba_tx2_w2{worker2()}; + database::GraphDbAccessor dba_tx2_w2{worker(2)}; v = VertexAccessor(v_ga, dba_tx2_w2); ASSERT_FALSE(v.address().is_local()); auto label = dba_tx2_w2.Label("l"); @@ -22,7 +22,7 @@ TEST_F(DistributedGraphDbTest, RemoteUpdateLocalVisibility) { EXPECT_FALSE(v.has_label(label)); // In the same transaction on the owning worker there is no label. - database::GraphDbAccessor dba_tx2_w1{worker1(), dba_tx2_w2.transaction_id()}; + database::GraphDbAccessor dba_tx2_w1{worker(1), dba_tx2_w2.transaction_id()}; v = VertexAccessor(v_ga, dba_tx2_w1); v.SwitchOld(); EXPECT_FALSE(v.has_label(label));