From 68175bc97cf66fda671569f25880ed35f85e16c5 Mon Sep 17 00:00:00 2001 From: gvolfing Date: Thu, 15 Dec 2022 15:20:32 +0100 Subject: [PATCH] Init basic cursor class Conform clang-tidy and modify PullMultiple behavior --- src/query/v2/multiframe.hpp | 8 +- src/query/v2/plan/operator.cpp | 167 ++++++++++++++++++++- src/query/v2/plan/pretty_print.hpp | 6 +- src/query/v2/plan/rewrite/index_lookup.hpp | 4 +- src/query/v2/request_router.hpp | 4 +- src/utils/event_counter.cpp | 1 + tests/unit/query_v2_plan.cpp | 24 ++- 7 files changed, 185 insertions(+), 29 deletions(-) diff --git a/src/query/v2/multiframe.hpp b/src/query/v2/multiframe.hpp index 8588993a7..49dad940a 100644 --- a/src/query/v2/multiframe.hpp +++ b/src/query/v2/multiframe.hpp @@ -1,4 +1,4 @@ -// Copyright 2022 Memgraph Ltd. +// Copyright 2023 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source @@ -200,9 +200,9 @@ class ValidFramesConsumer { explicit ValidFramesConsumer(MultiFrame &multiframe); ~ValidFramesConsumer() noexcept; - ValidFramesConsumer(const ValidFramesConsumer &other) = delete; - ValidFramesConsumer(ValidFramesConsumer &&other) noexcept = delete; - ValidFramesConsumer &operator=(const ValidFramesConsumer &other) = delete; + ValidFramesConsumer(const ValidFramesConsumer &other) = default; + ValidFramesConsumer(ValidFramesConsumer &&other) noexcept = default; + ValidFramesConsumer &operator=(const ValidFramesConsumer &other) = default; ValidFramesConsumer &operator=(ValidFramesConsumer &&other) noexcept = delete; struct Iterator { diff --git a/src/query/v2/plan/operator.cpp b/src/query/v2/plan/operator.cpp index 7b8cfce07..02abdc3c4 100644 --- a/src/query/v2/plan/operator.cpp +++ b/src/query/v2/plan/operator.cpp @@ -1,4 +1,4 @@ -// Copyright 2022 Memgraph Ltd. +// Copyright 2023 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source @@ -462,6 +462,162 @@ class DistributedScanAllAndFilterCursor : public Cursor { std::optional> filter_expressions_; }; +class DistributedScanAllByPrimaryKeyCursor : public Cursor { + public: + explicit DistributedScanAllByPrimaryKeyCursor( + Symbol output_symbol, UniqueCursorPtr input_cursor, const char *op_name, + std::optional label, + std::optional> property_expression_pair, + std::optional> filter_expressions) + : output_symbol_(output_symbol), + input_cursor_(std::move(input_cursor)), + op_name_(op_name), + label_(label), + property_expression_pair_(property_expression_pair), + filter_expressions_(filter_expressions) { + ResetExecutionState(); + } + + enum class State : int8_t { INITIALIZING, COMPLETED }; + + using VertexAccessor = accessors::VertexAccessor; + + bool MakeRequest(RequestRouterInterface &request_router, ExecutionContext &context) { + { + SCOPED_REQUEST_WAIT_PROFILE; + std::optional request_label = std::nullopt; + if (label_.has_value()) { + request_label = request_router.LabelToName(*label_); + } + current_batch_ = request_router.ScanVertices(request_label); + } + current_vertex_it_ = current_batch_.begin(); + request_state_ = State::COMPLETED; + return !current_batch_.empty(); + } + + bool Pull(Frame &frame, ExecutionContext &context) override { + SCOPED_PROFILE_OP(op_name_); + + auto &request_router = *context.request_router; + while (true) { + if (MustAbort(context)) { + throw HintedAbortError(); + } + + if (request_state_ == State::INITIALIZING) { + if (!input_cursor_->Pull(frame, context)) { + return false; + } + } + + if (current_vertex_it_ == current_batch_.end() && + (request_state_ == State::COMPLETED || !MakeRequest(request_router, context))) { + ResetExecutionState(); + continue; + } + + frame[output_symbol_] = TypedValue(std::move(*current_vertex_it_)); + ++current_vertex_it_; + return true; + } + } + + void PrepareNextFrames(ExecutionContext &context) { + auto &request_router = *context.request_router; + + input_cursor_->PullMultiple(*own_multi_frames_, context); + valid_frames_consumer_ = own_multi_frames_->GetValidFramesConsumer(); + valid_frames_it_ = valid_frames_consumer_->begin(); + + MakeRequest(request_router, context); + } + + inline bool HasNextFrame() { + return current_vertex_it_ != current_batch_.end() && valid_frames_it_ != valid_frames_consumer_->end(); + } + + FrameWithValidity GetNextFrame(ExecutionContext &context) { + MG_ASSERT(HasNextFrame()); + + auto frame = *valid_frames_it_; + frame[output_symbol_] = TypedValue(*current_vertex_it_); + + ++current_vertex_it_; + if (current_vertex_it_ == current_batch_.end()) { + valid_frames_it_->MakeInvalid(); + ++valid_frames_it_; + + if (valid_frames_it_ == valid_frames_consumer_->end()) { + PrepareNextFrames(context); + } else { + current_vertex_it_ = current_batch_.begin(); + } + }; + + return frame; + } + + void PullMultiple(MultiFrame &input_multi_frame, ExecutionContext &context) override { + SCOPED_PROFILE_OP(op_name_); + + if (!own_multi_frames_.has_value()) { + // NOLINTNEXTLINE(bugprone-narrowing-conversions) + own_multi_frames_.emplace(MultiFrame(input_multi_frame.GetFirstFrame().elems().size(), + kNumberOfFramesInMultiframe, input_multi_frame.GetMemoryResource())); + PrepareNextFrames(context); + } + + while (true) { + if (MustAbort(context)) { + throw HintedAbortError(); + } + + auto invalid_frames_populator = input_multi_frame.GetInvalidFramesPopulator(); + auto invalid_frame_it = invalid_frames_populator.begin(); + auto has_modified_at_least_one_frame = false; + + while (invalid_frames_populator.end() != invalid_frame_it && HasNextFrame()) { + has_modified_at_least_one_frame = true; + *invalid_frame_it = GetNextFrame(context); + ++invalid_frame_it; + } + + if (!has_modified_at_least_one_frame) { + return; + } + } + }; + + void Shutdown() override { input_cursor_->Shutdown(); } + + void ResetExecutionState() { + current_batch_.clear(); + current_vertex_it_ = current_batch_.end(); + request_state_ = State::INITIALIZING; + } + + void Reset() override { + input_cursor_->Reset(); + ResetExecutionState(); + } + + private: + const Symbol output_symbol_; + const UniqueCursorPtr input_cursor_; + const char *op_name_; + std::vector current_batch_; + std::vector::iterator current_vertex_it_; + State request_state_ = State::INITIALIZING; + std::optional label_; + std::optional> property_expression_pair_; + std::optional> filter_expressions_; + std::optional own_multi_frames_; + std::optional valid_frames_consumer_; + ValidFramesConsumer::Iterator valid_frames_it_; + std::queue frames_buffer_; +}; + ScanAll::ScanAll(const std::shared_ptr &input, Symbol output_symbol, storage::v3::View view) : input_(input ? input : std::make_shared()), output_symbol_(output_symbol), view_(view) {} @@ -567,8 +723,13 @@ ScanAllByPrimaryKey::ScanAllByPrimaryKey(const std::shared_ptr ACCEPT_WITH_INPUT(ScanAllByPrimaryKey) -UniqueCursorPtr ScanAllByPrimaryKey::MakeCursor(utils::MemoryResource * /*mem*/) const { - // EventCounter::IncrementCounter(EventCounter::ScanAllByPrimaryKeyOperator); +UniqueCursorPtr ScanAllByPrimaryKey::MakeCursor(utils::MemoryResource *mem) const { + EventCounter::IncrementCounter(EventCounter::ScanAllByPrimaryKeyOperator); + + return MakeUniqueCursorPtr( + mem, output_symbol_, input_->MakeCursor(mem), "ScanAll", std::nullopt /*label*/, + std::nullopt /*property_expression_pair*/, std::nullopt /*filter_expressions*/); + throw QueryRuntimeException("ScanAllByPrimaryKey cursur is yet to be implemented."); } diff --git a/src/query/v2/plan/pretty_print.hpp b/src/query/v2/plan/pretty_print.hpp index 20fde180b..266b16102 100644 --- a/src/query/v2/plan/pretty_print.hpp +++ b/src/query/v2/plan/pretty_print.hpp @@ -1,4 +1,4 @@ -// Copyright 2022 Memgraph Ltd. +// Copyright 2023 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source @@ -67,7 +67,7 @@ class PlanPrinter : public virtual HierarchicalLogicalOperatorVisitor { bool PreVisit(ScanAllByLabelPropertyValue &) override; bool PreVisit(ScanAllByLabelPropertyRange &) override; bool PreVisit(ScanAllByLabelProperty &) override; - bool PreVisit(query::v2::plan::ScanAllByPrimaryKey &) override; + bool PreVisit(ScanAllByPrimaryKey & /*unused*/) override; bool PreVisit(Expand &) override; bool PreVisit(ExpandVariable &) override; @@ -194,7 +194,7 @@ class PlanToJsonVisitor : public virtual HierarchicalLogicalOperatorVisitor { bool PreVisit(ScanAllByLabelPropertyRange &) override; bool PreVisit(ScanAllByLabelPropertyValue &) override; bool PreVisit(ScanAllByLabelProperty &) override; - bool PreVisit(ScanAllByPrimaryKey &) override; + bool PreVisit(ScanAllByPrimaryKey & /*unused*/) override; bool PreVisit(Produce &) override; bool PreVisit(Accumulate &) override; diff --git a/src/query/v2/plan/rewrite/index_lookup.hpp b/src/query/v2/plan/rewrite/index_lookup.hpp index 1a4a4160c..e134dd49b 100644 --- a/src/query/v2/plan/rewrite/index_lookup.hpp +++ b/src/query/v2/plan/rewrite/index_lookup.hpp @@ -1,4 +1,4 @@ -// Copyright 2022 Memgraph Ltd. +// Copyright 2023 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source @@ -278,7 +278,7 @@ class IndexLookupRewriter final : public HierarchicalLogicalOperatorVisitor { return true; } - bool PostVisit(ScanAllByPrimaryKey &) override { + bool PostVisit(ScanAllByPrimaryKey & /*unused*/) override { prev_ops_.pop_back(); return true; } diff --git a/src/query/v2/request_router.hpp b/src/query/v2/request_router.hpp index b5878ff92..ca8d759f1 100644 --- a/src/query/v2/request_router.hpp +++ b/src/query/v2/request_router.hpp @@ -1,4 +1,4 @@ -// Copyright 2022 Memgraph Ltd. +// Copyright 2023 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source @@ -115,7 +115,7 @@ class RequestRouterInterface { virtual bool IsPrimaryLabel(storage::v3::LabelId label) const = 0; virtual bool IsPrimaryKey(storage::v3::LabelId primary_label, storage::v3::PropertyId property) const = 0; // TODO - (gvolfing) Implement this function in the mocked class. - virtual std::vector GetSchemaForLabel(storage::v3::LabelId label) const { + virtual std::vector GetSchemaForLabel(storage::v3::LabelId /*label*/) const { return std::vector{}; }; }; diff --git a/src/utils/event_counter.cpp b/src/utils/event_counter.cpp index 634b6eae2..8f7475ffb 100644 --- a/src/utils/event_counter.cpp +++ b/src/utils/event_counter.cpp @@ -25,6 +25,7 @@ M(ScanAllByLabelPropertyValueOperator, "Number of times ScanAllByLabelPropertyValue operator was used.") \ M(ScanAllByLabelPropertyOperator, "Number of times ScanAllByLabelProperty operator was used.") \ M(ScanAllByIdOperator, "Number of times ScanAllById operator was used.") \ + M(ScanAllByPrimaryKeyOperator, "Number of times ScanAllByPrimaryKey operator was used.") \ M(ExpandOperator, "Number of times Expand operator was used.") \ M(ExpandVariableOperator, "Number of times ExpandVariable operator was used.") \ M(ConstructNamedPathOperator, "Number of times ConstructNamedPath operator was used.") \ diff --git a/tests/unit/query_v2_plan.cpp b/tests/unit/query_v2_plan.cpp index 2eb8295db..54d091df7 100644 --- a/tests/unit/query_v2_plan.cpp +++ b/tests/unit/query_v2_plan.cpp @@ -1,4 +1,4 @@ -// Copyright 2022 Memgraph Ltd. +// Copyright 2023 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source @@ -86,11 +86,12 @@ class TestPlanner : public ::testing::Test {}; using PlannerTypes = ::testing::Types; -void DeleteListContent(std::list *list) { - for (BaseOpChecker *ptr : *list) { - delete ptr; - } -} +// void DeleteListContent(std::list *list) { +// for (BaseOpChecker *ptr : *list) { +// delete ptr; +// } +// } + TYPED_TEST_CASE(TestPlanner, PlannerTypes); TYPED_TEST(TestPlanner, MatchFilterPropIsNotNull) { @@ -171,8 +172,8 @@ TYPED_TEST(TestPlanner, MatchFilterPropIsNotNull) { dba.SetIndexCount(label, sec_prop_three.second, 1); memgraph::query::v2::AstStorage storage; - memgraph::query::v2::Expression *expected_primary_key; - expected_primary_key = PROPERTY_LOOKUP("n", prim_prop_one); + // memgraph::query::v2::Expression *expected_primary_key; + // expected_primary_key = PROPERTY_LOOKUP("n", prim_prop_one); auto *query = QUERY(SINGLE_QUERY(MATCH(PATTERN(NODE("n", prim_label_name))), WHERE(EQ(PROPERTY_LOOKUP("n", prim_prop_one), LITERAL(1))), RETURN("n"))); auto symbol_table = (memgraph::expr::MakeSymbolTable(query)); @@ -183,12 +184,6 @@ TYPED_TEST(TestPlanner, MatchFilterPropIsNotNull) { } /* - - - - - - TYPED_TEST(TestPlanner, MatchNodeReturn) { // Test MATCH (n) RETURN n AstStorage storage; @@ -1705,5 +1700,4 @@ TYPED_TEST(TestPlanner, Foreach) { } } */ - } // namespace