Begin split funcionlity from shard side
This commit is contained in:
parent
b968748d9f
commit
486da0bd1c
@ -18,10 +18,7 @@
|
|||||||
#include <memory>
|
#include <memory>
|
||||||
#include <mutex>
|
#include <mutex>
|
||||||
#include <optional>
|
#include <optional>
|
||||||
#include <variant>
|
|
||||||
|
|
||||||
#include <bits/ranges_algo.h>
|
|
||||||
#include <gflags/gflags.h>
|
|
||||||
#include <spdlog/spdlog.h>
|
#include <spdlog/spdlog.h>
|
||||||
|
|
||||||
#include "io/network/endpoint.hpp"
|
#include "io/network/endpoint.hpp"
|
||||||
@ -1045,6 +1042,42 @@ void Shard::StoreMapping(std::unordered_map<uint64_t, std::string> id_to_name) {
|
|||||||
name_id_mapper_.StoreMapping(std::move(id_to_name));
|
name_id_mapper_.StoreMapping(std::move(id_to_name));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
std::optional<SplitInfo> Shard::ShouldSplit() const noexcept {
|
||||||
|
if (vertices_.size() > 10000000) {
|
||||||
|
// Why should we care if the selected vertex is deleted
|
||||||
|
auto mid_elem = vertices_.begin();
|
||||||
|
// mid_elem->first
|
||||||
|
std::ranges::advance(mid_elem, static_cast<VertexContainer::difference_type>(vertices_.size() / 2));
|
||||||
|
return SplitInfo{shard_version_, mid_elem->first};
|
||||||
|
}
|
||||||
|
return std::nullopt;
|
||||||
|
}
|
||||||
|
|
||||||
|
SplitData Shard::PerformSplit(const PrimaryKey &split_key) {
|
||||||
|
SplitData data;
|
||||||
|
data.vertices = std::map(vertices_.find(split_key), vertices_.end());
|
||||||
|
data.indices_info = {indices_.label_index.ListIndices(), indices_.label_property_index.ListIndices()};
|
||||||
|
|
||||||
|
// Get all edges related with those vertices
|
||||||
|
if (config_.items.properties_on_edges) {
|
||||||
|
data.edges = std::invoke([&split_vertices = data.vertices]() {
|
||||||
|
// How to reserve?
|
||||||
|
EdgeContainer split_edges;
|
||||||
|
for (const auto &vertex : split_vertices) {
|
||||||
|
for (const auto &in_edge : vertex.second.in_edges) {
|
||||||
|
auto edge = std::get<2>(in_edge).ptr;
|
||||||
|
split_edges.insert(edge->gid, Edge{.gid = edge->gid, .delta = edge->delta, .properties = edge->properties});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return split_edges;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
// TODO We also need to send ongoing transactions to the shard
|
||||||
|
// since they own deltas
|
||||||
|
|
||||||
|
return data;
|
||||||
|
}
|
||||||
|
|
||||||
bool Shard::IsVertexBelongToShard(const VertexId &vertex_id) const {
|
bool Shard::IsVertexBelongToShard(const VertexId &vertex_id) const {
|
||||||
return vertex_id.primary_label == primary_label_ && vertex_id.primary_key >= min_primary_key_ &&
|
return vertex_id.primary_label == primary_label_ && vertex_id.primary_key >= min_primary_key_ &&
|
||||||
(!max_primary_key_.has_value() || vertex_id.primary_key < *max_primary_key_);
|
(!max_primary_key_.has_value() || vertex_id.primary_key < *max_primary_key_);
|
||||||
|
@ -174,6 +174,19 @@ struct SchemasInfo {
|
|||||||
Schemas::SchemasList schemas;
|
Schemas::SchemasList schemas;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
struct SplitInfo {
|
||||||
|
uint64_t shard_version;
|
||||||
|
PrimaryKey split_point;
|
||||||
|
};
|
||||||
|
|
||||||
|
// If edge properties-on-edges is false then we don't need to send edges but
|
||||||
|
// only vertices, since they will contain those edges
|
||||||
|
struct SplitData {
|
||||||
|
VertexContainer vertices;
|
||||||
|
std::optional<EdgeContainer> edges;
|
||||||
|
IndicesInfo indices_info;
|
||||||
|
};
|
||||||
|
|
||||||
/// Structure used to return information about the storage.
|
/// Structure used to return information about the storage.
|
||||||
struct StorageInfo {
|
struct StorageInfo {
|
||||||
uint64_t vertex_count;
|
uint64_t vertex_count;
|
||||||
@ -356,6 +369,10 @@ class Shard final {
|
|||||||
|
|
||||||
void StoreMapping(std::unordered_map<uint64_t, std::string> id_to_name);
|
void StoreMapping(std::unordered_map<uint64_t, std::string> id_to_name);
|
||||||
|
|
||||||
|
std::optional<SplitInfo> ShouldSplit() const noexcept;
|
||||||
|
|
||||||
|
SplitData PerformSplit(const PrimaryKey &split_key);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
Transaction &GetTransaction(coordinator::Hlc start_timestamp, IsolationLevel isolation_level);
|
Transaction &GetTransaction(coordinator::Hlc start_timestamp, IsolationLevel isolation_level);
|
||||||
|
|
||||||
@ -373,6 +390,7 @@ class Shard final {
|
|||||||
// list is used only when properties are enabled for edges. Because of that we
|
// list is used only when properties are enabled for edges. Because of that we
|
||||||
// keep a separate count of edges that is always updated.
|
// keep a separate count of edges that is always updated.
|
||||||
uint64_t edge_count_{0};
|
uint64_t edge_count_{0};
|
||||||
|
uint64_t shard_version_{0};
|
||||||
|
|
||||||
SchemaValidator schema_validator_;
|
SchemaValidator schema_validator_;
|
||||||
VertexValidator vertex_validator_;
|
VertexValidator vertex_validator_;
|
||||||
|
@ -12,6 +12,7 @@
|
|||||||
#pragma once
|
#pragma once
|
||||||
|
|
||||||
#include <memory>
|
#include <memory>
|
||||||
|
#include <optional>
|
||||||
#include <variant>
|
#include <variant>
|
||||||
|
|
||||||
#include <openssl/ec.h>
|
#include <openssl/ec.h>
|
||||||
@ -41,6 +42,8 @@ class ShardRsm {
|
|||||||
public:
|
public:
|
||||||
explicit ShardRsm(std::unique_ptr<Shard> &&shard) : shard_(std::move(shard)){};
|
explicit ShardRsm(std::unique_ptr<Shard> &&shard) : shard_(std::move(shard)){};
|
||||||
|
|
||||||
|
std::optional<SplitInfo> ShouldSplit() const noexcept { return shard_->ShouldSplit(); }
|
||||||
|
|
||||||
// NOLINTNEXTLINE(readability-convert-member-functions-to-static)
|
// NOLINTNEXTLINE(readability-convert-member-functions-to-static)
|
||||||
msgs::ReadResponses Read(msgs::ReadRequests requests) {
|
msgs::ReadResponses Read(msgs::ReadRequests requests) {
|
||||||
return std::visit([&](auto &&request) mutable { return HandleRead(std::forward<decltype(request)>(request)); },
|
return std::visit([&](auto &&request) mutable { return HandleRead(std::forward<decltype(request)>(request)); },
|
||||||
|
@ -173,6 +173,14 @@ class ShardWorker {
|
|||||||
auto &rsm = rsm_map_.at(uuid);
|
auto &rsm = rsm_map_.at(uuid);
|
||||||
Time next_for_uuid = rsm.Cron();
|
Time next_for_uuid = rsm.Cron();
|
||||||
|
|
||||||
|
// Check if shard should split
|
||||||
|
if (const auto split_info = rsm.ShouldSplit(); split_info) {
|
||||||
|
// Request split from coordinator
|
||||||
|
// split_point => middle pk
|
||||||
|
// shard_id => uuid
|
||||||
|
// shard_version =>
|
||||||
|
}
|
||||||
|
|
||||||
cron_schedule_.pop();
|
cron_schedule_.pop();
|
||||||
cron_schedule_.push(std::make_pair(next_for_uuid, uuid));
|
cron_schedule_.push(std::make_pair(next_for_uuid, uuid));
|
||||||
} else {
|
} else {
|
||||||
|
Loading…
Reference in New Issue
Block a user