Create new shard

This commit is contained in:
jbajic 2023-01-23 13:23:21 +01:00
parent 6ab76fb9ec
commit 97002a50d5
5 changed files with 109 additions and 40 deletions

View File

@ -333,7 +333,46 @@ Shard::Shard(const LabelId primary_label, const PrimaryKey min_primary_key,
indices_{config.items, vertex_validator_},
isolation_level_{config.transaction.isolation_level},
config_{config},
shard_splitter_(vertices_, edges_, start_logical_id_to_transaction_, indices_, config_) {
shard_splitter_(primary_label, vertices_, edges_, start_logical_id_to_transaction_, indices_, config_, schema) {
CreateSchema(primary_label_, schema);
StoreMapping(std::move(id_to_name));
}
Shard::Shard(LabelId primary_label, PrimaryKey min_primary_key, std::optional<PrimaryKey> max_primary_key,
std::vector<SchemaProperty> schema, VertexContainer &&vertices, EdgeContainer &&edges,
std::map<uint64_t, std::unique_ptr<Transaction>> &&start_logical_id_to_transaction, Config config,
std::unordered_map<uint64_t, std::string> id_to_name)
: primary_label_{primary_label},
min_primary_key_{min_primary_key},
max_primary_key_{max_primary_key},
vertices_(std::move(vertices)),
edges_(std::move(edges)),
schema_validator_{schemas_, name_id_mapper_},
vertex_validator_{schema_validator_, primary_label},
indices_{config.items, vertex_validator_},
isolation_level_{config.transaction.isolation_level},
config_{config},
start_logical_id_to_transaction_(std::move(start_logical_id_to_transaction)),
shard_splitter_(primary_label, vertices_, edges_, start_logical_id_to_transaction_, indices_, config_, schema) {
CreateSchema(primary_label_, schema);
StoreMapping(std::move(id_to_name));
}
Shard::Shard(LabelId primary_label, PrimaryKey min_primary_key, std::optional<PrimaryKey> max_primary_key,
std::vector<SchemaProperty> schema, VertexContainer &&vertices,
std::map<uint64_t, std::unique_ptr<Transaction>> &&start_logical_id_to_transaction, Config config,
std::unordered_map<uint64_t, std::string> id_to_name)
: primary_label_{primary_label},
min_primary_key_{min_primary_key},
max_primary_key_{max_primary_key},
vertices_(std::move(vertices)),
schema_validator_{schemas_, name_id_mapper_},
vertex_validator_{schema_validator_, primary_label},
indices_{config.items, vertex_validator_},
isolation_level_{config.transaction.isolation_level},
config_{config},
start_logical_id_to_transaction_(std::move(start_logical_id_to_transaction)),
shard_splitter_(primary_label, vertices_, edges_, start_logical_id_to_transaction_, indices_, config_, schema) {
CreateSchema(primary_label_, schema);
StoreMapping(std::move(id_to_name));
}
@ -434,7 +473,7 @@ ShardResult<std::optional<std::pair<VertexAccessor, std::vector<EdgeAccessor>>>>
}
std::vector<EdgeAccessor> deleted_edges;
const VertexId vertex_id{shard_->primary_label_, *vertex->PrimaryKey(View::OLD)}; // TODO Replace
const VertexId vertex_id{shard_->primary_label_, *vertex->PrimaryKey(View::OLD)};
for (const auto &item : in_edges) {
auto [edge_type, from_vertex, edge] = item;
EdgeAccessor e(edge, edge_type, from_vertex, vertex_id, transaction_, &shard_->indices_, config_);
@ -1057,7 +1096,9 @@ std::optional<SplitInfo> Shard::ShouldSplit() const noexcept {
return std::nullopt;
}
SplitData Shard::PerformSplit(const PrimaryKey &split_key) { return shard_splitter_.SplitShard(split_key); }
std::unique_ptr<Shard> Shard::PerformSplit(const PrimaryKey &split_key) {
return shard_splitter_.SplitShard(split_key, max_primary_key_);
}
bool Shard::IsVertexBelongToShard(const VertexId &vertex_id) const {
return vertex_id.primary_label == primary_label_ && vertex_id.primary_key >= min_primary_key_ &&

View File

@ -14,6 +14,7 @@
#include <cstdint>
#include <filesystem>
#include <map>
#include <memory>
#include <numeric>
#include <optional>
#include <shared_mutex>
@ -192,9 +193,19 @@ class Shard final {
public:
/// @throw std::system_error
/// @throw std::bad_alloc
explicit Shard(LabelId primary_label, PrimaryKey min_primary_key, std::optional<PrimaryKey> max_primary_key,
std::vector<SchemaProperty> schema, Config config = Config(),
std::unordered_map<uint64_t, std::string> id_to_name = {});
Shard(LabelId primary_label, PrimaryKey min_primary_key, std::optional<PrimaryKey> max_primary_key,
std::vector<SchemaProperty> schema, Config config = Config(),
std::unordered_map<uint64_t, std::string> id_to_name = {});
Shard(LabelId primary_label, PrimaryKey min_primary_key, std::optional<PrimaryKey> max_primary_key,
std::vector<SchemaProperty> schema, VertexContainer &&vertices, EdgeContainer &&edges,
std::map<uint64_t, std::unique_ptr<Transaction>> &&start_logical_id_to_transaction, Config config = Config(),
std::unordered_map<uint64_t, std::string> id_to_name = {});
Shard(LabelId primary_label, PrimaryKey min_primary_key, std::optional<PrimaryKey> max_primary_key,
std::vector<SchemaProperty> schema, VertexContainer &&vertices,
std::map<uint64_t, std::unique_ptr<Transaction>> &&start_logical_id_to_transaction, Config config = Config(),
std::unordered_map<uint64_t, std::string> id_to_name = {});
Shard(const Shard &) = delete;
Shard(Shard &&) noexcept = delete;
@ -368,7 +379,7 @@ class Shard final {
std::optional<SplitInfo> ShouldSplit() const noexcept;
SplitData PerformSplit(const PrimaryKey &split_key);
std::unique_ptr<Shard> PerformSplit(const PrimaryKey &split_key);
private:
Transaction &GetTransaction(coordinator::Hlc start_timestamp, IsolationLevel isolation_level);

View File

@ -17,23 +17,29 @@
#include <set>
#include "storage/v3/config.hpp"
#include "storage/v3/id_types.hpp"
#include "storage/v3/indices.hpp"
#include "storage/v3/key_store.hpp"
#include "storage/v3/schemas.hpp"
#include "storage/v3/shard.hpp"
#include "storage/v3/transaction.hpp"
#include "storage/v3/vertex.hpp"
namespace memgraph::storage::v3 {
Splitter::Splitter(VertexContainer &vertices, EdgeContainer &edges,
Splitter::Splitter(const LabelId primary_label, VertexContainer &vertices, EdgeContainer &edges,
std::map<uint64_t, std::unique_ptr<Transaction>> &start_logical_id_to_transaction, Indices &indices,
Config &config)
: vertices_(vertices),
Config &config, const std::vector<SchemaProperty> &schema)
: primary_label_(primary_label),
vertices_(vertices),
edges_(edges),
start_logical_id_to_transaction_(start_logical_id_to_transaction),
indices_(indices),
config_(config) {}
config_(config),
schema_(schema) {}
SplitData Splitter::SplitShard(const PrimaryKey &split_key) {
std::unique_ptr<Shard> Splitter::SplitShard(const PrimaryKey &split_key,
const std::optional<PrimaryKey> &max_primary_key) {
SplitData data;
std::set<uint64_t> collected_transactions_;
@ -41,7 +47,12 @@ SplitData Splitter::SplitShard(const PrimaryKey &split_key) {
data.edges = CollectEdges(collected_transactions_, data.vertices, split_key);
data.transactions = CollectTransactions(collected_transactions_, data.vertices, *data.edges);
return data;
if (data.edges) {
return std::make_unique<Shard>(primary_label_, split_key, max_primary_key, schema_, std::move(data.vertices),
std::move(*data.edges), std::move(data.transactions), config_);
}
return std::make_unique<Shard>(primary_label_, split_key, max_primary_key, schema_, std::move(data.vertices),
std::move(data.transactions), config_);
}
void Splitter::ScanDeltas(std::set<uint64_t> &collected_transactions_, Delta *delta) {
@ -124,10 +135,9 @@ std::optional<EdgeContainer> Splitter::CollectEdges(std::set<uint64_t> &collecte
return splitted_edges;
}
std::map<uint64_t, Transaction> Splitter::CollectTransactions(const std::set<uint64_t> &collected_transactions_,
VertexContainer &cloned_vertices,
EdgeContainer &cloned_edges) {
std::map<uint64_t, Transaction> transactions;
std::map<uint64_t, std::unique_ptr<Transaction>> Splitter::CollectTransactions(
const std::set<uint64_t> &collected_transactions_, VertexContainer &cloned_vertices, EdgeContainer &cloned_edges) {
std::map<uint64_t, std::unique_ptr<Transaction>> transactions;
for (const auto &[commit_start, transaction] : start_logical_id_to_transaction_) {
// We need all transaction whose deltas need to be resolved for any of the
@ -139,21 +149,21 @@ std::map<uint64_t, Transaction> Splitter::CollectTransactions(const std::set<uin
// It is necessary to clone all the transactions first so we have new addresses
// for deltas, before doing alignment of deltas and prev_ptr
AlignClonedTransactions(transactions, cloned_vertices, cloned_edges);
AdjustClonedTransactions(transactions, cloned_vertices, cloned_edges);
return transactions;
}
void Splitter::AlignClonedTransactions(std::map<uint64_t, Transaction> &cloned_transactions,
VertexContainer &cloned_vertices, EdgeContainer &cloned_edges) {
void Splitter::AdjustClonedTransactions(std::map<uint64_t, std::unique_ptr<Transaction>> &cloned_transactions,
VertexContainer &cloned_vertices, EdgeContainer &cloned_edges) {
for (auto &[commit_start, cloned_transaction] : cloned_transactions) {
AlignClonedTransaction(cloned_transaction, *start_logical_id_to_transaction_[commit_start], cloned_transactions,
cloned_vertices, cloned_edges);
AdjustClonedTransaction(*cloned_transaction, *start_logical_id_to_transaction_[commit_start], cloned_transactions,
cloned_vertices, cloned_edges);
}
}
void Splitter::AlignClonedTransaction(Transaction &cloned_transaction, const Transaction &transaction,
std::map<uint64_t, Transaction> &cloned_transactions,
VertexContainer &cloned_vertices, EdgeContainer &cloned_edges) {
void Splitter::AdjustClonedTransaction(Transaction &cloned_transaction, const Transaction &transaction,
std::map<uint64_t, std::unique_ptr<Transaction>> &cloned_transactions,
VertexContainer &cloned_vertices, EdgeContainer &cloned_edges) {
// Align next and prev in deltas
// NOTE It is important that the order of delta lists is in same order
auto delta_it = transaction.deltas.begin();
@ -166,7 +176,7 @@ void Splitter::AlignClonedTransaction(Transaction &cloned_transaction, const Tra
// or aborted
if (cloned_transactions.contains(delta->commit_info->start_or_commit_timestamp.logical_id)) {
auto *found_delta_it = &*std::ranges::find_if(
cloned_transactions.at(delta->commit_info->start_or_commit_timestamp.logical_id).deltas,
cloned_transactions.at(delta->commit_info->start_or_commit_timestamp.logical_id)->deltas,
[delta](const auto &elem) { return elem.uuid == delta->uuid; });
MG_ASSERT(found_delta_it, "Delta with given uuid must exist!");
cloned_delta->next = &*found_delta_it;

View File

@ -8,6 +8,7 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#pragma once
#include <map>
#include <memory>
@ -17,7 +18,10 @@
#include "storage/v3/config.hpp"
#include "storage/v3/delta.hpp"
#include "storage/v3/edge.hpp"
#include "storage/v3/id_types.hpp"
#include "storage/v3/indices.hpp"
#include "storage/v3/key_store.hpp"
#include "storage/v3/schemas.hpp"
#include "storage/v3/transaction.hpp"
#include "storage/v3/vertex.hpp"
#include "utils/concepts.hpp"
@ -29,16 +33,16 @@ namespace memgraph::storage::v3 {
struct SplitData {
VertexContainer vertices;
std::optional<EdgeContainer> edges;
std::map<uint64_t, Transaction> transactions;
std::map<uint64_t, std::unique_ptr<Transaction>> transactions;
std::map<LabelId, LabelIndex::IndexContainer> label_indices;
std::map<std::pair<LabelId, PropertyId>, LabelPropertyIndex::IndexContainer> label_property_indices;
};
class Splitter final {
public:
Splitter(VertexContainer &vertices, EdgeContainer &edges,
Splitter(LabelId primary_label, VertexContainer &vertices, EdgeContainer &edges,
std::map<uint64_t, std::unique_ptr<Transaction>> &start_logical_id_to_transaction, Indices &indices,
Config &config);
Config &config, const std::vector<SchemaProperty> &schema);
Splitter(const Splitter &) = delete;
Splitter(Splitter &&) noexcept = delete;
@ -46,11 +50,12 @@ class Splitter final {
Splitter operator=(Splitter &&) noexcept = delete;
~Splitter() = default;
SplitData SplitShard(const PrimaryKey &split_key);
std::unique_ptr<Shard> SplitShard(const PrimaryKey &split_key, const std::optional<PrimaryKey> &max_primary_key);
private:
std::map<uint64_t, Transaction> CollectTransactions(const std::set<uint64_t> &collected_transactions_start_id,
VertexContainer &cloned_vertices, EdgeContainer &cloned_edges);
std::map<uint64_t, std::unique_ptr<Transaction>> CollectTransactions(
const std::set<uint64_t> &collected_transactions_start_id, VertexContainer &cloned_vertices,
EdgeContainer &cloned_edges);
VertexContainer CollectVertices(SplitData &data, std::set<uint64_t> &collected_transactions_start_id,
const PrimaryKey &split_key);
@ -85,18 +90,20 @@ class Splitter final {
static void ScanDeltas(std::set<uint64_t> &collected_transactions_start_id, Delta *delta);
static void AlignClonedTransaction(Transaction &cloned_transaction, const Transaction &transaction,
std::map<uint64_t, Transaction> &cloned_transactions,
VertexContainer &cloned_vertices, EdgeContainer &cloned_edges);
static void AdjustClonedTransaction(Transaction &cloned_transaction, const Transaction &transaction,
std::map<uint64_t, std::unique_ptr<Transaction>> &cloned_transactions,
VertexContainer &cloned_vertices, EdgeContainer &cloned_edges);
void AlignClonedTransactions(std::map<uint64_t, Transaction> &cloned_transactions, VertexContainer &cloned_vertices,
EdgeContainer &cloned_edges);
void AdjustClonedTransactions(std::map<uint64_t, std::unique_ptr<Transaction>> &cloned_transactions,
VertexContainer &cloned_vertices, EdgeContainer &cloned_edges);
LabelId primary_label_;
VertexContainer &vertices_;
EdgeContainer &edges_;
std::map<uint64_t, std::unique_ptr<Transaction>> &start_logical_id_to_transaction_;
Indices &indices_;
Config &config_;
std::vector<SchemaProperty> schema_;
};
} // namespace memgraph::storage::v3

View File

@ -106,9 +106,9 @@ struct Transaction {
}
// This does not solve the whole problem of copying deltas
Transaction Clone() const {
return {start_timestamp, *commit_info, CopyDeltas(commit_info.get()), command_id, must_abort,
is_aborted, isolation_level};
std::unique_ptr<Transaction> Clone() const {
return std::make_unique<Transaction>(start_timestamp, *commit_info, CopyDeltas(commit_info.get()), command_id,
must_abort, is_aborted, isolation_level);
}
coordinator::Hlc start_timestamp;