diff --git a/src/coordinator/hybrid_logical_clock.hpp b/src/coordinator/hybrid_logical_clock.hpp index 38571dc20..e8ea30e6b 100644 --- a/src/coordinator/hybrid_logical_clock.hpp +++ b/src/coordinator/hybrid_logical_clock.hpp @@ -1,4 +1,4 @@ -// Copyright 2022 Memgraph Ltd. +// Copyright 2023 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source @@ -23,17 +23,17 @@ namespace memgraph::coordinator { using Time = memgraph::io::Time; /// Hybrid-logical clock -struct Hlc { - uint64_t logical_id = 0; +struct Hlc final { + uint64_t logical_id{0}; Time coordinator_wall_clock = Time::min(); - auto operator<=>(const Hlc &other) const { return logical_id <=> other.logical_id; } + auto operator<=>(const Hlc &other) const noexcept { return logical_id <=> other.logical_id; } - bool operator==(const Hlc &other) const = default; - bool operator<(const Hlc &other) const = default; - bool operator==(const uint64_t other) const { return logical_id == other; } - bool operator<(const uint64_t other) const { return logical_id < other; } - bool operator>=(const uint64_t other) const { return logical_id >= other; } + bool operator==(const Hlc &other) const noexcept = default; + bool operator<(const Hlc &other) const noexcept = default; + bool operator==(const uint64_t other) const noexcept { return logical_id == other; } + bool operator<(const uint64_t other) const noexcept { return logical_id < other; } + bool operator>=(const uint64_t other) const noexcept { return logical_id >= other; } friend std::ostream &operator<<(std::ostream &in, const Hlc &hlc) { auto wall_clock = std::chrono::system_clock::to_time_t(hlc.coordinator_wall_clock); diff --git a/src/query/v2/requests.hpp b/src/query/v2/requests.hpp index b2d7f9123..f27094c02 100644 --- a/src/query/v2/requests.hpp +++ b/src/query/v2/requests.hpp @@ -12,6 +12,7 @@ #pragma once #include <chrono> +#include <cstdint> #include <iostream> #include <map> #include <memory> @@ -571,6 +572,16 @@ struct CommitResponse { std::optional<ShardError> error; }; +struct SplitInfo { + PrimaryKey split_key; + uint64_t shard_version; +}; + +struct PerformSplitDataInfo { + PrimaryKey split_key; + uint64_t shard_version; +}; + using ReadRequests = std::variant<ExpandOneRequest, GetPropertiesRequest, ScanVerticesRequest>; using ReadResponses = std::variant<ExpandOneResponse, GetPropertiesResponse, ScanVerticesResponse>; diff --git a/src/storage/v3/CMakeLists.txt b/src/storage/v3/CMakeLists.txt index b3a3d68a9..05f86a5dd 100644 --- a/src/storage/v3/CMakeLists.txt +++ b/src/storage/v3/CMakeLists.txt @@ -18,6 +18,7 @@ set(storage_v3_src_files bindings/typed_value.cpp expr.cpp vertex.cpp + splitter.cpp request_helper.cpp) # ###################### diff --git a/src/storage/v3/config.hpp b/src/storage/v3/config.hpp index 05179c6b0..868e82f21 100644 --- a/src/storage/v3/config.hpp +++ b/src/storage/v3/config.hpp @@ -30,6 +30,10 @@ struct Config { io::Duration reclamation_interval{}; } gc; + struct Split { + uint64_t max_shard_vertex_size{500'000}; + } split; + struct Items { bool properties_on_edges{true}; } items; diff --git a/src/storage/v3/delta.hpp b/src/storage/v3/delta.hpp index 0bc58f5ef..555a6a151 100644 --- a/src/storage/v3/delta.hpp +++ b/src/storage/v3/delta.hpp @@ -1,4 +1,4 @@ -// Copyright 2022 Memgraph Ltd. +// Copyright 2023 Memgraph Ltd. 
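A standalone sketch (not part of the diff) of what the `Hlc` comparison surface above implies: `operator<` is synthesized from the user-provided `operator<=>`, which orders by `logical_id` alone, while the defaulted `operator==` stays member-wise and therefore also compares the wall clock.

```cpp
// Standalone illustration of the Hlc comparison semantics; the Time alias is
// simplified, the comparison declarations mirror the struct above.
#include <cassert>
#include <chrono>
#include <compare>
#include <cstdint>

using Time = std::chrono::system_clock::time_point;

struct Hlc final {
  uint64_t logical_id{0};
  Time coordinator_wall_clock = Time::min();

  auto operator<=>(const Hlc &other) const noexcept { return logical_id <=> other.logical_id; }
  bool operator==(const Hlc &other) const noexcept = default;
  bool operator<(const Hlc &other) const noexcept = default;
};

int main() {
  const Hlc a{.logical_id = 7, .coordinator_wall_clock = std::chrono::system_clock::now()};
  const Hlc b{.logical_id = 7};  // same logical id, different wall clock

  assert(!(a < b) && !(b < a));      // ordering consults logical_id only
  assert(a != b);                    // defaulted == is member-wise, so the wall clock still matters
  assert(Hlc{.logical_id = 3} < b);  // ordering across logical ids
}
```

Callers that want wall-clock-insensitive equality compare against the raw `logical_id`, which is what the `uint64_t` overloads above provide.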
// // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source @@ -13,12 +13,14 @@ #include <cstdint> #include <memory> + #include "storage/v3/edge_ref.hpp" #include "storage/v3/id_types.hpp" #include "storage/v3/property_value.hpp" #include "storage/v3/vertex.hpp" #include "storage/v3/vertex_id.hpp" #include "utils/logging.hpp" +#include "utils/synchronized.hpp" namespace memgraph::storage::v3 { @@ -27,6 +29,11 @@ struct Edge; struct Delta; struct CommitInfo; +inline uint64_t GetNextDeltaId() { + static utils::Synchronized<uint64_t, utils::SpinLock> delta_id{0}; + return delta_id.WithLock([](auto &id) { return id++; }); +} + // This class stores one of three pointers (`Delta`, `Vertex` and `Edge`) // without using additional memory for storing the type. The type is stored in // the pointer itself in the lower bits. All of those structures contain large @@ -158,46 +165,54 @@ struct Delta { struct RemoveInEdgeTag {}; struct RemoveOutEdgeTag {}; - Delta(DeleteObjectTag /*unused*/, CommitInfo *commit_info, uint64_t command_id) - : action(Action::DELETE_OBJECT), commit_info(commit_info), command_id(command_id) {} + Delta(DeleteObjectTag /*unused*/, CommitInfo *commit_info, uint64_t delta_id, uint64_t command_id) + : action(Action::DELETE_OBJECT), id(delta_id), commit_info(commit_info), command_id(command_id) {} - Delta(RecreateObjectTag /*unused*/, CommitInfo *commit_info, uint64_t command_id) - : action(Action::RECREATE_OBJECT), commit_info(commit_info), command_id(command_id) {} + Delta(RecreateObjectTag /*unused*/, CommitInfo *commit_info, uint64_t delta_id, uint64_t command_id) + : action(Action::RECREATE_OBJECT), id(delta_id), commit_info(commit_info), command_id(command_id) {} - Delta(AddLabelTag /*unused*/, LabelId label, CommitInfo *commit_info, uint64_t command_id) - : action(Action::ADD_LABEL), commit_info(commit_info), command_id(command_id), label(label) {} + Delta(AddLabelTag /*unused*/, LabelId label, CommitInfo *commit_info, uint64_t delta_id, uint64_t command_id) + : action(Action::ADD_LABEL), id(delta_id), commit_info(commit_info), command_id(command_id), label(label) {} - Delta(RemoveLabelTag /*unused*/, LabelId label, CommitInfo *commit_info, uint64_t command_id) - : action(Action::REMOVE_LABEL), commit_info(commit_info), command_id(command_id), label(label) {} + Delta(RemoveLabelTag /*unused*/, LabelId label, CommitInfo *commit_info, uint64_t delta_id, uint64_t command_id) + : action(Action::REMOVE_LABEL), id(delta_id), commit_info(commit_info), command_id(command_id), label(label) {} Delta(SetPropertyTag /*unused*/, PropertyId key, const PropertyValue &value, CommitInfo *commit_info, - uint64_t command_id) - : action(Action::SET_PROPERTY), commit_info(commit_info), command_id(command_id), property({key, value}) {} + uint64_t delta_id, uint64_t command_id) + : action(Action::SET_PROPERTY), + id(delta_id), + commit_info(commit_info), + command_id(command_id), + property({key, value}) {} Delta(AddInEdgeTag /*unused*/, EdgeTypeId edge_type, VertexId vertex_id, EdgeRef edge, CommitInfo *commit_info, - uint64_t command_id) + uint64_t delta_id, uint64_t command_id) : action(Action::ADD_IN_EDGE), + id(delta_id), commit_info(commit_info), command_id(command_id), vertex_edge({edge_type, std::move(vertex_id), edge}) {} Delta(AddOutEdgeTag /*unused*/, EdgeTypeId edge_type, VertexId vertex_id, EdgeRef edge, CommitInfo *commit_info, - uint64_t command_id) + 
uint64_t delta_id, uint64_t command_id) : action(Action::ADD_OUT_EDGE), + id(delta_id), commit_info(commit_info), command_id(command_id), vertex_edge({edge_type, std::move(vertex_id), edge}) {} Delta(RemoveInEdgeTag /*unused*/, EdgeTypeId edge_type, VertexId vertex_id, EdgeRef edge, CommitInfo *commit_info, - uint64_t command_id) + uint64_t delta_id, uint64_t command_id) : action(Action::REMOVE_IN_EDGE), + id(delta_id), commit_info(commit_info), command_id(command_id), vertex_edge({edge_type, std::move(vertex_id), edge}) {} Delta(RemoveOutEdgeTag /*unused*/, EdgeTypeId edge_type, VertexId vertex_id, EdgeRef edge, CommitInfo *commit_info, - uint64_t command_id) + uint64_t delta_id, uint64_t command_id) : action(Action::REMOVE_OUT_EDGE), + id(delta_id), commit_info(commit_info), command_id(command_id), vertex_edge({edge_type, std::move(vertex_id), edge}) {} @@ -226,8 +241,10 @@ struct Delta { } } - Action action; + friend bool operator==(const Delta &lhs, const Delta &rhs) noexcept { return lhs.id == rhs.id; } + Action action; + uint64_t id; // TODO: optimize with in-place copy CommitInfo *commit_info; uint64_t command_id; diff --git a/src/storage/v3/indices.cpp b/src/storage/v3/indices.cpp index 2f11a35ec..931dd83af 100644 --- a/src/storage/v3/indices.cpp +++ b/src/storage/v3/indices.cpp @@ -1,4 +1,4 @@ -// Copyright 2022 Memgraph Ltd. +// Copyright 2023 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source @@ -325,7 +325,7 @@ void LabelIndex::RemoveObsoleteEntries(const uint64_t clean_up_before_timestamp) } } -LabelIndex::Iterable::Iterator::Iterator(Iterable *self, LabelIndexContainer::iterator index_iterator) +LabelIndex::Iterable::Iterator::Iterator(Iterable *self, IndexContainer::iterator index_iterator) : self_(self), index_iterator_(index_iterator), current_vertex_accessor_(nullptr, nullptr, nullptr, self_->config_, *self_->vertex_validator_), @@ -353,7 +353,7 @@ void LabelIndex::Iterable::Iterator::AdvanceUntilValid() { } } -LabelIndex::Iterable::Iterable(LabelIndexContainer &index_container, LabelId label, View view, Transaction *transaction, +LabelIndex::Iterable::Iterable(IndexContainer &index_container, LabelId label, View view, Transaction *transaction, Indices *indices, Config::Items config, const VertexValidator &vertex_validator) : index_container_(&index_container), label_(label), @@ -465,7 +465,7 @@ void LabelPropertyIndex::RemoveObsoleteEntries(const uint64_t clean_up_before_ti } } -LabelPropertyIndex::Iterable::Iterator::Iterator(Iterable *self, LabelPropertyIndexContainer::iterator index_iterator) +LabelPropertyIndex::Iterable::Iterator::Iterator(Iterable *self, IndexContainer::iterator index_iterator) : self_(self), index_iterator_(index_iterator), current_vertex_accessor_(nullptr, nullptr, nullptr, self_->config_, *self_->vertex_validator_), @@ -526,7 +526,7 @@ const PropertyValue kSmallestMap = PropertyValue(std::map<std::string, PropertyV const PropertyValue kSmallestTemporalData = PropertyValue(TemporalData{static_cast<TemporalType>(0), std::numeric_limits<int64_t>::min()}); -LabelPropertyIndex::Iterable::Iterable(LabelPropertyIndexContainer &index_container, LabelId label, PropertyId property, +LabelPropertyIndex::Iterable::Iterable(IndexContainer &index_container, LabelId label, PropertyId property, const std::optional<utils::Bound<PropertyValue>> &lower_bound, const std::optional<utils::Bound<PropertyValue>> 
&upper_bound, View view, Transaction *transaction, Indices *indices, Config::Items config, diff --git a/src/storage/v3/indices.hpp b/src/storage/v3/indices.hpp index d8e491470..882f2e894 100644 --- a/src/storage/v3/indices.hpp +++ b/src/storage/v3/indices.hpp @@ -1,4 +1,4 @@ -// Copyright 2022 Memgraph Ltd. +// Copyright 2023 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source @@ -18,6 +18,8 @@ #include <utility> #include "storage/v3/config.hpp" +#include "storage/v3/id_types.hpp" +#include "storage/v3/key_store.hpp" #include "storage/v3/property_value.hpp" #include "storage/v3/transaction.hpp" #include "storage/v3/vertex_accessor.hpp" @@ -40,12 +42,18 @@ class LabelIndex { bool operator==(const Entry &rhs) const { return vertex == rhs.vertex && timestamp == rhs.timestamp; } }; + using IndexType = LabelId; + public: - using LabelIndexContainer = std::set<Entry>; + using IndexContainer = std::set<Entry>; LabelIndex(Indices *indices, Config::Items config, const VertexValidator &vertex_validator) : indices_(indices), config_(config), vertex_validator_{&vertex_validator} {} + LabelIndex(Indices *indices, Config::Items config, const VertexValidator &vertex_validator, + std::map<LabelId, IndexContainer> &data) + : index_{std::move(data)}, indices_(indices), config_(config), vertex_validator_{&vertex_validator} {} + /// @throw std::bad_alloc void UpdateOnAddLabel(LabelId label, Vertex *vertex, const Transaction &tx); @@ -63,12 +71,12 @@ class LabelIndex { class Iterable { public: - Iterable(LabelIndexContainer &index_container, LabelId label, View view, Transaction *transaction, Indices *indices, + Iterable(IndexContainer &index_container, LabelId label, View view, Transaction *transaction, Indices *indices, Config::Items config, const VertexValidator &vertex_validator); class Iterator { public: - Iterator(Iterable *self, LabelIndexContainer::iterator index_iterator); + Iterator(Iterable *self, IndexContainer::iterator index_iterator); VertexAccessor operator*() const { return current_vertex_accessor_; } @@ -81,7 +89,7 @@ class LabelIndex { void AdvanceUntilValid(); Iterable *self_; - LabelIndexContainer::iterator index_iterator_; + IndexContainer::iterator index_iterator_; VertexAccessor current_vertex_accessor_; Vertex *current_vertex_; }; @@ -90,7 +98,7 @@ class LabelIndex { Iterator end() { return {this, index_container_->end()}; } private: - LabelIndexContainer *index_container_; + IndexContainer *index_container_; LabelId label_; View view_; Transaction *transaction_; @@ -114,8 +122,29 @@ class LabelIndex { void Clear() { index_.clear(); } + std::map<IndexType, IndexContainer> SplitIndexEntries(const PrimaryKey &split_key) { + std::map<IndexType, IndexContainer> cloned_indices; + for (auto &[index_type_val, index] : index_) { + auto entry_it = index.begin(); + auto &cloned_indices_container = cloned_indices[index_type_val]; + while (entry_it != index.end()) { + // We need to save the next iterator since the current one will be + // invalidated after extract + auto next_entry_it = std::next(entry_it); + if (entry_it->vertex->first > split_key) { + [[maybe_unused]] const auto &[inserted_entry_it, inserted, node] = + cloned_indices_container.insert(index.extract(entry_it)); + MG_ASSERT(inserted, "Failed to extract index entry!"); + } + entry_it = next_entry_it; + } + } + + return cloned_indices; + } + private: - std::map<LabelId, 
LabelIndexContainer> index_; + std::map<LabelId, IndexContainer> index_; Indices *indices_; Config::Items config_; const VertexValidator *vertex_validator_; @@ -133,9 +162,10 @@ class LabelPropertyIndex { bool operator<(const PropertyValue &rhs) const; bool operator==(const PropertyValue &rhs) const; }; + using IndexType = std::pair<LabelId, PropertyId>; public: - using LabelPropertyIndexContainer = std::set<Entry>; + using IndexContainer = std::set<Entry>; LabelPropertyIndex(Indices *indices, Config::Items config, const VertexValidator &vertex_validator) : indices_(indices), config_(config), vertex_validator_{&vertex_validator} {} @@ -159,14 +189,14 @@ class LabelPropertyIndex { class Iterable { public: - Iterable(LabelPropertyIndexContainer &index_container, LabelId label, PropertyId property, + Iterable(IndexContainer &index_container, LabelId label, PropertyId property, const std::optional<utils::Bound<PropertyValue>> &lower_bound, const std::optional<utils::Bound<PropertyValue>> &upper_bound, View view, Transaction *transaction, Indices *indices, Config::Items config, const VertexValidator &vertex_validator); class Iterator { public: - Iterator(Iterable *self, LabelPropertyIndexContainer::iterator index_iterator); + Iterator(Iterable *self, IndexContainer::iterator index_iterator); VertexAccessor operator*() const { return current_vertex_accessor_; } @@ -179,7 +209,7 @@ class LabelPropertyIndex { void AdvanceUntilValid(); Iterable *self_; - LabelPropertyIndexContainer::iterator index_iterator_; + IndexContainer::iterator index_iterator_; VertexAccessor current_vertex_accessor_; Vertex *current_vertex_; }; @@ -188,7 +218,7 @@ class LabelPropertyIndex { Iterator end(); private: - LabelPropertyIndexContainer *index_container_; + IndexContainer *index_container_; LabelId label_; PropertyId property_; std::optional<utils::Bound<PropertyValue>> lower_bound_; @@ -229,8 +259,29 @@ class LabelPropertyIndex { void Clear() { index_.clear(); } + std::map<IndexType, IndexContainer> SplitIndexEntries(const PrimaryKey &split_key) { + std::map<IndexType, IndexContainer> cloned_indices; + for (auto &[index_type_val, index] : index_) { + auto entry_it = index.begin(); + auto &cloned_index_container = cloned_indices[index_type_val]; + while (entry_it != index.end()) { + // We need to save the next iterator since the current one will be + // invalidated after extract + auto next_entry_it = std::next(entry_it); + if (entry_it->vertex->first > split_key) { + [[maybe_unused]] const auto &[inserted_entry_it, inserted, node] = + cloned_index_container.insert(index.extract(entry_it)); + MG_ASSERT(inserted, "Failed to extract index entry!"); + } + entry_it = next_entry_it; + } + } + + return cloned_indices; + } + private: - std::map<std::pair<LabelId, PropertyId>, LabelPropertyIndexContainer> index_; + std::map<std::pair<LabelId, PropertyId>, IndexContainer> index_; Indices *indices_; Config::Items config_; const VertexValidator *vertex_validator_; diff --git a/src/storage/v3/mvcc.hpp b/src/storage/v3/mvcc.hpp index 6ce058d62..797339f8b 100644 --- a/src/storage/v3/mvcc.hpp +++ b/src/storage/v3/mvcc.hpp @@ -1,4 +1,4 @@ -// Copyright 2022 Memgraph Ltd. +// Copyright 2023 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source @@ -108,7 +108,7 @@ inline bool PrepareForWrite(Transaction *transaction, TObj *object) { /// a `DELETE_OBJECT` delta). 
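The two `SplitIndexEntries` helpers above move entries between `std::set`s via node handles: `extract` unlinks the node in O(1) without copying the element, and only the extracted iterator is invalidated, which is why the loop saves `std::next` first. A minimal standalone sketch of the same technique, using plain ints instead of index entries:

```cpp
// Standalone sketch of the node-handle technique used by SplitIndexEntries:
// extract() relinks the node into the destination set without copying it.
#include <cassert>
#include <iterator>
#include <set>

int main() {
  std::set<int> index{1, 3, 5, 7, 9};
  std::set<int> cloned;
  const int split_key = 5;

  auto it = index.begin();
  while (it != index.end()) {
    auto next = std::next(it);  // save first: `it` is invalidated by extract below
    if (*it > split_key) {
      cloned.insert(index.extract(it));  // O(1) relink, no element copy
    }
    it = next;
  }

  assert((index == std::set<int>{1, 3, 5}));
  assert((cloned == std::set<int>{7, 9}));
}
```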
/// @throw std::bad_alloc inline Delta *CreateDeleteObjectDelta(Transaction *transaction) { - return &transaction->deltas.emplace_back(Delta::DeleteObjectTag(), transaction->commit_info.get(), + return &transaction->deltas.emplace_back(Delta::DeleteObjectTag(), transaction->commit_info.get(), GetNextDeltaId(), transaction->command_id); } @@ -119,7 +119,7 @@ template <typename TObj, class... Args> requires utils::SameAsAnyOf<TObj, Edge, Vertex> inline void CreateAndLinkDelta(Transaction *transaction, TObj *object, Args &&...args) { auto delta = &transaction->deltas.emplace_back(std::forward<Args>(args)..., transaction->commit_info.get(), - transaction->command_id); + GetNextDeltaId(), transaction->command_id); auto *delta_holder = GetDeltaHolder(object); // The operations are written in such order so that both `next` and `prev` diff --git a/src/storage/v3/shard.cpp b/src/storage/v3/shard.cpp index 5e16209d9..afd13fa88 100644 --- a/src/storage/v3/shard.cpp +++ b/src/storage/v3/shard.cpp @@ -1,4 +1,4 @@ -// Copyright 2022 Memgraph Ltd. +// Copyright 2023 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source @@ -18,14 +18,14 @@ #include <memory> #include <mutex> #include <optional> -#include <variant> #include <bits/ranges_algo.h> -#include <gflags/gflags.h> #include <spdlog/spdlog.h> #include "io/network/endpoint.hpp" #include "io/time.hpp" +#include "storage/v3/delta.hpp" +#include "storage/v3/edge.hpp" #include "storage/v3/edge_accessor.hpp" #include "storage/v3/id_types.hpp" #include "storage/v3/indices.hpp" @@ -332,16 +332,64 @@ Shard::Shard(const LabelId primary_label, const PrimaryKey min_primary_key, vertex_validator_{schema_validator_, primary_label}, indices_{config.items, vertex_validator_}, isolation_level_{config.transaction.isolation_level}, - config_{config}, - uuid_{utils::GenerateUUID()}, - epoch_id_{utils::GenerateUUID()}, - global_locker_{file_retainer_.AddLocker()} { + config_{config} { CreateSchema(primary_label_, schema); StoreMapping(std::move(id_to_name)); } +Shard::Shard(LabelId primary_label, PrimaryKey min_primary_key, std::optional<PrimaryKey> max_primary_key, + std::vector<SchemaProperty> schema, VertexContainer &&vertices, EdgeContainer &&edges, + std::map<uint64_t, std::unique_ptr<Transaction>> &&start_logical_id_to_transaction, const Config &config, + const std::unordered_map<uint64_t, std::string> &id_to_name, const uint64_t shard_version) + : primary_label_{primary_label}, + min_primary_key_{min_primary_key}, + max_primary_key_{max_primary_key}, + vertices_(std::move(vertices)), + edges_(std::move(edges)), + shard_version_(shard_version), + schema_validator_{schemas_, name_id_mapper_}, + vertex_validator_{schema_validator_, primary_label}, + indices_{config.items, vertex_validator_}, + isolation_level_{config.transaction.isolation_level}, + config_{config}, + start_logical_id_to_transaction_(std::move(start_logical_id_to_transaction)) { + CreateSchema(primary_label_, schema); + StoreMapping(id_to_name); +} + +Shard::Shard(LabelId primary_label, PrimaryKey min_primary_key, std::optional<PrimaryKey> max_primary_key, + std::vector<SchemaProperty> schema, VertexContainer &&vertices, + std::map<uint64_t, std::unique_ptr<Transaction>> &&start_logical_id_to_transaction, const Config &config, + const std::unordered_map<uint64_t, std::string> &id_to_name, const uint64_t shard_version) + : primary_label_{primary_label}, + 
min_primary_key_{min_primary_key}, + max_primary_key_{max_primary_key}, + vertices_(std::move(vertices)), + shard_version_(shard_version), + schema_validator_{schemas_, name_id_mapper_}, + vertex_validator_{schema_validator_, primary_label}, + indices_{config.items, vertex_validator_}, + isolation_level_{config.transaction.isolation_level}, + config_{config}, + start_logical_id_to_transaction_(std::move(start_logical_id_to_transaction)) { + CreateSchema(primary_label_, schema); + StoreMapping(id_to_name); +} + Shard::~Shard() {} +std::unique_ptr<Shard> Shard::FromSplitData(SplitData &&split_data) { + if (split_data.config.items.properties_on_edges) [[likely]] { + return std::make_unique<Shard>(split_data.primary_label, split_data.min_primary_key, split_data.max_primary_key, + split_data.schema, std::move(split_data.vertices), std::move(*split_data.edges), + std::move(split_data.transactions), split_data.config, split_data.id_to_name, + split_data.shard_version); + } + return std::make_unique<Shard>(split_data.primary_label, split_data.min_primary_key, split_data.max_primary_key, + split_data.schema, std::move(split_data.vertices), std::move(split_data.transactions), + split_data.config, split_data.id_to_name, split_data.shard_version); +} + Shard::Accessor::Accessor(Shard &shard, Transaction &transaction) : shard_(&shard), transaction_(&transaction), config_(shard_->config_.items) {} @@ -436,7 +484,7 @@ ShardResult<std::optional<std::pair<VertexAccessor, std::vector<EdgeAccessor>>>> } std::vector<EdgeAccessor> deleted_edges; - const VertexId vertex_id{shard_->primary_label_, *vertex->PrimaryKey(View::OLD)}; // TODO Replace + const VertexId vertex_id{shard_->primary_label_, *vertex->PrimaryKey(View::OLD)}; for (const auto &item : in_edges) { auto [edge_type, from_vertex, edge] = item; EdgeAccessor e(edge, edge_type, from_vertex, vertex_id, transaction_, &shard_->indices_, config_); @@ -1048,6 +1096,28 @@ void Shard::StoreMapping(std::unordered_map<uint64_t, std::string> id_to_name) { name_id_mapper_.StoreMapping(std::move(id_to_name)); } +std::optional<SplitInfo> Shard::ShouldSplit() const noexcept { + if (vertices_.size() > config_.split.max_shard_vertex_size) { + auto mid_elem = vertices_.begin(); + // TODO(tyler) the first time we calculate the split point, we should store it so that we don't have to + // iterate over half of the entire index each time Cron is run until the split succeeds. 
+ std::ranges::advance(mid_elem, static_cast<VertexContainer::difference_type>(vertices_.size() / 2)); + return SplitInfo{mid_elem->first, shard_version_}; + } + return std::nullopt; +} + +SplitData Shard::PerformSplit(const PrimaryKey &split_key, const uint64_t shard_version) { + shard_version_ = shard_version; + const auto old_max_key = max_primary_key_; + max_primary_key_ = split_key; + const auto *schema = GetSchema(primary_label_); + MG_ASSERT(schema, "Shard must know about schema of primary label!"); + Splitter shard_splitter(primary_label_, vertices_, edges_, start_logical_id_to_transaction_, indices_, config_, + schema->second, name_id_mapper_); + return shard_splitter.SplitShard(split_key, old_max_key, shard_version); +} + bool Shard::IsVertexBelongToShard(const VertexId &vertex_id) const { return vertex_id.primary_label == primary_label_ && vertex_id.primary_key >= min_primary_key_ && (!max_primary_key_.has_value() || vertex_id.primary_key < *max_primary_key_); diff --git a/src/storage/v3/shard.hpp b/src/storage/v3/shard.hpp index 16e2bf6be..9496a6084 100644 --- a/src/storage/v3/shard.hpp +++ b/src/storage/v3/shard.hpp @@ -1,4 +1,4 @@ -// Copyright 2022 Memgraph Ltd. +// Copyright 2023 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source @@ -14,6 +14,7 @@ #include <cstdint> #include <filesystem> #include <map> +#include <memory> #include <numeric> #include <optional> #include <shared_mutex> @@ -37,6 +38,7 @@ #include "storage/v3/result.hpp" #include "storage/v3/schema_validator.hpp" #include "storage/v3/schemas.hpp" +#include "storage/v3/splitter.hpp" #include "storage/v3/transaction.hpp" #include "storage/v3/vertex.hpp" #include "storage/v3/vertex_accessor.hpp" @@ -174,6 +176,11 @@ struct SchemasInfo { Schemas::SchemasList schemas; }; +struct SplitInfo { + PrimaryKey split_point; + uint64_t shard_version; +}; + /// Structure used to return information about the storage. 
struct StorageInfo { uint64_t vertex_count; @@ -186,9 +193,19 @@ class Shard final { public: /// @throw std::system_error /// @throw std::bad_alloc - explicit Shard(LabelId primary_label, PrimaryKey min_primary_key, std::optional<PrimaryKey> max_primary_key, - std::vector<SchemaProperty> schema, Config config = Config(), - std::unordered_map<uint64_t, std::string> id_to_name = {}); + Shard(LabelId primary_label, PrimaryKey min_primary_key, std::optional<PrimaryKey> max_primary_key, + std::vector<SchemaProperty> schema, Config config = Config(), + std::unordered_map<uint64_t, std::string> id_to_name = {}); + + Shard(LabelId primary_label, PrimaryKey min_primary_key, std::optional<PrimaryKey> max_primary_key, + std::vector<SchemaProperty> schema, VertexContainer &&vertices, EdgeContainer &&edges, + std::map<uint64_t, std::unique_ptr<Transaction>> &&start_logical_id_to_transaction, const Config &config, + const std::unordered_map<uint64_t, std::string> &id_to_name, uint64_t shard_version); + + Shard(LabelId primary_label, PrimaryKey min_primary_key, std::optional<PrimaryKey> max_primary_key, + std::vector<SchemaProperty> schema, VertexContainer &&vertices, + std::map<uint64_t, std::unique_ptr<Transaction>> &&start_logical_id_to_transaction, const Config &config, + const std::unordered_map<uint64_t, std::string> &id_to_name, uint64_t shard_version); Shard(const Shard &) = delete; Shard(Shard &&) noexcept = delete; @@ -196,6 +213,8 @@ class Shard final { Shard operator=(Shard &&) noexcept = delete; ~Shard(); + static std::unique_ptr<Shard> FromSplitData(SplitData &&split_data); + class Accessor final { private: friend class Shard; @@ -360,6 +379,10 @@ class Shard final { void StoreMapping(std::unordered_map<uint64_t, std::string> id_to_name); + std::optional<SplitInfo> ShouldSplit() const noexcept; + + SplitData PerformSplit(const PrimaryKey &split_key, uint64_t shard_version); + private: Transaction &GetTransaction(coordinator::Hlc start_timestamp, IsolationLevel isolation_level); @@ -377,6 +400,7 @@ class Shard final { // list is used only when properties are enabled for edges. Because of that we // keep a separate count of edges that is always updated. uint64_t edge_count_{0}; + uint64_t shard_version_{0}; SchemaValidator schema_validator_; VertexValidator vertex_validator_; @@ -396,38 +420,6 @@ class Shard final { // storage. std::list<Gid> deleted_edges_; - // UUID used to distinguish snapshots and to link snapshots to WALs - std::string uuid_; - // Sequence number used to keep track of the chain of WALs. - uint64_t wal_seq_num_{0}; - - // UUID to distinguish different main instance runs for replication process - // on SAME storage. - // Multiple instances can have same storage UUID and be MAIN at the same time. - // We cannot compare commit timestamps of those instances if one of them - // becomes the replica of the other so we use epoch_id_ as additional - // discriminating property. - // Example of this: - // We have 2 instances of the same storage, S1 and S2. - // S1 and S2 are MAIN and accept their own commits and write them to the WAL. - // At the moment when S1 commited a transaction with timestamp 20, and S2 - // a different transaction with timestamp 15, we change S2's role to REPLICA - // and register it on S1. - // Without using the epoch_id, we don't know that S1 and S2 have completely - // different transactions, we think that the S2 is behind only by 5 commits. - std::string epoch_id_; - // History of the previous epoch ids. 
- // Each value consists of the epoch id along the last commit belonging to that - // epoch. - std::deque<std::pair<std::string, uint64_t>> epoch_history_; - - uint64_t wal_unsynced_transactions_{0}; - - utils::FileRetainer file_retainer_; - - // Global locker that is used for clients file locking - utils::FileRetainer::FileLocker global_locker_; - // Holds all of the (in progress, committed and aborted) transactions that are read or write to this shard, but // haven't been cleaned up yet std::map<uint64_t, std::unique_ptr<Transaction>> start_logical_id_to_transaction_{}; diff --git a/src/storage/v3/shard_rsm.hpp b/src/storage/v3/shard_rsm.hpp index 18199d8a1..9c178eccc 100644 --- a/src/storage/v3/shard_rsm.hpp +++ b/src/storage/v3/shard_rsm.hpp @@ -12,11 +12,13 @@ #pragma once #include <memory> +#include <optional> #include <variant> #include <openssl/ec.h> #include "query/v2/requests.hpp" #include "storage/v3/shard.hpp" +#include "storage/v3/value_conversions.hpp" #include "storage/v3/vertex_accessor.hpp" namespace memgraph::storage::v3 { @@ -41,6 +43,19 @@ class ShardRsm { public: explicit ShardRsm(std::unique_ptr<Shard> &&shard) : shard_(std::move(shard)){}; + std::optional<msgs::SplitInfo> ShouldSplit() const noexcept { + auto split_info = shard_->ShouldSplit(); + if (split_info) { + return msgs::SplitInfo{conversions::ConvertValueVector(split_info->split_point), split_info->shard_version}; + } + return std::nullopt; + } + + std::unique_ptr<Shard> PerformSplit(msgs::PerformSplitDataInfo perform_split) const noexcept { + return Shard::FromSplitData( + shard_->PerformSplit(conversions::ConvertPropertyVector(perform_split.split_key), perform_split.shard_version)); + } + // NOLINTNEXTLINE(readability-convert-member-functions-to-static) msgs::ReadResponses Read(msgs::ReadRequests &&requests) { return std::visit([&](auto &&request) mutable { return HandleRead(std::forward<decltype(request)>(request)); }, diff --git a/src/storage/v3/splitter.cpp b/src/storage/v3/splitter.cpp new file mode 100644 index 000000000..6178b67e6 --- /dev/null +++ b/src/storage/v3/splitter.cpp @@ -0,0 +1,411 @@ +// Copyright 2023 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
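Before the splitter internals, it helps to see how the pieces compose end to end. A hypothetical caller of the `ShardRsm` split API above (the `on_new_shard` hand-off and the version-bump policy are illustrative assumptions, not part of this diff):

```cpp
// Hypothetical usage sketch of the split API above; on_new_shard and the
// version bump are illustrative assumptions, not Memgraph code.
#include <functional>
#include <memory>
#include <utility>

#include "storage/v3/shard_rsm.hpp"

namespace v3 = memgraph::storage::v3;

void MaybeSplit(v3::ShardRsm &rsm,
                const std::function<void(std::unique_ptr<v3::Shard>)> &on_new_shard) {
  // ShouldSplit() only reports a split point once the vertex count exceeds
  // config.split.max_shard_vertex_size (500'000 by default).
  if (const auto split_info = rsm.ShouldSplit(); split_info.has_value()) {
    // PerformSplit() shrinks the existing shard to [min, split_key) and
    // returns a new shard covering [split_key, old_max).
    auto new_shard = rsm.PerformSplit(
        memgraph::msgs::PerformSplitDataInfo{split_info->split_key, split_info->shard_version + 1});
    on_new_shard(std::move(new_shard));
  }
}
```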
+ +#include "storage/v3/splitter.hpp" + +#include <algorithm> +#include <cstdint> +#include <map> +#include <memory> +#include <optional> +#include <set> + +#include "storage/v3/config.hpp" +#include "storage/v3/delta.hpp" +#include "storage/v3/id_types.hpp" +#include "storage/v3/indices.hpp" +#include "storage/v3/key_store.hpp" +#include "storage/v3/name_id_mapper.hpp" +#include "storage/v3/schemas.hpp" +#include "storage/v3/shard.hpp" +#include "storage/v3/transaction.hpp" +#include "storage/v3/vertex.hpp" +#include "utils/logging.hpp" + +namespace memgraph::storage::v3 { + +Splitter::Splitter(const LabelId primary_label, VertexContainer &vertices, EdgeContainer &edges, + std::map<uint64_t, std::unique_ptr<Transaction>> &start_logical_id_to_transaction, Indices &indices, + const Config &config, const std::vector<SchemaProperty> &schema, const NameIdMapper &name_id_mapper) + : primary_label_(primary_label), + vertices_(vertices), + edges_(edges), + start_logical_id_to_transaction_(start_logical_id_to_transaction), + indices_(indices), + config_(config), + schema_(schema), + name_id_mapper_(name_id_mapper) {} + +SplitData Splitter::SplitShard(const PrimaryKey &split_key, const std::optional<PrimaryKey> &max_primary_key, + const uint64_t shard_version) { + SplitData data{.primary_label = primary_label_, + .min_primary_key = split_key, + .max_primary_key = max_primary_key, + .schema = schema_, + .config = config_, + .id_to_name = name_id_mapper_.GetIdToNameMap(), + .shard_version = shard_version}; + + std::set<uint64_t> collected_transactions_; + data.vertices = CollectVertices(data, collected_transactions_, split_key); + data.edges = CollectEdges(collected_transactions_, data.vertices, split_key); + data.transactions = CollectTransactions(collected_transactions_, *data.edges, split_key); + + return data; +} + +void Splitter::ScanDeltas(std::set<uint64_t> &collected_transactions_, const Delta *delta) { + while (delta != nullptr) { + collected_transactions_.insert(delta->commit_info->start_or_commit_timestamp.logical_id); + delta = delta->next; + } +} + +VertexContainer Splitter::CollectVertices(SplitData &data, std::set<uint64_t> &collected_transactions_, + const PrimaryKey &split_key) { + data.label_indices = indices_.label_index.SplitIndexEntries(split_key); + data.label_property_indices = indices_.label_property_index.SplitIndexEntries(split_key); + + VertexContainer splitted_data; + auto split_key_it = vertices_.find(split_key); + while (split_key_it != vertices_.end()) { + // Go through the deltas and pick up each transaction's start_id/commit_id + ScanDeltas(collected_transactions_, split_key_it->second.delta); + + auto next_it = std::next(split_key_it); + + const auto new_it = splitted_data.insert(splitted_data.end(), vertices_.extract(split_key_it)); + MG_ASSERT(new_it != splitted_data.end(), "Failed to extract vertex!"); + + split_key_it = next_it; + } + return splitted_data; +} + +std::optional<EdgeContainer> Splitter::CollectEdges(std::set<uint64_t> &collected_transactions_, + const VertexContainer &split_vertices, + const PrimaryKey &split_key) { + if (!config_.items.properties_on_edges) { + return std::nullopt; + } + EdgeContainer splitted_edges; + const auto split_vertex_edges = [&](const auto &edges_ref) { + // This is safe: if properties_on_edges is true, this must be a pointer + for (const auto &edge_ref : edges_ref) { + auto *edge = std::get<2>(edge_ref).ptr; + const auto &other_vtx = std::get<1>(edge_ref); + ScanDeltas(collected_transactions_, edge->delta); + // Check if src
and dest vertices are both on the split shard so we know whether we + // should remove the orphan edge or make a clone + if (other_vtx.primary_key >= split_key) { + // Remove edge from shard + splitted_edges.insert(edges_.extract(edge->gid)); + } else { + splitted_edges.insert({edge->gid, Edge{edge->gid, edge->delta}}); + } + } + }; + + for (const auto &vertex : split_vertices) { + split_vertex_edges(vertex.second.in_edges); + split_vertex_edges(vertex.second.out_edges); + } + return splitted_edges; +} + +std::map<uint64_t, std::unique_ptr<Transaction>> Splitter::CollectTransactions( + const std::set<uint64_t> &collected_transactions_, EdgeContainer &cloned_edges, const PrimaryKey &split_key) { + std::map<uint64_t, std::unique_ptr<Transaction>> transactions; + + for (const auto &[commit_start, transaction] : start_logical_id_to_transaction_) { + // We need every transaction whose deltas need to be resolved for any of the + // entities + if (collected_transactions_.contains(transaction->commit_info->start_or_commit_timestamp.logical_id)) { + transactions.insert({commit_start, start_logical_id_to_transaction_[commit_start]->Clone()}); + } + } + + // It is necessary to clone all the transactions first so we have new addresses + // for deltas before aligning the deltas and prev_ptr + AdjustClonedTransactions(transactions, cloned_edges, split_key); + return transactions; +} + +void EraseDeltaChain(auto &transaction, auto &transactions, auto &delta_head_it) { + auto *current_next_delta = delta_head_it->next; + // We need to keep track of delta_head_it in the delta list of the current transaction + delta_head_it = transaction.deltas.erase(delta_head_it); + + while (current_next_delta != nullptr) { + auto *next_delta = current_next_delta->next; + // Find the transaction delta list that contains the next delta + auto current_transaction_it = std::ranges::find_if( + transactions, [&start_or_commit_timestamp = + current_next_delta->commit_info->start_or_commit_timestamp](const auto &transaction) { + return transaction.second->start_timestamp == start_or_commit_timestamp || + transaction.second->commit_info->start_or_commit_timestamp == start_or_commit_timestamp; + }); + MG_ASSERT(current_transaction_it != transactions.end(), "Error when pruning deltas!"); + // Remove the delta + const auto delta_it = + std::ranges::find_if(current_transaction_it->second->deltas, + [current_next_delta](const auto &elem) { return elem.id == current_next_delta->id; }); + if (delta_it != current_transaction_it->second->deltas.end()) { + // If the next delta is the next one in the transaction list, replace current_transaction_it + // with the next one + if (current_transaction_it->second->start_timestamp == transaction.start_timestamp && + current_transaction_it == std::next(current_transaction_it)) { + delta_head_it = current_transaction_it->second->deltas.erase(delta_it); + } else { + current_transaction_it->second->deltas.erase(delta_it); + } + } + + current_next_delta = next_delta; + } +} + +void PruneDeltas(Transaction &cloned_transaction, std::map<uint64_t, std::unique_ptr<Transaction>> &cloned_transactions, + const PrimaryKey &split_key, EdgeContainer &cloned_edges) { + // Remove delta chains that don't point to objects on the split shard + auto cloned_delta_it = cloned_transaction.deltas.begin(); + + while (cloned_delta_it != cloned_transaction.deltas.end()) { + const auto prev = cloned_delta_it->prev.Get(); + switch (prev.type) { + case PreviousPtr::Type::DELTA: + case PreviousPtr::Type::NULLPTR: + ++cloned_delta_it; + break; + case PreviousPtr::Type::VERTEX:
{ + if (prev.vertex->first < split_key) { + // We can remove this delta chain + EraseDeltaChain(cloned_transaction, cloned_transactions, cloned_delta_it); + } else { + ++cloned_delta_it; + } + break; + } + case PreviousPtr::Type::EDGE: { + if (const auto edge_gid = prev.edge->gid; !cloned_edges.contains(edge_gid)) { + // We can remove this delta chain + EraseDeltaChain(cloned_transaction, cloned_transactions, cloned_delta_it); + } else { + ++cloned_delta_it; + } + break; + } + } + } +} + +void Splitter::PruneOriginalDeltas(Transaction &transaction, + std::map<uint64_t, std::unique_ptr<Transaction>> &transactions, + const PrimaryKey &split_key) { + // Remove delta chains that don't point to objects on the split shard + auto delta_it = transaction.deltas.begin(); + + while (delta_it != transaction.deltas.end()) { + const auto prev = delta_it->prev.Get(); + switch (prev.type) { + case PreviousPtr::Type::DELTA: + case PreviousPtr::Type::NULLPTR: + ++delta_it; + break; + case PreviousPtr::Type::VERTEX: { + if (prev.vertex->first >= split_key) { + // We can remove this delta chain + EraseDeltaChain(transaction, transactions, delta_it); + } else { + ++delta_it; + } + break; + } + case PreviousPtr::Type::EDGE: { + if (const auto edge_gid = prev.edge->gid; !edges_.contains(edge_gid)) { + // We can remove this delta chain + EraseDeltaChain(transaction, transactions, delta_it); + } else { + ++delta_it; + } + break; + } + } + } +} + +void Splitter::AdjustClonedTransactions(std::map<uint64_t, std::unique_ptr<Transaction>> &cloned_transactions, + EdgeContainer &cloned_edges, const PrimaryKey &split_key) { + for (auto &[start_id, cloned_transaction] : cloned_transactions) { + AdjustClonedTransaction(*cloned_transaction, *start_logical_id_to_transaction_[start_id], cloned_transactions, + cloned_edges); + } + // Prune deltas whose delta chain points to a vertex/edge that should not belong on this shard + // Pruning must happen after adjusting, since next and prev are not set before that and we cannot follow the chain + for (auto &[start_id, cloned_transaction] : cloned_transactions) { + PruneDeltas(*cloned_transaction, cloned_transactions, split_key, cloned_edges); + } + // We also need to remove deltas from the original transactions + for (auto &[start_id, original_transaction] : start_logical_id_to_transaction_) { + PruneOriginalDeltas(*original_transaction, start_logical_id_to_transaction_, split_key); + } +} + +inline bool IsDeltaHeadOfChain(const PreviousPtr::Type &delta_type) { + return delta_type == PreviousPtr::Type::VERTEX || delta_type == PreviousPtr::Type::EDGE; +} + +bool DoesPrevPtrPointsToSplittedData(const PreviousPtr::Pointer &prev_ptr, const PrimaryKey &split_key) { + return prev_ptr.type == PreviousPtr::Type::VERTEX && prev_ptr.vertex->first < split_key; +} + +void Splitter::AdjustClonedTransaction(Transaction &cloned_transaction, const Transaction &transaction, + std::map<uint64_t, std::unique_ptr<Transaction>> &cloned_transactions, + EdgeContainer &cloned_edges) { + auto delta_it = transaction.deltas.begin(); + auto cloned_delta_it = cloned_transaction.deltas.begin(); + + while (delta_it != transaction.deltas.end()) { + // We can safely ignore deltas which are not the head of a delta chain + // Don't adjust delta chains that point to irrelevant vertices/edges + if (const auto delta_prev = delta_it->prev.Get(); !IsDeltaHeadOfChain(delta_prev.type)) { + ++delta_it; + ++cloned_delta_it; + continue; + } + + const auto *delta = &*delta_it; + auto *cloned_delta = &*cloned_delta_it; + Delta *cloned_delta_prev_ptr =
cloned_delta; + // The head of a delta chain contains either a vertex or an edge as its prev ptr, so we adjust + // it only at the beginning of the delta chain + AdjustDeltaPrevPtr(*delta, *cloned_delta_prev_ptr, cloned_transactions, cloned_edges); + + while (delta->next != nullptr) { + AdjustEdgeRef(*cloned_delta, cloned_edges); + + // Align next ptr and prev ptr + AdjustDeltaNextAndPrev(*delta, *cloned_delta, cloned_transactions); + + // The next delta might not belong to the cloned transaction, and that is + // why we skip this delta of the delta chain + if (cloned_delta->next != nullptr) { + cloned_delta = cloned_delta->next; + cloned_delta_prev_ptr = cloned_delta; + } else { + cloned_delta_prev_ptr = nullptr; + } + delta = delta->next; + } + // Align prev ptr + if (cloned_delta_prev_ptr != nullptr) { + AdjustDeltaPrevPtr(*delta, *cloned_delta_prev_ptr, cloned_transactions, cloned_edges); + } + + ++delta_it; + ++cloned_delta_it; + } + MG_ASSERT(delta_it == transaction.deltas.end() && cloned_delta_it == cloned_transaction.deltas.end(), + "Both iterators must be exhausted!"); +} + +void Splitter::AdjustEdgeRef(Delta &cloned_delta, EdgeContainer &cloned_edges) const { + switch (cloned_delta.action) { + case Delta::Action::ADD_IN_EDGE: + case Delta::Action::ADD_OUT_EDGE: + case Delta::Action::REMOVE_IN_EDGE: + case Delta::Action::REMOVE_OUT_EDGE: { + // Find edge + if (config_.items.properties_on_edges) { + if (const auto cloned_edge_it = cloned_edges.find(cloned_delta.vertex_edge.edge.ptr->gid); + cloned_edge_it != cloned_edges.end()) { + cloned_delta.vertex_edge.edge = EdgeRef{&cloned_edge_it->second}; + } + } + break; + } + case Delta::Action::DELETE_OBJECT: + case Delta::Action::RECREATE_OBJECT: + case Delta::Action::SET_PROPERTY: + case Delta::Action::ADD_LABEL: + case Delta::Action::REMOVE_LABEL: { + // noop + break; + } + } +} + +void Splitter::AdjustDeltaNextAndPrev(const Delta &original, Delta &cloned, + std::map<uint64_t, std::unique_ptr<Transaction>> &cloned_transactions) { + // Get the transaction of cloned_delta->next, using the original transaction of delta->next + // (the cloned_transactions key is the start_timestamp) + auto cloned_transaction_it = + cloned_transactions.find(original.next->commit_info->start_or_commit_timestamp.logical_id); + if (cloned_transaction_it == cloned_transactions.end()) { + cloned_transaction_it = std::ranges::find_if(cloned_transactions, [&original](const auto &elem) { + return elem.second->commit_info->start_or_commit_timestamp == + original.next->commit_info->start_or_commit_timestamp; + }); + } + // TODO(jbajic) What if next in delta chain does not belong to cloned transaction?
+ // MG_ASSERT(cloned_transaction_it != cloned_transactions.end(), "Cloned transaction not found"); + if (cloned_transaction_it == cloned_transactions.end()) return; + // Find cloned delta in delta list of cloned transaction + auto found_cloned_delta_it = std::ranges::find_if( + cloned_transaction_it->second->deltas, [&original](const auto &elem) { return elem.id == original.next->id; }); + MG_ASSERT(found_cloned_delta_it != cloned_transaction_it->second->deltas.end(), "Delta with given uuid must exist!"); + cloned.next = &*found_cloned_delta_it; + found_cloned_delta_it->prev.Set(&cloned); +} + +void Splitter::AdjustDeltaPrevPtr(const Delta &original, Delta &cloned, + std::map<uint64_t, std::unique_ptr<Transaction>> &cloned_transactions, + EdgeContainer &cloned_edges) { + auto ptr = original.prev.Get(); + switch (ptr.type) { + case PreviousPtr::Type::NULLPTR: { + MG_ASSERT(false, "PreviousPtr cannot be a nullptr!"); + break; + } + case PreviousPtr::Type::DELTA: { + // Same as for deltas except don't align next but prev + auto cloned_transaction_it = std::ranges::find_if(cloned_transactions, [&ptr](const auto &elem) { + return elem.second->start_timestamp == ptr.delta->commit_info->start_or_commit_timestamp || + elem.second->commit_info->start_or_commit_timestamp == ptr.delta->commit_info->start_or_commit_timestamp; + }); + MG_ASSERT(cloned_transaction_it != cloned_transactions.end(), "Cloned transaction not found"); + // Find cloned delta in delta list of cloned transaction + auto found_cloned_delta_it = + std::ranges::find_if(cloned_transaction_it->second->deltas, + [delta = ptr.delta](const auto &elem) { return elem.id == delta->id; }); + MG_ASSERT(found_cloned_delta_it != cloned_transaction_it->second->deltas.end(), + "Delta with given id must exist!"); + + cloned.prev.Set(&*found_cloned_delta_it); + break; + } + case PreviousPtr::Type::VERTEX: { + // The vertex was extracted and it is safe to reuse address + cloned.prev.Set(ptr.vertex); + ptr.vertex->second.delta = &cloned; + break; + } + case PreviousPtr::Type::EDGE: { + // We can never be here if we have properties on edge disabled + auto *cloned_edge = &*cloned_edges.find(ptr.edge->gid); + ptr.edge->delta = &cloned; + cloned.prev.Set(&cloned_edge->second); + break; + } + }; +} + +} // namespace memgraph::storage::v3 diff --git a/src/storage/v3/splitter.hpp b/src/storage/v3/splitter.hpp new file mode 100644 index 000000000..8cd0f7ae4 --- /dev/null +++ b/src/storage/v3/splitter.hpp @@ -0,0 +1,109 @@ +// Copyright 2023 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
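The pointer surgery in `AdjustClonedTransaction`/`AdjustDeltaNextAndPrev` above is easier to follow on a toy model: clone the deltas first (ids survive, addresses do not), then rebuild the `next`/`prev` links by looking counterparts up via the stable delta id, which is exactly why the diff introduces `Delta::id`. A simplified standalone sketch (not the Memgraph types):

```cpp
// Standalone sketch of the clone-then-relink idea; Delta here is a toy type,
// not the storage/v3 struct.
#include <cassert>
#include <cstdint>
#include <list>
#include <unordered_map>

struct Delta {
  uint64_t id;
  Delta *next{nullptr};
  Delta *prev{nullptr};
};

int main() {
  // Original chain: 1 -> 2 -> 3
  std::list<Delta> original{{1}, {2}, {3}};
  auto link = [](std::list<Delta> &chain) {
    Delta *prev = nullptr;
    for (auto &d : chain) {
      d.prev = prev;
      if (prev != nullptr) prev->next = &d;
      prev = &d;
    }
  };
  link(original);

  // Clone step: copy the deltas (ids survive, addresses do not).
  // std::list keeps element addresses stable across push_back.
  std::list<Delta> cloned;
  std::unordered_map<uint64_t, Delta *> by_id;
  for (const auto &d : original) {
    cloned.push_back(Delta{d.id});
    by_id.emplace(d.id, &cloned.back());
  }

  // Adjust step: rebuild next/prev inside the clone via id lookup.
  for (const auto &d : original) {
    Delta *c = by_id.at(d.id);
    c->next = d.next ? by_id.at(d.next->id) : nullptr;
    c->prev = d.prev ? by_id.at(d.prev->id) : nullptr;
  }

  assert(by_id.at(1)->next == by_id.at(2) && by_id.at(2)->next == by_id.at(3));
  assert(by_id.at(3)->prev == by_id.at(2));
}
```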
+#pragma once + +#include <cstdint> +#include <map> +#include <memory> +#include <optional> +#include <set> + +#include "storage/v3/config.hpp" +#include "storage/v3/delta.hpp" +#include "storage/v3/edge.hpp" +#include "storage/v3/id_types.hpp" +#include "storage/v3/indices.hpp" +#include "storage/v3/key_store.hpp" +#include "storage/v3/name_id_mapper.hpp" +#include "storage/v3/schemas.hpp" +#include "storage/v3/transaction.hpp" +#include "storage/v3/vertex.hpp" +#include "utils/concepts.hpp" + +namespace memgraph::storage::v3 { + +// If edge properties-on-edges is false then we don't need to send edges but +// only vertices, since they will contain those edges +struct SplitData { + LabelId primary_label; + PrimaryKey min_primary_key; + std::optional<PrimaryKey> max_primary_key; + std::vector<SchemaProperty> schema; + Config config; + std::unordered_map<uint64_t, std::string> id_to_name; + uint64_t shard_version; + + VertexContainer vertices; + std::optional<EdgeContainer> edges; + std::map<uint64_t, std::unique_ptr<Transaction>> transactions; + std::map<LabelId, LabelIndex::IndexContainer> label_indices; + std::map<std::pair<LabelId, PropertyId>, LabelPropertyIndex::IndexContainer> label_property_indices; +}; + +// TODO(jbajic) Handle deleted_vertices_ and deleted_edges_ after the finishing GC +class Splitter final { + public: + Splitter(LabelId primary_label, VertexContainer &vertices, EdgeContainer &edges, + std::map<uint64_t, std::unique_ptr<Transaction>> &start_logical_id_to_transaction, Indices &indices, + const Config &config, const std::vector<SchemaProperty> &schema, const NameIdMapper &name_id_mapper_); + + Splitter(const Splitter &) = delete; + Splitter(Splitter &&) noexcept = delete; + Splitter &operator=(const Splitter &) = delete; + Splitter operator=(Splitter &&) noexcept = delete; + ~Splitter() = default; + + SplitData SplitShard(const PrimaryKey &split_key, const std::optional<PrimaryKey> &max_primary_key, + uint64_t shard_version); + + private: + VertexContainer CollectVertices(SplitData &data, std::set<uint64_t> &collected_transactions_start_id, + const PrimaryKey &split_key); + + std::optional<EdgeContainer> CollectEdges(std::set<uint64_t> &collected_transactions_start_id, + const VertexContainer &split_vertices, const PrimaryKey &split_key); + + std::map<uint64_t, std::unique_ptr<Transaction>> CollectTransactions( + const std::set<uint64_t> &collected_transactions_start_id, EdgeContainer &cloned_edges, + const PrimaryKey &split_key); + + static void ScanDeltas(std::set<uint64_t> &collected_transactions_start_id, const Delta *delta); + + void PruneOriginalDeltas(Transaction &transaction, std::map<uint64_t, std::unique_ptr<Transaction>> &transactions, + const PrimaryKey &split_key); + + void AdjustClonedTransaction(Transaction &cloned_transaction, const Transaction &transaction, + std::map<uint64_t, std::unique_ptr<Transaction>> &cloned_transactions, + EdgeContainer &cloned_edges); + + void AdjustClonedTransactions(std::map<uint64_t, std::unique_ptr<Transaction>> &cloned_transactions, + EdgeContainer &cloned_edges, const PrimaryKey &split_key); + + void AdjustEdgeRef(Delta &cloned_delta, EdgeContainer &cloned_edges) const; + + static void AdjustDeltaNextAndPrev(const Delta &original, Delta &cloned, + std::map<uint64_t, std::unique_ptr<Transaction>> &cloned_transactions); + + static void AdjustDeltaPrevPtr(const Delta &original, Delta &cloned, + std::map<uint64_t, std::unique_ptr<Transaction>> &cloned_transactions, + EdgeContainer &cloned_edges); + + const LabelId 
primary_label_; + VertexContainer &vertices_; + EdgeContainer &edges_; + std::map<uint64_t, std::unique_ptr<Transaction>> &start_logical_id_to_transaction_; + Indices &indices_; + const Config &config_; + const std::vector<SchemaProperty> schema_; + const NameIdMapper &name_id_mapper_; +}; + +} // namespace memgraph::storage::v3 diff --git a/src/storage/v3/transaction.hpp b/src/storage/v3/transaction.hpp index 229e071b7..0428e0c30 100644 --- a/src/storage/v3/transaction.hpp +++ b/src/storage/v3/transaction.hpp @@ -1,4 +1,4 @@ -// Copyright 2022 Memgraph Ltd. +// Copyright 2023 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source @@ -31,6 +31,15 @@ struct CommitInfo { }; struct Transaction { + Transaction(coordinator::Hlc start_timestamp, CommitInfo new_commit_info, uint64_t command_id, bool must_abort, + bool is_aborted, IsolationLevel isolation_level) + : start_timestamp{start_timestamp}, + commit_info{std::make_unique<CommitInfo>(new_commit_info)}, + command_id(command_id), + must_abort(must_abort), + is_aborted(is_aborted), + isolation_level(isolation_level){}; + Transaction(coordinator::Hlc start_timestamp, IsolationLevel isolation_level) : start_timestamp(start_timestamp), commit_info(std::make_unique<CommitInfo>(CommitInfo{false, {start_timestamp}})), @@ -54,6 +63,56 @@ struct Transaction { ~Transaction() {} + std::list<Delta> CopyDeltas(CommitInfo *commit_info) const { + std::list<Delta> copied_deltas; + for (const auto &delta : deltas) { + switch (delta.action) { + case Delta::Action::DELETE_OBJECT: + copied_deltas.emplace_back(Delta::DeleteObjectTag{}, commit_info, delta.id, command_id); + break; + case Delta::Action::RECREATE_OBJECT: + copied_deltas.emplace_back(Delta::RecreateObjectTag{}, commit_info, delta.id, command_id); + break; + case Delta::Action::ADD_LABEL: + copied_deltas.emplace_back(Delta::AddLabelTag{}, delta.label, commit_info, delta.id, command_id); + break; + case Delta::Action::REMOVE_LABEL: + copied_deltas.emplace_back(Delta::RemoveLabelTag{}, delta.label, commit_info, delta.id, command_id); + break; + case Delta::Action::ADD_IN_EDGE: + copied_deltas.emplace_back(Delta::AddInEdgeTag{}, delta.vertex_edge.edge_type, delta.vertex_edge.vertex_id, + delta.vertex_edge.edge, commit_info, delta.id, command_id); + break; + case Delta::Action::ADD_OUT_EDGE: + copied_deltas.emplace_back(Delta::AddOutEdgeTag{}, delta.vertex_edge.edge_type, delta.vertex_edge.vertex_id, + delta.vertex_edge.edge, commit_info, delta.id, command_id); + break; + case Delta::Action::REMOVE_IN_EDGE: + copied_deltas.emplace_back(Delta::RemoveInEdgeTag{}, delta.vertex_edge.edge_type, delta.vertex_edge.vertex_id, + delta.vertex_edge.edge, commit_info, delta.id, command_id); + break; + case Delta::Action::REMOVE_OUT_EDGE: + copied_deltas.emplace_back(Delta::RemoveOutEdgeTag{}, delta.vertex_edge.edge_type, + delta.vertex_edge.vertex_id, delta.vertex_edge.edge, commit_info, delta.id, + command_id); + break; + case Delta::Action::SET_PROPERTY: + copied_deltas.emplace_back(Delta::SetPropertyTag{}, delta.property.key, delta.property.value, commit_info, + delta.id, command_id); + break; + } + } + return copied_deltas; + } + + // This does not solve the whole problem of copying deltas + std::unique_ptr<Transaction> Clone() const { + auto transaction_ptr = std::make_unique<Transaction>(start_timestamp, *commit_info, command_id, must_abort, + is_aborted, 
isolation_level); + transaction_ptr->deltas = CopyDeltas(transaction_ptr->commit_info.get()); + return transaction_ptr; + } + coordinator::Hlc start_timestamp; std::unique_ptr<CommitInfo> commit_info; uint64_t command_id; diff --git a/tests/benchmark/CMakeLists.txt b/tests/benchmark/CMakeLists.txt index c7cededd8..954809ffc 100644 --- a/tests/benchmark/CMakeLists.txt +++ b/tests/benchmark/CMakeLists.txt @@ -79,3 +79,12 @@ target_link_libraries(${test_prefix}data_structures_contains mg-utils mg-storage add_benchmark(data_structures_remove.cpp) target_link_libraries(${test_prefix}data_structures_remove mg-utils mg-storage-v3) + +add_benchmark(storage_v3_split.cpp) +target_link_libraries(${test_prefix}storage_v3_split mg-storage-v3 mg-query-v2) + +add_benchmark(storage_v3_split_1.cpp) +target_link_libraries(${test_prefix}storage_v3_split_1 mg-storage-v3 mg-query-v2) + +add_benchmark(storage_v3_split_2.cpp) +target_link_libraries(${test_prefix}storage_v3_split_2 mg-storage-v3 mg-query-v2) diff --git a/tests/benchmark/storage_v3_split.cpp b/tests/benchmark/storage_v3_split.cpp new file mode 100644 index 000000000..de8735f8e --- /dev/null +++ b/tests/benchmark/storage_v3_split.cpp @@ -0,0 +1,249 @@ +// Copyright 2023 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +#include <cstdint> +#include <optional> +#include <vector> + +#include <benchmark/benchmark.h> +#include <gflags/gflags.h> + +#include "storage/v3/id_types.hpp" +#include "storage/v3/key_store.hpp" +#include "storage/v3/property_value.hpp" +#include "storage/v3/shard.hpp" +#include "storage/v3/vertex.hpp" +#include "storage/v3/vertex_id.hpp" + +namespace memgraph::benchmark { + +class ShardSplitBenchmark : public ::benchmark::Fixture { + protected: + using PrimaryKey = storage::v3::PrimaryKey; + using PropertyId = storage::v3::PropertyId; + using PropertyValue = storage::v3::PropertyValue; + using LabelId = storage::v3::LabelId; + using EdgeTypeId = storage::v3::EdgeTypeId; + using Shard = storage::v3::Shard; + using VertexId = storage::v3::VertexId; + using Gid = storage::v3::Gid; + + void SetUp(const ::benchmark::State &state) override { + storage.emplace(primary_label, min_pk, std::nullopt, schema_property_vector); + storage->StoreMapping( + {{1, "label"}, {2, "property"}, {3, "edge_property"}, {4, "secondary_label"}, {5, "secondary_prop"}}); + } + + void TearDown(const ::benchmark::State &) override { storage = std::nullopt; } + + const PropertyId primary_property{PropertyId::FromUint(2)}; + const PropertyId secondary_property{PropertyId::FromUint(5)}; + std::vector<storage::v3::SchemaProperty> schema_property_vector = { + storage::v3::SchemaProperty{primary_property, common::SchemaType::INT}}; + const std::vector<PropertyValue> min_pk{PropertyValue{0}}; + const LabelId primary_label{LabelId::FromUint(1)}; + const LabelId secondary_label{LabelId::FromUint(4)}; + const EdgeTypeId edge_type_id{EdgeTypeId::FromUint(3)}; + std::optional<Shard> storage; + + coordinator::Hlc last_hlc{0, io::Time{}}; + + coordinator::Hlc GetNextHlc() { + ++last_hlc.logical_id; 
+ last_hlc.coordinator_wall_clock += std::chrono::seconds(1); + return last_hlc; + } +}; + +BENCHMARK_DEFINE_F(ShardSplitBenchmark, BigDataSplit)(::benchmark::State &state) { + const auto number_of_vertices{state.range(0)}; + std::random_device r; + std::default_random_engine e1(r()); + std::uniform_int_distribution<int> uniform_dist(0, number_of_vertices); + + for (int64_t i{0}; i < number_of_vertices; ++i) { + auto acc = storage->Access(GetNextHlc()); + MG_ASSERT(acc.CreateVertexAndValidate({secondary_label}, PrimaryKey{PropertyValue(i)}, + {{secondary_property, PropertyValue(i)}}) + .HasValue(), + "Failed creating with pk {}", i); + if (i > 1) { + const auto vtx1 = uniform_dist(e1) % i; + const auto vtx2 = uniform_dist(e1) % i; + + MG_ASSERT(acc.CreateEdge(VertexId{primary_label, {PropertyValue(vtx1)}}, + VertexId{primary_label, {PropertyValue(vtx2)}}, edge_type_id, Gid::FromUint(i)) + .HasValue(), + "Failed on {} and {}", vtx1, vtx2); + } + acc.Commit(GetNextHlc()); + } + for (auto _ : state) { + auto data = storage->PerformSplit(PrimaryKey{PropertyValue{number_of_vertices / 2}}, 2); + } +} + +BENCHMARK_DEFINE_F(ShardSplitBenchmark, BigDataSplitWithGc)(::benchmark::State &state) { + const auto number_of_vertices{state.range(0)}; + std::random_device r; + std::default_random_engine e1(r()); + std::uniform_int_distribution<int> uniform_dist(0, number_of_vertices); + + for (int64_t i{0}; i < number_of_vertices; ++i) { + auto acc = storage->Access(GetNextHlc()); + MG_ASSERT(acc.CreateVertexAndValidate({secondary_label}, PrimaryKey{PropertyValue(i)}, + {{secondary_property, PropertyValue(i)}}) + .HasValue(), + "Failed creating with pk {}", i); + if (i > 1) { + const auto vtx1 = uniform_dist(e1) % i; + const auto vtx2 = uniform_dist(e1) % i; + + MG_ASSERT(acc.CreateEdge(VertexId{primary_label, {PropertyValue(vtx1)}}, + VertexId{primary_label, {PropertyValue(vtx2)}}, edge_type_id, Gid::FromUint(i)) + .HasValue(), + "Failed on {} and {}", vtx1, vtx2); + } + acc.Commit(GetNextHlc()); + } + storage->CollectGarbage(GetNextHlc().coordinator_wall_clock); + for (auto _ : state) { + auto data = storage->PerformSplit(PrimaryKey{PropertyValue{number_of_vertices / 2}}, 2); + } +} + +BENCHMARK_DEFINE_F(ShardSplitBenchmark, BigDataSplitWithFewTransactionsOnVertices)(::benchmark::State &state) { + const auto number_of_vertices = state.range(0); + const auto number_of_edges = state.range(1); + const auto number_of_transactions = state.range(2); + std::random_device r; + std::default_random_engine e1(r()); + std::uniform_int_distribution<int> uniform_dist(0, number_of_vertices - number_of_transactions - 1); + + // Create Vertices + int64_t vertex_count{0}; + { + auto acc = storage->Access(GetNextHlc()); + for (; vertex_count < number_of_vertices - number_of_transactions; ++vertex_count) { + MG_ASSERT(acc.CreateVertexAndValidate({secondary_label}, PrimaryKey{PropertyValue(vertex_count)}, + {{secondary_property, PropertyValue(vertex_count)}}) + .HasValue(), + "Failed creating with pk {}", vertex_count); + } + + // Create Edges + for (int64_t i{0}; i < number_of_edges; ++i) { + const auto vtx1 = uniform_dist(e1); + const auto vtx2 = uniform_dist(e1); + + MG_ASSERT(acc.CreateEdge(VertexId{primary_label, {PropertyValue(vtx1)}}, + VertexId{primary_label, {PropertyValue(vtx2)}}, edge_type_id, Gid::FromUint(i)) + .HasValue(), + "Failed on {} and {}", vtx1, vtx2); + } + acc.Commit(GetNextHlc()); + } + // Clean up transactional data + storage->CollectGarbage(GetNextHlc().coordinator_wall_clock); + + // Create rest of 
the objects, leaving their transactions in place
+  for (; vertex_count < number_of_vertices; ++vertex_count) {
+    auto acc = storage->Access(GetNextHlc());
+    MG_ASSERT(acc.CreateVertexAndValidate({secondary_label}, PrimaryKey{PropertyValue(vertex_count)},
+                                          {{secondary_property, PropertyValue(vertex_count)}})
+                  .HasValue(),
+              "Failed creating with pk {}", vertex_count);
+    acc.Commit(GetNextHlc());
+  }
+
+  for (auto _ : state) {
+    // Don't create a shard here, since shard deallocation can take some time as well
+    auto data = storage->PerformSplit(PrimaryKey{PropertyValue{number_of_vertices / 2}}, 2);
+  }
+}
+
+// Range:
+// Number of vertices
+// This run is pessimistic: the number of vertices corresponds to the number of transactions
+// BENCHMARK_REGISTER_F(ShardSplitBenchmark, BigDataSplit)
+//     ->RangeMultiplier(10)
+//     ->Range(100'000, 100'000)
+//     ->Unit(::benchmark::kMillisecond);
+
+// Range:
+// Number of vertices
+// This run is optimistic: there are no transactions in this run
+// BENCHMARK_REGISTER_F(ShardSplitBenchmark, BigDataSplitWithGc)
+//     ->RangeMultiplier(10)
+//     ->Range(100'000, 1'000'000)
+//     ->Unit(::benchmark::kMillisecond);
+
+// Args:
+// Number of vertices
+// Number of edges
+// Number of transactions
+BENCHMARK_REGISTER_F(ShardSplitBenchmark, BigDataSplitWithFewTransactionsOnVertices)
+    ->Args({100'000, 100'000, 100})
+    ->Args({200'000, 100'000, 100})
+    ->Args({300'000, 100'000, 100})
+    ->Args({400'000, 100'000, 100})
+    ->Args({500'000, 100'000, 100})
+    ->Args({600'000, 100'000, 100})
+    ->Args({700'000, 100'000, 100})
+    ->Args({800'000, 100'000, 100})
+    ->Args({900'000, 100'000, 100})
+    ->Args({1'000'000, 100'000, 100})
+    ->Args({2'000'000, 100'000, 100})
+    ->Args({3'000'000, 100'000, 100})
+    ->Args({4'000'000, 100'000, 100})
+    ->Args({5'000'000, 100'000, 100})
+    ->Args({6'000'000, 100'000, 100})
+    ->Args({7'000'000, 100'000, 100})
+    ->Args({8'000'000, 100'000, 100})
+    ->Args({9'000'000, 100'000, 100})
+    ->Args({10'000'000, 100'000, 100})
+    ->Unit(::benchmark::kMillisecond)
+    ->Name("IncreaseVertices");
+
+BENCHMARK_REGISTER_F(ShardSplitBenchmark, BigDataSplitWithFewTransactionsOnVertices)
+    ->Args({100'000, 100'000, 100})
+    ->Args({100'000, 200'000, 100})
+    ->Args({100'000, 300'000, 100})
+    ->Args({100'000, 400'000, 100})
+    ->Args({100'000, 500'000, 100})
+    ->Args({100'000, 600'000, 100})
+    ->Args({100'000, 700'000, 100})
+    ->Args({100'000, 800'000, 100})
+    ->Args({100'000, 900'000, 100})
+    ->Args({100'000, 1'000'000, 100})
+    ->Args({100'000, 2'000'000, 100})
+    ->Args({100'000, 3'000'000, 100})
+    ->Args({100'000, 4'000'000, 100})
+    ->Args({100'000, 5'000'000, 100})
+    ->Args({100'000, 6'000'000, 100})
+    ->Args({100'000, 7'000'000, 100})
+    ->Args({100'000, 8'000'000, 100})
+    ->Args({100'000, 9'000'000, 100})
+    ->Args({100'000, 10'000'000, 100})
+    ->Unit(::benchmark::kMillisecond)
+    ->Name("IncreaseEdges");
+
+BENCHMARK_REGISTER_F(ShardSplitBenchmark, BigDataSplitWithFewTransactionsOnVertices)
+    ->Args({100'000, 100'000, 100})
+    ->Args({100'000, 100'000, 1'000})
+    ->Args({100'000, 100'000, 10'000})
+    ->Args({100'000, 100'000, 100'000})
+    ->Unit(::benchmark::kMillisecond)
+    ->Name("IncreaseTransactions");
+
+}  // namespace memgraph::benchmark
+
+BENCHMARK_MAIN();
diff --git a/tests/benchmark/storage_v3_split_1.cpp b/tests/benchmark/storage_v3_split_1.cpp
new file mode 100644
index 000000000..ebfe97c05
--- /dev/null
+++ b/tests/benchmark/storage_v3_split_1.cpp
@@ -0,0 +1,270 @@
+// Copyright 2023 Memgraph Ltd.
+// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +#include <cstdint> +#include <optional> +#include <vector> + +#include <benchmark/benchmark.h> +#include <gflags/gflags.h> + +#include "storage/v3/id_types.hpp" +#include "storage/v3/key_store.hpp" +#include "storage/v3/property_value.hpp" +#include "storage/v3/shard.hpp" +#include "storage/v3/vertex.hpp" +#include "storage/v3/vertex_id.hpp" + +namespace memgraph::benchmark { + +class ShardSplitBenchmark : public ::benchmark::Fixture { + protected: + using PrimaryKey = storage::v3::PrimaryKey; + using PropertyId = storage::v3::PropertyId; + using PropertyValue = storage::v3::PropertyValue; + using LabelId = storage::v3::LabelId; + using EdgeTypeId = storage::v3::EdgeTypeId; + using Shard = storage::v3::Shard; + using VertexId = storage::v3::VertexId; + using Gid = storage::v3::Gid; + + void SetUp(const ::benchmark::State &state) override { + storage.emplace(primary_label, min_pk, std::nullopt, schema_property_vector); + storage->StoreMapping( + {{1, "label"}, {2, "property"}, {3, "edge_property"}, {4, "secondary_label"}, {5, "secondary_prop"}}); + } + + void TearDown(const ::benchmark::State &) override { storage = std::nullopt; } + + const PropertyId primary_property{PropertyId::FromUint(2)}; + const PropertyId secondary_property{PropertyId::FromUint(5)}; + std::vector<storage::v3::SchemaProperty> schema_property_vector = { + storage::v3::SchemaProperty{primary_property, common::SchemaType::INT}}; + const std::vector<PropertyValue> min_pk{PropertyValue{0}}; + const LabelId primary_label{LabelId::FromUint(1)}; + const LabelId secondary_label{LabelId::FromUint(4)}; + const EdgeTypeId edge_type_id{EdgeTypeId::FromUint(3)}; + std::optional<Shard> storage; + + coordinator::Hlc last_hlc{0, io::Time{}}; + + coordinator::Hlc GetNextHlc() { + ++last_hlc.logical_id; + last_hlc.coordinator_wall_clock += std::chrono::seconds(1); + return last_hlc; + } +}; + +BENCHMARK_DEFINE_F(ShardSplitBenchmark, BigDataSplit)(::benchmark::State &state) { + const auto number_of_vertices{state.range(0)}; + std::random_device r; + std::default_random_engine e1(r()); + std::uniform_int_distribution<int> uniform_dist(0, number_of_vertices); + + for (int64_t i{0}; i < number_of_vertices; ++i) { + auto acc = storage->Access(GetNextHlc()); + MG_ASSERT(acc.CreateVertexAndValidate({secondary_label}, PrimaryKey{PropertyValue(i)}, + {{secondary_property, PropertyValue(i)}}) + .HasValue(), + "Failed creating with pk {}", i); + if (i > 1) { + const auto vtx1 = uniform_dist(e1) % i; + const auto vtx2 = uniform_dist(e1) % i; + + MG_ASSERT(acc.CreateEdge(VertexId{primary_label, {PropertyValue(vtx1)}}, + VertexId{primary_label, {PropertyValue(vtx2)}}, edge_type_id, Gid::FromUint(i)) + .HasValue(), + "Failed on {} and {}", vtx1, vtx2); + } + acc.Commit(GetNextHlc()); + } + for (auto _ : state) { + auto data = storage->PerformSplit(PrimaryKey{PropertyValue{number_of_vertices / 2}}, 2); + } +} + +BENCHMARK_DEFINE_F(ShardSplitBenchmark, BigDataSplitWithGc)(::benchmark::State &state) { + const auto 
number_of_vertices{state.range(0)};
+  std::random_device r;
+  std::default_random_engine e1(r());
+  std::uniform_int_distribution<int> uniform_dist(0, number_of_vertices);
+
+  for (int64_t i{0}; i < number_of_vertices; ++i) {
+    auto acc = storage->Access(GetNextHlc());
+    MG_ASSERT(acc.CreateVertexAndValidate({secondary_label}, PrimaryKey{PropertyValue(i)},
+                                          {{secondary_property, PropertyValue(i)}})
+                  .HasValue(),
+              "Failed creating with pk {}", i);
+    if (i > 1) {
+      const auto vtx1 = uniform_dist(e1) % i;
+      const auto vtx2 = uniform_dist(e1) % i;
+
+      MG_ASSERT(acc.CreateEdge(VertexId{primary_label, {PropertyValue(vtx1)}},
+                               VertexId{primary_label, {PropertyValue(vtx2)}}, edge_type_id, Gid::FromUint(i))
+                    .HasValue(),
+                "Failed on {} and {}", vtx1, vtx2);
+    }
+    acc.Commit(GetNextHlc());
+  }
+  storage->CollectGarbage(GetNextHlc().coordinator_wall_clock);
+  for (auto _ : state) {
+    auto data = storage->PerformSplit(PrimaryKey{PropertyValue{number_of_vertices / 2}}, 2);
+  }
+}
+
+BENCHMARK_DEFINE_F(ShardSplitBenchmark, BigDataSplitWithFewTransactionsOnVertices)(::benchmark::State &state) {
+  const auto number_of_vertices = state.range(0);
+  const auto number_of_edges = state.range(1);
+  const auto number_of_transactions = state.range(2);
+  std::random_device r;
+  std::default_random_engine e1(r());
+  std::uniform_int_distribution<int> uniform_dist(0, number_of_vertices - number_of_transactions - 1);
+
+  // Create Vertices
+  int64_t vertex_count{0};
+  {
+    auto acc = storage->Access(GetNextHlc());
+    for (; vertex_count < number_of_vertices - number_of_transactions; ++vertex_count) {
+      MG_ASSERT(acc.CreateVertexAndValidate({secondary_label}, PrimaryKey{PropertyValue(vertex_count)},
+                                            {{secondary_property, PropertyValue(vertex_count)}})
+                    .HasValue(),
+                "Failed creating with pk {}", vertex_count);
+    }
+
+    // Create Edges
+    for (int64_t i{0}; i < number_of_edges; ++i) {
+      const auto vtx1 = uniform_dist(e1);
+      const auto vtx2 = uniform_dist(e1);
+
+      MG_ASSERT(acc.CreateEdge(VertexId{primary_label, {PropertyValue(vtx1)}},
+                               VertexId{primary_label, {PropertyValue(vtx2)}}, edge_type_id, Gid::FromUint(i))
+                    .HasValue(),
+                "Failed on {} and {}", vtx1, vtx2);
+    }
+    acc.Commit(GetNextHlc());
+  }
+  // Clean up transactional data
+  storage->CollectGarbage(GetNextHlc().coordinator_wall_clock);
+
+  // Create the rest of the objects, leaving their transactions in place
+  for (; vertex_count < number_of_vertices; ++vertex_count) {
+    auto acc = storage->Access(GetNextHlc());
+    MG_ASSERT(acc.CreateVertexAndValidate({secondary_label}, PrimaryKey{PropertyValue(vertex_count)},
+                                          {{secondary_property, PropertyValue(vertex_count)}})
+                  .HasValue(),
+              "Failed creating with pk {}", vertex_count);
+    acc.Commit(GetNextHlc());
+  }
+
+  for (auto _ : state) {
+    // Don't create a shard here, since shard deallocation can take some time as well
+    auto data = storage->PerformSplit(PrimaryKey{PropertyValue{number_of_vertices / 2}}, 2);
+  }
+}
+
+// Range:
+// Number of vertices
+// This run is pessimistic: the number of vertices corresponds to the number of transactions
+// BENCHMARK_REGISTER_F(ShardSplitBenchmark, BigDataSplit)
+//     ->RangeMultiplier(10)
+//     ->Range(100'000, 100'000)
+//     ->Unit(::benchmark::kMillisecond);
+
+// Range:
+// Number of vertices
+// This run is optimistic: there are no transactions in this run
+// BENCHMARK_REGISTER_F(ShardSplitBenchmark, BigDataSplitWithGc)
+//     ->RangeMultiplier(10)
+//     ->Range(100'000, 1'000'000)
+//     ->Unit(::benchmark::kMillisecond);
+
+// Args:
+// Number of vertices
+// Number of edges
+// Number of transactions
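+// Note: unlike storage_v3_split.cpp, all but one Args entry below is kept
+// commented out, so each storage_v3_split_1 run measures a single configuration.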
+BENCHMARK_REGISTER_F(ShardSplitBenchmark, BigDataSplitWithFewTransactionsOnVertices) + // ->Args({100'000, 100'000, 100}) + // ->Args({200'000, 100'000, 100}) + // ->Args({300'000, 100'000, 100}) + // ->Args({400'000, 100'000, 100}) + // ->Args({500'000, 100'000, 100}) + // ->Args({600'000, 100'000, 100}) + // ->Args({700'000, 100'000, 100}) + // ->Args({800'000, 100'000, 100}) + ->Args({900'000, 100'000, 100}) + // ->Args({1'000'000, 100'000, 100}) + // ->Args({2'000'000, 100'000, 100}) + // ->Args({3'000'000, 100'000, 100}) + // ->Args({4'000'000, 100'000, 100}) + // ->Args({6'000'000, 100'000, 100}) + // ->Args({7'000'000, 100'000, 100}) + // ->Args({8'000'000, 100'000, 100}) + // ->Args({9'000'000, 100'000, 100}) + // ->Args({10'000'000, 100'000, 100}) + ->Unit(::benchmark::kMillisecond) + ->Name("IncreaseVertices"); + +// BENCHMARK_REGISTER_F(ShardSplitBenchmark, BigDataSplitWithFewTransactionsOnVertices) +// ->Args({100'000, 100'000, 100}) +// ->Args({200'000, 100'000, 100}) +// ->Args({300'000, 100'000, 100}) +// ->Args({400'000, 100'000, 100}) +// ->Args({500'000, 100'000, 100}) +// ->Args({600'000, 100'000, 100}) +// ->Args({700'000, 100'000, 100}) +// ->Args({800'000, 100'000, 100}) +// ->Args({900'000, 100'000, 100}) +// ->Args({1'000'000, 100'000, 100}) +// ->Args({2'000'000, 100'000, 100}) +// ->Args({3'000'000, 100'000, 100}) +// ->Args({4'000'000, 100'000, 100}) +// ->Args({6'000'000, 100'000, 100}) +// ->Args({7'000'000, 100'000, 100}) +// ->Args({8'000'000, 100'000, 100}) +// ->Args({9'000'000, 100'000, 100}) +// ->Args({10'000'000, 100'000, 100}) +// ->Unit(::benchmark::kMillisecond) +// ->Name("IncreaseVertices"); + +// BENCHMARK_REGISTER_F(ShardSplitBenchmark, BigDataSplitWithFewTransactionsOnVertices) +// ->Args({100'000, 100'000, 100}) +// ->Args({100'000, 200'000, 100}) +// ->Args({100'000, 300'000, 100}) +// ->Args({100'000, 400'000, 100}) +// ->Args({100'000, 500'000, 100}) +// ->Args({100'000, 600'000, 100}) +// ->Args({100'000, 700'000, 100}) +// ->Args({100'000, 800'000, 100}) +// ->Args({100'000, 900'000, 100}) +// ->Args({100'000, 1'000'000, 100}) +// ->Args({100'000, 2'000'000, 100}) +// ->Args({100'000, 3'000'000, 100}) +// ->Args({100'000, 4'000'000, 100}) +// ->Args({100'000, 5'000'000, 100}) +// ->Args({100'000, 6'000'000, 100}) +// ->Args({100'000, 7'000'000, 100}) +// ->Args({100'000, 8'000'000, 100}) +// ->Args({100'000, 9'000'000, 100}) +// ->Args({100'000, 10'000'000, 100}) +// ->Unit(::benchmark::kMillisecond) +// ->Name("IncreaseEdges"); + +// BENCHMARK_REGISTER_F(ShardSplitBenchmark, BigDataSplitWithFewTransactionsOnVertices) +// ->Args({100'000, 100'000, 100}) +// ->Args({100'000, 100'000, 1'000}) +// ->Args({100'000, 100'000, 10'000}) +// ->Args({100'000, 100'000, 100'000}) +// ->Unit(::benchmark::kMillisecond) +// ->Name("IncreaseTransactions"); + +} // namespace memgraph::benchmark + +BENCHMARK_MAIN(); diff --git a/tests/benchmark/storage_v3_split_2.cpp b/tests/benchmark/storage_v3_split_2.cpp new file mode 100644 index 000000000..39c5c1ead --- /dev/null +++ b/tests/benchmark/storage_v3_split_2.cpp @@ -0,0 +1,270 @@ +// Copyright 2023 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. 
+// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +#include <cstdint> +#include <optional> +#include <vector> + +#include <benchmark/benchmark.h> +#include <gflags/gflags.h> + +#include "storage/v3/id_types.hpp" +#include "storage/v3/key_store.hpp" +#include "storage/v3/property_value.hpp" +#include "storage/v3/shard.hpp" +#include "storage/v3/vertex.hpp" +#include "storage/v3/vertex_id.hpp" + +namespace memgraph::benchmark { + +class ShardSplitBenchmark : public ::benchmark::Fixture { + protected: + using PrimaryKey = storage::v3::PrimaryKey; + using PropertyId = storage::v3::PropertyId; + using PropertyValue = storage::v3::PropertyValue; + using LabelId = storage::v3::LabelId; + using EdgeTypeId = storage::v3::EdgeTypeId; + using Shard = storage::v3::Shard; + using VertexId = storage::v3::VertexId; + using Gid = storage::v3::Gid; + + void SetUp(const ::benchmark::State &state) override { + storage.emplace(primary_label, min_pk, std::nullopt, schema_property_vector); + storage->StoreMapping( + {{1, "label"}, {2, "property"}, {3, "edge_property"}, {4, "secondary_label"}, {5, "secondary_prop"}}); + } + + void TearDown(const ::benchmark::State &) override { storage = std::nullopt; } + + const PropertyId primary_property{PropertyId::FromUint(2)}; + const PropertyId secondary_property{PropertyId::FromUint(5)}; + std::vector<storage::v3::SchemaProperty> schema_property_vector = { + storage::v3::SchemaProperty{primary_property, common::SchemaType::INT}}; + const std::vector<PropertyValue> min_pk{PropertyValue{0}}; + const LabelId primary_label{LabelId::FromUint(1)}; + const LabelId secondary_label{LabelId::FromUint(4)}; + const EdgeTypeId edge_type_id{EdgeTypeId::FromUint(3)}; + std::optional<Shard> storage; + + coordinator::Hlc last_hlc{0, io::Time{}}; + + coordinator::Hlc GetNextHlc() { + ++last_hlc.logical_id; + last_hlc.coordinator_wall_clock += std::chrono::seconds(1); + return last_hlc; + } +}; + +BENCHMARK_DEFINE_F(ShardSplitBenchmark, BigDataSplit)(::benchmark::State &state) { + const auto number_of_vertices{state.range(0)}; + std::random_device r; + std::default_random_engine e1(r()); + std::uniform_int_distribution<int> uniform_dist(0, number_of_vertices); + + for (int64_t i{0}; i < number_of_vertices; ++i) { + auto acc = storage->Access(GetNextHlc()); + MG_ASSERT(acc.CreateVertexAndValidate({secondary_label}, PrimaryKey{PropertyValue(i)}, + {{secondary_property, PropertyValue(i)}}) + .HasValue(), + "Failed creating with pk {}", i); + if (i > 1) { + const auto vtx1 = uniform_dist(e1) % i; + const auto vtx2 = uniform_dist(e1) % i; + + MG_ASSERT(acc.CreateEdge(VertexId{primary_label, {PropertyValue(vtx1)}}, + VertexId{primary_label, {PropertyValue(vtx2)}}, edge_type_id, Gid::FromUint(i)) + .HasValue(), + "Failed on {} and {}", vtx1, vtx2); + } + acc.Commit(GetNextHlc()); + } + for (auto _ : state) { + auto data = storage->PerformSplit(PrimaryKey{PropertyValue{number_of_vertices / 2}}, 2); + } +} + +BENCHMARK_DEFINE_F(ShardSplitBenchmark, BigDataSplitWithGc)(::benchmark::State &state) { + const auto number_of_vertices{state.range(0)}; + std::random_device r; + std::default_random_engine e1(r()); + std::uniform_int_distribution<int> uniform_dist(0, number_of_vertices); + + for (int64_t i{0}; i < number_of_vertices; ++i) { + auto acc = storage->Access(GetNextHlc()); + 
MG_ASSERT(acc.CreateVertexAndValidate({secondary_label}, PrimaryKey{PropertyValue(i)},
+                                          {{secondary_property, PropertyValue(i)}})
+                  .HasValue(),
+              "Failed creating with pk {}", i);
+    if (i > 1) {
+      const auto vtx1 = uniform_dist(e1) % i;
+      const auto vtx2 = uniform_dist(e1) % i;
+
+      MG_ASSERT(acc.CreateEdge(VertexId{primary_label, {PropertyValue(vtx1)}},
+                               VertexId{primary_label, {PropertyValue(vtx2)}}, edge_type_id, Gid::FromUint(i))
+                    .HasValue(),
+                "Failed on {} and {}", vtx1, vtx2);
+    }
+    acc.Commit(GetNextHlc());
+  }
+  storage->CollectGarbage(GetNextHlc().coordinator_wall_clock);
+  for (auto _ : state) {
+    auto data = storage->PerformSplit(PrimaryKey{PropertyValue{number_of_vertices / 2}}, 2);
+  }
+}
+
+BENCHMARK_DEFINE_F(ShardSplitBenchmark, BigDataSplitWithFewTransactionsOnVertices)(::benchmark::State &state) {
+  const auto number_of_vertices = state.range(0);
+  const auto number_of_edges = state.range(1);
+  const auto number_of_transactions = state.range(2);
+  std::random_device r;
+  std::default_random_engine e1(r());
+  std::uniform_int_distribution<int> uniform_dist(0, number_of_vertices - number_of_transactions - 1);
+
+  // Create Vertices
+  int64_t vertex_count{0};
+  {
+    auto acc = storage->Access(GetNextHlc());
+    for (; vertex_count < number_of_vertices - number_of_transactions; ++vertex_count) {
+      MG_ASSERT(acc.CreateVertexAndValidate({secondary_label}, PrimaryKey{PropertyValue(vertex_count)},
+                                            {{secondary_property, PropertyValue(vertex_count)}})
+                    .HasValue(),
+                "Failed creating with pk {}", vertex_count);
+    }
+
+    // Create Edges
+    for (int64_t i{0}; i < number_of_edges; ++i) {
+      const auto vtx1 = uniform_dist(e1);
+      const auto vtx2 = uniform_dist(e1);
+
+      MG_ASSERT(acc.CreateEdge(VertexId{primary_label, {PropertyValue(vtx1)}},
+                               VertexId{primary_label, {PropertyValue(vtx2)}}, edge_type_id, Gid::FromUint(i))
+                    .HasValue(),
+                "Failed on {} and {}", vtx1, vtx2);
+    }
+    acc.Commit(GetNextHlc());
+  }
+  // Clean up transactional data
+  storage->CollectGarbage(GetNextHlc().coordinator_wall_clock);
+
+  // Create the rest of the objects, leaving their transactions in place
+  for (; vertex_count < number_of_vertices; ++vertex_count) {
+    auto acc = storage->Access(GetNextHlc());
+    MG_ASSERT(acc.CreateVertexAndValidate({secondary_label}, PrimaryKey{PropertyValue(vertex_count)},
+                                          {{secondary_property, PropertyValue(vertex_count)}})
+                  .HasValue(),
+              "Failed creating with pk {}", vertex_count);
+    acc.Commit(GetNextHlc());
+  }
+
+  for (auto _ : state) {
+    // Don't create a shard here, since shard deallocation can take some time as well
+    auto data = storage->PerformSplit(PrimaryKey{PropertyValue{number_of_vertices / 2}}, 2);
+  }
+}
+
+// Range:
+// Number of vertices
+// This run is pessimistic: the number of vertices corresponds to the number of transactions
+// BENCHMARK_REGISTER_F(ShardSplitBenchmark, BigDataSplit)
+//     ->RangeMultiplier(10)
+//     ->Range(100'000, 100'000)
+//     ->Unit(::benchmark::kMillisecond);
+
+// Range:
+// Number of vertices
+// This run is optimistic: there are no transactions in this run
+// BENCHMARK_REGISTER_F(ShardSplitBenchmark, BigDataSplitWithGc)
+//     ->RangeMultiplier(10)
+//     ->Range(100'000, 1'000'000)
+//     ->Unit(::benchmark::kMillisecond);
+
+// Args:
+// Number of vertices
+// Number of edges
+// Number of transactions
+BENCHMARK_REGISTER_F(ShardSplitBenchmark, BigDataSplitWithFewTransactionsOnVertices)
+    // ->Args({100'000, 100'000, 100})
+    // ->Args({200'000, 100'000, 100})
+    // ->Args({300'000, 100'000, 100})
+    // ->Args({400'000, 100'000, 100})
+    // ->Args({500'000, 100'000, 100})
+    
// ->Args({600'000, 100'000, 100}) + // ->Args({700'000, 100'000, 100}) + // ->Args({800'000, 100'000, 100}) + // ->Args({900'000, 100'000, 100}) + ->Args({1'000'000, 100'000, 100}) + // ->Args({2'000'000, 100'000, 100}) + // ->Args({3'000'000, 100'000, 100}) + // ->Args({4'000'000, 100'000, 100}) + // ->Args({6'000'000, 100'000, 100}) + // ->Args({7'000'000, 100'000, 100}) + // ->Args({8'000'000, 100'000, 100}) + // ->Args({9'000'000, 100'000, 100}) + // ->Args({10'000'000, 100'000, 100}) + ->Unit(::benchmark::kMillisecond) + ->Name("IncreaseVertices"); + +// BENCHMARK_REGISTER_F(ShardSplitBenchmark, BigDataSplitWithFewTransactionsOnVertices) +// ->Args({100'000, 100'000, 100}) +// ->Args({200'000, 100'000, 100}) +// ->Args({300'000, 100'000, 100}) +// ->Args({400'000, 100'000, 100}) +// ->Args({500'000, 100'000, 100}) +// ->Args({600'000, 100'000, 100}) +// ->Args({700'000, 100'000, 100}) +// ->Args({800'000, 100'000, 100}) +// ->Args({900'000, 100'000, 100}) +// ->Args({1'000'000, 100'000, 100}) +// ->Args({2'000'000, 100'000, 100}) +// ->Args({3'000'000, 100'000, 100}) +// ->Args({4'000'000, 100'000, 100}) +// ->Args({6'000'000, 100'000, 100}) +// ->Args({7'000'000, 100'000, 100}) +// ->Args({8'000'000, 100'000, 100}) +// ->Args({9'000'000, 100'000, 100}) +// ->Args({10'000'000, 100'000, 100}) +// ->Unit(::benchmark::kMillisecond) +// ->Name("IncreaseVertices"); + +// BENCHMARK_REGISTER_F(ShardSplitBenchmark, BigDataSplitWithFewTransactionsOnVertices) +// ->Args({100'000, 100'000, 100}) +// ->Args({100'000, 200'000, 100}) +// ->Args({100'000, 300'000, 100}) +// ->Args({100'000, 400'000, 100}) +// ->Args({100'000, 500'000, 100}) +// ->Args({100'000, 600'000, 100}) +// ->Args({100'000, 700'000, 100}) +// ->Args({100'000, 800'000, 100}) +// ->Args({100'000, 900'000, 100}) +// ->Args({100'000, 1'000'000, 100}) +// ->Args({100'000, 2'000'000, 100}) +// ->Args({100'000, 3'000'000, 100}) +// ->Args({100'000, 4'000'000, 100}) +// ->Args({100'000, 5'000'000, 100}) +// ->Args({100'000, 6'000'000, 100}) +// ->Args({100'000, 7'000'000, 100}) +// ->Args({100'000, 8'000'000, 100}) +// ->Args({100'000, 9'000'000, 100}) +// ->Args({100'000, 10'000'000, 100}) +// ->Unit(::benchmark::kMillisecond) +// ->Name("IncreaseEdges"); + +// BENCHMARK_REGISTER_F(ShardSplitBenchmark, BigDataSplitWithFewTransactionsOnVertices) +// ->Args({100'000, 100'000, 100}) +// ->Args({100'000, 100'000, 1'000}) +// ->Args({100'000, 100'000, 10'000}) +// ->Args({100'000, 100'000, 100'000}) +// ->Unit(::benchmark::kMillisecond) +// ->Name("IncreaseTransactions"); + +} // namespace memgraph::benchmark + +BENCHMARK_MAIN(); diff --git a/tests/unit/CMakeLists.txt b/tests/unit/CMakeLists.txt index ae747385b..e59e5cb12 100644 --- a/tests/unit/CMakeLists.txt +++ b/tests/unit/CMakeLists.txt @@ -294,6 +294,9 @@ target_link_libraries(${test_prefix}storage_v3_expr mg-storage-v3 mg-expr) add_unit_test(storage_v3_schema.cpp) target_link_libraries(${test_prefix}storage_v3_schema mg-storage-v3) +add_unit_test(storage_v3_shard_split.cpp) +target_link_libraries(${test_prefix}storage_v3_shard_split mg-storage-v3 mg-query-v2) + # Test mg-query-v2 # These are commented out because of the new TypedValue in the query engine # add_unit_test(query_v2_interpreter.cpp ${CMAKE_SOURCE_DIR}/src/glue/v2/communication.cpp) diff --git a/tests/unit/storage_v3_shard_split.cpp b/tests/unit/storage_v3_shard_split.cpp new file mode 100644 index 000000000..c91a11004 --- /dev/null +++ b/tests/unit/storage_v3_shard_split.cpp @@ -0,0 +1,501 @@ +// Copyright 2023 
Memgraph Ltd.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
+// License, and you may not use this file except in compliance with the Business Source License.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+#include <cstdint>
+#include <memory>
+
+#include <gmock/gmock-matchers.h>
+#include <gmock/gmock.h>
+#include <gtest/gtest.h>
+
+#include "coordinator/hybrid_logical_clock.hpp"
+#include "query/v2/requests.hpp"
+#include "storage/v3/delta.hpp"
+#include "storage/v3/id_types.hpp"
+#include "storage/v3/key_store.hpp"
+#include "storage/v3/mvcc.hpp"
+#include "storage/v3/property_value.hpp"
+#include "storage/v3/shard.hpp"
+#include "storage/v3/vertex.hpp"
+#include "storage/v3/vertex_id.hpp"
+
+using testing::Pair;
+using testing::UnorderedElementsAre;
+
+namespace memgraph::storage::v3::tests {
+
+class ShardSplitTest : public testing::Test {
+ protected:
+  void SetUp() override {
+    storage.StoreMapping(
+        {{1, "label"}, {2, "property"}, {3, "edge_property"}, {4, "secondary_label"}, {5, "secondary_prop"}});
+  }
+
+  const PropertyId primary_property{PropertyId::FromUint(2)};
+  const PropertyId secondary_property{PropertyId::FromUint(5)};
+  std::vector<storage::v3::SchemaProperty> schema_property_vector = {
+      storage::v3::SchemaProperty{primary_property, common::SchemaType::INT}};
+  const std::vector<PropertyValue> min_pk{PropertyValue{0}};
+  const LabelId primary_label{LabelId::FromUint(1)};
+  const LabelId secondary_label{LabelId::FromUint(4)};
+  const EdgeTypeId edge_type_id{EdgeTypeId::FromUint(3)};
+  Shard storage{primary_label, min_pk, std::nullopt /*max_primary_key*/, schema_property_vector};
+
+  coordinator::Hlc last_hlc{0, io::Time{}};
+
+  coordinator::Hlc GetNextHlc() {
+    ++last_hlc.logical_id;
+    last_hlc.coordinator_wall_clock += std::chrono::seconds(1);
+    return last_hlc;
+  }
+
+  void AssertShardState(auto &shard, const int split_min, const int split_max) {
+    auto acc = shard.Access(GetNextHlc());
+    for (int i{0}; i < split_min; ++i) {
+      EXPECT_FALSE(acc.FindVertex(PrimaryKey{{PropertyValue(i)}}, View::OLD).has_value());
+    }
+    for (int i{split_min}; i < split_max; ++i) {
+      const auto vtx = acc.FindVertex(PrimaryKey{{PropertyValue(i)}}, View::OLD);
+      ASSERT_TRUE(vtx.has_value());
+      EXPECT_TRUE(vtx->InEdges(View::OLD)->size() == 1 || vtx->OutEdges(View::OLD)->size() == 1);
+    }
+  }
+};
+
+void AssertEqVertexContainer(const VertexContainer &actual, const VertexContainer &expected) {
+  ASSERT_EQ(actual.size(), expected.size());
+
+  auto expected_it = expected.begin();
+  auto actual_it = actual.begin();
+  while (expected_it != expected.end()) {
+    EXPECT_EQ(actual_it->first, expected_it->first);
+    EXPECT_EQ(actual_it->second.deleted, expected_it->second.deleted);
+    EXPECT_EQ(actual_it->second.labels, expected_it->second.labels);
+
+    auto *expected_delta = expected_it->second.delta;
+    auto *actual_delta = actual_it->second.delta;
+    // This asserts the whole delta chain
+    while (expected_delta != nullptr) {
+      EXPECT_EQ(actual_delta->action, expected_delta->action);
+      EXPECT_EQ(actual_delta->id, expected_delta->id);
+
+      switch (expected_delta->action) {
+        case Delta::Action::ADD_LABEL:
+        case Delta::Action::REMOVE_LABEL: {
+          EXPECT_EQ(actual_delta->label, 
expected_delta->label); + break; + } + case Delta::Action::SET_PROPERTY: { + EXPECT_EQ(actual_delta->property.key, expected_delta->property.key); + EXPECT_EQ(actual_delta->property.value, expected_delta->property.value); + break; + } + case Delta::Action::ADD_IN_EDGE: + case Delta::Action::ADD_OUT_EDGE: + case Delta::Action::REMOVE_IN_EDGE: + case Delta::Action::RECREATE_OBJECT: + case Delta::Action::DELETE_OBJECT: + case Delta::Action::REMOVE_OUT_EDGE: { + break; + } + } + + const auto expected_prev = expected_delta->prev.Get(); + const auto actual_prev = actual_delta->prev.Get(); + switch (expected_prev.type) { + case PreviousPtr::Type::NULLPTR: { + ASSERT_EQ(actual_prev.type, PreviousPtr::Type::NULLPTR) << "Expected type is nullptr!"; + break; + } + case PreviousPtr::Type::DELTA: { + ASSERT_EQ(actual_prev.type, PreviousPtr::Type::DELTA) << "Expected type is delta!"; + EXPECT_EQ(actual_prev.delta->action, expected_prev.delta->action); + EXPECT_EQ(actual_prev.delta->id, expected_prev.delta->id); + break; + } + case v3::PreviousPtr::Type::EDGE: { + ASSERT_EQ(actual_prev.type, PreviousPtr::Type::EDGE) << "Expected type is edge!"; + EXPECT_EQ(actual_prev.edge->gid, expected_prev.edge->gid); + break; + } + case v3::PreviousPtr::Type::VERTEX: { + ASSERT_EQ(actual_prev.type, PreviousPtr::Type::VERTEX) << "Expected type is vertex!"; + EXPECT_EQ(actual_prev.vertex->first, expected_prev.vertex->first); + break; + } + } + + expected_delta = expected_delta->next; + actual_delta = actual_delta->next; + } + EXPECT_EQ(expected_delta, nullptr); + EXPECT_EQ(actual_delta, nullptr); + ++expected_it; + ++actual_it; + } +} + +void AssertEqDeltaLists(const std::list<Delta> &actual, const std::list<Delta> &expected) { + EXPECT_EQ(actual.size(), expected.size()); + auto actual_it = actual.begin(); + auto expected_it = expected.begin(); + while (actual_it != actual.end()) { + EXPECT_EQ(actual_it->id, expected_it->id); + EXPECT_EQ(actual_it->action, expected_it->action); + ++actual_it; + ++expected_it; + } +} + +void AddDeltaToDeltaChain(Vertex *object, Delta *new_delta) { + auto *delta_holder = GetDeltaHolder(object); + + new_delta->next = delta_holder->delta; + new_delta->prev.Set(object); + if (delta_holder->delta) { + delta_holder->delta->prev.Set(new_delta); + } + delta_holder->delta = new_delta; +} + +TEST_F(ShardSplitTest, TestBasicSplitWithVertices) { + auto acc = storage.Access(GetNextHlc()); + EXPECT_FALSE(acc.CreateVertexAndValidate({secondary_label}, {PropertyValue(1)}, {}).HasError()); + EXPECT_FALSE(acc.CreateVertexAndValidate({}, {PropertyValue(2)}, {}).HasError()); + EXPECT_FALSE(acc.CreateVertexAndValidate({}, {PropertyValue(3)}, {}).HasError()); + EXPECT_FALSE(acc.CreateVertexAndValidate({}, {PropertyValue(4)}, {}).HasError()); + EXPECT_FALSE( + acc.CreateVertexAndValidate({secondary_label}, {PropertyValue(5)}, {{secondary_property, PropertyValue(121)}}) + .HasError()); + EXPECT_FALSE(acc.CreateVertexAndValidate({}, {PropertyValue(6)}, {}).HasError()); + auto current_hlc = GetNextHlc(); + acc.Commit(current_hlc); + + auto splitted_data = storage.PerformSplit({PropertyValue(4)}, 2); + EXPECT_EQ(splitted_data.vertices.size(), 3); + EXPECT_EQ(splitted_data.edges->size(), 0); + EXPECT_EQ(splitted_data.transactions.size(), 1); + EXPECT_EQ(splitted_data.label_indices.size(), 0); + EXPECT_EQ(splitted_data.label_property_indices.size(), 0); + + CommitInfo commit_info{.start_or_commit_timestamp = current_hlc}; + Delta delta_delete1{Delta::DeleteObjectTag{}, &commit_info, 4, 1}; + Delta 
delta_delete2{Delta::DeleteObjectTag{}, &commit_info, 5, 2};
+  Delta delta_remove_label{Delta::RemoveLabelTag{}, secondary_label, &commit_info, 7, 4};
+  Delta delta_set_property{Delta::SetPropertyTag{}, secondary_property, PropertyValue(), &commit_info, 6, 4};
+  Delta delta_delete3{Delta::DeleteObjectTag{}, &commit_info, 8, 3};
+
+  VertexContainer expected_vertices;
+  auto [it_4, inserted1] = expected_vertices.emplace(PrimaryKey{PropertyValue{4}}, VertexData(&delta_delete1));
+  delta_delete1.prev.Set(&*it_4);
+  auto [it_5, inserted2] = expected_vertices.emplace(PrimaryKey{PropertyValue{5}}, VertexData(&delta_delete2));
+  delta_delete2.prev.Set(&*it_5);
+  auto [it_6, inserted3] = expected_vertices.emplace(PrimaryKey{PropertyValue{6}}, VertexData(&delta_delete3));
+  delta_delete3.prev.Set(&*it_6);
+  it_5->second.labels.push_back(secondary_label);
+  AddDeltaToDeltaChain(&*it_5, &delta_set_property);
+  AddDeltaToDeltaChain(&*it_5, &delta_remove_label);
+
+  AssertEqVertexContainer(splitted_data.vertices, expected_vertices);
+
+  // This is to ensure that the transactions we have don't point to invalid
+  // objects on the other shard
+  std::list<Delta> expected_deltas;
+  expected_deltas.emplace_back(Delta::DeleteObjectTag{}, &commit_info, 4, 1);
+  expected_deltas.emplace_back(Delta::DeleteObjectTag{}, &commit_info, 5, 2);
+  expected_deltas.emplace_back(Delta::SetPropertyTag{}, secondary_property, PropertyValue(), &commit_info, 6, 4);
+  expected_deltas.emplace_back(Delta::RemoveLabelTag{}, secondary_label, &commit_info, 7, 4);
+  expected_deltas.emplace_back(Delta::DeleteObjectTag{}, &commit_info, 8, 3);
+  AssertEqDeltaLists(splitted_data.transactions.begin()->second->deltas, expected_deltas);
+}
+
+TEST_F(ShardSplitTest, TestBasicSplitVerticesAndEdges) {
+  auto acc = storage.Access(GetNextHlc());
+  EXPECT_FALSE(acc.CreateVertexAndValidate({}, {PropertyValue(1)}, {}).HasError());
+  EXPECT_FALSE(acc.CreateVertexAndValidate({}, {PropertyValue(2)}, {}).HasError());
+  EXPECT_FALSE(acc.CreateVertexAndValidate({}, {PropertyValue(3)}, {}).HasError());
+  EXPECT_FALSE(acc.CreateVertexAndValidate({}, {PropertyValue(4)}, {}).HasError());
+  EXPECT_FALSE(acc.CreateVertexAndValidate({}, {PropertyValue(5)}, {}).HasError());
+  EXPECT_FALSE(acc.CreateVertexAndValidate({}, {PropertyValue(6)}, {}).HasError());
+
+  EXPECT_FALSE(acc.CreateEdge(VertexId{primary_label, PrimaryKey{PropertyValue(1)}},
+                              VertexId{primary_label, PrimaryKey{PropertyValue(5)}}, edge_type_id, Gid::FromUint(1))
+                   .HasError());
+  EXPECT_FALSE(acc.CreateEdge(VertexId{primary_label, PrimaryKey{PropertyValue(4)}},
+                              VertexId{primary_label, PrimaryKey{PropertyValue(6)}}, edge_type_id, Gid::FromUint(2))
+                   .HasError());
+
+  auto current_hlc = GetNextHlc();
+  acc.Commit(current_hlc);
+
+  auto splitted_data = storage.PerformSplit({PropertyValue(4)}, 2);
+  EXPECT_EQ(splitted_data.vertices.size(), 3);
+  EXPECT_EQ(splitted_data.edges->size(), 2);
+  EXPECT_EQ(splitted_data.transactions.size(), 1);
+  EXPECT_EQ(splitted_data.label_indices.size(), 0);
+  EXPECT_EQ(splitted_data.label_property_indices.size(), 0);
+
+  CommitInfo commit_info{.start_or_commit_timestamp = current_hlc};
+  Delta delta_delete1{Delta::DeleteObjectTag{}, &commit_info, 12, 1};
+  Delta delta_delete2{Delta::DeleteObjectTag{}, &commit_info, 13, 1};
+  Delta delta_delete3{Delta::DeleteObjectTag{}, &commit_info, 14, 1};
+  Delta delta_add_in_edge1{Delta::RemoveInEdgeTag{},
+                           edge_type_id,
+                           VertexId{primary_label, {PropertyValue(1)}},
+                           EdgeRef{Gid::FromUint(1)},
+                           &commit_info,
+                           17,
+                           1};
+  
Delta delta_add_out_edge2{Delta::RemoveOutEdgeTag{}, + edge_type_id, + VertexId{primary_label, {PropertyValue(6)}}, + EdgeRef{Gid::FromUint(2)}, + &commit_info, + 19, + 1}; + Delta delta_add_in_edge2{Delta::RemoveInEdgeTag{}, + edge_type_id, + VertexId{primary_label, {PropertyValue(4)}}, + EdgeRef{Gid::FromUint(2)}, + &commit_info, + 20, + 1}; + VertexContainer expected_vertices; + auto [vtx4, inserted4] = expected_vertices.emplace(PrimaryKey{PropertyValue{4}}, VertexData(&delta_delete1)); + auto [vtx5, inserted5] = expected_vertices.emplace(PrimaryKey{PropertyValue{5}}, VertexData(&delta_delete2)); + auto [vtx6, inserted6] = expected_vertices.emplace(PrimaryKey{PropertyValue{6}}, VertexData(&delta_delete3)); + AddDeltaToDeltaChain(&*vtx4, &delta_add_out_edge2); + AddDeltaToDeltaChain(&*vtx5, &delta_add_in_edge1); + AddDeltaToDeltaChain(&*vtx6, &delta_add_in_edge2); + + AssertEqVertexContainer(splitted_data.vertices, expected_vertices); +} + +TEST_F(ShardSplitTest, TestBasicSplitBeforeCommit) { + auto acc = storage.Access(GetNextHlc()); + EXPECT_FALSE(acc.CreateVertexAndValidate({}, {PropertyValue(1)}, {}).HasError()); + EXPECT_FALSE(acc.CreateVertexAndValidate({}, {PropertyValue(2)}, {}).HasError()); + EXPECT_FALSE(acc.CreateVertexAndValidate({}, {PropertyValue(3)}, {}).HasError()); + EXPECT_FALSE(acc.CreateVertexAndValidate({}, {PropertyValue(4)}, {}).HasError()); + EXPECT_FALSE(acc.CreateVertexAndValidate({}, {PropertyValue(5)}, {}).HasError()); + EXPECT_FALSE(acc.CreateVertexAndValidate({}, {PropertyValue(6)}, {}).HasError()); + + EXPECT_FALSE(acc.CreateEdge(VertexId{primary_label, PrimaryKey{PropertyValue(1)}}, + VertexId{primary_label, PrimaryKey{PropertyValue(2)}}, edge_type_id, Gid::FromUint(0)) + .HasError()); + EXPECT_FALSE(acc.CreateEdge(VertexId{primary_label, PrimaryKey{PropertyValue(1)}}, + VertexId{primary_label, PrimaryKey{PropertyValue(5)}}, edge_type_id, Gid::FromUint(1)) + .HasError()); + EXPECT_FALSE(acc.CreateEdge(VertexId{primary_label, PrimaryKey{PropertyValue(4)}}, + VertexId{primary_label, PrimaryKey{PropertyValue(6)}}, edge_type_id, Gid::FromUint(2)) + .HasError()); + + auto splitted_data = storage.PerformSplit({PropertyValue(4)}, 2); + EXPECT_EQ(splitted_data.vertices.size(), 3); + EXPECT_EQ(splitted_data.edges->size(), 2); + EXPECT_EQ(splitted_data.transactions.size(), 1); + EXPECT_EQ(splitted_data.label_indices.size(), 0); + EXPECT_EQ(splitted_data.label_property_indices.size(), 0); +} + +TEST_F(ShardSplitTest, TestBasicSplitWithCommitedAndOngoingTransactions) { + { + auto acc = storage.Access(GetNextHlc()); + EXPECT_FALSE(acc.CreateVertexAndValidate({}, {PropertyValue(1)}, {}).HasError()); + EXPECT_FALSE(acc.CreateVertexAndValidate({}, {PropertyValue(2)}, {}).HasError()); + EXPECT_FALSE(acc.CreateVertexAndValidate({}, {PropertyValue(3)}, {}).HasError()); + EXPECT_FALSE(acc.CreateVertexAndValidate({}, {PropertyValue(4)}, {}).HasError()); + EXPECT_FALSE(acc.CreateVertexAndValidate({}, {PropertyValue(5)}, {}).HasError()); + EXPECT_FALSE(acc.CreateVertexAndValidate({}, {PropertyValue(6)}, {}).HasError()); + + acc.Commit(GetNextHlc()); + } + auto acc = storage.Access(GetNextHlc()); + EXPECT_FALSE(acc.CreateEdge(VertexId{primary_label, PrimaryKey{PropertyValue(1)}}, + VertexId{primary_label, PrimaryKey{PropertyValue(2)}}, edge_type_id, Gid::FromUint(0)) + .HasError()); + EXPECT_FALSE(acc.CreateEdge(VertexId{primary_label, PrimaryKey{PropertyValue(3)}}, + VertexId{primary_label, PrimaryKey{PropertyValue(5)}}, edge_type_id, Gid::FromUint(1)) + .HasError()); + 
EXPECT_FALSE(acc.CreateEdge(VertexId{primary_label, PrimaryKey{PropertyValue(4)}}, + VertexId{primary_label, PrimaryKey{PropertyValue(6)}}, edge_type_id, Gid::FromUint(2)) + .HasError()); + + auto splitted_data = storage.PerformSplit({PropertyValue(4)}, 2); + EXPECT_EQ(splitted_data.vertices.size(), 3); + EXPECT_EQ(splitted_data.edges->size(), 2); + EXPECT_EQ(splitted_data.transactions.size(), 2); + EXPECT_EQ(splitted_data.label_indices.size(), 0); + EXPECT_EQ(splitted_data.label_property_indices.size(), 0); +} + +TEST_F(ShardSplitTest, TestBasicSplitWithLabelIndex) { + auto acc = storage.Access(GetNextHlc()); + EXPECT_FALSE(acc.CreateVertexAndValidate({secondary_label}, {PropertyValue(1)}, {}).HasError()); + EXPECT_FALSE(acc.CreateVertexAndValidate({}, {PropertyValue(2)}, {}).HasError()); + EXPECT_FALSE(acc.CreateVertexAndValidate({}, {PropertyValue(3)}, {}).HasError()); + EXPECT_FALSE(acc.CreateVertexAndValidate({}, {PropertyValue(4)}, {}).HasError()); + EXPECT_FALSE(acc.CreateVertexAndValidate({secondary_label}, {PropertyValue(5)}, {}).HasError()); + EXPECT_FALSE(acc.CreateVertexAndValidate({secondary_label}, {PropertyValue(6)}, {}).HasError()); + acc.Commit(GetNextHlc()); + storage.CreateIndex(secondary_label); + + auto splitted_data = storage.PerformSplit({PropertyValue(4)}, 2); + + EXPECT_EQ(splitted_data.vertices.size(), 3); + EXPECT_EQ(splitted_data.edges->size(), 0); + EXPECT_EQ(splitted_data.transactions.size(), 1); + EXPECT_EQ(splitted_data.label_indices.size(), 1); + EXPECT_EQ(splitted_data.label_property_indices.size(), 0); +} + +TEST_F(ShardSplitTest, TestBasicSplitWithLabelPropertyIndex) { + auto acc = storage.Access(GetNextHlc()); + EXPECT_FALSE( + acc.CreateVertexAndValidate({secondary_label}, {PropertyValue(1)}, {{secondary_property, PropertyValue(1)}}) + .HasError()); + EXPECT_FALSE(acc.CreateVertexAndValidate({}, {PropertyValue(2)}, {}).HasError()); + EXPECT_FALSE(acc.CreateVertexAndValidate({}, {PropertyValue(3)}, {}).HasError()); + EXPECT_FALSE(acc.CreateVertexAndValidate({}, {PropertyValue(4)}, {}).HasError()); + EXPECT_FALSE( + acc.CreateVertexAndValidate({secondary_label}, {PropertyValue(5)}, {{secondary_property, PropertyValue(21)}}) + .HasError()); + EXPECT_FALSE( + acc.CreateVertexAndValidate({secondary_label}, {PropertyValue(6)}, {{secondary_property, PropertyValue(22)}}) + .HasError()); + acc.Commit(GetNextHlc()); + storage.CreateIndex(secondary_label, secondary_property); + + auto splitted_data = storage.PerformSplit({PropertyValue(4)}, 2); + + EXPECT_EQ(splitted_data.vertices.size(), 3); + EXPECT_EQ(splitted_data.edges->size(), 0); + EXPECT_EQ(splitted_data.transactions.size(), 1); + EXPECT_EQ(splitted_data.label_indices.size(), 0); + EXPECT_EQ(splitted_data.label_property_indices.size(), 1); +} + +TEST_F(ShardSplitTest, TestSplittingShardsWithGcDestroyOriginalShard) { + const auto split_value{4}; + PrimaryKey splitted_value{{PropertyValue(4)}}; + std::unique_ptr<Shard> splitted_shard; + + { + Shard storage2{primary_label, min_pk, std::nullopt /*max_primary_key*/, schema_property_vector}; + auto acc = storage2.Access(GetNextHlc()); + EXPECT_FALSE(acc.CreateVertexAndValidate({}, {PropertyValue(1)}, {}).HasError()); + EXPECT_FALSE(acc.CreateVertexAndValidate({}, {PropertyValue(2)}, {}).HasError()); + EXPECT_FALSE(acc.CreateVertexAndValidate({}, {PropertyValue(3)}, {}).HasError()); + EXPECT_FALSE(acc.CreateVertexAndValidate({}, {PropertyValue(4)}, {}).HasError()); + EXPECT_FALSE(acc.CreateVertexAndValidate({}, {PropertyValue(5)}, {}).HasError()); + 
EXPECT_FALSE(acc.CreateVertexAndValidate({}, {PropertyValue(6)}, {}).HasError()); + + EXPECT_FALSE(acc.CreateEdge(VertexId{primary_label, PrimaryKey{PropertyValue(1)}}, + VertexId{primary_label, PrimaryKey{PropertyValue(2)}}, edge_type_id, Gid::FromUint(0)) + .HasError()); + EXPECT_FALSE(acc.CreateEdge(VertexId{primary_label, PrimaryKey{PropertyValue(3)}}, + VertexId{primary_label, PrimaryKey{PropertyValue(5)}}, edge_type_id, Gid::FromUint(1)) + .HasError()); + EXPECT_FALSE(acc.CreateEdge(VertexId{primary_label, PrimaryKey{PropertyValue(4)}}, + VertexId{primary_label, PrimaryKey{PropertyValue(6)}}, edge_type_id, Gid::FromUint(2)) + .HasError()); + acc.Commit(GetNextHlc()); + + auto splitted_data = storage2.PerformSplit({PropertyValue(split_value)}, 2); + EXPECT_EQ(splitted_data.vertices.size(), 3); + EXPECT_EQ(splitted_data.edges->size(), 2); + EXPECT_EQ(splitted_data.transactions.size(), 1); + EXPECT_EQ(splitted_data.label_indices.size(), 0); + EXPECT_EQ(splitted_data.label_property_indices.size(), 0); + + // Create a new shard + splitted_shard = Shard::FromSplitData(std::move(splitted_data)); + // Call gc on old shard + storage2.CollectGarbage(GetNextHlc().coordinator_wall_clock); + // Destroy original + } + + splitted_shard->CollectGarbage(GetNextHlc().coordinator_wall_clock); + AssertShardState(*splitted_shard, 4, 6); +} + +TEST_F(ShardSplitTest, TestSplittingShardsWithGcDestroySplittedShard) { + PrimaryKey splitted_value{{PropertyValue(4)}}; + + auto acc = storage.Access(GetNextHlc()); + EXPECT_FALSE(acc.CreateVertexAndValidate({}, {PropertyValue(1)}, {}).HasError()); + EXPECT_FALSE(acc.CreateVertexAndValidate({}, {PropertyValue(2)}, {}).HasError()); + EXPECT_FALSE(acc.CreateVertexAndValidate({}, {PropertyValue(3)}, {}).HasError()); + EXPECT_FALSE(acc.CreateVertexAndValidate({}, {PropertyValue(4)}, {}).HasError()); + EXPECT_FALSE(acc.CreateVertexAndValidate({}, {PropertyValue(5)}, {}).HasError()); + EXPECT_FALSE(acc.CreateVertexAndValidate({}, {PropertyValue(6)}, {}).HasError()); + + EXPECT_FALSE(acc.CreateEdge(VertexId{primary_label, PrimaryKey{PropertyValue(1)}}, + VertexId{primary_label, PrimaryKey{PropertyValue(2)}}, edge_type_id, Gid::FromUint(0)) + .HasError()); + EXPECT_FALSE(acc.CreateEdge(VertexId{primary_label, PrimaryKey{PropertyValue(3)}}, + VertexId{primary_label, PrimaryKey{PropertyValue(5)}}, edge_type_id, Gid::FromUint(1)) + .HasError()); + EXPECT_FALSE(acc.CreateEdge(VertexId{primary_label, PrimaryKey{PropertyValue(4)}}, + VertexId{primary_label, PrimaryKey{PropertyValue(6)}}, edge_type_id, Gid::FromUint(2)) + .HasError()); + acc.Commit(GetNextHlc()); + + auto splitted_data = storage.PerformSplit({PropertyValue(4)}, 2); + EXPECT_EQ(splitted_data.vertices.size(), 3); + EXPECT_EQ(splitted_data.edges->size(), 2); + EXPECT_EQ(splitted_data.transactions.size(), 1); + EXPECT_EQ(splitted_data.label_indices.size(), 0); + EXPECT_EQ(splitted_data.label_property_indices.size(), 0); + + { + // Create a new shard + auto splitted_shard = Shard::FromSplitData(std::move(splitted_data)); + // Call gc on new shard + splitted_shard->CollectGarbage(GetNextHlc().coordinator_wall_clock); + // Destroy splitted shard + } + + storage.CollectGarbage(GetNextHlc().coordinator_wall_clock); + AssertShardState(storage, 1, 3); +} + +TEST_F(ShardSplitTest, TestBigSplit) { + int pk{0}; + for (int64_t i{0}; i < 10'000; ++i) { + auto acc = storage.Access(GetNextHlc()); + EXPECT_FALSE( + acc.CreateVertexAndValidate({secondary_label}, {PropertyValue(pk++)}, {{secondary_property, PropertyValue(i)}}) + 
.HasError()); + EXPECT_FALSE(acc.CreateVertexAndValidate({}, {PropertyValue(pk++)}, {}).HasError()); + + EXPECT_FALSE(acc.CreateEdge(VertexId{primary_label, PrimaryKey{PropertyValue(pk - 2)}}, + VertexId{primary_label, PrimaryKey{PropertyValue(pk - 1)}}, edge_type_id, + Gid::FromUint(pk)) + .HasError()); + acc.Commit(GetNextHlc()); + } + storage.CreateIndex(secondary_label, secondary_property); + + const auto split_value = pk / 2; + auto splitted_data = storage.PerformSplit({PropertyValue(split_value)}, 2); + + EXPECT_EQ(splitted_data.vertices.size(), 10000); + EXPECT_EQ(splitted_data.edges->size(), 5000); + EXPECT_EQ(splitted_data.transactions.size(), 5000); + EXPECT_EQ(splitted_data.label_indices.size(), 0); + EXPECT_EQ(splitted_data.label_property_indices.size(), 1); + + auto shard = Shard::FromSplitData(std::move(splitted_data)); + AssertShardState(*shard, split_value, split_value * 2); +} + +} // namespace memgraph::storage::v3::tests