Add ReplicationLog to RaftServer

Summary:
* renamed `HasCommitted` to `SafeToCommit`
* implemented (c/p) `ReplicationLog`

Reviewers: ipaljak

Reviewed By: ipaljak

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1776
This commit is contained in:
Matija Santl 2018-12-18 14:31:55 +01:00
parent 5e6cf0724a
commit 8c51d2fa0b
12 changed files with 238 additions and 26 deletions

View File

@ -129,8 +129,8 @@ void GraphDb::Reset() {
// state.
tx_engine_.Reset();
storage_gc_ =
std::make_unique<StorageGc>(*storage_, tx_engine_, config_.gc_cycle_sec);
storage_gc_ = std::make_unique<StorageGc>(
*storage_, tx_engine_, &raft_server_, config_.gc_cycle_sec);
}
} // namespace database

View File

@ -173,8 +173,8 @@ class GraphDb {
&delta_applier_,
[this]() { this->Reset(); }};
tx::Engine tx_engine_{&raft_server_};
std::unique_ptr<StorageGc> storage_gc_ =
std::make_unique<StorageGc>(*storage_, tx_engine_, config_.gc_cycle_sec);
std::unique_ptr<StorageGc> storage_gc_ = std::make_unique<StorageGc>(
*storage_, tx_engine_, &raft_server_, config_.gc_cycle_sec);
storage::ConcurrentIdMapper<storage::Label> label_mapper_{
storage_->PropertiesOnDisk()};
storage::ConcurrentIdMapper<storage::EdgeType> edge_mapper_{

View File

@ -54,4 +54,16 @@ class MissingPersistentDataException : public RaftException {
key) {}
};
/// This exception should be thrown when a `RaftServer` instance attempts to
/// read from replication log from a invalid mode or for a garbage collected
/// transaction.
class InvalidReplicationLogLookup : public RaftException {
public:
using RaftException::RaftException;
InvalidReplicationLogLookup()
: RaftException(
"Replication log lookup for invalid transaction or from invalid "
"mode.") {}
};
} // namespace raft

View File

@ -14,9 +14,9 @@ class RaftInterface {
/// Add StateDelta to the appropriate Raft log entry.
virtual void Emplace(const database::StateDelta &) = 0;
/// Check if the transaction with the given transaction id has been
/// replicated on the majority of the Raft cluster and commited.
virtual bool HasCommitted(const tx::TransactionId &tx_id) = 0;
/// Checks if the transaction with the given transaction id can safely be
/// committed in local storage.
virtual bool SafeToCommit(const tx::TransactionId &tx_id) = 0;
protected:
~RaftInterface() {}

View File

@ -2,6 +2,7 @@
#include <kj/std/iostream.h>
#include <experimental/filesystem>
#include <memory>
#include <gflags/gflags.h>
#include <glog/logging.h>
@ -26,6 +27,7 @@ RaftServer::RaftServer(uint16_t server_id, const std::string &durability_dir,
: config_(config),
coordination_(coordination),
delta_applier_(delta_applier),
rlog_(std::make_unique<ReplicationLog>()),
mode_(Mode::FOLLOWER),
server_id_(server_id),
disk_storage_(fs::path(durability_dir) / kRaftDir),
@ -203,7 +205,9 @@ std::vector<LogEntry> RaftServer::Log() {
return DeserializeLog(opt_value.value());
}
void RaftServer::Replicate(const std::vector<database::StateDelta> &log) {
void RaftServer::AppendToLog(const tx::TransactionId &tx_id,
const std::vector<database::StateDelta> &log) {
rlog_->set_active(tx_id);
throw utils::NotYetImplemented("RaftServer replication");
}
@ -211,11 +215,29 @@ void RaftServer::Emplace(const database::StateDelta &delta) {
log_entry_buffer_.Emplace(delta);
}
bool RaftServer::HasCommitted(const tx::TransactionId &tx_id) {
// When in follower mode return true.
// Raise an exception if in candidate mode (should't happen).
// Check the state and return the correct value if leader.
return true;
bool RaftServer::SafeToCommit(const tx::TransactionId &tx_id) {
std::unique_lock<std::mutex> lock(lock_);
switch (mode_) {
case Mode::FOLLOWER:
// When in follower mode, we will only try to apply a Raft Log when we
// receive a commit index greater or equal from the Log index from the
// leader. At that moment we don't have to check the replication log
// because the leader won't commit the Log locally if it's not replicated
// on the majority of the peers in the cluster. This is why we can short
// circut the check to always return true if in follower mode.
return true;
case Mode::LEADER:
if (rlog_->is_active(tx_id)) return false;
if (rlog_->is_replicated(tx_id)) return true;
// The only possibility left is that our ReplicationLog doesn't contain
// information about that tx. Let's log that on our way out.
break;
case Mode::CANDIDATE:
break;
}
throw InvalidReplicationLogLookup();
}
RaftServer::LogEntryBuffer::LogEntryBuffer(RaftServer *raft_server)
@ -248,7 +270,7 @@ void RaftServer::LogEntryBuffer::Emplace(const database::StateDelta &delta) {
log.emplace_back(std::move(delta));
logs_.erase(it);
raft_server_->Replicate(log);
raft_server_->AppendToLog(tx_id, log);
} else if (delta.type == database::StateDelta::Type::TRANSACTION_ABORT) {
auto it = logs_.find(tx_id);
CHECK(it != logs_.end()) << "Missing StateDeltas for transaction " << tx_id;
@ -263,6 +285,7 @@ void RaftServer::Transition(const Mode &new_mode) {
case Mode::FOLLOWER: {
if (mode_ == Mode::LEADER) {
reset_callback_();
ResetReplicationLog();
}
LOG(INFO) << "Server " << server_id_
<< ": Transition to FOLLOWER (Term: " << CurrentTerm() << ")";
@ -575,4 +598,8 @@ std::vector<LogEntry> RaftServer::DeserializeLog(
return deserialized_log;
}
void RaftServer::GarbageCollectReplicationLog(const tx::TransactionId &tx_id) {
rlog_->garbage_collect_older(tx_id);
}
} // namespace raft

View File

@ -13,6 +13,7 @@
#include "raft/log_entry.hpp"
#include "raft/raft_interface.hpp"
#include "raft/raft_rpc_messages.hpp"
#include "raft/replication_log.hpp"
#include "storage/common/kvstore/kvstore.hpp"
#include "transactions/type.hpp"
#include "utils/scheduler.hpp"
@ -79,16 +80,20 @@ class RaftServer final : public RaftInterface {
/// persistent storage, an empty Log will be created.
std::vector<LogEntry> Log();
/// Start replicating StateDeltas batched together in a Raft log.
void Replicate(const std::vector<database::StateDelta> &log);
/// Append the log to the list of completed logs that are ready to be
/// replicated.
void AppendToLog(const tx::TransactionId &tx_id,
const std::vector<database::StateDelta> &log);
/// Emplace a single StateDelta to the corresponding batch. If the StateDelta
/// marks the transaction end, it will replicate the log accorss the cluster.
void Emplace(const database::StateDelta &delta) override;
/// Check if the transaction with the given transaction id has been
/// replicated on the majority of the Raft cluster and commited.
bool HasCommitted(const tx::TransactionId &tx_id) override;
/// Checks if the transaction with the given transaction id can safely be
/// committed in local storage.
bool SafeToCommit(const tx::TransactionId &tx_id) override;
void GarbageCollectReplicationLog(const tx::TransactionId &tx_id);
private:
/// Buffers incomplete Raft logs.
@ -136,6 +141,7 @@ class RaftServer final : public RaftInterface {
Config config_; ///< Raft config.
Coordination *coordination_{nullptr}; ///< Cluster coordination.
database::StateDeltaApplier *delta_applier_{nullptr};
std::unique_ptr<ReplicationLog> rlog_{nullptr};
Mode mode_; ///< Server's current mode.
uint16_t server_id_; ///< ID of the current server.
@ -283,5 +289,10 @@ class RaftServer final : public RaftInterface {
/// Deserializes Raft log from `std::string`.
std::vector<LogEntry> DeserializeLog(const std::string &serialized_log);
void ResetReplicationLog() {
rlog_ = nullptr;
rlog_ = std::make_unique<ReplicationLog>();
}
};
} // namespace raft

View File

@ -0,0 +1,87 @@
/// @file
#pragma once
#include <atomic>
#include "data_structures/bitset/dynamic_bitset.hpp"
#include "transactions/type.hpp"
namespace raft {
/// Tracks information about replicated and active logs for high availability.
///
/// The main difference between ReplicationLog and CommitLog is that
/// ReplicationLog doesn't throw when looking up garbage collected transaction
/// ids.
class ReplicationLog final {
public:
static constexpr int kBitsetBlockSize = 32768;
ReplicationLog() = default;
ReplicationLog(const ReplicationLog &) = delete;
ReplicationLog(ReplicationLog &&) = delete;
ReplicationLog &operator=(const ReplicationLog &) = delete;
ReplicationLog &operator=(ReplicationLog &&) = delete;
bool is_active(tx::TransactionId id) const {
return fetch_info(id).is_active();
}
void set_active(tx::TransactionId id) { log.set(2 * id); }
bool is_replicated(tx::TransactionId id) const {
return fetch_info(id).is_replicated();
}
void set_replicated(tx::TransactionId id) { log.set(2 * id + 1); }
// Clears the replication log from bits associated with transactions with an
// id lower than `id`.
void garbage_collect_older(tx::TransactionId id) {
log.delete_prefix(2 * id);
valid_prefix = 2 * (id + 1);
}
class Info final {
public:
enum Status {
UNKNOWN = 0, // 00
ACTIVE = 1, // 01
REPLICATED = 2, // 10
};
explicit Info(uint8_t flags) {
if (flags & REPLICATED) {
flags_ = REPLICATED;
} else if (flags & ACTIVE) {
flags_ = ACTIVE;
} else {
flags_ = UNKNOWN;
}
}
bool is_active() const {
if (flags_ & REPLICATED) return false;
return flags_ & ACTIVE;
}
bool is_replicated() const { return flags_ & REPLICATED; }
operator uint8_t() const { return flags_; }
private:
uint8_t flags_{0};
};
Info fetch_info(tx::TransactionId id) const {
if (valid_prefix > 2 * id) return Info{0};
return Info{log.at(2 * id, 2)};
}
private:
DynamicBitset<uint8_t, kBitsetBlockSize> log;
std::atomic<tx::TransactionId> valid_prefix{0};
};
} // namespace raft

View File

@ -4,12 +4,13 @@
#include <queue>
#include "data_structures/concurrent/concurrent_map.hpp"
#include "storage/single_node_ha/mvcc/version_list.hpp"
#include "raft/raft_server.hpp"
#include "stats/metrics.hpp"
#include "storage/single_node_ha/deferred_deleter.hpp"
#include "storage/single_node_ha/edge.hpp"
#include "storage/single_node_ha/garbage_collector.hpp"
#include "storage/single_node_ha/gid.hpp"
#include "storage/single_node_ha/mvcc/version_list.hpp"
#include "storage/single_node_ha/storage.hpp"
#include "storage/single_node_ha/vertex.hpp"
#include "transactions/single_node_ha/engine.hpp"
@ -41,8 +42,10 @@ class StorageGc {
/** Creates a garbage collector for the given storage that uses the given
* tx::Engine. If `pause_sec` is greater then zero, then GC gets triggered
* periodically. */
StorageGc(Storage &storage, tx::Engine &tx_engine, int pause_sec)
StorageGc(Storage &storage, tx::Engine &tx_engine,
raft::RaftServer *raft_server, int pause_sec)
: tx_engine_(tx_engine),
raft_server_(raft_server),
storage_(storage),
vertices_(storage.vertices_),
edges_(storage.edges_) {
@ -75,9 +78,12 @@ class StorageGc {
StorageGc &operator=(const StorageGc &) = delete;
StorageGc &operator=(StorageGc &&) = delete;
void CollectCommitLogGarbage(tx::TransactionId oldest_active) {
void CollectLogGarbage(tx::TransactionId oldest_active) {
auto safe_to_delete = GetClogSafeTransaction(oldest_active);
if (safe_to_delete) tx_engine_.GarbageCollectCommitLog(*safe_to_delete);
if (safe_to_delete) {
tx_engine_.GarbageCollectCommitLog(*safe_to_delete);
raft_server_->GarbageCollectReplicationLog(*safe_to_delete);
}
}
void CollectGarbage() {
@ -121,7 +127,7 @@ class StorageGc {
<< x.Elapsed().count();
}
CollectCommitLogGarbage(snapshot_gc.back());
CollectLogGarbage(snapshot_gc.back());
gc_txid_ranges_.emplace(snapshot_gc.back(), tx_engine_.GlobalLast());
VLOG(21) << "gc snapshot: " << snapshot_gc;
@ -155,6 +161,7 @@ class StorageGc {
}
tx::Engine &tx_engine_;
raft::RaftServer *raft_server_;
utils::Scheduler scheduler_;
private:

View File

@ -81,7 +81,7 @@ void Engine::Commit(const Transaction &t) {
raft_->Emplace(database::StateDelta::TxCommit(t.id_));
// Wait for Raft to receive confirmation from the majority of followers.
while (!raft_->HasCommitted(t.id_)) {
while (!raft_->SafeToCommit(t.id_)) {
std::this_thread::sleep_for(std::chrono::microseconds(100));
}

View File

@ -158,6 +158,9 @@ target_link_libraries(${test_prefix}pod_buffer mg-single-node kvstore_dummy_lib)
add_unit_test(property_value_store.cpp)
target_link_libraries(${test_prefix}property_value_store kvstore_lib mg-single-node)
add_unit_test(replication_log.cpp)
target_link_libraries(${test_prefix}replication_log mg-single-node-ha kvstore_lib glog)
add_unit_test(query_cost_estimator.cpp)
target_link_libraries(${test_prefix}query_cost_estimator mg-single-node kvstore_dummy_lib)

View File

@ -0,0 +1,65 @@
#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include "raft/replication_log.hpp"
using namespace tx;
TEST(ReplicationLog, ActiveReplicated) {
raft::ReplicationLog rlog;
const tx::TransactionId tx_id = 10;
EXPECT_FALSE(rlog.is_replicated(tx_id));
EXPECT_FALSE(rlog.is_active(tx_id));
rlog.set_active(tx_id);
EXPECT_FALSE(rlog.is_replicated(tx_id));
EXPECT_TRUE(rlog.is_active(tx_id));
rlog.set_replicated(tx_id);
EXPECT_TRUE(rlog.is_replicated(tx_id));
EXPECT_FALSE(rlog.is_active(tx_id));
}
TEST(ReplicationLog, GarbageCollect) {
raft::ReplicationLog rlog;
auto set_active = [&rlog](tx::TransactionId tx_id) {
rlog.set_active(tx_id);
EXPECT_TRUE(rlog.is_active(tx_id));
};
auto set_replicated = [&rlog](tx::TransactionId tx_id) {
rlog.set_replicated(tx_id);
EXPECT_TRUE(rlog.is_replicated(tx_id));
EXPECT_FALSE(rlog.is_active(tx_id));
};
const int n = raft::ReplicationLog::kBitsetBlockSize;
for (int i = 1; i < 3 * n; ++i) {
set_active(i);
}
for (int i = 1; i < 2 * n; ++i) {
set_replicated(i);
}
rlog.garbage_collect_older(n);
for (int i = 1; i <= n; ++i) {
EXPECT_FALSE(rlog.is_active(i));
EXPECT_FALSE(rlog.is_replicated(i));
}
for (int i = n + 1; i < 2 * n; ++i) {
EXPECT_FALSE(rlog.is_active(i));
EXPECT_TRUE(rlog.is_replicated(i));
}
for (int i = 2 * n; i < 3 * n; ++i) {
EXPECT_TRUE(rlog.is_active(i));
}
}

View File

@ -16,7 +16,7 @@ class RaftMock final : public raft::RaftInterface {
log_[delta.transaction_id].emplace_back(std::move(delta));
}
bool HasCommitted(const tx::TransactionId &tx_id) override {
bool SafeToCommit(const tx::TransactionId &tx_id) override {
return true;
}