Add initial support for multiple clients (#31)

* Add tests for multiple clients
* Use variant for RPC server and clients
* Use a synchronized list for replication clients; extract variant access into a function
* Set MAIN as the default state, add an unregister function, and give replication clients a name
* Use a regular list for clients
* Use a test fixture so the storage directory is cleaned between tests
* Use seq_cst for replication_state

Co-authored-by: Antonio Andelic <antonio.andelic@memgraph.io>
antonio2368 authored 2020-11-01 15:15:06 +01:00, committed by Antonio Andelic
parent c68ed8d94e
commit b10255a12f
4 changed files with 242 additions and 76 deletions
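
The core of the change: instead of a standalone replication server plus a single optional client, the storage now holds one std::variant whose active alternative encodes the replication role — MAIN owns a list of clients, REPLICA owns the RPC server, and std::monostate means neither. A minimal, self-contained sketch of that pattern (Client, Server, BecomeMain and BecomeReplica are illustrative stand-ins, not the actual Memgraph API):

    #include <cassert>
    #include <list>
    #include <string>
    #include <variant>

    struct Client { std::string name; };  // stand-in for replication::ReplicationClient
    struct Server {};                     // stand-in for the RPC-server wrapper

    using ClientList = std::list<Client>;

    class Storage {
     public:
      // Switching roles destroys the current alternative first.
      void BecomeMain() { rpc_context_.emplace<ClientList>(); }
      void BecomeReplica() { rpc_context_.emplace<Server>(); }

      template <typename T>
      T &GetRpcContext() {
        // std::get_if returns nullptr if another alternative is active,
        // so a role/state mismatch is caught immediately.
        auto *context = std::get_if<T>(&rpc_context_);
        assert(context && "Wrong type for the current replication state");
        return *context;
      }

     private:
      // ClientList comes first, so a default-constructed variant is MAIN;
      // monostate exists to explicitly drop whichever role is active.
      std::variant<ClientList, Server, std::monostate> rpc_context_;
    };

    int main() {
      Storage storage;  // starts out as MAIN
      storage.GetRpcContext<ClientList>().push_back({"REPLICA1"});
      storage.BecomeReplica();  // client list destroyed, server constructed
    }

Putting ClientList first mirrors the "Set MAIN as the default state" bullet above: a freshly constructed variant already holds an empty client list.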

src/storage/v2/replication/replication_client.hpp

@@ -15,9 +15,11 @@ namespace storage::replication {
 class ReplicationClient {
  public:
-  ReplicationClient(NameIdMapper *name_id_mapper, Config::Items items,
-                    const io::network::Endpoint &endpoint, bool use_ssl)
-      : name_id_mapper_(name_id_mapper),
+  ReplicationClient(std::string name, NameIdMapper *name_id_mapper,
+                    Config::Items items, const io::network::Endpoint &endpoint,
+                    bool use_ssl)
+      : name_(std::move(name)),
+        name_id_mapper_(name_id_mapper),
         items_(items),
         rpc_context_(use_ssl),
         rpc_client_(endpoint, &rpc_context_) {}

@@ -72,7 +74,10 @@ class ReplicationClient {
   Handler ReplicateTransaction() { return Handler(this); }

+  const auto &Name() const { return name_; }
+
  private:
+  std::string name_;
   NameIdMapper *name_id_mapper_;
   Config::Items items_;
   communication::ClientContext rpc_context_;

src/storage/v2/storage.cpp

@@ -4,10 +4,12 @@
 #include <atomic>
 #include <memory>
 #include <mutex>
+#include <variant>

 #include <gflags/gflags.h>
 #include <glog/logging.h>

+#include "io/network/endpoint.hpp"
 #include "storage/v2/durability/durability.hpp"
 #include "storage/v2/durability/paths.hpp"
 #include "storage/v2/durability/snapshot.hpp"
@@ -419,9 +421,10 @@ Storage::Storage(Config config)
   // For testing purposes until we can define the instance type from
   // a query.
   if (FLAGS_main) {
-    SetReplicationState(ReplicationState::MAIN);
+    SetReplicationState<ReplicationState::MAIN>();
   } else if (FLAGS_replica) {
-    SetReplicationState(ReplicationState::REPLICA);
+    SetReplicationState<ReplicationState::REPLICA>(
+        io::network::Endpoint{"127.0.0.1", 1000});
   }
 #endif
 }
@@ -431,9 +434,9 @@ Storage::~Storage() {
     gc_runner_.Stop();
   }
 #ifdef MG_ENTERPRISE
-  if (replication_server_) {
-    replication_server_->Shutdown();
-    replication_server_->AwaitShutdown();
+  {
+    std::lock_guard<utils::RWLock> replication_guard(replication_lock_);
+    rpc_context_.emplace<std::monostate>();
   }
 #endif
   wal_file_ = std::nullopt;
@@ -1649,16 +1652,21 @@ void Storage::AppendToWal(const Transaction &transaction,
       transaction.commit_timestamp->load(std::memory_order_acquire);

 #ifdef MG_ENTERPRISE
+  // We need to keep this lock because handler takes a pointer to the client
+  // from which it was created
   std::shared_lock<utils::RWLock> replication_guard(replication_lock_);
-  std::optional<replication::ReplicationClient::Handler> stream;
-  if (replication_client_) {
+  std::list<replication::ReplicationClient::Handler> streams;
+  if (replication_state_.load() == ReplicationState::MAIN) {
+    auto &replication_clients = GetRpcContext<ReplicationClientList>();
     try {
-      stream.emplace(replication_client_->ReplicateTransaction());
+      std::transform(replication_clients.begin(), replication_clients.end(),
+                     std::back_inserter(streams), [](auto &client) {
+                       return client.ReplicateTransaction();
+                     });
     } catch (const rpc::RpcFailedException &) {
       LOG(FATAL) << "Couldn't replicate data!";
     }
   }
-  replication_guard.unlock();
 #endif

   // Helper lambda that traverses the delta chain on order to find the first
@@ -1677,12 +1685,12 @@ void Storage::AppendToWal(const Transaction &transaction,
     if (filter(delta->action)) {
       wal_file_->AppendDelta(*delta, parent, final_commit_timestamp);
 #ifdef MG_ENTERPRISE
-      if (stream) {
-        try {
-          stream->AppendDelta(*delta, parent, final_commit_timestamp);
-        } catch (const rpc::RpcFailedException &) {
-          LOG(FATAL) << "Couldn't replicate data!";
+      try {
+        for (auto &stream : streams) {
+          stream.AppendDelta(*delta, parent, final_commit_timestamp);
         }
+      } catch (const rpc::RpcFailedException &) {
+        LOG(FATAL) << "Couldn't replicate data!";
       }
 #endif
     }
@@ -1819,13 +1827,13 @@ void Storage::AppendToWal(const Transaction &transaction,
   FinalizeWalFile();

 #ifdef MG_ENTERPRISE
-  if (stream) {
-    try {
-      stream->AppendTransactionEnd(final_commit_timestamp);
-      stream->Finalize();
-    } catch (const rpc::RpcFailedException &) {
-      LOG(FATAL) << "Couldn't replicate data!";
+  try {
+    for (auto &stream : streams) {
+      stream.AppendTransactionEnd(final_commit_timestamp);
+      stream.Finalize();
     }
+  } catch (const rpc::RpcFailedException &) {
+    LOG(FATAL) << "Couldn't replicate data!";
   }
 #endif
 }
@@ -1838,14 +1846,17 @@ void Storage::AppendToWal(durability::StorageGlobalOperation operation,
                              final_commit_timestamp);
 #ifdef MG_ENTERPRISE
   std::shared_lock<utils::RWLock> replication_guard(replication_lock_);
-  if (replication_client_) {
-    auto stream = replication_client_->ReplicateTransaction();
-    try {
-      stream.AppendOperation(operation, label, properties,
-                             final_commit_timestamp);
-      stream.Finalize();
-    } catch (const rpc::RpcFailedException &) {
-      LOG(FATAL) << "Couldn't replicate data!";
+  if (replication_state_.load() == ReplicationState::MAIN) {
+    auto &replication_clients = GetRpcContext<ReplicationClientList>();
+    for (auto &client : replication_clients) {
+      auto stream = client.ReplicateTransaction();
+      try {
+        stream.AppendOperation(operation, label, properties,
+                               final_commit_timestamp);
+        stream.Finalize();
+      } catch (const rpc::RpcFailedException &) {
+        LOG(FATAL) << "Couldn't replicate data!";
+      }
     }
   }
   replication_guard.unlock();
@@ -1854,20 +1865,25 @@ void Storage::AppendToWal(durability::StorageGlobalOperation operation,
 }

 #ifdef MG_ENTERPRISE
-void Storage::ConfigureReplica() {
+void Storage::ConfigureReplica(io::network::Endpoint endpoint) {
+  rpc_context_.emplace<ReplicationServer>();
+  auto &replication_server = GetRpcContext<ReplicationServer>();
+
   // Create RPC server.
   // TODO(mferencevic): Add support for SSL.
-  replication_server_context_.emplace();
+  replication_server.rpc_server_context.emplace();
   // NOTE: The replication server must have a single thread for processing
   // because there is no need for more processing threads - each replica can
   // have only a single main server. Also, the single-threaded guarantee
   // simplifies the rest of the implementation.
-  // TODO(mferencevic): Make endpoint configurable.
-  replication_server_.emplace(io::network::Endpoint{"127.0.0.1", 10000},
-                              &*replication_server_context_,
-                              /* workers_count = */ 1);
-  replication_server_->Register<AppendDeltasRpc>([this](auto *req_reader,
-                                                        auto *res_builder) {
+  replication_server.rpc_server.emplace(endpoint,
+                                        &*replication_server.rpc_server_context,
+                                        /* workers_count = */ 1);
+  replication_server.rpc_server->Register<
+      AppendDeltasRpc>([this, endpoint = std::move(endpoint)](
+                           auto *req_reader, auto *res_builder) {
     AppendDeltasReq req;
     slk::Load(&req, req_reader);
@@ -2235,38 +2251,36 @@ void Storage::ConfigureReplica() {
     AppendDeltasRes res;
     slk::Save(res, res_builder);
   });
-  replication_server_->Start();
+  replication_server.rpc_server->Start();
 }

-void Storage::ConfigureMain() {
-  replication_client_.emplace(&name_id_mapper_, config_.items,
-                              io::network::Endpoint{"127.0.0.1", 10000}, false);
-}
-
-void Storage::SetReplicationState(const ReplicationState state) {
-  if (replication_state_.load(std::memory_order_acquire) == state) {
-    return;
-  }
-
+void Storage::RegisterReplica(std::string name,
+                              io::network::Endpoint endpoint) {
   std::unique_lock<utils::RWLock> replication_guard(replication_lock_);
+  CHECK(replication_state_.load() == ReplicationState::MAIN)
+      << "Only main instance can register a replica!";
+  auto &replication_clients = GetRpcContext<ReplicationClientList>();

-  replication_server_.reset();
-  replication_server_context_.reset();
-  replication_client_.reset();
-
-  switch (state) {
-    case ReplicationState::MAIN:
-      ConfigureMain();
-      break;
-    case ReplicationState::REPLICA:
-      ConfigureReplica();
-      break;
-    case ReplicationState::NONE:
-    default:
-      break;
+  // TODO (antonio2368): Check if it's okay to aquire first the shared lock
+  // and later on aquire the write lock only if it's necessary
+  // because here we wait for the write lock even though there exists
+  // a replica with a same name
+  if (std::any_of(replication_clients.begin(), replication_clients.end(),
+                  [&](auto &client) { return client.Name() == name; })) {
+    throw utils::BasicException("Replica with a same name already exists!");
   }

-  replication_state_.store(state, std::memory_order_release);
+  replication_clients.emplace_back(std::move(name), &name_id_mapper_,
+                                   config_.items, endpoint, false);
+}
+
+void Storage::UnregisterReplica(const std::string &name) {
+  std::unique_lock<utils::RWLock> replication_guard(replication_lock_);
+  CHECK(replication_state_.load() == ReplicationState::MAIN)
+      << "Only main instance can unregister a replica!";
+  auto &replication_clients = GetRpcContext<ReplicationClientList>();
+  replication_clients.remove_if(
+      [&](const auto &client) { return client.Name() == name; });
 }
 #endif
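
Worth spelling out the fan-out pattern the AppendToWal hunks above introduce: one replication stream is opened per registered client up front (while the shared lock is held, since each Handler keeps a pointer back to the client that created it), and every WAL delta is then appended to all open streams. Here is the std::transform / std::back_inserter idiom in isolation, with toy types standing in for ReplicationClient and its Handler:

    #include <algorithm>
    #include <initializer_list>
    #include <iostream>
    #include <iterator>
    #include <list>

    // Toy stand-ins: each "client" opens one per-transaction "stream".
    struct Stream {
      int client_id;
      void AppendDelta(int delta) {
        std::cout << "client " << client_id << " <- delta " << delta << '\n';
      }
    };

    struct Client {
      int id;
      Stream ReplicateTransaction() { return Stream{id}; }
    };

    int main() {
      std::list<Client> clients{{1}, {2}, {3}};

      // Open one stream per client before touching any deltas.
      std::list<Stream> streams;
      std::transform(clients.begin(), clients.end(), std::back_inserter(streams),
                     [](auto &client) { return client.ReplicateTransaction(); });

      // Then fan every delta out to all open streams.
      for (int delta : {10, 20}) {
        for (auto &stream : streams) stream.AppendDelta(delta);
      }
    }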

src/storage/v2/storage.hpp

@@ -4,6 +4,7 @@
 #include <filesystem>
 #include <optional>
 #include <shared_mutex>
+#include <variant>

 #include "storage/v2/commit_log.hpp"
 #include "storage/v2/config.hpp"
@@ -168,7 +169,7 @@ struct StorageInfo {
 };

 #ifdef MG_ENTERPRISE
-enum class ReplicationState : uint8_t { NONE, MAIN, REPLICA };
+enum class ReplicationState : uint8_t { MAIN, REPLICA };
 #endif

 class Storage final {
@@ -405,7 +406,26 @@ class Storage final {
   StorageInfo GetInfo() const;

 #ifdef MG_ENTERPRISE
-  void SetReplicationState(ReplicationState state);
+  template <ReplicationState state, typename... Args>
+  void SetReplicationState(Args &&... args) {
+    if (replication_state_.load() == state) {
+      return;
+    }
+
+    std::unique_lock<utils::RWLock> replication_guard(replication_lock_);
+    rpc_context_.emplace<std::monostate>();
+
+    if constexpr (state == ReplicationState::REPLICA) {
+      ConfigureReplica(std::forward<Args>(args)...);
+    } else if (state == ReplicationState::MAIN) {
+      rpc_context_.emplace<ReplicationClientList>();
+    }
+
+    replication_state_.store(state);
+  }
+
+  void RegisterReplica(std::string name, io::network::Endpoint endpoint);
+  void UnregisterReplica(const std::string &name);
 #endif

  private:
@@ -425,8 +445,7 @@ class Storage final {
                    uint64_t final_commit_timestamp);

 #ifdef MG_ENTERPRISE
-  void ConfigureReplica();
-  void ConfigureMain();
+  void ConfigureReplica(io::network::Endpoint endpoint);
 #endif

   // Main storage lock.
@@ -505,11 +524,39 @@ class Storage final {
   // Replication
 #ifdef MG_ENTERPRISE
   utils::RWLock replication_lock_{utils::RWLock::Priority::WRITE};
-  std::optional<communication::ServerContext> replication_server_context_;
-  std::optional<rpc::Server> replication_server_;
-  // TODO(mferencevic): Add support for multiple clients.
-  std::optional<replication::ReplicationClient> replication_client_;
-  std::atomic<ReplicationState> replication_state_{ReplicationState::NONE};
+
+  struct ReplicationServer {
+    std::optional<communication::ServerContext> rpc_server_context;
+    std::optional<rpc::Server> rpc_server;
+
+    explicit ReplicationServer() = default;
+    ReplicationServer(const ReplicationServer &) = delete;
+    ReplicationServer(ReplicationServer &&) = delete;
+    ReplicationServer &operator=(const ReplicationServer &) = delete;
+    ReplicationServer &operator=(ReplicationServer &&) = delete;
+
+    ~ReplicationServer() {
+      if (rpc_server) {
+        rpc_server->Shutdown();
+        rpc_server->AwaitShutdown();
+      }
+    }
+  };
+
+  using ReplicationClientList = std::list<replication::ReplicationClient>;
+
+  // Monostate is used for explicitly calling the destructor of the current
+  // type
+  std::variant<ReplicationClientList, ReplicationServer, std::monostate>
+      rpc_context_;
+
+  template <typename TRpcContext>
+  TRpcContext &GetRpcContext() {
+    auto *context = std::get_if<TRpcContext>(&rpc_context_);
+    CHECK(context) << "Wrong type set for the current replication state!";
+    return *context;
+  }
+
+  std::atomic<ReplicationState> replication_state_{ReplicationState::MAIN};
 #endif
 };
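
The "Monostate is used for explicitly calling the destructor" comment is the crux of the teardown logic: std::variant::emplace destroys the currently held alternative before constructing the new one, so emplacing std::monostate is an explicit "drop the active role". That is also how ~Storage now stops the replication server — ReplicationServer's destructor performs the Shutdown/AwaitShutdown sequence the old destructor did by hand. A compilable toy illustrating the ordering (ToyServer stands in for the ReplicationServer struct, not the real rpc::Server):

    #include <iostream>
    #include <variant>

    struct ToyServer {
      ToyServer() { std::cout << "server started\n"; }
      // In the real struct, Shutdown()/AwaitShutdown() happen here.
      ~ToyServer() { std::cout << "server shut down\n"; }
    };

    int main() {
      std::variant<std::monostate, ToyServer> context;
      context.emplace<ToyServer>();       // prints "server started"
      context.emplace<std::monostate>();  // destroys ToyServer: prints "server shut down"
    }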

tests/unit/storage_v2_replication.cpp

@@ -11,11 +11,23 @@
 using testing::UnorderedElementsAre;

-TEST(ReplicationTest, BasicSynchronousReplicationTest) {
+class ReplicationTest : public ::testing::Test {
+ protected:
   std::filesystem::path storage_directory{
       std::filesystem::temp_directory_path() /
       "MG_test_unit_storage_v2_replication"};
+
+  void SetUp() override { Clear(); }
+
+  void TearDown() override { Clear(); }
+
+ private:
+  void Clear() {
+    if (!std::filesystem::exists(storage_directory)) return;
+    std::filesystem::remove_all(storage_directory);
+  }
+};
+
+TEST_F(ReplicationTest, BasicSynchronousReplicationTest) {
   storage::Storage main_store(
       {.items = {.properties_on_edges = true},
        .durability = {
@@ -23,7 +35,6 @@ TEST(ReplicationTest, BasicSynchronousReplicationTest) {
            .snapshot_wal_mode = storage::Config::Durability::SnapshotWalMode::
                PERIODIC_SNAPSHOT_WITH_WAL,
        }});
-  main_store.SetReplicationState(storage::ReplicationState::MAIN);

   storage::Storage replica_store(
       {.items = {.properties_on_edges = true},
@@ -32,7 +43,11 @@ TEST(ReplicationTest, BasicSynchronousReplicationTest) {
            .snapshot_wal_mode = storage::Config::Durability::SnapshotWalMode::
                PERIODIC_SNAPSHOT_WITH_WAL,
        }});
-  replica_store.SetReplicationState(storage::ReplicationState::REPLICA);
+  replica_store.SetReplicationState<storage::ReplicationState::REPLICA>(
+      io::network::Endpoint{"127.0.0.1", 10000});
+
+  main_store.RegisterReplica("REPLICA",
+                             io::network::Endpoint{"127.0.0.1", 10000});

   // vertex create
   // vertex add label
@@ -250,3 +265,88 @@ TEST(ReplicationTest, BasicSynchronousReplicationTest) {
     ASSERT_EQ(constraints.unique.size(), 0);
   }
 }
+
+TEST_F(ReplicationTest, MultipleSynchronousReplicationTest) {
+  storage::Storage main_store(
+      {.durability = {
+           .storage_directory = storage_directory,
+           .snapshot_wal_mode = storage::Config::Durability::SnapshotWalMode::
+               PERIODIC_SNAPSHOT_WITH_WAL,
+       }});
+
+  storage::Storage replica_store1(
+      {.durability = {
+           .storage_directory = storage_directory,
+           .snapshot_wal_mode = storage::Config::Durability::SnapshotWalMode::
+               PERIODIC_SNAPSHOT_WITH_WAL,
+       }});
+  replica_store1.SetReplicationState<storage::ReplicationState::REPLICA>(
+      io::network::Endpoint{"127.0.0.1", 10000});
+
+  storage::Storage replica_store2(
+      {.durability = {
+           .storage_directory = storage_directory,
+           .snapshot_wal_mode = storage::Config::Durability::SnapshotWalMode::
+               PERIODIC_SNAPSHOT_WITH_WAL,
+       }});
+  replica_store2.SetReplicationState<storage::ReplicationState::REPLICA>(
+      io::network::Endpoint{"127.0.0.1", 20000});
+
+  main_store.RegisterReplica("REPLICA1",
+                             io::network::Endpoint{"127.0.0.1", 10000});
+  main_store.RegisterReplica("REPLICA2",
+                             io::network::Endpoint{"127.0.0.1", 20000});
+
+  const auto *vertex_label = "label";
+  const auto *vertex_property = "property";
+  const auto *vertex_property_value = "property_value";
+  std::optional<storage::Gid> vertex_gid;
+  {
+    auto acc = main_store.Access();
+    auto v = acc.CreateVertex();
+    ASSERT_TRUE(v.AddLabel(main_store.NameToLabel(vertex_label)).HasValue());
+    ASSERT_TRUE(v.SetProperty(main_store.NameToProperty(vertex_property),
+                              storage::PropertyValue(vertex_property_value))
+                    .HasValue());
+    vertex_gid.emplace(v.Gid());
+    ASSERT_FALSE(acc.Commit().HasError());
+  }
+
+  const auto check_replica = [&](storage::Storage *replica_store) {
+    auto acc = replica_store->Access();
+    const auto v = acc.FindVertex(*vertex_gid, storage::View::OLD);
+    ASSERT_TRUE(v);
+    const auto labels = v->Labels(storage::View::OLD);
+    ASSERT_TRUE(labels.HasValue());
+    ASSERT_THAT(*labels,
+                UnorderedElementsAre(replica_store->NameToLabel(vertex_label)));
+    ASSERT_FALSE(acc.Commit().HasError());
+  };
+
+  check_replica(&replica_store1);
+  check_replica(&replica_store2);
+
+  main_store.UnregisterReplica("REPLICA2");
+  {
+    auto acc = main_store.Access();
+    auto v = acc.CreateVertex();
+    vertex_gid.emplace(v.Gid());
+    ASSERT_FALSE(acc.Commit().HasError());
+  }
+
+  // REPLICA1 should contain the new vertex
+  {
+    auto acc = replica_store1.Access();
+    const auto v = acc.FindVertex(*vertex_gid, storage::View::OLD);
+    ASSERT_TRUE(v);
+    ASSERT_FALSE(acc.Commit().HasError());
+  }
+
+  // REPLICA2 should not contain the new vertex
+  {
+    auto acc = replica_store2.Access();
+    const auto v = acc.FindVertex(*vertex_gid, storage::View::OLD);
+    ASSERT_FALSE(v);
+    ASSERT_FALSE(acc.Commit().HasError());
+  }
+}