Make DistributedExpandCursor handle existing nodes with MultiFrame

This commit is contained in:
János Benjamin Antal 2023-01-24 17:17:47 +01:00
parent 4908af5a18
commit 7cb07672ff

View File

@ -2865,8 +2865,10 @@ class DistributedExpandCursor : public Cursor {
current_in_edge_mf_it_ = result_row.in_edges_with_specific_properties.begin();
in_edges_end_it_ = result_row.in_edges_with_specific_properties.end();
AdvanceUntilSuitableEdge(current_in_edge_mf_it_, in_edges_end_it_);
current_out_edge_mf_it_ = result_row.out_edges_with_specific_properties.begin();
out_edges_end_it_ = result_row.out_edges_with_specific_properties.end();
AdvanceUntilSuitableEdge(current_out_edge_mf_it_, out_edges_end_it_);
if (ref_counted_result_row.ref_count == 1) {
vertex_id_to_result_row.erase(vertex.Id());
@ -2921,7 +2923,6 @@ class DistributedExpandCursor : public Cursor {
bool PullMultiple(MultiFrame &output_multi_frame, ExecutionContext &context) override {
SCOPED_PROFILE_OP("DistributedExpandMF");
MG_ASSERT(!self_.common_.existing_node);
EnsureOwnMultiFrameIsGood(output_multi_frame);
// A helper function for expanding a node from an edge.
@ -2963,8 +2964,6 @@ class DistributedExpandCursor : public Cursor {
for (auto output_frame_it = output_frames_populator.begin();
output_frame_it != output_frames_populator.end() && current != end; ++output_frame_it) {
auto &edge = *current;
// auto &asd = edge.other_end.second[0];
++current;
auto &output_frame = *output_frame_it;
output_frame = *own_frames_it_;
switch (direction) {
@ -2985,6 +2984,8 @@ class DistributedExpandCursor : public Cursor {
}
};
PullDstVertex(output_frame, context, direction);
++current;
AdvanceUntilSuitableEdge(current, end);
populated_any = true;
}
};
@ -3030,12 +3031,6 @@ class DistributedExpandCursor : public Cursor {
}
private:
void ResetMultiFrameEdgeIts() {
in_edges_end_it_ = EdgesIterator{};
current_in_edge_mf_it_ = in_edges_end_it_;
out_edges_end_it_ = EdgesIterator{};
current_out_edge_mf_it_ = out_edges_end_it_;
}
enum class State { PullInputAndEdges, InitInOutEdgesIt, PopulateOutput, Exhausted };
struct RefCountedResultRow {
@ -3043,10 +3038,35 @@ class DistributedExpandCursor : public Cursor {
msgs::ExpandOneResultRow *result_row{nullptr};
};
using EdgeWithSpecificProperties = msgs::ExpandOneResultRow::EdgeWithSpecificProperties;
using EdgesVector = std::vector<EdgeWithSpecificProperties>;
using EdgesIterator = EdgesVector::iterator;
void ResetMultiFrameEdgeIts() {
in_edges_end_it_ = EdgesIterator{};
current_in_edge_mf_it_ = in_edges_end_it_;
out_edges_end_it_ = EdgesIterator{};
current_out_edge_mf_it_ = out_edges_end_it_;
}
void AdvanceUntilSuitableEdge(EdgesIterator &current, const EdgesIterator &end) {
if (!self_.common_.existing_node) {
return;
}
const auto &existing_node_value = (*own_frames_it_)[self_.common_.node_symbol];
if (existing_node_value.IsNull()) {
current = end;
return;
}
const auto &existing_node = existing_node_value.ValueVertex();
current = std::find_if(current, end, [&existing_node](const EdgeWithSpecificProperties &edge) {
return edge.other_end == existing_node.Id();
});
}
const Expand &self_;
const UniqueCursorPtr input_cursor_;
using EdgesVector = std::vector<msgs::ExpandOneResultRow::EdgeWithSpecificProperties>;
using EdgesIterator = EdgesVector::iterator;
EdgesIterator current_in_edge_mf_it_;
EdgesIterator in_edges_end_it_;
EdgesIterator current_out_edge_mf_it_;