diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index f19d66d8a..d6258f2c8 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -21,6 +21,7 @@ add_subdirectory(audit) add_subdirectory(dbms) add_subdirectory(flags) add_subdirectory(distributed) +add_subdirectory(replication) string(TOLOWER ${CMAKE_BUILD_TYPE} lower_build_type) diff --git a/src/kvstore/CMakeLists.txt b/src/kvstore/CMakeLists.txt index b3a119ed5..fee8b7b05 100644 --- a/src/kvstore/CMakeLists.txt +++ b/src/kvstore/CMakeLists.txt @@ -4,4 +4,5 @@ find_package(ZLIB REQUIRED) # STATIC library used to store key-value pairs add_library(mg-kvstore STATIC kvstore.cpp) +add_library(mg::kvstore ALIAS mg-kvstore) target_link_libraries(mg-kvstore stdc++fs mg-utils rocksdb BZip2::BZip2 ZLIB::ZLIB gflags) diff --git a/src/query/config.hpp b/src/query/config.hpp index 59a34e263..64e2da5bb 100644 --- a/src/query/config.hpp +++ b/src/query/config.hpp @@ -19,7 +19,7 @@ struct InterpreterConfig { bool allow_load_csv{true}; } query; - // The same as \ref memgraph::storage::replication::ReplicationClientConfig + // The same as \ref memgraph::replication::ReplicationClientConfig std::chrono::seconds replication_replica_check_frequency{1}; std::string default_kafka_bootstrap_servers; diff --git a/src/query/interpreter.cpp b/src/query/interpreter.cpp index 45a6e369b..6f9873b18 100644 --- a/src/query/interpreter.cpp +++ b/src/query/interpreter.cpp @@ -65,6 +65,7 @@ #include "query/stream/streams.hpp" #include "query/trigger.hpp" #include "query/typed_value.hpp" +#include "replication/config.hpp" #include "spdlog/spdlog.h" #include "storage/v2/disk/storage.hpp" #include "storage/v2/edge.hpp" @@ -72,7 +73,6 @@ #include "storage/v2/id_types.hpp" #include "storage/v2/inmemory/storage.hpp" #include "storage/v2/property_value.hpp" -#include "storage/v2/replication/config.hpp" #include "storage/v2/storage_error.hpp" #include "storage/v2/storage_mode.hpp" #include "utils/algorithm.hpp" @@ -98,6 +98,8 @@ #include "dbms/dbms_handler.hpp" #include "query/auth_query_handler.hpp" #include "query/interpreter_context.hpp" +#include "replication/state.hpp" +#include "storage/v2/replication/replication_handler.hpp" namespace memgraph::metrics { extern Event ReadQuery; @@ -144,7 +146,6 @@ constexpr auto kAlwaysFalse = false; namespace { template - void Sort(std::vector &vec) { std::sort(vec.begin(), vec.end()); } @@ -156,13 +157,15 @@ void Sort(std::vector &vec) { } // NOLINTNEXTLINE (misc-unused-parameters) -bool Same(const TypedValue &lv, const TypedValue &rv) { +[[maybe_unused]] bool Same(const TypedValue &lv, const TypedValue &rv) { return TypedValue(lv).ValueString() == TypedValue(rv).ValueString(); } // NOLINTNEXTLINE (misc-unused-parameters) bool Same(const TypedValue &lv, const std::string &rv) { return std::string(TypedValue(lv).ValueString()) == rv; } // NOLINTNEXTLINE (misc-unused-parameters) -bool Same(const std::string &lv, const TypedValue &rv) { return lv == std::string(TypedValue(rv).ValueString()); } +[[maybe_unused]] bool Same(const std::string &lv, const TypedValue &rv) { + return lv == std::string(TypedValue(rv).ValueString()); +} // NOLINTNEXTLINE (misc-unused-parameters) bool Same(const std::string &lv, const std::string &rv) { return lv == rv; } @@ -248,24 +251,40 @@ bool IsAllShortestPathsQuery(const std::vector &claus return false; } +inline auto convertToReplicationMode(const ReplicationQuery::SyncMode &sync_mode) -> replication::ReplicationMode { + switch (sync_mode) { + case ReplicationQuery::SyncMode::ASYNC: { + return replication::ReplicationMode::ASYNC; + } + case ReplicationQuery::SyncMode::SYNC: { + return replication::ReplicationMode::SYNC; + } + } + // TODO: C++23 std::unreachable() + return replication::ReplicationMode::ASYNC; +} + class ReplQueryHandler final : public query::ReplicationQueryHandler { public: - explicit ReplQueryHandler(storage::Storage *db) : db_(db) {} + explicit ReplQueryHandler(storage::Storage *db) : db_(db), handler_{db_->repl_state_, *db_} {} /// @throw QueryRuntimeException if an error ocurred. void SetReplicationRole(ReplicationQuery::ReplicationRole replication_role, std::optional port) override { if (replication_role == ReplicationQuery::ReplicationRole::MAIN) { - if (!db_->SetMainReplicationRole()) { + if (!handler_.SetReplicationRoleMain()) { throw QueryRuntimeException("Couldn't set role to main!"); } } else { if (!port || *port < 0 || *port > std::numeric_limits::max()) { throw QueryRuntimeException("Port number invalid!"); } - if (!db_->SetReplicaRole(storage::replication::ReplicationServerConfig{ - .ip_address = storage::replication::kDefaultReplicationServerIp, - .port = static_cast(*port), - })) { + + auto const config = memgraph::replication::ReplicationServerConfig{ + .ip_address = memgraph::replication::kDefaultReplicationServerIp, + .port = static_cast(*port), + }; + + if (!handler_.SetReplicationRoleReplica(config)) { throw QueryRuntimeException("Couldn't set role to replica!"); } } @@ -273,10 +292,10 @@ class ReplQueryHandler final : public query::ReplicationQueryHandler { /// @throw QueryRuntimeException if an error ocurred. ReplicationQuery::ReplicationRole ShowReplicationRole() const override { - switch (db_->GetReplicationRole()) { - case storage::replication::ReplicationRole::MAIN: + switch (handler_.GetRole()) { + case memgraph::replication::ReplicationRole::MAIN: return ReplicationQuery::ReplicationRole::MAIN; - case storage::replication::ReplicationRole::REPLICA: + case memgraph::replication::ReplicationRole::REPLICA: return ReplicationQuery::ReplicationRole::REPLICA; } throw QueryRuntimeException("Couldn't show replication role - invalid role set!"); @@ -286,39 +305,29 @@ class ReplQueryHandler final : public query::ReplicationQueryHandler { void RegisterReplica(const std::string &name, const std::string &socket_address, const ReplicationQuery::SyncMode sync_mode, const std::chrono::seconds replica_check_frequency) override { - if (db_->GetReplicationRole() == storage::replication::ReplicationRole::REPLICA) { + if (handler_.IsReplica()) { // replica can't register another replica throw QueryRuntimeException("Replica can't register another replica!"); } - if (name == storage::replication::kReservedReplicationRoleName) { + if (name == memgraph::replication::kReservedReplicationRoleName) { throw QueryRuntimeException("This replica name is reserved and can not be used as replica name!"); } - storage::replication::ReplicationMode repl_mode; - switch (sync_mode) { - case ReplicationQuery::SyncMode::ASYNC: { - repl_mode = storage::replication::ReplicationMode::ASYNC; - break; - } - case ReplicationQuery::SyncMode::SYNC: { - repl_mode = storage::replication::ReplicationMode::SYNC; - break; - } - } + auto repl_mode = convertToReplicationMode(sync_mode); auto maybe_ip_and_port = - io::network::Endpoint::ParseSocketOrIpAddress(socket_address, storage::replication::kDefaultReplicationPort); + io::network::Endpoint::ParseSocketOrIpAddress(socket_address, memgraph::replication::kDefaultReplicationPort); if (maybe_ip_and_port) { auto [ip, port] = *maybe_ip_and_port; - auto ret = db_->RegisterReplica( - storage::replication::RegistrationMode::MUST_BE_INSTANTLY_VALID, - storage::replication::ReplicationClientConfig{.name = name, - .mode = repl_mode, - .ip_address = ip, - .port = port, - .replica_check_frequency = replica_check_frequency, - .ssl = std::nullopt}); + auto config = replication::ReplicationClientConfig{.name = name, + .mode = repl_mode, + .ip_address = ip, + .port = port, + .replica_check_frequency = replica_check_frequency, + .ssl = std::nullopt}; + using storage::RegistrationMode; + auto ret = handler_.RegisterReplica(RegistrationMode::MUST_BE_INSTANTLY_VALID, config); if (ret.HasError()) { throw QueryRuntimeException(fmt::format("Couldn't register replica '{}'!", name)); } @@ -327,20 +336,26 @@ class ReplQueryHandler final : public query::ReplicationQueryHandler { } } - /// @throw QueryRuntimeException if an error ocurred. - void DropReplica(const std::string &replica_name) override { - if (db_->GetReplicationRole() == storage::replication::ReplicationRole::REPLICA) { - // replica can't unregister a replica - throw QueryRuntimeException("Replica can't unregister a replica!"); - } - if (!db_->UnregisterReplica(replica_name)) { - throw QueryRuntimeException(fmt::format("Couldn't unregister the replica '{}'", replica_name)); + /// @throw QueryRuntimeException if an error occurred. + void DropReplica(std::string_view replica_name) override { + auto const result = handler_.UnregisterReplica(replica_name); + switch (result) { + using enum memgraph::storage::UnregisterReplicaResult; + case NOT_MAIN: + throw QueryRuntimeException("Replica can't unregister a replica!"); + case COULD_NOT_BE_PERSISTED: + [[fallthrough]]; + case CAN_NOT_UNREGISTER: + throw QueryRuntimeException(fmt::format("Couldn't unregister the replica '{}'", replica_name)); + case SUCCESS: + break; } } using Replica = ReplicationQueryHandler::Replica; std::vector ShowReplicas() const override { - if (db_->GetReplicationRole() == storage::replication::ReplicationRole::REPLICA) { + auto const &replState = db_->repl_state_; + if (replState.IsReplica()) { // replica can't show registered replicas (it shouldn't have any) throw QueryRuntimeException("Replica can't show registered replicas (it shouldn't have any)!"); } @@ -354,10 +369,10 @@ class ReplQueryHandler final : public query::ReplicationQueryHandler { replica.name = repl_info.name; replica.socket_address = repl_info.endpoint.SocketAddress(); switch (repl_info.mode) { - case storage::replication::ReplicationMode::SYNC: + case memgraph::replication::ReplicationMode::SYNC: replica.sync_mode = ReplicationQuery::SyncMode::SYNC; break; - case storage::replication::ReplicationMode::ASYNC: + case memgraph::replication::ReplicationMode::ASYNC: replica.sync_mode = ReplicationQuery::SyncMode::ASYNC; break; } @@ -390,6 +405,7 @@ class ReplQueryHandler final : public query::ReplicationQueryHandler { private: storage::Storage *db_; + storage::ReplicationHandler handler_; }; /// returns false if the replication role can't be set @@ -1370,18 +1386,19 @@ bool IsWriteQueryOnMainMemoryReplica(storage::Storage *storage, const query::plan::ReadWriteTypeChecker::RWType query_type) { if (auto storage_mode = storage->GetStorageMode(); storage_mode == storage::StorageMode::IN_MEMORY_ANALYTICAL || storage_mode == storage::StorageMode::IN_MEMORY_TRANSACTIONAL) { - return (storage->GetReplicationRole() == storage::replication::ReplicationRole::REPLICA) && - (query_type == RWType::W || query_type == RWType::RW); + auto const &replState = storage->repl_state_; + return replState.IsReplica() && (query_type == RWType::W || query_type == RWType::RW); } return false; } -storage::replication::ReplicationRole GetReplicaRole(storage::Storage *storage) { +bool IsReplica(storage::Storage *storage) { if (auto storage_mode = storage->GetStorageMode(); storage_mode == storage::StorageMode::IN_MEMORY_ANALYTICAL || storage_mode == storage::StorageMode::IN_MEMORY_TRANSACTIONAL) { - return storage->GetReplicationRole(); + auto const &replState = storage->repl_state_; + return replState.IsReplica(); } - return storage::replication::ReplicationRole::MAIN; + return false; } } // namespace @@ -2784,7 +2801,7 @@ PreparedQuery PrepareCreateSnapshotQuery(ParsedQuery parsed_query, bool in_expli std::move(parsed_query.required_privileges), [storage](AnyStream * /*stream*/, std::optional /*n*/) -> std::optional { auto *mem_storage = static_cast(storage); - if (auto maybe_error = mem_storage->CreateSnapshot({}); maybe_error.HasError()) { + if (auto maybe_error = mem_storage->CreateSnapshot(storage->repl_state_, {}); maybe_error.HasError()) { switch (maybe_error.GetError()) { case storage::InMemoryStorage::CreateSnapshotError::DisabledForReplica: throw utils::BasicException( @@ -3328,7 +3345,7 @@ PreparedQuery PrepareMultiDatabaseQuery(ParsedQuery parsed_query, CurrentDB &cur } // TODO: Remove once replicas support multi-tenant replication if (!current_db.db_acc_) throw DatabaseContextRequiredException("Multi database queries require a defined database."); - if (GetReplicaRole(current_db.db_acc_->get()->storage()) == storage::replication::ReplicationRole::REPLICA) { + if (IsReplica(current_db.db_acc_->get()->storage())) { throw QueryException("Query forbidden on the replica!"); } @@ -3471,7 +3488,8 @@ PreparedQuery PrepareShowDatabasesQuery(ParsedQuery parsed_query, CurrentDB &cur throw QueryException("Trying to use enterprise feature without a valid license."); } // TODO: Remove once replicas support multi-tenant replication - if (GetReplicaRole(storage) == storage::replication::ReplicationRole::REPLICA) { + auto &replState = storage->repl_state_; + if (replState.IsReplica()) { throw QueryException("SHOW DATABASES forbidden on the replica!"); } diff --git a/src/query/interpreter.hpp b/src/query/interpreter.hpp index bcd1f30eb..515419070 100644 --- a/src/query/interpreter.hpp +++ b/src/query/interpreter.hpp @@ -95,7 +95,7 @@ class ReplicationQueryHandler { const std::chrono::seconds replica_check_frequency) = 0; /// @throw QueryRuntimeException if an error ocurred. - virtual void DropReplica(const std::string &replica_name) = 0; + virtual void DropReplica(std::string_view replica_name) = 0; /// @throw QueryRuntimeException if an error ocurred. virtual std::vector ShowReplicas() const = 0; diff --git a/src/query/procedure/mg_procedure_impl.cpp b/src/query/procedure/mg_procedure_impl.cpp index 0294b6f73..eddba6d43 100644 --- a/src/query/procedure/mg_procedure_impl.cpp +++ b/src/query/procedure/mg_procedure_impl.cpp @@ -1940,7 +1940,7 @@ void mgp_vertex_destroy(mgp_vertex *v) { DeleteRawMgpObject(v); } mgp_error mgp_vertex_equal(mgp_vertex *v1, mgp_vertex *v2, int *result) { // NOLINTNEXTLINE(clang-diagnostic-unevaluated-expression) - static_assert(noexcept(*result = *v1 == *v2 ? 1 : 0)); + static_assert(noexcept(*v1 == *v2)); *result = *v1 == *v2 ? 1 : 0; return mgp_error::MGP_ERROR_NO_ERROR; } @@ -2313,7 +2313,7 @@ void mgp_edge_destroy(mgp_edge *e) { DeleteRawMgpObject(e); } mgp_error mgp_edge_equal(mgp_edge *e1, mgp_edge *e2, int *result) { // NOLINTNEXTLINE(clang-diagnostic-unevaluated-expression) - static_assert(noexcept(*result = *e1 == *e2 ? 1 : 0)); + static_assert(noexcept(*e1 == *e2)); *result = *e1 == *e2 ? 1 : 0; return mgp_error::MGP_ERROR_NO_ERROR; } diff --git a/src/replication/CMakeLists.txt b/src/replication/CMakeLists.txt new file mode 100644 index 000000000..4170cd6a8 --- /dev/null +++ b/src/replication/CMakeLists.txt @@ -0,0 +1,24 @@ +add_library(mg-replication STATIC) +add_library(mg::replication ALIAS mg-replication) +target_sources(mg-replication + PUBLIC + include/replication/state.hpp + include/replication/epoch.hpp + include/replication/config.hpp + include/replication/mode.hpp + include/replication/role.hpp + include/replication/status.hpp + + PRIVATE + state.cpp + epoch.cpp + config.cpp + status.cpp +) +target_include_directories(mg-replication PUBLIC include) + +find_package(fmt REQUIRED) +target_link_libraries(mg-replication + PUBLIC mg::utils mg::kvstore lib::json + PRIVATE fmt::fmt +) diff --git a/src/replication/config.cpp b/src/replication/config.cpp new file mode 100644 index 000000000..c4bba4341 --- /dev/null +++ b/src/replication/config.cpp @@ -0,0 +1,11 @@ +// Copyright 2023 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. +#include "replication/config.hpp" diff --git a/src/replication/epoch.cpp b/src/replication/epoch.cpp new file mode 100644 index 000000000..8561e8129 --- /dev/null +++ b/src/replication/epoch.cpp @@ -0,0 +1,12 @@ +// Copyright 2023 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +#include "replication/epoch.hpp" diff --git a/src/storage/v2/replication/config.hpp b/src/replication/include/replication/config.hpp similarity index 80% rename from src/storage/v2/replication/config.hpp rename to src/replication/include/replication/config.hpp index d96e1ee76..975fb6b4d 100644 --- a/src/storage/v2/replication/config.hpp +++ b/src/replication/include/replication/config.hpp @@ -12,12 +12,17 @@ #pragma once #include +#include #include #include +#include "replication/mode.hpp" -#include "storage/v2/replication/enums.hpp" +namespace memgraph::replication { + +inline constexpr uint16_t kDefaultReplicationPort = 10000; +inline constexpr auto *kDefaultReplicationServerIp = "0.0.0.0"; +inline constexpr auto *kReservedReplicationRoleName{"__replication_role"}; -namespace memgraph::storage::replication { struct ReplicationClientConfig { std::string name; ReplicationMode mode; @@ -51,4 +56,4 @@ struct ReplicationServerConfig { std::optional ssl; }; -} // namespace memgraph::storage::replication +} // namespace memgraph::replication diff --git a/src/replication/include/replication/epoch.hpp b/src/replication/include/replication/epoch.hpp new file mode 100644 index 000000000..9cf04d11c --- /dev/null +++ b/src/replication/include/replication/epoch.hpp @@ -0,0 +1,49 @@ +// Copyright 2023 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +#pragma once +#include +#include + +#include "utils/uuid.hpp" + +namespace memgraph::replication { + +struct ReplicationEpoch { + ReplicationEpoch() : id_(memgraph::utils::GenerateUUID()) {} + ReplicationEpoch(ReplicationEpoch const &) = delete; + ReplicationEpoch(ReplicationEpoch &&) = delete; + ReplicationEpoch &operator=(ReplicationEpoch const &) = delete; + ReplicationEpoch &operator=(ReplicationEpoch &&) = delete; + + auto id() const -> std::string_view { return id_; } + + auto NewEpoch() -> std::string { return std::exchange(id_, memgraph::utils::GenerateUUID()); } + auto SetEpoch(std::string new_epoch) -> std::string { return std::exchange(id_, std::move(new_epoch)); } + + private: + // UUID to distinguish different main instance runs for replication process + // on SAME storage. + // Multiple instances can have same storage UUID and be MAIN at the same time. + // We cannot compare commit timestamps of those instances if one of them + // becomes the replica of the other so we use epoch_id_ as additional + // discriminating property. + // Example of this: + // We have 2 instances of the same storage, S1 and S2. + // S1 and S2 are MAIN and accept their own commits and write them to the WAL. + // At the moment when S1 commited a transaction with timestamp 20, and S2 + // a different transaction with timestamp 15, we change S2's role to REPLICA + // and register it on S1. + // Without using the epoch_id, we don't know that S1 and S2 have completely + // different transactions, we think that the S2 is behind only by 5 commits. + std::string id_; +}; +} // namespace memgraph::replication diff --git a/src/replication/include/replication/mode.hpp b/src/replication/include/replication/mode.hpp new file mode 100644 index 000000000..c1afe2b1f --- /dev/null +++ b/src/replication/include/replication/mode.hpp @@ -0,0 +1,16 @@ +// Copyright 2023 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +#pragma once +#include +namespace memgraph::replication { +enum class ReplicationMode : std::uint8_t { SYNC, ASYNC }; +} diff --git a/src/replication/include/replication/role.hpp b/src/replication/include/replication/role.hpp new file mode 100644 index 000000000..54fbab9f0 --- /dev/null +++ b/src/replication/include/replication/role.hpp @@ -0,0 +1,18 @@ +// Copyright 2023 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +#pragma once + +#include +namespace memgraph::replication { + +enum class ReplicationRole : uint8_t { MAIN, REPLICA }; +} diff --git a/src/replication/include/replication/state.hpp b/src/replication/include/replication/state.hpp new file mode 100644 index 000000000..ac5b2841c --- /dev/null +++ b/src/replication/include/replication/state.hpp @@ -0,0 +1,69 @@ +// Copyright 2023 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +#pragma once + +#include +#include +#include +#include + +#include "kvstore/kvstore.hpp" +#include "replication/config.hpp" +#include "replication/epoch.hpp" +#include "replication/mode.hpp" +#include "replication/role.hpp" +#include "utils/result.hpp" + +namespace memgraph::replication { + +enum class RolePersisted : uint8_t { UNKNOWN_OR_NO, YES }; + +struct ReplicationState { + ReplicationState(std::optional durability_dir); + + ReplicationState(ReplicationState const &) = delete; + ReplicationState(ReplicationState &&) = delete; + ReplicationState &operator=(ReplicationState const &) = delete; + ReplicationState &operator=(ReplicationState &&) = delete; + + void SetRole(ReplicationRole role) { return replication_role_.store(role); } + auto GetRole() const -> ReplicationRole { return replication_role_.load(); } + bool IsMain() const { return replication_role_ == ReplicationRole::MAIN; } + bool IsReplica() const { return replication_role_ == ReplicationRole::REPLICA; } + + auto GetEpoch() const -> const ReplicationEpoch & { return epoch_; } + auto GetEpoch() -> ReplicationEpoch & { return epoch_; } + + enum class FetchReplicationError : uint8_t { + NOTHING_FETCHED, + PARSE_ERROR, + }; + using ReplicationDataReplica = ReplicationServerConfig; + using ReplicationDataMain = std::vector; + using ReplicationData = std::variant; + using FetchReplicationResult = utils::BasicResult; + auto FetchReplicationData() -> FetchReplicationResult; + + bool ShouldPersist() const { return nullptr != durability_; } + bool TryPersistRoleMain(); + bool TryPersistRoleReplica(const ReplicationServerConfig &config); + bool TryPersistUnregisterReplica(std::string_view &name); + bool TryPersistRegisteredReplica(const ReplicationClientConfig &config); + + private: + ReplicationEpoch epoch_; + std::atomic replication_role_{ReplicationRole::MAIN}; + std::unique_ptr durability_; + std::atomic role_persisted = RolePersisted::UNKNOWN_OR_NO; +}; + +} // namespace memgraph::replication diff --git a/src/storage/v2/replication/replication_persistence_helper.hpp b/src/replication/include/replication/status.hpp similarity index 72% rename from src/storage/v2/replication/replication_persistence_helper.hpp rename to src/replication/include/replication/status.hpp index df6cb03b9..b158cf919 100644 --- a/src/storage/v2/replication/replication_persistence_helper.hpp +++ b/src/replication/include/replication/status.hpp @@ -12,21 +12,16 @@ #pragma once #include -#include +#include #include #include -#include +#include "json/json.hpp" -#include "storage/v2/replication/config.hpp" -#include "storage/v2/replication/enums.hpp" - -namespace memgraph::storage::replication { - -inline constexpr auto *kReservedReplicationRoleName{"__replication_role"}; -inline constexpr uint16_t kDefaultReplicationPort = 10000; -inline constexpr auto *kDefaultReplicationServerIp = "0.0.0.0"; +#include "replication/config.hpp" +#include "replication/role.hpp" +namespace memgraph::replication { struct ReplicationStatus { std::string name; std::string ip_address; @@ -40,6 +35,5 @@ struct ReplicationStatus { }; nlohmann::json ReplicationStatusToJSON(ReplicationStatus &&status); - std::optional JSONToReplicationStatus(nlohmann::json &&data); -} // namespace memgraph::storage::replication +} // namespace memgraph::replication diff --git a/src/replication/state.cpp b/src/replication/state.cpp new file mode 100644 index 000000000..45984d317 --- /dev/null +++ b/src/replication/state.cpp @@ -0,0 +1,144 @@ +// Copyright 2023 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +#include "replication/state.hpp" + +#include "replication/status.hpp" //TODO: don't use status for durability +#include "utils/file.hpp" + +constexpr auto kReplicationDirectory = std::string_view{"replication"}; + +namespace memgraph::replication { + +ReplicationState::ReplicationState(std::optional durability_dir) { + if (!durability_dir) return; + auto repl_dir = *std::move(durability_dir); + repl_dir /= kReplicationDirectory; + utils::EnsureDirOrDie(repl_dir); + durability_ = std::make_unique(std::move(repl_dir)); +} +bool ReplicationState::TryPersistRoleReplica(const ReplicationServerConfig &config) { + if (!ShouldPersist()) return true; + // Only thing that matters here is the role saved as REPLICA and the listening port + auto data = ReplicationStatusToJSON(ReplicationStatus{.name = kReservedReplicationRoleName, + .ip_address = config.ip_address, + .port = config.port, + .sync_mode = ReplicationMode::SYNC, + .replica_check_frequency = std::chrono::seconds(0), + .ssl = std::nullopt, + .role = ReplicationRole::REPLICA}); + + if (durability_->Put(kReservedReplicationRoleName, data.dump())) { + role_persisted = RolePersisted::YES; + return true; + } + spdlog::error("Error when saving REPLICA replication role in settings."); + return false; +} +bool ReplicationState::TryPersistRoleMain() { + if (!ShouldPersist()) return true; + // Only thing that matters here is the role saved as MAIN + auto data = ReplicationStatusToJSON(ReplicationStatus{.name = kReservedReplicationRoleName, + .ip_address = "", + .port = 0, + .sync_mode = ReplicationMode::SYNC, + .replica_check_frequency = std::chrono::seconds(0), + .ssl = std::nullopt, + .role = ReplicationRole::MAIN}); + + if (durability_->Put(kReservedReplicationRoleName, data.dump())) { + role_persisted = RolePersisted::YES; + return true; + } + spdlog::error("Error when saving MAIN replication role in settings."); + return false; +} +bool ReplicationState::TryPersistUnregisterReplica(std::string_view &name) { + if (!ShouldPersist()) return true; + if (durability_->Delete(name)) return true; + spdlog::error("Error when removing replica {} from settings.", name); + return false; +} +auto ReplicationState::FetchReplicationData() -> FetchReplicationResult { + if (!ShouldPersist()) return FetchReplicationError::NOTHING_FETCHED; + const auto replication_data = durability_->Get(kReservedReplicationRoleName); + if (!replication_data.has_value()) { + return FetchReplicationError::NOTHING_FETCHED; + } + + const auto maybe_replication_status = JSONToReplicationStatus(nlohmann::json::parse(*replication_data)); + if (!maybe_replication_status.has_value()) { + return FetchReplicationError::PARSE_ERROR; + } + + // To get here this must be the case + role_persisted = memgraph::replication::RolePersisted::YES; + + const auto replication_status = *maybe_replication_status; + auto role = replication_status.role.value_or(ReplicationRole::MAIN); + switch (role) { + case ReplicationRole::REPLICA: { + return {ReplicationServerConfig{ + .ip_address = kDefaultReplicationServerIp, + .port = replication_status.port, + }}; + } + case ReplicationRole::MAIN: { + auto res = ReplicationState::ReplicationDataMain{}; + res.reserve(durability_->Size() - 1); + for (const auto &[replica_name, replica_data] : *durability_) { + if (replica_name == kReservedReplicationRoleName) { + continue; + } + + const auto maybe_replica_status = JSONToReplicationStatus(nlohmann::json::parse(replica_data)); + if (!maybe_replica_status.has_value()) { + return FetchReplicationError::PARSE_ERROR; + } + + auto replica_status = *maybe_replica_status; + if (replica_status.name != replica_name) { + return FetchReplicationError::PARSE_ERROR; + } + res.emplace_back(ReplicationClientConfig{ + .name = replica_status.name, + .mode = replica_status.sync_mode, + .ip_address = replica_status.ip_address, + .port = replica_status.port, + .replica_check_frequency = replica_status.replica_check_frequency, + .ssl = replica_status.ssl, + }); + } + return {std::move(res)}; + } + } +} +bool ReplicationState::TryPersistRegisteredReplica(const ReplicationClientConfig &config) { + if (!ShouldPersist()) return true; + + // If any replicas are persisted then Role must be persisted + if (role_persisted != RolePersisted::YES) { + DMG_ASSERT(IsMain(), "MAIN is expected"); + if (!TryPersistRoleMain()) return false; + } + + auto data = ReplicationStatusToJSON(ReplicationStatus{.name = config.name, + .ip_address = config.ip_address, + .port = config.port, + .sync_mode = config.mode, + .replica_check_frequency = config.replica_check_frequency, + .ssl = config.ssl, + .role = ReplicationRole::REPLICA}); + if (durability_->Put(config.name, data.dump())) return true; + spdlog::error("Error when saving replica {} in settings.", config.name); + return false; +} +} // namespace memgraph::replication diff --git a/src/storage/v2/replication/replication_persistence_helper.cpp b/src/replication/status.cpp similarity index 76% rename from src/storage/v2/replication/replication_persistence_helper.cpp rename to src/replication/status.cpp index 262938a82..711b1f955 100644 --- a/src/storage/v2/replication/replication_persistence_helper.cpp +++ b/src/replication/status.cpp @@ -8,24 +8,21 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0, included in the file // licenses/APL.txt. +#include "replication/status.hpp" -#include "storage/v2/replication/replication_persistence_helper.hpp" - -#include "storage/v2/replication/enums.hpp" +#include "fmt/format.h" #include "utils/logging.hpp" -namespace { -inline constexpr auto *kReplicaName = "replica_name"; -inline constexpr auto *kIpAddress = "replica_ip_address"; -inline constexpr auto *kPort = "replica_port"; -inline constexpr auto *kSyncMode = "replica_sync_mode"; -inline constexpr auto *kCheckFrequency = "replica_check_frequency"; -inline constexpr auto *kSSLKeyFile = "replica_ssl_key_file"; -inline constexpr auto *kSSLCertFile = "replica_ssl_cert_file"; -inline constexpr auto *kReplicationRole = "replication_role"; -} // namespace +constexpr auto *kReplicaName = "replica_name"; +constexpr auto *kIpAddress = "replica_ip_address"; +constexpr auto *kPort = "replica_port"; +constexpr auto *kSyncMode = "replica_sync_mode"; +constexpr auto *kCheckFrequency = "replica_check_frequency"; +constexpr auto *kSSLKeyFile = "replica_ssl_key_file"; +constexpr auto *kSSLCertFile = "replica_ssl_cert_file"; +constexpr auto *kReplicationRole = "replication_role"; -namespace memgraph::storage::replication { +namespace memgraph::replication { nlohmann::json ReplicationStatusToJSON(ReplicationStatus &&status) { auto data = nlohmann::json::object(); @@ -51,7 +48,6 @@ nlohmann::json ReplicationStatusToJSON(ReplicationStatus &&status) { return data; } - std::optional JSONToReplicationStatus(nlohmann::json &&data) { ReplicationStatus replica_status; @@ -73,13 +69,13 @@ std::optional JSONToReplicationStatus(nlohmann::json &&data) MG_ASSERT(key_file.is_null() == cert_file.is_null()); if (!key_file.is_null()) { - replica_status.ssl = replication::ReplicationClientConfig::SSL{}; + replica_status.ssl = ReplicationClientConfig::SSL{}; data.at(kSSLKeyFile).get_to(replica_status.ssl->key_file); data.at(kSSLCertFile).get_to(replica_status.ssl->cert_file); } if (data.find(kReplicationRole) != data.end()) { - replica_status.role = replication::ReplicationRole::MAIN; + replica_status.role = ReplicationRole::MAIN; data.at(kReplicationRole).get_to(replica_status.role.value()); } } catch (const nlohmann::json::type_error &exception) { @@ -92,4 +88,4 @@ std::optional JSONToReplicationStatus(nlohmann::json &&data) return replica_status; } -} // namespace memgraph::storage::replication +} // namespace memgraph::replication diff --git a/src/storage/v2/CMakeLists.txt b/src/storage/v2/CMakeLists.txt index 7c9c567ed..a6eaebe24 100644 --- a/src/storage/v2/CMakeLists.txt +++ b/src/storage/v2/CMakeLists.txt @@ -36,13 +36,13 @@ add_library(mg-storage-v2 STATIC replication/replication_server.cpp replication/serialization.cpp replication/slk.cpp - replication/replication_persistence_helper.cpp replication/rpc.cpp - replication/replication.cpp + replication/replication_storage_state.cpp + replication/replication_handler.cpp inmemory/replication/replication_server.cpp inmemory/replication/replication_client.cpp ) -target_link_libraries(mg-storage-v2 Threads::Threads mg-utils gflags absl::flat_hash_map mg-rpc mg-slk) +target_link_libraries(mg-storage-v2 mg::replication Threads::Threads mg-utils gflags absl::flat_hash_map mg-rpc mg-slk) # Until we get LTO there is an advantage to do some unity builds set_target_properties(mg-storage-v2 diff --git a/src/storage/v2/disk/storage.hpp b/src/storage/v2/disk/storage.hpp index 0ff76710f..af5f24d5f 100644 --- a/src/storage/v2/disk/storage.hpp +++ b/src/storage/v2/disk/storage.hpp @@ -346,19 +346,21 @@ class DiskStorage final : public Storage { void FreeMemory(std::unique_lock /*lock*/) override {} - void EstablishNewEpoch() override { throw utils::BasicException("Disk storage mode does not support replication."); } + void PrepareForNewEpoch(std::string prev_epoch) override { + throw utils::BasicException("Disk storage mode does not support replication."); + } uint64_t CommitTimestamp(std::optional desired_commit_timestamp = {}); EdgeImportMode edge_import_status_{EdgeImportMode::INACTIVE}; std::unique_ptr edge_import_mode_cache_{nullptr}; - auto CreateReplicationClient(replication::ReplicationClientConfig const &config) + auto CreateReplicationClient(const memgraph::replication::ReplicationClientConfig &config) -> std::unique_ptr override { throw utils::BasicException("Disk storage mode does not support replication."); } - auto CreateReplicationServer(const replication::ReplicationServerConfig &config) + auto CreateReplicationServer(const memgraph::replication::ReplicationServerConfig &config) -> std::unique_ptr override { throw utils::BasicException("Disk storage mode does not support replication."); } diff --git a/src/storage/v2/durability/durability.cpp b/src/storage/v2/durability/durability.cpp index e3ba88026..aa0730e6f 100644 --- a/src/storage/v2/durability/durability.cpp +++ b/src/storage/v2/durability/durability.cpp @@ -24,6 +24,7 @@ #include #include +#include "replication/epoch.hpp" #include "storage/v2/durability/paths.hpp" #include "storage/v2/durability/snapshot.hpp" #include "storage/v2/durability/wal.hpp" @@ -210,7 +211,7 @@ void RecoverIndicesAndConstraints(const RecoveredIndicesAndConstraints &indices_ std::optional RecoverData(const std::filesystem::path &snapshot_directory, const std::filesystem::path &wal_directory, std::string *uuid, - std::string *epoch_id, + memgraph::replication::ReplicationEpoch &epoch, std::deque> *epoch_history, utils::SkipList *vertices, utils::SkipList *edges, std::atomic *edge_count, NameIdMapper *name_id_mapper, @@ -263,7 +264,7 @@ std::optional RecoverData(const std::filesystem::path &snapshot_di recovery_info = recovered_snapshot->recovery_info; indices_constraints = std::move(recovered_snapshot->indices_constraints); snapshot_timestamp = recovered_snapshot->snapshot_info.start_timestamp; - *epoch_id = std::move(recovered_snapshot->snapshot_info.epoch_id); + epoch.SetEpoch(std::move(recovered_snapshot->snapshot_info.epoch_id)); if (!utils::DirExists(wal_directory)) { const auto par_exec_info = config.durability.allow_parallel_index_creation @@ -308,7 +309,7 @@ std::optional RecoverData(const std::filesystem::path &snapshot_di // UUID used for durability is the UUID of the last WAL file. // Same for the epoch id. *uuid = std::move(wal_files.back().uuid); - *epoch_id = std::move(wal_files.back().epoch_id); + epoch.SetEpoch(std::move(wal_files.back().epoch_id)); } auto maybe_wal_files = GetWalFiles(wal_directory, *uuid); @@ -364,7 +365,7 @@ std::optional RecoverData(const std::filesystem::path &snapshot_di } previous_seq_num = wal_file.seq_num; - if (wal_file.epoch_id != *epoch_id) { + if (wal_file.epoch_id != epoch.id()) { // This way we skip WALs finalized only because of role change. // We can also set the last timestamp to 0 if last loaded timestamp // is nullopt as this can only happen if the WAL file with seq = 0 @@ -372,7 +373,7 @@ std::optional RecoverData(const std::filesystem::path &snapshot_di if (last_loaded_timestamp) { epoch_history->emplace_back(wal_file.epoch_id, *last_loaded_timestamp); } - *epoch_id = std::move(wal_file.epoch_id); + epoch.SetEpoch(std::move(wal_file.epoch_id)); } try { auto info = LoadWal(wal_file.path, &indices_constraints, last_loaded_timestamp, vertices, edges, name_id_mapper, diff --git a/src/storage/v2/durability/durability.hpp b/src/storage/v2/durability/durability.hpp index 99efde6b6..b68e18071 100644 --- a/src/storage/v2/durability/durability.hpp +++ b/src/storage/v2/durability/durability.hpp @@ -18,6 +18,7 @@ #include #include +#include "replication/epoch.hpp" #include "storage/v2/config.hpp" #include "storage/v2/constraints/constraints.hpp" #include "storage/v2/durability/metadata.hpp" @@ -109,7 +110,7 @@ void RecoverIndicesAndConstraints( /// @throw std::bad_alloc std::optional RecoverData(const std::filesystem::path &snapshot_directory, const std::filesystem::path &wal_directory, std::string *uuid, - std::string *epoch_id, + memgraph::replication::ReplicationEpoch &epoch, std::deque> *epoch_history, utils::SkipList *vertices, utils::SkipList *edges, std::atomic *edge_count, NameIdMapper *name_id_mapper, diff --git a/src/storage/v2/durability/paths.hpp b/src/storage/v2/durability/paths.hpp index 557e4daa9..7e3083937 100644 --- a/src/storage/v2/durability/paths.hpp +++ b/src/storage/v2/durability/paths.hpp @@ -1,4 +1,4 @@ -// Copyright 2022 Memgraph Ltd. +// Copyright 2023 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source @@ -22,7 +22,6 @@ static const std::string kSnapshotDirectory{"snapshots"}; static const std::string kWalDirectory{"wal"}; static const std::string kBackupDirectory{".backup"}; static const std::string kLockFile{".lock"}; -static const std::string kReplicationDirectory{"replication"}; // This is the prefix used for Snapshot and WAL filenames. It is a timestamp // format that equals to: YYYYmmddHHMMSSffffff diff --git a/src/storage/v2/durability/snapshot.cpp b/src/storage/v2/durability/snapshot.cpp index 4e97e83d2..6e173fb06 100644 --- a/src/storage/v2/durability/snapshot.cpp +++ b/src/storage/v2/durability/snapshot.cpp @@ -1714,7 +1714,8 @@ void CreateSnapshot(Transaction *transaction, const std::filesystem::path &snaps const std::filesystem::path &wal_directory, uint64_t snapshot_retention_count, utils::SkipList *vertices, utils::SkipList *edges, NameIdMapper *name_id_mapper, Indices *indices, Constraints *constraints, const Config &config, const std::string &uuid, - const std::string_view epoch_id, const std::deque> &epoch_history, + const memgraph::replication::ReplicationEpoch &epoch, + const std::deque> &epoch_history, utils::FileRetainer *file_retainer) { // Ensure that the storage directory exists. utils::EnsureDirOrDie(snapshot_directory); @@ -2044,7 +2045,7 @@ void CreateSnapshot(Transaction *transaction, const std::filesystem::path &snaps offset_metadata = snapshot.GetPosition(); snapshot.WriteMarker(Marker::SECTION_METADATA); snapshot.WriteString(uuid); - snapshot.WriteString(epoch_id); + snapshot.WriteString(epoch.id()); snapshot.WriteUint(transaction->start_timestamp); snapshot.WriteUint(edges_count); snapshot.WriteUint(vertices_count); diff --git a/src/storage/v2/durability/snapshot.hpp b/src/storage/v2/durability/snapshot.hpp index 2f16088a0..ac4fe5b1f 100644 --- a/src/storage/v2/durability/snapshot.hpp +++ b/src/storage/v2/durability/snapshot.hpp @@ -15,6 +15,7 @@ #include #include +#include "replication/epoch.hpp" #include "storage/v2/config.hpp" #include "storage/v2/constraints/constraints.hpp" #include "storage/v2/durability/metadata.hpp" @@ -71,7 +72,8 @@ void CreateSnapshot(Transaction *transaction, const std::filesystem::path &snaps const std::filesystem::path &wal_directory, uint64_t snapshot_retention_count, utils::SkipList *vertices, utils::SkipList *edges, NameIdMapper *name_id_mapper, Indices *indices, Constraints *constraints, const Config &config, const std::string &uuid, - std::string_view epoch_id, const std::deque> &epoch_history, + const memgraph::replication::ReplicationEpoch &epoch, + const std::deque> &epoch_history, utils::FileRetainer *file_retainer); } // namespace memgraph::storage::durability diff --git a/src/storage/v2/inmemory/replication/replication_client.cpp b/src/storage/v2/inmemory/replication/replication_client.cpp index bfd272a9b..48aeafd7d 100644 --- a/src/storage/v2/inmemory/replication/replication_client.cpp +++ b/src/storage/v2/inmemory/replication/replication_client.cpp @@ -103,8 +103,9 @@ uint64_t ReplicateCurrentWal(CurrentWalHandler &stream, durability::WalFile cons ////// ReplicationClient ////// InMemoryReplicationClient::InMemoryReplicationClient(InMemoryStorage *storage, - const replication::ReplicationClientConfig &config) - : ReplicationClient{storage, config} {} + const memgraph::replication::ReplicationClientConfig &config, + const memgraph::replication::ReplicationEpoch *epoch) + : ReplicationClient{storage, config, epoch} {} void InMemoryReplicationClient::RecoverReplica(uint64_t replica_commit) { spdlog::debug("Starting replica recover"); diff --git a/src/storage/v2/inmemory/replication/replication_client.hpp b/src/storage/v2/inmemory/replication/replication_client.hpp index 94f47cb48..e956838e7 100644 --- a/src/storage/v2/inmemory/replication/replication_client.hpp +++ b/src/storage/v2/inmemory/replication/replication_client.hpp @@ -18,7 +18,8 @@ class InMemoryStorage; class InMemoryReplicationClient : public ReplicationClient { public: - InMemoryReplicationClient(InMemoryStorage *storage, const replication::ReplicationClientConfig &config); + InMemoryReplicationClient(InMemoryStorage *storage, const memgraph::replication::ReplicationClientConfig &config, + const memgraph::replication::ReplicationEpoch *epoch); protected: void RecoverReplica(uint64_t replica_commit) override; diff --git a/src/storage/v2/inmemory/replication/replication_server.cpp b/src/storage/v2/inmemory/replication/replication_server.cpp index a2ae75391..0f7669d5a 100644 --- a/src/storage/v2/inmemory/replication/replication_server.cpp +++ b/src/storage/v2/inmemory/replication/replication_server.cpp @@ -34,8 +34,9 @@ std::pair ReadDelta(durability::BaseDecoder } // namespace InMemoryReplicationServer::InMemoryReplicationServer(InMemoryStorage *storage, - const replication::ReplicationServerConfig &config) - : ReplicationServer{config}, storage_(storage) { + const memgraph::replication::ReplicationServerConfig &config, + memgraph::replication::ReplicationEpoch *repl_epoch) + : ReplicationServer{config}, storage_(storage), repl_epoch_{repl_epoch} { rpc_server_.Register([this](auto *req_reader, auto *res_builder) { spdlog::debug("Received HeartbeatRpc"); this->HeartbeatHandler(req_reader, res_builder); @@ -66,8 +67,8 @@ InMemoryReplicationServer::InMemoryReplicationServer(InMemoryStorage *storage, void InMemoryReplicationServer::HeartbeatHandler(slk::Reader *req_reader, slk::Builder *res_builder) { replication::HeartbeatReq req; slk::Load(&req, req_reader); - replication::HeartbeatRes res{true, storage_->replication_state_.last_commit_timestamp_.load(), - storage_->replication_state_.GetEpoch().id}; + replication::HeartbeatRes res{true, storage_->repl_storage_state_.last_commit_timestamp_.load(), + std::string{repl_epoch_->id()}}; slk::Save(res, res_builder); } @@ -80,13 +81,14 @@ void InMemoryReplicationServer::AppendDeltasHandler(slk::Reader *req_reader, slk auto maybe_epoch_id = decoder.ReadString(); MG_ASSERT(maybe_epoch_id, "Invalid replication message"); - if (*maybe_epoch_id != storage_->replication_state_.GetEpoch().id) { - storage_->replication_state_.AppendEpoch(*maybe_epoch_id); + auto &repl_storage_state = storage_->repl_storage_state_; + if (*maybe_epoch_id != repl_epoch_->id()) { + auto prev_epoch = repl_epoch_->SetEpoch(*maybe_epoch_id); + repl_storage_state.AddEpochToHistoryForce(prev_epoch); } if (storage_->wal_file_) { - if (req.seq_num > storage_->wal_file_->SequenceNumber() || - *maybe_epoch_id != storage_->replication_state_.GetEpoch().id) { + if (req.seq_num > storage_->wal_file_->SequenceNumber() || *maybe_epoch_id != repl_epoch_->id()) { storage_->wal_file_->FinalizeWal(); storage_->wal_file_.reset(); storage_->wal_seq_num_ = req.seq_num; @@ -99,7 +101,7 @@ void InMemoryReplicationServer::AppendDeltasHandler(slk::Reader *req_reader, slk storage_->wal_seq_num_ = req.seq_num; } - if (req.previous_commit_timestamp != storage_->replication_state_.last_commit_timestamp_.load()) { + if (req.previous_commit_timestamp != repl_storage_state.last_commit_timestamp_.load()) { // Empty the stream bool transaction_complete = false; while (!transaction_complete) { @@ -109,7 +111,7 @@ void InMemoryReplicationServer::AppendDeltasHandler(slk::Reader *req_reader, slk delta.type, durability::kVersion); // TODO: Check if we are always using the latest version when replicating } - replication::AppendDeltasRes res{false, storage_->replication_state_.last_commit_timestamp_.load()}; + replication::AppendDeltasRes res{false, repl_storage_state.last_commit_timestamp_.load()}; slk::Save(res, res_builder); return; } @@ -117,7 +119,7 @@ void InMemoryReplicationServer::AppendDeltasHandler(slk::Reader *req_reader, slk ReadAndApplyDelta(storage_, &decoder, durability::kVersion); // TODO: Check if we are always using the latest version when replicating - replication::AppendDeltasRes res{true, storage_->replication_state_.last_commit_timestamp_.load()}; + replication::AppendDeltasRes res{true, repl_storage_state.last_commit_timestamp_.load()}; slk::Save(res, res_builder); spdlog::debug("Replication recovery from append deltas finished, replica is now up to date!"); } @@ -147,17 +149,14 @@ void InMemoryReplicationServer::SnapshotHandler(slk::Reader *req_reader, slk::Bu std::make_unique(&storage_->indices_, storage_->config_); try { spdlog::debug("Loading snapshot"); - auto &epoch = - storage_->replication_state_ - .GetEpoch(); // This needs to be a non-const ref since we are updating it in LoadSnapshot TODO fix auto recovered_snapshot = durability::LoadSnapshot( - *maybe_snapshot_path, &storage_->vertices_, &storage_->edges_, &storage_->replication_state_.history, + *maybe_snapshot_path, &storage_->vertices_, &storage_->edges_, &storage_->repl_storage_state_.history, storage_->name_id_mapper_.get(), &storage_->edge_count_, storage_->config_); spdlog::debug("Snapshot loaded successfully"); // If this step is present it should always be the first step of // the recovery so we use the UUID we read from snasphost storage_->uuid_ = std::move(recovered_snapshot.snapshot_info.uuid); - epoch.id = std::move(recovered_snapshot.snapshot_info.epoch_id); + repl_epoch_->SetEpoch(std::move(recovered_snapshot.snapshot_info.epoch_id)); const auto &recovery_info = recovered_snapshot.recovery_info; storage_->vertex_id_ = recovery_info.next_vertex_id; storage_->edge_id_ = recovery_info.next_edge_id; @@ -171,7 +170,7 @@ void InMemoryReplicationServer::SnapshotHandler(slk::Reader *req_reader, slk::Bu } storage_guard.unlock(); - replication::SnapshotRes res{true, storage_->replication_state_.last_commit_timestamp_.load()}; + replication::SnapshotRes res{true, storage_->repl_storage_state_.last_commit_timestamp_.load()}; slk::Save(res, res_builder); spdlog::trace("Deleting old snapshot files due to snapshot recovery."); @@ -209,10 +208,10 @@ void InMemoryReplicationServer::WalFilesHandler(slk::Reader *req_reader, slk::Bu utils::EnsureDirOrDie(storage_->wal_directory_); for (auto i = 0; i < wal_file_number; ++i) { - LoadWal(storage_, &decoder); + LoadWal(storage_, *repl_epoch_, &decoder); } - replication::WalFilesRes res{true, storage_->replication_state_.last_commit_timestamp_.load()}; + replication::WalFilesRes res{true, storage_->repl_storage_state_.last_commit_timestamp_.load()}; slk::Save(res, res_builder); spdlog::debug("Replication recovery from WAL files ended successfully, replica is now up to date!"); } @@ -225,14 +224,15 @@ void InMemoryReplicationServer::CurrentWalHandler(slk::Reader *req_reader, slk:: utils::EnsureDirOrDie(storage_->wal_directory_); - LoadWal(storage_, &decoder); + LoadWal(storage_, *repl_epoch_, &decoder); - replication::CurrentWalRes res{true, storage_->replication_state_.last_commit_timestamp_.load()}; + replication::CurrentWalRes res{true, storage_->repl_storage_state_.last_commit_timestamp_.load()}; slk::Save(res, res_builder); spdlog::debug("Replication recovery from current WAL ended successfully, replica is now up to date!"); } -void InMemoryReplicationServer::LoadWal(InMemoryStorage *storage, replication::Decoder *decoder) { +void InMemoryReplicationServer::LoadWal(InMemoryStorage *storage, memgraph::replication::ReplicationEpoch &epoch, + replication::Decoder *decoder) { const auto temp_wal_directory = std::filesystem::temp_directory_path() / "memgraph" / durability::kWalDirectory; utils::EnsureDir(temp_wal_directory); auto maybe_wal_path = decoder->ReadFile(temp_wal_directory); @@ -244,8 +244,9 @@ void InMemoryReplicationServer::LoadWal(InMemoryStorage *storage, replication::D storage->uuid_ = wal_info.uuid; } - if (wal_info.epoch_id != storage->replication_state_.GetEpoch().id) { - storage->replication_state_.AppendEpoch(wal_info.epoch_id); + if (wal_info.epoch_id != epoch.id()) { + auto prev_epoch = epoch.SetEpoch(wal_info.epoch_id); + storage->repl_storage_state_.AddEpochToHistoryForce(prev_epoch); } if (storage->wal_file_) { @@ -280,7 +281,7 @@ void InMemoryReplicationServer::TimestampHandler(slk::Reader *req_reader, slk::B replication::TimestampReq req; slk::Load(&req, req_reader); - replication::TimestampRes res{true, storage_->replication_state_.last_commit_timestamp_.load()}; + replication::TimestampRes res{true, storage_->repl_storage_state_.last_commit_timestamp_.load()}; slk::Save(res, res_builder); } @@ -311,7 +312,7 @@ uint64_t InMemoryReplicationServer::ReadAndApplyDelta(InMemoryStorage *storage, }; uint64_t applied_deltas = 0; - auto max_commit_timestamp = storage->replication_state_.last_commit_timestamp_.load(); + auto max_commit_timestamp = storage->repl_storage_state_.last_commit_timestamp_.load(); for (bool transaction_complete = false; !transaction_complete; ++applied_deltas) { const auto [timestamp, delta] = ReadDelta(decoder); @@ -621,7 +622,7 @@ uint64_t InMemoryReplicationServer::ReadAndApplyDelta(InMemoryStorage *storage, if (commit_timestamp_and_accessor) throw utils::BasicException("Did not finish the transaction!"); - storage->replication_state_.last_commit_timestamp_ = max_commit_timestamp; + storage->repl_storage_state_.last_commit_timestamp_ = max_commit_timestamp; spdlog::debug("Applied {} deltas", applied_deltas); return applied_deltas; diff --git a/src/storage/v2/inmemory/replication/replication_server.hpp b/src/storage/v2/inmemory/replication/replication_server.hpp index 750e090e7..697108a0a 100644 --- a/src/storage/v2/inmemory/replication/replication_server.hpp +++ b/src/storage/v2/inmemory/replication/replication_server.hpp @@ -11,6 +11,7 @@ #pragma once +#include "replication/epoch.hpp" #include "storage/v2/replication/replication_server.hpp" #include "storage/v2/replication/serialization.hpp" @@ -20,7 +21,9 @@ class InMemoryStorage; class InMemoryReplicationServer : public ReplicationServer { public: - explicit InMemoryReplicationServer(InMemoryStorage *storage, const replication::ReplicationServerConfig &config); + explicit InMemoryReplicationServer(InMemoryStorage *storage, + const memgraph::replication::ReplicationServerConfig &config, + memgraph::replication::ReplicationEpoch *repl_epoch); private: // RPC handlers @@ -36,11 +39,14 @@ class InMemoryReplicationServer : public ReplicationServer { void TimestampHandler(slk::Reader *req_reader, slk::Builder *res_builder); - static void LoadWal(InMemoryStorage *storage, replication::Decoder *decoder); + static void LoadWal(InMemoryStorage *storage, memgraph::replication::ReplicationEpoch &epoch, + replication::Decoder *decoder); static uint64_t ReadAndApplyDelta(InMemoryStorage *storage, durability::BaseDecoder *decoder, uint64_t version); InMemoryStorage *storage_; + + memgraph::replication::ReplicationEpoch *repl_epoch_; }; } // namespace memgraph::storage diff --git a/src/storage/v2/inmemory/storage.cpp b/src/storage/v2/inmemory/storage.cpp index 66e425fd3..63acba07a 100644 --- a/src/storage/v2/inmemory/storage.cpp +++ b/src/storage/v2/inmemory/storage.cpp @@ -18,6 +18,7 @@ #include "storage/v2/inmemory/replication/replication_client.hpp" #include "storage/v2/inmemory/replication/replication_server.hpp" #include "storage/v2/inmemory/unique_constraints.hpp" +#include "storage/v2/replication/replication_handler.hpp" #include "utils/resource_lock.hpp" namespace memgraph::storage { @@ -58,17 +59,18 @@ InMemoryStorage::InMemoryStorage(Config config, StorageMode storage_mode) "process!", config_.durability.storage_directory); } + auto &repl_state = repl_state_; if (config_.durability.recover_on_startup) { - auto &epoch = replication_state_.GetEpoch(); - auto info = durability::RecoverData(snapshot_directory_, wal_directory_, &uuid_, &epoch.id, - &replication_state_.history, &vertices_, &edges_, &edge_count_, + auto &epoch = repl_state.GetEpoch(); + auto info = durability::RecoverData(snapshot_directory_, wal_directory_, &uuid_, epoch, + &repl_storage_state_.history, &vertices_, &edges_, &edge_count_, name_id_mapper_.get(), &indices_, &constraints_, config_, &wal_seq_num_); if (info) { vertex_id_ = info->next_vertex_id; edge_id_ = info->next_edge_id; timestamp_ = std::max(timestamp_, info->next_timestamp); if (info->last_commit_timestamp) { - replication_state_.last_commit_timestamp_ = *info->last_commit_timestamp; + repl_storage_state_.last_commit_timestamp_ = *info->last_commit_timestamp; } } } else if (config_.durability.snapshot_wal_mode != Config::Durability::SnapshotWalMode::DISABLED || @@ -102,7 +104,8 @@ InMemoryStorage::InMemoryStorage(Config config, StorageMode storage_mode) } if (config_.durability.snapshot_wal_mode != Config::Durability::SnapshotWalMode::DISABLED) { snapshot_runner_.Run("Snapshot", config_.durability.snapshot_interval, [this] { - if (auto maybe_error = this->CreateSnapshot({true}); maybe_error.HasError()) { + auto const &repl_state = repl_state_; + if (auto maybe_error = this->CreateSnapshot(repl_state, {true}); maybe_error.HasError()) { switch (maybe_error.GetError()) { case CreateSnapshotError::DisabledForReplica: spdlog::warn( @@ -131,19 +134,14 @@ InMemoryStorage::InMemoryStorage(Config config, StorageMode storage_mode) if (config_.durability.restore_replication_state_on_startup) { spdlog::info("Replication configuration will be stored and will be automatically restored in case of a crash."); - RestoreReplicationRole(); - - if (replication_state_.GetRole() == replication::ReplicationRole::MAIN) { - RestoreReplicas(); - } + ReplicationHandler{repl_state, *this}.RestoreReplication(); } else { spdlog::warn( "Replication configuration will NOT be stored. When the server restarts, replication state will be " "forgotten."); } - if (config_.durability.snapshot_wal_mode == Config::Durability::SnapshotWalMode::DISABLED && - replication_state_.GetRole() == replication::ReplicationRole::MAIN) { + if (config_.durability.snapshot_wal_mode == Config::Durability::SnapshotWalMode::DISABLED && repl_state.IsMain()) { spdlog::warn( "The instance has the MAIN replication role, but durability logs and snapshots are disabled. Please consider " "enabling durability by using --storage-snapshot-interval-sec and --storage-wal-enabled flags because " @@ -158,8 +156,8 @@ InMemoryStorage::~InMemoryStorage() { gc_runner_.Stop(); } { - // Clear replication data - replication_state_.Reset(); + // Stop replication (Stop all clients or stop the REPLICA server) + repl_storage_state_.Reset(); } if (wal_file_) { wal_file_->FinalizeWal(); @@ -169,7 +167,8 @@ InMemoryStorage::~InMemoryStorage() { snapshot_runner_.Stop(); } if (config_.durability.snapshot_on_exit) { - if (auto maybe_error = this->CreateSnapshot({false}); maybe_error.HasError()) { + auto const &repl_state = repl_state_; + if (auto maybe_error = this->CreateSnapshot(repl_state, {false}); maybe_error.HasError()) { switch (maybe_error.GetError()) { case CreateSnapshotError::DisabledForReplica: spdlog::warn(utils::MessageWithLink("Snapshots are disabled for replicas.", "https://memgr.ph/replication")); @@ -653,6 +652,7 @@ utils::BasicResult InMemoryStorage::InMemoryAcce auto *mem_storage = static_cast(storage_); + auto const &replState = mem_storage->repl_state_; if (!transaction_.md_deltas.empty()) { // This is usually done by the MVCC, but it does not handle the metadata deltas transaction_.EnsureCommitTimestampExists(); @@ -672,8 +672,7 @@ utils::BasicResult InMemoryStorage::InMemoryAcce // modifications before they are written to disk. // Replica can log only the write transaction received from Main // so the Wal files are consistent - if (mem_storage->replication_state_.GetRole() == replication::ReplicationRole::MAIN || - desired_commit_timestamp.has_value()) { + if (replState.IsMain() || desired_commit_timestamp.has_value()) { could_replicate_all_sync_replicas = mem_storage->AppendToWalDataDefinition(transaction_, *commit_timestamp_); // Take committed_transactions lock while holding the engine lock to // make sure that committed transactions are sorted by the commit @@ -685,10 +684,9 @@ utils::BasicResult InMemoryStorage::InMemoryAcce transaction_.commit_timestamp->store(*commit_timestamp_, std::memory_order_release); // Replica can only update the last commit timestamp with // the commits received from main. - if (mem_storage->replication_state_.GetRole() == replication::ReplicationRole::MAIN || - desired_commit_timestamp.has_value()) { + if (replState.IsMain() || desired_commit_timestamp.has_value()) { // Update the last commit timestamp - mem_storage->replication_state_.last_commit_timestamp_.store(*commit_timestamp_); + mem_storage->repl_storage_state_.last_commit_timestamp_.store(*commit_timestamp_); } // Release engine lock because we don't have to hold it anymore // and emplace back could take a long time. @@ -772,8 +770,7 @@ utils::BasicResult InMemoryStorage::InMemoryAcce // modifications before they are written to disk. // Replica can log only the write transaction received from Main // so the Wal files are consistent - if (mem_storage->replication_state_.GetRole() == replication::ReplicationRole::MAIN || - desired_commit_timestamp.has_value()) { + if (replState.IsMain() || desired_commit_timestamp.has_value()) { could_replicate_all_sync_replicas = mem_storage->AppendToWalDataManipulation(transaction_, *commit_timestamp_); } @@ -788,10 +785,9 @@ utils::BasicResult InMemoryStorage::InMemoryAcce transaction_.commit_timestamp->store(*commit_timestamp_, std::memory_order_release); // Replica can only update the last commit timestamp with // the commits received from main. - if (mem_storage->replication_state_.GetRole() == replication::ReplicationRole::MAIN || - desired_commit_timestamp.has_value()) { + if (replState.IsMain() || desired_commit_timestamp.has_value()) { // Update the last commit timestamp - mem_storage->replication_state_.last_commit_timestamp_.store(*commit_timestamp_); + mem_storage->repl_storage_state_.last_commit_timestamp_.store(*commit_timestamp_); } // Release engine lock because we don't have to hold it anymore // and emplace back could take a long time. @@ -1159,7 +1155,8 @@ Transaction InMemoryStorage::CreateTransaction(IsolationLevel isolation_level, S // of any query on replica to the last commited transaction // which is timestamp_ as only commit of transaction with writes // can change the value of it. - if (replication_state_.GetRole() == replication::ReplicationRole::REPLICA) { + auto const &replState = repl_state_; + if (replState.IsReplica()) { start_timestamp = timestamp_; } else { start_timestamp = timestamp_++; @@ -1494,12 +1491,12 @@ StorageInfo InMemoryStorage::GetInfo() const { utils::GetDirDiskUsage(config_.durability.storage_directory)}; } -bool InMemoryStorage::InitializeWalFile() { +bool InMemoryStorage::InitializeWalFile(memgraph::replication::ReplicationEpoch &epoch) { if (config_.durability.snapshot_wal_mode != Config::Durability::SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL) return false; if (!wal_file_) { - wal_file_.emplace(wal_directory_, uuid_, replication_state_.GetEpoch().id, config_.items, name_id_mapper_.get(), - wal_seq_num_++, &file_retainer_); + wal_file_.emplace(wal_directory_, uuid_, epoch.id(), config_.items, name_id_mapper_.get(), wal_seq_num_++, + &file_retainer_); } return true; } @@ -1524,14 +1521,15 @@ void InMemoryStorage::FinalizeWalFile() { } bool InMemoryStorage::AppendToWalDataManipulation(const Transaction &transaction, uint64_t final_commit_timestamp) { - if (!InitializeWalFile()) { + auto &replState = repl_state_; + if (!InitializeWalFile(replState.GetEpoch())) { return true; } // Traverse deltas and append them to the WAL file. // A single transaction will always be contained in a single WAL file. auto current_commit_timestamp = transaction.commit_timestamp->load(std::memory_order_acquire); - replication_state_.InitializeTransaction(wal_file_->SequenceNumber()); + repl_storage_state_.InitializeTransaction(wal_file_->SequenceNumber()); auto append_deltas = [&](auto callback) { // Helper lambda that traverses the delta chain on order to find the first @@ -1682,7 +1680,7 @@ bool InMemoryStorage::AppendToWalDataManipulation(const Transaction &transaction append_deltas([&](const Delta &delta, const auto &parent, uint64_t timestamp) { wal_file_->AppendDelta(delta, parent, timestamp); - replication_state_.AppendDelta(delta, parent, timestamp); + repl_storage_state_.AppendDelta(delta, parent, timestamp); }); // Add a delta that indicates that the transaction is fully written to the WAL @@ -1690,15 +1688,16 @@ bool InMemoryStorage::AppendToWalDataManipulation(const Transaction &transaction wal_file_->AppendTransactionEnd(final_commit_timestamp); FinalizeWalFile(); - return replication_state_.FinalizeTransaction(final_commit_timestamp); + return repl_storage_state_.FinalizeTransaction(final_commit_timestamp); } bool InMemoryStorage::AppendToWalDataDefinition(const Transaction &transaction, uint64_t final_commit_timestamp) { - if (!InitializeWalFile()) { + auto &replState = repl_state_; + if (!InitializeWalFile(replState.GetEpoch())) { return true; } - replication_state_.InitializeTransaction(wal_file_->SequenceNumber()); + repl_storage_state_.InitializeTransaction(wal_file_->SequenceNumber()); for (const auto &md_delta : transaction.md_deltas) { switch (md_delta.action) { @@ -1769,7 +1768,7 @@ bool InMemoryStorage::AppendToWalDataDefinition(const Transaction &transaction, wal_file_->AppendTransactionEnd(final_commit_timestamp); FinalizeWalFile(); - return replication_state_.FinalizeTransaction(final_commit_timestamp); + return repl_storage_state_.FinalizeTransaction(final_commit_timestamp); } void InMemoryStorage::AppendToWalDataDefinition(durability::StorageMetadataOperation operation, LabelId label, @@ -1777,7 +1776,7 @@ void InMemoryStorage::AppendToWalDataDefinition(durability::StorageMetadataOpera LabelPropertyIndexStats property_stats, uint64_t final_commit_timestamp) { wal_file_->AppendOperation(operation, label, properties, stats, property_stats, final_commit_timestamp); - replication_state_.AppendOperation(operation, label, properties, stats, property_stats, final_commit_timestamp); + repl_storage_state_.AppendOperation(operation, label, properties, stats, property_stats, final_commit_timestamp); } void InMemoryStorage::AppendToWalDataDefinition(durability::StorageMetadataOperation operation, LabelId label, @@ -1804,19 +1803,19 @@ void InMemoryStorage::AppendToWalDataDefinition(durability::StorageMetadataOpera } utils::BasicResult InMemoryStorage::CreateSnapshot( - std::optional is_periodic) { - if (replication_state_.GetRole() != replication::ReplicationRole::MAIN) { + memgraph::replication::ReplicationState const &replicationState, std::optional is_periodic) { + if (replicationState.IsReplica()) { return CreateSnapshotError::DisabledForReplica; } - auto snapshot_creator = [this]() { + auto const &epoch = replicationState.GetEpoch(); + auto snapshot_creator = [this, &epoch]() { utils::Timer timer; - const auto &epoch = replication_state_.GetEpoch(); auto transaction = CreateTransaction(IsolationLevel::SNAPSHOT_ISOLATION, storage_mode_); // Create snapshot. durability::CreateSnapshot(&transaction, snapshot_directory_, wal_directory_, config_.durability.snapshot_retention_count, &vertices_, &edges_, name_id_mapper_.get(), - &indices_, &constraints_, config_, uuid_, epoch.id, replication_state_.history, + &indices_, &constraints_, config_, uuid_, epoch, repl_storage_state_.history, &file_retainer_); // Finalize snapshot transaction. commit_log_->MarkFinished(transaction.start_timestamp); @@ -1872,14 +1871,13 @@ uint64_t InMemoryStorage::CommitTimestamp(const std::optional desired_ return *desired_commit_timestamp; } -void InMemoryStorage::EstablishNewEpoch() { +void InMemoryStorage::PrepareForNewEpoch(std::string prev_epoch) { std::unique_lock engine_guard{engine_lock_}; if (wal_file_) { wal_file_->FinalizeWal(); wal_file_.reset(); } - // TODO: Move out of storage (no need for the lock) <- missing commit_timestamp at a higher level - replication_state_.NewEpoch(); + repl_storage_state_.AddEpochToHistory(std::move(prev_epoch)); } utils::FileRetainer::FileLockerAccessor::ret_type InMemoryStorage::IsPathLocked() { @@ -1907,14 +1905,16 @@ utils::FileRetainer::FileLockerAccessor::ret_type InMemoryStorage::UnlockPath() return true; } -auto InMemoryStorage::CreateReplicationClient(replication::ReplicationClientConfig const &config) +auto InMemoryStorage::CreateReplicationClient(const memgraph::replication::ReplicationClientConfig &config) -> std::unique_ptr { - return std::make_unique(this, config); + auto &replState = this->repl_state_; + return std::make_unique(this, config, &replState.GetEpoch()); } std::unique_ptr InMemoryStorage::CreateReplicationServer( - const replication::ReplicationServerConfig &config) { - return std::make_unique(this, config); + const memgraph::replication::ReplicationServerConfig &config) { + auto &replState = this->repl_state_; + return std::make_unique(this, config, &replState.GetEpoch()); } std::unique_ptr InMemoryStorage::Access(std::optional override_isolation_level) { diff --git a/src/storage/v2/inmemory/storage.hpp b/src/storage/v2/inmemory/storage.hpp index 708f6b7e6..8f403768f 100644 --- a/src/storage/v2/inmemory/storage.hpp +++ b/src/storage/v2/inmemory/storage.hpp @@ -21,10 +21,9 @@ #include "storage/v2/storage.hpp" /// REPLICATION /// -#include "storage/v2/replication/config.hpp" +#include "replication/config.hpp" #include "storage/v2/replication/enums.hpp" -#include "storage/v2/replication/replication.hpp" -#include "storage/v2/replication/replication_persistence_helper.hpp" +#include "storage/v2/replication/replication_storage_state.hpp" #include "storage/v2/replication/rpc.hpp" #include "storage/v2/replication/serialization.hpp" #include "storage/v2/transaction.hpp" @@ -326,14 +325,15 @@ class InMemoryStorage final : public Storage { utils::FileRetainer::FileLockerAccessor::ret_type LockPath(); utils::FileRetainer::FileLockerAccessor::ret_type UnlockPath(); - utils::BasicResult CreateSnapshot(std::optional is_periodic); + utils::BasicResult CreateSnapshot( + memgraph::replication::ReplicationState const &replicationState, std::optional is_periodic); Transaction CreateTransaction(IsolationLevel isolation_level, StorageMode storage_mode) override; - auto CreateReplicationClient(replication::ReplicationClientConfig const &config) + auto CreateReplicationClient(const memgraph::replication::ReplicationClientConfig &config) -> std::unique_ptr override; - auto CreateReplicationServer(const replication::ReplicationServerConfig &config) + auto CreateReplicationServer(const memgraph::replication::ReplicationServerConfig &config) -> std::unique_ptr override; private: @@ -351,7 +351,7 @@ class InMemoryStorage final : public Storage { template void CollectGarbage(std::unique_lock main_guard = {}); - bool InitializeWalFile(); + bool InitializeWalFile(memgraph::replication::ReplicationEpoch &epoch); void FinalizeWalFile(); StorageInfo GetInfo() const override; @@ -379,7 +379,7 @@ class InMemoryStorage final : public Storage { uint64_t CommitTimestamp(std::optional desired_commit_timestamp = {}); - void EstablishNewEpoch() override; + void PrepareForNewEpoch(std::string prev_epoch) override; // Main object storage utils::SkipList vertices_; diff --git a/src/storage/v2/replication/enums.hpp b/src/storage/v2/replication/enums.hpp index bbe8ffc62..e89a1fdd3 100644 --- a/src/storage/v2/replication/enums.hpp +++ b/src/storage/v2/replication/enums.hpp @@ -13,11 +13,7 @@ #include namespace memgraph::storage::replication { -enum class ReplicationRole : uint8_t { MAIN, REPLICA }; - -enum class ReplicationMode : std::uint8_t { SYNC, ASYNC }; enum class ReplicaState : std::uint8_t { READY, REPLICATING, RECOVERY, INVALID }; -enum class RegistrationMode : std::uint8_t { MUST_BE_INSTANTLY_VALID, CAN_BE_INVALID }; } // namespace memgraph::storage::replication diff --git a/src/storage/v2/replication/global.hpp b/src/storage/v2/replication/global.hpp index 820362740..7892fb990 100644 --- a/src/storage/v2/replication/global.hpp +++ b/src/storage/v2/replication/global.hpp @@ -17,38 +17,13 @@ #include #include "io/network/endpoint.hpp" +#include "replication/config.hpp" #include "storage/v2/replication/enums.hpp" #include "utils/uuid.hpp" // TODO: move to replication namespace and unify namespace memgraph::storage { -// TODO: Should be at MAIN instance level; shouldn't be connected to storage -struct ReplicationEpoch { - ReplicationEpoch() : id(utils::GenerateUUID()) {} - - // UUID to distinguish different main instance runs for replication process - // on SAME storage. - // Multiple instances can have same storage UUID and be MAIN at the same time. - // We cannot compare commit timestamps of those instances if one of them - // becomes the replica of the other so we use epoch_id_ as additional - // discriminating property. - // Example of this: - // We have 2 instances of the same storage, S1 and S2. - // S1 and S2 are MAIN and accept their own commits and write them to the WAL. - // At the moment when S1 commited a transaction with timestamp 20, and S2 - // a different transaction with timestamp 15, we change S2's role to REPLICA - // and register it on S1. - // Without using the epoch_id, we don't know that S1 and S2 have completely - // different transactions, we think that the S2 is behind only by 5 commits. - std::string id; // TODO: Move to replication level - - // Generates a new epoch id, returning the old one - std::string NewEpoch() { return std::exchange(id, utils::GenerateUUID()); } - - std::string SetEpoch(std::string new_epoch) { return std::exchange(id, std::move(new_epoch)); } -}; - struct TimestampInfo { uint64_t current_timestamp_of_replica; uint64_t current_number_of_timestamp_behind_master; @@ -56,7 +31,7 @@ struct TimestampInfo { struct ReplicaInfo { std::string name; - replication::ReplicationMode mode; + memgraph::replication::ReplicationMode mode; io::network::Endpoint endpoint; replication::ReplicaState state; TimestampInfo timestamp_info; diff --git a/src/storage/v2/replication/replication.cpp b/src/storage/v2/replication/replication.cpp deleted file mode 100644 index 30426ec04..000000000 --- a/src/storage/v2/replication/replication.cpp +++ /dev/null @@ -1,376 +0,0 @@ -// Copyright 2023 Memgraph Ltd. -// -// Use of this software is governed by the Business Source License -// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source -// License, and you may not use this file except in compliance with the Business Source License. -// -// As of the Change Date specified in that file, in accordance with -// the Business Source License, use of this software will be governed -// by the Apache License, Version 2.0, included in the file -// licenses/APL.txt. - -#include "storage/v2/replication/replication.hpp" -#include "storage/v2/constraints/constraints.hpp" -#include "storage/v2/durability/durability.hpp" -#include "storage/v2/durability/snapshot.hpp" -#include "storage/v2/replication/replication_client.hpp" -#include "storage/v2/replication/replication_server.hpp" -#include "storage/v2/storage.hpp" - -namespace memgraph::storage { - -using OOMExceptionEnabler = utils::MemoryTracker::OutOfMemoryExceptionEnabler; - -namespace { - -std::string RegisterReplicaErrorToString(RegisterReplicaError error) { - using enum RegisterReplicaError; - switch (error) { - case NAME_EXISTS: - return "NAME_EXISTS"; - case END_POINT_EXISTS: - return "END_POINT_EXISTS"; - case CONNECTION_FAILED: - return "CONNECTION_FAILED"; - case COULD_NOT_BE_PERSISTED: - return "COULD_NOT_BE_PERSISTED"; - } -} -} // namespace - -storage::ReplicationState::ReplicationState(bool restore, std::filesystem::path durability_dir) { - if (restore) { - utils::EnsureDirOrDie(durability_dir / durability::kReplicationDirectory); - durability_ = std::make_unique(durability_dir / durability::kReplicationDirectory); - } -} - -void storage::ReplicationState::Reset() { - replication_server_.reset(); - replication_clients_.WithLock([&](auto &clients) { clients.clear(); }); -} - -bool storage::ReplicationState::SetMainReplicationRole(storage::Storage *storage) { - // We don't want to generate new epoch_id and do the - // cleanup if we're already a MAIN - if (GetRole() == replication::ReplicationRole::MAIN) { - return false; - } - - // Main instance does not need replication server - // This should be always called first so we finalize everything - replication_server_.reset(nullptr); - - storage->EstablishNewEpoch(); - - if (ShouldStoreAndRestoreReplicationState()) { - // Only thing that matters here is the role saved as MAIN - auto data = replication::ReplicationStatusToJSON( - replication::ReplicationStatus{.name = replication::kReservedReplicationRoleName, - .ip_address = "", - .port = 0, - .sync_mode = replication::ReplicationMode::SYNC, - .replica_check_frequency = std::chrono::seconds(0), - .ssl = std::nullopt, - .role = replication::ReplicationRole::MAIN}); - - if (!durability_->Put(replication::kReservedReplicationRoleName, data.dump())) { - spdlog::error("Error when saving MAIN replication role in settings."); - return false; - } - } - - SetRole(replication::ReplicationRole::MAIN); - return true; -} - -void storage::ReplicationState::AppendOperation(durability::StorageMetadataOperation operation, LabelId label, - const std::set &properties, const LabelIndexStats &stats, - const LabelPropertyIndexStats &property_stats, - uint64_t final_commit_timestamp) { - if (GetRole() == replication::ReplicationRole::MAIN) { - replication_clients_.WithLock([&](auto &clients) { - for (auto &client : clients) { - client->IfStreamingTransaction([&](auto &stream) { - stream.AppendOperation(operation, label, properties, stats, property_stats, final_commit_timestamp); - }); - } - }); - } -} - -void storage::ReplicationState::InitializeTransaction(uint64_t seq_num) { - if (GetRole() == replication::ReplicationRole::MAIN) { - replication_clients_.WithLock([&](auto &clients) { - for (auto &client : clients) { - client->StartTransactionReplication(seq_num); - } - }); - } -} - -void storage::ReplicationState::AppendDelta(const Delta &delta, const Edge &parent, uint64_t timestamp) { - replication_clients_.WithLock([&](auto &clients) { - for (auto &client : clients) { - client->IfStreamingTransaction([&](auto &stream) { stream.AppendDelta(delta, parent, timestamp); }); - } - }); -} - -void storage::ReplicationState::AppendDelta(const Delta &delta, const Vertex &parent, uint64_t timestamp) { - replication_clients_.WithLock([&](auto &clients) { - for (auto &client : clients) { - client->IfStreamingTransaction([&](auto &stream) { stream.AppendDelta(delta, parent, timestamp); }); - } - }); -} - -bool storage::ReplicationState::FinalizeTransaction(uint64_t timestamp) { - bool finalized_on_all_replicas = true; - replication_clients_.WithLock([&](auto &clients) { - for (auto &client : clients) { - client->IfStreamingTransaction([&](auto &stream) { stream.AppendTransactionEnd(timestamp); }); - const auto finalized = client->FinalizeTransactionReplication(); - - if (client->Mode() == replication::ReplicationMode::SYNC) { - finalized_on_all_replicas = finalized && finalized_on_all_replicas; - } - } - }); - return finalized_on_all_replicas; -} - -utils::BasicResult ReplicationState::RegisterReplica( - const replication::RegistrationMode registration_mode, const replication::ReplicationClientConfig &config, - Storage *storage) { - MG_ASSERT(GetRole() == replication::ReplicationRole::MAIN, "Only main instance can register a replica!"); - - auto name_check = [&config](auto &clients) { - auto name_matches = [&name = config.name](const auto &client) { return client->Name() == name; }; - return std::any_of(clients.begin(), clients.end(), name_matches); - }; - - auto desired_endpoint = io::network::Endpoint{config.ip_address, config.port}; - auto endpoint_check = [&](auto &clients) { - auto endpoint_matches = [&](const auto &client) { return client->Endpoint() == desired_endpoint; }; - return std::any_of(clients.begin(), clients.end(), endpoint_matches); - }; - - auto task = [&](auto &clients) -> utils::BasicResult { - if (name_check(clients)) { - return RegisterReplicaError::NAME_EXISTS; - } - - if (endpoint_check(clients)) { - return RegisterReplicaError::END_POINT_EXISTS; - } - - if (!TryPersistReplicaClient(config)) { - return RegisterReplicaError::COULD_NOT_BE_PERSISTED; - } - - auto client = storage->CreateReplicationClient(config); - client->Start(); - - if (client->State() == replication::ReplicaState::INVALID) { - if (replication::RegistrationMode::CAN_BE_INVALID != registration_mode) { - return RegisterReplicaError::CONNECTION_FAILED; - } - - spdlog::warn("Connection failed when registering replica {}. Replica will still be registered.", client->Name()); - } - - clients.push_back(std::move(client)); - return {}; - }; - - return replication_clients_.WithLock(task); -} - -bool ReplicationState::TryPersistReplicaClient(const replication::ReplicationClientConfig &config) { - if (!ShouldStoreAndRestoreReplicationState()) return true; - auto data = replication::ReplicationStatusToJSON( - replication::ReplicationStatus{.name = config.name, - .ip_address = config.ip_address, - .port = config.port, - .sync_mode = config.mode, - .replica_check_frequency = config.replica_check_frequency, - .ssl = config.ssl, - .role = replication::ReplicationRole::REPLICA}); - if (durability_->Put(config.name, data.dump())) return true; - spdlog::error("Error when saving replica {} in settings.", config.name); - return false; -} - -bool ReplicationState::SetReplicaRole(const replication::ReplicationServerConfig &config, Storage *storage) { - // We don't want to restart the server if we're already a REPLICA - if (GetRole() == replication::ReplicationRole::REPLICA) { - return false; - } - - replication_server_ = storage->CreateReplicationServer(config); - bool res = replication_server_->Start(); - if (!res) { - spdlog::error("Unable to start the replication server."); - return false; - } - - if (ShouldStoreAndRestoreReplicationState()) { - // Only thing that matters here is the role saved as REPLICA and the listening port - auto data = replication::ReplicationStatusToJSON( - replication::ReplicationStatus{.name = replication::kReservedReplicationRoleName, - .ip_address = config.ip_address, - .port = config.port, - .sync_mode = replication::ReplicationMode::SYNC, - .replica_check_frequency = std::chrono::seconds(0), - .ssl = std::nullopt, - .role = replication::ReplicationRole::REPLICA}); - - if (!durability_->Put(replication::kReservedReplicationRoleName, data.dump())) { - spdlog::error("Error when saving REPLICA replication role in settings."); - return false; - } - } - - SetRole(replication::ReplicationRole::REPLICA); - return true; -} - -bool ReplicationState::UnregisterReplica(std::string_view name) { - MG_ASSERT(GetRole() == replication::ReplicationRole::MAIN, "Only main instance can unregister a replica!"); - if (ShouldStoreAndRestoreReplicationState()) { - if (!durability_->Delete(name)) { - spdlog::error("Error when removing replica {} from settings.", name); - return false; - } - } - - return replication_clients_.WithLock([&](auto &clients) { - return std::erase_if(clients, [&](const auto &client) { return client->Name() == name; }); - }); -} - -std::optional ReplicationState::GetReplicaState(const std::string_view name) { - return replication_clients_.WithLock([&](auto &clients) -> std::optional { - const auto client_it = - std::find_if(clients.cbegin(), clients.cend(), [name](auto &client) { return client->Name() == name; }); - if (client_it == clients.cend()) { - return std::nullopt; - } - return (*client_it)->State(); - }); -} - -std::vector ReplicationState::ReplicasInfo() { - return replication_clients_.WithLock([](auto &clients) { - std::vector replica_info; - replica_info.reserve(clients.size()); - std::transform( - clients.begin(), clients.end(), std::back_inserter(replica_info), [](const auto &client) -> ReplicaInfo { - return {client->Name(), client->Mode(), client->Endpoint(), client->State(), client->GetTimestampInfo()}; - }); - return replica_info; - }); -} - -void ReplicationState::RestoreReplicationRole(Storage *storage) { - if (!ShouldStoreAndRestoreReplicationState()) { - return; - } - - spdlog::info("Restoring replication role."); - uint16_t port = replication::kDefaultReplicationPort; - - const auto replication_data = durability_->Get(replication::kReservedReplicationRoleName); - if (!replication_data.has_value()) { - spdlog::debug("Cannot find data needed for restore replication role in persisted metadata."); - return; - } - - const auto maybe_replication_status = replication::JSONToReplicationStatus(nlohmann::json::parse(*replication_data)); - if (!maybe_replication_status.has_value()) { - LOG_FATAL("Cannot parse previously saved configuration of replication role {}.", - replication::kReservedReplicationRoleName); - } - - const auto replication_status = *maybe_replication_status; - if (!replication_status.role.has_value()) { - SetRole(replication::ReplicationRole::MAIN); - } else { - SetRole(*replication_status.role); - port = replication_status.port; - } - - if (GetRole() == replication::ReplicationRole::REPLICA) { - replication_server_ = storage->CreateReplicationServer(replication::ReplicationServerConfig{ - .ip_address = replication::kDefaultReplicationServerIp, - .port = port, - }); - bool res = replication_server_->Start(); - if (!res) { - LOG_FATAL("Unable to start the replication server."); - } - } - - spdlog::info("Replication role restored to {}.", - GetRole() == replication::ReplicationRole::MAIN ? "MAIN" : "REPLICA"); -} - -void ReplicationState::RestoreReplicas(Storage *storage) { - if (!ShouldStoreAndRestoreReplicationState()) { - return; - } - spdlog::info("Restoring replicas."); - - for (const auto &[replica_name, replica_data] : *durability_) { - spdlog::info("Restoring replica {}.", replica_name); - - const auto maybe_replica_status = replication::JSONToReplicationStatus(nlohmann::json::parse(replica_data)); - if (!maybe_replica_status.has_value()) { - LOG_FATAL("Cannot parse previously saved configuration of replica {}.", replica_name); - } - - auto replica_status = *maybe_replica_status; - MG_ASSERT(replica_status.name == replica_name, "Expected replica name is '{}', but got '{}'", replica_status.name, - replica_name); - - if (replica_name == replication::kReservedReplicationRoleName) { - continue; - } - - auto ret = RegisterReplica(replication::RegistrationMode::CAN_BE_INVALID, - replication::ReplicationClientConfig{ - .name = replica_status.name, - .mode = replica_status.sync_mode, - .ip_address = replica_status.ip_address, - .port = replica_status.port, - .replica_check_frequency = replica_status.replica_check_frequency, - .ssl = replica_status.ssl, - }, - storage); - - if (ret.HasError()) { - MG_ASSERT(RegisterReplicaError::CONNECTION_FAILED != ret.GetError()); - LOG_FATAL("Failure when restoring replica {}: {}.", replica_name, RegisterReplicaErrorToString(ret.GetError())); - } - spdlog::info("Replica {} restored.", replica_name); - } -} - -constexpr uint16_t kEpochHistoryRetention = 1000; - -void ReplicationState::NewEpoch() { - // Generate new epoch id and save the last one to the history. - if (history.size() == kEpochHistoryRetention) { - history.pop_front(); - } - auto prevEpoch = epoch_.NewEpoch(); - history.emplace_back(std::move(prevEpoch), last_commit_timestamp_); -} - -void ReplicationState::AppendEpoch(std::string new_epoch) { - auto prevEpoch = epoch_.SetEpoch(std::move(new_epoch)); - history.emplace_back(std::move(prevEpoch), last_commit_timestamp_); -} - -} // namespace memgraph::storage diff --git a/src/storage/v2/replication/replication_client.cpp b/src/storage/v2/replication/replication_client.cpp index b1b021192..c2b099ff1 100644 --- a/src/storage/v2/replication/replication_client.cpp +++ b/src/storage/v2/replication/replication_client.cpp @@ -15,29 +15,25 @@ #include #include "storage/v2/durability/durability.hpp" -#include "storage/v2/indices/label_property_index_stats.hpp" -#include "storage/v2/replication/config.hpp" -#include "storage/v2/replication/enums.hpp" #include "storage/v2/storage.hpp" -#include "storage/v2/transaction.hpp" -#include "utils/file_locker.hpp" -#include "utils/logging.hpp" -#include "utils/message.hpp" namespace memgraph::storage { -static auto CreateClientContext(const replication::ReplicationClientConfig &config) -> communication::ClientContext { +static auto CreateClientContext(const memgraph::replication::ReplicationClientConfig &config) + -> communication::ClientContext { return (config.ssl) ? communication::ClientContext{config.ssl->key_file, config.ssl->cert_file} : communication::ClientContext{}; } -ReplicationClient::ReplicationClient(Storage *storage, replication::ReplicationClientConfig const &config) +ReplicationClient::ReplicationClient(Storage *storage, const memgraph::replication::ReplicationClientConfig &config, + const memgraph::replication::ReplicationEpoch *epoch) : name_{config.name}, rpc_context_{CreateClientContext(config)}, rpc_client_{io::network::Endpoint(config.ip_address, config.port), &rpc_context_}, replica_check_frequency_{config.replica_check_frequency}, mode_{config.mode}, - storage_{storage} {} + storage_{storage}, + repl_epoch_{epoch} {} ReplicationClient::~ReplicationClient() { auto endpoint = rpc_client_.Endpoint(); @@ -46,21 +42,19 @@ ReplicationClient::~ReplicationClient() { } uint64_t ReplicationClient::LastCommitTimestamp() const { - return storage_->replication_state_.last_commit_timestamp_.load(); + return storage_->repl_storage_state_.last_commit_timestamp_.load(); } void ReplicationClient::InitializeClient() { uint64_t current_commit_timestamp{kTimestampInitialId}; - const auto &main_epoch = storage_->replication_state_.GetEpoch(); - - auto stream{rpc_client_.Stream(storage_->replication_state_.last_commit_timestamp_, - main_epoch.id)}; + auto stream{rpc_client_.Stream(storage_->repl_storage_state_.last_commit_timestamp_, + std::string{repl_epoch_->id()})}; const auto replica = stream.AwaitResponse(); std::optional branching_point; - if (replica.epoch_id != main_epoch.id && replica.current_commit_timestamp != kTimestampInitialId) { - auto const &history = storage_->replication_state_.history; + if (replica.epoch_id != repl_epoch_->id() && replica.current_commit_timestamp != kTimestampInitialId) { + auto const &history = storage_->repl_storage_state_.history; const auto epoch_info_iter = std::find_if(history.crbegin(), history.crend(), [&](const auto &main_epoch_info) { return main_epoch_info.first == replica.epoch_id; }); @@ -82,8 +76,8 @@ void ReplicationClient::InitializeClient() { current_commit_timestamp = replica.current_commit_timestamp; spdlog::trace("Current timestamp on replica {}: {}", name_, current_commit_timestamp); - spdlog::trace("Current timestamp on main: {}", storage_->replication_state_.last_commit_timestamp_.load()); - if (current_commit_timestamp == storage_->replication_state_.last_commit_timestamp_.load()) { + spdlog::trace("Current timestamp on main: {}", storage_->repl_storage_state_.last_commit_timestamp_.load()); + if (current_commit_timestamp == storage_->repl_storage_state_.last_commit_timestamp_.load()) { spdlog::debug("Replica '{}' up to date", name_); std::unique_lock client_guard{client_lock_}; replica_state_.store(replication::ReplicaState::READY); @@ -110,7 +104,7 @@ TimestampInfo ReplicationClient::GetTimestampInfo() { replica_state_.store(replication::ReplicaState::INVALID); HandleRpcFailure(); } - auto main_time_stamp = storage_->replication_state_.last_commit_timestamp_.load(); + auto main_time_stamp = storage_->repl_storage_state_.last_commit_timestamp_.load(); info.current_timestamp_of_replica = response.current_commit_timestamp; info.current_number_of_timestamp_behind_master = response.current_commit_timestamp - main_time_stamp; } catch (const rpc::RpcFailedException &) { @@ -173,7 +167,7 @@ void ReplicationClient::StartTransactionReplication(const uint64_t current_wal_s MG_ASSERT(!replica_stream_); try { replica_stream_.emplace( - ReplicaStream{this, storage_->replication_state_.last_commit_timestamp_.load(), current_wal_seq_num}); + ReplicaStream{this, storage_->repl_storage_state_.last_commit_timestamp_.load(), current_wal_seq_num}); replica_state_.store(replication::ReplicaState::REPLICATING); } catch (const rpc::RpcFailedException &) { replica_state_.store(replication::ReplicaState::INVALID); @@ -183,8 +177,6 @@ void ReplicationClient::StartTransactionReplication(const uint64_t current_wal_s } } -auto ReplicationClient::GetEpochId() const -> std::string const & { return storage_->replication_state_.GetEpoch().id; } - bool ReplicationClient::FinalizeTransactionReplication() { // We can only check the state because it guarantees to be only // valid during a single transaction replication (if the assumption @@ -218,7 +210,7 @@ bool ReplicationClient::FinalizeTransactionReplication() { return false; }; - if (mode_ == replication::ReplicationMode::ASYNC) { + if (mode_ == memgraph::replication::ReplicationMode::ASYNC) { thread_pool_.AddTask([=] { (void)task(); }); return true; } @@ -290,7 +282,7 @@ ReplicaStream::ReplicaStream(ReplicationClient *self, const uint64_t previous_co stream_(self_->rpc_client_.Stream(previous_commit_timestamp, current_seq_num)) { replication::Encoder encoder{stream_.GetBuilder()}; - encoder.WriteString(self_->GetEpochId()); + encoder.WriteString(self->repl_epoch_->id()); } void ReplicaStream::AppendDelta(const Delta &delta, const Vertex &vertex, uint64_t final_commit_timestamp) { diff --git a/src/storage/v2/replication/replication_client.hpp b/src/storage/v2/replication/replication_client.hpp index e1cf7ab85..e3c1b2634 100644 --- a/src/storage/v2/replication/replication_client.hpp +++ b/src/storage/v2/replication/replication_client.hpp @@ -11,12 +11,13 @@ #pragma once +#include "replication/config.hpp" +#include "replication/epoch.hpp" #include "rpc/client.hpp" #include "storage/v2/durability/storage_global_operation.hpp" #include "storage/v2/id_types.hpp" #include "storage/v2/indices/label_index_stats.hpp" #include "storage/v2/indices/label_property_index_stats.hpp" -#include "storage/v2/replication/config.hpp" #include "storage/v2/replication/enums.hpp" #include "storage/v2/replication/global.hpp" #include "storage/v2/replication/rpc.hpp" @@ -69,7 +70,8 @@ class ReplicationClient { friend class ReplicaStream; public: - ReplicationClient(Storage *storage, replication::ReplicationClientConfig const &config); + ReplicationClient(Storage *storage, const memgraph::replication::ReplicationClientConfig &config, + const memgraph::replication::ReplicationEpoch *epoch); ReplicationClient(ReplicationClient const &) = delete; ReplicationClient &operator=(ReplicationClient const &) = delete; @@ -78,7 +80,7 @@ class ReplicationClient { virtual ~ReplicationClient(); - auto Mode() const -> replication::ReplicationMode { return mode_; } + auto Mode() const -> memgraph::replication::ReplicationMode { return mode_; } auto Name() const -> std::string const & { return name_; } auto Endpoint() const -> io::network::Endpoint const & { return rpc_client_.Endpoint(); } auto State() const -> replication::ReplicaState { return replica_state_.load(); } @@ -99,7 +101,6 @@ class ReplicationClient { virtual void RecoverReplica(uint64_t replica_commit) = 0; auto GetStorage() -> Storage * { return storage_; } - auto GetEpochId() const -> std::string const &; auto LastCommitTimestamp() const -> uint64_t; void InitializeClient(); void HandleRpcFailure(); @@ -113,7 +114,7 @@ class ReplicationClient { std::chrono::seconds replica_check_frequency_; std::optional replica_stream_; - replication::ReplicationMode mode_{replication::ReplicationMode::SYNC}; + memgraph::replication::ReplicationMode mode_{memgraph::replication::ReplicationMode::SYNC}; utils::SpinLock client_lock_; // This thread pool is used for background tasks so we don't @@ -134,6 +135,8 @@ class ReplicationClient { utils::Scheduler replica_checker_; Storage *storage_; + + memgraph::replication::ReplicationEpoch const *repl_epoch_; }; } // namespace memgraph::storage diff --git a/src/storage/v2/replication/replication_handler.cpp b/src/storage/v2/replication/replication_handler.cpp new file mode 100644 index 000000000..4ba1edf69 --- /dev/null +++ b/src/storage/v2/replication/replication_handler.cpp @@ -0,0 +1,204 @@ +// Copyright 2023 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +#include "storage/v2/replication/replication_handler.hpp" + +#include "replication/state.hpp" +#include "storage/v2/storage.hpp" + +namespace memgraph::storage { + +namespace { + +std::string RegisterReplicaErrorToString(RegisterReplicaError error) { + switch (error) { + using enum RegisterReplicaError; + case NAME_EXISTS: + return "NAME_EXISTS"; + case END_POINT_EXISTS: + return "END_POINT_EXISTS"; + case CONNECTION_FAILED: + return "CONNECTION_FAILED"; + case COULD_NOT_BE_PERSISTED: + return "COULD_NOT_BE_PERSISTED"; + } +} +} // namespace + +bool ReplicationHandler::SetReplicationRoleMain() { + // We don't want to generate new epoch_id and do the + // cleanup if we're already a MAIN + // TODO: under lock + if (repl_state_.IsMain()) { + return false; + } + + // STEP 1) bring down all REPLICA servers + auto current_epoch = std::string(repl_state_.GetEpoch().id()); + { // TODO: foreach storage + // ensure replica server brought down + storage_.repl_storage_state_.replication_server_.reset(nullptr); + // Remember old epoch + storage timestamp association + storage_.repl_storage_state_.AddEpochToHistory(current_epoch); + } + + // STEP 2) Change to MAIN + repl_state_.GetEpoch().NewEpoch(); + if (!repl_state_.TryPersistRoleMain()) { + // TODO: On failure restore old epoch? restore replication servers? + return false; + } + repl_state_.SetRole(memgraph::replication::ReplicationRole::MAIN); + return true; +} +memgraph::utils::BasicResult ReplicationHandler::RegisterReplica( + const RegistrationMode registration_mode, const memgraph::replication::ReplicationClientConfig &config) { + MG_ASSERT(repl_state_.IsMain(), "Only main instance can register a replica!"); + + auto name_check = [&config](auto &clients) { + auto name_matches = [&name = config.name](const auto &client) { return client->Name() == name; }; + return std::any_of(clients.begin(), clients.end(), name_matches); + }; + + auto desired_endpoint = io::network::Endpoint{config.ip_address, config.port}; + auto endpoint_check = [&](auto &clients) { + auto endpoint_matches = [&](const auto &client) { return client->Endpoint() == desired_endpoint; }; + return std::any_of(clients.begin(), clients.end(), endpoint_matches); + }; + + auto task = [&](auto &clients) -> utils::BasicResult { + if (name_check(clients)) { + return RegisterReplicaError::NAME_EXISTS; + } + + if (endpoint_check(clients)) { + return RegisterReplicaError::END_POINT_EXISTS; + } + + using enum RegistrationMode; + if (registration_mode != RESTORE && !repl_state_.TryPersistRegisteredReplica(config)) { + return RegisterReplicaError::COULD_NOT_BE_PERSISTED; + } + + auto client = storage_.CreateReplicationClient(config); + client->Start(); + + if (client->State() == replication::ReplicaState::INVALID) { + if (registration_mode != RESTORE) { + return RegisterReplicaError::CONNECTION_FAILED; + } + + spdlog::warn("Connection failed when registering replica {}. Replica will still be registered.", client->Name()); + } + + clients.push_back(std::move(client)); + return {}; + }; + + return storage_.repl_storage_state_.replication_clients_.WithLock(task); +} +bool ReplicationHandler::SetReplicationRoleReplica(const memgraph::replication::ReplicationServerConfig &config) { + // We don't want to restart the server if we're already a REPLICA + if (repl_state_.IsReplica()) { + return false; + } + + std::unique_ptr replication_server = storage_.CreateReplicationServer(config); + bool res = replication_server->Start(); + if (!res) { + spdlog::error("Unable to start the replication server."); + return false; + } + storage_.repl_storage_state_.replication_server_ = std::move(replication_server); + + if (!repl_state_.TryPersistRoleReplica(config)) { + return false; + } + + repl_state_.SetRole(memgraph::replication::ReplicationRole::REPLICA); + return true; +} +auto ReplicationHandler::UnregisterReplica(std::string_view name) -> UnregisterReplicaResult { + if (repl_state_.IsReplica()) { + return UnregisterReplicaResult::NOT_MAIN; + } + + if (!repl_state_.TryPersistUnregisterReplica(name)) { + return UnregisterReplicaResult::COULD_NOT_BE_PERSISTED; + } + + auto const n_unregistered = storage_.repl_storage_state_.replication_clients_.WithLock([&](auto &clients) { + return std::erase_if(clients, [&](const auto &client) { return client->Name() == name; }); + }); + return (n_unregistered != 0) ? UnregisterReplicaResult::SUCCESS : UnregisterReplicaResult::CAN_NOT_UNREGISTER; +} +void ReplicationHandler::RestoreReplication() { + if (!repl_state_.ShouldPersist()) { + return; + } + + spdlog::info("Restoring replication role."); + + using memgraph::replication::ReplicationState; + + auto replicationData = repl_state_.FetchReplicationData(); + if (replicationData.HasError()) { + switch (replicationData.GetError()) { + using enum ReplicationState::FetchReplicationError; + case NOTHING_FETCHED: { + spdlog::debug("Cannot find data needed for restore replication role in persisted metadata."); + return; + } + case PARSE_ERROR: { + LOG_FATAL("Cannot parse previously saved configuration of replication role."); + return; + } + } + } + + /// MAIN + auto const recover_main = [this](ReplicationState::ReplicationDataMain const &configs) { + storage_.repl_storage_state_.replication_server_.reset(); + repl_state_.SetRole(memgraph::replication::ReplicationRole::MAIN); + for (const auto &config : configs) { + spdlog::info("Replica {} restored for {}.", config.name, storage_.id()); + auto ret = RegisterReplica(RegistrationMode::RESTORE, config); + if (ret.HasError()) { + MG_ASSERT(RegisterReplicaError::CONNECTION_FAILED != ret.GetError()); + LOG_FATAL("Failure when restoring replica {}: {}.", config.name, RegisterReplicaErrorToString(ret.GetError())); + } + spdlog::info("Replica {} restored for {}.", config.name, storage_.id()); + } + spdlog::info("Replication role restored to MAIN."); + }; + + /// REPLICA + auto const recover_replica = [this](ReplicationState::ReplicationDataReplica const &config) { + auto replication_server = storage_.CreateReplicationServer(config); + if (!replication_server->Start()) { + LOG_FATAL("Unable to start the replication server."); + } + storage_.repl_storage_state_.replication_server_ = std::move(replication_server); + repl_state_.SetRole(memgraph::replication::ReplicationRole::REPLICA); + spdlog::info("Replication role restored to REPLICA."); + }; + + std::visit( + utils::Overloaded{ + recover_main, + recover_replica, + }, + *replicationData); +} +auto ReplicationHandler::GetRole() const -> memgraph::replication::ReplicationRole { return repl_state_.GetRole(); } +bool ReplicationHandler::IsMain() const { return repl_state_.IsMain(); } +bool ReplicationHandler::IsReplica() const { return repl_state_.IsReplica(); } +} // namespace memgraph::storage diff --git a/src/storage/v2/replication/replication_handler.hpp b/src/storage/v2/replication/replication_handler.hpp new file mode 100644 index 000000000..797f76b54 --- /dev/null +++ b/src/storage/v2/replication/replication_handler.hpp @@ -0,0 +1,71 @@ +// Copyright 2023 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +#pragma once + +#include "replication/role.hpp" +#include "utils/result.hpp" + +// BEGIN fwd declares +namespace memgraph::replication { +struct ReplicationState; +struct ReplicationServerConfig; +struct ReplicationClientConfig; +} // namespace memgraph::replication +namespace memgraph::storage { +class Storage; +} +// END fwd declares + +namespace memgraph::storage { + +enum class RegistrationMode : std::uint8_t { MUST_BE_INSTANTLY_VALID, RESTORE }; +enum class RegisterReplicaError : uint8_t { NAME_EXISTS, END_POINT_EXISTS, CONNECTION_FAILED, COULD_NOT_BE_PERSISTED }; +enum class UnregisterReplicaResult : uint8_t { + NOT_MAIN, + COULD_NOT_BE_PERSISTED, + CAN_NOT_UNREGISTER, + SUCCESS, +}; + +/// A handler type that keep in sync current ReplicationState and the MAIN/REPLICA-ness of Storage +/// TODO: extend to do multiple storages +struct ReplicationHandler { + ReplicationHandler(memgraph::replication::ReplicationState &replState, Storage &storage) + : repl_state_(replState), storage_(storage) {} + + // as REPLICA, become MAIN + bool SetReplicationRoleMain(); + + // as MAIN, become REPLICA + bool SetReplicationRoleReplica(const memgraph::replication::ReplicationServerConfig &config); + + // as MAIN, define and connect to REPLICAs + auto RegisterReplica(RegistrationMode registration_mode, const memgraph::replication::ReplicationClientConfig &config) + -> utils::BasicResult; + + // as MAIN, remove a REPLICA connection + auto UnregisterReplica(std::string_view name) -> UnregisterReplicaResult; + + // Generic restoration + // TODO: decouple storage restoration from epoch restoration + void RestoreReplication(); + + // Helper pass-through (TODO: remove) + auto GetRole() const -> memgraph::replication::ReplicationRole; + bool IsMain() const; + bool IsReplica() const; + + private: + memgraph::replication::ReplicationState &repl_state_; + Storage &storage_; +}; +} // namespace memgraph::storage diff --git a/src/storage/v2/replication/replication_server.cpp b/src/storage/v2/replication/replication_server.cpp index f64e2cd16..0ed367d88 100644 --- a/src/storage/v2/replication/replication_server.cpp +++ b/src/storage/v2/replication/replication_server.cpp @@ -11,13 +11,13 @@ #include "replication_server.hpp" #include "io/network/endpoint.hpp" +#include "replication/config.hpp" #include "rpc.hpp" -#include "storage/v2/replication/config.hpp" namespace memgraph::storage { namespace { -auto CreateServerContext(const replication::ReplicationServerConfig &config) -> communication::ServerContext { +auto CreateServerContext(const memgraph::replication::ReplicationServerConfig &config) -> communication::ServerContext { return (config.ssl) ? communication::ServerContext{config.ssl->key_file, config.ssl->cert_file, config.ssl->ca_file, config.ssl->verify_peer} : communication::ServerContext{}; @@ -30,7 +30,7 @@ auto CreateServerContext(const replication::ReplicationServerConfig &config) -> constexpr auto kReplictionServerThreads = 1; } // namespace -ReplicationServer::ReplicationServer(const replication::ReplicationServerConfig &config) +ReplicationServer::ReplicationServer(const memgraph::replication::ReplicationServerConfig &config) : rpc_server_context_{CreateServerContext(config)}, rpc_server_{io::network::Endpoint{config.ip_address, config.port}, &rpc_server_context_, kReplictionServerThreads} { diff --git a/src/storage/v2/replication/replication_server.hpp b/src/storage/v2/replication/replication_server.hpp index ca036fd3a..0d7415396 100644 --- a/src/storage/v2/replication/replication_server.hpp +++ b/src/storage/v2/replication/replication_server.hpp @@ -11,16 +11,16 @@ #pragma once +#include "replication/config.hpp" #include "rpc/server.hpp" #include "slk/streams.hpp" -#include "storage/v2/replication/config.hpp" #include "storage/v2/replication/global.hpp" namespace memgraph::storage { class ReplicationServer { public: - explicit ReplicationServer(const replication::ReplicationServerConfig &config); + explicit ReplicationServer(const memgraph::replication::ReplicationServerConfig &config); ReplicationServer(const ReplicationServer &) = delete; ReplicationServer(ReplicationServer &&) = delete; ReplicationServer &operator=(const ReplicationServer &) = delete; diff --git a/src/storage/v2/replication/replication_storage_state.cpp b/src/storage/v2/replication/replication_storage_state.cpp new file mode 100644 index 000000000..5779bd2ba --- /dev/null +++ b/src/storage/v2/replication/replication_storage_state.cpp @@ -0,0 +1,111 @@ +// Copyright 2023 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +#include "storage/v2/replication/replication_storage_state.hpp" + +#include "storage/v2/replication/replication_client.hpp" +#include "storage/v2/replication/replication_server.hpp" + +namespace memgraph::storage { + +void ReplicationStorageState::InitializeTransaction(uint64_t seq_num) { + replication_clients_.WithLock([&](auto &clients) { + for (auto &client : clients) { + client->StartTransactionReplication(seq_num); + } + }); +} + +void ReplicationStorageState::AppendDelta(const Delta &delta, const Vertex &vertex, uint64_t timestamp) { + replication_clients_.WithLock([&](auto &clients) { + for (auto &client : clients) { + client->IfStreamingTransaction([&](auto &stream) { stream.AppendDelta(delta, vertex, timestamp); }); + } + }); +} + +void ReplicationStorageState::AppendDelta(const Delta &delta, const Edge &edge, uint64_t timestamp) { + replication_clients_.WithLock([&](auto &clients) { + for (auto &client : clients) { + client->IfStreamingTransaction([&](auto &stream) { stream.AppendDelta(delta, edge, timestamp); }); + } + }); +} +void ReplicationStorageState::AppendOperation(durability::StorageMetadataOperation operation, LabelId label, + const std::set &properties, const LabelIndexStats &stats, + const LabelPropertyIndexStats &property_stats, + uint64_t final_commit_timestamp) { + replication_clients_.WithLock([&](auto &clients) { + for (auto &client : clients) { + client->IfStreamingTransaction([&](auto &stream) { + stream.AppendOperation(operation, label, properties, stats, property_stats, final_commit_timestamp); + }); + } + }); +} + +bool ReplicationStorageState::FinalizeTransaction(uint64_t timestamp) { + return replication_clients_.WithLock([=](auto &clients) { + bool finalized_on_all_replicas = true; + for (ReplicationClientPtr &client : clients) { + client->IfStreamingTransaction([&](auto &stream) { stream.AppendTransactionEnd(timestamp); }); + const auto finalized = client->FinalizeTransactionReplication(); + + if (client->Mode() == memgraph::replication::ReplicationMode::SYNC) { + finalized_on_all_replicas = finalized && finalized_on_all_replicas; + } + } + return finalized_on_all_replicas; + }); +} + +std::optional ReplicationStorageState::GetReplicaState(std::string_view name) const { + return replication_clients_.WithReadLock([&](auto const &clients) -> std::optional { + auto const name_matches = [=](ReplicationClientPtr const &client) { return client->Name() == name; }; + auto const client_it = std::find_if(clients.cbegin(), clients.cend(), name_matches); + if (client_it == clients.cend()) { + return std::nullopt; + } + return (*client_it)->State(); + }); +} + +std::vector ReplicationStorageState::ReplicasInfo() const { + return replication_clients_.WithReadLock([](auto const &clients) { + std::vector replica_infos; + replica_infos.reserve(clients.size()); + auto const asReplicaInfo = [](ReplicationClientPtr const &client) -> ReplicaInfo { + return {client->Name(), client->Mode(), client->Endpoint(), client->State(), client->GetTimestampInfo()}; + }; + std::transform(clients.begin(), clients.end(), std::back_inserter(replica_infos), asReplicaInfo); + return replica_infos; + }); +} + +void ReplicationStorageState::Reset() { + replication_server_.reset(); + replication_clients_.WithLock([](auto &clients) { clients.clear(); }); +} + +void ReplicationStorageState::AddEpochToHistory(std::string prev_epoch) { + constexpr uint16_t kEpochHistoryRetention = 1000; + // Generate new epoch id and save the last one to the history. + if (history.size() == kEpochHistoryRetention) { + history.pop_front(); + } + history.emplace_back(std::move(prev_epoch), last_commit_timestamp_); +} + +void ReplicationStorageState::AddEpochToHistoryForce(std::string prev_epoch) { + history.emplace_back(std::move(prev_epoch), last_commit_timestamp_); +} + +} // namespace memgraph::storage diff --git a/src/storage/v2/replication/replication.hpp b/src/storage/v2/replication/replication_storage_state.hpp similarity index 51% rename from src/storage/v2/replication/replication.hpp rename to src/storage/v2/replication/replication_storage_state.hpp index 2c38b9536..c3dc7c833 100644 --- a/src/storage/v2/replication/replication.hpp +++ b/src/storage/v2/replication/replication_storage_state.hpp @@ -11,6 +11,8 @@ #pragma once +#include + #include "kvstore/kvstore.hpp" #include "storage/v2/delta.hpp" #include "storage/v2/durability/storage_global_operation.hpp" @@ -18,63 +20,39 @@ #include "utils/result.hpp" /// REPLICATION /// -#include "storage/v2/replication/config.hpp" +#include "replication/config.hpp" +#include "replication/epoch.hpp" +#include "replication/state.hpp" #include "storage/v2/replication/enums.hpp" #include "storage/v2/replication/global.hpp" -#include "storage/v2/replication/replication_persistence_helper.hpp" #include "storage/v2/replication/rpc.hpp" #include "storage/v2/replication/serialization.hpp" -// TODO use replication namespace namespace memgraph::storage { class Storage; class ReplicationServer; class ReplicationClient; -enum class RegisterReplicaError : uint8_t { NAME_EXISTS, END_POINT_EXISTS, CONNECTION_FAILED, COULD_NOT_BE_PERSISTED }; - -struct ReplicationState { - // TODO: This mirrors the logic in InMemoryConstructor; make it independent - ReplicationState(bool restore, std::filesystem::path durability_dir); - - // Generic API - void Reset(); - // TODO: Just check if server exists -> you are REPLICA - replication::ReplicationRole GetRole() const { return replication_role_.load(); } - - bool SetMainReplicationRole(Storage *storage); // Set the instance to MAIN - // TODO: ReplicationServer/Client uses Storage* for RPC callbacks - bool SetReplicaRole(const replication::ReplicationServerConfig &config, - Storage *storage); // Sets the instance to REPLICA - // Generic restoration - void RestoreReplicationRole(Storage *storage); - - // MAIN actually doing the replication +struct ReplicationStorageState { + // Only MAIN can send + void InitializeTransaction(uint64_t seq_num); + void AppendDelta(const Delta &delta, const Vertex &vertex, uint64_t timestamp); + void AppendDelta(const Delta &delta, const Edge &edge, uint64_t timestamp); void AppendOperation(durability::StorageMetadataOperation operation, LabelId label, const std::set &properties, const LabelIndexStats &stats, const LabelPropertyIndexStats &property_stats, uint64_t final_commit_timestamp); - void InitializeTransaction(uint64_t seq_num); - void AppendDelta(const Delta &delta, const Vertex &parent, uint64_t timestamp); - void AppendDelta(const Delta &delta, const Edge &parent, uint64_t timestamp); bool FinalizeTransaction(uint64_t timestamp); - // MAIN connecting to replicas - utils::BasicResult RegisterReplica(const replication::RegistrationMode registration_mode, - const replication::ReplicationClientConfig &config, - Storage *storage); - bool UnregisterReplica(std::string_view name); + // Getters + auto GetReplicaState(std::string_view name) const -> std::optional; + auto ReplicasInfo() const -> std::vector; - // MAIN reconnecting to replicas - void RestoreReplicas(Storage *storage); + // History + void AddEpochToHistory(std::string prev_epoch); + void AddEpochToHistoryForce(std::string prev_epoch); - // MAIN getting info from replicas - // TODO make into const (problem with SpinLock and WithReadLock) - std::optional GetReplicaState(std::string_view name); - std::vector ReplicasInfo(); - - const ReplicationEpoch &GetEpoch() const { return epoch_; } - ReplicationEpoch &GetEpoch() { return epoch_; } + void Reset(); // Questions: // - storage durability <- databases/*name*/wal and snapshots (where this for epoch_id) @@ -83,21 +61,8 @@ struct ReplicationState { // Each value consists of the epoch id along the last commit belonging to that // epoch. std::deque> history; - - // TODO: actually durability std::atomic last_commit_timestamp_{kTimestampInitialId}; - void NewEpoch(); - void AppendEpoch(std::string new_epoch); - - private: - bool TryPersistReplicaClient(const replication::ReplicationClientConfig &config); - bool ShouldStoreAndRestoreReplicationState() const { return nullptr != durability_; } - void SetRole(replication::ReplicationRole role) { return replication_role_.store(role); } - - // NOTE: Server is not in MAIN it is in REPLICA - std::unique_ptr replication_server_{nullptr}; - // We create ReplicationClient using unique_ptr so we can move // newly created client into the vector. // We cannot move the client directly because it contains ThreadPool @@ -108,14 +73,13 @@ struct ReplicationState { // This way we can initialize client in main thread which means // that we can immediately notify the user if the initialization // failed. - using ReplicationClientList = utils::Synchronized>, utils::SpinLock>; + using ReplicationClientPtr = std::unique_ptr; + using ReplicationClientList = utils::Synchronized, utils::RWSpinLock>; + + // NOTE: Server is not in MAIN it is in REPLICA + std::unique_ptr replication_server_{nullptr}; + ReplicationClientList replication_clients_; - - std::atomic replication_role_{replication::ReplicationRole::MAIN}; - - std::unique_ptr durability_; - - ReplicationEpoch epoch_; }; } // namespace memgraph::storage diff --git a/src/storage/v2/replication/rpc.hpp b/src/storage/v2/replication/rpc.hpp index f466b1880..d2cf21fb4 100644 --- a/src/storage/v2/replication/rpc.hpp +++ b/src/storage/v2/replication/rpc.hpp @@ -63,7 +63,7 @@ struct HeartbeatReq { static void Save(const HeartbeatReq &self, memgraph::slk::Builder *builder); HeartbeatReq() {} HeartbeatReq(uint64_t main_commit_timestamp, std::string epoch_id) - : main_commit_timestamp(main_commit_timestamp), epoch_id(epoch_id) {} + : main_commit_timestamp(main_commit_timestamp), epoch_id(std::move(epoch_id)) {} uint64_t main_commit_timestamp; std::string epoch_id; diff --git a/src/storage/v2/storage.cpp b/src/storage/v2/storage.cpp index c4f52faa3..11008d0a1 100644 --- a/src/storage/v2/storage.cpp +++ b/src/storage/v2/storage.cpp @@ -40,8 +40,14 @@ class InMemoryStorage; using OOMExceptionEnabler = utils::MemoryTracker::OutOfMemoryExceptionEnabler; +auto ReplicationStateHelper(Config const &config) -> std::optional { + if (!config.durability.restore_replication_state_on_startup) return std::nullopt; + return {config.durability.storage_directory}; +} + Storage::Storage(Config config, StorageMode storage_mode) - : name_id_mapper_(std::invoke([config, storage_mode]() -> std::unique_ptr { + : repl_state_(ReplicationStateHelper(config)), + name_id_mapper_(std::invoke([config, storage_mode]() -> std::unique_ptr { if (storage_mode == StorageMode::ON_DISK_TRANSACTIONAL) { return std::make_unique(config.disk.name_id_mapper_directory, config.disk.id_name_mapper_directory); @@ -53,9 +59,7 @@ Storage::Storage(Config config, StorageMode storage_mode) storage_mode_(storage_mode), indices_(config, storage_mode), constraints_(config, storage_mode), - id_(config.name), - replication_state_(config_.durability.restore_replication_state_on_startup, - config_.durability.storage_directory) {} + id_(config.name) {} Storage::Accessor::Accessor(SharedAccess /* tag */, Storage *storage, IsolationLevel isolation_level, StorageMode storage_mode) diff --git a/src/storage/v2/storage.hpp b/src/storage/v2/storage.hpp index 366048c7d..0aeb206e7 100644 --- a/src/storage/v2/storage.hpp +++ b/src/storage/v2/storage.hpp @@ -19,6 +19,7 @@ #include "io/network/endpoint.hpp" #include "kvstore/kvstore.hpp" #include "query/exceptions.hpp" +#include "replication/config.hpp" #include "storage/v2/all_vertices_iterable.hpp" #include "storage/v2/commit_log.hpp" #include "storage/v2/config.hpp" @@ -27,11 +28,10 @@ #include "storage/v2/edge_accessor.hpp" #include "storage/v2/indices/indices.hpp" #include "storage/v2/mvcc.hpp" -#include "storage/v2/replication/config.hpp" #include "storage/v2/replication/enums.hpp" -#include "storage/v2/replication/replication.hpp" #include "storage/v2/replication/replication_client.hpp" #include "storage/v2/replication/replication_server.hpp" +#include "storage/v2/replication/replication_storage_state.hpp" #include "storage/v2/storage_error.hpp" #include "storage/v2/storage_mode.hpp" #include "storage/v2/transaction.hpp" @@ -297,37 +297,22 @@ class Storage { virtual Transaction CreateTransaction(IsolationLevel isolation_level, StorageMode storage_mode) = 0; - virtual void EstablishNewEpoch() = 0; + virtual void PrepareForNewEpoch(std::string prev_epoch) = 0; - virtual auto CreateReplicationClient(replication::ReplicationClientConfig const &config) + virtual auto CreateReplicationClient(const memgraph::replication::ReplicationClientConfig &config) -> std::unique_ptr = 0; - virtual auto CreateReplicationServer(const replication::ReplicationServerConfig &config) + virtual auto CreateReplicationServer(const memgraph::replication::ReplicationServerConfig &config) -> std::unique_ptr = 0; - /// REPLICATION - bool SetReplicaRole(const replication::ReplicationServerConfig &config) { - return replication_state_.SetReplicaRole(config, this); - } - bool SetMainReplicationRole() { return replication_state_.SetMainReplicationRole(this); } - - /// @pre The instance should have a MAIN role - /// @pre Timeout can only be set for SYNC replication - auto RegisterReplica(const replication::RegistrationMode registration_mode, - const replication::ReplicationClientConfig &config) { - return replication_state_.RegisterReplica(registration_mode, config, this); - } - /// @pre The instance should have a MAIN role - bool UnregisterReplica(const std::string &name) { return replication_state_.UnregisterReplica(name); } - replication::ReplicationRole GetReplicationRole() const { return replication_state_.GetRole(); } - auto ReplicasInfo() { return replication_state_.ReplicasInfo(); } - std::optional GetReplicaState(std::string_view name) { - return replication_state_.GetReplicaState(name); + auto ReplicasInfo() { return repl_storage_state_.ReplicasInfo(); } + auto GetReplicaState(std::string_view name) -> std::optional { + return repl_storage_state_.GetReplicaState(name); } - protected: - void RestoreReplicas() { return replication_state_.RestoreReplicas(this); } - void RestoreReplicationRole() { return replication_state_.RestoreReplicationRole(this); } + // TODO: make non-public + memgraph::replication::ReplicationState repl_state_; + ReplicationStorageState repl_storage_state_; public: // Main storage lock. @@ -360,9 +345,6 @@ class Storage { std::atomic vertex_id_{0}; std::atomic edge_id_{0}; const std::string id_; //!< High-level assigned ID - - protected: - ReplicationState replication_state_; }; } // namespace memgraph::storage diff --git a/src/utils/CMakeLists.txt b/src/utils/CMakeLists.txt index d5144daf6..9b22389b1 100644 --- a/src/utils/CMakeLists.txt +++ b/src/utils/CMakeLists.txt @@ -25,6 +25,7 @@ find_package(gflags REQUIRED) find_package(Threads REQUIRED) add_library(mg-utils STATIC ${utils_src_files}) +add_library(mg::utils ALIAS mg-utils) target_link_libraries(mg-utils PUBLIC Boost::headers fmt::fmt spdlog::spdlog) target_link_libraries(mg-utils PRIVATE librdtsc stdc++fs Threads::Threads gflags json uuid rt) diff --git a/tests/unit/replication_persistence_helper.cpp b/tests/unit/replication_persistence_helper.cpp index 7915d315c..b2fcae53e 100644 --- a/tests/unit/replication_persistence_helper.cpp +++ b/tests/unit/replication_persistence_helper.cpp @@ -9,8 +9,9 @@ // by the Apache License, Version 2.0, included in the file // licenses/APL.txt. -#include "storage/v2/replication/replication_persistence_helper.hpp" #include "formatters.hpp" +#include "replication/state.hpp" +#include "replication/status.hpp" #include "utils/logging.hpp" #include @@ -20,6 +21,7 @@ #include using namespace memgraph::storage::replication; +using namespace memgraph::replication; class ReplicationPersistanceHelperTest : public testing::Test { protected: diff --git a/tests/unit/storage_v2_replication.cpp b/tests/unit/storage_v2_replication.cpp index 6757ca266..85b7f801b 100644 --- a/tests/unit/storage_v2_replication.cpp +++ b/tests/unit/storage_v2_replication.cpp @@ -21,27 +21,30 @@ #include #include #include +#include "replication/config.hpp" #include "storage/v2/indices/label_index_stats.hpp" -#include "storage/v2/replication/config.hpp" +#include "storage/v2/replication/replication_handler.hpp" #include "storage/v2/storage.hpp" #include "storage/v2/view.hpp" using testing::UnorderedElementsAre; +using memgraph::replication::ReplicationClientConfig; +using memgraph::replication::ReplicationMode; +using memgraph::replication::ReplicationRole; +using memgraph::replication::ReplicationServerConfig; using memgraph::storage::Config; using memgraph::storage::EdgeAccessor; using memgraph::storage::Gid; using memgraph::storage::InMemoryStorage; using memgraph::storage::PropertyValue; using memgraph::storage::RegisterReplicaError; +using memgraph::storage::RegistrationMode; +using memgraph::storage::ReplicationHandler; using memgraph::storage::Storage; +using memgraph::storage::UnregisterReplicaResult; using memgraph::storage::View; -using memgraph::storage::replication::RegistrationMode; using memgraph::storage::replication::ReplicaState; -using memgraph::storage::replication::ReplicationClientConfig; -using memgraph::storage::replication::ReplicationMode; -using memgraph::storage::replication::ReplicationRole; -using memgraph::storage::replication::ReplicationServerConfig; class ReplicationTest : public ::testing::Test { protected: @@ -72,19 +75,21 @@ TEST_F(ReplicationTest, BasicSynchronousReplicationTest) { std::unique_ptr main_store = std::make_unique(configuration); std::unique_ptr replica_store = std::make_unique(configuration); - replica_store->SetReplicaRole(ReplicationServerConfig{ + auto replica_store_handler = ReplicationHandler{replica_store->repl_state_, *replica_store}; + replica_store_handler.SetReplicationRoleReplica(ReplicationServerConfig{ .ip_address = local_host, .port = ports[0], }); - ASSERT_FALSE(main_store - ->RegisterReplica(RegistrationMode::MUST_BE_INSTANTLY_VALID, - ReplicationClientConfig{ - .name = "REPLICA", - .mode = ReplicationMode::SYNC, - .ip_address = local_host, - .port = ports[0], - }) + auto main_store_handler = ReplicationHandler{main_store->repl_state_, *main_store}; + ASSERT_FALSE(main_store_handler + .RegisterReplica(RegistrationMode::MUST_BE_INSTANTLY_VALID, + ReplicationClientConfig{ + .name = "REPLICA", + .mode = ReplicationMode::SYNC, + .ip_address = local_host, + .port = ports[0], + }) .HasError()); // vertex create @@ -368,7 +373,8 @@ TEST_F(ReplicationTest, MultipleSynchronousReplicationTest) { .snapshot_wal_mode = Config::Durability::SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL, }})}; - replica_store1->SetReplicaRole(ReplicationServerConfig{ + auto replica1_store_handler = ReplicationHandler{replica_store1->repl_state_, *replica_store1}; + replica1_store_handler.SetReplicationRoleReplica(ReplicationServerConfig{ .ip_address = local_host, .port = ports[0], }); @@ -378,28 +384,30 @@ TEST_F(ReplicationTest, MultipleSynchronousReplicationTest) { .storage_directory = storage_directory, .snapshot_wal_mode = Config::Durability::SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL, }})}; - replica_store2->SetReplicaRole(ReplicationServerConfig{ + auto replica2_store_handler = ReplicationHandler{replica_store2->repl_state_, *replica_store2}; + replica2_store_handler.SetReplicationRoleReplica(ReplicationServerConfig{ .ip_address = local_host, .port = ports[1], }); - ASSERT_FALSE(main_store - ->RegisterReplica(RegistrationMode::MUST_BE_INSTANTLY_VALID, - ReplicationClientConfig{ - .name = replicas[0], - .mode = ReplicationMode::SYNC, - .ip_address = local_host, - .port = ports[0], - }) + auto main_store_handler = ReplicationHandler{main_store->repl_state_, *main_store}; + ASSERT_FALSE(main_store_handler + .RegisterReplica(RegistrationMode::MUST_BE_INSTANTLY_VALID, + ReplicationClientConfig{ + .name = replicas[0], + .mode = ReplicationMode::SYNC, + .ip_address = local_host, + .port = ports[0], + }) .HasError()); - ASSERT_FALSE(main_store - ->RegisterReplica(RegistrationMode::MUST_BE_INSTANTLY_VALID, - ReplicationClientConfig{ - .name = replicas[1], - .mode = ReplicationMode::SYNC, - .ip_address = local_host, - .port = ports[1], - }) + ASSERT_FALSE(main_store_handler + .RegisterReplica(RegistrationMode::MUST_BE_INSTANTLY_VALID, + ReplicationClientConfig{ + .name = replicas[1], + .mode = ReplicationMode::SYNC, + .ip_address = local_host, + .port = ports[1], + }) .HasError()); const auto *vertex_label = "label"; @@ -429,7 +437,8 @@ TEST_F(ReplicationTest, MultipleSynchronousReplicationTest) { check_replica(replica_store1.get()); check_replica(replica_store2.get()); - main_store->UnregisterReplica(replicas[1]); + auto handler = ReplicationHandler{main_store->repl_state_, *main_store}; + handler.UnregisterReplica(replicas[1]); { auto acc = main_store->Access(); auto v = acc->CreateVertex(); @@ -526,19 +535,20 @@ TEST_F(ReplicationTest, RecoveryProcess) { {.durability = {.storage_directory = replica_storage_directory, .snapshot_wal_mode = Config::Durability::SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL}})}; - replica_store->SetReplicaRole(ReplicationServerConfig{ + auto replica_store_handler = ReplicationHandler{replica_store->repl_state_, *replica_store}; + replica_store_handler.SetReplicationRoleReplica(ReplicationServerConfig{ .ip_address = local_host, .port = ports[0], }); - - ASSERT_FALSE(main_store - ->RegisterReplica(RegistrationMode::MUST_BE_INSTANTLY_VALID, - ReplicationClientConfig{ - .name = replicas[0], - .mode = ReplicationMode::SYNC, - .ip_address = local_host, - .port = ports[0], - }) + auto main_store_handler = ReplicationHandler{main_store->repl_state_, *main_store}; + ASSERT_FALSE(main_store_handler + .RegisterReplica(RegistrationMode::MUST_BE_INSTANTLY_VALID, + ReplicationClientConfig{ + .name = replicas[0], + .mode = ReplicationMode::SYNC, + .ip_address = local_host, + .port = ports[0], + }) .HasError()); ASSERT_EQ(main_store->GetReplicaState(replicas[0]), ReplicaState::RECOVERY); @@ -600,19 +610,21 @@ TEST_F(ReplicationTest, BasicAsynchronousReplicationTest) { std::unique_ptr replica_store_async{new InMemoryStorage(configuration)}; - replica_store_async->SetReplicaRole(ReplicationServerConfig{ + auto replica_store_handler = ReplicationHandler{replica_store_async->repl_state_, *replica_store_async}; + replica_store_handler.SetReplicationRoleReplica(ReplicationServerConfig{ .ip_address = local_host, .port = ports[1], }); - ASSERT_FALSE(main_store - ->RegisterReplica(RegistrationMode::MUST_BE_INSTANTLY_VALID, - ReplicationClientConfig{ - .name = "REPLICA_ASYNC", - .mode = ReplicationMode::ASYNC, - .ip_address = local_host, - .port = ports[1], - }) + auto main_store_handler = ReplicationHandler{main_store->repl_state_, *main_store}; + ASSERT_FALSE(main_store_handler + .RegisterReplica(RegistrationMode::MUST_BE_INSTANTLY_VALID, + ReplicationClientConfig{ + .name = "REPLICA_ASYNC", + .mode = ReplicationMode::ASYNC, + .ip_address = local_host, + .port = ports[1], + }) .HasError()); static constexpr size_t vertices_create_num = 10; @@ -647,36 +659,39 @@ TEST_F(ReplicationTest, EpochTest) { std::unique_ptr main_store{new InMemoryStorage(configuration)}; std::unique_ptr replica_store1{new InMemoryStorage(configuration)}; - replica_store1->SetReplicaRole(ReplicationServerConfig{ + auto replica1_store_handler = ReplicationHandler{replica_store1->repl_state_, *replica_store1}; + replica1_store_handler.SetReplicationRoleReplica(ReplicationServerConfig{ .ip_address = local_host, .port = ports[0], }); std::unique_ptr replica_store2{new InMemoryStorage(configuration)}; - replica_store2->SetReplicaRole(ReplicationServerConfig{ + auto replica2_store_handler = ReplicationHandler{replica_store2->repl_state_, *replica_store2}; + replica2_store_handler.SetReplicationRoleReplica(ReplicationServerConfig{ .ip_address = local_host, .port = 10001, }); - ASSERT_FALSE(main_store - ->RegisterReplica(RegistrationMode::MUST_BE_INSTANTLY_VALID, - ReplicationClientConfig{ - .name = replicas[0], - .mode = ReplicationMode::SYNC, - .ip_address = local_host, - .port = ports[0], - }) + auto main_store_handler = ReplicationHandler{main_store->repl_state_, *main_store}; + ASSERT_FALSE(main_store_handler + .RegisterReplica(RegistrationMode::MUST_BE_INSTANTLY_VALID, + ReplicationClientConfig{ + .name = replicas[0], + .mode = ReplicationMode::SYNC, + .ip_address = local_host, + .port = ports[0], + }) .HasError()); - ASSERT_FALSE(main_store - ->RegisterReplica(RegistrationMode::MUST_BE_INSTANTLY_VALID, - ReplicationClientConfig{ - .name = replicas[1], - .mode = ReplicationMode::SYNC, - .ip_address = local_host, - .port = 10001, - }) + ASSERT_FALSE(main_store_handler + .RegisterReplica(RegistrationMode::MUST_BE_INSTANTLY_VALID, + ReplicationClientConfig{ + .name = replicas[1], + .mode = ReplicationMode::SYNC, + .ip_address = local_host, + .port = 10001, + }) .HasError()); std::optional vertex_gid; @@ -699,18 +714,19 @@ TEST_F(ReplicationTest, EpochTest) { ASSERT_FALSE(acc->Commit().HasError()); } - main_store->UnregisterReplica(replicas[0]); - main_store->UnregisterReplica(replicas[1]); + main_store_handler.UnregisterReplica(replicas[0]); + main_store_handler.UnregisterReplica(replicas[1]); - replica_store1->SetMainReplicationRole(); - ASSERT_FALSE(replica_store1 - ->RegisterReplica(RegistrationMode::MUST_BE_INSTANTLY_VALID, - ReplicationClientConfig{ - .name = replicas[1], - .mode = ReplicationMode::SYNC, - .ip_address = local_host, - .port = 10001, - }) + ASSERT_TRUE(replica1_store_handler.SetReplicationRoleMain()); + + ASSERT_FALSE(replica1_store_handler + .RegisterReplica(RegistrationMode::MUST_BE_INSTANTLY_VALID, + ReplicationClientConfig{ + .name = replicas[1], + .mode = ReplicationMode::SYNC, + .ip_address = local_host, + .port = 10001, + }) .HasError()); @@ -733,18 +749,18 @@ TEST_F(ReplicationTest, EpochTest) { ASSERT_FALSE(acc->Commit().HasError()); } - replica_store1->SetReplicaRole(ReplicationServerConfig{ + replica1_store_handler.SetReplicationRoleReplica(ReplicationServerConfig{ .ip_address = local_host, .port = ports[0], }); - ASSERT_TRUE(main_store - ->RegisterReplica(RegistrationMode::MUST_BE_INSTANTLY_VALID, - ReplicationClientConfig{ - .name = replicas[0], - .mode = ReplicationMode::SYNC, - .ip_address = local_host, - .port = ports[0], - }) + ASSERT_TRUE(main_store_handler + .RegisterReplica(RegistrationMode::MUST_BE_INSTANTLY_VALID, + ReplicationClientConfig{ + .name = replicas[0], + .mode = ReplicationMode::SYNC, + .ip_address = local_host, + .port = ports[0], + }) .HasError()); @@ -769,43 +785,46 @@ TEST_F(ReplicationTest, ReplicationInformation) { std::unique_ptr replica_store1{new InMemoryStorage(configuration)}; uint16_t replica1_port = 10001; - replica_store1->SetReplicaRole(ReplicationServerConfig{ + auto replica1_store_handler = ReplicationHandler{replica_store1->repl_state_, *replica_store1}; + replica1_store_handler.SetReplicationRoleReplica(ReplicationServerConfig{ .ip_address = local_host, .port = replica1_port, }); uint16_t replica2_port = 10002; std::unique_ptr replica_store2{new InMemoryStorage(configuration)}; - replica_store2->SetReplicaRole(ReplicationServerConfig{ + auto replica2_store_handler = ReplicationHandler{replica_store2->repl_state_, *replica_store2}; + replica2_store_handler.SetReplicationRoleReplica(ReplicationServerConfig{ .ip_address = local_host, .port = replica2_port, }); - ASSERT_FALSE(main_store - ->RegisterReplica(RegistrationMode::MUST_BE_INSTANTLY_VALID, - ReplicationClientConfig{ - .name = replicas[0], - .mode = ReplicationMode::SYNC, - .ip_address = local_host, - .port = replica1_port, - }) + auto main_store_handler = ReplicationHandler{main_store->repl_state_, *main_store}; + ASSERT_FALSE(main_store_handler + .RegisterReplica(RegistrationMode::MUST_BE_INSTANTLY_VALID, + ReplicationClientConfig{ + .name = replicas[0], + .mode = ReplicationMode::SYNC, + .ip_address = local_host, + .port = replica1_port, + }) .HasError()); - ASSERT_FALSE(main_store - ->RegisterReplica(RegistrationMode::MUST_BE_INSTANTLY_VALID, - ReplicationClientConfig{ - .name = replicas[1], - .mode = ReplicationMode::ASYNC, - .ip_address = local_host, - .port = replica2_port, - }) + ASSERT_FALSE(main_store_handler + .RegisterReplica(RegistrationMode::MUST_BE_INSTANTLY_VALID, + ReplicationClientConfig{ + .name = replicas[1], + .mode = ReplicationMode::ASYNC, + .ip_address = local_host, + .port = replica2_port, + }) .HasError()); - ASSERT_EQ(main_store->GetReplicationRole(), ReplicationRole::MAIN); - ASSERT_EQ(replica_store1->GetReplicationRole(), ReplicationRole::REPLICA); - ASSERT_EQ(replica_store2->GetReplicationRole(), ReplicationRole::REPLICA); + ASSERT_TRUE(main_store->repl_state_.IsMain()); + ASSERT_TRUE(replica_store1->repl_state_.IsReplica()); + ASSERT_TRUE(replica_store2->repl_state_.IsReplica()); const auto replicas_info = main_store->ReplicasInfo(); ASSERT_EQ(replicas_info.size(), 2); @@ -828,36 +847,38 @@ TEST_F(ReplicationTest, ReplicationReplicaWithExistingName) { std::unique_ptr replica_store1{new InMemoryStorage(configuration)}; uint16_t replica1_port = 10001; - replica_store1->SetReplicaRole(ReplicationServerConfig{ + auto replica1_store_handler = ReplicationHandler{replica_store1->repl_state_, *replica_store1}; + replica1_store_handler.SetReplicationRoleReplica(ReplicationServerConfig{ .ip_address = local_host, .port = replica1_port, }); uint16_t replica2_port = 10002; std::unique_ptr replica_store2{new InMemoryStorage(configuration)}; - replica_store2->SetReplicaRole(ReplicationServerConfig{ + auto replica2_store_handler = ReplicationHandler{replica_store2->repl_state_, *replica_store2}; + replica2_store_handler.SetReplicationRoleReplica(ReplicationServerConfig{ .ip_address = local_host, .port = replica2_port, }); - - ASSERT_FALSE(main_store - ->RegisterReplica(RegistrationMode::MUST_BE_INSTANTLY_VALID, - ReplicationClientConfig{ - .name = replicas[0], - .mode = ReplicationMode::SYNC, - .ip_address = local_host, - .port = replica1_port, - }) - .HasError()); - - ASSERT_TRUE(main_store - ->RegisterReplica(RegistrationMode::MUST_BE_INSTANTLY_VALID, + auto main_store_handler = ReplicationHandler{main_store->repl_state_, *main_store}; + ASSERT_FALSE(main_store_handler + .RegisterReplica(RegistrationMode::MUST_BE_INSTANTLY_VALID, ReplicationClientConfig{ .name = replicas[0], - .mode = ReplicationMode::ASYNC, + .mode = ReplicationMode::SYNC, .ip_address = local_host, - .port = replica2_port, + .port = replica1_port, }) + .HasError()); + + ASSERT_TRUE(main_store_handler + .RegisterReplica(RegistrationMode::MUST_BE_INSTANTLY_VALID, + ReplicationClientConfig{ + .name = replicas[0], + .mode = ReplicationMode::ASYNC, + .ip_address = local_host, + .port = replica2_port, + }) .GetError() == RegisterReplicaError::NAME_EXISTS); } @@ -866,35 +887,38 @@ TEST_F(ReplicationTest, ReplicationReplicaWithExistingEndPoint) { std::unique_ptr main_store{new InMemoryStorage(configuration)}; std::unique_ptr replica_store1{new InMemoryStorage(configuration)}; - replica_store1->SetReplicaRole(ReplicationServerConfig{ + auto replica1_store_handler = ReplicationHandler{replica_store1->repl_state_, *replica_store1}; + replica1_store_handler.SetReplicationRoleReplica(ReplicationServerConfig{ .ip_address = local_host, .port = common_port, }); std::unique_ptr replica_store2{new InMemoryStorage(configuration)}; - replica_store2->SetReplicaRole(ReplicationServerConfig{ + auto replica2_store_handler = ReplicationHandler{replica_store2->repl_state_, *replica_store2}; + replica2_store_handler.SetReplicationRoleReplica(ReplicationServerConfig{ .ip_address = local_host, .port = common_port, }); - ASSERT_FALSE(main_store - ->RegisterReplica(RegistrationMode::MUST_BE_INSTANTLY_VALID, - ReplicationClientConfig{ - .name = replicas[0], - .mode = ReplicationMode::SYNC, - .ip_address = local_host, - .port = common_port, - }) - .HasError()); - - ASSERT_TRUE(main_store - ->RegisterReplica(RegistrationMode::MUST_BE_INSTANTLY_VALID, + auto main_store_handler = ReplicationHandler{main_store->repl_state_, *main_store}; + ASSERT_FALSE(main_store_handler + .RegisterReplica(RegistrationMode::MUST_BE_INSTANTLY_VALID, ReplicationClientConfig{ - .name = replicas[1], - .mode = ReplicationMode::ASYNC, + .name = replicas[0], + .mode = ReplicationMode::SYNC, .ip_address = local_host, .port = common_port, }) + .HasError()); + + ASSERT_TRUE(main_store_handler + .RegisterReplica(RegistrationMode::MUST_BE_INSTANTLY_VALID, + ReplicationClientConfig{ + .name = replicas[1], + .mode = ReplicationMode::ASYNC, + .ip_address = local_host, + .port = common_port, + }) .GetError() == RegisterReplicaError::END_POINT_EXISTS); } @@ -904,30 +928,34 @@ TEST_F(ReplicationTest, RestoringReplicationAtStartupAfterDroppingReplica) { std::unique_ptr main_store{new InMemoryStorage(main_config)}; std::unique_ptr replica_store1{new InMemoryStorage(configuration)}; - replica_store1->SetReplicaRole(ReplicationServerConfig{ + auto replica1_store_handler = ReplicationHandler{replica_store1->repl_state_, *replica_store1}; + replica1_store_handler.SetReplicationRoleReplica(ReplicationServerConfig{ .ip_address = local_host, .port = ports[0], }); std::unique_ptr replica_store2{new InMemoryStorage(configuration)}; - replica_store2->SetReplicaRole(ReplicationServerConfig{ + auto replica2_store_handler = ReplicationHandler{replica_store2->repl_state_, *replica_store2}; + replica2_store_handler.SetReplicationRoleReplica(ReplicationServerConfig{ .ip_address = local_host, .port = ports[1], }); - auto res = main_store->RegisterReplica(RegistrationMode::MUST_BE_INSTANTLY_VALID, ReplicationClientConfig{ + auto main_store_handler = ReplicationHandler{main_store->repl_state_, *main_store}; + auto res = + main_store_handler.RegisterReplica(RegistrationMode::MUST_BE_INSTANTLY_VALID, ReplicationClientConfig{ .name = replicas[0], .mode = ReplicationMode::SYNC, .ip_address = local_host, .port = ports[0], }); ASSERT_FALSE(res.HasError()); - res = main_store->RegisterReplica(RegistrationMode::MUST_BE_INSTANTLY_VALID, ReplicationClientConfig{ - .name = replicas[1], - .mode = ReplicationMode::SYNC, - .ip_address = local_host, - .port = ports[1], - }); + res = main_store_handler.RegisterReplica(RegistrationMode::MUST_BE_INSTANTLY_VALID, ReplicationClientConfig{ + .name = replicas[1], + .mode = ReplicationMode::SYNC, + .ip_address = local_host, + .port = ports[1], + }); ASSERT_FALSE(res.HasError()); auto replica_infos = main_store->ReplicasInfo(); @@ -961,31 +989,34 @@ TEST_F(ReplicationTest, RestoringReplicationAtStartup) { std::unique_ptr main_store{new InMemoryStorage(main_config)}; std::unique_ptr replica_store1{new InMemoryStorage(configuration)}; - replica_store1->SetReplicaRole(ReplicationServerConfig{ + auto replica1_store_handler = ReplicationHandler{replica_store1->repl_state_, *replica_store1}; + replica1_store_handler.SetReplicationRoleReplica(ReplicationServerConfig{ .ip_address = local_host, .port = ports[0], }); std::unique_ptr replica_store2{new InMemoryStorage(configuration)}; - replica_store2->SetReplicaRole(ReplicationServerConfig{ + auto replica2_store_handler = ReplicationHandler{replica_store2->repl_state_, *replica_store2}; + replica2_store_handler.SetReplicationRoleReplica(ReplicationServerConfig{ .ip_address = local_host, .port = ports[1], }); - - auto res = main_store->RegisterReplica(RegistrationMode::MUST_BE_INSTANTLY_VALID, ReplicationClientConfig{ + auto main_store_handler = ReplicationHandler{main_store->repl_state_, *main_store}; + auto res = + main_store_handler.RegisterReplica(RegistrationMode::MUST_BE_INSTANTLY_VALID, ReplicationClientConfig{ .name = replicas[0], .mode = ReplicationMode::SYNC, .ip_address = local_host, .port = ports[0], }); ASSERT_FALSE(res.HasError()); - res = main_store->RegisterReplica(RegistrationMode::MUST_BE_INSTANTLY_VALID, ReplicationClientConfig{ - .name = replicas[1], - .mode = ReplicationMode::SYNC, - .ip_address = local_host, - .port = ports[1], - }); + res = main_store_handler.RegisterReplica(RegistrationMode::MUST_BE_INSTANTLY_VALID, ReplicationClientConfig{ + .name = replicas[1], + .mode = ReplicationMode::SYNC, + .ip_address = local_host, + .port = ports[1], + }); ASSERT_FALSE(res.HasError()); auto replica_infos = main_store->ReplicasInfo(); @@ -998,8 +1029,9 @@ TEST_F(ReplicationTest, RestoringReplicationAtStartup) { ASSERT_EQ(replica_infos[1].endpoint.address, local_host); ASSERT_EQ(replica_infos[1].endpoint.port, ports[1]); - const auto unregister_res = main_store->UnregisterReplica(replicas[0]); - ASSERT_TRUE(unregister_res); + auto handler = ReplicationHandler{main_store->repl_state_, *main_store}; + const auto unregister_res = handler.UnregisterReplica(replicas[0]); + ASSERT_EQ(unregister_res, UnregisterReplicaResult::SUCCESS); replica_infos = main_store->ReplicasInfo(); ASSERT_EQ(replica_infos.size(), 1); @@ -1020,13 +1052,14 @@ TEST_F(ReplicationTest, RestoringReplicationAtStartup) { TEST_F(ReplicationTest, AddingInvalidReplica) { std::unique_ptr main_store{new InMemoryStorage(configuration)}; - ASSERT_TRUE(main_store - ->RegisterReplica(RegistrationMode::MUST_BE_INSTANTLY_VALID, - ReplicationClientConfig{ - .name = "REPLICA", - .mode = ReplicationMode::SYNC, - .ip_address = local_host, - .port = ports[0], - }) + auto main_store_handler = ReplicationHandler{main_store->repl_state_, *main_store}; + ASSERT_TRUE(main_store_handler + .RegisterReplica(RegistrationMode::MUST_BE_INSTANTLY_VALID, + ReplicationClientConfig{ + .name = "REPLICA", + .mode = ReplicationMode::SYNC, + .ip_address = local_host, + .port = ports[0], + }) .GetError() == RegisterReplicaError::CONNECTION_FAILED); }