Cleanup durability config, docs, CHANGELOG

Reviewers: teon.banek, buda, mislav.bradac, dgleich

Reviewed By: teon.banek

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D992
This commit is contained in:
florijan 2017-11-20 11:58:05 +01:00
parent df4cbdc5b2
commit 8bbf1af525
27 changed files with 336 additions and 306 deletions

View File

@ -5,9 +5,11 @@
### Breaking Changes
* Snapshot format changed (not backward compatible).
* Snapshot configuration flags changed, general durability flags added.
### Major Features and Improvements
* Write-ahead log added.
* `nodes` and `relationships` functions added.
### Bug Fixes and Other Changes

View File

@ -192,6 +192,7 @@ set(memgraph_src_files
${src_dir}/communication/reactor/reactor_distributed.cpp
${src_dir}/data_structures/concurrent/skiplist_gc.cpp
${src_dir}/database/graph_db.cpp
${src_dir}/database/graph_db_config.cpp
${src_dir}/database/graph_db_accessor.cpp
${src_dir}/durability/recovery.cpp
${src_dir}/durability/snapshooter.cpp

View File

@ -3,34 +3,17 @@
# NOTE: all paths are relative to the run folder
# (where the executable is run)
# directory to the folder with snapshots
--snapshot-directory=snapshots
# no durability
--durability-enabled=false
--snapshot-on-exit=false
--db-recover-on-startup=false
# cleaning cycle interval
# if set to -1 the GC will not run
# no GC
--gc-cycle-sec=-1
# skiplist gc cycle interval
# if set to 0 the GC will not run
--skiplist_gc_interval=0
# snapshot cycle interval
# if set to -1 the snapshooter will not run
--snapshot-cycle-sec=-1
# snapshot cycle interval
# if set to -1 the snapshooter will not run
# no query execution time limit
--query_execution_time_sec=-1
# create snapshot disabled on db exit
--snapshot-on-exit=false
# max number of snapshots which will be kept on the disk at some point
# if set to -1 the max number of snapshots is unlimited
--snapshot-max-retained=-1
# database recovering is disabled by default
--snapshot-recover-on-startup=false
# number of workers
--num-workers=1

View File

@ -48,29 +48,33 @@
# downside of caching by evicting old plans after the given time.
#--query-plan-cache-ttl=60
## Snapshot
## Durability
#
# Snapshots store the database state to persistent storage. Snapshots are
# taken in intervals and can be used to restore the database to a previous
# state.
# Memgraph can store database state to persistent storage. Two mechanisms
# are used: snapshots store the total current database state while write-ahead
# logs store small changes incrementally. They are used in tandem to provide
# fast and storage-efficient persistence. Some aspects of snapshot taking
# are configurable, while write-ahead logging is pre-configured for optimal
# performance.
--durability-enabled=true
# Interval of taking snapshots, in seconds. If set to -1, snapshot feature
# Path to the directory where snapshots and write-ahead log files will be stored.
--durability-directory=/var/lib/memgraph/durability
# Recover the database on startup.
--db-recover-on-startup=true
# Interval of taking snapshots, in seconds. If set to -1, the snapshot feature
# will be turned off.
--snapshot-cycle-sec=300
# Create a snapshot when closing Memgraph.
--snapshot-on-exit=true
# Path to the directory where snapshots will be stored.
--snapshot-directory=/var/lib/memgraph/snapshots
# Maximum number of kept snapshots. Old snapshots will be deleted to make room
# for new ones. If set to -1, the number of kept snapshots is unlimited.
--snapshot-max-retained=3
# Recover the database from the latest snapshot on startup.
--snapshot-recover-on-startup=true
## Logging
# Path to where the log should be stored.

View File

@ -3,26 +3,13 @@
# NOTE: all paths are relative to the run folder
# (where the executable is run)
# directory to the folder with snapshots
--snapshot-directory=snapshots
# cleaning cycle interval
# if set to -1 the GC will not run
--gc-cycle-sec=30
# snapshot cycle interval
# if set to -1 the snapshooter will not run
# enable durability
--durability-enabled=true
--snapshot-cycle-sec=600
# create snapshot enabled on db exit
--snapshot-on-exit=true
# max number of snapshots which will be kept on the disk at some point
# if set to -1 the max number of snapshots is unlimited
--snapshot-max-retained=1
# database recovering is disabled by default
--snapshot-recover-on-startup=false
--db-recover-on-startup=false
# increase query timeout (10 min)
--query-execution-time-sec=600

View File

@ -3,27 +3,7 @@
# NOTE: all paths are relative to the run folder
# (where the executable is run)
# directory to the folder with snapshots
--snapshot-directory=snapshots
# cleaning cycle interval
# if set to -1 the GC will not run
--gc-cycle-sec=30
# snapshot cycle interval
# if set to -1 the snapshooter will not run
--snapshot-cycle-sec=-1
# create snapshot disabled on db exit
# no durability
--durability-enabled=false
--snapshot-on-exit=false
# disable WAL
--wal-flush-interval-millis=-1
# max number of snapshots which will be kept on the disk at some point
# if set to -1 the max number of snapshots is unlimited
--snapshot-max-retained=-1
# database recovering is disabled by default
--snapshot-recover-on-startup=false
--db-recover-on-startup=false

View File

@ -110,11 +110,12 @@ parameters:
--query-execution-time-sec | integer | 30 | Maximum allowed query execution time, in seconds. <br/>Queries exceeding this limit will be aborted. Value of -1 means no limit.
--query-plan-cache | bool | true | Cache generated query plans.
--query-plan-cache-ttl | int | 60 | Time to live for cached query plans, in seconds.
--snapshot-cycle-sec | integer | 300 | Interval (seconds) between database snapshots.<br/>Value of -1 turns taking snapshots off.
--snapshot-on-exit | bool | true | Make a snapshot when closing Memgraph.
--snapshot-directory | string | "/var/lib/memgraph/snapshots" | Path to the directory where snapshots will be stored.
--durability-enabled | bool | true | If database state persistence is enabled (snapshot and write-ahead log).
--durability-directory | string | "/var/lib/memgraph/durability" | Path to the directory where durability files will be stored.
--db-recover-on-startup | bool | true | Recover the database on startup (from snapshots and write-ahead logs).
--snapshot-cycle-sec | integer | 300 | Interval between database snapshots, in seconds.
--snapshot-max-retained | integer | 3 | Number of retained snapshots.<br/>Value -1 means without limit.
--snapshot-recover-on-startup | bool | true | Recover the database on startup using the last<br/>stored snapshot.
--snapshot-on-exit | bool | true | Make a snapshot when closing Memgraph.
--log-file | string | "/var/log/memgraph/memgraph.log" | Path to where the log should be stored.
--also-log-to-stderr | bool | false | If `true`, log messages will go to stderr in addition to logfiles.
--flag-file | string | "" | Path to a file containing additional configuration settings.

View File

@ -1,66 +1,49 @@
#include <experimental/filesystem>
#include <functional>
#include <gflags/gflags.h>
#include <glog/logging.h>
#include "database/creation_exception.hpp"
#include "database/graph_db.hpp"
#include "database/graph_db_accessor.hpp"
#include "durability/paths.hpp"
#include "durability/recovery.hpp"
#include "durability/snapshooter.hpp"
#include "storage/edge.hpp"
#include "storage/garbage_collector.hpp"
#include "utils/timer.hpp"
bool ValidateSnapshotDirectory(const char *flagname, const std::string &value) {
if (fs::exists(value) && !fs::is_directory(value)) {
std::cout << "The snapshot directory path '" << value
<< "' is not a directory!" << std::endl;
return false;
}
return true;
}
namespace fs = std::experimental::filesystem;
DEFINE_int32(gc_cycle_sec, 30,
"Amount of time between starts of two cleaning cycles in seconds. "
"-1 to turn off.");
DEFINE_int32(snapshot_max_retained, -1,
"Number of retained snapshots, -1 means without limit.");
DEFINE_int32(snapshot_cycle_sec, -1,
"Amount of time between starts of two snapshooters in seconds. -1 "
"to turn off.");
DEFINE_int32(query_execution_time_sec, 180,
"Maximum allowed query execution time. Queries exceeding this "
"limit will be aborted. Value of -1 means no limit.");
DEFINE_bool(snapshot_on_exit, false, "Snapshot on exiting the database.");
DEFINE_string(snapshot_directory, "snapshots",
"Path to directory in which to save snapshots.");
DEFINE_validator(snapshot_directory, &ValidateSnapshotDirectory);
DEFINE_bool(snapshot_recover_on_startup, false, "Recover database on startup.");
GraphDb::GraphDb()
: gc_vertices_(vertices_, vertex_record_deleter_,
GraphDb::GraphDb(GraphDb::Config config)
: config_(config),
gc_vertices_(vertices_, vertex_record_deleter_,
vertex_version_list_deleter_),
gc_edges_(edges_, edge_record_deleter_, edge_version_list_deleter_) {
gc_edges_(edges_, edge_record_deleter_, edge_version_list_deleter_),
wal_{config.durability_directory, config.durability_enabled} {
// Pause of -1 means we shouldn't run the GC.
if (FLAGS_gc_cycle_sec != -1) {
gc_scheduler_.Run(std::chrono::seconds(FLAGS_gc_cycle_sec),
if (config.gc_cycle_sec != -1) {
gc_scheduler_.Run(std::chrono::seconds(config.gc_cycle_sec),
[this]() { CollectGarbage(); });
}
if (FLAGS_snapshot_recover_on_startup)
durability::Recover(FLAGS_snapshot_directory, *this);
wal_.Enable();
// If snapshots are enabled we need the durability dir.
if (config.durability_enabled)
durability::CheckDurabilityDir(config.durability_directory);
if (config.db_recover_on_startup)
durability::Recover(config.durability_directory, *this);
if (config.durability_enabled) wal_.Enable();
StartSnapshooting();
if (FLAGS_query_execution_time_sec != -1) {
if (config.query_execution_time_sec != -1) {
transaction_killer_.Run(
std::chrono::seconds(
std::max(1, std::min(5, FLAGS_query_execution_time_sec / 4))),
std::max(1, std::min(5, config.query_execution_time_sec / 4))),
[this]() {
tx_engine_.ForEachActiveTransaction([](tx::Transaction &t) {
tx_engine_.ForEachActiveTransaction([this](tx::Transaction &t) {
if (t.creation_time() +
std::chrono::seconds(FLAGS_query_execution_time_sec) <
std::chrono::seconds(config_.query_execution_time_sec) <
std::chrono::steady_clock::now()) {
t.set_should_abort();
};
@ -75,16 +58,17 @@ void GraphDb::Shutdown() {
}
void GraphDb::StartSnapshooting() {
if (FLAGS_snapshot_cycle_sec != -1) {
if (config_.durability_enabled) {
auto create_snapshot = [this]() -> void {
GraphDbAccessor db_accessor(*this);
if (!durability::MakeSnapshot(db_accessor, fs::path(FLAGS_snapshot_directory),
FLAGS_snapshot_max_retained)) {
if (!durability::MakeSnapshot(db_accessor,
fs::path(config_.durability_directory),
config_.snapshot_max_retained)) {
LOG(WARNING) << "Durability: snapshot creation failed";
}
db_accessor.Commit();
};
snapshot_creator_.Run(std::chrono::seconds(FLAGS_snapshot_cycle_sec),
snapshot_creator_.Run(std::chrono::seconds(config_.snapshot_cycle_sec),
create_snapshot);
}
}
@ -152,12 +136,12 @@ GraphDb::~GraphDb() {
transaction_killer_.Stop();
// Create last database snapshot
if (FLAGS_snapshot_on_exit == true) {
if (config_.snapshot_on_exit == true) {
GraphDbAccessor db_accessor(*this);
LOG(INFO) << "Creating snapshot on shutdown..." << std::endl;
const bool status = durability::MakeSnapshot(
db_accessor, fs::path(FLAGS_snapshot_directory),
FLAGS_snapshot_max_retained);
db_accessor, fs::path(config_.durability_directory),
config_.snapshot_max_retained);
if (status) {
std::cout << "Snapshot created successfully." << std::endl;
} else {

View File

@ -1,7 +1,5 @@
#pragma once
#include <thread>
#include "cppitertools/filter.hpp"
#include "cppitertools/imap.hpp"
@ -21,8 +19,6 @@
#include "transactions/engine.hpp"
#include "utils/scheduler.hpp"
namespace fs = std::experimental::filesystem;
/**
* Main class which represents Database concept in code.
* This class is essentially a data structure. It exposes
@ -51,7 +47,23 @@ namespace fs = std::experimental::filesystem;
*/
class GraphDb {
public:
GraphDb();
/// GraphDb configuration. Initialized from flags, but modifiable.
struct Config {
Config();
// Durability flags.
bool durability_enabled;
std::string durability_directory;
bool db_recover_on_startup;
int snapshot_cycle_sec;
int snapshot_max_retained;
int snapshot_on_exit;
// Misc flags.
int gc_cycle_sec;
int query_execution_time_sec;
};
explicit GraphDb(Config config = Config{});
/** Delete all vertices and edges and free all deferred deleters. */
~GraphDb();
@ -74,6 +86,8 @@ class GraphDb {
void StartSnapshooting();
Config config_;
/** transaction engine related to this database */
tx::Engine tx_engine_;

View File

@ -0,0 +1,44 @@
#include <experimental/filesystem>
#include <iostream>
#include <limits>
#include <gflags/gflags.h>
#include "database/graph_db.hpp"
#include "utils/flag_validation.hpp"
namespace fs = std::experimental::filesystem;
// TODO review: tech docs say the default here is 'true', which it is in the
// community config. Should we set the default here to true? On some other
// points the tech docs are consistent with community config, and not with these
// defaults.
DEFINE_bool(durability_enabled, false,
"If durability (database persistence) should be enabled");
DEFINE_string(
durability_directory, "durability",
"Path to directory in which to save snapshots and write-ahead log files.");
DEFINE_bool(db_recover_on_startup, false, "Recover database on startup.");
DEFINE_VALIDATED_int32(
snapshot_cycle_sec, 3600,
"Amount of time between two snapshots, in seconds (min 60).",
FLAG_IN_RANGE(1, std::numeric_limits<int32_t>::max()));
DEFINE_int32(snapshot_max_retained, -1,
"Number of retained snapshots, -1 means without limit.");
DEFINE_bool(snapshot_on_exit, false, "Snapshot on exiting the database.");
DEFINE_int32(gc_cycle_sec, 30,
"Amount of time between starts of two cleaning cycles in seconds. "
"-1 to turn off.");
DEFINE_int32(query_execution_time_sec, 180,
"Maximum allowed query execution time. Queries exceeding this "
"limit will be aborted. Value of -1 means no limit.");
GraphDb::Config::Config()
: durability_enabled{FLAGS_durability_enabled},
durability_directory{FLAGS_durability_directory},
db_recover_on_startup{FLAGS_db_recover_on_startup},
snapshot_cycle_sec{FLAGS_snapshot_cycle_sec},
snapshot_max_retained{FLAGS_snapshot_max_retained},
snapshot_on_exit{FLAGS_snapshot_on_exit},
gc_cycle_sec{FLAGS_gc_cycle_sec},
query_execution_time_sec{FLAGS_query_execution_time_sec} {}

26
src/durability/paths.hpp Normal file
View File

@ -0,0 +1,26 @@
#pragma once
#include <experimental/filesystem>
#include <string>
#include "glog/logging.h"
namespace durability {
const std::string kSnapshotDir = "snapshots";
const std::string kWalDir = "wal";
/// Ensures the given durability directory exists and is ready for use. Creates
/// the directory if it doesn't exist.
inline void CheckDurabilityDir(const std::string &durability_dir) {
namespace fs = std::experimental::filesystem;
if (fs::exists(durability_dir)) {
CHECK(fs::is_directory(durability_dir)) << "The durability directory path '"
<< durability_dir
<< "' is not a directory!";
} else {
bool success = fs::create_directory(durability_dir);
CHECK(success) << "Failed to create durability directory '"
<< durability_dir << "'.";
}
}
}

View File

@ -6,6 +6,7 @@
#include "communication/bolt/v1/decoder/decoder.hpp"
#include "database/graph_db_accessor.hpp"
#include "durability/hashed_file_reader.hpp"
#include "durability/paths.hpp"
#include "durability/version.hpp"
#include "durability/wal.hpp"
#include "query/typed_value.hpp"
@ -222,11 +223,12 @@ std::experimental::optional<tx::transaction_id_t> TransactionIdFromWalFilename(
}
// TODO - finer-grained recovery feedback could be useful here.
bool RecoverWal(GraphDbAccessor &db_accessor, RecoveryData &recovery_data) {
bool RecoverWal(const fs::path &wal_dir, GraphDbAccessor &db_accessor,
RecoveryData &recovery_data) {
// Get paths to all the WAL files and sort them (on date).
std::vector<fs::path> wal_files;
if (!fs::exists(FLAGS_wal_directory)) return true;
for (auto &wal_file : fs::directory_iterator(FLAGS_wal_directory))
if (!fs::exists(wal_dir)) return true;
for (auto &wal_file : fs::directory_iterator(wal_dir))
wal_files.emplace_back(wal_file);
std::sort(wal_files.begin(), wal_files.end());
@ -335,11 +337,12 @@ bool RecoverWal(GraphDbAccessor &db_accessor, RecoveryData &recovery_data) {
}
} // anonymous namespace
bool Recover(const fs::path &snapshot_dir, GraphDb &db) {
bool Recover(const fs::path &durability_dir, GraphDb &db) {
RecoveryData recovery_data;
// Attempt to recover from snapshot files in reverse order (from newest
// backwards).
const auto snapshot_dir = durability_dir / kSnapshotDir;
std::vector<fs::path> snapshot_files;
if (fs::exists(snapshot_dir) && fs::is_directory(snapshot_dir))
for (auto &file : fs::directory_iterator(snapshot_dir))
@ -365,7 +368,7 @@ bool Recover(const fs::path &snapshot_dir, GraphDb &db) {
// WAL recovery does not have to be complete for the recovery to be
// considered successful. For the time being ignore the return value,
// consider a better system.
RecoverWal(db_accessor, recovery_data);
RecoverWal(durability_dir / kWalDir, db_accessor, recovery_data);
db_accessor.Commit();
// Index recovery.

View File

@ -17,14 +17,14 @@ bool ReadSnapshotSummary(HashedFileReader &buffer, int64_t &vertex_count,
int64_t &edge_count, uint64_t &hash);
/**
* Recovers database from snapshot_file. If recovering fails, false is returned
* Recovers database from durability. If recovering fails, false is returned
* and db_accessor aborts transaction, else true is returned and transaction is
* commited.
*
* @param snapshot_dir - Path to snapshot directory.
* @param durability_dir - Path to durability directory.
* @param db - The database to recover into.
* @return - If recovery was succesful.
*/
bool Recover(const std::experimental::filesystem::path &snapshot_dir,
bool Recover(const std::experimental::filesystem::path &durability_dir,
GraphDb &db);
}

View File

@ -7,9 +7,12 @@
#include "communication/bolt/v1/encoder/base_encoder.hpp"
#include "database/graph_db_accessor.hpp"
#include "durability/hashed_file_writer.hpp"
#include "durability/paths.hpp"
#include "durability/version.hpp"
#include "utils/datetime/timestamp.hpp"
namespace fs = std::experimental::filesystem;
namespace durability {
namespace {
@ -67,11 +70,11 @@ bool Encode(const fs::path &snapshot_file, GraphDbAccessor &db_accessor_) {
return true;
}
void MaintainMaxRetainedFiles(const fs::path &snapshot_folder,
void MaintainMaxRetainedFiles(const fs::path &snapshot_dir,
int snapshot_max_retained) {
if (snapshot_max_retained == -1) return;
std::vector<fs::path> files;
for (auto &file : fs::directory_iterator(snapshot_folder))
for (auto &file : fs::directory_iterator(snapshot_dir))
files.push_back(file.path());
if (static_cast<int>(files.size()) <= snapshot_max_retained) return;
sort(files.begin(), files.end());
@ -83,25 +86,29 @@ void MaintainMaxRetainedFiles(const fs::path &snapshot_folder,
}
} // annonnymous namespace
fs::path MakeSnapshotPath(const fs::path &snapshot_folder) {
fs::path MakeSnapshotPath(const fs::path &durability_dir) {
std::string date_str =
Timestamp(Timestamp::now())
.to_string("{:04d}_{:02d}_{:02d}__{:02d}_{:02d}_{:02d}_{:05d}");
return snapshot_folder / date_str;
return durability_dir / kSnapshotDir / date_str;
}
bool MakeSnapshot(GraphDbAccessor &db_accessor_,
const fs::path &snapshot_folder,
bool MakeSnapshot(GraphDbAccessor &db_accessor_, const fs::path &durability_dir,
const int snapshot_max_retained) {
if (!fs::exists(snapshot_folder) &&
!fs::create_directories(snapshot_folder)) {
LOG(ERROR) << "Error while creating directory " << snapshot_folder;
return false;
}
const auto snapshot_file = MakeSnapshotPath(snapshot_folder);
auto ensure_dir = [](const auto &dir) {
if (!fs::exists(dir) && !fs::create_directories(dir)) {
LOG(ERROR) << "Error while creating directory " << dir;
return false;
}
return true;
};
if (!ensure_dir(durability_dir)) return false;
if (!ensure_dir(durability_dir / kSnapshotDir)) return false;
const auto snapshot_file = MakeSnapshotPath(durability_dir);
if (fs::exists(snapshot_file)) return false;
if (Encode(snapshot_file, db_accessor_)) {
MaintainMaxRetainedFiles(snapshot_folder, snapshot_max_retained);
MaintainMaxRetainedFiles(durability_dir / kSnapshotDir,
snapshot_max_retained);
return true;
} else {
std::error_code error_code; // Just for exception suppression.

View File

@ -9,17 +9,15 @@ using path = std::experimental::filesystem::path;
/** Generates a path for a DB snapshot in the given folder in a well-defined
* sortable format. */
path MakeSnapshotPath(const path &snapshot_folder);
// TODO review - move to paths.hpp?
path MakeSnapshotPath(const path &durability_dir);
/**
* Make snapshot and save it in snapshots folder. Returns true if successful.
* @param db_accessor:
* GraphDbAccessor used to access elements of GraphDb.
* @param snapshot_folder:
* folder where snapshots are stored.
* @param snapshot_max_retained:
* maximum number of snapshots stored in snapshot folder.
* @param db_accessor- GraphDbAccessor used to access elements of GraphDb.
* @param durability_dir - directory where durability data is stored.
* @param snapshot_max_retained - maximum number of snapshots to retain.
*/
bool MakeSnapshot(GraphDbAccessor &db_accessor, const path &snapshot_folder,
bool MakeSnapshot(GraphDbAccessor &db_accessor, const path &durability_dir,
int snapshot_max_retained);
}

View File

@ -1,22 +1,22 @@
#include "wal.hpp"
#include "communication/bolt/v1/decoder/decoded_value.hpp"
#include "durability/paths.hpp"
#include "utils/datetime/timestamp.hpp"
#include "utils/flag_validation.hpp"
DEFINE_int32(wal_flush_interval_millis, -1,
"Interval between two write-ahead log flushes, in milliseconds. "
"Set to -1 to disable the WAL.");
DEFINE_HIDDEN_int32(
wal_flush_interval_millis, 2,
"Interval between two write-ahead log flushes, in milliseconds.");
DEFINE_string(wal_directory, "wal",
"Directory in which the write-ahead log files are stored.");
DEFINE_HIDDEN_int32(
wal_rotate_ops_count, 10000,
"How many write-ahead ops should be stored in a single WAL file "
"before rotating it.");
DEFINE_int32(wal_rotate_ops_count, 10000,
"How many write-ahead ops should be stored in a single WAL file "
"before rotating it.");
DEFINE_VALIDATED_int32(wal_buffer_size, 4096, "Write-ahead log buffer size.",
FLAG_IN_RANGE(1, 1 << 30));
DEFINE_VALIDATED_HIDDEN_int32(wal_buffer_size, 4096,
"Write-ahead log buffer size.",
FLAG_IN_RANGE(1, 1 << 30));
namespace durability {
@ -146,8 +146,12 @@ std::experimental::optional<WriteAheadLog::Op> WriteAheadLog::Op::Decode(
#undef DECODE_MEMBER
WriteAheadLog::WriteAheadLog() {
if (FLAGS_wal_flush_interval_millis >= 0) {
WriteAheadLog::WriteAheadLog(
const std::experimental::filesystem::path &durability_dir,
bool durability_enabled)
: ops_{FLAGS_wal_buffer_size}, wal_file_{durability_dir} {
if (durability_enabled) {
CheckDurabilityDir(durability_dir);
wal_file_.Init();
scheduler_.Run(std::chrono::milliseconds(FLAGS_wal_flush_interval_millis),
[this]() { wal_file_.Flush(ops_); });
@ -155,10 +159,9 @@ WriteAheadLog::WriteAheadLog() {
}
WriteAheadLog::~WriteAheadLog() {
if (FLAGS_wal_flush_interval_millis >= 0) {
scheduler_.Stop();
wal_file_.Flush(ops_);
}
// TODO review : scheduler.Stop() legal if it wasn't started?
scheduler_.Stop();
if (enabled_) wal_file_.Flush(ops_);
}
void WriteAheadLog::TxBegin(tx::transaction_id_t tx_id) {
@ -250,24 +253,28 @@ void WriteAheadLog::BuildIndex(tx::transaction_id_t tx_id,
Emplace(std::move(op));
}
WriteAheadLog::WalFile::WalFile(
const std::experimental::filesystem::path &durability_dir)
: wal_dir_{durability_dir / kWalDir} {}
WriteAheadLog::WalFile::~WalFile() {
if (!current_wal_file_.empty()) writer_.Close();
}
namespace {
auto MakeFilePath(const std::string &suffix) {
return std::experimental::filesystem::path(FLAGS_wal_directory) /
(Timestamp::now().to_iso8601() + suffix);
auto MakeFilePath(const std::experimental::filesystem::path &wal_dir,
const std::string &suffix) {
return wal_dir / (Timestamp::now().to_iso8601() + suffix);
}
}
void WriteAheadLog::WalFile::Init() {
if (!std::experimental::filesystem::exists(FLAGS_wal_directory) &&
!std::experimental::filesystem::create_directories(FLAGS_wal_directory)) {
LOG(ERROR) << "Can't write to WAL directory: " << FLAGS_wal_directory;
if (!std::experimental::filesystem::exists(wal_dir_) &&
!std::experimental::filesystem::create_directories(wal_dir_)) {
LOG(ERROR) << "Can't write to WAL directory: " << wal_dir_;
current_wal_file_ = std::experimental::filesystem::path();
} else {
current_wal_file_ = MakeFilePath("__current");
current_wal_file_ = MakeFilePath(wal_dir_, "__current");
try {
writer_.Open(current_wal_file_);
} catch (std::ios_base::failure &) {
@ -311,7 +318,8 @@ void WriteAheadLog::WalFile::RotateFile() {
writer_.Close();
std::experimental::filesystem::rename(
current_wal_file_,
MakeFilePath("__max_transaction_" + std::to_string(latest_tx_)));
MakeFilePath(wal_dir_,
"__max_transaction_" + std::to_string(latest_tx_)));
Init();
}

View File

@ -18,19 +18,6 @@
#include "transactions/type.hpp"
#include "utils/scheduler.hpp"
// The amount of time between two flushes of the write-ahead log,
// in milliseconds.
DECLARE_int32(wal_flush_interval_millis);
// Directory in which the WAL is dumped.
DECLARE_string(wal_directory);
// How many Ops are stored in a single WAL file.
DECLARE_int32(wal_rotate_ops_count);
// The WAL buffer size (number of ops in a buffer).
DECLARE_int32(wal_buffer_size);
namespace durability {
/** A database operation log for durability. Buffers and periodically serializes
@ -99,7 +86,8 @@ class WriteAheadLog {
communication::bolt::Decoder<HashedFileReader> &decoder);
};
WriteAheadLog();
WriteAheadLog(const std::experimental::filesystem::path &durability_dir,
bool durability_enabled);
~WriteAheadLog();
/** Enables the WAL. Called at the end of GraphDb construction, after
@ -130,6 +118,7 @@ class WriteAheadLog {
/** Groups the logic of WAL file handling (flushing, naming, rotating) */
class WalFile {
public:
WalFile(const std::experimental::filesystem::path &wal__dir);
~WalFile();
/** Initializes the WAL file. Must be called before first flush. Can be
@ -141,6 +130,7 @@ class WriteAheadLog {
void Flush(RingBuffer<Op> &buffer);
private:
const std::experimental::filesystem::path wal_dir_;
HashedFileWriter writer_;
communication::bolt::PrimitiveEncoder<HashedFileWriter> encoder_{writer_};
@ -159,7 +149,7 @@ class WriteAheadLog {
void RotateFile();
};
RingBuffer<Op> ops_{FLAGS_wal_buffer_size};
RingBuffer<Op> ops_;
Scheduler scheduler_;
WalFile wal_file_;
// Used for disabling the WAL during DB recovery.

View File

@ -32,9 +32,9 @@ class Memgraph:
default=get_absolute_path("memgraph", "build"))
argp.add_argument("--port", default="7687",
help="Database and client port")
argp.add_argument("--snapshot-directory", default=None)
argp.add_argument("--durability-directory", default=None)
argp.add_argument("--snapshot-on-exit", action="store_true")
argp.add_argument("--snapshot-recover-on-startup", action="store_true")
argp.add_argument("--db-recover-on-startup", action="store_true")
self.log.info("Initializing Runner with arguments %r", args)
self.args, _ = argp.parse_known_args(args)
self.config = config
@ -48,11 +48,11 @@ class Memgraph:
database_args = ["--port", self.args.port]
if self.num_workers:
database_args += ["--num_workers", str(self.num_workers)]
if self.args.snapshot_directory:
database_args += ["--snapshot-directory",
self.args.snapshot_directory]
if self.args.snapshot_recover_on_startup:
database_args += ["--snapshot-recover-on-startup"]
if self.args.durability_directory:
database_args += ["--durability-directory",
self.args.durability_directory]
if self.args.db_recover_on_startup:
database_args += ["--db-recover-on-startup"]
if self.args.snapshot_on_exit:
database_args += ["--snapshot-on-exit"]

View File

@ -3,25 +3,17 @@
# NOTE: all paths are relative to the run folder
# (where the executable is run)
# directory to the folder with snapshots
--snapshot-directory=snapshots
# recover from the 'durability' directory
--durability-directory=durability
--db-recover-on-startup=true
# but don't perform durability
--durability-enabled=false
--snapshot-on-exit=false
# cleaning cycle interval
# if set to -1 the GC will not run
--gc-cycle-sec=-1
# snapshot cycle interval
# if set to -1 the snapshooter will not run
--snapshot-cycle-sec=-1
# create snapshot disabled on db exit
--snapshot-on-exit=false
# max number of snapshots which will be kept on the disk at some point
# if set to -1 the max number of snapshots is unlimited
--snapshot-max-retained=-1
--snapshot-recover-on-startup=true
# number of workers
--num-workers=8

View File

@ -12,7 +12,6 @@ import argparse
import os
import shutil
import subprocess
import sys
import tempfile
import time
@ -42,9 +41,9 @@ class Memgraph:
# database args
database_args = [binary, "--num-workers", self.num_workers,
"--snapshot-directory", os.path.join(self.dataset,
"--durability-directory", os.path.join(self.dataset,
"memgraph"),
"--snapshot-recover-on-startup", "true",
"--db-recover-on-startup", "true",
"--port", self.port]
# database env

View File

@ -118,7 +118,7 @@ parser.add_argument("--memgraph", default = os.path.join(BUILD_DIR,
parser.add_argument("--config", default = os.path.join(CONFIG_DIR,
"stress.conf"))
parser.add_argument("--log-file", default = "")
parser.add_argument("--snapshot-directory", default = "")
parser.add_argument("--durability-directory", default = "")
parser.add_argument("--python", default = os.path.join(SCRIPT_DIR,
"ve3", "bin", "python3"), type = str)
parser.add_argument("--large-dataset", action = "store_const",
@ -138,8 +138,8 @@ if not args.verbose:
cmd += ["--min-log-level", "1"]
if args.log_file:
cmd += ["--log-file", args.log_file]
if args.snapshot_directory:
cmd += ["--snapshot-directory", args.snapshot_directory]
if args.durability_directory:
cmd += ["--durability-directory", args.durability_directory]
proc_mg = subprocess.Popen(cmd, cwd = cwd,
env = {"MEMGRAPH_CONFIG": args.config})
time.sleep(1.0)

View File

@ -14,15 +14,12 @@
#include "database/graph_db.hpp"
#include "database/graph_db_accessor.hpp"
#include "durability/hashed_file_reader.hpp"
#include "durability/paths.hpp"
#include "durability/recovery.hpp"
#include "durability/snapshooter.hpp"
#include "durability/version.hpp"
#include "utils/string.hpp"
DECLARE_string(snapshot_directory);
DECLARE_bool(snapshot_on_exit);
DECLARE_int32(snapshot_cycle_sec);
DECLARE_bool(snapshot_recover_on_startup);
DECLARE_int32(wal_flush_interval_millis);
DECLARE_int32(wal_rotate_ops_count);
@ -214,27 +211,36 @@ void CompareDbs(GraphDb &a, GraphDb &b) {
}
}
const fs::path kSnapshotDir =
fs::temp_directory_path() / "MG_test_unit_durability" / "snapshot";
const fs::path kWalDir =
fs::temp_directory_path() / "MG_test_unit_durability" / "wal";
const fs::path kDurabilityDir =
fs::temp_directory_path() / "MG_test_unit_durability";
const fs::path kSnapshotDir = kDurabilityDir / durability::kSnapshotDir;
const fs::path kWalDir = kDurabilityDir / durability::kWalDir;
void CleanDurability() {
if (fs::exists(kSnapshotDir))
for (auto file : fs::directory_iterator(kSnapshotDir)) fs::remove(file);
if (fs::exists(kWalDir))
for (auto file : fs::directory_iterator(kWalDir)) fs::remove(file);
if (fs::exists(kDurabilityDir)) fs::remove_all(kDurabilityDir);
}
std::vector<fs::path> DirFiles(fs::path dir) {
std::vector<fs::path> files;
for (auto &file : fs::directory_iterator(dir)) files.push_back(file.path());
if (fs::exists(dir))
for (auto &file : fs::directory_iterator(dir)) files.push_back(file.path());
return files;
}
auto DbConfig() {
GraphDb::Config config;
config.durability_enabled = false;
config.durability_directory = kDurabilityDir;
config.snapshot_on_exit = false;
config.db_recover_on_startup = false;
return config;
}
void MakeSnapshot(GraphDb &db, int snapshot_max_retained = -1) {
GraphDbAccessor dba(db);
durability::MakeSnapshot(dba, kSnapshotDir, snapshot_max_retained);
ASSERT_TRUE(
durability::MakeSnapshot(dba, kDurabilityDir, snapshot_max_retained));
dba.Commit();
}
@ -278,23 +284,18 @@ void MakeDb(GraphDb &db, int scale, std::vector<int> indices = {}) {
class Durability : public ::testing::Test {
protected:
void SetUp() override {
FLAGS_wal_rotate_ops_count = 1000;
CleanDurability();
FLAGS_snapshot_cycle_sec = -1;
FLAGS_snapshot_directory = kSnapshotDir;
FLAGS_snapshot_on_exit = false;
FLAGS_wal_flush_interval_millis = -1;
FLAGS_wal_directory = kWalDir;
FLAGS_snapshot_recover_on_startup = false;
}
void TearDown() override { CleanDurability(); }
};
TEST_F(Durability, WalEncoding) {
FLAGS_wal_flush_interval_millis = 1;
FLAGS_wal_rotate_ops_count = 5000;
{
GraphDb db;
auto config = DbConfig();
config.durability_enabled = true;
GraphDb db{config};
GraphDbAccessor dba(db);
auto v0 = dba.InsertVertex();
ASSERT_EQ(v0.id(), 0);
@ -371,7 +372,7 @@ TEST_F(Durability, WalEncoding) {
TEST_F(Durability, SnapshotEncoding) {
{
GraphDb db;
GraphDb db{DbConfig()};
GraphDbAccessor dba(db);
auto v0 = dba.InsertVertex();
ASSERT_EQ(v0.id(), 0);
@ -473,22 +474,23 @@ TEST_F(Durability, SnapshotEncoding) {
}
TEST_F(Durability, SnapshotRecovery) {
GraphDb db;
GraphDb db{DbConfig()};
MakeDb(db, 300, {0, 1, 2});
MakeDb(db, 300);
MakeDb(db, 300, {3, 4});
MakeSnapshot(db);
{
FLAGS_snapshot_recover_on_startup = true;
GraphDb recovered;
auto recovered_config = DbConfig();
recovered_config.db_recover_on_startup = true;
GraphDb recovered{recovered_config};
CompareDbs(db, recovered);
}
}
TEST_F(Durability, WalRecovery) {
FLAGS_wal_flush_interval_millis = 2;
FLAGS_wal_rotate_ops_count = 5000;
GraphDb db;
auto config = DbConfig();
config.durability_enabled = true;
GraphDb db{config};
MakeDb(db, 300, {0, 1, 2});
MakeDb(db, 300);
MakeDb(db, 300, {3, 4});
@ -499,16 +501,17 @@ TEST_F(Durability, WalRecovery) {
EXPECT_GT(DirFiles(kWalDir).size(), 1);
{
FLAGS_snapshot_recover_on_startup = true;
GraphDb recovered;
auto recovered_config = DbConfig();
recovered_config.db_recover_on_startup = true;
GraphDb recovered{recovered_config};
CompareDbs(db, recovered);
}
}
TEST_F(Durability, SnapshotAndWalRecovery) {
FLAGS_wal_flush_interval_millis = 2;
FLAGS_wal_rotate_ops_count = 1000;
GraphDb db;
auto config = DbConfig();
config.durability_enabled = true;
GraphDb db{config};
MakeDb(db, 300, {0, 1, 2});
MakeDb(db, 300);
MakeSnapshot(db);
@ -522,16 +525,17 @@ TEST_F(Durability, SnapshotAndWalRecovery) {
EXPECT_GT(DirFiles(kWalDir).size(), 1);
{
FLAGS_snapshot_recover_on_startup = true;
GraphDb recovered;
auto recovered_config = DbConfig();
recovered_config.db_recover_on_startup = true;
GraphDb recovered{recovered_config};
CompareDbs(db, recovered);
}
}
TEST_F(Durability, SnapshotAndWalRecoveryAfterComplexTxSituation) {
FLAGS_wal_flush_interval_millis = 2;
FLAGS_wal_rotate_ops_count = 1000;
GraphDb db;
auto config = DbConfig();
config.durability_enabled = true;
GraphDb db{config};
// The first transaction modifies and commits.
GraphDbAccessor dba_1{db};
@ -571,17 +575,18 @@ TEST_F(Durability, SnapshotAndWalRecoveryAfterComplexTxSituation) {
ASSERT_EQ(DirFiles(kSnapshotDir).size(), 1);
EXPECT_GT(DirFiles(kWalDir).size(), 1);
{
FLAGS_snapshot_recover_on_startup = true;
GraphDb recovered;
auto recovered_config = DbConfig();
recovered_config.db_recover_on_startup = true;
GraphDb recovered{recovered_config};
ASSERT_EQ(VisibleVertexCount(recovered), 400);
CompareDbs(db, recovered);
}
}
TEST_F(Durability, NoWalDuringRecovery) {
FLAGS_wal_flush_interval_millis = 2;
FLAGS_wal_rotate_ops_count = 1000;
GraphDb db;
auto config = DbConfig();
config.durability_enabled = true;
GraphDb db{config};
MakeDb(db, 300, {0, 1, 2});
// Sleep to ensure the WAL gets flushed.
@ -590,17 +595,17 @@ TEST_F(Durability, NoWalDuringRecovery) {
auto wal_files_before = DirFiles(kWalDir);
ASSERT_GT(wal_files_before.size(), 3);
{
FLAGS_snapshot_recover_on_startup = true;
GraphDb recovered;
auto recovered_config = DbConfig();
recovered_config.db_recover_on_startup = true;
GraphDb recovered{recovered_config};
CompareDbs(db, recovered);
auto wal_files_after = DirFiles(kWalDir);
// We get an extra file for the "current" wal of the recovered db.
EXPECT_EQ(wal_files_after.size(), wal_files_before.size() + 1);
EXPECT_EQ(wal_files_after.size(), wal_files_before.size());
}
}
TEST_F(Durability, SnapshotRetention) {
GraphDb db;
GraphDb db{DbConfig()};
for (auto &pair : {std::pair<int, int>{5, 10}, {5, 3}, {7, -1}}) {
CleanDurability();
int count, retain;
@ -609,8 +614,7 @@ TEST_F(Durability, SnapshotRetention) {
// Track the added snapshots to ensure the correct ones are pruned.
std::unordered_set<std::string> snapshots;
for (int i = 0; i < count; ++i) {
GraphDbAccessor dba(db);
durability::MakeSnapshot(dba, kSnapshotDir, retain);
MakeSnapshot(db, retain);
auto latest = GetLastFile(kSnapshotDir);
snapshots.emplace(GetLastFile(kSnapshotDir));
// Ensures that the latest snapshot was not in the snapshots collection
@ -624,8 +628,10 @@ TEST_F(Durability, SnapshotRetention) {
}
TEST_F(Durability, SnapshotOnExit) {
FLAGS_snapshot_directory = kSnapshotDir;
FLAGS_snapshot_on_exit = true;
{ GraphDb graph_db; }
{
auto config = DbConfig();
config.snapshot_on_exit = true;
GraphDb graph_db{config};
}
EXPECT_EQ(DirFiles(kSnapshotDir).size(), 1);
}

View File

@ -7,11 +7,10 @@
#include "database/graph_db_datatypes.hpp"
#include "database/indexes/label_property_index.hpp"
DECLARE_int32(gc_cycle_sec);
TEST(GraphDbTest, GarbageCollectIndices) {
FLAGS_gc_cycle_sec = -1;
GraphDb graph_db;
GraphDb::Config config;
config.gc_cycle_sec = -1;
GraphDb graph_db{config};
std::unique_ptr<GraphDbAccessor> dba =
std::make_unique<GraphDbAccessor>(graph_db);

View File

@ -1,15 +1,14 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import time
import sys
import sys
import os
import tempfile
from tabulate import tabulate
from timeit import default_timer as timer
# hackish way to resuse existing start code
sys.path.append(os.path.dirname(os.path.realpath(__file__)) +
sys.path.append(os.path.dirname(os.path.realpath(__file__)) +
"/../tests/macro_benchmark/")
from databases import *
from clients import *
@ -17,11 +16,11 @@ from common import get_absolute_path
def main():
path = get_absolute_path("benchmarking.conf", "config")
tmp_dir = tempfile.TemporaryDirectory()
durability_dir = tempfile.TemporaryDirectory()
SNAPSHOT_DIR_ARG = ["--snapshot-directory", tmp_dir.name]
MAKE_SNAPSHOT_ARGS = ["--snapshot-on-exit"] + SNAPSHOT_DIR_ARG
RECOVER_SNAPSHOT_ARGS = ["--snapshot-recover-on-startup"] + SNAPSHOT_DIR_ARG
DURABILITY_DIR_ARG = ["--durability-directory", durability_dir.name]
MAKE_SNAPSHOT_ARGS = ["--snapshot-on-exit"] + DURABILITY_DIR_ARG
RECOVER_SNAPSHOT_ARGS = ["--db-recover-on-startup"] + DURABILITY_DIR_ARG
snapshot_memgraph = Memgraph(MAKE_SNAPSHOT_ARGS, path, 1)
recover_memgraph = Memgraph(RECOVER_SNAPSHOT_ARGS, path, 1)
client = QueryClient(None, 1)
@ -32,7 +31,7 @@ def main():
for prop_per_node in [0, 1]:
snapshot_memgraph.start()
properties = "{}".format(",".join(
["p{}: 0".format(x) for x in range(prop_per_node)]))
["p{}: 0".format(x) for x in range(prop_per_node)]))
client(["UNWIND RANGE(1, {}) AS _ CREATE ({{ {} }})".format(node_cnt, properties)], snapshot_memgraph)
client(["UNWIND RANGE(1, {}) AS _ MATCH (n) CREATE (n)-[:l]->(n)"
.format(edge_per_node)], snapshot_memgraph)
@ -45,17 +44,16 @@ def main():
stop = timer()
diff = stop - start
snapshots = os.listdir(tmp_dir.name)
assert len(snapshots) == 1
snap_path = tmp_dir.name + "/" + snapshots[0]
snap_size = round(os.path.getsize(snap_path) / 1024. / 1024., 2)
os.remove(snap_path)
snapshots_dir = os.path.join(durability_dir.name, "snapshots")
assert (len(os.listdir(snapshots_dir)) == 1)
snapshot_file = os.path.join(snapshots_dir, os.listdir(snapshots_dir)[0])
snap_size = round(os.path.getsize(snapshot_file) / 1024. / 1024., 2)
os.remove(snapshot_file)
edge_cnt = edge_per_node * node_cnt
results.append((node_cnt, edge_cnt, prop_per_node, snap_size, diff))
print(tabulate(tabular_data=results, headers=["Nodes", "Edges",
print(tabulate(tabular_data=results, headers=["Nodes", "Edges",
"Properties", "Snapshot size (MB)", "Elapsed time (s)"]))
if __name__ == "__main__":

View File

@ -5,6 +5,7 @@ add_executable(mg_recovery_check
${memgraph_src_dir}/communication/bolt/v1/decoder/decoded_value.cpp
${memgraph_src_dir}/data_structures/concurrent/skiplist_gc.cpp
${memgraph_src_dir}/database/graph_db.cpp
${memgraph_src_dir}/database/graph_db_config.cpp
${memgraph_src_dir}/database/graph_db_accessor.cpp
${memgraph_src_dir}/durability/recovery.cpp
${memgraph_src_dir}/durability/snapshooter.cpp

View File

@ -3,24 +3,24 @@
#include "gflags/gflags.h"
#include "gtest/gtest.h"
#include "database/graph_db_accessor.hpp"
#include "database/graph_db.hpp"
#include "database/graph_db_accessor.hpp"
#include "durability/recovery.hpp"
#include "query/typed_value.hpp"
static const char *usage =
"--snapshot-dir SNAPSHOT_DIR\n"
"Check that Memgraph can recover that snapshot. This tool should be "
"--durability-dir DURABILITY_DIR\n"
"Check that Memgraph can recover the snapshot. This tool should be "
"invoked through 'test_mg_import' wrapper, so as to check that 'mg_import' "
"tools work correctly.\n";
DEFINE_string(snapshot_dir, "", "Path to where the snapshot is stored");
DEFINE_string(durability_dir, "", "Path to where the durability directory");
class RecoveryTest : public ::testing::Test {
protected:
void SetUp() override {
std::string snapshot(FLAGS_snapshot_dir);
durability::Recover(snapshot, db_);
std::string durability_dir(FLAGS_durability_dir);
durability::Recover(durability_dir, db_);
}
GraphDb db_;

View File

@ -25,14 +25,17 @@ def main():
forum_nodes = os.path.join(_SCRIPT_DIR, 'csv', 'forum_nodes.csv')
relationships_0 = os.path.join(_SCRIPT_DIR, 'csv', 'relationships_0.csv')
relationships_1 = os.path.join(_SCRIPT_DIR, 'csv', 'relationships_1.csv')
with tempfile.TemporaryDirectory(suffix='-snapshots', dir=_SCRIPT_DIR) as snapshot_dir:
with tempfile.TemporaryDirectory(
suffix='-durability', dir=_SCRIPT_DIR) as durability_dir:
snapshot_dir = os.path.join(durability_dir, 'snapshots')
os.makedirs(snapshot_dir, exist_ok=True)
out_snapshot = os.path.join(snapshot_dir, 'snapshot')
mg_import_csv = [args.mg_import_csv, '--nodes', comment_nodes,
'--nodes', forum_nodes, '--relationships', relationships_0,
'--relationships', relationships_1,
'--out', out_snapshot, '--csv-delimiter=|', '--array-delimiter=;']
subprocess.check_call(mg_import_csv)
mg_recovery_check = [args.mg_recovery_check, '--snapshot-dir', snapshot_dir]
mg_recovery_check = [args.mg_recovery_check, '--durability-dir', durability_dir]
subprocess.check_call(mg_recovery_check)