Issue NO_OP StateDeltas on leader change

Summary:
Creating Raft noop logs on leader change will trigger the whole
log replication procedure that ends up committing/applying state deltas on newly
elected leaders that didn't receive the last commit index from the previous
leader.

I also included a small tweak that won't trigger add logs when a transaction
contains only BEGIN and ABORT StateDeltas, because we don't want to replicate
read queries.

Reviewers: ipaljak

Reviewed By: ipaljak

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1785
This commit is contained in:
Matija Santl 2019-01-03 14:53:03 +01:00
parent 0f8571a926
commit 363cdb8b88
6 changed files with 37 additions and 11 deletions

View File

@ -133,4 +133,10 @@ void GraphDb::Reset() {
*storage_, tx_engine_, &raft_server_, config_.gc_cycle_sec);
}
void GraphDb::NoOpCreate(void) {
auto dba = this->Access();
raft()->Emplace(database::StateDelta::NoOp(dba->transaction_id()));
dba->Commit();
}
} // namespace database

View File

@ -149,6 +149,9 @@ class GraphDb {
}
}
private:
void NoOpCreate(void);
protected:
Stat stat_;
@ -171,7 +174,9 @@ class GraphDb {
raft::Config::LoadFromFile(config_.raft_config_file),
&coordination_,
&delta_applier_,
[this]() { this->Reset(); }};
[this]() { this->Reset(); },
[this]() { this->NoOpCreate(); },
};
tx::Engine tx_engine_{&raft_server_};
std::unique_ptr<StorageGc> storage_gc_ = std::make_unique<StorageGc>(
*storage_, tx_engine_, &raft_server_, config_.gc_cycle_sec);

View File

@ -126,8 +126,8 @@ StateDelta StateDelta::DropIndex(tx::TransactionId tx_id, storage::Label label,
return op;
}
StateDelta StateDelta::NoOp() {
StateDelta op(StateDelta::Type::NO_OP);
StateDelta StateDelta::NoOp(tx::TransactionId tx_id) {
StateDelta op(StateDelta::Type::NO_OP, tx_id);
return op;
}

View File

@ -99,7 +99,6 @@ omitted in the comment.")
(:serialize))
#>cpp
StateDelta() = default;
StateDelta(const enum Type &type) : type(type) {}
StateDelta(const enum Type &type, tx::TransactionId tx_id)
: type(type), transaction_id(tx_id) {}
@ -152,7 +151,7 @@ omitted in the comment.")
storage::Property property,
const std::string &property_name);
static StateDelta NoOp();
static StateDelta NoOp(tx::TransactionId tx_id);
/// Applies CRUD delta to database accessor. Fails on other types of deltas
void Apply(GraphDbAccessor &dba) const;

View File

@ -23,7 +23,8 @@ const std::string kRaftDir = "raft";
RaftServer::RaftServer(uint16_t server_id, const std::string &durability_dir,
const Config &config, Coordination *coordination,
database::StateDeltaApplier *delta_applier,
std::function<void(void)> reset_callback)
std::function<void(void)> reset_callback,
std::function<void(void)> no_op_create_callback)
: config_(config),
coordination_(coordination),
delta_applier_(delta_applier),
@ -31,7 +32,8 @@ RaftServer::RaftServer(uint16_t server_id, const std::string &durability_dir,
mode_(Mode::FOLLOWER),
server_id_(server_id),
disk_storage_(fs::path(durability_dir) / kRaftDir),
reset_callback_(reset_callback) {}
reset_callback_(reset_callback),
no_op_create_callback_(no_op_create_callback) {}
void RaftServer::Start() {
// Persistent storage initialization/recovery.
@ -270,6 +272,14 @@ void RaftServer::LogEntryBuffer::Emplace(const database::StateDelta &delta) {
log.emplace_back(std::move(delta));
logs_.erase(it);
// Make sure that this wasn't a read query (contains transaction begin and
// commit).
if (log.size() == 2) {
DCHECK(log[0].type == database::StateDelta::Type::TRANSACTION_BEGIN)
<< "Raft log of size two doesn't start with TRANSACTION_BEGIN";
return;
}
raft_server_->AppendToLog(tx_id, log);
} else if (delta.type == database::StateDelta::Type::TRANSACTION_ABORT) {
auto it = logs_.find(tx_id);
@ -347,8 +357,10 @@ void RaftServer::Transition(const Mode &new_mode) {
mode_ = Mode::LEADER;
// TODO(ipaljak): Implement no-op replication. For now, we are only
// sending heartbeats.
// no_op_create_callback_ will create a new transaction that has a NO_OP
// StateDelta. This will trigger the whole procedure of replicating logs
// in our implementation of Raft.
no_op_create_callback_();
break;
}
}

View File

@ -57,7 +57,8 @@ class RaftServer final : public RaftInterface {
RaftServer(uint16_t server_id, const std::string &durability_dir,
const Config &config, raft::Coordination *coordination,
database::StateDeltaApplier *delta_applier,
std::function<void(void)> reset_callback);
std::function<void(void)> reset_callback,
std::function<void(void)> no_op_create);
/// Starts the RPC servers and starts mechanisms inside Raft protocol.
void Start();
@ -99,7 +100,7 @@ class RaftServer final : public RaftInterface {
/// Buffers incomplete Raft logs.
///
/// A Raft log is considered to be complete if it ends with a StateDelta
/// that represents transaction commit;
/// that represents transaction commit.
/// LogEntryBuffer will be used instead of WriteAheadLog. We don't need to
/// persist logs until we receive a majority vote from the Raft cluster, and
/// apply the to our local state machine(storage).
@ -212,6 +213,9 @@ class RaftServer final : public RaftInterface {
/// Callback that needs to be called to reset the db state.
std::function<void(void)> reset_callback_;
/// Callback that creates a new transaction with NO_OP StateDelta.
std::function<void(void)> no_op_create_callback_;
/// Makes a transition to a new `raft::Mode`.
///
/// throws InvalidTransitionException when transitioning between incompatible