diff --git a/.clang-tidy b/.clang-tidy index 35d6c9b84..88a06f91b 100644 --- a/.clang-tidy +++ b/.clang-tidy @@ -88,4 +88,3 @@ CheckOptions: - key: modernize-use-nullptr.NullMacros value: 'NULL' ... - diff --git a/src/memgraph.cpp b/src/memgraph.cpp index f0872f647..d1a972c7c 100644 --- a/src/memgraph.cpp +++ b/src/memgraph.cpp @@ -252,6 +252,11 @@ DEFINE_double(query_execution_timeout_sec, 600, "Maximum allowed query execution time. Queries exceeding this " "limit will be aborted. Value of 0 means no limit."); +// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) +DEFINE_uint64(replication_replica_check_frequency_sec, 1, + "The time duration between two replica checks/pings. If < 1, replicas will NOT be checked at all. NOTE: " + "The MAIN instance allocates a new thread for each REPLICA."); + // NOLINTNEXTLINE (cppcoreguidelines-avoid-non-const-global-variables) DEFINE_uint64( memory_limit, 0, @@ -1216,6 +1221,7 @@ int main(int argc, char **argv) { &db, {.query = {.allow_load_csv = FLAGS_allow_load_csv}, .execution_timeout_sec = FLAGS_query_execution_timeout_sec, + .replication_replica_check_frequency = std::chrono::seconds(FLAGS_replication_replica_check_frequency_sec), .default_kafka_bootstrap_servers = FLAGS_kafka_bootstrap_servers, .default_pulsar_service_url = FLAGS_pulsar_service_url, .stream_transaction_conflict_retries = FLAGS_stream_transaction_conflict_retries, diff --git a/src/query/config.hpp b/src/query/config.hpp index 0ddaba11b..d3d6c1d8d 100644 --- a/src/query/config.hpp +++ b/src/query/config.hpp @@ -21,6 +21,8 @@ struct InterpreterConfig { // The default execution timeout is 10 minutes. double execution_timeout_sec{600.0}; + // The same as \ref memgraph::storage::replication::ReplicationClientConfig + std::chrono::seconds replication_replica_check_frequency{1}; std::string default_kafka_bootstrap_servers; std::string default_pulsar_service_url; diff --git a/src/query/interpreter.cpp b/src/query/interpreter.cpp index af0588e54..63ee6460e 100644 --- a/src/query/interpreter.cpp +++ b/src/query/interpreter.cpp @@ -160,7 +160,8 @@ class ReplQueryHandler final : public query::ReplicationQueryHandler { /// @throw QueryRuntimeException if an error ocurred. void RegisterReplica(const std::string &name, const std::string &socket_address, - const ReplicationQuery::SyncMode sync_mode, const std::optional<double> timeout) override { + const ReplicationQuery::SyncMode sync_mode, const std::optional<double> timeout, + const std::chrono::seconds replica_check_frequency) override { if (db_->GetReplicationRole() == storage::ReplicationRole::REPLICA) { // replica can't register another replica throw QueryRuntimeException("Replica can't register another replica!"); @@ -182,8 +183,9 @@ class ReplQueryHandler final : public query::ReplicationQueryHandler { io::network::Endpoint::ParseSocketOrIpAddress(socket_address, query::kDefaultReplicationPort); if (maybe_ip_and_port) { auto [ip, port] = *maybe_ip_and_port; - auto ret = - db_->RegisterReplica(name, {std::move(ip), port}, repl_mode, {.timeout = timeout, .ssl = std::nullopt}); + auto ret = db_->RegisterReplica( + name, {std::move(ip), port}, repl_mode, + {.timeout = timeout, .replica_check_frequency = replica_check_frequency, .ssl = std::nullopt}); if (ret.HasError()) { throw QueryRuntimeException(fmt::format("Couldn't register replica '{}'!", name)); } @@ -467,6 +469,7 @@ Callback HandleReplicationQuery(ReplicationQuery *repl_query, const Parameters & const auto &sync_mode = repl_query->sync_mode_; auto socket_address = repl_query->socket_address_->Accept(evaluator); auto timeout = EvaluateOptionalExpression(repl_query->timeout_, &evaluator); + const auto replica_check_frequency = interpreter_context->config.replication_replica_check_frequency; std::optional<double> maybe_timeout; if (timeout.IsDouble()) { maybe_timeout = timeout.ValueDouble(); @@ -474,8 +477,9 @@ Callback HandleReplicationQuery(ReplicationQuery *repl_query, const Parameters & maybe_timeout = static_cast<double>(timeout.ValueInt()); } callback.fn = [handler = ReplQueryHandler{interpreter_context->db}, name, socket_address, sync_mode, - maybe_timeout]() mutable { - handler.RegisterReplica(name, std::string(socket_address.ValueString()), sync_mode, maybe_timeout); + maybe_timeout, replica_check_frequency]() mutable { + handler.RegisterReplica(name, std::string(socket_address.ValueString()), sync_mode, maybe_timeout, + replica_check_frequency); return std::vector<std::vector<TypedValue>>(); }; notifications->emplace_back(SeverityLevel::INFO, NotificationCode::REGISTER_REPLICA, diff --git a/src/query/interpreter.hpp b/src/query/interpreter.hpp index 8b298d72e..ef790d738 100644 --- a/src/query/interpreter.hpp +++ b/src/query/interpreter.hpp @@ -137,7 +137,8 @@ class ReplicationQueryHandler { /// @throw QueryRuntimeException if an error ocurred. virtual void RegisterReplica(const std::string &name, const std::string &socket_address, - const ReplicationQuery::SyncMode sync_mode, const std::optional<double> timeout) = 0; + const ReplicationQuery::SyncMode sync_mode, const std::optional<double> timeout, + const std::chrono::seconds replica_check_frequency) = 0; /// @throw QueryRuntimeException if an error ocurred. virtual void DropReplica(const std::string &replica_name) = 0; diff --git a/src/storage/v2/replication/config.hpp b/src/storage/v2/replication/config.hpp index 93700bc66..124e7e9d2 100644 --- a/src/storage/v2/replication/config.hpp +++ b/src/storage/v2/replication/config.hpp @@ -16,6 +16,10 @@ namespace memgraph::storage::replication { struct ReplicationClientConfig { std::optional<double> timeout; + // The default delay between main checking/pinging replicas is 1s because + // that seems like a reasonable timeframe in which main should notice a + // replica is down. + std::chrono::seconds replica_check_frequency{1}; struct SSL { std::string key_file = ""; diff --git a/src/storage/v2/replication/replication_client.cpp b/src/storage/v2/replication/replication_client.cpp index 1789d3936..1fcdcb2c5 100644 --- a/src/storage/v2/replication/replication_client.cpp +++ b/src/storage/v2/replication/replication_client.cpp @@ -41,12 +41,49 @@ Storage::ReplicationClient::ReplicationClient(std::string name, Storage *storage } rpc_client_.emplace(endpoint, &*rpc_context_); - TryInitializeClient(); + TryInitializeClientSync(); if (config.timeout && replica_state_ != replication::ReplicaState::INVALID) { timeout_.emplace(*config.timeout); timeout_dispatcher_.emplace(); } + + // Help the user to get the most accurate replica state possible. + if (config.replica_check_frequency > std::chrono::seconds(0)) { + replica_checker_.Run("Replica Checker", config.replica_check_frequency, [&] { FrequentCheck(); }); + } +} + +void Storage::ReplicationClient::TryInitializeClientAsync() { + thread_pool_.AddTask([this] { + rpc_client_->Abort(); + this->TryInitializeClientSync(); + }); +} + +void Storage::ReplicationClient::FrequentCheck() { + const auto is_success = std::invoke([this]() { + try { + auto stream{rpc_client_->Stream<replication::FrequentHeartbeatRpc>()}; + const auto response = stream.AwaitResponse(); + return response.success; + } catch (const rpc::RpcFailedException &) { + return false; + } + }); + // States: READY, REPLICATING, RECOVERY, INVALID + // If success && ready, replicating, recovery -> stay the same because something good is going on. + // If success && INVALID -> [it's possible that replica came back to life] -> TryInitializeClient. + // If fail -> [replica is not reachable at all] -> INVALID state. + // NOTE: TryInitializeClient might return nothing if there is a branching point. + // NOTE: The early return pattern simplified the code, but the behavior should be as explained. + if (!is_success) { + replica_state_.store(replication::ReplicaState::INVALID); + return; + } + if (replica_state_.load() == replication::ReplicaState::INVALID) { + TryInitializeClientAsync(); + } } /// @throws rpc::RpcFailedException @@ -100,7 +137,7 @@ void Storage::ReplicationClient::InitializeClient() { } } -void Storage::ReplicationClient::TryInitializeClient() { +void Storage::ReplicationClient::TryInitializeClientSync() { try { InitializeClient(); } catch (const rpc::RpcFailedException &) { @@ -113,10 +150,7 @@ void Storage::ReplicationClient::TryInitializeClient() { void Storage::ReplicationClient::HandleRpcFailure() { spdlog::error(utils::MessageWithLink("Couldn't replicate data to {}.", name_, "https://memgr.ph/replication")); - thread_pool_.AddTask([this] { - rpc_client_->Abort(); - this->TryInitializeClient(); - }); + TryInitializeClientAsync(); } replication::SnapshotRes Storage::ReplicationClient::TransferSnapshot(const std::filesystem::path &path) { diff --git a/src/storage/v2/replication/replication_client.hpp b/src/storage/v2/replication/replication_client.hpp index 6078e1f4e..d7d13de98 100644 --- a/src/storage/v2/replication/replication_client.hpp +++ b/src/storage/v2/replication/replication_client.hpp @@ -142,16 +142,14 @@ class Storage::ReplicationClient { std::vector<RecoveryStep> GetRecoverySteps(uint64_t replica_commit, utils::FileRetainer::FileLocker *file_locker); + void FrequentCheck(); void InitializeClient(); - - void TryInitializeClient(); - + void TryInitializeClientSync(); + void TryInitializeClientAsync(); void HandleRpcFailure(); std::string name_; - Storage *storage_; - std::optional<communication::ClientContext> rpc_context_; std::optional<rpc::Client> rpc_client_; @@ -198,6 +196,8 @@ class Storage::ReplicationClient { // to ignore concurrency problems inside the client. utils::ThreadPool thread_pool_{1}; std::atomic<replication::ReplicaState> replica_state_{replication::ReplicaState::INVALID}; + + utils::Scheduler replica_checker_; }; } // namespace memgraph::storage diff --git a/src/storage/v2/replication/replication_server.cpp b/src/storage/v2/replication/replication_server.cpp index 597499a88..be797b1fe 100644 --- a/src/storage/v2/replication/replication_server.cpp +++ b/src/storage/v2/replication/replication_server.cpp @@ -60,6 +60,10 @@ Storage::ReplicationServer::ReplicationServer(Storage *storage, io::network::End spdlog::debug("Received HeartbeatRpc"); this->HeartbeatHandler(req_reader, res_builder); }); + rpc_server_->Register<replication::FrequentHeartbeatRpc>([](auto *req_reader, auto *res_builder) { + spdlog::debug("Received FrequentHeartbeatRpc"); + FrequentHeartbeatHandler(req_reader, res_builder); + }); rpc_server_->Register<replication::AppendDeltasRpc>([this](auto *req_reader, auto *res_builder) { spdlog::debug("Received AppendDeltasRpc"); this->AppendDeltasHandler(req_reader, res_builder); @@ -86,6 +90,13 @@ void Storage::ReplicationServer::HeartbeatHandler(slk::Reader *req_reader, slk:: slk::Save(res, res_builder); } +void Storage::ReplicationServer::FrequentHeartbeatHandler(slk::Reader *req_reader, slk::Builder *res_builder) { + replication::FrequentHeartbeatReq req; + slk::Load(&req, req_reader); + replication::FrequentHeartbeatRes res{true}; + slk::Save(res, res_builder); +} + void Storage::ReplicationServer::AppendDeltasHandler(slk::Reader *req_reader, slk::Builder *res_builder) { replication::AppendDeltasReq req; slk::Load(&req, req_reader); diff --git a/src/storage/v2/replication/replication_server.hpp b/src/storage/v2/replication/replication_server.hpp index 300e6a83c..5aa81424c 100644 --- a/src/storage/v2/replication/replication_server.hpp +++ b/src/storage/v2/replication/replication_server.hpp @@ -29,6 +29,7 @@ class Storage::ReplicationServer { private: // RPC handlers void HeartbeatHandler(slk::Reader *req_reader, slk::Builder *res_builder); + static void FrequentHeartbeatHandler(slk::Reader *req_reader, slk::Builder *res_builder); void AppendDeltasHandler(slk::Reader *req_reader, slk::Builder *res_builder); void SnapshotHandler(slk::Reader *req_reader, slk::Builder *res_builder); void WalFilesHandler(slk::Reader *req_reader, slk::Builder *res_builder); diff --git a/src/storage/v2/replication/rpc.lcp b/src/storage/v2/replication/rpc.lcp index dd9e155b9..40efdaa0f 100644 --- a/src/storage/v2/replication/rpc.lcp +++ b/src/storage/v2/replication/rpc.lcp @@ -43,6 +43,12 @@ cpp<# (current-commit-timestamp :uint64_t) (epoch-id "std::string")))) +;; FrequentHearthbeat is required because calling Heartbeat takes the storage lock. +;; Configured by `replication_replica_check_delay`. +(lcp:define-rpc frequent-heartbeat + (:request ()) + (:response ((success :bool)))) + (lcp:define-rpc snapshot (:request ()) (:response