Handle durability versions in distributed system

Reviewers: msantl

Reviewed By: msantl

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1597
This commit is contained in:
Ivan Paljak 2018-09-13 14:56:08 +02:00
parent b5cdf6b476
commit 06b3240f9e
9 changed files with 109 additions and 42 deletions

View File

@ -575,10 +575,9 @@ class Master {
Config config_;
std::unique_ptr<Storage> storage_ =
std::make_unique<Storage>(config_.worker_id, config_.properties_on_disk);
durability::WriteAheadLog wal_{config_.worker_id,
config_.durability_directory,
config_.durability_enabled,
config_.synchronous_commit};
durability::WriteAheadLog wal_{
config_.worker_id, config_.durability_directory,
config_.durability_enabled, config_.synchronous_commit};
// Shared implementations for all RecordAccessor in this Db.
DistributedEdgeAccessor edge_accessor_{config_.worker_id, &data_manager_,
&updates_clients_};
@ -649,6 +648,9 @@ Master::Master(Config config)
durability::RecoveryData recovery_data;
// Recover only if necessary.
if (impl_->config_.db_recover_on_startup) {
CHECK(durability::VersionConsistency(impl_->config_.durability_directory))
<< "Contents of durability directory are not compatible with the "
"current version of Memgraph binary!";
recovery_info = durability::RecoverOnlySnapshot(
impl_->config_.durability_directory, this, &recovery_data,
std::experimental::nullopt, config.worker_id);
@ -656,9 +658,10 @@ Master::Master(Config config)
// Post-recovery setup and checking.
impl_->coordination_.SetRecoveredSnapshot(
recovery_info
? std::experimental::make_optional(recovery_info->snapshot_tx_id)
: std::experimental::nullopt);
recovery_info ? std::experimental::make_optional(
std::make_pair(recovery_info->durability_version,
recovery_info->snapshot_tx_id))
: std::experimental::nullopt);
// Wait till workers report back their recoverable wal txs
if (recovery_info) {
@ -691,6 +694,17 @@ Master::Master(Config config)
}
if (impl_->config_.durability_enabled) {
// Move any existing snapshot or WAL files into a backup folder.
if (!impl_->config_.db_recover_on_startup &&
durability::ContainsDurabilityFiles(
impl_->config_.durability_directory)) {
durability::MoveToBackup(impl_->config_.durability_directory);
LOG(WARNING) << "Since Memgraph was not supposed to recover on startup "
"and durability is enabled, your current durability "
"files will likely be overriden. To prevent important "
"data loss, Memgraph has stored those files into a "
".backup directory inside durability directory";
}
impl_->wal_.Init();
snapshot_creator_ = std::make_unique<utils::Scheduler>();
snapshot_creator_->Run(
@ -891,10 +905,9 @@ class Worker {
Config config_;
std::unique_ptr<Storage> storage_ =
std::make_unique<Storage>(config_.worker_id, config_.properties_on_disk);
durability::WriteAheadLog wal_{config_.worker_id,
config_.durability_directory,
config_.durability_enabled,
config_.synchronous_commit};
durability::WriteAheadLog wal_{
config_.worker_id, config_.durability_directory,
config_.durability_enabled, config_.synchronous_commit};
// Shared implementations for all RecordAccessor in this Db.
DistributedEdgeAccessor edge_accessor_{config_.worker_id, &data_manager_,
&updates_clients_};
@ -917,7 +930,8 @@ class Worker {
distributed::WorkerCoordination coordination_{server_,
config_.master_endpoint};
distributed::RpcWorkerClients rpc_worker_clients_{coordination_};
tx::EngineWorker tx_engine_{server_, rpc_worker_clients_.GetClientPool(0), &wal_};
tx::EngineWorker tx_engine_{server_, rpc_worker_clients_.GetClientPool(0),
&wal_};
std::unique_ptr<StorageGcWorker> storage_gc_ =
std::make_unique<StorageGcWorker>(
*storage_, tx_engine_, config_.gc_cycle_sec,
@ -969,7 +983,7 @@ Worker::Worker(Config config)
// Durability recovery.
{
// What we should recover.
// What we should recover: a (durability version, snapshot transaction id) pair.
auto snapshot_to_recover = impl_->cluster_discovery_.snapshot_to_recover();
// What we recover.
@ -978,21 +992,41 @@ Worker::Worker(Config config)
durability::RecoveryData recovery_data;
// Recover only if necessary.
if (snapshot_to_recover) {
// check version consistency.
if (!durability::DistributedVersionConsistency(
snapshot_to_recover->first))
LOG(FATAL) << "Memgraph worker failed to recover due to version "
"inconsistency with the master.";
if (!durability::VersionConsistency(impl_->config_.durability_directory))
LOG(FATAL)
<< "Contents of durability directory are not compatible with the "
"current version of Memgraph binary!";
recovery_info = durability::RecoverOnlySnapshot(
impl_->config_.durability_directory, this, &recovery_data,
snapshot_to_recover, config.worker_id);
snapshot_to_recover->second, config.worker_id);
}
// Post-recovery setup and checking.
if (snapshot_to_recover &&
(!recovery_info ||
snapshot_to_recover != recovery_info->snapshot_tx_id))
snapshot_to_recover->second != recovery_info->snapshot_tx_id))
LOG(FATAL) << "Memgraph worker failed to recover the database state "
"recovered on the master";
impl_->cluster_discovery_.NotifyWorkerRecovered(recovery_info);
}
if (impl_->config_.durability_enabled) {
// Move any existing snapshot or WAL files into a backup folder.
if (!impl_->config_.db_recover_on_startup &&
durability::ContainsDurabilityFiles(
impl_->config_.durability_directory)) {
durability::MoveToBackup(impl_->config_.durability_directory);
LOG(WARNING) << "Since Memgraph was not supposed to recover on startup "
"and durability is enabled, your current durability "
"files will likely be overriden. To prevent important "
"data loss, Memgraph has stored those files into a "
".backup directory inside durability directory";
}
impl_->wal_.Init();
}

View File

@ -3,7 +3,7 @@
#pragma once
#include "database/graph_db.hpp"
#include "durability/recovery.hpp"
#include "durability/version.hpp"
namespace distributed {
class BfsRpcServer;

View File

@ -49,7 +49,7 @@ class ClusterDiscoveryWorker final {
Server &server_;
WorkerCoordination &coordination_;
communication::rpc::ClientPool &client_pool_;
std::experimental::optional<tx::TransactionId> snapshot_to_recover_;
std::experimental::optional<std::pair<int64_t, tx::TransactionId>> snapshot_to_recover_;
};
} // namespace distributed

View File

@ -75,7 +75,8 @@ MasterCoordination::~MasterCoordination() {
}
void MasterCoordination::SetRecoveredSnapshot(
std::experimental::optional<tx::TransactionId> recovered_snapshot_tx) {
std::experimental::optional<std::pair<int64_t, tx::TransactionId>>
recovered_snapshot_tx) {
std::lock_guard<std::mutex> guard(lock_);
recovery_done_ = true;
recovered_snapshot_tx_ = recovered_snapshot_tx;
@ -85,7 +86,7 @@ int MasterCoordination::CountRecoveredWorkers() const {
return recovered_workers_.size();
}
std::experimental::optional<tx::TransactionId>
std::experimental::optional<std::pair<int64_t, tx::TransactionId>>
MasterCoordination::RecoveredSnapshotTx() const {
std::lock_guard<std::mutex> guard(lock_);
CHECK(recovery_done_) << "Recovered snapshot requested before it's available";

View File

@ -41,9 +41,11 @@ class MasterCoordination final : public Coordination {
/// Sets the recovery info. nullopt indicates nothing was recovered.
void SetRecoveredSnapshot(
std::experimental::optional<tx::TransactionId> recovered_snapshot);
std::experimental::optional<std::pair<int64_t, tx::TransactionId>>
recovered_snapshot);
std::experimental::optional<tx::TransactionId> RecoveredSnapshotTx() const;
std::experimental::optional<std::pair<int64_t, tx::TransactionId>>
RecoveredSnapshotTx() const;
int CountRecoveredWorkers() const;
@ -61,7 +63,8 @@ class MasterCoordination final : public Coordination {
std::map<int, std::experimental::optional<durability::RecoveryInfo>>
recovered_workers_;
/// If nullopt nothing was recovered.
std::experimental::optional<tx::TransactionId> recovered_snapshot_tx_;
std::experimental::optional<std::pair<int64_t, tx::TransactionId>>
recovered_snapshot_tx_;
};
} // namespace distributed

View File

@ -26,20 +26,27 @@ cpp<#
(:response
((registration-successful :bool)
(durability-error :bool)
(snapshot-to-recover "std::experimental::optional<tx::TransactionId>"
:capnp-type "Utils.Optional(Utils.BoxUInt64)"
(snapshot-to-recover "std::experimental::optional<std::pair<int64_t, tx::TransactionId>>"
:capnp-type "Utils.Optional(Utils.Pair(Utils.BoxUInt64, Utils.BoxUInt64))"
:capnp-save
(lambda (builder member)
#>cpp
utils::SaveOptional<utils::capnp::BoxUInt64, tx::TransactionId>(
utils::SaveOptional<utils::capnp::Pair<utils::capnp::BoxUInt64, utils::capnp::BoxUInt64>, std::pair<int64_t, tx::TransactionId>>(
${member}, &${builder},
[](auto builder, const auto &v){ builder->setValue(v); });
[](auto builder, const auto &v) {
auto first_builder = builder->initFirst();
auto second_builder = builder->initSecond();
first_builder.setValue(v.first);
second_builder.setValue(v.second);
});
cpp<#)
:capnp-load
(lambda (reader member)
#>cpp
${member} = utils::LoadOptional<utils::capnp::BoxUInt64, tx::TransactionId>(
${reader}, [](auto reader){ return reader.getValue(); });
${member} = utils::LoadOptional<utils::capnp::Pair<utils::capnp::BoxUInt64, utils::capnp::BoxUInt64>, std::pair<int64_t, tx::TransactionId>>(
${reader}, [](auto reader){
return std::make_pair(reader.getFirst().getValue(), reader.getSecond().getValue());
});
cpp<#))
(workers "std::unordered_map<int, io::network::Endpoint>"
:capnp-type "Utils.Map(Utils.BoxInt16, Io.Endpoint)"
@ -48,10 +55,10 @@ cpp<#
#>cpp
utils::SaveMap<utils::capnp::BoxInt16, io::network::capnp::Endpoint>(${member}, &${builder},
[](auto *builder, const auto &entry) {
auto key_builder = builder->initKey();
key_builder.setValue(entry.first);
auto value_builder = builder->initValue();
entry.second.Save(&value_builder);
auto key_builder = builder->initKey();
key_builder.setValue(entry.first);
auto value_builder = builder->initValue();
entry.second.Save(&value_builder);
});
cpp<#)
:capnp-load
@ -59,9 +66,10 @@ cpp<#
#>cpp
utils::LoadMap<utils::capnp::BoxInt16, io::network::capnp::Endpoint>(&${member}, ${reader},
[](const auto &reader) {
io::network::Endpoint value;
value.Load(reader.getValue());
return std::make_pair(reader.getKey().getValue(), value);
io::network::Endpoint value;
value.Load(reader.getValue());
return std::make_pair(
reader.getKey().getValue(), value);
});
cpp<#)))))

View File

@ -6,8 +6,9 @@ using Utils = import "/utils/serialization.capnp";
$Cxx.namespace("durability::capnp");
struct RecoveryInfo {
snapshotTxId @0 :UInt64;
walRecovered @1 :List(UInt64);
durabilityVersion @0 :UInt64;
snapshotTxId @1 :UInt64;
walRecovered @2 :List(UInt64);
}
struct RecoveryData {

View File

@ -68,6 +68,10 @@ bool VersionConsistency(const fs::path &durability_dir) {
return true;
}
bool DistributedVersionConsistency(const int64_t master_version) {
  // A worker is consistent with the master only when the master's version is
  // exactly equal to durability::kVersion.
  const bool versions_match = (master_version == durability::kVersion);
  return versions_match;
}
bool ContainsDurabilityFiles(const fs::path &durability_dir) {
for (const auto &durability_type : {kSnapshotDir, kWalDir}) {
auto recovery_dir = durability_dir / durability_type;
@ -473,9 +477,9 @@ RecoveryInfo RecoverOnlySnapshot(
// the WAL recovery.
if (required_snapshot_tx_id &&
recovery_data->snapshooter_tx_id != *required_snapshot_tx_id)
return {recovery_data->snapshooter_tx_id, {}};
return {durability::kVersion, recovery_data->snapshooter_tx_id, {}};
return {recovery_data->snapshooter_tx_id,
return {durability::kVersion, recovery_data->snapshooter_tx_id,
ReadWalRecoverableTransactions(durability_dir / kWalDir, db,
*recovery_data)};
}

View File

@ -18,25 +18,32 @@ namespace durability {
/// Stores info on what was (or needs to be) recovered from durability.
struct RecoveryInfo {
RecoveryInfo() {}
RecoveryInfo(tx::TransactionId snapshot_tx_id,
RecoveryInfo(const int64_t durability_version,
tx::TransactionId snapshot_tx_id,
const std::vector<tx::TransactionId> &wal_recovered)
: snapshot_tx_id(snapshot_tx_id), wal_recovered(wal_recovered) {}
: durability_version(durability_version),
snapshot_tx_id(snapshot_tx_id),
wal_recovered(wal_recovered) {}
int64_t durability_version;
tx::TransactionId snapshot_tx_id;
std::vector<tx::TransactionId> wal_recovered;
bool operator==(const RecoveryInfo &other) const {
return snapshot_tx_id == other.snapshot_tx_id &&
return durability_version == other.durability_version &&
snapshot_tx_id == other.snapshot_tx_id &&
wal_recovered == other.wal_recovered;
}
bool operator!=(const RecoveryInfo &other) const { return !(*this == other); }
void Save(capnp::RecoveryInfo::Builder *builder) const {
builder->setDurabilityVersion(durability_version);
builder->setSnapshotTxId(snapshot_tx_id);
auto list_builder = builder->initWalRecovered(wal_recovered.size());
utils::SaveVector(wal_recovered, &list_builder);
}
void Load(const capnp::RecoveryInfo::Reader &reader) {
durability_version = reader.getDurabilityVersion();
snapshot_tx_id = reader.getSnapshotTxId();
auto list_reader = reader.getWalRecovered();
utils::LoadVector(&wal_recovered, list_reader);
@ -117,6 +124,15 @@ bool ReadSnapshotSummary(HashedFileReader &buffer, int64_t &vertex_count,
bool VersionConsistency(
const std::experimental::filesystem::path &durability_dir);
/**
 * Checks whether the current memgraph binary (on a worker) is
 * version consistent with the cluster master.
 *
 * The check is an exact equality comparison between the given version and
 * durability::kVersion.
 *
 * @param master_version - Durability version received from the master.
 * @return - True if versions match.
 */
bool DistributedVersionConsistency(const int64_t master_version);
/**
* Checks whether the durability directory contains snapshot
* or write-ahead log file.