From 363cdb8b88ac2a5305bf56bf0e1d463624599295 Mon Sep 17 00:00:00 2001 From: Matija Santl Date: Thu, 3 Jan 2019 14:53:03 +0100 Subject: [PATCH] Issue NO_OP StateDeltas on leader change Summary: Creating Raft noop logs on leader change will trigger the whole log replication procedure that ends up committing/applying state deltas on newly elected leaders that didn't receive the last commit index from the previous leader. I also included a small tweak that won't trigger add logs when a transaction contains only BEGIN and ABORT StateDeltas, because we don't want to replicate read queries. Reviewers: ipaljak Reviewed By: ipaljak Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D1785 --- src/database/single_node_ha/graph_db.cpp | 6 ++++++ src/database/single_node_ha/graph_db.hpp | 7 ++++++- src/durability/single_node_ha/state_delta.cpp | 4 ++-- src/durability/single_node_ha/state_delta.lcp | 3 +-- src/raft/raft_server.cpp | 20 +++++++++++++++---- src/raft/raft_server.hpp | 8 ++++++-- 6 files changed, 37 insertions(+), 11 deletions(-) diff --git a/src/database/single_node_ha/graph_db.cpp b/src/database/single_node_ha/graph_db.cpp index 782b14062..01f0019f2 100644 --- a/src/database/single_node_ha/graph_db.cpp +++ b/src/database/single_node_ha/graph_db.cpp @@ -133,4 +133,10 @@ void GraphDb::Reset() { *storage_, tx_engine_, &raft_server_, config_.gc_cycle_sec); } +void GraphDb::NoOpCreate(void) { + auto dba = this->Access(); + raft()->Emplace(database::StateDelta::NoOp(dba->transaction_id())); + dba->Commit(); +} + } // namespace database diff --git a/src/database/single_node_ha/graph_db.hpp b/src/database/single_node_ha/graph_db.hpp index 511e1fbcb..e7bde4b86 100644 --- a/src/database/single_node_ha/graph_db.hpp +++ b/src/database/single_node_ha/graph_db.hpp @@ -149,6 +149,9 @@ class GraphDb { } } + private: + void NoOpCreate(void); + protected: Stat stat_; @@ -171,7 +174,9 @@ class GraphDb { raft::Config::LoadFromFile(config_.raft_config_file), &coordination_, &delta_applier_, - [this]() { this->Reset(); }}; + [this]() { this->Reset(); }, + [this]() { this->NoOpCreate(); }, + }; tx::Engine tx_engine_{&raft_server_}; std::unique_ptr storage_gc_ = std::make_unique( *storage_, tx_engine_, &raft_server_, config_.gc_cycle_sec); diff --git a/src/durability/single_node_ha/state_delta.cpp b/src/durability/single_node_ha/state_delta.cpp index 32f1a0f52..3570af079 100644 --- a/src/durability/single_node_ha/state_delta.cpp +++ b/src/durability/single_node_ha/state_delta.cpp @@ -126,8 +126,8 @@ StateDelta StateDelta::DropIndex(tx::TransactionId tx_id, storage::Label label, return op; } -StateDelta StateDelta::NoOp() { - StateDelta op(StateDelta::Type::NO_OP); +StateDelta StateDelta::NoOp(tx::TransactionId tx_id) { + StateDelta op(StateDelta::Type::NO_OP, tx_id); return op; } diff --git a/src/durability/single_node_ha/state_delta.lcp b/src/durability/single_node_ha/state_delta.lcp index dacf337a5..00f9bd1ac 100644 --- a/src/durability/single_node_ha/state_delta.lcp +++ b/src/durability/single_node_ha/state_delta.lcp @@ -99,7 +99,6 @@ omitted in the comment.") (:serialize)) #>cpp StateDelta() = default; - StateDelta(const enum Type &type) : type(type) {} StateDelta(const enum Type &type, tx::TransactionId tx_id) : type(type), transaction_id(tx_id) {} @@ -152,7 +151,7 @@ omitted in the comment.") storage::Property property, const std::string &property_name); - static StateDelta NoOp(); + static StateDelta NoOp(tx::TransactionId tx_id); /// Applies CRUD delta to database accessor. Fails on other types of deltas void Apply(GraphDbAccessor &dba) const; diff --git a/src/raft/raft_server.cpp b/src/raft/raft_server.cpp index 7a66b2977..f3b89d37e 100644 --- a/src/raft/raft_server.cpp +++ b/src/raft/raft_server.cpp @@ -23,7 +23,8 @@ const std::string kRaftDir = "raft"; RaftServer::RaftServer(uint16_t server_id, const std::string &durability_dir, const Config &config, Coordination *coordination, database::StateDeltaApplier *delta_applier, - std::function reset_callback) + std::function reset_callback, + std::function no_op_create_callback) : config_(config), coordination_(coordination), delta_applier_(delta_applier), @@ -31,7 +32,8 @@ RaftServer::RaftServer(uint16_t server_id, const std::string &durability_dir, mode_(Mode::FOLLOWER), server_id_(server_id), disk_storage_(fs::path(durability_dir) / kRaftDir), - reset_callback_(reset_callback) {} + reset_callback_(reset_callback), + no_op_create_callback_(no_op_create_callback) {} void RaftServer::Start() { // Persistent storage initialization/recovery. @@ -270,6 +272,14 @@ void RaftServer::LogEntryBuffer::Emplace(const database::StateDelta &delta) { log.emplace_back(std::move(delta)); logs_.erase(it); + // Make sure that this wasn't a read query (contains transaction begin and + // commit). + if (log.size() == 2) { + DCHECK(log[0].type == database::StateDelta::Type::TRANSACTION_BEGIN) + << "Raft log of size two doesn't start with TRANSACTION_BEGIN"; + return; + } + raft_server_->AppendToLog(tx_id, log); } else if (delta.type == database::StateDelta::Type::TRANSACTION_ABORT) { auto it = logs_.find(tx_id); @@ -347,8 +357,10 @@ void RaftServer::Transition(const Mode &new_mode) { mode_ = Mode::LEADER; - // TODO(ipaljak): Implement no-op replication. For now, we are only - // sending heartbeats. + // no_op_create_callback_ will create a new transaction that has a NO_OP + // StateDelta. This will trigger the whole procedure of replicating logs + // in our implementation of Raft. + no_op_create_callback_(); break; } } diff --git a/src/raft/raft_server.hpp b/src/raft/raft_server.hpp index 92a6e842f..b42617c3b 100644 --- a/src/raft/raft_server.hpp +++ b/src/raft/raft_server.hpp @@ -57,7 +57,8 @@ class RaftServer final : public RaftInterface { RaftServer(uint16_t server_id, const std::string &durability_dir, const Config &config, raft::Coordination *coordination, database::StateDeltaApplier *delta_applier, - std::function reset_callback); + std::function reset_callback, + std::function no_op_create); /// Starts the RPC servers and starts mechanisms inside Raft protocol. void Start(); @@ -99,7 +100,7 @@ class RaftServer final : public RaftInterface { /// Buffers incomplete Raft logs. /// /// A Raft log is considered to be complete if it ends with a StateDelta - /// that represents transaction commit; + /// that represents transaction commit. /// LogEntryBuffer will be used instead of WriteAheadLog. We don't need to /// persist logs until we receive a majority vote from the Raft cluster, and /// apply the to our local state machine(storage). @@ -212,6 +213,9 @@ class RaftServer final : public RaftInterface { /// Callback that needs to be called to reset the db state. std::function reset_callback_; + /// Callback that creates a new transaction with NO_OP StateDelta. + std::function no_op_create_callback_; + /// Makes a transition to a new `raft::Mode`. /// /// throws InvalidTransitionException when transitioning between incompatible