memgraph/tests/unit/distributed_durability.cpp
Teon Banek 9f460914ed Separate distributed implementation of GraphDbAccessor
Summary:
GraphDbAccessor is now constructed only through GraphDb. This allows the
concrete GraphDb to instantiate a concrete GraphDbAccessor. This allows
us to use virtual calls, so that the implementation may be kept
separate. The major downside of doing things this way is heap allocation
of GraphDbAccessor. In case it turns out to be a real performance
issues, another solution with pointer to static implementation may be
used.

InsertVertexIntoRemote is now a non-member function, which reduces
coupling. It made no sense for it to be member function because it used
only the public parts of GraphDbAccessor.

Reviewers: msantl, mtomic, mferencevic

Reviewed By: msantl

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1504
2018-07-26 09:16:39 +02:00

189 lines
5.0 KiB
C++

#include "experimental/filesystem"
#include "distributed_common.hpp"
#include "database/graph_db_accessor.hpp"
#include "durability/paths.hpp"
#include "durability/snapshooter.hpp"
#include "utils/string.hpp"
class DistributedDurability : public DistributedGraphDbTest {
public:
DistributedDurability() : DistributedGraphDbTest("distributed") {}
void AddVertices() {
AddVertex(master(), "master");
AddVertex(worker(1), "worker1");
AddVertex(worker(2), "worker2");
}
void CheckVertices(int expected_count) {
CheckVertex(master(), expected_count, "master");
CheckVertex(worker(1), expected_count, "worker1");
CheckVertex(worker(2), expected_count, "worker2");
}
void RestartWithRecovery() {
DistributedGraphDbTest::ShutDown();
Initialize([](database::Config config) {
config.db_recover_on_startup = true;
return config;
});
}
void RestartWithWal() {
DistributedGraphDbTest::ShutDown();
Initialize([](database::Config config) {
config.durability_enabled = true;
return config;
});
}
void FlushAllWal() {
// TODO(buda): Extend this when we have a fully durable mode
master().wal().Flush();
worker(1).wal().Flush();
worker(2).wal().Flush();
}
private:
void AddVertex(database::GraphDb &db, const std::string &label) {
auto dba = db.Access();
auto vertex = dba->InsertVertex();
vertex.add_label(dba->Label(label));
dba->Commit();
}
void CheckVertex(database::GraphDb &db, int expected_count,
const std::string &label) {
auto dba = db.Access();
auto it = dba->Vertices(false);
std::vector<VertexAccessor> vertices{it.begin(), it.end()};
EXPECT_EQ(vertices.size(), expected_count);
for (auto &vertex : vertices) {
ASSERT_EQ(vertex.labels().size(), 1);
EXPECT_EQ(vertex.labels()[0], dba->Label(label));
}
}
};
TEST_F(DistributedDurability, MakeSnapshot) {
// Create a graph with 3 nodes with 3 labels, one on each and make a snapshot
// of it
{
AddVertices();
auto dba = master().Access();
master().MakeSnapshot(*dba);
}
// Recover the graph and check if it's the same as before
{
RestartWithRecovery();
CheckVertices(1);
}
}
TEST_F(DistributedDurability, SnapshotOnExit) {
{
TearDown();
Initialize([](database::Config config) {
config.snapshot_on_exit = true;
return config;
});
AddVertices();
}
// Recover the graph and check if it's the same as before
{
RestartWithRecovery();
CheckVertices(1);
}
}
TEST_F(DistributedDurability, RecoveryFromSameSnapshot) {
{
AddVertices();
// Make snapshot on one worker, expect it won't recover from that.
auto dba = worker(1).Access();
worker(1).MakeSnapshot(*dba);
}
{
RestartWithRecovery();
CheckVertices(0);
AddVertices();
auto dba = master().Access();
master().MakeSnapshot(*dba);
}
{
RestartWithRecovery();
CheckVertices(1);
AddVertices();
CheckVertices(2);
// Make snapshot on one worker, expect it won't recover from that.
auto dba = worker(1).Access();
worker(1).MakeSnapshot(*dba);
}
{
RestartWithRecovery();
CheckVertices(1);
}
}
TEST_F(DistributedDurability, RecoveryFailure) {
{
AddVertices();
// Make a snapshot on the master without the right snapshots on workers.
auto dba = master().Access();
bool status = durability::MakeSnapshot(master(), *dba, tmp_dir_, 100);
ASSERT_TRUE(status);
}
::testing::FLAGS_gtest_death_test_style = "threadsafe";
EXPECT_DEATH(RestartWithRecovery(), "worker failed to recover");
}
std::vector<fs::path> DirFiles(fs::path dir) {
std::vector<fs::path> files;
if (fs::exists(dir))
for (auto &file : fs::directory_iterator(dir)) files.push_back(file.path());
return files;
}
void CheckDeltas(fs::path wal_dir, database::StateDelta::Type op) {
// Equal to worker count
auto wal_files = DirFiles(wal_dir);
ASSERT_EQ(wal_files.size(), 3);
HashedFileReader reader;
for (auto worker_wal : wal_files) {
ASSERT_TRUE(reader.Open(worker_wal));
communication::bolt::Decoder<HashedFileReader> decoder{reader};
std::vector<database::StateDelta> deltas;
while (true) {
auto delta = database::StateDelta::Decode(reader, decoder);
if (delta) {
deltas.emplace_back(*delta);
} else {
break;
}
}
reader.Close();
ASSERT_GE(deltas.size(), 1);
// In case of master there is also an state delta with transaction beginning
EXPECT_EQ(deltas[deltas.size() > 1 ? 1 : 0].type, op);
}
}
TEST_F(DistributedDurability, WriteCommittedTx) {
RestartWithWal();
auto dba = master().Access();
dba->Commit();
FlushAllWal();
CheckDeltas(tmp_dir_ / durability::kWalDir,
database::StateDelta::Type::TRANSACTION_COMMIT);
}
TEST_F(DistributedDurability, WriteAbortedTx) {
RestartWithWal();
auto dba = master().Access();
dba->Abort();
FlushAllWal();
CheckDeltas(tmp_dir_ / durability::kWalDir,
database::StateDelta::Type::TRANSACTION_ABORT);
}