Refactor StateDeltaApplier for HA

Summary:
The whole `StateDeltaApplier` implementation was unnecessary. Fixing
this.

Reviewers: mferencevic, ipaljak

Reviewed By: mferencevic

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1942
This commit is contained in:
Matija Santl 2019-04-03 16:11:33 +02:00
parent 1124b8d371
commit 6a9acb717d
6 changed files with 37 additions and 105 deletions

View File

@ -266,7 +266,6 @@ set(mg_single_node_ha_sources
database/single_node_ha/config.cpp
database/single_node_ha/graph_db.cpp
database/single_node_ha/graph_db_accessor.cpp
database/single_node_ha/state_delta_applier.cpp
durability/single_node_ha/state_delta.cpp
durability/single_node_ha/paths.cpp
durability/single_node_ha/snapshooter.cpp

View File

@ -7,7 +7,6 @@
#include <vector>
#include "database/single_node_ha/counters.hpp"
#include "database/single_node_ha/state_delta_applier.hpp"
#include "io/network/endpoint.hpp"
#include "raft/coordination.hpp"
#include "raft/raft_server.hpp"
@ -159,14 +158,12 @@ class GraphDb {
config_.rpc_num_server_workers, config_.rpc_num_client_workers,
config_.server_id,
raft::Coordination::LoadFromFile(config_.coordination_config_file)};
database::StateDeltaApplier delta_applier_{this};
raft::RaftServer raft_server_{
config_.server_id,
config_.durability_directory,
config_.db_recover_on_startup,
raft::Config::LoadFromFile(config_.raft_config_file),
&coordination_,
&delta_applier_,
this};
raft::StorageInfo storage_info_{this, &coordination_, config_.server_id};

View File

@ -1,53 +0,0 @@
#include "database/single_node_ha/state_delta_applier.hpp"
#include "database/single_node_ha/graph_db_accessor.hpp"
#include "utils/exceptions.hpp"
namespace database {
StateDeltaApplier::StateDeltaApplier(GraphDb *db) : db_(db) {}
void StateDeltaApplier::Apply(const std::vector<StateDelta> &deltas) {
for (auto &delta : deltas) {
switch (delta.type) {
case StateDelta::Type::TRANSACTION_BEGIN:
Begin(delta.transaction_id);
break;
case StateDelta::Type::TRANSACTION_COMMIT:
Commit(delta.transaction_id);
break;
case StateDelta::Type::TRANSACTION_ABORT:
LOG(FATAL) << "StateDeltaApplier shouldn't know about aborted "
"transactions";
break;
default:
delta.Apply(*GetAccessor(delta.transaction_id));
}
}
}
void StateDeltaApplier::Begin(const tx::TransactionId &tx_id) {
CHECK(accessors_.find(tx_id) == accessors_.end())
<< "Double transaction start";
accessors_.emplace(tx_id, db_->Access());
}
void StateDeltaApplier::Abort(const tx::TransactionId &tx_id) {
GetAccessor(tx_id)->Abort();
accessors_.erase(accessors_.find(tx_id));
}
void StateDeltaApplier::Commit(const tx::TransactionId &tx_id) {
GetAccessor(tx_id)->Commit();
accessors_.erase(accessors_.find(tx_id));
}
GraphDbAccessor *StateDeltaApplier::GetAccessor(
const tx::TransactionId &tx_id) {
auto found = accessors_.find(tx_id);
CHECK(found != accessors_.end())
<< "Accessor does not exist for transaction: " << tx_id;
return found->second.get();
}
} // namespace database

View File

@ -1,36 +0,0 @@
/// @file
#pragma once
#include <unordered_map>
#include <vector>
#include "durability/single_node_ha/state_delta.hpp"
#include "transactions/type.hpp"
namespace database {
class GraphDb;
/// Interface for accessing transactions and applying StateDeltas on machines in
/// Raft follower mode.
class StateDeltaApplier final {
public:
explicit StateDeltaApplier(GraphDb *db);
void Apply(const std::vector<StateDelta> &deltas);
private:
void Begin(const tx::TransactionId &tx_id);
void Abort(const tx::TransactionId &tx_id);
void Commit(const tx::TransactionId &tx_id);
GraphDbAccessor *GetAccessor(const tx::TransactionId &tx_id);
GraphDb *db_;
std::unordered_map<tx::TransactionId, std::unique_ptr<GraphDbAccessor>>
accessors_;
};
} // namespace database

View File

@ -34,12 +34,9 @@ const std::chrono::duration<int64_t> kSnapshotPeriod = 1s;
RaftServer::RaftServer(uint16_t server_id, const std::string &durability_dir,
bool db_recover_on_startup, const Config &config,
Coordination *coordination,
database::StateDeltaApplier *delta_applier,
database::GraphDb *db)
Coordination *coordination, database::GraphDb *db)
: config_(config),
coordination_(coordination),
delta_applier_(delta_applier),
db_(db),
rlog_(std::make_unique<ReplicationLog>()),
mode_(Mode::FOLLOWER),
@ -197,7 +194,7 @@ void RaftServer::Start() {
// the entry to its state machine (in log order)
while (req.leader_commit > last_applied_ && last_applied_ + 1 < log_size_) {
++last_applied_;
delta_applier_->Apply(GetLogEntry(last_applied_).deltas);
ApplyStateDeltas(GetLogEntry(last_applied_).deltas);
}
// Respond positively to a heartbeat.
@ -541,7 +538,7 @@ void RaftServer::Transition(const Mode &new_mode) {
}
for (uint64_t i = starting_index; i <= commit_index_; ++i) {
delta_applier_->Apply(GetLogEntry(i).deltas);
ApplyStateDeltas(GetLogEntry(i).deltas);
}
last_applied_ = commit_index_;
@ -609,7 +606,7 @@ void RaftServer::Transition(const Mode &new_mode) {
// Raft guarantees the Leader Append-Only property [Raft paper 5.2]
// so its safe to apply everything from our log into our state machine
for (int i = last_applied_ + 1; i < log_size_; ++i)
delta_applier_->Apply(GetLogEntry(i).deltas);
ApplyStateDeltas(GetLogEntry(i).deltas);
last_applied_ = log_size_ - 1;
mode_ = Mode::LEADER;
@ -1187,4 +1184,32 @@ void RaftServer::NoOpCreate() {
dba->Commit();
}
void RaftServer::ApplyStateDeltas(
const std::vector<database::StateDelta> &deltas) {
std::unique_ptr<database::GraphDbAccessor> dba = nullptr;
for (auto &delta : deltas) {
switch (delta.type) {
case database::StateDelta::Type::TRANSACTION_BEGIN:
CHECK(!dba) << "Double transaction start";
dba = db_->Access();
break;
case database::StateDelta::Type::TRANSACTION_COMMIT:
CHECK(dba) << "Missing accessor for transaction"
<< delta.transaction_id;
dba->Commit();
dba = nullptr;
break;
case database::StateDelta::Type::TRANSACTION_ABORT:
LOG(FATAL) << "ApplyStateDeltas shouldn't know about aborted "
"transactions";
break;
default:
CHECK(dba) << "Missing accessor for transaction"
<< delta.transaction_id;
delta.Apply(*dba);
}
}
CHECK(!dba) << "StateDeltas missing commit command";
}
} // namespace raft

View File

@ -8,7 +8,6 @@
#include <unordered_map>
#include <vector>
#include "database/single_node_ha/state_delta_applier.hpp"
#include "durability/single_node_ha/state_delta.hpp"
#include "raft/config.hpp"
#include "raft/coordination.hpp"
@ -62,12 +61,10 @@ class RaftServer final : public RaftInterface {
/// startup.
/// @param config raft configuration.
/// @param coordination Abstraction for coordination between Raft servers.
/// @param delta_applier Object which is able to apply state deltas to SM.
/// @param db The current DB object.
RaftServer(uint16_t server_id, const std::string &durability_dir,
bool db_recover_on_startup, const Config &config,
raft::Coordination *coordination,
database::StateDeltaApplier *delta_applier, database::GraphDb *db);
raft::Coordination *coordination, database::GraphDb *db);
/// Starts the RPC servers and starts mechanisms inside Raft protocol.
void Start();
@ -164,7 +161,6 @@ class RaftServer final : public RaftInterface {
Config config_; ///< Raft config.
Coordination *coordination_{nullptr}; ///< Cluster coordination.
database::StateDeltaApplier *delta_applier_{nullptr};
database::GraphDb *db_{nullptr};
std::unique_ptr<ReplicationLog> rlog_{nullptr};
@ -409,5 +405,9 @@ class RaftServer final : public RaftInterface {
/// Start a new transaction with a NO-OP StateDelta.
void NoOpCreate();
/// Applies the given batch of state deltas that are representing a transacton
/// to the db.
void ApplyStateDeltas(const std::vector<database::StateDelta> &deltas);
};
} // namespace raft