Merge branch 'project-pineapples' into T1105-MG-profile-query-in-distributed
This commit is contained in:
commit
d920d7c293
@ -19,6 +19,7 @@
|
||||
|
||||
#include "io/errors.hpp"
|
||||
#include "io/message_conversion.hpp"
|
||||
#include "io/message_histogram_collector.hpp"
|
||||
#include "io/time.hpp"
|
||||
#include "io/transport.hpp"
|
||||
|
||||
@ -28,6 +29,7 @@ class LocalTransportHandle {
|
||||
mutable std::mutex mu_{};
|
||||
mutable std::condition_variable cv_;
|
||||
bool should_shut_down_ = false;
|
||||
MessageHistogramCollector histograms_;
|
||||
|
||||
// the responses to requests that are being waited on
|
||||
std::map<PromiseKey, DeadlineAndOpaquePromise> promises_;
|
||||
@ -54,6 +56,11 @@ class LocalTransportHandle {
|
||||
return should_shut_down_;
|
||||
}
|
||||
|
||||
std::unordered_map<std::string, LatencyHistogramSummary> ResponseLatencies() {
|
||||
std::unique_lock<std::mutex> lock(mu_);
|
||||
return histograms_.ResponseLatencies();
|
||||
}
|
||||
|
||||
static Time Now() {
|
||||
auto nano_time = std::chrono::system_clock::now();
|
||||
return std::chrono::time_point_cast<std::chrono::microseconds>(nano_time);
|
||||
@ -97,11 +104,14 @@ class LocalTransportHandle {
|
||||
|
||||
template <Message M>
|
||||
void Send(Address to_address, Address from_address, RequestId request_id, M &&message) {
|
||||
auto type_info = TypeInfoFor(message);
|
||||
|
||||
std::any message_any(std::forward<M>(message));
|
||||
OpaqueMessage opaque_message{.to_address = to_address,
|
||||
.from_address = from_address,
|
||||
.request_id = request_id,
|
||||
.message = std::move(message_any)};
|
||||
.message = std::move(message_any),
|
||||
.type_info = type_info};
|
||||
|
||||
PromiseKey promise_key{
|
||||
.requester_address = to_address, .request_id = opaque_message.request_id, .replier_address = from_address};
|
||||
@ -115,7 +125,10 @@ class LocalTransportHandle {
|
||||
DeadlineAndOpaquePromise dop = std::move(promises_.at(promise_key));
|
||||
promises_.erase(promise_key);
|
||||
|
||||
dop.promise.Fill(std::move(opaque_message));
|
||||
Duration response_latency = Now() - dop.requested_at;
|
||||
|
||||
dop.promise.Fill(std::move(opaque_message), response_latency);
|
||||
histograms_.Measure(type_info, response_latency);
|
||||
} else {
|
||||
spdlog::info("placing message in can_receive_");
|
||||
can_receive_.emplace_back(std::move(opaque_message));
|
||||
@ -132,7 +145,9 @@ class LocalTransportHandle {
|
||||
const bool ip_matches = to_address.last_known_ip == from_address.last_known_ip;
|
||||
|
||||
MG_ASSERT(port_matches && ip_matches);
|
||||
const Time deadline = Now() + timeout;
|
||||
|
||||
const auto now = Now();
|
||||
const Time deadline = now + timeout;
|
||||
|
||||
{
|
||||
std::unique_lock<std::mutex> lock(mu_);
|
||||
@ -140,7 +155,7 @@ class LocalTransportHandle {
|
||||
PromiseKey promise_key{
|
||||
.requester_address = from_address, .request_id = request_id, .replier_address = to_address};
|
||||
OpaquePromise opaque_promise(std::move(promise).ToUnique());
|
||||
DeadlineAndOpaquePromise dop{.deadline = deadline, .promise = std::move(opaque_promise)};
|
||||
DeadlineAndOpaquePromise dop{.requested_at = now, .deadline = deadline, .promise = std::move(opaque_promise)};
|
||||
promises_.emplace(std::move(promise_key), std::move(dop));
|
||||
} // lock dropped
|
||||
|
||||
|
@ -12,13 +12,10 @@
|
||||
#pragma once
|
||||
|
||||
#include "io/transport.hpp"
|
||||
#include "utils/type_info_ref.hpp"
|
||||
|
||||
namespace memgraph::io {
|
||||
|
||||
using memgraph::io::Duration;
|
||||
using memgraph::io::Message;
|
||||
using memgraph::io::Time;
|
||||
|
||||
struct PromiseKey {
|
||||
Address requester_address;
|
||||
uint64_t request_id;
|
||||
@ -45,6 +42,7 @@ struct OpaqueMessage {
|
||||
Address from_address;
|
||||
uint64_t request_id;
|
||||
std::any message;
|
||||
utils::TypeInfoRef type_info;
|
||||
|
||||
/// Recursively tries to match a specific type from the outer
|
||||
/// variant's parameter pack against the type of the std::any,
|
||||
@ -100,7 +98,7 @@ class OpaquePromiseTraitBase {
|
||||
public:
|
||||
virtual const std::type_info *TypeInfo() const = 0;
|
||||
virtual bool IsAwaited(void *ptr) const = 0;
|
||||
virtual void Fill(void *ptr, OpaqueMessage &&) const = 0;
|
||||
virtual void Fill(void *ptr, OpaqueMessage &&, Duration) const = 0;
|
||||
virtual void TimeOut(void *ptr) const = 0;
|
||||
|
||||
virtual ~OpaquePromiseTraitBase() = default;
|
||||
@ -118,12 +116,13 @@ class OpaquePromiseTrait : public OpaquePromiseTraitBase {
|
||||
|
||||
bool IsAwaited(void *ptr) const override { return static_cast<ResponsePromise<T> *>(ptr)->IsAwaited(); };
|
||||
|
||||
void Fill(void *ptr, OpaqueMessage &&opaque_message) const override {
|
||||
void Fill(void *ptr, OpaqueMessage &&opaque_message, Duration response_latency) const override {
|
||||
T message = std::any_cast<T>(std::move(opaque_message.message));
|
||||
auto response_envelope = ResponseEnvelope<T>{.message = std::move(message),
|
||||
.request_id = opaque_message.request_id,
|
||||
.to_address = opaque_message.to_address,
|
||||
.from_address = opaque_message.from_address};
|
||||
.from_address = opaque_message.from_address,
|
||||
.response_latency = response_latency};
|
||||
auto promise = static_cast<ResponsePromise<T> *>(ptr);
|
||||
auto unique_promise = std::unique_ptr<ResponsePromise<T>>(promise);
|
||||
unique_promise->Fill(std::move(response_envelope));
|
||||
@ -187,9 +186,9 @@ class OpaquePromise {
|
||||
ptr_ = nullptr;
|
||||
}
|
||||
|
||||
void Fill(OpaqueMessage &&opaque_message) {
|
||||
void Fill(OpaqueMessage &&opaque_message, Duration response_latency) {
|
||||
MG_ASSERT(ptr_ != nullptr);
|
||||
trait_->Fill(ptr_, std::move(opaque_message));
|
||||
trait_->Fill(ptr_, std::move(opaque_message), response_latency);
|
||||
ptr_ = nullptr;
|
||||
}
|
||||
|
||||
@ -199,18 +198,24 @@ class OpaquePromise {
|
||||
};
|
||||
|
||||
struct DeadlineAndOpaquePromise {
|
||||
Time requested_at;
|
||||
Time deadline;
|
||||
OpaquePromise promise;
|
||||
};
|
||||
|
||||
template <class From>
|
||||
std::type_info const &type_info_for_variant(From const &from) {
|
||||
std::type_info const &TypeInfoForVariant(From const &from) {
|
||||
return std::visit([](auto &&x) -> decltype(auto) { return typeid(x); }, from);
|
||||
}
|
||||
|
||||
template <class T>
|
||||
utils::TypeInfoRef TypeInfoFor(const T & /* t */) {
|
||||
return typeid(T);
|
||||
}
|
||||
|
||||
template <typename From, typename Return, typename Head, typename... Rest>
|
||||
std::optional<Return> ConvertVariantInner(From &&a) {
|
||||
if (typeid(Head) == type_info_for_variant(a)) {
|
||||
if (typeid(Head) == TypeInfoForVariant(a)) {
|
||||
Head concrete = std::get<Head>(std::forward<From>(a));
|
||||
return concrete;
|
||||
}
|
||||
|
97
src/io/message_histogram_collector.hpp
Normal file
97
src/io/message_histogram_collector.hpp
Normal file
@ -0,0 +1,97 @@
|
||||
// Copyright 2022 Memgraph Ltd.
|
||||
//
|
||||
// Use of this software is governed by the Business Source License
|
||||
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
|
||||
// License, and you may not use this file except in compliance with the Business Source License.
|
||||
//
|
||||
// As of the Change Date specified in that file, in accordance with
|
||||
// the Business Source License, use of this software will be governed
|
||||
// by the Apache License, Version 2.0, included in the file
|
||||
// licenses/APL.txt.
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <chrono>
|
||||
#include <cmath>
|
||||
#include <unordered_map>
|
||||
|
||||
#include <boost/core/demangle.hpp>
|
||||
|
||||
#include "io/time.hpp"
|
||||
#include "utils/histogram.hpp"
|
||||
#include "utils/logging.hpp"
|
||||
#include "utils/type_info_ref.hpp"
|
||||
|
||||
namespace memgraph::io {
|
||||
|
||||
struct LatencyHistogramSummary {
|
||||
uint64_t count;
|
||||
Duration p0;
|
||||
Duration p50;
|
||||
Duration p75;
|
||||
Duration p90;
|
||||
Duration p95;
|
||||
Duration p975;
|
||||
Duration p99;
|
||||
Duration p999;
|
||||
Duration p9999;
|
||||
Duration p100;
|
||||
Duration sum;
|
||||
|
||||
friend std::ostream &operator<<(std::ostream &in, const LatencyHistogramSummary &histo) {
|
||||
in << "{ \"count\": " << histo.count;
|
||||
in << ", \"p0\": " << histo.p0.count();
|
||||
in << ", \"p50\": " << histo.p50.count();
|
||||
in << ", \"p75\": " << histo.p75.count();
|
||||
in << ", \"p90\": " << histo.p90.count();
|
||||
in << ", \"p95\": " << histo.p95.count();
|
||||
in << ", \"p975\": " << histo.p975.count();
|
||||
in << ", \"p99\": " << histo.p99.count();
|
||||
in << ", \"p999\": " << histo.p999.count();
|
||||
in << ", \"p9999\": " << histo.p9999.count();
|
||||
in << ", \"p100\": " << histo.p100.count();
|
||||
in << ", \"sum\": " << histo.sum.count();
|
||||
in << " }";
|
||||
|
||||
return in;
|
||||
}
|
||||
};
|
||||
|
||||
class MessageHistogramCollector {
|
||||
std::unordered_map<utils::TypeInfoRef, utils::Histogram, utils::TypeInfoHasher, utils::TypeInfoEqualTo> histograms_;
|
||||
|
||||
public:
|
||||
void Measure(const std::type_info &type_info, const Duration &duration) {
|
||||
auto &histo = histograms_[type_info];
|
||||
histo.Measure(duration.count());
|
||||
}
|
||||
|
||||
std::unordered_map<std::string, LatencyHistogramSummary> ResponseLatencies() {
|
||||
std::unordered_map<std::string, LatencyHistogramSummary> ret{};
|
||||
|
||||
for (const auto &[type_id, histo] : histograms_) {
|
||||
std::string demangled_name = "\"" + boost::core::demangle(type_id.get().name()) + "\"";
|
||||
|
||||
LatencyHistogramSummary latency_histogram_summary{
|
||||
.count = histo.Count(),
|
||||
.p0 = Duration(static_cast<int64_t>(histo.Percentile(0.0))),
|
||||
.p50 = Duration(static_cast<int64_t>(histo.Percentile(50.0))),
|
||||
.p75 = Duration(static_cast<int64_t>(histo.Percentile(75.0))),
|
||||
.p90 = Duration(static_cast<int64_t>(histo.Percentile(90.0))),
|
||||
.p95 = Duration(static_cast<int64_t>(histo.Percentile(95.0))),
|
||||
.p975 = Duration(static_cast<int64_t>(histo.Percentile(97.5))),
|
||||
.p99 = Duration(static_cast<int64_t>(histo.Percentile(99.0))),
|
||||
.p999 = Duration(static_cast<int64_t>(histo.Percentile(99.9))),
|
||||
.p9999 = Duration(static_cast<int64_t>(histo.Percentile(99.99))),
|
||||
.p100 = Duration(static_cast<int64_t>(histo.Percentile(100.0))),
|
||||
.sum = Duration(histo.Sum()),
|
||||
};
|
||||
|
||||
ret.emplace(demangled_name, latency_histogram_summary);
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
};
|
||||
|
||||
} // namespace memgraph::io
|
@ -22,6 +22,7 @@
|
||||
#include <unordered_map>
|
||||
#include <vector>
|
||||
|
||||
#include "io/message_conversion.hpp"
|
||||
#include "io/simulator/simulator.hpp"
|
||||
#include "io/transport.hpp"
|
||||
#include "utils/concepts.hpp"
|
||||
@ -88,6 +89,26 @@ struct ReadResponse {
|
||||
std::optional<Address> retry_leader;
|
||||
};
|
||||
|
||||
template <class... ReadReturn>
|
||||
utils::TypeInfoRef TypeInfoFor(const ReadResponse<std::variant<ReadReturn...>> &read_response) {
|
||||
return TypeInfoForVariant(read_response.read_return);
|
||||
}
|
||||
|
||||
template <class ReadReturn>
|
||||
utils::TypeInfoRef TypeInfoFor(const ReadResponse<ReadReturn> & /* read_response */) {
|
||||
return typeid(ReadReturn);
|
||||
}
|
||||
|
||||
template <class... WriteReturn>
|
||||
utils::TypeInfoRef TypeInfoFor(const WriteResponse<std::variant<WriteReturn...>> &write_response) {
|
||||
return TypeInfoForVariant(write_response.write_return);
|
||||
}
|
||||
|
||||
template <class WriteReturn>
|
||||
utils::TypeInfoRef TypeInfoFor(const WriteResponse<WriteReturn> & /* write_response */) {
|
||||
return typeid(WriteReturn);
|
||||
}
|
||||
|
||||
/// AppendRequest is a raft-level message that the Leader
|
||||
/// periodically broadcasts to all Follower peers. This
|
||||
/// serves three main roles:
|
||||
|
@ -31,6 +31,11 @@ bool SimulatorHandle::ShouldShutDown() const {
|
||||
return should_shut_down_;
|
||||
}
|
||||
|
||||
std::unordered_map<std::string, LatencyHistogramSummary> SimulatorHandle::ResponseLatencies() {
|
||||
std::unique_lock<std::mutex> lock(mu_);
|
||||
return histograms_.ResponseLatencies();
|
||||
}
|
||||
|
||||
void SimulatorHandle::IncrementServerCountAndWaitForQuiescentState(Address address) {
|
||||
std::unique_lock<std::mutex> lock(mu_);
|
||||
server_addresses_.insert(address);
|
||||
@ -119,7 +124,10 @@ bool SimulatorHandle::MaybeTickSimulator() {
|
||||
dop.promise.TimeOut();
|
||||
} else {
|
||||
stats_.total_responses++;
|
||||
dop.promise.Fill(std::move(opaque_message));
|
||||
Duration response_latency = cluster_wide_time_microseconds_ - dop.requested_at;
|
||||
auto type_info = opaque_message.type_info;
|
||||
dop.promise.Fill(std::move(opaque_message), response_latency);
|
||||
histograms_.Measure(type_info, response_latency);
|
||||
}
|
||||
} else if (should_drop) {
|
||||
// don't add it anywhere, let it drop
|
||||
|
@ -25,6 +25,7 @@
|
||||
#include "io/address.hpp"
|
||||
#include "io/errors.hpp"
|
||||
#include "io/message_conversion.hpp"
|
||||
#include "io/message_histogram_collector.hpp"
|
||||
#include "io/simulator/simulator_config.hpp"
|
||||
#include "io/simulator/simulator_stats.hpp"
|
||||
#include "io/time.hpp"
|
||||
@ -54,6 +55,7 @@ class SimulatorHandle {
|
||||
std::uniform_int_distribution<int> time_distrib_{5, 50};
|
||||
std::uniform_int_distribution<int> drop_distrib_{0, 99};
|
||||
SimulatorConfig config_;
|
||||
MessageHistogramCollector histograms_;
|
||||
|
||||
void TimeoutPromisesPastDeadline() {
|
||||
const Time now = cluster_wide_time_microseconds_;
|
||||
@ -76,6 +78,8 @@ class SimulatorHandle {
|
||||
explicit SimulatorHandle(SimulatorConfig config)
|
||||
: cluster_wide_time_microseconds_(config.start_time), rng_(config.rng_seed), config_(config) {}
|
||||
|
||||
std::unordered_map<std::string, LatencyHistogramSummary> ResponseLatencies();
|
||||
|
||||
~SimulatorHandle() {
|
||||
for (auto it = promises_.begin(); it != promises_.end();) {
|
||||
auto &[promise_key, dop] = *it;
|
||||
@ -99,6 +103,8 @@ class SimulatorHandle {
|
||||
template <Message Request, Message Response>
|
||||
void SubmitRequest(Address to_address, Address from_address, RequestId request_id, Request &&request,
|
||||
Duration timeout, ResponsePromise<Response> &&promise) {
|
||||
auto type_info = TypeInfoFor(request);
|
||||
|
||||
std::unique_lock<std::mutex> lock(mu_);
|
||||
|
||||
const Time deadline = cluster_wide_time_microseconds_ + timeout;
|
||||
@ -107,12 +113,17 @@ class SimulatorHandle {
|
||||
OpaqueMessage om{.to_address = to_address,
|
||||
.from_address = from_address,
|
||||
.request_id = request_id,
|
||||
.message = std::move(message)};
|
||||
.message = std::move(message),
|
||||
.type_info = type_info};
|
||||
in_flight_.emplace_back(std::make_pair(to_address, std::move(om)));
|
||||
|
||||
PromiseKey promise_key{.requester_address = from_address, .request_id = request_id, .replier_address = to_address};
|
||||
OpaquePromise opaque_promise(std::move(promise).ToUnique());
|
||||
DeadlineAndOpaquePromise dop{.deadline = deadline, .promise = std::move(opaque_promise)};
|
||||
DeadlineAndOpaquePromise dop{
|
||||
.requested_at = cluster_wide_time_microseconds_,
|
||||
.deadline = deadline,
|
||||
.promise = std::move(opaque_promise),
|
||||
};
|
||||
promises_.emplace(std::move(promise_key), std::move(dop));
|
||||
|
||||
stats_.total_messages++;
|
||||
@ -164,12 +175,14 @@ class SimulatorHandle {
|
||||
|
||||
template <Message M>
|
||||
void Send(Address to_address, Address from_address, RequestId request_id, M message) {
|
||||
auto type_info = TypeInfoFor(message);
|
||||
std::unique_lock<std::mutex> lock(mu_);
|
||||
std::any message_any(std::move(message));
|
||||
OpaqueMessage om{.to_address = to_address,
|
||||
.from_address = from_address,
|
||||
.request_id = request_id,
|
||||
.message = std::move(message_any)};
|
||||
.message = std::move(message_any),
|
||||
.type_info = type_info};
|
||||
in_flight_.emplace_back(std::make_pair(std::move(to_address), std::move(om)));
|
||||
|
||||
stats_.total_messages++;
|
||||
|
@ -63,5 +63,9 @@ class SimulatorTransport {
|
||||
Return Rand(D distrib) {
|
||||
return distrib(rng_);
|
||||
}
|
||||
|
||||
std::unordered_map<std::string, LatencyHistogramSummary> ResponseLatencies() {
|
||||
return simulator_handle_->ResponseLatencies();
|
||||
}
|
||||
};
|
||||
}; // namespace memgraph::io::simulator
|
||||
|
@ -19,6 +19,7 @@
|
||||
#include "io/address.hpp"
|
||||
#include "io/errors.hpp"
|
||||
#include "io/future.hpp"
|
||||
#include "io/message_histogram_collector.hpp"
|
||||
#include "io/time.hpp"
|
||||
#include "utils/result.hpp"
|
||||
|
||||
@ -40,6 +41,7 @@ struct ResponseEnvelope {
|
||||
RequestId request_id;
|
||||
Address to_address;
|
||||
Address from_address;
|
||||
Duration response_latency;
|
||||
};
|
||||
|
||||
template <Message M>
|
||||
@ -140,5 +142,9 @@ class Io {
|
||||
void SetAddress(Address address) { address_ = address; }
|
||||
|
||||
Io<I> ForkLocal() { return Io(implementation_, address_.ForkUniqueAddress()); }
|
||||
|
||||
std::unordered_map<std::string, LatencyHistogramSummary> ResponseLatencies() {
|
||||
return implementation_.ResponseLatencies();
|
||||
}
|
||||
};
|
||||
}; // namespace memgraph::io
|
||||
|
114
src/utils/histogram.hpp
Normal file
114
src/utils/histogram.hpp
Normal file
@ -0,0 +1,114 @@
|
||||
// Copyright 2022 Memgraph Ltd.
|
||||
//
|
||||
// Use of this software is governed by the Business Source License
|
||||
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
|
||||
// License, and you may not use this file except in compliance with the Business Source License.
|
||||
//
|
||||
// As of the Change Date specified in that file, in accordance with
|
||||
// the Business Source License, use of this software will be governed
|
||||
// by the Apache License, Version 2.0, included in the file
|
||||
// licenses/APL.txt.
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <cmath>
|
||||
|
||||
#include "utils/logging.hpp"
|
||||
|
||||
namespace memgraph::utils {
|
||||
|
||||
// This is a logarithmically bucketing histogram optimized
|
||||
// for collecting network response latency distributions.
|
||||
// It "compresses" values by mapping them to a point on a
|
||||
// logarithmic curve, which serves as the bucket index. This
|
||||
// compression technique allows for very accurate histograms
|
||||
// (unlike what is the case for sampling or lossy probabilistic
|
||||
// approaches) with the trade-off that we sacrifice around 1%
|
||||
// precision.
|
||||
//
|
||||
// properties:
|
||||
// * roughly 1% precision loss - can be higher for values
|
||||
// less than 100, so if measuring latency, generally do
|
||||
// so in microseconds.
|
||||
// * ~32kb constant space, single allocation per Histogram.
|
||||
// * Histogram::Percentile() will return 0 if there were no
|
||||
// samples measured yet.
|
||||
class Histogram {
|
||||
// This is the number of buckets that observed values
|
||||
// will be logarithmically compressed into.
|
||||
constexpr static auto kSampleLimit = 4096;
|
||||
|
||||
// This is roughly 1/error rate, where 100.0 is roughly
|
||||
// a 1% error bound for measurements. This is less true
|
||||
// for tiny measurements, but because we tend to measure
|
||||
// microseconds, it is usually over 100, which is where
|
||||
// the error bound starts to stabilize a bit. This has
|
||||
// been tuned to allow the maximum uint64_t to compress
|
||||
// within 4096 samples while still achieving a high accuracy.
|
||||
constexpr static auto kPrecision = 92.0;
|
||||
|
||||
// samples_ stores per-bucket counts for measurements
|
||||
// that have been mapped to a specific uint64_t in
|
||||
// the "compression" logic below.
|
||||
std::vector<uint64_t> samples_ = {};
|
||||
|
||||
// count_ is the number of measurements that have been
|
||||
// included in this Histogram.
|
||||
uint64_t count_ = 0;
|
||||
|
||||
// sum_ is the summed value of all measurements that
|
||||
// have been included in this Histogram.
|
||||
uint64_t sum_ = 0;
|
||||
|
||||
public:
|
||||
Histogram() { samples_.resize(kSampleLimit, 0); }
|
||||
|
||||
uint64_t Count() const { return count_; }
|
||||
|
||||
uint64_t Sum() const { return sum_; }
|
||||
|
||||
void Measure(uint64_t value) {
|
||||
// "compression" logic
|
||||
double boosted = 1.0 + static_cast<double>(value);
|
||||
double ln = std::log(boosted);
|
||||
double compressed = (kPrecision * ln) + 0.5;
|
||||
|
||||
MG_ASSERT(compressed < kSampleLimit, "compressing value {} to {} is invalid", value, compressed);
|
||||
auto sample_index = static_cast<uint16_t>(compressed);
|
||||
|
||||
count_++;
|
||||
samples_[sample_index]++;
|
||||
sum_ += value;
|
||||
}
|
||||
|
||||
uint64_t Percentile(double percentile) const {
|
||||
MG_ASSERT(percentile <= 100.0, "percentiles must not exceed 100.0");
|
||||
MG_ASSERT(percentile >= 0.0, "percentiles must be greater than or equal to 0.0");
|
||||
|
||||
if (count_ == 0) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
const auto floated_count = static_cast<double>(count_);
|
||||
const auto target = std::max(floated_count * percentile / 100.0, 1.0);
|
||||
|
||||
auto scanned = 0.0;
|
||||
|
||||
for (int i = 0; i < kSampleLimit; i++) {
|
||||
const auto samples_at_index = samples_[i];
|
||||
scanned += static_cast<double>(samples_at_index);
|
||||
if (scanned >= target) {
|
||||
// "decompression" logic
|
||||
auto floated = static_cast<double>(i);
|
||||
auto unboosted = floated / kPrecision;
|
||||
auto decompressed = std::exp(unboosted) - 1.0;
|
||||
return static_cast<uint64_t>(decompressed);
|
||||
}
|
||||
}
|
||||
|
||||
LOG_FATAL("bug in Histogram::Percentile where it failed to return the {} percentile", percentile);
|
||||
return 0;
|
||||
}
|
||||
};
|
||||
|
||||
} // namespace memgraph::utils
|
@ -13,6 +13,7 @@
|
||||
|
||||
#include <map>
|
||||
#include <memory>
|
||||
#include <unordered_map>
|
||||
#include <vector>
|
||||
|
||||
namespace memgraph::utils::print_helpers {
|
||||
@ -49,6 +50,23 @@ std::ostream &operator<<(std::ostream &in, const std::map<K, V> &map) {
|
||||
return in;
|
||||
}
|
||||
|
||||
template <typename K, typename V>
|
||||
std::ostream &operator<<(std::ostream &in, const std::unordered_map<K, V> &map) {
|
||||
in << "{";
|
||||
bool first = true;
|
||||
for (const auto &[a, b] : map) {
|
||||
if (!first) {
|
||||
in << ", ";
|
||||
}
|
||||
first = false;
|
||||
in << a;
|
||||
in << ": ";
|
||||
in << b;
|
||||
}
|
||||
in << "}";
|
||||
return in;
|
||||
}
|
||||
|
||||
template <typename K, typename V>
|
||||
std::ostream &operator<<(std::ostream &in, const std::pair<K, V> &pair) {
|
||||
const auto &[a, b] = pair;
|
||||
|
29
src/utils/type_info_ref.hpp
Normal file
29
src/utils/type_info_ref.hpp
Normal file
@ -0,0 +1,29 @@
|
||||
// Copyright 2022 Memgraph Ltd.
|
||||
//
|
||||
// Use of this software is governed by the Business Source License
|
||||
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
|
||||
// License, and you may not use this file except in compliance with the Business Source License.
|
||||
//
|
||||
// As of the Change Date specified in that file, in accordance with
|
||||
// the Business Source License, use of this software will be governed
|
||||
// by the Apache License, Version 2.0, included in the file
|
||||
// licenses/APL.txt.
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <functional>
|
||||
#include <typeinfo>
|
||||
|
||||
namespace memgraph::utils {
|
||||
|
||||
using TypeInfoRef = std::reference_wrapper<const std::type_info>;
|
||||
|
||||
struct TypeInfoHasher {
|
||||
std::size_t operator()(TypeInfoRef code) const { return code.get().hash_code(); }
|
||||
};
|
||||
|
||||
struct TypeInfoEqualTo {
|
||||
bool operator()(TypeInfoRef lhs, TypeInfoRef rhs) const { return lhs.get() == rhs.get(); }
|
||||
};
|
||||
|
||||
} // namespace memgraph::utils
|
@ -12,6 +12,7 @@
|
||||
#include <thread>
|
||||
|
||||
#include "io/simulator/simulator.hpp"
|
||||
#include "utils/print_helpers.hpp"
|
||||
|
||||
using memgraph::io::Address;
|
||||
using memgraph::io::Io;
|
||||
@ -77,11 +78,15 @@ int main() {
|
||||
std::cout << "[CLIENT] Got a valid response" << std::endl;
|
||||
auto env = res_rez.GetValue();
|
||||
MG_ASSERT(env.message.highest_seen == i);
|
||||
std::cout << "response latency: " << env.response_latency.count() << " microseconds" << std::endl;
|
||||
} else {
|
||||
std::cout << "[CLIENT] Got an error" << std::endl;
|
||||
}
|
||||
}
|
||||
|
||||
using memgraph::utils::print_helpers::operator<<;
|
||||
std::cout << "response latencies: " << cli_io.ResponseLatencies() << std::endl;
|
||||
|
||||
simulator.ShutDown();
|
||||
return 0;
|
||||
}
|
||||
|
@ -32,6 +32,7 @@
|
||||
#include "query/v2/requests.hpp"
|
||||
#include "query/v2/shard_request_manager.hpp"
|
||||
#include "testing_constants.hpp"
|
||||
#include "utils/print_helpers.hpp"
|
||||
#include "utils/variant_helpers.hpp"
|
||||
|
||||
namespace memgraph::tests::simulation {
|
||||
@ -203,6 +204,7 @@ void RunClusterSimulation(const SimulatorConfig &sim_config, const ClusterConfig
|
||||
auto machine_1_addr = cli_addr.ForkUniqueAddress();
|
||||
|
||||
Io<SimulatorTransport> cli_io = simulator.Register(cli_addr);
|
||||
Io<SimulatorTransport> cli_io_2 = simulator.Register(Address::TestAddress(2));
|
||||
|
||||
auto coordinator_addresses = std::vector{
|
||||
machine_1_addr,
|
||||
@ -245,6 +247,11 @@ void RunClusterSimulation(const SimulatorConfig &sim_config, const ClusterConfig
|
||||
spdlog::info("total responses: {}", stats.total_responses);
|
||||
spdlog::info("simulator ticks: {}", stats.simulator_ticks);
|
||||
|
||||
auto histo = cli_io_2.ResponseLatencies();
|
||||
|
||||
using memgraph::utils::print_helpers::operator<<;
|
||||
std::cout << "response latencies: " << histo << std::endl;
|
||||
|
||||
spdlog::info("========================== SUCCESS :) ==========================");
|
||||
}
|
||||
|
||||
|
@ -275,6 +275,9 @@ target_link_libraries(${test_prefix}utils_settings mg-utils mg-settings)
|
||||
add_unit_test(utils_temporal utils_temporal.cpp)
|
||||
target_link_libraries(${test_prefix}utils_temporal mg-utils)
|
||||
|
||||
add_unit_test(utils_histogram.cpp)
|
||||
target_link_libraries(${test_prefix}utils_histogram mg-utils)
|
||||
|
||||
# Test mg-storage-v2
|
||||
add_unit_test(commit_log_v2.cpp)
|
||||
target_link_libraries(${test_prefix}commit_log_v2 gflags mg-utils mg-storage-v2)
|
||||
|
46
tests/unit/utils_histogram.cpp
Normal file
46
tests/unit/utils_histogram.cpp
Normal file
@ -0,0 +1,46 @@
|
||||
// Copyright 2022 Memgraph Ltd.
|
||||
//
|
||||
// Use of this software is governed by the Business Source License
|
||||
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
|
||||
// License, and you may not use this file except in compliance with the Business Source License.
|
||||
//
|
||||
// As of the Change Date specified in that file, in accordance with
|
||||
// the Business Source License, use of this software will be governed
|
||||
// by the Apache License, Version 2.0, included in the file
|
||||
// licenses/APL.txt.
|
||||
|
||||
#include <gtest/gtest.h>
|
||||
#include <gmock/gmock.h>
|
||||
|
||||
#include "utils/histogram.hpp"
|
||||
#include "utils/logging.hpp"
|
||||
|
||||
TEST(Histogram, BasicFunctionality) {
|
||||
memgraph::utils::Histogram histo{};
|
||||
|
||||
for (int i = 0; i < 9000; i++) {
|
||||
histo.Measure(10);
|
||||
}
|
||||
for (int i = 0; i < 900; i++) {
|
||||
histo.Measure(25);
|
||||
}
|
||||
for (int i = 0; i < 90; i++) {
|
||||
histo.Measure(33);
|
||||
}
|
||||
for (int i = 0; i < 9; i++) {
|
||||
histo.Measure(47);
|
||||
}
|
||||
histo.Measure(500);
|
||||
|
||||
ASSERT_EQ(histo.Percentile(0.0), 10);
|
||||
ASSERT_EQ(histo.Percentile(99.0), 25);
|
||||
ASSERT_EQ(histo.Percentile(99.89), 32);
|
||||
ASSERT_EQ(histo.Percentile(99.99), 46);
|
||||
ASSERT_EQ(histo.Percentile(100.0), 500);
|
||||
|
||||
uint64_t max = std::numeric_limits<uint64_t>::max();
|
||||
histo.Measure(max);
|
||||
auto observed_max = static_cast<double>(histo.Percentile(100.0));
|
||||
auto diff = (max - observed_max) / max;
|
||||
ASSERT_THAT(diff, testing::Lt(0.01));
|
||||
}
|
Loading…
Reference in New Issue
Block a user