Add from SplitData
This commit is contained in:
parent
97002a50d5
commit
45521bdba8
@ -333,15 +333,16 @@ Shard::Shard(const LabelId primary_label, const PrimaryKey min_primary_key,
|
||||
indices_{config.items, vertex_validator_},
|
||||
isolation_level_{config.transaction.isolation_level},
|
||||
config_{config},
|
||||
shard_splitter_(primary_label, vertices_, edges_, start_logical_id_to_transaction_, indices_, config_, schema) {
|
||||
shard_splitter_(primary_label, vertices_, edges_, start_logical_id_to_transaction_, indices_, config_, schema,
|
||||
name_id_mapper_) {
|
||||
CreateSchema(primary_label_, schema);
|
||||
StoreMapping(std::move(id_to_name));
|
||||
}
|
||||
|
||||
Shard::Shard(LabelId primary_label, PrimaryKey min_primary_key, std::optional<PrimaryKey> max_primary_key,
|
||||
std::vector<SchemaProperty> schema, VertexContainer &&vertices, EdgeContainer &&edges,
|
||||
std::map<uint64_t, std::unique_ptr<Transaction>> &&start_logical_id_to_transaction, Config config,
|
||||
std::unordered_map<uint64_t, std::string> id_to_name)
|
||||
std::map<uint64_t, std::unique_ptr<Transaction>> &&start_logical_id_to_transaction, const Config &config,
|
||||
const std::unordered_map<uint64_t, std::string> &id_to_name)
|
||||
: primary_label_{primary_label},
|
||||
min_primary_key_{min_primary_key},
|
||||
max_primary_key_{max_primary_key},
|
||||
@ -353,15 +354,16 @@ Shard::Shard(LabelId primary_label, PrimaryKey min_primary_key, std::optional<Pr
|
||||
isolation_level_{config.transaction.isolation_level},
|
||||
config_{config},
|
||||
start_logical_id_to_transaction_(std::move(start_logical_id_to_transaction)),
|
||||
shard_splitter_(primary_label, vertices_, edges_, start_logical_id_to_transaction_, indices_, config_, schema) {
|
||||
shard_splitter_(primary_label, vertices_, edges_, start_logical_id_to_transaction_, indices_, config_, schema,
|
||||
name_id_mapper_) {
|
||||
CreateSchema(primary_label_, schema);
|
||||
StoreMapping(std::move(id_to_name));
|
||||
StoreMapping(id_to_name);
|
||||
}
|
||||
|
||||
Shard::Shard(LabelId primary_label, PrimaryKey min_primary_key, std::optional<PrimaryKey> max_primary_key,
|
||||
std::vector<SchemaProperty> schema, VertexContainer &&vertices,
|
||||
std::map<uint64_t, std::unique_ptr<Transaction>> &&start_logical_id_to_transaction, Config config,
|
||||
std::unordered_map<uint64_t, std::string> id_to_name)
|
||||
std::map<uint64_t, std::unique_ptr<Transaction>> &&start_logical_id_to_transaction, const Config &config,
|
||||
const std::unordered_map<uint64_t, std::string> &id_to_name)
|
||||
: primary_label_{primary_label},
|
||||
min_primary_key_{min_primary_key},
|
||||
max_primary_key_{max_primary_key},
|
||||
@ -372,13 +374,25 @@ Shard::Shard(LabelId primary_label, PrimaryKey min_primary_key, std::optional<Pr
|
||||
isolation_level_{config.transaction.isolation_level},
|
||||
config_{config},
|
||||
start_logical_id_to_transaction_(std::move(start_logical_id_to_transaction)),
|
||||
shard_splitter_(primary_label, vertices_, edges_, start_logical_id_to_transaction_, indices_, config_, schema) {
|
||||
shard_splitter_(primary_label, vertices_, edges_, start_logical_id_to_transaction_, indices_, config_, schema,
|
||||
name_id_mapper_) {
|
||||
CreateSchema(primary_label_, schema);
|
||||
StoreMapping(std::move(id_to_name));
|
||||
StoreMapping(id_to_name);
|
||||
}
|
||||
|
||||
Shard::~Shard() {}
|
||||
|
||||
std::unique_ptr<Shard> Shard::FromSplitData(SplitData &&split_data) {
|
||||
if (split_data.config.items.properties_on_edges) [[likely]] {
|
||||
return std::make_unique<Shard>(split_data.primary_label, split_data.min_primary_key, split_data.min_primary_key,
|
||||
split_data.schema, std::move(split_data.vertices), std::move(*split_data.edges),
|
||||
std::move(split_data.transactions), split_data.config, split_data.id_to_name);
|
||||
}
|
||||
return std::make_unique<Shard>(split_data.primary_label, split_data.min_primary_key, split_data.min_primary_key,
|
||||
split_data.schema, std::move(split_data.vertices), std::move(split_data.transactions),
|
||||
split_data.config, split_data.id_to_name);
|
||||
}
|
||||
|
||||
Shard::Accessor::Accessor(Shard &shard, Transaction &transaction)
|
||||
: shard_(&shard), transaction_(&transaction), config_(shard_->config_.items) {}
|
||||
|
||||
@ -1096,7 +1110,7 @@ std::optional<SplitInfo> Shard::ShouldSplit() const noexcept {
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
std::unique_ptr<Shard> Shard::PerformSplit(const PrimaryKey &split_key) {
|
||||
SplitData Shard::PerformSplit(const PrimaryKey &split_key) {
|
||||
return shard_splitter_.SplitShard(split_key, max_primary_key_);
|
||||
}
|
||||
|
||||
|
@ -199,13 +199,13 @@ class Shard final {
|
||||
|
||||
Shard(LabelId primary_label, PrimaryKey min_primary_key, std::optional<PrimaryKey> max_primary_key,
|
||||
std::vector<SchemaProperty> schema, VertexContainer &&vertices, EdgeContainer &&edges,
|
||||
std::map<uint64_t, std::unique_ptr<Transaction>> &&start_logical_id_to_transaction, Config config = Config(),
|
||||
std::unordered_map<uint64_t, std::string> id_to_name = {});
|
||||
std::map<uint64_t, std::unique_ptr<Transaction>> &&start_logical_id_to_transaction, const Config &config,
|
||||
const std::unordered_map<uint64_t, std::string> &id_to_name);
|
||||
|
||||
Shard(LabelId primary_label, PrimaryKey min_primary_key, std::optional<PrimaryKey> max_primary_key,
|
||||
std::vector<SchemaProperty> schema, VertexContainer &&vertices,
|
||||
std::map<uint64_t, std::unique_ptr<Transaction>> &&start_logical_id_to_transaction, Config config = Config(),
|
||||
std::unordered_map<uint64_t, std::string> id_to_name = {});
|
||||
std::map<uint64_t, std::unique_ptr<Transaction>> &&start_logical_id_to_transaction, const Config &config,
|
||||
const std::unordered_map<uint64_t, std::string> &id_to_name);
|
||||
|
||||
Shard(const Shard &) = delete;
|
||||
Shard(Shard &&) noexcept = delete;
|
||||
@ -213,6 +213,8 @@ class Shard final {
|
||||
Shard operator=(Shard &&) noexcept = delete;
|
||||
~Shard();
|
||||
|
||||
static std::unique_ptr<Shard> FromSplitData(SplitData &&split_data);
|
||||
|
||||
class Accessor final {
|
||||
private:
|
||||
friend class Shard;
|
||||
@ -379,7 +381,7 @@ class Shard final {
|
||||
|
||||
std::optional<SplitInfo> ShouldSplit() const noexcept;
|
||||
|
||||
std::unique_ptr<Shard> PerformSplit(const PrimaryKey &split_key);
|
||||
SplitData PerformSplit(const PrimaryKey &split_key);
|
||||
|
||||
private:
|
||||
Transaction &GetTransaction(coordinator::Hlc start_timestamp, IsolationLevel isolation_level);
|
||||
|
@ -20,6 +20,7 @@
|
||||
#include "storage/v3/id_types.hpp"
|
||||
#include "storage/v3/indices.hpp"
|
||||
#include "storage/v3/key_store.hpp"
|
||||
#include "storage/v3/name_id_mapper.hpp"
|
||||
#include "storage/v3/schemas.hpp"
|
||||
#include "storage/v3/shard.hpp"
|
||||
#include "storage/v3/transaction.hpp"
|
||||
@ -29,30 +30,37 @@ namespace memgraph::storage::v3 {
|
||||
|
||||
Splitter::Splitter(const LabelId primary_label, VertexContainer &vertices, EdgeContainer &edges,
|
||||
std::map<uint64_t, std::unique_ptr<Transaction>> &start_logical_id_to_transaction, Indices &indices,
|
||||
Config &config, const std::vector<SchemaProperty> &schema)
|
||||
const Config &config, const std::vector<SchemaProperty> &schema, const NameIdMapper &name_id_mapper)
|
||||
: primary_label_(primary_label),
|
||||
vertices_(vertices),
|
||||
edges_(edges),
|
||||
start_logical_id_to_transaction_(start_logical_id_to_transaction),
|
||||
indices_(indices),
|
||||
config_(config),
|
||||
schema_(schema) {}
|
||||
schema_(schema),
|
||||
name_id_mapper_(name_id_mapper) {}
|
||||
|
||||
std::unique_ptr<Shard> Splitter::SplitShard(const PrimaryKey &split_key,
|
||||
const std::optional<PrimaryKey> &max_primary_key) {
|
||||
SplitData data;
|
||||
SplitData Splitter::SplitShard(const PrimaryKey &split_key, const std::optional<PrimaryKey> &max_primary_key) {
|
||||
SplitData data{.primary_label = primary_label_,
|
||||
.min_primary_key = split_key,
|
||||
.max_primary_key = max_primary_key,
|
||||
.schema = schema_,
|
||||
.config = config_,
|
||||
.id_to_name = name_id_mapper_.GetIdToNameMap()};
|
||||
|
||||
std::set<uint64_t> collected_transactions_;
|
||||
data.vertices = CollectVertices(data, collected_transactions_, split_key);
|
||||
data.edges = CollectEdges(collected_transactions_, data.vertices, split_key);
|
||||
data.transactions = CollectTransactions(collected_transactions_, data.vertices, *data.edges);
|
||||
|
||||
if (data.edges) {
|
||||
return std::make_unique<Shard>(primary_label_, split_key, max_primary_key, schema_, std::move(data.vertices),
|
||||
std::move(*data.edges), std::move(data.transactions), config_);
|
||||
}
|
||||
return std::make_unique<Shard>(primary_label_, split_key, max_primary_key, schema_, std::move(data.vertices),
|
||||
std::move(data.transactions), config_);
|
||||
// if (data.edges) {
|
||||
// return std::make_unique<Shard>(primary_label_, split_key, max_primary_key, schema_, std::move(data.vertices),
|
||||
// std::move(*data.edges), std::move(data.transactions), config_,
|
||||
// name_id_mapper_.GetIdToNameMap());
|
||||
// }
|
||||
// return std::make_unique<Shard>(primary_label_, split_key, max_primary_key, schema_, std::move(data.vertices),
|
||||
// std::move(data.transactions), config_, name_id_mapper_.GetIdToNameMap());
|
||||
return data;
|
||||
}
|
||||
|
||||
void Splitter::ScanDeltas(std::set<uint64_t> &collected_transactions_, Delta *delta) {
|
||||
|
@ -21,6 +21,7 @@
|
||||
#include "storage/v3/id_types.hpp"
|
||||
#include "storage/v3/indices.hpp"
|
||||
#include "storage/v3/key_store.hpp"
|
||||
#include "storage/v3/name_id_mapper.hpp"
|
||||
#include "storage/v3/schemas.hpp"
|
||||
#include "storage/v3/transaction.hpp"
|
||||
#include "storage/v3/vertex.hpp"
|
||||
@ -31,6 +32,13 @@ namespace memgraph::storage::v3 {
|
||||
// If edge properties-on-edges is false then we don't need to send edges but
|
||||
// only vertices, since they will contain those edges
|
||||
struct SplitData {
|
||||
LabelId primary_label;
|
||||
PrimaryKey min_primary_key;
|
||||
std::optional<PrimaryKey> max_primary_key;
|
||||
std::vector<SchemaProperty> schema;
|
||||
Config config;
|
||||
std::unordered_map<uint64_t, std::string> id_to_name;
|
||||
|
||||
VertexContainer vertices;
|
||||
std::optional<EdgeContainer> edges;
|
||||
std::map<uint64_t, std::unique_ptr<Transaction>> transactions;
|
||||
@ -42,7 +50,7 @@ class Splitter final {
|
||||
public:
|
||||
Splitter(LabelId primary_label, VertexContainer &vertices, EdgeContainer &edges,
|
||||
std::map<uint64_t, std::unique_ptr<Transaction>> &start_logical_id_to_transaction, Indices &indices,
|
||||
Config &config, const std::vector<SchemaProperty> &schema);
|
||||
const Config &config, const std::vector<SchemaProperty> &schema, const NameIdMapper &name_id_mapper_);
|
||||
|
||||
Splitter(const Splitter &) = delete;
|
||||
Splitter(Splitter &&) noexcept = delete;
|
||||
@ -50,19 +58,19 @@ class Splitter final {
|
||||
Splitter operator=(Splitter &&) noexcept = delete;
|
||||
~Splitter() = default;
|
||||
|
||||
std::unique_ptr<Shard> SplitShard(const PrimaryKey &split_key, const std::optional<PrimaryKey> &max_primary_key);
|
||||
SplitData SplitShard(const PrimaryKey &split_key, const std::optional<PrimaryKey> &max_primary_key);
|
||||
|
||||
private:
|
||||
std::map<uint64_t, std::unique_ptr<Transaction>> CollectTransactions(
|
||||
const std::set<uint64_t> &collected_transactions_start_id, VertexContainer &cloned_vertices,
|
||||
EdgeContainer &cloned_edges);
|
||||
|
||||
VertexContainer CollectVertices(SplitData &data, std::set<uint64_t> &collected_transactions_start_id,
|
||||
const PrimaryKey &split_key);
|
||||
|
||||
std::optional<EdgeContainer> CollectEdges(std::set<uint64_t> &collected_transactions_start_id,
|
||||
const VertexContainer &split_vertices, const PrimaryKey &split_key);
|
||||
|
||||
std::map<uint64_t, std::unique_ptr<Transaction>> CollectTransactions(
|
||||
const std::set<uint64_t> &collected_transactions_start_id, VertexContainer &cloned_vertices,
|
||||
EdgeContainer &cloned_edges);
|
||||
|
||||
template <typename IndexMap, typename IndexType>
|
||||
requires utils::SameAsAnyOf<IndexMap, LabelPropertyIndex, LabelIndex>
|
||||
std::map<IndexType, typename IndexMap::IndexContainer> CollectIndexEntries(
|
||||
@ -97,13 +105,14 @@ class Splitter final {
|
||||
void AdjustClonedTransactions(std::map<uint64_t, std::unique_ptr<Transaction>> &cloned_transactions,
|
||||
VertexContainer &cloned_vertices, EdgeContainer &cloned_edges);
|
||||
|
||||
LabelId primary_label_;
|
||||
const LabelId primary_label_;
|
||||
VertexContainer &vertices_;
|
||||
EdgeContainer &edges_;
|
||||
std::map<uint64_t, std::unique_ptr<Transaction>> &start_logical_id_to_transaction_;
|
||||
Indices &indices_;
|
||||
Config &config_;
|
||||
std::vector<SchemaProperty> schema_;
|
||||
const Config &config_;
|
||||
const std::vector<SchemaProperty> schema_;
|
||||
const NameIdMapper &name_id_mapper_;
|
||||
};
|
||||
|
||||
} // namespace memgraph::storage::v3
|
||||
|
@ -56,31 +56,29 @@ class ShardSplitTest : public testing::Test {
|
||||
}
|
||||
};
|
||||
|
||||
void AssertEqVertexContainer(const VertexContainer &expected, const VertexContainer &actual) {
|
||||
ASSERT_EQ(expected.size(), actual.size());
|
||||
void AssertEqVertexContainer(const VertexContainer &actual, const VertexContainer &expected) {
|
||||
ASSERT_EQ(actual.size(), expected.size());
|
||||
|
||||
auto expected_it = expected.begin();
|
||||
auto actual_it = actual.begin();
|
||||
while (expected_it != expected.end()) {
|
||||
EXPECT_EQ(expected_it->first, actual_it->first);
|
||||
EXPECT_EQ(expected_it->second.deleted, actual_it->second.deleted);
|
||||
EXPECT_EQ(expected_it->second.in_edges, actual_it->second.in_edges);
|
||||
EXPECT_EQ(expected_it->second.out_edges, actual_it->second.out_edges);
|
||||
EXPECT_EQ(expected_it->second.labels, actual_it->second.labels);
|
||||
EXPECT_EQ(actual_it->first, expected_it->first);
|
||||
EXPECT_EQ(actual_it->second.deleted, expected_it->second.deleted);
|
||||
EXPECT_EQ(actual_it->second.labels, expected_it->second.labels);
|
||||
|
||||
auto *expected_delta = expected_it->second.delta;
|
||||
auto *actual_delta = actual_it->second.delta;
|
||||
while (expected_delta != nullptr) {
|
||||
EXPECT_EQ(expected_delta->action, actual_delta->action);
|
||||
EXPECT_EQ(actual_delta->action, expected_delta->action);
|
||||
switch (expected_delta->action) {
|
||||
case Delta::Action::ADD_LABEL:
|
||||
case Delta::Action::REMOVE_LABEL: {
|
||||
EXPECT_EQ(expected_delta->label, actual_delta->label);
|
||||
EXPECT_EQ(actual_delta->label, expected_delta->label);
|
||||
break;
|
||||
}
|
||||
case Delta::Action::SET_PROPERTY: {
|
||||
EXPECT_EQ(expected_delta->property.key, actual_delta->property.key);
|
||||
EXPECT_EQ(expected_delta->property.value, actual_delta->property.value);
|
||||
EXPECT_EQ(actual_delta->property.key, expected_delta->property.key);
|
||||
EXPECT_EQ(actual_delta->property.value, expected_delta->property.value);
|
||||
break;
|
||||
}
|
||||
case Delta::Action::ADD_IN_EDGE:
|
||||
@ -147,7 +145,7 @@ TEST_F(ShardSplitTest, TestBasicSplitWithVertices) {
|
||||
AddDeltaToDeltaChain(&*it, &delta_add_property);
|
||||
AddDeltaToDeltaChain(&*it, &delta_add_label);
|
||||
|
||||
AssertEqVertexContainer(expected_vertices, splitted_data.vertices);
|
||||
AssertEqVertexContainer(splitted_data.vertices, expected_vertices);
|
||||
}
|
||||
|
||||
TEST_F(ShardSplitTest, TestBasicSplitVerticesAndEdges) {
|
||||
@ -159,9 +157,6 @@ TEST_F(ShardSplitTest, TestBasicSplitVerticesAndEdges) {
|
||||
EXPECT_FALSE(acc.CreateVertexAndValidate({}, {PropertyValue(5)}, {}).HasError());
|
||||
EXPECT_FALSE(acc.CreateVertexAndValidate({}, {PropertyValue(6)}, {}).HasError());
|
||||
|
||||
EXPECT_FALSE(acc.CreateEdge(VertexId{primary_label, PrimaryKey{PropertyValue(1)}},
|
||||
VertexId{primary_label, PrimaryKey{PropertyValue(2)}}, edge_type_id, Gid::FromUint(0))
|
||||
.HasError());
|
||||
EXPECT_FALSE(acc.CreateEdge(VertexId{primary_label, PrimaryKey{PropertyValue(1)}},
|
||||
VertexId{primary_label, PrimaryKey{PropertyValue(5)}}, edge_type_id, Gid::FromUint(1))
|
||||
.HasError());
|
||||
@ -169,7 +164,8 @@ TEST_F(ShardSplitTest, TestBasicSplitVerticesAndEdges) {
|
||||
VertexId{primary_label, PrimaryKey{PropertyValue(6)}}, edge_type_id, Gid::FromUint(2))
|
||||
.HasError());
|
||||
|
||||
acc.Commit(GetNextHlc());
|
||||
auto current_hlc = GetNextHlc();
|
||||
acc.Commit(current_hlc);
|
||||
|
||||
auto splitted_data = storage.PerformSplit({PropertyValue(4)});
|
||||
EXPECT_EQ(splitted_data.vertices.size(), 3);
|
||||
@ -177,6 +173,26 @@ TEST_F(ShardSplitTest, TestBasicSplitVerticesAndEdges) {
|
||||
EXPECT_EQ(splitted_data.transactions.size(), 1);
|
||||
EXPECT_EQ(splitted_data.label_indices.size(), 0);
|
||||
EXPECT_EQ(splitted_data.label_property_indices.size(), 0);
|
||||
|
||||
CommitInfo commit_info{.start_or_commit_timestamp = current_hlc};
|
||||
Delta delta_delete1{Delta::DeleteObjectTag{}, &commit_info, 1};
|
||||
Delta delta_delete2{Delta::DeleteObjectTag{}, &commit_info, 1};
|
||||
Delta delta_delete3{Delta::DeleteObjectTag{}, &commit_info, 1};
|
||||
Delta delta_add_in_edge1{Delta::RemoveInEdgeTag{}, edge_type_id, VertexId{primary_label, {PropertyValue(1)}},
|
||||
EdgeRef{Gid::FromUint(1)}, &commit_info, 1};
|
||||
Delta delta_add_out_edge2{Delta::RemoveOutEdgeTag{}, edge_type_id, VertexId{primary_label, {PropertyValue(6)}},
|
||||
EdgeRef{Gid::FromUint(2)}, &commit_info, 1};
|
||||
Delta delta_add_in_edge2{Delta::RemoveInEdgeTag{}, edge_type_id, VertexId{primary_label, {PropertyValue(4)}},
|
||||
EdgeRef{Gid::FromUint(2)}, &commit_info, 1};
|
||||
VertexContainer expected_vertices;
|
||||
auto [vtx4, inserted4] = expected_vertices.emplace(PrimaryKey{PropertyValue{4}}, VertexData(&delta_delete1));
|
||||
auto [vtx5, inserted5] = expected_vertices.emplace(PrimaryKey{PropertyValue{5}}, VertexData(&delta_delete2));
|
||||
auto [vtx6, inserted6] = expected_vertices.emplace(PrimaryKey{PropertyValue{6}}, VertexData(&delta_delete3));
|
||||
AddDeltaToDeltaChain(&*vtx4, &delta_add_out_edge2);
|
||||
AddDeltaToDeltaChain(&*vtx5, &delta_add_in_edge1);
|
||||
AddDeltaToDeltaChain(&*vtx6, &delta_add_in_edge2);
|
||||
|
||||
AssertEqVertexContainer(splitted_data.vertices, expected_vertices);
|
||||
}
|
||||
|
||||
TEST_F(ShardSplitTest, TestBasicSplitBeforeCommit) {
|
||||
|
Loading…
Reference in New Issue
Block a user