Added logic for cleaning older snapshots.
This commit is contained in:
parent
cd528b9bd6
commit
efbae39f41
@ -5,3 +5,4 @@ template_cpu_hpp_path: "./template/template_code_cpu.hpp"
|
|||||||
snapshots_path: "./snapshots"
|
snapshots_path: "./snapshots"
|
||||||
cleaning_cycle_sec: "60"
|
cleaning_cycle_sec: "60"
|
||||||
snapshot_cycle_sec: "60"
|
snapshot_cycle_sec: "60"
|
||||||
|
max_retained_snapshots: "3"
|
||||||
|
@ -25,6 +25,7 @@ constexpr const char *BARRIER_TEMPLATE_CPU_CPP_PATH =
|
|||||||
constexpr const char *SNAPSHOTS_PATH = "snapshots_path";
|
constexpr const char *SNAPSHOTS_PATH = "snapshots_path";
|
||||||
constexpr const char *CLEANING_CYCLE_SEC = "cleaning_cycle_sec";
|
constexpr const char *CLEANING_CYCLE_SEC = "cleaning_cycle_sec";
|
||||||
constexpr const char *SNAPSHOT_CYCLE_SEC = "snapshot_cycle_sec";
|
constexpr const char *SNAPSHOT_CYCLE_SEC = "snapshot_cycle_sec";
|
||||||
|
constexpr const char *MAX_RETAINED_SNAPSHOTS = "max_retained_snapshots";
|
||||||
// -- all possible Memgraph's keys --
|
// -- all possible Memgraph's keys --
|
||||||
|
|
||||||
inline size_t to_int(std::string &&s) { return stoull(s); }
|
inline size_t to_int(std::string &&s) { return stoull(s); }
|
||||||
|
@ -32,6 +32,9 @@ public:
|
|||||||
bool make_snapshot();
|
bool make_snapshot();
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
// Removes excess of snapshots starting with oldest one.
|
||||||
|
void clean_snapshots();
|
||||||
|
|
||||||
// Makes snapshot of given type
|
// Makes snapshot of given type
|
||||||
bool make_snapshot(std::time_t now, const char *type);
|
bool make_snapshot(std::time_t now, const char *type);
|
||||||
|
|
||||||
@ -55,5 +58,10 @@ private:
|
|||||||
Db &db;
|
Db &db;
|
||||||
std::mutex guard;
|
std::mutex guard;
|
||||||
const std::string snapshot_folder;
|
const std::string snapshot_folder;
|
||||||
|
|
||||||
|
// Determines how many newest snapshot will be preserved, while the other
|
||||||
|
// ones will be deleted.
|
||||||
|
const size_t max_retained_snapshots;
|
||||||
|
|
||||||
std::atomic<size_t> snapshoted_no_v = {0};
|
std::atomic<size_t> snapshoted_no_v = {0};
|
||||||
};
|
};
|
||||||
|
@ -11,6 +11,7 @@
|
|||||||
|
|
||||||
SnapshotEngine::SnapshotEngine(Db &db, std::string const &name)
|
SnapshotEngine::SnapshotEngine(Db &db, std::string const &name)
|
||||||
: snapshot_folder(CONFIG(config::SNAPSHOTS_PATH)), db(db),
|
: snapshot_folder(CONFIG(config::SNAPSHOTS_PATH)), db(db),
|
||||||
|
max_retained_snapshots(CONFIG_INTEGER(config::MAX_RETAINED_SNAPSHOTS)),
|
||||||
logger(logging::log->logger("SnapshotEngine db[" + name + "]"))
|
logger(logging::log->logger("SnapshotEngine db[" + name + "]"))
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
@ -19,7 +20,58 @@ bool SnapshotEngine::make_snapshot()
|
|||||||
{
|
{
|
||||||
std::lock_guard<std::mutex> lock(guard);
|
std::lock_guard<std::mutex> lock(guard);
|
||||||
std::time_t now = std::time(nullptr);
|
std::time_t now = std::time(nullptr);
|
||||||
return make_snapshot(now, "full");
|
if (make_snapshot(now, "full")) {
|
||||||
|
clean_snapshots();
|
||||||
|
return true;
|
||||||
|
} else {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void SnapshotEngine::clean_snapshots()
|
||||||
|
{
|
||||||
|
logger.info("Started cleaning commit_file");
|
||||||
|
std::vector<std::string> lines;
|
||||||
|
{
|
||||||
|
std::ifstream commit_file(snapshot_commit_file());
|
||||||
|
|
||||||
|
std::string line;
|
||||||
|
while (std::getline(commit_file, line)) {
|
||||||
|
lines.push_back(line);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
int n = lines.size() - max_retained_snapshots;
|
||||||
|
if (n > 0) {
|
||||||
|
std::ofstream commit_file(snapshot_commit_file(), std::fstream::trunc);
|
||||||
|
|
||||||
|
for (auto i = n; i < lines.size(); i++) {
|
||||||
|
commit_file << lines[i] << std::endl;
|
||||||
|
}
|
||||||
|
|
||||||
|
auto res = sys::flush_file_to_disk(commit_file);
|
||||||
|
if (res == 0) {
|
||||||
|
commit_file.close();
|
||||||
|
logger.info("Removed {} snapshot from commit_file", n);
|
||||||
|
|
||||||
|
for (auto i = 0; i < n; i++) {
|
||||||
|
auto res = std::remove(lines[i].c_str());
|
||||||
|
if (res == 0) {
|
||||||
|
logger.info("Succesfully deleted snapshot file \"{}\"",
|
||||||
|
lines[i]);
|
||||||
|
} else {
|
||||||
|
logger.error(
|
||||||
|
"Error {} occured while deleting snapshot file \"{}\"",
|
||||||
|
res, lines[i]);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
} else {
|
||||||
|
logger.error("Error {} occured while flushing commit file", res);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
logger.info("Finished cleaning commit_file");
|
||||||
}
|
}
|
||||||
|
|
||||||
bool SnapshotEngine::make_snapshot(std::time_t now, const char *type)
|
bool SnapshotEngine::make_snapshot(std::time_t now, const char *type)
|
||||||
@ -38,7 +90,9 @@ bool SnapshotEngine::make_snapshot(std::time_t now, const char *type)
|
|||||||
|
|
||||||
SnapshotEncoder snap(snapshot_file);
|
SnapshotEncoder snap(snapshot_file);
|
||||||
|
|
||||||
auto old_trans = tx::TransactionRead(db.tx_engine);
|
auto old_trans =
|
||||||
|
tx::TransactionRead(db.tx_engine); // Overenginered for incremental
|
||||||
|
// snapshot. Can be removed.
|
||||||
snapshot(t, snap, old_trans);
|
snapshot(t, snap, old_trans);
|
||||||
|
|
||||||
auto res = sys::flush_file_to_disk(snapshot_file);
|
auto res = sys::flush_file_to_disk(snapshot_file);
|
||||||
|
Loading…
Reference in New Issue
Block a user