From 36891c119b28d472e4d831af2f318cc36c3fd670 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=A1nos=20Benjamin=20Antal?= <benjamin.antal@memgraph.io> Date: Tue, 17 Jan 2023 07:17:53 +0100 Subject: [PATCH] Remove unnecessary state from `DistributedScanAllAndFilterCursor` --- src/query/v2/plan/operator.cpp | 20 ++++---------------- 1 file changed, 4 insertions(+), 16 deletions(-) diff --git a/src/query/v2/plan/operator.cpp b/src/query/v2/plan/operator.cpp index a57799367..1e84eaae5 100644 --- a/src/query/v2/plan/operator.cpp +++ b/src/query/v2/plan/operator.cpp @@ -467,31 +467,24 @@ class DistributedScanAllAndFilterCursor : public Cursor { current_batch_ = request_router.ScanVertices(request_label); } current_vertex_it_ = current_batch_.begin(); - request_state_ = State::COMPLETED; return !current_batch_.empty(); } bool Pull(Frame &frame, ExecutionContext &context) override { SCOPED_PROFILE_OP(op_name_); - auto &request_router = *context.request_router; while (true) { if (MustAbort(context)) { throw HintedAbortError(); } - if (request_state_ == State::INITIALIZING) { - if (!input_cursor_->Pull(frame, context)) { + if (current_vertex_it_ == current_batch_.end()) { + ResetExecutionState(); + if (!input_cursor_->Pull(frame, context) || !MakeRequest(*context.request_router, context)) { return false; } } - if (current_vertex_it_ == current_batch_.end() && - (request_state_ == State::COMPLETED || !MakeRequest(request_router, context))) { - ResetExecutionState(); - continue; - } - frame[output_symbol_] = TypedValue(std::move(*current_vertex_it_)); ++current_vertex_it_; return true; @@ -568,7 +561,6 @@ class DistributedScanAllAndFilterCursor : public Cursor { void ResetExecutionState() { current_batch_.clear(); current_vertex_it_ = current_batch_.end(); - request_state_ = State::INITIALIZING; } void Reset() override { @@ -581,15 +573,13 @@ class DistributedScanAllAndFilterCursor : public Cursor { const UniqueCursorPtr input_cursor_; const char *op_name_; std::vector<VertexAccessor> current_batch_; - std::vector<VertexAccessor>::iterator current_vertex_it_; - State request_state_ = State::INITIALIZING; + std::vector<VertexAccessor>::iterator current_vertex_it_{current_batch_.begin()}; std::optional<storage::v3::LabelId> label_; std::optional<std::pair<storage::v3::PropertyId, Expression *>> property_expression_pair_; std::optional<std::vector<Expression *>> filter_expressions_; std::optional<MultiFrame> own_multi_frames_; std::optional<ValidFramesConsumer> valid_frames_consumer_; ValidFramesConsumer::Iterator valid_frames_it_; - std::queue<FrameWithValidity> frames_buffer_; }; ScanAll::ScanAll(const std::shared_ptr<LogicalOperator> &input, Symbol output_symbol, storage::v3::View view) @@ -597,8 +587,6 @@ ScanAll::ScanAll(const std::shared_ptr<LogicalOperator> &input, Symbol output_sy ACCEPT_WITH_INPUT(ScanAll) -class DistributedScanAllCursor; - UniqueCursorPtr ScanAll::MakeCursor(utils::MemoryResource *mem) const { EventCounter::IncrementCounter(EventCounter::ScanAllOperator);