Implement StateDelta apply method for Raft

Summary:
TransactionReplicator replicates transactions on follower machines in
HA memgraph. Our DB accessor API doesn't provide us with the functionality to
begin transactions with non-increasing ids. This is why the
`TransactionReplicator` uses a internal map that maps tx ids from the leader
node to transactions on the follower node (whose id doesn't have to match the
leaders tx id).

If the leader has the following transaction timeline:

```
    L
tx1
 |
 |   tx2
 |    |
 |    |
 |    |
 |    |
 |    |
 |   tx2
 |
 |
 |
 |
tx1
```

`tx2` will commit first and will be replicated. When applying `tx2` on follower
nodes, they will start a new transaction with tx id `1`.  When `tx1` starts
replicating, followers will start a new transaction with tx id `2`. And this is
wehre `TransactionReplicator` kicks in.

Reviewers: ipaljak

Reviewed By: ipaljak

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1775
This commit is contained in:
Matija Santl 2018-12-12 15:50:17 +01:00
parent 68b2dcc490
commit 5e6cf0724a
9 changed files with 146 additions and 32 deletions

View File

@ -253,6 +253,7 @@ set(mg_single_node_ha_sources
database/single_node_ha/config.cpp database/single_node_ha/config.cpp
database/single_node_ha/graph_db.cpp database/single_node_ha/graph_db.cpp
database/single_node_ha/graph_db_accessor.cpp database/single_node_ha/graph_db_accessor.cpp
database/single_node_ha/state_delta_applier.cpp
durability/single_node_ha/state_delta.cpp durability/single_node_ha/state_delta.cpp
durability/single_node_ha/paths.cpp durability/single_node_ha/paths.cpp
durability/single_node_ha/snapshooter.cpp durability/single_node_ha/snapshooter.cpp

View File

@ -7,6 +7,7 @@
#include <vector> #include <vector>
#include "database/single_node_ha/counters.hpp" #include "database/single_node_ha/counters.hpp"
#include "database/single_node_ha/state_delta_applier.hpp"
#include "io/network/endpoint.hpp" #include "io/network/endpoint.hpp"
#include "raft/coordination.hpp" #include "raft/coordination.hpp"
#include "raft/raft_server.hpp" #include "raft/raft_server.hpp"
@ -163,9 +164,13 @@ class GraphDb {
config_.rpc_num_server_workers, config_.rpc_num_client_workers, config_.rpc_num_server_workers, config_.rpc_num_client_workers,
config_.server_id, config_.server_id,
raft::Coordination::LoadFromFile(config_.coordination_config_file)}; raft::Coordination::LoadFromFile(config_.coordination_config_file)};
database::StateDeltaApplier delta_applier_{this};
raft::RaftServer raft_server_{ raft::RaftServer raft_server_{
config_.server_id, config_.durability_directory, config_.server_id,
raft::Config::LoadFromFile(config_.raft_config_file), &coordination_, config_.durability_directory,
raft::Config::LoadFromFile(config_.raft_config_file),
&coordination_,
&delta_applier_,
[this]() { this->Reset(); }}; [this]() { this->Reset(); }};
tx::Engine tx_engine_{&raft_server_}; tx::Engine tx_engine_{&raft_server_};
std::unique_ptr<StorageGc> storage_gc_ = std::unique_ptr<StorageGc> storage_gc_ =

View File

@ -0,0 +1,57 @@
#include "database/single_node_ha/state_delta_applier.hpp"
#include "database/single_node_ha/graph_db_accessor.hpp"
#include "utils/exceptions.hpp"
namespace database {
StateDeltaApplier::StateDeltaApplier(GraphDb *db) : db_(db) {}
void StateDeltaApplier::Apply(const std::vector<StateDelta> &deltas) {
for (auto &delta : deltas) {
switch (delta.type) {
case StateDelta::Type::TRANSACTION_BEGIN:
Begin(delta.transaction_id);
break;
case StateDelta::Type::TRANSACTION_COMMIT:
Commit(delta.transaction_id);
break;
case StateDelta::Type::TRANSACTION_ABORT:
LOG(FATAL) << "StateDeltaApplier shouldn't know about aborted "
"transactions";
break;
case StateDelta::Type::BUILD_INDEX:
case StateDelta::Type::DROP_INDEX:
throw utils::NotYetImplemented(
"High availability doesn't support index at the moment!");
default:
delta.Apply(*GetAccessor(delta.transaction_id));
}
}
}
void StateDeltaApplier::Begin(const tx::TransactionId &tx_id) {
CHECK(accessors_.find(tx_id) == accessors_.end())
<< "Double transaction start";
accessors_.emplace(tx_id, db_->Access());
}
void StateDeltaApplier::Abort(const tx::TransactionId &tx_id) {
GetAccessor(tx_id)->Abort();
accessors_.erase(accessors_.find(tx_id));
}
void StateDeltaApplier::Commit(const tx::TransactionId &tx_id) {
GetAccessor(tx_id)->Commit();
accessors_.erase(accessors_.find(tx_id));
}
GraphDbAccessor *StateDeltaApplier::GetAccessor(
const tx::TransactionId &tx_id) {
auto found = accessors_.find(tx_id);
CHECK(found != accessors_.end())
<< "Accessor does not exist for transaction: " << tx_id;
return found->second.get();
}
} // namespace database

View File

@ -0,0 +1,36 @@
/// @file
#pragma once
#include <unordered_map>
#include <vector>
#include "durability/single_node_ha/state_delta.hpp"
#include "transactions/type.hpp"
namespace database {
class GraphDb;
/// Interface for accessing transactions and applying StateDeltas on machines in
/// Raft follower mode.
class StateDeltaApplier final {
public:
explicit StateDeltaApplier(GraphDb *db);
void Apply(const std::vector<StateDelta> &deltas);
private:
void Begin(const tx::TransactionId &tx_id);
void Abort(const tx::TransactionId &tx_id);
void Commit(const tx::TransactionId &tx_id);
GraphDbAccessor *GetAccessor(const tx::TransactionId &tx_id);
GraphDb *db_;
std::unordered_map<tx::TransactionId, std::unique_ptr<GraphDbAccessor>>
accessors_;
};
} // namespace database

View File

@ -3,6 +3,7 @@
#pragma once #pragma once
#include "durability/single_node_ha/state_delta.hpp" #include "durability/single_node_ha/state_delta.hpp"
#include "transactions/type.hpp"
namespace raft { namespace raft {
@ -13,6 +14,10 @@ class RaftInterface {
/// Add StateDelta to the appropriate Raft log entry. /// Add StateDelta to the appropriate Raft log entry.
virtual void Emplace(const database::StateDelta &) = 0; virtual void Emplace(const database::StateDelta &) = 0;
/// Check if the transaction with the given transaction id has been
/// replicated on the majority of the Raft cluster and commited.
virtual bool HasCommitted(const tx::TransactionId &tx_id) = 0;
protected: protected:
~RaftInterface() {} ~RaftInterface() {}
}; };

View File

@ -21,9 +21,11 @@ const std::string kRaftDir = "raft";
RaftServer::RaftServer(uint16_t server_id, const std::string &durability_dir, RaftServer::RaftServer(uint16_t server_id, const std::string &durability_dir,
const Config &config, Coordination *coordination, const Config &config, Coordination *coordination,
database::StateDeltaApplier *delta_applier,
std::function<void(void)> reset_callback) std::function<void(void)> reset_callback)
: config_(config), : config_(config),
coordination_(coordination), coordination_(coordination),
delta_applier_(delta_applier),
mode_(Mode::FOLLOWER), mode_(Mode::FOLLOWER),
server_id_(server_id), server_id_(server_id),
disk_storage_(fs::path(durability_dir) / kRaftDir), disk_storage_(fs::path(durability_dir) / kRaftDir),
@ -209,6 +211,13 @@ void RaftServer::Emplace(const database::StateDelta &delta) {
log_entry_buffer_.Emplace(delta); log_entry_buffer_.Emplace(delta);
} }
bool RaftServer::HasCommitted(const tx::TransactionId &tx_id) {
// When in follower mode return true.
// Raise an exception if in candidate mode (should't happen).
// Check the state and return the correct value if leader.
return true;
}
RaftServer::LogEntryBuffer::LogEntryBuffer(RaftServer *raft_server) RaftServer::LogEntryBuffer::LogEntryBuffer(RaftServer *raft_server)
: raft_server_(raft_server) { : raft_server_(raft_server) {
CHECK(raft_server_) << "RaftServer can't be nullptr"; CHECK(raft_server_) << "RaftServer can't be nullptr";
@ -231,7 +240,7 @@ void RaftServer::LogEntryBuffer::Emplace(const database::StateDelta &delta) {
if (!enabled_) return; if (!enabled_) return;
tx::TransactionId tx_id = delta.transaction_id; tx::TransactionId tx_id = delta.transaction_id;
if (IsStateDeltaTransactionEnd(delta)) { if (delta.type == database::StateDelta::Type::TRANSACTION_COMMIT) {
auto it = logs_.find(tx_id); auto it = logs_.find(tx_id);
CHECK(it != logs_.end()) << "Missing StateDeltas for transaction " << tx_id; CHECK(it != logs_.end()) << "Missing StateDeltas for transaction " << tx_id;
@ -240,32 +249,15 @@ void RaftServer::LogEntryBuffer::Emplace(const database::StateDelta &delta) {
logs_.erase(it); logs_.erase(it);
raft_server_->Replicate(log); raft_server_->Replicate(log);
} else if (delta.type == database::StateDelta::Type::TRANSACTION_ABORT) {
auto it = logs_.find(tx_id);
CHECK(it != logs_.end()) << "Missing StateDeltas for transaction " << tx_id;
logs_.erase(it);
} else { } else {
logs_[tx_id].emplace_back(std::move(delta)); logs_[tx_id].emplace_back(std::move(delta));
} }
} }
bool RaftServer::LogEntryBuffer::IsStateDeltaTransactionEnd(
const database::StateDelta &delta) {
switch (delta.type) {
case database::StateDelta::Type::TRANSACTION_COMMIT:
return true;
case database::StateDelta::Type::TRANSACTION_ABORT:
case database::StateDelta::Type::TRANSACTION_BEGIN:
case database::StateDelta::Type::CREATE_VERTEX:
case database::StateDelta::Type::CREATE_EDGE:
case database::StateDelta::Type::SET_PROPERTY_VERTEX:
case database::StateDelta::Type::SET_PROPERTY_EDGE:
case database::StateDelta::Type::ADD_LABEL:
case database::StateDelta::Type::REMOVE_LABEL:
case database::StateDelta::Type::REMOVE_VERTEX:
case database::StateDelta::Type::REMOVE_EDGE:
case database::StateDelta::Type::BUILD_INDEX:
case database::StateDelta::Type::DROP_INDEX:
return false;
}
}
void RaftServer::Transition(const Mode &new_mode) { void RaftServer::Transition(const Mode &new_mode) {
switch (new_mode) { switch (new_mode) {
case Mode::FOLLOWER: { case Mode::FOLLOWER: {
@ -537,8 +529,9 @@ void RaftServer::AppendLogEntries(uint64_t leader_commit_index,
auto log = Log(); auto log = Log();
for (auto &entry : new_entries) log.emplace_back(entry); for (auto &entry : new_entries) log.emplace_back(entry);
// See Raft paper 5.3 // See Raft paper 5.3
if (leader_commit_index > commit_index_) if (leader_commit_index > commit_index_) {
commit_index_ = std::min(leader_commit_index, log.size()); commit_index_ = std::min(leader_commit_index, log.size() - 1);
}
disk_storage_.Put(kLogKey, SerializeLog(log)); disk_storage_.Put(kLogKey, SerializeLog(log));
} }

View File

@ -6,12 +6,13 @@
#include <unordered_map> #include <unordered_map>
#include <vector> #include <vector>
#include "database/single_node_ha/state_delta_applier.hpp"
#include "durability/single_node_ha/state_delta.hpp" #include "durability/single_node_ha/state_delta.hpp"
#include "raft/config.hpp" #include "raft/config.hpp"
#include "raft/coordination.hpp" #include "raft/coordination.hpp"
#include "raft/log_entry.hpp" #include "raft/log_entry.hpp"
#include "raft/raft_rpc_messages.hpp"
#include "raft/raft_interface.hpp" #include "raft/raft_interface.hpp"
#include "raft/raft_rpc_messages.hpp"
#include "storage/common/kvstore/kvstore.hpp" #include "storage/common/kvstore/kvstore.hpp"
#include "transactions/type.hpp" #include "transactions/type.hpp"
#include "utils/scheduler.hpp" #include "utils/scheduler.hpp"
@ -49,10 +50,12 @@ class RaftServer final : public RaftInterface {
/// @param durbility_dir directory for persisted data. /// @param durbility_dir directory for persisted data.
/// @param config raft configuration. /// @param config raft configuration.
/// @param coordination Abstraction for coordination between Raft servers. /// @param coordination Abstraction for coordination between Raft servers.
/// @param delta_applier TODO
/// @param reset_callback Function that is called on each Leader->Follower /// @param reset_callback Function that is called on each Leader->Follower
/// transition. /// transition.
RaftServer(uint16_t server_id, const std::string &durability_dir, RaftServer(uint16_t server_id, const std::string &durability_dir,
const Config &config, raft::Coordination *coordination, const Config &config, raft::Coordination *coordination,
database::StateDeltaApplier *delta_applier,
std::function<void(void)> reset_callback); std::function<void(void)> reset_callback);
/// Starts the RPC servers and starts mechanisms inside Raft protocol. /// Starts the RPC servers and starts mechanisms inside Raft protocol.
@ -83,6 +86,10 @@ class RaftServer final : public RaftInterface {
/// marks the transaction end, it will replicate the log accorss the cluster. /// marks the transaction end, it will replicate the log accorss the cluster.
void Emplace(const database::StateDelta &delta) override; void Emplace(const database::StateDelta &delta) override;
/// Check if the transaction with the given transaction id has been
/// replicated on the majority of the Raft cluster and commited.
bool HasCommitted(const tx::TransactionId &tx_id) override;
private: private:
/// Buffers incomplete Raft logs. /// Buffers incomplete Raft logs.
/// ///
@ -106,8 +113,9 @@ class RaftServer final : public RaftInterface {
/// Insert a new StateDelta in logs. /// Insert a new StateDelta in logs.
/// ///
/// If for a state delta, `IsStateDeltaTransactionEnd` returns true, this /// If the StateDelta type is `TRANSACTION_COMMIT` it will start
/// marks that this log is complete and the replication can start. /// replicating, and if the type is `TRANSACTION_ABORT` it will delete the
/// log from buffer.
void Emplace(const database::StateDelta &delta); void Emplace(const database::StateDelta &delta);
private: private:
@ -117,8 +125,6 @@ class RaftServer final : public RaftInterface {
logs_; logs_;
RaftServer *raft_server_{nullptr}; RaftServer *raft_server_{nullptr};
bool IsStateDeltaTransactionEnd(const database::StateDelta &delta);
}; };
mutable std::mutex lock_; ///< Guards all internal state. mutable std::mutex lock_; ///< Guards all internal state.
@ -129,6 +135,7 @@ class RaftServer final : public RaftInterface {
Config config_; ///< Raft config. Config config_; ///< Raft config.
Coordination *coordination_{nullptr}; ///< Cluster coordination. Coordination *coordination_{nullptr}; ///< Cluster coordination.
database::StateDeltaApplier *delta_applier_{nullptr};
Mode mode_; ///< Server's current mode. Mode mode_; ///< Server's current mode.
uint16_t server_id_; ///< ID of the current server. uint16_t server_id_; ///< ID of the current server.

View File

@ -78,10 +78,16 @@ CommandId Engine::UpdateCommand(TransactionId id) {
void Engine::Commit(const Transaction &t) { void Engine::Commit(const Transaction &t) {
VLOG(11) << "[Tx] Commiting transaction " << t.id_; VLOG(11) << "[Tx] Commiting transaction " << t.id_;
raft_->Emplace(database::StateDelta::TxCommit(t.id_));
// Wait for Raft to receive confirmation from the majority of followers.
while (!raft_->HasCommitted(t.id_)) {
std::this_thread::sleep_for(std::chrono::microseconds(100));
}
std::lock_guard<utils::SpinLock> guard(lock_); std::lock_guard<utils::SpinLock> guard(lock_);
clog_->set_committed(t.id_); clog_->set_committed(t.id_);
active_.remove(t.id_); active_.remove(t.id_);
raft_->Emplace(database::StateDelta::TxCommit(t.id_));
store_.erase(store_.find(t.id_)); store_.erase(store_.find(t.id_));
if (t.blocking()) { if (t.blocking()) {
accepting_transactions_.store(true); accepting_transactions_.store(true);

View File

@ -16,6 +16,10 @@ class RaftMock final : public raft::RaftInterface {
log_[delta.transaction_id].emplace_back(std::move(delta)); log_[delta.transaction_id].emplace_back(std::move(delta));
} }
bool HasCommitted(const tx::TransactionId &tx_id) override {
return true;
}
std::vector<database::StateDelta> GetLogForTx( std::vector<database::StateDelta> GetLogForTx(
const tx::TransactionId &tx_id) { const tx::TransactionId &tx_id) {
return log_[tx_id]; return log_[tx_id];