diff --git a/.github/workflows/diff.yaml b/.github/workflows/diff.yaml index 340d11711..ba00941b8 100644 --- a/.github/workflows/diff.yaml +++ b/.github/workflows/diff.yaml @@ -229,10 +229,6 @@ jobs: # branches and tags. (default: 1) fetch-depth: 0 - - name: Check e2e service dependencies - run: | - cd tests/e2e - ./dependency_check.sh - name: Build release binaries run: | @@ -271,6 +267,13 @@ jobs: cd build ctest -R memgraph__unit --output-on-failure -j$THREADS + - name: Ensure Kafka and Pulsar are up + run: | + cd tests/e2e/streams/kafka + docker-compose up -d + cd ../pulsar + docker-compose up -d + - name: Run e2e tests run: | cd tests @@ -279,6 +282,14 @@ jobs: cd e2e ./run.sh + - name: Ensure Kafka and Pulsar are down + if: always() + run: | + cd tests/e2e/streams/kafka + docker-compose down + cd ../pulsar + docker-compose down + - name: Run stress test (plain) run: | cd tests/stress diff --git a/.github/workflows/release_centos8.yaml b/.github/workflows/release_centos8.yaml deleted file mode 100644 index c82916df6..000000000 --- a/.github/workflows/release_centos8.yaml +++ /dev/null @@ -1,318 +0,0 @@ -name: Release CentOS 8 - -on: - workflow_dispatch: - inputs: - build_type: - type: choice - description: "Memgraph Build type. Default value is Release." - default: 'Release' - options: - - Release - - RelWithDebInfo - - schedule: - - cron: "0 22 * * *" - -jobs: - community_build: - name: "Community build" - runs-on: [self-hosted, Linux, X64, CentOS8] - env: - THREADS: 24 - MEMGRAPH_ENTERPRISE_LICENSE: ${{ secrets.MEMGRAPH_ENTERPRISE_LICENSE }} - MEMGRAPH_ORGANIZATION_NAME: ${{ secrets.MEMGRAPH_ORGANIZATION_NAME }} - timeout-minutes: 960 - - steps: - - name: Set up repository - uses: actions/checkout@v3 - with: - # Number of commits to fetch. `0` indicates all history for all - # branches and tags. (default: 1) - fetch-depth: 0 - - - name: Build community binaries - run: | - # Activate toolchain. - source /opt/toolchain-v4/activate - - # Initialize dependencies. - ./init - - # Set default build_type to Release - INPUT_BUILD_TYPE=${{ github.event.inputs.build_type }} - BUILD_TYPE=${INPUT_BUILD_TYPE:-"Release"} - - # Build community binaries. - cd build - cmake -DCMAKE_BUILD_TYPE=$BUILD_TYPE -DMG_ENTERPRISE=OFF .. - make -j$THREADS - - - name: Run unit tests - run: | - # Activate toolchain. - source /opt/toolchain-v4/activate - - # Run unit tests. - cd build - ctest -R memgraph__unit --output-on-failure - - coverage_build: - name: "Coverage build" - runs-on: [self-hosted, Linux, X64, CentOS8] - env: - THREADS: 24 - MEMGRAPH_ENTERPRISE_LICENSE: ${{ secrets.MEMGRAPH_ENTERPRISE_LICENSE }} - MEMGRAPH_ORGANIZATION_NAME: ${{ secrets.MEMGRAPH_ORGANIZATION_NAME }} - - steps: - - name: Set up repository - uses: actions/checkout@v3 - with: - # Number of commits to fetch. `0` indicates all history for all - # branches and tags. (default: 1) - fetch-depth: 0 - - - name: Build coverage binaries - run: | - # Activate toolchain. - source /opt/toolchain-v4/activate - - # Initialize dependencies. - ./init - - # Build coverage binaries. - cd build - cmake -DTEST_COVERAGE=ON .. - make -j$THREADS memgraph__unit - - - name: Run unit tests - run: | - # Activate toolchain. - source /opt/toolchain-v4/activate - - # Run unit tests. - cd build - ctest -R memgraph__unit --output-on-failure - - - name: Compute code coverage - run: | - # Activate toolchain. - source /opt/toolchain-v4/activate - - # Compute code coverage. - cd tools/github - ./coverage_convert - - # Package code coverage. - cd generated - tar -czf code_coverage.tar.gz coverage.json html report.json summary.rmu - - - name: Save code coverage - uses: actions/upload-artifact@v3 - with: - name: "Code coverage" - path: tools/github/generated/code_coverage.tar.gz - - debug_build: - name: "Debug build" - runs-on: [self-hosted, Linux, X64, CentOS8] - env: - THREADS: 24 - MEMGRAPH_ENTERPRISE_LICENSE: ${{ secrets.MEMGRAPH_ENTERPRISE_LICENSE }} - MEMGRAPH_ORGANIZATION_NAME: ${{ secrets.MEMGRAPH_ORGANIZATION_NAME }} - - steps: - - name: Set up repository - uses: actions/checkout@v3 - with: - # Number of commits to fetch. `0` indicates all history for all - # branches and tags. (default: 1) - fetch-depth: 0 - - - name: Build debug binaries - run: | - # Activate toolchain. - source /opt/toolchain-v4/activate - - # Initialize dependencies. - ./init - - # Build debug binaries. - cd build - cmake .. - make -j$THREADS - - - name: Run leftover CTest tests - run: | - # Activate toolchain. - source /opt/toolchain-v4/activate - - # Run leftover CTest tests (all except unit and benchmark tests). - cd build - ctest -E "(memgraph__unit|memgraph__benchmark)" --output-on-failure - - - name: Run drivers tests - run: | - ./tests/drivers/run.sh - - - name: Run integration tests - run: | - tests/integration/run.sh - - - name: Run cppcheck and clang-format - run: | - # Activate toolchain. - source /opt/toolchain-v4/activate - - # Run cppcheck and clang-format. - cd tools/github - ./cppcheck_and_clang_format diff - - - name: Save cppcheck and clang-format errors - uses: actions/upload-artifact@v3 - with: - name: "Code coverage" - path: tools/github/cppcheck_and_clang_format.txt - - release_build: - name: "Release build" - runs-on: [self-hosted, Linux, X64, CentOS8] - env: - THREADS: 24 - MEMGRAPH_ENTERPRISE_LICENSE: ${{ secrets.MEMGRAPH_ENTERPRISE_LICENSE }} - MEMGRAPH_ORGANIZATION_NAME: ${{ secrets.MEMGRAPH_ORGANIZATION_NAME }} - timeout-minutes: 960 - - steps: - - name: Set up repository - uses: actions/checkout@v3 - with: - # Number of commits to fetch. `0` indicates all history for all - # branches and tags. (default: 1) - fetch-depth: 0 - - - name: Build release binaries - run: | - # Activate toolchain. - source /opt/toolchain-v4/activate - - # Initialize dependencies. - ./init - - # Set default build_type to Release - INPUT_BUILD_TYPE=${{ github.event.inputs.build_type }} - BUILD_TYPE=${INPUT_BUILD_TYPE:-"Release"} - - # Build release binaries. - cd build - cmake -DCMAKE_BUILD_TYPE=$BUILD_TYPE .. - make -j$THREADS - - - name: Create enterprise RPM package - run: | - # Activate toolchain. - source /opt/toolchain-v4/activate - - cd build - - # create mgconsole - # we use the -B to force the build - make -j$THREADS -B mgconsole - - # Create enterprise RPM package. - mkdir output && cd output - cpack -G RPM --config ../CPackConfig.cmake - rpmlint memgraph*.rpm - - - name: Save enterprise RPM package - uses: actions/upload-artifact@v3 - with: - name: "Enterprise RPM package" - path: build/output/memgraph*.rpm - - - name: Run micro benchmark tests - run: | - # Activate toolchain. - source /opt/toolchain-v4/activate - - # Run micro benchmark tests. - cd build - # The `eval` benchmark needs a large stack limit. - ulimit -s 262144 - ctest -R memgraph__benchmark -V - - - name: Run macro benchmark tests - run: | - cd tests/macro_benchmark - ./harness QuerySuite MemgraphRunner \ - --groups aggregation 1000_create unwind_create dense_expand match \ - --no-strict - - - name: Run parallel macro benchmark tests - run: | - cd tests/macro_benchmark - ./harness QueryParallelSuite MemgraphRunner \ - --groups aggregation_parallel create_parallel bfs_parallel \ - --num-database-workers 9 --num-clients-workers 30 \ - --no-strict - - - name: Run GQL Behave tests - run: | - cd tests - ./setup.sh /opt/toolchain-v4/activate - cd gql_behave - ./continuous_integration - - - name: Save quality assurance status - uses: actions/upload-artifact@v3 - with: - name: "GQL Behave Status" - path: | - tests/gql_behave/gql_behave_status.csv - tests/gql_behave/gql_behave_status.html - - - name: Run unit tests - run: | - # Activate toolchain. - source /opt/toolchain-v4/activate - - # Run unit tests. - cd build - ctest -R memgraph__unit --output-on-failure - - - name: Run e2e tests - run: | - cd tests - ./setup.sh /opt/toolchain-v4/activate - source ve3/bin/activate_e2e - cd e2e - ./run.sh - - - name: Run stress test (plain) - run: | - cd tests/stress - ./continuous_integration - - - name: Run stress test (SSL) - run: | - cd tests/stress - ./continuous_integration --use-ssl - - - name: Run stress test (large) - run: | - cd tests/stress - ./continuous_integration --large-dataset - - - name: Run durability test (plain) - run: | - cd tests/stress - source ve3/bin/activate - python3 durability --num-steps 5 - - - name: Run durability test (large) - run: | - cd tests/stress - source ve3/bin/activate - python3 durability --num-steps 20 diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index ba8784b19..ab6a8c54d 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -53,7 +53,10 @@ set_target_properties(memgraph PROPERTIES OUTPUT_NAME "memgraph-${MEMGRAPH_VERSION}_${CMAKE_BUILD_TYPE}" # Output the executable in main binary dir. - RUNTIME_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}) + RUNTIME_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR} + + POSITION_INDEPENDENT_CODE ON + ) # Create symlink to the built executable. add_custom_command(TARGET memgraph POST_BUILD diff --git a/src/flags/general.cpp b/src/flags/general.cpp index 6bee2e5b3..fdc301fa2 100644 --- a/src/flags/general.cpp +++ b/src/flags/general.cpp @@ -1,4 +1,4 @@ -// Copyright 2023 Memgraph Ltd. +// Copyright 2024 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source @@ -66,6 +66,9 @@ DEFINE_bool(allow_load_csv, true, "Controls whether LOAD CSV clause is allowed i // NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) DEFINE_VALIDATED_uint64(storage_gc_cycle_sec, 30, "Storage garbage collector interval (in seconds).", FLAG_IN_RANGE(1, 24UL * 3600)); +// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) +DEFINE_VALIDATED_uint64(storage_python_gc_cycle_sec, 180, + "Storage python full garbage collection interval (in seconds).", FLAG_IN_RANGE(1, 24UL * 3600)); // NOTE: The `storage_properties_on_edges` flag must be the same here and in // `mg_import_csv`. If you change it, make sure to change it there as well. // NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) diff --git a/src/flags/general.hpp b/src/flags/general.hpp index 890f32cd6..7cc353564 100644 --- a/src/flags/general.hpp +++ b/src/flags/general.hpp @@ -1,4 +1,4 @@ -// Copyright 2023 Memgraph Ltd. +// Copyright 2024 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source @@ -52,6 +52,8 @@ DECLARE_bool(allow_load_csv); // Storage flags. // NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) DECLARE_uint64(storage_gc_cycle_sec); +// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) +DECLARE_uint64(storage_python_gc_cycle_sec); // NOTE: The `storage_properties_on_edges` flag must be the same here and in // `mg_import_csv`. If you change it, make sure to change it there as well. // NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) diff --git a/src/memgraph.cpp b/src/memgraph.cpp index 057b30982..34ada7c47 100644 --- a/src/memgraph.cpp +++ b/src/memgraph.cpp @@ -1,4 +1,4 @@ -// Copyright 2023 Memgraph Ltd. +// Copyright 2024 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source @@ -163,7 +163,7 @@ int main(int argc, char **argv) { // libstd. auto gil = memgraph::py::EnsureGIL(); // NOLINTNEXTLINE(hicpp-signed-bitwise) - auto *flag = PyLong_FromLong(RTLD_NOW | RTLD_DEEPBIND); + auto *flag = PyLong_FromLong(RTLD_NOW); auto *setdl = PySys_GetObject("setdlopenflags"); MG_ASSERT(setdl); auto *arg = PyTuple_New(1); @@ -184,6 +184,10 @@ int main(int argc, char **argv) { "https://memgr.ph/python")); } + memgraph::utils::Scheduler python_gc_scheduler; + python_gc_scheduler.Run("Python GC", std::chrono::seconds(FLAGS_storage_python_gc_cycle_sec), + [] { memgraph::query::procedure::PyCollectGarbage(); }); + // Initialize the communication library. memgraph::communication::SSLInit sslInit; @@ -318,6 +322,11 @@ int main(int argc, char **argv) { .durability_directory = FLAGS_data_directory + "/rocksdb_durability", .wal_directory = FLAGS_data_directory + "/rocksdb_wal"}, .storage_mode = memgraph::flags::ParseStorageMode()}; + + memgraph::utils::Scheduler jemalloc_purge_scheduler; + jemalloc_purge_scheduler.Run("Jemalloc purge", std::chrono::seconds(FLAGS_storage_gc_cycle_sec), + [] { memgraph::memory::PurgeUnusedMemory(); }); + if (FLAGS_storage_snapshot_interval_sec == 0) { if (FLAGS_storage_wal_enabled) { LOG_FATAL( diff --git a/src/query/procedure/module.hpp b/src/query/procedure/module.hpp index 8963173e7..f5027dafa 100644 --- a/src/query/procedure/module.hpp +++ b/src/query/procedure/module.hpp @@ -1,4 +1,4 @@ -// Copyright 2023 Memgraph Ltd. +// Copyright 2024 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source @@ -169,7 +169,7 @@ class ModuleRegistry final { // mentioned library will be first performed in the already existing binded // libraries and then the global namespace. // RTLD_DEEPBIND => https://linux.die.net/man/3/dlopen - SharedLibraryHandle libstd_handle{"libstdc++.so.6", RTLD_NOW | RTLD_LOCAL | RTLD_DEEPBIND, kLibstdcppWarning}; + SharedLibraryHandle libstd_handle{"libstdc++.so.6", RTLD_NOW | RTLD_LOCAL, kLibstdcppWarning}; #endif std::vector modules_dirs_; std::filesystem::path internal_module_dir_; diff --git a/src/query/procedure/py_module.cpp b/src/query/procedure/py_module.cpp index 3fd09b0ab..264dee5ba 100644 --- a/src/query/procedure/py_module.cpp +++ b/src/query/procedure/py_module.cpp @@ -1,4 +1,4 @@ -// Copyright 2023 Memgraph Ltd. +// Copyright 2024 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source @@ -13,6 +13,7 @@ #include #include +#include #include #include #include @@ -57,7 +58,6 @@ PyObject *gMgpValueConversionError{nullptr}; // NOLINT(cppcoreguidelines-avo PyObject *gMgpSerializationError{nullptr}; // NOLINT(cppcoreguidelines-avoid-non-const-global-variables) PyObject *gMgpAuthorizationError{nullptr}; // NOLINT(cppcoreguidelines-avoid-non-const-global-variables) -constexpr bool kStartGarbageCollection{true}; constexpr auto kMicrosecondsInMillisecond{1000}; constexpr auto kMicrosecondsInSecond{1000000}; @@ -867,6 +867,24 @@ py::Object MgpListToPyTuple(mgp_list *list, PyObject *py_graph) { return MgpListToPyTuple(list, reinterpret_cast(py_graph)); } +void PyCollectGarbage() { + if (!Py_IsInitialized() || _Py_IsFinalizing()) { + // Calling EnsureGIL will crash the program if this is true. + return; + } + + auto gil = py::EnsureGIL(); + + py::Object gc(PyImport_ImportModule("gc")); + if (!gc) { + LOG_FATAL(py::FetchError().value()); + } + + if (!gc.CallMethod("collect")) { + LOG_FATAL(py::FetchError().value()); + } +} + namespace { struct RecordFieldCache { PyObject *key; @@ -1027,24 +1045,8 @@ std::optional AddMultipleBatchRecordsFromPython(mgp_result *r return std::nullopt; } -std::function PyObjectCleanup(py::Object &py_object, bool start_gc) { - return [py_object, start_gc]() { - if (start_gc) { - // Run `gc.collect` (reference cycle-detection) explicitly, so that we are - // sure the procedure cleaned up everything it held references to. If the - // user stored a reference to one of our `_mgp` instances then the - // internally used `mgp_*` structs will stay unfreed and a memory leak - // will be reported at the end of the query execution. - py::Object gc(PyImport_ImportModule("gc")); - if (!gc) { - LOG_FATAL(py::FetchError().value()); - } - - if (!gc.CallMethod("collect")) { - LOG_FATAL(py::FetchError().value()); - } - } - +std::function PyObjectCleanup(py::Object &py_object) { + return [py_object]() { // After making sure all references from our side have been cleared, // invalidate the `_mgp.Graph` object. If the user kept a reference to one // of our `_mgp` instances then this will prevent them from using those @@ -1095,7 +1097,7 @@ void CallPythonProcedure(const py::Object &py_cb, mgp_list *args, mgp_graph *gra std::optional maybe_msg; { py::Object py_graph(MakePyGraph(graph, memory)); - utils::OnScopeExit clean_up(PyObjectCleanup(py_graph, !is_batched)); + utils::OnScopeExit clean_up(PyObjectCleanup(py_graph)); if (py_graph) { maybe_msg = error_to_msg(call(py_graph)); } else { @@ -1110,22 +1112,11 @@ void CallPythonProcedure(const py::Object &py_cb, mgp_list *args, mgp_graph *gra void CallPythonCleanup(const py::Object &py_cleanup) { auto gil = py::EnsureGIL(); - auto py_res = py_cleanup.Call(); - - py::Object gc(PyImport_ImportModule("gc")); - if (!gc) { - LOG_FATAL(py::FetchError().value()); - } - - if (!gc.CallMethod("collect")) { - LOG_FATAL(py::FetchError().value()); - } } void CallPythonInitializer(const py::Object &py_initializer, mgp_list *args, mgp_graph *graph, mgp_memory *memory) { auto gil = py::EnsureGIL(); - auto error_to_msg = [](const std::optional &exc_info) -> std::optional { if (!exc_info) return std::nullopt; // Here we tell the traceback formatter to skip the first line of the @@ -1146,7 +1137,7 @@ void CallPythonInitializer(const py::Object &py_initializer, mgp_list *args, mgp std::optional maybe_msg; { py::Object py_graph(MakePyGraph(graph, memory)); - utils::OnScopeExit clean_up_graph(PyObjectCleanup(py_graph, !kStartGarbageCollection)); + utils::OnScopeExit clean_up_graph(PyObjectCleanup(py_graph)); if (py_graph) { maybe_msg = error_to_msg(call(py_graph)); } else { @@ -1194,8 +1185,8 @@ void CallPythonTransformation(const py::Object &py_cb, mgp_messages *msgs, mgp_g py::Object py_graph(MakePyGraph(graph, memory)); py::Object py_messages(MakePyMessages(msgs, memory)); - utils::OnScopeExit clean_up_graph(PyObjectCleanup(py_graph, kStartGarbageCollection)); - utils::OnScopeExit clean_up_messages(PyObjectCleanup(py_messages, kStartGarbageCollection)); + utils::OnScopeExit clean_up_graph(PyObjectCleanup(py_graph)); + utils::OnScopeExit clean_up_messages(PyObjectCleanup(py_messages)); if (py_graph && py_messages) { maybe_msg = error_to_msg(call(py_graph, py_messages)); @@ -1264,7 +1255,7 @@ void CallPythonFunction(const py::Object &py_cb, mgp_list *args, mgp_graph *grap std::optional maybe_msg; { py::Object py_graph(MakePyGraph(graph, memory)); - utils::OnScopeExit clean_up(PyObjectCleanup(py_graph, kStartGarbageCollection)); + utils::OnScopeExit clean_up(PyObjectCleanup(py_graph)); if (py_graph) { auto maybe_result = call(py_graph); if (!maybe_result.HasError()) { diff --git a/src/query/procedure/py_module.hpp b/src/query/procedure/py_module.hpp index 85ad82b3e..fe93b5c51 100644 --- a/src/query/procedure/py_module.hpp +++ b/src/query/procedure/py_module.hpp @@ -1,4 +1,4 @@ -// Copyright 2022 Memgraph Ltd. +// Copyright 2024 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source @@ -79,4 +79,7 @@ py::Object ImportPyModule(const char *, mgp_module *); /// Return nullptr and set appropriate Python exception on failure. py::Object ReloadPyModule(PyObject *, mgp_module *); +/// Call full python circular reference garbage collection (all generations) +void PyCollectGarbage(); + } // namespace memgraph::query::procedure diff --git a/src/storage/v2/edge_accessor.cpp b/src/storage/v2/edge_accessor.cpp index 7e7166117..09b035e80 100644 --- a/src/storage/v2/edge_accessor.cpp +++ b/src/storage/v2/edge_accessor.cpp @@ -1,4 +1,4 @@ -// Copyright 2023 Memgraph Ltd. +// Copyright 2024 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source @@ -21,6 +21,7 @@ #include "storage/v2/result.hpp" #include "storage/v2/storage.hpp" #include "storage/v2/vertex_accessor.hpp" +#include "utils/atomic_memory_block.hpp" #include "utils/memory_tracker.hpp" namespace memgraph::storage { @@ -126,24 +127,28 @@ Result EdgeAccessor::SetProperty(PropertyId property, co if (!PrepareForWrite(transaction_, edge_.ptr)) return Error::SERIALIZATION_ERROR; if (edge_.ptr->deleted) return Error::DELETED_OBJECT; - - auto current_value = edge_.ptr->properties.GetProperty(property); - // We could skip setting the value if the previous one is the same to the new - // one. This would save some memory as a delta would not be created as well as - // avoid copying the value. The reason we are not doing that is because the - // current code always follows the logical pattern of "create a delta" and - // "modify in-place". Additionally, the created delta will make other - // transactions get a SERIALIZATION_ERROR. - - CreateAndLinkDelta(transaction_, edge_.ptr, Delta::SetPropertyTag(), property, current_value); - edge_.ptr->properties.SetProperty(property, value); + using ReturnType = decltype(edge_.ptr->properties.GetProperty(property)); + std::optional current_value; + utils::AtomicMemoryBlock atomic_memory_block{ + [¤t_value, &property, &value, transaction = transaction_, edge = edge_]() { + current_value.emplace(edge.ptr->properties.GetProperty(property)); + // We could skip setting the value if the previous one is the same to the new + // one. This would save some memory as a delta would not be created as well as + // avoid copying the value. The reason we are not doing that is because the + // current code always follows the logical pattern of "create a delta" and + // "modify in-place". Additionally, the created delta will make other + // transactions get a SERIALIZATION_ERROR. + CreateAndLinkDelta(transaction, edge.ptr, Delta::SetPropertyTag(), property, *current_value); + edge.ptr->properties.SetProperty(property, value); + }}; + std::invoke(atomic_memory_block); if (transaction_->IsDiskStorage()) { ModifiedEdgeInfo modified_edge(Delta::Action::SET_PROPERTY, from_vertex_->gid, to_vertex_->gid, edge_type_, edge_); transaction_->AddModifiedEdge(Gid(), modified_edge); } - return std::move(current_value); + return std::move(*current_value); } Result EdgeAccessor::InitProperties(const std::map &properties) { @@ -157,9 +162,12 @@ Result EdgeAccessor::InitProperties(const std::mapdeleted) return Error::DELETED_OBJECT; if (!edge_.ptr->properties.InitProperties(properties)) return false; - for (const auto &[property, _] : properties) { - CreateAndLinkDelta(transaction_, edge_.ptr, Delta::SetPropertyTag(), property, PropertyValue()); - } + utils::AtomicMemoryBlock atomic_memory_block{[&properties, transaction_ = transaction_, edge_ = edge_]() { + for (const auto &[property, _] : properties) { + CreateAndLinkDelta(transaction_, edge_.ptr, Delta::SetPropertyTag(), property, PropertyValue()); + } + }}; + std::invoke(atomic_memory_block); return true; } @@ -175,13 +183,18 @@ Result>> EdgeAc if (edge_.ptr->deleted) return Error::DELETED_OBJECT; - auto id_old_new_change = edge_.ptr->properties.UpdateProperties(properties); + using ReturnType = decltype(edge_.ptr->properties.UpdateProperties(properties)); + std::optional id_old_new_change; + utils::AtomicMemoryBlock atomic_memory_block{ + [transaction_ = transaction_, edge_ = edge_, &properties, &id_old_new_change]() { + id_old_new_change.emplace(edge_.ptr->properties.UpdateProperties(properties)); + for (auto &[property, old_value, new_value] : *id_old_new_change) { + CreateAndLinkDelta(transaction_, edge_.ptr, Delta::SetPropertyTag(), property, std::move(old_value)); + } + }}; + std::invoke(atomic_memory_block); - for (auto &[property, old_value, new_value] : id_old_new_change) { - CreateAndLinkDelta(transaction_, edge_.ptr, Delta::SetPropertyTag(), property, std::move(old_value)); - } - - return id_old_new_change; + return id_old_new_change.has_value() ? std::move(id_old_new_change.value()) : ReturnType{}; } Result> EdgeAccessor::ClearProperties() { @@ -193,14 +206,19 @@ Result> EdgeAccessor::ClearProperties() { if (edge_.ptr->deleted) return Error::DELETED_OBJECT; - auto properties = edge_.ptr->properties.Properties(); - for (const auto &property : properties) { - CreateAndLinkDelta(transaction_, edge_.ptr, Delta::SetPropertyTag(), property.first, property.second); - } + using ReturnType = decltype(edge_.ptr->properties.Properties()); + std::optional properties; + utils::AtomicMemoryBlock atomic_memory_block{[&properties, transaction_ = transaction_, edge_ = edge_]() { + properties.emplace(edge_.ptr->properties.Properties()); + for (const auto &property : *properties) { + CreateAndLinkDelta(transaction_, edge_.ptr, Delta::SetPropertyTag(), property.first, property.second); + } - edge_.ptr->properties.ClearProperties(); + edge_.ptr->properties.ClearProperties(); + }}; + std::invoke(atomic_memory_block); - return std::move(properties); + return properties.has_value() ? std::move(properties.value()) : ReturnType{}; } Result EdgeAccessor::GetProperty(PropertyId property, View view) const { diff --git a/src/storage/v2/inmemory/storage.cpp b/src/storage/v2/inmemory/storage.cpp index 08aa896bf..20a235f07 100644 --- a/src/storage/v2/inmemory/storage.cpp +++ b/src/storage/v2/inmemory/storage.cpp @@ -1,4 +1,4 @@ -// Copyright 2023 Memgraph Ltd. +// Copyright 2024 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source @@ -26,6 +26,7 @@ #include "storage/v2/inmemory/replication/recovery.hpp" #include "storage/v2/inmemory/unique_constraints.hpp" #include "storage/v2/property_value.hpp" +#include "utils/atomic_memory_block.hpp" #include "utils/resource_lock.hpp" #include "utils/stat.hpp" @@ -144,7 +145,6 @@ InMemoryStorage::InMemoryStorage(Config config, StorageMode storage_mode) gc_runner_.Run("Storage GC", config_.gc.interval, [this] { this->FreeMemory(std::unique_lock{main_lock_, std::defer_lock}); }); - gc_jemalloc_runner_.Run("Jemalloc GC", config_.gc.interval, [] { memory::PurgeUnusedMemory(); }); } if (timestamp_ == kTimestampInitialId) { commit_log_.emplace(); @@ -158,7 +158,6 @@ InMemoryStorage::InMemoryStorage(Config config) : InMemoryStorage(config, Storag InMemoryStorage::~InMemoryStorage() { if (config_.gc.type == Config::Gc::Type::PERIODIC) { gc_runner_.Stop(); - gc_jemalloc_runner_.Stop(); } { // Stop replication (Stop all clients or stop the REPLICA server) @@ -338,18 +337,22 @@ Result InMemoryStorage::InMemoryAccessor::CreateEdge(VertexAccesso delta->prev.Set(&*it); } } + utils::AtomicMemoryBlock atomic_memory_block{ + [this, edge, from_vertex = from_vertex, edge_type = edge_type, to_vertex = to_vertex]() { + CreateAndLinkDelta(&transaction_, from_vertex, Delta::RemoveOutEdgeTag(), edge_type, to_vertex, edge); + from_vertex->out_edges.emplace_back(edge_type, to_vertex, edge); - CreateAndLinkDelta(&transaction_, from_vertex, Delta::RemoveOutEdgeTag(), edge_type, to_vertex, edge); - from_vertex->out_edges.emplace_back(edge_type, to_vertex, edge); + CreateAndLinkDelta(&transaction_, to_vertex, Delta::RemoveInEdgeTag(), edge_type, from_vertex, edge); + to_vertex->in_edges.emplace_back(edge_type, from_vertex, edge); - CreateAndLinkDelta(&transaction_, to_vertex, Delta::RemoveInEdgeTag(), edge_type, from_vertex, edge); - to_vertex->in_edges.emplace_back(edge_type, from_vertex, edge); + transaction_.manyDeltasCache.Invalidate(from_vertex, edge_type, EdgeDirection::OUT); + transaction_.manyDeltasCache.Invalidate(to_vertex, edge_type, EdgeDirection::IN); - transaction_.manyDeltasCache.Invalidate(from_vertex, edge_type, EdgeDirection::OUT); - transaction_.manyDeltasCache.Invalidate(to_vertex, edge_type, EdgeDirection::IN); + // Increment edge count. + storage_->edge_count_.fetch_add(1, std::memory_order_acq_rel); + }}; - // Increment edge count. - storage_->edge_count_.fetch_add(1, std::memory_order_acq_rel); + std::invoke(atomic_memory_block); return EdgeAccessor(edge, edge_type, from_vertex, to_vertex, storage_, &transaction_); } @@ -433,18 +436,22 @@ Result InMemoryStorage::InMemoryAccessor::CreateEdgeEx(VertexAcces delta->prev.Set(&*it); } } + utils::AtomicMemoryBlock atomic_memory_block{ + [this, edge, from_vertex = from_vertex, edge_type = edge_type, to_vertex = to_vertex]() { + CreateAndLinkDelta(&transaction_, from_vertex, Delta::RemoveOutEdgeTag(), edge_type, to_vertex, edge); + from_vertex->out_edges.emplace_back(edge_type, to_vertex, edge); - CreateAndLinkDelta(&transaction_, from_vertex, Delta::RemoveOutEdgeTag(), edge_type, to_vertex, edge); - from_vertex->out_edges.emplace_back(edge_type, to_vertex, edge); + CreateAndLinkDelta(&transaction_, to_vertex, Delta::RemoveInEdgeTag(), edge_type, from_vertex, edge); + to_vertex->in_edges.emplace_back(edge_type, from_vertex, edge); - CreateAndLinkDelta(&transaction_, to_vertex, Delta::RemoveInEdgeTag(), edge_type, from_vertex, edge); - to_vertex->in_edges.emplace_back(edge_type, from_vertex, edge); + transaction_.manyDeltasCache.Invalidate(from_vertex, edge_type, EdgeDirection::OUT); + transaction_.manyDeltasCache.Invalidate(to_vertex, edge_type, EdgeDirection::IN); - transaction_.manyDeltasCache.Invalidate(from_vertex, edge_type, EdgeDirection::OUT); - transaction_.manyDeltasCache.Invalidate(to_vertex, edge_type, EdgeDirection::IN); + // Increment edge count. + storage_->edge_count_.fetch_add(1, std::memory_order_acq_rel); + }}; - // Increment edge count. - storage_->edge_count_.fetch_add(1, std::memory_order_acq_rel); + std::invoke(atomic_memory_block); return EdgeAccessor(edge, edge_type, from_vertex, to_vertex, storage_, &transaction_); } @@ -534,18 +541,22 @@ Result InMemoryStorage::InMemoryAccessor::EdgeSetFrom(EdgeAccessor return Error::DELETED_OBJECT; } } + utils::AtomicMemoryBlock atomic_memory_block{ + [this, edge_ref, old_from_vertex, new_from_vertex, edge_type, to_vertex]() { + CreateAndLinkDelta(&transaction_, old_from_vertex, Delta::AddOutEdgeTag(), edge_type, to_vertex, edge_ref); + CreateAndLinkDelta(&transaction_, to_vertex, Delta::AddInEdgeTag(), edge_type, old_from_vertex, edge_ref); - CreateAndLinkDelta(&transaction_, old_from_vertex, Delta::AddOutEdgeTag(), edge_type, to_vertex, edge_ref); - CreateAndLinkDelta(&transaction_, to_vertex, Delta::AddInEdgeTag(), edge_type, old_from_vertex, edge_ref); + CreateAndLinkDelta(&transaction_, new_from_vertex, Delta::RemoveOutEdgeTag(), edge_type, to_vertex, edge_ref); + new_from_vertex->out_edges.emplace_back(edge_type, to_vertex, edge_ref); + CreateAndLinkDelta(&transaction_, to_vertex, Delta::RemoveInEdgeTag(), edge_type, new_from_vertex, edge_ref); + to_vertex->in_edges.emplace_back(edge_type, new_from_vertex, edge_ref); - CreateAndLinkDelta(&transaction_, new_from_vertex, Delta::RemoveOutEdgeTag(), edge_type, to_vertex, edge_ref); - new_from_vertex->out_edges.emplace_back(edge_type, to_vertex, edge_ref); - CreateAndLinkDelta(&transaction_, to_vertex, Delta::RemoveInEdgeTag(), edge_type, new_from_vertex, edge_ref); - to_vertex->in_edges.emplace_back(edge_type, new_from_vertex, edge_ref); + transaction_.manyDeltasCache.Invalidate(new_from_vertex, edge_type, EdgeDirection::OUT); + transaction_.manyDeltasCache.Invalidate(old_from_vertex, edge_type, EdgeDirection::OUT); + transaction_.manyDeltasCache.Invalidate(to_vertex, edge_type, EdgeDirection::IN); + }}; - transaction_.manyDeltasCache.Invalidate(new_from_vertex, edge_type, EdgeDirection::OUT); - transaction_.manyDeltasCache.Invalidate(old_from_vertex, edge_type, EdgeDirection::OUT); - transaction_.manyDeltasCache.Invalidate(to_vertex, edge_type, EdgeDirection::IN); + std::invoke(atomic_memory_block); return EdgeAccessor(edge_ref, edge_type, new_from_vertex, to_vertex, storage_, &transaction_); } @@ -636,17 +647,22 @@ Result InMemoryStorage::InMemoryAccessor::EdgeSetTo(EdgeAccessor * } } - CreateAndLinkDelta(&transaction_, from_vertex, Delta::AddOutEdgeTag(), edge_type, old_to_vertex, edge_ref); - CreateAndLinkDelta(&transaction_, old_to_vertex, Delta::AddInEdgeTag(), edge_type, from_vertex, edge_ref); + utils::AtomicMemoryBlock atomic_memory_block{ + [this, edge_ref, old_to_vertex, from_vertex, edge_type, new_to_vertex]() { + CreateAndLinkDelta(&transaction_, from_vertex, Delta::AddOutEdgeTag(), edge_type, old_to_vertex, edge_ref); + CreateAndLinkDelta(&transaction_, old_to_vertex, Delta::AddInEdgeTag(), edge_type, from_vertex, edge_ref); - CreateAndLinkDelta(&transaction_, from_vertex, Delta::RemoveOutEdgeTag(), edge_type, new_to_vertex, edge_ref); - from_vertex->out_edges.emplace_back(edge_type, new_to_vertex, edge_ref); - CreateAndLinkDelta(&transaction_, new_to_vertex, Delta::RemoveInEdgeTag(), edge_type, from_vertex, edge_ref); - new_to_vertex->in_edges.emplace_back(edge_type, from_vertex, edge_ref); + CreateAndLinkDelta(&transaction_, from_vertex, Delta::RemoveOutEdgeTag(), edge_type, new_to_vertex, edge_ref); + from_vertex->out_edges.emplace_back(edge_type, new_to_vertex, edge_ref); + CreateAndLinkDelta(&transaction_, new_to_vertex, Delta::RemoveInEdgeTag(), edge_type, from_vertex, edge_ref); + new_to_vertex->in_edges.emplace_back(edge_type, from_vertex, edge_ref); - transaction_.manyDeltasCache.Invalidate(from_vertex, edge_type, EdgeDirection::OUT); - transaction_.manyDeltasCache.Invalidate(old_to_vertex, edge_type, EdgeDirection::IN); - transaction_.manyDeltasCache.Invalidate(new_to_vertex, edge_type, EdgeDirection::IN); + transaction_.manyDeltasCache.Invalidate(from_vertex, edge_type, EdgeDirection::OUT); + transaction_.manyDeltasCache.Invalidate(old_to_vertex, edge_type, EdgeDirection::IN); + transaction_.manyDeltasCache.Invalidate(new_to_vertex, edge_type, EdgeDirection::IN); + }}; + + std::invoke(atomic_memory_block); return EdgeAccessor(edge_ref, edge_type, from_vertex, new_to_vertex, storage_, &transaction_); } @@ -709,17 +725,21 @@ Result InMemoryStorage::InMemoryAccessor::EdgeChangeType(EdgeAcces MG_ASSERT((op1 && op2), "Invalid database state!"); - // "deleting" old edge - CreateAndLinkDelta(&transaction_, from_vertex, Delta::AddOutEdgeTag(), edge_type, to_vertex, edge_ref); - CreateAndLinkDelta(&transaction_, to_vertex, Delta::AddInEdgeTag(), edge_type, from_vertex, edge_ref); + utils::AtomicMemoryBlock atomic_memory_block{[this, to_vertex, new_edge_type, edge_ref, from_vertex, edge_type]() { + // "deleting" old edge + CreateAndLinkDelta(&transaction_, from_vertex, Delta::AddOutEdgeTag(), edge_type, to_vertex, edge_ref); + CreateAndLinkDelta(&transaction_, to_vertex, Delta::AddInEdgeTag(), edge_type, from_vertex, edge_ref); - // "adding" new edge - CreateAndLinkDelta(&transaction_, from_vertex, Delta::RemoveOutEdgeTag(), new_edge_type, to_vertex, edge_ref); - CreateAndLinkDelta(&transaction_, to_vertex, Delta::RemoveInEdgeTag(), new_edge_type, from_vertex, edge_ref); + // "adding" new edge + CreateAndLinkDelta(&transaction_, from_vertex, Delta::RemoveOutEdgeTag(), new_edge_type, to_vertex, edge_ref); + CreateAndLinkDelta(&transaction_, to_vertex, Delta::RemoveInEdgeTag(), new_edge_type, from_vertex, edge_ref); - // edge type is not used while invalidating cache so we can only call it once - transaction_.manyDeltasCache.Invalidate(from_vertex, new_edge_type, EdgeDirection::OUT); - transaction_.manyDeltasCache.Invalidate(to_vertex, new_edge_type, EdgeDirection::IN); + // edge type is not used while invalidating cache so we can only call it once + transaction_.manyDeltasCache.Invalidate(from_vertex, new_edge_type, EdgeDirection::OUT); + transaction_.manyDeltasCache.Invalidate(to_vertex, new_edge_type, EdgeDirection::IN); + }}; + + std::invoke(atomic_memory_block); return EdgeAccessor(edge_ref, new_edge_type, from_vertex, to_vertex, storage_, &transaction_); } diff --git a/src/storage/v2/inmemory/storage.hpp b/src/storage/v2/inmemory/storage.hpp index 2d2837467..0a174e3af 100644 --- a/src/storage/v2/inmemory/storage.hpp +++ b/src/storage/v2/inmemory/storage.hpp @@ -1,4 +1,4 @@ -// Copyright 2023 Memgraph Ltd. +// Copyright 2024 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source @@ -427,7 +427,6 @@ class InMemoryStorage final : public Storage { std::optional commit_log_; utils::Scheduler gc_runner_; - utils::Scheduler gc_jemalloc_runner_; std::mutex gc_lock_; using BondPmrLd = Bond>; diff --git a/src/storage/v2/storage.cpp b/src/storage/v2/storage.cpp index 86cc02696..8af076d7e 100644 --- a/src/storage/v2/storage.cpp +++ b/src/storage/v2/storage.cpp @@ -1,4 +1,4 @@ -// Copyright 2023 Memgraph Ltd. +// Copyright 2024 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source @@ -17,6 +17,7 @@ #include "storage/v2/storage.hpp" #include "storage/v2/transaction.hpp" #include "storage/v2/vertex_accessor.hpp" +#include "utils/atomic_memory_block.hpp" #include "utils/event_counter.hpp" #include "utils/event_histogram.hpp" #include "utils/exceptions.hpp" @@ -390,22 +391,29 @@ Result>> Storage::Accessor::ClearEdgesOn if (!PrepareForWrite(&transaction_, vertex_ptr)) return Error::SERIALIZATION_ERROR; MG_ASSERT(!vertex_ptr->deleted, "Invalid database state!"); - attached_edges_to_vertex->pop_back(); - if (storage_->config_.items.properties_on_edges) { - auto *edge_ptr = edge_ref.ptr; - MarkEdgeAsDeleted(edge_ptr); - } + // MarkEdgeAsDeleted allocates additional memory + // and CreateAndLinkDelta needs memory + utils::AtomicMemoryBlock atomic_memory_block{[&attached_edges_to_vertex, &deleted_edge_ids, &reverse_vertex_order, + &vertex_ptr, &deleted_edges, deletion_delta = deletion_delta, + edge_type = edge_type, opposing_vertex = opposing_vertex, + edge_ref = edge_ref, this]() { + attached_edges_to_vertex->pop_back(); + if (this->storage_->config_.items.properties_on_edges) { + auto *edge_ptr = edge_ref.ptr; + MarkEdgeAsDeleted(edge_ptr); + } - auto const edge_gid = storage_->config_.items.properties_on_edges ? edge_ref.ptr->gid : edge_ref.gid; - auto const [_, was_inserted] = deleted_edge_ids.insert(edge_gid); - bool const edge_cleared_from_both_directions = !was_inserted; - if (edge_cleared_from_both_directions) { - auto *from_vertex = reverse_vertex_order ? vertex_ptr : opposing_vertex; - auto *to_vertex = reverse_vertex_order ? opposing_vertex : vertex_ptr; - deleted_edges.emplace_back(edge_ref, edge_type, from_vertex, to_vertex, storage_, &transaction_, true); - } - - CreateAndLinkDelta(&transaction_, vertex_ptr, deletion_delta, edge_type, opposing_vertex, edge_ref); + auto const edge_gid = storage_->config_.items.properties_on_edges ? edge_ref.ptr->gid : edge_ref.gid; + auto const [_, was_inserted] = deleted_edge_ids.insert(edge_gid); + bool const edge_cleared_from_both_directions = !was_inserted; + if (edge_cleared_from_both_directions) { + auto *from_vertex = reverse_vertex_order ? vertex_ptr : opposing_vertex; + auto *to_vertex = reverse_vertex_order ? opposing_vertex : vertex_ptr; + deleted_edges.emplace_back(edge_ref, edge_type, from_vertex, to_vertex, storage_, &transaction_, true); + } + CreateAndLinkDelta(&transaction_, vertex_ptr, deletion_delta, edge_type, opposing_vertex, edge_ref); + }}; + std::invoke(atomic_memory_block); } return std::make_optional(); @@ -449,30 +457,37 @@ Result>> Storage::Accessor::DetachRemain return !set_for_erasure.contains(edge_gid); }); - for (auto it = mid; it != edges_attached_to_vertex->end(); it++) { - auto const &[edge_type, opposing_vertex, edge_ref] = *it; - std::unique_lock guard; - if (storage_->config_.items.properties_on_edges) { - auto edge_ptr = edge_ref.ptr; - guard = std::unique_lock{edge_ptr->lock}; - // this can happen only if we marked edges for deletion with no nodes, - // so the method detaching nodes will not do anything - MarkEdgeAsDeleted(edge_ptr); + // Creating deltas and erasing edge only at the end -> we might have incomplete state as + // delta might cause OOM, so we don't remove edges from edges_attached_to_vertex + utils::AtomicMemoryBlock atomic_memory_block{[&mid, &edges_attached_to_vertex, &deleted_edges, + &partially_detached_edge_ids, this, vertex_ptr, deletion_delta, + reverse_vertex_order]() { + for (auto it = mid; it != edges_attached_to_vertex->end(); it++) { + auto const &[edge_type, opposing_vertex, edge_ref] = *it; + std::unique_lock guard; + if (storage_->config_.items.properties_on_edges) { + auto edge_ptr = edge_ref.ptr; + guard = std::unique_lock{edge_ptr->lock}; + // this can happen only if we marked edges for deletion with no nodes, + // so the method detaching nodes will not do anything + MarkEdgeAsDeleted(edge_ptr); + } + + CreateAndLinkDelta(&transaction_, vertex_ptr, deletion_delta, edge_type, opposing_vertex, edge_ref); + + auto const edge_gid = storage_->config_.items.properties_on_edges ? edge_ref.ptr->gid : edge_ref.gid; + auto const [_, was_inserted] = partially_detached_edge_ids.insert(edge_gid); + bool const edge_cleared_from_both_directions = !was_inserted; + if (edge_cleared_from_both_directions) { + auto *from_vertex = reverse_vertex_order ? opposing_vertex : vertex_ptr; + auto *to_vertex = reverse_vertex_order ? vertex_ptr : opposing_vertex; + deleted_edges.emplace_back(edge_ref, edge_type, from_vertex, to_vertex, storage_, &transaction_, true); + } } + edges_attached_to_vertex->erase(mid, edges_attached_to_vertex->end()); + }}; - CreateAndLinkDelta(&transaction_, vertex_ptr, deletion_delta, edge_type, opposing_vertex, edge_ref); - - auto const edge_gid = storage_->config_.items.properties_on_edges ? edge_ref.ptr->gid : edge_ref.gid; - auto const [_, was_inserted] = partially_detached_edge_ids.insert(edge_gid); - bool const edge_cleared_from_both_directions = !was_inserted; - if (edge_cleared_from_both_directions) { - auto *from_vertex = reverse_vertex_order ? opposing_vertex : vertex_ptr; - auto *to_vertex = reverse_vertex_order ? vertex_ptr : opposing_vertex; - deleted_edges.emplace_back(edge_ref, edge_type, from_vertex, to_vertex, storage_, &transaction_, true); - } - } - - edges_attached_to_vertex->erase(mid, edges_attached_to_vertex->end()); + std::invoke(atomic_memory_block); return std::make_optional(); }; diff --git a/src/storage/v2/vertex_accessor.cpp b/src/storage/v2/vertex_accessor.cpp index ff5881563..a7ac3e528 100644 --- a/src/storage/v2/vertex_accessor.cpp +++ b/src/storage/v2/vertex_accessor.cpp @@ -1,4 +1,4 @@ -// Copyright 2023 Memgraph Ltd. +// Copyright 2024 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source @@ -26,6 +26,7 @@ #include "storage/v2/storage.hpp" #include "storage/v2/vertex_info_cache.hpp" #include "storage/v2/vertex_info_helpers.hpp" +#include "utils/atomic_memory_block.hpp" #include "utils/logging.hpp" #include "utils/memory_tracker.hpp" #include "utils/variant_helpers.hpp" @@ -107,8 +108,11 @@ Result VertexAccessor::AddLabel(LabelId label) { if (vertex_->deleted) return Error::DELETED_OBJECT; if (std::find(vertex_->labels.begin(), vertex_->labels.end(), label) != vertex_->labels.end()) return false; - CreateAndLinkDelta(transaction_, vertex_, Delta::RemoveLabelTag(), label); - vertex_->labels.push_back(label); + utils::AtomicMemoryBlock atomic_memory_block{[transaction = transaction_, vertex = vertex_, &label]() { + CreateAndLinkDelta(transaction, vertex, Delta::RemoveLabelTag(), label); + vertex->labels.push_back(label); + }}; + std::invoke(atomic_memory_block); if (storage_->config_.items.enable_schema_metadata) { storage_->stored_node_labels_.try_insert(label); @@ -136,9 +140,12 @@ Result VertexAccessor::RemoveLabel(LabelId label) { auto it = std::find(vertex_->labels.begin(), vertex_->labels.end(), label); if (it == vertex_->labels.end()) return false; - CreateAndLinkDelta(transaction_, vertex_, Delta::AddLabelTag(), label); - *it = vertex_->labels.back(); - vertex_->labels.pop_back(); + utils::AtomicMemoryBlock atomic_memory_block{[transaction = transaction_, vertex = vertex_, &label, &it]() { + CreateAndLinkDelta(transaction, vertex, Delta::AddLabelTag(), label); + *it = vertex->labels.back(); + vertex->labels.pop_back(); + }}; + std::invoke(atomic_memory_block); /// TODO: some by pointers, some by reference => not good, make it better storage_->constraints_.unique_constraints_->UpdateOnRemoveLabel(label, *vertex_, transaction_->start_timestamp); @@ -262,8 +269,12 @@ Result VertexAccessor::SetProperty(PropertyId property, const Pro // "modify in-place". Additionally, the created delta will make other // transactions get a SERIALIZATION_ERROR. - CreateAndLinkDelta(transaction_, vertex_, Delta::SetPropertyTag(), property, current_value); - vertex_->properties.SetProperty(property, value); + utils::AtomicMemoryBlock atomic_memory_block{ + [transaction = transaction_, vertex = vertex_, &value, &property, ¤t_value]() { + CreateAndLinkDelta(transaction, vertex, Delta::SetPropertyTag(), property, current_value); + vertex->properties.SetProperty(property, value); + }}; + std::invoke(atomic_memory_block); if (!value.IsNull()) { transaction_->constraint_verification_info.AddedProperty(vertex_); @@ -287,20 +298,28 @@ Result VertexAccessor::InitProperties(const std::mapdeleted) return Error::DELETED_OBJECT; + bool result{false}; + utils::AtomicMemoryBlock atomic_memory_block{ + [&result, &properties, storage = storage_, transaction = transaction_, vertex = vertex_]() { + if (!vertex->properties.InitProperties(properties)) { + result = false; + return; + } + for (const auto &[property, value] : properties) { + CreateAndLinkDelta(transaction, vertex, Delta::SetPropertyTag(), property, PropertyValue()); + storage->indices_.UpdateOnSetProperty(property, value, vertex, *transaction); + transaction->manyDeltasCache.Invalidate(vertex, property); + if (!value.IsNull()) { + transaction->constraint_verification_info.AddedProperty(vertex); + } else { + transaction->constraint_verification_info.RemovedProperty(vertex); + } + } + result = true; + }}; + std::invoke(atomic_memory_block); - if (!vertex_->properties.InitProperties(properties)) return false; - for (const auto &[property, value] : properties) { - CreateAndLinkDelta(transaction_, vertex_, Delta::SetPropertyTag(), property, PropertyValue()); - storage_->indices_.UpdateOnSetProperty(property, value, vertex_, *transaction_); - transaction_->manyDeltasCache.Invalidate(vertex_, property); - if (!value.IsNull()) { - transaction_->constraint_verification_info.AddedProperty(vertex_); - } else { - transaction_->constraint_verification_info.RemovedProperty(vertex_); - } - } - - return true; + return result; } Result>> VertexAccessor::UpdateProperties( @@ -316,20 +335,28 @@ Result>> Vertex if (vertex_->deleted) return Error::DELETED_OBJECT; - auto id_old_new_change = vertex_->properties.UpdateProperties(properties); + using ReturnType = decltype(vertex_->properties.UpdateProperties(properties)); + std::optional id_old_new_change; + utils::AtomicMemoryBlock atomic_memory_block{ + [storage = storage_, transaction = transaction_, vertex = vertex_, &properties, &id_old_new_change]() { + id_old_new_change.emplace(vertex->properties.UpdateProperties(properties)); + if (!id_old_new_change.has_value()) { + return; + } + for (auto &[id, old_value, new_value] : *id_old_new_change) { + storage->indices_.UpdateOnSetProperty(id, new_value, vertex, *transaction); + CreateAndLinkDelta(transaction, vertex, Delta::SetPropertyTag(), id, std::move(old_value)); + transaction->manyDeltasCache.Invalidate(vertex, id); + if (!new_value.IsNull()) { + transaction->constraint_verification_info.AddedProperty(vertex); + } else { + transaction->constraint_verification_info.RemovedProperty(vertex); + } + } + }}; + std::invoke(atomic_memory_block); - for (auto &[id, old_value, new_value] : id_old_new_change) { - storage_->indices_.UpdateOnSetProperty(id, new_value, vertex_, *transaction_); - CreateAndLinkDelta(transaction_, vertex_, Delta::SetPropertyTag(), id, std::move(old_value)); - transaction_->manyDeltasCache.Invalidate(vertex_, id); - if (!new_value.IsNull()) { - transaction_->constraint_verification_info.AddedProperty(vertex_); - } else { - transaction_->constraint_verification_info.RemovedProperty(vertex_); - } - } - - return id_old_new_change; + return id_old_new_change.has_value() ? std::move(id_old_new_change.value()) : ReturnType{}; } Result> VertexAccessor::ClearProperties() { @@ -342,17 +369,25 @@ Result> VertexAccessor::ClearProperties() { if (vertex_->deleted) return Error::DELETED_OBJECT; - auto properties = vertex_->properties.Properties(); - for (const auto &[property, value] : properties) { - CreateAndLinkDelta(transaction_, vertex_, Delta::SetPropertyTag(), property, value); - storage_->indices_.UpdateOnSetProperty(property, PropertyValue(), vertex_, *transaction_); - transaction_->constraint_verification_info.RemovedProperty(vertex_); - transaction_->manyDeltasCache.Invalidate(vertex_, property); - } + using ReturnType = decltype(vertex_->properties.Properties()); + std::optional properties; + utils::AtomicMemoryBlock atomic_memory_block{ + [storage = storage_, transaction = transaction_, vertex = vertex_, &properties]() { + properties.emplace(vertex->properties.Properties()); + if (!properties.has_value()) { + return; + } + for (const auto &[property, value] : *properties) { + CreateAndLinkDelta(transaction, vertex, Delta::SetPropertyTag(), property, value); + storage->indices_.UpdateOnSetProperty(property, PropertyValue(), vertex, *transaction); + transaction->constraint_verification_info.RemovedProperty(vertex); + transaction->manyDeltasCache.Invalidate(vertex, property); + } + vertex->properties.ClearProperties(); + }}; + std::invoke(atomic_memory_block); - vertex_->properties.ClearProperties(); - - return std::move(properties); + return properties.has_value() ? std::move(properties.value()) : ReturnType{}; } Result VertexAccessor::GetProperty(PropertyId property, View view) const { diff --git a/src/utils/atomic_memory_block.hpp b/src/utils/atomic_memory_block.hpp new file mode 100644 index 000000000..31a3cf3a9 --- /dev/null +++ b/src/utils/atomic_memory_block.hpp @@ -0,0 +1,44 @@ +// Copyright 2024 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +#pragma once + +#include +#include "utils/memory_tracker.hpp" + +namespace memgraph::utils { + +// Calls a function with out of memory exception blocker, checks memory allocation after block execution. +// Use it in case you need block which will be executed atomically considering memory execution +// but will check after block is executed if OOM exceptions needs to be thrown +template +class [[nodiscard]] AtomicMemoryBlock { + public: + explicit AtomicMemoryBlock(Callable &&function) : function_{std::forward(function)} {} + AtomicMemoryBlock(AtomicMemoryBlock const &) = delete; + AtomicMemoryBlock(AtomicMemoryBlock &&) = delete; + AtomicMemoryBlock &operator=(AtomicMemoryBlock const &) = delete; + AtomicMemoryBlock &operator=(AtomicMemoryBlock &&) = delete; + ~AtomicMemoryBlock() = default; + + void operator()() { + { + utils::MemoryTracker::OutOfMemoryExceptionBlocker oom_blocker; + function_(); + } + total_memory_tracker.DoCheck(); + } + + private: + Callable function_; +}; + +} // namespace memgraph::utils diff --git a/src/utils/memory_tracker.cpp b/src/utils/memory_tracker.cpp index 029223b71..358fcbe67 100644 --- a/src/utils/memory_tracker.cpp +++ b/src/utils/memory_tracker.cpp @@ -1,4 +1,4 @@ -// Copyright 2023 Memgraph Ltd. +// Copyright 2024 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source @@ -124,6 +124,19 @@ void MemoryTracker::Alloc(const int64_t size) { UpdatePeak(will_be); } +void MemoryTracker::DoCheck() { + const auto current_hard_limit = hard_limit_.load(std::memory_order_relaxed); + const auto current_amount = amount_.load(std::memory_order_relaxed); + if (current_hard_limit && current_amount > current_hard_limit && MemoryTrackerCanThrow()) [[unlikely]] { + MemoryTracker::OutOfMemoryExceptionBlocker exception_blocker; + throw OutOfMemoryException( + fmt::format("Memory limit exceeded! Current " + "use is {}, while the maximum allowed size for allocation is set to {}.", + GetReadableSize(static_cast(current_amount)), + GetReadableSize(static_cast(current_hard_limit)))); + } +} + void MemoryTracker::Free(const int64_t size) { amount_.fetch_sub(size, std::memory_order_relaxed); } } // namespace memgraph::utils diff --git a/src/utils/memory_tracker.hpp b/src/utils/memory_tracker.hpp index 0da888161..2df17c52a 100644 --- a/src/utils/memory_tracker.hpp +++ b/src/utils/memory_tracker.hpp @@ -1,4 +1,4 @@ -// Copyright 2023 Memgraph Ltd. +// Copyright 2024 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source @@ -49,6 +49,7 @@ class MemoryTracker final { void Alloc(int64_t size); void Free(int64_t size); + void DoCheck(); auto Amount() const { return amount_.load(std::memory_order_relaxed); } diff --git a/tests/e2e/configuration/default_config.py b/tests/e2e/configuration/default_config.py index e1d42b443..d0029bbd7 100644 --- a/tests/e2e/configuration/default_config.py +++ b/tests/e2e/configuration/default_config.py @@ -143,6 +143,7 @@ startup_config_dict = { "The time duration between two replica checks/pings. If < 1, replicas will NOT be checked at all. NOTE: The MAIN instance allocates a new thread for each REPLICA.", ), "storage_gc_cycle_sec": ("30", "30", "Storage garbage collector interval (in seconds)."), + "storage_python_gc_cycle_sec": ("180", "180", "Storage python full garbage collection interval (in seconds)."), "storage_items_per_batch": ( "1000000", "1000000", diff --git a/tests/e2e/streams/kafka/docker-compose.yml b/tests/e2e/streams/kafka/docker-compose.yml new file mode 100755 index 000000000..3ab2166af --- /dev/null +++ b/tests/e2e/streams/kafka/docker-compose.yml @@ -0,0 +1,22 @@ +version: '3' +services: + zookeeper: + image: 'bitnami/zookeeper:3.9.1' + ports: + - '2181:2181' + environment: + - ALLOW_ANONYMOUS_LOGIN=yes + kafka: + image: 'bitnami/kafka:3.6.1' + ports: + - '9092:9092' + - '29092:29092' + environment: + - KAFKA_BROKER_ID=1 + - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT + - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,PLAINTEXT_HOST://:29092 + - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092,PLAINTEXT_HOST://localhost:29092 + - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181 + - ALLOW_PLAINTEXT_LISTENER=yes + depends_on: + - zookeeper diff --git a/tests/e2e/streams/pulsar/docker-compose.yml b/tests/e2e/streams/pulsar/docker-compose.yml new file mode 100644 index 000000000..a490fbdb1 --- /dev/null +++ b/tests/e2e/streams/pulsar/docker-compose.yml @@ -0,0 +1,9 @@ +version: "3" + +services: + pulsar: + image: 'apachepulsar/pulsar:2.8.1' + ports: + - '6652:8080' + - '6650:6650' + entrypoint: [ 'bin/pulsar', 'standalone' ] diff --git a/tests/stress/memory_limit.py b/tests/stress/memory_limit.py index 2b2084484..dd8225584 100644 --- a/tests/stress/memory_limit.py +++ b/tests/stress/memory_limit.py @@ -173,39 +173,49 @@ def run_writer(repetition_count: int, sleep_sec: float, worker_id: int) -> int: """ This writer creates lot of nodes on each write. Also it checks that query failed if memory limit is tried to be broken + + Return: + True if write suceeded + False otherwise """ session = SessionCache.argument_session(args) - def create() -> bool: + def try_create() -> bool: """ - Returns True if done, False if needs to continue + Function tries to create until memory limit is reached + Return: + True if it can continue creating (OOM not reached) + False otherwise """ - memory_tracker_data_before_start = get_tracker_data(session) - should_fail = memory_tracker_data_before_start >= 2048 - failed = False + should_continue = True try: try_execute( session, f"FOREACH (i in range(1,10000) | CREATE (:Node {{prop:'big string or something like that'}}))", ) except Exception as ex: - failed = True output = str(ex) - log.info("Exception in create", output) - assert "Memory limit exceeded!" in output + memory_over_2048_mb = False + memory_tracker_data_after_start = get_tracker_data(session) + if memory_tracker_data_after_start: + memory_over_2048_mb = memory_tracker_data_after_start >= 2048 + log.info( + "Exception in create, exception output:", + output, + f"Worker {worker_id} started iteration {curr_repetition}, memory over 2048MB: {memory_over_2048_mb}", + ) + has_oom_happend = "Memory limit exceeded!" in output and memory_over_2048_mb + should_continue = not has_oom_happend - if should_fail: - assert failed, "Query should have failed" - return False - return True + return should_continue curr_repetition = 0 while curr_repetition < repetition_count: log.info(f"Worker {worker_id} started iteration {curr_repetition}") - should_continue = create() + should_continue = try_create() if not should_continue: return True @@ -214,6 +224,7 @@ def run_writer(repetition_count: int, sleep_sec: float, worker_id: int) -> int: log.info(f"Worker {worker_id} created chain in iteration {curr_repetition}") curr_repetition += 1 + return False def execute_function(worker: Worker) -> Worker: