From 7cb07672ffcfcd3b4b73563efdebeea5c2165305 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=A1nos=20Benjamin=20Antal?= Date: Tue, 24 Jan 2023 17:17:47 +0100 Subject: [PATCH] Make `DistributedExpandCursor` handle existing nodes with `MultiFrame` --- src/query/v2/plan/operator.cpp | 42 +++++++++++++++++++++++++--------- 1 file changed, 31 insertions(+), 11 deletions(-) diff --git a/src/query/v2/plan/operator.cpp b/src/query/v2/plan/operator.cpp index 0868f24a5..64a8c6f8c 100644 --- a/src/query/v2/plan/operator.cpp +++ b/src/query/v2/plan/operator.cpp @@ -2865,8 +2865,10 @@ class DistributedExpandCursor : public Cursor { current_in_edge_mf_it_ = result_row.in_edges_with_specific_properties.begin(); in_edges_end_it_ = result_row.in_edges_with_specific_properties.end(); + AdvanceUntilSuitableEdge(current_in_edge_mf_it_, in_edges_end_it_); current_out_edge_mf_it_ = result_row.out_edges_with_specific_properties.begin(); out_edges_end_it_ = result_row.out_edges_with_specific_properties.end(); + AdvanceUntilSuitableEdge(current_out_edge_mf_it_, out_edges_end_it_); if (ref_counted_result_row.ref_count == 1) { vertex_id_to_result_row.erase(vertex.Id()); @@ -2921,7 +2923,6 @@ class DistributedExpandCursor : public Cursor { bool PullMultiple(MultiFrame &output_multi_frame, ExecutionContext &context) override { SCOPED_PROFILE_OP("DistributedExpandMF"); - MG_ASSERT(!self_.common_.existing_node); EnsureOwnMultiFrameIsGood(output_multi_frame); // A helper function for expanding a node from an edge. @@ -2963,8 +2964,6 @@ class DistributedExpandCursor : public Cursor { for (auto output_frame_it = output_frames_populator.begin(); output_frame_it != output_frames_populator.end() && current != end; ++output_frame_it) { auto &edge = *current; - // auto &asd = edge.other_end.second[0]; - ++current; auto &output_frame = *output_frame_it; output_frame = *own_frames_it_; switch (direction) { @@ -2985,6 +2984,8 @@ class DistributedExpandCursor : public Cursor { } }; PullDstVertex(output_frame, context, direction); + ++current; + AdvanceUntilSuitableEdge(current, end); populated_any = true; } }; @@ -3030,12 +3031,6 @@ class DistributedExpandCursor : public Cursor { } private: - void ResetMultiFrameEdgeIts() { - in_edges_end_it_ = EdgesIterator{}; - current_in_edge_mf_it_ = in_edges_end_it_; - out_edges_end_it_ = EdgesIterator{}; - current_out_edge_mf_it_ = out_edges_end_it_; - } enum class State { PullInputAndEdges, InitInOutEdgesIt, PopulateOutput, Exhausted }; struct RefCountedResultRow { @@ -3043,10 +3038,35 @@ class DistributedExpandCursor : public Cursor { msgs::ExpandOneResultRow *result_row{nullptr}; }; + using EdgeWithSpecificProperties = msgs::ExpandOneResultRow::EdgeWithSpecificProperties; + using EdgesVector = std::vector; + using EdgesIterator = EdgesVector::iterator; + + void ResetMultiFrameEdgeIts() { + in_edges_end_it_ = EdgesIterator{}; + current_in_edge_mf_it_ = in_edges_end_it_; + out_edges_end_it_ = EdgesIterator{}; + current_out_edge_mf_it_ = out_edges_end_it_; + } + + void AdvanceUntilSuitableEdge(EdgesIterator ¤t, const EdgesIterator &end) { + if (!self_.common_.existing_node) { + return; + } + + const auto &existing_node_value = (*own_frames_it_)[self_.common_.node_symbol]; + if (existing_node_value.IsNull()) { + current = end; + return; + } + const auto &existing_node = existing_node_value.ValueVertex(); + current = std::find_if(current, end, [&existing_node](const EdgeWithSpecificProperties &edge) { + return edge.other_end == existing_node.Id(); + }); + } + const Expand &self_; const UniqueCursorPtr input_cursor_; - using EdgesVector = std::vector; - using EdgesIterator = EdgesVector::iterator; EdgesIterator current_in_edge_mf_it_; EdgesIterator in_edges_end_it_; EdgesIterator current_out_edge_mf_it_;