diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index f0b33837b..c30e99fcf 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -38,6 +38,7 @@ set(memgraph_src_files query/plan/rule_based_planner.cpp query/plan/variable_start_planner.cpp query/typed_value.cpp + stats/metrics.cpp stats/stats.cpp storage/concurrent_id_mapper_master.cpp storage/concurrent_id_mapper_worker.cpp diff --git a/src/communication/rpc/server.cpp b/src/communication/rpc/server.cpp index 903b39f2b..752830a32 100644 --- a/src/communication/rpc/server.cpp +++ b/src/communication/rpc/server.cpp @@ -6,6 +6,7 @@ #include "boost/serialization/unique_ptr.hpp" #include "communication/rpc/server.hpp" +#include "stats/metrics.hpp" namespace communication::rpc { @@ -36,12 +37,26 @@ void System::Remove(const Server &server) { services_.erase(it); } +std::string RequestName(const std::string &service_name, + const std::type_index &msg_type_id) { + int s; + char *message_type = abi::__cxa_demangle(msg_type_id.name(), NULL, NULL, &s); + std::string ret; + if (s == 0) { + ret = fmt::format("rpc.server.{}.{}", service_name, message_type); + } else { + ret = fmt::format("rpc.server.{}.unknown", service_name); + } + free(message_type); + return ret; +} + Server::Server(System &system, const std::string &service_name, int workers_count) : system_(system), service_name_(service_name) { system_.Add(*this); for (int i = 0; i < workers_count; ++i) { - threads_.push_back(std::thread([this]() { + threads_.push_back(std::thread([this, service_name]() { // TODO: Add logging. while (alive_) { auto task = queue_.AwaitPop(); @@ -52,7 +67,12 @@ Server::Server(System &system, const std::string &service_name, auto callbacks_accessor = callbacks_.access(); auto it = callbacks_accessor.find(message->type_index()); if (it == callbacks_accessor.end()) continue; - auto response = it->second(*(message.get())); + + auto req_name = RequestName(service_name, message->type_index()); + std::unique_ptr response = nullptr; + + stats::Stopwatch(req_name, + [&] { response = it->second(*(message.get())); }); SendMessage(*socket, message_id, response); } })); diff --git a/src/memgraph_bolt.cpp b/src/memgraph_bolt.cpp index e77ee28dc..9e6f9e960 100644 --- a/src/memgraph_bolt.cpp +++ b/src/memgraph_bolt.cpp @@ -166,8 +166,8 @@ int main(int argc, char **argv) { // Unhandled exception handler init. std::set_terminate(&terminate_handler); - InitStatsLogging(); - utils::OnScopeExit stop_stats([] { StopStatsLogging(); }); + stats::InitStatsLogging(); + utils::OnScopeExit stop_stats([] { stats::StopStatsLogging(); }); CHECK(!(FLAGS_master && FLAGS_worker)) << "Can't run Memgraph as worker and master at the same time"; diff --git a/src/stats/metrics.cpp b/src/stats/metrics.cpp new file mode 100644 index 000000000..69552f45b --- /dev/null +++ b/src/stats/metrics.cpp @@ -0,0 +1,105 @@ +#include "stats/metrics.hpp" + +#include + +#include "fmt/format.h" +#include "glog/logging.h" + +namespace stats { + +std::mutex &MetricsMutex() { + static std::mutex mutex; + return mutex; +} + +std::map> &AccessMetrics() { + static std::map> metrics; + MetricsMutex().lock(); + return metrics; +} + +void ReleaseMetrics() { MetricsMutex().unlock(); } + +Metric::Metric(int64_t start_value) : value_(start_value) {} + +Counter::Counter(int64_t start_value) : Metric(start_value) {} + +void Counter::Bump(int64_t delta) { value_ += delta; } + +std::experimental::optional Counter::Flush() { return value_; } + +int64_t Counter::Value() { return value_; } + +Gauge::Gauge(int64_t start_value) : Metric(start_value) {} + +void Gauge::Set(int64_t value) { value_ = value; } + +std::experimental::optional Gauge::Flush() { return value_; } + +IntervalMin::IntervalMin(int64_t start_value) : Metric(start_value) {} + +void IntervalMin::Add(int64_t value) { + int64_t curr = value_; + while (curr > value && !value_.compare_exchange_weak(curr, value)) + ; +} + +std::experimental::optional IntervalMin::Flush() { + int64_t curr = value_; + value_.compare_exchange_weak(curr, std::numeric_limits::max()); + return curr == std::numeric_limits::max() + ? std::experimental::nullopt + : std::experimental::make_optional(curr); +} + +IntervalMax::IntervalMax(int64_t start_value) : Metric(start_value) {} + +void IntervalMax::Add(int64_t value) { + int64_t curr = value_; + while (curr < value && !value_.compare_exchange_weak(curr, value)) + ; +} + +std::experimental::optional IntervalMax::Flush() { + int64_t curr = value_; + value_.compare_exchange_weak(curr, std::numeric_limits::min()); + return curr == std::numeric_limits::min() + ? std::experimental::nullopt + : std::experimental::make_optional(curr); +} + +template +T &GetMetric(const std::string &name, int64_t start_value) { + auto &metrics = AccessMetrics(); + auto it = metrics.find(name); + if (it == metrics.end()) { + auto got = metrics.emplace(name, std::make_unique(start_value)); + CHECK(got.second) << "Failed to create counter " << name; + it = got.first; + } + ReleaseMetrics(); + auto *ptr = dynamic_cast(it->second.get()); + if (!ptr) { + LOG(FATAL) << fmt::format("GetMetric({}) called with invalid metric type", + name); + } + return *ptr; +} + +Counter &GetCounter(const std::string &name, int64_t start_value) { + return GetMetric(name, start_value); +} + +Gauge &GetGauge(const std::string &name, int64_t start_value) { + return GetMetric(name, start_value); +} + +IntervalMin &GetIntervalMin(const std::string &name) { + return GetMetric(name, std::numeric_limits::max()); +} + +IntervalMax &GetIntervalMax(const std::string &name) { + return GetMetric(name, std::numeric_limits::min()); +} + +} // namespace stats diff --git a/src/stats/metrics.hpp b/src/stats/metrics.hpp new file mode 100644 index 000000000..c13bcff18 --- /dev/null +++ b/src/stats/metrics.hpp @@ -0,0 +1,202 @@ +/** + * @file + * + * This file contains some metrics types that can be aggregated on client side + * and periodically flushed to StatsD. + */ +#pragma once + +#include +#include +#include +#include +#include +#include + +#include "fmt/format.h" + +namespace stats { + +// TODO(mtomic): it would probably be nice to have Value method for every metric +// type, however, there is no use case for this yet + +/** + * Abstract base class for all metrics. + */ +class Metric { + public: + /** + * Constructs a metric to be exported to StatsD. + * + * @param name metric will be exported to StatsD with this path + * @param value initial value + */ + virtual ~Metric() {} + + /** + * Metric refresh thread will periodically call this function. It should + * return the metric value aggregated since the last flush call or nullopt + * if there were no updates. + */ + virtual std::experimental::optional Flush() = 0; + + explicit Metric(int64_t start_value = 0); + + protected: + std::atomic value_; +}; + +/** + * A simple counter. + */ +class Counter : public Metric { + public: + explicit Counter(int64_t start_value = 0); + + /** + * Change counter value by delta. + * + * @param delta value change + */ + void Bump(int64_t delta = 1); + + /** Returns the current value of the counter. **/ + std::experimental::optional Flush() override; + + /** Returns the current value of the counter. **/ + int64_t Value(); + + friend Counter &GetCounter(const std::string &name); +}; + +/** + * To be used instead of Counter constructor. If counter with this name doesn't + * exist, it will be initialized with start_value. + * + * @param name counter name + * @param start_value start value + */ +Counter &GetCounter(const std::string &name, int64_t start_value = 0); + +/** + * A simple gauge. Gauge value is explicitly set, instead of being added to or + * subtracted from. + */ +class Gauge : public Metric { + public: + explicit Gauge(int64_t start_value = 0); + + /** + * Set gauge value. + * + * @param value value to be set + */ + void Set(int64_t value); + + /** Returns the current gauge value. **/ + std::experimental::optional Flush() override; +}; + +/** + * To be used instead of Gauge constructor. If gauge with this name doesn't + * exist, it will be initialized with start_value. + * + * @param name gauge name + * @param start_value start value + */ +Gauge &GetGauge(const std::string &name, int64_t start_value = 0); + +/** + * Aggregates minimum between two flush periods. + */ +class IntervalMin : public Metric { + public: + explicit IntervalMin(int64_t start_value); + + /** + * Add another value into the minimum computation. + * + * @param value value to be added + */ + void Add(int64_t value); + + /** + * Returns the minimum value encountered since the last flush period, + * or nullopt if no values were added. + */ + std::experimental::optional Flush() override; +}; + +/** + * To be used instead of IntervalMin constructor. + * + * @param name interval min name + */ +IntervalMin &GetIntervalMin(const std::string &name); + +/** + * Aggregates maximum betweenw two flush periods. + */ +class IntervalMax : public Metric { + public: + explicit IntervalMax(int64_t start_value); + + /** + * Add another value into the maximum computation. + */ + void Add(int64_t value); + + /** + * Returns the maximum value encountered since the last flush period, + * or nullopt if no values were added. + */ + std::experimental::optional Flush() override; +}; + +/** + * To be used instead of IntervalMax constructor. + * + * @param name interval max name + */ +IntervalMax &GetIntervalMax(const std::string &name); + +/** + * A stopwatch utility. It exports 4 metrics: total time measured since the + * beginning of the program, total number of times time intervals measured, + * minimum and maximum time interval measured since the last metric flush. + * Metrics exported by the stopwatch will be named + * [name].{total_time|count|min|max}. + * + * @param name timed event name + * @param f Callable, an action to be performed. + */ +template +int64_t Stopwatch(const std::string &name, Function f) { + auto &total_time = GetCounter(fmt::format("{}.total_time", name)); + auto &count = GetCounter(fmt::format("{}.count", name)); + auto &min = GetIntervalMin(fmt::format("{}.min", name)); + auto &max = GetIntervalMax(fmt::format("{}.max", name)); + auto start = std::chrono::system_clock::now(); + f(); + auto duration = std::chrono::duration_cast( + std::chrono::system_clock::now() - start) + .count(); + total_time.Bump(duration); + count.Bump(); + min.Add(duration); + max.Add(duration); + return duration; +} + +/** + * Access internal metric list. You probably don't want to use this, + * but if you do, make sure to call ReleaseMetrics when you're done. + */ +std::map> &AccessMetrics(); + +/** + * Releases internal lock on metric list. + */ +void ReleaseMetrics(); + +} // namespace stats diff --git a/src/stats/stats.cpp b/src/stats/stats.cpp index 0d76a4ddf..43fd816ad 100644 --- a/src/stats/stats.cpp +++ b/src/stats/stats.cpp @@ -1,7 +1,11 @@ +#include "stats/stats.hpp" + #include "glog/logging.h" #include "communication/rpc/client.hpp" -#include "stats/stats.hpp" +#include "data_structures/concurrent/push_queue.hpp" + +#include "stats/stats_rpc_messages.hpp" DEFINE_HIDDEN_string(statsd_address, "", "Stats server IP address"); DEFINE_HIDDEN_int32(statsd_port, 2500, "Stats server port"); @@ -10,82 +14,100 @@ DEFINE_HIDDEN_int32(statsd_flush_interval, 500, namespace stats { -const std::string kStatsServiceName = "stats"; +const std::string kStatsServiceName = "statsd-service"; -StatsClient::StatsClient() {} +std::string statsd_prefix = ""; +std::thread stats_dispatch_thread; +std::thread counter_refresh_thread; +std::atomic stats_running{false}; +ConcurrentPushQueue stats_queue; -void StatsClient::Log(StatsReq req) { - if (is_running_) { - queue_.push(req); +void RefreshMetrics() { + LOG(INFO) << "Metrics flush thread started"; + while (stats_running) { + auto &metrics = AccessMetrics(); + for (auto &kv : metrics) { + auto value = kv.second->Flush(); + if (value) { + LogStat(kv.first, *value); + } + } + ReleaseMetrics(); + // TODO(mtomic): hardcoded sleep time + std::this_thread::sleep_for(std::chrono::seconds(1)); } + LOG(INFO) << "Metrics flush thread stopped"; } -void StatsClient::Start(const io::network::Endpoint &endpoint) { + +void StatsDispatchMain(const io::network::Endpoint &endpoint) { // TODO(mtomic): we probably want to batch based on request size and MTU const int MAX_BATCH_SIZE = 100; - dispatch_thread_ = std::thread([this, endpoint]() { - CHECK(!is_running_) << "Stats logging already initialized!"; - LOG(INFO) << "Stats dispatcher thread started"; + LOG(INFO) << "Stats dispatcher thread started"; - communication::rpc::Client client(endpoint, kStatsServiceName); + communication::rpc::Client client(endpoint, kStatsServiceName); - BatchStatsReq batch_request; - batch_request.requests.reserve(MAX_BATCH_SIZE); + BatchStatsReq batch_request; + batch_request.requests.reserve(MAX_BATCH_SIZE); - is_running_ = true; - while (is_running_) { - auto last = queue_.begin(); - size_t sent = 0, total = 0; + while (stats_running) { + auto last = stats_queue.begin(); + size_t sent = 0, total = 0; - auto flush_batch = [&] { - if (auto rep = client.Call(batch_request)) { - sent += batch_request.requests.size(); - } - total += batch_request.requests.size(); - batch_request.requests.clear(); - }; - - for (auto it = last; it != queue_.end(); it++) { - batch_request.requests.emplace_back(std::move(*it)); - if (batch_request.requests.size() == MAX_BATCH_SIZE) { - flush_batch(); - } + auto flush_batch = [&] { + if (auto rep = client.Call(batch_request)) { + sent += batch_request.requests.size(); } + total += batch_request.requests.size(); + batch_request.requests.clear(); + }; - if (!batch_request.requests.empty()) { + for (auto it = last; it != stats_queue.end(); it++) { + batch_request.requests.emplace_back(std::move(*it)); + if (batch_request.requests.size() == MAX_BATCH_SIZE) { flush_batch(); } - - VLOG(10) << fmt::format("Sent {} out of {} events from queue.", sent, - total); - last.delete_tail(); - std::this_thread::sleep_for( - std::chrono::milliseconds(FLAGS_statsd_flush_interval)); } - }); -} -void StatsClient::Stop() { - if (is_running_) { - is_running_ = false; - dispatch_thread_.join(); + if (!batch_request.requests.empty()) { + flush_batch(); + } + + VLOG(10) << fmt::format("Sent {} out of {} events from queue.", sent, + total); + last.delete_tail(); + std::this_thread::sleep_for( + std::chrono::milliseconds(FLAGS_statsd_flush_interval)); } } -StatsClient client; - -} // namespace stats - -void InitStatsLogging() { - if (FLAGS_statsd_address != "") { - stats::client.Start({FLAGS_statsd_address, (uint16_t)FLAGS_statsd_port}); - } -} - -void StopStatsLogging() { stats::client.Stop(); } - void LogStat(const std::string &metric_path, double value, const std::vector> &tags) { - stats::client.Log({metric_path, tags, value}); + if (stats_running) { + stats_queue.push(statsd_prefix + metric_path, tags, value); + } } + +void InitStatsLogging(std::string prefix) { + if (!prefix.empty()) { + statsd_prefix = prefix + "."; + } + if (FLAGS_statsd_address != "") { + stats_running = true; + stats_dispatch_thread = std::thread( + StatsDispatchMain, io::network::Endpoint{FLAGS_statsd_address, + (uint16_t)FLAGS_statsd_port}); + counter_refresh_thread = std::thread(RefreshMetrics); + } +} + +void StopStatsLogging() { + if (stats_running) { + stats_running = false; + stats_dispatch_thread.join(); + counter_refresh_thread.join(); + } +} + +} // namespace stats diff --git a/src/stats/stats.hpp b/src/stats/stats.hpp index c50ce0951..b3dd2f703 100644 --- a/src/stats/stats.hpp +++ b/src/stats/stats.hpp @@ -3,44 +3,31 @@ #pragma once #include +#include -#include "data_structures/concurrent/push_queue.hpp" -#include "io/network/endpoint.hpp" -#include "stats/stats_rpc_messages.hpp" +#include "gflags/gflags.h" -DECLARE_string(statsd_address); -DECLARE_int32(statsd_port); -DECLARE_int32(statsd_flush_interval); +#include "stats/metrics.hpp" namespace stats { -// TODO (buda): documentation + tests -class StatsClient { - public: - StatsClient(); +/** + * Start sending metrics to StatsD server. + * + * @param prefix prefix to prepend to exported keys + */ +void InitStatsLogging(std::string prefix = ""); - // To be clear. - StatsClient(const StatsClient &other) = delete; - StatsClient(StatsClient &&other) = delete; - StatsClient &operator=(const StatsClient &) = delete; - StatsClient &operator=(StatsClient &&) = delete; - - void Log(StatsReq req); - - void Start(const io::network::Endpoint &endpoint); - void Stop(); - - private: - ConcurrentPushQueue queue_; - std::atomic is_running_{false}; - std::thread dispatch_thread_; -}; - -} // namespace stats - -// TODO (buda): ON/OFF compile OR runtime parameter? - -void InitStatsLogging(); +/** + * Stop sending metrics to StatsD server. This should be called before exiting + * program. + */ void StopStatsLogging(); + +/** + * Send a value to StatsD with current timestamp. + */ void LogStat(const std::string &metric_path, double value, const std::vector> &tags = {}); + +} // namespace stats diff --git a/tests/macro_benchmark/clients/card_fraud_client.cpp b/tests/macro_benchmark/clients/card_fraud_client.cpp index ce2eb5dfd..469fc60b3 100644 --- a/tests/macro_benchmark/clients/card_fraud_client.cpp +++ b/tests/macro_benchmark/clients/card_fraud_client.cpp @@ -6,6 +6,7 @@ #include "long_running_common.hpp" #include "stats/stats.hpp" +#include "stats/stats_rpc_messages.hpp" // TODO(mtomic): this sucks but I don't know a different way to make it work #include "boost/archive/binary_iarchive.hpp" @@ -16,7 +17,6 @@ BOOST_CLASS_EXPORT(stats::StatsRes); BOOST_CLASS_EXPORT(stats::BatchStatsReq); BOOST_CLASS_EXPORT(stats::BatchStatsRes); - class CardFraudClient : public TestClient { public: CardFraudClient(int id, int num_pos, nlohmann::json config) @@ -108,7 +108,7 @@ int main(int argc, char **argv) { gflags::ParseCommandLineFlags(&argc, &argv, true); google::InitGoogleLogging(argv[0]); - InitStatsLogging(); + stats::InitStatsLogging(); nlohmann::json config; std::cin >> config; @@ -128,7 +128,7 @@ int main(int argc, char **argv) { RunMultithreadedTest(clients); - StopStatsLogging(); + stats::StopStatsLogging(); return 0; } diff --git a/tests/macro_benchmark/clients/long_running_common.hpp b/tests/macro_benchmark/clients/long_running_common.hpp index f00f23c15..15aa18c85 100644 --- a/tests/macro_benchmark/clients/long_running_common.hpp +++ b/tests/macro_benchmark/clients/long_running_common.hpp @@ -5,6 +5,7 @@ #include "bolt_client.hpp" #include "common.hpp" #include "communication/bolt/v1/decoder/decoded_value.hpp" +#include "stats/metrics.hpp" #include "stats/stats.hpp" #include "utils/network.hpp" #include "utils/timer.hpp" @@ -22,6 +23,11 @@ DEFINE_int32(duration, 30, "Number of seconds to execute benchmark"); DEFINE_string(group, "unknown", "Test group name"); DEFINE_string(scenario, "unknown", "Test scenario name"); +static const auto EXECUTED_QUERIES = + fmt::format("{}.{}.executed_queries", FLAGS_group, FLAGS_scenario); + +auto &executed_queries = stats::GetCounter(EXECUTED_QUERIES); + class TestClient { public: TestClient() @@ -69,6 +75,7 @@ class TestClient { stats_[query].push_back(std::move(metadata)); } } + executed_queries.Bump(); return result; } @@ -85,13 +92,6 @@ class TestClient { }; void RunMultithreadedTest(std::vector> &clients) { - static const auto HOSTNAME = - utils::GetHostname().value_or("unknown_hostname"); - static const auto TEST_PREFIX = fmt::format("{}.long_running.{}.{}", HOSTNAME, - FLAGS_group, FLAGS_scenario); - static const auto EXECUTED_QUERIES = - fmt::format("{}.executed_queries", TEST_PREFIX); - CHECK((int)clients.size() == FLAGS_num_workers); // Open stream for writing stats. @@ -110,7 +110,6 @@ void RunMultithreadedTest(std::vector> &clients) { client->Run(); } LOG(INFO) << "Starting test with " << clients.size() << " workers"; - uint64_t executed_queries = 0; while (timer.Elapsed().count() < FLAGS_duration) { std::unordered_map> aggregated_stats; @@ -125,8 +124,6 @@ void RunMultithreadedTest(std::vector> &clients) { auto &query_stats = stats[client_query_stats.first]; query_stats.insert(query_stats.end(), client_query_stats.second.begin(), client_query_stats.second.end()); - executed_queries += - client_query_stats.second.end() - client_query_stats.second.begin(); } } @@ -134,7 +131,10 @@ void RunMultithreadedTest(std::vector> &clients) { // little bit chaotic. Think about refactoring this part to only use json // and write DecodedValue to json converter. const std::vector fields = { - "wall_time", "parsing_time", "planning_time", "plan_execution_time", + "wall_time", + "parsing_time", + "planning_time", + "plan_execution_time", }; for (const auto &query_stats : stats) { std::map new_aggregated_query_stats; @@ -161,17 +161,16 @@ void RunMultithreadedTest(std::vector> &clients) { } } - LogStat(EXECUTED_QUERIES, executed_queries); - - out << "{\"num_executed_queries\": " << executed_queries << ", " + out << "{\"num_executed_queries\": " << executed_queries.Value() << ", " << "\"elapsed_time\": " << timer.Elapsed().count() << ", \"queries\": ["; - utils::PrintIterable(out, aggregated_stats, ", ", [](auto &stream, - const auto &x) { - stream << "{\"query\": " << nlohmann::json(x.first) << ", \"stats\": "; - PrintJsonDecodedValue(stream, DecodedValue(x.second)); - stream << "}"; - }); + utils::PrintIterable( + out, aggregated_stats, ", ", [](auto &stream, const auto &x) { + stream << "{\"query\": " << nlohmann::json(x.first) + << ", \"stats\": "; + PrintJsonDecodedValue(stream, DecodedValue(x.second)); + stream << "}"; + }); out << "]}" << std::endl; out.flush(); std::this_thread::sleep_for(1s); diff --git a/tests/unit/metrics.cpp b/tests/unit/metrics.cpp new file mode 100644 index 000000000..25fd15a7f --- /dev/null +++ b/tests/unit/metrics.cpp @@ -0,0 +1,90 @@ +#include "stats/metrics.hpp" + +#include + +#include "gtest/gtest.h" + +using namespace std::chrono_literals; + +using namespace stats; + +TEST(Metrics, Counter) { + Counter &x = GetCounter("counter"); + EXPECT_EQ(*x.Flush(), 0); + EXPECT_EQ(x.Value(), 0); + x.Bump(); + EXPECT_EQ(*x.Flush(), 1); + EXPECT_EQ(x.Value(), 1); + + Counter &y = GetCounter("counter"); + EXPECT_EQ(*y.Flush(), 1); + EXPECT_EQ(y.Value(), 1); + + y.Bump(5); + EXPECT_EQ(*x.Flush(), 6); + EXPECT_EQ(x.Value(), 6); + EXPECT_EQ(*y.Flush(), 6); + EXPECT_EQ(y.Value(), 6); +} + +TEST(Metrics, Gauge) { + Gauge &x = GetGauge("gauge"); + EXPECT_EQ(*x.Flush(), 0); + x.Set(1); + EXPECT_EQ(*x.Flush(), 1); + + Gauge &y = GetGauge("gauge"); + EXPECT_EQ(*y.Flush(), 1); + + x.Set(2); + EXPECT_EQ(*x.Flush(), 2); + EXPECT_EQ(*y.Flush(), 2); +} + +TEST(Metrics, IntervalMin) { + IntervalMin &x = GetIntervalMin("min"); + EXPECT_EQ(x.Flush(), std::experimental::nullopt); + x.Add(5); + x.Add(3); + EXPECT_EQ(*x.Flush(), 3); + EXPECT_EQ(x.Flush(), std::experimental::nullopt); + x.Add(3); + x.Add(5); + EXPECT_EQ(*x.Flush(), 3); + EXPECT_EQ(x.Flush(), std::experimental::nullopt); +} + +TEST(Metrics, IntervalMax) { + IntervalMax &x = GetIntervalMax("max"); + EXPECT_EQ(x.Flush(), std::experimental::nullopt); + x.Add(5); + x.Add(3); + EXPECT_EQ(*x.Flush(), 5); + EXPECT_EQ(x.Flush(), std::experimental::nullopt); + x.Add(3); + x.Add(5); + EXPECT_EQ(*x.Flush(), 5); + EXPECT_EQ(x.Flush(), std::experimental::nullopt); +} + +TEST(Metrics, Stopwatch) { + auto d1 = Stopwatch("stopwatch", [] { std::this_thread::sleep_for(150ms); }); + EXPECT_TRUE(140 <= d1 && d1 <= 160); + + auto d2 = Stopwatch("stopwatch", [] { std::this_thread::sleep_for(300ms); }); + EXPECT_TRUE(290 <= d2 && d2 <= 310); + + Counter &total_time = GetCounter("stopwatch.total_time"); + Counter &count = GetCounter("stopwatch.count"); + IntervalMin &min = GetIntervalMin("stopwatch.min"); + IntervalMax &max = GetIntervalMax("stopwatch.max"); + + EXPECT_TRUE(430 <= total_time.Value() && total_time.Value() <= 470); + EXPECT_EQ(count.Value(), 2); + + auto m = *min.Flush(); + EXPECT_TRUE(140 <= m && m <= 160); + + auto M = *max.Flush(); + EXPECT_TRUE(290 <= M && M <= 310); +} diff --git a/tools/src/mg_statsd/main.cpp b/tools/src/mg_statsd/main.cpp index 1c8fafec8..29aa0168d 100644 --- a/tools/src/mg_statsd/main.cpp +++ b/tools/src/mg_statsd/main.cpp @@ -3,6 +3,7 @@ #include "communication/rpc/server.hpp" #include "io/network/socket.hpp" #include "stats/stats.hpp" +#include "stats/stats_rpc_messages.hpp" #include "utils/flag_validation.hpp" DEFINE_string(interface, "0.0.0.0", diff --git a/tools/tests/statsd/mg_statsd_client.cpp b/tools/tests/statsd/mg_statsd_client.cpp index e48a8821a..778417724 100644 --- a/tools/tests/statsd/mg_statsd_client.cpp +++ b/tools/tests/statsd/mg_statsd_client.cpp @@ -1,6 +1,8 @@ #include "gflags/gflags.h" +#include "glog/logging.h" #include "stats/stats.hpp" +#include "stats/stats_rpc_messages.hpp" #include "utils/string.hpp" // TODO (buda): move this logic to a unit test @@ -47,7 +49,7 @@ int main(int argc, char *argv[]) { LOG(INFO) << "Usage: metric_path tag1=value1 ... tagn=valuen " "metric_value"; - InitStatsLogging(); + stats::InitStatsLogging(); std::string line; std::string metric_path; @@ -60,7 +62,7 @@ int main(int argc, char *argv[]) { LOG(ERROR) << "Invalid input"; continue; } - LogStat(metric_path, value, tags); + stats::LogStat(metric_path, value, tags); } return 0;