Durability distributed wal
Summary:
Recover only snapshot
Return recovery info on worker recovery
Update tests
Start wal recovery
Single node wal split
Keep track of wal possible to recover
Fix comment
Wal tx intersection
Merge branch 'master' into sync_wal_tx

Reviewers: buda, ipaljak, dgleich, vkasljevic, teon.banek

Reviewed By: teon.banek

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1489
This commit is contained in:
parent 5db39ac501
commit 0681040395
@@ -28,8 +28,8 @@ set(memgraph_src_files
     distributed/data_manager.cpp
     distributed/data_rpc_clients.cpp
     distributed/data_rpc_server.cpp
-    distributed/durability_rpc_clients.cpp
-    distributed/durability_rpc_server.cpp
+    distributed/durability_rpc_master.cpp
+    distributed/durability_rpc_worker.cpp
     distributed/index_rpc_server.cpp
     distributed/plan_consumer.cpp
     distributed/plan_dispatcher.cpp
@@ -11,8 +11,8 @@
 #include "distributed/coordination_worker.hpp"
 #include "distributed/data_manager.hpp"
 #include "distributed/data_rpc_server.hpp"
-#include "distributed/durability_rpc_clients.hpp"
-#include "distributed/durability_rpc_server.hpp"
+#include "distributed/durability_rpc_master.hpp"
+#include "distributed/durability_rpc_worker.hpp"
 #include "distributed/index_rpc_server.hpp"
 #include "distributed/plan_dispatcher.hpp"
 #include "distributed/pull_rpc_clients.hpp"
@@ -48,7 +48,7 @@ struct TypemapPack {

 class Master {
  public:
-  explicit Master(const Config &config, DistributedGraphDb *self)
+  explicit Master(const Config &config, database::Master *self)
       : config_(config), self_(self) {}

   Config config_;
@@ -62,7 +62,7 @@ class Master {
   // have a lot of circular pointers among members. It would be a good idea to
   // clean the mess. Also, be careful of virtual calls to `self_` in
   // constructors of members.
-  DistributedGraphDb *self_{nullptr};
+  database::Master *self_{nullptr};
   communication::rpc::Server server_{
       config_.master_endpoint, static_cast<size_t>(config_.rpc_num_workers)};
   tx::MasterEngine tx_engine_{server_, rpc_worker_clients_, &wal_};
@@ -79,8 +79,7 @@ class Master {
       &subcursor_storage_};
   distributed::BfsRpcClients bfs_subcursor_clients_{
       self_, &subcursor_storage_, &rpc_worker_clients_, &data_manager_};
-  distributed::DurabilityRpcClients durability_rpc_clients_{
-      rpc_worker_clients_};
+  distributed::DurabilityRpcMaster durability_rpc_{rpc_worker_clients_};
   distributed::DataRpcServer data_server_{*self_, server_};
   distributed::DataRpcClients data_clients_{rpc_worker_clients_};
   distributed::PlanDispatcher plan_dispatcher_{rpc_worker_clients_};
@@ -112,14 +111,21 @@ Master::Master(Config config)
     // What we recover.
     std::experimental::optional<durability::RecoveryInfo> recovery_info;

+    durability::RecoveryData recovery_data;
     // Recover only if necessary.
     if (impl_->config_.db_recover_on_startup) {
-      recovery_info = durability::Recover(impl_->config_.durability_directory,
-                                          *this, std::experimental::nullopt);
+      recovery_info = durability::RecoverOnlySnapshot(
+          impl_->config_.durability_directory, this, &recovery_data,
+          std::experimental::nullopt);
     }

     // Post-recovery setup and checking.
-    impl_->coordination_.SetRecoveryInfo(recovery_info);
+    impl_->coordination_.SetRecoveredSnapshot(
+        recovery_info
+            ? std::experimental::make_optional(recovery_info->snapshot_tx_id)
+            : std::experimental::nullopt);

+    // Wait till workers report back their recoverable wal txs
     if (recovery_info) {
       CHECK(impl_->config_.recovering_cluster_size > 0)
           << "Invalid cluster recovery size flag. Recovered cluster size "
@@ -129,8 +135,19 @@ Master::Master(Config config)
         LOG(INFO) << "Waiting for workers to finish recovering..";
         std::this_thread::sleep_for(2s);
       }
+
+      // Get the intersection of recoverable transactions from wal on
+      // workers and on master
+      recovery_data.wal_tx_to_recover =
+          impl_->coordination_.CommonWalTransactions(*recovery_info);
+      durability::RecoverWalAndIndexes(impl_->config_.durability_directory,
+                                       this, &recovery_data);
+      auto workers_recovered_wal =
+          impl_->durability_rpc_.RecoverWalAndIndexes(&recovery_data);
+      workers_recovered_wal.get();
     }
   }

   // Start the dynamic graph partitioner inside token sharing server
   if (impl_->config_.dynamic_graph_partitioner_enabled) {
     impl_->token_sharing_server_.StartTokenSharing();
@@ -214,7 +231,7 @@ std::vector<int> Master::GetWorkerIds() const {
 // written here only if workers sucesfully created their own snapshot
 bool Master::MakeSnapshot(GraphDbAccessor &accessor) {
   auto workers_snapshot =
-      impl_->durability_rpc_clients_.MakeSnapshot(accessor.transaction_id());
+      impl_->durability_rpc_.MakeSnapshot(accessor.transaction_id());
   if (!workers_snapshot.get()) return false;
   // This can be further optimized by creating master snapshot at the same
   // time as workers snapshots but this forces us to delete the master
@@ -295,7 +312,7 @@ class Worker {
       config_.durability_directory,
       config_.durability_enabled};

-  explicit Worker(const Config &config, DistributedGraphDb *self)
+  explicit Worker(const Config &config, database::Worker *self)
       : config_(config), self_(self) {
     cluster_discovery_.RegisterWorker(config.worker_id);
   }
@@ -304,7 +321,7 @@ class Worker {
   // have a lot of circular pointers among members. It would be a good idea to
   // clean the mess. Also, be careful of virtual calls to `self_` in
   // constructors of members.
-  DistributedGraphDb *self_{nullptr};
+  database::Worker *self_{nullptr};
   communication::rpc::Server server_{
       config_.worker_endpoint, static_cast<size_t>(config_.rpc_num_workers)};
   distributed::WorkerCoordination coordination_{server_,
@@ -336,7 +353,7 @@ class Worker {
   distributed::WorkerTransactionalCacheCleaner cache_cleaner_{
       tx_engine_, &wal_, server_,
       produce_server_, updates_server_, data_manager_};
-  distributed::DurabilityRpcServer durability_rpc_server_{*self_, server_};
+  distributed::DurabilityRpcWorker durability_rpc_{self_, &server_};
   distributed::ClusterDiscoveryWorker cluster_discovery_{
       server_, coordination_, rpc_worker_clients_.GetClientPool(0)};
   distributed::TokenSharingRpcClients token_sharing_clients_{
@@ -356,23 +373,27 @@ Worker::Worker(Config config)
   // Durability recovery.
   {
     // What we should recover.
-    std::experimental::optional<durability::RecoveryInfo>
-        required_recovery_info(impl_->cluster_discovery_.recovery_info());
+    std::experimental::optional<tx::TransactionId> snapshot_to_recover;
+    snapshot_to_recover = impl_->cluster_discovery_.snapshot_to_recover();

     // What we recover.
     std::experimental::optional<durability::RecoveryInfo> recovery_info;

+    durability::RecoveryData recovery_data;
     // Recover only if necessary.
-    if (required_recovery_info) {
-      recovery_info = durability::Recover(impl_->config_.durability_directory,
-                                          *this, required_recovery_info);
+    if (snapshot_to_recover) {
+      recovery_info = durability::RecoverOnlySnapshot(
+          impl_->config_.durability_directory, this, &recovery_data,
+          snapshot_to_recover);
     }

     // Post-recovery setup and checking.
-    if (required_recovery_info != recovery_info)
+    if (snapshot_to_recover &&
+        (!recovery_info ||
+         snapshot_to_recover != recovery_info->snapshot_tx_id))
       LOG(FATAL) << "Memgraph worker failed to recover the database state "
                     "recovered on the master";
-    impl_->cluster_discovery_.NotifyWorkerRecovered();
+    impl_->cluster_discovery_.NotifyWorkerRecovered(recovery_info);
   }

   if (impl_->config_.durability_enabled) {
@@ -456,6 +477,11 @@ void Worker::ReinitializeStorage() {
       impl_->rpc_worker_clients_.GetClientPool(0), impl_->config_.worker_id);
 }

+void Worker::RecoverWalAndIndexes(durability::RecoveryData *recovery_data) {
+  durability::RecoverWalAndIndexes(impl_->config_.durability_directory, this,
+                                   recovery_data);
+}
+
 io::network::Endpoint Worker::endpoint() const {
   return impl_->server_.endpoint();
 }
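Read together, the master constructor changes implement a small cluster recovery protocol: recover the snapshot only, publish its tx id, wait for the workers, then replay the WAL intersection everywhere. A condensed sketch of that flow (names from the diff; waiting and error handling elided, and `dir` standing in for `impl_->config_.durability_directory`):

```cpp
// Phase 1: snapshot only; WAL replay is deferred.
durability::RecoveryData recovery_data;
auto recovery_info = durability::RecoverOnlySnapshot(
    dir, this, &recovery_data, std::experimental::nullopt);

// Publish the recovered snapshot tx id so that registering workers
// recover exactly the same snapshot.
impl_->coordination_.SetRecoveredSnapshot(
    recovery_info ? std::experimental::make_optional(
                        recovery_info->snapshot_tx_id)
                  : std::experimental::nullopt);

if (recovery_info) {
  // ... block until all recovering_cluster_size - 1 workers have called
  // NotifyWorkerRecovered with their recoverable WAL transactions ...

  // Phase 2: replay exactly the WAL transactions every node has on disk.
  recovery_data.wal_tx_to_recover =
      impl_->coordination_.CommonWalTransactions(*recovery_info);
  durability::RecoverWalAndIndexes(dir, this, &recovery_data);        // master
  impl_->durability_rpc_.RecoverWalAndIndexes(&recovery_data).get();  // workers
}
```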
@@ -1,6 +1,7 @@
 #pragma once

 #include "database/graph_db.hpp"
+#include "durability/recovery.hpp"

 namespace distributed {
 class BfsRpcServer;
@@ -100,6 +101,7 @@ class Worker final : public DistributedGraphDb {
   std::vector<int> GetWorkerIds() const override;
   bool MakeSnapshot(GraphDbAccessor &accessor) override;
   void ReinitializeStorage() override;
+  void RecoverWalAndIndexes(durability::RecoveryData *recovery_data);

   /** Gets this worker's endpoint. */
   io::network::Endpoint endpoint() const;
@@ -54,10 +54,25 @@ SingleNode::SingleNode(Config config)
   if (impl_->config_.durability_enabled)
     utils::CheckDir(impl_->config_.durability_directory);

-  // Recover only if necessary.
-  if (impl_->config_.db_recover_on_startup) {
-    durability::Recover(impl_->config_.durability_directory, *this,
-                        std::experimental::nullopt);
+  // Durability recovery.
+  {
+    // What we recover.
+    std::experimental::optional<durability::RecoveryInfo> recovery_info;
+
+    durability::RecoveryData recovery_data;
+    // Recover only if necessary.
+    if (impl_->config_.db_recover_on_startup) {
+      recovery_info = durability::RecoverOnlySnapshot(
+          impl_->config_.durability_directory, this, &recovery_data,
+          std::experimental::nullopt);
+    }
+
+    // Post-recovery setup and checking.
+    if (recovery_info) {
+      recovery_data.wal_tx_to_recover = recovery_info->wal_recovered;
+      durability::RecoverWalAndIndexes(impl_->config_.durability_directory,
+                                       this, &recovery_data);
+    }
   }

   if (impl_->config_.durability_enabled) {
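The single-node variant above shows why the recovery split is safe even without a cluster: there are no other WALs to disagree with, so the set of replayable transactions is exactly what the local snapshot recovery reported, and the intersection step collapses to a direct assignment:

```cpp
// Single node: the "common" WAL transaction set is just the local WAL
// contents reported by RecoverOnlySnapshot; no intersection is needed.
recovery_data.wal_tx_to_recover = recovery_info->wal_recovered;
```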
@@ -23,13 +23,6 @@ namespace database {
 class GraphDb;
 };

-namespace durability {
-struct RecoveryInfo;
-RecoveryInfo Recover(const std::experimental::filesystem::path &,
-                     database::GraphDb &,
-                     std::experimental::optional<RecoveryInfo>);
-};  // namespace durability
-
 namespace database {

 /** A data structure containing the main data members of a graph database. */
@@ -56,6 +49,7 @@ class Storage {

   gid::Generator &VertexGenerator() { return vertex_generator_; }
   gid::Generator &EdgeGenerator() { return edge_generator_; }
+  LabelPropertyIndex &label_property_index() { return label_property_index_; }

   /// Gets the local address for the given gid. Fails if not present.
   template <typename TRecord>
@@ -103,9 +97,6 @@ class Storage {
   friend class GraphDbAccessor;
   friend class StorageGc;
   friend class distributed::IndexRpcServer;
-  friend durability::RecoveryInfo durability::Recover(
-      const std::experimental::filesystem::path &, database::GraphDb &,
-      std::experimental::optional<durability::RecoveryInfo>);

   int worker_id_;
   gid::Generator vertex_generator_;
@@ -28,14 +28,17 @@ ClusterDiscoveryMaster::ClusterDiscoveryMaster(
         }

         RegisterWorkerRes res(registration_successful,
-                              this->coordination_.RecoveryInfo(),
+                              this->coordination_.RecoveredSnapshotTx(),
                               this->coordination_.GetWorkers());
         res.Save(res_builder);
       });

   server_.Register<NotifyWorkerRecoveredRpc>(
       [this](const auto &req_reader, auto *res_builder) {
-        this->coordination_.WorkerRecovered(req_reader.getMember());
+        NotifyWorkerRecoveredReq req;
+        req.Load(req_reader);
+        this->coordination_.WorkerRecoveredSnapshot(req.worker_id,
+                                                    req.recovery_info);
       });
 }
@@ -20,21 +20,24 @@ void ClusterDiscoveryWorker::RegisterWorker(int worker_id) {
   auto result =
       client_pool_.Call<RegisterWorkerRpc>(worker_id, server_.endpoint());
   CHECK(result) << "RegisterWorkerRpc failed";
-  CHECK(result->registration_successful)
-      << "Unable to assign requested ID (" << worker_id << ") to worker!";
+  CHECK(result->registration_successful) << "Unable to assign requested ID ("
+                                         << worker_id << ") to worker!";

   worker_id_ = worker_id;
   for (auto &kv : result->workers) {
     coordination_.RegisterWorker(kv.first, kv.second);
   }
-  recovery_info_ = result->recovery_info;
+  snapshot_to_recover_ = result->snapshot_to_recover;
 }

-void ClusterDiscoveryWorker::NotifyWorkerRecovered() {
+void ClusterDiscoveryWorker::NotifyWorkerRecovered(
+    const std::experimental::optional<durability::RecoveryInfo>
+        &recovery_info) {
   CHECK(worker_id_ >= 0)
       << "Workers id is not yet assigned, preform registration before "
          "notifying that the recovery finished";
-  auto result = client_pool_.Call<NotifyWorkerRecoveredRpc>(worker_id_);
+  auto result =
+      client_pool_.Call<NotifyWorkerRecoveredRpc>(worker_id_, recovery_info);
   CHECK(result) << "NotifyWorkerRecoveredRpc failed";
 }
@@ -34,17 +34,20 @@ class ClusterDiscoveryWorker final {
    * Notifies the master that the worker finished recovering. Assumes that the
    * worker was already registered with master.
    */
-  void NotifyWorkerRecovered();
+  void NotifyWorkerRecovered(
+      const std::experimental::optional<durability::RecoveryInfo>
+          &recovery_info);

-  /** Returns the recovery info. Valid only after registration. */
-  auto recovery_info() const { return recovery_info_; }
+  /** Returns the snapshot that should be recovered on workers. Valid only
+   * after registration. */
+  auto snapshot_to_recover() const { return snapshot_to_recover_; }

  private:
   int worker_id_{-1};
   Server &server_;
   WorkerCoordination &coordination_;
   communication::rpc::ClientPool &client_pool_;
-  std::experimental::optional<durability::RecoveryInfo> recovery_info_;
+  std::experimental::optional<tx::TransactionId> snapshot_to_recover_;
 };

 }  // namespace distributed
@@ -40,8 +40,11 @@ bool MasterCoordination::RegisterWorker(int desired_worker_id,
   return true;
 }

-void MasterCoordination::WorkerRecovered(int worker_id) {
-  CHECK(recovered_workers_.insert(worker_id).second)
+void MasterCoordination::WorkerRecoveredSnapshot(
+    int worker_id, const std::experimental::optional<durability::RecoveryInfo>
+                       &recovery_info) {
+  CHECK(recovered_workers_.insert(std::make_pair(worker_id, recovery_info))
+            .second)
       << "Worker already notified about finishing recovery";
 }

@@ -71,22 +74,54 @@ MasterCoordination::~MasterCoordination() {
   }
 }

-void MasterCoordination::SetRecoveryInfo(
-    std::experimental::optional<durability::RecoveryInfo> info) {
+void MasterCoordination::SetRecoveredSnapshot(
+    std::experimental::optional<tx::TransactionId> recovered_snapshot_tx) {
   std::lock_guard<std::mutex> guard(lock_);
   recovery_done_ = true;
-  recovery_info_ = info;
+  recovered_snapshot_tx_ = recovered_snapshot_tx;
 }

 int MasterCoordination::CountRecoveredWorkers() const {
   return recovered_workers_.size();
 }

-std::experimental::optional<durability::RecoveryInfo>
-MasterCoordination::RecoveryInfo() const {
+std::experimental::optional<tx::TransactionId>
+MasterCoordination::RecoveredSnapshotTx() const {
   std::lock_guard<std::mutex> guard(lock_);
-  CHECK(recovery_done_) << "RecoveryInfo requested before it's available";
-  return recovery_info_;
+  CHECK(recovery_done_) << "Recovered snapshot requested before it's available";
+  return recovered_snapshot_tx_;
 }

+std::vector<tx::TransactionId> MasterCoordination::CommonWalTransactions(
+    const durability::RecoveryInfo &master_info) const {
+  int cluster_size;
+  std::unordered_map<tx::TransactionId, int> tx_cnt;
+  for (auto tx : master_info.wal_recovered) {
+    tx_cnt[tx]++;
+  }
+
+  {
+    std::lock_guard<std::mutex> guard(lock_);
+    for (auto worker : recovered_workers_) {
+      // If there is no recovery info we can just return an empty vector since
+      // we can't restore any transaction
+      if (!worker.second) return {};
+      for (auto tx : worker.second->wal_recovered) {
+        tx_cnt[tx]++;
+      }
+    }
+    // Add one because of master
+    cluster_size = recovered_workers_.size() + 1;
+  }
+
+  std::vector<tx::TransactionId> tx_intersection;
+  for (auto tx : tx_cnt) {
+    if (tx.second == cluster_size) {
+      tx_intersection.push_back(tx.first);
+    }
+  }
+
+  return tx_intersection;
+}
+
 }  // namespace distributed
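`CommonWalTransactions` computes the intersection by counting: every node contributes each recovered tx id once, and an id survives only if its count equals the cluster size. A self-contained worked example of that counting scheme (container contents invented for illustration):

```cpp
#include <cstdint>
#include <iostream>
#include <unordered_map>
#include <vector>

int main() {
  using TxId = uint64_t;
  // WAL transactions each node managed to recover from its own disk.
  const std::vector<std::vector<TxId>> wal_per_node = {
      {3, 4, 5},  // master
      {3, 4},     // worker 1
      {3, 5},     // worker 2
  };

  // Count occurrences; an id appears at most once per node.
  std::unordered_map<TxId, int> tx_cnt;
  for (const auto &wal : wal_per_node)
    for (TxId tx : wal) tx_cnt[tx]++;

  // Only ids seen on every node are replayable cluster-wide.
  const int cluster_size = wal_per_node.size();
  for (const auto &kv : tx_cnt)
    if (kv.second == cluster_size) std::cout << kv.first << '\n';  // prints 3
}
```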
@@ -31,31 +31,37 @@ class MasterCoordination final : public Coordination {

   /*
    * Worker `worker_id` finished with recovering, adds it to the set of
-   * recovered workers.
+   * recovered workers alongside with its recovery_info.
    */
-  void WorkerRecovered(int worker_id);
+  void WorkerRecoveredSnapshot(
+      int worker_id, const std::experimental::optional<durability::RecoveryInfo>
+                         &recovery_info);

   Endpoint GetEndpoint(int worker_id);

   /// Sets the recovery info. nullopt indicates nothing was recovered.
-  void SetRecoveryInfo(
-      std::experimental::optional<durability::RecoveryInfo> info);
+  void SetRecoveredSnapshot(
+      std::experimental::optional<tx::TransactionId> recovered_snapshot);

-  std::experimental::optional<durability::RecoveryInfo> RecoveryInfo() const;
+  std::experimental::optional<tx::TransactionId> RecoveredSnapshotTx() const;

   int CountRecoveredWorkers() const;

+  std::vector<tx::TransactionId> CommonWalTransactions(
+      const durability::RecoveryInfo &master_info) const;
+
  private:
   // Most master functions aren't thread-safe.
   mutable std::mutex lock_;

-  /// Durabiliry recovery info.
+  /// Durabilility recovery info.
   /// Indicates if the recovery phase is done.
   bool recovery_done_{false};
-  /// Set of workers that finished sucesfully recovering
-  std::set<int> recovered_workers_;
+  /// Set of workers that finished sucesfully recovering snapshot
+  std::map<int, std::experimental::optional<durability::RecoveryInfo>>
+      recovered_workers_;
   /// If nullopt nothing was recovered.
-  std::experimental::optional<durability::RecoveryInfo> recovery_info_;
+  std::experimental::optional<tx::TransactionId> recovered_snapshot_tx_;
 };

 }  // namespace distributed
@@ -24,8 +24,21 @@ cpp<#
     (endpoint "io::network::Endpoint" :capnp-type "Io.Endpoint")))
   (:response
    ((registration-successful :bool)
-    (recovery-info "std::experimental::optional<durability::RecoveryInfo>"
-                   :capnp-type "Utils.Optional(Dur.RecoveryInfo)")
+    (snapshot-to-recover "std::experimental::optional<tx::TransactionId>"
+                         :capnp-type "Utils.Optional(Utils.BoxUInt64)"
+                         :capnp-save
+                         (lambda (builder member)
+                           #>cpp
+                           utils::SaveOptional<utils::capnp::BoxUInt64, tx::TransactionId>(
+                               ${member}, &${builder},
+                               [](auto builder, const auto &v){ builder->setValue(v); });
+                           cpp<#)
+                         :capnp-load
+                         (lambda (reader member)
+                           #>cpp
+                           ${member} = utils::LoadOptional<utils::capnp::BoxUInt64, tx::TransactionId>(
+                               ${reader}, [](auto reader){ return reader.getValue(); });
+                           cpp<#))
     (workers "std::unordered_map<int, io::network::Endpoint>"
              :capnp-type "Utils.Map(Utils.BoxInt16, Io.Endpoint)"
              :capnp-save
@@ -61,8 +74,10 @@ cpp<#
   (:response ()))

 (lcp:define-rpc notify-worker-recovered
-  (:request ((member :int64_t)))
+  (:request
+   ((worker-id :int16_t)
+    (recovery-info "std::experimental::optional<durability::RecoveryInfo>"
+                   :capnp-type "Utils.Optional(Dur.RecoveryInfo)")))
   (:response ()))

 (lcp:pop-namespace) ;; distributed
(deleted: src/distributed/durability_rpc_clients.cpp)
@@ -1,25 +0,0 @@
-#include "distributed/durability_rpc_clients.hpp"
-
-#include "distributed/durability_rpc_messages.hpp"
-#include "transactions/transaction.hpp"
-#include "utils/future.hpp"
-
-namespace distributed {
-utils::Future<bool> DurabilityRpcClients::MakeSnapshot(tx::TransactionId tx) {
-  return utils::make_future(std::async(std::launch::async, [this, tx] {
-    auto futures = clients_.ExecuteOnWorkers<bool>(
-        0, [tx](int worker_id, communication::rpc::ClientPool &client_pool) {
-          auto res = client_pool.Call<MakeSnapshotRpc>(tx);
-          if (!res) return false;
-          return res->member;
-        });
-
-    bool created = true;
-    for (auto &future : futures) {
-      created &= future.get();
-    }
-
-    return created;
-  }));
-}
-}  // namespace distributed
src/distributed/durability_rpc_master.cpp (new file, 46 lines)
@@ -0,0 +1,46 @@
+#include "distributed/durability_rpc_master.hpp"
+
+#include "distributed/durability_rpc_messages.hpp"
+#include "transactions/transaction.hpp"
+#include "utils/future.hpp"
+
+namespace distributed {
+utils::Future<bool> DurabilityRpcMaster::MakeSnapshot(tx::TransactionId tx) {
+  return utils::make_future(std::async(std::launch::async, [this, tx] {
+    auto futures = clients_.ExecuteOnWorkers<bool>(
+        0, [tx](int worker_id, communication::rpc::ClientPool &client_pool) {
+          auto res = client_pool.Call<MakeSnapshotRpc>(tx);
+          if (!res) return false;
+          return res->member;
+        });
+
+    bool created = true;
+    for (auto &future : futures) {
+      created &= future.get();
+    }
+
+    return created;
+  }));
+}
+
+utils::Future<bool> DurabilityRpcMaster::RecoverWalAndIndexes(
+    durability::RecoveryData *recovery_data) {
+  return utils::make_future(std::async(std::launch::async, [this,
+                                                            recovery_data] {
+    auto futures = clients_.ExecuteOnWorkers<bool>(
+        0, [recovery_data](int worker_id,
+                           communication::rpc::ClientPool &client_pool) {
+          auto res = client_pool.Call<RecoverWalAndIndexesRpc>(*recovery_data);
+          if (!res) return false;
+          return true;
+        });
+
+    bool recovered = true;
+    for (auto &future : futures) {
+      recovered &= future.get();
+    }
+
+    return recovered;
+  }));
+}
+}  // namespace distributed
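Both methods share the same fan-out shape: a detached `std::async` task broadcasts the RPC via `ExecuteOnWorkers` and folds the per-worker booleans into one `utils::Future<bool>`. A hedged call-site sketch (mirroring the master constructor earlier in this commit):

```cpp
// Kicks off the broadcast immediately; the caller blocks only on get().
utils::Future<bool> pending =
    durability_rpc_.RecoverWalAndIndexes(&recovery_data);
// ... master-side work (its own local WAL replay) can proceed here ...
if (!pending.get())  // false if any worker RPC failed
  LOG(WARNING) << "WAL recovery did not complete on all workers";
```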
@@ -5,15 +5,16 @@
 #include <utility>

 #include "distributed/rpc_worker_clients.hpp"
+#include "durability/recovery.hpp"
 #include "storage/gid.hpp"
 #include "transactions/type.hpp"

 namespace distributed {

 /// Provides an ability to trigger snapshooting on other workers.
-class DurabilityRpcClients {
+class DurabilityRpcMaster {
  public:
-  DurabilityRpcClients(RpcWorkerClients &clients) : clients_(clients) {}
+  explicit DurabilityRpcMaster(RpcWorkerClients &clients) : clients_(clients) {}

   // Sends a snapshot request to workers and returns a future which becomes true
   // if all workers sucesfully completed their snapshot creation, false
@@ -21,6 +22,9 @@ class DurabilityRpcMaster {
   // @param tx - transaction from which to take db snapshot
   utils::Future<bool> MakeSnapshot(tx::TransactionId tx);

+  utils::Future<bool> RecoverWalAndIndexes(
+      durability::RecoveryData *recovery_data);
+
  private:
   RpcWorkerClients &clients_;
 };
@@ -6,15 +6,22 @@

 #include "communication/rpc/messages.hpp"
 #include "distributed/durability_rpc_messages.capnp.h"
+#include "durability/recovery.hpp"
 #include "transactions/transaction.hpp"
 cpp<#

 (lcp:namespace distributed)

+(lcp:capnp-import 'dur "/durability/recovery.capnp")
+
 (lcp:capnp-namespace "distributed")

 (lcp:define-rpc make-snapshot
   (:request ((member "tx::TransactionId" :capnp-type "UInt64")))
   (:response ((member :bool))))

+(lcp:define-rpc recover-wal-and-indexes
+  (:request ((member "durability::RecoveryData" :capnp-type "Dur.RecoveryData")))
+  (:response ()))
+
 (lcp:pop-namespace) ;; distributed
(deleted: src/distributed/durability_rpc_server.cpp)
@@ -1,20 +0,0 @@
-#include "distributed/durability_rpc_server.hpp"
-
-#include "database/graph_db.hpp"
-#include "database/graph_db_accessor.hpp"
-#include "distributed/durability_rpc_messages.hpp"
-
-namespace distributed {
-
-DurabilityRpcServer::DurabilityRpcServer(database::GraphDb &db,
-                                         communication::rpc::Server &server)
-    : db_(db), rpc_server_(server) {
-  rpc_server_.Register<MakeSnapshotRpc>(
-      [this](const auto &req_reader, auto *res_builder) {
-        database::GraphDbAccessor dba(this->db_, req_reader.getMember());
-        MakeSnapshotRes res(this->db_.MakeSnapshot(dba));
-        res.Save(res_builder);
-      });
-}
-
-}  // namespace distributed
(deleted: src/distributed/durability_rpc_server.hpp)
@@ -1,21 +0,0 @@
-#pragma once
-
-#include "communication/rpc/server.hpp"
-
-namespace database {
-class GraphDb;
-};
-
-namespace distributed {
-
-class DurabilityRpcServer {
- public:
-  DurabilityRpcServer(database::GraphDb &db,
-                      communication::rpc::Server &server);
-
- private:
-  database::GraphDb &db_;
-  communication::rpc::Server &rpc_server_;
-};
-
-}  // namespace distributed
src/distributed/durability_rpc_worker.cpp (new file, 27 lines)
@@ -0,0 +1,27 @@
+#include "distributed/durability_rpc_worker.hpp"
+
+#include "database/distributed_graph_db.hpp"
+#include "database/graph_db_accessor.hpp"
+#include "distributed/durability_rpc_messages.hpp"
+
+namespace distributed {
+
+DurabilityRpcWorker::DurabilityRpcWorker(database::Worker *db,
+                                         communication::rpc::Server *server)
+    : db_(db), rpc_server_(server) {
+  rpc_server_->Register<MakeSnapshotRpc>(
+      [this](const auto &req_reader, auto *res_builder) {
+        database::GraphDbAccessor dba(*this->db_, req_reader.getMember());
+        MakeSnapshotRes res(this->db_->MakeSnapshot(dba));
+        res.Save(res_builder);
+      });
+
+  rpc_server_->Register<RecoverWalAndIndexesRpc>(
+      [this](const auto &req_reader, auto *res_builder) {
+        durability::RecoveryData recovery_data;
+        recovery_data.Load(req_reader.getMember());
+        this->db_->RecoverWalAndIndexes(&recovery_data);
+      });
+}
+
+}  // namespace distributed
src/distributed/durability_rpc_worker.hpp (new file, 20 lines)
@@ -0,0 +1,20 @@
+#pragma once
+
+#include "communication/rpc/server.hpp"
+
+namespace database {
+class Worker;
+};  // namespace database
+
+namespace distributed {
+
+class DurabilityRpcWorker {
+ public:
+  DurabilityRpcWorker(database::Worker *db, communication::rpc::Server *server);
+
+ private:
+  database::Worker *db_;
+  communication::rpc::Server *rpc_server_;
+};
+
+}  // namespace distributed
@@ -1,9 +1,18 @@
 @0xb3d70bc0576218f3;

 using Cxx = import "/capnp/c++.capnp";
+using Utils = import "/utils/serialization.capnp";

 $Cxx.namespace("durability::capnp");

 struct RecoveryInfo {
   snapshotTxId @0 :UInt64;
-  maxWalTxId @1 :UInt64;
+  walRecovered @1 :List(UInt64);
 }
+
+struct RecoveryData {
+  snapshooterTxId @0 :UInt64;
+  walTxToRecover @1 :List(UInt64);
+  snapshooterTxSnapshot @2 :List(UInt64);
+  indexes @3 :List(Utils.Pair(Text, Text));
+}
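Replacing the scalar `maxWalTxId` with a `walRecovered` list is the heart of the schema change: in a cluster the set of replayable WAL transactions can have holes, which no single max id can express. An illustrative construction (values invented for the example):

```cpp
// Master's WAL holds commits {3, 4, 5}, but a worker only flushed {3, 5}.
// Tx 4 must then be skipped cluster-wide; "maxWalTxId = 5" cannot say
// that, while an explicit list can:
durability::RecoveryInfo info(/* snapshot_tx_id = */ 2,
                              /* wal_recovered = */ {3, 5});
```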
@@ -37,31 +37,14 @@ bool ReadSnapshotSummary(HashedFileReader &buffer, int64_t &vertex_count,
 namespace {
 using communication::bolt::DecodedValue;

-// A data structure for exchanging info between main recovery function and
-// snapshot and WAL recovery functions.
-struct RecoveryData {
-  tx::TransactionId snapshooter_tx_id{0};
-  tx::TransactionId wal_max_recovered_tx_id{0};
-  std::vector<tx::TransactionId> snapshooter_tx_snapshot;
-  // A collection into which the indexes should be added so they
-  // can be rebuilt at the end of the recovery transaction.
-  std::vector<std::pair<std::string, std::string>> indexes;
-
-  void Clear() {
-    snapshooter_tx_id = 0;
-    snapshooter_tx_snapshot.clear();
-    indexes.clear();
-  }
-};
-
 #define RETURN_IF_NOT(condition) \
   if (!(condition)) {            \
     reader.Close();              \
     return false;                \
   }

-bool RecoverSnapshot(const fs::path &snapshot_file, database::GraphDb &db,
-                     RecoveryData &recovery_data) {
+bool RecoverSnapshot(const fs::path &snapshot_file, database::GraphDb *db,
+                     RecoveryData *recovery_data) {
   HashedFileReader reader;
   SnapshotDecoder<HashedFileReader> decoder(reader);
@@ -85,25 +68,25 @@ bool RecoverSnapshot(const fs::path &snapshot_file, database::GraphDb *db,

   // Checks worker id was set correctly
   RETURN_IF_NOT(decoder.ReadValue(&dv, DecodedValue::Type::Int) &&
-                dv.ValueInt() == db.WorkerId());
+                dv.ValueInt() == db->WorkerId());

   // Vertex and edge generator ids
   RETURN_IF_NOT(decoder.ReadValue(&dv, DecodedValue::Type::Int));
   uint64_t vertex_generator_cnt = dv.ValueInt();
-  db.storage().VertexGenerator().SetId(std::max(
-      db.storage().VertexGenerator().LocalCount(), vertex_generator_cnt));
+  db->storage().VertexGenerator().SetId(std::max(
+      db->storage().VertexGenerator().LocalCount(), vertex_generator_cnt));
   RETURN_IF_NOT(decoder.ReadValue(&dv, DecodedValue::Type::Int));
   uint64_t edge_generator_cnt = dv.ValueInt();
-  db.storage().EdgeGenerator().SetId(
-      std::max(db.storage().EdgeGenerator().LocalCount(), edge_generator_cnt));
+  db->storage().EdgeGenerator().SetId(
+      std::max(db->storage().EdgeGenerator().LocalCount(), edge_generator_cnt));

   RETURN_IF_NOT(decoder.ReadValue(&dv, DecodedValue::Type::Int));
-  recovery_data.snapshooter_tx_id = dv.ValueInt();
+  recovery_data->snapshooter_tx_id = dv.ValueInt();
   // Transaction snapshot of the transaction that created the snapshot.
   RETURN_IF_NOT(decoder.ReadValue(&dv, DecodedValue::Type::List));
   for (const auto &value : dv.ValueList()) {
     RETURN_IF_NOT(value.IsInt());
-    recovery_data.snapshooter_tx_snapshot.emplace_back(value.ValueInt());
+    recovery_data->snapshooter_tx_snapshot.emplace_back(value.ValueInt());
   }

   // A list of label+property indexes.
@@ -114,11 +97,11 @@ bool RecoverSnapshot(const fs::path &snapshot_file, database::GraphDb *db,
     RETURN_IF_NOT(it != index_value.end());
     auto property = *it++;
     RETURN_IF_NOT(label.IsString() && property.IsString());
-    recovery_data.indexes.emplace_back(label.ValueString(),
-                                       property.ValueString());
+    recovery_data->indexes.emplace_back(label.ValueString(),
+                                        property.ValueString());
   }

-  database::GraphDbAccessor dba(db);
+  database::GraphDbAccessor dba(*db);
   std::unordered_map<gid::Gid,
                      std::pair<storage::VertexAddress, storage::VertexAddress>>
       edge_gid_endpoints_mapping;
@@ -154,7 +137,7 @@ bool RecoverSnapshot(const fs::path &snapshot_file, database::GraphDb *db,
       [&db, &dba](storage::VertexAddress &address) {
         if (address.is_local()) return;
         // If the worker id matches it should be a local apperance
-        if (address.worker_id() == db.WorkerId()) {
+        if (address.worker_id() == db->WorkerId()) {
           address = storage::VertexAddress(
               dba.db().storage().LocalAddress<Vertex>(address.gid()));
           CHECK(address.is_local()) << "Address should be local but isn't";
@@ -165,7 +148,7 @@ bool RecoverSnapshot(const fs::path &snapshot_file, database::GraphDb *db,
       [&db, &dba](storage::EdgeAddress &address) {
         if (address.is_local()) return;
         // If the worker id matches it should be a local apperance
-        if (address.worker_id() == db.WorkerId()) {
+        if (address.worker_id() == db->WorkerId()) {
           address = storage::EdgeAddress(
               dba.db().storage().LocalAddress<Edge>(address.gid()));
           CHECK(address.is_local()) << "Address should be local but isn't";
@@ -245,8 +228,8 @@ bool RecoverSnapshot(const fs::path &snapshot_file, database::GraphDb *db,
   // than the latest one we have recovered. Do this to make sure that
   // subsequently created snapshots and WAL files will have transactional info
   // that does not interfere with that found in previous snapshots and WAL.
-  tx::TransactionId max_id = recovery_data.snapshooter_tx_id;
-  auto &snap = recovery_data.snapshooter_tx_snapshot;
+  tx::TransactionId max_id = recovery_data->snapshooter_tx_id;
+  auto &snap = recovery_data->snapshooter_tx_snapshot;
   if (!snap.empty()) max_id = *std::max_element(snap.begin(), snap.end());
   dba.db().tx_engine().EnsureNextIdGreater(max_id);
   dba.Commit();
@@ -255,32 +238,99 @@ bool RecoverSnapshot(const fs::path &snapshot_file, database::GraphDb *db,

 #undef RETURN_IF_NOT

-// TODO - finer-grained recovery feedback could be useful here.
-bool RecoverWal(const fs::path &wal_dir, database::GraphDb &db,
-                RecoveryData &recovery_data) {
+std::vector<fs::path> GetWalFiles(const fs::path &wal_dir) {
   // Get paths to all the WAL files and sort them (on date).
   std::vector<fs::path> wal_files;
-  if (!fs::exists(wal_dir)) return true;
+  if (!fs::exists(wal_dir)) return {};
   for (auto &wal_file : fs::directory_iterator(wal_dir))
     wal_files.emplace_back(wal_file);
   std::sort(wal_files.begin(), wal_files.end());
+  return wal_files;
+}
+
+bool ApplyOverDeltas(
+    const std::vector<fs::path> &wal_files, tx::TransactionId first_to_recover,
+    const std::function<void(const database::StateDelta &)> &f) {
+  for (auto &wal_file : wal_files) {
+    auto wal_file_max_tx_id = TransactionIdFromWalFilename(wal_file.filename());
+    if (!wal_file_max_tx_id || *wal_file_max_tx_id < first_to_recover) continue;
+
+    HashedFileReader wal_reader;
+    if (!wal_reader.Open(wal_file)) return false;
+    communication::bolt::Decoder<HashedFileReader> decoder(wal_reader);
+    while (true) {
+      auto delta = database::StateDelta::Decode(wal_reader, decoder);
+      if (!delta) break;
+      f(*delta);
+    }
+  }
+
+  return true;
+}
+
+auto FirstWalTxToRecover(const RecoveryData &recovery_data) {
+  auto &tx_sn = recovery_data.snapshooter_tx_snapshot;
+  auto first_to_recover = tx_sn.empty() ? recovery_data.snapshooter_tx_id + 1
+                                        : *std::min(tx_sn.begin(), tx_sn.end());
+  return first_to_recover;
+}
+
+std::vector<tx::TransactionId> ReadWalRecoverableTransactions(
+    const fs::path &wal_dir, database::GraphDb *db,
+    const RecoveryData &recovery_data) {
+  auto wal_files = GetWalFiles(wal_dir);
+
+  std::unordered_set<tx::TransactionId> committed_set;
+  auto first_to_recover = FirstWalTxToRecover(recovery_data);
+  ApplyOverDeltas(
+      wal_files, first_to_recover, [&](const database::StateDelta &delta) {
+        if (delta.transaction_id >= first_to_recover &&
+            delta.type == database::StateDelta::Type::TRANSACTION_COMMIT) {
+          committed_set.insert(delta.transaction_id);
+        }
+      });
+
+  std::vector<tx::TransactionId> committed_tx_ids(committed_set.size());
+  for (auto id : committed_set) committed_tx_ids.push_back(id);
+  return committed_tx_ids;
+}
+
+// TODO - finer-grained recovery feedback could be useful here.
+bool RecoverWal(const fs::path &wal_dir, database::GraphDb *db,
+                RecoveryData *recovery_data) {
+  auto wal_files = GetWalFiles(wal_dir);
   // Track which transaction should be recovered first, and define logic for
   // which transactions should be skipped in recovery.
-  auto &tx_sn = recovery_data.snapshooter_tx_snapshot;
-  auto first_to_recover = tx_sn.empty() ? recovery_data.snapshooter_tx_id + 1
-                                        : *std::min(tx_sn.begin(), tx_sn.end());
-  auto should_skip = [&tx_sn, &recovery_data,
+  auto &tx_sn = recovery_data->snapshooter_tx_snapshot;
+  auto first_to_recover = FirstWalTxToRecover(*recovery_data);
+
+  // Set of transactions which can be recovered, since not every transaction in
+  // wal can be recovered because it might not be present on some workers (there
+  // wasn't enough time for it to flush to disk or similar)
+  std::unordered_set<tx::TransactionId> common_wal_tx;
+  for (auto tx_id : recovery_data->wal_tx_to_recover)
+    common_wal_tx.insert(tx_id);
+
+  auto should_skip = [&tx_sn, recovery_data, &common_wal_tx,
                       first_to_recover](tx::TransactionId tx_id) {
     return tx_id < first_to_recover ||
-           (tx_id < recovery_data.snapshooter_tx_id &&
-            !utils::Contains(tx_sn, tx_id));
+           (tx_id < recovery_data->snapshooter_tx_id &&
+            !utils::Contains(tx_sn, tx_id)) ||
+           !utils::Contains(common_wal_tx, tx_id);
   };

   std::unordered_map<tx::TransactionId, database::GraphDbAccessor> accessors;
   auto get_accessor =
-      [&accessors](tx::TransactionId tx_id) -> database::GraphDbAccessor & {
+      [db, &accessors](tx::TransactionId tx_id) -> database::GraphDbAccessor & {
     auto found = accessors.find(tx_id);
+
+    // Currently accessors are created on transaction_begin, but since workers
+    // don't have a transaction begin, the accessors are not created.
+    if (db->type() == database::GraphDb::Type::DISTRIBUTED_WORKER &&
+        found == accessors.end()) {
+      std::tie(found, std::ignore) = accessors.emplace(tx_id, *db);
+    }
+
+    CHECK(found != accessors.end())
+        << "Accessor does not exist for transaction: " << tx_id;
     return found->second;
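The rewritten `should_skip` folds together three reasons to drop a delta: the tx is already fully contained in the snapshot, it committed before the snapshooter without being in its snapshot, or it is missing from the cluster-wide intersection. A stand-alone restatement with worked values (illustrative; `utils::Contains` replaced by plain `std::unordered_set` lookups):

```cpp
#include <cstdint>
#include <iostream>
#include <unordered_set>

int main() {
  using TxId = uint64_t;
  const TxId first_to_recover = 10;   // min of the snapshooter's tx snapshot
  const TxId snapshooter_tx_id = 12;
  const std::unordered_set<TxId> tx_sn = {10, 11};  // active at snapshot time
  const std::unordered_set<TxId> common_wal_tx = {10, 11, 13};  // intersection

  auto should_skip = [&](TxId tx_id) {
    return tx_id < first_to_recover ||                 // already in snapshot
           (tx_id < snapshooter_tx_id &&
            tx_sn.find(tx_id) == tx_sn.end()) ||       // committed pre-snapshot
           common_wal_tx.find(tx_id) == common_wal_tx.end();  // not on all nodes
  };

  for (TxId tx : {9, 10, 11, 13, 14})
    std::cout << "tx " << tx << (should_skip(tx) ? ": skip" : ": replay")
              << '\n';
  // tx 9: skip, tx 10: replay, tx 11: replay, tx 13: replay, tx 14: skip
}
```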
@@ -294,59 +344,49 @@ bool RecoverWal(const fs::path &wal_dir, database::GraphDb *db,

   // Read all the WAL files whose max_tx_id is not smaller than
   // min_tx_to_recover.
-  for (auto &wal_file : wal_files) {
-    auto wal_file_max_tx_id = TransactionIdFromWalFilename(wal_file.filename());
-    if (!wal_file_max_tx_id || *wal_file_max_tx_id < first_to_recover) continue;
-
-    HashedFileReader wal_reader;
-    if (!wal_reader.Open(wal_file)) return false;
-    communication::bolt::Decoder<HashedFileReader> decoder(wal_reader);
-    while (true) {
-      auto delta = database::StateDelta::Decode(wal_reader, decoder);
-      if (!delta) break;
-      max_observed_tx_id = std::max(max_observed_tx_id, delta->transaction_id);
-      if (should_skip(delta->transaction_id)) continue;
-      switch (delta->type) {
-        case database::StateDelta::Type::TRANSACTION_BEGIN:
-          DCHECK(accessors.find(delta->transaction_id) == accessors.end())
-              << "Double transaction start";
-          accessors.emplace(delta->transaction_id, db);
-          break;
-        case database::StateDelta::Type::TRANSACTION_ABORT:
-          get_accessor(delta->transaction_id).Abort();
-          accessors.erase(accessors.find(delta->transaction_id));
-          break;
-        case database::StateDelta::Type::TRANSACTION_COMMIT:
-          get_accessor(delta->transaction_id).Commit();
-          recovery_data.wal_max_recovered_tx_id = delta->transaction_id;
-          accessors.erase(accessors.find(delta->transaction_id));
-          break;
-        case database::StateDelta::Type::BUILD_INDEX:
-          // TODO index building might still be problematic in HA
-          recovery_data.indexes.emplace_back(delta->label_name,
-                                             delta->property_name);
-          break;
-        default:
-          delta->Apply(get_accessor(delta->transaction_id));
-      }
-    }  // reading all deltas in a single wal file
-  }    // reading all wal files
-
+  ApplyOverDeltas(
+      wal_files, first_to_recover, [&](const database::StateDelta &delta) {
+        max_observed_tx_id = std::max(max_observed_tx_id, delta.transaction_id);
+        if (should_skip(delta.transaction_id)) return;
+        switch (delta.type) {
+          case database::StateDelta::Type::TRANSACTION_BEGIN:
+            CHECK(accessors.find(delta.transaction_id) == accessors.end())
+                << "Double transaction start";
+            accessors.emplace(delta.transaction_id, *db);
+            break;
+          case database::StateDelta::Type::TRANSACTION_ABORT:
+            get_accessor(delta.transaction_id).Abort();
+            accessors.erase(accessors.find(delta.transaction_id));
+            break;
+          case database::StateDelta::Type::TRANSACTION_COMMIT:
+            get_accessor(delta.transaction_id).Commit();
+            accessors.erase(accessors.find(delta.transaction_id));
+            break;
+          case database::StateDelta::Type::BUILD_INDEX:
+            // TODO index building might still be problematic in HA
+            recovery_data->indexes.emplace_back(delta.label_name,
+                                                delta.property_name);
+            break;
+          default:
+            delta.Apply(get_accessor(delta.transaction_id));
+        }
+      });

   // TODO when implementing proper error handling return one of the following:
   // - WAL fully recovered
   // - WAL partially recovered
   // - WAL recovery error

-  db.tx_engine().EnsureNextIdGreater(max_observed_tx_id);
+  db->tx_engine().EnsureNextIdGreater(max_observed_tx_id);
   return true;
 }
 }  // anonymous namespace

-RecoveryInfo Recover(
-    const fs::path &durability_dir, database::GraphDb &db,
-    std::experimental::optional<RecoveryInfo> required_recovery_info) {
-  RecoveryData recovery_data;
-
+RecoveryInfo RecoverOnlySnapshot(
+    const fs::path &durability_dir, database::GraphDb *db,
+    RecoveryData *recovery_data,
+    std::experimental::optional<tx::TransactionId> required_snapshot_tx_id) {
   // Attempt to recover from snapshot files in reverse order (from newest
   // backwards).
   const auto snapshot_dir = durability_dir / kSnapshotDir;
@@ -356,21 +396,21 @@ RecoveryInfo RecoverOnlySnapshot(
     snapshot_files.emplace_back(file);
   std::sort(snapshot_files.rbegin(), snapshot_files.rend());
   for (auto &snapshot_file : snapshot_files) {
-    if (required_recovery_info) {
+    if (required_snapshot_tx_id) {
       auto snapshot_file_tx_id =
           TransactionIdFromSnapshotFilename(snapshot_file);
-      if (!snapshot_file_tx_id || snapshot_file_tx_id.value() !=
-                                      required_recovery_info->snapshot_tx_id) {
+      if (!snapshot_file_tx_id ||
+          snapshot_file_tx_id.value() != *required_snapshot_tx_id) {
         LOG(INFO) << "Skipping snapshot file '" << snapshot_file
                   << "' because it does not match the required snapshot tx id: "
-                  << required_recovery_info->snapshot_tx_id;
+                  << *required_snapshot_tx_id;
         continue;
       }
     }
     LOG(INFO) << "Starting snapshot recovery from: " << snapshot_file;
     if (!RecoverSnapshot(snapshot_file, db, recovery_data)) {
-      db.ReinitializeStorage();
-      recovery_data.Clear();
+      db->ReinitializeStorage();
+      recovery_data->Clear();
       LOG(WARNING) << "Snapshot recovery failed, trying older snapshot...";
       continue;
     } else {
@@ -379,12 +419,19 @@ RecoveryInfo RecoverOnlySnapshot(
     }
   }

-  // If snapshot recovery is required, and we failed, don't even deal with the
-  // WAL recovery.
-  if (required_recovery_info &&
-      recovery_data.snapshooter_tx_id != required_recovery_info->snapshot_tx_id)
-    return {recovery_data.snapshooter_tx_id, 0};
+  // If snapshot recovery is required, and we failed, don't even deal with
+  // the WAL recovery.
+  if (required_snapshot_tx_id &&
+      recovery_data->snapshooter_tx_id != *required_snapshot_tx_id)
+    return {recovery_data->snapshooter_tx_id, {}};
+
+  return {recovery_data->snapshooter_tx_id,
+          ReadWalRecoverableTransactions(durability_dir / kWalDir, db,
+                                         *recovery_data)};
+}
+
+void RecoverWalAndIndexes(const fs::path &durability_dir, database::GraphDb *db,
+                          RecoveryData *recovery_data) {
   // Write-ahead-log recovery.
   // WAL recovery does not have to be complete for the recovery to be
   // considered successful. For the time being ignore the return value,
@@ -392,18 +439,15 @@ void RecoverWalAndIndexes(const fs::path &durability_dir, database::GraphDb *db,
   RecoverWal(durability_dir / kWalDir, db, recovery_data);

   // Index recovery.
-  database::GraphDbAccessor db_accessor_indices{db};
-  for (const auto &label_prop : recovery_data.indexes) {
+  database::GraphDbAccessor db_accessor_indices{*db};
+  for (const auto &label_prop : recovery_data->indexes) {
     const database::LabelPropertyIndex::Key key{
         db_accessor_indices.Label(label_prop.first),
         db_accessor_indices.Property(label_prop.second)};
-    db_accessor_indices.db().storage().label_property_index_.CreateIndex(key);
+    db_accessor_indices.db().storage().label_property_index().CreateIndex(key);
     db_accessor_indices.PopulateIndex(key);
     db_accessor_indices.EnableIndex(key);
   }
   db_accessor_indices.Commit();
-
-  return {recovery_data.snapshooter_tx_id,
-          recovery_data.wal_max_recovered_tx_id};
 }
 }  // namespace durability
@@ -3,11 +3,15 @@
 #include <experimental/optional>
 #include <unordered_map>

+#include "database/graph_db.hpp"
 #include "durability/hashed_file_reader.hpp"
+#include "durability/recovery.capnp.h"
 #include "storage/vertex_accessor.hpp"
+#include "transactions/type.hpp"
+#include "utils/serialization.hpp"

 namespace database {
 class GraphDb;
 };

 namespace durability {
@@ -15,25 +19,27 @@ namespace durability {
 struct RecoveryInfo {
   RecoveryInfo() {}
   RecoveryInfo(tx::TransactionId snapshot_tx_id,
-               tx::TransactionId max_wal_tx_id)
-      : snapshot_tx_id(snapshot_tx_id), max_wal_tx_id(max_wal_tx_id) {}
+               const std::vector<tx::TransactionId> &wal_recovered)
+      : snapshot_tx_id(snapshot_tx_id), wal_recovered(wal_recovered) {}
   tx::TransactionId snapshot_tx_id;
-  tx::TransactionId max_wal_tx_id;
+  std::vector<tx::TransactionId> wal_recovered;

   bool operator==(const RecoveryInfo &other) const {
     return snapshot_tx_id == other.snapshot_tx_id &&
-           max_wal_tx_id == other.max_wal_tx_id;
+           wal_recovered == other.wal_recovered;
   }
   bool operator!=(const RecoveryInfo &other) const { return !(*this == other); }

   void Save(capnp::RecoveryInfo::Builder *builder) const {
     builder->setSnapshotTxId(snapshot_tx_id);
-    builder->setMaxWalTxId(max_wal_tx_id);
+    auto list_builder = builder->initWalRecovered(wal_recovered.size());
+    utils::SaveVector(wal_recovered, &list_builder);
   }

   void Load(const capnp::RecoveryInfo::Reader &reader) {
     snapshot_tx_id = reader.getSnapshotTxId();
-    max_wal_tx_id = reader.getMaxWalTxId();
+    auto list_reader = reader.getWalRecovered();
+    utils::LoadVector(&wal_recovered, list_reader);
   }

  private:
@@ -42,7 +48,77 @@ struct RecoveryInfo {
   template <class TArchive>
   void serialize(TArchive &ar, unsigned int) {
     ar &snapshot_tx_id;
-    ar &max_wal_tx_id;
+    ar &wal_recovered;
   }
 };

+// A data structure for exchanging info between main recovery function and
+// snapshot and WAL recovery functions.
+struct RecoveryData {
+  tx::TransactionId snapshooter_tx_id{0};
+  std::vector<tx::TransactionId> wal_tx_to_recover{};
+  std::vector<tx::TransactionId> snapshooter_tx_snapshot;
+  // A collection into which the indexes should be added so they
+  // can be rebuilt at the end of the recovery transaction.
+  std::vector<std::pair<std::string, std::string>> indexes;
+
+  void Clear() {
+    snapshooter_tx_id = 0;
+    snapshooter_tx_snapshot.clear();
+    indexes.clear();
+  }
+
+  void Save(capnp::RecoveryData::Builder *builder) const {
+    builder->setSnapshooterTxId(snapshooter_tx_id);
+    {
+      auto list_builder = builder->initWalTxToRecover(wal_tx_to_recover.size());
+      utils::SaveVector(wal_tx_to_recover, &list_builder);
+    }
+    {
+      auto list_builder =
+          builder->initSnapshooterTxSnapshot(snapshooter_tx_snapshot.size());
+      utils::SaveVector(snapshooter_tx_snapshot, &list_builder);
+    }
+    {
+      auto list_builder = builder->initIndexes(indexes.size());
+      utils::SaveVector<utils::capnp::Pair<::capnp::Text, ::capnp::Text>,
+                        std::pair<std::string, std::string>>(
+          indexes, &list_builder, [](auto *builder, const auto value) {
+            builder->setFirst(value.first);
+            builder->setSecond(value.second);
+          });
+    }
+  }
+
+  void Load(const capnp::RecoveryData::Reader &reader) {
+    snapshooter_tx_id = reader.getSnapshooterTxId();
+    {
+      auto list_reader = reader.getWalTxToRecover();
+      utils::LoadVector(&wal_tx_to_recover, list_reader);
+    }
+    {
+      auto list_reader = reader.getSnapshooterTxSnapshot();
+      utils::LoadVector(&snapshooter_tx_snapshot, list_reader);
+    }
+    {
+      auto list_reader = reader.getIndexes();
+      utils::LoadVector<utils::capnp::Pair<::capnp::Text, ::capnp::Text>,
+                        std::pair<std::string, std::string>>(
+          &indexes, list_reader, [](const auto &reader) {
+            return std::make_pair(reader.getFirst(), reader.getSecond());
+          });
+    }
+  }
+
+ private:
+  friend class boost::serialization::access;
+
+  template <class TArchive>
+  void serialize(TArchive &ar, unsigned int) {
+    ar &snapshooter_tx_id;
+    ar &wal_tx_to_recover;
+    ar &snapshooter_tx_snapshot;
+    ar &indexes;
+  }
+};
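`RecoveryData` now travels over the wire inside `RecoverWalAndIndexesRpc`, hence the new Save/Load pair. A minimal round-trip sketch, assuming only the generated `durability::capnp` types from recovery.capnp (standard Cap'n Proto API; this helper is not code from the commit):

```cpp
#include <capnp/message.h>
#include <capnp/serialize.h>

#include "durability/recovery.hpp"

durability::RecoveryData RoundTrip(const durability::RecoveryData &in) {
  ::capnp::MallocMessageBuilder message;
  auto builder = message.initRoot<durability::capnp::RecoveryData>();
  in.Save(&builder);

  // Flatten to words and read back, as the RPC layer effectively does.
  auto words = ::capnp::messageToFlatArray(message);
  ::capnp::FlatArrayMessageReader reader(words.asPtr());

  durability::RecoveryData out;
  out.Load(reader.getRoot<durability::capnp::RecoveryData>());
  return out;
}
```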
@@ -52,20 +128,22 @@ bool ReadSnapshotSummary(HashedFileReader &buffer, int64_t &vertex_count,
                          int64_t &edge_count, uint64_t &hash);

 /**
- * Recovers database from durability. If recovering fails, false is returned
- * and db_accessor aborts transaction, else true is returned and transaction is
- * commited.
+ * Recovers database from the latest possible snapshot. If recovering fails,
+ * false is returned and db_accessor aborts transaction, else true is returned
+ * and transaction is commited.
  *
  * @param durability_dir - Path to durability directory.
  * @param db - The database to recover into.
- * @param required_recovery_info - Only used on distributed worker. Indicates
- * what the master recovered. The same transactions must be recovered on the
+ * @param required_snapshot_tx_id - Only used on distributed worker. Indicates
+ * what the master recovered. The same snapshot must be recovered on the
  * worker.
  * @return - recovery info
  */
-RecoveryInfo Recover(
+RecoveryInfo RecoverOnlySnapshot(
     const std::experimental::filesystem::path &durability_dir,
-    database::GraphDb &db,
-    std::experimental::optional<RecoveryInfo> required_recovery_info);
+    database::GraphDb *db, durability::RecoveryData *recovery_data,
+    std::experimental::optional<tx::TransactionId> required_snapshot_tx_id);

+void RecoverWalAndIndexes(const std::experimental::filesystem::path &dir,
+                          database::GraphDb *db, RecoveryData *recovery_data);
 }  // namespace durability
@@ -982,7 +982,8 @@ used for outside definition."
          (member-builder (format nil "~A_builder" (cpp-member-name member :struct t)))
          (capnp-name (cpp-type-name (cpp-member-symbol member))))
      (cond
-       ((capnp-primitive-type-p (capnp-type-of-member member))
+       ((and (not (cpp-member-capnp-save member))
+             (capnp-primitive-type-p (capnp-type-of-member member)))
         (format s "  builder->set~A(~A);~%" capnp-name member-name))
        (t
         (write-line "{" s) ;; Enclose larger save code in new scope
@@ -1166,7 +1167,8 @@ used for outside definition."
          (member-reader (format nil "~A_reader" (cpp-member-name member :struct t)))
          (capnp-name (cpp-type-name (cpp-member-symbol member))))
      (cond
-       ((capnp-primitive-type-p (capnp-type-of-member member))
+       ((and (not (cpp-member-capnp-load member))
+             (capnp-primitive-type-p (capnp-type-of-member member)))
         (format s "  ~A = reader.get~A();~%" member-name capnp-name))
        (t
         (write-line "{" s) ;; Enclose larger load code in new scope
@@ -5,7 +5,6 @@
 #include "data_structures/concurrent/skiplist.hpp"
 #include "mvcc/version_list.hpp"
 #include "storage/deferred_deleter.hpp"
-#include "storage/deferred_deleter.hpp"
 #include "transactions/engine.hpp"

 /**
@@ -52,8 +51,8 @@ class GarbageCollector {
       if (ret.second != nullptr)
         deleted_records.emplace_back(ret.second, engine.LocalLast());
     }
-    DLOG_IF(INFO, count > 0) << "GC started cleaning with snapshot: "
-                             << snapshot;
+    DLOG_IF(INFO, count > 0)
+        << "GC started cleaning with snapshot: " << snapshot;
     DLOG_IF(INFO, count > 0) << "Destroyed: " << count;

     // Add records to deleter, with the id larger or equal than the last active
@@ -3,6 +3,7 @@
 #include <algorithm>
 #include <chrono>
 #include <string>
+#include <unordered_set>
 #include <utility>

 #include "utils/exceptions.hpp"
@@ -91,6 +92,12 @@ inline TVal First(TIterable &&iterable, TVal &&empty_value) {
   return empty_value;
 }

+template <typename TElement>
+inline bool Contains(const std::unordered_set<TElement> &iterable,
+                     const TElement &element) {
+  return iterable.find(element) != iterable.end();
+}
+
 /**
  * Returns `true` if the given iterable contains the given element.
  *
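This overload exists because the WAL-recovery path keeps `common_wal_tx` in an `std::unordered_set`: the generic iterable version of `Contains` would do a linear `std::find`, while this one uses the set's own constant-time lookup. Usage sketch (illustrative values):

```cpp
std::unordered_set<tx::TransactionId> common_wal_tx = {3, 5};
bool a = utils::Contains(common_wal_tx, tx::TransactionId(3));  // true
bool b = utils::Contains(common_wal_tx, tx::TransactionId(4));  // false
```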
@@ -78,7 +78,7 @@ class MgCluster:
         ])

         # sleep to allow the workers to startup
-        time.sleep(5)
+        time.sleep(15)

         # store initial usage
         self._usage_start = [self._master.get_usage()]
@@ -60,7 +60,10 @@ class WorkerCoordinationInThread {
   }
   auto worker_ids() { return worker->coord.GetWorkerIds(); }
   void join() { worker_thread_.join(); }
-  void NotifyWorkerRecovered() { worker->discovery.NotifyWorkerRecovered(); }
+  void NotifyWorkerRecovered() {
+    std::experimental::optional<durability::RecoveryInfo> no_recovery_info;
+    worker->discovery.NotifyWorkerRecovered(no_recovery_info);
+  }

  private:
   std::thread worker_thread_;
@@ -72,7 +75,7 @@ TEST(Distributed, Coordination) {
   std::vector<std::unique_ptr<WorkerCoordinationInThread>> workers;
   {
     MasterCoordination master_coord(master_server.endpoint());
-    master_coord.SetRecoveryInfo(std::experimental::nullopt);
+    master_coord.SetRecoveredSnapshot(std::experimental::nullopt);
     RpcWorkerClients rpc_worker_clients(master_coord);
     ClusterDiscoveryMaster master_discovery_(master_server, master_coord,
                                              rpc_worker_clients);
@@ -102,7 +105,7 @@ TEST(Distributed, DesiredAndUniqueId) {
   std::vector<std::unique_ptr<WorkerCoordinationInThread>> workers;
   {
     MasterCoordination master_coord(master_server.endpoint());
-    master_coord.SetRecoveryInfo(std::experimental::nullopt);
+    master_coord.SetRecoveredSnapshot(std::experimental::nullopt);
     RpcWorkerClients rpc_worker_clients(master_coord);
     ClusterDiscoveryMaster master_discovery_(master_server, master_coord,
                                              rpc_worker_clients);
@@ -125,7 +128,7 @@ TEST(Distributed, CoordinationWorkersId) {
   std::vector<std::unique_ptr<WorkerCoordinationInThread>> workers;
   {
     MasterCoordination master_coord(master_server.endpoint());
-    master_coord.SetRecoveryInfo(std::experimental::nullopt);
+    master_coord.SetRecoveredSnapshot(std::experimental::nullopt);
     RpcWorkerClients rpc_worker_clients(master_coord);
     ClusterDiscoveryMaster master_discovery_(master_server, master_coord,
                                              rpc_worker_clients);
@@ -151,7 +154,7 @@ TEST(Distributed, ClusterDiscovery) {
   std::vector<std::unique_ptr<WorkerCoordinationInThread>> workers;
   {
     MasterCoordination master_coord(master_server.endpoint());
-    master_coord.SetRecoveryInfo(std::experimental::nullopt);
+    master_coord.SetRecoveredSnapshot(std::experimental::nullopt);
     RpcWorkerClients rpc_worker_clients(master_coord);
     ClusterDiscoveryMaster master_discovery_(master_server, master_coord,
                                              rpc_worker_clients);
@@ -182,7 +185,7 @@ TEST(Distributed, KeepsTrackOfRecovered) {
   std::vector<std::unique_ptr<WorkerCoordinationInThread>> workers;
   {
     MasterCoordination master_coord(master_server.endpoint());
-    master_coord.SetRecoveryInfo(std::experimental::nullopt);
+    master_coord.SetRecoveredSnapshot(std::experimental::nullopt);
     RpcWorkerClients rpc_worker_clients(master_coord);
     ClusterDiscoveryMaster master_discovery_(master_server, master_coord,
                                              rpc_worker_clients);
@@ -52,7 +52,7 @@ class RpcWorkerClientsTest : public ::testing::Test {
   const io::network::Endpoint kLocalHost{"127.0.0.1", 0};
   const int kWorkerCount = 2;
   void SetUp() override {
-    master_coord_->SetRecoveryInfo(std::experimental::nullopt);
+    master_coord_->SetRecoveredSnapshot(std::experimental::nullopt);
     for (int i = 1; i <= kWorkerCount; ++i) {
       workers_server_.emplace_back(
           std::make_unique<communication::rpc::Server>(kLocalHost));
@@ -21,7 +21,10 @@ class RecoveryTest : public ::testing::Test {
  protected:
   void SetUp() override {
     std::string durability_dir(FLAGS_durability_dir);
-    durability::Recover(durability_dir, db_, std::experimental::nullopt);
+    durability::RecoveryData recovery_data;
+    durability::RecoverOnlySnapshot(durability_dir, &db_, &recovery_data,
+                                    std::experimental::nullopt);
+    durability::RecoverWalAndIndexes(durability_dir, &db_, &recovery_data);
   }

   database::SingleNode db_;