diff --git a/src/rpc/client.hpp b/src/rpc/client.hpp index c4160ca93..52a9f7b3a 100644 --- a/src/rpc/client.hpp +++ b/src/rpc/client.hpp @@ -119,7 +119,7 @@ class Client { /// RPC call (eg. connection failed, remote end /// died, etc.) template - StreamHandler Stream(Args &&... args) { + StreamHandler Stream(Args &&...args) { return StreamWithLoad( [](auto *reader) { typename TRequestResponse::Response response; @@ -133,7 +133,7 @@ class Client { template StreamHandler StreamWithLoad( std::function load, - Args &&... args) { + Args &&...args) { typename TRequestResponse::Request request(std::forward(args)...); auto req_type = TRequestResponse::Request::kType; VLOG(12) << "[RpcClient] sent " << req_type.name; @@ -177,7 +177,7 @@ class Client { /// RPC call (eg. connection failed, remote end /// died, etc.) template - typename TRequestResponse::Response Call(Args &&... args) { + typename TRequestResponse::Response Call(Args &&...args) { auto stream = Stream(std::forward(args)...); return stream.AwaitResponse(); } @@ -186,7 +186,7 @@ class Client { template typename TRequestResponse::Response CallWithLoad( std::function load, - Args &&... args) { + Args &&...args) { auto stream = StreamWithLoad(load, std::forward(args)...); return stream.AwaitResponse(); } @@ -194,6 +194,8 @@ class Client { /// Call this function from another thread to abort a pending RPC call. void Abort(); + const auto &Endpoint() const { return endpoint_; } + private: io::network::Endpoint endpoint_; communication::ClientContext *context_; diff --git a/src/storage/v2/replication/config.hpp b/src/storage/v2/replication/config.hpp new file mode 100644 index 000000000..6c6a9078b --- /dev/null +++ b/src/storage/v2/replication/config.hpp @@ -0,0 +1,27 @@ +#pragma once +#include +#include + +namespace storage::replication { +struct ReplicationClientConfig { + std::optional timeout; + + struct SSL { + std::string key_file = ""; + std::string cert_file = ""; + }; + + std::optional ssl; +}; + +struct ReplicationServerConfig { + struct SSL { + std::string key_file; + std::string cert_file; + std::string ca_file; + bool verify_peer; + }; + + std::optional ssl; +}; +} // namespace storage::replication diff --git a/src/storage/v2/replication/replication_client.cpp b/src/storage/v2/replication/replication_client.cpp index cedcb6f02..344628955 100644 --- a/src/storage/v2/replication/replication_client.cpp +++ b/src/storage/v2/replication/replication_client.cpp @@ -4,6 +4,7 @@ #include #include "storage/v2/durability/durability.hpp" +#include "storage/v2/replication/config.hpp" #include "storage/v2/replication/enums.hpp" #include "utils/file_locker.hpp" @@ -17,19 +18,28 @@ template ////// ReplicationClient ////// Storage::ReplicationClient::ReplicationClient( std::string name, Storage *storage, const io::network::Endpoint &endpoint, - bool use_ssl, const replication::ReplicationMode mode) - : name_(std::move(name)), - storage_(storage), - rpc_context_(use_ssl), - rpc_client_(endpoint, &rpc_context_), - mode_(mode) { + const replication::ReplicationMode mode, + const replication::ReplicationClientConfig &config) + : name_(std::move(name)), storage_(storage), mode_(mode) { + if (config.ssl) { + rpc_context_.emplace(config.ssl->key_file, config.ssl->cert_file); + } else { + rpc_context_.emplace(); + } + + rpc_client_.emplace(endpoint, &*rpc_context_); InitializeClient(); + + if (config.timeout) { + timeout_.emplace(*config.timeout); + timeout_dispatcher_.emplace(); + } } void Storage::ReplicationClient::InitializeClient() { uint64_t current_commit_timestamp{kTimestampInitialId}; auto stream{ - rpc_client_.Stream(storage_->last_commit_timestamp_)}; + rpc_client_->Stream(storage_->last_commit_timestamp_)}; replication::Encoder encoder{stream.GetBuilder()}; // Write epoch id { @@ -66,7 +76,7 @@ void Storage::ReplicationClient::InitializeClient() { SnapshotRes Storage::ReplicationClient::TransferSnapshot( const std::filesystem::path &path) { - auto stream{rpc_client_.Stream()}; + auto stream{rpc_client_->Stream()}; replication::Encoder encoder(stream.GetBuilder()); encoder.WriteFile(path); return stream.AwaitResponse(); @@ -75,7 +85,7 @@ SnapshotRes Storage::ReplicationClient::TransferSnapshot( WalFilesRes Storage::ReplicationClient::TransferWalFiles( const std::vector &wal_files) { CHECK(!wal_files.empty()) << "Wal files list is empty!"; - auto stream{rpc_client_.Stream(wal_files.size())}; + auto stream{rpc_client_->Stream(wal_files.size())}; replication::Encoder encoder(stream.GetBuilder()); for (const auto &wal : wal_files) { DLOG(INFO) << "Sending wal file: " << wal; @@ -87,7 +97,7 @@ WalFilesRes Storage::ReplicationClient::TransferWalFiles( OnlySnapshotRes Storage::ReplicationClient::TransferOnlySnapshot( const uint64_t snapshot_timestamp) { - auto stream{rpc_client_.Stream(snapshot_timestamp)}; + auto stream{rpc_client_->Stream(snapshot_timestamp)}; replication::Encoder encoder{stream.GetBuilder()}; encoder.WriteString(storage_->epoch_id_); return stream.AwaitResponse(); @@ -124,7 +134,7 @@ bool Storage::ReplicationClient::StartTransactionReplication( replica_state_.store(replication::ReplicaState::INVALID); LOG(ERROR) << "Couldn't replicate data to " << name_; thread_pool_.AddTask([this] { - rpc_client_.Abort(); + rpc_client_->Abort(); InitializeClient(); }); return false; @@ -142,7 +152,7 @@ void Storage::ReplicationClient::IfStreamingTransaction( } catch (const rpc::RpcFailedException &) { LOG(ERROR) << "Couldn't replicate data to " << name_; thread_pool_.AddTask([this] { - rpc_client_.Abort(); + rpc_client_->Abort(); InitializeClient(); }); } @@ -153,6 +163,43 @@ void Storage::ReplicationClient::FinalizeTransactionReplication() { if (mode_ == replication::ReplicationMode::ASYNC) { thread_pool_.AddTask( [this] { this->FinalizeTransactionReplicationInternal(); }); + } else if (timeout_) { + CHECK(mode_ == replication::ReplicationMode::SYNC) + << "Only SYNC replica can have a timeout."; + CHECK(timeout_dispatcher_) << "Timeout thread is missing"; + timeout_dispatcher_->WaitForTaskToFinish(); + + timeout_dispatcher_->active = true; + thread_pool_.AddTask([&, this] { + this->FinalizeTransactionReplicationInternal(); + std::unique_lock main_guard(timeout_dispatcher_->main_lock); + // TimerThread can finish waiting for timeout + timeout_dispatcher_->active = false; + // Notify the main thread + timeout_dispatcher_->main_cv.notify_one(); + }); + + timeout_dispatcher_->StartTimeoutTask(*timeout_); + + { + std::unique_lock main_guard(timeout_dispatcher_->main_lock); + // Wait until one of the threads notifies us that they finished executing + // Both threads should first set the active flag to false + timeout_dispatcher_->main_cv.wait( + main_guard, [&] { return !timeout_dispatcher_->active.load(); }); + } + + // TODO (antonio2368): Document and/or polish SEMI-SYNC to ASYNC fallback. + if (replica_state_ == replication::ReplicaState::REPLICATING) { + mode_ = replication::ReplicationMode::ASYNC; + timeout_.reset(); + // This can only happen if we timeouted so we are sure that + // Timeout task finished + // We need to delete timeout dispatcher AFTER the replication + // finished because it tries to acquire the timeout lock + // and acces the `active` variable` + thread_pool_.AddTask([this] { timeout_dispatcher_.reset(); }); + } } else { FinalizeTransactionReplicationInternal(); } @@ -178,7 +225,7 @@ void Storage::ReplicationClient::FinalizeTransactionReplicationInternal() { replica_state_.store(replication::ReplicaState::INVALID); } thread_pool_.AddTask([this] { - rpc_client_.Abort(); + rpc_client_->Abort(); InitializeClient(); }); } @@ -426,12 +473,38 @@ Storage::ReplicationClient::GetRecoverySteps( return recovery_steps; } +////// TimeoutDispatcher ////// +void Storage::ReplicationClient::TimeoutDispatcher::WaitForTaskToFinish() { + // Wait for the previous timeout task to finish + std::unique_lock main_guard(main_lock); + main_cv.wait(main_guard, [&] { return finished; }); +} + +void Storage::ReplicationClient::TimeoutDispatcher::StartTimeoutTask( + const double timeout) { + timeout_pool.AddTask([&, this] { + finished = false; + using std::chrono::steady_clock; + const auto timeout_duration = + std::chrono::duration_cast( + std::chrono::duration(timeout)); + const auto end_time = steady_clock::now() + timeout_duration; + while (active && steady_clock::now() < end_time) { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + + std::unique_lock main_guard(main_lock); + finished = true; + active = false; + main_cv.notify_one(); + }); +} ////// ReplicaStream ////// Storage::ReplicationClient::ReplicaStream::ReplicaStream( ReplicationClient *self, const uint64_t previous_commit_timestamp, const uint64_t current_seq_num) : self_(self), - stream_(self_->rpc_client_.Stream( + stream_(self_->rpc_client_->Stream( previous_commit_timestamp, current_seq_num)) { replication::Encoder encoder{stream_.GetBuilder()}; encoder.WriteString(self_->storage_->epoch_id_); @@ -473,7 +546,7 @@ AppendDeltasRes Storage::ReplicationClient::ReplicaStream::Finalize() { ////// CurrentWalHandler ////// Storage::ReplicationClient::CurrentWalHandler::CurrentWalHandler( ReplicationClient *self) - : self_(self), stream_(self_->rpc_client_.Stream()) {} + : self_(self), stream_(self_->rpc_client_->Stream()) {} void Storage::ReplicationClient::CurrentWalHandler::AppendFilename( const std::string &filename) { diff --git a/src/storage/v2/replication/replication_client.hpp b/src/storage/v2/replication/replication_client.hpp index d4a37c698..1c50bc882 100644 --- a/src/storage/v2/replication/replication_client.hpp +++ b/src/storage/v2/replication/replication_client.hpp @@ -1,6 +1,7 @@ #pragma once #include +#include #include #include @@ -12,6 +13,7 @@ #include "storage/v2/mvcc.hpp" #include "storage/v2/name_id_mapper.hpp" #include "storage/v2/property_value.hpp" +#include "storage/v2/replication/config.hpp" #include "storage/v2/replication/enums.hpp" #include "storage/v2/replication/rpc.hpp" #include "storage/v2/replication/serialization.hpp" @@ -27,8 +29,9 @@ namespace storage { class Storage::ReplicationClient { public: ReplicationClient(std::string name, Storage *storage, - const io::network::Endpoint &endpoint, bool use_ssl, - replication::ReplicationMode mode); + const io::network::Endpoint &endpoint, + replication::ReplicationMode mode, + const replication::ReplicationClientConfig &config = {}); // Handler used for transfering the current transaction. class ReplicaStream { @@ -117,6 +120,12 @@ class Storage::ReplicationClient { auto State() const { return replica_state_.load(); } + auto Mode() const { return mode_; } + + auto Timeout() const { return timeout_; } + + const auto &Endpoint() const { return rpc_client_->Endpoint(); } + private: void FinalizeTransactionReplicationInternal(); @@ -150,12 +159,36 @@ class Storage::ReplicationClient { Storage *storage_; - communication::ClientContext rpc_context_; - rpc::Client rpc_client_; + std::optional rpc_context_; + std::optional rpc_client_; std::optional replica_stream_; replication::ReplicationMode mode_{replication::ReplicationMode::SYNC}; + // Dispatcher class for timeout tasks + struct TimeoutDispatcher { + explicit TimeoutDispatcher(){}; + + void WaitForTaskToFinish(); + + void StartTimeoutTask(double timeout); + + // If the Timeout task should continue waiting + std::atomic active{false}; + + std::mutex main_lock; + std::condition_variable main_cv; + + private: + // if the Timeout task finished executing + bool finished{false}; + + utils::ThreadPool timeout_pool{1}; + }; + + std::optional timeout_; + std::optional timeout_dispatcher_; + utils::SpinLock client_lock_; utils::ThreadPool thread_pool_{1}; std::atomic replica_state_{ diff --git a/src/storage/v2/replication/replication_server.cpp b/src/storage/v2/replication/replication_server.cpp index 19516ecf2..ad0de8a8c 100644 --- a/src/storage/v2/replication/replication_server.cpp +++ b/src/storage/v2/replication/replication_server.cpp @@ -2,16 +2,22 @@ #include "storage/v2/durability/durability.hpp" #include "storage/v2/durability/snapshot.hpp" +#include "storage/v2/replication/config.hpp" #include "storage/v2/transaction.hpp" #include "utils/exceptions.hpp" namespace storage { -Storage::ReplicationServer::ReplicationServer(Storage *storage, - io::network::Endpoint endpoint) +Storage::ReplicationServer::ReplicationServer( + Storage *storage, io::network::Endpoint endpoint, + const replication::ReplicationServerConfig &config) : storage_(storage) { // Create RPC server. - // TODO (antonio2368): Add support for SSL. - rpc_server_context_.emplace(); + if (config.ssl) { + rpc_server_context_.emplace(config.ssl->key_file, config.ssl->cert_file, + config.ssl->ca_file, config.ssl->verify_peer); + } else { + rpc_server_context_.emplace(); + } // NOTE: The replication server must have a single thread for processing // because there is no need for more processing threads - each replica can // have only a single main server. Also, the single-threaded guarantee diff --git a/src/storage/v2/replication/replication_server.hpp b/src/storage/v2/replication/replication_server.hpp index c8cb9e093..06c6d9a98 100644 --- a/src/storage/v2/replication/replication_server.hpp +++ b/src/storage/v2/replication/replication_server.hpp @@ -6,7 +6,9 @@ namespace storage { class Storage::ReplicationServer { public: - explicit ReplicationServer(Storage *storage, io::network::Endpoint endpoint); + explicit ReplicationServer( + Storage *storage, io::network::Endpoint endpoint, + const replication::ReplicationServerConfig &config); ReplicationServer(const ReplicationServer &) = delete; ReplicationServer(ReplicationServer &&) = delete; ReplicationServer &operator=(const ReplicationServer &) = delete; diff --git a/src/storage/v2/storage.cpp b/src/storage/v2/storage.cpp index db4b5d777..fd72f211c 100644 --- a/src/storage/v2/storage.cpp +++ b/src/storage/v2/storage.cpp @@ -17,6 +17,7 @@ #include "storage/v2/durability/wal.hpp" #include "storage/v2/indices.hpp" #include "storage/v2/mvcc.hpp" +#include "storage/v2/replication/config.hpp" #include "utils/file.hpp" #include "utils/rw_lock.hpp" #include "utils/spin_lock.hpp" @@ -425,14 +426,22 @@ Storage::Storage(Config config) // For testing purposes until we can define the instance type from // a query. if (FLAGS_main) { - RegisterReplica("REPLICA_SYNC", io::network::Endpoint{"127.0.0.1", 10000}); - RegisterReplica("REPLICA_ASYNC", io::network::Endpoint{"127.0.0.1", 10002}); + if (RegisterReplica("REPLICA_SYNC", + io::network::Endpoint{"127.0.0.1", 10000}, + replication::ReplicationMode::SYNC) + .HasError()) { + LOG(WARNING) << "Couldn't connect to REPLICA_SYNC"; + } + if (RegisterReplica("REPLICA_ASYNC", + io::network::Endpoint{"127.0.0.1", 10002}, + replication::ReplicationMode::ASYNC) + .HasError()) { + LOG(WARNING) << "Couldn't connect to REPLICA_SYNC"; + } } else if (FLAGS_replica) { - SetReplicationRole( - io::network::Endpoint{"127.0.0.1", 10000}); + SetReplicaRole(io::network::Endpoint{"127.0.0.1", 10000}); } else if (FLAGS_async_replica) { - SetReplicationRole( - io::network::Endpoint{"127.0.0.1", 10002}); + SetReplicaRole(io::network::Endpoint{"127.0.0.1", 10002}); } #endif } @@ -1947,57 +1956,98 @@ uint64_t Storage::CommitTimestamp( } #ifdef MG_ENTERPRISE -void Storage::ConfigureReplica(io::network::Endpoint endpoint) { +void Storage::SetReplicaRole( + io::network::Endpoint endpoint, + const replication::ReplicationServerConfig &config) { + // We don't want to restart the server if we're already a REPLICA + if (replication_role_ == ReplicationRole::REPLICA) { + return; + } + replication_server_ = - std::make_unique(this, std::move(endpoint)); + std::make_unique(this, std::move(endpoint), config); + + replication_role_.store(ReplicationRole::REPLICA); } -void Storage::ConfigureMain() { +void Storage::SetMainReplicationRole() { + // We don't want to generate new epoch_id and do the + // cleanup if we're already a MAIN + if (replication_role_ == ReplicationRole::MAIN) { + return; + } + // Main instance does not need replication server // This should be always called first so we finalize everything replication_server_.reset(nullptr); - std::unique_lock engine_guard{engine_lock_}; - if (wal_file_) { - wal_file_->FinalizeWal(); - wal_file_.reset(); + { + std::unique_lock engine_guard{engine_lock_}; + if (wal_file_) { + wal_file_->FinalizeWal(); + wal_file_.reset(); + } + + // Generate new epoch id and save the last one to the history. + if (epoch_history_.size() == kEpochHistoryRetention) { + epoch_history_.pop_front(); + } + epoch_history_.emplace_back(std::move(epoch_id_), last_commit_timestamp_); + epoch_id_ = utils::GenerateUUID(); } - // Generate new epoch id and save the last one to the history. - if (epoch_history_.size() == kEpochHistoryRetention) { - epoch_history_.pop_front(); - } - epoch_history_.emplace_back(std::move(epoch_id_), last_commit_timestamp_); - epoch_id_ = utils::GenerateUUID(); + replication_role_.store(ReplicationRole::MAIN); } -void Storage::RegisterReplica( +utils::BasicResult Storage::RegisterReplica( std::string name, io::network::Endpoint endpoint, - const replication::ReplicationMode replication_mode) { + const replication::ReplicationMode replication_mode, + const replication::ReplicationClientConfig &config) { // TODO (antonio2368): This shouldn't stop the main instance CHECK(replication_role_.load() == ReplicationRole::MAIN) << "Only main instance can register a replica!"; - replication_clients_.WithLock([&](auto &clients) { - if (std::any_of(clients.begin(), clients.end(), - [&](auto &client) { return client->Name() == name; })) { - throw utils::BasicException("Replica with a same name already exists!"); - } + const bool name_exists = replication_clients_.WithLock([&](auto &clients) { + return std::any_of(clients.begin(), clients.end(), + [&](auto &client) { return client->Name() == name; }); }); - auto client = std::make_unique( - std::move(name), this, endpoint, false, replication_mode); + if (name_exists) { + return RegisterReplicaError::NAME_EXISTS; + } - replication_clients_.WithLock( - [&](auto &clients) { clients.push_back(std::move(client)); }); + CHECK(replication_mode == replication::ReplicationMode::SYNC || + !config.timeout) + << "Only SYNC mode can have a timeout set"; + + auto client = std::make_unique( + std::move(name), this, endpoint, replication_mode, config); + if (client->State() == replication::ReplicaState::INVALID) { + return RegisterReplicaError::CONNECTION_FAILED; + } + + return replication_clients_.WithLock( + [&](auto &clients) -> utils::BasicResult { + // Another thread could have added a client with same name while + // we were connecting to this client. + if (std::any_of(clients.begin(), clients.end(), + [&](auto &other_client) { + return client->Name() == other_client->Name(); + })) { + return RegisterReplicaError::NAME_EXISTS; + } + + clients.push_back(std::move(client)); + return {}; + }); } -void Storage::UnregisterReplica(const std::string_view name) { +bool Storage::UnregisterReplica(const std::string_view name) { CHECK(replication_role_.load() == ReplicationRole::MAIN) << "Only main instance can unregister a replica!"; - replication_clients_.WithLock([&](auto &clients) { - std::erase_if(clients, - [&](const auto &client) { return client->Name() == name; }); + return replication_clients_.WithLock([&](auto &clients) { + return std::erase_if( + clients, [&](const auto &client) { return client->Name() == name; }); }); } @@ -2014,6 +2064,24 @@ std::optional Storage::GetReplicaState( return (*client_it)->State(); }); } + +ReplicationRole Storage::GetReplicationRole() const { + return replication_role_; +} + +std::vector Storage::ReplicasInfo() { + return replication_clients_.WithLock([](auto &clients) { + std::vector replica_info; + replica_info.reserve(clients.size()); + std::transform(clients.begin(), clients.end(), + std::back_inserter(replica_info), + [](const auto &client) -> ReplicaInfo { + return {client->Name(), client->Mode(), client->Timeout(), + client->Endpoint(), client->State()}; + }); + return replica_info; + }); +} #endif } // namespace storage diff --git a/src/storage/v2/storage.hpp b/src/storage/v2/storage.hpp index e84f28e24..130fad6f4 100644 --- a/src/storage/v2/storage.hpp +++ b/src/storage/v2/storage.hpp @@ -5,6 +5,7 @@ #include #include +#include "io/network/endpoint.hpp" #include "storage/v2/commit_log.hpp" #include "storage/v2/config.hpp" #include "storage/v2/constraints.hpp" @@ -28,6 +29,7 @@ #ifdef MG_ENTERPRISE #include "rpc/server.hpp" +#include "storage/v2/replication/config.hpp" #include "storage/v2/replication/enums.hpp" #include "storage/v2/replication/rpc.hpp" #include "storage/v2/replication/serialization.hpp" @@ -413,29 +415,38 @@ class Storage final { StorageInfo GetInfo() const; -#ifdef MG_ENTERPRISE - template - void SetReplicationRole(Args &&...args) { - if (replication_role_.load() == role) { - return; - } +#if MG_ENTERPRISE - if constexpr (role == ReplicationRole::REPLICA) { - ConfigureReplica(std::forward(args)...); - } else if constexpr (role == ReplicationRole::MAIN) { - ConfigureMain(std::forward(args)...); - } + void SetReplicaRole(io::network::Endpoint endpoint, + const replication::ReplicationServerConfig &config = {}); - replication_role_.store(role); - } + void SetMainReplicationRole(); - void RegisterReplica(std::string name, io::network::Endpoint endpoint, - replication::ReplicationMode replication_mode = - replication::ReplicationMode::SYNC); - void UnregisterReplica(std::string_view name); + enum class RegisterReplicaError : uint8_t { NAME_EXISTS, CONNECTION_FAILED }; + + /// @pre The instance should have a MAIN role + /// @pre Timeout can only be set for SYNC replication + utils::BasicResult RegisterReplica( + std::string name, io::network::Endpoint endpoint, + replication::ReplicationMode replication_mode, + const replication::ReplicationClientConfig &config = {}); + /// @pre The instance should have a MAIN role + bool UnregisterReplica(std::string_view name); std::optional GetReplicaState( std::string_view name); + + ReplicationRole GetReplicationRole() const; + + struct ReplicaInfo { + std::string name; + replication::ReplicationMode mode; + std::optional timeout; + io::network::Endpoint endpoint; + replication::ReplicaState state; + }; + + std::vector ReplicasInfo(); #endif private: @@ -460,8 +471,6 @@ class Storage final { std::optional desired_commit_timestamp = {}); #ifdef MG_ENTERPRISE - void ConfigureReplica(io::network::Endpoint endpoint); - void ConfigureMain(); #endif // Main storage lock. diff --git a/src/utils/result.hpp b/src/utils/result.hpp index e7316388c..dca7eb235 100644 --- a/src/utils/result.hpp +++ b/src/utils/result.hpp @@ -7,7 +7,7 @@ namespace utils { -template +template class [[nodiscard]] BasicResult final { public: BasicResult(const TValue &value) : value_(value) {} diff --git a/src/utils/thread_pool.cpp b/src/utils/thread_pool.cpp index 4155c663e..3a4420eaf 100644 --- a/src/utils/thread_pool.cpp +++ b/src/utils/thread_pool.cpp @@ -13,12 +13,16 @@ void ThreadPool::AddTask(std::function new_task) { queue.emplace(std::make_unique(std::move(new_task))); unfinished_tasks_num_.fetch_add(1); }); + std::unique_lock pool_guard(pool_lock_); queue_cv_.notify_one(); } void ThreadPool::Shutdown() { terminate_pool_.store(true); - queue_cv_.notify_all(); + { + std::unique_lock pool_guard(pool_lock_); + queue_cv_.notify_all(); + } for (auto &thread : thread_pool_) { if (thread.joinable()) { diff --git a/tests/unit/storage_v2_replication.cpp b/tests/unit/storage_v2_replication.cpp index 23dae1d67..cb33b0d8a 100644 --- a/tests/unit/storage_v2_replication.cpp +++ b/tests/unit/storage_v2_replication.cpp @@ -45,11 +45,13 @@ TEST_F(ReplicationTest, BasicSynchronousReplicationTest) { .snapshot_wal_mode = storage::Config::Durability::SnapshotWalMode:: PERIODIC_SNAPSHOT_WITH_WAL, }}); - replica_store.SetReplicationRole( - io::network::Endpoint{"127.0.0.1", 10000}); + replica_store.SetReplicaRole(io::network::Endpoint{"127.0.0.1", 10000}); - main_store.RegisterReplica("REPLICA", - io::network::Endpoint{"127.0.0.1", 10000}); + ASSERT_FALSE(main_store + .RegisterReplica("REPLICA", + io::network::Endpoint{"127.0.0.1", 10000}, + storage::replication::ReplicationMode::SYNC) + .HasError()); // vertex create // vertex add label @@ -282,8 +284,7 @@ TEST_F(ReplicationTest, MultipleSynchronousReplicationTest) { .snapshot_wal_mode = storage::Config::Durability::SnapshotWalMode:: PERIODIC_SNAPSHOT_WITH_WAL, }}); - replica_store1.SetReplicationRole( - io::network::Endpoint{"127.0.0.1", 10000}); + replica_store1.SetReplicaRole(io::network::Endpoint{"127.0.0.1", 10000}); storage::Storage replica_store2( {.durability = { @@ -291,13 +292,18 @@ TEST_F(ReplicationTest, MultipleSynchronousReplicationTest) { .snapshot_wal_mode = storage::Config::Durability::SnapshotWalMode:: PERIODIC_SNAPSHOT_WITH_WAL, }}); - replica_store2.SetReplicationRole( - io::network::Endpoint{"127.0.0.1", 20000}); + replica_store2.SetReplicaRole(io::network::Endpoint{"127.0.0.1", 20000}); - main_store.RegisterReplica("REPLICA1", - io::network::Endpoint{"127.0.0.1", 10000}); - main_store.RegisterReplica("REPLICA2", - io::network::Endpoint{"127.0.0.1", 20000}); + ASSERT_FALSE(main_store + .RegisterReplica("REPLICA1", + io::network::Endpoint{"127.0.0.1", 10000}, + storage::replication::ReplicationMode::SYNC) + .HasError()); + ASSERT_FALSE(main_store + .RegisterReplica("REPLICA2", + io::network::Endpoint{"127.0.0.1", 20000}, + storage::replication::ReplicationMode::SYNC) + .HasError()); const auto *vertex_label = "label"; const auto *vertex_property = "property"; @@ -432,11 +438,13 @@ TEST_F(ReplicationTest, RecoveryProcess) { .snapshot_wal_mode = storage::Config::Durability:: SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL}}); - replica_store.SetReplicationRole( - io::network::Endpoint{"127.0.0.1", 10000}); + replica_store.SetReplicaRole(io::network::Endpoint{"127.0.0.1", 10000}); - main_store.RegisterReplica("REPLICA1", - io::network::Endpoint{"127.0.0.1", 10000}); + ASSERT_FALSE(main_store + .RegisterReplica( + "REPLICA1", io::network::Endpoint{"127.0.0.1", 10000}, + storage::replication::ReplicationMode::SYNC) + .HasError()); ASSERT_EQ(main_store.GetReplicaState("REPLICA1"), storage::replication::ReplicaState::RECOVERY); @@ -519,12 +527,14 @@ TEST_F(ReplicationTest, BasicAsynchronousReplicationTest) { PERIODIC_SNAPSHOT_WITH_WAL, }}); - replica_store_async.SetReplicationRole( - io::network::Endpoint{"127.0.0.1", 20000}); + replica_store_async.SetReplicaRole(io::network::Endpoint{"127.0.0.1", 20000}); - main_store.RegisterReplica("REPLICA_ASYNC", - io::network::Endpoint{"127.0.0.1", 20000}, - storage::replication::ReplicationMode::ASYNC); + ASSERT_FALSE( + main_store + .RegisterReplica("REPLICA_ASYNC", + io::network::Endpoint{"127.0.0.1", 20000}, + storage::replication::ReplicationMode::ASYNC) + .HasError()); constexpr size_t vertices_create_num = 10; std::vector created_vertices; @@ -576,8 +586,7 @@ TEST_F(ReplicationTest, EpochTest) { PERIODIC_SNAPSHOT_WITH_WAL, }}); - replica_store1.SetReplicationRole( - io::network::Endpoint{"127.0.0.1", 10000}); + replica_store1.SetReplicaRole(io::network::Endpoint{"127.0.0.1", 10000}); storage::Storage replica_store2( {.items = {.properties_on_edges = true}, @@ -587,14 +596,19 @@ TEST_F(ReplicationTest, EpochTest) { PERIODIC_SNAPSHOT_WITH_WAL, }}); - replica_store2.SetReplicationRole( - io::network::Endpoint{"127.0.0.1", 10001}); + replica_store2.SetReplicaRole(io::network::Endpoint{"127.0.0.1", 10001}); - main_store.RegisterReplica("REPLICA1", - io::network::Endpoint{"127.0.0.1", 10000}); + ASSERT_FALSE(main_store + .RegisterReplica("REPLICA1", + io::network::Endpoint{"127.0.0.1", 10000}, + storage::replication::ReplicationMode::SYNC) + .HasError()); - main_store.RegisterReplica("REPLICA2", - io::network::Endpoint{"127.0.0.1", 10001}); + ASSERT_FALSE(main_store + .RegisterReplica("REPLICA2", + io::network::Endpoint{"127.0.0.1", 10001}, + storage::replication::ReplicationMode::SYNC) + .HasError()); std::optional vertex_gid; { @@ -619,9 +633,12 @@ TEST_F(ReplicationTest, EpochTest) { main_store.UnregisterReplica("REPLICA1"); main_store.UnregisterReplica("REPLICA2"); - replica_store1.SetReplicationRole(); - replica_store1.RegisterReplica("REPLICA2", - io::network::Endpoint{"127.0.0.1", 10001}); + replica_store1.SetMainReplicationRole(); + ASSERT_FALSE(replica_store1 + .RegisterReplica("REPLICA2", + io::network::Endpoint{"127.0.0.1", 10001}, + storage::replication::ReplicationMode::SYNC) + .HasError()); { auto acc = main_store.Access(); @@ -642,10 +659,12 @@ TEST_F(ReplicationTest, EpochTest) { ASSERT_FALSE(acc.Commit().HasError()); } - replica_store1.SetReplicationRole( - io::network::Endpoint{"127.0.0.1", 10000}); - main_store.RegisterReplica("REPLICA1", - io::network::Endpoint{"127.0.0.1", 10000}); + replica_store1.SetReplicaRole(io::network::Endpoint{"127.0.0.1", 10000}); + ASSERT_TRUE(main_store + .RegisterReplica("REPLICA1", + io::network::Endpoint{"127.0.0.1", 10000}, + storage::replication::ReplicationMode::SYNC) + .HasError()); { auto acc = main_store.Access(); @@ -662,3 +681,73 @@ TEST_F(ReplicationTest, EpochTest) { ASSERT_FALSE(acc.Commit().HasError()); } } + +TEST_F(ReplicationTest, ReplicationInformation) { + storage::Storage main_store( + {.items = {.properties_on_edges = true}, + .durability = { + .storage_directory = storage_directory, + .snapshot_wal_mode = storage::Config::Durability::SnapshotWalMode:: + PERIODIC_SNAPSHOT_WITH_WAL, + }}); + + storage::Storage replica_store1( + {.items = {.properties_on_edges = true}, + .durability = { + .storage_directory = storage_directory, + .snapshot_wal_mode = storage::Config::Durability::SnapshotWalMode:: + PERIODIC_SNAPSHOT_WITH_WAL, + }}); + + const io::network::Endpoint replica1_endpoint{"127.0.0.1", 10000}; + replica_store1.SetReplicaRole(replica1_endpoint); + + const io::network::Endpoint replica2_endpoint{"127.0.0.1", 10000}; + storage::Storage replica_store2( + {.items = {.properties_on_edges = true}, + .durability = { + .storage_directory = storage_directory, + .snapshot_wal_mode = storage::Config::Durability::SnapshotWalMode:: + PERIODIC_SNAPSHOT_WITH_WAL, + }}); + + replica_store2.SetReplicaRole(replica2_endpoint); + + const std::string replica1_name{"REPLICA1"}; + ASSERT_FALSE(main_store + .RegisterReplica(replica1_name, replica1_endpoint, + storage::replication::ReplicationMode::SYNC, + {.timeout = 2.0}) + .HasError()); + + const std::string replica2_name{"REPLICA2"}; + ASSERT_FALSE( + main_store + .RegisterReplica(replica2_name, replica2_endpoint, + storage::replication::ReplicationMode::ASYNC) + .HasError()); + + ASSERT_EQ(main_store.GetReplicationRole(), storage::ReplicationRole::MAIN); + ASSERT_EQ(replica_store1.GetReplicationRole(), + storage::ReplicationRole::REPLICA); + ASSERT_EQ(replica_store2.GetReplicationRole(), + storage::ReplicationRole::REPLICA); + + const auto replicas_info = main_store.ReplicasInfo(); + ASSERT_EQ(replicas_info.size(), 2); + + const auto &first_info = replicas_info[0]; + ASSERT_EQ(first_info.name, replica1_name); + ASSERT_EQ(first_info.mode, storage::replication::ReplicationMode::SYNC); + ASSERT_TRUE(first_info.timeout); + ASSERT_EQ(*first_info.timeout, 2.0); + ASSERT_EQ(first_info.endpoint, replica1_endpoint); + ASSERT_EQ(first_info.state, storage::replication::ReplicaState::READY); + + const auto &second_info = replicas_info[1]; + ASSERT_EQ(second_info.name, replica2_name); + ASSERT_EQ(second_info.mode, storage::replication::ReplicationMode::ASYNC); + ASSERT_FALSE(second_info.timeout); + ASSERT_EQ(second_info.endpoint, replica2_endpoint); + ASSERT_EQ(second_info.state, storage::replication::ReplicaState::READY); +}