Merge branch 'master' into text-search-integration-poc

This commit is contained in:
Ante Pušić 2024-01-03 01:07:43 +01:00 committed by GitHub
commit c3208b064b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 419 additions and 527 deletions

View File

@ -229,10 +229,6 @@ jobs:
# branches and tags. (default: 1) # branches and tags. (default: 1)
fetch-depth: 0 fetch-depth: 0
# Runs the e2e dependency check script from inside the e2e test directory.
- name: Check e2e service dependencies
  working-directory: tests/e2e
  run: ./dependency_check.sh
- name: Build release binaries - name: Build release binaries
run: | run: |
@ -271,6 +267,13 @@ jobs:
cd build cd build
ctest -R memgraph__unit --output-on-failure -j$THREADS ctest -R memgraph__unit --output-on-failure -j$THREADS
# Starts the Kafka and Pulsar compose stacks in detached mode, Kafka first.
- name: Ensure Kafka and Pulsar are up
  run: |
    for service in kafka pulsar; do
      # Each stack is started from its own directory so docker-compose picks
      # up that directory's compose file.
      (cd "tests/e2e/streams/$service" && docker-compose up -d)
    done
- name: Run e2e tests - name: Run e2e tests
run: | run: |
cd tests cd tests
@ -279,6 +282,14 @@ jobs:
cd e2e cd e2e
./run.sh ./run.sh
# Tears down the Kafka and Pulsar compose stacks. `if: always()` makes this
# cleanup run even when an earlier step in the job failed.
- name: Ensure Kafka and Pulsar are down
  if: always()
  run: |
    # The step script runs with fail-fast semantics, so a failing Kafka
    # teardown would otherwise skip the Pulsar teardown and leave containers
    # running on the runner. Attempt both and report the last failure.
    status=0
    for service in kafka pulsar; do
      (cd "tests/e2e/streams/$service" && docker-compose down) || status=$?
    done
    exit "$status"
- name: Run stress test (plain) - name: Run stress test (plain)
run: | run: |
cd tests/stress cd tests/stress

View File

@ -1,318 +0,0 @@
# Release pipeline for the self-hosted CentOS 8 runners.
#
# Four independent jobs are defined: a community build, a coverage build,
# a debug build and a release build. Every job checks out the full history
# and builds with the toolchain activated from /opt/toolchain-v4.
name: Release CentOS 8

on:
  # Manual trigger; the build type is selectable and defaults to Release.
  workflow_dispatch:
    inputs:
      build_type:
        type: choice
        description: "Memgraph Build type. Default value is Release."
        default: 'Release'
        options:
          - Release
          - RelWithDebInfo

  # Scheduled trigger: every day at 22:00.
  schedule:
    - cron: "0 22 * * *"

jobs:
  # Builds the community edition (MG_ENTERPRISE=OFF) and runs the unit tests.
  community_build:
    name: "Community build"
    runs-on: [self-hosted, Linux, X64, CentOS8]
    env:
      THREADS: 24
      MEMGRAPH_ENTERPRISE_LICENSE: ${{ secrets.MEMGRAPH_ENTERPRISE_LICENSE }}
      MEMGRAPH_ORGANIZATION_NAME: ${{ secrets.MEMGRAPH_ORGANIZATION_NAME }}
    timeout-minutes: 960

    steps:
      - name: Set up repository
        uses: actions/checkout@v3
        with:
          # Number of commits to fetch. `0` indicates all history for all
          # branches and tags. (default: 1)
          fetch-depth: 0

      - name: Build community binaries
        run: |
          # Activate toolchain.
          source /opt/toolchain-v4/activate
          # Initialize dependencies.
          ./init
          # Set default build_type to Release
          INPUT_BUILD_TYPE=${{ github.event.inputs.build_type }}
          BUILD_TYPE=${INPUT_BUILD_TYPE:-"Release"}
          # Build community binaries.
          cd build
          cmake -DCMAKE_BUILD_TYPE=$BUILD_TYPE -DMG_ENTERPRISE=OFF ..
          make -j$THREADS

      - name: Run unit tests
        run: |
          # Activate toolchain.
          source /opt/toolchain-v4/activate
          # Run unit tests.
          cd build
          ctest -R memgraph__unit --output-on-failure

  # Builds the unit tests with TEST_COVERAGE=ON, runs them and publishes the
  # packaged coverage report as an artifact.
  coverage_build:
    name: "Coverage build"
    runs-on: [self-hosted, Linux, X64, CentOS8]
    env:
      THREADS: 24
      MEMGRAPH_ENTERPRISE_LICENSE: ${{ secrets.MEMGRAPH_ENTERPRISE_LICENSE }}
      MEMGRAPH_ORGANIZATION_NAME: ${{ secrets.MEMGRAPH_ORGANIZATION_NAME }}

    steps:
      - name: Set up repository
        uses: actions/checkout@v3
        with:
          # Number of commits to fetch. `0` indicates all history for all
          # branches and tags. (default: 1)
          fetch-depth: 0

      - name: Build coverage binaries
        run: |
          # Activate toolchain.
          source /opt/toolchain-v4/activate
          # Initialize dependencies.
          ./init
          # Build coverage binaries.
          cd build
          cmake -DTEST_COVERAGE=ON ..
          make -j$THREADS memgraph__unit

      - name: Run unit tests
        run: |
          # Activate toolchain.
          source /opt/toolchain-v4/activate
          # Run unit tests.
          cd build
          ctest -R memgraph__unit --output-on-failure

      - name: Compute code coverage
        run: |
          # Activate toolchain.
          source /opt/toolchain-v4/activate
          # Compute code coverage.
          cd tools/github
          ./coverage_convert
          # Package code coverage.
          cd generated
          tar -czf code_coverage.tar.gz coverage.json html report.json summary.rmu

      - name: Save code coverage
        uses: actions/upload-artifact@v3
        with:
          name: "Code coverage"
          path: tools/github/generated/code_coverage.tar.gz

  # Default (debug) build: runs the non-unit/non-benchmark CTest tests, the
  # driver and integration tests, and the static analysis / formatting check.
  debug_build:
    name: "Debug build"
    runs-on: [self-hosted, Linux, X64, CentOS8]
    env:
      THREADS: 24
      MEMGRAPH_ENTERPRISE_LICENSE: ${{ secrets.MEMGRAPH_ENTERPRISE_LICENSE }}
      MEMGRAPH_ORGANIZATION_NAME: ${{ secrets.MEMGRAPH_ORGANIZATION_NAME }}

    steps:
      - name: Set up repository
        uses: actions/checkout@v3
        with:
          # Number of commits to fetch. `0` indicates all history for all
          # branches and tags. (default: 1)
          fetch-depth: 0

      - name: Build debug binaries
        run: |
          # Activate toolchain.
          source /opt/toolchain-v4/activate
          # Initialize dependencies.
          ./init
          # Build debug binaries.
          cd build
          cmake ..
          make -j$THREADS

      - name: Run leftover CTest tests
        run: |
          # Activate toolchain.
          source /opt/toolchain-v4/activate
          # Run leftover CTest tests (all except unit and benchmark tests).
          cd build
          ctest -E "(memgraph__unit|memgraph__benchmark)" --output-on-failure

      - name: Run drivers tests
        run: |
          ./tests/drivers/run.sh

      - name: Run integration tests
        run: |
          tests/integration/run.sh

      - name: Run cppcheck and clang-format
        run: |
          # Activate toolchain.
          source /opt/toolchain-v4/activate
          # Run cppcheck and clang-format.
          cd tools/github
          ./cppcheck_and_clang_format diff

      - name: Save cppcheck and clang-format errors
        uses: actions/upload-artifact@v3
        with:
          # Distinct artifact name: this report previously reused the
          # "Code coverage" name of the coverage job's artifact.
          name: "cppcheck and clang-format errors"
          path: tools/github/cppcheck_and_clang_format.txt

  # Release build: packages the enterprise RPM and runs the benchmark,
  # GQL Behave, unit, e2e, stress and durability suites.
  release_build:
    name: "Release build"
    runs-on: [self-hosted, Linux, X64, CentOS8]
    env:
      THREADS: 24
      MEMGRAPH_ENTERPRISE_LICENSE: ${{ secrets.MEMGRAPH_ENTERPRISE_LICENSE }}
      MEMGRAPH_ORGANIZATION_NAME: ${{ secrets.MEMGRAPH_ORGANIZATION_NAME }}
    timeout-minutes: 960

    steps:
      - name: Set up repository
        uses: actions/checkout@v3
        with:
          # Number of commits to fetch. `0` indicates all history for all
          # branches and tags. (default: 1)
          fetch-depth: 0

      - name: Build release binaries
        run: |
          # Activate toolchain.
          source /opt/toolchain-v4/activate
          # Initialize dependencies.
          ./init
          # Set default build_type to Release
          INPUT_BUILD_TYPE=${{ github.event.inputs.build_type }}
          BUILD_TYPE=${INPUT_BUILD_TYPE:-"Release"}
          # Build release binaries.
          cd build
          cmake -DCMAKE_BUILD_TYPE=$BUILD_TYPE ..
          make -j$THREADS

      - name: Create enterprise RPM package
        run: |
          # Activate toolchain.
          source /opt/toolchain-v4/activate
          cd build
          # create mgconsole
          # we use the -B to force the build
          make -j$THREADS -B mgconsole
          # Create enterprise RPM package.
          mkdir output && cd output
          cpack -G RPM --config ../CPackConfig.cmake
          rpmlint memgraph*.rpm

      - name: Save enterprise RPM package
        uses: actions/upload-artifact@v3
        with:
          name: "Enterprise RPM package"
          path: build/output/memgraph*.rpm

      - name: Run micro benchmark tests
        run: |
          # Activate toolchain.
          source /opt/toolchain-v4/activate
          # Run micro benchmark tests.
          cd build
          # The `eval` benchmark needs a large stack limit.
          ulimit -s 262144
          ctest -R memgraph__benchmark -V

      - name: Run macro benchmark tests
        run: |
          cd tests/macro_benchmark
          ./harness QuerySuite MemgraphRunner \
            --groups aggregation 1000_create unwind_create dense_expand match \
            --no-strict

      - name: Run parallel macro benchmark tests
        run: |
          cd tests/macro_benchmark
          ./harness QueryParallelSuite MemgraphRunner \
            --groups aggregation_parallel create_parallel bfs_parallel \
            --num-database-workers 9 --num-clients-workers 30 \
            --no-strict

      - name: Run GQL Behave tests
        run: |
          cd tests
          ./setup.sh /opt/toolchain-v4/activate
          cd gql_behave
          ./continuous_integration

      - name: Save quality assurance status
        uses: actions/upload-artifact@v3
        with:
          name: "GQL Behave Status"
          path: |
            tests/gql_behave/gql_behave_status.csv
            tests/gql_behave/gql_behave_status.html

      - name: Run unit tests
        run: |
          # Activate toolchain.
          source /opt/toolchain-v4/activate
          # Run unit tests.
          cd build
          ctest -R memgraph__unit --output-on-failure

      - name: Run e2e tests
        run: |
          cd tests
          ./setup.sh /opt/toolchain-v4/activate
          source ve3/bin/activate_e2e
          cd e2e
          ./run.sh

      - name: Run stress test (plain)
        run: |
          cd tests/stress
          ./continuous_integration

      - name: Run stress test (SSL)
        run: |
          cd tests/stress
          ./continuous_integration --use-ssl

      - name: Run stress test (large)
        run: |
          cd tests/stress
          ./continuous_integration --large-dataset

      - name: Run durability test (plain)
        run: |
          cd tests/stress
          source ve3/bin/activate
          python3 durability --num-steps 5

      - name: Run durability test (large)
        run: |
          cd tests/stress
          source ve3/bin/activate
          python3 durability --num-steps 20

View File

@ -53,7 +53,10 @@ set_target_properties(memgraph PROPERTIES
OUTPUT_NAME "memgraph-${MEMGRAPH_VERSION}_${CMAKE_BUILD_TYPE}" OUTPUT_NAME "memgraph-${MEMGRAPH_VERSION}_${CMAKE_BUILD_TYPE}"
# Output the executable in main binary dir. # Output the executable in main binary dir.
RUNTIME_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}) RUNTIME_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}
POSITION_INDEPENDENT_CODE ON
)
# Create symlink to the built executable. # Create symlink to the built executable.
add_custom_command(TARGET memgraph POST_BUILD add_custom_command(TARGET memgraph POST_BUILD

View File

@ -66,6 +66,9 @@ DEFINE_bool(allow_load_csv, true, "Controls whether LOAD CSV clause is allowed i
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) // NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
DEFINE_VALIDATED_uint64(storage_gc_cycle_sec, 30, "Storage garbage collector interval (in seconds).", DEFINE_VALIDATED_uint64(storage_gc_cycle_sec, 30, "Storage garbage collector interval (in seconds).",
FLAG_IN_RANGE(1, 24UL * 3600)); FLAG_IN_RANGE(1, 24UL * 3600));
// Interval, in seconds, of the full Python garbage collection. Defaults to 180
// and is validated with FLAG_IN_RANGE(1, 24UL * 3600), i.e. bounded by one
// second and one day.
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
DEFINE_VALIDATED_uint64(storage_python_gc_cycle_sec, 180,
"Storage python full garbage collection interval (in seconds).", FLAG_IN_RANGE(1, 24UL * 3600));
// NOTE: The `storage_properties_on_edges` flag must be the same here and in // NOTE: The `storage_properties_on_edges` flag must be the same here and in
// `mg_import_csv`. If you change it, make sure to change it there as well. // `mg_import_csv`. If you change it, make sure to change it there as well.
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) // NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)

View File

@ -52,6 +52,8 @@ DECLARE_bool(allow_load_csv);
// Storage flags. // Storage flags.
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) // NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
DECLARE_uint64(storage_gc_cycle_sec); DECLARE_uint64(storage_gc_cycle_sec);
// Declares the flag holding the Python garbage collection interval (seconds).
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
DECLARE_uint64(storage_python_gc_cycle_sec);
// NOTE: The `storage_properties_on_edges` flag must be the same here and in // NOTE: The `storage_properties_on_edges` flag must be the same here and in
// `mg_import_csv`. If you change it, make sure to change it there as well. // `mg_import_csv`. If you change it, make sure to change it there as well.
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) // NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)

View File

@ -163,7 +163,7 @@ int main(int argc, char **argv) {
// libstd. // libstd.
auto gil = memgraph::py::EnsureGIL(); auto gil = memgraph::py::EnsureGIL();
// NOLINTNEXTLINE(hicpp-signed-bitwise) // NOLINTNEXTLINE(hicpp-signed-bitwise)
auto *flag = PyLong_FromLong(RTLD_NOW | RTLD_DEEPBIND); auto *flag = PyLong_FromLong(RTLD_NOW);
auto *setdl = PySys_GetObject("setdlopenflags"); auto *setdl = PySys_GetObject("setdlopenflags");
MG_ASSERT(setdl); MG_ASSERT(setdl);
auto *arg = PyTuple_New(1); auto *arg = PyTuple_New(1);
@ -184,6 +184,10 @@ int main(int argc, char **argv) {
"https://memgr.ph/python")); "https://memgr.ph/python"));
} }
memgraph::utils::Scheduler python_gc_scheduler;
python_gc_scheduler.Run("Python GC", std::chrono::seconds(FLAGS_storage_python_gc_cycle_sec),
[] { memgraph::query::procedure::PyCollectGarbage(); });
// Initialize the communication library. // Initialize the communication library.
memgraph::communication::SSLInit sslInit; memgraph::communication::SSLInit sslInit;
@ -318,6 +322,11 @@ int main(int argc, char **argv) {
.durability_directory = FLAGS_data_directory + "/rocksdb_durability", .durability_directory = FLAGS_data_directory + "/rocksdb_durability",
.wal_directory = FLAGS_data_directory + "/rocksdb_wal"}, .wal_directory = FLAGS_data_directory + "/rocksdb_wal"},
.storage_mode = memgraph::flags::ParseStorageMode()}; .storage_mode = memgraph::flags::ParseStorageMode()};
memgraph::utils::Scheduler jemalloc_purge_scheduler;
jemalloc_purge_scheduler.Run("Jemalloc purge", std::chrono::seconds(FLAGS_storage_gc_cycle_sec),
[] { memgraph::memory::PurgeUnusedMemory(); });
if (FLAGS_storage_snapshot_interval_sec == 0) { if (FLAGS_storage_snapshot_interval_sec == 0) {
if (FLAGS_storage_wal_enabled) { if (FLAGS_storage_wal_enabled) {
LOG_FATAL( LOG_FATAL(

View File

@ -169,7 +169,7 @@ class ModuleRegistry final {
// mentioned library will be first performed in the already existing binded // mentioned library will be first performed in the already existing binded
// libraries and then the global namespace. // libraries and then the global namespace.
// RTLD_DEEPBIND => https://linux.die.net/man/3/dlopen // RTLD_DEEPBIND => https://linux.die.net/man/3/dlopen
SharedLibraryHandle libstd_handle{"libstdc++.so.6", RTLD_NOW | RTLD_LOCAL | RTLD_DEEPBIND, kLibstdcppWarning}; SharedLibraryHandle libstd_handle{"libstdc++.so.6", RTLD_NOW | RTLD_LOCAL, kLibstdcppWarning};
#endif #endif
std::vector<std::filesystem::path> modules_dirs_; std::vector<std::filesystem::path> modules_dirs_;
std::filesystem::path internal_module_dir_; std::filesystem::path internal_module_dir_;

View File

@ -13,6 +13,7 @@
#include <datetime.h> #include <datetime.h>
#include <methodobject.h> #include <methodobject.h>
#include <objimpl.h>
#include <pyerrors.h> #include <pyerrors.h>
#include <array> #include <array>
#include <optional> #include <optional>
@ -57,7 +58,6 @@ PyObject *gMgpValueConversionError{nullptr}; // NOLINT(cppcoreguidelines-avo
PyObject *gMgpSerializationError{nullptr}; // NOLINT(cppcoreguidelines-avoid-non-const-global-variables) PyObject *gMgpSerializationError{nullptr}; // NOLINT(cppcoreguidelines-avoid-non-const-global-variables)
PyObject *gMgpAuthorizationError{nullptr}; // NOLINT(cppcoreguidelines-avoid-non-const-global-variables) PyObject *gMgpAuthorizationError{nullptr}; // NOLINT(cppcoreguidelines-avoid-non-const-global-variables)
constexpr bool kStartGarbageCollection{true};
constexpr auto kMicrosecondsInMillisecond{1000}; constexpr auto kMicrosecondsInMillisecond{1000};
constexpr auto kMicrosecondsInSecond{1000000}; constexpr auto kMicrosecondsInSecond{1000000};
@ -867,6 +867,24 @@ py::Object MgpListToPyTuple(mgp_list *list, PyObject *py_graph) {
return MgpListToPyTuple(list, reinterpret_cast<PyGraph *>(py_graph)); return MgpListToPyTuple(list, reinterpret_cast<PyGraph *>(py_graph));
} }
void PyCollectGarbage() {
if (!Py_IsInitialized() || _Py_IsFinalizing()) {
// Calling EnsureGIL will crash the program if this is true.
return;
}
auto gil = py::EnsureGIL();
py::Object gc(PyImport_ImportModule("gc"));
if (!gc) {
LOG_FATAL(py::FetchError().value());
}
if (!gc.CallMethod("collect")) {
LOG_FATAL(py::FetchError().value());
}
}
namespace { namespace {
struct RecordFieldCache { struct RecordFieldCache {
PyObject *key; PyObject *key;
@ -1027,24 +1045,8 @@ std::optional<py::ExceptionInfo> AddMultipleBatchRecordsFromPython(mgp_result *r
return std::nullopt; return std::nullopt;
} }
std::function<void()> PyObjectCleanup(py::Object &py_object, bool start_gc) { std::function<void()> PyObjectCleanup(py::Object &py_object) {
return [py_object, start_gc]() { return [py_object]() {
if (start_gc) {
// Run `gc.collect` (reference cycle-detection) explicitly, so that we are
// sure the procedure cleaned up everything it held references to. If the
// user stored a reference to one of our `_mgp` instances then the
// internally used `mgp_*` structs will stay unfreed and a memory leak
// will be reported at the end of the query execution.
py::Object gc(PyImport_ImportModule("gc"));
if (!gc) {
LOG_FATAL(py::FetchError().value());
}
if (!gc.CallMethod("collect")) {
LOG_FATAL(py::FetchError().value());
}
}
// After making sure all references from our side have been cleared, // After making sure all references from our side have been cleared,
// invalidate the `_mgp.Graph` object. If the user kept a reference to one // invalidate the `_mgp.Graph` object. If the user kept a reference to one
// of our `_mgp` instances then this will prevent them from using those // of our `_mgp` instances then this will prevent them from using those
@ -1095,7 +1097,7 @@ void CallPythonProcedure(const py::Object &py_cb, mgp_list *args, mgp_graph *gra
std::optional<std::string> maybe_msg; std::optional<std::string> maybe_msg;
{ {
py::Object py_graph(MakePyGraph(graph, memory)); py::Object py_graph(MakePyGraph(graph, memory));
utils::OnScopeExit clean_up(PyObjectCleanup(py_graph, !is_batched)); utils::OnScopeExit clean_up(PyObjectCleanup(py_graph));
if (py_graph) { if (py_graph) {
maybe_msg = error_to_msg(call(py_graph)); maybe_msg = error_to_msg(call(py_graph));
} else { } else {
@ -1110,22 +1112,11 @@ void CallPythonProcedure(const py::Object &py_cb, mgp_list *args, mgp_graph *gra
void CallPythonCleanup(const py::Object &py_cleanup) { void CallPythonCleanup(const py::Object &py_cleanup) {
auto gil = py::EnsureGIL(); auto gil = py::EnsureGIL();
auto py_res = py_cleanup.Call(); auto py_res = py_cleanup.Call();
py::Object gc(PyImport_ImportModule("gc"));
if (!gc) {
LOG_FATAL(py::FetchError().value());
}
if (!gc.CallMethod("collect")) {
LOG_FATAL(py::FetchError().value());
}
} }
void CallPythonInitializer(const py::Object &py_initializer, mgp_list *args, mgp_graph *graph, mgp_memory *memory) { void CallPythonInitializer(const py::Object &py_initializer, mgp_list *args, mgp_graph *graph, mgp_memory *memory) {
auto gil = py::EnsureGIL(); auto gil = py::EnsureGIL();
auto error_to_msg = [](const std::optional<py::ExceptionInfo> &exc_info) -> std::optional<std::string> { auto error_to_msg = [](const std::optional<py::ExceptionInfo> &exc_info) -> std::optional<std::string> {
if (!exc_info) return std::nullopt; if (!exc_info) return std::nullopt;
// Here we tell the traceback formatter to skip the first line of the // Here we tell the traceback formatter to skip the first line of the
@ -1146,7 +1137,7 @@ void CallPythonInitializer(const py::Object &py_initializer, mgp_list *args, mgp
std::optional<std::string> maybe_msg; std::optional<std::string> maybe_msg;
{ {
py::Object py_graph(MakePyGraph(graph, memory)); py::Object py_graph(MakePyGraph(graph, memory));
utils::OnScopeExit clean_up_graph(PyObjectCleanup(py_graph, !kStartGarbageCollection)); utils::OnScopeExit clean_up_graph(PyObjectCleanup(py_graph));
if (py_graph) { if (py_graph) {
maybe_msg = error_to_msg(call(py_graph)); maybe_msg = error_to_msg(call(py_graph));
} else { } else {
@ -1194,8 +1185,8 @@ void CallPythonTransformation(const py::Object &py_cb, mgp_messages *msgs, mgp_g
py::Object py_graph(MakePyGraph(graph, memory)); py::Object py_graph(MakePyGraph(graph, memory));
py::Object py_messages(MakePyMessages(msgs, memory)); py::Object py_messages(MakePyMessages(msgs, memory));
utils::OnScopeExit clean_up_graph(PyObjectCleanup(py_graph, kStartGarbageCollection)); utils::OnScopeExit clean_up_graph(PyObjectCleanup(py_graph));
utils::OnScopeExit clean_up_messages(PyObjectCleanup(py_messages, kStartGarbageCollection)); utils::OnScopeExit clean_up_messages(PyObjectCleanup(py_messages));
if (py_graph && py_messages) { if (py_graph && py_messages) {
maybe_msg = error_to_msg(call(py_graph, py_messages)); maybe_msg = error_to_msg(call(py_graph, py_messages));
@ -1264,7 +1255,7 @@ void CallPythonFunction(const py::Object &py_cb, mgp_list *args, mgp_graph *grap
std::optional<std::string> maybe_msg; std::optional<std::string> maybe_msg;
{ {
py::Object py_graph(MakePyGraph(graph, memory)); py::Object py_graph(MakePyGraph(graph, memory));
utils::OnScopeExit clean_up(PyObjectCleanup(py_graph, kStartGarbageCollection)); utils::OnScopeExit clean_up(PyObjectCleanup(py_graph));
if (py_graph) { if (py_graph) {
auto maybe_result = call(py_graph); auto maybe_result = call(py_graph);
if (!maybe_result.HasError()) { if (!maybe_result.HasError()) {

View File

@ -1,4 +1,4 @@
// Copyright 2022 Memgraph Ltd. // Copyright 2023 Memgraph Ltd.
// //
// Use of this software is governed by the Business Source License // Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -79,4 +79,7 @@ py::Object ImportPyModule(const char *, mgp_module *);
/// Return nullptr and set appropriate Python exception on failure. /// Return nullptr and set appropriate Python exception on failure.
py::Object ReloadPyModule(PyObject *, mgp_module *); py::Object ReloadPyModule(PyObject *, mgp_module *);
/// Run a full Python cyclic garbage collection pass (all generations).
/// Does nothing if the Python interpreter is not initialized or is finalizing.
void PyCollectGarbage();
} // namespace memgraph::query::procedure } // namespace memgraph::query::procedure

View File

@ -21,6 +21,7 @@
#include "storage/v2/result.hpp" #include "storage/v2/result.hpp"
#include "storage/v2/storage.hpp" #include "storage/v2/storage.hpp"
#include "storage/v2/vertex_accessor.hpp" #include "storage/v2/vertex_accessor.hpp"
#include "utils/atomic_memory_block.hpp"
#include "utils/memory_tracker.hpp" #include "utils/memory_tracker.hpp"
namespace memgraph::storage { namespace memgraph::storage {
@ -126,24 +127,28 @@ Result<storage::PropertyValue> EdgeAccessor::SetProperty(PropertyId property, co
if (!PrepareForWrite(transaction_, edge_.ptr)) return Error::SERIALIZATION_ERROR; if (!PrepareForWrite(transaction_, edge_.ptr)) return Error::SERIALIZATION_ERROR;
if (edge_.ptr->deleted) return Error::DELETED_OBJECT; if (edge_.ptr->deleted) return Error::DELETED_OBJECT;
using ReturnType = decltype(edge_.ptr->properties.GetProperty(property));
auto current_value = edge_.ptr->properties.GetProperty(property); std::optional<ReturnType> current_value;
// We could skip setting the value if the previous one is the same to the new utils::AtomicMemoryBlock atomic_memory_block{
// one. This would save some memory as a delta would not be created as well as [&current_value, &property, &value, transaction = transaction_, edge = edge_]() {
// avoid copying the value. The reason we are not doing that is because the current_value.emplace(edge.ptr->properties.GetProperty(property));
// current code always follows the logical pattern of "create a delta" and // We could skip setting the value if the previous one is the same to the new
// "modify in-place". Additionally, the created delta will make other // one. This would save some memory as a delta would not be created as well as
// transactions get a SERIALIZATION_ERROR. // avoid copying the value. The reason we are not doing that is because the
// current code always follows the logical pattern of "create a delta" and
CreateAndLinkDelta(transaction_, edge_.ptr, Delta::SetPropertyTag(), property, current_value); // "modify in-place". Additionally, the created delta will make other
edge_.ptr->properties.SetProperty(property, value); // transactions get a SERIALIZATION_ERROR.
CreateAndLinkDelta(transaction, edge.ptr, Delta::SetPropertyTag(), property, *current_value);
edge.ptr->properties.SetProperty(property, value);
}};
std::invoke(atomic_memory_block);
if (transaction_->IsDiskStorage()) { if (transaction_->IsDiskStorage()) {
ModifiedEdgeInfo modified_edge(Delta::Action::SET_PROPERTY, from_vertex_->gid, to_vertex_->gid, edge_type_, edge_); ModifiedEdgeInfo modified_edge(Delta::Action::SET_PROPERTY, from_vertex_->gid, to_vertex_->gid, edge_type_, edge_);
transaction_->AddModifiedEdge(Gid(), modified_edge); transaction_->AddModifiedEdge(Gid(), modified_edge);
} }
return std::move(current_value); return std::move(*current_value);
} }
Result<bool> EdgeAccessor::InitProperties(const std::map<storage::PropertyId, storage::PropertyValue> &properties) { Result<bool> EdgeAccessor::InitProperties(const std::map<storage::PropertyId, storage::PropertyValue> &properties) {
@ -157,9 +162,12 @@ Result<bool> EdgeAccessor::InitProperties(const std::map<storage::PropertyId, st
if (edge_.ptr->deleted) return Error::DELETED_OBJECT; if (edge_.ptr->deleted) return Error::DELETED_OBJECT;
if (!edge_.ptr->properties.InitProperties(properties)) return false; if (!edge_.ptr->properties.InitProperties(properties)) return false;
for (const auto &[property, _] : properties) { utils::AtomicMemoryBlock atomic_memory_block{[&properties, transaction_ = transaction_, edge_ = edge_]() {
CreateAndLinkDelta(transaction_, edge_.ptr, Delta::SetPropertyTag(), property, PropertyValue()); for (const auto &[property, _] : properties) {
} CreateAndLinkDelta(transaction_, edge_.ptr, Delta::SetPropertyTag(), property, PropertyValue());
}
}};
std::invoke(atomic_memory_block);
return true; return true;
} }
@ -175,13 +183,18 @@ Result<std::vector<std::tuple<PropertyId, PropertyValue, PropertyValue>>> EdgeAc
if (edge_.ptr->deleted) return Error::DELETED_OBJECT; if (edge_.ptr->deleted) return Error::DELETED_OBJECT;
auto id_old_new_change = edge_.ptr->properties.UpdateProperties(properties); using ReturnType = decltype(edge_.ptr->properties.UpdateProperties(properties));
std::optional<ReturnType> id_old_new_change;
utils::AtomicMemoryBlock atomic_memory_block{
[transaction_ = transaction_, edge_ = edge_, &properties, &id_old_new_change]() {
id_old_new_change.emplace(edge_.ptr->properties.UpdateProperties(properties));
for (auto &[property, old_value, new_value] : *id_old_new_change) {
CreateAndLinkDelta(transaction_, edge_.ptr, Delta::SetPropertyTag(), property, std::move(old_value));
}
}};
std::invoke(atomic_memory_block);
for (auto &[property, old_value, new_value] : id_old_new_change) { return id_old_new_change.has_value() ? std::move(id_old_new_change.value()) : ReturnType{};
CreateAndLinkDelta(transaction_, edge_.ptr, Delta::SetPropertyTag(), property, std::move(old_value));
}
return id_old_new_change;
} }
Result<std::map<PropertyId, PropertyValue>> EdgeAccessor::ClearProperties() { Result<std::map<PropertyId, PropertyValue>> EdgeAccessor::ClearProperties() {
@ -193,14 +206,19 @@ Result<std::map<PropertyId, PropertyValue>> EdgeAccessor::ClearProperties() {
if (edge_.ptr->deleted) return Error::DELETED_OBJECT; if (edge_.ptr->deleted) return Error::DELETED_OBJECT;
auto properties = edge_.ptr->properties.Properties(); using ReturnType = decltype(edge_.ptr->properties.Properties());
for (const auto &property : properties) { std::optional<ReturnType> properties;
CreateAndLinkDelta(transaction_, edge_.ptr, Delta::SetPropertyTag(), property.first, property.second); utils::AtomicMemoryBlock atomic_memory_block{[&properties, transaction_ = transaction_, edge_ = edge_]() {
} properties.emplace(edge_.ptr->properties.Properties());
for (const auto &property : *properties) {
CreateAndLinkDelta(transaction_, edge_.ptr, Delta::SetPropertyTag(), property.first, property.second);
}
edge_.ptr->properties.ClearProperties(); edge_.ptr->properties.ClearProperties();
}};
std::invoke(atomic_memory_block);
return std::move(properties); return properties.has_value() ? std::move(properties.value()) : ReturnType{};
} }
Result<PropertyValue> EdgeAccessor::GetProperty(PropertyId property, View view) const { Result<PropertyValue> EdgeAccessor::GetProperty(PropertyId property, View view) const {

View File

@ -26,6 +26,7 @@
#include "storage/v2/inmemory/replication/recovery.hpp" #include "storage/v2/inmemory/replication/recovery.hpp"
#include "storage/v2/inmemory/unique_constraints.hpp" #include "storage/v2/inmemory/unique_constraints.hpp"
#include "storage/v2/property_value.hpp" #include "storage/v2/property_value.hpp"
#include "utils/atomic_memory_block.hpp"
#include "utils/resource_lock.hpp" #include "utils/resource_lock.hpp"
#include "utils/stat.hpp" #include "utils/stat.hpp"
@ -144,7 +145,6 @@ InMemoryStorage::InMemoryStorage(Config config, StorageMode storage_mode)
gc_runner_.Run("Storage GC", config_.gc.interval, [this] { gc_runner_.Run("Storage GC", config_.gc.interval, [this] {
this->FreeMemory(std::unique_lock<utils::ResourceLock>{main_lock_, std::defer_lock}); this->FreeMemory(std::unique_lock<utils::ResourceLock>{main_lock_, std::defer_lock});
}); });
gc_jemalloc_runner_.Run("Jemalloc GC", config_.gc.interval, [] { memory::PurgeUnusedMemory(); });
} }
if (timestamp_ == kTimestampInitialId) { if (timestamp_ == kTimestampInitialId) {
commit_log_.emplace(); commit_log_.emplace();
@ -158,7 +158,6 @@ InMemoryStorage::InMemoryStorage(Config config) : InMemoryStorage(config, Storag
InMemoryStorage::~InMemoryStorage() { InMemoryStorage::~InMemoryStorage() {
if (config_.gc.type == Config::Gc::Type::PERIODIC) { if (config_.gc.type == Config::Gc::Type::PERIODIC) {
gc_runner_.Stop(); gc_runner_.Stop();
gc_jemalloc_runner_.Stop();
} }
{ {
// Stop replication (Stop all clients or stop the REPLICA server) // Stop replication (Stop all clients or stop the REPLICA server)
@ -338,18 +337,22 @@ Result<EdgeAccessor> InMemoryStorage::InMemoryAccessor::CreateEdge(VertexAccesso
delta->prev.Set(&*it); delta->prev.Set(&*it);
} }
} }
utils::AtomicMemoryBlock atomic_memory_block{
[this, edge, from_vertex = from_vertex, edge_type = edge_type, to_vertex = to_vertex]() {
CreateAndLinkDelta(&transaction_, from_vertex, Delta::RemoveOutEdgeTag(), edge_type, to_vertex, edge);
from_vertex->out_edges.emplace_back(edge_type, to_vertex, edge);
CreateAndLinkDelta(&transaction_, from_vertex, Delta::RemoveOutEdgeTag(), edge_type, to_vertex, edge); CreateAndLinkDelta(&transaction_, to_vertex, Delta::RemoveInEdgeTag(), edge_type, from_vertex, edge);
from_vertex->out_edges.emplace_back(edge_type, to_vertex, edge); to_vertex->in_edges.emplace_back(edge_type, from_vertex, edge);
CreateAndLinkDelta(&transaction_, to_vertex, Delta::RemoveInEdgeTag(), edge_type, from_vertex, edge); transaction_.manyDeltasCache.Invalidate(from_vertex, edge_type, EdgeDirection::OUT);
to_vertex->in_edges.emplace_back(edge_type, from_vertex, edge); transaction_.manyDeltasCache.Invalidate(to_vertex, edge_type, EdgeDirection::IN);
transaction_.manyDeltasCache.Invalidate(from_vertex, edge_type, EdgeDirection::OUT); // Increment edge count.
transaction_.manyDeltasCache.Invalidate(to_vertex, edge_type, EdgeDirection::IN); storage_->edge_count_.fetch_add(1, std::memory_order_acq_rel);
}};
// Increment edge count. std::invoke(atomic_memory_block);
storage_->edge_count_.fetch_add(1, std::memory_order_acq_rel);
return EdgeAccessor(edge, edge_type, from_vertex, to_vertex, storage_, &transaction_); return EdgeAccessor(edge, edge_type, from_vertex, to_vertex, storage_, &transaction_);
} }
@ -433,18 +436,22 @@ Result<EdgeAccessor> InMemoryStorage::InMemoryAccessor::CreateEdgeEx(VertexAcces
delta->prev.Set(&*it); delta->prev.Set(&*it);
} }
} }
utils::AtomicMemoryBlock atomic_memory_block{
[this, edge, from_vertex = from_vertex, edge_type = edge_type, to_vertex = to_vertex]() {
CreateAndLinkDelta(&transaction_, from_vertex, Delta::RemoveOutEdgeTag(), edge_type, to_vertex, edge);
from_vertex->out_edges.emplace_back(edge_type, to_vertex, edge);
CreateAndLinkDelta(&transaction_, from_vertex, Delta::RemoveOutEdgeTag(), edge_type, to_vertex, edge); CreateAndLinkDelta(&transaction_, to_vertex, Delta::RemoveInEdgeTag(), edge_type, from_vertex, edge);
from_vertex->out_edges.emplace_back(edge_type, to_vertex, edge); to_vertex->in_edges.emplace_back(edge_type, from_vertex, edge);
CreateAndLinkDelta(&transaction_, to_vertex, Delta::RemoveInEdgeTag(), edge_type, from_vertex, edge); transaction_.manyDeltasCache.Invalidate(from_vertex, edge_type, EdgeDirection::OUT);
to_vertex->in_edges.emplace_back(edge_type, from_vertex, edge); transaction_.manyDeltasCache.Invalidate(to_vertex, edge_type, EdgeDirection::IN);
transaction_.manyDeltasCache.Invalidate(from_vertex, edge_type, EdgeDirection::OUT); // Increment edge count.
transaction_.manyDeltasCache.Invalidate(to_vertex, edge_type, EdgeDirection::IN); storage_->edge_count_.fetch_add(1, std::memory_order_acq_rel);
}};
// Increment edge count. std::invoke(atomic_memory_block);
storage_->edge_count_.fetch_add(1, std::memory_order_acq_rel);
return EdgeAccessor(edge, edge_type, from_vertex, to_vertex, storage_, &transaction_); return EdgeAccessor(edge, edge_type, from_vertex, to_vertex, storage_, &transaction_);
} }
@ -534,18 +541,22 @@ Result<EdgeAccessor> InMemoryStorage::InMemoryAccessor::EdgeSetFrom(EdgeAccessor
return Error::DELETED_OBJECT; return Error::DELETED_OBJECT;
} }
} }
utils::AtomicMemoryBlock atomic_memory_block{
[this, edge_ref, old_from_vertex, new_from_vertex, edge_type, to_vertex]() {
CreateAndLinkDelta(&transaction_, old_from_vertex, Delta::AddOutEdgeTag(), edge_type, to_vertex, edge_ref);
CreateAndLinkDelta(&transaction_, to_vertex, Delta::AddInEdgeTag(), edge_type, old_from_vertex, edge_ref);
CreateAndLinkDelta(&transaction_, old_from_vertex, Delta::AddOutEdgeTag(), edge_type, to_vertex, edge_ref); CreateAndLinkDelta(&transaction_, new_from_vertex, Delta::RemoveOutEdgeTag(), edge_type, to_vertex, edge_ref);
CreateAndLinkDelta(&transaction_, to_vertex, Delta::AddInEdgeTag(), edge_type, old_from_vertex, edge_ref); new_from_vertex->out_edges.emplace_back(edge_type, to_vertex, edge_ref);
CreateAndLinkDelta(&transaction_, to_vertex, Delta::RemoveInEdgeTag(), edge_type, new_from_vertex, edge_ref);
to_vertex->in_edges.emplace_back(edge_type, new_from_vertex, edge_ref);
CreateAndLinkDelta(&transaction_, new_from_vertex, Delta::RemoveOutEdgeTag(), edge_type, to_vertex, edge_ref); transaction_.manyDeltasCache.Invalidate(new_from_vertex, edge_type, EdgeDirection::OUT);
new_from_vertex->out_edges.emplace_back(edge_type, to_vertex, edge_ref); transaction_.manyDeltasCache.Invalidate(old_from_vertex, edge_type, EdgeDirection::OUT);
CreateAndLinkDelta(&transaction_, to_vertex, Delta::RemoveInEdgeTag(), edge_type, new_from_vertex, edge_ref); transaction_.manyDeltasCache.Invalidate(to_vertex, edge_type, EdgeDirection::IN);
to_vertex->in_edges.emplace_back(edge_type, new_from_vertex, edge_ref); }};
transaction_.manyDeltasCache.Invalidate(new_from_vertex, edge_type, EdgeDirection::OUT); std::invoke(atomic_memory_block);
transaction_.manyDeltasCache.Invalidate(old_from_vertex, edge_type, EdgeDirection::OUT);
transaction_.manyDeltasCache.Invalidate(to_vertex, edge_type, EdgeDirection::IN);
return EdgeAccessor(edge_ref, edge_type, new_from_vertex, to_vertex, storage_, &transaction_); return EdgeAccessor(edge_ref, edge_type, new_from_vertex, to_vertex, storage_, &transaction_);
} }
@ -636,17 +647,22 @@ Result<EdgeAccessor> InMemoryStorage::InMemoryAccessor::EdgeSetTo(EdgeAccessor *
} }
} }
CreateAndLinkDelta(&transaction_, from_vertex, Delta::AddOutEdgeTag(), edge_type, old_to_vertex, edge_ref); utils::AtomicMemoryBlock atomic_memory_block{
CreateAndLinkDelta(&transaction_, old_to_vertex, Delta::AddInEdgeTag(), edge_type, from_vertex, edge_ref); [this, edge_ref, old_to_vertex, from_vertex, edge_type, new_to_vertex]() {
CreateAndLinkDelta(&transaction_, from_vertex, Delta::AddOutEdgeTag(), edge_type, old_to_vertex, edge_ref);
CreateAndLinkDelta(&transaction_, old_to_vertex, Delta::AddInEdgeTag(), edge_type, from_vertex, edge_ref);
CreateAndLinkDelta(&transaction_, from_vertex, Delta::RemoveOutEdgeTag(), edge_type, new_to_vertex, edge_ref); CreateAndLinkDelta(&transaction_, from_vertex, Delta::RemoveOutEdgeTag(), edge_type, new_to_vertex, edge_ref);
from_vertex->out_edges.emplace_back(edge_type, new_to_vertex, edge_ref); from_vertex->out_edges.emplace_back(edge_type, new_to_vertex, edge_ref);
CreateAndLinkDelta(&transaction_, new_to_vertex, Delta::RemoveInEdgeTag(), edge_type, from_vertex, edge_ref); CreateAndLinkDelta(&transaction_, new_to_vertex, Delta::RemoveInEdgeTag(), edge_type, from_vertex, edge_ref);
new_to_vertex->in_edges.emplace_back(edge_type, from_vertex, edge_ref); new_to_vertex->in_edges.emplace_back(edge_type, from_vertex, edge_ref);
transaction_.manyDeltasCache.Invalidate(from_vertex, edge_type, EdgeDirection::OUT); transaction_.manyDeltasCache.Invalidate(from_vertex, edge_type, EdgeDirection::OUT);
transaction_.manyDeltasCache.Invalidate(old_to_vertex, edge_type, EdgeDirection::IN); transaction_.manyDeltasCache.Invalidate(old_to_vertex, edge_type, EdgeDirection::IN);
transaction_.manyDeltasCache.Invalidate(new_to_vertex, edge_type, EdgeDirection::IN); transaction_.manyDeltasCache.Invalidate(new_to_vertex, edge_type, EdgeDirection::IN);
}};
std::invoke(atomic_memory_block);
return EdgeAccessor(edge_ref, edge_type, from_vertex, new_to_vertex, storage_, &transaction_); return EdgeAccessor(edge_ref, edge_type, from_vertex, new_to_vertex, storage_, &transaction_);
} }
@ -709,17 +725,21 @@ Result<EdgeAccessor> InMemoryStorage::InMemoryAccessor::EdgeChangeType(EdgeAcces
MG_ASSERT((op1 && op2), "Invalid database state!"); MG_ASSERT((op1 && op2), "Invalid database state!");
// "deleting" old edge utils::AtomicMemoryBlock atomic_memory_block{[this, to_vertex, new_edge_type, edge_ref, from_vertex, edge_type]() {
CreateAndLinkDelta(&transaction_, from_vertex, Delta::AddOutEdgeTag(), edge_type, to_vertex, edge_ref); // "deleting" old edge
CreateAndLinkDelta(&transaction_, to_vertex, Delta::AddInEdgeTag(), edge_type, from_vertex, edge_ref); CreateAndLinkDelta(&transaction_, from_vertex, Delta::AddOutEdgeTag(), edge_type, to_vertex, edge_ref);
CreateAndLinkDelta(&transaction_, to_vertex, Delta::AddInEdgeTag(), edge_type, from_vertex, edge_ref);
// "adding" new edge // "adding" new edge
CreateAndLinkDelta(&transaction_, from_vertex, Delta::RemoveOutEdgeTag(), new_edge_type, to_vertex, edge_ref); CreateAndLinkDelta(&transaction_, from_vertex, Delta::RemoveOutEdgeTag(), new_edge_type, to_vertex, edge_ref);
CreateAndLinkDelta(&transaction_, to_vertex, Delta::RemoveInEdgeTag(), new_edge_type, from_vertex, edge_ref); CreateAndLinkDelta(&transaction_, to_vertex, Delta::RemoveInEdgeTag(), new_edge_type, from_vertex, edge_ref);
// edge type is not used while invalidating cache so we can only call it once // edge type is not used while invalidating cache so we can only call it once
transaction_.manyDeltasCache.Invalidate(from_vertex, new_edge_type, EdgeDirection::OUT); transaction_.manyDeltasCache.Invalidate(from_vertex, new_edge_type, EdgeDirection::OUT);
transaction_.manyDeltasCache.Invalidate(to_vertex, new_edge_type, EdgeDirection::IN); transaction_.manyDeltasCache.Invalidate(to_vertex, new_edge_type, EdgeDirection::IN);
}};
std::invoke(atomic_memory_block);
return EdgeAccessor(edge_ref, new_edge_type, from_vertex, to_vertex, storage_, &transaction_); return EdgeAccessor(edge_ref, new_edge_type, from_vertex, to_vertex, storage_, &transaction_);
} }

View File

@ -427,7 +427,6 @@ class InMemoryStorage final : public Storage {
std::optional<CommitLog> commit_log_; std::optional<CommitLog> commit_log_;
utils::Scheduler gc_runner_; utils::Scheduler gc_runner_;
utils::Scheduler gc_jemalloc_runner_;
std::mutex gc_lock_; std::mutex gc_lock_;
using BondPmrLd = Bond<utils::pmr::list<Delta>>; using BondPmrLd = Bond<utils::pmr::list<Delta>>;

View File

@ -17,6 +17,7 @@
#include "storage/v2/storage.hpp" #include "storage/v2/storage.hpp"
#include "storage/v2/transaction.hpp" #include "storage/v2/transaction.hpp"
#include "storage/v2/vertex_accessor.hpp" #include "storage/v2/vertex_accessor.hpp"
#include "utils/atomic_memory_block.hpp"
#include "utils/event_counter.hpp" #include "utils/event_counter.hpp"
#include "utils/event_histogram.hpp" #include "utils/event_histogram.hpp"
#include "utils/exceptions.hpp" #include "utils/exceptions.hpp"
@ -390,22 +391,29 @@ Result<std::optional<std::vector<EdgeAccessor>>> Storage::Accessor::ClearEdgesOn
if (!PrepareForWrite(&transaction_, vertex_ptr)) return Error::SERIALIZATION_ERROR; if (!PrepareForWrite(&transaction_, vertex_ptr)) return Error::SERIALIZATION_ERROR;
MG_ASSERT(!vertex_ptr->deleted, "Invalid database state!"); MG_ASSERT(!vertex_ptr->deleted, "Invalid database state!");
attached_edges_to_vertex->pop_back(); // MarkEdgeAsDeleted allocates additional memory
if (storage_->config_.items.properties_on_edges) { // and CreateAndLinkDelta needs memory
auto *edge_ptr = edge_ref.ptr; utils::AtomicMemoryBlock atomic_memory_block{[&attached_edges_to_vertex, &deleted_edge_ids, &reverse_vertex_order,
MarkEdgeAsDeleted(edge_ptr); &vertex_ptr, &deleted_edges, deletion_delta = deletion_delta,
} edge_type = edge_type, opposing_vertex = opposing_vertex,
edge_ref = edge_ref, this]() {
attached_edges_to_vertex->pop_back();
if (this->storage_->config_.items.properties_on_edges) {
auto *edge_ptr = edge_ref.ptr;
MarkEdgeAsDeleted(edge_ptr);
}
auto const edge_gid = storage_->config_.items.properties_on_edges ? edge_ref.ptr->gid : edge_ref.gid; auto const edge_gid = storage_->config_.items.properties_on_edges ? edge_ref.ptr->gid : edge_ref.gid;
auto const [_, was_inserted] = deleted_edge_ids.insert(edge_gid); auto const [_, was_inserted] = deleted_edge_ids.insert(edge_gid);
bool const edge_cleared_from_both_directions = !was_inserted; bool const edge_cleared_from_both_directions = !was_inserted;
if (edge_cleared_from_both_directions) { if (edge_cleared_from_both_directions) {
auto *from_vertex = reverse_vertex_order ? vertex_ptr : opposing_vertex; auto *from_vertex = reverse_vertex_order ? vertex_ptr : opposing_vertex;
auto *to_vertex = reverse_vertex_order ? opposing_vertex : vertex_ptr; auto *to_vertex = reverse_vertex_order ? opposing_vertex : vertex_ptr;
deleted_edges.emplace_back(edge_ref, edge_type, from_vertex, to_vertex, storage_, &transaction_, true); deleted_edges.emplace_back(edge_ref, edge_type, from_vertex, to_vertex, storage_, &transaction_, true);
} }
CreateAndLinkDelta(&transaction_, vertex_ptr, deletion_delta, edge_type, opposing_vertex, edge_ref);
CreateAndLinkDelta(&transaction_, vertex_ptr, deletion_delta, edge_type, opposing_vertex, edge_ref); }};
std::invoke(atomic_memory_block);
} }
return std::make_optional<ReturnType>(); return std::make_optional<ReturnType>();
@ -449,30 +457,37 @@ Result<std::optional<std::vector<EdgeAccessor>>> Storage::Accessor::DetachRemain
return !set_for_erasure.contains(edge_gid); return !set_for_erasure.contains(edge_gid);
}); });
for (auto it = mid; it != edges_attached_to_vertex->end(); it++) { // Creating deltas and erasing edge only at the end -> we might have incomplete state as
auto const &[edge_type, opposing_vertex, edge_ref] = *it; // delta might cause OOM, so we don't remove edges from edges_attached_to_vertex
std::unique_lock<utils::RWSpinLock> guard; utils::AtomicMemoryBlock atomic_memory_block{[&mid, &edges_attached_to_vertex, &deleted_edges,
if (storage_->config_.items.properties_on_edges) { &partially_detached_edge_ids, this, vertex_ptr, deletion_delta,
auto edge_ptr = edge_ref.ptr; reverse_vertex_order]() {
guard = std::unique_lock{edge_ptr->lock}; for (auto it = mid; it != edges_attached_to_vertex->end(); it++) {
// this can happen only if we marked edges for deletion with no nodes, auto const &[edge_type, opposing_vertex, edge_ref] = *it;
// so the method detaching nodes will not do anything std::unique_lock<utils::RWSpinLock> guard;
MarkEdgeAsDeleted(edge_ptr); if (storage_->config_.items.properties_on_edges) {
auto edge_ptr = edge_ref.ptr;
guard = std::unique_lock{edge_ptr->lock};
// this can happen only if we marked edges for deletion with no nodes,
// so the method detaching nodes will not do anything
MarkEdgeAsDeleted(edge_ptr);
}
CreateAndLinkDelta(&transaction_, vertex_ptr, deletion_delta, edge_type, opposing_vertex, edge_ref);
auto const edge_gid = storage_->config_.items.properties_on_edges ? edge_ref.ptr->gid : edge_ref.gid;
auto const [_, was_inserted] = partially_detached_edge_ids.insert(edge_gid);
bool const edge_cleared_from_both_directions = !was_inserted;
if (edge_cleared_from_both_directions) {
auto *from_vertex = reverse_vertex_order ? opposing_vertex : vertex_ptr;
auto *to_vertex = reverse_vertex_order ? vertex_ptr : opposing_vertex;
deleted_edges.emplace_back(edge_ref, edge_type, from_vertex, to_vertex, storage_, &transaction_, true);
}
} }
edges_attached_to_vertex->erase(mid, edges_attached_to_vertex->end());
}};
CreateAndLinkDelta(&transaction_, vertex_ptr, deletion_delta, edge_type, opposing_vertex, edge_ref); std::invoke(atomic_memory_block);
auto const edge_gid = storage_->config_.items.properties_on_edges ? edge_ref.ptr->gid : edge_ref.gid;
auto const [_, was_inserted] = partially_detached_edge_ids.insert(edge_gid);
bool const edge_cleared_from_both_directions = !was_inserted;
if (edge_cleared_from_both_directions) {
auto *from_vertex = reverse_vertex_order ? opposing_vertex : vertex_ptr;
auto *to_vertex = reverse_vertex_order ? vertex_ptr : opposing_vertex;
deleted_edges.emplace_back(edge_ref, edge_type, from_vertex, to_vertex, storage_, &transaction_, true);
}
}
edges_attached_to_vertex->erase(mid, edges_attached_to_vertex->end());
return std::make_optional<ReturnType>(); return std::make_optional<ReturnType>();
}; };

View File

@ -26,6 +26,7 @@
#include "storage/v2/storage.hpp" #include "storage/v2/storage.hpp"
#include "storage/v2/vertex_info_cache.hpp" #include "storage/v2/vertex_info_cache.hpp"
#include "storage/v2/vertex_info_helpers.hpp" #include "storage/v2/vertex_info_helpers.hpp"
#include "utils/atomic_memory_block.hpp"
#include "utils/logging.hpp" #include "utils/logging.hpp"
#include "utils/memory_tracker.hpp" #include "utils/memory_tracker.hpp"
#include "utils/variant_helpers.hpp" #include "utils/variant_helpers.hpp"
@ -107,8 +108,11 @@ Result<bool> VertexAccessor::AddLabel(LabelId label) {
if (vertex_->deleted) return Error::DELETED_OBJECT; if (vertex_->deleted) return Error::DELETED_OBJECT;
if (std::find(vertex_->labels.begin(), vertex_->labels.end(), label) != vertex_->labels.end()) return false; if (std::find(vertex_->labels.begin(), vertex_->labels.end(), label) != vertex_->labels.end()) return false;
CreateAndLinkDelta(transaction_, vertex_, Delta::RemoveLabelTag(), label); utils::AtomicMemoryBlock atomic_memory_block{[transaction = transaction_, vertex = vertex_, &label]() {
vertex_->labels.push_back(label); CreateAndLinkDelta(transaction, vertex, Delta::RemoveLabelTag(), label);
vertex->labels.push_back(label);
}};
std::invoke(atomic_memory_block);
if (storage_->config_.items.enable_schema_metadata) { if (storage_->config_.items.enable_schema_metadata) {
storage_->stored_node_labels_.try_insert(label); storage_->stored_node_labels_.try_insert(label);
@ -136,9 +140,12 @@ Result<bool> VertexAccessor::RemoveLabel(LabelId label) {
auto it = std::find(vertex_->labels.begin(), vertex_->labels.end(), label); auto it = std::find(vertex_->labels.begin(), vertex_->labels.end(), label);
if (it == vertex_->labels.end()) return false; if (it == vertex_->labels.end()) return false;
CreateAndLinkDelta(transaction_, vertex_, Delta::AddLabelTag(), label); utils::AtomicMemoryBlock atomic_memory_block{[transaction = transaction_, vertex = vertex_, &label, &it]() {
*it = vertex_->labels.back(); CreateAndLinkDelta(transaction, vertex, Delta::AddLabelTag(), label);
vertex_->labels.pop_back(); *it = vertex->labels.back();
vertex->labels.pop_back();
}};
std::invoke(atomic_memory_block);
/// TODO: some by pointers, some by reference => not good, make it better /// TODO: some by pointers, some by reference => not good, make it better
storage_->constraints_.unique_constraints_->UpdateOnRemoveLabel(label, *vertex_, transaction_->start_timestamp); storage_->constraints_.unique_constraints_->UpdateOnRemoveLabel(label, *vertex_, transaction_->start_timestamp);
@ -262,8 +269,12 @@ Result<PropertyValue> VertexAccessor::SetProperty(PropertyId property, const Pro
// "modify in-place". Additionally, the created delta will make other // "modify in-place". Additionally, the created delta will make other
// transactions get a SERIALIZATION_ERROR. // transactions get a SERIALIZATION_ERROR.
CreateAndLinkDelta(transaction_, vertex_, Delta::SetPropertyTag(), property, current_value); utils::AtomicMemoryBlock atomic_memory_block{
vertex_->properties.SetProperty(property, value); [transaction = transaction_, vertex = vertex_, &value, &property, &current_value]() {
CreateAndLinkDelta(transaction, vertex, Delta::SetPropertyTag(), property, current_value);
vertex->properties.SetProperty(property, value);
}};
std::invoke(atomic_memory_block);
if (!value.IsNull()) { if (!value.IsNull()) {
transaction_->constraint_verification_info.AddedProperty(vertex_); transaction_->constraint_verification_info.AddedProperty(vertex_);
@ -287,20 +298,28 @@ Result<bool> VertexAccessor::InitProperties(const std::map<storage::PropertyId,
if (!PrepareForWrite(transaction_, vertex_)) return Error::SERIALIZATION_ERROR; if (!PrepareForWrite(transaction_, vertex_)) return Error::SERIALIZATION_ERROR;
if (vertex_->deleted) return Error::DELETED_OBJECT; if (vertex_->deleted) return Error::DELETED_OBJECT;
bool result{false};
utils::AtomicMemoryBlock atomic_memory_block{
[&result, &properties, storage = storage_, transaction = transaction_, vertex = vertex_]() {
if (!vertex->properties.InitProperties(properties)) {
result = false;
return;
}
for (const auto &[property, value] : properties) {
CreateAndLinkDelta(transaction, vertex, Delta::SetPropertyTag(), property, PropertyValue());
storage->indices_.UpdateOnSetProperty(property, value, vertex, *transaction);
transaction->manyDeltasCache.Invalidate(vertex, property);
if (!value.IsNull()) {
transaction->constraint_verification_info.AddedProperty(vertex);
} else {
transaction->constraint_verification_info.RemovedProperty(vertex);
}
}
result = true;
}};
std::invoke(atomic_memory_block);
if (!vertex_->properties.InitProperties(properties)) return false; return result;
for (const auto &[property, value] : properties) {
CreateAndLinkDelta(transaction_, vertex_, Delta::SetPropertyTag(), property, PropertyValue());
storage_->indices_.UpdateOnSetProperty(property, value, vertex_, *transaction_);
transaction_->manyDeltasCache.Invalidate(vertex_, property);
if (!value.IsNull()) {
transaction_->constraint_verification_info.AddedProperty(vertex_);
} else {
transaction_->constraint_verification_info.RemovedProperty(vertex_);
}
}
return true;
} }
Result<std::vector<std::tuple<PropertyId, PropertyValue, PropertyValue>>> VertexAccessor::UpdateProperties( Result<std::vector<std::tuple<PropertyId, PropertyValue, PropertyValue>>> VertexAccessor::UpdateProperties(
@ -316,20 +335,28 @@ Result<std::vector<std::tuple<PropertyId, PropertyValue, PropertyValue>>> Vertex
if (vertex_->deleted) return Error::DELETED_OBJECT; if (vertex_->deleted) return Error::DELETED_OBJECT;
auto id_old_new_change = vertex_->properties.UpdateProperties(properties); using ReturnType = decltype(vertex_->properties.UpdateProperties(properties));
std::optional<ReturnType> id_old_new_change;
utils::AtomicMemoryBlock atomic_memory_block{
[storage = storage_, transaction = transaction_, vertex = vertex_, &properties, &id_old_new_change]() {
id_old_new_change.emplace(vertex->properties.UpdateProperties(properties));
if (!id_old_new_change.has_value()) {
return;
}
for (auto &[id, old_value, new_value] : *id_old_new_change) {
storage->indices_.UpdateOnSetProperty(id, new_value, vertex, *transaction);
CreateAndLinkDelta(transaction, vertex, Delta::SetPropertyTag(), id, std::move(old_value));
transaction->manyDeltasCache.Invalidate(vertex, id);
if (!new_value.IsNull()) {
transaction->constraint_verification_info.AddedProperty(vertex);
} else {
transaction->constraint_verification_info.RemovedProperty(vertex);
}
}
}};
std::invoke(atomic_memory_block);
for (auto &[id, old_value, new_value] : id_old_new_change) { return id_old_new_change.has_value() ? std::move(id_old_new_change.value()) : ReturnType{};
storage_->indices_.UpdateOnSetProperty(id, new_value, vertex_, *transaction_);
CreateAndLinkDelta(transaction_, vertex_, Delta::SetPropertyTag(), id, std::move(old_value));
transaction_->manyDeltasCache.Invalidate(vertex_, id);
if (!new_value.IsNull()) {
transaction_->constraint_verification_info.AddedProperty(vertex_);
} else {
transaction_->constraint_verification_info.RemovedProperty(vertex_);
}
}
return id_old_new_change;
} }
Result<std::map<PropertyId, PropertyValue>> VertexAccessor::ClearProperties() { Result<std::map<PropertyId, PropertyValue>> VertexAccessor::ClearProperties() {
@ -342,17 +369,25 @@ Result<std::map<PropertyId, PropertyValue>> VertexAccessor::ClearProperties() {
if (vertex_->deleted) return Error::DELETED_OBJECT; if (vertex_->deleted) return Error::DELETED_OBJECT;
auto properties = vertex_->properties.Properties(); using ReturnType = decltype(vertex_->properties.Properties());
for (const auto &[property, value] : properties) { std::optional<ReturnType> properties;
CreateAndLinkDelta(transaction_, vertex_, Delta::SetPropertyTag(), property, value); utils::AtomicMemoryBlock atomic_memory_block{
storage_->indices_.UpdateOnSetProperty(property, PropertyValue(), vertex_, *transaction_); [storage = storage_, transaction = transaction_, vertex = vertex_, &properties]() {
transaction_->constraint_verification_info.RemovedProperty(vertex_); properties.emplace(vertex->properties.Properties());
transaction_->manyDeltasCache.Invalidate(vertex_, property); if (!properties.has_value()) {
} return;
}
for (const auto &[property, value] : *properties) {
CreateAndLinkDelta(transaction, vertex, Delta::SetPropertyTag(), property, value);
storage->indices_.UpdateOnSetProperty(property, PropertyValue(), vertex, *transaction);
transaction->constraint_verification_info.RemovedProperty(vertex);
transaction->manyDeltasCache.Invalidate(vertex, property);
}
vertex->properties.ClearProperties();
}};
std::invoke(atomic_memory_block);
vertex_->properties.ClearProperties(); return properties.has_value() ? std::move(properties.value()) : ReturnType{};
return std::move(properties);
} }
Result<PropertyValue> VertexAccessor::GetProperty(PropertyId property, View view) const { Result<PropertyValue> VertexAccessor::GetProperty(PropertyId property, View view) const {

View File

@ -0,0 +1,44 @@
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#pragma once
#include <functional>
#include "utils/memory_tracker.hpp"
namespace memgraph::utils {
/// Runs a callable as one uninterrupted unit with respect to memory-limit
/// enforcement.
///
/// The callable is executed while a `MemoryTracker::OutOfMemoryExceptionBlocker`
/// is alive (presumably suppressing out-of-memory exceptions for that scope —
/// see the blocker's own definition). Only after the callable has returned and
/// the blocker has been destroyed is `total_memory_tracker.DoCheck()` called,
/// so any memory-limit violation is reported once the whole block has run.
///
/// The object is neither copyable nor movable; construct it in place and
/// invoke it (e.g. via `std::invoke`).
template <typename Callable>
class [[nodiscard]] AtomicMemoryBlock {
 public:
  /// Takes ownership of `function`, which is stored and run by `operator()`.
  explicit AtomicMemoryBlock(Callable &&function) : function_{std::forward<Callable>(function)} {}

  ~AtomicMemoryBlock() = default;

  // Copying is disabled.
  AtomicMemoryBlock(AtomicMemoryBlock const &) = delete;
  AtomicMemoryBlock &operator=(AtomicMemoryBlock const &) = delete;

  // Moving is disabled.
  AtomicMemoryBlock(AtomicMemoryBlock &&) = delete;
  AtomicMemoryBlock &operator=(AtomicMemoryBlock &&) = delete;

  /// Runs the stored callable under the blocker, then performs the deferred
  /// memory check.
  void operator()() {
    RunWithBlocker();
    // The blocker is already gone here, so this check happens outside its scope.
    total_memory_tracker.DoCheck();
  }

 private:
  /// Invokes the stored callable while an OOM-exception blocker is in scope.
  void RunWithBlocker() {
    utils::MemoryTracker::OutOfMemoryExceptionBlocker blocker;
    function_();
  }

  Callable function_;
};
} // namespace memgraph::utils

View File

@ -124,6 +124,19 @@ void MemoryTracker::Alloc(const int64_t size) {
UpdatePeak(will_be); UpdatePeak(will_be);
} }
/// Compares the tracked amount against the hard limit without allocating.
///
/// Throws `OutOfMemoryException` when a non-zero hard limit is set, the
/// tracked amount is above it, and `MemoryTrackerCanThrow()` returns true.
/// Otherwise returns without side effects.
void MemoryTracker::DoCheck() {
  const auto limit = hard_limit_.load(std::memory_order_relaxed);
  const auto used = amount_.load(std::memory_order_relaxed);

  // Evaluated left to right with short-circuiting: `MemoryTrackerCanThrow()`
  // is consulted only when a limit is set and has actually been exceeded.
  if (!limit || used <= limit || !MemoryTrackerCanThrow()) [[likely]] {
    return;
  }

  // NOTE(review): presumably keeps the allocations made while formatting the
  // message from raising a nested OOM exception — confirm against the blocker.
  MemoryTracker::OutOfMemoryExceptionBlocker exception_blocker;
  throw OutOfMemoryException(
      fmt::format("Memory limit exceeded! Current "
                  "use is {}, while the maximum allowed size for allocation is set to {}.",
                  GetReadableSize(static_cast<double>(used)), GetReadableSize(static_cast<double>(limit))));
}
void MemoryTracker::Free(const int64_t size) { amount_.fetch_sub(size, std::memory_order_relaxed); } void MemoryTracker::Free(const int64_t size) { amount_.fetch_sub(size, std::memory_order_relaxed); }
} // namespace memgraph::utils } // namespace memgraph::utils

View File

@ -49,6 +49,7 @@ class MemoryTracker final {
void Alloc(int64_t size); void Alloc(int64_t size);
void Free(int64_t size); void Free(int64_t size);
void DoCheck();
auto Amount() const { return amount_.load(std::memory_order_relaxed); } auto Amount() const { return amount_.load(std::memory_order_relaxed); }

View File

@ -143,6 +143,7 @@ startup_config_dict = {
"The time duration between two replica checks/pings. If < 1, replicas will NOT be checked at all. NOTE: The MAIN instance allocates a new thread for each REPLICA.", "The time duration between two replica checks/pings. If < 1, replicas will NOT be checked at all. NOTE: The MAIN instance allocates a new thread for each REPLICA.",
), ),
"storage_gc_cycle_sec": ("30", "30", "Storage garbage collector interval (in seconds)."), "storage_gc_cycle_sec": ("30", "30", "Storage garbage collector interval (in seconds)."),
"storage_python_gc_cycle_sec": ("180", "180", "Storage python full garbage collection interval (in seconds)."),
"storage_items_per_batch": ( "storage_items_per_batch": (
"1000000", "1000000",
"1000000", "1000000",

View File

@ -0,0 +1,22 @@
# Compose stack with one ZooKeeper node and one Kafka broker.
# ZooKeeper is published on host port 2181; Kafka is published on host ports
# 9092 (PLAINTEXT listener) and 29092 (PLAINTEXT_HOST listener), both
# advertised as localhost.
version: '3'
services:
  zookeeper:
    image: 'bitnami/zookeeper:3.9.1'
    ports:
      - '2181:2181'
    environment:
      ALLOW_ANONYMOUS_LOGIN: 'yes'

  kafka:
    image: 'bitnami/kafka:3.6.1'
    # The broker connects to ZooKeeper via KAFKA_CFG_ZOOKEEPER_CONNECT below.
    depends_on:
      - zookeeper
    ports:
      - '9092:9092'
      - '29092:29092'
    environment:
      KAFKA_BROKER_ID: '1'
      KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: 'PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
      KAFKA_CFG_LISTENERS: 'PLAINTEXT://:9092,PLAINTEXT_HOST://:29092'
      KAFKA_CFG_ADVERTISED_LISTENERS: 'PLAINTEXT://localhost:9092,PLAINTEXT_HOST://localhost:29092'
      KAFKA_CFG_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      ALLOW_PLAINTEXT_LISTENER: 'yes'

View File

@ -0,0 +1,9 @@
# Compose stack with a single Apache Pulsar container started through
# `bin/pulsar standalone`.
# Container port 8080 is published on host port 6652; port 6650 is published
# unchanged.
version: "3"
services:
  pulsar:
    image: 'apachepulsar/pulsar:2.8.1'
    entrypoint:
      - 'bin/pulsar'
      - 'standalone'
    ports:
      - '6652:8080'
      - '6650:6650'

View File

@ -173,39 +173,49 @@ def run_writer(repetition_count: int, sleep_sec: float, worker_id: int) -> int:
""" """
This writer creates lot of nodes on each write. This writer creates lot of nodes on each write.
Also it checks that query failed if memory limit is tried to be broken Also it checks that query failed if memory limit is tried to be broken
Return:
True if write succeeded
False otherwise
""" """
session = SessionCache.argument_session(args) session = SessionCache.argument_session(args)
def create() -> bool: def try_create() -> bool:
""" """
Returns True if done, False if needs to continue Function tries to create until memory limit is reached
Return:
True if it can continue creating (OOM not reached)
False otherwise
""" """
memory_tracker_data_before_start = get_tracker_data(session) should_continue = True
should_fail = memory_tracker_data_before_start >= 2048
failed = False
try: try:
try_execute( try_execute(
session, session,
f"FOREACH (i in range(1,10000) | CREATE (:Node {{prop:'big string or something like that'}}))", f"FOREACH (i in range(1,10000) | CREATE (:Node {{prop:'big string or something like that'}}))",
) )
except Exception as ex: except Exception as ex:
failed = True
output = str(ex) output = str(ex)
log.info("Exception in create", output) memory_over_2048_mb = False
assert "Memory limit exceeded!" in output memory_tracker_data_after_start = get_tracker_data(session)
if memory_tracker_data_after_start:
memory_over_2048_mb = memory_tracker_data_after_start >= 2048
log.info(
"Exception in create, exception output:",
output,
f"Worker {worker_id} started iteration {curr_repetition}, memory over 2048MB: {memory_over_2048_mb}",
)
has_oom_happend = "Memory limit exceeded!" in output and memory_over_2048_mb
should_continue = not has_oom_happend
if should_fail: return should_continue
assert failed, "Query should have failed"
return False
return True
curr_repetition = 0 curr_repetition = 0
while curr_repetition < repetition_count: while curr_repetition < repetition_count:
log.info(f"Worker {worker_id} started iteration {curr_repetition}") log.info(f"Worker {worker_id} started iteration {curr_repetition}")
should_continue = create() should_continue = try_create()
if not should_continue: if not should_continue:
return True return True
@ -214,6 +224,7 @@ def run_writer(repetition_count: int, sleep_sec: float, worker_id: int) -> int:
log.info(f"Worker {worker_id} created chain in iteration {curr_repetition}") log.info(f"Worker {worker_id} created chain in iteration {curr_repetition}")
curr_repetition += 1 curr_repetition += 1
return False
def execute_function(worker: Worker) -> Worker: def execute_function(worker: Worker) -> Worker: