Merge branch 'project-pineapples' into T1083-MG-limit-and-order-expand-one_v3

This commit is contained in:
Jeremy B 2022-10-26 15:59:15 +02:00 committed by GitHub
commit fa6129dc2b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 410 additions and 19 deletions

View File

@ -19,6 +19,7 @@
#include "io/errors.hpp" #include "io/errors.hpp"
#include "io/message_conversion.hpp" #include "io/message_conversion.hpp"
#include "io/message_histogram_collector.hpp"
#include "io/time.hpp" #include "io/time.hpp"
#include "io/transport.hpp" #include "io/transport.hpp"
@ -28,6 +29,7 @@ class LocalTransportHandle {
mutable std::mutex mu_{}; mutable std::mutex mu_{};
mutable std::condition_variable cv_; mutable std::condition_variable cv_;
bool should_shut_down_ = false; bool should_shut_down_ = false;
MessageHistogramCollector histograms_;
// the responses to requests that are being waited on // the responses to requests that are being waited on
std::map<PromiseKey, DeadlineAndOpaquePromise> promises_; std::map<PromiseKey, DeadlineAndOpaquePromise> promises_;
@ -54,6 +56,11 @@ class LocalTransportHandle {
return should_shut_down_; return should_shut_down_;
} }
std::unordered_map<std::string, LatencyHistogramSummary> ResponseLatencies() {
std::unique_lock<std::mutex> lock(mu_);
return histograms_.ResponseLatencies();
}
static Time Now() { static Time Now() {
auto nano_time = std::chrono::system_clock::now(); auto nano_time = std::chrono::system_clock::now();
return std::chrono::time_point_cast<std::chrono::microseconds>(nano_time); return std::chrono::time_point_cast<std::chrono::microseconds>(nano_time);
@ -97,11 +104,14 @@ class LocalTransportHandle {
template <Message M> template <Message M>
void Send(Address to_address, Address from_address, RequestId request_id, M &&message) { void Send(Address to_address, Address from_address, RequestId request_id, M &&message) {
auto type_info = TypeInfoFor(message);
std::any message_any(std::forward<M>(message)); std::any message_any(std::forward<M>(message));
OpaqueMessage opaque_message{.to_address = to_address, OpaqueMessage opaque_message{.to_address = to_address,
.from_address = from_address, .from_address = from_address,
.request_id = request_id, .request_id = request_id,
.message = std::move(message_any)}; .message = std::move(message_any),
.type_info = type_info};
PromiseKey promise_key{ PromiseKey promise_key{
.requester_address = to_address, .request_id = opaque_message.request_id, .replier_address = from_address}; .requester_address = to_address, .request_id = opaque_message.request_id, .replier_address = from_address};
@ -115,7 +125,10 @@ class LocalTransportHandle {
DeadlineAndOpaquePromise dop = std::move(promises_.at(promise_key)); DeadlineAndOpaquePromise dop = std::move(promises_.at(promise_key));
promises_.erase(promise_key); promises_.erase(promise_key);
dop.promise.Fill(std::move(opaque_message)); Duration response_latency = Now() - dop.requested_at;
dop.promise.Fill(std::move(opaque_message), response_latency);
histograms_.Measure(type_info, response_latency);
} else { } else {
spdlog::info("placing message in can_receive_"); spdlog::info("placing message in can_receive_");
can_receive_.emplace_back(std::move(opaque_message)); can_receive_.emplace_back(std::move(opaque_message));
@ -132,7 +145,9 @@ class LocalTransportHandle {
const bool ip_matches = to_address.last_known_ip == from_address.last_known_ip; const bool ip_matches = to_address.last_known_ip == from_address.last_known_ip;
MG_ASSERT(port_matches && ip_matches); MG_ASSERT(port_matches && ip_matches);
const Time deadline = Now() + timeout;
const auto now = Now();
const Time deadline = now + timeout;
{ {
std::unique_lock<std::mutex> lock(mu_); std::unique_lock<std::mutex> lock(mu_);
@ -140,7 +155,7 @@ class LocalTransportHandle {
PromiseKey promise_key{ PromiseKey promise_key{
.requester_address = from_address, .request_id = request_id, .replier_address = to_address}; .requester_address = from_address, .request_id = request_id, .replier_address = to_address};
OpaquePromise opaque_promise(std::move(promise).ToUnique()); OpaquePromise opaque_promise(std::move(promise).ToUnique());
DeadlineAndOpaquePromise dop{.deadline = deadline, .promise = std::move(opaque_promise)}; DeadlineAndOpaquePromise dop{.requested_at = now, .deadline = deadline, .promise = std::move(opaque_promise)};
promises_.emplace(std::move(promise_key), std::move(dop)); promises_.emplace(std::move(promise_key), std::move(dop));
} // lock dropped } // lock dropped

View File

@ -12,13 +12,10 @@
#pragma once #pragma once
#include "io/transport.hpp" #include "io/transport.hpp"
#include "utils/type_info_ref.hpp"
namespace memgraph::io { namespace memgraph::io {
using memgraph::io::Duration;
using memgraph::io::Message;
using memgraph::io::Time;
struct PromiseKey { struct PromiseKey {
Address requester_address; Address requester_address;
uint64_t request_id; uint64_t request_id;
@ -45,6 +42,7 @@ struct OpaqueMessage {
Address from_address; Address from_address;
uint64_t request_id; uint64_t request_id;
std::any message; std::any message;
utils::TypeInfoRef type_info;
/// Recursively tries to match a specific type from the outer /// Recursively tries to match a specific type from the outer
/// variant's parameter pack against the type of the std::any, /// variant's parameter pack against the type of the std::any,
@ -100,7 +98,7 @@ class OpaquePromiseTraitBase {
public: public:
virtual const std::type_info *TypeInfo() const = 0; virtual const std::type_info *TypeInfo() const = 0;
virtual bool IsAwaited(void *ptr) const = 0; virtual bool IsAwaited(void *ptr) const = 0;
virtual void Fill(void *ptr, OpaqueMessage &&) const = 0; virtual void Fill(void *ptr, OpaqueMessage &&, Duration) const = 0;
virtual void TimeOut(void *ptr) const = 0; virtual void TimeOut(void *ptr) const = 0;
virtual ~OpaquePromiseTraitBase() = default; virtual ~OpaquePromiseTraitBase() = default;
@ -118,12 +116,13 @@ class OpaquePromiseTrait : public OpaquePromiseTraitBase {
bool IsAwaited(void *ptr) const override { return static_cast<ResponsePromise<T> *>(ptr)->IsAwaited(); }; bool IsAwaited(void *ptr) const override { return static_cast<ResponsePromise<T> *>(ptr)->IsAwaited(); };
void Fill(void *ptr, OpaqueMessage &&opaque_message) const override { void Fill(void *ptr, OpaqueMessage &&opaque_message, Duration response_latency) const override {
T message = std::any_cast<T>(std::move(opaque_message.message)); T message = std::any_cast<T>(std::move(opaque_message.message));
auto response_envelope = ResponseEnvelope<T>{.message = std::move(message), auto response_envelope = ResponseEnvelope<T>{.message = std::move(message),
.request_id = opaque_message.request_id, .request_id = opaque_message.request_id,
.to_address = opaque_message.to_address, .to_address = opaque_message.to_address,
.from_address = opaque_message.from_address}; .from_address = opaque_message.from_address,
.response_latency = response_latency};
auto promise = static_cast<ResponsePromise<T> *>(ptr); auto promise = static_cast<ResponsePromise<T> *>(ptr);
auto unique_promise = std::unique_ptr<ResponsePromise<T>>(promise); auto unique_promise = std::unique_ptr<ResponsePromise<T>>(promise);
unique_promise->Fill(std::move(response_envelope)); unique_promise->Fill(std::move(response_envelope));
@ -187,9 +186,9 @@ class OpaquePromise {
ptr_ = nullptr; ptr_ = nullptr;
} }
void Fill(OpaqueMessage &&opaque_message) { void Fill(OpaqueMessage &&opaque_message, Duration response_latency) {
MG_ASSERT(ptr_ != nullptr); MG_ASSERT(ptr_ != nullptr);
trait_->Fill(ptr_, std::move(opaque_message)); trait_->Fill(ptr_, std::move(opaque_message), response_latency);
ptr_ = nullptr; ptr_ = nullptr;
} }
@ -199,18 +198,24 @@ class OpaquePromise {
}; };
struct DeadlineAndOpaquePromise { struct DeadlineAndOpaquePromise {
Time requested_at;
Time deadline; Time deadline;
OpaquePromise promise; OpaquePromise promise;
}; };
template <class From> template <class From>
std::type_info const &type_info_for_variant(From const &from) { std::type_info const &TypeInfoForVariant(From const &from) {
return std::visit([](auto &&x) -> decltype(auto) { return typeid(x); }, from); return std::visit([](auto &&x) -> decltype(auto) { return typeid(x); }, from);
} }
template <class T>
utils::TypeInfoRef TypeInfoFor(const T & /* t */) {
return typeid(T);
}
template <typename From, typename Return, typename Head, typename... Rest> template <typename From, typename Return, typename Head, typename... Rest>
std::optional<Return> ConvertVariantInner(From &&a) { std::optional<Return> ConvertVariantInner(From &&a) {
if (typeid(Head) == type_info_for_variant(a)) { if (typeid(Head) == TypeInfoForVariant(a)) {
Head concrete = std::get<Head>(std::forward<From>(a)); Head concrete = std::get<Head>(std::forward<From>(a));
return concrete; return concrete;
} }

View File

@ -0,0 +1,97 @@
// Copyright 2022 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#pragma once
#include <chrono>
#include <cmath>
#include <unordered_map>
#include <boost/core/demangle.hpp>
#include "io/time.hpp"
#include "utils/histogram.hpp"
#include "utils/logging.hpp"
#include "utils/type_info_ref.hpp"
namespace memgraph::io {
struct LatencyHistogramSummary {
uint64_t count;
Duration p0;
Duration p50;
Duration p75;
Duration p90;
Duration p95;
Duration p975;
Duration p99;
Duration p999;
Duration p9999;
Duration p100;
Duration sum;
friend std::ostream &operator<<(std::ostream &in, const LatencyHistogramSummary &histo) {
in << "{ \"count\": " << histo.count;
in << ", \"p0\": " << histo.p0.count();
in << ", \"p50\": " << histo.p50.count();
in << ", \"p75\": " << histo.p75.count();
in << ", \"p90\": " << histo.p90.count();
in << ", \"p95\": " << histo.p95.count();
in << ", \"p975\": " << histo.p975.count();
in << ", \"p99\": " << histo.p99.count();
in << ", \"p999\": " << histo.p999.count();
in << ", \"p9999\": " << histo.p9999.count();
in << ", \"p100\": " << histo.p100.count();
in << ", \"sum\": " << histo.sum.count();
in << " }";
return in;
}
};
class MessageHistogramCollector {
std::unordered_map<utils::TypeInfoRef, utils::Histogram, utils::TypeInfoHasher, utils::TypeInfoEqualTo> histograms_;
public:
void Measure(const std::type_info &type_info, const Duration &duration) {
auto &histo = histograms_[type_info];
histo.Measure(duration.count());
}
std::unordered_map<std::string, LatencyHistogramSummary> ResponseLatencies() {
std::unordered_map<std::string, LatencyHistogramSummary> ret{};
for (const auto &[type_id, histo] : histograms_) {
std::string demangled_name = "\"" + boost::core::demangle(type_id.get().name()) + "\"";
LatencyHistogramSummary latency_histogram_summary{
.count = histo.Count(),
.p0 = Duration(static_cast<int64_t>(histo.Percentile(0.0))),
.p50 = Duration(static_cast<int64_t>(histo.Percentile(50.0))),
.p75 = Duration(static_cast<int64_t>(histo.Percentile(75.0))),
.p90 = Duration(static_cast<int64_t>(histo.Percentile(90.0))),
.p95 = Duration(static_cast<int64_t>(histo.Percentile(95.0))),
.p975 = Duration(static_cast<int64_t>(histo.Percentile(97.5))),
.p99 = Duration(static_cast<int64_t>(histo.Percentile(99.0))),
.p999 = Duration(static_cast<int64_t>(histo.Percentile(99.9))),
.p9999 = Duration(static_cast<int64_t>(histo.Percentile(99.99))),
.p100 = Duration(static_cast<int64_t>(histo.Percentile(100.0))),
.sum = Duration(histo.Sum()),
};
ret.emplace(demangled_name, latency_histogram_summary);
}
return ret;
}
};
} // namespace memgraph::io

View File

@ -22,6 +22,7 @@
#include <unordered_map> #include <unordered_map>
#include <vector> #include <vector>
#include "io/message_conversion.hpp"
#include "io/simulator/simulator.hpp" #include "io/simulator/simulator.hpp"
#include "io/transport.hpp" #include "io/transport.hpp"
#include "utils/concepts.hpp" #include "utils/concepts.hpp"
@ -88,6 +89,26 @@ struct ReadResponse {
std::optional<Address> retry_leader; std::optional<Address> retry_leader;
}; };
template <class... ReadReturn>
utils::TypeInfoRef TypeInfoFor(const ReadResponse<std::variant<ReadReturn...>> &read_response) {
return TypeInfoForVariant(read_response.read_return);
}
template <class ReadReturn>
utils::TypeInfoRef TypeInfoFor(const ReadResponse<ReadReturn> & /* read_response */) {
return typeid(ReadReturn);
}
template <class... WriteReturn>
utils::TypeInfoRef TypeInfoFor(const WriteResponse<std::variant<WriteReturn...>> &write_response) {
return TypeInfoForVariant(write_response.write_return);
}
template <class WriteReturn>
utils::TypeInfoRef TypeInfoFor(const WriteResponse<WriteReturn> & /* write_response */) {
return typeid(WriteReturn);
}
/// AppendRequest is a raft-level message that the Leader /// AppendRequest is a raft-level message that the Leader
/// periodically broadcasts to all Follower peers. This /// periodically broadcasts to all Follower peers. This
/// serves three main roles: /// serves three main roles:

View File

@ -31,6 +31,11 @@ bool SimulatorHandle::ShouldShutDown() const {
return should_shut_down_; return should_shut_down_;
} }
std::unordered_map<std::string, LatencyHistogramSummary> SimulatorHandle::ResponseLatencies() {
std::unique_lock<std::mutex> lock(mu_);
return histograms_.ResponseLatencies();
}
void SimulatorHandle::IncrementServerCountAndWaitForQuiescentState(Address address) { void SimulatorHandle::IncrementServerCountAndWaitForQuiescentState(Address address) {
std::unique_lock<std::mutex> lock(mu_); std::unique_lock<std::mutex> lock(mu_);
server_addresses_.insert(address); server_addresses_.insert(address);
@ -119,7 +124,10 @@ bool SimulatorHandle::MaybeTickSimulator() {
dop.promise.TimeOut(); dop.promise.TimeOut();
} else { } else {
stats_.total_responses++; stats_.total_responses++;
dop.promise.Fill(std::move(opaque_message)); Duration response_latency = cluster_wide_time_microseconds_ - dop.requested_at;
auto type_info = opaque_message.type_info;
dop.promise.Fill(std::move(opaque_message), response_latency);
histograms_.Measure(type_info, response_latency);
} }
} else if (should_drop) { } else if (should_drop) {
// don't add it anywhere, let it drop // don't add it anywhere, let it drop

View File

@ -25,6 +25,7 @@
#include "io/address.hpp" #include "io/address.hpp"
#include "io/errors.hpp" #include "io/errors.hpp"
#include "io/message_conversion.hpp" #include "io/message_conversion.hpp"
#include "io/message_histogram_collector.hpp"
#include "io/simulator/simulator_config.hpp" #include "io/simulator/simulator_config.hpp"
#include "io/simulator/simulator_stats.hpp" #include "io/simulator/simulator_stats.hpp"
#include "io/time.hpp" #include "io/time.hpp"
@ -54,6 +55,7 @@ class SimulatorHandle {
std::uniform_int_distribution<int> time_distrib_{5, 50}; std::uniform_int_distribution<int> time_distrib_{5, 50};
std::uniform_int_distribution<int> drop_distrib_{0, 99}; std::uniform_int_distribution<int> drop_distrib_{0, 99};
SimulatorConfig config_; SimulatorConfig config_;
MessageHistogramCollector histograms_;
void TimeoutPromisesPastDeadline() { void TimeoutPromisesPastDeadline() {
const Time now = cluster_wide_time_microseconds_; const Time now = cluster_wide_time_microseconds_;
@ -76,6 +78,8 @@ class SimulatorHandle {
explicit SimulatorHandle(SimulatorConfig config) explicit SimulatorHandle(SimulatorConfig config)
: cluster_wide_time_microseconds_(config.start_time), rng_(config.rng_seed), config_(config) {} : cluster_wide_time_microseconds_(config.start_time), rng_(config.rng_seed), config_(config) {}
std::unordered_map<std::string, LatencyHistogramSummary> ResponseLatencies();
~SimulatorHandle() { ~SimulatorHandle() {
for (auto it = promises_.begin(); it != promises_.end();) { for (auto it = promises_.begin(); it != promises_.end();) {
auto &[promise_key, dop] = *it; auto &[promise_key, dop] = *it;
@ -99,6 +103,8 @@ class SimulatorHandle {
template <Message Request, Message Response> template <Message Request, Message Response>
void SubmitRequest(Address to_address, Address from_address, RequestId request_id, Request &&request, void SubmitRequest(Address to_address, Address from_address, RequestId request_id, Request &&request,
Duration timeout, ResponsePromise<Response> &&promise) { Duration timeout, ResponsePromise<Response> &&promise) {
auto type_info = TypeInfoFor(request);
std::unique_lock<std::mutex> lock(mu_); std::unique_lock<std::mutex> lock(mu_);
const Time deadline = cluster_wide_time_microseconds_ + timeout; const Time deadline = cluster_wide_time_microseconds_ + timeout;
@ -107,12 +113,17 @@ class SimulatorHandle {
OpaqueMessage om{.to_address = to_address, OpaqueMessage om{.to_address = to_address,
.from_address = from_address, .from_address = from_address,
.request_id = request_id, .request_id = request_id,
.message = std::move(message)}; .message = std::move(message),
.type_info = type_info};
in_flight_.emplace_back(std::make_pair(to_address, std::move(om))); in_flight_.emplace_back(std::make_pair(to_address, std::move(om)));
PromiseKey promise_key{.requester_address = from_address, .request_id = request_id, .replier_address = to_address}; PromiseKey promise_key{.requester_address = from_address, .request_id = request_id, .replier_address = to_address};
OpaquePromise opaque_promise(std::move(promise).ToUnique()); OpaquePromise opaque_promise(std::move(promise).ToUnique());
DeadlineAndOpaquePromise dop{.deadline = deadline, .promise = std::move(opaque_promise)}; DeadlineAndOpaquePromise dop{
.requested_at = cluster_wide_time_microseconds_,
.deadline = deadline,
.promise = std::move(opaque_promise),
};
promises_.emplace(std::move(promise_key), std::move(dop)); promises_.emplace(std::move(promise_key), std::move(dop));
stats_.total_messages++; stats_.total_messages++;
@ -164,12 +175,14 @@ class SimulatorHandle {
template <Message M> template <Message M>
void Send(Address to_address, Address from_address, RequestId request_id, M message) { void Send(Address to_address, Address from_address, RequestId request_id, M message) {
auto type_info = TypeInfoFor(message);
std::unique_lock<std::mutex> lock(mu_); std::unique_lock<std::mutex> lock(mu_);
std::any message_any(std::move(message)); std::any message_any(std::move(message));
OpaqueMessage om{.to_address = to_address, OpaqueMessage om{.to_address = to_address,
.from_address = from_address, .from_address = from_address,
.request_id = request_id, .request_id = request_id,
.message = std::move(message_any)}; .message = std::move(message_any),
.type_info = type_info};
in_flight_.emplace_back(std::make_pair(std::move(to_address), std::move(om))); in_flight_.emplace_back(std::make_pair(std::move(to_address), std::move(om)));
stats_.total_messages++; stats_.total_messages++;

View File

@ -63,5 +63,9 @@ class SimulatorTransport {
Return Rand(D distrib) { Return Rand(D distrib) {
return distrib(rng_); return distrib(rng_);
} }
std::unordered_map<std::string, LatencyHistogramSummary> ResponseLatencies() {
return simulator_handle_->ResponseLatencies();
}
}; };
}; // namespace memgraph::io::simulator }; // namespace memgraph::io::simulator

View File

@ -19,6 +19,7 @@
#include "io/address.hpp" #include "io/address.hpp"
#include "io/errors.hpp" #include "io/errors.hpp"
#include "io/future.hpp" #include "io/future.hpp"
#include "io/message_histogram_collector.hpp"
#include "io/time.hpp" #include "io/time.hpp"
#include "utils/result.hpp" #include "utils/result.hpp"
@ -40,6 +41,7 @@ struct ResponseEnvelope {
RequestId request_id; RequestId request_id;
Address to_address; Address to_address;
Address from_address; Address from_address;
Duration response_latency;
}; };
template <Message M> template <Message M>
@ -140,5 +142,9 @@ class Io {
void SetAddress(Address address) { address_ = address; } void SetAddress(Address address) { address_ = address; }
Io<I> ForkLocal() { return Io(implementation_, address_.ForkUniqueAddress()); } Io<I> ForkLocal() { return Io(implementation_, address_.ForkUniqueAddress()); }
std::unordered_map<std::string, LatencyHistogramSummary> ResponseLatencies() {
return implementation_.ResponseLatencies();
}
}; };
}; // namespace memgraph::io }; // namespace memgraph::io

114
src/utils/histogram.hpp Normal file
View File

@ -0,0 +1,114 @@
// Copyright 2022 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#pragma once
#include <cmath>
#include "utils/logging.hpp"
namespace memgraph::utils {
// This is a logarithmically bucketing histogram optimized
// for collecting network response latency distributions.
// It "compresses" values by mapping them to a point on a
// logarithmic curve, which serves as the bucket index. This
// compression technique allows for very accurate histograms
// (unlike what is the case for sampling or lossy probabilistic
// approaches) with the trade-off that we sacrifice around 1%
// precision.
//
// properties:
// * roughly 1% precision loss - can be higher for values
// less than 100, so if measuring latency, generally do
// so in microseconds.
// * ~32kb constant space, single allocation per Histogram.
// * Histogram::Percentile() will return 0 if there were no
// samples measured yet.
class Histogram {
// This is the number of buckets that observed values
// will be logarithmically compressed into.
constexpr static auto kSampleLimit = 4096;
// This is roughly 1/error rate, where 100.0 is roughly
// a 1% error bound for measurements. This is less true
// for tiny measurements, but because we tend to measure
// microseconds, it is usually over 100, which is where
// the error bound starts to stabilize a bit. This has
// been tuned to allow the maximum uint64_t to compress
// within 4096 samples while still achieving a high accuracy.
constexpr static auto kPrecision = 92.0;
// samples_ stores per-bucket counts for measurements
// that have been mapped to a specific uint64_t in
// the "compression" logic below.
std::vector<uint64_t> samples_ = {};
// count_ is the number of measurements that have been
// included in this Histogram.
uint64_t count_ = 0;
// sum_ is the summed value of all measurements that
// have been included in this Histogram.
uint64_t sum_ = 0;
public:
Histogram() { samples_.resize(kSampleLimit, 0); }
uint64_t Count() const { return count_; }
uint64_t Sum() const { return sum_; }
void Measure(uint64_t value) {
// "compression" logic
double boosted = 1.0 + static_cast<double>(value);
double ln = std::log(boosted);
double compressed = (kPrecision * ln) + 0.5;
MG_ASSERT(compressed < kSampleLimit, "compressing value {} to {} is invalid", value, compressed);
auto sample_index = static_cast<uint16_t>(compressed);
count_++;
samples_[sample_index]++;
sum_ += value;
}
uint64_t Percentile(double percentile) const {
MG_ASSERT(percentile <= 100.0, "percentiles must not exceed 100.0");
MG_ASSERT(percentile >= 0.0, "percentiles must be greater than or equal to 0.0");
if (count_ == 0) {
return 0;
}
const auto floated_count = static_cast<double>(count_);
const auto target = std::max(floated_count * percentile / 100.0, 1.0);
auto scanned = 0.0;
for (int i = 0; i < kSampleLimit; i++) {
const auto samples_at_index = samples_[i];
scanned += static_cast<double>(samples_at_index);
if (scanned >= target) {
// "decompression" logic
auto floated = static_cast<double>(i);
auto unboosted = floated / kPrecision;
auto decompressed = std::exp(unboosted) - 1.0;
return static_cast<uint64_t>(decompressed);
}
}
LOG_FATAL("bug in Histogram::Percentile where it failed to return the {} percentile", percentile);
return 0;
}
};
} // namespace memgraph::utils

View File

@ -13,6 +13,7 @@
#include <map> #include <map>
#include <memory> #include <memory>
#include <unordered_map>
#include <vector> #include <vector>
namespace memgraph::utils::print_helpers { namespace memgraph::utils::print_helpers {
@ -49,6 +50,23 @@ std::ostream &operator<<(std::ostream &in, const std::map<K, V> &map) {
return in; return in;
} }
template <typename K, typename V>
std::ostream &operator<<(std::ostream &in, const std::unordered_map<K, V> &map) {
in << "{";
bool first = true;
for (const auto &[a, b] : map) {
if (!first) {
in << ", ";
}
first = false;
in << a;
in << ": ";
in << b;
}
in << "}";
return in;
}
template <typename K, typename V> template <typename K, typename V>
std::ostream &operator<<(std::ostream &in, const std::pair<K, V> &pair) { std::ostream &operator<<(std::ostream &in, const std::pair<K, V> &pair) {
const auto &[a, b] = pair; const auto &[a, b] = pair;

View File

@ -0,0 +1,29 @@
// Copyright 2022 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#pragma once
#include <functional>
#include <typeinfo>
namespace memgraph::utils {
using TypeInfoRef = std::reference_wrapper<const std::type_info>;
struct TypeInfoHasher {
std::size_t operator()(TypeInfoRef code) const { return code.get().hash_code(); }
};
struct TypeInfoEqualTo {
bool operator()(TypeInfoRef lhs, TypeInfoRef rhs) const { return lhs.get() == rhs.get(); }
};
} // namespace memgraph::utils

View File

@ -12,6 +12,7 @@
#include <thread> #include <thread>
#include "io/simulator/simulator.hpp" #include "io/simulator/simulator.hpp"
#include "utils/print_helpers.hpp"
using memgraph::io::Address; using memgraph::io::Address;
using memgraph::io::Io; using memgraph::io::Io;
@ -77,11 +78,15 @@ int main() {
std::cout << "[CLIENT] Got a valid response" << std::endl; std::cout << "[CLIENT] Got a valid response" << std::endl;
auto env = res_rez.GetValue(); auto env = res_rez.GetValue();
MG_ASSERT(env.message.highest_seen == i); MG_ASSERT(env.message.highest_seen == i);
std::cout << "response latency: " << env.response_latency.count() << " microseconds" << std::endl;
} else { } else {
std::cout << "[CLIENT] Got an error" << std::endl; std::cout << "[CLIENT] Got an error" << std::endl;
} }
} }
using memgraph::utils::print_helpers::operator<<;
std::cout << "response latencies: " << cli_io.ResponseLatencies() << std::endl;
simulator.ShutDown(); simulator.ShutDown();
return 0; return 0;
} }

View File

@ -32,6 +32,7 @@
#include "query/v2/requests.hpp" #include "query/v2/requests.hpp"
#include "query/v2/shard_request_manager.hpp" #include "query/v2/shard_request_manager.hpp"
#include "testing_constants.hpp" #include "testing_constants.hpp"
#include "utils/print_helpers.hpp"
#include "utils/variant_helpers.hpp" #include "utils/variant_helpers.hpp"
namespace memgraph::tests::simulation { namespace memgraph::tests::simulation {
@ -203,6 +204,7 @@ void RunClusterSimulation(const SimulatorConfig &sim_config, const ClusterConfig
auto machine_1_addr = cli_addr.ForkUniqueAddress(); auto machine_1_addr = cli_addr.ForkUniqueAddress();
Io<SimulatorTransport> cli_io = simulator.Register(cli_addr); Io<SimulatorTransport> cli_io = simulator.Register(cli_addr);
Io<SimulatorTransport> cli_io_2 = simulator.Register(Address::TestAddress(2));
auto coordinator_addresses = std::vector{ auto coordinator_addresses = std::vector{
machine_1_addr, machine_1_addr,
@ -245,6 +247,11 @@ void RunClusterSimulation(const SimulatorConfig &sim_config, const ClusterConfig
spdlog::info("total responses: {}", stats.total_responses); spdlog::info("total responses: {}", stats.total_responses);
spdlog::info("simulator ticks: {}", stats.simulator_ticks); spdlog::info("simulator ticks: {}", stats.simulator_ticks);
auto histo = cli_io_2.ResponseLatencies();
using memgraph::utils::print_helpers::operator<<;
std::cout << "response latencies: " << histo << std::endl;
spdlog::info("========================== SUCCESS :) =========================="); spdlog::info("========================== SUCCESS :) ==========================");
} }

View File

@ -275,6 +275,9 @@ target_link_libraries(${test_prefix}utils_settings mg-utils mg-settings)
add_unit_test(utils_temporal utils_temporal.cpp) add_unit_test(utils_temporal utils_temporal.cpp)
target_link_libraries(${test_prefix}utils_temporal mg-utils) target_link_libraries(${test_prefix}utils_temporal mg-utils)
add_unit_test(utils_histogram.cpp)
target_link_libraries(${test_prefix}utils_histogram mg-utils)
# Test mg-storage-v2 # Test mg-storage-v2
add_unit_test(commit_log_v2.cpp) add_unit_test(commit_log_v2.cpp)
target_link_libraries(${test_prefix}commit_log_v2 gflags mg-utils mg-storage-v2) target_link_libraries(${test_prefix}commit_log_v2 gflags mg-utils mg-storage-v2)

View File

@ -0,0 +1,46 @@
// Copyright 2022 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#include <gtest/gtest.h>
#include <gmock/gmock.h>
#include "utils/histogram.hpp"
#include "utils/logging.hpp"
TEST(Histogram, BasicFunctionality) {
memgraph::utils::Histogram histo{};
for (int i = 0; i < 9000; i++) {
histo.Measure(10);
}
for (int i = 0; i < 900; i++) {
histo.Measure(25);
}
for (int i = 0; i < 90; i++) {
histo.Measure(33);
}
for (int i = 0; i < 9; i++) {
histo.Measure(47);
}
histo.Measure(500);
ASSERT_EQ(histo.Percentile(0.0), 10);
ASSERT_EQ(histo.Percentile(99.0), 25);
ASSERT_EQ(histo.Percentile(99.89), 32);
ASSERT_EQ(histo.Percentile(99.99), 46);
ASSERT_EQ(histo.Percentile(100.0), 500);
uint64_t max = std::numeric_limits<uint64_t>::max();
histo.Measure(max);
auto observed_max = static_cast<double>(histo.Percentile(100.0));
auto diff = (max - observed_max) / max;
ASSERT_THAT(diff, testing::Lt(0.01));
}