Add supernode vertex cache (#1124)

Add a supernode vertex cache to account for long delta chains, so that modifications made within the same module are independent of the scanning of nodes in the next iteration of the pulling mechanism.
This commit is contained in:
Gareth Andrew Lloyd 2023-08-11 09:18:28 +01:00 committed by GitHub
parent adf7533751
commit 2e51e703c3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
29 changed files with 1857 additions and 661 deletions

View File

@ -19,3 +19,16 @@ repos:
rev: v13.0.0
hooks:
- id: clang-format
# - repo: local
# hooks:
# - id: clang-tidy
# name: clang-tidy
# description: Runs clang-tidy and checks for errors
# entry: python ./tools/pre-commit/clang-tidy.py
# language: python
# files: ^src/
# types: [c++, text]
# fail_fast: true
# require_serial: true
# args: [--compile_commands_path=build]
# pass_filenames: false

View File

@ -211,8 +211,8 @@ set(CMAKE_CXX_FLAGS_RELWITHDEBINFO
# ** Static linking is allowed only for executables! **
set(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -static-libgcc -static-libstdc++")
# Use gold linker to speedup build
set(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -fuse-ld=gold")
# Use lld linker to speedup build
add_link_options(-fuse-ld=lld) # TODO: use mold linker
# release flags
set(CMAKE_CXX_FLAGS_RELEASE "-O2 -DNDEBUG")

View File

@ -261,3 +261,7 @@ import_external_library(librdtsc STATIC
# setup ctre
import_header_library(ctre ${CMAKE_CURRENT_SOURCE_DIR})
# setup absl (cmake sub_directory tolerant)
set(ABSL_PROPAGATE_CXX_STD ON)
add_subdirectory(absl EXCLUDE_FROM_ALL)

View File

@ -123,6 +123,7 @@ declare -A primary_urls=(
["pulsar"]="http://$local_cache_host/git/pulsar.git"
["librdtsc"]="http://$local_cache_host/git/librdtsc.git"
["ctre"]="http://$local_cache_host/file/hanickadot/compile-time-regular-expressions/v3.7.2/single-header/ctre.hpp"
["absl"]="https://$local_cache_host/git/abseil-cpp.git"
)
# The goal of secondary urls is to have links to the "source of truth" of
@ -149,6 +150,7 @@ declare -A secondary_urls=(
["pulsar"]="https://github.com/apache/pulsar.git"
["librdtsc"]="https://github.com/gabrieleara/librdtsc.git"
["ctre"]="https://raw.githubusercontent.com/hanickadot/compile-time-regular-expressions/v3.7.2/single-header/ctre.hpp"
["absl"]="https://github.com/abseil/abseil-cpp.git"
)
# antlr
@ -246,3 +248,7 @@ mkdir -p ctre
cd ctre
file_get_try_double "${primary_urls[ctre]}" "${secondary_urls[ctre]}"
cd ..
# abseil 20230125.3
absl_ref="20230125.3"
repo_clone_try_double "${primary_urls[absl]}" "${secondary_urls[absl]}" "absl" "$absl_ref"

202
licenses/third-party/abseil-cpp/LICENSE vendored Normal file
View File

@ -0,0 +1,202 @@
Apache License
Version 2.0, January 2004
https://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
APPENDIX: How to apply the Apache License to your work.
To apply the Apache License to your work, attach the following
boilerplate notice, with the fields enclosed by brackets "[]"
replaced with your own identifying information. (Don't include
the brackets!) The text should be enclosed in the appropriate
comment syntax for the file format. We also recommend that a
file or class name and description of purpose be included on the
same "printed page" as the copyright notice for easier
identification within third-party archives.
Copyright [yyyy] [name of copyright owner]
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

View File

@ -0,0 +1,218 @@
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
APPENDIX: How to apply the Apache License to your work.
To apply the Apache License to your work, attach the following
boilerplate notice, with the fields enclosed by brackets "[]"
replaced with your own identifying information. (Don't include
the brackets!) The text should be enclosed in the appropriate
comment syntax for the file format. We also recommend that a
file or class name and description of purpose be included on the
same "printed page" as the copyright notice for easier
identification within third-party archives.
Copyright [yyyy] [name of copyright owner]
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
--- LLVM Exceptions to the Apache 2.0 License ----
As an exception, if, as a result of your compiling your source code, portions
of this Software are embedded into an Object form of such source code, you
may redistribute such embedded portions in such Object form without complying
with the conditions of Sections 4(a), 4(b) and 4(d) of the License.
In addition, if you combine or link compiled forms of this Software with
software that is licensed under the GPLv2 ("Combined Software") and if a
court of competent jurisdiction determines that the patent provision (Section
3), the indemnity provision (Section 9) or other Section of the License
conflicts with the conditions of the GPLv2, you may retroactively and
prospectively choose to deem waived or otherwise exclude such Section(s) of
the License, but only in their entirety and only with respect to the Combined
Software.

View File

@ -3486,9 +3486,7 @@ PreparedQuery PrepareShowDatabasesQuery(ParsedQuery parsed_query, InterpreterCon
}
std::optional<uint64_t> Interpreter::GetTransactionId() const {
if (db_accessor_) {
return db_accessor_->GetTransactionId();
}
if (db_accessor_) return db_accessor_->GetTransactionId();
return {};
}

View File

@ -9,6 +9,9 @@ set(storage_v2_src_files
edge_accessor.cpp
property_store.cpp
vertex_accessor.cpp
vertex_info_cache_fwd.hpp
vertex_info_cache.hpp
vertex_info_cache.cpp
storage.cpp
indices/indices.cpp
all_vertices_iterable.cpp
@ -40,6 +43,17 @@ find_package(gflags REQUIRED)
find_package(Threads REQUIRED)
add_library(mg-storage-v2 STATIC ${storage_v2_src_files})
target_link_libraries(mg-storage-v2 Threads::Threads mg-utils gflags)
target_link_libraries(mg-storage-v2 Threads::Threads mg-utils gflags absl::flat_hash_map)
target_link_libraries(mg-storage-v2 mg-rpc mg-slk)
# Until we get LTO there is an advantage to do some unity builds
set_target_properties(mg-storage-v2
PROPERTIES
UNITY_BUILD ON
UNITY_BUILD_MODE GROUP
)
set_source_files_properties(
vertex_info_cache.cpp vertex_accessor.cpp
PROPERTIES UNITY_GROUP "ensure inline of vertex_info_cache"
)

View File

@ -19,14 +19,13 @@
#include <rocksdb/utilities/transaction_db.h>
#include "storage/v2/edge_accessor.hpp"
#include "storage/v2/edge_direction.hpp"
#include "storage/v2/id_types.hpp"
#include "storage/v2/property_store.hpp"
#include "utils/logging.hpp"
namespace memgraph::storage {
enum class EdgeDirection : uint8_t { OUT = 0, IN = 1 };
/// TODO: this should be somehow more wrapped inside the storage class so from the software engineering perspective
/// it isn't great to have this here. But for now it is ok.
/// Wraps RocksDB objects inside a struct. Vertex_chandle and edge_chandle are column family handles that may be

View File

@ -875,6 +875,7 @@ Result<std::optional<VertexAccessor>> DiskStorage::DiskAccessor::DeleteVertex(Ve
CreateAndLinkDelta(&transaction_, vertex_ptr, Delta::RecreateObjectTag());
vertex_ptr->deleted = true;
vertices_to_delete_.emplace_back(utils::SerializeIdType(vertex_ptr->gid), utils::SerializeVertex(*vertex_ptr));
transaction_.manyDeltasCache.Invalidate(vertex_ptr);
return std::make_optional<VertexAccessor>(vertex_ptr, &transaction_, &storage_->indices_, &storage_->constraints_,
config_, true);
@ -945,6 +946,7 @@ DiskStorage::DiskAccessor::DetachDeleteVertex(VertexAccessor *vertex) {
CreateAndLinkDelta(&transaction_, vertex_ptr, Delta::RecreateObjectTag());
vertex_ptr->deleted = true;
vertices_to_delete_.emplace_back(utils::SerializeIdType(vertex_ptr->gid), utils::SerializeVertex(*vertex_ptr));
transaction_.manyDeltasCache.Invalidate(vertex_ptr);
return std::make_optional<ReturnType>(
VertexAccessor{vertex_ptr, &transaction_, &storage_->indices_, &storage_->constraints_, config_, true},
@ -1060,6 +1062,9 @@ Result<EdgeAccessor> DiskStorage::DiskAccessor::CreateEdge(const VertexAccessor
from_vertex->out_edges.emplace_back(edge_type, to_vertex, edge);
to_vertex->in_edges.emplace_back(edge_type, from_vertex, edge);
transaction_.manyDeltasCache.Invalidate(from_vertex, edge_type, EdgeDirection::OUT);
transaction_.manyDeltasCache.Invalidate(to_vertex, edge_type, EdgeDirection::IN);
storage_->edge_count_.fetch_add(1, std::memory_order_acq_rel);
return EdgeAccessor(edge, edge_type, from_vertex, to_vertex, &transaction_, &storage_->indices_,
@ -1119,6 +1124,9 @@ Result<EdgeAccessor> DiskStorage::DiskAccessor::CreateEdge(VertexAccessor *from,
CreateAndLinkDelta(&transaction_, to_vertex, Delta::RemoveInEdgeTag(), edge_type, from_vertex, edge);
to_vertex->in_edges.emplace_back(edge_type, from_vertex, edge);
transaction_.manyDeltasCache.Invalidate(from_vertex, edge_type, EdgeDirection::OUT);
transaction_.manyDeltasCache.Invalidate(to_vertex, edge_type, EdgeDirection::IN);
// Increment edge count.
storage_->edge_count_.fetch_add(1, std::memory_order_acq_rel);
@ -1206,6 +1214,9 @@ Result<std::optional<EdgeAccessor>> DiskStorage::DiskAccessor::DeleteEdge(EdgeAc
CreateAndLinkDelta(&transaction_, from_vertex, Delta::AddOutEdgeTag(), edge_type, to_vertex, edge_ref);
CreateAndLinkDelta(&transaction_, to_vertex, Delta::AddInEdgeTag(), edge_type, from_vertex, edge_ref);
transaction_.manyDeltasCache.Invalidate(from_vertex, edge_type, EdgeDirection::OUT);
transaction_.manyDeltasCache.Invalidate(to_vertex, edge_type, EdgeDirection::IN);
// Decrement edge count.
storage_->edge_count_.fetch_add(-1, std::memory_order_acq_rel);

View File

@ -0,0 +1,20 @@
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#pragma once
#include <cstdint>
namespace memgraph::storage {
/// Direction of an edge relative to a vertex, used e.g. when invalidating
/// per-direction entries in the transaction's many-deltas vertex cache
/// (see manyDeltasCache.Invalidate(vertex, edge_type, direction) call sites).
/// Backed by uint8_t to keep it cheap to store and pass around.
/// NOTE(review): values 0/1 appear to be relied upon as stable; do not reorder.
enum class EdgeDirection : uint8_t {
  OUT = 0,  // edge leaves the vertex (stored in the vertex's out_edges)
  IN = 1,   // edge enters the vertex (stored in the vertex's in_edges)
};
}  // namespace memgraph::storage

View File

@ -14,6 +14,7 @@
#include "storage/v2/mvcc.hpp"
#include "storage/v2/transaction.hpp"
#include "storage/v2/vertex.hpp"
#include "storage/v2/vertex_info_helpers.hpp"
#include "utils/spin_lock.hpp"
#include "utils/synchronized.hpp"
@ -156,62 +157,12 @@ inline bool AnyVersionHasLabelProperty(const Vertex &vertex, LabelId label, Prop
});
}
// Helper function for iterating through label index. Returns true if this
// transaction can see the given vertex, and the visible version has the given
// label.
inline bool CurrentVersionHasLabel(const Vertex &vertex, LabelId label, Transaction *transaction, View view) {
bool deleted = false;
bool has_label = false;
const Delta *delta = nullptr;
{
std::lock_guard<utils::SpinLock> guard(vertex.lock);
deleted = vertex.deleted;
has_label = utils::Contains(vertex.labels, label);
delta = vertex.delta;
}
ApplyDeltasForRead(transaction, delta, view, [&deleted, &has_label, label](const Delta &delta) {
switch (delta.action) {
case Delta::Action::REMOVE_LABEL: {
if (delta.label == label) {
MG_ASSERT(has_label, "Invalid database state!");
has_label = false;
}
break;
}
case Delta::Action::ADD_LABEL: {
if (delta.label == label) {
MG_ASSERT(!has_label, "Invalid database state!");
has_label = true;
}
break;
}
case Delta::Action::DELETE_DESERIALIZED_OBJECT:
case Delta::Action::DELETE_OBJECT: {
MG_ASSERT(!deleted, "Invalid database state!");
deleted = true;
break;
}
case Delta::Action::RECREATE_OBJECT: {
MG_ASSERT(deleted, "Invalid database state!");
deleted = false;
break;
}
case Delta::Action::SET_PROPERTY:
case Delta::Action::ADD_IN_EDGE:
case Delta::Action::ADD_OUT_EDGE:
case Delta::Action::REMOVE_IN_EDGE:
case Delta::Action::REMOVE_OUT_EDGE:
break;
}
});
return !deleted && has_label;
}
// Helper function for iterating through label-property index. Returns true if
// this transaction can see the given vertex, and the visible version has the
// given label and property.
inline bool CurrentVersionHasLabelProperty(const Vertex &vertex, LabelId label, PropertyId key,
const PropertyValue &value, Transaction *transaction, View view) {
bool exists = true;
bool deleted = false;
bool has_label = false;
bool current_value_equal_to_value = value.IsNull();
@ -223,46 +174,46 @@ inline bool CurrentVersionHasLabelProperty(const Vertex &vertex, LabelId label,
current_value_equal_to_value = vertex.properties.IsPropertyEqual(key, value);
delta = vertex.delta;
}
ApplyDeltasForRead(transaction, delta, view,
[&deleted, &has_label, &current_value_equal_to_value, key, label, &value](const Delta &delta) {
switch (delta.action) {
case Delta::Action::SET_PROPERTY: {
if (delta.property.key == key) {
current_value_equal_to_value = delta.property.value == value;
// Checking cache has a cost, only do it if we have any deltas
// if we have no deltas then what we already have from the vertex is correct.
if (delta && transaction->isolation_level != IsolationLevel::READ_UNCOMMITTED) {
// IsolationLevel::READ_COMMITTED would be tricky to propagate invalidation to
// so for now only cache for IsolationLevel::SNAPSHOT_ISOLATION
auto const useCache = transaction->isolation_level == IsolationLevel::SNAPSHOT_ISOLATION;
if (useCache) {
auto const &cache = transaction->manyDeltasCache;
if (auto resError = HasError(view, cache, &vertex, false); resError) return false;
auto resLabel = cache.GetHasLabel(view, &vertex, label);
if (resLabel && *resLabel) {
auto resProp = cache.GetProperty(view, &vertex, key);
if (resProp && *resProp == value) return true;
}
break;
}
case Delta::Action::DELETE_DESERIALIZED_OBJECT:
case Delta::Action::DELETE_OBJECT: {
MG_ASSERT(!deleted, "Invalid database state!");
deleted = true;
break;
}
case Delta::Action::RECREATE_OBJECT: {
MG_ASSERT(deleted, "Invalid database state!");
deleted = false;
break;
}
case Delta::Action::ADD_LABEL:
if (delta.label == label) {
MG_ASSERT(!has_label, "Invalid database state!");
has_label = true;
}
break;
case Delta::Action::REMOVE_LABEL:
if (delta.label == label) {
MG_ASSERT(has_label, "Invalid database state!");
has_label = false;
}
break;
case Delta::Action::ADD_IN_EDGE:
case Delta::Action::ADD_OUT_EDGE:
case Delta::Action::REMOVE_IN_EDGE:
case Delta::Action::REMOVE_OUT_EDGE:
break;
}
auto const n_processed = ApplyDeltasForRead(transaction, delta, view, [&, label, key](const Delta &delta) {
// clang-format off
DeltaDispatch(delta, utils::ChainedOverloaded{
Deleted_ActionMethod(deleted),
Exists_ActionMethod(exists),
HasLabel_ActionMethod(has_label, label),
PropertyValueMatch_ActionMethod(current_value_equal_to_value, key,value)
});
return !deleted && has_label && current_value_equal_to_value;
// clang-format on
});
if (useCache && n_processed >= FLAGS_delta_chain_cache_threashold) {
auto &cache = transaction->manyDeltasCache;
cache.StoreExists(view, &vertex, exists);
cache.StoreDeleted(view, &vertex, deleted);
cache.StoreHasLabel(view, &vertex, label, has_label);
if (current_value_equal_to_value) {
cache.StoreProperty(view, &vertex, key, value);
}
}
}
return exists && !deleted && has_label && current_value_equal_to_value;
}
template <typename TIndexAccessor>

View File

@ -128,10 +128,12 @@ void InMemoryLabelIndex::Iterable::Iterator::AdvanceUntilValid() {
if (index_iterator_->vertex == current_vertex_) {
continue;
}
if (CurrentVersionHasLabel(*index_iterator_->vertex, self_->label_, self_->transaction_, self_->view_)) {
current_vertex_ = index_iterator_->vertex;
current_vertex_accessor_ = VertexAccessor{current_vertex_, self_->transaction_, self_->indices_,
self_->constraints_, self_->config_.items};
auto accessor = VertexAccessor{index_iterator_->vertex, self_->transaction_, self_->indices_, self_->constraints_,
self_->config_.items};
auto res = accessor.HasLabel(self_->label_, self_->view_);
if (!res.HasError() and res.GetValue()) {
current_vertex_ = accessor.vertex_;
current_vertex_accessor_ = accessor;
break;
}
}

View File

@ -15,6 +15,7 @@
#include "storage/v2/durability/snapshot.hpp"
#include "storage/v2/durability/wal.hpp"
#include "storage/v2/edge_accessor.hpp"
#include "storage/v2/edge_direction.hpp"
#include "storage/v2/storage_mode.hpp"
#include "storage/v2/vertex_accessor.hpp"
#include "utils/stat.hpp"
@ -289,6 +290,7 @@ Result<std::optional<VertexAccessor>> InMemoryStorage::InMemoryAccessor::DeleteV
CreateAndLinkDelta(&transaction_, vertex_ptr, Delta::RecreateObjectTag());
vertex_ptr->deleted = true;
transaction_.manyDeltasCache.Invalidate(vertex_ptr);
// Need to inform the next CollectGarbage call that there are some
// non-transactional deletions that need to be collected
@ -366,6 +368,7 @@ InMemoryStorage::InMemoryAccessor::DetachDeleteVertex(VertexAccessor *vertex) {
CreateAndLinkDelta(&transaction_, vertex_ptr, Delta::RecreateObjectTag());
vertex_ptr->deleted = true;
transaction_.manyDeltasCache.Invalidate(vertex_ptr);
// Need to inform the next CollectGarbage call that there are some
// non-transactional deletions that need to be collected
@ -435,6 +438,9 @@ Result<EdgeAccessor> InMemoryStorage::InMemoryAccessor::CreateEdge(VertexAccesso
CreateAndLinkDelta(&transaction_, to_vertex, Delta::RemoveInEdgeTag(), edge_type, from_vertex, edge);
to_vertex->in_edges.emplace_back(edge_type, from_vertex, edge);
transaction_.manyDeltasCache.Invalidate(from_vertex, edge_type, EdgeDirection::OUT);
transaction_.manyDeltasCache.Invalidate(to_vertex, edge_type, EdgeDirection::IN);
// Increment edge count.
storage_->edge_count_.fetch_add(1, std::memory_order_acq_rel);
@ -507,6 +513,9 @@ Result<EdgeAccessor> InMemoryStorage::InMemoryAccessor::CreateEdge(VertexAccesso
CreateAndLinkDelta(&transaction_, to_vertex, Delta::RemoveInEdgeTag(), edge_type, from_vertex, edge);
to_vertex->in_edges.emplace_back(edge_type, from_vertex, edge);
transaction_.manyDeltasCache.Invalidate(from_vertex, edge_type, EdgeDirection::OUT);
transaction_.manyDeltasCache.Invalidate(to_vertex, edge_type, EdgeDirection::IN);
// Increment edge count.
storage_->edge_count_.fetch_add(1, std::memory_order_acq_rel);
@ -598,6 +607,9 @@ Result<std::optional<EdgeAccessor>> InMemoryStorage::InMemoryAccessor::DeleteEdg
CreateAndLinkDelta(&transaction_, from_vertex, Delta::AddOutEdgeTag(), edge_type, to_vertex, edge_ref);
CreateAndLinkDelta(&transaction_, to_vertex, Delta::AddInEdgeTag(), edge_type, from_vertex, edge_ref);
transaction_.manyDeltasCache.Invalidate(from_vertex, edge_type, EdgeDirection::OUT);
transaction_.manyDeltasCache.Invalidate(to_vertex, edge_type, EdgeDirection::IN);
// Decrement edge count.
storage_->edge_count_.fetch_add(-1, std::memory_order_acq_rel);
@ -725,6 +737,7 @@ utils::BasicResult<StorageDataManipulationError, void> InMemoryStorage::InMemory
return StorageDataManipulationError{*unique_constraint_violation};
}
}
is_transaction_active_ = false;
if (!could_replicate_all_sync_replicas) {

View File

@ -26,18 +26,22 @@ namespace memgraph::storage {
/// that should be applied it calls the callback function with the delta that
/// should be applied passed as a parameter to the callback. It is up to the
/// caller to apply the deltas.
/// @return number of deltas that were processed
template <typename TCallback>
inline void ApplyDeltasForRead(Transaction *transaction, const Delta *delta, View view, const TCallback &callback) {
inline std::size_t ApplyDeltasForRead(Transaction const *transaction, const Delta *delta, View view,
const TCallback &callback) {
// Avoid work if no deltas or
// IsolationLevel::READ_UNCOMMITTED, where deltas are never applied
if (!delta || transaction->isolation_level == IsolationLevel::READ_UNCOMMITTED) return 0;
// if the transaction is not committed, then its deltas have transaction_id for the timestamp, otherwise they have
// its commit timestamp set.
// This allows the transaction to see its changes even though it's committed.
const auto commit_timestamp = transaction->commit_timestamp
? transaction->commit_timestamp->load(std::memory_order_acquire)
: transaction->transaction_id.load(std::memory_order_acquire);
std::size_t n_processed = 0;
while (delta != nullptr) {
auto ts = delta->timestamp->load(std::memory_order_acquire);
auto cid = delta->command_id;
// For SNAPSHOT ISOLATION -> we can only see the changes which were committed before the start of the current
// transaction
//
@ -46,15 +50,16 @@ inline void ApplyDeltasForRead(Transaction *transaction, const Delta *delta, Vie
// always higher than start or commit timestamps so we know if the timestamp is lower than the initial transaction
// id value, that the change is committed.
//
// For READ UNCOMMITTED -> we accept any change.
// For READ UNCOMMITTED -> we accept any change. (already handled above)
auto ts = delta->timestamp->load(std::memory_order_acquire);
if ((transaction->isolation_level == IsolationLevel::SNAPSHOT_ISOLATION && ts < transaction->start_timestamp) ||
(transaction->isolation_level == IsolationLevel::READ_COMMITTED && ts < kTransactionInitialId) ||
(transaction->isolation_level == IsolationLevel::READ_UNCOMMITTED)) {
(transaction->isolation_level == IsolationLevel::READ_COMMITTED && ts < kTransactionInitialId)) {
break;
}
// We shouldn't undo our newest changes because the user requested a NEW
// view of the database.
auto cid = delta->command_id;
if (view == View::NEW && ts == commit_timestamp && cid <= transaction->command_id) {
break;
}
@ -69,10 +74,12 @@ inline void ApplyDeltasForRead(Transaction *transaction, const Delta *delta, Vie
// This delta must be applied, call the callback.
callback(*delta);
++n_processed;
// Move to the next delta.
delta = delta->next.load(std::memory_order_acquire);
}
return n_processed;
}
/// This function prepares the object for a write. It checks whether there are

View File

@ -120,6 +120,9 @@ std::optional<uint64_t> Storage::Accessor::GetTransactionId() const {
return {};
}
void Storage::Accessor::AdvanceCommand() { ++transaction_.command_id; }
void Storage::Accessor::AdvanceCommand() {
transaction_.manyDeltasCache.Clear(); // TODO: Just invalidate the View::OLD cache, NEW should still be fine
++transaction_.command_id;
}
} // namespace memgraph::storage

View File

@ -24,6 +24,7 @@
#include "storage/v2/property_value.hpp"
#include "storage/v2/storage_mode.hpp"
#include "storage/v2/vertex.hpp"
#include "storage/v2/vertex_info_cache.hpp"
#include "storage/v2/view.hpp"
namespace memgraph::storage {
@ -49,7 +50,8 @@ struct Transaction {
deltas(std::move(other.deltas)),
must_abort(other.must_abort),
isolation_level(other.isolation_level),
storage_mode(other.storage_mode) {}
storage_mode(other.storage_mode),
manyDeltasCache{std::move(other.manyDeltasCache)} {}
Transaction(const Transaction &) = delete;
Transaction &operator=(const Transaction &) = delete;
@ -75,6 +77,11 @@ struct Transaction {
bool must_abort;
IsolationLevel isolation_level;
StorageMode storage_mode;
// A cache which is consistent to the current transaction_id + command_id.
// Used to speedup getting info about a vertex when there is a long delta
// chain involved in rebuilding that info.
mutable VertexInfoCache manyDeltasCache;
};
inline bool operator==(const Transaction &first, const Transaction &second) {

View File

@ -21,14 +21,16 @@
#include "storage/v2/mvcc.hpp"
#include "storage/v2/property_value.hpp"
#include "storage/v2/result.hpp"
#include "storage/v2/vertex_info_cache.hpp"
#include "storage/v2/vertex_info_helpers.hpp"
#include "utils/logging.hpp"
#include "utils/memory_tracker.hpp"
#include "utils/variant_helpers.hpp"
namespace memgraph::storage {
namespace detail {
namespace {
std::pair<bool, bool> IsVisible(Vertex *vertex, Transaction *transaction, View view) {
std::pair<bool, bool> IsVisible(Vertex const *vertex, Transaction const *transaction, View view) {
bool exists = true;
bool deleted = false;
Delta *delta = nullptr;
@ -37,31 +39,39 @@ std::pair<bool, bool> IsVisible(Vertex *vertex, Transaction *transaction, View v
deleted = vertex->deleted;
delta = vertex->delta;
}
ApplyDeltasForRead(transaction, delta, view, [&](const Delta &delta) {
switch (delta.action) {
case Delta::Action::ADD_LABEL:
case Delta::Action::REMOVE_LABEL:
case Delta::Action::SET_PROPERTY:
case Delta::Action::ADD_IN_EDGE:
case Delta::Action::ADD_OUT_EDGE:
case Delta::Action::REMOVE_IN_EDGE:
case Delta::Action::REMOVE_OUT_EDGE:
break;
case Delta::Action::RECREATE_OBJECT: {
deleted = false;
break;
}
case Delta::Action::DELETE_DESERIALIZED_OBJECT:
case Delta::Action::DELETE_OBJECT: {
exists = false;
break;
}
// Checking cache has a cost, only do it if we have any deltas
// if we have no deltas then what we already have from the vertex is correct.
if (delta && transaction->isolation_level != IsolationLevel::READ_UNCOMMITTED) {
// IsolationLevel::READ_COMMITTED would be tricky to propagate invalidation to
// so for now only cache for IsolationLevel::SNAPSHOT_ISOLATION
auto const useCache = transaction->isolation_level == IsolationLevel::SNAPSHOT_ISOLATION;
if (useCache) {
auto const &cache = transaction->manyDeltasCache;
auto existsRes = cache.GetExists(view, vertex);
auto deletedRes = cache.GetDeleted(view, vertex);
if (existsRes && deletedRes) return {*existsRes, *deletedRes};
}
auto const n_processed = ApplyDeltasForRead(transaction, delta, view, [&](const Delta &delta) {
// clang-format off
DeltaDispatch(delta, utils::ChainedOverloaded{
Deleted_ActionMethod(deleted),
Exists_ActionMethod(exists)
});
// clang-format on
});
if (useCache && n_processed >= FLAGS_delta_chain_cache_threashold) {
auto &cache = transaction->manyDeltasCache;
cache.StoreExists(view, vertex, exists);
cache.StoreDeleted(view, vertex, deleted);
}
}
return {exists, deleted};
}
} // namespace
} // namespace detail
std::optional<VertexAccessor> VertexAccessor::Create(Vertex *vertex, Transaction *transaction, Indices *indices,
@ -92,6 +102,7 @@ Result<bool> VertexAccessor::AddLabel(LabelId label) {
/// TODO: some by pointers, some by reference => not good, make it better
constraints_->unique_constraints_->UpdateOnAddLabel(label, *vertex_, transaction_->start_timestamp);
indices_->UpdateOnAddLabel(label, vertex_, *transaction_);
transaction_->manyDeltasCache.Invalidate(vertex_, label);
return true;
}
@ -107,12 +118,13 @@ Result<bool> VertexAccessor::RemoveLabel(LabelId label) {
if (it == vertex_->labels.end()) return false;
CreateAndLinkDelta(transaction_, vertex_, Delta::AddLabelTag(), label);
std::swap(*it, *vertex_->labels.rbegin());
*it = vertex_->labels.back();
vertex_->labels.pop_back();
/// TODO: some by pointers, some by reference => not good, make it better
constraints_->unique_constraints_->UpdateOnRemoveLabel(label, *vertex_, transaction_->start_timestamp);
indices_->UpdateOnRemoveLabel(label, vertex_, *transaction_);
transaction_->manyDeltasCache.Invalidate(vertex_, label);
return true;
}
@ -128,39 +140,37 @@ Result<bool> VertexAccessor::HasLabel(LabelId label, View view) const {
has_label = std::find(vertex_->labels.begin(), vertex_->labels.end(), label) != vertex_->labels.end();
delta = vertex_->delta;
}
ApplyDeltasForRead(transaction_, delta, view, [&exists, &deleted, &has_label, label](const Delta &delta) {
switch (delta.action) {
case Delta::Action::REMOVE_LABEL: {
if (delta.label == label) {
MG_ASSERT(has_label, "Invalid database state!");
has_label = false;
}
break;
}
case Delta::Action::ADD_LABEL: {
if (delta.label == label) {
MG_ASSERT(!has_label, "Invalid database state!");
has_label = true;
}
break;
}
case Delta::Action::DELETE_DESERIALIZED_OBJECT:
case Delta::Action::DELETE_OBJECT: {
exists = false;
break;
}
case Delta::Action::RECREATE_OBJECT: {
deleted = false;
break;
}
case Delta::Action::SET_PROPERTY:
case Delta::Action::ADD_IN_EDGE:
case Delta::Action::ADD_OUT_EDGE:
case Delta::Action::REMOVE_IN_EDGE:
case Delta::Action::REMOVE_OUT_EDGE:
break;
// Checking cache has a cost, only do it if we have any deltas
// if we have no deltas then what we already have from the vertex is correct.
if (delta && transaction_->isolation_level != IsolationLevel::READ_UNCOMMITTED) {
// IsolationLevel::READ_COMMITTED would be tricky to propagate invalidation to
// so for now only cache for IsolationLevel::SNAPSHOT_ISOLATION
auto const useCache = transaction_->isolation_level == IsolationLevel::SNAPSHOT_ISOLATION;
if (useCache) {
auto const &cache = transaction_->manyDeltasCache;
if (auto resError = HasError(view, cache, vertex_, for_deleted_); resError) return *resError;
if (auto resLabel = cache.GetHasLabel(view, vertex_, label); resLabel) return {resLabel.value()};
}
auto const n_processed = ApplyDeltasForRead(transaction_, delta, view, [&, label](const Delta &delta) {
// clang-format off
DeltaDispatch(delta, utils::ChainedOverloaded{
Deleted_ActionMethod(deleted),
Exists_ActionMethod(exists),
HasLabel_ActionMethod(has_label, label)
});
// clang-format on
});
if (useCache && n_processed >= FLAGS_delta_chain_cache_threashold) {
auto &cache = transaction_->manyDeltasCache;
cache.StoreExists(view, vertex_, exists);
cache.StoreDeleted(view, vertex_, deleted);
cache.StoreHasLabel(view, vertex_, label, has_label);
}
}
if (!exists) return Error::NONEXISTENT_OBJECT;
if (!for_deleted_ && deleted) return Error::DELETED_OBJECT;
return has_label;
@ -177,40 +187,37 @@ Result<std::vector<LabelId>> VertexAccessor::Labels(View view) const {
labels = vertex_->labels;
delta = vertex_->delta;
}
ApplyDeltasForRead(transaction_, delta, view, [&exists, &deleted, &labels](const Delta &delta) {
switch (delta.action) {
case Delta::Action::REMOVE_LABEL: {
// Remove the label because we don't see the addition.
auto it = std::find(labels.begin(), labels.end(), delta.label);
MG_ASSERT(it != labels.end(), "Invalid database state!");
std::swap(*it, *labels.rbegin());
labels.pop_back();
break;
}
case Delta::Action::ADD_LABEL: {
// Add the label because we don't see the removal.
auto it = std::find(labels.begin(), labels.end(), delta.label);
MG_ASSERT(it == labels.end(), "Invalid database state!");
labels.push_back(delta.label);
break;
}
case Delta::Action::DELETE_DESERIALIZED_OBJECT:
case Delta::Action::DELETE_OBJECT: {
exists = false;
break;
}
case Delta::Action::RECREATE_OBJECT: {
deleted = false;
break;
}
case Delta::Action::SET_PROPERTY:
case Delta::Action::ADD_IN_EDGE:
case Delta::Action::ADD_OUT_EDGE:
case Delta::Action::REMOVE_IN_EDGE:
case Delta::Action::REMOVE_OUT_EDGE:
break;
// Checking cache has a cost, only do it if we have any deltas
// if we have no deltas then what we already have from the vertex is correct.
if (delta && transaction_->isolation_level != IsolationLevel::READ_UNCOMMITTED) {
// IsolationLevel::READ_COMMITTED would be tricky to propagate invalidation to
// so for now only cache for IsolationLevel::SNAPSHOT_ISOLATION
auto const useCache = transaction_->isolation_level == IsolationLevel::SNAPSHOT_ISOLATION;
if (useCache) {
auto const &cache = transaction_->manyDeltasCache;
if (auto resError = HasError(view, cache, vertex_, for_deleted_); resError) return *resError;
if (auto resLabels = cache.GetLabels(view, vertex_); resLabels) return {*resLabels};
}
auto const n_processed = ApplyDeltasForRead(transaction_, delta, view, [&](const Delta &delta) {
// clang-format off
DeltaDispatch(delta, utils::ChainedOverloaded{
Deleted_ActionMethod(deleted),
Exists_ActionMethod(exists),
Labels_ActionMethod(labels)
});
// clang-format on
});
if (useCache && n_processed >= FLAGS_delta_chain_cache_threashold) {
auto &cache = transaction_->manyDeltasCache;
cache.StoreExists(view, vertex_, exists);
cache.StoreDeleted(view, vertex_, deleted);
cache.StoreLabels(view, vertex_, labels);
}
}
if (!exists) return Error::NONEXISTENT_OBJECT;
if (!for_deleted_ && deleted) return Error::DELETED_OBJECT;
return std::move(labels);
@ -236,6 +243,7 @@ Result<PropertyValue> VertexAccessor::SetProperty(PropertyId property, const Pro
vertex_->properties.SetProperty(property, value);
indices_->UpdateOnSetProperty(property, value, vertex_, *transaction_);
transaction_->manyDeltasCache.Invalidate(vertex_, property);
return std::move(current_value);
}
@ -252,6 +260,7 @@ Result<bool> VertexAccessor::InitProperties(const std::map<storage::PropertyId,
for (const auto &[property, value] : properties) {
CreateAndLinkDelta(transaction_, vertex_, Delta::SetPropertyTag(), property, PropertyValue());
indices_->UpdateOnSetProperty(property, value, vertex_, *transaction_);
transaction_->manyDeltasCache.Invalidate(vertex_, property);
}
return true;
@ -271,6 +280,7 @@ Result<std::vector<std::tuple<PropertyId, PropertyValue, PropertyValue>>> Vertex
for (auto &[id, old_value, new_value] : id_old_new_change) {
indices_->UpdateOnSetProperty(id, new_value, vertex_, *transaction_);
CreateAndLinkDelta(transaction_, vertex_, Delta::SetPropertyTag(), id, std::move(old_value));
transaction_->manyDeltasCache.Invalidate(vertex_, id);
}
return id_old_new_change;
@ -284,9 +294,10 @@ Result<std::map<PropertyId, PropertyValue>> VertexAccessor::ClearProperties() {
if (vertex_->deleted) return Error::DELETED_OBJECT;
auto properties = vertex_->properties.Properties();
for (const auto &property : properties) {
CreateAndLinkDelta(transaction_, vertex_, Delta::SetPropertyTag(), property.first, property.second);
indices_->UpdateOnSetProperty(property.first, PropertyValue(), vertex_, *transaction_);
for (const auto &[property, value] : properties) {
CreateAndLinkDelta(transaction_, vertex_, Delta::SetPropertyTag(), property, value);
indices_->UpdateOnSetProperty(property, PropertyValue(), vertex_, *transaction_);
transaction_->manyDeltasCache.Invalidate(vertex_, property);
}
vertex_->properties.ClearProperties();
@ -305,32 +316,38 @@ Result<PropertyValue> VertexAccessor::GetProperty(PropertyId property, View view
value = vertex_->properties.GetProperty(property);
delta = vertex_->delta;
}
// Checking cache has a cost, only do it if we have any deltas
// if we have no deltas then what we already have from the vertex is correct.
if (delta && transaction_->isolation_level != IsolationLevel::READ_UNCOMMITTED) {
// IsolationLevel::READ_COMMITTED would be tricky to propagate invalidation to
// so for now only cache for IsolationLevel::SNAPSHOT_ISOLATION
auto const useCache = transaction_->isolation_level == IsolationLevel::SNAPSHOT_ISOLATION;
if (useCache) {
auto const &cache = transaction_->manyDeltasCache;
if (auto resError = HasError(view, cache, vertex_, for_deleted_); resError) return *resError;
if (auto resProperty = cache.GetProperty(view, vertex_, property); resProperty) return {*resProperty};
}
auto const n_processed =
ApplyDeltasForRead(transaction_, delta, view, [&exists, &deleted, &value, property](const Delta &delta) {
switch (delta.action) {
case Delta::Action::SET_PROPERTY: {
if (delta.property.key == property) {
value = delta.property.value;
}
break;
}
case Delta::Action::DELETE_DESERIALIZED_OBJECT:
case Delta::Action::DELETE_OBJECT: {
exists = false;
break;
}
case Delta::Action::RECREATE_OBJECT: {
deleted = false;
break;
}
case Delta::Action::ADD_LABEL:
case Delta::Action::REMOVE_LABEL:
case Delta::Action::ADD_IN_EDGE:
case Delta::Action::ADD_OUT_EDGE:
case Delta::Action::REMOVE_IN_EDGE:
case Delta::Action::REMOVE_OUT_EDGE:
break;
}
// clang-format off
DeltaDispatch(delta, utils::ChainedOverloaded{
Deleted_ActionMethod(deleted),
Exists_ActionMethod(exists),
PropertyValue_ActionMethod(value, property)
});
// clang-format on
});
if (useCache && n_processed >= FLAGS_delta_chain_cache_threashold) {
auto &cache = transaction_->manyDeltasCache;
cache.StoreExists(view, vertex_, exists);
cache.StoreDeleted(view, vertex_, deleted);
cache.StoreProperty(view, vertex_, property, value);
}
}
if (!exists) return Error::NONEXISTENT_OBJECT;
if (!for_deleted_ && deleted) return Error::DELETED_OBJECT;
return std::move(value);
@ -347,41 +364,38 @@ Result<std::map<PropertyId, PropertyValue>> VertexAccessor::Properties(View view
properties = vertex_->properties.Properties();
delta = vertex_->delta;
}
// Checking cache has a cost, only do it if we have any deltas
// if we have no deltas then what we already have from the vertex is correct.
if (delta && transaction_->isolation_level != IsolationLevel::READ_UNCOMMITTED) {
// IsolationLevel::READ_COMMITTED would be tricky to propagate invalidation to
// so for now only cache for IsolationLevel::SNAPSHOT_ISOLATION
auto const useCache = transaction_->isolation_level == IsolationLevel::SNAPSHOT_ISOLATION;
if (useCache) {
auto const &cache = transaction_->manyDeltasCache;
if (auto resError = HasError(view, cache, vertex_, for_deleted_); resError) return *resError;
if (auto resProperties = cache.GetProperties(view, vertex_); resProperties) return {*resProperties};
}
auto const n_processed =
ApplyDeltasForRead(transaction_, delta, view, [&exists, &deleted, &properties](const Delta &delta) {
switch (delta.action) {
case Delta::Action::SET_PROPERTY: {
auto it = properties.find(delta.property.key);
if (it != properties.end()) {
if (delta.property.value.IsNull()) {
// remove the property
properties.erase(it);
} else {
// set the value
it->second = delta.property.value;
}
} else if (!delta.property.value.IsNull()) {
properties.emplace(delta.property.key, delta.property.value);
}
break;
}
case Delta::Action::DELETE_DESERIALIZED_OBJECT:
case Delta::Action::DELETE_OBJECT: {
exists = false;
break;
}
case Delta::Action::RECREATE_OBJECT: {
deleted = false;
break;
}
case Delta::Action::ADD_LABEL:
case Delta::Action::REMOVE_LABEL:
case Delta::Action::ADD_IN_EDGE:
case Delta::Action::ADD_OUT_EDGE:
case Delta::Action::REMOVE_IN_EDGE:
case Delta::Action::REMOVE_OUT_EDGE:
break;
}
// clang-format off
DeltaDispatch(delta, utils::ChainedOverloaded{
Deleted_ActionMethod(deleted),
Exists_ActionMethod(exists),
Properties_ActionMethod(properties)
});
// clang-format on
});
if (useCache && n_processed >= FLAGS_delta_chain_cache_threashold) {
auto &cache = transaction_->manyDeltasCache;
cache.StoreExists(view, vertex_, exists);
cache.StoreDeleted(view, vertex_, deleted);
cache.StoreProperties(view, vertex_, properties);
}
}
if (!exists) return Error::NONEXISTENT_OBJECT;
if (!for_deleted_ && deleted) return Error::DELETED_OBJECT;
return std::move(properties);
@ -390,90 +404,100 @@ Result<std::map<PropertyId, PropertyValue>> VertexAccessor::Properties(View view
Result<std::vector<EdgeAccessor>> VertexAccessor::InEdges(View view, const std::vector<EdgeTypeId> &edge_types,
const VertexAccessor *destination) const {
MG_ASSERT(!destination || destination->transaction_ == transaction_, "Invalid accessor!");
using edge_store = std::vector<std::tuple<EdgeTypeId, Vertex *, EdgeRef>>;
// We return EdgeAccessors, this method with wrap the results in EdgeAccessors
auto const build_result = [this](edge_store const &edges) -> std::vector<EdgeAccessor> {
auto ret = std::vector<EdgeAccessor>{};
ret.reserve(edges.size());
for (auto const &[edge_type, from_vertex, edge] : edges) {
ret.emplace_back(edge, edge_type, from_vertex, vertex_, transaction_, indices_, constraints_, config_);
}
return ret;
};
auto const *destination_vertex = destination ? destination->vertex_ : nullptr;
bool exists = true;
bool deleted = false;
std::vector<std::tuple<EdgeTypeId, Vertex *, EdgeRef>> in_edges;
auto in_edges = edge_store{};
Delta *delta = nullptr;
{
std::lock_guard<utils::SpinLock> guard(vertex_->lock);
deleted = vertex_->deleted;
// TODO: a better filter copy
if (edge_types.empty() && !destination) {
in_edges = vertex_->in_edges;
} else {
for (const auto &item : vertex_->in_edges) {
const auto &[edge_type, from_vertex, edge] = item;
if (destination && from_vertex != destination->vertex_) continue;
for (const auto &[edge_type, from_vertex, edge] : vertex_->in_edges) {
if (destination && from_vertex != destination_vertex) continue;
if (!edge_types.empty() && std::find(edge_types.begin(), edge_types.end(), edge_type) == edge_types.end())
continue;
in_edges.push_back(item);
in_edges.emplace_back(edge_type, from_vertex, edge);
}
}
delta = vertex_->delta;
}
ApplyDeltasForRead(
transaction_, delta, view, [&exists, &deleted, &in_edges, &edge_types, &destination](const Delta &delta) {
switch (delta.action) {
case Delta::Action::ADD_IN_EDGE: {
if (destination && delta.vertex_edge.vertex != destination->vertex_) break;
if (!edge_types.empty() &&
std::find(edge_types.begin(), edge_types.end(), delta.vertex_edge.edge_type) == edge_types.end())
break;
// Add the edge because we don't see the removal.
std::tuple<EdgeTypeId, Vertex *, EdgeRef> link{delta.vertex_edge.edge_type, delta.vertex_edge.vertex,
delta.vertex_edge.edge};
auto it = std::find(in_edges.begin(), in_edges.end(), link);
MG_ASSERT(it == in_edges.end(), "Invalid database state!");
in_edges.push_back(link);
break;
}
case Delta::Action::REMOVE_IN_EDGE: {
if (destination && delta.vertex_edge.vertex != destination->vertex_) break;
if (!edge_types.empty() &&
std::find(edge_types.begin(), edge_types.end(), delta.vertex_edge.edge_type) == edge_types.end())
break;
// Remove the label because we don't see the addition.
std::tuple<EdgeTypeId, Vertex *, EdgeRef> link{delta.vertex_edge.edge_type, delta.vertex_edge.vertex,
delta.vertex_edge.edge};
auto it = std::find(in_edges.begin(), in_edges.end(), link);
MG_ASSERT(it != in_edges.end(), "Invalid database state!");
std::swap(*it, *in_edges.rbegin());
in_edges.pop_back();
break;
}
case Delta::Action::DELETE_DESERIALIZED_OBJECT:
case Delta::Action::DELETE_OBJECT: {
exists = false;
break;
}
case Delta::Action::RECREATE_OBJECT: {
deleted = false;
break;
}
case Delta::Action::ADD_LABEL:
case Delta::Action::REMOVE_LABEL:
case Delta::Action::SET_PROPERTY:
case Delta::Action::ADD_OUT_EDGE:
case Delta::Action::REMOVE_OUT_EDGE:
break;
// Checking cache has a cost, only do it if we have any deltas
// if we have no deltas then what we already have from the vertex is correct.
if (delta && transaction_->isolation_level != IsolationLevel::READ_UNCOMMITTED) {
// IsolationLevel::READ_COMMITTED would be tricky to propagate invalidation to
// so for now only cache for IsolationLevel::SNAPSHOT_ISOLATION
auto const useCache = transaction_->isolation_level == IsolationLevel::SNAPSHOT_ISOLATION;
if (useCache) {
auto const &cache = transaction_->manyDeltasCache;
if (auto resError = HasError(view, cache, vertex_, for_deleted_); resError) return *resError;
if (auto resInEdges = cache.GetInEdges(view, vertex_, destination_vertex, edge_types); resInEdges)
return {build_result(*resInEdges)};
}
auto const n_processed = ApplyDeltasForRead(
transaction_, delta, view,
[&exists, &deleted, &in_edges, &edge_types, &destination_vertex](const Delta &delta) {
// clang-format off
DeltaDispatch(delta, utils::ChainedOverloaded{
Deleted_ActionMethod(deleted),
Exists_ActionMethod(exists),
Edges_ActionMethod<EdgeDirection::IN>(in_edges, edge_types, destination_vertex)
});
// clang-format on
});
if (useCache && n_processed >= FLAGS_delta_chain_cache_threashold) {
auto &cache = transaction_->manyDeltasCache;
cache.StoreExists(view, vertex_, exists);
cache.StoreDeleted(view, vertex_, deleted);
cache.StoreInEdges(view, vertex_, destination_vertex, edge_types, in_edges);
}
}
if (!exists) return Error::NONEXISTENT_OBJECT;
if (deleted) return Error::DELETED_OBJECT;
std::vector<EdgeAccessor> ret;
ret.reserve(in_edges.size());
for (const auto &item : in_edges) {
const auto &[edge_type, from_vertex, edge] = item;
ret.emplace_back(edge, edge_type, from_vertex, vertex_, transaction_, indices_, constraints_, config_);
}
return std::move(ret);
return build_result(in_edges);
}
Result<std::vector<EdgeAccessor>> VertexAccessor::OutEdges(View view, const std::vector<EdgeTypeId> &edge_types,
const VertexAccessor *destination) const {
MG_ASSERT(!destination || destination->transaction_ == transaction_, "Invalid accessor!");
using edge_store = std::vector<std::tuple<EdgeTypeId, Vertex *, EdgeRef>>;
auto const build_result = [this](edge_store const &out_edges) {
auto ret = std::vector<EdgeAccessor>{};
ret.reserve(out_edges.size());
for (const auto &[edge_type, to_vertex, edge] : out_edges) {
ret.emplace_back(edge, edge_type, vertex_, to_vertex, transaction_, indices_, constraints_, config_);
}
return ret;
};
auto const *dst_vertex = destination ? destination->vertex_ : nullptr;
bool exists = true;
bool deleted = false;
std::vector<std::tuple<EdgeTypeId, Vertex *, EdgeRef>> out_edges;
auto out_edges = edge_store{};
Delta *delta = nullptr;
{
std::lock_guard<utils::SpinLock> guard(vertex_->lock);
@ -481,72 +505,51 @@ Result<std::vector<EdgeAccessor>> VertexAccessor::OutEdges(View view, const std:
if (edge_types.empty() && !destination) {
out_edges = vertex_->out_edges;
} else {
for (const auto &item : vertex_->out_edges) {
const auto &[edge_type, to_vertex, edge] = item;
if (destination && to_vertex != destination->vertex_) continue;
for (const auto &[edge_type, to_vertex, edge] : vertex_->out_edges) {
if (destination && to_vertex != dst_vertex) continue;
if (!edge_types.empty() && std::find(edge_types.begin(), edge_types.end(), edge_type) == edge_types.end())
continue;
out_edges.push_back(item);
out_edges.emplace_back(edge_type, to_vertex, edge);
}
}
delta = vertex_->delta;
}
ApplyDeltasForRead(
transaction_, delta, view, [&exists, &deleted, &out_edges, &edge_types, &destination](const Delta &delta) {
switch (delta.action) {
case Delta::Action::ADD_OUT_EDGE: {
if (destination && delta.vertex_edge.vertex != destination->vertex_) break;
if (!edge_types.empty() &&
std::find(edge_types.begin(), edge_types.end(), delta.vertex_edge.edge_type) == edge_types.end())
break;
// Add the edge because we don't see the removal.
std::tuple<EdgeTypeId, Vertex *, EdgeRef> link{delta.vertex_edge.edge_type, delta.vertex_edge.vertex,
delta.vertex_edge.edge};
auto it = std::find(out_edges.begin(), out_edges.end(), link);
MG_ASSERT(it == out_edges.end(), "Invalid database state!");
out_edges.push_back(link);
break;
}
case Delta::Action::REMOVE_OUT_EDGE: {
if (destination && delta.vertex_edge.vertex != destination->vertex_) break;
if (!edge_types.empty() &&
std::find(edge_types.begin(), edge_types.end(), delta.vertex_edge.edge_type) == edge_types.end())
break;
// Remove the label because we don't see the addition.
std::tuple<EdgeTypeId, Vertex *, EdgeRef> link{delta.vertex_edge.edge_type, delta.vertex_edge.vertex,
delta.vertex_edge.edge};
auto it = std::find(out_edges.begin(), out_edges.end(), link);
MG_ASSERT(it != out_edges.end(), "Invalid database state!");
std::swap(*it, *out_edges.rbegin());
out_edges.pop_back();
break;
}
case Delta::Action::DELETE_DESERIALIZED_OBJECT:
case Delta::Action::DELETE_OBJECT: {
exists = false;
break;
}
case Delta::Action::RECREATE_OBJECT: {
deleted = false;
break;
}
case Delta::Action::ADD_LABEL:
case Delta::Action::REMOVE_LABEL:
case Delta::Action::SET_PROPERTY:
case Delta::Action::ADD_IN_EDGE:
case Delta::Action::REMOVE_IN_EDGE:
break;
// Checking cache has a cost, only do it if we have any deltas
// if we have no deltas then what we already have from the vertex is correct.
if (delta && transaction_->isolation_level != IsolationLevel::READ_UNCOMMITTED) {
// IsolationLevel::READ_COMMITTED would be tricky to propagate invalidation to
// so for now only cache for IsolationLevel::SNAPSHOT_ISOLATION
auto const useCache = transaction_->isolation_level == IsolationLevel::SNAPSHOT_ISOLATION;
if (useCache) {
auto const &cache = transaction_->manyDeltasCache;
if (auto resError = HasError(view, cache, vertex_, for_deleted_); resError) return *resError;
if (auto resOutEdges = cache.GetOutEdges(view, vertex_, dst_vertex, edge_types); resOutEdges)
return {build_result(*resOutEdges)};
}
auto const n_processed = ApplyDeltasForRead(
transaction_, delta, view, [&exists, &deleted, &out_edges, &edge_types, &dst_vertex](const Delta &delta) {
// clang-format off
DeltaDispatch(delta, utils::ChainedOverloaded{
Deleted_ActionMethod(deleted),
Exists_ActionMethod(exists),
Edges_ActionMethod<EdgeDirection::OUT>(out_edges, edge_types, dst_vertex)
});
// clang-format on
});
if (useCache && n_processed >= FLAGS_delta_chain_cache_threashold) {
auto &cache = transaction_->manyDeltasCache;
cache.StoreExists(view, vertex_, exists);
cache.StoreDeleted(view, vertex_, deleted);
cache.StoreOutEdges(view, vertex_, dst_vertex, edge_types, out_edges);
}
}
if (!exists) return Error::NONEXISTENT_OBJECT;
if (deleted) return Error::DELETED_OBJECT;
std::vector<EdgeAccessor> ret;
ret.reserve(out_edges.size());
for (const auto &item : out_edges) {
const auto &[edge_type, to_vertex, edge] = item;
ret.emplace_back(edge, edge_type, vertex_, to_vertex, transaction_, indices_, constraints_, config_);
}
return std::move(ret);
return build_result(out_edges);
}
Result<size_t> VertexAccessor::InDegree(View view) const {
@ -560,29 +563,38 @@ Result<size_t> VertexAccessor::InDegree(View view) const {
degree = vertex_->in_edges.size();
delta = vertex_->delta;
}
ApplyDeltasForRead(transaction_, delta, view, [&exists, &deleted, &degree](const Delta &delta) {
switch (delta.action) {
case Delta::Action::ADD_IN_EDGE:
++degree;
break;
case Delta::Action::REMOVE_IN_EDGE:
--degree;
break;
case Delta::Action::DELETE_DESERIALIZED_OBJECT:
case Delta::Action::DELETE_OBJECT:
exists = false;
break;
case Delta::Action::RECREATE_OBJECT:
deleted = false;
break;
case Delta::Action::ADD_LABEL:
case Delta::Action::REMOVE_LABEL:
case Delta::Action::SET_PROPERTY:
case Delta::Action::ADD_OUT_EDGE:
case Delta::Action::REMOVE_OUT_EDGE:
break;
// Checking cache has a cost, only do it if we have any deltas
// if we have no deltas then what we already have from the vertex is correct.
if (delta && transaction_->isolation_level != IsolationLevel::READ_UNCOMMITTED) {
// IsolationLevel::READ_COMMITTED would be tricky to propagate invalidation to
// so for now only cache for IsolationLevel::SNAPSHOT_ISOLATION
auto const useCache = transaction_->isolation_level == IsolationLevel::SNAPSHOT_ISOLATION;
if (useCache) {
auto const &cache = transaction_->manyDeltasCache;
if (auto resError = HasError(view, cache, vertex_, for_deleted_); resError) return *resError;
if (auto resInDegree = cache.GetInDegree(view, vertex_); resInDegree) return {*resInDegree};
}
auto const n_processed =
ApplyDeltasForRead(transaction_, delta, view, [&exists, &deleted, &degree](const Delta &delta) {
// clang-format off
DeltaDispatch(delta, utils::ChainedOverloaded{
Deleted_ActionMethod(deleted),
Exists_ActionMethod(exists),
Degree_ActionMethod<EdgeDirection::IN>(degree)
});
// clang-format on
});
if (useCache && n_processed >= FLAGS_delta_chain_cache_threashold) {
auto &cache = transaction_->manyDeltasCache;
cache.StoreExists(view, vertex_, exists);
cache.StoreDeleted(view, vertex_, deleted);
cache.StoreInDegree(view, vertex_, degree);
}
}
if (!exists) return Error::NONEXISTENT_OBJECT;
if (!for_deleted_ && deleted) return Error::DELETED_OBJECT;
return degree;
@ -599,29 +611,38 @@ Result<size_t> VertexAccessor::OutDegree(View view) const {
degree = vertex_->out_edges.size();
delta = vertex_->delta;
}
ApplyDeltasForRead(transaction_, delta, view, [&exists, &deleted, &degree](const Delta &delta) {
switch (delta.action) {
case Delta::Action::ADD_OUT_EDGE:
++degree;
break;
case Delta::Action::REMOVE_OUT_EDGE:
--degree;
break;
case Delta::Action::DELETE_DESERIALIZED_OBJECT:
case Delta::Action::DELETE_OBJECT:
exists = false;
break;
case Delta::Action::RECREATE_OBJECT:
deleted = false;
break;
case Delta::Action::ADD_LABEL:
case Delta::Action::REMOVE_LABEL:
case Delta::Action::SET_PROPERTY:
case Delta::Action::ADD_IN_EDGE:
case Delta::Action::REMOVE_IN_EDGE:
break;
// Checking cache has a cost, only do it if we have any deltas
// if we have no deltas then what we already have from the vertex is correct.
if (delta && transaction_->isolation_level != IsolationLevel::READ_UNCOMMITTED) {
// IsolationLevel::READ_COMMITTED would be tricky to propagate invalidation to
// so for now only cache for IsolationLevel::SNAPSHOT_ISOLATION
auto const useCache = transaction_->isolation_level == IsolationLevel::SNAPSHOT_ISOLATION;
if (useCache) {
auto const &cache = transaction_->manyDeltasCache;
if (auto resError = HasError(view, cache, vertex_, for_deleted_); resError) return *resError;
if (auto resOutDegree = cache.GetOutDegree(view, vertex_); resOutDegree) return {*resOutDegree};
}
auto const n_processed =
ApplyDeltasForRead(transaction_, delta, view, [&exists, &deleted, &degree](const Delta &delta) {
// clang-format off
DeltaDispatch(delta, utils::ChainedOverloaded{
Deleted_ActionMethod(deleted),
Exists_ActionMethod(exists),
Degree_ActionMethod<EdgeDirection::OUT>(degree)
});
// clang-format on
});
if (useCache && n_processed >= FLAGS_delta_chain_cache_threashold) {
auto &cache = transaction_->manyDeltasCache;
cache.StoreExists(view, vertex_, exists);
cache.StoreDeleted(view, vertex_, deleted);
cache.StoreOutDegree(view, vertex_, degree);
}
}
if (!exists) return Error::NONEXISTENT_OBJECT;
if (!for_deleted_ && deleted) return Error::DELETED_OBJECT;
return degree;

View File

@ -0,0 +1,196 @@
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#include "storage/v2/vertex_info_cache.hpp"

#include "storage/v2/property_value.hpp"
#include "utils/flag_validation.hpp"

#include <algorithm>
#include <functional>
#include <map>
#include <optional>
#include <tuple>
#include <type_traits>
#include <utility>
#include <vector>
// Minimum number of deltas a single read must process before the computed
// result is considered worth storing in the per-transaction VertexInfoCache.
// NOTE(review): "threashold" is a typo, but the flag name is user-visible
// interface (and DECLAREd in vertex_info_cache.hpp) — renaming would break
// existing deployments, so it is left as-is.
// NOLINTNEXTLINE (cppcoreguidelines-avoid-non-const-global-variables)
DEFINE_VALIDATED_uint64(delta_chain_cache_threashold, 128,
                        "The threshold for when to cache long delta chains. This is used for heavy read + write "
                        "workloads where repeated processing of delta chains can become costly.",
                        { return value > 0; });
namespace memgraph::storage {
/// Helpers to reduce typo errors
/// Generic cache lookup used by all VertexInfoCache::Get* methods.
/// `getCache` selects one sub-cache (e.g. &Caches::existsCache_) from either
/// the OLD or NEW side, `keys...` build that map's key type. Returns nullopt
/// on a miss; on a hit returns the value by copy for trivially copyable types
/// and as a const reference_wrapper otherwise (to avoid copying containers).
template <typename Ret, typename Func, typename... Keys>
auto FetchHelper(VertexInfoCache const &caches, Func &&getCache, View view, Keys &&...keys)
    -> std::optional<std::conditional_t<std::is_trivially_copyable_v<Ret>, Ret, std::reference_wrapper<Ret const>>> {
  // Select which side of the cache the requested view lives on.
  auto const *side = (view == View::OLD) ? &caches.old_ : &caches.new_;
  auto const &cache = getCache(*side);
  // Empty map can never hit; skip key construction + hashing entirely.
  if (cache.empty()) return std::nullopt;
  using key_type = typename std::remove_cvref_t<decltype(cache)>::key_type;
  auto const found = cache.find(key_type{std::forward<Keys>(keys)...});
  if (found == cache.end()) return std::nullopt;
  if constexpr (std::is_trivially_copyable_v<Ret>) {
    return {found->second};
  } else {
    // Non-trivial payloads are handed out as references into the cache.
    return std::cref(found->second);
  }
}
/// Generic cache insertion used by all VertexInfoCache::Store* methods.
/// Mirrors FetchHelper: `getCache` picks the sub-cache, `keys...` build the key.
template <typename Value, typename Func, typename... Keys>
void Store(Value &&value, VertexInfoCache &caches, Func &&getCache, View view, Keys &&...keys) {
  // Select the OLD or NEW side, then the specific sub-cache within it.
  auto &side = (view == View::OLD) ? caches.old_ : caches.new_;
  auto &cache = getCache(side);
  using key_type = typename std::remove_cvref_t<decltype(cache)>::key_type;
  // emplace: an already-present entry is deliberately left untouched.
  cache.emplace(key_type{std::forward<Keys>(keys)...}, std::forward<Value>(value));
}
// Defaulted construction and move; copying is deleted in the class definition.
// Defined out-of-line so this TU owns the instantiations of the member maps.
VertexInfoCache::VertexInfoCache() = default;
VertexInfoCache::~VertexInfoCache() = default;
VertexInfoCache::VertexInfoCache(VertexInfoCache &&) noexcept = default;
VertexInfoCache &VertexInfoCache::operator=(VertexInfoCache &&) noexcept = default;
// Cached answer to "does this vertex exist" for (view, vertex); nullopt on miss.
auto VertexInfoCache::GetExists(View view, Vertex const *vertex) const -> std::optional<bool> {
  return FetchHelper<bool>(*this, std::mem_fn(&Caches::existsCache_), view, vertex);
}
// Record the result of an existence check for (view, vertex).
void VertexInfoCache::StoreExists(View view, Vertex const *vertex, bool res) {
  Store(res, *this, std::mem_fn(&Caches::existsCache_), view, vertex);
}
// Cached answer to "is this vertex deleted" for (view, vertex); nullopt on miss.
auto VertexInfoCache::GetDeleted(View view, Vertex const *vertex) const -> std::optional<bool> {
  return FetchHelper<bool>(*this, std::mem_fn(&Caches::deletedCache_), view, vertex);
}
// Record the result of a deleted check for (view, vertex).
void VertexInfoCache::StoreDeleted(View view, Vertex const *vertex, bool res) {
  Store(res, *this, std::mem_fn(&Caches::deletedCache_), view, vertex);
}
// Invalidate every NEW-view entry that may describe `vertex` (the OLD side is
// intentionally untouched). Per-vertex maps erase just this vertex; the
// multi-key maps below are cleared wholesale because entries involving this
// vertex cannot be enumerated cheaply by key.
void VertexInfoCache::Invalidate(Vertex const *vertex) {
  new_.existsCache_.erase(vertex);
  new_.deletedCache_.erase(vertex);
  new_.labelCache_.erase(vertex);
  new_.propertiesCache_.erase(vertex);
  new_.inDegreeCache_.erase(vertex);
  new_.outDegreeCache_.erase(vertex);
  // aggressive cache invalidation, TODO: be smarter
  new_.hasLabelCache_.clear();
  new_.propertyValueCache_.clear();
  new_.inEdgesCache_.clear();
  new_.outEdgesCache_.clear();
}
// Cached full label set of (view, vertex), by const reference; nullopt on miss.
auto VertexInfoCache::GetLabels(View view, Vertex const *vertex) const
    -> std::optional<std::reference_wrapper<std::vector<LabelId> const>> {
  return FetchHelper<std::vector<LabelId>>(*this, std::mem_fn(&VertexInfoCache::Caches::labelCache_), view, vertex);
}
// Cache a copy of the full label set for (view, vertex).
void VertexInfoCache::StoreLabels(View view, Vertex const *vertex, const std::vector<LabelId> &res) {
  Store(res, *this, std::mem_fn(&Caches::labelCache_), view, vertex);
}
// Cached answer to "does vertex carry `label`"; keyed by (vertex, label).
auto VertexInfoCache::GetHasLabel(View view, Vertex const *vertex, LabelId label) const -> std::optional<bool> {
  return FetchHelper<bool>(*this, std::mem_fn(&Caches::hasLabelCache_), view, vertex, label);
}
// Record the answer to "does vertex carry `label`".
void VertexInfoCache::StoreHasLabel(View view, Vertex const *vertex, LabelId label, bool res) {
  Store(res, *this, std::mem_fn(&Caches::hasLabelCache_), view, vertex, label);
}
// Invalidate NEW-view label information after a label change on `vertex`:
// the full label set and the specific (vertex, label) membership entry.
void VertexInfoCache::Invalidate(Vertex const *vertex, LabelId label) {
  new_.labelCache_.erase(vertex);
  new_.hasLabelCache_.erase(std::tuple{vertex, label});
}
// Cached value of a single property, keyed by (vertex, property); nullopt on miss.
auto VertexInfoCache::GetProperty(View view, Vertex const *vertex, PropertyId property) const
    -> std::optional<std::reference_wrapper<PropertyValue const>> {
  return FetchHelper<PropertyValue>(*this, std::mem_fn(&Caches::propertyValueCache_), view, vertex, property);
}
// Cache the value of a single property (takes ownership of `value`).
void VertexInfoCache::StoreProperty(View view, Vertex const *vertex, PropertyId property, PropertyValue value) {
  Store(std::move(value), *this, std::mem_fn(&Caches::propertyValueCache_), view, vertex, property);
}
// Cached full property map of (view, vertex), by const reference; nullopt on miss.
auto VertexInfoCache::GetProperties(View view, Vertex const *vertex) const
    -> std::optional<std::reference_wrapper<std::map<PropertyId, PropertyValue> const>> {
  return FetchHelper<std::map<PropertyId, PropertyValue>>(*this, std::mem_fn(&Caches::propertiesCache_), view, vertex);
}
// Cache the full property map (takes ownership of `properties`).
void VertexInfoCache::StoreProperties(View view, Vertex const *vertex, std::map<PropertyId, PropertyValue> properties) {
  Store(std::move(properties), *this, std::mem_fn(&Caches::propertiesCache_), view, vertex);
}
// Invalidate NEW-view property information after a property change on `vertex`:
// the full property map and the specific (vertex, property) value entry.
void VertexInfoCache::Invalidate(Vertex const *vertex, PropertyId property_key) {
  new_.propertiesCache_.erase(vertex);
  new_.propertyValueCache_.erase(std::tuple{vertex, property_key});
}
// Cached result of an in-edges scan. Keyed by (src, dst, edge_types); EdgeKey
// sorts edge_types on construction so lookups are order-insensitive.
auto VertexInfoCache::GetInEdges(View view, Vertex const *src_vertex, Vertex const *dst_vertex,
                                 const std::vector<EdgeTypeId> &edge_types) const
    -> std::optional<std::reference_wrapper<EdgeStore const>> {
  return FetchHelper<EdgeStore>(*this, std::mem_fn(&Caches::inEdgesCache_), view, src_vertex, dst_vertex, edge_types);
}
// Cache the result of an in-edges scan (takes ownership of both arguments).
void VertexInfoCache::StoreInEdges(View view, Vertex const *src_vertex, Vertex const *dst_vertex,
                                   std::vector<EdgeTypeId> edge_types, EdgeStore in_edges) {
  Store(std::move(in_edges), *this, std::mem_fn(&Caches::inEdgesCache_), view, src_vertex, dst_vertex,
        std::move(edge_types));
}
// Cached result of an out-edges scan; same keying as GetInEdges.
auto VertexInfoCache::GetOutEdges(View view, Vertex const *src_vertex, Vertex const *dst_vertex,
                                  const std::vector<EdgeTypeId> &edge_types) const
    -> std::optional<std::reference_wrapper<const EdgeStore>> {
  return FetchHelper<EdgeStore>(*this, std::mem_fn(&Caches::outEdgesCache_), view, src_vertex, dst_vertex, edge_types);
}
// Cache the result of an out-edges scan (takes ownership of both arguments).
void VertexInfoCache::StoreOutEdges(View view, Vertex const *src_vertex, Vertex const *dst_vertex,
                                    std::vector<EdgeTypeId> edge_types, EdgeStore out_edges) {
  Store(std::move(out_edges), *this, std::mem_fn(&Caches::outEdgesCache_), view, src_vertex, dst_vertex,
        std::move(edge_types));
}
// Cached in-degree of (view, vertex); nullopt on miss.
auto VertexInfoCache::GetInDegree(View view, Vertex const *vertex) const -> std::optional<std::size_t> {
  return FetchHelper<std::size_t>(*this, std::mem_fn(&Caches::inDegreeCache_), view, vertex);
}
// Cache the in-degree of (view, vertex).
void VertexInfoCache::StoreInDegree(View view, Vertex const *vertex, std::size_t in_degree) {
  Store(in_degree, *this, std::mem_fn(&Caches::inDegreeCache_), view, vertex);
}
// Cached out-degree of (view, vertex); nullopt on miss.
auto VertexInfoCache::GetOutDegree(View view, Vertex const *vertex) const -> std::optional<std::size_t> {
  return FetchHelper<std::size_t>(*this, std::mem_fn(&Caches::outDegreeCache_), view, vertex);
}
// Cache the out-degree of (view, vertex).
void VertexInfoCache::StoreOutDegree(View view, Vertex const *vertex, std::size_t out_degree) {
  Store(out_degree, *this, std::mem_fn(&Caches::outDegreeCache_), view, vertex);
}
// Invalidate NEW-view degree/edge information after an edge change on `vertex`
// in the given direction. The degree entry is erased precisely; the edge-list
// cache is cleared wholesale (its composite keys are not enumerable by vertex).
void VertexInfoCache::Invalidate(Vertex const *vertex, EdgeTypeId /*unused*/, EdgeDirection direction) {
  // EdgeTypeId is currently unused but could be used to be more precise in future
  if (direction == EdgeDirection::IN) {
    new_.inDegreeCache_.erase(vertex);
    new_.inEdgesCache_.clear();  // TODO: be more precise
  } else {
    new_.outDegreeCache_.erase(vertex);
    new_.outEdgesCache_.clear();  // TODO: be more precise
  }
}
// Drop every cached entry, both the OLD-view and NEW-view sides.
void VertexInfoCache::Clear() {
  old_.Clear();
  new_.Clear();
}
// Drop every entry from one side (OLD or NEW) of the cache.
void VertexInfoCache::Caches::Clear() {
  existsCache_.clear();
  deletedCache_.clear();
  hasLabelCache_.clear();
  propertyValueCache_.clear();
  labelCache_.clear();
  propertiesCache_.clear();
  inEdgesCache_.clear();
  outEdgesCache_.clear();
  inDegreeCache_.clear();
  outDegreeCache_.clear();
}
// Composite key for the in/out edge caches: (source, destination, edge types).
VertexInfoCache::EdgeKey::EdgeKey(Vertex const *src_vertex, Vertex const *dst_vertex,
                                  std::vector<EdgeTypeId> edge_types)
    : src_vertex_{src_vertex}, dst_vertex_{dst_vertex}, edge_types_{std::move(edge_types)} {
  // needed for a canonical form: sorting makes hashing and equality
  // independent of the order in which the caller listed the edge types
  std::sort(edge_types_.begin(), edge_types_.end());
}
} // namespace memgraph::storage

View File

@ -0,0 +1,169 @@
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#pragma once
#include "storage/v2/edge_direction.hpp"
#include "storage/v2/edge_ref.hpp"
#include "storage/v2/id_types.hpp"
#include "storage/v2/view.hpp"

#include "absl/container/flat_hash_map.h"
#include <gflags/gflags.h>

#include <algorithm>
#include <cstddef>
#include <functional>
#include <map>
#include <optional>
#include <tuple>
#include <utility>
#include <vector>
DECLARE_uint64(delta_chain_cache_threashold);
namespace memgraph::storage {
namespace detail {
// TODO: move to a general C++ utility header
// Shorthand for an "optional reference": std::optional cannot hold T& directly,
// so a reference_wrapper is used as the payload.
template <typename T>
using optref = std::optional<std::reference_wrapper<T>>;
}  // namespace detail
// forward declarations
struct Vertex;
struct Transaction;
class PropertyValue;
/** For vertices with long delta chains, its possible that its expensive
* to rebuild state for the relevant transaction. This cache is used to
* store information that would be expensive to repeatedly rebuild.
*
* The cache is assumed to be used in reference to a given:
* - transaction_id
* - command_id
* - only for View::OLD
*/
struct VertexInfoCache final {
  VertexInfoCache();
  ~VertexInfoCache();

  // By design would be a mistake to copy the cache
  VertexInfoCache(VertexInfoCache const &) = delete;
  VertexInfoCache &operator=(VertexInfoCache const &) = delete;
  VertexInfoCache(VertexInfoCache &&) noexcept;
  VertexInfoCache &operator=(VertexInfoCache &&) noexcept;

  // Existence / deletion flags. Get* return nullopt on a cache miss.
  auto GetExists(View view, Vertex const *vertex) const -> std::optional<bool>;
  void StoreExists(View view, Vertex const *vertex, bool res);
  auto GetDeleted(View view, Vertex const *vertex) const -> std::optional<bool>;
  void StoreDeleted(View view, Vertex const *vertex, bool res);
  // Invalidate all NEW-view entries that may concern `vertex`.
  void Invalidate(Vertex const *vertex);

  // Label information: full label set and per-label membership.
  auto GetLabels(View view, Vertex const *vertex) const -> detail::optref<std::vector<LabelId> const>;
  void StoreLabels(View view, Vertex const *vertex, std::vector<LabelId> const &res);
  auto GetHasLabel(View view, Vertex const *vertex, LabelId label) const -> std::optional<bool>;
  void StoreHasLabel(View view, Vertex const *vertex, LabelId label, bool res);
  // Invalidate NEW-view label entries after a label change.
  void Invalidate(Vertex const *vertex, LabelId label);

  // Property information: single values and the full property map.
  auto GetProperty(View view, Vertex const *vertex, PropertyId property) const -> detail::optref<PropertyValue const>;
  void StoreProperty(View view, Vertex const *vertex, PropertyId property, PropertyValue value);
  auto GetProperties(View view, Vertex const *vertex) const
      -> detail::optref<std::map<PropertyId, PropertyValue> const>;
  void StoreProperties(View view, Vertex const *vertex, std::map<PropertyId, PropertyValue> properties);
  // Invalidate NEW-view property entries after a property change.
  void Invalidate(Vertex const *vertex, PropertyId property_key);

  // Result shape of an edge scan: (edge type, other vertex, edge reference).
  using EdgeStore = std::vector<std::tuple<EdgeTypeId, Vertex *, EdgeRef>>;

  // Edge scans, keyed by (src, dst, edge types); see EdgeKey below.
  auto GetInEdges(View view, Vertex const *src_vertex, Vertex const *dst_vertex,
                  const std::vector<EdgeTypeId> &edge_types) const -> detail::optref<const EdgeStore>;
  void StoreInEdges(View view, Vertex const *src_vertex, Vertex const *dst_vertex, std::vector<EdgeTypeId> edge_types,
                    EdgeStore in_edges);
  auto GetOutEdges(View view, Vertex const *src_vertex, Vertex const *dst_vertex,
                   const std::vector<EdgeTypeId> &edge_types) const -> detail::optref<const EdgeStore>;
  void StoreOutEdges(View view, Vertex const *src_vertex, Vertex const *dst_vertex, std::vector<EdgeTypeId> edge_types,
                     EdgeStore out_edges);

  // Degree counts.
  auto GetInDegree(View view, Vertex const *vertex) const -> std::optional<std::size_t>;
  void StoreInDegree(View view, Vertex const *vertex, std::size_t in_degree);
  auto GetOutDegree(View view, Vertex const *vertex) const -> std::optional<std::size_t>;
  void StoreOutDegree(View view, Vertex const *vertex, std::size_t out_degree);
  // Invalidate NEW-view degree/edge entries after an edge change in `direction`.
  void Invalidate(Vertex const *vertex, EdgeTypeId /*unused*/, EdgeDirection direction);

  // Drop everything (both OLD and NEW sides).
  void Clear();

 private:
  /// Note: not a tuple because need a canonical form for the edge types
  struct EdgeKey {
    EdgeKey(Vertex const *src_vertex, Vertex const *dst_vertex, std::vector<EdgeTypeId> edge_types);

    friend bool operator==(EdgeKey const &, EdgeKey const &) = default;
    // Partial match: does this key start at `vertex` and include the edge type?
    friend bool operator==(EdgeKey const &lhs, std::tuple<Vertex const *, EdgeTypeId> const &rhs) {
      return lhs.src_vertex_ == std::get<0>(rhs) &&
             std::find(lhs.edge_types_.begin(), lhs.edge_types_.end(), std::get<1>(rhs)) != lhs.edge_types_.end();
    }

    // Hash support for absl::flat_hash_map.
    template <typename H>
    friend H AbslHashValue(H h, EdgeKey const &key) {
      return H::combine(std::move(h), key.src_vertex_, key.dst_vertex_, key.edge_types_);
    }

   private:
    Vertex const *src_vertex_;
    Vertex const *dst_vertex_;
    std::vector<EdgeTypeId> edge_types_;  // TODO: use a SBO set type
  };

  // One instance per view (OLD / NEW); see old_ and new_ below.
  struct Caches {
    void Clear();

    template <typename K, typename V>
    using map = absl::flat_hash_map<K, V>;

    // TODO: Why are exists + deleted separate, is there a good reason?
    map<Vertex const *, bool> existsCache_;
    map<Vertex const *, bool> deletedCache_;
    map<std::tuple<Vertex const *, LabelId>, bool> hasLabelCache_;
    map<std::tuple<Vertex const *, PropertyId>, PropertyValue> propertyValueCache_;
    map<Vertex const *, std::vector<LabelId>> labelCache_;
    map<Vertex const *, std::map<PropertyId, PropertyValue>> propertiesCache_;
    // TODO: nest keys (edge_types) -> (src+dst) -> EdgeStore
    map<EdgeKey, EdgeStore> inEdgesCache_;
    // TODO: nest keys (edge_types) -> (src+dst) -> EdgeStore
    map<EdgeKey, EdgeStore> outEdgesCache_;
    map<Vertex const *, size_t> inDegreeCache_;
    map<Vertex const *, size_t> outDegreeCache_;
  };

  Caches old_;
  Caches new_;

  // Helpers
  template <typename Ret, typename Func, typename... Keys>
  friend auto FetchHelper(VertexInfoCache const &caches, Func &&getCache, View view, Keys &&...keys)
      -> std::optional<std::conditional_t<std::is_trivially_copyable_v<Ret>, Ret, std::reference_wrapper<Ret const>>>;

  template <typename Value, typename Func, typename... Keys>
  friend void Store(Value &&value, VertexInfoCache &caches, Func &&getCache, View view, Keys &&...keys);
};
} // namespace memgraph::storage

View File

@ -0,0 +1,15 @@
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#pragma once
namespace memgraph::storage {
// Forward declaration only; include storage/v2/vertex_info_cache.hpp for the definition.
struct VertexInfoCache;
}

View File

@ -0,0 +1,209 @@
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#pragma once
#include "storage/v2/delta.hpp"
#include "storage/v2/edge_direction.hpp"
#include "storage/v2/vertex_info_cache.hpp"
#include "utils/variant_helpers.hpp"

#include <algorithm>
#include <cstddef>
#include <map>
#include <optional>
#include <tuple>
#include <utility>
#include <vector>
namespace memgraph::storage {
// Empty tag type that lifts a runtime Delta::Action into the type system;
// used by DeltaDispatch for tag dispatch.
template <Delta::Action>
struct DeltaAction_tag {};

// Wraps a callable so it only participates in overload resolution when
// invoked with the tag for action A.
template <Delta::Action A, typename Method>
struct ActionMethodImpl : Method {
  // Uses tag dispatch to ensure method is only called for the correct action
  void operator()(DeltaAction_tag<A> /*unused*/, Delta const &delta) { Method::operator()(delta); }
};

// Factory: binds `func` as the handler for action A, for use inside a
// utils::Overloaded set passed to DeltaDispatch.
template <Delta::Action A, typename Method>
inline auto ActionMethod(Method &&func) {
  return ActionMethodImpl<A, Method>{std::forward<Method>(func)};
}
/// Converts runtime Delta::Action into compile time tag, this allows us to dispatch to the correct overload
/// `func` is invoked as func(DeltaAction_tag<ACTION>{}, delta); how unhandled
/// actions behave depends on the overload set passed in (e.g. the
/// utils::ChainedOverloaded sets built at the call sites — TODO confirm they
/// provide a no-op fallback).
template <typename Func>
inline void DeltaDispatch(Delta const &delta, Func &&func) {
  // clang-format off
  // One case per enumerator; `return` guarantees exactly one handler call.
#define dispatch(E) case E: return func(DeltaAction_tag<E>{}, delta); // NOLINT
  // clang-format on
  switch (delta.action) {
    using enum Delta::Action;
    dispatch(DELETE_DESERIALIZED_OBJECT);
    dispatch(DELETE_OBJECT);
    dispatch(RECREATE_OBJECT);
    dispatch(SET_PROPERTY);
    dispatch(ADD_LABEL);
    dispatch(REMOVE_LABEL);
    dispatch(ADD_IN_EDGE);
    dispatch(ADD_OUT_EDGE);
    dispatch(REMOVE_IN_EDGE);
    dispatch(REMOVE_OUT_EDGE);
  }
#undef dispatch
}
// Handler set: clears `exists` when a delta shows the object's creation
// (walking deltas backwards, a DELETE_* delta means it did not exist yet).
inline auto Exists_ActionMethod(bool &exists) {
  using enum Delta::Action;
  // clang-format off
  return utils::Overloaded{
      ActionMethod<DELETE_DESERIALIZED_OBJECT>([&](Delta const & /*unused*/) { exists = false; }),
      ActionMethod<DELETE_OBJECT>([&](Delta const & /*unused*/) { exists = false; })
  };
  // clang-format on
}

// Handler: clears `deleted` when a RECREATE_OBJECT delta is seen.
inline auto Deleted_ActionMethod(bool &deleted) {
  using enum Delta::Action;
  return ActionMethod<RECREATE_OBJECT>([&](Delta const & /*unused*/) { deleted = false; });
}
// Handler set: maintains `has_label` for one specific label while deltas are
// replayed; deltas for other labels are ignored.
inline auto HasLabel_ActionMethod(bool &has_label, LabelId label) {
  using enum Delta::Action;
  // clang-format off
  return utils::Overloaded{
      ActionMethod<REMOVE_LABEL>([&, label](Delta const &delta) {
        if (delta.label == label) {
          // A removal delta can only undo a label that is currently present.
          MG_ASSERT(has_label, "Invalid database state!");
          has_label = false;
        }
      }),
      ActionMethod<ADD_LABEL>([&, label](Delta const &delta) {
        if (delta.label == label) {
          // An addition delta can only introduce a label that is currently absent.
          MG_ASSERT(!has_label, "Invalid database state!");
          has_label = true;
        }
      })
  };
  // clang-format on
}
// Handler set: maintains the full label vector while deltas are replayed.
inline auto Labels_ActionMethod(std::vector<LabelId> &labels) {
  using enum Delta::Action;
  // clang-format off
  return utils::Overloaded{
      ActionMethod<REMOVE_LABEL>([&](Delta const &delta) {
        auto it = std::find(labels.begin(), labels.end(), delta.label);
        DMG_ASSERT(it != labels.end(), "Invalid database state!");
        // O(1) unordered removal: overwrite with the last element, then pop.
        *it = labels.back();
        labels.pop_back();
      }),
      ActionMethod<ADD_LABEL>([&](Delta const &delta) {
        DMG_ASSERT(std::find(labels.begin(), labels.end(), delta.label) == labels.end(), "Invalid database state!");
        labels.emplace_back(delta.label);
      })
  };
  // clang-format on
}
// Handler: overwrites `value` whenever a SET_PROPERTY delta targets `property`.
// Later deltas in the replay win, so `value` ends at the view-appropriate state.
inline auto PropertyValue_ActionMethod(PropertyValue &value, PropertyId property) {
  using enum Delta::Action;
  return ActionMethod<SET_PROPERTY>([&, property](Delta const &delta) {
    if (delta.property.key == property) {
      value = delta.property.value;
    }
  });
}

// Handler: updates `match` = (expected `value` == delta's value) whenever a
// SET_PROPERTY delta targets `property`.
inline auto PropertyValueMatch_ActionMethod(bool &match, PropertyId property, PropertyValue const &value) {
  using enum Delta::Action;
  return ActionMethod<SET_PROPERTY>([&, property](Delta const &delta) {
    if (delta.property.key == property) match = (value == delta.property.value);
  });
}
// Handler: maintains the full property map while deltas are replayed.
// A null PropertyValue encodes "property not set", so null deltas erase.
inline auto Properties_ActionMethod(std::map<PropertyId, PropertyValue> &properties) {
  using enum Delta::Action;
  return ActionMethod<SET_PROPERTY>([&](Delta const &delta) {
    auto it = properties.find(delta.property.key);
    if (it != properties.end()) {
      if (delta.property.value.IsNull()) {
        // remove the property
        properties.erase(it);
      } else {
        // set the value
        it->second = delta.property.value;
      }
    } else if (!delta.property.value.IsNull()) {
      properties.emplace(delta.property.key, delta.property.value);
    }
  });
}
// Handler set: maintains an edge list (type, other-vertex, edge) while deltas
// are replayed. Optional filters: `destination` restricts to edges toward one
// vertex; a non-empty `edge_types` restricts to those types. The direction
// (IN vs OUT) selects which delta actions are handled, at compile time.
template <EdgeDirection dir>
inline auto Edges_ActionMethod(std::vector<std::tuple<EdgeTypeId, Vertex *, EdgeRef>> &edges,
                               std::vector<EdgeTypeId> const &edge_types, Vertex const *destination) {
  // Shared filter: does this delta pass the destination/type restrictions?
  auto const predicate = [&, destination](Delta const &delta) {
    if (destination && delta.vertex_edge.vertex != destination) return false;
    if (!edge_types.empty() &&
        std::find(edge_types.begin(), edge_types.end(), delta.vertex_edge.edge_type) == edge_types.end())
      return false;
    return true;
  };
  // clang-format off
  using enum Delta::Action;
  return utils::Overloaded{
      ActionMethod <(dir == EdgeDirection::IN) ? ADD_IN_EDGE : ADD_OUT_EDGE> (
        [&, predicate](Delta const &delta) {
          if (!predicate(delta)) return;
          // Add the edge because we don't see the removal.
          auto link = std::tuple{delta.vertex_edge.edge_type, delta.vertex_edge.vertex, delta.vertex_edge.edge};
          DMG_ASSERT((std::find(edges.begin(), edges.end(), link) == edges.end()), "Invalid database state!");
          edges.push_back(link);
        }
      ),
      ActionMethod <(dir == EdgeDirection::IN) ? REMOVE_IN_EDGE : REMOVE_OUT_EDGE> (
        [&, predicate](Delta const &delta) {
          if (!predicate(delta)) return;
          // Remove the edge because we don't see the addition.
          // O(1) unordered removal: overwrite with the last element, then pop.
          auto it = std::find(edges.begin(), edges.end(),
                              std::tuple{delta.vertex_edge.edge_type, delta.vertex_edge.vertex, delta.vertex_edge.edge});
          DMG_ASSERT(it != edges.end(), "Invalid database state!");
          *it = edges.back();
          edges.pop_back();
        }
      )
  };
  // clang-format on
}
// Handler set: maintains a degree counter while deltas are replayed. The
// direction (IN vs OUT) selects the relevant add/remove actions at compile time.
template <EdgeDirection dir>
inline auto Degree_ActionMethod(size_t &degree) {
  using enum Delta::Action;
  // clang-format off
  return utils::Overloaded{
      ActionMethod <(dir == EdgeDirection::IN) ? ADD_IN_EDGE : ADD_OUT_EDGE> (
        [&](Delta const &/*unused*/) { ++degree; }
      ),
      ActionMethod <(dir == EdgeDirection::IN) ? REMOVE_IN_EDGE : REMOVE_OUT_EDGE> (
        [&](Delta const &/*unused*/) { --degree; }
      ),
  };
  // clang-format on
}
// Consult the cache for a definitive error on `vertex`: a cached
// exists == false yields NONEXISTENT_OBJECT; unless the accessor tolerates
// deleted objects (`for_deleted`), a cached deleted == true yields
// DELETED_OBJECT. Returns nullopt when the cache cannot rule either way.
inline auto HasError(View view, VertexInfoCache const &cache, Vertex const *vertex, bool for_deleted)
    -> std::optional<Error> {
  auto const known_exists = cache.GetExists(view, vertex);
  if (known_exists.has_value() && !known_exists.value()) return Error::NONEXISTENT_OBJECT;
  if (for_deleted) return std::nullopt;  // deleted vertices are acceptable to this accessor
  auto const known_deleted = cache.GetDeleted(view, vertex);
  if (known_deleted.has_value() && known_deleted.value()) return Error::DELETED_OBJECT;
  return std::nullopt;
}
} // namespace memgraph::storage

View File

@ -1,4 +1,4 @@
// Copyright 2022 Memgraph Ltd.
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -11,10 +11,12 @@
#pragma once
#include <cstdint>
namespace memgraph::storage {
/// Indicator for obtaining the state before or after a transaction & command.
enum class View {
enum class View : uint8_t {
OLD,
NEW,
};

View File

@ -46,7 +46,7 @@ namespace memgraph::utils {
/// heights than 32. The probability of heights larger than 32 gets extremely
/// small. Also, the internal implementation can handle a maximum height of 32
/// primarily becase of the height generator (see the `gen_height` function).
const uint64_t kSkipListMaxHeight = 32;
constexpr uint64_t kSkipListMaxHeight = 32;
/// This is the height that a node that is accessed from the list has to have in
/// order for garbage collection to be triggered. This causes the garbage
@ -54,21 +54,45 @@ const uint64_t kSkipListMaxHeight = 32;
/// list. Each thread that accesses the list can perform garbage collection. The
/// level is determined empirically using benchmarks. A smaller height means
/// that the garbage collection will be triggered more often.
const uint64_t kSkipListGcHeightTrigger = 16;
constexpr uint64_t kSkipListGcHeightTrigger = 16;
/// This is the highest layer that will be used by default for item count
/// estimation. It was determined empirically using benchmarks to have an
/// optimal trade-off between performance and accuracy. The function will have
/// an expected maximum error of less than 20% when the key matches 100k
/// elements.
const int kSkipListCountEstimateDefaultLayer = 10;
constexpr int kSkipListCountEstimateDefaultLayer = 10;
/// These variables define the storage sizes for the SkipListGc. The internal
/// storage of the GC and the Stack storage used within the GC are all
/// optimized to have block sizes that are a whole multiple of the memory page
/// size.
const uint64_t kSkipListGcBlockSize = 8189;
const uint64_t kSkipListGcStackSize = 8191;
constexpr uint64_t kSkipListGcBlockSize = 8189;
constexpr uint64_t kSkipListGcStackSize = 8191;
namespace detail {
struct SkipListNode_base {
  // Draws a random tower height with a binomial distribution, using the
  // technique described at http://ticki.github.io/blog/skip-lists-done-right/
  // under "O(1) level generation": the (1-based) position of the lowest set
  // bit of a uniform random word is binomially distributed. `ffs` returns 0
  // for an all-zero input — an invalid height — so that case is explicitly
  // mapped to `kSkipListMaxHeight` to keep the distribution binomial.
  static uint32_t gen_height() {
    static_assert(kSkipListMaxHeight <= 32,
                  "utils::SkipList::gen_height is implemented only for heights "
                  "up to 32!");
    // One generator per thread, so no locking is required.
    thread_local std::mt19937 gen{std::random_device{}()};
    uint32_t bits = gen();
    if (bits == 0) return kSkipListMaxHeight;
    // Keep exactly `kSkipListMaxHeight` significant bits.
    bits >>= (32 - kSkipListMaxHeight);
    // ffs = find first set
    return __builtin_ffs(bits);
  }
};
}  // namespace detail
/// This is the Node object that represents each element stored in the list. The
/// array of pointers to the next nodes is declared here to a size of 0 so that
@ -180,8 +204,8 @@ class SkipListGc final {
using TDeleted = std::pair<uint64_t, TNode *>;
using TStack = Stack<TDeleted, kSkipListGcStackSize>;
const uint64_t kIdsInField = sizeof(uint64_t) * 8;
const uint64_t kIdsInBlock = kSkipListGcBlockSize * kIdsInField;
static constexpr uint64_t kIdsInField = sizeof(uint64_t) * 8;
static constexpr uint64_t kIdsInBlock = kSkipListGcBlockSize * kIdsInField;
struct Block {
std::atomic<Block *> prev;
@ -549,7 +573,7 @@ class SkipListGc final {
///
/// @tparam TObj object type that is stored in the list
template <typename TObj>
class SkipList final {
class SkipList final : detail::SkipListNode_base {
private:
using TNode = SkipListNode<TObj>;
@ -1256,34 +1280,12 @@ class SkipList final {
}
}
// This function generates a binomial distribution using the same technique
// described here: http://ticki.github.io/blog/skip-lists-done-right/ under
// "O(1) level generation". The only exception in this implementation is that
// the special case of 0 is handled correctly. When 0 is passed to `ffs` it
// returns 0 which is an invalid height. To make the distribution binomial
// this value is then mapped to `kSkipListMaxSize`.
uint32_t gen_height() {
std::lock_guard<SpinLock> guard(lock_);
static_assert(kSkipListMaxHeight <= 32,
"utils::SkipList::gen_height is implemented only for heights "
"up to 32!");
uint32_t value = gen_();
if (value == 0) return kSkipListMaxHeight;
// The value should have exactly `kSkipListMaxHeight` bits.
value >>= (32 - kSkipListMaxHeight);
// ffs = find first set
// ^ ^ ^
return __builtin_ffs(value);
}
private:
TNode *head_{nullptr};
// gc_ also stores the only copy of `MemoryResource *`, to save space.
mutable SkipListGc<TObj> gc_;
std::mt19937 gen_{std::random_device{}()};
std::atomic<uint64_t> size_{0};
SpinLock lock_;
};
} // namespace memgraph::utils

View File

@ -1,4 +1,4 @@
// Copyright 2022 Memgraph Ltd.
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -11,6 +11,9 @@
#pragma once
#include <functional>
#include <type_traits>
namespace memgraph::utils {
template <class... Ts>
struct Overloaded : Ts... {
@ -19,4 +22,24 @@ struct Overloaded : Ts... {
template <class... Ts>
Overloaded(Ts...) -> Overloaded<Ts...>;
// A variadic functor that inherits from every callable in `Ts...`. Unlike
// `Overloaded` (which picks a single best overload), calling this object
// invokes EVERY base whose call operator accepts the given arguments; bases
// that are not invocable with them are skipped at compile time. Any return
// values are discarded.
template <typename... Ts>
struct ChainedOverloaded : Ts... {
  template <typename... Us>
  ChainedOverloaded(Us &&...ts) : Ts(std::forward<Us>(ts))... {}

  template <typename... Args>
  auto operator()(Args &&...args) {
    // Invoke a single base if (and only if) it can be called with `args`.
    auto maybe_call = [&]<typename Base>(Base *base) {
      if constexpr (std::is_invocable_v<Base, Args...>) {
        std::invoke(*base, args...);
      }
    };
    // Fold over all bases, left to right.
    (maybe_call(static_cast<Ts *>(this)), ...);
  }
};
template <typename... Ts>
ChainedOverloaded(Ts...) -> ChainedOverloaded<Ts...>;
} // namespace memgraph::utils

View File

@ -202,4 +202,9 @@ startup_config_dict = {
"",
"The path to mappings that describes aliases to callables in cypher queries in the form of key-value pairs in a json file. With this option query module procedures that do not exist in memgraph can be mapped to ones that exist.",
),
"delta_chain_cache_threashold": (
"128",
"128",
"The threshold for when to cache long delta chains. This is used for heavy read + write workloads where repeated processing of delta chains can become costly.",
),
}

View File

@ -736,6 +736,7 @@ TEST_P(DurabilityTest, SnapshotPeriodic) {
// NOLINTNEXTLINE(hicpp-special-member-functions)
TEST_P(DurabilityTest, SnapshotFallback) {
// Create snapshot.
std::size_t current_number_of_snapshots = 0;
{
std::unique_ptr<memgraph::storage::Storage> store(new memgraph::storage::InMemoryStorage(
{.items = {.properties_on_edges = GetParam()},
@ -744,21 +745,28 @@ TEST_P(DurabilityTest, SnapshotFallback) {
.snapshot_interval = std::chrono::milliseconds(3000)}}));
CreateBaseDataset(store.get(), GetParam());
std::this_thread::sleep_for(std::chrono::milliseconds(3500));
ASSERT_EQ(GetSnapshotsList().size(), 1);
current_number_of_snapshots = GetSnapshotsList().size();
ASSERT_GE(current_number_of_snapshots, 1);
CreateExtendedDataset(store.get());
std::this_thread::sleep_for(std::chrono::milliseconds(3000));
}
ASSERT_EQ(GetSnapshotsList().size(), 2);
auto prev_number_of_snapshots = current_number_of_snapshots;
auto snapshots = GetSnapshotsList();
current_number_of_snapshots = snapshots.size();
ASSERT_GE(current_number_of_snapshots, prev_number_of_snapshots + 1);
ASSERT_EQ(GetBackupSnapshotsList().size(), 0);
ASSERT_EQ(GetWalsList().size(), 0);
ASSERT_EQ(GetBackupWalsList().size(), 0);
// Destroy last snapshot.
// Destroy snapshots.
{
auto snapshots = GetSnapshotsList();
ASSERT_EQ(snapshots.size(), 2);
CorruptSnapshot(*snapshots.begin());
// protect the last, destroy the rest
auto it = snapshots.begin();
auto const e = snapshots.end() - 1;
for (; it != e; ++it) {
CorruptSnapshot(*it);
}
}
// Recover snapshot.

View File

@ -41,7 +41,7 @@ try:
except ImportError:
yaml = None
is_py2 = sys.version[0] == '2'
is_py2 = sys.version[0] == "2"
if is_py2:
import Queue as queue
@ -49,43 +49,58 @@ else:
import queue as queue
def run_tidy(task_queue, lock, timeout):
def run_tidy(task_queue, lock, timeout, exit_status, running_processes):
watchdog = None
while True:
command = task_queue.get()
with lock:
should_run = exit_status["status"] == 0
if not should_run:
task_queue.task_done()
continue
proc = None
try:
proc = subprocess.Popen(command,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
proc = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
with lock:
running_processes.add(proc)
if timeout is not None:
watchdog = threading.Timer(timeout, proc.kill)
watchdog.start()
stdout, stderr = proc.communicate()
return_code = proc.returncode
with lock:
sys.stdout.write(stdout.decode('utf-8') + '\n')
sys.stdout.write(stdout.decode("utf-8") + "\n")
sys.stdout.flush()
if stderr:
sys.stderr.write(stderr.decode('utf-8') + '\n')
sys.stderr.write(stderr.decode("utf-8") + "\n")
sys.stderr.flush()
if return_code != 0:
exit_status["status"] = 1
for p in running_processes:
if p != proc:
p.terminate()
except Exception as e:
with lock:
sys.stderr.write('Failed: ' + str(e) + ': '.join(command) + '\n')
sys.stderr.write("Failed: " + str(e) + ": ".join(command) + "\n")
exit_status["status"] = 1
finally:
with lock:
if not (timeout is None or watchdog is None):
if not watchdog.is_alive():
sys.stderr.write('Terminated by timeout: ' +
' '.join(command) + '\n')
sys.stderr.write("Terminated by timeout: " + " ".join(command) + "\n")
watchdog.cancel()
if proc is not None:
running_processes.remove(proc)
task_queue.task_done()
def start_workers(max_tasks, tidy_caller, task_queue, lock, timeout):
def start_workers(max_tasks, tidy_caller, task_queue, lock, timeout, exit_status, running_processes):
for _ in range(max_tasks):
t = threading.Thread(target=tidy_caller, args=(task_queue, lock, timeout))
t = threading.Thread(target=tidy_caller, args=(task_queue, lock, timeout, exit_status, running_processes))
t.daemon = True
t.start()
@ -96,8 +111,8 @@ def merge_replacement_files(tmpdir, mergefile):
# the top level key 'Diagnostics' in the output yaml files
mergekey = "Diagnostics"
merged = []
for replacefile in glob.iglob(os.path.join(tmpdir, '*.yaml')):
content = yaml.safe_load(open(replacefile, 'r'))
for replacefile in glob.iglob(os.path.join(tmpdir, "*.yaml")):
content = yaml.safe_load(open(replacefile, "r"))
if not content:
continue # Skip empty files.
merged.extend(content.get(mergekey, []))
@ -107,62 +122,65 @@ def merge_replacement_files(tmpdir, mergefile):
# include/clang/Tooling/ReplacementsYaml.h, but the value
# is actually never used inside clang-apply-replacements,
# so we set it to '' here.
output = {'MainSourceFile': '', mergekey: merged}
with open(mergefile, 'w') as out:
output = {"MainSourceFile": "", mergekey: merged}
with open(mergefile, "w") as out:
yaml.safe_dump(output, out)
else:
# Empty the file:
open(mergefile, 'w').close()
open(mergefile, "w").close()
def main():
parser = argparse.ArgumentParser(description=
'Run clang-tidy against changed files, and '
'output diagnostics only for modified '
'lines.')
parser.add_argument('-clang-tidy-binary', metavar='PATH',
default='clang-tidy',
help='path to clang-tidy binary')
parser.add_argument('-p', metavar='NUM', default=0,
help='strip the smallest prefix containing P slashes')
parser.add_argument('-regex', metavar='PATTERN', default=None,
help='custom pattern selecting file paths to check '
'(case sensitive, overrides -iregex)')
parser.add_argument('-iregex', metavar='PATTERN', default=
r'.*\.(cpp|cc|c\+\+|cxx|c|cl|h|hpp|m|mm|inc)',
help='custom pattern selecting file paths to check '
'(case insensitive, overridden by -regex)')
parser.add_argument('-j', type=int, default=1,
help='number of tidy instances to be run in parallel.')
parser.add_argument('-timeout', type=int, default=None,
help='timeout per each file in seconds.')
parser.add_argument('-fix', action='store_true', default=False,
help='apply suggested fixes')
parser.add_argument('-checks',
help='checks filter, when not specified, use clang-tidy '
'default',
default='')
parser.add_argument('-path', dest='build_path',
help='Path used to read a compile command database.')
parser = argparse.ArgumentParser(
description="Run clang-tidy against changed files, and " "output diagnostics only for modified " "lines."
)
parser.add_argument("-clang-tidy-binary", metavar="PATH", default="clang-tidy", help="path to clang-tidy binary")
parser.add_argument("-p", metavar="NUM", default=0, help="strip the smallest prefix containing P slashes")
parser.add_argument(
"-regex",
metavar="PATTERN",
default=None,
help="custom pattern selecting file paths to check " "(case sensitive, overrides -iregex)",
)
parser.add_argument(
"-iregex",
metavar="PATTERN",
default=r".*\.(cpp|cc|c\+\+|cxx|c|cl|h|hpp|m|mm|inc)",
help="custom pattern selecting file paths to check " "(case insensitive, overridden by -regex)",
)
parser.add_argument("-j", type=int, default=1, help="number of tidy instances to be run in parallel.")
parser.add_argument("-timeout", type=int, default=None, help="timeout per each file in seconds.")
parser.add_argument("-fix", action="store_true", default=False, help="apply suggested fixes")
parser.add_argument("-checks", help="checks filter, when not specified, use clang-tidy " "default", default="")
parser.add_argument("-path", dest="build_path", help="Path used to read a compile command database.")
if yaml:
parser.add_argument('-export-fixes', metavar='FILE', dest='export_fixes',
help='Create a yaml file to store suggested fixes in, '
'which can be applied with clang-apply-replacements.')
parser.add_argument('-extra-arg', dest='extra_arg',
action='append', default=[],
help='Additional argument to append to the compiler '
'command line.')
parser.add_argument('-extra-arg-before', dest='extra_arg_before',
action='append', default=[],
help='Additional argument to prepend to the compiler '
'command line.')
parser.add_argument('-quiet', action='store_true', default=False,
help='Run clang-tidy in quiet mode')
parser.add_argument(
"-export-fixes",
metavar="FILE",
dest="export_fixes",
help="Create a yaml file to store suggested fixes in, "
"which can be applied with clang-apply-replacements.",
)
parser.add_argument(
"-extra-arg",
dest="extra_arg",
action="append",
default=[],
help="Additional argument to append to the compiler " "command line.",
)
parser.add_argument(
"-extra-arg-before",
dest="extra_arg_before",
action="append",
default=[],
help="Additional argument to prepend to the compiler " "command line.",
)
parser.add_argument("-quiet", action="store_true", default=False, help="Run clang-tidy in quiet mode")
clang_tidy_args = []
argv = sys.argv[1:]
if '--' in argv:
clang_tidy_args.extend(argv[argv.index('--'):])
argv = argv[:argv.index('--')]
if "--" in argv:
clang_tidy_args.extend(argv[argv.index("--") :])
argv = argv[: argv.index("--")]
args = parser.parse_args(argv)
@ -170,20 +188,20 @@ def main():
filename = None
lines_by_file = {}
for line in sys.stdin:
match = re.search('^\+\+\+\ \"?(.*?/){%s}([^ \t\n\"]*)' % args.p, line)
match = re.search('^\+\+\+\ "?(.*?/){%s}([^ \t\n"]*)' % args.p, line)
if match:
filename = match.group(2)
if filename is None:
continue
if args.regex is not None:
if not re.match('^%s$' % args.regex, filename):
if not re.match("^%s$" % args.regex, filename):
continue
else:
if not re.match('^%s$' % args.iregex, filename, re.IGNORECASE):
if not re.match("^%s$" % args.iregex, filename, re.IGNORECASE):
continue
match = re.search('^@@.*\+(\d+)(,(\d+))?', line)
match = re.search("^@@.*\+(\d+)(,(\d+))?", line)
if match:
start_line = int(match.group(1))
line_count = 1
@ -211,39 +229,40 @@ def main():
task_queue = queue.Queue(max_task_count)
# A lock for console output.
lock = threading.Lock()
running_processes = set()
exit_status = {"status": 0}
# Run a pool of clang-tidy workers.
start_workers(max_task_count, run_tidy, task_queue, lock, args.timeout)
start_workers(max_task_count, run_tidy, task_queue, lock, args.timeout, exit_status, running_processes)
# Form the common args list.
common_clang_tidy_args = []
if args.fix:
common_clang_tidy_args.append('-fix')
if args.checks != '':
common_clang_tidy_args.append('-checks=' + args.checks)
common_clang_tidy_args.append("-fix")
if args.checks != "":
common_clang_tidy_args.append("-checks=" + args.checks)
if args.quiet:
common_clang_tidy_args.append('-quiet')
common_clang_tidy_args.append("-quiet")
if args.build_path is not None:
common_clang_tidy_args.append('-p=%s' % args.build_path)
common_clang_tidy_args.append("-p=%s" % args.build_path)
for arg in args.extra_arg:
common_clang_tidy_args.append('-extra-arg=%s' % arg)
common_clang_tidy_args.append("-extra-arg=%s" % arg)
for arg in args.extra_arg_before:
common_clang_tidy_args.append('-extra-arg-before=%s' % arg)
common_clang_tidy_args.append("-extra-arg-before=%s" % arg)
for name in lines_by_file:
line_filter_json = json.dumps(
[{"name": name, "lines": lines_by_file[name]}],
separators=(',', ':'))
line_filter_json = json.dumps([{"name": name, "lines": lines_by_file[name]}], separators=(",", ":"))
# Run clang-tidy on files containing changes.
command = [args.clang_tidy_binary]
command.append('-line-filter=' + line_filter_json)
command.append("-line-filter=" + line_filter_json)
if yaml and args.export_fixes:
# Get a temporary file. We immediately close the handle so clang-tidy can
# overwrite it.
(handle, tmp_name) = tempfile.mkstemp(suffix='.yaml', dir=tmpdir)
(handle, tmp_name) = tempfile.mkstemp(suffix=".yaml", dir=tmpdir)
os.close(handle)
command.append('-export-fixes=' + tmp_name)
command.append("-export-fixes=" + tmp_name)
command.extend(common_clang_tidy_args)
command.append(name)
command.extend(clang_tidy_args)
@ -253,17 +272,20 @@ def main():
# Wait for all threads to be done.
task_queue.join()
if exit_status["status"] != 0:
sys.exit(exit_status["status"])
if yaml and args.export_fixes:
print('Writing fixes to ' + args.export_fixes + ' ...')
print("Writing fixes to " + args.export_fixes + " ...")
try:
merge_replacement_files(tmpdir, args.export_fixes)
except:
sys.stderr.write('Error exporting fixes.\n')
sys.stderr.write("Error exporting fixes.\n")
traceback.print_exc()
if tmpdir:
shutil.rmtree(tmpdir)
if __name__ == '__main__':
if __name__ == "__main__":
main()

56
tools/pre-commit/clang-tidy.py Executable file
View File

@ -0,0 +1,56 @@
#!/usr/bin/env python3
import argparse
import os
import subprocess
import sys
CLANG_TIDY_DIFF = "./tools/github/clang-tidy/clang-tidy-diff.py"
def check_clang_tidy():
    """Verify clang-tidy is runnable and the clang-tidy-diff helper exists.

    Prints a diagnostic to stderr and exits with status 1 when either
    check fails. (`sys.exit(str)` prints the string to stderr and exits 1.)
    """
    try:
        subprocess.run(["clang-tidy", "--version"], check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    except FileNotFoundError:
        sys.exit("clang-tidy is not installed. Please install clang-tidy and try again.")
    except subprocess.CalledProcessError:
        sys.exit("Error while checking clang-tidy version.")

    if not os.path.exists(CLANG_TIDY_DIFF):
        sys.exit(f"Error can't find '{CLANG_TIDY_DIFF}'.")
def run_clang_tidy_on_files(compile_commands_path):
    """Run clang-tidy (via the clang-tidy-diff helper) over lines changed since HEAD.

    Pipes ``git diff -U0 HEAD`` into ``CLANG_TIDY_DIFF``. On a non-zero exit
    status, prints whatever the helper produced on stdout/stderr and exits
    the process with status 1.

    :param compile_commands_path: directory containing compile_commands.json,
        forwarded to clang-tidy-diff's ``-path`` option.
    """
    diff_proc = subprocess.Popen("git diff -U0 HEAD".split(), stdout=subprocess.PIPE)
    tidy_proc = subprocess.Popen(
        f"{CLANG_TIDY_DIFF} -p1 -j 8 -path {compile_commands_path}".split(),
        stdin=diff_proc.stdout,
        stdout=subprocess.PIPE,
        # BUG FIX: stderr was not piped before, so communicate() always
        # returned err_output=None and the error branch below was dead code.
        stderr=subprocess.PIPE,
    )
    diff_proc.stdout.close()  # Let diff_proc receive SIGPIPE if tidy_proc exits early.
    output, err_output = tidy_proc.communicate()  # Get the output from the second process

    if tidy_proc.returncode != 0:
        print("Error occurred in clang-tidy-diff command:")
        if output:
            print(output.decode().strip())
        if err_output:
            print(err_output.decode().strip(), file=sys.stderr)
        sys.exit(1)
def main():
    """Entry point: validate the tooling, parse args, then lint changed files."""
    check_clang_tidy()

    arg_parser = argparse.ArgumentParser(description="Run clang-tidy on specified files.")
    arg_parser.add_argument("--compile_commands_path", type=str, required=True, help="Path to compile_commands.json.")
    parsed = arg_parser.parse_args()

    run_clang_tidy_on_files(parsed.compile_commands_path)
# Run only when executed directly (not when imported as a module).
if __name__ == "__main__":
    main()