Merge branch 'project-pineapples' into T1083-MG-limit-and-order-expand-one_v3

Commit 18009c06b6 by Jeremy B, 2022-11-09 16:47:06 +01:00, committed by GitHub.
37 changed files with 875 additions and 337 deletions


@ -16,7 +16,7 @@ jobs:
steps:
- name: Set up repository
uses: actions/checkout@v2
uses: actions/checkout@v3
with:
# Number of commits to fetch. `0` indicates all history for all
# branches and tags. (default: 1)


@ -26,7 +26,7 @@ jobs:
steps:
- name: Set up repository
uses: actions/checkout@v2
uses: actions/checkout@v3
with:
# Number of commits to fetch. `0` indicates all history for all
# branches and tags. (default: 1)
@ -64,7 +64,7 @@ jobs:
steps:
- name: Set up repository
uses: actions/checkout@v2
uses: actions/checkout@v3
with:
# Number of commits to fetch. `0` indicates all history for all
# branches and tags. (default: 1)
@ -75,14 +75,39 @@ jobs:
- name: Fetch all history for all tags and branches
run: git fetch
- name: Initialize deps
run: |
# Activate toolchain.
source /opt/toolchain-v4/activate
# Initialize dependencies.
./init
- name: Set base branch
if: ${{ github.event_name == 'pull_request' }}
run: |
echo "BASE_BRANCH=origin/${{ github.base_ref }}" >> $GITHUB_ENV
- name: Set base branch # if we manually dispatch or push to master
if: ${{ github.event_name != 'pull_request' }}
run: |
echo "BASE_BRANCH=origin/master" >> $GITHUB_ENV
- name: Python code analysis
run: |
CHANGED_FILES=$(git diff -U0 ${{ env.BASE_BRANCH }}... --name-only)
for file in ${CHANGED_FILES}; do
echo ${file}
if [[ ${file} == *.py ]]; then
python3 -m black --check --diff ${file}
python3 -m isort --check-only --diff ${file}
fi
done
- name: Build combined ASAN, UBSAN and coverage binaries
run: |
# Activate toolchain.
source /opt/toolchain-v4/activate
# Initialize dependencies.
./init
cd build
cmake -DTEST_COVERAGE=ON -DASAN=ON -DUBSAN=ON ..
make -j$THREADS memgraph__unit
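
For illustration, a rough local equivalent of the "Python code analysis" step above -- not part of this changeset, with an assumed base branch and defaults -- could look like this sketch:

# Sketch: run black and isort in check mode on the .py files that differ
# from a base branch, mirroring the CI shell loop above.
import subprocess


def check_changed_python_files(base_branch: str = "origin/master") -> int:
    diff = subprocess.run(
        ["git", "diff", "-U0", f"{base_branch}...", "--name-only"],
        capture_output=True,
        text=True,
        check=True,
    )
    failures = 0
    for path in diff.stdout.split():
        if not path.endswith(".py"):
            continue
        for tool in (["black", "--check", "--diff"], ["isort", "--check-only", "--diff"]):
            if subprocess.run(["python3", "-m", *tool, path]).returncode != 0:
                failures += 1
    return failures


if __name__ == "__main__":
    raise SystemExit(1 if check_changed_python_files() else 0)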
@ -110,7 +135,7 @@ jobs:
tar -czf code_coverage.tar.gz coverage.json html report.json summary.rmu
- name: Save code coverage
uses: actions/upload-artifact@v2
uses: actions/upload-artifact@v3
with:
name: "Code coverage"
path: tools/github/generated/code_coverage.tar.gz
@ -145,7 +170,7 @@ jobs:
steps:
- name: Set up repository
uses: actions/checkout@v2
uses: actions/checkout@v3
with:
# Number of commits to fetch. `0` indicates all history for all
# branches and tags. (default: 1)
@ -173,6 +198,15 @@ jobs:
cd build
ctest -R memgraph__simulation --output-on-failure -j$THREADS
- name: Run single benchmark test
run: |
# Activate toolchain.
source /opt/toolchain-v4/activate
# Run a single benchmark test.
cd tests/mgbench
./benchmark.py accesscontrol/small --num-workers-for-import 1 --test-system-arg "split-file splitfiles/accesscontrol_small.shard_configuration bolt-num-workers 1"
release_build:
name: "Release build"
runs-on: [self-hosted, Linux, X64, Diff]
@ -183,7 +217,7 @@ jobs:
steps:
- name: Set up repository
uses: actions/checkout@v2
uses: actions/checkout@v3
with:
# Number of commits to fetch. `0` indicates all history for all
# branches and tags. (default: 1)
@ -220,6 +254,15 @@ jobs:
cd build
ctest -R memgraph__simulation --output-on-failure -j$THREADS
- name: Run single benchmark test
run: |
# Activate toolchain.
source /opt/toolchain-v4/activate
# Run a single benchmark test.
cd tests/mgbench
./benchmark.py accesscontrol/small --num-workers-for-import 1 --test-system-arg "split-file splitfiles/accesscontrol_small.shard_configuration bolt-num-workers 1"
- name: Run e2e tests
run: |
# TODO(gitbuda): Setup mgclient and pymgclient properly.


@ -14,7 +14,7 @@ jobs:
steps:
- name: Set up repository
uses: actions/checkout@v2
uses: actions/checkout@v3
with:
# Number of commits to fetch. `0` indicates all history for all
# branches and tags. (default: 1)


@ -17,7 +17,7 @@ jobs:
run: |
./release/package/run.sh package centos-7
- name: "Upload package"
uses: actions/upload-artifact@v2
uses: actions/upload-artifact@v3
with:
name: centos-7
path: build/output/centos-7/memgraph*.rpm
@ -34,7 +34,7 @@ jobs:
run: |
./release/package/run.sh package centos-9
- name: "Upload package"
uses: actions/upload-artifact@v2
uses: actions/upload-artifact@v3
with:
name: centos-9
path: build/output/centos-9/memgraph*.rpm
@ -51,7 +51,7 @@ jobs:
run: |
./release/package/run.sh package debian-10
- name: "Upload package"
uses: actions/upload-artifact@v2
uses: actions/upload-artifact@v3
with:
name: debian-10
path: build/output/debian-10/memgraph*.deb
@ -68,7 +68,7 @@ jobs:
run: |
./release/package/run.sh package debian-11
- name: "Upload package"
uses: actions/upload-artifact@v2
uses: actions/upload-artifact@v3
with:
name: debian-11
path: build/output/debian-11/memgraph*.deb
@ -87,7 +87,7 @@ jobs:
./run.sh package debian-11 --for-docker
./run.sh docker
- name: "Upload package"
uses: actions/upload-artifact@v2
uses: actions/upload-artifact@v3
with:
name: docker
path: build/output/docker/memgraph*.tar.gz
@ -104,7 +104,7 @@ jobs:
run: |
./release/package/run.sh package ubuntu-18.04
- name: "Upload package"
uses: actions/upload-artifact@v2
uses: actions/upload-artifact@v3
with:
name: ubuntu-1804
path: build/output/ubuntu-18.04/memgraph*.deb
@ -121,7 +121,7 @@ jobs:
run: |
./release/package/run.sh package ubuntu-20.04
- name: "Upload package"
uses: actions/upload-artifact@v2
uses: actions/upload-artifact@v3
with:
name: ubuntu-2004
path: build/output/ubuntu-20.04/memgraph*.deb
@ -138,7 +138,7 @@ jobs:
run: |
./release/package/run.sh package ubuntu-22.04
- name: "Upload package"
uses: actions/upload-artifact@v2
uses: actions/upload-artifact@v3
with:
name: ubuntu-2204
path: build/output/ubuntu-22.04/memgraph*.deb
@ -155,7 +155,7 @@ jobs:
run: |
./release/package/run.sh package debian-11 --for-platform
- name: "Upload package"
uses: actions/upload-artifact@v2
uses: actions/upload-artifact@v3
with:
name: debian-11-platform
path: build/output/debian-11/memgraph*.deb
@ -172,7 +172,7 @@ jobs:
run: |
./release/package/run.sh package debian-11-arm
- name: "Upload package"
uses: actions/upload-artifact@v2
uses: actions/upload-artifact@v3
with:
name: debian-11-arm
path: build/output/debian-11-arm/memgraph*.deb


@ -17,7 +17,7 @@ jobs:
steps:
- name: Set up repository
uses: actions/checkout@v2
uses: actions/checkout@v3
with:
# Number of commits to fetch. `0` indicates all history for all
# branches and tags. (default: 1)
@ -55,7 +55,7 @@ jobs:
steps:
- name: Set up repository
uses: actions/checkout@v2
uses: actions/checkout@v3
with:
# Number of commits to fetch. `0` indicates all history for all
# branches and tags. (default: 1)
@ -97,7 +97,7 @@ jobs:
tar -czf code_coverage.tar.gz coverage.json html report.json summary.rmu
- name: Save code coverage
uses: actions/upload-artifact@v2
uses: actions/upload-artifact@v3
with:
name: "Code coverage"
path: tools/github/generated/code_coverage.tar.gz
@ -112,7 +112,7 @@ jobs:
steps:
- name: Set up repository
uses: actions/checkout@v2
uses: actions/checkout@v3
with:
# Number of commits to fetch. `0` indicates all history for all
# branches and tags. (default: 1)
@ -173,7 +173,7 @@ jobs:
./cppcheck_and_clang_format diff
- name: Save cppcheck and clang-format errors
uses: actions/upload-artifact@v2
uses: actions/upload-artifact@v3
with:
name: "Code coverage"
path: tools/github/cppcheck_and_clang_format.txt
@ -189,7 +189,7 @@ jobs:
steps:
- name: Set up repository
uses: actions/checkout@v2
uses: actions/checkout@v3
with:
# Number of commits to fetch. `0` indicates all history for all
# branches and tags. (default: 1)
@ -225,7 +225,7 @@ jobs:
rpmlint memgraph*.rpm
- name: Save enterprise RPM package
uses: actions/upload-artifact@v2
uses: actions/upload-artifact@v3
with:
name: "Enterprise RPM package"
path: build/output/memgraph*.rpm
@ -262,7 +262,7 @@ jobs:
./continuous_integration
- name: Save quality assurance status
uses: actions/upload-artifact@v2
uses: actions/upload-artifact@v3
with:
name: "GQL Behave Status"
path: |


@ -17,7 +17,7 @@ jobs:
steps:
- name: Set up repository
uses: actions/checkout@v2
uses: actions/checkout@v3
with:
# Number of commits to fetch. `0` indicates all history for all
# branches and tags. (default: 1)
@ -55,7 +55,7 @@ jobs:
steps:
- name: Set up repository
uses: actions/checkout@v2
uses: actions/checkout@v3
with:
# Number of commits to fetch. `0` indicates all history for all
# branches and tags. (default: 1)
@ -97,7 +97,7 @@ jobs:
tar -czf code_coverage.tar.gz coverage.json html report.json summary.rmu
- name: Save code coverage
uses: actions/upload-artifact@v2
uses: actions/upload-artifact@v3
with:
name: "Code coverage"
path: tools/github/generated/code_coverage.tar.gz
@ -112,7 +112,7 @@ jobs:
steps:
- name: Set up repository
uses: actions/checkout@v2
uses: actions/checkout@v3
with:
# Number of commits to fetch. `0` indicates all history for all
# branches and tags. (default: 1)
@ -173,7 +173,7 @@ jobs:
./cppcheck_and_clang_format diff
- name: Save cppcheck and clang-format errors
uses: actions/upload-artifact@v2
uses: actions/upload-artifact@v3
with:
name: "Code coverage"
path: tools/github/cppcheck_and_clang_format.txt
@ -189,7 +189,7 @@ jobs:
steps:
- name: Set up repository
uses: actions/checkout@v2
uses: actions/checkout@v3
with:
# Number of commits to fetch. `0` indicates all history for all
# branches and tags. (default: 1)
@ -224,7 +224,7 @@ jobs:
cpack -G DEB --config ../CPackConfig.cmake
- name: Save enterprise DEB package
uses: actions/upload-artifact@v2
uses: actions/upload-artifact@v3
with:
name: "Enterprise DEB package"
path: build/output/memgraph*.deb
@ -261,7 +261,7 @@ jobs:
./continuous_integration
- name: Save quality assurance status
uses: actions/upload-artifact@v2
uses: actions/upload-artifact@v3
with:
name: "GQL Behave Status"
path: |
@ -324,7 +324,7 @@ jobs:
steps:
- name: Set up repository
uses: actions/checkout@v2
uses: actions/checkout@v3
with:
# Number of commits to fetch. `0` indicates all history for all
# branches and tags. (default: 1)
@ -349,7 +349,7 @@ jobs:
./run.sh test --binary ../../build/memgraph --run-args "test-all --node-configs resources/node-config.edn" --ignore-run-stdout-logs --ignore-run-stderr-logs
- name: Save Jepsen report
uses: actions/upload-artifact@v2
uses: actions/upload-artifact@v3
if: ${{ always() }}
with:
name: "Jepsen Report"


@ -19,7 +19,7 @@ jobs:
DOCKER_REPOSITORY_NAME: memgraph
steps:
- name: Checkout
uses: actions/checkout@v2
uses: actions/checkout@v3
- name: Set up QEMU
uses: docker/setup-qemu-action@v1


@ -17,7 +17,7 @@ jobs:
steps:
- name: Set up repository
uses: actions/checkout@v2
uses: actions/checkout@v3
with:
# Number of commits to fetch. `0` indicates all history for all
# branches and tags. (default: 1)
@ -55,7 +55,7 @@ jobs:
steps:
- name: Set up repository
uses: actions/checkout@v2
uses: actions/checkout@v3
with:
# Number of commits to fetch. `0` indicates all history for all
# branches and tags. (default: 1)
@ -97,7 +97,7 @@ jobs:
tar -czf code_coverage.tar.gz coverage.json html report.json summary.rmu
- name: Save code coverage
uses: actions/upload-artifact@v2
uses: actions/upload-artifact@v3
with:
name: "Code coverage"
path: tools/github/generated/code_coverage.tar.gz
@ -112,7 +112,7 @@ jobs:
steps:
- name: Set up repository
uses: actions/checkout@v2
uses: actions/checkout@v3
with:
# Number of commits to fetch. `0` indicates all history for all
# branches and tags. (default: 1)
@ -173,7 +173,7 @@ jobs:
./cppcheck_and_clang_format diff
- name: Save cppcheck and clang-format errors
uses: actions/upload-artifact@v2
uses: actions/upload-artifact@v3
with:
name: "Code coverage"
path: tools/github/cppcheck_and_clang_format.txt
@ -189,7 +189,7 @@ jobs:
steps:
- name: Set up repository
uses: actions/checkout@v2
uses: actions/checkout@v3
with:
# Number of commits to fetch. `0` indicates all history for all
# branches and tags. (default: 1)
@ -224,7 +224,7 @@ jobs:
cpack -G DEB --config ../CPackConfig.cmake
- name: Save enterprise DEB package
uses: actions/upload-artifact@v2
uses: actions/upload-artifact@v3
with:
name: "Enterprise DEB package"
path: build/output/memgraph*.deb
@ -261,7 +261,7 @@ jobs:
./continuous_integration
- name: Save quality assurance status
uses: actions/upload-artifact@v2
uses: actions/upload-artifact@v3
with:
name: "GQL Behave Status"
path: |


@ -6,18 +6,14 @@ repos:
- id: end-of-file-fixer
- id: trailing-whitespace
- repo: https://github.com/psf/black
rev: 22.3.0
rev: 22.10.0
hooks:
- id: black
args: # arguments to configure black
- --line-length=120
- --include='\.pyi?$'
# these folders wont be formatted by black
- --exclude="""\.git |
\.__pycache__|
build|
libs|
.cache"""
- repo: https://github.com/pycqa/isort
rev: 5.10.1
hooks:
- id: isort
name: isort (python)
- repo: https://github.com/pre-commit/mirrors-clang-format
rev: v13.0.0
hooks:

init

@ -140,5 +140,12 @@ done;
python3 -m pip install pre-commit
python3 -m pre_commit install
# Install py format tools
echo "Install black formatter"
python3 -m pip install black==22.10.*
echo "Install isort"
python3 -m pip install isort==5.10.*
# Link `include/mgp.py` with `release/mgp/mgp.py`
ln -v -f include/mgp.py release/mgp/mgp.py

pyproject.toml (new file)

@ -0,0 +1,12 @@
[tool.black]
line-length = 120
include = '\.pyi?$'
extend-exclude = '''
/(
| .git
| .__pycache__
| build
| libs
| .cache
)/
'''
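
For illustration, the new black settings plus the isort hook enforce changes along these lines; the example module below is hypothetical and not part of this changeset:

# example.py -- hypothetical module; before the hooks run it might read:
#   import sys, os
#   def add(a,b):  return a+b
# after `pre-commit run --all-files` with black (line-length 120) and isort:
import os
import sys


def add(a, b):
    return a + b


print(add(len(sys.argv), len(os.environ)))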


@ -404,6 +404,7 @@ class ExpressionEvaluator : public ExpressionVisitor<TypedValue> {
case Error::SERIALIZATION_ERROR:
case Error::VERTEX_HAS_EDGES:
case Error::PROPERTIES_DISABLED:
case Error::VERTEX_ALREADY_INSERTED:
throw ExpressionRuntimeException("Unexpected error when accessing labels.");
}
}
@ -751,6 +752,7 @@ class ExpressionEvaluator : public ExpressionVisitor<TypedValue> {
case Error::SERIALIZATION_ERROR:
case Error::VERTEX_HAS_EDGES:
case Error::PROPERTIES_DISABLED:
case Error::VERTEX_ALREADY_INSERTED:
throw ExpressionRuntimeException("Unexpected error when getting a property.");
}
}
@ -779,6 +781,7 @@ class ExpressionEvaluator : public ExpressionVisitor<TypedValue> {
case Error::SERIALIZATION_ERROR:
case Error::VERTEX_HAS_EDGES:
case Error::PROPERTIES_DISABLED:
case Error::VERTEX_ALREADY_INSERTED:
throw ExpressionRuntimeException("Unexpected error when getting a property.");
}
}


@ -489,6 +489,7 @@ class BoltSession final : public memgraph::communication::bolt::Session<memgraph
case memgraph::storage::v3::Error::VERTEX_HAS_EDGES:
case memgraph::storage::v3::Error::PROPERTIES_DISABLED:
case memgraph::storage::v3::Error::NONEXISTENT_OBJECT:
case memgraph::storage::v3::Error::VERTEX_ALREADY_INSERTED:
throw memgraph::communication::bolt::ClientError("Unexpected storage error when streaming summary.");
}
}
@ -523,6 +524,7 @@ class BoltSession final : public memgraph::communication::bolt::Session<memgraph
case memgraph::storage::v3::Error::VERTEX_HAS_EDGES:
case memgraph::storage::v3::Error::SERIALIZATION_ERROR:
case memgraph::storage::v3::Error::PROPERTIES_DISABLED:
case memgraph::storage::v3::Error::VERTEX_ALREADY_INSERTED:
throw memgraph::communication::bolt::ClientError("Unexpected storage error when streaming results.");
}
}


@ -28,7 +28,7 @@
#include "storage/v3/id_types.hpp"
#include "storage/v3/property_value.hpp"
#include "storage/v3/result.hpp"
#include "storage/v3/schema_validator.hpp"
#include "storage/v3/shard_operation_result.hpp"
#include "storage/v3/view.hpp"
#include "utils/exceptions.hpp"
#include "utils/logging.hpp"
@ -93,7 +93,7 @@ concept AccessorWithSetPropertyAndValidate = requires(T accessor, const storage:
const storage::v3::PropertyValue new_value) {
{
accessor.SetPropertyAndValidate(key, new_value)
} -> std::same_as<storage::v3::ResultSchema<storage::v3::PropertyValue>>;
} -> std::same_as<storage::v3::ShardOperationResult<storage::v3::PropertyValue>>;
};
template <typename TRecordAccessor>
@ -110,6 +110,8 @@ inline void HandleErrorOnPropertyUpdate(const storage::v3::Error error) {
throw QueryRuntimeException("Can't set property because properties on edges are disabled.");
case storage::v3::Error::VERTEX_HAS_EDGES:
case storage::v3::Error::NONEXISTENT_OBJECT:
case storage::v3::Error::VERTEX_ALREADY_INSERTED:
throw QueryRuntimeException("Unexpected error when setting a property.");
}
}


@ -23,6 +23,7 @@
#include "storage/v3/key_store.hpp"
#include "storage/v3/property_value.hpp"
#include "storage/v3/result.hpp"
#include "storage/v3/shard_operation_result.hpp"
///////////////////////////////////////////////////////////
// Our communication layer and query engine don't mix
@ -113,17 +114,19 @@ class VertexAccessor final {
auto PrimaryKey(storage::v3::View view) const { return impl_.PrimaryKey(view); }
storage::v3::ResultSchema<bool> AddLabel(storage::v3::LabelId label) { return impl_.AddLabelAndValidate(label); }
storage::v3::ResultSchema<bool> AddLabelAndValidate(storage::v3::LabelId label) {
storage::v3::ShardOperationResult<bool> AddLabel(storage::v3::LabelId label) {
return impl_.AddLabelAndValidate(label);
}
storage::v3::ResultSchema<bool> RemoveLabel(storage::v3::LabelId label) {
storage::v3::ShardOperationResult<bool> AddLabelAndValidate(storage::v3::LabelId label) {
return impl_.AddLabelAndValidate(label);
}
storage::v3::ShardOperationResult<bool> RemoveLabel(storage::v3::LabelId label) {
return impl_.RemoveLabelAndValidate(label);
}
storage::v3::ResultSchema<bool> RemoveLabelAndValidate(storage::v3::LabelId label) {
storage::v3::ShardOperationResult<bool> RemoveLabelAndValidate(storage::v3::LabelId label) {
return impl_.RemoveLabelAndValidate(label);
}
@ -138,17 +141,17 @@ class VertexAccessor final {
return impl_.GetProperty(key, view);
}
storage::v3::ResultSchema<storage::v3::PropertyValue> SetProperty(storage::v3::PropertyId key,
const storage::v3::PropertyValue &value) {
storage::v3::ShardOperationResult<storage::v3::PropertyValue> SetProperty(storage::v3::PropertyId key,
const storage::v3::PropertyValue &value) {
return impl_.SetPropertyAndValidate(key, value);
}
storage::v3::ResultSchema<storage::v3::PropertyValue> SetPropertyAndValidate(
storage::v3::ShardOperationResult<storage::v3::PropertyValue> SetPropertyAndValidate(
storage::v3::PropertyId key, const storage::v3::PropertyValue &value) {
return impl_.SetPropertyAndValidate(key, value);
}
storage::v3::ResultSchema<storage::v3::PropertyValue> RemovePropertyAndValidate(storage::v3::PropertyId key) {
storage::v3::ShardOperationResult<storage::v3::PropertyValue> RemovePropertyAndValidate(storage::v3::PropertyId key) {
return SetPropertyAndValidate(key, storage::v3::PropertyValue{});
}


@ -25,6 +25,7 @@ enum class Error : uint8_t {
DELETED_OBJECT,
VERTEX_HAS_EDGES,
PROPERTIES_DISABLED,
VERTEX_ALREADY_INSERTED
};
template <class TValue>


@ -79,7 +79,4 @@ struct VertexValidator {
LabelId primary_label_;
};
template <typename TValue>
using ResultSchema = utils::BasicResult<std::variant<SchemaViolation, Error>, TValue>;
} // namespace memgraph::storage::v3


@ -33,6 +33,7 @@
#include "storage/v3/mvcc.hpp"
#include "storage/v3/property_value.hpp"
#include "storage/v3/schema_validator.hpp"
#include "storage/v3/shard_operation_result.hpp"
#include "storage/v3/transaction.hpp"
#include "storage/v3/vertex.hpp"
#include "storage/v3/vertex_accessor.hpp"
@ -343,7 +344,7 @@ Shard::~Shard() {}
Shard::Accessor::Accessor(Shard &shard, Transaction &transaction)
: shard_(&shard), transaction_(&transaction), config_(shard_->config_.items) {}
ResultSchema<VertexAccessor> Shard::Accessor::CreateVertexAndValidate(
ShardOperationResult<VertexAccessor> Shard::Accessor::CreateVertexAndValidate(
const std::vector<LabelId> &labels, const std::vector<PropertyValue> &primary_properties,
const std::vector<std::pair<PropertyId, PropertyValue>> &properties) {
OOMExceptionEnabler oom_exception;
@ -361,7 +362,9 @@ ResultSchema<VertexAccessor> Shard::Accessor::CreateVertexAndValidate(
delta->prev.Set(&it->vertex);
VertexAccessor vertex_acc{&it->vertex, transaction_, &shard_->indices_, config_, shard_->vertex_validator_};
MG_ASSERT(inserted, "The vertex must be inserted here!");
if (!inserted) {
return {Error::VERTEX_ALREADY_INSERTED};
}
MG_ASSERT(it != acc.end(), "Invalid Vertex accessor!");
// TODO(jbajic) Improve, maybe delay index update


@ -38,6 +38,7 @@
#include "storage/v3/result.hpp"
#include "storage/v3/schema_validator.hpp"
#include "storage/v3/schemas.hpp"
#include "storage/v3/shard_operation_result.hpp"
#include "storage/v3/transaction.hpp"
#include "storage/v3/vertex.hpp"
#include "storage/v3/vertex_accessor.hpp"
@ -206,7 +207,7 @@ class Shard final {
public:
/// @throw std::bad_alloc
ResultSchema<VertexAccessor> CreateVertexAndValidate(
ShardOperationResult<VertexAccessor> CreateVertexAndValidate(
const std::vector<LabelId> &labels, const std::vector<PropertyValue> &primary_properties,
const std::vector<std::pair<PropertyId, PropertyValue>> &properties);


@ -0,0 +1,26 @@
// Copyright 2022 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#pragma once
#include <variant>
#include "storage/v3/result.hpp"
#include "storage/v3/schema_validator.hpp"
namespace memgraph::storage::v3 {
using ResultErrorType = std::variant<SchemaViolation, Error>;
template <typename TValue>
using ShardOperationResult = utils::BasicResult<ResultErrorType, TValue>;
} // namespace memgraph::storage::v3


@ -12,11 +12,13 @@
#include <algorithm>
#include <functional>
#include <iterator>
#include <optional>
#include <unordered_set>
#include <utility>
#include "parser/opencypher/parser.hpp"
#include "query/v2/requests.hpp"
#include "storage/v2/vertex.hpp"
#include "storage/v2/view.hpp"
#include "storage/v3/bindings/ast/ast.hpp"
#include "storage/v3/bindings/cypher_main_visitor.hpp"
@ -180,8 +182,8 @@ std::vector<TypedValue> EvaluateVertexExpressions(DbAccessor &dba, const VertexA
struct LocalError {};
std::optional<msgs::Vertex> FillUpSourceVertex(const std::optional<VertexAccessor> &v_acc,
const msgs::ExpandOneRequest &req, msgs::VertexId src_vertex) {
std::optional<std::vector<msgs::Label>> FillUpSourceVertexSecondaryLabels(const std::optional<VertexAccessor> &v_acc,
const msgs::ExpandOneRequest &req) {
auto secondary_labels = v_acc->Labels(View::NEW);
if (secondary_labels.HasError()) {
spdlog::debug("Encountered an error while trying to get the secondary labels of a vertex. Transaction id: {}",
@ -190,14 +192,13 @@ std::optional<msgs::Vertex> FillUpSourceVertex(const std::optional<VertexAccesso
}
auto &sec_labels = secondary_labels.GetValue();
msgs::Vertex source_vertex;
source_vertex.id = src_vertex;
source_vertex.labels.reserve(sec_labels.size());
std::vector<msgs::Label> msgs_secondary_labels;
msgs_secondary_labels.reserve(sec_labels.size());
std::transform(sec_labels.begin(), sec_labels.end(), std::back_inserter(source_vertex.labels),
std::transform(sec_labels.begin(), sec_labels.end(), std::back_inserter(msgs_secondary_labels),
[](auto label_id) { return msgs::Label{.id = label_id}; });
return source_vertex;
return msgs_secondary_labels;
}
std::optional<std::map<PropertyId, Value>> FillUpSourceVertexProperties(const std::optional<VertexAccessor> &v_acc,
@ -322,13 +323,16 @@ std::optional<msgs::ExpandOneResultRow> GetExpandOneResult(
const EdgeUniqunessFunction &maybe_filter_based_on_edge_uniquness, const EdgeFiller &edge_filler,
const Schemas::Schema *schema) {
/// Fill up source vertex
const auto primary_key = ConvertPropertyVector(std::move(src_vertex.second));
const auto primary_key = ConvertPropertyVector(src_vertex.second);
auto v_acc = acc.FindVertex(primary_key, View::NEW);
auto source_vertex = FillUpSourceVertex(v_acc, req, src_vertex);
if (!source_vertex) {
msgs::Vertex source_vertex = {.id = src_vertex};
if (const auto maybe_secondary_labels = FillUpSourceVertexSecondaryLabels(v_acc, req); maybe_secondary_labels) {
source_vertex.labels = *maybe_secondary_labels;
} else {
return std::nullopt;
}
std::optional<std::map<PropertyId, Value>> src_vertex_properties;
src_vertex_properties = FillUpSourceVertexProperties(v_acc, req, storage::v3::View::NEW, schema);
@ -345,7 +349,7 @@ std::optional<msgs::ExpandOneResultRow> GetExpandOneResult(
auto [in_edges, out_edges] = fill_up_connecting_edges.value();
msgs::ExpandOneResultRow result_row;
result_row.src_vertex = std::move(*source_vertex);
result_row.src_vertex = std::move(source_vertex);
result_row.src_vertex_properties = std::move(*src_vertex_properties);
static constexpr bool kInEdges = true;
static constexpr bool kOutEdges = false;
@ -528,12 +532,31 @@ msgs::WriteResponses ShardRsm::ApplyWrite(msgs::CreateVerticesRequest &&req) {
auto &error = result_schema.GetError();
std::visit(
[]<typename T>(T &&) {
[]<typename T>(T &&error) {
using ErrorType = std::remove_cvref_t<T>;
if constexpr (std::is_same_v<ErrorType, SchemaViolation>) {
spdlog::debug("Creating vertex failed with error: SchemaViolation");
} else if constexpr (std::is_same_v<ErrorType, Error>) {
spdlog::debug("Creating vertex failed with error: Error");
switch (error) {
case Error::DELETED_OBJECT:
spdlog::debug("Creating vertex failed with error: DELETED_OBJECT");
break;
case Error::NONEXISTENT_OBJECT:
spdlog::debug("Creating vertex failed with error: NONEXISTENT_OBJECT");
break;
case Error::SERIALIZATION_ERROR:
spdlog::debug("Creating vertex failed with error: SERIALIZATION_ERROR");
break;
case Error::PROPERTIES_DISABLED:
spdlog::debug("Creating vertex failed with error: PROPERTIES_DISABLED");
break;
case Error::VERTEX_HAS_EDGES:
spdlog::debug("Creating vertex failed with error: VERTEX_HAS_EDGES");
break;
case Error::VERTEX_ALREADY_INSERTED:
spdlog::debug("Creating vertex failed with error: VERTEX_ALREADY_INSERTED");
break;
}
} else {
static_assert(kAlwaysFalse<T>, "Missing type from variant visitor");
}
@ -573,20 +596,39 @@ msgs::WriteResponses ShardRsm::ApplyWrite(msgs::UpdateVerticesRequest &&req) {
auto &error = result_schema.GetError();
std::visit(
[&action_successful]<typename T>(T &&) {
[]<typename T>(T &&error) {
using ErrorType = std::remove_cvref_t<T>;
if constexpr (std::is_same_v<ErrorType, SchemaViolation>) {
action_successful = false;
spdlog::debug("Updating vertex failed with error: SchemaViolation");
} else if constexpr (std::is_same_v<ErrorType, Error>) {
action_successful = false;
spdlog::debug("Updating vertex failed with error: Error");
switch (error) {
case Error::DELETED_OBJECT:
spdlog::debug("Updating vertex failed with error: DELETED_OBJECT");
break;
case Error::NONEXISTENT_OBJECT:
spdlog::debug("Updating vertex failed with error: NONEXISTENT_OBJECT");
break;
case Error::SERIALIZATION_ERROR:
spdlog::debug("Updating vertex failed with error: SERIALIZATION_ERROR");
break;
case Error::PROPERTIES_DISABLED:
spdlog::debug("Updating vertex failed with error: PROPERTIES_DISABLED");
break;
case Error::VERTEX_HAS_EDGES:
spdlog::debug("Updating vertex failed with error: VERTEX_HAS_EDGES");
break;
case Error::VERTEX_ALREADY_INSERTED:
spdlog::debug("Updating vertex failed with error: VERTEX_ALREADY_INSERTED");
break;
}
} else {
static_assert(kAlwaysFalse<T>, "Missing type from variant visitor");
}
},
error);
action_successful = false;
break;
}
}


@ -21,8 +21,8 @@
#include "storage/v3/key_store.hpp"
#include "storage/v3/mvcc.hpp"
#include "storage/v3/property_value.hpp"
#include "storage/v3/schema_validator.hpp"
#include "storage/v3/shard.hpp"
#include "storage/v3/shard_operation_result.hpp"
#include "storage/v3/vertex.hpp"
#include "utils/logging.hpp"
#include "utils/memory_tracker.hpp"
@ -98,7 +98,7 @@ Result<bool> VertexAccessor::AddLabel(LabelId label) {
return true;
}
ResultSchema<bool> VertexAccessor::AddLabelAndValidate(LabelId label) {
ShardOperationResult<bool> VertexAccessor::AddLabelAndValidate(LabelId label) {
if (const auto maybe_violation_error = vertex_validator_->ValidateAddLabel(label); maybe_violation_error) {
return {*maybe_violation_error};
}
@ -134,7 +134,7 @@ Result<bool> VertexAccessor::RemoveLabel(LabelId label) {
return true;
}
ResultSchema<bool> VertexAccessor::RemoveLabelAndValidate(LabelId label) {
ShardOperationResult<bool> VertexAccessor::RemoveLabelAndValidate(LabelId label) {
if (const auto maybe_violation_error = vertex_validator_->ValidateRemoveLabel(label); maybe_violation_error) {
return {*maybe_violation_error};
}
@ -331,7 +331,8 @@ Result<void> VertexAccessor::CheckVertexExistence(View view) const {
return {};
}
ResultSchema<PropertyValue> VertexAccessor::SetPropertyAndValidate(PropertyId property, const PropertyValue &value) {
ShardOperationResult<PropertyValue> VertexAccessor::SetPropertyAndValidate(PropertyId property,
const PropertyValue &value) {
if (auto maybe_violation_error = vertex_validator_->ValidatePropertyUpdate(property); maybe_violation_error) {
return {*maybe_violation_error};
}


@ -17,7 +17,7 @@
#include "storage/v3/id_types.hpp"
#include "storage/v3/key_store.hpp"
#include "storage/v3/result.hpp"
#include "storage/v3/schema_validator.hpp"
#include "storage/v3/shard_operation_result.hpp"
#include "storage/v3/transaction.hpp"
#include "storage/v3/vertex.hpp"
#include "storage/v3/vertex_id.hpp"
@ -55,13 +55,13 @@ class VertexAccessor final {
/// `false` is returned if the label already existed, or SchemaViolation
/// if adding the label has violated one of the schema constraints.
/// @throw std::bad_alloc
ResultSchema<bool> AddLabelAndValidate(LabelId label);
ShardOperationResult<bool> AddLabelAndValidate(LabelId label);
/// Remove a label and return `true` if deletion took place.
/// `false` is returned if the vertex did not have a label already. or SchemaViolation
/// if adding the label has violated one of the schema constraints.
/// @throw std::bad_alloc
ResultSchema<bool> RemoveLabelAndValidate(LabelId label);
ShardOperationResult<bool> RemoveLabelAndValidate(LabelId label);
Result<bool> HasLabel(View view, LabelId label) const;
@ -80,7 +80,7 @@ class VertexAccessor final {
/// Set a property value and return the old value or error.
/// @throw std::bad_alloc
ResultSchema<PropertyValue> SetPropertyAndValidate(PropertyId property, const PropertyValue &value);
ShardOperationResult<PropertyValue> SetPropertyAndValidate(PropertyId property, const PropertyValue &value);
/// Remove all properties and return the values of the removed properties.
/// @throw std::bad_alloc


@ -3,6 +3,7 @@ function(distributed_queries_e2e_python_files FILE_NAME)
endfunction()
distributed_queries_e2e_python_files(distributed_queries.py)
distributed_queries_e2e_python_files(distributed_expand_one.py)
distributed_queries_e2e_python_files(unwind_collect.py)
distributed_queries_e2e_python_files(order_by_and_limit.py)
distributed_queries_e2e_python_files(distinct.py)


@ -9,11 +9,11 @@
# by the Apache License, Version 2.0, included in the file
# licenses/APL.txt.
import typing
import mgclient
import sys
import pytest
import time
from typing import List, Optional
import mgclient
import pytest
@pytest.fixture(autouse=True)
@ -28,12 +28,12 @@ def connect(**kwargs) -> mgclient.Connection:
return connection
def execute_and_fetch_all(cursor: mgclient.Cursor, query: str, params: dict = {}) -> typing.List[tuple]:
cursor.execute(query, params)
def execute_and_fetch_all(cursor: mgclient.Cursor, query: str, params: Optional[dict] = None) -> List[tuple]:
cursor.execute(query, params if params else {})
return cursor.fetchall()
def has_n_result_row(cursor: mgclient.Cursor, query: str, n: int):
def has_n_result_row(cursor: mgclient.Cursor, query: str, n: int) -> bool:
results = execute_and_fetch_all(cursor, query)
return len(results) == n

View File

@ -0,0 +1,37 @@
# Copyright 2022 Memgraph Ltd.
#
# Use of this software is governed by the Business Source License
# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
# License, and you may not use this file except in compliance with the Business Source License.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0, included in the file
# licenses/APL.txt.
import sys
import pytest
from common import connection, execute_and_fetch_all, has_n_result_row, wait_for_shard_manager_to_initialize
def test_sequenced_expand_one(connection):
wait_for_shard_manager_to_initialize()
cursor = connection.cursor()
for i in range(1, 4):
assert has_n_result_row(cursor, f"CREATE (:label {{property:{i}}})", 0), f"Failed creating node"
assert has_n_result_row(cursor, "MATCH (n {property:1}), (m {property:2}) CREATE (n)-[:TO]->(m)", 0)
assert has_n_result_row(cursor, "MATCH (n {property:2}), (m {property:3}) CREATE (n)-[:TO]->(m)", 0)
results = execute_and_fetch_all(cursor, "MATCH (n)-[:TO]->(m)-[:TO]->(l) RETURN n,m,l")
assert len(results) == 1
n, m, l = results[0]
assert n.properties["property"] == 1
assert m.properties["property"] == 2
assert l.properties["property"] == 3
if __name__ == "__main__":
sys.exit(pytest.main([__file__, "-rA"]))


@ -12,6 +12,11 @@ workloads:
args: ["distributed_queries/distributed_queries.py"]
<<: *template_cluster
- name: "Distributed expand one"
binary: "tests/e2e/pytest_runner.sh"
args: ["distributed_queries/distributed_expand_one.py"]
<<: *template_cluster
- name: "Distributed unwind collect"
binary: "tests/e2e/pytest_runner.sh"
args: ["distributed_queries/unwind_collect.py"]


@ -15,15 +15,18 @@ import argparse
import collections
import copy
import fnmatch
import importlib
import inspect
import json
import multiprocessing
import os
import random
import sys
import time
import datasets
import log
import helpers
import log
import runners
@ -37,8 +40,7 @@ def get_queries(gen, count):
return ret
def match_patterns(dataset, variant, group, test, is_default_variant,
patterns):
def match_patterns(dataset, variant, group, test, is_default_variant, patterns):
for pattern in patterns:
verdict = [fnmatch.fnmatchcase(dataset, pattern[0])]
if pattern[1] != "":
@ -58,7 +60,7 @@ def filter_benchmarks(generators, patterns):
pattern = patterns[i].split("/")
if len(pattern) > 4 or len(pattern) == 0:
raise Exception("Invalid benchmark description '" + pattern + "'!")
pattern.extend(["", "*", "*"][len(pattern) - 1:])
pattern.extend(["", "*", "*"][len(pattern) - 1 :])
patterns[i] = pattern
filtered = []
for dataset in sorted(generators.keys()):
@ -68,8 +70,7 @@ def filter_benchmarks(generators, patterns):
current = collections.defaultdict(list)
for group in tests:
for test_name, test_func in tests[group]:
if match_patterns(dataset, variant, group, test_name,
is_default_variant, patterns):
if match_patterns(dataset, variant, group, test_name, is_default_variant, patterns):
current[group].append((test_name, test_func))
if len(current) > 0:
filtered.append((generator(variant), dict(current)))
@ -78,54 +79,64 @@ def filter_benchmarks(generators, patterns):
# Parse options.
parser = argparse.ArgumentParser(
description="Memgraph benchmark executor.",
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument("benchmarks", nargs="*", default="",
help="descriptions of benchmarks that should be run; "
"multiple descriptions can be specified to run multiple "
"benchmarks; the description is specified as "
"dataset/variant/group/test; Unix shell-style wildcards "
"can be used in the descriptions; variant, group and test "
"are optional and they can be left out; the default "
"variant is '' which selects the default dataset variant; "
"the default group is '*' which selects all groups; the "
"default test is '*' which selects all tests")
parser.add_argument("--memgraph-binary",
default=helpers.get_binary_path("memgraph"),
help="Memgraph binary used for benchmarking")
parser.add_argument("--client-binary",
default=helpers.get_binary_path("tests/mgbench/client"),
help="client binary used for benchmarking")
parser.add_argument("--num-workers-for-import", type=int,
default=multiprocessing.cpu_count() // 2,
help="number of workers used to import the dataset")
parser.add_argument("--num-workers-for-benchmark", type=int,
default=1,
help="number of workers used to execute the benchmark")
parser.add_argument("--single-threaded-runtime-sec", type=int,
default=10,
help="single threaded duration of each test")
parser.add_argument("--no-load-query-counts", action="store_true",
help="disable loading of cached query counts")
parser.add_argument("--no-save-query-counts", action="store_true",
help="disable storing of cached query counts")
parser.add_argument("--export-results", default="",
help="file path into which results should be exported")
parser.add_argument("--temporary-directory", default="/tmp",
help="directory path where temporary data should "
"be stored")
parser.add_argument("--no-properties-on-edges", action="store_true",
help="disable properties on edges")
description="Memgraph benchmark executor.", formatter_class=argparse.ArgumentDefaultsHelpFormatter
)
parser.add_argument(
"benchmarks",
nargs="*",
default="",
help="descriptions of benchmarks that should be run; "
"multiple descriptions can be specified to run multiple "
"benchmarks; the description is specified as "
"dataset/variant/group/test; Unix shell-style wildcards "
"can be used in the descriptions; variant, group and test "
"are optional and they can be left out; the default "
"variant is '' which selects the default dataset variant; "
"the default group is '*' which selects all groups; the "
"default test is '*' which selects all tests",
)
parser.add_argument(
"--memgraph-binary", default=helpers.get_binary_path("memgraph"), help="Memgraph binary used for benchmarking"
)
parser.add_argument(
"--client-binary",
default=helpers.get_binary_path("tests/mgbench/client"),
help="client binary used for benchmarking",
)
parser.add_argument(
"--num-workers-for-import",
type=int,
default=multiprocessing.cpu_count() // 2,
help="number of workers used to import the dataset",
)
parser.add_argument(
"--num-workers-for-benchmark", type=int, default=1, help="number of workers used to execute the benchmark"
)
parser.add_argument("--single-threaded-runtime-sec", type=int, default=10, help="single threaded duration of each test")
parser.add_argument("--no-load-query-counts", action="store_true", help="disable loading of cached query counts")
parser.add_argument("--no-save-query-counts", action="store_true", help="disable storing of cached query counts")
parser.add_argument("--export-results", default="", help="file path into which results should be exported")
parser.add_argument(
"--temporary-directory", default="/tmp", help="directory path where temporary data should " "be stored"
)
parser.add_argument("--no-properties-on-edges", action="store_true", help="disable properties on edges")
parser.add_argument("--datasets-path", default="datasets", help="path to datasets to scan")
parser.add_argument("--test-system-args", default="")
args = parser.parse_args()
head_tail = os.path.split(args.datasets_path)
path_without_dataset_name = head_tail[0]
dataset_name = head_tail[1].split(".")[0]
sys.path.append(path_without_dataset_name)
dataset_to_use = importlib.import_module(dataset_name)
# Detect available datasets.
generators = {}
for key in dir(datasets):
for key in dir(dataset_to_use):
if key.startswith("_"):
continue
dataset = getattr(datasets, key)
if not inspect.isclass(dataset) or dataset == datasets.Dataset or \
not issubclass(dataset, datasets.Dataset):
dataset = getattr(dataset_to_use, key)
if not inspect.isclass(dataset) or dataset == datasets.Dataset or not issubclass(dataset, datasets.Dataset):
continue
tests = collections.defaultdict(list)
for funcname in dir(dataset):
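
benchmark.py now resolves the dataset module from --datasets-path at runtime via importlib rather than always using the bundled datasets module. A hypothetical module that this discovery loop would accept could look as follows (names, paths and sizes are invented for illustration):

# my_datasets.py -- hypothetical file passed as `--datasets-path my_datasets.py`
import random

import datasets  # the bundled Dataset base class still has to be importable


class AccessControl(datasets.Dataset):
    NAME = "accesscontrol"
    VARIANTS = ["small"]
    DEFAULT_VARIANT = "small"
    FILES = {"small": "/path/to/accesscontrol_small.cypher"}  # invented path
    URLS = None
    SIZES = {"small": {"vertices": 30, "edges": 100}}

    # picked up as group "basic", test "single_permission_read"
    def benchmark__basic__single_permission_read(self):
        return ("MATCH (p:Permission {uuid: $uuid}) RETURN p", {"uuid": random.randint(1, 30)})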
@ -135,8 +146,9 @@ for key in dir(datasets):
tests[group].append((test, funcname))
generators[dataset.NAME] = (dataset, dict(tests))
if dataset.PROPERTIES_ON_EDGES and args.no_properties_on_edges:
raise Exception("The \"{}\" dataset requires properties on edges, "
"but you have disabled them!".format(dataset.NAME))
raise Exception(
'The "{}" dataset requires properties on edges, ' "but you have disabled them!".format(dataset.NAME)
)
# List datasets if there is no specified dataset.
if len(args.benchmarks) == 0:
@ -144,8 +156,7 @@ if len(args.benchmarks) == 0:
for name in sorted(generators.keys()):
print("Dataset:", name)
dataset, tests = generators[name]
print(" Variants:", ", ".join(dataset.VARIANTS),
"(default: " + dataset.DEFAULT_VARIANT + ")")
print(" Variants:", ", ".join(dataset.VARIANTS), "(default: " + dataset.DEFAULT_VARIANT + ")")
for group in sorted(tests.keys()):
print(" Group:", group)
for test_name, test_func in tests[group]:
@ -165,31 +176,44 @@ benchmarks = filter_benchmarks(generators, args.benchmarks)
# Run all specified benchmarks.
for dataset, tests in benchmarks:
log.init("Preparing", dataset.NAME + "/" + dataset.get_variant(),
"dataset")
dataset.prepare(cache.cache_directory("datasets", dataset.NAME,
dataset.get_variant()))
log.init("Preparing", dataset.NAME + "/" + dataset.get_variant(), "dataset")
dataset.prepare(cache.cache_directory("datasets", dataset.NAME, dataset.get_variant()))
# Prepare runners and import the dataset.
memgraph = runners.Memgraph(args.memgraph_binary, args.temporary_directory,
not args.no_properties_on_edges)
memgraph = runners.Memgraph(
args.memgraph_binary,
args.temporary_directory,
not args.no_properties_on_edges,
args.test_system_args,
)
client = runners.Client(args.client_binary, args.temporary_directory)
memgraph.start_preparation()
ret = client.execute(file_path=dataset.get_file(),
num_workers=args.num_workers_for_import)
time.sleep(5.0)  # give the machine manager and the rest of the system enough time to start up
ret = client.execute(file_path=dataset.get_file(), num_workers=args.num_workers_for_import)
usage = memgraph.stop()
# Display import statistics.
print()
for row in ret:
print("Executed", row["count"], "queries in", row["duration"],
"seconds using", row["num_workers"],
"workers with a total throughput of", row["throughput"],
"queries/second.")
print(
"Executed",
row["count"],
"queries in",
row["duration"],
"seconds using",
row["num_workers"],
"workers with a total throughput of",
row["throughput"],
"queries/second.",
)
print()
print("The database used", usage["cpu"],
"seconds of CPU time and peaked at",
usage["memory"] / 1024 / 1024, "MiB of RAM.")
print(
"The database used",
usage["cpu"],
"seconds of CPU time and peaked at",
usage["memory"] / 1024 / 1024,
"MiB of RAM.",
)
# Save import results.
import_key = [dataset.NAME, dataset.get_variant(), "__import__"]
@ -208,24 +232,26 @@ for dataset, tests in benchmarks:
config_key = [dataset.NAME, dataset.get_variant(), group, test]
cached_count = config.get_value(*config_key)
if cached_count is None:
print("Determining the number of queries necessary for",
args.single_threaded_runtime_sec,
"seconds of single-threaded runtime...")
print(
"Determining the number of queries necessary for",
args.single_threaded_runtime_sec,
"seconds of single-threaded runtime...",
)
# First run to prime the query caches.
memgraph.start_benchmark()
client.execute(queries=get_queries(func, 1), num_workers=1)
# Get a sense of the runtime.
count = 1
while True:
ret = client.execute(queries=get_queries(func, count),
num_workers=1)
ret = client.execute(queries=get_queries(func, count), num_workers=1)
duration = ret[0]["duration"]
should_execute = int(args.single_threaded_runtime_sec /
(duration / count))
print("executed_queries={}, total_duration={}, "
"query_duration={}, estimated_count={}".format(
count, duration, duration / count,
should_execute))
should_execute = int(args.single_threaded_runtime_sec / (duration / count))
print(
"executed_queries={}, total_duration={}, "
"query_duration={}, estimated_count={}".format(
count, duration, duration / count, should_execute
)
)
# We don't have to execute the next iteration when
# `should_execute` becomes the same order of magnitude as
# `count * 10`.
@ -235,45 +261,45 @@ for dataset, tests in benchmarks:
else:
count = count * 10
memgraph.stop()
config.set_value(*config_key, value={
"count": count,
"duration": args.single_threaded_runtime_sec})
config.set_value(*config_key, value={"count": count, "duration": args.single_threaded_runtime_sec})
else:
print("Using cached query count of", cached_count["count"],
"queries for", cached_count["duration"],
"seconds of single-threaded runtime.")
count = int(cached_count["count"] *
args.single_threaded_runtime_sec /
cached_count["duration"])
print(
"Using cached query count of",
cached_count["count"],
"queries for",
cached_count["duration"],
"seconds of single-threaded runtime.",
)
count = int(cached_count["count"] * args.single_threaded_runtime_sec / cached_count["duration"])
# Benchmark run.
print("Sample query:", get_queries(func, 1)[0][0])
print("Executing benchmark with", count, "queries that should "
"yield a single-threaded runtime of",
args.single_threaded_runtime_sec, "seconds.")
print("Queries are executed using", args.num_workers_for_benchmark,
"concurrent clients.")
print(
"Executing benchmark with",
count,
"queries that should yield a single-threaded runtime of",
args.single_threaded_runtime_sec,
"seconds.",
)
print("Queries are executed using", args.num_workers_for_benchmark, "concurrent clients.")
memgraph.start_benchmark()
ret = client.execute(queries=get_queries(func, count),
num_workers=args.num_workers_for_benchmark)[0]
ret = client.execute(queries=get_queries(func, count), num_workers=args.num_workers_for_benchmark)[0]
usage = memgraph.stop()
ret["database"] = usage
# Output summary.
print()
print("Executed", ret["count"], "queries in",
ret["duration"], "seconds.")
print("Executed", ret["count"], "queries in", ret["duration"], "seconds.")
print("Queries have been retried", ret["retries"], "times.")
print("Database used {:.3f} seconds of CPU time.".format(
usage["cpu"]))
print("Database peaked at {:.3f} MiB of memory.".format(
usage["memory"] / 1024.0 / 1024.0))
print("{:<31} {:>20} {:>20} {:>20}".format("Metadata:", "min",
"avg", "max"))
print("Database used {:.3f} seconds of CPU time.".format(usage["cpu"]))
print("Database peaked at {:.3f} MiB of memory.".format(usage["memory"] / 1024.0 / 1024.0))
print("{:<31} {:>20} {:>20} {:>20}".format("Metadata:", "min", "avg", "max"))
metadata = ret["metadata"]
for key in sorted(metadata.keys()):
print("{name:>30}: {minimum:>20.06f} {average:>20.06f} "
"{maximum:>20.06f}".format(name=key, **metadata[key]))
print(
"{name:>30}: {minimum:>20.06f} {average:>20.06f} "
"{maximum:>20.06f}".format(name=key, **metadata[key])
)
log.success("Throughput: {:02f} QPS".format(ret["throughput"]))
# Save results.


@ -0,0 +1,118 @@
# Copyright 2022 Memgraph Ltd.
#
# Use of this software is governed by the Business Source License
# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
# License, and you may not use this file except in compliance with the Business Source License.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0, included in the file
# licenses/APL.txt.
import argparse
import random
import helpers
# Explanation of datasets:
# - empty_only_index: contains index; contains no data
# - small: contains index; contains data (small dataset)
#
# Data model is as follows:
#
# ┌──────────────┐
# │ Permission │
# ┌────────────────┐ │ Schema:uuid │ ┌────────────┐
# │:IS_FOR_IDENTITY├────┤ Index:name ├───┤:IS_FOR_FILE│
# └┬───────────────┘ └──────────────┘ └────────────┤
# │ │
# ┌──────▼──────────────┐ ┌──▼────────────────┐
# │ Identity │ │ File │
# │ Schema:uuid │ │ Schema:uuid │
# │ Index:email │ │ Index:name │
# └─────────────────────┘ │ Index:platformId │
# └───────────────────┘
#
# - File: attributes: ["uuid", "name", "platformId"]
# - Permission: attributes: ["uuid", "name"]
# - Identity: attributes: ["uuid", "email"]
#
# Indexes:
# - File: [File(uuid), File(platformId), File(name)]
# - Permission: [Permission(uuid), Permission(name)]
# - Identity: [Identity(uuid), Identity(email)]
#
# Edges:
# - (:Permission)-[:IS_FOR_FILE]->(:File)
# - (:Permission)-[:IS_FOR_IDENTITY]->(:Identity)
#
# AccessControl specific: uuid is the schema
def main():
parser = argparse.ArgumentParser()
parser.add_argument("--number_of_identities", type=int, default=10)
parser.add_argument("--number_of_files", type=int, default=10)
parser.add_argument("--percentage_of_permissions", type=float, default=1.0)
parser.add_argument("--filename", default="dataset.cypher")
args = parser.parse_args()
number_of_identities = args.number_of_identities
number_of_files = args.number_of_files
percentage_of_permissions = args.percentage_of_permissions
filename = args.filename
assert number_of_identities >= 0
assert number_of_files >= 0
assert percentage_of_permissions > 0.0 and percentage_of_permissions <= 1.0
assert filename != ""
with open(filename, "w") as f:
f.write("MATCH (n) DETACH DELETE n;\n")
# Create the indexes
f.write("CREATE INDEX ON :File;\n")
f.write("CREATE INDEX ON :Permission;\n")
f.write("CREATE INDEX ON :Identity;\n")
f.write("CREATE INDEX ON :File(platformId);\n")
f.write("CREATE INDEX ON :File(name);\n")
f.write("CREATE INDEX ON :Permission(name);\n")
f.write("CREATE INDEX ON :Identity(email);\n")
# Create extra index: in distributed, this will be the schema
f.write("CREATE INDEX ON :File(uuid);\n")
f.write("CREATE INDEX ON :Permission(uuid);\n")
f.write("CREATE INDEX ON :Identity(uuid);\n")
uuid = 1
# Create the nodes File
for index in range(0, number_of_files):
f.write(f'CREATE (:File {{uuid: {uuid}, platformId: "platform_id", name: "name_file_{uuid}"}});\n')
uuid += 1
identities = []
# Create the nodes Identity
for index in range(0, number_of_identities):
f.write(f'CREATE (:Identity {{uuid: {uuid}, email: "mail_{uuid}@something.com"}});\n')
uuid += 1
for outer_index in range(0, number_of_files):
for inner_index in range(0, number_of_identities):
file_uuid = outer_index + 1
identity_uuid = number_of_files + inner_index + 1
if random.random() <= percentage_of_permissions:
f.write(f'CREATE (:Permission {{uuid: {uuid}, name: "name_permission_{uuid}"}});\n')
f.write(
f"MATCH (permission:Permission {{uuid: {uuid}}}), (file:File {{uuid: {file_uuid}}}) CREATE (permission)-[e: IS_FOR_FILE]->(file);\n"
)
f.write(
f"MATCH (permission:Permission {{uuid: {uuid}}}), (identity:Identity {{uuid: {identity_uuid}}}) CREATE (permission)-[e: IS_FOR_IDENTITY]->(identity);\n"
)
uuid += 1
if __name__ == "__main__":
main()
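
A possible way to exercise the generator above, with an invented output file name (not part of this changeset):

# Hypothetical driver: generate a small access-control dataset and count the
# Cypher statements it produces.
import subprocess

subprocess.run(
    [
        "python3",
        "access_control.py",
        "--number_of_identities", "5",
        "--number_of_files", "5",
        "--filename", "accesscontrol_small.cypher",
    ],
    check=True,
)
with open("accesscontrol_small.cypher") as f:
    statements = [line for line in f if line.strip()]
print("Generated", len(statements), "Cypher statements")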


@ -45,13 +45,10 @@ class Dataset:
variant = self.DEFAULT_VARIANT
if variant not in self.VARIANTS:
raise ValueError("Invalid test variant!")
if (self.FILES and variant not in self.FILES) and \
(self.URLS and variant not in self.URLS):
raise ValueError("The variant doesn't have a defined URL or "
"file path!")
if (self.FILES and variant not in self.FILES) and (self.URLS and variant not in self.URLS):
raise ValueError("The variant doesn't have a defined URL or " "file path!")
if variant not in self.SIZES:
raise ValueError("The variant doesn't have a defined dataset "
"size!")
raise ValueError("The variant doesn't have a defined dataset " "size!")
self._variant = variant
if self.FILES is not None:
self._file = self.FILES.get(variant, None)
@ -63,8 +60,7 @@ class Dataset:
self._url = None
self._size = self.SIZES[variant]
if "vertices" not in self._size or "edges" not in self._size:
raise ValueError("The size defined for this variant doesn't "
"have the number of vertices and/or edges!")
raise ValueError("The size defined for this variant doesn't " "have the number of vertices and/or edges!")
self._num_vertices = self._size["vertices"]
self._num_edges = self._size["edges"]
@ -76,8 +72,7 @@ class Dataset:
cached_input, exists = directory.get_file("dataset.cypher")
if not exists:
print("Downloading dataset file:", self._url)
downloaded_file = helpers.download_file(
self._url, directory.get_path())
downloaded_file = helpers.download_file(self._url, directory.get_path())
print("Unpacking and caching file:", downloaded_file)
helpers.unpack_and_move_file(downloaded_file, cached_input)
print("Using cached dataset file:", cached_input)
@ -109,9 +104,9 @@ class Pokec(Dataset):
DEFAULT_VARIANT = "small"
FILES = None
URLS = {
"small": "https://s3-eu-west-1.amazonaws.com/deps.memgraph.io/pokec_small.setup.cypher",
"medium": "https://s3-eu-west-1.amazonaws.com/deps.memgraph.io/pokec_medium.setup.cypher",
"large": "https://s3-eu-west-1.amazonaws.com/deps.memgraph.io/pokec_large.setup.cypher.gz",
"small": "https://s3.eu-west-1.amazonaws.com/deps.memgraph.io/dataset/pokec/pokec_small.setup.cypher",
"medium": "https://s3.eu-west-1.amazonaws.com/deps.memgraph.io/dataset/pokec/pokec_medium.setup.cypher",
"large": "https://s3.eu-west-1.amazonaws.com/deps.memgraph.io/dataset/pokec/pokec_large.setup.cypher.gz",
}
SIZES = {
"small": {"vertices": 10000, "edges": 121716},
@ -137,18 +132,17 @@ class Pokec(Dataset):
# Arango benchmarks
def benchmark__arango__single_vertex_read(self):
return ("MATCH (n:User {id : $id}) RETURN n",
{"id": self._get_random_vertex()})
return ("MATCH (n:User {id : $id}) RETURN n", {"id": self._get_random_vertex()})
def benchmark__arango__single_vertex_write(self):
return ("CREATE (n:UserTemp {id : $id}) RETURN n",
{"id": random.randint(1, self._num_vertices * 10)})
return ("CREATE (n:UserTemp {id : $id}) RETURN n", {"id": random.randint(1, self._num_vertices * 10)})
def benchmark__arango__single_edge_write(self):
vertex_from, vertex_to = self._get_random_from_to()
return ("MATCH (n:User {id: $from}), (m:User {id: $to}) WITH n, m "
"CREATE (n)-[e:Temp]->(m) RETURN e",
{"from": vertex_from, "to": vertex_to})
return (
"MATCH (n:User {id: $from}), (m:User {id: $to}) WITH n, m " "CREATE (n)-[e:Temp]->(m) RETURN e",
{"from": vertex_from, "to": vertex_to},
)
def benchmark__arango__aggregate(self):
return ("MATCH (n:User) RETURN n.age, COUNT(*)", {})
@ -157,92 +151,94 @@ class Pokec(Dataset):
return ("MATCH (n:User) WHERE n.age >= 18 RETURN n.age, COUNT(*)", {})
def benchmark__arango__expansion_1(self):
return ("MATCH (s:User {id: $id})-->(n:User) "
"RETURN n.id",
{"id": self._get_random_vertex()})
return ("MATCH (s:User {id: $id})-->(n:User) " "RETURN n.id", {"id": self._get_random_vertex()})
def benchmark__arango__expansion_1_with_filter(self):
return ("MATCH (s:User {id: $id})-->(n:User) "
"WHERE n.age >= 18 "
"RETURN n.id",
{"id": self._get_random_vertex()})
return (
"MATCH (s:User {id: $id})-->(n:User) " "WHERE n.age >= 18 " "RETURN n.id",
{"id": self._get_random_vertex()},
)
def benchmark__arango__expansion_2(self):
return ("MATCH (s:User {id: $id})-->()-->(n:User) "
"RETURN DISTINCT n.id",
{"id": self._get_random_vertex()})
return ("MATCH (s:User {id: $id})-->()-->(n:User) " "RETURN DISTINCT n.id", {"id": self._get_random_vertex()})
def benchmark__arango__expansion_2_with_filter(self):
return ("MATCH (s:User {id: $id})-->()-->(n:User) "
"WHERE n.age >= 18 "
"RETURN DISTINCT n.id",
{"id": self._get_random_vertex()})
return (
"MATCH (s:User {id: $id})-->()-->(n:User) " "WHERE n.age >= 18 " "RETURN DISTINCT n.id",
{"id": self._get_random_vertex()},
)
def benchmark__arango__expansion_3(self):
return ("MATCH (s:User {id: $id})-->()-->()-->(n:User) "
"RETURN DISTINCT n.id",
{"id": self._get_random_vertex()})
return (
"MATCH (s:User {id: $id})-->()-->()-->(n:User) " "RETURN DISTINCT n.id",
{"id": self._get_random_vertex()},
)
def benchmark__arango__expansion_3_with_filter(self):
return ("MATCH (s:User {id: $id})-->()-->()-->(n:User) "
"WHERE n.age >= 18 "
"RETURN DISTINCT n.id",
{"id": self._get_random_vertex()})
return (
"MATCH (s:User {id: $id})-->()-->()-->(n:User) " "WHERE n.age >= 18 " "RETURN DISTINCT n.id",
{"id": self._get_random_vertex()},
)
def benchmark__arango__expansion_4(self):
return ("MATCH (s:User {id: $id})-->()-->()-->()-->(n:User) "
"RETURN DISTINCT n.id",
{"id": self._get_random_vertex()})
return (
"MATCH (s:User {id: $id})-->()-->()-->()-->(n:User) " "RETURN DISTINCT n.id",
{"id": self._get_random_vertex()},
)
def benchmark__arango__expansion_4_with_filter(self):
return ("MATCH (s:User {id: $id})-->()-->()-->()-->(n:User) "
"WHERE n.age >= 18 "
"RETURN DISTINCT n.id",
{"id": self._get_random_vertex()})
return (
"MATCH (s:User {id: $id})-->()-->()-->()-->(n:User) " "WHERE n.age >= 18 " "RETURN DISTINCT n.id",
{"id": self._get_random_vertex()},
)
def benchmark__arango__neighbours_2(self):
return ("MATCH (s:User {id: $id})-[*1..2]->(n:User) "
"RETURN DISTINCT n.id",
{"id": self._get_random_vertex()})
return ("MATCH (s:User {id: $id})-[*1..2]->(n:User) " "RETURN DISTINCT n.id", {"id": self._get_random_vertex()})
def benchmark__arango__neighbours_2_with_filter(self):
return ("MATCH (s:User {id: $id})-[*1..2]->(n:User) "
"WHERE n.age >= 18 "
"RETURN DISTINCT n.id",
{"id": self._get_random_vertex()})
return (
"MATCH (s:User {id: $id})-[*1..2]->(n:User) " "WHERE n.age >= 18 " "RETURN DISTINCT n.id",
{"id": self._get_random_vertex()},
)
def benchmark__arango__neighbours_2_with_data(self):
return ("MATCH (s:User {id: $id})-[*1..2]->(n:User) "
"RETURN DISTINCT n.id, n",
{"id": self._get_random_vertex()})
return (
"MATCH (s:User {id: $id})-[*1..2]->(n:User) " "RETURN DISTINCT n.id, n",
{"id": self._get_random_vertex()},
)
def benchmark__arango__neighbours_2_with_data_and_filter(self):
return ("MATCH (s:User {id: $id})-[*1..2]->(n:User) "
"WHERE n.age >= 18 "
"RETURN DISTINCT n.id, n",
{"id": self._get_random_vertex()})
return (
"MATCH (s:User {id: $id})-[*1..2]->(n:User) " "WHERE n.age >= 18 " "RETURN DISTINCT n.id, n",
{"id": self._get_random_vertex()},
)
def benchmark__arango__shortest_path(self):
vertex_from, vertex_to = self._get_random_from_to()
return ("MATCH (n:User {id: $from}), (m:User {id: $to}) WITH n, m "
"MATCH p=(n)-[*bfs..15]->(m) "
"RETURN extract(n in nodes(p) | n.id) AS path",
{"from": vertex_from, "to": vertex_to})
return (
"MATCH (n:User {id: $from}), (m:User {id: $to}) WITH n, m "
"MATCH p=(n)-[*bfs..15]->(m) "
"RETURN extract(n in nodes(p) | n.id) AS path",
{"from": vertex_from, "to": vertex_to},
)
def benchmark__arango__shortest_path_with_filter(self):
vertex_from, vertex_to = self._get_random_from_to()
return ("MATCH (n:User {id: $from}), (m:User {id: $to}) WITH n, m "
"MATCH p=(n)-[*bfs..15 (e, n | n.age >= 18)]->(m) "
"RETURN extract(n in nodes(p) | n.id) AS path",
{"from": vertex_from, "to": vertex_to})
return (
"MATCH (n:User {id: $from}), (m:User {id: $to}) WITH n, m "
"MATCH p=(n)-[*bfs..15 (e, n | n.age >= 18)]->(m) "
"RETURN extract(n in nodes(p) | n.id) AS path",
{"from": vertex_from, "to": vertex_to},
)
# Our benchmark queries
def benchmark__create__edge(self):
vertex_from, vertex_to = self._get_random_from_to()
return ("MATCH (a:User {id: $from}), (b:User {id: $to}) "
"CREATE (a)-[:TempEdge]->(b)",
{"from": vertex_from, "to": vertex_to})
return (
"MATCH (a:User {id: $from}), (b:User {id: $to}) " "CREATE (a)-[:TempEdge]->(b)",
{"from": vertex_from, "to": vertex_to},
)
def benchmark__create__pattern(self):
return ("CREATE ()-[:TempEdge]->()", {})
@ -251,9 +247,12 @@ class Pokec(Dataset):
return ("CREATE ()", {})
def benchmark__create__vertex_big(self):
return ("CREATE (:L1:L2:L3:L4:L5:L6:L7 {p1: true, p2: 42, "
"p3: \"Here is some text that is not extremely short\", "
"p4:\"Short text\", p5: 234.434, p6: 11.11, p7: false})", {})
return (
"CREATE (:L1:L2:L3:L4:L5:L6:L7 {p1: true, p2: 42, "
'p3: "Here is some text that is not extremely short", '
'p4:"Short text", p5: 234.434, p6: 11.11, p7: false})',
{},
)
def benchmark__aggregation__count(self):
return ("MATCH (n) RETURN count(n), count(n.age)", {})
@ -262,29 +261,124 @@ class Pokec(Dataset):
return ("MATCH (n) RETURN min(n.age), max(n.age), avg(n.age)", {})
def benchmark__match__pattern_cycle(self):
return ("MATCH (n:User {id: $id})-[e1]->(m)-[e2]->(n) "
"RETURN e1, m, e2",
{"id": self._get_random_vertex()})
return ("MATCH (n:User {id: $id})-[e1]->(m)-[e2]->(n) " "RETURN e1, m, e2", {"id": self._get_random_vertex()})
def benchmark__match__pattern_long(self):
return ("MATCH (n1:User {id: $id})-[e1]->(n2)-[e2]->"
"(n3)-[e3]->(n4)<-[e4]-(n5) "
"RETURN n5 LIMIT 1",
{"id": self._get_random_vertex()})
return (
"MATCH (n1:User {id: $id})-[e1]->(n2)-[e2]->" "(n3)-[e3]->(n4)<-[e4]-(n5) " "RETURN n5 LIMIT 1",
{"id": self._get_random_vertex()},
)
def benchmark__match__pattern_short(self):
return ("MATCH (n:User {id: $id})-[e]->(m) "
"RETURN m LIMIT 1",
{"id": self._get_random_vertex()})
return ("MATCH (n:User {id: $id})-[e]->(m) " "RETURN m LIMIT 1", {"id": self._get_random_vertex()})
def benchmark__match__vertex_on_label_property(self):
return ("MATCH (n:User) WITH n WHERE n.id = $id RETURN n",
{"id": self._get_random_vertex()})
return ("MATCH (n:User) WITH n WHERE n.id = $id RETURN n", {"id": self._get_random_vertex()})
def benchmark__match__vertex_on_label_property_index(self):
return ("MATCH (n:User {id: $id}) RETURN n",
{"id": self._get_random_vertex()})
return ("MATCH (n:User {id: $id}) RETURN n", {"id": self._get_random_vertex()})
def benchmark__match__vertex_on_property(self):
return ("MATCH (n {id: $id}) RETURN n",
{"id": self._get_random_vertex()})
return ("MATCH (n {id: $id}) RETURN n", {"id": self._get_random_vertex()})
class AccessControl(Dataset):
# Explanation of datasets:
# - empty_only_index: contains the index; contains no data
# - small/medium/large: contains the index and data (small/medium/large dataset, respectively)
#
# See dataset_creator.py to understand the data model and generate a dataset
NAME = "accesscontrol"
VARIANTS = ["empty_only_index", "small", "medium", "large"]
DEFAULT_VARIANT = "empty_only_index"
URLS = {
"empty_only_index": "https://s3.eu-west-1.amazonaws.com/deps.memgraph.io/dataset/accesscontrol/accesscontrol_empty_only_index.setup.cypher.gz",
"small": "https://s3.eu-west-1.amazonaws.com/deps.memgraph.io/dataset/accesscontrol/accesscontrol_small.setup.cypher.gz",
"medium": "https://s3.eu-west-1.amazonaws.com/deps.memgraph.io/dataset/accesscontrol/accesscontrol_medium.setup.cypher.gz",
"large": "https://s3.eu-west-1.amazonaws.com/deps.memgraph.io/dataset/accesscontrol/accesscontrol_large.setup.cypher.gz",
}
SIZES = {
"empty_only_index": {
"vertices": 0,
"edges": -1, # not used
"uuid_ranges": {
"File": {"first_uuid": 0, "last_uuid": 0},
"Permission": {"first_uuid": 0, "last_uuid": 0},
"Identity": {"first_uuid": 0, "last_uuid": 0},
},
},
"small": {
"vertices": 30,
"edges": -1, # not used
"uuid_ranges": {
"File": {"first_uuid": 1, "last_uuid": 10},
"Identity": {"first_uuid": 11, "last_uuid": 20},
"Permission": {"first_uuid": 21, "last_uuid": 120}, # 120=10*10+20
},
},
"medium": {
"vertices": 3000,
"edges": -1, # not used
"uuid_ranges": {
"File": {"first_uuid": 1, "last_uuid": 1000},
"Identity": {"first_uuid": 1001, "last_uuid": 2000},
"Permission": {"first_uuid": 2001, "last_uuid": 1002000}, # 1002000=1000*1000+2000
},
},
"large": {
"vertices": 30000,
"edges": -1, # not used
"uuid_ranges": {
"File": {"first_uuid": 1, "last_uuid": 10000},
"Identity": {"first_uuid": 10001, "last_uuid": 20000},
"Permission": {"first_uuid": 20001, "last_uuid": 100020000}, # 100020000=10000*10000+20000
},
},
}
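# Note on the ranges above: Permission uuids start right after the File and
# Identity ranges, and the last Permission uuid equals
# num_files * num_identities + num_files + num_identities -- presumably one
# Permission vertex per (File, Identity) pair (e.g. small: 10*10 + 20 = 120,
# medium: 1000*1000 + 2000 = 1002000, large: 10000*10000 + 20000 = 100020000).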
def _get_random_uuid(self, type):
assert type in ["File", "Permission", "Identity"]
first_uuid = self.get_size()["uuid_ranges"][type]["first_uuid"]
last_uuid = self.get_size()["uuid_ranges"][type]["last_uuid"]
random_value = random.randint(first_uuid, last_uuid)
return random_value
def __init__(self, variant=None):
super().__init__(variant)
self.next_value_idx = self.get_size()["vertices"] + 1
def benchmark__create__vertex(self):
self.next_value_idx += 1
query = (f"CREATE (:File {{uuid: {self.next_value_idx}}});", {})
return query
def benchmark__create__edges(self):
permission_uuid = self._get_random_uuid("Permission")
file_uuid = self._get_random_uuid("File")
query = (
"MATCH (permission:Permission {uuid: $permission_uuid}), (file:File {uuid: $file_uuid}) "
"CREATE (permission)-[:IS_FOR_FILE]->(file)",
{"permission_uuid": permission_uuid, "file_uuid": file_uuid},
)
return query
def benchmark__match__match_all_vertices(self):
query = ("MATCH (n) RETURN *", {})
return query
def benchmark__match__match_on_labelled_vertices(self):
query = ("MATCH (n:File) RETURN *", {})
return query
def benchmark__match__match_all_vertices_with_edges(self):
query = ("MATCH (permission:Permission)-[e:IS_FOR_FILE]->(file:File) RETURN *", {})
return query
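# The dataset methods above all follow one convention: every
# benchmark__<group>__<name> method returns a (query, params) pair that the
# benchmark harness can send to the database. A minimal, self-contained sketch
# of that convention (the helper names and the fake client below are
# illustrative, not part of mgbench):
#
#     def example_benchmark_query(uuid):
#         # Mirror the dataset methods: return a (query, params) pair.
#         return ("CREATE (:File {uuid: $uuid})", {"uuid": uuid})
#
#     def run_once(execute, benchmark_fn, *args):
#         query, params = benchmark_fn(*args)  # unpack the pair
#         return execute(query, params)        # hand it to a client call
#
#     run_once(lambda q, p: print(q, p), example_benchmark_query, 42)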

View File

@ -14,7 +14,6 @@ import json
import os
import subprocess
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
@ -28,18 +27,25 @@ def get_binary_path(path, base=""):
def download_file(url, path):
ret = subprocess.run(["wget", "-nv", "--content-disposition", url],
stderr=subprocess.PIPE, cwd=path, check=True)
data = ret.stderr.decode("utf-8")
tmp = data.split("->")[1]
name = tmp[tmp.index('"') + 1:tmp.rindex('"')]
return os.path.join(path, name)
if "https://" in url:
ret = subprocess.run(
["wget", "-nv", "--content-disposition", url], stderr=subprocess.PIPE, cwd=path, check=True
)
data = ret.stderr.decode("utf-8")
tmp = data.split("->")[1]
name = tmp[tmp.index('"') + 1 : tmp.rindex('"')]
return os.path.join(path, name)
else:
assert os.path.exists(url)
subprocess.run(["cp", url, path], stderr=subprocess.PIPE, cwd=path, check=True)
tmp = url.split("/")
name = tmp[len(tmp) - 1]
return os.path.join(path, name)
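# Behaviour sketch (hypothetical paths): for a remote URL the saved filename is
# parsed out of wget's stderr, e.g.
#   download_file("https://example.com/data.setup.cypher.gz", "/tmp")
#     -> "/tmp/data.setup.cypher.gz"
# while a local path must already exist and is copied into `path`, e.g.
#   download_file("/datasets/custom.setup.cypher.gz", "/tmp")
#     -> "/tmp/custom.setup.cypher.gz"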
def unpack_and_move_file(input_path, output_path):
if input_path.endswith(".gz"):
subprocess.run(["gunzip", input_path],
stdout=subprocess.DEVNULL, check=True)
subprocess.run(["gunzip", input_path], stdout=subprocess.DEVNULL, check=True)
input_path = input_path[:-3]
os.rename(input_path, output_path)

View File

@ -40,8 +40,7 @@ def _convert_args_to_flags(*args, **kwargs):
def _get_usage(pid):
total_cpu = 0
with open("/proc/{}/stat".format(pid)) as f:
total_cpu = (sum(map(int, f.read().split(")")[1].split()[11:15])) /
os.sysconf(os.sysconf_names["SC_CLK_TCK"]))
total_cpu = sum(map(int, f.read().split(")")[1].split()[11:15])) / os.sysconf(os.sysconf_names["SC_CLK_TCK"])
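# The four fields summed after the ")" in /proc/<pid>/stat are utime, stime,
# cutime and cstime (in clock ticks); dividing by SC_CLK_TCK converts the
# process' total CPU time to seconds.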
peak_rss = 0
with open("/proc/{}/status".format(pid)) as f:
for row in f:
@ -52,18 +51,17 @@ def _get_usage(pid):
class Memgraph:
def __init__(self, memgraph_binary, temporary_dir, properties_on_edges):
def __init__(self, memgraph_binary, temporary_dir, properties_on_edges, extra_args):
self._memgraph_binary = memgraph_binary
self._directory = tempfile.TemporaryDirectory(dir=temporary_dir)
self._properties_on_edges = properties_on_edges
self._proc_mg = None
self._extra_args = extra_args
atexit.register(self._cleanup)
# Determine Memgraph version
ret = subprocess.run([memgraph_binary, "--version"],
stdout=subprocess.PIPE, check=True)
version = re.search(r"[0-9]+\.[0-9]+\.[0-9]+",
ret.stdout.decode("utf-8")).group(0)
ret = subprocess.run([memgraph_binary, "--version"], stdout=subprocess.PIPE, check=True)
version = re.search(r"[0-9]+\.[0-9]+\.[0-9]+", ret.stdout.decode("utf-8")).group(0)
self._memgraph_version = tuple(map(int, version.split(".")))
def __del__(self):
@ -79,8 +77,14 @@ class Memgraph:
if self._memgraph_version >= (0, 50, 0):
kwargs["storage_properties_on_edges"] = self._properties_on_edges
else:
assert self._properties_on_edges, \
"Older versions of Memgraph can't disable properties on edges!"
assert self._properties_on_edges, "Older versions of Memgraph can't disable properties on edges!"
if self._extra_args != "":
args_list = self._extra_args.split(" ")
assert len(args_list) % 2 == 0
for i in range(0, len(args_list), 2):
kwargs[args_list[i]] = args_list[i + 1]
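# The extra args are treated as alternating flag-name / value tokens (hence the
# even-length assert); for example (hypothetical value), an extra_args string of
# "data-directory /tmp/mg" adds kwargs["data-directory"] = "/tmp/mg", which is
# then rendered into a command-line flag by _convert_args_to_flags.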
return _convert_args_to_flags(self._memgraph_binary, **kwargs)
def _start(self, **kwargs):
@ -94,8 +98,7 @@ class Memgraph:
raise Exception("The database process died prematurely!")
wait_for_server(7687)
ret = self._proc_mg.poll()
assert ret is None, "The database process died prematurely " \
"({})!".format(ret)
assert ret is None, "The database process died prematurely " "({})!".format(ret)
def _cleanup(self):
if self._proc_mg is None:
@ -121,8 +124,7 @@ class Memgraph:
def stop(self):
ret, usage = self._cleanup()
assert ret == 0, "The database process exited with a non-zero " \
"status ({})!".format(ret)
assert ret == 0, "The database process exited with a non-zero " "status ({})!".format(ret)
return usage
@ -135,8 +137,7 @@ class Client:
return _convert_args_to_flags(self._client_binary, **kwargs)
def execute(self, queries=None, file_path=None, num_workers=1):
if (queries is None and file_path is None) or \
(queries is not None and file_path is not None):
if (queries is None and file_path is None) or (queries is not None and file_path is not None):
raise ValueError("Either queries or file_path must be specified!")
# TODO: check `file_path.endswith(".json")` to support advanced
@ -151,8 +152,8 @@ class Client:
json.dump(query, f)
f.write("\n")
args = self._get_args(input=file_path, num_workers=num_workers,
queries_json=queries_json)
args = self._get_args(input=file_path, num_workers=num_workers, queries_json=queries_json)
ret = subprocess.run(args, stdout=subprocess.PIPE, check=True)
data = ret.stdout.decode("utf-8").strip().split("\n")
data = [x for x in data if not x.startswith("[")]
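# Everything the client prints that starts with "[" (presumably log output) is
# dropped; each remaining line is parsed as one JSON document of results.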
return list(map(json.loads, data))

View File

@ -0,0 +1,36 @@
4
uuid
email
name
platformId
2
IS_FOR_IDENTITY
IS_FOR_FILE
3
File
1
uuid
int
1
[1]
Identity
1
uuid
int
1
[10001]
Permission
1
uuid
int
10
[20001]
[10020000]
[20002000]
[30002000]
[40002000]
[50002000]
[60002000]
[70002000]
[80002000]
[90002000]

View File

@ -0,0 +1,36 @@
4
uuid
email
name
platformId
2
IS_FOR_IDENTITY
IS_FOR_FILE
3
File
1
uuid
int
1
[1]
Identity
1
uuid
int
1
[1001]
Permission
1
uuid
int
10
[2001]
[102000]
[202000]
[302000]
[402000]
[502000]
[602000]
[702000]
[802000]
[902000]

View File

@ -0,0 +1,36 @@
4
uuid
email
name
platformId
2
IS_FOR_IDENTITY
IS_FOR_FILE
3
File
1
uuid
int
1
[1]
Identity
1
uuid
int
1
[11]
Permission
1
uuid
int
10
[21]
[31]
[41]
[51]
[61]
[71]
[81]
[91]
[100]
[110]

View File

@ -2650,14 +2650,16 @@ TEST_P(StorageV3, TestCreateVertexAndValidate) {
(std::map<PropertyId, PropertyValue>{{prop1, PropertyValue(111)}}));
}
{
ASSERT_DEATH(
{
Shard store(primary_label, min_pk, std::nullopt /*max_primary_key*/, schema_property_vector);
auto acc = store.Access(GetNextHlc());
auto vertex1 = acc.CreateVertexAndValidate({}, {PropertyValue{0}}, {});
auto vertex2 = acc.CreateVertexAndValidate({}, {PropertyValue{0}}, {});
},
"");
Shard store(primary_label, min_pk, std::nullopt /*max_primary_key*/, schema_property_vector);
auto acc = store.Access(GetNextHlc());
auto vertex1 = acc.CreateVertexAndValidate({}, {PropertyValue{0}}, {});
auto vertex2 = acc.CreateVertexAndValidate({}, {PropertyValue{0}}, {});
ASSERT_TRUE(vertex2.HasError());
auto error = vertex2.GetError();
auto error_ptr = std::get_if<memgraph::storage::v3::Error>(&error);
ASSERT_TRUE(error_ptr);
ASSERT_TRUE(*error_ptr == storage::v3::Error::VERTEX_ALREADY_INSERTED);
}
{
auto acc = store.Access(GetNextHlc());

View File

@ -18,6 +18,7 @@
#include "storage/v3/name_id_mapper.hpp"
#include "storage/v3/property_value.hpp"
#include "storage/v3/shard.hpp"
#include "storage/v3/shard_operation_result.hpp"
namespace memgraph::storage::v3::tests {
using testing::UnorderedElementsAre;
@ -38,7 +39,7 @@ class StorageEdgeTest : public ::testing::TestWithParam<bool> {
return store.NameToEdgeType(edge_type_name);
}
static ResultSchema<VertexAccessor> CreateVertex(Shard::Accessor &acc, const PropertyValue &key) {
static ShardOperationResult<VertexAccessor> CreateVertex(Shard::Accessor &acc, const PropertyValue &key) {
return acc.CreateVertexAndValidate({}, {key}, {});
}