Add detla pruning
This commit is contained in:
parent
90bcdc4e2b
commit
954df64d1d
@ -11,6 +11,7 @@
|
|||||||
|
|
||||||
#include "storage/v3/splitter.hpp"
|
#include "storage/v3/splitter.hpp"
|
||||||
|
|
||||||
|
#include <algorithm>
|
||||||
#include <cstdint>
|
#include <cstdint>
|
||||||
#include <map>
|
#include <map>
|
||||||
#include <memory>
|
#include <memory>
|
||||||
@ -139,9 +140,45 @@ std::map<uint64_t, std::unique_ptr<Transaction>> Splitter::CollectTransactions(
|
|||||||
return transactions;
|
return transactions;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void PruneDeltas(std::map<uint64_t, std::unique_ptr<Transaction>> &cloned_transactions, const PrimaryKey &split_key) {
|
||||||
|
// Remove delta chains wh
|
||||||
|
auto cloned_transaction_it = cloned_transactions.begin();
|
||||||
|
while (cloned_transaction_it != cloned_transactions.end()) {
|
||||||
|
auto cloned_delta_it = cloned_transaction_it->second->deltas.begin();
|
||||||
|
|
||||||
|
if (const auto prev = cloned_delta_it->prev.Get();
|
||||||
|
prev.type == PreviousPtr::Type::VERTEX && prev.vertex->first < split_key) {
|
||||||
|
// We can remove this delta chain
|
||||||
|
auto *current_next_delta = cloned_delta_it->next;
|
||||||
|
cloned_transaction_it->second->deltas.remove_if(
|
||||||
|
[cloned_delta_it](const auto &delta) { return delta.id == cloned_delta_it->id; });
|
||||||
|
|
||||||
|
while (current_next_delta != nullptr) {
|
||||||
|
auto *next_delta = current_next_delta->next;
|
||||||
|
// Find next delta transaction delta list
|
||||||
|
auto current_transaction_it = std::ranges::find_if(
|
||||||
|
cloned_transactions, [&start_or_commit_timestamp = cloned_delta_it->commit_info->start_or_commit_timestamp](
|
||||||
|
const auto &transaction) {
|
||||||
|
return transaction.second->start_timestamp == start_or_commit_timestamp ||
|
||||||
|
transaction.second->commit_info->start_or_commit_timestamp == start_or_commit_timestamp;
|
||||||
|
});
|
||||||
|
// Remove it
|
||||||
|
current_transaction_it->second->deltas.remove_if(
|
||||||
|
[current_next_delta](const auto &delta) { return delta.id == current_next_delta->id; });
|
||||||
|
|
||||||
|
current_next_delta = next_delta;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// while(cloned_delta_it != )
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
void Splitter::AdjustClonedTransactions(std::map<uint64_t, std::unique_ptr<Transaction>> &cloned_transactions,
|
void Splitter::AdjustClonedTransactions(std::map<uint64_t, std::unique_ptr<Transaction>> &cloned_transactions,
|
||||||
VertexContainer &cloned_vertices, EdgeContainer &cloned_edges,
|
VertexContainer &cloned_vertices, EdgeContainer &cloned_edges,
|
||||||
const PrimaryKey &split_key) {
|
const PrimaryKey &split_key) {
|
||||||
|
// Prune deltas whose delta chain points to vertex/edge that should not belong on that shard
|
||||||
|
PruneDeltas(cloned_transactions, split_key);
|
||||||
for (auto &[commit_start, cloned_transaction] : cloned_transactions) {
|
for (auto &[commit_start, cloned_transaction] : cloned_transactions) {
|
||||||
AdjustClonedTransaction(*cloned_transaction, *start_logical_id_to_transaction_[commit_start], cloned_transactions,
|
AdjustClonedTransaction(*cloned_transaction, *start_logical_id_to_transaction_[commit_start], cloned_transactions,
|
||||||
cloned_vertices, cloned_edges, split_key);
|
cloned_vertices, cloned_edges, split_key);
|
||||||
@ -152,10 +189,6 @@ inline bool IsDeltaHeadOfChain(const PreviousPtr::Type &delta_type) {
|
|||||||
return delta_type == PreviousPtr::Type::VERTEX || delta_type == PreviousPtr::Type::EDGE;
|
return delta_type == PreviousPtr::Type::VERTEX || delta_type == PreviousPtr::Type::EDGE;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool IsDelta(const PreviousPtr::Type &delta_type) {
|
|
||||||
return delta_type == PreviousPtr::Type::VERTEX || delta_type == PreviousPtr::Type::EDGE;
|
|
||||||
}
|
|
||||||
|
|
||||||
bool DoesPrevPtrPointsToSplittedData(const PreviousPtr::Pointer &prev_ptr, const PrimaryKey &split_key) {
|
bool DoesPrevPtrPointsToSplittedData(const PreviousPtr::Pointer &prev_ptr, const PrimaryKey &split_key) {
|
||||||
return prev_ptr.type == PreviousPtr::Type::VERTEX && prev_ptr.vertex->first < split_key;
|
return prev_ptr.type == PreviousPtr::Type::VERTEX && prev_ptr.vertex->first < split_key;
|
||||||
}
|
}
|
||||||
@ -203,17 +236,14 @@ void Splitter::AdjustClonedTransaction(Transaction &cloned_transaction, const Tr
|
|||||||
// Find cloned delta in delta list of cloned transaction
|
// Find cloned delta in delta list of cloned transaction
|
||||||
auto found_cloned_delta_it =
|
auto found_cloned_delta_it =
|
||||||
std::ranges::find_if(cloned_transaction_it->second->deltas,
|
std::ranges::find_if(cloned_transaction_it->second->deltas,
|
||||||
[delta = ptr.delta](const auto &elem) { return elem.uuid == delta->uuid; });
|
[delta = ptr.delta](const auto &elem) { return elem.id == delta->id; });
|
||||||
MG_ASSERT(found_cloned_delta_it != cloned_transaction_it->second->deltas.end(),
|
MG_ASSERT(found_cloned_delta_it != cloned_transaction_it->second->deltas.end(),
|
||||||
"Delta with given uuid must exist!");
|
"Delta with given id must exist!");
|
||||||
|
|
||||||
cloned_delta->prev.Set(&*found_cloned_delta_it);
|
cloned_delta->prev.Set(&*found_cloned_delta_it);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case PreviousPtr::Type::VERTEX: {
|
case PreviousPtr::Type::VERTEX: {
|
||||||
// What if the vertex is already moved to garbage collection...
|
|
||||||
// TODO(jbajic) Maybe revisit when we apply Garbage collection with
|
|
||||||
// new transaction management system
|
|
||||||
auto *cloned_vertex = &*cloned_vertices.find(ptr.vertex->first);
|
auto *cloned_vertex = &*cloned_vertices.find(ptr.vertex->first);
|
||||||
cloned_delta->prev.Set(cloned_vertex);
|
cloned_delta->prev.Set(cloned_vertex);
|
||||||
break;
|
break;
|
||||||
@ -247,9 +277,8 @@ void Splitter::AdjustDeltaNext(const Delta &original, Delta &cloned,
|
|||||||
|
|
||||||
MG_ASSERT(cloned_transaction_it != cloned_transactions.end(), "Cloned transaction not found");
|
MG_ASSERT(cloned_transaction_it != cloned_transactions.end(), "Cloned transaction not found");
|
||||||
// Find cloned delta in delta list of cloned transaction
|
// Find cloned delta in delta list of cloned transaction
|
||||||
auto found_cloned_delta_it =
|
auto found_cloned_delta_it = std::ranges::find_if(
|
||||||
std::ranges::find_if(cloned_transaction_it->second->deltas,
|
cloned_transaction_it->second->deltas, [&original](const auto &elem) { return elem.id == original.next->id; });
|
||||||
[&original](const auto &elem) { return elem.uuid == original.next->uuid; });
|
|
||||||
MG_ASSERT(found_cloned_delta_it != cloned_transaction_it->second->deltas.end(), "Delta with given uuid must exist!");
|
MG_ASSERT(found_cloned_delta_it != cloned_transaction_it->second->deltas.end(), "Delta with given uuid must exist!");
|
||||||
cloned.next = &*found_cloned_delta_it;
|
cloned.next = &*found_cloned_delta_it;
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user