Prepare memgraph for HA

Summary:
Removed WAL and WAL recovery from single node ha binary.
Added `LogEntryBuffer` in `RaftServer`.

Reviewers: ipaljak, teon.banek

Reviewed By: ipaljak, teon.banek

Subscribers: teon.banek, pullbot

Differential Revision: https://phabricator.memgraph.io/D1739
This commit is contained in:
Matija Santl 2018-11-19 16:46:30 +01:00
parent 7d01ba5178
commit b647e3f8b8
19 changed files with 320 additions and 1092 deletions

View File

@ -254,9 +254,7 @@ set(mg_single_node_ha_sources
database/single_node_ha/graph_db_accessor.cpp
durability/single_node_ha/state_delta.cpp
durability/single_node_ha/paths.cpp
durability/single_node_ha/recovery.cpp
durability/single_node_ha/snapshooter.cpp
durability/single_node_ha/wal.cpp
glue/auth.cpp
glue/communication.cpp
raft/coordination.cpp

View File

@ -5,8 +5,6 @@
#include "utils/string.hpp"
// Durability flags.
DEFINE_bool(durability_enabled, false,
"If durability (database persistence) should be enabled");
DEFINE_string(
durability_directory, "durability",
"Path to directory in which to save snapshots and write-ahead log files.");
@ -31,11 +29,6 @@ DEFINE_string(properties_on_disk, "",
"Property names of properties which will be stored on available "
"disk. Property names have to be separated with comma (,).");
// Full durability.
DEFINE_bool(synchronous_commit, false,
"Should a transaction end wait for WAL records to be written to "
"disk before the transaction finishes.");
// RPC flags.
DEFINE_VALIDATED_HIDDEN_int32(
rpc_num_client_workers, std::max(std::thread::hardware_concurrency(), 1U),
@ -60,13 +53,11 @@ DEFINE_VALIDATED_int32(
database::Config::Config()
// Durability flags.
: durability_enabled{FLAGS_durability_enabled},
durability_directory{FLAGS_durability_directory},
: durability_directory{FLAGS_durability_directory},
db_recover_on_startup{FLAGS_db_recover_on_startup},
snapshot_cycle_sec{FLAGS_snapshot_cycle_sec},
snapshot_max_retained{FLAGS_snapshot_max_retained},
snapshot_on_exit{FLAGS_snapshot_on_exit},
synchronous_commit{FLAGS_synchronous_commit},
// Misc flags.
gc_cycle_sec{FLAGS_gc_cycle_sec},
query_execution_time_sec{FLAGS_query_execution_time_sec},

View File

@ -7,7 +7,6 @@
#include "database/single_node_ha/counters.hpp"
#include "database/single_node_ha/graph_db_accessor.hpp"
#include "durability/single_node_ha/paths.hpp"
#include "durability/single_node_ha/recovery.hpp"
#include "durability/single_node_ha/snapshooter.hpp"
#include "storage/single_node_ha/concurrent_id_mapper.hpp"
#include "storage/single_node_ha/storage_gc.hpp"
@ -16,52 +15,11 @@
namespace database {
GraphDb::GraphDb(Config config) : config_(config) {
if (config_.durability_enabled) utils::CheckDir(config_.durability_directory);
GraphDb::GraphDb(Config config) : config_(config) {}
// Durability recovery.
if (config_.db_recover_on_startup) {
CHECK(durability::VersionConsistency(config_.durability_directory))
<< "Contents of durability directory are not compatible with the "
"current version of Memgraph binary!";
// What we recover.
std::experimental::optional<durability::RecoveryInfo> recovery_info;
durability::RecoveryData recovery_data;
recovery_info = durability::RecoverOnlySnapshot(
config_.durability_directory, this, &recovery_data,
std::experimental::nullopt);
// Post-recovery setup and checking.
if (recovery_info) {
recovery_data.wal_tx_to_recover = recovery_info->wal_recovered;
durability::RecoveryTransactions recovery_transactions(this);
durability::RecoverWal(config_.durability_directory, this, &recovery_data,
&recovery_transactions);
durability::RecoverIndexes(this, recovery_data.indexes);
}
}
if (config_.durability_enabled) {
// move any existing snapshots or wal files to a deprecated folder.
if (!config_.db_recover_on_startup &&
durability::ContainsDurabilityFiles(config_.durability_directory)) {
durability::MoveToBackup(config_.durability_directory);
LOG(WARNING) << "Since Memgraph was not supposed to recover on startup "
"and durability is enabled, your current durability "
"files will likely be overriden. To prevent important "
"data loss, Memgraph has stored those files into a "
".backup directory inside durability directory";
}
wal_.Init();
snapshot_creator_ = std::make_unique<utils::Scheduler>();
snapshot_creator_->Run(
"Snapshot", std::chrono::seconds(config_.snapshot_cycle_sec), [this] {
auto dba = this->Access();
this->MakeSnapshot(*dba);
});
}
void GraphDb::Start() {
utils::CheckDir(config_.durability_directory);
CHECK(coordination_.Start()) << "Couldn't start coordination!";
// Start transaction killer.
if (config_.query_execution_time_sec != -1) {
@ -81,19 +39,31 @@ GraphDb::GraphDb(Config config) : config_(config) {
}
}
GraphDb::~GraphDb() {
snapshot_creator_ = nullptr;
GraphDb::~GraphDb() {}
is_accepting_transactions_ = false;
tx_engine_.LocalForEachActiveTransaction(
[](auto &t) { t.set_should_abort(); });
bool GraphDb::AwaitShutdown(std::function<void(void)> call_before_shutdown) {
bool ret =
coordination_.AwaitShutdown([this, &call_before_shutdown]() -> bool {
snapshot_creator_ = nullptr;
is_accepting_transactions_ = false;
tx_engine_.LocalForEachActiveTransaction(
[](auto &t) { t.set_should_abort(); });
if (config_.snapshot_on_exit) {
auto dba = this->Access();
MakeSnapshot(*dba);
}
call_before_shutdown();
if (config_.snapshot_on_exit) {
auto dba = this->Access();
MakeSnapshot(*dba);
}
return true;
});
return ret;
}
void GraphDb::Shutdown() { coordination_.Shutdown(); }
std::unique_ptr<GraphDbAccessor> GraphDb::Access() {
// NOTE: We are doing a heap allocation to allow polymorphism. If this poses
// performance issues, we may want to have a stack allocated GraphDbAccessor
@ -114,7 +84,7 @@ std::unique_ptr<GraphDbAccessor> GraphDb::AccessBlocking(
Storage &GraphDb::storage() { return *storage_; }
durability::WriteAheadLog &GraphDb::wal() { return wal_; }
raft::RaftServer &GraphDb::raft_server() { return raft_server_; }
tx::Engine &GraphDb::tx_engine() { return tx_engine_; }

View File

@ -7,8 +7,6 @@
#include <vector>
#include "database/single_node_ha/counters.hpp"
#include "durability/single_node_ha/recovery.hpp"
#include "durability/single_node_ha/wal.hpp"
#include "io/network/endpoint.hpp"
#include "raft/coordination.hpp"
#include "raft/raft_server.hpp"
@ -43,13 +41,11 @@ struct Config {
Config();
// Durability flags.
bool durability_enabled;
std::string durability_directory;
bool db_recover_on_startup;
int snapshot_cycle_sec;
int snapshot_max_retained;
int snapshot_on_exit;
bool synchronous_commit;
// Misc flags.
int gc_cycle_sec;
@ -101,6 +97,10 @@ class GraphDb {
GraphDb &operator=(const GraphDb &) = delete;
GraphDb &operator=(GraphDb &&) = delete;
void Start();
bool AwaitShutdown(std::function<void(void)> call_before_shutdown);
void Shutdown();
/// Create a new accessor by starting a new transaction.
std::unique_ptr<GraphDbAccessor> Access();
std::unique_ptr<GraphDbAccessor> AccessBlocking(
@ -109,7 +109,7 @@ class GraphDb {
std::unique_ptr<GraphDbAccessor> Access(tx::TransactionId);
Storage &storage();
durability::WriteAheadLog &wal();
raft::RaftServer &raft_server();
tx::Engine &tx_engine();
storage::ConcurrentIdMapper<storage::Label> &label_mapper();
storage::ConcurrentIdMapper<storage::EdgeType> &edge_type_mapper();
@ -121,9 +121,9 @@ class GraphDb {
bool MakeSnapshot(GraphDbAccessor &accessor);
/// Releases the storage object safely and creates a new object.
/// This is needed because of recovery, otherwise we might try to recover into
/// a storage which has already been polluted because of a failed previous
/// recovery
/// This is needed because of recovery, otherwise we might try to recover
/// into a storage which has already been polluted because of a failed
/// previous recovery
void ReinitializeStorage();
/// When this is false, no new transactions should be created.
@ -158,10 +158,6 @@ class GraphDb {
Config config_;
std::unique_ptr<Storage> storage_ =
std::make_unique<Storage>(config_.properties_on_disk);
durability::WriteAheadLog wal_{config_.durability_directory,
config_.durability_enabled,
config_.synchronous_commit};
raft::Coordination coordination_{
config_.rpc_num_server_workers, config_.rpc_num_client_workers,
config_.server_id,
@ -169,7 +165,7 @@ class GraphDb {
raft::RaftServer raft_server_{
config_.server_id, config_.durability_directory,
raft::Config::LoadFromFile(config_.raft_config_file), &coordination_};
tx::Engine tx_engine_{&wal_};
tx::Engine tx_engine_{&raft_server_};
std::unique_ptr<StorageGc> storage_gc_ =
std::make_unique<StorageGc>(*storage_, tx_engine_, config_.gc_cycle_sec);
storage::ConcurrentIdMapper<storage::Label> label_mapper_{

View File

@ -63,7 +63,7 @@ bool GraphDbAccessor::should_abort() const {
return transaction_.should_abort();
}
durability::WriteAheadLog &GraphDbAccessor::wal() { return db_.wal(); }
raft::RaftServer &GraphDbAccessor::raft_server() { return db_.raft_server(); }
VertexAccessor GraphDbAccessor::InsertVertex(
std::experimental::optional<gid::Gid> requested_gid) {
@ -76,7 +76,7 @@ VertexAccessor GraphDbAccessor::InsertVertex(
db_.storage().vertices_.access().insert(gid, vertex_vlist).second;
CHECK(success) << "Attempting to insert a vertex with an existing GID: "
<< gid;
wal().Emplace(
raft_server().Emplace(
database::StateDelta::CreateVertex(transaction_.id_, vertex_vlist->gid_));
auto va = VertexAccessor(vertex_vlist, *this);
return va;
@ -141,10 +141,9 @@ void GraphDbAccessor::BuildIndex(storage::Label label,
void GraphDbAccessor::EnableIndex(const LabelPropertyIndex::Key &key) {
// Commit transaction as we finished applying method on newest visible
// records. Write that transaction's ID to the WAL as the index has been
// built at this point even if this DBA's transaction aborts for some
// reason.
wal().Emplace(database::StateDelta::BuildIndex(
// records. Write that transaction's ID to the RaftServer as the index has been
// built at this point even if this DBA's transaction aborts for some reason.
raft_server().Emplace(database::StateDelta::BuildIndex(
transaction_id(), key.label_, LabelName(key.label_), key.property_,
PropertyName(key.property_), key.unique_));
}
@ -171,7 +170,7 @@ void GraphDbAccessor::DeleteIndex(storage::Label label,
db_.AccessBlocking(std::experimental::make_optional(transaction_.id_));
db_.storage().label_property_index_.DeleteIndex(key);
dba->wal().Emplace(database::StateDelta::DropIndex(
dba->raft_server().Emplace(database::StateDelta::DropIndex(
dba->transaction_id(), key.label_, LabelName(key.label_), key.property_,
PropertyName(key.property_)));
@ -291,7 +290,7 @@ bool GraphDbAccessor::RemoveVertex(VertexAccessor &vertex_accessor,
return false;
auto *vlist_ptr = vertex_accessor.address();
wal().Emplace(database::StateDelta::RemoveVertex(
raft_server().Emplace(database::StateDelta::RemoveVertex(
transaction_.id_, vlist_ptr->gid_, check_empty));
vlist_ptr->remove(vertex_accessor.current_, transaction_);
return true;
@ -338,7 +337,7 @@ EdgeAccessor GraphDbAccessor::InsertEdge(
to.SwitchNew();
to.update().in_.emplace(from.address(), edge_vlist, edge_type);
wal().Emplace(database::StateDelta::CreateEdge(
raft_server().Emplace(database::StateDelta::CreateEdge(
transaction_.id_, edge_vlist->gid_, from.gid(), to.gid(), edge_type,
EdgeTypeName(edge_type)));
@ -363,7 +362,7 @@ void GraphDbAccessor::RemoveEdge(EdgeAccessor &edge, bool remove_out_edge,
if (remove_in_edge) edge.to().RemoveInEdge(edge.address());
edge.address()->remove(edge.current_, transaction_);
wal().Emplace(database::StateDelta::RemoveEdge(transaction_.id_, edge.gid()));
raft_server().Emplace(database::StateDelta::RemoveEdge(transaction_.id_, edge.gid()));
}
storage::Label GraphDbAccessor::Label(const std::string &label_name) {

View File

@ -446,7 +446,7 @@ class GraphDbAccessor {
/// Populates index with vertices containing the key
void PopulateIndex(const LabelPropertyIndex::Key &key);
/// Writes Index (key) creation to wal, marks it as ready for usage
/// Writes Index (key) creation to RaftServer, marks it as ready for usage
void EnableIndex(const LabelPropertyIndex::Key &key);
/**
@ -583,7 +583,7 @@ class GraphDbAccessor {
bool should_abort() const;
const tx::Transaction &transaction() const { return transaction_; }
durability::WriteAheadLog &wal();
raft::RaftServer &raft_server();
auto &db() { return db_; }
const auto &db() const { return db_; }

View File

@ -1,474 +0,0 @@
#include "durability/single_node_ha/recovery.hpp"
#include <experimental/filesystem>
#include <experimental/optional>
#include <limits>
#include <unordered_map>
#include "communication/bolt/v1/decoder/decoder.hpp"
#include "database/single_node_ha/graph_db_accessor.hpp"
#include "durability/hashed_file_reader.hpp"
#include "durability/single_node_ha/paths.hpp"
#include "durability/single_node_ha/version.hpp"
#include "durability/single_node_ha/wal.hpp"
#include "glue/communication.hpp"
#include "storage/single_node_ha/indexes/label_property_index.hpp"
#include "transactions/type.hpp"
#include "utils/algorithm.hpp"
#include "utils/file.hpp"
namespace fs = std::experimental::filesystem;
namespace durability {
using communication::bolt::Value;
bool ReadSnapshotSummary(HashedFileReader &buffer, int64_t &vertex_count,
int64_t &edge_count, uint64_t &hash) {
auto pos = buffer.Tellg();
auto offset = sizeof(vertex_count) + sizeof(edge_count) + sizeof(hash);
buffer.Seek(-offset, std::ios_base::end);
bool r_val = buffer.ReadType(vertex_count, false) &&
buffer.ReadType(edge_count, false) &&
buffer.ReadType(hash, false);
buffer.Seek(pos);
return r_val;
}
bool VersionConsistency(const fs::path &durability_dir) {
for (const auto &durability_type : {kSnapshotDir, kWalDir}) {
auto recovery_dir = durability_dir / durability_type;
if (!fs::exists(recovery_dir) || !fs::is_directory(recovery_dir)) continue;
for (const auto &file : fs::directory_iterator(recovery_dir)) {
HashedFileReader reader;
communication::bolt::Decoder<HashedFileReader> decoder(reader);
// The following checks are ok because we are only trying to detect
// version inconsistencies.
if (!reader.Open(fs::path(file))) continue;
std::array<uint8_t, 4> target_magic_number =
(durability_type == kSnapshotDir) ? durability::kSnapshotMagic
: durability::kWalMagic;
std::array<uint8_t, 4> magic_number;
if (!reader.Read(magic_number.data(), magic_number.size())) continue;
if (magic_number != target_magic_number) continue;
if (reader.EndOfFile()) continue;
Value dv;
if (!decoder.ReadValue(&dv, Value::Type::Int) ||
dv.ValueInt() != durability::kVersion)
return false;
}
}
return true;
}
bool DistributedVersionConsistency(const int64_t master_version) {
return durability::kVersion == master_version;
}
bool ContainsDurabilityFiles(const fs::path &durability_dir) {
for (const auto &durability_type : {kSnapshotDir, kWalDir}) {
auto recovery_dir = durability_dir / durability_type;
if (fs::exists(recovery_dir) && fs::is_directory(recovery_dir) &&
!fs::is_empty(recovery_dir))
return true;
}
return false;
}
void MoveToBackup(const fs::path &durability_dir) {
auto backup_dir = durability_dir / kBackupDir;
utils::CheckDir(backup_dir);
utils::CheckDir(backup_dir / kSnapshotDir);
utils::CheckDir(backup_dir / kWalDir);
for (const auto &durability_type : {kSnapshotDir, kWalDir}) {
auto recovery_dir = durability_dir / durability_type;
if (!fs::exists(recovery_dir) || !fs::is_directory(recovery_dir)) continue;
for (const auto &file : fs::directory_iterator(recovery_dir)) {
auto filename = fs::path(file).filename();
fs::rename(file, backup_dir / durability_type / filename);
}
}
}
namespace {
using communication::bolt::Value;
#define RETURN_IF_NOT(condition) \
if (!(condition)) { \
reader.Close(); \
return false; \
}
bool RecoverSnapshot(const fs::path &snapshot_file, database::GraphDb *db,
RecoveryData *recovery_data) {
HashedFileReader reader;
communication::bolt::Decoder<HashedFileReader> decoder(reader);
RETURN_IF_NOT(reader.Open(snapshot_file));
auto magic_number = durability::kSnapshotMagic;
reader.Read(magic_number.data(), magic_number.size());
RETURN_IF_NOT(magic_number == durability::kSnapshotMagic);
// Read the vertex and edge count, and the hash, from the end of the snapshot.
int64_t vertex_count;
int64_t edge_count;
uint64_t hash;
RETURN_IF_NOT(
durability::ReadSnapshotSummary(reader, vertex_count, edge_count, hash));
Value dv;
RETURN_IF_NOT(decoder.ReadValue(&dv, Value::Type::Int) &&
dv.ValueInt() == durability::kVersion);
RETURN_IF_NOT(decoder.ReadValue(&dv, Value::Type::Int));
recovery_data->snapshooter_tx_id = dv.ValueInt();
// Transaction snapshot of the transaction that created the snapshot.
RETURN_IF_NOT(decoder.ReadValue(&dv, Value::Type::List));
for (const auto &value : dv.ValueList()) {
RETURN_IF_NOT(value.IsInt());
recovery_data->snapshooter_tx_snapshot.emplace_back(value.ValueInt());
}
// A list of label+property indexes.
RETURN_IF_NOT(decoder.ReadValue(&dv, Value::Type::List));
auto index_value = dv.ValueList();
for (auto it = index_value.begin(); it != index_value.end();) {
auto label = *it++;
RETURN_IF_NOT(it != index_value.end());
auto property = *it++;
RETURN_IF_NOT(it != index_value.end());
auto unique = *it++;
RETURN_IF_NOT(label.IsString() && property.IsString() && unique.IsBool());
recovery_data->indexes.emplace_back(
IndexRecoveryData{label.ValueString(), property.ValueString(),
/*create = */ true, unique.ValueBool()});
}
auto dba = db->Access();
std::unordered_map<uint64_t, VertexAccessor> vertices;
for (int64_t i = 0; i < vertex_count; ++i) {
Value vertex_dv;
RETURN_IF_NOT(decoder.ReadValue(&vertex_dv, Value::Type::Vertex));
auto &vertex = vertex_dv.ValueVertex();
auto vertex_accessor = dba->InsertVertex(vertex.id.AsUint());
for (const auto &label : vertex.labels) {
vertex_accessor.add_label(dba->Label(label));
}
for (const auto &property_pair : vertex.properties) {
vertex_accessor.PropsSet(dba->Property(property_pair.first),
glue::ToPropertyValue(property_pair.second));
}
vertices.insert({vertex.id.AsUint(), vertex_accessor});
}
for (int64_t i = 0; i < edge_count; ++i) {
Value edge_dv;
RETURN_IF_NOT(decoder.ReadValue(&edge_dv, Value::Type::Edge));
auto &edge = edge_dv.ValueEdge();
auto it_from = vertices.find(edge.from.AsUint());
auto it_to = vertices.find(edge.to.AsUint());
RETURN_IF_NOT(it_from != vertices.end() && it_to != vertices.end());
auto edge_accessor =
dba->InsertEdge(it_from->second, it_to->second,
dba->EdgeType(edge.type), edge.id.AsUint());
for (const auto &property_pair : edge.properties)
edge_accessor.PropsSet(dba->Property(property_pair.first),
glue::ToPropertyValue(property_pair.second));
}
// Vertex and edge counts are included in the hash. Re-read them to update the
// hash.
reader.ReadType(vertex_count);
reader.ReadType(edge_count);
if (!reader.Close() || reader.hash() != hash) {
dba->Abort();
return false;
}
// Ensure that the next transaction ID in the recovered DB will be greater
// than the latest one we have recovered. Do this to make sure that
// subsequently created snapshots and WAL files will have transactional info
// that does not interfere with that found in previous snapshots and WAL.
tx::TransactionId max_id = recovery_data->snapshooter_tx_id;
auto &snap = recovery_data->snapshooter_tx_snapshot;
if (!snap.empty()) max_id = *std::max_element(snap.begin(), snap.end());
dba->db().tx_engine().EnsureNextIdGreater(max_id);
dba->Commit();
return true;
}
#undef RETURN_IF_NOT
std::vector<fs::path> GetWalFiles(const fs::path &wal_dir) {
// Get paths to all the WAL files and sort them (on date).
std::vector<fs::path> wal_files;
if (!fs::exists(wal_dir)) return {};
for (auto &wal_file : fs::directory_iterator(wal_dir))
wal_files.emplace_back(wal_file);
std::sort(wal_files.begin(), wal_files.end());
return wal_files;
}
bool ApplyOverDeltas(
const std::vector<fs::path> &wal_files, tx::TransactionId first_to_recover,
const std::function<void(const database::StateDelta &)> &f) {
for (auto &wal_file : wal_files) {
auto wal_file_max_tx_id = TransactionIdFromWalFilename(wal_file.filename());
if (!wal_file_max_tx_id || *wal_file_max_tx_id < first_to_recover) continue;
HashedFileReader wal_reader;
if (!wal_reader.Open(wal_file)) return false;
communication::bolt::Decoder<HashedFileReader> decoder(wal_reader);
auto magic_number = durability::kWalMagic;
wal_reader.Read(magic_number.data(), magic_number.size());
if (magic_number != durability::kWalMagic) return false;
Value dv;
if (!decoder.ReadValue(&dv, Value::Type::Int) ||
dv.ValueInt() != durability::kVersion)
return false;
while (true) {
auto delta = database::StateDelta::Decode(wal_reader, decoder);
if (!delta) break;
f(*delta);
}
}
return true;
}
auto FirstWalTxToRecover(const RecoveryData &recovery_data) {
auto &tx_sn = recovery_data.snapshooter_tx_snapshot;
auto first_to_recover = tx_sn.empty() ? recovery_data.snapshooter_tx_id + 1
: *std::min(tx_sn.begin(), tx_sn.end());
return first_to_recover;
}
std::vector<tx::TransactionId> ReadWalRecoverableTransactions(
const fs::path &wal_dir, database::GraphDb *db,
const RecoveryData &recovery_data) {
auto wal_files = GetWalFiles(wal_dir);
std::unordered_set<tx::TransactionId> committed_set;
auto first_to_recover = FirstWalTxToRecover(recovery_data);
ApplyOverDeltas(
wal_files, first_to_recover, [&](const database::StateDelta &delta) {
if (delta.transaction_id >= first_to_recover &&
delta.type == database::StateDelta::Type::TRANSACTION_COMMIT) {
committed_set.insert(delta.transaction_id);
}
});
std::vector<tx::TransactionId> committed_tx_ids(committed_set.size());
for (auto id : committed_set) committed_tx_ids.push_back(id);
return committed_tx_ids;
}
} // anonymous namespace
RecoveryInfo RecoverOnlySnapshot(
const fs::path &durability_dir, database::GraphDb *db,
RecoveryData *recovery_data,
std::experimental::optional<tx::TransactionId> required_snapshot_tx_id) {
// Attempt to recover from snapshot files in reverse order (from newest
// backwards).
const auto snapshot_dir = durability_dir / kSnapshotDir;
std::vector<fs::path> snapshot_files;
if (fs::exists(snapshot_dir) && fs::is_directory(snapshot_dir))
for (auto &file : fs::directory_iterator(snapshot_dir))
snapshot_files.emplace_back(file);
std::sort(snapshot_files.rbegin(), snapshot_files.rend());
for (auto &snapshot_file : snapshot_files) {
if (required_snapshot_tx_id) {
auto snapshot_file_tx_id =
TransactionIdFromSnapshotFilename(snapshot_file);
if (!snapshot_file_tx_id ||
snapshot_file_tx_id.value() != *required_snapshot_tx_id) {
LOG(INFO) << "Skipping snapshot file '" << snapshot_file
<< "' because it does not match the required snapshot tx id: "
<< *required_snapshot_tx_id;
continue;
}
}
LOG(INFO) << "Starting snapshot recovery from: " << snapshot_file;
if (!RecoverSnapshot(snapshot_file, db, recovery_data)) {
db->ReinitializeStorage();
recovery_data->Clear();
LOG(WARNING) << "Snapshot recovery failed, trying older snapshot...";
continue;
} else {
LOG(INFO) << "Snapshot recovery successful.";
break;
}
}
// If snapshot recovery is required, and we failed, don't even deal with
// the WAL recovery.
if (required_snapshot_tx_id &&
recovery_data->snapshooter_tx_id != *required_snapshot_tx_id)
return {durability::kVersion, recovery_data->snapshooter_tx_id, {}};
return {durability::kVersion, recovery_data->snapshooter_tx_id,
ReadWalRecoverableTransactions(durability_dir / kWalDir, db,
*recovery_data)};
}
RecoveryTransactions::RecoveryTransactions(database::GraphDb *db) : db_(db) {}
void RecoveryTransactions::Begin(const tx::TransactionId &tx_id) {
CHECK(accessors_.find(tx_id) == accessors_.end())
<< "Double transaction start";
accessors_.emplace(tx_id, db_->Access());
}
void RecoveryTransactions::Abort(const tx::TransactionId &tx_id) {
GetAccessor(tx_id)->Abort();
accessors_.erase(accessors_.find(tx_id));
}
void RecoveryTransactions::Commit(const tx::TransactionId &tx_id) {
GetAccessor(tx_id)->Commit();
accessors_.erase(accessors_.find(tx_id));
}
void RecoveryTransactions::Apply(const database::StateDelta &delta) {
delta.Apply(*GetAccessor(delta.transaction_id));
}
database::GraphDbAccessor *RecoveryTransactions::GetAccessor(
const tx::TransactionId &tx_id) {
auto found = accessors_.find(tx_id);
CHECK(found != accessors_.end())
<< "Accessor does not exist for transaction: " << tx_id;
return found->second.get();
}
// TODO - finer-grained recovery feedback could be useful here.
void RecoverWal(const fs::path &durability_dir, database::GraphDb *db,
RecoveryData *recovery_data,
RecoveryTransactions *transactions) {
auto wal_dir = durability_dir / kWalDir;
auto wal_files = GetWalFiles(wal_dir);
// Track which transaction should be recovered first, and define logic for
// which transactions should be skipped in recovery.
auto &tx_sn = recovery_data->snapshooter_tx_snapshot;
auto first_to_recover = FirstWalTxToRecover(*recovery_data);
// Set of transactions which can be recovered, since not every transaction in
// wal can be recovered because it might not be present on some workers (there
// wasn't enough time for it to flush to disk or similar)
std::unordered_set<tx::TransactionId> common_wal_tx;
for (auto tx_id : recovery_data->wal_tx_to_recover)
common_wal_tx.insert(tx_id);
auto should_skip = [&tx_sn, recovery_data, &common_wal_tx,
first_to_recover](tx::TransactionId tx_id) {
return tx_id < first_to_recover ||
(tx_id < recovery_data->snapshooter_tx_id &&
!utils::Contains(tx_sn, tx_id)) ||
!utils::Contains(common_wal_tx, tx_id);
};
// Ensure that the next transaction ID in the recovered DB will be greater
// than the latest one we have recovered. Do this to make sure that
// subsequently created snapshots and WAL files will have transactional info
// that does not interfere with that found in previous snapshots and WAL.
tx::TransactionId max_observed_tx_id{0};
// Read all the WAL files whose max_tx_id is not smaller than
// min_tx_to_recover.
ApplyOverDeltas(
wal_files, first_to_recover, [&](const database::StateDelta &delta) {
max_observed_tx_id = std::max(max_observed_tx_id, delta.transaction_id);
if (should_skip(delta.transaction_id)) return;
switch (delta.type) {
case database::StateDelta::Type::TRANSACTION_BEGIN:
transactions->Begin(delta.transaction_id);
break;
case database::StateDelta::Type::TRANSACTION_ABORT:
transactions->Abort(delta.transaction_id);
break;
case database::StateDelta::Type::TRANSACTION_COMMIT:
transactions->Commit(delta.transaction_id);
break;
case database::StateDelta::Type::BUILD_INDEX: {
// TODO index building might still be problematic in HA
auto drop_it = std::find_if(
recovery_data->indexes.begin(), recovery_data->indexes.end(),
[label = delta.label_name, property = delta.property_name](
const IndexRecoveryData &other) {
return other.label == label && other.property == property &&
other.create == false;
});
// If we already have a drop index in the recovery data, just erase
// the drop index action. Otherwise add the build index action.
if (drop_it != recovery_data->indexes.end()) {
recovery_data->indexes.erase(drop_it);
} else {
recovery_data->indexes.emplace_back(
IndexRecoveryData{delta.label_name, delta.property_name,
/*create = */ true, delta.unique});
}
break;
}
case database::StateDelta::Type::DROP_INDEX: {
auto build_it = std::find_if(
recovery_data->indexes.begin(), recovery_data->indexes.end(),
[label = delta.label_name, property = delta.property_name](
const IndexRecoveryData &other) {
return other.label == label && other.property == property &&
other.create == true;
});
// If we already have a build index in the recovery data, just erase
// the build index action. Otherwise add the drop index action.
if (build_it != recovery_data->indexes.end()) {
recovery_data->indexes.erase(build_it);
} else {
recovery_data->indexes.emplace_back(
IndexRecoveryData{delta.label_name, delta.property_name,
/*create = */ false});
}
break;
}
default:
transactions->Apply(delta);
}
});
// TODO when implementing proper error handling return one of the following:
// - WAL fully recovered
// - WAL partially recovered
// - WAL recovery error
db->tx_engine().EnsureNextIdGreater(max_observed_tx_id);
}
void RecoverIndexes(database::GraphDb *db,
const std::vector<IndexRecoveryData> &indexes) {
auto dba = db->Access();
for (const auto &index : indexes) {
auto label = dba->Label(index.label);
auto property = dba->Property(index.property);
if (index.create) {
dba->BuildIndex(label, property, index.unique);
} else {
dba->DeleteIndex(label, property);
}
}
dba->Commit();
}
} // namespace durability

View File

@ -1,151 +0,0 @@
#pragma once
#include <experimental/filesystem>
#include <experimental/optional>
#include <unordered_map>
#include <vector>
#include "durability/hashed_file_reader.hpp"
#include "durability/single_node_ha/state_delta.hpp"
#include "transactions/type.hpp"
namespace database {
class GraphDb;
};
namespace durability {
/// Stores info on what was (or needs to be) recovered from durability.
struct RecoveryInfo {
RecoveryInfo() {}
RecoveryInfo(const int64_t durability_version,
tx::TransactionId snapshot_tx_id,
const std::vector<tx::TransactionId> &wal_recovered)
: durability_version(durability_version),
snapshot_tx_id(snapshot_tx_id),
wal_recovered(wal_recovered) {}
int64_t durability_version;
tx::TransactionId snapshot_tx_id;
std::vector<tx::TransactionId> wal_recovered;
bool operator==(const RecoveryInfo &other) const {
return durability_version == other.durability_version &&
snapshot_tx_id == other.snapshot_tx_id &&
wal_recovered == other.wal_recovered;
}
bool operator!=(const RecoveryInfo &other) const { return !(*this == other); }
};
struct IndexRecoveryData {
std::string label;
std::string property;
bool create; // distinguish between creating and dropping index
bool unique; // used only when creating an index
};
// A data structure for exchanging info between main recovery function and
// snapshot and WAL recovery functions.
struct RecoveryData {
tx::TransactionId snapshooter_tx_id{0};
std::vector<tx::TransactionId> wal_tx_to_recover{};
std::vector<tx::TransactionId> snapshooter_tx_snapshot;
// A collection into which the indexes should be added so they
// can be rebuilt at the end of the recovery transaction.
std::vector<IndexRecoveryData> indexes;
void Clear() {
snapshooter_tx_id = 0;
snapshooter_tx_snapshot.clear();
indexes.clear();
}
};
/** Reads snapshot metadata from the end of the file without messing up the
* hash. */
bool ReadSnapshotSummary(HashedFileReader &buffer, int64_t &vertex_count,
int64_t &edge_count, uint64_t &hash);
/**
* Checks version consistency within the durability directory.
*
* @param durability_dir - Path to durability directory.
* @return - True if snapshot and WAL versions are compatible with
* ` current memgraph binary.
*/
bool VersionConsistency(
const std::experimental::filesystem::path &durability_dir);
/**
* Checks whether the current memgraph binary (on a worker) is
* version consistent with the cluster master.
*
* @param master_version - Version of the master.
* @return - True if versions match.
*/
bool DistributedVersionConsistency(const int64_t master_version);
/**
* Checks whether the durability directory contains snapshot
* or write-ahead log file.
*
* @param durability_dir - Path to durability directory.
* @return - True if durability directory contains either a snapshot
* or WAL file.
*/
bool ContainsDurabilityFiles(
const std::experimental::filesystem::path &durabilty_dir);
/**
* Backup snapshots and WAL files to a backup folder.
*
* @param durability_dir - Path to durability directory.
*/
void MoveToBackup(const std::experimental::filesystem::path &durability_dir);
/**
* Recovers database from the latest possible snapshot. If recovering fails,
* false is returned and db_accessor aborts transaction, else true is returned
* and transaction is commited.
*
* @param durability_dir - Path to durability directory.
* @param db - The database to recover into.
* @param required_snapshot_tx_id - Only used on distributed worker. Indicates
* what the master recovered. The same snapshot must be recovered on the
* worker.
* @return - recovery info
*/
RecoveryInfo RecoverOnlySnapshot(
const std::experimental::filesystem::path &durability_dir,
database::GraphDb *db, durability::RecoveryData *recovery_data,
std::experimental::optional<tx::TransactionId> required_snapshot_tx_id);
/** Interface for accessing transactions during WAL recovery. */
class RecoveryTransactions {
public:
explicit RecoveryTransactions(database::GraphDb *db);
void Begin(const tx::TransactionId &tx_id);
void Abort(const tx::TransactionId &tx_id);
void Commit(const tx::TransactionId &tx_id);
void Apply(const database::StateDelta &delta);
private:
database::GraphDbAccessor *GetAccessor(const tx::TransactionId &tx_id);
database::GraphDb *db_;
std::unordered_map<tx::TransactionId,
std::unique_ptr<database::GraphDbAccessor>>
accessors_;
};
void RecoverWal(const std::experimental::filesystem::path &durability_dir,
database::GraphDb *db, RecoveryData *recovery_data,
RecoveryTransactions *transactions);
void RecoverIndexes(database::GraphDb *db,
const std::vector<IndexRecoveryData> &indexes);
} // namespace durability

View File

@ -1,159 +0,0 @@
#include "durability/single_node_ha/wal.hpp"
#include "durability/single_node_ha/paths.hpp"
#include "durability/single_node_ha/version.hpp"
#include "utils/file.hpp"
#include "utils/flag_validation.hpp"
DEFINE_HIDDEN_int32(
wal_flush_interval_millis, 2,
"Interval between two write-ahead log flushes, in milliseconds.");
DEFINE_HIDDEN_int32(
wal_rotate_deltas_count, 10000,
"How many write-ahead deltas should be stored in a single WAL file "
"before rotating it.");
DEFINE_VALIDATED_HIDDEN_int32(wal_buffer_size, 4096,
"Write-ahead log buffer size.",
FLAG_IN_RANGE(1, 1 << 30));
namespace durability {
WriteAheadLog::WriteAheadLog(
const std::experimental::filesystem::path &durability_dir,
bool durability_enabled, bool synchronous_commit)
: deltas_{FLAGS_wal_buffer_size},
wal_file_{durability_dir},
durability_enabled_(durability_enabled),
synchronous_commit_(synchronous_commit) {
if (durability_enabled_) {
utils::CheckDir(durability_dir);
}
}
WriteAheadLog::~WriteAheadLog() {
if (durability_enabled_) {
if (!synchronous_commit_) scheduler_.Stop();
wal_file_.Flush(deltas_);
}
}
WriteAheadLog::WalFile::WalFile(
const std::experimental::filesystem::path &durability_dir)
: wal_dir_{durability_dir / kWalDir} {}
WriteAheadLog::WalFile::~WalFile() {
if (!current_wal_file_.empty()) writer_.Close();
}
void WriteAheadLog::WalFile::Init() {
if (!utils::EnsureDir(wal_dir_)) {
LOG(ERROR) << "Can't write to WAL directory: " << wal_dir_;
current_wal_file_ = std::experimental::filesystem::path();
} else {
current_wal_file_ = WalFilenameForTransactionId(wal_dir_);
// TODO: Fix error handling, the encoder_ returns `true` or `false`.
try {
writer_.Open(current_wal_file_);
encoder_.WriteRAW(durability::kWalMagic.data(),
durability::kWalMagic.size());
encoder_.WriteInt(durability::kVersion);
writer_.Flush();
} catch (std::ios_base::failure &) {
LOG(ERROR) << "Failed to open write-ahead log file: "
<< current_wal_file_;
current_wal_file_ = std::experimental::filesystem::path();
}
}
latest_tx_ = 0;
current_wal_file_delta_count_ = 0;
}
void WriteAheadLog::WalFile::Flush(RingBuffer<database::StateDelta> &buffer) {
std::lock_guard<std::mutex> flush_lock(flush_mutex_);
if (current_wal_file_.empty()) {
LOG(ERROR) << "Write-ahead log file uninitialized, discarding data.";
buffer.clear();
return;
}
try {
while (true) {
auto delta = buffer.pop();
if (!delta) break;
latest_tx_ = std::max(latest_tx_, delta->transaction_id);
delta->Encode(writer_, encoder_);
writer_.Flush();
if (++current_wal_file_delta_count_ >= FLAGS_wal_rotate_deltas_count)
RotateFile();
}
writer_.Flush();
} catch (std::ios_base::failure &) {
LOG(ERROR) << "Failed to write to write-ahead log, discarding data.";
buffer.clear();
return;
} catch (std::experimental::filesystem::filesystem_error &) {
LOG(ERROR) << "Failed to rotate write-ahead log.";
buffer.clear();
return;
}
}
void WriteAheadLog::WalFile::RotateFile() {
writer_.Flush();
writer_.Close();
std::experimental::filesystem::rename(
current_wal_file_,
WalFilenameForTransactionId(wal_dir_, latest_tx_));
Init();
}
void WriteAheadLog::Init() {
if (durability_enabled_) {
enabled_ = true;
wal_file_.Init();
if (!synchronous_commit_) {
scheduler_.Run("WAL",
std::chrono::milliseconds(FLAGS_wal_flush_interval_millis),
[this]() { wal_file_.Flush(deltas_); });
}
}
}
void WriteAheadLog::Emplace(const database::StateDelta &delta) {
if (durability_enabled_ && enabled_) {
deltas_.emplace(delta);
if (synchronous_commit_ && IsStateDeltaTransactionEnd(delta)) {
wal_file_.Flush(deltas_);
}
}
}
bool WriteAheadLog::IsStateDeltaTransactionEnd(
const database::StateDelta &delta) {
switch (delta.type) {
case database::StateDelta::Type::TRANSACTION_COMMIT:
case database::StateDelta::Type::TRANSACTION_ABORT:
return true;
case database::StateDelta::Type::TRANSACTION_BEGIN:
case database::StateDelta::Type::CREATE_VERTEX:
case database::StateDelta::Type::CREATE_EDGE:
case database::StateDelta::Type::SET_PROPERTY_VERTEX:
case database::StateDelta::Type::SET_PROPERTY_EDGE:
case database::StateDelta::Type::ADD_LABEL:
case database::StateDelta::Type::REMOVE_LABEL:
case database::StateDelta::Type::REMOVE_VERTEX:
case database::StateDelta::Type::REMOVE_EDGE:
case database::StateDelta::Type::BUILD_INDEX:
case database::StateDelta::Type::DROP_INDEX:
return false;
}
}
void WriteAheadLog::Flush() {
if (enabled_) {
wal_file_.Flush(deltas_);
}
}
} // namespace durability

View File

@ -1,98 +0,0 @@
#pragma once
#include <chrono>
#include <cstdint>
#include <experimental/filesystem>
#include <gflags/gflags.h>
#include <glog/logging.h>
#include "communication/bolt/v1/encoder/base_encoder.hpp"
#include "data_structures/ring_buffer.hpp"
#include "durability/single_node_ha/state_delta.hpp"
#include "storage/common/types/property_value.hpp"
#include "storage/common/types/types.hpp"
#include "storage/single_node_ha/gid.hpp"
#include "transactions/type.hpp"
#include "utils/scheduler.hpp"
namespace durability {
/// A database StateDelta log for durability. Buffers and periodically
/// serializes small-granulation database deltas (StateDelta).
///
/// The order is not deterministic in a multithreaded scenario (multiple DB
/// transactions). This is fine, the recovery process should be immune to this
/// indeterminism.
class WriteAheadLog {
public:
WriteAheadLog(const std::experimental::filesystem::path &durability_dir,
bool durability_enabled, bool synchronous_commit);
~WriteAheadLog();
/// Initializes the WAL. Called at the end of GraphDb construction, after
/// (optional) recovery. Also responsible for initializing the wal_file.
void Init();
/// Emplaces the given DeltaState onto the buffer, if the WAL is enabled.
/// If the WAL is configured to work in synchronous commit mode, emplace will
/// flush the buffers if a delta represents a transaction end.
void Emplace(const database::StateDelta &delta);
/// Flushes every delta currently in the ring buffer.
/// This method should only be called from tests.
void Flush();
private:
/// Groups the logic of WAL file handling (flushing, naming, rotating)
class WalFile {
public:
explicit WalFile(const std::experimental::filesystem::path &durability_dir);
~WalFile();
/// Initializes the WAL file. Must be called before first flush. Can be
/// called after Flush() to re-initialize stuff.
void Init();
/// Flushes all the deltas in the buffer to the WAL file. If necessary
/// rotates the file.
void Flush(RingBuffer<database::StateDelta> &buffer);
private:
/// Mutex used for flushing wal data
std::mutex flush_mutex_;
const std::experimental::filesystem::path wal_dir_;
HashedFileWriter writer_;
communication::bolt::BaseEncoder<HashedFileWriter> encoder_{writer_};
/// The file to which the WAL flushes data. The path is fixed, the file gets
/// moved when the WAL gets rotated.
std::experimental::filesystem::path current_wal_file_;
/// Number of deltas in the current wal file.
int current_wal_file_delta_count_{0};
/// The latest transaction whose delta is recorded in the current WAL file.
/// Zero indicates that no deltas have so far been written to the current
/// WAL file.
tx::TransactionId latest_tx_{0};
void RotateFile();
};
RingBuffer<database::StateDelta> deltas_;
utils::Scheduler scheduler_;
WalFile wal_file_;
/// Used for disabling the durability feature of the DB.
bool durability_enabled_{false};
/// Used for disabling the WAL during DB recovery.
bool enabled_{false};
/// Should every WAL write be synced with the underlying storage.
bool synchronous_commit_{false};
/// Checks whether the given state delta represents a transaction end,
/// TRANSACTION_COMMIT and TRANSACTION_ABORT.
bool IsStateDeltaTransactionEnd(const database::StateDelta &delta);
};
} // namespace durability

View File

@ -15,7 +15,6 @@
#include "integrations/kafka/streams.hpp"
#include "memgraph_init.hpp"
#include "query/exceptions.hpp"
#include "telemetry/telemetry.hpp"
#include "utils/flag_validation.hpp"
// General purpose flags.
@ -33,17 +32,11 @@ DEFINE_VALIDATED_int32(session_inactivity_timeout, 1800,
DEFINE_string(cert_file, "", "Certificate file to use.");
DEFINE_string(key_file, "", "Key file to use.");
DEFINE_bool(telemetry_enabled, false,
"Set to true to enable telemetry. We collect information about the "
"running system (CPU and memory information) and information about "
"the database runtime (vertex and edge counts and resource usage) "
"to allow for easier improvement of the product.");
using ServerT = communication::Server<BoltSession, SessionData>;
using communication::ServerContext;
void SingleNodeMain() {
google::SetUsageMessage("Memgraph single-node database server");
void SingleNodeHAMain() {
google::SetUsageMessage("Memgraph high availability single-node database server");
database::GraphDb db;
query::Interpreter interpreter;
SessionData session_data{&db, &interpreter};
@ -78,32 +71,24 @@ void SingleNodeMain() {
&session_data, &context, FLAGS_session_inactivity_timeout,
service_name, FLAGS_num_workers);
// Setup telemetry
std::experimental::optional<telemetry::Telemetry> telemetry;
if (FLAGS_telemetry_enabled) {
telemetry.emplace(
"https://telemetry.memgraph.com/88b5e7e8-746a-11e8-9f85-538a9e9690cc/",
std::experimental::filesystem::path(FLAGS_durability_directory) /
"telemetry",
std::chrono::minutes(10));
telemetry->AddCollector("db", [&db]() -> nlohmann::json {
auto dba = db.Access();
return {{"vertices", dba->VerticesCount()}, {"edges", dba->EdgesCount()}};
});
}
// Handler for regular termination signals
auto shutdown = [&server] {
// Server needs to be shutdown first and then the database. This prevents a
// race condition when a transaction is accepted during server shutdown.
server.Shutdown();
auto shutdown = [&db] {
db.Shutdown();
};
InitSignalHandlers(shutdown);
// Start the database.
db.Start();
// Start the Bolt server.
CHECK(server.Start()) << "Couldn't start the Bolt server!";
server.AwaitShutdown();
db.AwaitShutdown([&server] {
server.Shutdown();
server.AwaitShutdown();
});
}
int main(int argc, char **argv) {
return WithInit(argc, argv, []() { return "memgraph"; }, SingleNodeMain);
return WithInit(argc, argv, []() { return "memgraph"; }, SingleNodeHAMain);
}

View File

@ -17,9 +17,14 @@ Coordination::Coordination(
uint16_t worker_id,
std::unordered_map<uint16_t, io::network::Endpoint> workers)
: server_(workers[worker_id], server_workers_count),
worker_id_(worker_id),
workers_(workers),
thread_pool_(client_workers_count, "RPC client") {}
Coordination::~Coordination() {
CHECK(!alive_) << "You must call Shutdown and AwaitShutdown on Coordination!";
}
std::unordered_map<uint16_t, io::network::Endpoint> Coordination::LoadFromFile(
const std::string &coordination_config_file) {
if (!fs::exists(coordination_config_file))
@ -74,12 +79,32 @@ communication::rpc::ClientPool *Coordination::GetClientPool(int worker_id) {
.first->second;
}
void Coordination::AddWorker(int worker_id,
const io::network::Endpoint &endpoint) {
std::lock_guard<std::mutex> guard(lock_);
workers_.insert({worker_id, endpoint});
bool Coordination::Start() {
if (!server_.Start()) return false;
AddWorker(worker_id_, server_.endpoint());
return true;
}
bool Coordination::AwaitShutdown(
std::function<bool(void)> call_before_shutdown) {
// Wait for a shutdown notification.
while (alive_) {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
// Call the before shutdown callback.
bool ret = call_before_shutdown();
// Shutdown our RPC server.
server_.Shutdown();
server_.AwaitShutdown();
// Return `true` if the `call_before_shutdown` succeeded.
return ret;
}
void Coordination::Shutdown() { alive_.store(false); }
std::string Coordination::GetWorkerName(const io::network::Endpoint &endpoint) {
std::lock_guard<std::mutex> guard(lock_);
for (const auto &worker : workers_) {
@ -90,4 +115,10 @@ std::string Coordination::GetWorkerName(const io::network::Endpoint &endpoint) {
return fmt::format("unknown worker ({})", endpoint);
}
void Coordination::AddWorker(int worker_id,
const io::network::Endpoint &endpoint) {
std::lock_guard<std::mutex> guard(lock_);
workers_.insert({worker_id, endpoint});
}
} // namespace raft

View File

@ -2,6 +2,7 @@
#pragma once
#include <atomic>
#include <experimental/filesystem>
#include <functional>
#include <mutex>
@ -19,32 +20,35 @@
namespace raft {
/**
* This class is responsible for coordination between workers (nodes) within
* the Raft cluster. Its implementation is quite similar to coordination
* in distributed Memgraph apart from slight modifications which align more
* closely to Raft.
*
* It should be noted that, in the context of communication, all nodes within
* the Raft cluster are considered equivalent and are henceforth known simply
* as workers.
*
* This class is thread safe.
*/
/// This class is responsible for coordination between workers (nodes) within
/// the Raft cluster. Its implementation is quite similar to coordination
/// in distributed Memgraph apart from slight modifications which align more
/// closely to Raft.
///
/// It should be noted that, in the context of communication, all nodes within
/// the Raft cluster are considered equivalent and are henceforth known simply
/// as workers.
///
/// This class is thread safe.
class Coordination final {
public:
/**
* Class constructor
*
* @param server_workers_count Number of workers in RPC Server.
* @param client_workers_count Number of workers in RPC Client.
* @param worker_id ID of Raft worker (node) on this machine.
* @param coordination_config_file file that contains coordination config.
*/
/// Class constructor
///
/// @param server_workers_count Number of workers in RPC Server.
/// @param client_workers_count Number of workers in RPC Client.
/// @param worker_id ID of Raft worker (node) on this machine.
/// @param workers mapping from worker id to endpoint information.
Coordination(uint16_t server_workers_count, uint16_t client_workers_count,
uint16_t worker_id,
std::unordered_map<uint16_t, io::network::Endpoint> workers);
~Coordination();
Coordination(const Coordination &) = delete;
Coordination(Coordination &&) = delete;
Coordination &operator=(const Coordination &) = delete;
Coordination &operator=(Coordination &&) = delete;
/// Gets the endpoint for the given `worker_id`.
io::network::Endpoint GetEndpoint(int worker_id);
@ -86,22 +90,34 @@ class Coordination final {
static std::unordered_map<uint16_t, io::network::Endpoint> LoadFromFile(
const std::string &coordination_config_file);
protected:
/// Adds a worker to the coordination. This function can be called multiple
/// times to replace an existing worker.
void AddWorker(int worker_id, const io::network::Endpoint &endpoint);
/// Starts the coordination and its servers.
bool Start();
bool AwaitShutdown(std::function<bool(void)> call_before_shutdown =
[]() -> bool { return true; });
/// Hints that the coordination should start shutting down the whole cluster.
void Shutdown();
/// Gets a worker name for the given endpoint.
std::string GetWorkerName(const io::network::Endpoint &endpoint);
communication::rpc::Server server_;
private:
std::unordered_map<uint16_t, io::network::Endpoint> workers_;
/// Adds a worker to the coordination. This function can be called multiple
/// times to replace an existing worker.
void AddWorker(int worker_id, const io::network::Endpoint &endpoint);
communication::rpc::Server server_;
uint16_t worker_id_;
mutable std::mutex lock_;
std::unordered_map<uint16_t, io::network::Endpoint> workers_;
std::unordered_map<int, communication::rpc::ClientPool> client_pools_;
utils::ThreadPool thread_pool_;
// Flags used for shutdown.
std::atomic<bool> alive_{true};
};
} // namespace raft

View File

@ -10,7 +10,6 @@
#include "raft/raft_rpc_messages.hpp"
#include "utils/exceptions.hpp"
namespace raft {
namespace fs = std::experimental::filesystem;
@ -19,22 +18,93 @@ const std::string kRaftDir = "raft";
RaftServer::RaftServer(uint16_t server_id, const std::string &durability_dir,
const Config &config, Coordination *coordination)
: config_(config),
server_id_(server_id),
: server_id_(server_id),
config_(config),
coordination_(coordination),
disk_storage_(fs::path(durability_dir) / kRaftDir) {
coordination->Register<RequestVoteRpc>(
coordination_->Register<RequestVoteRpc>(
[this](const auto &req_reader, auto *res_builder) {
throw utils::NotYetImplemented("RaftServer constructor");
});
coordination->Register<AppendEntriesRpc>(
coordination_->Register<AppendEntriesRpc>(
[this](const auto &req_reader, auto *res_builder) {
throw utils::NotYetImplemented("RaftServer constructor");
});
}
void RaftServer::Transition(const Mode &new_mode) {
if (new_mode == Mode::LEADER)
log_entry_buffer_.Enable();
else
log_entry_buffer_.Disable();
throw utils::NotYetImplemented("RaftServer transition");
}
void RaftServer::Replicate(const std::vector<database::StateDelta> &log) {
throw utils::NotYetImplemented("RaftServer replication");
}
void RaftServer::Emplace(const database::StateDelta &delta) {
log_entry_buffer_.Emplace(delta);
}
RaftServer::LogEntryBuffer::LogEntryBuffer(RaftServer *raft_server)
: raft_server_(raft_server) {
CHECK(raft_server_) << "RaftServer can't be nullptr";
}
void RaftServer::LogEntryBuffer::Enable() {
std::lock_guard<std::mutex> guard(lock_);
enabled_ = true;
}
void RaftServer::LogEntryBuffer::Disable() {
std::lock_guard<std::mutex> guard(lock_);
enabled_ = false;
// Clear all existing logs from buffers.
logs_.clear();
}
void RaftServer::LogEntryBuffer::Emplace(const database::StateDelta &delta) {
std::lock_guard<std::mutex> guard(lock_);
if (!enabled_) return;
tx::TransactionId tx_id = delta.transaction_id;
if (IsStateDeltaTransactionEnd(delta)) {
auto it = logs_.find(tx_id);
CHECK(it != logs_.end()) << "Missing StateDeltas for transaction " << tx_id;
std::vector<database::StateDelta> log(std::move(it->second));
log.emplace_back(std::move(delta));
logs_.erase(it);
raft_server_->Replicate(log);
} else {
logs_[tx_id].emplace_back(std::move(delta));
}
}
bool RaftServer::LogEntryBuffer::IsStateDeltaTransactionEnd(
const database::StateDelta &delta) {
switch (delta.type) {
case database::StateDelta::Type::TRANSACTION_COMMIT:
return true;
case database::StateDelta::Type::TRANSACTION_ABORT:
case database::StateDelta::Type::TRANSACTION_BEGIN:
case database::StateDelta::Type::CREATE_VERTEX:
case database::StateDelta::Type::CREATE_EDGE:
case database::StateDelta::Type::SET_PROPERTY_VERTEX:
case database::StateDelta::Type::SET_PROPERTY_EDGE:
case database::StateDelta::Type::ADD_LABEL:
case database::StateDelta::Type::REMOVE_LABEL:
case database::StateDelta::Type::REMOVE_VERTEX:
case database::StateDelta::Type::REMOVE_EDGE:
case database::StateDelta::Type::BUILD_INDEX:
case database::StateDelta::Type::DROP_INDEX:
return false;
}
}
} // namespace raft

View File

@ -2,9 +2,14 @@
#pragma once
#include "raft/config.hpp"
#include <mutex>
#include <unordered_map>
#include <vector>
#include "durability/single_node_ha/state_delta.hpp"
#include "raft/config.hpp"
#include "storage/common/kvstore/kvstore.hpp"
#include "transactions/type.hpp"
namespace raft {
@ -13,12 +18,10 @@ class Coordination;
enum class Mode { FOLLOWER, CANDIDATE, LEADER };
/**
* Class which models the behaviour of a single server within the Raft
* cluster. The class is responsible for storing both volatile and
* persistent internal state of the corresponding state machine as well
* as performing operations that comply with the Raft protocol.
*/
/// Class which models the behaviour of a single server within the Raft
/// cluster. The class is responsible for storing both volatile and
/// persistent internal state of the corresponding state machine as well
/// as performing operations that comply with the Raft protocol.
class RaftServer {
public:
RaftServer() = delete;
@ -35,17 +38,70 @@ class RaftServer {
RaftServer(uint16_t server_id, const std::string &durability_dir,
const Config &config, raft::Coordination *coordination);
void Replicate(const std::vector<database::StateDelta> &log);
void Emplace(const database::StateDelta &delta);
private:
/** volatile state on all servers **/
/// Buffers incomplete Raft logs.
///
/// A Raft log is considered to be complete if it ends with a StateDelta
/// that represents transaction commit;
/// LogEntryBuffer will be used instead of WriteAheadLog. We don't need to
/// persist logs until we receive a majority vote from the Raft cluster, and
/// apply the to our local state machine(storage).
class LogEntryBuffer final {
public:
LogEntryBuffer() = delete;
raft::Mode mode_; ///< Server's current mode.
raft::Config config_; ///< Raft config.
explicit LogEntryBuffer(RaftServer *raft_server);
uint16_t server_id_; ///< ID of the current server.
uint64_t commit_index_; ///< Index of the highest known commited entry.
void Enable();
/// Disable all future insertions in the buffer.
///
/// Note: this will also clear all existing logs from buffers.
void Disable();
/// Insert a new StateDelta in logs.
///
/// If for a state delta, `IsStateDeltaTransactionEnd` returns true, this
/// marks that this log is complete and the replication can start.
void Emplace(const database::StateDelta &delta);
private:
bool enabled_{false};
mutable std::mutex lock_;
std::unordered_map<tx::TransactionId, std::vector<database::StateDelta>>
logs_;
RaftServer *raft_server_{nullptr};
bool IsStateDeltaTransactionEnd(const database::StateDelta &delta);
};
//////////////////////////////////////////////////////////////////////////////
// volatile state on all servers
//////////////////////////////////////////////////////////////////////////////
uint16_t server_id_; ///< ID of the current server.
Config config_; ///< Raft config.
Coordination *coordination_{nullptr}; ///< Cluster coordination.
Mode mode_; ///< Server's current mode.
uint64_t commit_index_; ///< Index of the highest known committed entry.
uint64_t last_applied_; ///< Index of the highest applied entry to SM.
/** volatile state on leaders **/
/// Raft log entry buffer.
///
/// LogEntryBuffer buffers Raft logs until a log is complete and ready for
/// replication. This doesn't have to persist, if something fails before a
/// log is ready for replication it will be discarded anyway.
LogEntryBuffer log_entry_buffer_{this};
//////////////////////////////////////////////////////////////////////////////
// volatile state on leaders
//////////////////////////////////////////////////////////////////////////////
std::vector<uint16_t> next_index_; ///< for each server, index of the next
///< log entry to send to that server.
@ -54,22 +110,20 @@ class RaftServer {
///< highest log entry known to be
///< replicated on server.
/** persistent state on all servers
*
* Persistent data consists of:
* - uint64_t current_term -- latest term server has seen.
* - uint16_t voted_for -- candidate_id that received vote in current
* term (null if none).
* - vector<LogEntry> log -- log entries.
*/
//////////////////////////////////////////////////////////////////////////////
// persistent state on all servers
//
// Persistent data consists of:
// - uint64_t current_term -- latest term server has seen.
// - uint16_t voted_for -- candidate_id that received vote in current
// term (null if none).
// - vector<LogEntry> log -- log entries.
//////////////////////////////////////////////////////////////////////////////
storage::KVStore disk_storage_;
/**
* Makes a transition to a new `raft::Mode`.
*
* @throws InvalidTransitionException when transitioning between uncompatible
* `raft::Mode`s.
*/
void Transition(const raft::Mode &new_mode);
/// Makes a transition to a new `raft::Mode`.
///
/// @throws InvalidTransitionException when transitioning between incompatible
void Transition(const Mode &new_mode);
};
} // namespace raft

View File

@ -27,7 +27,7 @@ void RecordAccessor<Vertex>::PropsSet(storage::Property key,
dba.PropertyName(key), value);
update().properties_.set(key, value);
dba.UpdatePropertyIndex(key, *this, &update());
db_accessor().wal().Emplace(delta);
db_accessor().raft_server().Emplace(delta);
}
template <>
@ -38,7 +38,7 @@ void RecordAccessor<Edge>::PropsSet(storage::Property key,
dba.PropertyName(key), value);
update().properties_.set(key, value);
db_accessor().wal().Emplace(delta);
db_accessor().raft_server().Emplace(delta);
}
template <>
@ -48,7 +48,7 @@ void RecordAccessor<Vertex>::PropsErase(storage::Property key) {
StateDelta::PropsSetVertex(dba.transaction_id(), gid(), key,
dba.PropertyName(key), PropertyValue::Null);
update().properties_.set(key, PropertyValue::Null);
db_accessor().wal().Emplace(delta);
db_accessor().raft_server().Emplace(delta);
}
template <>
@ -58,7 +58,7 @@ void RecordAccessor<Edge>::PropsErase(storage::Property key) {
StateDelta::PropsSetEdge(dba.transaction_id(), gid(), key,
dba.PropertyName(key), PropertyValue::Null);
update().properties_.set(key, PropertyValue::Null);
db_accessor().wal().Emplace(delta);
db_accessor().raft_server().Emplace(delta);
}
template <typename TRecord>

View File

@ -24,7 +24,7 @@ void VertexAccessor::add_label(storage::Label label) {
// not a duplicate label, add it
if (!utils::Contains(vertex.labels_, label)) {
vertex.labels_.emplace_back(label);
dba.wal().Emplace(delta);
dba.raft_server().Emplace(delta);
dba.UpdateLabelIndices(label, *this, &vertex);
}
}
@ -39,7 +39,7 @@ void VertexAccessor::remove_label(storage::Label label) {
auto found = std::find(labels.begin(), labels.end(), delta.label);
std::swap(*found, labels.back());
labels.pop_back();
dba.wal().Emplace(delta);
dba.raft_server().Emplace(delta);
}
}

View File

@ -6,10 +6,13 @@
#include "glog/logging.h"
#include "durability/single_node_ha/state_delta.hpp"
#include "raft/raft_server.hpp"
namespace tx {
Engine::Engine(durability::WriteAheadLog *wal) : wal_(wal) {}
Engine::Engine(raft::RaftServer *raft_server) : raft_server_(raft_server) {
CHECK(raft_server) << "LogBuffer can't be nullptr in HA";
}
Transaction *Engine::Begin() {
VLOG(11) << "[Tx] Starting transaction " << counter_ + 1;
@ -78,9 +81,7 @@ void Engine::Commit(const Transaction &t) {
std::lock_guard<utils::SpinLock> guard(lock_);
clog_.set_committed(t.id_);
active_.remove(t.id_);
if (wal_) {
wal_->Emplace(database::StateDelta::TxCommit(t.id_));
}
raft_server_->Emplace(database::StateDelta::TxCommit(t.id_));
store_.erase(store_.find(t.id_));
if (t.blocking()) {
accepting_transactions_.store(true);
@ -92,9 +93,7 @@ void Engine::Abort(const Transaction &t) {
std::lock_guard<utils::SpinLock> guard(lock_);
clog_.set_aborted(t.id_);
active_.remove(t.id_);
if (wal_) {
wal_->Emplace(database::StateDelta::TxAbort(t.id_));
}
raft_server_->Emplace(database::StateDelta::TxAbort(t.id_));
store_.erase(store_.find(t.id_));
if (t.blocking()) {
accepting_transactions_.store(true);
@ -169,9 +168,7 @@ Transaction *Engine::BeginTransaction(bool blocking) {
Transaction *t = new Transaction(id, active_, *this, blocking);
active_.insert(id);
store_.emplace(id, t);
if (wal_) {
wal_->Emplace(database::StateDelta::TxBegin(id));
}
raft_server_->Emplace(database::StateDelta::TxBegin(id));
return t;
}

View File

@ -6,23 +6,28 @@
#include <experimental/optional>
#include <unordered_map>
#include "durability/single_node_ha/wal.hpp"
#include "transactions/commit_log.hpp"
#include "transactions/transaction.hpp"
#include "utils/thread/sync.hpp"
// Forward declarations.
namespace raft {
class RaftServer;
}
namespace tx {
class TransactionEngineError : public utils::BasicException {
using utils::BasicException::BasicException;
};
/// Single-node deployment transaction engine. Has complete functionality.
/// High availability single node transaction engine.
///
/// Requires RaftServer where it stores StateDeltas containing transaction
/// information needed for raft followers when replicating logs.
class Engine final {
public:
/// @param wal - Optional. If present, the Engine will write tx
/// Begin/Commit/Abort atomically (while under lock).
explicit Engine(durability::WriteAheadLog *wal = nullptr);
explicit Engine(raft::RaftServer *log_buffer);
Engine(const Engine &) = delete;
Engine(Engine &&) = delete;
@ -65,9 +70,7 @@ class Engine final {
std::unordered_map<TransactionId, std::unique_ptr<Transaction>> store_;
Snapshot active_;
mutable utils::SpinLock lock_;
// Optional. If present, the Engine will write tx Begin/Commit/Abort
// atomically (while under lock).
durability::WriteAheadLog *wal_{nullptr};
raft::RaftServer *raft_server_{nullptr};
std::atomic<bool> accepting_transactions_{true};
// Helper method for transaction begin.