From 8b9e1fa08bd2094bf89313e94fe31028d28f4ff1 Mon Sep 17 00:00:00 2001 From: andrejtonev <29177572+andrejtonev@users.noreply.github.com> Date: Thu, 23 Nov 2023 11:02:35 +0100 Subject: [PATCH] Replication refactor part 6 (#1484) Single (instance level) connection to a replica (messages from all databases get multiplexed through it) ReplicationClient split in two: ReplicationClient and ReplicationStorageClient New ReplicationClient, moved under replication, handles the raw connection, owned by MainRoleData ReplicationStorageClient handles the storage <-> replica state machine and holds to a stream Removed epoch and storage from *Clients rpc::Stream proactively aborts on error and sets itself to a defunct state Removed HandleRpcFailure, instead we simply log the error and let the FrequentCheck handle re-connection replica_state is now a synced variable ReplicaStorageClient state machine bugfixes Single FrequentCheck that goes through DBMS Moved ReplicationState under DbmsHandler Moved some replication startup logic under the DbmsHandler's constructor Removed InMemoryReplicationClient CreateReplicationClient has been removed from Storage Simplified GetRecoverySteps and made safer --------- Co-authored-by: Gareth Lloyd --- release/get_version.py | 14 +- src/dbms/CMakeLists.txt | 2 +- src/dbms/constants.hpp | 6 + src/dbms/database.cpp | 2 +- src/dbms/database.hpp | 2 +- src/dbms/database_handler.hpp | 3 +- src/dbms/dbms_handler.cpp | 75 ++++ src/dbms/dbms_handler.hpp | 69 ++-- src/dbms/inmemory/storage_helper.hpp | 10 +- src/dbms/replication_client.cpp | 34 ++ src/dbms/replication_client.hpp | 21 ++ src/dbms/replication_handler.cpp | 164 ++++---- src/dbms/replication_handler.hpp | 6 +- src/memgraph.cpp | 25 +- src/query/frontend/ast/ast.hpp | 2 +- src/query/interpreter.cpp | 55 ++- src/replication/CMakeLists.txt | 4 + .../include/replication/messages.hpp | 44 +++ .../replication/replication_client.hpp | 82 ++++ .../replication/replication_server.hpp | 24 -- src/replication/include/replication/state.hpp | 28 +- src/replication/messages.cpp | 65 ++++ src/replication/replication_client.cpp | 40 ++ src/replication/replication_server.cpp | 46 +-- src/replication/state.cpp | 39 +- src/rpc/client.hpp | 57 ++- src/slk/streams.cpp | 8 +- src/slk/streams.hpp | 8 +- src/storage/v2/CMakeLists.txt | 3 +- src/storage/v2/disk/storage.hpp | 6 - src/storage/v2/durability/durability.cpp | 10 +- .../v2/inmemory/replication/recovery.cpp | 238 ++++++++++++ .../v2/inmemory/replication/recovery.hpp | 32 ++ .../replication/replication_client.cpp | 349 ------------------ .../replication/replication_client.hpp | 38 -- src/storage/v2/inmemory/storage.cpp | 16 +- src/storage/v2/inmemory/storage.hpp | 12 +- src/storage/v2/replication/enums.hpp | 2 +- src/storage/v2/replication/recovery.hpp | 28 ++ .../v2/replication/replication_client.cpp | 349 +++++++++--------- .../v2/replication/replication_client.hpp | 124 ++++--- .../replication/replication_storage_state.cpp | 18 +- .../replication/replication_storage_state.hpp | 25 +- src/storage/v2/storage.hpp | 10 +- .../show_while_creating_invalid_state.py | 155 +++++++- tests/integration/telemetry/client.cpp | 2 +- tests/unit/dbms_handler.cpp | 5 +- tests/unit/dbms_handler_community.cpp | 5 +- tests/unit/storage_v2_replication.cpp | 27 +- 49 files changed, 1398 insertions(+), 991 deletions(-) create mode 100644 src/dbms/dbms_handler.cpp create mode 100644 src/dbms/replication_client.cpp create mode 100644 src/dbms/replication_client.hpp create mode 100644 
src/replication/include/replication/messages.hpp create mode 100644 src/replication/include/replication/replication_client.hpp create mode 100644 src/replication/messages.cpp create mode 100644 src/replication/replication_client.cpp create mode 100644 src/storage/v2/inmemory/replication/recovery.cpp create mode 100644 src/storage/v2/inmemory/replication/recovery.hpp delete mode 100644 src/storage/v2/inmemory/replication/replication_client.cpp delete mode 100644 src/storage/v2/inmemory/replication/replication_client.hpp create mode 100644 src/storage/v2/replication/recovery.hpp diff --git a/release/get_version.py b/release/get_version.py index cfce88475..a8539fab4 100755 --- a/release/get_version.py +++ b/release/get_version.py @@ -104,7 +104,9 @@ def retry(retry_limit, timeout=100): except Exception: time.sleep(timeout) return func(*args, **kwargs) + return wrapper + return inner_func @@ -200,19 +202,19 @@ if args.version: try: current_branch = get_output("git", "rev-parse", "--abbrev-ref", "HEAD") if current_branch != "master": - branches = get_output("git", "branch") - if "master" in branches: + branches = get_output("git", "branch", "-r", "--list", "origin/master") + if "origin/master" in branches: # If master is present locally, the fetch is allowed to fail # because this script will still be able to compare against the # master branch. try: - get_output("git", "fetch", "origin", "master:master") + get_output("git", "fetch", "origin", "master") except Exception: pass else: # If master is not present locally, the fetch command has to # succeed because something else will fail otherwise. - get_output("git", "fetch", "origin", "master:master") + get_output("git", "fetch", "origin", "master") except Exception: print("Fatal error while ensuring local master branch.") sys.exit(1) @@ -232,7 +234,7 @@ for branch in branches: match = branch_regex.match(branch) if match is not None: version = tuple(map(int, match.group(1).split("."))) - master_branch_merge = get_output("git", "merge-base", "master", branch) + master_branch_merge = get_output("git", "merge-base", "origin/master", branch) versions.append((version, branch, master_branch_merge)) versions.sort(reverse=True) @@ -243,7 +245,7 @@ current_version = None for version in versions: version_tuple, branch, master_branch_merge = version current_branch_merge = get_output("git", "merge-base", current_hash, branch) - master_current_merge = get_output("git", "merge-base", current_hash, "master") + master_current_merge = get_output("git", "merge-base", current_hash, "origin/master") # The first check checks whether this commit is a child of `master` and # the version branch was created before us. 
# The second check checks whether this commit is a child of the version diff --git a/src/dbms/CMakeLists.txt b/src/dbms/CMakeLists.txt index 8ec1e0972..f1df4985a 100644 --- a/src/dbms/CMakeLists.txt +++ b/src/dbms/CMakeLists.txt @@ -1,3 +1,3 @@ -add_library(mg-dbms STATIC database.cpp replication_handler.cpp inmemory/replication_handlers.cpp) +add_library(mg-dbms STATIC dbms_handler.cpp database.cpp replication_handler.cpp replication_client.cpp inmemory/replication_handlers.cpp) target_link_libraries(mg-dbms mg-utils mg-storage-v2 mg-query) diff --git a/src/dbms/constants.hpp b/src/dbms/constants.hpp index 3ca61056b..e7ea9987b 100644 --- a/src/dbms/constants.hpp +++ b/src/dbms/constants.hpp @@ -15,4 +15,10 @@ namespace memgraph::dbms { constexpr static const char *kDefaultDB = "memgraph"; //!< Name of the default database +#ifdef MG_EXPERIMENTAL_REPLICATION_MULTITENANCY +constexpr bool allow_mt_repl = true; +#else +constexpr bool allow_mt_repl = false; +#endif + } // namespace memgraph::dbms diff --git a/src/dbms/database.cpp b/src/dbms/database.cpp index 411e282e8..74ee13892 100644 --- a/src/dbms/database.cpp +++ b/src/dbms/database.cpp @@ -21,7 +21,7 @@ template struct memgraph::utils::Gatekeeper; namespace memgraph::dbms { -Database::Database(storage::Config config, const replication::ReplicationState &repl_state) +Database::Database(storage::Config config, replication::ReplicationState &repl_state) : trigger_store_(config.durability.storage_directory / "triggers"), streams_{config.durability.storage_directory / "streams"}, plan_cache_{FLAGS_query_plan_cache_max_size}, diff --git a/src/dbms/database.hpp b/src/dbms/database.hpp index 457aa1c1d..416ff76bc 100644 --- a/src/dbms/database.hpp +++ b/src/dbms/database.hpp @@ -48,7 +48,7 @@ class Database { * * @param config storage configuration */ - explicit Database(storage::Config config, const replication::ReplicationState &repl_state); + explicit Database(storage::Config config, replication::ReplicationState &repl_state); /** * @brief Returns the raw storage pointer. diff --git a/src/dbms/database_handler.hpp b/src/dbms/database_handler.hpp index a6b3b563b..617e614c3 100644 --- a/src/dbms/database_handler.hpp +++ b/src/dbms/database_handler.hpp @@ -51,8 +51,7 @@ class DatabaseHandler : public Handler { * @param config Storage configuration * @return HandlerT::NewResult */ - HandlerT::NewResult New(std::string_view name, storage::Config config, - const replication::ReplicationState &repl_state) { + HandlerT::NewResult New(std::string_view name, storage::Config config, replication::ReplicationState &repl_state) { // Control that no one is using the same data directory if (std::any_of(begin(), end(), [&](auto &elem) { auto db_acc = elem.second.access(); diff --git a/src/dbms/dbms_handler.cpp b/src/dbms/dbms_handler.cpp new file mode 100644 index 000000000..0af9364bf --- /dev/null +++ b/src/dbms/dbms_handler.cpp @@ -0,0 +1,75 @@ +// Copyright 2023 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
+ +#include "dbms/dbms_handler.hpp" + +namespace memgraph::dbms { +#ifdef MG_ENTERPRISE +DbmsHandler::DbmsHandler( + storage::Config config, + memgraph::utils::Synchronized *auth, + bool recovery_on_startup, bool delete_on_drop) + : default_config_{std::move(config)}, + delete_on_drop_(delete_on_drop), + repl_state_{ReplicationStateRootPath(default_config_)} { + // TODO: Decouple storage config from dbms config + // TODO: Save individual db configs inside the kvstore and restore from there + storage::UpdatePaths(default_config_, default_config_.durability.storage_directory / "databases"); + const auto &db_dir = default_config_.durability.storage_directory; + const auto durability_dir = db_dir / ".durability"; + utils::EnsureDirOrDie(db_dir); + utils::EnsureDirOrDie(durability_dir); + durability_ = std::make_unique(durability_dir); + + // Generate the default database + MG_ASSERT(!NewDefault_().HasError(), "Failed while creating the default DB."); + + // Recover previous databases + if (recovery_on_startup) { + for (const auto &[name, _] : *durability_) { + if (name == kDefaultDB) continue; // Already set + spdlog::info("Restoring database {}.", name); + MG_ASSERT(!New_(name).HasError(), "Failed while creating database {}.", name); + spdlog::info("Database {} restored.", name); + } + } else { // Clear databases from the durability list and auth + auto locked_auth = auth->Lock(); + for (const auto &[name, _] : *durability_) { + if (name == kDefaultDB) continue; + locked_auth->DeleteDatabase(name); + durability_->Delete(name); + } + } + + // Startup replication state (if recovered at startup) + auto replica = [this](replication::RoleReplicaData const &data) { + // Register handlers + InMemoryReplicationHandlers::Register(this, *data.server); + if (!data.server->Start()) { + spdlog::error("Unable to start the replication server."); + return false; + } + return true; + }; + // Replication frequent check start + auto main = [this](replication::RoleMainData &data) { + for (auto &client : data.registered_replicas_) { + StartReplicaClient(*this, client); + } + return true; + }; + // Startup proccess for main/replica + MG_ASSERT(std::visit(memgraph::utils::Overloaded{replica, main}, repl_state_.ReplicationData()), + "Replica recovery failure!"); +} +#endif + +} // namespace memgraph::dbms diff --git a/src/dbms/dbms_handler.hpp b/src/dbms/dbms_handler.hpp index 27ab963d4..3151398ab 100644 --- a/src/dbms/dbms_handler.hpp +++ b/src/dbms/dbms_handler.hpp @@ -26,9 +26,11 @@ #include "auth/auth.hpp" #include "constants.hpp" #include "dbms/database.hpp" +#include "dbms/inmemory/replication_handlers.hpp" #ifdef MG_ENTERPRISE #include "dbms/database_handler.hpp" #endif +#include "dbms/replication_client.hpp" #include "global.hpp" #include "query/config.hpp" #include "query/interpreter_context.hpp" @@ -102,52 +104,22 @@ class DbmsHandler { * @param recovery_on_startup restore databases (and its content) and authentication data * @param delete_on_drop when dropping delete any associated directories on disk */ - DbmsHandler(storage::Config config, const replication::ReplicationState &repl_state, auto *auth, - bool recovery_on_startup, bool delete_on_drop) - : lock_{utils::RWLock::Priority::READ}, - default_config_{std::move(config)}, - repl_state_(repl_state), - delete_on_drop_(delete_on_drop) { - // TODO: Decouple storage config from dbms config - // TODO: Save individual db configs inside the kvstore and restore from there - storage::UpdatePaths(default_config_, default_config_.durability.storage_directory / 
"databases"); - const auto &db_dir = default_config_.durability.storage_directory; - const auto durability_dir = db_dir / ".durability"; - utils::EnsureDirOrDie(db_dir); - utils::EnsureDirOrDie(durability_dir); - durability_ = std::make_unique(durability_dir); - - // Generate the default database - MG_ASSERT(!NewDefault_().HasError(), "Failed while creating the default DB."); - // Recover previous databases - if (recovery_on_startup) { - for (const auto &[name, _] : *durability_) { - if (name == kDefaultDB) continue; // Already set - spdlog::info("Restoring database {}.", name); - MG_ASSERT(!New_(name).HasError(), "Failed while creating database {}.", name); - spdlog::info("Database {} restored.", name); - } - } else { // Clear databases from the durability list and auth - auto locked_auth = auth->Lock(); - for (const auto &[name, _] : *durability_) { - if (name == kDefaultDB) continue; - locked_auth->DeleteDatabase(name); - durability_->Delete(name); - } - } - } + DbmsHandler(storage::Config config, + memgraph::utils::Synchronized *auth, + bool recovery_on_startup, bool delete_on_drop); // TODO If more arguments are added use a config strut #else /** * @brief Initialize the handler. A single database is supported in community edition. * * @param configs storage configuration */ - DbmsHandler(storage::Config config, const replication::ReplicationState &repl_state) - : db_gatekeeper_{[&] { + DbmsHandler(storage::Config config) + : repl_state_{ReplicationStateRootPath(config)}, + db_gatekeeper_{[&] { config.name = kDefaultDB; return std::move(config); }(), - repl_state} {} + repl_state_} {} #endif #ifdef MG_ENTERPRISE @@ -248,6 +220,12 @@ class DbmsHandler { #endif } + replication::ReplicationState &ReplicationState() { return repl_state_; } + replication::ReplicationState const &ReplicationState() const { return repl_state_; } + + bool IsMain() const { return repl_state_.IsMain(); } + bool IsReplica() const { return repl_state_.IsReplica(); } + /** * @brief Return the statistics all databases. 
* @@ -536,14 +514,15 @@ class DbmsHandler { throw UnknownDatabaseException("Tried to retrieve an unknown database \"{}\".", name); } - mutable LockT lock_; //!< protective lock - storage::Config default_config_; //!< Storage configuration used when creating new databases - const replication::ReplicationState &repl_state_; //!< Global replication state - DatabaseHandler db_handler_; //!< multi-tenancy storage handler - std::unique_ptr durability_; //!< list of active dbs (pointer so we can postpone its creation) - bool delete_on_drop_; //!< Flag defining if dropping storage also deletes its directory - std::set> defunct_dbs_; //!< Databases that are in an unknown state due to various failures -#else + mutable LockT lock_{utils::RWLock::Priority::READ}; //!< protective lock + storage::Config default_config_; //!< Storage configuration used when creating new databases + DatabaseHandler db_handler_; //!< multi-tenancy storage handler + std::unique_ptr durability_; //!< list of active dbs (pointer so we can postpone its creation) + bool delete_on_drop_; //!< Flag defining if dropping storage also deletes its directory + std::set defunct_dbs_; //!< Databases that are in an unknown state due to various failures +#endif + replication::ReplicationState repl_state_; //!< Global replication state +#ifndef MG_ENTERPRISE mutable utils::Gatekeeper db_gatekeeper_; //!< Single databases gatekeeper #endif }; diff --git a/src/dbms/inmemory/storage_helper.hpp b/src/dbms/inmemory/storage_helper.hpp index 347c16928..1cd9f9f4e 100644 --- a/src/dbms/inmemory/storage_helper.hpp +++ b/src/dbms/inmemory/storage_helper.hpp @@ -22,14 +22,8 @@ namespace memgraph::dbms { -#ifdef MG_EXPERIMENTAL_REPLICATION_MULTITENANCY -constexpr bool allow_mt_repl = true; -#else -constexpr bool allow_mt_repl = false; -#endif - -inline std::unique_ptr CreateInMemoryStorage( - storage::Config config, const ::memgraph::replication::ReplicationState &repl_state) { +inline std::unique_ptr CreateInMemoryStorage(storage::Config config, + ::memgraph::replication::ReplicationState &repl_state) { const auto wal_mode = config.durability.snapshot_wal_mode; const auto name = config.name; auto storage = std::make_unique(std::move(config)); diff --git a/src/dbms/replication_client.cpp b/src/dbms/replication_client.cpp new file mode 100644 index 000000000..bfa4c622f --- /dev/null +++ b/src/dbms/replication_client.cpp @@ -0,0 +1,34 @@ +// Copyright 2023 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
+ +#include "dbms/replication_client.hpp" + +namespace memgraph::dbms { + +void StartReplicaClient(DbmsHandler &dbms_handler, replication::ReplicationClient &client) { + // No client error, start instance level client + auto const &endpoint = client.rpc_client_.Endpoint(); + spdlog::trace("Replication client started at: {}:{}", endpoint.address, endpoint.port); + client.StartFrequentCheck([&dbms_handler](std::string_view name) { + // Working connection, check if any database has been left behind + dbms_handler.ForEach([name](dbms::Database *db) { + // Specific database <-> replica client + db->storage()->repl_storage_state_.WithClient(name, [&](storage::ReplicationStorageClient *client) { + if (client->State() == storage::replication::ReplicaState::MAYBE_BEHIND) { + // Database <-> replica might be behind, check and recover + client->TryCheckReplicaStateAsync(db->storage()); + } + }); + }); + }); +} + +} // namespace memgraph::dbms diff --git a/src/dbms/replication_client.hpp b/src/dbms/replication_client.hpp new file mode 100644 index 000000000..c1bac91a2 --- /dev/null +++ b/src/dbms/replication_client.hpp @@ -0,0 +1,21 @@ +// Copyright 2023 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +#pragma once + +#include "dbms/dbms_handler.hpp" +#include "replication/replication_client.hpp" + +namespace memgraph::dbms { + +void StartReplicaClient(DbmsHandler &dbms_handler, replication::ReplicationClient &client); + +} // namespace memgraph::dbms diff --git a/src/dbms/replication_handler.cpp b/src/dbms/replication_handler.cpp index cff93fd6b..2cbe2c432 100644 --- a/src/dbms/replication_handler.cpp +++ b/src/dbms/replication_handler.cpp @@ -15,6 +15,7 @@ #include "dbms/dbms_handler.hpp" #include "dbms/inmemory/replication_handlers.hpp" #include "dbms/inmemory/storage_helper.hpp" +#include "dbms/replication_client.hpp" #include "replication/state.hpp" using memgraph::replication::ReplicationClientConfig; @@ -41,6 +42,8 @@ std::string RegisterReplicaErrorToString(RegisterReplicaError error) { } } // namespace +ReplicationHandler::ReplicationHandler(DbmsHandler &dbms_handler) : dbms_handler_(dbms_handler) {} + bool ReplicationHandler::SetReplicationRoleMain() { auto const main_handler = [](RoleMainData const &) { // If we are already MAIN, we don't want to change anything @@ -56,42 +59,49 @@ bool ReplicationHandler::SetReplicationRoleMain() { // STEP 2) Change to MAIN // TODO: restore replication servers if false? - if (!repl_state_.SetReplicationRoleMain()) { + if (!dbms_handler_.ReplicationState().SetReplicationRoleMain()) { // TODO: Handle recovery on failure??? 
return false; } // STEP 3) We are now MAIN, update storage local epoch + const auto &epoch = + std::get(std::as_const(dbms_handler_.ReplicationState()).ReplicationData()).epoch_; dbms_handler_.ForEach([&](Database *db) { auto *storage = db->storage(); - storage->repl_storage_state_.epoch_ = std::get(std::as_const(repl_state_).ReplicationData()).epoch_; + storage->repl_storage_state_.epoch_ = epoch; }); return true; }; // TODO: under lock - return std::visit(utils::Overloaded{main_handler, replica_handler}, repl_state_.ReplicationData()); + return std::visit(utils::Overloaded{main_handler, replica_handler}, + dbms_handler_.ReplicationState().ReplicationData()); } bool ReplicationHandler::SetReplicationRoleReplica(const memgraph::replication::ReplicationServerConfig &config) { // We don't want to restart the server if we're already a REPLICA - if (repl_state_.IsReplica()) { + if (dbms_handler_.ReplicationState().IsReplica()) { return false; } - // Remove registered replicas + // TODO StorageState needs to be synched. Could have a dangling reference if someone adds a database as we are + // deleting the replica. + // Remove database specific clients dbms_handler_.ForEach([&](Database *db) { auto *storage = db->storage(); storage->repl_storage_state_.replication_clients_.WithLock([](auto &clients) { clients.clear(); }); }); + // Remove instance level clients + std::get(dbms_handler_.ReplicationState().ReplicationData()).registered_replicas_.clear(); // Creates the server - repl_state_.SetReplicationRoleReplica(config); + dbms_handler_.ReplicationState().SetReplicationRoleReplica(config); // Start const auto success = - std::visit(utils::Overloaded{[](auto) { + std::visit(utils::Overloaded{[](RoleMainData const &) { // ASSERT return false; }, @@ -104,36 +114,37 @@ bool ReplicationHandler::SetReplicationRoleReplica(const memgraph::replication:: } return true; }}, - repl_state_.ReplicationData()); + dbms_handler_.ReplicationState().ReplicationData()); // TODO Handle error (restore to main?) 
return success; } auto ReplicationHandler::RegisterReplica(const memgraph::replication::ReplicationClientConfig &config) -> memgraph::utils::BasicResult { - MG_ASSERT(repl_state_.IsMain(), "Only main instance can register a replica!"); + MG_ASSERT(dbms_handler_.ReplicationState().IsMain(), "Only main instance can register a replica!"); - auto res = repl_state_.RegisterReplica(config); - switch (res) { - case memgraph::replication::RegisterReplicaError::NOT_MAIN: - MG_ASSERT(false, "Only main instance can register a replica!"); - return {}; - case memgraph::replication::RegisterReplicaError::NAME_EXISTS: - return memgraph::dbms::RegisterReplicaError::NAME_EXISTS; - case memgraph::replication::RegisterReplicaError::END_POINT_EXISTS: - return memgraph::dbms::RegisterReplicaError::END_POINT_EXISTS; - case memgraph::replication::RegisterReplicaError::COULD_NOT_BE_PERSISTED: - return memgraph::dbms::RegisterReplicaError::COULD_NOT_BE_PERSISTED; - case memgraph::replication::RegisterReplicaError::SUCCESS: - break; - } - - bool all_clients_good = true; + auto instance_client = dbms_handler_.ReplicationState().RegisterReplica(config); + if (instance_client.HasError()) switch (instance_client.GetError()) { + case memgraph::replication::RegisterReplicaError::NOT_MAIN: + MG_ASSERT(false, "Only main instance can register a replica!"); + return {}; + case memgraph::replication::RegisterReplicaError::NAME_EXISTS: + return memgraph::dbms::RegisterReplicaError::NAME_EXISTS; + case memgraph::replication::RegisterReplicaError::END_POINT_EXISTS: + return memgraph::dbms::RegisterReplicaError::END_POINT_EXISTS; + case memgraph::replication::RegisterReplicaError::COULD_NOT_BE_PERSISTED: + return memgraph::dbms::RegisterReplicaError::COULD_NOT_BE_PERSISTED; + case memgraph::replication::RegisterReplicaError::SUCCESS: + break; + } if (!allow_mt_repl && dbms_handler_.All().size() > 1) { spdlog::warn("Multi-tenant replication is currently not supported!"); } + bool all_clients_good = true; + + // Add database specific clients (NOTE Currently all databases are connected to each replica) dbms_handler_.ForEach([&](Database *db) { auto *storage = db->storage(); if (!allow_mt_repl && storage->id() != kDefaultDB) { @@ -143,18 +154,29 @@ auto ReplicationHandler::RegisterReplica(const memgraph::replication::Replicatio if (storage->storage_mode_ != storage::StorageMode::IN_MEMORY_TRANSACTIONAL) return; all_clients_good &= - storage->repl_storage_state_.replication_clients_.WithLock([storage, &config](auto &clients) -> bool { - auto client = storage->CreateReplicationClient(config, &storage->repl_storage_state_.epoch_); - client->Start(); - - if (client->State() == storage::replication::ReplicaState::INVALID) { + storage->repl_storage_state_.replication_clients_.WithLock([storage, &instance_client](auto &storage_clients) { + auto client = std::make_unique(*instance_client.GetValue()); + client->Start(storage); + // After start the storage <-> replica state should be READY or RECOVERING (if correctly started) + // MAYBE_BEHIND isn't a statement of the current state, this is the default value + // Failed to start due to branching of MAIN and REPLICA + if (client->State() == storage::replication::ReplicaState::MAYBE_BEHIND) { return false; } - clients.push_back(std::move(client)); + storage_clients.push_back(std::move(client)); return true; }); }); - if (!all_clients_good) return RegisterReplicaError::CONNECTION_FAILED; // TODO: this happen to 1 or many...what to do + + // NOTE Currently if any databases fails, we revert back + 
if (!all_clients_good) { + spdlog::error("Failed to register all databases to the REPLICA \"{}\"", config.name); + UnregisterReplica(config.name); + return RegisterReplicaError::CONNECTION_FAILED; + } + + // No client error, start instance level client + StartReplicaClient(dbms_handler_, *instance_client.GetValue()); return {}; } @@ -163,60 +185,66 @@ auto ReplicationHandler::UnregisterReplica(std::string_view name) -> UnregisterR return UnregisterReplicaResult::NOT_MAIN; }; auto const main_handler = [this, name](RoleMainData &mainData) -> UnregisterReplicaResult { - if (!repl_state_.TryPersistUnregisterReplica(name)) { + if (!dbms_handler_.ReplicationState().TryPersistUnregisterReplica(name)) { return UnregisterReplicaResult::COULD_NOT_BE_PERSISTED; } - auto const n_unregistered = - std::erase_if(mainData.registered_replicas_, - [&](ReplicationClientConfig const ®istered_config) { return registered_config.name == name; }); - - dbms_handler_.ForEach([&](Database *db) { - db->storage()->repl_storage_state_.replication_clients_.WithLock( - [&](auto &clients) { std::erase_if(clients, [&](const auto &client) { return client->Name() == name; }); }); + // Remove database specific clients + dbms_handler_.ForEach([name](Database *db) { + db->storage()->repl_storage_state_.replication_clients_.WithLock([&name](auto &clients) { + std::erase_if(clients, [name](const auto &client) { return client->Name() == name; }); + }); }); - + // Remove instance level clients + auto const n_unregistered = + std::erase_if(mainData.registered_replicas_, [name](auto const &client) { return client.name_ == name; }); return n_unregistered != 0 ? UnregisterReplicaResult::SUCCESS : UnregisterReplicaResult::CAN_NOT_UNREGISTER; }; - return std::visit(utils::Overloaded{main_handler, replica_handler}, repl_state_.ReplicationData()); + return std::visit(utils::Overloaded{main_handler, replica_handler}, + dbms_handler_.ReplicationState().ReplicationData()); } -auto ReplicationHandler::GetRole() const -> memgraph::replication::ReplicationRole { return repl_state_.GetRole(); } +auto ReplicationHandler::GetRole() const -> memgraph::replication::ReplicationRole { + return dbms_handler_.ReplicationState().GetRole(); +} -bool ReplicationHandler::IsMain() const { return repl_state_.IsMain(); } +bool ReplicationHandler::IsMain() const { return dbms_handler_.ReplicationState().IsMain(); } -bool ReplicationHandler::IsReplica() const { return repl_state_.IsReplica(); } +bool ReplicationHandler::IsReplica() const { return dbms_handler_.ReplicationState().IsReplica(); } -void RestoreReplication(const replication::ReplicationState &repl_state, storage::Storage &storage) { +// Per storage +// NOTE Storage will connect to all replicas. Future work might change this +void RestoreReplication(replication::ReplicationState &repl_state, storage::Storage &storage) { spdlog::info("Restoring replication role."); /// MAIN - auto const recover_main = [&storage](RoleMainData const &mainData) { - for (const auto &config : mainData.registered_replicas_) { - spdlog::info("Replica {} restoration started for {}.", config.name, storage.id()); + auto const recover_main = [&storage](RoleMainData &mainData) { + // Each individual client has already been restored and started. 
Here we just go through each database and start its + // client + for (auto &instance_client : mainData.registered_replicas_) { + spdlog::info("Replica {} restoration started for {}.", instance_client.name_, storage.id()); - auto register_replica = [&storage](const memgraph::replication::ReplicationClientConfig &config) - -> memgraph::utils::BasicResult { - return storage.repl_storage_state_.replication_clients_.WithLock( - [&storage, &config](auto &clients) -> utils::BasicResult { - auto client = storage.CreateReplicationClient(config, &storage.repl_storage_state_.epoch_); - client->Start(); + const auto &ret = storage.repl_storage_state_.replication_clients_.WithLock( + [&](auto &storage_clients) -> utils::BasicResult { + auto client = std::make_unique(instance_client); + client->Start(&storage); + // After start the storage <-> replica state should be READY or RECOVERING (if correctly started) + // MAYBE_BEHIND isn't a statement of the current state, this is the default value + // Failed to start due to branching of MAIN and REPLICA + if (client->State() == storage::replication::ReplicaState::MAYBE_BEHIND) { + spdlog::warn("Connection failed when registering replica {}. Replica will still be registered.", + instance_client.name_); + } + storage_clients.push_back(std::move(client)); + return {}; + }); - if (client->State() == storage::replication::ReplicaState::INVALID) { - spdlog::warn("Connection failed when registering replica {}. Replica will still be registered.", - client->Name()); - } - clients.push_back(std::move(client)); - return {}; - }); - }; - - auto ret = register_replica(config); if (ret.HasError()) { MG_ASSERT(RegisterReplicaError::CONNECTION_FAILED != ret.GetError()); - LOG_FATAL("Failure when restoring replica {}: {}.", config.name, RegisterReplicaErrorToString(ret.GetError())); + LOG_FATAL("Failure when restoring replica {}: {}.", instance_client.name_, + RegisterReplicaErrorToString(ret.GetError())); } - spdlog::info("Replica {} restored for {}.", config.name, storage.id()); + spdlog::info("Replica {} restored for {}.", instance_client.name_, storage.id()); } spdlog::info("Replication role restored to MAIN."); }; @@ -229,6 +257,6 @@ void RestoreReplication(const replication::ReplicationState &repl_state, storage recover_main, recover_replica, }, - std::as_const(repl_state).ReplicationData()); + repl_state.ReplicationData()); } } // namespace memgraph::dbms diff --git a/src/dbms/replication_handler.hpp b/src/dbms/replication_handler.hpp index e50c47969..dc95407b1 100644 --- a/src/dbms/replication_handler.hpp +++ b/src/dbms/replication_handler.hpp @@ -36,8 +36,7 @@ enum class UnregisterReplicaResult : uint8_t { /// A handler type that keep in sync current ReplicationState and the MAIN/REPLICA-ness of Storage /// TODO: extend to do multiple storages struct ReplicationHandler { - ReplicationHandler(memgraph::replication::ReplicationState &replState, DbmsHandler &dbms_handler) - : repl_state_(replState), dbms_handler_(dbms_handler) {} + explicit ReplicationHandler(DbmsHandler &dbms_handler); // as REPLICA, become MAIN bool SetReplicationRoleMain(); @@ -58,12 +57,11 @@ struct ReplicationHandler { bool IsReplica() const; private: - memgraph::replication::ReplicationState &repl_state_; DbmsHandler &dbms_handler_; }; /// A handler type that keep in sync current ReplicationState and the MAIN/REPLICA-ness of Storage /// TODO: extend to do multiple storages -void RestoreReplication(const replication::ReplicationState &repl_state, storage::Storage &storage); +void 
RestoreReplication(replication::ReplicationState &repl_state, storage::Storage &storage); } // namespace memgraph::dbms diff --git a/src/memgraph.cpp b/src/memgraph.cpp index 983dd61f9..ce43f634d 100644 --- a/src/memgraph.cpp +++ b/src/memgraph.cpp @@ -368,34 +368,17 @@ int main(int argc, char **argv) { std::unique_ptr auth_checker; auth_glue(&auth_, auth_handler, auth_checker); - memgraph::replication::ReplicationState repl_state(ReplicationStateRootPath(db_config)); - - memgraph::dbms::DbmsHandler dbms_handler(db_config, repl_state + memgraph::dbms::DbmsHandler dbms_handler(db_config #ifdef MG_ENTERPRISE , &auth_, FLAGS_data_recovery_on_startup, FLAGS_storage_delete_on_drop #endif ); auto db_acc = dbms_handler.Get(); - memgraph::query::InterpreterContext interpreter_context_(interp_config, &dbms_handler, &repl_state, - auth_handler.get(), auth_checker.get()); - MG_ASSERT(db_acc, "Failed to access the main database"); - // TODO: Move it somewhere better - // Startup replication state (if recovered at startup) - MG_ASSERT(std::visit(memgraph::utils::Overloaded{[](memgraph::replication::RoleMainData const &) { return true; }, - [&](memgraph::replication::RoleReplicaData const &data) { - // Register handlers - memgraph::dbms::InMemoryReplicationHandlers::Register( - &dbms_handler, *data.server); - if (!data.server->Start()) { - spdlog::error("Unable to start the replication server."); - return false; - } - return true; - }}, - repl_state.ReplicationData()), - "Replica recovery failure!"); + memgraph::query::InterpreterContext interpreter_context_( + interp_config, &dbms_handler, &dbms_handler.ReplicationState(), auth_handler.get(), auth_checker.get()); + MG_ASSERT(db_acc, "Failed to access the main database"); memgraph::query::procedure::gModuleRegistry.SetModulesDirectory(memgraph::flags::ParseQueryModulesDirectory(), FLAGS_data_directory); diff --git a/src/query/frontend/ast/ast.hpp b/src/query/frontend/ast/ast.hpp index dc11c3887..d63736c85 100644 --- a/src/query/frontend/ast/ast.hpp +++ b/src/query/frontend/ast/ast.hpp @@ -3025,7 +3025,7 @@ class ReplicationQuery : public memgraph::query::Query { enum class SyncMode { SYNC, ASYNC }; - enum class ReplicaState { READY, REPLICATING, RECOVERY, INVALID }; + enum class ReplicaState { READY, REPLICATING, RECOVERY, MAYBE_BEHIND }; ReplicationQuery() = default; diff --git a/src/query/interpreter.cpp b/src/query/interpreter.cpp index 354f13dc3..d957d4c2e 100644 --- a/src/query/interpreter.cpp +++ b/src/query/interpreter.cpp @@ -274,8 +274,7 @@ inline auto convertToReplicationMode(const ReplicationQuery::SyncMode &sync_mode class ReplQueryHandler final : public query::ReplicationQueryHandler { public: - explicit ReplQueryHandler(dbms::DbmsHandler *dbms_handler, memgraph::replication::ReplicationState *repl_state) - : dbms_handler_(dbms_handler), handler_{*repl_state, *dbms_handler} {} + explicit ReplQueryHandler(dbms::DbmsHandler *dbms_handler) : dbms_handler_(dbms_handler), handler_{*dbms_handler} {} /// @throw QueryRuntimeException if an error ocurred. 
void SetReplicationRole(ReplicationQuery::ReplicationRole replication_role, std::optional port) override { @@ -404,8 +403,8 @@ class ReplQueryHandler final : public query::ReplicationQueryHandler { case storage::replication::ReplicaState::RECOVERY: replica.state = ReplicationQuery::ReplicaState::RECOVERY; break; - case storage::replication::ReplicaState::INVALID: - replica.state = ReplicationQuery::ReplicaState::INVALID; + case storage::replication::ReplicaState::MAYBE_BEHIND: + replica.state = ReplicationQuery::ReplicaState::MAYBE_BEHIND; break; } @@ -713,8 +712,7 @@ Callback HandleAuthQuery(AuthQuery *auth_query, InterpreterContext *interpreter_ Callback HandleReplicationQuery(ReplicationQuery *repl_query, const Parameters ¶meters, dbms::DbmsHandler *dbms_handler, const query::InterpreterConfig &config, - std::vector *notifications, - memgraph::replication::ReplicationState *repl_state) { + std::vector *notifications) { // TODO: MemoryResource for EvaluationContext, it should probably be passed as // the argument to Callback. EvaluationContext evaluation_context; @@ -734,8 +732,7 @@ Callback HandleReplicationQuery(ReplicationQuery *repl_query, const Parameters & notifications->emplace_back(SeverityLevel::WARNING, NotificationCode::REPLICA_PORT_WARNING, "Be careful the replication port must be different from the memgraph port!"); } - callback.fn = [handler = ReplQueryHandler{dbms_handler, repl_state}, role = repl_query->role_, - maybe_port]() mutable { + callback.fn = [handler = ReplQueryHandler{dbms_handler}, role = repl_query->role_, maybe_port]() mutable { handler.SetReplicationRole(role, maybe_port); return std::vector>(); }; @@ -747,7 +744,7 @@ Callback HandleReplicationQuery(ReplicationQuery *repl_query, const Parameters & } case ReplicationQuery::Action::SHOW_REPLICATION_ROLE: { callback.header = {"replication role"}; - callback.fn = [handler = ReplQueryHandler{dbms_handler, repl_state}] { + callback.fn = [handler = ReplQueryHandler{dbms_handler}] { auto mode = handler.ShowReplicationRole(); switch (mode) { case ReplicationQuery::ReplicationRole::MAIN: { @@ -766,7 +763,7 @@ Callback HandleReplicationQuery(ReplicationQuery *repl_query, const Parameters & auto socket_address = repl_query->socket_address_->Accept(evaluator); const auto replica_check_frequency = config.replication_replica_check_frequency; - callback.fn = [handler = ReplQueryHandler{dbms_handler, repl_state}, name, socket_address, sync_mode, + callback.fn = [handler = ReplQueryHandler{dbms_handler}, name, socket_address, sync_mode, replica_check_frequency]() mutable { handler.RegisterReplica(name, std::string(socket_address.ValueString()), sync_mode, replica_check_frequency); return std::vector>(); @@ -777,7 +774,7 @@ Callback HandleReplicationQuery(ReplicationQuery *repl_query, const Parameters & } case ReplicationQuery::Action::DROP_REPLICA: { const auto &name = repl_query->replica_name_; - callback.fn = [handler = ReplQueryHandler{dbms_handler, repl_state}, name]() mutable { + callback.fn = [handler = ReplQueryHandler{dbms_handler}, name]() mutable { handler.DropReplica(name); return std::vector>(); }; @@ -789,7 +786,7 @@ Callback HandleReplicationQuery(ReplicationQuery *repl_query, const Parameters & callback.header = { "name", "socket_address", "sync_mode", "current_timestamp_of_replica", "number_of_timestamp_behind_master", "state"}; - callback.fn = [handler = ReplQueryHandler{dbms_handler, repl_state}, replica_nfields = callback.header.size()] { + callback.fn = [handler = ReplQueryHandler{dbms_handler}, 
replica_nfields = callback.header.size()] { const auto &replicas = handler.ShowReplicas(); auto typed_replicas = std::vector>{}; typed_replicas.reserve(replicas.size()); @@ -822,7 +819,7 @@ Callback HandleReplicationQuery(ReplicationQuery *repl_query, const Parameters & case ReplicationQuery::ReplicaState::RECOVERY: typed_replica.emplace_back("recovery"); break; - case ReplicationQuery::ReplicaState::INVALID: + case ReplicationQuery::ReplicaState::MAYBE_BEHIND: typed_replica.emplace_back("invalid"); break; } @@ -2263,15 +2260,14 @@ PreparedQuery PrepareAuthQuery(ParsedQuery parsed_query, bool in_explicit_transa PreparedQuery PrepareReplicationQuery(ParsedQuery parsed_query, bool in_explicit_transaction, std::vector *notifications, dbms::DbmsHandler &dbms_handler, - const InterpreterConfig &config, - memgraph::replication::ReplicationState *repl_state) { + const InterpreterConfig &config) { if (in_explicit_transaction) { throw ReplicationModificationInMulticommandTxException(); } auto *replication_query = utils::Downcast(parsed_query.query); - auto callback = HandleReplicationQuery(replication_query, parsed_query.parameters, &dbms_handler, config, - notifications, repl_state); + auto callback = + HandleReplicationQuery(replication_query, parsed_query.parameters, &dbms_handler, config, notifications); return PreparedQuery{callback.header, std::move(parsed_query.required_privileges), [callback_fn = std::move(callback.fn), pull_plan = std::shared_ptr{nullptr}]( @@ -3349,8 +3345,7 @@ PreparedQuery PrepareConstraintQuery(ParsedQuery parsed_query, bool in_explicit_ PreparedQuery PrepareMultiDatabaseQuery(ParsedQuery parsed_query, CurrentDB ¤t_db, InterpreterContext *interpreter_context, - std::optional> on_change_cb, - memgraph::replication::ReplicationState *repl_state) { + std::optional> on_change_cb) { #ifdef MG_ENTERPRISE if (!license::global_license_checker.IsEnterpriseValidFast()) { throw QueryException("Trying to use enterprise feature without a valid license."); @@ -3361,9 +3356,11 @@ PreparedQuery PrepareMultiDatabaseQuery(ParsedQuery parsed_query, CurrentDB &cur auto *query = utils::Downcast(parsed_query.query); auto *db_handler = interpreter_context->dbms_handler; + const bool is_replica = interpreter_context->repl_state->IsReplica(); + switch (query->action_) { case MultiDatabaseQuery::Action::CREATE: - if (repl_state->IsReplica()) { + if (is_replica) { throw QueryException("Query forbidden on the replica!"); } return PreparedQuery{ @@ -3408,12 +3405,12 @@ PreparedQuery PrepareMultiDatabaseQuery(ParsedQuery parsed_query, CurrentDB &cur if (current_db.in_explicit_db_) { throw QueryException("Database switching is prohibited if session explicitly defines the used database"); } - if (!dbms::allow_mt_repl && repl_state->IsReplica()) { + if (!dbms::allow_mt_repl && is_replica) { throw QueryException("Query forbidden on the replica!"); } return PreparedQuery{{"STATUS"}, std::move(parsed_query.required_privileges), - [db_name = query->db_name_, db_handler, ¤t_db, on_change_cb]( + [db_name = query->db_name_, db_handler, ¤t_db, on_change = std::move(on_change_cb)]( AnyStream *stream, std::optional n) -> std::optional { std::vector> status; std::string res; @@ -3423,7 +3420,7 @@ PreparedQuery PrepareMultiDatabaseQuery(ParsedQuery parsed_query, CurrentDB &cur res = "Already using " + db_name; } else { auto tmp = db_handler->Get(db_name); - if (on_change_cb) (*on_change_cb)(db_name); // Will trow if cb fails + if (on_change) (*on_change)(db_name); // Will trow if cb fails 
current_db.SetCurrentDB(std::move(tmp), false); res = "Using " + db_name; } @@ -3442,7 +3439,7 @@ PreparedQuery PrepareMultiDatabaseQuery(ParsedQuery parsed_query, CurrentDB &cur query->db_name_}; case MultiDatabaseQuery::Action::DROP: - if (repl_state->IsReplica()) { + if (is_replica) { throw QueryException("Query forbidden on the replica!"); } return PreparedQuery{ @@ -3765,9 +3762,9 @@ Interpreter::PrepareResult Interpreter::Prepare(const std::string &query_string, &query_execution->notifications, current_db_); } else if (utils::Downcast(parsed_query.query)) { /// TODO: make replication DB agnostic - prepared_query = PrepareReplicationQuery(std::move(parsed_query), in_explicit_transaction_, - &query_execution->notifications, *interpreter_context_->dbms_handler, - interpreter_context_->config, interpreter_context_->repl_state); + prepared_query = + PrepareReplicationQuery(std::move(parsed_query), in_explicit_transaction_, &query_execution->notifications, + *interpreter_context_->dbms_handler, interpreter_context_->config); } else if (utils::Downcast(parsed_query.query)) { prepared_query = PrepareLockPathQuery(std::move(parsed_query), in_explicit_transaction_, current_db_); } else if (utils::Downcast(parsed_query.query)) { @@ -3807,8 +3804,8 @@ Interpreter::PrepareResult Interpreter::Prepare(const std::string &query_string, throw MultiDatabaseQueryInMulticommandTxException(); } /// SYSTEM (Replication) + INTERPRETER - prepared_query = PrepareMultiDatabaseQuery(std::move(parsed_query), current_db_, interpreter_context_, on_change_, - interpreter_context_->repl_state); + prepared_query = + PrepareMultiDatabaseQuery(std::move(parsed_query), current_db_, interpreter_context_, on_change_); } else if (utils::Downcast(parsed_query.query)) { /// SYSTEM PURE ("SHOW DATABASES") /// INTERPRETER (TODO: "SHOW DATABASE") diff --git a/src/replication/CMakeLists.txt b/src/replication/CMakeLists.txt index 772ae5591..597ed096a 100644 --- a/src/replication/CMakeLists.txt +++ b/src/replication/CMakeLists.txt @@ -6,8 +6,10 @@ target_sources(mg-replication include/replication/epoch.hpp include/replication/config.hpp include/replication/mode.hpp + include/replication/messages.hpp include/replication/role.hpp include/replication/status.hpp + include/replication/replication_client.hpp include/replication/replication_server.hpp PRIVATE @@ -15,6 +17,8 @@ target_sources(mg-replication epoch.cpp config.cpp status.cpp + messages.cpp + replication_client.cpp replication_server.cpp ) target_include_directories(mg-replication PUBLIC include) diff --git a/src/replication/include/replication/messages.hpp b/src/replication/include/replication/messages.hpp new file mode 100644 index 000000000..57cf29351 --- /dev/null +++ b/src/replication/include/replication/messages.hpp @@ -0,0 +1,44 @@ +// Copyright 2023 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
+ +#pragma once + +#include "rpc/messages.hpp" +#include "slk/serialization.hpp" + +namespace memgraph::replication { + +struct FrequentHeartbeatReq { + static const utils::TypeInfo kType; // TODO: make constexpr? + static const utils::TypeInfo &GetTypeInfo() { return kType; } // WHAT? + + static void Load(FrequentHeartbeatReq *self, memgraph::slk::Reader *reader); + static void Save(const FrequentHeartbeatReq &self, memgraph::slk::Builder *builder); + FrequentHeartbeatReq() = default; +}; + +struct FrequentHeartbeatRes { + static const utils::TypeInfo kType; + static const utils::TypeInfo &GetTypeInfo() { return kType; } + + static void Load(FrequentHeartbeatRes *self, memgraph::slk::Reader *reader); + static void Save(const FrequentHeartbeatRes &self, memgraph::slk::Builder *builder); + FrequentHeartbeatRes() = default; + explicit FrequentHeartbeatRes(bool success) : success(success) {} + + bool success; +}; + +using FrequentHeartbeatRpc = rpc::RequestResponse; + +void FrequentHeartbeatHandler(slk::Reader *req_reader, slk::Builder *res_builder); + +} // namespace memgraph::replication diff --git a/src/replication/include/replication/replication_client.hpp b/src/replication/include/replication/replication_client.hpp new file mode 100644 index 000000000..16e1010bf --- /dev/null +++ b/src/replication/include/replication/replication_client.hpp @@ -0,0 +1,82 @@ +// Copyright 2023 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +#pragma once + +#include "replication/config.hpp" +#include "replication/messages.hpp" +#include "rpc/client.hpp" +#include "utils/scheduler.hpp" +#include "utils/thread_pool.hpp" + +#include +#include + +namespace memgraph::replication { + +template +concept InvocableWithStringView = std::invocable; + +struct ReplicationClient { + explicit ReplicationClient(const memgraph::replication::ReplicationClientConfig &config); + + ~ReplicationClient(); + ReplicationClient(ReplicationClient const &) = delete; + ReplicationClient &operator=(ReplicationClient const &) = delete; + ReplicationClient(ReplicationClient &&) noexcept = delete; + ReplicationClient &operator=(ReplicationClient &&) noexcept = delete; + + template + void StartFrequentCheck(F &&callback) { + // Help the user to get the most accurate replica state possible. 
+ if (replica_check_frequency_ > std::chrono::seconds(0)) { + replica_checker_.Run("Replica Checker", replica_check_frequency_, [this, cb = std::forward(callback)] { + try { + bool success = false; + { + auto stream{rpc_client_.Stream()}; + success = stream.AwaitResponse().success; + } + if (success) { + cb(name_); + } + } catch (const rpc::RpcFailedException &) { + // Nothing to do...wait for a reconnect + } + }); + } + } + + std::string name_; + communication::ClientContext rpc_context_; + rpc::Client rpc_client_; + std::chrono::seconds replica_check_frequency_; + + memgraph::replication::ReplicationMode mode_{memgraph::replication::ReplicationMode::SYNC}; + // This thread pool is used for background tasks so we don't + // block the main storage thread + // We use only 1 thread for 2 reasons: + // - background tasks ALWAYS contain some kind of RPC communication. + // We can't have multiple RPC communications from the same client + // because that's not logically valid (e.g. you cannot send a snapshot + // and WAL at the same time because WAL will arrive earlier and be applied + // before the snapshot which is not correct) + // - the implementation is simplified as we have total control of what + // this pool is executing. Also, we can simply queue multiple tasks + // and be sure of the execution order. + // Not having multiple possible threads in the same client allows us + // to ignore concurrency problems inside the client. + utils::ThreadPool thread_pool_{1}; + + utils::Scheduler replica_checker_; +}; + +} // namespace memgraph::replication diff --git a/src/replication/include/replication/replication_server.hpp b/src/replication/include/replication/replication_server.hpp index e9ca1b549..5ff41b8a5 100644 --- a/src/replication/include/replication/replication_server.hpp +++ b/src/replication/include/replication/replication_server.hpp @@ -17,30 +17,6 @@ namespace memgraph::replication { -struct FrequentHeartbeatReq { - static const utils::TypeInfo kType; // TODO: make constexpr? - static const utils::TypeInfo &GetTypeInfo() { return kType; } // WHAT?
- - static void Load(FrequentHeartbeatReq *self, memgraph::slk::Reader *reader); - static void Save(const FrequentHeartbeatReq &self, memgraph::slk::Builder *builder); - FrequentHeartbeatReq() = default; -}; - -struct FrequentHeartbeatRes { - static const utils::TypeInfo kType; - static const utils::TypeInfo &GetTypeInfo() { return kType; } - - static void Load(FrequentHeartbeatRes *self, memgraph::slk::Reader *reader); - static void Save(const FrequentHeartbeatRes &self, memgraph::slk::Builder *builder); - FrequentHeartbeatRes() = default; - explicit FrequentHeartbeatRes(bool success) : success(success) {} - - bool success; -}; - -// TODO: move to own header -using FrequentHeartbeatRpc = rpc::RequestResponse; - class ReplicationServer { public: explicit ReplicationServer(const memgraph::replication::ReplicationServerConfig &config); diff --git a/src/replication/include/replication/state.hpp b/src/replication/include/replication/state.hpp index 0460d0a9d..76aec1053 100644 --- a/src/replication/include/replication/state.hpp +++ b/src/replication/include/replication/state.hpp @@ -11,19 +11,22 @@ #pragma once -#include -#include -#include -#include - #include "kvstore/kvstore.hpp" #include "replication/config.hpp" #include "replication/epoch.hpp" #include "replication/mode.hpp" +#include "replication/replication_client.hpp" #include "replication/role.hpp" #include "replication_server.hpp" #include "status.hpp" #include "utils/result.hpp" +#include "utils/synchronized.hpp" + +#include +#include +#include +#include +#include namespace memgraph::replication { @@ -32,8 +35,17 @@ enum class RolePersisted : uint8_t { UNKNOWN_OR_NO, YES }; enum class RegisterReplicaError : uint8_t { NAME_EXISTS, END_POINT_EXISTS, COULD_NOT_BE_PERSISTED, NOT_MAIN, SUCCESS }; struct RoleMainData { + RoleMainData() = default; + explicit RoleMainData(ReplicationEpoch e) : epoch_(std::move(e)) {} + ~RoleMainData() = default; + + RoleMainData(RoleMainData const &) = delete; + RoleMainData &operator=(RoleMainData const &) = delete; + RoleMainData(RoleMainData &&) = default; + RoleMainData &operator=(RoleMainData &&) = default; + ReplicationEpoch epoch_; - std::vector registered_replicas_; + std::list registered_replicas_{}; }; struct RoleReplicaData { @@ -41,8 +53,10 @@ struct RoleReplicaData { std::unique_ptr server; }; +// Global (instance) level object struct ReplicationState { explicit ReplicationState(std::optional durability_dir); + ~ReplicationState() = default; ReplicationState(ReplicationState const &) = delete; ReplicationState(ReplicationState &&) = delete; @@ -74,7 +88,7 @@ struct ReplicationState { // TODO: locked access auto ReplicationData() -> ReplicationData_t & { return replication_data_; } auto ReplicationData() const -> ReplicationData_t const & { return replication_data_; } - auto RegisterReplica(const ReplicationClientConfig &config) -> RegisterReplicaError; + utils::BasicResult RegisterReplica(const ReplicationClientConfig &config); bool SetReplicationRoleMain(); diff --git a/src/replication/messages.cpp b/src/replication/messages.cpp new file mode 100644 index 000000000..4503e9df2 --- /dev/null +++ b/src/replication/messages.cpp @@ -0,0 +1,65 @@ +// Copyright 2023 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. 
+// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +#include "replication/messages.hpp" +#include "rpc/messages.hpp" +#include "slk/serialization.hpp" +#include "slk/streams.hpp" + +namespace memgraph::slk { +// Serialize code for FrequentHeartbeatRes +void Save(const memgraph::replication::FrequentHeartbeatRes &self, memgraph::slk::Builder *builder) { + memgraph::slk::Save(self.success, builder); +} +void Load(memgraph::replication::FrequentHeartbeatRes *self, memgraph::slk::Reader *reader) { + memgraph::slk::Load(&self->success, reader); +} + +// Serialize code for FrequentHeartbeatReq +void Save(const memgraph::replication::FrequentHeartbeatReq & /*self*/, memgraph::slk::Builder * /*builder*/) { + /* Nothing to serialize */ +} +void Load(memgraph::replication::FrequentHeartbeatReq * /*self*/, memgraph::slk::Reader * /*reader*/) { + /* Nothing to serialize */ +} + +} // namespace memgraph::slk + +namespace memgraph::replication { + +constexpr utils::TypeInfo FrequentHeartbeatReq::kType{utils::TypeId::REP_FREQUENT_HEARTBEAT_REQ, "FrequentHeartbeatReq", + nullptr}; + +constexpr utils::TypeInfo FrequentHeartbeatRes::kType{utils::TypeId::REP_FREQUENT_HEARTBEAT_RES, "FrequentHeartbeatRes", + nullptr}; + +void FrequentHeartbeatReq::Save(const FrequentHeartbeatReq &self, memgraph::slk::Builder *builder) { + memgraph::slk::Save(self, builder); +} +void FrequentHeartbeatReq::Load(FrequentHeartbeatReq *self, memgraph::slk::Reader *reader) { + memgraph::slk::Load(self, reader); +} +void FrequentHeartbeatRes::Save(const FrequentHeartbeatRes &self, memgraph::slk::Builder *builder) { + memgraph::slk::Save(self, builder); +} +void FrequentHeartbeatRes::Load(FrequentHeartbeatRes *self, memgraph::slk::Reader *reader) { + memgraph::slk::Load(self, reader); +} + +void FrequentHeartbeatHandler(slk::Reader *req_reader, slk::Builder *res_builder) { + FrequentHeartbeatReq req; + FrequentHeartbeatReq::Load(&req, req_reader); + memgraph::slk::Load(&req, req_reader); + FrequentHeartbeatRes res{true}; + memgraph::slk::Save(res, res_builder); +} + +} // namespace memgraph::replication diff --git a/src/replication/replication_client.cpp b/src/replication/replication_client.cpp new file mode 100644 index 000000000..d14250c2a --- /dev/null +++ b/src/replication/replication_client.cpp @@ -0,0 +1,40 @@ +// Copyright 2023 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +#include "replication/replication_client.hpp" + +namespace memgraph::replication { + +static auto CreateClientContext(const memgraph::replication::ReplicationClientConfig &config) + -> communication::ClientContext { + return (config.ssl) ? 
communication::ClientContext{config.ssl->key_file, config.ssl->cert_file} + : communication::ClientContext{}; +} + +ReplicationClient::ReplicationClient(const memgraph::replication::ReplicationClientConfig &config) + : name_{config.name}, + rpc_context_{CreateClientContext(config)}, + rpc_client_{io::network::Endpoint(io::network::Endpoint::needs_resolving, config.ip_address, config.port), + &rpc_context_}, + replica_check_frequency_{config.replica_check_frequency}, + mode_{config.mode} {} + +ReplicationClient::~ReplicationClient() { + auto endpoint = rpc_client_.Endpoint(); + try { + spdlog::trace("Closing replication client on {}:{}", endpoint.address, endpoint.port); + } catch (...) { + // Logging can throw. Not a big deal, just ignore. + } + thread_pool_.Shutdown(); +} + +} // namespace memgraph::replication diff --git a/src/replication/replication_server.cpp b/src/replication/replication_server.cpp index 7d0ff3cc2..f79ea2add 100644 --- a/src/replication/replication_server.cpp +++ b/src/replication/replication_server.cpp @@ -10,25 +10,7 @@ // licenses/APL.txt. #include "replication/replication_server.hpp" -#include "rpc/messages.hpp" -#include "slk/serialization.hpp" -#include "slk/streams.hpp" - -namespace memgraph::slk { - -// Serialize code for FrequentHeartbeatRes -void Save(const memgraph::replication::FrequentHeartbeatRes &self, memgraph::slk::Builder *builder) { - memgraph::slk::Save(self.success, builder); -} -void Load(memgraph::replication::FrequentHeartbeatRes *self, memgraph::slk::Reader *reader) { - memgraph::slk::Load(&self->success, reader); -} - -// Serialize code for FrequentHeartbeatReq -void Save(const memgraph::replication::FrequentHeartbeatReq &self, memgraph::slk::Builder *builder) {} -void Load(memgraph::replication::FrequentHeartbeatReq *self, memgraph::slk::Reader *reader) {} - -} // namespace memgraph::slk +#include "replication/messages.hpp" namespace memgraph::replication { namespace { @@ -39,13 +21,6 @@ auto CreateServerContext(const memgraph::replication::ReplicationServerConfig &c : communication::ServerContext{}; } -void FrequentHeartbeatHandler(slk::Reader *req_reader, slk::Builder *res_builder) { - FrequentHeartbeatReq req; - memgraph::slk::Load(&req, req_reader); - FrequentHeartbeatRes res{true}; - memgraph::slk::Save(res, res_builder); -} - // NOTE: The replication server must have a single thread for processing // because there is no need for more processing threads - each replica can // have only a single main server. 
Also, the single-threaded guarantee @@ -53,25 +28,6 @@ void FrequentHeartbeatHandler(slk::Reader *req_reader, slk::Builder *res_builder constexpr auto kReplicationServerThreads = 1; } // namespace -constexpr utils::TypeInfo FrequentHeartbeatReq::kType{utils::TypeId::REP_FREQUENT_HEARTBEAT_REQ, "FrequentHeartbeatReq", - nullptr}; - -constexpr utils::TypeInfo FrequentHeartbeatRes::kType{utils::TypeId::REP_FREQUENT_HEARTBEAT_RES, "FrequentHeartbeatRes", - nullptr}; - -void FrequentHeartbeatReq::Save(const FrequentHeartbeatReq &self, memgraph::slk::Builder *builder) { - memgraph::slk::Save(self, builder); -} -void FrequentHeartbeatReq::Load(FrequentHeartbeatReq *self, memgraph::slk::Reader *reader) { - memgraph::slk::Load(self, reader); -} -void FrequentHeartbeatRes::Save(const FrequentHeartbeatRes &self, memgraph::slk::Builder *builder) { - memgraph::slk::Save(self, builder); -} -void FrequentHeartbeatRes::Load(FrequentHeartbeatRes *self, memgraph::slk::Reader *reader) { - memgraph::slk::Load(self, reader); -} - ReplicationServer::ReplicationServer(const memgraph::replication::ReplicationServerConfig &config) : rpc_server_context_{CreateServerContext(config)}, rpc_server_{io::network::Endpoint{config.ip_address, config.port}, &rpc_server_context_, diff --git a/src/replication/state.cpp b/src/replication/state.cpp index 4551eba7e..60c390e17 100644 --- a/src/replication/state.cpp +++ b/src/replication/state.cpp @@ -11,9 +11,11 @@ #include "replication/state.hpp" +#include "replication/replication_client.hpp" #include "replication/replication_server.hpp" #include "replication/status.hpp" #include "utils/file.hpp" +#include "utils/result.hpp" #include "utils/variant_helpers.hpp" constexpr auto kReplicationDirectory = std::string_view{"replication"}; @@ -125,12 +127,9 @@ auto ReplicationState::FetchReplicationData() -> FetchReplicationResult_t { return std::visit( utils::Overloaded{ [&](durability::MainRole &&r) -> FetchReplicationResult_t { - auto res = RoleMainData{ - .epoch_ = std::move(r.epoch), - }; + auto res = RoleMainData{std::move(r.epoch)}; auto b = durability_->begin(durability::kReplicationReplicaPrefix); auto e = durability_->end(durability::kReplicationReplicaPrefix); - res.registered_replicas_.reserve(durability_->Size(durability::kReplicationReplicaPrefix)); for (; b != e; ++b) { auto const &[replica_name, replica_data] = *b; auto json = nlohmann::json::parse(replica_data, nullptr, false); @@ -141,7 +140,8 @@ auto ReplicationState::FetchReplicationData() -> FetchReplicationResult_t { if (key_name != data.config.name) { return FetchReplicationError::PARSE_ERROR; } - res.registered_replicas_.emplace_back(std::move(data.config)); + // Instance clients + res.registered_replicas_.emplace_back(data.config); } catch (...) 
{ return FetchReplicationError::PARSE_ERROR; } @@ -221,7 +221,7 @@ bool ReplicationState::SetReplicationRoleMain() { if (!TryPersistRoleMain(new_epoch)) { return false; } - replication_data_ = RoleMainData{.epoch_ = ReplicationEpoch{new_epoch}}; + replication_data_ = RoleMainData{ReplicationEpoch{new_epoch}}; return true; } @@ -233,16 +233,14 @@ bool ReplicationState::SetReplicationRoleReplica(const ReplicationServerConfig & return true; } -auto ReplicationState::RegisterReplica(const ReplicationClientConfig &config) -> RegisterReplicaError { - auto const replica_handler = [](RoleReplicaData const &) -> RegisterReplicaError { - return RegisterReplicaError::NOT_MAIN; - }; - auto const main_handler = [this, &config](RoleMainData &mainData) -> RegisterReplicaError { +utils::BasicResult ReplicationState::RegisterReplica( + const ReplicationClientConfig &config) { + auto const replica_handler = [](RoleReplicaData const &) { return RegisterReplicaError::NOT_MAIN; }; + ReplicationClient *client{nullptr}; + auto const main_handler = [&client, &config, this](RoleMainData &mainData) -> RegisterReplicaError { // name check auto name_check = [&config](auto const &replicas) { - auto name_matches = [&name = config.name](ReplicationClientConfig const ®istered_config) { - return registered_config.name == name; - }; + auto name_matches = [&name = config.name](auto const &replica) { return replica.name_ == name; }; return std::any_of(replicas.begin(), replicas.end(), name_matches); }; if (name_check(mainData.registered_replicas_)) { @@ -251,8 +249,9 @@ auto ReplicationState::RegisterReplica(const ReplicationClientConfig &config) -> // endpoint check auto endpoint_check = [&](auto const &replicas) { - auto endpoint_matches = [&config](ReplicationClientConfig const ®istered_config) { - return registered_config.ip_address == config.ip_address && registered_config.port == config.port; + auto endpoint_matches = [&config](auto const &replica) { + const auto &ep = replica.rpc_client_.Endpoint(); + return ep.address == config.ip_address && ep.port == config.port; }; return std::any_of(replicas.begin(), replicas.end(), endpoint_matches); }; @@ -266,10 +265,14 @@ auto ReplicationState::RegisterReplica(const ReplicationClientConfig &config) -> } // set - mainData.registered_replicas_.emplace_back(config); + client = &mainData.registered_replicas_.emplace_back(config); return RegisterReplicaError::SUCCESS; }; - return std::visit(utils::Overloaded{main_handler, replica_handler}, replication_data_); + const auto &res = std::visit(utils::Overloaded{main_handler, replica_handler}, replication_data_); + if (res == RegisterReplicaError::SUCCESS) { + return client; + } + return res; } } // namespace memgraph::replication diff --git a/src/rpc/client.hpp b/src/rpc/client.hpp index f727391ac..1fd3fff8d 100644 --- a/src/rpc/client.hpp +++ b/src/rpc/client.hpp @@ -14,6 +14,7 @@ #include #include #include +#include #include "communication/client.hpp" #include "io/network/endpoint.hpp" @@ -41,16 +42,25 @@ class Client { StreamHandler(Client *self, std::unique_lock &&guard, std::function res_load) - : self_(self), - guard_(std::move(guard)), - req_builder_([self](const uint8_t *data, size_t size, bool have_more) { - if (!self->client_->Write(data, size, have_more)) throw GenericRpcFailedException(); - }), - res_load_(res_load) {} + : self_(self), guard_(std::move(guard)), req_builder_(GenBuilderCallback(self, this)), res_load_(res_load) {} public: - StreamHandler(StreamHandler &&) noexcept = default; - StreamHandler 
&operator=(StreamHandler &&) noexcept = default; + StreamHandler(StreamHandler &&other) noexcept + : self_{std::exchange(other.self_, nullptr)}, + defunct_{std::exchange(other.defunct_, true)}, + guard_{std::move(other.guard_)}, + req_builder_{std::move(other.req_builder_), GenBuilderCallback(self_, this)}, + res_load_{std::move(other.res_load_)} {} + StreamHandler &operator=(StreamHandler &&other) noexcept { + if (&other != this) { + self_ = std::exchange(other.self_, nullptr); + defunct_ = std::exchange(other.defunct_, true); + guard_ = std::move(other.guard_); + req_builder_ = slk::Builder(std::move(other.req_builder_), GenBuilderCallback(self_, this)); + res_load_ = std::move(other.res_load_); + } + return *this; + } StreamHandler(const StreamHandler &) = delete; StreamHandler &operator=(const StreamHandler &) = delete; @@ -70,10 +80,18 @@ class Client { while (true) { auto ret = slk::CheckStreamComplete(self_->client_->GetData(), self_->client_->GetDataSize()); if (ret.status == slk::StreamStatus::INVALID) { + // Logically invalid state, connection is still up, defunct stream and release + defunct_ = true; + guard_.unlock(); throw GenericRpcFailedException(); - } else if (ret.status == slk::StreamStatus::PARTIAL) { + } + if (ret.status == slk::StreamStatus::PARTIAL) { if (!self_->client_->Read(ret.stream_size - self_->client_->GetDataSize(), /* exactly_len = */ false)) { + // Failed connection, abort and let somebody retry in the future + defunct_ = true; + self_->Abort(); + guard_.unlock(); throw GenericRpcFailedException(); } } else { @@ -103,7 +121,9 @@ class Client { // Check the response ID. if (res_id != res_type.id && res_id != utils::TypeId::UNKNOWN) { spdlog::error("Message response was of unexpected type"); - self_->client_ = std::nullopt; + // Logically invalid state, connection is still up, defunct stream and release + defunct_ = true; + guard_.unlock(); throw GenericRpcFailedException(); } @@ -112,8 +132,23 @@ class Client { return res_load_(&res_reader); } + bool IsDefunct() const { return defunct_; } + private: + static auto GenBuilderCallback(Client *client, StreamHandler *self) { + return [client, self](const uint8_t *data, size_t size, bool have_more) { + if (self->defunct_) throw GenericRpcFailedException(); + if (!client->client_->Write(data, size, have_more)) { + self->defunct_ = true; + client->Abort(); + self->guard_.unlock(); + throw GenericRpcFailedException(); + } + }; + } + Client *self_; + bool defunct_ = false; std::unique_lock guard_; slk::Builder req_builder_; std::function res_load_; @@ -179,7 +214,7 @@ class Client { TRequestResponse::Request::Save(request, handler.GetBuilder()); // Return the handler to the user. - return std::move(handler); + return handler; } /// Call a previously defined and registered RPC call.
This function can diff --git a/src/slk/streams.cpp b/src/slk/streams.cpp index 5125d635a..dc5ef8c3c 100644 --- a/src/slk/streams.cpp +++ b/src/slk/streams.cpp @@ -30,7 +30,7 @@ void Builder::Save(const uint8_t *data, uint64_t size) { to_write = kSegmentMaxDataSize - pos_; } - memcpy(segment_ + sizeof(SegmentSize) + pos_, data + offset, to_write); + memcpy(segment_.data() + sizeof(SegmentSize) + pos_, data + offset, to_write); size -= to_write; pos_ += to_write; @@ -48,15 +48,15 @@ void Builder::FlushSegment(bool final_segment) { size_t total_size = sizeof(SegmentSize) + pos_; SegmentSize size = pos_; - memcpy(segment_, &size, sizeof(SegmentSize)); + memcpy(segment_.data(), &size, sizeof(SegmentSize)); if (final_segment) { SegmentSize footer = 0; - memcpy(segment_ + total_size, &footer, sizeof(SegmentSize)); + memcpy(segment_.data() + total_size, &footer, sizeof(SegmentSize)); total_size += sizeof(SegmentSize); } - write_func_(segment_, total_size, !final_segment); + write_func_(segment_.data(), total_size, !final_segment); pos_ = 0; } diff --git a/src/slk/streams.hpp b/src/slk/streams.hpp index 587b7830b..691189443 100644 --- a/src/slk/streams.hpp +++ b/src/slk/streams.hpp @@ -46,7 +46,11 @@ static_assert(kSegmentMaxDataSize <= std::numeric_limits::max(), /// Builder used to create a SLK segment stream. class Builder { public: - Builder(std::function write_func); + explicit Builder(std::function write_func); + Builder(Builder &&other, std::function write_func) + : write_func_{std::move(write_func)}, pos_{std::exchange(other.pos_, 0)}, segment_{other.segment_} { + other.write_func_ = [](const uint8_t *, size_t, bool) { /* Moved builder is defunct, no write possible */ }; + } /// Function used internally by SLK to serialize the data. void Save(const uint8_t *data, uint64_t size); @@ -59,7 +63,7 @@ class Builder { std::function write_func_; size_t pos_{0}; - uint8_t segment_[kSegmentMaxTotalSize]; + std::array segment_; }; /// Exception that will be thrown if segments can't be decoded from the byte diff --git a/src/storage/v2/CMakeLists.txt b/src/storage/v2/CMakeLists.txt index 9f6d8d4d7..150a02cc7 100644 --- a/src/storage/v2/CMakeLists.txt +++ b/src/storage/v2/CMakeLists.txt @@ -39,6 +39,7 @@ add_library(mg-storage-v2 STATIC replication/slk.cpp replication/rpc.cpp replication/replication_storage_state.cpp - inmemory/replication/replication_client.cpp + inmemory/replication/recovery.cpp ) + target_link_libraries(mg-storage-v2 mg::replication Threads::Threads mg-utils gflags absl::flat_hash_map mg-rpc mg-slk mg-events mg-memory) diff --git a/src/storage/v2/disk/storage.hpp b/src/storage/v2/disk/storage.hpp index 3575e685d..e93566c09 100644 --- a/src/storage/v2/disk/storage.hpp +++ b/src/storage/v2/disk/storage.hpp @@ -313,12 +313,6 @@ class DiskStorage final : public Storage { uint64_t CommitTimestamp(std::optional desired_commit_timestamp = {}); - auto CreateReplicationClient(const memgraph::replication::ReplicationClientConfig & /*config*/, - const memgraph::replication::ReplicationEpoch * /*current_epoch*/) - -> std::unique_ptr override { - throw utils::BasicException("Disk storage mode does not support replication."); - } - std::unique_ptr kvstore_; DurableMetadata durable_metadata_; EdgeImportMode edge_import_status_{EdgeImportMode::INACTIVE}; diff --git a/src/storage/v2/durability/durability.cpp b/src/storage/v2/durability/durability.cpp index 1240bd52e..9a43a2876 100644 --- a/src/storage/v2/durability/durability.cpp +++ b/src/storage/v2/durability/durability.cpp @@ -96,6 +96,7 @@ 
std::vector GetSnapshotFiles(const std::filesystem::path MG_ASSERT(!error_code, "Couldn't recover data because an error occurred: {}!", error_code.message()); } + std::sort(snapshot_files.begin(), snapshot_files.end()); return snapshot_files; } @@ -106,13 +107,17 @@ std::optional> GetWalFiles(const std::filesystem: std::vector wal_files; std::error_code error_code; + // There could be multiple "current" WAL files, the "_current" tag just means that the previous session didn't + // finalize. We cannot skip based on name, will be able to skip based on invalid data or sequence number, so the + // actual current wal will be skipped for (const auto &item : std::filesystem::directory_iterator(wal_directory, error_code)) { if (!item.is_regular_file()) continue; try { auto info = ReadWalInfo(item.path()); - if ((uuid.empty() || info.uuid == uuid) && (!current_seq_num || info.seq_num < *current_seq_num)) + if ((uuid.empty() || info.uuid == uuid) && (!current_seq_num || info.seq_num < *current_seq_num)) { wal_files.emplace_back(info.seq_num, info.from_timestamp, info.to_timestamp, std::move(info.uuid), std::move(info.epoch_id), item.path()); + } } catch (const RecoveryFailure &e) { spdlog::warn("Failed to read {}", item.path()); continue; @@ -120,6 +125,7 @@ std::optional> GetWalFiles(const std::filesystem: } MG_ASSERT(!error_code, "Couldn't recover data because an error occurred: {}!", error_code.message()); + // Sort based on the sequence number, not the file name std::sort(wal_files.begin(), wal_files.end()); return std::move(wal_files); } @@ -246,8 +252,6 @@ std::optional RecoverData(const std::filesystem::path &snapshot_di std::optional snapshot_timestamp; if (!snapshot_files.empty()) { spdlog::info("Try recovering from snapshot directory {}.", snapshot_directory); - // Order the files by name - std::sort(snapshot_files.begin(), snapshot_files.end()); // UUID used for durability is the UUID of the last snapshot file. *uuid = snapshot_files.back().uuid; diff --git a/src/storage/v2/inmemory/replication/recovery.cpp b/src/storage/v2/inmemory/replication/recovery.cpp new file mode 100644 index 000000000..b62fdc2f9 --- /dev/null +++ b/src/storage/v2/inmemory/replication/recovery.cpp @@ -0,0 +1,238 @@ +// Copyright 2023 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +#include "storage/v2/inmemory/replication/recovery.hpp" +#include +#include +#include +#include +#include "storage/v2/durability/durability.hpp" +#include "storage/v2/inmemory/storage.hpp" +#include "storage/v2/replication/recovery.hpp" +#include "utils/on_scope_exit.hpp" +#include "utils/variant_helpers.hpp" + +namespace memgraph::storage { + +// Handler for transferring the current WAL file whose data is +// contained in the internal buffer and the file. 
+class InMemoryCurrentWalHandler { + public: + explicit InMemoryCurrentWalHandler(InMemoryStorage const *storage, rpc::Client &rpc_client); + void AppendFilename(const std::string &filename); + + void AppendSize(size_t size); + + void AppendFileData(utils::InputFile *file); + + void AppendBufferData(const uint8_t *buffer, size_t buffer_size); + + /// @throw rpc::RpcFailedException + replication::CurrentWalRes Finalize(); + + private: + rpc::Client::StreamHandler stream_; +}; + +////// CurrentWalHandler ////// +InMemoryCurrentWalHandler::InMemoryCurrentWalHandler(InMemoryStorage const *storage, rpc::Client &rpc_client) + : stream_(rpc_client.Stream(storage->id())) {} + +void InMemoryCurrentWalHandler::AppendFilename(const std::string &filename) { + replication::Encoder encoder(stream_.GetBuilder()); + encoder.WriteString(filename); +} + +void InMemoryCurrentWalHandler::AppendSize(const size_t size) { + replication::Encoder encoder(stream_.GetBuilder()); + encoder.WriteUint(size); +} + +void InMemoryCurrentWalHandler::AppendFileData(utils::InputFile *file) { + replication::Encoder encoder(stream_.GetBuilder()); + encoder.WriteFileData(file); +} + +void InMemoryCurrentWalHandler::AppendBufferData(const uint8_t *buffer, const size_t buffer_size) { + replication::Encoder encoder(stream_.GetBuilder()); + encoder.WriteBuffer(buffer, buffer_size); +} + +replication::CurrentWalRes InMemoryCurrentWalHandler::Finalize() { return stream_.AwaitResponse(); } + +////// ReplicationClient Helpers ////// +replication::WalFilesRes TransferWalFiles(std::string db_name, rpc::Client &client, + const std::vector &wal_files) { + MG_ASSERT(!wal_files.empty(), "Wal files list is empty!"); + auto stream = client.Stream(std::move(db_name), wal_files.size()); + replication::Encoder encoder(stream.GetBuilder()); + for (const auto &wal : wal_files) { + spdlog::debug("Sending wal file: {}", wal); + encoder.WriteFile(wal); + } + return stream.AwaitResponse(); +} + +replication::SnapshotRes TransferSnapshot(std::string db_name, rpc::Client &client, const std::filesystem::path &path) { + auto stream = client.Stream(std::move(db_name)); + replication::Encoder encoder(stream.GetBuilder()); + encoder.WriteFile(path); + return stream.AwaitResponse(); +} + +uint64_t ReplicateCurrentWal(const InMemoryStorage *storage, rpc::Client &client, durability::WalFile const &wal_file) { + InMemoryCurrentWalHandler stream{storage, client}; + stream.AppendFilename(wal_file.Path().filename()); + utils::InputFile file; + MG_ASSERT(file.Open(wal_file.Path()), "Failed to open current WAL file at {}!", wal_file.Path()); + const auto [buffer, buffer_size] = wal_file.CurrentFileBuffer(); + stream.AppendSize(file.GetSize() + buffer_size); + stream.AppendFileData(&file); + stream.AppendBufferData(buffer, buffer_size); + auto response = stream.Finalize(); + return response.current_commit_timestamp; +} + +/// This method tries to find the optimal path for recovering a single replica. +/// Based on the last commit transferred to the replica it tries to update the +/// replica using durability files - WALs and Snapshots. WAL files are much +/// smaller in size as they contain only the Deltas (changes) made during the +/// transactions while Snapshots contain all the data. For that reason we prefer +/// WALs as much as possible. As the WAL file that is currently being updated +/// can change during the process we ignore it as much as possible. Also, it +/// uses the transaction lock so locking it can be really expensive.
After we +/// fetch the list of finalized WALs, we try to find the longest chain of +/// sequential WALs, starting from the latest one, that will update the recovery +/// with all the missed updates. If the WAL chain cannot be created, the replica is +/// behind by a lot, so we use the regular recovery process, we send the latest +/// snapshot and all the necessary WAL files, starting from the newest WAL that +/// contains a timestamp before the snapshot. If we registered the existence of +/// the current WAL, we add the sequence number we read from it to the recovery +/// process. After all the other steps are finished, if the current WAL contains +/// the same sequence number, it's the same WAL we read while fetching the +/// recovery steps, so we can safely send it to the replica. +/// We assume that the property of preserving at least 1 WAL before the snapshot +/// is satisfied as we extract the timestamp information from it. +std::vector GetRecoverySteps(uint64_t replica_commit, utils::FileRetainer::FileLocker *file_locker, + const InMemoryStorage *storage) { + std::vector recovery_steps; + auto locker_acc = file_locker->Access(); + + // First check if we can recover using the current wal file only + // otherwise save the seq_num of the current wal file + // This lock is also necessary to force the missed transaction to finish. + std::optional current_wal_seq_num; + std::optional current_wal_from_timestamp; + + std::unique_lock transaction_guard( + storage->engine_lock_); // Hold the storage lock so the current wal file cannot be changed + (void)locker_acc.AddPath(storage->wal_directory_); // Protect all WALs from being deleted + + if (storage->wal_file_) { + current_wal_seq_num.emplace(storage->wal_file_->SequenceNumber()); + current_wal_from_timestamp.emplace(storage->wal_file_->FromTimestamp()); + // No need to hold the lock since the current WAL is present and we can simply skip them + transaction_guard.unlock(); + } + + // Read in finalized WAL files (excluding the current/active WAL) + utils::OnScopeExit + release_wal_dir( // Each individually used file will be locked, so at the end, the dir can be released + [&locker_acc, &wal_dir = storage->wal_directory_]() { (void)locker_acc.RemovePath(wal_dir); }); + // Get WAL files, ordered by timestamp, from oldest to newest + auto wal_files = durability::GetWalFiles(storage->wal_directory_, storage->uuid_, current_wal_seq_num); + MG_ASSERT(wal_files, "Wal files could not be loaded"); + if (transaction_guard.owns_lock()) + transaction_guard.unlock(); // In case we didn't have a current wal file, we can unlock only now since there is no + // guarantee what we'll see after we add the wal file + + // Read in snapshot files + (void)locker_acc.AddPath(storage->snapshot_directory_); // Protect all snapshots from being deleted + utils::OnScopeExit + release_snapshot_dir( // Each individually used file will be locked, so at the end, the dir can be released + [&locker_acc, &snapshot_dir = storage->snapshot_directory_]() { (void)locker_acc.RemovePath(snapshot_dir); }); + auto snapshot_files = durability::GetSnapshotFiles(storage->snapshot_directory_, storage->uuid_); + std::optional latest_snapshot{}; + if (!snapshot_files.empty()) { + latest_snapshot.emplace(std::move(snapshot_files.back())); + } + + auto add_snapshot = [&]() { + if (!latest_snapshot) return; + const auto lock_success = locker_acc.AddPath(latest_snapshot->path); + MG_ASSERT(!lock_success.HasError(), "Tried to lock a nonexistent snapshot path."); +
recovery_steps.emplace_back(std::in_place_type_t{}, std::move(latest_snapshot->path)); + }; + + // Check if we need the snapshot or if the WAL chain is enough + if (!wal_files->empty()) { + // Find WAL chain that contains the replica's commit timestamp + auto wal_chain_it = wal_files->rbegin(); + auto prev_seq{wal_chain_it->seq_num}; + for (; wal_chain_it != wal_files->rend(); ++wal_chain_it) { + if (prev_seq - wal_chain_it->seq_num > 1) { + // Broken chain, must have a snapshot that covers the missing commits + if (wal_chain_it->from_timestamp > replica_commit) { + // Chain does not go far enough, check the snapshot + MG_ASSERT(latest_snapshot, "Missing snapshot, while the WAL chain does not cover enough time."); + // Check for a WAL file that connects the snapshot to the chain + for (;; --wal_chain_it) { + // Going from the newest WAL files, find the first one that has a from_timestamp older than the snapshot + // NOTE: It could be that the only WAL needed is the current one + if (wal_chain_it->from_timestamp <= latest_snapshot->start_timestamp) { + break; + } + if (wal_chain_it == wal_files->rbegin()) break; + } + // Add snapshot to recovery steps + add_snapshot(); + } + break; + } + + if (wal_chain_it->to_timestamp <= replica_commit) { + // Got to a WAL that is older than what we need to recover the replica + break; + } + + prev_seq = wal_chain_it->seq_num; + } + + // Copy and lock the chain part we need, from oldest to newest + RecoveryWals rw{}; + rw.reserve(std::distance(wal_files->rbegin(), wal_chain_it)); + for (auto wal_it = wal_chain_it.base(); wal_it != wal_files->end(); ++wal_it) { + const auto lock_success = locker_acc.AddPath(wal_it->path); + MG_ASSERT(!lock_success.HasError(), "Tried to lock a nonexistent WAL path."); + rw.emplace_back(std::move(wal_it->path)); + } + if (!rw.empty()) { + recovery_steps.emplace_back(std::in_place_type_t{}, std::move(rw)); + } + + } else { + // No WAL chain, check if we need the snapshot + if (!current_wal_from_timestamp || replica_commit < *current_wal_from_timestamp) { + // No current wal or current wal too new + add_snapshot(); + } + } + + // In all cases, if we have a current wal file we need to use it + if (current_wal_seq_num) { + // NOTE: File not handled directly, so no need to lock it + recovery_steps.emplace_back(RecoveryCurrentWal{*current_wal_seq_num}); + } + + return recovery_steps; +} + +} // namespace memgraph::storage diff --git a/src/storage/v2/inmemory/replication/recovery.hpp b/src/storage/v2/inmemory/replication/recovery.hpp new file mode 100644 index 000000000..2025800ab --- /dev/null +++ b/src/storage/v2/inmemory/replication/recovery.hpp @@ -0,0 +1,32 @@ +// Copyright 2023 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt.
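To make the plan produced by GetRecoverySteps above easier to follow: the returned steps are consumed in order (snapshot, then finalized WALs, then the current WAL) by visiting the RecoveryStep variant, mirroring the visit-based dispatch used by the replica recovery code elsewhere in this patch. Below is a minimal, self-contained sketch of that dispatch; the type aliases are simplified stand-ins for the alternatives declared in storage/v2/replication/recovery.hpp, the file names and sequence numbers are invented, and the printed actions stand in for the real TransferSnapshot/TransferWalFiles/ReplicateCurrentWal calls.

#include <cstdint>
#include <filesystem>
#include <iostream>
#include <type_traits>
#include <variant>
#include <vector>

// Simplified stand-ins for the step types from storage/v2/replication/recovery.hpp.
using RecoverySnapshot = std::filesystem::path;
using RecoveryWals = std::vector<std::filesystem::path>;
struct RecoveryCurrentWal {
  uint64_t current_wal_seq_num;
};
using RecoveryStep = std::variant<RecoverySnapshot, RecoveryWals, RecoveryCurrentWal>;

int main() {
  // Hypothetical plan: send a snapshot, then two finalized WALs, then the current WAL.
  std::vector<RecoveryStep> steps{RecoverySnapshot{"snapshot_17"},
                                  RecoveryWals{{"wal_18"}, {"wal_19"}},
                                  RecoveryCurrentWal{20}};
  for (const auto &step : steps) {
    std::visit(
        [](const auto &arg) {
          using T = std::remove_cvref_t<decltype(arg)>;
          if constexpr (std::is_same_v<T, RecoverySnapshot>) {
            std::cout << "transfer snapshot " << arg << '\n';
          } else if constexpr (std::is_same_v<T, RecoveryWals>) {
            std::cout << "transfer " << arg.size() << " finalized WAL file(s)\n";
          } else {
            std::cout << "replicate current WAL with seq_num " << arg.current_wal_seq_num << '\n';
          }
        },
        step);
  }
}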
+#pragma once + +#include "storage/v2/durability/durability.hpp" +#include "storage/v2/replication/recovery.hpp" +#include "storage/v2/replication/replication_client.hpp" + +namespace memgraph::storage { +class InMemoryStorage; + +////// ReplicationClient Helpers ////// + +replication::WalFilesRes TransferWalFiles(std::string db_name, rpc::Client &client, + const std::vector &wal_files); + +replication::SnapshotRes TransferSnapshot(std::string db_name, rpc::Client &client, const std::filesystem::path &path); + +uint64_t ReplicateCurrentWal(const InMemoryStorage *storage, rpc::Client &client, durability::WalFile const &wal_file); + +auto GetRecoverySteps(uint64_t replica_commit, utils::FileRetainer::FileLocker *file_locker, + const InMemoryStorage *storage) -> std::vector; + +} // namespace memgraph::storage diff --git a/src/storage/v2/inmemory/replication/replication_client.cpp b/src/storage/v2/inmemory/replication/replication_client.cpp deleted file mode 100644 index b8ecc1c72..000000000 --- a/src/storage/v2/inmemory/replication/replication_client.cpp +++ /dev/null @@ -1,349 +0,0 @@ -// Copyright 2023 Memgraph Ltd. -// -// Use of this software is governed by the Business Source License -// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source -// License, and you may not use this file except in compliance with the Business Source License. -// -// As of the Change Date specified in that file, in accordance with -// the Business Source License, use of this software will be governed -// by the Apache License, Version 2.0, included in the file -// licenses/APL.txt. - -#include "storage/v2/inmemory/replication/replication_client.hpp" - -#include "storage/v2/durability/durability.hpp" -#include "storage/v2/inmemory/storage.hpp" - -namespace memgraph::storage { - -namespace { -template -[[maybe_unused]] inline constexpr bool always_false_v = false; -} // namespace - -// Handler for transfering the current WAL file whose data is -// contained in the internal buffer and the file. 
-class CurrentWalHandler { - public: - explicit CurrentWalHandler(ReplicationClient *self); - void AppendFilename(const std::string &filename); - - void AppendSize(size_t size); - - void AppendFileData(utils::InputFile *file); - - void AppendBufferData(const uint8_t *buffer, size_t buffer_size); - - /// @throw rpc::RpcFailedException - replication::CurrentWalRes Finalize(); - - private: - ReplicationClient *self_; - rpc::Client::StreamHandler stream_; -}; - -////// CurrentWalHandler ////// -CurrentWalHandler::CurrentWalHandler(ReplicationClient *self) - : self_(self), stream_(self_->rpc_client_.Stream(self->GetStorageId())) {} - -void CurrentWalHandler::AppendFilename(const std::string &filename) { - replication::Encoder encoder(stream_.GetBuilder()); - encoder.WriteString(filename); -} - -void CurrentWalHandler::AppendSize(const size_t size) { - replication::Encoder encoder(stream_.GetBuilder()); - encoder.WriteUint(size); -} - -void CurrentWalHandler::AppendFileData(utils::InputFile *file) { - replication::Encoder encoder(stream_.GetBuilder()); - encoder.WriteFileData(file); -} - -void CurrentWalHandler::AppendBufferData(const uint8_t *buffer, const size_t buffer_size) { - replication::Encoder encoder(stream_.GetBuilder()); - encoder.WriteBuffer(buffer, buffer_size); -} - -replication::CurrentWalRes CurrentWalHandler::Finalize() { return stream_.AwaitResponse(); } - -////// ReplicationClient Helpers ////// - -replication::WalFilesRes TransferWalFiles(std::string db_name, rpc::Client &client, - const std::vector &wal_files) { - MG_ASSERT(!wal_files.empty(), "Wal files list is empty!"); - auto stream = client.Stream(std::move(db_name), wal_files.size()); - replication::Encoder encoder(stream.GetBuilder()); - for (const auto &wal : wal_files) { - spdlog::debug("Sending wal file: {}", wal); - encoder.WriteFile(wal); - } - return stream.AwaitResponse(); -} - -replication::SnapshotRes TransferSnapshot(std::string db_name, rpc::Client &client, const std::filesystem::path &path) { - auto stream = client.Stream(std::move(db_name)); - replication::Encoder encoder(stream.GetBuilder()); - encoder.WriteFile(path); - return stream.AwaitResponse(); -} - -uint64_t ReplicateCurrentWal(CurrentWalHandler &stream, durability::WalFile const &wal_file) { - stream.AppendFilename(wal_file.Path().filename()); - utils::InputFile file; - MG_ASSERT(file.Open(wal_file.Path()), "Failed to open current WAL file!"); - const auto [buffer, buffer_size] = wal_file.CurrentFileBuffer(); - stream.AppendSize(file.GetSize() + buffer_size); - stream.AppendFileData(&file); - stream.AppendBufferData(buffer, buffer_size); - auto response = stream.Finalize(); - return response.current_commit_timestamp; -} - -////// ReplicationClient ////// - -InMemoryReplicationClient::InMemoryReplicationClient(InMemoryStorage *storage, - const memgraph::replication::ReplicationClientConfig &config, - const memgraph::replication::ReplicationEpoch *epoch) - : ReplicationClient{storage, config, epoch} {} - -void InMemoryReplicationClient::RecoverReplica(uint64_t replica_commit) { - spdlog::debug("Starting replica recover"); - auto *storage = static_cast(storage_); - while (true) { - auto file_locker = storage->file_retainer_.AddLocker(); - - const auto steps = GetRecoverySteps(replica_commit, &file_locker); - int i = 0; - for (const InMemoryReplicationClient::RecoveryStep &recovery_step : steps) { - spdlog::trace("Recovering in step: {}", i++); - try { - std::visit( - [&, this](T &&arg) { - using StepType = std::remove_cvref_t; - if constexpr 
(std::is_same_v) { // TODO: split into 3 overloads - spdlog::debug("Sending the latest snapshot file: {}", arg); - auto response = TransferSnapshot(storage->id(), rpc_client_, arg); - replica_commit = response.current_commit_timestamp; - } else if constexpr (std::is_same_v) { - spdlog::debug("Sending the latest wal files"); - auto response = TransferWalFiles(storage->id(), rpc_client_, arg); - replica_commit = response.current_commit_timestamp; - spdlog::debug("Wal files successfully transferred."); - } else if constexpr (std::is_same_v) { - std::unique_lock transaction_guard(storage->engine_lock_); - if (storage->wal_file_ && storage->wal_file_->SequenceNumber() == arg.current_wal_seq_num) { - storage->wal_file_->DisableFlushing(); - transaction_guard.unlock(); - spdlog::debug("Sending current wal file"); - auto streamHandler = CurrentWalHandler{this}; - replica_commit = ReplicateCurrentWal(streamHandler, *storage->wal_file_); - storage->wal_file_->EnableFlushing(); - } else { - spdlog::debug("Cannot recover using current wal file"); - } - } else { - static_assert(always_false_v, "Missing type from variant visitor"); - } - }, - recovery_step); - } catch (const rpc::RpcFailedException &) { - { - std::unique_lock client_guard{client_lock_}; - replica_state_.store(replication::ReplicaState::INVALID); - } - HandleRpcFailure(); - return; - } - } - - spdlog::trace("Current timestamp on replica: {}", replica_commit); - // To avoid the situation where we read a correct commit timestamp in - // one thread, and after that another thread commits a different a - // transaction and THEN we set the state to READY in the first thread, - // we set this lock before checking the timestamp. - // We will detect that the state is invalid during the next commit, - // because replication::AppendDeltasRpc sends the last commit timestamp which - // replica checks if it's the same last commit timestamp it received - // and we will go to recovery. - // By adding this lock, we can avoid that, and go to RECOVERY immediately. - std::unique_lock client_guard{client_lock_}; - const auto last_commit_timestamp = LastCommitTimestamp(); - SPDLOG_INFO("Replica timestamp: {}", replica_commit); - SPDLOG_INFO("Last commit: {}", last_commit_timestamp); - if (last_commit_timestamp == replica_commit) { - replica_state_.store(replication::ReplicaState::READY); - return; - } - } -} - -/// This method tries to find the optimal path for recoverying a single replica. -/// Based on the last commit transfered to replica it tries to update the -/// replica using durability files - WALs and Snapshots. WAL files are much -/// smaller in size as they contain only the Deltas (changes) made during the -/// transactions while Snapshots contain all the data. For that reason we prefer -/// WALs as much as possible. As the WAL file that is currently being updated -/// can change during the process we ignore it as much as possible. Also, it -/// uses the transaction lock so locking it can be really expensive. After we -/// fetch the list of finalized WALs, we try to find the longest chain of -/// sequential WALs, starting from the latest one, that will update the recovery -/// with the all missed updates. If the WAL chain cannot be created, replica is -/// behind by a lot, so we use the regular recovery process, we send the latest -/// snapshot and all the necessary WAL files, starting from the newest WAL that -/// contains a timestamp before the snapshot. 
If we registered the existence of -/// the current WAL, we add the sequence number we read from it to the recovery -/// process. After all the other steps are finished, if the current WAL contains -/// the same sequence number, it's the same WAL we read while fetching the -/// recovery steps, so we can safely send it to the replica. -/// We assume that the property of preserving at least 1 WAL before the snapshot -/// is satisfied as we extract the timestamp information from it. -std::vector InMemoryReplicationClient::GetRecoverySteps( - const uint64_t replica_commit, utils::FileRetainer::FileLocker *file_locker) { - // First check if we can recover using the current wal file only - // otherwise save the seq_num of the current wal file - // This lock is also necessary to force the missed transaction to finish. - std::optional current_wal_seq_num; - std::optional current_wal_from_timestamp; - auto *storage = static_cast(storage_); - if (std::unique_lock transtacion_guard(storage->engine_lock_); storage->wal_file_) { - current_wal_seq_num.emplace(storage->wal_file_->SequenceNumber()); - current_wal_from_timestamp.emplace(storage->wal_file_->FromTimestamp()); - } - - auto locker_acc = file_locker->Access(); - auto wal_files = durability::GetWalFiles(storage->wal_directory_, storage->uuid_, current_wal_seq_num); - MG_ASSERT(wal_files, "Wal files could not be loaded"); - - auto snapshot_files = durability::GetSnapshotFiles(storage->snapshot_directory_, storage->uuid_); - std::optional latest_snapshot; - if (!snapshot_files.empty()) { - std::sort(snapshot_files.begin(), snapshot_files.end()); - latest_snapshot.emplace(std::move(snapshot_files.back())); - } - - std::vector recovery_steps; - - // No finalized WAL files were found. This means the difference is contained - // inside the current WAL or the snapshot. - if (wal_files->empty()) { - if (current_wal_from_timestamp && replica_commit >= *current_wal_from_timestamp) { - MG_ASSERT(current_wal_seq_num); - recovery_steps.emplace_back(RecoveryCurrentWal{*current_wal_seq_num}); - return recovery_steps; - } - - // Without the finalized WAL containing the current timestamp of replica, - // we cannot know if the difference is only in the current WAL or we need - // to send the snapshot. - if (latest_snapshot) { - const auto lock_success = locker_acc.AddPath(latest_snapshot->path); - MG_ASSERT(!lock_success.HasError(), "Tried to lock a nonexistant path."); - recovery_steps.emplace_back(std::in_place_type_t{}, std::move(latest_snapshot->path)); - } - // if there are no finalized WAL files, snapshot left the current WAL - // as the WAL file containing a transaction before snapshot creation - // so we can be sure that the current WAL is present - MG_ASSERT(current_wal_seq_num); - recovery_steps.emplace_back(RecoveryCurrentWal{*current_wal_seq_num}); - return recovery_steps; - } - - // Find the longest chain of WALs for recovery. - // The chain consists ONLY of sequential WALs. 
- auto rwal_it = wal_files->rbegin(); - - // if the last finalized WAL is before the replica commit - // then we can recovery only from current WAL - if (rwal_it->to_timestamp <= replica_commit) { - MG_ASSERT(current_wal_seq_num); - recovery_steps.emplace_back(RecoveryCurrentWal{*current_wal_seq_num}); - return recovery_steps; - } - - uint64_t previous_seq_num{rwal_it->seq_num}; - for (; rwal_it != wal_files->rend(); ++rwal_it) { - // If the difference between two consecutive wal files is not 0 or 1 - // we have a missing WAL in our chain - if (previous_seq_num - rwal_it->seq_num > 1) { - break; - } - - // Find first WAL that contains up to replica commit, i.e. WAL - // that is before the replica commit or conatins the replica commit - // as the last committed transaction OR we managed to find the first WAL - // file. - if (replica_commit >= rwal_it->from_timestamp || rwal_it->seq_num == 0) { - if (replica_commit >= rwal_it->to_timestamp) { - // We want the WAL after because the replica already contains all the - // commits from this WAL - --rwal_it; - } - std::vector wal_chain; - auto distance_from_first = std::distance(rwal_it, wal_files->rend() - 1); - // We have managed to create WAL chain - // We need to lock these files and add them to the chain - for (auto result_wal_it = wal_files->begin() + distance_from_first; result_wal_it != wal_files->end(); - ++result_wal_it) { - const auto lock_success = locker_acc.AddPath(result_wal_it->path); - MG_ASSERT(!lock_success.HasError(), "Tried to lock a nonexistant path."); - wal_chain.push_back(std::move(result_wal_it->path)); - } - - recovery_steps.emplace_back(std::in_place_type_t{}, std::move(wal_chain)); - - if (current_wal_seq_num) { - recovery_steps.emplace_back(RecoveryCurrentWal{*current_wal_seq_num}); - } - return recovery_steps; - } - - previous_seq_num = rwal_it->seq_num; - } - - MG_ASSERT(latest_snapshot, "Invalid durability state, missing snapshot"); - // We didn't manage to find a WAL chain, we need to send the latest snapshot - // with its WALs - const auto lock_success = locker_acc.AddPath(latest_snapshot->path); - MG_ASSERT(!lock_success.HasError(), "Tried to lock a nonexistant path."); - recovery_steps.emplace_back(std::in_place_type_t{}, std::move(latest_snapshot->path)); - - std::vector recovery_wal_files; - auto wal_it = wal_files->begin(); - for (; wal_it != wal_files->end(); ++wal_it) { - // Assuming recovery process is correct the snashpot should - // always retain a single WAL that contains a transaction - // before its creation - if (latest_snapshot->start_timestamp < wal_it->to_timestamp) { - if (latest_snapshot->start_timestamp < wal_it->from_timestamp) { - MG_ASSERT(wal_it != wal_files->begin(), "Invalid durability files state"); - --wal_it; - } - break; - } - } - - for (; wal_it != wal_files->end(); ++wal_it) { - const auto lock_success = locker_acc.AddPath(wal_it->path); - MG_ASSERT(!lock_success.HasError(), "Tried to lock a nonexistant path."); - recovery_wal_files.push_back(std::move(wal_it->path)); - } - - // We only have a WAL before the snapshot - if (recovery_wal_files.empty()) { - const auto lock_success = locker_acc.AddPath(wal_files->back().path); - MG_ASSERT(!lock_success.HasError(), "Tried to lock a nonexistant path."); - recovery_wal_files.push_back(std::move(wal_files->back().path)); - } - - recovery_steps.emplace_back(std::in_place_type_t{}, std::move(recovery_wal_files)); - - if (current_wal_seq_num) { - recovery_steps.emplace_back(RecoveryCurrentWal{*current_wal_seq_num}); - } - - return 
recovery_steps; -} - -} // namespace memgraph::storage diff --git a/src/storage/v2/inmemory/replication/replication_client.hpp b/src/storage/v2/inmemory/replication/replication_client.hpp deleted file mode 100644 index e956838e7..000000000 --- a/src/storage/v2/inmemory/replication/replication_client.hpp +++ /dev/null @@ -1,38 +0,0 @@ -// Copyright 2023 Memgraph Ltd. -// -// Use of this software is governed by the Business Source License -// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source -// License, and you may not use this file except in compliance with the Business Source License. -// -// As of the Change Date specified in that file, in accordance with -// the Business Source License, use of this software will be governed -// by the Apache License, Version 2.0, included in the file -// licenses/APL.txt. -#pragma once - -#include "storage/v2/replication/replication_client.hpp" - -namespace memgraph::storage { - -class InMemoryStorage; - -class InMemoryReplicationClient : public ReplicationClient { - public: - InMemoryReplicationClient(InMemoryStorage *storage, const memgraph::replication::ReplicationClientConfig &config, - const memgraph::replication::ReplicationEpoch *epoch); - - protected: - void RecoverReplica(uint64_t replica_commit) override; - - // TODO: move the GetRecoverySteps stuff below as an internal detail - using RecoverySnapshot = std::filesystem::path; - using RecoveryWals = std::vector; - struct RecoveryCurrentWal { - explicit RecoveryCurrentWal(const uint64_t current_wal_seq_num) : current_wal_seq_num(current_wal_seq_num) {} - uint64_t current_wal_seq_num; - }; - using RecoveryStep = std::variant; - std::vector GetRecoverySteps(uint64_t replica_commit, utils::FileRetainer::FileLocker *file_locker); -}; - -} // namespace memgraph::storage diff --git a/src/storage/v2/inmemory/storage.cpp b/src/storage/v2/inmemory/storage.cpp index 9f00081f6..c27018d24 100644 --- a/src/storage/v2/inmemory/storage.cpp +++ b/src/storage/v2/inmemory/storage.cpp @@ -18,7 +18,7 @@ /// REPLICATION /// #include "dbms/inmemory/replication_handlers.hpp" -#include "storage/v2/inmemory/replication/replication_client.hpp" +#include "storage/v2/inmemory/replication/recovery.hpp" #include "storage/v2/inmemory/unique_constraints.hpp" #include "utils/resource_lock.hpp" #include "utils/stat.hpp" @@ -1608,7 +1608,7 @@ bool InMemoryStorage::AppendToWalDataManipulation(const Transaction &transaction // A single transaction will always be contained in a single WAL file. 
auto current_commit_timestamp = transaction.commit_timestamp->load(std::memory_order_acquire); - repl_storage_state_.InitializeTransaction(wal_file_->SequenceNumber()); + repl_storage_state_.InitializeTransaction(wal_file_->SequenceNumber(), this); auto append_deltas = [&](auto callback) { // Helper lambda that traverses the delta chain on order to find the first @@ -1767,7 +1767,7 @@ bool InMemoryStorage::AppendToWalDataManipulation(const Transaction &transaction wal_file_->AppendTransactionEnd(final_commit_timestamp); FinalizeWalFile(); - return repl_storage_state_.FinalizeTransaction(final_commit_timestamp); + return repl_storage_state_.FinalizeTransaction(final_commit_timestamp, this); } bool InMemoryStorage::AppendToWalDataDefinition(const Transaction &transaction, uint64_t final_commit_timestamp) { @@ -1775,7 +1775,7 @@ bool InMemoryStorage::AppendToWalDataDefinition(const Transaction &transaction, return true; } - repl_storage_state_.InitializeTransaction(wal_file_->SequenceNumber()); + repl_storage_state_.InitializeTransaction(wal_file_->SequenceNumber(), this); for (const auto &md_delta : transaction.md_deltas) { switch (md_delta.action) { @@ -1846,7 +1846,7 @@ bool InMemoryStorage::AppendToWalDataDefinition(const Transaction &transaction, wal_file_->AppendTransactionEnd(final_commit_timestamp); FinalizeWalFile(); - return repl_storage_state_.FinalizeTransaction(final_commit_timestamp); + return repl_storage_state_.FinalizeTransaction(final_commit_timestamp, this); } void InMemoryStorage::AppendToWalDataDefinition(durability::StorageMetadataOperation operation, LabelId label, @@ -1972,12 +1972,6 @@ utils::FileRetainer::FileLockerAccessor::ret_type InMemoryStorage::UnlockPath() return true; } -auto InMemoryStorage::CreateReplicationClient(const memgraph::replication::ReplicationClientConfig &config, - const memgraph::replication::ReplicationEpoch *current_epoch) - -> std::unique_ptr { - return std::make_unique(this, config, current_epoch); -} - std::unique_ptr InMemoryStorage::Access(std::optional override_isolation_level, bool is_main) { return std::unique_ptr(new InMemoryAccessor{Storage::Accessor::shared_access, this, diff --git a/src/storage/v2/inmemory/storage.hpp b/src/storage/v2/inmemory/storage.hpp index bfb445332..3c50326b2 100644 --- a/src/storage/v2/inmemory/storage.hpp +++ b/src/storage/v2/inmemory/storage.hpp @@ -18,10 +18,13 @@ #include "storage/v2/indices/label_index_stats.hpp" #include "storage/v2/inmemory/label_index.hpp" #include "storage/v2/inmemory/label_property_index.hpp" +#include "storage/v2/inmemory/replication/recovery.hpp" +#include "storage/v2/replication/replication_client.hpp" #include "storage/v2/storage.hpp" /// REPLICATION /// #include "replication/config.hpp" +#include "storage/v2/inmemory/replication/recovery.hpp" #include "storage/v2/replication/enums.hpp" #include "storage/v2/replication/replication_storage_state.hpp" #include "storage/v2/replication/rpc.hpp" @@ -44,7 +47,10 @@ namespace memgraph::storage { class InMemoryStorage final : public Storage { friend class memgraph::dbms::InMemoryReplicationHandlers; - friend class InMemoryReplicationClient; + friend class ReplicationStorageClient; + friend std::vector GetRecoverySteps(uint64_t replica_commit, + utils::FileRetainer::FileLocker *file_locker, + const InMemoryStorage *storage); public: enum class CreateSnapshotError : uint8_t { DisabledForReplica, ReachedMaxNumTries }; @@ -332,10 +338,6 @@ class InMemoryStorage final : public Storage { using Storage::CreateTransaction; Transaction 
CreateTransaction(IsolationLevel isolation_level, StorageMode storage_mode, bool is_main) override; - auto CreateReplicationClient(const memgraph::replication::ReplicationClientConfig &config, - const memgraph::replication::ReplicationEpoch *current_epoch) - -> std::unique_ptr override; - void SetStorageMode(StorageMode storage_mode); private: diff --git a/src/storage/v2/replication/enums.hpp b/src/storage/v2/replication/enums.hpp index e89a1fdd3..be16ca192 100644 --- a/src/storage/v2/replication/enums.hpp +++ b/src/storage/v2/replication/enums.hpp @@ -14,6 +14,6 @@ namespace memgraph::storage::replication { -enum class ReplicaState : std::uint8_t { READY, REPLICATING, RECOVERY, INVALID }; +enum class ReplicaState : std::uint8_t { READY, REPLICATING, RECOVERY, MAYBE_BEHIND }; } // namespace memgraph::storage::replication diff --git a/src/storage/v2/replication/recovery.hpp b/src/storage/v2/replication/recovery.hpp new file mode 100644 index 000000000..346e03ecd --- /dev/null +++ b/src/storage/v2/replication/recovery.hpp @@ -0,0 +1,28 @@ +// Copyright 2023 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +#pragma once + +#include +#include +#include + +namespace memgraph::storage { + +using RecoverySnapshot = std::filesystem::path; +using RecoveryWals = std::vector; +struct RecoveryCurrentWal { + explicit RecoveryCurrentWal(const uint64_t current_wal_seq_num) : current_wal_seq_num(current_wal_seq_num) {} + uint64_t current_wal_seq_num; +}; +using RecoveryStep = std::variant; + +} // namespace memgraph::storage diff --git a/src/storage/v2/replication/replication_client.cpp b/src/storage/v2/replication/replication_client.cpp index 33313b130..3bc1b3d32 100644 --- a/src/storage/v2/replication/replication_client.cpp +++ b/src/storage/v2/replication/replication_client.cpp @@ -9,53 +9,37 @@ // by the Apache License, Version 2.0, included in the file // licenses/APL.txt. -#include "storage/v2/replication/replication_client.hpp" +#include "replication/replication_client.hpp" +#include "storage/v2/durability/durability.hpp" +#include "storage/v2/inmemory/storage.hpp" +#include "storage/v2/storage.hpp" +#include "utils/exceptions.hpp" +#include "utils/variant_helpers.hpp" #include #include -#include "storage/v2/durability/durability.hpp" -#include "storage/v2/storage.hpp" +namespace { +template +[[maybe_unused]] inline constexpr bool always_false_v = false; +} // namespace namespace memgraph::storage { -static auto CreateClientContext(const memgraph::replication::ReplicationClientConfig &config) - -> communication::ClientContext { - return (config.ssl) ? 
communication::ClientContext{config.ssl->key_file, config.ssl->cert_file} - : communication::ClientContext{}; -} +ReplicationStorageClient::ReplicationStorageClient(::memgraph::replication::ReplicationClient &client) + : client_{client} {} -ReplicationClient::ReplicationClient(Storage *storage, const memgraph::replication::ReplicationClientConfig &config, - const memgraph::replication::ReplicationEpoch *epoch) - : name_{config.name}, - rpc_context_{CreateClientContext(config)}, - rpc_client_{io::network::Endpoint(io::network::Endpoint::needs_resolving, config.ip_address, config.port), - &rpc_context_}, - replica_check_frequency_{config.replica_check_frequency}, - mode_{config.mode}, - storage_{storage}, - repl_epoch_{epoch} {} - -ReplicationClient::~ReplicationClient() { - auto endpoint = rpc_client_.Endpoint(); - spdlog::trace("Closing replication client on {}:{}", endpoint.address, endpoint.port); - thread_pool_.Shutdown(); -} - -uint64_t ReplicationClient::LastCommitTimestamp() const { - return storage_->repl_storage_state_.last_commit_timestamp_.load(); -} - -void ReplicationClient::InitializeClient() { +void ReplicationStorageClient::CheckReplicaState(Storage *storage) { uint64_t current_commit_timestamp{kTimestampInitialId}; - auto stream{rpc_client_.Stream( - storage_->id(), storage_->repl_storage_state_.last_commit_timestamp_, std::string{repl_epoch_->id()})}; + auto &replStorageState = storage->repl_storage_state_; + auto stream{client_.rpc_client_.Stream( + storage->id(), replStorageState.last_commit_timestamp_, std::string{replStorageState.epoch_.id()})}; const auto replica = stream.AwaitResponse(); std::optional branching_point; - if (replica.epoch_id != repl_epoch_->id() && replica.current_commit_timestamp != kTimestampInitialId) { - auto const &history = storage_->repl_storage_state_.history; + if (replica.epoch_id != replStorageState.epoch_.id() && replica.current_commit_timestamp != kTimestampInitialId) { + auto const &history = replStorageState.history; const auto epoch_info_iter = std::find_if(history.crbegin(), history.crend(), [&](const auto &main_epoch_info) { return main_epoch_info.first == replica.epoch_id; }); @@ -71,94 +55,86 @@ void ReplicationClient::InitializeClient() { "Replica {} acted as the Main instance. Both the Main and Replica {} " "now hold unique data. 
Please resolve data conflicts and start the " "replication on a clean instance.", - name_, name_, name_); + client_.name_, client_.name_, client_.name_); + // State not updated, hence in MAYBE_BEHIND state return; } current_commit_timestamp = replica.current_commit_timestamp; - spdlog::trace("Current timestamp on replica {}: {}", name_, current_commit_timestamp); - spdlog::trace("Current timestamp on main: {}", storage_->repl_storage_state_.last_commit_timestamp_.load()); - if (current_commit_timestamp == storage_->repl_storage_state_.last_commit_timestamp_.load()) { - spdlog::debug("Replica '{}' up to date", name_); - std::unique_lock client_guard{client_lock_}; - replica_state_.store(replication::ReplicaState::READY); - } else { - spdlog::debug("Replica '{}' is behind", name_); - { - std::unique_lock client_guard{client_lock_}; - replica_state_.store(replication::ReplicaState::RECOVERY); + spdlog::trace("Current timestamp on replica {}: {}", client_.name_, current_commit_timestamp); + spdlog::trace("Current timestamp on main: {}", replStorageState.last_commit_timestamp_.load()); + replica_state_.WithLock([&](auto &state) { + if (current_commit_timestamp == replStorageState.last_commit_timestamp_.load()) { + spdlog::debug("Replica '{}' up to date", client_.name_); + state = replication::ReplicaState::READY; + } else { + spdlog::debug("Replica '{}' is behind", client_.name_); + state = replication::ReplicaState::RECOVERY; + client_.thread_pool_.AddTask( + [storage, current_commit_timestamp, this] { this->RecoverReplica(current_commit_timestamp, storage); }); } - thread_pool_.AddTask([=, this] { this->RecoverReplica(current_commit_timestamp); }); - } + }); } -TimestampInfo ReplicationClient::GetTimestampInfo() { +TimestampInfo ReplicationStorageClient::GetTimestampInfo(Storage const *storage) { TimestampInfo info; info.current_timestamp_of_replica = 0; info.current_number_of_timestamp_behind_master = 0; try { - auto stream{rpc_client_.Stream(storage_->id())}; + auto stream{client_.rpc_client_.Stream(storage->id())}; const auto response = stream.AwaitResponse(); const auto is_success = response.success; if (!is_success) { - replica_state_.store(replication::ReplicaState::INVALID); - HandleRpcFailure(); + replica_state_.WithLock([](auto &val) { val = replication::ReplicaState::MAYBE_BEHIND; }); + LogRpcFailure(); } - auto main_time_stamp = storage_->repl_storage_state_.last_commit_timestamp_.load(); + auto main_time_stamp = storage->repl_storage_state_.last_commit_timestamp_.load(); info.current_timestamp_of_replica = response.current_commit_timestamp; info.current_number_of_timestamp_behind_master = response.current_commit_timestamp - main_time_stamp; } catch (const rpc::RpcFailedException &) { - { - std::unique_lock client_guard(client_lock_); - replica_state_.store(replication::ReplicaState::INVALID); - } - HandleRpcFailure(); // mutex already unlocked, if the new enqueued task dispatches immediately it probably won't - // block + replica_state_.WithLock([](auto &val) { val = replication::ReplicaState::MAYBE_BEHIND; }); + LogRpcFailure(); // mutex already unlocked, if the new enqueued task dispatches immediately it probably + // won't block } return info; } -void ReplicationClient::HandleRpcFailure() { - spdlog::error(utils::MessageWithLink("Couldn't replicate data to {}.", name_, "https://memgr.ph/replication")); - TryInitializeClientAsync(); +void ReplicationStorageClient::LogRpcFailure() { + spdlog::error( + utils::MessageWithLink("Couldn't replicate data to {}.", client_.name_, 
"https://memgr.ph/replication")); } -void ReplicationClient::TryInitializeClientAsync() { - thread_pool_.AddTask([this] { - rpc_client_.Abort(); - this->TryInitializeClientSync(); - }); +void ReplicationStorageClient::TryCheckReplicaStateAsync(Storage *storage) { + client_.thread_pool_.AddTask([storage, this] { this->TryCheckReplicaStateSync(storage); }); } -void ReplicationClient::TryInitializeClientSync() { +void ReplicationStorageClient::TryCheckReplicaStateSync(Storage *storage) { try { - InitializeClient(); + CheckReplicaState(storage); } catch (const rpc::VersionMismatchRpcFailedException &) { - std::unique_lock client_guard{client_lock_}; - replica_state_.store(replication::ReplicaState::INVALID); + replica_state_.WithLock([](auto &val) { val = replication::ReplicaState::MAYBE_BEHIND; }); spdlog::error( utils::MessageWithLink("Failed to connect to replica {} at the endpoint {}. Because the replica " "deployed is not a compatible version.", - name_, rpc_client_.Endpoint(), "https://memgr.ph/replication")); + client_.name_, client_.rpc_client_.Endpoint(), "https://memgr.ph/replication")); } catch (const rpc::RpcFailedException &) { - std::unique_lock client_guard{client_lock_}; - replica_state_.store(replication::ReplicaState::INVALID); - spdlog::error(utils::MessageWithLink("Failed to connect to replica {} at the endpoint {}.", name_, - rpc_client_.Endpoint(), "https://memgr.ph/replication")); + replica_state_.WithLock([](auto &val) { val = replication::ReplicaState::MAYBE_BEHIND; }); + spdlog::error(utils::MessageWithLink("Failed to connect to replica {} at the endpoint {}.", client_.name_, + client_.rpc_client_.Endpoint(), "https://memgr.ph/replication")); } } -void ReplicationClient::StartTransactionReplication(const uint64_t current_wal_seq_num) { - std::unique_lock guard(client_lock_); - const auto status = replica_state_.load(); - switch (status) { - case replication::ReplicaState::RECOVERY: - spdlog::debug("Replica {} is behind MAIN instance", name_); +void ReplicationStorageClient::StartTransactionReplication(const uint64_t current_wal_seq_num, Storage *storage) { + auto locked_state = replica_state_.Lock(); + switch (*locked_state) { + using enum replication::ReplicaState; + case RECOVERY: + spdlog::debug("Replica {} is behind MAIN instance", client_.name_); return; - case replication::ReplicaState::REPLICATING: - spdlog::debug("Replica {} missed a transaction", name_); + case REPLICATING: + spdlog::debug("Replica {} missed a transaction", client_.name_); // We missed a transaction because we're still replicating // the previous transaction so we need to go to RECOVERY // state to catch up with the missing transaction @@ -166,143 +142,169 @@ void ReplicationClient::StartTransactionReplication(const uint64_t current_wal_s // an error can happen while we're replicating the previous // transaction after which the client should go to // INVALID state before starting the recovery process - replica_state_.store(replication::ReplicaState::RECOVERY); + // + // This is a signal to any async streams that are still finalizing to start recovery, since this commit will be + // missed. 
+ *locked_state = RECOVERY; return; - case replication::ReplicaState::INVALID: - HandleRpcFailure(); + case MAYBE_BEHIND: + spdlog::error( + utils::MessageWithLink("Couldn't replicate data to {}.", client_.name_, "https://memgr.ph/replication")); + TryCheckReplicaStateAsync(storage); return; - case replication::ReplicaState::READY: + case READY: MG_ASSERT(!replica_stream_); try { - replica_stream_.emplace( - ReplicaStream{this, storage_->repl_storage_state_.last_commit_timestamp_.load(), current_wal_seq_num}); - replica_state_.store(replication::ReplicaState::REPLICATING); + replica_stream_.emplace(storage, client_.rpc_client_, current_wal_seq_num); + *locked_state = REPLICATING; } catch (const rpc::RpcFailedException &) { - replica_state_.store(replication::ReplicaState::INVALID); - HandleRpcFailure(); + *locked_state = MAYBE_BEHIND; + LogRpcFailure(); } return; } } -bool ReplicationClient::FinalizeTransactionReplication() { +bool ReplicationStorageClient::FinalizeTransactionReplication(Storage *storage) { // We can only check the state because it guarantees to be only // valid during a single transaction replication (if the assumption // that this and other transaction replication functions can only be // called from a one thread stands) - if (replica_state_ != replication::ReplicaState::REPLICATING) { + if (State() != replication::ReplicaState::REPLICATING) { return false; } - auto task = [this]() { + if (replica_stream_->IsDefunct()) return false; + + auto task = [storage, this]() { MG_ASSERT(replica_stream_, "Missing stream for transaction deltas"); try { auto response = replica_stream_->Finalize(); - replica_stream_.reset(); - std::unique_lock client_guard(client_lock_); - if (!response.success || replica_state_ == replication::ReplicaState::RECOVERY) { - replica_state_.store(replication::ReplicaState::RECOVERY); - thread_pool_.AddTask([&, this] { this->RecoverReplica(response.current_commit_timestamp); }); - } else { - replica_state_.store(replication::ReplicaState::READY); + return replica_state_.WithLock([storage, &response, this](auto &state) { + replica_stream_.reset(); + if (!response.success || state == replication::ReplicaState::RECOVERY) { + state = replication::ReplicaState::RECOVERY; + client_.thread_pool_.AddTask( + [storage, &response, this] { this->RecoverReplica(response.current_commit_timestamp, storage); }); + return false; + } + state = replication::ReplicaState::READY; return true; - } + }); } catch (const rpc::RpcFailedException &) { - replica_stream_.reset(); - { - std::unique_lock client_guard(client_lock_); - replica_state_.store(replication::ReplicaState::INVALID); - } - HandleRpcFailure(); + replica_state_.WithLock([this](auto &state) { + replica_stream_.reset(); + state = replication::ReplicaState::MAYBE_BEHIND; + }); + LogRpcFailure(); + return false; } - return false; }; - if (mode_ == memgraph::replication::ReplicationMode::ASYNC) { - thread_pool_.AddTask([=] { (void)task(); }); + if (client_.mode_ == memgraph::replication::ReplicationMode::ASYNC) { + client_.thread_pool_.AddTask([task = std::move(task)] { (void)task(); }); return true; } return task(); } -void ReplicationClient::FrequentCheck() { - const auto is_success = std::invoke([this]() { - try { - auto stream{rpc_client_.Stream()}; - const auto response = stream.AwaitResponse(); - return response.success; - } catch (const rpc::RpcFailedException &) { - return false; - } - }); - // States: READY, REPLICATING, RECOVERY, INVALID - // If success && ready, replicating, recovery -> stay the same because 
something good is going on. - // If success && INVALID -> [it's possible that replica came back to life] -> TryInitializeClient. - // If fail -> [replica is not reachable at all] -> INVALID state. - // NOTE: TryInitializeClient might return nothing if there is a branching point. - // NOTE: The early return pattern simplified the code, but the behavior should be as explained. - if (!is_success) { - replica_state_.store(replication::ReplicaState::INVALID); - return; - } - if (replica_state_.load() == replication::ReplicaState::INVALID) { - TryInitializeClientAsync(); - } +void ReplicationStorageClient::Start(Storage *storage) { + spdlog::trace("Replication client started for database \"{}\"", storage->id()); + TryCheckReplicaStateSync(storage); } -void ReplicationClient::Start() { - auto const &endpoint = rpc_client_.Endpoint(); - spdlog::trace("Replication client started at: {}:{}", endpoint.address, endpoint.port); - - TryInitializeClientSync(); - - // Help the user to get the most accurate replica state possible. - if (replica_check_frequency_ > std::chrono::seconds(0)) { - replica_checker_.Run("Replica Checker", replica_check_frequency_, [this] { this->FrequentCheck(); }); +void ReplicationStorageClient::RecoverReplica(uint64_t replica_commit, memgraph::storage::Storage *storage) { + if (storage->storage_mode_ != StorageMode::IN_MEMORY_TRANSACTIONAL) { + throw utils::BasicException("Only InMemoryTransactional mode supports replication!"); } -} + spdlog::debug("Starting replica recovery"); + auto *mem_storage = static_cast(storage); -void ReplicationClient::IfStreamingTransaction(const std::function &callback) { - // We can only check the state because it guarantees to be only - // valid during a single transaction replication (if the assumption - // that this and other transaction replication functions can only be - // called from a one thread stands) - if (replica_state_ != replication::ReplicaState::REPLICATING) { - return; - } + while (true) { + auto file_locker = mem_storage->file_retainer_.AddLocker(); - try { - callback(*replica_stream_); - } catch (const rpc::RpcFailedException &) { - { - std::unique_lock client_guard{client_lock_}; - replica_state_.store(replication::ReplicaState::INVALID); + const auto steps = GetRecoverySteps(replica_commit, &file_locker, mem_storage); + int i = 0; + for (const RecoveryStep &recovery_step : steps) { + spdlog::trace("Recovering in step: {}", i++); + try { + rpc::Client &rpcClient = client_.rpc_client_; + std::visit(utils::Overloaded{ + [&replica_commit, mem_storage, &rpcClient](RecoverySnapshot const &snapshot) { + spdlog::debug("Sending the latest snapshot file: {}", snapshot); + auto response = TransferSnapshot(mem_storage->id(), rpcClient, snapshot); + replica_commit = response.current_commit_timestamp; + }, + [&replica_commit, mem_storage, &rpcClient](RecoveryWals const &wals) { + spdlog::debug("Sending the latest wal files"); + auto response = TransferWalFiles(mem_storage->id(), rpcClient, wals); + replica_commit = response.current_commit_timestamp; + spdlog::debug("Wal files successfully transferred."); + }, + [&replica_commit, mem_storage, &rpcClient](RecoveryCurrentWal const ¤t_wal) { + std::unique_lock transaction_guard(mem_storage->engine_lock_); + if (mem_storage->wal_file_ && + mem_storage->wal_file_->SequenceNumber() == current_wal.current_wal_seq_num) { + mem_storage->wal_file_->DisableFlushing(); + transaction_guard.unlock(); + spdlog::debug("Sending current wal file"); + replica_commit = ReplicateCurrentWal(mem_storage, 
rpcClient, *mem_storage->wal_file_); + mem_storage->wal_file_->EnableFlushing(); + } else { + spdlog::debug("Cannot recover using current wal file"); + } + }, + [](auto const &in) { + static_assert(always_false_v, "Missing type from variant visitor"); + }, + }, + recovery_step); + } catch (const rpc::RpcFailedException &) { + replica_state_.WithLock([](auto &val) { val = replication::ReplicaState::MAYBE_BEHIND; }); + LogRpcFailure(); + return; + } + } + + spdlog::trace("Current timestamp on replica: {}", replica_commit); + // To avoid the situation where we read a correct commit timestamp in + // one thread, and after that another thread commits a different + // transaction and THEN we set the state to READY in the first thread, + // we set this lock before checking the timestamp. + // We will detect that the state is invalid during the next commit, + // because replication::AppendDeltasRpc sends the last commit timestamp which + // the replica checks against the last commit timestamp it received, + // and we will go to recovery. + // By adding this lock, we can avoid that, and go to RECOVERY immediately. + const auto last_commit_timestamp = storage->repl_storage_state_.last_commit_timestamp_.load(); + SPDLOG_INFO("Replica timestamp: {}", replica_commit); + SPDLOG_INFO("Last commit: {}", last_commit_timestamp); + if (last_commit_timestamp == replica_commit) { + replica_state_.WithLock([](auto &val) { val = replication::ReplicaState::READY; }); + return; + } } } ////// ReplicaStream ////// -ReplicaStream::ReplicaStream(ReplicationClient *self, const uint64_t previous_commit_timestamp, - const uint64_t current_seq_num) - : self_(self), - stream_(self_->rpc_client_.Stream(self->GetStorageId(), previous_commit_timestamp, - current_seq_num)) { +ReplicaStream::ReplicaStream(Storage *storage, rpc::Client &rpc_client, const uint64_t current_seq_num) + : storage_{storage}, + stream_(rpc_client.Stream( + storage->id(), storage->repl_storage_state_.last_commit_timestamp_.load(), current_seq_num)) { replication::Encoder encoder{stream_.GetBuilder()}; - - encoder.WriteString(self->repl_epoch_->id()); + encoder.WriteString(storage->repl_storage_state_.epoch_.id()); } void ReplicaStream::AppendDelta(const Delta &delta, const Vertex &vertex, uint64_t final_commit_timestamp) { replication::Encoder encoder(stream_.GetBuilder()); - auto *storage = self_->GetStorage(); - EncodeDelta(&encoder, storage->name_id_mapper_.get(), storage->config_.items, delta, vertex, final_commit_timestamp); + EncodeDelta(&encoder, storage_->name_id_mapper_.get(), storage_->config_.items, delta, vertex, + final_commit_timestamp); } void ReplicaStream::AppendDelta(const Delta &delta, const Edge &edge, uint64_t final_commit_timestamp) { replication::Encoder encoder(stream_.GetBuilder()); - EncodeDelta(&encoder, self_->GetStorage()->name_id_mapper_.get(), delta, edge, final_commit_timestamp); + EncodeDelta(&encoder, storage_->name_id_mapper_.get(), delta, edge, final_commit_timestamp); } void ReplicaStream::AppendTransactionEnd(uint64_t final_commit_timestamp) { @@ -314,11 +316,10 @@ void ReplicaStream::AppendOperation(durability::StorageMetadataOperation operati const std::set &properties, const LabelIndexStats &stats, const LabelPropertyIndexStats &property_stats, uint64_t timestamp) { replication::Encoder encoder(stream_.GetBuilder()); - EncodeOperation(&encoder, self_->GetStorage()->name_id_mapper_.get(), operation, label, properties, stats, - property_stats, timestamp); + EncodeOperation(&encoder,
storage_->name_id_mapper_.get(), operation, label, properties, stats, property_stats, + timestamp); } replication::AppendDeltasRes ReplicaStream::Finalize() { return stream_.AwaitResponse(); } -auto ReplicationClient::GetStorageId() const -> std::string { return storage_->id(); } } // namespace memgraph::storage diff --git a/src/storage/v2/replication/replication_client.hpp b/src/storage/v2/replication/replication_client.hpp index 8cd8cb384..3d2c019e9 100644 --- a/src/storage/v2/replication/replication_client.hpp +++ b/src/storage/v2/replication/replication_client.hpp @@ -13,6 +13,8 @@ #include "replication/config.hpp" #include "replication/epoch.hpp" +#include "replication/messages.hpp" +#include "replication/replication_client.hpp" #include "rpc/client.hpp" #include "storage/v2/durability/storage_global_operation.hpp" #include "storage/v2/id_types.hpp" @@ -23,9 +25,12 @@ #include "storage/v2/replication/rpc.hpp" #include "utils/file_locker.hpp" #include "utils/scheduler.hpp" +#include "utils/synchronized.hpp" #include "utils/thread_pool.hpp" #include +#include +#include #include #include #include @@ -37,12 +42,12 @@ struct Delta; struct Vertex; struct Edge; class Storage; -class ReplicationClient; +class ReplicationStorageClient; // Handler used for transferring the current transaction. class ReplicaStream { public: - explicit ReplicaStream(ReplicationClient *self, uint64_t previous_commit_timestamp, uint64_t current_seq_num); + explicit ReplicaStream(Storage *storage, rpc::Client &rpc_client, uint64_t current_seq_num); /// @throw rpc::RpcFailedException void AppendDelta(const Delta &delta, const Vertex &vertex, uint64_t final_commit_timestamp); @@ -61,85 +66,84 @@ class ReplicaStream { /// @throw rpc::RpcFailedException replication::AppendDeltasRes Finalize(); + bool IsDefunct() const { return stream_.IsDefunct(); } + private: - ReplicationClient *self_; + Storage *storage_; rpc::Client::StreamHandler stream_; }; -class ReplicationClient { - friend class CurrentWalHandler; +template +concept InvocableWithStream = std::invocable; + +// TODO Rename to something without the word "client" +class ReplicationStorageClient { + friend class InMemoryCurrentWalHandler; friend class ReplicaStream; + friend struct ::memgraph::replication::ReplicationClient; public: - ReplicationClient(Storage *storage, const memgraph::replication::ReplicationClientConfig &config, - const memgraph::replication::ReplicationEpoch *epoch); + explicit ReplicationStorageClient(::memgraph::replication::ReplicationClient &client); - ReplicationClient(ReplicationClient const &) = delete; - ReplicationClient &operator=(ReplicationClient const &) = delete; - ReplicationClient(ReplicationClient &&) noexcept = delete; - ReplicationClient &operator=(ReplicationClient &&) noexcept = delete; + ReplicationStorageClient(ReplicationStorageClient const &) = delete; + ReplicationStorageClient &operator=(ReplicationStorageClient const &) = delete; + ReplicationStorageClient(ReplicationStorageClient &&) noexcept = delete; + ReplicationStorageClient &operator=(ReplicationStorageClient &&) noexcept = delete; - virtual ~ReplicationClient(); + ~ReplicationStorageClient() = default; - auto Mode() const -> memgraph::replication::ReplicationMode { return mode_; } - auto Name() const -> std::string const & { return name_; } - auto Endpoint() const -> io::network::Endpoint const & { return rpc_client_.Endpoint(); } - auto State() const -> replication::ReplicaState { return replica_state_.load(); } - auto GetTimestampInfo() -> TimestampInfo; + // 
TODO Remove the client related functions + auto Mode() const -> memgraph::replication::ReplicationMode { return client_.mode_; } + auto Name() const -> std::string const & { return client_.name_; } + auto Endpoint() const -> io::network::Endpoint const & { return client_.rpc_client_.Endpoint(); } - auto GetStorageId() const -> std::string; + auto State() const -> replication::ReplicaState { return replica_state_.WithLock(std::identity()); } + auto GetTimestampInfo(Storage const *storage) -> TimestampInfo; + + void Start(Storage *storage); + void StartTransactionReplication(uint64_t current_wal_seq_num, Storage *storage); - void Start(); - void StartTransactionReplication(uint64_t current_wal_seq_num); // Replication clients can be removed at any point // so to avoid any complexity of checking if the client was removed whenever // we want to send part of transaction and to avoid adding some GC logic this // function will run a callback if, after previously calling // StartTransactionReplication, stream is created. - void IfStreamingTransaction(const std::function &callback); + template + void IfStreamingTransaction(F &&callback) { + // We can only check the state because it guarantees to be only + // valid during a single transaction replication (if the assumption + // that this and other transaction replication functions can only be + // called from one thread holds) + if (State() != replication::ReplicaState::REPLICATING) { + return; + } + if (replica_stream_->IsDefunct()) return; + try { + callback(*replica_stream_); // failure state what if not streaming (std::nullopt) + } catch (const rpc::RpcFailedException &) { + replica_state_.WithLock([](auto &state) { state = replication::ReplicaState::MAYBE_BEHIND; }); + LogRpcFailure(); + } + } + // Return whether the transaction could be finalized on the replication client or not. - [[nodiscard]] bool FinalizeTransactionReplication(); + [[nodiscard]] bool FinalizeTransactionReplication(Storage *storage); - protected: - virtual void RecoverReplica(uint64_t replica_commit) = 0; + void TryCheckReplicaStateAsync(Storage *storage); // TODO Move back to private + private: + void RecoverReplica(uint64_t replica_commit, memgraph::storage::Storage *storage); - auto GetStorage() -> Storage * { return storage_; } - auto LastCommitTimestamp() const -> uint64_t; - void InitializeClient(); - void HandleRpcFailure(); - void TryInitializeClientAsync(); - void TryInitializeClientSync(); - void FrequentCheck(); + void CheckReplicaState(Storage *storage); + void LogRpcFailure(); + void TryCheckReplicaStateSync(Storage *storage); + void FrequentCheck(Storage *storage); - std::string name_; - communication::ClientContext rpc_context_; - rpc::Client rpc_client_; - std::chrono::seconds replica_check_frequency_; - - std::optional replica_stream_; - memgraph::replication::ReplicationMode mode_{memgraph::replication::ReplicationMode::SYNC}; - - utils::SpinLock client_lock_; - // This thread pool is used for background tasks so we don't - // block the main storage thread - // We use only 1 thread for 2 reasons: - // - background tasks ALWAYS contain some kind of RPC communication. - // We can't have multiple RPC communication from a same client - // because that's not logically valid (e.g. you cannot send a snapshot - // and WAL at a same time because WAL will arrive earlier and be applied - // before the snapshot which is not correct) - // - the implementation is simplified as we have a total control of what - this pool is executing.
Also, we can simply queue multiple tasks - and be sure of the execution order. - Not having mulitple possible threads in the same client allows us - to ignore concurrency problems inside the client. - utils::ThreadPool thread_pool_{1}; - std::atomic replica_state_{replication::ReplicaState::INVALID}; - - utils::Scheduler replica_checker_; - Storage *storage_; - - memgraph::replication::ReplicationEpoch const *repl_epoch_; + ::memgraph::replication::ReplicationClient &client_; + // TODO Do not store the stream, make it a local variable + std::optional + replica_stream_; // Currently active stream (nullopt if not in use), note: a single stream per rpc client + mutable utils::Synchronized replica_state_{ + replication::ReplicaState::MAYBE_BEHIND}; }; } // namespace memgraph::storage diff --git a/src/storage/v2/replication/replication_storage_state.cpp b/src/storage/v2/replication/replication_storage_state.cpp index 1cd0bec09..a443c7171 100644 --- a/src/storage/v2/replication/replication_storage_state.cpp +++ b/src/storage/v2/replication/replication_storage_state.cpp @@ -16,10 +16,10 @@ namespace memgraph::storage { -void ReplicationStorageState::InitializeTransaction(uint64_t seq_num) { - replication_clients_.WithLock([&](auto &clients) { +void ReplicationStorageState::InitializeTransaction(uint64_t seq_num, Storage *storage) { + replication_clients_.WithLock([=](auto &clients) { for (auto &client : clients) { - client->StartTransactionReplication(seq_num); + client->StartTransactionReplication(seq_num, storage); } }); } @@ -52,12 +52,12 @@ void ReplicationStorageState::AppendOperation(durability::StorageMetadataOperati }); } -bool ReplicationStorageState::FinalizeTransaction(uint64_t timestamp) { +bool ReplicationStorageState::FinalizeTransaction(uint64_t timestamp, Storage *storage) { return replication_clients_.WithLock([=](auto &clients) { bool finalized_on_all_replicas = true; for (ReplicationClientPtr &client : clients) { client->IfStreamingTransaction([&](auto &stream) { stream.AppendTransactionEnd(timestamp); }); - const auto finalized = client->FinalizeTransactionReplication(); + const auto finalized = client->FinalizeTransactionReplication(storage); if (client->Mode() == memgraph::replication::ReplicationMode::SYNC) { finalized_on_all_replicas = finalized && finalized_on_all_replicas; @@ -78,12 +78,12 @@ std::optional ReplicationStorageState::GetReplicaStat }); } -std::vector ReplicationStorageState::ReplicasInfo() const { - return replication_clients_.WithReadLock([](auto const &clients) { +std::vector ReplicationStorageState::ReplicasInfo(const Storage *storage) const { + return replication_clients_.WithReadLock([storage](auto const &clients) { std::vector replica_infos; replica_infos.reserve(clients.size()); - auto const asReplicaInfo = [](ReplicationClientPtr const &client) -> ReplicaInfo { - return {client->Name(), client->Mode(), client->Endpoint(), client->State(), client->GetTimestampInfo()}; + auto const asReplicaInfo = [storage](ReplicationClientPtr const &client) -> ReplicaInfo { + return {client->Name(), client->Mode(), client->Endpoint(), client->State(), client->GetTimestampInfo(storage)}; }; std::transform(clients.begin(), clients.end(), std::back_inserter(replica_infos), asReplicaInfo); return replica_infos; diff --git a/src/storage/v2/replication/replication_storage_state.hpp b/src/storage/v2/replication/replication_storage_state.hpp index afedb3950..e3d6b94a0 100644 --- a/src/storage/v2/replication/replication_storage_state.hpp +++ 
b/src/storage/v2/replication/replication_storage_state.hpp @@ -12,11 +12,13 @@ #pragma once #include +#include #include "kvstore/kvstore.hpp" #include "storage/v2/delta.hpp" #include "storage/v2/durability/storage_global_operation.hpp" #include "storage/v2/transaction.hpp" +#include "utils/exceptions.hpp" #include "utils/result.hpp" /// REPLICATION /// @@ -33,21 +35,21 @@ namespace memgraph::storage { class Storage; -class ReplicationClient; +class ReplicationStorageClient; struct ReplicationStorageState { // Only MAIN can send - void InitializeTransaction(uint64_t seq_num); + void InitializeTransaction(uint64_t seq_num, Storage *storage); void AppendDelta(const Delta &delta, const Vertex &vertex, uint64_t timestamp); void AppendDelta(const Delta &delta, const Edge &edge, uint64_t timestamp); void AppendOperation(durability::StorageMetadataOperation operation, LabelId label, const std::set &properties, const LabelIndexStats &stats, const LabelPropertyIndexStats &property_stats, uint64_t final_commit_timestamp); - bool FinalizeTransaction(uint64_t timestamp); + bool FinalizeTransaction(uint64_t timestamp, Storage *storage); // Getters auto GetReplicaState(std::string_view name) const -> std::optional; - auto ReplicasInfo() const -> std::vector; + auto ReplicasInfo(const Storage *storage) const -> std::vector; // History void TrackLatestHistory(); @@ -55,6 +57,19 @@ struct ReplicationStorageState { void Reset(); + template + bool WithClient(std::string_view replica_name, F &&callback) { + return replication_clients_.WithLock([replica_name, cb = std::forward(callback)](auto &clients) { + for (const auto &client : clients) { + if (client->Name() == replica_name) { + cb(client.get()); + return true; + } + } + return false; + }); + } + // Questions: // - storage durability <- databases/*name*/wal and snapshots (where this for epoch_id) // - multi-tenant durability <- databases/.durability (there is a list of all active tenants) @@ -74,7 +89,7 @@ struct ReplicationStorageState { // This way we can initialize client in main thread which means // that we can immediately notify the user if the initialization // failed. 
- using ReplicationClientPtr = std::unique_ptr; + using ReplicationClientPtr = std::unique_ptr; using ReplicationClientList = utils::Synchronized, utils::RWSpinLock>; ReplicationClientList replication_clients_; diff --git a/src/storage/v2/storage.hpp b/src/storage/v2/storage.hpp index 8d8b06cd6..4142da3ca 100644 --- a/src/storage/v2/storage.hpp +++ b/src/storage/v2/storage.hpp @@ -109,7 +109,7 @@ struct EdgeInfoForDeletion { class Storage { friend class ReplicationServer; - friend class ReplicationClient; + friend class ReplicationStorageClient; public: Storage(Config config, StorageMode storage_mode); @@ -355,11 +355,7 @@ class Storage { virtual void PrepareForNewEpoch() = 0; - virtual auto CreateReplicationClient(const memgraph::replication::ReplicationClientConfig &config, - const memgraph::replication::ReplicationEpoch *current_epoch) - -> std::unique_ptr = 0; - - auto ReplicasInfo() const { return repl_storage_state_.ReplicasInfo(); } + auto ReplicasInfo() const { return repl_storage_state_.ReplicasInfo(this); } auto GetReplicaState(std::string_view name) const -> std::optional { return repl_storage_state_.GetReplicaState(name); } @@ -384,7 +380,7 @@ class Storage { Config config_; // Transaction engine - utils::SpinLock engine_lock_; + mutable utils::SpinLock engine_lock_; uint64_t timestamp_{kTimestampInitialId}; uint64_t transaction_id_{kTransactionInitialId}; diff --git a/tests/e2e/replication/show_while_creating_invalid_state.py b/tests/e2e/replication/show_while_creating_invalid_state.py index 996955dc1..74dcbce74 100644 --- a/tests/e2e/replication/show_while_creating_invalid_state.py +++ b/tests/e2e/replication/show_while_creating_invalid_state.py @@ -123,6 +123,143 @@ def test_show_replicas(connection): assert actual_data == expected_data +def test_drop_replicas(connection): + # Goal of this test is to check the DROP REPLICAS command. 
+ # 0/ Manually start main and all replicas + # 1/ Check status of the replicas + # 2/ Kill replica 3 + # 3/ Drop replica 3 and check status + # 4/ Stop replica 4 + # 5/ Drop replica 4 and check status + # 6/ Kill replica 1 + # 7/ Drop replica 1 and check status + # 8/ Stop replica 2 + # 9/ Drop replica 2 and check status + # 10/ Restart all replicas + # 11/ Register them + # 12/ Drop all and check status + + def retrieve_data(): + return set(execute_and_fetch_all(cursor, "SHOW REPLICAS;")) + + # 0/ + interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION) + + cursor = connection(7687, "main").cursor() + + # 1/ + actual_data = set(execute_and_fetch_all(cursor, "SHOW REPLICAS;")) + EXPECTED_COLUMN_NAMES = { + "name", + "socket_address", + "sync_mode", + "current_timestamp_of_replica", + "number_of_timestamp_behind_master", + "state", + } + + actual_column_names = {x.name for x in cursor.description} + assert actual_column_names == EXPECTED_COLUMN_NAMES + + expected_data = { + ("replica_1", "127.0.0.1:10001", "sync", 0, 0, "ready"), + ("replica_2", "127.0.0.1:10002", "sync", 0, 0, "ready"), + ("replica_3", "127.0.0.1:10003", "async", 0, 0, "ready"), + ("replica_4", "127.0.0.1:10004", "async", 0, 0, "ready"), + } + mg_sleep_and_assert(expected_data, retrieve_data) + + # 2/ + interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "replica_3") + expected_data = { + ("replica_1", "127.0.0.1:10001", "sync", 0, 0, "ready"), + ("replica_2", "127.0.0.1:10002", "sync", 0, 0, "ready"), + ("replica_3", "127.0.0.1:10003", "async", 0, 0, "invalid"), + ("replica_4", "127.0.0.1:10004", "async", 0, 0, "ready"), + } + mg_sleep_and_assert(expected_data, retrieve_data) + + # 3/ + execute_and_fetch_all(cursor, "DROP REPLICA replica_3") + expected_data = { + ("replica_1", "127.0.0.1:10001", "sync", 0, 0, "ready"), + ("replica_2", "127.0.0.1:10002", "sync", 0, 0, "ready"), + ("replica_4", "127.0.0.1:10004", "async", 0, 0, "ready"), + } + mg_sleep_and_assert(expected_data, retrieve_data) + + # 4/ + interactive_mg_runner.stop(MEMGRAPH_INSTANCES_DESCRIPTION, "replica_4") + expected_data = { + ("replica_1", "127.0.0.1:10001", "sync", 0, 0, "ready"), + ("replica_2", "127.0.0.1:10002", "sync", 0, 0, "ready"), + ("replica_4", "127.0.0.1:10004", "async", 0, 0, "invalid"), + } + mg_sleep_and_assert(expected_data, retrieve_data) + + # 5/ + execute_and_fetch_all(cursor, "DROP REPLICA replica_4") + expected_data = { + ("replica_1", "127.0.0.1:10001", "sync", 0, 0, "ready"), + ("replica_2", "127.0.0.1:10002", "sync", 0, 0, "ready"), + } + mg_sleep_and_assert(expected_data, retrieve_data) + + # 6/ + interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "replica_1") + expected_data = { + ("replica_1", "127.0.0.1:10001", "sync", 0, 0, "invalid"), + ("replica_2", "127.0.0.1:10002", "sync", 0, 0, "ready"), + } + mg_sleep_and_assert(expected_data, retrieve_data) + + # 7/ + execute_and_fetch_all(cursor, "DROP REPLICA replica_1") + expected_data = { + ("replica_2", "127.0.0.1:10002", "sync", 0, 0, "ready"), + } + mg_sleep_and_assert(expected_data, retrieve_data) + + # 8/ + interactive_mg_runner.stop(MEMGRAPH_INSTANCES_DESCRIPTION, "replica_2") + expected_data = { + ("replica_2", "127.0.0.1:10002", "sync", 0, 0, "invalid"), + } + mg_sleep_and_assert(expected_data, retrieve_data) + + # 9/ + execute_and_fetch_all(cursor, "DROP REPLICA replica_2") + expected_data = set() + mg_sleep_and_assert(expected_data, retrieve_data) + + # 10/ + interactive_mg_runner.start(MEMGRAPH_INSTANCES_DESCRIPTION, "replica_1") + 
interactive_mg_runner.start(MEMGRAPH_INSTANCES_DESCRIPTION, "replica_2") + interactive_mg_runner.start(MEMGRAPH_INSTANCES_DESCRIPTION, "replica_3") + interactive_mg_runner.start(MEMGRAPH_INSTANCES_DESCRIPTION, "replica_4") + execute_and_fetch_all(cursor, "REGISTER REPLICA replica_1 SYNC TO '127.0.0.1:10001';") + execute_and_fetch_all(cursor, "REGISTER REPLICA replica_2 SYNC TO '127.0.0.1:10002';") + execute_and_fetch_all(cursor, "REGISTER REPLICA replica_3 ASYNC TO '127.0.0.1:10003';") + execute_and_fetch_all(cursor, "REGISTER REPLICA replica_4 ASYNC TO '127.0.0.1:10004';") + + # 11/ + expected_data = { + ("replica_1", "127.0.0.1:10001", "sync", 0, 0, "ready"), + ("replica_2", "127.0.0.1:10002", "sync", 0, 0, "ready"), + ("replica_3", "127.0.0.1:10003", "async", 0, 0, "ready"), + ("replica_4", "127.0.0.1:10004", "async", 0, 0, "ready"), + } + mg_sleep_and_assert(expected_data, retrieve_data) + + # 12/ + execute_and_fetch_all(cursor, "DROP REPLICA replica_1") + execute_and_fetch_all(cursor, "DROP REPLICA replica_2") + execute_and_fetch_all(cursor, "DROP REPLICA replica_3") + execute_and_fetch_all(cursor, "DROP REPLICA replica_4") + expected_data = set() + mg_sleep_and_assert(expected_data, retrieve_data) + + def test_basic_recovery(connection): # Goal of this test is to check the recovery of main. # 0/ We start all replicas manually: we want to be able to kill them ourselves without relying on external tooling to kill processes. @@ -630,10 +767,26 @@ def test_async_replication_when_main_is_killed(): ) # 2/ - for index in range(50): + # First make sure that anything has been replicated + for index in range(0, 5): + interactive_mg_runner.MEMGRAPH_INSTANCES["main"].query(f"CREATE (p:Number {{name:{index}}})") + expected_data = [("async_replica", "127.0.0.1:10001", "async", "ready")] + + def retrieve_data(): + replicas = interactive_mg_runner.MEMGRAPH_INSTANCES["main"].query("SHOW REPLICAS;") + return [ + (replica_name, ip, mode, status) + for replica_name, ip, mode, timestamp, timestamp_behind_main, status in replicas + ] + + actual_data = mg_sleep_and_assert(expected_data, retrieve_data) + assert actual_data == expected_data + + for index in range(5, 50): interactive_mg_runner.MEMGRAPH_INSTANCES["main"].query(f"CREATE (p:Number {{name:{index}}})") if random.randint(0, 100) > 95: main_killed = f"Main was killed at index={index}" + print(main_killed) interactive_mg_runner.kill(CONFIGURATION, "main") break diff --git a/tests/integration/telemetry/client.cpp b/tests/integration/telemetry/client.cpp index 558e0a6bc..34e1c2a67 100644 --- a/tests/integration/telemetry/client.cpp +++ b/tests/integration/telemetry/client.cpp @@ -41,7 +41,7 @@ int main(int argc, char **argv) { memgraph::storage::UpdatePaths(db_config, data_directory); memgraph::replication::ReplicationState repl_state(ReplicationStateRootPath(db_config)); - memgraph::dbms::DbmsHandler dbms_handler(db_config, repl_state + memgraph::dbms::DbmsHandler dbms_handler(db_config #ifdef MG_ENTERPRISE , &auth_, false, false diff --git a/tests/unit/dbms_handler.cpp b/tests/unit/dbms_handler.cpp index 75efddefe..a811e4159 100644 --- a/tests/unit/dbms_handler.cpp +++ b/tests/unit/dbms_handler.cpp @@ -52,18 +52,15 @@ class TestEnvironment : public ::testing::Environment { auth = std::make_unique>( storage_directory / "auth"); - repl_state_.emplace(memgraph::storage::ReplicationStateRootPath(storage_conf)); - ptr_ = std::make_unique(storage_conf, *repl_state_, auth.get(), false, true); + ptr_ = std::make_unique(storage_conf, auth.get(), false, true); 
} void TearDown() override { ptr_.reset(); auth.reset(); - repl_state_.reset(); } static std::unique_ptr ptr_; - std::optional repl_state_; }; std::unique_ptr TestEnvironment::ptr_ = nullptr; diff --git a/tests/unit/dbms_handler_community.cpp b/tests/unit/dbms_handler_community.cpp index efce2854d..3848cd347 100644 --- a/tests/unit/dbms_handler_community.cpp +++ b/tests/unit/dbms_handler_community.cpp @@ -52,18 +52,15 @@ class TestEnvironment : public ::testing::Environment { auth = std::make_unique>( storage_directory / "auth"); - repl_state_.emplace(memgraph::storage::ReplicationStateRootPath(storage_conf)); - ptr_ = std::make_unique(storage_conf, *repl_state_); + ptr_ = std::make_unique(storage_conf); } void TearDown() override { ptr_.reset(); auth.reset(); - repl_state_.reset(); } static std::unique_ptr ptr_; - std::optional repl_state_; }; std::unique_ptr TestEnvironment::ptr_ = nullptr; diff --git a/tests/unit/storage_v2_replication.cpp b/tests/unit/storage_v2_replication.cpp index 261b2ccf0..f07130c4a 100644 --- a/tests/unit/storage_v2_replication.cpp +++ b/tests/unit/storage_v2_replication.cpp @@ -102,8 +102,7 @@ class ReplicationTest : public ::testing::Test { struct MinMemgraph { MinMemgraph(const memgraph::storage::Config &conf) - : repl_state{ReplicationStateRootPath(conf)}, - dbms{conf, repl_state + : dbms{conf #ifdef MG_ENTERPRISE , reinterpret_cast< @@ -111,11 +110,12 @@ struct MinMemgraph { true, false #endif }, + repl_state{dbms.ReplicationState()}, db{*dbms.Get().get()}, - repl_handler(repl_state, dbms) { + repl_handler(dbms) { } - memgraph::replication::ReplicationState repl_state; memgraph::dbms::DbmsHandler dbms; + memgraph::replication::ReplicationState &repl_state; memgraph::dbms::Database &db; ReplicationHandler repl_handler; }; @@ -130,14 +130,13 @@ TEST_F(ReplicationTest, BasicSynchronousReplicationTest) { .port = ports[0], }); - ASSERT_FALSE(main.repl_handler - .RegisterReplica(ReplicationClientConfig{ - .name = "REPLICA", - .mode = ReplicationMode::SYNC, - .ip_address = local_host, - .port = ports[0], - }) - .HasError()); + const auto ® = main.repl_handler.RegisterReplica(ReplicationClientConfig{ + .name = "REPLICA", + .mode = ReplicationMode::SYNC, + .ip_address = local_host, + .port = ports[0], + }); + ASSERT_FALSE(reg.HasError()) << (int)reg.GetError(); // vertex create // vertex add label @@ -966,14 +965,14 @@ TEST_F(ReplicationTest, RestoringReplicationAtStartupAfterDroppingReplica) { .ip_address = local_host, .port = ports[0], }); - ASSERT_FALSE(res.HasError()); + ASSERT_FALSE(res.HasError()) << (int)res.GetError(); res = main->repl_handler.RegisterReplica(ReplicationClientConfig{ .name = replicas[1], .mode = ReplicationMode::SYNC, .ip_address = local_host, .port = ports[1], }); - ASSERT_FALSE(res.HasError()); + ASSERT_FALSE(res.HasError()) << (int)res.GetError(); auto replica_infos = main->db.storage()->ReplicasInfo();