diff --git a/src/storage/v2/durability/snapshot.cpp b/src/storage/v2/durability/snapshot.cpp index 97653e030..5aa4208c7 100644 --- a/src/storage/v2/durability/snapshot.cpp +++ b/src/storage/v2/durability/snapshot.cpp @@ -165,7 +165,7 @@ RecoveredSnapshot LoadSnapshot( if (!version) throw RecoveryFailure("Couldn't read snapshot magic and/or version!"); if (!IsVersionSupported(*version)) - throw RecoveryFailure("Invalid snapshot version!"); + throw RecoveryFailure(fmt::format("Invalid snapshot version {}", *version)); // Cleanup of loaded data in case of failure. bool success = false; @@ -173,6 +173,7 @@ RecoveredSnapshot LoadSnapshot( if (!success) { edges->clear(); vertices->clear(); + epoch_history->clear(); } }); diff --git a/src/storage/v2/replication/replication_client.cpp b/src/storage/v2/replication/replication_client.cpp index 344628955..91b58d195 100644 --- a/src/storage/v2/replication/replication_client.cpp +++ b/src/storage/v2/replication/replication_client.cpp @@ -28,14 +28,15 @@ Storage::ReplicationClient::ReplicationClient( } rpc_client_.emplace(endpoint, &*rpc_context_); - InitializeClient(); + TryInitializeClient(); - if (config.timeout) { + if (config.timeout && replica_state_ != replication::ReplicaState::INVALID) { timeout_.emplace(*config.timeout); timeout_dispatcher_.emplace(); } } +/// @throws rpc::RpcFailedException void Storage::ReplicationClient::InitializeClient() { uint64_t current_commit_timestamp{kTimestampInitialId}; auto stream{ @@ -74,6 +75,25 @@ void Storage::ReplicationClient::InitializeClient() { } } +void Storage::ReplicationClient::TryInitializeClient() { + try { + InitializeClient(); + } catch (const rpc::RpcFailedException &) { + std::unique_lock client_guarde{client_lock_}; + replica_state_.store(replication::ReplicaState::INVALID); + LOG(ERROR) << "Failed to connect to replica " << name_ << " at " + << rpc_client_->Endpoint(); + } +} + +void Storage::ReplicationClient::HandleRpcFailure() { + LOG(ERROR) << "Couldn't replicate 
data to " << name_; + thread_pool_.AddTask([this] { + rpc_client_->Abort(); + this->TryInitializeClient(); + }); +} + SnapshotRes Storage::ReplicationClient::TransferSnapshot( const std::filesystem::path &path) { auto stream{rpc_client_->Stream()}; @@ -103,44 +123,39 @@ OnlySnapshotRes Storage::ReplicationClient::TransferOnlySnapshot( return stream.AwaitResponse(); } -bool Storage::ReplicationClient::StartTransactionReplication( +void Storage::ReplicationClient::StartTransactionReplication( const uint64_t current_wal_seq_num) { std::unique_lock guard(client_lock_); const auto status = replica_state_.load(); switch (status) { case replication::ReplicaState::RECOVERY: DLOG(INFO) << "Replica " << name_ << " is behind MAIN instance"; - return false; + return; case replication::ReplicaState::REPLICATING: - DLOG(INFO) << "Replica missed a transaction, going to recovery"; + DLOG(INFO) << "Replica " << name_ << " missed a transaction"; + // We missed a transaction because we're still replicating + // the previous transaction so we need to go to RECOVERY + // state to catch up with the missing transaction + // We cannot queue the recovery process here because + // an error can happen while we're replicating the previous + // transaction after which the client should go to + // INVALID state before starting the recovery process replica_state_.store(replication::ReplicaState::RECOVERY); - // If it's in replicating state, it should have been up to date with all - // the commits until now so the replica should contain the - // last_commit_timestamp - thread_pool_.AddTask([=, this] { - this->RecoverReplica(storage_->last_commit_timestamp_.load()); - }); - return false; + return; case replication::ReplicaState::INVALID: - LOG(ERROR) << "Couldn't replicate data to " << name_; - return false; + HandleRpcFailure(); + return; case replication::ReplicaState::READY: CHECK(!replica_stream_); try { replica_stream_.emplace( ReplicaStream{this, storage_->last_commit_timestamp_.load(), 
current_wal_seq_num}); + replica_state_.store(replication::ReplicaState::REPLICATING); } catch (const rpc::RpcFailedException &) { - replica_state_.store(replication::ReplicaState::INVALID); - LOG(ERROR) << "Couldn't replicate data to " << name_; - thread_pool_.AddTask([this] { - rpc_client_->Abort(); - InitializeClient(); - }); - return false; + HandleRpcFailure(); } - replica_state_.store(replication::ReplicaState::REPLICATING); - return true; + return; } } @@ -150,11 +165,7 @@ void Storage::ReplicationClient::IfStreamingTransaction( try { callback(*replica_stream_); } catch (const rpc::RpcFailedException &) { - LOG(ERROR) << "Couldn't replicate data to " << name_; - thread_pool_.AddTask([this] { - rpc_client_->Abort(); - InitializeClient(); - }); + HandleRpcFailure(); } } } @@ -209,25 +220,16 @@ void Storage::ReplicationClient::FinalizeTransactionReplicationInternal() { if (replica_stream_) { try { auto response = replica_stream_->Finalize(); - if (!response.success) { - { - std::unique_lock client_guard{client_lock_}; - replica_state_.store(replication::ReplicaState::RECOVERY); - } + std::unique_lock client_guard{client_lock_}; + if (!response.success || + replica_state_ == replication::ReplicaState::RECOVERY) { + replica_state_.store(replication::ReplicaState::RECOVERY); thread_pool_.AddTask([&, this] { this->RecoverReplica(response.current_commit_timestamp); }); } } catch (const rpc::RpcFailedException &) { - LOG(ERROR) << "Couldn't replicate data to " << name_; - { - std::unique_lock client_guard{client_lock_}; - replica_state_.store(replication::ReplicaState::INVALID); - } - thread_pool_.AddTask([this] { - rpc_client_->Abort(); - InitializeClient(); - }); + HandleRpcFailure(); } replica_stream_.reset(); } @@ -244,45 +246,52 @@ void Storage::ReplicationClient::RecoverReplica(uint64_t replica_commit) { const auto steps = GetRecoverySteps(replica_commit, &file_locker); for (const auto &recovery_step : steps) { - std::visit( - [&, this](T &&arg) { - using 
StepType = std::remove_cvref_t; - if constexpr (std::is_same_v) { - DLOG(INFO) << "Sending the latest snapshot file: " << arg; - auto response = TransferSnapshot(arg); - replica_commit = response.current_commit_timestamp; - DLOG(INFO) << "CURRENT TIMESTAMP ON REPLICA: " << replica_commit; - } else if constexpr (std::is_same_v) { - DLOG(INFO) << "Sending the latest wal files"; - auto response = TransferWalFiles(arg); - replica_commit = response.current_commit_timestamp; - DLOG(INFO) << "CURRENT TIMESTAMP ON REPLICA: " << replica_commit; - } else if constexpr (std::is_same_v) { - std::unique_lock transaction_guard(storage_->engine_lock_); - if (storage_->wal_file_ && - storage_->wal_file_->SequenceNumber() == - arg.current_wal_seq_num) { - storage_->wal_file_->DisableFlushing(); - transaction_guard.unlock(); - DLOG(INFO) << "Sending current wal file"; - replica_commit = ReplicateCurrentWal(); + try { + std::visit( + [&, this](T &&arg) { + using StepType = std::remove_cvref_t; + if constexpr (std::is_same_v) { + DLOG(INFO) << "Sending the latest snapshot file: " << arg; + auto response = TransferSnapshot(arg); + replica_commit = response.current_commit_timestamp; DLOG(INFO) << "CURRENT TIMESTAMP ON REPLICA: " << replica_commit; - storage_->wal_file_->EnableFlushing(); - } - } else if constexpr (std::is_same_v) { - DLOG(INFO) << "Snapshot timestamp is the latest"; - auto response = TransferOnlySnapshot(arg.snapshot_timestamp); - if (response.success) { + } else if constexpr (std::is_same_v) { + DLOG(INFO) << "Sending the latest wal files"; + auto response = TransferWalFiles(arg); replica_commit = response.current_commit_timestamp; + DLOG(INFO) << "CURRENT TIMESTAMP ON REPLICA: " + << replica_commit; + } else if constexpr (std::is_same_v) { + std::unique_lock transaction_guard(storage_->engine_lock_); + if (storage_->wal_file_ && + storage_->wal_file_->SequenceNumber() == + arg.current_wal_seq_num) { + storage_->wal_file_->DisableFlushing(); + 
transaction_guard.unlock(); + DLOG(INFO) << "Sending current wal file"; + replica_commit = ReplicateCurrentWal(); + DLOG(INFO) + << "CURRENT TIMESTAMP ON REPLICA: " << replica_commit; + storage_->wal_file_->EnableFlushing(); + } + } else if constexpr (std::is_same_v) { + DLOG(INFO) << "Snapshot timestamp is the latest"; + auto response = TransferOnlySnapshot(arg.snapshot_timestamp); + if (response.success) { + replica_commit = response.current_commit_timestamp; + } + } else { + static_assert(always_false_v, + "Missing type from variant visitor"); } - } else { - static_assert(always_false_v, - "Missing type from variant visitor"); - } - }, - recovery_step); + }, + recovery_step); + } catch (const rpc::RpcFailedException &) { + HandleRpcFailure(); + } } if (storage_->last_commit_timestamp_.load() == replica_commit) { @@ -391,7 +400,6 @@ Storage::ReplicationClient::GetRecoverySteps( return recovery_steps; } - ++rwal_it; uint64_t previous_seq_num{rwal_it->seq_num}; for (; rwal_it != wal_files->rend(); ++rwal_it) { // If the difference between two consecutive wal files is not 0 or 1 @@ -403,11 +411,12 @@ Storage::ReplicationClient::GetRecoverySteps( // Find first WAL that contains up to replica commit, i.e. WAL // that is before the replica commit or conatins the replica commit // as the last committed transaction. 
- if (replica_commit >= rwal_it->from_timestamp && - replica_commit >= rwal_it->to_timestamp) { - // We want the WAL after because the replica already contains all the - // commits from this WAL - --rwal_it; + if (replica_commit >= rwal_it->from_timestamp) { + if (replica_commit >= rwal_it->to_timestamp) { + // We want the WAL after because the replica already contains all the + // commits from this WAL + --rwal_it; + } std::vector wal_chain; auto distance_from_first = std::distance(rwal_it, wal_files->rend() - 1); // We have managed to create WAL chain diff --git a/src/storage/v2/replication/replication_client.hpp b/src/storage/v2/replication/replication_client.hpp index 1c50bc882..cbffa9a3b 100644 --- a/src/storage/v2/replication/replication_client.hpp +++ b/src/storage/v2/replication/replication_client.hpp @@ -90,7 +90,7 @@ class Storage::ReplicationClient { rpc::Client::StreamHandler stream_; }; - bool StartTransactionReplication(uint64_t current_wal_seq_num); + void StartTransactionReplication(uint64_t current_wal_seq_num); // Replication clients can be removed at any point // so to avoid any complexity of checking if the client was removed whenever @@ -155,6 +155,10 @@ class Storage::ReplicationClient { void InitializeClient(); + void TryInitializeClient(); + + void HandleRpcFailure(); + std::string name_; Storage *storage_; @@ -190,6 +194,19 @@ class Storage::ReplicationClient { std::optional timeout_dispatcher_; utils::SpinLock client_lock_; + // This thread pool is used for background tasks so we don't + // block the main storage thread + // We use only 1 thread for 2 reasons: + // - background tasks ALWAYS contain some kind of RPC communication. + // We can't have multiple RPC communication from a same client + // because that's not logically valid (e.g. 
you cannot send a snapshot + // and WAL at the same time because WAL will arrive earlier and be applied + // before the snapshot which is not correct) + // - the implementation is simplified as we have total control of what + // this pool is executing. Also, we can simply queue multiple tasks + // and be sure of the execution order. + // Not having multiple possible threads in the same client allows us + // to ignore concurrency problems inside the client. utils::ThreadPool thread_pool_{1}; std::atomic replica_state_{ replication::ReplicaState::INVALID}; diff --git a/src/storage/v2/replication/replication_server.cpp b/src/storage/v2/replication/replication_server.cpp index ad0de8a8c..da46e65a3 100644 --- a/src/storage/v2/replication/replication_server.cpp +++ b/src/storage/v2/replication/replication_server.cpp @@ -112,6 +112,13 @@ void Storage::ReplicationServer::AppendDeltasHandler( auto maybe_epoch_id = decoder.ReadString(); CHECK(maybe_epoch_id) << "Invalid replication message"; + // Different epoch ids should not be possible in AppendDeltas + // because Recovery and Heartbeat handlers should resolve + // any issues with timestamp and epoch id + CHECK(*maybe_epoch_id == storage_->epoch_id_) + << "Received Deltas from transaction with incompatible" + " epoch id"; + const auto read_delta = [&]() -> std::pair { try { @@ -126,11 +133,6 @@ void Storage::ReplicationServer::AppendDeltasHandler( } }; - // TODO (antonio2368): Add error handling for different epoch id - if (*maybe_epoch_id != storage_->epoch_id_) { - throw utils::BasicException("Invalid epoch id"); - } - if (req.previous_commit_timestamp != storage_->last_commit_timestamp_.load()) { // Empty the stream @@ -532,7 +534,6 @@ void Storage::ReplicationServer::SnapshotHandler(slk::Reader *req_reader, storage_->edges_.clear(); storage_->constraints_ = Constraints(); - // TODO (antonio2368): Check if there's a less hacky way storage_->indices_.label_index = LabelIndex( &storage_->indices_,
&storage_->constraints_, storage_->config_.items); storage_->indices_.label_property_index = LabelPropertyIndex( @@ -558,8 +559,7 @@ void Storage::ReplicationServer::SnapshotHandler(slk::Reader *req_reader, recovered_snapshot.indices_constraints, &storage_->indices_, &storage_->constraints_, &storage_->vertices_); } catch (const durability::RecoveryFailure &e) { - // TODO (antonio2368): What to do if the sent snapshot is invalid - LOG(WARNING) << "Couldn't load the snapshot because of: " << e.what(); + LOG(FATAL) << "Couldn't load the snapshot because of: " << e.what(); } storage_->last_commit_timestamp_ = storage_->timestamp_ - 1; storage_guard.unlock(); diff --git a/src/storage/v2/storage.cpp b/src/storage/v2/storage.cpp index fd72f211c..c933ed40c 100644 --- a/src/storage/v2/storage.cpp +++ b/src/storage/v2/storage.cpp @@ -2003,7 +2003,6 @@ utils::BasicResult Storage::RegisterReplica( std::string name, io::network::Endpoint endpoint, const replication::ReplicationMode replication_mode, const replication::ReplicationClientConfig &config) { - // TODO (antonio2368): This shouldn't stop the main instance CHECK(replication_role_.load() == ReplicationRole::MAIN) << "Only main instance can register a replica!";