diff --git a/src/query/v2/plan/operator.cpp b/src/query/v2/plan/operator.cpp index 62a927cfd..692638338 100644 --- a/src/query/v2/plan/operator.cpp +++ b/src/query/v2/plan/operator.cpp @@ -155,7 +155,13 @@ uint64_t ComputeProfilingKey(const T *obj) { } // namespace -#define SCOPED_PROFILE_OP(name) ScopedProfile profile{ComputeProfilingKey(this), name, &context}; +#define SCOPED_PROFILE_OP(name) \ + ScopedProfile profile { ComputeProfilingKey(this), name, &context } + +#define SCOPED_CUSTOM_PROFILE(name) \ + ScopedCustomProfile custom_profile { name, context } + +#define SCOPED_REQUEST_WAIT_PROFILE SCOPED_CUSTOM_PROFILE("request_wait") class DistributedCreateNodeCursor : public Cursor { public: @@ -168,7 +174,10 @@ class DistributedCreateNodeCursor : public Cursor { SCOPED_PROFILE_OP("CreateNode"); if (input_cursor_->Pull(frame, context)) { auto &shard_manager = context.shard_request_manager; - shard_manager->Request(state_, NodeCreationInfoToRequest(context, frame)); + { + SCOPED_REQUEST_WAIT_PROFILE; + shard_manager->Request(state_, NodeCreationInfoToRequest(context, frame)); + } return true; } @@ -361,14 +370,18 @@ class DistributedScanAllAndFilterCursor : public Cursor { using VertexAccessor = accessors::VertexAccessor; - bool MakeRequest(msgs::ShardRequestManagerInterface &shard_manager) { - current_batch = shard_manager.Request(request_state_); + bool MakeRequest(msgs::ShardRequestManagerInterface &shard_manager, ExecutionContext &context) { + { + SCOPED_REQUEST_WAIT_PROFILE; + current_batch = shard_manager.Request(request_state_); + } current_vertex_it = current_batch.begin(); return !current_batch.empty(); } bool Pull(Frame &frame, ExecutionContext &context) override { SCOPED_PROFILE_OP(op_name_); + auto &shard_manager = *context.shard_request_manager; while (true) { if (MustAbort(context)) { @@ -383,7 +396,7 @@ class DistributedScanAllAndFilterCursor : public Cursor { } if (current_vertex_it == current_batch.end()) { - if (request_state_.state == State::COMPLETED || !MakeRequest(shard_manager)) { + if (request_state_.state == State::COMPLETED || !MakeRequest(shard_manager, context)) { ResetExecutionState(); continue; } @@ -2367,7 +2380,10 @@ class DistributedCreateExpandCursor : public Cursor { } auto &shard_manager = context.shard_request_manager; ResetExecutionState(); - shard_manager->Request(state_, ExpandCreationInfoToRequest(context, frame)); + { + SCOPED_REQUEST_WAIT_PROFILE; + shard_manager->Request(state_, ExpandCreationInfoToRequest(context, frame)); + } return true; } @@ -2498,7 +2514,10 @@ class DistributedExpandCursor : public Cursor { request.edge_properties.emplace(); request.src_vertices.push_back(vertex.Id()); msgs::ExecutionState<msgs::ExpandOneRequest> request_state; - auto result_rows = context.shard_request_manager->Request(request_state, std::move(request)); + auto result_rows = std::invoke([&context, &request_state, &request]() mutable { + SCOPED_REQUEST_WAIT_PROFILE; + return context.shard_request_manager->Request(request_state, std::move(request)); + }); MG_ASSERT(result_rows.size() == 1); auto &result_row = result_rows.front();