Add frequent replica ping (#380)
This commit is contained in:
parent
cccf32e79d
commit
8e3ab1ad0f
@ -88,4 +88,3 @@ CheckOptions:
|
||||
- key: modernize-use-nullptr.NullMacros
|
||||
value: 'NULL'
|
||||
...
|
||||
|
||||
|
@ -252,6 +252,11 @@ DEFINE_double(query_execution_timeout_sec, 600,
|
||||
"Maximum allowed query execution time. Queries exceeding this "
|
||||
"limit will be aborted. Value of 0 means no limit.");
|
||||
|
||||
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
|
||||
DEFINE_uint64(replication_replica_check_frequency_sec, 1,
|
||||
"The time duration between two replica checks/pings. If < 1, replicas will NOT be checked at all. NOTE: "
|
||||
"The MAIN instance allocates a new thread for each REPLICA.");
|
||||
|
||||
// NOLINTNEXTLINE (cppcoreguidelines-avoid-non-const-global-variables)
|
||||
DEFINE_uint64(
|
||||
memory_limit, 0,
|
||||
@ -1216,6 +1221,7 @@ int main(int argc, char **argv) {
|
||||
&db,
|
||||
{.query = {.allow_load_csv = FLAGS_allow_load_csv},
|
||||
.execution_timeout_sec = FLAGS_query_execution_timeout_sec,
|
||||
.replication_replica_check_frequency = std::chrono::seconds(FLAGS_replication_replica_check_frequency_sec),
|
||||
.default_kafka_bootstrap_servers = FLAGS_kafka_bootstrap_servers,
|
||||
.default_pulsar_service_url = FLAGS_pulsar_service_url,
|
||||
.stream_transaction_conflict_retries = FLAGS_stream_transaction_conflict_retries,
|
||||
|
@ -21,6 +21,8 @@ struct InterpreterConfig {
|
||||
|
||||
// The default execution timeout is 10 minutes.
|
||||
double execution_timeout_sec{600.0};
|
||||
// The same as \ref memgraph::storage::replication::ReplicationClientConfig
|
||||
std::chrono::seconds replication_replica_check_frequency{1};
|
||||
|
||||
std::string default_kafka_bootstrap_servers;
|
||||
std::string default_pulsar_service_url;
|
||||
|
@ -160,7 +160,8 @@ class ReplQueryHandler final : public query::ReplicationQueryHandler {
|
||||
|
||||
/// @throw QueryRuntimeException if an error ocurred.
|
||||
void RegisterReplica(const std::string &name, const std::string &socket_address,
|
||||
const ReplicationQuery::SyncMode sync_mode, const std::optional<double> timeout) override {
|
||||
const ReplicationQuery::SyncMode sync_mode, const std::optional<double> timeout,
|
||||
const std::chrono::seconds replica_check_frequency) override {
|
||||
if (db_->GetReplicationRole() == storage::ReplicationRole::REPLICA) {
|
||||
// replica can't register another replica
|
||||
throw QueryRuntimeException("Replica can't register another replica!");
|
||||
@ -182,8 +183,9 @@ class ReplQueryHandler final : public query::ReplicationQueryHandler {
|
||||
io::network::Endpoint::ParseSocketOrIpAddress(socket_address, query::kDefaultReplicationPort);
|
||||
if (maybe_ip_and_port) {
|
||||
auto [ip, port] = *maybe_ip_and_port;
|
||||
auto ret =
|
||||
db_->RegisterReplica(name, {std::move(ip), port}, repl_mode, {.timeout = timeout, .ssl = std::nullopt});
|
||||
auto ret = db_->RegisterReplica(
|
||||
name, {std::move(ip), port}, repl_mode,
|
||||
{.timeout = timeout, .replica_check_frequency = replica_check_frequency, .ssl = std::nullopt});
|
||||
if (ret.HasError()) {
|
||||
throw QueryRuntimeException(fmt::format("Couldn't register replica '{}'!", name));
|
||||
}
|
||||
@ -467,6 +469,7 @@ Callback HandleReplicationQuery(ReplicationQuery *repl_query, const Parameters &
|
||||
const auto &sync_mode = repl_query->sync_mode_;
|
||||
auto socket_address = repl_query->socket_address_->Accept(evaluator);
|
||||
auto timeout = EvaluateOptionalExpression(repl_query->timeout_, &evaluator);
|
||||
const auto replica_check_frequency = interpreter_context->config.replication_replica_check_frequency;
|
||||
std::optional<double> maybe_timeout;
|
||||
if (timeout.IsDouble()) {
|
||||
maybe_timeout = timeout.ValueDouble();
|
||||
@ -474,8 +477,9 @@ Callback HandleReplicationQuery(ReplicationQuery *repl_query, const Parameters &
|
||||
maybe_timeout = static_cast<double>(timeout.ValueInt());
|
||||
}
|
||||
callback.fn = [handler = ReplQueryHandler{interpreter_context->db}, name, socket_address, sync_mode,
|
||||
maybe_timeout]() mutable {
|
||||
handler.RegisterReplica(name, std::string(socket_address.ValueString()), sync_mode, maybe_timeout);
|
||||
maybe_timeout, replica_check_frequency]() mutable {
|
||||
handler.RegisterReplica(name, std::string(socket_address.ValueString()), sync_mode, maybe_timeout,
|
||||
replica_check_frequency);
|
||||
return std::vector<std::vector<TypedValue>>();
|
||||
};
|
||||
notifications->emplace_back(SeverityLevel::INFO, NotificationCode::REGISTER_REPLICA,
|
||||
|
@ -137,7 +137,8 @@ class ReplicationQueryHandler {
|
||||
|
||||
/// @throw QueryRuntimeException if an error ocurred.
|
||||
virtual void RegisterReplica(const std::string &name, const std::string &socket_address,
|
||||
const ReplicationQuery::SyncMode sync_mode, const std::optional<double> timeout) = 0;
|
||||
const ReplicationQuery::SyncMode sync_mode, const std::optional<double> timeout,
|
||||
const std::chrono::seconds replica_check_frequency) = 0;
|
||||
|
||||
/// @throw QueryRuntimeException if an error ocurred.
|
||||
virtual void DropReplica(const std::string &replica_name) = 0;
|
||||
|
@ -16,6 +16,10 @@
|
||||
namespace memgraph::storage::replication {
|
||||
struct ReplicationClientConfig {
|
||||
std::optional<double> timeout;
|
||||
// The default delay between main checking/pinging replicas is 1s because
|
||||
// that seems like a reasonable timeframe in which main should notice a
|
||||
// replica is down.
|
||||
std::chrono::seconds replica_check_frequency{1};
|
||||
|
||||
struct SSL {
|
||||
std::string key_file = "";
|
||||
|
@ -41,12 +41,49 @@ Storage::ReplicationClient::ReplicationClient(std::string name, Storage *storage
|
||||
}
|
||||
|
||||
rpc_client_.emplace(endpoint, &*rpc_context_);
|
||||
TryInitializeClient();
|
||||
TryInitializeClientSync();
|
||||
|
||||
if (config.timeout && replica_state_ != replication::ReplicaState::INVALID) {
|
||||
timeout_.emplace(*config.timeout);
|
||||
timeout_dispatcher_.emplace();
|
||||
}
|
||||
|
||||
// Help the user to get the most accurate replica state possible.
|
||||
if (config.replica_check_frequency > std::chrono::seconds(0)) {
|
||||
replica_checker_.Run("Replica Checker", config.replica_check_frequency, [&] { FrequentCheck(); });
|
||||
}
|
||||
}
|
||||
|
||||
void Storage::ReplicationClient::TryInitializeClientAsync() {
|
||||
thread_pool_.AddTask([this] {
|
||||
rpc_client_->Abort();
|
||||
this->TryInitializeClientSync();
|
||||
});
|
||||
}
|
||||
|
||||
void Storage::ReplicationClient::FrequentCheck() {
|
||||
const auto is_success = std::invoke([this]() {
|
||||
try {
|
||||
auto stream{rpc_client_->Stream<replication::FrequentHeartbeatRpc>()};
|
||||
const auto response = stream.AwaitResponse();
|
||||
return response.success;
|
||||
} catch (const rpc::RpcFailedException &) {
|
||||
return false;
|
||||
}
|
||||
});
|
||||
// States: READY, REPLICATING, RECOVERY, INVALID
|
||||
// If success && ready, replicating, recovery -> stay the same because something good is going on.
|
||||
// If success && INVALID -> [it's possible that replica came back to life] -> TryInitializeClient.
|
||||
// If fail -> [replica is not reachable at all] -> INVALID state.
|
||||
// NOTE: TryInitializeClient might return nothing if there is a branching point.
|
||||
// NOTE: The early return pattern simplified the code, but the behavior should be as explained.
|
||||
if (!is_success) {
|
||||
replica_state_.store(replication::ReplicaState::INVALID);
|
||||
return;
|
||||
}
|
||||
if (replica_state_.load() == replication::ReplicaState::INVALID) {
|
||||
TryInitializeClientAsync();
|
||||
}
|
||||
}
|
||||
|
||||
/// @throws rpc::RpcFailedException
|
||||
@ -100,7 +137,7 @@ void Storage::ReplicationClient::InitializeClient() {
|
||||
}
|
||||
}
|
||||
|
||||
void Storage::ReplicationClient::TryInitializeClient() {
|
||||
void Storage::ReplicationClient::TryInitializeClientSync() {
|
||||
try {
|
||||
InitializeClient();
|
||||
} catch (const rpc::RpcFailedException &) {
|
||||
@ -113,10 +150,7 @@ void Storage::ReplicationClient::TryInitializeClient() {
|
||||
|
||||
void Storage::ReplicationClient::HandleRpcFailure() {
|
||||
spdlog::error(utils::MessageWithLink("Couldn't replicate data to {}.", name_, "https://memgr.ph/replication"));
|
||||
thread_pool_.AddTask([this] {
|
||||
rpc_client_->Abort();
|
||||
this->TryInitializeClient();
|
||||
});
|
||||
TryInitializeClientAsync();
|
||||
}
|
||||
|
||||
replication::SnapshotRes Storage::ReplicationClient::TransferSnapshot(const std::filesystem::path &path) {
|
||||
|
@ -142,16 +142,14 @@ class Storage::ReplicationClient {
|
||||
|
||||
std::vector<RecoveryStep> GetRecoverySteps(uint64_t replica_commit, utils::FileRetainer::FileLocker *file_locker);
|
||||
|
||||
void FrequentCheck();
|
||||
void InitializeClient();
|
||||
|
||||
void TryInitializeClient();
|
||||
|
||||
void TryInitializeClientSync();
|
||||
void TryInitializeClientAsync();
|
||||
void HandleRpcFailure();
|
||||
|
||||
std::string name_;
|
||||
|
||||
Storage *storage_;
|
||||
|
||||
std::optional<communication::ClientContext> rpc_context_;
|
||||
std::optional<rpc::Client> rpc_client_;
|
||||
|
||||
@ -198,6 +196,8 @@ class Storage::ReplicationClient {
|
||||
// to ignore concurrency problems inside the client.
|
||||
utils::ThreadPool thread_pool_{1};
|
||||
std::atomic<replication::ReplicaState> replica_state_{replication::ReplicaState::INVALID};
|
||||
|
||||
utils::Scheduler replica_checker_;
|
||||
};
|
||||
|
||||
} // namespace memgraph::storage
|
||||
|
@ -60,6 +60,10 @@ Storage::ReplicationServer::ReplicationServer(Storage *storage, io::network::End
|
||||
spdlog::debug("Received HeartbeatRpc");
|
||||
this->HeartbeatHandler(req_reader, res_builder);
|
||||
});
|
||||
rpc_server_->Register<replication::FrequentHeartbeatRpc>([](auto *req_reader, auto *res_builder) {
|
||||
spdlog::debug("Received FrequentHeartbeatRpc");
|
||||
FrequentHeartbeatHandler(req_reader, res_builder);
|
||||
});
|
||||
rpc_server_->Register<replication::AppendDeltasRpc>([this](auto *req_reader, auto *res_builder) {
|
||||
spdlog::debug("Received AppendDeltasRpc");
|
||||
this->AppendDeltasHandler(req_reader, res_builder);
|
||||
@ -86,6 +90,13 @@ void Storage::ReplicationServer::HeartbeatHandler(slk::Reader *req_reader, slk::
|
||||
slk::Save(res, res_builder);
|
||||
}
|
||||
|
||||
void Storage::ReplicationServer::FrequentHeartbeatHandler(slk::Reader *req_reader, slk::Builder *res_builder) {
|
||||
replication::FrequentHeartbeatReq req;
|
||||
slk::Load(&req, req_reader);
|
||||
replication::FrequentHeartbeatRes res{true};
|
||||
slk::Save(res, res_builder);
|
||||
}
|
||||
|
||||
void Storage::ReplicationServer::AppendDeltasHandler(slk::Reader *req_reader, slk::Builder *res_builder) {
|
||||
replication::AppendDeltasReq req;
|
||||
slk::Load(&req, req_reader);
|
||||
|
@ -29,6 +29,7 @@ class Storage::ReplicationServer {
|
||||
private:
|
||||
// RPC handlers
|
||||
void HeartbeatHandler(slk::Reader *req_reader, slk::Builder *res_builder);
|
||||
static void FrequentHeartbeatHandler(slk::Reader *req_reader, slk::Builder *res_builder);
|
||||
void AppendDeltasHandler(slk::Reader *req_reader, slk::Builder *res_builder);
|
||||
void SnapshotHandler(slk::Reader *req_reader, slk::Builder *res_builder);
|
||||
void WalFilesHandler(slk::Reader *req_reader, slk::Builder *res_builder);
|
||||
|
@ -43,6 +43,12 @@ cpp<#
|
||||
(current-commit-timestamp :uint64_t)
|
||||
(epoch-id "std::string"))))
|
||||
|
||||
;; FrequentHearthbeat is required because calling Heartbeat takes the storage lock.
|
||||
;; Configured by `replication_replica_check_delay`.
|
||||
(lcp:define-rpc frequent-heartbeat
|
||||
(:request ())
|
||||
(:response ((success :bool))))
|
||||
|
||||
(lcp:define-rpc snapshot
|
||||
(:request ())
|
||||
(:response
|
||||
|
Loading…
Reference in New Issue
Block a user