4e996d2667
Summary: This change improves detection of erroneous situations when starting a distributed cluster on a single machine. It asserts that the user hasn't started more memgraph nodes on the same machine with the same durability directory. Also, this diff improves worker registration. Now workers don't have to have explicitly set IP addresses. The master will deduce them from the connecting IP when the worker registers. Reviewers: teon.banek, buda, msantl Reviewed By: teon.banek Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D1582
263 lines
8.4 KiB
C++
263 lines
8.4 KiB
C++
#include <experimental/filesystem>
|
|
#include <memory>
|
|
#include <thread>
|
|
|
|
#include <gflags/gflags.h>
|
|
#include <gtest/gtest.h>
|
|
|
|
#include "database/distributed_graph_db.hpp"
|
|
#include "database/graph_db_accessor.hpp"
|
|
#include "distributed/data_manager.hpp"
|
|
#include "distributed/updates_rpc_server.hpp"
|
|
#include "storage/address_types.hpp"
|
|
#include "transactions/engine_master.hpp"
|
|
#include "utils/file.hpp"
|
|
|
|
DECLARE_string(durability_directory);
|
|
|
|
namespace fs = std::experimental::filesystem;
|
|
|
|
class WorkerInThread {
|
|
public:
|
|
explicit WorkerInThread(database::Config config) : worker_(config) {
|
|
thread_ = std::thread([this, config] { worker_.WaitForShutdown(); });
|
|
}
|
|
|
|
~WorkerInThread() {
|
|
if (thread_.joinable()) thread_.join();
|
|
}
|
|
|
|
database::Worker *db() { return &worker_; }
|
|
|
|
database::Worker worker_;
|
|
std::thread thread_;
|
|
};
|
|
|
|
/// gtest fixture that starts an in-process distributed cluster: one
/// database::Master plus kWorkerCount database::Worker instances (each in its
/// own thread), all on 127.0.0.1 with OS-assigned ports. Subclasses must pass
/// a unique durability-directory suffix to the constructor so tests don't
/// clash on disk.
class DistributedGraphDbTest : public ::testing::Test {
 public:
  const std::string kLocal = "127.0.0.1";
  const int kWorkerCount = 2;

 protected:
  // Max query execution time (seconds) for the node with the given worker id;
  // subclasses may override to shorten/lengthen it per node.
  virtual int QueryExecutionTimeSec(int) { return 180; }

  // Starts the master and then the workers. `modify_config` lets a test tweak
  // each node's config right before that node is constructed.
  void Initialize(
      std::function<database::Config(database::Config config)> modify_config) {
    using namespace std::literals::chrono_literals;
    const auto kInitTime = 200ms;

    database::Config master_config;
    // Port 0: let the OS pick a free port; workers read it via endpoint().
    master_config.master_endpoint = {kLocal, 0};
    master_config.query_execution_time_sec = QueryExecutionTimeSec(0);
    master_config.durability_directory = GetDurabilityDirectory(0);
    // Flag needs to be updated due to props on disk storage.
    FLAGS_durability_directory = GetDurabilityDirectory(0);
    // This is semantically wrong since this is not a cluster of size 1 but of
    // size kWorkerCount+1, but it's hard to wait here for workers to recover
    // and simultaneously assign the port to which the workers must connect.
    // TODO (buda): Fix sometime in the future - not mission critical.
    master_config.recovering_cluster_size = 1;
    master_ = std::make_unique<database::Master>(modify_config(master_config));

    // Give the master time to come up before workers try to register.
    std::this_thread::sleep_for(kInitTime);
    auto worker_config = [this](int worker_id) {
      database::Config config;
      config.worker_id = worker_id;
      config.master_endpoint = master_->endpoint();
      config.durability_directory = GetDurabilityDirectory(worker_id);
      config.worker_endpoint = {kLocal, 0};
      config.query_execution_time_sec = QueryExecutionTimeSec(worker_id);
      return config;
    };

    for (int i = 0; i < kWorkerCount; ++i) {
      // Flag needs to be updated due to props on disk storage.
      FLAGS_durability_directory = GetDurabilityDirectory(i + 1);
      workers_.emplace_back(std::make_unique<WorkerInThread>(
          modify_config(worker_config(i + 1))));
      // Let each worker finish registering before starting the next one.
      std::this_thread::sleep_for(kInitTime);
    }
  }

  void SetUp() override {
    // Default setup: start the cluster with unmodified configs.
    Initialize([](database::Config config) { return config; });
  }

  void ShutDown() {
    // Kill master first because it will expect a shutdown response from the
    // workers.
    auto t = std::thread([this]() { master_ = nullptr; });
    workers_.clear();
    if (t.joinable()) t.join();
  }

  // Per-node durability directory: "master" for id 0, "worker<id>" otherwise.
  fs::path GetDurabilityDirectory(int worker_id) {
    if (worker_id == 0) return tmp_dir_ / "master";
    return tmp_dir_ / fmt::format("worker{}", worker_id);
  }

  // Removes everything the cluster wrote to disk during the test.
  void CleanDurability() {
    if (fs::exists(tmp_dir_)) fs::remove_all(tmp_dir_);
  }

  void TearDown() override {
    ShutDown();
    CleanDurability();
  }

  database::Master &master() { return *master_; }

  // `worker_id` is 1-based (id 0 is the master).
  database::Worker &worker(int worker_id) {
    return workers_[worker_id - 1]->worker_;
  }

  /// Inserts a vertex and returns it's global address. Does it in a new
  /// transaction.
  storage::VertexAddress InsertVertex(database::GraphDb &db) {
    auto dba = db.Access();
    auto r_val = dba->InsertVertex().GlobalAddress();
    dba->Commit();
    return r_val;
  }

  /// Inserts an edge (on the 'from' side) and returns it's global address.
  /// Applies pending remote updates on every node before committing.
  auto InsertEdge(storage::VertexAddress from_addr,
                  storage::VertexAddress to_addr,
                  const std::string &edge_type_name) {
    CHECK(from_addr.is_remote() && to_addr.is_remote())
        << "Distributed test InsertEdge only takes global addresses";
    auto dba = master().Access();
    VertexAccessor from{from_addr, *dba};
    VertexAccessor to{to_addr, *dba};
    auto r_val = dba->InsertEdge(from, to, dba->EdgeType(edge_type_name))
                     .GlobalAddress();
    // Flush deferred updates on master and both workers (kWorkerCount == 2)
    // so the edge is visible cluster-wide before the commit.
    master().updates_server().Apply(dba->transaction_id());
    worker(1).updates_server().Apply(dba->transaction_id());
    worker(2).updates_server().Apply(dba->transaction_id());
    dba->Commit();
    return r_val;
  }

  // Number of vertices visible on the given node (non-current snapshot).
  auto VertexCount(database::GraphDb &db) {
    auto dba = db.Access();
    auto vertices = dba->Vertices(false);
    return std::distance(vertices.begin(), vertices.end());
  };

  // Number of edges visible on the given node (non-current snapshot).
  auto EdgeCount(database::GraphDb &db) {
    auto dba = db.Access();
    auto edges = dba->Edges(false);
    return std::distance(edges.begin(), edges.end());
  };

  fs::path tmp_dir_{fs::temp_directory_path() / "MG_test_unit_durability"};

 public:
  // Each test has to specify its own durability suffix to avoid conflicts
  DistributedGraphDbTest() = delete;

  DistributedGraphDbTest(const std::string &dir_suffix)
      : dir_suffix_(dir_suffix) {
    tmp_dir_ =
        fs::temp_directory_path() / ("MG_test_unit_durability_" + dir_suffix_);
  }

 private:
  std::string dir_suffix_{""};
  std::unique_ptr<database::Master> master_;
  std::vector<std::unique_ptr<WorkerInThread>> workers_;
};
|
|
|
|
/// RAII cluster harness (not a gtest fixture): the constructor starts one
/// master plus `num_workers` workers on localhost, the destructor shuts them
/// down and removes the durability directory. Non-copyable and non-movable
/// because the workers' threads capture `this`-owned state.
class Cluster {
 public:
  Cluster(int num_workers, const std::string &test_name) {
    using namespace std::literals::chrono_literals;
    tmp_dir_ = fs::temp_directory_path() / "MG_test_unit_distributed_common_" /
               test_name;
    EXPECT_TRUE(utils::EnsureDir(tmp_dir_));

    database::Config master_config;
    // Port 0: OS-assigned; the actual endpoint is read back below.
    master_config.master_endpoint = {kLocal, 0};
    master_config.durability_directory = GetDurabilityDirectory(0);
    // Flag needs to be updated due to props on disk storage.
    FLAGS_durability_directory = GetDurabilityDirectory(0);

    // Construct the master first and grab its resolved endpoint before
    // handing it off, so worker configs can point at the real port.
    auto master_tmp = std::make_unique<database::Master>(master_config);
    auto master_endpoint = master_tmp->endpoint();
    master_ = std::move(master_tmp);

    const auto kInitTime = 200ms;
    std::this_thread::sleep_for(kInitTime);

    auto worker_config = [this, master_endpoint](int worker_id) {
      database::Config config;
      config.worker_id = worker_id;
      config.master_endpoint = master_endpoint;
      config.worker_endpoint = {kLocal, 0};
      config.durability_directory = GetDurabilityDirectory(worker_id);
      return config;
    };

    for (int i = 0; i < num_workers; ++i) {
      // Flag needs to be updated due to props on disk storage.
      FLAGS_durability_directory = GetDurabilityDirectory(i + 1);
      workers_.emplace_back(
          std::make_unique<WorkerInThread>(worker_config(i + 1)));
    }
    // Let all workers register with the master before the cluster is used.
    std::this_thread::sleep_for(kInitTime);
  }

  Cluster(const Cluster &) = delete;
  Cluster(Cluster &&) = delete;
  Cluster &operator=(const Cluster &) = delete;
  Cluster &operator=(Cluster &&) = delete;

  ~Cluster() {
    // Destroy the master on a helper thread while the workers are destroyed
    // here: the master waits for shutdown responses from the workers.
    auto t = std::thread([this] { master_ = nullptr; });
    workers_.clear();
    if (t.joinable()) t.join();
    if (fs::exists(tmp_dir_)) fs::remove_all(tmp_dir_);
  }

  auto *master() { return master_.get(); }
  // Lazy range of database::Worker* over all workers.
  auto workers() {
    return iter::imap([](auto &worker) { return worker->db(); }, workers_);
  }

  // Drops the per-transaction remote-data cache on every node.
  void ClearCache(tx::TransactionId tx_id) {
    master()->data_manager().ClearCacheForSingleTransaction(tx_id);
    for (auto member : workers()) {
      member->data_manager().ClearCacheForSingleTransaction(tx_id);
    }
  }

  // Applies deferred remote updates for `tx_id` on every node, then clears
  // caches so subsequent reads observe the applied state.
  void ApplyUpdates(tx::TransactionId tx_id) {
    master()->updates_server().Apply(tx_id);
    for (auto member : workers()) {
      member->updates_server().Apply(tx_id);
    }
    ClearCache(tx_id);
  }

  // Advances the command within transaction `tx_id` cluster-wide (updates are
  // applied first so the next command sees them).
  void AdvanceCommand(tx::TransactionId tx_id) {
    ApplyUpdates(tx_id);
    master()->tx_engine().Advance(tx_id);
    for (auto worker : workers()) worker->tx_engine().UpdateCommand(tx_id);
    ClearCache(tx_id);
  }

  // Per-node durability directory: "master" for id 0, "worker<id>" otherwise.
  fs::path GetDurabilityDirectory(int worker_id) {
    if (worker_id == 0) return tmp_dir_ / "master";
    return tmp_dir_ / fmt::format("worker{}", worker_id);
  }

 private:
  const std::string kLocal = "127.0.0.1";

  fs::path tmp_dir_{fs::temp_directory_path() /
                    "MG_test_unit_distributed_common"};

  std::unique_ptr<database::Master> master_;
  std::vector<std::unique_ptr<WorkerInThread>> workers_;
};
|