Decoupling replication logic from InMemoryStorage (#1169)

This commit is contained in:
andrejtonev 2023-08-22 13:29:25 +02:00 committed by GitHub
parent 476968e2c8
commit 9355e58e73
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 561 additions and 445 deletions

View File

@ -72,7 +72,7 @@ std::optional<std::string> KVStore::Get(const std::string &key) const noexcept {
return value;
}
bool KVStore::Delete(const std::string &key) {
bool KVStore::Delete(std::string_view key) {
auto s = pimpl_->db->Delete(rocksdb::WriteOptions(), key);
return s.ok();
}

View File

@ -1,4 +1,4 @@
// Copyright 2022 Memgraph Ltd.
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -91,7 +91,7 @@ class KVStore final {
* true if the key doesn't exist and underlying storage
* didn't encounter any error.
*/
bool Delete(const std::string &key);
bool Delete(std::string_view key);
/**
* Deletes the keys and corresponding values from storage.

View File

@ -334,6 +334,8 @@ class DiskStorage final : public Storage {
uint64_t CommitTimestamp(std::optional<uint64_t> desired_commit_timestamp = {});
void EstablishNewEpoch() override { throw utils::BasicException("Disk storage mode does not support replication."); }
std::unique_ptr<RocksDBStorage> kvstore_;
std::unique_ptr<kvstore::KVStore> durability_kvstore_;
};

View File

@ -33,15 +33,16 @@ using OOMExceptionEnabler = utils::MemoryTracker::OutOfMemoryExceptionEnabler;
namespace {
inline constexpr uint16_t kEpochHistoryRetention = 1000;
std::string RegisterReplicaErrorToString(InMemoryStorage::RegisterReplicaError error) {
std::string RegisterReplicaErrorToString(InMemoryStorage::ReplicationState::RegisterReplicaError error) {
using enum InMemoryStorage::ReplicationState::RegisterReplicaError;
switch (error) {
case InMemoryStorage::RegisterReplicaError::NAME_EXISTS:
case NAME_EXISTS:
return "NAME_EXISTS";
case InMemoryStorage::RegisterReplicaError::END_POINT_EXISTS:
case END_POINT_EXISTS:
return "END_POINT_EXISTS";
case InMemoryStorage::RegisterReplicaError::CONNECTION_FAILED:
case CONNECTION_FAILED:
return "CONNECTION_FAILED";
case InMemoryStorage::RegisterReplicaError::COULD_NOT_BE_PERSISTED:
case COULD_NOT_BE_PERSISTED:
return "COULD_NOT_BE_PERSISTED";
}
}
@ -153,12 +154,13 @@ InMemoryStorage::InMemoryStorage(Config config)
if (config_.durability.restore_replication_state_on_startup) {
spdlog::info("Replication configuration will be stored and will be automatically restored in case of a crash.");
utils::EnsureDirOrDie(config_.durability.storage_directory / durability::kReplicationDirectory);
storage_ =
// TODO: Move this to replication
replication_state_.durability_ =
std::make_unique<kvstore::KVStore>(config_.durability.storage_directory / durability::kReplicationDirectory);
RestoreReplicationRole();
if (replication_role_ == replication::ReplicationRole::MAIN) {
if (replication_state_.GetRole() == replication::ReplicationRole::MAIN) {
RestoreReplicas();
}
} else {
@ -168,7 +170,7 @@ InMemoryStorage::InMemoryStorage(Config config)
}
if (config_.durability.snapshot_wal_mode == Config::Durability::SnapshotWalMode::DISABLED &&
replication_role_ == replication::ReplicationRole::MAIN) {
replication_state_.GetRole() == replication::ReplicationRole::MAIN) {
spdlog::warn(
"The instance has the MAIN replication role, but durability logs and snapshots are disabled. Please consider "
"enabling durability by using --storage-snapshot-interval-sec and --storage-wal-enabled flags because "
@ -182,8 +184,7 @@ InMemoryStorage::~InMemoryStorage() {
}
{
// Clear replication data
replication_server_.reset();
replication_clients_.WithLock([&](auto &clients) { clients.clear(); });
replication_state_.Reset();
}
if (wal_file_) {
wal_file_->FinalizeWal();
@ -241,7 +242,7 @@ VertexAccessor InMemoryStorage::InMemoryAccessor::CreateVertex() {
return {&*it, &transaction_, &storage_->indices_, &storage_->constraints_, config_};
}
VertexAccessor InMemoryStorage::InMemoryAccessor::CreateVertex(storage::Gid gid) {
VertexAccessor InMemoryStorage::InMemoryAccessor::CreateVertexEx(storage::Gid gid) {
OOMExceptionEnabler oom_exception;
// NOTE: When we update the next `vertex_id_` here we perform a RMW
// (read-modify-write) operation that ISN'T atomic! But, that isn't an issue
@ -448,8 +449,8 @@ Result<EdgeAccessor> InMemoryStorage::InMemoryAccessor::CreateEdge(VertexAccesso
&storage_->constraints_, config_);
}
Result<EdgeAccessor> InMemoryStorage::InMemoryAccessor::CreateEdge(VertexAccessor *from, VertexAccessor *to,
EdgeTypeId edge_type, storage::Gid gid) {
Result<EdgeAccessor> InMemoryStorage::InMemoryAccessor::CreateEdgeEx(VertexAccessor *from, VertexAccessor *to,
EdgeTypeId edge_type, storage::Gid gid) {
OOMExceptionEnabler oom_exception;
MG_ASSERT(from->transaction_ == to->transaction_,
"VertexAccessors must be from the same transaction when creating "
@ -702,7 +703,7 @@ utils::BasicResult<StorageDataManipulationError, void> InMemoryStorage::InMemory
// modifications before they are written to disk.
// Replica can log only the write transaction received from Main
// so the Wal files are consistent
if (mem_storage->replication_role_ == replication::ReplicationRole::MAIN ||
if (mem_storage->replication_state_.GetRole() == replication::ReplicationRole::MAIN ||
desired_commit_timestamp.has_value()) {
could_replicate_all_sync_replicas =
mem_storage->AppendToWalDataManipulation(transaction_, *commit_timestamp_);
@ -718,7 +719,7 @@ utils::BasicResult<StorageDataManipulationError, void> InMemoryStorage::InMemory
transaction_.commit_timestamp->store(*commit_timestamp_, std::memory_order_release);
// Replica can only update the last commit timestamp with
// the commits received from main.
if (mem_storage->replication_role_ == replication::ReplicationRole::MAIN ||
if (mem_storage->replication_state_.GetRole() == replication::ReplicationRole::MAIN ||
desired_commit_timestamp.has_value()) {
// Update the last commit timestamp
mem_storage->last_commit_timestamp_.store(*commit_timestamp_);
@ -1155,7 +1156,7 @@ Transaction InMemoryStorage::CreateTransaction(IsolationLevel isolation_level, S
// of any query on replica to the last commited transaction
// which is timestamp_ as only commit of transaction with writes
// can change the value of it.
if (replication_role_ == replication::ReplicationRole::REPLICA) {
if (replication_state_.GetRole() == replication::ReplicationRole::REPLICA) {
start_timestamp = timestamp_;
} else {
start_timestamp = timestamp_++;
@ -1519,184 +1520,166 @@ bool InMemoryStorage::AppendToWalDataManipulation(const Transaction &transaction
// A single transaction will always be contained in a single WAL file.
auto current_commit_timestamp = transaction.commit_timestamp->load(std::memory_order_acquire);
if (replication_role_.load() == replication::ReplicationRole::MAIN) {
replication_clients_.WithLock([&](auto &clients) {
for (auto &client : clients) {
client->StartTransactionReplication(wal_file_->SequenceNumber());
}
});
}
replication_state_.InitializeTransaction(wal_file_->SequenceNumber());
// Helper lambda that traverses the delta chain on order to find the first
// delta that should be processed and then appends all discovered deltas.
auto find_and_apply_deltas = [&](const auto *delta, const auto &parent, auto filter) {
while (true) {
auto *older = delta->next.load(std::memory_order_acquire);
if (older == nullptr || older->timestamp->load(std::memory_order_acquire) != current_commit_timestamp) break;
delta = older;
}
while (true) {
if (filter(delta->action)) {
wal_file_->AppendDelta(*delta, parent, final_commit_timestamp);
replication_clients_.WithLock([&](auto &clients) {
for (auto &client : clients) {
client->IfStreamingTransaction(
[&](auto &stream) { stream.AppendDelta(*delta, parent, final_commit_timestamp); });
}
});
auto append_deltas = [&](auto callback) {
// Helper lambda that traverses the delta chain on order to find the first
// delta that should be processed and then appends all discovered deltas.
auto find_and_apply_deltas = [&](const auto *delta, const auto &parent, auto filter) {
while (true) {
auto *older = delta->next.load(std::memory_order_acquire);
if (older == nullptr || older->timestamp->load(std::memory_order_acquire) != current_commit_timestamp) break;
delta = older;
}
auto prev = delta->prev.Get();
while (true) {
if (filter(delta->action)) {
callback(*delta, parent, final_commit_timestamp);
}
auto prev = delta->prev.Get();
MG_ASSERT(prev.type != PreviousPtr::Type::NULLPTR, "Invalid pointer!");
if (prev.type != PreviousPtr::Type::DELTA) break;
delta = prev.delta;
}
};
// The deltas are ordered correctly in the `transaction.deltas` buffer, but we
// don't traverse them in that order. That is because for each delta we need
// information about the vertex or edge they belong to and that information
// isn't stored in the deltas themselves. In order to find out information
// about the corresponding vertex or edge it is necessary to traverse the
// delta chain for each delta until a vertex or edge is encountered. This
// operation is very expensive as the chain grows.
// Instead, we traverse the edges until we find a vertex or edge and traverse
// their delta chains. This approach has a drawback because we lose the
// correct order of the operations. Because of that, we need to traverse the
// deltas several times and we have to manually ensure that the stored deltas
// will be ordered correctly.
// 1. Process all Vertex deltas and store all operations that create vertices
// and modify vertex data.
for (const auto &delta : transaction.deltas) {
auto prev = delta.prev.Get();
MG_ASSERT(prev.type != PreviousPtr::Type::NULLPTR, "Invalid pointer!");
if (prev.type != PreviousPtr::Type::DELTA) break;
delta = prev.delta;
if (prev.type != PreviousPtr::Type::VERTEX) continue;
find_and_apply_deltas(&delta, *prev.vertex, [](auto action) {
switch (action) {
case Delta::Action::DELETE_DESERIALIZED_OBJECT:
case Delta::Action::DELETE_OBJECT:
case Delta::Action::SET_PROPERTY:
case Delta::Action::ADD_LABEL:
case Delta::Action::REMOVE_LABEL:
return true;
case Delta::Action::RECREATE_OBJECT:
case Delta::Action::ADD_IN_EDGE:
case Delta::Action::ADD_OUT_EDGE:
case Delta::Action::REMOVE_IN_EDGE:
case Delta::Action::REMOVE_OUT_EDGE:
return false;
}
});
}
// 2. Process all Vertex deltas and store all operations that create edges.
for (const auto &delta : transaction.deltas) {
auto prev = delta.prev.Get();
MG_ASSERT(prev.type != PreviousPtr::Type::NULLPTR, "Invalid pointer!");
if (prev.type != PreviousPtr::Type::VERTEX) continue;
find_and_apply_deltas(&delta, *prev.vertex, [](auto action) {
switch (action) {
case Delta::Action::REMOVE_OUT_EDGE:
return true;
case Delta::Action::DELETE_DESERIALIZED_OBJECT:
case Delta::Action::DELETE_OBJECT:
case Delta::Action::RECREATE_OBJECT:
case Delta::Action::SET_PROPERTY:
case Delta::Action::ADD_LABEL:
case Delta::Action::REMOVE_LABEL:
case Delta::Action::ADD_IN_EDGE:
case Delta::Action::ADD_OUT_EDGE:
case Delta::Action::REMOVE_IN_EDGE:
return false;
}
});
}
// 3. Process all Edge deltas and store all operations that modify edge data.
for (const auto &delta : transaction.deltas) {
auto prev = delta.prev.Get();
MG_ASSERT(prev.type != PreviousPtr::Type::NULLPTR, "Invalid pointer!");
if (prev.type != PreviousPtr::Type::EDGE) continue;
find_and_apply_deltas(&delta, *prev.edge, [](auto action) {
switch (action) {
case Delta::Action::SET_PROPERTY:
return true;
case Delta::Action::DELETE_DESERIALIZED_OBJECT:
case Delta::Action::DELETE_OBJECT:
case Delta::Action::RECREATE_OBJECT:
case Delta::Action::ADD_LABEL:
case Delta::Action::REMOVE_LABEL:
case Delta::Action::ADD_IN_EDGE:
case Delta::Action::ADD_OUT_EDGE:
case Delta::Action::REMOVE_IN_EDGE:
case Delta::Action::REMOVE_OUT_EDGE:
return false;
}
});
}
// 4. Process all Vertex deltas and store all operations that delete edges.
for (const auto &delta : transaction.deltas) {
auto prev = delta.prev.Get();
MG_ASSERT(prev.type != PreviousPtr::Type::NULLPTR, "Invalid pointer!");
if (prev.type != PreviousPtr::Type::VERTEX) continue;
find_and_apply_deltas(&delta, *prev.vertex, [](auto action) {
switch (action) {
case Delta::Action::ADD_OUT_EDGE:
return true;
case Delta::Action::DELETE_DESERIALIZED_OBJECT:
case Delta::Action::DELETE_OBJECT:
case Delta::Action::RECREATE_OBJECT:
case Delta::Action::SET_PROPERTY:
case Delta::Action::ADD_LABEL:
case Delta::Action::REMOVE_LABEL:
case Delta::Action::ADD_IN_EDGE:
case Delta::Action::REMOVE_IN_EDGE:
case Delta::Action::REMOVE_OUT_EDGE:
return false;
}
});
}
// 5. Process all Vertex deltas and store all operations that delete vertices.
for (const auto &delta : transaction.deltas) {
auto prev = delta.prev.Get();
MG_ASSERT(prev.type != PreviousPtr::Type::NULLPTR, "Invalid pointer!");
if (prev.type != PreviousPtr::Type::VERTEX) continue;
find_and_apply_deltas(&delta, *prev.vertex, [](auto action) {
switch (action) {
case Delta::Action::RECREATE_OBJECT:
return true;
case Delta::Action::DELETE_DESERIALIZED_OBJECT:
case Delta::Action::DELETE_OBJECT:
case Delta::Action::SET_PROPERTY:
case Delta::Action::ADD_LABEL:
case Delta::Action::REMOVE_LABEL:
case Delta::Action::ADD_IN_EDGE:
case Delta::Action::ADD_OUT_EDGE:
case Delta::Action::REMOVE_IN_EDGE:
case Delta::Action::REMOVE_OUT_EDGE:
return false;
}
});
}
};
// The deltas are ordered correctly in the `transaction.deltas` buffer, but we
// don't traverse them in that order. That is because for each delta we need
// information about the vertex or edge they belong to and that information
// isn't stored in the deltas themselves. In order to find out information
// about the corresponding vertex or edge it is necessary to traverse the
// delta chain for each delta until a vertex or edge is encountered. This
// operation is very expensive as the chain grows.
// Instead, we traverse the edges until we find a vertex or edge and traverse
// their delta chains. This approach has a drawback because we lose the
// correct order of the operations. Because of that, we need to traverse the
// deltas several times and we have to manually ensure that the stored deltas
// will be ordered correctly.
// 1. Process all Vertex deltas and store all operations that create vertices
// and modify vertex data.
for (const auto &delta : transaction.deltas) {
auto prev = delta.prev.Get();
MG_ASSERT(prev.type != PreviousPtr::Type::NULLPTR, "Invalid pointer!");
if (prev.type != PreviousPtr::Type::VERTEX) continue;
find_and_apply_deltas(&delta, *prev.vertex, [](auto action) {
switch (action) {
case Delta::Action::DELETE_DESERIALIZED_OBJECT:
case Delta::Action::DELETE_OBJECT:
case Delta::Action::SET_PROPERTY:
case Delta::Action::ADD_LABEL:
case Delta::Action::REMOVE_LABEL:
return true;
case Delta::Action::RECREATE_OBJECT:
case Delta::Action::ADD_IN_EDGE:
case Delta::Action::ADD_OUT_EDGE:
case Delta::Action::REMOVE_IN_EDGE:
case Delta::Action::REMOVE_OUT_EDGE:
return false;
}
});
}
// 2. Process all Vertex deltas and store all operations that create edges.
for (const auto &delta : transaction.deltas) {
auto prev = delta.prev.Get();
MG_ASSERT(prev.type != PreviousPtr::Type::NULLPTR, "Invalid pointer!");
if (prev.type != PreviousPtr::Type::VERTEX) continue;
find_and_apply_deltas(&delta, *prev.vertex, [](auto action) {
switch (action) {
case Delta::Action::REMOVE_OUT_EDGE:
return true;
case Delta::Action::DELETE_DESERIALIZED_OBJECT:
case Delta::Action::DELETE_OBJECT:
case Delta::Action::RECREATE_OBJECT:
case Delta::Action::SET_PROPERTY:
case Delta::Action::ADD_LABEL:
case Delta::Action::REMOVE_LABEL:
case Delta::Action::ADD_IN_EDGE:
case Delta::Action::ADD_OUT_EDGE:
case Delta::Action::REMOVE_IN_EDGE:
return false;
}
});
}
// 3. Process all Edge deltas and store all operations that modify edge data.
for (const auto &delta : transaction.deltas) {
auto prev = delta.prev.Get();
MG_ASSERT(prev.type != PreviousPtr::Type::NULLPTR, "Invalid pointer!");
if (prev.type != PreviousPtr::Type::EDGE) continue;
find_and_apply_deltas(&delta, *prev.edge, [](auto action) {
switch (action) {
case Delta::Action::SET_PROPERTY:
return true;
case Delta::Action::DELETE_DESERIALIZED_OBJECT:
case Delta::Action::DELETE_OBJECT:
case Delta::Action::RECREATE_OBJECT:
case Delta::Action::ADD_LABEL:
case Delta::Action::REMOVE_LABEL:
case Delta::Action::ADD_IN_EDGE:
case Delta::Action::ADD_OUT_EDGE:
case Delta::Action::REMOVE_IN_EDGE:
case Delta::Action::REMOVE_OUT_EDGE:
return false;
}
});
}
// 4. Process all Vertex deltas and store all operations that delete edges.
for (const auto &delta : transaction.deltas) {
auto prev = delta.prev.Get();
MG_ASSERT(prev.type != PreviousPtr::Type::NULLPTR, "Invalid pointer!");
if (prev.type != PreviousPtr::Type::VERTEX) continue;
find_and_apply_deltas(&delta, *prev.vertex, [](auto action) {
switch (action) {
case Delta::Action::ADD_OUT_EDGE:
return true;
case Delta::Action::DELETE_DESERIALIZED_OBJECT:
case Delta::Action::DELETE_OBJECT:
case Delta::Action::RECREATE_OBJECT:
case Delta::Action::SET_PROPERTY:
case Delta::Action::ADD_LABEL:
case Delta::Action::REMOVE_LABEL:
case Delta::Action::ADD_IN_EDGE:
case Delta::Action::REMOVE_IN_EDGE:
case Delta::Action::REMOVE_OUT_EDGE:
return false;
}
});
}
// 5. Process all Vertex deltas and store all operations that delete vertices.
for (const auto &delta : transaction.deltas) {
auto prev = delta.prev.Get();
MG_ASSERT(prev.type != PreviousPtr::Type::NULLPTR, "Invalid pointer!");
if (prev.type != PreviousPtr::Type::VERTEX) continue;
find_and_apply_deltas(&delta, *prev.vertex, [](auto action) {
switch (action) {
case Delta::Action::RECREATE_OBJECT:
return true;
case Delta::Action::DELETE_DESERIALIZED_OBJECT:
case Delta::Action::DELETE_OBJECT:
case Delta::Action::SET_PROPERTY:
case Delta::Action::ADD_LABEL:
case Delta::Action::REMOVE_LABEL:
case Delta::Action::ADD_IN_EDGE:
case Delta::Action::ADD_OUT_EDGE:
case Delta::Action::REMOVE_IN_EDGE:
case Delta::Action::REMOVE_OUT_EDGE:
return false;
}
});
}
// Add a delta that indicates that the transaction is fully written to the WAL
// file.
wal_file_->AppendTransactionEnd(final_commit_timestamp);
FinalizeWalFile();
auto finalized_on_all_replicas = true;
replication_clients_.WithLock([&](auto &clients) {
for (auto &client : clients) {
client->IfStreamingTransaction([&](auto &stream) { stream.AppendTransactionEnd(final_commit_timestamp); });
const auto finalized = client->FinalizeTransactionReplication();
if (client->Mode() == replication::ReplicationMode::SYNC) {
finalized_on_all_replicas = finalized && finalized_on_all_replicas;
}
}
append_deltas([&](const Delta &delta, const auto &parent, uint64_t timestamp) {
wal_file_->AppendDelta(delta, parent, timestamp);
replication_state_.AppendDelta(delta, parent, timestamp);
});
return finalized_on_all_replicas;
// Add a delta that indicates that the transaction is fully written to the WAL
// file.replication_clients_.WithLock
wal_file_->AppendTransactionEnd(final_commit_timestamp);
FinalizeWalFile();
return replication_state_.FinalizeTransaction(final_commit_timestamp);
}
bool InMemoryStorage::AppendToWalDataDefinition(durability::StorageGlobalOperation operation, LabelId label,
@ -1706,31 +1689,15 @@ bool InMemoryStorage::AppendToWalDataDefinition(durability::StorageGlobalOperati
return true;
}
auto finalized_on_all_replicas = true;
wal_file_->AppendOperation(operation, label, properties, final_commit_timestamp);
{
if (replication_role_.load() == replication::ReplicationRole::MAIN) {
replication_clients_.WithLock([&](auto &clients) {
for (auto &client : clients) {
client->StartTransactionReplication(wal_file_->SequenceNumber());
client->IfStreamingTransaction(
[&](auto &stream) { stream.AppendOperation(operation, label, properties, final_commit_timestamp); });
const auto finalized = client->FinalizeTransactionReplication();
if (client->Mode() == replication::ReplicationMode::SYNC) {
finalized_on_all_replicas = finalized && finalized_on_all_replicas;
}
}
});
}
}
FinalizeWalFile();
return finalized_on_all_replicas;
return replication_state_.AppendToWalDataDefinition(wal_file_->SequenceNumber(), operation, label, properties,
final_commit_timestamp);
}
utils::BasicResult<InMemoryStorage::CreateSnapshotError> InMemoryStorage::CreateSnapshot(
std::optional<bool> is_periodic) {
if (replication_role_.load() != replication::ReplicationRole::MAIN) {
if (replication_state_.GetRole() != replication::ReplicationRole::MAIN) {
return CreateSnapshotError::DisabledForReplica;
}
@ -1796,41 +1763,54 @@ uint64_t InMemoryStorage::CommitTimestamp(const std::optional<uint64_t> desired_
return *desired_commit_timestamp;
}
bool InMemoryStorage::SetReplicaRole(io::network::Endpoint endpoint,
const replication::ReplicationServerConfig &config) {
// We don't want to restart the server if we're already a REPLICA
if (replication_role_ == replication::ReplicationRole::REPLICA) {
return false;
void InMemoryStorage::EstablishNewEpoch() {
std::unique_lock engine_guard{engine_lock_};
if (wal_file_) {
wal_file_->FinalizeWal();
wal_file_.reset();
}
auto port = endpoint.port; // assigning because we will move the endpoint
replication_server_ = std::make_unique<ReplicationServer>(this, std::move(endpoint), config);
// Generate new epoch id and save the last one to the history.
if (epoch_history_.size() == kEpochHistoryRetention) {
epoch_history_.pop_front();
}
epoch_history_.emplace_back(std::exchange(epoch_id_, utils::GenerateUUID()), last_commit_timestamp_);
}
if (ShouldStoreAndRestoreReplicationState()) {
// Only thing that matters here is the role saved as REPLICA and the listening port
auto data = replication::ReplicationStatusToJSON(
replication::ReplicationStatus{.name = replication::kReservedReplicationRoleName,
.ip_address = "",
.port = port,
.sync_mode = replication::ReplicationMode::SYNC,
.replica_check_frequency = std::chrono::seconds(0),
.ssl = std::nullopt,
.role = replication::ReplicationRole::REPLICA});
utils::FileRetainer::FileLockerAccessor::ret_type InMemoryStorage::IsPathLocked() {
auto locker_accessor = global_locker_.Access();
return locker_accessor.IsPathLocked(config_.durability.storage_directory);
}
if (!storage_->Put(replication::kReservedReplicationRoleName, data.dump())) {
spdlog::error("Error when saving REPLICA replication role in settings.");
return false;
utils::FileRetainer::FileLockerAccessor::ret_type InMemoryStorage::LockPath() {
auto locker_accessor = global_locker_.Access();
return locker_accessor.AddPath(config_.durability.storage_directory);
}
utils::FileRetainer::FileLockerAccessor::ret_type InMemoryStorage::UnlockPath() {
{
auto locker_accessor = global_locker_.Access();
const auto ret = locker_accessor.RemovePath(config_.durability.storage_directory);
if (ret.HasError() || !ret.GetValue()) {
// Exit without cleaning the queue
return ret;
}
}
replication_role_.store(replication::ReplicationRole::REPLICA);
// We use locker accessor in seperate scope so we don't produce deadlock
// after we call clean queue.
file_retainer_.CleanQueue();
return true;
}
bool InMemoryStorage::SetMainReplicationRole() {
void storage::InMemoryStorage::ReplicationState::Reset() {
replication_server_.reset();
replication_clients_.WithLock([&](auto &clients) { clients.clear(); });
}
bool storage::InMemoryStorage::ReplicationState::SetMainReplicationRole(storage::Storage *storage) {
// We don't want to generate new epoch_id and do the
// cleanup if we're already a MAIN
if (replication_role_ == replication::ReplicationRole::MAIN) {
if (GetRole() == replication::ReplicationRole::MAIN) {
return false;
}
@ -1838,20 +1818,7 @@ bool InMemoryStorage::SetMainReplicationRole() {
// This should be always called first so we finalize everything
replication_server_.reset(nullptr);
{
std::unique_lock engine_guard{engine_lock_};
if (wal_file_) {
wal_file_->FinalizeWal();
wal_file_.reset();
}
// Generate new epoch id and save the last one to the history.
if (epoch_history_.size() == kEpochHistoryRetention) {
epoch_history_.pop_front();
}
epoch_history_.emplace_back(std::move(epoch_id_), last_commit_timestamp_);
epoch_id_ = utils::GenerateUUID();
}
storage->EstablishNewEpoch();
if (ShouldStoreAndRestoreReplicationState()) {
// Only thing that matters here is the role saved as MAIN
@ -1864,21 +1831,90 @@ bool InMemoryStorage::SetMainReplicationRole() {
.ssl = std::nullopt,
.role = replication::ReplicationRole::MAIN});
if (!storage_->Put(replication::kReservedReplicationRoleName, data.dump())) {
if (!durability_->Put(replication::kReservedReplicationRoleName, data.dump())) {
spdlog::error("Error when saving MAIN replication role in settings.");
return false;
}
}
replication_role_.store(replication::ReplicationRole::MAIN);
SetRole(replication::ReplicationRole::MAIN);
return true;
}
utils::BasicResult<InMemoryStorage::RegisterReplicaError> InMemoryStorage::RegisterReplica(
std::string name, io::network::Endpoint endpoint, const replication::ReplicationMode replication_mode,
const replication::RegistrationMode registration_mode, const replication::ReplicationClientConfig &config) {
MG_ASSERT(replication_role_.load() == replication::ReplicationRole::MAIN,
"Only main instance can register a replica!");
bool storage::InMemoryStorage::ReplicationState::AppendToWalDataDefinition(const uint64_t seq_num,
durability::StorageGlobalOperation operation,
LabelId label,
const std::set<PropertyId> &properties,
uint64_t final_commit_timestamp) {
bool finalized_on_all_replicas = true;
// TODO Should we return true if not MAIN?
if (GetRole() == replication::ReplicationRole::MAIN) {
replication_clients_.WithLock([&](auto &clients) {
for (auto &client : clients) {
client->StartTransactionReplication(seq_num);
client->IfStreamingTransaction(
[&](auto &stream) { stream.AppendOperation(operation, label, properties, final_commit_timestamp); });
const auto finalized = client->FinalizeTransactionReplication();
if (client->Mode() == replication::ReplicationMode::SYNC) {
finalized_on_all_replicas = finalized && finalized_on_all_replicas;
}
}
});
}
return finalized_on_all_replicas;
}
void storage::InMemoryStorage::ReplicationState::InitializeTransaction(uint64_t seq_num) {
if (GetRole() == replication::ReplicationRole::MAIN) {
replication_clients_.WithLock([&](auto &clients) {
for (auto &client : clients) {
client->StartTransactionReplication(seq_num);
}
});
}
}
void storage::InMemoryStorage::ReplicationState::AppendDelta(const Delta &delta, const Edge &parent,
uint64_t timestamp) {
replication_clients_.WithLock([&](auto &clients) {
for (auto &client : clients) {
client->IfStreamingTransaction([&](auto &stream) { stream.AppendDelta(delta, parent, timestamp); });
}
});
}
void storage::InMemoryStorage::ReplicationState::AppendDelta(const Delta &delta, const Vertex &parent,
uint64_t timestamp) {
replication_clients_.WithLock([&](auto &clients) {
for (auto &client : clients) {
client->IfStreamingTransaction([&](auto &stream) { stream.AppendDelta(delta, parent, timestamp); });
}
});
}
bool storage::InMemoryStorage::ReplicationState::FinalizeTransaction(uint64_t timestamp) {
bool finalized_on_all_replicas = true;
replication_clients_.WithLock([&](auto &clients) {
for (auto &client : clients) {
client->IfStreamingTransaction([&](auto &stream) { stream.AppendTransactionEnd(timestamp); });
const auto finalized = client->FinalizeTransactionReplication();
if (client->Mode() == replication::ReplicationMode::SYNC) {
finalized_on_all_replicas = finalized && finalized_on_all_replicas;
}
}
});
return finalized_on_all_replicas;
}
utils::BasicResult<InMemoryStorage::ReplicationState::RegisterReplicaError>
InMemoryStorage::ReplicationState::RegisterReplica(std::string name, io::network::Endpoint endpoint,
const replication::ReplicationMode replication_mode,
const replication::RegistrationMode registration_mode,
const replication::ReplicationClientConfig &config,
InMemoryStorage *storage) {
MG_ASSERT(GetRole() == replication::ReplicationRole::MAIN, "Only main instance can register a replica!");
const bool name_exists = replication_clients_.WithLock([&](auto &clients) {
return std::any_of(clients.begin(), clients.end(), [&name](const auto &client) { return client->Name() == name; });
@ -1906,13 +1942,13 @@ utils::BasicResult<InMemoryStorage::RegisterReplicaError> InMemoryStorage::Regis
.replica_check_frequency = config.replica_check_frequency,
.ssl = config.ssl,
.role = replication::ReplicationRole::REPLICA});
if (!storage_->Put(name, data.dump())) {
if (!durability_->Put(name, data.dump())) {
spdlog::error("Error when saving replica {} in settings.", name);
return RegisterReplicaError::COULD_NOT_BE_PERSISTED;
}
}
auto client = std::make_unique<ReplicationClient>(std::move(name), this, endpoint, replication_mode, config);
auto client = std::make_unique<ReplicationClient>(std::move(name), storage, endpoint, replication_mode, config);
if (client->State() == replication::ReplicaState::INVALID) {
if (replication::RegistrationMode::CAN_BE_INVALID != registration_mode) {
@ -1922,29 +1958,62 @@ utils::BasicResult<InMemoryStorage::RegisterReplicaError> InMemoryStorage::Regis
spdlog::warn("Connection failed when registering replica {}. Replica will still be registered.", client->Name());
}
return replication_clients_.WithLock([&](auto &clients) -> utils::BasicResult<InMemoryStorage::RegisterReplicaError> {
// Another thread could have added a client with same name while
// we were connecting to this client.
if (std::any_of(clients.begin(), clients.end(),
[&](const auto &other_client) { return client->Name() == other_client->Name(); })) {
return RegisterReplicaError::NAME_EXISTS;
}
return replication_clients_.WithLock(
[&](auto &clients) -> utils::BasicResult<InMemoryStorage::ReplicationState::RegisterReplicaError> {
// Another thread could have added a client with same name while
// we were connecting to this client.
if (std::any_of(clients.begin(), clients.end(),
[&](const auto &other_client) { return client->Name() == other_client->Name(); })) {
return RegisterReplicaError::NAME_EXISTS;
}
if (std::any_of(clients.begin(), clients.end(),
[&client](const auto &other_client) { return client->Endpoint() == other_client->Endpoint(); })) {
return RegisterReplicaError::END_POINT_EXISTS;
}
if (std::any_of(clients.begin(), clients.end(), [&client](const auto &other_client) {
return client->Endpoint() == other_client->Endpoint();
})) {
return RegisterReplicaError::END_POINT_EXISTS;
}
clients.push_back(std::move(client));
return {};
});
clients.push_back(std::move(client));
return {};
});
}
bool InMemoryStorage::UnregisterReplica(const std::string &name) {
MG_ASSERT(replication_role_.load() == replication::ReplicationRole::MAIN,
"Only main instance can unregister a replica!");
bool InMemoryStorage::ReplicationState::SetReplicaRole(io::network::Endpoint endpoint,
const replication::ReplicationServerConfig &config,
InMemoryStorage *storage) {
// We don't want to restart the server if we're already a REPLICA
if (GetRole() == replication::ReplicationRole::REPLICA) {
return false;
}
auto port = endpoint.port; // assigning because we will move the endpoint
replication_server_ = std::make_unique<ReplicationServer>(storage, std::move(endpoint), config);
if (ShouldStoreAndRestoreReplicationState()) {
if (!storage_->Delete(name)) {
// Only thing that matters here is the role saved as REPLICA and the listening port
auto data = replication::ReplicationStatusToJSON(
replication::ReplicationStatus{.name = replication::kReservedReplicationRoleName,
.ip_address = "",
.port = port,
.sync_mode = replication::ReplicationMode::SYNC,
.replica_check_frequency = std::chrono::seconds(0),
.ssl = std::nullopt,
.role = replication::ReplicationRole::REPLICA});
if (!durability_->Put(replication::kReservedReplicationRoleName, data.dump())) {
spdlog::error("Error when saving REPLICA replication role in settings.");
return false;
}
}
SetRole(replication::ReplicationRole::REPLICA);
return true;
}
bool InMemoryStorage::ReplicationState::UnregisterReplica(std::string_view name) {
MG_ASSERT(GetRole() == replication::ReplicationRole::MAIN, "Only main instance can unregister a replica!");
if (ShouldStoreAndRestoreReplicationState()) {
if (!durability_->Delete(name)) {
spdlog::error("Error when removing replica {} from settings.", name);
return false;
}
@ -1955,7 +2024,8 @@ bool InMemoryStorage::UnregisterReplica(const std::string &name) {
});
}
std::optional<replication::ReplicaState> InMemoryStorage::GetReplicaState(const std::string_view name) {
std::optional<replication::ReplicaState> InMemoryStorage::ReplicationState::GetReplicaState(
const std::string_view name) {
return replication_clients_.WithLock([&](auto &clients) -> std::optional<replication::ReplicaState> {
const auto client_it =
std::find_if(clients.cbegin(), clients.cend(), [name](auto &client) { return client->Name() == name; });
@ -1966,9 +2036,7 @@ std::optional<replication::ReplicaState> InMemoryStorage::GetReplicaState(const
});
}
replication::ReplicationRole InMemoryStorage::GetReplicationRole() const { return replication_role_; }
std::vector<InMemoryStorage::ReplicaInfo> InMemoryStorage::ReplicasInfo() {
std::vector<InMemoryStorage::ReplicaInfo> InMemoryStorage::ReplicationState::ReplicasInfo() {
return replication_clients_.WithLock([](auto &clients) {
std::vector<InMemoryStorage::ReplicaInfo> replica_info;
replica_info.reserve(clients.size());
@ -1980,7 +2048,7 @@ std::vector<InMemoryStorage::ReplicaInfo> InMemoryStorage::ReplicasInfo() {
});
}
void InMemoryStorage::RestoreReplicationRole() {
void InMemoryStorage::ReplicationState::RestoreReplicationRole(InMemoryStorage *storage) {
if (!ShouldStoreAndRestoreReplicationState()) {
return;
}
@ -1988,7 +2056,7 @@ void InMemoryStorage::RestoreReplicationRole() {
spdlog::info("Restoring replication role.");
uint16_t port = replication::kDefaultReplicationPort;
const auto replication_data = storage_->Get(replication::kReservedReplicationRoleName);
const auto replication_data = durability_->Get(replication::kReservedReplicationRoleName);
if (!replication_data.has_value()) {
spdlog::debug("Cannot find data needed for restore replication role in persisted metadata.");
return;
@ -2002,29 +2070,29 @@ void InMemoryStorage::RestoreReplicationRole() {
const auto replication_status = *maybe_replication_status;
if (!replication_status.role.has_value()) {
replication_role_.store(replication::ReplicationRole::MAIN);
SetRole(replication::ReplicationRole::MAIN);
} else {
replication_role_.store(*replication_status.role);
SetRole(*replication_status.role);
port = replication_status.port;
}
if (replication_role_ == replication::ReplicationRole::REPLICA) {
if (GetRole() == replication::ReplicationRole::REPLICA) {
io::network::Endpoint endpoint(replication::kDefaultReplicationServerIp, port);
replication_server_ =
std::make_unique<ReplicationServer>(this, std::move(endpoint), replication::ReplicationServerConfig{});
std::make_unique<ReplicationServer>(storage, std::move(endpoint), replication::ReplicationServerConfig{});
}
spdlog::info("Replication role restored to {}.",
replication_role_ == replication::ReplicationRole::MAIN ? "MAIN" : "REPLICA");
GetRole() == replication::ReplicationRole::MAIN ? "MAIN" : "REPLICA");
}
void InMemoryStorage::RestoreReplicas() {
void InMemoryStorage::ReplicationState::RestoreReplicas(InMemoryStorage *storage) {
if (!ShouldStoreAndRestoreReplicationState()) {
return;
}
spdlog::info("Restoring replicas.");
for (const auto &[replica_name, replica_data] : *storage_) {
for (const auto &[replica_name, replica_data] : *durability_) {
spdlog::info("Restoring replica {}.", replica_name);
const auto maybe_replica_status = replication::JSONToReplicationStatus(nlohmann::json::parse(replica_data));
@ -2046,7 +2114,8 @@ void InMemoryStorage::RestoreReplicas() {
{
.replica_check_frequency = replica_status.replica_check_frequency,
.ssl = replica_status.ssl,
});
},
storage);
if (ret.HasError()) {
MG_ASSERT(RegisterReplicaError::CONNECTION_FAILED != ret.GetError());
@ -2056,31 +2125,4 @@ void InMemoryStorage::RestoreReplicas() {
}
}
bool InMemoryStorage::ShouldStoreAndRestoreReplicationState() const { return nullptr != storage_; }
utils::FileRetainer::FileLockerAccessor::ret_type InMemoryStorage::IsPathLocked() {
auto locker_accessor = global_locker_.Access();
return locker_accessor.IsPathLocked(config_.durability.storage_directory);
}
utils::FileRetainer::FileLockerAccessor::ret_type InMemoryStorage::LockPath() {
auto locker_accessor = global_locker_.Access();
return locker_accessor.AddPath(config_.durability.storage_directory);
}
utils::FileRetainer::FileLockerAccessor::ret_type InMemoryStorage::UnlockPath() {
{
auto locker_accessor = global_locker_.Access();
const auto ret = locker_accessor.RemovePath(config_.durability.storage_directory);
if (ret.HasError() || !ret.GetValue()) {
// Exit without cleaning the queue
return ret;
}
}
// We use locker accessor in seperate scope so we don't produce deadlock
// after we call clean queue.
file_retainer_.CleanQueue();
return true;
}
} // namespace memgraph::storage

View File

@ -11,6 +11,7 @@
#pragma once
#include <cstddef>
#include "storage/v2/inmemory/label_index.hpp"
#include "storage/v2/inmemory/label_property_index.hpp"
#include "storage/v2/storage.hpp"
@ -25,20 +26,19 @@
namespace memgraph::storage {
class ReplicationServer;
class ReplicationClient;
// The storage is based on this paper:
// https://db.in.tum.de/~muehlbau/papers/mvcc.pdf
// The paper implements a fully serializable storage, in our implementation we
// only implement snapshot isolation for transactions.
class InMemoryStorage final : public Storage {
public:
enum class RegisterReplicaError : uint8_t {
NAME_EXISTS,
END_POINT_EXISTS,
CONNECTION_FAILED,
COULD_NOT_BE_PERSISTED
};
friend class ReplicationServer;
friend class ReplicationClient;
public:
struct TimestampInfo {
uint64_t current_timestamp_of_replica;
uint64_t current_number_of_timestamp_behind_master;
@ -69,7 +69,7 @@ class InMemoryStorage final : public Storage {
~InMemoryStorage() override;
class InMemoryAccessor final : public Storage::Accessor {
class InMemoryAccessor : public Storage::Accessor {
private:
friend class InMemoryStorage;
@ -268,16 +268,33 @@ class InMemoryStorage final : public Storage {
void FinalizeTransaction() override;
private:
protected:
// TODO Better naming
/// @throw std::bad_alloc
VertexAccessor CreateVertex(storage::Gid gid);
VertexAccessor CreateVertexEx(storage::Gid gid);
/// @throw std::bad_alloc
Result<EdgeAccessor> CreateEdge(VertexAccessor *from, VertexAccessor *to, EdgeTypeId edge_type, storage::Gid gid);
Result<EdgeAccessor> CreateEdgeEx(VertexAccessor *from, VertexAccessor *to, EdgeTypeId edge_type, storage::Gid gid);
Config::Items config_;
};
class ReplicationAccessor final : public InMemoryAccessor {
public:
explicit ReplicationAccessor(InMemoryAccessor &&inmem) : InMemoryAccessor(std::move(inmem)) {}
/// @throw std::bad_alloc
VertexAccessor CreateVertexEx(storage::Gid gid) { return InMemoryAccessor::CreateVertexEx(gid); }
/// @throw std::bad_alloc
Result<EdgeAccessor> CreateEdgeEx(VertexAccessor *from, VertexAccessor *to, EdgeTypeId edge_type,
storage::Gid gid) {
return InMemoryAccessor::CreateEdgeEx(from, to, edge_type, gid);
}
const Transaction &GetTransaction() const { return transaction_; }
Transaction &GetTransaction() { return transaction_; }
};
std::unique_ptr<Storage::Accessor> Access(std::optional<IsolationLevel> override_isolation_level) override {
return std::unique_ptr<InMemoryAccessor>(
new InMemoryAccessor{this, override_isolation_level.value_or(isolation_level_), storage_mode_});
@ -359,24 +376,32 @@ class InMemoryStorage final : public Storage {
utils::BasicResult<StorageUniqueConstraintDroppingError, UniqueConstraints::DeletionStatus> DropUniqueConstraint(
LabelId label, const std::set<PropertyId> &properties, std::optional<uint64_t> desired_commit_timestamp) override;
bool SetReplicaRole(io::network::Endpoint endpoint, const replication::ReplicationServerConfig &config);
bool SetReplicaRole(io::network::Endpoint endpoint, const replication::ReplicationServerConfig &config) {
return replication_state_.SetReplicaRole(std::move(endpoint), config, this);
}
bool SetMainReplicationRole();
bool SetMainReplicationRole() { return replication_state_.SetMainReplicationRole(this); }
/// @pre The instance should have a MAIN role
/// @pre Timeout can only be set for SYNC replication
utils::BasicResult<RegisterReplicaError, void> RegisterReplica(std::string name, io::network::Endpoint endpoint,
replication::ReplicationMode replication_mode,
replication::RegistrationMode registration_mode,
const replication::ReplicationClientConfig &config);
auto RegisterReplica(std::string name, io::network::Endpoint endpoint,
const replication::ReplicationMode replication_mode,
const replication::RegistrationMode registration_mode,
const replication::ReplicationClientConfig &config) {
return replication_state_.RegisterReplica(std::move(name), std::move(endpoint), replication_mode, registration_mode,
config, this);
}
/// @pre The instance should have a MAIN role
bool UnregisterReplica(const std::string &name);
bool UnregisterReplica(const std::string &name) { return replication_state_.UnregisterReplica(name); }
std::optional<replication::ReplicaState> GetReplicaState(std::string_view name);
replication::ReplicationRole GetReplicationRole() const { return replication_state_.GetRole(); }
replication::ReplicationRole GetReplicationRole() const;
auto ReplicasInfo() { return replication_state_.ReplicasInfo(); }
std::vector<ReplicaInfo> ReplicasInfo();
std::optional<replication::ReplicaState> GetReplicaState(std::string_view name) {
return replication_state_.GetReplicaState(name);
}
void FreeMemory(std::unique_lock<utils::RWLock> main_guard) override;
@ -416,11 +441,11 @@ class InMemoryStorage final : public Storage {
uint64_t CommitTimestamp(std::optional<uint64_t> desired_commit_timestamp = {});
void RestoreReplicas();
void RestoreReplicationRole() { return replication_state_.RestoreReplicationRole(this); }
void RestoreReplicationRole();
void RestoreReplicas() { return replication_state_.RestoreReplicas(this); }
bool ShouldStoreAndRestoreReplicationState() const;
void EstablishNewEpoch() override;
// Main object storage
utils::SkipList<storage::Vertex> vertices_;
@ -430,7 +455,6 @@ class InMemoryStorage final : public Storage {
std::filesystem::path snapshot_directory_;
std::filesystem::path lock_file_path_;
utils::OutputFile lock_file_handle_;
std::unique_ptr<kvstore::KVStore> storage_;
std::filesystem::path wal_directory_;
utils::Scheduler snapshot_runner_;
@ -455,7 +479,10 @@ class InMemoryStorage final : public Storage {
// and register it on S1.
// Without using the epoch_id, we don't know that S1 and S2 have completely
// different transactions, we think that the S2 is behind only by 5 commits.
std::string epoch_id_;
std::string epoch_id_; // TODO: Move to replication level
// Questions:
// - storage durability <- databases/*name*/wal and snapshots (where this for epoch_id)
// - multi-tenant durability <- databases/.durability (there is a list of all active tenants)
// History of the previous epoch ids.
// Each value consists of the epoch id along the last commit belonging to that
// epoch.
@ -499,24 +526,80 @@ class InMemoryStorage final : public Storage {
std::atomic<uint64_t> last_commit_timestamp_{kTimestampInitialId};
class ReplicationServer;
std::unique_ptr<ReplicationServer> replication_server_{nullptr};
public:
struct ReplicationState {
enum class RegisterReplicaError : uint8_t {
NAME_EXISTS,
END_POINT_EXISTS,
CONNECTION_FAILED,
COULD_NOT_BE_PERSISTED
};
class ReplicationClient;
// We create ReplicationClient using unique_ptr so we can move
// newly created client into the vector.
// We cannot move the client directly because it contains ThreadPool
// which cannot be moved. Also, the move is necessary because
// we don't want to create the client directly inside the vector
// because that would require the lock on the list putting all
// commits (they iterate list of clients) to halt.
// This way we can initialize client in main thread which means
// that we can immediately notify the user if the initialization
// failed.
using ReplicationClientList = utils::Synchronized<std::vector<std::unique_ptr<ReplicationClient>>, utils::SpinLock>;
ReplicationClientList replication_clients_;
// TODO Move to private (needed for Storage construction)
std::unique_ptr<kvstore::KVStore> durability_;
std::atomic<replication::ReplicationRole> replication_role_{replication::ReplicationRole::MAIN};
// Generic API
void Reset();
// TODO: Just check if server exists -> you are REPLICA
replication::ReplicationRole GetRole() const { return replication_role_.load(); }
bool SetMainReplicationRole(Storage *storage); // Set the instance to MAIN
// TODO: ReplicationServer/Client uses InMemoryStorage* for RPC callbacks
bool SetReplicaRole(io::network::Endpoint endpoint, const replication::ReplicationServerConfig &config,
InMemoryStorage *storage); // Sets the instance to REPLICA
// Generic restoration
void RestoreReplicationRole(InMemoryStorage *storage);
// MAIN actually doing the replication
bool AppendToWalDataDefinition(uint64_t seq_num, durability::StorageGlobalOperation operation, LabelId label,
const std::set<PropertyId> &properties, uint64_t final_commit_timestamp);
void InitializeTransaction(uint64_t seq_num);
void AppendDelta(const Delta &delta, const Vertex &parent, uint64_t timestamp);
void AppendDelta(const Delta &delta, const Edge &parent, uint64_t timestamp);
bool FinalizeTransaction(uint64_t timestamp);
// MAIN connecting to replicas
utils::BasicResult<RegisterReplicaError> RegisterReplica(std::string name, io::network::Endpoint endpoint,
replication::ReplicationMode replication_mode,
replication::RegistrationMode registration_mode,
const replication::ReplicationClientConfig &config,
InMemoryStorage *storage);
bool UnregisterReplica(std::string_view name);
// MAIN reconnecting to replicas
void RestoreReplicas(InMemoryStorage *storage);
// MAIN getting info from replicas
// TODO make into const (problem with SpinLock and WithReadLock)
std::optional<replication::ReplicaState> GetReplicaState(std::string_view name);
std::vector<InMemoryStorage::ReplicaInfo> ReplicasInfo();
private:
bool ShouldStoreAndRestoreReplicationState() const { return nullptr != durability_; }
void SetRole(replication::ReplicationRole role) { return replication_role_.store(role); }
// NOTE: Server is not in MAIN it is in REPLICA
std::unique_ptr<ReplicationServer> replication_server_{nullptr};
// We create ReplicationClient using unique_ptr so we can move
// newly created client into the vector.
// We cannot move the client directly because it contains ThreadPool
// which cannot be moved. Also, the move is necessary because
// we don't want to create the client directly inside the vector
// because that would require the lock on the list putting all
// commits (they iterate list of clients) to halt.
// This way we can initialize client in main thread which means
// that we can immediately notify the user if the initialization
// failed.
using ReplicationClientList = utils::Synchronized<std::vector<std::unique_ptr<ReplicationClient>>, utils::SpinLock>;
ReplicationClientList replication_clients_;
std::atomic<replication::ReplicationRole> replication_role_{replication::ReplicationRole::MAIN};
};
private:
ReplicationState replication_state_;
};
} // namespace memgraph::storage

View File

@ -30,10 +30,9 @@ template <typename>
} // namespace
////// ReplicationClient //////
InMemoryStorage::ReplicationClient::ReplicationClient(std::string name, InMemoryStorage *storage,
const io::network::Endpoint &endpoint,
const replication::ReplicationMode mode,
const replication::ReplicationClientConfig &config)
ReplicationClient::ReplicationClient(std::string name, InMemoryStorage *storage, const io::network::Endpoint &endpoint,
const replication::ReplicationMode mode,
const replication::ReplicationClientConfig &config)
: name_(std::move(name)), storage_(storage), mode_(mode) {
spdlog::trace("Replication client started at: {}:{}", endpoint.address, endpoint.port);
if (config.ssl) {
@ -42,7 +41,6 @@ InMemoryStorage::ReplicationClient::ReplicationClient(std::string name, InMemory
rpc_context_.emplace();
}
rpc_client_.emplace(endpoint, &*rpc_context_);
TryInitializeClientSync();
@ -52,14 +50,14 @@ InMemoryStorage::ReplicationClient::ReplicationClient(std::string name, InMemory
}
}
void InMemoryStorage::ReplicationClient::TryInitializeClientAsync() {
void ReplicationClient::TryInitializeClientAsync() {
thread_pool_.AddTask([this] {
rpc_client_->Abort();
this->TryInitializeClientSync();
});
}
void InMemoryStorage::ReplicationClient::FrequentCheck() {
void ReplicationClient::FrequentCheck() {
const auto is_success = std::invoke([this]() {
try {
auto stream{rpc_client_->Stream<replication::FrequentHeartbeatRpc>()};
@ -85,7 +83,7 @@ void InMemoryStorage::ReplicationClient::FrequentCheck() {
}
/// @throws rpc::RpcFailedException
void InMemoryStorage::ReplicationClient::InitializeClient() {
void ReplicationClient::InitializeClient() {
uint64_t current_commit_timestamp{kTimestampInitialId};
std::optional<std::string> epoch_id;
@ -137,7 +135,7 @@ void InMemoryStorage::ReplicationClient::InitializeClient() {
}
}
void InMemoryStorage::ReplicationClient::TryInitializeClientSync() {
void ReplicationClient::TryInitializeClientSync() {
try {
InitializeClient();
} catch (const rpc::RpcFailedException &) {
@ -148,20 +146,19 @@ void InMemoryStorage::ReplicationClient::TryInitializeClientSync() {
}
}
void InMemoryStorage::ReplicationClient::HandleRpcFailure() {
void ReplicationClient::HandleRpcFailure() {
spdlog::error(utils::MessageWithLink("Couldn't replicate data to {}.", name_, "https://memgr.ph/replication"));
TryInitializeClientAsync();
}
replication::SnapshotRes InMemoryStorage::ReplicationClient::TransferSnapshot(const std::filesystem::path &path) {
replication::SnapshotRes ReplicationClient::TransferSnapshot(const std::filesystem::path &path) {
auto stream{rpc_client_->Stream<replication::SnapshotRpc>()};
replication::Encoder encoder(stream.GetBuilder());
encoder.WriteFile(path);
return stream.AwaitResponse();
}
replication::WalFilesRes InMemoryStorage::ReplicationClient::TransferWalFiles(
const std::vector<std::filesystem::path> &wal_files) {
replication::WalFilesRes ReplicationClient::TransferWalFiles(const std::vector<std::filesystem::path> &wal_files) {
MG_ASSERT(!wal_files.empty(), "Wal files list is empty!");
auto stream{rpc_client_->Stream<replication::WalFilesRpc>(wal_files.size())};
replication::Encoder encoder(stream.GetBuilder());
@ -173,7 +170,7 @@ replication::WalFilesRes InMemoryStorage::ReplicationClient::TransferWalFiles(
return stream.AwaitResponse();
}
void InMemoryStorage::ReplicationClient::StartTransactionReplication(const uint64_t current_wal_seq_num) {
void ReplicationClient::StartTransactionReplication(const uint64_t current_wal_seq_num) {
std::unique_lock guard(client_lock_);
const auto status = replica_state_.load();
switch (status) {
@ -207,8 +204,7 @@ void InMemoryStorage::ReplicationClient::StartTransactionReplication(const uint6
}
}
void InMemoryStorage::ReplicationClient::IfStreamingTransaction(
const std::function<void(ReplicaStream &handler)> &callback) {
void ReplicationClient::IfStreamingTransaction(const std::function<void(ReplicaStream &handler)> &callback) {
// We can only check the state because it guarantees to be only
// valid during a single transaction replication (if the assumption
// that this and other transaction replication functions can only be
@ -228,7 +224,7 @@ void InMemoryStorage::ReplicationClient::IfStreamingTransaction(
}
}
bool InMemoryStorage::ReplicationClient::FinalizeTransactionReplication() {
bool ReplicationClient::FinalizeTransactionReplication() {
// We can only check the state because it guarantees to be only
// valid during a single transaction replication (if the assumption
// that this and other transaction replication functions can only be
@ -245,7 +241,7 @@ bool InMemoryStorage::ReplicationClient::FinalizeTransactionReplication() {
}
}
bool InMemoryStorage::ReplicationClient::FinalizeTransactionReplicationInternal() {
bool ReplicationClient::FinalizeTransactionReplicationInternal() {
MG_ASSERT(replica_stream_, "Missing stream for transaction deltas");
try {
auto response = replica_stream_->Finalize();
@ -269,7 +265,7 @@ bool InMemoryStorage::ReplicationClient::FinalizeTransactionReplicationInternal(
return false;
}
void InMemoryStorage::ReplicationClient::RecoverReplica(uint64_t replica_commit) {
void ReplicationClient::RecoverReplica(uint64_t replica_commit) {
spdlog::debug("Starting replica recover");
while (true) {
auto file_locker = storage_->file_retainer_.AddLocker();
@ -337,7 +333,7 @@ void InMemoryStorage::ReplicationClient::RecoverReplica(uint64_t replica_commit)
}
}
uint64_t InMemoryStorage::ReplicationClient::ReplicateCurrentWal() {
uint64_t ReplicationClient::ReplicateCurrentWal() {
const auto &wal_file = storage_->wal_file_;
auto stream = TransferCurrentWalFile();
stream.AppendFilename(wal_file->Path().filename());
@ -371,7 +367,7 @@ uint64_t InMemoryStorage::ReplicationClient::ReplicateCurrentWal() {
/// recovery steps, so we can safely send it to the replica.
/// We assume that the property of preserving at least 1 WAL before the snapshot
/// is satisfied as we extract the timestamp information from it.
std::vector<InMemoryStorage::ReplicationClient::RecoveryStep> InMemoryStorage::ReplicationClient::GetRecoverySteps(
std::vector<ReplicationClient::RecoveryStep> ReplicationClient::GetRecoverySteps(
const uint64_t replica_commit, utils::FileRetainer::FileLocker *file_locker) {
// First check if we can recover using the current wal file only
// otherwise save the seq_num of the current wal file
@ -517,7 +513,7 @@ std::vector<InMemoryStorage::ReplicationClient::RecoveryStep> InMemoryStorage::R
return recovery_steps;
}
InMemoryStorage::TimestampInfo InMemoryStorage::ReplicationClient::GetTimestampInfo() {
InMemoryStorage::TimestampInfo ReplicationClient::GetTimestampInfo() {
InMemoryStorage::TimestampInfo info;
info.current_timestamp_of_replica = 0;
info.current_number_of_timestamp_behind_master = 0;
@ -546,71 +542,63 @@ InMemoryStorage::TimestampInfo InMemoryStorage::ReplicationClient::GetTimestampI
}
////// ReplicaStream //////
InMemoryStorage::ReplicationClient::ReplicaStream::ReplicaStream(ReplicationClient *self,
const uint64_t previous_commit_timestamp,
const uint64_t current_seq_num)
ReplicationClient::ReplicaStream::ReplicaStream(ReplicationClient *self, const uint64_t previous_commit_timestamp,
const uint64_t current_seq_num)
: self_(self),
stream_(self_->rpc_client_->Stream<replication::AppendDeltasRpc>(previous_commit_timestamp, current_seq_num)) {
replication::Encoder encoder{stream_.GetBuilder()};
encoder.WriteString(self_->storage_->epoch_id_);
}
void InMemoryStorage::ReplicationClient::ReplicaStream::AppendDelta(const Delta &delta, const Vertex &vertex,
uint64_t final_commit_timestamp) {
void ReplicationClient::ReplicaStream::AppendDelta(const Delta &delta, const Vertex &vertex,
uint64_t final_commit_timestamp) {
replication::Encoder encoder(stream_.GetBuilder());
EncodeDelta(&encoder, self_->storage_->name_id_mapper_.get(), self_->storage_->config_.items, delta, vertex,
final_commit_timestamp);
}
void InMemoryStorage::ReplicationClient::ReplicaStream::AppendDelta(const Delta &delta, const Edge &edge,
uint64_t final_commit_timestamp) {
void ReplicationClient::ReplicaStream::AppendDelta(const Delta &delta, const Edge &edge,
uint64_t final_commit_timestamp) {
replication::Encoder encoder(stream_.GetBuilder());
EncodeDelta(&encoder, self_->storage_->name_id_mapper_.get(), delta, edge, final_commit_timestamp);
}
void InMemoryStorage::ReplicationClient::ReplicaStream::AppendTransactionEnd(uint64_t final_commit_timestamp) {
void ReplicationClient::ReplicaStream::AppendTransactionEnd(uint64_t final_commit_timestamp) {
replication::Encoder encoder(stream_.GetBuilder());
EncodeTransactionEnd(&encoder, final_commit_timestamp);
}
void InMemoryStorage::ReplicationClient::ReplicaStream::AppendOperation(durability::StorageGlobalOperation operation,
LabelId label,
const std::set<PropertyId> &properties,
uint64_t timestamp) {
void ReplicationClient::ReplicaStream::AppendOperation(durability::StorageGlobalOperation operation, LabelId label,
const std::set<PropertyId> &properties, uint64_t timestamp) {
replication::Encoder encoder(stream_.GetBuilder());
EncodeOperation(&encoder, self_->storage_->name_id_mapper_.get(), operation, label, properties, timestamp);
}
replication::AppendDeltasRes InMemoryStorage::ReplicationClient::ReplicaStream::Finalize() {
return stream_.AwaitResponse();
}
replication::AppendDeltasRes ReplicationClient::ReplicaStream::Finalize() { return stream_.AwaitResponse(); }
////// CurrentWalHandler //////
InMemoryStorage::ReplicationClient::CurrentWalHandler::CurrentWalHandler(ReplicationClient *self)
ReplicationClient::CurrentWalHandler::CurrentWalHandler(ReplicationClient *self)
: self_(self), stream_(self_->rpc_client_->Stream<replication::CurrentWalRpc>()) {}
void InMemoryStorage::ReplicationClient::CurrentWalHandler::AppendFilename(const std::string &filename) {
void ReplicationClient::CurrentWalHandler::AppendFilename(const std::string &filename) {
replication::Encoder encoder(stream_.GetBuilder());
encoder.WriteString(filename);
}
void InMemoryStorage::ReplicationClient::CurrentWalHandler::AppendSize(const size_t size) {
void ReplicationClient::CurrentWalHandler::AppendSize(const size_t size) {
replication::Encoder encoder(stream_.GetBuilder());
encoder.WriteUint(size);
}
void InMemoryStorage::ReplicationClient::CurrentWalHandler::AppendFileData(utils::InputFile *file) {
void ReplicationClient::CurrentWalHandler::AppendFileData(utils::InputFile *file) {
replication::Encoder encoder(stream_.GetBuilder());
encoder.WriteFileData(file);
}
void InMemoryStorage::ReplicationClient::CurrentWalHandler::AppendBufferData(const uint8_t *buffer,
const size_t buffer_size) {
void ReplicationClient::CurrentWalHandler::AppendBufferData(const uint8_t *buffer, const size_t buffer_size) {
replication::Encoder encoder(stream_.GetBuilder());
encoder.WriteBuffer(buffer, buffer_size);
}
replication::CurrentWalRes InMemoryStorage::ReplicationClient::CurrentWalHandler::Finalize() {
return stream_.AwaitResponse();
}
replication::CurrentWalRes ReplicationClient::CurrentWalHandler::Finalize() { return stream_.AwaitResponse(); }
} // namespace memgraph::storage

View File

@ -37,7 +37,7 @@
namespace memgraph::storage {
class InMemoryStorage::ReplicationClient {
class ReplicationClient {
public:
ReplicationClient(std::string name, InMemoryStorage *storage, const io::network::Endpoint &endpoint,
replication::ReplicationMode mode, const replication::ReplicationClientConfig &config = {});

View File

@ -44,8 +44,8 @@ std::pair<uint64_t, durability::WalDeltaData> ReadDelta(durability::BaseDecoder
};
} // namespace
InMemoryStorage::ReplicationServer::ReplicationServer(InMemoryStorage *storage, io::network::Endpoint endpoint,
const replication::ReplicationServerConfig &config)
ReplicationServer::ReplicationServer(InMemoryStorage *storage, io::network::Endpoint endpoint,
const replication::ReplicationServerConfig &config)
: storage_(storage), endpoint_(endpoint) {
// Create RPC server.
if (config.ssl) {
@ -92,21 +92,21 @@ InMemoryStorage::ReplicationServer::ReplicationServer(InMemoryStorage *storage,
rpc_server_->Start();
}
void InMemoryStorage::ReplicationServer::HeartbeatHandler(slk::Reader *req_reader, slk::Builder *res_builder) {
void ReplicationServer::HeartbeatHandler(slk::Reader *req_reader, slk::Builder *res_builder) {
replication::HeartbeatReq req;
slk::Load(&req, req_reader);
replication::HeartbeatRes res{true, storage_->last_commit_timestamp_.load(), storage_->epoch_id_};
slk::Save(res, res_builder);
}
void InMemoryStorage::ReplicationServer::FrequentHeartbeatHandler(slk::Reader *req_reader, slk::Builder *res_builder) {
void ReplicationServer::FrequentHeartbeatHandler(slk::Reader *req_reader, slk::Builder *res_builder) {
replication::FrequentHeartbeatReq req;
slk::Load(&req, req_reader);
replication::FrequentHeartbeatRes res{true};
slk::Save(res, res_builder);
}
void InMemoryStorage::ReplicationServer::AppendDeltasHandler(slk::Reader *req_reader, slk::Builder *res_builder) {
void ReplicationServer::AppendDeltasHandler(slk::Reader *req_reader, slk::Builder *res_builder) {
replication::AppendDeltasReq req;
slk::Load(&req, req_reader);
@ -155,7 +155,7 @@ void InMemoryStorage::ReplicationServer::AppendDeltasHandler(slk::Reader *req_re
spdlog::debug("Replication recovery from append deltas finished, replica is now up to date!");
}
void InMemoryStorage::ReplicationServer::SnapshotHandler(slk::Reader *req_reader, slk::Builder *res_builder) {
void ReplicationServer::SnapshotHandler(slk::Reader *req_reader, slk::Builder *res_builder) {
replication::SnapshotReq req;
slk::Load(&req, req_reader);
@ -228,7 +228,7 @@ void InMemoryStorage::ReplicationServer::SnapshotHandler(slk::Reader *req_reader
spdlog::debug("Replication recovery from snapshot finished!");
}
void InMemoryStorage::ReplicationServer::WalFilesHandler(slk::Reader *req_reader, slk::Builder *res_builder) {
void ReplicationServer::WalFilesHandler(slk::Reader *req_reader, slk::Builder *res_builder) {
replication::WalFilesReq req;
slk::Load(&req, req_reader);
@ -248,7 +248,7 @@ void InMemoryStorage::ReplicationServer::WalFilesHandler(slk::Reader *req_reader
spdlog::debug("Replication recovery from WAL files ended successfully, replica is now up to date!");
}
void InMemoryStorage::ReplicationServer::CurrentWalHandler(slk::Reader *req_reader, slk::Builder *res_builder) {
void ReplicationServer::CurrentWalHandler(slk::Reader *req_reader, slk::Builder *res_builder) {
replication::CurrentWalReq req;
slk::Load(&req, req_reader);
@ -263,7 +263,7 @@ void InMemoryStorage::ReplicationServer::CurrentWalHandler(slk::Reader *req_read
spdlog::debug("Replication recovery from current WAL ended successfully, replica is now up to date!");
}
void InMemoryStorage::ReplicationServer::LoadWal(replication::Decoder *decoder) {
void ReplicationServer::LoadWal(replication::Decoder *decoder) {
const auto temp_wal_directory = std::filesystem::temp_directory_path() / "memgraph" / durability::kWalDirectory;
utils::EnsureDir(temp_wal_directory);
auto maybe_wal_path = decoder->ReadFile(temp_wal_directory);
@ -308,7 +308,7 @@ void InMemoryStorage::ReplicationServer::LoadWal(replication::Decoder *decoder)
}
}
void InMemoryStorage::ReplicationServer::TimestampHandler(slk::Reader *req_reader, slk::Builder *res_builder) {
void ReplicationServer::TimestampHandler(slk::Reader *req_reader, slk::Builder *res_builder) {
replication::TimestampReq req;
slk::Load(&req, req_reader);
@ -316,30 +316,28 @@ void InMemoryStorage::ReplicationServer::TimestampHandler(slk::Reader *req_reade
slk::Save(res, res_builder);
}
InMemoryStorage::ReplicationServer::~ReplicationServer() {
ReplicationServer::~ReplicationServer() {
if (rpc_server_) {
spdlog::trace("Closing replication server on {}:{}", endpoint_.address, endpoint_.port);
rpc_server_->Shutdown();
rpc_server_->AwaitShutdown();
}
}
uint64_t InMemoryStorage::ReplicationServer::ReadAndApplyDelta(durability::BaseDecoder *decoder) {
uint64_t ReplicationServer::ReadAndApplyDelta(durability::BaseDecoder *decoder) {
auto edge_acc = storage_->edges_.access();
auto vertex_acc = storage_->vertices_.access();
std::optional<std::pair<uint64_t, std::unique_ptr<storage::Storage::Accessor>>> commit_timestamp_and_accessor;
std::optional<std::pair<uint64_t, storage::InMemoryStorage::ReplicationAccessor>> commit_timestamp_and_accessor;
auto get_transaction = [this, &commit_timestamp_and_accessor](uint64_t commit_timestamp) {
if (!commit_timestamp_and_accessor) {
commit_timestamp_and_accessor.emplace(commit_timestamp, storage_->Access(std::optional<IsolationLevel>{}));
auto acc = storage_->Access(std::nullopt);
auto inmem_acc = std::unique_ptr<storage::InMemoryStorage::InMemoryAccessor>(
static_cast<storage::InMemoryStorage::InMemoryAccessor *>(acc.release()));
commit_timestamp_and_accessor.emplace(commit_timestamp, std::move(*inmem_acc));
} else if (commit_timestamp_and_accessor->first != commit_timestamp) {
throw utils::BasicException("Received more than one transaction!");
}
// TODO: Rethink this if we would reuse ReplicationServer for on disk storage.
if (auto *inmemoryAcc =
static_cast<storage::InMemoryStorage::InMemoryAccessor *>(commit_timestamp_and_accessor->second.get())) {
return inmemoryAcc;
}
throw utils::BasicException("Received transaction for not supported storage!");
return &commit_timestamp_and_accessor->second;
};
uint64_t applied_deltas = 0;
@ -362,7 +360,7 @@ uint64_t InMemoryStorage::ReplicationServer::ReadAndApplyDelta(durability::BaseD
case durability::WalDeltaData::Type::VERTEX_CREATE: {
spdlog::trace(" Create vertex {}", delta.vertex_create_delete.gid.AsUint());
auto transaction = get_transaction(timestamp);
transaction->CreateVertex(delta.vertex_create_delete.gid);
transaction->CreateVertexEx(delta.vertex_create_delete.gid);
break;
}
case durability::WalDeltaData::Type::VERTEX_DELETE: {
@ -414,9 +412,9 @@ uint64_t InMemoryStorage::ReplicationServer::ReadAndApplyDelta(durability::BaseD
if (!from_vertex) throw utils::BasicException("Invalid transaction!");
auto to_vertex = transaction->FindVertex(delta.edge_create_delete.to_vertex, storage::View::NEW);
if (!to_vertex) throw utils::BasicException("Invalid transaction!");
auto edge = transaction->CreateEdge(&*from_vertex, &*to_vertex,
transaction->NameToEdgeType(delta.edge_create_delete.edge_type),
delta.edge_create_delete.gid);
auto edge = transaction->CreateEdgeEx(&*from_vertex, &*to_vertex,
transaction->NameToEdgeType(delta.edge_create_delete.edge_type),
delta.edge_create_delete.gid);
if (edge.HasError()) throw utils::BasicException("Invalid transaction!");
break;
}
@ -463,7 +461,7 @@ uint64_t InMemoryStorage::ReplicationServer::ReadAndApplyDelta(durability::BaseD
is_visible = !edge->deleted;
delta = edge->delta;
}
ApplyDeltasForRead(&transaction->transaction_, delta, View::NEW, [&is_visible](const Delta &delta) {
ApplyDeltasForRead(&transaction->GetTransaction(), delta, View::NEW, [&is_visible](const Delta &delta) {
switch (delta.action) {
case Delta::Action::ADD_LABEL:
case Delta::Action::REMOVE_LABEL:
@ -496,7 +494,7 @@ uint64_t InMemoryStorage::ReplicationServer::ReadAndApplyDelta(durability::BaseD
EdgeTypeId::FromUint(0UL),
nullptr,
nullptr,
&transaction->transaction_,
&transaction->GetTransaction(),
&storage_->indices_,
&storage_->constraints_,
storage_->config_.items};
@ -511,7 +509,7 @@ uint64_t InMemoryStorage::ReplicationServer::ReadAndApplyDelta(durability::BaseD
spdlog::trace(" Transaction end");
if (!commit_timestamp_and_accessor || commit_timestamp_and_accessor->first != timestamp)
throw utils::BasicException("Invalid data!");
auto ret = commit_timestamp_and_accessor->second->Commit(commit_timestamp_and_accessor->first);
auto ret = commit_timestamp_and_accessor->second.Commit(commit_timestamp_and_accessor->first);
if (ret.HasError()) throw utils::BasicException("Invalid transaction!");
commit_timestamp_and_accessor = std::nullopt;
break;

View File

@ -18,7 +18,7 @@
namespace memgraph::storage {
class InMemoryStorage::ReplicationServer {
class ReplicationServer {
public:
explicit ReplicationServer(InMemoryStorage *storage, io::network::Endpoint endpoint,
const replication::ReplicationServerConfig &config);

View File

@ -277,6 +277,8 @@ class Storage {
virtual Transaction CreateTransaction(IsolationLevel isolation_level, StorageMode storage_mode) = 0;
virtual void EstablishNewEpoch() = 0;
// Main storage lock.
// Accessors take a shared lock when starting, so it is possible to block
// creation of new accessors by taking a unique lock. This is used when doing

View File

@ -747,12 +747,12 @@ TEST_F(ReplicationTest, ReplicationReplicaWithExistingName) {
.HasError());
const std::string replica2_name{replicas[0]};
ASSERT_TRUE(main_mem_store
->RegisterReplica(replica2_name, replica2_endpoint,
memgraph::storage::replication::ReplicationMode::ASYNC,
memgraph::storage::replication::RegistrationMode::MUST_BE_INSTANTLY_VALID,
memgraph::storage::replication::ReplicationClientConfig{})
.GetError() == memgraph::storage::InMemoryStorage::RegisterReplicaError::NAME_EXISTS);
ASSERT_TRUE(
main_mem_store
->RegisterReplica(replica2_name, replica2_endpoint, memgraph::storage::replication::ReplicationMode::ASYNC,
memgraph::storage::replication::RegistrationMode::MUST_BE_INSTANTLY_VALID,
memgraph::storage::replication::ReplicationClientConfig{})
.GetError() == memgraph::storage::InMemoryStorage::ReplicationState::RegisterReplicaError::NAME_EXISTS);
}
TEST_F(ReplicationTest, ReplicationReplicaWithExistingEndPoint) {
@ -779,12 +779,12 @@ TEST_F(ReplicationTest, ReplicationReplicaWithExistingEndPoint) {
.HasError());
const std::string replica2_name{replicas[1]};
ASSERT_TRUE(main_mem_store
->RegisterReplica(replica2_name, replica2_endpoint,
memgraph::storage::replication::ReplicationMode::ASYNC,
memgraph::storage::replication::RegistrationMode::MUST_BE_INSTANTLY_VALID,
memgraph::storage::replication::ReplicationClientConfig{})
.GetError() == memgraph::storage::InMemoryStorage::RegisterReplicaError::END_POINT_EXISTS);
ASSERT_TRUE(
main_mem_store
->RegisterReplica(replica2_name, replica2_endpoint, memgraph::storage::replication::ReplicationMode::ASYNC,
memgraph::storage::replication::RegistrationMode::MUST_BE_INSTANTLY_VALID,
memgraph::storage::replication::ReplicationClientConfig{})
.GetError() == memgraph::storage::InMemoryStorage::ReplicationState::RegisterReplicaError::END_POINT_EXISTS);
}
TEST_F(ReplicationTest, RestoringReplicationAtStartupAfterDroppingReplica) {
@ -907,5 +907,6 @@ TEST_F(ReplicationTest, AddingInvalidReplica) {
memgraph::storage::replication::ReplicationMode::SYNC,
memgraph::storage::replication::RegistrationMode::MUST_BE_INSTANTLY_VALID,
memgraph::storage::replication::ReplicationClientConfig{})
.GetError() == memgraph::storage::InMemoryStorage::RegisterReplicaError::CONNECTION_FAILED);
.GetError() ==
memgraph::storage::InMemoryStorage::ReplicationState::RegisterReplicaError::CONNECTION_FAILED);
}