Align deltas

This commit is contained in:
jbajic 2023-01-11 14:16:41 +01:00
parent 9dc16deae2
commit 050d5efae7
3 changed files with 50 additions and 9 deletions

View File

@ -1107,13 +1107,49 @@ std::optional<EdgeContainer> Shard::CollectEdges(std::set<uint64_t> &collected_t
return splitted_edges;
}
std::map<uint64_t, std::unique_ptr<Transaction>> Shard::CollectTransactions(
const std::set<uint64_t> &collected_transactions_start_id) {
std::map<uint64_t, std::unique_ptr<Transaction>> transactions;
// for (const auto commit_start : collected_transactions_start_id) {
// transactions.insert(
// {commit_start, std::make_unique<Transaction>(*start_logical_id_to_transaction_.at(commit_start))});
// }
void Shard::AlignClonedTransaction(Transaction &cloned_transaction, const Transaction &transaction,
std::map<uint64_t, Transaction> &cloned_transactions) {
// Align next and prev in deltas
// NOTE It is important that the order of delta lists is in same order
auto delta_it = transaction.deltas.begin();
auto cloned_delta_it = cloned_transaction.deltas.begin();
while (delta_it != transaction.deltas.end() && cloned_delta_it != cloned_transaction.deltas.end()) {
MG_ASSERT(delta_it->uuid == cloned_delta_it->uuid, "The order of deltas is not correct");
// // We need to set prev and next on cloned_delta
// auto prev = delta_it->prev;
// Find appropriate prev and delta->next for cloned deltas
auto *next = delta_it->next;
auto *cloned_next = &*cloned_delta_it;
while (next != nullptr) {
// No need to check we can be sure that it exists
cloned_next->next = &*std::ranges::find_if(cloned_transactions.at(next->command_id).deltas,
[next](const auto &delta) { return delta.uuid == next->uuid; });
cloned_next = cloned_next->next;
next = next->next;
}
++delta_it;
++cloned_delta_it;
}
MG_ASSERT(delta_it == transaction.deltas.end() && cloned_delta_it == cloned_transaction.deltas.end(),
"Both iterators must be exhausted!");
}
void Shard::AlignClonedTransactions(std::map<uint64_t, Transaction> &cloned_transactions) {
for (auto &[commit_start, cloned_transaction] : cloned_transactions) {
AlignClonedTransaction(cloned_transaction, *start_logical_id_to_transaction_[commit_start], cloned_transactions);
}
}
std::map<uint64_t, Transaction> Shard::CollectTransactions(const std::set<uint64_t> &collected_transactions_start_id) {
std::map<uint64_t, Transaction> transactions;
for (const auto commit_start : collected_transactions_start_id) {
transactions.insert({commit_start, start_logical_id_to_transaction_[commit_start]->Clone()});
}
// It is necessary to clone all the transactions first so we have new addresses
// for deltas, before doing alignment
AlignClonedTransactions(transactions);
return transactions;
}

View File

@ -398,8 +398,12 @@ class Shard final {
void ScanDeltas(std::set<uint64_t> &collected_transactions_start_id, Delta *delta) const;
std::map<uint64_t, std::unique_ptr<Transaction>> CollectTransactions(
const std::set<uint64_t> &collected_transactions_start_id);
void AlignClonedTransaction(Transaction &cloned_transaction, const Transaction &transaction,
std::map<uint64_t, Transaction> &cloned_transactions);
void AlignClonedTransactions(std::map<uint64_t, Transaction> &cloned_transactions);
std::map<uint64_t, Transaction> CollectTransactions(const std::set<uint64_t> &collected_transactions_start_id);
VertexContainer CollectVertices(std::set<uint64_t> &collected_transactions_start_id, const PrimaryKey &split_key);

View File

@ -65,6 +65,7 @@ struct Transaction {
~Transaction() {}
std::list<Delta> CopyDeltas(CommitInfo *commit_info) const {
// TODO This does not solve the next and prev deltas that also need to be set
std::list<Delta> copied_deltas;
for (const auto &delta : deltas) {
switch (delta.action) {