diff --git a/libs/setup.sh b/libs/setup.sh index d902aadb1..bb737d432 100755 --- a/libs/setup.sh +++ b/libs/setup.sh @@ -116,7 +116,7 @@ declare -A primary_urls=( ["pymgclient"]="http://$local_cache_host/git/pymgclient.git" ["mgconsole"]="http://$local_cache_host/git/mgconsole.git" ["spdlog"]="http://$local_cache_host/git/spdlog" - ["nlohmann"]="http://$local_cache_host/file/nlohmann/json/4f8fba14066156b73f1189a2b8bd568bde5284c5/single_include/nlohmann/json.hpp" + ["nlohmann"]="http://$local_cache_host/file/nlohmann/json/9d69186291aca4f0137b69c1dee313b391ff564c/single_include/nlohmann/json.hpp" ["neo4j"]="http://$local_cache_host/file/neo4j-community-3.2.3-unix.tar.gz" ["librdkafka"]="http://$local_cache_host/git/librdkafka.git" ["protobuf"]="http://$local_cache_host/git/protobuf.git" @@ -141,7 +141,7 @@ declare -A secondary_urls=( ["pymgclient"]="https://github.com/memgraph/pymgclient.git" ["mgconsole"]="http://github.com/memgraph/mgconsole.git" ["spdlog"]="https://github.com/gabime/spdlog" - ["nlohmann"]="https://raw.githubusercontent.com/nlohmann/json/4f8fba14066156b73f1189a2b8bd568bde5284c5/single_include/nlohmann/json.hpp" + ["nlohmann"]="https://raw.githubusercontent.com/nlohmann/json/9d69186291aca4f0137b69c1dee313b391ff564c/single_include/nlohmann/json.hpp" ["neo4j"]="https://s3-eu-west-1.amazonaws.com/deps.memgraph.io/neo4j-community-3.2.3-unix.tar.gz" ["librdkafka"]="https://github.com/edenhill/librdkafka.git" ["protobuf"]="https://github.com/protocolbuffers/protobuf.git" diff --git a/src/query/v2/interpreter.cpp b/src/query/v2/interpreter.cpp index 93ef20764..c835c3d27 100644 --- a/src/query/v2/interpreter.cpp +++ b/src/query/v2/interpreter.cpp @@ -1043,7 +1043,7 @@ PreparedQuery PrepareProfileQuery(ParsedQuery parsed_query, bool in_explicit_tra auto rw_type_checker = plan::ReadWriteTypeChecker(); rw_type_checker.InferRWType(const_cast(cypher_query_plan->plan())); - return PreparedQuery{{"OPERATOR", "ACTUAL HITS", "RELATIVE TIME", "ABSOLUTE TIME"}, + return PreparedQuery{{"OPERATOR", "ACTUAL HITS", "RELATIVE TIME", "ABSOLUTE TIME", "CUSTOM DATA"}, std::move(parsed_query.required_privileges), [plan = std::move(cypher_query_plan), parameters = std::move(parsed_inner_query.parameters), summary, dba, interpreter_context, execution_memory, memory_limit, shard_request_manager, diff --git a/src/query/v2/plan/operator.cpp b/src/query/v2/plan/operator.cpp index 1a83be36f..0ab550ed1 100644 --- a/src/query/v2/plan/operator.cpp +++ b/src/query/v2/plan/operator.cpp @@ -155,7 +155,16 @@ uint64_t ComputeProfilingKey(const T *obj) { } // namespace -#define SCOPED_PROFILE_OP(name) ScopedProfile profile{ComputeProfilingKey(this), name, &context}; +// NOLINTNEXTLINE(cppcoreguidelines-macro-usage) +#define SCOPED_PROFILE_OP(name) \ + ScopedProfile profile { ComputeProfilingKey(this), name, &context } + +// NOLINTNEXTLINE(cppcoreguidelines-macro-usage) +#define SCOPED_CUSTOM_PROFILE(name) \ + ScopedCustomProfile custom_profile { name, context } + +// NOLINTNEXTLINE(cppcoreguidelines-macro-usage) +#define SCOPED_REQUEST_WAIT_PROFILE SCOPED_CUSTOM_PROFILE("request_wait") class DistributedCreateNodeCursor : public Cursor { public: @@ -168,7 +177,10 @@ class DistributedCreateNodeCursor : public Cursor { SCOPED_PROFILE_OP("CreateNode"); if (input_cursor_->Pull(frame, context)) { auto &shard_manager = context.shard_request_manager; - shard_manager->Request(state_, NodeCreationInfoToRequest(context, frame)); + { + SCOPED_REQUEST_WAIT_PROFILE; + shard_manager->Request(state_, NodeCreationInfoToRequest(context, frame)); + } PlaceNodeOnTheFrame(frame, context); return true; } @@ -373,38 +385,43 @@ class DistributedScanAllAndFilterCursor : public Cursor { using VertexAccessor = accessors::VertexAccessor; - bool MakeRequest(msgs::ShardRequestManagerInterface &shard_manager) { - current_batch = shard_manager.Request(request_state_); + bool MakeRequest(msgs::ShardRequestManagerInterface &shard_manager, ExecutionContext &context) { + { + SCOPED_REQUEST_WAIT_PROFILE; + current_batch = shard_manager.Request(request_state_); + } current_vertex_it = current_batch.begin(); return !current_batch.empty(); } bool Pull(Frame &frame, ExecutionContext &context) override { SCOPED_PROFILE_OP(op_name_); + auto &shard_manager = *context.shard_request_manager; - if (MustAbort(context)) { - throw HintedAbortError(); - } - using State = msgs::ExecutionState; - - if (request_state_.state == State::INITIALIZING) { - if (!input_cursor_->Pull(frame, context)) { - return false; + while (true) { + if (MustAbort(context)) { + throw HintedAbortError(); } - } + using State = msgs::ExecutionState; - request_state_.label = label_.has_value() ? std::make_optional(shard_manager.LabelToName(*label_)) : std::nullopt; + if (request_state_.state == State::INITIALIZING) { + if (!input_cursor_->Pull(frame, context)) { + return false; + } + } - if (current_vertex_it == current_batch.end()) { - if (request_state_.state == State::COMPLETED || !MakeRequest(shard_manager)) { + request_state_.label = label_.has_value() ? std::make_optional(shard_manager.LabelToName(*label_)) : std::nullopt; + + if (current_vertex_it == current_batch.end() && + (request_state_.state == State::COMPLETED || !MakeRequest(shard_manager, context))) { ResetExecutionState(); - return Pull(frame, context); + continue; } - } - frame[output_symbol_] = TypedValue(std::move(*current_vertex_it)); - ++current_vertex_it; - return true; + frame[output_symbol_] = TypedValue(std::move(*current_vertex_it)); + ++current_vertex_it; + return true; + } } void Shutdown() override { input_cursor_->Shutdown(); } @@ -2379,7 +2396,10 @@ class DistributedCreateExpandCursor : public Cursor { } auto &shard_manager = context.shard_request_manager; ResetExecutionState(); - shard_manager->Request(state_, ExpandCreationInfoToRequest(context, frame)); + { + SCOPED_REQUEST_WAIT_PROFILE; + shard_manager->Request(state_, ExpandCreationInfoToRequest(context, frame)); + } return true; } @@ -2535,7 +2555,11 @@ class DistributedExpandCursor : public Cursor { request.edge_properties.emplace(); request.src_vertices.push_back(vertex.Id()); msgs::ExecutionState request_state; - auto result_rows = context.shard_request_manager->Request(request_state, std::move(request)); + auto result_rows = std::invoke([&context, &request_state, &request]() mutable { + SCOPED_REQUEST_WAIT_PROFILE; + return context.shard_request_manager->Request(request_state, std::move(request)); + }); + MG_ASSERT(result_rows.size() == 1); auto &result_row = result_rows.front(); if (self_.common_.existing_node) { diff --git a/src/query/v2/plan/profile.cpp b/src/query/v2/plan/profile.cpp index 26c5280d4..cb4340ed7 100644 --- a/src/query/v2/plan/profile.cpp +++ b/src/query/v2/plan/profile.cpp @@ -24,18 +24,18 @@ namespace memgraph::query::v2::plan { namespace { -unsigned long long IndividualCycles(const ProfilingStats &cumulative_stats) { +uint64_t IndividualCycles(const ProfilingStats &cumulative_stats) { return cumulative_stats.num_cycles - std::accumulate(cumulative_stats.children.begin(), cumulative_stats.children.end(), 0ULL, [](auto acc, auto &stats) { return acc + stats.num_cycles; }); } -double RelativeTime(unsigned long long num_cycles, unsigned long long total_cycles) { - return static_cast(num_cycles) / total_cycles; +double RelativeTime(const uint64_t num_cycles, const uint64_t total_cycles) { + return static_cast(num_cycles) / static_cast(total_cycles); } -double AbsoluteTime(unsigned long long num_cycles, unsigned long long total_cycles, - std::chrono::duration total_time) { +double AbsoluteTime(const uint64_t num_cycles, const uint64_t total_cycles, + const std::chrono::duration total_time) { return (RelativeTime(num_cycles, total_cycles) * static_cast>(total_time)) .count(); } @@ -50,26 +50,29 @@ namespace { class ProfilingStatsToTableHelper { public: - ProfilingStatsToTableHelper(unsigned long long total_cycles, std::chrono::duration total_time) + ProfilingStatsToTableHelper(uint64_t total_cycles, std::chrono::duration total_time) : total_cycles_(total_cycles), total_time_(total_time) {} void Output(const ProfilingStats &cumulative_stats) { auto cycles = IndividualCycles(cumulative_stats); + auto custom_data_copy = cumulative_stats.custom_data; + ConvertCyclesToTime(custom_data_copy); - rows_.emplace_back(std::vector{ - TypedValue(FormatOperator(cumulative_stats.name)), TypedValue(cumulative_stats.actual_hits), - TypedValue(FormatRelativeTime(cycles)), TypedValue(FormatAbsoluteTime(cycles))}); + rows_.emplace_back( + std::vector{TypedValue(FormatOperator(cumulative_stats.name)), + TypedValue(cumulative_stats.actual_hits), TypedValue(FormatRelativeTime(cycles)), + TypedValue(FormatAbsoluteTime(cycles)), TypedValue(custom_data_copy.dump())}); for (size_t i = 1; i < cumulative_stats.children.size(); ++i) { Branch(cumulative_stats.children[i]); } - if (cumulative_stats.children.size() >= 1) { + if (!cumulative_stats.children.empty()) { Output(cumulative_stats.children[0]); } } - std::vector> rows() { return rows_; } + std::vector> rows() const { return rows_; } private: void Branch(const ProfilingStats &cumulative_stats) { @@ -80,7 +83,28 @@ class ProfilingStatsToTableHelper { --depth_; } - std::string Format(const char *str) { + double AbsoluteTime(const uint64_t cycles) const { return plan::AbsoluteTime(cycles, total_cycles_, total_time_); } + + double RelativeTime(const uint64_t cycles) const { return plan::RelativeTime(cycles, total_cycles_); } + + void ConvertCyclesToTime(nlohmann::json &custom_data) const { + const auto convert_cycles_in_json = [this](nlohmann::json &json) { + if (!json.is_object()) { + return; + } + if (auto it = json.find(ProfilingStats::kNumCycles); it != json.end()) { + auto num_cycles = it.value().get(); + json[ProfilingStats::kAbsoluteTime] = AbsoluteTime(num_cycles); + json[ProfilingStats::kRelativeTime] = RelativeTime(num_cycles); + } + }; + + for (auto &json : custom_data) { + convert_cycles_in_json(json); + } + } + + std::string Format(const char *str) const { std::ostringstream ss; for (int64_t i = 0; i < depth_; ++i) { ss << "| "; @@ -89,21 +113,21 @@ class ProfilingStatsToTableHelper { return ss.str(); } - std::string Format(const std::string &str) { return Format(str.c_str()); } + std::string Format(const std::string &str) const { return Format(str.c_str()); } - std::string FormatOperator(const char *str) { return Format(std::string("* ") + str); } + std::string FormatOperator(const char *str) const { return Format(std::string("* ") + str); } - std::string FormatRelativeTime(unsigned long long num_cycles) { - return fmt::format("{: 10.6f} %", RelativeTime(num_cycles, total_cycles_) * 100); + std::string FormatRelativeTime(uint64_t num_cycles) const { + return fmt::format("{: 10.6f} %", RelativeTime(num_cycles) * 100); } - std::string FormatAbsoluteTime(unsigned long long num_cycles) { - return fmt::format("{: 10.6f} ms", AbsoluteTime(num_cycles, total_cycles_, total_time_)); + std::string FormatAbsoluteTime(uint64_t num_cycles) const { + return fmt::format("{: 10.6f} ms", AbsoluteTime(num_cycles)); } int64_t depth_{0}; std::vector> rows_; - unsigned long long total_cycles_; + uint64_t total_cycles_; std::chrono::duration total_time_; }; @@ -126,7 +150,7 @@ class ProfilingStatsToJsonHelper { using json = nlohmann::json; public: - ProfilingStatsToJsonHelper(unsigned long long total_cycles, std::chrono::duration total_time) + ProfilingStatsToJsonHelper(uint64_t total_cycles, std::chrono::duration total_time) : total_cycles_(total_cycles), total_time_(total_time) {} void Output(const ProfilingStats &cumulative_stats) { return Output(cumulative_stats, &json_); } @@ -151,7 +175,7 @@ class ProfilingStatsToJsonHelper { } json json_; - unsigned long long total_cycles_; + uint64_t total_cycles_; std::chrono::duration total_time_; }; diff --git a/src/query/v2/plan/profile.hpp b/src/query/v2/plan/profile.hpp index 74437f82f..da25be33c 100644 --- a/src/query/v2/plan/profile.hpp +++ b/src/query/v2/plan/profile.hpp @@ -26,10 +26,15 @@ namespace plan { * Stores profiling statistics for a single logical operator. */ struct ProfilingStats { + static constexpr std::string_view kNumCycles{"num_cycles"}; + static constexpr std::string_view kRelativeTime{"relative_time"}; + static constexpr std::string_view kAbsoluteTime{"absolute_time"}; + int64_t actual_hits{0}; - unsigned long long num_cycles{0}; + uint64_t num_cycles{0}; uint64_t key{0}; const char *name{nullptr}; + nlohmann::json custom_data; // TODO: This should use the allocator for query execution std::vector children; }; diff --git a/src/query/v2/plan/scoped_profile.hpp b/src/query/v2/plan/scoped_profile.hpp index e71cbb047..3aa4bc569 100644 --- a/src/query/v2/plan/scoped_profile.hpp +++ b/src/query/v2/plan/scoped_profile.hpp @@ -13,6 +13,8 @@ #include +#include + #include "query/v2/context.hpp" #include "query/v2/plan/profile.hpp" #include "utils/likely.hpp" @@ -20,6 +22,38 @@ namespace memgraph::query::v2::plan { +class ScopedCustomProfile { + public: + explicit ScopedCustomProfile(const std::string_view custom_data_name, ExecutionContext &context) + : custom_data_name_(custom_data_name), start_time_{utils::ReadTSC()}, context_{&context} {} + + ScopedCustomProfile(const ScopedCustomProfile &) = delete; + ScopedCustomProfile(ScopedCustomProfile &&) = delete; + ScopedCustomProfile &operator=(const ScopedCustomProfile &) = delete; + ScopedCustomProfile &operator=(ScopedCustomProfile &&) = delete; + + // If an exception is thrown in any of these functions that signals a problem that is much bigger than we could handle + // it here, thus we don't attempt to handle it. + // NOLINTNEXTLINE(bugprone-exception-escape) + ~ScopedCustomProfile() { + if (nullptr != context_->stats_root) [[unlikely]] { + auto &custom_data = context_->stats_root->custom_data[custom_data_name_]; + if (!custom_data.is_object()) { + custom_data = nlohmann::json::object(); + } + const auto elapsed = utils::ReadTSC() - start_time_; + auto &num_cycles_json = custom_data[ProfilingStats::kNumCycles]; + const auto num_cycles = num_cycles_json.is_null() ? 0 : num_cycles_json.get(); + num_cycles_json = num_cycles + elapsed; + } + } + + private: + std::string_view custom_data_name_; + uint64_t start_time_; + ExecutionContext *context_; +}; + /** * A RAII class used for profiling logical operators. Instances of this class * update the profiling data stored within the `ExecutionContext` object and build @@ -29,7 +63,7 @@ namespace memgraph::query::v2::plan { class ScopedProfile { public: ScopedProfile(uint64_t key, const char *name, query::v2::ExecutionContext *context) noexcept : context_(context) { - if (UNLIKELY(context_->is_profile_query)) { + if (IsProfiling()) [[unlikely]] { root_ = context_->stats_root; // Are we the root logical operator? @@ -60,8 +94,13 @@ class ScopedProfile { } } + ScopedProfile(const ScopedProfile &) = delete; + ScopedProfile(ScopedProfile &&) = delete; + ScopedProfile &operator=(const ScopedProfile &) = delete; + ScopedProfile &operator=(ScopedProfile &&) = delete; + ~ScopedProfile() noexcept { - if (UNLIKELY(context_->is_profile_query)) { + if (IsProfiling()) [[unlikely]] { stats_->num_cycles += utils::ReadTSC() - start_time_; // Restore the old root ("pop") @@ -70,10 +109,12 @@ class ScopedProfile { } private: + [[nodiscard]] bool IsProfiling() const { return context_->is_profile_query; } + query::v2::ExecutionContext *context_; ProfilingStats *root_{nullptr}; ProfilingStats *stats_{nullptr}; - unsigned long long start_time_{0}; + uint64_t start_time_{0}; }; } // namespace memgraph::query::v2::plan diff --git a/src/utils/tsc.cpp b/src/utils/tsc.cpp index fa87f9754..378c6e9be 100644 --- a/src/utils/tsc.cpp +++ b/src/utils/tsc.cpp @@ -18,7 +18,7 @@ extern "C" { #include "utils/tsc.hpp" namespace memgraph::utils { -uint64_t ReadTSC() { return rdtsc(); } +uint64_t ReadTSC() noexcept { return rdtsc(); } std::optional GetTSCFrequency() { // init is only needed for fetching frequency diff --git a/src/utils/tsc.hpp b/src/utils/tsc.hpp index 40344ddaf..1c0f7a5f0 100644 --- a/src/utils/tsc.hpp +++ b/src/utils/tsc.hpp @@ -18,7 +18,7 @@ namespace memgraph::utils { // TSC stands for Time-Stamp Counter -uint64_t ReadTSC(); +uint64_t ReadTSC() noexcept; std::optional GetTSCFrequency();