Merge pull request #719 from memgraph/T1149-MG-split-shard

- Implement shard split on storage side
Jure Bajic 2023-03-29 16:07:18 +02:00 committed by GitHub
commit 5b5ce1c4c1
20 changed files with 2133 additions and 91 deletions


@ -1,4 +1,4 @@
// Copyright 2022 Memgraph Ltd.
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -23,17 +23,17 @@ namespace memgraph::coordinator {
using Time = memgraph::io::Time;
/// Hybrid-logical clock
struct Hlc {
uint64_t logical_id = 0;
struct Hlc final {
uint64_t logical_id{0};
Time coordinator_wall_clock = Time::min();
auto operator<=>(const Hlc &other) const { return logical_id <=> other.logical_id; }
auto operator<=>(const Hlc &other) const noexcept { return logical_id <=> other.logical_id; }
bool operator==(const Hlc &other) const = default;
bool operator<(const Hlc &other) const = default;
bool operator==(const uint64_t other) const { return logical_id == other; }
bool operator<(const uint64_t other) const { return logical_id < other; }
bool operator>=(const uint64_t other) const { return logical_id >= other; }
bool operator==(const Hlc &other) const noexcept = default;
bool operator<(const Hlc &other) const noexcept = default;
bool operator==(const uint64_t other) const noexcept { return logical_id == other; }
bool operator<(const uint64_t other) const noexcept { return logical_id < other; }
bool operator>=(const uint64_t other) const noexcept { return logical_id >= other; }
friend std::ostream &operator<<(std::ostream &in, const Hlc &hlc) {
auto wall_clock = std::chrono::system_clock::to_time_t(hlc.coordinator_wall_clock);
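For clarity on the new noexcept comparison: operator<=> compares only logical_id, so ordering (and the defaulted operator<) never looks at the wall-clock component. A minimal standalone sketch of that behaviour, with std::chrono standing in for io::Time and an illustrative HlcSketch name:

#include <chrono>
#include <compare>
#include <cstdint>
#include <iostream>

struct HlcSketch {
  uint64_t logical_id{0};
  std::chrono::system_clock::time_point wall_clock{};
  // Ordering is driven purely by logical_id, mirroring Hlc::operator<=> above.
  auto operator<=>(const HlcSketch &other) const noexcept { return logical_id <=> other.logical_id; }
};

int main() {
  HlcSketch a{.logical_id = 5};
  HlcSketch b{.logical_id = 7, .wall_clock = std::chrono::system_clock::now()};
  std::cout << std::boolalpha << (a < b) << '\n';  // true: the wall clock never enters the comparison
}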


@ -12,6 +12,7 @@
#pragma once
#include <chrono>
#include <cstdint>
#include <iostream>
#include <map>
#include <memory>
@ -571,6 +572,16 @@ struct CommitResponse {
std::optional<ShardError> error;
};
struct SplitInfo {
PrimaryKey split_key;
uint64_t shard_version;
};
struct PerformSplitDataInfo {
PrimaryKey split_key;
uint64_t shard_version;
};
using ReadRequests = std::variant<ExpandOneRequest, GetPropertiesRequest, ScanVerticesRequest>;
using ReadResponses = std::variant<ExpandOneResponse, GetPropertiesResponse, ScanVerticesResponse>;


@ -18,6 +18,7 @@ set(storage_v3_src_files
bindings/typed_value.cpp
expr.cpp
vertex.cpp
splitter.cpp
request_helper.cpp)
# ######################


@ -30,6 +30,10 @@ struct Config {
io::Duration reclamation_interval{};
} gc;
struct Split {
uint64_t max_shard_vertex_size{500'000};
} split;
struct Items {
bool properties_on_edges{true};
} items;
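The new knob defaults to 500'000 vertices and is the threshold that Shard::ShouldSplit (further down in this diff) checks against. A standalone mirror of the struct, only to show how a caller would lower the threshold; the real definition lives in the shard Config (storage/v3/config.hpp):

#include <cstdint>

struct ConfigSketch {
  struct Split {
    uint64_t max_shard_vertex_size{500'000};  // default taken from the diff above
  } split;
};

int main() {
  ConfigSketch config;
  config.split.max_shard_vertex_size = 100'000;  // e.g. split after 100k vertices in a test setup
  return config.split.max_shard_vertex_size == 100'000 ? 0 : 1;
}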


@ -1,4 +1,4 @@
// Copyright 2022 Memgraph Ltd.
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -13,12 +13,14 @@
#include <cstdint>
#include <memory>
#include "storage/v3/edge_ref.hpp"
#include "storage/v3/id_types.hpp"
#include "storage/v3/property_value.hpp"
#include "storage/v3/vertex.hpp"
#include "storage/v3/vertex_id.hpp"
#include "utils/logging.hpp"
#include "utils/synchronized.hpp"
namespace memgraph::storage::v3 {
@ -27,6 +29,11 @@ struct Edge;
struct Delta;
struct CommitInfo;
inline uint64_t GetNextDeltaId() {
static utils::Synchronized<uint64_t, utils::SpinLock> delta_id{0};
return delta_id.WithLock([](auto &id) { return id++; });
}
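GetNextDeltaId hands out process-wide unique, monotonically increasing ids under a spin lock. The same effect with only the standard library, as a sketch in which std::atomic stands in for utils::Synchronized:

#include <atomic>
#include <cstdint>

// Sketch: every call returns a fresh, strictly increasing id, safe to call from any thread.
inline uint64_t GetNextDeltaIdSketch() {
  static std::atomic<uint64_t> delta_id{0};
  return delta_id.fetch_add(1, std::memory_order_relaxed);
}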
// This class stores one of three pointers (`Delta`, `Vertex` and `Edge`)
// without using additional memory for storing the type. The type is stored in
// the pointer itself in the lower bits. All of those structures contain large
@ -158,46 +165,54 @@ struct Delta {
struct RemoveInEdgeTag {};
struct RemoveOutEdgeTag {};
Delta(DeleteObjectTag /*unused*/, CommitInfo *commit_info, uint64_t command_id)
: action(Action::DELETE_OBJECT), commit_info(commit_info), command_id(command_id) {}
Delta(DeleteObjectTag /*unused*/, CommitInfo *commit_info, uint64_t delta_id, uint64_t command_id)
: action(Action::DELETE_OBJECT), id(delta_id), commit_info(commit_info), command_id(command_id) {}
Delta(RecreateObjectTag /*unused*/, CommitInfo *commit_info, uint64_t command_id)
: action(Action::RECREATE_OBJECT), commit_info(commit_info), command_id(command_id) {}
Delta(RecreateObjectTag /*unused*/, CommitInfo *commit_info, uint64_t delta_id, uint64_t command_id)
: action(Action::RECREATE_OBJECT), id(delta_id), commit_info(commit_info), command_id(command_id) {}
Delta(AddLabelTag /*unused*/, LabelId label, CommitInfo *commit_info, uint64_t command_id)
: action(Action::ADD_LABEL), commit_info(commit_info), command_id(command_id), label(label) {}
Delta(AddLabelTag /*unused*/, LabelId label, CommitInfo *commit_info, uint64_t delta_id, uint64_t command_id)
: action(Action::ADD_LABEL), id(delta_id), commit_info(commit_info), command_id(command_id), label(label) {}
Delta(RemoveLabelTag /*unused*/, LabelId label, CommitInfo *commit_info, uint64_t command_id)
: action(Action::REMOVE_LABEL), commit_info(commit_info), command_id(command_id), label(label) {}
Delta(RemoveLabelTag /*unused*/, LabelId label, CommitInfo *commit_info, uint64_t delta_id, uint64_t command_id)
: action(Action::REMOVE_LABEL), id(delta_id), commit_info(commit_info), command_id(command_id), label(label) {}
Delta(SetPropertyTag /*unused*/, PropertyId key, const PropertyValue &value, CommitInfo *commit_info,
uint64_t command_id)
: action(Action::SET_PROPERTY), commit_info(commit_info), command_id(command_id), property({key, value}) {}
uint64_t delta_id, uint64_t command_id)
: action(Action::SET_PROPERTY),
id(delta_id),
commit_info(commit_info),
command_id(command_id),
property({key, value}) {}
Delta(AddInEdgeTag /*unused*/, EdgeTypeId edge_type, VertexId vertex_id, EdgeRef edge, CommitInfo *commit_info,
uint64_t command_id)
uint64_t delta_id, uint64_t command_id)
: action(Action::ADD_IN_EDGE),
id(delta_id),
commit_info(commit_info),
command_id(command_id),
vertex_edge({edge_type, std::move(vertex_id), edge}) {}
Delta(AddOutEdgeTag /*unused*/, EdgeTypeId edge_type, VertexId vertex_id, EdgeRef edge, CommitInfo *commit_info,
uint64_t command_id)
uint64_t delta_id, uint64_t command_id)
: action(Action::ADD_OUT_EDGE),
id(delta_id),
commit_info(commit_info),
command_id(command_id),
vertex_edge({edge_type, std::move(vertex_id), edge}) {}
Delta(RemoveInEdgeTag /*unused*/, EdgeTypeId edge_type, VertexId vertex_id, EdgeRef edge, CommitInfo *commit_info,
uint64_t command_id)
uint64_t delta_id, uint64_t command_id)
: action(Action::REMOVE_IN_EDGE),
id(delta_id),
commit_info(commit_info),
command_id(command_id),
vertex_edge({edge_type, std::move(vertex_id), edge}) {}
Delta(RemoveOutEdgeTag /*unused*/, EdgeTypeId edge_type, VertexId vertex_id, EdgeRef edge, CommitInfo *commit_info,
uint64_t command_id)
uint64_t delta_id, uint64_t command_id)
: action(Action::REMOVE_OUT_EDGE),
id(delta_id),
commit_info(commit_info),
command_id(command_id),
vertex_edge({edge_type, std::move(vertex_id), edge}) {}
@ -226,8 +241,10 @@ struct Delta {
}
}
Action action;
friend bool operator==(const Delta &lhs, const Delta &rhs) noexcept { return lhs.id == rhs.id; }
Action action;
uint64_t id;
// TODO: optimize with in-place copy
CommitInfo *commit_info;
uint64_t command_id;


@ -1,4 +1,4 @@
// Copyright 2022 Memgraph Ltd.
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -325,7 +325,7 @@ void LabelIndex::RemoveObsoleteEntries(const uint64_t clean_up_before_timestamp)
}
}
LabelIndex::Iterable::Iterator::Iterator(Iterable *self, LabelIndexContainer::iterator index_iterator)
LabelIndex::Iterable::Iterator::Iterator(Iterable *self, IndexContainer::iterator index_iterator)
: self_(self),
index_iterator_(index_iterator),
current_vertex_accessor_(nullptr, nullptr, nullptr, self_->config_, *self_->vertex_validator_),
@ -353,7 +353,7 @@ void LabelIndex::Iterable::Iterator::AdvanceUntilValid() {
}
}
LabelIndex::Iterable::Iterable(LabelIndexContainer &index_container, LabelId label, View view, Transaction *transaction,
LabelIndex::Iterable::Iterable(IndexContainer &index_container, LabelId label, View view, Transaction *transaction,
Indices *indices, Config::Items config, const VertexValidator &vertex_validator)
: index_container_(&index_container),
label_(label),
@ -465,7 +465,7 @@ void LabelPropertyIndex::RemoveObsoleteEntries(const uint64_t clean_up_before_ti
}
}
LabelPropertyIndex::Iterable::Iterator::Iterator(Iterable *self, LabelPropertyIndexContainer::iterator index_iterator)
LabelPropertyIndex::Iterable::Iterator::Iterator(Iterable *self, IndexContainer::iterator index_iterator)
: self_(self),
index_iterator_(index_iterator),
current_vertex_accessor_(nullptr, nullptr, nullptr, self_->config_, *self_->vertex_validator_),
@ -526,7 +526,7 @@ const PropertyValue kSmallestMap = PropertyValue(std::map<std::string, PropertyV
const PropertyValue kSmallestTemporalData =
PropertyValue(TemporalData{static_cast<TemporalType>(0), std::numeric_limits<int64_t>::min()});
LabelPropertyIndex::Iterable::Iterable(LabelPropertyIndexContainer &index_container, LabelId label, PropertyId property,
LabelPropertyIndex::Iterable::Iterable(IndexContainer &index_container, LabelId label, PropertyId property,
const std::optional<utils::Bound<PropertyValue>> &lower_bound,
const std::optional<utils::Bound<PropertyValue>> &upper_bound, View view,
Transaction *transaction, Indices *indices, Config::Items config,


@ -1,4 +1,4 @@
// Copyright 2022 Memgraph Ltd.
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -18,6 +18,8 @@
#include <utility>
#include "storage/v3/config.hpp"
#include "storage/v3/id_types.hpp"
#include "storage/v3/key_store.hpp"
#include "storage/v3/property_value.hpp"
#include "storage/v3/transaction.hpp"
#include "storage/v3/vertex_accessor.hpp"
@ -40,12 +42,18 @@ class LabelIndex {
bool operator==(const Entry &rhs) const { return vertex == rhs.vertex && timestamp == rhs.timestamp; }
};
using IndexType = LabelId;
public:
using LabelIndexContainer = std::set<Entry>;
using IndexContainer = std::set<Entry>;
LabelIndex(Indices *indices, Config::Items config, const VertexValidator &vertex_validator)
: indices_(indices), config_(config), vertex_validator_{&vertex_validator} {}
LabelIndex(Indices *indices, Config::Items config, const VertexValidator &vertex_validator,
std::map<LabelId, IndexContainer> &data)
: index_{std::move(data)}, indices_(indices), config_(config), vertex_validator_{&vertex_validator} {}
/// @throw std::bad_alloc
void UpdateOnAddLabel(LabelId label, Vertex *vertex, const Transaction &tx);
@ -63,12 +71,12 @@ class LabelIndex {
class Iterable {
public:
Iterable(LabelIndexContainer &index_container, LabelId label, View view, Transaction *transaction, Indices *indices,
Iterable(IndexContainer &index_container, LabelId label, View view, Transaction *transaction, Indices *indices,
Config::Items config, const VertexValidator &vertex_validator);
class Iterator {
public:
Iterator(Iterable *self, LabelIndexContainer::iterator index_iterator);
Iterator(Iterable *self, IndexContainer::iterator index_iterator);
VertexAccessor operator*() const { return current_vertex_accessor_; }
@ -81,7 +89,7 @@ class LabelIndex {
void AdvanceUntilValid();
Iterable *self_;
LabelIndexContainer::iterator index_iterator_;
IndexContainer::iterator index_iterator_;
VertexAccessor current_vertex_accessor_;
Vertex *current_vertex_;
};
@ -90,7 +98,7 @@ class LabelIndex {
Iterator end() { return {this, index_container_->end()}; }
private:
LabelIndexContainer *index_container_;
IndexContainer *index_container_;
LabelId label_;
View view_;
Transaction *transaction_;
@ -114,8 +122,29 @@ class LabelIndex {
void Clear() { index_.clear(); }
std::map<IndexType, IndexContainer> SplitIndexEntries(const PrimaryKey &split_key) {
std::map<IndexType, IndexContainer> cloned_indices;
for (auto &[index_type_val, index] : index_) {
auto entry_it = index.begin();
auto &cloned_indices_container = cloned_indices[index_type_val];
while (entry_it != index.end()) {
// We need to save the next iterator since the current one will be
// invalidated after extract
auto next_entry_it = std::next(entry_it);
if (entry_it->vertex->first > split_key) {
[[maybe_unused]] const auto &[inserted_entry_it, inserted, node] =
cloned_indices_container.insert(index.extract(entry_it));
MG_ASSERT(inserted, "Failed to extract index entry!");
}
entry_it = next_entry_it;
}
}
return cloned_indices;
}
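SplitIndexEntries leans on std::set node handles: extract() unlinks an entry without copying it and insert() re-links it into the cloned container, so only entries past the split key pay any cost. The same pattern reduced to a self-contained sketch over std::set<int> (SplitEntriesSketch is an illustrative name):

#include <cassert>
#include <iterator>
#include <map>
#include <set>

std::map<int, std::set<int>> SplitEntriesSketch(std::map<int, std::set<int>> &index, int split_key) {
  std::map<int, std::set<int>> cloned;
  for (auto &[label, entries] : index) {
    auto &target = cloned[label];
    for (auto it = entries.begin(); it != entries.end();) {
      auto next = std::next(it);             // extract() invalidates the current iterator
      if (*it > split_key) {
        target.insert(entries.extract(it));  // move the node, no element copy
      }
      it = next;
    }
  }
  return cloned;
}

int main() {
  std::map<int, std::set<int>> index{{1, {1, 5, 9}}};
  auto cloned = SplitEntriesSketch(index, 4);
  assert((index[1] == std::set<int>{1}) && (cloned[1] == std::set<int>{5, 9}));
}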
private:
std::map<LabelId, LabelIndexContainer> index_;
std::map<LabelId, IndexContainer> index_;
Indices *indices_;
Config::Items config_;
const VertexValidator *vertex_validator_;
@ -133,9 +162,10 @@ class LabelPropertyIndex {
bool operator<(const PropertyValue &rhs) const;
bool operator==(const PropertyValue &rhs) const;
};
using IndexType = std::pair<LabelId, PropertyId>;
public:
using LabelPropertyIndexContainer = std::set<Entry>;
using IndexContainer = std::set<Entry>;
LabelPropertyIndex(Indices *indices, Config::Items config, const VertexValidator &vertex_validator)
: indices_(indices), config_(config), vertex_validator_{&vertex_validator} {}
@ -159,14 +189,14 @@ class LabelPropertyIndex {
class Iterable {
public:
Iterable(LabelPropertyIndexContainer &index_container, LabelId label, PropertyId property,
Iterable(IndexContainer &index_container, LabelId label, PropertyId property,
const std::optional<utils::Bound<PropertyValue>> &lower_bound,
const std::optional<utils::Bound<PropertyValue>> &upper_bound, View view, Transaction *transaction,
Indices *indices, Config::Items config, const VertexValidator &vertex_validator);
class Iterator {
public:
Iterator(Iterable *self, LabelPropertyIndexContainer::iterator index_iterator);
Iterator(Iterable *self, IndexContainer::iterator index_iterator);
VertexAccessor operator*() const { return current_vertex_accessor_; }
@ -179,7 +209,7 @@ class LabelPropertyIndex {
void AdvanceUntilValid();
Iterable *self_;
LabelPropertyIndexContainer::iterator index_iterator_;
IndexContainer::iterator index_iterator_;
VertexAccessor current_vertex_accessor_;
Vertex *current_vertex_;
};
@ -188,7 +218,7 @@ class LabelPropertyIndex {
Iterator end();
private:
LabelPropertyIndexContainer *index_container_;
IndexContainer *index_container_;
LabelId label_;
PropertyId property_;
std::optional<utils::Bound<PropertyValue>> lower_bound_;
@ -229,8 +259,29 @@ class LabelPropertyIndex {
void Clear() { index_.clear(); }
std::map<IndexType, IndexContainer> SplitIndexEntries(const PrimaryKey &split_key) {
std::map<IndexType, IndexContainer> cloned_indices;
for (auto &[index_type_val, index] : index_) {
auto entry_it = index.begin();
auto &cloned_index_container = cloned_indices[index_type_val];
while (entry_it != index.end()) {
// We need to save the next iterator since the current one will be
// invalidated after extract
auto next_entry_it = std::next(entry_it);
if (entry_it->vertex->first > split_key) {
[[maybe_unused]] const auto &[inserted_entry_it, inserted, node] =
cloned_index_container.insert(index.extract(entry_it));
MG_ASSERT(inserted, "Failed to extract index entry!");
}
entry_it = next_entry_it;
}
}
return cloned_indices;
}
private:
std::map<std::pair<LabelId, PropertyId>, LabelPropertyIndexContainer> index_;
std::map<std::pair<LabelId, PropertyId>, IndexContainer> index_;
Indices *indices_;
Config::Items config_;
const VertexValidator *vertex_validator_;


@ -1,4 +1,4 @@
// Copyright 2022 Memgraph Ltd.
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -108,7 +108,7 @@ inline bool PrepareForWrite(Transaction *transaction, TObj *object) {
/// a `DELETE_OBJECT` delta).
/// @throw std::bad_alloc
inline Delta *CreateDeleteObjectDelta(Transaction *transaction) {
return &transaction->deltas.emplace_back(Delta::DeleteObjectTag(), transaction->commit_info.get(),
return &transaction->deltas.emplace_back(Delta::DeleteObjectTag(), transaction->commit_info.get(), GetNextDeltaId(),
transaction->command_id);
}
@ -119,7 +119,7 @@ template <typename TObj, class... Args>
requires utils::SameAsAnyOf<TObj, Edge, Vertex>
inline void CreateAndLinkDelta(Transaction *transaction, TObj *object, Args &&...args) {
auto delta = &transaction->deltas.emplace_back(std::forward<Args>(args)..., transaction->commit_info.get(),
transaction->command_id);
GetNextDeltaId(), transaction->command_id);
auto *delta_holder = GetDeltaHolder(object);
// The operations are written in such order so that both `next` and `prev`
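Both helpers now thread a freshly generated delta id into every Delta constructor while forwarding the caller's tag and payload arguments unchanged. The shape of that pattern, reduced to a standalone sketch (Record, GetNextRecordId and EmplaceRecord are made-up names):

#include <cstdint>
#include <list>
#include <string>
#include <utility>

struct Record {
  std::string payload;
  uint64_t id;
  uint64_t command_id;
  Record(std::string payload, uint64_t id, uint64_t command_id)
      : payload(std::move(payload)), id(id), command_id(command_id) {}
};

inline uint64_t GetNextRecordId() {
  static uint64_t next_id{0};  // the real counter is guarded by a spin lock, see GetNextDeltaId()
  return next_id++;
}

// The caller supplies only the payload; the helper appends the generated id and the command id,
// mirroring how CreateAndLinkDelta appends GetNextDeltaId() and transaction->command_id.
template <class... Args>
Record &EmplaceRecord(std::list<Record> &records, uint64_t command_id, Args &&...args) {
  return records.emplace_back(std::forward<Args>(args)..., GetNextRecordId(), command_id);
}

int main() {
  std::list<Record> records;
  const auto &record = EmplaceRecord(records, /*command_id=*/1, "set property");
  return static_cast<int>(record.id);  // 0: the first id handed out
}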


@ -1,4 +1,4 @@
// Copyright 2022 Memgraph Ltd.
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -18,14 +18,14 @@
#include <memory>
#include <mutex>
#include <optional>
#include <variant>
#include <bits/ranges_algo.h>
#include <gflags/gflags.h>
#include <spdlog/spdlog.h>
#include "io/network/endpoint.hpp"
#include "io/time.hpp"
#include "storage/v3/delta.hpp"
#include "storage/v3/edge.hpp"
#include "storage/v3/edge_accessor.hpp"
#include "storage/v3/id_types.hpp"
#include "storage/v3/indices.hpp"
@ -332,16 +332,64 @@ Shard::Shard(const LabelId primary_label, const PrimaryKey min_primary_key,
vertex_validator_{schema_validator_, primary_label},
indices_{config.items, vertex_validator_},
isolation_level_{config.transaction.isolation_level},
config_{config},
uuid_{utils::GenerateUUID()},
epoch_id_{utils::GenerateUUID()},
global_locker_{file_retainer_.AddLocker()} {
config_{config} {
CreateSchema(primary_label_, schema);
StoreMapping(std::move(id_to_name));
}
Shard::Shard(LabelId primary_label, PrimaryKey min_primary_key, std::optional<PrimaryKey> max_primary_key,
std::vector<SchemaProperty> schema, VertexContainer &&vertices, EdgeContainer &&edges,
std::map<uint64_t, std::unique_ptr<Transaction>> &&start_logical_id_to_transaction, const Config &config,
const std::unordered_map<uint64_t, std::string> &id_to_name, const uint64_t shard_version)
: primary_label_{primary_label},
min_primary_key_{min_primary_key},
max_primary_key_{max_primary_key},
vertices_(std::move(vertices)),
edges_(std::move(edges)),
shard_version_(shard_version),
schema_validator_{schemas_, name_id_mapper_},
vertex_validator_{schema_validator_, primary_label},
indices_{config.items, vertex_validator_},
isolation_level_{config.transaction.isolation_level},
config_{config},
start_logical_id_to_transaction_(std::move(start_logical_id_to_transaction)) {
CreateSchema(primary_label_, schema);
StoreMapping(id_to_name);
}
Shard::Shard(LabelId primary_label, PrimaryKey min_primary_key, std::optional<PrimaryKey> max_primary_key,
std::vector<SchemaProperty> schema, VertexContainer &&vertices,
std::map<uint64_t, std::unique_ptr<Transaction>> &&start_logical_id_to_transaction, const Config &config,
const std::unordered_map<uint64_t, std::string> &id_to_name, const uint64_t shard_version)
: primary_label_{primary_label},
min_primary_key_{min_primary_key},
max_primary_key_{max_primary_key},
vertices_(std::move(vertices)),
shard_version_(shard_version),
schema_validator_{schemas_, name_id_mapper_},
vertex_validator_{schema_validator_, primary_label},
indices_{config.items, vertex_validator_},
isolation_level_{config.transaction.isolation_level},
config_{config},
start_logical_id_to_transaction_(std::move(start_logical_id_to_transaction)) {
CreateSchema(primary_label_, schema);
StoreMapping(id_to_name);
}
Shard::~Shard() {}
std::unique_ptr<Shard> Shard::FromSplitData(SplitData &&split_data) {
if (split_data.config.items.properties_on_edges) [[likely]] {
return std::make_unique<Shard>(split_data.primary_label, split_data.min_primary_key, split_data.max_primary_key,
split_data.schema, std::move(split_data.vertices), std::move(*split_data.edges),
std::move(split_data.transactions), split_data.config, split_data.id_to_name,
split_data.shard_version);
}
return std::make_unique<Shard>(split_data.primary_label, split_data.min_primary_key, split_data.max_primary_key,
split_data.schema, std::move(split_data.vertices), std::move(split_data.transactions),
split_data.config, split_data.id_to_name, split_data.shard_version);
}
Shard::Accessor::Accessor(Shard &shard, Transaction &transaction)
: shard_(&shard), transaction_(&transaction), config_(shard_->config_.items) {}
@ -436,7 +484,7 @@ ShardResult<std::optional<std::pair<VertexAccessor, std::vector<EdgeAccessor>>>>
}
std::vector<EdgeAccessor> deleted_edges;
const VertexId vertex_id{shard_->primary_label_, *vertex->PrimaryKey(View::OLD)}; // TODO Replace
const VertexId vertex_id{shard_->primary_label_, *vertex->PrimaryKey(View::OLD)};
for (const auto &item : in_edges) {
auto [edge_type, from_vertex, edge] = item;
EdgeAccessor e(edge, edge_type, from_vertex, vertex_id, transaction_, &shard_->indices_, config_);
@ -1048,6 +1096,28 @@ void Shard::StoreMapping(std::unordered_map<uint64_t, std::string> id_to_name) {
name_id_mapper_.StoreMapping(std::move(id_to_name));
}
std::optional<SplitInfo> Shard::ShouldSplit() const noexcept {
if (vertices_.size() > config_.split.max_shard_vertex_size) {
auto mid_elem = vertices_.begin();
// TODO(tyler) the first time we calculate the split point, we should store it so that we don't have to
// iterate over half of the entire index each time Cron is run until the split succeeds.
std::ranges::advance(mid_elem, static_cast<VertexContainer::difference_type>(vertices_.size() / 2));
return SplitInfo{mid_elem->first, shard_version_};
}
return std::nullopt;
}
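The split point is simply the median primary key: once the vertex count crosses the configured threshold, an iterator is advanced halfway through the ordered container and its key becomes the split key. The same decision over std::map, as a standalone sketch with an illustrative threshold parameter:

#include <cstddef>
#include <iterator>
#include <map>
#include <optional>

std::optional<int> ShouldSplitSketch(const std::map<int, int> &vertices, std::size_t max_vertices) {
  if (vertices.size() <= max_vertices) return std::nullopt;
  auto mid = vertices.begin();
  std::advance(mid, static_cast<std::ptrdiff_t>(vertices.size() / 2));  // median key of the sorted container
  return mid->first;  // everything at or past this key is handed to the new shard
}

int main() {
  std::map<int, int> vertices{{1, 0}, {2, 0}, {3, 0}, {4, 0}, {5, 0}};
  return ShouldSplitSketch(vertices, 3).value();  // 3: the median key becomes the split key
}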
SplitData Shard::PerformSplit(const PrimaryKey &split_key, const uint64_t shard_version) {
shard_version_ = shard_version;
const auto old_max_key = max_primary_key_;
max_primary_key_ = split_key;
const auto *schema = GetSchema(primary_label_);
MG_ASSERT(schema, "Shard must know about schema of primary label!");
Splitter shard_splitter(primary_label_, vertices_, edges_, start_logical_id_to_transaction_, indices_, config_,
schema->second, name_id_mapper_);
return shard_splitter.SplitShard(split_key, old_max_key, shard_version);
}
bool Shard::IsVertexBelongToShard(const VertexId &vertex_id) const {
return vertex_id.primary_label == primary_label_ && vertex_id.primary_key >= min_primary_key_ &&
(!max_primary_key_.has_value() || vertex_id.primary_key < *max_primary_key_);


@ -1,4 +1,4 @@
// Copyright 2022 Memgraph Ltd.
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -14,6 +14,7 @@
#include <cstdint>
#include <filesystem>
#include <map>
#include <memory>
#include <numeric>
#include <optional>
#include <shared_mutex>
@ -37,6 +38,7 @@
#include "storage/v3/result.hpp"
#include "storage/v3/schema_validator.hpp"
#include "storage/v3/schemas.hpp"
#include "storage/v3/splitter.hpp"
#include "storage/v3/transaction.hpp"
#include "storage/v3/vertex.hpp"
#include "storage/v3/vertex_accessor.hpp"
@ -174,6 +176,11 @@ struct SchemasInfo {
Schemas::SchemasList schemas;
};
struct SplitInfo {
PrimaryKey split_point;
uint64_t shard_version;
};
/// Structure used to return information about the storage.
struct StorageInfo {
uint64_t vertex_count;
@ -186,9 +193,19 @@ class Shard final {
public:
/// @throw std::system_error
/// @throw std::bad_alloc
explicit Shard(LabelId primary_label, PrimaryKey min_primary_key, std::optional<PrimaryKey> max_primary_key,
std::vector<SchemaProperty> schema, Config config = Config(),
std::unordered_map<uint64_t, std::string> id_to_name = {});
Shard(LabelId primary_label, PrimaryKey min_primary_key, std::optional<PrimaryKey> max_primary_key,
std::vector<SchemaProperty> schema, Config config = Config(),
std::unordered_map<uint64_t, std::string> id_to_name = {});
Shard(LabelId primary_label, PrimaryKey min_primary_key, std::optional<PrimaryKey> max_primary_key,
std::vector<SchemaProperty> schema, VertexContainer &&vertices, EdgeContainer &&edges,
std::map<uint64_t, std::unique_ptr<Transaction>> &&start_logical_id_to_transaction, const Config &config,
const std::unordered_map<uint64_t, std::string> &id_to_name, uint64_t shard_version);
Shard(LabelId primary_label, PrimaryKey min_primary_key, std::optional<PrimaryKey> max_primary_key,
std::vector<SchemaProperty> schema, VertexContainer &&vertices,
std::map<uint64_t, std::unique_ptr<Transaction>> &&start_logical_id_to_transaction, const Config &config,
const std::unordered_map<uint64_t, std::string> &id_to_name, uint64_t shard_version);
Shard(const Shard &) = delete;
Shard(Shard &&) noexcept = delete;
@ -196,6 +213,8 @@ class Shard final {
Shard operator=(Shard &&) noexcept = delete;
~Shard();
static std::unique_ptr<Shard> FromSplitData(SplitData &&split_data);
class Accessor final {
private:
friend class Shard;
@ -360,6 +379,10 @@ class Shard final {
void StoreMapping(std::unordered_map<uint64_t, std::string> id_to_name);
std::optional<SplitInfo> ShouldSplit() const noexcept;
SplitData PerformSplit(const PrimaryKey &split_key, uint64_t shard_version);
private:
Transaction &GetTransaction(coordinator::Hlc start_timestamp, IsolationLevel isolation_level);
@ -377,6 +400,7 @@ class Shard final {
// list is used only when properties are enabled for edges. Because of that we
// keep a separate count of edges that is always updated.
uint64_t edge_count_{0};
uint64_t shard_version_{0};
SchemaValidator schema_validator_;
VertexValidator vertex_validator_;
@ -396,38 +420,6 @@ class Shard final {
// storage.
std::list<Gid> deleted_edges_;
// UUID used to distinguish snapshots and to link snapshots to WALs
std::string uuid_;
// Sequence number used to keep track of the chain of WALs.
uint64_t wal_seq_num_{0};
// UUID to distinguish different main instance runs for replication process
// on SAME storage.
// Multiple instances can have same storage UUID and be MAIN at the same time.
// We cannot compare commit timestamps of those instances if one of them
// becomes the replica of the other so we use epoch_id_ as additional
// discriminating property.
// Example of this:
// We have 2 instances of the same storage, S1 and S2.
// S1 and S2 are MAIN and accept their own commits and write them to the WAL.
// At the moment when S1 commited a transaction with timestamp 20, and S2
// a different transaction with timestamp 15, we change S2's role to REPLICA
// and register it on S1.
// Without using the epoch_id, we don't know that S1 and S2 have completely
// different transactions, we think that the S2 is behind only by 5 commits.
std::string epoch_id_;
// History of the previous epoch ids.
// Each value consists of the epoch id along the last commit belonging to that
// epoch.
std::deque<std::pair<std::string, uint64_t>> epoch_history_;
uint64_t wal_unsynced_transactions_{0};
utils::FileRetainer file_retainer_;
// Global locker that is used for clients file locking
utils::FileRetainer::FileLocker global_locker_;
// Holds all of the (in progress, committed and aborted) transactions that are read or write to this shard, but
// haven't been cleaned up yet
std::map<uint64_t, std::unique_ptr<Transaction>> start_logical_id_to_transaction_{};


@ -12,11 +12,13 @@
#pragma once
#include <memory>
#include <optional>
#include <variant>
#include <openssl/ec.h>
#include "query/v2/requests.hpp"
#include "storage/v3/shard.hpp"
#include "storage/v3/value_conversions.hpp"
#include "storage/v3/vertex_accessor.hpp"
namespace memgraph::storage::v3 {
@ -41,6 +43,19 @@ class ShardRsm {
public:
explicit ShardRsm(std::unique_ptr<Shard> &&shard) : shard_(std::move(shard)){};
std::optional<msgs::SplitInfo> ShouldSplit() const noexcept {
auto split_info = shard_->ShouldSplit();
if (split_info) {
return msgs::SplitInfo{conversions::ConvertValueVector(split_info->split_point), split_info->shard_version};
}
return std::nullopt;
}
std::unique_ptr<Shard> PerformSplit(msgs::PerformSplitDataInfo perform_split) const noexcept {
return Shard::FromSplitData(
shard_->PerformSplit(conversions::ConvertPropertyVector(perform_split.split_key), perform_split.shard_version));
}
// NOLINTNEXTLINE(readability-convert-member-functions-to-static)
msgs::ReadResponses Read(msgs::ReadRequests &&requests) {
return std::visit([&](auto &&request) mutable { return HandleRead(std::forward<decltype(request)>(request)); },

src/storage/v3/splitter.cpp (new file, 411 lines)

@ -0,0 +1,411 @@
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#include "storage/v3/splitter.hpp"
#include <algorithm>
#include <cstdint>
#include <map>
#include <memory>
#include <optional>
#include <set>
#include "storage/v3/config.hpp"
#include "storage/v3/delta.hpp"
#include "storage/v3/id_types.hpp"
#include "storage/v3/indices.hpp"
#include "storage/v3/key_store.hpp"
#include "storage/v3/name_id_mapper.hpp"
#include "storage/v3/schemas.hpp"
#include "storage/v3/shard.hpp"
#include "storage/v3/transaction.hpp"
#include "storage/v3/vertex.hpp"
#include "utils/logging.hpp"
namespace memgraph::storage::v3 {
Splitter::Splitter(const LabelId primary_label, VertexContainer &vertices, EdgeContainer &edges,
std::map<uint64_t, std::unique_ptr<Transaction>> &start_logical_id_to_transaction, Indices &indices,
const Config &config, const std::vector<SchemaProperty> &schema, const NameIdMapper &name_id_mapper)
: primary_label_(primary_label),
vertices_(vertices),
edges_(edges),
start_logical_id_to_transaction_(start_logical_id_to_transaction),
indices_(indices),
config_(config),
schema_(schema),
name_id_mapper_(name_id_mapper) {}
SplitData Splitter::SplitShard(const PrimaryKey &split_key, const std::optional<PrimaryKey> &max_primary_key,
const uint64_t shard_version) {
SplitData data{.primary_label = primary_label_,
.min_primary_key = split_key,
.max_primary_key = max_primary_key,
.schema = schema_,
.config = config_,
.id_to_name = name_id_mapper_.GetIdToNameMap(),
.shard_version = shard_version};
std::set<uint64_t> collected_transactions_;
data.vertices = CollectVertices(data, collected_transactions_, split_key);
data.edges = CollectEdges(collected_transactions_, data.vertices, split_key);
data.transactions = CollectTransactions(collected_transactions_, *data.edges, split_key);
return data;
}
void Splitter::ScanDeltas(std::set<uint64_t> &collected_transactions_, const Delta *delta) {
while (delta != nullptr) {
collected_transactions_.insert(delta->commit_info->start_or_commit_timestamp.logical_id);
delta = delta->next;
}
}
VertexContainer Splitter::CollectVertices(SplitData &data, std::set<uint64_t> &collected_transactions_,
const PrimaryKey &split_key) {
data.label_indices = indices_.label_index.SplitIndexEntries(split_key);
data.label_property_indices = indices_.label_property_index.SplitIndexEntries(split_key);
VertexContainer splitted_data;
auto split_key_it = vertices_.find(split_key);
while (split_key_it != vertices_.end()) {
// Go through deltas and pick up transactions start_id/commit_id
ScanDeltas(collected_transactions_, split_key_it->second.delta);
auto next_it = std::next(split_key_it);
const auto new_it = splitted_data.insert(splitted_data.end(), vertices_.extract(split_key_it));
MG_ASSERT(new_it != splitted_data.end(), "Failed to extract vertex!");
split_key_it = next_it;
}
return splitted_data;
}
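CollectVertices walks from the split key to the end of the vertex container and moves every node out with extract(), so the original shard keeps [min, split_key) and the new shard receives [split_key, max) without copying vertex data. A self-contained sketch of that tail move over std::map; lower_bound stands in for the find() call, which in the real code assumes the split key exists:

#include <cassert>
#include <iterator>
#include <map>

std::map<int, int> CollectTailSketch(std::map<int, int> &vertices, int split_key) {
  std::map<int, int> split_vertices;
  auto it = vertices.lower_bound(split_key);
  while (it != vertices.end()) {
    auto next = std::next(it);                                          // extract() invalidates `it`
    split_vertices.insert(split_vertices.end(), vertices.extract(it));  // move the node, no copy
    it = next;
  }
  return split_vertices;
}

int main() {
  std::map<int, int> vertices{{1, 0}, {2, 0}, {3, 0}, {4, 0}};
  auto split_vertices = CollectTailSketch(vertices, 3);
  assert(vertices.size() == 2 && split_vertices.size() == 2);  // {1,2} stay, {3,4} move
}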
std::optional<EdgeContainer> Splitter::CollectEdges(std::set<uint64_t> &collected_transactions_,
const VertexContainer &split_vertices,
const PrimaryKey &split_key) {
if (!config_.items.properties_on_edges) {
return std::nullopt;
}
EdgeContainer splitted_edges;
const auto split_vertex_edges = [&](const auto &edges_ref) {
// This is safe since if properties_on_edges is true, this must be a ptr
for (const auto &edge_ref : edges_ref) {
auto *edge = std::get<2>(edge_ref).ptr;
const auto &other_vtx = std::get<1>(edge_ref);
ScanDeltas(collected_transactions_, edge->delta);
// Check if the src and dest vertices are both on the split shard so we know if we
// should remove the orphan edge, or make a clone
if (other_vtx.primary_key >= split_key) {
// Remove edge from shard
splitted_edges.insert(edges_.extract(edge->gid));
} else {
splitted_edges.insert({edge->gid, Edge{edge->gid, edge->delta}});
}
}
};
for (const auto &vertex : split_vertices) {
split_vertex_edges(vertex.second.in_edges);
split_vertex_edges(vertex.second.out_edges);
}
return splitted_edges;
}
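The branch inside the lambda above is the heart of edge handling: when the other endpoint also lands on the new shard, the edge node is extracted (leaving it behind would orphan it on the old shard), while an edge that straddles the split boundary is copied so both shards can still resolve it. Reduced to a standalone sketch with a toy edge map, where a boolean argument replaces the primary-key comparison:

#include <cassert>
#include <cstdint>
#include <map>

struct EdgeSketch {
  uint64_t gid;
  int payload;
};

void SplitOneEdge(std::map<uint64_t, EdgeSketch> &edges, std::map<uint64_t, EdgeSketch> &split_edges,
                  uint64_t gid, bool other_endpoint_moves_too) {
  if (other_endpoint_moves_too) {
    split_edges.insert(edges.extract(gid));    // move: nothing on the old shard references it any more
  } else {
    split_edges.insert({gid, edges.at(gid)});  // copy: the edge spans both shards
  }
}

int main() {
  std::map<uint64_t, EdgeSketch> edges{{1, {1, 10}}, {2, {2, 20}}};
  std::map<uint64_t, EdgeSketch> split_edges;
  SplitOneEdge(edges, split_edges, 1, true);   // gid 1 moves
  SplitOneEdge(edges, split_edges, 2, false);  // gid 2 is copied
  assert(edges.size() == 1 && split_edges.size() == 2);
}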
std::map<uint64_t, std::unique_ptr<Transaction>> Splitter::CollectTransactions(
const std::set<uint64_t> &collected_transactions_, EdgeContainer &cloned_edges, const PrimaryKey &split_key) {
std::map<uint64_t, std::unique_ptr<Transaction>> transactions;
for (const auto &[commit_start, transaction] : start_logical_id_to_transaction_) {
// We need all transactions whose deltas need to be resolved for any of the
// entities
if (collected_transactions_.contains(transaction->commit_info->start_or_commit_timestamp.logical_id)) {
transactions.insert({commit_start, start_logical_id_to_transaction_[commit_start]->Clone()});
}
}
// It is necessary to clone all the transactions first so we have new addresses
// for deltas, before doing alignment of deltas and prev_ptr
AdjustClonedTransactions(transactions, cloned_edges, split_key);
return transactions;
}
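The closing comment captures the two-phase approach: first clone every relevant transaction so each delta gets a stable new address, and only afterwards rewrite the next/prev pointers to aim at the clones (AdjustClonedTransactions below resolves them by delta id). A minimal standalone sketch of that clone-first, fix-up-later pattern on a toy linked record type:

#include <cassert>
#include <cstdint>
#include <list>
#include <unordered_map>

struct NodeSketch {
  uint64_t id;
  NodeSketch *next{nullptr};
};

// Phase 1: copy every record (next still points at the originals) and remember each clone's address by id.
// Phase 2: rewrite next so the cloned chain is self-contained, mirroring AdjustDeltaNextAndPrev below.
std::list<NodeSketch> CloneChainSketch(const std::list<NodeSketch> &originals) {
  std::list<NodeSketch> clones;
  std::unordered_map<uint64_t, NodeSketch *> clone_by_id;
  for (const auto &node : originals) {
    clones.push_back(node);  // std::list keeps element addresses stable
    clone_by_id.emplace(node.id, &clones.back());
  }
  for (auto &clone : clones) {
    if (clone.next != nullptr) clone.next = clone_by_id.at(clone.next->id);
  }
  return clones;
}

int main() {
  std::list<NodeSketch> originals{{1}, {2}};
  originals.front().next = &originals.back();
  auto clones = CloneChainSketch(originals);
  assert(clones.front().next == &clones.back());  // the clone points at the clone, not the original
}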
void EraseDeltaChain(auto &transaction, auto &transactions, auto &delta_head_it) {
auto *current_next_delta = delta_head_it->next;
// We need to keep track of delta_head_it in the delta list of current transaction
delta_head_it = transaction.deltas.erase(delta_head_it);
while (current_next_delta != nullptr) {
auto *next_delta = current_next_delta->next;
// Find next delta transaction delta list
auto current_transaction_it = std::ranges::find_if(
transactions, [&start_or_commit_timestamp =
current_next_delta->commit_info->start_or_commit_timestamp](const auto &transaction) {
return transaction.second->start_timestamp == start_or_commit_timestamp ||
transaction.second->commit_info->start_or_commit_timestamp == start_or_commit_timestamp;
});
MG_ASSERT(current_transaction_it != transactions.end(), "Error when pruning deltas!");
// Remove the delta
const auto delta_it =
std::ranges::find_if(current_transaction_it->second->deltas,
[current_next_delta](const auto &elem) { return elem.id == current_next_delta->id; });
if (delta_it != current_transaction_it->second->deltas.end()) {
// If the next delta is next in transaction list replace current_transaction_it
// with the next one
if (current_transaction_it->second->start_timestamp == transaction.start_timestamp &&
current_transaction_it == std::next(current_transaction_it)) {
delta_head_it = current_transaction_it->second->deltas.erase(delta_it);
} else {
current_transaction_it->second->deltas.erase(delta_it);
}
}
current_next_delta = next_delta;
}
}
void PruneDeltas(Transaction &cloned_transaction, std::map<uint64_t, std::unique_ptr<Transaction>> &cloned_transactions,
const PrimaryKey &split_key, EdgeContainer &cloned_edges) {
// Remove delta chains that don't point to objects on the split shard
auto cloned_delta_it = cloned_transaction.deltas.begin();
while (cloned_delta_it != cloned_transaction.deltas.end()) {
const auto prev = cloned_delta_it->prev.Get();
switch (prev.type) {
case PreviousPtr::Type::DELTA:
case PreviousPtr::Type::NULLPTR:
++cloned_delta_it;
break;
case PreviousPtr::Type::VERTEX: {
if (prev.vertex->first < split_key) {
// We can remove this delta chain
EraseDeltaChain(cloned_transaction, cloned_transactions, cloned_delta_it);
} else {
++cloned_delta_it;
}
break;
}
case PreviousPtr::Type::EDGE: {
if (const auto edge_gid = prev.edge->gid; !cloned_edges.contains(edge_gid)) {
// We can remove this delta chain
EraseDeltaChain(cloned_transaction, cloned_transactions, cloned_delta_it);
} else {
++cloned_delta_it;
}
break;
}
}
}
}
void Splitter::PruneOriginalDeltas(Transaction &transaction,
std::map<uint64_t, std::unique_ptr<Transaction>> &transactions,
const PrimaryKey &split_key) {
// Remove delta chains that don't point to objects on the split shard
auto delta_it = transaction.deltas.begin();
while (delta_it != transaction.deltas.end()) {
const auto prev = delta_it->prev.Get();
switch (prev.type) {
case PreviousPtr::Type::DELTA:
case PreviousPtr::Type::NULLPTR:
++delta_it;
break;
case PreviousPtr::Type::VERTEX: {
if (prev.vertex->first >= split_key) {
// We can remove this delta chain
EraseDeltaChain(transaction, transactions, delta_it);
} else {
++delta_it;
}
break;
}
case PreviousPtr::Type::EDGE: {
if (const auto edge_gid = prev.edge->gid; !edges_.contains(edge_gid)) {
// We can remove this delta chain
EraseDeltaChain(transaction, transactions, delta_it);
} else {
++delta_it;
}
break;
}
}
}
}
void Splitter::AdjustClonedTransactions(std::map<uint64_t, std::unique_ptr<Transaction>> &cloned_transactions,
EdgeContainer &cloned_edges, const PrimaryKey &split_key) {
for (auto &[start_id, cloned_transaction] : cloned_transactions) {
AdjustClonedTransaction(*cloned_transaction, *start_logical_id_to_transaction_[start_id], cloned_transactions,
cloned_edges);
}
// Prune deltas whose delta chain points to a vertex/edge that should not belong on this shard
// Pruning must happen after adjusting, since next and prev are not yet set and we cannot follow the chain
for (auto &[start_id, cloned_transaction] : cloned_transactions) {
PruneDeltas(*cloned_transaction, cloned_transactions, split_key, cloned_edges);
}
// Also we need to remove deltas from original transactions
for (auto &[start_id, original_transaction] : start_logical_id_to_transaction_) {
PruneOriginalDeltas(*original_transaction, start_logical_id_to_transaction_, split_key);
}
}
inline bool IsDeltaHeadOfChain(const PreviousPtr::Type &delta_type) {
return delta_type == PreviousPtr::Type::VERTEX || delta_type == PreviousPtr::Type::EDGE;
}
bool DoesPrevPtrPointsToSplittedData(const PreviousPtr::Pointer &prev_ptr, const PrimaryKey &split_key) {
return prev_ptr.type == PreviousPtr::Type::VERTEX && prev_ptr.vertex->first < split_key;
}
void Splitter::AdjustClonedTransaction(Transaction &cloned_transaction, const Transaction &transaction,
std::map<uint64_t, std::unique_ptr<Transaction>> &cloned_transactions,
EdgeContainer &cloned_edges) {
auto delta_it = transaction.deltas.begin();
auto cloned_delta_it = cloned_transaction.deltas.begin();
while (delta_it != transaction.deltas.end()) {
// We can safely ignore deltas which are not the head of a delta chain
// Don't adjust delta chains that point to irrelevant data (vertices/edges)
if (const auto delta_prev = delta_it->prev.Get(); !IsDeltaHeadOfChain(delta_prev.type)) {
++delta_it;
++cloned_delta_it;
continue;
}
const auto *delta = &*delta_it;
auto *cloned_delta = &*cloned_delta_it;
Delta *cloned_delta_prev_ptr = cloned_delta;
// The head of a delta chain contains either a vertex or an edge as its prev ptr, so we adjust
// it only at the beginning of the delta chain
AdjustDeltaPrevPtr(*delta, *cloned_delta_prev_ptr, cloned_transactions, cloned_edges);
while (delta->next != nullptr) {
AdjustEdgeRef(*cloned_delta, cloned_edges);
// Align next ptr and prev ptr
AdjustDeltaNextAndPrev(*delta, *cloned_delta, cloned_transactions);
// The next delta might not belong to the cloned transaction, and that's
// why we skip this delta of the delta chain
if (cloned_delta->next != nullptr) {
cloned_delta = cloned_delta->next;
cloned_delta_prev_ptr = cloned_delta;
} else {
cloned_delta_prev_ptr = nullptr;
}
delta = delta->next;
}
// Align prev ptr
if (cloned_delta_prev_ptr != nullptr) {
AdjustDeltaPrevPtr(*delta, *cloned_delta_prev_ptr, cloned_transactions, cloned_edges);
}
++delta_it;
++cloned_delta_it;
}
MG_ASSERT(delta_it == transaction.deltas.end() && cloned_delta_it == cloned_transaction.deltas.end(),
"Both iterators must be exhausted!");
}
void Splitter::AdjustEdgeRef(Delta &cloned_delta, EdgeContainer &cloned_edges) const {
switch (cloned_delta.action) {
case Delta::Action::ADD_IN_EDGE:
case Delta::Action::ADD_OUT_EDGE:
case Delta::Action::REMOVE_IN_EDGE:
case Delta::Action::REMOVE_OUT_EDGE: {
// Find edge
if (config_.items.properties_on_edges) {
if (const auto cloned_edge_it = cloned_edges.find(cloned_delta.vertex_edge.edge.ptr->gid);
cloned_edge_it != cloned_edges.end()) {
cloned_delta.vertex_edge.edge = EdgeRef{&cloned_edge_it->second};
}
}
break;
}
case Delta::Action::DELETE_OBJECT:
case Delta::Action::RECREATE_OBJECT:
case Delta::Action::SET_PROPERTY:
case Delta::Action::ADD_LABEL:
case Delta::Action::REMOVE_LABEL: {
// noop
break;
}
}
}
void Splitter::AdjustDeltaNextAndPrev(const Delta &original, Delta &cloned,
std::map<uint64_t, std::unique_ptr<Transaction>> &cloned_transactions) {
// Get cloned_delta->next transaction, using delta->next original transaction
// cloned_transactions key is start_timestamp
auto cloned_transaction_it =
cloned_transactions.find(original.next->commit_info->start_or_commit_timestamp.logical_id);
if (cloned_transaction_it == cloned_transactions.end()) {
cloned_transaction_it = std::ranges::find_if(cloned_transactions, [&original](const auto &elem) {
return elem.second->commit_info->start_or_commit_timestamp ==
original.next->commit_info->start_or_commit_timestamp;
});
}
// TODO(jbajic) What if next in delta chain does not belong to cloned transaction?
// MG_ASSERT(cloned_transaction_it != cloned_transactions.end(), "Cloned transaction not found");
if (cloned_transaction_it == cloned_transactions.end()) return;
// Find cloned delta in delta list of cloned transaction
auto found_cloned_delta_it = std::ranges::find_if(
cloned_transaction_it->second->deltas, [&original](const auto &elem) { return elem.id == original.next->id; });
MG_ASSERT(found_cloned_delta_it != cloned_transaction_it->second->deltas.end(), "Delta with given uuid must exist!");
cloned.next = &*found_cloned_delta_it;
found_cloned_delta_it->prev.Set(&cloned);
}
void Splitter::AdjustDeltaPrevPtr(const Delta &original, Delta &cloned,
std::map<uint64_t, std::unique_ptr<Transaction>> &cloned_transactions,
EdgeContainer &cloned_edges) {
auto ptr = original.prev.Get();
switch (ptr.type) {
case PreviousPtr::Type::NULLPTR: {
MG_ASSERT(false, "PreviousPtr cannot be a nullptr!");
break;
}
case PreviousPtr::Type::DELTA: {
// Same as for deltas except don't align next but prev
auto cloned_transaction_it = std::ranges::find_if(cloned_transactions, [&ptr](const auto &elem) {
return elem.second->start_timestamp == ptr.delta->commit_info->start_or_commit_timestamp ||
elem.second->commit_info->start_or_commit_timestamp == ptr.delta->commit_info->start_or_commit_timestamp;
});
MG_ASSERT(cloned_transaction_it != cloned_transactions.end(), "Cloned transaction not found");
// Find cloned delta in delta list of cloned transaction
auto found_cloned_delta_it =
std::ranges::find_if(cloned_transaction_it->second->deltas,
[delta = ptr.delta](const auto &elem) { return elem.id == delta->id; });
MG_ASSERT(found_cloned_delta_it != cloned_transaction_it->second->deltas.end(),
"Delta with given id must exist!");
cloned.prev.Set(&*found_cloned_delta_it);
break;
}
case PreviousPtr::Type::VERTEX: {
// The vertex was extracted and it is safe to reuse address
cloned.prev.Set(ptr.vertex);
ptr.vertex->second.delta = &cloned;
break;
}
case PreviousPtr::Type::EDGE: {
// We can never be here if we have properties on edge disabled
auto *cloned_edge = &*cloned_edges.find(ptr.edge->gid);
ptr.edge->delta = &cloned;
cloned.prev.Set(&cloned_edge->second);
break;
}
};
}
} // namespace memgraph::storage::v3

src/storage/v3/splitter.hpp (new file, 109 lines)

@ -0,0 +1,109 @@
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#pragma once
#include <cstdint>
#include <map>
#include <memory>
#include <optional>
#include <set>
#include "storage/v3/config.hpp"
#include "storage/v3/delta.hpp"
#include "storage/v3/edge.hpp"
#include "storage/v3/id_types.hpp"
#include "storage/v3/indices.hpp"
#include "storage/v3/key_store.hpp"
#include "storage/v3/name_id_mapper.hpp"
#include "storage/v3/schemas.hpp"
#include "storage/v3/transaction.hpp"
#include "storage/v3/vertex.hpp"
#include "utils/concepts.hpp"
namespace memgraph::storage::v3 {
// If properties-on-edges is false then we don't need to send edges, but
// only vertices, since the vertices will contain those edges
struct SplitData {
LabelId primary_label;
PrimaryKey min_primary_key;
std::optional<PrimaryKey> max_primary_key;
std::vector<SchemaProperty> schema;
Config config;
std::unordered_map<uint64_t, std::string> id_to_name;
uint64_t shard_version;
VertexContainer vertices;
std::optional<EdgeContainer> edges;
std::map<uint64_t, std::unique_ptr<Transaction>> transactions;
std::map<LabelId, LabelIndex::IndexContainer> label_indices;
std::map<std::pair<LabelId, PropertyId>, LabelPropertyIndex::IndexContainer> label_property_indices;
};
// TODO(jbajic) Handle deleted_vertices_ and deleted_edges_ after the finishing GC
class Splitter final {
public:
Splitter(LabelId primary_label, VertexContainer &vertices, EdgeContainer &edges,
std::map<uint64_t, std::unique_ptr<Transaction>> &start_logical_id_to_transaction, Indices &indices,
const Config &config, const std::vector<SchemaProperty> &schema, const NameIdMapper &name_id_mapper_);
Splitter(const Splitter &) = delete;
Splitter(Splitter &&) noexcept = delete;
Splitter &operator=(const Splitter &) = delete;
Splitter operator=(Splitter &&) noexcept = delete;
~Splitter() = default;
SplitData SplitShard(const PrimaryKey &split_key, const std::optional<PrimaryKey> &max_primary_key,
uint64_t shard_version);
private:
VertexContainer CollectVertices(SplitData &data, std::set<uint64_t> &collected_transactions_start_id,
const PrimaryKey &split_key);
std::optional<EdgeContainer> CollectEdges(std::set<uint64_t> &collected_transactions_start_id,
const VertexContainer &split_vertices, const PrimaryKey &split_key);
std::map<uint64_t, std::unique_ptr<Transaction>> CollectTransactions(
const std::set<uint64_t> &collected_transactions_start_id, EdgeContainer &cloned_edges,
const PrimaryKey &split_key);
static void ScanDeltas(std::set<uint64_t> &collected_transactions_start_id, const Delta *delta);
void PruneOriginalDeltas(Transaction &transaction, std::map<uint64_t, std::unique_ptr<Transaction>> &transactions,
const PrimaryKey &split_key);
void AdjustClonedTransaction(Transaction &cloned_transaction, const Transaction &transaction,
std::map<uint64_t, std::unique_ptr<Transaction>> &cloned_transactions,
EdgeContainer &cloned_edges);
void AdjustClonedTransactions(std::map<uint64_t, std::unique_ptr<Transaction>> &cloned_transactions,
EdgeContainer &cloned_edges, const PrimaryKey &split_key);
void AdjustEdgeRef(Delta &cloned_delta, EdgeContainer &cloned_edges) const;
static void AdjustDeltaNextAndPrev(const Delta &original, Delta &cloned,
std::map<uint64_t, std::unique_ptr<Transaction>> &cloned_transactions);
static void AdjustDeltaPrevPtr(const Delta &original, Delta &cloned,
std::map<uint64_t, std::unique_ptr<Transaction>> &cloned_transactions,
EdgeContainer &cloned_edges);
const LabelId primary_label_;
VertexContainer &vertices_;
EdgeContainer &edges_;
std::map<uint64_t, std::unique_ptr<Transaction>> &start_logical_id_to_transaction_;
Indices &indices_;
const Config &config_;
const std::vector<SchemaProperty> schema_;
const NameIdMapper &name_id_mapper_;
};
} // namespace memgraph::storage::v3


@ -1,4 +1,4 @@
// Copyright 2022 Memgraph Ltd.
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -31,6 +31,15 @@ struct CommitInfo {
};
struct Transaction {
Transaction(coordinator::Hlc start_timestamp, CommitInfo new_commit_info, uint64_t command_id, bool must_abort,
bool is_aborted, IsolationLevel isolation_level)
: start_timestamp{start_timestamp},
commit_info{std::make_unique<CommitInfo>(new_commit_info)},
command_id(command_id),
must_abort(must_abort),
is_aborted(is_aborted),
isolation_level(isolation_level){};
Transaction(coordinator::Hlc start_timestamp, IsolationLevel isolation_level)
: start_timestamp(start_timestamp),
commit_info(std::make_unique<CommitInfo>(CommitInfo{false, {start_timestamp}})),
@ -54,6 +63,56 @@ struct Transaction {
~Transaction() {}
std::list<Delta> CopyDeltas(CommitInfo *commit_info) const {
std::list<Delta> copied_deltas;
for (const auto &delta : deltas) {
switch (delta.action) {
case Delta::Action::DELETE_OBJECT:
copied_deltas.emplace_back(Delta::DeleteObjectTag{}, commit_info, delta.id, command_id);
break;
case Delta::Action::RECREATE_OBJECT:
copied_deltas.emplace_back(Delta::RecreateObjectTag{}, commit_info, delta.id, command_id);
break;
case Delta::Action::ADD_LABEL:
copied_deltas.emplace_back(Delta::AddLabelTag{}, delta.label, commit_info, delta.id, command_id);
break;
case Delta::Action::REMOVE_LABEL:
copied_deltas.emplace_back(Delta::RemoveLabelTag{}, delta.label, commit_info, delta.id, command_id);
break;
case Delta::Action::ADD_IN_EDGE:
copied_deltas.emplace_back(Delta::AddInEdgeTag{}, delta.vertex_edge.edge_type, delta.vertex_edge.vertex_id,
delta.vertex_edge.edge, commit_info, delta.id, command_id);
break;
case Delta::Action::ADD_OUT_EDGE:
copied_deltas.emplace_back(Delta::AddOutEdgeTag{}, delta.vertex_edge.edge_type, delta.vertex_edge.vertex_id,
delta.vertex_edge.edge, commit_info, delta.id, command_id);
break;
case Delta::Action::REMOVE_IN_EDGE:
copied_deltas.emplace_back(Delta::RemoveInEdgeTag{}, delta.vertex_edge.edge_type, delta.vertex_edge.vertex_id,
delta.vertex_edge.edge, commit_info, delta.id, command_id);
break;
case Delta::Action::REMOVE_OUT_EDGE:
copied_deltas.emplace_back(Delta::RemoveOutEdgeTag{}, delta.vertex_edge.edge_type,
delta.vertex_edge.vertex_id, delta.vertex_edge.edge, commit_info, delta.id,
command_id);
break;
case Delta::Action::SET_PROPERTY:
copied_deltas.emplace_back(Delta::SetPropertyTag{}, delta.property.key, delta.property.value, commit_info,
delta.id, command_id);
break;
}
}
return copied_deltas;
}
// This does not solve the whole problem of copying deltas
std::unique_ptr<Transaction> Clone() const {
auto transaction_ptr = std::make_unique<Transaction>(start_timestamp, *commit_info, command_id, must_abort,
is_aborted, isolation_level);
transaction_ptr->deltas = CopyDeltas(transaction_ptr->commit_info.get());
return transaction_ptr;
}
coordinator::Hlc start_timestamp;
std::unique_ptr<CommitInfo> commit_info;
uint64_t command_id;


@ -79,3 +79,12 @@ target_link_libraries(${test_prefix}data_structures_contains mg-utils mg-storage
add_benchmark(data_structures_remove.cpp)
target_link_libraries(${test_prefix}data_structures_remove mg-utils mg-storage-v3)
add_benchmark(storage_v3_split.cpp)
target_link_libraries(${test_prefix}storage_v3_split mg-storage-v3 mg-query-v2)
add_benchmark(storage_v3_split_1.cpp)
target_link_libraries(${test_prefix}storage_v3_split_1 mg-storage-v3 mg-query-v2)
add_benchmark(storage_v3_split_2.cpp)
target_link_libraries(${test_prefix}storage_v3_split_2 mg-storage-v3 mg-query-v2)


@ -0,0 +1,249 @@
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#include <cstdint>
#include <optional>
#include <vector>
#include <benchmark/benchmark.h>
#include <gflags/gflags.h>
#include "storage/v3/id_types.hpp"
#include "storage/v3/key_store.hpp"
#include "storage/v3/property_value.hpp"
#include "storage/v3/shard.hpp"
#include "storage/v3/vertex.hpp"
#include "storage/v3/vertex_id.hpp"
namespace memgraph::benchmark {
class ShardSplitBenchmark : public ::benchmark::Fixture {
protected:
using PrimaryKey = storage::v3::PrimaryKey;
using PropertyId = storage::v3::PropertyId;
using PropertyValue = storage::v3::PropertyValue;
using LabelId = storage::v3::LabelId;
using EdgeTypeId = storage::v3::EdgeTypeId;
using Shard = storage::v3::Shard;
using VertexId = storage::v3::VertexId;
using Gid = storage::v3::Gid;
void SetUp(const ::benchmark::State &state) override {
storage.emplace(primary_label, min_pk, std::nullopt, schema_property_vector);
storage->StoreMapping(
{{1, "label"}, {2, "property"}, {3, "edge_property"}, {4, "secondary_label"}, {5, "secondary_prop"}});
}
void TearDown(const ::benchmark::State &) override { storage = std::nullopt; }
const PropertyId primary_property{PropertyId::FromUint(2)};
const PropertyId secondary_property{PropertyId::FromUint(5)};
std::vector<storage::v3::SchemaProperty> schema_property_vector = {
storage::v3::SchemaProperty{primary_property, common::SchemaType::INT}};
const std::vector<PropertyValue> min_pk{PropertyValue{0}};
const LabelId primary_label{LabelId::FromUint(1)};
const LabelId secondary_label{LabelId::FromUint(4)};
const EdgeTypeId edge_type_id{EdgeTypeId::FromUint(3)};
std::optional<Shard> storage;
coordinator::Hlc last_hlc{0, io::Time{}};
coordinator::Hlc GetNextHlc() {
++last_hlc.logical_id;
last_hlc.coordinator_wall_clock += std::chrono::seconds(1);
return last_hlc;
}
};
BENCHMARK_DEFINE_F(ShardSplitBenchmark, BigDataSplit)(::benchmark::State &state) {
const auto number_of_vertices{state.range(0)};
std::random_device r;
std::default_random_engine e1(r());
std::uniform_int_distribution<int> uniform_dist(0, number_of_vertices);
for (int64_t i{0}; i < number_of_vertices; ++i) {
auto acc = storage->Access(GetNextHlc());
MG_ASSERT(acc.CreateVertexAndValidate({secondary_label}, PrimaryKey{PropertyValue(i)},
{{secondary_property, PropertyValue(i)}})
.HasValue(),
"Failed creating with pk {}", i);
if (i > 1) {
const auto vtx1 = uniform_dist(e1) % i;
const auto vtx2 = uniform_dist(e1) % i;
MG_ASSERT(acc.CreateEdge(VertexId{primary_label, {PropertyValue(vtx1)}},
VertexId{primary_label, {PropertyValue(vtx2)}}, edge_type_id, Gid::FromUint(i))
.HasValue(),
"Failed on {} and {}", vtx1, vtx2);
}
acc.Commit(GetNextHlc());
}
for (auto _ : state) {
auto data = storage->PerformSplit(PrimaryKey{PropertyValue{number_of_vertices / 2}}, 2);
}
}
BENCHMARK_DEFINE_F(ShardSplitBenchmark, BigDataSplitWithGc)(::benchmark::State &state) {
const auto number_of_vertices{state.range(0)};
std::random_device r;
std::default_random_engine e1(r());
std::uniform_int_distribution<int> uniform_dist(0, number_of_vertices);
for (int64_t i{0}; i < number_of_vertices; ++i) {
auto acc = storage->Access(GetNextHlc());
MG_ASSERT(acc.CreateVertexAndValidate({secondary_label}, PrimaryKey{PropertyValue(i)},
{{secondary_property, PropertyValue(i)}})
.HasValue(),
"Failed creating with pk {}", i);
if (i > 1) {
const auto vtx1 = uniform_dist(e1) % i;
const auto vtx2 = uniform_dist(e1) % i;
MG_ASSERT(acc.CreateEdge(VertexId{primary_label, {PropertyValue(vtx1)}},
VertexId{primary_label, {PropertyValue(vtx2)}}, edge_type_id, Gid::FromUint(i))
.HasValue(),
"Failed on {} and {}", vtx1, vtx2);
}
acc.Commit(GetNextHlc());
}
storage->CollectGarbage(GetNextHlc().coordinator_wall_clock);
for (auto _ : state) {
auto data = storage->PerformSplit(PrimaryKey{PropertyValue{number_of_vertices / 2}}, 2);
}
}
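// BigDataSplitWithFewTransactionsOnVertices: most vertices and all edges are
// created in one committed transaction and then garbage-collected; only the last
// `number_of_transactions` vertices keep their transactions alive at split time.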
BENCHMARK_DEFINE_F(ShardSplitBenchmark, BigDataSplitWithFewTransactionsOnVertices)(::benchmark::State &state) {
const auto number_of_vertices = state.range(0);
const auto number_of_edges = state.range(1);
const auto number_of_transactions = state.range(2);
std::random_device r;
std::default_random_engine e1(r());
std::uniform_int_distribution<int> uniform_dist(0, number_of_vertices - number_of_transactions - 1);
// Create Vertices
int64_t vertex_count{0};
{
auto acc = storage->Access(GetNextHlc());
for (; vertex_count < number_of_vertices - number_of_transactions; ++vertex_count) {
MG_ASSERT(acc.CreateVertexAndValidate({secondary_label}, PrimaryKey{PropertyValue(vertex_count)},
{{secondary_property, PropertyValue(vertex_count)}})
.HasValue(),
"Failed creating with pk {}", vertex_count);
}
// Create Edges
for (int64_t i{0}; i < number_of_edges; ++i) {
const auto vtx1 = uniform_dist(e1);
const auto vtx2 = uniform_dist(e1);
MG_ASSERT(acc.CreateEdge(VertexId{primary_label, {PropertyValue(vtx1)}},
VertexId{primary_label, {PropertyValue(vtx2)}}, edge_type_id, Gid::FromUint(i))
.HasValue(),
"Failed on {} and {}", vtx1, vtx2);
}
acc.Commit(GetNextHlc());
}
// Clean up transactional data
storage->CollectGarbage(GetNextHlc().coordinator_wall_clock);
// Create the rest of the vertices, leaving their transactions uncollected
for (; vertex_count < number_of_vertices; ++vertex_count) {
auto acc = storage->Access(GetNextHlc());
MG_ASSERT(acc.CreateVertexAndValidate({secondary_label}, PrimaryKey{PropertyValue(vertex_count)},
{{secondary_property, PropertyValue(vertex_count)}})
.HasValue(),
"Failed creating with pk {}", vertex_count);
acc.Commit(GetNextHlc());
}
for (auto _ : state) {
// Don't create a new shard here, since shard deallocation can itself take a significant amount of time
auto data = storage->PerformSplit(PrimaryKey{PropertyValue{number_of_vertices / 2}}, 2);
}
}
// Range:
// Number of vertices
// This run is pessimistic; the number of vertices equals the number of transactions
// BENCHMARK_REGISTER_F(ShardSplitBenchmark, BigDataSplit)
// ->RangeMultiplier(10)
// ->Range(100'000, 100'000)
// ->Unit(::benchmark::kMillisecond);
// Range:
// Number of vertices
// This run is optimistic; garbage collection has already removed all transaction data
// BENCHMARK_REGISTER_F(ShardSplitBenchmark, BigDataSplitWithGc)
// ->RangeMultiplier(10)
// ->Range(100'000, 1'000'000)
// ->Unit(::benchmark::kMillisecond);
// Args:
// Number of vertices
// Number of edges
// Number of transactions
BENCHMARK_REGISTER_F(ShardSplitBenchmark, BigDataSplitWithFewTransactionsOnVertices)
->Args({100'000, 100'000, 100})
->Args({200'000, 100'000, 100})
->Args({300'000, 100'000, 100})
->Args({400'000, 100'000, 100})
->Args({500'000, 100'000, 100})
->Args({600'000, 100'000, 100})
->Args({700'000, 100'000, 100})
->Args({800'000, 100'000, 100})
->Args({900'000, 100'000, 100})
->Args({1'000'000, 100'000, 100})
->Args({2'000'000, 100'000, 100})
->Args({3'000'000, 100'000, 100})
->Args({4'000'000, 100'000, 100})
->Args({5'000'000, 100'000, 100})
->Args({6'000'000, 100'000, 100})
->Args({7'000'000, 100'000, 100})
->Args({8'000'000, 100'000, 100})
->Args({9'000'000, 100'000, 100})
->Args({10'000'000, 100'000, 100})
->Unit(::benchmark::kMillisecond)
->Name("IncreaseVertices");
BENCHMARK_REGISTER_F(ShardSplitBenchmark, BigDataSplitWithFewTransactionsOnVertices)
->Args({100'000, 100'000, 100})
->Args({100'000, 200'000, 100})
->Args({100'000, 300'000, 100})
->Args({100'000, 400'000, 100})
->Args({100'000, 500'000, 100})
->Args({100'000, 600'000, 100})
->Args({100'000, 700'000, 100})
->Args({100'000, 800'000, 100})
->Args({100'000, 900'000, 100})
->Args({100'000, 1'000'000, 100})
->Args({100'000, 2'000'000, 100})
->Args({100'000, 3'000'000, 100})
->Args({100'000, 4'000'000, 100})
->Args({100'000, 5'000'000, 100})
->Args({100'000, 6'000'000, 100})
->Args({100'000, 7'000'000, 100})
->Args({100'000, 8'000'000, 100})
->Args({100'000, 9'000'000, 100})
->Args({100'000, 10'000'000, 100})
->Unit(::benchmark::kMillisecond)
->Name("IncreaseEdges");
BENCHMARK_REGISTER_F(ShardSplitBenchmark, BigDataSplitWithFewTransactionsOnVertices)
->Args({100'000, 100'000, 100})
->Args({100'000, 100'000, 1'000})
->Args({100'000, 100'000, 10'000})
->Args({100'000, 100'000, 100'000})
->Unit(::benchmark::kMillisecond)
->Name("IncreaseTransactions");
} // namespace memgraph::benchmark
BENCHMARK_MAIN();
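// A sketch of a possible invocation (the binary name is an assumption;
// --benchmark_filter and --benchmark_out are standard Google Benchmark flags):
//   ./storage_v3_split_benchmark --benchmark_filter=IncreaseVertices --benchmark_out=split.json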

View File

@ -0,0 +1,270 @@
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#include <cstdint>
#include <optional>
#include <vector>
#include <benchmark/benchmark.h>
#include <gflags/gflags.h>
#include "storage/v3/id_types.hpp"
#include "storage/v3/key_store.hpp"
#include "storage/v3/property_value.hpp"
#include "storage/v3/shard.hpp"
#include "storage/v3/vertex.hpp"
#include "storage/v3/vertex_id.hpp"
namespace memgraph::benchmark {
class ShardSplitBenchmark : public ::benchmark::Fixture {
protected:
using PrimaryKey = storage::v3::PrimaryKey;
using PropertyId = storage::v3::PropertyId;
using PropertyValue = storage::v3::PropertyValue;
using LabelId = storage::v3::LabelId;
using EdgeTypeId = storage::v3::EdgeTypeId;
using Shard = storage::v3::Shard;
using VertexId = storage::v3::VertexId;
using Gid = storage::v3::Gid;
void SetUp(const ::benchmark::State &state) override {
storage.emplace(primary_label, min_pk, std::nullopt, schema_property_vector);
storage->StoreMapping(
{{1, "label"}, {2, "property"}, {3, "edge_property"}, {4, "secondary_label"}, {5, "secondary_prop"}});
}
void TearDown(const ::benchmark::State &) override { storage = std::nullopt; }
const PropertyId primary_property{PropertyId::FromUint(2)};
const PropertyId secondary_property{PropertyId::FromUint(5)};
std::vector<storage::v3::SchemaProperty> schema_property_vector = {
storage::v3::SchemaProperty{primary_property, common::SchemaType::INT}};
const std::vector<PropertyValue> min_pk{PropertyValue{0}};
const LabelId primary_label{LabelId::FromUint(1)};
const LabelId secondary_label{LabelId::FromUint(4)};
const EdgeTypeId edge_type_id{EdgeTypeId::FromUint(3)};
std::optional<Shard> storage;
coordinator::Hlc last_hlc{0, io::Time{}};
coordinator::Hlc GetNextHlc() {
++last_hlc.logical_id;
last_hlc.coordinator_wall_clock += std::chrono::seconds(1);
return last_hlc;
}
};
BENCHMARK_DEFINE_F(ShardSplitBenchmark, BigDataSplit)(::benchmark::State &state) {
const auto number_of_vertices{state.range(0)};
std::random_device r;
std::default_random_engine e1(r());
std::uniform_int_distribution<int> uniform_dist(0, number_of_vertices);
for (int64_t i{0}; i < number_of_vertices; ++i) {
auto acc = storage->Access(GetNextHlc());
MG_ASSERT(acc.CreateVertexAndValidate({secondary_label}, PrimaryKey{PropertyValue(i)},
{{secondary_property, PropertyValue(i)}})
.HasValue(),
"Failed creating with pk {}", i);
if (i > 1) {
const auto vtx1 = uniform_dist(e1) % i;
const auto vtx2 = uniform_dist(e1) % i;
MG_ASSERT(acc.CreateEdge(VertexId{primary_label, {PropertyValue(vtx1)}},
VertexId{primary_label, {PropertyValue(vtx2)}}, edge_type_id, Gid::FromUint(i))
.HasValue(),
"Failed on {} and {}", vtx1, vtx2);
}
acc.Commit(GetNextHlc());
}
for (auto _ : state) {
auto data = storage->PerformSplit(PrimaryKey{PropertyValue{number_of_vertices / 2}}, 2);
}
}
BENCHMARK_DEFINE_F(ShardSplitBenchmark, BigDataSplitWithGc)(::benchmark::State &state) {
const auto number_of_vertices{state.range(0)};
std::random_device r;
std::default_random_engine e1(r());
std::uniform_int_distribution<int> uniform_dist(0, number_of_vertices);
for (int64_t i{0}; i < number_of_vertices; ++i) {
auto acc = storage->Access(GetNextHlc());
MG_ASSERT(acc.CreateVertexAndValidate({secondary_label}, PrimaryKey{PropertyValue(i)},
{{secondary_property, PropertyValue(i)}})
.HasValue(),
"Failed creating with pk {}", i);
if (i > 1) {
const auto vtx1 = uniform_dist(e1) % i;
const auto vtx2 = uniform_dist(e1) % i;
MG_ASSERT(acc.CreateEdge(VertexId{primary_label, {PropertyValue(vtx1)}},
VertexId{primary_label, {PropertyValue(vtx2)}}, edge_type_id, Gid::FromUint(i))
.HasValue(),
"Failed on {} and {}", vtx1, vtx2);
}
acc.Commit(GetNextHlc());
}
storage->CollectGarbage(GetNextHlc().coordinator_wall_clock);
for (auto _ : state) {
auto data = storage->PerformSplit(PrimaryKey{PropertyValue{number_of_vertices / 2}}, 2);
}
}
BENCHMARK_DEFINE_F(ShardSplitBenchmark, BigDataSplitWithFewTransactionsOnVertices)(::benchmark::State &state) {
const auto number_of_vertices = state.range(0);
const auto number_of_edges = state.range(1);
const auto number_of_transactions = state.range(2);
std::random_device r;
std::default_random_engine e1(r());
std::uniform_int_distribution<int> uniform_dist(0, number_of_vertices - number_of_transactions - 1);
// Create Vertices
int64_t vertex_count{0};
{
auto acc = storage->Access(GetNextHlc());
for (; vertex_count < number_of_vertices - number_of_transactions; ++vertex_count) {
MG_ASSERT(acc.CreateVertexAndValidate({secondary_label}, PrimaryKey{PropertyValue(vertex_count)},
{{secondary_property, PropertyValue(vertex_count)}})
.HasValue(),
"Failed creating with pk {}", vertex_count);
}
// Create Edges
for (int64_t i{0}; i < number_of_edges; ++i) {
const auto vtx1 = uniform_dist(e1);
const auto vtx2 = uniform_dist(e1);
MG_ASSERT(acc.CreateEdge(VertexId{primary_label, {PropertyValue(vtx1)}},
VertexId{primary_label, {PropertyValue(vtx2)}}, edge_type_id, Gid::FromUint(i))
.HasValue(),
"Failed on {} and {}", vtx1, vtx2);
}
acc.Commit(GetNextHlc());
}
// Clean up transactional data
storage->CollectGarbage(GetNextHlc().coordinator_wall_clock);
// Create the rest of the vertices, leaving their transactions uncollected
for (; vertex_count < number_of_vertices; ++vertex_count) {
auto acc = storage->Access(GetNextHlc());
MG_ASSERT(acc.CreateVertexAndValidate({secondary_label}, PrimaryKey{PropertyValue(vertex_count)},
{{secondary_property, PropertyValue(vertex_count)}})
.HasValue(),
"Failed creating with pk {}", vertex_count);
acc.Commit(GetNextHlc());
}
for (auto _ : state) {
// Don't create a new shard here, since shard deallocation can itself take a significant amount of time
auto data = storage->PerformSplit(PrimaryKey{PropertyValue{number_of_vertices / 2}}, 2);
}
}
// Range:
// Number of vertices
// This run is pessimistic; the number of vertices equals the number of transactions
// BENCHMARK_REGISTER_F(ShardSplitBenchmark, BigDataSplit)
// ->RangeMultiplier(10)
// ->Range(100'000, 100'000)
// ->Unit(::benchmark::kMillisecond);
// Range:
// Number of vertices
// This run is optimistic; garbage collection has already removed all transaction data
// BENCHMARK_REGISTER_F(ShardSplitBenchmark, BigDataSplitWithGc)
// ->RangeMultiplier(10)
// ->Range(100'000, 1'000'000)
// ->Unit(::benchmark::kMillisecond);
// Args:
// Number of vertices
// Number of edges
// Number of transactions
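// Only the {900'000, 100'000, 100} configuration is left enabled in this file;
// the other data points are presumably split across separate benchmark binaries.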
BENCHMARK_REGISTER_F(ShardSplitBenchmark, BigDataSplitWithFewTransactionsOnVertices)
// ->Args({100'000, 100'000, 100})
// ->Args({200'000, 100'000, 100})
// ->Args({300'000, 100'000, 100})
// ->Args({400'000, 100'000, 100})
// ->Args({500'000, 100'000, 100})
// ->Args({600'000, 100'000, 100})
// ->Args({700'000, 100'000, 100})
// ->Args({800'000, 100'000, 100})
->Args({900'000, 100'000, 100})
// ->Args({1'000'000, 100'000, 100})
// ->Args({2'000'000, 100'000, 100})
// ->Args({3'000'000, 100'000, 100})
// ->Args({4'000'000, 100'000, 100})
// ->Args({6'000'000, 100'000, 100})
// ->Args({7'000'000, 100'000, 100})
// ->Args({8'000'000, 100'000, 100})
// ->Args({9'000'000, 100'000, 100})
// ->Args({10'000'000, 100'000, 100})
->Unit(::benchmark::kMillisecond)
->Name("IncreaseVertices");
// BENCHMARK_REGISTER_F(ShardSplitBenchmark, BigDataSplitWithFewTransactionsOnVertices)
// ->Args({100'000, 100'000, 100})
// ->Args({200'000, 100'000, 100})
// ->Args({300'000, 100'000, 100})
// ->Args({400'000, 100'000, 100})
// ->Args({500'000, 100'000, 100})
// ->Args({600'000, 100'000, 100})
// ->Args({700'000, 100'000, 100})
// ->Args({800'000, 100'000, 100})
// ->Args({900'000, 100'000, 100})
// ->Args({1'000'000, 100'000, 100})
// ->Args({2'000'000, 100'000, 100})
// ->Args({3'000'000, 100'000, 100})
// ->Args({4'000'000, 100'000, 100})
// ->Args({6'000'000, 100'000, 100})
// ->Args({7'000'000, 100'000, 100})
// ->Args({8'000'000, 100'000, 100})
// ->Args({9'000'000, 100'000, 100})
// ->Args({10'000'000, 100'000, 100})
// ->Unit(::benchmark::kMillisecond)
// ->Name("IncreaseVertices");
// BENCHMARK_REGISTER_F(ShardSplitBenchmark, BigDataSplitWithFewTransactionsOnVertices)
// ->Args({100'000, 100'000, 100})
// ->Args({100'000, 200'000, 100})
// ->Args({100'000, 300'000, 100})
// ->Args({100'000, 400'000, 100})
// ->Args({100'000, 500'000, 100})
// ->Args({100'000, 600'000, 100})
// ->Args({100'000, 700'000, 100})
// ->Args({100'000, 800'000, 100})
// ->Args({100'000, 900'000, 100})
// ->Args({100'000, 1'000'000, 100})
// ->Args({100'000, 2'000'000, 100})
// ->Args({100'000, 3'000'000, 100})
// ->Args({100'000, 4'000'000, 100})
// ->Args({100'000, 5'000'000, 100})
// ->Args({100'000, 6'000'000, 100})
// ->Args({100'000, 7'000'000, 100})
// ->Args({100'000, 8'000'000, 100})
// ->Args({100'000, 9'000'000, 100})
// ->Args({100'000, 10'000'000, 100})
// ->Unit(::benchmark::kMillisecond)
// ->Name("IncreaseEdges");
// BENCHMARK_REGISTER_F(ShardSplitBenchmark, BigDataSplitWithFewTransactionsOnVertices)
// ->Args({100'000, 100'000, 100})
// ->Args({100'000, 100'000, 1'000})
// ->Args({100'000, 100'000, 10'000})
// ->Args({100'000, 100'000, 100'000})
// ->Unit(::benchmark::kMillisecond)
// ->Name("IncreaseTransactions");
} // namespace memgraph::benchmark
BENCHMARK_MAIN();

View File

@ -0,0 +1,270 @@
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#include <cstdint>
#include <optional>
#include <vector>
#include <benchmark/benchmark.h>
#include <gflags/gflags.h>
#include "storage/v3/id_types.hpp"
#include "storage/v3/key_store.hpp"
#include "storage/v3/property_value.hpp"
#include "storage/v3/shard.hpp"
#include "storage/v3/vertex.hpp"
#include "storage/v3/vertex_id.hpp"
namespace memgraph::benchmark {
class ShardSplitBenchmark : public ::benchmark::Fixture {
protected:
using PrimaryKey = storage::v3::PrimaryKey;
using PropertyId = storage::v3::PropertyId;
using PropertyValue = storage::v3::PropertyValue;
using LabelId = storage::v3::LabelId;
using EdgeTypeId = storage::v3::EdgeTypeId;
using Shard = storage::v3::Shard;
using VertexId = storage::v3::VertexId;
using Gid = storage::v3::Gid;
void SetUp(const ::benchmark::State &state) override {
storage.emplace(primary_label, min_pk, std::nullopt, schema_property_vector);
storage->StoreMapping(
{{1, "label"}, {2, "property"}, {3, "edge_property"}, {4, "secondary_label"}, {5, "secondary_prop"}});
}
void TearDown(const ::benchmark::State &) override { storage = std::nullopt; }
const PropertyId primary_property{PropertyId::FromUint(2)};
const PropertyId secondary_property{PropertyId::FromUint(5)};
std::vector<storage::v3::SchemaProperty> schema_property_vector = {
storage::v3::SchemaProperty{primary_property, common::SchemaType::INT}};
const std::vector<PropertyValue> min_pk{PropertyValue{0}};
const LabelId primary_label{LabelId::FromUint(1)};
const LabelId secondary_label{LabelId::FromUint(4)};
const EdgeTypeId edge_type_id{EdgeTypeId::FromUint(3)};
std::optional<Shard> storage;
coordinator::Hlc last_hlc{0, io::Time{}};
coordinator::Hlc GetNextHlc() {
++last_hlc.logical_id;
last_hlc.coordinator_wall_clock += std::chrono::seconds(1);
return last_hlc;
}
};
BENCHMARK_DEFINE_F(ShardSplitBenchmark, BigDataSplit)(::benchmark::State &state) {
const auto number_of_vertices{state.range(0)};
std::random_device r;
std::default_random_engine e1(r());
std::uniform_int_distribution<int> uniform_dist(0, number_of_vertices);
for (int64_t i{0}; i < number_of_vertices; ++i) {
auto acc = storage->Access(GetNextHlc());
MG_ASSERT(acc.CreateVertexAndValidate({secondary_label}, PrimaryKey{PropertyValue(i)},
{{secondary_property, PropertyValue(i)}})
.HasValue(),
"Failed creating with pk {}", i);
if (i > 1) {
const auto vtx1 = uniform_dist(e1) % i;
const auto vtx2 = uniform_dist(e1) % i;
MG_ASSERT(acc.CreateEdge(VertexId{primary_label, {PropertyValue(vtx1)}},
VertexId{primary_label, {PropertyValue(vtx2)}}, edge_type_id, Gid::FromUint(i))
.HasValue(),
"Failed on {} and {}", vtx1, vtx2);
}
acc.Commit(GetNextHlc());
}
for (auto _ : state) {
auto data = storage->PerformSplit(PrimaryKey{PropertyValue{number_of_vertices / 2}}, 2);
}
}
BENCHMARK_DEFINE_F(ShardSplitBenchmark, BigDataSplitWithGc)(::benchmark::State &state) {
const auto number_of_vertices{state.range(0)};
std::random_device r;
std::default_random_engine e1(r());
std::uniform_int_distribution<int> uniform_dist(0, number_of_vertices);
for (int64_t i{0}; i < number_of_vertices; ++i) {
auto acc = storage->Access(GetNextHlc());
MG_ASSERT(acc.CreateVertexAndValidate({secondary_label}, PrimaryKey{PropertyValue(i)},
{{secondary_property, PropertyValue(i)}})
.HasValue(),
"Failed creating with pk {}", i);
if (i > 1) {
const auto vtx1 = uniform_dist(e1) % i;
const auto vtx2 = uniform_dist(e1) % i;
MG_ASSERT(acc.CreateEdge(VertexId{primary_label, {PropertyValue(vtx1)}},
VertexId{primary_label, {PropertyValue(vtx2)}}, edge_type_id, Gid::FromUint(i))
.HasValue(),
"Failed on {} and {}", vtx1, vtx2);
}
acc.Commit(GetNextHlc());
}
storage->CollectGarbage(GetNextHlc().coordinator_wall_clock);
for (auto _ : state) {
auto data = storage->PerformSplit(PrimaryKey{PropertyValue{number_of_vertices / 2}}, 2);
}
}
BENCHMARK_DEFINE_F(ShardSplitBenchmark, BigDataSplitWithFewTransactionsOnVertices)(::benchmark::State &state) {
const auto number_of_vertices = state.range(0);
const auto number_of_edges = state.range(1);
const auto number_of_transactions = state.range(2);
std::random_device r;
std::default_random_engine e1(r());
std::uniform_int_distribution<int> uniform_dist(0, number_of_vertices - number_of_transactions - 1);
// Create Vertices
int64_t vertex_count{0};
{
auto acc = storage->Access(GetNextHlc());
for (; vertex_count < number_of_vertices - number_of_transactions; ++vertex_count) {
MG_ASSERT(acc.CreateVertexAndValidate({secondary_label}, PrimaryKey{PropertyValue(vertex_count)},
{{secondary_property, PropertyValue(vertex_count)}})
.HasValue(),
"Failed creating with pk {}", vertex_count);
}
// Create Edges
for (int64_t i{0}; i < number_of_edges; ++i) {
const auto vtx1 = uniform_dist(e1);
const auto vtx2 = uniform_dist(e1);
MG_ASSERT(acc.CreateEdge(VertexId{primary_label, {PropertyValue(vtx1)}},
VertexId{primary_label, {PropertyValue(vtx2)}}, edge_type_id, Gid::FromUint(i))
.HasValue(),
"Failed on {} and {}", vtx1, vtx2);
}
acc.Commit(GetNextHlc());
}
// Clean up transactional data
storage->CollectGarbage(GetNextHlc().coordinator_wall_clock);
// Create the rest of the vertices, leaving their transactions uncollected
for (; vertex_count < number_of_vertices; ++vertex_count) {
auto acc = storage->Access(GetNextHlc());
MG_ASSERT(acc.CreateVertexAndValidate({secondary_label}, PrimaryKey{PropertyValue(vertex_count)},
{{secondary_property, PropertyValue(vertex_count)}})
.HasValue(),
"Failed creating with pk {}", vertex_count);
acc.Commit(GetNextHlc());
}
for (auto _ : state) {
// Don't create a new shard here, since shard deallocation can itself take a significant amount of time
auto data = storage->PerformSplit(PrimaryKey{PropertyValue{number_of_vertices / 2}}, 2);
}
}
// Range:
// Number of vertices
// This run is pessimistic; the number of vertices equals the number of transactions
// BENCHMARK_REGISTER_F(ShardSplitBenchmark, BigDataSplit)
// ->RangeMultiplier(10)
// ->Range(100'000, 100'000)
// ->Unit(::benchmark::kMillisecond);
// Range:
// Number of vertices
// This run is optimistic; garbage collection has already removed all transaction data
// BENCHMARK_REGISTER_F(ShardSplitBenchmark, BigDataSplitWithGc)
// ->RangeMultiplier(10)
// ->Range(100'000, 1'000'000)
// ->Unit(::benchmark::kMillisecond);
// Args:
// Number of vertices
// Number of edges
// Number of transactions
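// Only the {1'000'000, 100'000, 100} configuration is left enabled in this file;
// the other data points are presumably split across separate benchmark binaries.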
BENCHMARK_REGISTER_F(ShardSplitBenchmark, BigDataSplitWithFewTransactionsOnVertices)
// ->Args({100'000, 100'000, 100})
// ->Args({200'000, 100'000, 100})
// ->Args({300'000, 100'000, 100})
// ->Args({400'000, 100'000, 100})
// ->Args({500'000, 100'000, 100})
// ->Args({600'000, 100'000, 100})
// ->Args({700'000, 100'000, 100})
// ->Args({800'000, 100'000, 100})
// ->Args({900'000, 100'000, 100})
->Args({1'000'000, 100'000, 100})
// ->Args({2'000'000, 100'000, 100})
// ->Args({3'000'000, 100'000, 100})
// ->Args({4'000'000, 100'000, 100})
// ->Args({6'000'000, 100'000, 100})
// ->Args({7'000'000, 100'000, 100})
// ->Args({8'000'000, 100'000, 100})
// ->Args({9'000'000, 100'000, 100})
// ->Args({10'000'000, 100'000, 100})
->Unit(::benchmark::kMillisecond)
->Name("IncreaseVertices");
// BENCHMARK_REGISTER_F(ShardSplitBenchmark, BigDataSplitWithFewTransactionsOnVertices)
// ->Args({100'000, 100'000, 100})
// ->Args({200'000, 100'000, 100})
// ->Args({300'000, 100'000, 100})
// ->Args({400'000, 100'000, 100})
// ->Args({500'000, 100'000, 100})
// ->Args({600'000, 100'000, 100})
// ->Args({700'000, 100'000, 100})
// ->Args({800'000, 100'000, 100})
// ->Args({900'000, 100'000, 100})
// ->Args({1'000'000, 100'000, 100})
// ->Args({2'000'000, 100'000, 100})
// ->Args({3'000'000, 100'000, 100})
// ->Args({4'000'000, 100'000, 100})
// ->Args({6'000'000, 100'000, 100})
// ->Args({7'000'000, 100'000, 100})
// ->Args({8'000'000, 100'000, 100})
// ->Args({9'000'000, 100'000, 100})
// ->Args({10'000'000, 100'000, 100})
// ->Unit(::benchmark::kMillisecond)
// ->Name("IncreaseVertices");
// BENCHMARK_REGISTER_F(ShardSplitBenchmark, BigDataSplitWithFewTransactionsOnVertices)
// ->Args({100'000, 100'000, 100})
// ->Args({100'000, 200'000, 100})
// ->Args({100'000, 300'000, 100})
// ->Args({100'000, 400'000, 100})
// ->Args({100'000, 500'000, 100})
// ->Args({100'000, 600'000, 100})
// ->Args({100'000, 700'000, 100})
// ->Args({100'000, 800'000, 100})
// ->Args({100'000, 900'000, 100})
// ->Args({100'000, 1'000'000, 100})
// ->Args({100'000, 2'000'000, 100})
// ->Args({100'000, 3'000'000, 100})
// ->Args({100'000, 4'000'000, 100})
// ->Args({100'000, 5'000'000, 100})
// ->Args({100'000, 6'000'000, 100})
// ->Args({100'000, 7'000'000, 100})
// ->Args({100'000, 8'000'000, 100})
// ->Args({100'000, 9'000'000, 100})
// ->Args({100'000, 10'000'000, 100})
// ->Unit(::benchmark::kMillisecond)
// ->Name("IncreaseEdges");
// BENCHMARK_REGISTER_F(ShardSplitBenchmark, BigDataSplitWithFewTransactionsOnVertices)
// ->Args({100'000, 100'000, 100})
// ->Args({100'000, 100'000, 1'000})
// ->Args({100'000, 100'000, 10'000})
// ->Args({100'000, 100'000, 100'000})
// ->Unit(::benchmark::kMillisecond)
// ->Name("IncreaseTransactions");
} // namespace memgraph::benchmark
BENCHMARK_MAIN();

View File

@ -294,6 +294,9 @@ target_link_libraries(${test_prefix}storage_v3_expr mg-storage-v3 mg-expr)
add_unit_test(storage_v3_schema.cpp)
target_link_libraries(${test_prefix}storage_v3_schema mg-storage-v3)
add_unit_test(storage_v3_shard_split.cpp)
target_link_libraries(${test_prefix}storage_v3_shard_split mg-storage-v3 mg-query-v2)
# Test mg-query-v2
# These are commented out because of the new TypedValue in the query engine
# add_unit_test(query_v2_interpreter.cpp ${CMAKE_SOURCE_DIR}/src/glue/v2/communication.cpp)

View File

@ -0,0 +1,501 @@
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#include <cstdint>
#include <memory>
#include <gmock/gmock-matchers.h>
#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include "coordinator/hybrid_logical_clock.hpp"
#include "query/v2/requests.hpp"
#include "storage/v3/delta.hpp"
#include "storage/v3/id_types.hpp"
#include "storage/v3/key_store.hpp"
#include "storage/v3/mvcc.hpp"
#include "storage/v3/property_value.hpp"
#include "storage/v3/shard.hpp"
#include "storage/v3/vertex.hpp"
#include "storage/v3/vertex_id.hpp"
using testing::Pair;
using testing::UnorderedElementsAre;
namespace memgraph::storage::v3::tests {
class ShardSplitTest : public testing::Test {
protected:
void SetUp() override {
storage.StoreMapping(
{{1, "label"}, {2, "property"}, {3, "edge_property"}, {4, "secondary_label"}, {5, "secondary_prop"}});
}
const PropertyId primary_property{PropertyId::FromUint(2)};
const PropertyId secondary_property{PropertyId::FromUint(5)};
std::vector<storage::v3::SchemaProperty> schema_property_vector = {
storage::v3::SchemaProperty{primary_property, common::SchemaType::INT}};
const std::vector<PropertyValue> min_pk{PropertyValue{0}};
const LabelId primary_label{LabelId::FromUint(1)};
const LabelId secondary_label{LabelId::FromUint(4)};
const EdgeTypeId edge_type_id{EdgeTypeId::FromUint(3)};
Shard storage{primary_label, min_pk, std::nullopt /*max_primary_key*/, schema_property_vector};
coordinator::Hlc last_hlc{0, io::Time{}};
coordinator::Hlc GetNextHlc() {
++last_hlc.logical_id;
last_hlc.coordinator_wall_clock += std::chrono::seconds(1);
return last_hlc;
}
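// Checks that the shard contains no vertices with primary keys below split_min,
// contains every vertex in [split_min, split_max), and that each of those
// vertices still has at least one in- or out-edge after the split.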
void AssertShardState(auto &shard, const int split_min, const int split_max) {
auto acc = shard.Access(GetNextHlc());
for (int i{0}; i < split_min; ++i) {
EXPECT_FALSE(acc.FindVertex(PrimaryKey{{PropertyValue(i)}}, View::OLD).has_value());
}
for (int i{split_min}; i < split_max; ++i) {
const auto vtx = acc.FindVertex(PrimaryKey{{PropertyValue(i)}}, View::OLD);
ASSERT_TRUE(vtx.has_value());
EXPECT_TRUE(vtx->InEdges(View::OLD)->size() == 1 || vtx->OutEdges(View::OLD)->size() == 1);
}
}
};
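// Compares two vertex containers entry by entry: primary key, deleted flag,
// labels, and the full delta chain (action, id, payload and prev pointer of
// every delta).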
void AssertEqVertexContainer(const VertexContainer &actual, const VertexContainer &expected) {
ASSERT_EQ(actual.size(), expected.size());
auto expected_it = expected.begin();
auto actual_it = actual.begin();
while (expected_it != expected.end()) {
EXPECT_EQ(actual_it->first, expected_it->first);
EXPECT_EQ(actual_it->second.deleted, expected_it->second.deleted);
EXPECT_EQ(actual_it->second.labels, expected_it->second.labels);
auto *expected_delta = expected_it->second.delta;
auto *actual_delta = actual_it->second.delta;
// Walk and compare the whole delta chain of this vertex
while (expected_delta != nullptr) {
EXPECT_EQ(actual_delta->action, expected_delta->action);
EXPECT_EQ(actual_delta->id, expected_delta->id);
switch (expected_delta->action) {
case Delta::Action::ADD_LABEL:
case Delta::Action::REMOVE_LABEL: {
EXPECT_EQ(actual_delta->label, expected_delta->label);
break;
}
case Delta::Action::SET_PROPERTY: {
EXPECT_EQ(actual_delta->property.key, expected_delta->property.key);
EXPECT_EQ(actual_delta->property.value, expected_delta->property.value);
break;
}
case Delta::Action::ADD_IN_EDGE:
case Delta::Action::ADD_OUT_EDGE:
case Delta::Action::REMOVE_IN_EDGE:
case Delta::Action::RECREATE_OBJECT:
case Delta::Action::DELETE_OBJECT:
case Delta::Action::REMOVE_OUT_EDGE: {
break;
}
}
const auto expected_prev = expected_delta->prev.Get();
const auto actual_prev = actual_delta->prev.Get();
switch (expected_prev.type) {
case PreviousPtr::Type::NULLPTR: {
ASSERT_EQ(actual_prev.type, PreviousPtr::Type::NULLPTR) << "Expected type is nullptr!";
break;
}
case PreviousPtr::Type::DELTA: {
ASSERT_EQ(actual_prev.type, PreviousPtr::Type::DELTA) << "Expected type is delta!";
EXPECT_EQ(actual_prev.delta->action, expected_prev.delta->action);
EXPECT_EQ(actual_prev.delta->id, expected_prev.delta->id);
break;
}
case v3::PreviousPtr::Type::EDGE: {
ASSERT_EQ(actual_prev.type, PreviousPtr::Type::EDGE) << "Expected type is edge!";
EXPECT_EQ(actual_prev.edge->gid, expected_prev.edge->gid);
break;
}
case v3::PreviousPtr::Type::VERTEX: {
ASSERT_EQ(actual_prev.type, PreviousPtr::Type::VERTEX) << "Expected type is vertex!";
EXPECT_EQ(actual_prev.vertex->first, expected_prev.vertex->first);
break;
}
}
expected_delta = expected_delta->next;
actual_delta = actual_delta->next;
}
EXPECT_EQ(expected_delta, nullptr);
EXPECT_EQ(actual_delta, nullptr);
++expected_it;
++actual_it;
}
}
void AssertEqDeltaLists(const std::list<Delta> &actual, const std::list<Delta> &expected) {
EXPECT_EQ(actual.size(), expected.size());
auto actual_it = actual.begin();
auto expected_it = expected.begin();
while (actual_it != actual.end()) {
EXPECT_EQ(actual_it->id, expected_it->id);
EXPECT_EQ(actual_it->action, expected_it->action);
++actual_it;
++expected_it;
}
}
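// Prepends new_delta to the vertex's delta chain: the new delta becomes the head,
// its prev points at the vertex, and the previous head (if any) is linked behind it.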
void AddDeltaToDeltaChain(Vertex *object, Delta *new_delta) {
auto *delta_holder = GetDeltaHolder(object);
new_delta->next = delta_holder->delta;
new_delta->prev.Set(object);
if (delta_holder->delta) {
delta_holder->delta->prev.Set(new_delta);
}
delta_holder->delta = new_delta;
}
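// Most of the tests below split the shard at primary key 4 and verify that the
// vertices with keys at or above the split key, the edges attached to them, and
// the transactions and indices that reference them end up in the returned split data.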
TEST_F(ShardSplitTest, TestBasicSplitWithVertices) {
auto acc = storage.Access(GetNextHlc());
EXPECT_FALSE(acc.CreateVertexAndValidate({secondary_label}, {PropertyValue(1)}, {}).HasError());
EXPECT_FALSE(acc.CreateVertexAndValidate({}, {PropertyValue(2)}, {}).HasError());
EXPECT_FALSE(acc.CreateVertexAndValidate({}, {PropertyValue(3)}, {}).HasError());
EXPECT_FALSE(acc.CreateVertexAndValidate({}, {PropertyValue(4)}, {}).HasError());
EXPECT_FALSE(
acc.CreateVertexAndValidate({secondary_label}, {PropertyValue(5)}, {{secondary_property, PropertyValue(121)}})
.HasError());
EXPECT_FALSE(acc.CreateVertexAndValidate({}, {PropertyValue(6)}, {}).HasError());
auto current_hlc = GetNextHlc();
acc.Commit(current_hlc);
auto splitted_data = storage.PerformSplit({PropertyValue(4)}, 2);
EXPECT_EQ(splitted_data.vertices.size(), 3);
EXPECT_EQ(splitted_data.edges->size(), 0);
EXPECT_EQ(splitted_data.transactions.size(), 1);
EXPECT_EQ(splitted_data.label_indices.size(), 0);
EXPECT_EQ(splitted_data.label_property_indices.size(), 0);
CommitInfo commit_info{.start_or_commit_timestamp = current_hlc};
Delta delta_delete1{Delta::DeleteObjectTag{}, &commit_info, 4, 1};
Delta delta_delete2{Delta::DeleteObjectTag{}, &commit_info, 5, 2};
Delta delta_remove_label{Delta::RemoveLabelTag{}, secondary_label, &commit_info, 7, 4};
Delta delta_set_property{Delta::SetPropertyTag{}, secondary_property, PropertyValue(), &commit_info, 6, 4};
Delta delta_delete3{Delta::DeleteObjectTag{}, &commit_info, 8, 3};
VertexContainer expected_vertices;
auto [it_4, inserted1] = expected_vertices.emplace(PrimaryKey{PropertyValue{4}}, VertexData(&delta_delete1));
delta_delete1.prev.Set(&*it_4);
auto [it_5, inserted2] = expected_vertices.emplace(PrimaryKey{PropertyValue{5}}, VertexData(&delta_delete2));
delta_delete2.prev.Set(&*it_5);
auto [it_6, inserted3] = expected_vertices.emplace(PrimaryKey{PropertyValue{6}}, VertexData(&delta_delete3));
delta_delete3.prev.Set(&*it_6);
it_5->second.labels.push_back(secondary_label);
AddDeltaToDeltaChain(&*it_5, &delta_set_property);
AddDeltaToDeltaChain(&*it_5, &delta_remove_label);
AssertEqVertexContainer(splitted_data.vertices, expected_vertices);
// This is to ensure that the transactions we keep don't point to invalid
// objects on the other shard
std::list<Delta> expected_deltas;
expected_deltas.emplace_back(Delta::DeleteObjectTag{}, &commit_info, 4, 1);
expected_deltas.emplace_back(Delta::DeleteObjectTag{}, &commit_info, 5, 2);
expected_deltas.emplace_back(Delta::SetPropertyTag{}, secondary_property, PropertyValue(), &commit_info, 6, 4);
expected_deltas.emplace_back(Delta::RemoveLabelTag{}, secondary_label, &commit_info, 7, 4);
expected_deltas.emplace_back(Delta::DeleteObjectTag{}, &commit_info, 8, 3);
AssertEqDeltaLists(splitted_data.transactions.begin()->second->deltas, expected_deltas);
}
TEST_F(ShardSplitTest, TestBasicSplitVerticesAndEdges) {
auto acc = storage.Access(GetNextHlc());
EXPECT_FALSE(acc.CreateVertexAndValidate({}, {PropertyValue(1)}, {}).HasError());
EXPECT_FALSE(acc.CreateVertexAndValidate({}, {PropertyValue(2)}, {}).HasError());
EXPECT_FALSE(acc.CreateVertexAndValidate({}, {PropertyValue(3)}, {}).HasError());
EXPECT_FALSE(acc.CreateVertexAndValidate({}, {PropertyValue(4)}, {}).HasError());
EXPECT_FALSE(acc.CreateVertexAndValidate({}, {PropertyValue(5)}, {}).HasError());
EXPECT_FALSE(acc.CreateVertexAndValidate({}, {PropertyValue(6)}, {}).HasError());
EXPECT_FALSE(acc.CreateEdge(VertexId{primary_label, PrimaryKey{PropertyValue(1)}},
VertexId{primary_label, PrimaryKey{PropertyValue(5)}}, edge_type_id, Gid::FromUint(1))
.HasError());
EXPECT_FALSE(acc.CreateEdge(VertexId{primary_label, PrimaryKey{PropertyValue(4)}},
VertexId{primary_label, PrimaryKey{PropertyValue(6)}}, edge_type_id, Gid::FromUint(2))
.HasError());
auto current_hlc = GetNextHlc();
acc.Commit(current_hlc);
auto splitted_data = storage.PerformSplit({PropertyValue(4)}, 2);
EXPECT_EQ(splitted_data.vertices.size(), 3);
EXPECT_EQ(splitted_data.edges->size(), 2);
EXPECT_EQ(splitted_data.transactions.size(), 1);
EXPECT_EQ(splitted_data.label_indices.size(), 0);
EXPECT_EQ(splitted_data.label_property_indices.size(), 0);
CommitInfo commit_info{.start_or_commit_timestamp = current_hlc};
Delta delta_delete1{Delta::DeleteObjectTag{}, &commit_info, 12, 1};
Delta delta_delete2{Delta::DeleteObjectTag{}, &commit_info, 13, 1};
Delta delta_delete3{Delta::DeleteObjectTag{}, &commit_info, 14, 1};
Delta delta_add_in_edge1{Delta::RemoveInEdgeTag{},
edge_type_id,
VertexId{primary_label, {PropertyValue(1)}},
EdgeRef{Gid::FromUint(1)},
&commit_info,
17,
1};
Delta delta_add_out_edge2{Delta::RemoveOutEdgeTag{},
edge_type_id,
VertexId{primary_label, {PropertyValue(6)}},
EdgeRef{Gid::FromUint(2)},
&commit_info,
19,
1};
Delta delta_add_in_edge2{Delta::RemoveInEdgeTag{},
edge_type_id,
VertexId{primary_label, {PropertyValue(4)}},
EdgeRef{Gid::FromUint(2)},
&commit_info,
20,
1};
VertexContainer expected_vertices;
auto [vtx4, inserted4] = expected_vertices.emplace(PrimaryKey{PropertyValue{4}}, VertexData(&delta_delete1));
auto [vtx5, inserted5] = expected_vertices.emplace(PrimaryKey{PropertyValue{5}}, VertexData(&delta_delete2));
auto [vtx6, inserted6] = expected_vertices.emplace(PrimaryKey{PropertyValue{6}}, VertexData(&delta_delete3));
AddDeltaToDeltaChain(&*vtx4, &delta_add_out_edge2);
AddDeltaToDeltaChain(&*vtx5, &delta_add_in_edge1);
AddDeltaToDeltaChain(&*vtx6, &delta_add_in_edge2);
AssertEqVertexContainer(splitted_data.vertices, expected_vertices);
}
TEST_F(ShardSplitTest, TestBasicSplitBeforeCommit) {
auto acc = storage.Access(GetNextHlc());
EXPECT_FALSE(acc.CreateVertexAndValidate({}, {PropertyValue(1)}, {}).HasError());
EXPECT_FALSE(acc.CreateVertexAndValidate({}, {PropertyValue(2)}, {}).HasError());
EXPECT_FALSE(acc.CreateVertexAndValidate({}, {PropertyValue(3)}, {}).HasError());
EXPECT_FALSE(acc.CreateVertexAndValidate({}, {PropertyValue(4)}, {}).HasError());
EXPECT_FALSE(acc.CreateVertexAndValidate({}, {PropertyValue(5)}, {}).HasError());
EXPECT_FALSE(acc.CreateVertexAndValidate({}, {PropertyValue(6)}, {}).HasError());
EXPECT_FALSE(acc.CreateEdge(VertexId{primary_label, PrimaryKey{PropertyValue(1)}},
VertexId{primary_label, PrimaryKey{PropertyValue(2)}}, edge_type_id, Gid::FromUint(0))
.HasError());
EXPECT_FALSE(acc.CreateEdge(VertexId{primary_label, PrimaryKey{PropertyValue(1)}},
VertexId{primary_label, PrimaryKey{PropertyValue(5)}}, edge_type_id, Gid::FromUint(1))
.HasError());
EXPECT_FALSE(acc.CreateEdge(VertexId{primary_label, PrimaryKey{PropertyValue(4)}},
VertexId{primary_label, PrimaryKey{PropertyValue(6)}}, edge_type_id, Gid::FromUint(2))
.HasError());
auto splitted_data = storage.PerformSplit({PropertyValue(4)}, 2);
EXPECT_EQ(splitted_data.vertices.size(), 3);
EXPECT_EQ(splitted_data.edges->size(), 2);
EXPECT_EQ(splitted_data.transactions.size(), 1);
EXPECT_EQ(splitted_data.label_indices.size(), 0);
EXPECT_EQ(splitted_data.label_property_indices.size(), 0);
}
TEST_F(ShardSplitTest, TestBasicSplitWithCommitedAndOngoingTransactions) {
{
auto acc = storage.Access(GetNextHlc());
EXPECT_FALSE(acc.CreateVertexAndValidate({}, {PropertyValue(1)}, {}).HasError());
EXPECT_FALSE(acc.CreateVertexAndValidate({}, {PropertyValue(2)}, {}).HasError());
EXPECT_FALSE(acc.CreateVertexAndValidate({}, {PropertyValue(3)}, {}).HasError());
EXPECT_FALSE(acc.CreateVertexAndValidate({}, {PropertyValue(4)}, {}).HasError());
EXPECT_FALSE(acc.CreateVertexAndValidate({}, {PropertyValue(5)}, {}).HasError());
EXPECT_FALSE(acc.CreateVertexAndValidate({}, {PropertyValue(6)}, {}).HasError());
acc.Commit(GetNextHlc());
}
auto acc = storage.Access(GetNextHlc());
EXPECT_FALSE(acc.CreateEdge(VertexId{primary_label, PrimaryKey{PropertyValue(1)}},
VertexId{primary_label, PrimaryKey{PropertyValue(2)}}, edge_type_id, Gid::FromUint(0))
.HasError());
EXPECT_FALSE(acc.CreateEdge(VertexId{primary_label, PrimaryKey{PropertyValue(3)}},
VertexId{primary_label, PrimaryKey{PropertyValue(5)}}, edge_type_id, Gid::FromUint(1))
.HasError());
EXPECT_FALSE(acc.CreateEdge(VertexId{primary_label, PrimaryKey{PropertyValue(4)}},
VertexId{primary_label, PrimaryKey{PropertyValue(6)}}, edge_type_id, Gid::FromUint(2))
.HasError());
auto splitted_data = storage.PerformSplit({PropertyValue(4)}, 2);
EXPECT_EQ(splitted_data.vertices.size(), 3);
EXPECT_EQ(splitted_data.edges->size(), 2);
EXPECT_EQ(splitted_data.transactions.size(), 2);
EXPECT_EQ(splitted_data.label_indices.size(), 0);
EXPECT_EQ(splitted_data.label_property_indices.size(), 0);
}
TEST_F(ShardSplitTest, TestBasicSplitWithLabelIndex) {
auto acc = storage.Access(GetNextHlc());
EXPECT_FALSE(acc.CreateVertexAndValidate({secondary_label}, {PropertyValue(1)}, {}).HasError());
EXPECT_FALSE(acc.CreateVertexAndValidate({}, {PropertyValue(2)}, {}).HasError());
EXPECT_FALSE(acc.CreateVertexAndValidate({}, {PropertyValue(3)}, {}).HasError());
EXPECT_FALSE(acc.CreateVertexAndValidate({}, {PropertyValue(4)}, {}).HasError());
EXPECT_FALSE(acc.CreateVertexAndValidate({secondary_label}, {PropertyValue(5)}, {}).HasError());
EXPECT_FALSE(acc.CreateVertexAndValidate({secondary_label}, {PropertyValue(6)}, {}).HasError());
acc.Commit(GetNextHlc());
storage.CreateIndex(secondary_label);
auto splitted_data = storage.PerformSplit({PropertyValue(4)}, 2);
EXPECT_EQ(splitted_data.vertices.size(), 3);
EXPECT_EQ(splitted_data.edges->size(), 0);
EXPECT_EQ(splitted_data.transactions.size(), 1);
EXPECT_EQ(splitted_data.label_indices.size(), 1);
EXPECT_EQ(splitted_data.label_property_indices.size(), 0);
}
TEST_F(ShardSplitTest, TestBasicSplitWithLabelPropertyIndex) {
auto acc = storage.Access(GetNextHlc());
EXPECT_FALSE(
acc.CreateVertexAndValidate({secondary_label}, {PropertyValue(1)}, {{secondary_property, PropertyValue(1)}})
.HasError());
EXPECT_FALSE(acc.CreateVertexAndValidate({}, {PropertyValue(2)}, {}).HasError());
EXPECT_FALSE(acc.CreateVertexAndValidate({}, {PropertyValue(3)}, {}).HasError());
EXPECT_FALSE(acc.CreateVertexAndValidate({}, {PropertyValue(4)}, {}).HasError());
EXPECT_FALSE(
acc.CreateVertexAndValidate({secondary_label}, {PropertyValue(5)}, {{secondary_property, PropertyValue(21)}})
.HasError());
EXPECT_FALSE(
acc.CreateVertexAndValidate({secondary_label}, {PropertyValue(6)}, {{secondary_property, PropertyValue(22)}})
.HasError());
acc.Commit(GetNextHlc());
storage.CreateIndex(secondary_label, secondary_property);
auto splitted_data = storage.PerformSplit({PropertyValue(4)}, 2);
EXPECT_EQ(splitted_data.vertices.size(), 3);
EXPECT_EQ(splitted_data.edges->size(), 0);
EXPECT_EQ(splitted_data.transactions.size(), 1);
EXPECT_EQ(splitted_data.label_indices.size(), 0);
EXPECT_EQ(splitted_data.label_property_indices.size(), 1);
}
TEST_F(ShardSplitTest, TestSplittingShardsWithGcDestroyOriginalShard) {
const auto split_value{4};
PrimaryKey splitted_value{{PropertyValue(4)}};
std::unique_ptr<Shard> splitted_shard;
{
Shard storage2{primary_label, min_pk, std::nullopt /*max_primary_key*/, schema_property_vector};
auto acc = storage2.Access(GetNextHlc());
EXPECT_FALSE(acc.CreateVertexAndValidate({}, {PropertyValue(1)}, {}).HasError());
EXPECT_FALSE(acc.CreateVertexAndValidate({}, {PropertyValue(2)}, {}).HasError());
EXPECT_FALSE(acc.CreateVertexAndValidate({}, {PropertyValue(3)}, {}).HasError());
EXPECT_FALSE(acc.CreateVertexAndValidate({}, {PropertyValue(4)}, {}).HasError());
EXPECT_FALSE(acc.CreateVertexAndValidate({}, {PropertyValue(5)}, {}).HasError());
EXPECT_FALSE(acc.CreateVertexAndValidate({}, {PropertyValue(6)}, {}).HasError());
EXPECT_FALSE(acc.CreateEdge(VertexId{primary_label, PrimaryKey{PropertyValue(1)}},
VertexId{primary_label, PrimaryKey{PropertyValue(2)}}, edge_type_id, Gid::FromUint(0))
.HasError());
EXPECT_FALSE(acc.CreateEdge(VertexId{primary_label, PrimaryKey{PropertyValue(3)}},
VertexId{primary_label, PrimaryKey{PropertyValue(5)}}, edge_type_id, Gid::FromUint(1))
.HasError());
EXPECT_FALSE(acc.CreateEdge(VertexId{primary_label, PrimaryKey{PropertyValue(4)}},
VertexId{primary_label, PrimaryKey{PropertyValue(6)}}, edge_type_id, Gid::FromUint(2))
.HasError());
acc.Commit(GetNextHlc());
auto splitted_data = storage2.PerformSplit({PropertyValue(split_value)}, 2);
EXPECT_EQ(splitted_data.vertices.size(), 3);
EXPECT_EQ(splitted_data.edges->size(), 2);
EXPECT_EQ(splitted_data.transactions.size(), 1);
EXPECT_EQ(splitted_data.label_indices.size(), 0);
EXPECT_EQ(splitted_data.label_property_indices.size(), 0);
// Create a new shard
splitted_shard = Shard::FromSplitData(std::move(splitted_data));
// Call gc on old shard
storage2.CollectGarbage(GetNextHlc().coordinator_wall_clock);
// Destroy original
}
splitted_shard->CollectGarbage(GetNextHlc().coordinator_wall_clock);
AssertShardState(*splitted_shard, 4, 6);
}
TEST_F(ShardSplitTest, TestSplittingShardsWithGcDestroySplittedShard) {
PrimaryKey splitted_value{{PropertyValue(4)}};
auto acc = storage.Access(GetNextHlc());
EXPECT_FALSE(acc.CreateVertexAndValidate({}, {PropertyValue(1)}, {}).HasError());
EXPECT_FALSE(acc.CreateVertexAndValidate({}, {PropertyValue(2)}, {}).HasError());
EXPECT_FALSE(acc.CreateVertexAndValidate({}, {PropertyValue(3)}, {}).HasError());
EXPECT_FALSE(acc.CreateVertexAndValidate({}, {PropertyValue(4)}, {}).HasError());
EXPECT_FALSE(acc.CreateVertexAndValidate({}, {PropertyValue(5)}, {}).HasError());
EXPECT_FALSE(acc.CreateVertexAndValidate({}, {PropertyValue(6)}, {}).HasError());
EXPECT_FALSE(acc.CreateEdge(VertexId{primary_label, PrimaryKey{PropertyValue(1)}},
VertexId{primary_label, PrimaryKey{PropertyValue(2)}}, edge_type_id, Gid::FromUint(0))
.HasError());
EXPECT_FALSE(acc.CreateEdge(VertexId{primary_label, PrimaryKey{PropertyValue(3)}},
VertexId{primary_label, PrimaryKey{PropertyValue(5)}}, edge_type_id, Gid::FromUint(1))
.HasError());
EXPECT_FALSE(acc.CreateEdge(VertexId{primary_label, PrimaryKey{PropertyValue(4)}},
VertexId{primary_label, PrimaryKey{PropertyValue(6)}}, edge_type_id, Gid::FromUint(2))
.HasError());
acc.Commit(GetNextHlc());
auto splitted_data = storage.PerformSplit({PropertyValue(4)}, 2);
EXPECT_EQ(splitted_data.vertices.size(), 3);
EXPECT_EQ(splitted_data.edges->size(), 2);
EXPECT_EQ(splitted_data.transactions.size(), 1);
EXPECT_EQ(splitted_data.label_indices.size(), 0);
EXPECT_EQ(splitted_data.label_property_indices.size(), 0);
{
// Create a new shard
auto splitted_shard = Shard::FromSplitData(std::move(splitted_data));
// Call gc on new shard
splitted_shard->CollectGarbage(GetNextHlc().coordinator_wall_clock);
// Destroy splitted shard
}
storage.CollectGarbage(GetNextHlc().coordinator_wall_clock);
AssertShardState(storage, 1, 3);
}
TEST_F(ShardSplitTest, TestBigSplit) {
int pk{0};
for (int64_t i{0}; i < 10'000; ++i) {
auto acc = storage.Access(GetNextHlc());
EXPECT_FALSE(
acc.CreateVertexAndValidate({secondary_label}, {PropertyValue(pk++)}, {{secondary_property, PropertyValue(i)}})
.HasError());
EXPECT_FALSE(acc.CreateVertexAndValidate({}, {PropertyValue(pk++)}, {}).HasError());
EXPECT_FALSE(acc.CreateEdge(VertexId{primary_label, PrimaryKey{PropertyValue(pk - 2)}},
VertexId{primary_label, PrimaryKey{PropertyValue(pk - 1)}}, edge_type_id,
Gid::FromUint(pk))
.HasError());
acc.Commit(GetNextHlc());
}
storage.CreateIndex(secondary_label, secondary_property);
const auto split_value = pk / 2;
auto splitted_data = storage.PerformSplit({PropertyValue(split_value)}, 2);
EXPECT_EQ(splitted_data.vertices.size(), 10000);
EXPECT_EQ(splitted_data.edges->size(), 5000);
EXPECT_EQ(splitted_data.transactions.size(), 5000);
EXPECT_EQ(splitted_data.label_indices.size(), 0);
EXPECT_EQ(splitted_data.label_property_indices.size(), 1);
auto shard = Shard::FromSplitData(std::move(splitted_data));
AssertShardState(*shard, split_value, split_value * 2);
}
} // namespace memgraph::storage::v3::tests