Impl of Multiframe and iterators
This commit is contained in:
parent
c647134916
commit
b244c4d6ee
src
expr/interpret
query/v2
@ -42,4 +42,19 @@ class Frame {
|
||||
utils::pmr::vector<TypedValue> elems_;
|
||||
};
|
||||
|
||||
template <typename TypedValue>
|
||||
class FrameWithValidity final : public Frame<TypedValue> {
|
||||
public:
|
||||
explicit FrameWithValidity(int64_t size) : Frame<TypedValue>(size), is_valid_(false) {}
|
||||
|
||||
FrameWithValidity(int64_t size, utils::MemoryResource *memory) : Frame<TypedValue>(size, memory), is_valid_(false) {}
|
||||
|
||||
bool IsValid() const { return is_valid_; }
|
||||
void MakeValid() { is_valid_ = true; }
|
||||
void MakeInvalid() { is_valid_ = false; }
|
||||
|
||||
private:
|
||||
bool is_valid_;
|
||||
};
|
||||
|
||||
} // namespace memgraph::expr
|
||||
|
@ -24,7 +24,8 @@ set(mg_query_v2_sources
|
||||
plan/variable_start_planner.cpp
|
||||
serialization/property_value.cpp
|
||||
bindings/typed_value.cpp
|
||||
accessors.cpp)
|
||||
accessors.cpp
|
||||
multiframe.cpp)
|
||||
|
||||
find_package(Boost REQUIRED)
|
||||
|
||||
|
@ -13,9 +13,10 @@
|
||||
|
||||
#include "query/v2/bindings/bindings.hpp"
|
||||
|
||||
#include "query/v2/bindings/typed_value.hpp"
|
||||
#include "expr/interpret/frame.hpp"
|
||||
#include "query/v2/bindings/typed_value.hpp"
|
||||
|
||||
namespace memgraph::query::v2 {
|
||||
using Frame = memgraph::expr::Frame<TypedValue>;
|
||||
} // namespace memgraph::query::v2
|
||||
using FrameWithValidity = memgraph::expr::FrameWithValidity<TypedValue>;
|
||||
} // namespace memgraph::query::v2
|
||||
|
@ -41,6 +41,7 @@
|
||||
#include "query/v2/frontend/ast/ast.hpp"
|
||||
#include "query/v2/frontend/semantic/required_privileges.hpp"
|
||||
#include "query/v2/metadata.hpp"
|
||||
#include "query/v2/multiframe.hpp"
|
||||
#include "query/v2/plan/planner.hpp"
|
||||
#include "query/v2/plan/profile.hpp"
|
||||
#include "query/v2/plan/vertex_count_cache.hpp"
|
||||
@ -655,11 +656,15 @@ struct PullPlan {
|
||||
std::optional<plan::ProfilingStatsWithTotalTime> Pull(AnyStream *stream, std::optional<int> n,
|
||||
const std::vector<Symbol> &output_symbols,
|
||||
std::map<std::string, TypedValue> *summary);
|
||||
std::optional<plan::ProfilingStatsWithTotalTime> PullMultiple(AnyStream *stream, std::optional<int> n,
|
||||
const std::vector<Symbol> &output_symbols,
|
||||
std::map<std::string, TypedValue> *summary);
|
||||
|
||||
private:
|
||||
std::shared_ptr<CachedPlan> plan_ = nullptr;
|
||||
plan::UniqueCursorPtr cursor_ = nullptr;
|
||||
expr::Frame<TypedValue> frame_;
|
||||
expr::FrameWithValidity<TypedValue> frame_;
|
||||
MultiFrame multi_frame_;
|
||||
ExecutionContext ctx_;
|
||||
std::optional<size_t> memory_limit_;
|
||||
|
||||
@ -683,6 +688,7 @@ PullPlan::PullPlan(const std::shared_ptr<CachedPlan> plan, const Parameters &par
|
||||
: plan_(plan),
|
||||
cursor_(plan->plan().MakeCursor(execution_memory)),
|
||||
frame_(plan->symbol_table().max_position(), execution_memory),
|
||||
multi_frame_(frame_, kNumberOfFramesInMultiframe, execution_memory),
|
||||
memory_limit_(memory_limit) {
|
||||
ctx_.db_accessor = dba;
|
||||
ctx_.symbol_table = plan->symbol_table();
|
||||
@ -699,9 +705,116 @@ PullPlan::PullPlan(const std::shared_ptr<CachedPlan> plan, const Parameters &par
|
||||
ctx_.edge_ids_alloc = &interpreter_context->edge_ids_alloc;
|
||||
}
|
||||
|
||||
std::optional<plan::ProfilingStatsWithTotalTime> PullPlan::PullMultiple(AnyStream *stream, std::optional<int> n,
|
||||
const std::vector<Symbol> &output_symbols,
|
||||
std::map<std::string, TypedValue> *summary) {
|
||||
// Set up temporary memory for a single Pull. Initial memory comes from the
|
||||
// stack. 256 KiB should fit on the stack and should be more than enough for a
|
||||
// single `Pull`.
|
||||
MG_ASSERT(!n.has_value(), "should pull all!");
|
||||
static constexpr size_t stack_size = 256UL * 1024UL;
|
||||
char stack_data[stack_size];
|
||||
utils::ResourceWithOutOfMemoryException resource_with_exception;
|
||||
utils::MonotonicBufferResource monotonic_memory(&stack_data[0], stack_size, &resource_with_exception);
|
||||
// We can throw on every query because a simple queries for deleting will use only
|
||||
// the stack allocated buffer.
|
||||
// Also, we want to throw only when the query engine requests more memory and not the storage
|
||||
// so we add the exception to the allocator.
|
||||
// TODO (mferencevic): Tune the parameters accordingly.
|
||||
utils::PoolResource pool_memory(128, 1024, &monotonic_memory);
|
||||
std::optional<utils::LimitedMemoryResource> maybe_limited_resource;
|
||||
|
||||
if (memory_limit_) {
|
||||
maybe_limited_resource.emplace(&pool_memory, *memory_limit_);
|
||||
ctx_.evaluation_context.memory = &*maybe_limited_resource;
|
||||
} else {
|
||||
ctx_.evaluation_context.memory = &pool_memory;
|
||||
}
|
||||
|
||||
// Returns true if a result was pulled.
|
||||
const auto pull_result = [&]() -> bool {
|
||||
cursor_->PullMultiple(multi_frame_, ctx_);
|
||||
return multi_frame_.HasValidFrame();
|
||||
};
|
||||
|
||||
const auto stream_values = [&output_symbols, &stream](Frame &frame) {
|
||||
// TODO: The streamed values should also probably use the above memory.
|
||||
std::vector<TypedValue> values;
|
||||
values.reserve(output_symbols.size());
|
||||
|
||||
for (const auto &symbol : output_symbols) {
|
||||
values.emplace_back(frame[symbol]);
|
||||
}
|
||||
|
||||
stream->Result(values);
|
||||
};
|
||||
|
||||
// Get the execution time of all possible result pulls and streams.
|
||||
utils::Timer timer;
|
||||
|
||||
int i = 0;
|
||||
if (has_unsent_results_ && !output_symbols.empty()) {
|
||||
// stream unsent results from previous pull
|
||||
|
||||
auto iterator_for_valid_frame_only = multi_frame_.GetItOnConstValidFrames();
|
||||
for (auto &frame : iterator_for_valid_frame_only) {
|
||||
stream_values(frame);
|
||||
++i;
|
||||
}
|
||||
multi_frame_.ResetAllFramesInvalid();
|
||||
}
|
||||
|
||||
for (; !n || i < n;) {
|
||||
if (!pull_result()) {
|
||||
break;
|
||||
}
|
||||
|
||||
if (!output_symbols.empty()) {
|
||||
auto iterator_for_valid_frame_only = multi_frame_.GetItOnConstValidFrames();
|
||||
for (auto &frame : iterator_for_valid_frame_only) {
|
||||
stream_values(frame);
|
||||
++i;
|
||||
}
|
||||
}
|
||||
multi_frame_.ResetAllFramesInvalid();
|
||||
}
|
||||
|
||||
// If we finished because we streamed the requested n results,
|
||||
// we try to pull the next result to see if there is more.
|
||||
// If there is additional result, we leave the pulled result in the frame
|
||||
// and set the flag to true.
|
||||
has_unsent_results_ = i == n && pull_result();
|
||||
|
||||
execution_time_ += timer.Elapsed();
|
||||
|
||||
if (has_unsent_results_) {
|
||||
return std::nullopt;
|
||||
}
|
||||
summary->insert_or_assign("plan_execution_time", execution_time_.count());
|
||||
// We are finished with pulling all the data, therefore we can send any
|
||||
// metadata about the results i.e. notifications and statistics
|
||||
const bool is_any_counter_set =
|
||||
std::any_of(ctx_.execution_stats.counters.begin(), ctx_.execution_stats.counters.end(),
|
||||
[](const auto &counter) { return counter > 0; });
|
||||
if (is_any_counter_set) {
|
||||
std::map<std::string, TypedValue> stats;
|
||||
for (size_t i = 0; i < ctx_.execution_stats.counters.size(); ++i) {
|
||||
stats.emplace(ExecutionStatsKeyToString(ExecutionStats::Key(i)), ctx_.execution_stats.counters[i]);
|
||||
}
|
||||
summary->insert_or_assign("stats", std::move(stats));
|
||||
}
|
||||
cursor_->Shutdown();
|
||||
ctx_.profile_execution_time = execution_time_;
|
||||
return GetStatsWithTotalTime(ctx_);
|
||||
}
|
||||
|
||||
std::optional<plan::ProfilingStatsWithTotalTime> PullPlan::Pull(AnyStream *stream, std::optional<int> n,
|
||||
const std::vector<Symbol> &output_symbols,
|
||||
std::map<std::string, TypedValue> *summary) {
|
||||
auto should_pull_multiple = false; // #NoCommit
|
||||
if (should_pull_multiple) {
|
||||
return PullMultiple(stream, n, output_symbols, summary);
|
||||
}
|
||||
// Set up temporary memory for a single Pull. Initial memory comes from the
|
||||
// stack. 256 KiB should fit on the stack and should be more than enough for a
|
||||
// single `Pull`.
|
||||
|
128
src/query/v2/multiframe.cpp
Normal file
128
src/query/v2/multiframe.cpp
Normal file
@ -0,0 +1,128 @@
|
||||
// Copyright 2022 Memgraph Ltd.
|
||||
//
|
||||
// Use of this software is governed by the Business Source License
|
||||
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
|
||||
// License, and you may not use this file except in compliance with the Business Source License.
|
||||
//
|
||||
// As of the Change Date specified in that file, in accordance with
|
||||
// the Business Source License, use of this software will be governed
|
||||
// by the Apache License, Version 2.0, included in the file
|
||||
// licenses/APL.txt.
|
||||
|
||||
#include "query/v2/multiframe.hpp"
|
||||
|
||||
#include "query/v2/bindings/frame.hpp"
|
||||
#include "utils/pmr/vector.hpp"
|
||||
|
||||
namespace memgraph::query::v2 {
|
||||
|
||||
MultiFrame::MultiFrame(FrameWithValidity default_frame, size_t number_of_frames,
|
||||
utils::MemoryResource *execution_memory)
|
||||
: default_frame_(default_frame),
|
||||
frames_(utils::pmr::vector<FrameWithValidity>(number_of_frames, default_frame, execution_memory)) {
|
||||
MG_ASSERT(number_of_frames > 0);
|
||||
MG_ASSERT(!default_frame.IsValid());
|
||||
}
|
||||
|
||||
MultiFrame::~MultiFrame() = default;
|
||||
|
||||
MultiFrame::MultiFrame(const MultiFrame &other) : default_frame_(other.default_frame_) {
|
||||
/*
|
||||
#NoCommit maybe not needed
|
||||
Do we just copy all frames or do we make distinctions between valid and not valid frames? Does it make any
|
||||
difference?
|
||||
*/
|
||||
frames_.reserve(other.frames_.size());
|
||||
std::transform(other.frames_.begin(), other.frames_.end(), std::back_inserter(frames_),
|
||||
[&default_frame = default_frame_](const auto &other_frame) {
|
||||
if (other_frame.IsValid()) {
|
||||
return other_frame;
|
||||
} else {
|
||||
return default_frame;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
MultiFrame::MultiFrame(MultiFrame &&other) noexcept : default_frame_(std::move(other.default_frame_)) {
|
||||
/*
|
||||
#NoCommit maybe not needed
|
||||
Do we just copy all frames or do we make distinctions between valid and not valid frames? Does it make any
|
||||
difference?
|
||||
*/
|
||||
frames_.reserve(other.frames_.size());
|
||||
std::transform(make_move_iterator(other.frames_.begin()), make_move_iterator(other.frames_.end()),
|
||||
std::back_inserter(frames_), [&default_frame = default_frame_](const auto &other_frame) {
|
||||
if (other_frame.IsValid()) {
|
||||
return other_frame;
|
||||
} else {
|
||||
return default_frame;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
void MultiFrame::ResetAllFramesInvalid() noexcept {
|
||||
std::for_each(frames_.begin(), frames_.end(), [](auto &frame) { frame.MakeInvalid(); });
|
||||
}
|
||||
|
||||
bool MultiFrame::HasValidFrame() const noexcept {
|
||||
return std::any_of(frames_.begin(), frames_.end(), [](auto &frame) { return frame.IsValid(); });
|
||||
}
|
||||
|
||||
void MultiFrame::DefragmentValidFrames() noexcept {
|
||||
/*
|
||||
from: https://en.cppreference.com/w/cpp/algorithm/remove
|
||||
"Removing is done by shifting (by means of copy assignment (until C++11)move assignment (since C++11)) the elements
|
||||
in the range in such a way that the elements that are not to be removed appear in the beginning of the range.
|
||||
Relative order of the elements that remain is preserved and the physical size of the container is unchanged."
|
||||
*/
|
||||
std::remove_if(frames_.begin(), frames_.end(), [](auto &frame) { return !frame.IsValid(); });
|
||||
}
|
||||
|
||||
ItOnConstValidFrames MultiFrame::GetItOnConstValidFrames() { return ItOnConstValidFrames(*this); }
|
||||
|
||||
ItOnNonConstValidFrames MultiFrame::GetItOnNonConstValidFrames() { return ItOnNonConstValidFrames(*this); }
|
||||
|
||||
ItOnNonConstInvalidFrames MultiFrame::GetItOnNonConstInvalidFrames() { return ItOnNonConstInvalidFrames(*this); }
|
||||
|
||||
ItOnConstValidFrames::ItOnConstValidFrames(MultiFrame &multiframe) : multiframe_(multiframe) {}
|
||||
|
||||
ItOnConstValidFrames::~ItOnConstValidFrames() = default;
|
||||
|
||||
ItOnConstValidFrames::Iterator ItOnConstValidFrames::begin() { return Iterator(&multiframe_.frames_[0], *this); }
|
||||
ItOnConstValidFrames::Iterator ItOnConstValidFrames::end() {
|
||||
return Iterator(&multiframe_.frames_[multiframe_.frames_.size()], *this);
|
||||
}
|
||||
|
||||
ItOnNonConstValidFrames::ItOnNonConstValidFrames(MultiFrame &multiframe) : multiframe_(multiframe) {}
|
||||
|
||||
ItOnNonConstValidFrames::~ItOnNonConstValidFrames() {
|
||||
// #NoCommit possible optimisation: only DefragmentValidFrames if one frame has been invalidated? Only if does not
|
||||
// cost too much to store it
|
||||
multiframe_.DefragmentValidFrames();
|
||||
}
|
||||
|
||||
ItOnNonConstValidFrames::Iterator ItOnNonConstValidFrames::begin() { return Iterator(&multiframe_.frames_[0], *this); }
|
||||
|
||||
ItOnNonConstValidFrames::Iterator ItOnNonConstValidFrames::end() {
|
||||
return Iterator(&multiframe_.frames_[multiframe_.frames_.size()], *this);
|
||||
}
|
||||
|
||||
ItOnNonConstInvalidFrames::ItOnNonConstInvalidFrames(MultiFrame &multiframe) : multiframe_(multiframe) {}
|
||||
|
||||
ItOnNonConstInvalidFrames::~ItOnNonConstInvalidFrames() = default;
|
||||
|
||||
ItOnNonConstInvalidFrames::Iterator ItOnNonConstInvalidFrames::begin() {
|
||||
for (auto idx = 0UL; idx < multiframe_.frames_.size(); ++idx) {
|
||||
if (!multiframe_.frames_[idx].IsValid()) {
|
||||
return Iterator(&multiframe_.frames_[idx]);
|
||||
}
|
||||
}
|
||||
|
||||
return end();
|
||||
}
|
||||
|
||||
ItOnNonConstInvalidFrames::Iterator ItOnNonConstInvalidFrames::end() {
|
||||
return Iterator(&multiframe_.frames_[multiframe_.frames_.size()]);
|
||||
}
|
||||
|
||||
} // namespace memgraph::query::v2
|
218
src/query/v2/multiframe.hpp
Normal file
218
src/query/v2/multiframe.hpp
Normal file
@ -0,0 +1,218 @@
|
||||
// Copyright 2022 Memgraph Ltd.
|
||||
//
|
||||
// Use of this software is governed by the Business Source License
|
||||
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
|
||||
// License, and you may not use this file except in compliance with the Business Source License.
|
||||
//
|
||||
// As of the Change Date specified in that file, in accordance with
|
||||
// the Business Source License, use of this software will be governed
|
||||
// by the Apache License, Version 2.0, included in the file
|
||||
// licenses/APL.txt.
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <iterator>
|
||||
|
||||
#include "query/v2/bindings/frame.hpp"
|
||||
|
||||
namespace memgraph::query::v2 {
|
||||
constexpr unsigned long kNumberOfFramesInMultiframe = 1000; // #NoCommit have it configurable
|
||||
|
||||
class ItOnConstValidFrames;
|
||||
class ItOnNonConstValidFrames;
|
||||
class ItOnNonConstInvalidFrames;
|
||||
|
||||
class MultiFrame {
|
||||
public:
|
||||
friend class ItOnConstValidFrames;
|
||||
friend class ItOnNonConstValidFrames;
|
||||
friend class ItOnNonConstInvalidFrames;
|
||||
|
||||
MultiFrame(FrameWithValidity default_frame, size_t number_of_frames, utils::MemoryResource *execution_memory);
|
||||
~MultiFrame();
|
||||
|
||||
MultiFrame(const MultiFrame &other); // copy constructor
|
||||
MultiFrame(MultiFrame &&other) noexcept; // move constructor
|
||||
MultiFrame &operator=(const MultiFrame &other) = delete;
|
||||
MultiFrame &operator=(MultiFrame &&other) noexcept = delete;
|
||||
|
||||
/*!
|
||||
Returns a object on which one can iterate in a for-loop. By doing so, you will only get frames that are in a valid
|
||||
state in the multiframe.
|
||||
Iteration goes in a deterministic order.
|
||||
One can't modify the validity of the frame with this implementation.
|
||||
*/
|
||||
ItOnConstValidFrames GetItOnConstValidFrames();
|
||||
|
||||
/*!
|
||||
Returns a object on which one can iterate in a for-loop. By doing so, you will only get frames that are in a valid
|
||||
state in the multiframe.
|
||||
Iteration goes in a deterministic order.
|
||||
One can modify the validity of the frame with this implementation.
|
||||
If you do not plan to modify the validity of the frames, use GetItOnConstValidFrames instead as this is faster.
|
||||
*/
|
||||
ItOnNonConstValidFrames GetItOnNonConstValidFrames();
|
||||
|
||||
/*!
|
||||
Returns a object on which one can iterate in a for-loop. By doing so, you will only get frames that are in an invalid
|
||||
state in the multiframe.
|
||||
Iteration goes in a deterministic order.
|
||||
One can modify the validity of the frame with this implementation.
|
||||
*/
|
||||
ItOnNonConstInvalidFrames GetItOnNonConstInvalidFrames();
|
||||
|
||||
void ResetAllFramesInvalid() noexcept;
|
||||
|
||||
bool HasValidFrame() const noexcept;
|
||||
|
||||
inline utils::MemoryResource *GetMemoryResource() { return frames_[0].GetMemoryResource(); }
|
||||
|
||||
private:
|
||||
void DefragmentValidFrames() noexcept;
|
||||
|
||||
FrameWithValidity default_frame_;
|
||||
utils::pmr::vector<FrameWithValidity> frames_ =
|
||||
utils::pmr::vector<FrameWithValidity>(0, FrameWithValidity{1}, utils::NewDeleteResource());
|
||||
};
|
||||
|
||||
class ItOnConstValidFrames {
|
||||
public:
|
||||
ItOnConstValidFrames(MultiFrame &multiframe);
|
||||
|
||||
~ItOnConstValidFrames();
|
||||
ItOnConstValidFrames(const ItOnConstValidFrames &other) = delete; // copy constructor
|
||||
ItOnConstValidFrames(ItOnConstValidFrames &&other) noexcept = delete; // move constructor
|
||||
ItOnConstValidFrames &operator=(const ItOnConstValidFrames &other) = delete; // copy assignment
|
||||
ItOnConstValidFrames &operator=(ItOnConstValidFrames &&other) noexcept = delete; // move assignment
|
||||
|
||||
struct Iterator {
|
||||
using iterator_category = std::forward_iterator_tag;
|
||||
using difference_type = std::ptrdiff_t;
|
||||
using value_type = Frame;
|
||||
using pointer = value_type *;
|
||||
using reference = Frame &;
|
||||
using internal_ptr = FrameWithValidity *;
|
||||
|
||||
Iterator(internal_ptr ptr, ItOnConstValidFrames &iterator_wrapper)
|
||||
: ptr_(ptr), iterator_wrapper_(iterator_wrapper) {}
|
||||
|
||||
reference operator*() const { return *ptr_; }
|
||||
pointer operator->() { return ptr_; }
|
||||
|
||||
// Prefix increment
|
||||
Iterator &operator++() {
|
||||
do {
|
||||
ptr_++;
|
||||
} while (!this->ptr_->IsValid() && *this != iterator_wrapper_.end());
|
||||
|
||||
return *this;
|
||||
}
|
||||
|
||||
friend bool operator==(const Iterator &a, const Iterator &b) { return a.ptr_ == b.ptr_; };
|
||||
friend bool operator!=(const Iterator &a, const Iterator &b) { return a.ptr_ != b.ptr_; };
|
||||
|
||||
private:
|
||||
internal_ptr ptr_;
|
||||
ItOnConstValidFrames &iterator_wrapper_;
|
||||
};
|
||||
|
||||
Iterator begin();
|
||||
Iterator end();
|
||||
|
||||
private:
|
||||
MultiFrame &multiframe_;
|
||||
};
|
||||
|
||||
class ItOnNonConstValidFrames {
|
||||
public:
|
||||
ItOnNonConstValidFrames(MultiFrame &multiframe);
|
||||
|
||||
~ItOnNonConstValidFrames();
|
||||
ItOnNonConstValidFrames(const ItOnNonConstValidFrames &other) = delete; // copy constructor
|
||||
ItOnNonConstValidFrames(ItOnNonConstValidFrames &&other) noexcept = delete; // move constructor
|
||||
ItOnNonConstValidFrames &operator=(const ItOnNonConstValidFrames &other) = delete; // copy assignment
|
||||
ItOnNonConstValidFrames &operator=(ItOnNonConstValidFrames &&other) noexcept = delete; // move assignment
|
||||
|
||||
struct Iterator {
|
||||
using iterator_category = std::forward_iterator_tag;
|
||||
using difference_type = std::ptrdiff_t;
|
||||
using value_type = FrameWithValidity;
|
||||
using pointer = value_type *;
|
||||
using reference = FrameWithValidity &;
|
||||
using internal_ptr = FrameWithValidity *;
|
||||
|
||||
Iterator(internal_ptr ptr, ItOnNonConstValidFrames &iterator_wrapper)
|
||||
: ptr_(ptr), iterator_wrapper_(iterator_wrapper) {}
|
||||
|
||||
reference operator*() const { return *ptr_; }
|
||||
pointer operator->() { return ptr_; }
|
||||
|
||||
// Prefix increment
|
||||
Iterator &operator++() {
|
||||
do {
|
||||
ptr_++;
|
||||
} while (!this->ptr_->IsValid() && *this != iterator_wrapper_.end());
|
||||
|
||||
return *this;
|
||||
}
|
||||
|
||||
friend bool operator==(const Iterator &a, const Iterator &b) { return a.ptr_ == b.ptr_; };
|
||||
friend bool operator!=(const Iterator &a, const Iterator &b) { return a.ptr_ != b.ptr_; };
|
||||
|
||||
private:
|
||||
internal_ptr ptr_;
|
||||
ItOnNonConstValidFrames &iterator_wrapper_;
|
||||
};
|
||||
|
||||
Iterator begin();
|
||||
Iterator end();
|
||||
|
||||
private:
|
||||
MultiFrame &multiframe_;
|
||||
};
|
||||
|
||||
class ItOnNonConstInvalidFrames {
|
||||
public:
|
||||
ItOnNonConstInvalidFrames(MultiFrame &multiframe);
|
||||
~ItOnNonConstInvalidFrames();
|
||||
|
||||
ItOnNonConstInvalidFrames(const ItOnNonConstInvalidFrames &other) = delete; // copy constructor
|
||||
ItOnNonConstInvalidFrames(ItOnNonConstInvalidFrames &&other) noexcept = delete; // move constructor
|
||||
ItOnNonConstInvalidFrames &operator=(const ItOnNonConstInvalidFrames &other) = delete; // copy assignment
|
||||
ItOnNonConstInvalidFrames &operator=(ItOnNonConstInvalidFrames &&other) noexcept = delete; // move assignment
|
||||
|
||||
struct Iterator {
|
||||
using iterator_category = std::forward_iterator_tag;
|
||||
using difference_type = std::ptrdiff_t;
|
||||
using value_type = FrameWithValidity;
|
||||
using pointer = value_type *;
|
||||
using reference = FrameWithValidity &;
|
||||
using internal_ptr = FrameWithValidity *;
|
||||
|
||||
Iterator(internal_ptr ptr) : ptr_(ptr) {}
|
||||
|
||||
reference operator*() const { return *ptr_; }
|
||||
pointer operator->() { return ptr_; }
|
||||
|
||||
// Prefix increment
|
||||
Iterator &operator++() {
|
||||
ptr_->MakeValid();
|
||||
ptr_++;
|
||||
return *this;
|
||||
}
|
||||
|
||||
friend bool operator==(const Iterator &a, const Iterator &b) { return a.ptr_ == b.ptr_; };
|
||||
friend bool operator!=(const Iterator &a, const Iterator &b) { return a.ptr_ != b.ptr_; };
|
||||
|
||||
private:
|
||||
internal_ptr ptr_;
|
||||
};
|
||||
|
||||
Iterator begin();
|
||||
Iterator end();
|
||||
|
||||
private:
|
||||
MultiFrame &multiframe_;
|
||||
};
|
||||
|
||||
} // namespace memgraph::query::v2
|
@ -257,13 +257,30 @@ class DistributedCreateNodeCursor : public Cursor {
|
||||
bool Once::OnceCursor::Pull(Frame &, ExecutionContext &context) {
|
||||
SCOPED_PROFILE_OP("Once");
|
||||
|
||||
if (!did_pull_) {
|
||||
did_pull_ = true;
|
||||
if (pull_count_ < 1) {
|
||||
pull_count_++;
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
void Once::OnceCursor::PullMultiple(MultiFrame &multi_frame, ExecutionContext &context) {
|
||||
SCOPED_PROFILE_OP("OnceMF");
|
||||
|
||||
auto iterator_for_valid_frame_only = multi_frame.GetItOnNonConstInvalidFrames();
|
||||
auto first_it = iterator_for_valid_frame_only.begin();
|
||||
MG_ASSERT(first_it != iterator_for_valid_frame_only.end());
|
||||
if (pull_count_ < 1) {
|
||||
auto *memory_resource = multi_frame.GetMemoryResource();
|
||||
auto &frame = *first_it;
|
||||
frame.MakeValid();
|
||||
for (auto &value : frame.elems()) {
|
||||
value = TypedValue{memory_resource};
|
||||
}
|
||||
pull_count_++;
|
||||
}
|
||||
}
|
||||
|
||||
UniqueCursorPtr Once::MakeCursor(utils::MemoryResource *mem) const {
|
||||
EventCounter::IncrementCounter(EventCounter::OnceOperator);
|
||||
|
||||
@ -274,7 +291,7 @@ WITHOUT_SINGLE_INPUT(Once);
|
||||
|
||||
void Once::OnceCursor::Shutdown() {}
|
||||
|
||||
void Once::OnceCursor::Reset() { did_pull_ = false; }
|
||||
void Once::OnceCursor::Reset() { pull_count_ = 0; }
|
||||
|
||||
CreateNode::CreateNode(const std::shared_ptr<LogicalOperator> &input, const NodeCreationInfo &node_info)
|
||||
: input_(input ? input : std::make_shared<Once>()), node_info_(node_info) {}
|
||||
@ -766,6 +783,23 @@ bool Produce::ProduceCursor::Pull(Frame &frame, ExecutionContext &context) {
|
||||
return false;
|
||||
}
|
||||
|
||||
void Produce::ProduceCursor::PullMultiple(MultiFrame &multi_frame, ExecutionContext &context) {
|
||||
SCOPED_PROFILE_OP("ProduceMF");
|
||||
|
||||
input_cursor_->PullMultiple(multi_frame, context);
|
||||
|
||||
auto iterator_for_valid_frame_only = multi_frame.GetItOnConstValidFrames();
|
||||
for (auto &frame : iterator_for_valid_frame_only) {
|
||||
// Produce should always yield the latest results.
|
||||
ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context,
|
||||
context.shard_request_manager, storage::v3::View::NEW);
|
||||
|
||||
for (auto *named_expr : self_.named_expressions_) {
|
||||
named_expr->Accept(evaluator);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
void Produce::ProduceCursor::Shutdown() { input_cursor_->Shutdown(); }
|
||||
|
||||
void Produce::ProduceCursor::Reset() { input_cursor_->Reset(); }
|
||||
|
@ -28,6 +28,7 @@
|
||||
#include "query/v2/bindings/typed_value.hpp"
|
||||
#include "query/v2/bindings/frame.hpp"
|
||||
#include "query/v2/bindings/symbol_table.hpp"
|
||||
#include "query/v2/multiframe.hpp"
|
||||
#include "storage/v3/id_types.hpp"
|
||||
#include "utils/bound.hpp"
|
||||
#include "utils/fnv.hpp"
|
||||
@ -71,6 +72,8 @@ class Cursor {
|
||||
/// @throws QueryRuntimeException if something went wrong with execution
|
||||
virtual bool Pull(Frame &, ExecutionContext &) = 0;
|
||||
|
||||
virtual void PullMultiple(MultiFrame &, ExecutionContext &) { LOG_FATAL("PullMultipleIsNotImplemented"); }
|
||||
|
||||
/// Resets the Cursor to its initial state.
|
||||
virtual void Reset() = 0;
|
||||
|
||||
@ -332,12 +335,13 @@ and false on every following Pull.")
|
||||
class OnceCursor : public Cursor {
|
||||
public:
|
||||
OnceCursor() {}
|
||||
void PullMultiple(MultiFrame &, ExecutionContext &) override;
|
||||
bool Pull(Frame &, ExecutionContext &) override;
|
||||
void Shutdown() override;
|
||||
void Reset() override;
|
||||
|
||||
private:
|
||||
bool did_pull_{false};
|
||||
size_t pull_count_{false};
|
||||
};
|
||||
cpp<#)
|
||||
(:serialize (:slk))
|
||||
@ -1207,6 +1211,7 @@ RETURN clause) the Produce's pull succeeds exactly once.")
|
||||
public:
|
||||
ProduceCursor(const Produce &, utils::MemoryResource *);
|
||||
bool Pull(Frame &, ExecutionContext &) override;
|
||||
void PullMultiple(MultiFrame &, ExecutionContext &) override;
|
||||
void Shutdown() override;
|
||||
void Reset() override;
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user