Refactor DistributedScanAllAndFilterCursor

This commit is contained in:
János Benjamin Antal 2023-01-17 08:34:08 +01:00
parent d11d5c3fa9
commit 57690c5390

View File

@ -457,14 +457,14 @@ class DistributedScanAllAndFilterCursor : public Cursor {
using VertexAccessor = accessors::VertexAccessor; using VertexAccessor = accessors::VertexAccessor;
bool MakeRequest(RequestRouterInterface &request_router, ExecutionContext &context) { bool MakeRequest(ExecutionContext &context) {
{ {
SCOPED_REQUEST_WAIT_PROFILE; SCOPED_REQUEST_WAIT_PROFILE;
std::optional<std::string> request_label = std::nullopt; std::optional<std::string> request_label = std::nullopt;
if (label_.has_value()) { if (label_.has_value()) {
request_label = request_router.LabelToName(*label_); request_label = context.request_router->LabelToName(*label_);
} }
current_batch_ = request_router.ScanVertices(request_label); current_batch_ = context.request_router->ScanVertices(request_label);
} }
current_vertex_it_ = current_batch_.begin(); current_vertex_it_ = current_batch_.begin();
return !current_batch_.empty(); return !current_batch_.empty();
@ -480,7 +480,7 @@ class DistributedScanAllAndFilterCursor : public Cursor {
if (current_vertex_it_ == current_batch_.end()) { if (current_vertex_it_ == current_batch_.end()) {
ResetExecutionState(); ResetExecutionState();
if (!input_cursor_->Pull(frame, context) || !MakeRequest(*context.request_router, context)) { if (!input_cursor_->Pull(frame, context) || !MakeRequest(context)) {
return false; return false;
} }
} }
@ -491,66 +491,57 @@ class DistributedScanAllAndFilterCursor : public Cursor {
} }
} }
void PrepareNextFrames(ExecutionContext &context) { bool PullNextFrames(ExecutionContext &context) {
auto &request_router = *context.request_router; input_cursor_->PullMultiple(*own_multi_frame_, context);
own_frames_consumer_ = own_multi_frame_->GetValidFramesConsumer();
input_cursor_->PullMultiple(*own_multi_frames_, context); own_frames_it_ = own_frames_consumer_->begin();
valid_frames_consumer_ = own_multi_frames_->GetValidFramesConsumer(); return own_multi_frame_->HasValidFrame();
valid_frames_it_ = valid_frames_consumer_->begin();
MakeRequest(request_router, context);
} }
inline bool HasNextFrame() { inline bool HasMoreResult() {
return current_vertex_it_ != current_batch_.end() && valid_frames_it_ != valid_frames_consumer_->end(); return current_vertex_it_ != current_batch_.end() && own_frames_it_ != own_frames_consumer_->end();
} }
FrameWithValidity GetNextFrame(ExecutionContext &context) { bool PopulateFrame(ExecutionContext &context, FrameWithValidity &frame) {
MG_ASSERT(HasNextFrame()); MG_ASSERT(HasMoreResult());
auto frame = *valid_frames_it_; frame = *own_frames_it_;
frame[output_symbol_] = TypedValue(*current_vertex_it_); frame[output_symbol_] = TypedValue(*current_vertex_it_);
++current_vertex_it_; ++current_vertex_it_;
if (current_vertex_it_ == current_batch_.end()) { if (current_vertex_it_ == current_batch_.end()) {
valid_frames_it_->MakeInvalid(); own_frames_it_->MakeInvalid();
++valid_frames_it_; ++own_frames_it_;
if (valid_frames_it_ == valid_frames_consumer_->end()) { current_vertex_it_ = current_batch_.begin();
PrepareNextFrames(context);
} else { if (own_frames_it_ == own_frames_consumer_->end()) {
current_vertex_it_ = current_batch_.begin(); return PullNextFrames(context);
} }
}; };
return true;
return frame;
} }
void PullMultiple(MultiFrame &input_multi_frame, ExecutionContext &context) override { void PullMultiple(MultiFrame &input_multi_frame, ExecutionContext &context) override {
SCOPED_PROFILE_OP(op_name_); SCOPED_PROFILE_OP(op_name_);
if (!own_multi_frames_.has_value()) { if (!own_multi_frame_.has_value()) {
own_multi_frames_.emplace(MultiFrame(input_multi_frame.GetFirstFrame().elems().size(), own_multi_frame_.emplace(MultiFrame(input_multi_frame.GetFirstFrame().elems().size(), kNumberOfFramesInMultiframe,
kNumberOfFramesInMultiframe, input_multi_frame.GetMemoryResource())); input_multi_frame.GetMemoryResource()));
PrepareNextFrames(context);
MakeRequest(context);
PullNextFrames(context);
} }
while (true) { if (!HasMoreResult()) {
return;
}
for (auto &frame : input_multi_frame.GetInvalidFramesPopulator()) {
if (MustAbort(context)) { if (MustAbort(context)) {
throw HintedAbortError(); throw HintedAbortError();
} }
if (!PopulateFrame(context, frame)) {
auto invalid_frames_populator = input_multi_frame.GetInvalidFramesPopulator();
auto invalid_frame_it = invalid_frames_populator.begin();
auto has_modified_at_least_one_frame = false;
while (invalid_frames_populator.end() != invalid_frame_it && HasNextFrame()) {
has_modified_at_least_one_frame = true;
*invalid_frame_it = GetNextFrame(context);
++invalid_frame_it;
}
if (!has_modified_at_least_one_frame) {
return; return;
} }
} }
@ -577,9 +568,9 @@ class DistributedScanAllAndFilterCursor : public Cursor {
std::optional<storage::v3::LabelId> label_; std::optional<storage::v3::LabelId> label_;
std::optional<std::pair<storage::v3::PropertyId, Expression *>> property_expression_pair_; std::optional<std::pair<storage::v3::PropertyId, Expression *>> property_expression_pair_;
std::optional<std::vector<Expression *>> filter_expressions_; std::optional<std::vector<Expression *>> filter_expressions_;
std::optional<MultiFrame> own_multi_frames_; std::optional<MultiFrame> own_multi_frame_;
std::optional<ValidFramesConsumer> valid_frames_consumer_; std::optional<ValidFramesConsumer> own_frames_consumer_;
ValidFramesConsumer::Iterator valid_frames_it_; ValidFramesConsumer::Iterator own_frames_it_;
}; };
ScanAll::ScanAll(const std::shared_ptr<LogicalOperator> &input, Symbol output_symbol, storage::v3::View view) ScanAll::ScanAll(const std::shared_ptr<LogicalOperator> &input, Symbol output_symbol, storage::v3::View view)