diff --git a/docs/feature_spec/high_availability.md b/docs/feature_spec/high_availability.md index 3b56f6e63..8b15a646a 100644 --- a/docs/feature_spec/high_availability.md +++ b/docs/feature_spec/high_availability.md @@ -129,9 +129,10 @@ the following scenario: The problem lies in the fact that there is still a record within the internal storage of our old leader with the same transaction ID and GID as the recently -committed record by the new leader. Obviously, this is broken. As a solution, we -will prefix the transaction ID with the Raft term, thus ensuring that the -transaction IDs will differ. +committed record by the new leader. Obviously, this is broken. As a solution, on +each transition from `Leader` to `Follower`, we will reinitialize storage, reset +the transaction engine and recover data from the Raft log. This will ensure all +ongoing transactions which have "polluted" the storage will be gone. "When will followers append that transaction to their commit logs?" diff --git a/src/database/single_node_ha/graph_db.cpp b/src/database/single_node_ha/graph_db.cpp index c23876ab7..705634637 100644 --- a/src/database/single_node_ha/graph_db.cpp +++ b/src/database/single_node_ha/graph_db.cpp @@ -84,7 +84,7 @@ std::unique_ptr GraphDb::AccessBlocking( Storage &GraphDb::storage() { return *storage_; } -raft::RaftServer &GraphDb::raft_server() { return raft_server_; } +raft::RaftInterface *GraphDb::raft() { return &raft_server_; } tx::Engine &GraphDb::tx_engine() { return tx_engine_; } @@ -116,10 +116,15 @@ bool GraphDb::MakeSnapshot(GraphDbAccessor &accessor) { return status; } -void GraphDb::ReinitializeStorage() { +void GraphDb::Reset() { // Release gc scheduler to stop it from touching storage storage_gc_ = nullptr; storage_ = std::make_unique(config_.properties_on_disk); + + // This will make all active transactions to abort and reset the internal + // state. + tx_engine_.Reset(); + storage_gc_ = std::make_unique(*storage_, tx_engine_, config_.gc_cycle_sec); } diff --git a/src/database/single_node_ha/graph_db.hpp b/src/database/single_node_ha/graph_db.hpp index 3b2ea8405..a9869a835 100644 --- a/src/database/single_node_ha/graph_db.hpp +++ b/src/database/single_node_ha/graph_db.hpp @@ -109,7 +109,7 @@ class GraphDb { std::unique_ptr Access(tx::TransactionId); Storage &storage(); - raft::RaftServer &raft_server(); + raft::RaftInterface *raft(); tx::Engine &tx_engine(); storage::ConcurrentIdMapper &label_mapper(); storage::ConcurrentIdMapper &edge_type_mapper(); @@ -120,11 +120,12 @@ class GraphDb { /// Makes a snapshot from the visibility of the given accessor bool MakeSnapshot(GraphDbAccessor &accessor); - /// Releases the storage object safely and creates a new object. - /// This is needed because of recovery, otherwise we might try to recover - /// into a storage which has already been polluted because of a failed - /// previous recovery - void ReinitializeStorage(); + /// Releases the storage object safely and creates a new object, resets the tx + /// engine. + /// + /// This is needed in HA during the leader -> follower transition where we + /// might end up with some stale transactions on the leader. + void Reset(); /// When this is false, no new transactions should be created. bool is_accepting_transactions() const { return is_accepting_transactions_; } @@ -164,7 +165,8 @@ class GraphDb { raft::Coordination::LoadFromFile(config_.coordination_config_file)}; raft::RaftServer raft_server_{ config_.server_id, config_.durability_directory, - raft::Config::LoadFromFile(config_.raft_config_file), &coordination_}; + raft::Config::LoadFromFile(config_.raft_config_file), &coordination_, + [this]() { this->Reset(); }}; tx::Engine tx_engine_{&raft_server_}; std::unique_ptr storage_gc_ = std::make_unique(*storage_, tx_engine_, config_.gc_cycle_sec); diff --git a/src/database/single_node_ha/graph_db_accessor.cpp b/src/database/single_node_ha/graph_db_accessor.cpp index e98e4f50f..1a54fb6c5 100644 --- a/src/database/single_node_ha/graph_db_accessor.cpp +++ b/src/database/single_node_ha/graph_db_accessor.cpp @@ -63,7 +63,7 @@ bool GraphDbAccessor::should_abort() const { return transaction_.should_abort(); } -raft::RaftServer &GraphDbAccessor::raft_server() { return db_.raft_server(); } +raft::RaftInterface *GraphDbAccessor::raft() { return db_.raft(); } VertexAccessor GraphDbAccessor::InsertVertex( std::experimental::optional requested_gid) { @@ -76,7 +76,7 @@ VertexAccessor GraphDbAccessor::InsertVertex( db_.storage().vertices_.access().insert(gid, vertex_vlist).second; CHECK(success) << "Attempting to insert a vertex with an existing GID: " << gid; - raft_server().Emplace( + raft()->Emplace( database::StateDelta::CreateVertex(transaction_.id_, vertex_vlist->gid_)); auto va = VertexAccessor(vertex_vlist, *this); return va; @@ -143,7 +143,7 @@ void GraphDbAccessor::EnableIndex(const LabelPropertyIndex::Key &key) { // Commit transaction as we finished applying method on newest visible // records. Write that transaction's ID to the RaftServer as the index has been // built at this point even if this DBA's transaction aborts for some reason. - raft_server().Emplace(database::StateDelta::BuildIndex( + raft()->Emplace(database::StateDelta::BuildIndex( transaction_id(), key.label_, LabelName(key.label_), key.property_, PropertyName(key.property_), key.unique_)); } @@ -170,7 +170,7 @@ void GraphDbAccessor::DeleteIndex(storage::Label label, db_.AccessBlocking(std::experimental::make_optional(transaction_.id_)); db_.storage().label_property_index_.DeleteIndex(key); - dba->raft_server().Emplace(database::StateDelta::DropIndex( + dba->raft()->Emplace(database::StateDelta::DropIndex( dba->transaction_id(), key.label_, LabelName(key.label_), key.property_, PropertyName(key.property_))); @@ -290,7 +290,7 @@ bool GraphDbAccessor::RemoveVertex(VertexAccessor &vertex_accessor, return false; auto *vlist_ptr = vertex_accessor.address(); - raft_server().Emplace(database::StateDelta::RemoveVertex( + raft()->Emplace(database::StateDelta::RemoveVertex( transaction_.id_, vlist_ptr->gid_, check_empty)); vlist_ptr->remove(vertex_accessor.current_, transaction_); return true; @@ -337,7 +337,7 @@ EdgeAccessor GraphDbAccessor::InsertEdge( to.SwitchNew(); to.update().in_.emplace(from.address(), edge_vlist, edge_type); - raft_server().Emplace(database::StateDelta::CreateEdge( + raft()->Emplace(database::StateDelta::CreateEdge( transaction_.id_, edge_vlist->gid_, from.gid(), to.gid(), edge_type, EdgeTypeName(edge_type))); @@ -362,7 +362,7 @@ void GraphDbAccessor::RemoveEdge(EdgeAccessor &edge, bool remove_out_edge, if (remove_in_edge) edge.to().RemoveInEdge(edge.address()); edge.address()->remove(edge.current_, transaction_); - raft_server().Emplace(database::StateDelta::RemoveEdge(transaction_.id_, edge.gid())); + raft()->Emplace(database::StateDelta::RemoveEdge(transaction_.id_, edge.gid())); } storage::Label GraphDbAccessor::Label(const std::string &label_name) { diff --git a/src/database/single_node_ha/graph_db_accessor.hpp b/src/database/single_node_ha/graph_db_accessor.hpp index 49bbc8d20..2c909477d 100644 --- a/src/database/single_node_ha/graph_db_accessor.hpp +++ b/src/database/single_node_ha/graph_db_accessor.hpp @@ -11,6 +11,7 @@ #include #include "database/single_node_ha/graph_db.hpp" +#include "raft/raft_interface.hpp" #include "storage/common/types/types.hpp" #include "storage/single_node_ha/edge_accessor.hpp" #include "storage/single_node_ha/vertex_accessor.hpp" @@ -185,9 +186,7 @@ class GraphDbAccessor { auto Vertices(storage::Label label, bool current_state) { DCHECK(!commited_ && !aborted_) << "Accessor committed or aborted"; return iter::imap( - [this](auto vlist) { - return VertexAccessor(vlist, *this); - }, + [this](auto vlist) { return VertexAccessor(vlist, *this); }, db_.storage().labels_index_.GetVlists(label, transaction_, current_state)); } @@ -211,9 +210,7 @@ class GraphDbAccessor { LabelPropertyIndex::Key(label, property))) << "Label+property index doesn't exist."; return iter::imap( - [this](auto vlist) { - return VertexAccessor(vlist, *this); - }, + [this](auto vlist) { return VertexAccessor(vlist, *this); }, db_.storage().label_property_index_.GetVlists( LabelPropertyIndex::Key(label, property), transaction_, current_state)); @@ -241,9 +238,7 @@ class GraphDbAccessor { CHECK(value.type() != PropertyValue::Type::Null) << "Can't query index for propery value type null."; return iter::imap( - [this](auto vlist) { - return VertexAccessor(vlist, *this); - }, + [this](auto vlist) { return VertexAccessor(vlist, *this); }, db_.storage().label_property_index_.GetVlists( LabelPropertyIndex::Key(label, property), value, transaction_, current_state)); @@ -286,9 +281,7 @@ class GraphDbAccessor { LabelPropertyIndex::Key(label, property))) << "Label+property index doesn't exist."; return iter::imap( - [this](auto vlist) { - return VertexAccessor(vlist, *this); - }, + [this](auto vlist) { return VertexAccessor(vlist, *this); }, db_.storage().label_property_index_.GetVlists( LabelPropertyIndex::Key(label, property), lower, upper, transaction_, current_state)); @@ -374,9 +367,7 @@ class GraphDbAccessor { // wrap version lists into accessors, which will look for visible versions auto accessors = iter::imap( - [this](auto id_vlist) { - return EdgeAccessor(id_vlist.second, *this); - }, + [this](auto id_vlist) { return EdgeAccessor(id_vlist.second, *this); }, db_.storage().edges_.access()); // filter out the accessors not visible to the current transaction @@ -446,7 +437,7 @@ class GraphDbAccessor { /// Populates index with vertices containing the key void PopulateIndex(const LabelPropertyIndex::Key &key); - /// Writes Index (key) creation to RaftServer, marks it as ready for usage + /// Writes Index (key) creation to Raft, marks it as ready for usage void EnableIndex(const LabelPropertyIndex::Key &key); /** @@ -583,7 +574,7 @@ class GraphDbAccessor { bool should_abort() const; const tx::Transaction &transaction() const { return transaction_; } - raft::RaftServer &raft_server(); + raft::RaftInterface *raft(); auto &db() { return db_; } const auto &db() const { return db_; } diff --git a/src/raft/raft_interface.hpp b/src/raft/raft_interface.hpp new file mode 100644 index 000000000..a4430c7ec --- /dev/null +++ b/src/raft/raft_interface.hpp @@ -0,0 +1,20 @@ +/// @file + +#pragma once + +#include "durability/single_node_ha/state_delta.hpp" + +namespace raft { + +/// Exposes only functionality that other parts of Memgraph can interact with, +/// emplacing a state delta into the appropriate Raft log entry. +class RaftInterface { + public: + /// Add StateDelta to the appropriate Raft log entry. + virtual void Emplace(const database::StateDelta &) = 0; + + protected: + ~RaftInterface() {} +}; + +} // namespace raft diff --git a/src/raft/raft_server.cpp b/src/raft/raft_server.cpp index d3730f8ef..04dbeb5ea 100644 --- a/src/raft/raft_server.cpp +++ b/src/raft/raft_server.cpp @@ -1,7 +1,7 @@ #include "raft/raft_server.hpp" -#include #include +#include #include #include @@ -15,18 +15,19 @@ namespace raft { namespace fs = std::experimental::filesystem; const std::string kCurrentTermKey = "current_term"; -const std::string kVotedForKey = "voted_for"; -const std::string kLogKey = "log"; -const std::string kRaftDir = "raft"; +const std::string kVotedForKey = "voted_for"; +const std::string kLogKey = "log"; +const std::string kRaftDir = "raft"; RaftServer::RaftServer(uint16_t server_id, const std::string &durability_dir, - const Config &config, Coordination *coordination) + const Config &config, Coordination *coordination, + std::function reset_callback) : config_(config), coordination_(coordination), mode_(Mode::FOLLOWER), server_id_(server_id), - disk_storage_(fs::path(durability_dir) / kRaftDir) { - + disk_storage_(fs::path(durability_dir) / kRaftDir), + reset_callback_(reset_callback) { // Persistent storage initialization/recovery. if (Log().empty()) { disk_storage_.Put(kCurrentTermKey, "0"); @@ -74,70 +75,68 @@ RaftServer::RaftServer(uint16_t server_id, const std::string &durability_dir, Save(res, res_builder); }); - coordination_->Register( - [this](const auto &req_reader, auto *res_builder) { - std::lock_guard guard(lock_); - AppendEntriesReq req; - Load(&req, req_reader); + coordination_->Register([this](const auto &req_reader, + auto *res_builder) { + std::lock_guard guard(lock_); + AppendEntriesReq req; + Load(&req, req_reader); - // [Raft paper 5.1] - // "If a server recieves a request with a stale term, - // it rejects the request" - uint64_t current_term = CurrentTerm(); - if (req.term < current_term) { - AppendEntriesRes res(false, current_term); - Save(res, res_builder); - return; - } + // [Raft paper 5.1] + // "If a server recieves a request with a stale term, + // it rejects the request" + uint64_t current_term = CurrentTerm(); + if (req.term < current_term) { + AppendEntriesRes res(false, current_term); + Save(res, res_builder); + return; + } - // respond positively to a heartbeat. - // TODO(ipaljak) review this when implementing log replication. - if (req.entries.empty()) { - AppendEntriesRes res(true, current_term); - Save(res, res_builder); - if (mode_ != Mode::FOLLOWER) { - Transition(Mode::FOLLOWER); - state_changed_.notify_all(); - } else { - SetNextElectionTimePoint(); - } - return; - } + // respond positively to a heartbeat. + // TODO(ipaljak) review this when implementing log replication. + if (req.entries.empty()) { + AppendEntriesRes res(true, current_term); + Save(res, res_builder); + if (mode_ != Mode::FOLLOWER) { + Transition(Mode::FOLLOWER); + state_changed_.notify_all(); + } else { + SetNextElectionTimePoint(); + } + return; + } - throw utils::NotYetImplemented( - "AppendEntriesRpc which is not a heartbeat"); + throw utils::NotYetImplemented("AppendEntriesRpc which is not a heartbeat"); - // [Raft paper 5.3] - // "If a follower's log is inconsistent with the leader's, the - // consistency check will fail in the next AppendEntries RPC." - // - // Consistency checking assures the Log Matching Property: - // - If two entries in different logs have the same index and - // term, then they store the same command. - // - If two entries in different logs have the same index and term, - // then the logs are identical in all preceding entries. - auto log = Log(); - if (log.size() < req.prev_log_index || - log[req.prev_log_index - 1].term != req.prev_log_term) { - AppendEntriesRes res(false, current_term); - Save(res, res_builder); - return; - } + // [Raft paper 5.3] + // "If a follower's log is inconsistent with the leader's, the + // consistency check will fail in the next AppendEntries RPC." + // + // Consistency checking assures the Log Matching Property: + // - If two entries in different logs have the same index and + // term, then they store the same command. + // - If two entries in different logs have the same index and term, + // then the logs are identical in all preceding entries. + auto log = Log(); + if (log.size() < req.prev_log_index || + log[req.prev_log_index - 1].term != req.prev_log_term) { + AppendEntriesRes res(false, current_term); + Save(res, res_builder); + return; + } - // If existing entry conflicts with new one, we need to delete the - // existing entry and all that follow it. - if (log.size() > req.prev_log_index && - log[req.prev_log_index].term != req.entries[0].term) - DeleteLogSuffix(req.prev_log_index + 1); + // If existing entry conflicts with new one, we need to delete the + // existing entry and all that follow it. + if (log.size() > req.prev_log_index && + log[req.prev_log_index].term != req.entries[0].term) + DeleteLogSuffix(req.prev_log_index + 1); - AppendLogEntries(req.leader_commit, req.entries); - AppendEntriesRes res(true, current_term); - Save(res, res_builder); - }); + AppendLogEntries(req.leader_commit, req.entries); + AppendEntriesRes res(true, current_term); + Save(res, res_builder); + }); SetNextElectionTimePoint(); - election_thread_ = - std::thread(&RaftServer::ElectionThreadMain, this); + election_thread_ = std::thread(&RaftServer::ElectionThreadMain, this); for (const auto &peer_id : coordination_->GetWorkerIds()) { if (peer_id == server_id_) continue; @@ -152,12 +151,10 @@ RaftServer::~RaftServer() { election_change_.notify_all(); for (auto &peer_thread : peer_threads_) { - if (peer_thread.joinable()) - peer_thread.join(); + if (peer_thread.joinable()) peer_thread.join(); } - if (election_thread_.joinable()) - election_thread_.join(); + if (election_thread_.joinable()) election_thread_.join(); } uint64_t RaftServer::CurrentTerm() { @@ -251,16 +248,19 @@ bool RaftServer::LogEntryBuffer::IsStateDeltaTransactionEnd( void RaftServer::Transition(const Mode &new_mode) { switch (new_mode) { case Mode::FOLLOWER: { + if (mode_ == Mode::LEADER) { + reset_callback_(); + } LOG(INFO) << "Server " << server_id_ << ": Transition to FOLLOWER"; SetNextElectionTimePoint(); mode_ = Mode::FOLLOWER; - //log_entry_buffer_.Disable(); + // log_entry_buffer_.Disable(); break; } case Mode::CANDIDATE: { LOG(INFO) << "Server " << server_id_ << ": Transition to CANDIDATE"; - //log_entry_buffer_.Disable(); + // log_entry_buffer_.Disable(); // [Raft thesis, section 3.4] // "Each candidate restarts its randomized election timeout at the start @@ -294,7 +294,7 @@ void RaftServer::Transition(const Mode &new_mode) { case Mode::LEADER: { LOG(INFO) << "Server " << server_id_ << ": Transition to LEADER"; - //log_entry_buffer_.Enable(); + // log_entry_buffer_.Enable(); // Freeze election timer next_election_ = TimePoint::max(); @@ -493,7 +493,7 @@ bool RaftServer::OutOfSync(uint64_t reply_term) { void RaftServer::DeleteLogSuffix(int starting_index) { auto log = Log(); - log.erase(log.begin() + starting_index - 1, log.end()); // 1-indexed + log.erase(log.begin() + starting_index - 1, log.end()); // 1-indexed disk_storage_.Put(kLogKey, SerializeLog(log)); } diff --git a/src/raft/raft_server.hpp b/src/raft/raft_server.hpp index 2a5f747cf..d9c61bcee 100644 --- a/src/raft/raft_server.hpp +++ b/src/raft/raft_server.hpp @@ -11,6 +11,7 @@ #include "raft/coordination.hpp" #include "raft/log_entry.hpp" #include "raft/raft_rpc_messages.hpp" +#include "raft/raft_interface.hpp" #include "storage/common/kvstore/kvstore.hpp" #include "transactions/type.hpp" #include "utils/scheduler.hpp" @@ -22,11 +23,22 @@ using TimePoint = std::chrono::system_clock::time_point; enum class Mode { FOLLOWER, CANDIDATE, LEADER }; +inline std::string ModeToString(const Mode &mode) { + switch (mode) { + case Mode::FOLLOWER: + return "FOLLOWER"; + case Mode::CANDIDATE: + return "CANDIDATE"; + case Mode::LEADER: + return "LEADER"; + } +} + /// Class which models the behaviour of a single server within the Raft /// cluster. The class is responsible for storing both volatile and /// persistent internal state of the corresponding state machine as well /// as performing operations that comply with the Raft protocol. -class RaftServer { +class RaftServer final : public RaftInterface { public: RaftServer() = delete; @@ -37,8 +49,11 @@ class RaftServer { /// @param durbility_dir directory for persisted data. /// @param config raft configuration. /// @param coordination Abstraction for coordination between Raft servers. + /// @param reset_callback Function that is called on each Leader->Follower + /// transition. RaftServer(uint16_t server_id, const std::string &durability_dir, - const Config &config, raft::Coordination *coordination); + const Config &config, raft::Coordination *coordination, + std::function reset_callback); ~RaftServer(); @@ -57,11 +72,12 @@ class RaftServer { /// persistent storage, an empty Log will be created. std::vector Log(); - // TODO(msantl): document + /// Start replicating StateDeltas batched together in a Raft log. void Replicate(const std::vector &log); - // TODO(msantl): document - void Emplace(const database::StateDelta &delta); + /// Emplace a single StateDelta to the corresponding batch. If the StateDelta + /// marks the transaction end, it will replicate the log accorss the cluster. + void Emplace(const database::StateDelta &delta) override; private: /// Buffers incomplete Raft logs. @@ -176,6 +192,9 @@ class RaftServer { storage::KVStore disk_storage_; + /// Callback that needs to be called to reset the db state. + std::function reset_callback_; + /// Makes a transition to a new `raft::Mode`. /// /// throws InvalidTransitionException when transitioning between incompatible diff --git a/src/storage/single_node_ha/record_accessor.cpp b/src/storage/single_node_ha/record_accessor.cpp index d3e2d200c..40731da01 100644 --- a/src/storage/single_node_ha/record_accessor.cpp +++ b/src/storage/single_node_ha/record_accessor.cpp @@ -27,7 +27,7 @@ void RecordAccessor::PropsSet(storage::Property key, dba.PropertyName(key), value); update().properties_.set(key, value); dba.UpdatePropertyIndex(key, *this, &update()); - db_accessor().raft_server().Emplace(delta); + db_accessor().raft()->Emplace(delta); } template <> @@ -38,7 +38,7 @@ void RecordAccessor::PropsSet(storage::Property key, dba.PropertyName(key), value); update().properties_.set(key, value); - db_accessor().raft_server().Emplace(delta); + db_accessor().raft()->Emplace(delta); } template <> @@ -48,7 +48,7 @@ void RecordAccessor::PropsErase(storage::Property key) { StateDelta::PropsSetVertex(dba.transaction_id(), gid(), key, dba.PropertyName(key), PropertyValue::Null); update().properties_.set(key, PropertyValue::Null); - db_accessor().raft_server().Emplace(delta); + db_accessor().raft()->Emplace(delta); } template <> @@ -58,7 +58,7 @@ void RecordAccessor::PropsErase(storage::Property key) { StateDelta::PropsSetEdge(dba.transaction_id(), gid(), key, dba.PropertyName(key), PropertyValue::Null); update().properties_.set(key, PropertyValue::Null); - db_accessor().raft_server().Emplace(delta); + db_accessor().raft()->Emplace(delta); } template diff --git a/src/storage/single_node_ha/vertex_accessor.cpp b/src/storage/single_node_ha/vertex_accessor.cpp index ecd0d997d..63ce36f80 100644 --- a/src/storage/single_node_ha/vertex_accessor.cpp +++ b/src/storage/single_node_ha/vertex_accessor.cpp @@ -24,7 +24,7 @@ void VertexAccessor::add_label(storage::Label label) { // not a duplicate label, add it if (!utils::Contains(vertex.labels_, label)) { vertex.labels_.emplace_back(label); - dba.raft_server().Emplace(delta); + dba.raft()->Emplace(delta); dba.UpdateLabelIndices(label, *this, &vertex); } } @@ -39,7 +39,7 @@ void VertexAccessor::remove_label(storage::Label label) { auto found = std::find(labels.begin(), labels.end(), delta.label); std::swap(*found, labels.back()); labels.pop_back(); - dba.raft_server().Emplace(delta); + dba.raft()->Emplace(delta); } } diff --git a/src/transactions/single_node_ha/engine.cpp b/src/transactions/single_node_ha/engine.cpp index cfd4315bd..b7c633daf 100644 --- a/src/transactions/single_node_ha/engine.cpp +++ b/src/transactions/single_node_ha/engine.cpp @@ -6,12 +6,12 @@ #include "glog/logging.h" #include "durability/single_node_ha/state_delta.hpp" -#include "raft/raft_server.hpp" namespace tx { -Engine::Engine(raft::RaftServer *raft_server) : raft_server_(raft_server) { - CHECK(raft_server) << "LogBuffer can't be nullptr in HA"; +Engine::Engine(raft::RaftInterface *raft) + : clog_(std::make_unique()), raft_(raft) { + CHECK(raft) << "Raft can't be nullptr in HA"; } Transaction *Engine::Begin() { @@ -32,7 +32,7 @@ Transaction *Engine::BeginBlocking( if (!accepting_transactions_.load()) throw TransactionEngineError("Engine is not accepting new transactions"); - // Block the engine from acceping new transactions. + // Block the engine from accepting new transactions. accepting_transactions_.store(false); // Set active transactions to abort ASAP. @@ -79,9 +79,9 @@ CommandId Engine::UpdateCommand(TransactionId id) { void Engine::Commit(const Transaction &t) { VLOG(11) << "[Tx] Commiting transaction " << t.id_; std::lock_guard guard(lock_); - clog_.set_committed(t.id_); + clog_->set_committed(t.id_); active_.remove(t.id_); - raft_server_->Emplace(database::StateDelta::TxCommit(t.id_)); + raft_->Emplace(database::StateDelta::TxCommit(t.id_)); store_.erase(store_.find(t.id_)); if (t.blocking()) { accepting_transactions_.store(true); @@ -91,9 +91,9 @@ void Engine::Commit(const Transaction &t) { void Engine::Abort(const Transaction &t) { VLOG(11) << "[Tx] Aborting transaction " << t.id_; std::lock_guard guard(lock_); - clog_.set_aborted(t.id_); + clog_->set_aborted(t.id_); active_.remove(t.id_); - raft_server_->Emplace(database::StateDelta::TxAbort(t.id_)); + raft_->Emplace(database::StateDelta::TxAbort(t.id_)); store_.erase(store_.find(t.id_)); if (t.blocking()) { accepting_transactions_.store(true); @@ -101,7 +101,7 @@ void Engine::Abort(const Transaction &t) { } CommitLog::Info Engine::Info(TransactionId tx) const { - return clog_.fetch_info(tx); + return clog_->fetch_info(tx); } Snapshot Engine::GlobalGcSnapshot() { @@ -139,7 +139,7 @@ TransactionId Engine::LocalOldestActive() const { } void Engine::GarbageCollectCommitLog(TransactionId tx_id) { - clog_.garbage_collect_older(tx_id); + clog_->garbage_collect_older(tx_id); } void Engine::LocalForEachActiveTransaction( @@ -163,12 +163,50 @@ void Engine::EnsureNextIdGreater(TransactionId tx_id) { counter_ = std::max(tx_id, counter_); } +void Engine::Reset() { + Snapshot wait_for_txs; + { + std::lock_guard guard(lock_); + + // Block the engine from accepting new transactions. + accepting_transactions_.store(false); + + // Set active transactions to abort ASAP. + for (auto transaction : active_) { + store_.find(transaction)->second->set_should_abort(); + } + + wait_for_txs = active_; + } + + // Wait for all active transactions to end. + for (auto id : wait_for_txs) { + while (Info(id).is_active()) { + // TODO reconsider this constant, currently rule-of-thumb chosen + std::this_thread::sleep_for(std::chrono::microseconds(100)); + } + } + + // Only after all transactions have finished, reset the engine. + std::lock_guard guard(lock_); + counter_ = 0; + store_.clear(); + active_.clear(); + { + clog_ = nullptr; + clog_ = std::make_unique(); + } + // local_lock_graph_ should be empty because all transactions should've finish + // by now. + accepting_transactions_.store(true); +} + Transaction *Engine::BeginTransaction(bool blocking) { TransactionId id{++counter_}; Transaction *t = new Transaction(id, active_, *this, blocking); active_.insert(id); store_.emplace(id, t); - raft_server_->Emplace(database::StateDelta::TxBegin(id)); + raft_->Emplace(database::StateDelta::TxBegin(id)); return t; } diff --git a/src/transactions/single_node_ha/engine.hpp b/src/transactions/single_node_ha/engine.hpp index d8ae36f0e..20d332836 100644 --- a/src/transactions/single_node_ha/engine.hpp +++ b/src/transactions/single_node_ha/engine.hpp @@ -6,15 +6,11 @@ #include #include +#include "raft/raft_interface.hpp" #include "transactions/commit_log.hpp" #include "transactions/transaction.hpp" #include "utils/thread/sync.hpp" -// Forward declarations. -namespace raft { - class RaftServer; -} - namespace tx { class TransactionEngineError : public utils::BasicException { @@ -23,11 +19,11 @@ class TransactionEngineError : public utils::BasicException { /// High availability single node transaction engine. /// -/// Requires RaftServer where it stores StateDeltas containing transaction +/// Requires RaftInterface where it stores StateDeltas containing transaction /// information needed for raft followers when replicating logs. class Engine final { public: - explicit Engine(raft::RaftServer *log_buffer); + explicit Engine(raft::RaftInterface *raft); Engine(const Engine &) = delete; Engine(Engine &&) = delete; @@ -35,10 +31,10 @@ class Engine final { Engine &operator=(Engine &&) = delete; Transaction *Begin(); - /// Blocking transactions are used when we can't allow any other transaction to - /// run (besides this one). This is the reason why this transactions blocks the - /// engine from creating new transactions and waits for the existing ones to - /// finish. + /// Blocking transactions are used when we can't allow any other transaction + /// to run (besides this one). This is the reason why this transactions blocks + /// the engine from creating new transactions and waits for the existing ones + /// to finish. Transaction *BeginBlocking( std::experimental::optional parent_tx); CommandId Advance(TransactionId id); @@ -59,6 +55,12 @@ class Engine final { auto &local_lock_graph() { return local_lock_graph_; } const auto &local_lock_graph() const { return local_lock_graph_; } + /// Reset the internal state of the engine. Use with caution as this will + /// block the engine from receiving any new transaction and will hint all + /// transactions to abort and will wait for them to finish before reseting + /// engines internal state. + void Reset(); + private: // Map lock dependencies. Each entry maps (tx_that_wants_lock, // tx_that_holds_lock). Used for local deadlock resolution. @@ -66,11 +68,11 @@ class Engine final { ConcurrentMap local_lock_graph_; TransactionId counter_{0}; - CommitLog clog_; + std::unique_ptr clog_{nullptr}; std::unordered_map> store_; Snapshot active_; mutable utils::SpinLock lock_; - raft::RaftServer *raft_server_{nullptr}; + raft::RaftInterface *raft_{nullptr}; std::atomic accepting_transactions_{true}; // Helper method for transaction begin. diff --git a/src/transactions/snapshot.hpp b/src/transactions/snapshot.hpp index ef6d493e5..80c0776c4 100644 --- a/src/transactions/snapshot.hpp +++ b/src/transactions/snapshot.hpp @@ -56,6 +56,11 @@ class Snapshot final { transaction_ids_.erase(last, transaction_ids_.end()); } + /// Removes all transactions from this Snapshot. + void clear() { + transaction_ids_.clear(); + } + TransactionId front() const { DCHECK(transaction_ids_.size()) << "Snapshot.front() on empty Snapshot"; return transaction_ids_.front(); diff --git a/tests/unit/CMakeLists.txt b/tests/unit/CMakeLists.txt index 067721905..cfbb7e50b 100644 --- a/tests/unit/CMakeLists.txt +++ b/tests/unit/CMakeLists.txt @@ -246,6 +246,9 @@ target_link_libraries(${test_prefix}transaction_engine_distributed mg-distribute add_unit_test(transaction_engine_single_node.cpp) target_link_libraries(${test_prefix}transaction_engine_single_node mg-single-node kvstore_dummy_lib) +add_unit_test(transaction_engine_single_node_ha.cpp) +target_link_libraries(${test_prefix}transaction_engine_single_node_ha mg-single-node-ha kvstore_dummy_lib) + add_unit_test(typed_value.cpp) target_link_libraries(${test_prefix}typed_value mg-single-node kvstore_dummy_lib) diff --git a/tests/unit/transaction_engine_single_node_ha.cpp b/tests/unit/transaction_engine_single_node_ha.cpp new file mode 100644 index 000000000..2b980c959 --- /dev/null +++ b/tests/unit/transaction_engine_single_node_ha.cpp @@ -0,0 +1,59 @@ +#include "gtest/gtest.h" + +#include +#include + +#include "durability/single_node_ha/state_delta.hpp" +#include "raft/raft_interface.hpp" +#include "transactions/single_node_ha/engine.hpp" +#include "transactions/transaction.hpp" + +using namespace tx; + +class RaftMock final : public raft::RaftInterface { + public: + void Emplace(const database::StateDelta &delta) override { + log_[delta.transaction_id].emplace_back(std::move(delta)); + } + + std::vector GetLogForTx( + const tx::TransactionId &tx_id) { + return log_[tx_id]; + } + + private: + std::unordered_map> log_; +}; + +TEST(Engine, Reset) { + RaftMock raft; + Engine engine{&raft}; + + auto t0 = engine.Begin(); + EXPECT_EQ(t0->id_, 1); + engine.Commit(*t0); + + engine.Reset(); + + auto t1 = engine.Begin(); + EXPECT_EQ(t1->id_, 1); + engine.Commit(*t1); +} + +TEST(Engine, TxStateDelta) { + RaftMock raft; + Engine engine{&raft}; + + auto t0 = engine.Begin(); + tx::TransactionId tx_id = t0->id_; + engine.Commit(*t0); + + auto t0_log = raft.GetLogForTx(tx_id); + EXPECT_EQ(t0_log.size(), 2); + + using Type = enum database::StateDelta::Type; + EXPECT_EQ(t0_log[0].type, Type::TRANSACTION_BEGIN); + EXPECT_EQ(t0_log[0].transaction_id, tx_id); + EXPECT_EQ(t0_log[1].type, Type::TRANSACTION_COMMIT); + EXPECT_EQ(t0_log[1].transaction_id, tx_id); +}