Decouple pure replication state from storage [part 1] (#1325)

A major refactor to decouple replication state from storage.
ATM it is still owned by storage but a following part should fix that.
This commit is contained in:
Gareth Andrew Lloyd 2023-10-10 11:44:19 +01:00 committed by GitHub
parent 7fbf5857f2
commit d278a33f31
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
47 changed files with 1242 additions and 907 deletions

View File

@ -21,6 +21,7 @@ add_subdirectory(audit)
add_subdirectory(dbms)
add_subdirectory(flags)
add_subdirectory(distributed)
add_subdirectory(replication)
string(TOLOWER ${CMAKE_BUILD_TYPE} lower_build_type)

View File

@ -4,4 +4,5 @@ find_package(ZLIB REQUIRED)
# STATIC library used to store key-value pairs
add_library(mg-kvstore STATIC kvstore.cpp)
add_library(mg::kvstore ALIAS mg-kvstore)
target_link_libraries(mg-kvstore stdc++fs mg-utils rocksdb BZip2::BZip2 ZLIB::ZLIB gflags)

View File

@ -19,7 +19,7 @@ struct InterpreterConfig {
bool allow_load_csv{true};
} query;
// The same as \ref memgraph::storage::replication::ReplicationClientConfig
// The same as \ref memgraph::replication::ReplicationClientConfig
std::chrono::seconds replication_replica_check_frequency{1};
std::string default_kafka_bootstrap_servers;

View File

@ -65,6 +65,7 @@
#include "query/stream/streams.hpp"
#include "query/trigger.hpp"
#include "query/typed_value.hpp"
#include "replication/config.hpp"
#include "spdlog/spdlog.h"
#include "storage/v2/disk/storage.hpp"
#include "storage/v2/edge.hpp"
@ -72,7 +73,6 @@
#include "storage/v2/id_types.hpp"
#include "storage/v2/inmemory/storage.hpp"
#include "storage/v2/property_value.hpp"
#include "storage/v2/replication/config.hpp"
#include "storage/v2/storage_error.hpp"
#include "storage/v2/storage_mode.hpp"
#include "utils/algorithm.hpp"
@ -98,6 +98,8 @@
#include "dbms/dbms_handler.hpp"
#include "query/auth_query_handler.hpp"
#include "query/interpreter_context.hpp"
#include "replication/state.hpp"
#include "storage/v2/replication/replication_handler.hpp"
namespace memgraph::metrics {
extern Event ReadQuery;
@ -144,7 +146,6 @@ constexpr auto kAlwaysFalse = false;
namespace {
template <typename T, typename K>
void Sort(std::vector<T, K> &vec) {
std::sort(vec.begin(), vec.end());
}
@ -156,13 +157,15 @@ void Sort(std::vector<TypedValue, K> &vec) {
}
// NOLINTNEXTLINE (misc-unused-parameters)
bool Same(const TypedValue &lv, const TypedValue &rv) {
[[maybe_unused]] bool Same(const TypedValue &lv, const TypedValue &rv) {
return TypedValue(lv).ValueString() == TypedValue(rv).ValueString();
}
// NOLINTNEXTLINE (misc-unused-parameters)
bool Same(const TypedValue &lv, const std::string &rv) { return std::string(TypedValue(lv).ValueString()) == rv; }
// NOLINTNEXTLINE (misc-unused-parameters)
bool Same(const std::string &lv, const TypedValue &rv) { return lv == std::string(TypedValue(rv).ValueString()); }
[[maybe_unused]] bool Same(const std::string &lv, const TypedValue &rv) {
return lv == std::string(TypedValue(rv).ValueString());
}
// NOLINTNEXTLINE (misc-unused-parameters)
bool Same(const std::string &lv, const std::string &rv) { return lv == rv; }
@ -248,24 +251,40 @@ bool IsAllShortestPathsQuery(const std::vector<memgraph::query::Clause *> &claus
return false;
}
inline auto convertToReplicationMode(const ReplicationQuery::SyncMode &sync_mode) -> replication::ReplicationMode {
switch (sync_mode) {
case ReplicationQuery::SyncMode::ASYNC: {
return replication::ReplicationMode::ASYNC;
}
case ReplicationQuery::SyncMode::SYNC: {
return replication::ReplicationMode::SYNC;
}
}
// TODO: C++23 std::unreachable()
return replication::ReplicationMode::ASYNC;
}
class ReplQueryHandler final : public query::ReplicationQueryHandler {
public:
explicit ReplQueryHandler(storage::Storage *db) : db_(db) {}
explicit ReplQueryHandler(storage::Storage *db) : db_(db), handler_{db_->repl_state_, *db_} {}
/// @throw QueryRuntimeException if an error ocurred.
void SetReplicationRole(ReplicationQuery::ReplicationRole replication_role, std::optional<int64_t> port) override {
if (replication_role == ReplicationQuery::ReplicationRole::MAIN) {
if (!db_->SetMainReplicationRole()) {
if (!handler_.SetReplicationRoleMain()) {
throw QueryRuntimeException("Couldn't set role to main!");
}
} else {
if (!port || *port < 0 || *port > std::numeric_limits<uint16_t>::max()) {
throw QueryRuntimeException("Port number invalid!");
}
if (!db_->SetReplicaRole(storage::replication::ReplicationServerConfig{
.ip_address = storage::replication::kDefaultReplicationServerIp,
.port = static_cast<uint16_t>(*port),
})) {
auto const config = memgraph::replication::ReplicationServerConfig{
.ip_address = memgraph::replication::kDefaultReplicationServerIp,
.port = static_cast<uint16_t>(*port),
};
if (!handler_.SetReplicationRoleReplica(config)) {
throw QueryRuntimeException("Couldn't set role to replica!");
}
}
@ -273,10 +292,10 @@ class ReplQueryHandler final : public query::ReplicationQueryHandler {
/// @throw QueryRuntimeException if an error ocurred.
ReplicationQuery::ReplicationRole ShowReplicationRole() const override {
switch (db_->GetReplicationRole()) {
case storage::replication::ReplicationRole::MAIN:
switch (handler_.GetRole()) {
case memgraph::replication::ReplicationRole::MAIN:
return ReplicationQuery::ReplicationRole::MAIN;
case storage::replication::ReplicationRole::REPLICA:
case memgraph::replication::ReplicationRole::REPLICA:
return ReplicationQuery::ReplicationRole::REPLICA;
}
throw QueryRuntimeException("Couldn't show replication role - invalid role set!");
@ -286,39 +305,29 @@ class ReplQueryHandler final : public query::ReplicationQueryHandler {
void RegisterReplica(const std::string &name, const std::string &socket_address,
const ReplicationQuery::SyncMode sync_mode,
const std::chrono::seconds replica_check_frequency) override {
if (db_->GetReplicationRole() == storage::replication::ReplicationRole::REPLICA) {
if (handler_.IsReplica()) {
// replica can't register another replica
throw QueryRuntimeException("Replica can't register another replica!");
}
if (name == storage::replication::kReservedReplicationRoleName) {
if (name == memgraph::replication::kReservedReplicationRoleName) {
throw QueryRuntimeException("This replica name is reserved and can not be used as replica name!");
}
storage::replication::ReplicationMode repl_mode;
switch (sync_mode) {
case ReplicationQuery::SyncMode::ASYNC: {
repl_mode = storage::replication::ReplicationMode::ASYNC;
break;
}
case ReplicationQuery::SyncMode::SYNC: {
repl_mode = storage::replication::ReplicationMode::SYNC;
break;
}
}
auto repl_mode = convertToReplicationMode(sync_mode);
auto maybe_ip_and_port =
io::network::Endpoint::ParseSocketOrIpAddress(socket_address, storage::replication::kDefaultReplicationPort);
io::network::Endpoint::ParseSocketOrIpAddress(socket_address, memgraph::replication::kDefaultReplicationPort);
if (maybe_ip_and_port) {
auto [ip, port] = *maybe_ip_and_port;
auto ret = db_->RegisterReplica(
storage::replication::RegistrationMode::MUST_BE_INSTANTLY_VALID,
storage::replication::ReplicationClientConfig{.name = name,
.mode = repl_mode,
.ip_address = ip,
.port = port,
.replica_check_frequency = replica_check_frequency,
.ssl = std::nullopt});
auto config = replication::ReplicationClientConfig{.name = name,
.mode = repl_mode,
.ip_address = ip,
.port = port,
.replica_check_frequency = replica_check_frequency,
.ssl = std::nullopt};
using storage::RegistrationMode;
auto ret = handler_.RegisterReplica(RegistrationMode::MUST_BE_INSTANTLY_VALID, config);
if (ret.HasError()) {
throw QueryRuntimeException(fmt::format("Couldn't register replica '{}'!", name));
}
@ -327,20 +336,26 @@ class ReplQueryHandler final : public query::ReplicationQueryHandler {
}
}
/// @throw QueryRuntimeException if an error ocurred.
void DropReplica(const std::string &replica_name) override {
if (db_->GetReplicationRole() == storage::replication::ReplicationRole::REPLICA) {
// replica can't unregister a replica
throw QueryRuntimeException("Replica can't unregister a replica!");
}
if (!db_->UnregisterReplica(replica_name)) {
throw QueryRuntimeException(fmt::format("Couldn't unregister the replica '{}'", replica_name));
/// @throw QueryRuntimeException if an error occurred.
void DropReplica(std::string_view replica_name) override {
auto const result = handler_.UnregisterReplica(replica_name);
switch (result) {
using enum memgraph::storage::UnregisterReplicaResult;
case NOT_MAIN:
throw QueryRuntimeException("Replica can't unregister a replica!");
case COULD_NOT_BE_PERSISTED:
[[fallthrough]];
case CAN_NOT_UNREGISTER:
throw QueryRuntimeException(fmt::format("Couldn't unregister the replica '{}'", replica_name));
case SUCCESS:
break;
}
}
using Replica = ReplicationQueryHandler::Replica;
std::vector<Replica> ShowReplicas() const override {
if (db_->GetReplicationRole() == storage::replication::ReplicationRole::REPLICA) {
auto const &replState = db_->repl_state_;
if (replState.IsReplica()) {
// replica can't show registered replicas (it shouldn't have any)
throw QueryRuntimeException("Replica can't show registered replicas (it shouldn't have any)!");
}
@ -354,10 +369,10 @@ class ReplQueryHandler final : public query::ReplicationQueryHandler {
replica.name = repl_info.name;
replica.socket_address = repl_info.endpoint.SocketAddress();
switch (repl_info.mode) {
case storage::replication::ReplicationMode::SYNC:
case memgraph::replication::ReplicationMode::SYNC:
replica.sync_mode = ReplicationQuery::SyncMode::SYNC;
break;
case storage::replication::ReplicationMode::ASYNC:
case memgraph::replication::ReplicationMode::ASYNC:
replica.sync_mode = ReplicationQuery::SyncMode::ASYNC;
break;
}
@ -390,6 +405,7 @@ class ReplQueryHandler final : public query::ReplicationQueryHandler {
private:
storage::Storage *db_;
storage::ReplicationHandler handler_;
};
/// returns false if the replication role can't be set
@ -1370,18 +1386,19 @@ bool IsWriteQueryOnMainMemoryReplica(storage::Storage *storage,
const query::plan::ReadWriteTypeChecker::RWType query_type) {
if (auto storage_mode = storage->GetStorageMode(); storage_mode == storage::StorageMode::IN_MEMORY_ANALYTICAL ||
storage_mode == storage::StorageMode::IN_MEMORY_TRANSACTIONAL) {
return (storage->GetReplicationRole() == storage::replication::ReplicationRole::REPLICA) &&
(query_type == RWType::W || query_type == RWType::RW);
auto const &replState = storage->repl_state_;
return replState.IsReplica() && (query_type == RWType::W || query_type == RWType::RW);
}
return false;
}
storage::replication::ReplicationRole GetReplicaRole(storage::Storage *storage) {
bool IsReplica(storage::Storage *storage) {
if (auto storage_mode = storage->GetStorageMode(); storage_mode == storage::StorageMode::IN_MEMORY_ANALYTICAL ||
storage_mode == storage::StorageMode::IN_MEMORY_TRANSACTIONAL) {
return storage->GetReplicationRole();
auto const &replState = storage->repl_state_;
return replState.IsReplica();
}
return storage::replication::ReplicationRole::MAIN;
return false;
}
} // namespace
@ -2784,7 +2801,7 @@ PreparedQuery PrepareCreateSnapshotQuery(ParsedQuery parsed_query, bool in_expli
std::move(parsed_query.required_privileges),
[storage](AnyStream * /*stream*/, std::optional<int> /*n*/) -> std::optional<QueryHandlerResult> {
auto *mem_storage = static_cast<storage::InMemoryStorage *>(storage);
if (auto maybe_error = mem_storage->CreateSnapshot({}); maybe_error.HasError()) {
if (auto maybe_error = mem_storage->CreateSnapshot(storage->repl_state_, {}); maybe_error.HasError()) {
switch (maybe_error.GetError()) {
case storage::InMemoryStorage::CreateSnapshotError::DisabledForReplica:
throw utils::BasicException(
@ -3328,7 +3345,7 @@ PreparedQuery PrepareMultiDatabaseQuery(ParsedQuery parsed_query, CurrentDB &cur
}
// TODO: Remove once replicas support multi-tenant replication
if (!current_db.db_acc_) throw DatabaseContextRequiredException("Multi database queries require a defined database.");
if (GetReplicaRole(current_db.db_acc_->get()->storage()) == storage::replication::ReplicationRole::REPLICA) {
if (IsReplica(current_db.db_acc_->get()->storage())) {
throw QueryException("Query forbidden on the replica!");
}
@ -3471,7 +3488,8 @@ PreparedQuery PrepareShowDatabasesQuery(ParsedQuery parsed_query, CurrentDB &cur
throw QueryException("Trying to use enterprise feature without a valid license.");
}
// TODO: Remove once replicas support multi-tenant replication
if (GetReplicaRole(storage) == storage::replication::ReplicationRole::REPLICA) {
auto &replState = storage->repl_state_;
if (replState.IsReplica()) {
throw QueryException("SHOW DATABASES forbidden on the replica!");
}

View File

@ -95,7 +95,7 @@ class ReplicationQueryHandler {
const std::chrono::seconds replica_check_frequency) = 0;
/// @throw QueryRuntimeException if an error ocurred.
virtual void DropReplica(const std::string &replica_name) = 0;
virtual void DropReplica(std::string_view replica_name) = 0;
/// @throw QueryRuntimeException if an error ocurred.
virtual std::vector<Replica> ShowReplicas() const = 0;

View File

@ -1940,7 +1940,7 @@ void mgp_vertex_destroy(mgp_vertex *v) { DeleteRawMgpObject(v); }
mgp_error mgp_vertex_equal(mgp_vertex *v1, mgp_vertex *v2, int *result) {
// NOLINTNEXTLINE(clang-diagnostic-unevaluated-expression)
static_assert(noexcept(*result = *v1 == *v2 ? 1 : 0));
static_assert(noexcept(*v1 == *v2));
*result = *v1 == *v2 ? 1 : 0;
return mgp_error::MGP_ERROR_NO_ERROR;
}
@ -2313,7 +2313,7 @@ void mgp_edge_destroy(mgp_edge *e) { DeleteRawMgpObject(e); }
mgp_error mgp_edge_equal(mgp_edge *e1, mgp_edge *e2, int *result) {
// NOLINTNEXTLINE(clang-diagnostic-unevaluated-expression)
static_assert(noexcept(*result = *e1 == *e2 ? 1 : 0));
static_assert(noexcept(*e1 == *e2));
*result = *e1 == *e2 ? 1 : 0;
return mgp_error::MGP_ERROR_NO_ERROR;
}

View File

@ -0,0 +1,24 @@
add_library(mg-replication STATIC)
add_library(mg::replication ALIAS mg-replication)
target_sources(mg-replication
PUBLIC
include/replication/state.hpp
include/replication/epoch.hpp
include/replication/config.hpp
include/replication/mode.hpp
include/replication/role.hpp
include/replication/status.hpp
PRIVATE
state.cpp
epoch.cpp
config.cpp
status.cpp
)
target_include_directories(mg-replication PUBLIC include)
find_package(fmt REQUIRED)
target_link_libraries(mg-replication
PUBLIC mg::utils mg::kvstore lib::json
PRIVATE fmt::fmt
)

View File

@ -0,0 +1,11 @@
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#include "replication/config.hpp"

12
src/replication/epoch.cpp Normal file
View File

@ -0,0 +1,12 @@
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#include "replication/epoch.hpp"

View File

@ -12,12 +12,17 @@
#pragma once
#include <chrono>
#include <cstdint>
#include <optional>
#include <string>
#include "replication/mode.hpp"
#include "storage/v2/replication/enums.hpp"
namespace memgraph::replication {
inline constexpr uint16_t kDefaultReplicationPort = 10000;
inline constexpr auto *kDefaultReplicationServerIp = "0.0.0.0";
inline constexpr auto *kReservedReplicationRoleName{"__replication_role"};
namespace memgraph::storage::replication {
struct ReplicationClientConfig {
std::string name;
ReplicationMode mode;
@ -51,4 +56,4 @@ struct ReplicationServerConfig {
std::optional<SSL> ssl;
};
} // namespace memgraph::storage::replication
} // namespace memgraph::replication

View File

@ -0,0 +1,49 @@
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#pragma once
#include <string>
#include <utility>
#include "utils/uuid.hpp"
namespace memgraph::replication {
struct ReplicationEpoch {
ReplicationEpoch() : id_(memgraph::utils::GenerateUUID()) {}
ReplicationEpoch(ReplicationEpoch const &) = delete;
ReplicationEpoch(ReplicationEpoch &&) = delete;
ReplicationEpoch &operator=(ReplicationEpoch const &) = delete;
ReplicationEpoch &operator=(ReplicationEpoch &&) = delete;
auto id() const -> std::string_view { return id_; }
auto NewEpoch() -> std::string { return std::exchange(id_, memgraph::utils::GenerateUUID()); }
auto SetEpoch(std::string new_epoch) -> std::string { return std::exchange(id_, std::move(new_epoch)); }
private:
// UUID to distinguish different main instance runs for replication process
// on SAME storage.
// Multiple instances can have same storage UUID and be MAIN at the same time.
// We cannot compare commit timestamps of those instances if one of them
// becomes the replica of the other so we use epoch_id_ as additional
// discriminating property.
// Example of this:
// We have 2 instances of the same storage, S1 and S2.
// S1 and S2 are MAIN and accept their own commits and write them to the WAL.
// At the moment when S1 commited a transaction with timestamp 20, and S2
// a different transaction with timestamp 15, we change S2's role to REPLICA
// and register it on S1.
// Without using the epoch_id, we don't know that S1 and S2 have completely
// different transactions, we think that the S2 is behind only by 5 commits.
std::string id_;
};
} // namespace memgraph::replication

View File

@ -0,0 +1,16 @@
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#pragma once
#include <cstdint>
namespace memgraph::replication {
enum class ReplicationMode : std::uint8_t { SYNC, ASYNC };
}

View File

@ -0,0 +1,18 @@
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#pragma once
#include <cstdint>
namespace memgraph::replication {
enum class ReplicationRole : uint8_t { MAIN, REPLICA };
}

View File

@ -0,0 +1,69 @@
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#pragma once
#include <atomic>
#include <cstdint>
#include <variant>
#include <vector>
#include "kvstore/kvstore.hpp"
#include "replication/config.hpp"
#include "replication/epoch.hpp"
#include "replication/mode.hpp"
#include "replication/role.hpp"
#include "utils/result.hpp"
namespace memgraph::replication {
enum class RolePersisted : uint8_t { UNKNOWN_OR_NO, YES };
struct ReplicationState {
ReplicationState(std::optional<std::filesystem::path> durability_dir);
ReplicationState(ReplicationState const &) = delete;
ReplicationState(ReplicationState &&) = delete;
ReplicationState &operator=(ReplicationState const &) = delete;
ReplicationState &operator=(ReplicationState &&) = delete;
void SetRole(ReplicationRole role) { return replication_role_.store(role); }
auto GetRole() const -> ReplicationRole { return replication_role_.load(); }
bool IsMain() const { return replication_role_ == ReplicationRole::MAIN; }
bool IsReplica() const { return replication_role_ == ReplicationRole::REPLICA; }
auto GetEpoch() const -> const ReplicationEpoch & { return epoch_; }
auto GetEpoch() -> ReplicationEpoch & { return epoch_; }
enum class FetchReplicationError : uint8_t {
NOTHING_FETCHED,
PARSE_ERROR,
};
using ReplicationDataReplica = ReplicationServerConfig;
using ReplicationDataMain = std::vector<ReplicationClientConfig>;
using ReplicationData = std::variant<ReplicationDataMain, ReplicationDataReplica>;
using FetchReplicationResult = utils::BasicResult<FetchReplicationError, ReplicationData>;
auto FetchReplicationData() -> FetchReplicationResult;
bool ShouldPersist() const { return nullptr != durability_; }
bool TryPersistRoleMain();
bool TryPersistRoleReplica(const ReplicationServerConfig &config);
bool TryPersistUnregisterReplica(std::string_view &name);
bool TryPersistRegisteredReplica(const ReplicationClientConfig &config);
private:
ReplicationEpoch epoch_;
std::atomic<ReplicationRole> replication_role_{ReplicationRole::MAIN};
std::unique_ptr<kvstore::KVStore> durability_;
std::atomic<RolePersisted> role_persisted = RolePersisted::UNKNOWN_OR_NO;
};
} // namespace memgraph::replication

View File

@ -12,21 +12,16 @@
#pragma once
#include <chrono>
#include <compare>
#include <cstdint>
#include <optional>
#include <string>
#include <json/json.hpp>
#include "json/json.hpp"
#include "storage/v2/replication/config.hpp"
#include "storage/v2/replication/enums.hpp"
namespace memgraph::storage::replication {
inline constexpr auto *kReservedReplicationRoleName{"__replication_role"};
inline constexpr uint16_t kDefaultReplicationPort = 10000;
inline constexpr auto *kDefaultReplicationServerIp = "0.0.0.0";
#include "replication/config.hpp"
#include "replication/role.hpp"
namespace memgraph::replication {
struct ReplicationStatus {
std::string name;
std::string ip_address;
@ -40,6 +35,5 @@ struct ReplicationStatus {
};
nlohmann::json ReplicationStatusToJSON(ReplicationStatus &&status);
std::optional<ReplicationStatus> JSONToReplicationStatus(nlohmann::json &&data);
} // namespace memgraph::storage::replication
} // namespace memgraph::replication

144
src/replication/state.cpp Normal file
View File

@ -0,0 +1,144 @@
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#include "replication/state.hpp"
#include "replication/status.hpp" //TODO: don't use status for durability
#include "utils/file.hpp"
constexpr auto kReplicationDirectory = std::string_view{"replication"};
namespace memgraph::replication {
ReplicationState::ReplicationState(std::optional<std::filesystem::path> durability_dir) {
if (!durability_dir) return;
auto repl_dir = *std::move(durability_dir);
repl_dir /= kReplicationDirectory;
utils::EnsureDirOrDie(repl_dir);
durability_ = std::make_unique<kvstore::KVStore>(std::move(repl_dir));
}
bool ReplicationState::TryPersistRoleReplica(const ReplicationServerConfig &config) {
if (!ShouldPersist()) return true;
// Only thing that matters here is the role saved as REPLICA and the listening port
auto data = ReplicationStatusToJSON(ReplicationStatus{.name = kReservedReplicationRoleName,
.ip_address = config.ip_address,
.port = config.port,
.sync_mode = ReplicationMode::SYNC,
.replica_check_frequency = std::chrono::seconds(0),
.ssl = std::nullopt,
.role = ReplicationRole::REPLICA});
if (durability_->Put(kReservedReplicationRoleName, data.dump())) {
role_persisted = RolePersisted::YES;
return true;
}
spdlog::error("Error when saving REPLICA replication role in settings.");
return false;
}
bool ReplicationState::TryPersistRoleMain() {
if (!ShouldPersist()) return true;
// Only thing that matters here is the role saved as MAIN
auto data = ReplicationStatusToJSON(ReplicationStatus{.name = kReservedReplicationRoleName,
.ip_address = "",
.port = 0,
.sync_mode = ReplicationMode::SYNC,
.replica_check_frequency = std::chrono::seconds(0),
.ssl = std::nullopt,
.role = ReplicationRole::MAIN});
if (durability_->Put(kReservedReplicationRoleName, data.dump())) {
role_persisted = RolePersisted::YES;
return true;
}
spdlog::error("Error when saving MAIN replication role in settings.");
return false;
}
bool ReplicationState::TryPersistUnregisterReplica(std::string_view &name) {
if (!ShouldPersist()) return true;
if (durability_->Delete(name)) return true;
spdlog::error("Error when removing replica {} from settings.", name);
return false;
}
auto ReplicationState::FetchReplicationData() -> FetchReplicationResult {
if (!ShouldPersist()) return FetchReplicationError::NOTHING_FETCHED;
const auto replication_data = durability_->Get(kReservedReplicationRoleName);
if (!replication_data.has_value()) {
return FetchReplicationError::NOTHING_FETCHED;
}
const auto maybe_replication_status = JSONToReplicationStatus(nlohmann::json::parse(*replication_data));
if (!maybe_replication_status.has_value()) {
return FetchReplicationError::PARSE_ERROR;
}
// To get here this must be the case
role_persisted = memgraph::replication::RolePersisted::YES;
const auto replication_status = *maybe_replication_status;
auto role = replication_status.role.value_or(ReplicationRole::MAIN);
switch (role) {
case ReplicationRole::REPLICA: {
return {ReplicationServerConfig{
.ip_address = kDefaultReplicationServerIp,
.port = replication_status.port,
}};
}
case ReplicationRole::MAIN: {
auto res = ReplicationState::ReplicationDataMain{};
res.reserve(durability_->Size() - 1);
for (const auto &[replica_name, replica_data] : *durability_) {
if (replica_name == kReservedReplicationRoleName) {
continue;
}
const auto maybe_replica_status = JSONToReplicationStatus(nlohmann::json::parse(replica_data));
if (!maybe_replica_status.has_value()) {
return FetchReplicationError::PARSE_ERROR;
}
auto replica_status = *maybe_replica_status;
if (replica_status.name != replica_name) {
return FetchReplicationError::PARSE_ERROR;
}
res.emplace_back(ReplicationClientConfig{
.name = replica_status.name,
.mode = replica_status.sync_mode,
.ip_address = replica_status.ip_address,
.port = replica_status.port,
.replica_check_frequency = replica_status.replica_check_frequency,
.ssl = replica_status.ssl,
});
}
return {std::move(res)};
}
}
}
bool ReplicationState::TryPersistRegisteredReplica(const ReplicationClientConfig &config) {
if (!ShouldPersist()) return true;
// If any replicas are persisted then Role must be persisted
if (role_persisted != RolePersisted::YES) {
DMG_ASSERT(IsMain(), "MAIN is expected");
if (!TryPersistRoleMain()) return false;
}
auto data = ReplicationStatusToJSON(ReplicationStatus{.name = config.name,
.ip_address = config.ip_address,
.port = config.port,
.sync_mode = config.mode,
.replica_check_frequency = config.replica_check_frequency,
.ssl = config.ssl,
.role = ReplicationRole::REPLICA});
if (durability_->Put(config.name, data.dump())) return true;
spdlog::error("Error when saving replica {} in settings.", config.name);
return false;
}
} // namespace memgraph::replication

View File

@ -8,24 +8,21 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#include "replication/status.hpp"
#include "storage/v2/replication/replication_persistence_helper.hpp"
#include "storage/v2/replication/enums.hpp"
#include "fmt/format.h"
#include "utils/logging.hpp"
namespace {
inline constexpr auto *kReplicaName = "replica_name";
inline constexpr auto *kIpAddress = "replica_ip_address";
inline constexpr auto *kPort = "replica_port";
inline constexpr auto *kSyncMode = "replica_sync_mode";
inline constexpr auto *kCheckFrequency = "replica_check_frequency";
inline constexpr auto *kSSLKeyFile = "replica_ssl_key_file";
inline constexpr auto *kSSLCertFile = "replica_ssl_cert_file";
inline constexpr auto *kReplicationRole = "replication_role";
} // namespace
constexpr auto *kReplicaName = "replica_name";
constexpr auto *kIpAddress = "replica_ip_address";
constexpr auto *kPort = "replica_port";
constexpr auto *kSyncMode = "replica_sync_mode";
constexpr auto *kCheckFrequency = "replica_check_frequency";
constexpr auto *kSSLKeyFile = "replica_ssl_key_file";
constexpr auto *kSSLCertFile = "replica_ssl_cert_file";
constexpr auto *kReplicationRole = "replication_role";
namespace memgraph::storage::replication {
namespace memgraph::replication {
nlohmann::json ReplicationStatusToJSON(ReplicationStatus &&status) {
auto data = nlohmann::json::object();
@ -51,7 +48,6 @@ nlohmann::json ReplicationStatusToJSON(ReplicationStatus &&status) {
return data;
}
std::optional<ReplicationStatus> JSONToReplicationStatus(nlohmann::json &&data) {
ReplicationStatus replica_status;
@ -73,13 +69,13 @@ std::optional<ReplicationStatus> JSONToReplicationStatus(nlohmann::json &&data)
MG_ASSERT(key_file.is_null() == cert_file.is_null());
if (!key_file.is_null()) {
replica_status.ssl = replication::ReplicationClientConfig::SSL{};
replica_status.ssl = ReplicationClientConfig::SSL{};
data.at(kSSLKeyFile).get_to(replica_status.ssl->key_file);
data.at(kSSLCertFile).get_to(replica_status.ssl->cert_file);
}
if (data.find(kReplicationRole) != data.end()) {
replica_status.role = replication::ReplicationRole::MAIN;
replica_status.role = ReplicationRole::MAIN;
data.at(kReplicationRole).get_to(replica_status.role.value());
}
} catch (const nlohmann::json::type_error &exception) {
@ -92,4 +88,4 @@ std::optional<ReplicationStatus> JSONToReplicationStatus(nlohmann::json &&data)
return replica_status;
}
} // namespace memgraph::storage::replication
} // namespace memgraph::replication

View File

@ -36,13 +36,13 @@ add_library(mg-storage-v2 STATIC
replication/replication_server.cpp
replication/serialization.cpp
replication/slk.cpp
replication/replication_persistence_helper.cpp
replication/rpc.cpp
replication/replication.cpp
replication/replication_storage_state.cpp
replication/replication_handler.cpp
inmemory/replication/replication_server.cpp
inmemory/replication/replication_client.cpp
)
target_link_libraries(mg-storage-v2 Threads::Threads mg-utils gflags absl::flat_hash_map mg-rpc mg-slk)
target_link_libraries(mg-storage-v2 mg::replication Threads::Threads mg-utils gflags absl::flat_hash_map mg-rpc mg-slk)
# Until we get LTO there is an advantage to do some unity builds
set_target_properties(mg-storage-v2

View File

@ -346,19 +346,21 @@ class DiskStorage final : public Storage {
void FreeMemory(std::unique_lock<utils::ResourceLock> /*lock*/) override {}
void EstablishNewEpoch() override { throw utils::BasicException("Disk storage mode does not support replication."); }
void PrepareForNewEpoch(std::string prev_epoch) override {
throw utils::BasicException("Disk storage mode does not support replication.");
}
uint64_t CommitTimestamp(std::optional<uint64_t> desired_commit_timestamp = {});
EdgeImportMode edge_import_status_{EdgeImportMode::INACTIVE};
std::unique_ptr<EdgeImportModeCache> edge_import_mode_cache_{nullptr};
auto CreateReplicationClient(replication::ReplicationClientConfig const &config)
auto CreateReplicationClient(const memgraph::replication::ReplicationClientConfig &config)
-> std::unique_ptr<ReplicationClient> override {
throw utils::BasicException("Disk storage mode does not support replication.");
}
auto CreateReplicationServer(const replication::ReplicationServerConfig &config)
auto CreateReplicationServer(const memgraph::replication::ReplicationServerConfig &config)
-> std::unique_ptr<ReplicationServer> override {
throw utils::BasicException("Disk storage mode does not support replication.");
}

View File

@ -24,6 +24,7 @@
#include <utility>
#include <vector>
#include "replication/epoch.hpp"
#include "storage/v2/durability/paths.hpp"
#include "storage/v2/durability/snapshot.hpp"
#include "storage/v2/durability/wal.hpp"
@ -210,7 +211,7 @@ void RecoverIndicesAndConstraints(const RecoveredIndicesAndConstraints &indices_
std::optional<RecoveryInfo> RecoverData(const std::filesystem::path &snapshot_directory,
const std::filesystem::path &wal_directory, std::string *uuid,
std::string *epoch_id,
memgraph::replication::ReplicationEpoch &epoch,
std::deque<std::pair<std::string, uint64_t>> *epoch_history,
utils::SkipList<Vertex> *vertices, utils::SkipList<Edge> *edges,
std::atomic<uint64_t> *edge_count, NameIdMapper *name_id_mapper,
@ -263,7 +264,7 @@ std::optional<RecoveryInfo> RecoverData(const std::filesystem::path &snapshot_di
recovery_info = recovered_snapshot->recovery_info;
indices_constraints = std::move(recovered_snapshot->indices_constraints);
snapshot_timestamp = recovered_snapshot->snapshot_info.start_timestamp;
*epoch_id = std::move(recovered_snapshot->snapshot_info.epoch_id);
epoch.SetEpoch(std::move(recovered_snapshot->snapshot_info.epoch_id));
if (!utils::DirExists(wal_directory)) {
const auto par_exec_info = config.durability.allow_parallel_index_creation
@ -308,7 +309,7 @@ std::optional<RecoveryInfo> RecoverData(const std::filesystem::path &snapshot_di
// UUID used for durability is the UUID of the last WAL file.
// Same for the epoch id.
*uuid = std::move(wal_files.back().uuid);
*epoch_id = std::move(wal_files.back().epoch_id);
epoch.SetEpoch(std::move(wal_files.back().epoch_id));
}
auto maybe_wal_files = GetWalFiles(wal_directory, *uuid);
@ -364,7 +365,7 @@ std::optional<RecoveryInfo> RecoverData(const std::filesystem::path &snapshot_di
}
previous_seq_num = wal_file.seq_num;
if (wal_file.epoch_id != *epoch_id) {
if (wal_file.epoch_id != epoch.id()) {
// This way we skip WALs finalized only because of role change.
// We can also set the last timestamp to 0 if last loaded timestamp
// is nullopt as this can only happen if the WAL file with seq = 0
@ -372,7 +373,7 @@ std::optional<RecoveryInfo> RecoverData(const std::filesystem::path &snapshot_di
if (last_loaded_timestamp) {
epoch_history->emplace_back(wal_file.epoch_id, *last_loaded_timestamp);
}
*epoch_id = std::move(wal_file.epoch_id);
epoch.SetEpoch(std::move(wal_file.epoch_id));
}
try {
auto info = LoadWal(wal_file.path, &indices_constraints, last_loaded_timestamp, vertices, edges, name_id_mapper,

View File

@ -18,6 +18,7 @@
#include <string>
#include <variant>
#include "replication/epoch.hpp"
#include "storage/v2/config.hpp"
#include "storage/v2/constraints/constraints.hpp"
#include "storage/v2/durability/metadata.hpp"
@ -109,7 +110,7 @@ void RecoverIndicesAndConstraints(
/// @throw std::bad_alloc
std::optional<RecoveryInfo> RecoverData(const std::filesystem::path &snapshot_directory,
const std::filesystem::path &wal_directory, std::string *uuid,
std::string *epoch_id,
memgraph::replication::ReplicationEpoch &epoch,
std::deque<std::pair<std::string, uint64_t>> *epoch_history,
utils::SkipList<Vertex> *vertices, utils::SkipList<Edge> *edges,
std::atomic<uint64_t> *edge_count, NameIdMapper *name_id_mapper,

View File

@ -1,4 +1,4 @@
// Copyright 2022 Memgraph Ltd.
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -22,7 +22,6 @@ static const std::string kSnapshotDirectory{"snapshots"};
static const std::string kWalDirectory{"wal"};
static const std::string kBackupDirectory{".backup"};
static const std::string kLockFile{".lock"};
static const std::string kReplicationDirectory{"replication"};
// This is the prefix used for Snapshot and WAL filenames. It is a timestamp
// format that equals to: YYYYmmddHHMMSSffffff

View File

@ -1714,7 +1714,8 @@ void CreateSnapshot(Transaction *transaction, const std::filesystem::path &snaps
const std::filesystem::path &wal_directory, uint64_t snapshot_retention_count,
utils::SkipList<Vertex> *vertices, utils::SkipList<Edge> *edges, NameIdMapper *name_id_mapper,
Indices *indices, Constraints *constraints, const Config &config, const std::string &uuid,
const std::string_view epoch_id, const std::deque<std::pair<std::string, uint64_t>> &epoch_history,
const memgraph::replication::ReplicationEpoch &epoch,
const std::deque<std::pair<std::string, uint64_t>> &epoch_history,
utils::FileRetainer *file_retainer) {
// Ensure that the storage directory exists.
utils::EnsureDirOrDie(snapshot_directory);
@ -2044,7 +2045,7 @@ void CreateSnapshot(Transaction *transaction, const std::filesystem::path &snaps
offset_metadata = snapshot.GetPosition();
snapshot.WriteMarker(Marker::SECTION_METADATA);
snapshot.WriteString(uuid);
snapshot.WriteString(epoch_id);
snapshot.WriteString(epoch.id());
snapshot.WriteUint(transaction->start_timestamp);
snapshot.WriteUint(edges_count);
snapshot.WriteUint(vertices_count);

View File

@ -15,6 +15,7 @@
#include <filesystem>
#include <string>
#include "replication/epoch.hpp"
#include "storage/v2/config.hpp"
#include "storage/v2/constraints/constraints.hpp"
#include "storage/v2/durability/metadata.hpp"
@ -71,7 +72,8 @@ void CreateSnapshot(Transaction *transaction, const std::filesystem::path &snaps
const std::filesystem::path &wal_directory, uint64_t snapshot_retention_count,
utils::SkipList<Vertex> *vertices, utils::SkipList<Edge> *edges, NameIdMapper *name_id_mapper,
Indices *indices, Constraints *constraints, const Config &config, const std::string &uuid,
std::string_view epoch_id, const std::deque<std::pair<std::string, uint64_t>> &epoch_history,
const memgraph::replication::ReplicationEpoch &epoch,
const std::deque<std::pair<std::string, uint64_t>> &epoch_history,
utils::FileRetainer *file_retainer);
} // namespace memgraph::storage::durability

View File

@ -103,8 +103,9 @@ uint64_t ReplicateCurrentWal(CurrentWalHandler &stream, durability::WalFile cons
////// ReplicationClient //////
InMemoryReplicationClient::InMemoryReplicationClient(InMemoryStorage *storage,
const replication::ReplicationClientConfig &config)
: ReplicationClient{storage, config} {}
const memgraph::replication::ReplicationClientConfig &config,
const memgraph::replication::ReplicationEpoch *epoch)
: ReplicationClient{storage, config, epoch} {}
void InMemoryReplicationClient::RecoverReplica(uint64_t replica_commit) {
spdlog::debug("Starting replica recover");

View File

@ -18,7 +18,8 @@ class InMemoryStorage;
class InMemoryReplicationClient : public ReplicationClient {
public:
InMemoryReplicationClient(InMemoryStorage *storage, const replication::ReplicationClientConfig &config);
InMemoryReplicationClient(InMemoryStorage *storage, const memgraph::replication::ReplicationClientConfig &config,
const memgraph::replication::ReplicationEpoch *epoch);
protected:
void RecoverReplica(uint64_t replica_commit) override;

View File

@ -34,8 +34,9 @@ std::pair<uint64_t, durability::WalDeltaData> ReadDelta(durability::BaseDecoder
} // namespace
InMemoryReplicationServer::InMemoryReplicationServer(InMemoryStorage *storage,
const replication::ReplicationServerConfig &config)
: ReplicationServer{config}, storage_(storage) {
const memgraph::replication::ReplicationServerConfig &config,
memgraph::replication::ReplicationEpoch *repl_epoch)
: ReplicationServer{config}, storage_(storage), repl_epoch_{repl_epoch} {
rpc_server_.Register<replication::HeartbeatRpc>([this](auto *req_reader, auto *res_builder) {
spdlog::debug("Received HeartbeatRpc");
this->HeartbeatHandler(req_reader, res_builder);
@ -66,8 +67,8 @@ InMemoryReplicationServer::InMemoryReplicationServer(InMemoryStorage *storage,
void InMemoryReplicationServer::HeartbeatHandler(slk::Reader *req_reader, slk::Builder *res_builder) {
replication::HeartbeatReq req;
slk::Load(&req, req_reader);
replication::HeartbeatRes res{true, storage_->replication_state_.last_commit_timestamp_.load(),
storage_->replication_state_.GetEpoch().id};
replication::HeartbeatRes res{true, storage_->repl_storage_state_.last_commit_timestamp_.load(),
std::string{repl_epoch_->id()}};
slk::Save(res, res_builder);
}
@ -80,13 +81,14 @@ void InMemoryReplicationServer::AppendDeltasHandler(slk::Reader *req_reader, slk
auto maybe_epoch_id = decoder.ReadString();
MG_ASSERT(maybe_epoch_id, "Invalid replication message");
if (*maybe_epoch_id != storage_->replication_state_.GetEpoch().id) {
storage_->replication_state_.AppendEpoch(*maybe_epoch_id);
auto &repl_storage_state = storage_->repl_storage_state_;
if (*maybe_epoch_id != repl_epoch_->id()) {
auto prev_epoch = repl_epoch_->SetEpoch(*maybe_epoch_id);
repl_storage_state.AddEpochToHistoryForce(prev_epoch);
}
if (storage_->wal_file_) {
if (req.seq_num > storage_->wal_file_->SequenceNumber() ||
*maybe_epoch_id != storage_->replication_state_.GetEpoch().id) {
if (req.seq_num > storage_->wal_file_->SequenceNumber() || *maybe_epoch_id != repl_epoch_->id()) {
storage_->wal_file_->FinalizeWal();
storage_->wal_file_.reset();
storage_->wal_seq_num_ = req.seq_num;
@ -99,7 +101,7 @@ void InMemoryReplicationServer::AppendDeltasHandler(slk::Reader *req_reader, slk
storage_->wal_seq_num_ = req.seq_num;
}
if (req.previous_commit_timestamp != storage_->replication_state_.last_commit_timestamp_.load()) {
if (req.previous_commit_timestamp != repl_storage_state.last_commit_timestamp_.load()) {
// Empty the stream
bool transaction_complete = false;
while (!transaction_complete) {
@ -109,7 +111,7 @@ void InMemoryReplicationServer::AppendDeltasHandler(slk::Reader *req_reader, slk
delta.type, durability::kVersion); // TODO: Check if we are always using the latest version when replicating
}
replication::AppendDeltasRes res{false, storage_->replication_state_.last_commit_timestamp_.load()};
replication::AppendDeltasRes res{false, repl_storage_state.last_commit_timestamp_.load()};
slk::Save(res, res_builder);
return;
}
@ -117,7 +119,7 @@ void InMemoryReplicationServer::AppendDeltasHandler(slk::Reader *req_reader, slk
ReadAndApplyDelta(storage_, &decoder,
durability::kVersion); // TODO: Check if we are always using the latest version when replicating
replication::AppendDeltasRes res{true, storage_->replication_state_.last_commit_timestamp_.load()};
replication::AppendDeltasRes res{true, repl_storage_state.last_commit_timestamp_.load()};
slk::Save(res, res_builder);
spdlog::debug("Replication recovery from append deltas finished, replica is now up to date!");
}
@ -147,17 +149,14 @@ void InMemoryReplicationServer::SnapshotHandler(slk::Reader *req_reader, slk::Bu
std::make_unique<InMemoryLabelPropertyIndex>(&storage_->indices_, storage_->config_);
try {
spdlog::debug("Loading snapshot");
auto &epoch =
storage_->replication_state_
.GetEpoch(); // This needs to be a non-const ref since we are updating it in LoadSnapshot TODO fix
auto recovered_snapshot = durability::LoadSnapshot(
*maybe_snapshot_path, &storage_->vertices_, &storage_->edges_, &storage_->replication_state_.history,
*maybe_snapshot_path, &storage_->vertices_, &storage_->edges_, &storage_->repl_storage_state_.history,
storage_->name_id_mapper_.get(), &storage_->edge_count_, storage_->config_);
spdlog::debug("Snapshot loaded successfully");
// If this step is present it should always be the first step of
// the recovery so we use the UUID we read from snasphost
storage_->uuid_ = std::move(recovered_snapshot.snapshot_info.uuid);
epoch.id = std::move(recovered_snapshot.snapshot_info.epoch_id);
repl_epoch_->SetEpoch(std::move(recovered_snapshot.snapshot_info.epoch_id));
const auto &recovery_info = recovered_snapshot.recovery_info;
storage_->vertex_id_ = recovery_info.next_vertex_id;
storage_->edge_id_ = recovery_info.next_edge_id;
@ -171,7 +170,7 @@ void InMemoryReplicationServer::SnapshotHandler(slk::Reader *req_reader, slk::Bu
}
storage_guard.unlock();
replication::SnapshotRes res{true, storage_->replication_state_.last_commit_timestamp_.load()};
replication::SnapshotRes res{true, storage_->repl_storage_state_.last_commit_timestamp_.load()};
slk::Save(res, res_builder);
spdlog::trace("Deleting old snapshot files due to snapshot recovery.");
@ -209,10 +208,10 @@ void InMemoryReplicationServer::WalFilesHandler(slk::Reader *req_reader, slk::Bu
utils::EnsureDirOrDie(storage_->wal_directory_);
for (auto i = 0; i < wal_file_number; ++i) {
LoadWal(storage_, &decoder);
LoadWal(storage_, *repl_epoch_, &decoder);
}
replication::WalFilesRes res{true, storage_->replication_state_.last_commit_timestamp_.load()};
replication::WalFilesRes res{true, storage_->repl_storage_state_.last_commit_timestamp_.load()};
slk::Save(res, res_builder);
spdlog::debug("Replication recovery from WAL files ended successfully, replica is now up to date!");
}
@ -225,14 +224,15 @@ void InMemoryReplicationServer::CurrentWalHandler(slk::Reader *req_reader, slk::
utils::EnsureDirOrDie(storage_->wal_directory_);
LoadWal(storage_, &decoder);
LoadWal(storage_, *repl_epoch_, &decoder);
replication::CurrentWalRes res{true, storage_->replication_state_.last_commit_timestamp_.load()};
replication::CurrentWalRes res{true, storage_->repl_storage_state_.last_commit_timestamp_.load()};
slk::Save(res, res_builder);
spdlog::debug("Replication recovery from current WAL ended successfully, replica is now up to date!");
}
void InMemoryReplicationServer::LoadWal(InMemoryStorage *storage, replication::Decoder *decoder) {
void InMemoryReplicationServer::LoadWal(InMemoryStorage *storage, memgraph::replication::ReplicationEpoch &epoch,
replication::Decoder *decoder) {
const auto temp_wal_directory = std::filesystem::temp_directory_path() / "memgraph" / durability::kWalDirectory;
utils::EnsureDir(temp_wal_directory);
auto maybe_wal_path = decoder->ReadFile(temp_wal_directory);
@ -244,8 +244,9 @@ void InMemoryReplicationServer::LoadWal(InMemoryStorage *storage, replication::D
storage->uuid_ = wal_info.uuid;
}
if (wal_info.epoch_id != storage->replication_state_.GetEpoch().id) {
storage->replication_state_.AppendEpoch(wal_info.epoch_id);
if (wal_info.epoch_id != epoch.id()) {
auto prev_epoch = epoch.SetEpoch(wal_info.epoch_id);
storage->repl_storage_state_.AddEpochToHistoryForce(prev_epoch);
}
if (storage->wal_file_) {
@ -280,7 +281,7 @@ void InMemoryReplicationServer::TimestampHandler(slk::Reader *req_reader, slk::B
replication::TimestampReq req;
slk::Load(&req, req_reader);
replication::TimestampRes res{true, storage_->replication_state_.last_commit_timestamp_.load()};
replication::TimestampRes res{true, storage_->repl_storage_state_.last_commit_timestamp_.load()};
slk::Save(res, res_builder);
}
@ -311,7 +312,7 @@ uint64_t InMemoryReplicationServer::ReadAndApplyDelta(InMemoryStorage *storage,
};
uint64_t applied_deltas = 0;
auto max_commit_timestamp = storage->replication_state_.last_commit_timestamp_.load();
auto max_commit_timestamp = storage->repl_storage_state_.last_commit_timestamp_.load();
for (bool transaction_complete = false; !transaction_complete; ++applied_deltas) {
const auto [timestamp, delta] = ReadDelta(decoder);
@ -621,7 +622,7 @@ uint64_t InMemoryReplicationServer::ReadAndApplyDelta(InMemoryStorage *storage,
if (commit_timestamp_and_accessor) throw utils::BasicException("Did not finish the transaction!");
storage->replication_state_.last_commit_timestamp_ = max_commit_timestamp;
storage->repl_storage_state_.last_commit_timestamp_ = max_commit_timestamp;
spdlog::debug("Applied {} deltas", applied_deltas);
return applied_deltas;

View File

@ -11,6 +11,7 @@
#pragma once
#include "replication/epoch.hpp"
#include "storage/v2/replication/replication_server.hpp"
#include "storage/v2/replication/serialization.hpp"
@ -20,7 +21,9 @@ class InMemoryStorage;
class InMemoryReplicationServer : public ReplicationServer {
public:
explicit InMemoryReplicationServer(InMemoryStorage *storage, const replication::ReplicationServerConfig &config);
explicit InMemoryReplicationServer(InMemoryStorage *storage,
const memgraph::replication::ReplicationServerConfig &config,
memgraph::replication::ReplicationEpoch *repl_epoch);
private:
// RPC handlers
@ -36,11 +39,14 @@ class InMemoryReplicationServer : public ReplicationServer {
void TimestampHandler(slk::Reader *req_reader, slk::Builder *res_builder);
static void LoadWal(InMemoryStorage *storage, replication::Decoder *decoder);
static void LoadWal(InMemoryStorage *storage, memgraph::replication::ReplicationEpoch &epoch,
replication::Decoder *decoder);
static uint64_t ReadAndApplyDelta(InMemoryStorage *storage, durability::BaseDecoder *decoder, uint64_t version);
InMemoryStorage *storage_;
memgraph::replication::ReplicationEpoch *repl_epoch_;
};
} // namespace memgraph::storage

View File

@ -18,6 +18,7 @@
#include "storage/v2/inmemory/replication/replication_client.hpp"
#include "storage/v2/inmemory/replication/replication_server.hpp"
#include "storage/v2/inmemory/unique_constraints.hpp"
#include "storage/v2/replication/replication_handler.hpp"
#include "utils/resource_lock.hpp"
namespace memgraph::storage {
@ -58,17 +59,18 @@ InMemoryStorage::InMemoryStorage(Config config, StorageMode storage_mode)
"process!",
config_.durability.storage_directory);
}
auto &repl_state = repl_state_;
if (config_.durability.recover_on_startup) {
auto &epoch = replication_state_.GetEpoch();
auto info = durability::RecoverData(snapshot_directory_, wal_directory_, &uuid_, &epoch.id,
&replication_state_.history, &vertices_, &edges_, &edge_count_,
auto &epoch = repl_state.GetEpoch();
auto info = durability::RecoverData(snapshot_directory_, wal_directory_, &uuid_, epoch,
&repl_storage_state_.history, &vertices_, &edges_, &edge_count_,
name_id_mapper_.get(), &indices_, &constraints_, config_, &wal_seq_num_);
if (info) {
vertex_id_ = info->next_vertex_id;
edge_id_ = info->next_edge_id;
timestamp_ = std::max(timestamp_, info->next_timestamp);
if (info->last_commit_timestamp) {
replication_state_.last_commit_timestamp_ = *info->last_commit_timestamp;
repl_storage_state_.last_commit_timestamp_ = *info->last_commit_timestamp;
}
}
} else if (config_.durability.snapshot_wal_mode != Config::Durability::SnapshotWalMode::DISABLED ||
@ -102,7 +104,8 @@ InMemoryStorage::InMemoryStorage(Config config, StorageMode storage_mode)
}
if (config_.durability.snapshot_wal_mode != Config::Durability::SnapshotWalMode::DISABLED) {
snapshot_runner_.Run("Snapshot", config_.durability.snapshot_interval, [this] {
if (auto maybe_error = this->CreateSnapshot({true}); maybe_error.HasError()) {
auto const &repl_state = repl_state_;
if (auto maybe_error = this->CreateSnapshot(repl_state, {true}); maybe_error.HasError()) {
switch (maybe_error.GetError()) {
case CreateSnapshotError::DisabledForReplica:
spdlog::warn(
@ -131,19 +134,14 @@ InMemoryStorage::InMemoryStorage(Config config, StorageMode storage_mode)
if (config_.durability.restore_replication_state_on_startup) {
spdlog::info("Replication configuration will be stored and will be automatically restored in case of a crash.");
RestoreReplicationRole();
if (replication_state_.GetRole() == replication::ReplicationRole::MAIN) {
RestoreReplicas();
}
ReplicationHandler{repl_state, *this}.RestoreReplication();
} else {
spdlog::warn(
"Replication configuration will NOT be stored. When the server restarts, replication state will be "
"forgotten.");
}
if (config_.durability.snapshot_wal_mode == Config::Durability::SnapshotWalMode::DISABLED &&
replication_state_.GetRole() == replication::ReplicationRole::MAIN) {
if (config_.durability.snapshot_wal_mode == Config::Durability::SnapshotWalMode::DISABLED && repl_state.IsMain()) {
spdlog::warn(
"The instance has the MAIN replication role, but durability logs and snapshots are disabled. Please consider "
"enabling durability by using --storage-snapshot-interval-sec and --storage-wal-enabled flags because "
@ -158,8 +156,8 @@ InMemoryStorage::~InMemoryStorage() {
gc_runner_.Stop();
}
{
// Clear replication data
replication_state_.Reset();
// Stop replication (Stop all clients or stop the REPLICA server)
repl_storage_state_.Reset();
}
if (wal_file_) {
wal_file_->FinalizeWal();
@ -169,7 +167,8 @@ InMemoryStorage::~InMemoryStorage() {
snapshot_runner_.Stop();
}
if (config_.durability.snapshot_on_exit) {
if (auto maybe_error = this->CreateSnapshot({false}); maybe_error.HasError()) {
auto const &repl_state = repl_state_;
if (auto maybe_error = this->CreateSnapshot(repl_state, {false}); maybe_error.HasError()) {
switch (maybe_error.GetError()) {
case CreateSnapshotError::DisabledForReplica:
spdlog::warn(utils::MessageWithLink("Snapshots are disabled for replicas.", "https://memgr.ph/replication"));
@ -653,6 +652,7 @@ utils::BasicResult<StorageManipulationError, void> InMemoryStorage::InMemoryAcce
auto *mem_storage = static_cast<InMemoryStorage *>(storage_);
auto const &replState = mem_storage->repl_state_;
if (!transaction_.md_deltas.empty()) {
// This is usually done by the MVCC, but it does not handle the metadata deltas
transaction_.EnsureCommitTimestampExists();
@ -672,8 +672,7 @@ utils::BasicResult<StorageManipulationError, void> InMemoryStorage::InMemoryAcce
// modifications before they are written to disk.
// Replica can log only the write transaction received from Main
// so the Wal files are consistent
if (mem_storage->replication_state_.GetRole() == replication::ReplicationRole::MAIN ||
desired_commit_timestamp.has_value()) {
if (replState.IsMain() || desired_commit_timestamp.has_value()) {
could_replicate_all_sync_replicas = mem_storage->AppendToWalDataDefinition(transaction_, *commit_timestamp_);
// Take committed_transactions lock while holding the engine lock to
// make sure that committed transactions are sorted by the commit
@ -685,10 +684,9 @@ utils::BasicResult<StorageManipulationError, void> InMemoryStorage::InMemoryAcce
transaction_.commit_timestamp->store(*commit_timestamp_, std::memory_order_release);
// Replica can only update the last commit timestamp with
// the commits received from main.
if (mem_storage->replication_state_.GetRole() == replication::ReplicationRole::MAIN ||
desired_commit_timestamp.has_value()) {
if (replState.IsMain() || desired_commit_timestamp.has_value()) {
// Update the last commit timestamp
mem_storage->replication_state_.last_commit_timestamp_.store(*commit_timestamp_);
mem_storage->repl_storage_state_.last_commit_timestamp_.store(*commit_timestamp_);
}
// Release engine lock because we don't have to hold it anymore
// and emplace back could take a long time.
@ -772,8 +770,7 @@ utils::BasicResult<StorageManipulationError, void> InMemoryStorage::InMemoryAcce
// modifications before they are written to disk.
// Replica can log only the write transaction received from Main
// so the Wal files are consistent
if (mem_storage->replication_state_.GetRole() == replication::ReplicationRole::MAIN ||
desired_commit_timestamp.has_value()) {
if (replState.IsMain() || desired_commit_timestamp.has_value()) {
could_replicate_all_sync_replicas =
mem_storage->AppendToWalDataManipulation(transaction_, *commit_timestamp_);
}
@ -788,10 +785,9 @@ utils::BasicResult<StorageManipulationError, void> InMemoryStorage::InMemoryAcce
transaction_.commit_timestamp->store(*commit_timestamp_, std::memory_order_release);
// Replica can only update the last commit timestamp with
// the commits received from main.
if (mem_storage->replication_state_.GetRole() == replication::ReplicationRole::MAIN ||
desired_commit_timestamp.has_value()) {
if (replState.IsMain() || desired_commit_timestamp.has_value()) {
// Update the last commit timestamp
mem_storage->replication_state_.last_commit_timestamp_.store(*commit_timestamp_);
mem_storage->repl_storage_state_.last_commit_timestamp_.store(*commit_timestamp_);
}
// Release engine lock because we don't have to hold it anymore
// and emplace back could take a long time.
@ -1159,7 +1155,8 @@ Transaction InMemoryStorage::CreateTransaction(IsolationLevel isolation_level, S
// of any query on replica to the last commited transaction
// which is timestamp_ as only commit of transaction with writes
// can change the value of it.
if (replication_state_.GetRole() == replication::ReplicationRole::REPLICA) {
auto const &replState = repl_state_;
if (replState.IsReplica()) {
start_timestamp = timestamp_;
} else {
start_timestamp = timestamp_++;
@ -1494,12 +1491,12 @@ StorageInfo InMemoryStorage::GetInfo() const {
utils::GetDirDiskUsage(config_.durability.storage_directory)};
}
bool InMemoryStorage::InitializeWalFile() {
bool InMemoryStorage::InitializeWalFile(memgraph::replication::ReplicationEpoch &epoch) {
if (config_.durability.snapshot_wal_mode != Config::Durability::SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL)
return false;
if (!wal_file_) {
wal_file_.emplace(wal_directory_, uuid_, replication_state_.GetEpoch().id, config_.items, name_id_mapper_.get(),
wal_seq_num_++, &file_retainer_);
wal_file_.emplace(wal_directory_, uuid_, epoch.id(), config_.items, name_id_mapper_.get(), wal_seq_num_++,
&file_retainer_);
}
return true;
}
@ -1524,14 +1521,15 @@ void InMemoryStorage::FinalizeWalFile() {
}
bool InMemoryStorage::AppendToWalDataManipulation(const Transaction &transaction, uint64_t final_commit_timestamp) {
if (!InitializeWalFile()) {
auto &replState = repl_state_;
if (!InitializeWalFile(replState.GetEpoch())) {
return true;
}
// Traverse deltas and append them to the WAL file.
// A single transaction will always be contained in a single WAL file.
auto current_commit_timestamp = transaction.commit_timestamp->load(std::memory_order_acquire);
replication_state_.InitializeTransaction(wal_file_->SequenceNumber());
repl_storage_state_.InitializeTransaction(wal_file_->SequenceNumber());
auto append_deltas = [&](auto callback) {
// Helper lambda that traverses the delta chain on order to find the first
@ -1682,7 +1680,7 @@ bool InMemoryStorage::AppendToWalDataManipulation(const Transaction &transaction
append_deltas([&](const Delta &delta, const auto &parent, uint64_t timestamp) {
wal_file_->AppendDelta(delta, parent, timestamp);
replication_state_.AppendDelta(delta, parent, timestamp);
repl_storage_state_.AppendDelta(delta, parent, timestamp);
});
// Add a delta that indicates that the transaction is fully written to the WAL
@ -1690,15 +1688,16 @@ bool InMemoryStorage::AppendToWalDataManipulation(const Transaction &transaction
wal_file_->AppendTransactionEnd(final_commit_timestamp);
FinalizeWalFile();
return replication_state_.FinalizeTransaction(final_commit_timestamp);
return repl_storage_state_.FinalizeTransaction(final_commit_timestamp);
}
bool InMemoryStorage::AppendToWalDataDefinition(const Transaction &transaction, uint64_t final_commit_timestamp) {
if (!InitializeWalFile()) {
auto &replState = repl_state_;
if (!InitializeWalFile(replState.GetEpoch())) {
return true;
}
replication_state_.InitializeTransaction(wal_file_->SequenceNumber());
repl_storage_state_.InitializeTransaction(wal_file_->SequenceNumber());
for (const auto &md_delta : transaction.md_deltas) {
switch (md_delta.action) {
@ -1769,7 +1768,7 @@ bool InMemoryStorage::AppendToWalDataDefinition(const Transaction &transaction,
wal_file_->AppendTransactionEnd(final_commit_timestamp);
FinalizeWalFile();
return replication_state_.FinalizeTransaction(final_commit_timestamp);
return repl_storage_state_.FinalizeTransaction(final_commit_timestamp);
}
void InMemoryStorage::AppendToWalDataDefinition(durability::StorageMetadataOperation operation, LabelId label,
@ -1777,7 +1776,7 @@ void InMemoryStorage::AppendToWalDataDefinition(durability::StorageMetadataOpera
LabelPropertyIndexStats property_stats,
uint64_t final_commit_timestamp) {
wal_file_->AppendOperation(operation, label, properties, stats, property_stats, final_commit_timestamp);
replication_state_.AppendOperation(operation, label, properties, stats, property_stats, final_commit_timestamp);
repl_storage_state_.AppendOperation(operation, label, properties, stats, property_stats, final_commit_timestamp);
}
void InMemoryStorage::AppendToWalDataDefinition(durability::StorageMetadataOperation operation, LabelId label,
@ -1804,19 +1803,19 @@ void InMemoryStorage::AppendToWalDataDefinition(durability::StorageMetadataOpera
}
utils::BasicResult<InMemoryStorage::CreateSnapshotError> InMemoryStorage::CreateSnapshot(
std::optional<bool> is_periodic) {
if (replication_state_.GetRole() != replication::ReplicationRole::MAIN) {
memgraph::replication::ReplicationState const &replicationState, std::optional<bool> is_periodic) {
if (replicationState.IsReplica()) {
return CreateSnapshotError::DisabledForReplica;
}
auto snapshot_creator = [this]() {
auto const &epoch = replicationState.GetEpoch();
auto snapshot_creator = [this, &epoch]() {
utils::Timer timer;
const auto &epoch = replication_state_.GetEpoch();
auto transaction = CreateTransaction(IsolationLevel::SNAPSHOT_ISOLATION, storage_mode_);
// Create snapshot.
durability::CreateSnapshot(&transaction, snapshot_directory_, wal_directory_,
config_.durability.snapshot_retention_count, &vertices_, &edges_, name_id_mapper_.get(),
&indices_, &constraints_, config_, uuid_, epoch.id, replication_state_.history,
&indices_, &constraints_, config_, uuid_, epoch, repl_storage_state_.history,
&file_retainer_);
// Finalize snapshot transaction.
commit_log_->MarkFinished(transaction.start_timestamp);
@ -1872,14 +1871,13 @@ uint64_t InMemoryStorage::CommitTimestamp(const std::optional<uint64_t> desired_
return *desired_commit_timestamp;
}
void InMemoryStorage::EstablishNewEpoch() {
void InMemoryStorage::PrepareForNewEpoch(std::string prev_epoch) {
std::unique_lock engine_guard{engine_lock_};
if (wal_file_) {
wal_file_->FinalizeWal();
wal_file_.reset();
}
// TODO: Move out of storage (no need for the lock) <- missing commit_timestamp at a higher level
replication_state_.NewEpoch();
repl_storage_state_.AddEpochToHistory(std::move(prev_epoch));
}
utils::FileRetainer::FileLockerAccessor::ret_type InMemoryStorage::IsPathLocked() {
@ -1907,14 +1905,16 @@ utils::FileRetainer::FileLockerAccessor::ret_type InMemoryStorage::UnlockPath()
return true;
}
auto InMemoryStorage::CreateReplicationClient(replication::ReplicationClientConfig const &config)
auto InMemoryStorage::CreateReplicationClient(const memgraph::replication::ReplicationClientConfig &config)
-> std::unique_ptr<ReplicationClient> {
return std::make_unique<InMemoryReplicationClient>(this, config);
auto &replState = this->repl_state_;
return std::make_unique<InMemoryReplicationClient>(this, config, &replState.GetEpoch());
}
std::unique_ptr<ReplicationServer> InMemoryStorage::CreateReplicationServer(
const replication::ReplicationServerConfig &config) {
return std::make_unique<InMemoryReplicationServer>(this, config);
const memgraph::replication::ReplicationServerConfig &config) {
auto &replState = this->repl_state_;
return std::make_unique<InMemoryReplicationServer>(this, config, &replState.GetEpoch());
}
std::unique_ptr<Storage::Accessor> InMemoryStorage::Access(std::optional<IsolationLevel> override_isolation_level) {

View File

@ -21,10 +21,9 @@
#include "storage/v2/storage.hpp"
/// REPLICATION ///
#include "storage/v2/replication/config.hpp"
#include "replication/config.hpp"
#include "storage/v2/replication/enums.hpp"
#include "storage/v2/replication/replication.hpp"
#include "storage/v2/replication/replication_persistence_helper.hpp"
#include "storage/v2/replication/replication_storage_state.hpp"
#include "storage/v2/replication/rpc.hpp"
#include "storage/v2/replication/serialization.hpp"
#include "storage/v2/transaction.hpp"
@ -326,14 +325,15 @@ class InMemoryStorage final : public Storage {
utils::FileRetainer::FileLockerAccessor::ret_type LockPath();
utils::FileRetainer::FileLockerAccessor::ret_type UnlockPath();
utils::BasicResult<CreateSnapshotError> CreateSnapshot(std::optional<bool> is_periodic);
utils::BasicResult<InMemoryStorage::CreateSnapshotError> CreateSnapshot(
memgraph::replication::ReplicationState const &replicationState, std::optional<bool> is_periodic);
Transaction CreateTransaction(IsolationLevel isolation_level, StorageMode storage_mode) override;
auto CreateReplicationClient(replication::ReplicationClientConfig const &config)
auto CreateReplicationClient(const memgraph::replication::ReplicationClientConfig &config)
-> std::unique_ptr<ReplicationClient> override;
auto CreateReplicationServer(const replication::ReplicationServerConfig &config)
auto CreateReplicationServer(const memgraph::replication::ReplicationServerConfig &config)
-> std::unique_ptr<ReplicationServer> override;
private:
@ -351,7 +351,7 @@ class InMemoryStorage final : public Storage {
template <bool force>
void CollectGarbage(std::unique_lock<utils::ResourceLock> main_guard = {});
bool InitializeWalFile();
bool InitializeWalFile(memgraph::replication::ReplicationEpoch &epoch);
void FinalizeWalFile();
StorageInfo GetInfo() const override;
@ -379,7 +379,7 @@ class InMemoryStorage final : public Storage {
uint64_t CommitTimestamp(std::optional<uint64_t> desired_commit_timestamp = {});
void EstablishNewEpoch() override;
void PrepareForNewEpoch(std::string prev_epoch) override;
// Main object storage
utils::SkipList<storage::Vertex> vertices_;

View File

@ -13,11 +13,7 @@
#include <cstdint>
namespace memgraph::storage::replication {
enum class ReplicationRole : uint8_t { MAIN, REPLICA };
enum class ReplicationMode : std::uint8_t { SYNC, ASYNC };
enum class ReplicaState : std::uint8_t { READY, REPLICATING, RECOVERY, INVALID };
enum class RegistrationMode : std::uint8_t { MUST_BE_INSTANTLY_VALID, CAN_BE_INVALID };
} // namespace memgraph::storage::replication

View File

@ -17,38 +17,13 @@
#include <utility>
#include "io/network/endpoint.hpp"
#include "replication/config.hpp"
#include "storage/v2/replication/enums.hpp"
#include "utils/uuid.hpp"
// TODO: move to replication namespace and unify
namespace memgraph::storage {
// TODO: Should be at MAIN instance level; shouldn't be connected to storage
struct ReplicationEpoch {
ReplicationEpoch() : id(utils::GenerateUUID()) {}
// UUID to distinguish different main instance runs for replication process
// on SAME storage.
// Multiple instances can have same storage UUID and be MAIN at the same time.
// We cannot compare commit timestamps of those instances if one of them
// becomes the replica of the other so we use epoch_id_ as additional
// discriminating property.
// Example of this:
// We have 2 instances of the same storage, S1 and S2.
// S1 and S2 are MAIN and accept their own commits and write them to the WAL.
// At the moment when S1 commited a transaction with timestamp 20, and S2
// a different transaction with timestamp 15, we change S2's role to REPLICA
// and register it on S1.
// Without using the epoch_id, we don't know that S1 and S2 have completely
// different transactions, we think that the S2 is behind only by 5 commits.
std::string id; // TODO: Move to replication level
// Generates a new epoch id, returning the old one
std::string NewEpoch() { return std::exchange(id, utils::GenerateUUID()); }
std::string SetEpoch(std::string new_epoch) { return std::exchange(id, std::move(new_epoch)); }
};
struct TimestampInfo {
uint64_t current_timestamp_of_replica;
uint64_t current_number_of_timestamp_behind_master;
@ -56,7 +31,7 @@ struct TimestampInfo {
struct ReplicaInfo {
std::string name;
replication::ReplicationMode mode;
memgraph::replication::ReplicationMode mode;
io::network::Endpoint endpoint;
replication::ReplicaState state;
TimestampInfo timestamp_info;

View File

@ -1,376 +0,0 @@
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#include "storage/v2/replication/replication.hpp"
#include "storage/v2/constraints/constraints.hpp"
#include "storage/v2/durability/durability.hpp"
#include "storage/v2/durability/snapshot.hpp"
#include "storage/v2/replication/replication_client.hpp"
#include "storage/v2/replication/replication_server.hpp"
#include "storage/v2/storage.hpp"
namespace memgraph::storage {
using OOMExceptionEnabler = utils::MemoryTracker::OutOfMemoryExceptionEnabler;
namespace {
std::string RegisterReplicaErrorToString(RegisterReplicaError error) {
using enum RegisterReplicaError;
switch (error) {
case NAME_EXISTS:
return "NAME_EXISTS";
case END_POINT_EXISTS:
return "END_POINT_EXISTS";
case CONNECTION_FAILED:
return "CONNECTION_FAILED";
case COULD_NOT_BE_PERSISTED:
return "COULD_NOT_BE_PERSISTED";
}
}
} // namespace
storage::ReplicationState::ReplicationState(bool restore, std::filesystem::path durability_dir) {
if (restore) {
utils::EnsureDirOrDie(durability_dir / durability::kReplicationDirectory);
durability_ = std::make_unique<kvstore::KVStore>(durability_dir / durability::kReplicationDirectory);
}
}
void storage::ReplicationState::Reset() {
replication_server_.reset();
replication_clients_.WithLock([&](auto &clients) { clients.clear(); });
}
bool storage::ReplicationState::SetMainReplicationRole(storage::Storage *storage) {
// We don't want to generate new epoch_id and do the
// cleanup if we're already a MAIN
if (GetRole() == replication::ReplicationRole::MAIN) {
return false;
}
// Main instance does not need replication server
// This should be always called first so we finalize everything
replication_server_.reset(nullptr);
storage->EstablishNewEpoch();
if (ShouldStoreAndRestoreReplicationState()) {
// Only thing that matters here is the role saved as MAIN
auto data = replication::ReplicationStatusToJSON(
replication::ReplicationStatus{.name = replication::kReservedReplicationRoleName,
.ip_address = "",
.port = 0,
.sync_mode = replication::ReplicationMode::SYNC,
.replica_check_frequency = std::chrono::seconds(0),
.ssl = std::nullopt,
.role = replication::ReplicationRole::MAIN});
if (!durability_->Put(replication::kReservedReplicationRoleName, data.dump())) {
spdlog::error("Error when saving MAIN replication role in settings.");
return false;
}
}
SetRole(replication::ReplicationRole::MAIN);
return true;
}
void storage::ReplicationState::AppendOperation(durability::StorageMetadataOperation operation, LabelId label,
const std::set<PropertyId> &properties, const LabelIndexStats &stats,
const LabelPropertyIndexStats &property_stats,
uint64_t final_commit_timestamp) {
if (GetRole() == replication::ReplicationRole::MAIN) {
replication_clients_.WithLock([&](auto &clients) {
for (auto &client : clients) {
client->IfStreamingTransaction([&](auto &stream) {
stream.AppendOperation(operation, label, properties, stats, property_stats, final_commit_timestamp);
});
}
});
}
}
void storage::ReplicationState::InitializeTransaction(uint64_t seq_num) {
if (GetRole() == replication::ReplicationRole::MAIN) {
replication_clients_.WithLock([&](auto &clients) {
for (auto &client : clients) {
client->StartTransactionReplication(seq_num);
}
});
}
}
void storage::ReplicationState::AppendDelta(const Delta &delta, const Edge &parent, uint64_t timestamp) {
replication_clients_.WithLock([&](auto &clients) {
for (auto &client : clients) {
client->IfStreamingTransaction([&](auto &stream) { stream.AppendDelta(delta, parent, timestamp); });
}
});
}
void storage::ReplicationState::AppendDelta(const Delta &delta, const Vertex &parent, uint64_t timestamp) {
replication_clients_.WithLock([&](auto &clients) {
for (auto &client : clients) {
client->IfStreamingTransaction([&](auto &stream) { stream.AppendDelta(delta, parent, timestamp); });
}
});
}
bool storage::ReplicationState::FinalizeTransaction(uint64_t timestamp) {
bool finalized_on_all_replicas = true;
replication_clients_.WithLock([&](auto &clients) {
for (auto &client : clients) {
client->IfStreamingTransaction([&](auto &stream) { stream.AppendTransactionEnd(timestamp); });
const auto finalized = client->FinalizeTransactionReplication();
if (client->Mode() == replication::ReplicationMode::SYNC) {
finalized_on_all_replicas = finalized && finalized_on_all_replicas;
}
}
});
return finalized_on_all_replicas;
}
utils::BasicResult<RegisterReplicaError> ReplicationState::RegisterReplica(
const replication::RegistrationMode registration_mode, const replication::ReplicationClientConfig &config,
Storage *storage) {
MG_ASSERT(GetRole() == replication::ReplicationRole::MAIN, "Only main instance can register a replica!");
auto name_check = [&config](auto &clients) {
auto name_matches = [&name = config.name](const auto &client) { return client->Name() == name; };
return std::any_of(clients.begin(), clients.end(), name_matches);
};
auto desired_endpoint = io::network::Endpoint{config.ip_address, config.port};
auto endpoint_check = [&](auto &clients) {
auto endpoint_matches = [&](const auto &client) { return client->Endpoint() == desired_endpoint; };
return std::any_of(clients.begin(), clients.end(), endpoint_matches);
};
auto task = [&](auto &clients) -> utils::BasicResult<RegisterReplicaError> {
if (name_check(clients)) {
return RegisterReplicaError::NAME_EXISTS;
}
if (endpoint_check(clients)) {
return RegisterReplicaError::END_POINT_EXISTS;
}
if (!TryPersistReplicaClient(config)) {
return RegisterReplicaError::COULD_NOT_BE_PERSISTED;
}
auto client = storage->CreateReplicationClient(config);
client->Start();
if (client->State() == replication::ReplicaState::INVALID) {
if (replication::RegistrationMode::CAN_BE_INVALID != registration_mode) {
return RegisterReplicaError::CONNECTION_FAILED;
}
spdlog::warn("Connection failed when registering replica {}. Replica will still be registered.", client->Name());
}
clients.push_back(std::move(client));
return {};
};
return replication_clients_.WithLock(task);
}
bool ReplicationState::TryPersistReplicaClient(const replication::ReplicationClientConfig &config) {
if (!ShouldStoreAndRestoreReplicationState()) return true;
auto data = replication::ReplicationStatusToJSON(
replication::ReplicationStatus{.name = config.name,
.ip_address = config.ip_address,
.port = config.port,
.sync_mode = config.mode,
.replica_check_frequency = config.replica_check_frequency,
.ssl = config.ssl,
.role = replication::ReplicationRole::REPLICA});
if (durability_->Put(config.name, data.dump())) return true;
spdlog::error("Error when saving replica {} in settings.", config.name);
return false;
}
bool ReplicationState::SetReplicaRole(const replication::ReplicationServerConfig &config, Storage *storage) {
// We don't want to restart the server if we're already a REPLICA
if (GetRole() == replication::ReplicationRole::REPLICA) {
return false;
}
replication_server_ = storage->CreateReplicationServer(config);
bool res = replication_server_->Start();
if (!res) {
spdlog::error("Unable to start the replication server.");
return false;
}
if (ShouldStoreAndRestoreReplicationState()) {
// Only thing that matters here is the role saved as REPLICA and the listening port
auto data = replication::ReplicationStatusToJSON(
replication::ReplicationStatus{.name = replication::kReservedReplicationRoleName,
.ip_address = config.ip_address,
.port = config.port,
.sync_mode = replication::ReplicationMode::SYNC,
.replica_check_frequency = std::chrono::seconds(0),
.ssl = std::nullopt,
.role = replication::ReplicationRole::REPLICA});
if (!durability_->Put(replication::kReservedReplicationRoleName, data.dump())) {
spdlog::error("Error when saving REPLICA replication role in settings.");
return false;
}
}
SetRole(replication::ReplicationRole::REPLICA);
return true;
}
bool ReplicationState::UnregisterReplica(std::string_view name) {
MG_ASSERT(GetRole() == replication::ReplicationRole::MAIN, "Only main instance can unregister a replica!");
if (ShouldStoreAndRestoreReplicationState()) {
if (!durability_->Delete(name)) {
spdlog::error("Error when removing replica {} from settings.", name);
return false;
}
}
return replication_clients_.WithLock([&](auto &clients) {
return std::erase_if(clients, [&](const auto &client) { return client->Name() == name; });
});
}
std::optional<replication::ReplicaState> ReplicationState::GetReplicaState(const std::string_view name) {
return replication_clients_.WithLock([&](auto &clients) -> std::optional<replication::ReplicaState> {
const auto client_it =
std::find_if(clients.cbegin(), clients.cend(), [name](auto &client) { return client->Name() == name; });
if (client_it == clients.cend()) {
return std::nullopt;
}
return (*client_it)->State();
});
}
std::vector<ReplicaInfo> ReplicationState::ReplicasInfo() {
return replication_clients_.WithLock([](auto &clients) {
std::vector<ReplicaInfo> replica_info;
replica_info.reserve(clients.size());
std::transform(
clients.begin(), clients.end(), std::back_inserter(replica_info), [](const auto &client) -> ReplicaInfo {
return {client->Name(), client->Mode(), client->Endpoint(), client->State(), client->GetTimestampInfo()};
});
return replica_info;
});
}
void ReplicationState::RestoreReplicationRole(Storage *storage) {
if (!ShouldStoreAndRestoreReplicationState()) {
return;
}
spdlog::info("Restoring replication role.");
uint16_t port = replication::kDefaultReplicationPort;
const auto replication_data = durability_->Get(replication::kReservedReplicationRoleName);
if (!replication_data.has_value()) {
spdlog::debug("Cannot find data needed for restore replication role in persisted metadata.");
return;
}
const auto maybe_replication_status = replication::JSONToReplicationStatus(nlohmann::json::parse(*replication_data));
if (!maybe_replication_status.has_value()) {
LOG_FATAL("Cannot parse previously saved configuration of replication role {}.",
replication::kReservedReplicationRoleName);
}
const auto replication_status = *maybe_replication_status;
if (!replication_status.role.has_value()) {
SetRole(replication::ReplicationRole::MAIN);
} else {
SetRole(*replication_status.role);
port = replication_status.port;
}
if (GetRole() == replication::ReplicationRole::REPLICA) {
replication_server_ = storage->CreateReplicationServer(replication::ReplicationServerConfig{
.ip_address = replication::kDefaultReplicationServerIp,
.port = port,
});
bool res = replication_server_->Start();
if (!res) {
LOG_FATAL("Unable to start the replication server.");
}
}
spdlog::info("Replication role restored to {}.",
GetRole() == replication::ReplicationRole::MAIN ? "MAIN" : "REPLICA");
}
void ReplicationState::RestoreReplicas(Storage *storage) {
if (!ShouldStoreAndRestoreReplicationState()) {
return;
}
spdlog::info("Restoring replicas.");
for (const auto &[replica_name, replica_data] : *durability_) {
spdlog::info("Restoring replica {}.", replica_name);
const auto maybe_replica_status = replication::JSONToReplicationStatus(nlohmann::json::parse(replica_data));
if (!maybe_replica_status.has_value()) {
LOG_FATAL("Cannot parse previously saved configuration of replica {}.", replica_name);
}
auto replica_status = *maybe_replica_status;
MG_ASSERT(replica_status.name == replica_name, "Expected replica name is '{}', but got '{}'", replica_status.name,
replica_name);
if (replica_name == replication::kReservedReplicationRoleName) {
continue;
}
auto ret = RegisterReplica(replication::RegistrationMode::CAN_BE_INVALID,
replication::ReplicationClientConfig{
.name = replica_status.name,
.mode = replica_status.sync_mode,
.ip_address = replica_status.ip_address,
.port = replica_status.port,
.replica_check_frequency = replica_status.replica_check_frequency,
.ssl = replica_status.ssl,
},
storage);
if (ret.HasError()) {
MG_ASSERT(RegisterReplicaError::CONNECTION_FAILED != ret.GetError());
LOG_FATAL("Failure when restoring replica {}: {}.", replica_name, RegisterReplicaErrorToString(ret.GetError()));
}
spdlog::info("Replica {} restored.", replica_name);
}
}
constexpr uint16_t kEpochHistoryRetention = 1000;
void ReplicationState::NewEpoch() {
// Generate new epoch id and save the last one to the history.
if (history.size() == kEpochHistoryRetention) {
history.pop_front();
}
auto prevEpoch = epoch_.NewEpoch();
history.emplace_back(std::move(prevEpoch), last_commit_timestamp_);
}
void ReplicationState::AppendEpoch(std::string new_epoch) {
auto prevEpoch = epoch_.SetEpoch(std::move(new_epoch));
history.emplace_back(std::move(prevEpoch), last_commit_timestamp_);
}
} // namespace memgraph::storage

View File

@ -15,29 +15,25 @@
#include <type_traits>
#include "storage/v2/durability/durability.hpp"
#include "storage/v2/indices/label_property_index_stats.hpp"
#include "storage/v2/replication/config.hpp"
#include "storage/v2/replication/enums.hpp"
#include "storage/v2/storage.hpp"
#include "storage/v2/transaction.hpp"
#include "utils/file_locker.hpp"
#include "utils/logging.hpp"
#include "utils/message.hpp"
namespace memgraph::storage {
static auto CreateClientContext(const replication::ReplicationClientConfig &config) -> communication::ClientContext {
static auto CreateClientContext(const memgraph::replication::ReplicationClientConfig &config)
-> communication::ClientContext {
return (config.ssl) ? communication::ClientContext{config.ssl->key_file, config.ssl->cert_file}
: communication::ClientContext{};
}
ReplicationClient::ReplicationClient(Storage *storage, replication::ReplicationClientConfig const &config)
ReplicationClient::ReplicationClient(Storage *storage, const memgraph::replication::ReplicationClientConfig &config,
const memgraph::replication::ReplicationEpoch *epoch)
: name_{config.name},
rpc_context_{CreateClientContext(config)},
rpc_client_{io::network::Endpoint(config.ip_address, config.port), &rpc_context_},
replica_check_frequency_{config.replica_check_frequency},
mode_{config.mode},
storage_{storage} {}
storage_{storage},
repl_epoch_{epoch} {}
ReplicationClient::~ReplicationClient() {
auto endpoint = rpc_client_.Endpoint();
@ -46,21 +42,19 @@ ReplicationClient::~ReplicationClient() {
}
uint64_t ReplicationClient::LastCommitTimestamp() const {
return storage_->replication_state_.last_commit_timestamp_.load();
return storage_->repl_storage_state_.last_commit_timestamp_.load();
}
void ReplicationClient::InitializeClient() {
uint64_t current_commit_timestamp{kTimestampInitialId};
const auto &main_epoch = storage_->replication_state_.GetEpoch();
auto stream{rpc_client_.Stream<replication::HeartbeatRpc>(storage_->replication_state_.last_commit_timestamp_,
main_epoch.id)};
auto stream{rpc_client_.Stream<replication::HeartbeatRpc>(storage_->repl_storage_state_.last_commit_timestamp_,
std::string{repl_epoch_->id()})};
const auto replica = stream.AwaitResponse();
std::optional<uint64_t> branching_point;
if (replica.epoch_id != main_epoch.id && replica.current_commit_timestamp != kTimestampInitialId) {
auto const &history = storage_->replication_state_.history;
if (replica.epoch_id != repl_epoch_->id() && replica.current_commit_timestamp != kTimestampInitialId) {
auto const &history = storage_->repl_storage_state_.history;
const auto epoch_info_iter = std::find_if(history.crbegin(), history.crend(), [&](const auto &main_epoch_info) {
return main_epoch_info.first == replica.epoch_id;
});
@ -82,8 +76,8 @@ void ReplicationClient::InitializeClient() {
current_commit_timestamp = replica.current_commit_timestamp;
spdlog::trace("Current timestamp on replica {}: {}", name_, current_commit_timestamp);
spdlog::trace("Current timestamp on main: {}", storage_->replication_state_.last_commit_timestamp_.load());
if (current_commit_timestamp == storage_->replication_state_.last_commit_timestamp_.load()) {
spdlog::trace("Current timestamp on main: {}", storage_->repl_storage_state_.last_commit_timestamp_.load());
if (current_commit_timestamp == storage_->repl_storage_state_.last_commit_timestamp_.load()) {
spdlog::debug("Replica '{}' up to date", name_);
std::unique_lock client_guard{client_lock_};
replica_state_.store(replication::ReplicaState::READY);
@ -110,7 +104,7 @@ TimestampInfo ReplicationClient::GetTimestampInfo() {
replica_state_.store(replication::ReplicaState::INVALID);
HandleRpcFailure();
}
auto main_time_stamp = storage_->replication_state_.last_commit_timestamp_.load();
auto main_time_stamp = storage_->repl_storage_state_.last_commit_timestamp_.load();
info.current_timestamp_of_replica = response.current_commit_timestamp;
info.current_number_of_timestamp_behind_master = response.current_commit_timestamp - main_time_stamp;
} catch (const rpc::RpcFailedException &) {
@ -173,7 +167,7 @@ void ReplicationClient::StartTransactionReplication(const uint64_t current_wal_s
MG_ASSERT(!replica_stream_);
try {
replica_stream_.emplace(
ReplicaStream{this, storage_->replication_state_.last_commit_timestamp_.load(), current_wal_seq_num});
ReplicaStream{this, storage_->repl_storage_state_.last_commit_timestamp_.load(), current_wal_seq_num});
replica_state_.store(replication::ReplicaState::REPLICATING);
} catch (const rpc::RpcFailedException &) {
replica_state_.store(replication::ReplicaState::INVALID);
@ -183,8 +177,6 @@ void ReplicationClient::StartTransactionReplication(const uint64_t current_wal_s
}
}
auto ReplicationClient::GetEpochId() const -> std::string const & { return storage_->replication_state_.GetEpoch().id; }
bool ReplicationClient::FinalizeTransactionReplication() {
// We can only check the state because it guarantees to be only
// valid during a single transaction replication (if the assumption
@ -218,7 +210,7 @@ bool ReplicationClient::FinalizeTransactionReplication() {
return false;
};
if (mode_ == replication::ReplicationMode::ASYNC) {
if (mode_ == memgraph::replication::ReplicationMode::ASYNC) {
thread_pool_.AddTask([=] { (void)task(); });
return true;
}
@ -290,7 +282,7 @@ ReplicaStream::ReplicaStream(ReplicationClient *self, const uint64_t previous_co
stream_(self_->rpc_client_.Stream<replication::AppendDeltasRpc>(previous_commit_timestamp, current_seq_num)) {
replication::Encoder encoder{stream_.GetBuilder()};
encoder.WriteString(self_->GetEpochId());
encoder.WriteString(self->repl_epoch_->id());
}
void ReplicaStream::AppendDelta(const Delta &delta, const Vertex &vertex, uint64_t final_commit_timestamp) {

View File

@ -11,12 +11,13 @@
#pragma once
#include "replication/config.hpp"
#include "replication/epoch.hpp"
#include "rpc/client.hpp"
#include "storage/v2/durability/storage_global_operation.hpp"
#include "storage/v2/id_types.hpp"
#include "storage/v2/indices/label_index_stats.hpp"
#include "storage/v2/indices/label_property_index_stats.hpp"
#include "storage/v2/replication/config.hpp"
#include "storage/v2/replication/enums.hpp"
#include "storage/v2/replication/global.hpp"
#include "storage/v2/replication/rpc.hpp"
@ -69,7 +70,8 @@ class ReplicationClient {
friend class ReplicaStream;
public:
ReplicationClient(Storage *storage, replication::ReplicationClientConfig const &config);
ReplicationClient(Storage *storage, const memgraph::replication::ReplicationClientConfig &config,
const memgraph::replication::ReplicationEpoch *epoch);
ReplicationClient(ReplicationClient const &) = delete;
ReplicationClient &operator=(ReplicationClient const &) = delete;
@ -78,7 +80,7 @@ class ReplicationClient {
virtual ~ReplicationClient();
auto Mode() const -> replication::ReplicationMode { return mode_; }
auto Mode() const -> memgraph::replication::ReplicationMode { return mode_; }
auto Name() const -> std::string const & { return name_; }
auto Endpoint() const -> io::network::Endpoint const & { return rpc_client_.Endpoint(); }
auto State() const -> replication::ReplicaState { return replica_state_.load(); }
@ -99,7 +101,6 @@ class ReplicationClient {
virtual void RecoverReplica(uint64_t replica_commit) = 0;
auto GetStorage() -> Storage * { return storage_; }
auto GetEpochId() const -> std::string const &;
auto LastCommitTimestamp() const -> uint64_t;
void InitializeClient();
void HandleRpcFailure();
@ -113,7 +114,7 @@ class ReplicationClient {
std::chrono::seconds replica_check_frequency_;
std::optional<ReplicaStream> replica_stream_;
replication::ReplicationMode mode_{replication::ReplicationMode::SYNC};
memgraph::replication::ReplicationMode mode_{memgraph::replication::ReplicationMode::SYNC};
utils::SpinLock client_lock_;
// This thread pool is used for background tasks so we don't
@ -134,6 +135,8 @@ class ReplicationClient {
utils::Scheduler replica_checker_;
Storage *storage_;
memgraph::replication::ReplicationEpoch const *repl_epoch_;
};
} // namespace memgraph::storage

View File

@ -0,0 +1,204 @@
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#include "storage/v2/replication/replication_handler.hpp"
#include "replication/state.hpp"
#include "storage/v2/storage.hpp"
namespace memgraph::storage {
namespace {
std::string RegisterReplicaErrorToString(RegisterReplicaError error) {
switch (error) {
using enum RegisterReplicaError;
case NAME_EXISTS:
return "NAME_EXISTS";
case END_POINT_EXISTS:
return "END_POINT_EXISTS";
case CONNECTION_FAILED:
return "CONNECTION_FAILED";
case COULD_NOT_BE_PERSISTED:
return "COULD_NOT_BE_PERSISTED";
}
}
} // namespace
bool ReplicationHandler::SetReplicationRoleMain() {
// We don't want to generate new epoch_id and do the
// cleanup if we're already a MAIN
// TODO: under lock
if (repl_state_.IsMain()) {
return false;
}
// STEP 1) bring down all REPLICA servers
auto current_epoch = std::string(repl_state_.GetEpoch().id());
{ // TODO: foreach storage
// ensure replica server brought down
storage_.repl_storage_state_.replication_server_.reset(nullptr);
// Remember old epoch + storage timestamp association
storage_.repl_storage_state_.AddEpochToHistory(current_epoch);
}
// STEP 2) Change to MAIN
repl_state_.GetEpoch().NewEpoch();
if (!repl_state_.TryPersistRoleMain()) {
// TODO: On failure restore old epoch? restore replication servers?
return false;
}
repl_state_.SetRole(memgraph::replication::ReplicationRole::MAIN);
return true;
}
memgraph::utils::BasicResult<RegisterReplicaError> ReplicationHandler::RegisterReplica(
const RegistrationMode registration_mode, const memgraph::replication::ReplicationClientConfig &config) {
MG_ASSERT(repl_state_.IsMain(), "Only main instance can register a replica!");
auto name_check = [&config](auto &clients) {
auto name_matches = [&name = config.name](const auto &client) { return client->Name() == name; };
return std::any_of(clients.begin(), clients.end(), name_matches);
};
auto desired_endpoint = io::network::Endpoint{config.ip_address, config.port};
auto endpoint_check = [&](auto &clients) {
auto endpoint_matches = [&](const auto &client) { return client->Endpoint() == desired_endpoint; };
return std::any_of(clients.begin(), clients.end(), endpoint_matches);
};
auto task = [&](auto &clients) -> utils::BasicResult<RegisterReplicaError> {
if (name_check(clients)) {
return RegisterReplicaError::NAME_EXISTS;
}
if (endpoint_check(clients)) {
return RegisterReplicaError::END_POINT_EXISTS;
}
using enum RegistrationMode;
if (registration_mode != RESTORE && !repl_state_.TryPersistRegisteredReplica(config)) {
return RegisterReplicaError::COULD_NOT_BE_PERSISTED;
}
auto client = storage_.CreateReplicationClient(config);
client->Start();
if (client->State() == replication::ReplicaState::INVALID) {
if (registration_mode != RESTORE) {
return RegisterReplicaError::CONNECTION_FAILED;
}
spdlog::warn("Connection failed when registering replica {}. Replica will still be registered.", client->Name());
}
clients.push_back(std::move(client));
return {};
};
return storage_.repl_storage_state_.replication_clients_.WithLock(task);
}
bool ReplicationHandler::SetReplicationRoleReplica(const memgraph::replication::ReplicationServerConfig &config) {
// We don't want to restart the server if we're already a REPLICA
if (repl_state_.IsReplica()) {
return false;
}
std::unique_ptr<ReplicationServer> replication_server = storage_.CreateReplicationServer(config);
bool res = replication_server->Start();
if (!res) {
spdlog::error("Unable to start the replication server.");
return false;
}
storage_.repl_storage_state_.replication_server_ = std::move(replication_server);
if (!repl_state_.TryPersistRoleReplica(config)) {
return false;
}
repl_state_.SetRole(memgraph::replication::ReplicationRole::REPLICA);
return true;
}
auto ReplicationHandler::UnregisterReplica(std::string_view name) -> UnregisterReplicaResult {
if (repl_state_.IsReplica()) {
return UnregisterReplicaResult::NOT_MAIN;
}
if (!repl_state_.TryPersistUnregisterReplica(name)) {
return UnregisterReplicaResult::COULD_NOT_BE_PERSISTED;
}
auto const n_unregistered = storage_.repl_storage_state_.replication_clients_.WithLock([&](auto &clients) {
return std::erase_if(clients, [&](const auto &client) { return client->Name() == name; });
});
return (n_unregistered != 0) ? UnregisterReplicaResult::SUCCESS : UnregisterReplicaResult::CAN_NOT_UNREGISTER;
}
void ReplicationHandler::RestoreReplication() {
if (!repl_state_.ShouldPersist()) {
return;
}
spdlog::info("Restoring replication role.");
using memgraph::replication::ReplicationState;
auto replicationData = repl_state_.FetchReplicationData();
if (replicationData.HasError()) {
switch (replicationData.GetError()) {
using enum ReplicationState::FetchReplicationError;
case NOTHING_FETCHED: {
spdlog::debug("Cannot find data needed for restore replication role in persisted metadata.");
return;
}
case PARSE_ERROR: {
LOG_FATAL("Cannot parse previously saved configuration of replication role.");
return;
}
}
}
/// MAIN
auto const recover_main = [this](ReplicationState::ReplicationDataMain const &configs) {
storage_.repl_storage_state_.replication_server_.reset();
repl_state_.SetRole(memgraph::replication::ReplicationRole::MAIN);
for (const auto &config : configs) {
spdlog::info("Replica {} restored for {}.", config.name, storage_.id());
auto ret = RegisterReplica(RegistrationMode::RESTORE, config);
if (ret.HasError()) {
MG_ASSERT(RegisterReplicaError::CONNECTION_FAILED != ret.GetError());
LOG_FATAL("Failure when restoring replica {}: {}.", config.name, RegisterReplicaErrorToString(ret.GetError()));
}
spdlog::info("Replica {} restored for {}.", config.name, storage_.id());
}
spdlog::info("Replication role restored to MAIN.");
};
/// REPLICA
auto const recover_replica = [this](ReplicationState::ReplicationDataReplica const &config) {
auto replication_server = storage_.CreateReplicationServer(config);
if (!replication_server->Start()) {
LOG_FATAL("Unable to start the replication server.");
}
storage_.repl_storage_state_.replication_server_ = std::move(replication_server);
repl_state_.SetRole(memgraph::replication::ReplicationRole::REPLICA);
spdlog::info("Replication role restored to REPLICA.");
};
std::visit(
utils::Overloaded{
recover_main,
recover_replica,
},
*replicationData);
}
auto ReplicationHandler::GetRole() const -> memgraph::replication::ReplicationRole { return repl_state_.GetRole(); }
bool ReplicationHandler::IsMain() const { return repl_state_.IsMain(); }
bool ReplicationHandler::IsReplica() const { return repl_state_.IsReplica(); }
} // namespace memgraph::storage

View File

@ -0,0 +1,71 @@
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#pragma once
#include "replication/role.hpp"
#include "utils/result.hpp"
// BEGIN fwd declares
namespace memgraph::replication {
struct ReplicationState;
struct ReplicationServerConfig;
struct ReplicationClientConfig;
} // namespace memgraph::replication
namespace memgraph::storage {
class Storage;
}
// END fwd declares
namespace memgraph::storage {
enum class RegistrationMode : std::uint8_t { MUST_BE_INSTANTLY_VALID, RESTORE };
enum class RegisterReplicaError : uint8_t { NAME_EXISTS, END_POINT_EXISTS, CONNECTION_FAILED, COULD_NOT_BE_PERSISTED };
enum class UnregisterReplicaResult : uint8_t {
NOT_MAIN,
COULD_NOT_BE_PERSISTED,
CAN_NOT_UNREGISTER,
SUCCESS,
};
/// A handler type that keep in sync current ReplicationState and the MAIN/REPLICA-ness of Storage
/// TODO: extend to do multiple storages
struct ReplicationHandler {
ReplicationHandler(memgraph::replication::ReplicationState &replState, Storage &storage)
: repl_state_(replState), storage_(storage) {}
// as REPLICA, become MAIN
bool SetReplicationRoleMain();
// as MAIN, become REPLICA
bool SetReplicationRoleReplica(const memgraph::replication::ReplicationServerConfig &config);
// as MAIN, define and connect to REPLICAs
auto RegisterReplica(RegistrationMode registration_mode, const memgraph::replication::ReplicationClientConfig &config)
-> utils::BasicResult<RegisterReplicaError>;
// as MAIN, remove a REPLICA connection
auto UnregisterReplica(std::string_view name) -> UnregisterReplicaResult;
// Generic restoration
// TODO: decouple storage restoration from epoch restoration
void RestoreReplication();
// Helper pass-through (TODO: remove)
auto GetRole() const -> memgraph::replication::ReplicationRole;
bool IsMain() const;
bool IsReplica() const;
private:
memgraph::replication::ReplicationState &repl_state_;
Storage &storage_;
};
} // namespace memgraph::storage

View File

@ -11,13 +11,13 @@
#include "replication_server.hpp"
#include "io/network/endpoint.hpp"
#include "replication/config.hpp"
#include "rpc.hpp"
#include "storage/v2/replication/config.hpp"
namespace memgraph::storage {
namespace {
auto CreateServerContext(const replication::ReplicationServerConfig &config) -> communication::ServerContext {
auto CreateServerContext(const memgraph::replication::ReplicationServerConfig &config) -> communication::ServerContext {
return (config.ssl) ? communication::ServerContext{config.ssl->key_file, config.ssl->cert_file, config.ssl->ca_file,
config.ssl->verify_peer}
: communication::ServerContext{};
@ -30,7 +30,7 @@ auto CreateServerContext(const replication::ReplicationServerConfig &config) ->
constexpr auto kReplictionServerThreads = 1;
} // namespace
ReplicationServer::ReplicationServer(const replication::ReplicationServerConfig &config)
ReplicationServer::ReplicationServer(const memgraph::replication::ReplicationServerConfig &config)
: rpc_server_context_{CreateServerContext(config)},
rpc_server_{io::network::Endpoint{config.ip_address, config.port}, &rpc_server_context_,
kReplictionServerThreads} {

View File

@ -11,16 +11,16 @@
#pragma once
#include "replication/config.hpp"
#include "rpc/server.hpp"
#include "slk/streams.hpp"
#include "storage/v2/replication/config.hpp"
#include "storage/v2/replication/global.hpp"
namespace memgraph::storage {
class ReplicationServer {
public:
explicit ReplicationServer(const replication::ReplicationServerConfig &config);
explicit ReplicationServer(const memgraph::replication::ReplicationServerConfig &config);
ReplicationServer(const ReplicationServer &) = delete;
ReplicationServer(ReplicationServer &&) = delete;
ReplicationServer &operator=(const ReplicationServer &) = delete;

View File

@ -0,0 +1,111 @@
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#include "storage/v2/replication/replication_storage_state.hpp"
#include "storage/v2/replication/replication_client.hpp"
#include "storage/v2/replication/replication_server.hpp"
namespace memgraph::storage {
void ReplicationStorageState::InitializeTransaction(uint64_t seq_num) {
replication_clients_.WithLock([&](auto &clients) {
for (auto &client : clients) {
client->StartTransactionReplication(seq_num);
}
});
}
void ReplicationStorageState::AppendDelta(const Delta &delta, const Vertex &vertex, uint64_t timestamp) {
replication_clients_.WithLock([&](auto &clients) {
for (auto &client : clients) {
client->IfStreamingTransaction([&](auto &stream) { stream.AppendDelta(delta, vertex, timestamp); });
}
});
}
void ReplicationStorageState::AppendDelta(const Delta &delta, const Edge &edge, uint64_t timestamp) {
replication_clients_.WithLock([&](auto &clients) {
for (auto &client : clients) {
client->IfStreamingTransaction([&](auto &stream) { stream.AppendDelta(delta, edge, timestamp); });
}
});
}
void ReplicationStorageState::AppendOperation(durability::StorageMetadataOperation operation, LabelId label,
const std::set<PropertyId> &properties, const LabelIndexStats &stats,
const LabelPropertyIndexStats &property_stats,
uint64_t final_commit_timestamp) {
replication_clients_.WithLock([&](auto &clients) {
for (auto &client : clients) {
client->IfStreamingTransaction([&](auto &stream) {
stream.AppendOperation(operation, label, properties, stats, property_stats, final_commit_timestamp);
});
}
});
}
bool ReplicationStorageState::FinalizeTransaction(uint64_t timestamp) {
return replication_clients_.WithLock([=](auto &clients) {
bool finalized_on_all_replicas = true;
for (ReplicationClientPtr &client : clients) {
client->IfStreamingTransaction([&](auto &stream) { stream.AppendTransactionEnd(timestamp); });
const auto finalized = client->FinalizeTransactionReplication();
if (client->Mode() == memgraph::replication::ReplicationMode::SYNC) {
finalized_on_all_replicas = finalized && finalized_on_all_replicas;
}
}
return finalized_on_all_replicas;
});
}
std::optional<replication::ReplicaState> ReplicationStorageState::GetReplicaState(std::string_view name) const {
return replication_clients_.WithReadLock([&](auto const &clients) -> std::optional<replication::ReplicaState> {
auto const name_matches = [=](ReplicationClientPtr const &client) { return client->Name() == name; };
auto const client_it = std::find_if(clients.cbegin(), clients.cend(), name_matches);
if (client_it == clients.cend()) {
return std::nullopt;
}
return (*client_it)->State();
});
}
std::vector<ReplicaInfo> ReplicationStorageState::ReplicasInfo() const {
return replication_clients_.WithReadLock([](auto const &clients) {
std::vector<ReplicaInfo> replica_infos;
replica_infos.reserve(clients.size());
auto const asReplicaInfo = [](ReplicationClientPtr const &client) -> ReplicaInfo {
return {client->Name(), client->Mode(), client->Endpoint(), client->State(), client->GetTimestampInfo()};
};
std::transform(clients.begin(), clients.end(), std::back_inserter(replica_infos), asReplicaInfo);
return replica_infos;
});
}
void ReplicationStorageState::Reset() {
replication_server_.reset();
replication_clients_.WithLock([](auto &clients) { clients.clear(); });
}
void ReplicationStorageState::AddEpochToHistory(std::string prev_epoch) {
constexpr uint16_t kEpochHistoryRetention = 1000;
// Generate new epoch id and save the last one to the history.
if (history.size() == kEpochHistoryRetention) {
history.pop_front();
}
history.emplace_back(std::move(prev_epoch), last_commit_timestamp_);
}
void ReplicationStorageState::AddEpochToHistoryForce(std::string prev_epoch) {
history.emplace_back(std::move(prev_epoch), last_commit_timestamp_);
}
} // namespace memgraph::storage

View File

@ -11,6 +11,8 @@
#pragma once
#include <atomic>
#include "kvstore/kvstore.hpp"
#include "storage/v2/delta.hpp"
#include "storage/v2/durability/storage_global_operation.hpp"
@ -18,63 +20,39 @@
#include "utils/result.hpp"
/// REPLICATION ///
#include "storage/v2/replication/config.hpp"
#include "replication/config.hpp"
#include "replication/epoch.hpp"
#include "replication/state.hpp"
#include "storage/v2/replication/enums.hpp"
#include "storage/v2/replication/global.hpp"
#include "storage/v2/replication/replication_persistence_helper.hpp"
#include "storage/v2/replication/rpc.hpp"
#include "storage/v2/replication/serialization.hpp"
// TODO use replication namespace
namespace memgraph::storage {
class Storage;
class ReplicationServer;
class ReplicationClient;
enum class RegisterReplicaError : uint8_t { NAME_EXISTS, END_POINT_EXISTS, CONNECTION_FAILED, COULD_NOT_BE_PERSISTED };
struct ReplicationState {
// TODO: This mirrors the logic in InMemoryConstructor; make it independent
ReplicationState(bool restore, std::filesystem::path durability_dir);
// Generic API
void Reset();
// TODO: Just check if server exists -> you are REPLICA
replication::ReplicationRole GetRole() const { return replication_role_.load(); }
bool SetMainReplicationRole(Storage *storage); // Set the instance to MAIN
// TODO: ReplicationServer/Client uses Storage* for RPC callbacks
bool SetReplicaRole(const replication::ReplicationServerConfig &config,
Storage *storage); // Sets the instance to REPLICA
// Generic restoration
void RestoreReplicationRole(Storage *storage);
// MAIN actually doing the replication
struct ReplicationStorageState {
// Only MAIN can send
void InitializeTransaction(uint64_t seq_num);
void AppendDelta(const Delta &delta, const Vertex &vertex, uint64_t timestamp);
void AppendDelta(const Delta &delta, const Edge &edge, uint64_t timestamp);
void AppendOperation(durability::StorageMetadataOperation operation, LabelId label,
const std::set<PropertyId> &properties, const LabelIndexStats &stats,
const LabelPropertyIndexStats &property_stats, uint64_t final_commit_timestamp);
void InitializeTransaction(uint64_t seq_num);
void AppendDelta(const Delta &delta, const Vertex &parent, uint64_t timestamp);
void AppendDelta(const Delta &delta, const Edge &parent, uint64_t timestamp);
bool FinalizeTransaction(uint64_t timestamp);
// MAIN connecting to replicas
utils::BasicResult<RegisterReplicaError> RegisterReplica(const replication::RegistrationMode registration_mode,
const replication::ReplicationClientConfig &config,
Storage *storage);
bool UnregisterReplica(std::string_view name);
// Getters
auto GetReplicaState(std::string_view name) const -> std::optional<replication::ReplicaState>;
auto ReplicasInfo() const -> std::vector<ReplicaInfo>;
// MAIN reconnecting to replicas
void RestoreReplicas(Storage *storage);
// History
void AddEpochToHistory(std::string prev_epoch);
void AddEpochToHistoryForce(std::string prev_epoch);
// MAIN getting info from replicas
// TODO make into const (problem with SpinLock and WithReadLock)
std::optional<replication::ReplicaState> GetReplicaState(std::string_view name);
std::vector<ReplicaInfo> ReplicasInfo();
const ReplicationEpoch &GetEpoch() const { return epoch_; }
ReplicationEpoch &GetEpoch() { return epoch_; }
void Reset();
// Questions:
// - storage durability <- databases/*name*/wal and snapshots (where this for epoch_id)
@ -83,21 +61,8 @@ struct ReplicationState {
// Each value consists of the epoch id along the last commit belonging to that
// epoch.
std::deque<std::pair<std::string, uint64_t>> history;
// TODO: actually durability
std::atomic<uint64_t> last_commit_timestamp_{kTimestampInitialId};
void NewEpoch();
void AppendEpoch(std::string new_epoch);
private:
bool TryPersistReplicaClient(const replication::ReplicationClientConfig &config);
bool ShouldStoreAndRestoreReplicationState() const { return nullptr != durability_; }
void SetRole(replication::ReplicationRole role) { return replication_role_.store(role); }
// NOTE: Server is not in MAIN it is in REPLICA
std::unique_ptr<ReplicationServer> replication_server_{nullptr};
// We create ReplicationClient using unique_ptr so we can move
// newly created client into the vector.
// We cannot move the client directly because it contains ThreadPool
@ -108,14 +73,13 @@ struct ReplicationState {
// This way we can initialize client in main thread which means
// that we can immediately notify the user if the initialization
// failed.
using ReplicationClientList = utils::Synchronized<std::vector<std::unique_ptr<ReplicationClient>>, utils::SpinLock>;
using ReplicationClientPtr = std::unique_ptr<ReplicationClient>;
using ReplicationClientList = utils::Synchronized<std::vector<ReplicationClientPtr>, utils::RWSpinLock>;
// NOTE: Server is not in MAIN it is in REPLICA
std::unique_ptr<ReplicationServer> replication_server_{nullptr};
ReplicationClientList replication_clients_;
std::atomic<replication::ReplicationRole> replication_role_{replication::ReplicationRole::MAIN};
std::unique_ptr<kvstore::KVStore> durability_;
ReplicationEpoch epoch_;
};
} // namespace memgraph::storage

View File

@ -63,7 +63,7 @@ struct HeartbeatReq {
static void Save(const HeartbeatReq &self, memgraph::slk::Builder *builder);
HeartbeatReq() {}
HeartbeatReq(uint64_t main_commit_timestamp, std::string epoch_id)
: main_commit_timestamp(main_commit_timestamp), epoch_id(epoch_id) {}
: main_commit_timestamp(main_commit_timestamp), epoch_id(std::move(epoch_id)) {}
uint64_t main_commit_timestamp;
std::string epoch_id;

View File

@ -40,8 +40,14 @@ class InMemoryStorage;
using OOMExceptionEnabler = utils::MemoryTracker::OutOfMemoryExceptionEnabler;
auto ReplicationStateHelper(Config const &config) -> std::optional<std::filesystem::path> {
if (!config.durability.restore_replication_state_on_startup) return std::nullopt;
return {config.durability.storage_directory};
}
Storage::Storage(Config config, StorageMode storage_mode)
: name_id_mapper_(std::invoke([config, storage_mode]() -> std::unique_ptr<NameIdMapper> {
: repl_state_(ReplicationStateHelper(config)),
name_id_mapper_(std::invoke([config, storage_mode]() -> std::unique_ptr<NameIdMapper> {
if (storage_mode == StorageMode::ON_DISK_TRANSACTIONAL) {
return std::make_unique<DiskNameIdMapper>(config.disk.name_id_mapper_directory,
config.disk.id_name_mapper_directory);
@ -53,9 +59,7 @@ Storage::Storage(Config config, StorageMode storage_mode)
storage_mode_(storage_mode),
indices_(config, storage_mode),
constraints_(config, storage_mode),
id_(config.name),
replication_state_(config_.durability.restore_replication_state_on_startup,
config_.durability.storage_directory) {}
id_(config.name) {}
Storage::Accessor::Accessor(SharedAccess /* tag */, Storage *storage, IsolationLevel isolation_level,
StorageMode storage_mode)

View File

@ -19,6 +19,7 @@
#include "io/network/endpoint.hpp"
#include "kvstore/kvstore.hpp"
#include "query/exceptions.hpp"
#include "replication/config.hpp"
#include "storage/v2/all_vertices_iterable.hpp"
#include "storage/v2/commit_log.hpp"
#include "storage/v2/config.hpp"
@ -27,11 +28,10 @@
#include "storage/v2/edge_accessor.hpp"
#include "storage/v2/indices/indices.hpp"
#include "storage/v2/mvcc.hpp"
#include "storage/v2/replication/config.hpp"
#include "storage/v2/replication/enums.hpp"
#include "storage/v2/replication/replication.hpp"
#include "storage/v2/replication/replication_client.hpp"
#include "storage/v2/replication/replication_server.hpp"
#include "storage/v2/replication/replication_storage_state.hpp"
#include "storage/v2/storage_error.hpp"
#include "storage/v2/storage_mode.hpp"
#include "storage/v2/transaction.hpp"
@ -297,37 +297,22 @@ class Storage {
virtual Transaction CreateTransaction(IsolationLevel isolation_level, StorageMode storage_mode) = 0;
virtual void EstablishNewEpoch() = 0;
virtual void PrepareForNewEpoch(std::string prev_epoch) = 0;
virtual auto CreateReplicationClient(replication::ReplicationClientConfig const &config)
virtual auto CreateReplicationClient(const memgraph::replication::ReplicationClientConfig &config)
-> std::unique_ptr<ReplicationClient> = 0;
virtual auto CreateReplicationServer(const replication::ReplicationServerConfig &config)
virtual auto CreateReplicationServer(const memgraph::replication::ReplicationServerConfig &config)
-> std::unique_ptr<ReplicationServer> = 0;
/// REPLICATION
bool SetReplicaRole(const replication::ReplicationServerConfig &config) {
return replication_state_.SetReplicaRole(config, this);
}
bool SetMainReplicationRole() { return replication_state_.SetMainReplicationRole(this); }
/// @pre The instance should have a MAIN role
/// @pre Timeout can only be set for SYNC replication
auto RegisterReplica(const replication::RegistrationMode registration_mode,
const replication::ReplicationClientConfig &config) {
return replication_state_.RegisterReplica(registration_mode, config, this);
}
/// @pre The instance should have a MAIN role
bool UnregisterReplica(const std::string &name) { return replication_state_.UnregisterReplica(name); }
replication::ReplicationRole GetReplicationRole() const { return replication_state_.GetRole(); }
auto ReplicasInfo() { return replication_state_.ReplicasInfo(); }
std::optional<replication::ReplicaState> GetReplicaState(std::string_view name) {
return replication_state_.GetReplicaState(name);
auto ReplicasInfo() { return repl_storage_state_.ReplicasInfo(); }
auto GetReplicaState(std::string_view name) -> std::optional<replication::ReplicaState> {
return repl_storage_state_.GetReplicaState(name);
}
protected:
void RestoreReplicas() { return replication_state_.RestoreReplicas(this); }
void RestoreReplicationRole() { return replication_state_.RestoreReplicationRole(this); }
// TODO: make non-public
memgraph::replication::ReplicationState repl_state_;
ReplicationStorageState repl_storage_state_;
public:
// Main storage lock.
@ -360,9 +345,6 @@ class Storage {
std::atomic<uint64_t> vertex_id_{0};
std::atomic<uint64_t> edge_id_{0};
const std::string id_; //!< High-level assigned ID
protected:
ReplicationState replication_state_;
};
} // namespace memgraph::storage

View File

@ -25,6 +25,7 @@ find_package(gflags REQUIRED)
find_package(Threads REQUIRED)
add_library(mg-utils STATIC ${utils_src_files})
add_library(mg::utils ALIAS mg-utils)
target_link_libraries(mg-utils PUBLIC Boost::headers fmt::fmt spdlog::spdlog)
target_link_libraries(mg-utils PRIVATE librdtsc stdc++fs Threads::Threads gflags json uuid rt)

View File

@ -9,8 +9,9 @@
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#include "storage/v2/replication/replication_persistence_helper.hpp"
#include "formatters.hpp"
#include "replication/state.hpp"
#include "replication/status.hpp"
#include "utils/logging.hpp"
#include <gtest/gtest.h>
@ -20,6 +21,7 @@
#include <string>
using namespace memgraph::storage::replication;
using namespace memgraph::replication;
class ReplicationPersistanceHelperTest : public testing::Test {
protected:

View File

@ -21,27 +21,30 @@
#include <storage/v2/inmemory/storage.hpp>
#include <storage/v2/property_value.hpp>
#include <storage/v2/replication/enums.hpp>
#include "replication/config.hpp"
#include "storage/v2/indices/label_index_stats.hpp"
#include "storage/v2/replication/config.hpp"
#include "storage/v2/replication/replication_handler.hpp"
#include "storage/v2/storage.hpp"
#include "storage/v2/view.hpp"
using testing::UnorderedElementsAre;
using memgraph::replication::ReplicationClientConfig;
using memgraph::replication::ReplicationMode;
using memgraph::replication::ReplicationRole;
using memgraph::replication::ReplicationServerConfig;
using memgraph::storage::Config;
using memgraph::storage::EdgeAccessor;
using memgraph::storage::Gid;
using memgraph::storage::InMemoryStorage;
using memgraph::storage::PropertyValue;
using memgraph::storage::RegisterReplicaError;
using memgraph::storage::RegistrationMode;
using memgraph::storage::ReplicationHandler;
using memgraph::storage::Storage;
using memgraph::storage::UnregisterReplicaResult;
using memgraph::storage::View;
using memgraph::storage::replication::RegistrationMode;
using memgraph::storage::replication::ReplicaState;
using memgraph::storage::replication::ReplicationClientConfig;
using memgraph::storage::replication::ReplicationMode;
using memgraph::storage::replication::ReplicationRole;
using memgraph::storage::replication::ReplicationServerConfig;
class ReplicationTest : public ::testing::Test {
protected:
@ -72,19 +75,21 @@ TEST_F(ReplicationTest, BasicSynchronousReplicationTest) {
std::unique_ptr<Storage> main_store = std::make_unique<InMemoryStorage>(configuration);
std::unique_ptr<Storage> replica_store = std::make_unique<InMemoryStorage>(configuration);
replica_store->SetReplicaRole(ReplicationServerConfig{
auto replica_store_handler = ReplicationHandler{replica_store->repl_state_, *replica_store};
replica_store_handler.SetReplicationRoleReplica(ReplicationServerConfig{
.ip_address = local_host,
.port = ports[0],
});
ASSERT_FALSE(main_store
->RegisterReplica(RegistrationMode::MUST_BE_INSTANTLY_VALID,
ReplicationClientConfig{
.name = "REPLICA",
.mode = ReplicationMode::SYNC,
.ip_address = local_host,
.port = ports[0],
})
auto main_store_handler = ReplicationHandler{main_store->repl_state_, *main_store};
ASSERT_FALSE(main_store_handler
.RegisterReplica(RegistrationMode::MUST_BE_INSTANTLY_VALID,
ReplicationClientConfig{
.name = "REPLICA",
.mode = ReplicationMode::SYNC,
.ip_address = local_host,
.port = ports[0],
})
.HasError());
// vertex create
@ -368,7 +373,8 @@ TEST_F(ReplicationTest, MultipleSynchronousReplicationTest) {
.snapshot_wal_mode = Config::Durability::SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL,
}})};
replica_store1->SetReplicaRole(ReplicationServerConfig{
auto replica1_store_handler = ReplicationHandler{replica_store1->repl_state_, *replica_store1};
replica1_store_handler.SetReplicationRoleReplica(ReplicationServerConfig{
.ip_address = local_host,
.port = ports[0],
});
@ -378,28 +384,30 @@ TEST_F(ReplicationTest, MultipleSynchronousReplicationTest) {
.storage_directory = storage_directory,
.snapshot_wal_mode = Config::Durability::SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL,
}})};
replica_store2->SetReplicaRole(ReplicationServerConfig{
auto replica2_store_handler = ReplicationHandler{replica_store2->repl_state_, *replica_store2};
replica2_store_handler.SetReplicationRoleReplica(ReplicationServerConfig{
.ip_address = local_host,
.port = ports[1],
});
ASSERT_FALSE(main_store
->RegisterReplica(RegistrationMode::MUST_BE_INSTANTLY_VALID,
ReplicationClientConfig{
.name = replicas[0],
.mode = ReplicationMode::SYNC,
.ip_address = local_host,
.port = ports[0],
})
auto main_store_handler = ReplicationHandler{main_store->repl_state_, *main_store};
ASSERT_FALSE(main_store_handler
.RegisterReplica(RegistrationMode::MUST_BE_INSTANTLY_VALID,
ReplicationClientConfig{
.name = replicas[0],
.mode = ReplicationMode::SYNC,
.ip_address = local_host,
.port = ports[0],
})
.HasError());
ASSERT_FALSE(main_store
->RegisterReplica(RegistrationMode::MUST_BE_INSTANTLY_VALID,
ReplicationClientConfig{
.name = replicas[1],
.mode = ReplicationMode::SYNC,
.ip_address = local_host,
.port = ports[1],
})
ASSERT_FALSE(main_store_handler
.RegisterReplica(RegistrationMode::MUST_BE_INSTANTLY_VALID,
ReplicationClientConfig{
.name = replicas[1],
.mode = ReplicationMode::SYNC,
.ip_address = local_host,
.port = ports[1],
})
.HasError());
const auto *vertex_label = "label";
@ -429,7 +437,8 @@ TEST_F(ReplicationTest, MultipleSynchronousReplicationTest) {
check_replica(replica_store1.get());
check_replica(replica_store2.get());
main_store->UnregisterReplica(replicas[1]);
auto handler = ReplicationHandler{main_store->repl_state_, *main_store};
handler.UnregisterReplica(replicas[1]);
{
auto acc = main_store->Access();
auto v = acc->CreateVertex();
@ -526,19 +535,20 @@ TEST_F(ReplicationTest, RecoveryProcess) {
{.durability = {.storage_directory = replica_storage_directory,
.snapshot_wal_mode = Config::Durability::SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL}})};
replica_store->SetReplicaRole(ReplicationServerConfig{
auto replica_store_handler = ReplicationHandler{replica_store->repl_state_, *replica_store};
replica_store_handler.SetReplicationRoleReplica(ReplicationServerConfig{
.ip_address = local_host,
.port = ports[0],
});
ASSERT_FALSE(main_store
->RegisterReplica(RegistrationMode::MUST_BE_INSTANTLY_VALID,
ReplicationClientConfig{
.name = replicas[0],
.mode = ReplicationMode::SYNC,
.ip_address = local_host,
.port = ports[0],
})
auto main_store_handler = ReplicationHandler{main_store->repl_state_, *main_store};
ASSERT_FALSE(main_store_handler
.RegisterReplica(RegistrationMode::MUST_BE_INSTANTLY_VALID,
ReplicationClientConfig{
.name = replicas[0],
.mode = ReplicationMode::SYNC,
.ip_address = local_host,
.port = ports[0],
})
.HasError());
ASSERT_EQ(main_store->GetReplicaState(replicas[0]), ReplicaState::RECOVERY);
@ -600,19 +610,21 @@ TEST_F(ReplicationTest, BasicAsynchronousReplicationTest) {
std::unique_ptr<Storage> replica_store_async{new InMemoryStorage(configuration)};
replica_store_async->SetReplicaRole(ReplicationServerConfig{
auto replica_store_handler = ReplicationHandler{replica_store_async->repl_state_, *replica_store_async};
replica_store_handler.SetReplicationRoleReplica(ReplicationServerConfig{
.ip_address = local_host,
.port = ports[1],
});
ASSERT_FALSE(main_store
->RegisterReplica(RegistrationMode::MUST_BE_INSTANTLY_VALID,
ReplicationClientConfig{
.name = "REPLICA_ASYNC",
.mode = ReplicationMode::ASYNC,
.ip_address = local_host,
.port = ports[1],
})
auto main_store_handler = ReplicationHandler{main_store->repl_state_, *main_store};
ASSERT_FALSE(main_store_handler
.RegisterReplica(RegistrationMode::MUST_BE_INSTANTLY_VALID,
ReplicationClientConfig{
.name = "REPLICA_ASYNC",
.mode = ReplicationMode::ASYNC,
.ip_address = local_host,
.port = ports[1],
})
.HasError());
static constexpr size_t vertices_create_num = 10;
@ -647,36 +659,39 @@ TEST_F(ReplicationTest, EpochTest) {
std::unique_ptr<Storage> main_store{new InMemoryStorage(configuration)};
std::unique_ptr<Storage> replica_store1{new InMemoryStorage(configuration)};
replica_store1->SetReplicaRole(ReplicationServerConfig{
auto replica1_store_handler = ReplicationHandler{replica_store1->repl_state_, *replica_store1};
replica1_store_handler.SetReplicationRoleReplica(ReplicationServerConfig{
.ip_address = local_host,
.port = ports[0],
});
std::unique_ptr<Storage> replica_store2{new InMemoryStorage(configuration)};
replica_store2->SetReplicaRole(ReplicationServerConfig{
auto replica2_store_handler = ReplicationHandler{replica_store2->repl_state_, *replica_store2};
replica2_store_handler.SetReplicationRoleReplica(ReplicationServerConfig{
.ip_address = local_host,
.port = 10001,
});
ASSERT_FALSE(main_store
->RegisterReplica(RegistrationMode::MUST_BE_INSTANTLY_VALID,
ReplicationClientConfig{
.name = replicas[0],
.mode = ReplicationMode::SYNC,
.ip_address = local_host,
.port = ports[0],
})
auto main_store_handler = ReplicationHandler{main_store->repl_state_, *main_store};
ASSERT_FALSE(main_store_handler
.RegisterReplica(RegistrationMode::MUST_BE_INSTANTLY_VALID,
ReplicationClientConfig{
.name = replicas[0],
.mode = ReplicationMode::SYNC,
.ip_address = local_host,
.port = ports[0],
})
.HasError());
ASSERT_FALSE(main_store
->RegisterReplica(RegistrationMode::MUST_BE_INSTANTLY_VALID,
ReplicationClientConfig{
.name = replicas[1],
.mode = ReplicationMode::SYNC,
.ip_address = local_host,
.port = 10001,
})
ASSERT_FALSE(main_store_handler
.RegisterReplica(RegistrationMode::MUST_BE_INSTANTLY_VALID,
ReplicationClientConfig{
.name = replicas[1],
.mode = ReplicationMode::SYNC,
.ip_address = local_host,
.port = 10001,
})
.HasError());
std::optional<Gid> vertex_gid;
@ -699,18 +714,19 @@ TEST_F(ReplicationTest, EpochTest) {
ASSERT_FALSE(acc->Commit().HasError());
}
main_store->UnregisterReplica(replicas[0]);
main_store->UnregisterReplica(replicas[1]);
main_store_handler.UnregisterReplica(replicas[0]);
main_store_handler.UnregisterReplica(replicas[1]);
replica_store1->SetMainReplicationRole();
ASSERT_FALSE(replica_store1
->RegisterReplica(RegistrationMode::MUST_BE_INSTANTLY_VALID,
ReplicationClientConfig{
.name = replicas[1],
.mode = ReplicationMode::SYNC,
.ip_address = local_host,
.port = 10001,
})
ASSERT_TRUE(replica1_store_handler.SetReplicationRoleMain());
ASSERT_FALSE(replica1_store_handler
.RegisterReplica(RegistrationMode::MUST_BE_INSTANTLY_VALID,
ReplicationClientConfig{
.name = replicas[1],
.mode = ReplicationMode::SYNC,
.ip_address = local_host,
.port = 10001,
})
.HasError());
@ -733,18 +749,18 @@ TEST_F(ReplicationTest, EpochTest) {
ASSERT_FALSE(acc->Commit().HasError());
}
replica_store1->SetReplicaRole(ReplicationServerConfig{
replica1_store_handler.SetReplicationRoleReplica(ReplicationServerConfig{
.ip_address = local_host,
.port = ports[0],
});
ASSERT_TRUE(main_store
->RegisterReplica(RegistrationMode::MUST_BE_INSTANTLY_VALID,
ReplicationClientConfig{
.name = replicas[0],
.mode = ReplicationMode::SYNC,
.ip_address = local_host,
.port = ports[0],
})
ASSERT_TRUE(main_store_handler
.RegisterReplica(RegistrationMode::MUST_BE_INSTANTLY_VALID,
ReplicationClientConfig{
.name = replicas[0],
.mode = ReplicationMode::SYNC,
.ip_address = local_host,
.port = ports[0],
})
.HasError());
@ -769,43 +785,46 @@ TEST_F(ReplicationTest, ReplicationInformation) {
std::unique_ptr<Storage> replica_store1{new InMemoryStorage(configuration)};
uint16_t replica1_port = 10001;
replica_store1->SetReplicaRole(ReplicationServerConfig{
auto replica1_store_handler = ReplicationHandler{replica_store1->repl_state_, *replica_store1};
replica1_store_handler.SetReplicationRoleReplica(ReplicationServerConfig{
.ip_address = local_host,
.port = replica1_port,
});
uint16_t replica2_port = 10002;
std::unique_ptr<Storage> replica_store2{new InMemoryStorage(configuration)};
replica_store2->SetReplicaRole(ReplicationServerConfig{
auto replica2_store_handler = ReplicationHandler{replica_store2->repl_state_, *replica_store2};
replica2_store_handler.SetReplicationRoleReplica(ReplicationServerConfig{
.ip_address = local_host,
.port = replica2_port,
});
ASSERT_FALSE(main_store
->RegisterReplica(RegistrationMode::MUST_BE_INSTANTLY_VALID,
ReplicationClientConfig{
.name = replicas[0],
.mode = ReplicationMode::SYNC,
.ip_address = local_host,
.port = replica1_port,
})
auto main_store_handler = ReplicationHandler{main_store->repl_state_, *main_store};
ASSERT_FALSE(main_store_handler
.RegisterReplica(RegistrationMode::MUST_BE_INSTANTLY_VALID,
ReplicationClientConfig{
.name = replicas[0],
.mode = ReplicationMode::SYNC,
.ip_address = local_host,
.port = replica1_port,
})
.HasError());
ASSERT_FALSE(main_store
->RegisterReplica(RegistrationMode::MUST_BE_INSTANTLY_VALID,
ReplicationClientConfig{
.name = replicas[1],
.mode = ReplicationMode::ASYNC,
.ip_address = local_host,
.port = replica2_port,
})
ASSERT_FALSE(main_store_handler
.RegisterReplica(RegistrationMode::MUST_BE_INSTANTLY_VALID,
ReplicationClientConfig{
.name = replicas[1],
.mode = ReplicationMode::ASYNC,
.ip_address = local_host,
.port = replica2_port,
})
.HasError());
ASSERT_EQ(main_store->GetReplicationRole(), ReplicationRole::MAIN);
ASSERT_EQ(replica_store1->GetReplicationRole(), ReplicationRole::REPLICA);
ASSERT_EQ(replica_store2->GetReplicationRole(), ReplicationRole::REPLICA);
ASSERT_TRUE(main_store->repl_state_.IsMain());
ASSERT_TRUE(replica_store1->repl_state_.IsReplica());
ASSERT_TRUE(replica_store2->repl_state_.IsReplica());
const auto replicas_info = main_store->ReplicasInfo();
ASSERT_EQ(replicas_info.size(), 2);
@ -828,36 +847,38 @@ TEST_F(ReplicationTest, ReplicationReplicaWithExistingName) {
std::unique_ptr<Storage> replica_store1{new InMemoryStorage(configuration)};
uint16_t replica1_port = 10001;
replica_store1->SetReplicaRole(ReplicationServerConfig{
auto replica1_store_handler = ReplicationHandler{replica_store1->repl_state_, *replica_store1};
replica1_store_handler.SetReplicationRoleReplica(ReplicationServerConfig{
.ip_address = local_host,
.port = replica1_port,
});
uint16_t replica2_port = 10002;
std::unique_ptr<Storage> replica_store2{new InMemoryStorage(configuration)};
replica_store2->SetReplicaRole(ReplicationServerConfig{
auto replica2_store_handler = ReplicationHandler{replica_store2->repl_state_, *replica_store2};
replica2_store_handler.SetReplicationRoleReplica(ReplicationServerConfig{
.ip_address = local_host,
.port = replica2_port,
});
ASSERT_FALSE(main_store
->RegisterReplica(RegistrationMode::MUST_BE_INSTANTLY_VALID,
ReplicationClientConfig{
.name = replicas[0],
.mode = ReplicationMode::SYNC,
.ip_address = local_host,
.port = replica1_port,
})
.HasError());
ASSERT_TRUE(main_store
->RegisterReplica(RegistrationMode::MUST_BE_INSTANTLY_VALID,
auto main_store_handler = ReplicationHandler{main_store->repl_state_, *main_store};
ASSERT_FALSE(main_store_handler
.RegisterReplica(RegistrationMode::MUST_BE_INSTANTLY_VALID,
ReplicationClientConfig{
.name = replicas[0],
.mode = ReplicationMode::ASYNC,
.mode = ReplicationMode::SYNC,
.ip_address = local_host,
.port = replica2_port,
.port = replica1_port,
})
.HasError());
ASSERT_TRUE(main_store_handler
.RegisterReplica(RegistrationMode::MUST_BE_INSTANTLY_VALID,
ReplicationClientConfig{
.name = replicas[0],
.mode = ReplicationMode::ASYNC,
.ip_address = local_host,
.port = replica2_port,
})
.GetError() == RegisterReplicaError::NAME_EXISTS);
}
@ -866,35 +887,38 @@ TEST_F(ReplicationTest, ReplicationReplicaWithExistingEndPoint) {
std::unique_ptr<Storage> main_store{new InMemoryStorage(configuration)};
std::unique_ptr<Storage> replica_store1{new InMemoryStorage(configuration)};
replica_store1->SetReplicaRole(ReplicationServerConfig{
auto replica1_store_handler = ReplicationHandler{replica_store1->repl_state_, *replica_store1};
replica1_store_handler.SetReplicationRoleReplica(ReplicationServerConfig{
.ip_address = local_host,
.port = common_port,
});
std::unique_ptr<Storage> replica_store2{new InMemoryStorage(configuration)};
replica_store2->SetReplicaRole(ReplicationServerConfig{
auto replica2_store_handler = ReplicationHandler{replica_store2->repl_state_, *replica_store2};
replica2_store_handler.SetReplicationRoleReplica(ReplicationServerConfig{
.ip_address = local_host,
.port = common_port,
});
ASSERT_FALSE(main_store
->RegisterReplica(RegistrationMode::MUST_BE_INSTANTLY_VALID,
ReplicationClientConfig{
.name = replicas[0],
.mode = ReplicationMode::SYNC,
.ip_address = local_host,
.port = common_port,
})
.HasError());
ASSERT_TRUE(main_store
->RegisterReplica(RegistrationMode::MUST_BE_INSTANTLY_VALID,
auto main_store_handler = ReplicationHandler{main_store->repl_state_, *main_store};
ASSERT_FALSE(main_store_handler
.RegisterReplica(RegistrationMode::MUST_BE_INSTANTLY_VALID,
ReplicationClientConfig{
.name = replicas[1],
.mode = ReplicationMode::ASYNC,
.name = replicas[0],
.mode = ReplicationMode::SYNC,
.ip_address = local_host,
.port = common_port,
})
.HasError());
ASSERT_TRUE(main_store_handler
.RegisterReplica(RegistrationMode::MUST_BE_INSTANTLY_VALID,
ReplicationClientConfig{
.name = replicas[1],
.mode = ReplicationMode::ASYNC,
.ip_address = local_host,
.port = common_port,
})
.GetError() == RegisterReplicaError::END_POINT_EXISTS);
}
@ -904,30 +928,34 @@ TEST_F(ReplicationTest, RestoringReplicationAtStartupAfterDroppingReplica) {
std::unique_ptr<Storage> main_store{new InMemoryStorage(main_config)};
std::unique_ptr<Storage> replica_store1{new InMemoryStorage(configuration)};
replica_store1->SetReplicaRole(ReplicationServerConfig{
auto replica1_store_handler = ReplicationHandler{replica_store1->repl_state_, *replica_store1};
replica1_store_handler.SetReplicationRoleReplica(ReplicationServerConfig{
.ip_address = local_host,
.port = ports[0],
});
std::unique_ptr<Storage> replica_store2{new InMemoryStorage(configuration)};
replica_store2->SetReplicaRole(ReplicationServerConfig{
auto replica2_store_handler = ReplicationHandler{replica_store2->repl_state_, *replica_store2};
replica2_store_handler.SetReplicationRoleReplica(ReplicationServerConfig{
.ip_address = local_host,
.port = ports[1],
});
auto res = main_store->RegisterReplica(RegistrationMode::MUST_BE_INSTANTLY_VALID, ReplicationClientConfig{
auto main_store_handler = ReplicationHandler{main_store->repl_state_, *main_store};
auto res =
main_store_handler.RegisterReplica(RegistrationMode::MUST_BE_INSTANTLY_VALID, ReplicationClientConfig{
.name = replicas[0],
.mode = ReplicationMode::SYNC,
.ip_address = local_host,
.port = ports[0],
});
ASSERT_FALSE(res.HasError());
res = main_store->RegisterReplica(RegistrationMode::MUST_BE_INSTANTLY_VALID, ReplicationClientConfig{
.name = replicas[1],
.mode = ReplicationMode::SYNC,
.ip_address = local_host,
.port = ports[1],
});
res = main_store_handler.RegisterReplica(RegistrationMode::MUST_BE_INSTANTLY_VALID, ReplicationClientConfig{
.name = replicas[1],
.mode = ReplicationMode::SYNC,
.ip_address = local_host,
.port = ports[1],
});
ASSERT_FALSE(res.HasError());
auto replica_infos = main_store->ReplicasInfo();
@ -961,31 +989,34 @@ TEST_F(ReplicationTest, RestoringReplicationAtStartup) {
std::unique_ptr<Storage> main_store{new InMemoryStorage(main_config)};
std::unique_ptr<Storage> replica_store1{new InMemoryStorage(configuration)};
replica_store1->SetReplicaRole(ReplicationServerConfig{
auto replica1_store_handler = ReplicationHandler{replica_store1->repl_state_, *replica_store1};
replica1_store_handler.SetReplicationRoleReplica(ReplicationServerConfig{
.ip_address = local_host,
.port = ports[0],
});
std::unique_ptr<Storage> replica_store2{new InMemoryStorage(configuration)};
replica_store2->SetReplicaRole(ReplicationServerConfig{
auto replica2_store_handler = ReplicationHandler{replica_store2->repl_state_, *replica_store2};
replica2_store_handler.SetReplicationRoleReplica(ReplicationServerConfig{
.ip_address = local_host,
.port = ports[1],
});
auto res = main_store->RegisterReplica(RegistrationMode::MUST_BE_INSTANTLY_VALID, ReplicationClientConfig{
auto main_store_handler = ReplicationHandler{main_store->repl_state_, *main_store};
auto res =
main_store_handler.RegisterReplica(RegistrationMode::MUST_BE_INSTANTLY_VALID, ReplicationClientConfig{
.name = replicas[0],
.mode = ReplicationMode::SYNC,
.ip_address = local_host,
.port = ports[0],
});
ASSERT_FALSE(res.HasError());
res = main_store->RegisterReplica(RegistrationMode::MUST_BE_INSTANTLY_VALID, ReplicationClientConfig{
.name = replicas[1],
.mode = ReplicationMode::SYNC,
.ip_address = local_host,
.port = ports[1],
});
res = main_store_handler.RegisterReplica(RegistrationMode::MUST_BE_INSTANTLY_VALID, ReplicationClientConfig{
.name = replicas[1],
.mode = ReplicationMode::SYNC,
.ip_address = local_host,
.port = ports[1],
});
ASSERT_FALSE(res.HasError());
auto replica_infos = main_store->ReplicasInfo();
@ -998,8 +1029,9 @@ TEST_F(ReplicationTest, RestoringReplicationAtStartup) {
ASSERT_EQ(replica_infos[1].endpoint.address, local_host);
ASSERT_EQ(replica_infos[1].endpoint.port, ports[1]);
const auto unregister_res = main_store->UnregisterReplica(replicas[0]);
ASSERT_TRUE(unregister_res);
auto handler = ReplicationHandler{main_store->repl_state_, *main_store};
const auto unregister_res = handler.UnregisterReplica(replicas[0]);
ASSERT_EQ(unregister_res, UnregisterReplicaResult::SUCCESS);
replica_infos = main_store->ReplicasInfo();
ASSERT_EQ(replica_infos.size(), 1);
@ -1020,13 +1052,14 @@ TEST_F(ReplicationTest, RestoringReplicationAtStartup) {
TEST_F(ReplicationTest, AddingInvalidReplica) {
std::unique_ptr<Storage> main_store{new InMemoryStorage(configuration)};
ASSERT_TRUE(main_store
->RegisterReplica(RegistrationMode::MUST_BE_INSTANTLY_VALID,
ReplicationClientConfig{
.name = "REPLICA",
.mode = ReplicationMode::SYNC,
.ip_address = local_host,
.port = ports[0],
})
auto main_store_handler = ReplicationHandler{main_store->repl_state_, *main_store};
ASSERT_TRUE(main_store_handler
.RegisterReplica(RegistrationMode::MUST_BE_INSTANTLY_VALID,
ReplicationClientConfig{
.name = "REPLICA",
.mode = ReplicationMode::SYNC,
.ip_address = local_host,
.port = ports[0],
})
.GetError() == RegisterReplicaError::CONNECTION_FAILED);
}