From 15b57e2d520428fa882e0beba3bf19c98098e1eb Mon Sep 17 00:00:00 2001 From: "matej.gradicek" <matej.gradicek@memgraph.io> Date: Tue, 6 Jun 2017 15:04:49 +0000 Subject: [PATCH] Implemented recovery and tests. Summary: Implemented durability recovery and tesats. Reviewers: mislav.bradac, dgleich, buda Reviewed By: mislav.bradac, dgleich, buda Subscribers: dtomicevic, mferencevic, pullbot Differential Revision: https://phabricator.memgraph.io/D374 --- CMakeLists.txt | 1 + config/memgraph.yaml | 9 +- src/config/config.hpp | 2 + src/database/graph_db.cpp | 46 ++++- src/database/graph_db.hpp | 22 ++- src/dbms/dbms.cpp | 5 +- src/dbms/dbms.hpp | 23 ++- src/durability/file_reader_buffer.hpp | 97 +++++++++++ src/durability/file_writer_buffer.hpp | 21 +-- src/durability/hasher.hpp | 37 +++++ src/durability/recovery.cpp | 67 ++++++++ src/durability/recovery.hpp | 33 ++++ src/durability/summary.hpp | 12 ++ tests/unit/dbms_recovery.cpp | 100 +++++++++++ tests/unit/recovery.cpp | 231 ++++++++++++++++++++++++++ tests/unit/snapshot.cpp | 12 ++ 16 files changed, 684 insertions(+), 34 deletions(-) create mode 100644 src/durability/file_reader_buffer.hpp create mode 100644 src/durability/hasher.hpp create mode 100644 src/durability/recovery.cpp create mode 100644 src/durability/recovery.hpp create mode 100644 src/durability/summary.hpp create mode 100644 tests/unit/dbms_recovery.cpp create mode 100644 tests/unit/recovery.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 2fd4e3bcc..9c24514ac 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -343,6 +343,7 @@ set(memgraph_src_files ${src_dir}/threading/thread.cpp ${src_dir}/mvcc/id.cpp ${src_dir}/durability/snapshooter.cpp + ${src_dir}/durability/recovery.cpp # ${src_dir}/snapshot/snapshot_engine.cpp # ${src_dir}/snapshot/snapshoter.cpp # ${src_dir}/snapshot/snapshot_encoder.cpp diff --git a/config/memgraph.yaml b/config/memgraph.yaml index 9aeeaf937..aceb19d42 100644 --- a/config/memgraph.yaml +++ b/config/memgraph.yaml @@ -19,7 +19,11 @@ snapshots_path: "snapshots" cleaning_cycle_sec: "30" # snapshot cycle interval -snapshot_cycle_sec: "60" +# if set to -1 the snapshooter will not run +snapshot_cycle_sec: "-1" + +# create snapshot disabled on db destruction +snapshot_db_destruction: false # max number of snapshots which will be kept on the disk at some point # if set to -1 the max number of snapshots is unlimited @@ -27,3 +31,6 @@ max_retained_snapshots: "-1" # by default query engine runs in interpret mode interpret: true + +# database recovering is disabled by default +recovery: false diff --git a/src/config/config.hpp b/src/config/config.hpp index 9178baa90..8571d9e2e 100644 --- a/src/config/config.hpp +++ b/src/config/config.hpp @@ -25,8 +25,10 @@ constexpr const char *TEMPLATE_CPP_PATH = "template_cpp_path"; constexpr const char *SNAPSHOTS_PATH = "snapshots_path"; constexpr const char *CLEANING_CYCLE_SEC = "cleaning_cycle_sec"; constexpr const char *SNAPSHOT_CYCLE_SEC = "snapshot_cycle_sec"; +constexpr const char *SNAPSHOT_DB_DESTRUCTION = "snapshot_db_destruction"; constexpr const char *MAX_RETAINED_SNAPSHOTS = "max_retained_snapshots"; constexpr const char *INTERPRET = "interpret"; +constexpr const char *RECOVERY = "recovery"; // -- all possible Memgraph's keys -- inline long long to_int(const std::string &s) { return stoll(s); } diff --git a/src/database/graph_db.cpp b/src/database/graph_db.cpp index 5681d146e..a61484f80 100644 --- a/src/database/graph_db.cpp +++ b/src/database/graph_db.cpp @@ -4,6 +4,7 @@ #include "database/creation_exception.hpp" #include "database/graph_db.hpp" #include "database/graph_db_accessor.hpp" +#include "durability/recovery.hpp" #include "logging/logger.hpp" #include "storage/edge.hpp" #include "storage/garbage_collector.hpp" @@ -13,7 +14,7 @@ const std::string DEFAULT_SNAPSHOT_FOLDER = "snapshots"; const int DEFAULT_MAX_RETAINED_SNAPSHOTS = -1; // unlimited number of snapshots const int DEFAULT_SNAPSHOT_CYCLE_SEC = -1; // off -GraphDb::GraphDb(const std::string &name, bool import_snapshot) +GraphDb::GraphDb(const std::string &name, const fs::path &snapshot_db_dir) : name_(name), gc_vertices_(vertices_, vertex_record_deleter_, vertex_version_list_deleter_), @@ -48,27 +49,33 @@ GraphDb::GraphDb(const std::string &name, bool import_snapshot) }); } - // Creating snapshoter + RecoverDatabase(snapshot_db_dir); + StartSnapshooting(); + +} + +void GraphDb::StartSnapshooting() { const std::string max_retained_snapshots_str = CONFIG(config::MAX_RETAINED_SNAPSHOTS); const std::string snapshot_cycle_sec_str = - CONFIG(config::MAX_RETAINED_SNAPSHOTS); + CONFIG(config::SNAPSHOT_CYCLE_SEC); const std::string snapshot_folder_str = CONFIG(config::SNAPSHOTS_PATH); - int max_retained_snapshots_ = DEFAULT_MAX_RETAINED_SNAPSHOTS; + max_retained_snapshots_ = DEFAULT_MAX_RETAINED_SNAPSHOTS; if (!max_retained_snapshots_str.empty()) max_retained_snapshots_ = CONFIG_INTEGER(config::MAX_RETAINED_SNAPSHOTS); - int snapshot_cycle_sec_ = DEFAULT_SNAPSHOT_CYCLE_SEC; + snapshot_cycle_sec_ = DEFAULT_SNAPSHOT_CYCLE_SEC; if (!snapshot_cycle_sec_str.empty()) snapshot_cycle_sec_ = CONFIG_INTEGER(config::SNAPSHOT_CYCLE_SEC); - std::string snapshot_folder_ = DEFAULT_SNAPSHOT_FOLDER; + snapshot_folder_ = DEFAULT_SNAPSHOT_FOLDER; if (!snapshot_folder_str.empty()) snapshot_folder_ = snapshot_folder_str; + snapshot_db_destruction_ = CONFIG_BOOL(config::SNAPSHOT_DB_DESTRUCTION); + if (snapshot_cycle_sec_ != -1) { - auto create_snapshot = [this, snapshot_folder_, - max_retained_snapshots_]() -> void { + auto create_snapshot = [this]() -> void { GraphDbAccessor db_accessor(*this); snapshooter_.MakeSnapshot(db_accessor, fs::path(snapshot_folder_) / name_, max_retained_snapshots_); @@ -78,6 +85,22 @@ GraphDb::GraphDb(const std::string &name, bool import_snapshot) } } +void GraphDb::RecoverDatabase(const fs::path &snapshot_db_dir) { + if (snapshot_db_dir.empty()) return; + std::vector<fs::path> snapshots; + for (auto &snapshot_file : fs::directory_iterator(snapshot_db_dir)) + snapshots.push_back(snapshot_file); + + std::sort(snapshots.rbegin(), snapshots.rend()); + Recovery recovery; + for (auto &snapshot_file : snapshots) { + GraphDbAccessor db_accessor(*this); + if (recovery.Recover(snapshot_file.string(), db_accessor)) { + return; + } + } +} + GraphDb::~GraphDb() { // Stop the gc scheduler to not run into race conditions for deletions. gc_scheduler_.Stop(); @@ -86,6 +109,13 @@ GraphDb::~GraphDb() { // deleted. snapshot_creator_.Stop(); + // Create last database snapshot + if (snapshot_db_destruction_) { + GraphDbAccessor db_accessor(*this); + snapshooter_.MakeSnapshot(db_accessor, fs::path(snapshot_folder_) / name_, + max_retained_snapshots_); + } + // Delete vertices and edges which weren't collected before, also deletes // records inside version list for (auto &vertex : this->vertices_.access()) delete vertex; diff --git a/src/database/graph_db.hpp b/src/database/graph_db.hpp index 6bc7acef5..85138d8f3 100644 --- a/src/database/graph_db.hpp +++ b/src/database/graph_db.hpp @@ -20,6 +20,8 @@ #include "transactions/engine.hpp" #include "utils/scheduler.hpp" +namespace fs = std::experimental::filesystem; + // TODO: Maybe split this in another layer between Db and Dbms. Where the new // layer would hold SnapshotEngine and his kind of concept objects. Some // guidelines would be: retain objects which are necessary to implement querys @@ -39,7 +41,7 @@ class GraphDb { * @param import_snapshot will in constructor import latest snapshot * into the db. */ - GraphDb(const std::string &name, bool import_snapshot = true); + GraphDb(const std::string &name, const fs::path &snapshot_db_dir); /** * @brief - Destruct database object. Delete all vertices and edges and free * all deferred deleters. @@ -51,6 +53,17 @@ class GraphDb { */ GraphDb(const GraphDb &db) = delete; + /** + * Starts database snapshooting. + */ + void StartSnapshooting(); + + /** + * Recovers database from a snapshot file and starts snapshooting. + * @param snapshot_db path to snapshot folder + */ + void RecoverDatabase(const fs::path &snapshot_db_path); + /** transaction engine related to this database */ tx::Engine tx_engine; @@ -58,9 +71,6 @@ class GraphDb { // TODO bring back garbage collection // Garbage garbage = {tx_engine}; - // TODO bring back shapshot engine - // SnapshotEngine snap_engine = {*this}; - // database name // TODO consider if this is even necessary const std::string name_; @@ -94,6 +104,10 @@ class GraphDb { // snapshooter Snapshooter snapshooter_; + std::string snapshot_folder_; + int max_retained_snapshots_; + int snapshot_cycle_sec_; + bool snapshot_db_destruction_; // Schedulers Scheduler<std::mutex> gc_scheduler_; diff --git a/src/dbms/dbms.cpp b/src/dbms/dbms.cpp index 94119e59c..2bc8e3acc 100644 --- a/src/dbms/dbms.cpp +++ b/src/dbms/dbms.cpp @@ -5,13 +5,14 @@ std::unique_ptr<GraphDbAccessor> Dbms::active() { *active_db.load(std::memory_order_acquire)); } -std::unique_ptr<GraphDbAccessor> Dbms::active(const std::string &name) { +std::unique_ptr<GraphDbAccessor> Dbms::active(const std::string &name, + const fs::path &snapshot_db_dir) { auto acc = dbs.access(); // create db if it doesn't exist auto it = acc.find(name); if (it == acc.end()) { it = acc.emplace(name, std::forward_as_tuple(name), - std::forward_as_tuple(name)) + std::forward_as_tuple(name, snapshot_db_dir)) .first; } diff --git a/src/dbms/dbms.hpp b/src/dbms/dbms.hpp index 38945880f..90c34613b 100644 --- a/src/dbms/dbms.hpp +++ b/src/dbms/dbms.hpp @@ -1,18 +1,33 @@ #pragma once +#include <algorithm> #include <memory> +#include <vector> #include "config/config.hpp" #include "data_structures/concurrent/concurrent_map.hpp" #include "database/graph_db.hpp" #include "database/graph_db_accessor.hpp" +#include "durability/recovery.hpp" //#include "dbms/cleaner.hpp" -//#include "snapshot/snapshoter.hpp" + +namespace fs = std::experimental::filesystem; +const std::string DEFAULT_SNAPSHOT_FOLDER = "snapshots"; class Dbms { public: Dbms() { + if (CONFIG_BOOL(config::RECOVERY)) { + auto accessor = dbs.access(); + std::string snapshot_folder = CONFIG(config::SNAPSHOTS_PATH); + if (snapshot_folder.empty()) snapshot_folder = DEFAULT_SNAPSHOT_FOLDER; + for (auto &snapshot_db : fs::directory_iterator(snapshot_folder)) { + // create db and set it active + active(snapshot_db.path().filename(), snapshot_db); + } + } + // create the default database and set is a active active("default"); } @@ -25,11 +40,13 @@ class Dbms { /** * Set the database with the given name to be active. * If there is no database with the given name, - * it's created. + * it's created. If snapshooting is true, snapshooter starts + * snapshooting on database creation. * * @return an accessor to the database with the given name. */ - std::unique_ptr<GraphDbAccessor> active(const std::string &name); + std::unique_ptr<GraphDbAccessor> active( + const std::string &name, const fs::path &snapshot_db_dir = fs::path()); // TODO: DELETE action diff --git a/src/durability/file_reader_buffer.hpp b/src/durability/file_reader_buffer.hpp new file mode 100644 index 000000000..ab0774cce --- /dev/null +++ b/src/durability/file_reader_buffer.hpp @@ -0,0 +1,97 @@ +#pragma once + +#include <fstream> +#include "durability/summary.hpp" +#include "hasher.hpp" +#include "utils/bswap.hpp" + +/** + * Buffer reads data from file and calculates hash of read data. Implements + * template param Buffer interface from BaseDecoder class. Should be closed + * before destructing. + */ +class FileReaderBuffer { + public: + /** + * Opens ifstream to a file, resets hash and reads summary. Returns false if + * opening fails. + * @param file: + * path to a file to which should be read. + * @param summary: + * reference to a summary object where summary should be written. + */ + bool Open(const std::string &file, snapshot::Summary &summary) { + input_stream_.open(file); + if (input_stream_.fail()) return false; + return ReadSummary(summary); + } + + /** + * Closes ifstream. Returns false if closing fails. + */ + bool Close() { + input_stream_.close(); + return !input_stream_.fail(); + } + + /** + * Reads data from stream. + * @param data: + * pointer where data should be stored. + * @param n: + * data length. + */ + bool Read(uint8_t *data, size_t n) { + input_stream_.read(reinterpret_cast<char *>(data), n); + if (input_stream_.fail()) return false; + hasher_.Update(data, n); + return true; + } + /** + * Returns hash of read data. + */ + uint64_t hash() const { return hasher_.hash(); } + + private: + /** + * Reads type T from buffer. Data is written in buffer in big endian format. + * Expected system endianness is little endian. + */ + template <typename T> + bool ReadType(T &val) { + if (!Read(reinterpret_cast<uint8_t *>(&val), sizeof(T))) return false; + // TODO: must be platform specific in the future + val = bswap(val); + return true; + } + + /** + * Reads summary from the end of a file and resets hash. Method should be + * called after ifstream opening. Stream starts reading data from the + * beginning of file in the next read call. Returns false if reading fails. + * @param summary: + * reference to a summary object where summary should be written. + */ + bool ReadSummary(snapshot::Summary &summary) { + debug_assert(input_stream_.tellg() == 0, + "Summary should be read before other data!"); + input_stream_.seekg(-static_cast<int64_t>(sizeof(snapshot::Summary)), + std::ios::end); + if (input_stream_.fail()) return false; + if (!ReadType(summary.vertex_num_) || !ReadType(summary.edge_num_) || + !ReadType(summary.hash_)) + return false; + input_stream_.seekg(0, std::ios::beg); + hasher_.Reset(); + return !input_stream_.fail(); + } + + /** + * Used for calculating hash of read data. + */ + Hasher hasher_; + /** + * Ifstream used for reading from file. + */ + std::ifstream input_stream_; +}; diff --git a/src/durability/file_writer_buffer.hpp b/src/durability/file_writer_buffer.hpp index 017b686a3..852a65f43 100644 --- a/src/durability/file_writer_buffer.hpp +++ b/src/durability/file_writer_buffer.hpp @@ -2,6 +2,7 @@ #include <fstream> #include "utils/bswap.hpp" +#include "hasher.hpp" /** * Buffer that writes data to file and calculates hash of written data. @@ -40,7 +41,7 @@ class FileWriterBuffer { * data length. */ void Write(const uint8_t *data, size_t n) { - UpdateHash(data, n); + hasher_.Update(data, n); output_stream_.write(reinterpret_cast<const char *>(data), n); } /** @@ -62,18 +63,10 @@ class FileWriterBuffer { debug_assert(vertex_num >= 0, "Number of edges should't be negative"); WriteLong(vertex_num); WriteLong(edge_num); - WriteLong(hash_); + WriteLong(hasher_.hash()); } private: - /** - * Hash function is H(n) = H(n-1) * prime + data where data is unsigned char. - * TODO implement different hash function - */ - void UpdateHash(const uint8_t *data, size_t n) { - for (int i = 0; i < n; ++i) hash_ = hash_ * kPrime + data[i] + 1; - } - /** * Method writes uint64_t to ofstream. */ @@ -88,11 +81,7 @@ class FileWriterBuffer { */ std::ofstream output_stream_; /** - * Represents hash of current data. + * Used to calculate hash of written data. */ - uint64_t hash_ = 0; - /** - * Prime number used for hashing. - */ - const uint64_t kPrime = 3137; + Hasher hasher_; }; diff --git a/src/durability/hasher.hpp b/src/durability/hasher.hpp new file mode 100644 index 000000000..0a1f0776e --- /dev/null +++ b/src/durability/hasher.hpp @@ -0,0 +1,37 @@ +#pragma once + +// TODO: implement better hash function + +/** + * Class calculates hash of the data dynamically. + */ +class Hasher { + public: + Hasher() = default; + /** + * Sets hash to 0. + */ + void Reset() { hash_ = 0; } + /** + * Updates hash from given data. + * @param data data from which hash will be updated + * @param n length of the data + */ + void Update(const uint8_t *data, size_t n) { + for (int i = 0; i < n; ++i) hash_ = hash_ * kPrime + data[i] + 1; + } + /** + * Returns current hash value. + */ + uint64_t hash() const { return hash_; } + + private: + /** + * Prime number used in calculating hash. + */ + const uint64_t kPrime = 3137; + /** + * Hash of data. + */ + uint64_t hash_ = 0; +}; diff --git a/src/durability/recovery.cpp b/src/durability/recovery.cpp new file mode 100644 index 000000000..4b5e8ea0e --- /dev/null +++ b/src/durability/recovery.cpp @@ -0,0 +1,67 @@ +#include "durability/recovery.hpp" +#include "communication/bolt/v1/decoder/decoder.hpp" +#include "durability/file_reader_buffer.hpp" + +bool Recovery::Recover(const fs::path &snapshot_file, + GraphDbAccessor &db_accessor) { + if (!fs::exists(snapshot_file)) return false; + if (!Decode(snapshot_file, db_accessor)) { + db_accessor.abort(); + return false; + } + db_accessor.commit(); + return true; +} + +bool Recovery::Decode(const fs::path &snapshot_file, + GraphDbAccessor &db_accessor) { + FileReaderBuffer buffer; + communication::bolt::Decoder<FileReaderBuffer> decoder(buffer); + + snapshot::Summary summary; + if (!buffer.Open(snapshot_file, summary)) { + buffer.Close(); + return false; + } + std::unordered_map<uint64_t, VertexAccessor> vertices; + + for (int64_t i = 0; i < summary.vertex_num_; ++i) { + communication::bolt::DecodedVertex vertex; + if (!decoder.ReadVertex(&vertex)) { + buffer.Close(); + return false; + } + auto vertex_accessor = db_accessor.insert_vertex(); + for (const auto &label : vertex.labels) { + vertex_accessor.add_label(db_accessor.label(label)); + } + for (const auto &property_pair : vertex.properties) { + vertex_accessor.PropsSet(db_accessor.property(property_pair.first), + property_pair.second); + } + vertices.insert({vertex.id, vertex_accessor}); + } + for (int64_t i = 0; i < summary.edge_num_; ++i) { + communication::bolt::DecodedEdge edge; + if (!decoder.ReadEdge(&edge)) { + buffer.Close(); + return false; + } + auto it_from = vertices.find(edge.from); + auto it_to = vertices.find(edge.to); + if (it_from == vertices.end() || it_to == vertices.end()) { + buffer.Close(); + return false; + } + auto edge_accessor = db_accessor.insert_edge( + it_from->second, it_to->second, db_accessor.edge_type(edge.type)); + + for (const auto &property_pair : edge.properties) + edge_accessor.PropsSet(db_accessor.property(property_pair.first), + property_pair.second); + } + + uint64_t hash = buffer.hash(); + if (!buffer.Close()) return false; + return hash == summary.hash_; +} diff --git a/src/durability/recovery.hpp b/src/durability/recovery.hpp new file mode 100644 index 000000000..0bf86e1b2 --- /dev/null +++ b/src/durability/recovery.hpp @@ -0,0 +1,33 @@ +#pragma once + +#include <experimental/filesystem> +#include <unordered_map> +#include "database/graph_db_accessor.hpp" +#include "storage/vertex_accessor.hpp" + +namespace fs = std::experimental::filesystem; + +/** + * Class used to recover database from snapshot file. + */ +class Recovery { + public: + /** + * Recovers database from snapshot_file. Graph elements are inserted + * in graph using db_accessor. If recovering fails, false is returned and + * db_accessor aborts transaction, else true is returned and transaction is + * commited. + * @param snapshot_file: + * path to snapshot file + * @param db_accessor: + * GraphDbAccessor used to access database. + */ + bool Recover(const fs::path &snapshot_file, GraphDbAccessor &db_accessor); + + private: + /** + * Decodes database from snapshot_file. Graph emlements are inserted in + * graph using db_accessor. If decoding fails, false is returned, else ture. + */ + bool Decode(const fs::path &snapshot_file, GraphDbAccessor &db_accessor); +}; diff --git a/src/durability/summary.hpp b/src/durability/summary.hpp new file mode 100644 index 000000000..417ef6574 --- /dev/null +++ b/src/durability/summary.hpp @@ -0,0 +1,12 @@ +#pragma once + +namespace snapshot { +/** + * Struct represents graph summary in a snapshot file. + */ +struct Summary { + int64_t vertex_num_ = 0LL; + int64_t edge_num_ = 0LL; + uint64_t hash_ = 0ULL; +}; +} diff --git a/tests/unit/dbms_recovery.cpp b/tests/unit/dbms_recovery.cpp new file mode 100644 index 000000000..afd599f25 --- /dev/null +++ b/tests/unit/dbms_recovery.cpp @@ -0,0 +1,100 @@ +#include <experimental/filesystem> +#include "dbms/dbms.hpp" +#include "gtest/gtest.h" + +namespace fs = std::experimental::filesystem; + +const fs::path SNAPSHOTS_DBMS_RECOVERY_ALL_DB = std::tmpnam(nullptr); +const fs::path SNAPSHOTS_DBMS_RECOVERY_DEFAULT_DB_DIR = + SNAPSHOTS_DBMS_RECOVERY_ALL_DB / "default"; + +std::vector<fs::path> GetFilesFromDir( + const std::string &snapshots_default_db_dir) { + std::vector<fs::path> files; + for (auto &file : fs::directory_iterator(snapshots_default_db_dir)) + files.push_back(file.path()); + return files; +} + +void CleanDbDir() { + if (!fs::exists(SNAPSHOTS_DBMS_RECOVERY_DEFAULT_DB_DIR)) return; + std::vector<fs::path> files = + GetFilesFromDir(SNAPSHOTS_DBMS_RECOVERY_DEFAULT_DB_DIR); + for (auto file : files) fs::remove(file); +} + +class DbmsRecoveryTest : public ::testing::Test { + protected: + virtual void TearDown() { + CleanDbDir(); + CONFIG(config::SNAPSHOTS_PATH) = snapshots_path_setup_; + CONFIG(config::SNAPSHOT_CYCLE_SEC) = snapshot_cycle_sec_setup_; + } + + virtual void SetUp() { + CleanDbDir(); + snapshots_path_setup_ = CONFIG(config::SNAPSHOTS_PATH); + snapshot_cycle_sec_setup_ = CONFIG(config::SNAPSHOT_CYCLE_SEC); + CONFIG(config::SNAPSHOTS_PATH) = SNAPSHOTS_DBMS_RECOVERY_ALL_DB; + CONFIG(config::SNAPSHOT_CYCLE_SEC) = "-1"; + } + std::string snapshots_path_setup_; + std::string snapshot_cycle_sec_setup_; +}; + +void CreateSnapshot() { + CONFIG(config::RECOVERY) = "false"; + Dbms dbms; + auto dba = dbms.active(); + + // setup (v1) - [:likes] -> (v2) <- [:hates] - (v3) + auto va1 = dba->insert_vertex(); + auto va2 = dba->insert_vertex(); + dba->insert_edge(va1, va2, dba->edge_type("likes")); + auto va3 = dba->insert_vertex(); + dba->insert_edge(va3, va2, dba->edge_type("hates")); + dba->advance_command(); + + Snapshooter snapshooter; + snapshooter.MakeSnapshot(*dba.get(), SNAPSHOTS_DBMS_RECOVERY_DEFAULT_DB_DIR, + 1); +} + +void RecoverDbms() { + CONFIG(config::RECOVERY) = "true"; + Dbms dbms; + auto dba = dbms.active(); + + std::vector<VertexAccessor> vertices; + std::vector<EdgeAccessor> edges; + + int vertex_count = 0; + for (auto const &vertex : dba->vertices()) { + vertices.push_back(vertex); + vertex_count++; + } + EXPECT_EQ(vertex_count, 3); + + int edge_count = 0; + for (auto const &edge : dba->edges()) { + EXPECT_NE(vertices.end(), + std::find(vertices.begin(), vertices.end(), edge.to())); + EXPECT_NE(vertices.end(), + std::find(vertices.begin(), vertices.end(), edge.from())); + edges.push_back(edge); + edge_count++; + } + EXPECT_EQ(edge_count, 2); + EXPECT_EQ(edges[0].to() == edges[1].to(), true); + EXPECT_EQ(edges[0].from() == edges[1].from(), false); +} + +TEST_F(DbmsRecoveryTest, TestDbmsRecovery) { + CreateSnapshot(); + RecoverDbms(); +} + +int main(int argc, char **argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/tests/unit/recovery.cpp b/tests/unit/recovery.cpp new file mode 100644 index 000000000..0e4767ab4 --- /dev/null +++ b/tests/unit/recovery.cpp @@ -0,0 +1,231 @@ +#include "durability/recovery.hpp" +#include <cstdio> +#include <experimental/filesystem> +#include "communication/bolt/v1/decoder/decoder.hpp" +#include "dbms/dbms.hpp" +#include "durability/file_reader_buffer.hpp" +#include "gtest/gtest.h" +#include "utils/assert.hpp" + +namespace fs = std::experimental::filesystem; + +const fs::path SNAPSHOTS_RECOVERY_DEFAULT_DB_DIR = std::tmpnam(nullptr); + +std::vector<fs::path> GetFilesFromDir( + const std::string &snapshots_default_db_dir) { + std::vector<fs::path> files; + for (auto &file : fs::directory_iterator(snapshots_default_db_dir)) + files.push_back(file.path()); + return files; +} + +void CleanDbDir() { + if (!fs::exists(SNAPSHOTS_RECOVERY_DEFAULT_DB_DIR)) return; + std::vector<fs::path> files = + GetFilesFromDir(SNAPSHOTS_RECOVERY_DEFAULT_DB_DIR); + for (auto file : files) { + fs::remove(file); + } +} + +class RecoveryTest : public ::testing::Test { + protected: + void TearDown() override { + CleanDbDir(); + CONFIG(config::SNAPSHOT_CYCLE_SEC) = snapshot_cycle_sec_setup_; + } + + void SetUp() override { + CleanDbDir(); + snapshot_cycle_sec_setup_ = CONFIG(config::SNAPSHOT_CYCLE_SEC); + CONFIG(config::SNAPSHOT_CYCLE_SEC) = "-1"; + } + std::string snapshot_cycle_sec_setup_; + const int max_retained_snapshots_ = 10; +}; + +void CreateSmallGraph(Dbms &dbms) { + auto dba = dbms.active(); + + // setup (v1) - [:likes] -> (v2) <- [:hates] - (v3) + auto va1 = dba->insert_vertex(); + auto va2 = dba->insert_vertex(); + dba->insert_edge(va1, va2, dba->edge_type("likes")); + auto va3 = dba->insert_vertex(); + dba->insert_edge(va3, va2, dba->edge_type("hates")); + dba->commit(); +} + +void CreateBigGraph(Dbms &dbms) { + // creates graph with one inner vertex connected with other 999 outer vertices + // relationships are directed from outer vertices to the inner vertex + // every vertex hash label "label" and property "prop" with value "prop" + // every relationship has type "type" and property "prop" with value "prop" + auto dba = dbms.active(); + auto va_middle = dba->insert_vertex(); + va_middle.add_label(dba->label("label")); + va_middle.PropsSet(dba->property("prop"), "prop"); + for (int i = 1; i < 1000; ++i) { + auto va = dba->insert_vertex(); + va.add_label(dba->label("label")); + va.PropsSet(dba->property("prop"), "prop"); + auto ea = dba->insert_edge(va, va_middle, dba->edge_type("type")); + ea.PropsSet(dba->property("prop"), "prop"); + } + dba->commit(); +} + +void TakeSnapshot(Dbms &dbms, int max_retained_snapshots_) { + auto dba = dbms.active(); + Snapshooter snapshooter; + snapshooter.MakeSnapshot(*dba.get(), SNAPSHOTS_RECOVERY_DEFAULT_DB_DIR, + max_retained_snapshots_); +} + +std::string GetLatestSnapshot() { + std::vector<fs::path> files = + GetFilesFromDir(SNAPSHOTS_RECOVERY_DEFAULT_DB_DIR); + permanent_assert(static_cast<int>(files.size()) == 1, + "No snapshot files in folder."); + std::sort(files.rbegin(), files.rend()); + return files[0]; +} + +TEST_F(RecoveryTest, TestEncoding) { + // Creates snapshot of the small graph. Uses file_reader_buffer and bolt + // decoder to read data from the snapshot and reads graph from it. After + // reading graph is tested. + Dbms dbms; + CreateSmallGraph(dbms); + TakeSnapshot(dbms, max_retained_snapshots_); + std::string snapshot = GetLatestSnapshot(); + + FileReaderBuffer buffer; + communication::bolt::Decoder<FileReaderBuffer> decoder(buffer); + + snapshot::Summary summary; + buffer.Open(snapshot, summary); + + std::vector<int64_t> ids; + std::vector<std::string> edge_types; + + for (int i = 0; i < summary.vertex_num_; ++i) { + communication::bolt::DecodedVertex vertex; + decoder.ReadVertex(&vertex); + ids.push_back(vertex.id); + } + std::vector<int> from, to; + for (int i = 0; i < summary.edge_num_; ++i) { + communication::bolt::DecodedEdge edge; + decoder.ReadEdge(&edge); + from.push_back(edge.from); + to.push_back(edge.to); + edge_types.push_back(edge.type); + } + buffer.Close(); + + permanent_assert(static_cast<int>(to.size()) == 2, + "There should be two edges."); + permanent_assert(static_cast<int>(from.size()) == 2, + "There should be two edges."); + + EXPECT_EQ(buffer.hash(), summary.hash_); + EXPECT_NE(edge_types.end(), + std::find(edge_types.begin(), edge_types.end(), "hates")); + EXPECT_NE(edge_types.end(), + std::find(edge_types.begin(), edge_types.end(), "likes")); + EXPECT_EQ(to[0], to[1]); + EXPECT_NE(from[0], from[1]); + EXPECT_NE(ids.end(), std::find(ids.begin(), ids.end(), to[0])); + EXPECT_NE(ids.end(), std::find(ids.begin(), ids.end(), from[0])); + EXPECT_NE(ids.end(), std::find(ids.begin(), ids.end(), from[1])); +} + +TEST_F(RecoveryTest, TestEncodingAndDecoding) { + // Creates snapshot of the small graph. Uses Recovery to recover graph from + // the snapshot file. After creation graph is tested. + Dbms dbms; + CreateSmallGraph(dbms); + TakeSnapshot(dbms, max_retained_snapshots_); + std::string snapshot = GetLatestSnapshot(); + + // New dbms is needed - old dbms has database "default" + Dbms dbms_recover; + auto dba_recover = dbms_recover.active(); + + Recovery recovery; + EXPECT_TRUE(recovery.Recover(snapshot, *dba_recover)); + + std::vector<VertexAccessor> vertices; + std::vector<EdgeAccessor> edges; + + auto dba = dbms_recover.active(); + int64_t vertex_count = 0; + for (const auto &vertex : dba->vertices()) { + vertices.push_back(vertex); + vertex_count++; + } + EXPECT_EQ(vertex_count, 3); + + int64_t edge_count = 0; + for (const auto &edge : dba->edges()) { + EXPECT_NE(vertices.end(), + std::find(vertices.begin(), vertices.end(), edge.to())); + EXPECT_NE(vertices.end(), + std::find(vertices.begin(), vertices.end(), edge.from())); + edges.push_back(edge); + edge_count++; + } + permanent_assert(static_cast<int>(edges.size()) == 2, + "There should be two edges."); + + EXPECT_EQ(edge_count, 2); + EXPECT_TRUE(edges[0].to() == edges[1].to()); + EXPECT_FALSE(edges[0].from() == edges[1].from()); +} + +TEST_F(RecoveryTest, TestEncodingAndRecovering) { + // Creates snapshot of the big graph. Uses Recovery to recover graph from + // the snapshot file. After creation graph is tested. + Dbms dbms; + CreateBigGraph(dbms); + TakeSnapshot(dbms, max_retained_snapshots_); + std::string snapshot = GetLatestSnapshot(); + + // New dbms is needed - old dbms has database "default" + Dbms dbms_recover; + auto dba_recover = dbms_recover.active(); + + Recovery recovery; + EXPECT_TRUE(recovery.Recover(snapshot, *dba_recover)); + + auto dba_get = dbms_recover.active(); + int64_t vertex_count = 0; + for (const auto &vertex : dba_get->vertices()) { + EXPECT_EQ(vertex.labels().size(), 1); + EXPECT_TRUE(vertex.has_label(dba_get->label("label"))); + query::TypedValue prop = + query::TypedValue(vertex.PropsAt(dba_get->property("prop"))); + query::TypedValue expected_prop = query::TypedValue(PropertyValue("prop")); + EXPECT_TRUE((prop == expected_prop).Value<bool>()); + vertex_count++; + } + EXPECT_EQ(vertex_count, 1000); + + int64_t edge_count = 0; + for (const auto &edge : dba_get->edges()) { + EXPECT_EQ(edge.edge_type(), dba_get->edge_type("type")); + query::TypedValue prop = + query::TypedValue(edge.PropsAt(dba_get->property("prop"))); + query::TypedValue expected_prop = query::TypedValue(PropertyValue("prop")); + EXPECT_TRUE((prop == expected_prop).Value<bool>()); + edge_count++; + } + EXPECT_EQ(edge_count, 999); + dba_get->commit(); +} + +int main(int argc, char **argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/tests/unit/snapshot.cpp b/tests/unit/snapshot.cpp index 52cd09a0d..99b9c9c11 100644 --- a/tests/unit/snapshot.cpp +++ b/tests/unit/snapshot.cpp @@ -95,6 +95,18 @@ TEST_F(SnapshotTest, CreateSnapshotWithUnlimitedMaxRetainedSnapshots) { EXPECT_EQ(files.size(), 10); } +TEST_F(SnapshotTest, TestSnapshotFileOnDbDestruct) { + { + CONFIG(config::SNAPSHOTS_PATH) = SNAPSHOTS_FOLDER_ALL_DB; + CONFIG(config::SNAPSHOT_DB_DESTRUCTION) = "true"; + Dbms dbms; + auto dba = dbms.active(); + } + std::vector<fs::path> files = GetFilesFromDir(SNAPSHOTS_TEST_DEFAULT_DB_DIR); + // snapshot is created on dbms destruction + EXPECT_EQ(files.size(), 1); +} + int main(int argc, char **argv) { ::testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS();