This commit is contained in:
Marko Budiselic 2022-06-06 17:31:34 +02:00
parent ed71d01aa3
commit 2d109a13d6
3 changed files with 19 additions and 11 deletions

View File

@ -227,17 +227,18 @@ void Storage::ReplicationClient::IfStreamingTransaction(const std::function<void
}
}
void Storage::ReplicationClient::FinalizeTransactionReplication() {
std::optional<bool> Storage::ReplicationClient::FinalizeTransactionReplication() {
// We can only check the state because it guarantees to be only
// valid during a single transaction replication (if the assumption
// that this and other transaction replication functions can only be
// called from a one thread stands)
if (replica_state_ != replication::ReplicaState::REPLICATING) {
return;
return std::nullopt;
}
if (mode_ == replication::ReplicationMode::ASYNC) {
thread_pool_.AddTask([this] { this->FinalizeTransactionReplicationInternal(); });
thread_pool_.AddTask([this] { [[maybe_unused]] auto finalized = this->FinalizeTransactionReplicationInternal(); });
return true;
} else if (timeout_) {
MG_ASSERT(mode_ == replication::ReplicationMode::SYNC, "Only SYNC replica can have a timeout.");
MG_ASSERT(timeout_dispatcher_, "Timeout thread is missing");
@ -245,7 +246,7 @@ void Storage::ReplicationClient::FinalizeTransactionReplication() {
timeout_dispatcher_->active = true;
thread_pool_.AddTask([&, this] {
this->FinalizeTransactionReplicationInternal();
[[maybe_unused]] auto finalized = this->FinalizeTransactionReplicationInternal();
std::unique_lock main_guard(timeout_dispatcher_->main_lock);
// TimerThread can finish waiting for timeout
timeout_dispatcher_->active = false;
@ -273,13 +274,13 @@ void Storage::ReplicationClient::FinalizeTransactionReplication() {
// and acces the `active` variable`
thread_pool_.AddTask([this] { timeout_dispatcher_.reset(); });
}
return true;
} else {
FinalizeTransactionReplicationInternal();
return FinalizeTransactionReplicationInternal();
}
}
// TODO(gitbuda): FinalizeTransactionReplicationInternal should also return success info.
void Storage::ReplicationClient::FinalizeTransactionReplicationInternal() {
bool Storage::ReplicationClient::FinalizeTransactionReplicationInternal() {
MG_ASSERT(replica_stream_, "Missing stream for transaction deltas");
try {
auto response = replica_stream_->Finalize();
@ -290,7 +291,7 @@ void Storage::ReplicationClient::FinalizeTransactionReplicationInternal() {
thread_pool_.AddTask([&, this] { this->RecoverReplica(response.current_commit_timestamp); });
} else {
replica_state_.store(replication::ReplicaState::READY);
// TODO(gitbuda): return true
return true;
}
} catch (const rpc::RpcFailedException &) {
replica_stream_.reset();
@ -300,7 +301,7 @@ void Storage::ReplicationClient::FinalizeTransactionReplicationInternal() {
}
HandleRpcFailure();
}
// TODO(gitbuda): return false
return false;
}
void Storage::ReplicationClient::RecoverReplica(uint64_t replica_commit) {

View File

@ -103,7 +103,10 @@ class Storage::ReplicationClient {
// StartTransactionReplication, stream is created.
void IfStreamingTransaction(const std::function<void(ReplicaStream &handler)> &callback);
void FinalizeTransactionReplication();
// Return none -> OK
// Return true -> OK
// Return false -> FAIL
std::optional<bool> FinalizeTransactionReplication();
// Transfer the snapshot file.
// @param path Path of the snapshot file.
@ -125,7 +128,7 @@ class Storage::ReplicationClient {
const auto &Endpoint() const { return rpc_client_->Endpoint(); }
private:
void FinalizeTransactionReplicationInternal();
[[nodiscard]] bool FinalizeTransactionReplicationInternal();
void RecoverReplica(uint64_t replica_commit);

View File

@ -1799,11 +1799,15 @@ void Storage::AppendToWal(const Transaction &transaction, uint64_t final_commit_
FinalizeWalFile();
replication_clients_.WithLock([&](auto &clients) {
bool all_sync_replicas_ok = true;
for (auto &client : clients) {
// TODO(gitbuda): SEMI-SYNC should be exculded from here.
if (client->Mode() == replication::ReplicationMode::SYNC)
client->IfStreamingTransaction([&](auto &stream) { stream.AppendTransactionEnd(final_commit_timestamp); });
// TODO(gitbuda): FinalizeTransactionReplication should also indicate that eveything went well for SYNC replicas.
client->FinalizeTransactionReplication();
}
return all_sync_replicas_ok;
});
}