Add split in shard rsm
This commit is contained in:
parent
2835376eda
commit
75b5da9f07
@ -12,6 +12,7 @@
|
||||
#pragma once
|
||||
|
||||
#include <chrono>
|
||||
#include <cstdint>
|
||||
#include <iostream>
|
||||
#include <map>
|
||||
#include <memory>
|
||||
@ -570,6 +571,16 @@ struct CommitResponse {
|
||||
std::optional<ShardError> error;
|
||||
};
|
||||
|
||||
struct SplitInfo {
|
||||
PrimaryKey split_key;
|
||||
uint64_t shard_version;
|
||||
};
|
||||
|
||||
struct PerformSplitDataInfo {
|
||||
PrimaryKey split_key;
|
||||
uint64_t shard_version;
|
||||
};
|
||||
|
||||
using ReadRequests = std::variant<ExpandOneRequest, GetPropertiesRequest, ScanVerticesRequest>;
|
||||
using ReadResponses = std::variant<ExpandOneResponse, GetPropertiesResponse, ScanVerticesResponse>;
|
||||
|
||||
|
@ -342,12 +342,13 @@ Shard::Shard(const LabelId primary_label, const PrimaryKey min_primary_key,
|
||||
Shard::Shard(LabelId primary_label, PrimaryKey min_primary_key, std::optional<PrimaryKey> max_primary_key,
|
||||
std::vector<SchemaProperty> schema, VertexContainer &&vertices, EdgeContainer &&edges,
|
||||
std::map<uint64_t, std::unique_ptr<Transaction>> &&start_logical_id_to_transaction, const Config &config,
|
||||
const std::unordered_map<uint64_t, std::string> &id_to_name)
|
||||
const std::unordered_map<uint64_t, std::string> &id_to_name, const uint64_t shard_version)
|
||||
: primary_label_{primary_label},
|
||||
min_primary_key_{min_primary_key},
|
||||
max_primary_key_{max_primary_key},
|
||||
vertices_(std::move(vertices)),
|
||||
edges_(std::move(edges)),
|
||||
shard_version_(shard_version),
|
||||
schema_validator_{schemas_, name_id_mapper_},
|
||||
vertex_validator_{schema_validator_, primary_label},
|
||||
indices_{config.items, vertex_validator_},
|
||||
@ -363,11 +364,12 @@ Shard::Shard(LabelId primary_label, PrimaryKey min_primary_key, std::optional<Pr
|
||||
Shard::Shard(LabelId primary_label, PrimaryKey min_primary_key, std::optional<PrimaryKey> max_primary_key,
|
||||
std::vector<SchemaProperty> schema, VertexContainer &&vertices,
|
||||
std::map<uint64_t, std::unique_ptr<Transaction>> &&start_logical_id_to_transaction, const Config &config,
|
||||
const std::unordered_map<uint64_t, std::string> &id_to_name)
|
||||
const std::unordered_map<uint64_t, std::string> &id_to_name, const uint64_t shard_version)
|
||||
: primary_label_{primary_label},
|
||||
min_primary_key_{min_primary_key},
|
||||
max_primary_key_{max_primary_key},
|
||||
vertices_(std::move(vertices)),
|
||||
shard_version_(shard_version),
|
||||
schema_validator_{schemas_, name_id_mapper_},
|
||||
vertex_validator_{schema_validator_, primary_label},
|
||||
indices_{config.items, vertex_validator_},
|
||||
@ -386,11 +388,12 @@ std::unique_ptr<Shard> Shard::FromSplitData(SplitData &&split_data) {
|
||||
if (split_data.config.items.properties_on_edges) [[likely]] {
|
||||
return std::make_unique<Shard>(split_data.primary_label, split_data.min_primary_key, split_data.min_primary_key,
|
||||
split_data.schema, std::move(split_data.vertices), std::move(*split_data.edges),
|
||||
std::move(split_data.transactions), split_data.config, split_data.id_to_name);
|
||||
std::move(split_data.transactions), split_data.config, split_data.id_to_name,
|
||||
split_data.shard_version);
|
||||
}
|
||||
return std::make_unique<Shard>(split_data.primary_label, split_data.min_primary_key, split_data.min_primary_key,
|
||||
split_data.schema, std::move(split_data.vertices), std::move(split_data.transactions),
|
||||
split_data.config, split_data.id_to_name);
|
||||
split_data.config, split_data.id_to_name, split_data.shard_version);
|
||||
}
|
||||
|
||||
Shard::Accessor::Accessor(Shard &shard, Transaction &transaction)
|
||||
@ -1103,14 +1106,15 @@ std::optional<SplitInfo> Shard::ShouldSplit() const noexcept {
|
||||
if (vertices_.size() > config_.split.max_shard_vertex_size) {
|
||||
auto mid_elem = vertices_.begin();
|
||||
std::ranges::advance(mid_elem, static_cast<VertexContainer::difference_type>(vertices_.size() / 2));
|
||||
return SplitInfo{shard_version_, mid_elem->first};
|
||||
return SplitInfo{mid_elem->first, shard_version_};
|
||||
}
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
SplitData Shard::PerformSplit(const PrimaryKey &split_key) {
|
||||
++shard_version_;
|
||||
return shard_splitter_.SplitShard(split_key, max_primary_key_);
|
||||
SplitData Shard::PerformSplit(const PrimaryKey &split_key, const uint64_t shard_version) {
|
||||
shard_version_ = shard_version;
|
||||
max_primary_key_ = split_key;
|
||||
return shard_splitter_.SplitShard(split_key, max_primary_key_, shard_version);
|
||||
}
|
||||
|
||||
bool Shard::IsVertexBelongToShard(const VertexId &vertex_id) const {
|
||||
|
@ -177,8 +177,8 @@ struct SchemasInfo {
|
||||
};
|
||||
|
||||
struct SplitInfo {
|
||||
uint64_t shard_version;
|
||||
PrimaryKey split_point;
|
||||
uint64_t shard_version;
|
||||
};
|
||||
|
||||
/// Structure used to return information about the storage.
|
||||
@ -200,12 +200,12 @@ class Shard final {
|
||||
Shard(LabelId primary_label, PrimaryKey min_primary_key, std::optional<PrimaryKey> max_primary_key,
|
||||
std::vector<SchemaProperty> schema, VertexContainer &&vertices, EdgeContainer &&edges,
|
||||
std::map<uint64_t, std::unique_ptr<Transaction>> &&start_logical_id_to_transaction, const Config &config,
|
||||
const std::unordered_map<uint64_t, std::string> &id_to_name);
|
||||
const std::unordered_map<uint64_t, std::string> &id_to_name, uint64_t shard_version);
|
||||
|
||||
Shard(LabelId primary_label, PrimaryKey min_primary_key, std::optional<PrimaryKey> max_primary_key,
|
||||
std::vector<SchemaProperty> schema, VertexContainer &&vertices,
|
||||
std::map<uint64_t, std::unique_ptr<Transaction>> &&start_logical_id_to_transaction, const Config &config,
|
||||
const std::unordered_map<uint64_t, std::string> &id_to_name);
|
||||
const std::unordered_map<uint64_t, std::string> &id_to_name, uint64_t shard_version);
|
||||
|
||||
Shard(const Shard &) = delete;
|
||||
Shard(Shard &&) noexcept = delete;
|
||||
@ -381,7 +381,7 @@ class Shard final {
|
||||
|
||||
std::optional<SplitInfo> ShouldSplit() const noexcept;
|
||||
|
||||
SplitData PerformSplit(const PrimaryKey &split_key);
|
||||
SplitData PerformSplit(const PrimaryKey &split_key, uint64_t shard_version);
|
||||
|
||||
private:
|
||||
Transaction &GetTransaction(coordinator::Hlc start_timestamp, IsolationLevel isolation_level);
|
||||
|
@ -1,4 +1,4 @@
|
||||
// Copyright 2022 Memgraph Ltd.
|
||||
// Copyright 2023 Memgraph Ltd.
|
||||
//
|
||||
// Use of this software is governed by the Business Source License
|
||||
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
|
||||
@ -18,6 +18,7 @@
|
||||
#include <openssl/ec.h>
|
||||
#include "query/v2/requests.hpp"
|
||||
#include "storage/v3/shard.hpp"
|
||||
#include "storage/v3/value_conversions.hpp"
|
||||
#include "storage/v3/vertex_accessor.hpp"
|
||||
|
||||
namespace memgraph::storage::v3 {
|
||||
@ -42,7 +43,18 @@ class ShardRsm {
|
||||
public:
|
||||
explicit ShardRsm(std::unique_ptr<Shard> &&shard) : shard_(std::move(shard)){};
|
||||
|
||||
std::optional<SplitInfo> ShouldSplit() const noexcept { return shard_->ShouldSplit(); }
|
||||
std::optional<msgs::SplitInfo> ShouldSplit() const noexcept {
|
||||
auto split_info = shard_->ShouldSplit();
|
||||
if (split_info) {
|
||||
return msgs::SplitInfo{conversions::ConvertValueVector(split_info->split_point), split_info->shard_version};
|
||||
}
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
std::unique_ptr<Shard> PerformSplit(msgs::PerformSplitDataInfo perform_split) const noexcept {
|
||||
return Shard::FromSplitData(
|
||||
shard_->PerformSplit(conversions::ConvertPropertyVector(perform_split.split_key), perform_split.shard_version));
|
||||
}
|
||||
|
||||
// NOLINTNEXTLINE(readability-convert-member-functions-to-static)
|
||||
msgs::ReadResponses Read(msgs::ReadRequests requests) {
|
||||
|
@ -11,6 +11,7 @@
|
||||
|
||||
#include "storage/v3/splitter.hpp"
|
||||
|
||||
#include <cstdint>
|
||||
#include <map>
|
||||
#include <memory>
|
||||
#include <optional>
|
||||
@ -41,13 +42,15 @@ Splitter::Splitter(const LabelId primary_label, VertexContainer &vertices, EdgeC
|
||||
schema_(schema),
|
||||
name_id_mapper_(name_id_mapper) {}
|
||||
|
||||
SplitData Splitter::SplitShard(const PrimaryKey &split_key, const std::optional<PrimaryKey> &max_primary_key) {
|
||||
SplitData Splitter::SplitShard(const PrimaryKey &split_key, const std::optional<PrimaryKey> &max_primary_key,
|
||||
const uint64_t shard_version) {
|
||||
SplitData data{.primary_label = primary_label_,
|
||||
.min_primary_key = split_key,
|
||||
.max_primary_key = max_primary_key,
|
||||
.schema = schema_,
|
||||
.config = config_,
|
||||
.id_to_name = name_id_mapper_.GetIdToNameMap()};
|
||||
.id_to_name = name_id_mapper_.GetIdToNameMap(),
|
||||
.shard_version = shard_version};
|
||||
|
||||
std::set<uint64_t> collected_transactions_;
|
||||
data.vertices = CollectVertices(data, collected_transactions_, split_key);
|
||||
|
@ -10,6 +10,7 @@
|
||||
// licenses/APL.txt.
|
||||
#pragma once
|
||||
|
||||
#include <cstdint>
|
||||
#include <map>
|
||||
#include <memory>
|
||||
#include <optional>
|
||||
@ -38,6 +39,7 @@ struct SplitData {
|
||||
std::vector<SchemaProperty> schema;
|
||||
Config config;
|
||||
std::unordered_map<uint64_t, std::string> id_to_name;
|
||||
uint64_t shard_version;
|
||||
|
||||
VertexContainer vertices;
|
||||
std::optional<EdgeContainer> edges;
|
||||
@ -58,7 +60,8 @@ class Splitter final {
|
||||
Splitter operator=(Splitter &&) noexcept = delete;
|
||||
~Splitter() = default;
|
||||
|
||||
SplitData SplitShard(const PrimaryKey &split_key, const std::optional<PrimaryKey> &max_primary_key);
|
||||
SplitData SplitShard(const PrimaryKey &split_key, const std::optional<PrimaryKey> &max_primary_key,
|
||||
uint64_t shard_version);
|
||||
|
||||
private:
|
||||
VertexContainer CollectVertices(SplitData &data, std::set<uint64_t> &collected_transactions_start_id,
|
||||
|
@ -86,7 +86,7 @@ BENCHMARK_DEFINE_F(ShardSplitBenchmark, BigDataSplit)(::benchmark::State &state)
|
||||
acc.Commit(GetNextHlc());
|
||||
}
|
||||
for (auto _ : state) {
|
||||
auto data = storage->PerformSplit(PrimaryKey{PropertyValue{state.range(0) / 2}});
|
||||
auto data = storage->PerformSplit(PrimaryKey{PropertyValue{state.range(0) / 2}}, 2);
|
||||
}
|
||||
}
|
||||
|
||||
@ -114,7 +114,7 @@ BENCHMARK_DEFINE_F(ShardSplitBenchmark, BigDataSplitWithGc)(::benchmark::State &
|
||||
}
|
||||
storage->CollectGarbage(GetNextHlc().coordinator_wall_clock);
|
||||
for (auto _ : state) {
|
||||
auto data = storage->PerformSplit(PrimaryKey{PropertyValue{state.range(0) / 2}});
|
||||
auto data = storage->PerformSplit(PrimaryKey{PropertyValue{state.range(0) / 2}}, 2);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -136,7 +136,7 @@ TEST_F(ShardSplitTest, TestBasicSplitWithVertices) {
|
||||
auto current_hlc = GetNextHlc();
|
||||
acc.Commit(current_hlc);
|
||||
|
||||
auto splitted_data = storage.PerformSplit({PropertyValue(4)});
|
||||
auto splitted_data = storage.PerformSplit({PropertyValue(4)}, 2);
|
||||
EXPECT_EQ(splitted_data.vertices.size(), 3);
|
||||
EXPECT_EQ(splitted_data.edges->size(), 0);
|
||||
EXPECT_EQ(splitted_data.transactions.size(), 1);
|
||||
@ -179,7 +179,7 @@ TEST_F(ShardSplitTest, TestBasicSplitVerticesAndEdges) {
|
||||
auto current_hlc = GetNextHlc();
|
||||
acc.Commit(current_hlc);
|
||||
|
||||
auto splitted_data = storage.PerformSplit({PropertyValue(4)});
|
||||
auto splitted_data = storage.PerformSplit({PropertyValue(4)}, 2);
|
||||
EXPECT_EQ(splitted_data.vertices.size(), 3);
|
||||
EXPECT_EQ(splitted_data.edges->size(), 2);
|
||||
EXPECT_EQ(splitted_data.transactions.size(), 1);
|
||||
@ -226,7 +226,7 @@ TEST_F(ShardSplitTest, TestBasicSplitBeforeCommit) {
|
||||
VertexId{primary_label, PrimaryKey{PropertyValue(6)}}, edge_type_id, Gid::FromUint(2))
|
||||
.HasError());
|
||||
|
||||
auto splitted_data = storage.PerformSplit({PropertyValue(4)});
|
||||
auto splitted_data = storage.PerformSplit({PropertyValue(4)}, 2);
|
||||
EXPECT_EQ(splitted_data.vertices.size(), 3);
|
||||
EXPECT_EQ(splitted_data.edges->size(), 2);
|
||||
EXPECT_EQ(splitted_data.transactions.size(), 1);
|
||||
@ -257,7 +257,7 @@ TEST_F(ShardSplitTest, TestBasicSplitWithCommitedAndOngoingTransactions) {
|
||||
VertexId{primary_label, PrimaryKey{PropertyValue(6)}}, edge_type_id, Gid::FromUint(2))
|
||||
.HasError());
|
||||
|
||||
auto splitted_data = storage.PerformSplit({PropertyValue(4)});
|
||||
auto splitted_data = storage.PerformSplit({PropertyValue(4)}, 2);
|
||||
EXPECT_EQ(splitted_data.vertices.size(), 3);
|
||||
EXPECT_EQ(splitted_data.edges->size(), 2);
|
||||
EXPECT_EQ(splitted_data.transactions.size(), 2);
|
||||
@ -276,7 +276,7 @@ TEST_F(ShardSplitTest, TestBasicSplitWithLabelIndex) {
|
||||
acc.Commit(GetNextHlc());
|
||||
storage.CreateIndex(secondary_label);
|
||||
|
||||
auto splitted_data = storage.PerformSplit({PropertyValue(4)});
|
||||
auto splitted_data = storage.PerformSplit({PropertyValue(4)}, 2);
|
||||
|
||||
EXPECT_EQ(splitted_data.vertices.size(), 3);
|
||||
EXPECT_EQ(splitted_data.edges->size(), 0);
|
||||
@ -302,7 +302,7 @@ TEST_F(ShardSplitTest, TestBasicSplitWithLabelPropertyIndex) {
|
||||
acc.Commit(GetNextHlc());
|
||||
storage.CreateIndex(secondary_label, secondary_property);
|
||||
|
||||
auto splitted_data = storage.PerformSplit({PropertyValue(4)});
|
||||
auto splitted_data = storage.PerformSplit({PropertyValue(4)}, 2);
|
||||
|
||||
EXPECT_EQ(splitted_data.vertices.size(), 3);
|
||||
EXPECT_EQ(splitted_data.edges->size(), 0);
|
||||
@ -329,7 +329,7 @@ TEST_F(ShardSplitTest, TestBigSplit) {
|
||||
storage.CreateIndex(secondary_label, secondary_property);
|
||||
|
||||
const auto split_value = pk / 2;
|
||||
auto splitted_data = storage.PerformSplit({PropertyValue(split_value)});
|
||||
auto splitted_data = storage.PerformSplit({PropertyValue(split_value)}, 2);
|
||||
|
||||
EXPECT_EQ(splitted_data.vertices.size(), 100000);
|
||||
EXPECT_EQ(splitted_data.edges->size(), 50000);
|
||||
|
Loading…
Reference in New Issue
Block a user