Implemented recovery and tests.
Summary: Implemented durability recovery and tesats. Reviewers: mislav.bradac, dgleich, buda Reviewed By: mislav.bradac, dgleich, buda Subscribers: dtomicevic, mferencevic, pullbot Differential Revision: https://phabricator.memgraph.io/D374
This commit is contained in:
parent
7278bdff94
commit
15b57e2d52
@ -343,6 +343,7 @@ set(memgraph_src_files
|
||||
${src_dir}/threading/thread.cpp
|
||||
${src_dir}/mvcc/id.cpp
|
||||
${src_dir}/durability/snapshooter.cpp
|
||||
${src_dir}/durability/recovery.cpp
|
||||
# ${src_dir}/snapshot/snapshot_engine.cpp
|
||||
# ${src_dir}/snapshot/snapshoter.cpp
|
||||
# ${src_dir}/snapshot/snapshot_encoder.cpp
|
||||
|
@ -19,7 +19,11 @@ snapshots_path: "snapshots"
|
||||
cleaning_cycle_sec: "30"
|
||||
|
||||
# snapshot cycle interval
|
||||
snapshot_cycle_sec: "60"
|
||||
# if set to -1 the snapshooter will not run
|
||||
snapshot_cycle_sec: "-1"
|
||||
|
||||
# create snapshot disabled on db destruction
|
||||
snapshot_db_destruction: false
|
||||
|
||||
# max number of snapshots which will be kept on the disk at some point
|
||||
# if set to -1 the max number of snapshots is unlimited
|
||||
@ -27,3 +31,6 @@ max_retained_snapshots: "-1"
|
||||
|
||||
# by default query engine runs in interpret mode
|
||||
interpret: true
|
||||
|
||||
# database recovering is disabled by default
|
||||
recovery: false
|
||||
|
@ -25,8 +25,10 @@ constexpr const char *TEMPLATE_CPP_PATH = "template_cpp_path";
|
||||
constexpr const char *SNAPSHOTS_PATH = "snapshots_path";
|
||||
constexpr const char *CLEANING_CYCLE_SEC = "cleaning_cycle_sec";
|
||||
constexpr const char *SNAPSHOT_CYCLE_SEC = "snapshot_cycle_sec";
|
||||
constexpr const char *SNAPSHOT_DB_DESTRUCTION = "snapshot_db_destruction";
|
||||
constexpr const char *MAX_RETAINED_SNAPSHOTS = "max_retained_snapshots";
|
||||
constexpr const char *INTERPRET = "interpret";
|
||||
constexpr const char *RECOVERY = "recovery";
|
||||
// -- all possible Memgraph's keys --
|
||||
|
||||
inline long long to_int(const std::string &s) { return stoll(s); }
|
||||
|
@ -4,6 +4,7 @@
|
||||
#include "database/creation_exception.hpp"
|
||||
#include "database/graph_db.hpp"
|
||||
#include "database/graph_db_accessor.hpp"
|
||||
#include "durability/recovery.hpp"
|
||||
#include "logging/logger.hpp"
|
||||
#include "storage/edge.hpp"
|
||||
#include "storage/garbage_collector.hpp"
|
||||
@ -13,7 +14,7 @@ const std::string DEFAULT_SNAPSHOT_FOLDER = "snapshots";
|
||||
const int DEFAULT_MAX_RETAINED_SNAPSHOTS = -1; // unlimited number of snapshots
|
||||
const int DEFAULT_SNAPSHOT_CYCLE_SEC = -1; // off
|
||||
|
||||
GraphDb::GraphDb(const std::string &name, bool import_snapshot)
|
||||
GraphDb::GraphDb(const std::string &name, const fs::path &snapshot_db_dir)
|
||||
: name_(name),
|
||||
gc_vertices_(vertices_, vertex_record_deleter_,
|
||||
vertex_version_list_deleter_),
|
||||
@ -48,27 +49,33 @@ GraphDb::GraphDb(const std::string &name, bool import_snapshot)
|
||||
});
|
||||
}
|
||||
|
||||
// Creating snapshoter
|
||||
RecoverDatabase(snapshot_db_dir);
|
||||
StartSnapshooting();
|
||||
|
||||
}
|
||||
|
||||
void GraphDb::StartSnapshooting() {
|
||||
const std::string max_retained_snapshots_str =
|
||||
CONFIG(config::MAX_RETAINED_SNAPSHOTS);
|
||||
const std::string snapshot_cycle_sec_str =
|
||||
CONFIG(config::MAX_RETAINED_SNAPSHOTS);
|
||||
CONFIG(config::SNAPSHOT_CYCLE_SEC);
|
||||
const std::string snapshot_folder_str = CONFIG(config::SNAPSHOTS_PATH);
|
||||
|
||||
int max_retained_snapshots_ = DEFAULT_MAX_RETAINED_SNAPSHOTS;
|
||||
max_retained_snapshots_ = DEFAULT_MAX_RETAINED_SNAPSHOTS;
|
||||
if (!max_retained_snapshots_str.empty())
|
||||
max_retained_snapshots_ = CONFIG_INTEGER(config::MAX_RETAINED_SNAPSHOTS);
|
||||
|
||||
int snapshot_cycle_sec_ = DEFAULT_SNAPSHOT_CYCLE_SEC;
|
||||
snapshot_cycle_sec_ = DEFAULT_SNAPSHOT_CYCLE_SEC;
|
||||
if (!snapshot_cycle_sec_str.empty())
|
||||
snapshot_cycle_sec_ = CONFIG_INTEGER(config::SNAPSHOT_CYCLE_SEC);
|
||||
|
||||
std::string snapshot_folder_ = DEFAULT_SNAPSHOT_FOLDER;
|
||||
snapshot_folder_ = DEFAULT_SNAPSHOT_FOLDER;
|
||||
if (!snapshot_folder_str.empty()) snapshot_folder_ = snapshot_folder_str;
|
||||
|
||||
snapshot_db_destruction_ = CONFIG_BOOL(config::SNAPSHOT_DB_DESTRUCTION);
|
||||
|
||||
if (snapshot_cycle_sec_ != -1) {
|
||||
auto create_snapshot = [this, snapshot_folder_,
|
||||
max_retained_snapshots_]() -> void {
|
||||
auto create_snapshot = [this]() -> void {
|
||||
GraphDbAccessor db_accessor(*this);
|
||||
snapshooter_.MakeSnapshot(db_accessor, fs::path(snapshot_folder_) / name_,
|
||||
max_retained_snapshots_);
|
||||
@ -78,6 +85,22 @@ GraphDb::GraphDb(const std::string &name, bool import_snapshot)
|
||||
}
|
||||
}
|
||||
|
||||
void GraphDb::RecoverDatabase(const fs::path &snapshot_db_dir) {
|
||||
if (snapshot_db_dir.empty()) return;
|
||||
std::vector<fs::path> snapshots;
|
||||
for (auto &snapshot_file : fs::directory_iterator(snapshot_db_dir))
|
||||
snapshots.push_back(snapshot_file);
|
||||
|
||||
std::sort(snapshots.rbegin(), snapshots.rend());
|
||||
Recovery recovery;
|
||||
for (auto &snapshot_file : snapshots) {
|
||||
GraphDbAccessor db_accessor(*this);
|
||||
if (recovery.Recover(snapshot_file.string(), db_accessor)) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
GraphDb::~GraphDb() {
|
||||
// Stop the gc scheduler to not run into race conditions for deletions.
|
||||
gc_scheduler_.Stop();
|
||||
@ -86,6 +109,13 @@ GraphDb::~GraphDb() {
|
||||
// deleted.
|
||||
snapshot_creator_.Stop();
|
||||
|
||||
// Create last database snapshot
|
||||
if (snapshot_db_destruction_) {
|
||||
GraphDbAccessor db_accessor(*this);
|
||||
snapshooter_.MakeSnapshot(db_accessor, fs::path(snapshot_folder_) / name_,
|
||||
max_retained_snapshots_);
|
||||
}
|
||||
|
||||
// Delete vertices and edges which weren't collected before, also deletes
|
||||
// records inside version list
|
||||
for (auto &vertex : this->vertices_.access()) delete vertex;
|
||||
|
@ -20,6 +20,8 @@
|
||||
#include "transactions/engine.hpp"
|
||||
#include "utils/scheduler.hpp"
|
||||
|
||||
namespace fs = std::experimental::filesystem;
|
||||
|
||||
// TODO: Maybe split this in another layer between Db and Dbms. Where the new
|
||||
// layer would hold SnapshotEngine and his kind of concept objects. Some
|
||||
// guidelines would be: retain objects which are necessary to implement querys
|
||||
@ -39,7 +41,7 @@ class GraphDb {
|
||||
* @param import_snapshot will in constructor import latest snapshot
|
||||
* into the db.
|
||||
*/
|
||||
GraphDb(const std::string &name, bool import_snapshot = true);
|
||||
GraphDb(const std::string &name, const fs::path &snapshot_db_dir);
|
||||
/**
|
||||
* @brief - Destruct database object. Delete all vertices and edges and free
|
||||
* all deferred deleters.
|
||||
@ -51,6 +53,17 @@ class GraphDb {
|
||||
*/
|
||||
GraphDb(const GraphDb &db) = delete;
|
||||
|
||||
/**
|
||||
* Starts database snapshooting.
|
||||
*/
|
||||
void StartSnapshooting();
|
||||
|
||||
/**
|
||||
* Recovers database from a snapshot file and starts snapshooting.
|
||||
* @param snapshot_db path to snapshot folder
|
||||
*/
|
||||
void RecoverDatabase(const fs::path &snapshot_db_path);
|
||||
|
||||
/** transaction engine related to this database */
|
||||
tx::Engine tx_engine;
|
||||
|
||||
@ -58,9 +71,6 @@ class GraphDb {
|
||||
// TODO bring back garbage collection
|
||||
// Garbage garbage = {tx_engine};
|
||||
|
||||
// TODO bring back shapshot engine
|
||||
// SnapshotEngine snap_engine = {*this};
|
||||
|
||||
// database name
|
||||
// TODO consider if this is even necessary
|
||||
const std::string name_;
|
||||
@ -94,6 +104,10 @@ class GraphDb {
|
||||
|
||||
// snapshooter
|
||||
Snapshooter snapshooter_;
|
||||
std::string snapshot_folder_;
|
||||
int max_retained_snapshots_;
|
||||
int snapshot_cycle_sec_;
|
||||
bool snapshot_db_destruction_;
|
||||
|
||||
// Schedulers
|
||||
Scheduler<std::mutex> gc_scheduler_;
|
||||
|
@ -5,13 +5,14 @@ std::unique_ptr<GraphDbAccessor> Dbms::active() {
|
||||
*active_db.load(std::memory_order_acquire));
|
||||
}
|
||||
|
||||
std::unique_ptr<GraphDbAccessor> Dbms::active(const std::string &name) {
|
||||
std::unique_ptr<GraphDbAccessor> Dbms::active(const std::string &name,
|
||||
const fs::path &snapshot_db_dir) {
|
||||
auto acc = dbs.access();
|
||||
// create db if it doesn't exist
|
||||
auto it = acc.find(name);
|
||||
if (it == acc.end()) {
|
||||
it = acc.emplace(name, std::forward_as_tuple(name),
|
||||
std::forward_as_tuple(name))
|
||||
std::forward_as_tuple(name, snapshot_db_dir))
|
||||
.first;
|
||||
}
|
||||
|
||||
|
@ -1,18 +1,33 @@
|
||||
#pragma once
|
||||
|
||||
#include <algorithm>
|
||||
#include <memory>
|
||||
#include <vector>
|
||||
|
||||
#include "config/config.hpp"
|
||||
#include "data_structures/concurrent/concurrent_map.hpp"
|
||||
#include "database/graph_db.hpp"
|
||||
#include "database/graph_db_accessor.hpp"
|
||||
#include "durability/recovery.hpp"
|
||||
|
||||
//#include "dbms/cleaner.hpp"
|
||||
//#include "snapshot/snapshoter.hpp"
|
||||
|
||||
namespace fs = std::experimental::filesystem;
|
||||
const std::string DEFAULT_SNAPSHOT_FOLDER = "snapshots";
|
||||
|
||||
class Dbms {
|
||||
public:
|
||||
Dbms() {
|
||||
if (CONFIG_BOOL(config::RECOVERY)) {
|
||||
auto accessor = dbs.access();
|
||||
std::string snapshot_folder = CONFIG(config::SNAPSHOTS_PATH);
|
||||
if (snapshot_folder.empty()) snapshot_folder = DEFAULT_SNAPSHOT_FOLDER;
|
||||
for (auto &snapshot_db : fs::directory_iterator(snapshot_folder)) {
|
||||
// create db and set it active
|
||||
active(snapshot_db.path().filename(), snapshot_db);
|
||||
}
|
||||
}
|
||||
|
||||
// create the default database and set is a active
|
||||
active("default");
|
||||
}
|
||||
@ -25,11 +40,13 @@ class Dbms {
|
||||
/**
|
||||
* Set the database with the given name to be active.
|
||||
* If there is no database with the given name,
|
||||
* it's created.
|
||||
* it's created. If snapshooting is true, snapshooter starts
|
||||
* snapshooting on database creation.
|
||||
*
|
||||
* @return an accessor to the database with the given name.
|
||||
*/
|
||||
std::unique_ptr<GraphDbAccessor> active(const std::string &name);
|
||||
std::unique_ptr<GraphDbAccessor> active(
|
||||
const std::string &name, const fs::path &snapshot_db_dir = fs::path());
|
||||
|
||||
// TODO: DELETE action
|
||||
|
||||
|
97
src/durability/file_reader_buffer.hpp
Normal file
97
src/durability/file_reader_buffer.hpp
Normal file
@ -0,0 +1,97 @@
|
||||
#pragma once
|
||||
|
||||
#include <fstream>
|
||||
#include "durability/summary.hpp"
|
||||
#include "hasher.hpp"
|
||||
#include "utils/bswap.hpp"
|
||||
|
||||
/**
|
||||
* Buffer reads data from file and calculates hash of read data. Implements
|
||||
* template param Buffer interface from BaseDecoder class. Should be closed
|
||||
* before destructing.
|
||||
*/
|
||||
class FileReaderBuffer {
|
||||
public:
|
||||
/**
|
||||
* Opens ifstream to a file, resets hash and reads summary. Returns false if
|
||||
* opening fails.
|
||||
* @param file:
|
||||
* path to a file to which should be read.
|
||||
* @param summary:
|
||||
* reference to a summary object where summary should be written.
|
||||
*/
|
||||
bool Open(const std::string &file, snapshot::Summary &summary) {
|
||||
input_stream_.open(file);
|
||||
if (input_stream_.fail()) return false;
|
||||
return ReadSummary(summary);
|
||||
}
|
||||
|
||||
/**
|
||||
* Closes ifstream. Returns false if closing fails.
|
||||
*/
|
||||
bool Close() {
|
||||
input_stream_.close();
|
||||
return !input_stream_.fail();
|
||||
}
|
||||
|
||||
/**
|
||||
* Reads data from stream.
|
||||
* @param data:
|
||||
* pointer where data should be stored.
|
||||
* @param n:
|
||||
* data length.
|
||||
*/
|
||||
bool Read(uint8_t *data, size_t n) {
|
||||
input_stream_.read(reinterpret_cast<char *>(data), n);
|
||||
if (input_stream_.fail()) return false;
|
||||
hasher_.Update(data, n);
|
||||
return true;
|
||||
}
|
||||
/**
|
||||
* Returns hash of read data.
|
||||
*/
|
||||
uint64_t hash() const { return hasher_.hash(); }
|
||||
|
||||
private:
|
||||
/**
|
||||
* Reads type T from buffer. Data is written in buffer in big endian format.
|
||||
* Expected system endianness is little endian.
|
||||
*/
|
||||
template <typename T>
|
||||
bool ReadType(T &val) {
|
||||
if (!Read(reinterpret_cast<uint8_t *>(&val), sizeof(T))) return false;
|
||||
// TODO: must be platform specific in the future
|
||||
val = bswap(val);
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Reads summary from the end of a file and resets hash. Method should be
|
||||
* called after ifstream opening. Stream starts reading data from the
|
||||
* beginning of file in the next read call. Returns false if reading fails.
|
||||
* @param summary:
|
||||
* reference to a summary object where summary should be written.
|
||||
*/
|
||||
bool ReadSummary(snapshot::Summary &summary) {
|
||||
debug_assert(input_stream_.tellg() == 0,
|
||||
"Summary should be read before other data!");
|
||||
input_stream_.seekg(-static_cast<int64_t>(sizeof(snapshot::Summary)),
|
||||
std::ios::end);
|
||||
if (input_stream_.fail()) return false;
|
||||
if (!ReadType(summary.vertex_num_) || !ReadType(summary.edge_num_) ||
|
||||
!ReadType(summary.hash_))
|
||||
return false;
|
||||
input_stream_.seekg(0, std::ios::beg);
|
||||
hasher_.Reset();
|
||||
return !input_stream_.fail();
|
||||
}
|
||||
|
||||
/**
|
||||
* Used for calculating hash of read data.
|
||||
*/
|
||||
Hasher hasher_;
|
||||
/**
|
||||
* Ifstream used for reading from file.
|
||||
*/
|
||||
std::ifstream input_stream_;
|
||||
};
|
@ -2,6 +2,7 @@
|
||||
|
||||
#include <fstream>
|
||||
#include "utils/bswap.hpp"
|
||||
#include "hasher.hpp"
|
||||
|
||||
/**
|
||||
* Buffer that writes data to file and calculates hash of written data.
|
||||
@ -40,7 +41,7 @@ class FileWriterBuffer {
|
||||
* data length.
|
||||
*/
|
||||
void Write(const uint8_t *data, size_t n) {
|
||||
UpdateHash(data, n);
|
||||
hasher_.Update(data, n);
|
||||
output_stream_.write(reinterpret_cast<const char *>(data), n);
|
||||
}
|
||||
/**
|
||||
@ -62,18 +63,10 @@ class FileWriterBuffer {
|
||||
debug_assert(vertex_num >= 0, "Number of edges should't be negative");
|
||||
WriteLong(vertex_num);
|
||||
WriteLong(edge_num);
|
||||
WriteLong(hash_);
|
||||
WriteLong(hasher_.hash());
|
||||
}
|
||||
|
||||
private:
|
||||
/**
|
||||
* Hash function is H(n) = H(n-1) * prime + data where data is unsigned char.
|
||||
* TODO implement different hash function
|
||||
*/
|
||||
void UpdateHash(const uint8_t *data, size_t n) {
|
||||
for (int i = 0; i < n; ++i) hash_ = hash_ * kPrime + data[i] + 1;
|
||||
}
|
||||
|
||||
/**
|
||||
* Method writes uint64_t to ofstream.
|
||||
*/
|
||||
@ -88,11 +81,7 @@ class FileWriterBuffer {
|
||||
*/
|
||||
std::ofstream output_stream_;
|
||||
/**
|
||||
* Represents hash of current data.
|
||||
* Used to calculate hash of written data.
|
||||
*/
|
||||
uint64_t hash_ = 0;
|
||||
/**
|
||||
* Prime number used for hashing.
|
||||
*/
|
||||
const uint64_t kPrime = 3137;
|
||||
Hasher hasher_;
|
||||
};
|
||||
|
37
src/durability/hasher.hpp
Normal file
37
src/durability/hasher.hpp
Normal file
@ -0,0 +1,37 @@
|
||||
#pragma once
|
||||
|
||||
// TODO: implement better hash function
|
||||
|
||||
/**
|
||||
* Class calculates hash of the data dynamically.
|
||||
*/
|
||||
class Hasher {
|
||||
public:
|
||||
Hasher() = default;
|
||||
/**
|
||||
* Sets hash to 0.
|
||||
*/
|
||||
void Reset() { hash_ = 0; }
|
||||
/**
|
||||
* Updates hash from given data.
|
||||
* @param data data from which hash will be updated
|
||||
* @param n length of the data
|
||||
*/
|
||||
void Update(const uint8_t *data, size_t n) {
|
||||
for (int i = 0; i < n; ++i) hash_ = hash_ * kPrime + data[i] + 1;
|
||||
}
|
||||
/**
|
||||
* Returns current hash value.
|
||||
*/
|
||||
uint64_t hash() const { return hash_; }
|
||||
|
||||
private:
|
||||
/**
|
||||
* Prime number used in calculating hash.
|
||||
*/
|
||||
const uint64_t kPrime = 3137;
|
||||
/**
|
||||
* Hash of data.
|
||||
*/
|
||||
uint64_t hash_ = 0;
|
||||
};
|
67
src/durability/recovery.cpp
Normal file
67
src/durability/recovery.cpp
Normal file
@ -0,0 +1,67 @@
|
||||
#include "durability/recovery.hpp"
|
||||
#include "communication/bolt/v1/decoder/decoder.hpp"
|
||||
#include "durability/file_reader_buffer.hpp"
|
||||
|
||||
bool Recovery::Recover(const fs::path &snapshot_file,
|
||||
GraphDbAccessor &db_accessor) {
|
||||
if (!fs::exists(snapshot_file)) return false;
|
||||
if (!Decode(snapshot_file, db_accessor)) {
|
||||
db_accessor.abort();
|
||||
return false;
|
||||
}
|
||||
db_accessor.commit();
|
||||
return true;
|
||||
}
|
||||
|
||||
bool Recovery::Decode(const fs::path &snapshot_file,
|
||||
GraphDbAccessor &db_accessor) {
|
||||
FileReaderBuffer buffer;
|
||||
communication::bolt::Decoder<FileReaderBuffer> decoder(buffer);
|
||||
|
||||
snapshot::Summary summary;
|
||||
if (!buffer.Open(snapshot_file, summary)) {
|
||||
buffer.Close();
|
||||
return false;
|
||||
}
|
||||
std::unordered_map<uint64_t, VertexAccessor> vertices;
|
||||
|
||||
for (int64_t i = 0; i < summary.vertex_num_; ++i) {
|
||||
communication::bolt::DecodedVertex vertex;
|
||||
if (!decoder.ReadVertex(&vertex)) {
|
||||
buffer.Close();
|
||||
return false;
|
||||
}
|
||||
auto vertex_accessor = db_accessor.insert_vertex();
|
||||
for (const auto &label : vertex.labels) {
|
||||
vertex_accessor.add_label(db_accessor.label(label));
|
||||
}
|
||||
for (const auto &property_pair : vertex.properties) {
|
||||
vertex_accessor.PropsSet(db_accessor.property(property_pair.first),
|
||||
property_pair.second);
|
||||
}
|
||||
vertices.insert({vertex.id, vertex_accessor});
|
||||
}
|
||||
for (int64_t i = 0; i < summary.edge_num_; ++i) {
|
||||
communication::bolt::DecodedEdge edge;
|
||||
if (!decoder.ReadEdge(&edge)) {
|
||||
buffer.Close();
|
||||
return false;
|
||||
}
|
||||
auto it_from = vertices.find(edge.from);
|
||||
auto it_to = vertices.find(edge.to);
|
||||
if (it_from == vertices.end() || it_to == vertices.end()) {
|
||||
buffer.Close();
|
||||
return false;
|
||||
}
|
||||
auto edge_accessor = db_accessor.insert_edge(
|
||||
it_from->second, it_to->second, db_accessor.edge_type(edge.type));
|
||||
|
||||
for (const auto &property_pair : edge.properties)
|
||||
edge_accessor.PropsSet(db_accessor.property(property_pair.first),
|
||||
property_pair.second);
|
||||
}
|
||||
|
||||
uint64_t hash = buffer.hash();
|
||||
if (!buffer.Close()) return false;
|
||||
return hash == summary.hash_;
|
||||
}
|
33
src/durability/recovery.hpp
Normal file
33
src/durability/recovery.hpp
Normal file
@ -0,0 +1,33 @@
|
||||
#pragma once
|
||||
|
||||
#include <experimental/filesystem>
|
||||
#include <unordered_map>
|
||||
#include "database/graph_db_accessor.hpp"
|
||||
#include "storage/vertex_accessor.hpp"
|
||||
|
||||
namespace fs = std::experimental::filesystem;
|
||||
|
||||
/**
|
||||
* Class used to recover database from snapshot file.
|
||||
*/
|
||||
class Recovery {
|
||||
public:
|
||||
/**
|
||||
* Recovers database from snapshot_file. Graph elements are inserted
|
||||
* in graph using db_accessor. If recovering fails, false is returned and
|
||||
* db_accessor aborts transaction, else true is returned and transaction is
|
||||
* commited.
|
||||
* @param snapshot_file:
|
||||
* path to snapshot file
|
||||
* @param db_accessor:
|
||||
* GraphDbAccessor used to access database.
|
||||
*/
|
||||
bool Recover(const fs::path &snapshot_file, GraphDbAccessor &db_accessor);
|
||||
|
||||
private:
|
||||
/**
|
||||
* Decodes database from snapshot_file. Graph emlements are inserted in
|
||||
* graph using db_accessor. If decoding fails, false is returned, else ture.
|
||||
*/
|
||||
bool Decode(const fs::path &snapshot_file, GraphDbAccessor &db_accessor);
|
||||
};
|
12
src/durability/summary.hpp
Normal file
12
src/durability/summary.hpp
Normal file
@ -0,0 +1,12 @@
|
||||
#pragma once
|
||||
|
||||
namespace snapshot {
|
||||
/**
|
||||
* Struct represents graph summary in a snapshot file.
|
||||
*/
|
||||
struct Summary {
|
||||
int64_t vertex_num_ = 0LL;
|
||||
int64_t edge_num_ = 0LL;
|
||||
uint64_t hash_ = 0ULL;
|
||||
};
|
||||
}
|
100
tests/unit/dbms_recovery.cpp
Normal file
100
tests/unit/dbms_recovery.cpp
Normal file
@ -0,0 +1,100 @@
|
||||
#include <experimental/filesystem>
|
||||
#include "dbms/dbms.hpp"
|
||||
#include "gtest/gtest.h"
|
||||
|
||||
namespace fs = std::experimental::filesystem;
|
||||
|
||||
const fs::path SNAPSHOTS_DBMS_RECOVERY_ALL_DB = std::tmpnam(nullptr);
|
||||
const fs::path SNAPSHOTS_DBMS_RECOVERY_DEFAULT_DB_DIR =
|
||||
SNAPSHOTS_DBMS_RECOVERY_ALL_DB / "default";
|
||||
|
||||
std::vector<fs::path> GetFilesFromDir(
|
||||
const std::string &snapshots_default_db_dir) {
|
||||
std::vector<fs::path> files;
|
||||
for (auto &file : fs::directory_iterator(snapshots_default_db_dir))
|
||||
files.push_back(file.path());
|
||||
return files;
|
||||
}
|
||||
|
||||
void CleanDbDir() {
|
||||
if (!fs::exists(SNAPSHOTS_DBMS_RECOVERY_DEFAULT_DB_DIR)) return;
|
||||
std::vector<fs::path> files =
|
||||
GetFilesFromDir(SNAPSHOTS_DBMS_RECOVERY_DEFAULT_DB_DIR);
|
||||
for (auto file : files) fs::remove(file);
|
||||
}
|
||||
|
||||
class DbmsRecoveryTest : public ::testing::Test {
|
||||
protected:
|
||||
virtual void TearDown() {
|
||||
CleanDbDir();
|
||||
CONFIG(config::SNAPSHOTS_PATH) = snapshots_path_setup_;
|
||||
CONFIG(config::SNAPSHOT_CYCLE_SEC) = snapshot_cycle_sec_setup_;
|
||||
}
|
||||
|
||||
virtual void SetUp() {
|
||||
CleanDbDir();
|
||||
snapshots_path_setup_ = CONFIG(config::SNAPSHOTS_PATH);
|
||||
snapshot_cycle_sec_setup_ = CONFIG(config::SNAPSHOT_CYCLE_SEC);
|
||||
CONFIG(config::SNAPSHOTS_PATH) = SNAPSHOTS_DBMS_RECOVERY_ALL_DB;
|
||||
CONFIG(config::SNAPSHOT_CYCLE_SEC) = "-1";
|
||||
}
|
||||
std::string snapshots_path_setup_;
|
||||
std::string snapshot_cycle_sec_setup_;
|
||||
};
|
||||
|
||||
void CreateSnapshot() {
|
||||
CONFIG(config::RECOVERY) = "false";
|
||||
Dbms dbms;
|
||||
auto dba = dbms.active();
|
||||
|
||||
// setup (v1) - [:likes] -> (v2) <- [:hates] - (v3)
|
||||
auto va1 = dba->insert_vertex();
|
||||
auto va2 = dba->insert_vertex();
|
||||
dba->insert_edge(va1, va2, dba->edge_type("likes"));
|
||||
auto va3 = dba->insert_vertex();
|
||||
dba->insert_edge(va3, va2, dba->edge_type("hates"));
|
||||
dba->advance_command();
|
||||
|
||||
Snapshooter snapshooter;
|
||||
snapshooter.MakeSnapshot(*dba.get(), SNAPSHOTS_DBMS_RECOVERY_DEFAULT_DB_DIR,
|
||||
1);
|
||||
}
|
||||
|
||||
void RecoverDbms() {
|
||||
CONFIG(config::RECOVERY) = "true";
|
||||
Dbms dbms;
|
||||
auto dba = dbms.active();
|
||||
|
||||
std::vector<VertexAccessor> vertices;
|
||||
std::vector<EdgeAccessor> edges;
|
||||
|
||||
int vertex_count = 0;
|
||||
for (auto const &vertex : dba->vertices()) {
|
||||
vertices.push_back(vertex);
|
||||
vertex_count++;
|
||||
}
|
||||
EXPECT_EQ(vertex_count, 3);
|
||||
|
||||
int edge_count = 0;
|
||||
for (auto const &edge : dba->edges()) {
|
||||
EXPECT_NE(vertices.end(),
|
||||
std::find(vertices.begin(), vertices.end(), edge.to()));
|
||||
EXPECT_NE(vertices.end(),
|
||||
std::find(vertices.begin(), vertices.end(), edge.from()));
|
||||
edges.push_back(edge);
|
||||
edge_count++;
|
||||
}
|
||||
EXPECT_EQ(edge_count, 2);
|
||||
EXPECT_EQ(edges[0].to() == edges[1].to(), true);
|
||||
EXPECT_EQ(edges[0].from() == edges[1].from(), false);
|
||||
}
|
||||
|
||||
TEST_F(DbmsRecoveryTest, TestDbmsRecovery) {
|
||||
CreateSnapshot();
|
||||
RecoverDbms();
|
||||
}
|
||||
|
||||
int main(int argc, char **argv) {
|
||||
::testing::InitGoogleTest(&argc, argv);
|
||||
return RUN_ALL_TESTS();
|
||||
}
|
231
tests/unit/recovery.cpp
Normal file
231
tests/unit/recovery.cpp
Normal file
@ -0,0 +1,231 @@
|
||||
#include "durability/recovery.hpp"
|
||||
#include <cstdio>
|
||||
#include <experimental/filesystem>
|
||||
#include "communication/bolt/v1/decoder/decoder.hpp"
|
||||
#include "dbms/dbms.hpp"
|
||||
#include "durability/file_reader_buffer.hpp"
|
||||
#include "gtest/gtest.h"
|
||||
#include "utils/assert.hpp"
|
||||
|
||||
namespace fs = std::experimental::filesystem;
|
||||
|
||||
const fs::path SNAPSHOTS_RECOVERY_DEFAULT_DB_DIR = std::tmpnam(nullptr);
|
||||
|
||||
std::vector<fs::path> GetFilesFromDir(
|
||||
const std::string &snapshots_default_db_dir) {
|
||||
std::vector<fs::path> files;
|
||||
for (auto &file : fs::directory_iterator(snapshots_default_db_dir))
|
||||
files.push_back(file.path());
|
||||
return files;
|
||||
}
|
||||
|
||||
void CleanDbDir() {
|
||||
if (!fs::exists(SNAPSHOTS_RECOVERY_DEFAULT_DB_DIR)) return;
|
||||
std::vector<fs::path> files =
|
||||
GetFilesFromDir(SNAPSHOTS_RECOVERY_DEFAULT_DB_DIR);
|
||||
for (auto file : files) {
|
||||
fs::remove(file);
|
||||
}
|
||||
}
|
||||
|
||||
class RecoveryTest : public ::testing::Test {
|
||||
protected:
|
||||
void TearDown() override {
|
||||
CleanDbDir();
|
||||
CONFIG(config::SNAPSHOT_CYCLE_SEC) = snapshot_cycle_sec_setup_;
|
||||
}
|
||||
|
||||
void SetUp() override {
|
||||
CleanDbDir();
|
||||
snapshot_cycle_sec_setup_ = CONFIG(config::SNAPSHOT_CYCLE_SEC);
|
||||
CONFIG(config::SNAPSHOT_CYCLE_SEC) = "-1";
|
||||
}
|
||||
std::string snapshot_cycle_sec_setup_;
|
||||
const int max_retained_snapshots_ = 10;
|
||||
};
|
||||
|
||||
void CreateSmallGraph(Dbms &dbms) {
|
||||
auto dba = dbms.active();
|
||||
|
||||
// setup (v1) - [:likes] -> (v2) <- [:hates] - (v3)
|
||||
auto va1 = dba->insert_vertex();
|
||||
auto va2 = dba->insert_vertex();
|
||||
dba->insert_edge(va1, va2, dba->edge_type("likes"));
|
||||
auto va3 = dba->insert_vertex();
|
||||
dba->insert_edge(va3, va2, dba->edge_type("hates"));
|
||||
dba->commit();
|
||||
}
|
||||
|
||||
void CreateBigGraph(Dbms &dbms) {
|
||||
// creates graph with one inner vertex connected with other 999 outer vertices
|
||||
// relationships are directed from outer vertices to the inner vertex
|
||||
// every vertex hash label "label" and property "prop" with value "prop"
|
||||
// every relationship has type "type" and property "prop" with value "prop"
|
||||
auto dba = dbms.active();
|
||||
auto va_middle = dba->insert_vertex();
|
||||
va_middle.add_label(dba->label("label"));
|
||||
va_middle.PropsSet(dba->property("prop"), "prop");
|
||||
for (int i = 1; i < 1000; ++i) {
|
||||
auto va = dba->insert_vertex();
|
||||
va.add_label(dba->label("label"));
|
||||
va.PropsSet(dba->property("prop"), "prop");
|
||||
auto ea = dba->insert_edge(va, va_middle, dba->edge_type("type"));
|
||||
ea.PropsSet(dba->property("prop"), "prop");
|
||||
}
|
||||
dba->commit();
|
||||
}
|
||||
|
||||
void TakeSnapshot(Dbms &dbms, int max_retained_snapshots_) {
|
||||
auto dba = dbms.active();
|
||||
Snapshooter snapshooter;
|
||||
snapshooter.MakeSnapshot(*dba.get(), SNAPSHOTS_RECOVERY_DEFAULT_DB_DIR,
|
||||
max_retained_snapshots_);
|
||||
}
|
||||
|
||||
std::string GetLatestSnapshot() {
|
||||
std::vector<fs::path> files =
|
||||
GetFilesFromDir(SNAPSHOTS_RECOVERY_DEFAULT_DB_DIR);
|
||||
permanent_assert(static_cast<int>(files.size()) == 1,
|
||||
"No snapshot files in folder.");
|
||||
std::sort(files.rbegin(), files.rend());
|
||||
return files[0];
|
||||
}
|
||||
|
||||
TEST_F(RecoveryTest, TestEncoding) {
|
||||
// Creates snapshot of the small graph. Uses file_reader_buffer and bolt
|
||||
// decoder to read data from the snapshot and reads graph from it. After
|
||||
// reading graph is tested.
|
||||
Dbms dbms;
|
||||
CreateSmallGraph(dbms);
|
||||
TakeSnapshot(dbms, max_retained_snapshots_);
|
||||
std::string snapshot = GetLatestSnapshot();
|
||||
|
||||
FileReaderBuffer buffer;
|
||||
communication::bolt::Decoder<FileReaderBuffer> decoder(buffer);
|
||||
|
||||
snapshot::Summary summary;
|
||||
buffer.Open(snapshot, summary);
|
||||
|
||||
std::vector<int64_t> ids;
|
||||
std::vector<std::string> edge_types;
|
||||
|
||||
for (int i = 0; i < summary.vertex_num_; ++i) {
|
||||
communication::bolt::DecodedVertex vertex;
|
||||
decoder.ReadVertex(&vertex);
|
||||
ids.push_back(vertex.id);
|
||||
}
|
||||
std::vector<int> from, to;
|
||||
for (int i = 0; i < summary.edge_num_; ++i) {
|
||||
communication::bolt::DecodedEdge edge;
|
||||
decoder.ReadEdge(&edge);
|
||||
from.push_back(edge.from);
|
||||
to.push_back(edge.to);
|
||||
edge_types.push_back(edge.type);
|
||||
}
|
||||
buffer.Close();
|
||||
|
||||
permanent_assert(static_cast<int>(to.size()) == 2,
|
||||
"There should be two edges.");
|
||||
permanent_assert(static_cast<int>(from.size()) == 2,
|
||||
"There should be two edges.");
|
||||
|
||||
EXPECT_EQ(buffer.hash(), summary.hash_);
|
||||
EXPECT_NE(edge_types.end(),
|
||||
std::find(edge_types.begin(), edge_types.end(), "hates"));
|
||||
EXPECT_NE(edge_types.end(),
|
||||
std::find(edge_types.begin(), edge_types.end(), "likes"));
|
||||
EXPECT_EQ(to[0], to[1]);
|
||||
EXPECT_NE(from[0], from[1]);
|
||||
EXPECT_NE(ids.end(), std::find(ids.begin(), ids.end(), to[0]));
|
||||
EXPECT_NE(ids.end(), std::find(ids.begin(), ids.end(), from[0]));
|
||||
EXPECT_NE(ids.end(), std::find(ids.begin(), ids.end(), from[1]));
|
||||
}
|
||||
|
||||
TEST_F(RecoveryTest, TestEncodingAndDecoding) {
|
||||
// Creates snapshot of the small graph. Uses Recovery to recover graph from
|
||||
// the snapshot file. After creation graph is tested.
|
||||
Dbms dbms;
|
||||
CreateSmallGraph(dbms);
|
||||
TakeSnapshot(dbms, max_retained_snapshots_);
|
||||
std::string snapshot = GetLatestSnapshot();
|
||||
|
||||
// New dbms is needed - old dbms has database "default"
|
||||
Dbms dbms_recover;
|
||||
auto dba_recover = dbms_recover.active();
|
||||
|
||||
Recovery recovery;
|
||||
EXPECT_TRUE(recovery.Recover(snapshot, *dba_recover));
|
||||
|
||||
std::vector<VertexAccessor> vertices;
|
||||
std::vector<EdgeAccessor> edges;
|
||||
|
||||
auto dba = dbms_recover.active();
|
||||
int64_t vertex_count = 0;
|
||||
for (const auto &vertex : dba->vertices()) {
|
||||
vertices.push_back(vertex);
|
||||
vertex_count++;
|
||||
}
|
||||
EXPECT_EQ(vertex_count, 3);
|
||||
|
||||
int64_t edge_count = 0;
|
||||
for (const auto &edge : dba->edges()) {
|
||||
EXPECT_NE(vertices.end(),
|
||||
std::find(vertices.begin(), vertices.end(), edge.to()));
|
||||
EXPECT_NE(vertices.end(),
|
||||
std::find(vertices.begin(), vertices.end(), edge.from()));
|
||||
edges.push_back(edge);
|
||||
edge_count++;
|
||||
}
|
||||
permanent_assert(static_cast<int>(edges.size()) == 2,
|
||||
"There should be two edges.");
|
||||
|
||||
EXPECT_EQ(edge_count, 2);
|
||||
EXPECT_TRUE(edges[0].to() == edges[1].to());
|
||||
EXPECT_FALSE(edges[0].from() == edges[1].from());
|
||||
}
|
||||
|
||||
TEST_F(RecoveryTest, TestEncodingAndRecovering) {
|
||||
// Creates snapshot of the big graph. Uses Recovery to recover graph from
|
||||
// the snapshot file. After creation graph is tested.
|
||||
Dbms dbms;
|
||||
CreateBigGraph(dbms);
|
||||
TakeSnapshot(dbms, max_retained_snapshots_);
|
||||
std::string snapshot = GetLatestSnapshot();
|
||||
|
||||
// New dbms is needed - old dbms has database "default"
|
||||
Dbms dbms_recover;
|
||||
auto dba_recover = dbms_recover.active();
|
||||
|
||||
Recovery recovery;
|
||||
EXPECT_TRUE(recovery.Recover(snapshot, *dba_recover));
|
||||
|
||||
auto dba_get = dbms_recover.active();
|
||||
int64_t vertex_count = 0;
|
||||
for (const auto &vertex : dba_get->vertices()) {
|
||||
EXPECT_EQ(vertex.labels().size(), 1);
|
||||
EXPECT_TRUE(vertex.has_label(dba_get->label("label")));
|
||||
query::TypedValue prop =
|
||||
query::TypedValue(vertex.PropsAt(dba_get->property("prop")));
|
||||
query::TypedValue expected_prop = query::TypedValue(PropertyValue("prop"));
|
||||
EXPECT_TRUE((prop == expected_prop).Value<bool>());
|
||||
vertex_count++;
|
||||
}
|
||||
EXPECT_EQ(vertex_count, 1000);
|
||||
|
||||
int64_t edge_count = 0;
|
||||
for (const auto &edge : dba_get->edges()) {
|
||||
EXPECT_EQ(edge.edge_type(), dba_get->edge_type("type"));
|
||||
query::TypedValue prop =
|
||||
query::TypedValue(edge.PropsAt(dba_get->property("prop")));
|
||||
query::TypedValue expected_prop = query::TypedValue(PropertyValue("prop"));
|
||||
EXPECT_TRUE((prop == expected_prop).Value<bool>());
|
||||
edge_count++;
|
||||
}
|
||||
EXPECT_EQ(edge_count, 999);
|
||||
dba_get->commit();
|
||||
}
|
||||
|
||||
int main(int argc, char **argv) {
|
||||
::testing::InitGoogleTest(&argc, argv);
|
||||
return RUN_ALL_TESTS();
|
||||
}
|
@ -95,6 +95,18 @@ TEST_F(SnapshotTest, CreateSnapshotWithUnlimitedMaxRetainedSnapshots) {
|
||||
EXPECT_EQ(files.size(), 10);
|
||||
}
|
||||
|
||||
TEST_F(SnapshotTest, TestSnapshotFileOnDbDestruct) {
|
||||
{
|
||||
CONFIG(config::SNAPSHOTS_PATH) = SNAPSHOTS_FOLDER_ALL_DB;
|
||||
CONFIG(config::SNAPSHOT_DB_DESTRUCTION) = "true";
|
||||
Dbms dbms;
|
||||
auto dba = dbms.active();
|
||||
}
|
||||
std::vector<fs::path> files = GetFilesFromDir(SNAPSHOTS_TEST_DEFAULT_DB_DIR);
|
||||
// snapshot is created on dbms destruction
|
||||
EXPECT_EQ(files.size(), 1);
|
||||
}
|
||||
|
||||
int main(int argc, char **argv) {
|
||||
::testing::InitGoogleTest(&argc, argv);
|
||||
return RUN_ALL_TESTS();
|
||||
|
Loading…
Reference in New Issue
Block a user