Add one more option as comments
This commit is contained in:
parent
7ac7b7c483
commit
05f3e7d243
@ -244,6 +244,7 @@ void Storage::ReplicationClient::FinalizeTransactionReplication() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO(gitbuda): FinalizeTransactionReplicationInternal should also return success info.
|
||||||
void Storage::ReplicationClient::FinalizeTransactionReplicationInternal() {
|
void Storage::ReplicationClient::FinalizeTransactionReplicationInternal() {
|
||||||
MG_ASSERT(replica_stream_, "Missing stream for transaction deltas");
|
MG_ASSERT(replica_stream_, "Missing stream for transaction deltas");
|
||||||
try {
|
try {
|
||||||
@ -255,6 +256,7 @@ void Storage::ReplicationClient::FinalizeTransactionReplicationInternal() {
|
|||||||
thread_pool_.AddTask([&, this] { this->RecoverReplica(response.current_commit_timestamp); });
|
thread_pool_.AddTask([&, this] { this->RecoverReplica(response.current_commit_timestamp); });
|
||||||
} else {
|
} else {
|
||||||
replica_state_.store(replication::ReplicaState::READY);
|
replica_state_.store(replication::ReplicaState::READY);
|
||||||
|
// TODO(gitbuda): return true
|
||||||
}
|
}
|
||||||
} catch (const rpc::RpcFailedException &) {
|
} catch (const rpc::RpcFailedException &) {
|
||||||
replica_stream_.reset();
|
replica_stream_.reset();
|
||||||
@ -264,6 +266,7 @@ void Storage::ReplicationClient::FinalizeTransactionReplicationInternal() {
|
|||||||
}
|
}
|
||||||
HandleRpcFailure();
|
HandleRpcFailure();
|
||||||
}
|
}
|
||||||
|
// TODO(gitbuda): return false
|
||||||
}
|
}
|
||||||
|
|
||||||
void Storage::ReplicationClient::RecoverReplica(uint64_t replica_commit) {
|
void Storage::ReplicationClient::RecoverReplica(uint64_t replica_commit) {
|
||||||
|
@ -919,6 +919,7 @@ utils::BasicResult<CommitError, void> Storage::Accessor::Commit(
|
|||||||
// so the Wal files are consistent
|
// so the Wal files are consistent
|
||||||
if (storage_->replication_role_ == ReplicationRole::MAIN || desired_commit_timestamp.has_value()) {
|
if (storage_->replication_role_ == ReplicationRole::MAIN || desired_commit_timestamp.has_value()) {
|
||||||
// TODO(gitbuda): Possible to abort data operation because in this context there is an abort operation.
|
// TODO(gitbuda): Possible to abort data operation because in this context there is an abort operation.
|
||||||
|
// TODO(gitbuda): If AppendToWal returns false, we can exit this block and Abort.
|
||||||
storage_->AppendToWal(transaction_, *commit_timestamp_);
|
storage_->AppendToWal(transaction_, *commit_timestamp_);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1800,6 +1801,7 @@ void Storage::AppendToWal(const Transaction &transaction, uint64_t final_commit_
|
|||||||
replication_clients_.WithLock([&](auto &clients) {
|
replication_clients_.WithLock([&](auto &clients) {
|
||||||
for (auto &client : clients) {
|
for (auto &client : clients) {
|
||||||
client->IfStreamingTransaction([&](auto &stream) { stream.AppendTransactionEnd(final_commit_timestamp); });
|
client->IfStreamingTransaction([&](auto &stream) { stream.AppendTransactionEnd(final_commit_timestamp); });
|
||||||
|
// TODO(gitbuda): FinalizeTransactionReplication should also indicate that eveything went well for SYNC replicas.
|
||||||
client->FinalizeTransactionReplication();
|
client->FinalizeTransactionReplication();
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
Loading…
Reference in New Issue
Block a user