From 5e6cf0724a91330c629bc95950e4b363da080a0e Mon Sep 17 00:00:00 2001 From: Matija Santl Date: Wed, 12 Dec 2018 15:50:17 +0100 Subject: [PATCH] Implement StateDelta apply method for Raft Summary: TransactionReplicator replicates transactions on follower machines in HA memgraph. Our DB accessor API doesn't provide us with the functionality to begin transactions with non-increasing ids. This is why the `TransactionReplicator` uses an internal map that maps tx ids from the leader node to transactions on the follower node (whose id doesn't have to match the leader's tx id). If the leader has the following transaction timeline: ``` L tx1 | | tx2 | | | | | | | | | | | tx2 | | | | tx1 ``` `tx2` will commit first and will be replicated. When applying `tx2` on follower nodes, they will start a new transaction with tx id `1`. When `tx1` starts replicating, followers will start a new transaction with tx id `2`. And this is where `TransactionReplicator` kicks in. Reviewers: ipaljak Reviewed By: ipaljak Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D1775 --- src/CMakeLists.txt | 1 + src/database/single_node_ha/graph_db.hpp | 9 ++- .../single_node_ha/state_delta_applier.cpp | 57 +++++++++++++++++++ .../single_node_ha/state_delta_applier.hpp | 36 ++++++++++++ src/raft/raft_interface.hpp | 5 ++ src/raft/raft_server.cpp | 41 ++++++------- src/raft/raft_server.hpp | 17 ++++-- src/transactions/single_node_ha/engine.cpp | 8 ++- .../transaction_engine_single_node_ha.cpp | 4 ++ 9 files changed, 146 insertions(+), 32 deletions(-) create mode 100644 src/database/single_node_ha/state_delta_applier.cpp create mode 100644 src/database/single_node_ha/state_delta_applier.hpp diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index dac62079c..a487c53b4 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -253,6 +253,7 @@ set(mg_single_node_ha_sources database/single_node_ha/config.cpp database/single_node_ha/graph_db.cpp 
database/single_node_ha/graph_db_accessor.cpp + database/single_node_ha/state_delta_applier.cpp durability/single_node_ha/state_delta.cpp durability/single_node_ha/paths.cpp durability/single_node_ha/snapshooter.cpp diff --git a/src/database/single_node_ha/graph_db.hpp b/src/database/single_node_ha/graph_db.hpp index a9869a835..7c192c717 100644 --- a/src/database/single_node_ha/graph_db.hpp +++ b/src/database/single_node_ha/graph_db.hpp @@ -7,6 +7,7 @@ #include #include "database/single_node_ha/counters.hpp" +#include "database/single_node_ha/state_delta_applier.hpp" #include "io/network/endpoint.hpp" #include "raft/coordination.hpp" #include "raft/raft_server.hpp" @@ -163,9 +164,13 @@ class GraphDb { config_.rpc_num_server_workers, config_.rpc_num_client_workers, config_.server_id, raft::Coordination::LoadFromFile(config_.coordination_config_file)}; + database::StateDeltaApplier delta_applier_{this}; raft::RaftServer raft_server_{ - config_.server_id, config_.durability_directory, - raft::Config::LoadFromFile(config_.raft_config_file), &coordination_, + config_.server_id, + config_.durability_directory, + raft::Config::LoadFromFile(config_.raft_config_file), + &coordination_, + &delta_applier_, [this]() { this->Reset(); }}; tx::Engine tx_engine_{&raft_server_}; std::unique_ptr storage_gc_ = diff --git a/src/database/single_node_ha/state_delta_applier.cpp b/src/database/single_node_ha/state_delta_applier.cpp new file mode 100644 index 000000000..83f54854a --- /dev/null +++ b/src/database/single_node_ha/state_delta_applier.cpp @@ -0,0 +1,57 @@ +#include "database/single_node_ha/state_delta_applier.hpp" + +#include "database/single_node_ha/graph_db_accessor.hpp" +#include "utils/exceptions.hpp" + +namespace database { + +StateDeltaApplier::StateDeltaApplier(GraphDb *db) : db_(db) {} + +void StateDeltaApplier::Apply(const std::vector &deltas) { + for (auto &delta : deltas) { + switch (delta.type) { + case StateDelta::Type::TRANSACTION_BEGIN: + 
Begin(delta.transaction_id); + break; + case StateDelta::Type::TRANSACTION_COMMIT: + Commit(delta.transaction_id); + break; + case StateDelta::Type::TRANSACTION_ABORT: + LOG(FATAL) << "StateDeltaApplier shouldn't know about aborted " + "transactions"; + break; + case StateDelta::Type::BUILD_INDEX: + case StateDelta::Type::DROP_INDEX: + throw utils::NotYetImplemented( + "High availability doesn't support index at the moment!"); + default: + delta.Apply(*GetAccessor(delta.transaction_id)); + } + } +} + +void StateDeltaApplier::Begin(const tx::TransactionId &tx_id) { + CHECK(accessors_.find(tx_id) == accessors_.end()) + << "Double transaction start"; + accessors_.emplace(tx_id, db_->Access()); +} + +void StateDeltaApplier::Abort(const tx::TransactionId &tx_id) { + GetAccessor(tx_id)->Abort(); + accessors_.erase(accessors_.find(tx_id)); +} + +void StateDeltaApplier::Commit(const tx::TransactionId &tx_id) { + GetAccessor(tx_id)->Commit(); + accessors_.erase(accessors_.find(tx_id)); +} + +GraphDbAccessor *StateDeltaApplier::GetAccessor( + const tx::TransactionId &tx_id) { + auto found = accessors_.find(tx_id); + CHECK(found != accessors_.end()) + << "Accessor does not exist for transaction: " << tx_id; + return found->second.get(); +} + +} // namespace database diff --git a/src/database/single_node_ha/state_delta_applier.hpp b/src/database/single_node_ha/state_delta_applier.hpp new file mode 100644 index 000000000..69385bf48 --- /dev/null +++ b/src/database/single_node_ha/state_delta_applier.hpp @@ -0,0 +1,36 @@ +/// @file +#pragma once + +#include +#include + +#include "durability/single_node_ha/state_delta.hpp" +#include "transactions/type.hpp" + +namespace database { + +class GraphDb; + +/// Interface for accessing transactions and applying StateDeltas on machines in +/// Raft follower mode. 
+class StateDeltaApplier final { + public: + explicit StateDeltaApplier(GraphDb *db); + + void Apply(const std::vector &deltas); + + private: + void Begin(const tx::TransactionId &tx_id); + + void Abort(const tx::TransactionId &tx_id); + + void Commit(const tx::TransactionId &tx_id); + + GraphDbAccessor *GetAccessor(const tx::TransactionId &tx_id); + + GraphDb *db_; + std::unordered_map> + accessors_; +}; + +} // namespace database diff --git a/src/raft/raft_interface.hpp b/src/raft/raft_interface.hpp index a4430c7ec..d7c9de4a0 100644 --- a/src/raft/raft_interface.hpp +++ b/src/raft/raft_interface.hpp @@ -3,6 +3,7 @@ #pragma once #include "durability/single_node_ha/state_delta.hpp" +#include "transactions/type.hpp" namespace raft { @@ -13,6 +14,10 @@ class RaftInterface { /// Add StateDelta to the appropriate Raft log entry. virtual void Emplace(const database::StateDelta &) = 0; + /// Check if the transaction with the given transaction id has been + /// replicated on the majority of the Raft cluster and committed. + virtual bool HasCommitted(const tx::TransactionId &tx_id) = 0; + protected: ~RaftInterface() {} }; diff --git a/src/raft/raft_server.cpp b/src/raft/raft_server.cpp index ffb2a5495..284a7ed4d 100644 --- a/src/raft/raft_server.cpp +++ b/src/raft/raft_server.cpp @@ -21,9 +21,11 @@ const std::string kRaftDir = "raft"; RaftServer::RaftServer(uint16_t server_id, const std::string &durability_dir, const Config &config, Coordination *coordination, + database::StateDeltaApplier *delta_applier, std::function reset_callback) : config_(config), coordination_(coordination), + delta_applier_(delta_applier), mode_(Mode::FOLLOWER), server_id_(server_id), disk_storage_(fs::path(durability_dir) / kRaftDir), @@ -209,6 +211,13 @@ void RaftServer::Emplace(const database::StateDelta &delta) { log_entry_buffer_.Emplace(delta); } +bool RaftServer::HasCommitted(const tx::TransactionId &tx_id) { + // When in follower mode return true. 
+ // Raise an exception if in candidate mode (shouldn't happen). + // Check the state and return the correct value if leader. + return true; +} + RaftServer::LogEntryBuffer::LogEntryBuffer(RaftServer *raft_server) : raft_server_(raft_server) { CHECK(raft_server_) << "RaftServer can't be nullptr"; @@ -231,7 +240,7 @@ void RaftServer::LogEntryBuffer::Emplace(const database::StateDelta &delta) { if (!enabled_) return; tx::TransactionId tx_id = delta.transaction_id; - if (IsStateDeltaTransactionEnd(delta)) { + if (delta.type == database::StateDelta::Type::TRANSACTION_COMMIT) { auto it = logs_.find(tx_id); CHECK(it != logs_.end()) << "Missing StateDeltas for transaction " << tx_id; @@ -240,32 +249,15 @@ void RaftServer::LogEntryBuffer::Emplace(const database::StateDelta &delta) { logs_.erase(it); raft_server_->Replicate(log); + } else if (delta.type == database::StateDelta::Type::TRANSACTION_ABORT) { + auto it = logs_.find(tx_id); + CHECK(it != logs_.end()) << "Missing StateDeltas for transaction " << tx_id; + logs_.erase(it); } else { logs_[tx_id].emplace_back(std::move(delta)); } } -bool RaftServer::LogEntryBuffer::IsStateDeltaTransactionEnd( - const database::StateDelta &delta) { - switch (delta.type) { - case database::StateDelta::Type::TRANSACTION_COMMIT: - return true; - case database::StateDelta::Type::TRANSACTION_ABORT: - case database::StateDelta::Type::TRANSACTION_BEGIN: - case database::StateDelta::Type::CREATE_VERTEX: - case database::StateDelta::Type::CREATE_EDGE: - case database::StateDelta::Type::SET_PROPERTY_VERTEX: - case database::StateDelta::Type::SET_PROPERTY_EDGE: - case database::StateDelta::Type::ADD_LABEL: - case database::StateDelta::Type::REMOVE_LABEL: - case database::StateDelta::Type::REMOVE_VERTEX: - case database::StateDelta::Type::REMOVE_EDGE: - case database::StateDelta::Type::BUILD_INDEX: - case database::StateDelta::Type::DROP_INDEX: - return false; - } -} - void RaftServer::Transition(const Mode &new_mode) { switch (new_mode) { case 
Mode::FOLLOWER: { @@ -537,8 +529,9 @@ void RaftServer::AppendLogEntries(uint64_t leader_commit_index, auto log = Log(); for (auto &entry : new_entries) log.emplace_back(entry); // See Raft paper 5.3 - if (leader_commit_index > commit_index_) - commit_index_ = std::min(leader_commit_index, log.size()); + if (leader_commit_index > commit_index_) { + commit_index_ = std::min(leader_commit_index, log.size() - 1); + } disk_storage_.Put(kLogKey, SerializeLog(log)); } diff --git a/src/raft/raft_server.hpp b/src/raft/raft_server.hpp index e3292e4ef..05e3919d4 100644 --- a/src/raft/raft_server.hpp +++ b/src/raft/raft_server.hpp @@ -6,12 +6,13 @@ #include #include +#include "database/single_node_ha/state_delta_applier.hpp" #include "durability/single_node_ha/state_delta.hpp" #include "raft/config.hpp" #include "raft/coordination.hpp" #include "raft/log_entry.hpp" -#include "raft/raft_rpc_messages.hpp" #include "raft/raft_interface.hpp" +#include "raft/raft_rpc_messages.hpp" #include "storage/common/kvstore/kvstore.hpp" #include "transactions/type.hpp" #include "utils/scheduler.hpp" @@ -49,10 +50,12 @@ class RaftServer final : public RaftInterface { /// @param durbility_dir directory for persisted data. /// @param config raft configuration. /// @param coordination Abstraction for coordination between Raft servers. + /// @param delta_applier Applies replicated StateDeltas to the local database. /// @param reset_callback Function that is called on each Leader->Follower /// transition. RaftServer(uint16_t server_id, const std::string &durability_dir, const Config &config, raft::Coordination *coordination, + database::StateDeltaApplier *delta_applier, std::function reset_callback); /// Starts the RPC servers and starts mechanisms inside Raft protocol. @@ -83,6 +86,10 @@ class RaftServer final : public RaftInterface { /// marks the transaction end, it will replicate the log accorss the cluster. 
void Emplace(const database::StateDelta &delta) override; + /// Check if the transaction with the given transaction id has been + /// replicated on the majority of the Raft cluster and committed. + bool HasCommitted(const tx::TransactionId &tx_id) override; + private: /// Buffers incomplete Raft logs. /// @@ -106,8 +113,9 @@ class RaftServer final : public RaftInterface { /// Insert a new StateDelta in logs. /// - /// If for a state delta, `IsStateDeltaTransactionEnd` returns true, this - /// marks that this log is complete and the replication can start. + /// If the StateDelta type is `TRANSACTION_COMMIT` it will start + /// replicating, and if the type is `TRANSACTION_ABORT` it will delete the + /// log from the buffer. void Emplace(const database::StateDelta &delta); private: @@ -117,8 +125,6 @@ class RaftServer final : public RaftInterface { logs_; RaftServer *raft_server_{nullptr}; - - bool IsStateDeltaTransactionEnd(const database::StateDelta &delta); }; mutable std::mutex lock_; ///< Guards all internal state. @@ -129,6 +135,7 @@ class RaftServer final : public RaftInterface { Config config_; ///< Raft config. Coordination *coordination_{nullptr}; ///< Cluster coordination. + database::StateDeltaApplier *delta_applier_{nullptr}; Mode mode_; ///< Server's current mode. uint16_t server_id_; ///< ID of the current server. diff --git a/src/transactions/single_node_ha/engine.cpp b/src/transactions/single_node_ha/engine.cpp index b7c633daf..3f6c237f5 100644 --- a/src/transactions/single_node_ha/engine.cpp +++ b/src/transactions/single_node_ha/engine.cpp @@ -78,10 +78,16 @@ CommandId Engine::UpdateCommand(TransactionId id) { void Engine::Commit(const Transaction &t) { VLOG(11) << "[Tx] Commiting transaction " << t.id_; + raft_->Emplace(database::StateDelta::TxCommit(t.id_)); + + // Wait for Raft to receive confirmation from the majority of followers. 
+ while (!raft_->HasCommitted(t.id_)) { + std::this_thread::sleep_for(std::chrono::microseconds(100)); + } + std::lock_guard guard(lock_); clog_->set_committed(t.id_); active_.remove(t.id_); - raft_->Emplace(database::StateDelta::TxCommit(t.id_)); store_.erase(store_.find(t.id_)); if (t.blocking()) { accepting_transactions_.store(true); diff --git a/tests/unit/transaction_engine_single_node_ha.cpp b/tests/unit/transaction_engine_single_node_ha.cpp index 2b980c959..002800b68 100644 --- a/tests/unit/transaction_engine_single_node_ha.cpp +++ b/tests/unit/transaction_engine_single_node_ha.cpp @@ -16,6 +16,10 @@ class RaftMock final : public raft::RaftInterface { log_[delta.transaction_id].emplace_back(std::move(delta)); } + bool HasCommitted(const tx::TransactionId &tx_id) override { + return true; + } + std::vector GetLogForTx( const tx::TransactionId &tx_id) { return log_[tx_id];