Fix profile queries with ScanAll

This commit is contained in:
János Benjamin Antal 2022-10-25 14:37:18 +02:00
parent 5939fb2b0c
commit 8ebc704819

View File

@ -370,27 +370,29 @@ class DistributedScanAllAndFilterCursor : public Cursor {
bool Pull(Frame &frame, ExecutionContext &context) override {
SCOPED_PROFILE_OP(op_name_);
auto &shard_manager = *context.shard_request_manager;
if (MustAbort(context)) {
throw HintedAbortError();
}
using State = msgs::ExecutionState<msgs::ScanVerticesRequest>;
if (request_state_.state == State::INITIALIZING) {
if (!input_cursor_->Pull(frame, context)) {
return false;
while (true) {
if (MustAbort(context)) {
throw HintedAbortError();
}
}
using State = msgs::ExecutionState<msgs::ScanVerticesRequest>;
if (current_vertex_it == current_batch.end()) {
if (request_state_.state == State::COMPLETED || !MakeRequest(shard_manager)) {
ResetExecutionState();
return Pull(frame, context);
if (request_state_.state == State::INITIALIZING) {
if (!input_cursor_->Pull(frame, context)) {
return false;
}
}
}
frame[output_symbol_] = TypedValue(std::move(*current_vertex_it));
++current_vertex_it;
return true;
if (current_vertex_it == current_batch.end()) {
if (request_state_.state == State::COMPLETED || !MakeRequest(shard_manager)) {
ResetExecutionState();
continue;
}
}
frame[output_symbol_] = TypedValue(std::move(*current_vertex_it));
++current_vertex_it;
return true;
}
}
void Shutdown() override { input_cursor_->Shutdown(); }