Make HA heartbeat independent of other RPCs
Reviewers: msantl, ipaljak; Reviewed By: msantl, ipaljak; Subscribers: pullbot; Differential Revision: https://phabricator.memgraph.io/D2073
This commit is contained in:
parent
6c49e6de02
commit
1e79313538
@ -105,6 +105,16 @@ uint16_t Coordination::GetAllNodeCount() { return cluster_size_; }
|
||||
|
||||
/// Returns the cluster size minus one, i.e. the node count without this node.
uint16_t Coordination::GetOtherNodeCount() {
  const uint16_t others = cluster_size_ - 1;
  return others;
}
|
||||
|
||||
/// Looks up the endpoint stored for the node identified by `other_id`.
///
/// Node ids are 1-based while `endpoints_` is indexed from zero, so the id is
/// shifted down by one before the lookup. Both preconditions are enforced
/// with CHECK: the id must not be this node's own id and it must lie within
/// [1, cluster_size_].
io::network::Endpoint Coordination::GetOtherNodeEndpoint(uint16_t other_id) {
  // A node never issues RPCs to itself.
  CHECK(other_id != node_id_) << "Trying to execute RPC on self!";
  // Reject ids that fall outside of the configured cluster.
  CHECK(other_id >= 1 && other_id <= cluster_size_) << "Invalid node id!";

  const auto endpoint_index = other_id - 1;
  return endpoints_[endpoint_index];
}
|
||||
|
||||
/// Returns a pointer to the client context held in `client_context_`.
///
/// The context is obtained through `client_context_.value()`; the returned
/// pointer refers to that stored object and is not a copy.
communication::ClientContext *Coordination::GetRpcClientContext() {
  auto &context = client_context_.value();
  return &context;
}
|
||||
|
||||
/// Forwards to `server_->Start()` and returns its result.
bool Coordination::Start() {
  const bool started = server_->Start();
  return started;
}
|
||||
|
||||
void Coordination::AwaitShutdown(
|
||||
|
@ -65,6 +65,12 @@ class Coordination final {
|
||||
/// Returns number of other nodes.
|
||||
uint16_t GetOtherNodeCount();
|
||||
|
||||
/// Returns the endpoint of another node in the cluster.
///
/// `other_id` must differ from this node's own id and must lie in the range
/// [1, cluster size]; the implementation enforces both conditions with CHECK.
|
||||
io::network::Endpoint GetOtherNodeEndpoint(uint16_t other_id);
|
||||
|
||||
/// Returns the currently used RPC client context.
|
||||
communication::ClientContext *GetRpcClientContext();
|
||||
|
||||
/// Executes a RPC on another node in the cluster. If the RPC execution
|
||||
/// fails (because of underlying network issues) it returns a `std::nullopt`.
|
||||
template <class TRequestResponse, class... Args>
|
||||
|
@ -10,6 +10,7 @@
|
||||
#include <gflags/gflags.h>
|
||||
#include <glog/logging.h>
|
||||
|
||||
#include "communication/rpc/client.hpp"
|
||||
#include "database/graph_db_accessor.hpp"
|
||||
#include "durability/single_node_ha/paths.hpp"
|
||||
#include "durability/single_node_ha/recovery.hpp"
|
||||
@ -979,6 +980,11 @@ void RaftServer::HBThreadMain(uint16_t peer_id) {
|
||||
utils::ThreadSetName(fmt::format("HBThread{}", peer_id));
|
||||
std::unique_lock<std::mutex> lock(heartbeat_lock_);
|
||||
|
||||
// The heartbeat thread uses a dedicated RPC client for its peer so that it
|
||||
// can issue heartbeats in parallel with other RPC requests that are being
|
||||
// issued to the peer (replication, voting, etc.)
|
||||
std::unique_ptr<communication::rpc::Client> rpc_client;
|
||||
|
||||
while (!exiting_) {
|
||||
TimePoint wait_until;
|
||||
|
||||
@ -993,8 +999,17 @@ void RaftServer::HBThreadMain(uint16_t peer_id) {
|
||||
<< peer_id << " (Term: " << current_term_ << ")";
|
||||
|
||||
lock.unlock();
|
||||
coordination_->ExecuteOnOtherNode<HeartbeatRpc>(peer_id, server_id_,
|
||||
current_term_);
|
||||
if (!rpc_client) {
|
||||
rpc_client = std::make_unique<communication::rpc::Client>(
|
||||
coordination_->GetOtherNodeEndpoint(peer_id),
|
||||
coordination_->GetRpcClientContext());
|
||||
}
|
||||
try {
|
||||
rpc_client->Call<HeartbeatRpc>(server_id_, current_term_);
|
||||
} catch (...) {
|
||||
// Invalidate the client so that we reconnect next time.
|
||||
rpc_client = nullptr;
|
||||
}
|
||||
lock.lock();
|
||||
|
||||
// This is ok even if we don't receive a reply.
|
||||
|
Loading…
Reference in New Issue
Block a user