Fix splitting of edges

This commit is contained in:
jbajic 2023-01-09 16:11:10 +01:00
parent bf21cbc9a9
commit 7ef4114835
4 changed files with 98 additions and 46 deletions

View File

@ -1,4 +1,4 @@
// Copyright 2022 Memgraph Ltd.
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -19,6 +19,7 @@
#include <mutex>
#include <optional>
#include <bits/ranges_algo.h>
#include <spdlog/spdlog.h>
#include "io/network/endpoint.hpp"
@ -1055,7 +1056,7 @@ std::optional<SplitInfo> Shard::ShouldSplit() const noexcept {
return std::nullopt;
}
void CollectDeltas(std::set<uint64_t> &collected_transactions_start_id, Delta *delta) const {
void Shard::ScanDeltas(std::set<uint64_t> &collected_transactions_start_id, Delta *delta) const {
while (delta != nullptr) {
collected_transactions_start_id.insert(delta->command_id);
delta = delta->next;
@ -1069,59 +1070,72 @@ VertexContainer Shard::CollectVertices(std::set<uint64_t> &collected_transaction
for (; split_key_it != vertices_.end(); split_key_it++) {
// Go through deltas and pick up transactions start_id
CollectDeltas(collected_transactions_start_id, split_key_it->second.delta);
ScanDeltas(collected_transactions_start_id, split_key_it->second.delta);
splitted_data.insert(vertices_.extract(split_key_it->first));
}
return splitted_data;
}
std::optional<EdgeContainer> Shard::CollectEdges(std::set<uint64_t> &collected_transactions_start_id,
const VertexContainer &split_vertices) const {
const VertexContainer &split_vertices, const PrimaryKey &split_key) {
if (!config_.items.properties_on_edges) {
return std::nullopt;
}
EdgeContainer splitted_edges;
// TODO This copies edges without removing the unecessary ones!!
for (const auto &vertex : split_vertices) {
for (const auto &in_edge : vertex.second.in_edges) {
const auto split_edges_from_vertex = [&](const auto &edges_ref) {
// This is safe since if properties_on_edges is true, the this must be a
// ptr
auto *edge = std::get<2>(in_edge).ptr;
CollectDeltas(collected_transactions_start_id, edge->delta);
for (const auto &edge_ref : edges_ref) {
auto *edge = std::get<2>(edge_ref).ptr;
const auto &other_vtx = std::get<1>(edge_ref);
ScanDeltas(collected_transactions_start_id, edge->delta);
// Check if src and dest edge are both on splitted shard
// so we know if we should remove orphan edge
if (other_vtx.primary_key >= split_key) {
// Remove edge from shard
splitted_edges.insert(edges_.extract(edge->gid));
} else {
splitted_edges.insert({edge->gid, Edge{edge->gid, edge->delta}});
}
for (const auto &in_edge : vertex.second.out_edges) {
auto *edge = std::get<2>(in_edge).ptr;
CollectDeltas(collected_transactions_start_id, edge->delta);
splitted_edges.insert({edge->gid, Edge{edge->gid, edge->delta}});
}
};
for (const auto &vertex : split_vertices) {
split_edges_from_vertex(vertex.second.in_edges);
split_edges_from_vertex(vertex.second.out_edges);
}
return splitted_edges;
}
std::list<Transaction> Shard::CollectTransactions(const std::set<uint64_t> &collected_transactions_start_id) const {
std::list<Transaction> transactions;
for (const auto commit_start : collected_transactions_start_id) {
transactions.push_back(*start_logical_id_to_transaction_[commit_start]);
}
std::map<uint64_t, std::unique_ptr<Transaction>> Shard::CollectTransactions(
const std::set<uint64_t> &collected_transactions_start_id) {
std::map<uint64_t, std::unique_ptr<Transaction>> transactions;
// for (const auto commit_start : collected_transactions_start_id) {
// transactions.insert(
// {commit_start, std::make_unique<Transaction>(*start_logical_id_to_transaction_.at(commit_start))});
// }
return transactions;
}
SplitData Shard::PerformSplit(const PrimaryKey &split_key) {
SplitData data;
std::set<uint64_t> collected_transactions_start_id;
// Split Vertices
data.vertices = CollectVertices(collected_transactions_start_id, split_key);
// Resolve the deltas that were left on the shard, and are not referenced by
// neither of vertices
data.edges = CollectEdges(collected_transactions_start_id, data.vertices);
data.indices_info = {indices_.label_index.ListIndices(), indices_.label_property_index.ListIndices()};
// TODO Iterate over vertices and edges to replace their deltas with new ones tha are copied over
// use uuid
data.edges = CollectEdges(collected_transactions_start_id, data.vertices, split_key);
// TODO indices wont work since timestamp cannot be replicated
// data.indices_info = {indices_.label_index.ListIndices(), indices_.label_property_index.ListIndices()};
data.transactions = CollectTransactions(collected_transactions_start_id);
// data.transactions = CollectTransactions(collected_transactions_start_id);
// Update delta addresses with the new addresses
// for (auto &vertex : data.vertices) {
// AdjustSplittedDataDeltas(vertex.second, data.transactions);
// }
// if (data.edges) {
// for (auto &edge : data.edges) {
// AdjustSplittedDataDeltas(edge, data.transactions);
// }
// }
return data;
}

View File

@ -1,4 +1,4 @@
// Copyright 2022 Memgraph Ltd.
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -185,7 +185,7 @@ struct SplitData {
VertexContainer vertices;
std::optional<EdgeContainer> edges;
IndicesInfo indices_info;
std::list<Transaction> transactions;
std::map<uint64_t, Transaction> transactions;
};
/// Structure used to return information about the storage.
@ -375,14 +375,36 @@ class Shard final {
SplitData PerformSplit(const PrimaryKey &split_key);
private:
void CollectDeltas(std::set<uint64_t> &collected_transactions_start_id, Delta *delta) const;
template <typename TObj>
requires utils::SameAsAnyOf<TObj, Edge, VertexData>
void AdjustSplittedDataDeltas(TObj &delta_holder, const std::map<int64_t, Transaction> &transactions) {
auto *delta_chain = delta_holder.delta;
Delta *new_delta_chain{nullptr};
while (delta_chain != nullptr) {
auto &transaction = transactions.at(delta_chain->command_id);
// This is the address of corresponding delta
const auto transaction_delta_it = std::ranges::find_if(
transaction->deltas, [delta_uuid = delta_chain->uuid](const auto &elem) { return elem.uuid == delta_uuid; });
// Add this delta to the new chain
if (new_delta_chain == nullptr) {
new_delta_chain = &*transaction_delta_it;
} else {
new_delta_chain->next = &*transaction_delta_it;
}
delta_chain = delta_chain->next;
}
delta_holder.delta = new_delta_chain;
}
std::list<Transaction> CollectTransactions(const std::set<uint64_t> &collected_transactions_start_id) const;
void ScanDeltas(std::set<uint64_t> &collected_transactions_start_id, Delta *delta) const;
std::map<uint64_t, std::unique_ptr<Transaction>> CollectTransactions(
const std::set<uint64_t> &collected_transactions_start_id);
VertexContainer CollectVertices(std::set<uint64_t> &collected_transactions_start_id, const PrimaryKey &split_key);
std::optional<EdgeContainer> CollectEdges(std::set<uint64_t> &collected_transactions_start_id,
const VertexContainer &split_vertices) const;
const VertexContainer &split_vertices, const PrimaryKey &split_key);
Transaction &GetTransaction(coordinator::Hlc start_timestamp, IsolationLevel isolation_level);
@ -391,7 +413,7 @@ class Shard final {
// Main object storage
NameIdMapper name_id_mapper_;
LabelId primary_label_;
// The shard's range is [min, max)
// The shard's range is [min, max>
PrimaryKey min_primary_key_;
std::optional<PrimaryKey> max_primary_key_;
VertexContainer vertices_;

View File

@ -1,4 +1,4 @@
// Copyright 2022 Memgraph Ltd.
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -174,12 +174,12 @@ class ShardWorker {
Time next_for_uuid = rsm.Cron();
// Check if shard should split
if (const auto split_info = rsm.ShouldSplit(); split_info) {
// if (const auto split_info = rsm.ShouldSplit(); split_info) {
// Request split from coordinator
// split_point => middle pk
// shard_id => uuid
// shard_version =>
}
// }
cron_schedule_.pop();
cron_schedule_.push(std::make_pair(next_for_uuid, uuid));

View File

@ -1,4 +1,4 @@
// Copyright 2022 Memgraph Ltd.
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -31,6 +31,16 @@ struct CommitInfo {
};
struct Transaction {
Transaction(coordinator::Hlc start_timestamp, CommitInfo commit_info, uint64_t command_id,
const std::list<Delta> &deltas, bool must_abort, bool is_aborted, IsolationLevel isolation_level)
: start_timestamp{start_timestamp},
commit_info{std::make_unique<CommitInfo>(commit_info)},
command_id(command_id),
deltas(CopyDeltas(deltas)),
must_abort(must_abort),
is_aborted(is_aborted),
isolation_level(isolation_level){};
Transaction(coordinator::Hlc start_timestamp, IsolationLevel isolation_level)
: start_timestamp(start_timestamp),
commit_info(std::make_unique<CommitInfo>(CommitInfo{false, {start_timestamp}})),
@ -54,6 +64,12 @@ struct Transaction {
~Transaction() {}
std::list<Delta> CopyDeltas(const std::list<Delta> &deltas) const { return std::list<Delta>{}; }
Transaction Clone() const {
return {start_timestamp, *commit_info, command_id, deltas, must_abort, is_aborted, isolation_level};
}
coordinator::Hlc start_timestamp;
std::unique_ptr<CommitInfo> commit_info;
uint64_t command_id;