Make csv_to_snapshot more user friendly
Summary: Time csv_to_snapshot conversion and log it. Check if writing csv_to_snapshot failed. Extract LoadConfig from memgraph_bolt to config.hpp. Read memgraph config in csv_to_snapshot for snapshot_directory. Rename csv_to_snapshot to mg_import_csv. Add tests for tools. Run tools tests in apollo. Reviewers: mislav.bradac, florijan, mferencevic, buda Reviewed By: mferencevic Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D931
This commit is contained in:
parent
cfb8db9e20
commit
df72861b90
56
src/config.hpp
Normal file
56
src/config.hpp
Normal file
@ -0,0 +1,56 @@
|
|||||||
|
#pragma once
|
||||||
|
|
||||||
|
#include <experimental/filesystem>
|
||||||
|
#include <string>
|
||||||
|
#include <vector>
|
||||||
|
|
||||||
|
#include <gflags/gflags.h>
|
||||||
|
#include <glog/logging.h>
|
||||||
|
|
||||||
|
/// Reads the memgraph configuration files.
|
||||||
|
///
|
||||||
|
/// Load flags in this order, the last one has the highest priority:
|
||||||
|
/// 1) /etc/memgraph/memgraph.conf
|
||||||
|
/// 2) ~/.memgraph/config
|
||||||
|
/// 3) env - MEMGRAPH_CONFIG
|
||||||
|
void LoadConfig() {
|
||||||
|
namespace fs = std::experimental::filesystem;
|
||||||
|
std::vector<fs::path> configs = {fs::path("/etc/memgraph/memgraph.conf")};
|
||||||
|
if (getenv("HOME") != nullptr)
|
||||||
|
configs.emplace_back(fs::path(getenv("HOME")) /
|
||||||
|
fs::path(".memgraph/config"));
|
||||||
|
{
|
||||||
|
auto memgraph_config = getenv("MEMGRAPH_CONFIG");
|
||||||
|
if (memgraph_config != nullptr) {
|
||||||
|
auto path = fs::path(memgraph_config);
|
||||||
|
CHECK(fs::exists(path))
|
||||||
|
<< "MEMGRAPH_CONFIG environment variable set to nonexisting path: "
|
||||||
|
<< path.generic_string();
|
||||||
|
configs.emplace_back(path);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
std::vector<std::string> flagfile_arguments;
|
||||||
|
for (const auto &config : configs)
|
||||||
|
if (fs::exists(config)) {
|
||||||
|
flagfile_arguments.emplace_back(
|
||||||
|
std::string("--flag-file=" + config.generic_string()));
|
||||||
|
}
|
||||||
|
|
||||||
|
int custom_argc = static_cast<int>(flagfile_arguments.size()) + 1;
|
||||||
|
char **custom_argv = new char *[custom_argc];
|
||||||
|
|
||||||
|
custom_argv[0] = strdup(std::string("memgraph").c_str());
|
||||||
|
for (int i = 0; i < static_cast<int>(flagfile_arguments.size()); ++i) {
|
||||||
|
custom_argv[i + 1] = strdup(flagfile_arguments[i].c_str());
|
||||||
|
}
|
||||||
|
|
||||||
|
// setup flags from config flags
|
||||||
|
gflags::ParseCommandLineFlags(&custom_argc, &custom_argv, false);
|
||||||
|
|
||||||
|
// unconsumed arguments have to be freed to avoid memory leak since they are
|
||||||
|
// strdup-ed.
|
||||||
|
for (int i = 0; i < custom_argc; ++i) free(custom_argv[i]);
|
||||||
|
delete[] custom_argv;
|
||||||
|
}
|
||||||
|
|
@ -69,7 +69,7 @@ bool Snapshooter::Encode(const fs::path &snapshot_file,
|
|||||||
}
|
}
|
||||||
buffer.WriteSummary(vertex_num, edge_num);
|
buffer.WriteSummary(vertex_num, edge_num);
|
||||||
buffer.Close();
|
buffer.Close();
|
||||||
} catch (std::ifstream::failure e) {
|
} catch (const std::ifstream::failure &) {
|
||||||
if (fs::exists(snapshot_file) && !fs::remove(snapshot_file)) {
|
if (fs::exists(snapshot_file) && !fs::remove(snapshot_file)) {
|
||||||
LOG(ERROR) << "Error while removing corrupted snapshot file: "
|
LOG(ERROR) << "Error while removing corrupted snapshot file: "
|
||||||
<< snapshot_file;
|
<< snapshot_file;
|
||||||
@ -79,7 +79,7 @@ bool Snapshooter::Encode(const fs::path &snapshot_file,
|
|||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
fs::path Snapshooter::GetSnapshotFileName(const fs::path &snapshot_folder) {
|
fs::path GetSnapshotFileName(const fs::path &snapshot_folder) {
|
||||||
std::string date_str =
|
std::string date_str =
|
||||||
Timestamp(Timestamp::now())
|
Timestamp(Timestamp::now())
|
||||||
.to_string("{:04d}_{:02d}_{:02d}__{:02d}_{:02d}_{:02d}_{:05d}");
|
.to_string("{:04d}_{:02d}_{:02d}__{:02d}_{:02d}_{:02d}_{:05d}");
|
||||||
|
@ -8,6 +8,11 @@ namespace fs = std::experimental::filesystem;
|
|||||||
|
|
||||||
class GraphDbAccessor;
|
class GraphDbAccessor;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns path to new snapshot file in format snapshot_folder/timestamp.
|
||||||
|
*/
|
||||||
|
fs::path GetSnapshotFileName(const fs::path &snapshot_folder);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Class responsible for making snapshots. Snapshots are stored in folder
|
* Class responsible for making snapshots. Snapshots are stored in folder
|
||||||
* memgraph/build/$snapshot_folder/$db_name using bolt protocol.
|
* memgraph/build/$snapshot_folder/$db_name using bolt protocol.
|
||||||
@ -29,11 +34,6 @@ class Snapshooter {
|
|||||||
int snapshot_max_retained);
|
int snapshot_max_retained);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
/**
|
|
||||||
* Method returns path to new snapshot file in format
|
|
||||||
* memgraph/build/$snapshot_folder/$db_name/$timestamp
|
|
||||||
*/
|
|
||||||
fs::path GetSnapshotFileName(const fs::path &snapshot_folder);
|
|
||||||
/**
|
/**
|
||||||
* Method used to keep given number of snapshots in snapshot folder. Newest
|
* Method used to keep given number of snapshots in snapshot folder. Newest
|
||||||
* max_retained_files snapshots are kept, other snapshots are deleted. If
|
* max_retained_files snapshots are kept, other snapshots are deleted. If
|
||||||
|
@ -6,18 +6,16 @@
|
|||||||
|
|
||||||
#include "communication/bolt/v1/session.hpp"
|
#include "communication/bolt/v1/session.hpp"
|
||||||
#include "communication/server.hpp"
|
#include "communication/server.hpp"
|
||||||
|
#include "config.hpp"
|
||||||
#include "io/network/network_endpoint.hpp"
|
#include "io/network/network_endpoint.hpp"
|
||||||
#include "io/network/network_error.hpp"
|
#include "io/network/network_error.hpp"
|
||||||
#include "io/network/socket.hpp"
|
#include "io/network/socket.hpp"
|
||||||
|
|
||||||
#include "utils/flag_validation.hpp"
|
#include "utils/flag_validation.hpp"
|
||||||
#include "utils/scheduler.hpp"
|
#include "utils/scheduler.hpp"
|
||||||
#include "utils/signals/handler.hpp"
|
#include "utils/signals/handler.hpp"
|
||||||
#include "utils/stacktrace.hpp"
|
#include "utils/stacktrace.hpp"
|
||||||
#include "utils/sysinfo/memory.hpp"
|
#include "utils/sysinfo/memory.hpp"
|
||||||
#include "utils/terminate_handler.hpp"
|
#include "utils/terminate_handler.hpp"
|
||||||
|
|
||||||
#include "version.hpp"
|
#include "version.hpp"
|
||||||
|
|
||||||
namespace fs = std::experimental::filesystem;
|
namespace fs = std::experimental::filesystem;
|
||||||
@ -42,60 +40,14 @@ DEFINE_uint64(memory_warning_threshold, 1024,
|
|||||||
"less available RAM available it will log a warning. Set to 0 to "
|
"less available RAM available it will log a warning. Set to 0 to "
|
||||||
"disable.");
|
"disable.");
|
||||||
|
|
||||||
// Load flags in this order, the last one has the highest priority:
|
|
||||||
// 1) /etc/memgraph/memgraph.conf
|
|
||||||
// 2) ~/.memgraph/config
|
|
||||||
// 3) env - MEMGRAPH_CONFIG
|
|
||||||
// 4) command line flags
|
|
||||||
|
|
||||||
void LoadConfig(int &argc, char **&argv) {
|
|
||||||
std::vector<fs::path> configs = {fs::path("/etc/memgraph/memgraph.conf")};
|
|
||||||
if (getenv("HOME") != nullptr)
|
|
||||||
configs.emplace_back(fs::path(getenv("HOME")) /
|
|
||||||
fs::path(".memgraph/config"));
|
|
||||||
{
|
|
||||||
auto memgraph_config = getenv("MEMGRAPH_CONFIG");
|
|
||||||
if (memgraph_config != nullptr) {
|
|
||||||
auto path = fs::path(memgraph_config);
|
|
||||||
CHECK(fs::exists(path))
|
|
||||||
<< "MEMGRAPH_CONFIG environment variable set to nonexisting path: "
|
|
||||||
<< path.generic_string();
|
|
||||||
configs.emplace_back(path);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
std::vector<std::string> flagfile_arguments;
|
|
||||||
for (const auto &config : configs)
|
|
||||||
if (fs::exists(config)) {
|
|
||||||
flagfile_arguments.emplace_back(
|
|
||||||
std::string("--flag-file=" + config.generic_string()));
|
|
||||||
}
|
|
||||||
|
|
||||||
int custom_argc = static_cast<int>(flagfile_arguments.size()) + 1;
|
|
||||||
char **custom_argv = new char *[custom_argc];
|
|
||||||
|
|
||||||
custom_argv[0] = strdup(std::string("memgraph").c_str());
|
|
||||||
for (int i = 0; i < static_cast<int>(flagfile_arguments.size()); ++i) {
|
|
||||||
custom_argv[i + 1] = strdup(flagfile_arguments[i].c_str());
|
|
||||||
}
|
|
||||||
|
|
||||||
// setup flags from config flags
|
|
||||||
gflags::ParseCommandLineFlags(&custom_argc, &custom_argv, false);
|
|
||||||
|
|
||||||
// unconsumed arguments have to be freed to avoid memory leak since they are
|
|
||||||
// strdup-ed.
|
|
||||||
for (int i = 0; i < custom_argc; ++i) free(custom_argv[i]);
|
|
||||||
delete[] custom_argv;
|
|
||||||
|
|
||||||
// setup flags from command line
|
|
||||||
gflags::ParseCommandLineFlags(&argc, &argv, true);
|
|
||||||
}
|
|
||||||
|
|
||||||
int main(int argc, char **argv) {
|
int main(int argc, char **argv) {
|
||||||
google::SetUsageMessage("Memgraph database server");
|
google::SetUsageMessage("Memgraph database server");
|
||||||
gflags::SetVersionString(version_string);
|
gflags::SetVersionString(version_string);
|
||||||
|
|
||||||
LoadConfig(argc, argv);
|
// Load config before parsing arguments, so that flags from the command line
|
||||||
|
// overwrite the config.
|
||||||
|
LoadConfig();
|
||||||
|
gflags::ParseCommandLineFlags(&argc, &argv, true);
|
||||||
|
|
||||||
google::InitGoogleLogging(argv[0]);
|
google::InitGoogleLogging(argv[0]);
|
||||||
google::SetLogDestination(google::INFO, FLAGS_log_file.c_str());
|
google::SetLogDestination(google::INFO, FLAGS_log_file.c_str());
|
||||||
|
@ -131,6 +131,6 @@ memgraph_snapshot_dir=${dataset_dir}/memgraph/default
|
|||||||
mkdir -p ${memgraph_snapshot_dir}
|
mkdir -p ${memgraph_snapshot_dir}
|
||||||
cd ${memgraph_snapshot_dir}
|
cd ${memgraph_snapshot_dir}
|
||||||
echo "Converting CSV dataset to '${memgraph_snapshot_dir}/snapshot'"
|
echo "Converting CSV dataset to '${memgraph_snapshot_dir}/snapshot'"
|
||||||
${base_dir}/tools/csv_to_snapshot --out snapshot ${csv_dataset} --csv-delimiter "|" --array-delimiter ";"
|
${base_dir}/tools/mg_import_csv --out snapshot ${csv_dataset} --csv-delimiter "|" --array-delimiter ";"
|
||||||
|
|
||||||
echo "Done!"
|
echo "Done!"
|
||||||
|
2
tools/.gitignore
vendored
2
tools/.gitignore
vendored
@ -1,2 +1,2 @@
|
|||||||
build/
|
build/
|
||||||
csv_to_snapshot
|
mg_import_csv
|
||||||
|
@ -72,3 +72,5 @@ set(memgraph_src_dir ${PROJECT_SOURCE_DIR}/../src)
|
|||||||
include_directories(${memgraph_src_dir})
|
include_directories(${memgraph_src_dir})
|
||||||
|
|
||||||
add_subdirectory(src)
|
add_subdirectory(src)
|
||||||
|
enable_testing()
|
||||||
|
add_subdirectory(tests)
|
||||||
|
@ -19,6 +19,8 @@ cd build_release
|
|||||||
cmake -DCMAKE_BUILD_TYPE=release ..
|
cmake -DCMAKE_BUILD_TYPE=release ..
|
||||||
TIMEOUT=1000 make -j$THREADS memgraph_link_target memgraph__macro_benchmark memgraph__stress
|
TIMEOUT=1000 make -j$THREADS memgraph_link_target memgraph__macro_benchmark memgraph__stress
|
||||||
|
|
||||||
cd ../tools/apollo
|
cd ../tools
|
||||||
|
./setup
|
||||||
|
|
||||||
|
cd apollo
|
||||||
./generate debug
|
./generate debug
|
||||||
|
@ -32,6 +32,8 @@ cd build
|
|||||||
cmake -DCMAKE_BUILD_TYPE=release ..
|
cmake -DCMAKE_BUILD_TYPE=release ..
|
||||||
TIMEOUT=1000 make -j$THREADS memgraph_link_target memgraph__macro_benchmark
|
TIMEOUT=1000 make -j$THREADS memgraph_link_target memgraph__macro_benchmark
|
||||||
|
|
||||||
cd ../../memgraph/tools/apollo
|
cd ../../memgraph/tools
|
||||||
|
./setup
|
||||||
|
|
||||||
|
cd apollo
|
||||||
./generate diff
|
./generate diff
|
||||||
|
@ -13,6 +13,8 @@ BASE_DIR_NAME = os.path.basename(BASE_DIR)
|
|||||||
BUILD_DIR = os.path.join(BASE_DIR, "build")
|
BUILD_DIR = os.path.join(BASE_DIR, "build")
|
||||||
LIBS_DIR = os.path.join(BASE_DIR, "libs")
|
LIBS_DIR = os.path.join(BASE_DIR, "libs")
|
||||||
TESTS_DIR = os.path.join(BUILD_DIR, "tests")
|
TESTS_DIR = os.path.join(BUILD_DIR, "tests")
|
||||||
|
TOOLS_DIR = os.path.join(BASE_DIR, "tools")
|
||||||
|
TOOLS_BUILD_DIR = os.path.join(TOOLS_DIR, "build")
|
||||||
OUTPUT_DIR = os.path.join(BUILD_DIR, "apollo")
|
OUTPUT_DIR = os.path.join(BUILD_DIR, "apollo")
|
||||||
|
|
||||||
# output lists
|
# output lists
|
||||||
@ -284,11 +286,11 @@ if mode == "release":
|
|||||||
if mode == "release":
|
if mode == "release":
|
||||||
ldbc_path = os.path.join(BASE_DIR, "tests", "public_benchmark", "ldbc")
|
ldbc_path = os.path.join(BASE_DIR, "tests", "public_benchmark", "ldbc")
|
||||||
neo4j_path = os.path.join(BASE_DIR, "libs", "neo4j")
|
neo4j_path = os.path.join(BASE_DIR, "libs", "neo4j")
|
||||||
csv_to_snapshot_path = os.path.join(BASE_DIR, "tools", "csv_to_snapshot")
|
mg_import_csv_path = os.path.join(BASE_DIR, "tools", "mg_import_csv")
|
||||||
plot_ldbc_latency_path = os.path.join(BASE_DIR, "tools", "plot_ldbc_latency")
|
plot_ldbc_latency_path = os.path.join(BASE_DIR, "tools", "plot_ldbc_latency")
|
||||||
infile = create_archive("ldbc", [binary_release_path, ldbc_path,
|
infile = create_archive("ldbc", [binary_release_path, ldbc_path,
|
||||||
binary_release_link_path, neo4j_path, config_path,
|
binary_release_link_path, neo4j_path, config_path,
|
||||||
csv_to_snapshot_path, plot_ldbc_latency_path],
|
mg_import_csv_path, plot_ldbc_latency_path],
|
||||||
cwd = WORKSPACE_DIR)
|
cwd = WORKSPACE_DIR)
|
||||||
cmd = "cd memgraph/tests/public_benchmark/ldbc\n. continuous_integration\n"
|
cmd = "cd memgraph/tests/public_benchmark/ldbc\n. continuous_integration\n"
|
||||||
outfile_paths = "\./memgraph/tests/public_benchmark/ldbc/results/.+\n" \
|
outfile_paths = "\./memgraph/tests/public_benchmark/ldbc/results/.+\n" \
|
||||||
@ -297,6 +299,18 @@ if mode == "release":
|
|||||||
infile = infile, outfile_paths = outfile_paths,
|
infile = infile, outfile_paths = outfile_paths,
|
||||||
slave_group = "remote_20c140g", enable_network = True))
|
slave_group = "remote_20c140g", enable_network = True))
|
||||||
|
|
||||||
|
# tools tests
|
||||||
|
ctest_output = run_cmd(["ctest", "-N"], TOOLS_BUILD_DIR)
|
||||||
|
tools_infile = create_archive("tools_test", [TOOLS_BUILD_DIR], cwd = WORKSPACE_DIR)
|
||||||
|
for row in ctest_output.split("\n"):
|
||||||
|
# Filter rows only containing tests.
|
||||||
|
if "Test #" not in row: continue
|
||||||
|
test_name = row.split(":")[1].strip()
|
||||||
|
test_dir = os.path.relpath(TOOLS_BUILD_DIR, WORKSPACE_DIR)
|
||||||
|
commands = "cd {}\nctest --output-on-failure -R \"^{}$\"".format(test_dir, test_name)
|
||||||
|
run = generate_run("tools_" + test_name, commands = commands, infile = tools_infile)
|
||||||
|
RUNS.append(run)
|
||||||
|
|
||||||
# store ARCHIVES and RUNS
|
# store ARCHIVES and RUNS
|
||||||
store_metadata(OUTPUT_DIR, "archives", ARCHIVES)
|
store_metadata(OUTPUT_DIR, "archives", ARCHIVES)
|
||||||
store_metadata(OUTPUT_DIR, "runs", RUNS + DATA_PROCESS)
|
store_metadata(OUTPUT_DIR, "runs", RUNS + DATA_PROCESS)
|
||||||
|
@ -1,6 +1,6 @@
|
|||||||
FROM debian:stretch
|
FROM debian:stretch
|
||||||
|
|
||||||
COPY csv_to_snapshot /usr/local/bin/csv_to_snapshot
|
COPY mg_import_csv /usr/local/bin/mg_import_csv
|
||||||
|
|
||||||
# Setup memgraph user and group.
|
# Setup memgraph user and group.
|
||||||
RUN groupadd -r memgraph
|
RUN groupadd -r memgraph
|
||||||
@ -16,7 +16,7 @@ USER memgraph:memgraph
|
|||||||
VOLUME /data
|
VOLUME /data
|
||||||
WORKDIR /data
|
WORKDIR /data
|
||||||
|
|
||||||
ENTRYPOINT ["csv_to_snapshot"]
|
ENTRYPOINT ["mg_import_csv"]
|
||||||
# Print help and usage by default, since at least one --nodes argument is
|
# Print help and usage by default, since at least one --nodes argument is
|
||||||
# required.
|
# required.
|
||||||
CMD ["--help"]
|
CMD ["--help"]
|
@ -1,9 +1,10 @@
|
|||||||
add_executable(csv_to_snapshot
|
add_executable(mg_import_csv
|
||||||
csv_to_snapshot/main.cpp
|
mg_import_csv/main.cpp
|
||||||
# This is just friggin terrible. csv_to_snapshot needs to depend almost on
|
# This is just friggin terrible. mg_import_csv needs to depend almost on
|
||||||
# the whole memgraph, just to use TypedValue and BaseEncoder.
|
# the whole memgraph, just to use TypedValue and BaseEncoder.
|
||||||
${memgraph_src_dir}/data_structures/concurrent/skiplist_gc.cpp
|
${memgraph_src_dir}/data_structures/concurrent/skiplist_gc.cpp
|
||||||
${memgraph_src_dir}/database/graph_db_accessor.cpp
|
${memgraph_src_dir}/database/graph_db_accessor.cpp
|
||||||
|
${memgraph_src_dir}/durability/snapshooter.cpp
|
||||||
${memgraph_src_dir}/query/typed_value.cpp
|
${memgraph_src_dir}/query/typed_value.cpp
|
||||||
${memgraph_src_dir}/storage/edge_accessor.cpp
|
${memgraph_src_dir}/storage/edge_accessor.cpp
|
||||||
${memgraph_src_dir}/storage/locking/record_lock.cpp
|
${memgraph_src_dir}/storage/locking/record_lock.cpp
|
||||||
@ -16,12 +17,12 @@ add_executable(csv_to_snapshot
|
|||||||
# Strip the executable in release build.
|
# Strip the executable in release build.
|
||||||
string(TOLOWER ${CMAKE_BUILD_TYPE} lower_build_type)
|
string(TOLOWER ${CMAKE_BUILD_TYPE} lower_build_type)
|
||||||
if (lower_build_type STREQUAL "release")
|
if (lower_build_type STREQUAL "release")
|
||||||
add_custom_command(TARGET csv_to_snapshot POST_BUILD
|
add_custom_command(TARGET mg_import_csv POST_BUILD
|
||||||
COMMAND strip -s csv_to_snapshot
|
COMMAND strip -s mg_import_csv
|
||||||
COMMENT Stripping symbols and sections from csv_to_snapshot)
|
COMMENT Stripping symbols and sections from mg_import_csv)
|
||||||
endif()
|
endif()
|
||||||
|
|
||||||
target_link_libraries(csv_to_snapshot stdc++fs Threads::Threads fmt
|
target_link_libraries(mg_import_csv stdc++fs Threads::Threads fmt
|
||||||
gflags glog cppitertools)
|
gflags glog cppitertools)
|
||||||
install(TARGETS csv_to_snapshot
|
install(TARGETS mg_import_csv
|
||||||
RUNTIME DESTINATION .)
|
RUNTIME DESTINATION .)
|
||||||
|
@ -9,9 +9,12 @@
|
|||||||
#include "glog/logging.h"
|
#include "glog/logging.h"
|
||||||
|
|
||||||
#include "communication/bolt/v1/encoder/base_encoder.hpp"
|
#include "communication/bolt/v1/encoder/base_encoder.hpp"
|
||||||
|
#include "config.hpp"
|
||||||
#include "durability/file_writer_buffer.hpp"
|
#include "durability/file_writer_buffer.hpp"
|
||||||
|
#include "durability/snapshooter.hpp"
|
||||||
#include "durability/version.hpp"
|
#include "durability/version.hpp"
|
||||||
#include "utils/string.hpp"
|
#include "utils/string.hpp"
|
||||||
|
#include "utils/timer.hpp"
|
||||||
|
|
||||||
bool ValidateNotEmpty(const char *flagname, const std::string &value) {
|
bool ValidateNotEmpty(const char *flagname, const std::string &value) {
|
||||||
if (utils::Trim(value).empty()) {
|
if (utils::Trim(value).empty()) {
|
||||||
@ -21,8 +24,10 @@ bool ValidateNotEmpty(const char *flagname, const std::string &value) {
|
|||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
DEFINE_string(out, "", "Destination for the created snapshot file");
|
DEFINE_string(out, "",
|
||||||
DEFINE_validator(out, &ValidateNotEmpty);
|
"Destination for the created snapshot file. Without it, snapshot "
|
||||||
|
"is written inside the expected snapshots directory of Memgraph "
|
||||||
|
"installation.");
|
||||||
DEFINE_bool(overwrite, false, "Overwrite the output file if it exists");
|
DEFINE_bool(overwrite, false, "Overwrite the output file if it exists");
|
||||||
DEFINE_string(array_delimiter, ";",
|
DEFINE_string(array_delimiter, ";",
|
||||||
"Delimiter between elements of array values, default is ';'");
|
"Delimiter between elements of array values, default is ';'");
|
||||||
@ -108,6 +113,7 @@ std::vector<std::string> ReadRow(std::istream &stream) {
|
|||||||
std::vector<char> column;
|
std::vector<char> column;
|
||||||
char c;
|
char c;
|
||||||
while (!stream.get(c).eof()) {
|
while (!stream.get(c).eof()) {
|
||||||
|
if (!stream) LOG(FATAL) << "Unable to read CSV row";
|
||||||
if (quoting) {
|
if (quoting) {
|
||||||
if (c == quoting)
|
if (c == quoting)
|
||||||
quoting = 0;
|
quoting = 0;
|
||||||
@ -316,56 +322,95 @@ auto ConvertRelationships(
|
|||||||
}
|
}
|
||||||
|
|
||||||
void Convert(const std::vector<std::string> &nodes,
|
void Convert(const std::vector<std::string> &nodes,
|
||||||
const std::vector<std::string> &relationships) {
|
const std::vector<std::string> &relationships,
|
||||||
FileWriterBuffer buffer(FLAGS_out);
|
const std::string &output_path) {
|
||||||
communication::bolt::BaseEncoder<FileWriterBuffer> encoder(buffer);
|
try {
|
||||||
int64_t node_count = 0;
|
FileWriterBuffer buffer(output_path);
|
||||||
int64_t relationship_count = 0;
|
communication::bolt::BaseEncoder<FileWriterBuffer> encoder(buffer);
|
||||||
MemgraphNodeIdMap node_id_map;
|
int64_t node_count = 0;
|
||||||
// Snapshot file has the following contents in order:
|
int64_t relationship_count = 0;
|
||||||
// 1) magic number
|
MemgraphNodeIdMap node_id_map;
|
||||||
// 2) transactional snapshot of the snapshoter. When the snapshot is
|
// Snapshot file has the following contents in order:
|
||||||
// generated it's an empty list.
|
// 1) magic number
|
||||||
// 3) list of label+property index
|
// 2) transactional snapshot of the snapshoter. When the snapshot is
|
||||||
// 4) all nodes, sequentially, but not encoded as a list
|
// generated it's an empty list.
|
||||||
// 5) all relationships, sequentially, but not encoded as a list
|
// 3) list of label+property index
|
||||||
// 5) summary with node count, relationship count and hash digest
|
// 4) all nodes, sequentially, but not encoded as a list
|
||||||
encoder.WriteRAW(durability::kMagicNumber.data(),
|
// 5) all relationships, sequentially, but not encoded as a list
|
||||||
durability::kMagicNumber.size());
|
// 5) summary with node count, relationship count and hash digest
|
||||||
encoder.WriteTypedValue(durability::kVersion);
|
encoder.WriteRAW(durability::kMagicNumber.data(),
|
||||||
encoder.WriteList({}); // Transactional snapshot.
|
durability::kMagicNumber.size());
|
||||||
encoder.WriteList({}); // Label + property indexes.
|
encoder.WriteTypedValue(durability::kVersion);
|
||||||
for (const auto &nodes_file : nodes) {
|
encoder.WriteList({}); // Transactional snapshot.
|
||||||
node_count += ConvertNodes(nodes_file, node_id_map, encoder);
|
encoder.WriteList({}); // Label + property indexes.
|
||||||
|
for (const auto &nodes_file : nodes) {
|
||||||
|
node_count += ConvertNodes(nodes_file, node_id_map, encoder);
|
||||||
|
}
|
||||||
|
for (const auto &relationships_file : relationships) {
|
||||||
|
relationship_count +=
|
||||||
|
ConvertRelationships(relationships_file, node_id_map, encoder);
|
||||||
|
}
|
||||||
|
buffer.WriteSummary(node_count, relationship_count);
|
||||||
|
} catch (const std::ios_base::failure &) {
|
||||||
|
// Only FileWriterBuffer sets the underlying fstream to throw.
|
||||||
|
LOG(FATAL) << fmt::format("Unable to write to '{}'", output_path);
|
||||||
}
|
}
|
||||||
for (const auto &relationships_file : relationships) {
|
|
||||||
relationship_count +=
|
|
||||||
ConvertRelationships(relationships_file, node_id_map, encoder);
|
|
||||||
}
|
|
||||||
buffer.WriteSummary(node_count, relationship_count);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static const char *usage =
|
static const char *usage =
|
||||||
"[OPTION]... --out SNAPSHOT_FILE [--nodes=CSV_FILE]... [--edges=CSV_FILE]...\n"
|
"[OPTION]... [--out=SNAPSHOT_FILE] [--nodes=CSV_FILE]... "
|
||||||
|
"[--relationships=CSV_FILE]...\n"
|
||||||
"Create a Memgraph recovery snapshot file from CSV.\n";
|
"Create a Memgraph recovery snapshot file from CSV.\n";
|
||||||
|
|
||||||
|
// Used only to get the value from memgraph's configuration files.
|
||||||
|
DEFINE_HIDDEN_string(snapshot_directory, "", "Snapshot directory");
|
||||||
|
|
||||||
|
std::string GetOutputPath() {
|
||||||
|
// If we have the 'out' flag, use that.
|
||||||
|
if (!utils::Trim(FLAGS_out).empty()) return FLAGS_out;
|
||||||
|
// Without the 'out', fall back to reading the memgraph configuration for
|
||||||
|
// snapshot_directory. Hopefully, memgraph configuration doesn't contain other
|
||||||
|
// flags which are defined in this file.
|
||||||
|
LoadConfig();
|
||||||
|
// Without snapshot_directory, we have to require 'out' flag.
|
||||||
|
if (utils::Trim(FLAGS_snapshot_directory).empty())
|
||||||
|
LOG(FATAL) << "Unable to determine snapshot output location. Please, "
|
||||||
|
"provide the 'out' flag";
|
||||||
|
// TODO: Remove 'default' when Dbms is purged.
|
||||||
|
std::string snapshot_dir = FLAGS_snapshot_directory + "/default";
|
||||||
|
try {
|
||||||
|
if (!std::experimental::filesystem::exists(snapshot_dir) &&
|
||||||
|
!std::experimental::filesystem::create_directories(snapshot_dir)) {
|
||||||
|
LOG(FATAL) << fmt::format("Cannot create snapshot directory '{}'",
|
||||||
|
snapshot_dir);
|
||||||
|
}
|
||||||
|
} catch (const std::experimental::filesystem::filesystem_error &error) {
|
||||||
|
LOG(FATAL) << error.what();
|
||||||
|
}
|
||||||
|
return std::string(GetSnapshotFileName(snapshot_dir));
|
||||||
|
}
|
||||||
|
|
||||||
int main(int argc, char *argv[]) {
|
int main(int argc, char *argv[]) {
|
||||||
gflags::SetUsageMessage(usage);
|
gflags::SetUsageMessage(usage);
|
||||||
auto nodes = ParseRepeatedFlag("nodes", argc, argv);
|
auto nodes = ParseRepeatedFlag("nodes", argc, argv);
|
||||||
auto relationships = ParseRepeatedFlag("relationships", argc, argv);
|
auto relationships = ParseRepeatedFlag("relationships", argc, argv);
|
||||||
gflags::ParseCommandLineFlags(&argc, &argv, true);
|
gflags::ParseCommandLineFlags(&argc, &argv, true);
|
||||||
google::InitGoogleLogging(argv[0]);
|
google::InitGoogleLogging(argv[0]);
|
||||||
if (std::experimental::filesystem::exists(FLAGS_out) && !FLAGS_overwrite) {
|
std::string output_path(GetOutputPath());
|
||||||
|
if (std::experimental::filesystem::exists(output_path) && !FLAGS_overwrite) {
|
||||||
LOG(FATAL) << fmt::format(
|
LOG(FATAL) << fmt::format(
|
||||||
"File exists: '{}'. Pass --overwrite if you want to overwrite.",
|
"File exists: '{}'. Pass --overwrite if you want to overwrite.",
|
||||||
FLAGS_out);
|
output_path);
|
||||||
}
|
}
|
||||||
auto iter_all_inputs = iter::chain(nodes, relationships);
|
auto iter_all_inputs = iter::chain(nodes, relationships);
|
||||||
std::vector<std::string> all_inputs(iter_all_inputs.begin(),
|
std::vector<std::string> all_inputs(iter_all_inputs.begin(),
|
||||||
iter_all_inputs.end());
|
iter_all_inputs.end());
|
||||||
LOG(INFO) << fmt::format("Converting {} to '{}'",
|
LOG(INFO) << fmt::format("Converting {} to '{}'",
|
||||||
utils::Join(all_inputs, ", "), FLAGS_out);
|
utils::Join(all_inputs, ", "), output_path);
|
||||||
Convert(nodes, relationships);
|
utils::Timer conversion_timer;
|
||||||
LOG(INFO) << fmt::format("Created '{}'", FLAGS_out);
|
Convert(nodes, relationships, output_path);
|
||||||
|
double conversion_sec = conversion_timer.Elapsed().count();
|
||||||
|
LOG(INFO) << fmt::format("Created '{}' in {:.2f} seconds", output_path,
|
||||||
|
conversion_sec);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
35
tools/tests/CMakeLists.txt
Normal file
35
tools/tests/CMakeLists.txt
Normal file
@ -0,0 +1,35 @@
|
|||||||
|
include_directories(SYSTEM ${GTEST_INCLUDE_DIR})
|
||||||
|
|
||||||
|
add_executable(mg_recovery_check
|
||||||
|
mg_recovery_check.cpp
|
||||||
|
${memgraph_src_dir}/communication/bolt/v1/decoder/decoded_value.cpp
|
||||||
|
${memgraph_src_dir}/data_structures/concurrent/skiplist_gc.cpp
|
||||||
|
${memgraph_src_dir}/database/dbms.cpp
|
||||||
|
${memgraph_src_dir}/database/graph_db.cpp
|
||||||
|
${memgraph_src_dir}/database/graph_db_accessor.cpp
|
||||||
|
${memgraph_src_dir}/durability/recovery.cpp
|
||||||
|
${memgraph_src_dir}/durability/snapshooter.cpp
|
||||||
|
${memgraph_src_dir}/query/typed_value.cpp
|
||||||
|
${memgraph_src_dir}/storage/edge_accessor.cpp
|
||||||
|
${memgraph_src_dir}/storage/locking/record_lock.cpp
|
||||||
|
${memgraph_src_dir}/storage/property_value.cpp
|
||||||
|
${memgraph_src_dir}/storage/record_accessor.cpp
|
||||||
|
${memgraph_src_dir}/storage/vertex_accessor.cpp
|
||||||
|
${memgraph_src_dir}/transactions/transaction.cpp
|
||||||
|
)
|
||||||
|
|
||||||
|
target_link_libraries(mg_recovery_check stdc++fs Threads::Threads fmt glog
|
||||||
|
gflags cppitertools)
|
||||||
|
target_link_libraries(mg_recovery_check gtest gtest_main)
|
||||||
|
|
||||||
|
# Copy CSV data to CMake build dir
|
||||||
|
configure_file(csv/comment_nodes.csv csv/comment_nodes.csv COPYONLY)
|
||||||
|
configure_file(csv/forum_nodes.csv csv/forum_nodes.csv COPYONLY)
|
||||||
|
configure_file(csv/relationships.csv csv/relationships.csv COPYONLY)
|
||||||
|
# Copy the actual runner to CMake build dir
|
||||||
|
configure_file(test_mg_import_csv test_mg_import_csv COPYONLY)
|
||||||
|
|
||||||
|
add_test(NAME test_mg_import_csv
|
||||||
|
COMMAND test_mg_import_csv
|
||||||
|
--mg-import-csv ../src/mg_import_csv
|
||||||
|
--mg-recovery-check ./mg_recovery_check)
|
6
tools/tests/csv/comment_nodes.csv
Normal file
6
tools/tests/csv/comment_nodes.csv
Normal file
@ -0,0 +1,6 @@
|
|||||||
|
id:ID(COMMENT_ID)|country:string|browser:string|content:string|:LABEL
|
||||||
|
0|Croatia|Chrome|yes|Message;Comment
|
||||||
|
1|United Kingdom|Chrome|thanks|Message;Comment
|
||||||
|
2|Germany||LOL|Message;Comment
|
||||||
|
3|France|Firefox|I see|Message;Comment
|
||||||
|
4|Italy|Internet Explorer|fine|Message;Comment
|
|
6
tools/tests/csv/forum_nodes.csv
Normal file
6
tools/tests/csv/forum_nodes.csv
Normal file
@ -0,0 +1,6 @@
|
|||||||
|
id:ID(FORUM_ID)|title:string|:LABEL
|
||||||
|
0|General|Forum
|
||||||
|
1|Support|Forum
|
||||||
|
2|Music|Forum
|
||||||
|
3|Film|Forum
|
||||||
|
4|Programming|Forum
|
|
6
tools/tests/csv/relationships.csv
Normal file
6
tools/tests/csv/relationships.csv
Normal file
@ -0,0 +1,6 @@
|
|||||||
|
:START_ID(COMMENT_ID)|:END_ID(FORUM_ID)|:TYPE
|
||||||
|
0|0|POSTED_ON
|
||||||
|
1|1|POSTED_ON
|
||||||
|
2|2|POSTED_ON
|
||||||
|
3|3|POSTED_ON
|
||||||
|
4|4|POSTED_ON
|
|
68
tools/tests/mg_recovery_check.cpp
Normal file
68
tools/tests/mg_recovery_check.cpp
Normal file
@ -0,0 +1,68 @@
|
|||||||
|
#include <string>
|
||||||
|
|
||||||
|
#include "gflags/gflags.h"
|
||||||
|
#include "gtest/gtest.h"
|
||||||
|
|
||||||
|
#include "database/dbms.hpp"
|
||||||
|
#include "query/typed_value.hpp"
|
||||||
|
|
||||||
|
static const char *usage =
|
||||||
|
"--snapshot-dir SNAPSHOT_DIR\n"
|
||||||
|
"Check that Memgraph can recover that snapshot. This tool should be "
|
||||||
|
"invoked through 'test_mg_import' wrapper, so as to check that 'mg_import' "
|
||||||
|
"tools work correctly.\n";
|
||||||
|
|
||||||
|
DEFINE_string(snapshot_dir, "", "Path to where the snapshot is stored");
|
||||||
|
|
||||||
|
class RecoveryTest : public ::testing::Test {
|
||||||
|
protected:
|
||||||
|
void SetUp() override {
|
||||||
|
Recovery recovery;
|
||||||
|
std::string snapshot(FLAGS_snapshot_dir + "/snapshot");
|
||||||
|
recovery.Recover(snapshot, *dbms_.active());
|
||||||
|
}
|
||||||
|
|
||||||
|
Dbms dbms_;
|
||||||
|
};
|
||||||
|
|
||||||
|
TEST_F(RecoveryTest, TestVerticesRecovered) {
|
||||||
|
auto dba = dbms_.active();
|
||||||
|
EXPECT_EQ(dba->VerticesCount(), 10);
|
||||||
|
EXPECT_EQ(dba->VerticesCount(dba->Label("Comment")), 5);
|
||||||
|
for (const auto &vertex : dba->Vertices(dba->Label("Comment"), false)) {
|
||||||
|
EXPECT_TRUE(vertex.has_label(dba->Label("Message")));
|
||||||
|
}
|
||||||
|
EXPECT_EQ(dba->VerticesCount(dba->Label("Forum")), 5);
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_F(RecoveryTest, TestPropertyNull) {
|
||||||
|
auto dba = dbms_.active();
|
||||||
|
bool found = false;
|
||||||
|
for (const auto &vertex : dba->Vertices(dba->Label("Comment"), false)) {
|
||||||
|
auto id_prop = query::TypedValue(vertex.PropsAt(dba->Property("id")));
|
||||||
|
auto browser = query::TypedValue(vertex.PropsAt(dba->Property("browser")));
|
||||||
|
if (id_prop.IsString() && id_prop.Value<std::string>() == "2") {
|
||||||
|
EXPECT_FALSE(found);
|
||||||
|
found = true;
|
||||||
|
EXPECT_TRUE(browser.IsNull());
|
||||||
|
} else {
|
||||||
|
EXPECT_FALSE(browser.IsNull());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
ASSERT_TRUE(found);
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_F(RecoveryTest, TestEdgesRecovered) {
|
||||||
|
auto dba = dbms_.active();
|
||||||
|
EXPECT_EQ(dba->EdgesCount(), 5);
|
||||||
|
for (const auto &edge : dba->Edges(false)) {
|
||||||
|
EXPECT_TRUE(edge.EdgeType() == dba->EdgeType("POSTED_ON"));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
int main(int argc, char *argv[]) {
|
||||||
|
::testing::InitGoogleTest(&argc, argv);
|
||||||
|
gflags::SetUsageMessage(usage);
|
||||||
|
gflags::ParseCommandLineFlags(&argc, &argv, true);
|
||||||
|
return RUN_ALL_TESTS();
|
||||||
|
}
|
38
tools/tests/test_mg_import_csv
Executable file
38
tools/tests/test_mg_import_csv
Executable file
@ -0,0 +1,38 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
'''Run mg_import_csv and test that recovery works with mg_recovery_check.'''
|
||||||
|
|
||||||
|
import argparse
|
||||||
|
import subprocess
|
||||||
|
import os
|
||||||
|
import tempfile
|
||||||
|
|
||||||
|
|
||||||
|
_SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
|
||||||
|
|
||||||
|
|
||||||
|
def parse_args():
|
||||||
|
argp = argparse.ArgumentParser(description=__doc__)
|
||||||
|
argp.add_argument('--mg-import-csv', required=True,
|
||||||
|
help='Path to mg_import_csv executable.')
|
||||||
|
argp.add_argument('--mg-recovery-check', required=True,
|
||||||
|
help='Path to mg_recovery_check executable.')
|
||||||
|
return argp.parse_args()
|
||||||
|
|
||||||
|
|
||||||
|
def main():
|
||||||
|
args = parse_args()
|
||||||
|
comment_nodes = os.path.join(_SCRIPT_DIR, 'csv', 'comment_nodes.csv')
|
||||||
|
forum_nodes = os.path.join(_SCRIPT_DIR, 'csv', 'forum_nodes.csv')
|
||||||
|
relationships = os.path.join(_SCRIPT_DIR, 'csv', 'relationships.csv')
|
||||||
|
with tempfile.TemporaryDirectory(suffix='-snapshots', dir=_SCRIPT_DIR) as snapshot_dir:
|
||||||
|
out_snapshot = os.path.join(snapshot_dir, 'snapshot')
|
||||||
|
mg_import_csv = [args.mg_import_csv, '--nodes', comment_nodes,
|
||||||
|
'--nodes', forum_nodes, '--relationships', relationships,
|
||||||
|
'--out', out_snapshot, '--csv-delimiter=|', '--array-delimiter=;']
|
||||||
|
subprocess.check_call(mg_import_csv)
|
||||||
|
mg_recovery_check = [args.mg_recovery_check, '--snapshot-dir', snapshot_dir]
|
||||||
|
subprocess.check_call(mg_recovery_check)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
main()
|
Loading…
Reference in New Issue
Block a user