diff --git a/src/memgraph.cpp b/src/memgraph.cpp index 3ad5556c1..6d730a137 100644 --- a/src/memgraph.cpp +++ b/src/memgraph.cpp @@ -891,7 +891,7 @@ int main(int argc, char **argv) { .wal_file_size_kibibytes = FLAGS_storage_wal_file_size_kib, .wal_file_flush_every_n_tx = FLAGS_storage_wal_file_flush_every_n_tx, .snapshot_on_exit = FLAGS_storage_snapshot_on_exit, - .restore_replicas_on_startup = true, + .restore_replication_state_on_startup = true, .items_per_batch = FLAGS_storage_items_per_batch, .recovery_thread_count = FLAGS_storage_recovery_thread_count, .allow_parallel_index_creation = FLAGS_storage_parallel_index_recovery}, diff --git a/src/query/constants.hpp b/src/query/constants.hpp index 5a563524d..55b1eebea 100644 --- a/src/query/constants.hpp +++ b/src/query/constants.hpp @@ -14,8 +14,6 @@ #include namespace memgraph::query { -inline constexpr uint16_t kDefaultReplicationPort = 10000; -inline constexpr auto *kDefaultReplicationServerIp = "0.0.0.0"; inline const std::string kAsterisk = "*"; inline constexpr uint16_t kDeleteStatisticsNumResults = 6; } // namespace memgraph::query diff --git a/src/query/interpreter.cpp b/src/query/interpreter.cpp index e3753923f..f7d45b3b5 100644 --- a/src/query/interpreter.cpp +++ b/src/query/interpreter.cpp @@ -176,7 +176,7 @@ class ReplQueryHandler final : public query::ReplicationQueryHandler { throw QueryRuntimeException("Port number invalid!"); } if (!db_->SetReplicaRole( - io::network::Endpoint(query::kDefaultReplicationServerIp, static_cast(*port)))) { + io::network::Endpoint(storage::replication::kDefaultReplicationServerIp, static_cast(*port)))) { throw QueryRuntimeException("Couldn't set role to replica!"); } } @@ -185,9 +185,9 @@ class ReplQueryHandler final : public query::ReplicationQueryHandler { /// @throw QueryRuntimeException if an error ocurred. ReplicationQuery::ReplicationRole ShowReplicationRole() const override { switch (db_->GetReplicationRole()) { - case storage::ReplicationRole::MAIN: + case storage::replication::ReplicationRole::MAIN: return ReplicationQuery::ReplicationRole::MAIN; - case storage::ReplicationRole::REPLICA: + case storage::replication::ReplicationRole::REPLICA: return ReplicationQuery::ReplicationRole::REPLICA; } throw QueryRuntimeException("Couldn't show replication role - invalid role set!"); @@ -197,11 +197,15 @@ class ReplQueryHandler final : public query::ReplicationQueryHandler { void RegisterReplica(const std::string &name, const std::string &socket_address, const ReplicationQuery::SyncMode sync_mode, const std::chrono::seconds replica_check_frequency) override { - if (db_->GetReplicationRole() == storage::ReplicationRole::REPLICA) { + if (db_->GetReplicationRole() == storage::replication::ReplicationRole::REPLICA) { // replica can't register another replica throw QueryRuntimeException("Replica can't register another replica!"); } + if (name == storage::replication::kReservedReplicationRoleName) { + throw QueryRuntimeException("This replica name is reserved and can not be used as replica name!"); + } + storage::replication::ReplicationMode repl_mode; switch (sync_mode) { case ReplicationQuery::SyncMode::ASYNC: { @@ -215,7 +219,7 @@ class ReplQueryHandler final : public query::ReplicationQueryHandler { } auto maybe_ip_and_port = - io::network::Endpoint::ParseSocketOrIpAddress(socket_address, query::kDefaultReplicationPort); + io::network::Endpoint::ParseSocketOrIpAddress(socket_address, storage::replication::kDefaultReplicationPort); if (maybe_ip_and_port) { auto [ip, port] = *maybe_ip_and_port; auto ret = db_->RegisterReplica(name, {std::move(ip), port}, repl_mode, @@ -231,7 +235,7 @@ class ReplQueryHandler final : public query::ReplicationQueryHandler { /// @throw QueryRuntimeException if an error ocurred. void DropReplica(const std::string &replica_name) override { - if (db_->GetReplicationRole() == storage::ReplicationRole::REPLICA) { + if (db_->GetReplicationRole() == storage::replication::ReplicationRole::REPLICA) { // replica can't unregister a replica throw QueryRuntimeException("Replica can't unregister a replica!"); } @@ -242,7 +246,7 @@ class ReplQueryHandler final : public query::ReplicationQueryHandler { using Replica = ReplicationQueryHandler::Replica; std::vector ShowReplicas() const override { - if (db_->GetReplicationRole() == storage::ReplicationRole::REPLICA) { + if (db_->GetReplicationRole() == storage::replication::ReplicationRole::REPLICA) { // replica can't show registered replicas (it shouldn't have any) throw QueryRuntimeException("Replica can't show registered replicas (it shouldn't have any)!"); } @@ -2982,7 +2986,7 @@ Interpreter::PrepareResult Interpreter::Prepare(const std::string &query_string, UpdateTypeCount(rw_type); if (const auto query_type = query_execution->prepared_query->rw_type; - interpreter_context_->db->GetReplicationRole() == storage::ReplicationRole::REPLICA && + interpreter_context_->db->GetReplicationRole() == storage::replication::ReplicationRole::REPLICA && (query_type == RWType::W || query_type == RWType::RW)) { query_execution = nullptr; throw QueryException("Write query forbidden on the replica!"); diff --git a/src/storage/v2/config.hpp b/src/storage/v2/config.hpp index 126cbc10c..3b1beec92 100644 --- a/src/storage/v2/config.hpp +++ b/src/storage/v2/config.hpp @@ -49,7 +49,7 @@ struct Config { uint64_t wal_file_flush_every_n_tx{100000}; bool snapshot_on_exit{false}; - bool restore_replicas_on_startup{false}; + bool restore_replication_state_on_startup{false}; uint64_t items_per_batch{1'000'000}; uint64_t recovery_thread_count{8}; diff --git a/src/storage/v2/replication/enums.hpp b/src/storage/v2/replication/enums.hpp index 133fd9b74..bbe8ffc62 100644 --- a/src/storage/v2/replication/enums.hpp +++ b/src/storage/v2/replication/enums.hpp @@ -1,4 +1,4 @@ -// Copyright 2022 Memgraph Ltd. +// Copyright 2023 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source @@ -13,6 +13,8 @@ #include namespace memgraph::storage::replication { +enum class ReplicationRole : uint8_t { MAIN, REPLICA }; + enum class ReplicationMode : std::uint8_t { SYNC, ASYNC }; enum class ReplicaState : std::uint8_t { READY, REPLICATING, RECOVERY, INVALID }; diff --git a/src/storage/v2/replication/replication_persistence_helper.cpp b/src/storage/v2/replication/replication_persistence_helper.cpp index f05848cea..262938a82 100644 --- a/src/storage/v2/replication/replication_persistence_helper.cpp +++ b/src/storage/v2/replication/replication_persistence_helper.cpp @@ -1,4 +1,4 @@ -// Copyright 2022 Memgraph Ltd. +// Copyright 2023 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source @@ -10,21 +10,24 @@ // licenses/APL.txt. #include "storage/v2/replication/replication_persistence_helper.hpp" + +#include "storage/v2/replication/enums.hpp" #include "utils/logging.hpp" namespace { -const std::string kReplicaName = "replica_name"; -const std::string kIpAddress = "replica_ip_address"; -const std::string kPort = "replica_port"; -const std::string kSyncMode = "replica_sync_mode"; -const std::string kCheckFrequency = "replica_check_frequency"; -const std::string kSSLKeyFile = "replica_ssl_key_file"; -const std::string kSSLCertFile = "replica_ssl_cert_file"; +inline constexpr auto *kReplicaName = "replica_name"; +inline constexpr auto *kIpAddress = "replica_ip_address"; +inline constexpr auto *kPort = "replica_port"; +inline constexpr auto *kSyncMode = "replica_sync_mode"; +inline constexpr auto *kCheckFrequency = "replica_check_frequency"; +inline constexpr auto *kSSLKeyFile = "replica_ssl_key_file"; +inline constexpr auto *kSSLCertFile = "replica_ssl_cert_file"; +inline constexpr auto *kReplicationRole = "replication_role"; } // namespace namespace memgraph::storage::replication { -nlohmann::json ReplicaStatusToJSON(ReplicaStatus &&status) { +nlohmann::json ReplicationStatusToJSON(ReplicationStatus &&status) { auto data = nlohmann::json::object(); data[kReplicaName] = std::move(status.name); @@ -42,11 +45,15 @@ nlohmann::json ReplicaStatusToJSON(ReplicaStatus &&status) { data[kSSLCertFile] = nullptr; } + if (status.role.has_value()) { + data[kReplicationRole] = *status.role; + } + return data; } -std::optional JSONToReplicaStatus(nlohmann::json &&data) { - ReplicaStatus replica_status; +std::optional JSONToReplicationStatus(nlohmann::json &&data) { + ReplicationStatus replica_status; const auto get_failed_message = [](const std::string_view message, const std::string_view nested_message) { return fmt::format("Failed to deserialize replica's configuration: {} : {}", message, nested_message); @@ -70,6 +77,11 @@ std::optional JSONToReplicaStatus(nlohmann::json &&data) { data.at(kSSLKeyFile).get_to(replica_status.ssl->key_file); data.at(kSSLCertFile).get_to(replica_status.ssl->cert_file); } + + if (data.find(kReplicationRole) != data.end()) { + replica_status.role = replication::ReplicationRole::MAIN; + data.at(kReplicationRole).get_to(replica_status.role.value()); + } } catch (const nlohmann::json::type_error &exception) { spdlog::error(get_failed_message("Invalid type conversion", exception.what())); return std::nullopt; diff --git a/src/storage/v2/replication/replication_persistence_helper.hpp b/src/storage/v2/replication/replication_persistence_helper.hpp index c22164e33..df6cb03b9 100644 --- a/src/storage/v2/replication/replication_persistence_helper.hpp +++ b/src/storage/v2/replication/replication_persistence_helper.hpp @@ -1,4 +1,4 @@ -// Copyright 2022 Memgraph Ltd. +// Copyright 2023 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source @@ -23,18 +23,23 @@ namespace memgraph::storage::replication { -struct ReplicaStatus { +inline constexpr auto *kReservedReplicationRoleName{"__replication_role"}; +inline constexpr uint16_t kDefaultReplicationPort = 10000; +inline constexpr auto *kDefaultReplicationServerIp = "0.0.0.0"; + +struct ReplicationStatus { std::string name; std::string ip_address; uint16_t port; ReplicationMode sync_mode; std::chrono::seconds replica_check_frequency; std::optional ssl; + std::optional role; - friend bool operator==(const ReplicaStatus &, const ReplicaStatus &) = default; + friend bool operator==(const ReplicationStatus &, const ReplicationStatus &) = default; }; -nlohmann::json ReplicaStatusToJSON(ReplicaStatus &&status); +nlohmann::json ReplicationStatusToJSON(ReplicationStatus &&status); -std::optional JSONToReplicaStatus(nlohmann::json &&data); +std::optional JSONToReplicationStatus(nlohmann::json &&data); } // namespace memgraph::storage::replication diff --git a/src/storage/v2/storage.cpp b/src/storage/v2/storage.cpp index 11ea723ea..b450891e3 100644 --- a/src/storage/v2/storage.cpp +++ b/src/storage/v2/storage.cpp @@ -335,13 +335,6 @@ Storage::Storage(Config config) uuid_(utils::GenerateUUID()), epoch_id_(utils::GenerateUUID()), global_locker_(file_retainer_.AddLocker()) { - if (config_.durability.snapshot_wal_mode == Config::Durability::SnapshotWalMode::DISABLED && - replication_role_ == ReplicationRole::MAIN) { - spdlog::warn( - "The instance has the MAIN replication role, but durability logs and snapshots are disabled. Please consider " - "enabling durability by using --storage-snapshot-interval-sec and --storage-wal-enabled flags because " - "without write-ahead logs this instance is not replicating any data."); - } if (config_.durability.snapshot_wal_mode != Config::Durability::SnapshotWalMode::DISABLED || config_.durability.snapshot_on_exit || config_.durability.recover_on_startup) { // Create the directory initially to crash the database in case of @@ -437,14 +430,29 @@ Storage::Storage(Config config) commit_log_.emplace(timestamp_); } - if (config_.durability.restore_replicas_on_startup) { - spdlog::info("Replica's configuration will be stored and will be automatically restored in case of a crash."); + if (config_.durability.restore_replication_state_on_startup) { + spdlog::info("Replication configuration will be stored and will be automatically restored in case of a crash."); utils::EnsureDirOrDie(config_.durability.storage_directory / durability::kReplicationDirectory); storage_ = std::make_unique(config_.durability.storage_directory / durability::kReplicationDirectory); - RestoreReplicas(); + + RestoreReplicationRole(); + + if (replication_role_ == replication::ReplicationRole::MAIN) { + RestoreReplicas(); + } } else { - spdlog::warn("Replicas' configuration will NOT be stored. When the server restarts, replicas will be forgotten."); + spdlog::warn( + "Replicastion configuration will NOT be stored. When the server restarts, replication state will be " + "forgotten."); + } + + if (config_.durability.snapshot_wal_mode == Config::Durability::SnapshotWalMode::DISABLED && + replication_role_ == replication::ReplicationRole::MAIN) { + spdlog::warn( + "The instance has the MAIN replication role, but durability logs and snapshots are disabled. Please consider " + "enabling durability by using --storage-snapshot-interval-sec and --storage-wal-enabled flags because " + "without write-ahead logs this instance is not replicating any data."); } } @@ -968,7 +976,7 @@ utils::BasicResult Storage::Accessor::Commit // modifications before they are written to disk. // Replica can log only the write transaction received from Main // so the Wal files are consistent - if (storage_->replication_role_ == ReplicationRole::MAIN || desired_commit_timestamp.has_value()) { + if (storage_->replication_role_ == replication::ReplicationRole::MAIN || desired_commit_timestamp.has_value()) { could_replicate_all_sync_replicas = storage_->AppendToWalDataManipulation(transaction_, *commit_timestamp_); } @@ -982,7 +990,8 @@ utils::BasicResult Storage::Accessor::Commit transaction_.commit_timestamp->store(*commit_timestamp_, std::memory_order_release); // Replica can only update the last commit timestamp with // the commits received from main. - if (storage_->replication_role_ == ReplicationRole::MAIN || desired_commit_timestamp.has_value()) { + if (storage_->replication_role_ == replication::ReplicationRole::MAIN || + desired_commit_timestamp.has_value()) { // Update the last commit timestamp storage_->last_commit_timestamp_.store(*commit_timestamp_); } @@ -1447,7 +1456,7 @@ Transaction Storage::CreateTransaction(IsolationLevel isolation_level, StorageMo // of any query on replica to the last commited transaction // which is timestamp_ as only commit of transaction with writes // can change the value of it. - if (replication_role_ == ReplicationRole::REPLICA) { + if (replication_role_ == replication::ReplicationRole::REPLICA) { start_timestamp = timestamp_; } else { start_timestamp = timestamp_++; @@ -1752,7 +1761,7 @@ bool Storage::AppendToWalDataManipulation(const Transaction &transaction, uint64 // A single transaction will always be contained in a single WAL file. auto current_commit_timestamp = transaction.commit_timestamp->load(std::memory_order_acquire); - if (replication_role_.load() == ReplicationRole::MAIN) { + if (replication_role_.load() == replication::ReplicationRole::MAIN) { replication_clients_.WithLock([&](auto &clients) { for (auto &client : clients) { client->StartTransactionReplication(wal_file_->SequenceNumber()); @@ -1940,7 +1949,7 @@ bool Storage::AppendToWalDataDefinition(durability::StorageGlobalOperation opera auto finalized_on_all_replicas = true; wal_file_->AppendOperation(operation, label, properties, final_commit_timestamp); { - if (replication_role_.load() == ReplicationRole::MAIN) { + if (replication_role_.load() == replication::ReplicationRole::MAIN) { replication_clients_.WithLock([&](auto &clients) { for (auto &client : clients) { client->StartTransactionReplication(wal_file_->SequenceNumber()); @@ -1960,7 +1969,7 @@ bool Storage::AppendToWalDataDefinition(durability::StorageGlobalOperation opera } utils::BasicResult Storage::CreateSnapshot(std::optional is_periodic) { - if (replication_role_.load() != ReplicationRole::MAIN) { + if (replication_role_.load() != replication::ReplicationRole::MAIN) { return CreateSnapshotError::DisabledForReplica; } @@ -2054,20 +2063,38 @@ uint64_t Storage::CommitTimestamp(const std::optional desired_commit_t bool Storage::SetReplicaRole(io::network::Endpoint endpoint, const replication::ReplicationServerConfig &config) { // We don't want to restart the server if we're already a REPLICA - if (replication_role_ == ReplicationRole::REPLICA) { + if (replication_role_ == replication::ReplicationRole::REPLICA) { return false; } + auto port = endpoint.port; // assigning because we will move the endpoint replication_server_ = std::make_unique(this, std::move(endpoint), config); - replication_role_.store(ReplicationRole::REPLICA); + if (ShouldStoreAndRestoreReplicationState()) { + // Only thing that matters here is the role saved as REPLICA and the listening port + auto data = replication::ReplicationStatusToJSON( + replication::ReplicationStatus{.name = replication::kReservedReplicationRoleName, + .ip_address = "", + .port = port, + .sync_mode = replication::ReplicationMode::SYNC, + .replica_check_frequency = std::chrono::seconds(0), + .ssl = std::nullopt, + .role = replication::ReplicationRole::REPLICA}); + + if (!storage_->Put(replication::kReservedReplicationRoleName, data.dump())) { + spdlog::error("Error when saving REPLICA replication role in settings."); + return false; + } + } + + replication_role_.store(replication::ReplicationRole::REPLICA); return true; } bool Storage::SetMainReplicationRole() { // We don't want to generate new epoch_id and do the // cleanup if we're already a MAIN - if (replication_role_ == ReplicationRole::MAIN) { + if (replication_role_ == replication::ReplicationRole::MAIN) { return false; } @@ -2090,14 +2117,33 @@ bool Storage::SetMainReplicationRole() { epoch_id_ = utils::GenerateUUID(); } - replication_role_.store(ReplicationRole::MAIN); + if (ShouldStoreAndRestoreReplicationState()) { + // Only thing that matters here is the role saved as MAIN + auto data = replication::ReplicationStatusToJSON( + replication::ReplicationStatus{.name = replication::kReservedReplicationRoleName, + .ip_address = "", + .port = 0, + .sync_mode = replication::ReplicationMode::SYNC, + .replica_check_frequency = std::chrono::seconds(0), + .ssl = std::nullopt, + .role = replication::ReplicationRole::MAIN}); + + if (!storage_->Put(replication::kReservedReplicationRoleName, data.dump())) { + spdlog::error("Error when saving MAIN replication role in settings."); + return false; + } + } + + replication_role_.store(replication::ReplicationRole::MAIN); + return true; } utils::BasicResult Storage::RegisterReplica( std::string name, io::network::Endpoint endpoint, const replication::ReplicationMode replication_mode, const replication::RegistrationMode registration_mode, const replication::ReplicationClientConfig &config) { - MG_ASSERT(replication_role_.load() == ReplicationRole::MAIN, "Only main instance can register a replica!"); + MG_ASSERT(replication_role_.load() == replication::ReplicationRole::MAIN, + "Only main instance can register a replica!"); const bool name_exists = replication_clients_.WithLock([&](auto &clients) { return std::any_of(clients.begin(), clients.end(), [&name](const auto &client) { return client->Name() == name; }); @@ -2116,14 +2162,15 @@ utils::BasicResult Storage::RegisterReplica( return RegisterReplicaError::END_POINT_EXISTS; } - if (ShouldStoreAndRestoreReplicas()) { - auto data = replication::ReplicaStatusToJSON( - replication::ReplicaStatus{.name = name, - .ip_address = endpoint.address, - .port = endpoint.port, - .sync_mode = replication_mode, - .replica_check_frequency = config.replica_check_frequency, - .ssl = config.ssl}); + if (ShouldStoreAndRestoreReplicationState()) { + auto data = replication::ReplicationStatusToJSON( + replication::ReplicationStatus{.name = name, + .ip_address = endpoint.address, + .port = endpoint.port, + .sync_mode = replication_mode, + .replica_check_frequency = config.replica_check_frequency, + .ssl = config.ssl, + .role = replication::ReplicationRole::REPLICA}); if (!storage_->Put(name, data.dump())) { spdlog::error("Error when saving replica {} in settings.", name); return RegisterReplicaError::COULD_NOT_BE_PERSISTED; @@ -2159,8 +2206,9 @@ utils::BasicResult Storage::RegisterReplica( } bool Storage::UnregisterReplica(const std::string &name) { - MG_ASSERT(replication_role_.load() == ReplicationRole::MAIN, "Only main instance can unregister a replica!"); - if (ShouldStoreAndRestoreReplicas()) { + MG_ASSERT(replication_role_.load() == replication::ReplicationRole::MAIN, + "Only main instance can unregister a replica!"); + if (ShouldStoreAndRestoreReplicationState()) { if (!storage_->Delete(name)) { spdlog::error("Error when removing replica {} from settings.", name); return false; @@ -2183,7 +2231,7 @@ std::optional Storage::GetReplicaState(const std::str }); } -ReplicationRole Storage::GetReplicationRole() const { return replication_role_; } +replication::ReplicationRole Storage::GetReplicationRole() const { return replication_role_; } std::vector Storage::ReplicasInfo() { return replication_clients_.WithLock([](auto &clients) { @@ -2207,6 +2255,46 @@ utils::BasicResult Storage::SetIsolationLevel(I return {}; } +void Storage::RestoreReplicationRole() { + if (!ShouldStoreAndRestoreReplicationState()) { + return; + } + + spdlog::info("Restoring replication role."); + + uint16_t port = replication::kDefaultReplicationPort; + for (const auto &[replica_name, replica_data] : *storage_) { + const auto maybe_replica_status = replication::JSONToReplicationStatus(nlohmann::json::parse(replica_data)); + if (!maybe_replica_status.has_value()) { + LOG_FATAL("Cannot parse previously saved configuration of replica {}.", replica_name); + } + + if (replica_name != replication::kReservedReplicationRoleName) { + continue; + } + + auto replica_status = *maybe_replica_status; + + if (!replica_status.role.has_value()) { + replication_role_.store(replication::ReplicationRole::MAIN); + } else { + replication_role_.store(*replica_status.role); + port = replica_status.port; + } + + break; + } + + if (replication_role_ == replication::ReplicationRole::REPLICA) { + io::network::Endpoint endpoint(replication::kDefaultReplicationServerIp, port); + replication_server_ = + std::make_unique(this, std::move(endpoint), replication::ReplicationServerConfig{}); + } + + spdlog::info("Replication role restored to {}.", + replication_role_ == replication::ReplicationRole::MAIN ? "MAIN" : "REPLICA"); +} + IsolationLevel Storage::GetIsolationLevel() const noexcept { return isolation_level_; } void Storage::SetStorageMode(StorageMode storage_mode) { @@ -2217,8 +2305,7 @@ void Storage::SetStorageMode(StorageMode storage_mode) { StorageMode Storage::GetStorageMode() { return storage_mode_; } void Storage::RestoreReplicas() { - MG_ASSERT(memgraph::storage::ReplicationRole::MAIN == GetReplicationRole()); - if (!ShouldStoreAndRestoreReplicas()) { + if (!ShouldStoreAndRestoreReplicationState()) { return; } spdlog::info("Restoring replicas."); @@ -2226,7 +2313,7 @@ void Storage::RestoreReplicas() { for (const auto &[replica_name, replica_data] : *storage_) { spdlog::info("Restoring replica {}.", replica_name); - const auto maybe_replica_status = replication::JSONToReplicaStatus(nlohmann::json::parse(replica_data)); + const auto maybe_replica_status = replication::JSONToReplicationStatus(nlohmann::json::parse(replica_data)); if (!maybe_replica_status.has_value()) { LOG_FATAL("Cannot parse previously saved configuration of replica {}.", replica_name); } @@ -2235,6 +2322,10 @@ void Storage::RestoreReplicas() { MG_ASSERT(replica_status.name == replica_name, "Expected replica name is '{}', but got '{}'", replica_status.name, replica_name); + if (replica_name == replication::kReservedReplicationRoleName) { + continue; + } + auto ret = RegisterReplica(std::move(replica_status.name), {std::move(replica_status.ip_address), replica_status.port}, replica_status.sync_mode, replication::RegistrationMode::CAN_BE_INVALID, @@ -2251,6 +2342,6 @@ void Storage::RestoreReplicas() { } } -bool Storage::ShouldStoreAndRestoreReplicas() const { return nullptr != storage_; } +bool Storage::ShouldStoreAndRestoreReplicationState() const { return nullptr != storage_; } } // namespace memgraph::storage diff --git a/src/storage/v2/storage.hpp b/src/storage/v2/storage.hpp index 7fd816530..9a9bc29e9 100644 --- a/src/storage/v2/storage.hpp +++ b/src/storage/v2/storage.hpp @@ -50,6 +50,7 @@ #include "rpc/server.hpp" #include "storage/v2/replication/config.hpp" #include "storage/v2/replication/enums.hpp" +#include "storage/v2/replication/replication_persistence_helper.hpp" #include "storage/v2/replication/rpc.hpp" #include "storage/v2/replication/serialization.hpp" #include "storage/v2/storage_error.hpp" @@ -188,8 +189,6 @@ struct StorageInfo { uint64_t disk_usage; }; -enum class ReplicationRole : uint8_t { MAIN, REPLICA }; - class Storage final { public: /// @throw std::system_error @@ -493,7 +492,7 @@ class Storage final { std::optional GetReplicaState(std::string_view name); - ReplicationRole GetReplicationRole() const; + replication::ReplicationRole GetReplicationRole() const; struct TimestampInfo { uint64_t current_timestamp_of_replica; @@ -557,9 +556,11 @@ class Storage final { uint64_t CommitTimestamp(std::optional desired_commit_timestamp = {}); + void RestoreReplicationRole(); + void RestoreReplicas(); - bool ShouldStoreAndRestoreReplicas() const; + bool ShouldStoreAndRestoreReplicationState() const; // Main storage lock. // @@ -680,7 +681,7 @@ class Storage final { using ReplicationClientList = utils::Synchronized>, utils::SpinLock>; ReplicationClientList replication_clients_; - std::atomic replication_role_{ReplicationRole::MAIN}; + std::atomic replication_role_{replication::ReplicationRole::MAIN}; }; } // namespace memgraph::storage diff --git a/src/utils/frame_change_id.hpp b/src/utils/frame_change_id.hpp index 525af4e92..e3facfa5e 100644 --- a/src/utils/frame_change_id.hpp +++ b/src/utils/frame_change_id.hpp @@ -1,3 +1,14 @@ +// Copyright 2023 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + #include #include "query/frontend/ast/ast.hpp" diff --git a/tests/e2e/replication/show_while_creating_invalid_state.py b/tests/e2e/replication/show_while_creating_invalid_state.py index d57f556f2..6bd36dc83 100644 --- a/tests/e2e/replication/show_while_creating_invalid_state.py +++ b/tests/e2e/replication/show_while_creating_invalid_state.py @@ -9,17 +9,16 @@ # by the Apache License, Version 2.0, included in the file # licenses/APL.txt. -import sys - import os -import pytest import random +import sys +import tempfile -from common import execute_and_fetch_all -from mg_utils import mg_sleep_and_assert import interactive_mg_runner import mgclient -import tempfile +import pytest +from common import execute_and_fetch_all +from mg_utils import mg_sleep_and_assert interactive_mg_runner.SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__)) interactive_mg_runner.PROJECT_DIR = os.path.normpath( @@ -340,6 +339,138 @@ def test_basic_recovery(connection): assert interactive_mg_runner.MEMGRAPH_INSTANCES[f"replica_{index}"].query(QUERY_TO_CHECK) == res_from_main +def test_replication_role_recovery(connection): + # Goal of this test is to check the recovery of main and replica role. + # 0/ We start all replicas manually: we want to be able to kill them ourselves without relying on external tooling to kill processes. + # 1/ We try to add a replica with reserved name which results in an exception + # 2/ We check that all replicas have the correct state: they should all be ready. + # 3/ We kill main. + # 4/ We re-start main. We check that main indeed has the role main and replicas still have the correct state. + # 5/ We kill the replica. + # 6/ We observed that the replica result is in invalid state. + # 7/ We start the replica again. We observe that indeed the replica has the replica state. + # 8/ We observe that main has the replica ready. + # 9/ We kill the replica again. + # 10/ We add data to main. + # 11/ We start the replica again. We observe that the replica has the same + # data as main because it synced and added lost data. + + # 0/ + data_directory = tempfile.TemporaryDirectory() + CONFIGURATION = { + "replica": { + "args": ["--bolt-port", "7688", "--log-level=TRACE"], + "log_file": "replica.log", + "setup_queries": ["SET REPLICATION ROLE TO REPLICA WITH PORT 10001;"], + "data_directory": f"{data_directory.name}/replica", + }, + "main": { + "args": ["--bolt-port", "7687", "--log-level=TRACE", "--storage-recover-on-startup=true"], + "log_file": "main.log", + "setup_queries": [], + "data_directory": f"{data_directory.name}/main", + }, + } + + interactive_mg_runner.start_all(CONFIGURATION) + cursor = connection(7687, "main").cursor() + + # We want to execute manually and not via the configuration, otherwise re-starting main would also execute these registration. + execute_and_fetch_all(cursor, "REGISTER REPLICA replica SYNC TO '127.0.0.1:10001';") + + # When we restart the replica, it does not need this query anymore since it needs to remember state + CONFIGURATION = { + "replica": { + "args": ["--bolt-port", "7688", "--log-level=TRACE"], + "log_file": "replica.log", + "setup_queries": [], + "data_directory": f"{data_directory.name}/replica", + }, + "main": { + "args": ["--bolt-port", "7687", "--log-level=TRACE", "--storage-recover-on-startup=true"], + "log_file": "main.log", + "setup_queries": [], + "data_directory": f"{data_directory.name}/main", + }, + } + # 1/ + with pytest.raises(mgclient.DatabaseError): + execute_and_fetch_all(cursor, "REGISTER REPLICA __replication_role SYNC TO '127.0.0.1:10002';") + + # 2/ + expected_data = { + ("replica", "127.0.0.1:10001", "sync", 0, 0, "ready"), + } + actual_data = set(execute_and_fetch_all(cursor, "SHOW REPLICAS;")) + + assert actual_data == expected_data + + def check_roles(): + assert "main" == interactive_mg_runner.MEMGRAPH_INSTANCES["main"].query("SHOW REPLICATION ROLE;")[0][0] + assert "replica" == interactive_mg_runner.MEMGRAPH_INSTANCES["replica"].query("SHOW REPLICATION ROLE;")[0][0] + + check_roles() + + # 3/ + interactive_mg_runner.kill(CONFIGURATION, "main") + + # 4/ + interactive_mg_runner.start(CONFIGURATION, "main") + cursor = connection(7687, "main").cursor() + check_roles() + + def retrieve_data(): + return set(execute_and_fetch_all(cursor, "SHOW REPLICAS;")) + + actual_data = mg_sleep_and_assert(expected_data, retrieve_data) + assert actual_data == expected_data + + # 5/ + interactive_mg_runner.kill(CONFIGURATION, "replica") + + # 6/ + expected_data = { + ("replica", "127.0.0.1:10001", "sync", 0, 0, "invalid"), + } + actual_data = mg_sleep_and_assert(expected_data, retrieve_data) + + assert actual_data == expected_data + + # 7/ + interactive_mg_runner.start(CONFIGURATION, "replica") + check_roles() + + # 8/ + expected_data = { + ("replica", "127.0.0.1:10001", "sync", 0, 0, "ready"), + } + + actual_data = mg_sleep_and_assert(expected_data, retrieve_data) + assert actual_data == expected_data + + # 9/ + interactive_mg_runner.kill(CONFIGURATION, "replica") + + # 10/ + with pytest.raises(mgclient.DatabaseError): + execute_and_fetch_all(cursor, "CREATE (n:First)") + + # 11/ + interactive_mg_runner.start(CONFIGURATION, "replica") + check_roles() + + expected_data = { + ("replica", "127.0.0.1:10001", "sync", 2, 0, "ready"), + } + actual_data = mg_sleep_and_assert(expected_data, retrieve_data) + assert actual_data == expected_data + + QUERY_TO_CHECK = "MATCH (node) return node;" + res_from_main = execute_and_fetch_all(cursor, QUERY_TO_CHECK) + assert len(res_from_main) == 1 + assert res_from_main == interactive_mg_runner.MEMGRAPH_INSTANCES["replica"].query(QUERY_TO_CHECK) + + def test_conflict_at_startup(connection): # Goal of this test is to check starting up several instance with different replicas' configuration directory works as expected. # main_1 and main_2 have different directory. diff --git a/tests/unit/replication_persistence_helper.cpp b/tests/unit/replication_persistence_helper.cpp index ffe125cd5..7915d315c 100644 --- a/tests/unit/replication_persistence_helper.cpp +++ b/tests/unit/replication_persistence_helper.cpp @@ -1,4 +1,4 @@ -// Copyright 2022 Memgraph Ltd. +// Copyright 2023 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source @@ -19,73 +19,82 @@ #include #include -class ReplicationPersistanceHelperTest : public ::testing::Test { +using namespace memgraph::storage::replication; + +class ReplicationPersistanceHelperTest : public testing::Test { protected: void SetUp() override {} void TearDown() override {} - memgraph::storage::replication::ReplicaStatus CreateReplicaStatus( - std::string name, std::string ip_address, uint16_t port, - memgraph::storage::replication::ReplicationMode sync_mode, std::chrono::seconds replica_check_frequency, - std::optional ssl) const { - return memgraph::storage::replication::ReplicaStatus{.name = name, - .ip_address = ip_address, - .port = port, - .sync_mode = sync_mode, - .replica_check_frequency = replica_check_frequency, - .ssl = ssl}; + ReplicationStatus CreateReplicationStatus(std::string name, std::string ip_address, uint16_t port, + ReplicationMode sync_mode, std::chrono::seconds replica_check_frequency, + std::optional ssl, + std::optional role) const { + return ReplicationStatus{.name = name, + .ip_address = ip_address, + .port = port, + .sync_mode = sync_mode, + .replica_check_frequency = replica_check_frequency, + .ssl = ssl, + .role = role}; } static_assert( - sizeof(memgraph::storage::replication::ReplicaStatus) == 152, - "Most likely you modified ReplicaStatus without updating the tests. Please modify CreateReplicaStatus. "); + sizeof(ReplicationStatus) == 160, + "Most likely you modified ReplicationStatus without updating the tests. Please modify CreateReplicationStatus. "); }; TEST_F(ReplicationPersistanceHelperTest, BasicTestAllAttributesInitialized) { - auto replicas_status = CreateReplicaStatus( - "name", "ip_address", 0, memgraph::storage::replication::ReplicationMode::SYNC, std::chrono::seconds(1), - memgraph::storage::replication::ReplicationClientConfig::SSL{.key_file = "key_file", .cert_file = "cert_file"}); + auto replicas_status = CreateReplicationStatus( + "name", "ip_address", 0, ReplicationMode::SYNC, std::chrono::seconds(1), + ReplicationClientConfig::SSL{.key_file = "key_file", .cert_file = "cert_file"}, ReplicationRole::REPLICA); - auto json_status = memgraph::storage::replication::ReplicaStatusToJSON( - memgraph::storage::replication::ReplicaStatus(replicas_status)); - auto replicas_status_converted = memgraph::storage::replication::JSONToReplicaStatus(std::move(json_status)); + auto json_status = ReplicationStatusToJSON(ReplicationStatus(replicas_status)); + auto replicas_status_converted = JSONToReplicationStatus(std::move(json_status)); ASSERT_EQ(replicas_status, *replicas_status_converted); } TEST_F(ReplicationPersistanceHelperTest, BasicTestOnlyMandatoryAttributesInitialized) { - auto replicas_status = - CreateReplicaStatus("name", "ip_address", 0, memgraph::storage::replication::ReplicationMode::SYNC, - std::chrono::seconds(1), std::nullopt); + auto replicas_status = CreateReplicationStatus("name", "ip_address", 0, ReplicationMode::SYNC, + std::chrono::seconds(1), std::nullopt, std::nullopt); - auto json_status = memgraph::storage::replication::ReplicaStatusToJSON( - memgraph::storage::replication::ReplicaStatus(replicas_status)); - auto replicas_status_converted = memgraph::storage::replication::JSONToReplicaStatus(std::move(json_status)); + auto json_status = ReplicationStatusToJSON(ReplicationStatus(replicas_status)); + auto replicas_status_converted = JSONToReplicationStatus(std::move(json_status)); ASSERT_EQ(replicas_status, *replicas_status_converted); } TEST_F(ReplicationPersistanceHelperTest, BasicTestAllAttributesButSSLInitialized) { - auto replicas_status = - CreateReplicaStatus("name", "ip_address", 0, memgraph::storage::replication::ReplicationMode::SYNC, - std::chrono::seconds(1), std::nullopt); + auto replicas_status = CreateReplicationStatus("name", "ip_address", 0, ReplicationMode::SYNC, + std::chrono::seconds(1), std::nullopt, ReplicationRole::MAIN); - auto json_status = memgraph::storage::replication::ReplicaStatusToJSON( - memgraph::storage::replication::ReplicaStatus(replicas_status)); - auto replicas_status_converted = memgraph::storage::replication::JSONToReplicaStatus(std::move(json_status)); + auto json_status = ReplicationStatusToJSON(ReplicationStatus(replicas_status)); + auto replicas_status_converted = JSONToReplicationStatus(std::move(json_status)); ASSERT_EQ(replicas_status, *replicas_status_converted); } TEST_F(ReplicationPersistanceHelperTest, BasicTestAllAttributesButTimeoutInitialized) { - auto replicas_status = CreateReplicaStatus( - "name", "ip_address", 0, memgraph::storage::replication::ReplicationMode::SYNC, std::chrono::seconds(1), - memgraph::storage::replication::ReplicationClientConfig::SSL{.key_file = "key_file", .cert_file = "cert_file"}); + auto replicas_status = CreateReplicationStatus( + "name", "ip_address", 0, ReplicationMode::SYNC, std::chrono::seconds(1), + ReplicationClientConfig::SSL{.key_file = "key_file", .cert_file = "cert_file"}, ReplicationRole::REPLICA); - auto json_status = memgraph::storage::replication::ReplicaStatusToJSON( - memgraph::storage::replication::ReplicaStatus(replicas_status)); - auto replicas_status_converted = memgraph::storage::replication::JSONToReplicaStatus(std::move(json_status)); + auto json_status = ReplicationStatusToJSON(ReplicationStatus(replicas_status)); + auto replicas_status_converted = JSONToReplicationStatus(std::move(json_status)); + + ASSERT_EQ(replicas_status, *replicas_status_converted); +} + +TEST_F(ReplicationPersistanceHelperTest, BasicTestAllAttributesButReplicationRoleInitialized) { + // this one is importand for backwards compatibility + auto replicas_status = CreateReplicationStatus( + "name", "ip_address", 0, ReplicationMode::SYNC, std::chrono::seconds(1), + ReplicationClientConfig::SSL{.key_file = "key_file", .cert_file = "cert_file"}, std::nullopt); + + auto json_status = ReplicationStatusToJSON(ReplicationStatus(replicas_status)); + auto replicas_status_converted = JSONToReplicationStatus(std::move(json_status)); ASSERT_EQ(replicas_status, *replicas_status_converted); } diff --git a/tests/unit/storage_v2_replication.cpp b/tests/unit/storage_v2_replication.cpp index 555b7ab27..54baa2a65 100644 --- a/tests/unit/storage_v2_replication.cpp +++ b/tests/unit/storage_v2_replication.cpp @@ -1,4 +1,4 @@ -// Copyright 2022 Memgraph Ltd. +// Copyright 2023 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source @@ -652,9 +652,9 @@ TEST_F(ReplicationTest, ReplicationInformation) { memgraph::storage::replication::RegistrationMode::MUST_BE_INSTANTLY_VALID) .HasError()); - ASSERT_EQ(main_store.GetReplicationRole(), memgraph::storage::ReplicationRole::MAIN); - ASSERT_EQ(replica_store1.GetReplicationRole(), memgraph::storage::ReplicationRole::REPLICA); - ASSERT_EQ(replica_store2.GetReplicationRole(), memgraph::storage::ReplicationRole::REPLICA); + ASSERT_EQ(main_store.GetReplicationRole(), memgraph::storage::replication::ReplicationRole::MAIN); + ASSERT_EQ(replica_store1.GetReplicationRole(), memgraph::storage::replication::ReplicationRole::REPLICA); + ASSERT_EQ(replica_store2.GetReplicationRole(), memgraph::storage::replication::ReplicationRole::REPLICA); const auto replicas_info = main_store.ReplicasInfo(); ASSERT_EQ(replicas_info.size(), 2); @@ -730,7 +730,7 @@ TEST_F(ReplicationTest, ReplicationReplicaWithExistingEndPoint) { TEST_F(ReplicationTest, RestoringReplicationAtStartupAftgerDroppingReplica) { auto main_config = configuration; - main_config.durability.restore_replicas_on_startup = true; + main_config.durability.restore_replication_state_on_startup = true; auto main_store = std::make_unique(main_config); memgraph::storage::Storage replica_store1(configuration); @@ -773,7 +773,7 @@ TEST_F(ReplicationTest, RestoringReplicationAtStartupAftgerDroppingReplica) { TEST_F(ReplicationTest, RestoringReplicationAtStartup) { auto main_config = configuration; - main_config.durability.restore_replicas_on_startup = true; + main_config.durability.restore_replication_state_on_startup = true; auto main_store = std::make_unique(main_config); memgraph::storage::Storage replica_store1(configuration); replica_store1.SetReplicaRole(memgraph::io::network::Endpoint{local_host, ports[0]});