diff --git a/src/storage/v3/shard.cpp b/src/storage/v3/shard.cpp index bf89ed58c..803678c8a 100644 --- a/src/storage/v3/shard.cpp +++ b/src/storage/v3/shard.cpp @@ -1058,7 +1058,7 @@ std::optional<SplitInfo> Shard::ShouldSplit() const noexcept { void Shard::ScanDeltas(std::set<uint64_t> &collected_transactions_start_id, Delta *delta) const { while (delta != nullptr) { - collected_transactions_start_id.insert(delta->command_id); + collected_transactions_start_id.insert(delta->commit_info->start_or_commit_timestamp.logical_id); delta = delta->next; } } @@ -1126,8 +1126,9 @@ void Shard::AlignClonedTransaction(Transaction &cloned_transaction, const Transa auto *cloned_delta = &*cloned_delta_it; while (delta != nullptr) { // Align delta - cloned_delta->next = &*std::ranges::find_if(cloned_transactions.at(delta->command_id).deltas, - [delta](const auto &elem) { return elem.uuid == delta->uuid; }); + cloned_delta->next = &*std::ranges::find_if( + cloned_transactions.at(delta->commit_info->start_or_commit_timestamp.logical_id).deltas, + [delta](const auto &elem) { return elem.uuid == delta->uuid; }); // Align prev ptr auto ptr = delta->prev.Get(); switch (ptr.type) { @@ -1140,6 +1141,8 @@ void Shard::AlignClonedTransaction(Transaction &cloned_transaction, const Transa break; } case PreviousPtr::Type::VERTEX: { + // What if the vertex is already moved to garbage collection... + // Make test when you have deleted vertex auto *cloned_vertex = &*cloned_vertices.find(ptr.vertex->first); cloned_delta->prev.Set(cloned_vertex); break; @@ -1176,7 +1179,7 @@ std::map<uint64_t, Transaction> Shard::CollectTransactions(const std::set<uint64 EdgeContainer &cloned_edges) { std::map<uint64_t, Transaction> transactions; for (const auto commit_start : collected_transactions_start_id) { - transactions.insert({commit_start, start_logical_id_to_transaction_[commit_start]->Clone()}); + transactions.insert({commit_start, start_logical_id_to_transaction_.at(commit_start)->Clone()}); } // It is necessary to clone all the transactions first so we have new addresses // for deltas, before doing alignment of deltas and prev_ptr diff --git a/src/storage/v3/transaction.hpp b/src/storage/v3/transaction.hpp index dba65067e..6c396d969 100644 --- a/src/storage/v3/transaction.hpp +++ b/src/storage/v3/transaction.hpp @@ -31,12 +31,12 @@ struct CommitInfo { }; struct Transaction { - Transaction(coordinator::Hlc start_timestamp, CommitInfo new_commit_info, uint64_t command_id, bool must_abort, - bool is_aborted, IsolationLevel isolation_level) + Transaction(coordinator::Hlc start_timestamp, CommitInfo new_commit_info, std::list<Delta> deltas, + uint64_t command_id, bool must_abort, bool is_aborted, IsolationLevel isolation_level) : start_timestamp{start_timestamp}, commit_info{std::make_unique<CommitInfo>(new_commit_info)}, command_id(command_id), - deltas(CopyDeltas(commit_info.get())), + deltas(std::move(deltas)), must_abort(must_abort), is_aborted(is_aborted), isolation_level(isolation_level){}; @@ -108,7 +108,8 @@ struct Transaction { // This does not solve the whole problem of copying deltas Transaction Clone() const { - return {start_timestamp, *commit_info, command_id, must_abort, is_aborted, isolation_level}; + return {start_timestamp, *commit_info, CopyDeltas(commit_info.get()), command_id, must_abort, + is_aborted, isolation_level}; } coordinator::Hlc start_timestamp; diff --git a/tests/unit/storage_v3_shard_split.cpp b/tests/unit/storage_v3_shard_split.cpp index 716734433..ebfca2921 100644 --- a/tests/unit/storage_v3_shard_split.cpp +++ b/tests/unit/storage_v3_shard_split.cpp @@ -87,4 +87,29 @@ TEST_F(ShardSplitTest, TestBasicSplitVerticesAndEdges) { EXPECT_EQ(splitted_data.transactions.size(), 0); } +TEST_F(ShardSplitTest, TestBasicSplit) { + auto acc = storage.Access(GetNextHlc()); + EXPECT_FALSE(acc.CreateVertexAndValidate({}, {PropertyValue(1)}, {}).HasError()); + EXPECT_FALSE(acc.CreateVertexAndValidate({}, {PropertyValue(2)}, {}).HasError()); + EXPECT_FALSE(acc.CreateVertexAndValidate({}, {PropertyValue(3)}, {}).HasError()); + EXPECT_FALSE(acc.CreateVertexAndValidate({}, {PropertyValue(4)}, {}).HasError()); + EXPECT_FALSE(acc.CreateVertexAndValidate({}, {PropertyValue(5)}, {}).HasError()); + EXPECT_FALSE(acc.CreateVertexAndValidate({}, {PropertyValue(6)}, {}).HasError()); + + EXPECT_FALSE(acc.CreateEdge(VertexId{primary_label, PrimaryKey{PropertyValue(1)}}, + VertexId{primary_label, PrimaryKey{PropertyValue(2)}}, edge_type_id, Gid::FromUint(0)) + .HasError()); + EXPECT_FALSE(acc.CreateEdge(VertexId{primary_label, PrimaryKey{PropertyValue(1)}}, + VertexId{primary_label, PrimaryKey{PropertyValue(5)}}, edge_type_id, Gid::FromUint(1)) + .HasError()); + EXPECT_FALSE(acc.CreateEdge(VertexId{primary_label, PrimaryKey{PropertyValue(4)}}, + VertexId{primary_label, PrimaryKey{PropertyValue(6)}}, edge_type_id, Gid::FromUint(2)) + .HasError()); + + auto splitted_data = storage.PerformSplit({PropertyValue(4)}); + EXPECT_EQ(splitted_data.vertices.size(), 3); + EXPECT_EQ(splitted_data.edges->size(), 2); + EXPECT_EQ(splitted_data.transactions.size(), 1); +} + } // namespace memgraph::storage::v3::tests