Merge branch 'project-pineapples' into T1165-MG-add-property-based-high-level-query-test
This commit is contained in:
commit
16b78e4eb3
@ -704,7 +704,6 @@ PullPlan::PullPlan(const std::shared_ptr<CachedPlan> plan, const Parameters &par
|
||||
ctx_.request_router = request_router;
|
||||
ctx_.edge_ids_alloc = &interpreter_context->edge_ids_alloc;
|
||||
}
|
||||
|
||||
std::optional<plan::ProfilingStatsWithTotalTime> PullPlan::PullMultiple(AnyStream *stream, std::optional<int> n,
|
||||
const std::vector<Symbol> &output_symbols,
|
||||
std::map<std::string, TypedValue> *summary) {
|
||||
@ -734,7 +733,7 @@ std::optional<plan::ProfilingStatsWithTotalTime> PullPlan::PullMultiple(AnyStrea
|
||||
// Returns true if a result was pulled.
|
||||
const auto pull_result = [&]() -> bool {
|
||||
cursor_->PullMultiple(multi_frame_, ctx_);
|
||||
return !multi_frame_.HasInvalidFrame();
|
||||
return multi_frame_.HasValidFrame();
|
||||
};
|
||||
|
||||
const auto stream_values = [&output_symbols, &stream](const Frame &frame) {
|
||||
@ -755,13 +754,14 @@ std::optional<plan::ProfilingStatsWithTotalTime> PullPlan::PullMultiple(AnyStrea
|
||||
int i = 0;
|
||||
if (has_unsent_results_ && !output_symbols.empty()) {
|
||||
// stream unsent results from previous pull
|
||||
|
||||
auto iterator_for_valid_frame_only = multi_frame_.GetValidFramesReader();
|
||||
for (const auto &frame : iterator_for_valid_frame_only) {
|
||||
for (auto &frame : multi_frame_.GetValidFramesConsumer()) {
|
||||
stream_values(frame);
|
||||
frame.MakeInvalid();
|
||||
++i;
|
||||
if (i == n) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
multi_frame_.MakeAllFramesInvalid();
|
||||
}
|
||||
|
||||
for (; !n || i < n;) {
|
||||
@ -770,13 +770,15 @@ std::optional<plan::ProfilingStatsWithTotalTime> PullPlan::PullMultiple(AnyStrea
|
||||
}
|
||||
|
||||
if (!output_symbols.empty()) {
|
||||
auto iterator_for_valid_frame_only = multi_frame_.GetValidFramesReader();
|
||||
for (const auto &frame : iterator_for_valid_frame_only) {
|
||||
for (auto &frame : multi_frame_.GetValidFramesConsumer()) {
|
||||
stream_values(frame);
|
||||
frame.MakeInvalid();
|
||||
++i;
|
||||
if (i == n) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
multi_frame_.MakeAllFramesInvalid();
|
||||
}
|
||||
|
||||
// If we finished because we streamed the requested n results,
|
||||
|
@ -33,6 +33,7 @@ class MultiFrame {
|
||||
MultiFrame(size_t size_of_frame, size_t number_of_frames, utils::MemoryResource *execution_memory);
|
||||
~MultiFrame() = default;
|
||||
|
||||
// Assigning and moving the MultiFrame is not allowed if any accessor from the above ones are alive.
|
||||
MultiFrame(const MultiFrame &other);
|
||||
MultiFrame(MultiFrame &&other) noexcept;
|
||||
MultiFrame &operator=(const MultiFrame &other) = delete;
|
||||
@ -97,9 +98,9 @@ class ValidFramesReader {
|
||||
|
||||
~ValidFramesReader() = default;
|
||||
ValidFramesReader(const ValidFramesReader &other) = delete;
|
||||
ValidFramesReader(ValidFramesReader &&other) noexcept = delete;
|
||||
ValidFramesReader(ValidFramesReader &&other) noexcept = default;
|
||||
ValidFramesReader &operator=(const ValidFramesReader &other) = delete;
|
||||
ValidFramesReader &operator=(ValidFramesReader &&other) noexcept = delete;
|
||||
ValidFramesReader &operator=(ValidFramesReader &&other) noexcept = default;
|
||||
|
||||
struct Iterator {
|
||||
using iterator_category = std::forward_iterator_tag;
|
||||
@ -147,9 +148,9 @@ class ValidFramesModifier {
|
||||
|
||||
~ValidFramesModifier() = default;
|
||||
ValidFramesModifier(const ValidFramesModifier &other) = delete;
|
||||
ValidFramesModifier(ValidFramesModifier &&other) noexcept = delete;
|
||||
ValidFramesModifier(ValidFramesModifier &&other) noexcept = default;
|
||||
ValidFramesModifier &operator=(const ValidFramesModifier &other) = delete;
|
||||
ValidFramesModifier &operator=(ValidFramesModifier &&other) noexcept = delete;
|
||||
ValidFramesModifier &operator=(ValidFramesModifier &&other) noexcept = default;
|
||||
|
||||
struct Iterator {
|
||||
using iterator_category = std::forward_iterator_tag;
|
||||
@ -202,9 +203,9 @@ class ValidFramesConsumer {
|
||||
|
||||
~ValidFramesConsumer() noexcept;
|
||||
ValidFramesConsumer(const ValidFramesConsumer &other) = delete;
|
||||
ValidFramesConsumer(ValidFramesConsumer &&other) noexcept = delete;
|
||||
ValidFramesConsumer(ValidFramesConsumer &&other) noexcept = default;
|
||||
ValidFramesConsumer &operator=(const ValidFramesConsumer &other) = delete;
|
||||
ValidFramesConsumer &operator=(ValidFramesConsumer &&other) noexcept = delete;
|
||||
ValidFramesConsumer &operator=(ValidFramesConsumer &&other) noexcept = default;
|
||||
|
||||
struct Iterator {
|
||||
using iterator_category = std::forward_iterator_tag;
|
||||
@ -256,9 +257,9 @@ class InvalidFramesPopulator {
|
||||
~InvalidFramesPopulator() = default;
|
||||
|
||||
InvalidFramesPopulator(const InvalidFramesPopulator &other) = delete;
|
||||
InvalidFramesPopulator(InvalidFramesPopulator &&other) noexcept = delete;
|
||||
InvalidFramesPopulator(InvalidFramesPopulator &&other) noexcept = default;
|
||||
InvalidFramesPopulator &operator=(const InvalidFramesPopulator &other) = delete;
|
||||
InvalidFramesPopulator &operator=(InvalidFramesPopulator &&other) noexcept = delete;
|
||||
InvalidFramesPopulator &operator=(InvalidFramesPopulator &&other) noexcept = default;
|
||||
|
||||
struct Iterator {
|
||||
using iterator_category = std::forward_iterator_tag;
|
||||
|
@ -38,6 +38,7 @@
|
||||
#include "query/v2/db_accessor.hpp"
|
||||
#include "query/v2/exceptions.hpp"
|
||||
#include "query/v2/frontend/ast/ast.hpp"
|
||||
#include "query/v2/multiframe.hpp"
|
||||
#include "query/v2/path.hpp"
|
||||
#include "query/v2/plan/scoped_profile.hpp"
|
||||
#include "query/v2/request_router.hpp"
|
||||
@ -453,53 +454,101 @@ class DistributedScanAllAndFilterCursor : public Cursor {
|
||||
|
||||
using VertexAccessor = accessors::VertexAccessor;
|
||||
|
||||
bool MakeRequest(RequestRouterInterface &request_router, ExecutionContext &context) {
|
||||
bool MakeRequest(ExecutionContext &context) {
|
||||
{
|
||||
SCOPED_REQUEST_WAIT_PROFILE;
|
||||
std::optional<std::string> request_label = std::nullopt;
|
||||
if (label_.has_value()) {
|
||||
request_label = request_router.LabelToName(*label_);
|
||||
request_label = context.request_router->LabelToName(*label_);
|
||||
}
|
||||
current_batch = request_router.ScanVertices(request_label);
|
||||
current_batch_ = context.request_router->ScanVertices(request_label);
|
||||
}
|
||||
current_vertex_it = current_batch.begin();
|
||||
request_state_ = State::COMPLETED;
|
||||
return !current_batch.empty();
|
||||
current_vertex_it_ = current_batch_.begin();
|
||||
return !current_batch_.empty();
|
||||
}
|
||||
|
||||
bool Pull(Frame &frame, ExecutionContext &context) override {
|
||||
SCOPED_PROFILE_OP(op_name_);
|
||||
|
||||
auto &request_router = *context.request_router;
|
||||
while (true) {
|
||||
if (MustAbort(context)) {
|
||||
throw HintedAbortError();
|
||||
}
|
||||
|
||||
if (request_state_ == State::INITIALIZING) {
|
||||
if (!input_cursor_->Pull(frame, context)) {
|
||||
if (current_vertex_it_ == current_batch_.end()) {
|
||||
ResetExecutionState();
|
||||
if (!input_cursor_->Pull(frame, context) || !MakeRequest(context)) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
if (current_vertex_it == current_batch.end() &&
|
||||
(request_state_ == State::COMPLETED || !MakeRequest(request_router, context))) {
|
||||
ResetExecutionState();
|
||||
continue;
|
||||
}
|
||||
|
||||
frame[output_symbol_] = TypedValue(std::move(*current_vertex_it));
|
||||
++current_vertex_it;
|
||||
frame[output_symbol_] = TypedValue(std::move(*current_vertex_it_));
|
||||
++current_vertex_it_;
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
bool PullNextFrames(ExecutionContext &context) {
|
||||
input_cursor_->PullMultiple(*own_multi_frame_, context);
|
||||
own_frames_consumer_ = own_multi_frame_->GetValidFramesConsumer();
|
||||
own_frames_it_ = own_frames_consumer_->begin();
|
||||
return own_multi_frame_->HasValidFrame();
|
||||
}
|
||||
|
||||
inline bool HasMoreResult() {
|
||||
return current_vertex_it_ != current_batch_.end() && own_frames_it_ != own_frames_consumer_->end();
|
||||
}
|
||||
|
||||
bool PopulateFrame(ExecutionContext &context, FrameWithValidity &frame) {
|
||||
MG_ASSERT(HasMoreResult());
|
||||
|
||||
frame = *own_frames_it_;
|
||||
frame[output_symbol_] = TypedValue(*current_vertex_it_);
|
||||
|
||||
++current_vertex_it_;
|
||||
if (current_vertex_it_ == current_batch_.end()) {
|
||||
own_frames_it_->MakeInvalid();
|
||||
++own_frames_it_;
|
||||
|
||||
current_vertex_it_ = current_batch_.begin();
|
||||
|
||||
if (own_frames_it_ == own_frames_consumer_->end()) {
|
||||
return PullNextFrames(context);
|
||||
}
|
||||
};
|
||||
return true;
|
||||
}
|
||||
|
||||
void PullMultiple(MultiFrame &input_multi_frame, ExecutionContext &context) override {
|
||||
SCOPED_PROFILE_OP(op_name_);
|
||||
|
||||
if (!own_multi_frame_.has_value()) {
|
||||
own_multi_frame_.emplace(MultiFrame(input_multi_frame.GetFirstFrame().elems().size(), kNumberOfFramesInMultiframe,
|
||||
input_multi_frame.GetMemoryResource()));
|
||||
|
||||
MakeRequest(context);
|
||||
PullNextFrames(context);
|
||||
}
|
||||
|
||||
if (!HasMoreResult()) {
|
||||
return;
|
||||
}
|
||||
|
||||
for (auto &frame : input_multi_frame.GetInvalidFramesPopulator()) {
|
||||
if (MustAbort(context)) {
|
||||
throw HintedAbortError();
|
||||
}
|
||||
if (!PopulateFrame(context, frame)) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
void Shutdown() override { input_cursor_->Shutdown(); }
|
||||
|
||||
void ResetExecutionState() {
|
||||
current_batch.clear();
|
||||
current_vertex_it = current_batch.end();
|
||||
request_state_ = State::INITIALIZING;
|
||||
current_batch_.clear();
|
||||
current_vertex_it_ = current_batch_.end();
|
||||
}
|
||||
|
||||
void Reset() override {
|
||||
@ -511,12 +560,14 @@ class DistributedScanAllAndFilterCursor : public Cursor {
|
||||
const Symbol output_symbol_;
|
||||
const UniqueCursorPtr input_cursor_;
|
||||
const char *op_name_;
|
||||
std::vector<VertexAccessor> current_batch;
|
||||
std::vector<VertexAccessor>::iterator current_vertex_it;
|
||||
State request_state_ = State::INITIALIZING;
|
||||
std::vector<VertexAccessor> current_batch_;
|
||||
std::vector<VertexAccessor>::iterator current_vertex_it_{current_batch_.begin()};
|
||||
std::optional<storage::v3::LabelId> label_;
|
||||
std::optional<std::pair<storage::v3::PropertyId, Expression *>> property_expression_pair_;
|
||||
std::optional<std::vector<Expression *>> filter_expressions_;
|
||||
std::optional<MultiFrame> own_multi_frame_;
|
||||
std::optional<ValidFramesConsumer> own_frames_consumer_;
|
||||
ValidFramesConsumer::Iterator own_frames_it_;
|
||||
};
|
||||
|
||||
class DistributedScanByPrimaryKeyCursor : public Cursor {
|
||||
@ -606,8 +657,6 @@ ScanAll::ScanAll(const std::shared_ptr<LogicalOperator> &input, Symbol output_sy
|
||||
|
||||
ACCEPT_WITH_INPUT(ScanAll)
|
||||
|
||||
class DistributedScanAllCursor;
|
||||
|
||||
UniqueCursorPtr ScanAll::MakeCursor(utils::MemoryResource *mem) const {
|
||||
EventCounter::IncrementCounter(EventCounter::ScanAllOperator);
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user