Make DistributedExpand operator handle repeated vertices

This commit is contained in:
János Benjamin Antal 2023-01-24 16:33:15 +01:00
parent fa8eee2043
commit 4908af5a18

View File

@ -2842,7 +2842,7 @@ class DistributedExpandCursor : public Cursor {
}
}
void InitEdgesMultiple(ExecutionContext &context) {
void InitEdgesMultiple() {
// This function won't work if any vertex id is duplicated in the input, because:
// 1. vertex_id_to_result_row is not a multimap
// 2. if self_.common_.existing_node is true, then we erase edges that might be necessary for the input vertex on a
@ -2851,49 +2851,28 @@ class DistributedExpandCursor : public Cursor {
const auto &vertex_value = frame[self_.input_symbol_];
if (vertex_value.IsNull()) {
ResetMultiFrameEdgeIts();
return;
}
ExpectType(self_.input_symbol_, vertex_value, TypedValue::Type::Vertex);
const auto &vertex = vertex_value.ValueVertex();
const auto convert_edges = [&vertex, &context](
std::vector<msgs::ExpandOneResultRow::EdgeWithSpecificProperties> &&edge_messages,
const EdgeAtom::Direction direction) {
std::vector<EdgeAccessor> edge_accessors;
edge_accessors.reserve(edge_messages.size());
current_vertex_ = &vertex;
switch (direction) {
case EdgeAtom::Direction::IN: {
for (auto &edge : edge_messages) {
edge_accessors.emplace_back(msgs::Edge{std::move(edge.other_end), vertex.Id(), {}, {edge.gid}, edge.type},
context.request_router);
}
break;
}
case EdgeAtom::Direction::OUT: {
for (auto &edge : edge_messages) {
edge_accessors.emplace_back(msgs::Edge{vertex.Id(), std::move(edge.other_end), {}, {edge.gid}, edge.type},
context.request_router);
}
break;
}
case EdgeAtom::Direction::BOTH: {
LOG_FATAL("Must indicate exact expansion direction here");
}
}
return edge_accessors;
};
auto &ref_counted_result_row = vertex_id_to_result_row.at(vertex.Id());
auto &result_row = *ref_counted_result_row.result_row;
auto *result_row = vertex_id_to_result_row[vertex.Id()];
current_in_edges_.clear();
current_in_edges_ =
convert_edges(std::move(result_row->in_edges_with_specific_properties), EdgeAtom::Direction::IN);
current_in_edge_it_ = current_in_edges_.begin();
current_out_edges_ =
convert_edges(std::move(result_row->out_edges_with_specific_properties), EdgeAtom::Direction::OUT);
current_out_edge_it_ = current_out_edges_.begin();
vertex_id_to_result_row.erase(vertex.Id());
current_in_edge_mf_it_ = result_row.in_edges_with_specific_properties.begin();
in_edges_end_it_ = result_row.in_edges_with_specific_properties.end();
current_out_edge_mf_it_ = result_row.out_edges_with_specific_properties.begin();
out_edges_end_it_ = result_row.out_edges_with_specific_properties.end();
if (ref_counted_result_row.ref_count == 1) {
vertex_id_to_result_row.erase(vertex.Id());
} else {
ref_counted_result_row.ref_count--;
}
}
bool PullInputFrames(ExecutionContext &context) {
@ -2906,6 +2885,8 @@ class DistributedExpandCursor : public Cursor {
return false;
}
vertex_id_to_result_row.clear();
msgs::ExpandOneRequest request;
request.direction = DirectionToMsgsDirection(self_.common_.direction);
// to not fetch any properties of the edges
@ -2918,16 +2899,21 @@ class DistributedExpandCursor : public Cursor {
ExpectType(self_.input_symbol_, vertex_value, TypedValue::Type::Vertex);
const auto &vertex = vertex_value.ValueVertex();
request.src_vertices.push_back(vertex.Id());
auto [it, inserted] = vertex_id_to_result_row.try_emplace(vertex.Id(), RefCountedResultRow{1U, nullptr});
if (inserted) {
request.src_vertices.push_back(vertex.Id());
} else {
it->second.ref_count++;
}
}
result_rows_ = std::invoke([&context, &request]() mutable {
SCOPED_REQUEST_WAIT_PROFILE;
return context.request_router->ExpandOne(std::move(request));
});
vertex_id_to_result_row.clear();
for (auto &row : result_rows_) {
vertex_id_to_result_row[row.src_vertex.id] = &row;
vertex_id_to_result_row[row.src_vertex.id].result_row = &row;
}
return true;
@ -2956,7 +2942,7 @@ class DistributedExpandCursor : public Cursor {
if (own_frames_it_ == own_frames_consumer_->end()) {
state_ = State::PullInputAndEdges;
} else {
InitEdgesMultiple(context);
InitEdgesMultiple();
state_ = State::PopulateOutput;
}
break;
@ -2965,28 +2951,45 @@ class DistributedExpandCursor : public Cursor {
if (!output_multi_frame.HasInvalidFrame()) {
return populated_any;
}
if (current_in_edge_it_ == current_in_edges_.end() && current_out_edge_it_ == current_out_edges_.end()) {
if (current_in_edge_mf_it_ == in_edges_end_it_ && current_out_edge_mf_it_ == out_edges_end_it_) {
own_frames_it_->MakeInvalid();
++own_frames_it_;
state_ = State::InitInOutEdgesIt;
continue;
}
auto populate_edges = [this, &context, &output_frames_populator, &populated_any](
const EdgeAtom::Direction direction, std::vector<EdgeAccessor>::iterator &current,
const std::vector<EdgeAccessor>::iterator &end) {
const EdgeAtom::Direction direction, EdgesIterator &current,
const EdgesIterator &end) {
for (auto output_frame_it = output_frames_populator.begin();
output_frame_it != output_frames_populator.end() && current != end; ++output_frame_it) {
auto &edge = *current;
// auto &asd = edge.other_end.second[0];
++current;
auto &output_frame = *output_frame_it;
output_frame = *own_frames_it_;
output_frame[self_.common_.edge_symbol] = edge;
switch (direction) {
case EdgeAtom::Direction::IN: {
output_frame[self_.common_.edge_symbol] =
EdgeAccessor{msgs::Edge{edge.other_end, current_vertex_->Id(), {}, {edge.gid}, edge.type},
context.request_router};
break;
}
case EdgeAtom::Direction::OUT: {
output_frame[self_.common_.edge_symbol] =
EdgeAccessor{msgs::Edge{current_vertex_->Id(), edge.other_end, {}, {edge.gid}, edge.type},
context.request_router};
break;
}
case EdgeAtom::Direction::BOTH: {
LOG_FATAL("Must indicate exact expansion direction here");
}
};
PullDstVertex(output_frame, context, direction);
populated_any = true;
}
};
populate_edges(EdgeAtom::Direction::IN, current_in_edge_it_, current_in_edges_.end());
populate_edges(EdgeAtom::Direction::OUT, current_out_edge_it_, current_out_edges_.end());
populate_edges(EdgeAtom::Direction::IN, current_in_edge_mf_it_, in_edges_end_it_);
populate_edges(EdgeAtom::Direction::OUT, current_out_edge_mf_it_, out_edges_end_it_);
break;
}
case State::Exhausted: {
@ -3017,28 +3020,51 @@ class DistributedExpandCursor : public Cursor {
own_frames_consumer_.reset();
own_multi_frame_->MakeAllFramesInvalid();
state_ = State::PullInputAndEdges;
current_in_edges_.clear();
current_out_edges_.clear();
current_in_edge_it_ = current_in_edges_.end();
current_out_edge_it_ = current_out_edges_.end();
ResetMultiFrameEdgeIts();
}
private:
void ResetMultiFrameEdgeIts() {
in_edges_end_it_ = EdgesIterator{};
current_in_edge_mf_it_ = in_edges_end_it_;
out_edges_end_it_ = EdgesIterator{};
current_out_edge_mf_it_ = out_edges_end_it_;
}
enum class State { PullInputAndEdges, InitInOutEdgesIt, PopulateOutput, Exhausted };
// Pairs a non-owning pointer to an ExpandOne result row with a counter.
struct RefCountedResultRow {
  // Counter associated with the row; starts at zero.
  size_t ref_count = 0U;
  // Non-owning pointer to the row; null until one is assigned.
  msgs::ExpandOneResultRow *result_row = nullptr;
};
const Expand &self_;
const UniqueCursorPtr input_cursor_;
using EdgesVector = std::vector<msgs::ExpandOneResultRow::EdgeWithSpecificProperties>;
using EdgesIterator = EdgesVector::iterator;
EdgesIterator current_in_edge_mf_it_;
EdgesIterator in_edges_end_it_;
EdgesIterator current_out_edge_mf_it_;
EdgesIterator out_edges_end_it_;
State state_{State::PullInputAndEdges};
std::optional<MultiFrame> own_multi_frame_;
std::optional<ValidFramesConsumer> own_frames_consumer_;
const VertexAccessor *current_vertex_{nullptr};
ValidFramesConsumer::Iterator own_frames_it_;
std::vector<msgs::ExpandOneResultRow> result_rows_;
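// Keyed by vertex id: a vertex that appears several times in the input shares a single entry, and the entry's
// ref_count tracks how many input frames still have to consume its result row.
// NOTE(review): the earlier caveat "this won't work if any vertex id is duplicated in the input" predates the
// ref counting; confirm whether any part of it still applies (e.g. when existing_node is set).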
std::unordered_map<msgs::VertexId, RefCountedResultRow> vertex_id_to_result_row;
// TODO(antaljanosbenjamin): Remove when single frame approach is removed
std::vector<EdgeAccessor> current_in_edges_;
std::vector<EdgeAccessor> current_out_edges_;
std::vector<EdgeAccessor>::iterator current_in_edge_it_;
std::vector<EdgeAccessor>::iterator current_out_edge_it_;
State state_{State::PullInputAndEdges};
std::optional<MultiFrame> own_multi_frame_;
std::optional<ValidFramesConsumer> own_frames_consumer_;
ValidFramesConsumer::Iterator own_frames_it_;
std::vector<msgs::ExpandOneResultRow> result_rows_;
// This won't work if any vertex id is duplicated in the input
std::unordered_map<msgs::VertexId, msgs::ExpandOneResultRow *> vertex_id_to_result_row;
};
} // namespace memgraph::query::v2::plan