Populate custom data of profile query with request wait times

This commit is contained in:
János Benjamin Antal 2022-10-25 20:15:06 +02:00
parent 5784c0d473
commit 1703cd039d

View File

@ -155,7 +155,13 @@ uint64_t ComputeProfilingKey(const T *obj) {
} // namespace
#define SCOPED_PROFILE_OP(name) ScopedProfile profile{ComputeProfilingKey(this), name, &context};
#define SCOPED_PROFILE_OP(name) \
ScopedProfile profile { ComputeProfilingKey(this), name, &context }
#define SCOPED_CUSTOM_PROFILE(name) \
ScopedCustomProfile custom_profile { name, context }
#define SCOPED_REQUEST_WAIT_PROFILE SCOPED_CUSTOM_PROFILE("request_wait")
class DistributedCreateNodeCursor : public Cursor {
public:
@ -168,7 +174,10 @@ class DistributedCreateNodeCursor : public Cursor {
SCOPED_PROFILE_OP("CreateNode");
if (input_cursor_->Pull(frame, context)) {
auto &shard_manager = context.shard_request_manager;
shard_manager->Request(state_, NodeCreationInfoToRequest(context, frame));
{
SCOPED_REQUEST_WAIT_PROFILE;
shard_manager->Request(state_, NodeCreationInfoToRequest(context, frame));
}
return true;
}
@ -361,14 +370,18 @@ class DistributedScanAllAndFilterCursor : public Cursor {
using VertexAccessor = accessors::VertexAccessor;
bool MakeRequest(msgs::ShardRequestManagerInterface &shard_manager) {
current_batch = shard_manager.Request(request_state_);
bool MakeRequest(msgs::ShardRequestManagerInterface &shard_manager, ExecutionContext &context) {
{
SCOPED_REQUEST_WAIT_PROFILE;
current_batch = shard_manager.Request(request_state_);
}
current_vertex_it = current_batch.begin();
return !current_batch.empty();
}
bool Pull(Frame &frame, ExecutionContext &context) override {
SCOPED_PROFILE_OP(op_name_);
auto &shard_manager = *context.shard_request_manager;
while (true) {
if (MustAbort(context)) {
@ -383,7 +396,7 @@ class DistributedScanAllAndFilterCursor : public Cursor {
}
if (current_vertex_it == current_batch.end()) {
if (request_state_.state == State::COMPLETED || !MakeRequest(shard_manager)) {
if (request_state_.state == State::COMPLETED || !MakeRequest(shard_manager, context)) {
ResetExecutionState();
continue;
}
@ -2367,7 +2380,10 @@ class DistributedCreateExpandCursor : public Cursor {
}
auto &shard_manager = context.shard_request_manager;
ResetExecutionState();
shard_manager->Request(state_, ExpandCreationInfoToRequest(context, frame));
{
SCOPED_REQUEST_WAIT_PROFILE;
shard_manager->Request(state_, ExpandCreationInfoToRequest(context, frame));
}
return true;
}
@ -2498,7 +2514,10 @@ class DistributedExpandCursor : public Cursor {
request.edge_properties.emplace();
request.src_vertices.push_back(vertex.Id());
msgs::ExecutionState<msgs::ExpandOneRequest> request_state;
auto result_rows = context.shard_request_manager->Request(request_state, std::move(request));
auto result_rows = std::invoke([&context, &request_state, &request]() mutable {
SCOPED_REQUEST_WAIT_PROFILE;
return context.shard_request_manager->Request(request_state, std::move(request));
});
MG_ASSERT(result_rows.size() == 1);
auto &result_row = result_rows.front();