Remove old snapshot

Reviewers: buda, matej.gradicek

Reviewed By: buda

Differential Revision: https://phabricator.memgraph.io/D437
This commit is contained in:
Mislav Bradac 2017-06-07 15:42:52 +02:00
parent 65507da9eb
commit a45e5a9a38
8 changed files with 0 additions and 1023 deletions

View File

@ -1,119 +0,0 @@
#include "snapshot/snapshot_decoder.hpp"
SnapshotDecoder::SnapshotDecoder(std::ifstream &snap_file)
: decoder(snap_file) {}
// Loads propert names, label names, edge_type names.
void SnapshotDecoder::load_init() {
for (auto i = decoder.map_header(); i > 0; i--) {
std::string name;
decoder.string(name);
auto id = decoder.integer();
property_name_map.insert(std::make_pair(id, std::move(name)));
}
for (auto i = decoder.map_header(); i > 0; i--) {
std::string name;
decoder.string(name);
auto id = decoder.integer();
label_name_map.insert(std::make_pair(id, std::move(name)));
}
for (auto i = decoder.map_header(); i > 0; i--) {
std::string name;
decoder.string(name);
auto id = decoder.integer();
edge_type_name_map.insert(std::make_pair(id, std::move(name)));
}
}
// Begins process of reading vertices
void SnapshotDecoder::begin_vertices() {
std::string name;
decoder.string(name);
if (name != "vertices") {
throw DecoderException(
"Tryed to start reading vertices on illegal position marked as: " +
name);
}
}
// True if it is end of vertices
bool SnapshotDecoder::end_vertices() {
std::string name;
bool ret = decoder.string_try(name);
if (ret && name != "edges") {
throw DecoderException(
"Tryed to end reading vertices on illegal position marked as: " + name);
}
return ret;
}
// Begins process of loading edges
void SnapshotDecoder::begin_edges() {
// EMPTY
}
// True if it is end of edges
bool SnapshotDecoder::end_edges() {
std::string name;
bool ret = decoder.string_try(name);
if (ret && name != "indexes") {
throw DecoderException(
"Tryed to end reading edges on illegal position marked as: " + name);
}
return ret;
}
// Begins process of reading indexes.
void SnapshotDecoder::start_indexes() {
// EMPTY
}
// Loads IndexDefinition.
IndexDefinition SnapshotDecoder::load_index() {
return IndexDefinition::deserialize(decoder);
}
// True if it is end.
bool SnapshotDecoder::end() {
std::string name;
bool ret = decoder.string_try(name);
if (ret && name != "end") {
throw DecoderException("Tryed to end on illegal position marked as: " +
name);
}
return ret;
}
// ***************** from GraphDecoder
// Starts reading vertex.
Id SnapshotDecoder::vertex_start() { return Id(decoder.integer()); }
// Returns number of stored labels.
size_t SnapshotDecoder::label_count() { return decoder.list_header(); }
// Wiil read label into given storage.
std::string const &SnapshotDecoder::label() {
return label_name_map.at(decoder.integer());
}
// Starts reading edge. Return from to ids of connected vertices.
std::pair<Id, Id> SnapshotDecoder::edge_start() {
auto from = Id(decoder.integer());
auto to = Id(decoder.integer());
return std::make_pair(from, to);
}
// Reads edge_type into given storage.
std::string const &SnapshotDecoder::edge_type() {
return edge_type_name_map.at(decoder.integer());
}
// Returns number of stored propertys.
size_t SnapshotDecoder::property_count() { return decoder.map_header(); }
// Reads property name into given storage.
std::string const &SnapshotDecoder::property_name() {
return property_name_map.at(decoder.integer());
}

View File

@ -1,141 +0,0 @@
#pragma once
#include <fstream>
#include <unordered_map>
#include "communication/bolt/v1/transport/streamed_bolt_decoder.hpp"
#include "mvcc/id.hpp"
#include "serialization/graph_decoder.hpp"
#include "storage/indexes/index_definition.hpp"
#include "storage/model/properties/property.hpp"
// Decodes stored snapshot.
// Caller must respect loading order to be same as stored order with
// SnapshotEncoder.
// Main idea of knowing when something starts and ends is at certain points try
// to deserialize string and compare it with logically expected string seted by
// the SnapshotEncoder.
class SnapshotDecoder : public GraphDecoder {
public:
SnapshotDecoder(std::ifstream &snap_file);
// Loads propert names, label names, edge_type names.
void load_init();
// Begins process of reading vertices
void begin_vertices();
// True if it is end of vertices
bool end_vertices();
// Begins process of loading edges
void begin_edges();
// True if it is end of edges
bool end_edges();
// Begins process of reading indexes.
void start_indexes();
// Loads IndexDefinition.
IndexDefinition load_index();
// True if it is end.
bool end();
// ***************** from GraphDecoder
// Starts reading vertex.
Id vertex_start();
// Returns number of stored labels.
size_t label_count();
// Wiil read label into given storage.
std::string const &label();
// Starts reading edge. Return from to ids of connected vertices.
std::pair<Id, Id> edge_start();
// Reads edge_type into given storage.
std::string const &edge_type();
// Returns number of stored propertys.
size_t property_count();
// Reads property name into given storage.
std::string const &property_name();
// Reads property and calls T::handle for that property .
template <class T>
T property() {
if (decoder.is_list()) {
// Whe are deserializing an array.
auto size = decoder.list_header();
if (decoder.is_bool()) {
ArrayStore<bool> store;
for (auto i = 0; i < size; i++) {
store.push_back(decoder.read_bool());
}
return T::handle(std::move(store));
} else if (decoder.is_integer()) {
ArrayStore<int64_t> store;
for (auto i = 0; i < size; i++) {
store.push_back(decoder.integer());
}
return T::handle(std::move(store));
} else if (decoder.is_double()) {
ArrayStore<double> store;
for (auto i = 0; i < size; i++) {
store.push_back(decoder.read_double());
}
return T::handle(std::move(store));
} else if (decoder.is_string()) {
ArrayStore<std::string> store;
for (auto i = 0; i < size; i++) {
std::string s;
decoder.string(s);
store.push_back(std::move(s));
}
return T::handle(std::move(store));
}
} else {
// Whe are deserializing a primitive.
if (decoder.is_bool()) {
return T::handle(decoder.read_bool());
} else if (decoder.is_integer()) {
return T::handle(decoder.integer());
} else if (decoder.is_double()) {
return T::handle(decoder.read_double());
} else if (decoder.is_string()) {
std::string s;
decoder.string(s);
return T::handle(std::move(s));
}
}
throw DecoderException(
"Tryed to read property but found "
"unknown type in bolt marked as: ",
decoder.mark());
}
private:
bolt::StreamedBoltDecoder<std::ifstream> decoder;
// Contains for every property_name here snapshot local id.
std::unordered_map<size_t, std::string> property_name_map;
// Contains for every label_name here snapshot local id.
std::unordered_map<size_t, std::string> label_name_map;
// Contains for every edge_type here snapshot local id.
std::unordered_map<size_t, std::string> edge_type_name_map;
};

View File

@ -1,152 +0,0 @@
#include "snapshot/snapshot_encoder.hpp"
void SnapshotEncoder::property_name_init(std::string const &name) {
if (property_name_map.find(name) == property_name_map.end()) {
auto id = property_name_map.size();
property_name_map.insert(std::make_pair(name, id));
}
}
void SnapshotEncoder::label_name_init(std::string const &name) {
if (label_name_map.find(name) == label_name_map.end()) {
auto id = label_name_map.size();
label_name_map.insert(std::make_pair(name, id));
}
}
void SnapshotEncoder::edge_type_name_init(std::string const &name) {
if (edge_type_name_map.find(name) == edge_type_name_map.end()) {
auto id = edge_type_name_map.size();
edge_type_name_map.insert(std::make_pair(name, id));
}
}
void SnapshotEncoder::end() { encoder.write_string("end"); }
// **************** INDEX
// Prepares for indexes
void SnapshotEncoder::start_indexes() { encoder.write_string("indexes"); }
// Writes index definition
void SnapshotEncoder::index(IndexDefinition const &def) {
def.serialize(encoder);
}
// ************* VERTEX
// Prepares for vertices
void SnapshotEncoder::start_vertices() {
encoder.write_map_header(property_name_map.size());
for (auto p : property_name_map) {
encoder.write_string(p.first);
encoder.write_integer(p.second);
}
encoder.write_map_header(label_name_map.size());
for (auto p : label_name_map) {
encoder.write_string(p.first);
encoder.write_integer(p.second);
}
encoder.write_map_header(edge_type_name_map.size());
for (auto p : edge_type_name_map) {
encoder.write_string(p.first);
encoder.write_integer(p.second);
}
encoder.write_string("vertices");
}
// Starts writing vertex with given id.
void SnapshotEncoder::start_vertex(Id id) { encoder.write_integer(id); }
// Number of following label calls.
void SnapshotEncoder::label_count(size_t n) { encoder.write_list_header(n); }
// Label of currently started vertex.
void SnapshotEncoder::label(std::string const &l) {
encoder.write_integer(label_name_map.at(l));
}
// ************* EDGE
// Prepares for edges
void SnapshotEncoder::start_edges() { encoder.write_string("edges"); }
// Starts writing edge from vertex to vertex
void SnapshotEncoder::start_edge(Id from, Id to) {
encoder.write_integer(from);
encoder.write_integer(to);
}
// Type of currently started edge
void SnapshotEncoder::edge_type(std::string const &et) {
encoder.write_integer(edge_type_name_map.at(et));
}
// ******* PROPERTY
void SnapshotEncoder::property_count(size_t n) { encoder.write_map_header(n); }
void SnapshotEncoder::property_name(std::string const &name) {
encoder.write_integer(property_name_map.at(name));
}
void SnapshotEncoder::handle(const Void &v) { encoder.write_null(); }
void SnapshotEncoder::handle(const bool &prop) { encoder.write_bool(prop); }
void SnapshotEncoder::handle(const float &prop) { encoder.write_double(prop); }
void SnapshotEncoder::handle(const double &prop) { encoder.write_double(prop); }
void SnapshotEncoder::handle(const int32_t &prop) {
encoder.write_integer(prop);
}
void SnapshotEncoder::handle(const int64_t &prop) {
encoder.write_integer(prop);
}
void SnapshotEncoder::handle(const std::string &value) {
encoder.write_string(value);
}
void SnapshotEncoder::handle(const ArrayStore<bool> &a) {
encoder.write_list_header(a.size());
for (auto const &e : a) {
encoder.write_bool(e);
}
}
void SnapshotEncoder::handle(const ArrayStore<int32_t> &a) {
encoder.write_list_header(a.size());
for (auto const &e : a) {
encoder.write_integer(e);
}
}
void SnapshotEncoder::handle(const ArrayStore<int64_t> &a) {
encoder.write_list_header(a.size());
for (auto const &e : a) {
encoder.write_integer(e);
}
}
void SnapshotEncoder::handle(const ArrayStore<float> &a) {
encoder.write_list_header(a.size());
for (auto const &e : a) {
encoder.write_double(e);
}
}
void SnapshotEncoder::handle(const ArrayStore<double> &a) {
encoder.write_list_header(a.size());
for (auto const &e : a) {
encoder.write_double(e);
}
}
void SnapshotEncoder::handle(const ArrayStore<std::string> &a) {
encoder.write_list_header(a.size());
for (auto const &e : a) {
encoder.write_string(e);
}
}

View File

@ -1,120 +0,0 @@
#pragma once
#include <string>
#include <unordered_map>
#include "communication/bolt/v1/transport/bolt_encoder.hpp"
#include "mvcc/id.hpp"
#include "serialization/graph_encoder.hpp"
#include "serialization/serialization.hpp"
#include "storage/indexes/index_definition.hpp"
#include "utils/stream_wrapper.hpp"
// Represents creation of a snapshot. Contains all necessary informations
// for write. Caller is responisble to structure his calls as following:
// * property_name_init
// * label_name_init
// * edge_type_name_init
// 1 start_vertices
// * <vertex>
// 1 start_edges
// * <edges>
// 1 start_indexes
// * index
// 1 end
class SnapshotEncoder : public GraphEncoder {
public:
SnapshotEncoder(std::ofstream &stream) : stream(stream) {}
SnapshotEncoder(SnapshotEncoder const &) = delete;
SnapshotEncoder(SnapshotEncoder &&) = delete;
SnapshotEncoder &operator=(SnapshotEncoder const &) = delete;
SnapshotEncoder &operator=(SnapshotEncoder &&) = delete;
// Tells in advance which names will be used.
void property_name_init(std::string const &name);
// Tells in advance which labels will be used.
void label_name_init(std::string const &name);
// Tells in advance which edge_type will be used.
void edge_type_name_init(std::string const &name);
// Prepares for vertices
void start_vertices();
// Prepares for edges
void start_edges();
// Prepares for indexes
void start_indexes();
// Writes index definition
void index(IndexDefinition const &);
// Finishes snapshot
void end();
// *********************From graph encoder
// Starts writing vertex with given id.
void start_vertex(Id id);
// Number of following label calls.
void label_count(size_t n);
// Label of currently started vertex.
void label(std::string const &l);
// Starts writing edge from vertex to vertex
void start_edge(Id from, Id to);
// Type of currently started edge
void edge_type(std::string const &et);
// Number of following paired property_name,handle calls.
void property_count(size_t n);
// Property family name of next property for currently started element.
void property_name(std::string const &name);
void handle(const Void &v);
void handle(const bool &prop);
void handle(const float &prop);
void handle(const double &prop);
void handle(const int32_t &prop);
void handle(const int64_t &prop);
void handle(const std::string &value);
void handle(const ArrayStore<bool> &);
void handle(const ArrayStore<int32_t> &);
void handle(const ArrayStore<int64_t> &);
void handle(const ArrayStore<float> &);
void handle(const ArrayStore<double> &);
void handle(const ArrayStore<std::string> &);
private:
std::ofstream &stream;
StreamWrapper<std::ofstream> wrapped = {stream};
bolt::BoltEncoder<StreamWrapper<std::ofstream>> encoder = {wrapped};
// Contains for every property_name here snapshot local id.
std::unordered_map<std::string, size_t> property_name_map;
// Contains for every label_name here snapshot local id.
std::unordered_map<std::string, size_t> label_name_map;
// Contains for every edge_type here snapshot local id.
std::unordered_map<std::string, size_t> edge_type_name_map;
};

View File

@ -1,324 +0,0 @@
#include "snapshot/snapshot_engine.hpp"
#include "config/config.hpp"
#include "database/db_accessor.hpp"
#include "logging/default.hpp"
#include "snapshot/snapshot_decoder.hpp"
#include "snapshot/snapshot_encoder.hpp"
#include "storage/indexes/indexes.hpp"
#include "threading/thread.hpp"
#include "utils/sys.hpp"
SnapshotEngine::SnapshotEngine(Db &db)
: snapshot_folder(CONFIG(config::SNAPSHOTS_PATH)),
db(db),
max_retained_snapshots(CONFIG_INTEGER(config::MAX_RETAINED_SNAPSHOTS)),
logger(logging::log->logger("SnapshotEngine db[" + db.name() + "]")) {}
bool SnapshotEngine::make_snapshot() {
std::lock_guard<std::mutex> lock(guard);
std::time_t now = std::time(nullptr);
if (make_snapshot(now, "full")) {
// Sanpsthot was created so whe should check if some older snapshots
// should be deleted.
clean_snapshots();
return true;
} else {
return false;
}
}
void SnapshotEngine::clean_snapshots() {
logger.info("Started cleaning commit_file");
// Whe first count the number of snapshots that whe know about in commit
// file.
std::vector<std::string> lines;
{
std::ifstream commit_file(snapshot_commit_file());
std::string line;
while (std::getline(commit_file, line)) {
lines.push_back(line);
}
}
int n = lines.size() - max_retained_snapshots;
if (n > 0) {
// Whe have to much snapshots so whe should delete some.
std::ofstream commit_file(snapshot_commit_file(), std::fstream::trunc);
// First whw will rewrite commit file to contain only
// max_retained_snapshots newest snapshots.
for (auto i = n; i < lines.size(); i++) {
commit_file << lines[i] << std::endl;
}
auto res = sys::flush_file_to_disk(commit_file);
if (res == 0) {
// Commit file was succesfully changed so whe can now delete
// snapshots which whe evicted from commit file.
commit_file.close();
logger.info("Removed {} snapshot from commit_file", n);
for (auto i = 0; i < n; i++) {
auto res = std::remove(lines[i].c_str());
if (res == 0) {
logger.info("Succesfully deleted snapshot file \"{}\"", lines[i]);
} else {
logger.error("Error {} occured while deleting snapshot file \"{}\"",
res, lines[i]);
}
}
} else {
logger.error("Error {} occured while flushing commit file", res);
}
}
logger.info("Finished cleaning commit_file");
}
bool SnapshotEngine::make_snapshot(std::time_t now, const char *type) {
bool success = false;
auto snapshot_file_name = snapshot_file(now, type);
logger.info("Writing {} snapshot to file \"{}\"", type, snapshot_file_name);
DbTransaction t(db);
try {
std::ofstream snapshot_file(snapshot_file_name,
std::fstream::binary | std::fstream::trunc);
SnapshotEncoder snap(snapshot_file);
auto old_trans =
tx::TransactionRead(db.tx_engine); // Overenginered for incremental
// snapshot. Can be removed.
// Everything is ready for creation of snapshot.
snapshot(t, snap, old_trans);
auto res = sys::flush_file_to_disk(snapshot_file);
if (res == 0) {
// Snapshot was succesfully written to disk.
t.trans.commit();
success = true;
} else {
logger.error("Error {} occured while flushing snapshot file", res);
t.trans.abort();
}
} catch (const std::exception &e) {
logger.error("Exception occured while creating {} snapshot", type);
logger.error("{}", e.what());
t.trans.abort();
}
if (success) {
// Snapshot was succesfully created but for it to be reachable for
// import whe must add it to the end of commit file.
std::ofstream commit_file(snapshot_commit_file(), std::fstream::app);
commit_file << snapshot_file_name << std::endl;
auto res = sys::flush_file_to_disk(commit_file);
if (res == 0) {
commit_file.close();
snapshoted_no_v.fetch_add(1);
// Snapshot was succesfully commited.
} else {
logger.error("Error {} occured while flushing commit file", res);
}
}
return success;
}
bool SnapshotEngine::import() {
std::lock_guard<std::mutex> lock(guard);
logger.info("Started import");
bool success = false;
try {
std::ifstream commit_file(snapshot_commit_file());
// Whe first load all known snpashot file names from commit file.
std::vector<std::string> snapshots;
std::string line;
while (std::getline(commit_file, line)) {
snapshots.push_back(line);
}
while (snapshots.size() > 0) {
logger.info("Importing data from snapshot \"{}\"", snapshots.back());
DbAccessor t(db);
try {
std::ifstream snapshot_file(snapshots.back(), std::fstream::binary);
SnapshotDecoder decoder(snapshot_file);
auto indexes = snapshot_load(t, decoder);
if (t.commit()) {
logger.info("Succesfully imported snapshot \"{}\"", snapshots.back());
add_indexes(indexes);
success = true;
break;
} else {
logger.info(
"Unuccesfully tryed to import snapshot "
"\"{}\"",
snapshots.back());
}
} catch (const std::exception &e) {
logger.error("Error occured while importing snapshot \"{}\"",
snapshots.back());
logger.error("{}", e.what());
t.abort();
}
snapshots.pop_back();
// Whe will try to import older snapashot if such one exist.
}
} catch (const std::exception &e) {
logger.error("Error occured while importing snapshot");
logger.error("{}", e.what());
}
logger.info("Finished import");
return success;
}
void SnapshotEngine::snapshot(DbTransaction const &dt, SnapshotEncoder &snap,
tx::TransactionRead const &old_trans) {
Db &db = dt.db;
DbAccessor t(db, dt.trans);
// Anounce property names
for (auto &family : db.graph.vertices.property_family_access()) {
snap.property_name_init(family.first);
}
for (auto &family : db.graph.edges.property_family_access()) {
snap.property_name_init(family.first);
}
// Anounce label names
for (auto &labels : db.graph.label_store.access()) {
snap.label_name_init(labels.first.to_string());
}
// Anounce edge_type names
for (auto &et : db.graph.edge_type_store.access()) {
snap.edge_type_name_init(et.first.to_string());
}
// Store vertices
snap.start_vertices();
t.vertex_access()
.fill()
.filter([&](auto va) { return !va.is_visble_to(old_trans); })
.for_all([&](auto va) { serialization::serialize_vertex(va, snap); });
// Store edges
snap.start_edges();
t.edge_access()
.fill()
.filter([&](auto va) { return !va.is_visble_to(old_trans); })
.for_all([&](auto ea) { serialization::serialize_edge(ea, snap); });
// Store info on existing indexes.
snap.start_indexes();
db.indexes().vertex_indexes([&](auto &i) { snap.index(i.definition()); });
db.indexes().edge_indexes([&](auto &i) { snap.index(i.definition()); });
snap.end();
}
std::vector<IndexDefinition> SnapshotEngine::snapshot_load(
DbAccessor &t, SnapshotDecoder &snap) {
std::unordered_map<uint64_t, VertexAccessor> vertices;
// Load names
snap.load_init();
// Load vertices
snap.begin_vertices();
size_t v_count = 0;
while (!snap.end_vertices()) {
vertices.insert(serialization::deserialize_vertex(t, snap));
v_count++;
}
logger.info("Loaded {} vertices", v_count);
// Load edges
snap.begin_edges();
size_t e_count = 0;
while (!snap.end_edges()) {
serialization::deserialize_edge(t, snap, vertices);
e_count++;
}
logger.info("Loaded {} edges", e_count);
// Load indexes
snap.start_indexes();
std::vector<IndexDefinition> indexes;
while (!snap.end()) {
indexes.push_back(snap.load_index());
}
return indexes;
}
void SnapshotEngine::add_indexes(std::vector<IndexDefinition> &v) {
logger.info("Adding: {} indexes", v.size());
for (auto id : v) {
// TODO: It is alright for now to ignore if add_index return false. I am
// not even sure if false should stop snapshot loading.
if (!db.indexes().add_index(id)) {
logger.warn(
"Failed to add index, but still continuing with "
"loading snapshot");
}
}
}
std::string SnapshotEngine::snapshot_file(std::time_t const &now,
const char *type) {
// Current nano time less than second.
auto now_nano = std::chrono::time_point_cast<std::chrono::nanoseconds>(
std::chrono::high_resolution_clock::now())
.time_since_epoch()
.count() %
(1000 * 1000 * 1000);
return snapshot_db_dir() + "/" + std::to_string(now) + "_" +
std::to_string(now_nano) + "_" + type;
}
std::string SnapshotEngine::snapshot_commit_file() {
return snapshot_db_dir() + "/snapshot_commit.txt";
}
std::string SnapshotEngine::snapshot_db_dir() {
if (!sys::ensure_directory_exists(snapshot_folder)) {
logger.error("Error while creating directory \"{}\"", snapshot_folder);
}
auto db_path = snapshot_folder + "/" + db.name();
if (!sys::ensure_directory_exists(db_path)) {
logger.error("Error while creating directory \"{}\"", db_path);
}
return db_path;
}

View File

@ -1,72 +0,0 @@
#pragma once
#include <mutex>
#include <unordered_map>
#include "logging/default.hpp"
#include "storage/indexes/index_definition.hpp"
#include "transactions/transaction.hpp"
class SnapshotEncoder;
class SnapshotDecoder;
class Db;
class DbTransaction;
class DbAccessor;
// Captures snapshots. Only one per database should exist.
class SnapshotEngine {
public:
SnapshotEngine(Db &db);
~SnapshotEngine() = default;
// Returns number of succesffuly created snapshots.
size_t snapshoted_no() { return snapshoted_no_v.load(); }
// Imports latest snapshot into the databse. Blocks until other calls don't
// end.
bool import();
// Makes snapshot of given type. Blocks until other calls don't end.
bool make_snapshot();
private:
// Removes excess of snapshots starting with oldest one.
void clean_snapshots();
// Makes snapshot of given type
bool make_snapshot(std::time_t now, const char *type);
// Makes snapshot. It only saves records which have changed since old_trans.
void snapshot(DbTransaction const &dt, SnapshotEncoder &snap,
tx::TransactionRead const &old_trans);
// Loads snapshot. True if success. Returns indexes which were in snapshot.
std::vector<IndexDefinition> snapshot_load(DbAccessor &t,
SnapshotDecoder &snap);
// Adds indexes. Should be called outside transactions.
void add_indexes(std::vector<IndexDefinition> &v);
// Will return different name on every call.
std::string snapshot_file(std::time_t const &now, const char *type);
// Returns name of snapshot commit file.
std::string snapshot_commit_file();
// Path to directory of database. Ensures that all necessary directorys
// exist.
std::string snapshot_db_dir();
Logger logger;
Db &db;
std::mutex guard;
const std::string snapshot_folder;
// Determines how many newest snapshot will be preserved, while the other
// ones will be deleted.
const size_t max_retained_snapshots;
std::atomic<size_t> snapshoted_no_v = {0};
};

View File

@ -1,62 +0,0 @@
#include "snapshot/snapshoter.hpp"
#include "database/db_accessor.hpp"
#include "logging/default.hpp"
#include "snapshot/snapshot_decoder.hpp"
#include "snapshot/snapshot_encoder.hpp"
#include "storage/indexes/indexes.hpp"
#include "threading/thread.hpp"
#include "utils/sys.hpp"
Snapshoter::Snapshoter(ConcurrentMap<std::string, Db> &dbs,
size_t snapshot_cycle)
: snapshot_cycle(snapshot_cycle), dbms(dbs) {
// Start snapshoter thread.
thread = std::make_unique<Thread>([&]() {
logger = logging::log->logger("Snapshoter");
logger.info("Started with snapshoot cycle of {} sec", this->snapshot_cycle);
try {
run();
} catch (const std::exception &e) {
logger.error("Irreversible error occured in snapshoter");
logger.error("{}", e.what());
}
logger.info("Shutting down snapshoter");
});
}
Snapshoter::~Snapshoter() {
snapshoting.store(false, std::memory_order_release);
thread.get()->join();
}
void Snapshoter::run() {
std::time_t last_snapshot = std::time(nullptr);
while (snapshoting.load(std::memory_order_acquire)) {
std::time_t now = std::time(nullptr);
if (now >= last_snapshot + snapshot_cycle) {
// It's time for snapshot
make_snapshots();
last_snapshot = now;
} else {
// It isn't time for snapshot so i should wait.
std::this_thread::sleep_for(std::chrono::seconds(1));
}
}
}
void Snapshoter::make_snapshots() {
logger.info("Started snapshoting cycle");
for (auto &db : dbms.access()) {
db.second.snap_engine.make_snapshot();
}
logger.info("Finished snapshoting cycle");
}

View File

@ -1,33 +0,0 @@
#pragma once
#include <unordered_map>
#include "database/graph_db.hpp"
#include "logging/default.hpp"
#include "threading/thread.hpp"
class SnapshotEncoder;
class SnapshotDecoder;
// Captures snapshots.
class Snapshoter {
public:
// How much sec is between snapshots
// snapshot_folder is path to common folder for all snapshots.
Snapshoter(ConcurrentMap<std::string, Db> &dbs, size_t snapshot_cycle);
~Snapshoter();
private:
void run();
// Makes snapshot of given type
void make_snapshots();
Logger logger;
const size_t snapshot_cycle;
std::unique_ptr<Thread> thread = {nullptr};
ConcurrentMap<std::string, Db> &dbms;
std::atomic<bool> snapshoting = {true};
};