diff --git a/src/storage/v3/splitter.cpp b/src/storage/v3/splitter.cpp index 0bd69d8c5..1c48907d0 100644 --- a/src/storage/v3/splitter.cpp +++ b/src/storage/v3/splitter.cpp @@ -140,48 +140,45 @@ std::map<uint64_t, std::unique_ptr<Transaction>> Splitter::CollectTransactions( return transactions; } +void EraseDeltaChain(auto &transaction, auto &transactions, auto &delta_head_it) { + auto *current_next_delta = delta_head_it->next; + // We need to keep track of delta_head_it in the delta list of current transaction + delta_head_it = transaction.deltas.erase(delta_head_it); + + while (current_next_delta != nullptr) { + auto *next_delta = current_next_delta->next; + // Find next delta transaction delta list + auto current_transaction_it = std::ranges::find_if( + transactions, [&start_or_commit_timestamp = + current_next_delta->commit_info->start_or_commit_timestamp](const auto &transaction) { + return transaction.second->start_timestamp == start_or_commit_timestamp || + transaction.second->commit_info->start_or_commit_timestamp == start_or_commit_timestamp; + }); + MG_ASSERT(current_transaction_it != transactions.end(), "Error when pruning deltas!"); + // Remove the delta + const auto delta_it = + std::ranges::find_if(current_transaction_it->second->deltas, + [current_next_delta](const auto &elem) { return elem.id == current_next_delta->id; }); + if (delta_it != current_transaction_it->second->deltas.end()) { + // If the next delta is next in transaction list replace current_transaction_it + // with the next one + if (current_transaction_it->second->start_timestamp == transaction.start_timestamp && + current_transaction_it == std::next(current_transaction_it)) { + delta_head_it = current_transaction_it->second->deltas.erase(delta_it); + } else { + current_transaction_it->second->deltas.erase(delta_it); + } + } + + current_next_delta = next_delta; + } +} + void PruneDeltas(Transaction &cloned_transaction, std::map<uint64_t, std::unique_ptr<Transaction>> &cloned_transactions, const PrimaryKey &split_key, EdgeContainer &cloned_edges) { // Remove delta chains that don't point to objects on splitted shard auto cloned_delta_it = cloned_transaction.deltas.begin(); - // Erases the delta chain - const auto erase_delta_chain = [&cloned_transaction, &cloned_transactions](auto &cloned_delta_it) { - auto *current_next_delta = cloned_delta_it->next; - // We need to keep track of cloned_delta_it in the delta list of current transaction - cloned_delta_it = cloned_transaction.deltas.erase(cloned_delta_it); - - while (current_next_delta != nullptr) { - auto *next_delta = current_next_delta->next; - // Find next delta transaction delta list - auto current_transaction_it = std::ranges::find_if( - cloned_transactions, - [&start_or_commit_timestamp = - current_next_delta->commit_info->start_or_commit_timestamp](const auto &transaction) { - return transaction.second->start_timestamp == start_or_commit_timestamp || - transaction.second->commit_info->start_or_commit_timestamp == start_or_commit_timestamp; - }); - MG_ASSERT(current_transaction_it != cloned_transactions.end(), "Error when pruning deltas!"); - // Remove the delta - const auto delta_it = - std::ranges::find_if(current_transaction_it->second->deltas, - [current_next_delta](const auto &elem) { return elem.id == current_next_delta->id; }); - if (delta_it != current_transaction_it->second->deltas.end()) { - // If the next delta is next in transaction list replace current_transaction_it - // with the next one - if (current_transaction_it->second->start_timestamp == cloned_transaction.start_timestamp && - current_transaction_it == std::next(current_transaction_it)) { - // TODO Dont do this if the delta_it is not next in line in transaction list - cloned_delta_it = current_transaction_it->second->deltas.erase(delta_it); - } else { - current_transaction_it->second->deltas.erase(delta_it); - } - } - - current_next_delta = next_delta; - } - }; - while (cloned_delta_it != cloned_transaction.deltas.end()) { const auto prev = cloned_delta_it->prev.Get(); switch (prev.type) { @@ -192,7 +189,7 @@ void PruneDeltas(Transaction &cloned_transaction, std::map<uint64_t, std::unique case PreviousPtr::Type::VERTEX: { if (prev.vertex->first < split_key) { // We can remove this delta chain - erase_delta_chain(cloned_delta_it); + EraseDeltaChain(cloned_transaction, cloned_transactions, cloned_delta_it); } else { ++cloned_delta_it; } @@ -201,7 +198,7 @@ void PruneDeltas(Transaction &cloned_transaction, std::map<uint64_t, std::unique case PreviousPtr::Type::EDGE: { if (const auto edge_gid = prev.edge->gid; !cloned_edges.contains(edge_gid)) { // We can remove this delta chain - erase_delta_chain(cloned_delta_it); + EraseDeltaChain(cloned_transaction, cloned_transactions, cloned_delta_it); } else { ++cloned_delta_it; } @@ -217,41 +214,6 @@ void Splitter::PruneOriginalDeltas(Transaction &transaction, // Remove delta chains that don't point to objects on splitted shard auto delta_it = transaction.deltas.begin(); - const auto erase_delta_chain = [&transaction, &transactions](auto &cloned_delta_it) { - auto *current_next_delta = cloned_delta_it->next; - // We need to keep track of cloned_delta_it in the delta list of current transaction - cloned_delta_it = transaction.deltas.erase(cloned_delta_it); - - while (current_next_delta != nullptr) { - auto *next_delta = current_next_delta->next; - // Find next delta transaction delta list - auto current_transaction_it = std::ranges::find_if( - transactions, [&start_or_commit_timestamp = - current_next_delta->commit_info->start_or_commit_timestamp](const auto &transaction) { - return transaction.second->start_timestamp == start_or_commit_timestamp || - transaction.second->commit_info->start_or_commit_timestamp == start_or_commit_timestamp; - }); - MG_ASSERT(current_transaction_it != transactions.end(), "Error when pruning deltas!"); - // Remove the delta - const auto delta_it = - std::ranges::find_if(current_transaction_it->second->deltas, - [current_next_delta](const auto &elem) { return elem.id == current_next_delta->id; }); - if (delta_it != current_transaction_it->second->deltas.end()) { - // If the next delta is next in transaction list replace current_transaction_it - // with the next one - if (current_transaction_it->second->start_timestamp == transaction.start_timestamp && - current_transaction_it == std::next(current_transaction_it)) { - // TODO Dont do this if the delta_it is not next in line in transaction list - cloned_delta_it = current_transaction_it->second->deltas.erase(delta_it); - } else { - current_transaction_it->second->deltas.erase(delta_it); - } - } - - current_next_delta = next_delta; - } - }; - while (delta_it != transaction.deltas.end()) { const auto prev = delta_it->prev.Get(); switch (prev.type) { @@ -262,7 +224,7 @@ void Splitter::PruneOriginalDeltas(Transaction &transaction, case PreviousPtr::Type::VERTEX: { if (prev.vertex->first >= split_key) { // We can remove this delta chain - erase_delta_chain(delta_it); + EraseDeltaChain(transaction, transactions, delta_it); } else { ++delta_it; } @@ -271,7 +233,7 @@ void Splitter::PruneOriginalDeltas(Transaction &transaction, case PreviousPtr::Type::EDGE: { if (const auto edge_gid = prev.edge->gid; !edges_.contains(edge_gid)) { // We can remove this delta chain - erase_delta_chain(delta_it); + EraseDeltaChain(transaction, transactions, delta_it); } else { ++delta_it; } @@ -336,7 +298,7 @@ void Splitter::AdjustClonedTransaction(Transaction &cloned_transaction, const Tr // Align next ptr and prev ptr AdjustDeltaNextAndPrev(*delta, *cloned_delta, cloned_transactions); - // TODO Next delta might not belong to the cloned transaction and thats + // Next delta might not belong to the cloned transaction and thats // why we skip this delta of the delta chain if (cloned_delta->next != nullptr) { cloned_delta = cloned_delta->next; @@ -366,8 +328,6 @@ void Splitter::AdjustEdgeRef(Delta &cloned_delta, EdgeContainer &cloned_edges) c case Delta::Action::REMOVE_OUT_EDGE: { // Find edge if (config_.items.properties_on_edges) { - // Only case when not finding is when the edge is not on splitted shard - // TODO Do this after prune an move condition into assert if (const auto cloned_edge_it = cloned_edges.find(cloned_delta.vertex_edge.edge.ptr->gid); cloned_edge_it != cloned_edges.end()) { cloned_delta.vertex_edge.edge = EdgeRef{&cloned_edge_it->second};