Implement scanAll MultiFrame version
This commit is contained in:
parent
04450dada7
commit
af812d1311
@ -201,10 +201,9 @@ class ValidFramesConsumer {
|
|||||||
|
|
||||||
~ValidFramesConsumer() noexcept;
|
~ValidFramesConsumer() noexcept;
|
||||||
ValidFramesConsumer(const ValidFramesConsumer &other) = delete;
|
ValidFramesConsumer(const ValidFramesConsumer &other) = delete;
|
||||||
ValidFramesConsumer(ValidFramesConsumer &&other) noexcept = delete;
|
ValidFramesConsumer(ValidFramesConsumer &&other) noexcept = default;
|
||||||
ValidFramesConsumer &operator=(const ValidFramesConsumer &other) = delete;
|
ValidFramesConsumer &operator=(const ValidFramesConsumer &other) = delete;
|
||||||
ValidFramesConsumer &operator=(ValidFramesConsumer &&other) noexcept = delete;
|
ValidFramesConsumer &operator=(ValidFramesConsumer &&other) noexcept = default;
|
||||||
|
|
||||||
struct Iterator {
|
struct Iterator {
|
||||||
using iterator_category = std::forward_iterator_tag;
|
using iterator_category = std::forward_iterator_tag;
|
||||||
using difference_type = std::ptrdiff_t;
|
using difference_type = std::ptrdiff_t;
|
||||||
|
@ -37,6 +37,7 @@
|
|||||||
#include "query/v2/db_accessor.hpp"
|
#include "query/v2/db_accessor.hpp"
|
||||||
#include "query/v2/exceptions.hpp"
|
#include "query/v2/exceptions.hpp"
|
||||||
#include "query/v2/frontend/ast/ast.hpp"
|
#include "query/v2/frontend/ast/ast.hpp"
|
||||||
|
#include "query/v2/multiframe.hpp"
|
||||||
#include "query/v2/path.hpp"
|
#include "query/v2/path.hpp"
|
||||||
#include "query/v2/plan/scoped_profile.hpp"
|
#include "query/v2/plan/scoped_profile.hpp"
|
||||||
#include "query/v2/request_router.hpp"
|
#include "query/v2/request_router.hpp"
|
||||||
@ -473,11 +474,11 @@ class DistributedScanAllAndFilterCursor : public Cursor {
|
|||||||
if (label_.has_value()) {
|
if (label_.has_value()) {
|
||||||
request_label = request_router.LabelToName(*label_);
|
request_label = request_router.LabelToName(*label_);
|
||||||
}
|
}
|
||||||
current_batch = request_router.ScanVertices(request_label);
|
current_batch_ = request_router.ScanVertices(request_label);
|
||||||
}
|
}
|
||||||
current_vertex_it = current_batch.begin();
|
current_vertex_it_ = current_batch_.begin();
|
||||||
request_state_ = State::COMPLETED;
|
request_state_ = State::COMPLETED;
|
||||||
return !current_batch.empty();
|
return !current_batch_.empty();
|
||||||
}
|
}
|
||||||
|
|
||||||
bool Pull(Frame &frame, ExecutionContext &context) override {
|
bool Pull(Frame &frame, ExecutionContext &context) override {
|
||||||
@ -495,23 +496,85 @@ class DistributedScanAllAndFilterCursor : public Cursor {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (current_vertex_it == current_batch.end() &&
|
if (current_vertex_it_ == current_batch_.end() &&
|
||||||
(request_state_ == State::COMPLETED || !MakeRequest(request_router, context))) {
|
(request_state_ == State::COMPLETED || !MakeRequest(request_router, context))) {
|
||||||
ResetExecutionState();
|
ResetExecutionState();
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
frame[output_symbol_] = TypedValue(std::move(*current_vertex_it));
|
frame[output_symbol_] = TypedValue(std::move(*current_vertex_it_));
|
||||||
++current_vertex_it;
|
++current_vertex_it_;
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void Generate(ExecutionContext &context) {
|
||||||
|
auto &request_router = *context.request_router;
|
||||||
|
|
||||||
|
input_cursor_->PullMultiple(*own_multi_frames_, context);
|
||||||
|
|
||||||
|
if (!MakeRequest(request_router, context)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
auto valid_frames_consumer = own_multi_frames_->GetValidFramesConsumer();
|
||||||
|
auto valid_frames_it = valid_frames_consumer.begin();
|
||||||
|
|
||||||
|
for (auto valid_frames_it = valid_frames_consumer.begin(); valid_frames_it != valid_frames_consumer.end();
|
||||||
|
++valid_frames_it) {
|
||||||
|
for (auto vertex_it = current_batch_.begin(); vertex_it != current_batch_.end(); ++vertex_it) {
|
||||||
|
auto frame = *valid_frames_it;
|
||||||
|
frame[output_symbol_] = TypedValue(*vertex_it);
|
||||||
|
frames_buffer_.push(std::move(frame));
|
||||||
|
}
|
||||||
|
valid_frames_it->MakeInvalid();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void PullMultiple(MultiFrame &input_multi_frame, ExecutionContext &context) override {
|
||||||
|
SCOPED_PROFILE_OP(op_name_);
|
||||||
|
|
||||||
|
if (!own_multi_frames_.has_value()) {
|
||||||
|
own_multi_frames_.emplace(MultiFrame(input_multi_frame.GetFirstFrame().elems().size(),
|
||||||
|
kNumberOfFramesInMultiframe, input_multi_frame.GetMemoryResource()));
|
||||||
|
}
|
||||||
|
|
||||||
|
auto &request_router = *context.request_router;
|
||||||
|
auto should_make_request = false;
|
||||||
|
auto should_pull = false;
|
||||||
|
auto should_generate = false;
|
||||||
|
|
||||||
|
while (true) {
|
||||||
|
if (MustAbort(context)) {
|
||||||
|
throw HintedAbortError();
|
||||||
|
}
|
||||||
|
|
||||||
|
const auto should_generate = frames_buffer_.empty();
|
||||||
|
if (should_generate) {
|
||||||
|
Generate(context);
|
||||||
|
}
|
||||||
|
|
||||||
|
auto invalid_frames_populator = input_multi_frame.GetInvalidFramesPopulator();
|
||||||
|
auto invalid_frame_it = invalid_frames_populator.begin();
|
||||||
|
auto has_modified_at_least_one_frame = false;
|
||||||
|
while (invalid_frames_populator.end() != invalid_frame_it && !frames_buffer_.empty()) {
|
||||||
|
has_modified_at_least_one_frame = true;
|
||||||
|
*invalid_frame_it = frames_buffer_.front();
|
||||||
|
++invalid_frame_it;
|
||||||
|
frames_buffer_.pop();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!has_modified_at_least_one_frame) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
void Shutdown() override { input_cursor_->Shutdown(); }
|
void Shutdown() override { input_cursor_->Shutdown(); }
|
||||||
|
|
||||||
void ResetExecutionState() {
|
void ResetExecutionState() {
|
||||||
current_batch.clear();
|
current_batch_.clear();
|
||||||
current_vertex_it = current_batch.end();
|
current_vertex_it_ = current_batch_.end();
|
||||||
request_state_ = State::INITIALIZING;
|
request_state_ = State::INITIALIZING;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -524,12 +587,14 @@ class DistributedScanAllAndFilterCursor : public Cursor {
|
|||||||
const Symbol output_symbol_;
|
const Symbol output_symbol_;
|
||||||
const UniqueCursorPtr input_cursor_;
|
const UniqueCursorPtr input_cursor_;
|
||||||
const char *op_name_;
|
const char *op_name_;
|
||||||
std::vector<VertexAccessor> current_batch;
|
std::vector<VertexAccessor> current_batch_;
|
||||||
std::vector<VertexAccessor>::iterator current_vertex_it;
|
std::vector<VertexAccessor>::iterator current_vertex_it_;
|
||||||
State request_state_ = State::INITIALIZING;
|
State request_state_ = State::INITIALIZING;
|
||||||
std::optional<storage::v3::LabelId> label_;
|
std::optional<storage::v3::LabelId> label_;
|
||||||
std::optional<std::pair<storage::v3::PropertyId, Expression *>> property_expression_pair_;
|
std::optional<std::pair<storage::v3::PropertyId, Expression *>> property_expression_pair_;
|
||||||
std::optional<std::vector<Expression *>> filter_expressions_;
|
std::optional<std::vector<Expression *>> filter_expressions_;
|
||||||
|
std::optional<MultiFrame> own_multi_frames_;
|
||||||
|
std::queue<FrameWithValidity> frames_buffer_;
|
||||||
};
|
};
|
||||||
|
|
||||||
ScanAll::ScanAll(const std::shared_ptr<LogicalOperator> &input, Symbol output_symbol, storage::v3::View view)
|
ScanAll::ScanAll(const std::shared_ptr<LogicalOperator> &input, Symbol output_symbol, storage::v3::View view)
|
||||||
|
Loading…
Reference in New Issue
Block a user