Add log compaction for Raft, pt. 1

Summary:
In this part of log compaction for Raft, I've implemented snapshooting and
snapshot recovery. I've also refactored the code a bit, so `RaftServer` now
holds a pointer to the `GraphDb` and can create snapshots, recover from them,
and issue no-op log entries on its own.

Log compaction requires some further work. Since snapshooting isn't synchronous
between peers, and each peer can progress at its own pace, once we've compacted
the log so that the next log entry to be sent to peer `x` is no longer
available, we need to send the snapshot itself over the wire. This means that
the next part will introduce `InstallSnapshotRPC`, and possibly one more part
after that will implement the logic for sending either `LogEntry` objects or
the whole snapshot.
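
To make the plan concrete, here is a minimal, hypothetical sketch of the
leader-side check those follow-up parts will need (the type and function names
below are illustrative and not part of this commit): if a peer's next log index
falls at or before the last index covered by the snapshot, the compacted
entries can no longer be replayed as `LogEntry` objects, and only
`InstallSnapshotRPC` can bring that peer up to date.

```cpp
#include <cstdint>
#include <iostream>
#include <optional>

// Illustrative stand-ins for the persisted snapshot metadata (term, index)
// and the leader's per-peer replication state.
struct SnapshotMetadata {
  uint64_t last_included_term;
  uint64_t last_included_index;
};

struct PeerState {
  uint64_t next_index;  // next log index the leader wants to send to this peer
};

enum class ReplicationAction { kAppendEntries, kInstallSnapshot };

// If the entry at `next_index` was already compacted into the snapshot, the
// only way to catch the peer up is to ship the whole snapshot; otherwise a
// regular AppendEntries with log entries still works.
ReplicationAction ChooseAction(const std::optional<SnapshotMetadata> &snapshot,
                               const PeerState &peer) {
  if (snapshot && peer.next_index <= snapshot->last_included_index)
    return ReplicationAction::kInstallSnapshot;
  return ReplicationAction::kAppendEntries;
}

int main() {
  SnapshotMetadata snapshot{/*last_included_term=*/3,
                            /*last_included_index=*/100};
  PeerState lagging_peer{/*next_index=*/42};  // behind the compaction point
  std::cout << (ChooseAction(snapshot, lagging_peer) ==
                        ReplicationAction::kInstallSnapshot
                    ? "send snapshot"
                    : "send log entries")
            << std::endl;
  return 0;
}
```

In the actual `RaftServer` the metadata would presumably come from
`GetSnapshotMetadata()` and the peer's next index from the leader's per-peer
bookkeeping; the decision itself is what `InstallSnapshotRPC` will hang off.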

Reviewers: ipaljak

Reviewed By: ipaljak

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1834
Author: Matija Santl
Date: 2019-01-16 10:40:06 +01:00
parent 7542f5b0ba
commit da95cbf4ec
16 changed files with 511 additions and 221 deletions


@ -257,6 +257,7 @@ set(mg_single_node_ha_sources
durability/single_node_ha/state_delta.cpp
durability/single_node_ha/paths.cpp
durability/single_node_ha/snapshooter.cpp
durability/single_node_ha/recovery.cpp
glue/auth.cpp
glue/communication.cpp
raft/coordination.cpp


@ -8,14 +8,7 @@
DEFINE_string(
durability_directory, "durability",
"Path to directory in which to save snapshots and write-ahead log files.");
DEFINE_bool(db_recover_on_startup, false, "Recover database on startup.");
DEFINE_VALIDATED_int32(
snapshot_cycle_sec, 3600,
"Amount of time between two snapshots, in seconds (min 60).",
FLAG_IN_RANGE(1, std::numeric_limits<int32_t>::max()));
DEFINE_int32(snapshot_max_retained, -1,
"Number of retained snapshots, -1 means without limit.");
DEFINE_bool(snapshot_on_exit, false, "Snapshot on exiting the database.");
DEFINE_bool(db_recover_on_startup, true, "Recover database on startup.");
// Misc flags
DEFINE_int32(query_execution_time_sec, 180,
@ -55,9 +48,6 @@ database::Config::Config()
// Durability flags.
: durability_directory{FLAGS_durability_directory},
db_recover_on_startup{FLAGS_db_recover_on_startup},
snapshot_cycle_sec{FLAGS_snapshot_cycle_sec},
snapshot_max_retained{FLAGS_snapshot_max_retained},
snapshot_on_exit{FLAGS_snapshot_on_exit},
// Misc flags.
gc_cycle_sec{FLAGS_gc_cycle_sec},
query_execution_time_sec{FLAGS_query_execution_time_sec},


@ -6,12 +6,9 @@
#include "database/single_node_ha/counters.hpp"
#include "database/single_node_ha/graph_db_accessor.hpp"
#include "durability/single_node_ha/paths.hpp"
#include "durability/single_node_ha/snapshooter.hpp"
#include "storage/single_node_ha/concurrent_id_mapper.hpp"
#include "storage/single_node_ha/storage_gc.hpp"
#include "transactions/single_node_ha/engine.hpp"
#include "utils/file.hpp"
namespace database {
@ -45,18 +42,12 @@ GraphDb::~GraphDb() {}
bool GraphDb::AwaitShutdown(std::function<void(void)> call_before_shutdown) {
bool ret =
coordination_.AwaitShutdown([this, &call_before_shutdown]() -> bool {
snapshot_creator_ = nullptr;
is_accepting_transactions_ = false;
tx_engine_.LocalForEachActiveTransaction(
[](auto &t) { t.set_should_abort(); });
call_before_shutdown();
if (config_.snapshot_on_exit) {
auto dba = this->Access();
MakeSnapshot(*dba);
}
return true;
});
@ -108,18 +99,6 @@ database::Counters &GraphDb::counters() { return counters_; }
void GraphDb::CollectGarbage() { storage_gc_->CollectGarbage(); }
bool GraphDb::MakeSnapshot(GraphDbAccessor &accessor) {
const bool status = durability::MakeSnapshot(
*this, accessor, fs::path(config_.durability_directory),
config_.snapshot_max_retained);
if (status) {
LOG(INFO) << "Snapshot created successfully.";
} else {
LOG(ERROR) << "Snapshot creation failed!";
}
return status;
}
void GraphDb::Reset() {
// Release gc scheduler to stop it from touching storage.
storage_gc_ = nullptr;
@ -133,10 +112,4 @@ void GraphDb::Reset() {
*storage_, tx_engine_, &raft_server_, config_.gc_cycle_sec);
}
void GraphDb::NoOpCreate(void) {
auto dba = this->Access();
raft()->Emplace(database::StateDelta::NoOp(dba->transaction_id()));
dba->Commit();
}
} // namespace database


@ -44,9 +44,6 @@ struct Config {
// Durability flags.
std::string durability_directory;
bool db_recover_on_startup;
int snapshot_cycle_sec;
int snapshot_max_retained;
int snapshot_on_exit;
// Misc flags.
int gc_cycle_sec;
@ -118,9 +115,6 @@ class GraphDb {
database::Counters &counters();
void CollectGarbage();
/// Makes a snapshot from the visibility of the given accessor
bool MakeSnapshot(GraphDbAccessor &accessor);
/// Releases the storage object safely and creates a new object, resets the tx
/// engine.
///
@ -149,15 +143,11 @@ class GraphDb {
}
}
private:
void NoOpCreate(void);
protected:
Stat stat_;
std::atomic<bool> is_accepting_transactions_{true};
std::unique_ptr<utils::Scheduler> snapshot_creator_;
utils::Scheduler transaction_killer_;
Config config_;
@ -171,12 +161,12 @@ class GraphDb {
raft::RaftServer raft_server_{
config_.server_id,
config_.durability_directory,
config_.db_recover_on_startup,
raft::Config::LoadFromFile(config_.raft_config_file),
&coordination_,
&delta_applier_,
[this]() { this->Reset(); },
[this]() { this->NoOpCreate(); },
};
this};
tx::Engine tx_engine_{&raft_server_};
std::unique_ptr<StorageGc> storage_gc_ = std::make_unique<StorageGc>(
*storage_, tx_engine_, &raft_server_, config_.gc_cycle_sec);


@ -14,50 +14,6 @@ namespace durability {
namespace fs = std::experimental::filesystem;
std::experimental::optional<tx::TransactionId> TransactionIdFromWalFilename(
const std::string &name) {
auto nullopt = std::experimental::nullopt;
// Get the max_transaction_id from the file name that has format
// "XXXXX__max_transaction_<MAX_TRANS_ID>"
auto file_name_split = utils::RSplit(name, "__", 1);
if (file_name_split.size() != 2) {
LOG(WARNING) << "Unable to parse WAL file name: " << name;
return nullopt;
}
if (utils::StartsWith(file_name_split[1], "current"))
return std::numeric_limits<tx::TransactionId>::max();
file_name_split = utils::Split(file_name_split[1], "_");
if (file_name_split.size() != 3) {
LOG(WARNING) << "Unable to parse WAL file name: " << name;
return nullopt;
}
auto &tx_id_str = file_name_split[2];
try {
return std::stoll(tx_id_str);
} catch (std::invalid_argument &) {
LOG(WARNING) << "Unable to parse WAL file name tx ID: " << tx_id_str;
return nullopt;
} catch (std::out_of_range &) {
LOG(WARNING) << "WAL file name tx ID too large: " << tx_id_str;
return nullopt;
}
}
/// Generates a file path for a write-ahead log file. If given a transaction ID
/// the file name will contain it. Otherwise the file path is for the "current"
/// WAL file for which the max tx id is still unknown.
fs::path WalFilenameForTransactionId(
const std::experimental::filesystem::path &wal_dir,
std::experimental::optional<tx::TransactionId> tx_id) {
auto file_name = utils::Timestamp::Now().ToIso8601();
if (tx_id) {
file_name += "__max_transaction_" + std::to_string(*tx_id);
} else {
file_name += "__current";
}
return wal_dir / file_name;
}
fs::path MakeSnapshotPath(const fs::path &durability_dir,
tx::TransactionId tx_id) {
std::string date_str =


@ -7,26 +7,8 @@
namespace durability {
const std::string kSnapshotDir = "snapshots";
const std::string kWalDir = "wal";
const std::string kBackupDir = ".backup";
/// Returns the transaction id contained in the file name. If the filename is
/// not a parseable WAL file name, nullopt is returned. If the filename
/// represents the "current" WAL file, then the maximum possible transaction ID
/// is returned because that's appropriate for the recovery logic (the current
/// WAL does not yet have a maximum transaction ID and can't be discarded by
/// the recovery regardless of the snapshot from which the transaction starts).
std::experimental::optional<tx::TransactionId> TransactionIdFromWalFilename(
const std::string &name);
/// Generates a file path for a write-ahead log file. If given a transaction ID
/// the file name will contain it. Otherwise the file path is for the "current"
/// WAL file for which the max tx id is still unknown.
std::experimental::filesystem::path WalFilenameForTransactionId(
const std::experimental::filesystem::path &wal_dir,
std::experimental::optional<tx::TransactionId> tx_id =
std::experimental::nullopt);
/// Generates a path for a DB snapshot in the given folder in a well-defined
/// sortable format with transaction from which the snapshot is created appended
/// to the file name.
@ -35,7 +17,7 @@ std::experimental::filesystem::path MakeSnapshotPath(
tx::TransactionId tx_id);
/// Returns the transaction id contained in the file name. If the filename is
/// not a parseable WAL file name, nullopt is returned.
/// not a parseable snapshot file name, nullopt is returned.
std::experimental::optional<tx::TransactionId>
TransactionIdFromSnapshotFilename(const std::string &name);
} // namespace durability


@ -0,0 +1,192 @@
#include "durability/single_node_ha/recovery.hpp"
#include <experimental/filesystem>
#include <experimental/optional>
#include <limits>
#include <unordered_map>
#include "communication/bolt/v1/decoder/decoder.hpp"
#include "database/single_node_ha/graph_db_accessor.hpp"
#include "durability/hashed_file_reader.hpp"
#include "durability/single_node_ha/paths.hpp"
#include "durability/single_node_ha/version.hpp"
#include "glue/communication.hpp"
#include "storage/single_node_ha/indexes/label_property_index.hpp"
#include "transactions/type.hpp"
#include "utils/algorithm.hpp"
#include "utils/file.hpp"
namespace fs = std::experimental::filesystem;
namespace durability {
using communication::bolt::Value;
bool ReadSnapshotSummary(HashedFileReader &buffer, int64_t &vertex_count,
int64_t &edge_count, uint64_t &hash) {
auto pos = buffer.Tellg();
auto offset = sizeof(vertex_count) + sizeof(edge_count) + sizeof(hash);
buffer.Seek(-offset, std::ios_base::end);
bool r_val = buffer.ReadType(vertex_count, false) &&
buffer.ReadType(edge_count, false) &&
buffer.ReadType(hash, false);
buffer.Seek(pos);
return r_val;
}
namespace {
using communication::bolt::Value;
#define RETURN_IF_NOT(condition) \
if (!(condition)) { \
reader.Close(); \
return false; \
}
bool RecoverSnapshot(const fs::path &snapshot_file, database::GraphDb *db,
RecoveryData *recovery_data) {
HashedFileReader reader;
communication::bolt::Decoder<HashedFileReader> decoder(reader);
RETURN_IF_NOT(reader.Open(snapshot_file));
auto magic_number = durability::kSnapshotMagic;
reader.Read(magic_number.data(), magic_number.size());
RETURN_IF_NOT(magic_number == durability::kSnapshotMagic);
// Read the vertex and edge count, and the hash, from the end of the snapshot.
int64_t vertex_count;
int64_t edge_count;
uint64_t hash;
RETURN_IF_NOT(
durability::ReadSnapshotSummary(reader, vertex_count, edge_count, hash));
Value dv;
RETURN_IF_NOT(decoder.ReadValue(&dv, Value::Type::Int) &&
dv.ValueInt() == durability::kVersion);
RETURN_IF_NOT(decoder.ReadValue(&dv, Value::Type::Int));
recovery_data->snapshooter_tx_id = dv.ValueInt();
// Transaction snapshot of the transaction that created the snapshot.
RETURN_IF_NOT(decoder.ReadValue(&dv, Value::Type::List));
for (const auto &value : dv.ValueList()) {
RETURN_IF_NOT(value.IsInt());
recovery_data->snapshooter_tx_snapshot.emplace_back(value.ValueInt());
}
// A list of label+property indexes.
RETURN_IF_NOT(decoder.ReadValue(&dv, Value::Type::List));
auto index_value = dv.ValueList();
for (auto it = index_value.begin(); it != index_value.end();) {
auto label = *it++;
RETURN_IF_NOT(it != index_value.end());
auto property = *it++;
RETURN_IF_NOT(it != index_value.end());
auto unique = *it++;
RETURN_IF_NOT(label.IsString() && property.IsString() && unique.IsBool());
recovery_data->indexes.emplace_back(
IndexRecoveryData{label.ValueString(), property.ValueString(),
/*create = */ true, unique.ValueBool()});
}
auto dba = db->Access();
std::unordered_map<uint64_t, VertexAccessor> vertices;
for (int64_t i = 0; i < vertex_count; ++i) {
Value vertex_dv;
RETURN_IF_NOT(decoder.ReadValue(&vertex_dv, Value::Type::Vertex));
auto &vertex = vertex_dv.ValueVertex();
auto vertex_accessor = dba->InsertVertex(vertex.id.AsUint());
for (const auto &label : vertex.labels) {
vertex_accessor.add_label(dba->Label(label));
}
for (const auto &property_pair : vertex.properties) {
vertex_accessor.PropsSet(dba->Property(property_pair.first),
glue::ToPropertyValue(property_pair.second));
}
vertices.insert({vertex.id.AsUint(), vertex_accessor});
}
for (int64_t i = 0; i < edge_count; ++i) {
Value edge_dv;
RETURN_IF_NOT(decoder.ReadValue(&edge_dv, Value::Type::Edge));
auto &edge = edge_dv.ValueEdge();
auto it_from = vertices.find(edge.from.AsUint());
auto it_to = vertices.find(edge.to.AsUint());
RETURN_IF_NOT(it_from != vertices.end() && it_to != vertices.end());
auto edge_accessor =
dba->InsertEdge(it_from->second, it_to->second,
dba->EdgeType(edge.type), edge.id.AsUint());
for (const auto &property_pair : edge.properties)
edge_accessor.PropsSet(dba->Property(property_pair.first),
glue::ToPropertyValue(property_pair.second));
}
// Vertex and edge counts are included in the hash. Re-read them to update the
// hash.
reader.ReadType(vertex_count);
reader.ReadType(edge_count);
if (!reader.Close() || reader.hash() != hash) {
dba->Abort();
return false;
}
// Ensure that the next transaction ID in the recovered DB will be greater
// than the latest one we have recovered. Do this to make sure that
// subsequently created snapshots files will have transactional info
// that does not interfere with that found in previous snapshots.
tx::TransactionId max_id = recovery_data->snapshooter_tx_id;
auto &snap = recovery_data->snapshooter_tx_snapshot;
if (!snap.empty()) max_id = *std::max_element(snap.begin(), snap.end());
dba->db().tx_engine().EnsureNextIdGreater(max_id);
dba->Commit();
return true;
}
#undef RETURN_IF_NOT
} // anonymous namespace
bool RecoverOnlySnapshot(const fs::path &durability_dir, database::GraphDb *db,
RecoveryData *recovery_data) {
// Attempt to recover from snapshot files in reverse order (from newest
// backwards).
const auto snapshot_dir = durability_dir / kSnapshotDir;
std::vector<fs::path> snapshot_files;
if (fs::exists(snapshot_dir) && fs::is_directory(snapshot_dir))
for (auto &file : fs::directory_iterator(snapshot_dir))
snapshot_files.emplace_back(file);
if (snapshot_files.size() != 1) {
LOG(WARNING) << "Expected only one snapshot file for recovery!";
return false;
}
auto snapshot_file = *snapshot_files.begin();
LOG(INFO) << "Starting snapshot recovery from: " << snapshot_file;
if (!RecoverSnapshot(snapshot_file, db, recovery_data)) {
LOG(WARNING) << "Snapshot recovery failed";
return false;
}
LOG(INFO) << "Snapshot recovery successful.";
return true;
}
void RecoverIndexes(database::GraphDb *db,
const std::vector<IndexRecoveryData> &indexes) {
auto dba = db->Access();
for (const auto &index : indexes) {
auto label = dba->Label(index.label);
auto property = dba->Property(index.property);
if (index.create) {
dba->BuildIndex(label, property, index.unique);
} else {
dba->DeleteIndex(label, property);
}
}
dba->Commit();
}
} // namespace durability


@ -0,0 +1,62 @@
#pragma once
#include <experimental/filesystem>
#include <experimental/optional>
#include <unordered_map>
#include <vector>
#include "durability/hashed_file_reader.hpp"
#include "durability/single_node_ha/state_delta.hpp"
#include "transactions/type.hpp"
namespace database {
class GraphDb;
};
namespace durability {
struct IndexRecoveryData {
std::string label;
std::string property;
bool create; // distinguish between creating and dropping index
bool unique; // used only when creating an index
};
/// Data structure for exchanging info between main recovery function and
/// snapshot recovery functions.
struct RecoveryData {
tx::TransactionId snapshooter_tx_id{0};
std::vector<tx::TransactionId> snapshooter_tx_snapshot;
// A collection into which the indexes should be added so they
// can be rebuilt at the end of the recovery transaction.
std::vector<IndexRecoveryData> indexes;
void Clear() {
snapshooter_tx_id = 0;
snapshooter_tx_snapshot.clear();
indexes.clear();
}
};
/// Reads snapshot metadata from the end of the file without messing up the
/// hash.
bool ReadSnapshotSummary(HashedFileReader &buffer, int64_t &vertex_count,
int64_t &edge_count, uint64_t &hash);
/**
* Recovers database from the latest possible snapshot. If recovering fails,
* false is returned and db_accessor aborts transaction, else true is returned
* and transaction is commited.
*
* @param durability_dir - Path to durability directory.
* @param db - The database to recover into.
* @return - recovery info
*/
bool RecoverOnlySnapshot(
const std::experimental::filesystem::path &durability_dir,
database::GraphDb *db, durability::RecoveryData *recovery_data);
void RecoverIndexes(database::GraphDb *db,
const std::vector<IndexRecoveryData> &indexes);
} // namespace durability


@ -77,51 +77,31 @@ bool Encode(const fs::path &snapshot_file, database::GraphDb &db,
return true;
}
// Removes snapshot files so that only `max_retained` latest ones are kept. If
// `max_retained == -1`, all the snapshots are retained.
void RemoveOldSnapshots(const fs::path &snapshot_dir, int max_retained) {
if (max_retained == -1) return;
/// Remove old snapshots but leave at most `keep` number of latest ones.
void RemoveOldSnapshots(const fs::path &snapshot_dir, uint16_t keep) {
std::vector<fs::path> files;
for (auto &file : fs::directory_iterator(snapshot_dir))
files.push_back(file.path());
if (static_cast<int>(files.size()) <= max_retained) return;
if (static_cast<uint16_t>(files.size()) <= keep) return;
sort(files.begin(), files.end());
for (int i = 0; i < static_cast<int>(files.size()) - max_retained; ++i) {
for (int i = 0; i < static_cast<uint16_t>(files.size()) - keep; ++i) {
if (!fs::remove(files[i])) {
LOG(ERROR) << "Error while removing file: " << files[i];
}
}
}
// Removes write-ahead log files that are no longer necessary (they don't get
// used when recovering from the latest snapshot.
void RemoveOldWals(const fs::path &wal_dir,
const tx::Transaction &snapshot_transaction) {
if (!fs::exists(wal_dir)) return;
// We can remove all the WAL files that will not be used when restoring from
// the snapshot created in the given transaction.
auto min_trans_id = snapshot_transaction.snapshot().empty()
? snapshot_transaction.id_ + 1
: snapshot_transaction.snapshot().front();
for (auto &wal_file : fs::directory_iterator(wal_dir)) {
auto tx_id = TransactionIdFromWalFilename(wal_file.path().filename());
if (tx_id && tx_id.value() < min_trans_id) {
bool result = fs::remove(wal_file);
DCHECK(result) << "Unable to delete old wal file: " << wal_file;
}
}
}
} // namespace
bool MakeSnapshot(database::GraphDb &db, database::GraphDbAccessor &dba,
const fs::path &durability_dir, int snapshot_max_retained) {
const fs::path &durability_dir) {
if (!utils::EnsureDir(durability_dir / kSnapshotDir)) return false;
const auto snapshot_file =
MakeSnapshotPath(durability_dir, dba.transaction_id());
if (fs::exists(snapshot_file)) return false;
if (Encode(snapshot_file, db, dba)) {
RemoveOldSnapshots(durability_dir / kSnapshotDir, snapshot_max_retained);
RemoveOldWals(durability_dir / kWalDir, dba.transaction());
// Only keep the latest snapshot.
RemoveOldSnapshots(durability_dir / kSnapshotDir, 1);
return true;
} else {
std::error_code error_code; // Just for exception suppression.
@ -130,4 +110,10 @@ bool MakeSnapshot(database::GraphDb &db, database::GraphDbAccessor &dba,
}
}
void RemoveAllSnapshots(const fs::path &durability_dir) {
auto snapshot_dir = durability_dir / kSnapshotDir;
if (!utils::EnsureDir(snapshot_dir)) return;
RemoveOldSnapshots(snapshot_dir, 0);
}
} // namespace durability


@ -6,15 +6,16 @@
namespace durability {
/**
* Make snapshot and save it in snapshots folder. Returns true if successful.
* @param db - database for which we are creating a snapshot
* @param dba - db accessor with which we are creating a snapshot (reading data)
* @param durability_dir - directory where durability data is stored.
* @param snapshot_max_retained - maximum number of snapshots to retain.
*/
/// Make snapshot and save it in snapshots folder. Returns true if successful.
/// @param db - database for which we are creating a snapshot
/// @param dba - db accessor with which we are creating a snapshot (reading
/// data)
/// @param durability_dir - directory where durability data is stored.
bool MakeSnapshot(database::GraphDb &db, database::GraphDbAccessor &dba,
const std::experimental::filesystem::path &durability_dir,
int snapshot_max_retained);
const std::experimental::filesystem::path &durability_dir);
/// Remove all snapshots inside the snapshot durability directory.
void RemoveAllSnapshots(
const std::experimental::filesystem::path &durability_dir);
} // namespace durability


@ -19,7 +19,7 @@ struct Config {
std::chrono::milliseconds election_timeout_min;
std::chrono::milliseconds election_timeout_max;
std::chrono::milliseconds heartbeat_interval;
std::chrono::milliseconds replicate_timeout;
uint64_t log_size_snapshot_threshold;
static Config LoadFromFile(const std::string &raft_config_file) {
if (!std::experimental::filesystem::exists(raft_config_file))
@ -40,7 +40,7 @@ struct Config {
throw RaftConfigException(raft_config_file);
if (!data["heartbeat_interval"].is_number())
throw RaftConfigException(raft_config_file);
if (!data["replicate_timeout"].is_number())
if (!data["log_size_snapshot_threshold"].is_number())
throw RaftConfigException(raft_config_file);
return Config{
@ -49,7 +49,7 @@ struct Config {
std::chrono::duration<int64_t, std::milli>(
data["election_timeout_max"]),
std::chrono::duration<int64_t, std::milli>(data["heartbeat_interval"]),
std::chrono::duration<int64_t, std::milli>(data["replicate_timeout"])};
data["log_size_snapshot_threshold"]};
}
};


@ -1,44 +1,70 @@
#include "raft/raft_server.hpp"
#include <kj/std/iostream.h>
#include <chrono>
#include <experimental/filesystem>
#include <memory>
#include <fmt/format.h>
#include <gflags/gflags.h>
#include <glog/logging.h>
#include "database/graph_db_accessor.hpp"
#include "durability/single_node_ha/recovery.hpp"
#include "durability/single_node_ha/snapshooter.hpp"
#include "raft/exceptions.hpp"
#include "utils/exceptions.hpp"
#include "utils/on_scope_exit.hpp"
#include "utils/serialization.hpp"
#include "utils/string.hpp"
#include "utils/thread.hpp"
namespace raft {
using namespace std::literals::chrono_literals;
namespace fs = std::experimental::filesystem;
const std::string kCurrentTermKey = "current_term";
const std::string kVotedForKey = "voted_for";
const std::string kLogSizeKey = "log_size";
const std::string kLogEntryPrefix = "log_entry_";
const std::string kSnapshotMetadataKey = "snapshot_metadata";
const std::string kRaftDir = "raft";
const std::chrono::duration<int64_t> kSnapshotPeriod = 1s;
RaftServer::RaftServer(uint16_t server_id, const std::string &durability_dir,
const Config &config, Coordination *coordination,
bool db_recover_on_startup, const Config &config,
Coordination *coordination,
database::StateDeltaApplier *delta_applier,
std::function<void(void)> reset_callback,
std::function<void(void)> no_op_create_callback)
database::GraphDb *db)
: config_(config),
coordination_(coordination),
delta_applier_(delta_applier),
db_(db),
rlog_(std::make_unique<ReplicationLog>()),
mode_(Mode::FOLLOWER),
server_id_(server_id),
durability_dir_(durability_dir),
db_recover_on_startup_(db_recover_on_startup),
commit_index_(0),
last_applied_(0),
disk_storage_(fs::path(durability_dir) / kRaftDir),
reset_callback_(reset_callback),
no_op_create_callback_(no_op_create_callback) {}
disk_storage_(fs::path(durability_dir) / kRaftDir) {}
void RaftServer::Start() {
if (db_recover_on_startup_) {
auto snapshot_metadata = GetSnapshotMetadata();
if (snapshot_metadata) {
RecoverSnapshot();
last_applied_ = snapshot_metadata->second;
commit_index_ = snapshot_metadata->second;
}
} else {
// We need to clear persisted data if we don't want any recovery.
disk_storage_.DeletePrefix("");
durability::RemoveAllSnapshots(durability_dir_);
}
// Persistent storage initialization
if (LogSize() == 0) {
UpdateTerm(0);
@ -102,8 +128,8 @@ void RaftServer::Start() {
Load(&req, req_reader);
// [Raft paper 5.1]
// "If a server recieves a request with a stale term,
// it rejects the request"
// "If a server receives a request with a stale term, it rejects the
// request"
uint64_t current_term = CurrentTerm();
if (req.term < current_term) {
AppendEntriesRes res(false, current_term);
@ -111,40 +137,16 @@ void RaftServer::Start() {
return;
}
// [Raft paper figure 2]
// If RPC request or response contains term T > currentTerm,
// set currentTerm = T and convert to follower.
if (req.term > current_term) {
UpdateTerm(req.term);
if (mode_ != Mode::FOLLOWER) Transition(Mode::FOLLOWER);
}
// [Raft thesis 3.4]
// A server remains in follower state as long as it receives valid RPCs from
// a leader or candidate.
SetNextElectionTimePoint();
election_change_.notify_all();
// [Raft paper 5.3]
// "Once a follower learns that a log entry is committed, it applies
// the entry to its state machine (in log order)
while (req.leader_commit > last_applied_ && last_applied_ + 1 < LogSize()) {
++last_applied_;
delta_applier_->Apply(GetLogEntry(last_applied_).deltas);
}
// respond positively to a heartbeat.
if (req.entries.empty()) {
AppendEntriesRes res(true, current_term);
Save(res, res_builder);
if (mode_ != Mode::FOLLOWER) {
Transition(Mode::FOLLOWER);
} else {
SetNextElectionTimePoint();
election_change_.notify_all();
}
return;
}
// Everything below is considered to be a valid RPC. This will ensure that
// after we finish processing the current request, the election timeout will
// be extended.
utils::OnScopeExit extend_election_timeout([this] {
// [Raft thesis 3.4]
// A server remains in follower state as long as it receives valid RPCs
// from a leader or candidate.
SetNextElectionTimePoint();
election_change_.notify_all();
});
// [Raft paper 5.3]
// "If a follower's log is inconsistent with the leader's, the
@ -162,7 +164,32 @@ void RaftServer::Start() {
return;
}
// [Raft paper figure 2]
// If RPC request or response contains term T > currentTerm,
// set currentTerm = T and convert to follower.
if (req.term > current_term) {
UpdateTerm(req.term);
if (mode_ != Mode::FOLLOWER) Transition(Mode::FOLLOWER);
}
AppendLogEntries(req.leader_commit, req.prev_log_index + 1, req.entries);
// [Raft paper 5.3]
// "Once a follower learns that a log entry is committed, it applies
// the entry to its state machine (in log order)
while (req.leader_commit > last_applied_ && last_applied_ + 1 < LogSize()) {
++last_applied_;
delta_applier_->Apply(GetLogEntry(last_applied_).deltas);
}
// Respond positively to a heartbeat.
if (req.entries.empty()) {
AppendEntriesRes res(true, current_term);
Save(res, res_builder);
if (mode_ != Mode::FOLLOWER) Transition(Mode::FOLLOWER);
return;
}
AppendEntriesRes res(true, current_term);
Save(res, res_builder);
});
@ -178,6 +205,8 @@ void RaftServer::Start() {
}
no_op_issuer_thread_ = std::thread(&RaftServer::NoOpIssuerThreadMain, this);
snapshot_thread_ = std::thread(&RaftServer::SnapshotThread, this);
}
void RaftServer::Shutdown() {
@ -196,6 +225,7 @@ void RaftServer::Shutdown() {
if (election_thread_.joinable()) election_thread_.join();
if (no_op_issuer_thread_.joinable()) no_op_issuer_thread_.join();
if (snapshot_thread_.joinable()) snapshot_thread_.join();
}
uint64_t RaftServer::CurrentTerm() {
@ -221,6 +251,29 @@ uint64_t RaftServer::LogSize() {
return std::stoull(opt_value.value());
}
std::experimental::optional<std::pair<uint64_t, uint64_t>>
RaftServer::GetSnapshotMetadata() {
auto opt_value = disk_storage_.Get(kSnapshotMetadataKey);
if (opt_value == std::experimental::nullopt) {
return std::experimental::nullopt;
}
auto value = utils::Split(opt_value.value(), " ");
if (value.size() != 2) {
LOG(WARNING) << "Malformed snapshot metdata";
return std::experimental::nullopt;
}
return std::make_pair(std::stoull(value[0]), std::stoull(value[1]));
}
void RaftServer::PersistSnapshotMetadata(uint64_t last_included_term,
uint64_t last_included_index) {
auto value = utils::Join(
{std::to_string(last_included_term), std::to_string(last_included_index)},
" ");
disk_storage_.Put(kSnapshotMetadataKey, value);
}
void RaftServer::AppendToLog(const tx::TransactionId &tx_id,
const std::vector<database::StateDelta> &deltas) {
std::unique_lock<std::mutex> lock(lock_);
@ -306,7 +359,7 @@ void RaftServer::LogEntryBuffer::Disable() {
}
void RaftServer::LogEntryBuffer::Emplace(const database::StateDelta &delta) {
std::lock_guard<std::mutex> guard(buffer_lock_);
std::unique_lock<std::mutex> lock(buffer_lock_);
if (!enabled_) return;
tx::TransactionId tx_id = delta.transaction_id;
@ -317,7 +370,10 @@ void RaftServer::LogEntryBuffer::Emplace(const database::StateDelta &delta) {
std::vector<database::StateDelta> log(std::move(it->second));
log.emplace_back(std::move(delta));
logs_.erase(it);
lock.unlock();
raft_server_->AppendToLog(tx_id, log);
} else if (delta.type == database::StateDelta::Type::TRANSACTION_ABORT) {
auto it = logs_.find(tx_id);
CHECK(it != logs_.end()) << "Missing StateDeltas for transaction " << tx_id;
@ -338,16 +394,21 @@ void RaftServer::Transition(const Mode &new_mode) {
log_entry_buffer_.Disable();
if (reset) {
VLOG(40) << "Reseting internal state";
// Temporaray freeze election timer while we do the reset.
VLOG(40) << "Resetting internal state";
// Temporary freeze election timer while we do the reset.
next_election_ = TimePoint::max();
reset_callback_();
db_->Reset();
ResetReplicationLog();
// Re-apply raft log.
// TODO(msantl): Implement snapshot recovery also!
for (int i = 1; i <= commit_index_; ++i)
auto snapshot_metadata = GetSnapshotMetadata();
uint64_t starting_index = 1;
if (snapshot_metadata) {
RecoverSnapshot();
starting_index = snapshot_metadata->second + 1;
}
for (uint64_t i = starting_index; i <= commit_index_; ++i)
delta_applier_->Apply(GetLogEntry(i).deltas);
last_applied_ = commit_index_;
}
@ -569,6 +630,7 @@ void RaftServer::ElectionThreadMain() {
}
void RaftServer::PeerThreadMain(uint16_t peer_id) {
utils::ThreadSetName(fmt::format("RaftPeer{}", peer_id));
std::unique_lock<std::mutex> lock(lock_);
/* This loop will either call a function that issues an RPC or wait on the
@ -667,6 +729,7 @@ void RaftServer::PeerThreadMain(uint16_t peer_id) {
}
void RaftServer::NoOpIssuerThreadMain() {
utils::ThreadSetName(fmt::format("NoOpIssuer"));
std::mutex m;
auto lock = std::unique_lock<std::mutex>(m);
while (!exiting_) {
@ -674,7 +737,58 @@ void RaftServer::NoOpIssuerThreadMain() {
// no_op_create_callback_ will create a new transaction that has a NO_OP
// StateDelta. This will trigger the whole procedure of replicating logs
// in our implementation of Raft.
if (!exiting_) no_op_create_callback_();
if (!exiting_) NoOpCreate();
}
}
void RaftServer::SnapshotThread() {
utils::ThreadSetName(fmt::format("RaftSnapshot"));
while (!exiting_) {
{
std::unique_lock<std::mutex> lock(lock_);
uint64_t uncompacted_log_size = LogSize();
auto snapshot_metadata = GetSnapshotMetadata();
if (snapshot_metadata) {
uncompacted_log_size -= snapshot_metadata->second;
}
// Compare the log size to the config
if (config_.log_size_snapshot_threshold < uncompacted_log_size) {
// Create a DB accessor for snapshot creation
std::unique_ptr<database::GraphDbAccessor> dba = db_->Access();
uint64_t current_term = CurrentTerm();
uint64_t last_applied = last_applied_;
lock.unlock();
bool status =
durability::MakeSnapshot(*db_, *dba, fs::path(durability_dir_));
lock.lock();
if (status) {
uint64_t log_compaction_start_index = 1;
if (snapshot_metadata) {
log_compaction_start_index = snapshot_metadata->second + 1;
}
PersistSnapshotMetadata(current_term, last_applied);
// Log compaction.
// TODO (msantl): In order to handle log compaction correctly, we need
// to be able to send snapshots over the wire and implement additional
// logic to handle log entries that were compacted into a snapshot.
// for (int i = log_compaction_start_index; i <= last_applied_; ++i) {
// disk_storage_.Delete(LogEntryKey(i));
// }
}
lock.unlock();
// Raft lock must be released when destroying dba object.
dba = nullptr;
}
}
std::this_thread::sleep_for(kSnapshotPeriod);
}
}
@ -736,7 +850,8 @@ bool RaftServer::OutOfSync(uint64_t reply_term) {
LogEntry RaftServer::GetLogEntry(int index) {
auto opt_value = disk_storage_.Get(LogEntryKey(index));
DCHECK(opt_value != std::experimental::nullopt) << "Log index out of bounds.";
DCHECK(opt_value != std::experimental::nullopt)
<< "Log index (" << index << ") out of bounds.";
return DeserializeLogEntry(opt_value.value());
}
@ -825,4 +940,17 @@ void RaftServer::ResetReplicationLog() {
rlog_ = std::make_unique<ReplicationLog>();
}
void RaftServer::RecoverSnapshot() {
durability::RecoveryData recovery_data;
CHECK(durability::RecoverOnlySnapshot(durability_dir_, db_, &recovery_data))
<< "Failed to recover from snapshot";
durability::RecoverIndexes(db_, recovery_data.indexes);
}
void RaftServer::NoOpCreate() {
auto dba = db_->Access();
Emplace(database::StateDelta::NoOp(dba->transaction_id()));
dba->Commit();
}
} // namespace raft


@ -19,6 +19,11 @@
#include "transactions/type.hpp"
#include "utils/scheduler.hpp"
// Forward declaration
namespace database {
class GraphDb;
} // namespace database
namespace raft {
using Clock = std::chrono::system_clock;
@ -50,16 +55,16 @@ class RaftServer final : public RaftInterface {
///
/// @param server_id ID of the current server.
/// @param durability_dir directory for persisted data.
/// @param db_recover_on_startup flag indicating if recovery should happen at
/// startup.
/// @param config raft configuration.
/// @param coordination Abstraction for coordination between Raft servers.
/// @param delta_applier Object which is able to apply state deltas to SM.
/// @param reset_callback Function that is called on each Leader->Follower
/// transition.
/// @param db The current DB object.
RaftServer(uint16_t server_id, const std::string &durability_dir,
const Config &config, raft::Coordination *coordination,
database::StateDeltaApplier *delta_applier,
std::function<void(void)> reset_callback,
std::function<void(void)> no_op_create);
bool db_recover_on_startup, const Config &config,
raft::Coordination *coordination,
database::StateDeltaApplier *delta_applier, database::GraphDb *db);
/// Starts the RPC servers and starts mechanisms inside Raft protocol.
void Start();
@ -80,6 +85,14 @@ class RaftServer final : public RaftInterface {
/// Retrieves log size from persistent storage.
uint64_t LogSize();
/// Retrieves persisted snapshot metadata or nullopt if not present.
std::experimental::optional<std::pair<uint64_t, uint64_t>>
GetSnapshotMetadata();
/// Persists snapshot metadata.
void PersistSnapshotMetadata(uint64_t last_included_term,
uint64_t last_included_index);
/// Append to the log a list of batched state deltas that are ready to be
/// replicated.
void AppendToLog(const tx::TransactionId &tx_id,
@ -144,12 +157,16 @@ class RaftServer final : public RaftInterface {
Config config_; ///< Raft config.
Coordination *coordination_{nullptr}; ///< Cluster coordination.
database::StateDeltaApplier *delta_applier_{nullptr};
database::GraphDb *db_{nullptr};
std::unique_ptr<ReplicationLog> rlog_{nullptr};
std::atomic<Mode> mode_; ///< Server's current mode.
uint16_t server_id_; ///< ID of the current server.
uint64_t commit_index_; ///< Index of the highest known committed entry.
uint64_t last_applied_; ///< Index of the highest applied entry to SM.
std::atomic<Mode> mode_; ///< Server's current mode.
uint16_t server_id_; ///< ID of the current server.
std::string durability_dir_; ///< Durability directory.
bool db_recover_on_startup_; ///< Flag indicating if recovery should happen
///< on startup.
uint64_t commit_index_; ///< Index of the highest known committed entry.
uint64_t last_applied_; ///< Index of the highest applied entry to SM.
/// Raft log entry buffer.
///
@ -167,6 +184,10 @@ class RaftServer final : public RaftInterface {
std::thread no_op_issuer_thread_; ///< Thread responsible for issuing no-op
///< command on leader change.
std::thread snapshot_thread_; ///< Thread responsible for snapshot creation
///< when log size reaches
///< `log_size_snapshot_threshold`.
std::condition_variable leader_changed_; ///< Notifies the
///< no_op_issuer_thread that a new
///< leader has been elected.
@ -221,12 +242,6 @@ class RaftServer final : public RaftInterface {
storage::KVStore disk_storage_;
/// Callback that needs to be called to reset the db state.
std::function<void(void)> reset_callback_;
/// Callback that creates a new transaction with NO_OP StateDelta.
std::function<void(void)> no_op_create_callback_;
/// Makes a transition to a new `raft::Mode`.
///
/// throws InvalidTransitionException when transitioning between incompatible
@ -266,6 +281,11 @@ class RaftServer final : public RaftInterface {
/// have been replicated on a majority of peers.
void NoOpIssuerThreadMain();
/// Periodically checks if the Log size reached `log_size_snapshot_threshold`
/// parameter. If it has, then it performs log compaction and creates
/// snapshots.
void SnapshotThread();
/// Sets the `TimePoint` for next election.
void SetNextElectionTimePoint();
@ -341,6 +361,14 @@ class RaftServer final : public RaftInterface {
/// Deserialized Raft log entry from `std::string`
LogEntry DeserializeLogEntry(const std::string &serialized_log_entry);
/// Resets the replication log used to indicate the replication status.
void ResetReplicationLog();
/// Recovers the latest snapshot that exists in the durability directory.
void RecoverSnapshot();
/// Start a new transaction with a NO-OP StateDelta.
void NoOpCreate();
};
} // namespace raft


@ -2,5 +2,5 @@
"election_timeout_min": 350,
"election_timeout_max": 700,
"heartbeat_interval": 100,
"replicate_timeout": 100
"log_size_snapshot_threshold": 100000
}


@ -41,6 +41,7 @@ do
--coordination_config_file="coordination.json" \
--raft_config_file="raft.json" \
--port $((7686 + $server_id)) \
--db-recover-on-startup=false \
--durability_directory=dur$server_id &
HA_PIDS[$server_id]=$!
wait_for_server $((7686 + $server_id))


@ -2,5 +2,5 @@
"election_timeout_min": 200,
"election_timeout_max": 500,
"heartbeat_interval": 100,
"replicate_timeout": 100
"log_size_snapshot_threshold": 100000
}