From dba81f223cbbab8e7923ec7d33897716c3e82c39 Mon Sep 17 00:00:00 2001 From: Dominik Gleich Date: Mon, 16 Apr 2018 10:43:16 +0200 Subject: [PATCH] Ensure workers recover appropriate snapshot Reviewers: msantl Reviewed By: msantl Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D1355 --- src/communication/rpc/messages-inl.hpp | 2 + src/database/graph_db.cpp | 44 +++++++- src/database/storage.hpp | 12 ++- src/distributed/cluster_discovery_master.cpp | 3 +- src/distributed/cluster_discovery_worker.cpp | 1 + src/distributed/cluster_discovery_worker.hpp | 7 ++ src/distributed/coordination_master.cpp | 24 +++++ src/distributed/coordination_master.hpp | 15 +++ src/distributed/coordination_rpc_messages.hpp | 14 ++- src/durability/paths.cpp | 31 ++++-- src/durability/paths.hpp | 5 + src/durability/recovery.cpp | 27 ++++- src/durability/recovery.hpp | 39 ++++++- tests/unit/distributed_common.hpp | 6 +- tests/unit/distributed_coordination.cpp | 4 + tests/unit/distributed_durability.cpp | 102 ++++++++++++------ tests/unit/rpc_worker_clients.cpp | 1 + tools/tests/mg_recovery_check.cpp | 3 +- 18 files changed, 282 insertions(+), 58 deletions(-) diff --git a/src/communication/rpc/messages-inl.hpp b/src/communication/rpc/messages-inl.hpp index c314d7bf2..757009313 100644 --- a/src/communication/rpc/messages-inl.hpp +++ b/src/communication/rpc/messages-inl.hpp @@ -14,6 +14,7 @@ #include "distributed/storage_gc_rpc_messages.hpp" #include "distributed/transactional_cache_cleaner_rpc_messages.hpp" #include "distributed/updates_rpc_messages.hpp" +#include "durability/recovery.hpp" #include "stats/stats_rpc_messages.hpp" #include "storage/concurrent_id_mapper_rpc_messages.hpp" #include "transactions/engine_rpc_messages.hpp" @@ -54,6 +55,7 @@ BOOST_CLASS_EXPORT(tx::GlobalLastReq); BOOST_CLASS_EXPORT(tx::GlobalLastRes); // Distributed coordination. +BOOST_CLASS_EXPORT(durability::RecoveryInfo); BOOST_CLASS_EXPORT(distributed::RegisterWorkerReq); BOOST_CLASS_EXPORT(distributed::RegisterWorkerRes); BOOST_CLASS_EXPORT(distributed::ClusterDiscoveryReq); diff --git a/src/database/graph_db.cpp b/src/database/graph_db.cpp index 234dcd9ca..20f5dd149 100644 --- a/src/database/graph_db.cpp +++ b/src/database/graph_db.cpp @@ -1,3 +1,5 @@ +#include + #include "glog/logging.h" #include "communication/rpc/server.hpp" @@ -269,9 +271,45 @@ PublicBase::PublicBase(std::unique_ptr impl) if (impl_->config_.durability_enabled) durability::CheckDurabilityDir(impl_->config_.durability_directory); - // Recovery on startup. - if (impl_->config_.db_recover_on_startup) - durability::Recover(impl_->config_.durability_directory, *impl_); + // Durability recovery. + { + auto db_type = impl_->type(); + + // What we should recover. + std::experimental::optional + required_recovery_info; + if (db_type == Type::DISTRIBUTED_WORKER) { + required_recovery_info = dynamic_cast(impl_.get()) + ->cluster_discovery_.recovery_info(); + } + + // What we recover. + std::experimental::optional recovery_info; + + // Recover only if necessary. + if ((db_type != Type::DISTRIBUTED_WORKER && + impl_->config_.db_recover_on_startup) || + (db_type == Type::DISTRIBUTED_WORKER && required_recovery_info)) { + recovery_info = durability::Recover(impl_->config_.durability_directory, + *impl_, required_recovery_info); + } + + // Post-recovery setup and checking. + switch (db_type) { + case Type::DISTRIBUTED_MASTER: + dynamic_cast(impl_.get()) + ->coordination_.SetRecoveryInfo(recovery_info); + break; + case Type::DISTRIBUTED_WORKER: + if (required_recovery_info != recovery_info) + LOG(FATAL) << "Memgraph worker failed to recover the database state " + "recovered on the master"; + break; + case Type::SINGLE_NODE: + break; + } + } + if (impl_->config_.durability_enabled) { impl_->wal().Enable(); } diff --git a/src/database/storage.hpp b/src/database/storage.hpp index 8a47b4498..2407fa391 100644 --- a/src/database/storage.hpp +++ b/src/database/storage.hpp @@ -1,6 +1,7 @@ #pragma once #include +#include #include "data_structures/concurrent/concurrent_map.hpp" #include "data_structures/concurrent/skiplist.hpp" @@ -11,6 +12,7 @@ #include "storage/edge.hpp" #include "storage/types.hpp" #include "storage/vertex.hpp" +#include "transactions/type.hpp" namespace distributed { class IndexRpcServer; @@ -21,7 +23,10 @@ class GraphDb; }; namespace durability { -bool Recover(const std::experimental::filesystem::path &, database::GraphDb &); +struct RecoveryInfo; +RecoveryInfo Recover(const std::experimental::filesystem::path &, + database::GraphDb &, + std::experimental::optional); }; namespace database { @@ -89,8 +94,9 @@ class Storage { friend class GraphDbAccessor; friend class StorageGc; friend class distributed::IndexRpcServer; - friend bool durability::Recover(const std::experimental::filesystem::path &, - database::GraphDb &); + friend durability::RecoveryInfo durability::Recover( + const std::experimental::filesystem::path &, database::GraphDb &, + std::experimental::optional); int worker_id_; gid::Generator vertex_generator_; diff --git a/src/distributed/cluster_discovery_master.cpp b/src/distributed/cluster_discovery_master.cpp index 63b2e6acf..d064bbd63 100644 --- a/src/distributed/cluster_discovery_master.cpp +++ b/src/distributed/cluster_discovery_master.cpp @@ -25,7 +25,8 @@ ClusterDiscoveryMaster::ClusterDiscoveryMaster( } return std::make_unique( - registration_successful, this->coordination_.GetWorkers()); + registration_successful, this->coordination_.RecoveryInfo(), + this->coordination_.GetWorkers()); }); } diff --git a/src/distributed/cluster_discovery_worker.cpp b/src/distributed/cluster_discovery_worker.cpp index 1636b1e28..3de166cbb 100644 --- a/src/distributed/cluster_discovery_worker.cpp +++ b/src/distributed/cluster_discovery_worker.cpp @@ -24,6 +24,7 @@ void ClusterDiscoveryWorker::RegisterWorker(int worker_id) { for (auto &kv : result->workers) { coordination_.RegisterWorker(kv.first, kv.second); } + recovery_info_ = result->recovery_info; } } // namespace distributed diff --git a/src/distributed/cluster_discovery_worker.hpp b/src/distributed/cluster_discovery_worker.hpp index 723521925..186cf08cb 100644 --- a/src/distributed/cluster_discovery_worker.hpp +++ b/src/distributed/cluster_discovery_worker.hpp @@ -1,8 +1,11 @@ #pragma once +#include + #include "communication/rpc/client_pool.hpp" #include "communication/rpc/server.hpp" #include "distributed/coordination_worker.hpp" +#include "durability/recovery.hpp" namespace distributed { using Server = communication::rpc::Server; @@ -27,10 +30,14 @@ class ClusterDiscoveryWorker final { */ void RegisterWorker(int worker_id); + /** Returns the recovery info. Valid only after registration. */ + auto recovery_info() const { return recovery_info_; } + private: Server &server_; WorkerCoordination &coordination_; communication::rpc::ClientPool &client_pool_; + std::experimental::optional recovery_info_; }; } // namespace distributed diff --git a/src/distributed/coordination_master.cpp b/src/distributed/coordination_master.cpp index b4008f362..67c3c8869 100644 --- a/src/distributed/coordination_master.cpp +++ b/src/distributed/coordination_master.cpp @@ -15,6 +15,16 @@ MasterCoordination::MasterCoordination(const Endpoint &master_endpoint) bool MasterCoordination::RegisterWorker(int desired_worker_id, Endpoint endpoint) { + // Worker's can't register before the recovery phase on the master is done to + // ensure the whole cluster is in a consistent state. + while (true) { + { + std::lock_guard guard(lock_); + if (recovery_done_) break; + } + std::this_thread::sleep_for(std::chrono::milliseconds(200)); + } + std::lock_guard guard(lock_); auto workers = GetWorkers(); // Check if the desired worker id already exists. @@ -56,4 +66,18 @@ MasterCoordination::~MasterCoordination() { } } +void MasterCoordination::SetRecoveryInfo( + std::experimental::optional info) { + std::lock_guard guard(lock_); + recovery_done_ = true; + recovery_info_ = info; +} + +std::experimental::optional +MasterCoordination::RecoveryInfo() const { + std::lock_guard guard(lock_); + CHECK(recovery_done_) << "RecoveryInfo requested before it's available"; + return recovery_info_; +} + } // namespace distributed diff --git a/src/distributed/coordination_master.hpp b/src/distributed/coordination_master.hpp index d57d8ba02..a42194180 100644 --- a/src/distributed/coordination_master.hpp +++ b/src/distributed/coordination_master.hpp @@ -1,9 +1,11 @@ #pragma once +#include #include #include #include "distributed/coordination.hpp" +#include "durability/recovery.hpp" #include "io/network/endpoint.hpp" namespace distributed { @@ -28,8 +30,21 @@ class MasterCoordination final : public Coordination { Endpoint GetEndpoint(int worker_id); + /// Sets the recovery info. nullopt indicates nothing was recovered. + void SetRecoveryInfo( + std::experimental::optional info); + + std::experimental::optional RecoveryInfo() const; + private: // Most master functions aren't thread-safe. mutable std::mutex lock_; + + /// Durabiliry recovery info. + /// Indicates if the recovery phase is done. + bool recovery_done_{false}; + /// If nullopt nothing was recovered. + std::experimental::optional recovery_info_; }; + } // namespace distributed diff --git a/src/distributed/coordination_rpc_messages.hpp b/src/distributed/coordination_rpc_messages.hpp index 5b992f764..756114b1b 100644 --- a/src/distributed/coordination_rpc_messages.hpp +++ b/src/distributed/coordination_rpc_messages.hpp @@ -1,5 +1,6 @@ #pragma once +#include #include #include "boost/serialization/access.hpp" @@ -7,6 +8,7 @@ #include "boost/serialization/unordered_map.hpp" #include "communication/rpc/messages.hpp" +#include "durability/recovery.hpp" #include "io/network/endpoint.hpp" namespace distributed { @@ -34,11 +36,16 @@ struct RegisterWorkerReq : public Message { }; struct RegisterWorkerRes : public Message { - RegisterWorkerRes(bool registration_successful, - const std::unordered_map &workers) - : registration_successful(registration_successful), workers(workers) {} + RegisterWorkerRes( + bool registration_successful, + std::experimental::optional recovery_info, + std::unordered_map workers) + : registration_successful(registration_successful), + recovery_info(recovery_info), + workers(std::move(workers)) {} bool registration_successful; + std::experimental::optional recovery_info; std::unordered_map workers; private: @@ -49,6 +56,7 @@ struct RegisterWorkerRes : public Message { void serialize(TArchive &ar, unsigned int) { ar &boost::serialization::base_object(*this); ar ®istration_successful; + ar &recovery_info; ar &workers; } }; diff --git a/src/durability/paths.cpp b/src/durability/paths.cpp index 1c98ddd88..3f47ac2ce 100644 --- a/src/durability/paths.cpp +++ b/src/durability/paths.cpp @@ -12,7 +12,7 @@ namespace durability { namespace fs = std::experimental::filesystem; -/// Returns true if the given directory path exists or is succesfully created. + bool EnsureDir(const fs::path &dir) { std::error_code error_code; // Just for exception suppression. auto result = fs::create_directories(dir, error_code); @@ -21,8 +21,6 @@ bool EnsureDir(const fs::path &dir) { return result || !error_code.value(); } -/// Ensures the given durability directory exists and is ready for use. Creates -/// the directory if it doesn't exist. void CheckDurabilityDir(const std::string &durability_dir) { namespace fs = std::experimental::filesystem; if (fs::exists(durability_dir)) { @@ -36,12 +34,6 @@ void CheckDurabilityDir(const std::string &durability_dir) { } } -/// Returns the transaction id contained in the file name. If the filename is -/// not a parseable WAL file name, nullopt is returned. If the filename -/// represents the "current" WAL file, then the maximum possible transaction ID -/// is returned because that's appropriate for the recovery logic (the current -/// WAL does not yet have a maximum transaction ID and can't be discarded by -/// the recovery regardless of the snapshot from which the transaction starts). std::experimental::optional TransactionIdFromWalFilename( const std::string &name) { auto nullopt = std::experimental::nullopt; @@ -96,4 +88,25 @@ fs::path WalFilenameForTransactionId( file_name = file_name + "_Worker_" + std::to_string(worker_id); return wal_dir / file_name; } + +std::experimental::optional +TransactionIdFromSnapshotFilename(const std::string &name) { + auto nullopt = std::experimental::nullopt; + auto file_name_split = utils::RSplit(name, "_tx_", 1); + if (file_name_split.size() != 2) { + LOG(WARNING) << "Unable to parse snapshot file name: " << name; + return nullopt; + } + try { + return std::stoll(file_name_split[1]); + } catch (std::invalid_argument &) { + LOG(WARNING) << "Unable to parse snapshot file name tx ID: " + << file_name_split[1]; + return nullopt; + } catch (std::out_of_range &) { + LOG(WARNING) << "Unable to parse snapshot file name tx ID: " + << file_name_split[1]; + return nullopt; + } +} } // namespace durability diff --git a/src/durability/paths.hpp b/src/durability/paths.hpp index 7716016f0..2c9967d54 100644 --- a/src/durability/paths.hpp +++ b/src/durability/paths.hpp @@ -32,6 +32,11 @@ std::experimental::filesystem::path MakeSnapshotPath( const std::experimental::filesystem::path &durability_dir, int worker_id, tx::transaction_id_t tx_id); +/// Returns the transaction id contained in the file name. If the filename is +/// not a parseable WAL file name, nullopt is returned. +std::experimental::optional +TransactionIdFromSnapshotFilename(const std::string &name); + /// Generates a file path for a write-ahead log file of a specified worker. If /// given a transaction ID the file name will contain it. Otherwise the file /// path is for the "current" WAL file for which the max tx id is still unknown. diff --git a/src/durability/recovery.cpp b/src/durability/recovery.cpp index 592c3716c..cea21a855 100644 --- a/src/durability/recovery.cpp +++ b/src/durability/recovery.cpp @@ -37,6 +37,7 @@ using communication::bolt::DecodedValue; // snapshot and WAL recovery functions. struct RecoveryData { tx::transaction_id_t snapshooter_tx_id{0}; + tx::transaction_id_t wal_max_recovered_tx_id{0}; std::vector snapshooter_tx_snapshot; // A collection into which the indexes should be added so they // can be rebuilt at the end of the recovery transaction. @@ -303,6 +304,7 @@ bool RecoverWal(const fs::path &wal_dir, database::GraphDb &db, break; case database::StateDelta::Type::TRANSACTION_COMMIT: get_accessor(delta->transaction_id).Commit(); + recovery_data.wal_max_recovered_tx_id = delta->transaction_id; accessors.erase(accessors.find(delta->transaction_id)); break; case database::StateDelta::Type::BUILD_INDEX: @@ -326,7 +328,9 @@ bool RecoverWal(const fs::path &wal_dir, database::GraphDb &db, } } // anonymous namespace -bool Recover(const fs::path &durability_dir, database::GraphDb &db) { +RecoveryInfo Recover( + const fs::path &durability_dir, database::GraphDb &db, + std::experimental::optional required_recovery_info) { RecoveryData recovery_data; // Attempt to recover from snapshot files in reverse order (from newest @@ -338,6 +342,17 @@ bool Recover(const fs::path &durability_dir, database::GraphDb &db) { snapshot_files.emplace_back(file); std::sort(snapshot_files.rbegin(), snapshot_files.rend()); for (auto &snapshot_file : snapshot_files) { + if (required_recovery_info) { + auto snapshot_file_tx_id = + TransactionIdFromSnapshotFilename(snapshot_file); + if (!snapshot_file_tx_id || snapshot_file_tx_id.value() != + required_recovery_info->snapshot_tx_id) { + LOG(INFO) << "Skipping snapshot file '" << snapshot_file + << "' because it does not match the required snapshot tx id: " + << required_recovery_info->snapshot_tx_id; + continue; + } + } LOG(INFO) << "Starting snapshot recovery from: " << snapshot_file; if (!RecoverSnapshot(snapshot_file, db, recovery_data)) { recovery_data.Clear(); @@ -349,6 +364,12 @@ bool Recover(const fs::path &durability_dir, database::GraphDb &db) { } } + // If snapshot recovery is required, and we failed, don't even deal with the + // WAL recovery. + if (required_recovery_info && + recovery_data.snapshooter_tx_id != required_recovery_info->snapshot_tx_id) + return {recovery_data.snapshooter_tx_id, 0}; + // Write-ahead-log recovery. // WAL recovery does not have to be complete for the recovery to be // considered successful. For the time being ignore the return value, @@ -366,6 +387,8 @@ bool Recover(const fs::path &durability_dir, database::GraphDb &db) { db_accessor_indices.EnableIndex(key); } db_accessor_indices.Commit(); - return true; + + return {recovery_data.snapshooter_tx_id, + recovery_data.wal_max_recovered_tx_id}; } } // namespace durability diff --git a/src/durability/recovery.hpp b/src/durability/recovery.hpp index a8fbcc565..095db517d 100644 --- a/src/durability/recovery.hpp +++ b/src/durability/recovery.hpp @@ -1,16 +1,43 @@ #pragma once #include +#include #include #include "database/graph_db.hpp" #include "durability/hashed_file_reader.hpp" #include "storage/vertex_accessor.hpp" +#include "transactions/type.hpp" namespace fs = std::experimental::filesystem; namespace durability { +/// Stores info on what was (or needs to be) recovered from durability. +struct RecoveryInfo { + RecoveryInfo() {} + RecoveryInfo(tx::transaction_id_t snapshot_tx_id, + tx::transaction_id_t max_wal_tx_id) + : snapshot_tx_id(snapshot_tx_id), max_wal_tx_id(max_wal_tx_id) {} + tx::transaction_id_t snapshot_tx_id; + tx::transaction_id_t max_wal_tx_id; + + bool operator==(const RecoveryInfo &other) const { + return snapshot_tx_id == other.snapshot_tx_id && + max_wal_tx_id == other.max_wal_tx_id; + } + bool operator!=(const RecoveryInfo &other) const { return !(*this == other); } + + private: + friend class boost::serialization::access; + + template + void serialize(TArchive &ar, unsigned int) { + ar &snapshot_tx_id; + ar &max_wal_tx_id; + } +}; + /** Reads snapshot metadata from the end of the file without messing up the * hash. */ bool ReadSnapshotSummary(HashedFileReader &buffer, int64_t &vertex_count, @@ -23,8 +50,14 @@ bool ReadSnapshotSummary(HashedFileReader &buffer, int64_t &vertex_count, * * @param durability_dir - Path to durability directory. * @param db - The database to recover into. - * @return - If recovery was succesful. + * @param required_recovery_info - Only used on distributed worker. Indicates + * what the master recovered. The same transactions must be recovered on the + * worker. + * @return - recovery info */ -bool Recover(const std::experimental::filesystem::path &durability_dir, - database::GraphDb &db); +RecoveryInfo Recover( + const std::experimental::filesystem::path &durability_dir, + database::GraphDb &db, + std::experimental::optional required_recovery_info); + } // namespace durability diff --git a/tests/unit/distributed_common.hpp b/tests/unit/distributed_common.hpp index 26e1a9409..0101fc1fa 100644 --- a/tests/unit/distributed_common.hpp +++ b/tests/unit/distributed_common.hpp @@ -130,10 +130,10 @@ class DistributedGraphDbTest : public ::testing::Test { return std::distance(edges.begin(), edges.end()); }; + fs::path tmp_dir_ = fs::temp_directory_path() / + ("MG_test_unit_durability" + std::to_string(getpid())); + private: std::unique_ptr master_; std::vector> workers_; - - fs::path tmp_dir_ = fs::temp_directory_path() / - ("MG_test_unit_durability" + std::to_string(getpid())); }; diff --git a/tests/unit/distributed_coordination.cpp b/tests/unit/distributed_coordination.cpp index 04ce9cfd7..396100e08 100644 --- a/tests/unit/distributed_coordination.cpp +++ b/tests/unit/distributed_coordination.cpp @@ -71,6 +71,7 @@ TEST(Distributed, Coordination) { std::vector> workers; { MasterCoordination master_coord(master_server.endpoint()); + master_coord.SetRecoveryInfo(std::experimental::nullopt); RpcWorkerClients rpc_worker_clients(master_coord); ClusterDiscoveryMaster master_discovery_(master_server, master_coord, rpc_worker_clients); @@ -100,6 +101,7 @@ TEST(Distributed, DesiredAndUniqueId) { std::vector> workers; { MasterCoordination master_coord(master_server.endpoint()); + master_coord.SetRecoveryInfo(std::experimental::nullopt); RpcWorkerClients rpc_worker_clients(master_coord); ClusterDiscoveryMaster master_discovery_(master_server, master_coord, rpc_worker_clients); @@ -122,6 +124,7 @@ TEST(Distributed, CoordinationWorkersId) { std::vector> workers; { MasterCoordination master_coord(master_server.endpoint()); + master_coord.SetRecoveryInfo(std::experimental::nullopt); RpcWorkerClients rpc_worker_clients(master_coord); ClusterDiscoveryMaster master_discovery_(master_server, master_coord, rpc_worker_clients); @@ -147,6 +150,7 @@ TEST(Distributed, ClusterDiscovery) { std::vector> workers; { MasterCoordination master_coord(master_server.endpoint()); + master_coord.SetRecoveryInfo(std::experimental::nullopt); RpcWorkerClients rpc_worker_clients(master_coord); ClusterDiscoveryMaster master_discovery_(master_server, master_coord, rpc_worker_clients); diff --git a/tests/unit/distributed_durability.cpp b/tests/unit/distributed_durability.cpp index 62518113a..b422c5f18 100644 --- a/tests/unit/distributed_durability.cpp +++ b/tests/unit/distributed_durability.cpp @@ -1,35 +1,46 @@ #include "distributed_common.hpp" #include "database/graph_db_accessor.hpp" +#include "durability/snapshooter.hpp" class DistributedDurability : public DistributedGraphDbTest { public: - void write_labels() { - add_label(master(), "master"); - add_label(worker(1), "worker1"); - add_label(worker(2), "worker2"); + void AddVertices() { + AddVertex(master(), "master"); + AddVertex(worker(1), "worker1"); + AddVertex(worker(2), "worker2"); } - void check_labels() { - check_label(master(), "master"); - check_label(worker(1), "worker1"); - check_label(worker(2), "worker2"); + void CheckVertices(int expected_count) { + CheckVertex(master(), expected_count, "master"); + CheckVertex(worker(1), expected_count, "worker1"); + CheckVertex(worker(2), expected_count, "worker2"); + } + void RestartWithRecovery() { + ShutDown(); + Initialize([](database::Config config) { + config.db_recover_on_startup = true; + return config; + }); } private: - void add_label(database::GraphDb &db, const std::string &label) { + void AddVertex(database::GraphDb &db, const std::string &label) { database::GraphDbAccessor dba(db); auto vertex = dba.InsertVertex(); vertex.add_label(dba.Label(label)); dba.Commit(); } - void check_label(database::GraphDb &db, const std::string &label) { + void CheckVertex(database::GraphDb &db, int expected_count, + const std::string &label) { database::GraphDbAccessor dba(db); auto it = dba.Vertices(false); - ASSERT_NE(it.begin(), it.end()); - auto vertex = *it.begin(); - ASSERT_EQ(vertex.labels().size(), 1); - EXPECT_EQ(vertex.labels()[0], dba.Label(label)); + std::vector vertices{it.begin(), it.end()}; + EXPECT_EQ(vertices.size(), expected_count); + for (auto &vertex : vertices) { + ASSERT_EQ(vertex.labels().size(), 1); + EXPECT_EQ(vertex.labels()[0], dba.Label(label)); + } } }; @@ -37,18 +48,14 @@ TEST_F(DistributedDurability, MakeSnapshot) { // Create a graph with 3 nodes with 3 labels, one on each and make a snapshot // of it { - write_labels(); + AddVertices(); database::GraphDbAccessor dba(master()); master().MakeSnapshot(dba); } // Recover the graph and check if it's the same as before { - ShutDown(); - Initialize([](database::Config config) { - config.db_recover_on_startup = true; - return config; - }); - check_labels(); + RestartWithRecovery(); + CheckVertices(1); } } @@ -59,17 +66,52 @@ TEST_F(DistributedDurability, SnapshotOnExit) { config.snapshot_on_exit = true; return config; }); - write_labels(); + AddVertices(); } // Recover the graph and check if it's the same as before { - // This should force the db to make a snapshot - ShutDown(); - - Initialize([](database::Config config) { - config.db_recover_on_startup = true; - return config; - }); - check_labels(); + RestartWithRecovery(); + CheckVertices(1); } } + +TEST_F(DistributedDurability, RecoveryFromSameSnapshot) { + { + AddVertices(); + // Make snapshot on one worker, expect it won't recover from that. + database::GraphDbAccessor dba(worker(1)); + worker(1).MakeSnapshot(dba); + } + { + RestartWithRecovery(); + CheckVertices(0); + AddVertices(); + database::GraphDbAccessor dba(master()); + master().MakeSnapshot(dba); + } + { + RestartWithRecovery(); + CheckVertices(1); + AddVertices(); + CheckVertices(2); + // Make snapshot on one worker, expect it won't recover from that. + database::GraphDbAccessor dba(worker(1)); + worker(1).MakeSnapshot(dba); + } + { + RestartWithRecovery(); + CheckVertices(1); + } +} + +TEST_F(DistributedDurability, RecoveryFailure) { + { + AddVertices(); + // Make a snapshot on the master without the right snapshots on workers. + database::GraphDbAccessor dba(master()); + bool status = durability::MakeSnapshot(master(), dba, tmp_dir_, 100); + ASSERT_TRUE(status); + } + ::testing::FLAGS_gtest_death_test_style = "threadsafe"; + EXPECT_DEATH(RestartWithRecovery(), "worker failed to recover"); +} diff --git a/tests/unit/rpc_worker_clients.cpp b/tests/unit/rpc_worker_clients.cpp index c62c417c7..db4ba1cb7 100644 --- a/tests/unit/rpc_worker_clients.cpp +++ b/tests/unit/rpc_worker_clients.cpp @@ -32,6 +32,7 @@ class RpcWorkerClientsTest : public ::testing::Test { const io::network::Endpoint kLocalHost{"127.0.0.1", 0}; const int kWorkerCount = 2; void SetUp() override { + master_coord_->SetRecoveryInfo(std::experimental::nullopt); for (int i = 1; i <= kWorkerCount; ++i) { workers_server_.emplace_back( std::make_unique(kLocalHost)); diff --git a/tools/tests/mg_recovery_check.cpp b/tools/tests/mg_recovery_check.cpp index c23ca4bea..962f5e73d 100644 --- a/tools/tests/mg_recovery_check.cpp +++ b/tools/tests/mg_recovery_check.cpp @@ -1,3 +1,4 @@ +#include #include #include "gflags/gflags.h" @@ -20,7 +21,7 @@ class RecoveryTest : public ::testing::Test { protected: void SetUp() override { std::string durability_dir(FLAGS_durability_dir); - durability::Recover(durability_dir, db_); + durability::Recover(durability_dir, db_, std::experimental::nullopt); } database::SingleNode db_;