Begin split funcionlity from shard side

This commit is contained in:
jbajic 2022-12-07 14:13:14 +01:00
parent 59f4b89361
commit 153e9e2fac
4 changed files with 65 additions and 3 deletions

View File

@ -18,10 +18,7 @@
#include <memory>
#include <mutex>
#include <optional>
#include <variant>
#include <bits/ranges_algo.h>
#include <gflags/gflags.h>
#include <spdlog/spdlog.h>
#include "io/network/endpoint.hpp"
@ -1042,6 +1039,42 @@ void Shard::StoreMapping(std::unordered_map<uint64_t, std::string> id_to_name) {
name_id_mapper_.StoreMapping(std::move(id_to_name));
}
std::optional<SplitInfo> Shard::ShouldSplit() const noexcept {
if (vertices_.size() > 10000000) {
// Why should we care if the selected vertex is deleted
auto mid_elem = vertices_.begin();
// mid_elem->first
std::ranges::advance(mid_elem, static_cast<VertexContainer::difference_type>(vertices_.size() / 2));
return SplitInfo{shard_version_, mid_elem->first};
}
return std::nullopt;
}
SplitData Shard::PerformSplit(const PrimaryKey &split_key) {
SplitData data;
data.vertices = std::map(vertices_.find(split_key), vertices_.end());
data.indices_info = {indices_.label_index.ListIndices(), indices_.label_property_index.ListIndices()};
// Get all edges related with those vertices
if (config_.items.properties_on_edges) {
data.edges = std::invoke([&split_vertices = data.vertices]() {
// How to reserve?
EdgeContainer split_edges;
for (const auto &vertex : split_vertices) {
for (const auto &in_edge : vertex.second.in_edges) {
auto edge = std::get<2>(in_edge).ptr;
split_edges.insert(edge->gid, Edge{.gid = edge->gid, .delta = edge->delta, .properties = edge->properties});
}
}
return split_edges;
});
}
// TODO We also need to send ongoing transactions to the shard
// since they own deltas
return data;
}
bool Shard::IsVertexBelongToShard(const VertexId &vertex_id) const {
return vertex_id.primary_label == primary_label_ && vertex_id.primary_key >= min_primary_key_ &&
(!max_primary_key_.has_value() || vertex_id.primary_key < *max_primary_key_);

View File

@ -175,6 +175,19 @@ struct SchemasInfo {
Schemas::SchemasList schemas;
};
struct SplitInfo {
uint64_t shard_version;
PrimaryKey split_point;
};
// If edge properties-on-edges is false then we don't need to send edges but
// only vertices, since they will contain those edges
struct SplitData {
VertexContainer vertices;
std::optional<EdgeContainer> edges;
IndicesInfo indices_info;
};
/// Structure used to return information about the storage.
struct StorageInfo {
uint64_t vertex_count;
@ -357,6 +370,10 @@ class Shard final {
void StoreMapping(std::unordered_map<uint64_t, std::string> id_to_name);
std::optional<SplitInfo> ShouldSplit() const noexcept;
SplitData PerformSplit(const PrimaryKey &split_key);
private:
Transaction &GetTransaction(coordinator::Hlc start_timestamp, IsolationLevel isolation_level);
@ -374,6 +391,7 @@ class Shard final {
// list is used only when properties are enabled for edges. Because of that we
// keep a separate count of edges that is always updated.
uint64_t edge_count_{0};
uint64_t shard_version_{0};
SchemaValidator schema_validator_;
VertexValidator vertex_validator_;

View File

@ -12,6 +12,7 @@
#pragma once
#include <memory>
#include <optional>
#include <variant>
#include <openssl/ec.h>
@ -41,6 +42,8 @@ class ShardRsm {
public:
explicit ShardRsm(std::unique_ptr<Shard> &&shard) : shard_(std::move(shard)){};
std::optional<SplitInfo> ShouldSplit() const noexcept { return shard_->ShouldSplit(); }
// NOLINTNEXTLINE(readability-convert-member-functions-to-static)
msgs::ReadResponses Read(msgs::ReadRequests requests) {
return std::visit([&](auto &&request) mutable { return HandleRead(std::forward<decltype(request)>(request)); },

View File

@ -173,6 +173,14 @@ class ShardWorker {
auto &rsm = rsm_map_.at(uuid);
Time next_for_uuid = rsm.Cron();
// Check if shard should split
if (const auto split_info = rsm.ShouldSplit(); split_info) {
// Request split from coordinator
// split_point => middle pk
// shard_id => uuid
// shard_version =>
}
cron_schedule_.pop();
cron_schedule_.push(std::make_pair(next_for_uuid, uuid));
} else {