Replication refactor (part 4) (#1211)

More refactoring to isolate generic replication behavior, making the
InMemory* types even more decoupled from replication logic.
This commit is contained in:
Gareth Andrew Lloyd 2023-08-31 16:06:44 +01:00 committed by GitHub
parent eb5167dfef
commit e928eed028
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
26 changed files with 506 additions and 485 deletions

View File

@ -217,16 +217,15 @@ class ReplQueryHandler final : public query::ReplicationQueryHandler {
/// @throw QueryRuntimeException if an error occurred.
void SetReplicationRole(ReplicationQuery::ReplicationRole replication_role, std::optional<int64_t> port) override {
auto *mem_storage = static_cast<storage::InMemoryStorage *>(db_);
if (replication_role == ReplicationQuery::ReplicationRole::MAIN) {
if (!mem_storage->SetMainReplicationRole()) {
if (!db_->SetMainReplicationRole()) {
throw QueryRuntimeException("Couldn't set role to main!");
}
} else {
if (!port || *port < 0 || *port > std::numeric_limits<uint16_t>::max()) {
throw QueryRuntimeException("Port number invalid!");
}
if (!mem_storage->SetReplicaRole(
if (!db_->SetReplicaRole(
io::network::Endpoint(storage::replication::kDefaultReplicationServerIp, static_cast<uint16_t>(*port)),
storage::replication::ReplicationServerConfig{})) {
throw QueryRuntimeException("Couldn't set role to replica!");
@ -236,7 +235,7 @@ class ReplQueryHandler final : public query::ReplicationQueryHandler {
/// @throw QueryRuntimeException if an error occurred.
ReplicationQuery::ReplicationRole ShowReplicationRole() const override {
switch (static_cast<storage::InMemoryStorage *>(db_)->GetReplicationRole()) {
switch (db_->GetReplicationRole()) {
case storage::replication::ReplicationRole::MAIN:
return ReplicationQuery::ReplicationRole::MAIN;
case storage::replication::ReplicationRole::REPLICA:
@ -249,8 +248,7 @@ class ReplQueryHandler final : public query::ReplicationQueryHandler {
void RegisterReplica(const std::string &name, const std::string &socket_address,
const ReplicationQuery::SyncMode sync_mode,
const std::chrono::seconds replica_check_frequency) override {
auto *mem_storage = static_cast<storage::InMemoryStorage *>(db_);
if (mem_storage->GetReplicationRole() == storage::replication::ReplicationRole::REPLICA) {
if (db_->GetReplicationRole() == storage::replication::ReplicationRole::REPLICA) {
// replica can't register another replica
throw QueryRuntimeException("Replica can't register another replica!");
}
@ -275,9 +273,9 @@ class ReplQueryHandler final : public query::ReplicationQueryHandler {
io::network::Endpoint::ParseSocketOrIpAddress(socket_address, storage::replication::kDefaultReplicationPort);
if (maybe_ip_and_port) {
auto [ip, port] = *maybe_ip_and_port;
auto ret = mem_storage->RegisterReplica(
name, {std::move(ip), port}, repl_mode, storage::replication::RegistrationMode::MUST_BE_INSTANTLY_VALID,
{.replica_check_frequency = replica_check_frequency, .ssl = std::nullopt});
auto ret = db_->RegisterReplica(name, {std::move(ip), port}, repl_mode,
storage::replication::RegistrationMode::MUST_BE_INSTANTLY_VALID,
{.replica_check_frequency = replica_check_frequency, .ssl = std::nullopt});
if (ret.HasError()) {
throw QueryRuntimeException(fmt::format("Couldn't register replica '{}'!", name));
}
@ -288,26 +286,23 @@ class ReplQueryHandler final : public query::ReplicationQueryHandler {
/// @throw QueryRuntimeException if an error occurred.
void DropReplica(const std::string &replica_name) override {
auto *mem_storage = static_cast<storage::InMemoryStorage *>(db_);
if (mem_storage->GetReplicationRole() == storage::replication::ReplicationRole::REPLICA) {
if (db_->GetReplicationRole() == storage::replication::ReplicationRole::REPLICA) {
// replica can't unregister a replica
throw QueryRuntimeException("Replica can't unregister a replica!");
}
if (!mem_storage->UnregisterReplica(replica_name)) {
if (!db_->UnregisterReplica(replica_name)) {
throw QueryRuntimeException(fmt::format("Couldn't unregister the replica '{}'", replica_name));
}
}
using Replica = ReplicationQueryHandler::Replica;
std::vector<Replica> ShowReplicas() const override {
auto *mem_storage = static_cast<storage::InMemoryStorage *>(db_);
if (mem_storage->GetReplicationRole() == storage::replication::ReplicationRole::REPLICA) {
if (db_->GetReplicationRole() == storage::replication::ReplicationRole::REPLICA) {
// replica can't show registered replicas (it shouldn't have any)
throw QueryRuntimeException("Replica can't show registered replicas (it shouldn't have any)!");
}
auto repl_infos = mem_storage->ReplicasInfo();
auto repl_infos = db_->ReplicasInfo();
std::vector<Replica> replicas;
replicas.reserve(repl_infos.size());
@ -1333,8 +1328,7 @@ bool IsWriteQueryOnMainMemoryReplica(storage::Storage *storage,
const query::plan::ReadWriteTypeChecker::RWType query_type) {
if (auto storage_mode = storage->GetStorageMode(); storage_mode == storage::StorageMode::IN_MEMORY_ANALYTICAL ||
storage_mode == storage::StorageMode::IN_MEMORY_TRANSACTIONAL) {
auto *mem_storage = static_cast<storage::InMemoryStorage *>(storage);
return (mem_storage->GetReplicationRole() == storage::replication::ReplicationRole::REPLICA) &&
return (storage->GetReplicationRole() == storage::replication::ReplicationRole::REPLICA) &&
(query_type == RWType::W || query_type == RWType::RW);
}
return false;
@ -1343,8 +1337,7 @@ bool IsWriteQueryOnMainMemoryReplica(storage::Storage *storage,
storage::replication::ReplicationRole GetReplicaRole(storage::Storage *storage) {
if (auto storage_mode = storage->GetStorageMode(); storage_mode == storage::StorageMode::IN_MEMORY_ANALYTICAL ||
storage_mode == storage::StorageMode::IN_MEMORY_TRANSACTIONAL) {
auto *mem_storage = static_cast<storage::InMemoryStorage *>(storage);
return mem_storage->GetReplicationRole();
return storage->GetReplicationRole();
}
return storage::replication::ReplicationRole::MAIN;
}

View File

@ -5,6 +5,7 @@ find_package(Threads REQUIRED)
add_library(mg-storage-v2 STATIC
commit_log.cpp
constraints/existence_constraints.cpp
constraints/constraints.cpp
temporal.cpp
durability/durability.cpp
durability/serialization.cpp

View File

@ -0,0 +1,32 @@
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#include "storage/v2/constraints/constraints.hpp"
#include "storage/v2/disk/unique_constraints.hpp"
#include "storage/v2/inmemory/unique_constraints.hpp"
namespace memgraph::storage {
/// Builds the constraint containers that match the requested storage mode.
///
/// @param config        Storage configuration; only forwarded to the on-disk unique constraints.
/// @param storage_mode  Selects the in-memory or the on-disk unique-constraint implementation.
Constraints::Constraints(const Config &config, StorageMode storage_mode) {
  // The members are initialised directly. The previous immediately-invoked lambda
  // captured `config` by value, which copied the whole Config for no benefit.
  existence_constraints_ = std::make_unique<ExistenceConstraints>();
  switch (storage_mode) {
    case StorageMode::IN_MEMORY_TRANSACTIONAL:
    case StorageMode::IN_MEMORY_ANALYTICAL:
      unique_constraints_ = std::make_unique<InMemoryUniqueConstraints>();
      break;
    case StorageMode::ON_DISK_TRANSACTIONAL:
      unique_constraints_ = std::make_unique<DiskUniqueConstraints>(config);
      break;
  }
}
} // namespace memgraph::storage

View File

@ -13,27 +13,13 @@
#include "storage/v2/config.hpp"
#include "storage/v2/constraints/existence_constraints.hpp"
#include "storage/v2/disk/unique_constraints.hpp"
#include "storage/v2/inmemory/unique_constraints.hpp"
#include "storage/v2/constraints/unique_constraints.hpp"
#include "storage/v2/storage_mode.hpp"
namespace memgraph::storage {
struct Constraints {
Constraints(const Config &config, StorageMode storage_mode) {
std::invoke([this, config, storage_mode]() {
existence_constraints_ = std::make_unique<ExistenceConstraints>();
switch (storage_mode) {
case StorageMode::IN_MEMORY_TRANSACTIONAL:
case StorageMode::IN_MEMORY_ANALYTICAL:
unique_constraints_ = std::make_unique<InMemoryUniqueConstraints>();
break;
case StorageMode::ON_DISK_TRANSACTIONAL:
unique_constraints_ = std::make_unique<DiskUniqueConstraints>(config);
break;
};
});
}
Constraints(const Config &config, StorageMode storage_mode);
Constraints(const Constraints &) = delete;
Constraints(Constraints &&) = delete;

View File

@ -12,13 +12,8 @@
/// TODO: clear dependencies
#include "storage/v2/disk/label_property_index.hpp"
#include "storage/v2/id_types.hpp"
#include "storage/v2/inmemory/indices_utils.hpp"
#include "storage/v2/property_value.hpp"
#include "utils/disk_utils.hpp"
#include "utils/exceptions.hpp"
#include "utils/file.hpp"
#include "utils/skip_list.hpp"
#include "utils/rocksdb_serialization.hpp"
namespace memgraph::storage {

View File

@ -28,6 +28,8 @@
#include "kvstore/kvstore.hpp"
#include "spdlog/spdlog.h"
#include "storage/v2/constraints/unique_constraints.hpp"
#include "storage/v2/disk/label_index.hpp"
#include "storage/v2/disk/label_property_index.hpp"
#include "storage/v2/disk/rocksdb_storage.hpp"
#include "storage/v2/disk/storage.hpp"
#include "storage/v2/disk/unique_constraints.hpp"

View File

@ -27,7 +27,9 @@
#include "storage/v2/durability/paths.hpp"
#include "storage/v2/durability/snapshot.hpp"
#include "storage/v2/durability/wal.hpp"
#include "storage/v2/indices/label_property_index.hpp"
#include "storage/v2/inmemory/label_index.hpp"
#include "storage/v2/inmemory/label_property_index.hpp"
#include "storage/v2/inmemory/unique_constraints.hpp"
#include "utils/event_histogram.hpp"
#include "utils/logging.hpp"
#include "utils/memory_tracker.hpp"

View File

@ -0,0 +1,28 @@
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#pragma once
namespace memgraph::storage::durability {
/// Enum used to indicate a global database operation that isn't transactional.
/// The values are written out explicitly; they match the implicit numbering
/// that the declaration order produces.
enum class StorageGlobalOperation {
  // Index operations.
  LABEL_INDEX_CREATE = 0,
  LABEL_INDEX_DROP = 1,
  LABEL_PROPERTY_INDEX_CREATE = 2,
  LABEL_PROPERTY_INDEX_DROP = 3,
  // Constraint operations.
  EXISTENCE_CONSTRAINT_CREATE = 4,
  EXISTENCE_CONSTRAINT_DROP = 5,
  UNIQUE_CONSTRAINT_CREATE = 6,
  UNIQUE_CONSTRAINT_DROP = 7,
};
} // namespace memgraph::storage::durability

View File

@ -1,4 +1,4 @@
// Copyright 2022 Memgraph Ltd.
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -20,6 +20,7 @@
#include "storage/v2/delta.hpp"
#include "storage/v2/durability/metadata.hpp"
#include "storage/v2/durability/serialization.hpp"
#include "storage/v2/durability/storage_global_operation.hpp"
#include "storage/v2/edge.hpp"
#include "storage/v2/id_types.hpp"
#include "storage/v2/name_id_mapper.hpp"
@ -107,18 +108,6 @@ struct WalDeltaData {
bool operator==(const WalDeltaData &a, const WalDeltaData &b);
bool operator!=(const WalDeltaData &a, const WalDeltaData &b);
/// Enum used to indicate a global database operation that isn't transactional.
enum class StorageGlobalOperation {
LABEL_INDEX_CREATE,
LABEL_INDEX_DROP,
LABEL_PROPERTY_INDEX_CREATE,
LABEL_PROPERTY_INDEX_DROP,
EXISTENCE_CONSTRAINT_CREATE,
EXISTENCE_CONSTRAINT_DROP,
UNIQUE_CONSTRAINT_CREATE,
UNIQUE_CONSTRAINT_DROP,
};
constexpr bool IsWalDeltaDataTypeTransactionEnd(const WalDeltaData::Type type) {
switch (type) {
// These delta actions are all found inside transactions so they don't

View File

@ -10,7 +10,10 @@
// licenses/APL.txt.
#include "storage/v2/indices/indices.hpp"
#include "storage/v2/disk/label_index.hpp"
#include "storage/v2/disk/label_property_index.hpp"
#include "storage/v2/inmemory/label_index.hpp"
#include "storage/v2/inmemory/label_property_index.hpp"
namespace memgraph::storage {
@ -35,4 +38,16 @@ void Indices::UpdateOnSetProperty(PropertyId property, const PropertyValue &valu
label_property_index_->UpdateOnSetProperty(property, value, vertex, tx);
}
/// Builds the label and label+property indices that match the requested storage mode.
///
/// @param constraints   Constraints object handed to both index implementations.
/// @param config        Storage configuration handed to both index implementations.
/// @param storage_mode  Either in-memory mode selects the in-memory indices;
///                      any other mode selects the disk indices.
Indices::Indices(Constraints *constraints, const Config &config, StorageMode storage_mode) {
  // The members are initialised directly. The previous immediately-invoked lambda
  // captured `config` by value, which copied the whole Config for no benefit.
  const bool in_memory =
      storage_mode == StorageMode::IN_MEMORY_TRANSACTIONAL || storage_mode == StorageMode::IN_MEMORY_ANALYTICAL;
  if (in_memory) {
    label_index_ = std::make_unique<InMemoryLabelIndex>(this, constraints, config);
    label_property_index_ = std::make_unique<InMemoryLabelPropertyIndex>(this, constraints, config);
  } else {
    label_index_ = std::make_unique<DiskLabelIndex>(this, constraints, config);
    label_property_index_ = std::make_unique<DiskLabelPropertyIndex>(this, constraints, config);
  }
}
} // namespace memgraph::storage

View File

@ -12,28 +12,14 @@
#pragma once
#include <memory>
#include "storage/v2/disk/label_index.hpp"
#include "storage/v2/disk/label_property_index.hpp"
#include "storage/v2/indices/label_index.hpp"
#include "storage/v2/indices/label_property_index.hpp"
#include "storage/v2/inmemory/label_index.hpp"
#include "storage/v2/inmemory/label_property_index.hpp"
#include "storage/v2/storage_mode.hpp"
namespace memgraph::storage {
struct Indices {
Indices(Constraints *constraints, const Config &config, StorageMode storage_mode) {
std::invoke([this, constraints, config, storage_mode]() {
if (storage_mode == StorageMode::IN_MEMORY_TRANSACTIONAL || storage_mode == StorageMode::IN_MEMORY_ANALYTICAL) {
label_index_ = std::make_unique<InMemoryLabelIndex>(this, constraints, config);
label_property_index_ = std::make_unique<InMemoryLabelPropertyIndex>(this, constraints, config);
} else {
label_index_ = std::make_unique<DiskLabelIndex>(this, constraints, config);
label_property_index_ = std::make_unique<DiskLabelPropertyIndex>(this, constraints, config);
}
});
}
Indices(Constraints *constraints, const Config &config, StorageMode storage_mode);
Indices(const Indices &) = delete;
Indices(Indices &&) = delete;

View File

@ -10,7 +10,7 @@
// licenses/APL.txt.
#include "storage/v2/inmemory/label_index.hpp"
#include "storage/v2/inmemory/indices_utils.hpp"
#include "storage/v2/indices/indices_utils.hpp"
namespace memgraph::storage {

View File

@ -10,7 +10,7 @@
// licenses/APL.txt.
#include "storage/v2/inmemory/label_property_index.hpp"
#include "storage/v2/inmemory/indices_utils.hpp"
#include "storage/v2/indices/indices_utils.hpp"
namespace memgraph::storage {

View File

@ -12,6 +12,7 @@
#include "storage/v2/inmemory/replication/replication_client.hpp"
#include "storage/v2/durability/durability.hpp"
#include "storage/v2/inmemory/storage.hpp"
namespace memgraph::storage {
@ -67,212 +68,50 @@ void CurrentWalHandler::AppendBufferData(const uint8_t *buffer, const size_t buf
replication::CurrentWalRes CurrentWalHandler::Finalize() { return stream_.AwaitResponse(); }
////// ReplicationClient Helpers //////
/// Sends every WAL file in `wal_files` over a single WalFilesRpc stream.
///
/// @param client     RPC client used to open the stream.
/// @param wal_files  Paths of the WAL files to send; must not be empty (asserted).
/// @return The response awaited on the stream after all files were written.
replication::WalFilesRes TransferWalFiles(rpc::Client &client, const std::vector<std::filesystem::path> &wal_files) {
  MG_ASSERT(!wal_files.empty(), "Wal files list is empty!");

  // The number of files is passed to the RPC before any file content is written.
  auto rpc_stream{client.Stream<replication::WalFilesRpc>(wal_files.size())};
  replication::Encoder file_encoder(rpc_stream.GetBuilder());

  for (const auto &wal_path : wal_files) {
    spdlog::debug("Sending wal file: {}", wal_path);
    file_encoder.WriteFile(wal_path);
  }

  return rpc_stream.AwaitResponse();
}
/// Sends the snapshot file at `path` over a SnapshotRpc stream.
///
/// @param client  RPC client used to open the stream.
/// @param path    Path of the snapshot file to send.
/// @return The response awaited on the stream after the file was written.
replication::SnapshotRes TransferSnapshot(rpc::Client &client, const std::filesystem::path &path) {
  auto rpc_stream{client.Stream<replication::SnapshotRpc>()};

  replication::Encoder file_encoder(rpc_stream.GetBuilder());
  file_encoder.WriteFile(path);

  return rpc_stream.AwaitResponse();
}
/// Streams the given WAL file through `stream`: the file name, the total size,
/// the bytes read from the file on disk and finally the buffer reported by
/// `CurrentFileBuffer()`.
///
/// @param stream    Handler that the name, size and data are appended to.
/// @param wal_file  WAL file to send; opening its path must succeed (asserted).
/// @return The `current_commit_timestamp` from the response to the finalized stream.
uint64_t ReplicateCurrentWal(CurrentWalHandler &stream, durability::WalFile const &wal_file) {
  stream.AppendFilename(wal_file.Path().filename());

  utils::InputFile on_disk_part;
  MG_ASSERT(on_disk_part.Open(wal_file.Path()), "Failed to open current WAL file!");

  const auto [pending_data, pending_size] = wal_file.CurrentFileBuffer();

  // The announced size covers both the on-disk part and the buffered part.
  const auto total_size = on_disk_part.GetSize() + pending_size;
  stream.AppendSize(total_size);
  stream.AppendFileData(&on_disk_part);
  stream.AppendBufferData(pending_data, pending_size);

  return stream.Finalize().current_commit_timestamp;
}
////// ReplicationClient //////
InMemoryReplicationClient::InMemoryReplicationClient(InMemoryStorage *storage, std::string name,
io::network::Endpoint endpoint, replication::ReplicationMode mode,
const replication::ReplicationClientConfig &config)
: ReplicationClient{std::move(name), std::move(endpoint), mode, config}, storage_{storage} {}
: ReplicationClient{storage, std::move(name), std::move(endpoint), mode, config} {}
void InMemoryReplicationClient::TryInitializeClientAsync() {
thread_pool_.AddTask([this] {
rpc_client_.Abort();
this->TryInitializeClientSync();
});
}
/// Sends a FrequentHeartbeatRpc and updates the replica state from the outcome.
void InMemoryReplicationClient::FrequentCheck() {
  // A failed RPC counts the same as an unsuccessful response.
  bool heartbeat_ok = false;
  try {
    auto heartbeat{rpc_client_.Stream<replication::FrequentHeartbeatRpc>()};
    const auto reply = heartbeat.AwaitResponse();
    heartbeat_ok = reply.success;
  } catch (const rpc::RpcFailedException &) {
    heartbeat_ok = false;
  }

  // States: READY, REPLICATING, RECOVERY, INVALID
  // If success && ready, replicating, recovery -> stay the same because something good is going on.
  // If success && INVALID -> [it's possible that replica came back to life] -> TryInitializeClient.
  // If fail -> [replica is not reachable at all] -> INVALID state.
  // NOTE: TryInitializeClient might return nothing if there is a branching point.
  // NOTE: The early return pattern simplified the code, but the behavior should be as explained.
  if (!heartbeat_ok) {
    replica_state_.store(replication::ReplicaState::INVALID);
    return;
  }

  const bool was_invalid = replica_state_.load() == replication::ReplicaState::INVALID;
  if (was_invalid) {
    TryInitializeClientAsync();
  }
}
/// @throws rpc::RpcFailedException
/// Logs the endpoint, runs the synchronous initialization and, when a positive
/// check frequency is configured, starts the periodic "Replica Checker".
void InMemoryReplicationClient::Start() {
  auto const &target = rpc_client_.Endpoint();
  spdlog::trace("Replication client started at: {}:{}", target.address, target.port);

  TryInitializeClientSync();

  // Help the user to get the most accurate replica state possible.
  const bool periodic_check_enabled = replica_check_frequency_ > std::chrono::seconds(0);
  if (!periodic_check_enabled) {
    return;
  }
  replica_checker_.Run("Replica Checker", replica_check_frequency_, [this] { FrequentCheck(); });
}
/// Sends a HeartbeatRpc and decides from the reply whether the replica is
/// READY, needs RECOVERY, or cannot be used because its history diverged.
///
/// When a branching point is detected only an error is logged; the replica
/// state is not modified by this function in that case.
void InMemoryReplicationClient::InitializeClient() {
uint64_t current_commit_timestamp{kTimestampInitialId};
const auto &main_epoch = storage_->replication_state_.GetEpoch();
// The heartbeat carries main's last commit timestamp and main's epoch id.
auto stream{rpc_client_.Stream<replication::HeartbeatRpc>(storage_->replication_state_.last_commit_timestamp_,
main_epoch.id)};
const auto replica = stream.AwaitResponse();
std::optional<uint64_t> branching_point;
// Only a replica that reports a different epoch AND a non-initial timestamp
// is checked against main's epoch history.
if (replica.epoch_id != main_epoch.id && replica.current_commit_timestamp != kTimestampInitialId) {
auto const &history = storage_->replication_state_.history;
// Search the history from the newest entry backwards for the replica's epoch.
const auto epoch_info_iter = std::find_if(history.crbegin(), history.crend(), [&](const auto &main_epoch_info) {
return main_epoch_info.first == replica.epoch_id;
});
if (epoch_info_iter == history.crend()) {
// The replica's epoch is not in main's history at all.
branching_point = 0;
} else if (epoch_info_iter->second != replica.current_commit_timestamp) {
// The epoch is known, but the timestamp recorded for it differs from the replica's.
branching_point = epoch_info_iter->second;
}
}
if (branching_point) {
spdlog::error(
"You cannot register Replica {} to this Main because at one point "
"Replica {} acted as the Main instance. Both the Main and Replica {} "
"now hold unique data. Please resolve data conflicts and start the "
"replication on a clean instance.",
name_, name_, name_);
return;
}
current_commit_timestamp = replica.current_commit_timestamp;
spdlog::trace("Current timestamp on replica {}: {}", name_, current_commit_timestamp);
spdlog::trace("Current timestamp on main: {}", storage_->replication_state_.last_commit_timestamp_.load());
if (current_commit_timestamp == storage_->replication_state_.last_commit_timestamp_.load()) {
spdlog::debug("Replica '{}' up to date", name_);
std::unique_lock client_guard{client_lock_};
replica_state_.store(replication::ReplicaState::READY);
} else {
spdlog::debug("Replica '{}' is behind", name_);
{
// The lock is released before the recovery task is queued.
std::unique_lock client_guard{client_lock_};
replica_state_.store(replication::ReplicaState::RECOVERY);
}
// `current_commit_timestamp` is copied into the task ([=]).
thread_pool_.AddTask([=, this] { this->RecoverReplica(current_commit_timestamp); });
}
}
/// Runs InitializeClient(); if the RPC fails, marks the replica INVALID under
/// the client lock and logs the connection failure.
void InMemoryReplicationClient::TryInitializeClientSync() {
  try {
    InitializeClient();
  } catch (const rpc::RpcFailedException &) {
    // The lock stays held until the end of this handler, logging included.
    std::unique_lock client_guard{client_lock_};
    replica_state_.store(replication::ReplicaState::INVALID);
    spdlog::error(utils::MessageWithLink("Failed to connect to replica {} at the endpoint {}.", name_,
                                         rpc_client_.Endpoint(), "https://memgr.ph/replication"));
  }
}
void InMemoryReplicationClient::HandleRpcFailure() {
spdlog::error(utils::MessageWithLink("Couldn't replicate data to {}.", name_, "https://memgr.ph/replication"));
TryInitializeClientAsync();
}
/// Prepares the client for replicating one transaction, depending on the
/// current replica state. The whole function runs with `client_lock_` held.
///
/// @param current_wal_seq_num  Passed on to the ReplicaStream that is created
///                             when the replica is READY.
void InMemoryReplicationClient::StartTransactionReplication(const uint64_t current_wal_seq_num) {
std::unique_lock guard(client_lock_);
const auto status = replica_state_.load();
switch (status) {
case replication::ReplicaState::RECOVERY:
// Nothing to do here for a replica that is already recovering; only log it.
spdlog::debug("Replica {} is behind MAIN instance", name_);
return;
case replication::ReplicaState::REPLICATING:
spdlog::debug("Replica {} missed a transaction", name_);
// We missed a transaction because we're still replicating
// the previous transaction so we need to go to RECOVERY
// state to catch up with the missing transaction
// We cannot queue the recovery process here because
// an error can happen while we're replicating the previous
// transaction after which the client should go to
// INVALID state before starting the recovery process
replica_state_.store(replication::ReplicaState::RECOVERY);
return;
case replication::ReplicaState::INVALID:
// HandleRpcFailure() logs the failure and queues an asynchronous re-initialization.
HandleRpcFailure();
return;
case replication::ReplicaState::READY:
// A READY client must not have a stream left over from a previous transaction.
MG_ASSERT(!replica_stream_);
try {
replica_stream_.emplace(
ReplicaStream{this, storage_->replication_state_.last_commit_timestamp_.load(), current_wal_seq_num});
replica_state_.store(replication::ReplicaState::REPLICATING);
} catch (const rpc::RpcFailedException &) {
// Creating the stream failed: mark the replica INVALID and trigger a retry.
replica_state_.store(replication::ReplicaState::INVALID);
HandleRpcFailure();
}
return;
}
}
/// Invokes `callback` with the current replica stream, but only while the
/// client is in the REPLICATING state. An RPC failure raised by the callback
/// marks the replica INVALID and triggers the failure handling.
///
/// @param callback  Function to run on the replica stream.
void InMemoryReplicationClient::IfStreamingTransaction(const std::function<void(ReplicaStream &)> &callback) {
  // We can only check the state because it guarantees to be only
  // valid during a single transaction replication (if the assumption
  // that this and other transaction replication functions can only be
  // called from a one thread stands)
  const bool is_streaming = replica_state_ == replication::ReplicaState::REPLICATING;
  if (!is_streaming) {
    return;
  }

  try {
    callback(*replica_stream_);
  } catch (const rpc::RpcFailedException &) {
    {
      // The lock only covers the state change, not the failure handling.
      std::unique_lock state_guard{client_lock_};
      replica_state_.store(replication::ReplicaState::INVALID);
    }
    HandleRpcFailure();
  }
}
/// Finalizes the replication of the current transaction.
///
/// @return false when the client is not REPLICATING; true when the mode is
///         ASYNC (the finalization is only queued on the thread pool);
///         otherwise the result of the synchronous finalization.
bool InMemoryReplicationClient::FinalizeTransactionReplication() {
  // We can only check the state because it guarantees to be only
  // valid during a single transaction replication (if the assumption
  // that this and other transaction replication functions can only be
  // called from a one thread stands)
  if (replica_state_ != replication::ReplicaState::REPLICATING) {
    return false;
  }

  const bool is_async = mode_ == replication::ReplicationMode::ASYNC;
  if (!is_async) {
    return FinalizeTransactionReplicationInternal();
  }

  // In ASYNC mode the result of the finalization is deliberately discarded.
  thread_pool_.AddTask([this] { static_cast<void>(this->FinalizeTransactionReplicationInternal()); });
  return true;
}
bool InMemoryReplicationClient::FinalizeTransactionReplicationInternal() {
MG_ASSERT(replica_stream_, "Missing stream for transaction deltas");
try {
auto response = replica_stream_->Finalize();
replica_stream_.reset();
std::unique_lock client_guard(client_lock_);
if (!response.success || replica_state_ == replication::ReplicaState::RECOVERY) {
replica_state_.store(replication::ReplicaState::RECOVERY);
thread_pool_.AddTask([&, this] { this->RecoverReplica(response.current_commit_timestamp); });
} else {
replica_state_.store(replication::ReplicaState::READY);
return true;
}
} catch (const rpc::RpcFailedException &) {
replica_stream_.reset();
{
std::unique_lock client_guard(client_lock_);
replica_state_.store(replication::ReplicaState::INVALID);
}
HandleRpcFailure();
}
return false;
}
void InMemoryReplicationClient::RecoverReplica(uint64_t replica_commit) {
spdlog::debug("Starting replica recover");
auto *storage = static_cast<InMemoryStorage *>(storage_);
while (true) {
auto file_locker = storage_->file_retainer_.AddLocker();
auto file_locker = storage->file_retainer_.AddLocker();
const auto steps = GetRecoverySteps(replica_commit, &file_locker);
int i = 0;
@ -284,21 +123,22 @@ void InMemoryReplicationClient::RecoverReplica(uint64_t replica_commit) {
using StepType = std::remove_cvref_t<T>;
if constexpr (std::is_same_v<StepType, RecoverySnapshot>) {
spdlog::debug("Sending the latest snapshot file: {}", arg);
auto response = TransferSnapshot(arg);
auto response = TransferSnapshot(rpc_client_, arg);
replica_commit = response.current_commit_timestamp;
} else if constexpr (std::is_same_v<StepType, RecoveryWals>) {
spdlog::debug("Sending the latest wal files");
auto response = TransferWalFiles(arg);
auto response = TransferWalFiles(rpc_client_, arg);
replica_commit = response.current_commit_timestamp;
spdlog::debug("Wal files successfully transferred.");
} else if constexpr (std::is_same_v<StepType, RecoveryCurrentWal>) {
std::unique_lock transaction_guard(storage_->engine_lock_);
if (storage_->wal_file_ && storage_->wal_file_->SequenceNumber() == arg.current_wal_seq_num) {
storage_->wal_file_->DisableFlushing();
std::unique_lock transaction_guard(storage->engine_lock_);
if (storage->wal_file_ && storage->wal_file_->SequenceNumber() == arg.current_wal_seq_num) {
storage->wal_file_->DisableFlushing();
transaction_guard.unlock();
spdlog::debug("Sending current wal file");
replica_commit = ReplicateCurrentWal();
storage_->wal_file_->EnableFlushing();
auto streamHandler = CurrentWalHandler{this};
replica_commit = ReplicateCurrentWal(streamHandler, *storage->wal_file_);
storage->wal_file_->EnableFlushing();
} else {
spdlog::debug("Cannot recover using current wal file");
}
@ -328,27 +168,17 @@ void InMemoryReplicationClient::RecoverReplica(uint64_t replica_commit) {
// and we will go to recovery.
// By adding this lock, we can avoid that, and go to RECOVERY immediately.
std::unique_lock client_guard{client_lock_};
const auto last_commit_timestamp = LastCommitTimestamp();
SPDLOG_INFO("Replica timestamp: {}", replica_commit);
SPDLOG_INFO("Last commit: {}", storage_->replication_state_.last_commit_timestamp_);
if (storage_->replication_state_.last_commit_timestamp_.load() == replica_commit) {
SPDLOG_INFO("Last commit: {}", last_commit_timestamp);
if (last_commit_timestamp == replica_commit) {
replica_state_.store(replication::ReplicaState::READY);
return;
}
}
}
uint64_t InMemoryReplicationClient::ReplicateCurrentWal() {
const auto &wal_file = storage_->wal_file_;
auto stream = CurrentWalHandler{this};
stream.AppendFilename(wal_file->Path().filename());
utils::InputFile file;
MG_ASSERT(file.Open(storage_->wal_file_->Path()), "Failed to open current WAL file!");
const auto [buffer, buffer_size] = wal_file->CurrentFileBuffer();
stream.AppendSize(file.GetSize() + buffer_size);
stream.AppendFileData(&file);
stream.AppendBufferData(buffer, buffer_size);
auto response = stream.Finalize();
return response.current_commit_timestamp;
} /// This method tries to find the optimal path for recoverying a single replica.
/// This method tries to find the optimal path for recovering a single replica.
/// Based on the last commit transferred to replica it tries to update the
/// replica using durability files - WALs and Snapshots. WAL files are much
/// smaller in size as they contain only the Deltas (changes) made during the
@ -375,16 +205,17 @@ std::vector<InMemoryReplicationClient::RecoveryStep> InMemoryReplicationClient::
// This lock is also necessary to force the missed transaction to finish.
std::optional<uint64_t> current_wal_seq_num;
std::optional<uint64_t> current_wal_from_timestamp;
if (std::unique_lock transtacion_guard(storage_->engine_lock_); storage_->wal_file_) {
current_wal_seq_num.emplace(storage_->wal_file_->SequenceNumber());
current_wal_from_timestamp.emplace(storage_->wal_file_->FromTimestamp());
auto *storage = static_cast<InMemoryStorage *>(storage_);
if (std::unique_lock transtacion_guard(storage->engine_lock_); storage->wal_file_) {
current_wal_seq_num.emplace(storage->wal_file_->SequenceNumber());
current_wal_from_timestamp.emplace(storage->wal_file_->FromTimestamp());
}
auto locker_acc = file_locker->Access();
auto wal_files = durability::GetWalFiles(storage_->wal_directory_, storage_->uuid_, current_wal_seq_num);
auto wal_files = durability::GetWalFiles(storage->wal_directory_, storage->uuid_, current_wal_seq_num);
MG_ASSERT(wal_files, "Wal files could not be loaded");
auto snapshot_files = durability::GetSnapshotFiles(storage_->snapshot_directory_, storage_->uuid_);
auto snapshot_files = durability::GetSnapshotFiles(storage->snapshot_directory_, storage->uuid_);
std::optional<durability::SnapshotDurabilityInfo> latest_snapshot;
if (!snapshot_files.empty()) {
std::sort(snapshot_files.begin(), snapshot_files.end());
@ -513,54 +344,5 @@ std::vector<InMemoryReplicationClient::RecoveryStep> InMemoryReplicationClient::
return recovery_steps;
}
/// Queries the replica with a TimestampRpc and reports its commit timestamp
/// together with the difference to main's last commit timestamp.
///
/// @return A TimestampInfo whose two fields stay 0 when the RPC throws;
///         otherwise they are filled from the response.
TimestampInfo InMemoryReplicationClient::GetTimestampInfo() {
TimestampInfo info;
info.current_timestamp_of_replica = 0;
info.current_number_of_timestamp_behind_master = 0;
try {
auto stream{rpc_client_.Stream<replication::TimestampRpc>()};
const auto response = stream.AwaitResponse();
const auto is_success = response.success;
if (!is_success) {
// NOTE(review): this store is done without `client_lock_`, unlike the one in the
// catch block below, and execution continues to fill `info` from the unsuccessful
// response - confirm both are intended.
replica_state_.store(replication::ReplicaState::INVALID);
HandleRpcFailure();
}
auto main_time_stamp = storage_->replication_state_.last_commit_timestamp_.load();
info.current_timestamp_of_replica = response.current_commit_timestamp;
// Computed as replica minus main.
// NOTE(review): if both operands are unsigned this wraps when the replica is behind - confirm the field's type.
info.current_number_of_timestamp_behind_master = response.current_commit_timestamp - main_time_stamp;
} catch (const rpc::RpcFailedException &) {
{
std::unique_lock client_guard(client_lock_);
replica_state_.store(replication::ReplicaState::INVALID);
}
HandleRpcFailure(); // mutex already unlocked, if the new enqueued task dispatches immediately it probably won't
// block
}
return info;
}
/// Returns the id of the epoch held in the storage's replication state.
std::string const &InMemoryReplicationClient::GetEpochId() const {
  auto const &replication_state = storage_->replication_state_;
  return replication_state.GetEpoch().id;
}
/// Returns the storage this client was constructed with, as a `Storage *`.
auto InMemoryReplicationClient::GetStorage() -> Storage * {
  return storage_;
}
/// Sends the snapshot file at `path` over a SnapshotRpc stream opened on this
/// client's RPC client.
///
/// @param path  Path of the snapshot file.
/// @return The response awaited on the stream after the file was written.
replication::SnapshotRes InMemoryReplicationClient::TransferSnapshot(const std::filesystem::path &path) {
  auto rpc_stream{rpc_client_.Stream<replication::SnapshotRpc>()};

  replication::Encoder file_encoder(rpc_stream.GetBuilder());
  file_encoder.WriteFile(path);

  return rpc_stream.AwaitResponse();
}
/// Sends every WAL file in `wal_files` over a single WalFilesRpc stream opened
/// on this client's RPC client.
///
/// @param wal_files  Paths of the WAL files to send; must not be empty (asserted).
/// @return The response awaited on the stream after all files were written.
replication::WalFilesRes InMemoryReplicationClient::TransferWalFiles(
    const std::vector<std::filesystem::path> &wal_files) {
  MG_ASSERT(!wal_files.empty(), "Wal files list is empty!");

  // The number of files is passed to the RPC before any file content is written.
  auto rpc_stream{rpc_client_.Stream<replication::WalFilesRpc>(wal_files.size())};
  replication::Encoder file_encoder(rpc_stream.GetBuilder());

  for (const auto &wal_path : wal_files) {
    spdlog::debug("Sending wal file: {}", wal_path);
    file_encoder.WriteFile(wal_path);
  }

  return rpc_stream.AwaitResponse();
}
} // namespace memgraph::storage

View File

@ -10,42 +10,21 @@
// licenses/APL.txt.
#pragma once
#include "storage/v2/inmemory/storage.hpp"
#include "storage/v2/replication/replication_client.hpp"
namespace memgraph::storage {
class InMemoryStorage;
class InMemoryReplicationClient : public ReplicationClient {
public:
InMemoryReplicationClient(InMemoryStorage *storage, std::string name, io::network::Endpoint endpoint,
replication::ReplicationMode mode, const replication::ReplicationClientConfig &config = {});
void StartTransactionReplication(uint64_t current_wal_seq_num) override;
// Replication clients can be removed at any point, so to avoid the
// complexity of checking whether the client was removed whenever we want to
// send part of a transaction, and to avoid adding GC logic, this function
// runs the callback only if a stream was created by a previous call to
// StartTransactionReplication.
void IfStreamingTransaction(const std::function<void(ReplicaStream &)> &callback) override;
auto GetEpochId() const -> std::string const & override;
auto GetStorage() -> Storage * override;
void Start() override;
// Return whether the transaction could be finalized on the replication client or not.
[[nodiscard]] bool FinalizeTransactionReplication() override;
TimestampInfo GetTimestampInfo() override;
private:
void TryInitializeClientAsync();
void FrequentCheck();
void InitializeClient();
void TryInitializeClientSync();
void HandleRpcFailure();
[[nodiscard]] bool FinalizeTransactionReplicationInternal();
void RecoverReplica(uint64_t replica_commit);
uint64_t ReplicateCurrentWal();
protected:
void RecoverReplica(uint64_t replica_commit) override;
// TODO: move the GetRecoverySteps stuff below as an internal detail
using RecoverySnapshot = std::filesystem::path;
using RecoveryWals = std::vector<std::filesystem::path>;
struct RecoveryCurrentWal {
@ -54,15 +33,6 @@ class InMemoryReplicationClient : public ReplicationClient {
};
using RecoveryStep = std::variant<RecoverySnapshot, RecoveryWals, RecoveryCurrentWal>;
std::vector<RecoveryStep> GetRecoverySteps(uint64_t replica_commit, utils::FileRetainer::FileLocker *file_locker);
// Transfer the snapshot file.
// @param path Path of the snapshot file.
replication::SnapshotRes TransferSnapshot(const std::filesystem::path &path);
// Transfer the WAL files
replication::WalFilesRes TransferWalFiles(const std::vector<std::filesystem::path> &wal_files);
InMemoryStorage *storage_;
};
} // namespace memgraph::storage

View File

@ -14,6 +14,7 @@
#include "storage/v2/durability/snapshot.hpp"
#include "storage/v2/durability/version.hpp"
#include "storage/v2/inmemory/storage.hpp"
#include "storage/v2/inmemory/unique_constraints.hpp"
namespace memgraph::storage {
namespace {

View File

@ -10,19 +10,13 @@
// licenses/APL.txt.
#include "storage/v2/inmemory/storage.hpp"
#include "storage/v2/constraints/constraints.hpp"
#include "storage/v2/durability/durability.hpp"
#include "storage/v2/durability/snapshot.hpp"
#include "storage/v2/durability/wal.hpp"
#include "storage/v2/edge_accessor.hpp"
#include "storage/v2/edge_direction.hpp"
#include "storage/v2/storage_mode.hpp"
#include "storage/v2/vertex_accessor.hpp"
#include "utils/stat.hpp"
/// REPLICATION ///
#include "storage/v2/inmemory/replication/replication_client.hpp"
#include "storage/v2/inmemory/replication/replication_server.hpp"
#include "storage/v2/inmemory/unique_constraints.hpp"
namespace memgraph::storage {
@ -34,9 +28,7 @@ InMemoryStorage::InMemoryStorage(Config config)
lock_file_path_(config.durability.storage_directory / durability::kLockFile),
wal_directory_(config.durability.storage_directory / durability::kWalDirectory),
uuid_(utils::GenerateUUID()),
global_locker_(file_retainer_.AddLocker()),
replication_state_(config_.durability.restore_replication_state_on_startup,
config_.durability.storage_directory) {
global_locker_(file_retainer_.AddLocker()) {
if (config_.durability.snapshot_wal_mode != Config::Durability::SnapshotWalMode::DISABLED ||
config_.durability.snapshot_on_exit || config_.durability.recover_on_startup) {
// Create the directory initially to crash the database in case of
@ -1668,8 +1660,8 @@ bool InMemoryStorage::AppendToWalDataDefinition(durability::StorageGlobalOperati
wal_file_->AppendOperation(operation, label, properties, final_commit_timestamp);
FinalizeWalFile();
return replication_state_.AppendToWalDataDefinition(wal_file_->SequenceNumber(), operation, label, properties,
final_commit_timestamp);
return replication_state_.AppendOperation(wal_file_->SequenceNumber(), operation, label, properties,
final_commit_timestamp);
}
utils::BasicResult<InMemoryStorage::CreateSnapshotError> InMemoryStorage::CreateSnapshot(
@ -1780,7 +1772,7 @@ auto InMemoryStorage::CreateReplicationClient(std::string name, io::network::End
replication::ReplicationMode mode,
replication::ReplicationClientConfig const &config)
-> std::unique_ptr<ReplicationClient> {
return std::make_unique<InMemoryReplicationClient>(this, std::move(name), endpoint, mode, config);
return std::make_unique<InMemoryReplicationClient>(this, std::move(name), std::move(endpoint), mode, config);
}
std::unique_ptr<ReplicationServer> InMemoryStorage::CreateReplicationServer(

View File

@ -17,15 +17,13 @@
#include "storage/v2/storage.hpp"
/// REPLICATION ///
#include "rpc/server.hpp"
#include "storage/v2/replication/config.hpp"
#include "storage/v2/replication/enums.hpp"
#include "storage/v2/replication/replication.hpp"
#include "storage/v2/replication/replication_persistence_helper.hpp"
#include "storage/v2/replication/rpc.hpp"
#include "storage/v2/replication/serialization.hpp"
#include "storage/v2/replication/replication.hpp"
namespace memgraph::storage {
// The storage is based on this paper:
@ -36,7 +34,6 @@ namespace memgraph::storage {
class InMemoryStorage final : public Storage {
friend class InMemoryReplicationServer;
friend class InMemoryReplicationClient;
friend class ReplicationClient;
public:
enum class CreateSnapshotError : uint8_t {
@ -363,33 +360,6 @@ class InMemoryStorage final : public Storage {
utils::BasicResult<StorageUniqueConstraintDroppingError, UniqueConstraints::DeletionStatus> DropUniqueConstraint(
LabelId label, const std::set<PropertyId> &properties, std::optional<uint64_t> desired_commit_timestamp) override;
bool SetReplicaRole(io::network::Endpoint endpoint, const replication::ReplicationServerConfig &config) {
return replication_state_.SetReplicaRole(std::move(endpoint), config, this);
}
bool SetMainReplicationRole() { return replication_state_.SetMainReplicationRole(this); }
/// @pre The instance should have a MAIN role
/// @pre Timeout can only be set for SYNC replication
auto RegisterReplica(std::string name, io::network::Endpoint endpoint,
const replication::ReplicationMode replication_mode,
const replication::RegistrationMode registration_mode,
const replication::ReplicationClientConfig &config) {
return replication_state_.RegisterReplica(std::move(name), std::move(endpoint), replication_mode, registration_mode,
config, this);
}
/// @pre The instance should have a MAIN role
bool UnregisterReplica(const std::string &name) { return replication_state_.UnregisterReplica(name); }
replication::ReplicationRole GetReplicationRole() const { return replication_state_.GetRole(); }
auto ReplicasInfo() { return replication_state_.ReplicasInfo(); }
std::optional<replication::ReplicaState> GetReplicaState(std::string_view name) {
return replication_state_.GetReplicaState(name);
}
void FreeMemory(std::unique_lock<utils::RWLock> main_guard) override;
utils::FileRetainer::FileLockerAccessor::ret_type IsPathLocked();
@ -435,10 +405,6 @@ class InMemoryStorage final : public Storage {
uint64_t CommitTimestamp(std::optional<uint64_t> desired_commit_timestamp = {});
void RestoreReplicationRole() { return replication_state_.RestoreReplicationRole(this); }
void RestoreReplicas() { return replication_state_.RestoreReplicas(this); }
void EstablishNewEpoch() override;
// Main object storage
@ -494,8 +460,6 @@ class InMemoryStorage final : public Storage {
// Flags to inform CollectGarbage that it needs to do the more expensive full scans
std::atomic<bool> gc_full_scan_vertices_delete_ = false;
std::atomic<bool> gc_full_scan_edges_delete_ = false;
ReplicationState replication_state_;
};
} // namespace memgraph::storage

View File

@ -13,22 +13,9 @@
#include "storage/v2/constraints/constraints.hpp"
#include "storage/v2/durability/durability.hpp"
#include "storage/v2/durability/snapshot.hpp"
#include "storage/v2/durability/wal.hpp"
#include "storage/v2/edge_accessor.hpp"
#include "storage/v2/edge_direction.hpp"
#include "storage/v2/inmemory/storage.hpp"
#include "storage/v2/storage_mode.hpp"
#include "storage/v2/vertex_accessor.hpp"
#include "utils/stat.hpp"
/// REPLICATION ///
#include "storage/v2/replication/replication_client.hpp"
#include "storage/v2/replication/replication_server.hpp"
#include "storage/v2/replication/rpc.hpp"
#include "storage/v2/storage_error.hpp"
#include "storage/v2/inmemory/replication/replication_client.hpp"
#include "storage/v2/inmemory/replication/replication_server.hpp"
#include "storage/v2/storage.hpp"
namespace memgraph::storage {
@ -97,10 +84,9 @@ bool storage::ReplicationState::SetMainReplicationRole(storage::Storage *storage
return true;
}
bool storage::ReplicationState::AppendToWalDataDefinition(const uint64_t seq_num,
durability::StorageGlobalOperation operation, LabelId label,
const std::set<PropertyId> &properties,
uint64_t final_commit_timestamp) {
bool storage::ReplicationState::AppendOperation(const uint64_t seq_num, durability::StorageGlobalOperation operation,
LabelId label, const std::set<PropertyId> &properties,
uint64_t final_commit_timestamp) {
bool finalized_on_all_replicas = true;
// TODO Should we return true if not MAIN?
if (GetRole() == replication::ReplicationRole::MAIN) {
@ -199,7 +185,7 @@ utils::BasicResult<ReplicationState::RegisterReplicaError> ReplicationState::Reg
}
}
auto client = storage->CreateReplicationClient(std::move(name), endpoint, replication_mode, config);
auto client = storage->CreateReplicationClient(std::move(name), std::move(endpoint), replication_mode, config);
client->Start();
if (client->State() == replication::ReplicaState::INVALID) {

View File

@ -11,20 +11,18 @@
#pragma once
#include "storage/v2/inmemory/label_index.hpp"
#include "storage/v2/inmemory/label_property_index.hpp"
#include "storage/v2/storage.hpp"
#include "kvstore/kvstore.hpp"
#include "storage/v2/durability/storage_global_operation.hpp"
#include "utils/result.hpp"
/// REPLICATION ///
#include "rpc/server.hpp"
#include "storage/v2/replication/config.hpp"
#include "storage/v2/replication/enums.hpp"
#include "storage/v2/replication/global.hpp"
#include "storage/v2/replication/replication_persistence_helper.hpp"
#include "storage/v2/replication/rpc.hpp"
#include "storage/v2/replication/serialization.hpp"
#include "storage/v2/replication/global.hpp"
// TODO use replication namespace
namespace memgraph::storage {
@ -56,8 +54,8 @@ struct ReplicationState {
void RestoreReplicationRole(Storage *storage);
// MAIN actually doing the replication
bool AppendToWalDataDefinition(uint64_t seq_num, durability::StorageGlobalOperation operation, LabelId label,
const std::set<PropertyId> &properties, uint64_t final_commit_timestamp);
bool AppendOperation(uint64_t seq_num, durability::StorageGlobalOperation operation, LabelId label,
const std::set<PropertyId> &properties, uint64_t final_commit_timestamp);
void InitializeTransaction(uint64_t seq_num);
void AppendDelta(const Delta &delta, const Vertex &parent, uint64_t timestamp);
void AppendDelta(const Delta &delta, const Edge &parent, uint64_t timestamp);

View File

@ -15,9 +15,9 @@
#include <type_traits>
#include "storage/v2/durability/durability.hpp"
#include "storage/v2/inmemory/storage.hpp"
#include "storage/v2/replication/config.hpp"
#include "storage/v2/replication/enums.hpp"
#include "storage/v2/storage.hpp"
#include "storage/v2/transaction.hpp"
#include "utils/file_locker.hpp"
#include "utils/logging.hpp"
@ -30,18 +30,258 @@ static auto CreateClientContext(const replication::ReplicationClientConfig &conf
: communication::ClientContext{};
}
ReplicationClient::ReplicationClient(std::string name, memgraph::io::network::Endpoint endpoint,
ReplicationClient::ReplicationClient(Storage *storage, std::string name, memgraph::io::network::Endpoint endpoint,
replication::ReplicationMode mode,
replication::ReplicationClientConfig const &config)
: name_{std::move(name)},
rpc_context_{CreateClientContext(config)},
rpc_client_{std::move(endpoint), &rpc_context_},
replica_check_frequency_{config.replica_check_frequency},
mode_{mode} {}
mode_{mode},
storage_{storage} {}
// Logs which endpoint is being closed and shuts down the client's thread pool.
ReplicationClient::~ReplicationClient() {
  // Bind by reference: the endpoint is only read for logging, so copying it
  // (as a by-value `auto` would) is unnecessary work inside a destructor.
  auto const &endpoint = rpc_client_.Endpoint();
  spdlog::trace("Closing replication client on {}:{}", endpoint.address, endpoint.port);
  thread_pool_.Shutdown();
}
// Loads the last commit timestamp stored in the owning storage's replication state.
uint64_t ReplicationClient::LastCommitTimestamp() const {
  auto const &last_commit = storage_->replication_state_.last_commit_timestamp_;
  return last_commit.load();
}
void ReplicationClient::InitializeClient() {
uint64_t current_commit_timestamp{kTimestampInitialId};
const auto &main_epoch = storage_->replication_state_.GetEpoch();
auto stream{rpc_client_.Stream<replication::HeartbeatRpc>(storage_->replication_state_.last_commit_timestamp_,
main_epoch.id)};
const auto replica = stream.AwaitResponse();
std::optional<uint64_t> branching_point;
if (replica.epoch_id != main_epoch.id && replica.current_commit_timestamp != kTimestampInitialId) {
auto const &history = storage_->replication_state_.history;
const auto epoch_info_iter = std::find_if(history.crbegin(), history.crend(), [&](const auto &main_epoch_info) {
return main_epoch_info.first == replica.epoch_id;
});
if (epoch_info_iter == history.crend()) {
branching_point = 0;
} else if (epoch_info_iter->second != replica.current_commit_timestamp) {
branching_point = epoch_info_iter->second;
}
}
if (branching_point) {
spdlog::error(
"You cannot register Replica {} to this Main because at one point "
"Replica {} acted as the Main instance. Both the Main and Replica {} "
"now hold unique data. Please resolve data conflicts and start the "
"replication on a clean instance.",
name_, name_, name_);
return;
}
current_commit_timestamp = replica.current_commit_timestamp;
spdlog::trace("Current timestamp on replica {}: {}", name_, current_commit_timestamp);
spdlog::trace("Current timestamp on main: {}", storage_->replication_state_.last_commit_timestamp_.load());
if (current_commit_timestamp == storage_->replication_state_.last_commit_timestamp_.load()) {
spdlog::debug("Replica '{}' up to date", name_);
std::unique_lock client_guard{client_lock_};
replica_state_.store(replication::ReplicaState::READY);
} else {
spdlog::debug("Replica '{}' is behind", name_);
{
std::unique_lock client_guard{client_lock_};
replica_state_.store(replication::ReplicaState::RECOVERY);
}
thread_pool_.AddTask([=, this] { this->RecoverReplica(current_commit_timestamp); });
}
}
// Asks the replica for its commit timestamp via TimestampRpc.
//
// Returns a TimestampInfo whose two fields start at 0. On a received response
// they are set to the replica's commit timestamp and to that timestamp minus
// main's last commit timestamp; this happens even when the response reports
// failure, in which case the replica is also marked INVALID and
// HandleRpcFailure is called. If the RPC throws RpcFailedException the fields
// stay 0, the replica is marked INVALID under the client lock and
// HandleRpcFailure is called.
TimestampInfo ReplicationClient::GetTimestampInfo() {
  TimestampInfo result;
  result.current_timestamp_of_replica = 0;
  result.current_number_of_timestamp_behind_master = 0;

  try {
    auto timestamp_stream{rpc_client_.Stream<replication::TimestampRpc>()};
    const auto reply = timestamp_stream.AwaitResponse();
    if (!reply.success) {
      replica_state_.store(replication::ReplicaState::INVALID);
      HandleRpcFailure();
    }
    const auto main_commit = storage_->replication_state_.last_commit_timestamp_.load();
    result.current_timestamp_of_replica = reply.current_commit_timestamp;
    result.current_number_of_timestamp_behind_master = reply.current_commit_timestamp - main_commit;
  } catch (const rpc::RpcFailedException &) {
    {
      std::unique_lock client_guard(client_lock_);
      replica_state_.store(replication::ReplicaState::INVALID);
    }
    // The mutex is already unlocked here, so if the newly enqueued task
    // dispatches immediately it probably won't block.
    HandleRpcFailure();
  }

  return result;
}
void ReplicationClient::HandleRpcFailure() {
spdlog::error(utils::MessageWithLink("Couldn't replicate data to {}.", name_, "https://memgr.ph/replication"));
TryInitializeClientAsync();
}
void ReplicationClient::TryInitializeClientAsync() {
thread_pool_.AddTask([this] {
rpc_client_.Abort();
this->TryInitializeClientSync();
});
}
// Runs InitializeClient on the calling thread. If it throws
// RpcFailedException, the replica is marked INVALID and the connection failure
// is logged, both while holding the client lock.
void ReplicationClient::TryInitializeClientSync() {
  try {
    InitializeClient();
  } catch (const rpc::RpcFailedException &) {
    std::unique_lock client_guard{client_lock_};
    replica_state_.store(replication::ReplicaState::INVALID);
    const auto &replica_endpoint = rpc_client_.Endpoint();
    spdlog::error(utils::MessageWithLink("Failed to connect to replica {} at the endpoint {}.", name_,
                                         replica_endpoint, "https://memgr.ph/replication"));
  }
}
void ReplicationClient::StartTransactionReplication(const uint64_t current_wal_seq_num) {
std::unique_lock guard(client_lock_);
const auto status = replica_state_.load();
switch (status) {
case replication::ReplicaState::RECOVERY:
spdlog::debug("Replica {} is behind MAIN instance", name_);
return;
case replication::ReplicaState::REPLICATING:
spdlog::debug("Replica {} missed a transaction", name_);
// We missed a transaction because we're still replicating
// the previous transaction so we need to go to RECOVERY
// state to catch up with the missing transaction
// We cannot queue the recovery process here because
// an error can happen while we're replicating the previous
// transaction after which the client should go to
// INVALID state before starting the recovery process
replica_state_.store(replication::ReplicaState::RECOVERY);
return;
case replication::ReplicaState::INVALID:
HandleRpcFailure();
return;
case replication::ReplicaState::READY:
MG_ASSERT(!replica_stream_);
try {
replica_stream_.emplace(
ReplicaStream{this, storage_->replication_state_.last_commit_timestamp_.load(), current_wal_seq_num});
replica_state_.store(replication::ReplicaState::REPLICATING);
} catch (const rpc::RpcFailedException &) {
replica_state_.store(replication::ReplicaState::INVALID);
HandleRpcFailure();
}
return;
}
}
// Returns the id of the epoch reported by the storage's replication state.
auto ReplicationClient::GetEpochId() const -> std::string const & {
  auto const &current_epoch = storage_->replication_state_.GetEpoch();
  return current_epoch.id;
}
bool ReplicationClient::FinalizeTransactionReplication() {
// We can only check the state because it guarantees to be only
// valid during a single transaction replication (if the assumption
// that this and other transaction replication functions can only be
// called from a one thread stands)
if (replica_state_ != replication::ReplicaState::REPLICATING) {
return false;
}
auto task = [this]() {
MG_ASSERT(replica_stream_, "Missing stream for transaction deltas");
try {
auto response = replica_stream_->Finalize();
replica_stream_.reset();
std::unique_lock client_guard(client_lock_);
if (!response.success || replica_state_ == replication::ReplicaState::RECOVERY) {
replica_state_.store(replication::ReplicaState::RECOVERY);
thread_pool_.AddTask([&, this] { this->RecoverReplica(response.current_commit_timestamp); });
} else {
replica_state_.store(replication::ReplicaState::READY);
return true;
}
} catch (const rpc::RpcFailedException &) {
replica_stream_.reset();
{
std::unique_lock client_guard(client_lock_);
replica_state_.store(replication::ReplicaState::INVALID);
}
HandleRpcFailure();
}
return false;
};
if (mode_ == replication::ReplicationMode::ASYNC) {
thread_pool_.AddTask([=] { (void)task(); });
return true;
}
return task();
}
// Sends a FrequentHeartbeatRpc to the replica and updates the client state
// based on whether it answered successfully.
void ReplicationClient::FrequentCheck() {
  bool replica_responded = false;
  try {
    auto heartbeat_stream{rpc_client_.Stream<replication::FrequentHeartbeatRpc>()};
    const auto reply = heartbeat_stream.AwaitResponse();
    replica_responded = reply.success;
  } catch (const rpc::RpcFailedException &) {
    replica_responded = false;
  }

  // States: READY, REPLICATING, RECOVERY, INVALID
  // If success && ready, replicating, recovery -> stay the same because something good is going on.
  // If success && INVALID -> [it's possible that replica came back to life] -> TryInitializeClient.
  // If fail -> [replica is not reachable at all] -> INVALID state.
  // NOTE: TryInitializeClient might return nothing if there is a branching point.
  // NOTE: The early return pattern simplified the code, but the behavior should be as explained.
  if (!replica_responded) {
    replica_state_.store(replication::ReplicaState::INVALID);
    return;
  }

  const bool currently_invalid = replica_state_.load() == replication::ReplicaState::INVALID;
  if (currently_invalid) {
    TryInitializeClientAsync();
  }
}
void ReplicationClient::Start() {
auto const &endpoint = rpc_client_.Endpoint();
spdlog::trace("Replication client started at: {}:{}", endpoint.address, endpoint.port);
TryInitializeClientSync();
// Help the user to get the most accurate replica state possible.
if (replica_check_frequency_ > std::chrono::seconds(0)) {
replica_checker_.Run("Replica Checker", replica_check_frequency_, [this] { this->FrequentCheck(); });
}
}
// Invokes `callback` with the current replica stream, but only while the
// client is in REPLICATING state. If the callback throws RpcFailedException,
// the replica is marked INVALID under the client lock and HandleRpcFailure is
// called after the lock is released.
void ReplicationClient::IfStreamingTransaction(const std::function<void(ReplicaStream &)> &callback) {
  // We can only check the state because it guarantees to be only
  // valid during a single transaction replication (if the assumption
  // that this and other transaction replication functions can only be
  // called from a one thread stands)
  const bool is_streaming = replica_state_ == replication::ReplicaState::REPLICATING;
  if (!is_streaming) {
    return;
  }

  try {
    auto &active_stream = *replica_stream_;
    callback(active_stream);
  } catch (const rpc::RpcFailedException &) {
    {
      std::unique_lock client_guard{client_lock_};
      replica_state_.store(replication::ReplicaState::INVALID);
    }
    HandleRpcFailure();
  }
}
////// ReplicaStream //////
@ -50,6 +290,7 @@ ReplicaStream::ReplicaStream(ReplicationClient *self, const uint64_t previous_co
: self_(self),
stream_(self_->rpc_client_.Stream<replication::AppendDeltasRpc>(previous_commit_timestamp, current_seq_num)) {
replication::Encoder encoder{stream_.GetBuilder()};
encoder.WriteString(self_->GetEpochId());
}

View File

@ -12,23 +12,30 @@
#pragma once
#include "rpc/client.hpp"
#include "storage/v2/durability/wal.hpp"
#include "storage/v2/durability/storage_global_operation.hpp"
#include "storage/v2/id_types.hpp"
#include "storage/v2/replication/config.hpp"
#include "storage/v2/replication/enums.hpp"
#include "storage/v2/replication/global.hpp"
#include "storage/v2/replication/rpc.hpp"
#include "storage/v2/vertex.hpp"
#include "utils/file.hpp"
#include "utils/file_locker.hpp"
#include "utils/scheduler.hpp"
#include "utils/thread_pool.hpp"
#include <atomic>
#include <optional>
#include <set>
#include <string>
namespace memgraph::storage {
struct Delta;
struct Vertex;
struct Edge;
class Storage;
class ReplicationClient;
// Handler used for transfering the current transaction.
// Handler used for transferring the current transaction.
class ReplicaStream {
public:
explicit ReplicaStream(ReplicationClient *self, uint64_t previous_commit_timestamp, uint64_t current_seq_num);
@ -59,8 +66,8 @@ class ReplicationClient {
friend class ReplicaStream;
public:
ReplicationClient(std::string name, memgraph::io::network::Endpoint endpoint, replication::ReplicationMode mode,
const replication::ReplicationClientConfig &config);
ReplicationClient(Storage *storage, std::string name, memgraph::io::network::Endpoint endpoint,
replication::ReplicationMode mode, const replication::ReplicationClientConfig &config);
ReplicationClient(ReplicationClient const &) = delete;
ReplicationClient &operator=(ReplicationClient const &) = delete;
@ -69,24 +76,35 @@ class ReplicationClient {
virtual ~ReplicationClient();
virtual void Start() = 0;
const auto &Name() const { return name_; }
auto State() const { return replica_state_.load(); }
auto Mode() const { return mode_; }
auto Mode() const -> replication::ReplicationMode { return mode_; }
auto Name() const -> std::string const & { return name_; }
auto Endpoint() const -> io::network::Endpoint const & { return rpc_client_.Endpoint(); }
auto State() const -> replication::ReplicaState { return replica_state_.load(); }
auto GetTimestampInfo() -> TimestampInfo;
virtual void StartTransactionReplication(uint64_t current_wal_seq_num) = 0;
virtual void IfStreamingTransaction(const std::function<void(ReplicaStream &)> &callback) = 0;
virtual auto GetEpochId() const -> std::string const & = 0; // TODO: make non-virtual once epoch is moved to storage
virtual auto GetStorage() -> Storage * = 0;
[[nodiscard]] virtual bool FinalizeTransactionReplication() = 0;
virtual TimestampInfo GetTimestampInfo() = 0;
void Start();
void StartTransactionReplication(const uint64_t current_wal_seq_num);
// Replication clients can be removed at any point, so to avoid the
// complexity of checking whether the client was removed whenever we want to
// send part of a transaction, and to avoid adding GC logic, this function
// runs the callback only if a stream was created by a previous call to
// StartTransactionReplication.
void IfStreamingTransaction(const std::function<void(ReplicaStream &)> &callback);
// Return whether the transaction could be finalized on the replication client or not.
[[nodiscard]] bool FinalizeTransactionReplication();
protected:
virtual void RecoverReplica(uint64_t replica_commit) = 0;
auto GetStorage() -> Storage * { return storage_; }
auto GetEpochId() const -> std::string const &;
auto LastCommitTimestamp() const -> uint64_t;
void InitializeClient();
void HandleRpcFailure();
void TryInitializeClientAsync();
void TryInitializeClientSync();
void FrequentCheck();
std::string name_;
communication::ClientContext rpc_context_;
rpc::Client rpc_client_;
@ -113,6 +131,7 @@ class ReplicationClient {
std::atomic<replication::ReplicaState> replica_state_{replication::ReplicaState::INVALID};
utils::Scheduler replica_checker_;
Storage *storage_;
};
} // namespace memgraph::storage

View File

@ -51,7 +51,9 @@ Storage::Storage(Config config, StorageMode storage_mode)
storage_mode_(storage_mode),
indices_(&constraints_, config, storage_mode),
constraints_(config, storage_mode),
id_(config.name) {}
id_(config.name),
replication_state_(config_.durability.restore_replication_state_on_startup,
config_.durability.storage_directory) {}
Storage::Accessor::Accessor(Storage *storage, IsolationLevel isolation_level, StorageMode storage_mode)
: storage_(storage),

View File

@ -26,6 +26,7 @@
#include "storage/v2/mvcc.hpp"
#include "storage/v2/replication/config.hpp"
#include "storage/v2/replication/enums.hpp"
#include "storage/v2/replication/replication.hpp"
#include "storage/v2/replication/replication_client.hpp"
#include "storage/v2/replication/replication_server.hpp"
#include "storage/v2/storage_error.hpp"
@ -68,6 +69,9 @@ struct StorageInfo {
};
class Storage {
friend class ReplicationServer;
friend class ReplicationClient;
public:
Storage(Config config, StorageMode storage_mode);
@ -293,6 +297,34 @@ class Storage {
replication::ReplicationServerConfig const &config)
-> std::unique_ptr<ReplicationServer> = 0;
/// REPLICATION
bool SetReplicaRole(io::network::Endpoint endpoint, const replication::ReplicationServerConfig &config) {
return replication_state_.SetReplicaRole(std::move(endpoint), config, this);
}
bool SetMainReplicationRole() { return replication_state_.SetMainReplicationRole(this); }
/// @pre The instance should have a MAIN role
/// @pre Timeout can only be set for SYNC replication
auto RegisterReplica(std::string name, io::network::Endpoint endpoint,
const replication::ReplicationMode replication_mode,
const replication::RegistrationMode registration_mode,
const replication::ReplicationClientConfig &config) {
return replication_state_.RegisterReplica(std::move(name), std::move(endpoint), replication_mode, registration_mode,
config, this);
}
/// @pre The instance should have a MAIN role
bool UnregisterReplica(const std::string &name) { return replication_state_.UnregisterReplica(name); }
replication::ReplicationRole GetReplicationRole() const { return replication_state_.GetRole(); }
auto ReplicasInfo() { return replication_state_.ReplicasInfo(); }
std::optional<replication::ReplicaState> GetReplicaState(std::string_view name) {
return replication_state_.GetReplicaState(name);
}
protected:
void RestoreReplicas() { return replication_state_.RestoreReplicas(this); }
void RestoreReplicationRole() { return replication_state_.RestoreReplicationRole(this); }
public:
// Main storage lock.
// Accessors take a shared lock when starting, so it is possible to block
// creation of new accessors by taking a unique lock. This is used when doing
@ -323,6 +355,9 @@ class Storage {
std::atomic<uint64_t> vertex_id_{0};
std::atomic<uint64_t> edge_id_{0};
const std::string id_; //!< High-level assigned ID
protected:
ReplicationState replication_state_;
};
} // namespace memgraph::storage

View File

@ -15,6 +15,8 @@
#include <gtest/internal/gtest-type-util.h>
#include "disk_test_utils.hpp"
#include "storage/v2/disk/label_index.hpp"
#include "storage/v2/disk/label_property_index.hpp"
#include "storage/v2/disk/storage.hpp"
#include "storage/v2/inmemory/storage.hpp"
#include "storage/v2/property_value.hpp"