Add some metric types and basic RPC server stats
Summary: Get rid of client class Fix cppcheck errors Add documentation to metrics.hpp Add documentation to stats.hpp Remove stats from global namespace Fix build failures Refactor a bit Refactor stopwatch into a function Add rpc execution time stats Fix segmentation fault Reviewers: mferencevic Reviewed By: mferencevic Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D1200
This commit is contained in:
parent
b2d7f95568
commit
d15464e181
@ -38,6 +38,7 @@ set(memgraph_src_files
|
||||
query/plan/rule_based_planner.cpp
|
||||
query/plan/variable_start_planner.cpp
|
||||
query/typed_value.cpp
|
||||
stats/metrics.cpp
|
||||
stats/stats.cpp
|
||||
storage/concurrent_id_mapper_master.cpp
|
||||
storage/concurrent_id_mapper_worker.cpp
|
||||
|
@ -6,6 +6,7 @@
|
||||
#include "boost/serialization/unique_ptr.hpp"
|
||||
|
||||
#include "communication/rpc/server.hpp"
|
||||
#include "stats/metrics.hpp"
|
||||
|
||||
namespace communication::rpc {
|
||||
|
||||
@ -36,12 +37,26 @@ void System::Remove(const Server &server) {
|
||||
services_.erase(it);
|
||||
}
|
||||
|
||||
std::string RequestName(const std::string &service_name,
|
||||
const std::type_index &msg_type_id) {
|
||||
int s;
|
||||
char *message_type = abi::__cxa_demangle(msg_type_id.name(), NULL, NULL, &s);
|
||||
std::string ret;
|
||||
if (s == 0) {
|
||||
ret = fmt::format("rpc.server.{}.{}", service_name, message_type);
|
||||
} else {
|
||||
ret = fmt::format("rpc.server.{}.unknown", service_name);
|
||||
}
|
||||
free(message_type);
|
||||
return ret;
|
||||
}
|
||||
|
||||
Server::Server(System &system, const std::string &service_name,
|
||||
int workers_count)
|
||||
: system_(system), service_name_(service_name) {
|
||||
system_.Add(*this);
|
||||
for (int i = 0; i < workers_count; ++i) {
|
||||
threads_.push_back(std::thread([this]() {
|
||||
threads_.push_back(std::thread([this, service_name]() {
|
||||
// TODO: Add logging.
|
||||
while (alive_) {
|
||||
auto task = queue_.AwaitPop();
|
||||
@ -52,7 +67,12 @@ Server::Server(System &system, const std::string &service_name,
|
||||
auto callbacks_accessor = callbacks_.access();
|
||||
auto it = callbacks_accessor.find(message->type_index());
|
||||
if (it == callbacks_accessor.end()) continue;
|
||||
auto response = it->second(*(message.get()));
|
||||
|
||||
auto req_name = RequestName(service_name, message->type_index());
|
||||
std::unique_ptr<Message> response = nullptr;
|
||||
|
||||
stats::Stopwatch(req_name,
|
||||
[&] { response = it->second(*(message.get())); });
|
||||
SendMessage(*socket, message_id, response);
|
||||
}
|
||||
}));
|
||||
|
@ -166,8 +166,8 @@ int main(int argc, char **argv) {
|
||||
// Unhandled exception handler init.
|
||||
std::set_terminate(&terminate_handler);
|
||||
|
||||
InitStatsLogging();
|
||||
utils::OnScopeExit stop_stats([] { StopStatsLogging(); });
|
||||
stats::InitStatsLogging();
|
||||
utils::OnScopeExit stop_stats([] { stats::StopStatsLogging(); });
|
||||
|
||||
CHECK(!(FLAGS_master && FLAGS_worker))
|
||||
<< "Can't run Memgraph as worker and master at the same time";
|
||||
|
105
src/stats/metrics.cpp
Normal file
105
src/stats/metrics.cpp
Normal file
@ -0,0 +1,105 @@
|
||||
#include "stats/metrics.hpp"
|
||||
|
||||
#include <tuple>
|
||||
|
||||
#include "fmt/format.h"
|
||||
#include "glog/logging.h"
|
||||
|
||||
namespace stats {
|
||||
|
||||
std::mutex &MetricsMutex() {
|
||||
static std::mutex mutex;
|
||||
return mutex;
|
||||
}
|
||||
|
||||
std::map<std::string, std::unique_ptr<Metric>> &AccessMetrics() {
|
||||
static std::map<std::string, std::unique_ptr<Metric>> metrics;
|
||||
MetricsMutex().lock();
|
||||
return metrics;
|
||||
}
|
||||
|
||||
void ReleaseMetrics() { MetricsMutex().unlock(); }
|
||||
|
||||
Metric::Metric(int64_t start_value) : value_(start_value) {}
|
||||
|
||||
Counter::Counter(int64_t start_value) : Metric(start_value) {}
|
||||
|
||||
void Counter::Bump(int64_t delta) { value_ += delta; }
|
||||
|
||||
std::experimental::optional<int64_t> Counter::Flush() { return value_; }
|
||||
|
||||
int64_t Counter::Value() { return value_; }
|
||||
|
||||
Gauge::Gauge(int64_t start_value) : Metric(start_value) {}
|
||||
|
||||
void Gauge::Set(int64_t value) { value_ = value; }
|
||||
|
||||
std::experimental::optional<int64_t> Gauge::Flush() { return value_; }
|
||||
|
||||
IntervalMin::IntervalMin(int64_t start_value) : Metric(start_value) {}
|
||||
|
||||
void IntervalMin::Add(int64_t value) {
|
||||
int64_t curr = value_;
|
||||
while (curr > value && !value_.compare_exchange_weak(curr, value))
|
||||
;
|
||||
}
|
||||
|
||||
std::experimental::optional<int64_t> IntervalMin::Flush() {
|
||||
int64_t curr = value_;
|
||||
value_.compare_exchange_weak(curr, std::numeric_limits<int64_t>::max());
|
||||
return curr == std::numeric_limits<int64_t>::max()
|
||||
? std::experimental::nullopt
|
||||
: std::experimental::make_optional(curr);
|
||||
}
|
||||
|
||||
IntervalMax::IntervalMax(int64_t start_value) : Metric(start_value) {}
|
||||
|
||||
void IntervalMax::Add(int64_t value) {
|
||||
int64_t curr = value_;
|
||||
while (curr < value && !value_.compare_exchange_weak(curr, value))
|
||||
;
|
||||
}
|
||||
|
||||
std::experimental::optional<int64_t> IntervalMax::Flush() {
|
||||
int64_t curr = value_;
|
||||
value_.compare_exchange_weak(curr, std::numeric_limits<int64_t>::min());
|
||||
return curr == std::numeric_limits<int64_t>::min()
|
||||
? std::experimental::nullopt
|
||||
: std::experimental::make_optional(curr);
|
||||
}
|
||||
|
||||
template <class T>
|
||||
T &GetMetric(const std::string &name, int64_t start_value) {
|
||||
auto &metrics = AccessMetrics();
|
||||
auto it = metrics.find(name);
|
||||
if (it == metrics.end()) {
|
||||
auto got = metrics.emplace(name, std::make_unique<T>(start_value));
|
||||
CHECK(got.second) << "Failed to create counter " << name;
|
||||
it = got.first;
|
||||
}
|
||||
ReleaseMetrics();
|
||||
auto *ptr = dynamic_cast<T *>(it->second.get());
|
||||
if (!ptr) {
|
||||
LOG(FATAL) << fmt::format("GetMetric({}) called with invalid metric type",
|
||||
name);
|
||||
}
|
||||
return *ptr;
|
||||
}
|
||||
|
||||
Counter &GetCounter(const std::string &name, int64_t start_value) {
|
||||
return GetMetric<Counter>(name, start_value);
|
||||
}
|
||||
|
||||
Gauge &GetGauge(const std::string &name, int64_t start_value) {
|
||||
return GetMetric<Gauge>(name, start_value);
|
||||
}
|
||||
|
||||
IntervalMin &GetIntervalMin(const std::string &name) {
|
||||
return GetMetric<IntervalMin>(name, std::numeric_limits<int64_t>::max());
|
||||
}
|
||||
|
||||
IntervalMax &GetIntervalMax(const std::string &name) {
|
||||
return GetMetric<IntervalMax>(name, std::numeric_limits<int64_t>::min());
|
||||
}
|
||||
|
||||
} // namespace stats
|
202
src/stats/metrics.hpp
Normal file
202
src/stats/metrics.hpp
Normal file
@ -0,0 +1,202 @@
|
||||
/**
|
||||
* @file
|
||||
*
|
||||
* This file contains some metrics types that can be aggregated on client side
|
||||
* and periodically flushed to StatsD.
|
||||
*/
|
||||
#pragma once
|
||||
|
||||
#include <atomic>
|
||||
#include <experimental/optional>
|
||||
#include <map>
|
||||
#include <memory>
|
||||
#include <mutex>
|
||||
#include <string>
|
||||
|
||||
#include "fmt/format.h"
|
||||
|
||||
namespace stats {
|
||||
|
||||
// TODO(mtomic): it would probably be nice to have Value method for every metric
|
||||
// type, however, there is no use case for this yet
|
||||
|
||||
/**
|
||||
* Abstract base class for all metrics.
|
||||
*/
|
||||
class Metric {
|
||||
public:
|
||||
/**
|
||||
* Constructs a metric to be exported to StatsD.
|
||||
*
|
||||
* @param name metric will be exported to StatsD with this path
|
||||
* @param value initial value
|
||||
*/
|
||||
virtual ~Metric() {}
|
||||
|
||||
/**
|
||||
* Metric refresh thread will periodically call this function. It should
|
||||
* return the metric value aggregated since the last flush call or nullopt
|
||||
* if there were no updates.
|
||||
*/
|
||||
virtual std::experimental::optional<int64_t> Flush() = 0;
|
||||
|
||||
explicit Metric(int64_t start_value = 0);
|
||||
|
||||
protected:
|
||||
std::atomic<int64_t> value_;
|
||||
};
|
||||
|
||||
/**
|
||||
* A simple counter.
|
||||
*/
|
||||
class Counter : public Metric {
|
||||
public:
|
||||
explicit Counter(int64_t start_value = 0);
|
||||
|
||||
/**
|
||||
* Change counter value by delta.
|
||||
*
|
||||
* @param delta value change
|
||||
*/
|
||||
void Bump(int64_t delta = 1);
|
||||
|
||||
/** Returns the current value of the counter. **/
|
||||
std::experimental::optional<int64_t> Flush() override;
|
||||
|
||||
/** Returns the current value of the counter. **/
|
||||
int64_t Value();
|
||||
|
||||
friend Counter &GetCounter(const std::string &name);
|
||||
};
|
||||
|
||||
/**
|
||||
* To be used instead of Counter constructor. If counter with this name doesn't
|
||||
* exist, it will be initialized with start_value.
|
||||
*
|
||||
* @param name counter name
|
||||
* @param start_value start value
|
||||
*/
|
||||
Counter &GetCounter(const std::string &name, int64_t start_value = 0);
|
||||
|
||||
/**
|
||||
* A simple gauge. Gauge value is explicitly set, instead of being added to or
|
||||
* subtracted from.
|
||||
*/
|
||||
class Gauge : public Metric {
|
||||
public:
|
||||
explicit Gauge(int64_t start_value = 0);
|
||||
|
||||
/**
|
||||
* Set gauge value.
|
||||
*
|
||||
* @param value value to be set
|
||||
*/
|
||||
void Set(int64_t value);
|
||||
|
||||
/** Returns the current gauge value. **/
|
||||
std::experimental::optional<int64_t> Flush() override;
|
||||
};
|
||||
|
||||
/**
|
||||
* To be used instead of Gauge constructor. If gauge with this name doesn't
|
||||
* exist, it will be initialized with start_value.
|
||||
*
|
||||
* @param name gauge name
|
||||
* @param start_value start value
|
||||
*/
|
||||
Gauge &GetGauge(const std::string &name, int64_t start_value = 0);
|
||||
|
||||
/**
|
||||
* Aggregates minimum between two flush periods.
|
||||
*/
|
||||
class IntervalMin : public Metric {
|
||||
public:
|
||||
explicit IntervalMin(int64_t start_value);
|
||||
|
||||
/**
|
||||
* Add another value into the minimum computation.
|
||||
*
|
||||
* @param value value to be added
|
||||
*/
|
||||
void Add(int64_t value);
|
||||
|
||||
/**
|
||||
* Returns the minimum value encountered since the last flush period,
|
||||
* or nullopt if no values were added.
|
||||
*/
|
||||
std::experimental::optional<int64_t> Flush() override;
|
||||
};
|
||||
|
||||
/**
|
||||
* To be used instead of IntervalMin constructor.
|
||||
*
|
||||
* @param name interval min name
|
||||
*/
|
||||
IntervalMin &GetIntervalMin(const std::string &name);
|
||||
|
||||
/**
|
||||
* Aggregates maximum betweenw two flush periods.
|
||||
*/
|
||||
class IntervalMax : public Metric {
|
||||
public:
|
||||
explicit IntervalMax(int64_t start_value);
|
||||
|
||||
/**
|
||||
* Add another value into the maximum computation.
|
||||
*/
|
||||
void Add(int64_t value);
|
||||
|
||||
/**
|
||||
* Returns the maximum value encountered since the last flush period,
|
||||
* or nullopt if no values were added.
|
||||
*/
|
||||
std::experimental::optional<int64_t> Flush() override;
|
||||
};
|
||||
|
||||
/**
|
||||
* To be used instead of IntervalMax constructor.
|
||||
*
|
||||
* @param name interval max name
|
||||
*/
|
||||
IntervalMax &GetIntervalMax(const std::string &name);
|
||||
|
||||
/**
|
||||
* A stopwatch utility. It exports 4 metrics: total time measured since the
|
||||
* beginning of the program, total number of times time intervals measured,
|
||||
* minimum and maximum time interval measured since the last metric flush.
|
||||
* Metrics exported by the stopwatch will be named
|
||||
* [name].{total_time|count|min|max}.
|
||||
*
|
||||
* @param name timed event name
|
||||
* @param f Callable, an action to be performed.
|
||||
*/
|
||||
template <class Function>
|
||||
int64_t Stopwatch(const std::string &name, Function f) {
|
||||
auto &total_time = GetCounter(fmt::format("{}.total_time", name));
|
||||
auto &count = GetCounter(fmt::format("{}.count", name));
|
||||
auto &min = GetIntervalMin(fmt::format("{}.min", name));
|
||||
auto &max = GetIntervalMax(fmt::format("{}.max", name));
|
||||
auto start = std::chrono::system_clock::now();
|
||||
f();
|
||||
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(
|
||||
std::chrono::system_clock::now() - start)
|
||||
.count();
|
||||
total_time.Bump(duration);
|
||||
count.Bump();
|
||||
min.Add(duration);
|
||||
max.Add(duration);
|
||||
return duration;
|
||||
}
|
||||
|
||||
/**
|
||||
* Access internal metric list. You probably don't want to use this,
|
||||
* but if you do, make sure to call ReleaseMetrics when you're done.
|
||||
*/
|
||||
std::map<std::string, std::unique_ptr<Metric>> &AccessMetrics();
|
||||
|
||||
/**
|
||||
* Releases internal lock on metric list.
|
||||
*/
|
||||
void ReleaseMetrics();
|
||||
|
||||
} // namespace stats
|
@ -1,7 +1,11 @@
|
||||
#include "stats/stats.hpp"
|
||||
|
||||
#include "glog/logging.h"
|
||||
|
||||
#include "communication/rpc/client.hpp"
|
||||
#include "stats/stats.hpp"
|
||||
#include "data_structures/concurrent/push_queue.hpp"
|
||||
|
||||
#include "stats/stats_rpc_messages.hpp"
|
||||
|
||||
DEFINE_HIDDEN_string(statsd_address, "", "Stats server IP address");
|
||||
DEFINE_HIDDEN_int32(statsd_port, 2500, "Stats server port");
|
||||
@ -10,82 +14,100 @@ DEFINE_HIDDEN_int32(statsd_flush_interval, 500,
|
||||
|
||||
namespace stats {
|
||||
|
||||
const std::string kStatsServiceName = "stats";
|
||||
const std::string kStatsServiceName = "statsd-service";
|
||||
|
||||
StatsClient::StatsClient() {}
|
||||
std::string statsd_prefix = "";
|
||||
std::thread stats_dispatch_thread;
|
||||
std::thread counter_refresh_thread;
|
||||
std::atomic<bool> stats_running{false};
|
||||
ConcurrentPushQueue<StatsReq> stats_queue;
|
||||
|
||||
void StatsClient::Log(StatsReq req) {
|
||||
if (is_running_) {
|
||||
queue_.push(req);
|
||||
void RefreshMetrics() {
|
||||
LOG(INFO) << "Metrics flush thread started";
|
||||
while (stats_running) {
|
||||
auto &metrics = AccessMetrics();
|
||||
for (auto &kv : metrics) {
|
||||
auto value = kv.second->Flush();
|
||||
if (value) {
|
||||
LogStat(kv.first, *value);
|
||||
}
|
||||
}
|
||||
ReleaseMetrics();
|
||||
// TODO(mtomic): hardcoded sleep time
|
||||
std::this_thread::sleep_for(std::chrono::seconds(1));
|
||||
}
|
||||
LOG(INFO) << "Metrics flush thread stopped";
|
||||
}
|
||||
|
||||
void StatsClient::Start(const io::network::Endpoint &endpoint) {
|
||||
|
||||
void StatsDispatchMain(const io::network::Endpoint &endpoint) {
|
||||
// TODO(mtomic): we probably want to batch based on request size and MTU
|
||||
const int MAX_BATCH_SIZE = 100;
|
||||
|
||||
dispatch_thread_ = std::thread([this, endpoint]() {
|
||||
CHECK(!is_running_) << "Stats logging already initialized!";
|
||||
LOG(INFO) << "Stats dispatcher thread started";
|
||||
LOG(INFO) << "Stats dispatcher thread started";
|
||||
|
||||
communication::rpc::Client client(endpoint, kStatsServiceName);
|
||||
communication::rpc::Client client(endpoint, kStatsServiceName);
|
||||
|
||||
BatchStatsReq batch_request;
|
||||
batch_request.requests.reserve(MAX_BATCH_SIZE);
|
||||
BatchStatsReq batch_request;
|
||||
batch_request.requests.reserve(MAX_BATCH_SIZE);
|
||||
|
||||
is_running_ = true;
|
||||
while (is_running_) {
|
||||
auto last = queue_.begin();
|
||||
size_t sent = 0, total = 0;
|
||||
while (stats_running) {
|
||||
auto last = stats_queue.begin();
|
||||
size_t sent = 0, total = 0;
|
||||
|
||||
auto flush_batch = [&] {
|
||||
if (auto rep = client.Call<BatchStatsRpc>(batch_request)) {
|
||||
sent += batch_request.requests.size();
|
||||
}
|
||||
total += batch_request.requests.size();
|
||||
batch_request.requests.clear();
|
||||
};
|
||||
|
||||
for (auto it = last; it != queue_.end(); it++) {
|
||||
batch_request.requests.emplace_back(std::move(*it));
|
||||
if (batch_request.requests.size() == MAX_BATCH_SIZE) {
|
||||
flush_batch();
|
||||
}
|
||||
auto flush_batch = [&] {
|
||||
if (auto rep = client.Call<BatchStatsRpc>(batch_request)) {
|
||||
sent += batch_request.requests.size();
|
||||
}
|
||||
total += batch_request.requests.size();
|
||||
batch_request.requests.clear();
|
||||
};
|
||||
|
||||
if (!batch_request.requests.empty()) {
|
||||
for (auto it = last; it != stats_queue.end(); it++) {
|
||||
batch_request.requests.emplace_back(std::move(*it));
|
||||
if (batch_request.requests.size() == MAX_BATCH_SIZE) {
|
||||
flush_batch();
|
||||
}
|
||||
|
||||
VLOG(10) << fmt::format("Sent {} out of {} events from queue.", sent,
|
||||
total);
|
||||
last.delete_tail();
|
||||
std::this_thread::sleep_for(
|
||||
std::chrono::milliseconds(FLAGS_statsd_flush_interval));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
void StatsClient::Stop() {
|
||||
if (is_running_) {
|
||||
is_running_ = false;
|
||||
dispatch_thread_.join();
|
||||
if (!batch_request.requests.empty()) {
|
||||
flush_batch();
|
||||
}
|
||||
|
||||
VLOG(10) << fmt::format("Sent {} out of {} events from queue.", sent,
|
||||
total);
|
||||
last.delete_tail();
|
||||
std::this_thread::sleep_for(
|
||||
std::chrono::milliseconds(FLAGS_statsd_flush_interval));
|
||||
}
|
||||
}
|
||||
|
||||
StatsClient client;
|
||||
|
||||
} // namespace stats
|
||||
|
||||
void InitStatsLogging() {
|
||||
if (FLAGS_statsd_address != "") {
|
||||
stats::client.Start({FLAGS_statsd_address, (uint16_t)FLAGS_statsd_port});
|
||||
}
|
||||
}
|
||||
|
||||
void StopStatsLogging() { stats::client.Stop(); }
|
||||
|
||||
void LogStat(const std::string &metric_path, double value,
|
||||
const std::vector<std::pair<std::string, std::string>> &tags) {
|
||||
stats::client.Log({metric_path, tags, value});
|
||||
if (stats_running) {
|
||||
stats_queue.push(statsd_prefix + metric_path, tags, value);
|
||||
}
|
||||
}
|
||||
|
||||
void InitStatsLogging(std::string prefix) {
|
||||
if (!prefix.empty()) {
|
||||
statsd_prefix = prefix + ".";
|
||||
}
|
||||
if (FLAGS_statsd_address != "") {
|
||||
stats_running = true;
|
||||
stats_dispatch_thread = std::thread(
|
||||
StatsDispatchMain, io::network::Endpoint{FLAGS_statsd_address,
|
||||
(uint16_t)FLAGS_statsd_port});
|
||||
counter_refresh_thread = std::thread(RefreshMetrics);
|
||||
}
|
||||
}
|
||||
|
||||
void StopStatsLogging() {
|
||||
if (stats_running) {
|
||||
stats_running = false;
|
||||
stats_dispatch_thread.join();
|
||||
counter_refresh_thread.join();
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace stats
|
||||
|
@ -3,44 +3,31 @@
|
||||
#pragma once
|
||||
|
||||
#include <thread>
|
||||
#include <vector>
|
||||
|
||||
#include "data_structures/concurrent/push_queue.hpp"
|
||||
#include "io/network/endpoint.hpp"
|
||||
#include "stats/stats_rpc_messages.hpp"
|
||||
#include "gflags/gflags.h"
|
||||
|
||||
DECLARE_string(statsd_address);
|
||||
DECLARE_int32(statsd_port);
|
||||
DECLARE_int32(statsd_flush_interval);
|
||||
#include "stats/metrics.hpp"
|
||||
|
||||
namespace stats {
|
||||
|
||||
// TODO (buda): documentation + tests
|
||||
class StatsClient {
|
||||
public:
|
||||
StatsClient();
|
||||
/**
|
||||
* Start sending metrics to StatsD server.
|
||||
*
|
||||
* @param prefix prefix to prepend to exported keys
|
||||
*/
|
||||
void InitStatsLogging(std::string prefix = "");
|
||||
|
||||
// To be clear.
|
||||
StatsClient(const StatsClient &other) = delete;
|
||||
StatsClient(StatsClient &&other) = delete;
|
||||
StatsClient &operator=(const StatsClient &) = delete;
|
||||
StatsClient &operator=(StatsClient &&) = delete;
|
||||
|
||||
void Log(StatsReq req);
|
||||
|
||||
void Start(const io::network::Endpoint &endpoint);
|
||||
void Stop();
|
||||
|
||||
private:
|
||||
ConcurrentPushQueue<StatsReq> queue_;
|
||||
std::atomic<bool> is_running_{false};
|
||||
std::thread dispatch_thread_;
|
||||
};
|
||||
|
||||
} // namespace stats
|
||||
|
||||
// TODO (buda): ON/OFF compile OR runtime parameter?
|
||||
|
||||
void InitStatsLogging();
|
||||
/**
|
||||
* Stop sending metrics to StatsD server. This should be called before exiting
|
||||
* program.
|
||||
*/
|
||||
void StopStatsLogging();
|
||||
|
||||
/**
|
||||
* Send a value to StatsD with current timestamp.
|
||||
*/
|
||||
void LogStat(const std::string &metric_path, double value,
|
||||
const std::vector<std::pair<std::string, std::string>> &tags = {});
|
||||
|
||||
} // namespace stats
|
||||
|
@ -6,6 +6,7 @@
|
||||
|
||||
#include "long_running_common.hpp"
|
||||
#include "stats/stats.hpp"
|
||||
#include "stats/stats_rpc_messages.hpp"
|
||||
|
||||
// TODO(mtomic): this sucks but I don't know a different way to make it work
|
||||
#include "boost/archive/binary_iarchive.hpp"
|
||||
@ -16,7 +17,6 @@ BOOST_CLASS_EXPORT(stats::StatsRes);
|
||||
BOOST_CLASS_EXPORT(stats::BatchStatsReq);
|
||||
BOOST_CLASS_EXPORT(stats::BatchStatsRes);
|
||||
|
||||
|
||||
class CardFraudClient : public TestClient {
|
||||
public:
|
||||
CardFraudClient(int id, int num_pos, nlohmann::json config)
|
||||
@ -108,7 +108,7 @@ int main(int argc, char **argv) {
|
||||
gflags::ParseCommandLineFlags(&argc, &argv, true);
|
||||
google::InitGoogleLogging(argv[0]);
|
||||
|
||||
InitStatsLogging();
|
||||
stats::InitStatsLogging();
|
||||
|
||||
nlohmann::json config;
|
||||
std::cin >> config;
|
||||
@ -128,7 +128,7 @@ int main(int argc, char **argv) {
|
||||
|
||||
RunMultithreadedTest(clients);
|
||||
|
||||
StopStatsLogging();
|
||||
stats::StopStatsLogging();
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
@ -5,6 +5,7 @@
|
||||
#include "bolt_client.hpp"
|
||||
#include "common.hpp"
|
||||
#include "communication/bolt/v1/decoder/decoded_value.hpp"
|
||||
#include "stats/metrics.hpp"
|
||||
#include "stats/stats.hpp"
|
||||
#include "utils/network.hpp"
|
||||
#include "utils/timer.hpp"
|
||||
@ -22,6 +23,11 @@ DEFINE_int32(duration, 30, "Number of seconds to execute benchmark");
|
||||
DEFINE_string(group, "unknown", "Test group name");
|
||||
DEFINE_string(scenario, "unknown", "Test scenario name");
|
||||
|
||||
static const auto EXECUTED_QUERIES =
|
||||
fmt::format("{}.{}.executed_queries", FLAGS_group, FLAGS_scenario);
|
||||
|
||||
auto &executed_queries = stats::GetCounter(EXECUTED_QUERIES);
|
||||
|
||||
class TestClient {
|
||||
public:
|
||||
TestClient()
|
||||
@ -69,6 +75,7 @@ class TestClient {
|
||||
stats_[query].push_back(std::move(metadata));
|
||||
}
|
||||
}
|
||||
executed_queries.Bump();
|
||||
return result;
|
||||
}
|
||||
|
||||
@ -85,13 +92,6 @@ class TestClient {
|
||||
};
|
||||
|
||||
void RunMultithreadedTest(std::vector<std::unique_ptr<TestClient>> &clients) {
|
||||
static const auto HOSTNAME =
|
||||
utils::GetHostname().value_or("unknown_hostname");
|
||||
static const auto TEST_PREFIX = fmt::format("{}.long_running.{}.{}", HOSTNAME,
|
||||
FLAGS_group, FLAGS_scenario);
|
||||
static const auto EXECUTED_QUERIES =
|
||||
fmt::format("{}.executed_queries", TEST_PREFIX);
|
||||
|
||||
CHECK((int)clients.size() == FLAGS_num_workers);
|
||||
|
||||
// Open stream for writing stats.
|
||||
@ -110,7 +110,6 @@ void RunMultithreadedTest(std::vector<std::unique_ptr<TestClient>> &clients) {
|
||||
client->Run();
|
||||
}
|
||||
LOG(INFO) << "Starting test with " << clients.size() << " workers";
|
||||
uint64_t executed_queries = 0;
|
||||
while (timer.Elapsed().count() < FLAGS_duration) {
|
||||
std::unordered_map<std::string, std::map<std::string, DecodedValue>>
|
||||
aggregated_stats;
|
||||
@ -125,8 +124,6 @@ void RunMultithreadedTest(std::vector<std::unique_ptr<TestClient>> &clients) {
|
||||
auto &query_stats = stats[client_query_stats.first];
|
||||
query_stats.insert(query_stats.end(), client_query_stats.second.begin(),
|
||||
client_query_stats.second.end());
|
||||
executed_queries +=
|
||||
client_query_stats.second.end() - client_query_stats.second.begin();
|
||||
}
|
||||
}
|
||||
|
||||
@ -134,7 +131,10 @@ void RunMultithreadedTest(std::vector<std::unique_ptr<TestClient>> &clients) {
|
||||
// little bit chaotic. Think about refactoring this part to only use json
|
||||
// and write DecodedValue to json converter.
|
||||
const std::vector<std::string> fields = {
|
||||
"wall_time", "parsing_time", "planning_time", "plan_execution_time",
|
||||
"wall_time",
|
||||
"parsing_time",
|
||||
"planning_time",
|
||||
"plan_execution_time",
|
||||
};
|
||||
for (const auto &query_stats : stats) {
|
||||
std::map<std::string, double> new_aggregated_query_stats;
|
||||
@ -161,17 +161,16 @@ void RunMultithreadedTest(std::vector<std::unique_ptr<TestClient>> &clients) {
|
||||
}
|
||||
}
|
||||
|
||||
LogStat(EXECUTED_QUERIES, executed_queries);
|
||||
|
||||
out << "{\"num_executed_queries\": " << executed_queries << ", "
|
||||
out << "{\"num_executed_queries\": " << executed_queries.Value() << ", "
|
||||
<< "\"elapsed_time\": " << timer.Elapsed().count()
|
||||
<< ", \"queries\": [";
|
||||
utils::PrintIterable(out, aggregated_stats, ", ", [](auto &stream,
|
||||
const auto &x) {
|
||||
stream << "{\"query\": " << nlohmann::json(x.first) << ", \"stats\": ";
|
||||
PrintJsonDecodedValue(stream, DecodedValue(x.second));
|
||||
stream << "}";
|
||||
});
|
||||
utils::PrintIterable(
|
||||
out, aggregated_stats, ", ", [](auto &stream, const auto &x) {
|
||||
stream << "{\"query\": " << nlohmann::json(x.first)
|
||||
<< ", \"stats\": ";
|
||||
PrintJsonDecodedValue(stream, DecodedValue(x.second));
|
||||
stream << "}";
|
||||
});
|
||||
out << "]}" << std::endl;
|
||||
out.flush();
|
||||
std::this_thread::sleep_for(1s);
|
||||
|
90
tests/unit/metrics.cpp
Normal file
90
tests/unit/metrics.cpp
Normal file
@ -0,0 +1,90 @@
|
||||
#include "stats/metrics.hpp"
|
||||
|
||||
#include <thread>
|
||||
|
||||
#include "gtest/gtest.h"
|
||||
|
||||
using namespace std::chrono_literals;
|
||||
|
||||
using namespace stats;
|
||||
|
||||
TEST(Metrics, Counter) {
|
||||
Counter &x = GetCounter("counter");
|
||||
EXPECT_EQ(*x.Flush(), 0);
|
||||
EXPECT_EQ(x.Value(), 0);
|
||||
x.Bump();
|
||||
EXPECT_EQ(*x.Flush(), 1);
|
||||
EXPECT_EQ(x.Value(), 1);
|
||||
|
||||
Counter &y = GetCounter("counter");
|
||||
EXPECT_EQ(*y.Flush(), 1);
|
||||
EXPECT_EQ(y.Value(), 1);
|
||||
|
||||
y.Bump(5);
|
||||
EXPECT_EQ(*x.Flush(), 6);
|
||||
EXPECT_EQ(x.Value(), 6);
|
||||
EXPECT_EQ(*y.Flush(), 6);
|
||||
EXPECT_EQ(y.Value(), 6);
|
||||
}
|
||||
|
||||
TEST(Metrics, Gauge) {
|
||||
Gauge &x = GetGauge("gauge");
|
||||
EXPECT_EQ(*x.Flush(), 0);
|
||||
x.Set(1);
|
||||
EXPECT_EQ(*x.Flush(), 1);
|
||||
|
||||
Gauge &y = GetGauge("gauge");
|
||||
EXPECT_EQ(*y.Flush(), 1);
|
||||
|
||||
x.Set(2);
|
||||
EXPECT_EQ(*x.Flush(), 2);
|
||||
EXPECT_EQ(*y.Flush(), 2);
|
||||
}
|
||||
|
||||
TEST(Metrics, IntervalMin) {
|
||||
IntervalMin &x = GetIntervalMin("min");
|
||||
EXPECT_EQ(x.Flush(), std::experimental::nullopt);
|
||||
x.Add(5);
|
||||
x.Add(3);
|
||||
EXPECT_EQ(*x.Flush(), 3);
|
||||
EXPECT_EQ(x.Flush(), std::experimental::nullopt);
|
||||
x.Add(3);
|
||||
x.Add(5);
|
||||
EXPECT_EQ(*x.Flush(), 3);
|
||||
EXPECT_EQ(x.Flush(), std::experimental::nullopt);
|
||||
}
|
||||
|
||||
TEST(Metrics, IntervalMax) {
|
||||
IntervalMax &x = GetIntervalMax("max");
|
||||
EXPECT_EQ(x.Flush(), std::experimental::nullopt);
|
||||
x.Add(5);
|
||||
x.Add(3);
|
||||
EXPECT_EQ(*x.Flush(), 5);
|
||||
EXPECT_EQ(x.Flush(), std::experimental::nullopt);
|
||||
x.Add(3);
|
||||
x.Add(5);
|
||||
EXPECT_EQ(*x.Flush(), 5);
|
||||
EXPECT_EQ(x.Flush(), std::experimental::nullopt);
|
||||
}
|
||||
|
||||
TEST(Metrics, Stopwatch) {
|
||||
auto d1 = Stopwatch("stopwatch", [] { std::this_thread::sleep_for(150ms); });
|
||||
EXPECT_TRUE(140 <= d1 && d1 <= 160);
|
||||
|
||||
auto d2 = Stopwatch("stopwatch", [] { std::this_thread::sleep_for(300ms); });
|
||||
EXPECT_TRUE(290 <= d2 && d2 <= 310);
|
||||
|
||||
Counter &total_time = GetCounter("stopwatch.total_time");
|
||||
Counter &count = GetCounter("stopwatch.count");
|
||||
IntervalMin &min = GetIntervalMin("stopwatch.min");
|
||||
IntervalMax &max = GetIntervalMax("stopwatch.max");
|
||||
|
||||
EXPECT_TRUE(430 <= total_time.Value() && total_time.Value() <= 470);
|
||||
EXPECT_EQ(count.Value(), 2);
|
||||
|
||||
auto m = *min.Flush();
|
||||
EXPECT_TRUE(140 <= m && m <= 160);
|
||||
|
||||
auto M = *max.Flush();
|
||||
EXPECT_TRUE(290 <= M && M <= 310);
|
||||
}
|
@ -3,6 +3,7 @@
|
||||
#include "communication/rpc/server.hpp"
|
||||
#include "io/network/socket.hpp"
|
||||
#include "stats/stats.hpp"
|
||||
#include "stats/stats_rpc_messages.hpp"
|
||||
#include "utils/flag_validation.hpp"
|
||||
|
||||
DEFINE_string(interface, "0.0.0.0",
|
||||
|
@ -1,6 +1,8 @@
|
||||
#include "gflags/gflags.h"
|
||||
#include "glog/logging.h"
|
||||
|
||||
#include "stats/stats.hpp"
|
||||
#include "stats/stats_rpc_messages.hpp"
|
||||
#include "utils/string.hpp"
|
||||
|
||||
// TODO (buda): move this logic to a unit test
|
||||
@ -47,7 +49,7 @@ int main(int argc, char *argv[]) {
|
||||
LOG(INFO) << "Usage: metric_path tag1=value1 ... tagn=valuen "
|
||||
"metric_value";
|
||||
|
||||
InitStatsLogging();
|
||||
stats::InitStatsLogging();
|
||||
|
||||
std::string line;
|
||||
std::string metric_path;
|
||||
@ -60,7 +62,7 @@ int main(int argc, char *argv[]) {
|
||||
LOG(ERROR) << "Invalid input";
|
||||
continue;
|
||||
}
|
||||
LogStat(metric_path, value, tags);
|
||||
stats::LogStat(metric_path, value, tags);
|
||||
}
|
||||
|
||||
return 0;
|
||||
|
Loading…
Reference in New Issue
Block a user