Implement automaton for ExpandOneCursor
This commit is contained in:
parent
f39a937323
commit
e888464de2
@ -48,6 +48,10 @@ bool MultiFrame::HasValidFrame() const noexcept {
|
||||
return std::any_of(frames_.begin(), frames_.end(), [](auto &frame) { return frame.IsValid(); });
|
||||
}
|
||||
|
||||
bool MultiFrame::HasInvalidFrame() const noexcept {
|
||||
return std::any_of(frames_.begin(), frames_.end(), [](auto &frame) { return !frame.IsValid(); });
|
||||
}
|
||||
|
||||
// NOLINTNEXTLINE (bugprone-exception-escape)
|
||||
void MultiFrame::DefragmentValidFrames() noexcept {
|
||||
/*
|
||||
|
@ -82,6 +82,7 @@ class MultiFrame {
|
||||
void MakeAllFramesInvalid() noexcept;
|
||||
|
||||
bool HasValidFrame() const noexcept;
|
||||
bool HasInvalidFrame() const noexcept;
|
||||
|
||||
inline utils::MemoryResource *GetMemoryResource() { return frames_[0].GetMemoryResource(); }
|
||||
|
||||
|
@ -2827,9 +2827,15 @@ class DistributedExpandCursor : public Cursor {
|
||||
vertex_id_to_result_row.erase(vertex.Id());
|
||||
}
|
||||
|
||||
void PullEdgesFromStorage(ExecutionContext &context) {
|
||||
// Input Vertex could be null if it is created by a failed optional match. In
|
||||
// those cases we skip that input pull and continue with the next.
|
||||
bool PullInputFrames(ExecutionContext &context) {
|
||||
input_cursor_->PullMultiple(*own_multi_frame_, context);
|
||||
// These needs to be updated regardless of the result of the pull, otherwise the consumer and iterator might
|
||||
// get corrupted because of the operations done on our MultiFrame.
|
||||
own_frames_consumer_ = own_multi_frame_->GetValidFramesConsumer();
|
||||
own_frames_it_ = own_frames_consumer_->begin();
|
||||
if (!own_multi_frame_->HasValidFrame()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
msgs::ExpandOneRequest request;
|
||||
request.direction = DirectionToMsgsDirection(self_.common_.direction);
|
||||
@ -2842,7 +2848,7 @@ class DistributedExpandCursor : public Cursor {
|
||||
MG_ASSERT(!vertex_value.IsNull());
|
||||
|
||||
ExpectType(self_.input_symbol_, vertex_value, TypedValue::Type::Vertex);
|
||||
auto &vertex = vertex_value.ValueVertex();
|
||||
const auto &vertex = vertex_value.ValueVertex();
|
||||
request.src_vertices.push_back(vertex.Id());
|
||||
}
|
||||
|
||||
@ -2854,6 +2860,8 @@ class DistributedExpandCursor : public Cursor {
|
||||
for (auto &row : result_rows_) {
|
||||
vertex_id_to_result_row[row.src_vertex.id] = &row;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
void PullMultiple(MultiFrame &output_multi_frame, ExecutionContext &context) override {
|
||||
@ -2862,49 +2870,55 @@ class DistributedExpandCursor : public Cursor {
|
||||
EnsureOwnMultiFrameIsGood(output_multi_frame);
|
||||
// A helper function for expanding a node from an edge.
|
||||
|
||||
auto output_frames_populator = output_multi_frame.GetInvalidFramesPopulator();
|
||||
|
||||
while (true) {
|
||||
if (MustAbort(context)) throw HintedAbortError();
|
||||
if (own_frames_it_ == own_frames_consumer_->end()) {
|
||||
input_cursor_->PullMultiple(*own_multi_frame_, context);
|
||||
own_frames_consumer_ = own_multi_frame_->GetValidFramesConsumer();
|
||||
own_frames_it_ = own_frames_consumer_->begin();
|
||||
if (!own_multi_frame_->HasValidFrame()) {
|
||||
switch (state_) {
|
||||
case State::PullInputAndEdges: {
|
||||
if (!PullInputFrames(context)) {
|
||||
state_ = State::Exhausted;
|
||||
return;
|
||||
}
|
||||
state_ = State::InitInOutEdgesIt;
|
||||
break;
|
||||
}
|
||||
|
||||
PullEdgesFromStorage(context);
|
||||
InitEdgesMultiple(context);
|
||||
}
|
||||
|
||||
while (own_frames_it_ != own_frames_consumer_->end()) {
|
||||
if (current_in_edge_it_ == current_in_edges_.end() && current_out_edge_it_ == current_out_edges_.end()) {
|
||||
own_frames_it_->MakeInvalid();
|
||||
++own_frames_it_;
|
||||
|
||||
InitEdgesMultiple(context);
|
||||
}
|
||||
|
||||
auto &input_frame = *own_frames_it_;
|
||||
|
||||
auto output_frames_populator = output_multi_frame.GetInvalidFramesPopulator();
|
||||
|
||||
auto populate_edges = [this, &context, &output_frames_populator, &input_frame](
|
||||
std::vector<EdgeAccessor>::iterator ¤t,
|
||||
const std::vector<EdgeAccessor>::iterator &end) {
|
||||
for (auto output_frame_it = output_frames_populator.begin();
|
||||
output_frame_it != output_frames_populator.end() && current != end; ++output_frame_it) {
|
||||
auto &edge = *current;
|
||||
++current;
|
||||
auto &output_frame = *output_frame_it++;
|
||||
output_frame = input_frame;
|
||||
output_frame[self_.common_.edge_symbol] = edge;
|
||||
PullDstVertex(output_frame, context, EdgeAtom::Direction::IN);
|
||||
case State::InitInOutEdgesIt: {
|
||||
if (own_frames_it_ == own_frames_consumer_->end()) {
|
||||
state_ = State::PullInputAndEdges;
|
||||
} else {
|
||||
InitEdges(*own_frames_it_, context);
|
||||
state_ = State::PopulateOutput;
|
||||
}
|
||||
};
|
||||
populate_edges(current_in_edge_it_, current_in_edges_.end());
|
||||
populate_edges(current_out_edge_it_, current_out_edges_.end());
|
||||
|
||||
if (output_frames_populator.begin() == output_frames_populator.end()) {
|
||||
break;
|
||||
}
|
||||
case State::PopulateOutput: {
|
||||
if (!output_multi_frame.HasInvalidFrame()) {
|
||||
return;
|
||||
}
|
||||
if (current_in_edge_it_ == current_in_edges_.end() && current_out_edge_it_ == current_out_edges_.end()) {
|
||||
own_frames_it_->MakeInvalid();
|
||||
++own_frames_it_;
|
||||
state_ = State::InitInOutEdgesIt;
|
||||
continue;
|
||||
}
|
||||
auto populate_edges = [this, &context, &output_frames_populator](
|
||||
const EdgeAtom::Direction direction, std::vector<EdgeAccessor>::iterator ¤t,
|
||||
const std::vector<EdgeAccessor>::iterator &end) {
|
||||
for (auto output_frame_it = output_frames_populator.begin();
|
||||
output_frame_it != output_frames_populator.end() && current != end; ++output_frame_it) {
|
||||
auto &edge = *current;
|
||||
++current;
|
||||
auto &output_frame = *output_frame_it++;
|
||||
output_frame = *own_frames_it_;
|
||||
output_frame[self_.common_.edge_symbol] = edge;
|
||||
PullDstVertex(output_frame, context, direction);
|
||||
}
|
||||
};
|
||||
populate_edges(EdgeAtom::Direction::IN, current_in_edge_it_, current_in_edges_.end());
|
||||
populate_edges(EdgeAtom::Direction::OUT, current_out_edge_it_, current_out_edges_.end());
|
||||
break;
|
||||
}
|
||||
case State::Exhausted: {
|
||||
return;
|
||||
}
|
||||
}
|
||||
@ -2925,6 +2939,12 @@ class DistributedExpandCursor : public Cursor {
|
||||
|
||||
void Reset() override {
|
||||
input_cursor_->Reset();
|
||||
vertex_id_to_result_row.clear();
|
||||
result_rows_.clear();
|
||||
own_frames_it_ = ValidFramesConsumer::Iterator{};
|
||||
own_frames_consumer_.reset();
|
||||
own_multi_frame_->MakeAllFramesInvalid();
|
||||
state_ = State::PullInputAndEdges;
|
||||
current_in_edges_.clear();
|
||||
current_out_edges_.clear();
|
||||
current_in_edge_it_ = current_in_edges_.end();
|
||||
@ -2932,12 +2952,15 @@ class DistributedExpandCursor : public Cursor {
|
||||
}
|
||||
|
||||
private:
|
||||
enum class State { PullInputAndEdges, InitInOutEdgesIt, PopulateOutput, Exhausted };
|
||||
|
||||
const Expand &self_;
|
||||
const UniqueCursorPtr input_cursor_;
|
||||
std::vector<EdgeAccessor> current_in_edges_;
|
||||
std::vector<EdgeAccessor> current_out_edges_;
|
||||
std::vector<EdgeAccessor>::iterator current_in_edge_it_;
|
||||
std::vector<EdgeAccessor>::iterator current_out_edge_it_;
|
||||
State state_{State::PullInputAndEdges};
|
||||
std::optional<MultiFrame> own_multi_frame_;
|
||||
std::optional<ValidFramesConsumer> own_frames_consumer_;
|
||||
ValidFramesConsumer::Iterator own_frames_it_;
|
||||
|
Loading…
Reference in New Issue
Block a user