Ensure workers recover appropriate snapshot

Reviewers: msantl

Reviewed By: msantl

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1355
This commit is contained in:
Dominik Gleich 2018-04-16 10:43:16 +02:00
parent e519a64c7c
commit dba81f223c
18 changed files with 282 additions and 58 deletions

View File

@ -14,6 +14,7 @@
#include "distributed/storage_gc_rpc_messages.hpp"
#include "distributed/transactional_cache_cleaner_rpc_messages.hpp"
#include "distributed/updates_rpc_messages.hpp"
#include "durability/recovery.hpp"
#include "stats/stats_rpc_messages.hpp"
#include "storage/concurrent_id_mapper_rpc_messages.hpp"
#include "transactions/engine_rpc_messages.hpp"
@ -54,6 +55,7 @@ BOOST_CLASS_EXPORT(tx::GlobalLastReq);
BOOST_CLASS_EXPORT(tx::GlobalLastRes);
// Distributed coordination.
BOOST_CLASS_EXPORT(durability::RecoveryInfo);
BOOST_CLASS_EXPORT(distributed::RegisterWorkerReq);
BOOST_CLASS_EXPORT(distributed::RegisterWorkerRes);
BOOST_CLASS_EXPORT(distributed::ClusterDiscoveryReq);

View File

@ -1,3 +1,5 @@
#include <experimental/optional>
#include "glog/logging.h"
#include "communication/rpc/server.hpp"
@ -269,9 +271,45 @@ PublicBase::PublicBase(std::unique_ptr<PrivateBase> impl)
if (impl_->config_.durability_enabled)
durability::CheckDurabilityDir(impl_->config_.durability_directory);
// Recovery on startup.
if (impl_->config_.db_recover_on_startup)
durability::Recover(impl_->config_.durability_directory, *impl_);
// Durability recovery.
{
auto db_type = impl_->type();
// What we should recover.
std::experimental::optional<durability::RecoveryInfo>
required_recovery_info;
if (db_type == Type::DISTRIBUTED_WORKER) {
required_recovery_info = dynamic_cast<impl::Worker *>(impl_.get())
->cluster_discovery_.recovery_info();
}
// What we recover.
std::experimental::optional<durability::RecoveryInfo> recovery_info;
// Recover only if necessary.
if ((db_type != Type::DISTRIBUTED_WORKER &&
impl_->config_.db_recover_on_startup) ||
(db_type == Type::DISTRIBUTED_WORKER && required_recovery_info)) {
recovery_info = durability::Recover(impl_->config_.durability_directory,
*impl_, required_recovery_info);
}
// Post-recovery setup and checking.
switch (db_type) {
case Type::DISTRIBUTED_MASTER:
dynamic_cast<impl::Master *>(impl_.get())
->coordination_.SetRecoveryInfo(recovery_info);
break;
case Type::DISTRIBUTED_WORKER:
if (required_recovery_info != recovery_info)
LOG(FATAL) << "Memgraph worker failed to recover the database state "
"recovered on the master";
break;
case Type::SINGLE_NODE:
break;
}
}
if (impl_->config_.durability_enabled) {
impl_->wal().Enable();
}

View File

@ -1,6 +1,7 @@
#pragma once
#include <experimental/filesystem>
#include <experimental/optional>
#include "data_structures/concurrent/concurrent_map.hpp"
#include "data_structures/concurrent/skiplist.hpp"
@ -11,6 +12,7 @@
#include "storage/edge.hpp"
#include "storage/types.hpp"
#include "storage/vertex.hpp"
#include "transactions/type.hpp"
namespace distributed {
class IndexRpcServer;
@ -21,7 +23,10 @@ class GraphDb;
};
namespace durability {
bool Recover(const std::experimental::filesystem::path &, database::GraphDb &);
struct RecoveryInfo;
RecoveryInfo Recover(const std::experimental::filesystem::path &,
database::GraphDb &,
std::experimental::optional<RecoveryInfo>);
};
namespace database {
@ -89,8 +94,9 @@ class Storage {
friend class GraphDbAccessor;
friend class StorageGc;
friend class distributed::IndexRpcServer;
friend bool durability::Recover(const std::experimental::filesystem::path &,
database::GraphDb &);
friend durability::RecoveryInfo durability::Recover(
const std::experimental::filesystem::path &, database::GraphDb &,
std::experimental::optional<durability::RecoveryInfo>);
int worker_id_;
gid::Generator vertex_generator_;

View File

@ -25,7 +25,8 @@ ClusterDiscoveryMaster::ClusterDiscoveryMaster(
}
return std::make_unique<RegisterWorkerRes>(
registration_successful, this->coordination_.GetWorkers());
registration_successful, this->coordination_.RecoveryInfo(),
this->coordination_.GetWorkers());
});
}

View File

@ -24,6 +24,7 @@ void ClusterDiscoveryWorker::RegisterWorker(int worker_id) {
for (auto &kv : result->workers) {
coordination_.RegisterWorker(kv.first, kv.second);
}
recovery_info_ = result->recovery_info;
}
} // namespace distributed

View File

@ -1,8 +1,11 @@
#pragma once
#include <experimental/optional>
#include "communication/rpc/client_pool.hpp"
#include "communication/rpc/server.hpp"
#include "distributed/coordination_worker.hpp"
#include "durability/recovery.hpp"
namespace distributed {
using Server = communication::rpc::Server;
@ -27,10 +30,14 @@ class ClusterDiscoveryWorker final {
*/
void RegisterWorker(int worker_id);
/** Returns the recovery info. Valid only after registration. */
auto recovery_info() const { return recovery_info_; }
private:
Server &server_;
WorkerCoordination &coordination_;
communication::rpc::ClientPool &client_pool_;
std::experimental::optional<durability::RecoveryInfo> recovery_info_;
};
} // namespace distributed

View File

@ -15,6 +15,16 @@ MasterCoordination::MasterCoordination(const Endpoint &master_endpoint)
bool MasterCoordination::RegisterWorker(int desired_worker_id,
Endpoint endpoint) {
// Worker's can't register before the recovery phase on the master is done to
// ensure the whole cluster is in a consistent state.
while (true) {
{
std::lock_guard<std::mutex> guard(lock_);
if (recovery_done_) break;
}
std::this_thread::sleep_for(std::chrono::milliseconds(200));
}
std::lock_guard<std::mutex> guard(lock_);
auto workers = GetWorkers();
// Check if the desired worker id already exists.
@ -56,4 +66,18 @@ MasterCoordination::~MasterCoordination() {
}
}
void MasterCoordination::SetRecoveryInfo(
std::experimental::optional<durability::RecoveryInfo> info) {
std::lock_guard<std::mutex> guard(lock_);
recovery_done_ = true;
recovery_info_ = info;
}
std::experimental::optional<durability::RecoveryInfo>
MasterCoordination::RecoveryInfo() const {
std::lock_guard<std::mutex> guard(lock_);
CHECK(recovery_done_) << "RecoveryInfo requested before it's available";
return recovery_info_;
}
} // namespace distributed

View File

@ -1,9 +1,11 @@
#pragma once
#include <experimental/optional>
#include <mutex>
#include <unordered_map>
#include "distributed/coordination.hpp"
#include "durability/recovery.hpp"
#include "io/network/endpoint.hpp"
namespace distributed {
@ -28,8 +30,21 @@ class MasterCoordination final : public Coordination {
Endpoint GetEndpoint(int worker_id);
/// Sets the recovery info. nullopt indicates nothing was recovered.
void SetRecoveryInfo(
std::experimental::optional<durability::RecoveryInfo> info);
std::experimental::optional<durability::RecoveryInfo> RecoveryInfo() const;
private:
// Most master functions aren't thread-safe.
mutable std::mutex lock_;
/// Durabiliry recovery info.
/// Indicates if the recovery phase is done.
bool recovery_done_{false};
/// If nullopt nothing was recovered.
std::experimental::optional<durability::RecoveryInfo> recovery_info_;
};
} // namespace distributed

View File

@ -1,5 +1,6 @@
#pragma once
#include <experimental/optional>
#include <unordered_map>
#include "boost/serialization/access.hpp"
@ -7,6 +8,7 @@
#include "boost/serialization/unordered_map.hpp"
#include "communication/rpc/messages.hpp"
#include "durability/recovery.hpp"
#include "io/network/endpoint.hpp"
namespace distributed {
@ -34,11 +36,16 @@ struct RegisterWorkerReq : public Message {
};
struct RegisterWorkerRes : public Message {
RegisterWorkerRes(bool registration_successful,
const std::unordered_map<int, Endpoint> &workers)
: registration_successful(registration_successful), workers(workers) {}
RegisterWorkerRes(
bool registration_successful,
std::experimental::optional<durability::RecoveryInfo> recovery_info,
std::unordered_map<int, Endpoint> workers)
: registration_successful(registration_successful),
recovery_info(recovery_info),
workers(std::move(workers)) {}
bool registration_successful;
std::experimental::optional<durability::RecoveryInfo> recovery_info;
std::unordered_map<int, Endpoint> workers;
private:
@ -49,6 +56,7 @@ struct RegisterWorkerRes : public Message {
void serialize(TArchive &ar, unsigned int) {
ar &boost::serialization::base_object<Message>(*this);
ar &registration_successful;
ar &recovery_info;
ar &workers;
}
};

View File

@ -12,7 +12,7 @@
namespace durability {
namespace fs = std::experimental::filesystem;
/// Returns true if the given directory path exists or is succesfully created.
bool EnsureDir(const fs::path &dir) {
std::error_code error_code; // Just for exception suppression.
auto result = fs::create_directories(dir, error_code);
@ -21,8 +21,6 @@ bool EnsureDir(const fs::path &dir) {
return result || !error_code.value();
}
/// Ensures the given durability directory exists and is ready for use. Creates
/// the directory if it doesn't exist.
void CheckDurabilityDir(const std::string &durability_dir) {
namespace fs = std::experimental::filesystem;
if (fs::exists(durability_dir)) {
@ -36,12 +34,6 @@ void CheckDurabilityDir(const std::string &durability_dir) {
}
}
/// Returns the transaction id contained in the file name. If the filename is
/// not a parseable WAL file name, nullopt is returned. If the filename
/// represents the "current" WAL file, then the maximum possible transaction ID
/// is returned because that's appropriate for the recovery logic (the current
/// WAL does not yet have a maximum transaction ID and can't be discarded by
/// the recovery regardless of the snapshot from which the transaction starts).
std::experimental::optional<tx::transaction_id_t> TransactionIdFromWalFilename(
const std::string &name) {
auto nullopt = std::experimental::nullopt;
@ -96,4 +88,25 @@ fs::path WalFilenameForTransactionId(
file_name = file_name + "_Worker_" + std::to_string(worker_id);
return wal_dir / file_name;
}
std::experimental::optional<tx::transaction_id_t>
TransactionIdFromSnapshotFilename(const std::string &name) {
auto nullopt = std::experimental::nullopt;
auto file_name_split = utils::RSplit(name, "_tx_", 1);
if (file_name_split.size() != 2) {
LOG(WARNING) << "Unable to parse snapshot file name: " << name;
return nullopt;
}
try {
return std::stoll(file_name_split[1]);
} catch (std::invalid_argument &) {
LOG(WARNING) << "Unable to parse snapshot file name tx ID: "
<< file_name_split[1];
return nullopt;
} catch (std::out_of_range &) {
LOG(WARNING) << "Unable to parse snapshot file name tx ID: "
<< file_name_split[1];
return nullopt;
}
}
} // namespace durability

View File

@ -32,6 +32,11 @@ std::experimental::filesystem::path MakeSnapshotPath(
const std::experimental::filesystem::path &durability_dir, int worker_id,
tx::transaction_id_t tx_id);
/// Returns the transaction id contained in the file name. If the filename is
/// not a parseable WAL file name, nullopt is returned.
std::experimental::optional<tx::transaction_id_t>
TransactionIdFromSnapshotFilename(const std::string &name);
/// Generates a file path for a write-ahead log file of a specified worker. If
/// given a transaction ID the file name will contain it. Otherwise the file
/// path is for the "current" WAL file for which the max tx id is still unknown.

View File

@ -37,6 +37,7 @@ using communication::bolt::DecodedValue;
// snapshot and WAL recovery functions.
struct RecoveryData {
tx::transaction_id_t snapshooter_tx_id{0};
tx::transaction_id_t wal_max_recovered_tx_id{0};
std::vector<tx::transaction_id_t> snapshooter_tx_snapshot;
// A collection into which the indexes should be added so they
// can be rebuilt at the end of the recovery transaction.
@ -303,6 +304,7 @@ bool RecoverWal(const fs::path &wal_dir, database::GraphDb &db,
break;
case database::StateDelta::Type::TRANSACTION_COMMIT:
get_accessor(delta->transaction_id).Commit();
recovery_data.wal_max_recovered_tx_id = delta->transaction_id;
accessors.erase(accessors.find(delta->transaction_id));
break;
case database::StateDelta::Type::BUILD_INDEX:
@ -326,7 +328,9 @@ bool RecoverWal(const fs::path &wal_dir, database::GraphDb &db,
}
} // anonymous namespace
bool Recover(const fs::path &durability_dir, database::GraphDb &db) {
RecoveryInfo Recover(
const fs::path &durability_dir, database::GraphDb &db,
std::experimental::optional<RecoveryInfo> required_recovery_info) {
RecoveryData recovery_data;
// Attempt to recover from snapshot files in reverse order (from newest
@ -338,6 +342,17 @@ bool Recover(const fs::path &durability_dir, database::GraphDb &db) {
snapshot_files.emplace_back(file);
std::sort(snapshot_files.rbegin(), snapshot_files.rend());
for (auto &snapshot_file : snapshot_files) {
if (required_recovery_info) {
auto snapshot_file_tx_id =
TransactionIdFromSnapshotFilename(snapshot_file);
if (!snapshot_file_tx_id || snapshot_file_tx_id.value() !=
required_recovery_info->snapshot_tx_id) {
LOG(INFO) << "Skipping snapshot file '" << snapshot_file
<< "' because it does not match the required snapshot tx id: "
<< required_recovery_info->snapshot_tx_id;
continue;
}
}
LOG(INFO) << "Starting snapshot recovery from: " << snapshot_file;
if (!RecoverSnapshot(snapshot_file, db, recovery_data)) {
recovery_data.Clear();
@ -349,6 +364,12 @@ bool Recover(const fs::path &durability_dir, database::GraphDb &db) {
}
}
// If snapshot recovery is required, and we failed, don't even deal with the
// WAL recovery.
if (required_recovery_info &&
recovery_data.snapshooter_tx_id != required_recovery_info->snapshot_tx_id)
return {recovery_data.snapshooter_tx_id, 0};
// Write-ahead-log recovery.
// WAL recovery does not have to be complete for the recovery to be
// considered successful. For the time being ignore the return value,
@ -366,6 +387,8 @@ bool Recover(const fs::path &durability_dir, database::GraphDb &db) {
db_accessor_indices.EnableIndex(key);
}
db_accessor_indices.Commit();
return true;
return {recovery_data.snapshooter_tx_id,
recovery_data.wal_max_recovered_tx_id};
}
} // namespace durability

View File

@ -1,16 +1,43 @@
#pragma once
#include <experimental/filesystem>
#include <experimental/optional>
#include <unordered_map>
#include "database/graph_db.hpp"
#include "durability/hashed_file_reader.hpp"
#include "storage/vertex_accessor.hpp"
#include "transactions/type.hpp"
namespace fs = std::experimental::filesystem;
namespace durability {
/// Stores info on what was (or needs to be) recovered from durability.
struct RecoveryInfo {
RecoveryInfo() {}
RecoveryInfo(tx::transaction_id_t snapshot_tx_id,
tx::transaction_id_t max_wal_tx_id)
: snapshot_tx_id(snapshot_tx_id), max_wal_tx_id(max_wal_tx_id) {}
tx::transaction_id_t snapshot_tx_id;
tx::transaction_id_t max_wal_tx_id;
bool operator==(const RecoveryInfo &other) const {
return snapshot_tx_id == other.snapshot_tx_id &&
max_wal_tx_id == other.max_wal_tx_id;
}
bool operator!=(const RecoveryInfo &other) const { return !(*this == other); }
private:
friend class boost::serialization::access;
template <class TArchive>
void serialize(TArchive &ar, unsigned int) {
ar &snapshot_tx_id;
ar &max_wal_tx_id;
}
};
/** Reads snapshot metadata from the end of the file without messing up the
* hash. */
bool ReadSnapshotSummary(HashedFileReader &buffer, int64_t &vertex_count,
@ -23,8 +50,14 @@ bool ReadSnapshotSummary(HashedFileReader &buffer, int64_t &vertex_count,
*
* @param durability_dir - Path to durability directory.
* @param db - The database to recover into.
* @return - If recovery was succesful.
* @param required_recovery_info - Only used on distributed worker. Indicates
* what the master recovered. The same transactions must be recovered on the
* worker.
* @return - recovery info
*/
bool Recover(const std::experimental::filesystem::path &durability_dir,
database::GraphDb &db);
RecoveryInfo Recover(
const std::experimental::filesystem::path &durability_dir,
database::GraphDb &db,
std::experimental::optional<RecoveryInfo> required_recovery_info);
} // namespace durability

View File

@ -130,10 +130,10 @@ class DistributedGraphDbTest : public ::testing::Test {
return std::distance(edges.begin(), edges.end());
};
fs::path tmp_dir_ = fs::temp_directory_path() /
("MG_test_unit_durability" + std::to_string(getpid()));
private:
std::unique_ptr<database::Master> master_;
std::vector<std::unique_ptr<WorkerInThread>> workers_;
fs::path tmp_dir_ = fs::temp_directory_path() /
("MG_test_unit_durability" + std::to_string(getpid()));
};

View File

@ -71,6 +71,7 @@ TEST(Distributed, Coordination) {
std::vector<std::unique_ptr<WorkerCoordinationInThread>> workers;
{
MasterCoordination master_coord(master_server.endpoint());
master_coord.SetRecoveryInfo(std::experimental::nullopt);
RpcWorkerClients rpc_worker_clients(master_coord);
ClusterDiscoveryMaster master_discovery_(master_server, master_coord,
rpc_worker_clients);
@ -100,6 +101,7 @@ TEST(Distributed, DesiredAndUniqueId) {
std::vector<std::unique_ptr<WorkerCoordinationInThread>> workers;
{
MasterCoordination master_coord(master_server.endpoint());
master_coord.SetRecoveryInfo(std::experimental::nullopt);
RpcWorkerClients rpc_worker_clients(master_coord);
ClusterDiscoveryMaster master_discovery_(master_server, master_coord,
rpc_worker_clients);
@ -122,6 +124,7 @@ TEST(Distributed, CoordinationWorkersId) {
std::vector<std::unique_ptr<WorkerCoordinationInThread>> workers;
{
MasterCoordination master_coord(master_server.endpoint());
master_coord.SetRecoveryInfo(std::experimental::nullopt);
RpcWorkerClients rpc_worker_clients(master_coord);
ClusterDiscoveryMaster master_discovery_(master_server, master_coord,
rpc_worker_clients);
@ -147,6 +150,7 @@ TEST(Distributed, ClusterDiscovery) {
std::vector<std::unique_ptr<WorkerCoordinationInThread>> workers;
{
MasterCoordination master_coord(master_server.endpoint());
master_coord.SetRecoveryInfo(std::experimental::nullopt);
RpcWorkerClients rpc_worker_clients(master_coord);
ClusterDiscoveryMaster master_discovery_(master_server, master_coord,
rpc_worker_clients);

View File

@ -1,35 +1,46 @@
#include "distributed_common.hpp"
#include "database/graph_db_accessor.hpp"
#include "durability/snapshooter.hpp"
class DistributedDurability : public DistributedGraphDbTest {
public:
void write_labels() {
add_label(master(), "master");
add_label(worker(1), "worker1");
add_label(worker(2), "worker2");
void AddVertices() {
AddVertex(master(), "master");
AddVertex(worker(1), "worker1");
AddVertex(worker(2), "worker2");
}
void check_labels() {
check_label(master(), "master");
check_label(worker(1), "worker1");
check_label(worker(2), "worker2");
void CheckVertices(int expected_count) {
CheckVertex(master(), expected_count, "master");
CheckVertex(worker(1), expected_count, "worker1");
CheckVertex(worker(2), expected_count, "worker2");
}
void RestartWithRecovery() {
ShutDown();
Initialize([](database::Config config) {
config.db_recover_on_startup = true;
return config;
});
}
private:
void add_label(database::GraphDb &db, const std::string &label) {
void AddVertex(database::GraphDb &db, const std::string &label) {
database::GraphDbAccessor dba(db);
auto vertex = dba.InsertVertex();
vertex.add_label(dba.Label(label));
dba.Commit();
}
void check_label(database::GraphDb &db, const std::string &label) {
void CheckVertex(database::GraphDb &db, int expected_count,
const std::string &label) {
database::GraphDbAccessor dba(db);
auto it = dba.Vertices(false);
ASSERT_NE(it.begin(), it.end());
auto vertex = *it.begin();
ASSERT_EQ(vertex.labels().size(), 1);
EXPECT_EQ(vertex.labels()[0], dba.Label(label));
std::vector<VertexAccessor> vertices{it.begin(), it.end()};
EXPECT_EQ(vertices.size(), expected_count);
for (auto &vertex : vertices) {
ASSERT_EQ(vertex.labels().size(), 1);
EXPECT_EQ(vertex.labels()[0], dba.Label(label));
}
}
};
@ -37,18 +48,14 @@ TEST_F(DistributedDurability, MakeSnapshot) {
// Create a graph with 3 nodes with 3 labels, one on each and make a snapshot
// of it
{
write_labels();
AddVertices();
database::GraphDbAccessor dba(master());
master().MakeSnapshot(dba);
}
// Recover the graph and check if it's the same as before
{
ShutDown();
Initialize([](database::Config config) {
config.db_recover_on_startup = true;
return config;
});
check_labels();
RestartWithRecovery();
CheckVertices(1);
}
}
@ -59,17 +66,52 @@ TEST_F(DistributedDurability, SnapshotOnExit) {
config.snapshot_on_exit = true;
return config;
});
write_labels();
AddVertices();
}
// Recover the graph and check if it's the same as before
{
// This should force the db to make a snapshot
ShutDown();
Initialize([](database::Config config) {
config.db_recover_on_startup = true;
return config;
});
check_labels();
RestartWithRecovery();
CheckVertices(1);
}
}
TEST_F(DistributedDurability, RecoveryFromSameSnapshot) {
{
AddVertices();
// Make snapshot on one worker, expect it won't recover from that.
database::GraphDbAccessor dba(worker(1));
worker(1).MakeSnapshot(dba);
}
{
RestartWithRecovery();
CheckVertices(0);
AddVertices();
database::GraphDbAccessor dba(master());
master().MakeSnapshot(dba);
}
{
RestartWithRecovery();
CheckVertices(1);
AddVertices();
CheckVertices(2);
// Make snapshot on one worker, expect it won't recover from that.
database::GraphDbAccessor dba(worker(1));
worker(1).MakeSnapshot(dba);
}
{
RestartWithRecovery();
CheckVertices(1);
}
}
TEST_F(DistributedDurability, RecoveryFailure) {
{
AddVertices();
// Make a snapshot on the master without the right snapshots on workers.
database::GraphDbAccessor dba(master());
bool status = durability::MakeSnapshot(master(), dba, tmp_dir_, 100);
ASSERT_TRUE(status);
}
::testing::FLAGS_gtest_death_test_style = "threadsafe";
EXPECT_DEATH(RestartWithRecovery(), "worker failed to recover");
}

View File

@ -32,6 +32,7 @@ class RpcWorkerClientsTest : public ::testing::Test {
const io::network::Endpoint kLocalHost{"127.0.0.1", 0};
const int kWorkerCount = 2;
void SetUp() override {
master_coord_->SetRecoveryInfo(std::experimental::nullopt);
for (int i = 1; i <= kWorkerCount; ++i) {
workers_server_.emplace_back(
std::make_unique<communication::rpc::Server>(kLocalHost));

View File

@ -1,3 +1,4 @@
#include <experimental/optional>
#include <string>
#include "gflags/gflags.h"
@ -20,7 +21,7 @@ class RecoveryTest : public ::testing::Test {
protected:
void SetUp() override {
std::string durability_dir(FLAGS_durability_dir);
durability::Recover(durability_dir, db_);
durability::Recover(durability_dir, db_, std::experimental::nullopt);
}
database::SingleNode db_;