Add delta check to tests
This commit is contained in:
parent
28624aaa88
commit
0ae5357399
@ -132,11 +132,12 @@ inline bool operator==(const PreviousPtr::Pointer &a, const PreviousPtr::Pointer
|
||||
|
||||
inline bool operator!=(const PreviousPtr::Pointer &a, const PreviousPtr::Pointer &b) { return !(a == b); }
|
||||
|
||||
struct Delta {
|
||||
// Needed for splits
|
||||
// TODO Replace this with int identifier
|
||||
boost::uuids::uuid uuid{boost::uuids::uuid()};
|
||||
inline uint64_t GetNextDeltaUUID() noexcept {
|
||||
static uint64_t uuid{0};
|
||||
return ++uuid;
|
||||
}
|
||||
|
||||
struct Delta {
|
||||
enum class Action : uint8_t {
|
||||
// Used for both Vertex and Edge
|
||||
DELETE_OBJECT,
|
||||
@ -166,24 +167,37 @@ struct Delta {
|
||||
struct RemoveOutEdgeTag {};
|
||||
|
||||
Delta(DeleteObjectTag /*unused*/, CommitInfo *commit_info, uint64_t command_id)
|
||||
: action(Action::DELETE_OBJECT), commit_info(commit_info), command_id(command_id) {}
|
||||
: action(Action::DELETE_OBJECT), uuid(GetNextDeltaUUID()), commit_info(commit_info), command_id(command_id) {}
|
||||
|
||||
Delta(RecreateObjectTag /*unused*/, CommitInfo *commit_info, uint64_t command_id)
|
||||
: action(Action::RECREATE_OBJECT), commit_info(commit_info), command_id(command_id) {}
|
||||
: action(Action::RECREATE_OBJECT), uuid(GetNextDeltaUUID()), commit_info(commit_info), command_id(command_id) {}
|
||||
|
||||
Delta(AddLabelTag /*unused*/, LabelId label, CommitInfo *commit_info, uint64_t command_id)
|
||||
: action(Action::ADD_LABEL), commit_info(commit_info), command_id(command_id), label(label) {}
|
||||
: action(Action::ADD_LABEL),
|
||||
uuid(GetNextDeltaUUID()),
|
||||
commit_info(commit_info),
|
||||
command_id(command_id),
|
||||
label(label) {}
|
||||
|
||||
Delta(RemoveLabelTag /*unused*/, LabelId label, CommitInfo *commit_info, uint64_t command_id)
|
||||
: action(Action::REMOVE_LABEL), commit_info(commit_info), command_id(command_id), label(label) {}
|
||||
: action(Action::REMOVE_LABEL),
|
||||
uuid(GetNextDeltaUUID()),
|
||||
commit_info(commit_info),
|
||||
command_id(command_id),
|
||||
label(label) {}
|
||||
|
||||
Delta(SetPropertyTag /*unused*/, PropertyId key, const PropertyValue &value, CommitInfo *commit_info,
|
||||
uint64_t command_id)
|
||||
: action(Action::SET_PROPERTY), commit_info(commit_info), command_id(command_id), property({key, value}) {}
|
||||
: action(Action::SET_PROPERTY),
|
||||
uuid(GetNextDeltaUUID()),
|
||||
commit_info(commit_info),
|
||||
command_id(command_id),
|
||||
property({key, value}) {}
|
||||
|
||||
Delta(AddInEdgeTag /*unused*/, EdgeTypeId edge_type, VertexId vertex_id, EdgeRef edge, CommitInfo *commit_info,
|
||||
uint64_t command_id)
|
||||
: action(Action::ADD_IN_EDGE),
|
||||
uuid(GetNextDeltaUUID()),
|
||||
commit_info(commit_info),
|
||||
command_id(command_id),
|
||||
vertex_edge({edge_type, std::move(vertex_id), edge}) {}
|
||||
@ -191,6 +205,7 @@ struct Delta {
|
||||
Delta(AddOutEdgeTag /*unused*/, EdgeTypeId edge_type, VertexId vertex_id, EdgeRef edge, CommitInfo *commit_info,
|
||||
uint64_t command_id)
|
||||
: action(Action::ADD_OUT_EDGE),
|
||||
uuid(GetNextDeltaUUID()),
|
||||
commit_info(commit_info),
|
||||
command_id(command_id),
|
||||
vertex_edge({edge_type, std::move(vertex_id), edge}) {}
|
||||
@ -198,6 +213,7 @@ struct Delta {
|
||||
Delta(RemoveInEdgeTag /*unused*/, EdgeTypeId edge_type, VertexId vertex_id, EdgeRef edge, CommitInfo *commit_info,
|
||||
uint64_t command_id)
|
||||
: action(Action::REMOVE_IN_EDGE),
|
||||
uuid(GetNextDeltaUUID()),
|
||||
commit_info(commit_info),
|
||||
command_id(command_id),
|
||||
vertex_edge({edge_type, std::move(vertex_id), edge}) {}
|
||||
@ -205,6 +221,7 @@ struct Delta {
|
||||
Delta(RemoveOutEdgeTag /*unused*/, EdgeTypeId edge_type, VertexId vertex_id, EdgeRef edge, CommitInfo *commit_info,
|
||||
uint64_t command_id)
|
||||
: action(Action::REMOVE_OUT_EDGE),
|
||||
uuid(GetNextDeltaUUID()),
|
||||
commit_info(commit_info),
|
||||
command_id(command_id),
|
||||
vertex_edge({edge_type, std::move(vertex_id), edge}) {}
|
||||
@ -234,7 +251,7 @@ struct Delta {
|
||||
}
|
||||
|
||||
Action action;
|
||||
|
||||
uint64_t uuid;
|
||||
// TODO: optimize with in-place copy
|
||||
CommitInfo *commit_info;
|
||||
uint64_t command_id;
|
||||
|
@ -159,18 +159,17 @@ void Splitter::AlignClonedTransaction(Transaction &cloned_transaction, const Tra
|
||||
auto delta_it = transaction.deltas.begin();
|
||||
auto cloned_delta_it = cloned_transaction.deltas.begin();
|
||||
while (delta_it != transaction.deltas.end() && cloned_delta_it != cloned_transaction.deltas.end()) {
|
||||
MG_ASSERT(delta_it->uuid == cloned_delta_it->uuid, "The order of deltas is not correct");
|
||||
// Find appropriate prev and delta->next for cloned deltas
|
||||
|
||||
const auto *delta = &*delta_it;
|
||||
auto *cloned_delta = &*cloned_delta_it;
|
||||
while (delta != nullptr) {
|
||||
// Align delta, while ignoring deltas whose transactions have commited,
|
||||
// or aborted
|
||||
if (cloned_transactions.contains(delta->commit_info->start_or_commit_timestamp.logical_id)) {
|
||||
cloned_delta->next = &*std::ranges::find_if(
|
||||
auto *found_delta_it = &*std::ranges::find_if(
|
||||
cloned_transactions.at(delta->commit_info->start_or_commit_timestamp.logical_id).deltas,
|
||||
[delta](const auto &elem) { return elem.uuid == delta->uuid; });
|
||||
MG_ASSERT(found_delta_it, "Delta with given uuid must exist!");
|
||||
cloned_delta->next = &*found_delta_it;
|
||||
} else {
|
||||
delta = delta->next;
|
||||
continue;
|
||||
|
@ -9,27 +9,41 @@
|
||||
// by the Apache License, Version 2.0, included in the file
|
||||
// licenses/APL.txt.
|
||||
|
||||
#include <gtest/gtest.h>
|
||||
#include <cstdint>
|
||||
|
||||
#include <gmock/gmock-matchers.h>
|
||||
#include <gmock/gmock.h>
|
||||
#include <gtest/gtest.h>
|
||||
|
||||
#include "query/v2/requests.hpp"
|
||||
#include "storage/v3/delta.hpp"
|
||||
#include "storage/v3/id_types.hpp"
|
||||
#include "storage/v3/key_store.hpp"
|
||||
#include "storage/v3/mvcc.hpp"
|
||||
#include "storage/v3/property_value.hpp"
|
||||
#include "storage/v3/shard.hpp"
|
||||
#include "storage/v3/vertex.hpp"
|
||||
#include "storage/v3/vertex_id.hpp"
|
||||
|
||||
using testing::Pair;
|
||||
using testing::UnorderedElementsAre;
|
||||
|
||||
namespace memgraph::storage::v3::tests {
|
||||
|
||||
class ShardSplitTest : public testing::Test {
|
||||
protected:
|
||||
void SetUp() override { storage.StoreMapping({{1, "label"}, {2, "property"}, {3, "edge_property"}}); }
|
||||
void SetUp() override {
|
||||
storage.StoreMapping(
|
||||
{{1, "label"}, {2, "property"}, {3, "edge_property"}, {4, "secondary_label"}, {5, "secondary_prop"}});
|
||||
}
|
||||
|
||||
const PropertyId primary_property{PropertyId::FromUint(2)};
|
||||
const PropertyId secondary_property{PropertyId::FromUint(5)};
|
||||
std::vector<storage::v3::SchemaProperty> schema_property_vector = {
|
||||
storage::v3::SchemaProperty{primary_property, common::SchemaType::INT}};
|
||||
const std::vector<PropertyValue> min_pk{PropertyValue{0}};
|
||||
const LabelId primary_label{LabelId::FromUint(1)};
|
||||
const LabelId secondary_label{LabelId::FromUint(4)};
|
||||
const EdgeTypeId edge_type_id{EdgeTypeId::FromUint(3)};
|
||||
Shard storage{primary_label, min_pk, std::nullopt /*max_primary_key*/, schema_property_vector};
|
||||
|
||||
@ -42,21 +56,74 @@ class ShardSplitTest : public testing::Test {
|
||||
}
|
||||
};
|
||||
|
||||
void AssertEqVertexContainer(const VertexContainer &expected, const VertexContainer &actual) {
|
||||
ASSERT_EQ(expected.size(), actual.size());
|
||||
|
||||
auto expected_it = expected.begin();
|
||||
auto actual_it = actual.begin();
|
||||
while (expected_it != expected.end()) {
|
||||
EXPECT_EQ(expected_it->first, actual_it->first);
|
||||
EXPECT_EQ(expected_it->second.deleted, actual_it->second.deleted);
|
||||
EXPECT_EQ(expected_it->second.in_edges, actual_it->second.in_edges);
|
||||
EXPECT_EQ(expected_it->second.out_edges, actual_it->second.out_edges);
|
||||
EXPECT_EQ(expected_it->second.labels, actual_it->second.labels);
|
||||
|
||||
auto *expected_delta = expected_it->second.delta;
|
||||
auto *actual_delta = actual_it->second.delta;
|
||||
while (expected_delta != nullptr) {
|
||||
EXPECT_EQ(expected_delta->action, actual_delta->action);
|
||||
expected_delta = expected_delta->next;
|
||||
actual_delta = actual_delta->next;
|
||||
}
|
||||
EXPECT_EQ(expected_delta, nullptr);
|
||||
EXPECT_EQ(actual_delta, nullptr);
|
||||
++expected_it;
|
||||
++actual_it;
|
||||
}
|
||||
}
|
||||
|
||||
void AddDeltaToDeltaChain(Vertex *object, Delta *new_delta) {
|
||||
auto *delta_holder = GetDeltaHolder(object);
|
||||
|
||||
new_delta->next = delta_holder->delta;
|
||||
new_delta->prev.Set(object);
|
||||
if (delta_holder->delta) {
|
||||
delta_holder->delta->prev.Set(new_delta);
|
||||
}
|
||||
delta_holder->delta = new_delta;
|
||||
}
|
||||
|
||||
TEST_F(ShardSplitTest, TestBasicSplitWithVertices) {
|
||||
auto acc = storage.Access(GetNextHlc());
|
||||
EXPECT_FALSE(acc.CreateVertexAndValidate({}, {PropertyValue(1)}, {}).HasError());
|
||||
EXPECT_FALSE(acc.CreateVertexAndValidate({secondary_label}, {PropertyValue(1)}, {}).HasError());
|
||||
EXPECT_FALSE(acc.CreateVertexAndValidate({}, {PropertyValue(2)}, {}).HasError());
|
||||
EXPECT_FALSE(acc.CreateVertexAndValidate({}, {PropertyValue(3)}, {}).HasError());
|
||||
EXPECT_FALSE(acc.CreateVertexAndValidate({}, {PropertyValue(4)}, {}).HasError());
|
||||
EXPECT_FALSE(acc.CreateVertexAndValidate({}, {PropertyValue(5)}, {}).HasError());
|
||||
EXPECT_FALSE(acc.CreateVertexAndValidate({secondary_label}, {PropertyValue(5)}, {}).HasError());
|
||||
EXPECT_FALSE(acc.CreateVertexAndValidate({}, {PropertyValue(6)}, {}).HasError());
|
||||
acc.Commit(GetNextHlc());
|
||||
storage.CollectGarbage(GetNextHlc().coordinator_wall_clock);
|
||||
auto current_hlc = GetNextHlc();
|
||||
acc.Commit(current_hlc);
|
||||
|
||||
auto splitted_data = storage.PerformSplit({PropertyValue(4)});
|
||||
EXPECT_EQ(splitted_data.vertices.size(), 3);
|
||||
EXPECT_EQ(splitted_data.edges->size(), 0);
|
||||
EXPECT_EQ(splitted_data.transactions.size(), 0);
|
||||
EXPECT_EQ(splitted_data.transactions.size(), 1);
|
||||
EXPECT_EQ(splitted_data.label_indices.size(), 0);
|
||||
EXPECT_EQ(splitted_data.label_property_indices.size(), 0);
|
||||
|
||||
CommitInfo commit_info{.start_or_commit_timestamp = current_hlc};
|
||||
Delta delta_delete1{Delta::DeleteObjectTag{}, &commit_info, 1};
|
||||
Delta delta_delete2{Delta::DeleteObjectTag{}, &commit_info, 2};
|
||||
Delta delta_delete3{Delta::DeleteObjectTag{}, &commit_info, 3};
|
||||
Delta delta_add_label{Delta::RemoveLabelTag{}, secondary_label, &commit_info, 4};
|
||||
VertexContainer expected_vertices;
|
||||
expected_vertices.emplace(PrimaryKey{PropertyValue{4}}, VertexData(&delta_delete1));
|
||||
auto [it, inserted] = expected_vertices.emplace(PrimaryKey{PropertyValue{5}}, VertexData(&delta_delete2));
|
||||
expected_vertices.emplace(PrimaryKey{PropertyValue{6}}, VertexData(&delta_delete3));
|
||||
it->second.labels.push_back(secondary_label);
|
||||
AddDeltaToDeltaChain(&*it, &delta_add_label);
|
||||
|
||||
AssertEqVertexContainer(expected_vertices, splitted_data.vertices);
|
||||
}
|
||||
|
||||
TEST_F(ShardSplitTest, TestBasicSplitVerticesAndEdges) {
|
||||
@ -79,12 +146,13 @@ TEST_F(ShardSplitTest, TestBasicSplitVerticesAndEdges) {
|
||||
.HasError());
|
||||
|
||||
acc.Commit(GetNextHlc());
|
||||
storage.CollectGarbage(GetNextHlc().coordinator_wall_clock);
|
||||
|
||||
auto splitted_data = storage.PerformSplit({PropertyValue(4)});
|
||||
EXPECT_EQ(splitted_data.vertices.size(), 3);
|
||||
EXPECT_EQ(splitted_data.edges->size(), 2);
|
||||
EXPECT_EQ(splitted_data.transactions.size(), 0);
|
||||
EXPECT_EQ(splitted_data.transactions.size(), 1);
|
||||
EXPECT_EQ(splitted_data.label_indices.size(), 0);
|
||||
EXPECT_EQ(splitted_data.label_property_indices.size(), 0);
|
||||
}
|
||||
|
||||
TEST_F(ShardSplitTest, TestBasicSplitBeforeCommit) {
|
||||
@ -110,36 +178,11 @@ TEST_F(ShardSplitTest, TestBasicSplitBeforeCommit) {
|
||||
EXPECT_EQ(splitted_data.vertices.size(), 3);
|
||||
EXPECT_EQ(splitted_data.edges->size(), 2);
|
||||
EXPECT_EQ(splitted_data.transactions.size(), 1);
|
||||
EXPECT_EQ(splitted_data.label_indices.size(), 0);
|
||||
EXPECT_EQ(splitted_data.label_property_indices.size(), 0);
|
||||
}
|
||||
|
||||
TEST_F(ShardSplitTest, TestBasicSplitAfterCommit) {
|
||||
auto acc = storage.Access(GetNextHlc());
|
||||
EXPECT_FALSE(acc.CreateVertexAndValidate({}, {PropertyValue(1)}, {}).HasError());
|
||||
EXPECT_FALSE(acc.CreateVertexAndValidate({}, {PropertyValue(2)}, {}).HasError());
|
||||
EXPECT_FALSE(acc.CreateVertexAndValidate({}, {PropertyValue(3)}, {}).HasError());
|
||||
EXPECT_FALSE(acc.CreateVertexAndValidate({}, {PropertyValue(4)}, {}).HasError());
|
||||
EXPECT_FALSE(acc.CreateVertexAndValidate({}, {PropertyValue(5)}, {}).HasError());
|
||||
EXPECT_FALSE(acc.CreateVertexAndValidate({}, {PropertyValue(6)}, {}).HasError());
|
||||
|
||||
EXPECT_FALSE(acc.CreateEdge(VertexId{primary_label, PrimaryKey{PropertyValue(1)}},
|
||||
VertexId{primary_label, PrimaryKey{PropertyValue(2)}}, edge_type_id, Gid::FromUint(0))
|
||||
.HasError());
|
||||
EXPECT_FALSE(acc.CreateEdge(VertexId{primary_label, PrimaryKey{PropertyValue(1)}},
|
||||
VertexId{primary_label, PrimaryKey{PropertyValue(5)}}, edge_type_id, Gid::FromUint(1))
|
||||
.HasError());
|
||||
EXPECT_FALSE(acc.CreateEdge(VertexId{primary_label, PrimaryKey{PropertyValue(4)}},
|
||||
VertexId{primary_label, PrimaryKey{PropertyValue(6)}}, edge_type_id, Gid::FromUint(2))
|
||||
.HasError());
|
||||
|
||||
acc.Commit(GetNextHlc());
|
||||
|
||||
auto splitted_data = storage.PerformSplit({PropertyValue(4)});
|
||||
EXPECT_EQ(splitted_data.vertices.size(), 3);
|
||||
EXPECT_EQ(splitted_data.edges->size(), 2);
|
||||
EXPECT_EQ(splitted_data.transactions.size(), 0);
|
||||
}
|
||||
|
||||
TEST_F(ShardSplitTest, TestBasicSplitAfterCommit2) {
|
||||
TEST_F(ShardSplitTest, TestBasicSplitWithCommitedAndOngoingTransactions) {
|
||||
{
|
||||
auto acc = storage.Access(GetNextHlc());
|
||||
EXPECT_FALSE(acc.CreateVertexAndValidate({}, {PropertyValue(1)}, {}).HasError());
|
||||
@ -165,7 +208,9 @@ TEST_F(ShardSplitTest, TestBasicSplitAfterCommit2) {
|
||||
auto splitted_data = storage.PerformSplit({PropertyValue(4)});
|
||||
EXPECT_EQ(splitted_data.vertices.size(), 3);
|
||||
EXPECT_EQ(splitted_data.edges->size(), 2);
|
||||
EXPECT_EQ(splitted_data.transactions.size(), 1);
|
||||
EXPECT_EQ(splitted_data.transactions.size(), 2);
|
||||
EXPECT_EQ(splitted_data.label_indices.size(), 0);
|
||||
EXPECT_EQ(splitted_data.label_property_indices.size(), 0);
|
||||
}
|
||||
|
||||
} // namespace memgraph::storage::v3::tests
|
||||
|
Loading…
Reference in New Issue
Block a user