Fix address serialization

Summary:
Defined a new test based on reported bug, for multiple remote expansion.
Fixed the bug. Introduced minor refactors in distributed unit testing.

Reviewers: mculinovic, dgleich

Reviewed By: mculinovic

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1173
This commit is contained in:
florijan 2018-02-05 16:28:22 +01:00
parent abe419984a
commit f84b0b0787
5 changed files with 177 additions and 112 deletions

View File

@ -17,9 +17,13 @@ namespace impl {
// global one, using the given worker_id.
template <typename TArchive, typename TAddress>
void SaveAddress(TArchive &ar, TAddress address, int worker_id) {
auto gid = address.is_remote() ? address.gid() : address.local()->gid_;
ar << gid;
ar << worker_id;
if (address.is_local()) {
ar << address.local()->gid_;
ar << worker_id;
} else {
ar << address.gid();
ar << address.worker_id();
}
};
// Saves the given properties into the given archive.

View File

@ -1,15 +1,14 @@
#include <experimental/optional>
#include <memory>
#include <gtest/gtest.h>
#include "database/graph_db.hpp"
#include "transactions/engine_master.hpp"
template <typename T>
using optional = std::experimental::optional<T>;
class DistributedGraphDbTest : public ::testing::Test {
const std::string kLocal = "127.0.0.1";
const int kWorkerCount = 2;
class WorkerInThread {
public:
WorkerInThread(database::Config config) : worker_(config) {
@ -30,7 +29,7 @@ class DistributedGraphDbTest : public ::testing::Test {
database::Config master_config;
master_config.master_endpoint = {kLocal, 0};
master_.emplace(master_config);
master_ = std::make_unique<database::Master>(master_config);
std::this_thread::sleep_for(kInitTime);
auto worker_config = [this](int worker_id) {
@ -41,30 +40,30 @@ class DistributedGraphDbTest : public ::testing::Test {
return config;
};
worker1_.emplace(worker_config(1));
std::this_thread::sleep_for(kInitTime);
worker2_.emplace(worker_config(2));
std::this_thread::sleep_for(kInitTime);
for (int i = 0; i < kWorkerCount; ++i) {
workers_.emplace_back(
std::make_unique<WorkerInThread>(worker_config(i + 1)));
std::this_thread::sleep_for(kInitTime);
}
}
void TearDown() override {
// Kill master first because it will expect a shutdown response from the
// workers.
master_ = std::experimental::nullopt;
worker2_ = std::experimental::nullopt;
worker1_ = std::experimental::nullopt;
master_ = nullptr;
for (int i = kWorkerCount - 1; i >= 0; --i) workers_[i] = nullptr;
}
database::Master &master() { return *master_; }
auto &master_tx_engine() {
return dynamic_cast<tx::MasterEngine &>(master_->tx_engine());
}
database::Worker &worker1() { return worker1_->worker_; }
database::Worker &worker2() { return worker2_->worker_; }
database::Worker &worker(int worker_id) {
return workers_[worker_id - 1]->worker_;
}
private:
optional<database::Master> master_;
optional<WorkerInThread> worker1_;
optional<WorkerInThread> worker2_;
std::unique_ptr<database::Master> master_;
std::vector<std::unique_ptr<WorkerInThread>> workers_;
};

View File

@ -0,0 +1,117 @@
#include <unordered_map>
#include "gtest/gtest.h"
#include "database/graph_db_accessor.hpp"
#include "storage/edge_accessor.hpp"
#include "storage/vertex_accessor.hpp"
#include "distributed_common.hpp"
using namespace database;
TEST_F(DistributedGraphDbTest, RemoteDataGetting) {
// Only old data is visible remotely, so create and commit some data.
gid::Gid v1_id, v2_id, e1_id;
{
GraphDbAccessor dba{master()};
auto v1 = dba.InsertVertex();
auto v2 = dba.InsertVertex();
auto e1 = dba.InsertEdge(v1, v2, dba.EdgeType("et"));
// Set some data so we see we're getting the right stuff.
v1.PropsSet(dba.Property("p1"), 42);
v1.add_label(dba.Label("label"));
v2.PropsSet(dba.Property("p2"), "value");
e1.PropsSet(dba.Property("p3"), true);
v1_id = v1.gid();
v2_id = v2.gid();
e1_id = e1.gid();
dba.Commit();
}
// The master must start a transaction before workers can work in it.
GraphDbAccessor master_dba{master()};
{
GraphDbAccessor w1_dba{worker(1), master_dba.transaction_id()};
VertexAccessor v1_in_w1{{v1_id, 0}, w1_dba};
EXPECT_NE(v1_in_w1.GetOld(), nullptr);
EXPECT_EQ(v1_in_w1.GetNew(), nullptr);
EXPECT_EQ(v1_in_w1.PropsAt(w1_dba.Property("p1")).Value<int64_t>(), 42);
EXPECT_TRUE(v1_in_w1.has_label(w1_dba.Label("label")));
}
{
GraphDbAccessor w2_dba{worker(2), master_dba.transaction_id()};
VertexAccessor v2_in_w2{{v2_id, 0}, w2_dba};
EXPECT_NE(v2_in_w2.GetOld(), nullptr);
EXPECT_EQ(v2_in_w2.GetNew(), nullptr);
EXPECT_EQ(v2_in_w2.PropsAt(w2_dba.Property("p2")).Value<std::string>(),
"value");
EXPECT_FALSE(v2_in_w2.has_label(w2_dba.Label("label")));
VertexAccessor v1_in_w2{{v1_id, 0}, w2_dba};
EdgeAccessor e1_in_w2{{e1_id, 0}, w2_dba};
EXPECT_EQ(e1_in_w2.from(), v1_in_w2);
EXPECT_EQ(e1_in_w2.to(), v2_in_w2);
EXPECT_EQ(e1_in_w2.EdgeType(), w2_dba.EdgeType("et"));
EXPECT_EQ(e1_in_w2.PropsAt(w2_dba.Property("p3")).Value<bool>(), true);
}
}
TEST_F(DistributedGraphDbTest, RemoteExpansion) {
// Model (v1)-->(v2), where each vertex is on one worker.
std::vector<Edges::VertexAddress> v_ga;
{
GraphDbAccessor dba{master()};
v_ga.emplace_back(GraphDbAccessor(worker(1), dba.transaction_id())
.InsertVertex()
.GlobalAddress());
v_ga.emplace_back(GraphDbAccessor(worker(2), dba.transaction_id())
.InsertVertex()
.GlobalAddress());
dba.Commit();
}
{
GraphDbAccessor dba{master()};
auto edge_type = dba.EdgeType("et");
auto prop = dba.Property("prop");
GraphDbAccessor dba1{worker(1), dba.transaction_id()};
auto edge_ga =
dba1.InsertOnlyEdge(v_ga[0], v_ga[1], edge_type,
dba1.db().storage().EdgeGenerator().Next())
.GlobalAddress();
for (int i : {0, 1}) {
GraphDbAccessor dba_w{worker(i + 1), dba.transaction_id()};
auto v = dba_w.FindVertexChecked(v_ga[i].gid(), false);
// Update the vertex to create a new record.
v.PropsSet(prop, 42);
auto &edges = i == 0 ? v.GetNew()->out_ : v.GetNew()->in_;
edges.emplace(v_ga[(i + 1) % 2], edge_ga, edge_type);
}
dba.Commit();
}
{
// Expand on the master for three hops. Collect vertex gids.
GraphDbAccessor dba{master()};
std::vector<VertexAccessor> visited;
auto expand = [&visited, &dba](auto &v) {
for (auto e : v.out()) return e.to();
for (auto e : v.in()) return e.from();
CHECK(false) << "No edge in vertex";
};
// Do a few hops back and forth, all on the master.
VertexAccessor v{v_ga[0], dba};
for (int i = 0; i < 5; ++i) {
v = expand(v);
EXPECT_FALSE(v.address().is_local());
EXPECT_EQ(v.address(), v_ga[(i + 1) % 2]);
}
}
}

View File

@ -31,15 +31,15 @@ using namespace database;
TEST_F(DistributedGraphDbTest, Coordination) {
EXPECT_NE(master().endpoint().port(), 0);
EXPECT_NE(worker1().endpoint().port(), 0);
EXPECT_NE(worker2().endpoint().port(), 0);
EXPECT_NE(worker(1).endpoint().port(), 0);
EXPECT_NE(worker(2).endpoint().port(), 0);
EXPECT_EQ(master().GetEndpoint(1), worker1().endpoint());
EXPECT_EQ(master().GetEndpoint(2), worker2().endpoint());
EXPECT_EQ(worker1().GetEndpoint(0), master().endpoint());
EXPECT_EQ(worker1().GetEndpoint(2), worker2().endpoint());
EXPECT_EQ(worker2().GetEndpoint(0), master().endpoint());
EXPECT_EQ(worker2().GetEndpoint(1), worker1().endpoint());
EXPECT_EQ(master().GetEndpoint(1), worker(1).endpoint());
EXPECT_EQ(master().GetEndpoint(2), worker(2).endpoint());
EXPECT_EQ(worker(1).GetEndpoint(0), master().endpoint());
EXPECT_EQ(worker(1).GetEndpoint(2), worker(2).endpoint());
EXPECT_EQ(worker(2).GetEndpoint(0), master().endpoint());
EXPECT_EQ(worker(2).GetEndpoint(1), worker(1).endpoint());
}
TEST_F(DistributedGraphDbTest, TxEngine) {
@ -47,12 +47,12 @@ TEST_F(DistributedGraphDbTest, TxEngine) {
auto *tx2 = master_tx_engine().Begin();
EXPECT_EQ(tx2->snapshot().size(), 1);
EXPECT_EQ(
worker1().tx_engine().RunningTransaction(tx1->id_)->snapshot().size(), 0);
EXPECT_EQ(worker2().tx_engine().RunningTransaction(tx2->id_)->snapshot(),
worker(1).tx_engine().RunningTransaction(tx1->id_)->snapshot().size(), 0);
EXPECT_EQ(worker(2).tx_engine().RunningTransaction(tx2->id_)->snapshot(),
tx2->snapshot());
::testing::FLAGS_gtest_death_test_style = "threadsafe";
EXPECT_DEATH(worker2().tx_engine().RunningTransaction(123), "");
EXPECT_DEATH(worker(2).tx_engine().RunningTransaction(123), "");
}
template <typename TType>
@ -75,82 +75,29 @@ TEST_F(DistributedGraphDbTest, StorageTypes) {
};
test_mappers(mapper_vec<storage::Label>{master().label_mapper(),
worker1().label_mapper(),
worker2().label_mapper()},
worker(1).label_mapper(),
worker(2).label_mapper()},
std::vector<storage::Label>{});
test_mappers(mapper_vec<storage::EdgeType>{master().edge_type_mapper(),
worker1().edge_type_mapper(),
worker2().edge_type_mapper()},
worker(1).edge_type_mapper(),
worker(2).edge_type_mapper()},
std::vector<storage::EdgeType>{});
test_mappers(mapper_vec<storage::Property>{master().property_mapper(),
worker1().property_mapper(),
worker2().property_mapper()},
worker(1).property_mapper(),
worker(2).property_mapper()},
std::vector<storage::Property>{});
}
TEST_F(DistributedGraphDbTest, Counters) {
EXPECT_EQ(master().counters().Get("a"), 0);
EXPECT_EQ(worker1().counters().Get("a"), 1);
EXPECT_EQ(worker2().counters().Get("a"), 2);
EXPECT_EQ(worker(1).counters().Get("a"), 1);
EXPECT_EQ(worker(2).counters().Get("a"), 2);
EXPECT_EQ(worker1().counters().Get("b"), 0);
EXPECT_EQ(worker2().counters().Get("b"), 1);
EXPECT_EQ(worker(1).counters().Get("b"), 0);
EXPECT_EQ(worker(2).counters().Get("b"), 1);
EXPECT_EQ(master().counters().Get("b"), 2);
}
TEST_F(DistributedGraphDbTest, RemoteDataGetting) {
// Only old data is visible remotely, so create and commit some data.
gid::Gid v1_id, v2_id, e1_id;
{
GraphDbAccessor dba{master()};
auto v1 = dba.InsertVertex();
auto v2 = dba.InsertVertex();
auto e1 = dba.InsertEdge(v1, v2, dba.EdgeType("et"));
// Set some data so we see we're getting the right stuff.
v1.PropsSet(dba.Property("p1"), 42);
v1.add_label(dba.Label("label"));
v2.PropsSet(dba.Property("p2"), "value");
e1.PropsSet(dba.Property("p3"), true);
v1_id = v1.gid();
v2_id = v2.gid();
e1_id = e1.gid();
dba.Commit();
}
// The master must start a transaction before workers can work in it.
GraphDbAccessor master_dba{master()};
{
GraphDbAccessor w1_dba{worker1(), master_dba.transaction_id()};
VertexAccessor v1_in_w1{{v1_id, 0}, w1_dba};
EXPECT_NE(v1_in_w1.GetOld(), nullptr);
EXPECT_EQ(v1_in_w1.GetNew(), nullptr);
EXPECT_EQ(v1_in_w1.PropsAt(w1_dba.Property("p1")).Value<int64_t>(), 42);
EXPECT_TRUE(v1_in_w1.has_label(w1_dba.Label("label")));
}
{
GraphDbAccessor w2_dba{worker2(), master_dba.transaction_id()};
VertexAccessor v2_in_w2{{v2_id, 0}, w2_dba};
EXPECT_NE(v2_in_w2.GetOld(), nullptr);
EXPECT_EQ(v2_in_w2.GetNew(), nullptr);
EXPECT_EQ(v2_in_w2.PropsAt(w2_dba.Property("p2")).Value<std::string>(),
"value");
EXPECT_FALSE(v2_in_w2.has_label(w2_dba.Label("label")));
VertexAccessor v1_in_w2{{v1_id, 0}, w2_dba};
EdgeAccessor e1_in_w2{{e1_id, 0}, w2_dba};
EXPECT_EQ(e1_in_w2.from(), v1_in_w2);
EXPECT_EQ(e1_in_w2.to(), v2_in_w2);
EXPECT_EQ(e1_in_w2.EdgeType(), w2_dba.EdgeType("et"));
EXPECT_EQ(e1_in_w2.PropsAt(w2_dba.Property("p3")).Value<bool>(), true);
}
}
TEST_F(DistributedGraphDbTest, DispatchPlan) {
auto kRPCWaitTime = 600ms;
int64_t plan_id = 5;
@ -168,8 +115,8 @@ TEST_F(DistributedGraphDbTest, DispatchPlan) {
EXPECT_EQ(cached.symbol_table.max_position(), symbol_table.max_position());
EXPECT_EQ(cached.symbol_table.table(), symbol_table.table());
};
check_for_worker(worker1());
check_for_worker(worker2());
check_for_worker(worker(1));
check_for_worker(worker(2));
}
TEST_F(DistributedGraphDbTest, RemotePullProduceRpc) {
@ -234,10 +181,8 @@ TEST_F(DistributedGraphDbTest, RemotePullProduceRpc) {
auto tx1_batch2 = remote_pull(dba_1, worker_id).get();
expect_second_batch(tx1_batch2);
}
master().remote_pull_clients().EndAllRemotePulls(dba_1.transaction_id(),
plan_id);
master().remote_pull_clients().EndAllRemotePulls(dba_2.transaction_id(),
plan_id);
for (auto tx_id : {dba_1.transaction_id(), dba_2.transaction_id()})
master().remote_pull_clients().EndAllRemotePulls(tx_id, plan_id);
}
TEST_F(DistributedGraphDbTest, RemotePullProduceRpcWithGraphElements) {
@ -257,9 +202,9 @@ TEST_F(DistributedGraphDbTest, RemotePullProduceRpcWithGraphElements) {
e12.PropsSet(prop, worker_id * 10 + 2);
};
create_data(dba, 0);
GraphDbAccessor dba_w1{worker1(), dba.transaction_id()};
GraphDbAccessor dba_w1{worker(1), dba.transaction_id()};
create_data(dba_w1, 1);
GraphDbAccessor dba_w2{worker2(), dba.transaction_id()};
GraphDbAccessor dba_w2{worker(2), dba.transaction_id()};
create_data(dba_w2, 2);
dba.Commit();
}
@ -347,8 +292,8 @@ TEST_F(DistributedGraphDbTest, BuildIndexDistributed) {
property = dba0.Property("property");
auto tx_id = dba0.transaction_id();
GraphDbAccessor dba1{worker1(), tx_id};
GraphDbAccessor dba2{worker2(), tx_id};
GraphDbAccessor dba1{worker(1), tx_id};
GraphDbAccessor dba2{worker(2), tx_id};
auto add_vertex = [label, property](GraphDbAccessor &dba) {
auto vertex = dba.InsertVertex();
vertex.add_label(label);
@ -370,27 +315,27 @@ TEST_F(DistributedGraphDbTest, BuildIndexDistributed) {
GraphDbAccessor dba_master{master()};
{
GraphDbAccessor dba{worker1(), dba_master.transaction_id()};
GraphDbAccessor dba{worker(1), dba_master.transaction_id()};
EXPECT_TRUE(dba.LabelPropertyIndexExists(label, property));
EXPECT_EQ(CountIterable(dba.Vertices(label, property, false)), 50);
}
{
GraphDbAccessor dba{worker2(), dba_master.transaction_id()};
GraphDbAccessor dba{worker(2), dba_master.transaction_id()};
EXPECT_TRUE(dba.LabelPropertyIndexExists(label, property));
EXPECT_EQ(CountIterable(dba.Vertices(label, property, false)), 300);
}
}
TEST_F(DistributedGraphDbTest, WorkerOwnedDbAccessors) {
GraphDbAccessor dba_w1(worker1());
GraphDbAccessor dba_w1(worker(1));
auto v = dba_w1.InsertVertex();
auto prop = dba_w1.Property("p");
v.PropsSet(prop, 42);
auto v_ga = v.GlobalAddress();
dba_w1.Commit();
GraphDbAccessor dba_w2(worker2());
GraphDbAccessor dba_w2(worker(2));
VertexAccessor v_in_w2{v_ga, dba_w2};
EXPECT_EQ(v_in_w2.PropsAt(prop).Value<int64_t>(), 42);
}

View File

@ -5,12 +5,12 @@
#include "distributed_common.hpp"
TEST_F(DistributedGraphDbTest, RemoteUpdateLocalVisibility) {
database::GraphDbAccessor dba_tx1{worker1()};
database::GraphDbAccessor dba_tx1{worker(1)};
auto v = dba_tx1.InsertVertex();
auto v_ga = v.GlobalAddress();
dba_tx1.Commit();
database::GraphDbAccessor dba_tx2_w2{worker2()};
database::GraphDbAccessor dba_tx2_w2{worker(2)};
v = VertexAccessor(v_ga, dba_tx2_w2);
ASSERT_FALSE(v.address().is_local());
auto label = dba_tx2_w2.Label("l");
@ -22,7 +22,7 @@ TEST_F(DistributedGraphDbTest, RemoteUpdateLocalVisibility) {
EXPECT_FALSE(v.has_label(label));
// In the same transaction on the owning worker there is no label.
database::GraphDbAccessor dba_tx2_w1{worker1(), dba_tx2_w2.transaction_id()};
database::GraphDbAccessor dba_tx2_w1{worker(1), dba_tx2_w2.transaction_id()};
v = VertexAccessor(v_ga, dba_tx2_w1);
v.SwitchOld();
EXPECT_FALSE(v.has_label(label));