diff --git a/release/get_version.py b/release/get_version.py index cfce88475..a8539fab4 100755 --- a/release/get_version.py +++ b/release/get_version.py @@ -104,7 +104,9 @@ def retry(retry_limit, timeout=100): except Exception: time.sleep(timeout) return func(*args, **kwargs) + return wrapper + return inner_func @@ -200,19 +202,19 @@ if args.version: try: current_branch = get_output("git", "rev-parse", "--abbrev-ref", "HEAD") if current_branch != "master": - branches = get_output("git", "branch") - if "master" in branches: + branches = get_output("git", "branch", "-r", "--list", "origin/master") + if "origin/master" in branches: # If master is present locally, the fetch is allowed to fail # because this script will still be able to compare against the # master branch. try: - get_output("git", "fetch", "origin", "master:master") + get_output("git", "fetch", "origin", "master") except Exception: pass else: # If master is not present locally, the fetch command has to # succeed because something else will fail otherwise. - get_output("git", "fetch", "origin", "master:master") + get_output("git", "fetch", "origin", "master") except Exception: print("Fatal error while ensuring local master branch.") sys.exit(1) @@ -232,7 +234,7 @@ for branch in branches: match = branch_regex.match(branch) if match is not None: version = tuple(map(int, match.group(1).split("."))) - master_branch_merge = get_output("git", "merge-base", "master", branch) + master_branch_merge = get_output("git", "merge-base", "origin/master", branch) versions.append((version, branch, master_branch_merge)) versions.sort(reverse=True) @@ -243,7 +245,7 @@ current_version = None for version in versions: version_tuple, branch, master_branch_merge = version current_branch_merge = get_output("git", "merge-base", current_hash, branch) - master_current_merge = get_output("git", "merge-base", current_hash, "master") + master_current_merge = get_output("git", "merge-base", current_hash, "origin/master") # The first check checks whether this commit is a child of `master` and # the version branch was created before us. # The second check checks whether this commit is a child of the version diff --git a/src/dbms/CMakeLists.txt b/src/dbms/CMakeLists.txt index 8ec1e0972..f1df4985a 100644 --- a/src/dbms/CMakeLists.txt +++ b/src/dbms/CMakeLists.txt @@ -1,3 +1,3 @@ -add_library(mg-dbms STATIC database.cpp replication_handler.cpp inmemory/replication_handlers.cpp) +add_library(mg-dbms STATIC dbms_handler.cpp database.cpp replication_handler.cpp replication_client.cpp inmemory/replication_handlers.cpp) target_link_libraries(mg-dbms mg-utils mg-storage-v2 mg-query) diff --git a/src/dbms/constants.hpp b/src/dbms/constants.hpp index 3ca61056b..e7ea9987b 100644 --- a/src/dbms/constants.hpp +++ b/src/dbms/constants.hpp @@ -15,4 +15,10 @@ namespace memgraph::dbms { constexpr static const char *kDefaultDB = "memgraph"; //!< Name of the default database +#ifdef MG_EXPERIMENTAL_REPLICATION_MULTITENANCY +constexpr bool allow_mt_repl = true; +#else +constexpr bool allow_mt_repl = false; +#endif + } // namespace memgraph::dbms diff --git a/src/dbms/database.cpp b/src/dbms/database.cpp index 411e282e8..74ee13892 100644 --- a/src/dbms/database.cpp +++ b/src/dbms/database.cpp @@ -21,7 +21,7 @@ template struct memgraph::utils::Gatekeeper; namespace memgraph::dbms { -Database::Database(storage::Config config, const replication::ReplicationState &repl_state) +Database::Database(storage::Config config, replication::ReplicationState &repl_state) : trigger_store_(config.durability.storage_directory / "triggers"), streams_{config.durability.storage_directory / "streams"}, plan_cache_{FLAGS_query_plan_cache_max_size}, diff --git a/src/dbms/database.hpp b/src/dbms/database.hpp index 457aa1c1d..416ff76bc 100644 --- a/src/dbms/database.hpp +++ b/src/dbms/database.hpp @@ -48,7 +48,7 @@ class Database { * * @param config storage configuration */ - explicit Database(storage::Config config, const replication::ReplicationState &repl_state); + explicit Database(storage::Config config, replication::ReplicationState &repl_state); /** * @brief Returns the raw storage pointer. diff --git a/src/dbms/database_handler.hpp b/src/dbms/database_handler.hpp index a6b3b563b..617e614c3 100644 --- a/src/dbms/database_handler.hpp +++ b/src/dbms/database_handler.hpp @@ -51,8 +51,7 @@ class DatabaseHandler : public Handler { * @param config Storage configuration * @return HandlerT::NewResult */ - HandlerT::NewResult New(std::string_view name, storage::Config config, - const replication::ReplicationState &repl_state) { + HandlerT::NewResult New(std::string_view name, storage::Config config, replication::ReplicationState &repl_state) { // Control that no one is using the same data directory if (std::any_of(begin(), end(), [&](auto &elem) { auto db_acc = elem.second.access(); diff --git a/src/dbms/dbms_handler.cpp b/src/dbms/dbms_handler.cpp new file mode 100644 index 000000000..0af9364bf --- /dev/null +++ b/src/dbms/dbms_handler.cpp @@ -0,0 +1,75 @@ +// Copyright 2023 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +#include "dbms/dbms_handler.hpp" + +namespace memgraph::dbms { +#ifdef MG_ENTERPRISE +DbmsHandler::DbmsHandler( + storage::Config config, + memgraph::utils::Synchronized *auth, + bool recovery_on_startup, bool delete_on_drop) + : default_config_{std::move(config)}, + delete_on_drop_(delete_on_drop), + repl_state_{ReplicationStateRootPath(default_config_)} { + // TODO: Decouple storage config from dbms config + // TODO: Save individual db configs inside the kvstore and restore from there + storage::UpdatePaths(default_config_, default_config_.durability.storage_directory / "databases"); + const auto &db_dir = default_config_.durability.storage_directory; + const auto durability_dir = db_dir / ".durability"; + utils::EnsureDirOrDie(db_dir); + utils::EnsureDirOrDie(durability_dir); + durability_ = std::make_unique(durability_dir); + + // Generate the default database + MG_ASSERT(!NewDefault_().HasError(), "Failed while creating the default DB."); + + // Recover previous databases + if (recovery_on_startup) { + for (const auto &[name, _] : *durability_) { + if (name == kDefaultDB) continue; // Already set + spdlog::info("Restoring database {}.", name); + MG_ASSERT(!New_(name).HasError(), "Failed while creating database {}.", name); + spdlog::info("Database {} restored.", name); + } + } else { // Clear databases from the durability list and auth + auto locked_auth = auth->Lock(); + for (const auto &[name, _] : *durability_) { + if (name == kDefaultDB) continue; + locked_auth->DeleteDatabase(name); + durability_->Delete(name); + } + } + + // Startup replication state (if recovered at startup) + auto replica = [this](replication::RoleReplicaData const &data) { + // Register handlers + InMemoryReplicationHandlers::Register(this, *data.server); + if (!data.server->Start()) { + spdlog::error("Unable to start the replication server."); + return false; + } + return true; + }; + // Replication frequent check start + auto main = [this](replication::RoleMainData &data) { + for (auto &client : data.registered_replicas_) { + StartReplicaClient(*this, client); + } + return true; + }; + // Startup proccess for main/replica + MG_ASSERT(std::visit(memgraph::utils::Overloaded{replica, main}, repl_state_.ReplicationData()), + "Replica recovery failure!"); +} +#endif + +} // namespace memgraph::dbms diff --git a/src/dbms/dbms_handler.hpp b/src/dbms/dbms_handler.hpp index 27ab963d4..3151398ab 100644 --- a/src/dbms/dbms_handler.hpp +++ b/src/dbms/dbms_handler.hpp @@ -26,9 +26,11 @@ #include "auth/auth.hpp" #include "constants.hpp" #include "dbms/database.hpp" +#include "dbms/inmemory/replication_handlers.hpp" #ifdef MG_ENTERPRISE #include "dbms/database_handler.hpp" #endif +#include "dbms/replication_client.hpp" #include "global.hpp" #include "query/config.hpp" #include "query/interpreter_context.hpp" @@ -102,52 +104,22 @@ class DbmsHandler { * @param recovery_on_startup restore databases (and its content) and authentication data * @param delete_on_drop when dropping delete any associated directories on disk */ - DbmsHandler(storage::Config config, const replication::ReplicationState &repl_state, auto *auth, - bool recovery_on_startup, bool delete_on_drop) - : lock_{utils::RWLock::Priority::READ}, - default_config_{std::move(config)}, - repl_state_(repl_state), - delete_on_drop_(delete_on_drop) { - // TODO: Decouple storage config from dbms config - // TODO: Save individual db configs inside the kvstore and restore from there - storage::UpdatePaths(default_config_, default_config_.durability.storage_directory / "databases"); - const auto &db_dir = default_config_.durability.storage_directory; - const auto durability_dir = db_dir / ".durability"; - utils::EnsureDirOrDie(db_dir); - utils::EnsureDirOrDie(durability_dir); - durability_ = std::make_unique(durability_dir); - - // Generate the default database - MG_ASSERT(!NewDefault_().HasError(), "Failed while creating the default DB."); - // Recover previous databases - if (recovery_on_startup) { - for (const auto &[name, _] : *durability_) { - if (name == kDefaultDB) continue; // Already set - spdlog::info("Restoring database {}.", name); - MG_ASSERT(!New_(name).HasError(), "Failed while creating database {}.", name); - spdlog::info("Database {} restored.", name); - } - } else { // Clear databases from the durability list and auth - auto locked_auth = auth->Lock(); - for (const auto &[name, _] : *durability_) { - if (name == kDefaultDB) continue; - locked_auth->DeleteDatabase(name); - durability_->Delete(name); - } - } - } + DbmsHandler(storage::Config config, + memgraph::utils::Synchronized *auth, + bool recovery_on_startup, bool delete_on_drop); // TODO If more arguments are added use a config strut #else /** * @brief Initialize the handler. A single database is supported in community edition. * * @param configs storage configuration */ - DbmsHandler(storage::Config config, const replication::ReplicationState &repl_state) - : db_gatekeeper_{[&] { + DbmsHandler(storage::Config config) + : repl_state_{ReplicationStateRootPath(config)}, + db_gatekeeper_{[&] { config.name = kDefaultDB; return std::move(config); }(), - repl_state} {} + repl_state_} {} #endif #ifdef MG_ENTERPRISE @@ -248,6 +220,12 @@ class DbmsHandler { #endif } + replication::ReplicationState &ReplicationState() { return repl_state_; } + replication::ReplicationState const &ReplicationState() const { return repl_state_; } + + bool IsMain() const { return repl_state_.IsMain(); } + bool IsReplica() const { return repl_state_.IsReplica(); } + /** * @brief Return the statistics all databases. * @@ -536,14 +514,15 @@ class DbmsHandler { throw UnknownDatabaseException("Tried to retrieve an unknown database \"{}\".", name); } - mutable LockT lock_; //!< protective lock - storage::Config default_config_; //!< Storage configuration used when creating new databases - const replication::ReplicationState &repl_state_; //!< Global replication state - DatabaseHandler db_handler_; //!< multi-tenancy storage handler - std::unique_ptr durability_; //!< list of active dbs (pointer so we can postpone its creation) - bool delete_on_drop_; //!< Flag defining if dropping storage also deletes its directory - std::set> defunct_dbs_; //!< Databases that are in an unknown state due to various failures -#else + mutable LockT lock_{utils::RWLock::Priority::READ}; //!< protective lock + storage::Config default_config_; //!< Storage configuration used when creating new databases + DatabaseHandler db_handler_; //!< multi-tenancy storage handler + std::unique_ptr durability_; //!< list of active dbs (pointer so we can postpone its creation) + bool delete_on_drop_; //!< Flag defining if dropping storage also deletes its directory + std::set defunct_dbs_; //!< Databases that are in an unknown state due to various failures +#endif + replication::ReplicationState repl_state_; //!< Global replication state +#ifndef MG_ENTERPRISE mutable utils::Gatekeeper db_gatekeeper_; //!< Single databases gatekeeper #endif }; diff --git a/src/dbms/inmemory/storage_helper.hpp b/src/dbms/inmemory/storage_helper.hpp index 347c16928..1cd9f9f4e 100644 --- a/src/dbms/inmemory/storage_helper.hpp +++ b/src/dbms/inmemory/storage_helper.hpp @@ -22,14 +22,8 @@ namespace memgraph::dbms { -#ifdef MG_EXPERIMENTAL_REPLICATION_MULTITENANCY -constexpr bool allow_mt_repl = true; -#else -constexpr bool allow_mt_repl = false; -#endif - -inline std::unique_ptr CreateInMemoryStorage( - storage::Config config, const ::memgraph::replication::ReplicationState &repl_state) { +inline std::unique_ptr CreateInMemoryStorage(storage::Config config, + ::memgraph::replication::ReplicationState &repl_state) { const auto wal_mode = config.durability.snapshot_wal_mode; const auto name = config.name; auto storage = std::make_unique(std::move(config)); diff --git a/src/dbms/replication_client.cpp b/src/dbms/replication_client.cpp new file mode 100644 index 000000000..bfa4c622f --- /dev/null +++ b/src/dbms/replication_client.cpp @@ -0,0 +1,34 @@ +// Copyright 2023 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +#include "dbms/replication_client.hpp" + +namespace memgraph::dbms { + +void StartReplicaClient(DbmsHandler &dbms_handler, replication::ReplicationClient &client) { + // No client error, start instance level client + auto const &endpoint = client.rpc_client_.Endpoint(); + spdlog::trace("Replication client started at: {}:{}", endpoint.address, endpoint.port); + client.StartFrequentCheck([&dbms_handler](std::string_view name) { + // Working connection, check if any database has been left behind + dbms_handler.ForEach([name](dbms::Database *db) { + // Specific database <-> replica client + db->storage()->repl_storage_state_.WithClient(name, [&](storage::ReplicationStorageClient *client) { + if (client->State() == storage::replication::ReplicaState::MAYBE_BEHIND) { + // Database <-> replica might be behind, check and recover + client->TryCheckReplicaStateAsync(db->storage()); + } + }); + }); + }); +} + +} // namespace memgraph::dbms diff --git a/src/dbms/replication_client.hpp b/src/dbms/replication_client.hpp new file mode 100644 index 000000000..c1bac91a2 --- /dev/null +++ b/src/dbms/replication_client.hpp @@ -0,0 +1,21 @@ +// Copyright 2023 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +#pragma once + +#include "dbms/dbms_handler.hpp" +#include "replication/replication_client.hpp" + +namespace memgraph::dbms { + +void StartReplicaClient(DbmsHandler &dbms_handler, replication::ReplicationClient &client); + +} // namespace memgraph::dbms diff --git a/src/dbms/replication_handler.cpp b/src/dbms/replication_handler.cpp index cff93fd6b..2cbe2c432 100644 --- a/src/dbms/replication_handler.cpp +++ b/src/dbms/replication_handler.cpp @@ -15,6 +15,7 @@ #include "dbms/dbms_handler.hpp" #include "dbms/inmemory/replication_handlers.hpp" #include "dbms/inmemory/storage_helper.hpp" +#include "dbms/replication_client.hpp" #include "replication/state.hpp" using memgraph::replication::ReplicationClientConfig; @@ -41,6 +42,8 @@ std::string RegisterReplicaErrorToString(RegisterReplicaError error) { } } // namespace +ReplicationHandler::ReplicationHandler(DbmsHandler &dbms_handler) : dbms_handler_(dbms_handler) {} + bool ReplicationHandler::SetReplicationRoleMain() { auto const main_handler = [](RoleMainData const &) { // If we are already MAIN, we don't want to change anything @@ -56,42 +59,49 @@ bool ReplicationHandler::SetReplicationRoleMain() { // STEP 2) Change to MAIN // TODO: restore replication servers if false? - if (!repl_state_.SetReplicationRoleMain()) { + if (!dbms_handler_.ReplicationState().SetReplicationRoleMain()) { // TODO: Handle recovery on failure??? return false; } // STEP 3) We are now MAIN, update storage local epoch + const auto &epoch = + std::get(std::as_const(dbms_handler_.ReplicationState()).ReplicationData()).epoch_; dbms_handler_.ForEach([&](Database *db) { auto *storage = db->storage(); - storage->repl_storage_state_.epoch_ = std::get(std::as_const(repl_state_).ReplicationData()).epoch_; + storage->repl_storage_state_.epoch_ = epoch; }); return true; }; // TODO: under lock - return std::visit(utils::Overloaded{main_handler, replica_handler}, repl_state_.ReplicationData()); + return std::visit(utils::Overloaded{main_handler, replica_handler}, + dbms_handler_.ReplicationState().ReplicationData()); } bool ReplicationHandler::SetReplicationRoleReplica(const memgraph::replication::ReplicationServerConfig &config) { // We don't want to restart the server if we're already a REPLICA - if (repl_state_.IsReplica()) { + if (dbms_handler_.ReplicationState().IsReplica()) { return false; } - // Remove registered replicas + // TODO StorageState needs to be synched. Could have a dangling reference if someone adds a database as we are + // deleting the replica. + // Remove database specific clients dbms_handler_.ForEach([&](Database *db) { auto *storage = db->storage(); storage->repl_storage_state_.replication_clients_.WithLock([](auto &clients) { clients.clear(); }); }); + // Remove instance level clients + std::get(dbms_handler_.ReplicationState().ReplicationData()).registered_replicas_.clear(); // Creates the server - repl_state_.SetReplicationRoleReplica(config); + dbms_handler_.ReplicationState().SetReplicationRoleReplica(config); // Start const auto success = - std::visit(utils::Overloaded{[](auto) { + std::visit(utils::Overloaded{[](RoleMainData const &) { // ASSERT return false; }, @@ -104,36 +114,37 @@ bool ReplicationHandler::SetReplicationRoleReplica(const memgraph::replication:: } return true; }}, - repl_state_.ReplicationData()); + dbms_handler_.ReplicationState().ReplicationData()); // TODO Handle error (restore to main?) return success; } auto ReplicationHandler::RegisterReplica(const memgraph::replication::ReplicationClientConfig &config) -> memgraph::utils::BasicResult { - MG_ASSERT(repl_state_.IsMain(), "Only main instance can register a replica!"); + MG_ASSERT(dbms_handler_.ReplicationState().IsMain(), "Only main instance can register a replica!"); - auto res = repl_state_.RegisterReplica(config); - switch (res) { - case memgraph::replication::RegisterReplicaError::NOT_MAIN: - MG_ASSERT(false, "Only main instance can register a replica!"); - return {}; - case memgraph::replication::RegisterReplicaError::NAME_EXISTS: - return memgraph::dbms::RegisterReplicaError::NAME_EXISTS; - case memgraph::replication::RegisterReplicaError::END_POINT_EXISTS: - return memgraph::dbms::RegisterReplicaError::END_POINT_EXISTS; - case memgraph::replication::RegisterReplicaError::COULD_NOT_BE_PERSISTED: - return memgraph::dbms::RegisterReplicaError::COULD_NOT_BE_PERSISTED; - case memgraph::replication::RegisterReplicaError::SUCCESS: - break; - } - - bool all_clients_good = true; + auto instance_client = dbms_handler_.ReplicationState().RegisterReplica(config); + if (instance_client.HasError()) switch (instance_client.GetError()) { + case memgraph::replication::RegisterReplicaError::NOT_MAIN: + MG_ASSERT(false, "Only main instance can register a replica!"); + return {}; + case memgraph::replication::RegisterReplicaError::NAME_EXISTS: + return memgraph::dbms::RegisterReplicaError::NAME_EXISTS; + case memgraph::replication::RegisterReplicaError::END_POINT_EXISTS: + return memgraph::dbms::RegisterReplicaError::END_POINT_EXISTS; + case memgraph::replication::RegisterReplicaError::COULD_NOT_BE_PERSISTED: + return memgraph::dbms::RegisterReplicaError::COULD_NOT_BE_PERSISTED; + case memgraph::replication::RegisterReplicaError::SUCCESS: + break; + } if (!allow_mt_repl && dbms_handler_.All().size() > 1) { spdlog::warn("Multi-tenant replication is currently not supported!"); } + bool all_clients_good = true; + + // Add database specific clients (NOTE Currently all databases are connected to each replica) dbms_handler_.ForEach([&](Database *db) { auto *storage = db->storage(); if (!allow_mt_repl && storage->id() != kDefaultDB) { @@ -143,18 +154,29 @@ auto ReplicationHandler::RegisterReplica(const memgraph::replication::Replicatio if (storage->storage_mode_ != storage::StorageMode::IN_MEMORY_TRANSACTIONAL) return; all_clients_good &= - storage->repl_storage_state_.replication_clients_.WithLock([storage, &config](auto &clients) -> bool { - auto client = storage->CreateReplicationClient(config, &storage->repl_storage_state_.epoch_); - client->Start(); - - if (client->State() == storage::replication::ReplicaState::INVALID) { + storage->repl_storage_state_.replication_clients_.WithLock([storage, &instance_client](auto &storage_clients) { + auto client = std::make_unique(*instance_client.GetValue()); + client->Start(storage); + // After start the storage <-> replica state should be READY or RECOVERING (if correctly started) + // MAYBE_BEHIND isn't a statement of the current state, this is the default value + // Failed to start due to branching of MAIN and REPLICA + if (client->State() == storage::replication::ReplicaState::MAYBE_BEHIND) { return false; } - clients.push_back(std::move(client)); + storage_clients.push_back(std::move(client)); return true; }); }); - if (!all_clients_good) return RegisterReplicaError::CONNECTION_FAILED; // TODO: this happen to 1 or many...what to do + + // NOTE Currently if any databases fails, we revert back + if (!all_clients_good) { + spdlog::error("Failed to register all databases to the REPLICA \"{}\"", config.name); + UnregisterReplica(config.name); + return RegisterReplicaError::CONNECTION_FAILED; + } + + // No client error, start instance level client + StartReplicaClient(dbms_handler_, *instance_client.GetValue()); return {}; } @@ -163,60 +185,66 @@ auto ReplicationHandler::UnregisterReplica(std::string_view name) -> UnregisterR return UnregisterReplicaResult::NOT_MAIN; }; auto const main_handler = [this, name](RoleMainData &mainData) -> UnregisterReplicaResult { - if (!repl_state_.TryPersistUnregisterReplica(name)) { + if (!dbms_handler_.ReplicationState().TryPersistUnregisterReplica(name)) { return UnregisterReplicaResult::COULD_NOT_BE_PERSISTED; } - auto const n_unregistered = - std::erase_if(mainData.registered_replicas_, - [&](ReplicationClientConfig const ®istered_config) { return registered_config.name == name; }); - - dbms_handler_.ForEach([&](Database *db) { - db->storage()->repl_storage_state_.replication_clients_.WithLock( - [&](auto &clients) { std::erase_if(clients, [&](const auto &client) { return client->Name() == name; }); }); + // Remove database specific clients + dbms_handler_.ForEach([name](Database *db) { + db->storage()->repl_storage_state_.replication_clients_.WithLock([&name](auto &clients) { + std::erase_if(clients, [name](const auto &client) { return client->Name() == name; }); + }); }); - + // Remove instance level clients + auto const n_unregistered = + std::erase_if(mainData.registered_replicas_, [name](auto const &client) { return client.name_ == name; }); return n_unregistered != 0 ? UnregisterReplicaResult::SUCCESS : UnregisterReplicaResult::CAN_NOT_UNREGISTER; }; - return std::visit(utils::Overloaded{main_handler, replica_handler}, repl_state_.ReplicationData()); + return std::visit(utils::Overloaded{main_handler, replica_handler}, + dbms_handler_.ReplicationState().ReplicationData()); } -auto ReplicationHandler::GetRole() const -> memgraph::replication::ReplicationRole { return repl_state_.GetRole(); } +auto ReplicationHandler::GetRole() const -> memgraph::replication::ReplicationRole { + return dbms_handler_.ReplicationState().GetRole(); +} -bool ReplicationHandler::IsMain() const { return repl_state_.IsMain(); } +bool ReplicationHandler::IsMain() const { return dbms_handler_.ReplicationState().IsMain(); } -bool ReplicationHandler::IsReplica() const { return repl_state_.IsReplica(); } +bool ReplicationHandler::IsReplica() const { return dbms_handler_.ReplicationState().IsReplica(); } -void RestoreReplication(const replication::ReplicationState &repl_state, storage::Storage &storage) { +// Per storage +// NOTE Storage will connect to all replicas. Future work might change this +void RestoreReplication(replication::ReplicationState &repl_state, storage::Storage &storage) { spdlog::info("Restoring replication role."); /// MAIN - auto const recover_main = [&storage](RoleMainData const &mainData) { - for (const auto &config : mainData.registered_replicas_) { - spdlog::info("Replica {} restoration started for {}.", config.name, storage.id()); + auto const recover_main = [&storage](RoleMainData &mainData) { + // Each individual client has already been restored and started. Here we just go through each database and start its + // client + for (auto &instance_client : mainData.registered_replicas_) { + spdlog::info("Replica {} restoration started for {}.", instance_client.name_, storage.id()); - auto register_replica = [&storage](const memgraph::replication::ReplicationClientConfig &config) - -> memgraph::utils::BasicResult { - return storage.repl_storage_state_.replication_clients_.WithLock( - [&storage, &config](auto &clients) -> utils::BasicResult { - auto client = storage.CreateReplicationClient(config, &storage.repl_storage_state_.epoch_); - client->Start(); + const auto &ret = storage.repl_storage_state_.replication_clients_.WithLock( + [&](auto &storage_clients) -> utils::BasicResult { + auto client = std::make_unique(instance_client); + client->Start(&storage); + // After start the storage <-> replica state should be READY or RECOVERING (if correctly started) + // MAYBE_BEHIND isn't a statement of the current state, this is the default value + // Failed to start due to branching of MAIN and REPLICA + if (client->State() == storage::replication::ReplicaState::MAYBE_BEHIND) { + spdlog::warn("Connection failed when registering replica {}. Replica will still be registered.", + instance_client.name_); + } + storage_clients.push_back(std::move(client)); + return {}; + }); - if (client->State() == storage::replication::ReplicaState::INVALID) { - spdlog::warn("Connection failed when registering replica {}. Replica will still be registered.", - client->Name()); - } - clients.push_back(std::move(client)); - return {}; - }); - }; - - auto ret = register_replica(config); if (ret.HasError()) { MG_ASSERT(RegisterReplicaError::CONNECTION_FAILED != ret.GetError()); - LOG_FATAL("Failure when restoring replica {}: {}.", config.name, RegisterReplicaErrorToString(ret.GetError())); + LOG_FATAL("Failure when restoring replica {}: {}.", instance_client.name_, + RegisterReplicaErrorToString(ret.GetError())); } - spdlog::info("Replica {} restored for {}.", config.name, storage.id()); + spdlog::info("Replica {} restored for {}.", instance_client.name_, storage.id()); } spdlog::info("Replication role restored to MAIN."); }; @@ -229,6 +257,6 @@ void RestoreReplication(const replication::ReplicationState &repl_state, storage recover_main, recover_replica, }, - std::as_const(repl_state).ReplicationData()); + repl_state.ReplicationData()); } } // namespace memgraph::dbms diff --git a/src/dbms/replication_handler.hpp b/src/dbms/replication_handler.hpp index e50c47969..dc95407b1 100644 --- a/src/dbms/replication_handler.hpp +++ b/src/dbms/replication_handler.hpp @@ -36,8 +36,7 @@ enum class UnregisterReplicaResult : uint8_t { /// A handler type that keep in sync current ReplicationState and the MAIN/REPLICA-ness of Storage /// TODO: extend to do multiple storages struct ReplicationHandler { - ReplicationHandler(memgraph::replication::ReplicationState &replState, DbmsHandler &dbms_handler) - : repl_state_(replState), dbms_handler_(dbms_handler) {} + explicit ReplicationHandler(DbmsHandler &dbms_handler); // as REPLICA, become MAIN bool SetReplicationRoleMain(); @@ -58,12 +57,11 @@ struct ReplicationHandler { bool IsReplica() const; private: - memgraph::replication::ReplicationState &repl_state_; DbmsHandler &dbms_handler_; }; /// A handler type that keep in sync current ReplicationState and the MAIN/REPLICA-ness of Storage /// TODO: extend to do multiple storages -void RestoreReplication(const replication::ReplicationState &repl_state, storage::Storage &storage); +void RestoreReplication(replication::ReplicationState &repl_state, storage::Storage &storage); } // namespace memgraph::dbms diff --git a/src/memgraph.cpp b/src/memgraph.cpp index 983dd61f9..ce43f634d 100644 --- a/src/memgraph.cpp +++ b/src/memgraph.cpp @@ -368,34 +368,17 @@ int main(int argc, char **argv) { std::unique_ptr auth_checker; auth_glue(&auth_, auth_handler, auth_checker); - memgraph::replication::ReplicationState repl_state(ReplicationStateRootPath(db_config)); - - memgraph::dbms::DbmsHandler dbms_handler(db_config, repl_state + memgraph::dbms::DbmsHandler dbms_handler(db_config #ifdef MG_ENTERPRISE , &auth_, FLAGS_data_recovery_on_startup, FLAGS_storage_delete_on_drop #endif ); auto db_acc = dbms_handler.Get(); - memgraph::query::InterpreterContext interpreter_context_(interp_config, &dbms_handler, &repl_state, - auth_handler.get(), auth_checker.get()); - MG_ASSERT(db_acc, "Failed to access the main database"); - // TODO: Move it somewhere better - // Startup replication state (if recovered at startup) - MG_ASSERT(std::visit(memgraph::utils::Overloaded{[](memgraph::replication::RoleMainData const &) { return true; }, - [&](memgraph::replication::RoleReplicaData const &data) { - // Register handlers - memgraph::dbms::InMemoryReplicationHandlers::Register( - &dbms_handler, *data.server); - if (!data.server->Start()) { - spdlog::error("Unable to start the replication server."); - return false; - } - return true; - }}, - repl_state.ReplicationData()), - "Replica recovery failure!"); + memgraph::query::InterpreterContext interpreter_context_( + interp_config, &dbms_handler, &dbms_handler.ReplicationState(), auth_handler.get(), auth_checker.get()); + MG_ASSERT(db_acc, "Failed to access the main database"); memgraph::query::procedure::gModuleRegistry.SetModulesDirectory(memgraph::flags::ParseQueryModulesDirectory(), FLAGS_data_directory); diff --git a/src/query/frontend/ast/ast.hpp b/src/query/frontend/ast/ast.hpp index dc11c3887..d63736c85 100644 --- a/src/query/frontend/ast/ast.hpp +++ b/src/query/frontend/ast/ast.hpp @@ -3025,7 +3025,7 @@ class ReplicationQuery : public memgraph::query::Query { enum class SyncMode { SYNC, ASYNC }; - enum class ReplicaState { READY, REPLICATING, RECOVERY, INVALID }; + enum class ReplicaState { READY, REPLICATING, RECOVERY, MAYBE_BEHIND }; ReplicationQuery() = default; diff --git a/src/query/interpreter.cpp b/src/query/interpreter.cpp index 354f13dc3..d957d4c2e 100644 --- a/src/query/interpreter.cpp +++ b/src/query/interpreter.cpp @@ -274,8 +274,7 @@ inline auto convertToReplicationMode(const ReplicationQuery::SyncMode &sync_mode class ReplQueryHandler final : public query::ReplicationQueryHandler { public: - explicit ReplQueryHandler(dbms::DbmsHandler *dbms_handler, memgraph::replication::ReplicationState *repl_state) - : dbms_handler_(dbms_handler), handler_{*repl_state, *dbms_handler} {} + explicit ReplQueryHandler(dbms::DbmsHandler *dbms_handler) : dbms_handler_(dbms_handler), handler_{*dbms_handler} {} /// @throw QueryRuntimeException if an error ocurred. void SetReplicationRole(ReplicationQuery::ReplicationRole replication_role, std::optional port) override { @@ -404,8 +403,8 @@ class ReplQueryHandler final : public query::ReplicationQueryHandler { case storage::replication::ReplicaState::RECOVERY: replica.state = ReplicationQuery::ReplicaState::RECOVERY; break; - case storage::replication::ReplicaState::INVALID: - replica.state = ReplicationQuery::ReplicaState::INVALID; + case storage::replication::ReplicaState::MAYBE_BEHIND: + replica.state = ReplicationQuery::ReplicaState::MAYBE_BEHIND; break; } @@ -713,8 +712,7 @@ Callback HandleAuthQuery(AuthQuery *auth_query, InterpreterContext *interpreter_ Callback HandleReplicationQuery(ReplicationQuery *repl_query, const Parameters ¶meters, dbms::DbmsHandler *dbms_handler, const query::InterpreterConfig &config, - std::vector *notifications, - memgraph::replication::ReplicationState *repl_state) { + std::vector *notifications) { // TODO: MemoryResource for EvaluationContext, it should probably be passed as // the argument to Callback. EvaluationContext evaluation_context; @@ -734,8 +732,7 @@ Callback HandleReplicationQuery(ReplicationQuery *repl_query, const Parameters & notifications->emplace_back(SeverityLevel::WARNING, NotificationCode::REPLICA_PORT_WARNING, "Be careful the replication port must be different from the memgraph port!"); } - callback.fn = [handler = ReplQueryHandler{dbms_handler, repl_state}, role = repl_query->role_, - maybe_port]() mutable { + callback.fn = [handler = ReplQueryHandler{dbms_handler}, role = repl_query->role_, maybe_port]() mutable { handler.SetReplicationRole(role, maybe_port); return std::vector>(); }; @@ -747,7 +744,7 @@ Callback HandleReplicationQuery(ReplicationQuery *repl_query, const Parameters & } case ReplicationQuery::Action::SHOW_REPLICATION_ROLE: { callback.header = {"replication role"}; - callback.fn = [handler = ReplQueryHandler{dbms_handler, repl_state}] { + callback.fn = [handler = ReplQueryHandler{dbms_handler}] { auto mode = handler.ShowReplicationRole(); switch (mode) { case ReplicationQuery::ReplicationRole::MAIN: { @@ -766,7 +763,7 @@ Callback HandleReplicationQuery(ReplicationQuery *repl_query, const Parameters & auto socket_address = repl_query->socket_address_->Accept(evaluator); const auto replica_check_frequency = config.replication_replica_check_frequency; - callback.fn = [handler = ReplQueryHandler{dbms_handler, repl_state}, name, socket_address, sync_mode, + callback.fn = [handler = ReplQueryHandler{dbms_handler}, name, socket_address, sync_mode, replica_check_frequency]() mutable { handler.RegisterReplica(name, std::string(socket_address.ValueString()), sync_mode, replica_check_frequency); return std::vector>(); @@ -777,7 +774,7 @@ Callback HandleReplicationQuery(ReplicationQuery *repl_query, const Parameters & } case ReplicationQuery::Action::DROP_REPLICA: { const auto &name = repl_query->replica_name_; - callback.fn = [handler = ReplQueryHandler{dbms_handler, repl_state}, name]() mutable { + callback.fn = [handler = ReplQueryHandler{dbms_handler}, name]() mutable { handler.DropReplica(name); return std::vector>(); }; @@ -789,7 +786,7 @@ Callback HandleReplicationQuery(ReplicationQuery *repl_query, const Parameters & callback.header = { "name", "socket_address", "sync_mode", "current_timestamp_of_replica", "number_of_timestamp_behind_master", "state"}; - callback.fn = [handler = ReplQueryHandler{dbms_handler, repl_state}, replica_nfields = callback.header.size()] { + callback.fn = [handler = ReplQueryHandler{dbms_handler}, replica_nfields = callback.header.size()] { const auto &replicas = handler.ShowReplicas(); auto typed_replicas = std::vector>{}; typed_replicas.reserve(replicas.size()); @@ -822,7 +819,7 @@ Callback HandleReplicationQuery(ReplicationQuery *repl_query, const Parameters & case ReplicationQuery::ReplicaState::RECOVERY: typed_replica.emplace_back("recovery"); break; - case ReplicationQuery::ReplicaState::INVALID: + case ReplicationQuery::ReplicaState::MAYBE_BEHIND: typed_replica.emplace_back("invalid"); break; } @@ -2263,15 +2260,14 @@ PreparedQuery PrepareAuthQuery(ParsedQuery parsed_query, bool in_explicit_transa PreparedQuery PrepareReplicationQuery(ParsedQuery parsed_query, bool in_explicit_transaction, std::vector *notifications, dbms::DbmsHandler &dbms_handler, - const InterpreterConfig &config, - memgraph::replication::ReplicationState *repl_state) { + const InterpreterConfig &config) { if (in_explicit_transaction) { throw ReplicationModificationInMulticommandTxException(); } auto *replication_query = utils::Downcast(parsed_query.query); - auto callback = HandleReplicationQuery(replication_query, parsed_query.parameters, &dbms_handler, config, - notifications, repl_state); + auto callback = + HandleReplicationQuery(replication_query, parsed_query.parameters, &dbms_handler, config, notifications); return PreparedQuery{callback.header, std::move(parsed_query.required_privileges), [callback_fn = std::move(callback.fn), pull_plan = std::shared_ptr{nullptr}]( @@ -3349,8 +3345,7 @@ PreparedQuery PrepareConstraintQuery(ParsedQuery parsed_query, bool in_explicit_ PreparedQuery PrepareMultiDatabaseQuery(ParsedQuery parsed_query, CurrentDB ¤t_db, InterpreterContext *interpreter_context, - std::optional> on_change_cb, - memgraph::replication::ReplicationState *repl_state) { + std::optional> on_change_cb) { #ifdef MG_ENTERPRISE if (!license::global_license_checker.IsEnterpriseValidFast()) { throw QueryException("Trying to use enterprise feature without a valid license."); @@ -3361,9 +3356,11 @@ PreparedQuery PrepareMultiDatabaseQuery(ParsedQuery parsed_query, CurrentDB &cur auto *query = utils::Downcast(parsed_query.query); auto *db_handler = interpreter_context->dbms_handler; + const bool is_replica = interpreter_context->repl_state->IsReplica(); + switch (query->action_) { case MultiDatabaseQuery::Action::CREATE: - if (repl_state->IsReplica()) { + if (is_replica) { throw QueryException("Query forbidden on the replica!"); } return PreparedQuery{ @@ -3408,12 +3405,12 @@ PreparedQuery PrepareMultiDatabaseQuery(ParsedQuery parsed_query, CurrentDB &cur if (current_db.in_explicit_db_) { throw QueryException("Database switching is prohibited if session explicitly defines the used database"); } - if (!dbms::allow_mt_repl && repl_state->IsReplica()) { + if (!dbms::allow_mt_repl && is_replica) { throw QueryException("Query forbidden on the replica!"); } return PreparedQuery{{"STATUS"}, std::move(parsed_query.required_privileges), - [db_name = query->db_name_, db_handler, ¤t_db, on_change_cb]( + [db_name = query->db_name_, db_handler, ¤t_db, on_change = std::move(on_change_cb)]( AnyStream *stream, std::optional n) -> std::optional { std::vector> status; std::string res; @@ -3423,7 +3420,7 @@ PreparedQuery PrepareMultiDatabaseQuery(ParsedQuery parsed_query, CurrentDB &cur res = "Already using " + db_name; } else { auto tmp = db_handler->Get(db_name); - if (on_change_cb) (*on_change_cb)(db_name); // Will trow if cb fails + if (on_change) (*on_change)(db_name); // Will trow if cb fails current_db.SetCurrentDB(std::move(tmp), false); res = "Using " + db_name; } @@ -3442,7 +3439,7 @@ PreparedQuery PrepareMultiDatabaseQuery(ParsedQuery parsed_query, CurrentDB &cur query->db_name_}; case MultiDatabaseQuery::Action::DROP: - if (repl_state->IsReplica()) { + if (is_replica) { throw QueryException("Query forbidden on the replica!"); } return PreparedQuery{ @@ -3765,9 +3762,9 @@ Interpreter::PrepareResult Interpreter::Prepare(const std::string &query_string, &query_execution->notifications, current_db_); } else if (utils::Downcast(parsed_query.query)) { /// TODO: make replication DB agnostic - prepared_query = PrepareReplicationQuery(std::move(parsed_query), in_explicit_transaction_, - &query_execution->notifications, *interpreter_context_->dbms_handler, - interpreter_context_->config, interpreter_context_->repl_state); + prepared_query = + PrepareReplicationQuery(std::move(parsed_query), in_explicit_transaction_, &query_execution->notifications, + *interpreter_context_->dbms_handler, interpreter_context_->config); } else if (utils::Downcast(parsed_query.query)) { prepared_query = PrepareLockPathQuery(std::move(parsed_query), in_explicit_transaction_, current_db_); } else if (utils::Downcast(parsed_query.query)) { @@ -3807,8 +3804,8 @@ Interpreter::PrepareResult Interpreter::Prepare(const std::string &query_string, throw MultiDatabaseQueryInMulticommandTxException(); } /// SYSTEM (Replication) + INTERPRETER - prepared_query = PrepareMultiDatabaseQuery(std::move(parsed_query), current_db_, interpreter_context_, on_change_, - interpreter_context_->repl_state); + prepared_query = + PrepareMultiDatabaseQuery(std::move(parsed_query), current_db_, interpreter_context_, on_change_); } else if (utils::Downcast(parsed_query.query)) { /// SYSTEM PURE ("SHOW DATABASES") /// INTERPRETER (TODO: "SHOW DATABASE") diff --git a/src/replication/CMakeLists.txt b/src/replication/CMakeLists.txt index 772ae5591..597ed096a 100644 --- a/src/replication/CMakeLists.txt +++ b/src/replication/CMakeLists.txt @@ -6,8 +6,10 @@ target_sources(mg-replication include/replication/epoch.hpp include/replication/config.hpp include/replication/mode.hpp + include/replication/messages.hpp include/replication/role.hpp include/replication/status.hpp + include/replication/replication_client.hpp include/replication/replication_server.hpp PRIVATE @@ -15,6 +17,8 @@ target_sources(mg-replication epoch.cpp config.cpp status.cpp + messages.cpp + replication_client.cpp replication_server.cpp ) target_include_directories(mg-replication PUBLIC include) diff --git a/src/replication/include/replication/messages.hpp b/src/replication/include/replication/messages.hpp new file mode 100644 index 000000000..57cf29351 --- /dev/null +++ b/src/replication/include/replication/messages.hpp @@ -0,0 +1,44 @@ +// Copyright 2023 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +#pragma once + +#include "rpc/messages.hpp" +#include "slk/serialization.hpp" + +namespace memgraph::replication { + +struct FrequentHeartbeatReq { + static const utils::TypeInfo kType; // TODO: make constexpr? + static const utils::TypeInfo &GetTypeInfo() { return kType; } // WHAT? + + static void Load(FrequentHeartbeatReq *self, memgraph::slk::Reader *reader); + static void Save(const FrequentHeartbeatReq &self, memgraph::slk::Builder *builder); + FrequentHeartbeatReq() = default; +}; + +struct FrequentHeartbeatRes { + static const utils::TypeInfo kType; + static const utils::TypeInfo &GetTypeInfo() { return kType; } + + static void Load(FrequentHeartbeatRes *self, memgraph::slk::Reader *reader); + static void Save(const FrequentHeartbeatRes &self, memgraph::slk::Builder *builder); + FrequentHeartbeatRes() = default; + explicit FrequentHeartbeatRes(bool success) : success(success) {} + + bool success; +}; + +using FrequentHeartbeatRpc = rpc::RequestResponse; + +void FrequentHeartbeatHandler(slk::Reader *req_reader, slk::Builder *res_builder); + +} // namespace memgraph::replication diff --git a/src/replication/include/replication/replication_client.hpp b/src/replication/include/replication/replication_client.hpp new file mode 100644 index 000000000..16e1010bf --- /dev/null +++ b/src/replication/include/replication/replication_client.hpp @@ -0,0 +1,82 @@ +// Copyright 2023 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +#pragma once + +#include "replication/config.hpp" +#include "replication/messages.hpp" +#include "rpc/client.hpp" +#include "utils/scheduler.hpp" +#include "utils/thread_pool.hpp" + +#include +#include + +namespace memgraph::replication { + +template +concept InvocableWithStringView = std::invocable; + +struct ReplicationClient { + explicit ReplicationClient(const memgraph::replication::ReplicationClientConfig &config); + + ~ReplicationClient(); + ReplicationClient(ReplicationClient const &) = delete; + ReplicationClient &operator=(ReplicationClient const &) = delete; + ReplicationClient(ReplicationClient &&) noexcept = delete; + ReplicationClient &operator=(ReplicationClient &&) noexcept = delete; + + template + void StartFrequentCheck(F &&callback) { + // Help the user to get the most accurate replica state possible. + if (replica_check_frequency_ > std::chrono::seconds(0)) { + replica_checker_.Run("Replica Checker", replica_check_frequency_, [this, cb = std::forward(callback)] { + try { + bool success = false; + { + auto stream{rpc_client_.Stream()}; + success = stream.AwaitResponse().success; + } + if (success) { + cb(name_); + } + } catch (const rpc::RpcFailedException &) { + // Nothing to do...wait for a reconnect + } + }); + } + } + + std::string name_; + communication::ClientContext rpc_context_; + rpc::Client rpc_client_; + std::chrono::seconds replica_check_frequency_; + + memgraph::replication::ReplicationMode mode_{memgraph::replication::ReplicationMode::SYNC}; + // This thread pool is used for background tasks so we don't + // block the main storage thread + // We use only 1 thread for 2 reasons: + // - background tasks ALWAYS contain some kind of RPC communication. + // We can't have multiple RPC communication from a same client + // because that's not logically valid (e.g. you cannot send a snapshot + // and WAL at a same time because WAL will arrive earlier and be applied + // before the snapshot which is not correct) + // - the implementation is simplified as we have a total control of what + // this pool is executing. Also, we can simply queue multiple tasks + // and be sure of the execution order. + // Not having mulitple possible threads in the same client allows us + // to ignore concurrency problems inside the client. + utils::ThreadPool thread_pool_{1}; + + utils::Scheduler replica_checker_; +}; + +} // namespace memgraph::replication diff --git a/src/replication/include/replication/replication_server.hpp b/src/replication/include/replication/replication_server.hpp index e9ca1b549..5ff41b8a5 100644 --- a/src/replication/include/replication/replication_server.hpp +++ b/src/replication/include/replication/replication_server.hpp @@ -17,30 +17,6 @@ namespace memgraph::replication { -struct FrequentHeartbeatReq { - static const utils::TypeInfo kType; // TODO: make constexpr? - static const utils::TypeInfo &GetTypeInfo() { return kType; } // WHAT? - - static void Load(FrequentHeartbeatReq *self, memgraph::slk::Reader *reader); - static void Save(const FrequentHeartbeatReq &self, memgraph::slk::Builder *builder); - FrequentHeartbeatReq() = default; -}; - -struct FrequentHeartbeatRes { - static const utils::TypeInfo kType; - static const utils::TypeInfo &GetTypeInfo() { return kType; } - - static void Load(FrequentHeartbeatRes *self, memgraph::slk::Reader *reader); - static void Save(const FrequentHeartbeatRes &self, memgraph::slk::Builder *builder); - FrequentHeartbeatRes() = default; - explicit FrequentHeartbeatRes(bool success) : success(success) {} - - bool success; -}; - -// TODO: move to own header -using FrequentHeartbeatRpc = rpc::RequestResponse; - class ReplicationServer { public: explicit ReplicationServer(const memgraph::replication::ReplicationServerConfig &config); diff --git a/src/replication/include/replication/state.hpp b/src/replication/include/replication/state.hpp index 0460d0a9d..76aec1053 100644 --- a/src/replication/include/replication/state.hpp +++ b/src/replication/include/replication/state.hpp @@ -11,19 +11,22 @@ #pragma once -#include -#include -#include -#include - #include "kvstore/kvstore.hpp" #include "replication/config.hpp" #include "replication/epoch.hpp" #include "replication/mode.hpp" +#include "replication/replication_client.hpp" #include "replication/role.hpp" #include "replication_server.hpp" #include "status.hpp" #include "utils/result.hpp" +#include "utils/synchronized.hpp" + +#include +#include +#include +#include +#include namespace memgraph::replication { @@ -32,8 +35,17 @@ enum class RolePersisted : uint8_t { UNKNOWN_OR_NO, YES }; enum class RegisterReplicaError : uint8_t { NAME_EXISTS, END_POINT_EXISTS, COULD_NOT_BE_PERSISTED, NOT_MAIN, SUCCESS }; struct RoleMainData { + RoleMainData() = default; + explicit RoleMainData(ReplicationEpoch e) : epoch_(std::move(e)) {} + ~RoleMainData() = default; + + RoleMainData(RoleMainData const &) = delete; + RoleMainData &operator=(RoleMainData const &) = delete; + RoleMainData(RoleMainData &&) = default; + RoleMainData &operator=(RoleMainData &&) = default; + ReplicationEpoch epoch_; - std::vector registered_replicas_; + std::list registered_replicas_{}; }; struct RoleReplicaData { @@ -41,8 +53,10 @@ struct RoleReplicaData { std::unique_ptr server; }; +// Global (instance) level object struct ReplicationState { explicit ReplicationState(std::optional durability_dir); + ~ReplicationState() = default; ReplicationState(ReplicationState const &) = delete; ReplicationState(ReplicationState &&) = delete; @@ -74,7 +88,7 @@ struct ReplicationState { // TODO: locked access auto ReplicationData() -> ReplicationData_t & { return replication_data_; } auto ReplicationData() const -> ReplicationData_t const & { return replication_data_; } - auto RegisterReplica(const ReplicationClientConfig &config) -> RegisterReplicaError; + utils::BasicResult RegisterReplica(const ReplicationClientConfig &config); bool SetReplicationRoleMain(); diff --git a/src/replication/messages.cpp b/src/replication/messages.cpp new file mode 100644 index 000000000..4503e9df2 --- /dev/null +++ b/src/replication/messages.cpp @@ -0,0 +1,65 @@ +// Copyright 2023 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +#include "replication/messages.hpp" +#include "rpc/messages.hpp" +#include "slk/serialization.hpp" +#include "slk/streams.hpp" + +namespace memgraph::slk { +// Serialize code for FrequentHeartbeatRes +void Save(const memgraph::replication::FrequentHeartbeatRes &self, memgraph::slk::Builder *builder) { + memgraph::slk::Save(self.success, builder); +} +void Load(memgraph::replication::FrequentHeartbeatRes *self, memgraph::slk::Reader *reader) { + memgraph::slk::Load(&self->success, reader); +} + +// Serialize code for FrequentHeartbeatReq +void Save(const memgraph::replication::FrequentHeartbeatReq & /*self*/, memgraph::slk::Builder * /*builder*/) { + /* Nothing to serialize */ +} +void Load(memgraph::replication::FrequentHeartbeatReq * /*self*/, memgraph::slk::Reader * /*reader*/) { + /* Nothing to serialize */ +} + +} // namespace memgraph::slk + +namespace memgraph::replication { + +constexpr utils::TypeInfo FrequentHeartbeatReq::kType{utils::TypeId::REP_FREQUENT_HEARTBEAT_REQ, "FrequentHeartbeatReq", + nullptr}; + +constexpr utils::TypeInfo FrequentHeartbeatRes::kType{utils::TypeId::REP_FREQUENT_HEARTBEAT_RES, "FrequentHeartbeatRes", + nullptr}; + +void FrequentHeartbeatReq::Save(const FrequentHeartbeatReq &self, memgraph::slk::Builder *builder) { + memgraph::slk::Save(self, builder); +} +void FrequentHeartbeatReq::Load(FrequentHeartbeatReq *self, memgraph::slk::Reader *reader) { + memgraph::slk::Load(self, reader); +} +void FrequentHeartbeatRes::Save(const FrequentHeartbeatRes &self, memgraph::slk::Builder *builder) { + memgraph::slk::Save(self, builder); +} +void FrequentHeartbeatRes::Load(FrequentHeartbeatRes *self, memgraph::slk::Reader *reader) { + memgraph::slk::Load(self, reader); +} + +void FrequentHeartbeatHandler(slk::Reader *req_reader, slk::Builder *res_builder) { + FrequentHeartbeatReq req; + FrequentHeartbeatReq::Load(&req, req_reader); + memgraph::slk::Load(&req, req_reader); + FrequentHeartbeatRes res{true}; + memgraph::slk::Save(res, res_builder); +} + +} // namespace memgraph::replication diff --git a/src/replication/replication_client.cpp b/src/replication/replication_client.cpp new file mode 100644 index 000000000..d14250c2a --- /dev/null +++ b/src/replication/replication_client.cpp @@ -0,0 +1,40 @@ +// Copyright 2023 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +#include "replication/replication_client.hpp" + +namespace memgraph::replication { + +static auto CreateClientContext(const memgraph::replication::ReplicationClientConfig &config) + -> communication::ClientContext { + return (config.ssl) ? communication::ClientContext{config.ssl->key_file, config.ssl->cert_file} + : communication::ClientContext{}; +} + +ReplicationClient::ReplicationClient(const memgraph::replication::ReplicationClientConfig &config) + : name_{config.name}, + rpc_context_{CreateClientContext(config)}, + rpc_client_{io::network::Endpoint(io::network::Endpoint::needs_resolving, config.ip_address, config.port), + &rpc_context_}, + replica_check_frequency_{config.replica_check_frequency}, + mode_{config.mode} {} + +ReplicationClient::~ReplicationClient() { + auto endpoint = rpc_client_.Endpoint(); + try { + spdlog::trace("Closing replication client on {}:{}", endpoint.address, endpoint.port); + } catch (...) { + // Logging can throw. Not a big deal, just ignore. + } + thread_pool_.Shutdown(); +} + +} // namespace memgraph::replication diff --git a/src/replication/replication_server.cpp b/src/replication/replication_server.cpp index 7d0ff3cc2..f79ea2add 100644 --- a/src/replication/replication_server.cpp +++ b/src/replication/replication_server.cpp @@ -10,25 +10,7 @@ // licenses/APL.txt. #include "replication/replication_server.hpp" -#include "rpc/messages.hpp" -#include "slk/serialization.hpp" -#include "slk/streams.hpp" - -namespace memgraph::slk { - -// Serialize code for FrequentHeartbeatRes -void Save(const memgraph::replication::FrequentHeartbeatRes &self, memgraph::slk::Builder *builder) { - memgraph::slk::Save(self.success, builder); -} -void Load(memgraph::replication::FrequentHeartbeatRes *self, memgraph::slk::Reader *reader) { - memgraph::slk::Load(&self->success, reader); -} - -// Serialize code for FrequentHeartbeatReq -void Save(const memgraph::replication::FrequentHeartbeatReq &self, memgraph::slk::Builder *builder) {} -void Load(memgraph::replication::FrequentHeartbeatReq *self, memgraph::slk::Reader *reader) {} - -} // namespace memgraph::slk +#include "replication/messages.hpp" namespace memgraph::replication { namespace { @@ -39,13 +21,6 @@ auto CreateServerContext(const memgraph::replication::ReplicationServerConfig &c : communication::ServerContext{}; } -void FrequentHeartbeatHandler(slk::Reader *req_reader, slk::Builder *res_builder) { - FrequentHeartbeatReq req; - memgraph::slk::Load(&req, req_reader); - FrequentHeartbeatRes res{true}; - memgraph::slk::Save(res, res_builder); -} - // NOTE: The replication server must have a single thread for processing // because there is no need for more processing threads - each replica can // have only a single main server. Also, the single-threaded guarantee @@ -53,25 +28,6 @@ void FrequentHeartbeatHandler(slk::Reader *req_reader, slk::Builder *res_builder constexpr auto kReplicationServerThreads = 1; } // namespace -constexpr utils::TypeInfo FrequentHeartbeatReq::kType{utils::TypeId::REP_FREQUENT_HEARTBEAT_REQ, "FrequentHeartbeatReq", - nullptr}; - -constexpr utils::TypeInfo FrequentHeartbeatRes::kType{utils::TypeId::REP_FREQUENT_HEARTBEAT_RES, "FrequentHeartbeatRes", - nullptr}; - -void FrequentHeartbeatReq::Save(const FrequentHeartbeatReq &self, memgraph::slk::Builder *builder) { - memgraph::slk::Save(self, builder); -} -void FrequentHeartbeatReq::Load(FrequentHeartbeatReq *self, memgraph::slk::Reader *reader) { - memgraph::slk::Load(self, reader); -} -void FrequentHeartbeatRes::Save(const FrequentHeartbeatRes &self, memgraph::slk::Builder *builder) { - memgraph::slk::Save(self, builder); -} -void FrequentHeartbeatRes::Load(FrequentHeartbeatRes *self, memgraph::slk::Reader *reader) { - memgraph::slk::Load(self, reader); -} - ReplicationServer::ReplicationServer(const memgraph::replication::ReplicationServerConfig &config) : rpc_server_context_{CreateServerContext(config)}, rpc_server_{io::network::Endpoint{config.ip_address, config.port}, &rpc_server_context_, diff --git a/src/replication/state.cpp b/src/replication/state.cpp index 4551eba7e..60c390e17 100644 --- a/src/replication/state.cpp +++ b/src/replication/state.cpp @@ -11,9 +11,11 @@ #include "replication/state.hpp" +#include "replication/replication_client.hpp" #include "replication/replication_server.hpp" #include "replication/status.hpp" #include "utils/file.hpp" +#include "utils/result.hpp" #include "utils/variant_helpers.hpp" constexpr auto kReplicationDirectory = std::string_view{"replication"}; @@ -125,12 +127,9 @@ auto ReplicationState::FetchReplicationData() -> FetchReplicationResult_t { return std::visit( utils::Overloaded{ [&](durability::MainRole &&r) -> FetchReplicationResult_t { - auto res = RoleMainData{ - .epoch_ = std::move(r.epoch), - }; + auto res = RoleMainData{std::move(r.epoch)}; auto b = durability_->begin(durability::kReplicationReplicaPrefix); auto e = durability_->end(durability::kReplicationReplicaPrefix); - res.registered_replicas_.reserve(durability_->Size(durability::kReplicationReplicaPrefix)); for (; b != e; ++b) { auto const &[replica_name, replica_data] = *b; auto json = nlohmann::json::parse(replica_data, nullptr, false); @@ -141,7 +140,8 @@ auto ReplicationState::FetchReplicationData() -> FetchReplicationResult_t { if (key_name != data.config.name) { return FetchReplicationError::PARSE_ERROR; } - res.registered_replicas_.emplace_back(std::move(data.config)); + // Instance clients + res.registered_replicas_.emplace_back(data.config); } catch (...) { return FetchReplicationError::PARSE_ERROR; } @@ -221,7 +221,7 @@ bool ReplicationState::SetReplicationRoleMain() { if (!TryPersistRoleMain(new_epoch)) { return false; } - replication_data_ = RoleMainData{.epoch_ = ReplicationEpoch{new_epoch}}; + replication_data_ = RoleMainData{ReplicationEpoch{new_epoch}}; return true; } @@ -233,16 +233,14 @@ bool ReplicationState::SetReplicationRoleReplica(const ReplicationServerConfig & return true; } -auto ReplicationState::RegisterReplica(const ReplicationClientConfig &config) -> RegisterReplicaError { - auto const replica_handler = [](RoleReplicaData const &) -> RegisterReplicaError { - return RegisterReplicaError::NOT_MAIN; - }; - auto const main_handler = [this, &config](RoleMainData &mainData) -> RegisterReplicaError { +utils::BasicResult ReplicationState::RegisterReplica( + const ReplicationClientConfig &config) { + auto const replica_handler = [](RoleReplicaData const &) { return RegisterReplicaError::NOT_MAIN; }; + ReplicationClient *client{nullptr}; + auto const main_handler = [&client, &config, this](RoleMainData &mainData) -> RegisterReplicaError { // name check auto name_check = [&config](auto const &replicas) { - auto name_matches = [&name = config.name](ReplicationClientConfig const ®istered_config) { - return registered_config.name == name; - }; + auto name_matches = [&name = config.name](auto const &replica) { return replica.name_ == name; }; return std::any_of(replicas.begin(), replicas.end(), name_matches); }; if (name_check(mainData.registered_replicas_)) { @@ -251,8 +249,9 @@ auto ReplicationState::RegisterReplica(const ReplicationClientConfig &config) -> // endpoint check auto endpoint_check = [&](auto const &replicas) { - auto endpoint_matches = [&config](ReplicationClientConfig const ®istered_config) { - return registered_config.ip_address == config.ip_address && registered_config.port == config.port; + auto endpoint_matches = [&config](auto const &replica) { + const auto &ep = replica.rpc_client_.Endpoint(); + return ep.address == config.ip_address && ep.port == config.port; }; return std::any_of(replicas.begin(), replicas.end(), endpoint_matches); }; @@ -266,10 +265,14 @@ auto ReplicationState::RegisterReplica(const ReplicationClientConfig &config) -> } // set - mainData.registered_replicas_.emplace_back(config); + client = &mainData.registered_replicas_.emplace_back(config); return RegisterReplicaError::SUCCESS; }; - return std::visit(utils::Overloaded{main_handler, replica_handler}, replication_data_); + const auto &res = std::visit(utils::Overloaded{main_handler, replica_handler}, replication_data_); + if (res == RegisterReplicaError::SUCCESS) { + return client; + } + return res; } } // namespace memgraph::replication diff --git a/src/rpc/client.hpp b/src/rpc/client.hpp index f727391ac..1fd3fff8d 100644 --- a/src/rpc/client.hpp +++ b/src/rpc/client.hpp @@ -14,6 +14,7 @@ #include #include #include +#include #include "communication/client.hpp" #include "io/network/endpoint.hpp" @@ -41,16 +42,25 @@ class Client { StreamHandler(Client *self, std::unique_lock &&guard, std::function res_load) - : self_(self), - guard_(std::move(guard)), - req_builder_([self](const uint8_t *data, size_t size, bool have_more) { - if (!self->client_->Write(data, size, have_more)) throw GenericRpcFailedException(); - }), - res_load_(res_load) {} + : self_(self), guard_(std::move(guard)), req_builder_(GenBuilderCallback(self, this)), res_load_(res_load) {} public: - StreamHandler(StreamHandler &&) noexcept = default; - StreamHandler &operator=(StreamHandler &&) noexcept = default; + StreamHandler(StreamHandler &&other) noexcept + : self_{std::exchange(other.self_, nullptr)}, + defunct_{std::exchange(other.defunct_, true)}, + guard_{std::move(other.guard_)}, + req_builder_{std::move(other.req_builder_), GenBuilderCallback(self_, this)}, + res_load_{std::move(other.res_load_)} {} + StreamHandler &operator=(StreamHandler &&other) noexcept { + if (&other != this) { + self_ = std::exchange(other.self_, nullptr); + defunct_ = std::exchange(other.defunct_, true); + guard_ = std::move(other.guard_); + req_builder_ = slk::Builder(std::move(other.req_builder_, GenBuilderCallback(self_, this))); + res_load_ = std::move(other.res_load_); + } + return *this; + } StreamHandler(const StreamHandler &) = delete; StreamHandler &operator=(const StreamHandler &) = delete; @@ -70,10 +80,18 @@ class Client { while (true) { auto ret = slk::CheckStreamComplete(self_->client_->GetData(), self_->client_->GetDataSize()); if (ret.status == slk::StreamStatus::INVALID) { + // Logically invalid state, connection is still up, defunct stream and release + defunct_ = true; + guard_.unlock(); throw GenericRpcFailedException(); - } else if (ret.status == slk::StreamStatus::PARTIAL) { + } + if (ret.status == slk::StreamStatus::PARTIAL) { if (!self_->client_->Read(ret.stream_size - self_->client_->GetDataSize(), /* exactly_len = */ false)) { + // Failed connection, abort and let somebody retry in the future + defunct_ = true; + self_->Abort(); + guard_.unlock(); throw GenericRpcFailedException(); } } else { @@ -103,7 +121,9 @@ class Client { // Check the response ID. if (res_id != res_type.id && res_id != utils::TypeId::UNKNOWN) { spdlog::error("Message response was of unexpected type"); - self_->client_ = std::nullopt; + // Logically invalid state, connection is still up, defunct stream and release + defunct_ = true; + guard_.unlock(); throw GenericRpcFailedException(); } @@ -112,8 +132,23 @@ class Client { return res_load_(&res_reader); } + bool IsDefunct() const { return defunct_; } + private: + static auto GenBuilderCallback(Client *client, StreamHandler *self) { + return [client, self](const uint8_t *data, size_t size, bool have_more) { + if (self->defunct_) throw GenericRpcFailedException(); + if (!client->client_->Write(data, size, have_more)) { + self->defunct_ = true; + client->Abort(); + self->guard_.unlock(); + throw GenericRpcFailedException(); + } + }; + } + Client *self_; + bool defunct_ = false; std::unique_lock guard_; slk::Builder req_builder_; std::function res_load_; @@ -179,7 +214,7 @@ class Client { TRequestResponse::Request::Save(request, handler.GetBuilder()); // Return the handler to the user. - return std::move(handler); + return handler; } /// Call a previously defined and registered RPC call. This function can diff --git a/src/slk/streams.cpp b/src/slk/streams.cpp index 5125d635a..dc5ef8c3c 100644 --- a/src/slk/streams.cpp +++ b/src/slk/streams.cpp @@ -30,7 +30,7 @@ void Builder::Save(const uint8_t *data, uint64_t size) { to_write = kSegmentMaxDataSize - pos_; } - memcpy(segment_ + sizeof(SegmentSize) + pos_, data + offset, to_write); + memcpy(segment_.data() + sizeof(SegmentSize) + pos_, data + offset, to_write); size -= to_write; pos_ += to_write; @@ -48,15 +48,15 @@ void Builder::FlushSegment(bool final_segment) { size_t total_size = sizeof(SegmentSize) + pos_; SegmentSize size = pos_; - memcpy(segment_, &size, sizeof(SegmentSize)); + memcpy(segment_.data(), &size, sizeof(SegmentSize)); if (final_segment) { SegmentSize footer = 0; - memcpy(segment_ + total_size, &footer, sizeof(SegmentSize)); + memcpy(segment_.data() + total_size, &footer, sizeof(SegmentSize)); total_size += sizeof(SegmentSize); } - write_func_(segment_, total_size, !final_segment); + write_func_(segment_.data(), total_size, !final_segment); pos_ = 0; } diff --git a/src/slk/streams.hpp b/src/slk/streams.hpp index 587b7830b..691189443 100644 --- a/src/slk/streams.hpp +++ b/src/slk/streams.hpp @@ -46,7 +46,11 @@ static_assert(kSegmentMaxDataSize <= std::numeric_limits::max(), /// Builder used to create a SLK segment stream. class Builder { public: - Builder(std::function write_func); + explicit Builder(std::function write_func); + Builder(Builder &&other, std::function write_func) + : write_func_{std::move(write_func)}, pos_{std::exchange(other.pos_, 0)}, segment_{other.segment_} { + other.write_func_ = [](const uint8_t *, size_t, bool) { /* Moved builder is defunct, no write possible */ }; + } /// Function used internally by SLK to serialize the data. void Save(const uint8_t *data, uint64_t size); @@ -59,7 +63,7 @@ class Builder { std::function write_func_; size_t pos_{0}; - uint8_t segment_[kSegmentMaxTotalSize]; + std::array segment_; }; /// Exception that will be thrown if segments can't be decoded from the byte diff --git a/src/storage/v2/CMakeLists.txt b/src/storage/v2/CMakeLists.txt index 9f6d8d4d7..150a02cc7 100644 --- a/src/storage/v2/CMakeLists.txt +++ b/src/storage/v2/CMakeLists.txt @@ -39,6 +39,7 @@ add_library(mg-storage-v2 STATIC replication/slk.cpp replication/rpc.cpp replication/replication_storage_state.cpp - inmemory/replication/replication_client.cpp + inmemory/replication/recovery.cpp ) + target_link_libraries(mg-storage-v2 mg::replication Threads::Threads mg-utils gflags absl::flat_hash_map mg-rpc mg-slk mg-events mg-memory) diff --git a/src/storage/v2/disk/storage.hpp b/src/storage/v2/disk/storage.hpp index 3575e685d..e93566c09 100644 --- a/src/storage/v2/disk/storage.hpp +++ b/src/storage/v2/disk/storage.hpp @@ -313,12 +313,6 @@ class DiskStorage final : public Storage { uint64_t CommitTimestamp(std::optional desired_commit_timestamp = {}); - auto CreateReplicationClient(const memgraph::replication::ReplicationClientConfig & /*config*/, - const memgraph::replication::ReplicationEpoch * /*current_epoch*/) - -> std::unique_ptr override { - throw utils::BasicException("Disk storage mode does not support replication."); - } - std::unique_ptr kvstore_; DurableMetadata durable_metadata_; EdgeImportMode edge_import_status_{EdgeImportMode::INACTIVE}; diff --git a/src/storage/v2/durability/durability.cpp b/src/storage/v2/durability/durability.cpp index 1240bd52e..9a43a2876 100644 --- a/src/storage/v2/durability/durability.cpp +++ b/src/storage/v2/durability/durability.cpp @@ -96,6 +96,7 @@ std::vector GetSnapshotFiles(const std::filesystem::path MG_ASSERT(!error_code, "Couldn't recover data because an error occurred: {}!", error_code.message()); } + std::sort(snapshot_files.begin(), snapshot_files.end()); return snapshot_files; } @@ -106,13 +107,17 @@ std::optional> GetWalFiles(const std::filesystem: std::vector wal_files; std::error_code error_code; + // There could be multiple "current" WAL files, the "_current" tag just means that the previous session didn't + // finalize. We cannot skip based on name, will be able to skip based on invalid data or sequence number, so the + // actual current wal will be skipped for (const auto &item : std::filesystem::directory_iterator(wal_directory, error_code)) { if (!item.is_regular_file()) continue; try { auto info = ReadWalInfo(item.path()); - if ((uuid.empty() || info.uuid == uuid) && (!current_seq_num || info.seq_num < *current_seq_num)) + if ((uuid.empty() || info.uuid == uuid) && (!current_seq_num || info.seq_num < *current_seq_num)) { wal_files.emplace_back(info.seq_num, info.from_timestamp, info.to_timestamp, std::move(info.uuid), std::move(info.epoch_id), item.path()); + } } catch (const RecoveryFailure &e) { spdlog::warn("Failed to read {}", item.path()); continue; @@ -120,6 +125,7 @@ std::optional> GetWalFiles(const std::filesystem: } MG_ASSERT(!error_code, "Couldn't recover data because an error occurred: {}!", error_code.message()); + // Sort based on the sequence number, not the file name std::sort(wal_files.begin(), wal_files.end()); return std::move(wal_files); } @@ -246,8 +252,6 @@ std::optional RecoverData(const std::filesystem::path &snapshot_di std::optional snapshot_timestamp; if (!snapshot_files.empty()) { spdlog::info("Try recovering from snapshot directory {}.", snapshot_directory); - // Order the files by name - std::sort(snapshot_files.begin(), snapshot_files.end()); // UUID used for durability is the UUID of the last snapshot file. *uuid = snapshot_files.back().uuid; diff --git a/src/storage/v2/inmemory/replication/recovery.cpp b/src/storage/v2/inmemory/replication/recovery.cpp new file mode 100644 index 000000000..b62fdc2f9 --- /dev/null +++ b/src/storage/v2/inmemory/replication/recovery.cpp @@ -0,0 +1,238 @@ +// Copyright 2023 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +#include "storage/v2/inmemory/replication/recovery.hpp" +#include +#include +#include +#include +#include "storage/v2/durability/durability.hpp" +#include "storage/v2/inmemory/storage.hpp" +#include "storage/v2/replication/recovery.hpp" +#include "utils/on_scope_exit.hpp" +#include "utils/variant_helpers.hpp" + +namespace memgraph::storage { + +// Handler for transferring the current WAL file whose data is +// contained in the internal buffer and the file. +class InMemoryCurrentWalHandler { + public: + explicit InMemoryCurrentWalHandler(InMemoryStorage const *storage, rpc::Client &rpc_client); + void AppendFilename(const std::string &filename); + + void AppendSize(size_t size); + + void AppendFileData(utils::InputFile *file); + + void AppendBufferData(const uint8_t *buffer, size_t buffer_size); + + /// @throw rpc::RpcFailedException + replication::CurrentWalRes Finalize(); + + private: + rpc::Client::StreamHandler stream_; +}; + +////// CurrentWalHandler ////// +InMemoryCurrentWalHandler::InMemoryCurrentWalHandler(InMemoryStorage const *storage, rpc::Client &rpc_client) + : stream_(rpc_client.Stream(storage->id())) {} + +void InMemoryCurrentWalHandler::AppendFilename(const std::string &filename) { + replication::Encoder encoder(stream_.GetBuilder()); + encoder.WriteString(filename); +} + +void InMemoryCurrentWalHandler::AppendSize(const size_t size) { + replication::Encoder encoder(stream_.GetBuilder()); + encoder.WriteUint(size); +} + +void InMemoryCurrentWalHandler::AppendFileData(utils::InputFile *file) { + replication::Encoder encoder(stream_.GetBuilder()); + encoder.WriteFileData(file); +} + +void InMemoryCurrentWalHandler::AppendBufferData(const uint8_t *buffer, const size_t buffer_size) { + replication::Encoder encoder(stream_.GetBuilder()); + encoder.WriteBuffer(buffer, buffer_size); +} + +replication::CurrentWalRes InMemoryCurrentWalHandler::Finalize() { return stream_.AwaitResponse(); } + +////// ReplicationClient Helpers ////// +replication::WalFilesRes TransferWalFiles(std::string db_name, rpc::Client &client, + const std::vector &wal_files) { + MG_ASSERT(!wal_files.empty(), "Wal files list is empty!"); + auto stream = client.Stream(std::move(db_name), wal_files.size()); + replication::Encoder encoder(stream.GetBuilder()); + for (const auto &wal : wal_files) { + spdlog::debug("Sending wal file: {}", wal); + encoder.WriteFile(wal); + } + return stream.AwaitResponse(); +} + +replication::SnapshotRes TransferSnapshot(std::string db_name, rpc::Client &client, const std::filesystem::path &path) { + auto stream = client.Stream(std::move(db_name)); + replication::Encoder encoder(stream.GetBuilder()); + encoder.WriteFile(path); + return stream.AwaitResponse(); +} + +uint64_t ReplicateCurrentWal(const InMemoryStorage *storage, rpc::Client &client, durability::WalFile const &wal_file) { + InMemoryCurrentWalHandler stream{storage, client}; + stream.AppendFilename(wal_file.Path().filename()); + utils::InputFile file; + MG_ASSERT(file.Open(wal_file.Path()), "Failed to open current WAL file at {}!", wal_file.Path()); + const auto [buffer, buffer_size] = wal_file.CurrentFileBuffer(); + stream.AppendSize(file.GetSize() + buffer_size); + stream.AppendFileData(&file); + stream.AppendBufferData(buffer, buffer_size); + auto response = stream.Finalize(); + return response.current_commit_timestamp; +} + +/// This method tries to find the optimal path for recoverying a single replica. +/// Based on the last commit transfered to replica it tries to update the +/// replica using durability files - WALs and Snapshots. WAL files are much +/// smaller in size as they contain only the Deltas (changes) made during the +/// transactions while Snapshots contain all the data. For that reason we prefer +/// WALs as much as possible. As the WAL file that is currently being updated +/// can change during the process we ignore it as much as possible. Also, it +/// uses the transaction lock so locking it can be really expensive. After we +/// fetch the list of finalized WALs, we try to find the longest chain of +/// sequential WALs, starting from the latest one, that will update the recovery +/// with the all missed updates. If the WAL chain cannot be created, replica is +/// behind by a lot, so we use the regular recovery process, we send the latest +/// snapshot and all the necessary WAL files, starting from the newest WAL that +/// contains a timestamp before the snapshot. If we registered the existence of +/// the current WAL, we add the sequence number we read from it to the recovery +/// process. After all the other steps are finished, if the current WAL contains +/// the same sequence number, it's the same WAL we read while fetching the +/// recovery steps, so we can safely send it to the replica. +/// We assume that the property of preserving at least 1 WAL before the snapshot +/// is satisfied as we extract the timestamp information from it. +std::vector GetRecoverySteps(uint64_t replica_commit, utils::FileRetainer::FileLocker *file_locker, + const InMemoryStorage *storage) { + std::vector recovery_steps; + auto locker_acc = file_locker->Access(); + + // First check if we can recover using the current wal file only + // otherwise save the seq_num of the current wal file + // This lock is also necessary to force the missed transaction to finish. + std::optional current_wal_seq_num; + std::optional current_wal_from_timestamp; + + std::unique_lock transaction_guard( + storage->engine_lock_); // Hold the storage lock so the current wal file cannot be changed + (void)locker_acc.AddPath(storage->wal_directory_); // Protect all WALs from being deleted + + if (storage->wal_file_) { + current_wal_seq_num.emplace(storage->wal_file_->SequenceNumber()); + current_wal_from_timestamp.emplace(storage->wal_file_->FromTimestamp()); + // No need to hold the lock since the current WAL is present and we can simply skip them + transaction_guard.unlock(); + } + + // Read in finalized WAL files (excluding the current/active WAL) + utils::OnScopeExit + release_wal_dir( // Each individually used file will be locked, so at the end, the dir can be released + [&locker_acc, &wal_dir = storage->wal_directory_]() { (void)locker_acc.RemovePath(wal_dir); }); + // Get WAL files, ordered by timestamp, from oldest to newest + auto wal_files = durability::GetWalFiles(storage->wal_directory_, storage->uuid_, current_wal_seq_num); + MG_ASSERT(wal_files, "Wal files could not be loaded"); + if (transaction_guard.owns_lock()) + transaction_guard.unlock(); // In case we didn't have a current wal file, we can unlock only now since there is no + // guarantee what we'll see after we add the wal file + + // Read in snapshot files + (void)locker_acc.AddPath(storage->snapshot_directory_); // Protect all snapshots from being deleted + utils::OnScopeExit + release_snapshot_dir( // Each individually used file will be locked, so at the end, the dir can be released + [&locker_acc, &snapshot_dir = storage->snapshot_directory_]() { (void)locker_acc.RemovePath(snapshot_dir); }); + auto snapshot_files = durability::GetSnapshotFiles(storage->snapshot_directory_, storage->uuid_); + std::optional latest_snapshot{}; + if (!snapshot_files.empty()) { + latest_snapshot.emplace(std::move(snapshot_files.back())); + } + + auto add_snapshot = [&]() { + if (!latest_snapshot) return; + const auto lock_success = locker_acc.AddPath(latest_snapshot->path); + MG_ASSERT(!lock_success.HasError(), "Tried to lock a nonexistant snapshot path."); + recovery_steps.emplace_back(std::in_place_type_t{}, std::move(latest_snapshot->path)); + }; + + // Check if we need the snapshot or if the WAL chain is enough + if (!wal_files->empty()) { + // Find WAL chain that contains the replica's commit timestamp + auto wal_chain_it = wal_files->rbegin(); + auto prev_seq{wal_chain_it->seq_num}; + for (; wal_chain_it != wal_files->rend(); ++wal_chain_it) { + if (prev_seq - wal_chain_it->seq_num > 1) { + // Broken chain, must have a snapshot that covers the missing commits + if (wal_chain_it->from_timestamp > replica_commit) { + // Chain does not go far enough, check the snapshot + MG_ASSERT(latest_snapshot, "Missing snapshot, while the WAL chain does not cover enough time."); + // Check for a WAL file that connects the snapshot to the chain + for (;; --wal_chain_it) { + // Going from the newest WAL files, find the first one that has a from_timestamp older than the snapshot + // NOTE: It could be that the only WAL needed is the current one + if (wal_chain_it->from_timestamp <= latest_snapshot->start_timestamp) { + break; + } + if (wal_chain_it == wal_files->rbegin()) break; + } + // Add snapshot to recovery steps + add_snapshot(); + } + break; + } + + if (wal_chain_it->to_timestamp <= replica_commit) { + // Got to a WAL that is older than what we need to recover the replica + break; + } + + prev_seq = wal_chain_it->seq_num; + } + + // Copy and lock the chain part we need, from oldest to newest + RecoveryWals rw{}; + rw.reserve(std::distance(wal_files->rbegin(), wal_chain_it)); + for (auto wal_it = wal_chain_it.base(); wal_it != wal_files->end(); ++wal_it) { + const auto lock_success = locker_acc.AddPath(wal_it->path); + MG_ASSERT(!lock_success.HasError(), "Tried to lock a nonexistant WAL path."); + rw.emplace_back(std::move(wal_it->path)); + } + if (!rw.empty()) { + recovery_steps.emplace_back(std::in_place_type_t{}, std::move(rw)); + } + + } else { + // No WAL chain, check if we need the snapshot + if (!current_wal_from_timestamp || replica_commit < *current_wal_from_timestamp) { + // No current wal or current wal too new + add_snapshot(); + } + } + + // In all cases, if we have a current wal file we need to use itW + if (current_wal_seq_num) { + // NOTE: File not handled directly, so no need to lock it + recovery_steps.emplace_back(RecoveryCurrentWal{*current_wal_seq_num}); + } + + return recovery_steps; +} + +} // namespace memgraph::storage diff --git a/src/storage/v2/inmemory/replication/recovery.hpp b/src/storage/v2/inmemory/replication/recovery.hpp new file mode 100644 index 000000000..2025800ab --- /dev/null +++ b/src/storage/v2/inmemory/replication/recovery.hpp @@ -0,0 +1,32 @@ +// Copyright 2023 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. +#pragma once + +#include "storage/v2/durability/durability.hpp" +#include "storage/v2/replication/recovery.hpp" +#include "storage/v2/replication/replication_client.hpp" + +namespace memgraph::storage { +class InMemoryStorage; + +////// ReplicationClient Helpers ////// + +replication::WalFilesRes TransferWalFiles(std::string db_name, rpc::Client &client, + const std::vector &wal_files); + +replication::SnapshotRes TransferSnapshot(std::string db_name, rpc::Client &client, const std::filesystem::path &path); + +uint64_t ReplicateCurrentWal(const InMemoryStorage *storage, rpc::Client &client, durability::WalFile const &wal_file); + +auto GetRecoverySteps(uint64_t replica_commit, utils::FileRetainer::FileLocker *file_locker, + const InMemoryStorage *storage) -> std::vector; + +} // namespace memgraph::storage diff --git a/src/storage/v2/inmemory/replication/replication_client.cpp b/src/storage/v2/inmemory/replication/replication_client.cpp deleted file mode 100644 index b8ecc1c72..000000000 --- a/src/storage/v2/inmemory/replication/replication_client.cpp +++ /dev/null @@ -1,349 +0,0 @@ -// Copyright 2023 Memgraph Ltd. -// -// Use of this software is governed by the Business Source License -// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source -// License, and you may not use this file except in compliance with the Business Source License. -// -// As of the Change Date specified in that file, in accordance with -// the Business Source License, use of this software will be governed -// by the Apache License, Version 2.0, included in the file -// licenses/APL.txt. - -#include "storage/v2/inmemory/replication/replication_client.hpp" - -#include "storage/v2/durability/durability.hpp" -#include "storage/v2/inmemory/storage.hpp" - -namespace memgraph::storage { - -namespace { -template -[[maybe_unused]] inline constexpr bool always_false_v = false; -} // namespace - -// Handler for transfering the current WAL file whose data is -// contained in the internal buffer and the file. -class CurrentWalHandler { - public: - explicit CurrentWalHandler(ReplicationClient *self); - void AppendFilename(const std::string &filename); - - void AppendSize(size_t size); - - void AppendFileData(utils::InputFile *file); - - void AppendBufferData(const uint8_t *buffer, size_t buffer_size); - - /// @throw rpc::RpcFailedException - replication::CurrentWalRes Finalize(); - - private: - ReplicationClient *self_; - rpc::Client::StreamHandler stream_; -}; - -////// CurrentWalHandler ////// -CurrentWalHandler::CurrentWalHandler(ReplicationClient *self) - : self_(self), stream_(self_->rpc_client_.Stream(self->GetStorageId())) {} - -void CurrentWalHandler::AppendFilename(const std::string &filename) { - replication::Encoder encoder(stream_.GetBuilder()); - encoder.WriteString(filename); -} - -void CurrentWalHandler::AppendSize(const size_t size) { - replication::Encoder encoder(stream_.GetBuilder()); - encoder.WriteUint(size); -} - -void CurrentWalHandler::AppendFileData(utils::InputFile *file) { - replication::Encoder encoder(stream_.GetBuilder()); - encoder.WriteFileData(file); -} - -void CurrentWalHandler::AppendBufferData(const uint8_t *buffer, const size_t buffer_size) { - replication::Encoder encoder(stream_.GetBuilder()); - encoder.WriteBuffer(buffer, buffer_size); -} - -replication::CurrentWalRes CurrentWalHandler::Finalize() { return stream_.AwaitResponse(); } - -////// ReplicationClient Helpers ////// - -replication::WalFilesRes TransferWalFiles(std::string db_name, rpc::Client &client, - const std::vector &wal_files) { - MG_ASSERT(!wal_files.empty(), "Wal files list is empty!"); - auto stream = client.Stream(std::move(db_name), wal_files.size()); - replication::Encoder encoder(stream.GetBuilder()); - for (const auto &wal : wal_files) { - spdlog::debug("Sending wal file: {}", wal); - encoder.WriteFile(wal); - } - return stream.AwaitResponse(); -} - -replication::SnapshotRes TransferSnapshot(std::string db_name, rpc::Client &client, const std::filesystem::path &path) { - auto stream = client.Stream(std::move(db_name)); - replication::Encoder encoder(stream.GetBuilder()); - encoder.WriteFile(path); - return stream.AwaitResponse(); -} - -uint64_t ReplicateCurrentWal(CurrentWalHandler &stream, durability::WalFile const &wal_file) { - stream.AppendFilename(wal_file.Path().filename()); - utils::InputFile file; - MG_ASSERT(file.Open(wal_file.Path()), "Failed to open current WAL file!"); - const auto [buffer, buffer_size] = wal_file.CurrentFileBuffer(); - stream.AppendSize(file.GetSize() + buffer_size); - stream.AppendFileData(&file); - stream.AppendBufferData(buffer, buffer_size); - auto response = stream.Finalize(); - return response.current_commit_timestamp; -} - -////// ReplicationClient ////// - -InMemoryReplicationClient::InMemoryReplicationClient(InMemoryStorage *storage, - const memgraph::replication::ReplicationClientConfig &config, - const memgraph::replication::ReplicationEpoch *epoch) - : ReplicationClient{storage, config, epoch} {} - -void InMemoryReplicationClient::RecoverReplica(uint64_t replica_commit) { - spdlog::debug("Starting replica recover"); - auto *storage = static_cast(storage_); - while (true) { - auto file_locker = storage->file_retainer_.AddLocker(); - - const auto steps = GetRecoverySteps(replica_commit, &file_locker); - int i = 0; - for (const InMemoryReplicationClient::RecoveryStep &recovery_step : steps) { - spdlog::trace("Recovering in step: {}", i++); - try { - std::visit( - [&, this](T &&arg) { - using StepType = std::remove_cvref_t; - if constexpr (std::is_same_v) { // TODO: split into 3 overloads - spdlog::debug("Sending the latest snapshot file: {}", arg); - auto response = TransferSnapshot(storage->id(), rpc_client_, arg); - replica_commit = response.current_commit_timestamp; - } else if constexpr (std::is_same_v) { - spdlog::debug("Sending the latest wal files"); - auto response = TransferWalFiles(storage->id(), rpc_client_, arg); - replica_commit = response.current_commit_timestamp; - spdlog::debug("Wal files successfully transferred."); - } else if constexpr (std::is_same_v) { - std::unique_lock transaction_guard(storage->engine_lock_); - if (storage->wal_file_ && storage->wal_file_->SequenceNumber() == arg.current_wal_seq_num) { - storage->wal_file_->DisableFlushing(); - transaction_guard.unlock(); - spdlog::debug("Sending current wal file"); - auto streamHandler = CurrentWalHandler{this}; - replica_commit = ReplicateCurrentWal(streamHandler, *storage->wal_file_); - storage->wal_file_->EnableFlushing(); - } else { - spdlog::debug("Cannot recover using current wal file"); - } - } else { - static_assert(always_false_v, "Missing type from variant visitor"); - } - }, - recovery_step); - } catch (const rpc::RpcFailedException &) { - { - std::unique_lock client_guard{client_lock_}; - replica_state_.store(replication::ReplicaState::INVALID); - } - HandleRpcFailure(); - return; - } - } - - spdlog::trace("Current timestamp on replica: {}", replica_commit); - // To avoid the situation where we read a correct commit timestamp in - // one thread, and after that another thread commits a different a - // transaction and THEN we set the state to READY in the first thread, - // we set this lock before checking the timestamp. - // We will detect that the state is invalid during the next commit, - // because replication::AppendDeltasRpc sends the last commit timestamp which - // replica checks if it's the same last commit timestamp it received - // and we will go to recovery. - // By adding this lock, we can avoid that, and go to RECOVERY immediately. - std::unique_lock client_guard{client_lock_}; - const auto last_commit_timestamp = LastCommitTimestamp(); - SPDLOG_INFO("Replica timestamp: {}", replica_commit); - SPDLOG_INFO("Last commit: {}", last_commit_timestamp); - if (last_commit_timestamp == replica_commit) { - replica_state_.store(replication::ReplicaState::READY); - return; - } - } -} - -/// This method tries to find the optimal path for recoverying a single replica. -/// Based on the last commit transfered to replica it tries to update the -/// replica using durability files - WALs and Snapshots. WAL files are much -/// smaller in size as they contain only the Deltas (changes) made during the -/// transactions while Snapshots contain all the data. For that reason we prefer -/// WALs as much as possible. As the WAL file that is currently being updated -/// can change during the process we ignore it as much as possible. Also, it -/// uses the transaction lock so locking it can be really expensive. After we -/// fetch the list of finalized WALs, we try to find the longest chain of -/// sequential WALs, starting from the latest one, that will update the recovery -/// with the all missed updates. If the WAL chain cannot be created, replica is -/// behind by a lot, so we use the regular recovery process, we send the latest -/// snapshot and all the necessary WAL files, starting from the newest WAL that -/// contains a timestamp before the snapshot. If we registered the existence of -/// the current WAL, we add the sequence number we read from it to the recovery -/// process. After all the other steps are finished, if the current WAL contains -/// the same sequence number, it's the same WAL we read while fetching the -/// recovery steps, so we can safely send it to the replica. -/// We assume that the property of preserving at least 1 WAL before the snapshot -/// is satisfied as we extract the timestamp information from it. -std::vector InMemoryReplicationClient::GetRecoverySteps( - const uint64_t replica_commit, utils::FileRetainer::FileLocker *file_locker) { - // First check if we can recover using the current wal file only - // otherwise save the seq_num of the current wal file - // This lock is also necessary to force the missed transaction to finish. - std::optional current_wal_seq_num; - std::optional current_wal_from_timestamp; - auto *storage = static_cast(storage_); - if (std::unique_lock transtacion_guard(storage->engine_lock_); storage->wal_file_) { - current_wal_seq_num.emplace(storage->wal_file_->SequenceNumber()); - current_wal_from_timestamp.emplace(storage->wal_file_->FromTimestamp()); - } - - auto locker_acc = file_locker->Access(); - auto wal_files = durability::GetWalFiles(storage->wal_directory_, storage->uuid_, current_wal_seq_num); - MG_ASSERT(wal_files, "Wal files could not be loaded"); - - auto snapshot_files = durability::GetSnapshotFiles(storage->snapshot_directory_, storage->uuid_); - std::optional latest_snapshot; - if (!snapshot_files.empty()) { - std::sort(snapshot_files.begin(), snapshot_files.end()); - latest_snapshot.emplace(std::move(snapshot_files.back())); - } - - std::vector recovery_steps; - - // No finalized WAL files were found. This means the difference is contained - // inside the current WAL or the snapshot. - if (wal_files->empty()) { - if (current_wal_from_timestamp && replica_commit >= *current_wal_from_timestamp) { - MG_ASSERT(current_wal_seq_num); - recovery_steps.emplace_back(RecoveryCurrentWal{*current_wal_seq_num}); - return recovery_steps; - } - - // Without the finalized WAL containing the current timestamp of replica, - // we cannot know if the difference is only in the current WAL or we need - // to send the snapshot. - if (latest_snapshot) { - const auto lock_success = locker_acc.AddPath(latest_snapshot->path); - MG_ASSERT(!lock_success.HasError(), "Tried to lock a nonexistant path."); - recovery_steps.emplace_back(std::in_place_type_t{}, std::move(latest_snapshot->path)); - } - // if there are no finalized WAL files, snapshot left the current WAL - // as the WAL file containing a transaction before snapshot creation - // so we can be sure that the current WAL is present - MG_ASSERT(current_wal_seq_num); - recovery_steps.emplace_back(RecoveryCurrentWal{*current_wal_seq_num}); - return recovery_steps; - } - - // Find the longest chain of WALs for recovery. - // The chain consists ONLY of sequential WALs. - auto rwal_it = wal_files->rbegin(); - - // if the last finalized WAL is before the replica commit - // then we can recovery only from current WAL - if (rwal_it->to_timestamp <= replica_commit) { - MG_ASSERT(current_wal_seq_num); - recovery_steps.emplace_back(RecoveryCurrentWal{*current_wal_seq_num}); - return recovery_steps; - } - - uint64_t previous_seq_num{rwal_it->seq_num}; - for (; rwal_it != wal_files->rend(); ++rwal_it) { - // If the difference between two consecutive wal files is not 0 or 1 - // we have a missing WAL in our chain - if (previous_seq_num - rwal_it->seq_num > 1) { - break; - } - - // Find first WAL that contains up to replica commit, i.e. WAL - // that is before the replica commit or conatins the replica commit - // as the last committed transaction OR we managed to find the first WAL - // file. - if (replica_commit >= rwal_it->from_timestamp || rwal_it->seq_num == 0) { - if (replica_commit >= rwal_it->to_timestamp) { - // We want the WAL after because the replica already contains all the - // commits from this WAL - --rwal_it; - } - std::vector wal_chain; - auto distance_from_first = std::distance(rwal_it, wal_files->rend() - 1); - // We have managed to create WAL chain - // We need to lock these files and add them to the chain - for (auto result_wal_it = wal_files->begin() + distance_from_first; result_wal_it != wal_files->end(); - ++result_wal_it) { - const auto lock_success = locker_acc.AddPath(result_wal_it->path); - MG_ASSERT(!lock_success.HasError(), "Tried to lock a nonexistant path."); - wal_chain.push_back(std::move(result_wal_it->path)); - } - - recovery_steps.emplace_back(std::in_place_type_t{}, std::move(wal_chain)); - - if (current_wal_seq_num) { - recovery_steps.emplace_back(RecoveryCurrentWal{*current_wal_seq_num}); - } - return recovery_steps; - } - - previous_seq_num = rwal_it->seq_num; - } - - MG_ASSERT(latest_snapshot, "Invalid durability state, missing snapshot"); - // We didn't manage to find a WAL chain, we need to send the latest snapshot - // with its WALs - const auto lock_success = locker_acc.AddPath(latest_snapshot->path); - MG_ASSERT(!lock_success.HasError(), "Tried to lock a nonexistant path."); - recovery_steps.emplace_back(std::in_place_type_t{}, std::move(latest_snapshot->path)); - - std::vector recovery_wal_files; - auto wal_it = wal_files->begin(); - for (; wal_it != wal_files->end(); ++wal_it) { - // Assuming recovery process is correct the snashpot should - // always retain a single WAL that contains a transaction - // before its creation - if (latest_snapshot->start_timestamp < wal_it->to_timestamp) { - if (latest_snapshot->start_timestamp < wal_it->from_timestamp) { - MG_ASSERT(wal_it != wal_files->begin(), "Invalid durability files state"); - --wal_it; - } - break; - } - } - - for (; wal_it != wal_files->end(); ++wal_it) { - const auto lock_success = locker_acc.AddPath(wal_it->path); - MG_ASSERT(!lock_success.HasError(), "Tried to lock a nonexistant path."); - recovery_wal_files.push_back(std::move(wal_it->path)); - } - - // We only have a WAL before the snapshot - if (recovery_wal_files.empty()) { - const auto lock_success = locker_acc.AddPath(wal_files->back().path); - MG_ASSERT(!lock_success.HasError(), "Tried to lock a nonexistant path."); - recovery_wal_files.push_back(std::move(wal_files->back().path)); - } - - recovery_steps.emplace_back(std::in_place_type_t{}, std::move(recovery_wal_files)); - - if (current_wal_seq_num) { - recovery_steps.emplace_back(RecoveryCurrentWal{*current_wal_seq_num}); - } - - return recovery_steps; -} - -} // namespace memgraph::storage diff --git a/src/storage/v2/inmemory/replication/replication_client.hpp b/src/storage/v2/inmemory/replication/replication_client.hpp deleted file mode 100644 index e956838e7..000000000 --- a/src/storage/v2/inmemory/replication/replication_client.hpp +++ /dev/null @@ -1,38 +0,0 @@ -// Copyright 2023 Memgraph Ltd. -// -// Use of this software is governed by the Business Source License -// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source -// License, and you may not use this file except in compliance with the Business Source License. -// -// As of the Change Date specified in that file, in accordance with -// the Business Source License, use of this software will be governed -// by the Apache License, Version 2.0, included in the file -// licenses/APL.txt. -#pragma once - -#include "storage/v2/replication/replication_client.hpp" - -namespace memgraph::storage { - -class InMemoryStorage; - -class InMemoryReplicationClient : public ReplicationClient { - public: - InMemoryReplicationClient(InMemoryStorage *storage, const memgraph::replication::ReplicationClientConfig &config, - const memgraph::replication::ReplicationEpoch *epoch); - - protected: - void RecoverReplica(uint64_t replica_commit) override; - - // TODO: move the GetRecoverySteps stuff below as an internal detail - using RecoverySnapshot = std::filesystem::path; - using RecoveryWals = std::vector; - struct RecoveryCurrentWal { - explicit RecoveryCurrentWal(const uint64_t current_wal_seq_num) : current_wal_seq_num(current_wal_seq_num) {} - uint64_t current_wal_seq_num; - }; - using RecoveryStep = std::variant; - std::vector GetRecoverySteps(uint64_t replica_commit, utils::FileRetainer::FileLocker *file_locker); -}; - -} // namespace memgraph::storage diff --git a/src/storage/v2/inmemory/storage.cpp b/src/storage/v2/inmemory/storage.cpp index 9f00081f6..c27018d24 100644 --- a/src/storage/v2/inmemory/storage.cpp +++ b/src/storage/v2/inmemory/storage.cpp @@ -18,7 +18,7 @@ /// REPLICATION /// #include "dbms/inmemory/replication_handlers.hpp" -#include "storage/v2/inmemory/replication/replication_client.hpp" +#include "storage/v2/inmemory/replication/recovery.hpp" #include "storage/v2/inmemory/unique_constraints.hpp" #include "utils/resource_lock.hpp" #include "utils/stat.hpp" @@ -1608,7 +1608,7 @@ bool InMemoryStorage::AppendToWalDataManipulation(const Transaction &transaction // A single transaction will always be contained in a single WAL file. auto current_commit_timestamp = transaction.commit_timestamp->load(std::memory_order_acquire); - repl_storage_state_.InitializeTransaction(wal_file_->SequenceNumber()); + repl_storage_state_.InitializeTransaction(wal_file_->SequenceNumber(), this); auto append_deltas = [&](auto callback) { // Helper lambda that traverses the delta chain on order to find the first @@ -1767,7 +1767,7 @@ bool InMemoryStorage::AppendToWalDataManipulation(const Transaction &transaction wal_file_->AppendTransactionEnd(final_commit_timestamp); FinalizeWalFile(); - return repl_storage_state_.FinalizeTransaction(final_commit_timestamp); + return repl_storage_state_.FinalizeTransaction(final_commit_timestamp, this); } bool InMemoryStorage::AppendToWalDataDefinition(const Transaction &transaction, uint64_t final_commit_timestamp) { @@ -1775,7 +1775,7 @@ bool InMemoryStorage::AppendToWalDataDefinition(const Transaction &transaction, return true; } - repl_storage_state_.InitializeTransaction(wal_file_->SequenceNumber()); + repl_storage_state_.InitializeTransaction(wal_file_->SequenceNumber(), this); for (const auto &md_delta : transaction.md_deltas) { switch (md_delta.action) { @@ -1846,7 +1846,7 @@ bool InMemoryStorage::AppendToWalDataDefinition(const Transaction &transaction, wal_file_->AppendTransactionEnd(final_commit_timestamp); FinalizeWalFile(); - return repl_storage_state_.FinalizeTransaction(final_commit_timestamp); + return repl_storage_state_.FinalizeTransaction(final_commit_timestamp, this); } void InMemoryStorage::AppendToWalDataDefinition(durability::StorageMetadataOperation operation, LabelId label, @@ -1972,12 +1972,6 @@ utils::FileRetainer::FileLockerAccessor::ret_type InMemoryStorage::UnlockPath() return true; } -auto InMemoryStorage::CreateReplicationClient(const memgraph::replication::ReplicationClientConfig &config, - const memgraph::replication::ReplicationEpoch *current_epoch) - -> std::unique_ptr { - return std::make_unique(this, config, current_epoch); -} - std::unique_ptr InMemoryStorage::Access(std::optional override_isolation_level, bool is_main) { return std::unique_ptr(new InMemoryAccessor{Storage::Accessor::shared_access, this, diff --git a/src/storage/v2/inmemory/storage.hpp b/src/storage/v2/inmemory/storage.hpp index bfb445332..3c50326b2 100644 --- a/src/storage/v2/inmemory/storage.hpp +++ b/src/storage/v2/inmemory/storage.hpp @@ -18,10 +18,13 @@ #include "storage/v2/indices/label_index_stats.hpp" #include "storage/v2/inmemory/label_index.hpp" #include "storage/v2/inmemory/label_property_index.hpp" +#include "storage/v2/inmemory/replication/recovery.hpp" +#include "storage/v2/replication/replication_client.hpp" #include "storage/v2/storage.hpp" /// REPLICATION /// #include "replication/config.hpp" +#include "storage/v2/inmemory/replication/recovery.hpp" #include "storage/v2/replication/enums.hpp" #include "storage/v2/replication/replication_storage_state.hpp" #include "storage/v2/replication/rpc.hpp" @@ -44,7 +47,10 @@ namespace memgraph::storage { class InMemoryStorage final : public Storage { friend class memgraph::dbms::InMemoryReplicationHandlers; - friend class InMemoryReplicationClient; + friend class ReplicationStorageClient; + friend std::vector GetRecoverySteps(uint64_t replica_commit, + utils::FileRetainer::FileLocker *file_locker, + const InMemoryStorage *storage); public: enum class CreateSnapshotError : uint8_t { DisabledForReplica, ReachedMaxNumTries }; @@ -332,10 +338,6 @@ class InMemoryStorage final : public Storage { using Storage::CreateTransaction; Transaction CreateTransaction(IsolationLevel isolation_level, StorageMode storage_mode, bool is_main) override; - auto CreateReplicationClient(const memgraph::replication::ReplicationClientConfig &config, - const memgraph::replication::ReplicationEpoch *current_epoch) - -> std::unique_ptr override; - void SetStorageMode(StorageMode storage_mode); private: diff --git a/src/storage/v2/replication/enums.hpp b/src/storage/v2/replication/enums.hpp index e89a1fdd3..be16ca192 100644 --- a/src/storage/v2/replication/enums.hpp +++ b/src/storage/v2/replication/enums.hpp @@ -14,6 +14,6 @@ namespace memgraph::storage::replication { -enum class ReplicaState : std::uint8_t { READY, REPLICATING, RECOVERY, INVALID }; +enum class ReplicaState : std::uint8_t { READY, REPLICATING, RECOVERY, MAYBE_BEHIND }; } // namespace memgraph::storage::replication diff --git a/src/storage/v2/replication/recovery.hpp b/src/storage/v2/replication/recovery.hpp new file mode 100644 index 000000000..346e03ecd --- /dev/null +++ b/src/storage/v2/replication/recovery.hpp @@ -0,0 +1,28 @@ +// Copyright 2023 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +#pragma once + +#include +#include +#include + +namespace memgraph::storage { + +using RecoverySnapshot = std::filesystem::path; +using RecoveryWals = std::vector; +struct RecoveryCurrentWal { + explicit RecoveryCurrentWal(const uint64_t current_wal_seq_num) : current_wal_seq_num(current_wal_seq_num) {} + uint64_t current_wal_seq_num; +}; +using RecoveryStep = std::variant; + +} // namespace memgraph::storage diff --git a/src/storage/v2/replication/replication_client.cpp b/src/storage/v2/replication/replication_client.cpp index 33313b130..3bc1b3d32 100644 --- a/src/storage/v2/replication/replication_client.cpp +++ b/src/storage/v2/replication/replication_client.cpp @@ -9,53 +9,37 @@ // by the Apache License, Version 2.0, included in the file // licenses/APL.txt. -#include "storage/v2/replication/replication_client.hpp" +#include "replication/replication_client.hpp" +#include "storage/v2/durability/durability.hpp" +#include "storage/v2/inmemory/storage.hpp" +#include "storage/v2/storage.hpp" +#include "utils/exceptions.hpp" +#include "utils/variant_helpers.hpp" #include #include -#include "storage/v2/durability/durability.hpp" -#include "storage/v2/storage.hpp" +namespace { +template +[[maybe_unused]] inline constexpr bool always_false_v = false; +} // namespace namespace memgraph::storage { -static auto CreateClientContext(const memgraph::replication::ReplicationClientConfig &config) - -> communication::ClientContext { - return (config.ssl) ? communication::ClientContext{config.ssl->key_file, config.ssl->cert_file} - : communication::ClientContext{}; -} +ReplicationStorageClient::ReplicationStorageClient(::memgraph::replication::ReplicationClient &client) + : client_{client} {} -ReplicationClient::ReplicationClient(Storage *storage, const memgraph::replication::ReplicationClientConfig &config, - const memgraph::replication::ReplicationEpoch *epoch) - : name_{config.name}, - rpc_context_{CreateClientContext(config)}, - rpc_client_{io::network::Endpoint(io::network::Endpoint::needs_resolving, config.ip_address, config.port), - &rpc_context_}, - replica_check_frequency_{config.replica_check_frequency}, - mode_{config.mode}, - storage_{storage}, - repl_epoch_{epoch} {} - -ReplicationClient::~ReplicationClient() { - auto endpoint = rpc_client_.Endpoint(); - spdlog::trace("Closing replication client on {}:{}", endpoint.address, endpoint.port); - thread_pool_.Shutdown(); -} - -uint64_t ReplicationClient::LastCommitTimestamp() const { - return storage_->repl_storage_state_.last_commit_timestamp_.load(); -} - -void ReplicationClient::InitializeClient() { +void ReplicationStorageClient::CheckReplicaState(Storage *storage) { uint64_t current_commit_timestamp{kTimestampInitialId}; - auto stream{rpc_client_.Stream( - storage_->id(), storage_->repl_storage_state_.last_commit_timestamp_, std::string{repl_epoch_->id()})}; + auto &replStorageState = storage->repl_storage_state_; + auto stream{client_.rpc_client_.Stream( + storage->id(), replStorageState.last_commit_timestamp_, std::string{replStorageState.epoch_.id()})}; const auto replica = stream.AwaitResponse(); std::optional branching_point; - if (replica.epoch_id != repl_epoch_->id() && replica.current_commit_timestamp != kTimestampInitialId) { - auto const &history = storage_->repl_storage_state_.history; + if (replica.epoch_id != replStorageState.epoch_.id() && replica.current_commit_timestamp != kTimestampInitialId) { + auto const &history = replStorageState.history; const auto epoch_info_iter = std::find_if(history.crbegin(), history.crend(), [&](const auto &main_epoch_info) { return main_epoch_info.first == replica.epoch_id; }); @@ -71,94 +55,86 @@ void ReplicationClient::InitializeClient() { "Replica {} acted as the Main instance. Both the Main and Replica {} " "now hold unique data. Please resolve data conflicts and start the " "replication on a clean instance.", - name_, name_, name_); + client_.name_, client_.name_, client_.name_); + // State not updated, hence in MAYBE_BEHIND state return; } current_commit_timestamp = replica.current_commit_timestamp; - spdlog::trace("Current timestamp on replica {}: {}", name_, current_commit_timestamp); - spdlog::trace("Current timestamp on main: {}", storage_->repl_storage_state_.last_commit_timestamp_.load()); - if (current_commit_timestamp == storage_->repl_storage_state_.last_commit_timestamp_.load()) { - spdlog::debug("Replica '{}' up to date", name_); - std::unique_lock client_guard{client_lock_}; - replica_state_.store(replication::ReplicaState::READY); - } else { - spdlog::debug("Replica '{}' is behind", name_); - { - std::unique_lock client_guard{client_lock_}; - replica_state_.store(replication::ReplicaState::RECOVERY); + spdlog::trace("Current timestamp on replica {}: {}", client_.name_, current_commit_timestamp); + spdlog::trace("Current timestamp on main: {}", replStorageState.last_commit_timestamp_.load()); + replica_state_.WithLock([&](auto &state) { + if (current_commit_timestamp == replStorageState.last_commit_timestamp_.load()) { + spdlog::debug("Replica '{}' up to date", client_.name_); + state = replication::ReplicaState::READY; + } else { + spdlog::debug("Replica '{}' is behind", client_.name_); + state = replication::ReplicaState::RECOVERY; + client_.thread_pool_.AddTask( + [storage, current_commit_timestamp, this] { this->RecoverReplica(current_commit_timestamp, storage); }); } - thread_pool_.AddTask([=, this] { this->RecoverReplica(current_commit_timestamp); }); - } + }); } -TimestampInfo ReplicationClient::GetTimestampInfo() { +TimestampInfo ReplicationStorageClient::GetTimestampInfo(Storage const *storage) { TimestampInfo info; info.current_timestamp_of_replica = 0; info.current_number_of_timestamp_behind_master = 0; try { - auto stream{rpc_client_.Stream(storage_->id())}; + auto stream{client_.rpc_client_.Stream(storage->id())}; const auto response = stream.AwaitResponse(); const auto is_success = response.success; if (!is_success) { - replica_state_.store(replication::ReplicaState::INVALID); - HandleRpcFailure(); + replica_state_.WithLock([](auto &val) { val = replication::ReplicaState::MAYBE_BEHIND; }); + LogRpcFailure(); } - auto main_time_stamp = storage_->repl_storage_state_.last_commit_timestamp_.load(); + auto main_time_stamp = storage->repl_storage_state_.last_commit_timestamp_.load(); info.current_timestamp_of_replica = response.current_commit_timestamp; info.current_number_of_timestamp_behind_master = response.current_commit_timestamp - main_time_stamp; } catch (const rpc::RpcFailedException &) { - { - std::unique_lock client_guard(client_lock_); - replica_state_.store(replication::ReplicaState::INVALID); - } - HandleRpcFailure(); // mutex already unlocked, if the new enqueued task dispatches immediately it probably won't - // block + replica_state_.WithLock([](auto &val) { val = replication::ReplicaState::MAYBE_BEHIND; }); + LogRpcFailure(); // mutex already unlocked, if the new enqueued task dispatches immediately it probably + // won't block } return info; } -void ReplicationClient::HandleRpcFailure() { - spdlog::error(utils::MessageWithLink("Couldn't replicate data to {}.", name_, "https://memgr.ph/replication")); - TryInitializeClientAsync(); +void ReplicationStorageClient::LogRpcFailure() { + spdlog::error( + utils::MessageWithLink("Couldn't replicate data to {}.", client_.name_, "https://memgr.ph/replication")); } -void ReplicationClient::TryInitializeClientAsync() { - thread_pool_.AddTask([this] { - rpc_client_.Abort(); - this->TryInitializeClientSync(); - }); +void ReplicationStorageClient::TryCheckReplicaStateAsync(Storage *storage) { + client_.thread_pool_.AddTask([storage, this] { this->TryCheckReplicaStateSync(storage); }); } -void ReplicationClient::TryInitializeClientSync() { +void ReplicationStorageClient::TryCheckReplicaStateSync(Storage *storage) { try { - InitializeClient(); + CheckReplicaState(storage); } catch (const rpc::VersionMismatchRpcFailedException &) { - std::unique_lock client_guard{client_lock_}; - replica_state_.store(replication::ReplicaState::INVALID); + replica_state_.WithLock([](auto &val) { val = replication::ReplicaState::MAYBE_BEHIND; }); spdlog::error( utils::MessageWithLink("Failed to connect to replica {} at the endpoint {}. Because the replica " "deployed is not a compatible version.", - name_, rpc_client_.Endpoint(), "https://memgr.ph/replication")); + client_.name_, client_.rpc_client_.Endpoint(), "https://memgr.ph/replication")); } catch (const rpc::RpcFailedException &) { - std::unique_lock client_guard{client_lock_}; - replica_state_.store(replication::ReplicaState::INVALID); - spdlog::error(utils::MessageWithLink("Failed to connect to replica {} at the endpoint {}.", name_, - rpc_client_.Endpoint(), "https://memgr.ph/replication")); + replica_state_.WithLock([](auto &val) { val = replication::ReplicaState::MAYBE_BEHIND; }); + spdlog::error(utils::MessageWithLink("Failed to connect to replica {} at the endpoint {}.", client_.name_, + client_.rpc_client_.Endpoint(), "https://memgr.ph/replication")); } } -void ReplicationClient::StartTransactionReplication(const uint64_t current_wal_seq_num) { - std::unique_lock guard(client_lock_); - const auto status = replica_state_.load(); - switch (status) { - case replication::ReplicaState::RECOVERY: - spdlog::debug("Replica {} is behind MAIN instance", name_); +void ReplicationStorageClient::StartTransactionReplication(const uint64_t current_wal_seq_num, Storage *storage) { + auto locked_state = replica_state_.Lock(); + switch (*locked_state) { + using enum replication::ReplicaState; + case RECOVERY: + spdlog::debug("Replica {} is behind MAIN instance", client_.name_); return; - case replication::ReplicaState::REPLICATING: - spdlog::debug("Replica {} missed a transaction", name_); + case REPLICATING: + spdlog::debug("Replica {} missed a transaction", client_.name_); // We missed a transaction because we're still replicating // the previous transaction so we need to go to RECOVERY // state to catch up with the missing transaction @@ -166,143 +142,169 @@ void ReplicationClient::StartTransactionReplication(const uint64_t current_wal_s // an error can happen while we're replicating the previous // transaction after which the client should go to // INVALID state before starting the recovery process - replica_state_.store(replication::ReplicaState::RECOVERY); + // + // This is a signal to any async streams that are still finalizing to start recovery, since this commit will be + // missed. + *locked_state = RECOVERY; return; - case replication::ReplicaState::INVALID: - HandleRpcFailure(); + case MAYBE_BEHIND: + spdlog::error( + utils::MessageWithLink("Couldn't replicate data to {}.", client_.name_, "https://memgr.ph/replication")); + TryCheckReplicaStateAsync(storage); return; - case replication::ReplicaState::READY: + case READY: MG_ASSERT(!replica_stream_); try { - replica_stream_.emplace( - ReplicaStream{this, storage_->repl_storage_state_.last_commit_timestamp_.load(), current_wal_seq_num}); - replica_state_.store(replication::ReplicaState::REPLICATING); + replica_stream_.emplace(storage, client_.rpc_client_, current_wal_seq_num); + *locked_state = REPLICATING; } catch (const rpc::RpcFailedException &) { - replica_state_.store(replication::ReplicaState::INVALID); - HandleRpcFailure(); + *locked_state = MAYBE_BEHIND; + LogRpcFailure(); } return; } } -bool ReplicationClient::FinalizeTransactionReplication() { +bool ReplicationStorageClient::FinalizeTransactionReplication(Storage *storage) { // We can only check the state because it guarantees to be only // valid during a single transaction replication (if the assumption // that this and other transaction replication functions can only be // called from a one thread stands) - if (replica_state_ != replication::ReplicaState::REPLICATING) { + if (State() != replication::ReplicaState::REPLICATING) { return false; } - auto task = [this]() { + if (replica_stream_->IsDefunct()) return false; + + auto task = [storage, this]() { MG_ASSERT(replica_stream_, "Missing stream for transaction deltas"); try { auto response = replica_stream_->Finalize(); - replica_stream_.reset(); - std::unique_lock client_guard(client_lock_); - if (!response.success || replica_state_ == replication::ReplicaState::RECOVERY) { - replica_state_.store(replication::ReplicaState::RECOVERY); - thread_pool_.AddTask([&, this] { this->RecoverReplica(response.current_commit_timestamp); }); - } else { - replica_state_.store(replication::ReplicaState::READY); + return replica_state_.WithLock([storage, &response, this](auto &state) { + replica_stream_.reset(); + if (!response.success || state == replication::ReplicaState::RECOVERY) { + state = replication::ReplicaState::RECOVERY; + client_.thread_pool_.AddTask( + [storage, &response, this] { this->RecoverReplica(response.current_commit_timestamp, storage); }); + return false; + } + state = replication::ReplicaState::READY; return true; - } + }); } catch (const rpc::RpcFailedException &) { - replica_stream_.reset(); - { - std::unique_lock client_guard(client_lock_); - replica_state_.store(replication::ReplicaState::INVALID); - } - HandleRpcFailure(); + replica_state_.WithLock([this](auto &state) { + replica_stream_.reset(); + state = replication::ReplicaState::MAYBE_BEHIND; + }); + LogRpcFailure(); + return false; } - return false; }; - if (mode_ == memgraph::replication::ReplicationMode::ASYNC) { - thread_pool_.AddTask([=] { (void)task(); }); + if (client_.mode_ == memgraph::replication::ReplicationMode::ASYNC) { + client_.thread_pool_.AddTask([task = std::move(task)] { (void)task(); }); return true; } return task(); } -void ReplicationClient::FrequentCheck() { - const auto is_success = std::invoke([this]() { - try { - auto stream{rpc_client_.Stream()}; - const auto response = stream.AwaitResponse(); - return response.success; - } catch (const rpc::RpcFailedException &) { - return false; - } - }); - // States: READY, REPLICATING, RECOVERY, INVALID - // If success && ready, replicating, recovery -> stay the same because something good is going on. - // If success && INVALID -> [it's possible that replica came back to life] -> TryInitializeClient. - // If fail -> [replica is not reachable at all] -> INVALID state. - // NOTE: TryInitializeClient might return nothing if there is a branching point. - // NOTE: The early return pattern simplified the code, but the behavior should be as explained. - if (!is_success) { - replica_state_.store(replication::ReplicaState::INVALID); - return; - } - if (replica_state_.load() == replication::ReplicaState::INVALID) { - TryInitializeClientAsync(); - } +void ReplicationStorageClient::Start(Storage *storage) { + spdlog::trace("Replication client started for database \"{}\"", storage->id()); + TryCheckReplicaStateSync(storage); } -void ReplicationClient::Start() { - auto const &endpoint = rpc_client_.Endpoint(); - spdlog::trace("Replication client started at: {}:{}", endpoint.address, endpoint.port); - - TryInitializeClientSync(); - - // Help the user to get the most accurate replica state possible. - if (replica_check_frequency_ > std::chrono::seconds(0)) { - replica_checker_.Run("Replica Checker", replica_check_frequency_, [this] { this->FrequentCheck(); }); +void ReplicationStorageClient::RecoverReplica(uint64_t replica_commit, memgraph::storage::Storage *storage) { + if (storage->storage_mode_ != StorageMode::IN_MEMORY_TRANSACTIONAL) { + throw utils::BasicException("Only InMemoryTransactional mode supports replication!"); } -} + spdlog::debug("Starting replica recovery"); + auto *mem_storage = static_cast(storage); -void ReplicationClient::IfStreamingTransaction(const std::function &callback) { - // We can only check the state because it guarantees to be only - // valid during a single transaction replication (if the assumption - // that this and other transaction replication functions can only be - // called from a one thread stands) - if (replica_state_ != replication::ReplicaState::REPLICATING) { - return; - } + while (true) { + auto file_locker = mem_storage->file_retainer_.AddLocker(); - try { - callback(*replica_stream_); - } catch (const rpc::RpcFailedException &) { - { - std::unique_lock client_guard{client_lock_}; - replica_state_.store(replication::ReplicaState::INVALID); + const auto steps = GetRecoverySteps(replica_commit, &file_locker, mem_storage); + int i = 0; + for (const RecoveryStep &recovery_step : steps) { + spdlog::trace("Recovering in step: {}", i++); + try { + rpc::Client &rpcClient = client_.rpc_client_; + std::visit(utils::Overloaded{ + [&replica_commit, mem_storage, &rpcClient](RecoverySnapshot const &snapshot) { + spdlog::debug("Sending the latest snapshot file: {}", snapshot); + auto response = TransferSnapshot(mem_storage->id(), rpcClient, snapshot); + replica_commit = response.current_commit_timestamp; + }, + [&replica_commit, mem_storage, &rpcClient](RecoveryWals const &wals) { + spdlog::debug("Sending the latest wal files"); + auto response = TransferWalFiles(mem_storage->id(), rpcClient, wals); + replica_commit = response.current_commit_timestamp; + spdlog::debug("Wal files successfully transferred."); + }, + [&replica_commit, mem_storage, &rpcClient](RecoveryCurrentWal const ¤t_wal) { + std::unique_lock transaction_guard(mem_storage->engine_lock_); + if (mem_storage->wal_file_ && + mem_storage->wal_file_->SequenceNumber() == current_wal.current_wal_seq_num) { + mem_storage->wal_file_->DisableFlushing(); + transaction_guard.unlock(); + spdlog::debug("Sending current wal file"); + replica_commit = ReplicateCurrentWal(mem_storage, rpcClient, *mem_storage->wal_file_); + mem_storage->wal_file_->EnableFlushing(); + } else { + spdlog::debug("Cannot recover using current wal file"); + } + }, + [](auto const &in) { + static_assert(always_false_v, "Missing type from variant visitor"); + }, + }, + recovery_step); + } catch (const rpc::RpcFailedException &) { + replica_state_.WithLock([](auto &val) { val = replication::ReplicaState::MAYBE_BEHIND; }); + LogRpcFailure(); + return; + } + } + + spdlog::trace("Current timestamp on replica: {}", replica_commit); + // To avoid the situation where we read a correct commit timestamp in + // one thread, and after that another thread commits a different a + // transaction and THEN we set the state to READY in the first thread, + // we set this lock before checking the timestamp. + // We will detect that the state is invalid during the next commit, + // because replication::AppendDeltasRpc sends the last commit timestamp which + // replica checks if it's the same last commit timestamp it received + // and we will go to recovery. + // By adding this lock, we can avoid that, and go to RECOVERY immediately. + const auto last_commit_timestamp = storage->repl_storage_state_.last_commit_timestamp_.load(); + SPDLOG_INFO("Replica timestamp: {}", replica_commit); + SPDLOG_INFO("Last commit: {}", last_commit_timestamp); + if (last_commit_timestamp == replica_commit) { + replica_state_.WithLock([](auto &val) { val = replication::ReplicaState::READY; }); + return; } - HandleRpcFailure(); } } ////// ReplicaStream ////// -ReplicaStream::ReplicaStream(ReplicationClient *self, const uint64_t previous_commit_timestamp, - const uint64_t current_seq_num) - : self_(self), - stream_(self_->rpc_client_.Stream(self->GetStorageId(), previous_commit_timestamp, - current_seq_num)) { +ReplicaStream::ReplicaStream(Storage *storage, rpc::Client &rpc_client, const uint64_t current_seq_num) + : storage_{storage}, + stream_(rpc_client.Stream( + storage->id(), storage->repl_storage_state_.last_commit_timestamp_.load(), current_seq_num)) { replication::Encoder encoder{stream_.GetBuilder()}; - - encoder.WriteString(self->repl_epoch_->id()); + encoder.WriteString(storage->repl_storage_state_.epoch_.id()); } void ReplicaStream::AppendDelta(const Delta &delta, const Vertex &vertex, uint64_t final_commit_timestamp) { replication::Encoder encoder(stream_.GetBuilder()); - auto *storage = self_->GetStorage(); - EncodeDelta(&encoder, storage->name_id_mapper_.get(), storage->config_.items, delta, vertex, final_commit_timestamp); + EncodeDelta(&encoder, storage_->name_id_mapper_.get(), storage_->config_.items, delta, vertex, + final_commit_timestamp); } void ReplicaStream::AppendDelta(const Delta &delta, const Edge &edge, uint64_t final_commit_timestamp) { replication::Encoder encoder(stream_.GetBuilder()); - EncodeDelta(&encoder, self_->GetStorage()->name_id_mapper_.get(), delta, edge, final_commit_timestamp); + EncodeDelta(&encoder, storage_->name_id_mapper_.get(), delta, edge, final_commit_timestamp); } void ReplicaStream::AppendTransactionEnd(uint64_t final_commit_timestamp) { @@ -314,11 +316,10 @@ void ReplicaStream::AppendOperation(durability::StorageMetadataOperation operati const std::set &properties, const LabelIndexStats &stats, const LabelPropertyIndexStats &property_stats, uint64_t timestamp) { replication::Encoder encoder(stream_.GetBuilder()); - EncodeOperation(&encoder, self_->GetStorage()->name_id_mapper_.get(), operation, label, properties, stats, - property_stats, timestamp); + EncodeOperation(&encoder, storage_->name_id_mapper_.get(), operation, label, properties, stats, property_stats, + timestamp); } replication::AppendDeltasRes ReplicaStream::Finalize() { return stream_.AwaitResponse(); } -auto ReplicationClient::GetStorageId() const -> std::string { return storage_->id(); } } // namespace memgraph::storage diff --git a/src/storage/v2/replication/replication_client.hpp b/src/storage/v2/replication/replication_client.hpp index 8cd8cb384..3d2c019e9 100644 --- a/src/storage/v2/replication/replication_client.hpp +++ b/src/storage/v2/replication/replication_client.hpp @@ -13,6 +13,8 @@ #include "replication/config.hpp" #include "replication/epoch.hpp" +#include "replication/messages.hpp" +#include "replication/replication_client.hpp" #include "rpc/client.hpp" #include "storage/v2/durability/storage_global_operation.hpp" #include "storage/v2/id_types.hpp" @@ -23,9 +25,12 @@ #include "storage/v2/replication/rpc.hpp" #include "utils/file_locker.hpp" #include "utils/scheduler.hpp" +#include "utils/synchronized.hpp" #include "utils/thread_pool.hpp" #include +#include +#include #include #include #include @@ -37,12 +42,12 @@ struct Delta; struct Vertex; struct Edge; class Storage; -class ReplicationClient; +class ReplicationStorageClient; // Handler used for transferring the current transaction. class ReplicaStream { public: - explicit ReplicaStream(ReplicationClient *self, uint64_t previous_commit_timestamp, uint64_t current_seq_num); + explicit ReplicaStream(Storage *storage, rpc::Client &rpc_client, uint64_t current_seq_num); /// @throw rpc::RpcFailedException void AppendDelta(const Delta &delta, const Vertex &vertex, uint64_t final_commit_timestamp); @@ -61,85 +66,84 @@ class ReplicaStream { /// @throw rpc::RpcFailedException replication::AppendDeltasRes Finalize(); + bool IsDefunct() const { return stream_.IsDefunct(); } + private: - ReplicationClient *self_; + Storage *storage_; rpc::Client::StreamHandler stream_; }; -class ReplicationClient { - friend class CurrentWalHandler; +template +concept InvocableWithStream = std::invocable; + +// TODO Rename to something without the word "client" +class ReplicationStorageClient { + friend class InMemoryCurrentWalHandler; friend class ReplicaStream; + friend struct ::memgraph::replication::ReplicationClient; public: - ReplicationClient(Storage *storage, const memgraph::replication::ReplicationClientConfig &config, - const memgraph::replication::ReplicationEpoch *epoch); + explicit ReplicationStorageClient(::memgraph::replication::ReplicationClient &client); - ReplicationClient(ReplicationClient const &) = delete; - ReplicationClient &operator=(ReplicationClient const &) = delete; - ReplicationClient(ReplicationClient &&) noexcept = delete; - ReplicationClient &operator=(ReplicationClient &&) noexcept = delete; + ReplicationStorageClient(ReplicationStorageClient const &) = delete; + ReplicationStorageClient &operator=(ReplicationStorageClient const &) = delete; + ReplicationStorageClient(ReplicationStorageClient &&) noexcept = delete; + ReplicationStorageClient &operator=(ReplicationStorageClient &&) noexcept = delete; - virtual ~ReplicationClient(); + ~ReplicationStorageClient() = default; - auto Mode() const -> memgraph::replication::ReplicationMode { return mode_; } - auto Name() const -> std::string const & { return name_; } - auto Endpoint() const -> io::network::Endpoint const & { return rpc_client_.Endpoint(); } - auto State() const -> replication::ReplicaState { return replica_state_.load(); } - auto GetTimestampInfo() -> TimestampInfo; + // TODO Remove the client related functions + auto Mode() const -> memgraph::replication::ReplicationMode { return client_.mode_; } + auto Name() const -> std::string const & { return client_.name_; } + auto Endpoint() const -> io::network::Endpoint const & { return client_.rpc_client_.Endpoint(); } - auto GetStorageId() const -> std::string; + auto State() const -> replication::ReplicaState { return replica_state_.WithLock(std::identity()); } + auto GetTimestampInfo(Storage const *storage) -> TimestampInfo; + + void Start(Storage *storage); + void StartTransactionReplication(uint64_t current_wal_seq_num, Storage *storage); - void Start(); - void StartTransactionReplication(uint64_t current_wal_seq_num); // Replication clients can be removed at any point // so to avoid any complexity of checking if the client was removed whenever // we want to send part of transaction and to avoid adding some GC logic this // function will run a callback if, after previously callling // StartTransactionReplication, stream is created. - void IfStreamingTransaction(const std::function &callback); + template + void IfStreamingTransaction(F &&callback) { + // We can only check the state because it guarantees to be only + // valid during a single transaction replication (if the assumption + // that this and other transaction replication functions can only be + // called from a one thread stands) + if (State() != replication::ReplicaState::REPLICATING) { + return; + } + if (replica_stream_->IsDefunct()) return; + try { + callback(*replica_stream_); // failure state what if not streaming (std::nullopt) + } catch (const rpc::RpcFailedException &) { + return replica_state_.WithLock([](auto &state) { state = replication::ReplicaState::MAYBE_BEHIND; }); + LogRpcFailure(); + } + } + // Return whether the transaction could be finalized on the replication client or not. - [[nodiscard]] bool FinalizeTransactionReplication(); + [[nodiscard]] bool FinalizeTransactionReplication(Storage *storage); - protected: - virtual void RecoverReplica(uint64_t replica_commit) = 0; + void TryCheckReplicaStateAsync(Storage *storage); // TODO Move back to private + private: + void RecoverReplica(uint64_t replica_commit, memgraph::storage::Storage *storage); - auto GetStorage() -> Storage * { return storage_; } - auto LastCommitTimestamp() const -> uint64_t; - void InitializeClient(); - void HandleRpcFailure(); - void TryInitializeClientAsync(); - void TryInitializeClientSync(); - void FrequentCheck(); + void CheckReplicaState(Storage *storage); + void LogRpcFailure(); + void TryCheckReplicaStateSync(Storage *storage); + void FrequentCheck(Storage *storage); - std::string name_; - communication::ClientContext rpc_context_; - rpc::Client rpc_client_; - std::chrono::seconds replica_check_frequency_; - - std::optional replica_stream_; - memgraph::replication::ReplicationMode mode_{memgraph::replication::ReplicationMode::SYNC}; - - utils::SpinLock client_lock_; - // This thread pool is used for background tasks so we don't - // block the main storage thread - // We use only 1 thread for 2 reasons: - // - background tasks ALWAYS contain some kind of RPC communication. - // We can't have multiple RPC communication from a same client - // because that's not logically valid (e.g. you cannot send a snapshot - // and WAL at a same time because WAL will arrive earlier and be applied - // before the snapshot which is not correct) - // - the implementation is simplified as we have a total control of what - // this pool is executing. Also, we can simply queue multiple tasks - // and be sure of the execution order. - // Not having mulitple possible threads in the same client allows us - // to ignore concurrency problems inside the client. - utils::ThreadPool thread_pool_{1}; - std::atomic replica_state_{replication::ReplicaState::INVALID}; - - utils::Scheduler replica_checker_; - Storage *storage_; - - memgraph::replication::ReplicationEpoch const *repl_epoch_; + ::memgraph::replication::ReplicationClient &client_; + // TODO Do not store the stream, make is a local variable + std::optional + replica_stream_; // Currently active stream (nullopt if not in use), note: a single stream per rpc client + mutable utils::Synchronized replica_state_{ + replication::ReplicaState::MAYBE_BEHIND}; }; } // namespace memgraph::storage diff --git a/src/storage/v2/replication/replication_storage_state.cpp b/src/storage/v2/replication/replication_storage_state.cpp index 1cd0bec09..a443c7171 100644 --- a/src/storage/v2/replication/replication_storage_state.cpp +++ b/src/storage/v2/replication/replication_storage_state.cpp @@ -16,10 +16,10 @@ namespace memgraph::storage { -void ReplicationStorageState::InitializeTransaction(uint64_t seq_num) { - replication_clients_.WithLock([&](auto &clients) { +void ReplicationStorageState::InitializeTransaction(uint64_t seq_num, Storage *storage) { + replication_clients_.WithLock([=](auto &clients) { for (auto &client : clients) { - client->StartTransactionReplication(seq_num); + client->StartTransactionReplication(seq_num, storage); } }); } @@ -52,12 +52,12 @@ void ReplicationStorageState::AppendOperation(durability::StorageMetadataOperati }); } -bool ReplicationStorageState::FinalizeTransaction(uint64_t timestamp) { +bool ReplicationStorageState::FinalizeTransaction(uint64_t timestamp, Storage *storage) { return replication_clients_.WithLock([=](auto &clients) { bool finalized_on_all_replicas = true; for (ReplicationClientPtr &client : clients) { client->IfStreamingTransaction([&](auto &stream) { stream.AppendTransactionEnd(timestamp); }); - const auto finalized = client->FinalizeTransactionReplication(); + const auto finalized = client->FinalizeTransactionReplication(storage); if (client->Mode() == memgraph::replication::ReplicationMode::SYNC) { finalized_on_all_replicas = finalized && finalized_on_all_replicas; @@ -78,12 +78,12 @@ std::optional ReplicationStorageState::GetReplicaStat }); } -std::vector ReplicationStorageState::ReplicasInfo() const { - return replication_clients_.WithReadLock([](auto const &clients) { +std::vector ReplicationStorageState::ReplicasInfo(const Storage *storage) const { + return replication_clients_.WithReadLock([storage](auto const &clients) { std::vector replica_infos; replica_infos.reserve(clients.size()); - auto const asReplicaInfo = [](ReplicationClientPtr const &client) -> ReplicaInfo { - return {client->Name(), client->Mode(), client->Endpoint(), client->State(), client->GetTimestampInfo()}; + auto const asReplicaInfo = [storage](ReplicationClientPtr const &client) -> ReplicaInfo { + return {client->Name(), client->Mode(), client->Endpoint(), client->State(), client->GetTimestampInfo(storage)}; }; std::transform(clients.begin(), clients.end(), std::back_inserter(replica_infos), asReplicaInfo); return replica_infos; diff --git a/src/storage/v2/replication/replication_storage_state.hpp b/src/storage/v2/replication/replication_storage_state.hpp index afedb3950..e3d6b94a0 100644 --- a/src/storage/v2/replication/replication_storage_state.hpp +++ b/src/storage/v2/replication/replication_storage_state.hpp @@ -12,11 +12,13 @@ #pragma once #include +#include #include "kvstore/kvstore.hpp" #include "storage/v2/delta.hpp" #include "storage/v2/durability/storage_global_operation.hpp" #include "storage/v2/transaction.hpp" +#include "utils/exceptions.hpp" #include "utils/result.hpp" /// REPLICATION /// @@ -33,21 +35,21 @@ namespace memgraph::storage { class Storage; -class ReplicationClient; +class ReplicationStorageClient; struct ReplicationStorageState { // Only MAIN can send - void InitializeTransaction(uint64_t seq_num); + void InitializeTransaction(uint64_t seq_num, Storage *storage); void AppendDelta(const Delta &delta, const Vertex &vertex, uint64_t timestamp); void AppendDelta(const Delta &delta, const Edge &edge, uint64_t timestamp); void AppendOperation(durability::StorageMetadataOperation operation, LabelId label, const std::set &properties, const LabelIndexStats &stats, const LabelPropertyIndexStats &property_stats, uint64_t final_commit_timestamp); - bool FinalizeTransaction(uint64_t timestamp); + bool FinalizeTransaction(uint64_t timestamp, Storage *storage); // Getters auto GetReplicaState(std::string_view name) const -> std::optional; - auto ReplicasInfo() const -> std::vector; + auto ReplicasInfo(const Storage *storage) const -> std::vector; // History void TrackLatestHistory(); @@ -55,6 +57,19 @@ struct ReplicationStorageState { void Reset(); + template + bool WithClient(std::string_view replica_name, F &&callback) { + return replication_clients_.WithLock([replica_name, cb = std::forward(callback)](auto &clients) { + for (const auto &client : clients) { + if (client->Name() == replica_name) { + cb(client.get()); + return true; + } + } + return false; + }); + } + // Questions: // - storage durability <- databases/*name*/wal and snapshots (where this for epoch_id) // - multi-tenant durability <- databases/.durability (there is a list of all active tenants) @@ -74,7 +89,7 @@ struct ReplicationStorageState { // This way we can initialize client in main thread which means // that we can immediately notify the user if the initialization // failed. - using ReplicationClientPtr = std::unique_ptr; + using ReplicationClientPtr = std::unique_ptr; using ReplicationClientList = utils::Synchronized, utils::RWSpinLock>; ReplicationClientList replication_clients_; diff --git a/src/storage/v2/storage.hpp b/src/storage/v2/storage.hpp index 8d8b06cd6..4142da3ca 100644 --- a/src/storage/v2/storage.hpp +++ b/src/storage/v2/storage.hpp @@ -109,7 +109,7 @@ struct EdgeInfoForDeletion { class Storage { friend class ReplicationServer; - friend class ReplicationClient; + friend class ReplicationStorageClient; public: Storage(Config config, StorageMode storage_mode); @@ -355,11 +355,7 @@ class Storage { virtual void PrepareForNewEpoch() = 0; - virtual auto CreateReplicationClient(const memgraph::replication::ReplicationClientConfig &config, - const memgraph::replication::ReplicationEpoch *current_epoch) - -> std::unique_ptr = 0; - - auto ReplicasInfo() const { return repl_storage_state_.ReplicasInfo(); } + auto ReplicasInfo() const { return repl_storage_state_.ReplicasInfo(this); } auto GetReplicaState(std::string_view name) const -> std::optional { return repl_storage_state_.GetReplicaState(name); } @@ -384,7 +380,7 @@ class Storage { Config config_; // Transaction engine - utils::SpinLock engine_lock_; + mutable utils::SpinLock engine_lock_; uint64_t timestamp_{kTimestampInitialId}; uint64_t transaction_id_{kTransactionInitialId}; diff --git a/tests/e2e/replication/show_while_creating_invalid_state.py b/tests/e2e/replication/show_while_creating_invalid_state.py index 996955dc1..74dcbce74 100644 --- a/tests/e2e/replication/show_while_creating_invalid_state.py +++ b/tests/e2e/replication/show_while_creating_invalid_state.py @@ -123,6 +123,143 @@ def test_show_replicas(connection): assert actual_data == expected_data +def test_drop_replicas(connection): + # Goal of this test is to check the DROP REPLICAS command. + # 0/ Manually start main and all replicas + # 1/ Check status of the replicas + # 2/ Kill replica 3 + # 3/ Drop replica 3 and check status + # 4/ Stop replica 4 + # 5/ Drop replica 4 and check status + # 6/ Kill replica 1 + # 7/ Drop replica 1 and check status + # 8/ Stop replica 2 + # 9/ Drop replica 2 and check status + # 10/ Restart all replicas + # 11/ Register them + # 12/ Drop all and check status + + def retrieve_data(): + return set(execute_and_fetch_all(cursor, "SHOW REPLICAS;")) + + # 0/ + interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION) + + cursor = connection(7687, "main").cursor() + + # 1/ + actual_data = set(execute_and_fetch_all(cursor, "SHOW REPLICAS;")) + EXPECTED_COLUMN_NAMES = { + "name", + "socket_address", + "sync_mode", + "current_timestamp_of_replica", + "number_of_timestamp_behind_master", + "state", + } + + actual_column_names = {x.name for x in cursor.description} + assert actual_column_names == EXPECTED_COLUMN_NAMES + + expected_data = { + ("replica_1", "127.0.0.1:10001", "sync", 0, 0, "ready"), + ("replica_2", "127.0.0.1:10002", "sync", 0, 0, "ready"), + ("replica_3", "127.0.0.1:10003", "async", 0, 0, "ready"), + ("replica_4", "127.0.0.1:10004", "async", 0, 0, "ready"), + } + mg_sleep_and_assert(expected_data, retrieve_data) + + # 2/ + interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "replica_3") + expected_data = { + ("replica_1", "127.0.0.1:10001", "sync", 0, 0, "ready"), + ("replica_2", "127.0.0.1:10002", "sync", 0, 0, "ready"), + ("replica_3", "127.0.0.1:10003", "async", 0, 0, "invalid"), + ("replica_4", "127.0.0.1:10004", "async", 0, 0, "ready"), + } + mg_sleep_and_assert(expected_data, retrieve_data) + + # 3/ + execute_and_fetch_all(cursor, "DROP REPLICA replica_3") + expected_data = { + ("replica_1", "127.0.0.1:10001", "sync", 0, 0, "ready"), + ("replica_2", "127.0.0.1:10002", "sync", 0, 0, "ready"), + ("replica_4", "127.0.0.1:10004", "async", 0, 0, "ready"), + } + mg_sleep_and_assert(expected_data, retrieve_data) + + # 4/ + interactive_mg_runner.stop(MEMGRAPH_INSTANCES_DESCRIPTION, "replica_4") + expected_data = { + ("replica_1", "127.0.0.1:10001", "sync", 0, 0, "ready"), + ("replica_2", "127.0.0.1:10002", "sync", 0, 0, "ready"), + ("replica_4", "127.0.0.1:10004", "async", 0, 0, "invalid"), + } + mg_sleep_and_assert(expected_data, retrieve_data) + + # 5/ + execute_and_fetch_all(cursor, "DROP REPLICA replica_4") + expected_data = { + ("replica_1", "127.0.0.1:10001", "sync", 0, 0, "ready"), + ("replica_2", "127.0.0.1:10002", "sync", 0, 0, "ready"), + } + mg_sleep_and_assert(expected_data, retrieve_data) + + # 6/ + interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "replica_1") + expected_data = { + ("replica_1", "127.0.0.1:10001", "sync", 0, 0, "invalid"), + ("replica_2", "127.0.0.1:10002", "sync", 0, 0, "ready"), + } + mg_sleep_and_assert(expected_data, retrieve_data) + + # 7/ + execute_and_fetch_all(cursor, "DROP REPLICA replica_1") + expected_data = { + ("replica_2", "127.0.0.1:10002", "sync", 0, 0, "ready"), + } + mg_sleep_and_assert(expected_data, retrieve_data) + + # 8/ + interactive_mg_runner.stop(MEMGRAPH_INSTANCES_DESCRIPTION, "replica_2") + expected_data = { + ("replica_2", "127.0.0.1:10002", "sync", 0, 0, "invalid"), + } + mg_sleep_and_assert(expected_data, retrieve_data) + + # 9/ + execute_and_fetch_all(cursor, "DROP REPLICA replica_2") + expected_data = set() + mg_sleep_and_assert(expected_data, retrieve_data) + + # 10/ + interactive_mg_runner.start(MEMGRAPH_INSTANCES_DESCRIPTION, "replica_1") + interactive_mg_runner.start(MEMGRAPH_INSTANCES_DESCRIPTION, "replica_2") + interactive_mg_runner.start(MEMGRAPH_INSTANCES_DESCRIPTION, "replica_3") + interactive_mg_runner.start(MEMGRAPH_INSTANCES_DESCRIPTION, "replica_4") + execute_and_fetch_all(cursor, "REGISTER REPLICA replica_1 SYNC TO '127.0.0.1:10001';") + execute_and_fetch_all(cursor, "REGISTER REPLICA replica_2 SYNC TO '127.0.0.1:10002';") + execute_and_fetch_all(cursor, "REGISTER REPLICA replica_3 ASYNC TO '127.0.0.1:10003';") + execute_and_fetch_all(cursor, "REGISTER REPLICA replica_4 ASYNC TO '127.0.0.1:10004';") + + # 11/ + expected_data = { + ("replica_1", "127.0.0.1:10001", "sync", 0, 0, "ready"), + ("replica_2", "127.0.0.1:10002", "sync", 0, 0, "ready"), + ("replica_3", "127.0.0.1:10003", "async", 0, 0, "ready"), + ("replica_4", "127.0.0.1:10004", "async", 0, 0, "ready"), + } + mg_sleep_and_assert(expected_data, retrieve_data) + + # 12/ + execute_and_fetch_all(cursor, "DROP REPLICA replica_1") + execute_and_fetch_all(cursor, "DROP REPLICA replica_2") + execute_and_fetch_all(cursor, "DROP REPLICA replica_3") + execute_and_fetch_all(cursor, "DROP REPLICA replica_4") + expected_data = set() + mg_sleep_and_assert(expected_data, retrieve_data) + + def test_basic_recovery(connection): # Goal of this test is to check the recovery of main. # 0/ We start all replicas manually: we want to be able to kill them ourselves without relying on external tooling to kill processes. @@ -630,10 +767,26 @@ def test_async_replication_when_main_is_killed(): ) # 2/ - for index in range(50): + # First make sure that anything has been replicated + for index in range(0, 5): + interactive_mg_runner.MEMGRAPH_INSTANCES["main"].query(f"CREATE (p:Number {{name:{index}}})") + expected_data = [("async_replica", "127.0.0.1:10001", "async", "ready")] + + def retrieve_data(): + replicas = interactive_mg_runner.MEMGRAPH_INSTANCES["main"].query("SHOW REPLICAS;") + return [ + (replica_name, ip, mode, status) + for replica_name, ip, mode, timestamp, timestamp_behind_main, status in replicas + ] + + actual_data = mg_sleep_and_assert(expected_data, retrieve_data) + assert actual_data == expected_data + + for index in range(5, 50): interactive_mg_runner.MEMGRAPH_INSTANCES["main"].query(f"CREATE (p:Number {{name:{index}}})") if random.randint(0, 100) > 95: main_killed = f"Main was killed at index={index}" + print(main_killed) interactive_mg_runner.kill(CONFIGURATION, "main") break diff --git a/tests/integration/telemetry/client.cpp b/tests/integration/telemetry/client.cpp index 558e0a6bc..34e1c2a67 100644 --- a/tests/integration/telemetry/client.cpp +++ b/tests/integration/telemetry/client.cpp @@ -41,7 +41,7 @@ int main(int argc, char **argv) { memgraph::storage::UpdatePaths(db_config, data_directory); memgraph::replication::ReplicationState repl_state(ReplicationStateRootPath(db_config)); - memgraph::dbms::DbmsHandler dbms_handler(db_config, repl_state + memgraph::dbms::DbmsHandler dbms_handler(db_config #ifdef MG_ENTERPRISE , &auth_, false, false diff --git a/tests/unit/dbms_handler.cpp b/tests/unit/dbms_handler.cpp index 75efddefe..a811e4159 100644 --- a/tests/unit/dbms_handler.cpp +++ b/tests/unit/dbms_handler.cpp @@ -52,18 +52,15 @@ class TestEnvironment : public ::testing::Environment { auth = std::make_unique>( storage_directory / "auth"); - repl_state_.emplace(memgraph::storage::ReplicationStateRootPath(storage_conf)); - ptr_ = std::make_unique(storage_conf, *repl_state_, auth.get(), false, true); + ptr_ = std::make_unique(storage_conf, auth.get(), false, true); } void TearDown() override { ptr_.reset(); auth.reset(); - repl_state_.reset(); } static std::unique_ptr ptr_; - std::optional repl_state_; }; std::unique_ptr TestEnvironment::ptr_ = nullptr; diff --git a/tests/unit/dbms_handler_community.cpp b/tests/unit/dbms_handler_community.cpp index efce2854d..3848cd347 100644 --- a/tests/unit/dbms_handler_community.cpp +++ b/tests/unit/dbms_handler_community.cpp @@ -52,18 +52,15 @@ class TestEnvironment : public ::testing::Environment { auth = std::make_unique>( storage_directory / "auth"); - repl_state_.emplace(memgraph::storage::ReplicationStateRootPath(storage_conf)); - ptr_ = std::make_unique(storage_conf, *repl_state_); + ptr_ = std::make_unique(storage_conf); } void TearDown() override { ptr_.reset(); auth.reset(); - repl_state_.reset(); } static std::unique_ptr ptr_; - std::optional repl_state_; }; std::unique_ptr TestEnvironment::ptr_ = nullptr; diff --git a/tests/unit/storage_v2_replication.cpp b/tests/unit/storage_v2_replication.cpp index 261b2ccf0..f07130c4a 100644 --- a/tests/unit/storage_v2_replication.cpp +++ b/tests/unit/storage_v2_replication.cpp @@ -102,8 +102,7 @@ class ReplicationTest : public ::testing::Test { struct MinMemgraph { MinMemgraph(const memgraph::storage::Config &conf) - : repl_state{ReplicationStateRootPath(conf)}, - dbms{conf, repl_state + : dbms{conf #ifdef MG_ENTERPRISE , reinterpret_cast< @@ -111,11 +110,12 @@ struct MinMemgraph { true, false #endif }, + repl_state{dbms.ReplicationState()}, db{*dbms.Get().get()}, - repl_handler(repl_state, dbms) { + repl_handler(dbms) { } - memgraph::replication::ReplicationState repl_state; memgraph::dbms::DbmsHandler dbms; + memgraph::replication::ReplicationState &repl_state; memgraph::dbms::Database &db; ReplicationHandler repl_handler; }; @@ -130,14 +130,13 @@ TEST_F(ReplicationTest, BasicSynchronousReplicationTest) { .port = ports[0], }); - ASSERT_FALSE(main.repl_handler - .RegisterReplica(ReplicationClientConfig{ - .name = "REPLICA", - .mode = ReplicationMode::SYNC, - .ip_address = local_host, - .port = ports[0], - }) - .HasError()); + const auto ® = main.repl_handler.RegisterReplica(ReplicationClientConfig{ + .name = "REPLICA", + .mode = ReplicationMode::SYNC, + .ip_address = local_host, + .port = ports[0], + }); + ASSERT_FALSE(reg.HasError()) << (int)reg.GetError(); // vertex create // vertex add label @@ -966,14 +965,14 @@ TEST_F(ReplicationTest, RestoringReplicationAtStartupAfterDroppingReplica) { .ip_address = local_host, .port = ports[0], }); - ASSERT_FALSE(res.HasError()); + ASSERT_FALSE(res.HasError()) << (int)res.GetError(); res = main->repl_handler.RegisterReplica(ReplicationClientConfig{ .name = replicas[1], .mode = ReplicationMode::SYNC, .ip_address = local_host, .port = ports[1], }); - ASSERT_FALSE(res.HasError()); + ASSERT_FALSE(res.HasError()) << (int)res.GetError(); auto replica_infos = main->db.storage()->ReplicasInfo();