From 486da0bd1cf6711f626145eff9334d56fc405525 Mon Sep 17 00:00:00 2001 From: jbajic Date: Wed, 7 Dec 2022 14:13:14 +0100 Subject: [PATCH] Begin split funcionlity from shard side --- src/storage/v3/shard.cpp | 39 ++++++++++++++++++++++++++++++--- src/storage/v3/shard.hpp | 18 +++++++++++++++ src/storage/v3/shard_rsm.hpp | 3 +++ src/storage/v3/shard_worker.hpp | 8 +++++++ 4 files changed, 65 insertions(+), 3 deletions(-) diff --git a/src/storage/v3/shard.cpp b/src/storage/v3/shard.cpp index de5161572..294c99f65 100644 --- a/src/storage/v3/shard.cpp +++ b/src/storage/v3/shard.cpp @@ -18,10 +18,7 @@ #include #include #include -#include -#include -#include #include #include "io/network/endpoint.hpp" @@ -1045,6 +1042,42 @@ void Shard::StoreMapping(std::unordered_map id_to_name) { name_id_mapper_.StoreMapping(std::move(id_to_name)); } +std::optional Shard::ShouldSplit() const noexcept { + if (vertices_.size() > 10000000) { + // Why should we care if the selected vertex is deleted + auto mid_elem = vertices_.begin(); + // mid_elem->first + std::ranges::advance(mid_elem, static_cast(vertices_.size() / 2)); + return SplitInfo{shard_version_, mid_elem->first}; + } + return std::nullopt; +} + +SplitData Shard::PerformSplit(const PrimaryKey &split_key) { + SplitData data; + data.vertices = std::map(vertices_.find(split_key), vertices_.end()); + data.indices_info = {indices_.label_index.ListIndices(), indices_.label_property_index.ListIndices()}; + + // Get all edges related with those vertices + if (config_.items.properties_on_edges) { + data.edges = std::invoke([&split_vertices = data.vertices]() { + // How to reserve? + EdgeContainer split_edges; + for (const auto &vertex : split_vertices) { + for (const auto &in_edge : vertex.second.in_edges) { + auto edge = std::get<2>(in_edge).ptr; + split_edges.insert(edge->gid, Edge{.gid = edge->gid, .delta = edge->delta, .properties = edge->properties}); + } + } + return split_edges; + }); + } + // TODO We also need to send ongoing transactions to the shard + // since they own deltas + + return data; +} + bool Shard::IsVertexBelongToShard(const VertexId &vertex_id) const { return vertex_id.primary_label == primary_label_ && vertex_id.primary_key >= min_primary_key_ && (!max_primary_key_.has_value() || vertex_id.primary_key < *max_primary_key_); diff --git a/src/storage/v3/shard.hpp b/src/storage/v3/shard.hpp index ed170fd55..88849184d 100644 --- a/src/storage/v3/shard.hpp +++ b/src/storage/v3/shard.hpp @@ -174,6 +174,19 @@ struct SchemasInfo { Schemas::SchemasList schemas; }; +struct SplitInfo { + uint64_t shard_version; + PrimaryKey split_point; +}; + +// If edge properties-on-edges is false then we don't need to send edges but +// only vertices, since they will contain those edges +struct SplitData { + VertexContainer vertices; + std::optional edges; + IndicesInfo indices_info; +}; + /// Structure used to return information about the storage. struct StorageInfo { uint64_t vertex_count; @@ -356,6 +369,10 @@ class Shard final { void StoreMapping(std::unordered_map id_to_name); + std::optional ShouldSplit() const noexcept; + + SplitData PerformSplit(const PrimaryKey &split_key); + private: Transaction &GetTransaction(coordinator::Hlc start_timestamp, IsolationLevel isolation_level); @@ -373,6 +390,7 @@ class Shard final { // list is used only when properties are enabled for edges. Because of that we // keep a separate count of edges that is always updated. uint64_t edge_count_{0}; + uint64_t shard_version_{0}; SchemaValidator schema_validator_; VertexValidator vertex_validator_; diff --git a/src/storage/v3/shard_rsm.hpp b/src/storage/v3/shard_rsm.hpp index d301bf40b..ba284a3ca 100644 --- a/src/storage/v3/shard_rsm.hpp +++ b/src/storage/v3/shard_rsm.hpp @@ -12,6 +12,7 @@ #pragma once #include +#include #include #include @@ -41,6 +42,8 @@ class ShardRsm { public: explicit ShardRsm(std::unique_ptr &&shard) : shard_(std::move(shard)){}; + std::optional ShouldSplit() const noexcept { return shard_->ShouldSplit(); } + // NOLINTNEXTLINE(readability-convert-member-functions-to-static) msgs::ReadResponses Read(msgs::ReadRequests requests) { return std::visit([&](auto &&request) mutable { return HandleRead(std::forward(request)); }, diff --git a/src/storage/v3/shard_worker.hpp b/src/storage/v3/shard_worker.hpp index 547aa0a6f..dcdc6ee13 100644 --- a/src/storage/v3/shard_worker.hpp +++ b/src/storage/v3/shard_worker.hpp @@ -173,6 +173,14 @@ class ShardWorker { auto &rsm = rsm_map_.at(uuid); Time next_for_uuid = rsm.Cron(); + // Check if shard should split + if (const auto split_info = rsm.ShouldSplit(); split_info) { + // Request split from coordinator + // split_point => middle pk + // shard_id => uuid + // shard_version => + } + cron_schedule_.pop(); cron_schedule_.push(std::make_pair(next_for_uuid, uuid)); } else {