From 59f4b893612f187d9abe7f0bb0cf02b07d0dcb79 Mon Sep 17 00:00:00 2001
From: jbajic
Date: Tue, 6 Dec 2022 12:34:36 +0100
Subject: [PATCH 1/2] Remove redundant shard properties

---
 src/storage/v3/shard.cpp |  5 +----
 src/storage/v3/shard.hpp | 32 --------------------------------
 2 files changed, 1 insertion(+), 36 deletions(-)

diff --git a/src/storage/v3/shard.cpp b/src/storage/v3/shard.cpp
index a77f42a76..39c0683cd 100644
--- a/src/storage/v3/shard.cpp
+++ b/src/storage/v3/shard.cpp
@@ -332,10 +332,7 @@ Shard::Shard(const LabelId primary_label, const PrimaryKey min_primary_key,
       vertex_validator_{schema_validator_, primary_label},
       indices_{config.items, vertex_validator_},
       isolation_level_{config.transaction.isolation_level},
-      config_{config},
-      uuid_{utils::GenerateUUID()},
-      epoch_id_{utils::GenerateUUID()},
-      global_locker_{file_retainer_.AddLocker()} {
+      config_{config} {
   CreateSchema(primary_label_, schema);
   StoreMapping(std::move(id_to_name));
 }
diff --git a/src/storage/v3/shard.hpp b/src/storage/v3/shard.hpp
index b998c06cc..31259a0fe 100644
--- a/src/storage/v3/shard.hpp
+++ b/src/storage/v3/shard.hpp
@@ -393,38 +393,6 @@ class Shard final {
   // storage.
   std::list<Gid> deleted_edges_;
 
-  // UUID used to distinguish snapshots and to link snapshots to WALs
-  std::string uuid_;
-  // Sequence number used to keep track of the chain of WALs.
-  uint64_t wal_seq_num_{0};
-
-  // UUID to distinguish different main instance runs for the replication
-  // process on the SAME storage.
-  // Multiple instances can have the same storage UUID and be MAIN at the same time.
-  // We cannot compare commit timestamps of those instances if one of them
-  // becomes the replica of the other, so we use epoch_id_ as an additional
-  // discriminating property.
-  // Example of this:
-  // We have 2 instances of the same storage, S1 and S2.
-  // S1 and S2 are MAIN and accept their own commits and write them to the WAL.
-  // At the moment when S1 committed a transaction with timestamp 20, and S2
-  // a different transaction with timestamp 15, we change S2's role to REPLICA
-  // and register it on S1.
-  // Without the epoch_id, we wouldn't know that S1 and S2 have completely
-  // different transactions; we would think that S2 is behind by only 5 commits.
-  std::string epoch_id_;
-  // History of the previous epoch ids.
-  // Each value consists of the epoch id along with the last commit belonging to
-  // that epoch.
-  std::deque<std::pair<std::string, uint64_t>> epoch_history_;
-
-  uint64_t wal_unsynced_transactions_{0};
-
-  utils::FileRetainer file_retainer_;
-
-  // Global locker that is used for clients' file locking
-  utils::FileRetainer::FileLocker global_locker_;
-
   // Holds all of the (in progress, committed and aborted) transactions that are read or write to this shard, but
   // haven't been cleaned up yet
   std::map<uint64_t, std::unique_ptr<Transaction>> start_logical_id_to_transaction_{};
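The comment block removed above explains why raw commit timestamps cannot order the histories of two independent MAIN runs. As a standalone illustration of that argument (names and types below are hypothetical, not from this codebase), the epoch check boils down to refusing to compare timestamps across epochs:

    #include <cstdint>
    #include <optional>
    #include <string>

    // Sketch only: a commit is identified by the epoch it belongs to plus its
    // timestamp within that epoch.
    struct CommitPoint {
      std::string epoch_id;  // distinguishes independent MAIN runs
      uint64_t timestamp;    // commit timestamp within that epoch
    };

    // Timestamps are only comparable within a single epoch; across epochs the
    // histories have diverged and "behind by N commits" is meaningless.
    std::optional<int64_t> CommitLag(const CommitPoint &main_cp, const CommitPoint &replica_cp) {
      if (main_cp.epoch_id != replica_cp.epoch_id) {
        return std::nullopt;  // S1 at 20 and S2 at 15 are NOT 5 commits apart
      }
      return static_cast<int64_t>(main_cp.timestamp) - static_cast<int64_t>(replica_cp.timestamp);
    }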
From 153e9e2fac0a8b628e32585b1dd524f95811420f Mon Sep 17 00:00:00 2001
From: jbajic
Date: Wed, 7 Dec 2022 14:13:14 +0100
Subject: [PATCH 2/2] Begin split functionality from shard side

---
 src/storage/v3/shard.cpp        | 39 ++++++++++++++++++++++++++++++---
 src/storage/v3/shard.hpp        | 18 +++++++++++++++
 src/storage/v3/shard_rsm.hpp    |  3 +++
 src/storage/v3/shard_worker.hpp |  8 ++++++++
 4 files changed, 65 insertions(+), 3 deletions(-)

diff --git a/src/storage/v3/shard.cpp b/src/storage/v3/shard.cpp
index 39c0683cd..1b3efbff4 100644
--- a/src/storage/v3/shard.cpp
+++ b/src/storage/v3/shard.cpp
@@ -18,10 +18,7 @@
 #include
 #include
 #include
-#include
-#include
-#include
 #include
 
 #include "io/network/endpoint.hpp"
 
@@ -1042,6 +1039,42 @@ void Shard::StoreMapping(std::unordered_map<uint64_t, std::string> id_to_name) {
   name_id_mapper_.StoreMapping(std::move(id_to_name));
 }
 
+std::optional<SplitInfo> Shard::ShouldSplit() const noexcept {
+  if (vertices_.size() > 10000000) {
+    // NOTE: We don't care whether the vertex selected as the split point is deleted
+    auto mid_elem = vertices_.begin();
+    // mid_elem->first is the candidate split key
+    std::ranges::advance(mid_elem, static_cast<std::ptrdiff_t>(vertices_.size() / 2));
+    return SplitInfo{shard_version_, mid_elem->first};
+  }
+  return std::nullopt;
+}
+
+SplitData Shard::PerformSplit(const PrimaryKey &split_key) {
+  SplitData data;
+  data.vertices = std::map(vertices_.find(split_key), vertices_.end());
+  data.indices_info = {indices_.label_index.ListIndices(), indices_.label_property_index.ListIndices()};
+
+  // Gather all edges related to those vertices
+  if (config_.items.properties_on_edges) {
+    data.edges = std::invoke([&split_vertices = data.vertices]() {
+      // TODO: How should we reserve capacity here?
+      EdgeContainer split_edges;
+      for (const auto &vertex : split_vertices) {
+        for (const auto &in_edge : vertex.second.in_edges) {
+          auto edge = std::get<2>(in_edge).ptr;
+          split_edges.insert({edge->gid, Edge{.gid = edge->gid, .delta = edge->delta, .properties = edge->properties}});
+        }
+      }
+      return split_edges;
+    });
+  }
+  // TODO: We also need to send the ongoing transactions to the new shard,
+  // since they own the deltas
+
+  return data;
+}
+
 bool Shard::IsVertexBelongToShard(const VertexId &vertex_id) const {
   return vertex_id.primary_label == primary_label_ && vertex_id.primary_key >= min_primary_key_ &&
          (!max_primary_key_.has_value() || vertex_id.primary_key < *max_primary_key_);
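A rough sketch of how a caller is expected to drive the two new functions; this driver is not part of the patch, and the names below are illustrative only. It assumes the Shard/SplitData declarations from this patch are visible:

    // Hypothetical caller, showing only the intended sequence: probe first,
    // then split at the returned midpoint key.
    void MaybeSplit(Shard &shard) {
      if (const auto split_info = shard.ShouldSplit(); split_info) {
        // Per the shard_worker TODO below, the coordinator would presumably
        // validate split_info->shard_version before authorizing the split.
        SplitData data = shard.PerformSplit(split_info->split_point);
        // data.vertices, data.edges and data.indices_info would then be
        // handed over to the newly created shard.
      }
    }

Note that constructing data.vertices from the iterator range copies the split half (via std::map's CTAD iterator-pair constructor); the source shard still owns those vertices until they are erased.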
diff --git a/src/storage/v3/shard.hpp b/src/storage/v3/shard.hpp
index 31259a0fe..301fc8132 100644
--- a/src/storage/v3/shard.hpp
+++ b/src/storage/v3/shard.hpp
@@ -175,6 +175,19 @@ struct SchemasInfo {
   Schemas::SchemasList schemas;
 };
 
+struct SplitInfo {
+  uint64_t shard_version;
+  PrimaryKey split_point;
+};
+
+// If properties-on-edges is disabled, we only need to send the vertices,
+// since the vertices already contain those edges
+struct SplitData {
+  VertexContainer vertices;
+  std::optional<EdgeContainer> edges;
+  IndicesInfo indices_info;
+};
+
 /// Structure used to return information about the storage.
 struct StorageInfo {
   uint64_t vertex_count;
@@ -357,6 +370,10 @@ class Shard final {
 
   void StoreMapping(std::unordered_map<uint64_t, std::string> id_to_name);
 
+  std::optional<SplitInfo> ShouldSplit() const noexcept;
+
+  SplitData PerformSplit(const PrimaryKey &split_key);
+
  private:
   Transaction &GetTransaction(coordinator::Hlc start_timestamp, IsolationLevel isolation_level);
 
@@ -374,6 +391,7 @@ class Shard final {
   // list is used only when properties are enabled for edges. Because of that we
   // keep a separate count of edges that is always updated.
   uint64_t edge_count_{0};
+  uint64_t shard_version_{0};
 
   SchemaValidator schema_validator_;
   VertexValidator vertex_validator_;
diff --git a/src/storage/v3/shard_rsm.hpp b/src/storage/v3/shard_rsm.hpp
index d301bf40b..ba284a3ca 100644
--- a/src/storage/v3/shard_rsm.hpp
+++ b/src/storage/v3/shard_rsm.hpp
@@ -12,6 +12,7 @@
 #pragma once
 
 #include
+#include <optional>
 #include
 #include
 
@@ -41,6 +42,8 @@ class ShardRsm {
  public:
   explicit ShardRsm(std::unique_ptr<Shard> &&shard) : shard_(std::move(shard)){};
 
+  std::optional<SplitInfo> ShouldSplit() const noexcept { return shard_->ShouldSplit(); }
+
   // NOLINTNEXTLINE(readability-convert-member-functions-to-static)
   msgs::ReadResponses Read(msgs::ReadRequests requests) {
     return std::visit([&](auto &&request) mutable { return HandleRead(std::forward<decltype(request)>(request)); },
diff --git a/src/storage/v3/shard_worker.hpp b/src/storage/v3/shard_worker.hpp
index 547aa0a6f..dcdc6ee13 100644
--- a/src/storage/v3/shard_worker.hpp
+++ b/src/storage/v3/shard_worker.hpp
@@ -173,6 +173,14 @@ class ShardWorker {
       auto &rsm = rsm_map_.at(uuid);
       Time next_for_uuid = rsm.Cron();
 
+      // Check whether the shard should be split
+      if (const auto split_info = rsm.ShouldSplit(); split_info) {
+        // TODO: Request a split from the coordinator:
+        //   split_point   => the middle primary key
+        //   shard_id      => uuid
+        //   shard_version =>
+      }
+
       cron_schedule_.pop();
       cron_schedule_.push(std::make_pair(next_for_uuid, uuid));
     } else {
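The Cron hook above only sketches the coordinator request in comments. One plausible shape for that message, matching the three fields named in the TODO — none of these types exist in the patch, and the stand-in for PrimaryKey is an assumption:

    #include <cstdint>
    #include <string>
    #include <vector>

    // Hypothetical request payload; the patch deliberately leaves the actual
    // coordinator message as a TODO, so every name here is illustrative.
    struct SplitShardRequest {
      std::string shard_uuid;                // shard_id => uuid of the splitting shard
      uint64_t shard_version{0};             // lets the coordinator drop stale or duplicate requests
      std::vector<std::string> split_point;  // stand-in for PrimaryKey (the middle key from ShouldSplit())
    };

Carrying shard_version in the request mirrors its role in SplitInfo: a shard that has already split would report a newer version, so the coordinator can discard requests made against an outdated view of the shard.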