add RPC to get data

This commit is contained in:
antoniofilipovic 2024-02-14 13:12:03 +01:00
parent 867c192363
commit da8db8d064
15 changed files with 205 additions and 4 deletions

View File

@ -16,6 +16,7 @@
#include "coordination/coordinator_config.hpp"
#include "coordination/coordinator_rpc.hpp"
#include "replication_coordination_glue/common.hpp"
#include "replication_coordination_glue/messages.hpp"
#include "utils/result.hpp"
@ -140,5 +141,20 @@ auto CoordinatorClient::SendGetInstanceUUID() const
return instance_uuid;
}
auto CoordinatorClient::SendGetInstanceTimestampsRpc() const
-> utils::BasicResult<GetInstanceUUIDError,
std::vector<replication_coordination_glue::ReplicationTimestampResult>> {
try {
auto stream{rpc_client_.Stream<coordination::GetInstanceTimestampsRpc>()};
auto res = stream.AwaitResponse();
return res.replica_timestamps;
} catch (const rpc::RpcFailedException &) {
spdlog::error("RPC error occured while sending GetInstance UUID RPC");
return GetInstanceUUIDError::RPC_EXCEPTION;
}
}
} // namespace memgraph::coordination
#endif

View File

@ -45,6 +45,20 @@ void CoordinatorHandlers::Register(memgraph::coordination::CoordinatorServer &se
spdlog::info("Received GetInstanceUUIDRpc on coordinator server");
CoordinatorHandlers::GetInstanceUUIDHandler(replication_handler, req_reader, res_builder);
});
server.Register<coordination::GetInstanceTimestampsRpc>(
[&replication_handler](slk::Reader *req_reader, slk::Builder *res_builder) -> void {
spdlog::info("Received GetInstanceTimestampsRpc on coordinator server");
CoordinatorHandlers::GetInstanceTimestampsHandler(replication_handler, req_reader, res_builder);
});
}
void CoordinatorHandlers::GetInstanceTimestampsHandler(replication::ReplicationHandler &replication_handler,
slk::Reader *req_reader, slk::Builder *res_builder) {
coordination::GetInstanceTimestampsReq req;
slk::Load(&req, req_reader);
slk::Save(coordination::GetInstanceTimestampsRes{replication_handler.GetTimestampsForEachDb()}, res_builder);
}
void CoordinatorHandlers::SwapMainUUIDHandler(replication::ReplicationHandler &replication_handler,

View File

@ -170,6 +170,20 @@ auto CoordinatorInstance::TryFailover() -> void {
}
// TODO: Smarter choice
{
// for each DB we get one ReplicationTimestampResult, that is why we have vector here
using ReplicaTimestampsRes = std::vector<replication_coordination_glue::ReplicationTimestampResult>;
std::unordered_map<std::string, ReplicaTimestampsRes> instance_replica_timestamps_res;
std::for_each(alive_replicas.begin(), alive_replicas.end(),
[&instance_replica_timestamps_res](ReplicationInstance &replica) {
auto res = replica.GetClient().SendGetInstanceTimestampsRpc();
if (res.HasError()) {
return;
}
instance_replica_timestamps_res.emplace(replica.InstanceName(), std::move(res.GetValue()));
});
}
auto new_main = ranges::begin(alive_replicas);
new_main->PauseFrequentCheck();

View File

@ -69,6 +69,23 @@ void GetInstanceUUIDRes::Load(GetInstanceUUIDRes *self, memgraph::slk::Reader *r
memgraph::slk::Load(self, reader);
}
// GetInstanceUUID
void GetInstanceTimestampsReq::Save(const GetInstanceTimestampsReq &self, memgraph::slk::Builder *builder) {
memgraph::slk::Save(self, builder);
}
void GetInstanceTimestampsReq::Load(GetInstanceTimestampsReq *self, memgraph::slk::Reader *reader) {
memgraph::slk::Load(self, reader);
}
void GetInstanceTimestampsRes::Save(const GetInstanceTimestampsRes &self, memgraph::slk::Builder *builder) {
memgraph::slk::Save(self, builder);
}
void GetInstanceTimestampsRes::Load(GetInstanceTimestampsRes *self, memgraph::slk::Reader *reader) {
memgraph::slk::Load(self, reader);
}
} // namespace coordination
constexpr utils::TypeInfo coordination::PromoteReplicaToMainReq::kType{utils::TypeId::COORD_FAILOVER_REQ,
@ -89,6 +106,12 @@ constexpr utils::TypeInfo coordination::GetInstanceUUIDReq::kType{utils::TypeId:
constexpr utils::TypeInfo coordination::GetInstanceUUIDRes::kType{utils::TypeId::COORD_GET_UUID_RES, "CoordGetUUIDRes",
nullptr};
constexpr utils::TypeInfo coordination::GetInstanceTimestampsReq::kType{
utils::TypeId::COORD_GET_INSTANCE_TIMESTAMPS_REQ, "GetInstanceTimestampsReq", nullptr};
constexpr utils::TypeInfo coordination::GetInstanceTimestampsRes::kType{
utils::TypeId::COORD_GET_INSTANCE_TIMESTAMPS_RES, "GetInstanceTimestampsRes", nullptr};
namespace slk {
// PromoteReplicaToMainRpc
@ -146,6 +169,24 @@ void Load(memgraph::coordination::GetInstanceUUIDRes *self, memgraph::slk::Reade
memgraph::slk::Load(&self->uuid, reader);
}
// GetInstanceTimestampsReq
void Save(const memgraph::coordination::GetInstanceTimestampsReq & /*self*/, memgraph::slk::Builder * /*builder*/) {
/* nothing to serialize*/
}
void Load(memgraph::coordination::GetInstanceTimestampsReq * /*self*/, memgraph::slk::Reader * /*reader*/) {
/* nothing to serialize*/
}
void Save(const memgraph::coordination::GetInstanceTimestampsRes &self, memgraph::slk::Builder *builder) {
memgraph::slk::Save(self.replica_timestamps, builder);
}
void Load(memgraph::coordination::GetInstanceTimestampsRes *self, memgraph::slk::Reader *reader) {
memgraph::slk::Load(&self->replica_timestamps, reader);
}
} // namespace slk
} // namespace memgraph

View File

@ -15,6 +15,7 @@
#ifdef MG_ENTERPRISE
#include "coordination/coordinator_config.hpp"
#include "replication_coordination_glue/common.hpp"
#include "rpc/client.hpp"
#include "rpc_errors.hpp"
#include "utils/result.hpp"
@ -59,6 +60,10 @@ class CoordinatorClient {
auto SetCallbacks(HealthCheckCallback succ_cb, HealthCheckCallback fail_cb) -> void;
auto SendGetInstanceTimestampsRpc() const
-> utils::BasicResult<GetInstanceUUIDError,
std::vector<replication_coordination_glue::ReplicationTimestampResult>>;
auto RpcClient() -> rpc::Client & { return rpc_client_; }
friend bool operator==(CoordinatorClient const &first, CoordinatorClient const &second) {

View File

@ -36,6 +36,9 @@ class CoordinatorHandlers {
static void GetInstanceUUIDHandler(replication::ReplicationHandler &replication_handler, slk::Reader *req_reader,
slk::Builder *res_builder);
static void GetInstanceTimestampsHandler(replication::ReplicationHandler &replication_handler,
slk::Reader *req_reader, slk::Builder *res_builder);
};
} // namespace memgraph::dbms

View File

@ -15,6 +15,7 @@
#ifdef MG_ENTERPRISE
#include "coordination/coordinator_config.hpp"
#include "replication_coordination_glue/common.hpp"
#include "rpc/messages.hpp"
#include "slk/serialization.hpp"
@ -109,6 +110,35 @@ struct GetInstanceUUIDRes {
using GetInstanceUUIDRpc = rpc::RequestResponse<GetInstanceUUIDReq, GetInstanceUUIDRes>;
struct GetInstanceTimestampsReq {
static const utils::TypeInfo kType;
static const utils::TypeInfo &GetTypeInfo() { return kType; }
static void Load(GetInstanceTimestampsReq *self, memgraph::slk::Reader *reader);
static void Save(const GetInstanceTimestampsReq &self, memgraph::slk::Builder *builder);
GetInstanceTimestampsReq() = default;
bool success;
};
struct GetInstanceTimestampsRes {
static const utils::TypeInfo kType;
static const utils::TypeInfo &GetTypeInfo() { return kType; }
static void Load(GetInstanceTimestampsRes *self, memgraph::slk::Reader *reader);
static void Save(const GetInstanceTimestampsRes &self, memgraph::slk::Builder *builder);
explicit GetInstanceTimestampsRes(
const std::vector<replication_coordination_glue::ReplicationTimestampResult> &replica_timestamps)
: replica_timestamps(replica_timestamps) {}
GetInstanceTimestampsRes() = default;
std::vector<replication_coordination_glue::ReplicationTimestampResult> replica_timestamps;
};
using GetInstanceTimestampsRpc = rpc::RequestResponse<GetInstanceTimestampsReq, GetInstanceTimestampsRes>;
} // namespace memgraph::coordination
// SLK serialization declarations
@ -132,6 +162,12 @@ void Load(memgraph::coordination::GetInstanceUUIDReq *self, memgraph::slk::Reade
void Save(const memgraph::coordination::GetInstanceUUIDRes &self, memgraph::slk::Builder *builder);
void Load(memgraph::coordination::GetInstanceUUIDRes *self, memgraph::slk::Reader *reader);
// GetInstanceTimestampsRpc
void Save(const memgraph::coordination::GetInstanceTimestampsReq &self, memgraph::slk::Builder *builder);
void Load(memgraph::coordination::GetInstanceTimestampsReq *self, memgraph::slk::Reader *reader);
void Save(const memgraph::coordination::GetInstanceTimestampsRes &self, memgraph::slk::Builder *builder);
void Load(memgraph::coordination::GetInstanceTimestampsRes *self, memgraph::slk::Reader *reader);
} // namespace memgraph::slk
#endif

View File

@ -14,6 +14,7 @@
#ifdef MG_ENTERPRISE
#include "coordination/coordinator_config.hpp"
#include "replication_coordination_glue/common.hpp"
#include "slk/serialization.hpp"
#include "slk/streams.hpp"
@ -34,5 +35,20 @@ inline void Load(ReplicationClientInfo *obj, Reader *reader) {
Load(&obj->replication_ip_address, reader);
Load(&obj->replication_port, reader);
}
inline void Save(const replication_coordination_glue::ReplicationTimestampResult &obj, Builder *builder) {
Save(obj.db_uuid, builder);
Save(obj.history, builder);
Save(obj.last_commit_timestamp, builder);
Save(obj.epoch_id, builder);
}
inline void Load(replication_coordination_glue::ReplicationTimestampResult *obj, Reader *reader) {
Load(&obj->db_uuid, reader);
Load(&obj->history, reader);
Load(&obj->last_commit_timestamp, reader);
Load(&obj->epoch_id, reader);
}
} // namespace memgraph::slk
#endif

View File

@ -11,4 +11,5 @@
namespace memgraph::coordination {
enum class GetInstanceUUIDError { NO_RESPONSE, RPC_EXCEPTION };
enum class GetInstanceTimestampsError { NO_RESPONSE, RPC_EXCEPTION };
} // namespace memgraph::coordination

View File

@ -268,10 +268,6 @@ class DbmsHandler {
bool IsMain() const { return repl_state_.IsMain(); }
bool IsReplica() const { return repl_state_.IsReplica(); }
#ifdef MG_ENTERPRISE
// coordination::CoordinatorState &CoordinatorState() { return coordinator_state_; }
#endif
/**
* @brief Return the statistics all databases.
*

View File

@ -7,6 +7,7 @@ target_sources(mg-repl_coord_glue
mode.hpp
role.hpp
handler.hpp
common.hpp
PRIVATE
messages.cpp

View File

@ -0,0 +1,31 @@
// Copyright 2024 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#pragma once
#include "rpc/client.hpp"
#include "utils/uuid.hpp"
#include <deque>
#include "messages.hpp"
#include "rpc/messages.hpp"
#include "utils/uuid.hpp"
namespace memgraph::replication_coordination_glue {
struct ReplicationTimestampResult {
memgraph::utils::UUID db_uuid;
std::vector<std::pair<std::string, uint64_t>> history;
uint64_t last_commit_timestamp;
std::string epoch_id;
};
} // namespace memgraph::replication_coordination_glue

View File

@ -13,6 +13,7 @@
#include "auth/auth.hpp"
#include "dbms/dbms_handler.hpp"
#include "replication/include/replication/state.hpp"
#include "replication_coordination_glue/common.hpp"
#include "replication_handler/system_replication.hpp"
#include "replication_handler/system_rpc.hpp"
#include "utils/result.hpp"
@ -190,6 +191,8 @@ struct ReplicationHandler : public memgraph::query::ReplicationQueryHandler {
auto GetReplicaUUID() -> std::optional<utils::UUID>;
auto GetTimestampsForEachDb() -> std::vector<replication_coordination_glue::ReplicationTimestampResult>;
private:
template <bool HandleFailure>
auto RegisterReplica_(const memgraph::replication::ReplicationClientConfig &config, bool send_swap_uuid)

View File

@ -272,6 +272,28 @@ auto ReplicationHandler::GetRole() const -> memgraph::replication_coordination_g
return repl_state_.GetRole();
}
auto ReplicationHandler::GetTimestampsForEachDb()
-> std::vector<replication_coordination_glue::ReplicationTimestampResult> {
std::vector<replication_coordination_glue::ReplicationTimestampResult> results;
dbms_handler_.ForEach([&results](memgraph::dbms::DatabaseAccess db_acc) {
auto *storage = db_acc->storage();
auto &repl_storage_state = storage->repl_storage_state_;
std::vector<std::pair<std::string, uint64_t>> history;
history.reserve(repl_storage_state.history.size());
std::transform(repl_storage_state.history.begin(), repl_storage_state.history.end(), std::back_inserter(history),
[](const auto &elem) { return std::pair<std::string, uint64_t>(elem.first, elem.second); });
replication_coordination_glue::ReplicationTimestampResult repl{
.db_uuid = db_acc->storage()->uuid(),
.history = history,
.last_commit_timestamp = repl_storage_state.last_commit_timestamp_.load(),
.epoch_id = std::string(repl_storage_state.epoch_.id())};
results.emplace_back(repl);
});
return results;
}
auto ReplicationHandler::GetReplicaUUID() -> std::optional<utils::UUID> {
MG_ASSERT(repl_state_.IsReplica());
return std::get<RoleReplicaData>(repl_state_.ReplicationData()).uuid_;

View File

@ -110,6 +110,8 @@ enum class TypeId : uint64_t {
COORD_GET_UUID_REQ,
COORD_GET_UUID_RES,
COORD_GET_INSTANCE_TIMESTAMPS_REQ,
COORD_GET_INSTANCE_TIMESTAMPS_RES,
// AST
AST_LABELIX = 3000,