Extracted from snapshoter => snapshot_engine.

This commit is contained in:
Kruno Tomola Fabro 2016-09-09 18:48:15 +01:00
parent 911eeff1d9
commit cf497dd522
10 changed files with 322 additions and 260 deletions

View File

@ -435,6 +435,7 @@ set(memgraph_src_files
${src_dir}/communication/bolt/v1/serialization/bolt_serializer.cpp
${src_dir}/threading/thread.cpp
${src_dir}/mvcc/id.cpp
${src_dir}/snapshot/snapshot_engine.cpp
${src_dir}/snapshot/snapshoter.cpp
${src_dir}/snapshot/snapshot_encoder.cpp
${src_dir}/snapshot/snapshot_decoder.cpp

View File

@ -3,6 +3,7 @@
#include "storage/type_group_edge.hpp"
#include "storage/type_group_vertex.hpp"
#include "snapshot/snapshot_engine.hpp"
#include "storage/garbage/garbage.hpp"
#include "storage/graph.hpp"
#include "transactions/engine.hpp"
@ -17,17 +18,16 @@ public:
Db();
Db(const std::string &name);
// Loads newest snapshot
Db(const std::string &name, Snapshoter &snap);
Db(const Db &db) = delete;
Graph graph;
tx::Engine tx_engine;
Garbage garbage;
SnapshotEngine snap_engine;
std::string const &name() const;
Indexes indexes();
Indexes indexes(); // TODO join into Db
// INDEXES

View File

@ -32,6 +32,5 @@ private:
Cleaning cleaning = {dbs, CONFIG_INTEGER(config::CLEANING_CYCLE_SEC)};
Snapshoter snapshoter = {dbs, CONFIG_INTEGER(config::SNAPSHOT_CYCLE_SEC),
CONFIG(config::SNAPSHOTS_PATH)};
Snapshoter snapshoter = {dbs, CONFIG_INTEGER(config::SNAPSHOT_CYCLE_SEC)};
};

View File

@ -0,0 +1,58 @@
#pragma once
#include <mutex>
#include <unordered_map>
#include "logging/default.hpp"
#include "transactions/transaction.hpp"
class SnapshotEncoder;
class SnapshotDecoder;
class Db;
class DbTransaction;
// Captures snapshots. Only one per database should exist.
class SnapshotEngine
{
public:
SnapshotEngine(Db &db, std::string const &name);
~SnapshotEngine() = default;
// Returns number of succesffuly created snapshots.
size_t snapshoted_no() { return snapshoted_no_v.load(); }
// Imports latest snapshot into the databse. Blocks until other calls don't
// end.
bool import();
// Makes snapshot of given type. Blocks until other calls don't end.
bool make_snapshot();
private:
// Makes snapshot of given type
bool make_snapshot(std::time_t now, const char *type);
// Makes snapshot. It only saves records which have changed since old_trans.
void snapshot(DbTransaction const &dt, SnapshotEncoder &snap,
tx::TransactionId const &old_trans);
// Loads snapshot. True if success
bool snapshot_load(DbTransaction const &dt, SnapshotDecoder &snap);
std::string snapshot_file(std::time_t const &now, const char *type);
std::string snapshot_commit_file();
// Path to directory of database. Ensures that all necessary directorys
// exist.
std::string snapshot_db_dir();
Logger logger;
Db &db;
std::mutex guard;
const std::string snapshot_folder;
std::atomic<size_t> snapshoted_no_v = {0};
};

View File

@ -16,61 +16,20 @@ class Snapshoter
public:
// How much sec is between snapshots
// snapshot_folder is path to common folder for all snapshots.
Snapshoter(ConcurrentMap<std::string, Db> &dbs, size_t snapshot_cycle,
std::string &&snapshot_folder);
Snapshoter(ConcurrentMap<std::string, Db> &dbs, size_t snapshot_cycle);
~Snapshoter();
// Imports latest snapshot into the databse. Multi thread safe.
void import(Db &db);
private:
void run(Logger &logger);
void run();
// Makes snapshot of given type
void make_snapshot(std::time_t now, const char *type);
// Makes snapshot. It only saves records which have changed since old_trans.
void snapshot(DbTransaction const &dt, SnapshotEncoder &snap,
tx::TransactionId const &old_trans);
// Loads snapshot. True if success
bool snapshot_load(DbTransaction const &dt, SnapshotDecoder &snap);
std::string snapshot_file(Db &db, std::time_t const &now, const char *type)
{
return snapshot_db_dir(db) + "/" + std::to_string(now) + "_" + type;
}
std::string snapshot_commit_file(Db &db)
{
return snapshot_db_dir(db) + "/snapshot_commit.txt";
}
// Path to directory of database. Ensures that all necessary directorys
// exist.
std::string snapshot_db_dir(Db &db)
{
if (!sys::ensure_directory_exists(snapshot_folder)) {
logger.error("Error while creating directory \"{}\"",
snapshot_folder);
}
auto db_path = snapshot_folder + "/" + db.name();
if (!sys::ensure_directory_exists(db_path)) {
logger.error("Error while creating directory \"{}\"", db_path);
}
return db_path;
}
void make_snapshots();
Logger logger;
const size_t snapshot_cycle;
const size_t max_old_snapshots;
const std::string snapshot_folder;
std::unique_ptr<Thread> thread = {nullptr};
ConcurrentMap<std::string, Db> &dbms;
std::atomic<bool> snapshoting = {true};
};

View File

@ -4,13 +4,14 @@
#include "storage/indexes/indexes.hpp"
#include "storage/model/properties/property_family.hpp"
Db::Db() = default;
Db::Db(const std::string &name) : name_(name) {}
Db::Db(const std::string &name, Snapshoter &snap) : name_(name)
Db::Db() : name_("default"), snap_engine(*this, "default")
{
snap.import(*this);
snap_engine.import();
}
Db::Db(const std::string &name) : name_(name), snap_engine(*this, name)
{
snap_engine.import();
}
std::string const &Db::name() const { return name_; }

View File

@ -21,7 +21,7 @@ Db &Dbms::active(const std::string &name)
if (it == acc.end()) {
Snapshoter &snap = snapshoter;
it = acc.emplace(name, std::forward_as_tuple(name),
std::forward_as_tuple(name, snap))
std::forward_as_tuple(name))
.first;
}

View File

@ -0,0 +1,239 @@
#include "snapshot/snapshot_engine.hpp"
#include "config/config.hpp"
#include "database/db_accessor.hpp"
#include "logging/default.hpp"
#include "snapshot/snapshot_decoder.hpp"
#include "snapshot/snapshot_encoder.hpp"
#include "storage/indexes/indexes.hpp"
#include "threading/thread.hpp"
#include "utils/sys.hpp"
SnapshotEngine::SnapshotEngine(Db &db, std::string const &name)
: snapshot_folder(CONFIG(config::SNAPSHOTS_PATH)), db(db),
logger(logging::log->logger("SnapshotEngine db[" + name + "]"))
{
}
bool SnapshotEngine::make_snapshot()
{
std::lock_guard<std::mutex> lock(guard);
std::time_t now = std::time(nullptr);
return make_snapshot(now, "full");
}
bool SnapshotEngine::make_snapshot(std::time_t now, const char *type)
{
bool success = false;
auto snapshot_file_name = snapshot_file(now, type);
logger.info("Writing {} snapshot to file \"{}\"", type, snapshot_file_name);
DbTransaction t(db);
try {
std::ofstream snapshot_file(snapshot_file_name,
std::fstream::binary | std::fstream::trunc);
SnapshotEncoder snap(snapshot_file);
auto old_trans = tx::TransactionId(db.tx_engine);
snapshot(t, snap, old_trans);
auto res = sys::flush_file_to_disk(snapshot_file);
if (res == 0) {
t.trans.commit();
success = true;
} else {
logger.error("Error {} occured while flushing snapshot file", res);
t.trans.abort();
}
} catch (const std::exception &e) {
logger.error("Exception occured while creating {} snapshot", type);
logger.error("{}", e.what());
t.trans.abort();
}
if (success) {
std::ofstream commit_file(snapshot_commit_file(), std::fstream::app);
commit_file << snapshot_file_name << std::endl;
auto res = sys::flush_file_to_disk(commit_file);
if (res == 0) {
commit_file.close();
snapshoted_no_v.fetch_add(1);
} else {
logger.error("Error {} occured while flushing commit file", res);
}
}
return success;
}
bool SnapshotEngine::import()
{
std::lock_guard<std::mutex> lock(guard);
logger.info("Started import");
bool success = false;
try {
std::ifstream commit_file(snapshot_commit_file());
std::vector<std::string> snapshots;
std::string line;
while (std::getline(commit_file, line)) {
snapshots.push_back(line);
}
while (snapshots.size() > 0) {
logger.info("Importing data from snapshot \"{}\"",
snapshots.back());
DbTransaction t(db);
try {
std::ifstream snapshot_file(snapshots.back(),
std::fstream::binary);
SnapshotDecoder decoder(snapshot_file);
if (snapshot_load(t, decoder)) {
t.trans.commit();
logger.info("Succesfully imported snapshot \"{}\"",
snapshots.back());
success = true;
break;
} else {
t.trans.abort();
logger.info("Unuccesfully tryed to import snapshot \"{}\"",
snapshots.back());
}
} catch (const std::exception &e) {
logger.error("Error occured while importing snapshot \"{}\"",
snapshots.back());
logger.error("{}", e.what());
t.trans.abort();
}
snapshots.pop_back();
}
} catch (const std::exception &e) {
logger.error("Error occured while importing snapshot");
logger.error("{}", e.what());
}
logger.info("Finished import");
return success;
}
void SnapshotEngine::snapshot(DbTransaction const &dt, SnapshotEncoder &snap,
tx::TransactionId const &old_trans)
{
Db &db = dt.db;
DbAccessor t(db, dt.trans);
// Anounce property names
for (auto &family : db.graph.vertices.property_family_access()) {
snap.property_name_init(family.first);
}
for (auto &family : db.graph.edges.property_family_access()) {
snap.property_name_init(family.first);
}
// Anounce label names
for (auto &labels : db.graph.label_store.access()) {
snap.label_name_init(labels.first.to_string());
}
// Anounce edge_type names
for (auto &et : db.graph.edge_type_store.access()) {
snap.edge_type_name_init(et.first.to_string());
}
// Store vertices
snap.start_vertices();
t.vertex_access()
.fill()
.filter([&](auto va) { return !va.is_visble_to(old_trans); })
.for_all([&](auto va) { serialization::serialize_vertex(va, snap); });
// Store edges
snap.start_edges();
t.edge_access()
.fill()
.filter([&](auto va) { return !va.is_visble_to(old_trans); })
.for_all([&](auto ea) { serialization::serialize_edge(ea, snap); });
// Store info on existing indexes.
snap.start_indexes();
db.indexes().vertex_indexes([&](auto &i) { snap.index(i.definition()); });
db.indexes().edge_indexes([&](auto &i) { snap.index(i.definition()); });
snap.end();
}
bool SnapshotEngine::snapshot_load(DbTransaction const &dt,
SnapshotDecoder &snap)
{
std::unordered_map<uint64_t, VertexAccessor> vertices;
Db &db = dt.db;
DbAccessor t(db, dt.trans);
// Load names
snap.load_init();
// Load vertices
snap.begin_vertices();
while (!snap.end_vertices()) {
vertices.insert(serialization::deserialize_vertex(t, snap));
}
// Load edges
snap.begin_edges();
while (!snap.end_edges()) {
serialization::deserialize_edge(t, snap, vertices);
}
// Load indexes
snap.start_indexes();
auto indexs = db.indexes();
while (!snap.end()) {
// This will add index. It is alright for now to ignore if add_index
// return false.
indexs.add_index(snap.load_index());
}
return true;
}
std::string SnapshotEngine::snapshot_file(std::time_t const &now,
const char *type)
{
return snapshot_db_dir() + "/" + std::to_string(now) + "_" + type;
}
std::string SnapshotEngine::snapshot_commit_file()
{
return snapshot_db_dir() + "/snapshot_commit.txt";
}
std::string SnapshotEngine::snapshot_db_dir()
{
if (!sys::ensure_directory_exists(snapshot_folder)) {
logger.error("Error while creating directory \"{}\"", snapshot_folder);
}
auto db_path = snapshot_folder + "/" + db.name();
if (!sys::ensure_directory_exists(db_path)) {
logger.error("Error while creating directory \"{}\"", db_path);
}
return db_path;
}

View File

@ -9,9 +9,8 @@
#include "utils/sys.hpp"
Snapshoter::Snapshoter(ConcurrentMap<std::string, Db> &dbs,
size_t snapshot_cycle, std::string &&snapshot_folder)
: snapshot_cycle(snapshot_cycle), snapshot_folder(snapshot_folder),
dbms(dbs)
size_t snapshot_cycle)
: snapshot_cycle(snapshot_cycle), dbms(dbs)
{
thread = std::make_unique<Thread>([&]() {
logger = logging::log->logger("Snapshoter");
@ -19,7 +18,7 @@ Snapshoter::Snapshoter(ConcurrentMap<std::string, Db> &dbs,
this->snapshot_cycle);
try {
run(logger);
run();
} catch (const std::exception &e) {
logger.error("Irreversible error occured in snapshoter");
logger.error("{}", e.what());
@ -35,7 +34,7 @@ Snapshoter::~Snapshoter()
thread.get()->join();
}
void Snapshoter::run(Logger &logger)
void Snapshoter::run()
{
std::time_t last_snapshot = std::time(nullptr);
@ -44,7 +43,7 @@ void Snapshoter::run(Logger &logger)
if (now >= last_snapshot + snapshot_cycle) {
// It's time for snapshot
make_snapshot(now, "full");
make_snapshots();
last_snapshot = now;
} else {
@ -53,207 +52,13 @@ void Snapshoter::run(Logger &logger)
}
}
void Snapshoter::make_snapshot(std::time_t now, const char *type)
void Snapshoter::make_snapshots()
{
logger.info(std::string("Started ") + type + " snapshot cycle");
logger.info("Started snapshoting cycle");
for (auto &db : dbms.access()) {
auto snapshot_file_name = snapshot_file(db.second, now, type);
logger.info(std::string("Writing ") + type + " snapshot of database "
"\"{}\" to file \"{}\"",
db.first, snapshot_file_name);
DbTransaction t(db.second);
bool success = false;
try {
std::ofstream snapshot_file(
snapshot_file_name, std::fstream::binary | std::fstream::trunc);
SnapshotEncoder snap(snapshot_file);
auto old_trans = tx::TransactionId(db.second.tx_engine);
snapshot(t, snap, old_trans);
auto res = sys::flush_file_to_disk(snapshot_file);
if (res == 0) {
t.trans.commit();
success = true;
} else {
logger.error("Error {} occured while flushing snapshot file",
res);
t.trans.abort();
}
} catch (const std::exception &e) {
logger.error(std::string("Error occured while creating ") + type +
" "
"snapshot of database \"{}\"",
db.first);
logger.error("{}", e.what());
t.trans.abort();
}
if (success) {
std::ofstream commit_file(snapshot_commit_file(db.second),
std::fstream::app);
commit_file << snapshot_file_name << std::endl;
auto res = sys::flush_file_to_disk(commit_file);
if (res == 0) {
commit_file.close();
} else {
logger.error("Error {} occured while flushing commit file",
res);
}
}
db.second.snap_engine.make_snapshot();
}
logger.info(std::string("Finished ") + type + " snapshot cycle");
}
void Snapshoter::import(Db &db)
{
Logger logger = logging::log->logger("Snapshot import");
logger.info("Started import for database \"{}\"", db.name());
try {
std::ifstream commit_file(snapshot_commit_file(db));
std::vector<std::string> snapshots;
std::string line;
while (std::getline(commit_file, line)) {
snapshots.push_back(line);
}
while (snapshots.size() > 0) {
logger.info("Importing data from snapshot \"{}\" into "
"database \"{}\"",
snapshots.back(), db.name());
DbTransaction t(db);
try {
std::ifstream snapshot_file(snapshots.back(),
std::fstream::binary);
SnapshotDecoder decoder(snapshot_file);
if (snapshot_load(t, decoder)) {
t.trans.commit();
logger.info("Succesfully imported snapshot \"{}\" into "
"database \"{}\"",
snapshots.back(), db.name());
break;
} else {
t.trans.abort();
logger.info(
"Unuccesfully tryed to import snapshot \"{}\" into "
"database \"{}\"",
snapshots.back(), db.name());
}
} catch (const std::exception &e) {
logger.error(
"Error occured while importing snapshot \"{}\" into "
"database \"{}\"",
snapshots.back(), db.name());
logger.error("{}", e.what());
t.trans.abort();
}
snapshots.pop_back();
}
} catch (const std::exception &e) {
logger.error(
"Error occured while importing snapshot for database \"{}\"",
db.name());
logger.error("{}", e.what());
}
logger.info("Finished import for database \"{}\"", db.name());
}
void Snapshoter::snapshot(DbTransaction const &dt, SnapshotEncoder &snap,
tx::TransactionId const &old_trans)
{
Db &db = dt.db;
DbAccessor t(db, dt.trans);
// Anounce property names
for (auto &family : db.graph.vertices.property_family_access()) {
snap.property_name_init(family.first);
}
for (auto &family : db.graph.edges.property_family_access()) {
snap.property_name_init(family.first);
}
// Anounce label names
for (auto &labels : db.graph.label_store.access()) {
snap.label_name_init(labels.first.to_string());
}
// Anounce edge_type names
for (auto &et : db.graph.edge_type_store.access()) {
snap.edge_type_name_init(et.first.to_string());
}
// Store vertices
snap.start_vertices();
t.vertex_access()
.fill()
.filter([&](auto va) { return !va.is_visble_to(old_trans); })
.for_all([&](auto va) { serialization::serialize_vertex(va, snap); });
// Store edges
snap.start_edges();
t.edge_access()
.fill()
.filter([&](auto va) { return !va.is_visble_to(old_trans); })
.for_all([&](auto ea) { serialization::serialize_edge(ea, snap); });
// Store info on existing indexes.
snap.start_indexes();
db.indexes().vertex_indexes([&](auto &i) { snap.index(i.definition()); });
db.indexes().edge_indexes([&](auto &i) { snap.index(i.definition()); });
snap.end();
}
bool Snapshoter::snapshot_load(DbTransaction const &dt, SnapshotDecoder &snap)
{
std::unordered_map<uint64_t, VertexAccessor> vertices;
Db &db = dt.db;
DbAccessor t(db, dt.trans);
// Load names
snap.load_init();
// Load vertices
snap.begin_vertices();
while (!snap.end_vertices()) {
vertices.insert(serialization::deserialize_vertex(t, snap));
}
// Load edges
snap.begin_edges();
while (!snap.end_edges()) {
serialization::deserialize_edge(t, snap, vertices);
}
// Load indexes
snap.start_indexes();
auto indexs = db.indexes();
while (!snap.end()) {
// This will add index. It is alright for now to ignore if add_index
// return false.
indexs.add_index(snap.load_index());
}
return true;
logger.info("Finished snapshoting cycle");
}

View File