Add behaviour of no updates if vertex is updated with same value (#1791)

Josipmrden 2024-03-15 14:45:21 +01:00 committed by GitHub
parent 0ed2d18754
commit 082f9a7d9b
16 changed files with 209 additions and 42 deletions

View File

@@ -131,6 +131,10 @@ DEFINE_uint64(storage_recovery_thread_count,
 DEFINE_bool(storage_enable_schema_metadata, false,
             "Controls whether metadata should be collected about the resident labels and edge types.");
+// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
+DEFINE_bool(storage_delta_on_identical_property_update, true,
+            "Controls whether updating a property with the same value should create a delta object.");
 // NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
 DEFINE_bool(telemetry_enabled, false,
             "Set to true to enable telemetry. We collect information about the "

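For orientation, a minimal, self-contained sketch of how a gflags boolean of this shape is defined and then read after flag parsing; the main() below is illustrative, not the actual Memgraph startup code.

#include <gflags/gflags.h>
#include <iostream>

// Same pattern as the flag added above: a boolean that defaults to true.
DEFINE_bool(storage_delta_on_identical_property_update, true,
            "Controls whether updating a property with the same value should create a delta object.");

int main(int argc, char **argv) {
  gflags::ParseCommandLineFlags(&argc, &argv, /*remove_flags=*/true);
  // After parsing, the value is available through the generated FLAGS_ global.
  std::cout << std::boolalpha << FLAGS_storage_delta_on_identical_property_update << '\n';
  return 0;
}
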
View File

@@ -84,6 +84,8 @@ DECLARE_bool(storage_parallel_schema_recovery);
 DECLARE_uint64(storage_recovery_thread_count);
 // NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
 DECLARE_bool(storage_enable_schema_metadata);
+// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
+DECLARE_bool(storage_delta_on_identical_property_update);
 // NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
 DECLARE_bool(telemetry_enabled);

View File

@@ -332,7 +332,8 @@ int main(int argc, char **argv) {
                       .durability_directory = FLAGS_data_directory + "/rocksdb_durability",
                       .wal_directory = FLAGS_data_directory + "/rocksdb_wal"},
       .salient.items = {.properties_on_edges = FLAGS_storage_properties_on_edges,
-                        .enable_schema_metadata = FLAGS_storage_enable_schema_metadata},
+                        .enable_schema_metadata = FLAGS_storage_enable_schema_metadata,
+                        .delta_on_identical_property_update = FLAGS_storage_delta_on_identical_property_update},
       .salient.storage_mode = memgraph::flags::ParseStorageMode()};
   spdlog::info("config recover on startup {}, flags {} {}", db_config.durability.recover_on_startup,
                FLAGS_storage_recover_on_startup, FLAGS_data_recovery_on_startup);

View File

@ -1,4 +1,4 @@
// Copyright 2023 Memgraph Ltd. // Copyright 2024 Memgraph Ltd.
// //
// Use of this software is governed by the Business Source License // Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source

View File

@@ -329,7 +329,7 @@ CreateExpand::CreateExpand(NodeCreationInfo node_info, EdgeCreationInfo edge_inf
 ACCEPT_WITH_INPUT(CreateExpand)
 UniqueCursorPtr CreateExpand::MakeCursor(utils::MemoryResource *mem) const {
-  memgraph::metrics::IncrementCounter(memgraph::metrics::CreateNodeOperator);
+  memgraph::metrics::IncrementCounter(memgraph::metrics::CreateExpandOperator);
   return MakeUniqueCursorPtr<CreateExpandCursor>(mem, *this, mem);
 }

View File

@@ -37,6 +37,7 @@ struct SalientConfig {
   struct Items {
     bool properties_on_edges{true};
     bool enable_schema_metadata{false};
+    bool delta_on_identical_property_update{true};
     friend bool operator==(const Items &lrh, const Items &rhs) = default;
   } items;

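A standalone sketch of the configuration pattern used above: a nested Items-style struct with a defaulted operator== and designated initializers. The struct below is an illustrative stand-in, not the real SalientConfig header.

#include <iostream>

struct Items {
  bool properties_on_edges{true};
  bool enable_schema_metadata{false};
  bool delta_on_identical_property_update{true};
  // A defaulted operator== compares every member, so a newly added field
  // automatically participates in config comparisons.
  friend bool operator==(const Items &lhs, const Items &rhs) = default;
};

int main() {
  // Designated initializers mirror how the flag is wired into the config in main().
  Items items{.properties_on_edges = true,
              .enable_schema_metadata = false,
              .delta_on_identical_property_update = false};
  std::cout << std::boolalpha << (items == Items{}) << '\n';  // prints false: one field differs from the defaults
  return 0;
}
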
View File

@@ -130,9 +130,13 @@ Result<storage::PropertyValue> EdgeAccessor::SetProperty(PropertyId property, co
   if (edge_.ptr->deleted) return Error::DELETED_OBJECT;
   using ReturnType = decltype(edge_.ptr->properties.GetProperty(property));
   std::optional<ReturnType> current_value;
+  const bool skip_duplicate_write = !storage_->config_.salient.items.delta_on_identical_property_update;
   utils::AtomicMemoryBlock atomic_memory_block{
-      [&current_value, &property, &value, transaction = transaction_, edge = edge_]() {
+      [&current_value, &property, &value, transaction = transaction_, edge = edge_, skip_duplicate_write]() {
         current_value.emplace(edge.ptr->properties.GetProperty(property));
+        if (skip_duplicate_write && current_value == value) {
+          return;
+        }
         // We could skip setting the value if the previous one is the same to the new
         // one. This would save some memory as a delta would not be created as well as
         // avoid copying the value. The reason we are not doing that is because the

@@ -184,12 +188,14 @@ Result<std::vector<std::tuple<PropertyId, PropertyValue, PropertyValue>>> EdgeAc
   if (edge_.ptr->deleted) return Error::DELETED_OBJECT;
+  const bool skip_duplicate_write = !storage_->config_.salient.items.delta_on_identical_property_update;
   using ReturnType = decltype(edge_.ptr->properties.UpdateProperties(properties));
   std::optional<ReturnType> id_old_new_change;
   utils::AtomicMemoryBlock atomic_memory_block{
-      [transaction_ = transaction_, edge_ = edge_, &properties, &id_old_new_change]() {
+      [transaction_ = transaction_, edge_ = edge_, &properties, &id_old_new_change, skip_duplicate_write]() {
         id_old_new_change.emplace(edge_.ptr->properties.UpdateProperties(properties));
         for (auto &[property, old_value, new_value] : *id_old_new_change) {
+          if (skip_duplicate_write && old_value == new_value) continue;
           CreateAndLinkDelta(transaction_, edge_.ptr, Delta::SetPropertyTag(), property, std::move(old_value));
         }
       }};

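The added guard is the core of the change: when the flag disables deltas for identical writes, the accessor compares the stored value against the incoming one and returns before any delta is created. A simplified sketch of that decision in isolation, using plain std::string/std::map stand-ins rather than the storage API:

#include <map>
#include <optional>
#include <string>

using PropertyValue = std::string;

struct Delta {
  std::string property;
  PropertyValue old_value;
};

// Returns the delta that would be recorded, or nullopt when the write is a no-op
// and skip_duplicate_write is enabled.
std::optional<Delta> SetProperty(std::map<std::string, PropertyValue> &props, const std::string &key,
                                 const PropertyValue &value, bool skip_duplicate_write) {
  const PropertyValue current = props.count(key) ? props.at(key) : PropertyValue{};
  if (skip_duplicate_write && current == value) {
    return std::nullopt;  // identical value: no delta is recorded and nothing is written
  }
  props[key] = value;
  return Delta{key, current};  // the delta keeps the old value for undo and conflict detection
}

int main() {
  std::map<std::string, PropertyValue> props{{"prop", "1"}};
  auto no_delta = SetProperty(props, "prop", "1", /*skip_duplicate_write=*/true);  // nullopt: same value
  auto delta = SetProperty(props, "prop", "2", /*skip_duplicate_write=*/true);     // records old value "1"
  return (no_delta.has_value() || !delta.has_value()) ? 1 : 0;                     // 0 on the expected behaviour
}
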
View File

@@ -261,20 +261,31 @@ Result<PropertyValue> VertexAccessor::SetProperty(PropertyId property, const Pro
   if (vertex_->deleted) return Error::DELETED_OBJECT;
-  auto current_value = vertex_->properties.GetProperty(property);
-  // We could skip setting the value if the previous one is the same to the new
-  // one. This would save some memory as a delta would not be created as well as
-  // avoid copying the value. The reason we are not doing that is because the
-  // current code always follows the logical pattern of "create a delta" and
-  // "modify in-place". Additionally, the created delta will make other
-  // transactions get a SERIALIZATION_ERROR.
+  PropertyValue current_value;
+  const bool skip_duplicate_write = !storage_->config_.salient.items.delta_on_identical_property_update;
   utils::AtomicMemoryBlock atomic_memory_block{
-      [transaction = transaction_, vertex = vertex_, &value, &property, &current_value]() {
+      [transaction = transaction_, vertex = vertex_, &value, &property, &current_value, skip_duplicate_write]() {
+        current_value = vertex->properties.GetProperty(property);
+        // We could skip setting the value if the previous one is the same to the new
+        // one. This would save some memory as a delta would not be created as well as
+        // avoid copying the value. The reason we are not doing that is because the
+        // current code always follows the logical pattern of "create a delta" and
+        // "modify in-place". Additionally, the created delta will make other
+        // transactions get a SERIALIZATION_ERROR.
+        if (skip_duplicate_write && current_value == value) {
+          return true;
+        }
         CreateAndLinkDelta(transaction, vertex, Delta::SetPropertyTag(), property, current_value);
         vertex->properties.SetProperty(property, value);
+        return false;
       }};
-  std::invoke(atomic_memory_block);
+  const bool early_exit = std::invoke(atomic_memory_block);
+  if (early_exit) {
+    return std::move(current_value);
+  }
   if (transaction_->constraint_verification_info) {
     if (!value.IsNull()) {

@@ -339,27 +350,29 @@ Result<std::vector<std::tuple<PropertyId, PropertyValue, PropertyValue>>> Vertex
   if (vertex_->deleted) return Error::DELETED_OBJECT;
+  const bool skip_duplicate_update = storage_->config_.salient.items.delta_on_identical_property_update;
   using ReturnType = decltype(vertex_->properties.UpdateProperties(properties));
   std::optional<ReturnType> id_old_new_change;
-  utils::AtomicMemoryBlock atomic_memory_block{
-      [storage = storage_, transaction = transaction_, vertex = vertex_, &properties, &id_old_new_change]() {
-        id_old_new_change.emplace(vertex->properties.UpdateProperties(properties));
-        if (!id_old_new_change.has_value()) {
-          return;
-        }
-        for (auto &[id, old_value, new_value] : *id_old_new_change) {
-          storage->indices_.UpdateOnSetProperty(id, new_value, vertex, *transaction);
-          CreateAndLinkDelta(transaction, vertex, Delta::SetPropertyTag(), id, std::move(old_value));
-          transaction->manyDeltasCache.Invalidate(vertex, id);
-          if (transaction->constraint_verification_info) {
-            if (!new_value.IsNull()) {
-              transaction->constraint_verification_info->AddedProperty(vertex);
-            } else {
-              transaction->constraint_verification_info->RemovedProperty(vertex);
-            }
-          }
-        }
-      }};
+  utils::AtomicMemoryBlock atomic_memory_block{[storage = storage_, transaction = transaction_, vertex = vertex_,
+                                                &properties, &id_old_new_change, skip_duplicate_update]() {
+    id_old_new_change.emplace(vertex->properties.UpdateProperties(properties));
+    if (!id_old_new_change.has_value()) {
+      return;
+    }
+    for (auto &[id, old_value, new_value] : *id_old_new_change) {
+      storage->indices_.UpdateOnSetProperty(id, new_value, vertex, *transaction);
+      if (skip_duplicate_update && old_value == new_value) continue;
+      CreateAndLinkDelta(transaction, vertex, Delta::SetPropertyTag(), id, std::move(old_value));
+      transaction->manyDeltasCache.Invalidate(vertex, id);
+      if (transaction->constraint_verification_info) {
+        if (!new_value.IsNull()) {
+          transaction->constraint_verification_info->AddedProperty(vertex);
+        } else {
+          transaction->constraint_verification_info->RemovedProperty(vertex);
+        }
+      }
+    }
+  }};
   std::invoke(atomic_memory_block);
   return id_old_new_change.has_value() ? std::move(id_old_new_change.value()) : ReturnType{};

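Unlike the edge path, the vertex path must still hand the previous value back to the caller, so the lambda now reports whether it exited early and the accessor short-circuits on that flag. A reduced sketch of the same control flow with illustrative types, not the storage API:

#include <functional>
#include <string>

using PropertyValue = std::string;

// Minimal stand-in for the storage object being updated.
struct Vertex {
  PropertyValue prop;
};

// Mirrors the new control flow: the atomic block's lambda returns true to signal
// "identical value, nothing written", and the caller returns early with the old value.
PropertyValue SetProperty(Vertex &vertex, const PropertyValue &value, bool skip_duplicate_write) {
  PropertyValue current_value;
  auto atomic_block = [&]() -> bool {
    current_value = vertex.prop;
    if (skip_duplicate_write && current_value == value) {
      return true;  // early exit: identical value, no delta and no in-place write
    }
    // In the real accessor a delta would be created and linked here.
    vertex.prop = value;
    return false;
  };
  const bool early_exit = std::invoke(atomic_block);
  if (early_exit) {
    return current_value;  // the caller still receives the previous value
  }
  // Index and constraint bookkeeping would run here, only for a real write.
  return current_value;
}

int main() {
  Vertex v{"1"};
  SetProperty(v, "1", /*skip_duplicate_write=*/true);  // no-op: value unchanged
  const PropertyValue previous = SetProperty(v, "2", /*skip_duplicate_write=*/true);
  return (v.prop == "2" && previous == "1") ? 0 : 1;
}
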
View File

@@ -29,12 +29,10 @@ class [[nodiscard]] AtomicMemoryBlock {
   AtomicMemoryBlock &operator=(AtomicMemoryBlock &&) = delete;
   ~AtomicMemoryBlock() = default;
-  void operator()() {
-    {
-      utils::MemoryTracker::OutOfMemoryExceptionBlocker oom_blocker;
-      function_();
-    }
-    total_memory_tracker.DoCheck();
+  auto operator()() -> std::invoke_result_t<Callable> {
+    auto check_on_exit = OnScopeExit{[&] { total_memory_tracker.DoCheck(); }};
+    utils::MemoryTracker::OutOfMemoryExceptionBlocker oom_blocker;
+    return function_();
   }

  private:

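The operator() rewrite above forwards the wrapped callable's return value, which lets the accessors signal early exit, and runs the memory check on scope exit so it also covers the exception path. A compact illustration of that shape, with the memory-tracker call replaced by a placeholder; this is a sketch, not the Memgraph class:

#include <iostream>
#include <type_traits>
#include <utility>

template <typename Callable>
class AtomicMemoryBlock {
 public:
  explicit AtomicMemoryBlock(Callable &&fn) : function_(std::forward<Callable>(fn)) {}

  // Forward whatever the callable returns; the guard runs even if function_() throws.
  auto operator()() -> std::invoke_result_t<Callable> {
    struct CheckOnExit {
      // Placeholder for total_memory_tracker.DoCheck(); destructors run on both
      // normal return and exception unwinding.
      ~CheckOnExit() { std::cout << "memory check\n"; }
    } check_on_exit;
    return function_();
  }

 private:
  Callable function_;
};

int main() {
  AtomicMemoryBlock block{[] { return 42; }};  // the wrapped lambda's int result is forwarded
  return block() == 42 ? 0 : 1;                // prints "memory check" before the comparison
}
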
View File

@@ -35,7 +35,7 @@ namespace memgraph::utils {
  * // long block of code, might throw an exception
  * }
  */
-template <typename Callable>
+template <std::invocable Callable>
 class [[nodiscard]] OnScopeExit {
  public:
   template <typename U>

@@ -46,7 +46,7 @@ class [[nodiscard]] OnScopeExit {
   OnScopeExit &operator=(OnScopeExit const &) = delete;
   OnScopeExit &operator=(OnScopeExit &&) = delete;
   ~OnScopeExit() {
-    if (doCall_) function_();
+    if (doCall_) std::invoke(std::move(function_));
   }
   void Disable() { doCall_ = false; }

@@ -57,5 +57,4 @@
 };
 template <typename Callable>
 OnScopeExit(Callable &&) -> OnScopeExit<Callable>;
 }  // namespace memgraph::utils

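The guard itself gains a std::invocable constraint and its destructor now goes through std::invoke(std::move(...)), which handles member-function pointers and move-only callables uniformly. A reduced, self-contained version of a guard with this interface, followed by a usage example:

#include <concepts>
#include <functional>
#include <iostream>
#include <utility>

// The concept rejects non-callable types at the point of use instead of deep
// inside the destructor's instantiation.
template <std::invocable Callable>
class [[nodiscard]] OnScopeExit {
 public:
  template <typename U>
  explicit OnScopeExit(U &&function) : function_(std::forward<U>(function)) {}
  OnScopeExit(OnScopeExit const &) = delete;
  OnScopeExit(OnScopeExit &&) = delete;
  OnScopeExit &operator=(OnScopeExit const &) = delete;
  OnScopeExit &operator=(OnScopeExit &&) = delete;

  ~OnScopeExit() {
    // std::invoke covers plain callables and member-function pointers alike;
    // moving lets move-only callables (e.g. lambdas capturing a unique_ptr) run cleanly.
    if (doCall_) std::invoke(std::move(function_));
  }

  void Disable() { doCall_ = false; }

 private:
  Callable function_;
  bool doCall_{true};
};

template <typename Callable>
OnScopeExit(Callable &&) -> OnScopeExit<Callable>;

int main() {
  {
    OnScopeExit guard{[] { std::cout << "cleanup ran\n"; }};
  }  // prints "cleanup ran" when the scope ends
  OnScopeExit disabled{[] { std::cout << "never printed\n"; }};
  disabled.Disable();  // the callable is skipped
  return 0;
}
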
View File

@@ -77,6 +77,7 @@ add_subdirectory(garbage_collection)
 add_subdirectory(query_planning)
 add_subdirectory(awesome_functions)
 add_subdirectory(high_availability)
+add_subdirectory(concurrency)
 add_subdirectory(replication_experimental)

View File

@@ -0,0 +1,6 @@
function(copy_concurrency_e2e_python_files FILE_NAME)
  copy_e2e_python_files(concurrency ${FILE_NAME})
endfunction()

copy_concurrency_e2e_python_files(common.py)
copy_concurrency_e2e_python_files(concurrency.py)

View File

@@ -0,0 +1,60 @@
# Copyright 2023 Memgraph Ltd.
#
# Use of this software is governed by the Business Source License
# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
# License, and you may not use this file except in compliance with the Business Source License.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0, included in the file
# licenses/APL.txt.

import typing

import mgclient
import pytest


def execute_and_fetch_all(cursor: mgclient.Cursor, query: str, params: dict = {}) -> typing.List[tuple]:
    cursor.execute(query, params)
    return cursor.fetchall()


def execute_and_fetch_all_with_commit(
    connection: mgclient.Connection, query: str, params: dict = {}
) -> typing.List[tuple]:
    cursor = connection.cursor()
    cursor.execute(query, params)
    results = cursor.fetchall()
    connection.commit()
    return results


@pytest.fixture
def first_connection(**kwargs) -> mgclient.Connection:
    connection = mgclient.connect(host="localhost", port=7687, **kwargs)
    connection.autocommit = True
    cursor = connection.cursor()
    execute_and_fetch_all(cursor, "USE DATABASE memgraph")
    try:
        execute_and_fetch_all(cursor, "DROP DATABASE clean")
    except:
        pass
    execute_and_fetch_all(cursor, "MATCH (n) DETACH DELETE n")
    connection.autocommit = False
    yield connection


@pytest.fixture
def second_connection(**kwargs) -> mgclient.Connection:
    connection = mgclient.connect(host="localhost", port=7687, **kwargs)
    connection.autocommit = True
    cursor = connection.cursor()
    execute_and_fetch_all(cursor, "USE DATABASE memgraph")
    try:
        execute_and_fetch_all(cursor, "DROP DATABASE clean")
    except:
        pass
    execute_and_fetch_all(cursor, "MATCH (n) DETACH DELETE n")
    connection.autocommit = False
    yield connection

View File

@@ -0,0 +1,57 @@
# Copyright 2023 Memgraph Ltd.
#
# Use of this software is governed by the Business Source License
# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
# License, and you may not use this file except in compliance with the Business Source License.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0, included in the file
# licenses/APL.txt.

import sys

import pytest
from common import execute_and_fetch_all, first_connection, second_connection


def test_concurrency_if_no_delta_on_same_node_property_update(first_connection, second_connection):
    m1c = first_connection.cursor()
    m2c = second_connection.cursor()

    execute_and_fetch_all(m1c, "CREATE (:Node {prop: 1})")
    first_connection.commit()

    test_has_error = False
    try:
        m1c.execute("MATCH (n) SET n.prop = 1")
        m2c.execute("MATCH (n) SET n.prop = 1")
        first_connection.commit()
        second_connection.commit()
    except Exception as e:
        test_has_error = True

    assert test_has_error is False


def test_concurrency_if_no_delta_on_same_edge_property_update(first_connection, second_connection):
    m1c = first_connection.cursor()
    m2c = second_connection.cursor()

    execute_and_fetch_all(m1c, "CREATE ()-[:TYPE {prop: 1}]->()")
    first_connection.commit()

    test_has_error = False
    try:
        m1c.execute("MATCH (n)-[r]->(m) SET r.prop = 1")
        m2c.execute("MATCH (n)-[r]->(m) SET n.prop = 1")
        first_connection.commit()
        second_connection.commit()
    except Exception as e:
        test_has_error = True

    assert test_has_error is False


if __name__ == "__main__":
    sys.exit(pytest.main([__file__, "-rA"]))

View File

@@ -0,0 +1,14 @@
concurrency_cluster: &concurrency_cluster
  cluster:
    main:
      args: ["--bolt-port", "7687", "--log-level=TRACE", "--storage-delta-on-identical-property-update=false"]
      log_file: "concurrency.log"
      setup_queries: []
      validation_queries: []

workloads:
  - name: "Concurrency"
    binary: "tests/e2e/pytest_runner.sh"
    args: ["concurrency/concurrency.py"]
    <<: *concurrency_cluster

View File

@@ -141,6 +141,11 @@ startup_config_dict = {
         "1",
         "The time duration between two replica checks/pings. If < 1, replicas will NOT be checked at all. NOTE: The MAIN instance allocates a new thread for each REPLICA.",
     ),
+    "storage_delta_on_identical_property_update": (
+        "true",
+        "true",
+        "Controls whether updating a property with the same value should create a delta object.",
+    ),
     "storage_gc_cycle_sec": ("30", "30", "Storage garbage collector interval (in seconds)."),
     "storage_python_gc_cycle_sec": ("180", "180", "Storage python full garbage collection interval (in seconds)."),
     "storage_items_per_batch": (