diff --git a/src/storage/v2/replication/replication_client.cpp b/src/storage/v2/replication/replication_client.cpp index 8cbd57136..c36bf5caf 100644 --- a/src/storage/v2/replication/replication_client.cpp +++ b/src/storage/v2/replication/replication_client.cpp @@ -35,12 +35,14 @@ InMemoryStorage::ReplicationClient::ReplicationClient(std::string name, InMemory const replication::ReplicationMode mode, const replication::ReplicationClientConfig &config) : name_(std::move(name)), storage_(storage), mode_(mode) { + spdlog::trace("Replication client started at: {}:{}", endpoint.address, endpoint.port); if (config.ssl) { rpc_context_.emplace(config.ssl->key_file, config.ssl->cert_file); } else { rpc_context_.emplace(); } + rpc_client_.emplace(endpoint, &*rpc_context_); TryInitializeClientSync(); @@ -268,11 +270,14 @@ bool InMemoryStorage::ReplicationClient::FinalizeTransactionReplicationInternal( } void InMemoryStorage::ReplicationClient::RecoverReplica(uint64_t replica_commit) { + spdlog::debug("Starting replica recover"); while (true) { auto file_locker = storage_->file_retainer_.AddLocker(); const auto steps = GetRecoverySteps(replica_commit, &file_locker); + int i = 0; for (const auto &recovery_step : steps) { + spdlog::trace("Recovering in step: {}", i++); try { std::visit( [&, this]<typename T>(T &&arg) { @@ -285,6 +290,7 @@ void InMemoryStorage::ReplicationClient::RecoverReplica(uint64_t replica_commit) spdlog::debug("Sending the latest wal files"); auto response = TransferWalFiles(arg); replica_commit = response.current_commit_timestamp; + spdlog::debug("Wal files successfully transferred."); } else if constexpr (std::is_same_v<StepType, RecoveryCurrentWal>) { std::unique_lock transaction_guard(storage_->engine_lock_); if (storage_->wal_file_ && storage_->wal_file_->SequenceNumber() == arg.current_wal_seq_num) { @@ -293,6 +299,8 @@ void InMemoryStorage::ReplicationClient::RecoverReplica(uint64_t replica_commit) spdlog::debug("Sending current wal file"); replica_commit = ReplicateCurrentWal(); storage_->wal_file_->EnableFlushing(); + } else { + spdlog::debug("Cannot recover using current wal file"); } } else { static_assert(always_false_v<T>, "Missing type from variant visitor"); diff --git a/src/storage/v2/replication/replication_client.hpp b/src/storage/v2/replication/replication_client.hpp index 367c13058..2ae88b91d 100644 --- a/src/storage/v2/replication/replication_client.hpp +++ b/src/storage/v2/replication/replication_client.hpp @@ -42,6 +42,11 @@ class InMemoryStorage::ReplicationClient { ReplicationClient(std::string name, InMemoryStorage *storage, const io::network::Endpoint &endpoint, replication::ReplicationMode mode, const replication::ReplicationClientConfig &config = {}); + ~ReplicationClient() { + auto endpoint = rpc_client_->Endpoint(); + spdlog::trace("Closing replication client on {}:{}", endpoint.address, endpoint.port); + } + // Handler used for transfering the current transaction. class ReplicaStream { private: diff --git a/src/storage/v2/replication/replication_server.cpp b/src/storage/v2/replication/replication_server.cpp index 9e3691b24..bb3893251 100644 --- a/src/storage/v2/replication/replication_server.cpp +++ b/src/storage/v2/replication/replication_server.cpp @@ -46,7 +46,7 @@ std::pair<uint64_t, durability::WalDeltaData> ReadDelta(durability::BaseDecoder InMemoryStorage::ReplicationServer::ReplicationServer(InMemoryStorage *storage, io::network::Endpoint endpoint, const replication::ReplicationServerConfig &config) - : storage_(storage) { + : storage_(storage), endpoint_(endpoint) { // Create RPC server. if (config.ssl) { rpc_server_context_.emplace(config.ssl->key_file, config.ssl->cert_file, config.ssl->ca_file, @@ -318,6 +318,7 @@ void InMemoryStorage::ReplicationServer::TimestampHandler(slk::Reader *req_reade InMemoryStorage::ReplicationServer::~ReplicationServer() { if (rpc_server_) { + spdlog::trace("Closing replication server on {}:{}", endpoint_.address, endpoint_.port); rpc_server_->Shutdown(); rpc_server_->AwaitShutdown(); } diff --git a/src/storage/v2/replication/replication_server.hpp b/src/storage/v2/replication/replication_server.hpp index 6be1ad23c..3bd027995 100644 --- a/src/storage/v2/replication/replication_server.hpp +++ b/src/storage/v2/replication/replication_server.hpp @@ -46,6 +46,7 @@ class InMemoryStorage::ReplicationServer { std::optional<rpc::Server> rpc_server_; InMemoryStorage *storage_; + io::network::Endpoint endpoint_; }; } // namespace memgraph::storage diff --git a/tests/jepsen/resources/node-config.edn b/tests/jepsen/resources/node-config.edn index ed3e45b44..4b814e006 100644 --- a/tests/jepsen/resources/node-config.edn +++ b/tests/jepsen/resources/node-config.edn @@ -7,4 +7,4 @@ "n2" {:replication-role :replica :replication-mode :async :port 10000} "n3" {:replication-role :replica :replication-mode :sync :port 10000} "n4" {:replication-role :replica :replication-mode :sync :port 10000} - "n5" {:replication-role :replica :replication-mode :sync :port 10000}}] + "n5" {:replication-role :replica :replication-mode :sync :port 10000}}] \ No newline at end of file