Improve replication logging (#1030)
This commit is contained in:
parent
5ce1526995
commit
261aa4f49b
@ -118,7 +118,7 @@ void Storage::ReplicationClient::InitializeClient() {
|
||||
}
|
||||
|
||||
current_commit_timestamp = response.current_commit_timestamp;
|
||||
spdlog::trace("Current timestamp on replica: {}", current_commit_timestamp);
|
||||
spdlog::trace("Current timestamp on replica {}: {}", name_, current_commit_timestamp);
|
||||
spdlog::trace("Current timestamp on main: {}", storage_->last_commit_timestamp_.load());
|
||||
if (current_commit_timestamp == storage_->last_commit_timestamp_.load()) {
|
||||
spdlog::debug("Replica '{}' up to date", name_);
|
||||
|
@ -13,6 +13,7 @@
|
||||
#include <atomic>
|
||||
#include <filesystem>
|
||||
|
||||
#include "spdlog/spdlog.h"
|
||||
#include "storage/v2/durability/durability.hpp"
|
||||
#include "storage/v2/durability/paths.hpp"
|
||||
#include "storage/v2/durability/serialization.hpp"
|
||||
@ -102,6 +103,7 @@ void Storage::ReplicationServer::FrequentHeartbeatHandler(slk::Reader *req_reade
|
||||
}
|
||||
|
||||
void Storage::ReplicationServer::AppendDeltasHandler(slk::Reader *req_reader, slk::Builder *res_builder) {
|
||||
spdlog::debug("Started replication recovery from appending deltas!");
|
||||
replication::AppendDeltasReq req;
|
||||
slk::Load(&req, req_reader);
|
||||
|
||||
@ -120,6 +122,7 @@ void Storage::ReplicationServer::AppendDeltasHandler(slk::Reader *req_reader, sl
|
||||
storage_->wal_file_->FinalizeWal();
|
||||
storage_->wal_file_.reset();
|
||||
storage_->wal_seq_num_ = req.seq_num;
|
||||
spdlog::trace("Finalized WAL file");
|
||||
} else {
|
||||
MG_ASSERT(storage_->wal_file_->SequenceNumber() == req.seq_num, "Invalid sequence number of current wal file");
|
||||
storage_->wal_seq_num_ = req.seq_num + 1;
|
||||
@ -146,9 +149,11 @@ void Storage::ReplicationServer::AppendDeltasHandler(slk::Reader *req_reader, sl
|
||||
|
||||
replication::AppendDeltasRes res{true, storage_->last_commit_timestamp_.load()};
|
||||
slk::Save(res, res_builder);
|
||||
spdlog::debug("Replication recovery from append deltas finished, replica is now up to date!");
|
||||
}
|
||||
|
||||
void Storage::ReplicationServer::SnapshotHandler(slk::Reader *req_reader, slk::Builder *res_builder) {
|
||||
spdlog::debug("Started replication recovery from received snapshot file!");
|
||||
replication::SnapshotReq req;
|
||||
slk::Load(&req, req_reader);
|
||||
|
||||
@ -161,6 +166,7 @@ void Storage::ReplicationServer::SnapshotHandler(slk::Reader *req_reader, slk::B
|
||||
spdlog::info("Received snapshot saved to {}", *maybe_snapshot_path);
|
||||
|
||||
std::unique_lock<utils::RWLock> storage_guard(storage_->main_lock_);
|
||||
spdlog::trace("Clearing database since recovering from snapshot.");
|
||||
// Clear the database
|
||||
storage_->vertices_.clear();
|
||||
storage_->edges_.clear();
|
||||
@ -184,6 +190,7 @@ void Storage::ReplicationServer::SnapshotHandler(slk::Reader *req_reader, slk::B
|
||||
storage_->edge_id_ = recovery_info.next_edge_id;
|
||||
storage_->timestamp_ = std::max(storage_->timestamp_, recovery_info.next_timestamp);
|
||||
|
||||
spdlog::trace("Recovering indices and constraints from snapshot.");
|
||||
durability::RecoverIndicesAndConstraints(recovered_snapshot.indices_constraints, &storage_->indices_,
|
||||
&storage_->constraints_, &storage_->vertices_);
|
||||
} catch (const durability::RecoveryFailure &e) {
|
||||
@ -194,25 +201,31 @@ void Storage::ReplicationServer::SnapshotHandler(slk::Reader *req_reader, slk::B
|
||||
replication::SnapshotRes res{true, storage_->last_commit_timestamp_.load()};
|
||||
slk::Save(res, res_builder);
|
||||
|
||||
spdlog::trace("Deleting old snapshot files due to snapshot recovery.");
|
||||
// Delete other durability files
|
||||
auto snapshot_files = durability::GetSnapshotFiles(storage_->snapshot_directory_, storage_->uuid_);
|
||||
for (const auto &[path, uuid, _] : snapshot_files) {
|
||||
if (path != *maybe_snapshot_path) {
|
||||
spdlog::trace("Deleting snapshot file {}", path);
|
||||
storage_->file_retainer_.DeleteFile(path);
|
||||
}
|
||||
}
|
||||
|
||||
spdlog::trace("Deleting old WAL files due to snapshot recovery.");
|
||||
auto wal_files = durability::GetWalFiles(storage_->wal_directory_, storage_->uuid_);
|
||||
if (wal_files) {
|
||||
for (const auto &wal_file : *wal_files) {
|
||||
spdlog::trace("Deleting WAL file {}", wal_file.path);
|
||||
storage_->file_retainer_.DeleteFile(wal_file.path);
|
||||
}
|
||||
|
||||
storage_->wal_file_.reset();
|
||||
}
|
||||
spdlog::debug("Replication recovery from snapshot finished!");
|
||||
}
|
||||
|
||||
void Storage::ReplicationServer::WalFilesHandler(slk::Reader *req_reader, slk::Builder *res_builder) {
|
||||
spdlog::debug("Started replication recovery from received WAL files!");
|
||||
replication::WalFilesReq req;
|
||||
slk::Load(&req, req_reader);
|
||||
|
||||
@ -229,9 +242,11 @@ void Storage::ReplicationServer::WalFilesHandler(slk::Reader *req_reader, slk::B
|
||||
|
||||
replication::WalFilesRes res{true, storage_->last_commit_timestamp_.load()};
|
||||
slk::Save(res, res_builder);
|
||||
spdlog::debug("Replication recovery from WAL files ended successfully, replica is now up to date!");
|
||||
}
|
||||
|
||||
void Storage::ReplicationServer::CurrentWalHandler(slk::Reader *req_reader, slk::Builder *res_builder) {
|
||||
spdlog::debug("Started replication recovery from current WAL!");
|
||||
replication::CurrentWalReq req;
|
||||
slk::Load(&req, req_reader);
|
||||
|
||||
@ -243,6 +258,7 @@ void Storage::ReplicationServer::CurrentWalHandler(slk::Reader *req_reader, slk:
|
||||
|
||||
replication::CurrentWalRes res{true, storage_->last_commit_timestamp_.load()};
|
||||
slk::Save(res, res_builder);
|
||||
spdlog::debug("Replication recovery from current WAL ended successfully, replica is now up to date!");
|
||||
}
|
||||
|
||||
void Storage::ReplicationServer::LoadWal(replication::Decoder *decoder) {
|
||||
@ -267,13 +283,15 @@ void Storage::ReplicationServer::LoadWal(replication::Decoder *decoder) {
|
||||
storage_->wal_file_->FinalizeWal();
|
||||
storage_->wal_seq_num_ = wal_info.seq_num;
|
||||
storage_->wal_file_.reset();
|
||||
spdlog::trace("WAL file {} finalized successfully", *maybe_wal_path);
|
||||
}
|
||||
} else {
|
||||
storage_->wal_seq_num_ = wal_info.seq_num;
|
||||
}
|
||||
|
||||
spdlog::trace("Loading WAL deltas from {}", *maybe_wal_path);
|
||||
durability::Decoder wal;
|
||||
const auto version = wal.Initialize(*maybe_wal_path, durability::kWalMagic);
|
||||
spdlog::debug("WAL file {} loaded successfully", *maybe_wal_path);
|
||||
if (!version) throw durability::RecoveryFailure("Couldn't read WAL magic and/or version!");
|
||||
if (!durability::IsVersionSupported(*version)) throw durability::RecoveryFailure("Invalid WAL version!");
|
||||
wal.SetPosition(wal_info.offset_deltas);
|
||||
@ -282,7 +300,7 @@ void Storage::ReplicationServer::LoadWal(replication::Decoder *decoder) {
|
||||
i += ReadAndApplyDelta(&wal);
|
||||
}
|
||||
|
||||
spdlog::debug("{} loaded successfully", *maybe_wal_path);
|
||||
spdlog::debug("Replication from current WAL successful!");
|
||||
} catch (const durability::RecoveryFailure &e) {
|
||||
LOG_FATAL("Couldn't recover WAL deltas from {} because of: {}", *maybe_wal_path, e.what());
|
||||
}
|
||||
@ -303,6 +321,7 @@ Storage::ReplicationServer::~ReplicationServer() {
|
||||
}
|
||||
}
|
||||
uint64_t Storage::ReplicationServer::ReadAndApplyDelta(durability::BaseDecoder *decoder) {
|
||||
spdlog::debug("Reading and applying missing transaction deltas!");
|
||||
auto edge_acc = storage_->edges_.access();
|
||||
auto vertex_acc = storage_->vertices_.access();
|
||||
|
||||
@ -415,7 +434,6 @@ uint64_t Storage::ReplicationServer::ReadAndApplyDelta(durability::BaseDecoder *
|
||||
case durability::WalDeltaData::Type::EDGE_SET_PROPERTY: {
|
||||
spdlog::trace(" Edge {} set property {} to {}", delta.vertex_edge_set_property.gid.AsUint(),
|
||||
delta.vertex_edge_set_property.property, delta.vertex_edge_set_property.value);
|
||||
|
||||
if (!storage_->config_.items.properties_on_edges)
|
||||
throw utils::BasicException(
|
||||
"Can't set properties on edges because properties on edges "
|
||||
@ -586,6 +604,7 @@ uint64_t Storage::ReplicationServer::ReadAndApplyDelta(durability::BaseDecoder *
|
||||
|
||||
storage_->last_commit_timestamp_ = max_commit_timestamp;
|
||||
|
||||
spdlog::debug("Applied {} deltas", applied_deltas);
|
||||
return applied_deltas;
|
||||
}
|
||||
} // namespace memgraph::storage
|
||||
|
@ -2126,6 +2126,7 @@ uint64_t Storage::CommitTimestamp(const std::optional<uint64_t> desired_commit_t
|
||||
}
|
||||
|
||||
bool Storage::SetReplicaRole(io::network::Endpoint endpoint, const replication::ReplicationServerConfig &config) {
|
||||
spdlog::trace("Setting role to replica...");
|
||||
// We don't want to restart the server if we're already a REPLICA
|
||||
if (replication_role_ == replication::ReplicationRole::REPLICA) {
|
||||
return false;
|
||||
@ -2156,6 +2157,7 @@ bool Storage::SetReplicaRole(io::network::Endpoint endpoint, const replication::
|
||||
}
|
||||
|
||||
bool Storage::SetMainReplicationRole() {
|
||||
spdlog::trace("Setting main role...");
|
||||
// We don't want to generate new epoch_id and do the
|
||||
// cleanup if we're already a MAIN
|
||||
if (replication_role_ == replication::ReplicationRole::MAIN) {
|
||||
@ -2197,7 +2199,7 @@ bool Storage::SetMainReplicationRole() {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
spdlog::info("Instance is now in a MAIN role.");
|
||||
replication_role_.store(replication::ReplicationRole::MAIN);
|
||||
|
||||
return true;
|
||||
@ -2206,8 +2208,10 @@ bool Storage::SetMainReplicationRole() {
|
||||
utils::BasicResult<Storage::RegisterReplicaError> Storage::RegisterReplica(
|
||||
std::string name, io::network::Endpoint endpoint, const replication::ReplicationMode replication_mode,
|
||||
const replication::RegistrationMode registration_mode, const replication::ReplicationClientConfig &config) {
|
||||
|
||||
MG_ASSERT(replication_role_.load() == replication::ReplicationRole::MAIN,
|
||||
"Only main instance can register a replica!");
|
||||
spdlog::trace("Registering replica...");
|
||||
|
||||
const bool name_exists = replication_clients_.WithLock([&](auto &clients) {
|
||||
return std::any_of(clients.begin(), clients.end(), [&name](const auto &client) { return client->Name() == name; });
|
||||
@ -2267,9 +2271,12 @@ utils::BasicResult<Storage::RegisterReplicaError> Storage::RegisterReplica(
|
||||
clients.push_back(std::move(client));
|
||||
return {};
|
||||
});
|
||||
spdlog::info("Replica {} registered.", name);
|
||||
}
|
||||
|
||||
bool Storage::UnregisterReplica(const std::string &name) {
|
||||
|
||||
spdlog::trace("Unregistering replica...");
|
||||
MG_ASSERT(replication_role_.load() == replication::ReplicationRole::MAIN,
|
||||
"Only main instance can unregister a replica!");
|
||||
if (ShouldStoreAndRestoreReplicationState()) {
|
||||
@ -2285,6 +2292,7 @@ bool Storage::UnregisterReplica(const std::string &name) {
|
||||
}
|
||||
|
||||
std::optional<replication::ReplicaState> Storage::GetReplicaState(const std::string_view name) {
|
||||
spdlog::trace("Getting replica state...");
|
||||
return replication_clients_.WithLock([&](auto &clients) -> std::optional<replication::ReplicaState> {
|
||||
const auto client_it =
|
||||
std::find_if(clients.cbegin(), clients.cend(), [name](auto &client) { return client->Name() == name; });
|
||||
@ -2298,6 +2306,7 @@ std::optional<replication::ReplicaState> Storage::GetReplicaState(const std::str
|
||||
replication::ReplicationRole Storage::GetReplicationRole() const { return replication_role_; }
|
||||
|
||||
std::vector<Storage::ReplicaInfo> Storage::ReplicasInfo() {
|
||||
spdlog::trace("Getting replicas info...");
|
||||
return replication_clients_.WithLock([](auto &clients) {
|
||||
std::vector<Storage::ReplicaInfo> replica_info;
|
||||
replica_info.reserve(clients.size());
|
||||
|
Loading…
Reference in New Issue
Block a user