diff --git a/src/coordination/coordinator_client.cpp b/src/coordination/coordinator_client.cpp index 87117a627..9f1e1c0b7 100644 --- a/src/coordination/coordinator_client.cpp +++ b/src/coordination/coordinator_client.cpp @@ -16,6 +16,7 @@ #include "coordination/coordinator_config.hpp" #include "coordination/coordinator_rpc.hpp" +#include "replication_coordination_glue/common.hpp" #include "replication_coordination_glue/messages.hpp" #include "utils/result.hpp" @@ -140,5 +141,20 @@ auto CoordinatorClient::SendGetInstanceUUID() const return instance_uuid; } +auto CoordinatorClient::SendGetInstanceTimestampsRpc() const + -> utils::BasicResult> { + try { + auto stream{rpc_client_.Stream()}; + auto res = stream.AwaitResponse(); + + return res.replica_timestamps; + + } catch (const rpc::RpcFailedException &) { + spdlog::error("RPC error occured while sending GetInstance UUID RPC"); + return GetInstanceUUIDError::RPC_EXCEPTION; + } +} + } // namespace memgraph::coordination #endif diff --git a/src/coordination/coordinator_handlers.cpp b/src/coordination/coordinator_handlers.cpp index 9d9744f7e..88f3d58bd 100644 --- a/src/coordination/coordinator_handlers.cpp +++ b/src/coordination/coordinator_handlers.cpp @@ -45,6 +45,20 @@ void CoordinatorHandlers::Register(memgraph::coordination::CoordinatorServer &se spdlog::info("Received GetInstanceUUIDRpc on coordinator server"); CoordinatorHandlers::GetInstanceUUIDHandler(replication_handler, req_reader, res_builder); }); + + server.Register( + [&replication_handler](slk::Reader *req_reader, slk::Builder *res_builder) -> void { + spdlog::info("Received GetInstanceTimestampsRpc on coordinator server"); + CoordinatorHandlers::GetInstanceTimestampsHandler(replication_handler, req_reader, res_builder); + }); +} + +void CoordinatorHandlers::GetInstanceTimestampsHandler(replication::ReplicationHandler &replication_handler, + slk::Reader *req_reader, slk::Builder *res_builder) { + coordination::GetInstanceTimestampsReq req; + slk::Load(&req, req_reader); + + slk::Save(coordination::GetInstanceTimestampsRes{replication_handler.GetTimestampsForEachDb()}, res_builder); } void CoordinatorHandlers::SwapMainUUIDHandler(replication::ReplicationHandler &replication_handler, diff --git a/src/coordination/coordinator_instance.cpp b/src/coordination/coordinator_instance.cpp index 2ac29fb58..e8f2d31ac 100644 --- a/src/coordination/coordinator_instance.cpp +++ b/src/coordination/coordinator_instance.cpp @@ -170,6 +170,20 @@ auto CoordinatorInstance::TryFailover() -> void { } // TODO: Smarter choice + { + // for each DB we get one ReplicationTimestampResult, that is why we have vector here + using ReplicaTimestampsRes = std::vector; + std::unordered_map instance_replica_timestamps_res; + std::for_each(alive_replicas.begin(), alive_replicas.end(), + [&instance_replica_timestamps_res](ReplicationInstance &replica) { + auto res = replica.GetClient().SendGetInstanceTimestampsRpc(); + if (res.HasError()) { + return; + } + + instance_replica_timestamps_res.emplace(replica.InstanceName(), std::move(res.GetValue())); + }); + } auto new_main = ranges::begin(alive_replicas); new_main->PauseFrequentCheck(); diff --git a/src/coordination/coordinator_rpc.cpp b/src/coordination/coordinator_rpc.cpp index e61a2b83e..3d0def7bb 100644 --- a/src/coordination/coordinator_rpc.cpp +++ b/src/coordination/coordinator_rpc.cpp @@ -69,6 +69,23 @@ void GetInstanceUUIDRes::Load(GetInstanceUUIDRes *self, memgraph::slk::Reader *r memgraph::slk::Load(self, reader); } +// GetInstanceUUID +void GetInstanceTimestampsReq::Save(const GetInstanceTimestampsReq &self, memgraph::slk::Builder *builder) { + memgraph::slk::Save(self, builder); +} + +void GetInstanceTimestampsReq::Load(GetInstanceTimestampsReq *self, memgraph::slk::Reader *reader) { + memgraph::slk::Load(self, reader); +} + +void GetInstanceTimestampsRes::Save(const GetInstanceTimestampsRes &self, memgraph::slk::Builder *builder) { + memgraph::slk::Save(self, builder); +} + +void GetInstanceTimestampsRes::Load(GetInstanceTimestampsRes *self, memgraph::slk::Reader *reader) { + memgraph::slk::Load(self, reader); +} + } // namespace coordination constexpr utils::TypeInfo coordination::PromoteReplicaToMainReq::kType{utils::TypeId::COORD_FAILOVER_REQ, @@ -89,6 +106,12 @@ constexpr utils::TypeInfo coordination::GetInstanceUUIDReq::kType{utils::TypeId: constexpr utils::TypeInfo coordination::GetInstanceUUIDRes::kType{utils::TypeId::COORD_GET_UUID_RES, "CoordGetUUIDRes", nullptr}; +constexpr utils::TypeInfo coordination::GetInstanceTimestampsReq::kType{ + utils::TypeId::COORD_GET_INSTANCE_TIMESTAMPS_REQ, "GetInstanceTimestampsReq", nullptr}; + +constexpr utils::TypeInfo coordination::GetInstanceTimestampsRes::kType{ + utils::TypeId::COORD_GET_INSTANCE_TIMESTAMPS_RES, "GetInstanceTimestampsRes", nullptr}; + namespace slk { // PromoteReplicaToMainRpc @@ -146,6 +169,24 @@ void Load(memgraph::coordination::GetInstanceUUIDRes *self, memgraph::slk::Reade memgraph::slk::Load(&self->uuid, reader); } +// GetInstanceTimestampsReq + +void Save(const memgraph::coordination::GetInstanceTimestampsReq & /*self*/, memgraph::slk::Builder * /*builder*/) { + /* nothing to serialize*/ +} + +void Load(memgraph::coordination::GetInstanceTimestampsReq * /*self*/, memgraph::slk::Reader * /*reader*/) { + /* nothing to serialize*/ +} + +void Save(const memgraph::coordination::GetInstanceTimestampsRes &self, memgraph::slk::Builder *builder) { + memgraph::slk::Save(self.replica_timestamps, builder); +} + +void Load(memgraph::coordination::GetInstanceTimestampsRes *self, memgraph::slk::Reader *reader) { + memgraph::slk::Load(&self->replica_timestamps, reader); +} + } // namespace slk } // namespace memgraph diff --git a/src/coordination/include/coordination/coordinator_client.hpp b/src/coordination/include/coordination/coordinator_client.hpp index 85905de23..26563198f 100644 --- a/src/coordination/include/coordination/coordinator_client.hpp +++ b/src/coordination/include/coordination/coordinator_client.hpp @@ -15,6 +15,7 @@ #ifdef MG_ENTERPRISE #include "coordination/coordinator_config.hpp" +#include "replication_coordination_glue/common.hpp" #include "rpc/client.hpp" #include "rpc_errors.hpp" #include "utils/result.hpp" @@ -59,6 +60,10 @@ class CoordinatorClient { auto SetCallbacks(HealthCheckCallback succ_cb, HealthCheckCallback fail_cb) -> void; + auto SendGetInstanceTimestampsRpc() const + -> utils::BasicResult>; + auto RpcClient() -> rpc::Client & { return rpc_client_; } friend bool operator==(CoordinatorClient const &first, CoordinatorClient const &second) { diff --git a/src/coordination/include/coordination/coordinator_handlers.hpp b/src/coordination/include/coordination/coordinator_handlers.hpp index b5638d9e9..a6cd5cf6d 100644 --- a/src/coordination/include/coordination/coordinator_handlers.hpp +++ b/src/coordination/include/coordination/coordinator_handlers.hpp @@ -36,6 +36,9 @@ class CoordinatorHandlers { static void GetInstanceUUIDHandler(replication::ReplicationHandler &replication_handler, slk::Reader *req_reader, slk::Builder *res_builder); + + static void GetInstanceTimestampsHandler(replication::ReplicationHandler &replication_handler, + slk::Reader *req_reader, slk::Builder *res_builder); }; } // namespace memgraph::dbms diff --git a/src/coordination/include/coordination/coordinator_rpc.hpp b/src/coordination/include/coordination/coordinator_rpc.hpp index eaf9ebfa6..722aa9cdc 100644 --- a/src/coordination/include/coordination/coordinator_rpc.hpp +++ b/src/coordination/include/coordination/coordinator_rpc.hpp @@ -15,6 +15,7 @@ #ifdef MG_ENTERPRISE #include "coordination/coordinator_config.hpp" +#include "replication_coordination_glue/common.hpp" #include "rpc/messages.hpp" #include "slk/serialization.hpp" @@ -109,6 +110,35 @@ struct GetInstanceUUIDRes { using GetInstanceUUIDRpc = rpc::RequestResponse; +struct GetInstanceTimestampsReq { + static const utils::TypeInfo kType; + static const utils::TypeInfo &GetTypeInfo() { return kType; } + + static void Load(GetInstanceTimestampsReq *self, memgraph::slk::Reader *reader); + static void Save(const GetInstanceTimestampsReq &self, memgraph::slk::Builder *builder); + + GetInstanceTimestampsReq() = default; + + bool success; +}; + +struct GetInstanceTimestampsRes { + static const utils::TypeInfo kType; + static const utils::TypeInfo &GetTypeInfo() { return kType; } + + static void Load(GetInstanceTimestampsRes *self, memgraph::slk::Reader *reader); + static void Save(const GetInstanceTimestampsRes &self, memgraph::slk::Builder *builder); + + explicit GetInstanceTimestampsRes( + const std::vector &replica_timestamps) + : replica_timestamps(replica_timestamps) {} + GetInstanceTimestampsRes() = default; + + std::vector replica_timestamps; +}; + +using GetInstanceTimestampsRpc = rpc::RequestResponse; + } // namespace memgraph::coordination // SLK serialization declarations @@ -132,6 +162,12 @@ void Load(memgraph::coordination::GetInstanceUUIDReq *self, memgraph::slk::Reade void Save(const memgraph::coordination::GetInstanceUUIDRes &self, memgraph::slk::Builder *builder); void Load(memgraph::coordination::GetInstanceUUIDRes *self, memgraph::slk::Reader *reader); +// GetInstanceTimestampsRpc +void Save(const memgraph::coordination::GetInstanceTimestampsReq &self, memgraph::slk::Builder *builder); +void Load(memgraph::coordination::GetInstanceTimestampsReq *self, memgraph::slk::Reader *reader); +void Save(const memgraph::coordination::GetInstanceTimestampsRes &self, memgraph::slk::Builder *builder); +void Load(memgraph::coordination::GetInstanceTimestampsRes *self, memgraph::slk::Reader *reader); + } // namespace memgraph::slk #endif diff --git a/src/coordination/include/coordination/coordinator_slk.hpp b/src/coordination/include/coordination/coordinator_slk.hpp index 49834be41..de940e339 100644 --- a/src/coordination/include/coordination/coordinator_slk.hpp +++ b/src/coordination/include/coordination/coordinator_slk.hpp @@ -14,6 +14,7 @@ #ifdef MG_ENTERPRISE #include "coordination/coordinator_config.hpp" +#include "replication_coordination_glue/common.hpp" #include "slk/serialization.hpp" #include "slk/streams.hpp" @@ -34,5 +35,20 @@ inline void Load(ReplicationClientInfo *obj, Reader *reader) { Load(&obj->replication_ip_address, reader); Load(&obj->replication_port, reader); } + +inline void Save(const replication_coordination_glue::ReplicationTimestampResult &obj, Builder *builder) { + Save(obj.db_uuid, builder); + Save(obj.history, builder); + Save(obj.last_commit_timestamp, builder); + Save(obj.epoch_id, builder); +} + +inline void Load(replication_coordination_glue::ReplicationTimestampResult *obj, Reader *reader) { + Load(&obj->db_uuid, reader); + Load(&obj->history, reader); + Load(&obj->last_commit_timestamp, reader); + Load(&obj->epoch_id, reader); +} + } // namespace memgraph::slk #endif diff --git a/src/coordination/include/coordination/rpc_errors.hpp b/src/coordination/include/coordination/rpc_errors.hpp index f6bfbf3e0..3829d430a 100644 --- a/src/coordination/include/coordination/rpc_errors.hpp +++ b/src/coordination/include/coordination/rpc_errors.hpp @@ -11,4 +11,5 @@ namespace memgraph::coordination { enum class GetInstanceUUIDError { NO_RESPONSE, RPC_EXCEPTION }; +enum class GetInstanceTimestampsError { NO_RESPONSE, RPC_EXCEPTION }; } // namespace memgraph::coordination diff --git a/src/dbms/dbms_handler.hpp b/src/dbms/dbms_handler.hpp index a373f751b..a0d5907c1 100644 --- a/src/dbms/dbms_handler.hpp +++ b/src/dbms/dbms_handler.hpp @@ -268,10 +268,6 @@ class DbmsHandler { bool IsMain() const { return repl_state_.IsMain(); } bool IsReplica() const { return repl_state_.IsReplica(); } -#ifdef MG_ENTERPRISE - // coordination::CoordinatorState &CoordinatorState() { return coordinator_state_; } -#endif - /** * @brief Return the statistics all databases. * diff --git a/src/replication_coordination_glue/CMakeLists.txt b/src/replication_coordination_glue/CMakeLists.txt index f81aed4ba..f452e1c1f 100644 --- a/src/replication_coordination_glue/CMakeLists.txt +++ b/src/replication_coordination_glue/CMakeLists.txt @@ -7,6 +7,7 @@ target_sources(mg-repl_coord_glue mode.hpp role.hpp handler.hpp + common.hpp PRIVATE messages.cpp diff --git a/src/replication_coordination_glue/common.hpp b/src/replication_coordination_glue/common.hpp new file mode 100644 index 000000000..98da242a7 --- /dev/null +++ b/src/replication_coordination_glue/common.hpp @@ -0,0 +1,31 @@ +// Copyright 2024 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +#pragma once + +#include "rpc/client.hpp" +#include "utils/uuid.hpp" + +#include +#include "messages.hpp" +#include "rpc/messages.hpp" +#include "utils/uuid.hpp" + +namespace memgraph::replication_coordination_glue { + +struct ReplicationTimestampResult { + memgraph::utils::UUID db_uuid; + std::vector> history; + uint64_t last_commit_timestamp; + std::string epoch_id; +}; + +} // namespace memgraph::replication_coordination_glue diff --git a/src/replication_handler/include/replication_handler/replication_handler.hpp b/src/replication_handler/include/replication_handler/replication_handler.hpp index db6c2d658..56cfdc529 100644 --- a/src/replication_handler/include/replication_handler/replication_handler.hpp +++ b/src/replication_handler/include/replication_handler/replication_handler.hpp @@ -13,6 +13,7 @@ #include "auth/auth.hpp" #include "dbms/dbms_handler.hpp" #include "replication/include/replication/state.hpp" +#include "replication_coordination_glue/common.hpp" #include "replication_handler/system_replication.hpp" #include "replication_handler/system_rpc.hpp" #include "utils/result.hpp" @@ -190,6 +191,8 @@ struct ReplicationHandler : public memgraph::query::ReplicationQueryHandler { auto GetReplicaUUID() -> std::optional; + auto GetTimestampsForEachDb() -> std::vector; + private: template auto RegisterReplica_(const memgraph::replication::ReplicationClientConfig &config, bool send_swap_uuid) diff --git a/src/replication_handler/replication_handler.cpp b/src/replication_handler/replication_handler.cpp index b09a68a19..9816d7dc9 100644 --- a/src/replication_handler/replication_handler.cpp +++ b/src/replication_handler/replication_handler.cpp @@ -272,6 +272,28 @@ auto ReplicationHandler::GetRole() const -> memgraph::replication_coordination_g return repl_state_.GetRole(); } +auto ReplicationHandler::GetTimestampsForEachDb() + -> std::vector { + std::vector results; + dbms_handler_.ForEach([&results](memgraph::dbms::DatabaseAccess db_acc) { + auto *storage = db_acc->storage(); + auto &repl_storage_state = storage->repl_storage_state_; + std::vector> history; + history.reserve(repl_storage_state.history.size()); + std::transform(repl_storage_state.history.begin(), repl_storage_state.history.end(), std::back_inserter(history), + [](const auto &elem) { return std::pair(elem.first, elem.second); }); + + replication_coordination_glue::ReplicationTimestampResult repl{ + .db_uuid = db_acc->storage()->uuid(), + .history = history, + .last_commit_timestamp = repl_storage_state.last_commit_timestamp_.load(), + .epoch_id = std::string(repl_storage_state.epoch_.id())}; + results.emplace_back(repl); + }); + + return results; +} + auto ReplicationHandler::GetReplicaUUID() -> std::optional { MG_ASSERT(repl_state_.IsReplica()); return std::get(repl_state_.ReplicationData()).uuid_; diff --git a/src/utils/typeinfo.hpp b/src/utils/typeinfo.hpp index 5f7f126ee..dde7130af 100644 --- a/src/utils/typeinfo.hpp +++ b/src/utils/typeinfo.hpp @@ -110,6 +110,8 @@ enum class TypeId : uint64_t { COORD_GET_UUID_REQ, COORD_GET_UUID_RES, + COORD_GET_INSTANCE_TIMESTAMPS_REQ, + COORD_GET_INSTANCE_TIMESTAMPS_RES, // AST AST_LABELIX = 3000,