Fix ASAN bug
This commit is contained in:
parent
0173d9ce37
commit
39cfcca2e5
@ -145,8 +145,10 @@ void PruneDeltas(Transaction &cloned_transaction, std::map<uint64_t, std::unique
|
||||
// Remove delta chains that don't point to objects on splitted shard
|
||||
auto cloned_delta_it = cloned_transaction.deltas.begin();
|
||||
|
||||
const auto remove_from_delta_chain = [&cloned_transaction, &cloned_transactions](auto &cloned_delta_it) {
|
||||
// Erases the delta chain
|
||||
const auto erase_delta_chain = [&cloned_transaction, &cloned_transactions](auto &cloned_delta_it) {
|
||||
auto *current_next_delta = cloned_delta_it->next;
|
||||
// We need to keep track of cloned_delta_it in the delta list of current transaction
|
||||
cloned_delta_it = cloned_transaction.deltas.erase(cloned_delta_it);
|
||||
|
||||
while (current_next_delta != nullptr) {
|
||||
@ -160,9 +162,21 @@ void PruneDeltas(Transaction &cloned_transaction, std::map<uint64_t, std::unique
|
||||
transaction.second->commit_info->start_or_commit_timestamp == start_or_commit_timestamp;
|
||||
});
|
||||
MG_ASSERT(current_transaction_it != cloned_transactions.end(), "Error when pruning deltas!");
|
||||
// Remove it
|
||||
current_transaction_it->second->deltas.remove_if(
|
||||
[¤t_next_delta = *current_next_delta](const auto &delta) { return delta == current_next_delta; });
|
||||
// Remove the delta
|
||||
const auto delta_it =
|
||||
std::ranges::find_if(current_transaction_it->second->deltas,
|
||||
[current_next_delta](const auto &elem) { return elem.id == current_next_delta->id; });
|
||||
if (delta_it != current_transaction_it->second->deltas.end()) {
|
||||
// If the next delta is next in transaction list replace current_transaction_it
|
||||
// with the next one
|
||||
if (current_transaction_it->second->start_timestamp == cloned_transaction.start_timestamp &&
|
||||
current_transaction_it == std::next(current_transaction_it)) {
|
||||
// TODO Dont do this if the delta_it is not next in line in transaction list
|
||||
cloned_delta_it = current_transaction_it->second->deltas.erase(delta_it);
|
||||
} else {
|
||||
current_transaction_it->second->deltas.erase(delta_it);
|
||||
}
|
||||
}
|
||||
|
||||
current_next_delta = next_delta;
|
||||
}
|
||||
@ -178,7 +192,7 @@ void PruneDeltas(Transaction &cloned_transaction, std::map<uint64_t, std::unique
|
||||
case PreviousPtr::Type::VERTEX: {
|
||||
if (prev.vertex->first < split_key) {
|
||||
// We can remove this delta chain
|
||||
remove_from_delta_chain(cloned_delta_it);
|
||||
erase_delta_chain(cloned_delta_it);
|
||||
} else {
|
||||
++cloned_delta_it;
|
||||
}
|
||||
@ -187,11 +201,81 @@ void PruneDeltas(Transaction &cloned_transaction, std::map<uint64_t, std::unique
|
||||
case PreviousPtr::Type::EDGE: {
|
||||
if (const auto edge_gid = prev.edge->gid; !cloned_edges.contains(edge_gid)) {
|
||||
// We can remove this delta chain
|
||||
remove_from_delta_chain(cloned_delta_it);
|
||||
erase_delta_chain(cloned_delta_it);
|
||||
} else {
|
||||
++cloned_delta_it;
|
||||
break;
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void Splitter::PruneOriginalDeltas(Transaction &transaction,
|
||||
std::map<uint64_t, std::unique_ptr<Transaction>> &transactions,
|
||||
const PrimaryKey &split_key) {
|
||||
// Remove delta chains that don't point to objects on splitted shard
|
||||
auto delta_it = transaction.deltas.begin();
|
||||
|
||||
const auto erase_delta_chain = [&transaction, &transactions](auto &cloned_delta_it) {
|
||||
auto *current_next_delta = cloned_delta_it->next;
|
||||
// We need to keep track of cloned_delta_it in the delta list of current transaction
|
||||
cloned_delta_it = transaction.deltas.erase(cloned_delta_it);
|
||||
|
||||
while (current_next_delta != nullptr) {
|
||||
auto *next_delta = current_next_delta->next;
|
||||
// Find next delta transaction delta list
|
||||
auto current_transaction_it = std::ranges::find_if(
|
||||
transactions, [&start_or_commit_timestamp =
|
||||
current_next_delta->commit_info->start_or_commit_timestamp](const auto &transaction) {
|
||||
return transaction.second->start_timestamp == start_or_commit_timestamp ||
|
||||
transaction.second->commit_info->start_or_commit_timestamp == start_or_commit_timestamp;
|
||||
});
|
||||
MG_ASSERT(current_transaction_it != transactions.end(), "Error when pruning deltas!");
|
||||
// Remove the delta
|
||||
const auto delta_it =
|
||||
std::ranges::find_if(current_transaction_it->second->deltas,
|
||||
[current_next_delta](const auto &elem) { return elem.id == current_next_delta->id; });
|
||||
if (delta_it != current_transaction_it->second->deltas.end()) {
|
||||
// If the next delta is next in transaction list replace current_transaction_it
|
||||
// with the next one
|
||||
if (current_transaction_it->second->start_timestamp == transaction.start_timestamp &&
|
||||
current_transaction_it == std::next(current_transaction_it)) {
|
||||
// TODO Dont do this if the delta_it is not next in line in transaction list
|
||||
cloned_delta_it = current_transaction_it->second->deltas.erase(delta_it);
|
||||
} else {
|
||||
current_transaction_it->second->deltas.erase(delta_it);
|
||||
}
|
||||
}
|
||||
|
||||
current_next_delta = next_delta;
|
||||
}
|
||||
};
|
||||
|
||||
while (delta_it != transaction.deltas.end()) {
|
||||
const auto prev = delta_it->prev.Get();
|
||||
switch (prev.type) {
|
||||
case PreviousPtr::Type::DELTA:
|
||||
case PreviousPtr::Type::NULLPTR:
|
||||
++delta_it;
|
||||
break;
|
||||
case PreviousPtr::Type::VERTEX: {
|
||||
if (prev.vertex->first >= split_key) {
|
||||
// We can remove this delta chain
|
||||
erase_delta_chain(delta_it);
|
||||
} else {
|
||||
++delta_it;
|
||||
}
|
||||
break;
|
||||
}
|
||||
case PreviousPtr::Type::EDGE: {
|
||||
if (const auto edge_gid = prev.edge->gid; !edges_.contains(edge_gid)) {
|
||||
// We can remove this delta chain
|
||||
erase_delta_chain(delta_it);
|
||||
} else {
|
||||
++delta_it;
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -200,15 +284,19 @@ void PruneDeltas(Transaction &cloned_transaction, std::map<uint64_t, std::unique
|
||||
void Splitter::AdjustClonedTransactions(std::map<uint64_t, std::unique_ptr<Transaction>> &cloned_transactions,
|
||||
VertexContainer &cloned_vertices, EdgeContainer &cloned_edges,
|
||||
const PrimaryKey &split_key) {
|
||||
for (auto &[commit_start, cloned_transaction] : cloned_transactions) {
|
||||
AdjustClonedTransaction(*cloned_transaction, *start_logical_id_to_transaction_[commit_start], cloned_transactions,
|
||||
for (auto &[start_id, cloned_transaction] : cloned_transactions) {
|
||||
AdjustClonedTransaction(*cloned_transaction, *start_logical_id_to_transaction_[start_id], cloned_transactions,
|
||||
cloned_vertices, cloned_edges, split_key);
|
||||
}
|
||||
// Prune deltas whose delta chain points to vertex/edge that should not belong on that shard
|
||||
// Prune must be after adjust, since next, and prev are not set and we cannot follow the chain
|
||||
for (auto &[commit_start, cloned_transaction] : cloned_transactions) {
|
||||
for (auto &[start_id, cloned_transaction] : cloned_transactions) {
|
||||
PruneDeltas(*cloned_transaction, cloned_transactions, split_key, cloned_edges);
|
||||
}
|
||||
// Also we need to remove deltas from original transactions
|
||||
for (auto &[start_id, original_transaction] : start_logical_id_to_transaction_) {
|
||||
PruneOriginalDeltas(*original_transaction, start_logical_id_to_transaction_, split_key);
|
||||
}
|
||||
}
|
||||
|
||||
inline bool IsDeltaHeadOfChain(const PreviousPtr::Type &delta_type) {
|
||||
@ -350,11 +438,13 @@ void Splitter::AdjustDeltaPrevPtr(const Delta &original, Delta &cloned,
|
||||
case PreviousPtr::Type::VERTEX: {
|
||||
// The vertex was extracted and it is safe to reuse address
|
||||
cloned.prev.Set(ptr.vertex);
|
||||
ptr.vertex->second.delta = &cloned;
|
||||
break;
|
||||
}
|
||||
case PreviousPtr::Type::EDGE: {
|
||||
// We can never be here if we have properties on edge disabled
|
||||
auto *cloned_edge = &*cloned_edges.find(ptr.edge->gid);
|
||||
ptr.edge->delta = &cloned;
|
||||
cloned.prev.Set(&cloned_edge->second);
|
||||
break;
|
||||
}
|
||||
|
@ -77,6 +77,9 @@ class Splitter final {
|
||||
|
||||
static void ScanDeltas(std::set<uint64_t> &collected_transactions_start_id, const Delta *delta);
|
||||
|
||||
void PruneOriginalDeltas(Transaction &transaction, std::map<uint64_t, std::unique_ptr<Transaction>> &transactions,
|
||||
const PrimaryKey &split_key);
|
||||
|
||||
void AdjustClonedTransaction(Transaction &cloned_transaction, const Transaction &transaction,
|
||||
std::map<uint64_t, std::unique_ptr<Transaction>> &cloned_transactions,
|
||||
VertexContainer &cloned_vertices, EdgeContainer &cloned_edges,
|
||||
|
Loading…
Reference in New Issue
Block a user