Add restoring of replication roles upon database startup (#791)

Fix replica node restoration on startup so it is restored as replica and not as main.
This commit is contained in:
Josipmrden 2023-06-21 19:08:58 +02:00 committed by GitHub
parent 05cc35bf93
commit b875649270
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 385 additions and 121 deletions

View File

@ -891,7 +891,7 @@ int main(int argc, char **argv) {
.wal_file_size_kibibytes = FLAGS_storage_wal_file_size_kib,
.wal_file_flush_every_n_tx = FLAGS_storage_wal_file_flush_every_n_tx,
.snapshot_on_exit = FLAGS_storage_snapshot_on_exit,
.restore_replicas_on_startup = true,
.restore_replication_state_on_startup = true,
.items_per_batch = FLAGS_storage_items_per_batch,
.recovery_thread_count = FLAGS_storage_recovery_thread_count,
.allow_parallel_index_creation = FLAGS_storage_parallel_index_recovery},

View File

@ -14,8 +14,6 @@
#include <string>
namespace memgraph::query {
inline constexpr uint16_t kDefaultReplicationPort = 10000;
inline constexpr auto *kDefaultReplicationServerIp = "0.0.0.0";
inline const std::string kAsterisk = "*";
inline constexpr uint16_t kDeleteStatisticsNumResults = 6;
} // namespace memgraph::query

View File

@ -176,7 +176,7 @@ class ReplQueryHandler final : public query::ReplicationQueryHandler {
throw QueryRuntimeException("Port number invalid!");
}
if (!db_->SetReplicaRole(
io::network::Endpoint(query::kDefaultReplicationServerIp, static_cast<uint16_t>(*port)))) {
io::network::Endpoint(storage::replication::kDefaultReplicationServerIp, static_cast<uint16_t>(*port)))) {
throw QueryRuntimeException("Couldn't set role to replica!");
}
}
@ -185,9 +185,9 @@ class ReplQueryHandler final : public query::ReplicationQueryHandler {
/// @throw QueryRuntimeException if an error ocurred.
ReplicationQuery::ReplicationRole ShowReplicationRole() const override {
switch (db_->GetReplicationRole()) {
case storage::ReplicationRole::MAIN:
case storage::replication::ReplicationRole::MAIN:
return ReplicationQuery::ReplicationRole::MAIN;
case storage::ReplicationRole::REPLICA:
case storage::replication::ReplicationRole::REPLICA:
return ReplicationQuery::ReplicationRole::REPLICA;
}
throw QueryRuntimeException("Couldn't show replication role - invalid role set!");
@ -197,11 +197,15 @@ class ReplQueryHandler final : public query::ReplicationQueryHandler {
void RegisterReplica(const std::string &name, const std::string &socket_address,
const ReplicationQuery::SyncMode sync_mode,
const std::chrono::seconds replica_check_frequency) override {
if (db_->GetReplicationRole() == storage::ReplicationRole::REPLICA) {
if (db_->GetReplicationRole() == storage::replication::ReplicationRole::REPLICA) {
// replica can't register another replica
throw QueryRuntimeException("Replica can't register another replica!");
}
if (name == storage::replication::kReservedReplicationRoleName) {
throw QueryRuntimeException("This replica name is reserved and can not be used as replica name!");
}
storage::replication::ReplicationMode repl_mode;
switch (sync_mode) {
case ReplicationQuery::SyncMode::ASYNC: {
@ -215,7 +219,7 @@ class ReplQueryHandler final : public query::ReplicationQueryHandler {
}
auto maybe_ip_and_port =
io::network::Endpoint::ParseSocketOrIpAddress(socket_address, query::kDefaultReplicationPort);
io::network::Endpoint::ParseSocketOrIpAddress(socket_address, storage::replication::kDefaultReplicationPort);
if (maybe_ip_and_port) {
auto [ip, port] = *maybe_ip_and_port;
auto ret = db_->RegisterReplica(name, {std::move(ip), port}, repl_mode,
@ -231,7 +235,7 @@ class ReplQueryHandler final : public query::ReplicationQueryHandler {
/// @throw QueryRuntimeException if an error ocurred.
void DropReplica(const std::string &replica_name) override {
if (db_->GetReplicationRole() == storage::ReplicationRole::REPLICA) {
if (db_->GetReplicationRole() == storage::replication::ReplicationRole::REPLICA) {
// replica can't unregister a replica
throw QueryRuntimeException("Replica can't unregister a replica!");
}
@ -242,7 +246,7 @@ class ReplQueryHandler final : public query::ReplicationQueryHandler {
using Replica = ReplicationQueryHandler::Replica;
std::vector<Replica> ShowReplicas() const override {
if (db_->GetReplicationRole() == storage::ReplicationRole::REPLICA) {
if (db_->GetReplicationRole() == storage::replication::ReplicationRole::REPLICA) {
// replica can't show registered replicas (it shouldn't have any)
throw QueryRuntimeException("Replica can't show registered replicas (it shouldn't have any)!");
}
@ -2982,7 +2986,7 @@ Interpreter::PrepareResult Interpreter::Prepare(const std::string &query_string,
UpdateTypeCount(rw_type);
if (const auto query_type = query_execution->prepared_query->rw_type;
interpreter_context_->db->GetReplicationRole() == storage::ReplicationRole::REPLICA &&
interpreter_context_->db->GetReplicationRole() == storage::replication::ReplicationRole::REPLICA &&
(query_type == RWType::W || query_type == RWType::RW)) {
query_execution = nullptr;
throw QueryException("Write query forbidden on the replica!");

View File

@ -49,7 +49,7 @@ struct Config {
uint64_t wal_file_flush_every_n_tx{100000};
bool snapshot_on_exit{false};
bool restore_replicas_on_startup{false};
bool restore_replication_state_on_startup{false};
uint64_t items_per_batch{1'000'000};
uint64_t recovery_thread_count{8};

View File

@ -1,4 +1,4 @@
// Copyright 2022 Memgraph Ltd.
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -13,6 +13,8 @@
#include <cstdint>
namespace memgraph::storage::replication {
enum class ReplicationRole : uint8_t { MAIN, REPLICA };
enum class ReplicationMode : std::uint8_t { SYNC, ASYNC };
enum class ReplicaState : std::uint8_t { READY, REPLICATING, RECOVERY, INVALID };

View File

@ -1,4 +1,4 @@
// Copyright 2022 Memgraph Ltd.
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -10,21 +10,24 @@
// licenses/APL.txt.
#include "storage/v2/replication/replication_persistence_helper.hpp"
#include "storage/v2/replication/enums.hpp"
#include "utils/logging.hpp"
namespace {
const std::string kReplicaName = "replica_name";
const std::string kIpAddress = "replica_ip_address";
const std::string kPort = "replica_port";
const std::string kSyncMode = "replica_sync_mode";
const std::string kCheckFrequency = "replica_check_frequency";
const std::string kSSLKeyFile = "replica_ssl_key_file";
const std::string kSSLCertFile = "replica_ssl_cert_file";
inline constexpr auto *kReplicaName = "replica_name";
inline constexpr auto *kIpAddress = "replica_ip_address";
inline constexpr auto *kPort = "replica_port";
inline constexpr auto *kSyncMode = "replica_sync_mode";
inline constexpr auto *kCheckFrequency = "replica_check_frequency";
inline constexpr auto *kSSLKeyFile = "replica_ssl_key_file";
inline constexpr auto *kSSLCertFile = "replica_ssl_cert_file";
inline constexpr auto *kReplicationRole = "replication_role";
} // namespace
namespace memgraph::storage::replication {
nlohmann::json ReplicaStatusToJSON(ReplicaStatus &&status) {
nlohmann::json ReplicationStatusToJSON(ReplicationStatus &&status) {
auto data = nlohmann::json::object();
data[kReplicaName] = std::move(status.name);
@ -42,11 +45,15 @@ nlohmann::json ReplicaStatusToJSON(ReplicaStatus &&status) {
data[kSSLCertFile] = nullptr;
}
if (status.role.has_value()) {
data[kReplicationRole] = *status.role;
}
return data;
}
std::optional<ReplicaStatus> JSONToReplicaStatus(nlohmann::json &&data) {
ReplicaStatus replica_status;
std::optional<ReplicationStatus> JSONToReplicationStatus(nlohmann::json &&data) {
ReplicationStatus replica_status;
const auto get_failed_message = [](const std::string_view message, const std::string_view nested_message) {
return fmt::format("Failed to deserialize replica's configuration: {} : {}", message, nested_message);
@ -70,6 +77,11 @@ std::optional<ReplicaStatus> JSONToReplicaStatus(nlohmann::json &&data) {
data.at(kSSLKeyFile).get_to(replica_status.ssl->key_file);
data.at(kSSLCertFile).get_to(replica_status.ssl->cert_file);
}
if (data.find(kReplicationRole) != data.end()) {
replica_status.role = replication::ReplicationRole::MAIN;
data.at(kReplicationRole).get_to(replica_status.role.value());
}
} catch (const nlohmann::json::type_error &exception) {
spdlog::error(get_failed_message("Invalid type conversion", exception.what()));
return std::nullopt;

View File

@ -1,4 +1,4 @@
// Copyright 2022 Memgraph Ltd.
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -23,18 +23,23 @@
namespace memgraph::storage::replication {
struct ReplicaStatus {
inline constexpr auto *kReservedReplicationRoleName{"__replication_role"};
inline constexpr uint16_t kDefaultReplicationPort = 10000;
inline constexpr auto *kDefaultReplicationServerIp = "0.0.0.0";
struct ReplicationStatus {
std::string name;
std::string ip_address;
uint16_t port;
ReplicationMode sync_mode;
std::chrono::seconds replica_check_frequency;
std::optional<ReplicationClientConfig::SSL> ssl;
std::optional<ReplicationRole> role;
friend bool operator==(const ReplicaStatus &, const ReplicaStatus &) = default;
friend bool operator==(const ReplicationStatus &, const ReplicationStatus &) = default;
};
nlohmann::json ReplicaStatusToJSON(ReplicaStatus &&status);
nlohmann::json ReplicationStatusToJSON(ReplicationStatus &&status);
std::optional<ReplicaStatus> JSONToReplicaStatus(nlohmann::json &&data);
std::optional<ReplicationStatus> JSONToReplicationStatus(nlohmann::json &&data);
} // namespace memgraph::storage::replication

View File

@ -335,13 +335,6 @@ Storage::Storage(Config config)
uuid_(utils::GenerateUUID()),
epoch_id_(utils::GenerateUUID()),
global_locker_(file_retainer_.AddLocker()) {
if (config_.durability.snapshot_wal_mode == Config::Durability::SnapshotWalMode::DISABLED &&
replication_role_ == ReplicationRole::MAIN) {
spdlog::warn(
"The instance has the MAIN replication role, but durability logs and snapshots are disabled. Please consider "
"enabling durability by using --storage-snapshot-interval-sec and --storage-wal-enabled flags because "
"without write-ahead logs this instance is not replicating any data.");
}
if (config_.durability.snapshot_wal_mode != Config::Durability::SnapshotWalMode::DISABLED ||
config_.durability.snapshot_on_exit || config_.durability.recover_on_startup) {
// Create the directory initially to crash the database in case of
@ -437,14 +430,29 @@ Storage::Storage(Config config)
commit_log_.emplace(timestamp_);
}
if (config_.durability.restore_replicas_on_startup) {
spdlog::info("Replica's configuration will be stored and will be automatically restored in case of a crash.");
if (config_.durability.restore_replication_state_on_startup) {
spdlog::info("Replication configuration will be stored and will be automatically restored in case of a crash.");
utils::EnsureDirOrDie(config_.durability.storage_directory / durability::kReplicationDirectory);
storage_ =
std::make_unique<kvstore::KVStore>(config_.durability.storage_directory / durability::kReplicationDirectory);
RestoreReplicas();
RestoreReplicationRole();
if (replication_role_ == replication::ReplicationRole::MAIN) {
RestoreReplicas();
}
} else {
spdlog::warn("Replicas' configuration will NOT be stored. When the server restarts, replicas will be forgotten.");
spdlog::warn(
"Replicastion configuration will NOT be stored. When the server restarts, replication state will be "
"forgotten.");
}
if (config_.durability.snapshot_wal_mode == Config::Durability::SnapshotWalMode::DISABLED &&
replication_role_ == replication::ReplicationRole::MAIN) {
spdlog::warn(
"The instance has the MAIN replication role, but durability logs and snapshots are disabled. Please consider "
"enabling durability by using --storage-snapshot-interval-sec and --storage-wal-enabled flags because "
"without write-ahead logs this instance is not replicating any data.");
}
}
@ -968,7 +976,7 @@ utils::BasicResult<StorageDataManipulationError, void> Storage::Accessor::Commit
// modifications before they are written to disk.
// Replica can log only the write transaction received from Main
// so the Wal files are consistent
if (storage_->replication_role_ == ReplicationRole::MAIN || desired_commit_timestamp.has_value()) {
if (storage_->replication_role_ == replication::ReplicationRole::MAIN || desired_commit_timestamp.has_value()) {
could_replicate_all_sync_replicas = storage_->AppendToWalDataManipulation(transaction_, *commit_timestamp_);
}
@ -982,7 +990,8 @@ utils::BasicResult<StorageDataManipulationError, void> Storage::Accessor::Commit
transaction_.commit_timestamp->store(*commit_timestamp_, std::memory_order_release);
// Replica can only update the last commit timestamp with
// the commits received from main.
if (storage_->replication_role_ == ReplicationRole::MAIN || desired_commit_timestamp.has_value()) {
if (storage_->replication_role_ == replication::ReplicationRole::MAIN ||
desired_commit_timestamp.has_value()) {
// Update the last commit timestamp
storage_->last_commit_timestamp_.store(*commit_timestamp_);
}
@ -1447,7 +1456,7 @@ Transaction Storage::CreateTransaction(IsolationLevel isolation_level, StorageMo
// of any query on replica to the last commited transaction
// which is timestamp_ as only commit of transaction with writes
// can change the value of it.
if (replication_role_ == ReplicationRole::REPLICA) {
if (replication_role_ == replication::ReplicationRole::REPLICA) {
start_timestamp = timestamp_;
} else {
start_timestamp = timestamp_++;
@ -1752,7 +1761,7 @@ bool Storage::AppendToWalDataManipulation(const Transaction &transaction, uint64
// A single transaction will always be contained in a single WAL file.
auto current_commit_timestamp = transaction.commit_timestamp->load(std::memory_order_acquire);
if (replication_role_.load() == ReplicationRole::MAIN) {
if (replication_role_.load() == replication::ReplicationRole::MAIN) {
replication_clients_.WithLock([&](auto &clients) {
for (auto &client : clients) {
client->StartTransactionReplication(wal_file_->SequenceNumber());
@ -1940,7 +1949,7 @@ bool Storage::AppendToWalDataDefinition(durability::StorageGlobalOperation opera
auto finalized_on_all_replicas = true;
wal_file_->AppendOperation(operation, label, properties, final_commit_timestamp);
{
if (replication_role_.load() == ReplicationRole::MAIN) {
if (replication_role_.load() == replication::ReplicationRole::MAIN) {
replication_clients_.WithLock([&](auto &clients) {
for (auto &client : clients) {
client->StartTransactionReplication(wal_file_->SequenceNumber());
@ -1960,7 +1969,7 @@ bool Storage::AppendToWalDataDefinition(durability::StorageGlobalOperation opera
}
utils::BasicResult<Storage::CreateSnapshotError> Storage::CreateSnapshot(std::optional<bool> is_periodic) {
if (replication_role_.load() != ReplicationRole::MAIN) {
if (replication_role_.load() != replication::ReplicationRole::MAIN) {
return CreateSnapshotError::DisabledForReplica;
}
@ -2054,20 +2063,38 @@ uint64_t Storage::CommitTimestamp(const std::optional<uint64_t> desired_commit_t
bool Storage::SetReplicaRole(io::network::Endpoint endpoint, const replication::ReplicationServerConfig &config) {
// We don't want to restart the server if we're already a REPLICA
if (replication_role_ == ReplicationRole::REPLICA) {
if (replication_role_ == replication::ReplicationRole::REPLICA) {
return false;
}
auto port = endpoint.port; // assigning because we will move the endpoint
replication_server_ = std::make_unique<ReplicationServer>(this, std::move(endpoint), config);
replication_role_.store(ReplicationRole::REPLICA);
if (ShouldStoreAndRestoreReplicationState()) {
// Only thing that matters here is the role saved as REPLICA and the listening port
auto data = replication::ReplicationStatusToJSON(
replication::ReplicationStatus{.name = replication::kReservedReplicationRoleName,
.ip_address = "",
.port = port,
.sync_mode = replication::ReplicationMode::SYNC,
.replica_check_frequency = std::chrono::seconds(0),
.ssl = std::nullopt,
.role = replication::ReplicationRole::REPLICA});
if (!storage_->Put(replication::kReservedReplicationRoleName, data.dump())) {
spdlog::error("Error when saving REPLICA replication role in settings.");
return false;
}
}
replication_role_.store(replication::ReplicationRole::REPLICA);
return true;
}
bool Storage::SetMainReplicationRole() {
// We don't want to generate new epoch_id and do the
// cleanup if we're already a MAIN
if (replication_role_ == ReplicationRole::MAIN) {
if (replication_role_ == replication::ReplicationRole::MAIN) {
return false;
}
@ -2090,14 +2117,33 @@ bool Storage::SetMainReplicationRole() {
epoch_id_ = utils::GenerateUUID();
}
replication_role_.store(ReplicationRole::MAIN);
if (ShouldStoreAndRestoreReplicationState()) {
// Only thing that matters here is the role saved as MAIN
auto data = replication::ReplicationStatusToJSON(
replication::ReplicationStatus{.name = replication::kReservedReplicationRoleName,
.ip_address = "",
.port = 0,
.sync_mode = replication::ReplicationMode::SYNC,
.replica_check_frequency = std::chrono::seconds(0),
.ssl = std::nullopt,
.role = replication::ReplicationRole::MAIN});
if (!storage_->Put(replication::kReservedReplicationRoleName, data.dump())) {
spdlog::error("Error when saving MAIN replication role in settings.");
return false;
}
}
replication_role_.store(replication::ReplicationRole::MAIN);
return true;
}
utils::BasicResult<Storage::RegisterReplicaError> Storage::RegisterReplica(
std::string name, io::network::Endpoint endpoint, const replication::ReplicationMode replication_mode,
const replication::RegistrationMode registration_mode, const replication::ReplicationClientConfig &config) {
MG_ASSERT(replication_role_.load() == ReplicationRole::MAIN, "Only main instance can register a replica!");
MG_ASSERT(replication_role_.load() == replication::ReplicationRole::MAIN,
"Only main instance can register a replica!");
const bool name_exists = replication_clients_.WithLock([&](auto &clients) {
return std::any_of(clients.begin(), clients.end(), [&name](const auto &client) { return client->Name() == name; });
@ -2116,14 +2162,15 @@ utils::BasicResult<Storage::RegisterReplicaError> Storage::RegisterReplica(
return RegisterReplicaError::END_POINT_EXISTS;
}
if (ShouldStoreAndRestoreReplicas()) {
auto data = replication::ReplicaStatusToJSON(
replication::ReplicaStatus{.name = name,
.ip_address = endpoint.address,
.port = endpoint.port,
.sync_mode = replication_mode,
.replica_check_frequency = config.replica_check_frequency,
.ssl = config.ssl});
if (ShouldStoreAndRestoreReplicationState()) {
auto data = replication::ReplicationStatusToJSON(
replication::ReplicationStatus{.name = name,
.ip_address = endpoint.address,
.port = endpoint.port,
.sync_mode = replication_mode,
.replica_check_frequency = config.replica_check_frequency,
.ssl = config.ssl,
.role = replication::ReplicationRole::REPLICA});
if (!storage_->Put(name, data.dump())) {
spdlog::error("Error when saving replica {} in settings.", name);
return RegisterReplicaError::COULD_NOT_BE_PERSISTED;
@ -2159,8 +2206,9 @@ utils::BasicResult<Storage::RegisterReplicaError> Storage::RegisterReplica(
}
bool Storage::UnregisterReplica(const std::string &name) {
MG_ASSERT(replication_role_.load() == ReplicationRole::MAIN, "Only main instance can unregister a replica!");
if (ShouldStoreAndRestoreReplicas()) {
MG_ASSERT(replication_role_.load() == replication::ReplicationRole::MAIN,
"Only main instance can unregister a replica!");
if (ShouldStoreAndRestoreReplicationState()) {
if (!storage_->Delete(name)) {
spdlog::error("Error when removing replica {} from settings.", name);
return false;
@ -2183,7 +2231,7 @@ std::optional<replication::ReplicaState> Storage::GetReplicaState(const std::str
});
}
ReplicationRole Storage::GetReplicationRole() const { return replication_role_; }
replication::ReplicationRole Storage::GetReplicationRole() const { return replication_role_; }
std::vector<Storage::ReplicaInfo> Storage::ReplicasInfo() {
return replication_clients_.WithLock([](auto &clients) {
@ -2207,6 +2255,46 @@ utils::BasicResult<Storage::SetIsolationLevelError> Storage::SetIsolationLevel(I
return {};
}
void Storage::RestoreReplicationRole() {
if (!ShouldStoreAndRestoreReplicationState()) {
return;
}
spdlog::info("Restoring replication role.");
uint16_t port = replication::kDefaultReplicationPort;
for (const auto &[replica_name, replica_data] : *storage_) {
const auto maybe_replica_status = replication::JSONToReplicationStatus(nlohmann::json::parse(replica_data));
if (!maybe_replica_status.has_value()) {
LOG_FATAL("Cannot parse previously saved configuration of replica {}.", replica_name);
}
if (replica_name != replication::kReservedReplicationRoleName) {
continue;
}
auto replica_status = *maybe_replica_status;
if (!replica_status.role.has_value()) {
replication_role_.store(replication::ReplicationRole::MAIN);
} else {
replication_role_.store(*replica_status.role);
port = replica_status.port;
}
break;
}
if (replication_role_ == replication::ReplicationRole::REPLICA) {
io::network::Endpoint endpoint(replication::kDefaultReplicationServerIp, port);
replication_server_ =
std::make_unique<ReplicationServer>(this, std::move(endpoint), replication::ReplicationServerConfig{});
}
spdlog::info("Replication role restored to {}.",
replication_role_ == replication::ReplicationRole::MAIN ? "MAIN" : "REPLICA");
}
IsolationLevel Storage::GetIsolationLevel() const noexcept { return isolation_level_; }
void Storage::SetStorageMode(StorageMode storage_mode) {
@ -2217,8 +2305,7 @@ void Storage::SetStorageMode(StorageMode storage_mode) {
StorageMode Storage::GetStorageMode() { return storage_mode_; }
void Storage::RestoreReplicas() {
MG_ASSERT(memgraph::storage::ReplicationRole::MAIN == GetReplicationRole());
if (!ShouldStoreAndRestoreReplicas()) {
if (!ShouldStoreAndRestoreReplicationState()) {
return;
}
spdlog::info("Restoring replicas.");
@ -2226,7 +2313,7 @@ void Storage::RestoreReplicas() {
for (const auto &[replica_name, replica_data] : *storage_) {
spdlog::info("Restoring replica {}.", replica_name);
const auto maybe_replica_status = replication::JSONToReplicaStatus(nlohmann::json::parse(replica_data));
const auto maybe_replica_status = replication::JSONToReplicationStatus(nlohmann::json::parse(replica_data));
if (!maybe_replica_status.has_value()) {
LOG_FATAL("Cannot parse previously saved configuration of replica {}.", replica_name);
}
@ -2235,6 +2322,10 @@ void Storage::RestoreReplicas() {
MG_ASSERT(replica_status.name == replica_name, "Expected replica name is '{}', but got '{}'", replica_status.name,
replica_name);
if (replica_name == replication::kReservedReplicationRoleName) {
continue;
}
auto ret =
RegisterReplica(std::move(replica_status.name), {std::move(replica_status.ip_address), replica_status.port},
replica_status.sync_mode, replication::RegistrationMode::CAN_BE_INVALID,
@ -2251,6 +2342,6 @@ void Storage::RestoreReplicas() {
}
}
bool Storage::ShouldStoreAndRestoreReplicas() const { return nullptr != storage_; }
bool Storage::ShouldStoreAndRestoreReplicationState() const { return nullptr != storage_; }
} // namespace memgraph::storage

View File

@ -50,6 +50,7 @@
#include "rpc/server.hpp"
#include "storage/v2/replication/config.hpp"
#include "storage/v2/replication/enums.hpp"
#include "storage/v2/replication/replication_persistence_helper.hpp"
#include "storage/v2/replication/rpc.hpp"
#include "storage/v2/replication/serialization.hpp"
#include "storage/v2/storage_error.hpp"
@ -188,8 +189,6 @@ struct StorageInfo {
uint64_t disk_usage;
};
enum class ReplicationRole : uint8_t { MAIN, REPLICA };
class Storage final {
public:
/// @throw std::system_error
@ -493,7 +492,7 @@ class Storage final {
std::optional<replication::ReplicaState> GetReplicaState(std::string_view name);
ReplicationRole GetReplicationRole() const;
replication::ReplicationRole GetReplicationRole() const;
struct TimestampInfo {
uint64_t current_timestamp_of_replica;
@ -557,9 +556,11 @@ class Storage final {
uint64_t CommitTimestamp(std::optional<uint64_t> desired_commit_timestamp = {});
void RestoreReplicationRole();
void RestoreReplicas();
bool ShouldStoreAndRestoreReplicas() const;
bool ShouldStoreAndRestoreReplicationState() const;
// Main storage lock.
//
@ -680,7 +681,7 @@ class Storage final {
using ReplicationClientList = utils::Synchronized<std::vector<std::unique_ptr<ReplicationClient>>, utils::SpinLock>;
ReplicationClientList replication_clients_;
std::atomic<ReplicationRole> replication_role_{ReplicationRole::MAIN};
std::atomic<replication::ReplicationRole> replication_role_{replication::ReplicationRole::MAIN};
};
} // namespace memgraph::storage

View File

@ -1,3 +1,14 @@
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#include <string>
#include "query/frontend/ast/ast.hpp"

View File

@ -9,17 +9,16 @@
# by the Apache License, Version 2.0, included in the file
# licenses/APL.txt.
import sys
import os
import pytest
import random
import sys
import tempfile
from common import execute_and_fetch_all
from mg_utils import mg_sleep_and_assert
import interactive_mg_runner
import mgclient
import tempfile
import pytest
from common import execute_and_fetch_all
from mg_utils import mg_sleep_and_assert
interactive_mg_runner.SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
interactive_mg_runner.PROJECT_DIR = os.path.normpath(
@ -340,6 +339,138 @@ def test_basic_recovery(connection):
assert interactive_mg_runner.MEMGRAPH_INSTANCES[f"replica_{index}"].query(QUERY_TO_CHECK) == res_from_main
def test_replication_role_recovery(connection):
# Goal of this test is to check the recovery of main and replica role.
# 0/ We start all replicas manually: we want to be able to kill them ourselves without relying on external tooling to kill processes.
# 1/ We try to add a replica with reserved name which results in an exception
# 2/ We check that all replicas have the correct state: they should all be ready.
# 3/ We kill main.
# 4/ We re-start main. We check that main indeed has the role main and replicas still have the correct state.
# 5/ We kill the replica.
# 6/ We observed that the replica result is in invalid state.
# 7/ We start the replica again. We observe that indeed the replica has the replica state.
# 8/ We observe that main has the replica ready.
# 9/ We kill the replica again.
# 10/ We add data to main.
# 11/ We start the replica again. We observe that the replica has the same
# data as main because it synced and added lost data.
# 0/
data_directory = tempfile.TemporaryDirectory()
CONFIGURATION = {
"replica": {
"args": ["--bolt-port", "7688", "--log-level=TRACE"],
"log_file": "replica.log",
"setup_queries": ["SET REPLICATION ROLE TO REPLICA WITH PORT 10001;"],
"data_directory": f"{data_directory.name}/replica",
},
"main": {
"args": ["--bolt-port", "7687", "--log-level=TRACE", "--storage-recover-on-startup=true"],
"log_file": "main.log",
"setup_queries": [],
"data_directory": f"{data_directory.name}/main",
},
}
interactive_mg_runner.start_all(CONFIGURATION)
cursor = connection(7687, "main").cursor()
# We want to execute manually and not via the configuration, otherwise re-starting main would also execute these registration.
execute_and_fetch_all(cursor, "REGISTER REPLICA replica SYNC TO '127.0.0.1:10001';")
# When we restart the replica, it does not need this query anymore since it needs to remember state
CONFIGURATION = {
"replica": {
"args": ["--bolt-port", "7688", "--log-level=TRACE"],
"log_file": "replica.log",
"setup_queries": [],
"data_directory": f"{data_directory.name}/replica",
},
"main": {
"args": ["--bolt-port", "7687", "--log-level=TRACE", "--storage-recover-on-startup=true"],
"log_file": "main.log",
"setup_queries": [],
"data_directory": f"{data_directory.name}/main",
},
}
# 1/
with pytest.raises(mgclient.DatabaseError):
execute_and_fetch_all(cursor, "REGISTER REPLICA __replication_role SYNC TO '127.0.0.1:10002';")
# 2/
expected_data = {
("replica", "127.0.0.1:10001", "sync", 0, 0, "ready"),
}
actual_data = set(execute_and_fetch_all(cursor, "SHOW REPLICAS;"))
assert actual_data == expected_data
def check_roles():
assert "main" == interactive_mg_runner.MEMGRAPH_INSTANCES["main"].query("SHOW REPLICATION ROLE;")[0][0]
assert "replica" == interactive_mg_runner.MEMGRAPH_INSTANCES["replica"].query("SHOW REPLICATION ROLE;")[0][0]
check_roles()
# 3/
interactive_mg_runner.kill(CONFIGURATION, "main")
# 4/
interactive_mg_runner.start(CONFIGURATION, "main")
cursor = connection(7687, "main").cursor()
check_roles()
def retrieve_data():
return set(execute_and_fetch_all(cursor, "SHOW REPLICAS;"))
actual_data = mg_sleep_and_assert(expected_data, retrieve_data)
assert actual_data == expected_data
# 5/
interactive_mg_runner.kill(CONFIGURATION, "replica")
# 6/
expected_data = {
("replica", "127.0.0.1:10001", "sync", 0, 0, "invalid"),
}
actual_data = mg_sleep_and_assert(expected_data, retrieve_data)
assert actual_data == expected_data
# 7/
interactive_mg_runner.start(CONFIGURATION, "replica")
check_roles()
# 8/
expected_data = {
("replica", "127.0.0.1:10001", "sync", 0, 0, "ready"),
}
actual_data = mg_sleep_and_assert(expected_data, retrieve_data)
assert actual_data == expected_data
# 9/
interactive_mg_runner.kill(CONFIGURATION, "replica")
# 10/
with pytest.raises(mgclient.DatabaseError):
execute_and_fetch_all(cursor, "CREATE (n:First)")
# 11/
interactive_mg_runner.start(CONFIGURATION, "replica")
check_roles()
expected_data = {
("replica", "127.0.0.1:10001", "sync", 2, 0, "ready"),
}
actual_data = mg_sleep_and_assert(expected_data, retrieve_data)
assert actual_data == expected_data
QUERY_TO_CHECK = "MATCH (node) return node;"
res_from_main = execute_and_fetch_all(cursor, QUERY_TO_CHECK)
assert len(res_from_main) == 1
assert res_from_main == interactive_mg_runner.MEMGRAPH_INSTANCES["replica"].query(QUERY_TO_CHECK)
def test_conflict_at_startup(connection):
# Goal of this test is to check starting up several instance with different replicas' configuration directory works as expected.
# main_1 and main_2 have different directory.

View File

@ -1,4 +1,4 @@
// Copyright 2022 Memgraph Ltd.
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -19,73 +19,82 @@
#include <optional>
#include <string>
class ReplicationPersistanceHelperTest : public ::testing::Test {
using namespace memgraph::storage::replication;
class ReplicationPersistanceHelperTest : public testing::Test {
protected:
void SetUp() override {}
void TearDown() override {}
memgraph::storage::replication::ReplicaStatus CreateReplicaStatus(
std::string name, std::string ip_address, uint16_t port,
memgraph::storage::replication::ReplicationMode sync_mode, std::chrono::seconds replica_check_frequency,
std::optional<memgraph::storage::replication::ReplicationClientConfig::SSL> ssl) const {
return memgraph::storage::replication::ReplicaStatus{.name = name,
.ip_address = ip_address,
.port = port,
.sync_mode = sync_mode,
.replica_check_frequency = replica_check_frequency,
.ssl = ssl};
ReplicationStatus CreateReplicationStatus(std::string name, std::string ip_address, uint16_t port,
ReplicationMode sync_mode, std::chrono::seconds replica_check_frequency,
std::optional<ReplicationClientConfig::SSL> ssl,
std::optional<ReplicationRole> role) const {
return ReplicationStatus{.name = name,
.ip_address = ip_address,
.port = port,
.sync_mode = sync_mode,
.replica_check_frequency = replica_check_frequency,
.ssl = ssl,
.role = role};
}
static_assert(
sizeof(memgraph::storage::replication::ReplicaStatus) == 152,
"Most likely you modified ReplicaStatus without updating the tests. Please modify CreateReplicaStatus. ");
sizeof(ReplicationStatus) == 160,
"Most likely you modified ReplicationStatus without updating the tests. Please modify CreateReplicationStatus. ");
};
TEST_F(ReplicationPersistanceHelperTest, BasicTestAllAttributesInitialized) {
auto replicas_status = CreateReplicaStatus(
"name", "ip_address", 0, memgraph::storage::replication::ReplicationMode::SYNC, std::chrono::seconds(1),
memgraph::storage::replication::ReplicationClientConfig::SSL{.key_file = "key_file", .cert_file = "cert_file"});
auto replicas_status = CreateReplicationStatus(
"name", "ip_address", 0, ReplicationMode::SYNC, std::chrono::seconds(1),
ReplicationClientConfig::SSL{.key_file = "key_file", .cert_file = "cert_file"}, ReplicationRole::REPLICA);
auto json_status = memgraph::storage::replication::ReplicaStatusToJSON(
memgraph::storage::replication::ReplicaStatus(replicas_status));
auto replicas_status_converted = memgraph::storage::replication::JSONToReplicaStatus(std::move(json_status));
auto json_status = ReplicationStatusToJSON(ReplicationStatus(replicas_status));
auto replicas_status_converted = JSONToReplicationStatus(std::move(json_status));
ASSERT_EQ(replicas_status, *replicas_status_converted);
}
TEST_F(ReplicationPersistanceHelperTest, BasicTestOnlyMandatoryAttributesInitialized) {
auto replicas_status =
CreateReplicaStatus("name", "ip_address", 0, memgraph::storage::replication::ReplicationMode::SYNC,
std::chrono::seconds(1), std::nullopt);
auto replicas_status = CreateReplicationStatus("name", "ip_address", 0, ReplicationMode::SYNC,
std::chrono::seconds(1), std::nullopt, std::nullopt);
auto json_status = memgraph::storage::replication::ReplicaStatusToJSON(
memgraph::storage::replication::ReplicaStatus(replicas_status));
auto replicas_status_converted = memgraph::storage::replication::JSONToReplicaStatus(std::move(json_status));
auto json_status = ReplicationStatusToJSON(ReplicationStatus(replicas_status));
auto replicas_status_converted = JSONToReplicationStatus(std::move(json_status));
ASSERT_EQ(replicas_status, *replicas_status_converted);
}
TEST_F(ReplicationPersistanceHelperTest, BasicTestAllAttributesButSSLInitialized) {
auto replicas_status =
CreateReplicaStatus("name", "ip_address", 0, memgraph::storage::replication::ReplicationMode::SYNC,
std::chrono::seconds(1), std::nullopt);
auto replicas_status = CreateReplicationStatus("name", "ip_address", 0, ReplicationMode::SYNC,
std::chrono::seconds(1), std::nullopt, ReplicationRole::MAIN);
auto json_status = memgraph::storage::replication::ReplicaStatusToJSON(
memgraph::storage::replication::ReplicaStatus(replicas_status));
auto replicas_status_converted = memgraph::storage::replication::JSONToReplicaStatus(std::move(json_status));
auto json_status = ReplicationStatusToJSON(ReplicationStatus(replicas_status));
auto replicas_status_converted = JSONToReplicationStatus(std::move(json_status));
ASSERT_EQ(replicas_status, *replicas_status_converted);
}
TEST_F(ReplicationPersistanceHelperTest, BasicTestAllAttributesButTimeoutInitialized) {
auto replicas_status = CreateReplicaStatus(
"name", "ip_address", 0, memgraph::storage::replication::ReplicationMode::SYNC, std::chrono::seconds(1),
memgraph::storage::replication::ReplicationClientConfig::SSL{.key_file = "key_file", .cert_file = "cert_file"});
auto replicas_status = CreateReplicationStatus(
"name", "ip_address", 0, ReplicationMode::SYNC, std::chrono::seconds(1),
ReplicationClientConfig::SSL{.key_file = "key_file", .cert_file = "cert_file"}, ReplicationRole::REPLICA);
auto json_status = memgraph::storage::replication::ReplicaStatusToJSON(
memgraph::storage::replication::ReplicaStatus(replicas_status));
auto replicas_status_converted = memgraph::storage::replication::JSONToReplicaStatus(std::move(json_status));
auto json_status = ReplicationStatusToJSON(ReplicationStatus(replicas_status));
auto replicas_status_converted = JSONToReplicationStatus(std::move(json_status));
ASSERT_EQ(replicas_status, *replicas_status_converted);
}
TEST_F(ReplicationPersistanceHelperTest, BasicTestAllAttributesButReplicationRoleInitialized) {
// this one is importand for backwards compatibility
auto replicas_status = CreateReplicationStatus(
"name", "ip_address", 0, ReplicationMode::SYNC, std::chrono::seconds(1),
ReplicationClientConfig::SSL{.key_file = "key_file", .cert_file = "cert_file"}, std::nullopt);
auto json_status = ReplicationStatusToJSON(ReplicationStatus(replicas_status));
auto replicas_status_converted = JSONToReplicationStatus(std::move(json_status));
ASSERT_EQ(replicas_status, *replicas_status_converted);
}

View File

@ -1,4 +1,4 @@
// Copyright 2022 Memgraph Ltd.
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -652,9 +652,9 @@ TEST_F(ReplicationTest, ReplicationInformation) {
memgraph::storage::replication::RegistrationMode::MUST_BE_INSTANTLY_VALID)
.HasError());
ASSERT_EQ(main_store.GetReplicationRole(), memgraph::storage::ReplicationRole::MAIN);
ASSERT_EQ(replica_store1.GetReplicationRole(), memgraph::storage::ReplicationRole::REPLICA);
ASSERT_EQ(replica_store2.GetReplicationRole(), memgraph::storage::ReplicationRole::REPLICA);
ASSERT_EQ(main_store.GetReplicationRole(), memgraph::storage::replication::ReplicationRole::MAIN);
ASSERT_EQ(replica_store1.GetReplicationRole(), memgraph::storage::replication::ReplicationRole::REPLICA);
ASSERT_EQ(replica_store2.GetReplicationRole(), memgraph::storage::replication::ReplicationRole::REPLICA);
const auto replicas_info = main_store.ReplicasInfo();
ASSERT_EQ(replicas_info.size(), 2);
@ -730,7 +730,7 @@ TEST_F(ReplicationTest, ReplicationReplicaWithExistingEndPoint) {
TEST_F(ReplicationTest, RestoringReplicationAtStartupAftgerDroppingReplica) {
auto main_config = configuration;
main_config.durability.restore_replicas_on_startup = true;
main_config.durability.restore_replication_state_on_startup = true;
auto main_store = std::make_unique<memgraph::storage::Storage>(main_config);
memgraph::storage::Storage replica_store1(configuration);
@ -773,7 +773,7 @@ TEST_F(ReplicationTest, RestoringReplicationAtStartupAftgerDroppingReplica) {
TEST_F(ReplicationTest, RestoringReplicationAtStartup) {
auto main_config = configuration;
main_config.durability.restore_replicas_on_startup = true;
main_config.durability.restore_replication_state_on_startup = true;
auto main_store = std::make_unique<memgraph::storage::Storage>(main_config);
memgraph::storage::Storage replica_store1(configuration);
replica_store1.SetReplicaRole(memgraph::io::network::Endpoint{local_host, ports[0]});