Merge branch 'text-search-integration-poc' of https://github.com/memgraph/memgraph into text-search-integration-poc

This commit is contained in:
Ante Pušić 2024-01-04 11:16:26 +01:00
commit b4b5970ba6
21 changed files with 431 additions and 539 deletions

View File

@ -229,10 +229,6 @@ jobs:
# branches and tags. (default: 1)
fetch-depth: 0
- name: Check e2e service dependencies
run: |
cd tests/e2e
./dependency_check.sh
- name: Build release binaries
run: |
@ -271,6 +267,13 @@ jobs:
cd build
ctest -R memgraph__unit --output-on-failure -j$THREADS
- name: Ensure Kafka and Pulsar are up
run: |
cd tests/e2e/streams/kafka
docker-compose up -d
cd ../pulsar
docker-compose up -d
- name: Run e2e tests
run: |
cd tests
@ -279,6 +282,14 @@ jobs:
cd e2e
./run.sh
- name: Ensure Kafka and Pulsar are down
if: always()
run: |
cd tests/e2e/streams/kafka
docker-compose down
cd ../pulsar
docker-compose down
- name: Run stress test (plain)
run: |
cd tests/stress

View File

@ -1,318 +0,0 @@
name: Release CentOS 8
on:
workflow_dispatch:
inputs:
build_type:
type: choice
description: "Memgraph Build type. Default value is Release."
default: 'Release'
options:
- Release
- RelWithDebInfo
schedule:
- cron: "0 22 * * *"
jobs:
community_build:
name: "Community build"
runs-on: [self-hosted, Linux, X64, CentOS8]
env:
THREADS: 24
MEMGRAPH_ENTERPRISE_LICENSE: ${{ secrets.MEMGRAPH_ENTERPRISE_LICENSE }}
MEMGRAPH_ORGANIZATION_NAME: ${{ secrets.MEMGRAPH_ORGANIZATION_NAME }}
timeout-minutes: 960
steps:
- name: Set up repository
uses: actions/checkout@v3
with:
# Number of commits to fetch. `0` indicates all history for all
# branches and tags. (default: 1)
fetch-depth: 0
- name: Build community binaries
run: |
# Activate toolchain.
source /opt/toolchain-v4/activate
# Initialize dependencies.
./init
# Set default build_type to Release
INPUT_BUILD_TYPE=${{ github.event.inputs.build_type }}
BUILD_TYPE=${INPUT_BUILD_TYPE:-"Release"}
# Build community binaries.
cd build
cmake -DCMAKE_BUILD_TYPE=$BUILD_TYPE -DMG_ENTERPRISE=OFF ..
make -j$THREADS
- name: Run unit tests
run: |
# Activate toolchain.
source /opt/toolchain-v4/activate
# Run unit tests.
cd build
ctest -R memgraph__unit --output-on-failure
coverage_build:
name: "Coverage build"
runs-on: [self-hosted, Linux, X64, CentOS8]
env:
THREADS: 24
MEMGRAPH_ENTERPRISE_LICENSE: ${{ secrets.MEMGRAPH_ENTERPRISE_LICENSE }}
MEMGRAPH_ORGANIZATION_NAME: ${{ secrets.MEMGRAPH_ORGANIZATION_NAME }}
steps:
- name: Set up repository
uses: actions/checkout@v3
with:
# Number of commits to fetch. `0` indicates all history for all
# branches and tags. (default: 1)
fetch-depth: 0
- name: Build coverage binaries
run: |
# Activate toolchain.
source /opt/toolchain-v4/activate
# Initialize dependencies.
./init
# Build coverage binaries.
cd build
cmake -DTEST_COVERAGE=ON ..
make -j$THREADS memgraph__unit
- name: Run unit tests
run: |
# Activate toolchain.
source /opt/toolchain-v4/activate
# Run unit tests.
cd build
ctest -R memgraph__unit --output-on-failure
- name: Compute code coverage
run: |
# Activate toolchain.
source /opt/toolchain-v4/activate
# Compute code coverage.
cd tools/github
./coverage_convert
# Package code coverage.
cd generated
tar -czf code_coverage.tar.gz coverage.json html report.json summary.rmu
- name: Save code coverage
uses: actions/upload-artifact@v3
with:
name: "Code coverage"
path: tools/github/generated/code_coverage.tar.gz
debug_build:
name: "Debug build"
runs-on: [self-hosted, Linux, X64, CentOS8]
env:
THREADS: 24
MEMGRAPH_ENTERPRISE_LICENSE: ${{ secrets.MEMGRAPH_ENTERPRISE_LICENSE }}
MEMGRAPH_ORGANIZATION_NAME: ${{ secrets.MEMGRAPH_ORGANIZATION_NAME }}
steps:
- name: Set up repository
uses: actions/checkout@v3
with:
# Number of commits to fetch. `0` indicates all history for all
# branches and tags. (default: 1)
fetch-depth: 0
- name: Build debug binaries
run: |
# Activate toolchain.
source /opt/toolchain-v4/activate
# Initialize dependencies.
./init
# Build debug binaries.
cd build
cmake ..
make -j$THREADS
- name: Run leftover CTest tests
run: |
# Activate toolchain.
source /opt/toolchain-v4/activate
# Run leftover CTest tests (all except unit and benchmark tests).
cd build
ctest -E "(memgraph__unit|memgraph__benchmark)" --output-on-failure
- name: Run drivers tests
run: |
./tests/drivers/run.sh
- name: Run integration tests
run: |
tests/integration/run.sh
- name: Run cppcheck and clang-format
run: |
# Activate toolchain.
source /opt/toolchain-v4/activate
# Run cppcheck and clang-format.
cd tools/github
./cppcheck_and_clang_format diff
- name: Save cppcheck and clang-format errors
uses: actions/upload-artifact@v3
with:
name: "Code coverage"
path: tools/github/cppcheck_and_clang_format.txt
release_build:
name: "Release build"
runs-on: [self-hosted, Linux, X64, CentOS8]
env:
THREADS: 24
MEMGRAPH_ENTERPRISE_LICENSE: ${{ secrets.MEMGRAPH_ENTERPRISE_LICENSE }}
MEMGRAPH_ORGANIZATION_NAME: ${{ secrets.MEMGRAPH_ORGANIZATION_NAME }}
timeout-minutes: 960
steps:
- name: Set up repository
uses: actions/checkout@v3
with:
# Number of commits to fetch. `0` indicates all history for all
# branches and tags. (default: 1)
fetch-depth: 0
- name: Build release binaries
run: |
# Activate toolchain.
source /opt/toolchain-v4/activate
# Initialize dependencies.
./init
# Set default build_type to Release
INPUT_BUILD_TYPE=${{ github.event.inputs.build_type }}
BUILD_TYPE=${INPUT_BUILD_TYPE:-"Release"}
# Build release binaries.
cd build
cmake -DCMAKE_BUILD_TYPE=$BUILD_TYPE ..
make -j$THREADS
- name: Create enterprise RPM package
run: |
# Activate toolchain.
source /opt/toolchain-v4/activate
cd build
# create mgconsole
# we use the -B to force the build
make -j$THREADS -B mgconsole
# Create enterprise RPM package.
mkdir output && cd output
cpack -G RPM --config ../CPackConfig.cmake
rpmlint memgraph*.rpm
- name: Save enterprise RPM package
uses: actions/upload-artifact@v3
with:
name: "Enterprise RPM package"
path: build/output/memgraph*.rpm
- name: Run micro benchmark tests
run: |
# Activate toolchain.
source /opt/toolchain-v4/activate
# Run micro benchmark tests.
cd build
# The `eval` benchmark needs a large stack limit.
ulimit -s 262144
ctest -R memgraph__benchmark -V
- name: Run macro benchmark tests
run: |
cd tests/macro_benchmark
./harness QuerySuite MemgraphRunner \
--groups aggregation 1000_create unwind_create dense_expand match \
--no-strict
- name: Run parallel macro benchmark tests
run: |
cd tests/macro_benchmark
./harness QueryParallelSuite MemgraphRunner \
--groups aggregation_parallel create_parallel bfs_parallel \
--num-database-workers 9 --num-clients-workers 30 \
--no-strict
- name: Run GQL Behave tests
run: |
cd tests
./setup.sh /opt/toolchain-v4/activate
cd gql_behave
./continuous_integration
- name: Save quality assurance status
uses: actions/upload-artifact@v3
with:
name: "GQL Behave Status"
path: |
tests/gql_behave/gql_behave_status.csv
tests/gql_behave/gql_behave_status.html
- name: Run unit tests
run: |
# Activate toolchain.
source /opt/toolchain-v4/activate
# Run unit tests.
cd build
ctest -R memgraph__unit --output-on-failure
- name: Run e2e tests
run: |
cd tests
./setup.sh /opt/toolchain-v4/activate
source ve3/bin/activate_e2e
cd e2e
./run.sh
- name: Run stress test (plain)
run: |
cd tests/stress
./continuous_integration
- name: Run stress test (SSL)
run: |
cd tests/stress
./continuous_integration --use-ssl
- name: Run stress test (large)
run: |
cd tests/stress
./continuous_integration --large-dataset
- name: Run durability test (plain)
run: |
cd tests/stress
source ve3/bin/activate
python3 durability --num-steps 5
- name: Run durability test (large)
run: |
cd tests/stress
source ve3/bin/activate
python3 durability --num-steps 20

View File

@ -53,7 +53,10 @@ set_target_properties(memgraph PROPERTIES
OUTPUT_NAME "memgraph-${MEMGRAPH_VERSION}_${CMAKE_BUILD_TYPE}"
# Output the executable in main binary dir.
RUNTIME_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR})
RUNTIME_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}
POSITION_INDEPENDENT_CODE ON
)
# Create symlink to the built executable.
add_custom_command(TARGET memgraph POST_BUILD

View File

@ -1,4 +1,4 @@
// Copyright 2023 Memgraph Ltd.
// Copyright 2024 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -66,6 +66,9 @@ DEFINE_bool(allow_load_csv, true, "Controls whether LOAD CSV clause is allowed i
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
DEFINE_VALIDATED_uint64(storage_gc_cycle_sec, 30, "Storage garbage collector interval (in seconds).",
FLAG_IN_RANGE(1, 24UL * 3600));
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
DEFINE_VALIDATED_uint64(storage_python_gc_cycle_sec, 180,
"Storage python full garbage collection interval (in seconds).", FLAG_IN_RANGE(1, 24UL * 3600));
// NOTE: The `storage_properties_on_edges` flag must be the same here and in
// `mg_import_csv`. If you change it, make sure to change it there as well.
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)

View File

@ -1,4 +1,4 @@
// Copyright 2023 Memgraph Ltd.
// Copyright 2024 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -52,6 +52,8 @@ DECLARE_bool(allow_load_csv);
// Storage flags.
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
DECLARE_uint64(storage_gc_cycle_sec);
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
DECLARE_uint64(storage_python_gc_cycle_sec);
// NOTE: The `storage_properties_on_edges` flag must be the same here and in
// `mg_import_csv`. If you change it, make sure to change it there as well.
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)

View File

@ -1,4 +1,4 @@
// Copyright 2023 Memgraph Ltd.
// Copyright 2024 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -163,7 +163,7 @@ int main(int argc, char **argv) {
// libstd.
auto gil = memgraph::py::EnsureGIL();
// NOLINTNEXTLINE(hicpp-signed-bitwise)
auto *flag = PyLong_FromLong(RTLD_NOW | RTLD_DEEPBIND);
auto *flag = PyLong_FromLong(RTLD_NOW);
auto *setdl = PySys_GetObject("setdlopenflags");
MG_ASSERT(setdl);
auto *arg = PyTuple_New(1);
@ -184,6 +184,10 @@ int main(int argc, char **argv) {
"https://memgr.ph/python"));
}
memgraph::utils::Scheduler python_gc_scheduler;
python_gc_scheduler.Run("Python GC", std::chrono::seconds(FLAGS_storage_python_gc_cycle_sec),
[] { memgraph::query::procedure::PyCollectGarbage(); });
// Initialize the communication library.
memgraph::communication::SSLInit sslInit;
@ -318,6 +322,11 @@ int main(int argc, char **argv) {
.durability_directory = FLAGS_data_directory + "/rocksdb_durability",
.wal_directory = FLAGS_data_directory + "/rocksdb_wal"},
.storage_mode = memgraph::flags::ParseStorageMode()};
memgraph::utils::Scheduler jemalloc_purge_scheduler;
jemalloc_purge_scheduler.Run("Jemalloc purge", std::chrono::seconds(FLAGS_storage_gc_cycle_sec),
[] { memgraph::memory::PurgeUnusedMemory(); });
if (FLAGS_storage_snapshot_interval_sec == 0) {
if (FLAGS_storage_wal_enabled) {
LOG_FATAL(

View File

@ -1,4 +1,4 @@
// Copyright 2023 Memgraph Ltd.
// Copyright 2024 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -169,7 +169,7 @@ class ModuleRegistry final {
// mentioned library will be first performed in the already existing binded
// libraries and then the global namespace.
// RTLD_DEEPBIND => https://linux.die.net/man/3/dlopen
SharedLibraryHandle libstd_handle{"libstdc++.so.6", RTLD_NOW | RTLD_LOCAL | RTLD_DEEPBIND, kLibstdcppWarning};
SharedLibraryHandle libstd_handle{"libstdc++.so.6", RTLD_NOW | RTLD_LOCAL, kLibstdcppWarning};
#endif
std::vector<std::filesystem::path> modules_dirs_;
std::filesystem::path internal_module_dir_;

View File

@ -1,4 +1,4 @@
// Copyright 2023 Memgraph Ltd.
// Copyright 2024 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -13,6 +13,7 @@
#include <datetime.h>
#include <methodobject.h>
#include <objimpl.h>
#include <pyerrors.h>
#include <array>
#include <optional>
@ -57,7 +58,6 @@ PyObject *gMgpValueConversionError{nullptr}; // NOLINT(cppcoreguidelines-avo
PyObject *gMgpSerializationError{nullptr}; // NOLINT(cppcoreguidelines-avoid-non-const-global-variables)
PyObject *gMgpAuthorizationError{nullptr}; // NOLINT(cppcoreguidelines-avoid-non-const-global-variables)
constexpr bool kStartGarbageCollection{true};
constexpr auto kMicrosecondsInMillisecond{1000};
constexpr auto kMicrosecondsInSecond{1000000};
@ -867,6 +867,24 @@ py::Object MgpListToPyTuple(mgp_list *list, PyObject *py_graph) {
return MgpListToPyTuple(list, reinterpret_cast<PyGraph *>(py_graph));
}
void PyCollectGarbage() {
if (!Py_IsInitialized() || _Py_IsFinalizing()) {
// Calling EnsureGIL will crash the program if this is true.
return;
}
auto gil = py::EnsureGIL();
py::Object gc(PyImport_ImportModule("gc"));
if (!gc) {
LOG_FATAL(py::FetchError().value());
}
if (!gc.CallMethod("collect")) {
LOG_FATAL(py::FetchError().value());
}
}
namespace {
struct RecordFieldCache {
PyObject *key;
@ -1027,24 +1045,8 @@ std::optional<py::ExceptionInfo> AddMultipleBatchRecordsFromPython(mgp_result *r
return std::nullopt;
}
std::function<void()> PyObjectCleanup(py::Object &py_object, bool start_gc) {
return [py_object, start_gc]() {
if (start_gc) {
// Run `gc.collect` (reference cycle-detection) explicitly, so that we are
// sure the procedure cleaned up everything it held references to. If the
// user stored a reference to one of our `_mgp` instances then the
// internally used `mgp_*` structs will stay unfreed and a memory leak
// will be reported at the end of the query execution.
py::Object gc(PyImport_ImportModule("gc"));
if (!gc) {
LOG_FATAL(py::FetchError().value());
}
if (!gc.CallMethod("collect")) {
LOG_FATAL(py::FetchError().value());
}
}
std::function<void()> PyObjectCleanup(py::Object &py_object) {
return [py_object]() {
// After making sure all references from our side have been cleared,
// invalidate the `_mgp.Graph` object. If the user kept a reference to one
// of our `_mgp` instances then this will prevent them from using those
@ -1095,7 +1097,7 @@ void CallPythonProcedure(const py::Object &py_cb, mgp_list *args, mgp_graph *gra
std::optional<std::string> maybe_msg;
{
py::Object py_graph(MakePyGraph(graph, memory));
utils::OnScopeExit clean_up(PyObjectCleanup(py_graph, !is_batched));
utils::OnScopeExit clean_up(PyObjectCleanup(py_graph));
if (py_graph) {
maybe_msg = error_to_msg(call(py_graph));
} else {
@ -1110,22 +1112,11 @@ void CallPythonProcedure(const py::Object &py_cb, mgp_list *args, mgp_graph *gra
void CallPythonCleanup(const py::Object &py_cleanup) {
auto gil = py::EnsureGIL();
auto py_res = py_cleanup.Call();
py::Object gc(PyImport_ImportModule("gc"));
if (!gc) {
LOG_FATAL(py::FetchError().value());
}
if (!gc.CallMethod("collect")) {
LOG_FATAL(py::FetchError().value());
}
}
void CallPythonInitializer(const py::Object &py_initializer, mgp_list *args, mgp_graph *graph, mgp_memory *memory) {
auto gil = py::EnsureGIL();
auto error_to_msg = [](const std::optional<py::ExceptionInfo> &exc_info) -> std::optional<std::string> {
if (!exc_info) return std::nullopt;
// Here we tell the traceback formatter to skip the first line of the
@ -1146,7 +1137,7 @@ void CallPythonInitializer(const py::Object &py_initializer, mgp_list *args, mgp
std::optional<std::string> maybe_msg;
{
py::Object py_graph(MakePyGraph(graph, memory));
utils::OnScopeExit clean_up_graph(PyObjectCleanup(py_graph, !kStartGarbageCollection));
utils::OnScopeExit clean_up_graph(PyObjectCleanup(py_graph));
if (py_graph) {
maybe_msg = error_to_msg(call(py_graph));
} else {
@ -1194,8 +1185,8 @@ void CallPythonTransformation(const py::Object &py_cb, mgp_messages *msgs, mgp_g
py::Object py_graph(MakePyGraph(graph, memory));
py::Object py_messages(MakePyMessages(msgs, memory));
utils::OnScopeExit clean_up_graph(PyObjectCleanup(py_graph, kStartGarbageCollection));
utils::OnScopeExit clean_up_messages(PyObjectCleanup(py_messages, kStartGarbageCollection));
utils::OnScopeExit clean_up_graph(PyObjectCleanup(py_graph));
utils::OnScopeExit clean_up_messages(PyObjectCleanup(py_messages));
if (py_graph && py_messages) {
maybe_msg = error_to_msg(call(py_graph, py_messages));
@ -1264,7 +1255,7 @@ void CallPythonFunction(const py::Object &py_cb, mgp_list *args, mgp_graph *grap
std::optional<std::string> maybe_msg;
{
py::Object py_graph(MakePyGraph(graph, memory));
utils::OnScopeExit clean_up(PyObjectCleanup(py_graph, kStartGarbageCollection));
utils::OnScopeExit clean_up(PyObjectCleanup(py_graph));
if (py_graph) {
auto maybe_result = call(py_graph);
if (!maybe_result.HasError()) {

View File

@ -1,4 +1,4 @@
// Copyright 2022 Memgraph Ltd.
// Copyright 2024 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -79,4 +79,7 @@ py::Object ImportPyModule(const char *, mgp_module *);
/// Return nullptr and set appropriate Python exception on failure.
py::Object ReloadPyModule(PyObject *, mgp_module *);
/// Call full python circular reference garbage collection (all generations)
void PyCollectGarbage();
} // namespace memgraph::query::procedure

View File

@ -1,4 +1,4 @@
// Copyright 2023 Memgraph Ltd.
// Copyright 2024 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -21,6 +21,7 @@
#include "storage/v2/result.hpp"
#include "storage/v2/storage.hpp"
#include "storage/v2/vertex_accessor.hpp"
#include "utils/atomic_memory_block.hpp"
#include "utils/memory_tracker.hpp"
namespace memgraph::storage {
@ -126,24 +127,28 @@ Result<storage::PropertyValue> EdgeAccessor::SetProperty(PropertyId property, co
if (!PrepareForWrite(transaction_, edge_.ptr)) return Error::SERIALIZATION_ERROR;
if (edge_.ptr->deleted) return Error::DELETED_OBJECT;
auto current_value = edge_.ptr->properties.GetProperty(property);
// We could skip setting the value if the previous one is the same to the new
// one. This would save some memory as a delta would not be created as well as
// avoid copying the value. The reason we are not doing that is because the
// current code always follows the logical pattern of "create a delta" and
// "modify in-place". Additionally, the created delta will make other
// transactions get a SERIALIZATION_ERROR.
CreateAndLinkDelta(transaction_, edge_.ptr, Delta::SetPropertyTag(), property, current_value);
edge_.ptr->properties.SetProperty(property, value);
using ReturnType = decltype(edge_.ptr->properties.GetProperty(property));
std::optional<ReturnType> current_value;
utils::AtomicMemoryBlock atomic_memory_block{
[&current_value, &property, &value, transaction = transaction_, edge = edge_]() {
current_value.emplace(edge.ptr->properties.GetProperty(property));
// We could skip setting the value if the previous one is the same to the new
// one. This would save some memory as a delta would not be created as well as
// avoid copying the value. The reason we are not doing that is because the
// current code always follows the logical pattern of "create a delta" and
// "modify in-place". Additionally, the created delta will make other
// transactions get a SERIALIZATION_ERROR.
CreateAndLinkDelta(transaction, edge.ptr, Delta::SetPropertyTag(), property, *current_value);
edge.ptr->properties.SetProperty(property, value);
}};
std::invoke(atomic_memory_block);
if (transaction_->IsDiskStorage()) {
ModifiedEdgeInfo modified_edge(Delta::Action::SET_PROPERTY, from_vertex_->gid, to_vertex_->gid, edge_type_, edge_);
transaction_->AddModifiedEdge(Gid(), modified_edge);
}
return std::move(current_value);
return std::move(*current_value);
}
Result<bool> EdgeAccessor::InitProperties(const std::map<storage::PropertyId, storage::PropertyValue> &properties) {
@ -157,9 +162,12 @@ Result<bool> EdgeAccessor::InitProperties(const std::map<storage::PropertyId, st
if (edge_.ptr->deleted) return Error::DELETED_OBJECT;
if (!edge_.ptr->properties.InitProperties(properties)) return false;
for (const auto &[property, _] : properties) {
CreateAndLinkDelta(transaction_, edge_.ptr, Delta::SetPropertyTag(), property, PropertyValue());
}
utils::AtomicMemoryBlock atomic_memory_block{[&properties, transaction_ = transaction_, edge_ = edge_]() {
for (const auto &[property, _] : properties) {
CreateAndLinkDelta(transaction_, edge_.ptr, Delta::SetPropertyTag(), property, PropertyValue());
}
}};
std::invoke(atomic_memory_block);
return true;
}
@ -175,13 +183,18 @@ Result<std::vector<std::tuple<PropertyId, PropertyValue, PropertyValue>>> EdgeAc
if (edge_.ptr->deleted) return Error::DELETED_OBJECT;
auto id_old_new_change = edge_.ptr->properties.UpdateProperties(properties);
using ReturnType = decltype(edge_.ptr->properties.UpdateProperties(properties));
std::optional<ReturnType> id_old_new_change;
utils::AtomicMemoryBlock atomic_memory_block{
[transaction_ = transaction_, edge_ = edge_, &properties, &id_old_new_change]() {
id_old_new_change.emplace(edge_.ptr->properties.UpdateProperties(properties));
for (auto &[property, old_value, new_value] : *id_old_new_change) {
CreateAndLinkDelta(transaction_, edge_.ptr, Delta::SetPropertyTag(), property, std::move(old_value));
}
}};
std::invoke(atomic_memory_block);
for (auto &[property, old_value, new_value] : id_old_new_change) {
CreateAndLinkDelta(transaction_, edge_.ptr, Delta::SetPropertyTag(), property, std::move(old_value));
}
return id_old_new_change;
return id_old_new_change.has_value() ? std::move(id_old_new_change.value()) : ReturnType{};
}
Result<std::map<PropertyId, PropertyValue>> EdgeAccessor::ClearProperties() {
@ -193,14 +206,19 @@ Result<std::map<PropertyId, PropertyValue>> EdgeAccessor::ClearProperties() {
if (edge_.ptr->deleted) return Error::DELETED_OBJECT;
auto properties = edge_.ptr->properties.Properties();
for (const auto &property : properties) {
CreateAndLinkDelta(transaction_, edge_.ptr, Delta::SetPropertyTag(), property.first, property.second);
}
using ReturnType = decltype(edge_.ptr->properties.Properties());
std::optional<ReturnType> properties;
utils::AtomicMemoryBlock atomic_memory_block{[&properties, transaction_ = transaction_, edge_ = edge_]() {
properties.emplace(edge_.ptr->properties.Properties());
for (const auto &property : *properties) {
CreateAndLinkDelta(transaction_, edge_.ptr, Delta::SetPropertyTag(), property.first, property.second);
}
edge_.ptr->properties.ClearProperties();
edge_.ptr->properties.ClearProperties();
}};
std::invoke(atomic_memory_block);
return std::move(properties);
return properties.has_value() ? std::move(properties.value()) : ReturnType{};
}
Result<PropertyValue> EdgeAccessor::GetProperty(PropertyId property, View view) const {

View File

@ -1,4 +1,4 @@
// Copyright 2023 Memgraph Ltd.
// Copyright 2024 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -26,6 +26,7 @@
#include "storage/v2/inmemory/replication/recovery.hpp"
#include "storage/v2/inmemory/unique_constraints.hpp"
#include "storage/v2/property_value.hpp"
#include "utils/atomic_memory_block.hpp"
#include "utils/resource_lock.hpp"
#include "utils/stat.hpp"
@ -144,7 +145,6 @@ InMemoryStorage::InMemoryStorage(Config config, StorageMode storage_mode)
gc_runner_.Run("Storage GC", config_.gc.interval, [this] {
this->FreeMemory(std::unique_lock<utils::ResourceLock>{main_lock_, std::defer_lock});
});
gc_jemalloc_runner_.Run("Jemalloc GC", config_.gc.interval, [] { memory::PurgeUnusedMemory(); });
}
if (timestamp_ == kTimestampInitialId) {
commit_log_.emplace();
@ -158,7 +158,6 @@ InMemoryStorage::InMemoryStorage(Config config) : InMemoryStorage(config, Storag
InMemoryStorage::~InMemoryStorage() {
if (config_.gc.type == Config::Gc::Type::PERIODIC) {
gc_runner_.Stop();
gc_jemalloc_runner_.Stop();
}
{
// Stop replication (Stop all clients or stop the REPLICA server)
@ -338,18 +337,22 @@ Result<EdgeAccessor> InMemoryStorage::InMemoryAccessor::CreateEdge(VertexAccesso
delta->prev.Set(&*it);
}
}
utils::AtomicMemoryBlock atomic_memory_block{
[this, edge, from_vertex = from_vertex, edge_type = edge_type, to_vertex = to_vertex]() {
CreateAndLinkDelta(&transaction_, from_vertex, Delta::RemoveOutEdgeTag(), edge_type, to_vertex, edge);
from_vertex->out_edges.emplace_back(edge_type, to_vertex, edge);
CreateAndLinkDelta(&transaction_, from_vertex, Delta::RemoveOutEdgeTag(), edge_type, to_vertex, edge);
from_vertex->out_edges.emplace_back(edge_type, to_vertex, edge);
CreateAndLinkDelta(&transaction_, to_vertex, Delta::RemoveInEdgeTag(), edge_type, from_vertex, edge);
to_vertex->in_edges.emplace_back(edge_type, from_vertex, edge);
CreateAndLinkDelta(&transaction_, to_vertex, Delta::RemoveInEdgeTag(), edge_type, from_vertex, edge);
to_vertex->in_edges.emplace_back(edge_type, from_vertex, edge);
transaction_.manyDeltasCache.Invalidate(from_vertex, edge_type, EdgeDirection::OUT);
transaction_.manyDeltasCache.Invalidate(to_vertex, edge_type, EdgeDirection::IN);
transaction_.manyDeltasCache.Invalidate(from_vertex, edge_type, EdgeDirection::OUT);
transaction_.manyDeltasCache.Invalidate(to_vertex, edge_type, EdgeDirection::IN);
// Increment edge count.
storage_->edge_count_.fetch_add(1, std::memory_order_acq_rel);
}};
// Increment edge count.
storage_->edge_count_.fetch_add(1, std::memory_order_acq_rel);
std::invoke(atomic_memory_block);
return EdgeAccessor(edge, edge_type, from_vertex, to_vertex, storage_, &transaction_);
}
@ -433,18 +436,22 @@ Result<EdgeAccessor> InMemoryStorage::InMemoryAccessor::CreateEdgeEx(VertexAcces
delta->prev.Set(&*it);
}
}
utils::AtomicMemoryBlock atomic_memory_block{
[this, edge, from_vertex = from_vertex, edge_type = edge_type, to_vertex = to_vertex]() {
CreateAndLinkDelta(&transaction_, from_vertex, Delta::RemoveOutEdgeTag(), edge_type, to_vertex, edge);
from_vertex->out_edges.emplace_back(edge_type, to_vertex, edge);
CreateAndLinkDelta(&transaction_, from_vertex, Delta::RemoveOutEdgeTag(), edge_type, to_vertex, edge);
from_vertex->out_edges.emplace_back(edge_type, to_vertex, edge);
CreateAndLinkDelta(&transaction_, to_vertex, Delta::RemoveInEdgeTag(), edge_type, from_vertex, edge);
to_vertex->in_edges.emplace_back(edge_type, from_vertex, edge);
CreateAndLinkDelta(&transaction_, to_vertex, Delta::RemoveInEdgeTag(), edge_type, from_vertex, edge);
to_vertex->in_edges.emplace_back(edge_type, from_vertex, edge);
transaction_.manyDeltasCache.Invalidate(from_vertex, edge_type, EdgeDirection::OUT);
transaction_.manyDeltasCache.Invalidate(to_vertex, edge_type, EdgeDirection::IN);
transaction_.manyDeltasCache.Invalidate(from_vertex, edge_type, EdgeDirection::OUT);
transaction_.manyDeltasCache.Invalidate(to_vertex, edge_type, EdgeDirection::IN);
// Increment edge count.
storage_->edge_count_.fetch_add(1, std::memory_order_acq_rel);
}};
// Increment edge count.
storage_->edge_count_.fetch_add(1, std::memory_order_acq_rel);
std::invoke(atomic_memory_block);
return EdgeAccessor(edge, edge_type, from_vertex, to_vertex, storage_, &transaction_);
}
@ -534,18 +541,22 @@ Result<EdgeAccessor> InMemoryStorage::InMemoryAccessor::EdgeSetFrom(EdgeAccessor
return Error::DELETED_OBJECT;
}
}
utils::AtomicMemoryBlock atomic_memory_block{
[this, edge_ref, old_from_vertex, new_from_vertex, edge_type, to_vertex]() {
CreateAndLinkDelta(&transaction_, old_from_vertex, Delta::AddOutEdgeTag(), edge_type, to_vertex, edge_ref);
CreateAndLinkDelta(&transaction_, to_vertex, Delta::AddInEdgeTag(), edge_type, old_from_vertex, edge_ref);
CreateAndLinkDelta(&transaction_, old_from_vertex, Delta::AddOutEdgeTag(), edge_type, to_vertex, edge_ref);
CreateAndLinkDelta(&transaction_, to_vertex, Delta::AddInEdgeTag(), edge_type, old_from_vertex, edge_ref);
CreateAndLinkDelta(&transaction_, new_from_vertex, Delta::RemoveOutEdgeTag(), edge_type, to_vertex, edge_ref);
new_from_vertex->out_edges.emplace_back(edge_type, to_vertex, edge_ref);
CreateAndLinkDelta(&transaction_, to_vertex, Delta::RemoveInEdgeTag(), edge_type, new_from_vertex, edge_ref);
to_vertex->in_edges.emplace_back(edge_type, new_from_vertex, edge_ref);
CreateAndLinkDelta(&transaction_, new_from_vertex, Delta::RemoveOutEdgeTag(), edge_type, to_vertex, edge_ref);
new_from_vertex->out_edges.emplace_back(edge_type, to_vertex, edge_ref);
CreateAndLinkDelta(&transaction_, to_vertex, Delta::RemoveInEdgeTag(), edge_type, new_from_vertex, edge_ref);
to_vertex->in_edges.emplace_back(edge_type, new_from_vertex, edge_ref);
transaction_.manyDeltasCache.Invalidate(new_from_vertex, edge_type, EdgeDirection::OUT);
transaction_.manyDeltasCache.Invalidate(old_from_vertex, edge_type, EdgeDirection::OUT);
transaction_.manyDeltasCache.Invalidate(to_vertex, edge_type, EdgeDirection::IN);
}};
transaction_.manyDeltasCache.Invalidate(new_from_vertex, edge_type, EdgeDirection::OUT);
transaction_.manyDeltasCache.Invalidate(old_from_vertex, edge_type, EdgeDirection::OUT);
transaction_.manyDeltasCache.Invalidate(to_vertex, edge_type, EdgeDirection::IN);
std::invoke(atomic_memory_block);
return EdgeAccessor(edge_ref, edge_type, new_from_vertex, to_vertex, storage_, &transaction_);
}
@ -636,17 +647,22 @@ Result<EdgeAccessor> InMemoryStorage::InMemoryAccessor::EdgeSetTo(EdgeAccessor *
}
}
CreateAndLinkDelta(&transaction_, from_vertex, Delta::AddOutEdgeTag(), edge_type, old_to_vertex, edge_ref);
CreateAndLinkDelta(&transaction_, old_to_vertex, Delta::AddInEdgeTag(), edge_type, from_vertex, edge_ref);
utils::AtomicMemoryBlock atomic_memory_block{
[this, edge_ref, old_to_vertex, from_vertex, edge_type, new_to_vertex]() {
CreateAndLinkDelta(&transaction_, from_vertex, Delta::AddOutEdgeTag(), edge_type, old_to_vertex, edge_ref);
CreateAndLinkDelta(&transaction_, old_to_vertex, Delta::AddInEdgeTag(), edge_type, from_vertex, edge_ref);
CreateAndLinkDelta(&transaction_, from_vertex, Delta::RemoveOutEdgeTag(), edge_type, new_to_vertex, edge_ref);
from_vertex->out_edges.emplace_back(edge_type, new_to_vertex, edge_ref);
CreateAndLinkDelta(&transaction_, new_to_vertex, Delta::RemoveInEdgeTag(), edge_type, from_vertex, edge_ref);
new_to_vertex->in_edges.emplace_back(edge_type, from_vertex, edge_ref);
CreateAndLinkDelta(&transaction_, from_vertex, Delta::RemoveOutEdgeTag(), edge_type, new_to_vertex, edge_ref);
from_vertex->out_edges.emplace_back(edge_type, new_to_vertex, edge_ref);
CreateAndLinkDelta(&transaction_, new_to_vertex, Delta::RemoveInEdgeTag(), edge_type, from_vertex, edge_ref);
new_to_vertex->in_edges.emplace_back(edge_type, from_vertex, edge_ref);
transaction_.manyDeltasCache.Invalidate(from_vertex, edge_type, EdgeDirection::OUT);
transaction_.manyDeltasCache.Invalidate(old_to_vertex, edge_type, EdgeDirection::IN);
transaction_.manyDeltasCache.Invalidate(new_to_vertex, edge_type, EdgeDirection::IN);
transaction_.manyDeltasCache.Invalidate(from_vertex, edge_type, EdgeDirection::OUT);
transaction_.manyDeltasCache.Invalidate(old_to_vertex, edge_type, EdgeDirection::IN);
transaction_.manyDeltasCache.Invalidate(new_to_vertex, edge_type, EdgeDirection::IN);
}};
std::invoke(atomic_memory_block);
return EdgeAccessor(edge_ref, edge_type, from_vertex, new_to_vertex, storage_, &transaction_);
}
@ -709,17 +725,21 @@ Result<EdgeAccessor> InMemoryStorage::InMemoryAccessor::EdgeChangeType(EdgeAcces
MG_ASSERT((op1 && op2), "Invalid database state!");
// "deleting" old edge
CreateAndLinkDelta(&transaction_, from_vertex, Delta::AddOutEdgeTag(), edge_type, to_vertex, edge_ref);
CreateAndLinkDelta(&transaction_, to_vertex, Delta::AddInEdgeTag(), edge_type, from_vertex, edge_ref);
utils::AtomicMemoryBlock atomic_memory_block{[this, to_vertex, new_edge_type, edge_ref, from_vertex, edge_type]() {
// "deleting" old edge
CreateAndLinkDelta(&transaction_, from_vertex, Delta::AddOutEdgeTag(), edge_type, to_vertex, edge_ref);
CreateAndLinkDelta(&transaction_, to_vertex, Delta::AddInEdgeTag(), edge_type, from_vertex, edge_ref);
// "adding" new edge
CreateAndLinkDelta(&transaction_, from_vertex, Delta::RemoveOutEdgeTag(), new_edge_type, to_vertex, edge_ref);
CreateAndLinkDelta(&transaction_, to_vertex, Delta::RemoveInEdgeTag(), new_edge_type, from_vertex, edge_ref);
// "adding" new edge
CreateAndLinkDelta(&transaction_, from_vertex, Delta::RemoveOutEdgeTag(), new_edge_type, to_vertex, edge_ref);
CreateAndLinkDelta(&transaction_, to_vertex, Delta::RemoveInEdgeTag(), new_edge_type, from_vertex, edge_ref);
// edge type is not used while invalidating cache so we can only call it once
transaction_.manyDeltasCache.Invalidate(from_vertex, new_edge_type, EdgeDirection::OUT);
transaction_.manyDeltasCache.Invalidate(to_vertex, new_edge_type, EdgeDirection::IN);
// edge type is not used while invalidating cache so we can only call it once
transaction_.manyDeltasCache.Invalidate(from_vertex, new_edge_type, EdgeDirection::OUT);
transaction_.manyDeltasCache.Invalidate(to_vertex, new_edge_type, EdgeDirection::IN);
}};
std::invoke(atomic_memory_block);
return EdgeAccessor(edge_ref, new_edge_type, from_vertex, to_vertex, storage_, &transaction_);
}

View File

@ -1,4 +1,4 @@
// Copyright 2023 Memgraph Ltd.
// Copyright 2024 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -427,7 +427,6 @@ class InMemoryStorage final : public Storage {
std::optional<CommitLog> commit_log_;
utils::Scheduler gc_runner_;
utils::Scheduler gc_jemalloc_runner_;
std::mutex gc_lock_;
using BondPmrLd = Bond<utils::pmr::list<Delta>>;

View File

@ -1,4 +1,4 @@
// Copyright 2023 Memgraph Ltd.
// Copyright 2024 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -17,6 +17,7 @@
#include "storage/v2/storage.hpp"
#include "storage/v2/transaction.hpp"
#include "storage/v2/vertex_accessor.hpp"
#include "utils/atomic_memory_block.hpp"
#include "utils/event_counter.hpp"
#include "utils/event_histogram.hpp"
#include "utils/exceptions.hpp"
@ -390,22 +391,29 @@ Result<std::optional<std::vector<EdgeAccessor>>> Storage::Accessor::ClearEdgesOn
if (!PrepareForWrite(&transaction_, vertex_ptr)) return Error::SERIALIZATION_ERROR;
MG_ASSERT(!vertex_ptr->deleted, "Invalid database state!");
attached_edges_to_vertex->pop_back();
if (storage_->config_.items.properties_on_edges) {
auto *edge_ptr = edge_ref.ptr;
MarkEdgeAsDeleted(edge_ptr);
}
// MarkEdgeAsDeleted allocates additional memory
// and CreateAndLinkDelta needs memory
utils::AtomicMemoryBlock atomic_memory_block{[&attached_edges_to_vertex, &deleted_edge_ids, &reverse_vertex_order,
&vertex_ptr, &deleted_edges, deletion_delta = deletion_delta,
edge_type = edge_type, opposing_vertex = opposing_vertex,
edge_ref = edge_ref, this]() {
attached_edges_to_vertex->pop_back();
if (this->storage_->config_.items.properties_on_edges) {
auto *edge_ptr = edge_ref.ptr;
MarkEdgeAsDeleted(edge_ptr);
}
auto const edge_gid = storage_->config_.items.properties_on_edges ? edge_ref.ptr->gid : edge_ref.gid;
auto const [_, was_inserted] = deleted_edge_ids.insert(edge_gid);
bool const edge_cleared_from_both_directions = !was_inserted;
if (edge_cleared_from_both_directions) {
auto *from_vertex = reverse_vertex_order ? vertex_ptr : opposing_vertex;
auto *to_vertex = reverse_vertex_order ? opposing_vertex : vertex_ptr;
deleted_edges.emplace_back(edge_ref, edge_type, from_vertex, to_vertex, storage_, &transaction_, true);
}
CreateAndLinkDelta(&transaction_, vertex_ptr, deletion_delta, edge_type, opposing_vertex, edge_ref);
auto const edge_gid = storage_->config_.items.properties_on_edges ? edge_ref.ptr->gid : edge_ref.gid;
auto const [_, was_inserted] = deleted_edge_ids.insert(edge_gid);
bool const edge_cleared_from_both_directions = !was_inserted;
if (edge_cleared_from_both_directions) {
auto *from_vertex = reverse_vertex_order ? vertex_ptr : opposing_vertex;
auto *to_vertex = reverse_vertex_order ? opposing_vertex : vertex_ptr;
deleted_edges.emplace_back(edge_ref, edge_type, from_vertex, to_vertex, storage_, &transaction_, true);
}
CreateAndLinkDelta(&transaction_, vertex_ptr, deletion_delta, edge_type, opposing_vertex, edge_ref);
}};
std::invoke(atomic_memory_block);
}
return std::make_optional<ReturnType>();
@ -449,30 +457,37 @@ Result<std::optional<std::vector<EdgeAccessor>>> Storage::Accessor::DetachRemain
return !set_for_erasure.contains(edge_gid);
});
for (auto it = mid; it != edges_attached_to_vertex->end(); it++) {
auto const &[edge_type, opposing_vertex, edge_ref] = *it;
std::unique_lock<utils::RWSpinLock> guard;
if (storage_->config_.items.properties_on_edges) {
auto edge_ptr = edge_ref.ptr;
guard = std::unique_lock{edge_ptr->lock};
// this can happen only if we marked edges for deletion with no nodes,
// so the method detaching nodes will not do anything
MarkEdgeAsDeleted(edge_ptr);
// Creating deltas and erasing edge only at the end -> we might have incomplete state as
// delta might cause OOM, so we don't remove edges from edges_attached_to_vertex
utils::AtomicMemoryBlock atomic_memory_block{[&mid, &edges_attached_to_vertex, &deleted_edges,
&partially_detached_edge_ids, this, vertex_ptr, deletion_delta,
reverse_vertex_order]() {
for (auto it = mid; it != edges_attached_to_vertex->end(); it++) {
auto const &[edge_type, opposing_vertex, edge_ref] = *it;
std::unique_lock<utils::RWSpinLock> guard;
if (storage_->config_.items.properties_on_edges) {
auto edge_ptr = edge_ref.ptr;
guard = std::unique_lock{edge_ptr->lock};
// this can happen only if we marked edges for deletion with no nodes,
// so the method detaching nodes will not do anything
MarkEdgeAsDeleted(edge_ptr);
}
CreateAndLinkDelta(&transaction_, vertex_ptr, deletion_delta, edge_type, opposing_vertex, edge_ref);
auto const edge_gid = storage_->config_.items.properties_on_edges ? edge_ref.ptr->gid : edge_ref.gid;
auto const [_, was_inserted] = partially_detached_edge_ids.insert(edge_gid);
bool const edge_cleared_from_both_directions = !was_inserted;
if (edge_cleared_from_both_directions) {
auto *from_vertex = reverse_vertex_order ? opposing_vertex : vertex_ptr;
auto *to_vertex = reverse_vertex_order ? vertex_ptr : opposing_vertex;
deleted_edges.emplace_back(edge_ref, edge_type, from_vertex, to_vertex, storage_, &transaction_, true);
}
}
edges_attached_to_vertex->erase(mid, edges_attached_to_vertex->end());
}};
CreateAndLinkDelta(&transaction_, vertex_ptr, deletion_delta, edge_type, opposing_vertex, edge_ref);
auto const edge_gid = storage_->config_.items.properties_on_edges ? edge_ref.ptr->gid : edge_ref.gid;
auto const [_, was_inserted] = partially_detached_edge_ids.insert(edge_gid);
bool const edge_cleared_from_both_directions = !was_inserted;
if (edge_cleared_from_both_directions) {
auto *from_vertex = reverse_vertex_order ? opposing_vertex : vertex_ptr;
auto *to_vertex = reverse_vertex_order ? vertex_ptr : opposing_vertex;
deleted_edges.emplace_back(edge_ref, edge_type, from_vertex, to_vertex, storage_, &transaction_, true);
}
}
edges_attached_to_vertex->erase(mid, edges_attached_to_vertex->end());
std::invoke(atomic_memory_block);
return std::make_optional<ReturnType>();
};

View File

@ -1,4 +1,4 @@
// Copyright 2023 Memgraph Ltd.
// Copyright 2024 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -26,6 +26,7 @@
#include "storage/v2/storage.hpp"
#include "storage/v2/vertex_info_cache.hpp"
#include "storage/v2/vertex_info_helpers.hpp"
#include "utils/atomic_memory_block.hpp"
#include "utils/logging.hpp"
#include "utils/memory_tracker.hpp"
#include "utils/variant_helpers.hpp"
@ -107,8 +108,11 @@ Result<bool> VertexAccessor::AddLabel(LabelId label) {
if (vertex_->deleted) return Error::DELETED_OBJECT;
if (std::find(vertex_->labels.begin(), vertex_->labels.end(), label) != vertex_->labels.end()) return false;
CreateAndLinkDelta(transaction_, vertex_, Delta::RemoveLabelTag(), label);
vertex_->labels.push_back(label);
utils::AtomicMemoryBlock atomic_memory_block{[transaction = transaction_, vertex = vertex_, &label]() {
CreateAndLinkDelta(transaction, vertex, Delta::RemoveLabelTag(), label);
vertex->labels.push_back(label);
}};
std::invoke(atomic_memory_block);
if (storage_->config_.items.enable_schema_metadata) {
storage_->stored_node_labels_.try_insert(label);
@ -136,9 +140,12 @@ Result<bool> VertexAccessor::RemoveLabel(LabelId label) {
auto it = std::find(vertex_->labels.begin(), vertex_->labels.end(), label);
if (it == vertex_->labels.end()) return false;
CreateAndLinkDelta(transaction_, vertex_, Delta::AddLabelTag(), label);
*it = vertex_->labels.back();
vertex_->labels.pop_back();
utils::AtomicMemoryBlock atomic_memory_block{[transaction = transaction_, vertex = vertex_, &label, &it]() {
CreateAndLinkDelta(transaction, vertex, Delta::AddLabelTag(), label);
*it = vertex->labels.back();
vertex->labels.pop_back();
}};
std::invoke(atomic_memory_block);
/// TODO: some by pointers, some by reference => not good, make it better
storage_->constraints_.unique_constraints_->UpdateOnRemoveLabel(label, *vertex_, transaction_->start_timestamp);
@ -262,8 +269,12 @@ Result<PropertyValue> VertexAccessor::SetProperty(PropertyId property, const Pro
// "modify in-place". Additionally, the created delta will make other
// transactions get a SERIALIZATION_ERROR.
CreateAndLinkDelta(transaction_, vertex_, Delta::SetPropertyTag(), property, current_value);
vertex_->properties.SetProperty(property, value);
utils::AtomicMemoryBlock atomic_memory_block{
[transaction = transaction_, vertex = vertex_, &value, &property, &current_value]() {
CreateAndLinkDelta(transaction, vertex, Delta::SetPropertyTag(), property, current_value);
vertex->properties.SetProperty(property, value);
}};
std::invoke(atomic_memory_block);
if (!value.IsNull()) {
transaction_->constraint_verification_info.AddedProperty(vertex_);
@ -287,20 +298,28 @@ Result<bool> VertexAccessor::InitProperties(const std::map<storage::PropertyId,
if (!PrepareForWrite(transaction_, vertex_)) return Error::SERIALIZATION_ERROR;
if (vertex_->deleted) return Error::DELETED_OBJECT;
bool result{false};
utils::AtomicMemoryBlock atomic_memory_block{
[&result, &properties, storage = storage_, transaction = transaction_, vertex = vertex_]() {
if (!vertex->properties.InitProperties(properties)) {
result = false;
return;
}
for (const auto &[property, value] : properties) {
CreateAndLinkDelta(transaction, vertex, Delta::SetPropertyTag(), property, PropertyValue());
storage->indices_.UpdateOnSetProperty(property, value, vertex, *transaction);
transaction->manyDeltasCache.Invalidate(vertex, property);
if (!value.IsNull()) {
transaction->constraint_verification_info.AddedProperty(vertex);
} else {
transaction->constraint_verification_info.RemovedProperty(vertex);
}
}
result = true;
}};
std::invoke(atomic_memory_block);
if (!vertex_->properties.InitProperties(properties)) return false;
for (const auto &[property, value] : properties) {
CreateAndLinkDelta(transaction_, vertex_, Delta::SetPropertyTag(), property, PropertyValue());
storage_->indices_.UpdateOnSetProperty(property, value, vertex_, *transaction_);
transaction_->manyDeltasCache.Invalidate(vertex_, property);
if (!value.IsNull()) {
transaction_->constraint_verification_info.AddedProperty(vertex_);
} else {
transaction_->constraint_verification_info.RemovedProperty(vertex_);
}
}
return true;
return result;
}
Result<std::vector<std::tuple<PropertyId, PropertyValue, PropertyValue>>> VertexAccessor::UpdateProperties(
@ -316,20 +335,28 @@ Result<std::vector<std::tuple<PropertyId, PropertyValue, PropertyValue>>> Vertex
if (vertex_->deleted) return Error::DELETED_OBJECT;
auto id_old_new_change = vertex_->properties.UpdateProperties(properties);
using ReturnType = decltype(vertex_->properties.UpdateProperties(properties));
std::optional<ReturnType> id_old_new_change;
utils::AtomicMemoryBlock atomic_memory_block{
[storage = storage_, transaction = transaction_, vertex = vertex_, &properties, &id_old_new_change]() {
id_old_new_change.emplace(vertex->properties.UpdateProperties(properties));
if (!id_old_new_change.has_value()) {
return;
}
for (auto &[id, old_value, new_value] : *id_old_new_change) {
storage->indices_.UpdateOnSetProperty(id, new_value, vertex, *transaction);
CreateAndLinkDelta(transaction, vertex, Delta::SetPropertyTag(), id, std::move(old_value));
transaction->manyDeltasCache.Invalidate(vertex, id);
if (!new_value.IsNull()) {
transaction->constraint_verification_info.AddedProperty(vertex);
} else {
transaction->constraint_verification_info.RemovedProperty(vertex);
}
}
}};
std::invoke(atomic_memory_block);
for (auto &[id, old_value, new_value] : id_old_new_change) {
storage_->indices_.UpdateOnSetProperty(id, new_value, vertex_, *transaction_);
CreateAndLinkDelta(transaction_, vertex_, Delta::SetPropertyTag(), id, std::move(old_value));
transaction_->manyDeltasCache.Invalidate(vertex_, id);
if (!new_value.IsNull()) {
transaction_->constraint_verification_info.AddedProperty(vertex_);
} else {
transaction_->constraint_verification_info.RemovedProperty(vertex_);
}
}
return id_old_new_change;
return id_old_new_change.has_value() ? std::move(id_old_new_change.value()) : ReturnType{};
}
Result<std::map<PropertyId, PropertyValue>> VertexAccessor::ClearProperties() {
@ -342,17 +369,25 @@ Result<std::map<PropertyId, PropertyValue>> VertexAccessor::ClearProperties() {
if (vertex_->deleted) return Error::DELETED_OBJECT;
auto properties = vertex_->properties.Properties();
for (const auto &[property, value] : properties) {
CreateAndLinkDelta(transaction_, vertex_, Delta::SetPropertyTag(), property, value);
storage_->indices_.UpdateOnSetProperty(property, PropertyValue(), vertex_, *transaction_);
transaction_->constraint_verification_info.RemovedProperty(vertex_);
transaction_->manyDeltasCache.Invalidate(vertex_, property);
}
using ReturnType = decltype(vertex_->properties.Properties());
std::optional<ReturnType> properties;
utils::AtomicMemoryBlock atomic_memory_block{
[storage = storage_, transaction = transaction_, vertex = vertex_, &properties]() {
properties.emplace(vertex->properties.Properties());
if (!properties.has_value()) {
return;
}
for (const auto &[property, value] : *properties) {
CreateAndLinkDelta(transaction, vertex, Delta::SetPropertyTag(), property, value);
storage->indices_.UpdateOnSetProperty(property, PropertyValue(), vertex, *transaction);
transaction->constraint_verification_info.RemovedProperty(vertex);
transaction->manyDeltasCache.Invalidate(vertex, property);
}
vertex->properties.ClearProperties();
}};
std::invoke(atomic_memory_block);
vertex_->properties.ClearProperties();
return std::move(properties);
return properties.has_value() ? std::move(properties.value()) : ReturnType{};
}
Result<PropertyValue> VertexAccessor::GetProperty(PropertyId property, View view) const {

View File

@ -0,0 +1,44 @@
// Copyright 2024 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#pragma once
#include <functional>
#include "utils/memory_tracker.hpp"
namespace memgraph::utils {
// Calls a function with out of memory exception blocker, checks memory allocation after block execution.
// Use it in case you need block which will be executed atomically considering memory execution
// but will check after block is executed if OOM exceptions needs to be thrown
template <typename Callable>
class [[nodiscard]] AtomicMemoryBlock {
public:
explicit AtomicMemoryBlock(Callable &&function) : function_{std::forward<Callable>(function)} {}
AtomicMemoryBlock(AtomicMemoryBlock const &) = delete;
AtomicMemoryBlock(AtomicMemoryBlock &&) = delete;
AtomicMemoryBlock &operator=(AtomicMemoryBlock const &) = delete;
AtomicMemoryBlock &operator=(AtomicMemoryBlock &&) = delete;
~AtomicMemoryBlock() = default;
void operator()() {
{
utils::MemoryTracker::OutOfMemoryExceptionBlocker oom_blocker;
function_();
}
total_memory_tracker.DoCheck();
}
private:
Callable function_;
};
} // namespace memgraph::utils

View File

@ -1,4 +1,4 @@
// Copyright 2023 Memgraph Ltd.
// Copyright 2024 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -124,6 +124,19 @@ void MemoryTracker::Alloc(const int64_t size) {
UpdatePeak(will_be);
}
void MemoryTracker::DoCheck() {
const auto current_hard_limit = hard_limit_.load(std::memory_order_relaxed);
const auto current_amount = amount_.load(std::memory_order_relaxed);
if (current_hard_limit && current_amount > current_hard_limit && MemoryTrackerCanThrow()) [[unlikely]] {
MemoryTracker::OutOfMemoryExceptionBlocker exception_blocker;
throw OutOfMemoryException(
fmt::format("Memory limit exceeded! Current "
"use is {}, while the maximum allowed size for allocation is set to {}.",
GetReadableSize(static_cast<double>(current_amount)),
GetReadableSize(static_cast<double>(current_hard_limit))));
}
}
void MemoryTracker::Free(const int64_t size) { amount_.fetch_sub(size, std::memory_order_relaxed); }
} // namespace memgraph::utils

View File

@ -1,4 +1,4 @@
// Copyright 2023 Memgraph Ltd.
// Copyright 2024 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -49,6 +49,7 @@ class MemoryTracker final {
void Alloc(int64_t size);
void Free(int64_t size);
void DoCheck();
auto Amount() const { return amount_.load(std::memory_order_relaxed); }

View File

@ -143,6 +143,7 @@ startup_config_dict = {
"The time duration between two replica checks/pings. If < 1, replicas will NOT be checked at all. NOTE: The MAIN instance allocates a new thread for each REPLICA.",
),
"storage_gc_cycle_sec": ("30", "30", "Storage garbage collector interval (in seconds)."),
"storage_python_gc_cycle_sec": ("180", "180", "Storage python full garbage collection interval (in seconds)."),
"storage_items_per_batch": (
"1000000",
"1000000",

View File

@ -0,0 +1,22 @@
version: '3'
services:
zookeeper:
image: 'bitnami/zookeeper:3.9.1'
ports:
- '2181:2181'
environment:
- ALLOW_ANONYMOUS_LOGIN=yes
kafka:
image: 'bitnami/kafka:3.6.1'
ports:
- '9092:9092'
- '29092:29092'
environment:
- KAFKA_BROKER_ID=1
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,PLAINTEXT_HOST://:29092
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092,PLAINTEXT_HOST://localhost:29092
- KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
- ALLOW_PLAINTEXT_LISTENER=yes
depends_on:
- zookeeper

View File

@ -0,0 +1,9 @@
version: "3"
services:
pulsar:
image: 'apachepulsar/pulsar:2.8.1'
ports:
- '6652:8080'
- '6650:6650'
entrypoint: [ 'bin/pulsar', 'standalone' ]

View File

@ -173,39 +173,49 @@ def run_writer(repetition_count: int, sleep_sec: float, worker_id: int) -> int:
"""
This writer creates lot of nodes on each write.
Also it checks that query failed if memory limit is tried to be broken
Return:
True if write suceeded
False otherwise
"""
session = SessionCache.argument_session(args)
def create() -> bool:
def try_create() -> bool:
"""
Returns True if done, False if needs to continue
Function tries to create until memory limit is reached
Return:
True if it can continue creating (OOM not reached)
False otherwise
"""
memory_tracker_data_before_start = get_tracker_data(session)
should_fail = memory_tracker_data_before_start >= 2048
failed = False
should_continue = True
try:
try_execute(
session,
f"FOREACH (i in range(1,10000) | CREATE (:Node {{prop:'big string or something like that'}}))",
)
except Exception as ex:
failed = True
output = str(ex)
log.info("Exception in create", output)
assert "Memory limit exceeded!" in output
memory_over_2048_mb = False
memory_tracker_data_after_start = get_tracker_data(session)
if memory_tracker_data_after_start:
memory_over_2048_mb = memory_tracker_data_after_start >= 2048
log.info(
"Exception in create, exception output:",
output,
f"Worker {worker_id} started iteration {curr_repetition}, memory over 2048MB: {memory_over_2048_mb}",
)
has_oom_happend = "Memory limit exceeded!" in output and memory_over_2048_mb
should_continue = not has_oom_happend
if should_fail:
assert failed, "Query should have failed"
return False
return True
return should_continue
curr_repetition = 0
while curr_repetition < repetition_count:
log.info(f"Worker {worker_id} started iteration {curr_repetition}")
should_continue = create()
should_continue = try_create()
if not should_continue:
return True
@ -214,6 +224,7 @@ def run_writer(repetition_count: int, sleep_sec: float, worker_id: int) -> int:
log.info(f"Worker {worker_id} created chain in iteration {curr_repetition}")
curr_repetition += 1
return False
def execute_function(worker: Worker) -> Worker: