diff --git a/src/storage/v3/delta.hpp b/src/storage/v3/delta.hpp index b8bea789d..8c1b85ab5 100644 --- a/src/storage/v3/delta.hpp +++ b/src/storage/v3/delta.hpp @@ -241,6 +241,8 @@ struct Delta { } } + friend bool operator==(const Delta &lhs, const Delta &rhs) noexcept { return lhs.id == rhs.id; } + Action action; uint64_t id; // TODO: optimize with in-place copy diff --git a/src/storage/v3/indices.hpp b/src/storage/v3/indices.hpp index 99023183b..6023ea9e3 100644 --- a/src/storage/v3/indices.hpp +++ b/src/storage/v3/indices.hpp @@ -141,8 +141,6 @@ class LabelIndex { [[maybe_unused]] const auto &[inserted_entry_it, inserted, node] = cloned_indices_container.insert(index.extract(entry_it)); MG_ASSERT(inserted, "Failed to extract index entry!"); - - // vertex_entry_map[index_type_val].insert({inserted_entry_it->vertex, inserted_entry_it}); } entry_it = next_entry_it; } @@ -286,7 +284,6 @@ class LabelPropertyIndex { [[maybe_unused]] const auto &[inserted_entry_it, inserted, node] = cloned_index_container.insert(index.extract(entry_it)); MG_ASSERT(inserted, "Failed to extract index entry!"); - // vertex_entry_map[index_type_val].insert({inserted_entry_it->vertex, inserted_entry_it}); } entry_it = next_entry_it; } diff --git a/src/storage/v3/splitter.cpp b/src/storage/v3/splitter.cpp index 3e387fe4d..40c55c154 100644 --- a/src/storage/v3/splitter.cpp +++ b/src/storage/v3/splitter.cpp @@ -140,37 +140,50 @@ std::map> Splitter::CollectTransactions( return transactions; } -void PruneDeltas(std::map> &cloned_transactions, const PrimaryKey &split_key) { - // Remove delta chains wh - auto cloned_transaction_it = cloned_transactions.begin(); - while (cloned_transaction_it != cloned_transactions.end()) { - auto cloned_delta_it = cloned_transaction_it->second->deltas.begin(); +void PruneDeltas(Transaction &cloned_transaction, std::map> &cloned_transactions, + const PrimaryKey &split_key) { + // Remove delta chains that don't point to objects on splitted shard + auto cloned_delta_it = cloned_transaction.deltas.begin(); - if (const auto prev = cloned_delta_it->prev.Get(); - prev.type == PreviousPtr::Type::VERTEX && prev.vertex->first < split_key) { - // We can remove this delta chain - auto *current_next_delta = cloned_delta_it->next; - cloned_transaction_it->second->deltas.remove_if( - [cloned_delta_it](const auto &delta) { return delta.id == cloned_delta_it->id; }); + while (cloned_delta_it != cloned_transaction.deltas.end()) { + const auto prev = cloned_delta_it->prev.Get(); + switch (prev.type) { + case PreviousPtr::Type::DELTA: + case PreviousPtr::Type::NULLPTR: + ++cloned_delta_it; + break; + case PreviousPtr::Type::VERTEX: { + if (prev.vertex->first < split_key) { + // We can remove this delta chain + auto *current_next_delta = cloned_delta_it->next; + cloned_delta_it = cloned_transaction.deltas.erase(cloned_delta_it); - while (current_next_delta != nullptr) { - auto *next_delta = current_next_delta->next; - // Find next delta transaction delta list - auto current_transaction_it = std::ranges::find_if( - cloned_transactions, [&start_or_commit_timestamp = cloned_delta_it->commit_info->start_or_commit_timestamp]( - const auto &transaction) { - return transaction.second->start_timestamp == start_or_commit_timestamp || - transaction.second->commit_info->start_or_commit_timestamp == start_or_commit_timestamp; - }); - // Remove it - current_transaction_it->second->deltas.remove_if( - [current_next_delta](const auto &delta) { return delta.id == current_next_delta->id; }); + while (current_next_delta != nullptr) { + auto *next_delta = current_next_delta->next; + // Find next delta transaction delta list + auto current_transaction_it = std::ranges::find_if( + cloned_transactions, + [&start_or_commit_timestamp = + current_next_delta->commit_info->start_or_commit_timestamp](const auto &transaction) { + return transaction.second->start_timestamp == start_or_commit_timestamp || + transaction.second->commit_info->start_or_commit_timestamp == start_or_commit_timestamp; + }); + MG_ASSERT(current_transaction_it != cloned_transactions.end(), "Error when pruning deltas!"); + // Remove it + current_transaction_it->second->deltas.remove_if( + [¤t_next_delta = *current_next_delta](const auto &delta) { return delta == current_next_delta; }); - current_next_delta = next_delta; + current_next_delta = next_delta; + } + } else { + ++cloned_delta_it; + } + break; } + case PreviousPtr::Type::EDGE: + ++cloned_delta_it; + break; } - - // while(cloned_delta_it != ) } } @@ -178,11 +191,13 @@ void Splitter::AdjustClonedTransactions(std::map> &cloned_transactions, VertexContainer &cloned_vertices, EdgeContainer &cloned_edges, - const PrimaryKey &split_key) { - // Align next and prev in deltas + const PrimaryKey & /*split_key*/) { auto delta_it = transaction.deltas.begin(); auto cloned_delta_it = cloned_transaction.deltas.begin(); while (delta_it != transaction.deltas.end()) { // We can safely ignore deltas which are not head of delta chain // Dont' adjust delta chain that points to irrelevant data vertices/edges - if (const auto delta_prev = delta_it->prev.Get(); - !IsDeltaHeadOfChain(delta_prev.type) && !DoesPrevPtrPointsToSplittedData(delta_prev, split_key)) { + if (const auto delta_prev = delta_it->prev.Get(); !IsDeltaHeadOfChain(delta_prev.type)) { ++delta_it; ++cloned_delta_it; continue; @@ -215,50 +228,16 @@ void Splitter::AdjustClonedTransaction(Transaction &cloned_transaction, const Tr auto *cloned_delta = &*cloned_delta_it; while (delta->next != nullptr) { // Align next ptr - // Get cloned_delta->next transaction, using delta->next original transaction AdjustDeltaNext(*delta, *cloned_delta, cloned_transactions); // Align prev ptr - auto ptr = delta->prev.Get(); - switch (ptr.type) { - case PreviousPtr::Type::NULLPTR: { - // noop - break; - } - case PreviousPtr::Type::DELTA: { - // Same as for deltas except don't align next but prev - auto cloned_transaction_it = std::ranges::find_if(cloned_transactions, [&ptr](const auto &elem) { - return elem.second->start_timestamp == ptr.delta->commit_info->start_or_commit_timestamp || - elem.second->commit_info->start_or_commit_timestamp == - ptr.delta->commit_info->start_or_commit_timestamp; - }); - MG_ASSERT(cloned_transaction_it != cloned_transactions.end(), "Cloned transaction not found"); - // Find cloned delta in delta list of cloned transaction - auto found_cloned_delta_it = - std::ranges::find_if(cloned_transaction_it->second->deltas, - [delta = ptr.delta](const auto &elem) { return elem.id == delta->id; }); - MG_ASSERT(found_cloned_delta_it != cloned_transaction_it->second->deltas.end(), - "Delta with given id must exist!"); - - cloned_delta->prev.Set(&*found_cloned_delta_it); - break; - } - case PreviousPtr::Type::VERTEX: { - auto *cloned_vertex = &*cloned_vertices.find(ptr.vertex->first); - cloned_delta->prev.Set(cloned_vertex); - break; - } - case PreviousPtr::Type::EDGE: { - // We can never be here if we have properties on edge disabled - auto *cloned_edge = &*cloned_edges.find(ptr.edge->gid); - cloned_delta->prev.Set(&cloned_edge->second); - break; - } - }; + AdjustDeltaPrevPtr(*delta, *cloned_delta, cloned_transactions, cloned_vertices, cloned_edges); cloned_delta = cloned_delta->next; delta = delta->next; } + // Align prev ptr + AdjustDeltaPrevPtr(*delta, *cloned_delta, cloned_transactions, cloned_vertices, cloned_edges); ++delta_it; ++cloned_delta_it; @@ -283,4 +262,44 @@ void Splitter::AdjustDeltaNext(const Delta &original, Delta &cloned, cloned.next = &*found_cloned_delta_it; } +void Splitter::AdjustDeltaPrevPtr(const Delta &original, Delta &cloned, + std::map> &cloned_transactions, + VertexContainer & /*cloned_vertices*/, EdgeContainer &cloned_edges) { + auto ptr = original.prev.Get(); + switch (ptr.type) { + case PreviousPtr::Type::NULLPTR: { + // noop + break; + } + case PreviousPtr::Type::DELTA: { + // Same as for deltas except don't align next but prev + auto cloned_transaction_it = std::ranges::find_if(cloned_transactions, [&ptr](const auto &elem) { + return elem.second->start_timestamp == ptr.delta->commit_info->start_or_commit_timestamp || + elem.second->commit_info->start_or_commit_timestamp == ptr.delta->commit_info->start_or_commit_timestamp; + }); + MG_ASSERT(cloned_transaction_it != cloned_transactions.end(), "Cloned transaction not found"); + // Find cloned delta in delta list of cloned transaction + auto found_cloned_delta_it = + std::ranges::find_if(cloned_transaction_it->second->deltas, + [delta = ptr.delta](const auto &elem) { return elem.id == delta->id; }); + MG_ASSERT(found_cloned_delta_it != cloned_transaction_it->second->deltas.end(), + "Delta with given id must exist!"); + + cloned.prev.Set(&*found_cloned_delta_it); + break; + } + case PreviousPtr::Type::VERTEX: { + // The vertex was extracted and it is safe to reuse address + cloned.prev.Set(ptr.vertex); + break; + } + case PreviousPtr::Type::EDGE: { + // We can never be here if we have properties on edge disabled + auto *cloned_edge = &*cloned_edges.find(ptr.edge->gid); + cloned.prev.Set(&cloned_edge->second); + break; + } + }; +} + } // namespace memgraph::storage::v3 diff --git a/src/storage/v3/splitter.hpp b/src/storage/v3/splitter.hpp index 2053303e3..193b9d98b 100644 --- a/src/storage/v3/splitter.hpp +++ b/src/storage/v3/splitter.hpp @@ -88,6 +88,10 @@ class Splitter final { static void AdjustDeltaNext(const Delta &original, Delta &cloned, std::map> &cloned_transactions); + static void AdjustDeltaPrevPtr(const Delta &original, Delta &cloned, + std::map> &cloned_transactions, + VertexContainer &cloned_vertices, EdgeContainer &cloned_edges); + const LabelId primary_label_; VertexContainer &vertices_; EdgeContainer &edges_; diff --git a/tests/unit/storage_v3_shard_split.cpp b/tests/unit/storage_v3_shard_split.cpp index 69a69e52e..90bd599d6 100644 --- a/tests/unit/storage_v3_shard_split.cpp +++ b/tests/unit/storage_v3_shard_split.cpp @@ -144,13 +144,14 @@ void AssertEqVertexContainer(const VertexContainer &actual, const VertexContaine } void AssertEqDeltaLists(const std::list &actual, const std::list &expected) { - ASSERT_EQ(actual.size(), expected.size()); + EXPECT_EQ(actual.size(), expected.size()); auto actual_it = actual.begin(); auto expected_it = expected.begin(); while (actual_it != actual.end()) { - EXPECT_EQ(actual_it->action, expected_it->action); EXPECT_EQ(actual_it->id, expected_it->id); - EXPECT_NE(&*actual_it, &*expected_it) << "Deltas must be different objects!"; + EXPECT_EQ(actual_it->action, expected_it->action); + ++actual_it; + ++expected_it; } } @@ -210,10 +211,10 @@ TEST_F(ShardSplitTest, TestBasicSplitWithVertices) { std::list expected_deltas; expected_deltas.emplace_back(Delta::DeleteObjectTag{}, &commit_info, 4, 1); expected_deltas.emplace_back(Delta::DeleteObjectTag{}, &commit_info, 5, 2); - expected_deltas.emplace_back(Delta::RemoveLabelTag{}, secondary_label, &commit_info, 7, 4); expected_deltas.emplace_back(Delta::SetPropertyTag{}, secondary_property, PropertyValue(), &commit_info, 6, 4); + expected_deltas.emplace_back(Delta::RemoveLabelTag{}, secondary_label, &commit_info, 7, 4); expected_deltas.emplace_back(Delta::DeleteObjectTag{}, &commit_info, 8, 3); - // AssertEqDeltaLists(splitted_data.transactions.begin()->second->deltas, expected_deltas); + AssertEqDeltaLists(splitted_data.transactions.begin()->second->deltas, expected_deltas); } TEST_F(ShardSplitTest, TestBasicSplitVerticesAndEdges) { @@ -402,11 +403,11 @@ TEST_F(ShardSplitTest, TestBigSplit) { const auto split_value = pk / 2; auto splitted_data = storage.PerformSplit({PropertyValue(split_value)}, 2); - // EXPECT_EQ(splitted_data.vertices.size(), 100000); - // EXPECT_EQ(splitted_data.edges->size(), 50000); - // EXPECT_EQ(splitted_data.transactions.size(), 50000); - // EXPECT_EQ(splitted_data.label_indices.size(), 0); - // EXPECT_EQ(splitted_data.label_property_indices.size(), 1); + EXPECT_EQ(splitted_data.vertices.size(), 10000); + EXPECT_EQ(splitted_data.edges->size(), 5000); + EXPECT_EQ(splitted_data.transactions.size(), 5000); + EXPECT_EQ(splitted_data.label_indices.size(), 0); + EXPECT_EQ(splitted_data.label_property_indices.size(), 1); AssertSplittedShard(std::move(splitted_data), split_value); }