Split vetrices, edges and transactions
This commit is contained in:
parent
cb83b94b16
commit
bf21cbc9a9
@ -12,6 +12,9 @@
|
||||
#pragma once
|
||||
|
||||
#include <memory>
|
||||
|
||||
#include <boost/uuid/uuid.hpp>
|
||||
|
||||
#include "storage/v3/edge_ref.hpp"
|
||||
#include "storage/v3/id_types.hpp"
|
||||
#include "storage/v3/property_value.hpp"
|
||||
@ -129,6 +132,9 @@ inline bool operator==(const PreviousPtr::Pointer &a, const PreviousPtr::Pointer
|
||||
inline bool operator!=(const PreviousPtr::Pointer &a, const PreviousPtr::Pointer &b) { return !(a == b); }
|
||||
|
||||
struct Delta {
|
||||
// Needed for splits
|
||||
boost::uuids::uuid uuid;
|
||||
|
||||
enum class Action {
|
||||
// Used for both Vertex and Edge
|
||||
DELETE_OBJECT,
|
||||
|
@ -23,6 +23,8 @@
|
||||
|
||||
#include "io/network/endpoint.hpp"
|
||||
#include "io/time.hpp"
|
||||
#include "storage/v3/delta.hpp"
|
||||
#include "storage/v3/edge.hpp"
|
||||
#include "storage/v3/edge_accessor.hpp"
|
||||
#include "storage/v3/id_types.hpp"
|
||||
#include "storage/v3/indices.hpp"
|
||||
@ -1053,27 +1055,73 @@ std::optional<SplitInfo> Shard::ShouldSplit() const noexcept {
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
void CollectDeltas(std::set<uint64_t> &collected_transactions_start_id, Delta *delta) const {
|
||||
while (delta != nullptr) {
|
||||
collected_transactions_start_id.insert(delta->command_id);
|
||||
delta = delta->next;
|
||||
}
|
||||
}
|
||||
|
||||
VertexContainer Shard::CollectVertices(std::set<uint64_t> &collected_transactions_start_id,
|
||||
const PrimaryKey &split_key) {
|
||||
VertexContainer splitted_data;
|
||||
auto split_key_it = vertices_.find(split_key);
|
||||
|
||||
for (; split_key_it != vertices_.end(); split_key_it++) {
|
||||
// Go through deltas and pick up transactions start_id
|
||||
CollectDeltas(collected_transactions_start_id, split_key_it->second.delta);
|
||||
splitted_data.insert(vertices_.extract(split_key_it->first));
|
||||
}
|
||||
return splitted_data;
|
||||
}
|
||||
|
||||
std::optional<EdgeContainer> Shard::CollectEdges(std::set<uint64_t> &collected_transactions_start_id,
|
||||
const VertexContainer &split_vertices) const {
|
||||
if (!config_.items.properties_on_edges) {
|
||||
return std::nullopt;
|
||||
}
|
||||
EdgeContainer splitted_edges;
|
||||
// TODO This copies edges without removing the unecessary ones!!
|
||||
for (const auto &vertex : split_vertices) {
|
||||
for (const auto &in_edge : vertex.second.in_edges) {
|
||||
// This is safe since if properties_on_edges is true, the this must be a
|
||||
// ptr
|
||||
auto *edge = std::get<2>(in_edge).ptr;
|
||||
CollectDeltas(collected_transactions_start_id, edge->delta);
|
||||
|
||||
splitted_edges.insert({edge->gid, Edge{edge->gid, edge->delta}});
|
||||
}
|
||||
for (const auto &in_edge : vertex.second.out_edges) {
|
||||
auto *edge = std::get<2>(in_edge).ptr;
|
||||
CollectDeltas(collected_transactions_start_id, edge->delta);
|
||||
|
||||
splitted_edges.insert({edge->gid, Edge{edge->gid, edge->delta}});
|
||||
}
|
||||
}
|
||||
return splitted_edges;
|
||||
}
|
||||
|
||||
std::list<Transaction> Shard::CollectTransactions(const std::set<uint64_t> &collected_transactions_start_id) const {
|
||||
std::list<Transaction> transactions;
|
||||
for (const auto commit_start : collected_transactions_start_id) {
|
||||
transactions.push_back(*start_logical_id_to_transaction_[commit_start]);
|
||||
}
|
||||
return transactions;
|
||||
}
|
||||
|
||||
SplitData Shard::PerformSplit(const PrimaryKey &split_key) {
|
||||
SplitData data;
|
||||
data.vertices = std::map(vertices_.find(split_key), vertices_.end());
|
||||
std::set<uint64_t> collected_transactions_start_id;
|
||||
// Split Vertices
|
||||
data.vertices = CollectVertices(collected_transactions_start_id, split_key);
|
||||
// Resolve the deltas that were left on the shard, and are not referenced by
|
||||
// neither of vertices
|
||||
data.edges = CollectEdges(collected_transactions_start_id, data.vertices);
|
||||
data.indices_info = {indices_.label_index.ListIndices(), indices_.label_property_index.ListIndices()};
|
||||
// TODO Iterate over vertices and edges to replace their deltas with new ones tha are copied over
|
||||
// use uuid
|
||||
|
||||
// Get all edges related with those vertices
|
||||
if (config_.items.properties_on_edges) {
|
||||
data.edges = std::invoke([&split_vertices = data.vertices]() {
|
||||
// How to reserve?
|
||||
EdgeContainer split_edges;
|
||||
for (const auto &vertex : split_vertices) {
|
||||
for (const auto &in_edge : vertex.second.in_edges) {
|
||||
auto edge = std::get<2>(in_edge).ptr;
|
||||
split_edges.insert(edge->gid, Edge{.gid = edge->gid, .delta = edge->delta, .properties = edge->properties});
|
||||
}
|
||||
}
|
||||
return split_edges;
|
||||
});
|
||||
}
|
||||
// TODO We also need to send ongoing transactions to the shard
|
||||
// since they own deltas
|
||||
data.transactions = CollectTransactions(collected_transactions_start_id);
|
||||
|
||||
return data;
|
||||
}
|
||||
|
@ -185,6 +185,7 @@ struct SplitData {
|
||||
VertexContainer vertices;
|
||||
std::optional<EdgeContainer> edges;
|
||||
IndicesInfo indices_info;
|
||||
std::list<Transaction> transactions;
|
||||
};
|
||||
|
||||
/// Structure used to return information about the storage.
|
||||
@ -374,6 +375,15 @@ class Shard final {
|
||||
SplitData PerformSplit(const PrimaryKey &split_key);
|
||||
|
||||
private:
|
||||
void CollectDeltas(std::set<uint64_t> &collected_transactions_start_id, Delta *delta) const;
|
||||
|
||||
std::list<Transaction> CollectTransactions(const std::set<uint64_t> &collected_transactions_start_id) const;
|
||||
|
||||
VertexContainer CollectVertices(std::set<uint64_t> &collected_transactions_start_id, const PrimaryKey &split_key);
|
||||
|
||||
std::optional<EdgeContainer> CollectEdges(std::set<uint64_t> &collected_transactions_start_id,
|
||||
const VertexContainer &split_vertices) const;
|
||||
|
||||
Transaction &GetTransaction(coordinator::Hlc start_timestamp, IsolationLevel isolation_level);
|
||||
|
||||
uint64_t CommitTimestamp(std::optional<uint64_t> desired_commit_timestamp = {});
|
||||
|
Loading…
Reference in New Issue
Block a user