Merge pull request #617 from memgraph/T1105-MG-profile-query-in-distributed
This commit is contained in:
commit
17a104677d
@ -116,7 +116,7 @@ declare -A primary_urls=(
|
||||
["pymgclient"]="http://$local_cache_host/git/pymgclient.git"
|
||||
["mgconsole"]="http://$local_cache_host/git/mgconsole.git"
|
||||
["spdlog"]="http://$local_cache_host/git/spdlog"
|
||||
["nlohmann"]="http://$local_cache_host/file/nlohmann/json/4f8fba14066156b73f1189a2b8bd568bde5284c5/single_include/nlohmann/json.hpp"
|
||||
["nlohmann"]="http://$local_cache_host/file/nlohmann/json/9d69186291aca4f0137b69c1dee313b391ff564c/single_include/nlohmann/json.hpp"
|
||||
["neo4j"]="http://$local_cache_host/file/neo4j-community-3.2.3-unix.tar.gz"
|
||||
["librdkafka"]="http://$local_cache_host/git/librdkafka.git"
|
||||
["protobuf"]="http://$local_cache_host/git/protobuf.git"
|
||||
@ -141,7 +141,7 @@ declare -A secondary_urls=(
|
||||
["pymgclient"]="https://github.com/memgraph/pymgclient.git"
|
||||
["mgconsole"]="http://github.com/memgraph/mgconsole.git"
|
||||
["spdlog"]="https://github.com/gabime/spdlog"
|
||||
["nlohmann"]="https://raw.githubusercontent.com/nlohmann/json/4f8fba14066156b73f1189a2b8bd568bde5284c5/single_include/nlohmann/json.hpp"
|
||||
["nlohmann"]="https://raw.githubusercontent.com/nlohmann/json/9d69186291aca4f0137b69c1dee313b391ff564c/single_include/nlohmann/json.hpp"
|
||||
["neo4j"]="https://s3-eu-west-1.amazonaws.com/deps.memgraph.io/neo4j-community-3.2.3-unix.tar.gz"
|
||||
["librdkafka"]="https://github.com/edenhill/librdkafka.git"
|
||||
["protobuf"]="https://github.com/protocolbuffers/protobuf.git"
|
||||
|
@ -1043,7 +1043,7 @@ PreparedQuery PrepareProfileQuery(ParsedQuery parsed_query, bool in_explicit_tra
|
||||
auto rw_type_checker = plan::ReadWriteTypeChecker();
|
||||
rw_type_checker.InferRWType(const_cast<plan::LogicalOperator &>(cypher_query_plan->plan()));
|
||||
|
||||
return PreparedQuery{{"OPERATOR", "ACTUAL HITS", "RELATIVE TIME", "ABSOLUTE TIME"},
|
||||
return PreparedQuery{{"OPERATOR", "ACTUAL HITS", "RELATIVE TIME", "ABSOLUTE TIME", "CUSTOM DATA"},
|
||||
std::move(parsed_query.required_privileges),
|
||||
[plan = std::move(cypher_query_plan), parameters = std::move(parsed_inner_query.parameters),
|
||||
summary, dba, interpreter_context, execution_memory, memory_limit, shard_request_manager,
|
||||
|
@ -155,7 +155,16 @@ uint64_t ComputeProfilingKey(const T *obj) {
|
||||
|
||||
} // namespace
|
||||
|
||||
#define SCOPED_PROFILE_OP(name) ScopedProfile profile{ComputeProfilingKey(this), name, &context};
|
||||
// NOLINTNEXTLINE(cppcoreguidelines-macro-usage)
|
||||
#define SCOPED_PROFILE_OP(name) \
|
||||
ScopedProfile profile { ComputeProfilingKey(this), name, &context }
|
||||
|
||||
// NOLINTNEXTLINE(cppcoreguidelines-macro-usage)
|
||||
#define SCOPED_CUSTOM_PROFILE(name) \
|
||||
ScopedCustomProfile custom_profile { name, context }
|
||||
|
||||
// NOLINTNEXTLINE(cppcoreguidelines-macro-usage)
|
||||
#define SCOPED_REQUEST_WAIT_PROFILE SCOPED_CUSTOM_PROFILE("request_wait")
|
||||
|
||||
class DistributedCreateNodeCursor : public Cursor {
|
||||
public:
|
||||
@ -168,7 +177,10 @@ class DistributedCreateNodeCursor : public Cursor {
|
||||
SCOPED_PROFILE_OP("CreateNode");
|
||||
if (input_cursor_->Pull(frame, context)) {
|
||||
auto &shard_manager = context.shard_request_manager;
|
||||
shard_manager->Request(state_, NodeCreationInfoToRequest(context, frame));
|
||||
{
|
||||
SCOPED_REQUEST_WAIT_PROFILE;
|
||||
shard_manager->Request(state_, NodeCreationInfoToRequest(context, frame));
|
||||
}
|
||||
PlaceNodeOnTheFrame(frame, context);
|
||||
return true;
|
||||
}
|
||||
@ -373,38 +385,43 @@ class DistributedScanAllAndFilterCursor : public Cursor {
|
||||
|
||||
using VertexAccessor = accessors::VertexAccessor;
|
||||
|
||||
bool MakeRequest(msgs::ShardRequestManagerInterface &shard_manager) {
|
||||
current_batch = shard_manager.Request(request_state_);
|
||||
bool MakeRequest(msgs::ShardRequestManagerInterface &shard_manager, ExecutionContext &context) {
|
||||
{
|
||||
SCOPED_REQUEST_WAIT_PROFILE;
|
||||
current_batch = shard_manager.Request(request_state_);
|
||||
}
|
||||
current_vertex_it = current_batch.begin();
|
||||
return !current_batch.empty();
|
||||
}
|
||||
|
||||
bool Pull(Frame &frame, ExecutionContext &context) override {
|
||||
SCOPED_PROFILE_OP(op_name_);
|
||||
|
||||
auto &shard_manager = *context.shard_request_manager;
|
||||
if (MustAbort(context)) {
|
||||
throw HintedAbortError();
|
||||
}
|
||||
using State = msgs::ExecutionState<msgs::ScanVerticesRequest>;
|
||||
|
||||
if (request_state_.state == State::INITIALIZING) {
|
||||
if (!input_cursor_->Pull(frame, context)) {
|
||||
return false;
|
||||
while (true) {
|
||||
if (MustAbort(context)) {
|
||||
throw HintedAbortError();
|
||||
}
|
||||
}
|
||||
using State = msgs::ExecutionState<msgs::ScanVerticesRequest>;
|
||||
|
||||
request_state_.label = label_.has_value() ? std::make_optional(shard_manager.LabelToName(*label_)) : std::nullopt;
|
||||
if (request_state_.state == State::INITIALIZING) {
|
||||
if (!input_cursor_->Pull(frame, context)) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
if (current_vertex_it == current_batch.end()) {
|
||||
if (request_state_.state == State::COMPLETED || !MakeRequest(shard_manager)) {
|
||||
request_state_.label = label_.has_value() ? std::make_optional(shard_manager.LabelToName(*label_)) : std::nullopt;
|
||||
|
||||
if (current_vertex_it == current_batch.end() &&
|
||||
(request_state_.state == State::COMPLETED || !MakeRequest(shard_manager, context))) {
|
||||
ResetExecutionState();
|
||||
return Pull(frame, context);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
frame[output_symbol_] = TypedValue(std::move(*current_vertex_it));
|
||||
++current_vertex_it;
|
||||
return true;
|
||||
frame[output_symbol_] = TypedValue(std::move(*current_vertex_it));
|
||||
++current_vertex_it;
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
void Shutdown() override { input_cursor_->Shutdown(); }
|
||||
@ -2379,7 +2396,10 @@ class DistributedCreateExpandCursor : public Cursor {
|
||||
}
|
||||
auto &shard_manager = context.shard_request_manager;
|
||||
ResetExecutionState();
|
||||
shard_manager->Request(state_, ExpandCreationInfoToRequest(context, frame));
|
||||
{
|
||||
SCOPED_REQUEST_WAIT_PROFILE;
|
||||
shard_manager->Request(state_, ExpandCreationInfoToRequest(context, frame));
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
@ -2535,7 +2555,11 @@ class DistributedExpandCursor : public Cursor {
|
||||
request.edge_properties.emplace();
|
||||
request.src_vertices.push_back(vertex.Id());
|
||||
msgs::ExecutionState<msgs::ExpandOneRequest> request_state;
|
||||
auto result_rows = context.shard_request_manager->Request(request_state, std::move(request));
|
||||
auto result_rows = std::invoke([&context, &request_state, &request]() mutable {
|
||||
SCOPED_REQUEST_WAIT_PROFILE;
|
||||
return context.shard_request_manager->Request(request_state, std::move(request));
|
||||
});
|
||||
MG_ASSERT(result_rows.size() == 1);
|
||||
auto &result_row = result_rows.front();
|
||||
|
||||
if (self_.common_.existing_node) {
|
||||
|
@ -24,18 +24,18 @@ namespace memgraph::query::v2::plan {
|
||||
|
||||
namespace {
|
||||
|
||||
unsigned long long IndividualCycles(const ProfilingStats &cumulative_stats) {
|
||||
uint64_t IndividualCycles(const ProfilingStats &cumulative_stats) {
|
||||
return cumulative_stats.num_cycles - std::accumulate(cumulative_stats.children.begin(),
|
||||
cumulative_stats.children.end(), 0ULL,
|
||||
[](auto acc, auto &stats) { return acc + stats.num_cycles; });
|
||||
}
|
||||
|
||||
double RelativeTime(unsigned long long num_cycles, unsigned long long total_cycles) {
|
||||
return static_cast<double>(num_cycles) / total_cycles;
|
||||
double RelativeTime(const uint64_t num_cycles, const uint64_t total_cycles) {
|
||||
return static_cast<double>(num_cycles) / static_cast<double>(total_cycles);
|
||||
}
|
||||
|
||||
double AbsoluteTime(unsigned long long num_cycles, unsigned long long total_cycles,
|
||||
std::chrono::duration<double> total_time) {
|
||||
double AbsoluteTime(const uint64_t num_cycles, const uint64_t total_cycles,
|
||||
const std::chrono::duration<double> total_time) {
|
||||
return (RelativeTime(num_cycles, total_cycles) * static_cast<std::chrono::duration<double, std::milli>>(total_time))
|
||||
.count();
|
||||
}
|
||||
@ -50,26 +50,29 @@ namespace {
|
||||
|
||||
class ProfilingStatsToTableHelper {
|
||||
public:
|
||||
ProfilingStatsToTableHelper(unsigned long long total_cycles, std::chrono::duration<double> total_time)
|
||||
ProfilingStatsToTableHelper(uint64_t total_cycles, std::chrono::duration<double> total_time)
|
||||
: total_cycles_(total_cycles), total_time_(total_time) {}
|
||||
|
||||
void Output(const ProfilingStats &cumulative_stats) {
|
||||
auto cycles = IndividualCycles(cumulative_stats);
|
||||
auto custom_data_copy = cumulative_stats.custom_data;
|
||||
ConvertCyclesToTime(custom_data_copy);
|
||||
|
||||
rows_.emplace_back(std::vector<TypedValue>{
|
||||
TypedValue(FormatOperator(cumulative_stats.name)), TypedValue(cumulative_stats.actual_hits),
|
||||
TypedValue(FormatRelativeTime(cycles)), TypedValue(FormatAbsoluteTime(cycles))});
|
||||
rows_.emplace_back(
|
||||
std::vector<TypedValue>{TypedValue(FormatOperator(cumulative_stats.name)),
|
||||
TypedValue(cumulative_stats.actual_hits), TypedValue(FormatRelativeTime(cycles)),
|
||||
TypedValue(FormatAbsoluteTime(cycles)), TypedValue(custom_data_copy.dump())});
|
||||
|
||||
for (size_t i = 1; i < cumulative_stats.children.size(); ++i) {
|
||||
Branch(cumulative_stats.children[i]);
|
||||
}
|
||||
|
||||
if (cumulative_stats.children.size() >= 1) {
|
||||
if (!cumulative_stats.children.empty()) {
|
||||
Output(cumulative_stats.children[0]);
|
||||
}
|
||||
}
|
||||
|
||||
std::vector<std::vector<TypedValue>> rows() { return rows_; }
|
||||
std::vector<std::vector<TypedValue>> rows() const { return rows_; }
|
||||
|
||||
private:
|
||||
void Branch(const ProfilingStats &cumulative_stats) {
|
||||
@ -80,7 +83,28 @@ class ProfilingStatsToTableHelper {
|
||||
--depth_;
|
||||
}
|
||||
|
||||
std::string Format(const char *str) {
|
||||
double AbsoluteTime(const uint64_t cycles) const { return plan::AbsoluteTime(cycles, total_cycles_, total_time_); }
|
||||
|
||||
double RelativeTime(const uint64_t cycles) const { return plan::RelativeTime(cycles, total_cycles_); }
|
||||
|
||||
void ConvertCyclesToTime(nlohmann::json &custom_data) const {
|
||||
const auto convert_cycles_in_json = [this](nlohmann::json &json) {
|
||||
if (!json.is_object()) {
|
||||
return;
|
||||
}
|
||||
if (auto it = json.find(ProfilingStats::kNumCycles); it != json.end()) {
|
||||
auto num_cycles = it.value().get<uint64_t>();
|
||||
json[ProfilingStats::kAbsoluteTime] = AbsoluteTime(num_cycles);
|
||||
json[ProfilingStats::kRelativeTime] = RelativeTime(num_cycles);
|
||||
}
|
||||
};
|
||||
|
||||
for (auto &json : custom_data) {
|
||||
convert_cycles_in_json(json);
|
||||
}
|
||||
}
|
||||
|
||||
std::string Format(const char *str) const {
|
||||
std::ostringstream ss;
|
||||
for (int64_t i = 0; i < depth_; ++i) {
|
||||
ss << "| ";
|
||||
@ -89,21 +113,21 @@ class ProfilingStatsToTableHelper {
|
||||
return ss.str();
|
||||
}
|
||||
|
||||
std::string Format(const std::string &str) { return Format(str.c_str()); }
|
||||
std::string Format(const std::string &str) const { return Format(str.c_str()); }
|
||||
|
||||
std::string FormatOperator(const char *str) { return Format(std::string("* ") + str); }
|
||||
std::string FormatOperator(const char *str) const { return Format(std::string("* ") + str); }
|
||||
|
||||
std::string FormatRelativeTime(unsigned long long num_cycles) {
|
||||
return fmt::format("{: 10.6f} %", RelativeTime(num_cycles, total_cycles_) * 100);
|
||||
std::string FormatRelativeTime(uint64_t num_cycles) const {
|
||||
return fmt::format("{: 10.6f} %", RelativeTime(num_cycles) * 100);
|
||||
}
|
||||
|
||||
std::string FormatAbsoluteTime(unsigned long long num_cycles) {
|
||||
return fmt::format("{: 10.6f} ms", AbsoluteTime(num_cycles, total_cycles_, total_time_));
|
||||
std::string FormatAbsoluteTime(uint64_t num_cycles) const {
|
||||
return fmt::format("{: 10.6f} ms", AbsoluteTime(num_cycles));
|
||||
}
|
||||
|
||||
int64_t depth_{0};
|
||||
std::vector<std::vector<TypedValue>> rows_;
|
||||
unsigned long long total_cycles_;
|
||||
uint64_t total_cycles_;
|
||||
std::chrono::duration<double> total_time_;
|
||||
};
|
||||
|
||||
@ -126,7 +150,7 @@ class ProfilingStatsToJsonHelper {
|
||||
using json = nlohmann::json;
|
||||
|
||||
public:
|
||||
ProfilingStatsToJsonHelper(unsigned long long total_cycles, std::chrono::duration<double> total_time)
|
||||
ProfilingStatsToJsonHelper(uint64_t total_cycles, std::chrono::duration<double> total_time)
|
||||
: total_cycles_(total_cycles), total_time_(total_time) {}
|
||||
|
||||
void Output(const ProfilingStats &cumulative_stats) { return Output(cumulative_stats, &json_); }
|
||||
@ -151,7 +175,7 @@ class ProfilingStatsToJsonHelper {
|
||||
}
|
||||
|
||||
json json_;
|
||||
unsigned long long total_cycles_;
|
||||
uint64_t total_cycles_;
|
||||
std::chrono::duration<double> total_time_;
|
||||
};
|
||||
|
||||
|
@ -26,10 +26,15 @@ namespace plan {
|
||||
* Stores profiling statistics for a single logical operator.
|
||||
*/
|
||||
struct ProfilingStats {
|
||||
static constexpr std::string_view kNumCycles{"num_cycles"};
|
||||
static constexpr std::string_view kRelativeTime{"relative_time"};
|
||||
static constexpr std::string_view kAbsoluteTime{"absolute_time"};
|
||||
|
||||
int64_t actual_hits{0};
|
||||
unsigned long long num_cycles{0};
|
||||
uint64_t num_cycles{0};
|
||||
uint64_t key{0};
|
||||
const char *name{nullptr};
|
||||
nlohmann::json custom_data;
|
||||
// TODO: This should use the allocator for query execution
|
||||
std::vector<ProfilingStats> children;
|
||||
};
|
||||
|
@ -13,6 +13,8 @@
|
||||
|
||||
#include <cstdint>
|
||||
|
||||
#include <json/json.hpp>
|
||||
|
||||
#include "query/v2/context.hpp"
|
||||
#include "query/v2/plan/profile.hpp"
|
||||
#include "utils/likely.hpp"
|
||||
@ -20,6 +22,38 @@
|
||||
|
||||
namespace memgraph::query::v2::plan {
|
||||
|
||||
class ScopedCustomProfile {
|
||||
public:
|
||||
explicit ScopedCustomProfile(const std::string_view custom_data_name, ExecutionContext &context)
|
||||
: custom_data_name_(custom_data_name), start_time_{utils::ReadTSC()}, context_{&context} {}
|
||||
|
||||
ScopedCustomProfile(const ScopedCustomProfile &) = delete;
|
||||
ScopedCustomProfile(ScopedCustomProfile &&) = delete;
|
||||
ScopedCustomProfile &operator=(const ScopedCustomProfile &) = delete;
|
||||
ScopedCustomProfile &operator=(ScopedCustomProfile &&) = delete;
|
||||
|
||||
// If an exception is thrown in any of these functions that signals a problem that is much bigger than we could handle
|
||||
// it here, thus we don't attempt to handle it.
|
||||
// NOLINTNEXTLINE(bugprone-exception-escape)
|
||||
~ScopedCustomProfile() {
|
||||
if (nullptr != context_->stats_root) [[unlikely]] {
|
||||
auto &custom_data = context_->stats_root->custom_data[custom_data_name_];
|
||||
if (!custom_data.is_object()) {
|
||||
custom_data = nlohmann::json::object();
|
||||
}
|
||||
const auto elapsed = utils::ReadTSC() - start_time_;
|
||||
auto &num_cycles_json = custom_data[ProfilingStats::kNumCycles];
|
||||
const auto num_cycles = num_cycles_json.is_null() ? 0 : num_cycles_json.get<uint64_t>();
|
||||
num_cycles_json = num_cycles + elapsed;
|
||||
}
|
||||
}
|
||||
|
||||
private:
|
||||
std::string_view custom_data_name_;
|
||||
uint64_t start_time_;
|
||||
ExecutionContext *context_;
|
||||
};
|
||||
|
||||
/**
|
||||
* A RAII class used for profiling logical operators. Instances of this class
|
||||
* update the profiling data stored within the `ExecutionContext` object and build
|
||||
@ -29,7 +63,7 @@ namespace memgraph::query::v2::plan {
|
||||
class ScopedProfile {
|
||||
public:
|
||||
ScopedProfile(uint64_t key, const char *name, query::v2::ExecutionContext *context) noexcept : context_(context) {
|
||||
if (UNLIKELY(context_->is_profile_query)) {
|
||||
if (IsProfiling()) [[unlikely]] {
|
||||
root_ = context_->stats_root;
|
||||
|
||||
// Are we the root logical operator?
|
||||
@ -60,8 +94,13 @@ class ScopedProfile {
|
||||
}
|
||||
}
|
||||
|
||||
ScopedProfile(const ScopedProfile &) = delete;
|
||||
ScopedProfile(ScopedProfile &&) = delete;
|
||||
ScopedProfile &operator=(const ScopedProfile &) = delete;
|
||||
ScopedProfile &operator=(ScopedProfile &&) = delete;
|
||||
|
||||
~ScopedProfile() noexcept {
|
||||
if (UNLIKELY(context_->is_profile_query)) {
|
||||
if (IsProfiling()) [[unlikely]] {
|
||||
stats_->num_cycles += utils::ReadTSC() - start_time_;
|
||||
|
||||
// Restore the old root ("pop")
|
||||
@ -70,10 +109,12 @@ class ScopedProfile {
|
||||
}
|
||||
|
||||
private:
|
||||
[[nodiscard]] bool IsProfiling() const { return context_->is_profile_query; }
|
||||
|
||||
query::v2::ExecutionContext *context_;
|
||||
ProfilingStats *root_{nullptr};
|
||||
ProfilingStats *stats_{nullptr};
|
||||
unsigned long long start_time_{0};
|
||||
uint64_t start_time_{0};
|
||||
};
|
||||
|
||||
} // namespace memgraph::query::v2::plan
|
||||
|
@ -18,7 +18,7 @@ extern "C" {
|
||||
#include "utils/tsc.hpp"
|
||||
|
||||
namespace memgraph::utils {
|
||||
uint64_t ReadTSC() { return rdtsc(); }
|
||||
uint64_t ReadTSC() noexcept { return rdtsc(); }
|
||||
|
||||
std::optional<double> GetTSCFrequency() {
|
||||
// init is only needed for fetching frequency
|
||||
|
@ -18,7 +18,7 @@ namespace memgraph::utils {
|
||||
|
||||
// TSC stands for Time-Stamp Counter
|
||||
|
||||
uint64_t ReadTSC();
|
||||
uint64_t ReadTSC() noexcept;
|
||||
|
||||
std::optional<double> GetTSCFrequency();
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user