Remove unnecessary state from DistributedScanAllAndFilterCursor
This commit is contained in:
parent
b91b16de96
commit
36891c119b
@ -467,31 +467,24 @@ class DistributedScanAllAndFilterCursor : public Cursor {
|
||||
current_batch_ = request_router.ScanVertices(request_label);
|
||||
}
|
||||
current_vertex_it_ = current_batch_.begin();
|
||||
request_state_ = State::COMPLETED;
|
||||
return !current_batch_.empty();
|
||||
}
|
||||
|
||||
bool Pull(Frame &frame, ExecutionContext &context) override {
|
||||
SCOPED_PROFILE_OP(op_name_);
|
||||
|
||||
auto &request_router = *context.request_router;
|
||||
while (true) {
|
||||
if (MustAbort(context)) {
|
||||
throw HintedAbortError();
|
||||
}
|
||||
|
||||
if (request_state_ == State::INITIALIZING) {
|
||||
if (!input_cursor_->Pull(frame, context)) {
|
||||
if (current_vertex_it_ == current_batch_.end()) {
|
||||
ResetExecutionState();
|
||||
if (!input_cursor_->Pull(frame, context) || !MakeRequest(*context.request_router, context)) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
if (current_vertex_it_ == current_batch_.end() &&
|
||||
(request_state_ == State::COMPLETED || !MakeRequest(request_router, context))) {
|
||||
ResetExecutionState();
|
||||
continue;
|
||||
}
|
||||
|
||||
frame[output_symbol_] = TypedValue(std::move(*current_vertex_it_));
|
||||
++current_vertex_it_;
|
||||
return true;
|
||||
@ -568,7 +561,6 @@ class DistributedScanAllAndFilterCursor : public Cursor {
|
||||
void ResetExecutionState() {
|
||||
current_batch_.clear();
|
||||
current_vertex_it_ = current_batch_.end();
|
||||
request_state_ = State::INITIALIZING;
|
||||
}
|
||||
|
||||
void Reset() override {
|
||||
@ -581,15 +573,13 @@ class DistributedScanAllAndFilterCursor : public Cursor {
|
||||
const UniqueCursorPtr input_cursor_;
|
||||
const char *op_name_;
|
||||
std::vector<VertexAccessor> current_batch_;
|
||||
std::vector<VertexAccessor>::iterator current_vertex_it_;
|
||||
State request_state_ = State::INITIALIZING;
|
||||
std::vector<VertexAccessor>::iterator current_vertex_it_{current_batch_.begin()};
|
||||
std::optional<storage::v3::LabelId> label_;
|
||||
std::optional<std::pair<storage::v3::PropertyId, Expression *>> property_expression_pair_;
|
||||
std::optional<std::vector<Expression *>> filter_expressions_;
|
||||
std::optional<MultiFrame> own_multi_frames_;
|
||||
std::optional<ValidFramesConsumer> valid_frames_consumer_;
|
||||
ValidFramesConsumer::Iterator valid_frames_it_;
|
||||
std::queue<FrameWithValidity> frames_buffer_;
|
||||
};
|
||||
|
||||
ScanAll::ScanAll(const std::shared_ptr<LogicalOperator> &input, Symbol output_symbol, storage::v3::View view)
|
||||
@ -597,8 +587,6 @@ ScanAll::ScanAll(const std::shared_ptr<LogicalOperator> &input, Symbol output_sy
|
||||
|
||||
ACCEPT_WITH_INPUT(ScanAll)
|
||||
|
||||
class DistributedScanAllCursor;
|
||||
|
||||
UniqueCursorPtr ScanAll::MakeCursor(utils::MemoryResource *mem) const {
|
||||
EventCounter::IncrementCounter(EventCounter::ScanAllOperator);
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user