Expose the status of transaction within Raft

Summary:
For proper client interaction, we need to expose the (term_id, log_index)
pair for the transaction that's about to be replicated and we need to be able
to retrieve the status of a transaction defined by that pair. Transaction
status can be one of the following:

  1) REPLICATED (self-explanatory)
  2) WAITING (waiting for replication)
  3) ABORTED (self-explanatory)
  4) INVALID (received request with either invalid term_id or invalid log_index)

Reviewers: mferencevic

Reviewed By: mferencevic

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D2201
This commit is contained in:
Ivan Paljak 2019-07-16 10:50:53 +02:00
parent d5106c7233
commit 4acad5795b
5 changed files with 105 additions and 20 deletions

View File

@ -9,14 +9,46 @@
namespace raft {
enum class TxStatus { REPLICATED, WAITING, ABORTED, INVALID };
inline std::string TxStatusToString(const TxStatus &tx_status) {
switch (tx_status) {
case TxStatus::REPLICATED:
return "REPLICATED";
case TxStatus::WAITING:
return "WAITING";
case TxStatus::ABORTED:
return "ABORTED";
case TxStatus::INVALID:
return "INVALID";
}
}
/// Structure which describes the StateDelta status after the execution of
/// RaftServer's Emplace method.
///
/// It consists of two fields:
/// 1) A boolean flag `emplaced` which signals whether the delta has
/// successfully been emplaced in the raft log buffer.
/// 2) Two optional unsigned 64-bit integers which denote the term
/// when the corresponding LogEntry was emplaced and its log_index in
/// the Raft log. These values are contained in the optional metadata only
/// if the emplaced StateDelta signifies the COMMIT of a non-read-only
/// transaction.
struct DeltaStatus {
bool emplaced;
std::optional<uint64_t> term_id;
std::optional<uint64_t> log_index;
};
/// Exposes only functionality that other parts of Memgraph can interact with,
/// emplacing a state delta into the appropriate Raft log entry.
class RaftInterface {
public:
/// Add StateDelta to the appropriate Raft log entry.
///
/// @returns true if the Delta is emplaced, false otherwise.
virtual bool Emplace(const database::StateDelta &) = 0;
/// @returns DeltaStatus object as a result.
virtual DeltaStatus Emplace(const database::StateDelta &) = 0;
/// Checks if the transaction with the given transaction id can safely be
/// committed in local storage.
@ -28,6 +60,19 @@ class RaftInterface {
/// Returns the term ID of the current leader.
virtual uint64_t TermId() = 0;
/// Returns the status of the transaction which began its replication in
/// a given term ID and was emplaced in the raft log at the given index.
///
/// Transaction status can be one of the following:
/// 1) REPLICATED -- transaction was successfully replicated accross
/// the Raft cluster
/// 2) WAITING -- transaction was successfully emplaced in the Raft
/// log and is currently being replicated.
/// 3) ABORTED -- transaction was aborted.
/// 4) INVALID -- the request for the transaction was invalid, most
/// likely either term_id or log_index were out of range.
virtual TxStatus TransactionStatus(uint64_t term_id, uint64_t log_index) = 0;
virtual std::mutex &WithLock() = 0;
protected:

View File

@ -429,8 +429,9 @@ void RaftServer::PersistSnapshotMetadata(
snapshot_metadata_.emplace(snapshot_metadata);
}
void RaftServer::AppendToLog(const tx::TransactionId &tx_id,
const std::vector<database::StateDelta> &deltas) {
std::pair<std::optional<uint64_t>, std::optional<uint64_t>>
RaftServer::AppendToLog(const tx::TransactionId &tx_id,
const std::vector<database::StateDelta> &deltas) {
std::unique_lock<std::mutex> lock(lock_);
DCHECK(mode_ == Mode::LEADER)
<< "`AppendToLog` should only be called in LEADER mode";
@ -440,7 +441,7 @@ void RaftServer::AppendToLog(const tx::TransactionId &tx_id,
<< "Transactions with two state deltas must be reads (start with BEGIN "
"and end with COMMIT)";
rlog_->set_replicated(tx_id);
return;
return {std::nullopt, std::nullopt};
}
rlog_->set_active(tx_id);
@ -459,9 +460,10 @@ void RaftServer::AppendToLog(const tx::TransactionId &tx_id,
replication_timeout_.Insert(tx_id);
state_changed_.notify_all();
return std::make_pair(current_term_.load(), log_size_ - 1);
}
bool RaftServer::Emplace(const database::StateDelta &delta) {
DeltaStatus RaftServer::Emplace(const database::StateDelta &delta) {
return log_entry_buffer_.Emplace(delta);
}
@ -520,6 +522,21 @@ bool RaftServer::IsLeader() { return !exiting_ && mode_ == Mode::LEADER; }
uint64_t RaftServer::TermId() { return current_term_; }
TxStatus RaftServer::TransactionStatus(uint64_t term_id, uint64_t log_index) {
std::unique_lock<std::mutex> lock(lock_);
if (term_id > current_term_ || log_index >= log_size_)
return TxStatus::INVALID;
auto log_entry = GetLogEntry(log_index);
// This is correct because the leader can only append to the log and no two
// workers can be leaders in the same term.
if (log_entry.term != term_id) return TxStatus::ABORTED;
if (last_applied_ < log_index) return TxStatus::WAITING;
return TxStatus::REPLICATED;
}
RaftServer::LogEntryBuffer::LogEntryBuffer(RaftServer *raft_server)
: raft_server_(raft_server) {
CHECK(raft_server_) << "RaftServer can't be nullptr";
@ -537,11 +554,16 @@ void RaftServer::LogEntryBuffer::Disable() {
logs_.clear();
}
bool RaftServer::LogEntryBuffer::Emplace(const database::StateDelta &delta) {
DeltaStatus RaftServer::LogEntryBuffer::Emplace(
const database::StateDelta &delta) {
std::unique_lock<std::mutex> lock(buffer_lock_);
if (!enabled_) return false;
if (!enabled_) return {false, std::nullopt, std::nullopt};
tx::TransactionId tx_id = delta.transaction_id;
std::optional<uint64_t> term_id = std::nullopt;
std::optional<uint64_t> log_index = std::nullopt;
if (delta.type == database::StateDelta::Type::TRANSACTION_COMMIT) {
auto it = logs_.find(tx_id);
CHECK(it != logs_.end()) << "Missing StateDeltas for transaction " << tx_id;
@ -551,7 +573,9 @@ bool RaftServer::LogEntryBuffer::Emplace(const database::StateDelta &delta) {
logs_.erase(it);
lock.unlock();
raft_server_->AppendToLog(tx_id, log);
auto metadata = raft_server_->AppendToLog(tx_id, log);
term_id = metadata.first;
log_index = metadata.second;
} else if (delta.type == database::StateDelta::Type::TRANSACTION_ABORT) {
auto it = logs_.find(tx_id);
@ -562,7 +586,7 @@ bool RaftServer::LogEntryBuffer::Emplace(const database::StateDelta &delta) {
logs_[tx_id].emplace_back(std::move(delta));
}
return true;
return {true, term_id, log_index};
}
void RaftServer::RecoverPersistentData() {

View File

@ -92,16 +92,22 @@ class RaftServer final : public RaftInterface {
/// Persists snapshot metadata.
void PersistSnapshotMetadata(const SnapshotMetadata &snapshot_metadata);
/// Append to the log a list of batched state deltasa that are ready to be
/// Append to the log a list of batched state deltas that are ready to be
/// replicated.
void AppendToLog(const tx::TransactionId &tx_id,
const std::vector<database::StateDelta> &deltas);
///
/// @returns metadata about the emplaced log entry. More precisely, an
/// ordered pair (term_id, log_id) of the newly emplaced
/// log entry. If the entry was not emplaced, the method
/// returns std::nullopt (e.g. read-only transactions).
std::pair<std::optional<uint64_t>, std::optional<uint64_t>> AppendToLog(
const tx::TransactionId &tx_id,
const std::vector<database::StateDelta> &deltas);
/// Emplace a single StateDelta to the corresponding batch. If the StateDelta
/// marks the transaction end, it will replicate the log accorss the cluster.
///
/// @returns true if the Delta is emplaced, false otherwise.
bool Emplace(const database::StateDelta &delta) override;
/// @returns DeltaStatus object as a result.
DeltaStatus Emplace(const database::StateDelta &delta) override;
/// Checks if the transaction with the given transaction id can safely be
/// Returns the current state of the replication known by this machine.
@ -122,6 +128,10 @@ class RaftServer final : public RaftInterface {
/// Returns the term ID of the current leader.
uint64_t TermId() override;
/// Returns the status of the transaction which began its replication in
/// a given term ID and was emplaced in the raft log at the given index.
TxStatus TransactionStatus(uint64_t term_id, uint64_t log_index) override;
void GarbageCollectReplicationLog(const tx::TransactionId &tx_id);
private:
@ -151,8 +161,8 @@ class RaftServer final : public RaftInterface {
/// replicating, and if the type is `TRANSACTION_ABORT` it will delete the
/// log from buffer.
///
/// @returns true if the Delta is emplaced, false otherwise.
bool Emplace(const database::StateDelta &delta);
/// @returns DeltaStatus object as a result.
DeltaStatus Emplace(const database::StateDelta &delta);
private:
bool enabled_{false};

View File

@ -79,7 +79,8 @@ CommandId Engine::UpdateCommand(TransactionId id) {
void Engine::Commit(const Transaction &t) {
VLOG(11) << "[Tx] Committing transaction " << t.id_;
if (raft_->Emplace(database::StateDelta::TxCommit(t.id_))) {
auto delta_status = raft_->Emplace(database::StateDelta::TxCommit(t.id_));
if (delta_status.emplaced) {
// It is important to note the following situation. If our cluster ends up
// with a network partition where the current leader can't communicate with
// the majority of the peers, and the client is still sending queries to it,

View File

@ -12,9 +12,9 @@ using namespace tx;
class RaftMock final : public raft::RaftInterface {
public:
bool Emplace(const database::StateDelta &delta) override {
raft::DeltaStatus Emplace(const database::StateDelta &delta) override {
log_[delta.transaction_id].emplace_back(std::move(delta));
return true;
return {true, std::nullopt};
}
bool SafeToCommit(const tx::TransactionId &) override {
@ -25,6 +25,11 @@ class RaftMock final : public raft::RaftInterface {
uint64_t TermId() override { return 1; }
raft::TxStatus TransactionStatus(uint64_t term_id,
uint64_t log_index) override {
return raft::TxStatus::REPLICATED;
}
std::vector<database::StateDelta> GetLogForTx(
const tx::TransactionId &tx_id) {
return log_[tx_id];