Merge branch 'project-pineapples' into T1173-MG-benchmark-datastructures
This commit is contained in:
commit
5beb7c0966
@ -42,4 +42,19 @@ class Frame {
|
||||
utils::pmr::vector<TypedValue> elems_;
|
||||
};
|
||||
|
||||
template <typename TypedValue>
|
||||
class FrameWithValidity final : public Frame<TypedValue> {
|
||||
public:
|
||||
explicit FrameWithValidity(int64_t size) : Frame<TypedValue>(size), is_valid_(false) {}
|
||||
|
||||
FrameWithValidity(int64_t size, utils::MemoryResource *memory) : Frame<TypedValue>(size, memory), is_valid_(false) {}
|
||||
|
||||
bool IsValid() const noexcept { return is_valid_; }
|
||||
void MakeValid() noexcept { is_valid_ = true; }
|
||||
void MakeInvalid() noexcept { is_valid_ = false; }
|
||||
|
||||
private:
|
||||
bool is_valid_;
|
||||
};
|
||||
|
||||
} // namespace memgraph::expr
|
||||
|
@ -23,7 +23,8 @@ set(mg_query_v2_sources
|
||||
plan/variable_start_planner.cpp
|
||||
serialization/property_value.cpp
|
||||
bindings/typed_value.cpp
|
||||
accessors.cpp)
|
||||
accessors.cpp
|
||||
multiframe.cpp)
|
||||
|
||||
find_package(Boost REQUIRED)
|
||||
|
||||
|
@ -13,9 +13,10 @@
|
||||
|
||||
#include "query/v2/bindings/bindings.hpp"
|
||||
|
||||
#include "query/v2/bindings/typed_value.hpp"
|
||||
#include "expr/interpret/frame.hpp"
|
||||
#include "query/v2/bindings/typed_value.hpp"
|
||||
|
||||
namespace memgraph::query::v2 {
|
||||
using Frame = memgraph::expr::Frame<TypedValue>;
|
||||
} // namespace memgraph::query::v2
|
||||
using FrameWithValidity = memgraph::expr::FrameWithValidity<TypedValue>;
|
||||
} // namespace memgraph::query::v2
|
||||
|
@ -41,6 +41,7 @@
|
||||
#include "query/v2/frontend/ast/ast.hpp"
|
||||
#include "query/v2/frontend/semantic/required_privileges.hpp"
|
||||
#include "query/v2/metadata.hpp"
|
||||
#include "query/v2/multiframe.hpp"
|
||||
#include "query/v2/plan/planner.hpp"
|
||||
#include "query/v2/plan/profile.hpp"
|
||||
#include "query/v2/plan/vertex_count_cache.hpp"
|
||||
@ -655,11 +656,15 @@ struct PullPlan {
|
||||
std::optional<plan::ProfilingStatsWithTotalTime> Pull(AnyStream *stream, std::optional<int> n,
|
||||
const std::vector<Symbol> &output_symbols,
|
||||
std::map<std::string, TypedValue> *summary);
|
||||
std::optional<plan::ProfilingStatsWithTotalTime> PullMultiple(AnyStream *stream, std::optional<int> n,
|
||||
const std::vector<Symbol> &output_symbols,
|
||||
std::map<std::string, TypedValue> *summary);
|
||||
|
||||
private:
|
||||
std::shared_ptr<CachedPlan> plan_ = nullptr;
|
||||
plan::UniqueCursorPtr cursor_ = nullptr;
|
||||
expr::Frame<TypedValue> frame_;
|
||||
expr::FrameWithValidity<TypedValue> frame_;
|
||||
MultiFrame multi_frame_;
|
||||
ExecutionContext ctx_;
|
||||
std::optional<size_t> memory_limit_;
|
||||
|
||||
@ -683,6 +688,7 @@ PullPlan::PullPlan(const std::shared_ptr<CachedPlan> plan, const Parameters &par
|
||||
: plan_(plan),
|
||||
cursor_(plan->plan().MakeCursor(execution_memory)),
|
||||
frame_(plan->symbol_table().max_position(), execution_memory),
|
||||
multi_frame_(plan->symbol_table().max_position(), kNumberOfFramesInMultiframe, execution_memory),
|
||||
memory_limit_(memory_limit) {
|
||||
ctx_.db_accessor = dba;
|
||||
ctx_.symbol_table = plan->symbol_table();
|
||||
@ -699,9 +705,116 @@ PullPlan::PullPlan(const std::shared_ptr<CachedPlan> plan, const Parameters &par
|
||||
ctx_.edge_ids_alloc = &interpreter_context->edge_ids_alloc;
|
||||
}
|
||||
|
||||
std::optional<plan::ProfilingStatsWithTotalTime> PullPlan::PullMultiple(AnyStream *stream, std::optional<int> n,
|
||||
const std::vector<Symbol> &output_symbols,
|
||||
std::map<std::string, TypedValue> *summary) {
|
||||
// Set up temporary memory for a single Pull. Initial memory comes from the
|
||||
// stack. 256 KiB should fit on the stack and should be more than enough for a
|
||||
// single `Pull`.
|
||||
MG_ASSERT(!n.has_value(), "should pull all!");
|
||||
static constexpr size_t stack_size = 256UL * 1024UL;
|
||||
char stack_data[stack_size];
|
||||
utils::ResourceWithOutOfMemoryException resource_with_exception;
|
||||
utils::MonotonicBufferResource monotonic_memory(&stack_data[0], stack_size, &resource_with_exception);
|
||||
// We can throw on every query because a simple queries for deleting will use only
|
||||
// the stack allocated buffer.
|
||||
// Also, we want to throw only when the query engine requests more memory and not the storage
|
||||
// so we add the exception to the allocator.
|
||||
// TODO (mferencevic): Tune the parameters accordingly.
|
||||
utils::PoolResource pool_memory(128, 1024, &monotonic_memory);
|
||||
std::optional<utils::LimitedMemoryResource> maybe_limited_resource;
|
||||
|
||||
if (memory_limit_) {
|
||||
maybe_limited_resource.emplace(&pool_memory, *memory_limit_);
|
||||
ctx_.evaluation_context.memory = &*maybe_limited_resource;
|
||||
} else {
|
||||
ctx_.evaluation_context.memory = &pool_memory;
|
||||
}
|
||||
|
||||
// Returns true if a result was pulled.
|
||||
const auto pull_result = [&]() -> bool {
|
||||
cursor_->PullMultiple(multi_frame_, ctx_);
|
||||
return multi_frame_.HasValidFrame();
|
||||
};
|
||||
|
||||
const auto stream_values = [&output_symbols, &stream](const Frame &frame) {
|
||||
// TODO: The streamed values should also probably use the above memory.
|
||||
std::vector<TypedValue> values;
|
||||
values.reserve(output_symbols.size());
|
||||
|
||||
for (const auto &symbol : output_symbols) {
|
||||
values.emplace_back(frame[symbol]);
|
||||
}
|
||||
|
||||
stream->Result(values);
|
||||
};
|
||||
|
||||
// Get the execution time of all possible result pulls and streams.
|
||||
utils::Timer timer;
|
||||
|
||||
int i = 0;
|
||||
if (has_unsent_results_ && !output_symbols.empty()) {
|
||||
// stream unsent results from previous pull
|
||||
|
||||
auto iterator_for_valid_frame_only = multi_frame_.GetValidFramesReader();
|
||||
for (const auto &frame : iterator_for_valid_frame_only) {
|
||||
stream_values(frame);
|
||||
++i;
|
||||
}
|
||||
multi_frame_.MakeAllFramesInvalid();
|
||||
}
|
||||
|
||||
for (; !n || i < n;) {
|
||||
if (!pull_result()) {
|
||||
break;
|
||||
}
|
||||
|
||||
if (!output_symbols.empty()) {
|
||||
auto iterator_for_valid_frame_only = multi_frame_.GetValidFramesReader();
|
||||
for (const auto &frame : iterator_for_valid_frame_only) {
|
||||
stream_values(frame);
|
||||
++i;
|
||||
}
|
||||
}
|
||||
multi_frame_.MakeAllFramesInvalid();
|
||||
}
|
||||
|
||||
// If we finished because we streamed the requested n results,
|
||||
// we try to pull the next result to see if there is more.
|
||||
// If there is additional result, we leave the pulled result in the frame
|
||||
// and set the flag to true.
|
||||
has_unsent_results_ = i == n && pull_result();
|
||||
|
||||
execution_time_ += timer.Elapsed();
|
||||
|
||||
if (has_unsent_results_) {
|
||||
return std::nullopt;
|
||||
}
|
||||
summary->insert_or_assign("plan_execution_time", execution_time_.count());
|
||||
// We are finished with pulling all the data, therefore we can send any
|
||||
// metadata about the results i.e. notifications and statistics
|
||||
const bool is_any_counter_set =
|
||||
std::any_of(ctx_.execution_stats.counters.begin(), ctx_.execution_stats.counters.end(),
|
||||
[](const auto &counter) { return counter > 0; });
|
||||
if (is_any_counter_set) {
|
||||
std::map<std::string, TypedValue> stats;
|
||||
for (size_t i = 0; i < ctx_.execution_stats.counters.size(); ++i) {
|
||||
stats.emplace(ExecutionStatsKeyToString(ExecutionStats::Key(i)), ctx_.execution_stats.counters[i]);
|
||||
}
|
||||
summary->insert_or_assign("stats", std::move(stats));
|
||||
}
|
||||
cursor_->Shutdown();
|
||||
ctx_.profile_execution_time = execution_time_;
|
||||
return GetStatsWithTotalTime(ctx_);
|
||||
}
|
||||
|
||||
std::optional<plan::ProfilingStatsWithTotalTime> PullPlan::Pull(AnyStream *stream, std::optional<int> n,
|
||||
const std::vector<Symbol> &output_symbols,
|
||||
std::map<std::string, TypedValue> *summary) {
|
||||
auto should_pull_multiple = false; // TODO on the long term, we will only use PullMultiple
|
||||
if (should_pull_multiple) {
|
||||
return PullMultiple(stream, n, output_symbols, summary);
|
||||
}
|
||||
// Set up temporary memory for a single Pull. Initial memory comes from the
|
||||
// stack. 256 KiB should fit on the stack and should be more than enough for a
|
||||
// single `Pull`.
|
||||
|
127
src/query/v2/multiframe.cpp
Normal file
127
src/query/v2/multiframe.cpp
Normal file
@ -0,0 +1,127 @@
|
||||
// Copyright 2022 Memgraph Ltd.
|
||||
//
|
||||
// Use of this software is governed by the Business Source License
|
||||
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
|
||||
// License, and you may not use this file except in compliance with the Business Source License.
|
||||
//
|
||||
// As of the Change Date specified in that file, in accordance with
|
||||
// the Business Source License, use of this software will be governed
|
||||
// by the Apache License, Version 2.0, included in the file
|
||||
// licenses/APL.txt.
|
||||
|
||||
#include "query/v2/multiframe.hpp"
|
||||
|
||||
#include <algorithm>
|
||||
#include <iterator>
|
||||
|
||||
#include "query/v2/bindings/frame.hpp"
|
||||
#include "utils/pmr/vector.hpp"
|
||||
|
||||
namespace memgraph::query::v2 {
|
||||
|
||||
static_assert(std::forward_iterator<ValidFramesReader::Iterator>);
|
||||
static_assert(std::forward_iterator<ValidFramesModifier::Iterator>);
|
||||
static_assert(std::forward_iterator<ValidFramesConsumer::Iterator>);
|
||||
static_assert(std::forward_iterator<InvalidFramesPopulator::Iterator>);
|
||||
|
||||
MultiFrame::MultiFrame(int64_t size_of_frame, size_t number_of_frames, utils::MemoryResource *execution_memory)
|
||||
: frames_(utils::pmr::vector<FrameWithValidity>(
|
||||
number_of_frames, FrameWithValidity(size_of_frame, execution_memory), execution_memory)) {
|
||||
MG_ASSERT(number_of_frames > 0);
|
||||
}
|
||||
|
||||
MultiFrame::MultiFrame(const MultiFrame &other) : frames_{other.frames_} {}
|
||||
|
||||
// NOLINTNEXTLINE (bugprone-exception-escape)
|
||||
MultiFrame::MultiFrame(MultiFrame &&other) noexcept : frames_(std::move(other.frames_)) {}
|
||||
|
||||
FrameWithValidity &MultiFrame::GetFirstFrame() {
|
||||
MG_ASSERT(!frames_.empty());
|
||||
return frames_.front();
|
||||
}
|
||||
|
||||
void MultiFrame::MakeAllFramesInvalid() noexcept {
|
||||
std::for_each(frames_.begin(), frames_.end(), [](auto &frame) { frame.MakeInvalid(); });
|
||||
}
|
||||
|
||||
bool MultiFrame::HasValidFrame() const noexcept {
|
||||
return std::any_of(frames_.begin(), frames_.end(), [](auto &frame) { return frame.IsValid(); });
|
||||
}
|
||||
|
||||
// NOLINTNEXTLINE (bugprone-exception-escape)
|
||||
void MultiFrame::DefragmentValidFrames() noexcept {
|
||||
/*
|
||||
from: https://en.cppreference.com/w/cpp/algorithm/remove
|
||||
"Removing is done by shifting (by means of copy assignment (until C++11)move assignment (since C++11)) the elements
|
||||
in the range in such a way that the elements that are not to be removed appear in the beginning of the range.
|
||||
Relative order of the elements that remain is preserved and the physical size of the container is unchanged."
|
||||
*/
|
||||
|
||||
// NOLINTNEXTLINE (bugprone-unused-return-value)
|
||||
std::remove_if(frames_.begin(), frames_.end(), [](auto &frame) { return !frame.IsValid(); });
|
||||
}
|
||||
|
||||
ValidFramesReader MultiFrame::GetValidFramesReader() { return ValidFramesReader{*this}; }
|
||||
|
||||
ValidFramesModifier MultiFrame::GetValidFramesModifier() { return ValidFramesModifier{*this}; }
|
||||
|
||||
ValidFramesConsumer MultiFrame::GetValidFramesConsumer() { return ValidFramesConsumer{*this}; }
|
||||
|
||||
InvalidFramesPopulator MultiFrame::GetInvalidFramesPopulator() { return InvalidFramesPopulator{*this}; }
|
||||
|
||||
ValidFramesReader::ValidFramesReader(MultiFrame &multiframe) : multiframe_(multiframe) {
|
||||
/*
|
||||
From: https://en.cppreference.com/w/cpp/algorithm/find
|
||||
Returns an iterator to the first element in the range [first, last) that satisfies specific criteria:
|
||||
find_if searches for an element for which predicate p returns true
|
||||
Return value
|
||||
Iterator to the first element satisfying the condition or last if no such element is found.
|
||||
|
||||
-> this is what we want. We want the "after" last valid frame (weather this is vector::end or and invalid frame).
|
||||
*/
|
||||
auto it = std::find_if(multiframe.frames_.begin(), multiframe.frames_.end(),
|
||||
[](const auto &frame) { return !frame.IsValid(); });
|
||||
after_last_valid_frame_ = multiframe_.frames_.data() + std::distance(multiframe.frames_.begin(), it);
|
||||
}
|
||||
|
||||
ValidFramesReader::Iterator ValidFramesReader::begin() { return Iterator{&multiframe_.frames_[0]}; }
|
||||
ValidFramesReader::Iterator ValidFramesReader::end() { return Iterator{after_last_valid_frame_}; }
|
||||
|
||||
ValidFramesModifier::ValidFramesModifier(MultiFrame &multiframe) : multiframe_(multiframe) {}
|
||||
|
||||
ValidFramesModifier::Iterator ValidFramesModifier::begin() { return Iterator{&multiframe_.frames_[0], *this}; }
|
||||
ValidFramesModifier::Iterator ValidFramesModifier::end() {
|
||||
return Iterator{multiframe_.frames_.data() + multiframe_.frames_.size(), *this};
|
||||
}
|
||||
|
||||
ValidFramesConsumer::ValidFramesConsumer(MultiFrame &multiframe) : multiframe_(multiframe) {}
|
||||
|
||||
// NOLINTNEXTLINE (bugprone-exception-escape)
|
||||
ValidFramesConsumer::~ValidFramesConsumer() noexcept {
|
||||
// TODO Possible optimisation: only DefragmentValidFrames if one frame has been invalidated? Only if does not
|
||||
// cost too much to store it
|
||||
multiframe_.DefragmentValidFrames();
|
||||
}
|
||||
|
||||
ValidFramesConsumer::Iterator ValidFramesConsumer::begin() { return Iterator{&multiframe_.frames_[0], *this}; }
|
||||
|
||||
ValidFramesConsumer::Iterator ValidFramesConsumer::end() {
|
||||
return Iterator{multiframe_.frames_.data() + multiframe_.frames_.size(), *this};
|
||||
}
|
||||
|
||||
InvalidFramesPopulator::InvalidFramesPopulator(MultiFrame &multiframe) : multiframe_(multiframe) {}
|
||||
|
||||
InvalidFramesPopulator::Iterator InvalidFramesPopulator::begin() {
|
||||
for (auto &frame : multiframe_.frames_) {
|
||||
if (!frame.IsValid()) {
|
||||
return Iterator{&frame};
|
||||
}
|
||||
}
|
||||
return end();
|
||||
}
|
||||
|
||||
InvalidFramesPopulator::Iterator InvalidFramesPopulator::end() {
|
||||
return Iterator{multiframe_.frames_.data() + multiframe_.frames_.size()};
|
||||
}
|
||||
|
||||
} // namespace memgraph::query::v2
|
302
src/query/v2/multiframe.hpp
Normal file
302
src/query/v2/multiframe.hpp
Normal file
@ -0,0 +1,302 @@
|
||||
// Copyright 2022 Memgraph Ltd.
|
||||
//
|
||||
// Use of this software is governed by the Business Source License
|
||||
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
|
||||
// License, and you may not use this file except in compliance with the Business Source License.
|
||||
//
|
||||
// As of the Change Date specified in that file, in accordance with
|
||||
// the Business Source License, use of this software will be governed
|
||||
// by the Apache License, Version 2.0, included in the file
|
||||
// licenses/APL.txt.
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <iterator>
|
||||
|
||||
#include "query/v2/bindings/frame.hpp"
|
||||
|
||||
namespace memgraph::query::v2 {
|
||||
constexpr uint64_t kNumberOfFramesInMultiframe = 1000; // TODO have it configurable
|
||||
|
||||
class ValidFramesConsumer;
|
||||
class ValidFramesModifier;
|
||||
class ValidFramesReader;
|
||||
class InvalidFramesPopulator;
|
||||
|
||||
class MultiFrame {
|
||||
public:
|
||||
friend class ValidFramesConsumer;
|
||||
friend class ValidFramesModifier;
|
||||
friend class ValidFramesReader;
|
||||
friend class InvalidFramesPopulator;
|
||||
|
||||
MultiFrame(int64_t size_of_frame, size_t number_of_frames, utils::MemoryResource *execution_memory);
|
||||
~MultiFrame() = default;
|
||||
|
||||
MultiFrame(const MultiFrame &other);
|
||||
MultiFrame(MultiFrame &&other) noexcept;
|
||||
MultiFrame &operator=(const MultiFrame &other) = delete;
|
||||
MultiFrame &operator=(MultiFrame &&other) noexcept = delete;
|
||||
|
||||
/*
|
||||
* Returns a object on which one can iterate in a for-loop. By doing so, you will only get Frames that are in a valid
|
||||
* state in the MultiFrame.
|
||||
* Iteration goes in a deterministic order.
|
||||
* One can't modify the validity of the Frame nor its content with this implementation.
|
||||
*/
|
||||
ValidFramesReader GetValidFramesReader();
|
||||
|
||||
/*
|
||||
* Returns a object on which one can iterate in a for-loop. By doing so, you will only get Frames that are in a valid
|
||||
* state in the MultiFrame.
|
||||
* Iteration goes in a deterministic order.
|
||||
* One can't modify the validity of the Frame with this implementation. One can modify its content.
|
||||
*/
|
||||
ValidFramesModifier GetValidFramesModifier();
|
||||
|
||||
/*
|
||||
* Returns a object on which one can iterate in a for-loop. By doing so, you will only get Frames that are in a valid
|
||||
* state in the MultiFrame.
|
||||
* Iteration goes in a deterministic order.
|
||||
* One can modify the validity of the Frame with this implementation.
|
||||
* If you do not plan to modify the validity of the Frames, use GetValidFramesReader/GetValidFramesModifer instead as
|
||||
* this is faster.
|
||||
*/
|
||||
ValidFramesConsumer GetValidFramesConsumer();
|
||||
|
||||
/*
|
||||
* Returns a object on which one can iterate in a for-loop. By doing so, you will only get Frames that are in an
|
||||
* invalid state in the MultiFrame. Iteration goes in a deterministic order. One can modify the validity of
|
||||
* the Frame with this implementation.
|
||||
*/
|
||||
InvalidFramesPopulator GetInvalidFramesPopulator();
|
||||
|
||||
/**
|
||||
* Return the first Frame of the MultiFrame. This is only meant to be used in very specific cases. Please consider
|
||||
* using the iterators instead.
|
||||
* The Frame can be valid or invalid.
|
||||
*/
|
||||
FrameWithValidity &GetFirstFrame();
|
||||
|
||||
void MakeAllFramesInvalid() noexcept;
|
||||
|
||||
bool HasValidFrame() const noexcept;
|
||||
|
||||
inline utils::MemoryResource *GetMemoryResource() { return frames_[0].GetMemoryResource(); }
|
||||
|
||||
private:
|
||||
void DefragmentValidFrames() noexcept;
|
||||
|
||||
utils::pmr::vector<FrameWithValidity> frames_;
|
||||
};
|
||||
|
||||
class ValidFramesReader {
|
||||
public:
|
||||
explicit ValidFramesReader(MultiFrame &multiframe);
|
||||
|
||||
~ValidFramesReader() = default;
|
||||
ValidFramesReader(const ValidFramesReader &other) = delete;
|
||||
ValidFramesReader(ValidFramesReader &&other) noexcept = delete;
|
||||
ValidFramesReader &operator=(const ValidFramesReader &other) = delete;
|
||||
ValidFramesReader &operator=(ValidFramesReader &&other) noexcept = delete;
|
||||
|
||||
struct Iterator {
|
||||
using iterator_category = std::forward_iterator_tag;
|
||||
using difference_type = std::ptrdiff_t;
|
||||
using value_type = const Frame;
|
||||
using pointer = value_type *;
|
||||
using reference = const Frame &;
|
||||
|
||||
Iterator() = default;
|
||||
explicit Iterator(FrameWithValidity *ptr) : ptr_(ptr) {}
|
||||
|
||||
reference operator*() const { return *ptr_; }
|
||||
pointer operator->() { return ptr_; }
|
||||
|
||||
Iterator &operator++() {
|
||||
ptr_++;
|
||||
return *this;
|
||||
}
|
||||
|
||||
// NOLINTNEXTLINE(cert-dcl21-cpp)
|
||||
Iterator operator++(int) {
|
||||
auto old = *this;
|
||||
ptr_++;
|
||||
return old;
|
||||
}
|
||||
|
||||
friend bool operator==(const Iterator &lhs, const Iterator &rhs) { return lhs.ptr_ == rhs.ptr_; };
|
||||
friend bool operator!=(const Iterator &lhs, const Iterator &rhs) { return lhs.ptr_ != rhs.ptr_; };
|
||||
|
||||
private:
|
||||
FrameWithValidity *ptr_{nullptr};
|
||||
};
|
||||
|
||||
Iterator begin();
|
||||
Iterator end();
|
||||
|
||||
private:
|
||||
FrameWithValidity *after_last_valid_frame_;
|
||||
MultiFrame &multiframe_;
|
||||
};
|
||||
|
||||
class ValidFramesModifier {
|
||||
public:
|
||||
explicit ValidFramesModifier(MultiFrame &multiframe);
|
||||
|
||||
~ValidFramesModifier() = default;
|
||||
ValidFramesModifier(const ValidFramesModifier &other) = delete;
|
||||
ValidFramesModifier(ValidFramesModifier &&other) noexcept = delete;
|
||||
ValidFramesModifier &operator=(const ValidFramesModifier &other) = delete;
|
||||
ValidFramesModifier &operator=(ValidFramesModifier &&other) noexcept = delete;
|
||||
|
||||
struct Iterator {
|
||||
using iterator_category = std::forward_iterator_tag;
|
||||
using difference_type = std::ptrdiff_t;
|
||||
using value_type = Frame;
|
||||
using pointer = value_type *;
|
||||
using reference = Frame &;
|
||||
|
||||
Iterator() = default;
|
||||
Iterator(FrameWithValidity *ptr, ValidFramesModifier &iterator_wrapper)
|
||||
: ptr_(ptr), iterator_wrapper_(&iterator_wrapper) {}
|
||||
|
||||
reference operator*() const { return *ptr_; }
|
||||
pointer operator->() { return ptr_; }
|
||||
|
||||
// Prefix increment
|
||||
Iterator &operator++() {
|
||||
do {
|
||||
ptr_++;
|
||||
} while (*this != iterator_wrapper_->end() && ptr_->IsValid());
|
||||
|
||||
return *this;
|
||||
}
|
||||
|
||||
// NOLINTNEXTLINE(cert-dcl21-cpp)
|
||||
Iterator operator++(int) {
|
||||
auto old = *this;
|
||||
++*this;
|
||||
return old;
|
||||
}
|
||||
|
||||
friend bool operator==(const Iterator &lhs, const Iterator &rhs) { return lhs.ptr_ == rhs.ptr_; };
|
||||
friend bool operator!=(const Iterator &lhs, const Iterator &rhs) { return lhs.ptr_ != rhs.ptr_; };
|
||||
|
||||
private:
|
||||
FrameWithValidity *ptr_{nullptr};
|
||||
ValidFramesModifier *iterator_wrapper_{nullptr};
|
||||
};
|
||||
|
||||
Iterator begin();
|
||||
Iterator end();
|
||||
|
||||
private:
|
||||
MultiFrame &multiframe_;
|
||||
};
|
||||
|
||||
class ValidFramesConsumer {
|
||||
public:
|
||||
explicit ValidFramesConsumer(MultiFrame &multiframe);
|
||||
|
||||
~ValidFramesConsumer() noexcept;
|
||||
ValidFramesConsumer(const ValidFramesConsumer &other) = delete;
|
||||
ValidFramesConsumer(ValidFramesConsumer &&other) noexcept = delete;
|
||||
ValidFramesConsumer &operator=(const ValidFramesConsumer &other) = delete;
|
||||
ValidFramesConsumer &operator=(ValidFramesConsumer &&other) noexcept = delete;
|
||||
|
||||
struct Iterator {
|
||||
using iterator_category = std::forward_iterator_tag;
|
||||
using difference_type = std::ptrdiff_t;
|
||||
using value_type = FrameWithValidity;
|
||||
using pointer = value_type *;
|
||||
using reference = FrameWithValidity &;
|
||||
|
||||
Iterator() = default;
|
||||
Iterator(FrameWithValidity *ptr, ValidFramesConsumer &iterator_wrapper)
|
||||
: ptr_(ptr), iterator_wrapper_(&iterator_wrapper) {}
|
||||
|
||||
reference operator*() const { return *ptr_; }
|
||||
pointer operator->() { return ptr_; }
|
||||
|
||||
Iterator &operator++() {
|
||||
do {
|
||||
ptr_++;
|
||||
} while (*this != iterator_wrapper_->end() && !ptr_->IsValid());
|
||||
|
||||
return *this;
|
||||
}
|
||||
|
||||
// NOLINTNEXTLINE(cert-dcl21-cpp)
|
||||
Iterator operator++(int) {
|
||||
auto old = *this;
|
||||
++*this;
|
||||
return old;
|
||||
}
|
||||
|
||||
friend bool operator==(const Iterator &lhs, const Iterator &rhs) { return lhs.ptr_ == rhs.ptr_; };
|
||||
friend bool operator!=(const Iterator &lhs, const Iterator &rhs) { return lhs.ptr_ != rhs.ptr_; };
|
||||
|
||||
private:
|
||||
FrameWithValidity *ptr_{nullptr};
|
||||
ValidFramesConsumer *iterator_wrapper_{nullptr};
|
||||
};
|
||||
|
||||
Iterator begin();
|
||||
Iterator end();
|
||||
|
||||
private:
|
||||
MultiFrame &multiframe_;
|
||||
};
|
||||
|
||||
class InvalidFramesPopulator {
|
||||
public:
|
||||
explicit InvalidFramesPopulator(MultiFrame &multiframe);
|
||||
~InvalidFramesPopulator() = default;
|
||||
|
||||
InvalidFramesPopulator(const InvalidFramesPopulator &other) = delete;
|
||||
InvalidFramesPopulator(InvalidFramesPopulator &&other) noexcept = delete;
|
||||
InvalidFramesPopulator &operator=(const InvalidFramesPopulator &other) = delete;
|
||||
InvalidFramesPopulator &operator=(InvalidFramesPopulator &&other) noexcept = delete;
|
||||
|
||||
struct Iterator {
|
||||
using iterator_category = std::forward_iterator_tag;
|
||||
using difference_type = std::ptrdiff_t;
|
||||
using value_type = FrameWithValidity;
|
||||
using pointer = value_type *;
|
||||
using reference = FrameWithValidity &;
|
||||
|
||||
Iterator() = default;
|
||||
explicit Iterator(FrameWithValidity *ptr) : ptr_(ptr) {}
|
||||
|
||||
reference operator*() const { return *ptr_; }
|
||||
pointer operator->() { return ptr_; }
|
||||
|
||||
Iterator &operator++() {
|
||||
ptr_->MakeValid();
|
||||
ptr_++;
|
||||
return *this;
|
||||
}
|
||||
|
||||
// NOLINTNEXTLINE(cert-dcl21-cpp)
|
||||
Iterator operator++(int) {
|
||||
auto old = *this;
|
||||
++ptr_;
|
||||
return old;
|
||||
}
|
||||
|
||||
friend bool operator==(const Iterator &lhs, const Iterator &rhs) { return lhs.ptr_ == rhs.ptr_; };
|
||||
friend bool operator!=(const Iterator &lhs, const Iterator &rhs) { return lhs.ptr_ != rhs.ptr_; };
|
||||
|
||||
private:
|
||||
FrameWithValidity *ptr_{nullptr};
|
||||
};
|
||||
|
||||
Iterator begin();
|
||||
Iterator end();
|
||||
|
||||
private:
|
||||
MultiFrame &multiframe_;
|
||||
};
|
||||
|
||||
} // namespace memgraph::query::v2
|
@ -264,6 +264,16 @@ bool Once::OnceCursor::Pull(Frame &, ExecutionContext &context) {
|
||||
return false;
|
||||
}
|
||||
|
||||
void Once::OnceCursor::PullMultiple(MultiFrame &multi_frame, ExecutionContext &context) {
|
||||
SCOPED_PROFILE_OP("OnceMF");
|
||||
|
||||
if (!did_pull_) {
|
||||
auto &first_frame = multi_frame.GetFirstFrame();
|
||||
first_frame.MakeValid();
|
||||
did_pull_ = true;
|
||||
}
|
||||
}
|
||||
|
||||
UniqueCursorPtr Once::MakeCursor(utils::MemoryResource *mem) const {
|
||||
EventCounter::IncrementCounter(EventCounter::OnceOperator);
|
||||
|
||||
@ -748,6 +758,23 @@ bool Produce::ProduceCursor::Pull(Frame &frame, ExecutionContext &context) {
|
||||
return false;
|
||||
}
|
||||
|
||||
void Produce::ProduceCursor::PullMultiple(MultiFrame &multi_frame, ExecutionContext &context) {
|
||||
SCOPED_PROFILE_OP("ProduceMF");
|
||||
|
||||
input_cursor_->PullMultiple(multi_frame, context);
|
||||
|
||||
auto iterator_for_valid_frame_only = multi_frame.GetValidFramesModifier();
|
||||
for (auto &frame : iterator_for_valid_frame_only) {
|
||||
// Produce should always yield the latest results.
|
||||
ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, context.request_router,
|
||||
storage::v3::View::NEW);
|
||||
|
||||
for (auto *named_expr : self_.named_expressions_) {
|
||||
named_expr->Accept(evaluator);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
void Produce::ProduceCursor::Shutdown() { input_cursor_->Shutdown(); }
|
||||
|
||||
void Produce::ProduceCursor::Reset() { input_cursor_->Reset(); }
|
||||
|
@ -28,6 +28,7 @@
|
||||
#include "query/v2/bindings/typed_value.hpp"
|
||||
#include "query/v2/bindings/frame.hpp"
|
||||
#include "query/v2/bindings/symbol_table.hpp"
|
||||
#include "query/v2/multiframe.hpp"
|
||||
#include "storage/v3/id_types.hpp"
|
||||
#include "utils/bound.hpp"
|
||||
#include "utils/fnv.hpp"
|
||||
@ -71,6 +72,8 @@ class Cursor {
|
||||
/// @throws QueryRuntimeException if something went wrong with execution
|
||||
virtual bool Pull(Frame &, ExecutionContext &) = 0;
|
||||
|
||||
virtual void PullMultiple(MultiFrame &, ExecutionContext &) { LOG_FATAL("PullMultipleIsNotImplemented"); }
|
||||
|
||||
/// Resets the Cursor to its initial state.
|
||||
virtual void Reset() = 0;
|
||||
|
||||
@ -332,6 +335,7 @@ and false on every following Pull.")
|
||||
class OnceCursor : public Cursor {
|
||||
public:
|
||||
OnceCursor() {}
|
||||
void PullMultiple(MultiFrame &, ExecutionContext &) override;
|
||||
bool Pull(Frame &, ExecutionContext &) override;
|
||||
void Shutdown() override;
|
||||
void Reset() override;
|
||||
@ -1207,6 +1211,7 @@ RETURN clause) the Produce's pull succeeds exactly once.")
|
||||
public:
|
||||
ProduceCursor(const Produce &, utils::MemoryResource *);
|
||||
bool Pull(Frame &, ExecutionContext &) override;
|
||||
void PullMultiple(MultiFrame &, ExecutionContext &) override;
|
||||
void Shutdown() override;
|
||||
void Reset() override;
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user