diff --git a/src/storage/v3/shard.cpp b/src/storage/v3/shard.cpp index 305ef8650..86529ad5a 100644 --- a/src/storage/v3/shard.cpp +++ b/src/storage/v3/shard.cpp @@ -1,4 +1,4 @@ -// Copyright 2022 Memgraph Ltd. +// Copyright 2023 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source @@ -19,6 +19,7 @@ #include #include +#include #include #include "io/network/endpoint.hpp" @@ -1055,7 +1056,7 @@ std::optional Shard::ShouldSplit() const noexcept { return std::nullopt; } -void CollectDeltas(std::set &collected_transactions_start_id, Delta *delta) const { +void Shard::ScanDeltas(std::set &collected_transactions_start_id, Delta *delta) const { while (delta != nullptr) { collected_transactions_start_id.insert(delta->command_id); delta = delta->next; @@ -1069,59 +1070,72 @@ VertexContainer Shard::CollectVertices(std::set &collected_transaction for (; split_key_it != vertices_.end(); split_key_it++) { // Go through deltas and pick up transactions start_id - CollectDeltas(collected_transactions_start_id, split_key_it->second.delta); + ScanDeltas(collected_transactions_start_id, split_key_it->second.delta); splitted_data.insert(vertices_.extract(split_key_it->first)); } return splitted_data; } std::optional Shard::CollectEdges(std::set &collected_transactions_start_id, - const VertexContainer &split_vertices) const { + const VertexContainer &split_vertices, const PrimaryKey &split_key) { if (!config_.items.properties_on_edges) { return std::nullopt; } EdgeContainer splitted_edges; - // TODO This copies edges without removing the unecessary ones!! + const auto split_edges_from_vertex = [&](const auto &edges_ref) { + // This is safe since if properties_on_edges is true, the this must be a + // ptr + for (const auto &edge_ref : edges_ref) { + auto *edge = std::get<2>(edge_ref).ptr; + const auto &other_vtx = std::get<1>(edge_ref); + ScanDeltas(collected_transactions_start_id, edge->delta); + // Check if src and dest edge are both on splitted shard + // so we know if we should remove orphan edge + if (other_vtx.primary_key >= split_key) { + // Remove edge from shard + splitted_edges.insert(edges_.extract(edge->gid)); + } else { + splitted_edges.insert({edge->gid, Edge{edge->gid, edge->delta}}); + } + } + }; + for (const auto &vertex : split_vertices) { - for (const auto &in_edge : vertex.second.in_edges) { - // This is safe since if properties_on_edges is true, the this must be a - // ptr - auto *edge = std::get<2>(in_edge).ptr; - CollectDeltas(collected_transactions_start_id, edge->delta); - - splitted_edges.insert({edge->gid, Edge{edge->gid, edge->delta}}); - } - for (const auto &in_edge : vertex.second.out_edges) { - auto *edge = std::get<2>(in_edge).ptr; - CollectDeltas(collected_transactions_start_id, edge->delta); - - splitted_edges.insert({edge->gid, Edge{edge->gid, edge->delta}}); - } + split_edges_from_vertex(vertex.second.in_edges); + split_edges_from_vertex(vertex.second.out_edges); } return splitted_edges; } -std::list Shard::CollectTransactions(const std::set &collected_transactions_start_id) const { - std::list transactions; - for (const auto commit_start : collected_transactions_start_id) { - transactions.push_back(*start_logical_id_to_transaction_[commit_start]); - } +std::map> Shard::CollectTransactions( + const std::set &collected_transactions_start_id) { + std::map> transactions; + // for (const auto commit_start : collected_transactions_start_id) { + // transactions.insert( + // {commit_start, std::make_unique(*start_logical_id_to_transaction_.at(commit_start))}); + // } return transactions; } SplitData Shard::PerformSplit(const PrimaryKey &split_key) { SplitData data; std::set collected_transactions_start_id; - // Split Vertices data.vertices = CollectVertices(collected_transactions_start_id, split_key); - // Resolve the deltas that were left on the shard, and are not referenced by - // neither of vertices - data.edges = CollectEdges(collected_transactions_start_id, data.vertices); - data.indices_info = {indices_.label_index.ListIndices(), indices_.label_property_index.ListIndices()}; - // TODO Iterate over vertices and edges to replace their deltas with new ones tha are copied over - // use uuid + data.edges = CollectEdges(collected_transactions_start_id, data.vertices, split_key); + // TODO indices wont work since timestamp cannot be replicated + // data.indices_info = {indices_.label_index.ListIndices(), indices_.label_property_index.ListIndices()}; - data.transactions = CollectTransactions(collected_transactions_start_id); + // data.transactions = CollectTransactions(collected_transactions_start_id); + + // Update delta addresses with the new addresses + // for (auto &vertex : data.vertices) { + // AdjustSplittedDataDeltas(vertex.second, data.transactions); + // } + // if (data.edges) { + // for (auto &edge : data.edges) { + // AdjustSplittedDataDeltas(edge, data.transactions); + // } + // } return data; } diff --git a/src/storage/v3/shard.hpp b/src/storage/v3/shard.hpp index ec9104ed6..cab92eae9 100644 --- a/src/storage/v3/shard.hpp +++ b/src/storage/v3/shard.hpp @@ -1,4 +1,4 @@ -// Copyright 2022 Memgraph Ltd. +// Copyright 2023 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source @@ -185,7 +185,7 @@ struct SplitData { VertexContainer vertices; std::optional edges; IndicesInfo indices_info; - std::list transactions; + std::map transactions; }; /// Structure used to return information about the storage. @@ -375,14 +375,36 @@ class Shard final { SplitData PerformSplit(const PrimaryKey &split_key); private: - void CollectDeltas(std::set &collected_transactions_start_id, Delta *delta) const; + template + requires utils::SameAsAnyOf + void AdjustSplittedDataDeltas(TObj &delta_holder, const std::map &transactions) { + auto *delta_chain = delta_holder.delta; + Delta *new_delta_chain{nullptr}; + while (delta_chain != nullptr) { + auto &transaction = transactions.at(delta_chain->command_id); + // This is the address of corresponding delta + const auto transaction_delta_it = std::ranges::find_if( + transaction->deltas, [delta_uuid = delta_chain->uuid](const auto &elem) { return elem.uuid == delta_uuid; }); + // Add this delta to the new chain + if (new_delta_chain == nullptr) { + new_delta_chain = &*transaction_delta_it; + } else { + new_delta_chain->next = &*transaction_delta_it; + } + delta_chain = delta_chain->next; + } + delta_holder.delta = new_delta_chain; + } - std::list CollectTransactions(const std::set &collected_transactions_start_id) const; + void ScanDeltas(std::set &collected_transactions_start_id, Delta *delta) const; + + std::map> CollectTransactions( + const std::set &collected_transactions_start_id); VertexContainer CollectVertices(std::set &collected_transactions_start_id, const PrimaryKey &split_key); std::optional CollectEdges(std::set &collected_transactions_start_id, - const VertexContainer &split_vertices) const; + const VertexContainer &split_vertices, const PrimaryKey &split_key); Transaction &GetTransaction(coordinator::Hlc start_timestamp, IsolationLevel isolation_level); @@ -391,7 +413,7 @@ class Shard final { // Main object storage NameIdMapper name_id_mapper_; LabelId primary_label_; - // The shard's range is [min, max) + // The shard's range is [min, max> PrimaryKey min_primary_key_; std::optional max_primary_key_; VertexContainer vertices_; diff --git a/src/storage/v3/shard_worker.hpp b/src/storage/v3/shard_worker.hpp index dcdc6ee13..e3d57964f 100644 --- a/src/storage/v3/shard_worker.hpp +++ b/src/storage/v3/shard_worker.hpp @@ -1,4 +1,4 @@ -// Copyright 2022 Memgraph Ltd. +// Copyright 2023 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source @@ -174,12 +174,12 @@ class ShardWorker { Time next_for_uuid = rsm.Cron(); // Check if shard should split - if (const auto split_info = rsm.ShouldSplit(); split_info) { - // Request split from coordinator - // split_point => middle pk - // shard_id => uuid - // shard_version => - } + // if (const auto split_info = rsm.ShouldSplit(); split_info) { + // Request split from coordinator + // split_point => middle pk + // shard_id => uuid + // shard_version => + // } cron_schedule_.pop(); cron_schedule_.push(std::make_pair(next_for_uuid, uuid)); diff --git a/src/storage/v3/transaction.hpp b/src/storage/v3/transaction.hpp index 229e071b7..e0ed59290 100644 --- a/src/storage/v3/transaction.hpp +++ b/src/storage/v3/transaction.hpp @@ -1,4 +1,4 @@ -// Copyright 2022 Memgraph Ltd. +// Copyright 2023 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source @@ -31,6 +31,16 @@ struct CommitInfo { }; struct Transaction { + Transaction(coordinator::Hlc start_timestamp, CommitInfo commit_info, uint64_t command_id, + const std::list &deltas, bool must_abort, bool is_aborted, IsolationLevel isolation_level) + : start_timestamp{start_timestamp}, + commit_info{std::make_unique(commit_info)}, + command_id(command_id), + deltas(CopyDeltas(deltas)), + must_abort(must_abort), + is_aborted(is_aborted), + isolation_level(isolation_level){}; + Transaction(coordinator::Hlc start_timestamp, IsolationLevel isolation_level) : start_timestamp(start_timestamp), commit_info(std::make_unique(CommitInfo{false, {start_timestamp}})), @@ -54,6 +64,12 @@ struct Transaction { ~Transaction() {} + std::list CopyDeltas(const std::list &deltas) const { return std::list{}; } + + Transaction Clone() const { + return {start_timestamp, *commit_info, command_id, deltas, must_abort, is_aborted, isolation_level}; + } + coordinator::Hlc start_timestamp; std::unique_ptr commit_info; uint64_t command_id;