From 050d5efae73dbdb1537b7318276e6ff25825bb48 Mon Sep 17 00:00:00 2001 From: jbajic <jure.bajic@memgraph.com> Date: Wed, 11 Jan 2023 14:16:41 +0100 Subject: [PATCH] Align deltas --- src/storage/v3/shard.cpp | 50 +++++++++++++++++++++++++++++----- src/storage/v3/shard.hpp | 8 ++++-- src/storage/v3/transaction.hpp | 1 + 3 files changed, 50 insertions(+), 9 deletions(-) diff --git a/src/storage/v3/shard.cpp b/src/storage/v3/shard.cpp index 86529ad5a..4ea89e6aa 100644 --- a/src/storage/v3/shard.cpp +++ b/src/storage/v3/shard.cpp @@ -1107,13 +1107,49 @@ std::optional<EdgeContainer> Shard::CollectEdges(std::set<uint64_t> &collected_t return splitted_edges; } -std::map<uint64_t, std::unique_ptr<Transaction>> Shard::CollectTransactions( - const std::set<uint64_t> &collected_transactions_start_id) { - std::map<uint64_t, std::unique_ptr<Transaction>> transactions; - // for (const auto commit_start : collected_transactions_start_id) { - // transactions.insert( - // {commit_start, std::make_unique<Transaction>(*start_logical_id_to_transaction_.at(commit_start))}); - // } +void Shard::AlignClonedTransaction(Transaction &cloned_transaction, const Transaction &transaction, + std::map<uint64_t, Transaction> &cloned_transactions) { + // Align next and prev in deltas + // NOTE It is important that the order of delta lists is in same order + auto delta_it = transaction.deltas.begin(); + auto cloned_delta_it = cloned_transaction.deltas.begin(); + while (delta_it != transaction.deltas.end() && cloned_delta_it != cloned_transaction.deltas.end()) { + MG_ASSERT(delta_it->uuid == cloned_delta_it->uuid, "The order of deltas is not correct"); + // // We need to set prev and next on cloned_delta + // auto prev = delta_it->prev; + + // Find appropriate prev and delta->next for cloned deltas + auto *next = delta_it->next; + auto *cloned_next = &*cloned_delta_it; + while (next != nullptr) { + // No need to check we can be sure that it exists + cloned_next->next = &*std::ranges::find_if(cloned_transactions.at(next->command_id).deltas, + [next](const auto &delta) { return delta.uuid == next->uuid; }); + cloned_next = cloned_next->next; + next = next->next; + } + + ++delta_it; + ++cloned_delta_it; + } + MG_ASSERT(delta_it == transaction.deltas.end() && cloned_delta_it == cloned_transaction.deltas.end(), + "Both iterators must be exhausted!"); +} + +void Shard::AlignClonedTransactions(std::map<uint64_t, Transaction> &cloned_transactions) { + for (auto &[commit_start, cloned_transaction] : cloned_transactions) { + AlignClonedTransaction(cloned_transaction, *start_logical_id_to_transaction_[commit_start], cloned_transactions); + } +} + +std::map<uint64_t, Transaction> Shard::CollectTransactions(const std::set<uint64_t> &collected_transactions_start_id) { + std::map<uint64_t, Transaction> transactions; + for (const auto commit_start : collected_transactions_start_id) { + transactions.insert({commit_start, start_logical_id_to_transaction_[commit_start]->Clone()}); + } + // It is necessary to clone all the transactions first so we have new addresses + // for deltas, before doing alignment + AlignClonedTransactions(transactions); return transactions; } diff --git a/src/storage/v3/shard.hpp b/src/storage/v3/shard.hpp index cab92eae9..502278564 100644 --- a/src/storage/v3/shard.hpp +++ b/src/storage/v3/shard.hpp @@ -398,8 +398,12 @@ class Shard final { void ScanDeltas(std::set<uint64_t> &collected_transactions_start_id, Delta *delta) const; - std::map<uint64_t, std::unique_ptr<Transaction>> CollectTransactions( - const std::set<uint64_t> &collected_transactions_start_id); + void AlignClonedTransaction(Transaction &cloned_transaction, const Transaction &transaction, + std::map<uint64_t, Transaction> &cloned_transactions); + + void AlignClonedTransactions(std::map<uint64_t, Transaction> &cloned_transactions); + + std::map<uint64_t, Transaction> CollectTransactions(const std::set<uint64_t> &collected_transactions_start_id); VertexContainer CollectVertices(std::set<uint64_t> &collected_transactions_start_id, const PrimaryKey &split_key); diff --git a/src/storage/v3/transaction.hpp b/src/storage/v3/transaction.hpp index 6b208877a..dba65067e 100644 --- a/src/storage/v3/transaction.hpp +++ b/src/storage/v3/transaction.hpp @@ -65,6 +65,7 @@ struct Transaction { ~Transaction() {} std::list<Delta> CopyDeltas(CommitInfo *commit_info) const { + // TODO This does not solve the next and prev deltas that also need to be set std::list<Delta> copied_deltas; for (const auto &delta : deltas) { switch (delta.action) {