diff --git a/src/storage/v2/replication/replication_client.cpp b/src/storage/v2/replication/replication_client.cpp index 310b3d912..64cd9b607 100644 --- a/src/storage/v2/replication/replication_client.cpp +++ b/src/storage/v2/replication/replication_client.cpp @@ -118,7 +118,7 @@ void Storage::ReplicationClient::InitializeClient() { } current_commit_timestamp = response.current_commit_timestamp; - spdlog::trace("Current timestamp on replica: {}", current_commit_timestamp); + spdlog::trace("Current timestamp on replica {}: {}", name_, current_commit_timestamp); spdlog::trace("Current timestamp on main: {}", storage_->last_commit_timestamp_.load()); if (current_commit_timestamp == storage_->last_commit_timestamp_.load()) { spdlog::debug("Replica '{}' up to date", name_); diff --git a/src/storage/v2/replication/replication_server.cpp b/src/storage/v2/replication/replication_server.cpp index 7e8a1304f..8a705460d 100644 --- a/src/storage/v2/replication/replication_server.cpp +++ b/src/storage/v2/replication/replication_server.cpp @@ -13,6 +13,7 @@ #include #include +#include "spdlog/spdlog.h" #include "storage/v2/durability/durability.hpp" #include "storage/v2/durability/paths.hpp" #include "storage/v2/durability/serialization.hpp" @@ -102,6 +103,7 @@ void Storage::ReplicationServer::FrequentHeartbeatHandler(slk::Reader *req_reade } void Storage::ReplicationServer::AppendDeltasHandler(slk::Reader *req_reader, slk::Builder *res_builder) { + spdlog::debug("Started replication recovery from appending deltas!"); replication::AppendDeltasReq req; slk::Load(&req, req_reader); @@ -120,6 +122,7 @@ void Storage::ReplicationServer::AppendDeltasHandler(slk::Reader *req_reader, sl storage_->wal_file_->FinalizeWal(); storage_->wal_file_.reset(); storage_->wal_seq_num_ = req.seq_num; + spdlog::trace("Finalized WAL file"); } else { MG_ASSERT(storage_->wal_file_->SequenceNumber() == req.seq_num, "Invalid sequence number of current wal file"); storage_->wal_seq_num_ = req.seq_num + 1; @@ -146,9 +149,11 @@ void Storage::ReplicationServer::AppendDeltasHandler(slk::Reader *req_reader, sl replication::AppendDeltasRes res{true, storage_->last_commit_timestamp_.load()}; slk::Save(res, res_builder); + spdlog::debug("Replication recovery from append deltas finished, replica is now up to date!"); } void Storage::ReplicationServer::SnapshotHandler(slk::Reader *req_reader, slk::Builder *res_builder) { + spdlog::debug("Started replication recovery from received snapshot file!"); replication::SnapshotReq req; slk::Load(&req, req_reader); @@ -161,6 +166,7 @@ void Storage::ReplicationServer::SnapshotHandler(slk::Reader *req_reader, slk::B spdlog::info("Received snapshot saved to {}", *maybe_snapshot_path); std::unique_lock storage_guard(storage_->main_lock_); + spdlog::trace("Clearing database since recovering from snapshot."); // Clear the database storage_->vertices_.clear(); storage_->edges_.clear(); @@ -184,6 +190,7 @@ void Storage::ReplicationServer::SnapshotHandler(slk::Reader *req_reader, slk::B storage_->edge_id_ = recovery_info.next_edge_id; storage_->timestamp_ = std::max(storage_->timestamp_, recovery_info.next_timestamp); + spdlog::trace("Recovering indices and constraints from snapshot."); durability::RecoverIndicesAndConstraints(recovered_snapshot.indices_constraints, &storage_->indices_, &storage_->constraints_, &storage_->vertices_); } catch (const durability::RecoveryFailure &e) { @@ -194,25 +201,31 @@ void Storage::ReplicationServer::SnapshotHandler(slk::Reader *req_reader, slk::B replication::SnapshotRes res{true, storage_->last_commit_timestamp_.load()}; slk::Save(res, res_builder); + spdlog::trace("Deleting old snapshot files due to snapshot recovery."); // Delete other durability files auto snapshot_files = durability::GetSnapshotFiles(storage_->snapshot_directory_, storage_->uuid_); for (const auto &[path, uuid, _] : snapshot_files) { if (path != *maybe_snapshot_path) { + spdlog::trace("Deleting snapshot file {}", path); storage_->file_retainer_.DeleteFile(path); } } + spdlog::trace("Deleting old WAL files due to snapshot recovery."); auto wal_files = durability::GetWalFiles(storage_->wal_directory_, storage_->uuid_); if (wal_files) { for (const auto &wal_file : *wal_files) { + spdlog::trace("Deleting WAL file {}", wal_file.path); storage_->file_retainer_.DeleteFile(wal_file.path); } storage_->wal_file_.reset(); } + spdlog::debug("Replication recovery from snapshot finished!"); } void Storage::ReplicationServer::WalFilesHandler(slk::Reader *req_reader, slk::Builder *res_builder) { + spdlog::debug("Started replication recovery from received WAL files!"); replication::WalFilesReq req; slk::Load(&req, req_reader); @@ -229,9 +242,11 @@ void Storage::ReplicationServer::WalFilesHandler(slk::Reader *req_reader, slk::B replication::WalFilesRes res{true, storage_->last_commit_timestamp_.load()}; slk::Save(res, res_builder); + spdlog::debug("Replication recovery from WAL files ended successfully, replica is now up to date!"); } void Storage::ReplicationServer::CurrentWalHandler(slk::Reader *req_reader, slk::Builder *res_builder) { + spdlog::debug("Started replication recovery from current WAL!"); replication::CurrentWalReq req; slk::Load(&req, req_reader); @@ -243,6 +258,7 @@ void Storage::ReplicationServer::CurrentWalHandler(slk::Reader *req_reader, slk: replication::CurrentWalRes res{true, storage_->last_commit_timestamp_.load()}; slk::Save(res, res_builder); + spdlog::debug("Replication recovery from current WAL ended successfully, replica is now up to date!"); } void Storage::ReplicationServer::LoadWal(replication::Decoder *decoder) { @@ -267,13 +283,15 @@ void Storage::ReplicationServer::LoadWal(replication::Decoder *decoder) { storage_->wal_file_->FinalizeWal(); storage_->wal_seq_num_ = wal_info.seq_num; storage_->wal_file_.reset(); + spdlog::trace("WAL file {} finalized successfully", *maybe_wal_path); } } else { storage_->wal_seq_num_ = wal_info.seq_num; } - + spdlog::trace("Loading WAL deltas from {}", *maybe_wal_path); durability::Decoder wal; const auto version = wal.Initialize(*maybe_wal_path, durability::kWalMagic); + spdlog::debug("WAL file {} loaded successfully", *maybe_wal_path); if (!version) throw durability::RecoveryFailure("Couldn't read WAL magic and/or version!"); if (!durability::IsVersionSupported(*version)) throw durability::RecoveryFailure("Invalid WAL version!"); wal.SetPosition(wal_info.offset_deltas); @@ -282,7 +300,7 @@ void Storage::ReplicationServer::LoadWal(replication::Decoder *decoder) { i += ReadAndApplyDelta(&wal); } - spdlog::debug("{} loaded successfully", *maybe_wal_path); + spdlog::debug("Replication from current WAL successful!"); } catch (const durability::RecoveryFailure &e) { LOG_FATAL("Couldn't recover WAL deltas from {} because of: {}", *maybe_wal_path, e.what()); } @@ -303,6 +321,7 @@ Storage::ReplicationServer::~ReplicationServer() { } } uint64_t Storage::ReplicationServer::ReadAndApplyDelta(durability::BaseDecoder *decoder) { + spdlog::debug("Reading and applying missing transaction deltas!"); auto edge_acc = storage_->edges_.access(); auto vertex_acc = storage_->vertices_.access(); @@ -415,7 +434,6 @@ uint64_t Storage::ReplicationServer::ReadAndApplyDelta(durability::BaseDecoder * case durability::WalDeltaData::Type::EDGE_SET_PROPERTY: { spdlog::trace(" Edge {} set property {} to {}", delta.vertex_edge_set_property.gid.AsUint(), delta.vertex_edge_set_property.property, delta.vertex_edge_set_property.value); - if (!storage_->config_.items.properties_on_edges) throw utils::BasicException( "Can't set properties on edges because properties on edges " @@ -586,6 +604,7 @@ uint64_t Storage::ReplicationServer::ReadAndApplyDelta(durability::BaseDecoder * storage_->last_commit_timestamp_ = max_commit_timestamp; + spdlog::debug("Applied {} deltas", applied_deltas); return applied_deltas; } } // namespace memgraph::storage diff --git a/src/storage/v2/storage.cpp b/src/storage/v2/storage.cpp index 79c38492f..4c65350b2 100644 --- a/src/storage/v2/storage.cpp +++ b/src/storage/v2/storage.cpp @@ -2126,6 +2126,7 @@ uint64_t Storage::CommitTimestamp(const std::optional desired_commit_t } bool Storage::SetReplicaRole(io::network::Endpoint endpoint, const replication::ReplicationServerConfig &config) { + spdlog::trace("Setting role to replica..."); // We don't want to restart the server if we're already a REPLICA if (replication_role_ == replication::ReplicationRole::REPLICA) { return false; @@ -2156,6 +2157,7 @@ bool Storage::SetReplicaRole(io::network::Endpoint endpoint, const replication:: } bool Storage::SetMainReplicationRole() { + spdlog::trace("Setting main role..."); // We don't want to generate new epoch_id and do the // cleanup if we're already a MAIN if (replication_role_ == replication::ReplicationRole::MAIN) { @@ -2197,7 +2199,7 @@ bool Storage::SetMainReplicationRole() { return false; } } - + spdlog::info("Instance is now in a MAIN role."); replication_role_.store(replication::ReplicationRole::MAIN); return true; @@ -2206,8 +2208,10 @@ bool Storage::SetMainReplicationRole() { utils::BasicResult Storage::RegisterReplica( std::string name, io::network::Endpoint endpoint, const replication::ReplicationMode replication_mode, const replication::RegistrationMode registration_mode, const replication::ReplicationClientConfig &config) { + MG_ASSERT(replication_role_.load() == replication::ReplicationRole::MAIN, "Only main instance can register a replica!"); + spdlog::trace("Registering replica..."); const bool name_exists = replication_clients_.WithLock([&](auto &clients) { return std::any_of(clients.begin(), clients.end(), [&name](const auto &client) { return client->Name() == name; }); @@ -2267,9 +2271,12 @@ utils::BasicResult Storage::RegisterReplica( clients.push_back(std::move(client)); return {}; }); + spdlog::info("Replica {} registered.", name); } bool Storage::UnregisterReplica(const std::string &name) { + + spdlog::trace("Unregistering replica..."); MG_ASSERT(replication_role_.load() == replication::ReplicationRole::MAIN, "Only main instance can unregister a replica!"); if (ShouldStoreAndRestoreReplicationState()) { @@ -2285,6 +2292,7 @@ bool Storage::UnregisterReplica(const std::string &name) { } std::optional Storage::GetReplicaState(const std::string_view name) { + spdlog::trace("Getting replica state..."); return replication_clients_.WithLock([&](auto &clients) -> std::optional { const auto client_it = std::find_if(clients.cbegin(), clients.cend(), [name](auto &client) { return client->Name() == name; }); @@ -2298,6 +2306,7 @@ std::optional Storage::GetReplicaState(const std::str replication::ReplicationRole Storage::GetReplicationRole() const { return replication_role_; } std::vector Storage::ReplicasInfo() { + spdlog::trace("Getting replicas info..."); return replication_clients_.WithLock([](auto &clients) { std::vector replica_info; replica_info.reserve(clients.size());