diff --git a/src/storage/v3/CMakeLists.txt b/src/storage/v3/CMakeLists.txt index b3a3d68a9..05f86a5dd 100644 --- a/src/storage/v3/CMakeLists.txt +++ b/src/storage/v3/CMakeLists.txt @@ -18,6 +18,7 @@ set(storage_v3_src_files bindings/typed_value.cpp expr.cpp vertex.cpp + splitter.cpp request_helper.cpp) # ###################### diff --git a/src/storage/v3/shard.cpp b/src/storage/v3/shard.cpp index 932b0986d..7e99beaea 100644 --- a/src/storage/v3/shard.cpp +++ b/src/storage/v3/shard.cpp @@ -332,7 +332,8 @@ Shard::Shard(const LabelId primary_label, const PrimaryKey min_primary_key, vertex_validator_{schema_validator_, primary_label}, indices_{config.items, vertex_validator_}, isolation_level_{config.transaction.isolation_level}, - config_{config} { + config_{config}, + shard_splitter_(vertices_, edges_, start_logical_id_to_transaction_, config_) { CreateSchema(primary_label_, schema); StoreMapping(std::move(id_to_name)); } @@ -1056,158 +1057,7 @@ std::optional<SplitInfo> Shard::ShouldSplit() const noexcept { return std::nullopt; } -void Shard::ScanDeltas(std::set<uint64_t> &collected_transactions_start_id, Delta *delta) const { - while (delta != nullptr) { - collected_transactions_start_id.insert(delta->commit_info->start_or_commit_timestamp.logical_id); - delta = delta->next; - } -} - -VertexContainer Shard::CollectVertices(std::set<uint64_t> &collected_transactions_start_id, - const PrimaryKey &split_key) { - VertexContainer splitted_data; - - auto split_key_it = vertices_.find(split_key); - while (split_key_it != vertices_.end()) { - // Go through deltas and pick up transactions start_id - ScanDeltas(collected_transactions_start_id, split_key_it->second.delta); - auto next_it = std::next(split_key_it); - splitted_data.insert(vertices_.extract(split_key_it->first)); - split_key_it = next_it; - } - return splitted_data; -} - -std::optional<EdgeContainer> Shard::CollectEdges(std::set<uint64_t> &collected_transactions_start_id, - const VertexContainer &split_vertices, 
const PrimaryKey &split_key) { - if (!config_.items.properties_on_edges) { - return std::nullopt; - } - EdgeContainer splitted_edges; - const auto split_edges_from_vertex = [&](const auto &edges_ref) { - // This is safe since if properties_on_edges is true, the this must be a - // ptr - for (const auto &edge_ref : edges_ref) { - auto *edge = std::get<2>(edge_ref).ptr; - const auto &other_vtx = std::get<1>(edge_ref); - ScanDeltas(collected_transactions_start_id, edge->delta); - // Check if src and dest edge are both on splitted shard - // so we know if we should remove orphan edge - if (other_vtx.primary_key >= split_key) { - // Remove edge from shard - splitted_edges.insert(edges_.extract(edge->gid)); - } else { - splitted_edges.insert({edge->gid, Edge{edge->gid, edge->delta}}); - } - } - }; - - for (const auto &vertex : split_vertices) { - split_edges_from_vertex(vertex.second.in_edges); - split_edges_from_vertex(vertex.second.out_edges); - } - return splitted_edges; -} - -void Shard::AlignClonedTransaction(Transaction &cloned_transaction, const Transaction &transaction, - std::map<uint64_t, Transaction> &cloned_transactions, - VertexContainer &cloned_vertices, EdgeContainer &cloned_edges) { - // Align next and prev in deltas - // NOTE It is important that the order of delta lists is in same order - auto delta_it = transaction.deltas.begin(); - auto cloned_delta_it = cloned_transaction.deltas.begin(); - while (delta_it != transaction.deltas.end() && cloned_delta_it != cloned_transaction.deltas.end()) { - MG_ASSERT(delta_it->uuid == cloned_delta_it->uuid, "The order of deltas is not correct"); - // Find appropriate prev and delta->next for cloned deltas - // auto *prev = &delta_it->prev; - // auto *cloned_prev = &cloned_delta_it->prev; - - const auto *delta = &*delta_it; - auto *cloned_delta = &*cloned_delta_it; - while (delta != nullptr) { - // Align delta, while ignoring deltas whose transactions have commited, - // or aborted - if 
(cloned_transactions.contains(delta->commit_info->start_or_commit_timestamp.logical_id)) { - cloned_delta->next = &*std::ranges::find_if( - cloned_transactions.at(delta->commit_info->start_or_commit_timestamp.logical_id).deltas, - [delta](const auto &elem) { return elem.uuid == delta->uuid; }); - } else { - delta = delta->next; - continue; - } - // Align prev ptr - auto ptr = delta->prev.Get(); - switch (ptr.type) { - case PreviousPtr::Type::NULLPTR: { - // noop - break; - } - case PreviousPtr::Type::DELTA: { - cloned_delta->prev.Set(ptr.delta); - break; - } - case PreviousPtr::Type::VERTEX: { - // What if the vertex is already moved to garbage collection... - // Make test when you have deleted vertex - auto *cloned_vertex = &*cloned_vertices.find(ptr.vertex->first); - cloned_delta->prev.Set(cloned_vertex); - break; - } - case PreviousPtr::Type::EDGE: { - // TODO Case when there are no properties on edge is not handled - auto *cloned_edge = &*cloned_edges.find(ptr.edge->gid); - cloned_delta->prev.Set(&cloned_edge->second); - break; - } - }; - - cloned_delta = cloned_delta->next; - delta = delta->next; - } - - ++delta_it; - ++cloned_delta_it; - } - MG_ASSERT(delta_it == transaction.deltas.end() && cloned_delta_it == cloned_transaction.deltas.end(), - "Both iterators must be exhausted!"); -} - -void Shard::AlignClonedTransactions(std::map<uint64_t, Transaction> &cloned_transactions, - VertexContainer &cloned_vertices, EdgeContainer &cloned_edges) { - for (auto &[commit_start, cloned_transaction] : cloned_transactions) { - AlignClonedTransaction(cloned_transaction, *start_logical_id_to_transaction_[commit_start], cloned_transactions, - cloned_vertices, cloned_edges); - } -} - -std::map<uint64_t, Transaction> Shard::CollectTransactions(const std::set<uint64_t> &collected_transactions_start_id, - VertexContainer &cloned_vertices, - EdgeContainer &cloned_edges) { - std::map<uint64_t, Transaction> transactions; - for (const auto commit_start : 
collected_transactions_start_id) { - // If it does not contain then the transaction has commited, and we ignore it - if (start_logical_id_to_transaction_.contains(commit_start)) { - transactions.insert({commit_start, start_logical_id_to_transaction_[commit_start]->Clone()}); - } - } - // It is necessary to clone all the transactions first so we have new addresses - // for deltas, before doing alignment of deltas and prev_ptr - AlignClonedTransactions(transactions, cloned_vertices, cloned_edges); - return transactions; -} - -SplitData Shard::PerformSplit(const PrimaryKey &split_key) { - SplitData data; - std::set<uint64_t> collected_transactions_start_id; - data.vertices = CollectVertices(collected_transactions_start_id, split_key); - data.edges = CollectEdges(collected_transactions_start_id, data.vertices, split_key); - data.transactions = CollectTransactions(collected_transactions_start_id, data.vertices, *data.edges); - - // TODO indices wont work since timestamp cannot be replicated - // data.indices_info = {indices_.label_index.ListIndices(), indices_.label_property_index.ListIndices()}; - - return data; -} +SplitData Shard::PerformSplit(const PrimaryKey &split_key) { return shard_splitter_.SplitShard(split_key); } bool Shard::IsVertexBelongToShard(const VertexId &vertex_id) const { return vertex_id.primary_label == primary_label_ && vertex_id.primary_key >= min_primary_key_ && diff --git a/src/storage/v3/shard.hpp b/src/storage/v3/shard.hpp index 3079f89ce..5b18eeda2 100644 --- a/src/storage/v3/shard.hpp +++ b/src/storage/v3/shard.hpp @@ -37,6 +37,7 @@ #include "storage/v3/result.hpp" #include "storage/v3/schema_validator.hpp" #include "storage/v3/schemas.hpp" +#include "storage/v3/splitter.hpp" #include "storage/v3/transaction.hpp" #include "storage/v3/vertex.hpp" #include "storage/v3/vertex_accessor.hpp" @@ -179,15 +180,6 @@ struct SplitInfo { PrimaryKey split_point; }; -// If edge properties-on-edges is false then we don't need to send edges but -// only 
vertices, since they will contain those edges -struct SplitData { - VertexContainer vertices; - std::optional<EdgeContainer> edges; - IndicesInfo indices_info; - std::map<uint64_t, Transaction> transactions; -}; - /// Structure used to return information about the storage. struct StorageInfo { uint64_t vertex_count; @@ -379,44 +371,6 @@ class Shard final { SplitData PerformSplit(const PrimaryKey &split_key); private: - template <typename TObj> - requires utils::SameAsAnyOf<TObj, Edge, VertexData> - void AdjustSplittedDataDeltas(TObj &delta_holder, const std::map<int64_t, Transaction> &transactions) { - auto *delta_chain = delta_holder.delta; - Delta *new_delta_chain{nullptr}; - while (delta_chain != nullptr) { - auto &transaction = transactions.at(delta_chain->command_id); - // This is the address of corresponding delta - const auto transaction_delta_it = std::ranges::find_if( - transaction->deltas, [delta_uuid = delta_chain->uuid](const auto &elem) { return elem.uuid == delta_uuid; }); - // Add this delta to the new chain - if (new_delta_chain == nullptr) { - new_delta_chain = &*transaction_delta_it; - } else { - new_delta_chain->next = &*transaction_delta_it; - } - delta_chain = delta_chain->next; - } - delta_holder.delta = new_delta_chain; - } - - void ScanDeltas(std::set<uint64_t> &collected_transactions_start_id, Delta *delta) const; - - void AlignClonedTransaction(Transaction &cloned_transaction, const Transaction &transaction, - std::map<uint64_t, Transaction> &cloned_transactions, VertexContainer &cloned_vertices, - EdgeContainer &cloned_edges); - - void AlignClonedTransactions(std::map<uint64_t, Transaction> &cloned_transactions, VertexContainer &cloned_vertices, - EdgeContainer &cloned_edges); - - std::map<uint64_t, Transaction> CollectTransactions(const std::set<uint64_t> &collected_transactions_start_id, - VertexContainer &cloned_vertices, EdgeContainer &cloned_edges); - - VertexContainer CollectVertices(std::set<uint64_t> 
&collected_transactions_start_id, const PrimaryKey &split_key); - - std::optional<EdgeContainer> CollectEdges(std::set<uint64_t> &collected_transactions_start_id, - const VertexContainer &split_vertices, const PrimaryKey &split_key); - Transaction &GetTransaction(coordinator::Hlc start_timestamp, IsolationLevel isolation_level); uint64_t CommitTimestamp(std::optional<uint64_t> desired_commit_timestamp = {}); @@ -456,6 +410,7 @@ class Shard final { // Holds all of the (in progress, committed and aborted) transactions that are read or write to this shard, but // haven't been cleaned up yet std::map<uint64_t, std::unique_ptr<Transaction>> start_logical_id_to_transaction_{}; + Splitter shard_splitter_; bool has_any_transaction_aborted_since_last_gc{false}; }; diff --git a/src/storage/v3/splitter.cpp b/src/storage/v3/splitter.cpp new file mode 100644 index 000000000..8969bfa08 --- /dev/null +++ b/src/storage/v3/splitter.cpp @@ -0,0 +1,183 @@ +// Copyright 2023 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
+
+#include "storage/v3/splitter.hpp"
+
+#include <algorithm>
+#include <cstdint>
+#include <map>
+#include <memory>
+#include <optional>
+#include <set>
+
+#include "storage/v3/config.hpp"
+#include "storage/v3/key_store.hpp"
+#include "storage/v3/transaction.hpp"
+
+namespace memgraph::storage::v3 {
+
+/// Binds the splitter to the containers it operates on.
+///
+/// Only references are stored, so every argument must outlive the Splitter.
+///
+/// @param vertices vertex container that split vertices are extracted from
+/// @param edges edge container that split edges are extracted from
+/// @param start_logical_id_to_transaction transactions keyed by their start logical id
+/// @param config configuration; `items.properties_on_edges` decides whether edges are collected
+Splitter::Splitter(VertexContainer &vertices, EdgeContainer &edges,
+                   std::map<uint64_t, std::unique_ptr<Transaction>> &start_logical_id_to_transaction, Config &config)
+    : vertices_(vertices),
+      edges_(edges),
+      start_logical_id_to_transaction_(start_logical_id_to_transaction),
+      config_(config) {}
+
+/// Carves everything at or after `split_key` out of the bound containers.
+///
+/// Vertices are collected first, then the edges attached to them, and finally the transactions referenced by
+/// the delta chains of both are cloned and re-aligned.
+///
+/// @param split_key first primary key that belongs to the split-off part
+/// @return the extracted vertices, the edges (std::nullopt when properties on edges are disabled) and the
+///         cloned transactions
+SplitData Splitter::SplitShard(const PrimaryKey &split_key) {
+  SplitData data;
+  std::set<uint64_t> collected_transactions_start_id;
+  data.vertices = CollectVertices(collected_transactions_start_id, split_key);
+  data.edges = CollectEdges(collected_transactions_start_id, data.vertices, split_key);
+
+  // CollectEdges() returns std::nullopt when properties on edges are disabled. Dereferencing an empty optional
+  // is undefined behaviour, so the alignment step gets an empty container in that case.
+  EdgeContainer no_edges;
+  data.transactions = CollectTransactions(collected_transactions_start_id, data.vertices,
+                                          data.edges.has_value() ? *data.edges : no_edges);
+  // TODO Indices
+
+  return data;
+}
+
+/// Walks a delta chain through `next` and records the logical id stored in each delta's commit info.
+///
+/// @param collected_transactions_start_id output set the ids are inserted into
+/// @param delta head of the chain, may be nullptr
+void Splitter::ScanDeltas(std::set<uint64_t> &collected_transactions_start_id, Delta *delta) {
+  for (const auto *current = delta; current != nullptr; current = current->next) {
+    collected_transactions_start_id.insert(current->commit_info->start_or_commit_timestamp.logical_id);
+  }
+}
+
+/// Moves every vertex whose key is not less than `split_key` out of `vertices_`.
+///
+/// @param collected_transactions_start_id receives the ids found in the delta chains of the moved vertices
+/// @param split_key first primary key that belongs to the split-off part
+/// @return container holding the extracted vertices
+VertexContainer Splitter::CollectVertices(std::set<uint64_t> &collected_transactions_start_id,
+                                          const PrimaryKey &split_key) {
+  VertexContainer splitted_data;
+
+  // lower_bound() instead of find(): the split key is a boundary (CollectEdges compares with >=), so the split
+  // must still start at the first greater key when no vertex is stored under exactly `split_key`.
+  auto vertex_it = vertices_.lower_bound(split_key);
+  while (vertex_it != vertices_.end()) {
+    // Go through deltas and pick up transactions start_id
+    ScanDeltas(collected_transactions_start_id, vertex_it->second.delta);
+    // Extracting invalidates only the extracted iterator, so remember the successor first.
+    const auto next_it = std::next(vertex_it);
+    splitted_data.insert(vertices_.extract(vertex_it));
+    vertex_it = next_it;
+  }
+  return splitted_data;
+}
+
+/// Collects the edges attached to the split vertices.
+///
+/// An edge whose other endpoint is also at or after `split_key` is extracted from `edges_`; an edge that
+/// crosses the split boundary stays in `edges_` and a new Edge with the same gid and delta pointer is inserted
+/// into the result instead.
+///
+/// @param collected_transactions_start_id receives the ids found in the delta chains of the visited edges
+/// @param split_vertices vertices already extracted by CollectVertices()
+/// @param split_key first primary key that belongs to the split-off part
+/// @return the collected edges, or std::nullopt when properties on edges are disabled
+std::optional<EdgeContainer> Splitter::CollectEdges(std::set<uint64_t> &collected_transactions_start_id,
+                                                    const VertexContainer &split_vertices,
+                                                    const PrimaryKey &split_key) {
+  if (!config_.items.properties_on_edges) {
+    return std::nullopt;
+  }
+  EdgeContainer splitted_edges;
+  const auto split_edges_from_vertex = [&](const auto &edges_ref) {
+    // This is safe: if properties_on_edges is true, the edge reference must be a ptr
+    for (const auto &edge_ref : edges_ref) {
+      auto *edge = std::get<2>(edge_ref).ptr;
+      const auto &other_vtx = std::get<1>(edge_ref);
+      ScanDeltas(collected_transactions_start_id, edge->delta);
+      // Check if both the src and dest vertex are on the splitted shard,
+      // so we know if we should remove the edge from this shard
+      if (other_vtx.primary_key >= split_key) {
+        // Remove edge from shard. An edge reached a second time (from its other endpoint) yields an empty node
+        // handle here, and inserting an empty handle is a no-op.
+        splitted_edges.insert(edges_.extract(edge->gid));
+      } else {
+        splitted_edges.insert({edge->gid, Edge{edge->gid, edge->delta}});
+      }
+    }
+  };
+
+  for (const auto &vertex : split_vertices) {
+    split_edges_from_vertex(vertex.second.in_edges);
+    split_edges_from_vertex(vertex.second.out_edges);
+  }
+  return splitted_edges;
+}
+
+/// Clones every collected transaction that is still present in `start_logical_id_to_transaction_` and aligns
+/// the delta pointers of the clones.
+///
+/// @param collected_transactions_start_id ids gathered by ScanDeltas()
+/// @param cloned_vertices vertices of the split-off part
+/// @param cloned_edges edges of the split-off part
+/// @return the cloned transactions keyed by the collected id
+std::map<uint64_t, Transaction> Splitter::CollectTransactions(const std::set<uint64_t> &collected_transactions_start_id,
+                                                              VertexContainer &cloned_vertices,
+                                                              EdgeContainer &cloned_edges) {
+  std::map<uint64_t, Transaction> transactions;
+  for (const auto commit_start : collected_transactions_start_id) {
+    // If the id is not present then the transaction has committed, and we ignore it
+    if (const auto transaction_it = start_logical_id_to_transaction_.find(commit_start);
+        transaction_it != start_logical_id_to_transaction_.end()) {
+      transactions.emplace(commit_start, transaction_it->second->Clone());
+    }
+  }
+  // It is necessary to clone all the transactions first so we have new addresses
+  // for deltas, before doing alignment of deltas and prev_ptr
+  AlignClonedTransactions(transactions, cloned_vertices, cloned_edges);
+  return transactions;
+}
+
+/// Re-points `next` and `prev` of the deltas in `cloned_transaction`, using `transaction` as the template.
+///
+/// Both delta lists are walked in lock step and must hold deltas with matching uuids in the same order.
+///
+/// @param cloned_transaction clone whose deltas are rewritten
+/// @param transaction original transaction the clone was made from
+/// @param cloned_transactions all clones, used to look up the clone of a delta owned by another transaction
+/// @param cloned_vertices vertices of the split-off part, targets for VERTEX prev pointers
+/// @param cloned_edges edges of the split-off part, targets for EDGE prev pointers
+void Splitter::AlignClonedTransaction(Transaction &cloned_transaction, const Transaction &transaction,
+                                      std::map<uint64_t, Transaction> &cloned_transactions,
+                                      VertexContainer &cloned_vertices, EdgeContainer &cloned_edges) {
+  // Align next and prev in deltas
+  // NOTE It is important that the order of delta lists is in same order
+  auto delta_it = transaction.deltas.begin();
+  auto cloned_delta_it = cloned_transaction.deltas.begin();
+  while (delta_it != transaction.deltas.end() && cloned_delta_it != cloned_transaction.deltas.end()) {
+    MG_ASSERT(delta_it->uuid == cloned_delta_it->uuid, "The order of deltas is not correct");
+
+    const auto *delta = &*delta_it;
+    auto *cloned_delta = &*cloned_delta_it;
+    while (delta != nullptr) {
+      // Align delta, while ignoring deltas whose transactions have committed,
+      // or aborted
+      const auto owner_it = cloned_transactions.find(delta->commit_info->start_or_commit_timestamp.logical_id);
+      if (owner_it == cloned_transactions.end()) {
+        delta = delta->next;
+        continue;
+      }
+      auto &owner_deltas = owner_it->second.deltas;
+      const auto cloned_next_it =
+          std::ranges::find_if(owner_deltas, [delta](const auto &elem) { return elem.uuid == delta->uuid; });
+      MG_ASSERT(cloned_next_it != owner_deltas.end(), "Cloned transaction does not contain the expected delta");
+      // NOTE(review): on the first pass `delta` and `cloned_delta` carry the same uuid, so this appears to point
+      // `cloned_delta->next` at itself until a later pass overwrites it - confirm this is intended.
+      cloned_delta->next = &*cloned_next_it;
+
+      // Align prev ptr
+      auto ptr = delta->prev.Get();
+      switch (ptr.type) {
+        case PreviousPtr::Type::NULLPTR: {
+          // noop
+          break;
+        }
+        case PreviousPtr::Type::DELTA: {
+          // NOTE(review): this stores the address of the original delta, not of its clone - confirm.
+          cloned_delta->prev.Set(ptr.delta);
+          break;
+        }
+        case PreviousPtr::Type::VERTEX: {
+          // What if the vertex is already moved to garbage collection...
+          // Make test when you have deleted vertex
+          // The vertex may not be part of the split (not found); dereferencing end() would be undefined
+          // behaviour, so prev is left untouched in that case.
+          if (const auto cloned_vertex_it = cloned_vertices.find(ptr.vertex->first);
+              cloned_vertex_it != cloned_vertices.end()) {
+            cloned_delta->prev.Set(&*cloned_vertex_it);
+          }
+          break;
+        }
+        case PreviousPtr::Type::EDGE: {
+          // TODO Case when there are no properties on edge is not handled
+          // Same guard as for vertices: never dereference end().
+          if (const auto cloned_edge_it = cloned_edges.find(ptr.edge->gid); cloned_edge_it != cloned_edges.end()) {
+            cloned_delta->prev.Set(&cloned_edge_it->second);
+          }
+          break;
+        }
+      }
+
+      cloned_delta = cloned_delta->next;
+      delta = delta->next;
+    }
+
+    ++delta_it;
+    ++cloned_delta_it;
+  }
+  MG_ASSERT(delta_it == transaction.deltas.end() && cloned_delta_it == cloned_transaction.deltas.end(),
+            "Both iterators must be exhausted!");
+}
+
+/// Aligns every cloned transaction against the original stored under the same key.
+///
+/// @param cloned_transactions clones produced by CollectTransactions()
+/// @param cloned_vertices vertices of the split-off part
+/// @param cloned_edges edges of the split-off part
+void Splitter::AlignClonedTransactions(std::map<uint64_t, Transaction> &cloned_transactions,
+                                       VertexContainer &cloned_vertices, EdgeContainer &cloned_edges) {
+  for (auto &[commit_start, cloned_transaction] : cloned_transactions) {
+    // at() instead of operator[]: a lookup must never insert an empty entry into the shard's transaction map.
+    AlignClonedTransaction(cloned_transaction, *start_logical_id_to_transaction_.at(commit_start),
+                           cloned_transactions, cloned_vertices, cloned_edges);
+  }
+}
+
+}  // namespace memgraph::storage::v3
diff --git a/src/storage/v3/splitter.hpp b/src/storage/v3/splitter.hpp
new file mode 100644
index 000000000..bb5909ef1
--- /dev/null
+++ b/src/storage/v3/splitter.hpp
@@ -0,0 +1,70 @@
+// Copyright 2023 Memgraph Ltd.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
+// License, and you may not use this file except in compliance with the Business Source License.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+#pragma once
+
+#include <cstdint>
+#include <map>
+#include <memory>
+#include <optional>
+#include <set>
+
+#include "storage/v3/config.hpp"
+#include "storage/v3/delta.hpp"
+#include "storage/v3/edge.hpp"
+#include "storage/v3/transaction.hpp"
+#include "storage/v3/vertex.hpp"
+
+namespace memgraph::storage::v3 {
+
+/// Result of Splitter::SplitShard().
+///
+/// If properties-on-edges is false then we don't need to send edges but
+/// only vertices, since they will contain those edges; `edges` is std::nullopt in that case.
+struct SplitData {
+  VertexContainer vertices;
+  std::optional<EdgeContainer> edges;
+  /// Cloned transactions, keyed by the logical id they are stored under in the shard.
+  std::map<uint64_t, Transaction> transactions;
+};
+
+/// Extracts the part of a shard that starts at a given primary key.
+///
+/// The splitter owns nothing: it keeps references to the containers passed to its constructor, so they must
+/// outlive it. It is neither copyable nor movable.
+class Splitter final {
+ public:
+  /// @param vertices vertex container that split vertices are extracted from
+  /// @param edges edge container that split edges are extracted from
+  /// @param start_logical_id_to_transaction transactions keyed by their start logical id
+  /// @param config configuration; `items.properties_on_edges` decides whether edges are collected
+  Splitter(VertexContainer &vertices, EdgeContainer &edges,
+           std::map<uint64_t, std::unique_ptr<Transaction>> &start_logical_id_to_transaction, Config &config);
+
+  Splitter(const Splitter &) = delete;
+  Splitter(Splitter &&) noexcept = delete;
+  Splitter &operator=(const Splitter &) = delete;
+  // Assignment operators return a reference, also when deleted.
+  Splitter &operator=(Splitter &&) noexcept = delete;
+  ~Splitter() = default;
+
+  /// Collects the vertices starting at `split_key`, the edges attached to them and clones of the transactions
+  /// referenced by their delta chains.
+  SplitData SplitShard(const PrimaryKey &split_key);
+
+ private:
+  /// Inserts the logical id of every delta in the chain starting at `delta` into the given set.
+  static void ScanDeltas(std::set<uint64_t> &collected_transactions_start_id, Delta *delta);
+
+  /// Re-points the delta pointers of `cloned_transaction`, using `transaction` as the template.
+  void AlignClonedTransaction(Transaction &cloned_transaction, const Transaction &transaction,
+                              std::map<uint64_t, Transaction> &cloned_transactions, VertexContainer &cloned_vertices,
+                              EdgeContainer &cloned_edges);
+
+  /// Runs AlignClonedTransaction() for every entry of `cloned_transactions`.
+  void AlignClonedTransactions(std::map<uint64_t, Transaction> &cloned_transactions, VertexContainer &cloned_vertices,
+                               EdgeContainer &cloned_edges);
+
+  /// Clones the collected transactions that are still tracked and aligns the clones.
+  std::map<uint64_t, Transaction> CollectTransactions(const std::set<uint64_t> &collected_transactions_start_id,
+                                                      VertexContainer &cloned_vertices, EdgeContainer &cloned_edges);
+
+  /// Extracts the vertices belonging to the split-off part and records the ids found in their delta chains.
+  VertexContainer CollectVertices(std::set<uint64_t> &collected_transactions_start_id, const PrimaryKey &split_key);
+
+  /// Collects the edges attached to `split_vertices`; returns std::nullopt when properties on edges are disabled.
+  std::optional<EdgeContainer> CollectEdges(std::set<uint64_t> &collected_transactions_start_id,
+                                            const VertexContainer &split_vertices, const PrimaryKey &split_key);
+
+  VertexContainer &vertices_;
+  EdgeContainer &edges_;
+  std::map<uint64_t, std::unique_ptr<Transaction>> &start_logical_id_to_transaction_;
+  Config &config_;
+};
+
+}  // namespace memgraph::storage::v3