Init basic cursor class
Conform clang-tidy and modify PullMultiple behavior
This commit is contained in:
parent
f36b96744c
commit
68175bc97c
@ -1,4 +1,4 @@
|
||||
// Copyright 2022 Memgraph Ltd.
|
||||
// Copyright 2023 Memgraph Ltd.
|
||||
//
|
||||
// Use of this software is governed by the Business Source License
|
||||
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
|
||||
@ -200,9 +200,9 @@ class ValidFramesConsumer {
|
||||
explicit ValidFramesConsumer(MultiFrame &multiframe);
|
||||
|
||||
~ValidFramesConsumer() noexcept;
|
||||
ValidFramesConsumer(const ValidFramesConsumer &other) = delete;
|
||||
ValidFramesConsumer(ValidFramesConsumer &&other) noexcept = delete;
|
||||
ValidFramesConsumer &operator=(const ValidFramesConsumer &other) = delete;
|
||||
ValidFramesConsumer(const ValidFramesConsumer &other) = default;
|
||||
ValidFramesConsumer(ValidFramesConsumer &&other) noexcept = default;
|
||||
ValidFramesConsumer &operator=(const ValidFramesConsumer &other) = default;
|
||||
ValidFramesConsumer &operator=(ValidFramesConsumer &&other) noexcept = delete;
|
||||
|
||||
struct Iterator {
|
||||
|
@ -1,4 +1,4 @@
|
||||
// Copyright 2022 Memgraph Ltd.
|
||||
// Copyright 2023 Memgraph Ltd.
|
||||
//
|
||||
// Use of this software is governed by the Business Source License
|
||||
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
|
||||
@ -462,6 +462,162 @@ class DistributedScanAllAndFilterCursor : public Cursor {
|
||||
std::optional<std::vector<Expression *>> filter_expressions_;
|
||||
};
|
||||
|
||||
class DistributedScanAllByPrimaryKeyCursor : public Cursor {
|
||||
public:
|
||||
explicit DistributedScanAllByPrimaryKeyCursor(
|
||||
Symbol output_symbol, UniqueCursorPtr input_cursor, const char *op_name,
|
||||
std::optional<storage::v3::LabelId> label,
|
||||
std::optional<std::pair<storage::v3::PropertyId, Expression *>> property_expression_pair,
|
||||
std::optional<std::vector<Expression *>> filter_expressions)
|
||||
: output_symbol_(output_symbol),
|
||||
input_cursor_(std::move(input_cursor)),
|
||||
op_name_(op_name),
|
||||
label_(label),
|
||||
property_expression_pair_(property_expression_pair),
|
||||
filter_expressions_(filter_expressions) {
|
||||
ResetExecutionState();
|
||||
}
|
||||
|
||||
enum class State : int8_t { INITIALIZING, COMPLETED };
|
||||
|
||||
using VertexAccessor = accessors::VertexAccessor;
|
||||
|
||||
bool MakeRequest(RequestRouterInterface &request_router, ExecutionContext &context) {
|
||||
{
|
||||
SCOPED_REQUEST_WAIT_PROFILE;
|
||||
std::optional<std::string> request_label = std::nullopt;
|
||||
if (label_.has_value()) {
|
||||
request_label = request_router.LabelToName(*label_);
|
||||
}
|
||||
current_batch_ = request_router.ScanVertices(request_label);
|
||||
}
|
||||
current_vertex_it_ = current_batch_.begin();
|
||||
request_state_ = State::COMPLETED;
|
||||
return !current_batch_.empty();
|
||||
}
|
||||
|
||||
bool Pull(Frame &frame, ExecutionContext &context) override {
|
||||
SCOPED_PROFILE_OP(op_name_);
|
||||
|
||||
auto &request_router = *context.request_router;
|
||||
while (true) {
|
||||
if (MustAbort(context)) {
|
||||
throw HintedAbortError();
|
||||
}
|
||||
|
||||
if (request_state_ == State::INITIALIZING) {
|
||||
if (!input_cursor_->Pull(frame, context)) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
if (current_vertex_it_ == current_batch_.end() &&
|
||||
(request_state_ == State::COMPLETED || !MakeRequest(request_router, context))) {
|
||||
ResetExecutionState();
|
||||
continue;
|
||||
}
|
||||
|
||||
frame[output_symbol_] = TypedValue(std::move(*current_vertex_it_));
|
||||
++current_vertex_it_;
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
void PrepareNextFrames(ExecutionContext &context) {
|
||||
auto &request_router = *context.request_router;
|
||||
|
||||
input_cursor_->PullMultiple(*own_multi_frames_, context);
|
||||
valid_frames_consumer_ = own_multi_frames_->GetValidFramesConsumer();
|
||||
valid_frames_it_ = valid_frames_consumer_->begin();
|
||||
|
||||
MakeRequest(request_router, context);
|
||||
}
|
||||
|
||||
inline bool HasNextFrame() {
|
||||
return current_vertex_it_ != current_batch_.end() && valid_frames_it_ != valid_frames_consumer_->end();
|
||||
}
|
||||
|
||||
FrameWithValidity GetNextFrame(ExecutionContext &context) {
|
||||
MG_ASSERT(HasNextFrame());
|
||||
|
||||
auto frame = *valid_frames_it_;
|
||||
frame[output_symbol_] = TypedValue(*current_vertex_it_);
|
||||
|
||||
++current_vertex_it_;
|
||||
if (current_vertex_it_ == current_batch_.end()) {
|
||||
valid_frames_it_->MakeInvalid();
|
||||
++valid_frames_it_;
|
||||
|
||||
if (valid_frames_it_ == valid_frames_consumer_->end()) {
|
||||
PrepareNextFrames(context);
|
||||
} else {
|
||||
current_vertex_it_ = current_batch_.begin();
|
||||
}
|
||||
};
|
||||
|
||||
return frame;
|
||||
}
|
||||
|
||||
void PullMultiple(MultiFrame &input_multi_frame, ExecutionContext &context) override {
|
||||
SCOPED_PROFILE_OP(op_name_);
|
||||
|
||||
if (!own_multi_frames_.has_value()) {
|
||||
// NOLINTNEXTLINE(bugprone-narrowing-conversions)
|
||||
own_multi_frames_.emplace(MultiFrame(input_multi_frame.GetFirstFrame().elems().size(),
|
||||
kNumberOfFramesInMultiframe, input_multi_frame.GetMemoryResource()));
|
||||
PrepareNextFrames(context);
|
||||
}
|
||||
|
||||
while (true) {
|
||||
if (MustAbort(context)) {
|
||||
throw HintedAbortError();
|
||||
}
|
||||
|
||||
auto invalid_frames_populator = input_multi_frame.GetInvalidFramesPopulator();
|
||||
auto invalid_frame_it = invalid_frames_populator.begin();
|
||||
auto has_modified_at_least_one_frame = false;
|
||||
|
||||
while (invalid_frames_populator.end() != invalid_frame_it && HasNextFrame()) {
|
||||
has_modified_at_least_one_frame = true;
|
||||
*invalid_frame_it = GetNextFrame(context);
|
||||
++invalid_frame_it;
|
||||
}
|
||||
|
||||
if (!has_modified_at_least_one_frame) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
void Shutdown() override { input_cursor_->Shutdown(); }
|
||||
|
||||
void ResetExecutionState() {
|
||||
current_batch_.clear();
|
||||
current_vertex_it_ = current_batch_.end();
|
||||
request_state_ = State::INITIALIZING;
|
||||
}
|
||||
|
||||
void Reset() override {
|
||||
input_cursor_->Reset();
|
||||
ResetExecutionState();
|
||||
}
|
||||
|
||||
private:
|
||||
const Symbol output_symbol_;
|
||||
const UniqueCursorPtr input_cursor_;
|
||||
const char *op_name_;
|
||||
std::vector<VertexAccessor> current_batch_;
|
||||
std::vector<VertexAccessor>::iterator current_vertex_it_;
|
||||
State request_state_ = State::INITIALIZING;
|
||||
std::optional<storage::v3::LabelId> label_;
|
||||
std::optional<std::pair<storage::v3::PropertyId, Expression *>> property_expression_pair_;
|
||||
std::optional<std::vector<Expression *>> filter_expressions_;
|
||||
std::optional<MultiFrame> own_multi_frames_;
|
||||
std::optional<ValidFramesConsumer> valid_frames_consumer_;
|
||||
ValidFramesConsumer::Iterator valid_frames_it_;
|
||||
std::queue<FrameWithValidity> frames_buffer_;
|
||||
};
|
||||
|
||||
ScanAll::ScanAll(const std::shared_ptr<LogicalOperator> &input, Symbol output_symbol, storage::v3::View view)
|
||||
: input_(input ? input : std::make_shared<Once>()), output_symbol_(output_symbol), view_(view) {}
|
||||
|
||||
@ -567,8 +723,13 @@ ScanAllByPrimaryKey::ScanAllByPrimaryKey(const std::shared_ptr<LogicalOperator>
|
||||
|
||||
ACCEPT_WITH_INPUT(ScanAllByPrimaryKey)
|
||||
|
||||
UniqueCursorPtr ScanAllByPrimaryKey::MakeCursor(utils::MemoryResource * /*mem*/) const {
|
||||
// EventCounter::IncrementCounter(EventCounter::ScanAllByPrimaryKeyOperator);
|
||||
UniqueCursorPtr ScanAllByPrimaryKey::MakeCursor(utils::MemoryResource *mem) const {
|
||||
EventCounter::IncrementCounter(EventCounter::ScanAllByPrimaryKeyOperator);
|
||||
|
||||
return MakeUniqueCursorPtr<DistributedScanAllByPrimaryKeyCursor>(
|
||||
mem, output_symbol_, input_->MakeCursor(mem), "ScanAll", std::nullopt /*label*/,
|
||||
std::nullopt /*property_expression_pair*/, std::nullopt /*filter_expressions*/);
|
||||
|
||||
throw QueryRuntimeException("ScanAllByPrimaryKey cursur is yet to be implemented.");
|
||||
}
|
||||
|
||||
|
@ -1,4 +1,4 @@
|
||||
// Copyright 2022 Memgraph Ltd.
|
||||
// Copyright 2023 Memgraph Ltd.
|
||||
//
|
||||
// Use of this software is governed by the Business Source License
|
||||
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
|
||||
@ -67,7 +67,7 @@ class PlanPrinter : public virtual HierarchicalLogicalOperatorVisitor {
|
||||
bool PreVisit(ScanAllByLabelPropertyValue &) override;
|
||||
bool PreVisit(ScanAllByLabelPropertyRange &) override;
|
||||
bool PreVisit(ScanAllByLabelProperty &) override;
|
||||
bool PreVisit(query::v2::plan::ScanAllByPrimaryKey &) override;
|
||||
bool PreVisit(ScanAllByPrimaryKey & /*unused*/) override;
|
||||
|
||||
bool PreVisit(Expand &) override;
|
||||
bool PreVisit(ExpandVariable &) override;
|
||||
@ -194,7 +194,7 @@ class PlanToJsonVisitor : public virtual HierarchicalLogicalOperatorVisitor {
|
||||
bool PreVisit(ScanAllByLabelPropertyRange &) override;
|
||||
bool PreVisit(ScanAllByLabelPropertyValue &) override;
|
||||
bool PreVisit(ScanAllByLabelProperty &) override;
|
||||
bool PreVisit(ScanAllByPrimaryKey &) override;
|
||||
bool PreVisit(ScanAllByPrimaryKey & /*unused*/) override;
|
||||
|
||||
bool PreVisit(Produce &) override;
|
||||
bool PreVisit(Accumulate &) override;
|
||||
|
@ -1,4 +1,4 @@
|
||||
// Copyright 2022 Memgraph Ltd.
|
||||
// Copyright 2023 Memgraph Ltd.
|
||||
//
|
||||
// Use of this software is governed by the Business Source License
|
||||
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
|
||||
@ -278,7 +278,7 @@ class IndexLookupRewriter final : public HierarchicalLogicalOperatorVisitor {
|
||||
return true;
|
||||
}
|
||||
|
||||
bool PostVisit(ScanAllByPrimaryKey &) override {
|
||||
bool PostVisit(ScanAllByPrimaryKey & /*unused*/) override {
|
||||
prev_ops_.pop_back();
|
||||
return true;
|
||||
}
|
||||
|
@ -1,4 +1,4 @@
|
||||
// Copyright 2022 Memgraph Ltd.
|
||||
// Copyright 2023 Memgraph Ltd.
|
||||
//
|
||||
// Use of this software is governed by the Business Source License
|
||||
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
|
||||
@ -115,7 +115,7 @@ class RequestRouterInterface {
|
||||
virtual bool IsPrimaryLabel(storage::v3::LabelId label) const = 0;
|
||||
virtual bool IsPrimaryKey(storage::v3::LabelId primary_label, storage::v3::PropertyId property) const = 0;
|
||||
// TODO - (gvolfing) Implement this function in the mocked class.
|
||||
virtual std::vector<coordinator::SchemaProperty> GetSchemaForLabel(storage::v3::LabelId label) const {
|
||||
virtual std::vector<coordinator::SchemaProperty> GetSchemaForLabel(storage::v3::LabelId /*label*/) const {
|
||||
return std::vector<coordinator::SchemaProperty>{};
|
||||
};
|
||||
};
|
||||
|
@ -25,6 +25,7 @@
|
||||
M(ScanAllByLabelPropertyValueOperator, "Number of times ScanAllByLabelPropertyValue operator was used.") \
|
||||
M(ScanAllByLabelPropertyOperator, "Number of times ScanAllByLabelProperty operator was used.") \
|
||||
M(ScanAllByIdOperator, "Number of times ScanAllById operator was used.") \
|
||||
M(ScanAllByPrimaryKeyOperator, "Number of times ScanAllByPrimaryKey operator was used.") \
|
||||
M(ExpandOperator, "Number of times Expand operator was used.") \
|
||||
M(ExpandVariableOperator, "Number of times ExpandVariable operator was used.") \
|
||||
M(ConstructNamedPathOperator, "Number of times ConstructNamedPath operator was used.") \
|
||||
|
@ -1,4 +1,4 @@
|
||||
// Copyright 2022 Memgraph Ltd.
|
||||
// Copyright 2023 Memgraph Ltd.
|
||||
//
|
||||
// Use of this software is governed by the Business Source License
|
||||
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
|
||||
@ -86,11 +86,12 @@ class TestPlanner : public ::testing::Test {};
|
||||
|
||||
using PlannerTypes = ::testing::Types<Planner>;
|
||||
|
||||
void DeleteListContent(std::list<BaseOpChecker *> *list) {
|
||||
for (BaseOpChecker *ptr : *list) {
|
||||
delete ptr;
|
||||
}
|
||||
}
|
||||
// void DeleteListContent(std::list<BaseOpChecker *> *list) {
|
||||
// for (BaseOpChecker *ptr : *list) {
|
||||
// delete ptr;
|
||||
// }
|
||||
// }
|
||||
|
||||
TYPED_TEST_CASE(TestPlanner, PlannerTypes);
|
||||
|
||||
TYPED_TEST(TestPlanner, MatchFilterPropIsNotNull) {
|
||||
@ -171,8 +172,8 @@ TYPED_TEST(TestPlanner, MatchFilterPropIsNotNull) {
|
||||
dba.SetIndexCount(label, sec_prop_three.second, 1);
|
||||
memgraph::query::v2::AstStorage storage;
|
||||
|
||||
memgraph::query::v2::Expression *expected_primary_key;
|
||||
expected_primary_key = PROPERTY_LOOKUP("n", prim_prop_one);
|
||||
// memgraph::query::v2::Expression *expected_primary_key;
|
||||
// expected_primary_key = PROPERTY_LOOKUP("n", prim_prop_one);
|
||||
auto *query = QUERY(SINGLE_QUERY(MATCH(PATTERN(NODE("n", prim_label_name))),
|
||||
WHERE(EQ(PROPERTY_LOOKUP("n", prim_prop_one), LITERAL(1))), RETURN("n")));
|
||||
auto symbol_table = (memgraph::expr::MakeSymbolTable(query));
|
||||
@ -183,12 +184,6 @@ TYPED_TEST(TestPlanner, MatchFilterPropIsNotNull) {
|
||||
}
|
||||
|
||||
/*
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
TYPED_TEST(TestPlanner, MatchNodeReturn) {
|
||||
// Test MATCH (n) RETURN n
|
||||
AstStorage storage;
|
||||
@ -1705,5 +1700,4 @@ TYPED_TEST(TestPlanner, Foreach) {
|
||||
}
|
||||
}
|
||||
*/
|
||||
|
||||
} // namespace
|
||||
|
Loading…
Reference in New Issue
Block a user