Remove Raft's dependency on transaction id

Reviewers: buda, mferencevic

Reviewed By: mferencevic

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D2338
This commit is contained in:
Ivan Paljak 2019-09-10 15:51:04 +02:00
parent 85b01f8497
commit deee3b8ab7
16 changed files with 310 additions and 416 deletions

View File

@ -71,6 +71,8 @@ raft::StorageInfo *GraphDb::storage_info() { return &storage_info_; }
tx::Engine &GraphDb::tx_engine() { return tx_engine_; } tx::Engine &GraphDb::tx_engine() { return tx_engine_; }
storage::StateDeltaBuffer *GraphDb::sd_buffer() { return &sd_buffer_; }
storage::ConcurrentIdMapper<storage::Label> &GraphDb::label_mapper() { storage::ConcurrentIdMapper<storage::Label> &GraphDb::label_mapper() {
return label_mapper_; return label_mapper_;
} }

View File

@ -13,6 +13,7 @@
#include "raft/storage_info.hpp" #include "raft/storage_info.hpp"
#include "storage/common/types/types.hpp" #include "storage/common/types/types.hpp"
#include "storage/single_node_ha/concurrent_id_mapper.hpp" #include "storage/single_node_ha/concurrent_id_mapper.hpp"
#include "storage/single_node_ha/state_delta_buffer.hpp"
#include "storage/single_node_ha/storage.hpp" #include "storage/single_node_ha/storage.hpp"
#include "storage/single_node_ha/storage_gc.hpp" #include "storage/single_node_ha/storage_gc.hpp"
#include "transactions/single_node_ha/engine.hpp" #include "transactions/single_node_ha/engine.hpp"
@ -84,6 +85,7 @@ class GraphDb {
raft::RaftInterface *raft(); raft::RaftInterface *raft();
raft::StorageInfo *storage_info(); raft::StorageInfo *storage_info();
tx::Engine &tx_engine(); tx::Engine &tx_engine();
storage::StateDeltaBuffer *sd_buffer();
storage::ConcurrentIdMapper<storage::Label> &label_mapper(); storage::ConcurrentIdMapper<storage::Label> &label_mapper();
storage::ConcurrentIdMapper<storage::EdgeType> &edge_type_mapper(); storage::ConcurrentIdMapper<storage::EdgeType> &edge_type_mapper();
storage::ConcurrentIdMapper<storage::Property> &property_mapper(); storage::ConcurrentIdMapper<storage::Property> &property_mapper();
@ -138,8 +140,9 @@ class GraphDb {
&coordination_, &coordination_,
this}; this};
raft::StorageInfo storage_info_{this, &coordination_, config_.server_id}; raft::StorageInfo storage_info_{this, &coordination_, config_.server_id};
storage::StateDeltaBuffer sd_buffer_;
tx::Engine tx_engine_{&raft_server_}; tx::Engine tx_engine_{&raft_server_, &sd_buffer_};
std::unique_ptr<StorageGc> storage_gc_ = std::make_unique<StorageGc>( std::unique_ptr<StorageGc> storage_gc_ = std::make_unique<StorageGc>(
*storage_, tx_engine_, &raft_server_, config_.gc_cycle_sec); *storage_, tx_engine_, &raft_server_, config_.gc_cycle_sec);
storage::ConcurrentIdMapper<storage::Label> label_mapper_{ storage::ConcurrentIdMapper<storage::Label> label_mapper_{

View File

@ -90,7 +90,13 @@ bool GraphDbAccessor::should_abort() const {
return transaction_->should_abort(); return transaction_->should_abort();
} }
raft::RaftInterface *GraphDbAccessor::raft() { return db_->raft(); } raft::RaftInterface *GraphDbAccessor::raft() {
return db_->raft();
}
storage::StateDeltaBuffer *GraphDbAccessor::sd_buffer() {
return db_->sd_buffer();
}
VertexAccessor GraphDbAccessor::InsertVertex( VertexAccessor GraphDbAccessor::InsertVertex(
std::optional<storage::Gid> requested_gid) { std::optional<storage::Gid> requested_gid) {
@ -103,7 +109,7 @@ VertexAccessor GraphDbAccessor::InsertVertex(
db_->storage().vertices_.access().insert(gid, vertex_vlist).second; db_->storage().vertices_.access().insert(gid, vertex_vlist).second;
CHECK(success) << "Attempting to insert a vertex with an existing GID: " CHECK(success) << "Attempting to insert a vertex with an existing GID: "
<< gid.AsUint(); << gid.AsUint();
raft()->Emplace( sd_buffer()->Emplace(
database::StateDelta::CreateVertex(transaction_->id_, vertex_vlist->gid_)); database::StateDelta::CreateVertex(transaction_->id_, vertex_vlist->gid_));
auto va = VertexAccessor(vertex_vlist, *this); auto va = VertexAccessor(vertex_vlist, *this);
return va; return va;
@ -165,9 +171,10 @@ void GraphDbAccessor::BuildIndex(storage::Label label,
void GraphDbAccessor::EnableIndex(const LabelPropertyIndex::Key &key) { void GraphDbAccessor::EnableIndex(const LabelPropertyIndex::Key &key) {
// Commit transaction as we finished applying method on newest visible // Commit transaction as we finished applying method on newest visible
// records. Write that transaction's ID to the RaftServer as the index has been // records. Write that transaction's ID to the RaftServer as the index has
// built at this point even if this DBA's transaction aborts for some reason. // been built at this point even if this DBA's transaction aborts for some
raft()->Emplace(database::StateDelta::BuildIndex( // reason.
sd_buffer()->Emplace(database::StateDelta::BuildIndex(
transaction_id(), key.label_, LabelName(key.label_), key.property_, transaction_id(), key.label_, LabelName(key.label_), key.property_,
PropertyName(key.property_))); PropertyName(key.property_)));
} }
@ -190,7 +197,7 @@ void GraphDbAccessor::DeleteIndex(storage::Label label,
auto dba = db_->AccessBlocking(std::make_optional(transaction_->id_)); auto dba = db_->AccessBlocking(std::make_optional(transaction_->id_));
db_->storage().label_property_index_.DeleteIndex(key); db_->storage().label_property_index_.DeleteIndex(key);
dba.raft()->Emplace(database::StateDelta::DropIndex( dba.sd_buffer()->Emplace(database::StateDelta::DropIndex(
dba.transaction_id(), key.label_, LabelName(key.label_), key.property_, dba.transaction_id(), key.label_, LabelName(key.label_), key.property_,
PropertyName(key.property_))); PropertyName(key.property_)));
@ -226,7 +233,7 @@ void GraphDbAccessor::BuildUniqueConstraint(
return dba.PropertyName(property); return dba.PropertyName(property);
}); });
dba.raft()->Emplace(database::StateDelta::BuildUniqueConstraint( dba.sd_buffer()->Emplace(database::StateDelta::BuildUniqueConstraint(
dba.transaction().id_, label, dba.LabelName(label), properties, dba.transaction().id_, label, dba.LabelName(label), properties,
property_names)); property_names));
@ -264,7 +271,7 @@ void GraphDbAccessor::DeleteUniqueConstraint(
return dba.PropertyName(property); return dba.PropertyName(property);
}); });
dba.raft()->Emplace(database::StateDelta::DropUniqueConstraint( dba.sd_buffer()->Emplace(database::StateDelta::DropUniqueConstraint(
dba.transaction().id_, label, dba.LabelName(label), properties, dba.transaction().id_, label, dba.LabelName(label), properties,
property_names)); property_names));
@ -420,7 +427,7 @@ bool GraphDbAccessor::RemoveVertex(VertexAccessor &vertex_accessor,
return false; return false;
auto *vlist_ptr = vertex_accessor.address(); auto *vlist_ptr = vertex_accessor.address();
raft()->Emplace(database::StateDelta::RemoveVertex( sd_buffer()->Emplace(database::StateDelta::RemoveVertex(
transaction_->id_, vlist_ptr->gid_, check_empty)); transaction_->id_, vlist_ptr->gid_, check_empty));
vlist_ptr->remove(vertex_accessor.current_, *transaction_); vlist_ptr->remove(vertex_accessor.current_, *transaction_);
return true; return true;
@ -467,7 +474,7 @@ EdgeAccessor GraphDbAccessor::InsertEdge(
to.SwitchNew(); to.SwitchNew();
to.update().in_.emplace(from.address(), edge_vlist, edge_type); to.update().in_.emplace(from.address(), edge_vlist, edge_type);
raft()->Emplace(database::StateDelta::CreateEdge( sd_buffer()->Emplace(database::StateDelta::CreateEdge(
transaction_->id_, edge_vlist->gid_, from.gid(), to.gid(), edge_type, transaction_->id_, edge_vlist->gid_, from.gid(), to.gid(), edge_type,
EdgeTypeName(edge_type))); EdgeTypeName(edge_type)));
@ -492,7 +499,8 @@ void GraphDbAccessor::RemoveEdge(EdgeAccessor &edge, bool remove_out_edge,
if (remove_in_edge) edge.to().RemoveInEdge(edge.address()); if (remove_in_edge) edge.to().RemoveInEdge(edge.address());
edge.address()->remove(edge.current_, *transaction_); edge.address()->remove(edge.current_, *transaction_);
raft()->Emplace(database::StateDelta::RemoveEdge(transaction_->id_, edge.gid())); sd_buffer()->Emplace(
database::StateDelta::RemoveEdge(transaction_->id_, edge.gid()));
} }
storage::Label GraphDbAccessor::Label(const std::string &label_name) { storage::Label GraphDbAccessor::Label(const std::string &label_name) {

View File

@ -15,6 +15,7 @@
#include "raft/raft_interface.hpp" #include "raft/raft_interface.hpp"
#include "storage/common/types/types.hpp" #include "storage/common/types/types.hpp"
#include "storage/single_node_ha/edge_accessor.hpp" #include "storage/single_node_ha/edge_accessor.hpp"
#include "storage/single_node_ha/state_delta_buffer.hpp"
#include "storage/single_node_ha/vertex_accessor.hpp" #include "storage/single_node_ha/vertex_accessor.hpp"
#include "transactions/transaction.hpp" #include "transactions/transaction.hpp"
#include "transactions/type.hpp" #include "transactions/type.hpp"
@ -596,6 +597,7 @@ class GraphDbAccessor {
const tx::Transaction &transaction() const { return *transaction_; } const tx::Transaction &transaction() const { return *transaction_; }
raft::RaftInterface *raft(); raft::RaftInterface *raft();
storage::StateDeltaBuffer *sd_buffer();
auto &db() { return db_; } auto &db() { return db_; }
const auto &db() const { return db_; } const auto &db() const { return db_; }

View File

@ -9,50 +9,50 @@
namespace raft { namespace raft {
enum class TxStatus { REPLICATED, WAITING, ABORTED, INVALID }; enum class ReplicationStatus { REPLICATED, WAITING, ABORTED, INVALID };
inline std::string TxStatusToString(const TxStatus &tx_status) { inline std::string ReplicationStatusToString(
switch (tx_status) { const ReplicationStatus &replication_status) {
case TxStatus::REPLICATED: switch (replication_status) {
case ReplicationStatus::REPLICATED:
return "REPLICATED"; return "REPLICATED";
case TxStatus::WAITING: case ReplicationStatus::WAITING:
return "WAITING"; return "WAITING";
case TxStatus::ABORTED: case ReplicationStatus::ABORTED:
return "ABORTED"; return "ABORTED";
case TxStatus::INVALID: case ReplicationStatus::INVALID:
return "INVALID"; return "INVALID";
} }
} }
/// Structure which describes the StateDelta status after the execution of /// Structure which describes the status of a newly created LogEntry after the
/// RaftServer's Emplace method. /// execution of RaftServer's Emplace method.
/// ///
/// It consists of two fields: /// It consists of two unsigned 64-bit integers which uniquely describe
/// 1) A boolean flag `emplaced` which signals whether the delta has /// the emplaced LogEntry:
/// successfully been emplaced in the raft log buffer. /// 1) Term when the LogEntry was emplaced to the Raft log.
/// 2) Two optional unsigned 64-bit integers which denote the term /// 2) Index of the entry within the Raft log.
/// when the corresponding LogEntry was emplaced and its log_index in ///
/// the Raft log. These values are contained in the optional metadata only /// In the case an entry was not successfully emplaced (e.g. unexpected
/// if the emplaced StateDelta signifies the COMMIT of a non-read-only /// leadership change), the values will have a std::nullopt value instead.
/// transaction. struct LogEntryStatus {
struct DeltaStatus { uint64_t term_id;
bool emplaced; uint64_t log_index;
std::optional<uint64_t> term_id;
std::optional<uint64_t> log_index;
}; };
/// Exposes only functionality that other parts of Memgraph can interact with, /// Exposes only functionality that other parts of Memgraph can interact with.
/// emplacing a state delta into the appropriate Raft log entry.
class RaftInterface { class RaftInterface {
public: public:
/// Add StateDelta to the appropriate Raft log entry. /// Emplace a new LogEntry in the raft log and start its replication. This
/// entry is created from a given batched set of StateDelta objects.
/// ///
/// @returns DeltaStatus object as a result. /// It is possible that the entry was not successfully emplaced. In that case,
virtual DeltaStatus Emplace(const database::StateDelta &) = 0; /// the method returns std::nullopt and the caller is responsible for handling
/// situation correctly (e.g. aborting the corresponding transaction).
/// Checks if the transaction with the given transaction id can safely be ///
/// committed in local storage. /// @returns an optional LogEntryStatus object as result.
virtual bool SafeToCommit(const tx::TransactionId &) = 0; virtual std::optional<LogEntryStatus> Emplace(
const std::vector<database::StateDelta> &) = 0;
/// Returns true if the current servers mode is LEADER. False otherwise. /// Returns true if the current servers mode is LEADER. False otherwise.
virtual bool IsLeader() = 0; virtual bool IsLeader() = 0;
@ -60,18 +60,32 @@ class RaftInterface {
/// Returns the term ID of the current leader. /// Returns the term ID of the current leader.
virtual uint64_t TermId() = 0; virtual uint64_t TermId() = 0;
/// Returns the status of the transaction which began its replication in /// Returns the replication status of LogEntry which began its replication in
/// a given term ID and was emplaced in the raft log at the given index. /// a given term ID and was emplaced in the raft log at the given index.
/// ///
/// Transaction status can be one of the following: /// Replication status can be one of the following
/// 1) REPLICATED -- transaction was successfully replicated accross /// 1) REPLICATED -- LogEntry was successfully replicated across
/// the Raft cluster /// the Raft cluster
/// 2) WAITING -- transaction was successfully emplaced in the Raft /// 2) WAITING -- LogEntry was successfully emplaced in the Raft
/// log and is currently being replicated. /// log and is currently being replicated.
/// 3) ABORTED -- transaction was aborted. /// 3) ABORTED -- LogEntry will not be replicated.
/// 4) INVALID -- the request for the transaction was invalid, most /// 4) INVALID -- the request for the LogEntry was invalid, most
/// likely either term_id or log_index were out of range. /// likely either term_id or log_index were out of range.
virtual TxStatus TransactionStatus(uint64_t term_id, uint64_t log_index) = 0; virtual ReplicationStatus GetReplicationStatus(uint64_t term_id,
uint64_t log_index) = 0;
/// Checks if the LogEntry with the give term id and log index can safely be
/// committed in local storage.
///
/// @param term_id term when the LogEntry was created
/// @param log_index index of the LogEntry in the Raft log
///
/// @return bool True if the transaction is safe to commit, false otherwise.
///
/// @throws ReplicationTimeoutException
/// @throws RaftShutdownException
/// @throws InvalidReplicationLogLookup
virtual bool SafeToCommit(uint64_t term_id, uint64_t log_index) = 0;
virtual std::mutex &WithLock() = 0; virtual std::mutex &WithLock() = 0;

View File

@ -41,7 +41,6 @@ RaftServer::RaftServer(uint16_t server_id, const std::string &durability_dir,
: config_(config), : config_(config),
coordination_(coordination), coordination_(coordination),
db_(db), db_(db),
rlog_(std::make_unique<ReplicationLog>()),
mode_(Mode::FOLLOWER), mode_(Mode::FOLLOWER),
server_id_(server_id), server_id_(server_id),
durability_dir_(fs::path(durability_dir)), durability_dir_(fs::path(durability_dir)),
@ -432,22 +431,13 @@ void RaftServer::PersistSnapshotMetadata(
snapshot_metadata_.emplace(snapshot_metadata); snapshot_metadata_.emplace(snapshot_metadata);
} }
std::pair<std::optional<uint64_t>, std::optional<uint64_t>> std::optional<LogEntryStatus> RaftServer::Emplace(
RaftServer::AppendToLog(const tx::TransactionId &tx_id,
const std::vector<database::StateDelta> &deltas) { const std::vector<database::StateDelta> &deltas) {
std::unique_lock<std::mutex> lock(lock_); std::unique_lock<std::mutex> lock(lock_);
DCHECK(mode_ == Mode::LEADER) if (mode_ != Mode::LEADER) {
<< "`AppendToLog` should only be called in LEADER mode"; return std::nullopt;
if (deltas.size() == 2) {
DCHECK(deltas[0].type == database::StateDelta::Type::TRANSACTION_BEGIN &&
deltas[1].type == database::StateDelta::Type::TRANSACTION_COMMIT)
<< "Transactions with two state deltas must be reads (start with BEGIN "
"and end with COMMIT)";
rlog_->set_replicated(tx_id);
return {std::nullopt, std::nullopt};
} }
rlog_->set_active(tx_id);
LogEntry new_entry(current_term_, deltas); LogEntry new_entry(current_term_, deltas);
log_[log_size_] = new_entry; log_[log_size_] = new_entry;
@ -460,136 +450,55 @@ RaftServer::AppendToLog(const tx::TransactionId &tx_id,
for (auto &peer_replication : next_replication_) peer_replication = now; for (auto &peer_replication : next_replication_) peer_replication = now;
// From this point on, we can say that the replication of a LogEntry started. // From this point on, we can say that the replication of a LogEntry started.
replication_timeout_.Insert(tx_id); replication_timeout_.Insert(new_entry.term, log_size_ - 1);
state_changed_.notify_all(); state_changed_.notify_all();
return std::make_pair(current_term_.load(), log_size_ - 1); return {{new_entry.term, log_size_ - 1}};
}
DeltaStatus RaftServer::Emplace(const database::StateDelta &delta) {
return log_entry_buffer_.Emplace(delta);
}
bool RaftServer::SafeToCommit(const tx::TransactionId &tx_id) {
switch (mode_) {
case Mode::CANDIDATE:
// When Memgraph first starts, the Raft is initialized in candidate
// mode and we try to perform recovery. Since everything for recovery
// needs to be able to commit, we return true.
return true;
case Mode::FOLLOWER:
// When in follower mode, we will only try to apply a Raft Log when we
// receive a commit index greater or equal from the Log index from the
// leader. At that moment we don't have to check the replication log
// because the leader won't commit the Log locally if it's not replicated
// on the majority of the peers in the cluster. This is why we can short
// circuit the check to always return true if in follower mode.
return true;
case Mode::LEADER:
// We are taking copies of the rlog_ status here so we avoid the case
// where the call to `set_replicated` shadows the `active` bit that is
// checked after the `replicated` bit. It is possible that both `active`
// and `replicated` are `true` but since we check `replicated` first this
// shouldn't be a problem.
bool active = rlog_->is_active(tx_id);
bool replicated = rlog_->is_replicated(tx_id);
// If we are shutting down, but we know that the Raft Log replicated
// successfully, we return true. This will eventually commit since we
// replicate NoOp on leader election.
if (replicated) return true;
// Only if the transaction isn't replicated, thrown an exception to inform
// the client.
if (exiting_) throw RaftShutdownException();
if (active) {
if (replication_timeout_.CheckTimeout(tx_id)) {
throw ReplicationTimeoutException();
}
return false;
}
// The only possibility left is that our ReplicationLog doesn't contain
// information about that tx.
throw InvalidReplicationLogLookup();
break;
}
}
void RaftServer::GarbageCollectReplicationLog(const tx::TransactionId &tx_id) {
rlog_->garbage_collect_older(tx_id);
} }
bool RaftServer::IsLeader() { return !exiting_ && mode_ == Mode::LEADER; } bool RaftServer::IsLeader() { return !exiting_ && mode_ == Mode::LEADER; }
uint64_t RaftServer::TermId() { return current_term_; } uint64_t RaftServer::TermId() { return current_term_; }
TxStatus RaftServer::TransactionStatus(uint64_t term_id, uint64_t log_index) { ReplicationStatus RaftServer::GetReplicationStatus(uint64_t term_id,
uint64_t log_index) {
std::unique_lock<std::mutex> lock(lock_); std::unique_lock<std::mutex> lock(lock_);
if (term_id > current_term_ || log_index >= log_size_) if (term_id > current_term_ || log_index >= log_size_)
return TxStatus::INVALID; return ReplicationStatus::INVALID;
auto log_entry = GetLogEntry(log_index); auto log_entry = GetLogEntry(log_index);
// This is correct because the leader can only append to the log and no two // This is correct because the leader can only append to the log and no two
// workers can be leaders in the same term. // workers can be leaders in the same term.
if (log_entry.term != term_id) return TxStatus::ABORTED; if (log_entry.term != term_id) return ReplicationStatus::ABORTED;
if (last_applied_ < log_index) return TxStatus::WAITING; if (last_applied_ < log_index) return ReplicationStatus::WAITING;
return TxStatus::REPLICATED; return ReplicationStatus::REPLICATED;
} }
RaftServer::LogEntryBuffer::LogEntryBuffer(RaftServer *raft_server) bool RaftServer::SafeToCommit(uint64_t term_id, uint64_t log_index) {
: raft_server_(raft_server) { auto replication_status = GetReplicationStatus(term_id, log_index);
CHECK(raft_server_) << "RaftServer can't be nullptr";
// If we are shutting down, but we know that the Raft Log replicated
// successfully, we return true. This will eventually commit since we
// replicate NoOp on leader election.
if (replication_status == ReplicationStatus::REPLICATED) return true;
// Only if the log entry isn't replicated, throw an exception to inform
// the client.
if (exiting_) throw RaftShutdownException();
if (replication_status == ReplicationStatus::WAITING) {
if (replication_timeout_.CheckTimeout(term_id, log_index)) {
throw ReplicationTimeoutException();
}
return false;
} }
void RaftServer::LogEntryBuffer::Enable() { // TODO(ipaljak): Fix the old naming.
std::lock_guard<std::mutex> guard(buffer_lock_); // The only possibility left is that our ReplicationLog doesn't contain
enabled_ = true; // information about that tx.
} throw InvalidReplicationLogLookup();
void RaftServer::LogEntryBuffer::Disable() {
std::lock_guard<std::mutex> guard(buffer_lock_);
enabled_ = false;
// Clear all existing logs from buffers.
logs_.clear();
}
DeltaStatus RaftServer::LogEntryBuffer::Emplace(
const database::StateDelta &delta) {
std::unique_lock<std::mutex> lock(buffer_lock_);
if (!enabled_) return {false, std::nullopt, std::nullopt};
tx::TransactionId tx_id = delta.transaction_id;
std::optional<uint64_t> term_id = std::nullopt;
std::optional<uint64_t> log_index = std::nullopt;
if (delta.type == database::StateDelta::Type::TRANSACTION_COMMIT) {
auto it = logs_.find(tx_id);
CHECK(it != logs_.end()) << "Missing StateDeltas for transaction " << tx_id;
std::vector<database::StateDelta> log(std::move(it->second));
log.emplace_back(std::move(delta));
logs_.erase(it);
lock.unlock();
auto metadata = raft_server_->AppendToLog(tx_id, log);
term_id = metadata.first;
log_index = metadata.second;
} else if (delta.type == database::StateDelta::Type::TRANSACTION_ABORT) {
auto it = logs_.find(tx_id);
// Sometimes it's possible that we're aborting a transaction that was meant
// to be commited and thus we don't have the StateDeltas anymore.
if (it != logs_.end()) logs_.erase(it);
} else {
logs_[tx_id].emplace_back(std::move(delta));
}
return {true, term_id, log_index};
} }
void RaftServer::RecoverPersistentData() { void RaftServer::RecoverPersistentData() {
@ -624,7 +533,6 @@ void RaftServer::Transition(const Mode &new_mode) {
bool reset = mode_ == Mode::LEADER; bool reset = mode_ == Mode::LEADER;
issue_hb_ = false; issue_hb_ = false;
mode_ = Mode::FOLLOWER; mode_ = Mode::FOLLOWER;
log_entry_buffer_.Disable();
if (reset) { if (reset) {
VLOG(40) << "Resetting internal state"; VLOG(40) << "Resetting internal state";
@ -632,7 +540,6 @@ void RaftServer::Transition(const Mode &new_mode) {
next_election_ = TimePoint::max(); next_election_ = TimePoint::max();
db_->Reset(); db_->Reset();
ResetReplicationLog();
replication_timeout_.Clear(); replication_timeout_.Clear();
// Re-apply raft log. // Re-apply raft log.
@ -718,7 +625,6 @@ void RaftServer::Transition(const Mode &new_mode) {
last_applied_ = log_size_ - 1; last_applied_ = log_size_ - 1;
mode_ = Mode::LEADER; mode_ = Mode::LEADER;
log_entry_buffer_.Enable();
leader_changed_.notify_all(); leader_changed_.notify_all();
break; break;
@ -763,12 +669,10 @@ void RaftServer::AdvanceCommitIndex() {
VLOG(40) << "Begin applying commited transactions"; VLOG(40) << "Begin applying commited transactions";
for (int i = commit_index_ + 1; i <= new_commit_index; ++i) { for (int i = commit_index_ + 1; i <= new_commit_index; ++i) {
auto deltas = GetLogEntry(i).deltas; auto log_entry = GetLogEntry(i);
DCHECK(deltas.size() > 2) DCHECK(log_entry.deltas.size() > 2)
<< "Log entry should consist of at least two state deltas."; << "Log entry should consist of at least three state deltas.";
auto tx_id = deltas[0].transaction_id; replication_timeout_.Remove(log_entry.term, i);
rlog_->set_replicated(tx_id);
replication_timeout_.Remove(tx_id);
} }
commit_index_ = new_commit_index; commit_index_ = new_commit_index;
@ -1331,11 +1235,6 @@ LogEntry RaftServer::DeserializeLogEntry(
return deserialized; return deserialized;
} }
void RaftServer::ResetReplicationLog() {
rlog_ = nullptr;
rlog_ = std::make_unique<ReplicationLog>();
}
void RaftServer::RecoverSnapshot(const std::string &snapshot_filename) { void RaftServer::RecoverSnapshot(const std::string &snapshot_filename) {
durability::RecoveryData recovery_data; durability::RecoveryData recovery_data;
bool recovery = durability::RecoverSnapshot( bool recovery = durability::RecoverSnapshot(
@ -1346,8 +1245,9 @@ void RaftServer::RecoverSnapshot(const std::string &snapshot_filename) {
} }
void RaftServer::NoOpCreate() { void RaftServer::NoOpCreate() {
// TODO(ipaljak): Review this after implementing RaftDelta object.
auto dba = db_->Access(); auto dba = db_->Access();
Emplace(database::StateDelta::NoOp(dba.transaction_id())); db_->sd_buffer()->Emplace(database::StateDelta::NoOp(dba.transaction_id()));
try { try {
dba.Commit(); dba.Commit();
} catch (const RaftException &) { } catch (const RaftException &) {
@ -1361,6 +1261,8 @@ void RaftServer::ApplyStateDeltas(
std::optional<database::GraphDbAccessor> dba; std::optional<database::GraphDbAccessor> dba;
for (auto &delta : deltas) { for (auto &delta : deltas) {
switch (delta.type) { switch (delta.type) {
case database::StateDelta::Type::NO_OP:
break;
case database::StateDelta::Type::TRANSACTION_BEGIN: case database::StateDelta::Type::TRANSACTION_BEGIN:
CHECK(!dba) << "Double transaction start"; CHECK(!dba) << "Double transaction start";
dba = db_->Access(); dba = db_->Access();

View File

@ -92,35 +92,16 @@ class RaftServer final : public RaftInterface {
/// Persists snapshot metadata. /// Persists snapshot metadata.
void PersistSnapshotMetadata(const SnapshotMetadata &snapshot_metadata); void PersistSnapshotMetadata(const SnapshotMetadata &snapshot_metadata);
/// Append to the log a list of batched state deltas that are ready to be /// Emplace a new LogEntry in the raft log and start its replication. This
/// replicated. /// entry is created from a given batched set of StateDelta objects.
/// ///
/// @returns metadata about the emplaced log entry. More precisely, an /// It is possible that the entry was not successfully emplaced. In that case,
/// ordered pair (term_id, log_id) of the newly emplaced /// the method returns std::nullopt and the caller is responsible for handling
/// log entry. If the entry was not emplaced, the method /// situation correctly (e.g. aborting the corresponding transaction).
/// returns std::nullopt (e.g. read-only transactions).
std::pair<std::optional<uint64_t>, std::optional<uint64_t>> AppendToLog(
const tx::TransactionId &tx_id,
const std::vector<database::StateDelta> &deltas);
/// Emplace a single StateDelta to the corresponding batch. If the StateDelta
/// marks the transaction end, it will replicate the log accorss the cluster.
/// ///
/// @returns DeltaStatus object as a result. /// @returns an optional LogEntryStatus object as result.
DeltaStatus Emplace(const database::StateDelta &delta) override; std::optional<LogEntryStatus> Emplace(
const std::vector<database::StateDelta> &deltas) override;
/// Checks if the transaction with the given transaction id can safely be
/// Returns the current state of the replication known by this machine.
/// Checks if the transaction with the given transaction id can safely be
/// committed in local storage.
///
/// @param tx_id Transaction id which needs to be checked.
/// @return bool True if the transaction is safe to commit, false otherwise.
/// @throws ReplicationTimeoutException
/// @throws RaftShutdownException
/// @throws InvalidReplicationLogLookup
bool SafeToCommit(const tx::TransactionId &tx_id) override;
/// Returns true if the current servers mode is LEADER. False otherwise. /// Returns true if the current servers mode is LEADER. False otherwise.
bool IsLeader() override; bool IsLeader() override;
@ -128,51 +109,34 @@ class RaftServer final : public RaftInterface {
/// Returns the term ID of the current leader. /// Returns the term ID of the current leader.
uint64_t TermId() override; uint64_t TermId() override;
/// Returns the status of the transaction which began its replication in /// Returns the replication status of LogEntry which began its replication in
/// a given term ID and was emplaced in the raft log at the given index. /// a given term ID and was emplaced in the raft log at the given index.
TxStatus TransactionStatus(uint64_t term_id, uint64_t log_index) override; ///
/// Replication status can be one of the following
/// 1) REPLICATED -- LogEntry was successfully replicated across
/// the Raft cluster
/// 2) WAITING -- LogEntry was successfully emplaced in the Raft
/// log and is currently being replicated.
/// 3) ABORTED -- LogEntry will not be replicated.
/// 4) INVALID -- the request for the LogEntry was invalid, most
/// likely either term_id or log_index were out of range.
ReplicationStatus GetReplicationStatus(uint64_t term_id,
uint64_t log_index) override;
void GarbageCollectReplicationLog(const tx::TransactionId &tx_id); /// Checks if the LogEntry with the give term id and log index can safely be
/// committed in local storage.
///
/// @param term_id term when the LogEntry was created
/// @param log_index index of the LogEntry in the Raft log
///
/// @return bool True if the transaction is safe to commit, false otherwise.
///
/// @throws ReplicationTimeoutException
/// @throws RaftShutdownException
/// @throws InvalidReplicationLogLookup
bool SafeToCommit(uint64_t term_id, uint64_t log_index) override;
private: private:
/// Buffers incomplete Raft logs.
///
/// A Raft log is considered to be complete if it ends with a StateDelta
/// that represents transaction commit.
/// LogEntryBuffer will be used instead of WriteAheadLog. We don't need to
/// persist logs until we receive a majority vote from the Raft cluster, and
/// apply the to our local state machine(storage).
class LogEntryBuffer final {
public:
LogEntryBuffer() = delete;
explicit LogEntryBuffer(RaftServer *raft_server);
void Enable();
/// Disable all future insertions in the buffer.
///
/// Note: this will also clear all existing logs from buffers.
void Disable();
/// Insert a new StateDelta in logs.
///
/// If the StateDelta type is `TRANSACTION_COMMIT` it will start
/// replicating, and if the type is `TRANSACTION_ABORT` it will delete the
/// log from buffer.
///
/// @returns DeltaStatus object as a result.
DeltaStatus Emplace(const database::StateDelta &delta);
private:
bool enabled_{false};
mutable std::mutex buffer_lock_;
std::unordered_map<tx::TransactionId, std::vector<database::StateDelta>>
logs_;
RaftServer *raft_server_{nullptr};
};
mutable std::mutex lock_; ///< Guards all internal state. mutable std::mutex lock_; ///< Guards all internal state.
mutable std::mutex snapshot_lock_; ///< Guards snapshot creation and removal. mutable std::mutex snapshot_lock_; ///< Guards snapshot creation and removal.
mutable std::mutex heartbeat_lock_; ///< Guards HB issuing mutable std::mutex heartbeat_lock_; ///< Guards HB issuing
@ -184,7 +148,6 @@ class RaftServer final : public RaftInterface {
Config config_; ///< Raft config. Config config_; ///< Raft config.
Coordination *coordination_{nullptr}; ///< Cluster coordination. Coordination *coordination_{nullptr}; ///< Cluster coordination.
database::GraphDb *db_{nullptr}; database::GraphDb *db_{nullptr};
std::unique_ptr<ReplicationLog> rlog_{nullptr};
std::atomic<Mode> mode_; ///< Server's current mode. std::atomic<Mode> mode_; ///< Server's current mode.
uint16_t server_id_; ///< ID of the current server. uint16_t server_id_; ///< ID of the current server.
@ -198,13 +161,6 @@ class RaftServer final : public RaftInterface {
std::atomic<bool> issue_hb_; ///< Flag which signalizes if the current server std::atomic<bool> issue_hb_; ///< Flag which signalizes if the current server
///< should send HBs to the rest of the cluster. ///< should send HBs to the rest of the cluster.
/// Raft log entry buffer.
///
/// LogEntryBuffer buffers Raft logs until a log is complete and ready for
/// replication. This doesn't have to persist, if something fails before a
/// log is ready for replication it will be discarded anyway.
LogEntryBuffer log_entry_buffer_{this};
std::vector<std::thread> peer_threads_; ///< One thread per peer which std::vector<std::thread> peer_threads_; ///< One thread per peer which
///< handles outgoing RPCs. ///< handles outgoing RPCs.
@ -450,9 +406,6 @@ class RaftServer final : public RaftInterface {
/// Deserialized Raft log entry from `std::string` /// Deserialized Raft log entry from `std::string`
LogEntry DeserializeLogEntry(const std::string &serialized_log_entry); LogEntry DeserializeLogEntry(const std::string &serialized_log_entry);
/// Resets the replication log used to indicate the replication status.
void ResetReplicationLog();
/// Recovers the given snapshot if it exists in the durability directory. /// Recovers the given snapshot if it exists in the durability directory.
void RecoverSnapshot(const std::string &snapshot_filename); void RecoverSnapshot(const std::string &snapshot_filename);

View File

@ -2,10 +2,8 @@
#pragma once #pragma once
#include <chrono> #include <chrono>
#include <map>
#include <mutex> #include <mutex>
#include <unordered_map>
#include "transactions/type.hpp"
namespace raft { namespace raft {
@ -34,23 +32,23 @@ class ReplicationTimeoutMap final {
} }
/// Remove a single entry from the map. /// Remove a single entry from the map.
void Remove(const tx::TransactionId &tx_id) { void Remove(const uint64_t term_id, const uint64_t log_index) {
std::lock_guard<std::mutex> guard(lock_); std::lock_guard<std::mutex> guard(lock_);
timeout_.erase(tx_id); timeout_.erase({term_id, log_index});
} }
/// Inserts and entry in the map by setting a point in time until it needs to /// Inserts and entry in the map by setting a point in time until it needs to
/// replicated. /// replicated.
void Insert(const tx::TransactionId &tx_id) { void Insert(const uint64_t term_id, const uint64_t log_index) {
std::lock_guard<std::mutex> guard(lock_); std::lock_guard<std::mutex> guard(lock_);
timeout_.emplace(tx_id, replication_timeout_ + Clock::now()); timeout_[{term_id, log_index}] = replication_timeout_ + Clock::now();
} }
/// Checks if the given entry has timed out. /// Checks if the given entry has timed out.
/// @returns bool True if it exceeded timeout, false otherwise. /// @returns bool True if it exceeded timeout, false otherwise.
bool CheckTimeout(const tx::TransactionId &tx_id) { bool CheckTimeout(const uint64_t term_id, const uint64_t log_index) {
std::lock_guard<std::mutex> guard(lock_); std::lock_guard<std::mutex> guard(lock_);
auto found = timeout_.find(tx_id); auto found = timeout_.find({term_id, log_index});
// If we didn't set the timeout yet, or we already deleted it, we didn't // If we didn't set the timeout yet, or we already deleted it, we didn't
// time out. // time out.
if (found == timeout_.end()) return false; if (found == timeout_.end()) return false;
@ -65,7 +63,9 @@ class ReplicationTimeoutMap final {
std::chrono::milliseconds replication_timeout_; std::chrono::milliseconds replication_timeout_;
mutable std::mutex lock_; mutable std::mutex lock_;
std::unordered_map<tx::TransactionId, TimePoint> timeout_; // TODO(ipaljak): Consider using unordered_map if we encounter any performance
// issues.
std::map<std::pair<uint64_t, uint64_t>, TimePoint> timeout_;
}; };
} // namespace raft } // namespace raft

View File

@ -28,7 +28,7 @@ void RecordAccessor<Vertex>::PropsSet(storage::Property key,
auto previous_value = PropsAt(key); auto previous_value = PropsAt(key);
update().properties_.set(key, value); update().properties_.set(key, value);
dba.UpdateOnAddProperty(key, previous_value, value, *this, &update()); dba.UpdateOnAddProperty(key, previous_value, value, *this, &update());
dba.raft()->Emplace(delta); dba.sd_buffer()->Emplace(delta);
} }
template <> template <>
@ -39,7 +39,7 @@ void RecordAccessor<Edge>::PropsSet(storage::Property key,
dba.PropertyName(key), value); dba.PropertyName(key), value);
update().properties_.set(key, value); update().properties_.set(key, value);
dba.raft()->Emplace(delta); dba.sd_buffer()->Emplace(delta);
} }
template <> template <>
@ -51,7 +51,7 @@ void RecordAccessor<Vertex>::PropsErase(storage::Property key) {
auto previous_value = PropsAt(key); auto previous_value = PropsAt(key);
update().properties_.set(key, PropertyValue()); update().properties_.set(key, PropertyValue());
dba.UpdateOnRemoveProperty(key, previous_value, *this, &update()); dba.UpdateOnRemoveProperty(key, previous_value, *this, &update());
dba.raft()->Emplace(delta); dba.sd_buffer()->Emplace(delta);
} }
template <> template <>
@ -61,7 +61,7 @@ void RecordAccessor<Edge>::PropsErase(storage::Property key) {
StateDelta::PropsSetEdge(dba.transaction_id(), gid(), key, StateDelta::PropsSetEdge(dba.transaction_id(), gid(), key,
dba.PropertyName(key), PropertyValue()); dba.PropertyName(key), PropertyValue());
update().properties_.set(key, PropertyValue()); update().properties_.set(key, PropertyValue());
dba.raft()->Emplace(delta); dba.sd_buffer()->Emplace(delta);
} }
template <typename TRecord> template <typename TRecord>

View File

@ -0,0 +1,47 @@
#pragma once
#include "durability/single_node_ha/state_delta.hpp"
namespace storage {
class StateDeltaBuffer final {
public:
/// Inserts a new StateDelta in buffer.
void Emplace(const database::StateDelta &delta) {
tx::TransactionId tx_id = delta.transaction_id;
std::vector<database::StateDelta> *curr_buffer;
{
// We only need the lock when we're inserting a new key into the buffer.
std::lock_guard<std::mutex> lock(buffer_lock_);
curr_buffer = &buffer_[tx_id];
}
curr_buffer->emplace_back(delta);
}
/// Retrieves all buffered StateDeltas for a given transaction id.
/// If there are no such StateDeltas, the return vector is empty.
std::vector<database::StateDelta> GetDeltas(
const tx::TransactionId &tx_id) {
std::vector<database::StateDelta> *curr_buffer;
{
std::lock_guard<std::mutex> lock(buffer_lock_);
auto it = buffer_.find(tx_id);
if (it == buffer_.end()) return {};
curr_buffer = &it->second;
}
return *curr_buffer;
}
/// Deletes all buffered StateDeltas for a given transaction id.
void Erase(const tx::TransactionId &tx_id) {
std::lock_guard<std::mutex> lock(buffer_lock_);
buffer_.erase(tx_id);
}
private:
mutable std::mutex buffer_lock_;
std::unordered_map<tx::TransactionId, std::vector<database::StateDelta>>
buffer_;
};
} // namespace storage

View File

@ -81,7 +81,6 @@ class StorageGc {
auto safe_to_delete = GetClogSafeTransaction(oldest_active); auto safe_to_delete = GetClogSafeTransaction(oldest_active);
if (safe_to_delete) { if (safe_to_delete) {
tx_engine_.GarbageCollectCommitLog(*safe_to_delete); tx_engine_.GarbageCollectCommitLog(*safe_to_delete);
raft_server_->GarbageCollectReplicationLog(*safe_to_delete);
} }
} }

View File

@ -24,7 +24,7 @@ void VertexAccessor::add_label(storage::Label label) {
// not a duplicate label, add it // not a duplicate label, add it
if (!utils::Contains(vertex.labels_, label)) { if (!utils::Contains(vertex.labels_, label)) {
vertex.labels_.emplace_back(label); vertex.labels_.emplace_back(label);
dba.raft()->Emplace(delta); dba.sd_buffer()->Emplace(delta);
dba.UpdateOnAddLabel(label, *this, &vertex); dba.UpdateOnAddLabel(label, *this, &vertex);
} }
} }
@ -39,7 +39,7 @@ void VertexAccessor::remove_label(storage::Label label) {
auto found = std::find(labels.begin(), labels.end(), delta.label); auto found = std::find(labels.begin(), labels.end(), delta.label);
std::swap(*found, labels.back()); std::swap(*found, labels.back());
labels.pop_back(); labels.pop_back();
dba.raft()->Emplace(delta); dba.sd_buffer()->Emplace(delta);
dba.UpdateOnRemoveLabel(label, *this); dba.UpdateOnRemoveLabel(label, *this);
} }
} }

View File

@ -10,9 +10,13 @@
namespace tx { namespace tx {
Engine::Engine(raft::RaftInterface *raft) Engine::Engine(raft::RaftInterface *raft,
: clog_(std::make_unique<CommitLog>()), raft_(raft) { storage::StateDeltaBuffer *delta_buffer)
: clog_(std::make_unique<CommitLog>()),
raft_(raft),
delta_buffer_(delta_buffer) {
CHECK(raft) << "Raft can't be nullptr in HA"; CHECK(raft) << "Raft can't be nullptr in HA";
CHECK(delta_buffer) << "State delta buffer can't be nullptr in HA";
} }
Transaction *Engine::Begin() { Transaction *Engine::Begin() {
@ -79,8 +83,43 @@ CommandId Engine::UpdateCommand(TransactionId id) {
void Engine::Commit(const Transaction &t) { void Engine::Commit(const Transaction &t) {
VLOG(11) << "[Tx] Committing transaction " << t.id_; VLOG(11) << "[Tx] Committing transaction " << t.id_;
auto delta_status = raft_->Emplace(database::StateDelta::TxCommit(t.id_)); delta_buffer_->Emplace(database::StateDelta::TxCommit(t.id_));
if (delta_status.emplaced) { auto deltas = delta_buffer_->GetDeltas(t.id_);
// If we have only two state deltas in our transaction, that means we are
// dealing with a read-only transaction which does not need to be replicated
// throughout the cluster, so we simply commit it in our storage.
//
// Also, when the current server is not in the leader mode, the following
// holds:
//
// 1) In CANDIDATE mode we need to be able to commit because Raft is
// initialzed in that mode and needs to perform recovery.
//
// 2) In FOLLOWER mode, Raft will only try to apply state deltas from logs
// that are behind the current commit index and are therefore safe to
// apply.
if (deltas.size() == 2 || !raft_->IsLeader()) {
delta_buffer_->Erase(t.id_);
std::lock_guard<utils::SpinLock> guard(lock_);
clog_->set_committed(t.id_);
active_.remove(t.id_);
store_.erase(store_.find(t.id_));
if (t.blocking()) {
accepting_transactions_.store(true);
}
return;
}
auto log_entry_status = raft_->Emplace(deltas);
// Log Entry was not successfully emplaced and the transaction should be
// aborted
if (!log_entry_status) {
Abort(t);
return;
}
// It is important to note the following situation. If our cluster ends up // It is important to note the following situation. If our cluster ends up
// with a network partition where the current leader can't communicate with // with a network partition where the current leader can't communicate with
// the majority of the peers, and the client is still sending queries to it, // the majority of the peers, and the client is still sending queries to it,
@ -101,7 +140,9 @@ void Engine::Commit(const Transaction &t) {
// Wait for Raft to receive confirmation from the majority of followers. // Wait for Raft to receive confirmation from the majority of followers.
while (true) { while (true) {
try { try {
if (raft_->SafeToCommit(t.id_)) break; if (raft_->SafeToCommit(log_entry_status->term_id,
log_entry_status->log_index))
break;
} catch (const raft::ReplicationTimeoutException &e) { } catch (const raft::ReplicationTimeoutException &e) {
std::lock_guard<utils::SpinLock> guard(lock_); std::lock_guard<utils::SpinLock> guard(lock_);
if (replication_errors_.insert(t.id_).second) { if (replication_errors_.insert(t.id_).second) {
@ -131,8 +172,8 @@ void Engine::Commit(const Transaction &t) {
if (!raft_->IsLeader()) break; if (!raft_->IsLeader()) break;
std::this_thread::sleep_for(std::chrono::microseconds(100)); std::this_thread::sleep_for(std::chrono::microseconds(100));
} }
}
delta_buffer_->Erase(t.id_);
std::lock_guard<utils::SpinLock> guard(lock_); std::lock_guard<utils::SpinLock> guard(lock_);
replication_errors_.erase(t.id_); replication_errors_.erase(t.id_);
clog_->set_committed(t.id_); clog_->set_committed(t.id_);
@ -145,7 +186,7 @@ void Engine::Commit(const Transaction &t) {
void Engine::Abort(const Transaction &t) { void Engine::Abort(const Transaction &t) {
VLOG(11) << "[Tx] Aborting transaction " << t.id_; VLOG(11) << "[Tx] Aborting transaction " << t.id_;
raft_->Emplace(database::StateDelta::TxAbort(t.id_)); delta_buffer_->Erase(t.id_);
std::lock_guard<utils::SpinLock> guard(lock_); std::lock_guard<utils::SpinLock> guard(lock_);
clog_->set_aborted(t.id_); clog_->set_aborted(t.id_);
active_.remove(t.id_); active_.remove(t.id_);
@ -257,7 +298,7 @@ Transaction *Engine::BeginTransaction(bool blocking) {
Transaction *t = new Transaction(id, active_, *this, blocking); Transaction *t = new Transaction(id, active_, *this, blocking);
active_.insert(id); active_.insert(id);
store_.emplace(id, t); store_.emplace(id, t);
raft_->Emplace(database::StateDelta::TxBegin(id)); delta_buffer_->Emplace(database::StateDelta::TxBegin(id));
return t; return t;
} }

View File

@ -8,6 +8,7 @@
#include <unordered_set> #include <unordered_set>
#include "raft/raft_interface.hpp" #include "raft/raft_interface.hpp"
#include "storage/single_node_ha/state_delta_buffer.hpp"
#include "transactions/commit_log.hpp" #include "transactions/commit_log.hpp"
#include "transactions/transaction.hpp" #include "transactions/transaction.hpp"
#include "utils/spin_lock.hpp" #include "utils/spin_lock.hpp"
@ -24,7 +25,7 @@ class TransactionEngineError : public utils::BasicException {
/// information needed for raft followers when replicating logs. /// information needed for raft followers when replicating logs.
class Engine final { class Engine final {
public: public:
explicit Engine(raft::RaftInterface *raft); Engine(raft::RaftInterface *raft, storage::StateDeltaBuffer *delta_buffer);
Engine(const Engine &) = delete; Engine(const Engine &) = delete;
Engine(Engine &&) = delete; Engine(Engine &&) = delete;
@ -72,6 +73,7 @@ class Engine final {
Snapshot active_; Snapshot active_;
mutable utils::SpinLock lock_; mutable utils::SpinLock lock_;
raft::RaftInterface *raft_{nullptr}; raft::RaftInterface *raft_{nullptr};
storage::StateDeltaBuffer *delta_buffer_{nullptr};
std::atomic<bool> accepting_transactions_{true}; std::atomic<bool> accepting_transactions_{true};
std::atomic<bool> reset_active_{false}; std::atomic<bool> reset_active_{false};

View File

@ -202,9 +202,6 @@ target_link_libraries(${test_prefix}stripped mg-single-node kvstore_dummy_lib)
add_unit_test(transaction_engine_single_node.cpp) add_unit_test(transaction_engine_single_node.cpp)
target_link_libraries(${test_prefix}transaction_engine_single_node mg-single-node kvstore_dummy_lib) target_link_libraries(${test_prefix}transaction_engine_single_node mg-single-node kvstore_dummy_lib)
add_unit_test(transaction_engine_single_node_ha.cpp)
target_link_libraries(${test_prefix}transaction_engine_single_node_ha mg-single-node-ha kvstore_dummy_lib)
add_unit_test(typed_value.cpp) add_unit_test(typed_value.cpp)
target_link_libraries(${test_prefix}typed_value mg-single-node kvstore_dummy_lib) target_link_libraries(${test_prefix}typed_value mg-single-node kvstore_dummy_lib)

View File

@ -1,76 +0,0 @@
#include "gtest/gtest.h"
#include <unordered_map>
#include <vector>
#include "durability/single_node_ha/state_delta.hpp"
#include "raft/raft_interface.hpp"
#include "transactions/single_node_ha/engine.hpp"
#include "transactions/transaction.hpp"
using namespace tx;
class RaftMock final : public raft::RaftInterface {
public:
raft::DeltaStatus Emplace(const database::StateDelta &delta) override {
log_[delta.transaction_id].emplace_back(std::move(delta));
return {true, std::nullopt};
}
bool SafeToCommit(const tx::TransactionId &) override {
return true;
}
bool IsLeader() override { return true; }
uint64_t TermId() override { return 1; }
raft::TxStatus TransactionStatus(uint64_t term_id,
uint64_t log_index) override {
return raft::TxStatus::REPLICATED;
}
std::vector<database::StateDelta> GetLogForTx(
const tx::TransactionId &tx_id) {
return log_[tx_id];
}
std::mutex &WithLock() override { return lock_; }
private:
std::unordered_map<tx::TransactionId, std::vector<database::StateDelta>> log_;
std::mutex lock_;
};
TEST(Engine, Reset) {
RaftMock raft;
Engine engine{&raft};
auto t0 = engine.Begin();
EXPECT_EQ(t0->id_, 1);
engine.Commit(*t0);
engine.Reset();
auto t1 = engine.Begin();
EXPECT_EQ(t1->id_, 1);
engine.Commit(*t1);
}
TEST(Engine, TxStateDelta) {
RaftMock raft;
Engine engine{&raft};
auto t0 = engine.Begin();
tx::TransactionId tx_id = t0->id_;
engine.Commit(*t0);
auto t0_log = raft.GetLogForTx(tx_id);
EXPECT_EQ(t0_log.size(), 2);
using Type = enum database::StateDelta::Type;
EXPECT_EQ(t0_log[0].type, Type::TRANSACTION_BEGIN);
EXPECT_EQ(t0_log[0].transaction_id, tx_id);
EXPECT_EQ(t0_log[1].type, Type::TRANSACTION_COMMIT);
EXPECT_EQ(t0_log[1].transaction_id, tx_id);
}