Merge pull request #710 from memgraph/T1190-MG-Implement-ScanAll-and-ScanAllByLabel-with-MultiFrame_2
Implement scan all and scan all by label with `MultiFrame`
This commit is contained in:
commit
d44910bc9a
@ -704,7 +704,6 @@ PullPlan::PullPlan(const std::shared_ptr<CachedPlan> plan, const Parameters &par
|
|||||||
ctx_.request_router = request_router;
|
ctx_.request_router = request_router;
|
||||||
ctx_.edge_ids_alloc = &interpreter_context->edge_ids_alloc;
|
ctx_.edge_ids_alloc = &interpreter_context->edge_ids_alloc;
|
||||||
}
|
}
|
||||||
|
|
||||||
std::optional<plan::ProfilingStatsWithTotalTime> PullPlan::PullMultiple(AnyStream *stream, std::optional<int> n,
|
std::optional<plan::ProfilingStatsWithTotalTime> PullPlan::PullMultiple(AnyStream *stream, std::optional<int> n,
|
||||||
const std::vector<Symbol> &output_symbols,
|
const std::vector<Symbol> &output_symbols,
|
||||||
std::map<std::string, TypedValue> *summary) {
|
std::map<std::string, TypedValue> *summary) {
|
||||||
@ -734,7 +733,7 @@ std::optional<plan::ProfilingStatsWithTotalTime> PullPlan::PullMultiple(AnyStrea
|
|||||||
// Returns true if a result was pulled.
|
// Returns true if a result was pulled.
|
||||||
const auto pull_result = [&]() -> bool {
|
const auto pull_result = [&]() -> bool {
|
||||||
cursor_->PullMultiple(multi_frame_, ctx_);
|
cursor_->PullMultiple(multi_frame_, ctx_);
|
||||||
return !multi_frame_.HasInvalidFrame();
|
return multi_frame_.HasValidFrame();
|
||||||
};
|
};
|
||||||
|
|
||||||
const auto stream_values = [&output_symbols, &stream](const Frame &frame) {
|
const auto stream_values = [&output_symbols, &stream](const Frame &frame) {
|
||||||
@ -755,13 +754,14 @@ std::optional<plan::ProfilingStatsWithTotalTime> PullPlan::PullMultiple(AnyStrea
|
|||||||
int i = 0;
|
int i = 0;
|
||||||
if (has_unsent_results_ && !output_symbols.empty()) {
|
if (has_unsent_results_ && !output_symbols.empty()) {
|
||||||
// stream unsent results from previous pull
|
// stream unsent results from previous pull
|
||||||
|
for (auto &frame : multi_frame_.GetValidFramesConsumer()) {
|
||||||
auto iterator_for_valid_frame_only = multi_frame_.GetValidFramesReader();
|
|
||||||
for (const auto &frame : iterator_for_valid_frame_only) {
|
|
||||||
stream_values(frame);
|
stream_values(frame);
|
||||||
|
frame.MakeInvalid();
|
||||||
++i;
|
++i;
|
||||||
|
if (i == n) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
multi_frame_.MakeAllFramesInvalid();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
for (; !n || i < n;) {
|
for (; !n || i < n;) {
|
||||||
@ -770,13 +770,15 @@ std::optional<plan::ProfilingStatsWithTotalTime> PullPlan::PullMultiple(AnyStrea
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (!output_symbols.empty()) {
|
if (!output_symbols.empty()) {
|
||||||
auto iterator_for_valid_frame_only = multi_frame_.GetValidFramesReader();
|
for (auto &frame : multi_frame_.GetValidFramesConsumer()) {
|
||||||
for (const auto &frame : iterator_for_valid_frame_only) {
|
|
||||||
stream_values(frame);
|
stream_values(frame);
|
||||||
|
frame.MakeInvalid();
|
||||||
++i;
|
++i;
|
||||||
|
if (i == n) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
multi_frame_.MakeAllFramesInvalid();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// If we finished because we streamed the requested n results,
|
// If we finished because we streamed the requested n results,
|
||||||
|
@ -33,6 +33,7 @@ class MultiFrame {
|
|||||||
MultiFrame(size_t size_of_frame, size_t number_of_frames, utils::MemoryResource *execution_memory);
|
MultiFrame(size_t size_of_frame, size_t number_of_frames, utils::MemoryResource *execution_memory);
|
||||||
~MultiFrame() = default;
|
~MultiFrame() = default;
|
||||||
|
|
||||||
|
// Assigning and moving the MultiFrame is not allowed if any accessor from the above ones are alive.
|
||||||
MultiFrame(const MultiFrame &other);
|
MultiFrame(const MultiFrame &other);
|
||||||
MultiFrame(MultiFrame &&other) noexcept;
|
MultiFrame(MultiFrame &&other) noexcept;
|
||||||
MultiFrame &operator=(const MultiFrame &other) = delete;
|
MultiFrame &operator=(const MultiFrame &other) = delete;
|
||||||
@ -97,9 +98,9 @@ class ValidFramesReader {
|
|||||||
|
|
||||||
~ValidFramesReader() = default;
|
~ValidFramesReader() = default;
|
||||||
ValidFramesReader(const ValidFramesReader &other) = delete;
|
ValidFramesReader(const ValidFramesReader &other) = delete;
|
||||||
ValidFramesReader(ValidFramesReader &&other) noexcept = delete;
|
ValidFramesReader(ValidFramesReader &&other) noexcept = default;
|
||||||
ValidFramesReader &operator=(const ValidFramesReader &other) = delete;
|
ValidFramesReader &operator=(const ValidFramesReader &other) = delete;
|
||||||
ValidFramesReader &operator=(ValidFramesReader &&other) noexcept = delete;
|
ValidFramesReader &operator=(ValidFramesReader &&other) noexcept = default;
|
||||||
|
|
||||||
struct Iterator {
|
struct Iterator {
|
||||||
using iterator_category = std::forward_iterator_tag;
|
using iterator_category = std::forward_iterator_tag;
|
||||||
@ -147,9 +148,9 @@ class ValidFramesModifier {
|
|||||||
|
|
||||||
~ValidFramesModifier() = default;
|
~ValidFramesModifier() = default;
|
||||||
ValidFramesModifier(const ValidFramesModifier &other) = delete;
|
ValidFramesModifier(const ValidFramesModifier &other) = delete;
|
||||||
ValidFramesModifier(ValidFramesModifier &&other) noexcept = delete;
|
ValidFramesModifier(ValidFramesModifier &&other) noexcept = default;
|
||||||
ValidFramesModifier &operator=(const ValidFramesModifier &other) = delete;
|
ValidFramesModifier &operator=(const ValidFramesModifier &other) = delete;
|
||||||
ValidFramesModifier &operator=(ValidFramesModifier &&other) noexcept = delete;
|
ValidFramesModifier &operator=(ValidFramesModifier &&other) noexcept = default;
|
||||||
|
|
||||||
struct Iterator {
|
struct Iterator {
|
||||||
using iterator_category = std::forward_iterator_tag;
|
using iterator_category = std::forward_iterator_tag;
|
||||||
@ -202,9 +203,9 @@ class ValidFramesConsumer {
|
|||||||
|
|
||||||
~ValidFramesConsumer() noexcept;
|
~ValidFramesConsumer() noexcept;
|
||||||
ValidFramesConsumer(const ValidFramesConsumer &other) = delete;
|
ValidFramesConsumer(const ValidFramesConsumer &other) = delete;
|
||||||
ValidFramesConsumer(ValidFramesConsumer &&other) noexcept = delete;
|
ValidFramesConsumer(ValidFramesConsumer &&other) noexcept = default;
|
||||||
ValidFramesConsumer &operator=(const ValidFramesConsumer &other) = delete;
|
ValidFramesConsumer &operator=(const ValidFramesConsumer &other) = delete;
|
||||||
ValidFramesConsumer &operator=(ValidFramesConsumer &&other) noexcept = delete;
|
ValidFramesConsumer &operator=(ValidFramesConsumer &&other) noexcept = default;
|
||||||
|
|
||||||
struct Iterator {
|
struct Iterator {
|
||||||
using iterator_category = std::forward_iterator_tag;
|
using iterator_category = std::forward_iterator_tag;
|
||||||
@ -256,9 +257,9 @@ class InvalidFramesPopulator {
|
|||||||
~InvalidFramesPopulator() = default;
|
~InvalidFramesPopulator() = default;
|
||||||
|
|
||||||
InvalidFramesPopulator(const InvalidFramesPopulator &other) = delete;
|
InvalidFramesPopulator(const InvalidFramesPopulator &other) = delete;
|
||||||
InvalidFramesPopulator(InvalidFramesPopulator &&other) noexcept = delete;
|
InvalidFramesPopulator(InvalidFramesPopulator &&other) noexcept = default;
|
||||||
InvalidFramesPopulator &operator=(const InvalidFramesPopulator &other) = delete;
|
InvalidFramesPopulator &operator=(const InvalidFramesPopulator &other) = delete;
|
||||||
InvalidFramesPopulator &operator=(InvalidFramesPopulator &&other) noexcept = delete;
|
InvalidFramesPopulator &operator=(InvalidFramesPopulator &&other) noexcept = default;
|
||||||
|
|
||||||
struct Iterator {
|
struct Iterator {
|
||||||
using iterator_category = std::forward_iterator_tag;
|
using iterator_category = std::forward_iterator_tag;
|
||||||
|
@ -38,6 +38,7 @@
|
|||||||
#include "query/v2/db_accessor.hpp"
|
#include "query/v2/db_accessor.hpp"
|
||||||
#include "query/v2/exceptions.hpp"
|
#include "query/v2/exceptions.hpp"
|
||||||
#include "query/v2/frontend/ast/ast.hpp"
|
#include "query/v2/frontend/ast/ast.hpp"
|
||||||
|
#include "query/v2/multiframe.hpp"
|
||||||
#include "query/v2/path.hpp"
|
#include "query/v2/path.hpp"
|
||||||
#include "query/v2/plan/scoped_profile.hpp"
|
#include "query/v2/plan/scoped_profile.hpp"
|
||||||
#include "query/v2/request_router.hpp"
|
#include "query/v2/request_router.hpp"
|
||||||
@ -453,53 +454,101 @@ class DistributedScanAllAndFilterCursor : public Cursor {
|
|||||||
|
|
||||||
using VertexAccessor = accessors::VertexAccessor;
|
using VertexAccessor = accessors::VertexAccessor;
|
||||||
|
|
||||||
bool MakeRequest(RequestRouterInterface &request_router, ExecutionContext &context) {
|
bool MakeRequest(ExecutionContext &context) {
|
||||||
{
|
{
|
||||||
SCOPED_REQUEST_WAIT_PROFILE;
|
SCOPED_REQUEST_WAIT_PROFILE;
|
||||||
std::optional<std::string> request_label = std::nullopt;
|
std::optional<std::string> request_label = std::nullopt;
|
||||||
if (label_.has_value()) {
|
if (label_.has_value()) {
|
||||||
request_label = request_router.LabelToName(*label_);
|
request_label = context.request_router->LabelToName(*label_);
|
||||||
}
|
}
|
||||||
current_batch = request_router.ScanVertices(request_label);
|
current_batch_ = context.request_router->ScanVertices(request_label);
|
||||||
}
|
}
|
||||||
current_vertex_it = current_batch.begin();
|
current_vertex_it_ = current_batch_.begin();
|
||||||
request_state_ = State::COMPLETED;
|
return !current_batch_.empty();
|
||||||
return !current_batch.empty();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
bool Pull(Frame &frame, ExecutionContext &context) override {
|
bool Pull(Frame &frame, ExecutionContext &context) override {
|
||||||
SCOPED_PROFILE_OP(op_name_);
|
SCOPED_PROFILE_OP(op_name_);
|
||||||
|
|
||||||
auto &request_router = *context.request_router;
|
|
||||||
while (true) {
|
while (true) {
|
||||||
if (MustAbort(context)) {
|
if (MustAbort(context)) {
|
||||||
throw HintedAbortError();
|
throw HintedAbortError();
|
||||||
}
|
}
|
||||||
|
|
||||||
if (request_state_ == State::INITIALIZING) {
|
if (current_vertex_it_ == current_batch_.end()) {
|
||||||
if (!input_cursor_->Pull(frame, context)) {
|
ResetExecutionState();
|
||||||
|
if (!input_cursor_->Pull(frame, context) || !MakeRequest(context)) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (current_vertex_it == current_batch.end() &&
|
frame[output_symbol_] = TypedValue(std::move(*current_vertex_it_));
|
||||||
(request_state_ == State::COMPLETED || !MakeRequest(request_router, context))) {
|
++current_vertex_it_;
|
||||||
ResetExecutionState();
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
frame[output_symbol_] = TypedValue(std::move(*current_vertex_it));
|
|
||||||
++current_vertex_it;
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool PullNextFrames(ExecutionContext &context) {
|
||||||
|
input_cursor_->PullMultiple(*own_multi_frame_, context);
|
||||||
|
own_frames_consumer_ = own_multi_frame_->GetValidFramesConsumer();
|
||||||
|
own_frames_it_ = own_frames_consumer_->begin();
|
||||||
|
return own_multi_frame_->HasValidFrame();
|
||||||
|
}
|
||||||
|
|
||||||
|
inline bool HasMoreResult() {
|
||||||
|
return current_vertex_it_ != current_batch_.end() && own_frames_it_ != own_frames_consumer_->end();
|
||||||
|
}
|
||||||
|
|
||||||
|
bool PopulateFrame(ExecutionContext &context, FrameWithValidity &frame) {
|
||||||
|
MG_ASSERT(HasMoreResult());
|
||||||
|
|
||||||
|
frame = *own_frames_it_;
|
||||||
|
frame[output_symbol_] = TypedValue(*current_vertex_it_);
|
||||||
|
|
||||||
|
++current_vertex_it_;
|
||||||
|
if (current_vertex_it_ == current_batch_.end()) {
|
||||||
|
own_frames_it_->MakeInvalid();
|
||||||
|
++own_frames_it_;
|
||||||
|
|
||||||
|
current_vertex_it_ = current_batch_.begin();
|
||||||
|
|
||||||
|
if (own_frames_it_ == own_frames_consumer_->end()) {
|
||||||
|
return PullNextFrames(context);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
void PullMultiple(MultiFrame &input_multi_frame, ExecutionContext &context) override {
|
||||||
|
SCOPED_PROFILE_OP(op_name_);
|
||||||
|
|
||||||
|
if (!own_multi_frame_.has_value()) {
|
||||||
|
own_multi_frame_.emplace(MultiFrame(input_multi_frame.GetFirstFrame().elems().size(), kNumberOfFramesInMultiframe,
|
||||||
|
input_multi_frame.GetMemoryResource()));
|
||||||
|
|
||||||
|
MakeRequest(context);
|
||||||
|
PullNextFrames(context);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!HasMoreResult()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (auto &frame : input_multi_frame.GetInvalidFramesPopulator()) {
|
||||||
|
if (MustAbort(context)) {
|
||||||
|
throw HintedAbortError();
|
||||||
|
}
|
||||||
|
if (!PopulateFrame(context, frame)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
void Shutdown() override { input_cursor_->Shutdown(); }
|
void Shutdown() override { input_cursor_->Shutdown(); }
|
||||||
|
|
||||||
void ResetExecutionState() {
|
void ResetExecutionState() {
|
||||||
current_batch.clear();
|
current_batch_.clear();
|
||||||
current_vertex_it = current_batch.end();
|
current_vertex_it_ = current_batch_.end();
|
||||||
request_state_ = State::INITIALIZING;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void Reset() override {
|
void Reset() override {
|
||||||
@ -511,12 +560,14 @@ class DistributedScanAllAndFilterCursor : public Cursor {
|
|||||||
const Symbol output_symbol_;
|
const Symbol output_symbol_;
|
||||||
const UniqueCursorPtr input_cursor_;
|
const UniqueCursorPtr input_cursor_;
|
||||||
const char *op_name_;
|
const char *op_name_;
|
||||||
std::vector<VertexAccessor> current_batch;
|
std::vector<VertexAccessor> current_batch_;
|
||||||
std::vector<VertexAccessor>::iterator current_vertex_it;
|
std::vector<VertexAccessor>::iterator current_vertex_it_{current_batch_.begin()};
|
||||||
State request_state_ = State::INITIALIZING;
|
|
||||||
std::optional<storage::v3::LabelId> label_;
|
std::optional<storage::v3::LabelId> label_;
|
||||||
std::optional<std::pair<storage::v3::PropertyId, Expression *>> property_expression_pair_;
|
std::optional<std::pair<storage::v3::PropertyId, Expression *>> property_expression_pair_;
|
||||||
std::optional<std::vector<Expression *>> filter_expressions_;
|
std::optional<std::vector<Expression *>> filter_expressions_;
|
||||||
|
std::optional<MultiFrame> own_multi_frame_;
|
||||||
|
std::optional<ValidFramesConsumer> own_frames_consumer_;
|
||||||
|
ValidFramesConsumer::Iterator own_frames_it_;
|
||||||
};
|
};
|
||||||
|
|
||||||
class DistributedScanByPrimaryKeyCursor : public Cursor {
|
class DistributedScanByPrimaryKeyCursor : public Cursor {
|
||||||
@ -606,8 +657,6 @@ ScanAll::ScanAll(const std::shared_ptr<LogicalOperator> &input, Symbol output_sy
|
|||||||
|
|
||||||
ACCEPT_WITH_INPUT(ScanAll)
|
ACCEPT_WITH_INPUT(ScanAll)
|
||||||
|
|
||||||
class DistributedScanAllCursor;
|
|
||||||
|
|
||||||
UniqueCursorPtr ScanAll::MakeCursor(utils::MemoryResource *mem) const {
|
UniqueCursorPtr ScanAll::MakeCursor(utils::MemoryResource *mem) const {
|
||||||
EventCounter::IncrementCounter(EventCounter::ScanAllOperator);
|
EventCounter::IncrementCounter(EventCounter::ScanAllOperator);
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user