Add comments
This commit is contained in:
parent
bd26af4271
commit
3e23437f14
@ -68,7 +68,6 @@ VertexContainer Splitter::CollectVertices(SplitData &data, std::set<uint64_t> &c
|
|||||||
const PrimaryKey &split_key) {
|
const PrimaryKey &split_key) {
|
||||||
// Collection of indices is here since it heavily depends on vertices
|
// Collection of indices is here since it heavily depends on vertices
|
||||||
// Old vertex pointer new entry pointer
|
// Old vertex pointer new entry pointer
|
||||||
|
|
||||||
std::map<LabelId, std::multimap<const Vertex *, const LabelIndex::IndexContainer::iterator>>
|
std::map<LabelId, std::multimap<const Vertex *, const LabelIndex::IndexContainer::iterator>>
|
||||||
label_index_vertex_entry_map;
|
label_index_vertex_entry_map;
|
||||||
std::map<std::pair<LabelId, PropertyId>,
|
std::map<std::pair<LabelId, PropertyId>,
|
||||||
@ -79,6 +78,7 @@ VertexContainer Splitter::CollectVertices(SplitData &data, std::set<uint64_t> &c
|
|||||||
CollectIndexEntries<LabelIndex, LabelId>(indices_.label_index, split_key, label_index_vertex_entry_map);
|
CollectIndexEntries<LabelIndex, LabelId>(indices_.label_index, split_key, label_index_vertex_entry_map);
|
||||||
data.label_property_indices = CollectIndexEntries<LabelPropertyIndex, std::pair<LabelId, PropertyId>>(
|
data.label_property_indices = CollectIndexEntries<LabelPropertyIndex, std::pair<LabelId, PropertyId>>(
|
||||||
indices_.label_property_index, split_key, label_property_vertex_entry_map);
|
indices_.label_property_index, split_key, label_property_vertex_entry_map);
|
||||||
|
// This is needed to replace old vertex pointers in index entries with new ones
|
||||||
const auto update_indices = [](auto &entry_vertex_map, auto &updating_index, const auto *old_vertex_ptr,
|
const auto update_indices = [](auto &entry_vertex_map, auto &updating_index, const auto *old_vertex_ptr,
|
||||||
auto &new_vertex_ptr) {
|
auto &new_vertex_ptr) {
|
||||||
for ([[maybe_unused]] auto &[index_type, vertex_entry_mappings] : entry_vertex_map) {
|
for ([[maybe_unused]] auto &[index_type, vertex_entry_mappings] : entry_vertex_map) {
|
||||||
@ -96,7 +96,7 @@ VertexContainer Splitter::CollectVertices(SplitData &data, std::set<uint64_t> &c
|
|||||||
VertexContainer splitted_data;
|
VertexContainer splitted_data;
|
||||||
auto split_key_it = vertices_.find(split_key);
|
auto split_key_it = vertices_.find(split_key);
|
||||||
while (split_key_it != vertices_.end()) {
|
while (split_key_it != vertices_.end()) {
|
||||||
// Go through deltas and pick up transactions start_id
|
// Go through deltas and pick up transactions start_id/commit_id
|
||||||
ScanDeltas(collected_transactions_, split_key_it->second.delta);
|
ScanDeltas(collected_transactions_, split_key_it->second.delta);
|
||||||
|
|
||||||
const auto *old_vertex_ptr = &*split_key_it;
|
const auto *old_vertex_ptr = &*split_key_it;
|
||||||
@ -107,7 +107,7 @@ VertexContainer Splitter::CollectVertices(SplitData &data, std::set<uint64_t> &c
|
|||||||
|
|
||||||
// Update indices
|
// Update indices
|
||||||
update_indices(label_index_vertex_entry_map, data.label_indices, old_vertex_ptr, splitted_vertex_it);
|
update_indices(label_index_vertex_entry_map, data.label_indices, old_vertex_ptr, splitted_vertex_it);
|
||||||
// update_indices(label_property_vertex_entry_map, old_vertex_ptr, splitted_vertex_it);
|
update_indices(label_property_vertex_entry_map, data.label_property_indices, old_vertex_ptr, splitted_vertex_it);
|
||||||
|
|
||||||
split_key_it = next_it;
|
split_key_it = next_it;
|
||||||
}
|
}
|
||||||
@ -122,14 +122,13 @@ std::optional<EdgeContainer> Splitter::CollectEdges(std::set<uint64_t> &collecte
|
|||||||
}
|
}
|
||||||
EdgeContainer splitted_edges;
|
EdgeContainer splitted_edges;
|
||||||
const auto split_vertex_edges = [&](const auto &edges_ref) {
|
const auto split_vertex_edges = [&](const auto &edges_ref) {
|
||||||
// This is safe since if properties_on_edges is true, the this must be a
|
// This is safe since if properties_on_edges is true, the this must be a ptr
|
||||||
// ptr
|
|
||||||
for (const auto &edge_ref : edges_ref) {
|
for (const auto &edge_ref : edges_ref) {
|
||||||
auto *edge = std::get<2>(edge_ref).ptr;
|
auto *edge = std::get<2>(edge_ref).ptr;
|
||||||
const auto &other_vtx = std::get<1>(edge_ref);
|
const auto &other_vtx = std::get<1>(edge_ref);
|
||||||
ScanDeltas(collected_transactions_, edge->delta);
|
ScanDeltas(collected_transactions_, edge->delta);
|
||||||
// Check if src and dest edge are both on splitted shard
|
// Check if src and dest edge are both on splitted shard so we know if we
|
||||||
// so we know if we should remove orphan edge
|
// should remove orphan edge, or make a clone
|
||||||
if (other_vtx.primary_key >= split_key) {
|
if (other_vtx.primary_key >= split_key) {
|
||||||
// Remove edge from shard
|
// Remove edge from shard
|
||||||
splitted_edges.insert(edges_.extract(edge->gid));
|
splitted_edges.insert(edges_.extract(edge->gid));
|
||||||
@ -183,8 +182,7 @@ void Splitter::AdjustClonedTransaction(Transaction &cloned_transaction, const Tr
|
|||||||
const auto *delta = &*delta_it;
|
const auto *delta = &*delta_it;
|
||||||
auto *cloned_delta = &*cloned_delta_it;
|
auto *cloned_delta = &*cloned_delta_it;
|
||||||
while (delta != nullptr) {
|
while (delta != nullptr) {
|
||||||
// Align delta, while ignoring deltas whose transactions have commited,
|
// Align deltas which belong to cloned transaction, skip others
|
||||||
// or aborted
|
|
||||||
if (cloned_transactions.contains(delta->commit_info->start_or_commit_timestamp.logical_id)) {
|
if (cloned_transactions.contains(delta->commit_info->start_or_commit_timestamp.logical_id)) {
|
||||||
auto *found_delta_it = &*std::ranges::find_if(
|
auto *found_delta_it = &*std::ranges::find_if(
|
||||||
cloned_transactions.at(delta->commit_info->start_or_commit_timestamp.logical_id)->deltas,
|
cloned_transactions.at(delta->commit_info->start_or_commit_timestamp.logical_id)->deltas,
|
||||||
@ -208,7 +206,8 @@ void Splitter::AdjustClonedTransaction(Transaction &cloned_transaction, const Tr
|
|||||||
}
|
}
|
||||||
case PreviousPtr::Type::VERTEX: {
|
case PreviousPtr::Type::VERTEX: {
|
||||||
// What if the vertex is already moved to garbage collection...
|
// What if the vertex is already moved to garbage collection...
|
||||||
// Make test when you have deleted vertex
|
// TODO(jbajic) Maybe revisit when we apply Garbage collection with new
|
||||||
|
// transaction management system
|
||||||
auto *cloned_vertex = &*cloned_vertices.find(ptr.vertex->first);
|
auto *cloned_vertex = &*cloned_vertices.find(ptr.vertex->first);
|
||||||
cloned_delta->prev.Set(cloned_vertex);
|
cloned_delta->prev.Set(cloned_vertex);
|
||||||
break;
|
break;
|
||||||
|
@ -81,17 +81,16 @@ class Splitter final {
|
|||||||
return {};
|
return {};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Cloned index entries will contain new index entry iterators, but old
|
||||||
|
// vertices address which need to be adjusted after extracting vertices
|
||||||
std::map<IndexType, typename IndexMap::IndexContainer> cloned_indices;
|
std::map<IndexType, typename IndexMap::IndexContainer> cloned_indices;
|
||||||
for (auto &[index_type_val, index] : index.GetIndex()) {
|
for (auto &[index_type_val, index] : index.GetIndex()) {
|
||||||
// cloned_indices[index_type_val] = typename IndexMap::IndexContainer{};
|
|
||||||
|
|
||||||
auto entry_it = index.begin();
|
auto entry_it = index.begin();
|
||||||
while (entry_it != index.end()) {
|
while (entry_it != index.end()) {
|
||||||
// We need to save the next pointer since the current one will be
|
// We need to save the next pointer since the current one will be
|
||||||
// invalidated after extract
|
// invalidated after extract
|
||||||
auto next_entry_it = std::next(entry_it);
|
auto next_entry_it = std::next(entry_it);
|
||||||
if (entry_it->vertex->first > split_key) {
|
if (entry_it->vertex->first > split_key) {
|
||||||
// We get this entry
|
|
||||||
[[maybe_unused]] const auto &[inserted_entry_it, inserted, node] =
|
[[maybe_unused]] const auto &[inserted_entry_it, inserted, node] =
|
||||||
cloned_indices[index_type_val].insert(index.extract(entry_it));
|
cloned_indices[index_type_val].insert(index.extract(entry_it));
|
||||||
MG_ASSERT(inserted, "Failed to extract index entry!");
|
MG_ASSERT(inserted, "Failed to extract index entry!");
|
||||||
|
Loading…
Reference in New Issue
Block a user