Add configs and support for semi-sync and SSL (#55)

* Add config for replication client/server
* Add SSL to replication
* Add semi-sync replication
* Expose necessary information about replication
* Thread pool fix
* Set BasicResult value type to void
This commit is contained in:
antonio2368 2020-12-03 13:28:23 +01:00 committed by Antonio Andelic
parent a0705746cb
commit 200ce5f45e
11 changed files with 432 additions and 119 deletions

View File

@ -119,7 +119,7 @@ class Client {
/// RPC call (eg. connection failed, remote end
/// died, etc.)
template <class TRequestResponse, class... Args>
StreamHandler<TRequestResponse> Stream(Args &&... args) {
StreamHandler<TRequestResponse> Stream(Args &&...args) {
return StreamWithLoad<TRequestResponse>(
[](auto *reader) {
typename TRequestResponse::Response response;
@ -133,7 +133,7 @@ class Client {
template <class TRequestResponse, class... Args>
StreamHandler<TRequestResponse> StreamWithLoad(
std::function<typename TRequestResponse::Response(slk::Reader *)> load,
Args &&... args) {
Args &&...args) {
typename TRequestResponse::Request request(std::forward<Args>(args)...);
auto req_type = TRequestResponse::Request::kType;
VLOG(12) << "[RpcClient] sent " << req_type.name;
@ -177,7 +177,7 @@ class Client {
/// RPC call (eg. connection failed, remote end
/// died, etc.)
template <class TRequestResponse, class... Args>
typename TRequestResponse::Response Call(Args &&... args) {
typename TRequestResponse::Response Call(Args &&...args) {
auto stream = Stream<TRequestResponse>(std::forward<Args>(args)...);
return stream.AwaitResponse();
}
@ -186,7 +186,7 @@ class Client {
template <class TRequestResponse, class... Args>
typename TRequestResponse::Response CallWithLoad(
std::function<typename TRequestResponse::Response(slk::Reader *)> load,
Args &&... args) {
Args &&...args) {
auto stream = StreamWithLoad(load, std::forward<Args>(args)...);
return stream.AwaitResponse();
}
@ -194,6 +194,8 @@ class Client {
/// Call this function from another thread to abort a pending RPC call.
void Abort();
const auto &Endpoint() const { return endpoint_; }
private:
io::network::Endpoint endpoint_;
communication::ClientContext *context_;

View File

@ -0,0 +1,27 @@
#pragma once
#include <optional>
#include <string>
namespace storage::replication {
struct ReplicationClientConfig {
std::optional<double> timeout;
struct SSL {
std::string key_file = "";
std::string cert_file = "";
};
std::optional<SSL> ssl;
};
struct ReplicationServerConfig {
struct SSL {
std::string key_file;
std::string cert_file;
std::string ca_file;
bool verify_peer;
};
std::optional<SSL> ssl;
};
} // namespace storage::replication

View File

@ -4,6 +4,7 @@
#include <type_traits>
#include "storage/v2/durability/durability.hpp"
#include "storage/v2/replication/config.hpp"
#include "storage/v2/replication/enums.hpp"
#include "utils/file_locker.hpp"
@ -17,19 +18,28 @@ template <typename>
////// ReplicationClient //////
Storage::ReplicationClient::ReplicationClient(
std::string name, Storage *storage, const io::network::Endpoint &endpoint,
bool use_ssl, const replication::ReplicationMode mode)
: name_(std::move(name)),
storage_(storage),
rpc_context_(use_ssl),
rpc_client_(endpoint, &rpc_context_),
mode_(mode) {
const replication::ReplicationMode mode,
const replication::ReplicationClientConfig &config)
: name_(std::move(name)), storage_(storage), mode_(mode) {
if (config.ssl) {
rpc_context_.emplace(config.ssl->key_file, config.ssl->cert_file);
} else {
rpc_context_.emplace();
}
rpc_client_.emplace(endpoint, &*rpc_context_);
InitializeClient();
if (config.timeout) {
timeout_.emplace(*config.timeout);
timeout_dispatcher_.emplace();
}
}
void Storage::ReplicationClient::InitializeClient() {
uint64_t current_commit_timestamp{kTimestampInitialId};
auto stream{
rpc_client_.Stream<HeartbeatRpc>(storage_->last_commit_timestamp_)};
rpc_client_->Stream<HeartbeatRpc>(storage_->last_commit_timestamp_)};
replication::Encoder encoder{stream.GetBuilder()};
// Write epoch id
{
@ -66,7 +76,7 @@ void Storage::ReplicationClient::InitializeClient() {
SnapshotRes Storage::ReplicationClient::TransferSnapshot(
const std::filesystem::path &path) {
auto stream{rpc_client_.Stream<SnapshotRpc>()};
auto stream{rpc_client_->Stream<SnapshotRpc>()};
replication::Encoder encoder(stream.GetBuilder());
encoder.WriteFile(path);
return stream.AwaitResponse();
@ -75,7 +85,7 @@ SnapshotRes Storage::ReplicationClient::TransferSnapshot(
WalFilesRes Storage::ReplicationClient::TransferWalFiles(
const std::vector<std::filesystem::path> &wal_files) {
CHECK(!wal_files.empty()) << "Wal files list is empty!";
auto stream{rpc_client_.Stream<WalFilesRpc>(wal_files.size())};
auto stream{rpc_client_->Stream<WalFilesRpc>(wal_files.size())};
replication::Encoder encoder(stream.GetBuilder());
for (const auto &wal : wal_files) {
DLOG(INFO) << "Sending wal file: " << wal;
@ -87,7 +97,7 @@ WalFilesRes Storage::ReplicationClient::TransferWalFiles(
OnlySnapshotRes Storage::ReplicationClient::TransferOnlySnapshot(
const uint64_t snapshot_timestamp) {
auto stream{rpc_client_.Stream<OnlySnapshotRpc>(snapshot_timestamp)};
auto stream{rpc_client_->Stream<OnlySnapshotRpc>(snapshot_timestamp)};
replication::Encoder encoder{stream.GetBuilder()};
encoder.WriteString(storage_->epoch_id_);
return stream.AwaitResponse();
@ -124,7 +134,7 @@ bool Storage::ReplicationClient::StartTransactionReplication(
replica_state_.store(replication::ReplicaState::INVALID);
LOG(ERROR) << "Couldn't replicate data to " << name_;
thread_pool_.AddTask([this] {
rpc_client_.Abort();
rpc_client_->Abort();
InitializeClient();
});
return false;
@ -142,7 +152,7 @@ void Storage::ReplicationClient::IfStreamingTransaction(
} catch (const rpc::RpcFailedException &) {
LOG(ERROR) << "Couldn't replicate data to " << name_;
thread_pool_.AddTask([this] {
rpc_client_.Abort();
rpc_client_->Abort();
InitializeClient();
});
}
@ -153,6 +163,43 @@ void Storage::ReplicationClient::FinalizeTransactionReplication() {
if (mode_ == replication::ReplicationMode::ASYNC) {
thread_pool_.AddTask(
[this] { this->FinalizeTransactionReplicationInternal(); });
} else if (timeout_) {
CHECK(mode_ == replication::ReplicationMode::SYNC)
<< "Only SYNC replica can have a timeout.";
CHECK(timeout_dispatcher_) << "Timeout thread is missing";
timeout_dispatcher_->WaitForTaskToFinish();
timeout_dispatcher_->active = true;
thread_pool_.AddTask([&, this] {
this->FinalizeTransactionReplicationInternal();
std::unique_lock main_guard(timeout_dispatcher_->main_lock);
// TimerThread can finish waiting for timeout
timeout_dispatcher_->active = false;
// Notify the main thread
timeout_dispatcher_->main_cv.notify_one();
});
timeout_dispatcher_->StartTimeoutTask(*timeout_);
{
std::unique_lock main_guard(timeout_dispatcher_->main_lock);
// Wait until one of the threads notifies us that they finished executing
// Both threads should first set the active flag to false
timeout_dispatcher_->main_cv.wait(
main_guard, [&] { return !timeout_dispatcher_->active.load(); });
}
// TODO (antonio2368): Document and/or polish SEMI-SYNC to ASYNC fallback.
if (replica_state_ == replication::ReplicaState::REPLICATING) {
mode_ = replication::ReplicationMode::ASYNC;
timeout_.reset();
// This can only happen if we timeouted so we are sure that
// Timeout task finished
// We need to delete timeout dispatcher AFTER the replication
// finished because it tries to acquire the timeout lock
// and acces the `active` variable`
thread_pool_.AddTask([this] { timeout_dispatcher_.reset(); });
}
} else {
FinalizeTransactionReplicationInternal();
}
@ -178,7 +225,7 @@ void Storage::ReplicationClient::FinalizeTransactionReplicationInternal() {
replica_state_.store(replication::ReplicaState::INVALID);
}
thread_pool_.AddTask([this] {
rpc_client_.Abort();
rpc_client_->Abort();
InitializeClient();
});
}
@ -426,12 +473,38 @@ Storage::ReplicationClient::GetRecoverySteps(
return recovery_steps;
}
////// TimeoutDispatcher //////
void Storage::ReplicationClient::TimeoutDispatcher::WaitForTaskToFinish() {
// Wait for the previous timeout task to finish
std::unique_lock main_guard(main_lock);
main_cv.wait(main_guard, [&] { return finished; });
}
void Storage::ReplicationClient::TimeoutDispatcher::StartTimeoutTask(
const double timeout) {
timeout_pool.AddTask([&, this] {
finished = false;
using std::chrono::steady_clock;
const auto timeout_duration =
std::chrono::duration_cast<steady_clock::duration>(
std::chrono::duration<double>(timeout));
const auto end_time = steady_clock::now() + timeout_duration;
while (active && steady_clock::now() < end_time) {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
std::unique_lock main_guard(main_lock);
finished = true;
active = false;
main_cv.notify_one();
});
}
////// ReplicaStream //////
Storage::ReplicationClient::ReplicaStream::ReplicaStream(
ReplicationClient *self, const uint64_t previous_commit_timestamp,
const uint64_t current_seq_num)
: self_(self),
stream_(self_->rpc_client_.Stream<AppendDeltasRpc>(
stream_(self_->rpc_client_->Stream<AppendDeltasRpc>(
previous_commit_timestamp, current_seq_num)) {
replication::Encoder encoder{stream_.GetBuilder()};
encoder.WriteString(self_->storage_->epoch_id_);
@ -473,7 +546,7 @@ AppendDeltasRes Storage::ReplicationClient::ReplicaStream::Finalize() {
////// CurrentWalHandler //////
Storage::ReplicationClient::CurrentWalHandler::CurrentWalHandler(
ReplicationClient *self)
: self_(self), stream_(self_->rpc_client_.Stream<CurrentWalRpc>()) {}
: self_(self), stream_(self_->rpc_client_->Stream<CurrentWalRpc>()) {}
void Storage::ReplicationClient::CurrentWalHandler::AppendFilename(
const std::string &filename) {

View File

@ -1,6 +1,7 @@
#pragma once
#include <atomic>
#include <chrono>
#include <thread>
#include <variant>
@ -12,6 +13,7 @@
#include "storage/v2/mvcc.hpp"
#include "storage/v2/name_id_mapper.hpp"
#include "storage/v2/property_value.hpp"
#include "storage/v2/replication/config.hpp"
#include "storage/v2/replication/enums.hpp"
#include "storage/v2/replication/rpc.hpp"
#include "storage/v2/replication/serialization.hpp"
@ -27,8 +29,9 @@ namespace storage {
class Storage::ReplicationClient {
public:
ReplicationClient(std::string name, Storage *storage,
const io::network::Endpoint &endpoint, bool use_ssl,
replication::ReplicationMode mode);
const io::network::Endpoint &endpoint,
replication::ReplicationMode mode,
const replication::ReplicationClientConfig &config = {});
// Handler used for transfering the current transaction.
class ReplicaStream {
@ -117,6 +120,12 @@ class Storage::ReplicationClient {
auto State() const { return replica_state_.load(); }
auto Mode() const { return mode_; }
auto Timeout() const { return timeout_; }
const auto &Endpoint() const { return rpc_client_->Endpoint(); }
private:
void FinalizeTransactionReplicationInternal();
@ -150,12 +159,36 @@ class Storage::ReplicationClient {
Storage *storage_;
communication::ClientContext rpc_context_;
rpc::Client rpc_client_;
std::optional<communication::ClientContext> rpc_context_;
std::optional<rpc::Client> rpc_client_;
std::optional<ReplicaStream> replica_stream_;
replication::ReplicationMode mode_{replication::ReplicationMode::SYNC};
// Dispatcher class for timeout tasks
struct TimeoutDispatcher {
explicit TimeoutDispatcher(){};
void WaitForTaskToFinish();
void StartTimeoutTask(double timeout);
// If the Timeout task should continue waiting
std::atomic<bool> active{false};
std::mutex main_lock;
std::condition_variable main_cv;
private:
// if the Timeout task finished executing
bool finished{false};
utils::ThreadPool timeout_pool{1};
};
std::optional<double> timeout_;
std::optional<TimeoutDispatcher> timeout_dispatcher_;
utils::SpinLock client_lock_;
utils::ThreadPool thread_pool_{1};
std::atomic<replication::ReplicaState> replica_state_{

View File

@ -2,16 +2,22 @@
#include "storage/v2/durability/durability.hpp"
#include "storage/v2/durability/snapshot.hpp"
#include "storage/v2/replication/config.hpp"
#include "storage/v2/transaction.hpp"
#include "utils/exceptions.hpp"
namespace storage {
Storage::ReplicationServer::ReplicationServer(Storage *storage,
io::network::Endpoint endpoint)
Storage::ReplicationServer::ReplicationServer(
Storage *storage, io::network::Endpoint endpoint,
const replication::ReplicationServerConfig &config)
: storage_(storage) {
// Create RPC server.
// TODO (antonio2368): Add support for SSL.
rpc_server_context_.emplace();
if (config.ssl) {
rpc_server_context_.emplace(config.ssl->key_file, config.ssl->cert_file,
config.ssl->ca_file, config.ssl->verify_peer);
} else {
rpc_server_context_.emplace();
}
// NOTE: The replication server must have a single thread for processing
// because there is no need for more processing threads - each replica can
// have only a single main server. Also, the single-threaded guarantee

View File

@ -6,7 +6,9 @@ namespace storage {
class Storage::ReplicationServer {
public:
explicit ReplicationServer(Storage *storage, io::network::Endpoint endpoint);
explicit ReplicationServer(
Storage *storage, io::network::Endpoint endpoint,
const replication::ReplicationServerConfig &config);
ReplicationServer(const ReplicationServer &) = delete;
ReplicationServer(ReplicationServer &&) = delete;
ReplicationServer &operator=(const ReplicationServer &) = delete;

View File

@ -17,6 +17,7 @@
#include "storage/v2/durability/wal.hpp"
#include "storage/v2/indices.hpp"
#include "storage/v2/mvcc.hpp"
#include "storage/v2/replication/config.hpp"
#include "utils/file.hpp"
#include "utils/rw_lock.hpp"
#include "utils/spin_lock.hpp"
@ -425,14 +426,22 @@ Storage::Storage(Config config)
// For testing purposes until we can define the instance type from
// a query.
if (FLAGS_main) {
RegisterReplica("REPLICA_SYNC", io::network::Endpoint{"127.0.0.1", 10000});
RegisterReplica("REPLICA_ASYNC", io::network::Endpoint{"127.0.0.1", 10002});
if (RegisterReplica("REPLICA_SYNC",
io::network::Endpoint{"127.0.0.1", 10000},
replication::ReplicationMode::SYNC)
.HasError()) {
LOG(WARNING) << "Couldn't connect to REPLICA_SYNC";
}
if (RegisterReplica("REPLICA_ASYNC",
io::network::Endpoint{"127.0.0.1", 10002},
replication::ReplicationMode::ASYNC)
.HasError()) {
LOG(WARNING) << "Couldn't connect to REPLICA_SYNC";
}
} else if (FLAGS_replica) {
SetReplicationRole<ReplicationRole::REPLICA>(
io::network::Endpoint{"127.0.0.1", 10000});
SetReplicaRole(io::network::Endpoint{"127.0.0.1", 10000});
} else if (FLAGS_async_replica) {
SetReplicationRole<ReplicationRole::REPLICA>(
io::network::Endpoint{"127.0.0.1", 10002});
SetReplicaRole(io::network::Endpoint{"127.0.0.1", 10002});
}
#endif
}
@ -1947,57 +1956,98 @@ uint64_t Storage::CommitTimestamp(
}
#ifdef MG_ENTERPRISE
void Storage::ConfigureReplica(io::network::Endpoint endpoint) {
void Storage::SetReplicaRole(
io::network::Endpoint endpoint,
const replication::ReplicationServerConfig &config) {
// We don't want to restart the server if we're already a REPLICA
if (replication_role_ == ReplicationRole::REPLICA) {
return;
}
replication_server_ =
std::make_unique<ReplicationServer>(this, std::move(endpoint));
std::make_unique<ReplicationServer>(this, std::move(endpoint), config);
replication_role_.store(ReplicationRole::REPLICA);
}
void Storage::ConfigureMain() {
void Storage::SetMainReplicationRole() {
// We don't want to generate new epoch_id and do the
// cleanup if we're already a MAIN
if (replication_role_ == ReplicationRole::MAIN) {
return;
}
// Main instance does not need replication server
// This should be always called first so we finalize everything
replication_server_.reset(nullptr);
std::unique_lock engine_guard{engine_lock_};
if (wal_file_) {
wal_file_->FinalizeWal();
wal_file_.reset();
{
std::unique_lock engine_guard{engine_lock_};
if (wal_file_) {
wal_file_->FinalizeWal();
wal_file_.reset();
}
// Generate new epoch id and save the last one to the history.
if (epoch_history_.size() == kEpochHistoryRetention) {
epoch_history_.pop_front();
}
epoch_history_.emplace_back(std::move(epoch_id_), last_commit_timestamp_);
epoch_id_ = utils::GenerateUUID();
}
// Generate new epoch id and save the last one to the history.
if (epoch_history_.size() == kEpochHistoryRetention) {
epoch_history_.pop_front();
}
epoch_history_.emplace_back(std::move(epoch_id_), last_commit_timestamp_);
epoch_id_ = utils::GenerateUUID();
replication_role_.store(ReplicationRole::MAIN);
}
void Storage::RegisterReplica(
utils::BasicResult<Storage::RegisterReplicaError> Storage::RegisterReplica(
std::string name, io::network::Endpoint endpoint,
const replication::ReplicationMode replication_mode) {
const replication::ReplicationMode replication_mode,
const replication::ReplicationClientConfig &config) {
// TODO (antonio2368): This shouldn't stop the main instance
CHECK(replication_role_.load() == ReplicationRole::MAIN)
<< "Only main instance can register a replica!";
replication_clients_.WithLock([&](auto &clients) {
if (std::any_of(clients.begin(), clients.end(),
[&](auto &client) { return client->Name() == name; })) {
throw utils::BasicException("Replica with a same name already exists!");
}
const bool name_exists = replication_clients_.WithLock([&](auto &clients) {
return std::any_of(clients.begin(), clients.end(),
[&](auto &client) { return client->Name() == name; });
});
auto client = std::make_unique<ReplicationClient>(
std::move(name), this, endpoint, false, replication_mode);
if (name_exists) {
return RegisterReplicaError::NAME_EXISTS;
}
replication_clients_.WithLock(
[&](auto &clients) { clients.push_back(std::move(client)); });
CHECK(replication_mode == replication::ReplicationMode::SYNC ||
!config.timeout)
<< "Only SYNC mode can have a timeout set";
auto client = std::make_unique<ReplicationClient>(
std::move(name), this, endpoint, replication_mode, config);
if (client->State() == replication::ReplicaState::INVALID) {
return RegisterReplicaError::CONNECTION_FAILED;
}
return replication_clients_.WithLock(
[&](auto &clients) -> utils::BasicResult<Storage::RegisterReplicaError> {
// Another thread could have added a client with same name while
// we were connecting to this client.
if (std::any_of(clients.begin(), clients.end(),
[&](auto &other_client) {
return client->Name() == other_client->Name();
})) {
return RegisterReplicaError::NAME_EXISTS;
}
clients.push_back(std::move(client));
return {};
});
}
void Storage::UnregisterReplica(const std::string_view name) {
bool Storage::UnregisterReplica(const std::string_view name) {
CHECK(replication_role_.load() == ReplicationRole::MAIN)
<< "Only main instance can unregister a replica!";
replication_clients_.WithLock([&](auto &clients) {
std::erase_if(clients,
[&](const auto &client) { return client->Name() == name; });
return replication_clients_.WithLock([&](auto &clients) {
return std::erase_if(
clients, [&](const auto &client) { return client->Name() == name; });
});
}
@ -2014,6 +2064,24 @@ std::optional<replication::ReplicaState> Storage::GetReplicaState(
return (*client_it)->State();
});
}
ReplicationRole Storage::GetReplicationRole() const {
return replication_role_;
}
std::vector<Storage::ReplicaInfo> Storage::ReplicasInfo() {
return replication_clients_.WithLock([](auto &clients) {
std::vector<Storage::ReplicaInfo> replica_info;
replica_info.reserve(clients.size());
std::transform(clients.begin(), clients.end(),
std::back_inserter(replica_info),
[](const auto &client) -> ReplicaInfo {
return {client->Name(), client->Mode(), client->Timeout(),
client->Endpoint(), client->State()};
});
return replica_info;
});
}
#endif
} // namespace storage

View File

@ -5,6 +5,7 @@
#include <optional>
#include <shared_mutex>
#include "io/network/endpoint.hpp"
#include "storage/v2/commit_log.hpp"
#include "storage/v2/config.hpp"
#include "storage/v2/constraints.hpp"
@ -28,6 +29,7 @@
#ifdef MG_ENTERPRISE
#include "rpc/server.hpp"
#include "storage/v2/replication/config.hpp"
#include "storage/v2/replication/enums.hpp"
#include "storage/v2/replication/rpc.hpp"
#include "storage/v2/replication/serialization.hpp"
@ -413,29 +415,38 @@ class Storage final {
StorageInfo GetInfo() const;
#ifdef MG_ENTERPRISE
template <ReplicationRole role, typename... Args>
void SetReplicationRole(Args &&...args) {
if (replication_role_.load() == role) {
return;
}
#if MG_ENTERPRISE
if constexpr (role == ReplicationRole::REPLICA) {
ConfigureReplica(std::forward<Args>(args)...);
} else if constexpr (role == ReplicationRole::MAIN) {
ConfigureMain(std::forward<Args>(args)...);
}
void SetReplicaRole(io::network::Endpoint endpoint,
const replication::ReplicationServerConfig &config = {});
replication_role_.store(role);
}
void SetMainReplicationRole();
void RegisterReplica(std::string name, io::network::Endpoint endpoint,
replication::ReplicationMode replication_mode =
replication::ReplicationMode::SYNC);
void UnregisterReplica(std::string_view name);
enum class RegisterReplicaError : uint8_t { NAME_EXISTS, CONNECTION_FAILED };
/// @pre The instance should have a MAIN role
/// @pre Timeout can only be set for SYNC replication
utils::BasicResult<RegisterReplicaError, void> RegisterReplica(
std::string name, io::network::Endpoint endpoint,
replication::ReplicationMode replication_mode,
const replication::ReplicationClientConfig &config = {});
/// @pre The instance should have a MAIN role
bool UnregisterReplica(std::string_view name);
std::optional<replication::ReplicaState> GetReplicaState(
std::string_view name);
ReplicationRole GetReplicationRole() const;
struct ReplicaInfo {
std::string name;
replication::ReplicationMode mode;
std::optional<double> timeout;
io::network::Endpoint endpoint;
replication::ReplicaState state;
};
std::vector<ReplicaInfo> ReplicasInfo();
#endif
private:
@ -460,8 +471,6 @@ class Storage final {
std::optional<uint64_t> desired_commit_timestamp = {});
#ifdef MG_ENTERPRISE
void ConfigureReplica(io::network::Endpoint endpoint);
void ConfigureMain();
#endif
// Main storage lock.

View File

@ -7,7 +7,7 @@
namespace utils {
template <class TError, class TValue>
template <class TError, class TValue = void>
class [[nodiscard]] BasicResult final {
public:
BasicResult(const TValue &value) : value_(value) {}

View File

@ -13,12 +13,16 @@ void ThreadPool::AddTask(std::function<void()> new_task) {
queue.emplace(std::make_unique<TaskSignature>(std::move(new_task)));
unfinished_tasks_num_.fetch_add(1);
});
std::unique_lock pool_guard(pool_lock_);
queue_cv_.notify_one();
}
void ThreadPool::Shutdown() {
terminate_pool_.store(true);
queue_cv_.notify_all();
{
std::unique_lock pool_guard(pool_lock_);
queue_cv_.notify_all();
}
for (auto &thread : thread_pool_) {
if (thread.joinable()) {

View File

@ -45,11 +45,13 @@ TEST_F(ReplicationTest, BasicSynchronousReplicationTest) {
.snapshot_wal_mode = storage::Config::Durability::SnapshotWalMode::
PERIODIC_SNAPSHOT_WITH_WAL,
}});
replica_store.SetReplicationRole<storage::ReplicationRole::REPLICA>(
io::network::Endpoint{"127.0.0.1", 10000});
replica_store.SetReplicaRole(io::network::Endpoint{"127.0.0.1", 10000});
main_store.RegisterReplica("REPLICA",
io::network::Endpoint{"127.0.0.1", 10000});
ASSERT_FALSE(main_store
.RegisterReplica("REPLICA",
io::network::Endpoint{"127.0.0.1", 10000},
storage::replication::ReplicationMode::SYNC)
.HasError());
// vertex create
// vertex add label
@ -282,8 +284,7 @@ TEST_F(ReplicationTest, MultipleSynchronousReplicationTest) {
.snapshot_wal_mode = storage::Config::Durability::SnapshotWalMode::
PERIODIC_SNAPSHOT_WITH_WAL,
}});
replica_store1.SetReplicationRole<storage::ReplicationRole::REPLICA>(
io::network::Endpoint{"127.0.0.1", 10000});
replica_store1.SetReplicaRole(io::network::Endpoint{"127.0.0.1", 10000});
storage::Storage replica_store2(
{.durability = {
@ -291,13 +292,18 @@ TEST_F(ReplicationTest, MultipleSynchronousReplicationTest) {
.snapshot_wal_mode = storage::Config::Durability::SnapshotWalMode::
PERIODIC_SNAPSHOT_WITH_WAL,
}});
replica_store2.SetReplicationRole<storage::ReplicationRole::REPLICA>(
io::network::Endpoint{"127.0.0.1", 20000});
replica_store2.SetReplicaRole(io::network::Endpoint{"127.0.0.1", 20000});
main_store.RegisterReplica("REPLICA1",
io::network::Endpoint{"127.0.0.1", 10000});
main_store.RegisterReplica("REPLICA2",
io::network::Endpoint{"127.0.0.1", 20000});
ASSERT_FALSE(main_store
.RegisterReplica("REPLICA1",
io::network::Endpoint{"127.0.0.1", 10000},
storage::replication::ReplicationMode::SYNC)
.HasError());
ASSERT_FALSE(main_store
.RegisterReplica("REPLICA2",
io::network::Endpoint{"127.0.0.1", 20000},
storage::replication::ReplicationMode::SYNC)
.HasError());
const auto *vertex_label = "label";
const auto *vertex_property = "property";
@ -432,11 +438,13 @@ TEST_F(ReplicationTest, RecoveryProcess) {
.snapshot_wal_mode = storage::Config::Durability::
SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL}});
replica_store.SetReplicationRole<storage::ReplicationRole::REPLICA>(
io::network::Endpoint{"127.0.0.1", 10000});
replica_store.SetReplicaRole(io::network::Endpoint{"127.0.0.1", 10000});
main_store.RegisterReplica("REPLICA1",
io::network::Endpoint{"127.0.0.1", 10000});
ASSERT_FALSE(main_store
.RegisterReplica(
"REPLICA1", io::network::Endpoint{"127.0.0.1", 10000},
storage::replication::ReplicationMode::SYNC)
.HasError());
ASSERT_EQ(main_store.GetReplicaState("REPLICA1"),
storage::replication::ReplicaState::RECOVERY);
@ -519,12 +527,14 @@ TEST_F(ReplicationTest, BasicAsynchronousReplicationTest) {
PERIODIC_SNAPSHOT_WITH_WAL,
}});
replica_store_async.SetReplicationRole<storage::ReplicationRole::REPLICA>(
io::network::Endpoint{"127.0.0.1", 20000});
replica_store_async.SetReplicaRole(io::network::Endpoint{"127.0.0.1", 20000});
main_store.RegisterReplica("REPLICA_ASYNC",
io::network::Endpoint{"127.0.0.1", 20000},
storage::replication::ReplicationMode::ASYNC);
ASSERT_FALSE(
main_store
.RegisterReplica("REPLICA_ASYNC",
io::network::Endpoint{"127.0.0.1", 20000},
storage::replication::ReplicationMode::ASYNC)
.HasError());
constexpr size_t vertices_create_num = 10;
std::vector<storage::Gid> created_vertices;
@ -576,8 +586,7 @@ TEST_F(ReplicationTest, EpochTest) {
PERIODIC_SNAPSHOT_WITH_WAL,
}});
replica_store1.SetReplicationRole<storage::ReplicationRole::REPLICA>(
io::network::Endpoint{"127.0.0.1", 10000});
replica_store1.SetReplicaRole(io::network::Endpoint{"127.0.0.1", 10000});
storage::Storage replica_store2(
{.items = {.properties_on_edges = true},
@ -587,14 +596,19 @@ TEST_F(ReplicationTest, EpochTest) {
PERIODIC_SNAPSHOT_WITH_WAL,
}});
replica_store2.SetReplicationRole<storage::ReplicationRole::REPLICA>(
io::network::Endpoint{"127.0.0.1", 10001});
replica_store2.SetReplicaRole(io::network::Endpoint{"127.0.0.1", 10001});
main_store.RegisterReplica("REPLICA1",
io::network::Endpoint{"127.0.0.1", 10000});
ASSERT_FALSE(main_store
.RegisterReplica("REPLICA1",
io::network::Endpoint{"127.0.0.1", 10000},
storage::replication::ReplicationMode::SYNC)
.HasError());
main_store.RegisterReplica("REPLICA2",
io::network::Endpoint{"127.0.0.1", 10001});
ASSERT_FALSE(main_store
.RegisterReplica("REPLICA2",
io::network::Endpoint{"127.0.0.1", 10001},
storage::replication::ReplicationMode::SYNC)
.HasError());
std::optional<storage::Gid> vertex_gid;
{
@ -619,9 +633,12 @@ TEST_F(ReplicationTest, EpochTest) {
main_store.UnregisterReplica("REPLICA1");
main_store.UnregisterReplica("REPLICA2");
replica_store1.SetReplicationRole<storage::ReplicationRole::MAIN>();
replica_store1.RegisterReplica("REPLICA2",
io::network::Endpoint{"127.0.0.1", 10001});
replica_store1.SetMainReplicationRole();
ASSERT_FALSE(replica_store1
.RegisterReplica("REPLICA2",
io::network::Endpoint{"127.0.0.1", 10001},
storage::replication::ReplicationMode::SYNC)
.HasError());
{
auto acc = main_store.Access();
@ -642,10 +659,12 @@ TEST_F(ReplicationTest, EpochTest) {
ASSERT_FALSE(acc.Commit().HasError());
}
replica_store1.SetReplicationRole<storage::ReplicationRole::REPLICA>(
io::network::Endpoint{"127.0.0.1", 10000});
main_store.RegisterReplica("REPLICA1",
io::network::Endpoint{"127.0.0.1", 10000});
replica_store1.SetReplicaRole(io::network::Endpoint{"127.0.0.1", 10000});
ASSERT_TRUE(main_store
.RegisterReplica("REPLICA1",
io::network::Endpoint{"127.0.0.1", 10000},
storage::replication::ReplicationMode::SYNC)
.HasError());
{
auto acc = main_store.Access();
@ -662,3 +681,73 @@ TEST_F(ReplicationTest, EpochTest) {
ASSERT_FALSE(acc.Commit().HasError());
}
}
TEST_F(ReplicationTest, ReplicationInformation) {
storage::Storage main_store(
{.items = {.properties_on_edges = true},
.durability = {
.storage_directory = storage_directory,
.snapshot_wal_mode = storage::Config::Durability::SnapshotWalMode::
PERIODIC_SNAPSHOT_WITH_WAL,
}});
storage::Storage replica_store1(
{.items = {.properties_on_edges = true},
.durability = {
.storage_directory = storage_directory,
.snapshot_wal_mode = storage::Config::Durability::SnapshotWalMode::
PERIODIC_SNAPSHOT_WITH_WAL,
}});
const io::network::Endpoint replica1_endpoint{"127.0.0.1", 10000};
replica_store1.SetReplicaRole(replica1_endpoint);
const io::network::Endpoint replica2_endpoint{"127.0.0.1", 10000};
storage::Storage replica_store2(
{.items = {.properties_on_edges = true},
.durability = {
.storage_directory = storage_directory,
.snapshot_wal_mode = storage::Config::Durability::SnapshotWalMode::
PERIODIC_SNAPSHOT_WITH_WAL,
}});
replica_store2.SetReplicaRole(replica2_endpoint);
const std::string replica1_name{"REPLICA1"};
ASSERT_FALSE(main_store
.RegisterReplica(replica1_name, replica1_endpoint,
storage::replication::ReplicationMode::SYNC,
{.timeout = 2.0})
.HasError());
const std::string replica2_name{"REPLICA2"};
ASSERT_FALSE(
main_store
.RegisterReplica(replica2_name, replica2_endpoint,
storage::replication::ReplicationMode::ASYNC)
.HasError());
ASSERT_EQ(main_store.GetReplicationRole(), storage::ReplicationRole::MAIN);
ASSERT_EQ(replica_store1.GetReplicationRole(),
storage::ReplicationRole::REPLICA);
ASSERT_EQ(replica_store2.GetReplicationRole(),
storage::ReplicationRole::REPLICA);
const auto replicas_info = main_store.ReplicasInfo();
ASSERT_EQ(replicas_info.size(), 2);
const auto &first_info = replicas_info[0];
ASSERT_EQ(first_info.name, replica1_name);
ASSERT_EQ(first_info.mode, storage::replication::ReplicationMode::SYNC);
ASSERT_TRUE(first_info.timeout);
ASSERT_EQ(*first_info.timeout, 2.0);
ASSERT_EQ(first_info.endpoint, replica1_endpoint);
ASSERT_EQ(first_info.state, storage::replication::ReplicaState::READY);
const auto &second_info = replicas_info[1];
ASSERT_EQ(second_info.name, replica2_name);
ASSERT_EQ(second_info.mode, storage::replication::ReplicationMode::ASYNC);
ASSERT_FALSE(second_info.timeout);
ASSERT_EQ(second_info.endpoint, replica2_endpoint);
ASSERT_EQ(second_info.state, storage::replication::ReplicaState::READY);
}