Improve logging if replica cannot recover using curr WAL file (#1086)
This commit is contained in:
parent
bd2ec6374a
commit
60f4ffc6a1
src/storage/v2/replication
tests/jepsen/resources
@ -35,12 +35,14 @@ InMemoryStorage::ReplicationClient::ReplicationClient(std::string name, InMemory
|
||||
const replication::ReplicationMode mode,
|
||||
const replication::ReplicationClientConfig &config)
|
||||
: name_(std::move(name)), storage_(storage), mode_(mode) {
|
||||
spdlog::trace("Replication client started at: {}:{}", endpoint.address, endpoint.port);
|
||||
if (config.ssl) {
|
||||
rpc_context_.emplace(config.ssl->key_file, config.ssl->cert_file);
|
||||
} else {
|
||||
rpc_context_.emplace();
|
||||
}
|
||||
|
||||
|
||||
rpc_client_.emplace(endpoint, &*rpc_context_);
|
||||
TryInitializeClientSync();
|
||||
|
||||
@ -268,11 +270,14 @@ bool InMemoryStorage::ReplicationClient::FinalizeTransactionReplicationInternal(
|
||||
}
|
||||
|
||||
void InMemoryStorage::ReplicationClient::RecoverReplica(uint64_t replica_commit) {
|
||||
spdlog::debug("Starting replica recover");
|
||||
while (true) {
|
||||
auto file_locker = storage_->file_retainer_.AddLocker();
|
||||
|
||||
const auto steps = GetRecoverySteps(replica_commit, &file_locker);
|
||||
int i = 0;
|
||||
for (const auto &recovery_step : steps) {
|
||||
spdlog::trace("Recovering in step: {}", i++);
|
||||
try {
|
||||
std::visit(
|
||||
[&, this]<typename T>(T &&arg) {
|
||||
@ -285,6 +290,7 @@ void InMemoryStorage::ReplicationClient::RecoverReplica(uint64_t replica_commit)
|
||||
spdlog::debug("Sending the latest wal files");
|
||||
auto response = TransferWalFiles(arg);
|
||||
replica_commit = response.current_commit_timestamp;
|
||||
spdlog::debug("Wal files successfully transferred.");
|
||||
} else if constexpr (std::is_same_v<StepType, RecoveryCurrentWal>) {
|
||||
std::unique_lock transaction_guard(storage_->engine_lock_);
|
||||
if (storage_->wal_file_ && storage_->wal_file_->SequenceNumber() == arg.current_wal_seq_num) {
|
||||
@ -293,6 +299,8 @@ void InMemoryStorage::ReplicationClient::RecoverReplica(uint64_t replica_commit)
|
||||
spdlog::debug("Sending current wal file");
|
||||
replica_commit = ReplicateCurrentWal();
|
||||
storage_->wal_file_->EnableFlushing();
|
||||
} else {
|
||||
spdlog::debug("Cannot recover using current wal file");
|
||||
}
|
||||
} else {
|
||||
static_assert(always_false_v<T>, "Missing type from variant visitor");
|
||||
|
@ -42,6 +42,11 @@ class InMemoryStorage::ReplicationClient {
|
||||
ReplicationClient(std::string name, InMemoryStorage *storage, const io::network::Endpoint &endpoint,
|
||||
replication::ReplicationMode mode, const replication::ReplicationClientConfig &config = {});
|
||||
|
||||
~ReplicationClient() {
|
||||
auto endpoint = rpc_client_->Endpoint();
|
||||
spdlog::trace("Closing replication client on {}:{}", endpoint.address, endpoint.port);
|
||||
}
|
||||
|
||||
// Handler used for transfering the current transaction.
|
||||
class ReplicaStream {
|
||||
private:
|
||||
|
@ -46,7 +46,7 @@ std::pair<uint64_t, durability::WalDeltaData> ReadDelta(durability::BaseDecoder
|
||||
|
||||
InMemoryStorage::ReplicationServer::ReplicationServer(InMemoryStorage *storage, io::network::Endpoint endpoint,
|
||||
const replication::ReplicationServerConfig &config)
|
||||
: storage_(storage) {
|
||||
: storage_(storage), endpoint_(endpoint) {
|
||||
// Create RPC server.
|
||||
if (config.ssl) {
|
||||
rpc_server_context_.emplace(config.ssl->key_file, config.ssl->cert_file, config.ssl->ca_file,
|
||||
@ -318,6 +318,7 @@ void InMemoryStorage::ReplicationServer::TimestampHandler(slk::Reader *req_reade
|
||||
|
||||
InMemoryStorage::ReplicationServer::~ReplicationServer() {
|
||||
if (rpc_server_) {
|
||||
spdlog::trace("Closing replication server on {}:{}", endpoint_.address, endpoint_.port);
|
||||
rpc_server_->Shutdown();
|
||||
rpc_server_->AwaitShutdown();
|
||||
}
|
||||
|
@ -46,6 +46,7 @@ class InMemoryStorage::ReplicationServer {
|
||||
std::optional<rpc::Server> rpc_server_;
|
||||
|
||||
InMemoryStorage *storage_;
|
||||
io::network::Endpoint endpoint_;
|
||||
};
|
||||
|
||||
} // namespace memgraph::storage
|
||||
|
@ -7,4 +7,4 @@
|
||||
"n2" {:replication-role :replica :replication-mode :async :port 10000}
|
||||
"n3" {:replication-role :replica :replication-mode :sync :port 10000}
|
||||
"n4" {:replication-role :replica :replication-mode :sync :port 10000}
|
||||
"n5" {:replication-role :replica :replication-mode :sync :port 10000}}]
|
||||
"n5" {:replication-role :replica :replication-mode :sync :port 10000}}]
|
Loading…
Reference in New Issue
Block a user