Impl of version more memory friendly
This commit is contained in:
parent
54ce79baa0
commit
311994a36d
@ -200,9 +200,9 @@ class ValidFramesConsumer {
|
||||
explicit ValidFramesConsumer(MultiFrame &multiframe);
|
||||
|
||||
~ValidFramesConsumer() noexcept;
|
||||
ValidFramesConsumer(const ValidFramesConsumer &other) = delete;
|
||||
ValidFramesConsumer(ValidFramesConsumer &&other) noexcept = delete;
|
||||
ValidFramesConsumer &operator=(const ValidFramesConsumer &other) = delete;
|
||||
ValidFramesConsumer(const ValidFramesConsumer &other) = default;
|
||||
ValidFramesConsumer(ValidFramesConsumer &&other) noexcept = default;
|
||||
ValidFramesConsumer &operator=(const ValidFramesConsumer &other) = default;
|
||||
ValidFramesConsumer &operator=(ValidFramesConsumer &&other) noexcept = delete;
|
||||
|
||||
struct Iterator {
|
||||
|
@ -508,26 +508,39 @@ class DistributedScanAllAndFilterCursor : public Cursor {
|
||||
}
|
||||
}
|
||||
|
||||
void Generate(ExecutionContext &context) {
|
||||
void PrepareNextFrames(ExecutionContext &context) {
|
||||
auto &request_router = *context.request_router;
|
||||
|
||||
input_cursor_->PullMultiple(*own_multi_frames_, context);
|
||||
valid_frames_consumer_ = own_multi_frames_->GetValidFramesConsumer();
|
||||
valid_frames_it_ = valid_frames_consumer_->begin();
|
||||
|
||||
if (!MakeRequest(request_router, context)) {
|
||||
return;
|
||||
}
|
||||
MakeRequest(request_router, context);
|
||||
|
||||
auto valid_frames_consumer = own_multi_frames_->GetValidFramesConsumer();
|
||||
has_next_frame_ = current_vertex_it_ != current_batch_.end() && valid_frames_it_ != valid_frames_consumer_->end();
|
||||
}
|
||||
|
||||
for (auto valid_frames_it = valid_frames_consumer.begin(); valid_frames_it != valid_frames_consumer.end();
|
||||
++valid_frames_it) {
|
||||
for (auto vertex_it = current_batch_.begin(); vertex_it != current_batch_.end(); ++vertex_it) {
|
||||
auto frame = *valid_frames_it;
|
||||
frame[output_symbol_] = TypedValue(*vertex_it);
|
||||
frames_buffer_.push(std::move(frame));
|
||||
inline bool HasNextFrame() { return has_next_frame_; }
|
||||
|
||||
FrameWithValidity GetNextFrame(ExecutionContext &context) {
|
||||
MG_ASSERT(HasNextFrame());
|
||||
|
||||
auto frame = *valid_frames_it_;
|
||||
frame[output_symbol_] = TypedValue(*current_vertex_it_);
|
||||
|
||||
++current_vertex_it_;
|
||||
if (current_vertex_it_ == current_batch_.end()) {
|
||||
valid_frames_it_->MakeInvalid();
|
||||
++valid_frames_it_;
|
||||
|
||||
if (valid_frames_it_ == valid_frames_consumer_->end()) {
|
||||
PrepareNextFrames(context);
|
||||
} else {
|
||||
current_vertex_it_ = current_batch_.begin();
|
||||
}
|
||||
valid_frames_it->MakeInvalid();
|
||||
}
|
||||
};
|
||||
|
||||
return frame;
|
||||
}
|
||||
|
||||
void PullMultiple(MultiFrame &input_multi_frame, ExecutionContext &context) override {
|
||||
@ -536,6 +549,7 @@ class DistributedScanAllAndFilterCursor : public Cursor {
|
||||
if (!own_multi_frames_.has_value()) {
|
||||
own_multi_frames_.emplace(MultiFrame(input_multi_frame.GetFirstFrame().elems().size(),
|
||||
kNumberOfFramesInMultiframe, input_multi_frame.GetMemoryResource()));
|
||||
PrepareNextFrames(context);
|
||||
}
|
||||
|
||||
while (true) {
|
||||
@ -543,19 +557,14 @@ class DistributedScanAllAndFilterCursor : public Cursor {
|
||||
throw HintedAbortError();
|
||||
}
|
||||
|
||||
const auto should_generate = frames_buffer_.empty();
|
||||
if (should_generate) {
|
||||
Generate(context);
|
||||
}
|
||||
|
||||
auto invalid_frames_populator = input_multi_frame.GetInvalidFramesPopulator();
|
||||
auto invalid_frame_it = invalid_frames_populator.begin();
|
||||
auto has_modified_at_least_one_frame = false;
|
||||
while (invalid_frames_populator.end() != invalid_frame_it && !frames_buffer_.empty()) {
|
||||
|
||||
while (invalid_frames_populator.end() != invalid_frame_it && HasNextFrame()) {
|
||||
has_modified_at_least_one_frame = true;
|
||||
*invalid_frame_it = frames_buffer_.front();
|
||||
*invalid_frame_it = GetNextFrame(context);
|
||||
++invalid_frame_it;
|
||||
frames_buffer_.pop();
|
||||
}
|
||||
|
||||
if (!has_modified_at_least_one_frame) {
|
||||
@ -588,7 +597,10 @@ class DistributedScanAllAndFilterCursor : public Cursor {
|
||||
std::optional<std::pair<storage::v3::PropertyId, Expression *>> property_expression_pair_;
|
||||
std::optional<std::vector<Expression *>> filter_expressions_;
|
||||
std::optional<MultiFrame> own_multi_frames_;
|
||||
std::optional<ValidFramesConsumer> valid_frames_consumer_;
|
||||
ValidFramesConsumer::Iterator valid_frames_it_;
|
||||
std::queue<FrameWithValidity> frames_buffer_;
|
||||
bool has_next_frame_;
|
||||
};
|
||||
|
||||
ScanAll::ScanAll(const std::shared_ptr<LogicalOperator> &input, Symbol output_symbol, storage::v3::View view)
|
||||
|
Loading…
Reference in New Issue
Block a user