Copy all relevant transactions
Copy all transactions which are relevant for entity MVCC value resolution.
This commit is contained in:
parent
7d0e885f9a
commit
28624aaa88
@ -36,22 +36,22 @@ Splitter::Splitter(VertexContainer &vertices, EdgeContainer &edges,
|
|||||||
SplitData Splitter::SplitShard(const PrimaryKey &split_key) {
|
SplitData Splitter::SplitShard(const PrimaryKey &split_key) {
|
||||||
SplitData data;
|
SplitData data;
|
||||||
|
|
||||||
std::set<uint64_t> collected_transactions_start_id;
|
std::set<uint64_t> collected_transactions_;
|
||||||
data.vertices = CollectVertices(data, collected_transactions_start_id, split_key);
|
data.vertices = CollectVertices(data, collected_transactions_, split_key);
|
||||||
data.edges = CollectEdges(collected_transactions_start_id, data.vertices, split_key);
|
data.edges = CollectEdges(collected_transactions_, data.vertices, split_key);
|
||||||
data.transactions = CollectTransactions(collected_transactions_start_id, data.vertices, *data.edges);
|
data.transactions = CollectTransactions(collected_transactions_, data.vertices, *data.edges);
|
||||||
|
|
||||||
return data;
|
return data;
|
||||||
}
|
}
|
||||||
|
|
||||||
void Splitter::ScanDeltas(std::set<uint64_t> &collected_transactions_start_id, Delta *delta) {
|
void Splitter::ScanDeltas(std::set<uint64_t> &collected_transactions_, Delta *delta) {
|
||||||
while (delta != nullptr) {
|
while (delta != nullptr) {
|
||||||
collected_transactions_start_id.insert(delta->commit_info->start_or_commit_timestamp.logical_id);
|
collected_transactions_.insert(delta->commit_info->start_or_commit_timestamp.logical_id);
|
||||||
delta = delta->next;
|
delta = delta->next;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
VertexContainer Splitter::CollectVertices(SplitData &data, std::set<uint64_t> &collected_transactions_start_id,
|
VertexContainer Splitter::CollectVertices(SplitData &data, std::set<uint64_t> &collected_transactions_,
|
||||||
const PrimaryKey &split_key) {
|
const PrimaryKey &split_key) {
|
||||||
// Collection of indices is here since it heavily depends on vertices
|
// Collection of indices is here since it heavily depends on vertices
|
||||||
// Old vertex pointer new entry pointer
|
// Old vertex pointer new entry pointer
|
||||||
@ -76,7 +76,7 @@ VertexContainer Splitter::CollectVertices(SplitData &data, std::set<uint64_t> &c
|
|||||||
auto split_key_it = vertices_.find(split_key);
|
auto split_key_it = vertices_.find(split_key);
|
||||||
while (split_key_it != vertices_.end()) {
|
while (split_key_it != vertices_.end()) {
|
||||||
// Go through deltas and pick up transactions start_id
|
// Go through deltas and pick up transactions start_id
|
||||||
ScanDeltas(collected_transactions_start_id, split_key_it->second.delta);
|
ScanDeltas(collected_transactions_, split_key_it->second.delta);
|
||||||
|
|
||||||
const auto *old_vertex_ptr = &*split_key_it;
|
const auto *old_vertex_ptr = &*split_key_it;
|
||||||
auto next_it = std::next(split_key_it);
|
auto next_it = std::next(split_key_it);
|
||||||
@ -92,7 +92,7 @@ VertexContainer Splitter::CollectVertices(SplitData &data, std::set<uint64_t> &c
|
|||||||
return splitted_data;
|
return splitted_data;
|
||||||
}
|
}
|
||||||
|
|
||||||
std::optional<EdgeContainer> Splitter::CollectEdges(std::set<uint64_t> &collected_transactions_start_id,
|
std::optional<EdgeContainer> Splitter::CollectEdges(std::set<uint64_t> &collected_transactions_,
|
||||||
const VertexContainer &split_vertices,
|
const VertexContainer &split_vertices,
|
||||||
const PrimaryKey &split_key) {
|
const PrimaryKey &split_key) {
|
||||||
if (!config_.items.properties_on_edges) {
|
if (!config_.items.properties_on_edges) {
|
||||||
@ -105,7 +105,7 @@ std::optional<EdgeContainer> Splitter::CollectEdges(std::set<uint64_t> &collecte
|
|||||||
for (const auto &edge_ref : edges_ref) {
|
for (const auto &edge_ref : edges_ref) {
|
||||||
auto *edge = std::get<2>(edge_ref).ptr;
|
auto *edge = std::get<2>(edge_ref).ptr;
|
||||||
const auto &other_vtx = std::get<1>(edge_ref);
|
const auto &other_vtx = std::get<1>(edge_ref);
|
||||||
ScanDeltas(collected_transactions_start_id, edge->delta);
|
ScanDeltas(collected_transactions_, edge->delta);
|
||||||
// Check if src and dest edge are both on splitted shard
|
// Check if src and dest edge are both on splitted shard
|
||||||
// so we know if we should remove orphan edge
|
// so we know if we should remove orphan edge
|
||||||
if (other_vtx.primary_key >= split_key) {
|
if (other_vtx.primary_key >= split_key) {
|
||||||
@ -124,22 +124,33 @@ std::optional<EdgeContainer> Splitter::CollectEdges(std::set<uint64_t> &collecte
|
|||||||
return splitted_edges;
|
return splitted_edges;
|
||||||
}
|
}
|
||||||
|
|
||||||
std::map<uint64_t, Transaction> Splitter::CollectTransactions(const std::set<uint64_t> &collected_transactions_start_id,
|
std::map<uint64_t, Transaction> Splitter::CollectTransactions(const std::set<uint64_t> &collected_transactions_,
|
||||||
VertexContainer &cloned_vertices,
|
VertexContainer &cloned_vertices,
|
||||||
EdgeContainer &cloned_edges) {
|
EdgeContainer &cloned_edges) {
|
||||||
std::map<uint64_t, Transaction> transactions;
|
std::map<uint64_t, Transaction> transactions;
|
||||||
for (const auto commit_start : collected_transactions_start_id) {
|
|
||||||
// If it does not contain then the transaction has commited, and we ignore it
|
for (const auto &[commit_start, transaction] : start_logical_id_to_transaction_) {
|
||||||
if (start_logical_id_to_transaction_.contains(commit_start)) {
|
// We need all transaction whose deltas need to be resolved for any of the
|
||||||
|
// entities
|
||||||
|
if (collected_transactions_.contains(transaction->commit_info->start_or_commit_timestamp.logical_id)) {
|
||||||
transactions.insert({commit_start, start_logical_id_to_transaction_[commit_start]->Clone()});
|
transactions.insert({commit_start, start_logical_id_to_transaction_[commit_start]->Clone()});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// It is necessary to clone all the transactions first so we have new addresses
|
// It is necessary to clone all the transactions first so we have new addresses
|
||||||
// for deltas, before doing alignment of deltas and prev_ptr
|
// for deltas, before doing alignment of deltas and prev_ptr
|
||||||
AlignClonedTransactions(transactions, cloned_vertices, cloned_edges);
|
AlignClonedTransactions(transactions, cloned_vertices, cloned_edges);
|
||||||
return transactions;
|
return transactions;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void Splitter::AlignClonedTransactions(std::map<uint64_t, Transaction> &cloned_transactions,
|
||||||
|
VertexContainer &cloned_vertices, EdgeContainer &cloned_edges) {
|
||||||
|
for (auto &[commit_start, cloned_transaction] : cloned_transactions) {
|
||||||
|
AlignClonedTransaction(cloned_transaction, *start_logical_id_to_transaction_[commit_start], cloned_transactions,
|
||||||
|
cloned_vertices, cloned_edges);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
void Splitter::AlignClonedTransaction(Transaction &cloned_transaction, const Transaction &transaction,
|
void Splitter::AlignClonedTransaction(Transaction &cloned_transaction, const Transaction &transaction,
|
||||||
std::map<uint64_t, Transaction> &cloned_transactions,
|
std::map<uint64_t, Transaction> &cloned_transactions,
|
||||||
VertexContainer &cloned_vertices, EdgeContainer &cloned_edges) {
|
VertexContainer &cloned_vertices, EdgeContainer &cloned_edges) {
|
||||||
@ -201,12 +212,4 @@ void Splitter::AlignClonedTransaction(Transaction &cloned_transaction, const Tra
|
|||||||
"Both iterators must be exhausted!");
|
"Both iterators must be exhausted!");
|
||||||
}
|
}
|
||||||
|
|
||||||
void Splitter::AlignClonedTransactions(std::map<uint64_t, Transaction> &cloned_transactions,
|
|
||||||
VertexContainer &cloned_vertices, EdgeContainer &cloned_edges) {
|
|
||||||
for (auto &[commit_start, cloned_transaction] : cloned_transactions) {
|
|
||||||
AlignClonedTransaction(cloned_transaction, *start_logical_id_to_transaction_[commit_start], cloned_transactions,
|
|
||||||
cloned_vertices, cloned_edges);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
} // namespace memgraph::storage::v3
|
} // namespace memgraph::storage::v3
|
||||||
|
Loading…
Reference in New Issue
Block a user