Save replication settings (#415)

* Storage takes care of saving the replication settings when a new replica is added

* Restore replicas at startup

* Modify interactive_mg_runner + memgraph to support that data-directory can be configured in CONTEXT

* Extend e2e test

* Correct typo

* Add flag to config to specify when replication should be stored (true by default when starting Memgraph)

* Remove unnecessary "--" in yaml file

* Make sure Memgraph stops if a replica can't be restored.

* Add UT covering the parsing of ReplicaStatus to/from json

* Add assert in e2e script to check that a port is free before using it

* Add test covering crash on Jepsen

* Make sure the application crashes if it starts with corrupted replication info

Starting with a non-responsive replica is allowed.

* Add temporary startup flag: this is needed so that Jepsen does not automatically restore replicas on startup of main. This will be removed in T0835
This commit is contained in:
Jeremy B 2022-07-07 13:30:28 +02:00 committed by GitHub
parent b737e53456
commit f629de7e60
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
23 changed files with 855 additions and 147 deletions

View File

@ -216,6 +216,11 @@ DEFINE_bool(telemetry_enabled, false,
"the database runtime (vertex and edge counts and resource usage) "
"to allow for easier improvement of the product.");
// Startup flag that is forwarded to the storage durability configuration
// (`restore_replicas_on_startup`). It is enabled by default.
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
DEFINE_bool(storage_restore_replicas_on_startup, true,
            "Controls replicas should be restored automatically.");  // TODO(42jeremy) this must be removed once T0835
                                                                     // is implemented.
// Streams flags
// NOLINTNEXTLINE (cppcoreguidelines-avoid-non-const-global-variables)
DEFINE_uint32(
@ -1195,7 +1200,8 @@ int main(int argc, char **argv) {
.snapshot_retention_count = FLAGS_storage_snapshot_retention_count,
.wal_file_size_kibibytes = FLAGS_storage_wal_file_size_kib,
.wal_file_flush_every_n_tx = FLAGS_storage_wal_file_flush_every_n_tx,
.snapshot_on_exit = FLAGS_storage_snapshot_on_exit},
.snapshot_on_exit = FLAGS_storage_snapshot_on_exit,
.restore_replicas_on_startup = FLAGS_storage_restore_replicas_on_startup},
.transaction = {.isolation_level = ParseIsolationLevel()}};
if (FLAGS_storage_snapshot_interval_sec == 0) {
if (FLAGS_storage_wal_enabled) {

View File

@ -44,6 +44,7 @@
#include "query/trigger.hpp"
#include "query/typed_value.hpp"
#include "storage/v2/property_value.hpp"
#include "storage/v2/replication/enums.hpp"
#include "utils/algorithm.hpp"
#include "utils/csv_parsing.hpp"
#include "utils/event_counter.hpp"
@ -184,6 +185,7 @@ class ReplQueryHandler final : public query::ReplicationQueryHandler {
if (maybe_ip_and_port) {
auto [ip, port] = *maybe_ip_and_port;
auto ret = db_->RegisterReplica(name, {std::move(ip), port}, repl_mode,
storage::replication::RegistrationMode::MUST_BE_INSTANTLY_VALID,
{.replica_check_frequency = replica_check_frequency, .ssl = std::nullopt});
if (ret.HasError()) {
throw QueryRuntimeException(fmt::format("Couldn't register replica '{}'!", name));

View File

@ -619,10 +619,10 @@ void Streams::Drop(const std::string &stream_name) {
// no running Test function for this consumer, therefore it can be erased.
std::visit([&](const auto &stream_data) { stream_data.stream_source->Lock(); }, it->second);
locked_streams->erase(it);
if (!storage_.Delete(stream_name)) {
throw StreamsException("Couldn't delete stream '{}' from persistent store!", stream_name);
}
locked_streams->erase(it);
// TODO(antaljanosbenjamin) Release the transformation
}

View File

@ -188,7 +188,7 @@ class Streams final {
void Persist(StreamStatus<TStream> &&status) {
const std::string stream_name = status.name;
if (!storage_.Put(stream_name, nlohmann::json(std::move(status)).dump())) {
throw StreamsException{"Couldn't persist steam data for stream '{}'", stream_name};
throw StreamsException{"Couldn't persist stream data for stream '{}'", stream_name};
}
}

View File

@ -13,7 +13,6 @@ set(storage_v2_src_files
storage.cpp)
##### Replication #####
define_add_lcp(add_lcp_storage lcp_storage_cpp_files generated_lcp_storage_files)
add_lcp_storage(replication/rpc.lcp SLK_SERIALIZE)
@ -26,10 +25,10 @@ set(storage_v2_src_files
replication/replication_server.cpp
replication/serialization.cpp
replication/slk.cpp
replication/replication_persistence_helper.cpp
${lcp_storage_cpp_files})
#######################
find_package(gflags REQUIRED)
find_package(Threads REQUIRED)

View File

@ -49,7 +49,7 @@ struct Config {
uint64_t wal_file_flush_every_n_tx{100000};
bool snapshot_on_exit{false};
bool restore_replicas_on_startup{false};
} durability;
struct Transaction {

View File

@ -22,6 +22,7 @@ static const std::string kSnapshotDirectory{"snapshots"};
static const std::string kWalDirectory{"wal"};
static const std::string kBackupDirectory{".backup"};
static const std::string kLockFile{".lock"};
static const std::string kReplicationDirectory{"replication"};
// This is the prefix used for Snapshot and WAL filenames. It is a timestamp
// format that equals to: YYYYmmddHHMMSSffffff

View File

@ -10,6 +10,8 @@
// licenses/APL.txt.
#pragma once
#include <chrono>
#include <optional>
#include <string>
@ -23,6 +25,8 @@ struct ReplicationClientConfig {
struct SSL {
std::string key_file = "";
std::string cert_file = "";
friend bool operator==(const SSL &, const SSL &) = default;
};
std::optional<SSL> ssl;

View File

@ -16,4 +16,6 @@ namespace memgraph::storage::replication {
enum class ReplicationMode : std::uint8_t { SYNC, ASYNC };
enum class ReplicaState : std::uint8_t { READY, REPLICATING, RECOVERY, INVALID };
enum class RegistrationMode : std::uint8_t { MUST_BE_INSTANTLY_VALID, CAN_BE_INVALID };
} // namespace memgraph::storage::replication

View File

@ -0,0 +1,83 @@
// Copyright 2022 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#include "storage/v2/replication/replication_persistence_helper.hpp"
#include "utils/logging.hpp"
namespace {
const std::string kReplicaName = "replica_name";
const std::string kIpAddress = "replica_ip_address";
const std::string kPort = "replica_port";
const std::string kSyncMode = "replica_sync_mode";
const std::string kCheckFrequency = "replica_check_frequency";
const std::string kSSLKeyFile = "replica_ssl_key_file";
const std::string kSSLCertFile = "replica_ssl_cert_file";
} // namespace
namespace memgraph::storage::replication {
/// Serializes a replica's configuration into a JSON object.
///
/// The string members of `status` are moved into the result. The SSL key and
/// certificate entries are always present in the object; both are JSON null
/// when `status.ssl` holds no value.
nlohmann::json ReplicaStatusToJSON(ReplicaStatus &&status) {
  nlohmann::json serialized = nlohmann::json::object();

  // The SSL entries start out as null and are overwritten only when an SSL
  // configuration is actually present.
  serialized[kSSLKeyFile] = nullptr;
  serialized[kSSLCertFile] = nullptr;
  if (status.ssl) {
    serialized[kSSLKeyFile] = std::move(status.ssl->key_file);
    serialized[kSSLCertFile] = std::move(status.ssl->cert_file);
  }

  serialized[kCheckFrequency] = status.replica_check_frequency.count();
  serialized[kSyncMode] = status.sync_mode;
  serialized[kPort] = status.port;
  serialized[kIpAddress] = std::move(status.ip_address);
  serialized[kReplicaName] = std::move(status.name);

  return serialized;
}
/// Rebuilds a ReplicaStatus from a JSON object using the keys written by
/// ReplicaStatusToJSON.
///
/// @param data JSON value holding the replica's configuration.
/// @return the parsed status, or std::nullopt (after logging an error) when a
///         field is missing, has an unexpected type, or when only one of the
///         two SSL entries is null.
std::optional<ReplicaStatus> JSONToReplicaStatus(nlohmann::json &&data) {
  // Value-initialize so that no member is left indeterminate.
  ReplicaStatus replica_status{};

  const auto get_failed_message = [](const std::string_view message, const std::string_view nested_message) {
    return fmt::format("Failed to deserialize replica's configuration: {} : {}", message, nested_message);
  };

  try {
    data.at(kReplicaName).get_to(replica_status.name);
    data.at(kIpAddress).get_to(replica_status.ip_address);
    data.at(kPort).get_to(replica_status.port);
    data.at(kSyncMode).get_to(replica_status.sync_mode);

    // Convert explicitly instead of relying on the implicit json -> integer conversion.
    replica_status.replica_check_frequency =
        std::chrono::seconds(data.at(kCheckFrequency).get<std::chrono::seconds::rep>());

    const auto &key_file = data.at(kSSLKeyFile);
    const auto &cert_file = data.at(kSSLCertFile);

    // Both SSL entries must be set, or both must be null. Malformed input is
    // reported like every other deserialization failure instead of aborting
    // the process from inside the parser; the caller decides how to react.
    if (key_file.is_null() != cert_file.is_null()) {
      spdlog::error(get_failed_message("Inconsistent SSL configuration",
                                       "only one of the SSL key file and the SSL cert file is set"));
      return std::nullopt;
    }

    if (!key_file.is_null()) {
      replica_status.ssl = replication::ReplicationClientConfig::SSL{};
      key_file.get_to(replica_status.ssl->key_file);
      cert_file.get_to(replica_status.ssl->cert_file);
    }
  } catch (const nlohmann::json::type_error &exception) {
    spdlog::error(get_failed_message("Invalid type conversion", exception.what()));
    return std::nullopt;
  } catch (const nlohmann::json::out_of_range &exception) {
    spdlog::error(get_failed_message("Non existing field", exception.what()));
    return std::nullopt;
  }

  return replica_status;
}
} // namespace memgraph::storage::replication

View File

@ -0,0 +1,40 @@
// Copyright 2022 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#pragma once
#include <chrono>
#include <compare>
#include <optional>
#include <string>
#include <json/json.hpp>
#include "storage/v2/replication/config.hpp"
#include "storage/v2/replication/enums.hpp"
namespace memgraph::storage::replication {
/// Configuration of a registered replica in the form that is converted
/// to/from JSON by ReplicaStatusToJSON / JSONToReplicaStatus.
///
/// Every scalar member has a default initializer so that a default-constructed
/// object never holds indeterminate values (the defaulted operator== reads all
/// members).
struct ReplicaStatus {
  /// Name of the replica.
  std::string name;
  /// Address part of the replica's endpoint.
  std::string ip_address;
  /// Port part of the replica's endpoint.
  uint16_t port{0};
  /// Replication mode used for this replica.
  ReplicationMode sync_mode{ReplicationMode::SYNC};
  /// Value of the replica check frequency, in seconds.
  std::chrono::seconds replica_check_frequency{0};
  /// SSL key/cert files; empty optional when no SSL configuration is set.
  std::optional<ReplicationClientConfig::SSL> ssl;

  friend bool operator==(const ReplicaStatus &, const ReplicaStatus &) = default;
};
nlohmann::json ReplicaStatusToJSON(ReplicaStatus &&status);
std::optional<ReplicaStatus> JSONToReplicaStatus(nlohmann::json &&data);
} // namespace memgraph::storage::replication

View File

@ -17,6 +17,7 @@
#include <variant>
#include <gflags/gflags.h>
#include <spdlog/spdlog.h>
#include "io/network/endpoint.hpp"
#include "storage/v2/durability/durability.hpp"
@ -28,6 +29,8 @@
#include "storage/v2/indices.hpp"
#include "storage/v2/mvcc.hpp"
#include "storage/v2/replication/config.hpp"
#include "storage/v2/replication/enums.hpp"
#include "storage/v2/replication/replication_persistence_helper.hpp"
#include "storage/v2/transaction.hpp"
#include "storage/v2/vertex_accessor.hpp"
#include "utils/file.hpp"
@ -50,6 +53,19 @@ using OOMExceptionEnabler = utils::MemoryTracker::OutOfMemoryExceptionEnabler;
namespace {
inline constexpr uint16_t kEpochHistoryRetention = 1000;
/// Returns the enumerator name of `error` as a string, for use in log messages.
///
/// The switch deliberately has no `default` label so that the compiler can
/// still warn when a new enumerator is not handled.
std::string RegisterReplicaErrorToString(Storage::RegisterReplicaError error) {
  switch (error) {
    case Storage::RegisterReplicaError::NAME_EXISTS:
      return "NAME_EXISTS";
    case Storage::RegisterReplicaError::END_POINT_EXISTS:
      return "END_POINT_EXISTS";
    case Storage::RegisterReplicaError::CONNECTION_FAILED:
      return "CONNECTION_FAILED";
    case Storage::RegisterReplicaError::COULD_NOT_BE_PERSISTED:
      return "COULD_NOT_BE_PERSISTED";
  }
  // Only reachable for a value outside the enumerators; without this return,
  // control would fall off the end of a non-void function.
  return "UNKNOWN";
}
} // namespace
auto AdvanceToVisibleVertex(utils::SkipList<Vertex>::Iterator it, utils::SkipList<Vertex>::Iterator end,
@ -400,6 +416,16 @@ Storage::Storage(Config config)
} else {
commit_log_.emplace(timestamp_);
}
if (config_.durability.restore_replicas_on_startup) {
spdlog::info("Replica's configuration will be stored and will be automatically restored in case of a crash.");
utils::EnsureDirOrDie(config_.durability.storage_directory / durability::kReplicationDirectory);
storage_ =
std::make_unique<kvstore::KVStore>(config_.durability.storage_directory / durability::kReplicationDirectory);
RestoreReplicas();
} else {
spdlog::warn("Replicas' configuration will NOT be stored. When the server restarts, replicas will be forgotten.");
}
}
Storage::~Storage() {
@ -1882,7 +1908,7 @@ bool Storage::SetMainReplicationRole() {
utils::BasicResult<Storage::RegisterReplicaError> Storage::RegisterReplica(
std::string name, io::network::Endpoint endpoint, const replication::ReplicationMode replication_mode,
const replication::ReplicationClientConfig &config) {
const replication::RegistrationMode registration_mode, const replication::ReplicationClientConfig &config) {
MG_ASSERT(replication_role_.load() == ReplicationRole::MAIN, "Only main instance can register a replica!");
const bool name_exists = replication_clients_.WithLock([&](auto &clients) {
@ -1902,9 +1928,28 @@ utils::BasicResult<Storage::RegisterReplicaError> Storage::RegisterReplica(
return RegisterReplicaError::END_POINT_EXISTS;
}
if (ShouldStoreAndRestoreReplicas()) {
auto data = replication::ReplicaStatusToJSON(
replication::ReplicaStatus{.name = name,
.ip_address = endpoint.address,
.port = endpoint.port,
.sync_mode = replication_mode,
.replica_check_frequency = config.replica_check_frequency,
.ssl = config.ssl});
if (!storage_->Put(name, data.dump())) {
spdlog::error("Error when saving replica {} in settings.", name);
return RegisterReplicaError::COULD_NOT_BE_PERSISTED;
}
}
auto client = std::make_unique<ReplicationClient>(std::move(name), this, endpoint, replication_mode, config);
if (client->State() == replication::ReplicaState::INVALID) {
return RegisterReplicaError::CONNECTION_FAILED;
if (replication::RegistrationMode::CAN_BE_INVALID != registration_mode) {
return RegisterReplicaError::CONNECTION_FAILED;
}
spdlog::warn("Connection failed when registering replica {}. Replica will still be registered.", client->Name());
}
return replication_clients_.WithLock([&](auto &clients) -> utils::BasicResult<Storage::RegisterReplicaError> {
@ -1925,8 +1970,15 @@ utils::BasicResult<Storage::RegisterReplicaError> Storage::RegisterReplica(
});
}
bool Storage::UnregisterReplica(const std::string_view name) {
bool Storage::UnregisterReplica(const std::string &name) {
MG_ASSERT(replication_role_.load() == ReplicationRole::MAIN, "Only main instance can unregister a replica!");
if (ShouldStoreAndRestoreReplicas()) {
if (!storage_->Delete(name)) {
spdlog::error("Error when removing replica {} from settings.", name);
return false;
}
}
return replication_clients_.WithLock([&](auto &clients) {
return std::erase_if(clients, [&](const auto &client) { return client->Name() == name; });
});
@ -1962,4 +2014,41 @@ void Storage::SetIsolationLevel(IsolationLevel isolation_level) {
isolation_level_ = isolation_level;
}
/// Re-registers every replica whose configuration is found in the replication
/// key-value store.
///
/// Does nothing when replica settings are not persisted. An entry that is not
/// valid JSON, that cannot be converted to a ReplicaStatus, or whose
/// registration fails terminates the process via LOG_FATAL. Replicas are
/// registered with RegistrationMode::CAN_BE_INVALID.
///
/// @pre The instance must have the MAIN role.
void Storage::RestoreReplicas() {
  MG_ASSERT(memgraph::storage::ReplicationRole::MAIN == GetReplicationRole());
  if (!ShouldStoreAndRestoreReplicas()) {
    return;
  }
  spdlog::info("Restoring replicas.");

  for (const auto &[replica_name, replica_data] : *storage_) {
    spdlog::info("Restoring replica {}.", replica_name);

    // Parse with exceptions disabled: a corrupted entry must end in the
    // explicit fatal log below instead of escaping as an uncaught
    // nlohmann::json::parse_error.
    auto replica_json = nlohmann::json::parse(replica_data, nullptr, false);
    if (replica_json.is_discarded()) {
      LOG_FATAL("Cannot parse previously saved configuration of replica {}.", replica_name);
    }

    auto maybe_replica_status = replication::JSONToReplicaStatus(std::move(replica_json));
    if (!maybe_replica_status.has_value()) {
      LOG_FATAL("Cannot parse previously saved configuration of replica {}.", replica_name);
    }

    // Move out of the optional: the parsed status is consumed by RegisterReplica below.
    auto replica_status = std::move(*maybe_replica_status);
    MG_ASSERT(replica_status.name == replica_name, "Expected replica name is '{}', but got '{}'", replica_status.name,
              replica_name);

    auto ret =
        RegisterReplica(std::move(replica_status.name), {std::move(replica_status.ip_address), replica_status.port},
                        replica_status.sync_mode, replication::RegistrationMode::CAN_BE_INVALID,
                        {
                            .replica_check_frequency = replica_status.replica_check_frequency,
                            .ssl = replica_status.ssl,
                        });

    if (ret.HasError()) {
      // CONNECTION_FAILED is not expected here because registration was requested with CAN_BE_INVALID.
      MG_ASSERT(RegisterReplicaError::CONNECTION_FAILED != ret.GetError());
      LOG_FATAL("Failure when restoring replica {}: {}.", replica_name, RegisterReplicaErrorToString(ret.GetError()));
    }
    spdlog::info("Replica {} restored.", replica_name);
  }
}
/// Returns true when the key-value store used for replica settings has been created.
bool Storage::ShouldStoreAndRestoreReplicas() const {
  const bool has_replication_store = (storage_ != nullptr);
  return has_replication_store;
}
} // namespace memgraph::storage

View File

@ -18,6 +18,7 @@
#include <variant>
#include "io/network/endpoint.hpp"
#include "kvstore/kvstore.hpp"
#include "storage/v2/commit_log.hpp"
#include "storage/v2/config.hpp"
#include "storage/v2/constraints.hpp"
@ -411,15 +412,20 @@ class Storage final {
bool SetMainReplicationRole();
enum class RegisterReplicaError : uint8_t { NAME_EXISTS, END_POINT_EXISTS, CONNECTION_FAILED };
/// Error codes returned by RegisterReplica.
enum class RegisterReplicaError : uint8_t {
  // Presumably a replica with the same name is already registered -- confirm against RegisterReplica.
  NAME_EXISTS,
  // Presumably a replica with the same endpoint is already registered -- confirm against RegisterReplica.
  END_POINT_EXISTS,
  // The replication client is in the INVALID state and the registration mode is not CAN_BE_INVALID.
  CONNECTION_FAILED,
  // Writing the replica's settings to the key-value store failed.
  COULD_NOT_BE_PERSISTED
};
/// @pre The instance should have a MAIN role
/// @pre Timeout can only be set for SYNC replication
utils::BasicResult<RegisterReplicaError, void> RegisterReplica(
std::string name, io::network::Endpoint endpoint, replication::ReplicationMode replication_mode,
const replication::ReplicationClientConfig &config = {});
replication::RegistrationMode registration_mode, const replication::ReplicationClientConfig &config = {});
/// @pre The instance should have a MAIN role
bool UnregisterReplica(std::string_view name);
bool UnregisterReplica(const std::string &name);
std::optional<replication::ReplicaState> GetReplicaState(std::string_view name);
@ -474,6 +480,10 @@ class Storage final {
uint64_t CommitTimestamp(std::optional<uint64_t> desired_commit_timestamp = {});
void RestoreReplicas();
bool ShouldStoreAndRestoreReplicas() const;
// Main storage lock.
//
// Accessors take a shared lock when starting, so it is possible to block
@ -534,6 +544,7 @@ class Storage final {
std::filesystem::path wal_directory_;
std::filesystem::path lock_file_path_;
utils::OutputFile lock_file_handle_;
std::unique_ptr<kvstore::KVStore> storage_;
utils::Scheduler snapshot_runner_;
utils::SpinLock snapshot_lock_;

View File

@ -36,6 +36,7 @@ import os
import subprocess
from argparse import ArgumentParser
from pathlib import Path
import tempfile
import time
import sys
from inspect import signature
@ -103,7 +104,7 @@ def is_port_in_use(port: int) -> bool:
return s.connect_ex(("localhost", port)) == 0
def _start_instance(name, args, log_file, queries, use_ssl, procdir):
def _start_instance(name, args, log_file, queries, use_ssl, procdir, data_directory):
assert (
name not in MEMGRAPH_INSTANCES.keys()
), "If this raises, you are trying to start an instance with the same name than one already running."
@ -113,7 +114,8 @@ def _start_instance(name, args, log_file, queries, use_ssl, procdir):
mg_instance = MemgraphInstanceRunner(MEMGRAPH_BINARY, use_ssl)
MEMGRAPH_INSTANCES[name] = mg_instance
log_file_path = os.path.join(BUILD_DIR, "logs", log_file)
binary_args = args + ["--log-file", log_file_path]
data_directory_path = os.path.join(BUILD_DIR, data_directory)
binary_args = args + ["--log-file", log_file_path] + ["--data-directory", data_directory_path]
if len(procdir) != 0:
binary_args.append("--query-modules-directory=" + procdir)
@ -175,8 +177,13 @@ def start_instance(context, name, procdir):
if "ssl" in value:
use_ssl = bool(value["ssl"])
value.pop("ssl")
data_directory = ""
if "data_directory" in value:
data_directory = value["data_directory"]
else:
data_directory = tempfile.TemporaryDirectory().name
instance = _start_instance(name, args, log_file, queries, use_ssl, procdir)
instance = _start_instance(name, args, log_file, queries, use_ssl, procdir, data_directory)
mg_instances[name] = instance
assert len(mg_instances) == 1

View File

@ -76,11 +76,8 @@ class MemgraphInstanceRunner:
self.stop()
self.args = copy.deepcopy(args)
self.args = [replace_paths(arg) for arg in self.args]
self.data_directory = tempfile.TemporaryDirectory()
args_mg = [
self.binary_path,
"--data-directory",
self.data_directory.name,
"--storage-wal-enabled",
"--storage-snapshot-interval-sec",
"300",

View File

@ -5,7 +5,7 @@ monitoring_port: &monitoring_port "7444"
template_cluster: &template_cluster
cluster:
monitoring:
args: ["--bolt-port=7687", "--log-level=TRACE", "--"]
args: ["--bolt-port=7687", "--log-level=TRACE"]
log_file: "monitoring-websocket-e2e.log"
template_cluster_ssl: &template_cluster_ssl
cluster:
@ -21,7 +21,6 @@ template_cluster_ssl: &template_cluster_ssl
*cert_file,
"--bolt-key-file",
*key_file,
"--",
]
log_file: "monitoring-websocket-ssl-e2e.log"
ssl: true

View File

@ -41,14 +41,14 @@ def test_show_replicas(connection):
"state",
}
actual_column_names = {x.name for x in cursor.description}
assert expected_column_names == actual_column_names
assert actual_column_names == expected_column_names
expected_data = {
("replica_1", "127.0.0.1:10001", "sync", 0, 0, "ready"),
("replica_2", "127.0.0.1:10002", "sync", 0, 0, "ready"),
("replica_3", "127.0.0.1:10003", "async", 0, 0, "ready"),
}
assert expected_data == actual_data
assert actual_data == expected_data
def test_show_replicas_while_inserting_data(connection):
@ -72,14 +72,14 @@ def test_show_replicas_while_inserting_data(connection):
"state",
}
actual_column_names = {x.name for x in cursor.description}
assert expected_column_names == actual_column_names
assert actual_column_names == expected_column_names
expected_data = {
("replica_1", "127.0.0.1:10001", "sync", 0, 0, "ready"),
("replica_2", "127.0.0.1:10002", "sync", 0, 0, "ready"),
("replica_3", "127.0.0.1:10003", "async", 0, 0, "ready"),
}
assert expected_data == actual_data
assert actual_data == expected_data
# 1/
execute_and_fetch_all(cursor, "CREATE (n1:Number {name: 'forty_two', value:42});")
@ -92,17 +92,15 @@ def test_show_replicas_while_inserting_data(connection):
("replica_3", "127.0.0.1:10003", "async", 4, 0, "ready"),
}
actual_data = set(execute_and_fetch_all(cursor, "SHOW REPLICAS;"))
print("actual_data=" + str(actual_data))
print("expected_data=" + str(expected_data))
assert expected_data == actual_data
assert actual_data == expected_data
# 3/
res = execute_and_fetch_all(cursor, "MATCH (node) return node;")
assert 1 == len(res)
assert len(res) == 1
# 4/
actual_data = set(execute_and_fetch_all(cursor, "SHOW REPLICAS;"))
assert expected_data == actual_data
assert actual_data == expected_data
if __name__ == "__main__":

View File

@ -18,6 +18,7 @@ import time
from common import execute_and_fetch_all
import interactive_mg_runner
import mgclient
import tempfile
interactive_mg_runner.SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
interactive_mg_runner.PROJECT_DIR = os.path.normpath(
@ -84,7 +85,7 @@ def test_show_replicas(connection):
}
actual_column_names = {x.name for x in cursor.description}
assert EXPECTED_COLUMN_NAMES == actual_column_names
assert actual_column_names == EXPECTED_COLUMN_NAMES
expected_data = {
("replica_1", "127.0.0.1:10001", "sync", 0, 0, "ready"),
@ -92,7 +93,7 @@ def test_show_replicas(connection):
("replica_3", "127.0.0.1:10003", "async", 0, 0, "ready"),
("replica_4", "127.0.0.1:10004", "async", 0, 0, "ready"),
}
assert expected_data == actual_data
assert actual_data == expected_data
# 2/
execute_and_fetch_all(cursor, "DROP REPLICA replica_2")
@ -102,7 +103,7 @@ def test_show_replicas(connection):
("replica_3", "127.0.0.1:10003", "async", 0, 0, "ready"),
("replica_4", "127.0.0.1:10004", "async", 0, 0, "ready"),
}
assert expected_data == actual_data
assert actual_data == expected_data
# 3/
interactive_mg_runner.kill(MEMGRAPH_INSTANCES_DESCRIPTION, "replica_1")
@ -117,7 +118,283 @@ def test_show_replicas(connection):
("replica_3", "127.0.0.1:10003", "async", 0, 0, "invalid"),
("replica_4", "127.0.0.1:10004", "async", 0, 0, "invalid"),
}
assert expected_data == actual_data
assert actual_data == expected_data
def test_basic_recovery(connection):
# Goal of this test is to check the recovery of main.
# 0/ We start all replicas manually: we want to be able to kill them ourselves without relying on external tooling to kill processes.
# 1/ We check that all replicas have the correct state: they should all be ready.
# 2/ We kill main.
# 3/ We re-start main.
# 4/ We check that all replicas have the correct state: they should all be ready.
# 5/ Drop one replica.
# 6/ We add some data to main, then kill it and restart.
# 7/ We check that all replicas but one have the expected data.
# 8/ We kill another replica.
# 9/ We add some data to main.
# 10/ We re-add the two replicas dropped/killed and check the data.
# 11/ We kill another replica.
# 12/ Add some more data to main.
# 13/ Check the states of replicas.
# 0/
data_directory = tempfile.TemporaryDirectory()
CONFIGURATION = {
"replica_1": {
"args": ["--bolt-port", "7688", "--log-level=TRACE"],
"log_file": "replica1.log",
"setup_queries": ["SET REPLICATION ROLE TO REPLICA WITH PORT 10001;"],
},
"replica_2": {
"args": ["--bolt-port", "7689", "--log-level=TRACE"],
"log_file": "replica2.log",
"setup_queries": ["SET REPLICATION ROLE TO REPLICA WITH PORT 10002;"],
},
"replica_3": {
"args": ["--bolt-port", "7690", "--log-level=TRACE"],
"log_file": "replica3.log",
"setup_queries": ["SET REPLICATION ROLE TO REPLICA WITH PORT 10003;"],
},
"replica_4": {
"args": ["--bolt-port", "7691", "--log-level=TRACE"],
"log_file": "replica4.log",
"setup_queries": ["SET REPLICATION ROLE TO REPLICA WITH PORT 10004;"],
},
"main": {
"args": ["--bolt-port", "7687", "--log-level=TRACE", "--storage-recover-on-startup=true"],
"log_file": "main.log",
"setup_queries": [],
"data_directory": f"{data_directory.name}",
},
}
interactive_mg_runner.start_all(CONFIGURATION)
cursor = connection(7687, "main").cursor()
# We want to execute manually and not via the configuration, otherwise re-starting main would also execute these registration.
execute_and_fetch_all(cursor, "REGISTER REPLICA replica_1 SYNC TO '127.0.0.1:10001';")
execute_and_fetch_all(cursor, "REGISTER REPLICA replica_2 SYNC TO '127.0.0.1:10002';")
execute_and_fetch_all(cursor, "REGISTER REPLICA replica_3 ASYNC TO '127.0.0.1:10003';")
execute_and_fetch_all(cursor, "REGISTER REPLICA replica_4 ASYNC TO '127.0.0.1:10004';")
# 1/
expected_data = {
("replica_1", "127.0.0.1:10001", "sync", 0, 0, "ready"),
("replica_2", "127.0.0.1:10002", "sync", 0, 0, "ready"),
("replica_3", "127.0.0.1:10003", "async", 0, 0, "ready"),
("replica_4", "127.0.0.1:10004", "async", 0, 0, "ready"),
}
actual_data = set(execute_and_fetch_all(cursor, "SHOW REPLICAS;"))
assert actual_data == expected_data
def check_roles():
    # Main must report the "main" role and each of the four replicas the "replica" role.
    assert "main" == interactive_mg_runner.MEMGRAPH_INSTANCES["main"].query("SHOW REPLICATION ROLE;")[0][0]
    # range() excludes its upper bound: 5 is required so that replica_4 is checked as well.
    for index in range(1, 5):
        assert (
            "replica"
            == interactive_mg_runner.MEMGRAPH_INSTANCES[f"replica_{index}"].query("SHOW REPLICATION ROLE;")[0][0]
        )
check_roles()
# 2/
interactive_mg_runner.kill(CONFIGURATION, "main")
time.sleep(2)
# 3/
interactive_mg_runner.start(CONFIGURATION, "main")
cursor = connection(7687, "main").cursor()
check_roles()
# 4/
# We leave some time for the main to recover.
time.sleep(2)
actual_data = set(execute_and_fetch_all(cursor, "SHOW REPLICAS;"))
assert actual_data == expected_data
# 5/
execute_and_fetch_all(cursor, "DROP REPLICA replica_2;")
# 6/
execute_and_fetch_all(cursor, "CREATE (p1:Number {name:'Magic', value:42})")
interactive_mg_runner.kill(CONFIGURATION, "main")
time.sleep(2)
interactive_mg_runner.start(CONFIGURATION, "main")
cursor = connection(7687, "main").cursor()
check_roles()
# 7/
QUERY_TO_CHECK = "MATCH (node) return node;"
res_from_main = execute_and_fetch_all(cursor, QUERY_TO_CHECK)
assert len(res_from_main) == 1
for index in (1, 3, 4):
assert res_from_main == interactive_mg_runner.MEMGRAPH_INSTANCES[f"replica_{index}"].query(QUERY_TO_CHECK)
expected_data = {
("replica_1", "127.0.0.1:10001", "sync", 2, 0, "ready"),
("replica_3", "127.0.0.1:10003", "async", 2, 0, "ready"),
("replica_4", "127.0.0.1:10004", "async", 2, 0, "ready"),
}
actual_data = set(execute_and_fetch_all(cursor, "SHOW REPLICAS;"))
assert actual_data == expected_data
# Replica_2 was dropped, we check it does not have the data from main.
assert len(interactive_mg_runner.MEMGRAPH_INSTANCES["replica_2"].query(QUERY_TO_CHECK)) == 0
# 8/
interactive_mg_runner.kill(CONFIGURATION, "replica_3")
# 9/
execute_and_fetch_all(cursor, "CREATE (p1:Number {name:'Magic_again', value:43})")
res_from_main = execute_and_fetch_all(cursor, QUERY_TO_CHECK)
assert len(res_from_main) == 2
# 10/
execute_and_fetch_all(cursor, "REGISTER REPLICA replica_2 SYNC TO '127.0.0.1:10002';")
interactive_mg_runner.start(CONFIGURATION, "replica_3")
time.sleep(2)
expected_data = {
("replica_1", "127.0.0.1:10001", "sync", 6, 0, "ready"),
("replica_2", "127.0.0.1:10002", "sync", 6, 0, "ready"),
("replica_3", "127.0.0.1:10003", "async", 6, 0, "ready"),
("replica_4", "127.0.0.1:10004", "async", 6, 0, "ready"),
}
actual_data = set(execute_and_fetch_all(cursor, "SHOW REPLICAS;"))
assert actual_data == expected_data
for index in (1, 2, 3, 4):
assert interactive_mg_runner.MEMGRAPH_INSTANCES[f"replica_{index}"].query(QUERY_TO_CHECK) == res_from_main
# 11/
interactive_mg_runner.kill(CONFIGURATION, "replica_1")
time.sleep(1)
expected_data = {
("replica_1", "127.0.0.1:10001", "sync", 0, 0, "invalid"),
("replica_2", "127.0.0.1:10002", "sync", 6, 0, "ready"),
("replica_3", "127.0.0.1:10003", "async", 6, 0, "ready"),
("replica_4", "127.0.0.1:10004", "async", 6, 0, "ready"),
}
actual_data = set(execute_and_fetch_all(cursor, "SHOW REPLICAS;"))
assert actual_data == expected_data
# 12/
execute_and_fetch_all(cursor, "CREATE (p1:Number {name:'Magic_again_again', value:44})")
res_from_main = execute_and_fetch_all(cursor, QUERY_TO_CHECK)
assert len(res_from_main) == 3
for index in (2, 3, 4):
assert interactive_mg_runner.MEMGRAPH_INSTANCES[f"replica_{index}"].query(QUERY_TO_CHECK) == res_from_main
# 13/
expected_data = {
("replica_1", "127.0.0.1:10001", "sync", 0, 0, "invalid"),
("replica_2", "127.0.0.1:10002", "sync", 9, 0, "ready"),
("replica_3", "127.0.0.1:10003", "async", 9, 0, "ready"),
("replica_4", "127.0.0.1:10004", "async", 9, 0, "ready"),
}
actual_data = set(execute_and_fetch_all(cursor, "SHOW REPLICAS;"))
assert actual_data == expected_data
def test_conflict_at_startup(connection):
    # Goal of this test is to check that starting several instances, each with its own data directory,
    # works as expected: both instances must start and report the "main" role.
    # main_1 and main_2 use different data directories.
    data_directory1 = tempfile.TemporaryDirectory()
    data_directory2 = tempfile.TemporaryDirectory()
    CONFIGURATION = {
        "main_1": {
            "args": ["--bolt-port", "7687", "--log-level=TRACE"],
            "log_file": "main1.log",
            "setup_queries": [],
            "data_directory": f"{data_directory1.name}",
        },
        "main_2": {
            "args": ["--bolt-port", "7688", "--log-level=TRACE"],
            "log_file": "main2.log",
            "setup_queries": [],
            "data_directory": f"{data_directory2.name}",
        },
    }
    interactive_mg_runner.start_all(CONFIGURATION)
    # Each instance listens on its own bolt port.
    cursor_1 = connection(7687, "main_1").cursor()
    cursor_2 = connection(7688, "main_2").cursor()
    assert execute_and_fetch_all(cursor_1, "SHOW REPLICATION ROLE;")[0][0] == "main"
    assert execute_and_fetch_all(cursor_2, "SHOW REPLICATION ROLE;")[0][0] == "main"
def test_basic_recovery_when_replica_is_kill_when_main_is_down(connection):
    """Check that main restarts and reports a replica killed during its downtime as invalid.

    `connection` is accepted for fixture compatibility but is not used here:
    all queries go through `interactive_mg_runner.MEMGRAPH_INSTANCES`.
    """
    # Goal of this test is to check the recovery of main.
    # 0/ We start all replicas manually: we want to be able to kill them ourselves without relying on external tooling to kill processes.
    # 1/ We check that all replicas have the correct state: they should all be ready.
    # 2/ We kill main then kill a replica.
    # 3/ We re-start main: it should be able to restart.
    # 4/ Check status of replica: replica_2 is invalid.

    # main gets an explicit data directory so the restarted instance runs on the same directory.
    data_directory = tempfile.TemporaryDirectory()
    CONFIGURATION = {
        "replica_1": {
            "args": ["--bolt-port", "7688", "--log-level=TRACE"],
            "log_file": "replica1.log",
            "setup_queries": ["SET REPLICATION ROLE TO REPLICA WITH PORT 10001;"],
        },
        "replica_2": {
            "args": ["--bolt-port", "7689", "--log-level=TRACE"],
            "log_file": "replica2.log",
            "setup_queries": ["SET REPLICATION ROLE TO REPLICA WITH PORT 10002;"],
        },
        "main": {
            "args": ["--bolt-port", "7687", "--log-level=TRACE", "--storage-recover-on-startup=true"],
            "log_file": "main.log",
            "setup_queries": [],
            "data_directory": f"{data_directory.name}",
        },
    }

    interactive_mg_runner.start_all(CONFIGURATION)

    # We want to execute manually and not via the configuration, otherwise re-starting main would also execute these registration.
    interactive_mg_runner.MEMGRAPH_INSTANCES["main"].query("REGISTER REPLICA replica_1 SYNC TO '127.0.0.1:10001';")
    interactive_mg_runner.MEMGRAPH_INSTANCES["main"].query("REGISTER REPLICA replica_2 SYNC TO '127.0.0.1:10002';")

    # 1/
    expected_data = {
        ("replica_1", "127.0.0.1:10001", "sync", 0, 0, "ready"),
        ("replica_2", "127.0.0.1:10002", "sync", 0, 0, "ready"),
    }
    actual_data = set(interactive_mg_runner.MEMGRAPH_INSTANCES["main"].query("SHOW REPLICAS;"))
    assert actual_data == expected_data

    def check_roles():
        """Assert that main reports role "main" and that both replicas report role "replica"."""
        assert "main" == interactive_mg_runner.MEMGRAPH_INSTANCES["main"].query("SHOW REPLICATION ROLE;")[0][0]
        # range(1, 3) covers replica_1 and replica_2; the upper bound is exclusive.
        for index in range(1, 3):
            assert (
                "replica"
                == interactive_mg_runner.MEMGRAPH_INSTANCES[f"replica_{index}"].query("SHOW REPLICATION ROLE;")[0][0]
            )

    check_roles()

    # 2/
    interactive_mg_runner.kill(CONFIGURATION, "main")
    interactive_mg_runner.kill(CONFIGURATION, "replica_2")
    time.sleep(2)

    # 3/
    interactive_mg_runner.start(CONFIGURATION, "main")

    # 4/
    expected_data = {
        ("replica_1", "127.0.0.1:10001", "sync", 0, 0, "ready"),
        ("replica_2", "127.0.0.1:10002", "sync", 0, 0, "invalid"),
    }
    actual_data = set(interactive_mg_runner.MEMGRAPH_INSTANCES["main"].query("SHOW REPLICAS;"))
    assert actual_data == expected_data
if __name__ == "__main__":

View File

@ -4,7 +4,7 @@ bolt_port: &bolt_port "7687"
template_cluster: &template_cluster
cluster:
server:
args: ["--bolt-port=7687", "--log-level=TRACE", "--"]
args: ["--bolt-port=7687", "--log-level=TRACE"]
log_file: "server-connection-e2e.log"
template_cluster_ssl: &template_cluster_ssl
cluster:
@ -18,7 +18,6 @@ template_cluster_ssl: &template_cluster_ssl
*cert_file,
"--bolt-key-file",
*key_file,
"--",
]
log_file: "server-connection-ssl-e2e.log"
ssl: true

View File

@ -25,7 +25,8 @@
:--storage-recover-on-startup
:--storage-wal-enabled
:--storage-snapshot-interval-sec 300
:--storage-properties-on-edges))
:--storage-properties-on-edges
:--storage-restore-replicas-on-startup false))
(defn stop-node!
[test node]

View File

@ -22,44 +22,43 @@ function(_add_unit_test test_cpp custom_main)
set(target_name ${test_prefix}${exec_name})
set(source_files
${test_cpp}
${ARGN})
${test_cpp}
${ARGN})
if(NOT ${custom_main})
set(source_files
${source_files}
${memgraph_unit_main})
${source_files}
${memgraph_unit_main})
endif()
add_executable(${target_name} ${source_files})
# OUTPUT_NAME sets the real name of a target when it is built and can be
# used to help create two targets of the same name even though CMake
# requires unique logical target names
set_target_properties(${target_name} PROPERTIES OUTPUT_NAME ${exec_name})
target_link_libraries(${target_name} mg-memory mg-utils gtest gmock Threads::Threads dl)
# register test
if(TEST_COVERAGE)
add_test(${target_name} env LLVM_PROFILE_FILE=${exec_name}.profraw ./${exec_name})
else()
add_test(${target_name} ${exec_name})
endif()
# add to memgraph__unit target
add_dependencies(memgraph__unit ${target_name})
endfunction(_add_unit_test)
# Test utilities
add_library(storage_test_utils storage_test_utils.cpp)
target_link_libraries(storage_test_utils mg-storage-v2)
# Test integrations-kafka
add_library(kafka-mock STATIC kafka_mock.cpp)
target_link_libraries(kafka-mock mg-utils librdkafka++ librdkafka Threads::Threads gtest)
# Include directories are intentionally not set, because kafka-mock isn't meant to be used apart from unit tests
# Include directories are intentionally not set, because kafka-mock isn't meant to be used apart from unit tests
add_unit_test(integrations_kafka_consumer.cpp kafka_mock.cpp)
target_link_libraries(${test_prefix}integrations_kafka_consumer kafka-mock mg-integrations-kafka)
@ -70,7 +69,6 @@ add_unit_test(mgp_trans_c_api.cpp)
target_link_libraries(${test_prefix}mgp_trans_c_api mg-query)
# Test mg-query
add_unit_test(bfs_single_node.cpp)
target_link_libraries(${test_prefix}bfs_single_node mg-query)
@ -134,7 +132,6 @@ add_unit_test(query_function_mgp_module.cpp)
target_link_libraries(${test_prefix}query_function_mgp_module mg-query)
target_include_directories(${test_prefix}query_function_mgp_module PRIVATE ${CMAKE_SOURCE_DIR}/include)
# Test query/procedure
add_unit_test(query_procedure_mgp_type.cpp)
target_link_libraries(${test_prefix}query_procedure_mgp_type mg-query)
@ -151,8 +148,8 @@ target_include_directories(${test_prefix}query_procedure_py_module PRIVATE ${CMA
add_unit_test(query_procedures_mgp_graph.cpp)
target_link_libraries(${test_prefix}query_procedures_mgp_graph mg-query storage_test_utils)
target_include_directories(${test_prefix}query_procedures_mgp_graph PRIVATE ${CMAKE_SOURCE_DIR}/include)
# END query/procedure
# END query/procedure
add_unit_test(query_profile.cpp)
target_link_libraries(${test_prefix}query_profile mg-query)
@ -171,9 +168,7 @@ target_link_libraries(${test_prefix}stripped mg-query)
add_unit_test(typed_value.cpp)
target_link_libraries(${test_prefix}typed_value mg-query)
# Test mg-communication
add_unit_test(bolt_chunked_decoder_buffer.cpp)
target_link_libraries(${test_prefix}bolt_chunked_decoder_buffer mg-communication)
@ -195,21 +190,15 @@ target_link_libraries(${test_prefix}communication_buffer mg-communication mg-uti
add_unit_test(network_timeouts.cpp)
target_link_libraries(${test_prefix}network_timeouts mg-communication)
# Test mg-kvstore
add_unit_test(kvstore.cpp)
target_link_libraries(${test_prefix}kvstore mg-kvstore mg-utils)
# Test data structures
add_unit_test(ring_buffer.cpp)
target_link_libraries(${test_prefix}ring_buffer mg-utils)
# Test mg-io
add_unit_test(network_endpoint.cpp)
target_link_libraries(${test_prefix}network_endpoint mg-io)
@ -219,9 +208,7 @@ target_link_libraries(${test_prefix}network_utils mg-io)
add_unit_test(socket.cpp)
target_link_libraries(${test_prefix}socket mg-io)
# Test mg-utils
add_unit_test(utils_algorithm.cpp)
target_link_libraries(${test_prefix}utils_algorithm mg-utils)
@ -289,7 +276,6 @@ add_unit_test(utils_temporal utils_temporal.cpp)
target_link_libraries(${test_prefix}utils_temporal mg-utils)
# Test mg-storage-v2
add_unit_test(commit_log_v2.cpp)
target_link_libraries(${test_prefix}commit_log_v2 gflags mg-utils mg-storage-v2)
@ -332,40 +318,36 @@ target_link_libraries(${test_prefix}storage_v2_replication mg-storage-v2 fmt)
add_unit_test(storage_v2_isolation_level.cpp)
target_link_libraries(${test_prefix}storage_v2_isolation_level mg-storage-v2)
add_unit_test(replication_persistence_helper.cpp)
target_link_libraries(${test_prefix}replication_persistence_helper mg-storage-v2)
# Test mg-auth
if (MG_ENTERPRISE)
add_unit_test(auth.cpp)
target_link_libraries(${test_prefix}auth mg-auth mg-license)
if(MG_ENTERPRISE)
add_unit_test(auth.cpp)
target_link_libraries(${test_prefix}auth mg-auth mg-license)
endif()
# Test mg-slk
if (MG_ENTERPRISE)
add_unit_test(slk_advanced.cpp)
target_link_libraries(${test_prefix}slk_advanced mg-storage-v2)
if(MG_ENTERPRISE)
add_unit_test(slk_advanced.cpp)
target_link_libraries(${test_prefix}slk_advanced mg-storage-v2)
endif()
if (MG_ENTERPRISE)
add_unit_test(slk_core.cpp)
target_link_libraries(${test_prefix}slk_core mg-slk gflags fmt)
if(MG_ENTERPRISE)
add_unit_test(slk_core.cpp)
target_link_libraries(${test_prefix}slk_core mg-slk gflags fmt)
add_unit_test(slk_streams.cpp)
target_link_libraries(${test_prefix}slk_streams mg-slk gflags fmt)
add_unit_test(slk_streams.cpp)
target_link_libraries(${test_prefix}slk_streams mg-slk gflags fmt)
endif()
# Test mg-rpc
if (MG_ENTERPRISE)
add_unit_test(rpc.cpp)
target_link_libraries(${test_prefix}rpc mg-rpc)
if(MG_ENTERPRISE)
add_unit_test(rpc.cpp)
target_link_libraries(${test_prefix}rpc mg-rpc)
endif()
# Test LCP
add_custom_command(
OUTPUT test_lcp
DEPENDS ${lcp_src_files} lcp test_lcp.lisp

View File

@ -0,0 +1,91 @@
// Copyright 2022 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#include "storage/v2/replication/replication_persistence_helper.hpp"
#include "formatters.hpp"
#include "utils/logging.hpp"
#include <gtest/gtest.h>
#include <chrono>
#include <cstdint>
#include <fstream>
#include <iostream>
#include <optional>
#include <string>
#include <utility>
/// Fixture shared by the ReplicaStatus <-> JSON round-trip tests.
class ReplicationPersistanceHelperTest : public ::testing::Test {
 protected:
  /// Builds a ReplicaStatus whose fields are taken one-to-one from the arguments.
  ///
  /// The string and SSL arguments are sink parameters: they are moved into the
  /// returned object instead of being copied a second time.
  memgraph::storage::replication::ReplicaStatus CreateReplicaStatus(
      std::string name, std::string ip_address, uint16_t port,
      memgraph::storage::replication::ReplicationMode sync_mode, std::chrono::seconds replica_check_frequency,
      std::optional<memgraph::storage::replication::ReplicationClientConfig::SSL> ssl) const {
    return memgraph::storage::replication::ReplicaStatus{.name = std::move(name),
                                                         .ip_address = std::move(ip_address),
                                                         .port = port,
                                                         .sync_mode = sync_mode,
                                                         .replica_check_frequency = replica_check_frequency,
                                                         .ssl = std::move(ssl)};
  }

  // Tripwire: a change in the size of ReplicaStatus fails the build so that
  // CreateReplicaStatus (and the tests using it) get revisited.
  static_assert(
      sizeof(memgraph::storage::replication::ReplicaStatus) == 152,
      "Most likely you modified ReplicaStatus without updating the tests. Please modify CreateReplicaStatus. ");
};
// Round-trips a ReplicaStatus with every field set (SSL included) through JSON
// and expects the converted value to compare equal to the original.
TEST_F(ReplicationPersistanceHelperTest, BasicTestAllAttributesInitialized) {
  auto replicas_status = CreateReplicaStatus(
      "name", "ip_address", 0, memgraph::storage::replication::ReplicationMode::SYNC, std::chrono::seconds(1),
      memgraph::storage::replication::ReplicationClientConfig::SSL{.key_file = "key_file", .cert_file = "cert_file"});

  // ReplicaStatusToJSON is handed a temporary copy so that replicas_status stays intact for the comparison.
  auto json_status = memgraph::storage::replication::ReplicaStatusToJSON(
      memgraph::storage::replication::ReplicaStatus(replicas_status));
  auto replicas_status_converted = memgraph::storage::replication::JSONToReplicaStatus(std::move(json_status));

  // Fatal check first: dereferencing an empty result below would be undefined behaviour instead of a test failure.
  ASSERT_TRUE(replicas_status_converted.has_value());
  ASSERT_EQ(replicas_status, *replicas_status_converted);
}
// Round-trips a ReplicaStatus without SSL settings (ssl == std::nullopt) through JSON
// and expects the converted value to compare equal to the original.
TEST_F(ReplicationPersistanceHelperTest, BasicTestOnlyMandatoryAttributesInitialized) {
  auto replicas_status =
      CreateReplicaStatus("name", "ip_address", 0, memgraph::storage::replication::ReplicationMode::SYNC,
                          std::chrono::seconds(1), std::nullopt);

  // ReplicaStatusToJSON is handed a temporary copy so that replicas_status stays intact for the comparison.
  auto json_status = memgraph::storage::replication::ReplicaStatusToJSON(
      memgraph::storage::replication::ReplicaStatus(replicas_status));
  auto replicas_status_converted = memgraph::storage::replication::JSONToReplicaStatus(std::move(json_status));

  // Fatal check first: dereferencing an empty result below would be undefined behaviour instead of a test failure.
  ASSERT_TRUE(replicas_status_converted.has_value());
  ASSERT_EQ(replicas_status, *replicas_status_converted);
}
// Round-trips a ReplicaStatus whose ssl field is std::nullopt through JSON
// and expects the converted value to compare equal to the original.
// NOTE(review): the inputs are the same as in BasicTestOnlyMandatoryAttributesInitialized.
TEST_F(ReplicationPersistanceHelperTest, BasicTestAllAttributesButSSLInitialized) {
  auto replicas_status =
      CreateReplicaStatus("name", "ip_address", 0, memgraph::storage::replication::ReplicationMode::SYNC,
                          std::chrono::seconds(1), std::nullopt);

  // ReplicaStatusToJSON is handed a temporary copy so that replicas_status stays intact for the comparison.
  auto json_status = memgraph::storage::replication::ReplicaStatusToJSON(
      memgraph::storage::replication::ReplicaStatus(replicas_status));
  auto replicas_status_converted = memgraph::storage::replication::JSONToReplicaStatus(std::move(json_status));

  // Fatal check first: dereferencing an empty result below would be undefined behaviour instead of a test failure.
  ASSERT_TRUE(replicas_status_converted.has_value());
  ASSERT_EQ(replicas_status, *replicas_status_converted);
}
// Round-trips a ReplicaStatus with SSL settings through JSON
// and expects the converted value to compare equal to the original.
// NOTE(review): CreateReplicaStatus takes no timeout argument, so the inputs here are the same as in
// BasicTestAllAttributesInitialized; confirm whether this case is still needed.
TEST_F(ReplicationPersistanceHelperTest, BasicTestAllAttributesButTimeoutInitialized) {
  auto replicas_status = CreateReplicaStatus(
      "name", "ip_address", 0, memgraph::storage::replication::ReplicationMode::SYNC, std::chrono::seconds(1),
      memgraph::storage::replication::ReplicationClientConfig::SSL{.key_file = "key_file", .cert_file = "cert_file"});

  // ReplicaStatusToJSON is handed a temporary copy so that replicas_status stays intact for the comparison.
  auto json_status = memgraph::storage::replication::ReplicaStatusToJSON(
      memgraph::storage::replication::ReplicaStatus(replicas_status));
  auto replicas_status_converted = memgraph::storage::replication::JSONToReplicaStatus(std::move(json_status));

  // Fatal check first: dereferencing an empty result below would be undefined behaviour instead of a test failure.
  ASSERT_TRUE(replicas_status_converted.has_value());
  ASSERT_EQ(replicas_status, *replicas_status_converted);
}

View File

@ -39,6 +39,10 @@ class ReplicationTest : public ::testing::Test {
.snapshot_wal_mode = memgraph::storage::Config::Durability::SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL,
}};
const std::string local_host = ("127.0.0.1");
const std::array<uint16_t, 2> ports{10000, 20000};
const std::array<std::string, 2> replicas = {"REPLICA1", "REPLICA2"};
private:
void Clear() {
if (!std::filesystem::exists(storage_directory)) return;
@ -50,11 +54,12 @@ TEST_F(ReplicationTest, BasicSynchronousReplicationTest) {
memgraph::storage::Storage main_store(configuration);
memgraph::storage::Storage replica_store(configuration);
replica_store.SetReplicaRole(memgraph::io::network::Endpoint{"127.0.0.1", 10000});
replica_store.SetReplicaRole(memgraph::io::network::Endpoint{local_host, ports[0]});
ASSERT_FALSE(main_store
.RegisterReplica("REPLICA", memgraph::io::network::Endpoint{"127.0.0.1", 10000},
memgraph::storage::replication::ReplicationMode::SYNC)
.RegisterReplica("REPLICA", memgraph::io::network::Endpoint{local_host, ports[0]},
memgraph::storage::replication::ReplicationMode::SYNC,
memgraph::storage::replication::RegistrationMode::MUST_BE_INSTANTLY_VALID)
.HasError());
// vertex create
@ -268,22 +273,24 @@ TEST_F(ReplicationTest, MultipleSynchronousReplicationTest) {
.storage_directory = storage_directory,
.snapshot_wal_mode = memgraph::storage::Config::Durability::SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL,
}});
replica_store1.SetReplicaRole(memgraph::io::network::Endpoint{"127.0.0.1", 10000});
replica_store1.SetReplicaRole(memgraph::io::network::Endpoint{local_host, ports[0]});
memgraph::storage::Storage replica_store2(
{.durability = {
.storage_directory = storage_directory,
.snapshot_wal_mode = memgraph::storage::Config::Durability::SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL,
}});
replica_store2.SetReplicaRole(memgraph::io::network::Endpoint{"127.0.0.1", 20000});
replica_store2.SetReplicaRole(memgraph::io::network::Endpoint{local_host, ports[1]});
ASSERT_FALSE(main_store
.RegisterReplica("REPLICA1", memgraph::io::network::Endpoint{"127.0.0.1", 10000},
memgraph::storage::replication::ReplicationMode::SYNC)
.RegisterReplica(replicas[0], memgraph::io::network::Endpoint{local_host, ports[0]},
memgraph::storage::replication::ReplicationMode::SYNC,
memgraph::storage::replication::RegistrationMode::MUST_BE_INSTANTLY_VALID)
.HasError());
ASSERT_FALSE(main_store
.RegisterReplica("REPLICA2", memgraph::io::network::Endpoint{"127.0.0.1", 20000},
memgraph::storage::replication::ReplicationMode::SYNC)
.RegisterReplica(replicas[1], memgraph::io::network::Endpoint{local_host, ports[1]},
memgraph::storage::replication::ReplicationMode::SYNC,
memgraph::storage::replication::RegistrationMode::MUST_BE_INSTANTLY_VALID)
.HasError());
const auto *vertex_label = "label";
@ -314,7 +321,7 @@ TEST_F(ReplicationTest, MultipleSynchronousReplicationTest) {
check_replica(&replica_store1);
check_replica(&replica_store2);
main_store.UnregisterReplica("REPLICA2");
main_store.UnregisterReplica(replicas[1]);
{
auto acc = main_store.Access();
auto v = acc.CreateVertex();
@ -415,16 +422,17 @@ TEST_F(ReplicationTest, RecoveryProcess) {
.storage_directory = replica_storage_directory,
.snapshot_wal_mode = memgraph::storage::Config::Durability::SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL}});
replica_store.SetReplicaRole(memgraph::io::network::Endpoint{"127.0.0.1", 10000});
replica_store.SetReplicaRole(memgraph::io::network::Endpoint{local_host, ports[0]});
ASSERT_FALSE(main_store
.RegisterReplica("REPLICA1", memgraph::io::network::Endpoint{"127.0.0.1", 10000},
memgraph::storage::replication::ReplicationMode::SYNC)
.RegisterReplica(replicas[0], memgraph::io::network::Endpoint{local_host, ports[0]},
memgraph::storage::replication::ReplicationMode::SYNC,
memgraph::storage::replication::RegistrationMode::MUST_BE_INSTANTLY_VALID)
.HasError());
ASSERT_EQ(main_store.GetReplicaState("REPLICA1"), memgraph::storage::replication::ReplicaState::RECOVERY);
ASSERT_EQ(main_store.GetReplicaState(replicas[0]), memgraph::storage::replication::ReplicaState::RECOVERY);
while (main_store.GetReplicaState("REPLICA1") != memgraph::storage::replication::ReplicaState::READY) {
while (main_store.GetReplicaState(replicas[0]) != memgraph::storage::replication::ReplicaState::READY) {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
@ -484,11 +492,12 @@ TEST_F(ReplicationTest, BasicAsynchronousReplicationTest) {
memgraph::storage::Storage replica_store_async(configuration);
replica_store_async.SetReplicaRole(memgraph::io::network::Endpoint{"127.0.0.1", 20000});
replica_store_async.SetReplicaRole(memgraph::io::network::Endpoint{local_host, ports[1]});
ASSERT_FALSE(main_store
.RegisterReplica("REPLICA_ASYNC", memgraph::io::network::Endpoint{"127.0.0.1", 20000},
memgraph::storage::replication::ReplicationMode::ASYNC)
.RegisterReplica("REPLICA_ASYNC", memgraph::io::network::Endpoint{local_host, ports[1]},
memgraph::storage::replication::ReplicationMode::ASYNC,
memgraph::storage::replication::RegistrationMode::MUST_BE_INSTANTLY_VALID)
.HasError());
static constexpr size_t vertices_create_num = 10;
@ -524,20 +533,22 @@ TEST_F(ReplicationTest, EpochTest) {
memgraph::storage::Storage replica_store1(configuration);
replica_store1.SetReplicaRole(memgraph::io::network::Endpoint{"127.0.0.1", 10000});
replica_store1.SetReplicaRole(memgraph::io::network::Endpoint{local_host, ports[0]});
memgraph::storage::Storage replica_store2(configuration);
replica_store2.SetReplicaRole(memgraph::io::network::Endpoint{"127.0.0.1", 10001});
replica_store2.SetReplicaRole(memgraph::io::network::Endpoint{local_host, 10001});
ASSERT_FALSE(main_store
.RegisterReplica("REPLICA1", memgraph::io::network::Endpoint{"127.0.0.1", 10000},
memgraph::storage::replication::ReplicationMode::SYNC)
.RegisterReplica(replicas[0], memgraph::io::network::Endpoint{local_host, ports[0]},
memgraph::storage::replication::ReplicationMode::SYNC,
memgraph::storage::replication::RegistrationMode::MUST_BE_INSTANTLY_VALID)
.HasError());
ASSERT_FALSE(main_store
.RegisterReplica("REPLICA2", memgraph::io::network::Endpoint{"127.0.0.1", 10001},
memgraph::storage::replication::ReplicationMode::SYNC)
.RegisterReplica(replicas[1], memgraph::io::network::Endpoint{local_host, 10001},
memgraph::storage::replication::ReplicationMode::SYNC,
memgraph::storage::replication::RegistrationMode::MUST_BE_INSTANTLY_VALID)
.HasError());
std::optional<memgraph::storage::Gid> vertex_gid;
@ -560,13 +571,14 @@ TEST_F(ReplicationTest, EpochTest) {
ASSERT_FALSE(acc.Commit().HasError());
}
main_store.UnregisterReplica("REPLICA1");
main_store.UnregisterReplica("REPLICA2");
main_store.UnregisterReplica(replicas[0]);
main_store.UnregisterReplica(replicas[1]);
replica_store1.SetMainReplicationRole();
ASSERT_FALSE(replica_store1
.RegisterReplica("REPLICA2", memgraph::io::network::Endpoint{"127.0.0.1", 10001},
memgraph::storage::replication::ReplicationMode::SYNC)
.RegisterReplica(replicas[1], memgraph::io::network::Endpoint{local_host, 10001},
memgraph::storage::replication::ReplicationMode::SYNC,
memgraph::storage::replication::RegistrationMode::MUST_BE_INSTANTLY_VALID)
.HasError());
{
@ -588,10 +600,11 @@ TEST_F(ReplicationTest, EpochTest) {
ASSERT_FALSE(acc.Commit().HasError());
}
replica_store1.SetReplicaRole(memgraph::io::network::Endpoint{"127.0.0.1", 10000});
replica_store1.SetReplicaRole(memgraph::io::network::Endpoint{local_host, ports[0]});
ASSERT_TRUE(main_store
.RegisterReplica("REPLICA1", memgraph::io::network::Endpoint{"127.0.0.1", 10000},
memgraph::storage::replication::ReplicationMode::SYNC)
.RegisterReplica(replicas[0], memgraph::io::network::Endpoint{local_host, ports[0]},
memgraph::storage::replication::ReplicationMode::SYNC,
memgraph::storage::replication::RegistrationMode::MUST_BE_INSTANTLY_VALID)
.HasError());
{
@ -615,25 +628,27 @@ TEST_F(ReplicationTest, ReplicationInformation) {
memgraph::storage::Storage replica_store1(configuration);
const memgraph::io::network::Endpoint replica1_endpoint{"127.0.0.1", 10001};
const memgraph::io::network::Endpoint replica1_endpoint{local_host, 10001};
replica_store1.SetReplicaRole(replica1_endpoint);
const memgraph::io::network::Endpoint replica2_endpoint{"127.0.0.1", 10002};
const memgraph::io::network::Endpoint replica2_endpoint{local_host, 10002};
memgraph::storage::Storage replica_store2(configuration);
replica_store2.SetReplicaRole(replica2_endpoint);
const std::string replica1_name{"REPLICA1"};
ASSERT_FALSE(
main_store
.RegisterReplica(replica1_name, replica1_endpoint, memgraph::storage::replication::ReplicationMode::SYNC)
.HasError());
const std::string replica1_name{replicas[0]};
ASSERT_FALSE(main_store
.RegisterReplica(replica1_name, replica1_endpoint,
memgraph::storage::replication::ReplicationMode::SYNC,
memgraph::storage::replication::RegistrationMode::MUST_BE_INSTANTLY_VALID)
.HasError());
const std::string replica2_name{"REPLICA2"};
ASSERT_FALSE(
main_store
.RegisterReplica(replica2_name, replica2_endpoint, memgraph::storage::replication::ReplicationMode::ASYNC)
.HasError());
const std::string replica2_name{replicas[1]};
ASSERT_FALSE(main_store
.RegisterReplica(replica2_name, replica2_endpoint,
memgraph::storage::replication::ReplicationMode::ASYNC,
memgraph::storage::replication::RegistrationMode::MUST_BE_INSTANTLY_VALID)
.HasError());
ASSERT_EQ(main_store.GetReplicationRole(), memgraph::storage::ReplicationRole::MAIN);
ASSERT_EQ(replica_store1.GetReplicationRole(), memgraph::storage::ReplicationRole::REPLICA);
@ -660,25 +675,27 @@ TEST_F(ReplicationTest, ReplicationReplicaWithExistingName) {
memgraph::storage::Storage replica_store1(configuration);
const memgraph::io::network::Endpoint replica1_endpoint{"127.0.0.1", 10001};
const memgraph::io::network::Endpoint replica1_endpoint{local_host, 10001};
replica_store1.SetReplicaRole(replica1_endpoint);
const memgraph::io::network::Endpoint replica2_endpoint{"127.0.0.1", 10002};
const memgraph::io::network::Endpoint replica2_endpoint{local_host, 10002};
memgraph::storage::Storage replica_store2(configuration);
replica_store2.SetReplicaRole(replica2_endpoint);
const std::string replica1_name{"REPLICA1"};
ASSERT_FALSE(
main_store
.RegisterReplica(replica1_name, replica1_endpoint, memgraph::storage::replication::ReplicationMode::SYNC)
.HasError());
const std::string replica1_name{replicas[0]};
ASSERT_FALSE(main_store
.RegisterReplica(replica1_name, replica1_endpoint,
memgraph::storage::replication::ReplicationMode::SYNC,
memgraph::storage::replication::RegistrationMode::MUST_BE_INSTANTLY_VALID)
.HasError());
const std::string replica2_name{"REPLICA1"};
ASSERT_TRUE(
main_store
.RegisterReplica(replica2_name, replica2_endpoint, memgraph::storage::replication::ReplicationMode::ASYNC)
.GetError() == memgraph::storage::Storage::RegisterReplicaError::NAME_EXISTS);
const std::string replica2_name{replicas[0]};
ASSERT_TRUE(main_store
.RegisterReplica(replica2_name, replica2_endpoint,
memgraph::storage::replication::ReplicationMode::ASYNC,
memgraph::storage::replication::RegistrationMode::MUST_BE_INSTANTLY_VALID)
.GetError() == memgraph::storage::Storage::RegisterReplicaError::NAME_EXISTS);
}
TEST_F(ReplicationTest, ReplicationReplicaWithExistingEndPoint) {
@ -686,23 +703,126 @@ TEST_F(ReplicationTest, ReplicationReplicaWithExistingEndPoint) {
memgraph::storage::Storage replica_store1(configuration);
const memgraph::io::network::Endpoint replica1_endpoint{"127.0.0.1", 10001};
const memgraph::io::network::Endpoint replica1_endpoint{local_host, 10001};
replica_store1.SetReplicaRole(replica1_endpoint);
const memgraph::io::network::Endpoint replica2_endpoint{"127.0.0.1", 10001};
const memgraph::io::network::Endpoint replica2_endpoint{local_host, 10001};
memgraph::storage::Storage replica_store2(configuration);
replica_store2.SetReplicaRole(replica2_endpoint);
const std::string replica1_name{"REPLICA1"};
ASSERT_FALSE(
main_store
.RegisterReplica(replica1_name, replica1_endpoint, memgraph::storage::replication::ReplicationMode::SYNC)
.HasError());
const std::string replica1_name{replicas[0]};
ASSERT_FALSE(main_store
.RegisterReplica(replica1_name, replica1_endpoint,
memgraph::storage::replication::ReplicationMode::SYNC,
memgraph::storage::replication::RegistrationMode::MUST_BE_INSTANTLY_VALID)
.HasError());
const std::string replica2_name{"REPLICA2"};
ASSERT_TRUE(
main_store
.RegisterReplica(replica2_name, replica2_endpoint, memgraph::storage::replication::ReplicationMode::ASYNC)
.GetError() == memgraph::storage::Storage::RegisterReplicaError::END_POINT_EXISTS);
const std::string replica2_name{replicas[1]};
ASSERT_TRUE(main_store
.RegisterReplica(replica2_name, replica2_endpoint,
memgraph::storage::replication::ReplicationMode::ASYNC,
memgraph::storage::replication::RegistrationMode::MUST_BE_INSTANTLY_VALID)
.GetError() == memgraph::storage::Storage::RegisterReplicaError::END_POINT_EXISTS);
}
// Registers two replicas on main, destroys main and builds a new Storage from the same
// configuration (restore_replicas_on_startup = true). The new instance must report both replicas.
TEST_F(ReplicationTest, RestoringReplicationAtStartup) {
  auto main_config = configuration;
  main_config.durability.restore_replicas_on_startup = true;
  auto main_store = std::make_unique<memgraph::storage::Storage>(main_config);

  memgraph::storage::Storage replica_store1(configuration);
  replica_store1.SetReplicaRole(memgraph::io::network::Endpoint{local_host, ports[0]});

  memgraph::storage::Storage replica_store2(configuration);
  replica_store2.SetReplicaRole(memgraph::io::network::Endpoint{local_host, ports[1]});

  auto res = main_store->RegisterReplica(replicas[0], memgraph::io::network::Endpoint{local_host, ports[0]},
                                         memgraph::storage::replication::ReplicationMode::SYNC,
                                         memgraph::storage::replication::RegistrationMode::MUST_BE_INSTANTLY_VALID);
  ASSERT_FALSE(res.HasError());
  res = main_store->RegisterReplica(replicas[1], memgraph::io::network::Endpoint{local_host, ports[1]},
                                    memgraph::storage::replication::ReplicationMode::SYNC,
                                    memgraph::storage::replication::RegistrationMode::MUST_BE_INSTANTLY_VALID);
  ASSERT_FALSE(res.HasError());

  // Both registrations are visible on the original main.
  auto replica_infos = main_store->ReplicasInfo();

  ASSERT_EQ(replica_infos.size(), 2U);
  ASSERT_EQ(replica_infos[0].name, replicas[0]);
  ASSERT_EQ(replica_infos[0].endpoint.address, local_host);
  ASSERT_EQ(replica_infos[0].endpoint.port, ports[0]);
  ASSERT_EQ(replica_infos[1].name, replicas[1]);
  ASSERT_EQ(replica_infos[1].endpoint.address, local_host);
  ASSERT_EQ(replica_infos[1].endpoint.port, ports[1]);

  // Destroy the first main before creating its successor.
  main_store.reset();

  auto other_main_store = std::make_unique<memgraph::storage::Storage>(main_config);
  replica_infos = other_main_store->ReplicasInfo();

  ASSERT_EQ(replica_infos.size(), 2U);
  ASSERT_EQ(replica_infos[0].name, replicas[0]);
  ASSERT_EQ(replica_infos[0].endpoint.address, local_host);
  ASSERT_EQ(replica_infos[0].endpoint.port, ports[0]);
  ASSERT_EQ(replica_infos[1].name, replicas[1]);
  ASSERT_EQ(replica_infos[1].endpoint.address, local_host);
  ASSERT_EQ(replica_infos[1].endpoint.port, ports[1]);
}

// Same scenario as above, but the first replica is unregistered before main is destroyed.
// The new Storage must then report only the second replica.
TEST_F(ReplicationTest, RestoringReplicationAtStartupAfterDroppingReplica) {
  auto main_config = configuration;
  main_config.durability.restore_replicas_on_startup = true;
  auto main_store = std::make_unique<memgraph::storage::Storage>(main_config);

  memgraph::storage::Storage replica_store1(configuration);
  replica_store1.SetReplicaRole(memgraph::io::network::Endpoint{local_host, ports[0]});

  memgraph::storage::Storage replica_store2(configuration);
  replica_store2.SetReplicaRole(memgraph::io::network::Endpoint{local_host, ports[1]});

  auto res = main_store->RegisterReplica(replicas[0], memgraph::io::network::Endpoint{local_host, ports[0]},
                                         memgraph::storage::replication::ReplicationMode::SYNC,
                                         memgraph::storage::replication::RegistrationMode::MUST_BE_INSTANTLY_VALID);
  ASSERT_FALSE(res.HasError());
  res = main_store->RegisterReplica(replicas[1], memgraph::io::network::Endpoint{local_host, ports[1]},
                                    memgraph::storage::replication::ReplicationMode::SYNC,
                                    memgraph::storage::replication::RegistrationMode::MUST_BE_INSTANTLY_VALID);
  ASSERT_FALSE(res.HasError());

  auto replica_infos = main_store->ReplicasInfo();

  ASSERT_EQ(replica_infos.size(), 2U);
  ASSERT_EQ(replica_infos[0].name, replicas[0]);
  ASSERT_EQ(replica_infos[0].endpoint.address, local_host);
  ASSERT_EQ(replica_infos[0].endpoint.port, ports[0]);
  ASSERT_EQ(replica_infos[1].name, replicas[1]);
  ASSERT_EQ(replica_infos[1].endpoint.address, local_host);
  ASSERT_EQ(replica_infos[1].endpoint.port, ports[1]);

  // Drop the first replica; only the second one may remain registered.
  const auto unregister_res = main_store->UnregisterReplica(replicas[0]);
  ASSERT_TRUE(unregister_res);

  replica_infos = main_store->ReplicasInfo();
  ASSERT_EQ(replica_infos.size(), 1U);
  ASSERT_EQ(replica_infos[0].name, replicas[1]);
  ASSERT_EQ(replica_infos[0].endpoint.address, local_host);
  ASSERT_EQ(replica_infos[0].endpoint.port, ports[1]);

  // Destroy the first main before creating its successor.
  main_store.reset();

  auto other_main_store = std::make_unique<memgraph::storage::Storage>(main_config);
  replica_infos = other_main_store->ReplicasInfo();
  ASSERT_EQ(replica_infos.size(), 1U);
  ASSERT_EQ(replica_infos[0].name, replicas[1]);
  ASSERT_EQ(replica_infos[0].endpoint.address, local_host);
  ASSERT_EQ(replica_infos[0].endpoint.port, ports[1]);
}
// No replica storage is started in this test, so registering one with
// MUST_BE_INSTANTLY_VALID is expected to fail with CONNECTION_FAILED.
TEST_F(ReplicationTest, AddingInvalidReplica) {
  memgraph::storage::Storage main_store(configuration);

  auto res = main_store.RegisterReplica("REPLICA", memgraph::io::network::Endpoint{local_host, ports[0]},
                                        memgraph::storage::replication::ReplicationMode::SYNC,
                                        memgraph::storage::replication::RegistrationMode::MUST_BE_INSTANTLY_VALID);

  // Make sure there is an error before inspecting it, so an unexpected success is reported as a plain test failure.
  ASSERT_TRUE(res.HasError());
  ASSERT_TRUE(res.GetError() == memgraph::storage::Storage::RegisterReplicaError::CONNECTION_FAILED);
}