diff --git a/apollo_archives.yaml b/apollo_archives.yaml index 5f83c3419..e6b0f0571 100644 --- a/apollo_archives.yaml +++ b/apollo_archives.yaml @@ -6,7 +6,6 @@ - build_release/memgraph_ha - build_release/tools/src/mg_client - build_release/tools/src/mg_import_csv - - build_release/tools/src/mg_statsd - config filename: binaries.tar.gz diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index de142f53b..616058004 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -7,7 +7,6 @@ add_subdirectory(requests) add_subdirectory(io) add_subdirectory(telemetry) add_subdirectory(communication) -add_subdirectory(stats) add_subdirectory(auth) add_subdirectory(slk) add_subdirectory(storage/v2) diff --git a/src/stats/CMakeLists.txt b/src/stats/CMakeLists.txt deleted file mode 100644 index 64fa053fe..000000000 --- a/src/stats/CMakeLists.txt +++ /dev/null @@ -1,13 +0,0 @@ -set(stats_src_files - metrics.cpp - stats.cpp) - -define_add_lcp(add_lcp stats_src_files stats_lcp_files) - -add_lcp(stats_rpc_messages.lcp SLK_SERIALIZE) - -add_custom_target(generate_stats_lcp DEPENDS ${stats_lcp_files}) - -add_library(mg-stats STATIC ${stats_src_files}) -target_link_libraries(mg-stats Threads::Threads mg-utils mg-io mg-comm-rpc fmt glog gflags) -add_dependencies(mg-stats generate_stats_lcp) diff --git a/src/stats/metrics.cpp b/src/stats/metrics.cpp deleted file mode 100644 index 684f95d2e..000000000 --- a/src/stats/metrics.cpp +++ /dev/null @@ -1,103 +0,0 @@ -#include "stats/metrics.hpp" - -#include <tuple> - -#include "fmt/format.h" -#include "glog/logging.h" - -namespace stats { - -std::mutex &MetricsMutex() { - static std::mutex mutex; - return mutex; -} - -std::map<std::string, std::unique_ptr<Metric>> &AccessMetrics() { - static std::map<std::string, std::unique_ptr<Metric>> metrics; - MetricsMutex().lock(); - return metrics; -} - -void ReleaseMetrics() { MetricsMutex().unlock(); } - -Metric::Metric(int64_t start_value) : value_(start_value) {} - -Counter::Counter(int64_t start_value) : Metric(start_value) {} - -void Counter::Bump(int64_t delta) { value_ += delta; } - -std::optional<int64_t> Counter::Flush() { return value_; } - -int64_t Counter::Value() { return value_; } - -Gauge::Gauge(int64_t start_value) : Metric(start_value) {} - -void Gauge::Set(int64_t value) { value_ = value; } - -std::optional<int64_t> Gauge::Flush() { return value_; } - -IntervalMin::IntervalMin(int64_t start_value) : Metric(start_value) {} - -void IntervalMin::Add(int64_t value) { - int64_t curr = value_; - while (curr > value && !value_.compare_exchange_weak(curr, value)) - ; -} - -std::optional<int64_t> IntervalMin::Flush() { - int64_t curr = value_; - value_.compare_exchange_weak(curr, std::numeric_limits<int64_t>::max()); - return curr == std::numeric_limits<int64_t>::max() ? std::nullopt - : std::make_optional(curr); -} - -IntervalMax::IntervalMax(int64_t start_value) : Metric(start_value) {} - -void IntervalMax::Add(int64_t value) { - int64_t curr = value_; - while (curr < value && !value_.compare_exchange_weak(curr, value)) - ; -} - -std::optional<int64_t> IntervalMax::Flush() { - int64_t curr = value_; - value_.compare_exchange_weak(curr, std::numeric_limits<int64_t>::min()); - return curr == std::numeric_limits<int64_t>::min() ? std::nullopt - : std::make_optional(curr); -} - -template <class T> -T &GetMetric(const std::string &name, int64_t start_value) { - auto &metrics = AccessMetrics(); - auto it = metrics.find(name); - if (it == metrics.end()) { - auto got = metrics.emplace(name, std::make_unique<T>(start_value)); - CHECK(got.second) << "Failed to create counter " << name; - it = got.first; - } - ReleaseMetrics(); - auto *ptr = dynamic_cast<T *>(it->second.get()); - if (!ptr) { - LOG(FATAL) << fmt::format("GetMetric({}) called with invalid metric type", - name); - } - return *ptr; -} - -Counter &GetCounter(const std::string &name, int64_t start_value) { - return GetMetric<Counter>(name, start_value); -} - -Gauge &GetGauge(const std::string &name, int64_t start_value) { - return GetMetric<Gauge>(name, start_value); -} - -IntervalMin &GetIntervalMin(const std::string &name) { - return GetMetric<IntervalMin>(name, std::numeric_limits<int64_t>::max()); -} - -IntervalMax &GetIntervalMax(const std::string &name) { - return GetMetric<IntervalMax>(name, std::numeric_limits<int64_t>::min()); -} - -} // namespace stats diff --git a/src/stats/metrics.hpp b/src/stats/metrics.hpp deleted file mode 100644 index 0db7e3f5f..000000000 --- a/src/stats/metrics.hpp +++ /dev/null @@ -1,202 +0,0 @@ -/** - * @file - * - * This file contains some metrics types that can be aggregated on client side - * and periodically flushed to StatsD. - */ -#pragma once - -#include <atomic> -#include <map> -#include <memory> -#include <mutex> -#include <optional> -#include <string> - -#include "fmt/format.h" - -namespace stats { - -// TODO(mtomic): it would probably be nice to have Value method for every metric -// type, however, there is no use case for this yet - -/** - * Abstract base class for all metrics. - */ -class Metric { - public: - /** - * Constructs a metric to be exported to StatsD. - * - * @param name metric will be exported to StatsD with this path - * @param value initial value - */ - virtual ~Metric() {} - - /** - * Metric refresh thread will periodically call this function. It should - * return the metric value aggregated since the last flush call or nullopt - * if there were no updates. - */ - virtual std::optional<int64_t> Flush() = 0; - - explicit Metric(int64_t start_value = 0); - - protected: - std::atomic<int64_t> value_; -}; - -/** - * A simple counter. - */ -class Counter : public Metric { - public: - explicit Counter(int64_t start_value = 0); - - /** - * Change counter value by delta. - * - * @param delta value change - */ - void Bump(int64_t delta = 1); - - /** Returns the current value of the counter. **/ - std::optional<int64_t> Flush() override; - - /** Returns the current value of the counter. **/ - int64_t Value(); - - friend Counter &GetCounter(const std::string &name); -}; - -/** - * To be used instead of Counter constructor. If counter with this name doesn't - * exist, it will be initialized with start_value. - * - * @param name counter name - * @param start_value start value - */ -Counter &GetCounter(const std::string &name, int64_t start_value = 0); - -/** - * A simple gauge. Gauge value is explicitly set, instead of being added to or - * subtracted from. - */ -class Gauge : public Metric { - public: - explicit Gauge(int64_t start_value = 0); - - /** - * Set gauge value. - * - * @param value value to be set - */ - void Set(int64_t value); - - /** Returns the current gauge value. **/ - std::optional<int64_t> Flush() override; -}; - -/** - * To be used instead of Gauge constructor. If gauge with this name doesn't - * exist, it will be initialized with start_value. - * - * @param name gauge name - * @param start_value start value - */ -Gauge &GetGauge(const std::string &name, int64_t start_value = 0); - -/** - * Aggregates minimum between two flush periods. - */ -class IntervalMin : public Metric { - public: - explicit IntervalMin(int64_t start_value); - - /** - * Add another value into the minimum computation. - * - * @param value value to be added - */ - void Add(int64_t value); - - /** - * Returns the minimum value encountered since the last flush period, - * or nullopt if no values were added. - */ - std::optional<int64_t> Flush() override; -}; - -/** - * To be used instead of IntervalMin constructor. - * - * @param name interval min name - */ -IntervalMin &GetIntervalMin(const std::string &name); - -/** - * Aggregates maximum betweenw two flush periods. - */ -class IntervalMax : public Metric { - public: - explicit IntervalMax(int64_t start_value); - - /** - * Add another value into the maximum computation. - */ - void Add(int64_t value); - - /** - * Returns the maximum value encountered since the last flush period, - * or nullopt if no values were added. - */ - std::optional<int64_t> Flush() override; -}; - -/** - * To be used instead of IntervalMax constructor. - * - * @param name interval max name - */ -IntervalMax &GetIntervalMax(const std::string &name); - -/** - * A stopwatch utility. It exports 4 metrics: total time measured since the - * beginning of the program, total number of times time intervals measured, - * minimum and maximum time interval measured since the last metric flush. - * Metrics exported by the stopwatch will be named - * [name].{total_time|count|min|max}. - * - * @param name timed event name - * @param f Callable, an action to be performed. - */ -template <class Function> -int64_t Stopwatch(const std::string &name, Function f) { - auto &total_time = GetCounter(fmt::format("{}.total_time", name)); - auto &count = GetCounter(fmt::format("{}.count", name)); - auto &min = GetIntervalMin(fmt::format("{}.min", name)); - auto &max = GetIntervalMax(fmt::format("{}.max", name)); - auto start = std::chrono::system_clock::now(); - f(); - auto duration = std::chrono::duration_cast<std::chrono::milliseconds>( - std::chrono::system_clock::now() - start) - .count(); - total_time.Bump(duration); - count.Bump(); - min.Add(duration); - max.Add(duration); - return duration; -} - -/** - * Access internal metric list. You probably don't want to use this, - * but if you do, make sure to call ReleaseMetrics when you're done. - */ -std::map<std::string, std::unique_ptr<Metric>> &AccessMetrics(); - -/** - * Releases internal lock on metric list. - */ -void ReleaseMetrics(); - -} // namespace stats diff --git a/src/stats/stats.cpp b/src/stats/stats.cpp deleted file mode 100644 index 43236f18c..000000000 --- a/src/stats/stats.cpp +++ /dev/null @@ -1,118 +0,0 @@ -#include "stats/stats.hpp" - -#include "glog/logging.h" - -#include "communication/rpc/client.hpp" -#include "data_structures/concurrent/push_queue.hpp" -#include "stats/metrics.hpp" -#include "stats/stats_rpc_messages.hpp" -#include "utils/thread.hpp" - -DEFINE_HIDDEN_string(statsd_address, "", "Stats server IP address"); -DEFINE_HIDDEN_int32(statsd_port, 2500, "Stats server port"); -DEFINE_HIDDEN_int32(statsd_flush_interval, 500, - "Stats flush interval (in milliseconds)"); - -namespace stats { - -std::string statsd_prefix = ""; -std::thread stats_dispatch_thread; -std::thread counter_refresh_thread; -std::atomic<bool> stats_running{false}; -ConcurrentPushQueue<StatsReq> stats_queue; - -void RefreshMetrics() { - LOG(INFO) << "Metrics flush thread started"; - utils::ThreadSetName("Stats refresh"); - while (stats_running) { - auto &metrics = AccessMetrics(); - for (auto &kv : metrics) { - auto value = kv.second->Flush(); - if (value) { - LogStat(kv.first, *value); - } - } - ReleaseMetrics(); - // TODO(mtomic): hardcoded sleep time - std::this_thread::sleep_for(std::chrono::seconds(1)); - } - LOG(INFO) << "Metrics flush thread stopped"; -} - -void StatsDispatchMain(const io::network::Endpoint &endpoint) { - // TODO(mtomic): we probably want to batch based on request size and MTU - const int MAX_BATCH_SIZE = 100; - - LOG(INFO) << "Stats dispatcher thread started"; - utils::ThreadSetName("Stats dispatcher"); - - // TODO(mferencevic): stats are currently hardcoded not to use SSL - communication::ClientContext client_context; - communication::rpc::Client client(endpoint, &client_context); - - BatchStatsReq batch_request; - batch_request.requests.reserve(MAX_BATCH_SIZE); - - while (stats_running) { - auto last = stats_queue.begin(); - size_t sent = 0, total = 0; - - auto flush_batch = [&] { - try { - client.Call<BatchStatsRpc>(batch_request); - sent += batch_request.requests.size(); - } catch (const communication::rpc::RpcFailedException &) { - DLOG(WARNING) << "BatchStatsRpc failed!"; - } - total += batch_request.requests.size(); - batch_request.requests.clear(); - }; - - for (auto it = last; it != stats_queue.end(); it++) { - batch_request.requests.emplace_back(std::move(*it)); - if (batch_request.requests.size() == MAX_BATCH_SIZE) { - flush_batch(); - } - } - - if (!batch_request.requests.empty()) { - flush_batch(); - } - - VLOG(30) << fmt::format("Sent {} out of {} events from queue.", sent, - total); - last.delete_tail(); - std::this_thread::sleep_for( - std::chrono::milliseconds(FLAGS_statsd_flush_interval)); - } -} - -void LogStat(const std::string &metric_path, double value, - const std::vector<std::pair<std::string, std::string>> &tags) { - if (stats_running) { - stats_queue.push(statsd_prefix + metric_path, tags, value); - } -} - -void InitStatsLogging(std::string prefix) { - if (!prefix.empty()) { - statsd_prefix = prefix + "."; - } - if (FLAGS_statsd_address != "") { - stats_running = true; - stats_dispatch_thread = std::thread( - StatsDispatchMain, io::network::Endpoint{FLAGS_statsd_address, - (uint16_t)FLAGS_statsd_port}); - counter_refresh_thread = std::thread(RefreshMetrics); - } -} - -void StopStatsLogging() { - if (stats_running) { - stats_running = false; - stats_dispatch_thread.join(); - counter_refresh_thread.join(); - } -} - -} // namespace stats diff --git a/src/stats/stats.hpp b/src/stats/stats.hpp deleted file mode 100644 index ce9ff5c20..000000000 --- a/src/stats/stats.hpp +++ /dev/null @@ -1,33 +0,0 @@ -/// @file - -#pragma once - -#include <thread> -#include <vector> - -#include "gflags/gflags.h" - -#include "io/network/endpoint.hpp" - -namespace stats { - -/** - * Start sending metrics to StatsD server. - * - * @param prefix prefix to prepend to exported keys - */ -void InitStatsLogging(std::string prefix = ""); - -/** - * Stop sending metrics to StatsD server. This should be called before exiting - * program. - */ -void StopStatsLogging(); - -/** - * Send a value to StatsD with current timestamp. - */ -void LogStat(const std::string &metric_path, double value, - const std::vector<std::pair<std::string, std::string>> &tags = {}); - -} // namespace stats diff --git a/src/stats/stats_rpc_messages.lcp b/src/stats/stats_rpc_messages.lcp deleted file mode 100644 index 0ab4aaa7c..000000000 --- a/src/stats/stats_rpc_messages.lcp +++ /dev/null @@ -1,25 +0,0 @@ -#>cpp -#pragma once - -#include "communication/rpc/messages.hpp" -#include "slk/serialization.hpp" -#include "utils/timestamp.hpp" -cpp<# - -(lcp:namespace stats) - -(lcp:define-rpc stats - (:request - ((metric-path "std::string") - (tags "std::vector<std::pair<std::string, std::string>>") - (value :double) - (timestamp :uint64_t :initarg nil - :initval "static_cast<uint64_t>(utils::Timestamp::Now().SecSinceTheEpoch())"))) - (:response ())) - -(lcp:define-rpc batch-stats - (:request - ((requests "std::vector<StatsReq>"))) - (:response ())) - -(lcp:pop-namespace) ;; stats diff --git a/tests/macro_benchmark/CMakeLists.txt b/tests/macro_benchmark/CMakeLists.txt index 3b6233f53..202f1f462 100644 --- a/tests/macro_benchmark/CMakeLists.txt +++ b/tests/macro_benchmark/CMakeLists.txt @@ -15,16 +15,16 @@ function(add_macro_benchmark test_cpp) endfunction(add_macro_benchmark) add_macro_benchmark(clients/pokec_client.cpp) -target_link_libraries(${test_prefix}pokec_client mg-communication mg-io mg-utils mg-stats json) +target_link_libraries(${test_prefix}pokec_client mg-communication mg-io mg-utils json) add_macro_benchmark(clients/graph_500_bfs.cpp) -target_link_libraries(${test_prefix}graph_500_bfs mg-communication mg-comm-rpc mg-io mg-utils mg-stats json) +target_link_libraries(${test_prefix}graph_500_bfs mg-communication mg-io mg-utils json) add_macro_benchmark(clients/bfs_pokec_client.cpp) -target_link_libraries(${test_prefix}bfs_pokec_client mg-communication mg-io mg-utils mg-stats json) +target_link_libraries(${test_prefix}bfs_pokec_client mg-communication mg-io mg-utils json) add_macro_benchmark(clients/query_client.cpp) target_link_libraries(${test_prefix}query_client mg-communication mg-io mg-utils) add_macro_benchmark(clients/card_fraud_client.cpp) -target_link_libraries(${test_prefix}card_fraud_client mg-communication mg-comm-rpc mg-io mg-utils mg-stats json) +target_link_libraries(${test_prefix}card_fraud_client mg-communication mg-io mg-utils json) diff --git a/tests/macro_benchmark/clients/card_fraud_client.cpp b/tests/macro_benchmark/clients/card_fraud_client.cpp index 2e4e2b2b6..46a379528 100644 --- a/tests/macro_benchmark/clients/card_fraud_client.cpp +++ b/tests/macro_benchmark/clients/card_fraud_client.cpp @@ -5,9 +5,6 @@ #include "gflags/gflags.h" -#include "communication/rpc/client.hpp" -#include "stats/stats.hpp" -#include "stats/stats_rpc_messages.hpp" #include "utils/rw_lock.hpp" #include "long_running_common.hpp" @@ -23,14 +20,6 @@ DEFINE_string(config, "", "test config"); enum class Role { WORKER, ANALYTIC, CLEANUP }; -stats::Gauge &num_vertices = stats::GetGauge("vertices"); -stats::Gauge &num_edges = stats::GetGauge("edges"); - -void UpdateStats() { - num_vertices.Set(num_pos + num_cards + num_transactions); - num_edges.Set(2 * num_transactions); -} - int64_t NumNodesWithLabel(Client &client, std::string label) { std::string query = fmt::format("MATCH (u :{}) RETURN count(u)", label); auto result = ExecuteNTimesTillSuccess(client, query, {}, MAX_RETRIES); @@ -176,7 +165,6 @@ class CardFraudClient : public TestClient { card_id, tx_id, pos_id); num_transactions++; - UpdateStats(); } int64_t UniformInt(int64_t a, int64_t b) { @@ -261,7 +249,6 @@ class CardFraudClient : public TestClient { num_transactions, num_transactions_db, deleted, num_transactions - num_transactions_db); num_transactions = num_transactions_db; - UpdateStats(); } std::this_thread::sleep_for( @@ -334,9 +321,6 @@ int main(int argc, char **argv) { communication::Init(); - stats::InitStatsLogging( - fmt::format("client.long_running.{}.{}", FLAGS_group, FLAGS_scenario)); - Endpoint endpoint(FLAGS_address, FLAGS_port); ClientContext context(FLAGS_use_ssl); Client client(&context); @@ -382,7 +366,5 @@ int main(int argc, char **argv) { RunMultithreadedTest(clients); - stats::StopStatsLogging(); - return 0; } diff --git a/tests/macro_benchmark/clients/graph_500_bfs.cpp b/tests/macro_benchmark/clients/graph_500_bfs.cpp index 0e6d80c34..35ddb60aa 100644 --- a/tests/macro_benchmark/clients/graph_500_bfs.cpp +++ b/tests/macro_benchmark/clients/graph_500_bfs.cpp @@ -6,8 +6,6 @@ #include "gflags/gflags.h" #include "long_running_common.hpp" -#include "stats/stats.hpp" -#include "stats/stats_rpc_messages.hpp" class Graph500BfsClient : public TestClient { public: @@ -55,7 +53,5 @@ int main(int argc, char **argv) { RunMultithreadedTest(clients); - stats::StopStatsLogging(); - return 0; } diff --git a/tests/macro_benchmark/clients/long_running_common.hpp b/tests/macro_benchmark/clients/long_running_common.hpp index 96bef4b18..e608b86c2 100644 --- a/tests/macro_benchmark/clients/long_running_common.hpp +++ b/tests/macro_benchmark/clients/long_running_common.hpp @@ -14,8 +14,6 @@ #include "json/json.hpp" -#include "stats/metrics.hpp" -#include "stats/stats.hpp" #include "utils/timer.hpp" #include "common.hpp" @@ -35,9 +33,9 @@ DEFINE_int32(duration, 30, "Number of seconds to execute benchmark"); DEFINE_string(group, "unknown", "Test group name"); DEFINE_string(scenario, "unknown", "Test scenario name"); -auto &executed_queries = stats::GetCounter("executed_queries"); -auto &executed_steps = stats::GetCounter("executed_steps"); -auto &serialization_errors = stats::GetCounter("serialization_errors"); +std::atomic<uint64_t> executed_queries{0}; +std::atomic<uint64_t> executed_steps{0}; +std::atomic<uint64_t> serialization_errors{0}; class TestClient { public: @@ -59,7 +57,7 @@ class TestClient { runner_thread_ = std::thread([&] { while (keep_running_) { Step(); - executed_steps.Bump(); + ++executed_steps; } }); } @@ -82,7 +80,7 @@ class TestClient { std::tie(result, retries) = ExecuteNTimesTillSuccess(client_, query, params, MAX_RETRIES); } catch (const utils::BasicException &e) { - serialization_errors.Bump(MAX_RETRIES); + serialization_errors += MAX_RETRIES; return std::nullopt; } auto wall_time = timer.Elapsed(); @@ -96,8 +94,8 @@ class TestClient { stats_[query].push_back(std::move(metadata)); } } - executed_queries.Bump(); - serialization_errors.Bump(retries); + ++executed_queries; + serialization_errors += retries; return result; } @@ -177,16 +175,11 @@ void RunMultithreadedTest(std::vector<std::unique_ptr<TestClient>> &clients) { auto it = aggregated_query_stats.insert({stat.first, Value(0.0)}).first; it->second = (it->second.ValueDouble() * old_count + stat.second) / (old_count + new_count); - stats::LogStat( - fmt::format("queries.{}.{}", query_stats.first, stat.first), - (stat.second / new_count)); } - stats::LogStat(fmt::format("queries.{}.count", query_stats.first), - new_count); } - out << "{\"num_executed_queries\": " << executed_queries.Value() << ", " - << "\"num_executed_steps\": " << executed_steps.Value() << ", " + out << "{\"num_executed_queries\": " << executed_queries << ", " + << "\"num_executed_steps\": " << executed_steps << ", " << "\"elapsed_time\": " << timer.Elapsed().count() << ", \"queries\": ["; utils::PrintIterable( diff --git a/tests/unit/CMakeLists.txt b/tests/unit/CMakeLists.txt index f8e4efc3e..4b8bff568 100644 --- a/tests/unit/CMakeLists.txt +++ b/tests/unit/CMakeLists.txt @@ -78,9 +78,6 @@ target_link_libraries(${test_prefix}interpreter mg-single-node-v2 mg-auth kvstor add_unit_test(kvstore.cpp) target_link_libraries(${test_prefix}kvstore kvstore_lib glog) -add_unit_test(metrics.cpp) -target_link_libraries(${test_prefix}metrics mg-stats) - add_unit_test(mvcc.cpp) target_link_libraries(${test_prefix}mvcc mg-single-node kvstore_dummy_lib) diff --git a/tools/src/CMakeLists.txt b/tools/src/CMakeLists.txt index c6b88e982..82c332ff0 100644 --- a/tools/src/CMakeLists.txt +++ b/tools/src/CMakeLists.txt @@ -2,10 +2,6 @@ add_executable(mg_import_csv mg_import_csv/main.cpp) target_link_libraries(mg_import_csv mg-single-node kvstore_dummy_lib) -# StatsD Target -add_executable(mg_statsd mg_statsd/main.cpp) -target_link_libraries(mg_statsd mg-communication mg-io mg-utils mg-stats) - # Generate a version.hpp file set(VERSION_STRING ${memgraph_VERSION}) configure_file(../../src/version.hpp.in version.hpp @ONLY) @@ -41,4 +37,4 @@ endif() install(TARGETS mg_client RUNTIME DESTINATION bin) # Target for building all the tool executables. -add_custom_target(tools DEPENDS mg_import_csv mg_statsd mg_client mg_dump) +add_custom_target(tools DEPENDS mg_import_csv mg_client mg_dump) diff --git a/tools/src/mg_statsd/main.cpp b/tools/src/mg_statsd/main.cpp deleted file mode 100644 index e9024ed0b..000000000 --- a/tools/src/mg_statsd/main.cpp +++ /dev/null @@ -1,76 +0,0 @@ -#include "gflags/gflags.h" - -#include "communication/rpc/server.hpp" -#include "io/network/socket.hpp" -#include "stats/stats.hpp" -#include "stats/stats_rpc_messages.hpp" -#include "utils/flag_validation.hpp" - -DEFINE_string(interface, "0.0.0.0", - "Communication interface on which to listen."); -DEFINE_VALIDATED_int32(port, 2500, "Communication port on which to listen.", - FLAG_IN_RANGE(0, std::numeric_limits<uint16_t>::max())); - -DEFINE_string(graphite_address, "", "Graphite address."); -DEFINE_int32(graphite_port, 0, "Graphite port."); -DEFINE_string(prefix, "", "Prefix for all collected stats"); - -std::string GraphiteFormat(const stats::StatsReq &req) { - std::stringstream sstr; - if (!FLAGS_prefix.empty()) { - sstr << FLAGS_prefix << "." << req.metric_path; - } else { - sstr << req.metric_path; - } - for (const auto &tag : req.tags) { - sstr << ";" << tag.first << "=" << tag.second; - } - sstr << " " << req.value << " " << req.timestamp << "\n"; - return sstr.str(); -} - -int main(int argc, char *argv[]) { - gflags::ParseCommandLineFlags(&argc, &argv, true); - - // TODO(mferencevic): stats are currently hardcoded not to use SSL - communication::ServerContext server_context; - communication::rpc::Server server({FLAGS_interface, (uint16_t)FLAGS_port}, - &server_context); - - io::network::Socket graphite_socket; - - CHECK(graphite_socket.Connect( - {FLAGS_graphite_address, (uint16_t)FLAGS_graphite_port})) - << "Failed to connect to Graphite"; - graphite_socket.SetKeepAlive(); - - server.Register<stats::StatsRpc>( - [&](const auto &req_reader, auto *res_builder) { - stats::StatsReq req; - Load(&req, req_reader); - LOG(INFO) << "StatsRpc::Received"; - std::string data = GraphiteFormat(req); - graphite_socket.Write(data); - stats::StatsRes res; - Save(res, res_builder); - }); - - server.Register<stats::BatchStatsRpc>( - [&](const auto &req_reader, auto *res_builder) { - // TODO(mtomic): batching? - stats::BatchStatsReq req; - Load(&req, req_reader); - LOG(INFO) << fmt::format("BatchStatsRpc::Received: {}", - req.requests.size()); - for (size_t i = 0; i < req.requests.size(); ++i) { - std::string data = GraphiteFormat(req.requests[i]); - graphite_socket.Write(data, i + 1 < req.requests.size()); - } - stats::BatchStatsRes res; - Save(res, res_builder); - }); - - std::this_thread::sleep_until(std::chrono::system_clock::time_point::max()); - - return 0; -} diff --git a/tools/tests/CMakeLists.txt b/tools/tests/CMakeLists.txt index 6c1597db6..1d9bf9726 100644 --- a/tools/tests/CMakeLists.txt +++ b/tools/tests/CMakeLists.txt @@ -3,9 +3,6 @@ include_directories(SYSTEM ${GTEST_INCLUDE_DIR}) add_executable(mg_recovery_check mg_recovery_check.cpp) target_link_libraries(mg_recovery_check mg-single-node gtest gtest_main kvstore_dummy_lib) -add_executable(mg_statsd_client statsd/mg_statsd_client.cpp) -target_link_libraries(mg_statsd_client mg-communication mg-io mg-utils mg-stats) - # Copy CSV data to CMake build dir configure_file(csv/comment_nodes.csv csv/comment_nodes.csv COPYONLY) configure_file(csv/comment_nodes_2.csv csv/comment_nodes_2.csv COPYONLY) diff --git a/tools/tests/statsd/mg_statsd_client.cpp b/tools/tests/statsd/mg_statsd_client.cpp deleted file mode 100644 index 11527e48c..000000000 --- a/tools/tests/statsd/mg_statsd_client.cpp +++ /dev/null @@ -1,63 +0,0 @@ -#include "gflags/gflags.h" -#include "glog/logging.h" - -#include "communication/rpc/client.hpp" -#include "stats/stats.hpp" -#include "stats/stats_rpc_messages.hpp" -#include "utils/string.hpp" - -// TODO (buda): Move this logic to a unit test. - -bool parse_input(const std::string &s, std::string &metric_path, - std::vector<std::pair<std::string, std::string>> &tags, - double &value) { - auto words = utils::Split(s, " "); - if (words.size() < 2) { - return false; - } - - metric_path = words[0]; - - try { - value = std::stod(words.back()); - } catch (std::exception &e) { - return false; - } - - tags.clear(); - for (size_t i = 1; i < words.size() - 1; ++i) { - auto tag_value = utils::Split(words[i], "=", 1); - if (tag_value.size() != 2) { - return false; - } - // TODO(mtomic): tags probably need to be escaped before sending to graphite - tags.emplace_back(tag_value[0], tag_value[1]); - } - - return true; -} - -int main(int argc, char *argv[]) { - gflags::ParseCommandLineFlags(&argc, &argv, true); - - LOG(INFO) << "Usage: metric_path tag1=value1 ... tagn=valuen " - "metric_value"; - - stats::InitStatsLogging(); - - std::string line; - std::string metric_path; - std::vector<std::pair<std::string, std::string>> tags; - double value; - - while (true) { - std::getline(std::cin, line); - if (!parse_input(line, metric_path, tags, value)) { - LOG(ERROR) << "Invalid input"; - continue; - } - stats::LogStat(metric_path, value, tags); - } - - return 0; -}