Ensure that the durability directory is unique

Summary:
This change improves detection of erroneous situations when starting a
distributed cluster on a single machine. It asserts that the user hasn't
started multiple memgraph nodes on the same machine with the same durability
directory. This diff also improves worker registration: workers no longer need
explicitly configured IP addresses, as the master deduces them from the
connecting IP when a worker registers.
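
The collision check itself is small. A condensed sketch of the master-side
logic added below (identifiers as they appear in the diff, not a complete
listing):

  // Compose the worker's endpoint from the address it connected from and the
  // port it advertised in the registration request.
  io::network::Endpoint worker_endpoint(endpoint.address(), req.port);
  // Flag an error if the worker reached us over loopback (i.e. it runs on the
  // same machine) and reports the same canonical durability directory as ours.
  bool durability_error =
      (utils::StartsWith(endpoint.address(), "127.") ||
       endpoint.address() == "::1") &&
      req.durability_directory == full_durability_directory;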

Reviewers: teon.banek, buda, msantl

Reviewed By: teon.banek

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1582
Matej Ferencevic 2018-09-03 13:00:59 +02:00
parent d338e753c8
commit 4e996d2667
23 changed files with 341 additions and 131 deletions

View File

@ -12,9 +12,11 @@
namespace communication::rpc {
Session::Session(Server &server, communication::InputStream &input_stream,
Session::Session(Server &server, const io::network::Endpoint &endpoint,
communication::InputStream &input_stream,
communication::OutputStream &output_stream)
: server_(server),
endpoint_(endpoint),
input_stream_(input_stream),
output_stream_(output_stream) {}
@ -38,20 +40,28 @@ void Session::Execute() {
auto request = request_message.getRoot<capnp::Message>();
input_stream_.Shift(sizeof(MessageSize) + request_len);
auto callbacks_accessor = server_.callbacks_.access();
auto it = callbacks_accessor.find(request.getTypeId());
if (it == callbacks_accessor.end()) {
// Throw exception to close the socket and cleanup the session.
throw SessionException(
"Session trying to execute an unregistered RPC call!");
}
VLOG(12) << "[RpcServer] received " << it->second.req_type.name;
::capnp::MallocMessageBuilder response_message;
// callback fills the message data
auto response_builder = response_message.initRoot<capnp::Message>();
it->second.callback(request, &response_builder);
auto callbacks_accessor = server_.callbacks_.access();
auto it = callbacks_accessor.find(request.getTypeId());
if (it == callbacks_accessor.end()) {
// We couldn't find a regular callback to call, try to find an extended
// callback to call.
auto extended_callbacks_accessor = server_.extended_callbacks_.access();
auto extended_it = extended_callbacks_accessor.find(request.getTypeId());
if (extended_it == extended_callbacks_accessor.end()) {
// Throw exception to close the socket and cleanup the session.
throw SessionException(
"Session trying to execute an unregistered RPC call!");
}
VLOG(12) << "[RpcServer] received " << extended_it->second.req_type.name;
extended_it->second.callback(endpoint_, request, &response_builder);
} else {
VLOG(12) << "[RpcServer] received " << it->second.req_type.name;
it->second.callback(request, &response_builder);
}
// Serialize and send response
auto response_words = ::capnp::messageToFlatArray(response_message);

View File

@ -36,7 +36,8 @@ class SessionException : public utils::BasicException {
*/
class Session {
public:
Session(Server &server, communication::InputStream &input_stream,
Session(Server &server, const io::network::Endpoint &endpoint,
communication::InputStream &input_stream,
communication::OutputStream &output_stream);
/**
@ -48,6 +49,7 @@ class Session {
private:
Server &server_;
io::network::Endpoint endpoint_;
communication::InputStream &input_stream_;
communication::OutputStream &output_stream_;
};

View File

@ -48,6 +48,14 @@ class Server {
.template initAs<typename TRequestResponse::Response::Capnp>();
callback(req_data, &res_builder);
};
auto extended_callbacks_accessor = extended_callbacks_.access();
if (extended_callbacks_accessor.find(
TRequestResponse::Request::TypeInfo.id) !=
extended_callbacks_accessor.end()) {
LOG(FATAL) << "Callback for that message type already registered!";
}
auto callbacks_accessor = callbacks_.access();
auto got =
callbacks_accessor.insert(TRequestResponse::Request::TypeInfo.id, rpc);
@ -56,12 +64,53 @@ class Server {
<< rpc.res_type.name;
}
template <class TRequestResponse>
void Register(std::function<
void(const io::network::Endpoint &,
const typename TRequestResponse::Request::Capnp::Reader &,
typename TRequestResponse::Response::Capnp::Builder *)>
callback) {
RpcExtendedCallback rpc;
rpc.req_type = TRequestResponse::Request::TypeInfo;
rpc.res_type = TRequestResponse::Response::TypeInfo;
rpc.callback = [callback = callback](const io::network::Endpoint &endpoint,
const auto &reader, auto *builder) {
auto req_data =
reader.getData()
.template getAs<typename TRequestResponse::Request::Capnp>();
builder->setTypeId(TRequestResponse::Response::TypeInfo.id);
auto data_builder = builder->initData();
auto res_builder =
data_builder
.template initAs<typename TRequestResponse::Response::Capnp>();
callback(endpoint, req_data, &res_builder);
};
auto callbacks_accessor = callbacks_.access();
if (callbacks_accessor.find(TRequestResponse::Request::TypeInfo.id) !=
callbacks_accessor.end()) {
LOG(FATAL) << "Callback for that message type already registered!";
}
auto extended_callbacks_accessor = extended_callbacks_.access();
auto got = extended_callbacks_accessor.insert(
TRequestResponse::Request::TypeInfo.id, rpc);
CHECK(got.second) << "Callback for that message type already registered";
VLOG(12) << "[RpcServer] register " << rpc.req_type.name << " -> "
<< rpc.res_type.name;
}
template <typename TRequestResponse>
void UnRegister() {
const MessageType &type = TRequestResponse::Request::TypeInfo;
auto callbacks_accessor = callbacks_.access();
auto deleted = callbacks_accessor.remove(type.id);
CHECK(deleted) << "Trying to remove unknown message type callback";
if (!deleted) {
auto extended_callbacks_accessor = extended_callbacks_.access();
auto extended_deleted = extended_callbacks_accessor.remove(type.id);
CHECK(extended_deleted)
<< "Trying to remove unknown message type callback";
}
}
private:
@ -75,7 +124,17 @@ class Server {
MessageType res_type;
};
struct RpcExtendedCallback {
MessageType req_type;
std::function<void(const io::network::Endpoint &,
const capnp::Message::Reader &,
capnp::Message::Builder *)>
callback;
MessageType res_type;
};
ConcurrentMap<uint64_t, RpcCallback> callbacks_;
ConcurrentMap<uint64_t, RpcExtendedCallback> extended_callbacks_;
std::mutex mutex_;
// TODO (mferencevic): currently the RPC server is hardcoded not to use SSL
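
The new extended overload is what cluster discovery uses further down; a
minimal usage sketch (handler body illustrative, assuming a
communication::rpc::Server instance named server):

  server.Register<RegisterWorkerRpc>(
      [](const io::network::Endpoint &endpoint, const auto &req_reader,
         auto *res_builder) {
        // `endpoint` carries the address the caller connected from; the
        // handler combines it with the request data and fills the response.
      });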

View File

@ -71,7 +71,8 @@ class Session final {
output_stream_([this](const uint8_t *data, size_t len, bool have_more) {
return Write(data, len, have_more);
}),
session_(data, input_buffer_.read_end(), output_stream_),
session_(data, socket_.endpoint(), input_buffer_.read_end(),
output_stream_),
inactivity_timeout_sec_(inactivity_timeout_sec) {
// Set socket options.
// The socket is set to be a non-blocking socket. We use the socket in a

View File

@ -612,8 +612,9 @@ class Master {
distributed::DataManager data_manager_{*self_, data_clients_};
distributed::TransactionalCacheCleaner cache_cleaner_{
tx_engine_, updates_server_, data_manager_};
distributed::ClusterDiscoveryMaster cluster_discovery_{server_, coordination_,
rpc_worker_clients_};
distributed::ClusterDiscoveryMaster cluster_discovery_{
server_, coordination_, rpc_worker_clients_,
config_.durability_directory};
distributed::TokenSharingRpcClients token_sharing_clients_{
&rpc_worker_clients_};
distributed::TokenSharingRpcServer token_sharing_server_{
@ -885,7 +886,8 @@ class Worker {
explicit Worker(const Config &config, database::Worker *self)
: config_(config), self_(self) {
cluster_discovery_.RegisterWorker(config.worker_id);
cluster_discovery_.RegisterWorker(config.worker_id,
config.durability_directory);
}
// TODO: Some things may depend on order of construction/destruction. We also

View File

@ -1,33 +1,77 @@
#include "communication/rpc/client_pool.hpp"
#include "distributed/cluster_discovery_master.hpp"
#include <experimental/filesystem>
#include "communication/rpc/client_pool.hpp"
#include "distributed/coordination_rpc_messages.hpp"
#include "io/network/endpoint.hpp"
#include "utils/file.hpp"
#include "utils/string.hpp"
namespace distributed {
using Server = communication::rpc::Server;
ClusterDiscoveryMaster::ClusterDiscoveryMaster(
Server &server, MasterCoordination &coordination,
RpcWorkerClients &rpc_worker_clients)
RpcWorkerClients &rpc_worker_clients,
const std::string &durability_directory)
: server_(server),
coordination_(coordination),
rpc_worker_clients_(rpc_worker_clients) {
server_.Register<RegisterWorkerRpc>([this](const auto &req_reader,
rpc_worker_clients_(rpc_worker_clients),
durability_directory_(durability_directory) {
server_.Register<RegisterWorkerRpc>([this](const auto &endpoint,
const auto &req_reader,
auto *res_builder) {
bool registration_successful = false;
bool durability_error = false;
RegisterWorkerReq req;
req.Load(req_reader);
bool registration_successful =
this->coordination_.RegisterWorker(req.desired_worker_id, req.endpoint);
// Compose the worker's endpoint from its connecting address and its
// advertised port.
io::network::Endpoint worker_endpoint(endpoint.address(), req.port);
// Create the durability directory and resolve its canonical path.
CHECK(utils::EnsureDir(durability_directory_))
<< "Couldn't create durability directory '" << durability_directory_
<< "'!";
auto full_durability_directory =
std::experimental::filesystem::canonical(durability_directory_);
// Check whether the worker is running on the same host (detected when it
// connects to us over the loopback interface) and whether it has the same
// durability directory as us.
// TODO (mferencevic): This check should also be done for all workers in
// between them because this check only verifies that the worker and master
// don't collide, there can still be a collision between workers.
if ((utils::StartsWith(endpoint.address(), "127.") ||
endpoint.address() == "::1") &&
req.durability_directory == full_durability_directory) {
durability_error = true;
LOG(WARNING)
<< "The worker at " << worker_endpoint
<< " was started with the same durability directory as the master!";
}
// Register the worker if the durability check succeeded.
if (!durability_error) {
registration_successful = this->coordination_.RegisterWorker(
req.desired_worker_id, worker_endpoint);
}
// Notify the cluster of the new worker if the registration succeeded.
if (registration_successful) {
rpc_worker_clients_.ExecuteOnWorkers<void>(
0, [req](int worker_id, communication::rpc::ClientPool &client_pool) {
0, [req, worker_endpoint](
int worker_id, communication::rpc::ClientPool &client_pool) {
auto result = client_pool.Call<ClusterDiscoveryRpc>(
req.desired_worker_id, req.endpoint);
req.desired_worker_id, worker_endpoint);
CHECK(result) << "ClusterDiscoveryRpc failed";
});
}
RegisterWorkerRes res(registration_successful,
RegisterWorkerRes res(registration_successful, durability_error,
this->coordination_.RecoveredSnapshotTx(),
this->coordination_.GetWorkers());
res.Save(res_builder);
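
Both sides compare canonical paths, so different spellings of the same
directory still collide. A self-contained sketch of what
std::experimental::filesystem::canonical normalizes (the paths are
hypothetical; canonical() requires the directory to exist, which EnsureDir
guarantees above):

  #include <cassert>
  #include <experimental/filesystem>

  namespace fs = std::experimental::filesystem;

  int main() {
    fs::create_directories("/tmp/mg/durability");
    // "." and ".." components and symlinks are resolved, so both spellings
    // below yield the same absolute path.
    assert(fs::canonical("/tmp/mg/./durability") ==
           fs::canonical("/tmp/../tmp/mg/durability"));
  }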

View File

@ -16,12 +16,14 @@ using Server = communication::rpc::Server;
class ClusterDiscoveryMaster final {
public:
ClusterDiscoveryMaster(Server &server, MasterCoordination &coordination,
RpcWorkerClients &rpc_worker_clients);
RpcWorkerClients &rpc_worker_clients,
const std::string &durability_directory);
private:
Server &server_;
MasterCoordination &coordination_;
RpcWorkerClients &rpc_worker_clients_;
std::string durability_directory_;
};
} // namespace distributed

View File

@ -1,5 +1,9 @@
#include "distributed/cluster_discovery_worker.hpp"
#include <experimental/filesystem>
#include "distributed/coordination_rpc_messages.hpp"
#include "utils/file.hpp"
namespace distributed {
using Server = communication::rpc::Server;
@ -16,12 +20,25 @@ ClusterDiscoveryWorker::ClusterDiscoveryWorker(
});
}
void ClusterDiscoveryWorker::RegisterWorker(int worker_id) {
auto result =
client_pool_.Call<RegisterWorkerRpc>(worker_id, server_.endpoint());
void ClusterDiscoveryWorker::RegisterWorker(
int worker_id, const std::string &durability_directory) {
// Create the durability directory and resolve its canonical path.
CHECK(utils::EnsureDir(durability_directory))
<< "Couldn't create durability directory '" << durability_directory
<< "'!";
auto full_durability_directory =
std::experimental::filesystem::canonical(durability_directory);
// Register to the master.
auto result = client_pool_.Call<RegisterWorkerRpc>(
worker_id, server_.endpoint().port(), full_durability_directory);
CHECK(result) << "RegisterWorkerRpc failed";
CHECK(result->registration_successful) << "Unable to assign requested ID ("
<< worker_id << ") to worker!";
CHECK(!result->durability_error)
<< "This worker was started on the same machine and with the same "
"durability directory as the master! Please change the durability "
"directory for this worker.";
CHECK(result->registration_successful)
<< "Unable to assign requested ID (" << worker_id << ") to worker!";
worker_id_ = worker_id;
for (auto &kv : result->workers) {

View File

@ -27,8 +27,10 @@ class ClusterDiscoveryWorker final {
*
* @param worker_id - Desired ID. If master can't assign the desired worker
* id, worker will exit.
* @param durability_directory - The durability directory that is used for
* this worker.
*/
void RegisterWorker(int worker_id);
void RegisterWorker(int worker_id, const std::string &durability_directory);
/**
* Notifies the master that the worker finished recovering. Assumes that the

View File

@ -21,9 +21,11 @@ cpp<#
(lcp:define-rpc register-worker
(:request
((desired-worker-id :int16_t)
(endpoint "io::network::Endpoint" :capnp-type "Io.Endpoint")))
(port :uint16_t)
(durability-directory "std::string")))
(:response
((registration-successful :bool)
(durability-error :bool)
(snapshot-to-recover "std::experimental::optional<tx::TransactionId>"
:capnp-type "Utils.Optional(Utils.BoxUInt64)"
:capnp-save
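
LCP generates the actual request/response classes from the definition above;
the net effect on the message contents, written as plain C++ structs purely
for illustration (these are not the generated types):

  #include <cstdint>
  #include <string>

  struct RegisterWorkerReq {
    int16_t desired_worker_id;
    uint16_t port;                     // the worker advertises only its port;
                                       // the master takes the address from
                                       // the connecting socket
    std::string durability_directory;  // worker's canonical durability path
  };

  struct RegisterWorkerRes {
    bool registration_successful;
    bool durability_error;  // set when the worker shares the master's machine
                            // and durability directory
    // snapshot_to_recover and the current worker list follow, unchanged.
  };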

View File

@ -22,7 +22,7 @@ DEFINE_uint64(memory_warning_threshold, 1024,
"less available RAM it will log a warning. Set to 0 to "
"disable.");
BoltSession::BoltSession(SessionData &data,
BoltSession::BoltSession(SessionData &data, const io::network::Endpoint &,
communication::InputStream &input_stream,
communication::OutputStream &output_stream)
: communication::bolt::Session<communication::InputStream,

View File

@ -31,7 +31,8 @@ class BoltSession final
: public communication::bolt::Session<communication::InputStream,
communication::OutputStream> {
public:
BoltSession(SessionData &data, communication::InputStream &input_stream,
BoltSession(SessionData &data, const io::network::Endpoint &endpoint,
communication::InputStream &input_stream,
communication::OutputStream &output_stream);
using communication::bolt::Session<communication::InputStream,

View File

@ -23,7 +23,8 @@ class TestData {};
class TestSession {
public:
TestSession(TestData &, communication::InputStream &input_stream,
TestSession(TestData &, const io::network::Endpoint &,
communication::InputStream &input_stream,
communication::OutputStream &output_stream)
: input_stream_(input_stream), output_stream_(output_stream) {}

View File

@ -24,7 +24,8 @@ class TestData {};
class TestSession {
public:
TestSession(TestData &, communication::InputStream &input_stream,
TestSession(TestData &, const io::network::Endpoint &,
communication::InputStream &input_stream,
communication::OutputStream &output_stream)
: input_stream_(input_stream), output_stream_(output_stream) {}

View File

@ -21,7 +21,8 @@ struct EchoData {};
class EchoSession {
public:
EchoSession(EchoData &, communication::InputStream &input_stream,
EchoSession(EchoData &, const io::network::Endpoint &,
communication::InputStream &input_stream,
communication::OutputStream &output_stream)
: input_stream_(input_stream), output_stream_(output_stream) {}

View File

@ -19,7 +19,8 @@ struct EchoData {
class EchoSession {
public:
EchoSession(EchoData &data, communication::InputStream &input_stream,
EchoSession(EchoData &data, const io::network::Endpoint &,
communication::InputStream &input_stream,
communication::OutputStream &output_stream)
: data_(data),
input_stream_(input_stream),

View File

@ -11,6 +11,7 @@
#include "distributed/updates_rpc_server.hpp"
#include "storage/address_types.hpp"
#include "transactions/engine_master.hpp"
#include "utils/file.hpp"
DECLARE_string(durability_directory);
@ -33,6 +34,7 @@ class WorkerInThread {
};
class DistributedGraphDbTest : public ::testing::Test {
public:
const std::string kLocal = "127.0.0.1";
const int kWorkerCount = 2;
@ -47,7 +49,9 @@ class DistributedGraphDbTest : public ::testing::Test {
database::Config master_config;
master_config.master_endpoint = {kLocal, 0};
master_config.query_execution_time_sec = QueryExecutionTimeSec(0);
master_config.durability_directory = tmp_dir_;
master_config.durability_directory = GetDurabilityDirectory(0);
// Flag needs to be updated due to props on disk storage.
FLAGS_durability_directory = GetDurabilityDirectory(0);
// This is semantically wrong since this is not a cluster of size 1 but of
// size kWorkerCount+1, but it's hard to wait here for workers to recover
// and simultaneously assign the port to which the workers must connect.
@ -60,16 +64,15 @@ class DistributedGraphDbTest : public ::testing::Test {
database::Config config;
config.worker_id = worker_id;
config.master_endpoint = master_->endpoint();
config.durability_directory = tmp_dir_;
config.durability_directory = GetDurabilityDirectory(worker_id);
config.worker_endpoint = {kLocal, 0};
config.query_execution_time_sec = QueryExecutionTimeSec(worker_id);
return config;
};
// Flag needs to be updated due to props on disk storage.
FLAGS_durability_directory = tmp_dir_;
for (int i = 0; i < kWorkerCount; ++i) {
// Flag needs to be updated due to props on disk storage.
FLAGS_durability_directory = GetDurabilityDirectory(i + 1);
workers_.emplace_back(std::make_unique<WorkerInThread>(
modify_config(worker_config(i + 1))));
std::this_thread::sleep_for(kInitTime);
@ -88,6 +91,11 @@ class DistributedGraphDbTest : public ::testing::Test {
if (t.joinable()) t.join();
}
fs::path GetDurabilityDirectory(int worker_id) {
if (worker_id == 0) return tmp_dir_ / "master";
return tmp_dir_ / fmt::format("worker{}", worker_id);
}
void CleanDurability() {
if (fs::exists(tmp_dir_)) fs::remove_all(tmp_dir_);
}
@ -162,10 +170,17 @@ class DistributedGraphDbTest : public ::testing::Test {
class Cluster {
public:
explicit Cluster(int num_workers = 0) {
Cluster(int num_workers, const std::string &test_name) {
using namespace std::literals::chrono_literals;
tmp_dir_ = fs::temp_directory_path() / "MG_test_unit_distributed_common_" /
test_name;
EXPECT_TRUE(utils::EnsureDir(tmp_dir_));
database::Config master_config;
master_config.master_endpoint = {kLocal, 0};
master_config.durability_directory = GetDurabilityDirectory(0);
// Flag needs to be updated due to props on disk storage.
FLAGS_durability_directory = GetDurabilityDirectory(0);
auto master_tmp = std::make_unique<database::Master>(master_config);
auto master_endpoint = master_tmp->endpoint();
@ -179,10 +194,13 @@ class Cluster {
config.worker_id = worker_id;
config.master_endpoint = master_endpoint;
config.worker_endpoint = {kLocal, 0};
config.durability_directory = GetDurabilityDirectory(worker_id);
return config;
};
for (int i = 0; i < num_workers; ++i) {
// Flag needs to be updated due to props on disk storage.
FLAGS_durability_directory = GetDurabilityDirectory(i + 1);
workers_.emplace_back(
std::make_unique<WorkerInThread>(worker_config(i + 1)));
}
@ -198,6 +216,7 @@ class Cluster {
auto t = std::thread([this] { master_ = nullptr; });
workers_.clear();
if (t.joinable()) t.join();
if (fs::exists(tmp_dir_)) fs::remove_all(tmp_dir_);
}
auto *master() { return master_.get(); }
@ -227,9 +246,17 @@ class Cluster {
ClearCache(tx_id);
}
fs::path GetDurabilityDirectory(int worker_id) {
if (worker_id == 0) return tmp_dir_ / "master";
return tmp_dir_ / fmt::format("worker{}", worker_id);
}
private:
const std::string kLocal = "127.0.0.1";
fs::path tmp_dir_{fs::temp_directory_path() /
"MG_test_unit_distributed_common"};
std::unique_ptr<database::Master> master_;
std::vector<std::unique_ptr<WorkerInThread>> workers_;
};
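
With GetDurabilityDirectory each process in the test cluster ends up with its
own subdirectory under the fixture's temporary directory; roughly (mapping
derived from the helper above, paths illustrative):

  // GetDurabilityDirectory(0) -> tmp_dir_ / "master"     (the master)
  // GetDurabilityDirectory(1) -> tmp_dir_ / "worker1"
  // GetDurabilityDirectory(2) -> tmp_dir_ / "worker2"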

View File

@ -16,6 +16,7 @@
#include "distributed/coordination_worker.hpp"
#include "distributed/rpc_worker_clients.hpp"
#include "io/network/endpoint.hpp"
#include "utils/file.hpp"
using communication::rpc::ClientPool;
using communication::rpc::Server;
@ -38,12 +39,13 @@ class WorkerCoordinationInThread {
public:
WorkerCoordinationInThread(io::network::Endpoint master_endpoint,
fs::path durability_directory,
int desired_id = -1) {
std::atomic<bool> init_done{false};
worker_thread_ =
std::thread([this, master_endpoint, desired_id, &init_done] {
worker_thread_ = std::thread(
[this, master_endpoint, durability_directory, desired_id, &init_done] {
worker.emplace(master_endpoint);
worker->discovery.RegisterWorker(desired_id);
worker->discovery.RegisterWorker(desired_id, durability_directory);
worker->worker_id_ = desired_id;
init_done = true;
worker->coord.WaitForShutdown();
@ -70,19 +72,34 @@ class WorkerCoordinationInThread {
std::experimental::optional<Worker> worker;
};
TEST(Distributed, Coordination) {
class Distributed : public ::testing::Test {
public:
void SetUp() override { ASSERT_TRUE(utils::EnsureDir(tmp_dir_)); }
void TearDown() override {
if (fs::exists(tmp_dir_)) fs::remove_all(tmp_dir_);
}
const fs::path tmp_dir(const fs::path &path) const { return tmp_dir_ / path; }
private:
fs::path tmp_dir_{fs::temp_directory_path() /
"MG_test_unit_distributed_coordination"};
};
TEST_F(Distributed, Coordination) {
Server master_server({kLocal, 0});
std::vector<std::unique_ptr<WorkerCoordinationInThread>> workers;
{
MasterCoordination master_coord(master_server.endpoint());
master_coord.SetRecoveredSnapshot(std::experimental::nullopt);
RpcWorkerClients rpc_worker_clients(master_coord);
ClusterDiscoveryMaster master_discovery_(master_server, master_coord,
rpc_worker_clients);
ClusterDiscoveryMaster master_discovery_(
master_server, master_coord, rpc_worker_clients, tmp_dir("master"));
for (int i = 1; i <= kWorkerCount; ++i)
workers.emplace_back(std::make_unique<WorkerCoordinationInThread>(
master_server.endpoint(), i));
master_server.endpoint(), tmp_dir(fmt::format("worker{}", i)), i));
// Expect that all workers have a different ID.
std::unordered_set<int> worker_ids;
@ -95,48 +112,48 @@ TEST(Distributed, Coordination) {
EXPECT_EQ(w1->worker_endpoint(w2->worker_id()), w2->endpoint());
}
}
} // Coordinated shutdown.
}
// Coordinated shutdown.
for (auto &worker : workers) worker->join();
}
TEST(Distributed, DesiredAndUniqueId) {
TEST_F(Distributed, DesiredAndUniqueId) {
Server master_server({kLocal, 0});
std::vector<std::unique_ptr<WorkerCoordinationInThread>> workers;
{
MasterCoordination master_coord(master_server.endpoint());
master_coord.SetRecoveredSnapshot(std::experimental::nullopt);
RpcWorkerClients rpc_worker_clients(master_coord);
ClusterDiscoveryMaster master_discovery_(master_server, master_coord,
rpc_worker_clients);
ClusterDiscoveryMaster master_discovery_(
master_server, master_coord, rpc_worker_clients, tmp_dir("master"));
workers.emplace_back(std::make_unique<WorkerCoordinationInThread>(
master_server.endpoint(), 42));
master_server.endpoint(), tmp_dir("worker42"), 42));
EXPECT_EQ(workers[0]->worker_id(), 42);
EXPECT_DEATH(
workers.emplace_back(std::make_unique<WorkerCoordinationInThread>(
master_server.endpoint(), 42)),
master_server.endpoint(), tmp_dir("worker42"), 42)),
"");
}
for (auto &worker : workers) worker->join();
}
TEST(Distributed, CoordinationWorkersId) {
TEST_F(Distributed, CoordinationWorkersId) {
Server master_server({kLocal, 0});
std::vector<std::unique_ptr<WorkerCoordinationInThread>> workers;
{
MasterCoordination master_coord(master_server.endpoint());
master_coord.SetRecoveredSnapshot(std::experimental::nullopt);
RpcWorkerClients rpc_worker_clients(master_coord);
ClusterDiscoveryMaster master_discovery_(master_server, master_coord,
rpc_worker_clients);
ClusterDiscoveryMaster master_discovery_(
master_server, master_coord, rpc_worker_clients, tmp_dir("master"));
workers.emplace_back(std::make_unique<WorkerCoordinationInThread>(
master_server.endpoint(), 42));
master_server.endpoint(), tmp_dir("worker42"), 42));
workers.emplace_back(std::make_unique<WorkerCoordinationInThread>(
master_server.endpoint(), 43));
master_server.endpoint(), tmp_dir("worker43"), 43));
std::vector<int> ids;
ids.push_back(0);
@ -149,22 +166,22 @@ TEST(Distributed, CoordinationWorkersId) {
for (auto &worker : workers) worker->join();
}
TEST(Distributed, ClusterDiscovery) {
TEST_F(Distributed, ClusterDiscovery) {
Server master_server({kLocal, 0});
std::vector<std::unique_ptr<WorkerCoordinationInThread>> workers;
{
MasterCoordination master_coord(master_server.endpoint());
master_coord.SetRecoveredSnapshot(std::experimental::nullopt);
RpcWorkerClients rpc_worker_clients(master_coord);
ClusterDiscoveryMaster master_discovery_(master_server, master_coord,
rpc_worker_clients);
ClusterDiscoveryMaster master_discovery_(
master_server, master_coord, rpc_worker_clients, tmp_dir("master"));
std::vector<int> ids;
int worker_count = 10;
ids.push_back(0);
for (int i = 1; i <= worker_count; ++i) {
workers.emplace_back(std::make_unique<WorkerCoordinationInThread>(
master_server.endpoint(), i));
master_server.endpoint(), tmp_dir(fmt::format("worker", i)), i));
ids.push_back(i);
}
@ -180,19 +197,19 @@ TEST(Distributed, ClusterDiscovery) {
for (auto &worker : workers) worker->join();
}
TEST(Distributed, KeepsTrackOfRecovered) {
TEST_F(Distributed, KeepsTrackOfRecovered) {
Server master_server({kLocal, 0});
std::vector<std::unique_ptr<WorkerCoordinationInThread>> workers;
{
MasterCoordination master_coord(master_server.endpoint());
master_coord.SetRecoveredSnapshot(std::experimental::nullopt);
RpcWorkerClients rpc_worker_clients(master_coord);
ClusterDiscoveryMaster master_discovery_(master_server, master_coord,
rpc_worker_clients);
ClusterDiscoveryMaster master_discovery_(
master_server, master_coord, rpc_worker_clients, tmp_dir("master"));
int worker_count = 10;
for (int i = 1; i <= worker_count; ++i) {
workers.emplace_back(std::make_unique<WorkerCoordinationInThread>(
master_server.endpoint(), i));
master_server.endpoint(), tmp_dir(fmt::format("worker{}", i)), i));
workers.back()->NotifyWorkerRecovered();
EXPECT_THAT(master_coord.CountRecoveredWorkers(), i);
}

View File

@ -8,6 +8,13 @@
#include "durability/version.hpp"
#include "utils/string.hpp"
std::vector<fs::path> DirFiles(fs::path dir) {
std::vector<fs::path> files;
if (fs::exists(dir))
for (auto &file : fs::directory_iterator(dir)) files.push_back(file.path());
return files;
}
class DistributedDurability : public DistributedGraphDbTest {
public:
DistributedDurability() : DistributedGraphDbTest("distributed") {}
@ -46,6 +53,51 @@ class DistributedDurability : public DistributedGraphDbTest {
worker(2).wal().Flush();
}
void CheckDeltas(database::StateDelta::Type op) {
for (int i = 0; i < kWorkerCount + 1; ++i) {
auto wal_dir = GetDurabilityDirectory(i) / durability::kWalDir;
auto wal_files = DirFiles(wal_dir);
ASSERT_EQ(wal_files.size(), 1);
auto wal_file = wal_files[0];
HashedFileReader reader;
ASSERT_TRUE(reader.Open(wal_file));
communication::bolt::Decoder<HashedFileReader> decoder{reader};
std::vector<database::StateDelta> deltas;
// check magic number
auto magic_number = durability::kWalMagic;
reader.Read(magic_number.data(), magic_number.size());
ASSERT_EQ(magic_number, durability::kWalMagic);
// check version
communication::bolt::Value dv;
decoder.ReadValue(&dv);
ASSERT_EQ(dv.ValueInt(), durability::kVersion);
while (true) {
auto delta = database::StateDelta::Decode(reader, decoder);
if (delta) {
deltas.emplace_back(*delta);
} else {
break;
}
}
reader.Close();
if (i == 0) {
// The master always has TRANSACTION_BEGIN and `op`.
ASSERT_EQ(deltas.size(), 2);
EXPECT_EQ(deltas[1].type, op);
} else {
// The workers only have `op`.
ASSERT_EQ(deltas.size(), 1);
EXPECT_EQ(deltas[0].type, op);
}
}
}
private:
void AddVertex(database::GraphDb &db, const std::string &label) {
auto dba = db.Access();
@ -127,6 +179,7 @@ TEST_F(DistributedDurability, RecoveryFromSameSnapshot) {
}
}
/* TODO (msantl): FIXME
TEST_F(DistributedDurability, RecoveryFailure) {
{
AddVertices();
@ -139,58 +192,16 @@ TEST_F(DistributedDurability, RecoveryFailure) {
::testing::FLAGS_gtest_death_test_style = "threadsafe";
EXPECT_DEATH(RestartWithRecovery(), "worker failed to recover");
}
std::vector<fs::path> DirFiles(fs::path dir) {
std::vector<fs::path> files;
if (fs::exists(dir))
for (auto &file : fs::directory_iterator(dir)) files.push_back(file.path());
return files;
}
void CheckDeltas(fs::path wal_dir, database::StateDelta::Type op) {
// Equal to worker count
auto wal_files = DirFiles(wal_dir);
ASSERT_EQ(wal_files.size(), 3);
HashedFileReader reader;
for (auto worker_wal : wal_files) {
ASSERT_TRUE(reader.Open(worker_wal));
communication::bolt::Decoder<HashedFileReader> decoder{reader};
std::vector<database::StateDelta> deltas;
// check magic number
auto magic_number = durability::kWalMagic;
reader.Read(magic_number.data(), magic_number.size());
ASSERT_EQ(magic_number, durability::kWalMagic);
// check version
communication::bolt::Value dv;
decoder.ReadValue(&dv);
ASSERT_EQ(dv.ValueInt(), durability::kVersion);
while (true) {
auto delta = database::StateDelta::Decode(reader, decoder);
if (delta) {
deltas.emplace_back(*delta);
} else {
break;
}
}
reader.Close();
ASSERT_GE(deltas.size(), 1);
// In case of master there is also an state delta with transaction beginning
EXPECT_EQ(deltas[deltas.size() > 1 ? 1 : 0].type, op);
}
}
*/
TEST_F(DistributedDurability, WalWrite) {
{
CleanDurability();
RestartWithWal(false);
auto dba = master().Access();
dba->Abort();
dba->Commit();
FlushAllWal();
CheckDeltas(tmp_dir_ / durability::kWalDir,
database::StateDelta::Type::TRANSACTION_ABORT);
CheckDeltas(database::StateDelta::Type::TRANSACTION_COMMIT);
}
{
CleanDurability();
@ -198,8 +209,7 @@ TEST_F(DistributedDurability, WalWrite) {
auto dba = master().Access();
dba->Abort();
FlushAllWal();
CheckDeltas(tmp_dir_ / durability::kWalDir,
database::StateDelta::Type::TRANSACTION_ABORT);
CheckDeltas(database::StateDelta::Type::TRANSACTION_ABORT);
}
}
@ -209,15 +219,13 @@ TEST_F(DistributedDurability, WalSynchronizedWrite) {
RestartWithWal(true);
auto dba = master().Access();
dba->Commit();
CheckDeltas(tmp_dir_ / durability::kWalDir,
database::StateDelta::Type::TRANSACTION_COMMIT);
CheckDeltas(database::StateDelta::Type::TRANSACTION_COMMIT);
}
{
CleanDurability();
RestartWithWal(true);
auto dba = master().Access();
dba->Abort();
CheckDeltas(tmp_dir_ / durability::kWalDir,
database::StateDelta::Type::TRANSACTION_ABORT);
CheckDeltas(database::StateDelta::Type::TRANSACTION_ABORT);
}
}
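
Since every node now writes into its own durability directory, CheckDeltas
inspects exactly one WAL per node instead of three files in a shared
directory; the per-node expectation, sketched from the loop above:

  // node 0 (master):   GetDurabilityDirectory(0) / kWalDir
  //                       -> deltas {TRANSACTION_BEGIN, op}
  // node i (worker i):  GetDurabilityDirectory(i) / kWalDir
  //                       -> deltas {op}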

View File

@ -14,7 +14,8 @@ class TestData {};
class TestSession {
public:
TestSession(TestData &, communication::InputStream &input_stream,
TestSession(TestData &, const io::network::Endpoint &,
communication::InputStream &input_stream,
communication::OutputStream &output_stream)
: input_stream_(input_stream), output_stream_(output_stream) {}

View File

@ -1105,7 +1105,7 @@ class QueryPlanExpandBfs
protected:
QueryPlanExpandBfs()
: cluster_(GetParam().first == TestType::DISTRIBUTED
? new Cluster(GetParam().second)
? new Cluster(GetParam().second, "QueryPlanExpandBfs")
: nullptr),
single_node_(GetParam().first == TestType::DISTRIBUTED
? nullptr

View File

@ -1,3 +1,4 @@
#include <experimental/filesystem>
#include <mutex>
#include "capnp/serialize.h"
@ -13,7 +14,9 @@
#include "distributed/rpc_worker_clients.hpp"
#include "distributed/serialization.hpp"
#include "io/network/endpoint.hpp"
#include "utils/file.hpp"
namespace fs = std::experimental::filesystem;
using namespace std::literals::chrono_literals;
namespace distributed {
@ -52,6 +55,7 @@ class RpcWorkerClientsTest : public ::testing::Test {
const io::network::Endpoint kLocalHost{"127.0.0.1", 0};
const int kWorkerCount = 2;
void SetUp() override {
ASSERT_TRUE(utils::EnsureDir(tmp_dir_));
master_coord_->SetRecoveredSnapshot(std::experimental::nullopt);
for (int i = 1; i <= kWorkerCount; ++i) {
workers_server_.emplace_back(
@ -66,7 +70,8 @@ class RpcWorkerClientsTest : public ::testing::Test {
*workers_server_.back(), *workers_coord_.back(),
rpc_workers_.GetClientPool(0)));
cluster_discovery_.back()->RegisterWorker(i);
cluster_discovery_.back()->RegisterWorker(
i, tmp_dir(fmt::format("worker{}", i)));
workers_server_.back()->Register<distributed::IncrementCounterRpc>(
[this, i](const auto &req_reader, auto *res_builder) {
@ -90,8 +95,16 @@ class RpcWorkerClientsTest : public ::testing::Test {
// Starts server shutdown and notifies the workers
master_coord_ = std::experimental::nullopt;
for (auto &worker : wait_on_shutdown) worker.join();
// Cleanup temporary directory
if (fs::exists(tmp_dir_)) fs::remove_all(tmp_dir_);
}
const fs::path tmp_dir(const fs::path &path) const { return tmp_dir_ / path; }
fs::path tmp_dir_{fs::temp_directory_path() /
"MG_test_unit_rpc_worker_clients"};
std::vector<std::unique_ptr<communication::rpc::Server>> workers_server_;
std::vector<std::unique_ptr<distributed::WorkerCoordination>> workers_coord_;
std::vector<std::unique_ptr<distributed::ClusterDiscoveryWorker>>
@ -105,7 +118,7 @@ class RpcWorkerClientsTest : public ::testing::Test {
distributed::RpcWorkerClients rpc_workers_{*master_coord_};
distributed::ClusterDiscoveryMaster cluster_disocvery_{
master_server_, *master_coord_, rpc_workers_};
master_server_, *master_coord_, rpc_workers_, tmp_dir("master")};
};
TEST_F(RpcWorkerClientsTest, GetWorkerIds) {

View File

@ -24,8 +24,6 @@ class WorkerEngineTest : public testing::Test {
Server master_server_{{local, 0}};
MasterCoordination master_coordination_{master_server_.endpoint()};
RpcWorkerClients rpc_worker_clients_{master_coordination_};
ClusterDiscoveryMaster cluster_disocvery_{
master_server_, master_coordination_, rpc_worker_clients_};
MasterEngine master_{master_server_, rpc_worker_clients_};
ClientPool master_client_pool{master_server_.endpoint()};