Make memgraph with storage v2 the main binary

Reviewers: teon.banek

Reviewed By: teon.banek

Subscribers: buda, pullbot

Differential Revision: https://phabricator.memgraph.io/D2567
Matej Ferencevic 2019-12-05 13:24:30 +01:00
parent c8db7f88dc
commit 8a0abd6dbd
23 changed files with 137 additions and 128 deletions

View File

@ -3,17 +3,17 @@
# NOTE: all paths are relative to the run folder
# (where the executable is run)
# no durability
--durability-enabled=false
--snapshot-on-exit=false
--db-recover-on-startup=false
# no properties on edges
--storage-properties-on-edges=false
# no GC
--gc-cycle-sec=-1
--skiplist_gc_interval=-1
# no durability
--storage-snapshot-on-exit=false
--storage-snapshot-interval-sec=0
--storage-wal-enabled=false
--storage-recover-on-startup=false
# no query execution time limit
--query_execution_time_sec=-1
--query-execution-timeout-sec=0
# number of workers
--num-workers=1
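
The config changes above boil down to flag renames from the old single-node storage to storage-v2. A rough Python sketch of the correspondence, limited to the renames that actually appear in this diff (the helper and its name are illustrative, not part of the commit):

    # Illustrative only: old flag names mapped to the storage-v2 names seen
    # in this diff; None means there is no direct one-to-one replacement.
    OLD_TO_NEW = {
        "--durability-directory": "--data-directory",
        "--db-recover-on-startup": "--storage-recover-on-startup",
        "--snapshot-on-exit": "--storage-snapshot-on-exit",
        "--snapshot-cycle-sec": "--storage-snapshot-interval-sec",
        "--snapshot-max-retained": "--storage-snapshot-retention-count",
        "--query-execution-time-sec": "--query-execution-timeout-sec",
        # --durability-enabled is split into --storage-wal-enabled plus the
        # snapshot flags above, so it has no single replacement.
        "--durability-enabled": None,
    }

    def translate_flag(flag):
        """Return the storage-v2 spelling of an old-style flag, if one exists."""
        name, sep, value = flag.partition("=")
        new_name = OLD_TO_NEW.get(name, name)
        if new_name is None:
            return None
        return new_name + sep + value

    assert translate_flag("--snapshot-on-exit=false") == "--storage-snapshot-on-exit=false"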

View File

@ -1,15 +0,0 @@
# MEMGRAPH DEFAULT DURABILITY STRESS TESTING CONFIG
# NOTE: all paths are relative to the run folder
# (where the executable is run)
# enable durability
--durability-enabled=true
--snapshot-cycle-sec=5
--snapshot-on-exit=false
--snapshot-max-retained=2
--db-recover-on-startup=true
# increase query timeout (10 min)
--query-execution-time-sec=600

View File

@ -1,15 +0,0 @@
# MEMGRAPH DEFAULT STRESS TESTING CONFIG
# NOTE: all paths are relative to the run folder
# (where the executable is run)
# enable durability
--durability-enabled=true
--snapshot-cycle-sec=600
--snapshot-on-exit=true
--snapshot-max-retained=1
--db-recover-on-startup=false
# increase query timeout (10 min)
--query-execution-time-sec=600

View File

@ -46,8 +46,8 @@ do
# run memgraph with --data-directory pointing
# to the example's snapshots_dir
${memgraph_exe} --durability-directory "$script_dir/$snapshots_dir/" \
--snapshot-on-exit > /dev/null 2>&1 &
${memgraph_exe} --data-directory "$script_dir/$snapshots_dir/" \
--storage-snapshot-on-exit > /dev/null 2>&1 &
memgraph_pid=$!
sleep 2 # wait for memgraph to start

View File

@ -158,8 +158,6 @@ target_link_libraries(mg-single-node-v2 ${MG_SINGLE_NODE_V2_LIBS})
add_dependencies(mg-single-node-v2 generate_opencypher_parser)
add_dependencies(mg-single-node-v2 generate_lcp_common)
target_compile_definitions(mg-single-node-v2 PUBLIC MG_SINGLE_NODE_V2)
add_executable(memgraph-v2 memgraph.cpp)
target_link_libraries(memgraph-v2 mg-single-node-v2 kvstore_lib telemetry_lib)
# NOTE: `include/mg_procedure.syms` describes a pattern match for symbols which
# should be dynamically exported, so that `dlopen` can correctly link the
# symbols in custom procedure module libraries.
@ -277,9 +275,13 @@ set(VERSION_STRING ${memgraph_VERSION})
configure_file(version.hpp.in version.hpp @ONLY)
include_directories(${CMAKE_CURRENT_BINARY_DIR})
# memgraph main executable (old storage)
add_executable(memgraph-v1 memgraph.cpp)
target_link_libraries(memgraph-v1 mg-single-node kvstore_lib telemetry_lib)
# memgraph main executable
add_executable(memgraph memgraph.cpp)
target_link_libraries(memgraph mg-single-node kvstore_lib telemetry_lib)
target_link_libraries(memgraph mg-single-node-v2 kvstore_lib telemetry_lib)
set_target_properties(memgraph PROPERTIES
# Set the executable output name to include version information.
OUTPUT_NAME "memgraph-${memgraph_VERSION}-${COMMIT_HASH}_${CMAKE_BUILD_TYPE}"

View File

@ -310,6 +310,17 @@ class ExpressionEvaluator : public ExpressionVisitor<TypedValue> {
const auto &vertex = expression_result.ValueVertex();
for (const auto &label : labels_test.labels_) {
auto has_label = vertex.HasLabel(view_, GetLabel(label));
if (has_label.HasError() &&
has_label.GetError() == storage::Error::NONEXISTENT_OBJECT) {
// This is a very nasty and temporary hack in order to make MERGE
// work. The old storage had the following logic when returning an
// `OLD` view: `return old ? old : new`. That means that if the
// `OLD` view didn't exist, it returned the NEW view. With this hack
// we simulate that behavior.
// TODO (mferencevic, teon.banek): Remove once MERGE is
// reimplemented.
has_label = vertex.HasLabel(storage::View::NEW, GetLabel(label));
}
if (has_label.HasError()) {
switch (has_label.GetError()) {
case storage::Error::DELETED_OBJECT:
@ -551,6 +562,17 @@ class ExpressionEvaluator : public ExpressionVisitor<TypedValue> {
PropertyIx prop) {
auto maybe_prop =
record_accessor.GetProperty(view_, ctx_->properties[prop.ix]);
if (maybe_prop.HasError() &&
maybe_prop.GetError() == storage::Error::NONEXISTENT_OBJECT) {
// This is a very nasty and temporary hack in order to make MERGE work.
// The old storage had the following logic when returning an `OLD` view:
// `return old ? old : new`. That means that if the `OLD` view didn't
// exist, it returned the NEW view. With this hack we simulate that
// behavior.
// TODO (mferencevic, teon.banek): Remove once MERGE is reimplemented.
maybe_prop = record_accessor.GetProperty(storage::View::NEW,
ctx_->properties[prop.ix]);
}
if (maybe_prop.HasError()) {
switch (maybe_prop.GetError()) {
case storage::Error::DELETED_OBJECT:
@ -574,6 +596,17 @@ class ExpressionEvaluator : public ExpressionVisitor<TypedValue> {
const std::string_view &name) {
auto maybe_prop =
record_accessor.GetProperty(view_, dba_->NameToProperty(name));
if (maybe_prop.HasError() &&
maybe_prop.GetError() == storage::Error::NONEXISTENT_OBJECT) {
// This is a very nasty and temporary hack in order to make MERGE work.
// The old storage had the following logic when returning an `OLD` view:
// `return old ? old : new`. That means that if the `OLD` view didn't
// exist, it returned the NEW view. With this hack we simulate that
// behavior.
// TODO (mferencevic, teon.banek): Remove once MERGE is reimplemented.
maybe_prop = record_accessor.GetProperty(storage::View::NEW,
dba_->NameToProperty(name));
}
if (maybe_prop.HasError()) {
switch (maybe_prop.GetError()) {
case storage::Error::DELETED_OBJECT:
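
The three hunks above all apply the same workaround: when a lookup against the requested view reports NONEXISTENT_OBJECT, retry against the NEW view, mimicking the old storage's `return old ? old : new` rule. A minimal Python sketch of that fallback, with a hypothetical accessor interface standing in for the real storage API:

    from enum import Enum

    class View(Enum):
        OLD = 0
        NEW = 1

    class Error(Enum):
        NONEXISTENT_OBJECT = 0
        DELETED_OBJECT = 1

    def get_property_with_merge_fallback(accessor, view, prop):
        # `accessor.get_property` is assumed to return (value, error); this is
        # not the real storage-v2 signature, just an illustration of the hack.
        value, error = accessor.get_property(view, prop)
        if error is Error.NONEXISTENT_OBJECT:
            # The OLD record does not exist yet (e.g. the vertex was created
            # by the current transaction during MERGE), so fall back to NEW.
            value, error = accessor.get_property(View.NEW, prop)
        return value, error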

View File

@ -29,7 +29,7 @@ fi
# Start memgraph.
$binary_dir/memgraph \
--durability-directory=$tmpdir \
--data-directory=$tmpdir \
--query-execution-timeout-sec=5 \
--session-inactivity-timeout=10 &
pid=$!

View File

@ -56,7 +56,8 @@ def wait_for_server(port, delay=0.1):
def execute_test(memgraph_binary, tester_binary):
storage_directory = tempfile.TemporaryDirectory()
memgraph_args = [memgraph_binary,
"--durability-directory", storage_directory.name,
"--storage-properties-on-edges",
"--data-directory", storage_directory.name,
"--audit-enabled"]
# Start the memgraph binary

View File

@ -189,7 +189,7 @@ def check_permissions(query_perms, user_perms):
def execute_test(memgraph_binary, tester_binary, checker_binary):
storage_directory = tempfile.TemporaryDirectory()
memgraph_args = [memgraph_binary,
"--durability-directory", storage_directory.name]
"--data-directory", storage_directory.name]
def execute_admin_queries(queries):
return execute_tester(tester_binary, queries, should_fail=False,

View File

@ -95,7 +95,7 @@ class Memgraph:
with open(self._auth_config, "w") as f:
f.write(CONFIG_TEMPLATE.format(**config))
args = [self._binary,
"--durability-directory", self._storage_directory.name,
"--data-directory", self._storage_directory.name,
"--auth-module-executable",
kwargs.pop("module_executable", self._auth_module)]
for key, value in kwargs.items():

View File

@ -32,9 +32,9 @@ class Memgraph:
default=get_absolute_path("memgraph", "build"))
argp.add_argument("--port", default="7687",
help="Database and client port")
argp.add_argument("--durability-directory", default=None)
argp.add_argument("--snapshot-on-exit", action="store_true")
argp.add_argument("--db-recover-on-startup", action="store_true")
argp.add_argument("--data-directory", default=None)
argp.add_argument("--storage-snapshot-on-exit", action="store_true")
argp.add_argument("--storage-recover-on-startup", action="store_true")
self.log.info("Initializing Runner with arguments %r", args)
self.args, _ = argp.parse_known_args(args)
self.config = config
@ -49,13 +49,12 @@ class Memgraph:
database_args = ["--port", self.args.port]
if self.num_workers:
database_args += ["--num_workers", str(self.num_workers)]
if self.args.durability_directory:
database_args += ["--durability-directory",
self.args.durability_directory]
if self.args.db_recover_on_startup:
database_args += ["--db-recover-on-startup"]
if self.args.snapshot_on_exit:
database_args += ["--snapshot-on-exit"]
if self.args.data_directory:
database_args += ["--data-directory", self.args.data_directory]
if self.args.storage_recover_on_startup:
database_args += ["--storage-recover-on-startup"]
if self.args.storage_snapshot_on_exit:
database_args += ["--storage-snapshot-on-exit"]
# find executable path
runner_bin = self.args.runner_bin

View File

@ -52,6 +52,9 @@ def vertex(vertex_index):
def main():
# create an index to speed setup up
print("CREATE INDEX ON :" + LABEL_INDEX + ";")
for i in range(LABEL_COUNT):
print("CREATE INDEX ON :" + LABEL_PREFIX + str(i) + ";")
print("CREATE INDEX ON :%s(%s);" % (LABEL_INDEX, ID))
# we batch CREATEs to speed creation up
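
For illustration, with hypothetical values for the generator's constants, the added loop emits one plain label index per label in addition to the existing label-property index:

    # Hypothetical constants; the real generator defines its own values.
    LABEL_INDEX, LABEL_PREFIX, LABEL_COUNT, ID = "Transaction", "Label", 2, "id"

    print("CREATE INDEX ON :" + LABEL_INDEX + ";")
    for i in range(LABEL_COUNT):
        print("CREATE INDEX ON :" + LABEL_PREFIX + str(i) + ";")
    print("CREATE INDEX ON :%s(%s);" % (LABEL_INDEX, ID))
    # Output:
    #   CREATE INDEX ON :Transaction;
    #   CREATE INDEX ON :Label0;
    #   CREATE INDEX ON :Label1;
    #   CREATE INDEX ON :Transaction(id);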

View File

@ -3,17 +3,14 @@
# NOTE: all paths are relative to the run folder
# (where the executable is run)
# recover from the 'durability' directory
--durability-directory=durability
--db-recover-on-startup=true
# recover from the 'data' directory
--data-directory=mg_data
--storage-recover-on-startup=true
# but don't perform durability
--durability-enabled=false
--snapshot-on-exit=false
# cleaning cycle interval
# if set to -1 the GC will not run
--gc-cycle-sec=-1
--storage-snapshot-interval-sec=0
--storage-wal-enabled=false
--storage-snapshot-on-exit=false
# number of workers
--num-workers=8

View File

@ -41,9 +41,9 @@ class Memgraph:
# database args
database_args = [binary, "--num-workers", self.num_workers,
"--durability-directory", os.path.join(self.dataset,
"memgraph"),
"--db-recover-on-startup", "true",
"--data-directory", os.path.join(self.dataset,
"memgraph"),
"--storage-recover-on-startup", "true",
"--port", self.port]
# database env

View File

@ -92,17 +92,17 @@ class MemgraphRunner():
self.proc_mg = None
self.args = []
def start(self, args):
def start(self, args=[]):
if args == self.args and self.is_running():
return
self.stop()
self.args = copy.deepcopy(args)
self.durability_directory = tempfile.TemporaryDirectory()
self.data_directory = tempfile.TemporaryDirectory()
memgraph_binary = os.path.join(self.build_directory, "memgraph")
args_mg = [memgraph_binary, "--durability-directory",
self.durability_directory.name]
args_mg = [memgraph_binary, "--storage-properties-on-edges",
"--data-directory", self.data_directory.name]
self.proc_mg = subprocess.Popen(args_mg + self.args)
wait_for_server(7687, 1)
assert self.is_running(), "The Memgraph process died!"
@ -155,10 +155,7 @@ def main():
for suite in suites:
print("Starting suite '{}' scenarios.".format(suite["name"]))
params = []
if "properties_on_disk" in suite:
params = ["--properties-on-disk=" + suite["properties_on_disk"]]
memgraph.start(params)
memgraph.start()
suite["stats_file"] = os.path.join(output_dir.name,
suite["name"] + ".json")

View File

@ -2,11 +2,6 @@
test_suite: memgraph_V1
must_pass: true
- name: memgraph_V1_POD
test_suite: memgraph_V1
properties_on_disk: "x,y,z,w,k,v,a,b,c,d,e,f,r,t,o,prop,age,name,surname,location"
must_pass: true
- name: openCypher_M09
test_suite: openCypher_M09
must_pass: false

View File

@ -6,10 +6,6 @@
- ../../config # directory with config files
- ../../build_release/tests/stress # stress client binaries
- name: stress_properties_on_disk
commands: TIMEOUT=600 ./continuous_integration --properties-on-disk
infiles: *STRESS_INFILES
- name: stress_ssl
commands: TIMEOUT=600 ./continuous_integration --use-ssl
infiles: *STRESS_INFILES

View File

@ -129,6 +129,10 @@ def execution_handler():
output_data.add_measurement("cleanup_time",
cleanup_end_time - start_time)
# create indices
session.run('CREATE INDEX ON :U').consume()
session.run('CREATE INDEX ON :V').consume()
# create U vertices
for b in batch(render('CREATE (:U {{id: {}}})', range(args.u_count)),
args.vertex_batch_size):

View File

@ -71,7 +71,6 @@ LARGE_DATASET = [
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
BASE_DIR = os.path.normpath(os.path.join(SCRIPT_DIR, "..", ".."))
BUILD_DIR = os.path.join(BASE_DIR, "build")
CONFIG_DIR = os.path.join(BASE_DIR, "config")
MEASUREMENTS_FILE = os.path.join(SCRIPT_DIR, ".apollo_measurements")
KEY_FILE = os.path.join(SCRIPT_DIR, ".key.pem")
CERT_FILE = os.path.join(SCRIPT_DIR, ".cert.pem")
@ -125,11 +124,8 @@ def run_test(args, test, options, timeout):
parser = argparse.ArgumentParser(description = "Run stress tests on Memgraph.")
parser.add_argument("--memgraph", default = os.path.join(BUILD_DIR,
"memgraph"))
parser.add_argument("--config", default = os.path.join(CONFIG_DIR,
"stress.conf"))
parser.add_argument("--log-file", default = "")
parser.add_argument("--durability-directory", default = "")
parser.add_argument("--properties-on-disk", action = "store_true")
parser.add_argument("--data-directory", default = "")
parser.add_argument("--python", default = os.path.join(SCRIPT_DIR,
"ve3", "bin", "python3"), type = str)
parser.add_argument("--large-dataset", action = "store_const",
@ -154,19 +150,23 @@ if args.use_ssl:
# start memgraph
cwd = os.path.dirname(args.memgraph)
cmd = [args.memgraph, "--num-workers=" + str(THREADS)]
cmd = [args.memgraph, "--num-workers=" + str(THREADS),
"--storage-properties-on-edges=true",
"--storage-snapshot-on-exit=true",
"--storage-snapshot-interval-sec=600",
"--storage-snapshot-retention-count=1",
"--storage-wal-enabled=true",
"--storage-recover-on-startup=false",
"--query-execution-timeout-sec=600"]
if not args.verbose:
cmd += ["--min-log-level", "1"]
if args.log_file:
cmd += ["--log-file", args.log_file]
if args.durability_directory:
cmd += ["--durability-directory", args.durability_directory]
if args.properties_on_disk:
cmd += ["--properties-on-disk", "id,x"]
if args.data_directory:
cmd += ["--data-directory", args.data_directory]
if args.use_ssl:
cmd += ["--cert-file", CERT_FILE, "--key-file", KEY_FILE]
proc_mg = subprocess.Popen(cmd, cwd = cwd,
env = {"MEMGRAPH_CONFIG": args.config})
proc_mg = subprocess.Popen(cmd, cwd = cwd)
time.sleep(1.0)
# at exit cleanup
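
With stress.conf and the MEMGRAPH_CONFIG environment variable gone, the stress defaults live only in the explicit cmd list above. A standalone sketch of an equivalent manual launch; the binary path, the worker count, and the temporary data directory are assumptions, not part of the commit:

    import subprocess
    import tempfile

    data_dir = tempfile.mkdtemp()
    proc = subprocess.Popen([
        "./build/memgraph",                   # assumed build location
        "--num-workers=8",                    # example value; the test uses THREADS
        "--storage-properties-on-edges=true",
        "--storage-snapshot-on-exit=true",
        "--storage-snapshot-interval-sec=600",
        "--storage-snapshot-retention-count=1",
        "--storage-wal-enabled=true",
        "--storage-recover-on-startup=false",
        "--query-execution-timeout-sec=600",
        "--data-directory", data_dir,
    ])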

View File

@ -96,6 +96,10 @@ def create_handler():
with argument_session(args) as session:
session.run("MATCH (n) DETACH DELETE n").consume()
# create indices
for i in range(args.worker_count):
session.run("CREATE INDEX ON :Label_T" + str(i)).consume()
# concurrent create execution & tests
with multiprocessing.Pool(args.worker_count) as p:
for worker_id, create_time, time_unit in \

View File

@ -23,8 +23,7 @@ from multiprocessing import Pool, Manager
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
BASE_DIR = os.path.normpath(os.path.join(SCRIPT_DIR, "..", ".."))
BUILD_DIR = os.path.join(BASE_DIR, "build")
CONFIG_DIR = os.path.join(BASE_DIR, "config")
DURABILITY_DIR = os.path.join(BUILD_DIR, "durability")
DATA_DIR = os.path.join(BUILD_DIR, "mg_data")
if "THREADS" in os.environ:
DB_WORKERS = os.environ["THREADS"]
else:
@ -38,12 +37,10 @@ parser = connection_argument_parser()
parser.add_argument("--memgraph", default=os.path.join(BUILD_DIR,
"memgraph"))
parser.add_argument("--config", default=os.path.join(CONFIG_DIR,
"durability_stress.conf"))
parser.add_argument("--log-file", default="")
parser.add_argument("--verbose", action="store_const",
const=True, default=False)
parser.add_argument("--durability-directory", default=DURABILITY_DIR)
parser.add_argument("--data-directory", default=DATA_DIR)
parser.add_argument("--num-clients", default=multiprocessing.cpu_count())
parser.add_argument("--num-steps", type=int, default=5)
args = parser.parse_args()
@ -54,20 +51,27 @@ if not os.path.exists(args.memgraph):
# Memgraph run command construction
cwd = os.path.dirname(args.memgraph)
cmd = [args.memgraph, "--num-workers=" + str(DB_WORKERS)]
cmd = [args.memgraph, "--num-workers=" + str(DB_WORKERS),
"--storage-properties-on-edges=true",
"--storage-snapshot-on-exit=false",
"--storage-snapshot-interval-sec=5",
"--storage-snapshot-retention-count=2",
"--storage-wal-enabled=true",
"--storage-recover-on-startup=true",
"--query-execution-timeout-sec=600"]
if not args.verbose:
cmd += ["--min-log-level", "1"]
if args.log_file:
cmd += ["--log-file", args.log_file]
if args.durability_directory:
cmd += ["--durability-directory", args.durability_directory]
if args.data_directory:
cmd += ["--data-directory", args.data_directory]
data_manager = Manager()
data = data_manager.dict()
# Pretest cleanup
if os.path.exists(DURABILITY_DIR):
shutil.rmtree(DURABILITY_DIR)
if os.path.exists(DATA_DIR):
shutil.rmtree(DATA_DIR)
atexit.register(SessionCache.cleanup)
@ -92,8 +96,7 @@ def wait_for_server(port, delay=0.1):
def run_memgraph():
global proc_mg
proc_mg = subprocess.Popen(cmd, cwd=cwd,
env={"MEMGRAPH_CONFIG": args.config})
proc_mg = subprocess.Popen(cmd, cwd=cwd)
# Wait for Memgraph to finish the recovery process
wait_for_server(args.endpoint.split(":")[1])

View File

@ -141,26 +141,28 @@ class GraphSession {
void RemoveVertex() {
auto vertex_id = RandomElement(vertices_);
auto ret =
Execute(fmt::format("MATCH (n:{} {{id: {}}}) OPTIONAL MATCH (n)-[r]-() "
"DETACH DELETE n RETURN n.id, labels(n), r.id",
indexed_label_, vertex_id));
auto ret = Execute(fmt::format(
"MATCH (n:{} {{id: {}}}) OPTIONAL MATCH (n)-[r]-() "
"WITH n, n.id as n_id, labels(n) as labels_n, collect(r.id) as r_ids "
"DETACH DELETE n RETURN n_id, labels_n, r_ids",
indexed_label_, vertex_id));
if (ret.records.size() > 0) {
std::set<uint64_t> processed_vertices;
for (auto &record : ret.records) {
// remove vertex but note there could be duplicates
auto n_id = record[0].ValueInt();
if (processed_vertices.insert(n_id).second) {
vertices_.erase(n_id);
for (auto &label : record[1].ValueList()) {
if (label.ValueString() == indexed_label_) {
continue;
}
labels_vertices_[label.ValueString()].erase(n_id);
auto &record = ret.records[0];
// remove vertex but note there could be duplicates
auto n_id = record[0].ValueInt();
if (processed_vertices.insert(n_id).second) {
vertices_.erase(n_id);
for (auto &label : record[1].ValueList()) {
if (label.ValueString() == indexed_label_) {
continue;
}
labels_vertices_[label.ValueString()].erase(n_id);
}
// remove edge
auto &edge = record[2];
}
// remove edge
auto &edges = record[2];
for (auto &edge : edges.ValueList()) {
if (edge.type() == ValueT::Type::Int) {
edges_.erase(edge.ValueInt());
}
@ -207,7 +209,7 @@ class GraphSession {
void RemoveEdge() {
auto edge_id = RandomElement(edges_);
auto ret = Execute(
fmt::format("MATCH (:{})-[e {{id: {}}}]->(:{}) DELETE e RETURN e.id",
fmt::format("MATCH (:{})-[e {{id: {}}}]->(:{}) DELETE e RETURN 1",
indexed_label_, edge_id, indexed_label_));
if (ret.records.size() > 0) {
edges_.erase(edge_id);
@ -390,6 +392,7 @@ int main(int argc, char **argv) {
// cleanup and create indexes
client.Execute("MATCH (n) DETACH DELETE n", {});
for (int i = 0; i < FLAGS_worker_count; ++i) {
client.Execute(fmt::format("CREATE INDEX ON :indexed_label{}", i), {});
client.Execute(fmt::format("CREATE INDEX ON :indexed_label{}(id)", i), {});
}
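
The rewritten RemoveVertex query aggregates the incident edge ids with collect(r.id) before the DETACH DELETE, so the client gets exactly one record per deleted vertex instead of one record per incident edge. A rough Python illustration of the two result shapes, using a generic Bolt session as a stand-in for the C++ client used by this test:

    # Queries copied from the diff, with a concrete label and id filled in;
    # `session` is any Bolt-speaking client session (stand-in, not the C++ one).
    OLD_QUERY = (
        "MATCH (n:indexed_label0 {id: 42}) OPTIONAL MATCH (n)-[r]-() "
        "DETACH DELETE n RETURN n.id, labels(n), r.id"
    )  # one record per incident edge, so n.id and labels(n) repeat

    NEW_QUERY = (
        "MATCH (n:indexed_label0 {id: 42}) OPTIONAL MATCH (n)-[r]-() "
        "WITH n, n.id as n_id, labels(n) as labels_n, collect(r.id) as r_ids "
        "DETACH DELETE n RETURN n_id, labels_n, r_ids"
    )  # exactly one record; the edge ids arrive already collected in r_ids

    def removed_edge_ids(session):
        records = list(session.run(NEW_QUERY))
        return set(records[0]["r_ids"]) if records else set()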

View File

@ -35,7 +35,9 @@ if (lower_build_type STREQUAL "release")
COMMENT Stripping symbols and sections from mg_client)
endif()
install(TARGETS mg_import_csv RUNTIME DESTINATION bin)
# TODO (mferencevic): Currently the `mg_import_csv` tool is tailored to the old
# storage and doesn't work with storage-v2.
# install(TARGETS mg_import_csv RUNTIME DESTINATION bin)
install(TARGETS mg_client RUNTIME DESTINATION bin)
# Target for building all the tool executables.