Add log compaction for Raft, pt. 2

Summary: Implemented snapshot replication and log compaction.

Reviewers: ipaljak

Reviewed By: ipaljak

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1840
Author: Matija Santl, 2019-01-31 13:40:17 +01:00
Parent: da95cbf4ec
Commit: 145c81376f
20 changed files with 469 additions and 147 deletions
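
In short: the leader now decides per follower whether to ship log entries or a whole snapshot. A minimal sketch of that decision (simplified from the SendEntries change in raft_server.cpp further down in this diff):

void RaftServer::SendEntries(uint16_t peer_id,
                             std::unique_lock<std::mutex> *lock) {
  auto snapshot_metadata = GetSnapshotMetadata();
  if (snapshot_metadata &&
      snapshot_metadata->last_included_index >= next_index_[peer_id]) {
    // The entries this follower needs were already compacted into a snapshot,
    // so the snapshot itself has to be replicated (InstallSnapshotRpc).
    SendSnapshot(peer_id, *snapshot_metadata, lock);
  } else {
    // The follower can still be caught up with regular AppendEntries.
    SendLogEntries(peer_id, snapshot_metadata, lock);
  }
}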

.gitignore

@ -92,6 +92,8 @@ src/raft/log_entry.capnp
src/raft/log_entry.hpp
src/raft/raft_rpc_messages.capnp
src/raft/raft_rpc_messages.hpp
src/raft/snapshot_metadata.capnp
src/raft/snapshot_metadata.hpp
src/stats/stats_rpc_messages.capnp
src/stats/stats_rpc_messages.hpp
src/storage/distributed/rpc/concurrent_id_mapper_rpc_messages.capnp


@ -305,6 +305,8 @@ add_lcp_single_node_ha(raft/raft_rpc_messages.lcp CAPNP_SCHEMA @0xa6c29b4287233b
add_capnp_single_node_ha(raft/raft_rpc_messages.capnp)
add_lcp_single_node_ha(raft/log_entry.lcp CAPNP_SCHEMA @0x96c07fe13850c22a)
add_capnp_single_node_ha(raft/log_entry.capnp)
add_lcp_single_node_ha(raft/snapshot_metadata.lcp CAPNP_SCHEMA @0xaa08e34991680f6c)
add_capnp_single_node_ha(raft/snapshot_metadata.capnp)
add_custom_target(generate_lcp_single_node_ha DEPENDS ${generated_lcp_single_node_ha_files})


@ -39,19 +39,13 @@ void GraphDb::Start() {
GraphDb::~GraphDb() {}
bool GraphDb::AwaitShutdown(std::function<void(void)> call_before_shutdown) {
bool ret =
coordination_.AwaitShutdown([this, &call_before_shutdown]() -> bool {
is_accepting_transactions_ = false;
tx_engine_.LocalForEachActiveTransaction(
[](auto &t) { t.set_should_abort(); });
void GraphDb::AwaitShutdown(std::function<void(void)> call_before_shutdown) {
coordination_.AwaitShutdown([this, &call_before_shutdown]() {
tx_engine_.LocalForEachActiveTransaction(
[](auto &t) { t.set_should_abort(); });
call_before_shutdown();
return true;
});
return ret;
call_before_shutdown();
});
}
void GraphDb::Shutdown() {


@ -96,7 +96,7 @@ class GraphDb {
GraphDb &operator=(GraphDb &&) = delete;
void Start();
bool AwaitShutdown(std::function<void(void)> call_before_shutdown);
void AwaitShutdown(std::function<void(void)> call_before_shutdown);
void Shutdown();
/// Create a new accessor by starting a new transaction.
@ -122,9 +122,6 @@ class GraphDb {
/// might end up with some stale transactions on the leader.
void Reset();
/// When this is false, no new transactions should be created.
bool is_accepting_transactions() const { return is_accepting_transactions_; }
/// Get live view of storage stats. Gets updated on RefreshStat.
const Stat &GetStat() const { return stat_; }
@ -146,8 +143,6 @@ class GraphDb {
protected:
Stat stat_;
std::atomic<bool> is_accepting_transactions_{true};
utils::Scheduler transaction_killer_;
Config config_;


@ -14,13 +14,16 @@ namespace durability {
namespace fs = std::experimental::filesystem;
fs::path MakeSnapshotPath(const fs::path &durability_dir,
tx::TransactionId tx_id) {
std::string GetSnapshotFilename(tx::TransactionId tx_id) {
std::string date_str =
utils::Timestamp(utils::Timestamp::Now())
.ToString("{:04d}_{:02d}_{:02d}__{:02d}_{:02d}_{:02d}_{:05d}");
auto file_name = date_str + "_tx_" + std::to_string(tx_id);
return durability_dir / kSnapshotDir / file_name;
return date_str + "_tx_" + std::to_string(tx_id);
}
fs::path MakeSnapshotPath(const fs::path &durability_dir,
const std::string &snapshot_filename) {
return durability_dir / kSnapshotDir / snapshot_filename;
}
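
For illustration, the intended composition of the split API (a sketch, not part of the diff; the example filename is assumed from the timestamp format above):

// Generate the filename once, then reuse it for both the snapshot file and
// the persisted snapshot metadata.
std::string snapshot_filename =
    durability::GetSnapshotFilename(dba->transaction_id());
// e.g. "2019_01_31__13_40_17_00042_tx_17" (format assumed)
fs::path snapshot_path =
    durability::MakeSnapshotPath(durability_dir, snapshot_filename);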
std::experimental::optional<tx::TransactionId>


@ -9,12 +9,15 @@ namespace durability {
const std::string kSnapshotDir = "snapshots";
const std::string kBackupDir = ".backup";
/// Generates a path for a DB snapshot in the given folder in a well-defined
/// Generates a filename for a DB snapshot in the given folder in a well-defined
/// sortable format with transaction from which the snapshot is created appended
/// to the file name.
std::string GetSnapshotFilename(tx::TransactionId tx_id);
/// Generates a full path for a DB snapshot.
std::experimental::filesystem::path MakeSnapshotPath(
const std::experimental::filesystem::path &durability_dir,
tx::TransactionId tx_id);
const std::string &snapshot_filename);
/// Returns the transaction id contained in the file name. If the filename is
/// not a parseable snapshot file name, nullopt is returned.


@ -147,26 +147,24 @@ bool RecoverSnapshot(const fs::path &snapshot_file, database::GraphDb *db,
} // anonymous namespace
bool RecoverOnlySnapshot(const fs::path &durability_dir, database::GraphDb *db,
RecoveryData *recovery_data) {
// Attempt to recover from snapshot files in reverse order (from newest
// backwards).
bool RecoverSnapshot(database::GraphDb *db, RecoveryData *recovery_data,
const fs::path &durability_dir,
const std::string &snapshot_filename) {
const auto snapshot_dir = durability_dir / kSnapshotDir;
std::vector<fs::path> snapshot_files;
if (fs::exists(snapshot_dir) && fs::is_directory(snapshot_dir))
for (auto &file : fs::directory_iterator(snapshot_dir))
snapshot_files.emplace_back(file);
if (snapshot_files.size() != 1) {
LOG(WARNING) << "Expected only one snapshot file for recovery!";
if (!fs::exists(snapshot_dir) || !fs::is_directory(snapshot_dir)) {
LOG(WARNING) << "Missing snapshot directory!";
return false;
}
auto snapshot_file = *snapshot_files.begin();
LOG(INFO) << "Starting snapshot recovery from: " << snapshot_file;
if (!RecoverSnapshot(snapshot_file, db, recovery_data)) {
LOG(WARNING) << "Snapshot recovery failed";
const auto snapshot = snapshot_dir / snapshot_filename;
if (!fs::exists(snapshot)) {
LOG(WARNING) << "Missing snapshot file!";
return false;
}
LOG(INFO) << "Starting snapshot recovery from: " << snapshot;
if (!RecoverSnapshot(snapshot, db, recovery_data)) {
LOG(WARNING) << "Snapshot recovery failed.";
return false;
}


@ -44,17 +44,20 @@ bool ReadSnapshotSummary(HashedFileReader &buffer, int64_t &vertex_count,
int64_t &edge_count, uint64_t &hash);
/**
* Recovers database from the latest possible snapshot. If recovering fails,
* false is returned and db_accessor aborts transaction, else true is returned
* and transaction is commited.
* Recovers database from the given snapshot. If recovering fails, false is
* returned and db_accessor aborts transaction, else true is returned and
* transaction is committed.
*
* @param durability_dir - Path to durability directory.
* @param db - The database to recover into.
* @param recovery_data - Struct that will contain additional recovery data.
* @param durability_dir - Path to durability directory.
* @param snapshot_filename - Snapshot filename.
* @return - recovery info
*/
bool RecoverOnlySnapshot(
const std::experimental::filesystem::path &durability_dir,
database::GraphDb *db, durability::RecoveryData *recovery_data);
bool RecoverSnapshot(database::GraphDb *db,
durability::RecoveryData *recovery_data,
const std::experimental::filesystem::path &durability_dir,
const std::string &snapshot_filename);
void RecoverIndexes(database::GraphDb *db,
const std::vector<IndexRecoveryData> &indexes);


@ -94,10 +94,11 @@ void RemoveOldSnapshots(const fs::path &snapshot_dir, uint16_t keep) {
} // namespace
bool MakeSnapshot(database::GraphDb &db, database::GraphDbAccessor &dba,
const fs::path &durability_dir) {
const fs::path &durability_dir,
const std::string &snapshot_filename) {
if (!utils::EnsureDir(durability_dir / kSnapshotDir)) return false;
const auto snapshot_file =
MakeSnapshotPath(durability_dir, dba.transaction_id());
MakeSnapshotPath(durability_dir, snapshot_filename);
if (fs::exists(snapshot_file)) return false;
if (Encode(snapshot_file, db, dba)) {
// Only keep the latest snapshot.


@ -11,8 +11,10 @@ namespace durability {
/// @param dba - db accessor with which we are creating a snapshot (reading
/// data)
/// @param durability_dir - directory where durability data is stored.
/// @param snapshot_filename - filename for the snapshot.
bool MakeSnapshot(database::GraphDb &db, database::GraphDbAccessor &dba,
const std::experimental::filesystem::path &durability_dir);
const std::experimental::filesystem::path &durability_dir,
const std::string &snapshot_filename);
/// Remove all snapshots inside the snapshot durability directory.
void RemoveAllSnapshots(


@ -19,7 +19,7 @@ struct Config {
std::chrono::milliseconds election_timeout_min;
std::chrono::milliseconds election_timeout_max;
std::chrono::milliseconds heartbeat_interval;
uint64_t log_size_snapshot_threshold;
int64_t log_size_snapshot_threshold;
static Config LoadFromFile(const std::string &raft_config_file) {
if (!std::experimental::filesystem::exists(raft_config_file))
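
The switch to a signed type makes -1 usable as a sentinel that disables compaction (the test configs below now use it). A rough sketch of how the threshold is consumed, mirroring RaftServer::SnapshotThread later in this diff:

// -1 turns log compaction off entirely.
if (config_.log_size_snapshot_threshold == -1) return;
// Otherwise, snapshot once the committed-but-uncompacted part of the log
// grows past the threshold.
uint64_t committed_log_size = last_applied_;
if (snapshot_metadata)
  committed_log_size -= snapshot_metadata->last_included_index;
if (config_.log_size_snapshot_threshold < committed_log_size) {
  // ... create a snapshot, persist its metadata, delete compacted entries ...
}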


@ -96,22 +96,19 @@ bool Coordination::Start() {
return true;
}
bool Coordination::AwaitShutdown(
std::function<bool(void)> call_before_shutdown) {
void Coordination::AwaitShutdown(
std::function<void(void)> call_before_shutdown) {
// Wait for a shutdown notification.
while (alive_) {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
// Call the before shutdown callback.
bool ret = call_before_shutdown();
call_before_shutdown();
// Shutdown our RPC server.
server_.Shutdown();
server_.AwaitShutdown();
// Return `true` if the `call_before_shutdown` succeeded.
return ret;
}
void Coordination::Shutdown() { alive_.store(false); }


@ -68,8 +68,8 @@ class Coordination final {
template <typename TResult>
auto ExecuteOnWorker(
int worker_id,
std::function<TResult(int worker_id, communication::rpc::ClientPool &)>
execute) {
const std::function<TResult(int worker_id,
communication::rpc::ClientPool &)> &execute) {
auto client_pool = GetClientPool(worker_id);
return thread_pool_.Run(execute, worker_id, std::ref(*client_pool));
}
@ -79,8 +79,8 @@ class Coordination final {
template <typename TResult>
auto ExecuteOnWorkers(
int skip_worker_id,
std::function<TResult(int worker_id, communication::rpc::ClientPool &)>
execute) {
const std::function<TResult(int worker_id,
communication::rpc::ClientPool &)> &execute) {
std::vector<std::future<TResult>> futures;
for (auto &worker_id : GetWorkerIds()) {
if (worker_id == skip_worker_id) continue;
@ -112,8 +112,7 @@ class Coordination final {
/// Starts the coordination and its servers.
bool Start();
bool AwaitShutdown(std::function<bool(void)> call_before_shutdown =
[]() -> bool { return true; });
void AwaitShutdown(std::function<void(void)> call_before_shutdown);
/// Hints that the coordination should start shutting down the whole cluster.
void Shutdown();


@ -1,18 +1,21 @@
#>cpp
#pragma once
#include <cstring>
#include <vector>
#include "communication/rpc/messages.hpp"
#include "raft/log_entry.hpp"
#include "raft/raft_rpc_messages.capnp.h"
#include "raft/snapshot_metadata.hpp"
cpp<#
(lcp:namespace raft)
(lcp:capnp-namespace "raft")
(lcp:capnp-import 'raft "/raft/log_entry.capnp")
(lcp:capnp-import 'log "/raft/log_entry.capnp")
(lcp:capnp-import 'snap "/raft/snapshot_metadata.capnp")
(lcp:define-rpc request-vote
(:request
@ -31,9 +34,52 @@ cpp<#
(term :uint64_t)
(prev-log-index :uint64_t)
(prev-log-term :uint64_t)
(entries "std::vector<raft::LogEntry>" :capnp-type "List(Raft.LogEntry)")))
(entries "std::vector<raft::LogEntry>" :capnp-type "List(Log.LogEntry)")))
(:response
((success :bool)
(term :uint64_t))))
(lcp:define-rpc install-snapshot
(:request
((leader-id :uint16_t)
(term :uint64_t)
(snapshot-metadata "raft::SnapshotMetadata" :capnp-type "Snap.SnapshotMetadata")
(data "std::unique_ptr<char[]>"
:initarg :move
:capnp-type "Data"
:capnp-init nil
:capnp-save (lambda (builder member capnp-name)
#>cpp
auto data_builder = ${builder}->initData(self.size);
memcpy(data_builder.begin(), ${member}.get(), self.size);
cpp<#)
:slk-save (lambda (member)
#>cpp
slk::Save(self.size, builder);
for (uint32_t i = 0; i < self.size; ++i) {
slk::Save(self.data[i], builder);
}
cpp<#)
:capnp-load (lambda (reader member capnp-name)
(declare (ignore capnp-name))
#>cpp
auto data_reader = ${reader}.getData();
self->size = data_reader.size();
${member}.reset(new char[self->size]);
memcpy(${member}.get(), data_reader.begin(), self->size);
cpp<#)
:slk-load (lambda (member)
#>cpp
slk::Load(&self->size, reader);
self->data.reset(new char[self->size]);
for (uint32_t i = 0; i < self->size; ++i) {
uint8_t curr;
slk::Load(&curr, reader);
self->data[i] = curr;
}
cpp<#))
(size :uint32_t :dont-save t)))
(:response
((term :uint64_t))))
(lcp:pop-namespace) ;; raft
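
On the leader side the new RPC is issued roughly like this (a sketch based on SendSnapshot in raft_server.cpp below; the whole snapshot file travels as the Data payload, with `size` serialized through the custom hooks above):

auto res = client.template Call<InstallSnapshotRpc>(
    server_id_,            // leader-id
    request_term,          // term
    snapshot_metadata,     // last included term/index + snapshot filename
    std::move(snapshot),   // std::unique_ptr<char[]> with the raw file bytes
    snapshot_size);        // size of the buffer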


@ -2,7 +2,7 @@
#include <kj/std/iostream.h>
#include <chrono>
#include <experimental/filesystem>
#include <iostream>
#include <memory>
#include <fmt/format.h>
@ -10,13 +10,13 @@
#include <glog/logging.h>
#include "database/graph_db_accessor.hpp"
#include "durability/single_node_ha/paths.hpp"
#include "durability/single_node_ha/recovery.hpp"
#include "durability/single_node_ha/snapshooter.hpp"
#include "raft/exceptions.hpp"
#include "utils/exceptions.hpp"
#include "utils/on_scope_exit.hpp"
#include "utils/serialization.hpp"
#include "utils/string.hpp"
#include "utils/thread.hpp"
namespace raft {
@ -44,7 +44,7 @@ RaftServer::RaftServer(uint16_t server_id, const std::string &durability_dir,
rlog_(std::make_unique<ReplicationLog>()),
mode_(Mode::FOLLOWER),
server_id_(server_id),
durability_dir_(durability_dir),
durability_dir_(fs::path(durability_dir)),
db_recover_on_startup_(db_recover_on_startup),
commit_index_(0),
last_applied_(0),
@ -54,14 +54,15 @@ void RaftServer::Start() {
if (db_recover_on_startup_) {
auto snapshot_metadata = GetSnapshotMetadata();
if (snapshot_metadata) {
RecoverSnapshot();
RecoverSnapshot(snapshot_metadata->snapshot_filename);
last_applied_ = snapshot_metadata->second;
commit_index_ = snapshot_metadata->second;
last_applied_ = snapshot_metadata->last_included_index;
commit_index_ = snapshot_metadata->last_included_index;
}
} else {
// We need to clear persisted data if we don't want any recovery.
disk_storage_.DeletePrefix("");
disk_storage_.Put(kLogSizeKey, "0");
durability::RemoveAllSnapshots(durability_dir_);
}
@ -131,7 +132,7 @@ void RaftServer::Start() {
// "If a server receives a request with a stale term, it rejects the
// request"
uint64_t current_term = CurrentTerm();
if (req.term < current_term) {
if (exiting_ || req.term < current_term) {
AppendEntriesRes res(false, current_term);
Save(res, res_builder);
return;
@ -157,11 +158,29 @@ void RaftServer::Start() {
// term, then they store the same command.
// - If two entries in different logs have the same index and term,
// then the logs are identical in all preceding entries.
if (LogSize() <= req.prev_log_index ||
GetLogEntry(req.prev_log_index).term != req.prev_log_term) {
auto snapshot_metadata = GetSnapshotMetadata();
if (snapshot_metadata &&
snapshot_metadata->last_included_index == req.prev_log_index) {
if (req.prev_log_term != snapshot_metadata->last_included_term) {
AppendEntriesRes res(false, current_term);
Save(res, res_builder);
return;
}
} else if (snapshot_metadata &&
snapshot_metadata->last_included_index > req.prev_log_index) {
LOG(ERROR) << "Received entries that are already commited and have been "
"compacted";
AppendEntriesRes res(false, current_term);
Save(res, res_builder);
return;
} else {
if (LogSize() <= req.prev_log_index ||
GetLogEntry(req.prev_log_index).term != req.prev_log_term) {
AppendEntriesRes res(false, current_term);
Save(res, res_builder);
return;
}
}
// [Raft paper figure 2]
@ -194,6 +213,92 @@ void RaftServer::Start() {
Save(res, res_builder);
});
coordination_->Register<InstallSnapshotRpc>(
[this](const auto &req_reader, auto *res_builder) {
// Acquire snapshot lock.
std::lock_guard<std::mutex> snapshot_guard(snapshot_lock_);
std::lock_guard<std::mutex> guard(lock_);
InstallSnapshotReq req;
Load(&req, req_reader);
uint64_t current_term = CurrentTerm();
if (exiting_ || req.term < current_term) {
InstallSnapshotRes res(current_term);
Save(res, res_builder);
return;
}
// Check if the current state matches the one in the snapshot
if (req.snapshot_metadata.last_included_index == last_applied_ &&
req.snapshot_metadata.last_included_term == current_term) {
InstallSnapshotRes res(current_term);
Save(res, res_builder);
return;
}
VLOG(40) << "[InstallSnapshotRpc] Starting.";
if (req.term > current_term) {
VLOG(40) << "[InstallSnapshotRpc] Updating term.";
UpdateTerm(req.term);
if (mode_ != Mode::FOLLOWER) Transition(Mode::FOLLOWER);
}
utils::OnScopeExit extend_election_timeout([this] {
SetNextElectionTimePoint();
election_change_.notify_all();
});
// Temporarily freeze the election timer while we handle the snapshot.
next_election_ = TimePoint::max();
VLOG(40) << "[InstallSnapshotRpc] Remove all snapshots.";
// Remove all previous snapshots
durability::RemoveAllSnapshots(durability_dir_);
const auto snapshot_path = durability::MakeSnapshotPath(
durability_dir_, req.snapshot_metadata.snapshot_filename);
// Save snapshot file
{
VLOG(40) << "[InstallSnapshotRpc] Saving received snapshot.";
std::ofstream output_stream;
output_stream.open(snapshot_path, std::ios::out | std::ios::binary);
output_stream.write(req.data.get(), req.size);
output_stream.flush();
output_stream.close();
}
// Discard all the logs. We keep the one at index 0.
VLOG(40) << "[InstallSnapshotRpc] Discarding logs.";
uint64_t log_size = LogSize();
for (uint64_t i = 1; i < log_size; ++i)
disk_storage_.Delete(LogEntryKey(i));
// Reset the database.
VLOG(40) << "[InstallSnapshotRpc] Reset database.";
db_->Reset();
// Apply the snapshot.
VLOG(40) << "[InstallSnapshotRpc] Recover from received snapshot.";
RecoverSnapshot(req.snapshot_metadata.snapshot_filename);
VLOG(40) << "[InstallSnapshotRpc] Persist snapshot metadata.";
PersistSnapshotMetadata(req.snapshot_metadata);
// Update the state to match the one from snapshot.
VLOG(40) << "[InstallSnapshotRpc] Update Raft state.";
last_applied_ = req.snapshot_metadata.last_included_index;
commit_index_ = req.snapshot_metadata.last_included_index;
disk_storage_.Put(
kLogSizeKey,
std::to_string(req.snapshot_metadata.last_included_index + 1));
InstallSnapshotRes res(CurrentTerm());
Save(res, res_builder);
});
// start threads
SetNextElectionTimePoint();
@ -251,27 +356,41 @@ uint64_t RaftServer::LogSize() {
return std::stoull(opt_value.value());
}
std::experimental::optional<std::pair<uint64_t, uint64_t>>
std::experimental::optional<SnapshotMetadata>
RaftServer::GetSnapshotMetadata() {
auto opt_value = disk_storage_.Get(kSnapshotMetadataKey);
if (opt_value == std::experimental::nullopt) {
return std::experimental::nullopt;
}
auto value = utils::Split(opt_value.value(), " ");
if (value.size() != 2) {
LOG(WARNING) << "Malformed snapshot metdata";
return std::experimental::nullopt;
}
return std::make_pair(std::stoull(value[0]), std::stoull(value[1]));
::capnp::MallocMessageBuilder message;
std::stringstream stream(std::ios_base::in | std::ios_base::out |
std::ios_base::binary);
kj::std::StdInputStream std_stream(stream);
kj::BufferedInputStreamWrapper buffered_stream(std_stream);
stream << *opt_value;
readMessageCopy(buffered_stream, message);
capnp::SnapshotMetadata::Reader reader =
message.getRoot<capnp::SnapshotMetadata>().asReader();
SnapshotMetadata deserialized;
Load(&deserialized, reader);
return std::experimental::make_optional(deserialized);
}
void RaftServer::PersistSnapshotMetadata(uint64_t last_included_term,
uint64_t last_included_index) {
auto value = utils::Join(
{std::to_string(last_included_term), std::to_string(last_included_index)},
" ");
disk_storage_.Put(kSnapshotMetadataKey, value);
void RaftServer::PersistSnapshotMetadata(
const SnapshotMetadata &snapshot_metadata) {
std::stringstream stream(std::ios_base::in | std::ios_base::out |
std::ios_base::binary);
{
::capnp::MallocMessageBuilder message;
capnp::SnapshotMetadata::Builder builder =
message.initRoot<capnp::SnapshotMetadata>();
Save(snapshot_metadata, &builder);
kj::std::StdOutputStream std_stream(stream);
kj::BufferedOutputStreamWrapper buffered_stream(std_stream);
writeMessage(buffered_stream, message);
}
disk_storage_.Put(kSnapshotMetadataKey, stream.str());
}
void RaftServer::AppendToLog(const tx::TransactionId &tx_id,
@ -289,13 +408,9 @@ void RaftServer::AppendToLog(const tx::TransactionId &tx_id,
}
uint64_t log_size = LogSize();
DCHECK(last_applied_ == log_size - 1) << "Everything from the leaders log "
"should be applied into our state "
"machine";
rlog_->set_active(tx_id);
LogEntry new_entry(CurrentTerm(), deltas);
++last_applied_;
disk_storage_.Put(LogEntryKey(log_size), SerializeLogEntry(new_entry));
disk_storage_.Put(kLogSizeKey, std::to_string(log_size + 1));
@ -402,14 +517,17 @@ void RaftServer::Transition(const Mode &new_mode) {
ResetReplicationLog();
// Re-apply raft log.
auto snapshot_metadata = GetSnapshotMetadata();
uint64_t starting_index = 1;
auto snapshot_metadata = GetSnapshotMetadata();
if (snapshot_metadata) {
RecoverSnapshot();
starting_index = snapshot_metadata->second + 1;
RecoverSnapshot(snapshot_metadata->snapshot_filename);
starting_index = snapshot_metadata->last_included_index + 1;
}
for (uint64_t i = starting_index; i <= commit_index_; ++i)
for (uint64_t i = starting_index; i <= commit_index_; ++i) {
delta_applier_->Apply(GetLogEntry(i).deltas);
}
last_applied_ = commit_index_;
}
@ -538,17 +656,35 @@ void RaftServer::AdvanceCommitIndex() {
}
commit_index_ = new_commit_index;
}
void RaftServer::Recover() {
throw utils::NotYetImplemented("RaftServer Recover");
last_applied_ = new_commit_index;
}
void RaftServer::SendEntries(uint16_t peer_id,
std::unique_lock<std::mutex> &lock) {
std::unique_lock<std::mutex> *lock) {
auto snapshot_metadata = GetSnapshotMetadata();
if (snapshot_metadata &&
snapshot_metadata->last_included_index >= next_index_[peer_id]) {
SendSnapshot(peer_id, *snapshot_metadata, lock);
} else {
SendLogEntries(peer_id, snapshot_metadata, lock);
}
}
void RaftServer::SendLogEntries(
uint16_t peer_id,
const std::experimental::optional<SnapshotMetadata> &snapshot_metadata,
std::unique_lock<std::mutex> *lock) {
uint64_t request_term = CurrentTerm();
uint64_t request_prev_log_index = next_index_[peer_id] - 1;
uint64_t request_prev_log_term = GetLogEntry(next_index_[peer_id] - 1).term;
uint64_t request_prev_log_term;
if (snapshot_metadata &&
snapshot_metadata->last_included_index == next_index_[peer_id] - 1) {
request_prev_log_term = snapshot_metadata->last_included_term;
} else {
request_prev_log_term = GetLogEntry(next_index_[peer_id] - 1).term;
}
std::vector<LogEntry> request_entries;
if (next_index_[peer_id] <= LogSize() - 1)
@ -572,9 +708,9 @@ void RaftServer::SendEntries(uint16_t peer_id,
VLOG(40) << "Entries size: " << request_entries.size();
lock.unlock(); // Release lock while waiting for response.
lock->unlock(); // Release lock while waiting for response.
auto reply = peer_future.get();
lock.lock();
lock->lock();
if (unreachable_peer) {
next_heartbeat_[peer_id] = Clock::now() + config_.heartbeat_interval;
@ -616,6 +752,76 @@ void RaftServer::SendEntries(uint16_t peer_id,
state_changed_.notify_all();
}
void RaftServer::SendSnapshot(uint16_t peer_id,
const SnapshotMetadata &snapshot_metadata,
std::unique_lock<std::mutex> *lock) {
uint64_t request_term = CurrentTerm();
uint32_t snapshot_size = 0;
std::unique_ptr<char[]> snapshot;
{
const auto snapshot_path = durability::MakeSnapshotPath(
durability_dir_, snapshot_metadata.snapshot_filename);
std::ifstream input_stream;
input_stream.open(snapshot_path, std::ios::in | std::ios::binary);
input_stream.seekg(0, std::ios::end);
snapshot_size = input_stream.tellg();
snapshot.reset(new char[snapshot_size]);
input_stream.seekg(0, std::ios::beg);
input_stream.read(snapshot.get(), snapshot_size);
input_stream.close();
}
VLOG(40) << "Snapshot size: " << snapshot_size << " bytes.";
bool unreachable_peer = false;
auto peer_future = coordination_->ExecuteOnWorker<InstallSnapshotRes>(
peer_id, [&](int worker_id, auto &client) {
try {
auto res = client.template Call<InstallSnapshotRpc>(
server_id_, request_term, snapshot_metadata, std::move(snapshot),
snapshot_size);
return res;
} catch (...) {
unreachable_peer = true;
return InstallSnapshotRes(request_term);
}
});
lock->unlock();
auto reply = peer_future.get();
lock->lock();
if (unreachable_peer) {
next_heartbeat_[peer_id] = Clock::now() + config_.heartbeat_interval;
return;
}
if (CurrentTerm() != request_term || exiting_) {
return;
}
if (OutOfSync(reply.term)) {
state_changed_.notify_all();
return;
}
if (reply.term != CurrentTerm()) {
VLOG(40) << "Server " << server_id_
<< ": Ignoring stale InstallSnapshotRpc reply from " << peer_id;
return;
}
match_index_[peer_id] = snapshot_metadata.last_included_index;
next_index_[peer_id] = snapshot_metadata.last_included_index + 1;
next_heartbeat_[peer_id] = Clock::now() + config_.heartbeat_interval;
state_changed_.notify_all();
}
void RaftServer::ElectionThreadMain() {
std::unique_lock<std::mutex> lock(lock_);
while (!exiting_) {
@ -713,9 +919,9 @@ void RaftServer::PeerThreadMain(uint16_t peer_id) {
case Mode::LEADER: {
if (now >= next_heartbeat_[peer_id]) {
VLOG(40) << "Server " << server_id_
<< ": Send AppendEntries RPC to server " << peer_id
<< ": Sending Entries RPC to server " << peer_id
<< " (Term: " << CurrentTerm() << ")";
SendEntries(peer_id, lock);
SendEntries(peer_id, &lock);
continue;
}
wait_until = next_heartbeat_[peer_id];
@ -743,43 +949,59 @@ void RaftServer::NoOpIssuerThreadMain() {
void RaftServer::SnapshotThread() {
utils::ThreadSetName(fmt::format("RaftSnapshot"));
while (!exiting_) {
{
std::unique_lock<std::mutex> lock(lock_);
if (config_.log_size_snapshot_threshold == -1) return;
uint64_t uncompacted_log_size = LogSize();
while (true) {
{
// Acquire snapshot lock before we acquire the Raft lock. This should
// avoid the situation where we release the lock to start writing a
// snapshot but the `InstallSnapshotRpc` deletes it and we write wrong
// metadata.
std::lock_guard<std::mutex> snapshot_guard(snapshot_lock_);
std::unique_lock<std::mutex> lock(lock_);
if (exiting_) break;
uint64_t committed_log_size = last_applied_;
auto snapshot_metadata = GetSnapshotMetadata();
if (snapshot_metadata) {
uncompacted_log_size -= snapshot_metadata->second;
committed_log_size -= snapshot_metadata->last_included_index;
}
// Compare the log size to the config
if (config_.log_size_snapshot_threshold < uncompacted_log_size) {
if (config_.log_size_snapshot_threshold < committed_log_size) {
VLOG(40) << "[LogCompaction] Starting log compaction.";
// Create a DB accessor for snapshot creation
std::unique_ptr<database::GraphDbAccessor> dba = db_->Access();
uint64_t current_term = CurrentTerm();
uint64_t last_applied = last_applied_;
uint64_t last_included_term = GetLogEntry(last_applied_).term;
uint64_t last_included_index = last_applied_;
std::string snapshot_filename =
durability::GetSnapshotFilename(dba->transaction_id());
lock.unlock();
bool status =
durability::MakeSnapshot(*db_, *dba, fs::path(durability_dir_));
VLOG(40) << "[LogCompaction] Creating snapshot.";
bool status = durability::MakeSnapshot(*db_, *dba, durability_dir_,
snapshot_filename);
lock.lock();
if (status) {
uint64_t log_compaction_start_index = 1;
if (snapshot_metadata) {
log_compaction_start_index = snapshot_metadata->second + 1;
log_compaction_start_index =
snapshot_metadata->last_included_index + 1;
}
PersistSnapshotMetadata(current_term, last_applied);
VLOG(40) << "[LogCompaction] Persisting snapshot metadata";
PersistSnapshotMetadata(
{last_included_term, last_included_index, snapshot_filename});
// Log compaction.
// TODO (msantl): In order to handle log compaction correctly, we need
// to be able to send snapshots over the wire and implement additional
// logic to handle log entries that were compacted into a snapshot.
// for (int i = log_compaction_start_index; i <= last_applied_; ++i) {
// disk_storage_.Delete(LogEntryKey(i));
// }
VLOG(40) << "[LogCompaction] Compacting log from "
<< log_compaction_start_index << " to "
<< last_included_index;
for (int i = log_compaction_start_index; i <= last_included_index;
++i) {
disk_storage_.Delete(LogEntryKey(i));
}
}
lock.unlock();
@ -817,6 +1039,11 @@ bool RaftServer::HasMajortyVote() {
std::pair<uint64_t, uint64_t> RaftServer::LastEntryData() {
uint64_t log_size = LogSize();
if (log_size == 0) return {0, 0};
auto snapshot_metadata = GetSnapshotMetadata();
if (snapshot_metadata &&
snapshot_metadata->last_included_index == log_size - 1) {
return {log_size, snapshot_metadata->last_included_term};
}
return {log_size, GetLogEntry(log_size - 1).term};
}
@ -940,10 +1167,12 @@ void RaftServer::ResetReplicationLog() {
rlog_ = std::make_unique<ReplicationLog>();
}
void RaftServer::RecoverSnapshot() {
void RaftServer::RecoverSnapshot(const std::string &snapshot_filename) {
durability::RecoveryData recovery_data;
CHECK(durability::RecoverOnlySnapshot(durability_dir_, db_, &recovery_data))
<< "Failed to recover from snapshot";
bool recovery = durability::RecoverSnapshot(
db_, &recovery_data, durability_dir_, snapshot_filename);
CHECK(recovery);
durability::RecoverIndexes(db_, recovery_data.indexes);
}


@ -3,6 +3,7 @@
#pragma once
#include <atomic>
#include <experimental/filesystem>
#include <mutex>
#include <unordered_map>
#include <vector>
@ -15,6 +16,7 @@
#include "raft/raft_interface.hpp"
#include "raft/raft_rpc_messages.hpp"
#include "raft/replication_log.hpp"
#include "raft/snapshot_metadata.hpp"
#include "storage/common/kvstore/kvstore.hpp"
#include "transactions/type.hpp"
#include "utils/scheduler.hpp"
@ -86,12 +88,12 @@ class RaftServer final : public RaftInterface {
uint64_t LogSize();
/// Retrieves persisted snapshot metadata or nullopt if not present.
std::experimental::optional<std::pair<uint64_t, uint64_t>>
GetSnapshotMetadata();
/// Snapshot metadata is a triplet consisting of the last included term, the
/// last included log entry index and the snapshot filename.
std::experimental::optional<SnapshotMetadata> GetSnapshotMetadata();
/// Persists snapshot metadata.
void PersistSnapshotMetadata(uint64_t last_included_term,
uint64_t last_included_index);
void PersistSnapshotMetadata(const SnapshotMetadata &snapshot_metadata);
/// Append to the log a list of batched state deltas that are ready to be
/// replicated.
@ -148,7 +150,8 @@ class RaftServer final : public RaftInterface {
RaftServer *raft_server_{nullptr};
};
mutable std::mutex lock_; ///< Guards all internal state.
mutable std::mutex lock_; ///< Guards all internal state.
mutable std::mutex snapshot_lock_; ///< Guards snapshot creation and removal.
//////////////////////////////////////////////////////////////////////////////
// volatile state on all servers
@ -160,9 +163,10 @@ class RaftServer final : public RaftInterface {
database::GraphDb *db_{nullptr};
std::unique_ptr<ReplicationLog> rlog_{nullptr};
std::atomic<Mode> mode_; ///< Server's current mode.
uint16_t server_id_; ///< ID of the current server.
std::string durability_dir_; ///< Durability directory.
std::atomic<Mode> mode_; ///< Server's current mode.
uint16_t server_id_; ///< ID of the current server.
std::experimental::filesystem::path
durability_dir_; ///< Durability directory.
bool db_recover_on_startup_; ///< Flag indicating if recovery should happen
///< on startup.
uint64_t commit_index_; ///< Index of the highest known committed entry.
@ -192,7 +196,7 @@ class RaftServer final : public RaftInterface {
///< no_op_issuer_thread that a new
///< leader has been elected.
bool exiting_ = false; ///< True on server shutdown.
bool exiting_{false}; ///< True on server shutdown.
//////////////////////////////////////////////////////////////////////////////
// volatile state on followers and candidates
@ -254,17 +258,34 @@ class RaftServer final : public RaftInterface {
/// Tries to advance the commit index on a leader.
void AdvanceCommitIndex();
/// Recovers from persistent storage. This function should be called from
/// the constructor before the server starts with normal operation.
void Recover();
/// Sends Entries to peer. This function should only be called in leader
/// mode.
/// Decides whether to send Log Entries or a Snapshot to the given peer.
///
/// @param peer_id ID of the peer which receives entries.
/// @param lock Lock from the peer thread (released while waiting for
/// response)
void SendEntries(uint16_t peer_id, std::unique_lock<std::mutex> &lock);
void SendEntries(uint16_t peer_id, std::unique_lock<std::mutex> *lock);
/// Sends Log Entries to peer. This function should only be called in leader
/// mode.
///
/// @param peer_id ID of the peer which receives entries.
/// @param snapshot_metadata metadata of the last snapshot, if any.
/// @param lock Lock from the peer thread (released while waiting for
/// response)
void SendLogEntries(
uint16_t peer_id,
const std::experimental::optional<SnapshotMetadata> &snapshot_metadata,
std::unique_lock<std::mutex> *lock);
/// Sends Snapshot to peer. This function should only be called in leader
/// mode.
///
/// @param peer_id ID of the peer which receives entries.
/// @param snapshot_metadata metadata of the snapshot to send.
/// @param lock Lock from the peer thread (released while waiting for
/// response)
void SendSnapshot(uint16_t peer_id, const SnapshotMetadata &snapshot_metadata,
std::unique_lock<std::mutex> *lock);
/// Main function of the `election_thread_`. It is responsible for
/// transition to CANDIDATE mode when election timeout elapses.
@ -365,8 +386,8 @@ class RaftServer final : public RaftInterface {
/// Resets the replication log used to indicate the replication status.
void ResetReplicationLog();
/// Recovers the latest snapshot that exists in the durability directory.
void RecoverSnapshot();
/// Recovers the given snapshot if it exists in the durability directory.
void RecoverSnapshot(const std::string &snapshot_filename);
/// Start a new transaction with a NO-OP StateDelta.
void NoOpCreate();


@ -0,0 +1,27 @@
#>cpp
#pragma once
#include <string>
#include "raft/snapshot_metadata.capnp.h"
#include "utils/typeinfo.hpp"
cpp<#
(lcp:namespace raft)
(lcp:capnp-namespace "raft")
(lcp:define-struct snapshot-metadata ()
((last-included-term :uint64_t)
(last-included-index :uint64_t)
(snapshot-filename "std::string"))
(:public #>cpp
SnapshotMetadata() = default;
SnapshotMetadata(uint64_t _last_included_term, uint64_t _last_included_index,
const std::string &_snapshot_filename)
: last_included_term(_last_included_term),
last_included_index(_last_included_index),
snapshot_filename(_snapshot_filename) {}
cpp<#)
(:serialize (:slk) (:capnp)))
(lcp:pop-namespace) ;; raft
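
Illustrative use of the generated struct (assumed from the call sites in raft_server.cpp above): the snapshot thread records the compaction point together with the file that holds the snapshotted state.

raft::SnapshotMetadata metadata(last_included_term, last_included_index,
                                snapshot_filename);
// Serialized with Cap'n Proto and stored under kSnapshotMetadataKey.
PersistSnapshotMetadata(metadata);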


@ -2,5 +2,5 @@
"election_timeout_min": 350,
"election_timeout_max": 700,
"heartbeat_interval": 100,
"log_size_snapshot_threshold": 100000
"log_size_snapshot_threshold": -1
}


@ -32,7 +32,7 @@ RESULTS="$DIR/.apollo_measurements"
# Benchmark parameters
DURATION=10
## Startup
# Startup
declare -a HA_PIDS
for server_id in 1 2 3


@ -2,5 +2,5 @@
"election_timeout_min": 200,
"election_timeout_max": 500,
"heartbeat_interval": 100,
"log_size_snapshot_threshold": 100000
"log_size_snapshot_threshold": -1
}