Wire raft into memgraph pt.1.

Summary:
This is just the first diff that tries to wire the raft protocol into
memgraph.

In this diff I'm introducing transaction engine reset functionality. I also
introduced `RaftInterface` which should be used wherever someone wants to access
Raft from Memgraph.

For design decisions see the feature spec.

Reviewers: ipaljak, teon.banek

Reviewed By: ipaljak

Subscribers: pullbot, teon.banek

Differential Revision: https://phabricator.memgraph.io/D1758
This commit is contained in:
Matija Santl 2018-11-30 14:32:32 +01:00
parent 26d23959d3
commit f501980973
15 changed files with 287 additions and 142 deletions

View File

@ -129,9 +129,10 @@ the following scenario:
The problem lies in the fact that there is still a record within the internal
storage of our old leader with the same transaction ID and GID as the recently
committed record by the new leader. Obviously, this is broken. As a solution, we
will prefix the transaction ID with the Raft term, thus ensuring that the
transaction IDs will differ.
committed record by the new leader. Obviously, this is broken. As a solution, on
each transition from `Leader` to `Follower`, we will reinitialize storage, reset
the transaction engine and recover data from the Raft log. This will ensure all
ongoing transactions which have "polluted" the storage will be gone.
"When will followers append that transaction to their commit logs?"

View File

@ -84,7 +84,7 @@ std::unique_ptr<GraphDbAccessor> GraphDb::AccessBlocking(
Storage &GraphDb::storage() { return *storage_; }
raft::RaftServer &GraphDb::raft_server() { return raft_server_; }
raft::RaftInterface *GraphDb::raft() { return &raft_server_; }
tx::Engine &GraphDb::tx_engine() { return tx_engine_; }
@ -116,10 +116,15 @@ bool GraphDb::MakeSnapshot(GraphDbAccessor &accessor) {
return status;
}
void GraphDb::ReinitializeStorage() {
void GraphDb::Reset() {
// Release gc scheduler to stop it from touching storage
storage_gc_ = nullptr;
storage_ = std::make_unique<Storage>(config_.properties_on_disk);
// This will make all active transactions to abort and reset the internal
// state.
tx_engine_.Reset();
storage_gc_ =
std::make_unique<StorageGc>(*storage_, tx_engine_, config_.gc_cycle_sec);
}

View File

@ -109,7 +109,7 @@ class GraphDb {
std::unique_ptr<GraphDbAccessor> Access(tx::TransactionId);
Storage &storage();
raft::RaftServer &raft_server();
raft::RaftInterface *raft();
tx::Engine &tx_engine();
storage::ConcurrentIdMapper<storage::Label> &label_mapper();
storage::ConcurrentIdMapper<storage::EdgeType> &edge_type_mapper();
@ -120,11 +120,12 @@ class GraphDb {
/// Makes a snapshot from the visibility of the given accessor
bool MakeSnapshot(GraphDbAccessor &accessor);
/// Releases the storage object safely and creates a new object.
/// This is needed because of recovery, otherwise we might try to recover
/// into a storage which has already been polluted because of a failed
/// previous recovery
void ReinitializeStorage();
/// Releases the storage object safely and creates a new object, resets the tx
/// engine.
///
/// This is needed in HA during the leader -> follower transition where we
/// might end up with some stale transactions on the leader.
void Reset();
/// When this is false, no new transactions should be created.
bool is_accepting_transactions() const { return is_accepting_transactions_; }
@ -164,7 +165,8 @@ class GraphDb {
raft::Coordination::LoadFromFile(config_.coordination_config_file)};
raft::RaftServer raft_server_{
config_.server_id, config_.durability_directory,
raft::Config::LoadFromFile(config_.raft_config_file), &coordination_};
raft::Config::LoadFromFile(config_.raft_config_file), &coordination_,
[this]() { this->Reset(); }};
tx::Engine tx_engine_{&raft_server_};
std::unique_ptr<StorageGc> storage_gc_ =
std::make_unique<StorageGc>(*storage_, tx_engine_, config_.gc_cycle_sec);

View File

@ -63,7 +63,7 @@ bool GraphDbAccessor::should_abort() const {
return transaction_.should_abort();
}
raft::RaftServer &GraphDbAccessor::raft_server() { return db_.raft_server(); }
raft::RaftInterface *GraphDbAccessor::raft() { return db_.raft(); }
VertexAccessor GraphDbAccessor::InsertVertex(
std::experimental::optional<gid::Gid> requested_gid) {
@ -76,7 +76,7 @@ VertexAccessor GraphDbAccessor::InsertVertex(
db_.storage().vertices_.access().insert(gid, vertex_vlist).second;
CHECK(success) << "Attempting to insert a vertex with an existing GID: "
<< gid;
raft_server().Emplace(
raft()->Emplace(
database::StateDelta::CreateVertex(transaction_.id_, vertex_vlist->gid_));
auto va = VertexAccessor(vertex_vlist, *this);
return va;
@ -143,7 +143,7 @@ void GraphDbAccessor::EnableIndex(const LabelPropertyIndex::Key &key) {
// Commit transaction as we finished applying method on newest visible
// records. Write that transaction's ID to the RaftServer as the index has been
// built at this point even if this DBA's transaction aborts for some reason.
raft_server().Emplace(database::StateDelta::BuildIndex(
raft()->Emplace(database::StateDelta::BuildIndex(
transaction_id(), key.label_, LabelName(key.label_), key.property_,
PropertyName(key.property_), key.unique_));
}
@ -170,7 +170,7 @@ void GraphDbAccessor::DeleteIndex(storage::Label label,
db_.AccessBlocking(std::experimental::make_optional(transaction_.id_));
db_.storage().label_property_index_.DeleteIndex(key);
dba->raft_server().Emplace(database::StateDelta::DropIndex(
dba->raft()->Emplace(database::StateDelta::DropIndex(
dba->transaction_id(), key.label_, LabelName(key.label_), key.property_,
PropertyName(key.property_)));
@ -290,7 +290,7 @@ bool GraphDbAccessor::RemoveVertex(VertexAccessor &vertex_accessor,
return false;
auto *vlist_ptr = vertex_accessor.address();
raft_server().Emplace(database::StateDelta::RemoveVertex(
raft()->Emplace(database::StateDelta::RemoveVertex(
transaction_.id_, vlist_ptr->gid_, check_empty));
vlist_ptr->remove(vertex_accessor.current_, transaction_);
return true;
@ -337,7 +337,7 @@ EdgeAccessor GraphDbAccessor::InsertEdge(
to.SwitchNew();
to.update().in_.emplace(from.address(), edge_vlist, edge_type);
raft_server().Emplace(database::StateDelta::CreateEdge(
raft()->Emplace(database::StateDelta::CreateEdge(
transaction_.id_, edge_vlist->gid_, from.gid(), to.gid(), edge_type,
EdgeTypeName(edge_type)));
@ -362,7 +362,7 @@ void GraphDbAccessor::RemoveEdge(EdgeAccessor &edge, bool remove_out_edge,
if (remove_in_edge) edge.to().RemoveInEdge(edge.address());
edge.address()->remove(edge.current_, transaction_);
raft_server().Emplace(database::StateDelta::RemoveEdge(transaction_.id_, edge.gid()));
raft()->Emplace(database::StateDelta::RemoveEdge(transaction_.id_, edge.gid()));
}
storage::Label GraphDbAccessor::Label(const std::string &label_name) {

View File

@ -11,6 +11,7 @@
#include <cppitertools/imap.hpp>
#include "database/single_node_ha/graph_db.hpp"
#include "raft/raft_interface.hpp"
#include "storage/common/types/types.hpp"
#include "storage/single_node_ha/edge_accessor.hpp"
#include "storage/single_node_ha/vertex_accessor.hpp"
@ -185,9 +186,7 @@ class GraphDbAccessor {
auto Vertices(storage::Label label, bool current_state) {
DCHECK(!commited_ && !aborted_) << "Accessor committed or aborted";
return iter::imap(
[this](auto vlist) {
return VertexAccessor(vlist, *this);
},
[this](auto vlist) { return VertexAccessor(vlist, *this); },
db_.storage().labels_index_.GetVlists(label, transaction_,
current_state));
}
@ -211,9 +210,7 @@ class GraphDbAccessor {
LabelPropertyIndex::Key(label, property)))
<< "Label+property index doesn't exist.";
return iter::imap(
[this](auto vlist) {
return VertexAccessor(vlist, *this);
},
[this](auto vlist) { return VertexAccessor(vlist, *this); },
db_.storage().label_property_index_.GetVlists(
LabelPropertyIndex::Key(label, property), transaction_,
current_state));
@ -241,9 +238,7 @@ class GraphDbAccessor {
CHECK(value.type() != PropertyValue::Type::Null)
<< "Can't query index for propery value type null.";
return iter::imap(
[this](auto vlist) {
return VertexAccessor(vlist, *this);
},
[this](auto vlist) { return VertexAccessor(vlist, *this); },
db_.storage().label_property_index_.GetVlists(
LabelPropertyIndex::Key(label, property), value, transaction_,
current_state));
@ -286,9 +281,7 @@ class GraphDbAccessor {
LabelPropertyIndex::Key(label, property)))
<< "Label+property index doesn't exist.";
return iter::imap(
[this](auto vlist) {
return VertexAccessor(vlist, *this);
},
[this](auto vlist) { return VertexAccessor(vlist, *this); },
db_.storage().label_property_index_.GetVlists(
LabelPropertyIndex::Key(label, property), lower, upper,
transaction_, current_state));
@ -374,9 +367,7 @@ class GraphDbAccessor {
// wrap version lists into accessors, which will look for visible versions
auto accessors = iter::imap(
[this](auto id_vlist) {
return EdgeAccessor(id_vlist.second, *this);
},
[this](auto id_vlist) { return EdgeAccessor(id_vlist.second, *this); },
db_.storage().edges_.access());
// filter out the accessors not visible to the current transaction
@ -446,7 +437,7 @@ class GraphDbAccessor {
/// Populates index with vertices containing the key
void PopulateIndex(const LabelPropertyIndex::Key &key);
/// Writes Index (key) creation to RaftServer, marks it as ready for usage
/// Writes Index (key) creation to Raft, marks it as ready for usage
void EnableIndex(const LabelPropertyIndex::Key &key);
/**
@ -583,7 +574,7 @@ class GraphDbAccessor {
bool should_abort() const;
const tx::Transaction &transaction() const { return transaction_; }
raft::RaftServer &raft_server();
raft::RaftInterface *raft();
auto &db() { return db_; }
const auto &db() const { return db_; }

View File

@ -0,0 +1,20 @@
/// @file
#pragma once
#include "durability/single_node_ha/state_delta.hpp"
namespace raft {
/// Exposes only functionality that other parts of Memgraph can interact with,
/// emplacing a state delta into the appropriate Raft log entry.
class RaftInterface {
public:
/// Add StateDelta to the appropriate Raft log entry.
virtual void Emplace(const database::StateDelta &) = 0;
protected:
~RaftInterface() {}
};
} // namespace raft

View File

@ -1,7 +1,7 @@
#include "raft/raft_server.hpp"
#include <experimental/filesystem>
#include <kj/std/iostream.h>
#include <experimental/filesystem>
#include <gflags/gflags.h>
#include <glog/logging.h>
@ -15,18 +15,19 @@ namespace raft {
namespace fs = std::experimental::filesystem;
const std::string kCurrentTermKey = "current_term";
const std::string kVotedForKey = "voted_for";
const std::string kLogKey = "log";
const std::string kRaftDir = "raft";
const std::string kVotedForKey = "voted_for";
const std::string kLogKey = "log";
const std::string kRaftDir = "raft";
RaftServer::RaftServer(uint16_t server_id, const std::string &durability_dir,
const Config &config, Coordination *coordination)
const Config &config, Coordination *coordination,
std::function<void(void)> reset_callback)
: config_(config),
coordination_(coordination),
mode_(Mode::FOLLOWER),
server_id_(server_id),
disk_storage_(fs::path(durability_dir) / kRaftDir) {
disk_storage_(fs::path(durability_dir) / kRaftDir),
reset_callback_(reset_callback) {
// Persistent storage initialization/recovery.
if (Log().empty()) {
disk_storage_.Put(kCurrentTermKey, "0");
@ -74,70 +75,68 @@ RaftServer::RaftServer(uint16_t server_id, const std::string &durability_dir,
Save(res, res_builder);
});
coordination_->Register<AppendEntriesRpc>(
[this](const auto &req_reader, auto *res_builder) {
std::lock_guard<std::mutex> guard(lock_);
AppendEntriesReq req;
Load(&req, req_reader);
coordination_->Register<AppendEntriesRpc>([this](const auto &req_reader,
auto *res_builder) {
std::lock_guard<std::mutex> guard(lock_);
AppendEntriesReq req;
Load(&req, req_reader);
// [Raft paper 5.1]
// "If a server recieves a request with a stale term,
// it rejects the request"
uint64_t current_term = CurrentTerm();
if (req.term < current_term) {
AppendEntriesRes res(false, current_term);
Save(res, res_builder);
return;
}
// [Raft paper 5.1]
// "If a server recieves a request with a stale term,
// it rejects the request"
uint64_t current_term = CurrentTerm();
if (req.term < current_term) {
AppendEntriesRes res(false, current_term);
Save(res, res_builder);
return;
}
// respond positively to a heartbeat.
// TODO(ipaljak) review this when implementing log replication.
if (req.entries.empty()) {
AppendEntriesRes res(true, current_term);
Save(res, res_builder);
if (mode_ != Mode::FOLLOWER) {
Transition(Mode::FOLLOWER);
state_changed_.notify_all();
} else {
SetNextElectionTimePoint();
}
return;
}
// respond positively to a heartbeat.
// TODO(ipaljak) review this when implementing log replication.
if (req.entries.empty()) {
AppendEntriesRes res(true, current_term);
Save(res, res_builder);
if (mode_ != Mode::FOLLOWER) {
Transition(Mode::FOLLOWER);
state_changed_.notify_all();
} else {
SetNextElectionTimePoint();
}
return;
}
throw utils::NotYetImplemented(
"AppendEntriesRpc which is not a heartbeat");
throw utils::NotYetImplemented("AppendEntriesRpc which is not a heartbeat");
// [Raft paper 5.3]
// "If a follower's log is inconsistent with the leader's, the
// consistency check will fail in the next AppendEntries RPC."
//
// Consistency checking assures the Log Matching Property:
// - If two entries in different logs have the same index and
// term, then they store the same command.
// - If two entries in different logs have the same index and term,
// then the logs are identical in all preceding entries.
auto log = Log();
if (log.size() < req.prev_log_index ||
log[req.prev_log_index - 1].term != req.prev_log_term) {
AppendEntriesRes res(false, current_term);
Save(res, res_builder);
return;
}
// [Raft paper 5.3]
// "If a follower's log is inconsistent with the leader's, the
// consistency check will fail in the next AppendEntries RPC."
//
// Consistency checking assures the Log Matching Property:
// - If two entries in different logs have the same index and
// term, then they store the same command.
// - If two entries in different logs have the same index and term,
// then the logs are identical in all preceding entries.
auto log = Log();
if (log.size() < req.prev_log_index ||
log[req.prev_log_index - 1].term != req.prev_log_term) {
AppendEntriesRes res(false, current_term);
Save(res, res_builder);
return;
}
// If existing entry conflicts with new one, we need to delete the
// existing entry and all that follow it.
if (log.size() > req.prev_log_index &&
log[req.prev_log_index].term != req.entries[0].term)
DeleteLogSuffix(req.prev_log_index + 1);
// If existing entry conflicts with new one, we need to delete the
// existing entry and all that follow it.
if (log.size() > req.prev_log_index &&
log[req.prev_log_index].term != req.entries[0].term)
DeleteLogSuffix(req.prev_log_index + 1);
AppendLogEntries(req.leader_commit, req.entries);
AppendEntriesRes res(true, current_term);
Save(res, res_builder);
});
AppendLogEntries(req.leader_commit, req.entries);
AppendEntriesRes res(true, current_term);
Save(res, res_builder);
});
SetNextElectionTimePoint();
election_thread_ =
std::thread(&RaftServer::ElectionThreadMain, this);
election_thread_ = std::thread(&RaftServer::ElectionThreadMain, this);
for (const auto &peer_id : coordination_->GetWorkerIds()) {
if (peer_id == server_id_) continue;
@ -152,12 +151,10 @@ RaftServer::~RaftServer() {
election_change_.notify_all();
for (auto &peer_thread : peer_threads_) {
if (peer_thread.joinable())
peer_thread.join();
if (peer_thread.joinable()) peer_thread.join();
}
if (election_thread_.joinable())
election_thread_.join();
if (election_thread_.joinable()) election_thread_.join();
}
uint64_t RaftServer::CurrentTerm() {
@ -251,16 +248,19 @@ bool RaftServer::LogEntryBuffer::IsStateDeltaTransactionEnd(
void RaftServer::Transition(const Mode &new_mode) {
switch (new_mode) {
case Mode::FOLLOWER: {
if (mode_ == Mode::LEADER) {
reset_callback_();
}
LOG(INFO) << "Server " << server_id_ << ": Transition to FOLLOWER";
SetNextElectionTimePoint();
mode_ = Mode::FOLLOWER;
//log_entry_buffer_.Disable();
// log_entry_buffer_.Disable();
break;
}
case Mode::CANDIDATE: {
LOG(INFO) << "Server " << server_id_ << ": Transition to CANDIDATE";
//log_entry_buffer_.Disable();
// log_entry_buffer_.Disable();
// [Raft thesis, section 3.4]
// "Each candidate restarts its randomized election timeout at the start
@ -294,7 +294,7 @@ void RaftServer::Transition(const Mode &new_mode) {
case Mode::LEADER: {
LOG(INFO) << "Server " << server_id_ << ": Transition to LEADER";
//log_entry_buffer_.Enable();
// log_entry_buffer_.Enable();
// Freeze election timer
next_election_ = TimePoint::max();
@ -493,7 +493,7 @@ bool RaftServer::OutOfSync(uint64_t reply_term) {
void RaftServer::DeleteLogSuffix(int starting_index) {
auto log = Log();
log.erase(log.begin() + starting_index - 1, log.end()); // 1-indexed
log.erase(log.begin() + starting_index - 1, log.end()); // 1-indexed
disk_storage_.Put(kLogKey, SerializeLog(log));
}

View File

@ -11,6 +11,7 @@
#include "raft/coordination.hpp"
#include "raft/log_entry.hpp"
#include "raft/raft_rpc_messages.hpp"
#include "raft/raft_interface.hpp"
#include "storage/common/kvstore/kvstore.hpp"
#include "transactions/type.hpp"
#include "utils/scheduler.hpp"
@ -22,11 +23,22 @@ using TimePoint = std::chrono::system_clock::time_point;
enum class Mode { FOLLOWER, CANDIDATE, LEADER };
inline std::string ModeToString(const Mode &mode) {
switch (mode) {
case Mode::FOLLOWER:
return "FOLLOWER";
case Mode::CANDIDATE:
return "CANDIDATE";
case Mode::LEADER:
return "LEADER";
}
}
/// Class which models the behaviour of a single server within the Raft
/// cluster. The class is responsible for storing both volatile and
/// persistent internal state of the corresponding state machine as well
/// as performing operations that comply with the Raft protocol.
class RaftServer {
class RaftServer final : public RaftInterface {
public:
RaftServer() = delete;
@ -37,8 +49,11 @@ class RaftServer {
/// @param durbility_dir directory for persisted data.
/// @param config raft configuration.
/// @param coordination Abstraction for coordination between Raft servers.
/// @param reset_callback Function that is called on each Leader->Follower
/// transition.
RaftServer(uint16_t server_id, const std::string &durability_dir,
const Config &config, raft::Coordination *coordination);
const Config &config, raft::Coordination *coordination,
std::function<void(void)> reset_callback);
~RaftServer();
@ -57,11 +72,12 @@ class RaftServer {
/// persistent storage, an empty Log will be created.
std::vector<LogEntry> Log();
// TODO(msantl): document
/// Start replicating StateDeltas batched together in a Raft log.
void Replicate(const std::vector<database::StateDelta> &log);
// TODO(msantl): document
void Emplace(const database::StateDelta &delta);
/// Emplace a single StateDelta to the corresponding batch. If the StateDelta
/// marks the transaction end, it will replicate the log accorss the cluster.
void Emplace(const database::StateDelta &delta) override;
private:
/// Buffers incomplete Raft logs.
@ -176,6 +192,9 @@ class RaftServer {
storage::KVStore disk_storage_;
/// Callback that needs to be called to reset the db state.
std::function<void(void)> reset_callback_;
/// Makes a transition to a new `raft::Mode`.
///
/// throws InvalidTransitionException when transitioning between incompatible

View File

@ -27,7 +27,7 @@ void RecordAccessor<Vertex>::PropsSet(storage::Property key,
dba.PropertyName(key), value);
update().properties_.set(key, value);
dba.UpdatePropertyIndex(key, *this, &update());
db_accessor().raft_server().Emplace(delta);
db_accessor().raft()->Emplace(delta);
}
template <>
@ -38,7 +38,7 @@ void RecordAccessor<Edge>::PropsSet(storage::Property key,
dba.PropertyName(key), value);
update().properties_.set(key, value);
db_accessor().raft_server().Emplace(delta);
db_accessor().raft()->Emplace(delta);
}
template <>
@ -48,7 +48,7 @@ void RecordAccessor<Vertex>::PropsErase(storage::Property key) {
StateDelta::PropsSetVertex(dba.transaction_id(), gid(), key,
dba.PropertyName(key), PropertyValue::Null);
update().properties_.set(key, PropertyValue::Null);
db_accessor().raft_server().Emplace(delta);
db_accessor().raft()->Emplace(delta);
}
template <>
@ -58,7 +58,7 @@ void RecordAccessor<Edge>::PropsErase(storage::Property key) {
StateDelta::PropsSetEdge(dba.transaction_id(), gid(), key,
dba.PropertyName(key), PropertyValue::Null);
update().properties_.set(key, PropertyValue::Null);
db_accessor().raft_server().Emplace(delta);
db_accessor().raft()->Emplace(delta);
}
template <typename TRecord>

View File

@ -24,7 +24,7 @@ void VertexAccessor::add_label(storage::Label label) {
// not a duplicate label, add it
if (!utils::Contains(vertex.labels_, label)) {
vertex.labels_.emplace_back(label);
dba.raft_server().Emplace(delta);
dba.raft()->Emplace(delta);
dba.UpdateLabelIndices(label, *this, &vertex);
}
}
@ -39,7 +39,7 @@ void VertexAccessor::remove_label(storage::Label label) {
auto found = std::find(labels.begin(), labels.end(), delta.label);
std::swap(*found, labels.back());
labels.pop_back();
dba.raft_server().Emplace(delta);
dba.raft()->Emplace(delta);
}
}

View File

@ -6,12 +6,12 @@
#include "glog/logging.h"
#include "durability/single_node_ha/state_delta.hpp"
#include "raft/raft_server.hpp"
namespace tx {
Engine::Engine(raft::RaftServer *raft_server) : raft_server_(raft_server) {
CHECK(raft_server) << "LogBuffer can't be nullptr in HA";
Engine::Engine(raft::RaftInterface *raft)
: clog_(std::make_unique<CommitLog>()), raft_(raft) {
CHECK(raft) << "Raft can't be nullptr in HA";
}
Transaction *Engine::Begin() {
@ -32,7 +32,7 @@ Transaction *Engine::BeginBlocking(
if (!accepting_transactions_.load())
throw TransactionEngineError("Engine is not accepting new transactions");
// Block the engine from acceping new transactions.
// Block the engine from accepting new transactions.
accepting_transactions_.store(false);
// Set active transactions to abort ASAP.
@ -79,9 +79,9 @@ CommandId Engine::UpdateCommand(TransactionId id) {
void Engine::Commit(const Transaction &t) {
VLOG(11) << "[Tx] Commiting transaction " << t.id_;
std::lock_guard<utils::SpinLock> guard(lock_);
clog_.set_committed(t.id_);
clog_->set_committed(t.id_);
active_.remove(t.id_);
raft_server_->Emplace(database::StateDelta::TxCommit(t.id_));
raft_->Emplace(database::StateDelta::TxCommit(t.id_));
store_.erase(store_.find(t.id_));
if (t.blocking()) {
accepting_transactions_.store(true);
@ -91,9 +91,9 @@ void Engine::Commit(const Transaction &t) {
void Engine::Abort(const Transaction &t) {
VLOG(11) << "[Tx] Aborting transaction " << t.id_;
std::lock_guard<utils::SpinLock> guard(lock_);
clog_.set_aborted(t.id_);
clog_->set_aborted(t.id_);
active_.remove(t.id_);
raft_server_->Emplace(database::StateDelta::TxAbort(t.id_));
raft_->Emplace(database::StateDelta::TxAbort(t.id_));
store_.erase(store_.find(t.id_));
if (t.blocking()) {
accepting_transactions_.store(true);
@ -101,7 +101,7 @@ void Engine::Abort(const Transaction &t) {
}
CommitLog::Info Engine::Info(TransactionId tx) const {
return clog_.fetch_info(tx);
return clog_->fetch_info(tx);
}
Snapshot Engine::GlobalGcSnapshot() {
@ -139,7 +139,7 @@ TransactionId Engine::LocalOldestActive() const {
}
void Engine::GarbageCollectCommitLog(TransactionId tx_id) {
clog_.garbage_collect_older(tx_id);
clog_->garbage_collect_older(tx_id);
}
void Engine::LocalForEachActiveTransaction(
@ -163,12 +163,50 @@ void Engine::EnsureNextIdGreater(TransactionId tx_id) {
counter_ = std::max(tx_id, counter_);
}
void Engine::Reset() {
Snapshot wait_for_txs;
{
std::lock_guard<utils::SpinLock> guard(lock_);
// Block the engine from accepting new transactions.
accepting_transactions_.store(false);
// Set active transactions to abort ASAP.
for (auto transaction : active_) {
store_.find(transaction)->second->set_should_abort();
}
wait_for_txs = active_;
}
// Wait for all active transactions to end.
for (auto id : wait_for_txs) {
while (Info(id).is_active()) {
// TODO reconsider this constant, currently rule-of-thumb chosen
std::this_thread::sleep_for(std::chrono::microseconds(100));
}
}
// Only after all transactions have finished, reset the engine.
std::lock_guard<utils::SpinLock> guard(lock_);
counter_ = 0;
store_.clear();
active_.clear();
{
clog_ = nullptr;
clog_ = std::make_unique<CommitLog>();
}
// local_lock_graph_ should be empty because all transactions should've finish
// by now.
accepting_transactions_.store(true);
}
Transaction *Engine::BeginTransaction(bool blocking) {
TransactionId id{++counter_};
Transaction *t = new Transaction(id, active_, *this, blocking);
active_.insert(id);
store_.emplace(id, t);
raft_server_->Emplace(database::StateDelta::TxBegin(id));
raft_->Emplace(database::StateDelta::TxBegin(id));
return t;
}

View File

@ -6,15 +6,11 @@
#include <experimental/optional>
#include <unordered_map>
#include "raft/raft_interface.hpp"
#include "transactions/commit_log.hpp"
#include "transactions/transaction.hpp"
#include "utils/thread/sync.hpp"
// Forward declarations.
namespace raft {
class RaftServer;
}
namespace tx {
class TransactionEngineError : public utils::BasicException {
@ -23,11 +19,11 @@ class TransactionEngineError : public utils::BasicException {
/// High availability single node transaction engine.
///
/// Requires RaftServer where it stores StateDeltas containing transaction
/// Requires RaftInterface where it stores StateDeltas containing transaction
/// information needed for raft followers when replicating logs.
class Engine final {
public:
explicit Engine(raft::RaftServer *log_buffer);
explicit Engine(raft::RaftInterface *raft);
Engine(const Engine &) = delete;
Engine(Engine &&) = delete;
@ -35,10 +31,10 @@ class Engine final {
Engine &operator=(Engine &&) = delete;
Transaction *Begin();
/// Blocking transactions are used when we can't allow any other transaction to
/// run (besides this one). This is the reason why this transactions blocks the
/// engine from creating new transactions and waits for the existing ones to
/// finish.
/// Blocking transactions are used when we can't allow any other transaction
/// to run (besides this one). This is the reason why this transactions blocks
/// the engine from creating new transactions and waits for the existing ones
/// to finish.
Transaction *BeginBlocking(
std::experimental::optional<TransactionId> parent_tx);
CommandId Advance(TransactionId id);
@ -59,6 +55,12 @@ class Engine final {
auto &local_lock_graph() { return local_lock_graph_; }
const auto &local_lock_graph() const { return local_lock_graph_; }
/// Reset the internal state of the engine. Use with caution as this will
/// block the engine from receiving any new transaction and will hint all
/// transactions to abort and will wait for them to finish before reseting
/// engines internal state.
void Reset();
private:
// Map lock dependencies. Each entry maps (tx_that_wants_lock,
// tx_that_holds_lock). Used for local deadlock resolution.
@ -66,11 +68,11 @@ class Engine final {
ConcurrentMap<TransactionId, TransactionId> local_lock_graph_;
TransactionId counter_{0};
CommitLog clog_;
std::unique_ptr<CommitLog> clog_{nullptr};
std::unordered_map<TransactionId, std::unique_ptr<Transaction>> store_;
Snapshot active_;
mutable utils::SpinLock lock_;
raft::RaftServer *raft_server_{nullptr};
raft::RaftInterface *raft_{nullptr};
std::atomic<bool> accepting_transactions_{true};
// Helper method for transaction begin.

View File

@ -56,6 +56,11 @@ class Snapshot final {
transaction_ids_.erase(last, transaction_ids_.end());
}
/// Removes all transactions from this Snapshot.
void clear() {
transaction_ids_.clear();
}
TransactionId front() const {
DCHECK(transaction_ids_.size()) << "Snapshot.front() on empty Snapshot";
return transaction_ids_.front();

View File

@ -246,6 +246,9 @@ target_link_libraries(${test_prefix}transaction_engine_distributed mg-distribute
add_unit_test(transaction_engine_single_node.cpp)
target_link_libraries(${test_prefix}transaction_engine_single_node mg-single-node kvstore_dummy_lib)
add_unit_test(transaction_engine_single_node_ha.cpp)
target_link_libraries(${test_prefix}transaction_engine_single_node_ha mg-single-node-ha kvstore_dummy_lib)
add_unit_test(typed_value.cpp)
target_link_libraries(${test_prefix}typed_value mg-single-node kvstore_dummy_lib)

View File

@ -0,0 +1,59 @@
#include "gtest/gtest.h"
#include <unordered_map>
#include <vector>
#include "durability/single_node_ha/state_delta.hpp"
#include "raft/raft_interface.hpp"
#include "transactions/single_node_ha/engine.hpp"
#include "transactions/transaction.hpp"
using namespace tx;
class RaftMock final : public raft::RaftInterface {
public:
void Emplace(const database::StateDelta &delta) override {
log_[delta.transaction_id].emplace_back(std::move(delta));
}
std::vector<database::StateDelta> GetLogForTx(
const tx::TransactionId &tx_id) {
return log_[tx_id];
}
private:
std::unordered_map<tx::TransactionId, std::vector<database::StateDelta>> log_;
};
TEST(Engine, Reset) {
RaftMock raft;
Engine engine{&raft};
auto t0 = engine.Begin();
EXPECT_EQ(t0->id_, 1);
engine.Commit(*t0);
engine.Reset();
auto t1 = engine.Begin();
EXPECT_EQ(t1->id_, 1);
engine.Commit(*t1);
}
TEST(Engine, TxStateDelta) {
RaftMock raft;
Engine engine{&raft};
auto t0 = engine.Begin();
tx::TransactionId tx_id = t0->id_;
engine.Commit(*t0);
auto t0_log = raft.GetLogForTx(tx_id);
EXPECT_EQ(t0_log.size(), 2);
using Type = enum database::StateDelta::Type;
EXPECT_EQ(t0_log[0].type, Type::TRANSACTION_BEGIN);
EXPECT_EQ(t0_log[0].transaction_id, tx_id);
EXPECT_EQ(t0_log[1].type, Type::TRANSACTION_COMMIT);
EXPECT_EQ(t0_log[1].transaction_id, tx_id);
}