Adjust pointers
This commit is contained in:
parent
050d5efae7
commit
282725fd0f
@ -1108,25 +1108,50 @@ std::optional<EdgeContainer> Shard::CollectEdges(std::set<uint64_t> &collected_t
|
|||||||
}
|
}
|
||||||
|
|
||||||
void Shard::AlignClonedTransaction(Transaction &cloned_transaction, const Transaction &transaction,
|
void Shard::AlignClonedTransaction(Transaction &cloned_transaction, const Transaction &transaction,
|
||||||
std::map<uint64_t, Transaction> &cloned_transactions) {
|
std::map<uint64_t, Transaction> &cloned_transactions,
|
||||||
|
VertexContainer &cloned_vertices, EdgeContainer &cloned_edges) {
|
||||||
// Align next and prev in deltas
|
// Align next and prev in deltas
|
||||||
// NOTE It is important that the order of delta lists is in same order
|
// NOTE It is important that the order of delta lists is in same order
|
||||||
auto delta_it = transaction.deltas.begin();
|
auto delta_it = transaction.deltas.begin();
|
||||||
auto cloned_delta_it = cloned_transaction.deltas.begin();
|
auto cloned_delta_it = cloned_transaction.deltas.begin();
|
||||||
while (delta_it != transaction.deltas.end() && cloned_delta_it != cloned_transaction.deltas.end()) {
|
while (delta_it != transaction.deltas.end() && cloned_delta_it != cloned_transaction.deltas.end()) {
|
||||||
MG_ASSERT(delta_it->uuid == cloned_delta_it->uuid, "The order of deltas is not correct");
|
MG_ASSERT(delta_it->uuid == cloned_delta_it->uuid, "The order of deltas is not correct");
|
||||||
// // We need to set prev and next on cloned_delta
|
|
||||||
// auto prev = delta_it->prev;
|
|
||||||
|
|
||||||
// Find appropriate prev and delta->next for cloned deltas
|
// Find appropriate prev and delta->next for cloned deltas
|
||||||
auto *next = delta_it->next;
|
// auto *prev = &delta_it->prev;
|
||||||
auto *cloned_next = &*cloned_delta_it;
|
// auto *cloned_prev = &cloned_delta_it->prev;
|
||||||
while (next != nullptr) {
|
|
||||||
// No need to check we can be sure that it exists
|
auto *delta = &*delta_it;
|
||||||
cloned_next->next = &*std::ranges::find_if(cloned_transactions.at(next->command_id).deltas,
|
auto *cloned_delta = &*cloned_delta_it;
|
||||||
[next](const auto &delta) { return delta.uuid == next->uuid; });
|
while (delta != nullptr) {
|
||||||
cloned_next = cloned_next->next;
|
// Align delta
|
||||||
next = next->next;
|
cloned_delta->next = &*std::ranges::find_if(cloned_transactions.at(delta->command_id).deltas,
|
||||||
|
[delta](const auto &elem) { return elem.uuid == delta->uuid; });
|
||||||
|
// Align prev ptr
|
||||||
|
auto ptr = delta->prev.Get();
|
||||||
|
switch (ptr.type) {
|
||||||
|
case PreviousPtr::Type::NULLPTR: {
|
||||||
|
// noop
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case PreviousPtr::Type::DELTA: {
|
||||||
|
cloned_delta->prev.Set(ptr.delta);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case PreviousPtr::Type::VERTEX: {
|
||||||
|
auto *cloned_vertex = &*cloned_vertices.find(ptr.vertex->first);
|
||||||
|
cloned_delta->prev.Set(cloned_vertex);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case PreviousPtr::Type::EDGE: {
|
||||||
|
// TODO Case when there are no properties on edge is not handled
|
||||||
|
auto *cloned_edge = &*cloned_edges.find(ptr.edge->gid);
|
||||||
|
cloned_delta->prev.Set(&cloned_edge->second);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
cloned_delta = cloned_delta->next;
|
||||||
|
delta = delta->next;
|
||||||
}
|
}
|
||||||
|
|
||||||
++delta_it;
|
++delta_it;
|
||||||
@ -1136,20 +1161,24 @@ void Shard::AlignClonedTransaction(Transaction &cloned_transaction, const Transa
|
|||||||
"Both iterators must be exhausted!");
|
"Both iterators must be exhausted!");
|
||||||
}
|
}
|
||||||
|
|
||||||
void Shard::AlignClonedTransactions(std::map<uint64_t, Transaction> &cloned_transactions) {
|
void Shard::AlignClonedTransactions(std::map<uint64_t, Transaction> &cloned_transactions,
|
||||||
|
VertexContainer &cloned_vertices, EdgeContainer &cloned_edges) {
|
||||||
for (auto &[commit_start, cloned_transaction] : cloned_transactions) {
|
for (auto &[commit_start, cloned_transaction] : cloned_transactions) {
|
||||||
AlignClonedTransaction(cloned_transaction, *start_logical_id_to_transaction_[commit_start], cloned_transactions);
|
AlignClonedTransaction(cloned_transaction, *start_logical_id_to_transaction_[commit_start], cloned_transactions,
|
||||||
|
cloned_vertices, cloned_edges);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
std::map<uint64_t, Transaction> Shard::CollectTransactions(const std::set<uint64_t> &collected_transactions_start_id) {
|
std::map<uint64_t, Transaction> Shard::CollectTransactions(const std::set<uint64_t> &collected_transactions_start_id,
|
||||||
|
VertexContainer &cloned_vertices,
|
||||||
|
EdgeContainer &cloned_edges) {
|
||||||
std::map<uint64_t, Transaction> transactions;
|
std::map<uint64_t, Transaction> transactions;
|
||||||
for (const auto commit_start : collected_transactions_start_id) {
|
for (const auto commit_start : collected_transactions_start_id) {
|
||||||
transactions.insert({commit_start, start_logical_id_to_transaction_[commit_start]->Clone()});
|
transactions.insert({commit_start, start_logical_id_to_transaction_[commit_start]->Clone()});
|
||||||
}
|
}
|
||||||
// It is necessary to clone all the transactions first so we have new addresses
|
// It is necessary to clone all the transactions first so we have new addresses
|
||||||
// for deltas, before doing alignment
|
// for deltas, before doing alignment of deltas and prev_ptr
|
||||||
AlignClonedTransactions(transactions);
|
AlignClonedTransactions(transactions, cloned_vertices, cloned_edges);
|
||||||
return transactions;
|
return transactions;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1158,21 +1187,11 @@ SplitData Shard::PerformSplit(const PrimaryKey &split_key) {
|
|||||||
std::set<uint64_t> collected_transactions_start_id;
|
std::set<uint64_t> collected_transactions_start_id;
|
||||||
data.vertices = CollectVertices(collected_transactions_start_id, split_key);
|
data.vertices = CollectVertices(collected_transactions_start_id, split_key);
|
||||||
data.edges = CollectEdges(collected_transactions_start_id, data.vertices, split_key);
|
data.edges = CollectEdges(collected_transactions_start_id, data.vertices, split_key);
|
||||||
|
data.transactions = CollectTransactions(collected_transactions_start_id, data.vertices, *data.edges);
|
||||||
|
|
||||||
// TODO indices wont work since timestamp cannot be replicated
|
// TODO indices wont work since timestamp cannot be replicated
|
||||||
// data.indices_info = {indices_.label_index.ListIndices(), indices_.label_property_index.ListIndices()};
|
// data.indices_info = {indices_.label_index.ListIndices(), indices_.label_property_index.ListIndices()};
|
||||||
|
|
||||||
// data.transactions = CollectTransactions(collected_transactions_start_id);
|
|
||||||
|
|
||||||
// Update delta addresses with the new addresses
|
|
||||||
// for (auto &vertex : data.vertices) {
|
|
||||||
// AdjustSplittedDataDeltas(vertex.second, data.transactions);
|
|
||||||
// }
|
|
||||||
// if (data.edges) {
|
|
||||||
// for (auto &edge : data.edges) {
|
|
||||||
// AdjustSplittedDataDeltas(edge, data.transactions);
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
|
|
||||||
return data;
|
return data;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -399,11 +399,14 @@ class Shard final {
|
|||||||
void ScanDeltas(std::set<uint64_t> &collected_transactions_start_id, Delta *delta) const;
|
void ScanDeltas(std::set<uint64_t> &collected_transactions_start_id, Delta *delta) const;
|
||||||
|
|
||||||
void AlignClonedTransaction(Transaction &cloned_transaction, const Transaction &transaction,
|
void AlignClonedTransaction(Transaction &cloned_transaction, const Transaction &transaction,
|
||||||
std::map<uint64_t, Transaction> &cloned_transactions);
|
std::map<uint64_t, Transaction> &cloned_transactions, VertexContainer &cloned_vertices,
|
||||||
|
EdgeContainer &cloned_edges);
|
||||||
|
|
||||||
void AlignClonedTransactions(std::map<uint64_t, Transaction> &cloned_transactions);
|
void AlignClonedTransactions(std::map<uint64_t, Transaction> &cloned_transactions, VertexContainer &cloned_vertices,
|
||||||
|
EdgeContainer &cloned_edges);
|
||||||
|
|
||||||
std::map<uint64_t, Transaction> CollectTransactions(const std::set<uint64_t> &collected_transactions_start_id);
|
std::map<uint64_t, Transaction> CollectTransactions(const std::set<uint64_t> &collected_transactions_start_id,
|
||||||
|
VertexContainer &cloned_vertices, EdgeContainer &cloned_edges);
|
||||||
|
|
||||||
VertexContainer CollectVertices(std::set<uint64_t> &collected_transactions_start_id, const PrimaryKey &split_key);
|
VertexContainer CollectVertices(std::set<uint64_t> &collected_transactions_start_id, const PrimaryKey &split_key);
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user