Fix transaction split
This commit is contained in:
parent
791972a6b8
commit
db13ee96b6
@ -1058,7 +1058,7 @@ std::optional<SplitInfo> Shard::ShouldSplit() const noexcept {
|
||||
|
||||
void Shard::ScanDeltas(std::set<uint64_t> &collected_transactions_start_id, Delta *delta) const {
|
||||
while (delta != nullptr) {
|
||||
collected_transactions_start_id.insert(delta->command_id);
|
||||
collected_transactions_start_id.insert(delta->commit_info->start_or_commit_timestamp.logical_id);
|
||||
delta = delta->next;
|
||||
}
|
||||
}
|
||||
@ -1126,8 +1126,9 @@ void Shard::AlignClonedTransaction(Transaction &cloned_transaction, const Transa
|
||||
auto *cloned_delta = &*cloned_delta_it;
|
||||
while (delta != nullptr) {
|
||||
// Align delta
|
||||
cloned_delta->next = &*std::ranges::find_if(cloned_transactions.at(delta->command_id).deltas,
|
||||
[delta](const auto &elem) { return elem.uuid == delta->uuid; });
|
||||
cloned_delta->next = &*std::ranges::find_if(
|
||||
cloned_transactions.at(delta->commit_info->start_or_commit_timestamp.logical_id).deltas,
|
||||
[delta](const auto &elem) { return elem.uuid == delta->uuid; });
|
||||
// Align prev ptr
|
||||
auto ptr = delta->prev.Get();
|
||||
switch (ptr.type) {
|
||||
@ -1140,6 +1141,8 @@ void Shard::AlignClonedTransaction(Transaction &cloned_transaction, const Transa
|
||||
break;
|
||||
}
|
||||
case PreviousPtr::Type::VERTEX: {
|
||||
// What if the vertex is already moved to garbage collection...
|
||||
// Make test when you have deleted vertex
|
||||
auto *cloned_vertex = &*cloned_vertices.find(ptr.vertex->first);
|
||||
cloned_delta->prev.Set(cloned_vertex);
|
||||
break;
|
||||
@ -1176,7 +1179,7 @@ std::map<uint64_t, Transaction> Shard::CollectTransactions(const std::set<uint64
|
||||
EdgeContainer &cloned_edges) {
|
||||
std::map<uint64_t, Transaction> transactions;
|
||||
for (const auto commit_start : collected_transactions_start_id) {
|
||||
transactions.insert({commit_start, start_logical_id_to_transaction_[commit_start]->Clone()});
|
||||
transactions.insert({commit_start, start_logical_id_to_transaction_.at(commit_start)->Clone()});
|
||||
}
|
||||
// It is necessary to clone all the transactions first so we have new addresses
|
||||
// for deltas, before doing alignment of deltas and prev_ptr
|
||||
|
@ -31,12 +31,12 @@ struct CommitInfo {
|
||||
};
|
||||
|
||||
struct Transaction {
|
||||
Transaction(coordinator::Hlc start_timestamp, CommitInfo new_commit_info, uint64_t command_id, bool must_abort,
|
||||
bool is_aborted, IsolationLevel isolation_level)
|
||||
Transaction(coordinator::Hlc start_timestamp, CommitInfo new_commit_info, std::list<Delta> deltas,
|
||||
uint64_t command_id, bool must_abort, bool is_aborted, IsolationLevel isolation_level)
|
||||
: start_timestamp{start_timestamp},
|
||||
commit_info{std::make_unique<CommitInfo>(new_commit_info)},
|
||||
command_id(command_id),
|
||||
deltas(CopyDeltas(commit_info.get())),
|
||||
deltas(std::move(deltas)),
|
||||
must_abort(must_abort),
|
||||
is_aborted(is_aborted),
|
||||
isolation_level(isolation_level){};
|
||||
@ -108,7 +108,8 @@ struct Transaction {
|
||||
|
||||
// This does not solve the whole problem of copying deltas
|
||||
Transaction Clone() const {
|
||||
return {start_timestamp, *commit_info, command_id, must_abort, is_aborted, isolation_level};
|
||||
return {start_timestamp, *commit_info, CopyDeltas(commit_info.get()), command_id, must_abort,
|
||||
is_aborted, isolation_level};
|
||||
}
|
||||
|
||||
coordinator::Hlc start_timestamp;
|
||||
|
@ -87,4 +87,29 @@ TEST_F(ShardSplitTest, TestBasicSplitVerticesAndEdges) {
|
||||
EXPECT_EQ(splitted_data.transactions.size(), 0);
|
||||
}
|
||||
|
||||
TEST_F(ShardSplitTest, TestBasicSplit) {
|
||||
auto acc = storage.Access(GetNextHlc());
|
||||
EXPECT_FALSE(acc.CreateVertexAndValidate({}, {PropertyValue(1)}, {}).HasError());
|
||||
EXPECT_FALSE(acc.CreateVertexAndValidate({}, {PropertyValue(2)}, {}).HasError());
|
||||
EXPECT_FALSE(acc.CreateVertexAndValidate({}, {PropertyValue(3)}, {}).HasError());
|
||||
EXPECT_FALSE(acc.CreateVertexAndValidate({}, {PropertyValue(4)}, {}).HasError());
|
||||
EXPECT_FALSE(acc.CreateVertexAndValidate({}, {PropertyValue(5)}, {}).HasError());
|
||||
EXPECT_FALSE(acc.CreateVertexAndValidate({}, {PropertyValue(6)}, {}).HasError());
|
||||
|
||||
EXPECT_FALSE(acc.CreateEdge(VertexId{primary_label, PrimaryKey{PropertyValue(1)}},
|
||||
VertexId{primary_label, PrimaryKey{PropertyValue(2)}}, edge_type_id, Gid::FromUint(0))
|
||||
.HasError());
|
||||
EXPECT_FALSE(acc.CreateEdge(VertexId{primary_label, PrimaryKey{PropertyValue(1)}},
|
||||
VertexId{primary_label, PrimaryKey{PropertyValue(5)}}, edge_type_id, Gid::FromUint(1))
|
||||
.HasError());
|
||||
EXPECT_FALSE(acc.CreateEdge(VertexId{primary_label, PrimaryKey{PropertyValue(4)}},
|
||||
VertexId{primary_label, PrimaryKey{PropertyValue(6)}}, edge_type_id, Gid::FromUint(2))
|
||||
.HasError());
|
||||
|
||||
auto splitted_data = storage.PerformSplit({PropertyValue(4)});
|
||||
EXPECT_EQ(splitted_data.vertices.size(), 3);
|
||||
EXPECT_EQ(splitted_data.edges->size(), 2);
|
||||
EXPECT_EQ(splitted_data.transactions.size(), 1);
|
||||
}
|
||||
|
||||
} // namespace memgraph::storage::v3::tests
|
||||
|
Loading…
Reference in New Issue
Block a user