Merge branch 'master' into run-package-in-mgbuilder
This commit is contained in:
commit
a8a6b7e533
@ -42,7 +42,7 @@ DurableMetadata::DurableMetadata(const Config &config)
|
||||
DurableMetadata::DurableMetadata(DurableMetadata &&other) noexcept
|
||||
: durability_kvstore_(std::move(other.durability_kvstore_)), config_(std::move(other.config_)) {}
|
||||
|
||||
void DurableMetadata::SaveBeforeClosingDB(uint64_t timestamp, uint64_t vertex_count, uint64_t edge_count) {
|
||||
void DurableMetadata::UpdateMetaData(uint64_t timestamp, uint64_t vertex_count, uint64_t edge_count) {
|
||||
durability_kvstore_.Put(kLastTransactionStartTimeStamp, std::to_string(timestamp));
|
||||
durability_kvstore_.Put(kVertexCountDescr, std::to_string(vertex_count));
|
||||
durability_kvstore_.Put(kEdgeDountDescr, std::to_string(edge_count));
|
||||
|
@ -41,7 +41,7 @@ class DurableMetadata {
|
||||
std::optional<std::vector<std::string>> LoadExistenceConstraintInfoIfExists() const;
|
||||
std::optional<std::vector<std::string>> LoadUniqueConstraintInfoIfExists() const;
|
||||
|
||||
void SaveBeforeClosingDB(uint64_t timestamp, uint64_t vertex_count, uint64_t edge_count);
|
||||
void UpdateMetaData(uint64_t timestamp, uint64_t vertex_count, uint64_t edge_count);
|
||||
|
||||
bool PersistLabelIndexCreation(LabelId label);
|
||||
|
||||
|
@ -274,8 +274,8 @@ DiskStorage::DiskStorage(Config config)
|
||||
}
|
||||
|
||||
DiskStorage::~DiskStorage() {
|
||||
durable_metadata_.SaveBeforeClosingDB(timestamp_, vertex_count_.load(std::memory_order_acquire),
|
||||
edge_count_.load(std::memory_order_acquire));
|
||||
durable_metadata_.UpdateMetaData(timestamp_, vertex_count_.load(std::memory_order_acquire),
|
||||
edge_count_.load(std::memory_order_acquire));
|
||||
logging::AssertRocksDBStatus(kvstore_->db_->DestroyColumnFamilyHandle(kvstore_->vertex_chandle));
|
||||
logging::AssertRocksDBStatus(kvstore_->db_->DestroyColumnFamilyHandle(kvstore_->edge_chandle));
|
||||
logging::AssertRocksDBStatus(kvstore_->db_->DestroyColumnFamilyHandle(kvstore_->out_edges_chandle));
|
||||
@ -1786,7 +1786,8 @@ utils::BasicResult<StorageManipulationError, void> DiskStorage::DiskAccessor::Co
|
||||
if (flags::AreExperimentsEnabled(flags::Experiments::TEXT_SEARCH)) {
|
||||
disk_storage->indices_.text_index_.Commit();
|
||||
}
|
||||
|
||||
disk_storage->durable_metadata_.UpdateMetaData(disk_storage->timestamp_, disk_storage->vertex_count_,
|
||||
disk_storage->edge_count_);
|
||||
is_transaction_active_ = false;
|
||||
|
||||
return {};
|
||||
|
@ -301,6 +301,8 @@ class DiskStorage final : public Storage {
|
||||
|
||||
EdgeImportMode GetEdgeImportMode() const;
|
||||
|
||||
DurableMetadata *GetDurableMetadata() { return &durable_metadata_; }
|
||||
|
||||
private:
|
||||
void LoadPersistingMetadataInfo();
|
||||
|
||||
|
@ -632,10 +632,12 @@ def run_isolated_workload_without_authorization(vendor_runner, client, queries,
|
||||
|
||||
|
||||
def setup_indices_and_import_dataset(client, vendor_runner, generated_queries, workload, storage_mode):
|
||||
vendor_runner.start_db_init(VENDOR_RUNNER_IMPORT)
|
||||
if benchmark_context.vendor_name == "memgraph":
|
||||
# Neo4j will get started just before import -> without this if statement it would try to start it twice
|
||||
vendor_runner.start_db_init(VENDOR_RUNNER_IMPORT)
|
||||
log.info("Executing database index setup")
|
||||
start_time = time.time()
|
||||
|
||||
import_results = None
|
||||
if generated_queries:
|
||||
client.execute(queries=workload.indexes_generator(), num_workers=1)
|
||||
log.info("Finished setting up indexes.")
|
||||
|
@ -127,8 +127,6 @@ def run_full_benchmarks(
|
||||
],
|
||||
]
|
||||
|
||||
assert not realistic or not mixed, "Cannot run both realistic and mixed workload, please select one!"
|
||||
|
||||
if realistic:
|
||||
# Configurations for full workload
|
||||
for count, write, read, update, analytical in realistic:
|
||||
|
99
tests/mgbench/mg_ondisk_vs_neo4j_pokec.sh
Normal file
99
tests/mgbench/mg_ondisk_vs_neo4j_pokec.sh
Normal file
@ -0,0 +1,99 @@
|
||||
#!/bin/bash
|
||||
|
||||
# Currently only pokec dataset is modified to be used with memgraph on-disk storage
|
||||
|
||||
pushd () { command pushd "$@" > /dev/null; }
|
||||
popd () { command popd "$@" > /dev/null; }
|
||||
SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
|
||||
pushd "$SCRIPT_DIR"
|
||||
|
||||
# Help function
|
||||
function show_help() {
|
||||
echo "Usage: $0 [OPTIONS]"
|
||||
echo "Options:"
|
||||
echo " -n, --neo4j-path Path to Neo4j binary"
|
||||
echo " -m, --memgraph-path Path to Memgraph binary"
|
||||
echo " -w, --num-workers Number of workers for benchmark and import"
|
||||
echo " -d, --dataset_size dataset_size (small, medium, large)"
|
||||
echo " -h, --help Show this help message"
|
||||
exit 0
|
||||
}
|
||||
|
||||
# Default values
|
||||
neo4j_path="/usr/share/neo4j/bin/neo4j"
|
||||
memgraph_path="../../build/memgraph"
|
||||
num_workers=12
|
||||
dataset_size="small"
|
||||
|
||||
# Parse command line arguments
|
||||
while [[ $# -gt 0 ]]; do
|
||||
key="$1"
|
||||
case $key in
|
||||
-n|--neo4j-path)
|
||||
neo4j_path="$2"
|
||||
shift
|
||||
shift
|
||||
;;
|
||||
-m|--memgraph-path)
|
||||
memgraph_path="$2"
|
||||
shift
|
||||
shift
|
||||
;;
|
||||
-w|--num-workers)
|
||||
num_workers="$2"
|
||||
shift
|
||||
shift
|
||||
;;
|
||||
-d|--dataset_size)
|
||||
dataset_size="$2"
|
||||
shift
|
||||
shift
|
||||
;;
|
||||
-h|--help)
|
||||
show_help
|
||||
;;
|
||||
*)
|
||||
echo "Invalid option: $1"
|
||||
show_help
|
||||
;;
|
||||
esac
|
||||
done
|
||||
|
||||
if [ ! -d "pokec_${dataset_size}_results" ]; then
|
||||
mkdir "pokec_${dataset_size}_results"
|
||||
fi
|
||||
|
||||
# Run Python: Mgbench - Neo4j
|
||||
echo "Running Python: Mgbench - Neo4j"
|
||||
python3 benchmark.py vendor-native \
|
||||
--vendor-binary "$neo4j_path" \
|
||||
--vendor-name neo4j \
|
||||
--num-workers-for-benchmark "$num_workers" \
|
||||
--num-workers-for-import "$num_workers" \
|
||||
--no-load-query-counts \
|
||||
--export-results "pokec_${dataset_size}_results/neo4j_${dataset_size}_pokec.json" \
|
||||
"pokec_disk/${dataset_size}/*/*" \
|
||||
--vendor-specific "config=$neo4j_path/conf/neo4j.conf" \
|
||||
--no-authorization
|
||||
|
||||
# Run Python: Mgbench - Memgraph - on-disk
|
||||
echo "Running Python: Mgbench - Memgraph - on-disk"
|
||||
python3 benchmark.py vendor-native \
|
||||
--vendor-binary "$memgraph_path" \
|
||||
--vendor-name memgraph \
|
||||
--num-workers-for-benchmark "$num_workers" \
|
||||
--num-workers-for-import "$num_workers" \
|
||||
--no-load-query-counts \
|
||||
--export-results-on-disk-txn "pokec_${dataset_size}_results/on_disk_${dataset_size}_pokec.json" \
|
||||
--export-results "pokec_${dataset_size}_results/on_disk_export_${dataset_size}_pokec.json" \
|
||||
"pokec_disk/${dataset_size}/*/*" \
|
||||
--no-authorization \
|
||||
--vendor-specific "data-directory=benchmark_datadir" "storage-mode=ON_DISK_TRANSACTIONAL"
|
||||
|
||||
echo "Comparing results"
|
||||
python3 compare_results.py --compare \
|
||||
"pokec_${dataset_size}_results/neo4j_${dataset_size}_pokec.json" \
|
||||
"pokec_${dataset_size}_results/on_disk_${dataset_size}_pokec.json" \
|
||||
--output \
|
||||
"pokec_${dataset_size}_results/neo4j_vs_mg_ondisk_results.html" \
|
||||
--different-vendors
|
@ -634,7 +634,7 @@ class Neo4j(BaseRunner):
|
||||
exit_proc = subprocess.run(args=[self._neo4j_binary, "stop"], capture_output=True, check=True)
|
||||
return exit_proc.returncode, usage
|
||||
else:
|
||||
return 0
|
||||
return 0, 0
|
||||
|
||||
def start_db_init(self, workload):
|
||||
if self._performance_tracking:
|
||||
|
@ -160,12 +160,7 @@ class Workload(ABC):
|
||||
raise ValueError("Vendor does not have INDEX for dataset!")
|
||||
|
||||
def _set_local_files(self) -> None:
|
||||
if not self.disk_workload:
|
||||
if self.LOCAL_FILE is not None:
|
||||
self._local_file = self.LOCAL_FILE.get(self._variant, None)
|
||||
else:
|
||||
self._local_file = None
|
||||
else:
|
||||
if self.disk_workload and self._vendor != "neo4j":
|
||||
if self.LOCAL_FILE_NODES is not None:
|
||||
self._local_file_nodes = self.LOCAL_FILE_NODES.get(self._variant, None)
|
||||
else:
|
||||
@ -175,14 +170,14 @@ class Workload(ABC):
|
||||
self._local_file_edges = self.LOCAL_FILE_EDGES.get(self._variant, None)
|
||||
else:
|
||||
self._local_file_edges = None
|
||||
else:
|
||||
if self.LOCAL_FILE is not None:
|
||||
self._local_file = self.LOCAL_FILE.get(self._variant, None)
|
||||
else:
|
||||
self._local_file = None
|
||||
|
||||
def _set_url_files(self) -> None:
|
||||
if not self.disk_workload:
|
||||
if self.URL_FILE is not None:
|
||||
self._url_file = self.URL_FILE.get(self._variant, None)
|
||||
else:
|
||||
self._url_file = None
|
||||
else:
|
||||
if self.disk_workload and self._vendor != "neo4j":
|
||||
if self.URL_FILE_NODES is not None:
|
||||
self._url_file_nodes = self.URL_FILE_NODES.get(self._variant, None)
|
||||
else:
|
||||
@ -191,6 +186,11 @@ class Workload(ABC):
|
||||
self._url_file_edges = self.URL_FILE_EDGES.get(self._variant, None)
|
||||
else:
|
||||
self._url_file_edges = None
|
||||
else:
|
||||
if self.URL_FILE is not None:
|
||||
self._url_file = self.URL_FILE.get(self._variant, None)
|
||||
else:
|
||||
self._url_file = None
|
||||
|
||||
def _set_local_index_file(self) -> None:
|
||||
if self.LOCAL_INDEX_FILE is not None:
|
||||
@ -205,10 +205,10 @@ class Workload(ABC):
|
||||
self._url_index = None
|
||||
|
||||
def prepare(self, directory):
|
||||
if not self.disk_workload:
|
||||
self._prepare_dataset_for_in_memory_workload(directory)
|
||||
else:
|
||||
if self.disk_workload and self._vendor != "neo4j":
|
||||
self._prepare_dataset_for_on_disk_workload(directory)
|
||||
else:
|
||||
self._prepare_dataset_for_in_memory_workload(directory)
|
||||
|
||||
if self._local_index is not None:
|
||||
print("Using local index file:", self._local_index)
|
||||
|
@ -13,7 +13,8 @@ import random
|
||||
|
||||
from benchmark_context import BenchmarkContext
|
||||
from workloads.base import Workload
|
||||
from workloads.importers.disk_importer_pokec import ImporterPokec
|
||||
from workloads.importers.disk_importer_pokec import DiskImporterPokec
|
||||
from workloads.importers.importer_pokec import ImporterPokec
|
||||
|
||||
|
||||
class Pokec(Workload):
|
||||
@ -22,6 +23,12 @@ class Pokec(Workload):
|
||||
DEFAULT_VARIANT = "small"
|
||||
FILE = None
|
||||
|
||||
URL_FILE = {
|
||||
"small": "https://s3.eu-west-1.amazonaws.com/deps.memgraph.io/dataset/pokec/benchmark/pokec_small_import.cypher",
|
||||
"medium": "https://s3.eu-west-1.amazonaws.com/deps.memgraph.io/dataset/pokec/benchmark/pokec_medium_import.cypher",
|
||||
"large": "https://s3.eu-west-1.amazonaws.com/deps.memgraph.io/dataset/pokec/benchmark/pokec_large.setup.cypher.gz",
|
||||
}
|
||||
|
||||
URL_FILE_NODES = {
|
||||
"small": "https://s3.eu-west-1.amazonaws.com/deps.memgraph.io/dataset/pokec_disk/benchmark/pokec_small_import_nodes.cypher",
|
||||
"medium": "https://s3.eu-west-1.amazonaws.com/deps.memgraph.io/dataset/pokec_disk/benchmark/pokec_medium_import_nodes.cypher",
|
||||
@ -42,7 +49,7 @@ class Pokec(Workload):
|
||||
|
||||
URL_INDEX_FILE = {
|
||||
"memgraph": "https://s3.eu-west-1.amazonaws.com/deps.memgraph.io/dataset/pokec_disk/benchmark/memgraph.cypher",
|
||||
"neo4j": "https://s3.eu-west-1.amazonaws.com/deps.memgraph.io/dataset/pokec_disk/benchmark/neo4j.cypher",
|
||||
"neo4j": "https://s3.eu-west-1.amazonaws.com/deps.memgraph.io/dataset/pokec/benchmark/neo4j.cypher",
|
||||
}
|
||||
|
||||
PROPERTIES_ON_EDGES = False
|
||||
@ -51,15 +58,26 @@ class Pokec(Workload):
|
||||
super().__init__(variant, benchmark_context=benchmark_context, disk_workload=True)
|
||||
|
||||
def custom_import(self) -> bool:
|
||||
importer = ImporterPokec(
|
||||
benchmark_context=self.benchmark_context,
|
||||
dataset_name=self.NAME,
|
||||
index_file=self._file_index,
|
||||
dataset_nodes_file=self._node_file,
|
||||
dataset_edges_file=self._edge_file,
|
||||
variant=self._variant,
|
||||
)
|
||||
return importer.execute_import()
|
||||
if self._vendor == "neo4j":
|
||||
importer = ImporterPokec(
|
||||
benchmark_context=self.benchmark_context,
|
||||
dataset_name=self.NAME,
|
||||
index_file=self._file_index,
|
||||
dataset_file=self._file,
|
||||
variant=self._variant,
|
||||
)
|
||||
return importer.execute_import()
|
||||
|
||||
else:
|
||||
importer = DiskImporterPokec(
|
||||
benchmark_context=self.benchmark_context,
|
||||
dataset_name=self.NAME,
|
||||
index_file=self._file_index,
|
||||
dataset_nodes_file=self._node_file,
|
||||
dataset_edges_file=self._edge_file,
|
||||
variant=self._variant,
|
||||
)
|
||||
return importer.execute_import()
|
||||
|
||||
# Helpers used to generate the queries
|
||||
def _get_random_vertex(self):
|
||||
@ -214,12 +232,22 @@ class Pokec(Workload):
|
||||
# OK
|
||||
def benchmark__arango__allshortest_paths(self):
|
||||
vertex_from, vertex_to = self._get_random_from_to()
|
||||
return (
|
||||
memgraph = (
|
||||
"MATCH (n:User {id: $from}), (m:User {id: $to}) WITH n, m "
|
||||
"MATCH p=(n)-[*allshortest 2 (r, n | 1) total_weight]->(m) "
|
||||
"RETURN extract(n in nodes(p) | n.id) AS path",
|
||||
{"from": vertex_from, "to": vertex_to},
|
||||
)
|
||||
neo4j = (
|
||||
"MATCH (n:User {id: $from}), (m:User {id: $to}) WITH n, m "
|
||||
"MATCH p = allShortestPaths((n)-[*..2]->(m)) "
|
||||
"RETURN [node in nodes(p) | node.id] AS path",
|
||||
{"from": vertex_from, "to": vertex_to},
|
||||
)
|
||||
if self._vendor == "neo4j":
|
||||
return neo4j
|
||||
else:
|
||||
return memgraph
|
||||
|
||||
# Our benchmark queries
|
||||
|
||||
|
@ -17,7 +17,7 @@ from constants import *
|
||||
from runners import BaseRunner
|
||||
|
||||
|
||||
class ImporterPokec:
|
||||
class DiskImporterPokec:
|
||||
def __init__(
|
||||
self,
|
||||
benchmark_context: BenchmarkContext,
|
||||
|
@ -167,30 +167,62 @@ class Pokec(Workload):
|
||||
|
||||
def benchmark__arango__shortest_path(self):
|
||||
vertex_from, vertex_to = self._get_random_from_to()
|
||||
return (
|
||||
memgraph = (
|
||||
"MATCH (n:User {id: $from}), (m:User {id: $to}) WITH n, m "
|
||||
"MATCH p=(n)-[*bfs..15]->(m) "
|
||||
"RETURN extract(n in nodes(p) | n.id) AS path",
|
||||
{"from": vertex_from, "to": vertex_to},
|
||||
)
|
||||
neo4j = (
|
||||
"MATCH (n:User {id: $from}), (m:User {id: $to}) WITH n, m "
|
||||
"MATCH p=shortestPath((n)-[*..15]->(m)) "
|
||||
"RETURN [n in nodes(p) | n.id] AS path",
|
||||
{"from": vertex_from, "to": vertex_to},
|
||||
)
|
||||
if self._vendor == "memgraph":
|
||||
return memgraph
|
||||
else:
|
||||
return neo4j
|
||||
|
||||
def benchmark__arango__shortest_path_with_filter(self):
|
||||
vertex_from, vertex_to = self._get_random_from_to()
|
||||
return (
|
||||
memgraph = (
|
||||
"MATCH (n:User {id: $from}), (m:User {id: $to}) WITH n, m "
|
||||
"MATCH p=(n)-[*bfs..15 (e, n | n.age >= 18)]->(m) "
|
||||
"RETURN extract(n in nodes(p) | n.id) AS path",
|
||||
{"from": vertex_from, "to": vertex_to},
|
||||
)
|
||||
|
||||
neo4j = (
|
||||
"MATCH (n:User {id: $from}), (m:User {id: $to}) WITH n, m "
|
||||
"MATCH p=shortestPath((n)-[*..15]->(m)) "
|
||||
"WHERE all(node in nodes(p) WHERE node.age >= 18) "
|
||||
"RETURN [n in nodes(p) | n.id] AS path",
|
||||
{"from": vertex_from, "to": vertex_to},
|
||||
)
|
||||
if self._vendor == "memgraph":
|
||||
return memgraph
|
||||
else:
|
||||
return neo4j
|
||||
|
||||
def benchmark__arango__allshortest_paths(self):
|
||||
vertex_from, vertex_to = self._get_random_from_to()
|
||||
return (
|
||||
memgraph = (
|
||||
"MATCH (n:User {id: $from}), (m:User {id: $to}) WITH n, m "
|
||||
"MATCH p=(n)-[*allshortest 2 (r, n | 1) total_weight]->(m) "
|
||||
"RETURN extract(n in nodes(p) | n.id) AS path",
|
||||
{"from": vertex_from, "to": vertex_to},
|
||||
)
|
||||
neo4j = (
|
||||
"MATCH (n:User {id: $from}), (m:User {id: $to}) WITH n, m "
|
||||
"MATCH p = allShortestPaths((n)-[*..2]->(m)) "
|
||||
"RETURN [node in nodes(p) | node.id] AS path",
|
||||
{"from": vertex_from, "to": vertex_to},
|
||||
)
|
||||
if self._vendor == "memgraph":
|
||||
return memgraph
|
||||
else:
|
||||
return neo4j
|
||||
|
||||
# Our benchmark queries
|
||||
|
||||
|
@ -179,3 +179,35 @@ TEST_F(ClearingOldDiskDataTest, TestNumOfEntriesWithEdgeValueUpdate) {
|
||||
|
||||
ASSERT_EQ(disk_test_utils::GetRealNumberOfEntriesInRocksDB(tx_db), 5);
|
||||
}
|
||||
|
||||
TEST_F(ClearingOldDiskDataTest, TestTimestampAfterCommit) {
|
||||
auto *tx_db = disk_storage->GetRocksDBStorage()->db_;
|
||||
ASSERT_EQ(disk_test_utils::GetRealNumberOfEntriesInRocksDB(tx_db), 0);
|
||||
|
||||
auto acc1 = disk_storage->Access(ReplicationRole::MAIN);
|
||||
auto vertex1 = acc1->CreateVertex();
|
||||
auto label1 = acc1->NameToLabel("DiskLabel");
|
||||
auto property1 = acc1->NameToProperty("DiskProperty");
|
||||
ASSERT_TRUE(vertex1.AddLabel(label1).HasValue());
|
||||
ASSERT_TRUE(vertex1.SetProperty(property1, memgraph::storage::PropertyValue(10)).HasValue());
|
||||
ASSERT_FALSE(acc1->Commit().HasError());
|
||||
ASSERT_EQ(disk_test_utils::GetRealNumberOfEntriesInRocksDB(tx_db), 1);
|
||||
|
||||
auto saved_timestamp = disk_storage->GetDurableMetadata()->LoadTimestampIfExists();
|
||||
ASSERT_EQ(saved_timestamp.has_value(), true);
|
||||
ASSERT_EQ(disk_storage->timestamp_, saved_timestamp);
|
||||
|
||||
auto acc2 = disk_storage->Access(ReplicationRole::MAIN);
|
||||
auto vertex2 = acc2->CreateVertex();
|
||||
auto label2 = acc2->NameToLabel("DiskLabel2");
|
||||
auto property2 = acc2->NameToProperty("DiskProperty2");
|
||||
|
||||
ASSERT_TRUE(vertex2.AddLabel(label2).HasValue());
|
||||
ASSERT_TRUE(vertex2.SetProperty(property2, memgraph::storage::PropertyValue(10)).HasValue());
|
||||
ASSERT_FALSE(acc2->Commit().HasError());
|
||||
ASSERT_EQ(disk_test_utils::GetRealNumberOfEntriesInRocksDB(tx_db), 2);
|
||||
|
||||
saved_timestamp = disk_storage->GetDurableMetadata()->LoadTimestampIfExists();
|
||||
ASSERT_EQ(saved_timestamp.has_value(), true);
|
||||
ASSERT_EQ(disk_storage->timestamp_, saved_timestamp);
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user