Fix deltas from commited transacations

This commit is contained in:
jbajic 2023-01-16 13:44:57 +01:00
parent 6de683d7f9
commit ff34bcf295
2 changed files with 40 additions and 5 deletions

View File

@ -1122,13 +1122,19 @@ void Shard::AlignClonedTransaction(Transaction &cloned_transaction, const Transa
// auto *prev = &delta_it->prev;
// auto *cloned_prev = &cloned_delta_it->prev;
auto *delta = &*delta_it;
const auto *delta = &*delta_it;
auto *cloned_delta = &*cloned_delta_it;
while (delta != nullptr) {
// Align delta
cloned_delta->next = &*std::ranges::find_if(
cloned_transactions.at(delta->commit_info->start_or_commit_timestamp.logical_id).deltas,
[delta](const auto &elem) { return elem.uuid == delta->uuid; });
// Align delta, while ignoring deltas whose transactions have commited,
// or aborted
if (cloned_transactions.contains(delta->commit_info->start_or_commit_timestamp.logical_id)) {
cloned_delta->next = &*std::ranges::find_if(
cloned_transactions.at(delta->commit_info->start_or_commit_timestamp.logical_id).deltas,
[delta](const auto &elem) { return elem.uuid == delta->uuid; });
} else {
delta = delta->next;
continue;
}
// Align prev ptr
auto ptr = delta->prev.Get();
switch (ptr.type) {

View File

@ -139,4 +139,33 @@ TEST_F(ShardSplitTest, TestBasicSplitAfterCommit) {
EXPECT_EQ(splitted_data.transactions.size(), 0);
}
TEST_F(ShardSplitTest, TestBasicSplitAfterCommit2) {
{
auto acc = storage.Access(GetNextHlc());
EXPECT_FALSE(acc.CreateVertexAndValidate({}, {PropertyValue(1)}, {}).HasError());
EXPECT_FALSE(acc.CreateVertexAndValidate({}, {PropertyValue(2)}, {}).HasError());
EXPECT_FALSE(acc.CreateVertexAndValidate({}, {PropertyValue(3)}, {}).HasError());
EXPECT_FALSE(acc.CreateVertexAndValidate({}, {PropertyValue(4)}, {}).HasError());
EXPECT_FALSE(acc.CreateVertexAndValidate({}, {PropertyValue(5)}, {}).HasError());
EXPECT_FALSE(acc.CreateVertexAndValidate({}, {PropertyValue(6)}, {}).HasError());
acc.Commit(GetNextHlc());
}
auto acc = storage.Access(GetNextHlc());
EXPECT_FALSE(acc.CreateEdge(VertexId{primary_label, PrimaryKey{PropertyValue(1)}},
VertexId{primary_label, PrimaryKey{PropertyValue(2)}}, edge_type_id, Gid::FromUint(0))
.HasError());
EXPECT_FALSE(acc.CreateEdge(VertexId{primary_label, PrimaryKey{PropertyValue(1)}},
VertexId{primary_label, PrimaryKey{PropertyValue(5)}}, edge_type_id, Gid::FromUint(1))
.HasError());
EXPECT_FALSE(acc.CreateEdge(VertexId{primary_label, PrimaryKey{PropertyValue(4)}},
VertexId{primary_label, PrimaryKey{PropertyValue(6)}}, edge_type_id, Gid::FromUint(2))
.HasError());
auto splitted_data = storage.PerformSplit({PropertyValue(4)});
EXPECT_EQ(splitted_data.vertices.size(), 3);
EXPECT_EQ(splitted_data.edges->size(), 2);
EXPECT_EQ(splitted_data.transactions.size(), 1);
}
} // namespace memgraph::storage::v3::tests