From 954df64d1d7ee9b7f233c4373497bc8e19dd7f9a Mon Sep 17 00:00:00 2001 From: jbajic Date: Fri, 3 Feb 2023 13:42:38 +0100 Subject: [PATCH] Add detla pruning --- src/storage/v3/splitter.cpp | 53 ++++++++++++++++++++++++++++--------- 1 file changed, 41 insertions(+), 12 deletions(-) diff --git a/src/storage/v3/splitter.cpp b/src/storage/v3/splitter.cpp index d54f8d1a1..3e387fe4d 100644 --- a/src/storage/v3/splitter.cpp +++ b/src/storage/v3/splitter.cpp @@ -11,6 +11,7 @@ #include "storage/v3/splitter.hpp" +#include #include #include #include @@ -139,9 +140,45 @@ std::map> Splitter::CollectTransactions( return transactions; } +void PruneDeltas(std::map> &cloned_transactions, const PrimaryKey &split_key) { + // Remove delta chains wh + auto cloned_transaction_it = cloned_transactions.begin(); + while (cloned_transaction_it != cloned_transactions.end()) { + auto cloned_delta_it = cloned_transaction_it->second->deltas.begin(); + + if (const auto prev = cloned_delta_it->prev.Get(); + prev.type == PreviousPtr::Type::VERTEX && prev.vertex->first < split_key) { + // We can remove this delta chain + auto *current_next_delta = cloned_delta_it->next; + cloned_transaction_it->second->deltas.remove_if( + [cloned_delta_it](const auto &delta) { return delta.id == cloned_delta_it->id; }); + + while (current_next_delta != nullptr) { + auto *next_delta = current_next_delta->next; + // Find next delta transaction delta list + auto current_transaction_it = std::ranges::find_if( + cloned_transactions, [&start_or_commit_timestamp = cloned_delta_it->commit_info->start_or_commit_timestamp]( + const auto &transaction) { + return transaction.second->start_timestamp == start_or_commit_timestamp || + transaction.second->commit_info->start_or_commit_timestamp == start_or_commit_timestamp; + }); + // Remove it + current_transaction_it->second->deltas.remove_if( + [current_next_delta](const auto &delta) { return delta.id == current_next_delta->id; }); + + current_next_delta = next_delta; + } + } + + // while(cloned_delta_it != ) + } +} + void Splitter::AdjustClonedTransactions(std::map> &cloned_transactions, VertexContainer &cloned_vertices, EdgeContainer &cloned_edges, const PrimaryKey &split_key) { + // Prune deltas whose delta chain points to vertex/edge that should not belong on that shard + PruneDeltas(cloned_transactions, split_key); for (auto &[commit_start, cloned_transaction] : cloned_transactions) { AdjustClonedTransaction(*cloned_transaction, *start_logical_id_to_transaction_[commit_start], cloned_transactions, cloned_vertices, cloned_edges, split_key); @@ -152,10 +189,6 @@ inline bool IsDeltaHeadOfChain(const PreviousPtr::Type &delta_type) { return delta_type == PreviousPtr::Type::VERTEX || delta_type == PreviousPtr::Type::EDGE; } -bool IsDelta(const PreviousPtr::Type &delta_type) { - return delta_type == PreviousPtr::Type::VERTEX || delta_type == PreviousPtr::Type::EDGE; -} - bool DoesPrevPtrPointsToSplittedData(const PreviousPtr::Pointer &prev_ptr, const PrimaryKey &split_key) { return prev_ptr.type == PreviousPtr::Type::VERTEX && prev_ptr.vertex->first < split_key; } @@ -203,17 +236,14 @@ void Splitter::AdjustClonedTransaction(Transaction &cloned_transaction, const Tr // Find cloned delta in delta list of cloned transaction auto found_cloned_delta_it = std::ranges::find_if(cloned_transaction_it->second->deltas, - [delta = ptr.delta](const auto &elem) { return elem.uuid == delta->uuid; }); + [delta = ptr.delta](const auto &elem) { return elem.id == delta->id; }); MG_ASSERT(found_cloned_delta_it != cloned_transaction_it->second->deltas.end(), - "Delta with given uuid must exist!"); + "Delta with given id must exist!"); cloned_delta->prev.Set(&*found_cloned_delta_it); break; } case PreviousPtr::Type::VERTEX: { - // What if the vertex is already moved to garbage collection... - // TODO(jbajic) Maybe revisit when we apply Garbage collection with - // new transaction management system auto *cloned_vertex = &*cloned_vertices.find(ptr.vertex->first); cloned_delta->prev.Set(cloned_vertex); break; @@ -247,9 +277,8 @@ void Splitter::AdjustDeltaNext(const Delta &original, Delta &cloned, MG_ASSERT(cloned_transaction_it != cloned_transactions.end(), "Cloned transaction not found"); // Find cloned delta in delta list of cloned transaction - auto found_cloned_delta_it = - std::ranges::find_if(cloned_transaction_it->second->deltas, - [&original](const auto &elem) { return elem.uuid == original.next->uuid; }); + auto found_cloned_delta_it = std::ranges::find_if( + cloned_transaction_it->second->deltas, [&original](const auto &elem) { return elem.id == original.next->id; }); MG_ASSERT(found_cloned_delta_it != cloned_transaction_it->second->deltas.end(), "Delta with given uuid must exist!"); cloned.next = &*found_cloned_delta_it; }