From 082f9a7d9b5d15f6525fd9514f863d56746a73c1 Mon Sep 17 00:00:00 2001 From: Josipmrden Date: Fri, 15 Mar 2024 14:45:21 +0100 Subject: [PATCH] Add behaviour of no updates if vertex is updated with same value (#1791) --- src/flags/general.cpp | 4 ++ src/flags/general.hpp | 2 + src/memgraph.cpp | 3 +- src/query/frontend/stripped.cpp | 2 +- src/query/plan/operator.cpp | 2 +- src/storage/v2/config.hpp | 1 + src/storage/v2/edge_accessor.cpp | 10 +++- src/storage/v2/vertex_accessor.cpp | 69 ++++++++++++++--------- src/utils/atomic_memory_block.hpp | 10 ++-- src/utils/on_scope_exit.hpp | 5 +- tests/e2e/CMakeLists.txt | 1 + tests/e2e/concurrency/CMakeLists.txt | 6 ++ tests/e2e/concurrency/common.py | 60 ++++++++++++++++++++ tests/e2e/concurrency/concurrency.py | 57 +++++++++++++++++++ tests/e2e/concurrency/workloads.yaml | 14 +++++ tests/e2e/configuration/default_config.py | 5 ++ 16 files changed, 209 insertions(+), 42 deletions(-) create mode 100644 tests/e2e/concurrency/CMakeLists.txt create mode 100644 tests/e2e/concurrency/common.py create mode 100644 tests/e2e/concurrency/concurrency.py create mode 100644 tests/e2e/concurrency/workloads.yaml diff --git a/src/flags/general.cpp b/src/flags/general.cpp index cd2c95c60..37fa17b36 100644 --- a/src/flags/general.cpp +++ b/src/flags/general.cpp @@ -131,6 +131,10 @@ DEFINE_uint64(storage_recovery_thread_count, DEFINE_bool(storage_enable_schema_metadata, false, "Controls whether metadata should be collected about the resident labels and edge types."); +// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) +DEFINE_bool(storage_delta_on_identical_property_update, true, + "Controls whether updating a property with the same value should create a delta object."); + // NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) DEFINE_bool(telemetry_enabled, false, "Set to true to enable telemetry. We collect information about the " diff --git a/src/flags/general.hpp b/src/flags/general.hpp index a1e8729ab..52f51471d 100644 --- a/src/flags/general.hpp +++ b/src/flags/general.hpp @@ -84,6 +84,8 @@ DECLARE_bool(storage_parallel_schema_recovery); DECLARE_uint64(storage_recovery_thread_count); // NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) DECLARE_bool(storage_enable_schema_metadata); +// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) +DECLARE_bool(storage_delta_on_identical_property_update); // NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) DECLARE_bool(telemetry_enabled); diff --git a/src/memgraph.cpp b/src/memgraph.cpp index d896bcc4c..9bf50131d 100644 --- a/src/memgraph.cpp +++ b/src/memgraph.cpp @@ -332,7 +332,8 @@ int main(int argc, char **argv) { .durability_directory = FLAGS_data_directory + "/rocksdb_durability", .wal_directory = FLAGS_data_directory + "/rocksdb_wal"}, .salient.items = {.properties_on_edges = FLAGS_storage_properties_on_edges, - .enable_schema_metadata = FLAGS_storage_enable_schema_metadata}, + .enable_schema_metadata = FLAGS_storage_enable_schema_metadata, + .delta_on_identical_property_update = FLAGS_storage_delta_on_identical_property_update}, .salient.storage_mode = memgraph::flags::ParseStorageMode()}; spdlog::info("config recover on startup {}, flags {} {}", db_config.durability.recover_on_startup, FLAGS_storage_recover_on_startup, FLAGS_data_recovery_on_startup); diff --git a/src/query/frontend/stripped.cpp b/src/query/frontend/stripped.cpp index 9740cd463..5ea26b041 100644 --- a/src/query/frontend/stripped.cpp +++ b/src/query/frontend/stripped.cpp @@ -1,4 +1,4 @@ -// Copyright 2023 Memgraph Ltd. +// Copyright 2024 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source diff --git a/src/query/plan/operator.cpp b/src/query/plan/operator.cpp index 8e1b9f529..2b970cf49 100644 --- a/src/query/plan/operator.cpp +++ b/src/query/plan/operator.cpp @@ -329,7 +329,7 @@ CreateExpand::CreateExpand(NodeCreationInfo node_info, EdgeCreationInfo edge_inf ACCEPT_WITH_INPUT(CreateExpand) UniqueCursorPtr CreateExpand::MakeCursor(utils::MemoryResource *mem) const { - memgraph::metrics::IncrementCounter(memgraph::metrics::CreateNodeOperator); + memgraph::metrics::IncrementCounter(memgraph::metrics::CreateExpandOperator); return MakeUniqueCursorPtr(mem, *this, mem); } diff --git a/src/storage/v2/config.hpp b/src/storage/v2/config.hpp index b2a55a40a..419f29b85 100644 --- a/src/storage/v2/config.hpp +++ b/src/storage/v2/config.hpp @@ -37,6 +37,7 @@ struct SalientConfig { struct Items { bool properties_on_edges{true}; bool enable_schema_metadata{false}; + bool delta_on_identical_property_update{true}; friend bool operator==(const Items &lrh, const Items &rhs) = default; } items; diff --git a/src/storage/v2/edge_accessor.cpp b/src/storage/v2/edge_accessor.cpp index 62a9f4bcd..ba354371e 100644 --- a/src/storage/v2/edge_accessor.cpp +++ b/src/storage/v2/edge_accessor.cpp @@ -130,9 +130,13 @@ Result EdgeAccessor::SetProperty(PropertyId property, co if (edge_.ptr->deleted) return Error::DELETED_OBJECT; using ReturnType = decltype(edge_.ptr->properties.GetProperty(property)); std::optional current_value; + const bool skip_duplicate_write = !storage_->config_.salient.items.delta_on_identical_property_update; utils::AtomicMemoryBlock atomic_memory_block{ - [¤t_value, &property, &value, transaction = transaction_, edge = edge_]() { + [¤t_value, &property, &value, transaction = transaction_, edge = edge_, skip_duplicate_write]() { current_value.emplace(edge.ptr->properties.GetProperty(property)); + if (skip_duplicate_write && current_value == value) { + return; + } // We could skip setting the value if the previous one is the same to the new // one. This would save some memory as a delta would not be created as well as // avoid copying the value. The reason we are not doing that is because the @@ -184,12 +188,14 @@ Result>> EdgeAc if (edge_.ptr->deleted) return Error::DELETED_OBJECT; + const bool skip_duplicate_write = !storage_->config_.salient.items.delta_on_identical_property_update; using ReturnType = decltype(edge_.ptr->properties.UpdateProperties(properties)); std::optional id_old_new_change; utils::AtomicMemoryBlock atomic_memory_block{ - [transaction_ = transaction_, edge_ = edge_, &properties, &id_old_new_change]() { + [transaction_ = transaction_, edge_ = edge_, &properties, &id_old_new_change, skip_duplicate_write]() { id_old_new_change.emplace(edge_.ptr->properties.UpdateProperties(properties)); for (auto &[property, old_value, new_value] : *id_old_new_change) { + if (skip_duplicate_write && old_value == new_value) continue; CreateAndLinkDelta(transaction_, edge_.ptr, Delta::SetPropertyTag(), property, std::move(old_value)); } }}; diff --git a/src/storage/v2/vertex_accessor.cpp b/src/storage/v2/vertex_accessor.cpp index 7d78070a8..83dcc003b 100644 --- a/src/storage/v2/vertex_accessor.cpp +++ b/src/storage/v2/vertex_accessor.cpp @@ -261,20 +261,31 @@ Result VertexAccessor::SetProperty(PropertyId property, const Pro if (vertex_->deleted) return Error::DELETED_OBJECT; - auto current_value = vertex_->properties.GetProperty(property); - // We could skip setting the value if the previous one is the same to the new - // one. This would save some memory as a delta would not be created as well as - // avoid copying the value. The reason we are not doing that is because the - // current code always follows the logical pattern of "create a delta" and - // "modify in-place". Additionally, the created delta will make other - // transactions get a SERIALIZATION_ERROR. - + PropertyValue current_value; + const bool skip_duplicate_write = !storage_->config_.salient.items.delta_on_identical_property_update; utils::AtomicMemoryBlock atomic_memory_block{ - [transaction = transaction_, vertex = vertex_, &value, &property, ¤t_value]() { + [transaction = transaction_, vertex = vertex_, &value, &property, ¤t_value, skip_duplicate_write]() { + current_value = vertex->properties.GetProperty(property); + // We could skip setting the value if the previous one is the same to the new + // one. This would save some memory as a delta would not be created as well as + // avoid copying the value. The reason we are not doing that is because the + // current code always follows the logical pattern of "create a delta" and + // "modify in-place". Additionally, the created delta will make other + // transactions get a SERIALIZATION_ERROR. + if (skip_duplicate_write && current_value == value) { + return true; + } + CreateAndLinkDelta(transaction, vertex, Delta::SetPropertyTag(), property, current_value); vertex->properties.SetProperty(property, value); + + return false; }}; - std::invoke(atomic_memory_block); + const bool early_exit = std::invoke(atomic_memory_block); + + if (early_exit) { + return std::move(current_value); + } if (transaction_->constraint_verification_info) { if (!value.IsNull()) { @@ -339,27 +350,29 @@ Result>> Vertex if (vertex_->deleted) return Error::DELETED_OBJECT; + const bool skip_duplicate_update = storage_->config_.salient.items.delta_on_identical_property_update; using ReturnType = decltype(vertex_->properties.UpdateProperties(properties)); std::optional id_old_new_change; - utils::AtomicMemoryBlock atomic_memory_block{ - [storage = storage_, transaction = transaction_, vertex = vertex_, &properties, &id_old_new_change]() { - id_old_new_change.emplace(vertex->properties.UpdateProperties(properties)); - if (!id_old_new_change.has_value()) { - return; + utils::AtomicMemoryBlock atomic_memory_block{[storage = storage_, transaction = transaction_, vertex = vertex_, + &properties, &id_old_new_change, skip_duplicate_update]() { + id_old_new_change.emplace(vertex->properties.UpdateProperties(properties)); + if (!id_old_new_change.has_value()) { + return; + } + for (auto &[id, old_value, new_value] : *id_old_new_change) { + storage->indices_.UpdateOnSetProperty(id, new_value, vertex, *transaction); + if (skip_duplicate_update && old_value == new_value) continue; + CreateAndLinkDelta(transaction, vertex, Delta::SetPropertyTag(), id, std::move(old_value)); + transaction->manyDeltasCache.Invalidate(vertex, id); + if (transaction->constraint_verification_info) { + if (!new_value.IsNull()) { + transaction->constraint_verification_info->AddedProperty(vertex); + } else { + transaction->constraint_verification_info->RemovedProperty(vertex); } - for (auto &[id, old_value, new_value] : *id_old_new_change) { - storage->indices_.UpdateOnSetProperty(id, new_value, vertex, *transaction); - CreateAndLinkDelta(transaction, vertex, Delta::SetPropertyTag(), id, std::move(old_value)); - transaction->manyDeltasCache.Invalidate(vertex, id); - if (transaction->constraint_verification_info) { - if (!new_value.IsNull()) { - transaction->constraint_verification_info->AddedProperty(vertex); - } else { - transaction->constraint_verification_info->RemovedProperty(vertex); - } - } - } - }}; + } + } + }}; std::invoke(atomic_memory_block); return id_old_new_change.has_value() ? std::move(id_old_new_change.value()) : ReturnType{}; diff --git a/src/utils/atomic_memory_block.hpp b/src/utils/atomic_memory_block.hpp index 31a3cf3a9..5ae2aab35 100644 --- a/src/utils/atomic_memory_block.hpp +++ b/src/utils/atomic_memory_block.hpp @@ -29,12 +29,10 @@ class [[nodiscard]] AtomicMemoryBlock { AtomicMemoryBlock &operator=(AtomicMemoryBlock &&) = delete; ~AtomicMemoryBlock() = default; - void operator()() { - { - utils::MemoryTracker::OutOfMemoryExceptionBlocker oom_blocker; - function_(); - } - total_memory_tracker.DoCheck(); + auto operator()() -> std::invoke_result_t { + auto check_on_exit = OnScopeExit{[&] { total_memory_tracker.DoCheck(); }}; + utils::MemoryTracker::OutOfMemoryExceptionBlocker oom_blocker; + return function_(); } private: diff --git a/src/utils/on_scope_exit.hpp b/src/utils/on_scope_exit.hpp index a5398b017..114f1c370 100644 --- a/src/utils/on_scope_exit.hpp +++ b/src/utils/on_scope_exit.hpp @@ -35,7 +35,7 @@ namespace memgraph::utils { * // long block of code, might throw an exception * } */ -template +template class [[nodiscard]] OnScopeExit { public: template @@ -46,7 +46,7 @@ class [[nodiscard]] OnScopeExit { OnScopeExit &operator=(OnScopeExit const &) = delete; OnScopeExit &operator=(OnScopeExit &&) = delete; ~OnScopeExit() { - if (doCall_) function_(); + if (doCall_) std::invoke(std::move(function_)); } void Disable() { doCall_ = false; } @@ -57,5 +57,4 @@ class [[nodiscard]] OnScopeExit { }; template OnScopeExit(Callable &&) -> OnScopeExit; - } // namespace memgraph::utils diff --git a/tests/e2e/CMakeLists.txt b/tests/e2e/CMakeLists.txt index 1876074ee..60743676d 100644 --- a/tests/e2e/CMakeLists.txt +++ b/tests/e2e/CMakeLists.txt @@ -77,6 +77,7 @@ add_subdirectory(garbage_collection) add_subdirectory(query_planning) add_subdirectory(awesome_functions) add_subdirectory(high_availability) +add_subdirectory(concurrency) add_subdirectory(replication_experimental) diff --git a/tests/e2e/concurrency/CMakeLists.txt b/tests/e2e/concurrency/CMakeLists.txt new file mode 100644 index 000000000..f981a2537 --- /dev/null +++ b/tests/e2e/concurrency/CMakeLists.txt @@ -0,0 +1,6 @@ +function(copy_concurrency_e2e_python_files FILE_NAME) + copy_e2e_python_files(concurrency ${FILE_NAME}) +endfunction() + +copy_concurrency_e2e_python_files(common.py) +copy_concurrency_e2e_python_files(concurrency.py) diff --git a/tests/e2e/concurrency/common.py b/tests/e2e/concurrency/common.py new file mode 100644 index 000000000..208278929 --- /dev/null +++ b/tests/e2e/concurrency/common.py @@ -0,0 +1,60 @@ +# Copyright 2023 Memgraph Ltd. +# +# Use of this software is governed by the Business Source License +# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +# License, and you may not use this file except in compliance with the Business Source License. +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0, included in the file +# licenses/APL.txt. + +import typing + +import mgclient +import pytest + + +def execute_and_fetch_all(cursor: mgclient.Cursor, query: str, params: dict = {}) -> typing.List[tuple]: + cursor.execute(query, params) + return cursor.fetchall() + + +def execute_and_fetch_all_with_commit( + connection: mgclient.Connection, query: str, params: dict = {} +) -> typing.List[tuple]: + cursor = connection.cursor() + cursor.execute(query, params) + results = cursor.fetchall() + connection.commit() + return results + + +@pytest.fixture +def first_connection(**kwargs) -> mgclient.Connection: + connection = mgclient.connect(host="localhost", port=7687, **kwargs) + connection.autocommit = True + cursor = connection.cursor() + execute_and_fetch_all(cursor, "USE DATABASE memgraph") + try: + execute_and_fetch_all(cursor, "DROP DATABASE clean") + except: + pass + execute_and_fetch_all(cursor, "MATCH (n) DETACH DELETE n") + connection.autocommit = False + yield connection + + +@pytest.fixture +def second_connection(**kwargs) -> mgclient.Connection: + connection = mgclient.connect(host="localhost", port=7687, **kwargs) + connection.autocommit = True + cursor = connection.cursor() + execute_and_fetch_all(cursor, "USE DATABASE memgraph") + try: + execute_and_fetch_all(cursor, "DROP DATABASE clean") + except: + pass + execute_and_fetch_all(cursor, "MATCH (n) DETACH DELETE n") + connection.autocommit = False + yield connection diff --git a/tests/e2e/concurrency/concurrency.py b/tests/e2e/concurrency/concurrency.py new file mode 100644 index 000000000..7961c1984 --- /dev/null +++ b/tests/e2e/concurrency/concurrency.py @@ -0,0 +1,57 @@ +# Copyright 2023 Memgraph Ltd. +# +# Use of this software is governed by the Business Source License +# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +# License, and you may not use this file except in compliance with the Business Source License. +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0, included in the file +# licenses/APL.txt. + +import sys + +import pytest +from common import execute_and_fetch_all, first_connection, second_connection + + +def test_concurrency_if_no_delta_on_same_node_property_update(first_connection, second_connection): + m1c = first_connection.cursor() + m2c = second_connection.cursor() + + execute_and_fetch_all(m1c, "CREATE (:Node {prop: 1})") + first_connection.commit() + + test_has_error = False + try: + m1c.execute("MATCH (n) SET n.prop = 1") + m2c.execute("MATCH (n) SET n.prop = 1") + first_connection.commit() + second_connection.commit() + except Exception as e: + test_has_error = True + + assert test_has_error is False + + +def test_concurrency_if_no_delta_on_same_edge_property_update(first_connection, second_connection): + m1c = first_connection.cursor() + m2c = second_connection.cursor() + + execute_and_fetch_all(m1c, "CREATE ()-[:TYPE {prop: 1}]->()") + first_connection.commit() + + test_has_error = False + try: + m1c.execute("MATCH (n)-[r]->(m) SET r.prop = 1") + m2c.execute("MATCH (n)-[r]->(m) SET n.prop = 1") + first_connection.commit() + second_connection.commit() + except Exception as e: + test_has_error = True + + assert test_has_error is False + + +if __name__ == "__main__": + sys.exit(pytest.main([__file__, "-rA"])) diff --git a/tests/e2e/concurrency/workloads.yaml b/tests/e2e/concurrency/workloads.yaml new file mode 100644 index 000000000..839090538 --- /dev/null +++ b/tests/e2e/concurrency/workloads.yaml @@ -0,0 +1,14 @@ +concurrency_cluster: &concurrency_cluster + cluster: + main: + args: ["--bolt-port", "7687", "--log-level=TRACE", "--storage-delta-on-identical-property-update=false"] + log_file: "concurrency.log" + setup_queries: [] + validation_queries: [] + + +workloads: + - name: "Concurrency" + binary: "tests/e2e/pytest_runner.sh" + args: ["concurrency/concurrency.py"] + <<: *concurrency_cluster diff --git a/tests/e2e/configuration/default_config.py b/tests/e2e/configuration/default_config.py index 65a850f0b..75c211e0f 100644 --- a/tests/e2e/configuration/default_config.py +++ b/tests/e2e/configuration/default_config.py @@ -141,6 +141,11 @@ startup_config_dict = { "1", "The time duration between two replica checks/pings. If < 1, replicas will NOT be checked at all. NOTE: The MAIN instance allocates a new thread for each REPLICA.", ), + "storage_delta_on_identical_property_update": ( + "true", + "true", + "Controls whether updating a property with the same value should create a delta object.", + ), "storage_gc_cycle_sec": ("30", "30", "Storage garbage collector interval (in seconds)."), "storage_python_gc_cycle_sec": ("180", "180", "Storage python full garbage collection interval (in seconds)."), "storage_items_per_batch": (