Add index split

This commit is contained in:
jbajic 2023-01-18 13:41:54 +01:00
parent 348b45360b
commit 859dfb28eb
4 changed files with 125 additions and 26 deletions

View File

@ -30,6 +30,7 @@ namespace memgraph::storage::v3 {
struct Indices;
class LabelIndex {
public:
struct Entry {
Vertex *vertex;
uint64_t timestamp;
@ -40,12 +41,15 @@ class LabelIndex {
bool operator==(const Entry &rhs) const { return vertex == rhs.vertex && timestamp == rhs.timestamp; }
};
public:
using LabelIndexContainer = std::set<Entry>;
LabelIndex(Indices *indices, Config::Items config, const VertexValidator &vertex_validator)
: indices_(indices), config_(config), vertex_validator_{&vertex_validator} {}
LabelIndex(Indices *indices, Config::Items config, const VertexValidator &vertex_validator,
std::map<LabelId, LabelIndexContainer> &data)
: index_{std::move(data)}, indices_(indices), config_(config), vertex_validator_{&vertex_validator} {}
/// @throw std::bad_alloc
void UpdateOnAddLabel(LabelId label, Vertex *vertex, const Transaction &tx);
@ -114,6 +118,10 @@ class LabelIndex {
void Clear() { index_.clear(); }
[[nodiscard]] bool Empty() const noexcept { return index_.empty(); }
std::map<LabelId, LabelIndexContainer> &GetIndex() { return index_; }
private:
std::map<LabelId, LabelIndexContainer> index_;
Indices *indices_;
@ -122,6 +130,7 @@ class LabelIndex {
};
class LabelPropertyIndex {
public:
struct Entry {
PropertyValue value;
Vertex *vertex;
@ -134,7 +143,6 @@ class LabelPropertyIndex {
bool operator==(const PropertyValue &rhs) const;
};
public:
using LabelPropertyIndexContainer = std::set<Entry>;
LabelPropertyIndex(Indices *indices, Config::Items config, const VertexValidator &vertex_validator)
@ -229,6 +237,10 @@ class LabelPropertyIndex {
void Clear() { index_.clear(); }
[[nodiscard]] bool Empty() const noexcept { return index_.empty(); }
std::map<std::pair<LabelId, PropertyId>, LabelPropertyIndexContainer> &GetIndex() { return index_; }
private:
std::map<std::pair<LabelId, PropertyId>, LabelPropertyIndexContainer> index_;
Indices *indices_;

View File

@ -333,7 +333,7 @@ Shard::Shard(const LabelId primary_label, const PrimaryKey min_primary_key,
indices_{config.items, vertex_validator_},
isolation_level_{config.transaction.isolation_level},
config_{config},
shard_splitter_(vertices_, edges_, start_logical_id_to_transaction_, config_) {
shard_splitter_(vertices_, edges_, start_logical_id_to_transaction_, indices_, config_) {
CreateSchema(primary_label_, schema);
StoreMapping(std::move(id_to_name));
}

View File

@ -13,28 +13,33 @@
#include <map>
#include <memory>
#include <optional>
#include <set>
#include "storage/v3/config.hpp"
#include "storage/v3/indices.hpp"
#include "storage/v3/key_store.hpp"
#include "storage/v3/transaction.hpp"
#include "storage/v3/vertex.hpp"
namespace memgraph::storage::v3 {
/// Constructs a Splitter over the given shard state.
///
/// Every argument is only bound to the corresponding member in the initializer
/// list below; the constructor body is empty, so nothing is copied or inspected
/// at construction time.
Splitter::Splitter(VertexContainer &vertices, EdgeContainer &edges,
std::map<uint64_t, std::unique_ptr<Transaction>> &start_logical_id_to_transaction, Config &config)
std::map<uint64_t, std::unique_ptr<Transaction>> &start_logical_id_to_transaction, Indices &indices,
Config &config)
: vertices_(vertices),
edges_(edges),
start_logical_id_to_transaction_(start_logical_id_to_transaction),
indices_(indices),
config_(config) {}
/// Collects everything that belongs to the split-off part of the shard.
///
/// Runs CollectVertices, CollectEdges and CollectTransactions in that order.
/// The vertex and edge steps add to one shared set of transaction start ids,
/// which the transaction step then consumes.
///
/// @param split_key key handed to the vertex and edge collection steps.
/// @return SplitData filled with the collected vertices, edges and transactions.
SplitData Splitter::SplitShard(const PrimaryKey &split_key) {
SplitData data;
// Filled by the vertex/edge steps, read by CollectTransactions.
std::set<uint64_t> collected_transactions_start_id;
data.vertices = CollectVertices(collected_transactions_start_id, split_key);
data.vertices = CollectVertices(data, collected_transactions_start_id, split_key);
data.edges = CollectEdges(collected_transactions_start_id, data.vertices, split_key);
// NOTE(review): data.edges is a std::optional and is dereferenced unchecked
// here, while CollectEdges has a path that returns std::nullopt — confirm that
// path cannot be taken before this call.
data.transactions = CollectTransactions(collected_transactions_start_id, data.vertices, *data.edges);
// TODO Indices
return data;
}
@ -46,16 +51,85 @@ void Splitter::ScanDeltas(std::set<uint64_t> &collected_transactions_start_id, D
}
}
VertexContainer Splitter::CollectVertices(std::set<uint64_t> &collected_transactions_start_id,
const PrimaryKey &split_key) {
VertexContainer splitted_data;
std::map<LabelId, LabelIndex::LabelIndexContainer> Splitter::CollectLabelIndices(
const PrimaryKey &split_key,
std::map<LabelId, std::multimap<const Vertex *, LabelIndex::Entry *>> &vertex_entry_map) {
if (indices_.label_index.Empty()) {
return {};
}
// Space O(i * n/2 * 2), i number of indexes, n number of vertices
std::map<LabelId, LabelIndex::LabelIndexContainer> cloned_indices;
for (auto &[label, index] : indices_.label_index.GetIndex()) {
for (const auto &entry : index) {
if (entry.vertex->first > split_key) {
[[maybe_unused]] auto [it, inserted, node] = cloned_indices[label].insert(index.extract(entry));
vertex_entry_map[label].insert({entry.vertex, &node.value()});
}
}
}
return cloned_indices;
}
std::map<std::pair<LabelId, PropertyId>, LabelPropertyIndex::LabelPropertyIndexContainer>
Splitter::CollectLabelPropertyIndices(
const PrimaryKey &split_key,
std::map<std::pair<LabelId, PropertyId>, std::multimap<const Vertex *, LabelPropertyIndex::Entry *>>
&vertex_entry_map) {
if (indices_.label_property_index.Empty()) {
return {};
}
std::map<std::pair<LabelId, PropertyId>, LabelPropertyIndex::LabelPropertyIndexContainer> cloned_indices;
for (auto &[label_prop_pair, index] : indices_.label_property_index.GetIndex()) {
cloned_indices[label_prop_pair] = LabelPropertyIndex::LabelPropertyIndexContainer{};
for (const auto &entry : index) {
if (entry.vertex->first > split_key) {
// We get this entry
[[maybe_unused]] const auto [it, inserted, node] = cloned_indices[label_prop_pair].insert(index.extract(entry));
vertex_entry_map[label_prop_pair].insert({entry.vertex, &node.value()});
}
}
}
return cloned_indices;
}
/// Moves vertices out of `vertices_` into a new container and collects the
/// index data that goes with them.
///
/// First fills `data.label_indices` and `data.label_property_indices` via
/// CollectLabelIndices / CollectLabelPropertyIndices. Then walks `vertices_`
/// from the element found for `split_key` to the end; for each vertex it scans
/// the delta chain for transaction start ids, moves the vertex node into the
/// result, and repoints the collected index entries at the moved vertex.
///
/// @param data receives the collected label and label-property indices.
/// @param collected_transactions_start_id output set extended by ScanDeltas.
/// @param split_key key of the first vertex to move.
/// @return container holding the moved vertices.
VertexContainer Splitter::CollectVertices(SplitData &data, std::set<uint64_t> &collected_transactions_start_id,
const PrimaryKey &split_key) {
// Collection of indices is here since it heavily depends on vertices
// Per index: vertex pointer (as stored in the entry when it was collected) ->
// pointer to the collected index entry
std::map<LabelId, std::multimap<const Vertex *, LabelIndex::Entry *>> label_index_vertex_entry_map;
std::map<std::pair<LabelId, PropertyId>, std::multimap<const Vertex *, LabelPropertyIndex::Entry *>>
label_property_vertex_entry_map;
data.label_indices = CollectLabelIndices(split_key, label_index_vertex_entry_map);
data.label_property_indices = CollectLabelPropertyIndices(split_key, label_property_vertex_entry_map);
// For every index in index_map, looks up all entries recorded for
// old_vertex_ptr and sets their vertex pointer to the element
// splitted_vertex_it refers to.
const auto update_indices = [](auto &index_map, const auto *old_vertex_ptr, auto &splitted_vertex_it) {
for (auto &[label, vertex_entry_mappings] : index_map) {
auto [it, end] = vertex_entry_mappings.equal_range(old_vertex_ptr);
while (it != end) {
// NOTE(review): this writes to an element that lives inside a std::set —
// confirm the new pointer value cannot change the entry's ordering.
it->second->vertex = &*splitted_vertex_it;
++it;
}
}
};
VertexContainer splitted_data;
// NOTE(review): find() yields end() when no vertex has exactly split_key, in
// which case the loop below moves nothing — confirm callers always pass an
// existing key.
auto split_key_it = vertices_.find(split_key);
while (split_key_it != vertices_.end()) {
// Go through deltas and pick up transactions start_id
ScanDeltas(collected_transactions_start_id, split_key_it->second.delta);
// Address of the element before it is moved; used as the lookup key in the
// vertex -> entry maps.
const auto *old_vertex_ptr = &*split_key_it;
// Taken before extract(), which invalidates split_key_it.
auto next_it = std::next(split_key_it);
splitted_data.insert(vertices_.extract(split_key_it->first));
const auto &[splitted_vertex_it, inserted, node] = splitted_data.insert(vertices_.extract(split_key_it->first));
// Update indices
update_indices(label_index_vertex_entry_map, old_vertex_ptr, splitted_vertex_it);
update_indices(label_property_vertex_entry_map, old_vertex_ptr, splitted_vertex_it);
split_key_it = next_it;
}
return splitted_data;
@ -68,7 +142,7 @@ std::optional<EdgeContainer> Splitter::CollectEdges(std::set<uint64_t> &collecte
return std::nullopt;
}
EdgeContainer splitted_edges;
const auto split_edges_from_vertex = [&](const auto &edges_ref) {
const auto split_vertex_edges = [&](const auto &edges_ref) {
// This is safe since if properties_on_edges is true, then this must be a
// ptr
for (const auto &edge_ref : edges_ref) {
@ -87,8 +161,8 @@ std::optional<EdgeContainer> Splitter::CollectEdges(std::set<uint64_t> &collecte
};
for (const auto &vertex : split_vertices) {
split_edges_from_vertex(vertex.second.in_edges);
split_edges_from_vertex(vertex.second.out_edges);
split_vertex_edges(vertex.second.in_edges);
split_vertex_edges(vertex.second.out_edges);
}
return splitted_edges;
}
@ -119,8 +193,6 @@ void Splitter::AlignClonedTransaction(Transaction &cloned_transaction, const Tra
while (delta_it != transaction.deltas.end() && cloned_delta_it != cloned_transaction.deltas.end()) {
MG_ASSERT(delta_it->uuid == cloned_delta_it->uuid, "The order of deltas is not correct");
// Find appropriate prev and delta->next for cloned deltas
// auto *prev = &delta_it->prev;
// auto *cloned_prev = &cloned_delta_it->prev;
const auto *delta = &*delta_it;
auto *cloned_delta = &*cloned_delta_it;

View File

@ -17,6 +17,7 @@
#include "storage/v3/config.hpp"
#include "storage/v3/delta.hpp"
#include "storage/v3/edge.hpp"
#include "storage/v3/indices.hpp"
#include "storage/v3/transaction.hpp"
#include "storage/v3/vertex.hpp"
@ -28,12 +29,15 @@ struct SplitData {
VertexContainer vertices;
std::optional<EdgeContainer> edges;
std::map<uint64_t, Transaction> transactions;
std::map<LabelId, LabelIndex::LabelIndexContainer> label_indices;
std::map<std::pair<LabelId, PropertyId>, LabelPropertyIndex::LabelPropertyIndexContainer> label_property_indices;
};
class Splitter final {
public:
Splitter(VertexContainer &vertices, EdgeContainer &edges,
std::map<uint64_t, std::unique_ptr<Transaction>> &start_logical_id_to_transaction, Config &config);
std::map<uint64_t, std::unique_ptr<Transaction>> &start_logical_id_to_transaction, Indices &indices,
Config &config);
Splitter(const Splitter &) = delete;
Splitter(Splitter &&) noexcept = delete;
@ -44,26 +48,37 @@ class Splitter final {
SplitData SplitShard(const PrimaryKey &split_key);
private:
static void ScanDeltas(std::set<uint64_t> &collected_transactions_start_id, Delta *delta);
void AlignClonedTransaction(Transaction &cloned_transaction, const Transaction &transaction,
std::map<uint64_t, Transaction> &cloned_transactions, VertexContainer &cloned_vertices,
EdgeContainer &cloned_edges);
void AlignClonedTransactions(std::map<uint64_t, Transaction> &cloned_transactions, VertexContainer &cloned_vertices,
EdgeContainer &cloned_edges);
std::map<uint64_t, Transaction> CollectTransactions(const std::set<uint64_t> &collected_transactions_start_id,
VertexContainer &cloned_vertices, EdgeContainer &cloned_edges);
VertexContainer CollectVertices(std::set<uint64_t> &collected_transactions_start_id, const PrimaryKey &split_key);
VertexContainer CollectVertices(SplitData &data, std::set<uint64_t> &collected_transactions_start_id,
const PrimaryKey &split_key);
std::optional<EdgeContainer> CollectEdges(std::set<uint64_t> &collected_transactions_start_id,
const VertexContainer &split_vertices, const PrimaryKey &split_key);
std::map<LabelId, LabelIndex::LabelIndexContainer> CollectLabelIndices(
const PrimaryKey &split_key,
std::map<LabelId, std::multimap<const Vertex *, LabelIndex::Entry *>> &vertex_entry_map);
std::map<std::pair<LabelId, PropertyId>, LabelPropertyIndex::LabelPropertyIndexContainer> CollectLabelPropertyIndices(
const PrimaryKey &split_key,
std::map<std::pair<LabelId, PropertyId>, std::multimap<const Vertex *, LabelPropertyIndex::Entry *>>
&vertex_entry_map);
static void ScanDeltas(std::set<uint64_t> &collected_transactions_start_id, Delta *delta);
static void AlignClonedTransaction(Transaction &cloned_transaction, const Transaction &transaction,
std::map<uint64_t, Transaction> &cloned_transactions,
VertexContainer &cloned_vertices, EdgeContainer &cloned_edges);
void AlignClonedTransactions(std::map<uint64_t, Transaction> &cloned_transactions, VertexContainer &cloned_vertices,
EdgeContainer &cloned_edges);
VertexContainer &vertices_;
EdgeContainer &edges_;
std::map<uint64_t, std::unique_ptr<Transaction>> &start_logical_id_to_transaction_;
Indices &indices_;
Config &config_;
};