Extract splitter

This commit is contained in:
jbajic 2023-01-16 14:54:06 +01:00
parent 8a1dd54735
commit 348b45360b
5 changed files with 259 additions and 200 deletions

View File

@ -18,6 +18,7 @@ set(storage_v3_src_files
bindings/typed_value.cpp
expr.cpp
vertex.cpp
splitter.cpp
request_helper.cpp)
# ######################

View File

@ -332,7 +332,8 @@ Shard::Shard(const LabelId primary_label, const PrimaryKey min_primary_key,
vertex_validator_{schema_validator_, primary_label},
indices_{config.items, vertex_validator_},
isolation_level_{config.transaction.isolation_level},
config_{config} {
config_{config},
shard_splitter_(vertices_, edges_, start_logical_id_to_transaction_, config_) {
CreateSchema(primary_label_, schema);
StoreMapping(std::move(id_to_name));
}
@ -1056,158 +1057,7 @@ std::optional<SplitInfo> Shard::ShouldSplit() const noexcept {
return std::nullopt;
}
void Shard::ScanDeltas(std::set<uint64_t> &collected_transactions_start_id, Delta *delta) const {
while (delta != nullptr) {
collected_transactions_start_id.insert(delta->commit_info->start_or_commit_timestamp.logical_id);
delta = delta->next;
}
}
/// Moves every vertex whose primary key is not less than `split_key` out of
/// `vertices_` and into a freshly built container.
///
/// While doing so, the delta chain of each moved vertex is scanned and the
/// logical ids found on it are added to `collected_transactions_start_id`.
///
/// @param collected_transactions_start_id set receiving the logical ids seen on
///        the moved vertices' delta chains.
/// @param split_key first primary key that belongs to the split-off part.
/// @return the vertices that were removed from this shard.
VertexContainer Shard::CollectVertices(std::set<uint64_t> &collected_transactions_start_id,
                                       const PrimaryKey &split_key) {
  VertexContainer splitted_data;
  // lower_bound instead of find: when `split_key` is a stored key both yield
  // the same position, but when it is not stored find() returns end() and the
  // split would silently move nothing.
  auto split_key_it = vertices_.lower_bound(split_key);
  while (split_key_it != vertices_.end()) {
    // Go through deltas and pick up transactions start_id
    ScanDeltas(collected_transactions_start_id, split_key_it->second.delta);
    // The successor has to be taken before the node is unlinked.
    auto next_it = std::next(split_key_it);
    // Extract by position; extracting by key would repeat the lookup.
    splitted_data.insert(vertices_.extract(split_key_it));
    split_key_it = next_it;
  }
  return splitted_data;
}
std::optional<EdgeContainer> Shard::CollectEdges(std::set<uint64_t> &collected_transactions_start_id,
const VertexContainer &split_vertices, const PrimaryKey &split_key) {
if (!config_.items.properties_on_edges) {
return std::nullopt;
}
EdgeContainer splitted_edges;
const auto split_edges_from_vertex = [&](const auto &edges_ref) {
// This is safe since if properties_on_edges is true, the this must be a
// ptr
for (const auto &edge_ref : edges_ref) {
auto *edge = std::get<2>(edge_ref).ptr;
const auto &other_vtx = std::get<1>(edge_ref);
ScanDeltas(collected_transactions_start_id, edge->delta);
// Check if src and dest edge are both on splitted shard
// so we know if we should remove orphan edge
if (other_vtx.primary_key >= split_key) {
// Remove edge from shard
splitted_edges.insert(edges_.extract(edge->gid));
} else {
splitted_edges.insert({edge->gid, Edge{edge->gid, edge->delta}});
}
}
};
for (const auto &vertex : split_vertices) {
split_edges_from_vertex(vertex.second.in_edges);
split_edges_from_vertex(vertex.second.out_edges);
}
return splitted_edges;
}
/// Rewrites the `next` / `prev` links of the deltas in `cloned_transaction`.
///
/// `transaction.deltas` and `cloned_transaction.deltas` are walked in lockstep
/// (matching uuids are asserted). For each pair, the original delta's `next`
/// chain is followed and the clone's links are set from `cloned_transactions`,
/// `cloned_vertices` and `cloned_edges`.
///
/// @param cloned_transaction clone whose deltas are modified.
/// @param transaction original the clone was made from; only read.
/// @param cloned_transactions all clones, keyed by logical id; used to look up
///        cloned deltas by uuid.
/// @param cloned_vertices container searched for the target of VERTEX prev pointers.
/// @param cloned_edges container searched for the target of EDGE prev pointers.
void Shard::AlignClonedTransaction(Transaction &cloned_transaction, const Transaction &transaction,
std::map<uint64_t, Transaction> &cloned_transactions,
VertexContainer &cloned_vertices, EdgeContainer &cloned_edges) {
// Align next and prev in deltas
// NOTE It is important that both delta lists are in the same order
auto delta_it = transaction.deltas.begin();
auto cloned_delta_it = cloned_transaction.deltas.begin();
while (delta_it != transaction.deltas.end() && cloned_delta_it != cloned_transaction.deltas.end()) {
MG_ASSERT(delta_it->uuid == cloned_delta_it->uuid, "The order of deltas is not correct");
// Find appropriate prev and delta->next for cloned deltas
// auto *prev = &delta_it->prev;
// auto *cloned_prev = &cloned_delta_it->prev;
const auto *delta = &*delta_it;
auto *cloned_delta = &*cloned_delta_it;
while (delta != nullptr) {
// Align delta, while ignoring deltas whose transactions have committed
// or aborted (i.e. whose logical id has no entry in cloned_transactions)
if (cloned_transactions.contains(delta->commit_info->start_or_commit_timestamp.logical_id)) {
// NOTE(review): the lookup matches on delta->uuid (the current delta), not
// on delta->next; on the first pass this appears to resolve to the clone of
// the current delta itself -- confirm this is the intended link target.
// NOTE(review): the find_if result is dereferenced without comparing it
// against end().
cloned_delta->next = &*std::ranges::find_if(
cloned_transactions.at(delta->commit_info->start_or_commit_timestamp.logical_id).deltas,
[delta](const auto &elem) { return elem.uuid == delta->uuid; });
} else {
// Skipped delta: only the original side advances, cloned_delta stays put.
delta = delta->next;
continue;
}
// Align prev ptr
auto ptr = delta->prev.Get();
switch (ptr.type) {
case PreviousPtr::Type::NULLPTR: {
// noop
break;
}
case PreviousPtr::Type::DELTA: {
// NOTE(review): ptr.delta is the pointer read from the original delta and is
// stored unchanged -- presumably it still refers to a non-cloned delta; confirm.
cloned_delta->prev.Set(ptr.delta);
break;
}
case PreviousPtr::Type::VERTEX: {
// What if the vertex is already moved to garbage collection...
// Make test when you have deleted vertex
// NOTE(review): find() result is dereferenced without an end() check.
auto *cloned_vertex = &*cloned_vertices.find(ptr.vertex->first);
cloned_delta->prev.Set(cloned_vertex);
break;
}
case PreviousPtr::Type::EDGE: {
// TODO Case when there are no properties on edge is not handled
// NOTE(review): find() result is dereferenced without an end() check.
auto *cloned_edge = &*cloned_edges.find(ptr.edge->gid);
cloned_delta->prev.Set(&cloned_edge->second);
break;
}
};
cloned_delta = cloned_delta->next;
delta = delta->next;
}
++delta_it;
++cloned_delta_it;
}
// Both lists must have had the same length.
MG_ASSERT(delta_it == transaction.deltas.end() && cloned_delta_it == cloned_transaction.deltas.end(),
"Both iterators must be exhausted!");
}
void Shard::AlignClonedTransactions(std::map<uint64_t, Transaction> &cloned_transactions,
VertexContainer &cloned_vertices, EdgeContainer &cloned_edges) {
for (auto &[commit_start, cloned_transaction] : cloned_transactions) {
AlignClonedTransaction(cloned_transaction, *start_logical_id_to_transaction_[commit_start], cloned_transactions,
cloned_vertices, cloned_edges);
}
}
std::map<uint64_t, Transaction> Shard::CollectTransactions(const std::set<uint64_t> &collected_transactions_start_id,
VertexContainer &cloned_vertices,
EdgeContainer &cloned_edges) {
std::map<uint64_t, Transaction> transactions;
for (const auto commit_start : collected_transactions_start_id) {
// If it does not contain then the transaction has commited, and we ignore it
if (start_logical_id_to_transaction_.contains(commit_start)) {
transactions.insert({commit_start, start_logical_id_to_transaction_[commit_start]->Clone()});
}
}
// It is necessary to clone all the transactions first so we have new addresses
// for deltas, before doing alignment of deltas and prev_ptr
AlignClonedTransactions(transactions, cloned_vertices, cloned_edges);
return transactions;
}
/// Splits off everything at or past `split_key`: collects the vertices, the
/// edges (when available) and clones of the transactions referenced by their
/// delta chains.
///
/// @param split_key first primary key that belongs to the split-off part.
/// @return the collected data; `indices_info` is left default-constructed.
SplitData Shard::PerformSplit(const PrimaryKey &split_key) {
  SplitData data;
  std::set<uint64_t> collected_transactions_start_id;
  data.vertices = CollectVertices(collected_transactions_start_id, split_key);
  data.edges = CollectEdges(collected_transactions_start_id, data.vertices, split_key);

  // CollectEdges returns std::nullopt when properties on edges are disabled;
  // dereferencing the empty optional is undefined behaviour, so hand an empty
  // container to CollectTransactions in that case.
  EdgeContainer no_edges;
  EdgeContainer &cloned_edges = data.edges.has_value() ? *data.edges : no_edges;
  data.transactions = CollectTransactions(collected_transactions_start_id, data.vertices, cloned_edges);

  // TODO indices wont work since timestamp cannot be replicated
  // data.indices_info = {indices_.label_index.ListIndices(), indices_.label_property_index.ListIndices()};
  return data;
}
/// Splits this shard at `split_key` by delegating to `shard_splitter_`.
///
/// @param split_key key handed to Splitter::SplitShard unchanged.
/// @return whatever Splitter::SplitShard produced.
SplitData Shard::PerformSplit(const PrimaryKey &split_key) {
  auto split_data = shard_splitter_.SplitShard(split_key);
  return split_data;
}
bool Shard::IsVertexBelongToShard(const VertexId &vertex_id) const {
return vertex_id.primary_label == primary_label_ && vertex_id.primary_key >= min_primary_key_ &&

View File

@ -37,6 +37,7 @@
#include "storage/v3/result.hpp"
#include "storage/v3/schema_validator.hpp"
#include "storage/v3/schemas.hpp"
#include "storage/v3/splitter.hpp"
#include "storage/v3/transaction.hpp"
#include "storage/v3/vertex.hpp"
#include "storage/v3/vertex_accessor.hpp"
@ -179,15 +180,6 @@ struct SplitInfo {
PrimaryKey split_point;
};
// Result of splitting a shard.
// If properties-on-edges is false then we don't need to send edges but
// only vertices, since they will contain those edges
struct SplitData {
// Vertices moved out of the shard.
VertexContainer vertices;
// Edges accompanying the vertices; empty optional when no edges are sent.
std::optional<EdgeContainer> edges;
// NOTE(review): the only assignment visible in PerformSplit is commented out,
// so this appears to stay default-constructed for now.
IndicesInfo indices_info;
// Cloned transactions, keyed by logical id.
std::map<uint64_t, Transaction> transactions;
};
/// Structure used to return information about the storage.
struct StorageInfo {
uint64_t vertex_count;
@ -379,44 +371,6 @@ class Shard final {
SplitData PerformSplit(const PrimaryKey &split_key);
private:
// Rebuilds the delta chain of `delta_holder` (an Edge or a VertexData) from
// the deltas stored in `transactions`, matching deltas by uuid, and stores the
// result back into `delta_holder.delta`.
//
// NOTE(review): as written this template looks internally inconsistent and is
// presumably never instantiated -- see the notes on individual lines.
template <typename TObj>
requires utils::SameAsAnyOf<TObj, Edge, VertexData>
void AdjustSplittedDataDeltas(TObj &delta_holder, const std::map<int64_t, Transaction> &transactions) {
auto *delta_chain = delta_holder.delta;
Delta *new_delta_chain{nullptr};
while (delta_chain != nullptr) {
// NOTE(review): the map is keyed by int64_t and looked up by command_id,
// while the other split helpers key transactions by uint64_t logical id.
auto &transaction = transactions.at(delta_chain->command_id);
// This is the address of corresponding delta
// NOTE(review): `transaction` is a reference to a const Transaction, yet it
// is accessed with `->`; the result is also dereferenced below without an
// end() check.
const auto transaction_delta_it = std::ranges::find_if(
transaction->deltas, [delta_uuid = delta_chain->uuid](const auto &elem) { return elem.uuid == delta_uuid; });
// Add this delta to the new chain
if (new_delta_chain == nullptr) {
new_delta_chain = &*transaction_delta_it;
} else {
// NOTE(review): new_delta_chain is never advanced, so every later iteration
// overwrites the head's `next` instead of appending to the chain.
new_delta_chain->next = &*transaction_delta_it;
}
delta_chain = delta_chain->next;
}
delta_holder.delta = new_delta_chain;
}
void ScanDeltas(std::set<uint64_t> &collected_transactions_start_id, Delta *delta) const;
void AlignClonedTransaction(Transaction &cloned_transaction, const Transaction &transaction,
std::map<uint64_t, Transaction> &cloned_transactions, VertexContainer &cloned_vertices,
EdgeContainer &cloned_edges);
void AlignClonedTransactions(std::map<uint64_t, Transaction> &cloned_transactions, VertexContainer &cloned_vertices,
EdgeContainer &cloned_edges);
std::map<uint64_t, Transaction> CollectTransactions(const std::set<uint64_t> &collected_transactions_start_id,
VertexContainer &cloned_vertices, EdgeContainer &cloned_edges);
VertexContainer CollectVertices(std::set<uint64_t> &collected_transactions_start_id, const PrimaryKey &split_key);
std::optional<EdgeContainer> CollectEdges(std::set<uint64_t> &collected_transactions_start_id,
const VertexContainer &split_vertices, const PrimaryKey &split_key);
Transaction &GetTransaction(coordinator::Hlc start_timestamp, IsolationLevel isolation_level);
uint64_t CommitTimestamp(std::optional<uint64_t> desired_commit_timestamp = {});
@ -456,6 +410,7 @@ class Shard final {
// Holds all of the (in progress, committed and aborted) transactions that are read or write to this shard, but
// haven't been cleaned up yet
std::map<uint64_t, std::unique_ptr<Transaction>> start_logical_id_to_transaction_{};
Splitter shard_splitter_;
bool has_any_transaction_aborted_since_last_gc{false};
};

183
src/storage/v3/splitter.cpp Normal file
View File

@ -0,0 +1,183 @@
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#include "storage/v3/splitter.hpp"
#include <map>
#include <memory>
#include <set>
#include "storage/v3/config.hpp"
#include "storage/v3/key_store.hpp"
#include "storage/v3/transaction.hpp"
namespace memgraph::storage::v3 {
Splitter::Splitter(VertexContainer &vertices, EdgeContainer &edges,
std::map<uint64_t, std::unique_ptr<Transaction>> &start_logical_id_to_transaction, Config &config)
: vertices_(vertices),
edges_(edges),
start_logical_id_to_transaction_(start_logical_id_to_transaction),
config_(config) {}
/// Splits off everything at or past `split_key`: collects the vertices, the
/// edges (when available) and clones of the transactions referenced by their
/// delta chains.
///
/// @param split_key first primary key that belongs to the split-off part.
/// @return the collected vertices, optional edges and cloned transactions.
SplitData Splitter::SplitShard(const PrimaryKey &split_key) {
  SplitData data;
  std::set<uint64_t> collected_transactions_start_id;
  data.vertices = CollectVertices(collected_transactions_start_id, split_key);
  data.edges = CollectEdges(collected_transactions_start_id, data.vertices, split_key);

  // CollectEdges returns std::nullopt when properties on edges are disabled;
  // dereferencing the empty optional is undefined behaviour, so hand an empty
  // container to CollectTransactions in that case.
  EdgeContainer no_edges;
  EdgeContainer &cloned_edges = data.edges.has_value() ? *data.edges : no_edges;
  data.transactions = CollectTransactions(collected_transactions_start_id, data.vertices, cloned_edges);

  // TODO Indices
  return data;
}
void Splitter::ScanDeltas(std::set<uint64_t> &collected_transactions_start_id, Delta *delta) {
while (delta != nullptr) {
collected_transactions_start_id.insert(delta->commit_info->start_or_commit_timestamp.logical_id);
delta = delta->next;
}
}
/// Moves every vertex whose primary key is not less than `split_key` out of
/// `vertices_` and into a freshly built container.
///
/// While doing so, the delta chain of each moved vertex is scanned and the
/// logical ids found on it are added to `collected_transactions_start_id`.
///
/// @param collected_transactions_start_id set receiving the logical ids seen on
///        the moved vertices' delta chains.
/// @param split_key first primary key that belongs to the split-off part.
/// @return the vertices that were removed from the source container.
VertexContainer Splitter::CollectVertices(std::set<uint64_t> &collected_transactions_start_id,
                                          const PrimaryKey &split_key) {
  VertexContainer splitted_data;
  // lower_bound instead of find: when `split_key` is a stored key both yield
  // the same position, but when it is not stored find() returns end() and the
  // split would silently move nothing.
  auto split_key_it = vertices_.lower_bound(split_key);
  while (split_key_it != vertices_.end()) {
    // Go through deltas and pick up transactions start_id
    ScanDeltas(collected_transactions_start_id, split_key_it->second.delta);
    // The successor has to be taken before the node is unlinked.
    auto next_it = std::next(split_key_it);
    // Extract by position; extracting by key would repeat the lookup.
    splitted_data.insert(vertices_.extract(split_key_it));
    split_key_it = next_it;
  }
  return splitted_data;
}
std::optional<EdgeContainer> Splitter::CollectEdges(std::set<uint64_t> &collected_transactions_start_id,
const VertexContainer &split_vertices,
const PrimaryKey &split_key) {
if (!config_.items.properties_on_edges) {
return std::nullopt;
}
EdgeContainer splitted_edges;
const auto split_edges_from_vertex = [&](const auto &edges_ref) {
// This is safe since if properties_on_edges is true, the this must be a
// ptr
for (const auto &edge_ref : edges_ref) {
auto *edge = std::get<2>(edge_ref).ptr;
const auto &other_vtx = std::get<1>(edge_ref);
ScanDeltas(collected_transactions_start_id, edge->delta);
// Check if src and dest edge are both on splitted shard
// so we know if we should remove orphan edge
if (other_vtx.primary_key >= split_key) {
// Remove edge from shard
splitted_edges.insert(edges_.extract(edge->gid));
} else {
splitted_edges.insert({edge->gid, Edge{edge->gid, edge->delta}});
}
}
};
for (const auto &vertex : split_vertices) {
split_edges_from_vertex(vertex.second.in_edges);
split_edges_from_vertex(vertex.second.out_edges);
}
return splitted_edges;
}
std::map<uint64_t, Transaction> Splitter::CollectTransactions(const std::set<uint64_t> &collected_transactions_start_id,
VertexContainer &cloned_vertices,
EdgeContainer &cloned_edges) {
std::map<uint64_t, Transaction> transactions;
for (const auto commit_start : collected_transactions_start_id) {
// If it does not contain then the transaction has commited, and we ignore it
if (start_logical_id_to_transaction_.contains(commit_start)) {
transactions.insert({commit_start, start_logical_id_to_transaction_[commit_start]->Clone()});
}
}
// It is necessary to clone all the transactions first so we have new addresses
// for deltas, before doing alignment of deltas and prev_ptr
AlignClonedTransactions(transactions, cloned_vertices, cloned_edges);
return transactions;
}
/// Rewrites the `next` / `prev` links of the deltas in `cloned_transaction`.
///
/// `transaction.deltas` and `cloned_transaction.deltas` are walked in lockstep
/// (matching uuids are asserted). For each pair, the original delta's `next`
/// chain is followed and the clone's links are set from `cloned_transactions`,
/// `cloned_vertices` and `cloned_edges`.
///
/// @param cloned_transaction clone whose deltas are modified.
/// @param transaction original the clone was made from; only read.
/// @param cloned_transactions all clones, keyed by logical id; used to look up
///        cloned deltas by uuid.
/// @param cloned_vertices container searched for the target of VERTEX prev pointers.
/// @param cloned_edges container searched for the target of EDGE prev pointers.
void Splitter::AlignClonedTransaction(Transaction &cloned_transaction, const Transaction &transaction,
std::map<uint64_t, Transaction> &cloned_transactions,
VertexContainer &cloned_vertices, EdgeContainer &cloned_edges) {
// Align next and prev in deltas
// NOTE It is important that both delta lists are in the same order
auto delta_it = transaction.deltas.begin();
auto cloned_delta_it = cloned_transaction.deltas.begin();
while (delta_it != transaction.deltas.end() && cloned_delta_it != cloned_transaction.deltas.end()) {
MG_ASSERT(delta_it->uuid == cloned_delta_it->uuid, "The order of deltas is not correct");
// Find appropriate prev and delta->next for cloned deltas
// auto *prev = &delta_it->prev;
// auto *cloned_prev = &cloned_delta_it->prev;
const auto *delta = &*delta_it;
auto *cloned_delta = &*cloned_delta_it;
while (delta != nullptr) {
// Align delta, while ignoring deltas whose transactions have committed
// or aborted (i.e. whose logical id has no entry in cloned_transactions)
if (cloned_transactions.contains(delta->commit_info->start_or_commit_timestamp.logical_id)) {
// NOTE(review): the lookup matches on delta->uuid (the current delta), not
// on delta->next; on the first pass this appears to resolve to the clone of
// the current delta itself -- confirm this is the intended link target.
// NOTE(review): the find_if result is dereferenced without comparing it
// against end().
cloned_delta->next = &*std::ranges::find_if(
cloned_transactions.at(delta->commit_info->start_or_commit_timestamp.logical_id).deltas,
[delta](const auto &elem) { return elem.uuid == delta->uuid; });
} else {
// Skipped delta: only the original side advances, cloned_delta stays put.
delta = delta->next;
continue;
}
// Align prev ptr
auto ptr = delta->prev.Get();
switch (ptr.type) {
case PreviousPtr::Type::NULLPTR: {
// noop
break;
}
case PreviousPtr::Type::DELTA: {
// NOTE(review): ptr.delta is the pointer read from the original delta and is
// stored unchanged -- presumably it still refers to a non-cloned delta; confirm.
cloned_delta->prev.Set(ptr.delta);
break;
}
case PreviousPtr::Type::VERTEX: {
// What if the vertex is already moved to garbage collection...
// Make test when you have deleted vertex
// NOTE(review): find() result is dereferenced without an end() check.
auto *cloned_vertex = &*cloned_vertices.find(ptr.vertex->first);
cloned_delta->prev.Set(cloned_vertex);
break;
}
case PreviousPtr::Type::EDGE: {
// TODO Case when there are no properties on edge is not handled
// NOTE(review): find() result is dereferenced without an end() check.
auto *cloned_edge = &*cloned_edges.find(ptr.edge->gid);
cloned_delta->prev.Set(&cloned_edge->second);
break;
}
};
cloned_delta = cloned_delta->next;
delta = delta->next;
}
++delta_it;
++cloned_delta_it;
}
// Both lists must have had the same length.
MG_ASSERT(delta_it == transaction.deltas.end() && cloned_delta_it == cloned_transaction.deltas.end(),
"Both iterators must be exhausted!");
}
void Splitter::AlignClonedTransactions(std::map<uint64_t, Transaction> &cloned_transactions,
VertexContainer &cloned_vertices, EdgeContainer &cloned_edges) {
for (auto &[commit_start, cloned_transaction] : cloned_transactions) {
AlignClonedTransaction(cloned_transaction, *start_logical_id_to_transaction_[commit_start], cloned_transactions,
cloned_vertices, cloned_edges);
}
}
} // namespace memgraph::storage::v3

View File

@ -0,0 +1,70 @@
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#pragma once

#include <cstdint>
#include <map>
#include <memory>
#include <optional>
#include <set>

#include "storage/v3/config.hpp"
#include "storage/v3/delta.hpp"
#include "storage/v3/edge.hpp"
#include "storage/v3/transaction.hpp"
#include "storage/v3/vertex.hpp"
namespace memgraph::storage::v3 {
// Result of splitting a shard.
// If properties-on-edges is false then we don't need to send edges but
// only vertices, since they will contain those edges
struct SplitData {
// Vertices moved out of the shard.
VertexContainer vertices;
// Edges accompanying the vertices; empty optional when no edges are sent.
std::optional<EdgeContainer> edges;
// Cloned transactions, keyed by logical id.
std::map<uint64_t, Transaction> transactions;
};
/// Splits off the part of a shard that lies at or past a given primary key.
///
/// The splitter holds references to the vertex container, edge container,
/// transaction map and configuration it was constructed with; it does not own
/// any of them, so they must outlive it. It is neither copyable nor movable.
class Splitter final {
 public:
  /// Binds the splitter to the given containers and configuration.
  Splitter(VertexContainer &vertices, EdgeContainer &edges,
           std::map<uint64_t, std::unique_ptr<Transaction>> &start_logical_id_to_transaction, Config &config);

  Splitter(const Splitter &) = delete;
  Splitter(Splitter &&) noexcept = delete;
  Splitter &operator=(const Splitter &) = delete;
  // Declared with the conventional `Splitter &` return type; it was previously
  // declared as returning by value.
  Splitter &operator=(Splitter &&) noexcept = delete;
  ~Splitter() = default;

  /// Collects the vertices, edges and cloned transactions that belong to the
  /// split-off part starting at `split_key`.
  SplitData SplitShard(const PrimaryKey &split_key);

 private:
  /// Records the logical id of every delta on the chain starting at `delta`.
  static void ScanDeltas(std::set<uint64_t> &collected_transactions_start_id, Delta *delta);

  /// Re-links the deltas of one cloned transaction against the cloned data.
  void AlignClonedTransaction(Transaction &cloned_transaction, const Transaction &transaction,
                              std::map<uint64_t, Transaction> &cloned_transactions, VertexContainer &cloned_vertices,
                              EdgeContainer &cloned_edges);

  /// Applies AlignClonedTransaction to every cloned transaction.
  void AlignClonedTransactions(std::map<uint64_t, Transaction> &cloned_transactions, VertexContainer &cloned_vertices,
                               EdgeContainer &cloned_edges);

  /// Clones the still-tracked transactions named by the collected ids.
  std::map<uint64_t, Transaction> CollectTransactions(const std::set<uint64_t> &collected_transactions_start_id,
                                                      VertexContainer &cloned_vertices, EdgeContainer &cloned_edges);

  /// Extracts the vertices at or past `split_key` from `vertices_`.
  VertexContainer CollectVertices(std::set<uint64_t> &collected_transactions_start_id, const PrimaryKey &split_key);

  /// Collects the edges of `split_vertices`; std::nullopt when properties on
  /// edges are disabled.
  std::optional<EdgeContainer> CollectEdges(std::set<uint64_t> &collected_transactions_start_id,
                                            const VertexContainer &split_vertices, const PrimaryKey &split_key);

  // Non-owning references supplied at construction.
  VertexContainer &vertices_;
  EdgeContainer &edges_;
  std::map<uint64_t, std::unique_ptr<Transaction>> &start_logical_id_to_transaction_;
  Config &config_;
};
} // namespace memgraph::storage::v3