Fix delta chain prev pointer
This commit is contained in:
parent
954df64d1d
commit
de15c9719c
@ -241,6 +241,8 @@ struct Delta {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
friend bool operator==(const Delta &lhs, const Delta &rhs) noexcept { return lhs.id == rhs.id; }
|
||||||
|
|
||||||
Action action;
|
Action action;
|
||||||
uint64_t id;
|
uint64_t id;
|
||||||
// TODO: optimize with in-place copy
|
// TODO: optimize with in-place copy
|
||||||
|
@ -141,8 +141,6 @@ class LabelIndex {
|
|||||||
[[maybe_unused]] const auto &[inserted_entry_it, inserted, node] =
|
[[maybe_unused]] const auto &[inserted_entry_it, inserted, node] =
|
||||||
cloned_indices_container.insert(index.extract(entry_it));
|
cloned_indices_container.insert(index.extract(entry_it));
|
||||||
MG_ASSERT(inserted, "Failed to extract index entry!");
|
MG_ASSERT(inserted, "Failed to extract index entry!");
|
||||||
|
|
||||||
// vertex_entry_map[index_type_val].insert({inserted_entry_it->vertex, inserted_entry_it});
|
|
||||||
}
|
}
|
||||||
entry_it = next_entry_it;
|
entry_it = next_entry_it;
|
||||||
}
|
}
|
||||||
@ -286,7 +284,6 @@ class LabelPropertyIndex {
|
|||||||
[[maybe_unused]] const auto &[inserted_entry_it, inserted, node] =
|
[[maybe_unused]] const auto &[inserted_entry_it, inserted, node] =
|
||||||
cloned_index_container.insert(index.extract(entry_it));
|
cloned_index_container.insert(index.extract(entry_it));
|
||||||
MG_ASSERT(inserted, "Failed to extract index entry!");
|
MG_ASSERT(inserted, "Failed to extract index entry!");
|
||||||
// vertex_entry_map[index_type_val].insert({inserted_entry_it->vertex, inserted_entry_it});
|
|
||||||
}
|
}
|
||||||
entry_it = next_entry_it;
|
entry_it = next_entry_it;
|
||||||
}
|
}
|
||||||
|
@ -140,37 +140,50 @@ std::map<uint64_t, std::unique_ptr<Transaction>> Splitter::CollectTransactions(
|
|||||||
return transactions;
|
return transactions;
|
||||||
}
|
}
|
||||||
|
|
||||||
void PruneDeltas(std::map<uint64_t, std::unique_ptr<Transaction>> &cloned_transactions, const PrimaryKey &split_key) {
|
void PruneDeltas(Transaction &cloned_transaction, std::map<uint64_t, std::unique_ptr<Transaction>> &cloned_transactions,
|
||||||
// Remove delta chains wh
|
const PrimaryKey &split_key) {
|
||||||
auto cloned_transaction_it = cloned_transactions.begin();
|
// Remove delta chains that don't point to objects on splitted shard
|
||||||
while (cloned_transaction_it != cloned_transactions.end()) {
|
auto cloned_delta_it = cloned_transaction.deltas.begin();
|
||||||
auto cloned_delta_it = cloned_transaction_it->second->deltas.begin();
|
|
||||||
|
|
||||||
if (const auto prev = cloned_delta_it->prev.Get();
|
while (cloned_delta_it != cloned_transaction.deltas.end()) {
|
||||||
prev.type == PreviousPtr::Type::VERTEX && prev.vertex->first < split_key) {
|
const auto prev = cloned_delta_it->prev.Get();
|
||||||
// We can remove this delta chain
|
switch (prev.type) {
|
||||||
auto *current_next_delta = cloned_delta_it->next;
|
case PreviousPtr::Type::DELTA:
|
||||||
cloned_transaction_it->second->deltas.remove_if(
|
case PreviousPtr::Type::NULLPTR:
|
||||||
[cloned_delta_it](const auto &delta) { return delta.id == cloned_delta_it->id; });
|
++cloned_delta_it;
|
||||||
|
break;
|
||||||
|
case PreviousPtr::Type::VERTEX: {
|
||||||
|
if (prev.vertex->first < split_key) {
|
||||||
|
// We can remove this delta chain
|
||||||
|
auto *current_next_delta = cloned_delta_it->next;
|
||||||
|
cloned_delta_it = cloned_transaction.deltas.erase(cloned_delta_it);
|
||||||
|
|
||||||
while (current_next_delta != nullptr) {
|
while (current_next_delta != nullptr) {
|
||||||
auto *next_delta = current_next_delta->next;
|
auto *next_delta = current_next_delta->next;
|
||||||
// Find next delta transaction delta list
|
// Find next delta transaction delta list
|
||||||
auto current_transaction_it = std::ranges::find_if(
|
auto current_transaction_it = std::ranges::find_if(
|
||||||
cloned_transactions, [&start_or_commit_timestamp = cloned_delta_it->commit_info->start_or_commit_timestamp](
|
cloned_transactions,
|
||||||
const auto &transaction) {
|
[&start_or_commit_timestamp =
|
||||||
return transaction.second->start_timestamp == start_or_commit_timestamp ||
|
current_next_delta->commit_info->start_or_commit_timestamp](const auto &transaction) {
|
||||||
transaction.second->commit_info->start_or_commit_timestamp == start_or_commit_timestamp;
|
return transaction.second->start_timestamp == start_or_commit_timestamp ||
|
||||||
});
|
transaction.second->commit_info->start_or_commit_timestamp == start_or_commit_timestamp;
|
||||||
// Remove it
|
});
|
||||||
current_transaction_it->second->deltas.remove_if(
|
MG_ASSERT(current_transaction_it != cloned_transactions.end(), "Error when pruning deltas!");
|
||||||
[current_next_delta](const auto &delta) { return delta.id == current_next_delta->id; });
|
// Remove it
|
||||||
|
current_transaction_it->second->deltas.remove_if(
|
||||||
|
[¤t_next_delta = *current_next_delta](const auto &delta) { return delta == current_next_delta; });
|
||||||
|
|
||||||
current_next_delta = next_delta;
|
current_next_delta = next_delta;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
++cloned_delta_it;
|
||||||
|
}
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
|
case PreviousPtr::Type::EDGE:
|
||||||
|
++cloned_delta_it;
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
// while(cloned_delta_it != )
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -178,11 +191,13 @@ void Splitter::AdjustClonedTransactions(std::map<uint64_t, std::unique_ptr<Trans
|
|||||||
VertexContainer &cloned_vertices, EdgeContainer &cloned_edges,
|
VertexContainer &cloned_vertices, EdgeContainer &cloned_edges,
|
||||||
const PrimaryKey &split_key) {
|
const PrimaryKey &split_key) {
|
||||||
// Prune deltas whose delta chain points to vertex/edge that should not belong on that shard
|
// Prune deltas whose delta chain points to vertex/edge that should not belong on that shard
|
||||||
PruneDeltas(cloned_transactions, split_key);
|
|
||||||
for (auto &[commit_start, cloned_transaction] : cloned_transactions) {
|
for (auto &[commit_start, cloned_transaction] : cloned_transactions) {
|
||||||
AdjustClonedTransaction(*cloned_transaction, *start_logical_id_to_transaction_[commit_start], cloned_transactions,
|
AdjustClonedTransaction(*cloned_transaction, *start_logical_id_to_transaction_[commit_start], cloned_transactions,
|
||||||
cloned_vertices, cloned_edges, split_key);
|
cloned_vertices, cloned_edges, split_key);
|
||||||
}
|
}
|
||||||
|
for (auto &[commit_start, cloned_transaction] : cloned_transactions) {
|
||||||
|
PruneDeltas(*cloned_transaction, cloned_transactions, split_key);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
inline bool IsDeltaHeadOfChain(const PreviousPtr::Type &delta_type) {
|
inline bool IsDeltaHeadOfChain(const PreviousPtr::Type &delta_type) {
|
||||||
@ -196,16 +211,14 @@ bool DoesPrevPtrPointsToSplittedData(const PreviousPtr::Pointer &prev_ptr, const
|
|||||||
void Splitter::AdjustClonedTransaction(Transaction &cloned_transaction, const Transaction &transaction,
|
void Splitter::AdjustClonedTransaction(Transaction &cloned_transaction, const Transaction &transaction,
|
||||||
std::map<uint64_t, std::unique_ptr<Transaction>> &cloned_transactions,
|
std::map<uint64_t, std::unique_ptr<Transaction>> &cloned_transactions,
|
||||||
VertexContainer &cloned_vertices, EdgeContainer &cloned_edges,
|
VertexContainer &cloned_vertices, EdgeContainer &cloned_edges,
|
||||||
const PrimaryKey &split_key) {
|
const PrimaryKey & /*split_key*/) {
|
||||||
// Align next and prev in deltas
|
|
||||||
auto delta_it = transaction.deltas.begin();
|
auto delta_it = transaction.deltas.begin();
|
||||||
auto cloned_delta_it = cloned_transaction.deltas.begin();
|
auto cloned_delta_it = cloned_transaction.deltas.begin();
|
||||||
|
|
||||||
while (delta_it != transaction.deltas.end()) {
|
while (delta_it != transaction.deltas.end()) {
|
||||||
// We can safely ignore deltas which are not head of delta chain
|
// We can safely ignore deltas which are not head of delta chain
|
||||||
// Dont' adjust delta chain that points to irrelevant data vertices/edges
|
// Dont' adjust delta chain that points to irrelevant data vertices/edges
|
||||||
if (const auto delta_prev = delta_it->prev.Get();
|
if (const auto delta_prev = delta_it->prev.Get(); !IsDeltaHeadOfChain(delta_prev.type)) {
|
||||||
!IsDeltaHeadOfChain(delta_prev.type) && !DoesPrevPtrPointsToSplittedData(delta_prev, split_key)) {
|
|
||||||
++delta_it;
|
++delta_it;
|
||||||
++cloned_delta_it;
|
++cloned_delta_it;
|
||||||
continue;
|
continue;
|
||||||
@ -215,50 +228,16 @@ void Splitter::AdjustClonedTransaction(Transaction &cloned_transaction, const Tr
|
|||||||
auto *cloned_delta = &*cloned_delta_it;
|
auto *cloned_delta = &*cloned_delta_it;
|
||||||
while (delta->next != nullptr) {
|
while (delta->next != nullptr) {
|
||||||
// Align next ptr
|
// Align next ptr
|
||||||
// Get cloned_delta->next transaction, using delta->next original transaction
|
|
||||||
AdjustDeltaNext(*delta, *cloned_delta, cloned_transactions);
|
AdjustDeltaNext(*delta, *cloned_delta, cloned_transactions);
|
||||||
|
|
||||||
// Align prev ptr
|
// Align prev ptr
|
||||||
auto ptr = delta->prev.Get();
|
AdjustDeltaPrevPtr(*delta, *cloned_delta, cloned_transactions, cloned_vertices, cloned_edges);
|
||||||
switch (ptr.type) {
|
|
||||||
case PreviousPtr::Type::NULLPTR: {
|
|
||||||
// noop
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
case PreviousPtr::Type::DELTA: {
|
|
||||||
// Same as for deltas except don't align next but prev
|
|
||||||
auto cloned_transaction_it = std::ranges::find_if(cloned_transactions, [&ptr](const auto &elem) {
|
|
||||||
return elem.second->start_timestamp == ptr.delta->commit_info->start_or_commit_timestamp ||
|
|
||||||
elem.second->commit_info->start_or_commit_timestamp ==
|
|
||||||
ptr.delta->commit_info->start_or_commit_timestamp;
|
|
||||||
});
|
|
||||||
MG_ASSERT(cloned_transaction_it != cloned_transactions.end(), "Cloned transaction not found");
|
|
||||||
// Find cloned delta in delta list of cloned transaction
|
|
||||||
auto found_cloned_delta_it =
|
|
||||||
std::ranges::find_if(cloned_transaction_it->second->deltas,
|
|
||||||
[delta = ptr.delta](const auto &elem) { return elem.id == delta->id; });
|
|
||||||
MG_ASSERT(found_cloned_delta_it != cloned_transaction_it->second->deltas.end(),
|
|
||||||
"Delta with given id must exist!");
|
|
||||||
|
|
||||||
cloned_delta->prev.Set(&*found_cloned_delta_it);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
case PreviousPtr::Type::VERTEX: {
|
|
||||||
auto *cloned_vertex = &*cloned_vertices.find(ptr.vertex->first);
|
|
||||||
cloned_delta->prev.Set(cloned_vertex);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
case PreviousPtr::Type::EDGE: {
|
|
||||||
// We can never be here if we have properties on edge disabled
|
|
||||||
auto *cloned_edge = &*cloned_edges.find(ptr.edge->gid);
|
|
||||||
cloned_delta->prev.Set(&cloned_edge->second);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
cloned_delta = cloned_delta->next;
|
cloned_delta = cloned_delta->next;
|
||||||
delta = delta->next;
|
delta = delta->next;
|
||||||
}
|
}
|
||||||
|
// Align prev ptr
|
||||||
|
AdjustDeltaPrevPtr(*delta, *cloned_delta, cloned_transactions, cloned_vertices, cloned_edges);
|
||||||
|
|
||||||
++delta_it;
|
++delta_it;
|
||||||
++cloned_delta_it;
|
++cloned_delta_it;
|
||||||
@ -283,4 +262,44 @@ void Splitter::AdjustDeltaNext(const Delta &original, Delta &cloned,
|
|||||||
cloned.next = &*found_cloned_delta_it;
|
cloned.next = &*found_cloned_delta_it;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void Splitter::AdjustDeltaPrevPtr(const Delta &original, Delta &cloned,
|
||||||
|
std::map<uint64_t, std::unique_ptr<Transaction>> &cloned_transactions,
|
||||||
|
VertexContainer & /*cloned_vertices*/, EdgeContainer &cloned_edges) {
|
||||||
|
auto ptr = original.prev.Get();
|
||||||
|
switch (ptr.type) {
|
||||||
|
case PreviousPtr::Type::NULLPTR: {
|
||||||
|
// noop
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case PreviousPtr::Type::DELTA: {
|
||||||
|
// Same as for deltas except don't align next but prev
|
||||||
|
auto cloned_transaction_it = std::ranges::find_if(cloned_transactions, [&ptr](const auto &elem) {
|
||||||
|
return elem.second->start_timestamp == ptr.delta->commit_info->start_or_commit_timestamp ||
|
||||||
|
elem.second->commit_info->start_or_commit_timestamp == ptr.delta->commit_info->start_or_commit_timestamp;
|
||||||
|
});
|
||||||
|
MG_ASSERT(cloned_transaction_it != cloned_transactions.end(), "Cloned transaction not found");
|
||||||
|
// Find cloned delta in delta list of cloned transaction
|
||||||
|
auto found_cloned_delta_it =
|
||||||
|
std::ranges::find_if(cloned_transaction_it->second->deltas,
|
||||||
|
[delta = ptr.delta](const auto &elem) { return elem.id == delta->id; });
|
||||||
|
MG_ASSERT(found_cloned_delta_it != cloned_transaction_it->second->deltas.end(),
|
||||||
|
"Delta with given id must exist!");
|
||||||
|
|
||||||
|
cloned.prev.Set(&*found_cloned_delta_it);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case PreviousPtr::Type::VERTEX: {
|
||||||
|
// The vertex was extracted and it is safe to reuse address
|
||||||
|
cloned.prev.Set(ptr.vertex);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case PreviousPtr::Type::EDGE: {
|
||||||
|
// We can never be here if we have properties on edge disabled
|
||||||
|
auto *cloned_edge = &*cloned_edges.find(ptr.edge->gid);
|
||||||
|
cloned.prev.Set(&cloned_edge->second);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
} // namespace memgraph::storage::v3
|
} // namespace memgraph::storage::v3
|
||||||
|
@ -88,6 +88,10 @@ class Splitter final {
|
|||||||
static void AdjustDeltaNext(const Delta &original, Delta &cloned,
|
static void AdjustDeltaNext(const Delta &original, Delta &cloned,
|
||||||
std::map<uint64_t, std::unique_ptr<Transaction>> &cloned_transactions);
|
std::map<uint64_t, std::unique_ptr<Transaction>> &cloned_transactions);
|
||||||
|
|
||||||
|
static void AdjustDeltaPrevPtr(const Delta &original, Delta &cloned,
|
||||||
|
std::map<uint64_t, std::unique_ptr<Transaction>> &cloned_transactions,
|
||||||
|
VertexContainer &cloned_vertices, EdgeContainer &cloned_edges);
|
||||||
|
|
||||||
const LabelId primary_label_;
|
const LabelId primary_label_;
|
||||||
VertexContainer &vertices_;
|
VertexContainer &vertices_;
|
||||||
EdgeContainer &edges_;
|
EdgeContainer &edges_;
|
||||||
|
@ -144,13 +144,14 @@ void AssertEqVertexContainer(const VertexContainer &actual, const VertexContaine
|
|||||||
}
|
}
|
||||||
|
|
||||||
void AssertEqDeltaLists(const std::list<Delta> &actual, const std::list<Delta> &expected) {
|
void AssertEqDeltaLists(const std::list<Delta> &actual, const std::list<Delta> &expected) {
|
||||||
ASSERT_EQ(actual.size(), expected.size());
|
EXPECT_EQ(actual.size(), expected.size());
|
||||||
auto actual_it = actual.begin();
|
auto actual_it = actual.begin();
|
||||||
auto expected_it = expected.begin();
|
auto expected_it = expected.begin();
|
||||||
while (actual_it != actual.end()) {
|
while (actual_it != actual.end()) {
|
||||||
EXPECT_EQ(actual_it->action, expected_it->action);
|
|
||||||
EXPECT_EQ(actual_it->id, expected_it->id);
|
EXPECT_EQ(actual_it->id, expected_it->id);
|
||||||
EXPECT_NE(&*actual_it, &*expected_it) << "Deltas must be different objects!";
|
EXPECT_EQ(actual_it->action, expected_it->action);
|
||||||
|
++actual_it;
|
||||||
|
++expected_it;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -210,10 +211,10 @@ TEST_F(ShardSplitTest, TestBasicSplitWithVertices) {
|
|||||||
std::list<Delta> expected_deltas;
|
std::list<Delta> expected_deltas;
|
||||||
expected_deltas.emplace_back(Delta::DeleteObjectTag{}, &commit_info, 4, 1);
|
expected_deltas.emplace_back(Delta::DeleteObjectTag{}, &commit_info, 4, 1);
|
||||||
expected_deltas.emplace_back(Delta::DeleteObjectTag{}, &commit_info, 5, 2);
|
expected_deltas.emplace_back(Delta::DeleteObjectTag{}, &commit_info, 5, 2);
|
||||||
expected_deltas.emplace_back(Delta::RemoveLabelTag{}, secondary_label, &commit_info, 7, 4);
|
|
||||||
expected_deltas.emplace_back(Delta::SetPropertyTag{}, secondary_property, PropertyValue(), &commit_info, 6, 4);
|
expected_deltas.emplace_back(Delta::SetPropertyTag{}, secondary_property, PropertyValue(), &commit_info, 6, 4);
|
||||||
|
expected_deltas.emplace_back(Delta::RemoveLabelTag{}, secondary_label, &commit_info, 7, 4);
|
||||||
expected_deltas.emplace_back(Delta::DeleteObjectTag{}, &commit_info, 8, 3);
|
expected_deltas.emplace_back(Delta::DeleteObjectTag{}, &commit_info, 8, 3);
|
||||||
// AssertEqDeltaLists(splitted_data.transactions.begin()->second->deltas, expected_deltas);
|
AssertEqDeltaLists(splitted_data.transactions.begin()->second->deltas, expected_deltas);
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST_F(ShardSplitTest, TestBasicSplitVerticesAndEdges) {
|
TEST_F(ShardSplitTest, TestBasicSplitVerticesAndEdges) {
|
||||||
@ -402,11 +403,11 @@ TEST_F(ShardSplitTest, TestBigSplit) {
|
|||||||
const auto split_value = pk / 2;
|
const auto split_value = pk / 2;
|
||||||
auto splitted_data = storage.PerformSplit({PropertyValue(split_value)}, 2);
|
auto splitted_data = storage.PerformSplit({PropertyValue(split_value)}, 2);
|
||||||
|
|
||||||
// EXPECT_EQ(splitted_data.vertices.size(), 100000);
|
EXPECT_EQ(splitted_data.vertices.size(), 10000);
|
||||||
// EXPECT_EQ(splitted_data.edges->size(), 50000);
|
EXPECT_EQ(splitted_data.edges->size(), 5000);
|
||||||
// EXPECT_EQ(splitted_data.transactions.size(), 50000);
|
EXPECT_EQ(splitted_data.transactions.size(), 5000);
|
||||||
// EXPECT_EQ(splitted_data.label_indices.size(), 0);
|
EXPECT_EQ(splitted_data.label_indices.size(), 0);
|
||||||
// EXPECT_EQ(splitted_data.label_property_indices.size(), 1);
|
EXPECT_EQ(splitted_data.label_property_indices.size(), 1);
|
||||||
|
|
||||||
AssertSplittedShard(std::move(splitted_data), split_value);
|
AssertSplittedShard(std::move(splitted_data), split_value);
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user