diff --git a/src/storage/v2/disk/durable_metadata.cpp b/src/storage/v2/disk/durable_metadata.cpp index 13d515af2..c1f44a587 100644 --- a/src/storage/v2/disk/durable_metadata.cpp +++ b/src/storage/v2/disk/durable_metadata.cpp @@ -42,7 +42,7 @@ DurableMetadata::DurableMetadata(const Config &config) DurableMetadata::DurableMetadata(DurableMetadata &&other) noexcept : durability_kvstore_(std::move(other.durability_kvstore_)), config_(std::move(other.config_)) {} -void DurableMetadata::SaveBeforeClosingDB(uint64_t timestamp, uint64_t vertex_count, uint64_t edge_count) { +void DurableMetadata::UpdateMetaData(uint64_t timestamp, uint64_t vertex_count, uint64_t edge_count) { durability_kvstore_.Put(kLastTransactionStartTimeStamp, std::to_string(timestamp)); durability_kvstore_.Put(kVertexCountDescr, std::to_string(vertex_count)); durability_kvstore_.Put(kEdgeDountDescr, std::to_string(edge_count)); diff --git a/src/storage/v2/disk/durable_metadata.hpp b/src/storage/v2/disk/durable_metadata.hpp index 4aaa8a707..06a26ac15 100644 --- a/src/storage/v2/disk/durable_metadata.hpp +++ b/src/storage/v2/disk/durable_metadata.hpp @@ -41,7 +41,7 @@ class DurableMetadata { std::optional> LoadExistenceConstraintInfoIfExists() const; std::optional> LoadUniqueConstraintInfoIfExists() const; - void SaveBeforeClosingDB(uint64_t timestamp, uint64_t vertex_count, uint64_t edge_count); + void UpdateMetaData(uint64_t timestamp, uint64_t vertex_count, uint64_t edge_count); bool PersistLabelIndexCreation(LabelId label); diff --git a/src/storage/v2/disk/storage.cpp b/src/storage/v2/disk/storage.cpp index 4dbd248f7..9aa6613c7 100644 --- a/src/storage/v2/disk/storage.cpp +++ b/src/storage/v2/disk/storage.cpp @@ -274,8 +274,8 @@ DiskStorage::DiskStorage(Config config) } DiskStorage::~DiskStorage() { - durable_metadata_.SaveBeforeClosingDB(timestamp_, vertex_count_.load(std::memory_order_acquire), - edge_count_.load(std::memory_order_acquire)); + durable_metadata_.UpdateMetaData(timestamp_, vertex_count_.load(std::memory_order_acquire), + edge_count_.load(std::memory_order_acquire)); logging::AssertRocksDBStatus(kvstore_->db_->DestroyColumnFamilyHandle(kvstore_->vertex_chandle)); logging::AssertRocksDBStatus(kvstore_->db_->DestroyColumnFamilyHandle(kvstore_->edge_chandle)); logging::AssertRocksDBStatus(kvstore_->db_->DestroyColumnFamilyHandle(kvstore_->out_edges_chandle)); @@ -1786,7 +1786,8 @@ utils::BasicResult DiskStorage::DiskAccessor::Co if (flags::AreExperimentsEnabled(flags::Experiments::TEXT_SEARCH)) { disk_storage->indices_.text_index_.Commit(); } - + disk_storage->durable_metadata_.UpdateMetaData(disk_storage->timestamp_, disk_storage->vertex_count_, + disk_storage->edge_count_); is_transaction_active_ = false; return {}; diff --git a/src/storage/v2/disk/storage.hpp b/src/storage/v2/disk/storage.hpp index 349a7454a..74f4f4136 100644 --- a/src/storage/v2/disk/storage.hpp +++ b/src/storage/v2/disk/storage.hpp @@ -301,6 +301,8 @@ class DiskStorage final : public Storage { EdgeImportMode GetEdgeImportMode() const; + DurableMetadata *GetDurableMetadata() { return &durable_metadata_; } + private: void LoadPersistingMetadataInfo(); diff --git a/tests/mgbench/benchmark.py b/tests/mgbench/benchmark.py index cd3fb846f..9c8f1a7d2 100755 --- a/tests/mgbench/benchmark.py +++ b/tests/mgbench/benchmark.py @@ -632,10 +632,12 @@ def run_isolated_workload_without_authorization(vendor_runner, client, queries, def setup_indices_and_import_dataset(client, vendor_runner, generated_queries, workload, storage_mode): - vendor_runner.start_db_init(VENDOR_RUNNER_IMPORT) + if benchmark_context.vendor_name == "memgraph": + # Neo4j will get started just before import -> without this if statement it would try to start it twice + vendor_runner.start_db_init(VENDOR_RUNNER_IMPORT) log.info("Executing database index setup") start_time = time.time() - + import_results = None if generated_queries: client.execute(queries=workload.indexes_generator(), num_workers=1) log.info("Finished setting up indexes.") diff --git a/tests/mgbench/graph_bench.py b/tests/mgbench/graph_bench.py index f329cfcb7..bcba55324 100644 --- a/tests/mgbench/graph_bench.py +++ b/tests/mgbench/graph_bench.py @@ -127,8 +127,6 @@ def run_full_benchmarks( ], ] - assert not realistic or not mixed, "Cannot run both realistic and mixed workload, please select one!" - if realistic: # Configurations for full workload for count, write, read, update, analytical in realistic: diff --git a/tests/mgbench/mg_ondisk_vs_neo4j_pokec.sh b/tests/mgbench/mg_ondisk_vs_neo4j_pokec.sh new file mode 100644 index 000000000..0381448fa --- /dev/null +++ b/tests/mgbench/mg_ondisk_vs_neo4j_pokec.sh @@ -0,0 +1,99 @@ +#!/bin/bash + +# Currently only pokec dataset is modified to be used with memgraph on-disk storage + +pushd () { command pushd "$@" > /dev/null; } +popd () { command popd "$@" > /dev/null; } +SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" +pushd "$SCRIPT_DIR" + +# Help function +function show_help() { + echo "Usage: $0 [OPTIONS]" + echo "Options:" + echo " -n, --neo4j-path Path to Neo4j binary" + echo " -m, --memgraph-path Path to Memgraph binary" + echo " -w, --num-workers Number of workers for benchmark and import" + echo " -d, --dataset_size dataset_size (small, medium, large)" + echo " -h, --help Show this help message" + exit 0 +} + +# Default values +neo4j_path="/usr/share/neo4j/bin/neo4j" +memgraph_path="../../build/memgraph" +num_workers=12 +dataset_size="small" + +# Parse command line arguments +while [[ $# -gt 0 ]]; do + key="$1" + case $key in + -n|--neo4j-path) + neo4j_path="$2" + shift + shift + ;; + -m|--memgraph-path) + memgraph_path="$2" + shift + shift + ;; + -w|--num-workers) + num_workers="$2" + shift + shift + ;; + -d|--dataset_size) + dataset_size="$2" + shift + shift + ;; + -h|--help) + show_help + ;; + *) + echo "Invalid option: $1" + show_help + ;; + esac +done + +if [ ! -d "pokec_${dataset_size}_results" ]; then + mkdir "pokec_${dataset_size}_results" +fi + +# Run Python: Mgbench - Neo4j +echo "Running Python: Mgbench - Neo4j" +python3 benchmark.py vendor-native \ + --vendor-binary "$neo4j_path" \ + --vendor-name neo4j \ + --num-workers-for-benchmark "$num_workers" \ + --num-workers-for-import "$num_workers" \ + --no-load-query-counts \ + --export-results "pokec_${dataset_size}_results/neo4j_${dataset_size}_pokec.json" \ + "pokec_disk/${dataset_size}/*/*" \ + --vendor-specific "config=$neo4j_path/conf/neo4j.conf" \ + --no-authorization + +# Run Python: Mgbench - Memgraph - on-disk +echo "Running Python: Mgbench - Memgraph - on-disk" +python3 benchmark.py vendor-native \ + --vendor-binary "$memgraph_path" \ + --vendor-name memgraph \ + --num-workers-for-benchmark "$num_workers" \ + --num-workers-for-import "$num_workers" \ + --no-load-query-counts \ + --export-results-on-disk-txn "pokec_${dataset_size}_results/on_disk_${dataset_size}_pokec.json" \ + --export-results "pokec_${dataset_size}_results/on_disk_export_${dataset_size}_pokec.json" \ + "pokec_disk/${dataset_size}/*/*" \ + --no-authorization \ + --vendor-specific "data-directory=benchmark_datadir" "storage-mode=ON_DISK_TRANSACTIONAL" + +echo "Comparing results" +python3 compare_results.py --compare \ + "pokec_${dataset_size}_results/neo4j_${dataset_size}_pokec.json" \ + "pokec_${dataset_size}_results/on_disk_${dataset_size}_pokec.json" \ + --output \ + "pokec_${dataset_size}_results/neo4j_vs_mg_ondisk_results.html" \ + --different-vendors diff --git a/tests/mgbench/runners.py b/tests/mgbench/runners.py index 155ceac06..005bcb60f 100644 --- a/tests/mgbench/runners.py +++ b/tests/mgbench/runners.py @@ -634,7 +634,7 @@ class Neo4j(BaseRunner): exit_proc = subprocess.run(args=[self._neo4j_binary, "stop"], capture_output=True, check=True) return exit_proc.returncode, usage else: - return 0 + return 0, 0 def start_db_init(self, workload): if self._performance_tracking: diff --git a/tests/mgbench/workloads/base.py b/tests/mgbench/workloads/base.py index 5264dcba9..ab4c21059 100644 --- a/tests/mgbench/workloads/base.py +++ b/tests/mgbench/workloads/base.py @@ -160,12 +160,7 @@ class Workload(ABC): raise ValueError("Vendor does not have INDEX for dataset!") def _set_local_files(self) -> None: - if not self.disk_workload: - if self.LOCAL_FILE is not None: - self._local_file = self.LOCAL_FILE.get(self._variant, None) - else: - self._local_file = None - else: + if self.disk_workload and self._vendor != "neo4j": if self.LOCAL_FILE_NODES is not None: self._local_file_nodes = self.LOCAL_FILE_NODES.get(self._variant, None) else: @@ -175,14 +170,14 @@ class Workload(ABC): self._local_file_edges = self.LOCAL_FILE_EDGES.get(self._variant, None) else: self._local_file_edges = None + else: + if self.LOCAL_FILE is not None: + self._local_file = self.LOCAL_FILE.get(self._variant, None) + else: + self._local_file = None def _set_url_files(self) -> None: - if not self.disk_workload: - if self.URL_FILE is not None: - self._url_file = self.URL_FILE.get(self._variant, None) - else: - self._url_file = None - else: + if self.disk_workload and self._vendor != "neo4j": if self.URL_FILE_NODES is not None: self._url_file_nodes = self.URL_FILE_NODES.get(self._variant, None) else: @@ -191,6 +186,11 @@ class Workload(ABC): self._url_file_edges = self.URL_FILE_EDGES.get(self._variant, None) else: self._url_file_edges = None + else: + if self.URL_FILE is not None: + self._url_file = self.URL_FILE.get(self._variant, None) + else: + self._url_file = None def _set_local_index_file(self) -> None: if self.LOCAL_INDEX_FILE is not None: @@ -205,10 +205,10 @@ class Workload(ABC): self._url_index = None def prepare(self, directory): - if not self.disk_workload: - self._prepare_dataset_for_in_memory_workload(directory) - else: + if self.disk_workload and self._vendor != "neo4j": self._prepare_dataset_for_on_disk_workload(directory) + else: + self._prepare_dataset_for_in_memory_workload(directory) if self._local_index is not None: print("Using local index file:", self._local_index) diff --git a/tests/mgbench/workloads/disk_pokec.py b/tests/mgbench/workloads/disk_pokec.py index f19110a0c..a296e4836 100644 --- a/tests/mgbench/workloads/disk_pokec.py +++ b/tests/mgbench/workloads/disk_pokec.py @@ -13,7 +13,8 @@ import random from benchmark_context import BenchmarkContext from workloads.base import Workload -from workloads.importers.disk_importer_pokec import ImporterPokec +from workloads.importers.disk_importer_pokec import DiskImporterPokec +from workloads.importers.importer_pokec import ImporterPokec class Pokec(Workload): @@ -22,6 +23,12 @@ class Pokec(Workload): DEFAULT_VARIANT = "small" FILE = None + URL_FILE = { + "small": "https://s3.eu-west-1.amazonaws.com/deps.memgraph.io/dataset/pokec/benchmark/pokec_small_import.cypher", + "medium": "https://s3.eu-west-1.amazonaws.com/deps.memgraph.io/dataset/pokec/benchmark/pokec_medium_import.cypher", + "large": "https://s3.eu-west-1.amazonaws.com/deps.memgraph.io/dataset/pokec/benchmark/pokec_large.setup.cypher.gz", + } + URL_FILE_NODES = { "small": "https://s3.eu-west-1.amazonaws.com/deps.memgraph.io/dataset/pokec_disk/benchmark/pokec_small_import_nodes.cypher", "medium": "https://s3.eu-west-1.amazonaws.com/deps.memgraph.io/dataset/pokec_disk/benchmark/pokec_medium_import_nodes.cypher", @@ -42,7 +49,7 @@ class Pokec(Workload): URL_INDEX_FILE = { "memgraph": "https://s3.eu-west-1.amazonaws.com/deps.memgraph.io/dataset/pokec_disk/benchmark/memgraph.cypher", - "neo4j": "https://s3.eu-west-1.amazonaws.com/deps.memgraph.io/dataset/pokec_disk/benchmark/neo4j.cypher", + "neo4j": "https://s3.eu-west-1.amazonaws.com/deps.memgraph.io/dataset/pokec/benchmark/neo4j.cypher", } PROPERTIES_ON_EDGES = False @@ -51,15 +58,26 @@ class Pokec(Workload): super().__init__(variant, benchmark_context=benchmark_context, disk_workload=True) def custom_import(self) -> bool: - importer = ImporterPokec( - benchmark_context=self.benchmark_context, - dataset_name=self.NAME, - index_file=self._file_index, - dataset_nodes_file=self._node_file, - dataset_edges_file=self._edge_file, - variant=self._variant, - ) - return importer.execute_import() + if self._vendor == "neo4j": + importer = ImporterPokec( + benchmark_context=self.benchmark_context, + dataset_name=self.NAME, + index_file=self._file_index, + dataset_file=self._file, + variant=self._variant, + ) + return importer.execute_import() + + else: + importer = DiskImporterPokec( + benchmark_context=self.benchmark_context, + dataset_name=self.NAME, + index_file=self._file_index, + dataset_nodes_file=self._node_file, + dataset_edges_file=self._edge_file, + variant=self._variant, + ) + return importer.execute_import() # Helpers used to generate the queries def _get_random_vertex(self): @@ -214,12 +232,22 @@ class Pokec(Workload): # OK def benchmark__arango__allshortest_paths(self): vertex_from, vertex_to = self._get_random_from_to() - return ( + memgraph = ( "MATCH (n:User {id: $from}), (m:User {id: $to}) WITH n, m " "MATCH p=(n)-[*allshortest 2 (r, n | 1) total_weight]->(m) " "RETURN extract(n in nodes(p) | n.id) AS path", {"from": vertex_from, "to": vertex_to}, ) + neo4j = ( + "MATCH (n:User {id: $from}), (m:User {id: $to}) WITH n, m " + "MATCH p = allShortestPaths((n)-[*..2]->(m)) " + "RETURN [node in nodes(p) | node.id] AS path", + {"from": vertex_from, "to": vertex_to}, + ) + if self._vendor == "neo4j": + return neo4j + else: + return memgraph # Our benchmark queries diff --git a/tests/mgbench/workloads/importers/disk_importer_pokec.py b/tests/mgbench/workloads/importers/disk_importer_pokec.py index 560d7da9e..f487dc8f3 100644 --- a/tests/mgbench/workloads/importers/disk_importer_pokec.py +++ b/tests/mgbench/workloads/importers/disk_importer_pokec.py @@ -17,7 +17,7 @@ from constants import * from runners import BaseRunner -class ImporterPokec: +class DiskImporterPokec: def __init__( self, benchmark_context: BenchmarkContext, diff --git a/tests/mgbench/workloads/pokec.py b/tests/mgbench/workloads/pokec.py index 6733d38f2..4c05796b2 100644 --- a/tests/mgbench/workloads/pokec.py +++ b/tests/mgbench/workloads/pokec.py @@ -167,30 +167,62 @@ class Pokec(Workload): def benchmark__arango__shortest_path(self): vertex_from, vertex_to = self._get_random_from_to() - return ( + memgraph = ( "MATCH (n:User {id: $from}), (m:User {id: $to}) WITH n, m " "MATCH p=(n)-[*bfs..15]->(m) " "RETURN extract(n in nodes(p) | n.id) AS path", {"from": vertex_from, "to": vertex_to}, ) + neo4j = ( + "MATCH (n:User {id: $from}), (m:User {id: $to}) WITH n, m " + "MATCH p=shortestPath((n)-[*..15]->(m)) " + "RETURN [n in nodes(p) | n.id] AS path", + {"from": vertex_from, "to": vertex_to}, + ) + if self._vendor == "memgraph": + return memgraph + else: + return neo4j def benchmark__arango__shortest_path_with_filter(self): vertex_from, vertex_to = self._get_random_from_to() - return ( + memgraph = ( "MATCH (n:User {id: $from}), (m:User {id: $to}) WITH n, m " "MATCH p=(n)-[*bfs..15 (e, n | n.age >= 18)]->(m) " "RETURN extract(n in nodes(p) | n.id) AS path", {"from": vertex_from, "to": vertex_to}, ) + neo4j = ( + "MATCH (n:User {id: $from}), (m:User {id: $to}) WITH n, m " + "MATCH p=shortestPath((n)-[*..15]->(m)) " + "WHERE all(node in nodes(p) WHERE node.age >= 18) " + "RETURN [n in nodes(p) | n.id] AS path", + {"from": vertex_from, "to": vertex_to}, + ) + if self._vendor == "memgraph": + return memgraph + else: + return neo4j + def benchmark__arango__allshortest_paths(self): vertex_from, vertex_to = self._get_random_from_to() - return ( + memgraph = ( "MATCH (n:User {id: $from}), (m:User {id: $to}) WITH n, m " "MATCH p=(n)-[*allshortest 2 (r, n | 1) total_weight]->(m) " "RETURN extract(n in nodes(p) | n.id) AS path", {"from": vertex_from, "to": vertex_to}, ) + neo4j = ( + "MATCH (n:User {id: $from}), (m:User {id: $to}) WITH n, m " + "MATCH p = allShortestPaths((n)-[*..2]->(m)) " + "RETURN [node in nodes(p) | node.id] AS path", + {"from": vertex_from, "to": vertex_to}, + ) + if self._vendor == "memgraph": + return memgraph + else: + return neo4j # Our benchmark queries diff --git a/tests/unit/clearing_old_disk_data.cpp b/tests/unit/clearing_old_disk_data.cpp index 395391e12..58682a845 100644 --- a/tests/unit/clearing_old_disk_data.cpp +++ b/tests/unit/clearing_old_disk_data.cpp @@ -179,3 +179,35 @@ TEST_F(ClearingOldDiskDataTest, TestNumOfEntriesWithEdgeValueUpdate) { ASSERT_EQ(disk_test_utils::GetRealNumberOfEntriesInRocksDB(tx_db), 5); } + +TEST_F(ClearingOldDiskDataTest, TestTimestampAfterCommit) { + auto *tx_db = disk_storage->GetRocksDBStorage()->db_; + ASSERT_EQ(disk_test_utils::GetRealNumberOfEntriesInRocksDB(tx_db), 0); + + auto acc1 = disk_storage->Access(ReplicationRole::MAIN); + auto vertex1 = acc1->CreateVertex(); + auto label1 = acc1->NameToLabel("DiskLabel"); + auto property1 = acc1->NameToProperty("DiskProperty"); + ASSERT_TRUE(vertex1.AddLabel(label1).HasValue()); + ASSERT_TRUE(vertex1.SetProperty(property1, memgraph::storage::PropertyValue(10)).HasValue()); + ASSERT_FALSE(acc1->Commit().HasError()); + ASSERT_EQ(disk_test_utils::GetRealNumberOfEntriesInRocksDB(tx_db), 1); + + auto saved_timestamp = disk_storage->GetDurableMetadata()->LoadTimestampIfExists(); + ASSERT_EQ(saved_timestamp.has_value(), true); + ASSERT_EQ(disk_storage->timestamp_, saved_timestamp); + + auto acc2 = disk_storage->Access(ReplicationRole::MAIN); + auto vertex2 = acc2->CreateVertex(); + auto label2 = acc2->NameToLabel("DiskLabel2"); + auto property2 = acc2->NameToProperty("DiskProperty2"); + + ASSERT_TRUE(vertex2.AddLabel(label2).HasValue()); + ASSERT_TRUE(vertex2.SetProperty(property2, memgraph::storage::PropertyValue(10)).HasValue()); + ASSERT_FALSE(acc2->Commit().HasError()); + ASSERT_EQ(disk_test_utils::GetRealNumberOfEntriesInRocksDB(tx_db), 2); + + saved_timestamp = disk_storage->GetDurableMetadata()->LoadTimestampIfExists(); + ASSERT_EQ(saved_timestamp.has_value(), true); + ASSERT_EQ(disk_storage->timestamp_, saved_timestamp); +}