Write committed/aborted op to wal

Summary:
The WAL on workers didn't contain committed transaction IDs. These records
are needed for distributed recovery, so that the master can decide which
transactions are present on all the workers.

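For context, here is a minimal sketch of the recovery decision these records enable once every node's WAL contains the commit markers. This is not code from this commit; the function and type names are illustrative only:

    #include <algorithm>
    #include <cstdint>
    #include <iterator>
    #include <set>
    #include <vector>

    using TransactionId = uint64_t;

    // Keep only the transactions whose commit record was recovered from the
    // WAL of every node (master and all workers); anything else cannot be
    // considered present cluster-wide.
    std::set<TransactionId> CommonCommitted(
        const std::vector<std::set<TransactionId>> &per_node_commits) {
      if (per_node_commits.empty()) return {};
      std::set<TransactionId> common = per_node_commits[0];
      for (size_t i = 1; i < per_node_commits.size(); ++i) {
        std::set<TransactionId> intersection;
        std::set_intersection(common.begin(), common.end(),
                              per_node_commits[i].begin(),
                              per_node_commits[i].end(),
                              std::inserter(intersection, intersection.begin()));
        common = std::move(intersection);
      }
      return common;
    }
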
Reviewers: buda, msantl

Reviewed By: buda

Subscribers: pullbot, msantl, buda

Differential Revision: https://phabricator.memgraph.io/D1440
Dominik Gleich 2018-07-05 10:55:00 +02:00
parent 888c6a4bca
commit d9f25cc668
17 changed files with 204 additions and 52 deletions

View File

@@ -326,7 +326,8 @@ class Worker : public PrivateBase {
   distributed::UpdatesRpcClients updates_clients_{rpc_worker_clients_};
   distributed::DataManager data_manager_{*this, data_clients_};
   distributed::WorkerTransactionalCacheCleaner cache_cleaner_{
-      tx_engine_, server_, produce_server_, updates_server_, data_manager_};
+      tx_engine_, &wal(), server_,
+      produce_server_, updates_server_, data_manager_};
   distributed::DurabilityRpcServer durability_rpc_server_{*this, server_};
   distributed::ClusterDiscoveryWorker cluster_discovery_{
       server_, coordination_, rpc_worker_clients_.GetClientPool(0)};

View File

@@ -130,11 +130,12 @@ class OngoingProduceJoinerRpcClients {
   OngoingProduceJoinerRpcClients(RpcWorkerClients &clients)
       : clients_(clients) {}

-  void JoinOngoingProduces(tx::TransactionId tx_id) {
+  void JoinOngoingProduces(tx::TransactionId tx_id, bool committed) {
     auto futures = clients_.ExecuteOnWorkers<void>(
-        0, [tx_id](int worker_id, communication::rpc::ClientPool &client_pool) {
-          auto result =
-              client_pool.Call<distributed::WaitOnTransactionEndRpc>(tx_id);
+        0, [tx_id, committed](int worker_id,
+                              communication::rpc::ClientPool &client_pool) {
+          auto result = client_pool.Call<distributed::WaitOnTransactionEndRpc>(
+              tx_id, committed);
           CHECK(result)
               << "[WaitOnTransactionEndRpc] failed to notify that transaction "
               << tx_id << " ended";

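The hunk above stops before the function's tail, but the collected futures are presumably joined so that the master does not finish its own Commit/Abort before every worker has acknowledged, and now also logged, the transaction end. A sketch of that final step (not shown in the diff):

    // Hypothetical tail of JoinOngoingProduces: block until every worker has
    // handled WaitOnTransactionEndRpc for this transaction.
    for (auto &future : futures) future.get();
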
View File

@@ -65,20 +65,32 @@ class WorkerTransactionalCacheCleaner : public TransactionalCacheCleaner {
  public:
   template <class... T>
   WorkerTransactionalCacheCleaner(tx::WorkerEngine &tx_engine,
+                                  durability::WriteAheadLog *wal,
                                   communication::rpc::Server &server,
                                   ProduceRpcServer &produce_server,
                                   T &... caches)
       : TransactionalCacheCleaner(tx_engine, caches...),
+        wal_(wal),
         rpc_server_(server),
         produce_server_(produce_server) {
     Register(tx_engine);
-    rpc_server_.Register<WaitOnTransactionEndRpc>([this](const auto &req_reader,
-                                                         auto *res_builder) {
-      produce_server_.FinishAndClearOngoingProducePlans(req_reader.getMember());
-    });
+    rpc_server_.Register<WaitOnTransactionEndRpc>(
+        [this](const auto &req_reader, auto *res_builder) {
+          auto tx_id = req_reader.getTxId();
+          auto committed = req_reader.getCommitted();
+          produce_server_.FinishAndClearOngoingProducePlans(tx_id);
+          if (wal_) {
+            if (committed) {
+              wal_->Emplace(database::StateDelta::TxCommit(tx_id));
+            } else {
+              wal_->Emplace(database::StateDelta::TxAbort(tx_id));
+            }
+          }
+        });
   }

  private:
+  durability::WriteAheadLog *wal_;
   communication::rpc::Server &rpc_server_;
   ProduceRpcServer &produce_server_;
 };

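Condensed, the durability part of the new handler amounts to the following. The free function is our own wrapper for illustration; the WriteAheadLog and StateDelta calls are exactly the ones visible in the hunk above:

    // Write the transaction-end record to the worker's WAL. The WAL pointer
    // is null when durability is disabled, hence the guard.
    void LogTransactionEnd(durability::WriteAheadLog *wal,
                           tx::TransactionId tx_id, bool committed) {
      if (!wal) return;
      wal->Emplace(committed ? database::StateDelta::TxCommit(tx_id)
                             : database::StateDelta::TxAbort(tx_id));
    }
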
View File

@@ -11,7 +11,8 @@ cpp<#
 (lcp:capnp-namespace "distributed")

 (lcp:define-rpc wait-on-transaction-end
-    (:request ((member "tx::TransactionId" :capnp-type "UInt64")))
+    (:request ((tx-id "tx::TransactionId" :capnp-type "UInt64")
+               (committed :bool)))
     (:response ()))

 (lcp:pop-namespace)

View File

@@ -86,12 +86,12 @@ MasterEngine::MasterEngine(communication::rpc::Server &server,
 }

 void MasterEngine::Commit(const Transaction &t) {
-  ongoing_produce_joiner_.JoinOngoingProduces(t.id_);
+  ongoing_produce_joiner_.JoinOngoingProduces(t.id_, true);
   SingleNodeEngine::Commit(t);
 }

 void MasterEngine::Abort(const Transaction &t) {
-  ongoing_produce_joiner_.JoinOngoingProduces(t.id_);
+  ongoing_produce_joiner_.JoinOngoingProduces(t.id_, false);
   SingleNodeEngine::Abort(t);
 }

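Note the ordering: the master first joins the ongoing produces, which after this change also makes every worker append the transaction-end delta to its WAL, and only then completes the commit or abort locally. Summarized as a trace (names taken from the hunks above):

    // MasterEngine::Commit(t)
    //   -> JoinOngoingProduces(t.id_, true)            // RPC to every worker
    //        worker: FinishAndClearOngoingProducePlans(t.id_)
    //        worker: wal_->Emplace(StateDelta::TxCommit(t.id_))
    //   -> SingleNodeEngine::Commit(t)                 // master-side commit
    // Abort is symmetric, with committed == false and StateDelta::TxAbort.
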
View File

@@ -88,6 +88,9 @@ target_link_libraries(${test_prefix}distributed_data_exchange memgraph_lib kvsto
 add_unit_test(distributed_durability.cpp)
 target_link_libraries(${test_prefix}distributed_durability memgraph_lib kvstore_dummy_lib)

+add_unit_test(distributed_dynamic_graph_partitioner.cpp)
+target_link_libraries(${test_prefix}distributed_dynamic_graph_partitioner memgraph_lib kvstore_dummy_lib)
+
 add_unit_test(distributed_gc.cpp)
 target_link_libraries(${test_prefix}distributed_gc memgraph_lib kvstore_dummy_lib)
@@ -106,6 +109,9 @@ target_link_libraries(${test_prefix}distributed_serialization memgraph_lib kvsto
 add_unit_test(distributed_updates.cpp)
 target_link_libraries(${test_prefix}distributed_updates memgraph_lib kvstore_dummy_lib)

+add_unit_test(distributed_vertex_migrator.cpp)
+target_link_libraries(${test_prefix}distributed_vertex_migrator memgraph_lib kvstore_dummy_lib)
+
 add_unit_test(durability.cpp)
 target_link_libraries(${test_prefix}durability memgraph_lib kvstore_dummy_lib)

View File

@@ -27,6 +27,7 @@ class BfsTest : public DistributedGraphDbTest {
   }

  public:
+  BfsTest() : DistributedGraphDbTest("bfs") {}
   std::vector<storage::VertexAddress> vertices;
   std::map<std::pair<int, int>, storage::EdgeAddress> edges;
 };

View File

@@ -145,10 +145,20 @@ class DistributedGraphDbTest : public ::testing::Test {
     return std::distance(edges.begin(), edges.end());
   };

-  fs::path tmp_dir_ = fs::temp_directory_path() /
-                      ("MG_test_unit_durability" + std::to_string(getpid()));
+  fs::path tmp_dir_{fs::temp_directory_path() / "MG_test_unit_durability"};
+
+ public:
+  // Each test has to specify its own durability suffix to avoid conflicts
+  DistributedGraphDbTest() = delete;
+  DistributedGraphDbTest(const std::string &dir_suffix)
+      : dir_suffix_(dir_suffix) {
+    tmp_dir_ =
+        fs::temp_directory_path() / ("MG_test_unit_durability_" + dir_suffix_);
+  }

  private:
+  std::string dir_suffix_{""};
   std::unique_ptr<database::Master> master_;
   std::vector<std::unique_ptr<WorkerInThread>> workers_;
 };

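This fixture change explains the long tail of the diff: the default constructor is deleted, so every test suite must pass its own durability-directory suffix, and each file below gains a thin subclass whose name its TEST_F macros adopt. A hypothetical new suite would follow the same pattern (DistributedFooTest and the "foo" suffix are made up here):

    class DistributedFooTest : public DistributedGraphDbTest {
     public:
      // Keeps this suite's durability directory
      // (MG_test_unit_durability_foo) out of the other suites' way.
      DistributedFooTest() : DistributedGraphDbTest("foo") {}
    };

    TEST_F(DistributedFooTest, DoesSomething) { /* ... */ }
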
View File

@@ -11,7 +11,12 @@
 using namespace database;
 using namespace std::literals::chrono_literals;

-TEST_F(DistributedGraphDbTest, RemoteDataGetting) {
+class DistributedDataExchangeTest : public DistributedGraphDbTest {
+ public:
+  DistributedDataExchangeTest() : DistributedGraphDbTest("data_exchange") {}
+};
+
+TEST_F(DistributedDataExchangeTest, RemoteDataGetting) {
   // Only old data is visible remotely, so create and commit some data.
   gid::Gid v1_id, v2_id, e1_id;
@@ -64,7 +69,7 @@ TEST_F(DistributedGraphDbTest, RemoteDataGetting) {
   }
 }

-TEST_F(DistributedGraphDbTest, RemoteExpansion) {
+TEST_F(DistributedDataExchangeTest, RemoteExpansion) {
   // Model (v1)-->(v2), where each vertex is on one worker.
   auto from = InsertVertex(worker(1));
   auto to = InsertVertex(worker(2));
@@ -90,7 +95,7 @@ TEST_F(DistributedGraphDbTest, RemoteExpansion) {
   }
 }

-TEST_F(DistributedGraphDbTest, VertexCountsEqual) {
+TEST_F(DistributedDataExchangeTest, VertexCountsEqual) {
   for (int i = 0; i < 5; ++i) InsertVertex(master());
   for (int i = 0; i < 7; ++i) InsertVertex(worker(1));
   for (int i = 0; i < 9; ++i) InsertVertex(worker(2));
@@ -115,7 +120,7 @@ TEST_F(DistributedGraphDbTest, VertexCountsEqual) {
   }
 }

-TEST_F(DistributedGraphDbTest, VertexCountsTransactional) {
+TEST_F(DistributedDataExchangeTest, VertexCountsTransactional) {
   {
     GraphDbAccessor accessor(master());
     InsertVertex(master());

View File

@@ -1,28 +1,50 @@
+#include <experimental/filesystem>
+
 #include "distributed_common.hpp"

 #include "database/graph_db_accessor.hpp"
 #include "durability/paths.hpp"
 #include "durability/snapshooter.hpp"
 #include "utils/string.hpp"

 class DistributedDurability : public DistributedGraphDbTest {
  public:
+  DistributedDurability() : DistributedGraphDbTest("distributed") {}
+
   void AddVertices() {
     AddVertex(master(), "master");
     AddVertex(worker(1), "worker1");
     AddVertex(worker(2), "worker2");
   }

   void CheckVertices(int expected_count) {
     CheckVertex(master(), expected_count, "master");
     CheckVertex(worker(1), expected_count, "worker1");
     CheckVertex(worker(2), expected_count, "worker2");
   }

   void RestartWithRecovery() {
-    ShutDown();
+    DistributedGraphDbTest::ShutDown();
     Initialize([](database::Config config) {
       config.db_recover_on_startup = true;
       return config;
     });
   }

+  void RestartWithWal() {
+    DistributedGraphDbTest::ShutDown();
+    Initialize([](database::Config config) {
+      config.durability_enabled = true;
+      return config;
+    });
+  }
+
+  void FlushAllWal() {
+    // TODO(buda): Extend this when we have a fully durable mode
+    master().wal().Flush();
+    worker(1).wal().Flush();
+    worker(2).wal().Flush();
+  }
+
  private:
   void AddVertex(database::GraphDb &db, const std::string &label) {
     database::GraphDbAccessor dba(db);
@@ -115,3 +137,52 @@ TEST_F(DistributedDurability, RecoveryFailure) {
   ::testing::FLAGS_gtest_death_test_style = "threadsafe";
   EXPECT_DEATH(RestartWithRecovery(), "worker failed to recover");
 }
+
+std::vector<fs::path> DirFiles(fs::path dir) {
+  std::vector<fs::path> files;
+  if (fs::exists(dir))
+    for (auto &file : fs::directory_iterator(dir)) files.push_back(file.path());
+  return files;
+}
+
+void CheckDeltas(fs::path wal_dir, database::StateDelta::Type op) {
+  // One WAL file per cluster node (the master and two workers)
+  auto wal_files = DirFiles(wal_dir);
+  ASSERT_EQ(wal_files.size(), 3);
+  HashedFileReader reader;
+  for (auto worker_wal : wal_files) {
+    ASSERT_TRUE(reader.Open(worker_wal));
+    communication::bolt::Decoder<HashedFileReader> decoder{reader};
+    std::vector<database::StateDelta> deltas;
+    while (true) {
+      auto delta = database::StateDelta::Decode(reader, decoder);
+      if (delta) {
+        deltas.emplace_back(*delta);
+      } else {
+        break;
+      }
+    }
+    reader.Close();
+    ASSERT_GE(deltas.size(), 1);
+    // On the master there is also a state delta marking the transaction begin
+    EXPECT_EQ(deltas[deltas.size() > 1 ? 1 : 0].type, op);
+  }
+}
+
+TEST_F(DistributedDurability, WriteCommittedTx) {
+  RestartWithWal();
+  database::GraphDbAccessor dba(master());
+  dba.Commit();
+  FlushAllWal();
+  CheckDeltas(tmp_dir_ / durability::kWalDir,
+              database::StateDelta::Type::TRANSACTION_COMMIT);
+}
+
+TEST_F(DistributedDurability, WriteAbortedTx) {
+  RestartWithWal();
+  database::GraphDbAccessor dba(master());
+  dba.Abort();
+  FlushAllWal();
+  CheckDeltas(tmp_dir_ / durability::kWalDir,
+              database::StateDelta::Type::TRANSACTION_ABORT);
+}

View File

@@ -15,7 +15,13 @@ using namespace database;
 DECLARE_int32(dgp_max_batch_size);

-TEST_F(DistributedGraphDbTest, CountLabels) {
+class DistributedDynamicGraphPartitionerTest : public DistributedGraphDbTest {
+ public:
+  DistributedDynamicGraphPartitionerTest()
+      : DistributedGraphDbTest("dynamic_graph_partitioner") {}
+};
+
+TEST_F(DistributedDynamicGraphPartitionerTest, CountLabels) {
   auto va = InsertVertex(master());
   auto vb = InsertVertex(worker(1));
   auto vc = InsertVertex(worker(2));
@@ -37,7 +43,7 @@ TEST_F(DistributedGraphDbTest, CountLabels) {
   EXPECT_EQ(count_labels[worker(2).WorkerId()], 4 + 6);
 }

-TEST_F(DistributedGraphDbTest, FindMigrationsMoveVertex) {
+TEST_F(DistributedDynamicGraphPartitionerTest, FindMigrationsMoveVertex) {
   auto va = InsertVertex(master());
   auto vb = InsertVertex(worker(1));
@@ -54,7 +60,7 @@ TEST_F(DistributedGraphDbTest, FindMigrationsMoveVertex) {
   EXPECT_EQ(migrations[0].second, worker(1).WorkerId());
 }

-TEST_F(DistributedGraphDbTest, FindMigrationsNoChange) {
+TEST_F(DistributedDynamicGraphPartitionerTest, FindMigrationsNoChange) {
   InsertVertex(master());
   InsertVertex(worker(1));
   InsertVertex(worker(2));
@@ -67,7 +73,7 @@ TEST_F(DistributedGraphDbTest, FindMigrationsNoChange) {
   EXPECT_EQ(migrations.size(), 0);
 }

-TEST_F(DistributedGraphDbTest, FindMigrationsMultipleAndLimit) {
+TEST_F(DistributedDynamicGraphPartitionerTest, FindMigrationsMultipleAndLimit) {
   auto va = InsertVertex(master());
   auto vb = InsertVertex(master());
   auto vc = InsertVertex(worker(1));
@@ -96,7 +102,7 @@ TEST_F(DistributedGraphDbTest, FindMigrationsMultipleAndLimit) {
   }
 }

-TEST_F(DistributedGraphDbTest, Run) {
+TEST_F(DistributedDynamicGraphPartitionerTest, Run) {
   // Emulate a bipartite graph with lots of connections on the left and right
   // side, and some connections between the halves
   std::vector<storage::VertexAddress> left;

View File

@@ -2,7 +2,12 @@
 #include "distributed_common.hpp"

-TEST_F(DistributedGraphDbTest, GarbageCollect) {
+class DistributedGcTest : public DistributedGraphDbTest {
+ public:
+  DistributedGcTest() : DistributedGraphDbTest("gc") {}
+};
+
+TEST_F(DistributedGcTest, GarbageCollect) {
   database::GraphDbAccessor dba{master()};
   auto tx = dba.transaction_id();
   dba.Commit();
@@ -33,7 +38,7 @@ TEST_F(DistributedGraphDbTest, GarbageCollect) {
   EXPECT_EQ(worker(2).tx_engine().Info(tx_last).is_committed(), true);
 }

-TEST_F(DistributedGraphDbTest, GarbageCollectBlocked) {
+TEST_F(DistributedGcTest, GarbageCollectBlocked) {
   database::GraphDbAccessor dba{master()};
   auto tx = dba.transaction_id();
   dba.Commit();

View File

@@ -30,7 +30,12 @@ using namespace distributed;
 using namespace database;
 using namespace std::literals::chrono_literals;

-TEST_F(DistributedGraphDbTest, Coordination) {
+class DistributedGraphDb : public DistributedGraphDbTest {
+ public:
+  DistributedGraphDb() : DistributedGraphDbTest("distributed_graph") {}
+};
+
+TEST_F(DistributedGraphDb, Coordination) {
   EXPECT_NE(master().endpoint().port(), 0);
   EXPECT_NE(worker(1).endpoint().port(), 0);
   EXPECT_NE(worker(2).endpoint().port(), 0);
@@ -43,7 +48,7 @@ TEST_F(DistributedGraphDbTest, Coordination) {
   EXPECT_EQ(worker(2).GetEndpoint(1), worker(1).endpoint());
 }

-TEST_F(DistributedGraphDbTest, TxEngine) {
+TEST_F(DistributedGraphDb, TxEngine) {
   auto *tx1 = master_tx_engine().Begin();
   auto *tx2 = master_tx_engine().Begin();
   EXPECT_EQ(tx2->snapshot().size(), 1);
@@ -60,7 +65,7 @@ template <typename TType>
 using mapper_vec =
     std::vector<std::reference_wrapper<storage::ConcurrentIdMapper<TType>>>;

-TEST_F(DistributedGraphDbTest, StorageTypes) {
+TEST_F(DistributedGraphDb, StorageTypes) {
   auto test_mappers = [](auto mappers, auto ids) {
     for (size_t i = 0; i < mappers.size(); ++i) {
       ids.emplace_back(
@@ -89,7 +94,7 @@ TEST_F(DistributedGraphDbTest, StorageTypes) {
       std::vector<storage::Property>{});
 }

-TEST_F(DistributedGraphDbTest, Counters) {
+TEST_F(DistributedGraphDb, Counters) {
   EXPECT_EQ(master().counters().Get("a"), 0);
   EXPECT_EQ(worker(1).counters().Get("a"), 1);
   EXPECT_EQ(worker(2).counters().Get("a"), 2);
@@ -99,7 +104,7 @@ TEST_F(DistributedGraphDbTest, Counters) {
   EXPECT_EQ(master().counters().Get("b"), 2);
 }

-TEST_F(DistributedGraphDbTest, DispatchPlan) {
+TEST_F(DistributedGraphDb, DispatchPlan) {
   auto kRPCWaitTime = 600ms;
   int64_t plan_id = 5;
   SymbolTable symbol_table;
@@ -124,7 +129,7 @@ TEST_F(DistributedGraphDbTest, DispatchPlan) {
   EXPECT_DEATH(check_for_worker(worker(1)), "Missing plan*");
 }

-TEST_F(DistributedGraphDbTest, BuildIndexDistributed) {
+TEST_F(DistributedGraphDb, BuildIndexDistributed) {
   storage::Label label;
   storage::Property property;
@@ -169,7 +174,7 @@ TEST_F(DistributedGraphDbTest, BuildIndexDistributed) {
   }
 }

-TEST_F(DistributedGraphDbTest, WorkerOwnedDbAccessors) {
+TEST_F(DistributedGraphDb, WorkerOwnedDbAccessors) {
   GraphDbAccessor dba_w1(worker(1));
   auto v = dba_w1.InsertVertex();
   auto prop = dba_w1.Property("p");

View File

@@ -23,6 +23,8 @@ using namespace std::literals::chrono_literals;

 class DistributedInterpretationTest : public DistributedGraphDbTest {
  protected:
+  DistributedInterpretationTest() : DistributedGraphDbTest("interpretation") {}
+
   void SetUp() override {
     DistributedGraphDbTest::SetUp();
     interpreter_.emplace(master());
View File

@@ -32,7 +32,12 @@ using namespace distributed;
 using namespace database;
 using namespace std::literals::chrono_literals;

-TEST_F(DistributedGraphDbTest, PullProduceRpc) {
+class DistributedQueryPlan : public DistributedGraphDbTest {
+ protected:
+  DistributedQueryPlan() : DistributedGraphDbTest("query_plan") {}
+};
+
+TEST_F(DistributedQueryPlan, PullProduceRpc) {
   GraphDbAccessor dba{master()};
   Context ctx{dba};
   SymbolGenerator symbol_generator{ctx.symbol_table_};
@@ -96,7 +101,7 @@ TEST_F(DistributedGraphDbTest, PullProduceRpc) {
   }
 }

-TEST_F(DistributedGraphDbTest, PullProduceRpcWithGraphElements) {
+TEST_F(DistributedQueryPlan, PullProduceRpcWithGraphElements) {
   // Create some data on the master and both workers. Each edge (3 of them)
   // and vertex (6 of them) will be uniquely identified with their worker id
   // and sequence ID, so we can check we retrieved all.
@@ -191,7 +196,7 @@ TEST_F(DistributedGraphDbTest, PullProduceRpcWithGraphElements) {
   check_result(2, future_w2_results.get().frames);
 }

-TEST_F(DistributedGraphDbTest, Synchronize) {
+TEST_F(DistributedQueryPlan, Synchronize) {
   auto from = InsertVertex(worker(1));
   auto to = InsertVertex(worker(2));
   InsertEdge(from, to, "et");
@@ -245,7 +250,7 @@ TEST_F(DistributedGraphDbTest, Synchronize) {
   // TODO test without advance command?
 }

-TEST_F(DistributedGraphDbTest, Create) {
+TEST_F(DistributedQueryPlan, Create) {
   // Query: UNWIND range(0, 1000) as x CREATE ()
   auto &db = master();
   GraphDbAccessor dba{db};
@@ -267,7 +272,7 @@ TEST_F(DistributedGraphDbTest, Create) {
   EXPECT_GT(VertexCount(worker(2)), 200);
 }

-TEST_F(DistributedGraphDbTest, PullRemoteOrderBy) {
+TEST_F(DistributedQueryPlan, PullRemoteOrderBy) {
   // Create some data on the master and both workers.
   storage::Property prop;
   {
@@ -327,6 +332,8 @@ TEST_F(DistributedGraphDbTest, PullRemoteOrderBy) {

 class DistributedTransactionTimeout : public DistributedGraphDbTest {
  protected:
+  DistributedTransactionTimeout()
+      : DistributedGraphDbTest("transaction_timeout") {}
   int QueryExecutionTimeSec(int) override { return 1; }
 };

View File

@@ -13,6 +13,8 @@

 class DistributedUpdateTest : public DistributedGraphDbTest {
  protected:
+  DistributedUpdateTest() : DistributedGraphDbTest("update") {}
+
   std::unique_ptr<database::GraphDbAccessor> dba1;
   std::unique_ptr<database::GraphDbAccessor> dba2;
   storage::Label label;
@@ -66,7 +68,13 @@ TEST_F(DistributedUpdateTest, UpdateApply) {

 #undef EXPECT_LABEL

-TEST_F(DistributedGraphDbTest, CreateVertex) {
+class DistributedGraphDbSimpleUpdatesTest : public DistributedGraphDbTest {
+ public:
+  DistributedGraphDbSimpleUpdatesTest()
+      : DistributedGraphDbTest("simple_updates") {}
+};
+
+TEST_F(DistributedGraphDbSimpleUpdatesTest, CreateVertex) {
   gid::Gid gid;
   {
     database::GraphDbAccessor dba{worker(1)};
@@ -81,7 +89,7 @@ TEST_F(DistributedGraphDbTest, CreateVertex) {
   }
 }

-TEST_F(DistributedGraphDbTest, CreateVertexWithUpdate) {
+TEST_F(DistributedGraphDbSimpleUpdatesTest, CreateVertexWithUpdate) {
   gid::Gid gid;
   storage::Property prop;
   {
@@ -101,7 +109,7 @@ TEST_F(DistributedGraphDbTest, CreateVertexWithUpdate) {
   }
 }

-TEST_F(DistributedGraphDbTest, CreateVertexWithData) {
+TEST_F(DistributedGraphDbSimpleUpdatesTest, CreateVertexWithData) {
   gid::Gid gid;
   storage::Label l1;
   storage::Label l2;
@@ -135,7 +143,7 @@ TEST_F(DistributedGraphDbTest, CreateVertexWithData) {

 // Checks if expiring a local record for a local update before applying a remote
 // update delta causes a problem
-TEST_F(DistributedGraphDbTest, UpdateVertexRemoteAndLocal) {
+TEST_F(DistributedGraphDbSimpleUpdatesTest, UpdateVertexRemoteAndLocal) {
   gid::Gid gid;
   storage::Label l1;
   storage::Label l2;
@@ -161,7 +169,7 @@ TEST_F(DistributedGraphDbTest, UpdateVertexRemoteAndLocal) {
   }
 }

-TEST_F(DistributedGraphDbTest, AddSameLabelRemoteAndLocal) {
+TEST_F(DistributedGraphDbSimpleUpdatesTest, AddSameLabelRemoteAndLocal) {
   auto v_address = InsertVertex(worker(1));
   {
     database::GraphDbAccessor dba0{master()};
@@ -182,7 +190,7 @@ TEST_F(DistributedGraphDbTest, AddSameLabelRemoteAndLocal) {
   }
 }

-TEST_F(DistributedGraphDbTest, IndexGetsUpdatedRemotely) {
+TEST_F(DistributedGraphDbSimpleUpdatesTest, IndexGetsUpdatedRemotely) {
   storage::VertexAddress v_remote = InsertVertex(worker(1));
   storage::Label label;
   {
@@ -200,7 +208,7 @@ TEST_F(DistributedGraphDbTest, IndexGetsUpdatedRemotely) {
   }
 }

-TEST_F(DistributedGraphDbTest, DeleteVertexRemoteCommit) {
+TEST_F(DistributedGraphDbSimpleUpdatesTest, DeleteVertexRemoteCommit) {
   auto v_address = InsertVertex(worker(1));
   database::GraphDbAccessor dba0{master()};
   database::GraphDbAccessor dba1{worker(1), dba0.transaction_id()};
@@ -212,7 +220,7 @@ TEST_F(DistributedGraphDbTest, DeleteVertexRemoteCommit) {
   EXPECT_FALSE(dba1.FindVertexOptional(v_address.gid(), true));
 }

-TEST_F(DistributedGraphDbTest, DeleteVertexRemoteBothDelete) {
+TEST_F(DistributedGraphDbSimpleUpdatesTest, DeleteVertexRemoteBothDelete) {
   auto v_address = InsertVertex(worker(1));
   {
     database::GraphDbAccessor dba0{master()};
@@ -227,7 +235,7 @@ TEST_F(DistributedGraphDbTest, DeleteVertexRemoteBothDelete) {
   }
 }

-TEST_F(DistributedGraphDbTest, DeleteVertexRemoteStillConnected) {
+TEST_F(DistributedGraphDbSimpleUpdatesTest, DeleteVertexRemoteStillConnected) {
   auto v_address = InsertVertex(worker(1));
   auto e_address = InsertEdge(v_address, v_address, "edge");
@@ -258,6 +266,8 @@ TEST_F(DistributedGraphDbTest, DeleteVertexRemoteStillConnected) {
 class DistributedDetachDeleteTest : public DistributedGraphDbTest {
  protected:
+  DistributedDetachDeleteTest() : DistributedGraphDbTest("detach_delete") {}
+
   storage::VertexAddress w1_a;
   storage::VertexAddress w1_b;
   storage::VertexAddress w2_a;
@@ -352,6 +362,8 @@ TEST_F(DistributedDetachDeleteTest, TwoVerticesSameWorkers) {
 class DistributedEdgeCreateTest : public DistributedGraphDbTest {
  protected:
+  DistributedEdgeCreateTest() : DistributedGraphDbTest("edge_create") {}
+
   storage::VertexAddress w1_a;
   storage::VertexAddress w1_b;
   storage::VertexAddress w2_a;
@@ -474,6 +486,8 @@ TEST_F(DistributedEdgeCreateTest, RemoteRemoteCycle) {
 class DistributedEdgeRemoveTest : public DistributedGraphDbTest {
  protected:
+  DistributedEdgeRemoveTest() : DistributedGraphDbTest("edge_remove") {}
+
   storage::VertexAddress from_addr;
   storage::VertexAddress to_addr;
   storage::EdgeAddress edge_addr;

View File

@@ -15,14 +15,19 @@ using namespace database;
 DECLARE_bool(generate_vertex_ids);
 DECLARE_bool(generate_edge_ids);

+class DistributedVertexMigratorTest : public DistributedGraphDbTest {
+ public:
+  DistributedVertexMigratorTest() : DistributedGraphDbTest("vertex_migrator") {}
+};
+
 // Check if the auto-generated gid property is unchanged after migration
-TEST_F(DistributedGraphDbTest, VertexEdgeGidSaved) {
+TEST_F(DistributedVertexMigratorTest, VertexEdgeGidSaved) {
   FLAGS_generate_vertex_ids = true;
   FLAGS_generate_edge_ids = true;

   // Fill master so that the ids are not the same on master and worker 1
   for (int i = 0; i < 10; ++i) {
     auto va = InsertVertex(master());
-    auto ea = InsertEdge(va, va, "edge");
+    InsertEdge(va, va, "edge");
   }

   auto va = InsertVertex(master());
@@ -71,7 +76,7 @@ TEST_F(DistributedGraphDbTest, VertexEdgeGidSaved) {

 // Checks if two connected master nodes will be transferred to worker 1 and if
 // the edge from the vertex on worker 2 will point to worker 1 after transfer
-TEST_F(DistributedGraphDbTest, SomeTransfer) {
+TEST_F(DistributedVertexMigratorTest, SomeTransfer) {
   auto va = InsertVertex(master());
   auto vb = InsertVertex(master());
   auto vc = InsertVertex(worker(2));
@@ -109,7 +114,7 @@ TEST_F(DistributedGraphDbTest, SomeTransfer) {

 // Check that a cycle edge is transferred only once; it's contained in both the
 // in and out edges of a vertex, and mishandling that could cause problems
-TEST_F(DistributedGraphDbTest, EdgeCycle) {
+TEST_F(DistributedVertexMigratorTest, EdgeCycle) {
   auto va = InsertVertex(master());
   InsertEdge(va, va, "edge");
   {
@@ -132,7 +137,7 @@ TEST_F(DistributedGraphDbTest, EdgeCycle) {
   EXPECT_EQ(EdgeCount(worker(1)), 1);
 }

-TEST_F(DistributedGraphDbTest, TransferLabelsAndProperties) {
+TEST_F(DistributedVertexMigratorTest, TransferLabelsAndProperties) {
   {
     database::GraphDbAccessor dba(master());
     auto va = dba.InsertVertex();