Extract logic of checking delta skips

This commit is contained in:
jbajic 2023-02-02 16:39:00 +01:00
parent 4619a87e98
commit a95bac65c6
2 changed files with 30 additions and 14 deletions

View File

@ -56,7 +56,7 @@ SplitData Splitter::SplitShard(const PrimaryKey &split_key, const std::optional<
std::set<uint64_t> collected_transactions_;
data.vertices = CollectVertices(data, collected_transactions_, split_key);
data.edges = CollectEdges(collected_transactions_, data.vertices, split_key);
data.transactions = CollectTransactions(collected_transactions_, data.vertices, *data.edges);
data.transactions = CollectTransactions(collected_transactions_, data.vertices, *data.edges, split_key);
return data;
}
@ -121,7 +121,8 @@ std::optional<EdgeContainer> Splitter::CollectEdges(std::set<uint64_t> &collecte
}
std::map<uint64_t, std::unique_ptr<Transaction>> Splitter::CollectTransactions(
const std::set<uint64_t> &collected_transactions_, VertexContainer &cloned_vertices, EdgeContainer &cloned_edges) {
const std::set<uint64_t> &collected_transactions_, VertexContainer &cloned_vertices, EdgeContainer &cloned_edges,
const PrimaryKey &split_key) {
std::map<uint64_t, std::unique_ptr<Transaction>> transactions;
for (const auto &[commit_start, transaction] : start_logical_id_to_transaction_) {
@ -134,36 +135,49 @@ std::map<uint64_t, std::unique_ptr<Transaction>> Splitter::CollectTransactions(
// It is necessary to clone all the transactions first so we have new addresses
// for deltas, before doing alignment of deltas and prev_ptr
AdjustClonedTransactions(transactions, cloned_vertices, cloned_edges);
AdjustClonedTransactions(transactions, cloned_vertices, cloned_edges, split_key);
return transactions;
}
void Splitter::AdjustClonedTransactions(std::map<uint64_t, std::unique_ptr<Transaction>> &cloned_transactions,
VertexContainer &cloned_vertices, EdgeContainer &cloned_edges) {
VertexContainer &cloned_vertices, EdgeContainer &cloned_edges,
const PrimaryKey &split_key) {
for (auto &[commit_start, cloned_transaction] : cloned_transactions) {
AdjustClonedTransaction(*cloned_transaction, *start_logical_id_to_transaction_[commit_start], cloned_transactions,
cloned_vertices, cloned_edges);
cloned_vertices, cloned_edges, split_key);
}
}
inline bool IsDeltaHeadOfChain(const PreviousPtr::Type &delta_type) {
return delta_type == PreviousPtr::Type::VERTEX || delta_type == PreviousPtr::Type::EDGE;
}
bool IsDelta(const PreviousPtr::Type &delta_type) {
return delta_type == PreviousPtr::Type::VERTEX || delta_type == PreviousPtr::Type::EDGE;
}
bool DoesPrevPtrPointsToSplittedData(const PreviousPtr::Pointer &prev_ptr, const PrimaryKey &split_key) {
return prev_ptr.type == PreviousPtr::Type::VERTEX && prev_ptr.vertex->first < split_key;
}
void Splitter::AdjustClonedTransaction(Transaction &cloned_transaction, const Transaction &transaction,
std::map<uint64_t, std::unique_ptr<Transaction>> &cloned_transactions,
VertexContainer &cloned_vertices, EdgeContainer &cloned_edges) {
VertexContainer &cloned_vertices, EdgeContainer &cloned_edges,
const PrimaryKey &split_key) {
// Align next and prev in deltas
// NOTE It is important that the order of delta lists is in same order
auto delta_it = transaction.deltas.begin();
auto cloned_delta_it = cloned_transaction.deltas.begin();
while (delta_it != transaction.deltas.end()) {
// We can safely ignore deltas which are not head of delta chain
if (delta_it->prev.Get().type == PreviousPtr::Type::DELTA ||
delta_it->prev.Get().type == PreviousPtr::Type::NULLPTR) {
// Dont' adjust delta chain that points to irrelevant data vertices/edges
if (const auto delta_prev = delta_it->prev.Get();
!IsDeltaHeadOfChain(delta_prev.type) && !DoesPrevPtrPointsToSplittedData(delta_prev, split_key)) {
++delta_it;
++cloned_delta_it;
continue;
}
// Only start iterating through deltas that are head of delta chain
// => they have prev pointer to vertex/edge
const auto *delta = &*delta_it;
auto *cloned_delta = &*cloned_delta_it;
while (delta->next != nullptr) {

View File

@ -72,16 +72,18 @@ class Splitter final {
std::map<uint64_t, std::unique_ptr<Transaction>> CollectTransactions(
const std::set<uint64_t> &collected_transactions_start_id, VertexContainer &cloned_vertices,
EdgeContainer &cloned_edges);
EdgeContainer &cloned_edges, const PrimaryKey &split_key);
static void ScanDeltas(std::set<uint64_t> &collected_transactions_start_id, Delta *delta);
static void AdjustClonedTransaction(Transaction &cloned_transaction, const Transaction &transaction,
std::map<uint64_t, std::unique_ptr<Transaction>> &cloned_transactions,
VertexContainer &cloned_vertices, EdgeContainer &cloned_edges);
VertexContainer &cloned_vertices, EdgeContainer &cloned_edges,
const PrimaryKey &split_key);
void AdjustClonedTransactions(std::map<uint64_t, std::unique_ptr<Transaction>> &cloned_transactions,
VertexContainer &cloned_vertices, EdgeContainer &cloned_edges);
VertexContainer &cloned_vertices, EdgeContainer &cloned_edges,
const PrimaryKey &split_key);
static void AdjustDeltaNext(const Delta &original, Delta &cloned,
std::map<uint64_t, std::unique_ptr<Transaction>> &cloned_transactions);