Merge branch 'project-pineapples' into T1138-MG-fix-edge-id-allocator

This commit is contained in:
János Benjamin Antal 2022-11-01 15:56:12 +01:00 committed by GitHub
commit 50e72a7c28
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 147 additions and 53 deletions

View File

@ -116,7 +116,7 @@ declare -A primary_urls=(
["pymgclient"]="http://$local_cache_host/git/pymgclient.git"
["mgconsole"]="http://$local_cache_host/git/mgconsole.git"
["spdlog"]="http://$local_cache_host/git/spdlog"
["nlohmann"]="http://$local_cache_host/file/nlohmann/json/4f8fba14066156b73f1189a2b8bd568bde5284c5/single_include/nlohmann/json.hpp"
["nlohmann"]="http://$local_cache_host/file/nlohmann/json/9d69186291aca4f0137b69c1dee313b391ff564c/single_include/nlohmann/json.hpp"
["neo4j"]="http://$local_cache_host/file/neo4j-community-3.2.3-unix.tar.gz"
["librdkafka"]="http://$local_cache_host/git/librdkafka.git"
["protobuf"]="http://$local_cache_host/git/protobuf.git"
@ -141,7 +141,7 @@ declare -A secondary_urls=(
["pymgclient"]="https://github.com/memgraph/pymgclient.git"
["mgconsole"]="http://github.com/memgraph/mgconsole.git"
["spdlog"]="https://github.com/gabime/spdlog"
["nlohmann"]="https://raw.githubusercontent.com/nlohmann/json/4f8fba14066156b73f1189a2b8bd568bde5284c5/single_include/nlohmann/json.hpp"
["nlohmann"]="https://raw.githubusercontent.com/nlohmann/json/9d69186291aca4f0137b69c1dee313b391ff564c/single_include/nlohmann/json.hpp"
["neo4j"]="https://s3-eu-west-1.amazonaws.com/deps.memgraph.io/neo4j-community-3.2.3-unix.tar.gz"
["librdkafka"]="https://github.com/edenhill/librdkafka.git"
["protobuf"]="https://github.com/protocolbuffers/protobuf.git"

View File

@ -1043,7 +1043,7 @@ PreparedQuery PrepareProfileQuery(ParsedQuery parsed_query, bool in_explicit_tra
auto rw_type_checker = plan::ReadWriteTypeChecker();
rw_type_checker.InferRWType(const_cast<plan::LogicalOperator &>(cypher_query_plan->plan()));
return PreparedQuery{{"OPERATOR", "ACTUAL HITS", "RELATIVE TIME", "ABSOLUTE TIME"},
return PreparedQuery{{"OPERATOR", "ACTUAL HITS", "RELATIVE TIME", "ABSOLUTE TIME", "CUSTOM DATA"},
std::move(parsed_query.required_privileges),
[plan = std::move(cypher_query_plan), parameters = std::move(parsed_inner_query.parameters),
summary, dba, interpreter_context, execution_memory, memory_limit, shard_request_manager,

View File

@ -155,7 +155,16 @@ uint64_t ComputeProfilingKey(const T *obj) {
} // namespace
#define SCOPED_PROFILE_OP(name) ScopedProfile profile{ComputeProfilingKey(this), name, &context};
// NOLINTNEXTLINE(cppcoreguidelines-macro-usage)
#define SCOPED_PROFILE_OP(name) \
ScopedProfile profile { ComputeProfilingKey(this), name, &context }
// NOLINTNEXTLINE(cppcoreguidelines-macro-usage)
#define SCOPED_CUSTOM_PROFILE(name) \
ScopedCustomProfile custom_profile { name, context }
// NOLINTNEXTLINE(cppcoreguidelines-macro-usage)
#define SCOPED_REQUEST_WAIT_PROFILE SCOPED_CUSTOM_PROFILE("request_wait")
class DistributedCreateNodeCursor : public Cursor {
public:
@ -168,7 +177,10 @@ class DistributedCreateNodeCursor : public Cursor {
SCOPED_PROFILE_OP("CreateNode");
if (input_cursor_->Pull(frame, context)) {
auto &shard_manager = context.shard_request_manager;
shard_manager->Request(state_, NodeCreationInfoToRequest(context, frame));
{
SCOPED_REQUEST_WAIT_PROFILE;
shard_manager->Request(state_, NodeCreationInfoToRequest(context, frame));
}
PlaceNodeOnTheFrame(frame, context);
return true;
}
@ -373,38 +385,43 @@ class DistributedScanAllAndFilterCursor : public Cursor {
using VertexAccessor = accessors::VertexAccessor;
bool MakeRequest(msgs::ShardRequestManagerInterface &shard_manager) {
current_batch = shard_manager.Request(request_state_);
bool MakeRequest(msgs::ShardRequestManagerInterface &shard_manager, ExecutionContext &context) {
{
SCOPED_REQUEST_WAIT_PROFILE;
current_batch = shard_manager.Request(request_state_);
}
current_vertex_it = current_batch.begin();
return !current_batch.empty();
}
bool Pull(Frame &frame, ExecutionContext &context) override {
SCOPED_PROFILE_OP(op_name_);
auto &shard_manager = *context.shard_request_manager;
if (MustAbort(context)) {
throw HintedAbortError();
}
using State = msgs::ExecutionState<msgs::ScanVerticesRequest>;
if (request_state_.state == State::INITIALIZING) {
if (!input_cursor_->Pull(frame, context)) {
return false;
while (true) {
if (MustAbort(context)) {
throw HintedAbortError();
}
}
using State = msgs::ExecutionState<msgs::ScanVerticesRequest>;
request_state_.label = label_.has_value() ? std::make_optional(shard_manager.LabelToName(*label_)) : std::nullopt;
if (request_state_.state == State::INITIALIZING) {
if (!input_cursor_->Pull(frame, context)) {
return false;
}
}
if (current_vertex_it == current_batch.end()) {
if (request_state_.state == State::COMPLETED || !MakeRequest(shard_manager)) {
request_state_.label = label_.has_value() ? std::make_optional(shard_manager.LabelToName(*label_)) : std::nullopt;
if (current_vertex_it == current_batch.end() &&
(request_state_.state == State::COMPLETED || !MakeRequest(shard_manager, context))) {
ResetExecutionState();
return Pull(frame, context);
continue;
}
}
frame[output_symbol_] = TypedValue(std::move(*current_vertex_it));
++current_vertex_it;
return true;
frame[output_symbol_] = TypedValue(std::move(*current_vertex_it));
++current_vertex_it;
return true;
}
}
void Shutdown() override { input_cursor_->Shutdown(); }
@ -2379,7 +2396,10 @@ class DistributedCreateExpandCursor : public Cursor {
}
auto &shard_manager = context.shard_request_manager;
ResetExecutionState();
shard_manager->Request(state_, ExpandCreationInfoToRequest(context, frame));
{
SCOPED_REQUEST_WAIT_PROFILE;
shard_manager->Request(state_, ExpandCreationInfoToRequest(context, frame));
}
return true;
}
@ -2535,7 +2555,11 @@ class DistributedExpandCursor : public Cursor {
request.edge_properties.emplace();
request.src_vertices.push_back(vertex.Id());
msgs::ExecutionState<msgs::ExpandOneRequest> request_state;
auto result_rows = context.shard_request_manager->Request(request_state, std::move(request));
auto result_rows = std::invoke([&context, &request_state, &request]() mutable {
SCOPED_REQUEST_WAIT_PROFILE;
return context.shard_request_manager->Request(request_state, std::move(request));
});
MG_ASSERT(result_rows.size() == 1);
auto &result_row = result_rows.front();
if (self_.common_.existing_node) {

View File

@ -24,18 +24,18 @@ namespace memgraph::query::v2::plan {
namespace {
unsigned long long IndividualCycles(const ProfilingStats &cumulative_stats) {
uint64_t IndividualCycles(const ProfilingStats &cumulative_stats) {
return cumulative_stats.num_cycles - std::accumulate(cumulative_stats.children.begin(),
cumulative_stats.children.end(), 0ULL,
[](auto acc, auto &stats) { return acc + stats.num_cycles; });
}
double RelativeTime(unsigned long long num_cycles, unsigned long long total_cycles) {
return static_cast<double>(num_cycles) / total_cycles;
double RelativeTime(const uint64_t num_cycles, const uint64_t total_cycles) {
return static_cast<double>(num_cycles) / static_cast<double>(total_cycles);
}
double AbsoluteTime(unsigned long long num_cycles, unsigned long long total_cycles,
std::chrono::duration<double> total_time) {
double AbsoluteTime(const uint64_t num_cycles, const uint64_t total_cycles,
const std::chrono::duration<double> total_time) {
return (RelativeTime(num_cycles, total_cycles) * static_cast<std::chrono::duration<double, std::milli>>(total_time))
.count();
}
@ -50,26 +50,29 @@ namespace {
class ProfilingStatsToTableHelper {
public:
ProfilingStatsToTableHelper(unsigned long long total_cycles, std::chrono::duration<double> total_time)
ProfilingStatsToTableHelper(uint64_t total_cycles, std::chrono::duration<double> total_time)
: total_cycles_(total_cycles), total_time_(total_time) {}
void Output(const ProfilingStats &cumulative_stats) {
auto cycles = IndividualCycles(cumulative_stats);
auto custom_data_copy = cumulative_stats.custom_data;
ConvertCyclesToTime(custom_data_copy);
rows_.emplace_back(std::vector<TypedValue>{
TypedValue(FormatOperator(cumulative_stats.name)), TypedValue(cumulative_stats.actual_hits),
TypedValue(FormatRelativeTime(cycles)), TypedValue(FormatAbsoluteTime(cycles))});
rows_.emplace_back(
std::vector<TypedValue>{TypedValue(FormatOperator(cumulative_stats.name)),
TypedValue(cumulative_stats.actual_hits), TypedValue(FormatRelativeTime(cycles)),
TypedValue(FormatAbsoluteTime(cycles)), TypedValue(custom_data_copy.dump())});
for (size_t i = 1; i < cumulative_stats.children.size(); ++i) {
Branch(cumulative_stats.children[i]);
}
if (cumulative_stats.children.size() >= 1) {
if (!cumulative_stats.children.empty()) {
Output(cumulative_stats.children[0]);
}
}
std::vector<std::vector<TypedValue>> rows() { return rows_; }
std::vector<std::vector<TypedValue>> rows() const { return rows_; }
private:
void Branch(const ProfilingStats &cumulative_stats) {
@ -80,7 +83,28 @@ class ProfilingStatsToTableHelper {
--depth_;
}
std::string Format(const char *str) {
double AbsoluteTime(const uint64_t cycles) const { return plan::AbsoluteTime(cycles, total_cycles_, total_time_); }
double RelativeTime(const uint64_t cycles) const { return plan::RelativeTime(cycles, total_cycles_); }
void ConvertCyclesToTime(nlohmann::json &custom_data) const {
const auto convert_cycles_in_json = [this](nlohmann::json &json) {
if (!json.is_object()) {
return;
}
if (auto it = json.find(ProfilingStats::kNumCycles); it != json.end()) {
auto num_cycles = it.value().get<uint64_t>();
json[ProfilingStats::kAbsoluteTime] = AbsoluteTime(num_cycles);
json[ProfilingStats::kRelativeTime] = RelativeTime(num_cycles);
}
};
for (auto &json : custom_data) {
convert_cycles_in_json(json);
}
}
std::string Format(const char *str) const {
std::ostringstream ss;
for (int64_t i = 0; i < depth_; ++i) {
ss << "| ";
@ -89,21 +113,21 @@ class ProfilingStatsToTableHelper {
return ss.str();
}
std::string Format(const std::string &str) { return Format(str.c_str()); }
std::string Format(const std::string &str) const { return Format(str.c_str()); }
std::string FormatOperator(const char *str) { return Format(std::string("* ") + str); }
std::string FormatOperator(const char *str) const { return Format(std::string("* ") + str); }
std::string FormatRelativeTime(unsigned long long num_cycles) {
return fmt::format("{: 10.6f} %", RelativeTime(num_cycles, total_cycles_) * 100);
std::string FormatRelativeTime(uint64_t num_cycles) const {
return fmt::format("{: 10.6f} %", RelativeTime(num_cycles) * 100);
}
std::string FormatAbsoluteTime(unsigned long long num_cycles) {
return fmt::format("{: 10.6f} ms", AbsoluteTime(num_cycles, total_cycles_, total_time_));
std::string FormatAbsoluteTime(uint64_t num_cycles) const {
return fmt::format("{: 10.6f} ms", AbsoluteTime(num_cycles));
}
int64_t depth_{0};
std::vector<std::vector<TypedValue>> rows_;
unsigned long long total_cycles_;
uint64_t total_cycles_;
std::chrono::duration<double> total_time_;
};
@ -126,7 +150,7 @@ class ProfilingStatsToJsonHelper {
using json = nlohmann::json;
public:
ProfilingStatsToJsonHelper(unsigned long long total_cycles, std::chrono::duration<double> total_time)
ProfilingStatsToJsonHelper(uint64_t total_cycles, std::chrono::duration<double> total_time)
: total_cycles_(total_cycles), total_time_(total_time) {}
void Output(const ProfilingStats &cumulative_stats) { return Output(cumulative_stats, &json_); }
@ -151,7 +175,7 @@ class ProfilingStatsToJsonHelper {
}
json json_;
unsigned long long total_cycles_;
uint64_t total_cycles_;
std::chrono::duration<double> total_time_;
};

View File

@ -26,10 +26,15 @@ namespace plan {
* Stores profiling statistics for a single logical operator.
*/
struct ProfilingStats {
static constexpr std::string_view kNumCycles{"num_cycles"};
static constexpr std::string_view kRelativeTime{"relative_time"};
static constexpr std::string_view kAbsoluteTime{"absolute_time"};
int64_t actual_hits{0};
unsigned long long num_cycles{0};
uint64_t num_cycles{0};
uint64_t key{0};
const char *name{nullptr};
nlohmann::json custom_data;
// TODO: This should use the allocator for query execution
std::vector<ProfilingStats> children;
};

View File

@ -13,6 +13,8 @@
#include <cstdint>
#include <json/json.hpp>
#include "query/v2/context.hpp"
#include "query/v2/plan/profile.hpp"
#include "utils/likely.hpp"
@ -20,6 +22,38 @@
namespace memgraph::query::v2::plan {
class ScopedCustomProfile {
public:
explicit ScopedCustomProfile(const std::string_view custom_data_name, ExecutionContext &context)
: custom_data_name_(custom_data_name), start_time_{utils::ReadTSC()}, context_{&context} {}
ScopedCustomProfile(const ScopedCustomProfile &) = delete;
ScopedCustomProfile(ScopedCustomProfile &&) = delete;
ScopedCustomProfile &operator=(const ScopedCustomProfile &) = delete;
ScopedCustomProfile &operator=(ScopedCustomProfile &&) = delete;
// If an exception is thrown in any of these functions that signals a problem that is much bigger than we could handle
// it here, thus we don't attempt to handle it.
// NOLINTNEXTLINE(bugprone-exception-escape)
~ScopedCustomProfile() {
if (nullptr != context_->stats_root) [[unlikely]] {
auto &custom_data = context_->stats_root->custom_data[custom_data_name_];
if (!custom_data.is_object()) {
custom_data = nlohmann::json::object();
}
const auto elapsed = utils::ReadTSC() - start_time_;
auto &num_cycles_json = custom_data[ProfilingStats::kNumCycles];
const auto num_cycles = num_cycles_json.is_null() ? 0 : num_cycles_json.get<uint64_t>();
num_cycles_json = num_cycles + elapsed;
}
}
private:
std::string_view custom_data_name_;
uint64_t start_time_;
ExecutionContext *context_;
};
/**
* A RAII class used for profiling logical operators. Instances of this class
* update the profiling data stored within the `ExecutionContext` object and build
@ -29,7 +63,7 @@ namespace memgraph::query::v2::plan {
class ScopedProfile {
public:
ScopedProfile(uint64_t key, const char *name, query::v2::ExecutionContext *context) noexcept : context_(context) {
if (UNLIKELY(context_->is_profile_query)) {
if (IsProfiling()) [[unlikely]] {
root_ = context_->stats_root;
// Are we the root logical operator?
@ -60,8 +94,13 @@ class ScopedProfile {
}
}
ScopedProfile(const ScopedProfile &) = delete;
ScopedProfile(ScopedProfile &&) = delete;
ScopedProfile &operator=(const ScopedProfile &) = delete;
ScopedProfile &operator=(ScopedProfile &&) = delete;
~ScopedProfile() noexcept {
if (UNLIKELY(context_->is_profile_query)) {
if (IsProfiling()) [[unlikely]] {
stats_->num_cycles += utils::ReadTSC() - start_time_;
// Restore the old root ("pop")
@ -70,10 +109,12 @@ class ScopedProfile {
}
private:
[[nodiscard]] bool IsProfiling() const { return context_->is_profile_query; }
query::v2::ExecutionContext *context_;
ProfilingStats *root_{nullptr};
ProfilingStats *stats_{nullptr};
unsigned long long start_time_{0};
uint64_t start_time_{0};
};
} // namespace memgraph::query::v2::plan

View File

@ -18,7 +18,7 @@ extern "C" {
#include "utils/tsc.hpp"
namespace memgraph::utils {
uint64_t ReadTSC() { return rdtsc(); }
uint64_t ReadTSC() noexcept { return rdtsc(); }
std::optional<double> GetTSCFrequency() {
// init is only needed for fetching frequency

View File

@ -18,7 +18,7 @@ namespace memgraph::utils {
// TSC stands for Time-Stamp Counter
uint64_t ReadTSC();
uint64_t ReadTSC() noexcept;
std::optional<double> GetTSCFrequency();