From 6a9acb717deb080036204bbaf87683ff1e01cca9 Mon Sep 17 00:00:00 2001 From: Matija Santl Date: Wed, 3 Apr 2019 16:11:33 +0200 Subject: [PATCH] Refactor StateDeltaApplier for HA Summary: The whole `StateDeltaApplier` implementation was unnecessary. Fixing this. Reviewers: mferencevic, ipaljak Reviewed By: mferencevic Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D1942 --- src/CMakeLists.txt | 1 - src/database/single_node_ha/graph_db.hpp | 3 -- .../single_node_ha/state_delta_applier.cpp | 53 ------------------- .../single_node_ha/state_delta_applier.hpp | 36 ------------- src/raft/raft_server.cpp | 39 +++++++++++--- src/raft/raft_server.hpp | 10 ++-- 6 files changed, 37 insertions(+), 105 deletions(-) delete mode 100644 src/database/single_node_ha/state_delta_applier.cpp delete mode 100644 src/database/single_node_ha/state_delta_applier.hpp diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index dc4315e74..a5b865e42 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -266,7 +266,6 @@ set(mg_single_node_ha_sources database/single_node_ha/config.cpp database/single_node_ha/graph_db.cpp database/single_node_ha/graph_db_accessor.cpp - database/single_node_ha/state_delta_applier.cpp durability/single_node_ha/state_delta.cpp durability/single_node_ha/paths.cpp durability/single_node_ha/snapshooter.cpp diff --git a/src/database/single_node_ha/graph_db.hpp b/src/database/single_node_ha/graph_db.hpp index 680cc7751..db74c376e 100644 --- a/src/database/single_node_ha/graph_db.hpp +++ b/src/database/single_node_ha/graph_db.hpp @@ -7,7 +7,6 @@ #include #include "database/single_node_ha/counters.hpp" -#include "database/single_node_ha/state_delta_applier.hpp" #include "io/network/endpoint.hpp" #include "raft/coordination.hpp" #include "raft/raft_server.hpp" @@ -159,14 +158,12 @@ class GraphDb { config_.rpc_num_server_workers, config_.rpc_num_client_workers, config_.server_id, raft::Coordination::LoadFromFile(config_.coordination_config_file)}; - database::StateDeltaApplier delta_applier_{this}; raft::RaftServer raft_server_{ config_.server_id, config_.durability_directory, config_.db_recover_on_startup, raft::Config::LoadFromFile(config_.raft_config_file), &coordination_, - &delta_applier_, this}; raft::StorageInfo storage_info_{this, &coordination_, config_.server_id}; diff --git a/src/database/single_node_ha/state_delta_applier.cpp b/src/database/single_node_ha/state_delta_applier.cpp deleted file mode 100644 index 846d85931..000000000 --- a/src/database/single_node_ha/state_delta_applier.cpp +++ /dev/null @@ -1,53 +0,0 @@ -#include "database/single_node_ha/state_delta_applier.hpp" - -#include "database/single_node_ha/graph_db_accessor.hpp" -#include "utils/exceptions.hpp" - -namespace database { - -StateDeltaApplier::StateDeltaApplier(GraphDb *db) : db_(db) {} - -void StateDeltaApplier::Apply(const std::vector &deltas) { - for (auto &delta : deltas) { - switch (delta.type) { - case StateDelta::Type::TRANSACTION_BEGIN: - Begin(delta.transaction_id); - break; - case StateDelta::Type::TRANSACTION_COMMIT: - Commit(delta.transaction_id); - break; - case StateDelta::Type::TRANSACTION_ABORT: - LOG(FATAL) << "StateDeltaApplier shouldn't know about aborted " - "transactions"; - break; - default: - delta.Apply(*GetAccessor(delta.transaction_id)); - } - } -} - -void StateDeltaApplier::Begin(const tx::TransactionId &tx_id) { - CHECK(accessors_.find(tx_id) == accessors_.end()) - << "Double transaction start"; - accessors_.emplace(tx_id, db_->Access()); -} - -void StateDeltaApplier::Abort(const tx::TransactionId &tx_id) { - GetAccessor(tx_id)->Abort(); - accessors_.erase(accessors_.find(tx_id)); -} - -void StateDeltaApplier::Commit(const tx::TransactionId &tx_id) { - GetAccessor(tx_id)->Commit(); - accessors_.erase(accessors_.find(tx_id)); -} - -GraphDbAccessor *StateDeltaApplier::GetAccessor( - const tx::TransactionId &tx_id) { - auto found = accessors_.find(tx_id); - CHECK(found != accessors_.end()) - << "Accessor does not exist for transaction: " << tx_id; - return found->second.get(); -} - -} // namespace database diff --git a/src/database/single_node_ha/state_delta_applier.hpp b/src/database/single_node_ha/state_delta_applier.hpp deleted file mode 100644 index 69385bf48..000000000 --- a/src/database/single_node_ha/state_delta_applier.hpp +++ /dev/null @@ -1,36 +0,0 @@ -/// @file -#pragma once - -#include -#include - -#include "durability/single_node_ha/state_delta.hpp" -#include "transactions/type.hpp" - -namespace database { - -class GraphDb; - -/// Interface for accessing transactions and applying StateDeltas on machines in -/// Raft follower mode. -class StateDeltaApplier final { - public: - explicit StateDeltaApplier(GraphDb *db); - - void Apply(const std::vector &deltas); - - private: - void Begin(const tx::TransactionId &tx_id); - - void Abort(const tx::TransactionId &tx_id); - - void Commit(const tx::TransactionId &tx_id); - - GraphDbAccessor *GetAccessor(const tx::TransactionId &tx_id); - - GraphDb *db_; - std::unordered_map> - accessors_; -}; - -} // namespace database diff --git a/src/raft/raft_server.cpp b/src/raft/raft_server.cpp index b9a4a50ef..953eb2178 100644 --- a/src/raft/raft_server.cpp +++ b/src/raft/raft_server.cpp @@ -34,12 +34,9 @@ const std::chrono::duration kSnapshotPeriod = 1s; RaftServer::RaftServer(uint16_t server_id, const std::string &durability_dir, bool db_recover_on_startup, const Config &config, - Coordination *coordination, - database::StateDeltaApplier *delta_applier, - database::GraphDb *db) + Coordination *coordination, database::GraphDb *db) : config_(config), coordination_(coordination), - delta_applier_(delta_applier), db_(db), rlog_(std::make_unique()), mode_(Mode::FOLLOWER), @@ -197,7 +194,7 @@ void RaftServer::Start() { // the entry to its state machine (in log order) while (req.leader_commit > last_applied_ && last_applied_ + 1 < log_size_) { ++last_applied_; - delta_applier_->Apply(GetLogEntry(last_applied_).deltas); + ApplyStateDeltas(GetLogEntry(last_applied_).deltas); } // Respond positively to a heartbeat. @@ -541,7 +538,7 @@ void RaftServer::Transition(const Mode &new_mode) { } for (uint64_t i = starting_index; i <= commit_index_; ++i) { - delta_applier_->Apply(GetLogEntry(i).deltas); + ApplyStateDeltas(GetLogEntry(i).deltas); } last_applied_ = commit_index_; @@ -609,7 +606,7 @@ void RaftServer::Transition(const Mode &new_mode) { // Raft guarantees the Leader Append-Only property [Raft paper 5.2] // so its safe to apply everything from our log into our state machine for (int i = last_applied_ + 1; i < log_size_; ++i) - delta_applier_->Apply(GetLogEntry(i).deltas); + ApplyStateDeltas(GetLogEntry(i).deltas); last_applied_ = log_size_ - 1; mode_ = Mode::LEADER; @@ -1187,4 +1184,32 @@ void RaftServer::NoOpCreate() { dba->Commit(); } +void RaftServer::ApplyStateDeltas( + const std::vector &deltas) { + std::unique_ptr dba = nullptr; + for (auto &delta : deltas) { + switch (delta.type) { + case database::StateDelta::Type::TRANSACTION_BEGIN: + CHECK(!dba) << "Double transaction start"; + dba = db_->Access(); + break; + case database::StateDelta::Type::TRANSACTION_COMMIT: + CHECK(dba) << "Missing accessor for transaction" + << delta.transaction_id; + dba->Commit(); + dba = nullptr; + break; + case database::StateDelta::Type::TRANSACTION_ABORT: + LOG(FATAL) << "ApplyStateDeltas shouldn't know about aborted " + "transactions"; + break; + default: + CHECK(dba) << "Missing accessor for transaction" + << delta.transaction_id; + delta.Apply(*dba); + } + } + CHECK(!dba) << "StateDeltas missing commit command"; +} + } // namespace raft diff --git a/src/raft/raft_server.hpp b/src/raft/raft_server.hpp index 06736106e..be9e77198 100644 --- a/src/raft/raft_server.hpp +++ b/src/raft/raft_server.hpp @@ -8,7 +8,6 @@ #include #include -#include "database/single_node_ha/state_delta_applier.hpp" #include "durability/single_node_ha/state_delta.hpp" #include "raft/config.hpp" #include "raft/coordination.hpp" @@ -62,12 +61,10 @@ class RaftServer final : public RaftInterface { /// startup. /// @param config raft configuration. /// @param coordination Abstraction for coordination between Raft servers. - /// @param delta_applier Object which is able to apply state deltas to SM. /// @param db The current DB object. RaftServer(uint16_t server_id, const std::string &durability_dir, bool db_recover_on_startup, const Config &config, - raft::Coordination *coordination, - database::StateDeltaApplier *delta_applier, database::GraphDb *db); + raft::Coordination *coordination, database::GraphDb *db); /// Starts the RPC servers and starts mechanisms inside Raft protocol. void Start(); @@ -164,7 +161,6 @@ class RaftServer final : public RaftInterface { Config config_; ///< Raft config. Coordination *coordination_{nullptr}; ///< Cluster coordination. - database::StateDeltaApplier *delta_applier_{nullptr}; database::GraphDb *db_{nullptr}; std::unique_ptr rlog_{nullptr}; @@ -409,5 +405,9 @@ class RaftServer final : public RaftInterface { /// Start a new transaction with a NO-OP StateDelta. void NoOpCreate(); + + /// Applies the given batch of state deltas that are representing a transacton + /// to the db. + void ApplyStateDeltas(const std::vector &deltas); }; } // namespace raft