diff --git a/src/snapshot/snapshot_decoder.cpp b/src/snapshot/snapshot_decoder.cpp deleted file mode 100644 index 4a505621c..000000000 --- a/src/snapshot/snapshot_decoder.cpp +++ /dev/null @@ -1,119 +0,0 @@ -#include "snapshot/snapshot_decoder.hpp" - -SnapshotDecoder::SnapshotDecoder(std::ifstream &snap_file) - : decoder(snap_file) {} - -// Loads propert names, label names, edge_type names. -void SnapshotDecoder::load_init() { - for (auto i = decoder.map_header(); i > 0; i--) { - std::string name; - decoder.string(name); - auto id = decoder.integer(); - property_name_map.insert(std::make_pair(id, std::move(name))); - } - - for (auto i = decoder.map_header(); i > 0; i--) { - std::string name; - decoder.string(name); - auto id = decoder.integer(); - label_name_map.insert(std::make_pair(id, std::move(name))); - } - - for (auto i = decoder.map_header(); i > 0; i--) { - std::string name; - decoder.string(name); - auto id = decoder.integer(); - edge_type_name_map.insert(std::make_pair(id, std::move(name))); - } -} - -// Begins process of reading vertices -void SnapshotDecoder::begin_vertices() { - std::string name; - decoder.string(name); - if (name != "vertices") { - throw DecoderException( - "Tryed to start reading vertices on illegal position marked as: " + - name); - } -} - -// True if it is end of vertices -bool SnapshotDecoder::end_vertices() { - std::string name; - bool ret = decoder.string_try(name); - if (ret && name != "edges") { - throw DecoderException( - "Tryed to end reading vertices on illegal position marked as: " + name); - } - return ret; -} - -// Begins process of loading edges -void SnapshotDecoder::begin_edges() { - // EMPTY -} - -// True if it is end of edges -bool SnapshotDecoder::end_edges() { - std::string name; - bool ret = decoder.string_try(name); - if (ret && name != "indexes") { - throw DecoderException( - "Tryed to end reading edges on illegal position marked as: " + name); - } - return ret; -} - -// Begins process of reading indexes. -void SnapshotDecoder::start_indexes() { - // EMPTY -} - -// Loads IndexDefinition. -IndexDefinition SnapshotDecoder::load_index() { - return IndexDefinition::deserialize(decoder); -} - -// True if it is end. -bool SnapshotDecoder::end() { - std::string name; - bool ret = decoder.string_try(name); - if (ret && name != "end") { - throw DecoderException("Tryed to end on illegal position marked as: " + - name); - } - return ret; -} - -// ***************** from GraphDecoder -// Starts reading vertex. -Id SnapshotDecoder::vertex_start() { return Id(decoder.integer()); } - -// Returns number of stored labels. -size_t SnapshotDecoder::label_count() { return decoder.list_header(); } - -// Wiil read label into given storage. -std::string const &SnapshotDecoder::label() { - return label_name_map.at(decoder.integer()); -} - -// Starts reading edge. Return from to ids of connected vertices. -std::pair SnapshotDecoder::edge_start() { - auto from = Id(decoder.integer()); - auto to = Id(decoder.integer()); - return std::make_pair(from, to); -} - -// Reads edge_type into given storage. -std::string const &SnapshotDecoder::edge_type() { - return edge_type_name_map.at(decoder.integer()); -} - -// Returns number of stored propertys. -size_t SnapshotDecoder::property_count() { return decoder.map_header(); } - -// Reads property name into given storage. -std::string const &SnapshotDecoder::property_name() { - return property_name_map.at(decoder.integer()); -} diff --git a/src/snapshot/snapshot_decoder.hpp b/src/snapshot/snapshot_decoder.hpp deleted file mode 100644 index f127f22f6..000000000 --- a/src/snapshot/snapshot_decoder.hpp +++ /dev/null @@ -1,141 +0,0 @@ -#pragma once - -#include -#include - -#include "communication/bolt/v1/transport/streamed_bolt_decoder.hpp" -#include "mvcc/id.hpp" -#include "serialization/graph_decoder.hpp" -#include "storage/indexes/index_definition.hpp" -#include "storage/model/properties/property.hpp" - -// Decodes stored snapshot. -// Caller must respect loading order to be same as stored order with -// SnapshotEncoder. -// Main idea of knowing when something starts and ends is at certain points try -// to deserialize string and compare it with logically expected string seted by -// the SnapshotEncoder. -class SnapshotDecoder : public GraphDecoder { - public: - SnapshotDecoder(std::ifstream &snap_file); - - // Loads propert names, label names, edge_type names. - void load_init(); - - // Begins process of reading vertices - void begin_vertices(); - - // True if it is end of vertices - bool end_vertices(); - - // Begins process of loading edges - void begin_edges(); - - // True if it is end of edges - bool end_edges(); - - // Begins process of reading indexes. - void start_indexes(); - - // Loads IndexDefinition. - IndexDefinition load_index(); - - // True if it is end. - bool end(); - - // ***************** from GraphDecoder - // Starts reading vertex. - Id vertex_start(); - - // Returns number of stored labels. - size_t label_count(); - - // Wiil read label into given storage. - std::string const &label(); - - // Starts reading edge. Return from to ids of connected vertices. - std::pair edge_start(); - - // Reads edge_type into given storage. - std::string const &edge_type(); - - // Returns number of stored propertys. - size_t property_count(); - - // Reads property name into given storage. - std::string const &property_name(); - - // Reads property and calls T::handle for that property . - template - T property() { - if (decoder.is_list()) { - // Whe are deserializing an array. - - auto size = decoder.list_header(); - if (decoder.is_bool()) { - ArrayStore store; - for (auto i = 0; i < size; i++) { - store.push_back(decoder.read_bool()); - } - return T::handle(std::move(store)); - - } else if (decoder.is_integer()) { - ArrayStore store; - for (auto i = 0; i < size; i++) { - store.push_back(decoder.integer()); - } - return T::handle(std::move(store)); - - } else if (decoder.is_double()) { - ArrayStore store; - for (auto i = 0; i < size; i++) { - store.push_back(decoder.read_double()); - } - return T::handle(std::move(store)); - - } else if (decoder.is_string()) { - ArrayStore store; - for (auto i = 0; i < size; i++) { - std::string s; - decoder.string(s); - store.push_back(std::move(s)); - } - return T::handle(std::move(store)); - } - } else { - // Whe are deserializing a primitive. - - if (decoder.is_bool()) { - return T::handle(decoder.read_bool()); - - } else if (decoder.is_integer()) { - return T::handle(decoder.integer()); - - } else if (decoder.is_double()) { - return T::handle(decoder.read_double()); - - } else if (decoder.is_string()) { - std::string s; - decoder.string(s); - return T::handle(std::move(s)); - } - } - - throw DecoderException( - "Tryed to read property but found " - "unknown type in bolt marked as: ", - decoder.mark()); - } - - private: - bolt::StreamedBoltDecoder decoder; - - // Contains for every property_name here snapshot local id. - std::unordered_map property_name_map; - - // Contains for every label_name here snapshot local id. - std::unordered_map label_name_map; - - // Contains for every edge_type here snapshot local id. - std::unordered_map edge_type_name_map; -}; diff --git a/src/snapshot/snapshot_encoder.cpp b/src/snapshot/snapshot_encoder.cpp deleted file mode 100644 index daf660a97..000000000 --- a/src/snapshot/snapshot_encoder.cpp +++ /dev/null @@ -1,152 +0,0 @@ -#include "snapshot/snapshot_encoder.hpp" - -void SnapshotEncoder::property_name_init(std::string const &name) { - if (property_name_map.find(name) == property_name_map.end()) { - auto id = property_name_map.size(); - property_name_map.insert(std::make_pair(name, id)); - } -} - -void SnapshotEncoder::label_name_init(std::string const &name) { - if (label_name_map.find(name) == label_name_map.end()) { - auto id = label_name_map.size(); - label_name_map.insert(std::make_pair(name, id)); - } -} - -void SnapshotEncoder::edge_type_name_init(std::string const &name) { - if (edge_type_name_map.find(name) == edge_type_name_map.end()) { - auto id = edge_type_name_map.size(); - edge_type_name_map.insert(std::make_pair(name, id)); - } -} - -void SnapshotEncoder::end() { encoder.write_string("end"); } - -// **************** INDEX -// Prepares for indexes -void SnapshotEncoder::start_indexes() { encoder.write_string("indexes"); } - -// Writes index definition -void SnapshotEncoder::index(IndexDefinition const &def) { - def.serialize(encoder); -} - -// ************* VERTEX -// Prepares for vertices -void SnapshotEncoder::start_vertices() { - encoder.write_map_header(property_name_map.size()); - for (auto p : property_name_map) { - encoder.write_string(p.first); - encoder.write_integer(p.second); - } - - encoder.write_map_header(label_name_map.size()); - for (auto p : label_name_map) { - encoder.write_string(p.first); - encoder.write_integer(p.second); - } - - encoder.write_map_header(edge_type_name_map.size()); - for (auto p : edge_type_name_map) { - encoder.write_string(p.first); - encoder.write_integer(p.second); - } - - encoder.write_string("vertices"); -} - -// Starts writing vertex with given id. -void SnapshotEncoder::start_vertex(Id id) { encoder.write_integer(id); } - -// Number of following label calls. -void SnapshotEncoder::label_count(size_t n) { encoder.write_list_header(n); } - -// Label of currently started vertex. -void SnapshotEncoder::label(std::string const &l) { - encoder.write_integer(label_name_map.at(l)); -} - -// ************* EDGE -// Prepares for edges -void SnapshotEncoder::start_edges() { encoder.write_string("edges"); } - -// Starts writing edge from vertex to vertex -void SnapshotEncoder::start_edge(Id from, Id to) { - encoder.write_integer(from); - encoder.write_integer(to); -} - -// Type of currently started edge -void SnapshotEncoder::edge_type(std::string const &et) { - encoder.write_integer(edge_type_name_map.at(et)); -} - -// ******* PROPERTY -void SnapshotEncoder::property_count(size_t n) { encoder.write_map_header(n); } - -void SnapshotEncoder::property_name(std::string const &name) { - encoder.write_integer(property_name_map.at(name)); -} - -void SnapshotEncoder::handle(const Void &v) { encoder.write_null(); } - -void SnapshotEncoder::handle(const bool &prop) { encoder.write_bool(prop); } - -void SnapshotEncoder::handle(const float &prop) { encoder.write_double(prop); } - -void SnapshotEncoder::handle(const double &prop) { encoder.write_double(prop); } - -void SnapshotEncoder::handle(const int32_t &prop) { - encoder.write_integer(prop); -} - -void SnapshotEncoder::handle(const int64_t &prop) { - encoder.write_integer(prop); -} - -void SnapshotEncoder::handle(const std::string &value) { - encoder.write_string(value); -} - -void SnapshotEncoder::handle(const ArrayStore &a) { - encoder.write_list_header(a.size()); - for (auto const &e : a) { - encoder.write_bool(e); - } -} - -void SnapshotEncoder::handle(const ArrayStore &a) { - encoder.write_list_header(a.size()); - for (auto const &e : a) { - encoder.write_integer(e); - } -} - -void SnapshotEncoder::handle(const ArrayStore &a) { - encoder.write_list_header(a.size()); - for (auto const &e : a) { - encoder.write_integer(e); - } -} - -void SnapshotEncoder::handle(const ArrayStore &a) { - encoder.write_list_header(a.size()); - for (auto const &e : a) { - encoder.write_double(e); - } -} - -void SnapshotEncoder::handle(const ArrayStore &a) { - encoder.write_list_header(a.size()); - for (auto const &e : a) { - encoder.write_double(e); - } -} - -void SnapshotEncoder::handle(const ArrayStore &a) { - encoder.write_list_header(a.size()); - for (auto const &e : a) { - encoder.write_string(e); - } -} diff --git a/src/snapshot/snapshot_encoder.hpp b/src/snapshot/snapshot_encoder.hpp deleted file mode 100644 index d1bb68e0f..000000000 --- a/src/snapshot/snapshot_encoder.hpp +++ /dev/null @@ -1,120 +0,0 @@ -#pragma once - -#include -#include - -#include "communication/bolt/v1/transport/bolt_encoder.hpp" -#include "mvcc/id.hpp" -#include "serialization/graph_encoder.hpp" -#include "serialization/serialization.hpp" -#include "storage/indexes/index_definition.hpp" -#include "utils/stream_wrapper.hpp" - -// Represents creation of a snapshot. Contains all necessary informations -// for write. Caller is responisble to structure his calls as following: -// * property_name_init -// * label_name_init -// * edge_type_name_init -// 1 start_vertices -// * -// 1 start_edges -// * -// 1 start_indexes -// * index -// 1 end -class SnapshotEncoder : public GraphEncoder { - public: - SnapshotEncoder(std::ofstream &stream) : stream(stream) {} - - SnapshotEncoder(SnapshotEncoder const &) = delete; - SnapshotEncoder(SnapshotEncoder &&) = delete; - - SnapshotEncoder &operator=(SnapshotEncoder const &) = delete; - SnapshotEncoder &operator=(SnapshotEncoder &&) = delete; - - // Tells in advance which names will be used. - void property_name_init(std::string const &name); - - // Tells in advance which labels will be used. - void label_name_init(std::string const &name); - - // Tells in advance which edge_type will be used. - void edge_type_name_init(std::string const &name); - - // Prepares for vertices - void start_vertices(); - - // Prepares for edges - void start_edges(); - - // Prepares for indexes - void start_indexes(); - - // Writes index definition - void index(IndexDefinition const &); - - // Finishes snapshot - void end(); - - // *********************From graph encoder - // Starts writing vertex with given id. - void start_vertex(Id id); - - // Number of following label calls. - void label_count(size_t n); - - // Label of currently started vertex. - void label(std::string const &l); - - // Starts writing edge from vertex to vertex - void start_edge(Id from, Id to); - - // Type of currently started edge - void edge_type(std::string const &et); - - // Number of following paired property_name,handle calls. - void property_count(size_t n); - - // Property family name of next property for currently started element. - void property_name(std::string const &name); - - void handle(const Void &v); - - void handle(const bool &prop); - - void handle(const float &prop); - - void handle(const double &prop); - - void handle(const int32_t &prop); - - void handle(const int64_t &prop); - - void handle(const std::string &value); - - void handle(const ArrayStore &); - - void handle(const ArrayStore &); - - void handle(const ArrayStore &); - - void handle(const ArrayStore &); - - void handle(const ArrayStore &); - - void handle(const ArrayStore &); - - private: - std::ofstream &stream; - StreamWrapper wrapped = {stream}; - bolt::BoltEncoder> encoder = {wrapped}; - - // Contains for every property_name here snapshot local id. - std::unordered_map property_name_map; - - // Contains for every label_name here snapshot local id. - std::unordered_map label_name_map; - - // Contains for every edge_type here snapshot local id. - std::unordered_map edge_type_name_map; -}; diff --git a/src/snapshot/snapshot_engine.cpp b/src/snapshot/snapshot_engine.cpp deleted file mode 100644 index 8f2f718fc..000000000 --- a/src/snapshot/snapshot_engine.cpp +++ /dev/null @@ -1,324 +0,0 @@ -#include "snapshot/snapshot_engine.hpp" - -#include "config/config.hpp" -#include "database/db_accessor.hpp" -#include "logging/default.hpp" -#include "snapshot/snapshot_decoder.hpp" -#include "snapshot/snapshot_encoder.hpp" -#include "storage/indexes/indexes.hpp" -#include "threading/thread.hpp" -#include "utils/sys.hpp" - -SnapshotEngine::SnapshotEngine(Db &db) - : snapshot_folder(CONFIG(config::SNAPSHOTS_PATH)), - db(db), - max_retained_snapshots(CONFIG_INTEGER(config::MAX_RETAINED_SNAPSHOTS)), - logger(logging::log->logger("SnapshotEngine db[" + db.name() + "]")) {} - -bool SnapshotEngine::make_snapshot() { - std::lock_guard lock(guard); - std::time_t now = std::time(nullptr); - if (make_snapshot(now, "full")) { - // Sanpsthot was created so whe should check if some older snapshots - // should be deleted. - clean_snapshots(); - return true; - - } else { - return false; - } -} - -void SnapshotEngine::clean_snapshots() { - logger.info("Started cleaning commit_file"); - // Whe first count the number of snapshots that whe know about in commit - // file. - std::vector lines; - { - std::ifstream commit_file(snapshot_commit_file()); - - std::string line; - while (std::getline(commit_file, line)) { - lines.push_back(line); - } - } - - int n = lines.size() - max_retained_snapshots; - if (n > 0) { - // Whe have to much snapshots so whe should delete some. - std::ofstream commit_file(snapshot_commit_file(), std::fstream::trunc); - - // First whw will rewrite commit file to contain only - // max_retained_snapshots newest snapshots. - for (auto i = n; i < lines.size(); i++) { - commit_file << lines[i] << std::endl; - } - - auto res = sys::flush_file_to_disk(commit_file); - if (res == 0) { - // Commit file was succesfully changed so whe can now delete - // snapshots which whe evicted from commit file. - commit_file.close(); - logger.info("Removed {} snapshot from commit_file", n); - - for (auto i = 0; i < n; i++) { - auto res = std::remove(lines[i].c_str()); - if (res == 0) { - logger.info("Succesfully deleted snapshot file \"{}\"", lines[i]); - } else { - logger.error("Error {} occured while deleting snapshot file \"{}\"", - res, lines[i]); - } - } - - } else { - logger.error("Error {} occured while flushing commit file", res); - } - } - - logger.info("Finished cleaning commit_file"); -} - -bool SnapshotEngine::make_snapshot(std::time_t now, const char *type) { - bool success = false; - - auto snapshot_file_name = snapshot_file(now, type); - - logger.info("Writing {} snapshot to file \"{}\"", type, snapshot_file_name); - - DbTransaction t(db); - - try { - std::ofstream snapshot_file(snapshot_file_name, - std::fstream::binary | std::fstream::trunc); - - SnapshotEncoder snap(snapshot_file); - - auto old_trans = - tx::TransactionRead(db.tx_engine); // Overenginered for incremental - // snapshot. Can be removed. - - // Everything is ready for creation of snapshot. - snapshot(t, snap, old_trans); - - auto res = sys::flush_file_to_disk(snapshot_file); - if (res == 0) { - // Snapshot was succesfully written to disk. - t.trans.commit(); - success = true; - - } else { - logger.error("Error {} occured while flushing snapshot file", res); - t.trans.abort(); - } - - } catch (const std::exception &e) { - logger.error("Exception occured while creating {} snapshot", type); - logger.error("{}", e.what()); - - t.trans.abort(); - } - - if (success) { - // Snapshot was succesfully created but for it to be reachable for - // import whe must add it to the end of commit file. - std::ofstream commit_file(snapshot_commit_file(), std::fstream::app); - - commit_file << snapshot_file_name << std::endl; - - auto res = sys::flush_file_to_disk(commit_file); - if (res == 0) { - commit_file.close(); - snapshoted_no_v.fetch_add(1); - // Snapshot was succesfully commited. - - } else { - logger.error("Error {} occured while flushing commit file", res); - } - } - - return success; -} - -bool SnapshotEngine::import() { - std::lock_guard lock(guard); - - logger.info("Started import"); - bool success = false; - - try { - std::ifstream commit_file(snapshot_commit_file()); - - // Whe first load all known snpashot file names from commit file. - std::vector snapshots; - std::string line; - while (std::getline(commit_file, line)) { - snapshots.push_back(line); - } - - while (snapshots.size() > 0) { - logger.info("Importing data from snapshot \"{}\"", snapshots.back()); - - DbAccessor t(db); - - try { - std::ifstream snapshot_file(snapshots.back(), std::fstream::binary); - SnapshotDecoder decoder(snapshot_file); - - auto indexes = snapshot_load(t, decoder); - if (t.commit()) { - logger.info("Succesfully imported snapshot \"{}\"", snapshots.back()); - add_indexes(indexes); - success = true; - break; - - } else { - logger.info( - "Unuccesfully tryed to import snapshot " - "\"{}\"", - snapshots.back()); - } - - } catch (const std::exception &e) { - logger.error("Error occured while importing snapshot \"{}\"", - snapshots.back()); - logger.error("{}", e.what()); - t.abort(); - } - - snapshots.pop_back(); - // Whe will try to import older snapashot if such one exist. - } - - } catch (const std::exception &e) { - logger.error("Error occured while importing snapshot"); - logger.error("{}", e.what()); - } - - logger.info("Finished import"); - - return success; -} - -void SnapshotEngine::snapshot(DbTransaction const &dt, SnapshotEncoder &snap, - tx::TransactionRead const &old_trans) { - Db &db = dt.db; - DbAccessor t(db, dt.trans); - - // Anounce property names - for (auto &family : db.graph.vertices.property_family_access()) { - snap.property_name_init(family.first); - } - for (auto &family : db.graph.edges.property_family_access()) { - snap.property_name_init(family.first); - } - - // Anounce label names - for (auto &labels : db.graph.label_store.access()) { - snap.label_name_init(labels.first.to_string()); - } - - // Anounce edge_type names - for (auto &et : db.graph.edge_type_store.access()) { - snap.edge_type_name_init(et.first.to_string()); - } - - // Store vertices - snap.start_vertices(); - t.vertex_access() - .fill() - .filter([&](auto va) { return !va.is_visble_to(old_trans); }) - .for_all([&](auto va) { serialization::serialize_vertex(va, snap); }); - - // Store edges - snap.start_edges(); - t.edge_access() - .fill() - .filter([&](auto va) { return !va.is_visble_to(old_trans); }) - .for_all([&](auto ea) { serialization::serialize_edge(ea, snap); }); - - // Store info on existing indexes. - snap.start_indexes(); - db.indexes().vertex_indexes([&](auto &i) { snap.index(i.definition()); }); - db.indexes().edge_indexes([&](auto &i) { snap.index(i.definition()); }); - - snap.end(); -} - -std::vector SnapshotEngine::snapshot_load( - DbAccessor &t, SnapshotDecoder &snap) { - std::unordered_map vertices; - - // Load names - snap.load_init(); - - // Load vertices - snap.begin_vertices(); - size_t v_count = 0; - while (!snap.end_vertices()) { - vertices.insert(serialization::deserialize_vertex(t, snap)); - v_count++; - } - logger.info("Loaded {} vertices", v_count); - - // Load edges - snap.begin_edges(); - size_t e_count = 0; - while (!snap.end_edges()) { - serialization::deserialize_edge(t, snap, vertices); - e_count++; - } - logger.info("Loaded {} edges", e_count); - - // Load indexes - snap.start_indexes(); - std::vector indexes; - while (!snap.end()) { - indexes.push_back(snap.load_index()); - } - - return indexes; -} - -void SnapshotEngine::add_indexes(std::vector &v) { - logger.info("Adding: {} indexes", v.size()); - for (auto id : v) { - // TODO: It is alright for now to ignore if add_index return false. I am - // not even sure if false should stop snapshot loading. - if (!db.indexes().add_index(id)) { - logger.warn( - "Failed to add index, but still continuing with " - "loading snapshot"); - } - } -} - -std::string SnapshotEngine::snapshot_file(std::time_t const &now, - const char *type) { - // Current nano time less than second. - auto now_nano = std::chrono::time_point_cast( - std::chrono::high_resolution_clock::now()) - .time_since_epoch() - .count() % - (1000 * 1000 * 1000); - - return snapshot_db_dir() + "/" + std::to_string(now) + "_" + - std::to_string(now_nano) + "_" + type; -} - -std::string SnapshotEngine::snapshot_commit_file() { - return snapshot_db_dir() + "/snapshot_commit.txt"; -} - -std::string SnapshotEngine::snapshot_db_dir() { - if (!sys::ensure_directory_exists(snapshot_folder)) { - logger.error("Error while creating directory \"{}\"", snapshot_folder); - } - - auto db_path = snapshot_folder + "/" + db.name(); - if (!sys::ensure_directory_exists(db_path)) { - logger.error("Error while creating directory \"{}\"", db_path); - } - - return db_path; -} diff --git a/src/snapshot/snapshot_engine.hpp b/src/snapshot/snapshot_engine.hpp deleted file mode 100644 index ea63b1607..000000000 --- a/src/snapshot/snapshot_engine.hpp +++ /dev/null @@ -1,72 +0,0 @@ -#pragma once - -#include -#include - -#include "logging/default.hpp" -#include "storage/indexes/index_definition.hpp" -#include "transactions/transaction.hpp" - -class SnapshotEncoder; -class SnapshotDecoder; -class Db; -class DbTransaction; -class DbAccessor; - -// Captures snapshots. Only one per database should exist. -class SnapshotEngine { - public: - SnapshotEngine(Db &db); - - ~SnapshotEngine() = default; - - // Returns number of succesffuly created snapshots. - size_t snapshoted_no() { return snapshoted_no_v.load(); } - - // Imports latest snapshot into the databse. Blocks until other calls don't - // end. - bool import(); - - // Makes snapshot of given type. Blocks until other calls don't end. - bool make_snapshot(); - - private: - // Removes excess of snapshots starting with oldest one. - void clean_snapshots(); - - // Makes snapshot of given type - bool make_snapshot(std::time_t now, const char *type); - - // Makes snapshot. It only saves records which have changed since old_trans. - void snapshot(DbTransaction const &dt, SnapshotEncoder &snap, - tx::TransactionRead const &old_trans); - - // Loads snapshot. True if success. Returns indexes which were in snapshot. - std::vector snapshot_load(DbAccessor &t, - SnapshotDecoder &snap); - - // Adds indexes. Should be called outside transactions. - void add_indexes(std::vector &v); - - // Will return different name on every call. - std::string snapshot_file(std::time_t const &now, const char *type); - - // Returns name of snapshot commit file. - std::string snapshot_commit_file(); - - // Path to directory of database. Ensures that all necessary directorys - // exist. - std::string snapshot_db_dir(); - - Logger logger; - - Db &db; - std::mutex guard; - const std::string snapshot_folder; - - // Determines how many newest snapshot will be preserved, while the other - // ones will be deleted. - const size_t max_retained_snapshots; - - std::atomic snapshoted_no_v = {0}; -}; diff --git a/src/snapshot/snapshoter.cpp b/src/snapshot/snapshoter.cpp deleted file mode 100644 index d8c5ab286..000000000 --- a/src/snapshot/snapshoter.cpp +++ /dev/null @@ -1,62 +0,0 @@ -#include "snapshot/snapshoter.hpp" - -#include "database/db_accessor.hpp" -#include "logging/default.hpp" -#include "snapshot/snapshot_decoder.hpp" -#include "snapshot/snapshot_encoder.hpp" -#include "storage/indexes/indexes.hpp" -#include "threading/thread.hpp" -#include "utils/sys.hpp" - -Snapshoter::Snapshoter(ConcurrentMap &dbs, - size_t snapshot_cycle) - : snapshot_cycle(snapshot_cycle), dbms(dbs) { - // Start snapshoter thread. - thread = std::make_unique([&]() { - logger = logging::log->logger("Snapshoter"); - logger.info("Started with snapshoot cycle of {} sec", this->snapshot_cycle); - - try { - run(); - } catch (const std::exception &e) { - logger.error("Irreversible error occured in snapshoter"); - logger.error("{}", e.what()); - } - - logger.info("Shutting down snapshoter"); - }); -} - -Snapshoter::~Snapshoter() { - snapshoting.store(false, std::memory_order_release); - thread.get()->join(); -} - -void Snapshoter::run() { - std::time_t last_snapshot = std::time(nullptr); - - while (snapshoting.load(std::memory_order_acquire)) { - std::time_t now = std::time(nullptr); - - if (now >= last_snapshot + snapshot_cycle) { - // It's time for snapshot - make_snapshots(); - - last_snapshot = now; - - } else { - // It isn't time for snapshot so i should wait. - std::this_thread::sleep_for(std::chrono::seconds(1)); - } - } -} - -void Snapshoter::make_snapshots() { - logger.info("Started snapshoting cycle"); - - for (auto &db : dbms.access()) { - db.second.snap_engine.make_snapshot(); - } - - logger.info("Finished snapshoting cycle"); -} diff --git a/src/snapshot/snapshoter.hpp b/src/snapshot/snapshoter.hpp deleted file mode 100644 index d2931f364..000000000 --- a/src/snapshot/snapshoter.hpp +++ /dev/null @@ -1,33 +0,0 @@ -#pragma once - -#include - -#include "database/graph_db.hpp" -#include "logging/default.hpp" -#include "threading/thread.hpp" - -class SnapshotEncoder; -class SnapshotDecoder; - -// Captures snapshots. -class Snapshoter { - public: - // How much sec is between snapshots - // snapshot_folder is path to common folder for all snapshots. - Snapshoter(ConcurrentMap &dbs, size_t snapshot_cycle); - - ~Snapshoter(); - - private: - void run(); - - // Makes snapshot of given type - void make_snapshots(); - - Logger logger; - - const size_t snapshot_cycle; - std::unique_ptr thread = {nullptr}; - ConcurrentMap &dbms; - std::atomic snapshoting = {true}; -};