Expose system metrics over HTTP Endpoint (#940)

This commit is contained in:
Josipmrden 2023-05-17 19:20:56 +02:00 committed by Marko Budiselic
parent 0d9bd74a8a
commit 651b6f3a5a
28 changed files with 1369 additions and 144 deletions

View File

@ -1,4 +1,4 @@
// Copyright 2022 Memgraph Ltd.
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -22,10 +22,15 @@
#include "communication/bolt/v1/state.hpp"
#include "communication/bolt/v1/states/handlers.hpp"
#include "communication/bolt/v1/value.hpp"
#include "utils/event_counter.hpp"
#include "utils/likely.hpp"
#include "utils/logging.hpp"
#include "utils/message.hpp"
namespace memgraph::metrics {
extern const Event BoltMessages;
} // namespace memgraph::metrics
namespace memgraph::communication::bolt {
template <typename TSession>
@ -103,8 +108,10 @@ State StateExecutingRun(TSession &session, State state) {
switch (session.version_.major) {
case 1:
memgraph::metrics::IncrementCounter(memgraph::metrics::BoltMessages);
return RunHandlerV1(signature, session, state, marker);
case 4: {
memgraph::metrics::IncrementCounter(memgraph::metrics::BoltMessages);
if (session.version_.minor >= 3) {
return RunHandlerV4<TSession, 3>(signature, session, state, marker);
}

View File

@ -0,0 +1,108 @@
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#pragma once
#include <list>
#include <memory>
#include <spdlog/spdlog.h>
#include <boost/asio/io_context.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/strand.hpp>
#include <boost/beast/core.hpp>
#include "communication/context.hpp"
#include "communication/http/session.hpp"
#include "utils/spin_lock.hpp"
#include "utils/synchronized.hpp"
namespace memgraph::communication::http {
template <class TRequestHandler, typename TSessionData>
class Listener final : public std::enable_shared_from_this<Listener<TRequestHandler, TSessionData>> {
using tcp = boost::asio::ip::tcp;
using SessionHandler = Session<TRequestHandler, TSessionData>;
using std::enable_shared_from_this<Listener<TRequestHandler, TSessionData>>::shared_from_this;
public:
Listener(const Listener &) = delete;
Listener(Listener &&) = delete;
Listener &operator=(const Listener &) = delete;
Listener &operator=(Listener &&) = delete;
~Listener() {}
template <typename... Args>
static std::shared_ptr<Listener> Create(Args &&...args) {
return std::shared_ptr<Listener>{new Listener(std::forward<Args>(args)...)};
}
// Start accepting incoming connections
void Run() { DoAccept(); }
tcp::endpoint GetEndpoint() const { return acceptor_.local_endpoint(); }
private:
Listener(boost::asio::io_context &ioc, TSessionData *data, ServerContext *context, tcp::endpoint endpoint)
: ioc_(ioc), data_(data), context_(context), acceptor_(ioc) {
boost::beast::error_code ec;
// Open the acceptor
acceptor_.open(endpoint.protocol(), ec);
if (ec) {
LogError(ec, "open");
return;
}
// Allow address reuse
acceptor_.set_option(boost::asio::socket_base::reuse_address(true), ec);
if (ec) {
LogError(ec, "set_option");
return;
}
// Bind to the server address
acceptor_.bind(endpoint, ec);
if (ec) {
LogError(ec, "bind");
return;
}
acceptor_.listen(boost::asio::socket_base::max_listen_connections, ec);
if (ec) {
LogError(ec, "listen");
return;
}
spdlog::info("HTTP server is listening on {}:{}", endpoint.address(), endpoint.port());
}
void DoAccept() {
acceptor_.async_accept(ioc_, [shared_this = shared_from_this()](auto ec, auto socket) {
shared_this->OnAccept(ec, std::move(socket));
});
}
void OnAccept(boost::beast::error_code ec, tcp::socket socket) {
if (ec) {
return LogError(ec, "accept");
}
SessionHandler::Create(std::move(socket), data_, *context_)->Run();
DoAccept();
}
boost::asio::io_context &ioc_;
TSessionData *data_;
ServerContext *context_;
tcp::acceptor acceptor_;
};
} // namespace memgraph::communication::http

View File

@ -0,0 +1,65 @@
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#pragma once
#include <thread>
#include <boost/asio/io_context.hpp>
#include <boost/asio/ip/tcp.hpp>
#include "communication/http/listener.hpp"
#include "io/network/endpoint.hpp"
namespace memgraph::communication::http {
template <class TRequestHandler, typename TSessionData>
class Server final {
using tcp = boost::asio::ip::tcp;
public:
explicit Server(io::network::Endpoint endpoint, TSessionData *data, ServerContext *context)
: listener_{Listener<TRequestHandler, TSessionData>::Create(
ioc_, data, context, tcp::endpoint{boost::asio::ip::make_address(endpoint.address), endpoint.port})} {}
Server(const Server &) = delete;
Server(Server &&) = delete;
Server &operator=(const Server &) = delete;
Server &operator=(Server &&) = delete;
~Server() {
MG_ASSERT(!background_thread_ || (ioc_.stopped() && !background_thread_->joinable()),
"Server wasn't shutdown properly");
}
void Start() {
MG_ASSERT(!background_thread_, "The server was already started!");
listener_->Run();
background_thread_.emplace([this] { ioc_.run(); });
}
void Shutdown() { ioc_.stop(); }
void AwaitShutdown() {
if (background_thread_ && background_thread_->joinable()) {
background_thread_->join();
}
}
bool IsRunning() const { return background_thread_ && !ioc_.stopped(); }
tcp::endpoint GetEndpoint() const { return listener_->GetEndpoint(); }
private:
boost::asio::io_context ioc_;
std::shared_ptr<Listener<TRequestHandler, TSessionData>> listener_;
std::optional<std::thread> background_thread_;
};
} // namespace memgraph::communication::http

View File

@ -0,0 +1,193 @@
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#pragma once
#include <deque>
#include <functional>
#include <memory>
#include <optional>
#include <string>
#include <variant>
#include <spdlog/spdlog.h>
#include <boost/asio/bind_executor.hpp>
#include <boost/asio/dispatch.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/strand.hpp>
#include <boost/beast/core/buffers_to_string.hpp>
#include <boost/beast/core/stream_traits.hpp>
#include <boost/beast/core/tcp_stream.hpp>
#include <boost/beast/http.hpp>
#include <boost/beast/ssl.hpp>
#include <boost/beast/version.hpp>
#include <json/json.hpp>
#include "communication/context.hpp"
#include "utils/logging.hpp"
#include "utils/variant_helpers.hpp"
namespace memgraph::communication::http {
inline constexpr uint16_t kSSLExpirySeconds = 30;
inline void LogError(boost::beast::error_code ec, const std::string_view what) {
spdlog::warn("HTTP session failed on {}: {}", what, ec.message());
}
template <class TRequestHandler, typename TSessionData>
class Session : public std::enable_shared_from_this<Session<TRequestHandler, TSessionData>> {
using tcp = boost::asio::ip::tcp;
using std::enable_shared_from_this<Session<TRequestHandler, TSessionData>>::shared_from_this;
public:
template <typename... Args>
static std::shared_ptr<Session> Create(Args &&...args) {
return std::shared_ptr<Session>{new Session{std::forward<Args>(args)...}};
}
void Run() {
if (auto *ssl = std::get_if<SSLSocket>(&stream_); ssl != nullptr) {
try {
boost::beast::get_lowest_layer(*ssl).expires_after(std::chrono::seconds(kSSLExpirySeconds));
ssl->handshake(boost::asio::ssl::stream_base::server);
} catch (const boost::system::system_error &e) {
spdlog::warn("Failed on SSL handshake: {}", e.what());
return;
}
}
// run on the strand
boost::asio::dispatch(strand_, [shared_this = shared_from_this()] { shared_this->DoRead(); });
}
private:
using PlainSocket = boost::beast::tcp_stream;
using SSLSocket = boost::beast::ssl_stream<boost::beast::tcp_stream>;
explicit Session(tcp::socket &&socket, TSessionData *data, ServerContext &context)
: stream_(CreateSocket(std::move(socket), context)),
handler_(data),
strand_{boost::asio::make_strand(GetExecutor())} {}
std::variant<PlainSocket, SSLSocket> CreateSocket(tcp::socket &&socket, ServerContext &context) {
if (context.use_ssl()) {
ssl_context_.emplace(context.context_clone());
return Session::SSLSocket{std::move(socket), *ssl_context_};
}
return Session::PlainSocket{std::move(socket)};
}
void OnWrite(boost::beast::error_code ec, size_t bytes_transferred) {
boost::ignore_unused(bytes_transferred);
if (ec) {
close_ = true;
return LogError(ec, "write");
}
if (close_) {
DoClose();
return;
}
res_ = nullptr;
DoRead();
}
void DoRead() {
req_ = {};
ExecuteForStream([this](auto &&stream) {
boost::beast::get_lowest_layer(stream).expires_after(std::chrono::seconds(kSSLExpirySeconds));
boost::beast::http::async_read(
stream, buffer_, req_,
boost::asio::bind_executor(strand_, std::bind_front(&Session::OnRead, shared_from_this())));
});
}
void OnRead(boost::beast::error_code ec, size_t bytes_transferred) {
boost::ignore_unused(bytes_transferred);
if (ec == boost::beast::http::error::end_of_stream) {
DoClose();
return;
}
if (ec) {
return LogError(ec, "read");
}
auto async_write = [this](boost::beast::http::response<boost::beast::http::string_body> msg) {
ExecuteForStream([this, &msg](auto &&stream) {
// The lifetime of the message has to extend
// for the duration of the async operation so
// we use a shared_ptr to manage it.
auto sp = std::make_shared<boost::beast::http::response<boost::beast::http::string_body>>(std::move(msg));
// Store a type-erased version of the shared
// pointer in the class to keep it alive.
res_ = sp;
// Write the response
boost::beast::http::async_write(
stream, *sp, boost::asio::bind_executor(strand_, std::bind_front(&Session::OnWrite, shared_from_this())));
});
};
// handle request
handler_.HandleRequest(std::move(req_), async_write);
}
void DoClose() {
std::visit(utils::Overloaded{[this](SSLSocket &stream) {
boost::beast::get_lowest_layer(stream).expires_after(std::chrono::seconds(30));
// Perform the SSL shutdown
stream.async_shutdown(
boost::beast::bind_front_handler(&Session::OnClose, shared_from_this()));
},
[](PlainSocket &stream) {
boost::beast::error_code ec;
stream.socket().shutdown(tcp::socket::shutdown_send, ec);
}},
stream_);
}
void OnClose(boost::beast::error_code ec) {
if (ec) {
LogError(ec, "close");
}
// At this point the connection is closed gracefully
}
auto GetExecutor() {
return std::visit(utils::Overloaded{[](auto &&stream) { return stream.get_executor(); }}, stream_);
}
template <typename F>
decltype(auto) ExecuteForStream(F &&fn) {
return std::visit(utils::Overloaded{std::forward<F>(fn)}, stream_);
}
std::optional<std::reference_wrapper<boost::asio::ssl::context>> ssl_context_;
std::variant<PlainSocket, SSLSocket> stream_;
boost::beast::flat_buffer buffer_;
TRequestHandler handler_;
boost::beast::http::request<boost::beast::http::string_body> req_;
std::shared_ptr<void> res_;
boost::asio::strand<boost::beast::tcp_stream::executor_type> strand_;
bool close_{false};
};
} // namespace memgraph::communication::http

View File

@ -1,4 +1,4 @@
// Copyright 2022 Memgraph Ltd.
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -30,6 +30,7 @@
#include "communication/context.hpp"
#include "communication/v2/pool.hpp"
#include "communication/v2/session.hpp"
#include "utils/message.hpp"
#include "utils/spin_lock.hpp"
#include "utils/synchronized.hpp"

View File

@ -1,4 +1,4 @@
// Copyright 2022 Memgraph Ltd.
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -41,11 +41,21 @@
#include <boost/beast/websocket/rfc6455.hpp>
#include <boost/system/detail/error_code.hpp>
#include "communication/buffer.hpp"
#include "communication/context.hpp"
#include "communication/exceptions.hpp"
#include "utils/event_counter.hpp"
#include "utils/logging.hpp"
#include "utils/on_scope_exit.hpp"
#include "utils/variant_helpers.hpp"
namespace memgraph::metrics {
extern const Event ActiveSessions;
extern const Event ActiveTCPSessions;
extern const Event ActiveSSLSessions;
extern const Event ActiveWebSocketSessions;
} // namespace memgraph::metrics
namespace memgraph::communication::v2 {
/**
@ -99,6 +109,8 @@ class WebsocketSession : public std::enable_shared_from_this<WebsocketSession<TS
// Start the asynchronous accept operation
template <class Body, class Allocator>
void DoAccept(boost::beast::http::request<Body, boost::beast::http::basic_fields<Allocator>> req) {
memgraph::metrics::IncrementCounter(memgraph::metrics::ActiveWebSocketSessions);
execution_active_ = true;
// Set suggested timeout settings for the websocket
ws_.set_option(boost::beast::websocket::stream_base::timeout::suggested(boost::beast::role_type::server));
@ -213,6 +225,10 @@ class WebsocketSession : public std::enable_shared_from_this<WebsocketSession<TS
if (!IsConnected()) {
return;
}
memgraph::metrics::DecrementCounter(memgraph::metrics::ActiveSessions);
memgraph::metrics::DecrementCounter(memgraph::metrics::ActiveWebSocketSessions);
if (ec) {
return OnError(ec, "close");
}
@ -259,12 +275,19 @@ class Session final : public std::enable_shared_from_this<Session<TSession, TSes
if (execution_active_) {
return false;
}
memgraph::metrics::IncrementCounter(memgraph::metrics::ActiveSessions);
execution_active_ = true;
timeout_timer_.async_wait(boost::asio::bind_executor(strand_, std::bind(&Session::OnTimeout, shared_from_this())));
if (std::holds_alternative<SSLSocket>(socket_)) {
utils::OnScopeExit increment_counter(
[] { memgraph::metrics::IncrementCounter(memgraph::metrics::ActiveSSLSessions); });
boost::asio::dispatch(strand_, [shared_this = shared_from_this()] { shared_this->DoHandshake(); });
} else {
utils::OnScopeExit increment_counter(
[] { memgraph::metrics::IncrementCounter(memgraph::metrics::ActiveTCPSessions); });
boost::asio::dispatch(strand_, [shared_this = shared_from_this()] { shared_this->DoRead(); });
}
return true;
@ -450,6 +473,14 @@ class Session final : public std::enable_shared_from_this<Session<TSession, TSes
}
void OnClose(const boost::system::error_code &ec) {
if (ssl_context_.has_value()) {
memgraph::metrics::DecrementCounter(memgraph::metrics::ActiveSSLSessions);
} else {
memgraph::metrics::DecrementCounter(memgraph::metrics::ActiveTCPSessions);
}
memgraph::metrics::DecrementCounter(memgraph::metrics::ActiveSessions);
if (ec) {
return OnError(ec);
}
@ -465,7 +496,7 @@ class Session final : public std::enable_shared_from_this<Session<TSession, TSes
if (timeout_timer_.expiry() <= boost::asio::steady_timer::clock_type::now()) {
// The deadline has passed. Stop the session. The other actors will
// terminate as soon as possible.
spdlog::info("Shutting down session after {} of inactivity", timeout_seconds_);
spdlog::info("Shutting down session after {} seconds of inactivity", timeout_seconds_.count());
DoShutdown();
} else {
// Put the actor back to sleep.

View File

@ -0,0 +1,4 @@
set(mg_http_handlers_sources)
add_library(mg-http-handlers STATIC ${mg_http_handlers_sources})
target_link_libraries(mg-http-handlers mg-query mg-storage-v2)

View File

@ -0,0 +1,211 @@
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#pragma once
#include <atomic>
#include <tuple>
#include <vector>
#include <spdlog/spdlog.h>
#include <boost/beast/http.hpp>
#include <boost/beast/version.hpp>
#include <json/json.hpp>
#include <utils/event_counter.hpp>
#include <utils/event_gauge.hpp>
#include "storage/v2/storage.hpp"
#include "utils/event_gauge.hpp"
#include "utils/event_histogram.hpp"
namespace memgraph::http {
struct MetricsResponse {
uint64_t vertex_count;
uint64_t edge_count;
double average_degree;
uint64_t memory_usage;
uint64_t disk_usage;
// Storage of all the counter values throughout the system
// e.g. number of active transactions
std::vector<std::tuple<std::string, std::string, uint64_t>> event_counters{};
// Storage of all the current values throughout the system
std::vector<std::tuple<std::string, std::string, uint64_t>> event_gauges{};
// Storage of all the percentile values across the histograms in the system
// e.g. query latency percentiles, snapshot recovery duration percentiles, etc.
std::vector<std::tuple<std::string, std::string, uint64_t>> event_histograms{};
};
template <typename TSessionData>
class MetricsService {
public:
explicit MetricsService(TSessionData *data) : db_(data->db) {}
nlohmann::json GetMetricsJSON() {
auto response = GetMetrics();
return AsJson(response);
}
private:
const storage::Storage *db_;
MetricsResponse GetMetrics() {
auto info = db_->GetInfo();
return MetricsResponse{.vertex_count = info.vertex_count,
.edge_count = info.edge_count,
.average_degree = info.average_degree,
.memory_usage = info.memory_usage,
.disk_usage = info.disk_usage,
.event_counters = GetEventCounters(),
.event_gauges = GetEventGauges(),
.event_histograms = GetEventHistograms()};
}
nlohmann::json AsJson(MetricsResponse response) {
auto metrics_response = nlohmann::json();
const auto *general_type = "General";
metrics_response[general_type]["vertex_count"] = response.vertex_count;
metrics_response[general_type]["edge_count"] = response.edge_count;
metrics_response[general_type]["average_degree"] = response.average_degree;
metrics_response[general_type]["memory_usage"] = response.memory_usage;
metrics_response[general_type]["disk_usage"] = response.disk_usage;
for (const auto &[name, type, value] : response.event_counters) {
metrics_response[type][name] = value;
}
for (const auto &[name, type, value] : response.event_gauges) {
metrics_response[type][name] = value;
}
for (const auto &[name, type, value] : response.event_histograms) {
metrics_response[type][name] = value;
}
return metrics_response;
}
auto GetEventCounters() {
// NOLINTNEXTLINE(cppcoreguidelines-init-variables)
std::vector<std::tuple<std::string, std::string, uint64_t>> event_counters{};
for (auto i = 0; i < memgraph::metrics::CounterEnd(); i++) {
event_counters.emplace_back(memgraph::metrics::GetCounterName(i), memgraph::metrics::GetCounterType(i),
memgraph::metrics::global_counters[i].load(std::memory_order_acquire));
}
return event_counters;
}
auto GetEventGauges() {
// NOLINTNEXTLINE(cppcoreguidelines-init-variables)
std::vector<std::tuple<std::string, std::string, uint64_t>> event_gauges{};
for (auto i = 0; i < memgraph::metrics::GaugeEnd(); i++) {
event_gauges.emplace_back(memgraph::metrics::GetGaugeName(i), memgraph::metrics::GetGaugeType(i),
memgraph::metrics::global_gauges[i].load(std::memory_order_acquire));
}
return event_gauges;
}
auto GetEventHistograms() {
// NOLINTNEXTLINE(cppcoreguidelines-init-variables)
std::vector<std::tuple<std::string, std::string, uint64_t>> event_histograms{};
for (auto i = 0; i < memgraph::metrics::HistogramEnd(); i++) {
const auto *name = memgraph::metrics::GetHistogramName(i);
auto &histogram = memgraph::metrics::global_histograms[i];
for (auto &[percentile, value] : histogram.YieldPercentiles()) {
auto metric_name = std::string(name) + "_" + std::to_string(percentile) + "p";
event_histograms.emplace_back(metric_name, memgraph::metrics::GetHistogramType(i), value);
}
}
return event_histograms;
}
};
template <typename TSessionData>
class MetricsRequestHandler final {
public:
explicit MetricsRequestHandler(TSessionData *data) : service_(data) {
spdlog::info("Basic request handler started!");
}
MetricsRequestHandler(const MetricsRequestHandler &) = delete;
MetricsRequestHandler(MetricsRequestHandler &&) = delete;
MetricsRequestHandler &operator=(const MetricsRequestHandler &) = delete;
MetricsRequestHandler &operator=(MetricsRequestHandler &&) = delete;
~MetricsRequestHandler() = default;
template <class Body, class Allocator>
// NOLINTNEXTLINE(misc-unused-parameters)
void HandleRequest(boost::beast::http::request<Body, boost::beast::http::basic_fields<Allocator>> &&req,
std::function<void(boost::beast::http::response<boost::beast::http::string_body>)> &&send) {
auto response_json = nlohmann::json();
// Returns a bad request response
auto const bad_request = [&req, &response_json](const auto why) {
response_json["error"] = std::string(why);
// NOLINTNEXTLINE(cppcoreguidelines-init-variables)
boost::beast::http::response<boost::beast::http::string_body> res{boost::beast::http::status::bad_request,
req.version()};
res.set(boost::beast::http::field::server, BOOST_BEAST_VERSION_STRING);
res.set(boost::beast::http::field::content_type, "application/json");
res.keep_alive(req.keep_alive());
res.body() = response_json.dump();
res.prepare_payload();
return res;
};
// Make sure we can handle the method
if (req.method() != boost::beast::http::verb::get) {
return send(bad_request("Unknown HTTP-method"));
}
// Request path must be absolute and not contain "..".
if (req.target().empty() || req.target()[0] != '/' || req.target().find("..") != boost::beast::string_view::npos) {
return send(bad_request("Illegal request-target"));
}
// NOLINTNEXTLINE(cppcoreguidelines-init-variables)
boost::beast::http::string_body::value_type body;
auto service_response = service_.GetMetricsJSON();
body.append(service_response.dump());
// Cache the size since we need it after the move
const auto size = body.size();
// Respond to GET request
// NOLINTNEXTLINE(cppcoreguidelines-init-variables)
boost::beast::http::response<boost::beast::http::string_body> res{
std::piecewise_construct, std::make_tuple(std::move(body)),
std::make_tuple(boost::beast::http::status::ok, req.version())};
res.set(boost::beast::http::field::server, BOOST_BEAST_VERSION_STRING);
res.set(boost::beast::http::field::content_type, "application/json");
res.content_length(size);
res.keep_alive(req.keep_alive());
return send(std::move(res));
}
private:
MetricsService<TSessionData> service_;
};
} // namespace memgraph::http

View File

@ -36,11 +36,13 @@
#include "auth/models.hpp"
#include "communication/bolt/v1/constants.hpp"
#include "communication/http/server.hpp"
#include "communication/websocket/auth.hpp"
#include "communication/websocket/server.hpp"
#include "glue/auth_checker.hpp"
#include "glue/auth_handler.hpp"
#include "helpers.hpp"
#include "http_handlers/metrics.hpp"
#include "license/license.hpp"
#include "license/license_sender.hpp"
#include "py/py.hpp"
@ -113,6 +115,9 @@ DEFINE_string(bolt_address, "0.0.0.0", "IP address on which the Bolt server shou
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
DEFINE_string(monitoring_address, "0.0.0.0",
"IP address on which the websocket server for Memgraph monitoring should listen.");
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
DEFINE_string(metrics_address, "0.0.0.0",
"IP address on which the Memgraph server for exposing metrics should listen.");
DEFINE_VALIDATED_int32(bolt_port, 7687, "Port on which the Bolt server should listen.",
FLAG_IN_RANGE(0, std::numeric_limits<uint16_t>::max()));
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
@ -120,6 +125,9 @@ DEFINE_VALIDATED_int32(monitoring_port, 7444,
"Port on which the websocket server for Memgraph monitoring should listen.",
FLAG_IN_RANGE(0, std::numeric_limits<uint16_t>::max()));
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
DEFINE_VALIDATED_int32(metrics_port, 9091, "Port on which the Memgraph server for exposing metrics should listen.",
FLAG_IN_RANGE(0, std::numeric_limits<uint16_t>::max()));
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
DEFINE_VALIDATED_int32(bolt_num_workers, std::max(std::thread::hardware_concurrency(), 1U),
"Number of workers used by the Bolt server. By default, this will be the "
"number of processing units available on the machine.",
@ -492,6 +500,10 @@ void InitFromCypherlFile(memgraph::query::InterpreterContext &ctx, std::string c
}
}
namespace memgraph::metrics {
extern const Event ActiveBoltSessions;
} // namespace memgraph::metrics
class BoltSession final : public memgraph::communication::bolt::Session<memgraph::communication::v2::InputStream,
memgraph::communication::v2::OutputStream> {
public:
@ -509,10 +521,12 @@ class BoltSession final : public memgraph::communication::bolt::Session<memgraph
#endif
endpoint_(endpoint),
run_id_(data->run_id) {
memgraph::metrics::IncrementCounter(memgraph::metrics::ActiveBoltSessions);
interpreter_context_->interpreters.WithLock([this](auto &interpreters) { interpreters.insert(&interpreter_); });
}
~BoltSession() override {
memgraph::metrics::DecrementCounter(memgraph::metrics::ActiveBoltSessions);
interpreter_context_->interpreters.WithLock([this](auto &interpreters) { interpreters.erase(&interpreter_); });
}
@ -672,6 +686,8 @@ class BoltSession final : public memgraph::communication::bolt::Session<memgraph
};
using ServerT = memgraph::communication::v2::Server<BoltSession, SessionData>;
using MonitoringServerT =
memgraph::communication::http::Server<memgraph::http::MetricsRequestHandler<SessionData>, SessionData>;
using memgraph::communication::ServerContext;
// Needed to correctly handle memgraph destruction from a signal handler.
@ -981,8 +997,9 @@ int main(int argc, char **argv) {
});
telemetry->AddCollector("event_counters", []() -> nlohmann::json {
nlohmann::json ret;
for (size_t i = 0; i < EventCounter::End(); ++i) {
ret[EventCounter::GetName(i)] = EventCounter::global_counters[i].load(std::memory_order_relaxed);
for (size_t i = 0; i < memgraph::metrics::CounterEnd(); ++i) {
ret[memgraph::metrics::GetCounterName(i)] =
memgraph::metrics::global_counters[i].load(std::memory_order_relaxed);
}
return ret;
});
@ -998,6 +1015,43 @@ int main(int argc, char **argv) {
{FLAGS_monitoring_address, static_cast<uint16_t>(FLAGS_monitoring_port)}, &context, websocket_auth};
AddLoggerSink(websocket_server.GetLoggingSink());
MonitoringServerT metrics_server{
{FLAGS_metrics_address, static_cast<uint16_t>(FLAGS_metrics_port)}, &session_data, &context};
#ifdef MG_ENTERPRISE
if (memgraph::license::global_license_checker.IsEnterpriseValidFast()) {
// Handler for regular termination signals
auto shutdown = [&metrics_server, &websocket_server, &server, &interpreter_context] {
// Server needs to be shutdown first and then the database. This prevents
// a race condition when a transaction is accepted during server shutdown.
server.Shutdown();
// After the server is notified to stop accepting and processing
// connections we tell the execution engine to stop processing all pending
// queries.
memgraph::query::Shutdown(&interpreter_context);
websocket_server.Shutdown();
metrics_server.Shutdown();
};
InitSignalHandlers(shutdown);
} else {
// Handler for regular termination signals
auto shutdown = [&websocket_server, &server, &interpreter_context] {
// Server needs to be shutdown first and then the database. This prevents
// a race condition when a transaction is accepted during server shutdown.
server.Shutdown();
// After the server is notified to stop accepting and processing
// connections we tell the execution engine to stop processing all pending
// queries.
memgraph::query::Shutdown(&interpreter_context);
websocket_server.Shutdown();
};
InitSignalHandlers(shutdown);
}
#else
// Handler for regular termination signals
auto shutdown = [&websocket_server, &server, &interpreter_context] {
// Server needs to be shutdown first and then the database. This prevents
@ -1007,14 +1061,22 @@ int main(int argc, char **argv) {
// connections we tell the execution engine to stop processing all pending
// queries.
memgraph::query::Shutdown(&interpreter_context);
websocket_server.Shutdown();
};
InitSignalHandlers(shutdown);
#endif
MG_ASSERT(server.Start(), "Couldn't start the Bolt server!");
websocket_server.Start();
#ifdef MG_ENTERPRISE
if (memgraph::license::global_license_checker.IsEnterpriseValidFast()) {
metrics_server.Start();
}
#endif
if (!FLAGS_init_data_file.empty()) {
spdlog::info("Running init data file.");
#ifdef MG_ENTERPRISE
@ -1028,6 +1090,11 @@ int main(int argc, char **argv) {
server.AwaitShutdown();
websocket_server.AwaitShutdown();
#ifdef MG_ENTERPRISE
if (memgraph::license::global_license_checker.IsEnterpriseValidFast()) {
metrics_server.AwaitShutdown();
}
#endif
memgraph::query::procedure::gModuleRegistry.UnloadAllModules();

View File

@ -15,8 +15,6 @@
#include <unordered_set>
#include <utility>
#include <antlr4-runtime.h>
#include "query/frontend/ast/ast.hpp"
#include "query/frontend/opencypher/generated/MemgraphCypherBaseVisitor.h"
#include "utils/exceptions.hpp"

View File

@ -60,6 +60,7 @@
#include "utils/build_info.hpp"
#include "utils/csv_parsing.hpp"
#include "utils/event_counter.hpp"
#include "utils/event_histogram.hpp"
#include "utils/exceptions.hpp"
#include "utils/flag_validation.hpp"
#include "utils/likely.hpp"
@ -74,17 +75,20 @@
#include "utils/typeinfo.hpp"
#include "utils/variant_helpers.hpp"
namespace EventCounter {
namespace memgraph::metrics {
extern Event ReadQuery;
extern Event WriteQuery;
extern Event ReadWriteQuery;
extern const Event LabelIndexCreated;
extern const Event LabelPropertyIndexCreated;
extern const Event StreamsCreated;
extern const Event TriggersCreated;
} // namespace EventCounter
extern const Event QueryExecutionLatency_us;
extern const Event CommitedTransactions;
extern const Event RollbackedTransactions;
extern const Event ActiveTransactions;
} // namespace memgraph::metrics
namespace memgraph::query {
@ -95,13 +99,13 @@ namespace {
void UpdateTypeCount(const plan::ReadWriteTypeChecker::RWType type) {
switch (type) {
case plan::ReadWriteTypeChecker::RWType::R:
EventCounter::IncrementCounter(EventCounter::ReadQuery);
memgraph::metrics::IncrementCounter(memgraph::metrics::ReadQuery);
break;
case plan::ReadWriteTypeChecker::RWType::W:
EventCounter::IncrementCounter(EventCounter::WriteQuery);
memgraph::metrics::IncrementCounter(memgraph::metrics::WriteQuery);
break;
case plan::ReadWriteTypeChecker::RWType::RW:
EventCounter::IncrementCounter(EventCounter::ReadWriteQuery);
memgraph::metrics::IncrementCounter(memgraph::metrics::ReadWriteQuery);
break;
default:
break;
@ -666,6 +670,8 @@ Callback::CallbackFunction GetKafkaCreateCallback(StreamQuery *stream_query, Exp
return config_map;
};
memgraph::metrics::IncrementCounter(memgraph::metrics::StreamsCreated);
return [interpreter_context, stream_name = stream_query->stream_name_,
topic_names = EvaluateTopicNames(evaluator, stream_query->topic_names_),
consumer_group = std::move(consumer_group), common_stream_info = std::move(common_stream_info),
@ -696,6 +702,8 @@ Callback::CallbackFunction GetPulsarCreateCallback(StreamQuery *stream_query, Ex
throw SemanticException("Service URL must not be an empty string!");
}
auto common_stream_info = GetCommonStreamInfo(stream_query, evaluator);
memgraph::metrics::IncrementCounter(memgraph::metrics::StreamsCreated);
return [interpreter_context, stream_name = stream_query->stream_name_,
topic_names = EvaluateTopicNames(evaluator, stream_query->topic_names_),
common_stream_info = std::move(common_stream_info), service_url = std::move(service_url),
@ -726,7 +734,6 @@ Callback HandleStreamQuery(StreamQuery *stream_query, const Parameters &paramete
Callback callback;
switch (stream_query->action_) {
case StreamQuery::Action::CREATE_STREAM: {
EventCounter::IncrementCounter(EventCounter::StreamsCreated);
switch (stream_query->type_) {
case StreamQuery::Type::KAFKA:
callback.fn = GetKafkaCreateCallback(stream_query, evaluator, interpreter_context, username);
@ -1137,7 +1144,11 @@ std::optional<plan::ProfilingStatsWithTotalTime> PullPlan::Pull(AnyStream *strea
if (has_unsent_results_) {
return std::nullopt;
}
summary->insert_or_assign("plan_execution_time", execution_time_.count());
memgraph::metrics::Measure(memgraph::metrics::QueryExecutionLatency_us,
std::chrono::duration_cast<std::chrono::microseconds>(execution_time_).count());
// We are finished with pulling all the data, therefore we can send any
// metadata about the results i.e. notifications and statistics
const bool is_any_counter_set =
@ -1174,6 +1185,9 @@ PreparedQuery Interpreter::PrepareTransactionQuery(std::string_view query_upper)
if (in_explicit_transaction_) {
throw ExplicitTransactionUsageException("Nested transactions are not supported.");
}
memgraph::metrics::IncrementCounter(memgraph::metrics::ActiveTransactions);
in_explicit_transaction_ = true;
expect_rollback_ = false;
@ -1212,6 +1226,9 @@ PreparedQuery Interpreter::PrepareTransactionQuery(std::string_view query_upper)
if (!in_explicit_transaction_) {
throw ExplicitTransactionUsageException("No current transaction to rollback.");
}
memgraph::metrics::IncrementCounter(memgraph::metrics::RollbackedTransactions);
Abort();
expect_rollback_ = false;
in_explicit_transaction_ = false;
@ -1638,7 +1655,6 @@ PreparedQuery PrepareIndexQuery(ParsedQuery parsed_query, bool in_explicit_trans
[&index_notification, &label_name, &properties_stringified]<typename T>(T &&) {
using ErrorType = std::remove_cvref_t<T>;
if constexpr (std::is_same_v<ErrorType, storage::ReplicationError>) {
EventCounter::IncrementCounter(EventCounter::LabelIndexCreated);
throw ReplicationException(
fmt::format("At least one SYNC replica has not confirmed the creation of the index on label {} "
"on properties {}.",
@ -1652,8 +1668,6 @@ PreparedQuery PrepareIndexQuery(ParsedQuery parsed_query, bool in_explicit_trans
}
},
error);
} else {
EventCounter::IncrementCounter(EventCounter::LabelIndexCreated);
}
};
break;
@ -1910,6 +1924,7 @@ Callback CreateTrigger(TriggerQuery *trigger_query,
std::move(trigger_name), trigger_statement, user_parameters, ToTriggerEventType(event_type),
before_commit ? TriggerPhase::BEFORE_COMMIT : TriggerPhase::AFTER_COMMIT, &interpreter_context->ast_cache,
dba, interpreter_context->config.query, std::move(owner), interpreter_context->auth_checker);
memgraph::metrics::IncrementCounter(memgraph::metrics::TriggersCreated);
return {};
}};
}
@ -1964,7 +1979,6 @@ PreparedQuery PrepareTriggerQuery(ParsedQuery parsed_query, bool in_explicit_tra
case TriggerQuery::Action::CREATE_TRIGGER:
trigger_notification.emplace(SeverityLevel::INFO, NotificationCode::CREATE_TRIGGER,
fmt::format("Created trigger {}.", trigger_query->trigger_name_));
EventCounter::IncrementCounter(EventCounter::TriggersCreated);
return CreateTrigger(trigger_query, user_parameters, interpreter_context, dba, std::move(owner));
case TriggerQuery::Action::DROP_TRIGGER:
trigger_notification.emplace(SeverityLevel::INFO, NotificationCode::DROP_TRIGGER,
@ -2758,14 +2772,14 @@ Interpreter::PrepareResult Interpreter::Prepare(const std::string &query_string,
// an explicit transaction block.
if (in_explicit_transaction_) {
AdvanceCommand();
}
// If we're not in an explicit transaction block and we have an open
// transaction, abort it since we're about to prepare a new query.
else if (db_accessor_) {
} else if (db_accessor_) {
// If we're not in an explicit transaction block and we have an open
// transaction, abort it since we're about to prepare a new query.
query_executions_.emplace_back(
std::make_unique<QueryExecution>(utils::MonotonicBufferResource(kExecutionMemoryBlockSize)));
AbortCommand(&query_executions_.back());
}
std::unique_ptr<QueryExecution> *query_execution_ptr = nullptr;
try {
query_executions_.emplace_back(
@ -2813,6 +2827,7 @@ Interpreter::PrepareResult Interpreter::Prepare(const std::string &query_string,
utils::Downcast<ProfileQuery>(parsed_query.query) || utils::Downcast<DumpQuery>(parsed_query.query) ||
utils::Downcast<TriggerQuery>(parsed_query.query) || utils::Downcast<AnalyzeGraphQuery>(parsed_query.query) ||
utils::Downcast<TransactionQueueQuery>(parsed_query.query))) {
memgraph::metrics::IncrementCounter(memgraph::metrics::ActiveTransactions);
db_accessor_ =
std::make_unique<storage::Storage::Accessor>(interpreter_context_->db->Access(GetIsolationLevelOverride()));
execution_db_accessor_.emplace(db_accessor_.get());
@ -2916,7 +2931,7 @@ Interpreter::PrepareResult Interpreter::Prepare(const std::string &query_string,
return {query_execution->prepared_query->header, query_execution->prepared_query->privileges, qid};
} catch (const utils::BasicException &) {
EventCounter::IncrementCounter(EventCounter::FailedQuery);
memgraph::metrics::IncrementCounter(memgraph::metrics::FailedQuery);
AbortCommand(query_execution_ptr);
throw;
}
@ -2947,7 +2962,11 @@ void Interpreter::Abort() {
expect_rollback_ = false;
in_explicit_transaction_ = false;
memgraph::metrics::DecrementCounter(memgraph::metrics::ActiveTransactions);
if (!db_accessor_) return;
db_accessor_->Abort();
execution_db_accessor_.reset();
db_accessor_.reset();
@ -3043,6 +3062,11 @@ void Interpreter::Commit() {
utils::OnScopeExit clean_status(
[this]() { transaction_status_.store(TransactionStatus::IDLE, std::memory_order_release); });
utils::OnScopeExit update_metrics([]() {
memgraph::metrics::IncrementCounter(memgraph::metrics::CommitedTransactions);
memgraph::metrics::DecrementCounter(memgraph::metrics::ActiveTransactions);
});
std::optional<TriggerContext> trigger_context = std::nullopt;
if (trigger_context_collector_) {
trigger_context.emplace(std::move(*trigger_context_collector_).TransformToTriggerContext());

View File

@ -44,9 +44,9 @@
#include "utils/timer.hpp"
#include "utils/tsc.hpp"
namespace EventCounter {
namespace memgraph::metrics {
extern const Event FailedQuery;
} // namespace EventCounter
} // namespace memgraph::metrics
namespace memgraph::query {
@ -515,7 +515,7 @@ std::map<std::string, TypedValue> Interpreter::Pull(TStream *result_stream, std:
query_execution.reset(nullptr);
throw;
} catch (const utils::BasicException &) {
EventCounter::IncrementCounter(EventCounter::FailedQuery);
memgraph::metrics::IncrementCounter(memgraph::metrics::FailedQuery);
AbortCommand(&query_execution);
throw;
}

View File

@ -81,7 +81,7 @@
LOG_FATAL("Operator " #class_name " has no single input!"); \
}
namespace EventCounter {
namespace memgraph::metrics {
extern const Event OnceOperator;
extern const Event CreateNodeOperator;
extern const Event CreateExpandOperator;
@ -119,7 +119,7 @@ extern const Event ForeachOperator;
extern const Event EmptyResultOperator;
extern const Event EvaluatePatternFilterOperator;
extern const Event ApplyOperator;
} // namespace EventCounter
} // namespace memgraph::metrics
namespace memgraph::query::plan {
@ -170,7 +170,7 @@ bool Once::OnceCursor::Pull(Frame &, ExecutionContext &context) {
}
UniqueCursorPtr Once::MakeCursor(utils::MemoryResource *mem) const {
EventCounter::IncrementCounter(EventCounter::OnceOperator);
memgraph::metrics::IncrementCounter(memgraph::metrics::OnceOperator);
return MakeUniqueCursorPtr<OnceCursor>(mem);
}
@ -232,7 +232,7 @@ VertexAccessor &CreateLocalVertex(const NodeCreationInfo &node_info, Frame *fram
ACCEPT_WITH_INPUT(CreateNode)
UniqueCursorPtr CreateNode::MakeCursor(utils::MemoryResource *mem) const {
EventCounter::IncrementCounter(EventCounter::CreateNodeOperator);
memgraph::metrics::IncrementCounter(memgraph::metrics::CreateNodeOperator);
return MakeUniqueCursorPtr<CreateNodeCursor>(mem, *this, mem);
}
@ -282,7 +282,7 @@ CreateExpand::CreateExpand(const NodeCreationInfo &node_info, const EdgeCreation
ACCEPT_WITH_INPUT(CreateExpand)
UniqueCursorPtr CreateExpand::MakeCursor(utils::MemoryResource *mem) const {
EventCounter::IncrementCounter(EventCounter::CreateNodeOperator);
memgraph::metrics::IncrementCounter(memgraph::metrics::CreateNodeOperator);
return MakeUniqueCursorPtr<CreateExpandCursor>(mem, *this, mem);
}
@ -489,7 +489,7 @@ ScanAll::ScanAll(const std::shared_ptr<LogicalOperator> &input, Symbol output_sy
ACCEPT_WITH_INPUT(ScanAll)
UniqueCursorPtr ScanAll::MakeCursor(utils::MemoryResource *mem) const {
EventCounter::IncrementCounter(EventCounter::ScanAllOperator);
memgraph::metrics::IncrementCounter(memgraph::metrics::ScanAllOperator);
auto vertices = [this](Frame &, ExecutionContext &context) {
auto *db = context.db_accessor;
@ -512,7 +512,7 @@ ScanAllByLabel::ScanAllByLabel(const std::shared_ptr<LogicalOperator> &input, Sy
ACCEPT_WITH_INPUT(ScanAllByLabel)
UniqueCursorPtr ScanAllByLabel::MakeCursor(utils::MemoryResource *mem) const {
EventCounter::IncrementCounter(EventCounter::ScanAllByLabelOperator);
memgraph::metrics::IncrementCounter(memgraph::metrics::ScanAllByLabelOperator);
auto vertices = [this](Frame &, ExecutionContext &context) {
auto *db = context.db_accessor;
@ -542,7 +542,7 @@ ScanAllByLabelPropertyRange::ScanAllByLabelPropertyRange(const std::shared_ptr<L
ACCEPT_WITH_INPUT(ScanAllByLabelPropertyRange)
UniqueCursorPtr ScanAllByLabelPropertyRange::MakeCursor(utils::MemoryResource *mem) const {
EventCounter::IncrementCounter(EventCounter::ScanAllByLabelPropertyRangeOperator);
memgraph::metrics::IncrementCounter(memgraph::metrics::ScanAllByLabelPropertyRangeOperator);
auto vertices = [this](Frame &frame, ExecutionContext &context)
-> std::optional<decltype(context.db_accessor->Vertices(view_, label_, property_, std::nullopt, std::nullopt))> {
@ -602,7 +602,7 @@ ScanAllByLabelPropertyValue::ScanAllByLabelPropertyValue(const std::shared_ptr<L
ACCEPT_WITH_INPUT(ScanAllByLabelPropertyValue)
UniqueCursorPtr ScanAllByLabelPropertyValue::MakeCursor(utils::MemoryResource *mem) const {
EventCounter::IncrementCounter(EventCounter::ScanAllByLabelPropertyValueOperator);
memgraph::metrics::IncrementCounter(memgraph::metrics::ScanAllByLabelPropertyValueOperator);
auto vertices = [this](Frame &frame, ExecutionContext &context)
-> std::optional<decltype(context.db_accessor->Vertices(view_, label_, property_, storage::PropertyValue()))> {
@ -627,7 +627,7 @@ ScanAllByLabelProperty::ScanAllByLabelProperty(const std::shared_ptr<LogicalOper
ACCEPT_WITH_INPUT(ScanAllByLabelProperty)
UniqueCursorPtr ScanAllByLabelProperty::MakeCursor(utils::MemoryResource *mem) const {
EventCounter::IncrementCounter(EventCounter::ScanAllByLabelPropertyOperator);
memgraph::metrics::IncrementCounter(memgraph::metrics::ScanAllByLabelPropertyOperator);
auto vertices = [this](Frame &frame, ExecutionContext &context) {
auto *db = context.db_accessor;
@ -646,7 +646,7 @@ ScanAllById::ScanAllById(const std::shared_ptr<LogicalOperator> &input, Symbol o
ACCEPT_WITH_INPUT(ScanAllById)
UniqueCursorPtr ScanAllById::MakeCursor(utils::MemoryResource *mem) const {
EventCounter::IncrementCounter(EventCounter::ScanAllByIdOperator);
memgraph::metrics::IncrementCounter(memgraph::metrics::ScanAllByIdOperator);
auto vertices = [this](Frame &frame, ExecutionContext &context) -> std::optional<std::vector<VertexAccessor>> {
auto *db = context.db_accessor;
@ -701,7 +701,7 @@ Expand::Expand(const std::shared_ptr<LogicalOperator> &input, Symbol input_symbo
ACCEPT_WITH_INPUT(Expand)
UniqueCursorPtr Expand::MakeCursor(utils::MemoryResource *mem) const {
EventCounter::IncrementCounter(EventCounter::ExpandOperator);
memgraph::metrics::IncrementCounter(memgraph::metrics::ExpandOperator);
return MakeUniqueCursorPtr<ExpandCursor>(mem, *this, mem);
}
@ -2171,7 +2171,7 @@ class ExpandAllShortestPathsCursor : public query::plan::Cursor {
};
UniqueCursorPtr ExpandVariable::MakeCursor(utils::MemoryResource *mem) const {
EventCounter::IncrementCounter(EventCounter::ExpandVariableOperator);
memgraph::metrics::IncrementCounter(memgraph::metrics::ExpandVariableOperator);
switch (type_) {
case EdgeAtom::Type::BREADTH_FIRST:
@ -2274,7 +2274,7 @@ class ConstructNamedPathCursor : public Cursor {
ACCEPT_WITH_INPUT(ConstructNamedPath)
UniqueCursorPtr ConstructNamedPath::MakeCursor(utils::MemoryResource *mem) const {
EventCounter::IncrementCounter(EventCounter::ConstructNamedPathOperator);
memgraph::metrics::IncrementCounter(memgraph::metrics::ConstructNamedPathOperator);
return MakeUniqueCursorPtr<ConstructNamedPathCursor>(mem, *this, mem);
}
@ -2300,7 +2300,7 @@ bool Filter::Accept(HierarchicalLogicalOperatorVisitor &visitor) {
}
UniqueCursorPtr Filter::MakeCursor(utils::MemoryResource *mem) const {
EventCounter::IncrementCounter(EventCounter::FilterOperator);
memgraph::metrics::IncrementCounter(memgraph::metrics::FilterOperator);
return MakeUniqueCursorPtr<FilterCursor>(mem, *this, mem);
}
@ -2353,7 +2353,7 @@ EvaluatePatternFilter::EvaluatePatternFilter(const std::shared_ptr<LogicalOperat
ACCEPT_WITH_INPUT(EvaluatePatternFilter);
UniqueCursorPtr EvaluatePatternFilter::MakeCursor(utils::MemoryResource *mem) const {
EventCounter::IncrementCounter(EventCounter::EvaluatePatternFilterOperator);
memgraph::metrics::IncrementCounter(memgraph::metrics::EvaluatePatternFilterOperator);
return MakeUniqueCursorPtr<EvaluatePatternFilterCursor>(mem, *this, mem);
}
@ -2386,7 +2386,7 @@ Produce::Produce(const std::shared_ptr<LogicalOperator> &input, const std::vecto
ACCEPT_WITH_INPUT(Produce)
UniqueCursorPtr Produce::MakeCursor(utils::MemoryResource *mem) const {
EventCounter::IncrementCounter(EventCounter::ProduceOperator);
memgraph::metrics::IncrementCounter(memgraph::metrics::ProduceOperator);
return MakeUniqueCursorPtr<ProduceCursor>(mem, *this, mem);
}
@ -2429,7 +2429,7 @@ Delete::Delete(const std::shared_ptr<LogicalOperator> &input_, const std::vector
ACCEPT_WITH_INPUT(Delete)
UniqueCursorPtr Delete::MakeCursor(utils::MemoryResource *mem) const {
EventCounter::IncrementCounter(EventCounter::DeleteOperator);
memgraph::metrics::IncrementCounter(memgraph::metrics::DeleteOperator);
return MakeUniqueCursorPtr<DeleteCursor>(mem, *this, mem);
}
@ -2581,7 +2581,7 @@ SetProperty::SetProperty(const std::shared_ptr<LogicalOperator> &input, storage:
ACCEPT_WITH_INPUT(SetProperty)
UniqueCursorPtr SetProperty::MakeCursor(utils::MemoryResource *mem) const {
EventCounter::IncrementCounter(EventCounter::SetPropertyOperator);
memgraph::metrics::IncrementCounter(memgraph::metrics::SetPropertyOperator);
return MakeUniqueCursorPtr<SetPropertyCursor>(mem, *this, mem);
}
@ -2664,7 +2664,7 @@ SetProperties::SetProperties(const std::shared_ptr<LogicalOperator> &input, Symb
ACCEPT_WITH_INPUT(SetProperties)
UniqueCursorPtr SetProperties::MakeCursor(utils::MemoryResource *mem) const {
EventCounter::IncrementCounter(EventCounter::SetPropertiesOperator);
memgraph::metrics::IncrementCounter(memgraph::metrics::SetPropertiesOperator);
return MakeUniqueCursorPtr<SetPropertiesCursor>(mem, *this, mem);
}
@ -2861,7 +2861,7 @@ SetLabels::SetLabels(const std::shared_ptr<LogicalOperator> &input, Symbol input
ACCEPT_WITH_INPUT(SetLabels)
UniqueCursorPtr SetLabels::MakeCursor(utils::MemoryResource *mem) const {
EventCounter::IncrementCounter(EventCounter::SetLabelsOperator);
memgraph::metrics::IncrementCounter(memgraph::metrics::SetLabelsOperator);
return MakeUniqueCursorPtr<SetLabelsCursor>(mem, *this, mem);
}
@ -2933,7 +2933,7 @@ RemoveProperty::RemoveProperty(const std::shared_ptr<LogicalOperator> &input, st
ACCEPT_WITH_INPUT(RemoveProperty)
UniqueCursorPtr RemoveProperty::MakeCursor(utils::MemoryResource *mem) const {
EventCounter::IncrementCounter(EventCounter::RemovePropertyOperator);
memgraph::metrics::IncrementCounter(memgraph::metrics::RemovePropertyOperator);
return MakeUniqueCursorPtr<RemovePropertyCursor>(mem, *this, mem);
}
@ -3019,7 +3019,7 @@ RemoveLabels::RemoveLabels(const std::shared_ptr<LogicalOperator> &input, Symbol
ACCEPT_WITH_INPUT(RemoveLabels)
UniqueCursorPtr RemoveLabels::MakeCursor(utils::MemoryResource *mem) const {
EventCounter::IncrementCounter(EventCounter::RemoveLabelsOperator);
memgraph::metrics::IncrementCounter(memgraph::metrics::RemoveLabelsOperator);
return MakeUniqueCursorPtr<RemoveLabelsCursor>(mem, *this, mem);
}
@ -3092,7 +3092,7 @@ EdgeUniquenessFilter::EdgeUniquenessFilter(const std::shared_ptr<LogicalOperator
ACCEPT_WITH_INPUT(EdgeUniquenessFilter)
UniqueCursorPtr EdgeUniquenessFilter::MakeCursor(utils::MemoryResource *mem) const {
EventCounter::IncrementCounter(EventCounter::EdgeUniquenessFilterOperator);
memgraph::metrics::IncrementCounter(memgraph::metrics::EdgeUniquenessFilterOperator);
return MakeUniqueCursorPtr<EdgeUniquenessFilterCursor>(mem, *this, mem);
}
@ -3194,7 +3194,7 @@ class EmptyResultCursor : public Cursor {
};
UniqueCursorPtr EmptyResult::MakeCursor(utils::MemoryResource *mem) const {
EventCounter::IncrementCounter(EventCounter::EmptyResultOperator);
memgraph::metrics::IncrementCounter(memgraph::metrics::EmptyResultOperator);
return MakeUniqueCursorPtr<EmptyResultCursor>(mem, *this, mem);
}
@ -3255,7 +3255,7 @@ class AccumulateCursor : public Cursor {
};
UniqueCursorPtr Accumulate::MakeCursor(utils::MemoryResource *mem) const {
EventCounter::IncrementCounter(EventCounter::AccumulateOperator);
memgraph::metrics::IncrementCounter(memgraph::metrics::AccumulateOperator);
return MakeUniqueCursorPtr<AccumulateCursor>(mem, *this, mem);
}
@ -3617,7 +3617,7 @@ class AggregateCursor : public Cursor {
};
UniqueCursorPtr Aggregate::MakeCursor(utils::MemoryResource *mem) const {
EventCounter::IncrementCounter(EventCounter::AggregateOperator);
memgraph::metrics::IncrementCounter(memgraph::metrics::AggregateOperator);
return MakeUniqueCursorPtr<AggregateCursor>(mem, *this, mem);
}
@ -3628,7 +3628,7 @@ Skip::Skip(const std::shared_ptr<LogicalOperator> &input, Expression *expression
ACCEPT_WITH_INPUT(Skip)
UniqueCursorPtr Skip::MakeCursor(utils::MemoryResource *mem) const {
EventCounter::IncrementCounter(EventCounter::SkipOperator);
memgraph::metrics::IncrementCounter(memgraph::metrics::SkipOperator);
return MakeUniqueCursorPtr<SkipCursor>(mem, *this, mem);
}
@ -3681,7 +3681,7 @@ Limit::Limit(const std::shared_ptr<LogicalOperator> &input, Expression *expressi
ACCEPT_WITH_INPUT(Limit)
UniqueCursorPtr Limit::MakeCursor(utils::MemoryResource *mem) const {
EventCounter::IncrementCounter(EventCounter::LimitOperator);
memgraph::metrics::IncrementCounter(memgraph::metrics::LimitOperator);
return MakeUniqueCursorPtr<LimitCursor>(mem, *this, mem);
}
@ -3829,7 +3829,7 @@ class OrderByCursor : public Cursor {
};
UniqueCursorPtr OrderBy::MakeCursor(utils::MemoryResource *mem) const {
EventCounter::IncrementCounter(EventCounter::OrderByOperator);
memgraph::metrics::IncrementCounter(memgraph::metrics::OrderByOperator);
return MakeUniqueCursorPtr<OrderByCursor>(mem, *this, mem);
}
@ -3846,7 +3846,7 @@ bool Merge::Accept(HierarchicalLogicalOperatorVisitor &visitor) {
}
UniqueCursorPtr Merge::MakeCursor(utils::MemoryResource *mem) const {
EventCounter::IncrementCounter(EventCounter::MergeOperator);
memgraph::metrics::IncrementCounter(memgraph::metrics::MergeOperator);
return MakeUniqueCursorPtr<MergeCursor>(mem, *this, mem);
}
@ -3926,7 +3926,7 @@ bool Optional::Accept(HierarchicalLogicalOperatorVisitor &visitor) {
}
UniqueCursorPtr Optional::MakeCursor(utils::MemoryResource *mem) const {
EventCounter::IncrementCounter(EventCounter::OptionalOperator);
memgraph::metrics::IncrementCounter(memgraph::metrics::OptionalOperator);
return MakeUniqueCursorPtr<OptionalCursor>(mem, *this, mem);
}
@ -4054,7 +4054,7 @@ class UnwindCursor : public Cursor {
};
UniqueCursorPtr Unwind::MakeCursor(utils::MemoryResource *mem) const {
EventCounter::IncrementCounter(EventCounter::UnwindOperator);
memgraph::metrics::IncrementCounter(memgraph::metrics::UnwindOperator);
return MakeUniqueCursorPtr<UnwindCursor>(mem, *this, mem);
}
@ -4108,7 +4108,7 @@ Distinct::Distinct(const std::shared_ptr<LogicalOperator> &input, const std::vec
ACCEPT_WITH_INPUT(Distinct)
UniqueCursorPtr Distinct::MakeCursor(utils::MemoryResource *mem) const {
EventCounter::IncrementCounter(EventCounter::DistinctOperator);
memgraph::metrics::IncrementCounter(memgraph::metrics::DistinctOperator);
return MakeUniqueCursorPtr<DistinctCursor>(mem, *this, mem);
}
@ -4130,7 +4130,7 @@ Union::Union(const std::shared_ptr<LogicalOperator> &left_op, const std::shared_
right_symbols_(right_symbols) {}
UniqueCursorPtr Union::MakeCursor(utils::MemoryResource *mem) const {
EventCounter::IncrementCounter(EventCounter::UnionOperator);
memgraph::metrics::IncrementCounter(memgraph::metrics::UnionOperator);
return MakeUniqueCursorPtr<Union::UnionCursor>(mem, *this, mem);
}
@ -4289,7 +4289,7 @@ class CartesianCursor : public Cursor {
} // namespace
UniqueCursorPtr Cartesian::MakeCursor(utils::MemoryResource *mem) const {
EventCounter::IncrementCounter(EventCounter::CartesianOperator);
memgraph::metrics::IncrementCounter(memgraph::metrics::CartesianOperator);
return MakeUniqueCursorPtr<CartesianCursor>(mem, *this, mem);
}
@ -4580,7 +4580,7 @@ class CallProcedureCursor : public Cursor {
};
UniqueCursorPtr CallProcedure::MakeCursor(utils::MemoryResource *mem) const {
EventCounter::IncrementCounter(EventCounter::CallProcedureOperator);
memgraph::metrics::IncrementCounter(memgraph::metrics::CallProcedureOperator);
CallProcedure::IncrementCounter(procedure_name_);
return MakeUniqueCursorPtr<CallProcedureCursor>(mem, this, mem);
@ -4786,7 +4786,7 @@ Foreach::Foreach(std::shared_ptr<LogicalOperator> input, std::shared_ptr<Logical
loop_variable_symbol_(loop_variable_symbol) {}
UniqueCursorPtr Foreach::MakeCursor(utils::MemoryResource *mem) const {
EventCounter::IncrementCounter(EventCounter::ForeachOperator);
memgraph::metrics::IncrementCounter(memgraph::metrics::ForeachOperator);
return MakeUniqueCursorPtr<ForeachCursor>(mem, *this, mem);
}
@ -4818,7 +4818,7 @@ bool Apply::Accept(HierarchicalLogicalOperatorVisitor &visitor) {
}
UniqueCursorPtr Apply::MakeCursor(utils::MemoryResource *mem) const {
EventCounter::IncrementCounter(EventCounter::ApplyOperator);
memgraph::metrics::IncrementCounter(memgraph::metrics::ApplyOperator);
return MakeUniqueCursorPtr<ApplyCursor>(mem, *this, mem);
}

View File

@ -1,4 +1,4 @@
// Copyright 2022 Memgraph Ltd.
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source

View File

@ -36,9 +36,9 @@
#include "utils/pmr/string.hpp"
#include "utils/variant_helpers.hpp"
namespace EventCounter {
namespace memgraph::metrics {
extern const Event MessagesConsumed;
} // namespace EventCounter
} // namespace memgraph::metrics
namespace memgraph::query::stream {
namespace {
@ -495,7 +495,7 @@ Streams::StreamsMap::iterator Streams::CreateConsumer(StreamsMap &map, const std
utils::OnScopeExit interpreter_cleanup{
[interpreter_context, interpreter]() { interpreter_context->interpreters->erase(interpreter.get()); }};
EventCounter::IncrementCounter(EventCounter::MessagesConsumed, messages.size());
memgraph::metrics::IncrementCounter(memgraph::metrics::MessagesConsumed, messages.size());
CallCustomTransformation(transformation_name, messages, result, accessor, *memory_resource, stream_name);
DiscardValueResultStream stream;

View File

@ -25,9 +25,9 @@
#include "utils/event_counter.hpp"
#include "utils/memory.hpp"
namespace EventCounter {
namespace memgraph::metrics {
extern const Event TriggersExecuted;
} // namespace EventCounter
} // namespace memgraph::metrics
namespace memgraph::query {
namespace {
@ -248,7 +248,7 @@ void Trigger::Execute(DbAccessor *dba, utils::MonotonicBufferResource *execution
;
cursor->Shutdown();
EventCounter::IncrementCounter(EventCounter::TriggersExecuted);
memgraph::metrics::IncrementCounter(memgraph::metrics::TriggersExecuted);
}
namespace {

View File

@ -27,9 +27,15 @@
#include "storage/v2/durability/paths.hpp"
#include "storage/v2/durability/snapshot.hpp"
#include "storage/v2/durability/wal.hpp"
#include "utils/event_histogram.hpp"
#include "utils/logging.hpp"
#include "utils/memory_tracker.hpp"
#include "utils/message.hpp"
#include "utils/timer.hpp"
namespace memgraph::metrics {
extern const Event SnapshotRecoveryLatency_us;
} // namespace memgraph::metrics
namespace memgraph::storage::durability {
@ -176,6 +182,8 @@ std::optional<RecoveryInfo> RecoverData(const std::filesystem::path &snapshot_di
return std::nullopt;
}
utils::Timer timer;
auto snapshot_files = GetSnapshotFiles(snapshot_directory);
RecoveryInfo recovery_info;
@ -347,6 +355,10 @@ std::optional<RecoveryInfo> RecoverData(const std::filesystem::path &snapshot_di
}
RecoverIndicesAndConstraints(indices_constraints, indices, constraints, vertices);
memgraph::metrics::Measure(memgraph::metrics::SnapshotRecoveryLatency_us,
std::chrono::duration_cast<std::chrono::microseconds>(timer.Elapsed()).count());
return recovery_info;
}

View File

@ -34,6 +34,8 @@
#include "storage/v2/storage_mode.hpp"
#include "storage/v2/transaction.hpp"
#include "storage/v2/vertex_accessor.hpp"
#include "utils/event_counter.hpp"
#include "utils/event_histogram.hpp"
#include "utils/file.hpp"
#include "utils/logging.hpp"
#include "utils/memory_tracker.hpp"
@ -41,6 +43,7 @@
#include "utils/rw_lock.hpp"
#include "utils/spin_lock.hpp"
#include "utils/stat.hpp"
#include "utils/timer.hpp"
#include "utils/uuid.hpp"
/// REPLICATION ///
@ -49,6 +52,13 @@
#include "storage/v2/replication/rpc.hpp"
#include "storage/v2/storage_error.hpp"
namespace memgraph::metrics {
extern const Event SnapshotCreationLatency_us;
extern const Event ActiveLabelIndices;
extern const Event ActiveLabelPropertyIndices;
} // namespace memgraph::metrics
namespace memgraph::storage {
using OOMExceptionEnabler = utils::MemoryTracker::OutOfMemoryExceptionEnabler;
@ -1213,6 +1223,9 @@ utils::BasicResult<StorageIndexDefinitionError, void> Storage::CreateIndex(
commit_log_->MarkFinished(commit_timestamp);
last_commit_timestamp_ = commit_timestamp;
// We don't care if there is a replication error because on main node the change will go through
memgraph::metrics::IncrementCounter(memgraph::metrics::ActiveLabelIndices);
if (success) {
return {};
}
@ -1232,6 +1245,9 @@ utils::BasicResult<StorageIndexDefinitionError, void> Storage::CreateIndex(
commit_log_->MarkFinished(commit_timestamp);
last_commit_timestamp_ = commit_timestamp;
// We don't care if there is a replication error because on main node the change will go through
memgraph::metrics::IncrementCounter(memgraph::metrics::ActiveLabelPropertyIndices);
if (success) {
return {};
}
@ -1251,6 +1267,9 @@ utils::BasicResult<StorageIndexDefinitionError, void> Storage::DropIndex(
commit_log_->MarkFinished(commit_timestamp);
last_commit_timestamp_ = commit_timestamp;
// We don't care if there is a replication error because on main node the change will go through
memgraph::metrics::DecrementCounter(memgraph::metrics::ActiveLabelIndices);
if (success) {
return {};
}
@ -1272,6 +1291,9 @@ utils::BasicResult<StorageIndexDefinitionError, void> Storage::DropIndex(
commit_log_->MarkFinished(commit_timestamp);
last_commit_timestamp_ = commit_timestamp;
// We don't care if there is a replication error because on main node the change will go through
memgraph::metrics::DecrementCounter(memgraph::metrics::ActiveLabelPropertyIndices);
if (success) {
return {};
}
@ -1943,6 +1965,8 @@ utils::BasicResult<Storage::CreateSnapshotError> Storage::CreateSnapshot(std::op
}
auto snapshot_creator = [this]() {
utils::Timer timer;
auto transaction = CreateTransaction(IsolationLevel::SNAPSHOT_ISOLATION, storage_mode_);
// Create snapshot.
durability::CreateSnapshot(&transaction, snapshot_directory_, wal_directory_,
@ -1950,6 +1974,9 @@ utils::BasicResult<Storage::CreateSnapshotError> Storage::CreateSnapshot(std::op
&indices_, &constraints_, config_, uuid_, epoch_id_, epoch_history_, &file_retainer_);
// Finalize snapshot transaction.
commit_log_->MarkFinished(transaction.start_timestamp);
memgraph::metrics::Measure(memgraph::metrics::SnapshotCreationLatency_us,
std::chrono::duration_cast<std::chrono::microseconds>(timer.Elapsed()).count());
};
std::lock_guard snapshot_guard(snapshot_lock_);

View File

@ -2,6 +2,8 @@ set(utils_src_files
async_timer.cpp
base64.cpp
event_counter.cpp
event_gauge.cpp
event_histogram.cpp
csv_parsing.cpp
file.cpp
file_locker.cpp

View File

@ -11,69 +11,86 @@
#include "utils/event_counter.hpp"
#define APPLY_FOR_EVENTS(M) \
M(ReadQuery, "Number of read-only queries executed.") \
M(WriteQuery, "Number of write-only queries executed.") \
M(ReadWriteQuery, "Number of read-write queries executed.") \
\
M(OnceOperator, "Number of times Once operator was used.") \
M(CreateNodeOperator, "Number of times CreateNode operator was used.") \
M(CreateExpandOperator, "Number of times CreateExpand operator was used.") \
M(ScanAllOperator, "Number of times ScanAll operator was used.") \
M(ScanAllByLabelOperator, "Number of times ScanAllByLabel operator was used.") \
M(ScanAllByLabelPropertyRangeOperator, "Number of times ScanAllByLabelPropertyRange operator was used.") \
M(ScanAllByLabelPropertyValueOperator, "Number of times ScanAllByLabelPropertyValue operator was used.") \
M(ScanAllByLabelPropertyOperator, "Number of times ScanAllByLabelProperty operator was used.") \
M(ScanAllByIdOperator, "Number of times ScanAllById operator was used.") \
M(ExpandOperator, "Number of times Expand operator was used.") \
M(ExpandVariableOperator, "Number of times ExpandVariable operator was used.") \
M(ConstructNamedPathOperator, "Number of times ConstructNamedPath operator was used.") \
M(FilterOperator, "Number of times Filter operator was used.") \
M(ProduceOperator, "Number of times Produce operator was used.") \
M(DeleteOperator, "Number of times Delete operator was used.") \
M(SetPropertyOperator, "Number of times SetProperty operator was used.") \
M(SetPropertiesOperator, "Number of times SetProperties operator was used.") \
M(SetLabelsOperator, "Number of times SetLabels operator was used.") \
M(RemovePropertyOperator, "Number of times RemoveProperty operator was used.") \
M(RemoveLabelsOperator, "Number of times RemoveLabels operator was used.") \
M(EdgeUniquenessFilterOperator, "Number of times EdgeUniquenessFilter operator was used.") \
M(EmptyResultOperator, "Number of times EmptyResult operator was used.") \
M(AccumulateOperator, "Number of times Accumulate operator was used.") \
M(AggregateOperator, "Number of times Aggregate operator was used.") \
M(SkipOperator, "Number of times Skip operator was used.") \
M(LimitOperator, "Number of times Limit operator was used.") \
M(OrderByOperator, "Number of times OrderBy operator was used.") \
M(MergeOperator, "Number of times Merge operator was used.") \
M(OptionalOperator, "Number of times Optional operator was used.") \
M(UnwindOperator, "Number of times Unwind operator was used.") \
M(DistinctOperator, "Number of times Distinct operator was used.") \
M(UnionOperator, "Number of times Union operator was used.") \
M(CartesianOperator, "Number of times Cartesian operator was used.") \
M(CallProcedureOperator, "Number of times CallProcedure operator was used.") \
M(ForeachOperator, "Number of times Foreach operator was used.") \
M(EvaluatePatternFilterOperator, "Number of times EvaluatePatternFilter operator was used.") \
M(ApplyOperator, "Number of times ApplyOperator operator was used.") \
\
M(FailedQuery, "Number of times executing a query failed.") \
M(LabelIndexCreated, "Number of times a label index was created.") \
M(LabelPropertyIndexCreated, "Number of times a label property index was created.") \
M(StreamsCreated, "Number of Streams created.") \
M(MessagesConsumed, "Number of consumed streamed messages.") \
M(TriggersCreated, "Number of Triggers created.") \
M(TriggersExecuted, "Number of Triggers executed.")
namespace EventCounter {
// NOLINTNEXTLINE(cppcoreguidelines-macro-usage)
#define APPLY_FOR_COUNTERS(M) \
M(ReadQuery, QueryType, "Number of read-only queries executed.") \
M(WriteQuery, QueryType, "Number of write-only queries executed.") \
M(ReadWriteQuery, QueryType, "Number of read-write queries executed.") \
\
M(OnceOperator, Operator, "Number of times Once operator was used.") \
M(CreateNodeOperator, Operator, "Number of times CreateNode operator was used.") \
M(CreateExpandOperator, Operator, "Number of times CreateExpand operator was used.") \
M(ScanAllOperator, Operator, "Number of times ScanAll operator was used.") \
M(ScanAllByLabelOperator, Operator, "Number of times ScanAllByLabel operator was used.") \
M(ScanAllByLabelPropertyRangeOperator, Operator, "Number of times ScanAllByLabelPropertyRange operator was used.") \
M(ScanAllByLabelPropertyValueOperator, Operator, "Number of times ScanAllByLabelPropertyValue operator was used.") \
M(ScanAllByLabelPropertyOperator, Operator, "Number of times ScanAllByLabelProperty operator was used.") \
M(ScanAllByIdOperator, Operator, "Number of times ScanAllById operator was used.") \
M(ExpandOperator, Operator, "Number of times Expand operator was used.") \
M(ExpandVariableOperator, Operator, "Number of times ExpandVariable operator was used.") \
M(ConstructNamedPathOperator, Operator, "Number of times ConstructNamedPath operator was used.") \
M(FilterOperator, Operator, "Number of times Filter operator was used.") \
M(ProduceOperator, Operator, "Number of times Produce operator was used.") \
M(DeleteOperator, Operator, "Number of times Delete operator was used.") \
M(SetPropertyOperator, Operator, "Number of times SetProperty operator was used.") \
M(SetPropertiesOperator, Operator, "Number of times SetProperties operator was used.") \
M(SetLabelsOperator, Operator, "Number of times SetLabels operator was used.") \
M(RemovePropertyOperator, Operator, "Number of times RemoveProperty operator was used.") \
M(RemoveLabelsOperator, Operator, "Number of times RemoveLabels operator was used.") \
M(EdgeUniquenessFilterOperator, Operator, "Number of times EdgeUniquenessFilter operator was used.") \
M(EmptyResultOperator, Operator, "Number of times EmptyResult operator was used.") \
M(AccumulateOperator, Operator, "Number of times Accumulate operator was used.") \
M(AggregateOperator, Operator, "Number of times Aggregate operator was used.") \
M(SkipOperator, Operator, "Number of times Skip operator was used.") \
M(LimitOperator, Operator, "Number of times Limit operator was used.") \
M(OrderByOperator, Operator, "Number of times OrderBy operator was used.") \
M(MergeOperator, Operator, "Number of times Merge operator was used.") \
M(OptionalOperator, Operator, "Number of times Optional operator was used.") \
M(UnwindOperator, Operator, "Number of times Unwind operator was used.") \
M(DistinctOperator, Operator, "Number of times Distinct operator was used.") \
M(UnionOperator, Operator, "Number of times Union operator was used.") \
M(CartesianOperator, Operator, "Number of times Cartesian operator was used.") \
M(CallProcedureOperator, Operator, "Number of times CallProcedure operator was used.") \
M(ForeachOperator, Operator, "Number of times Foreach operator was used.") \
M(EvaluatePatternFilterOperator, Operator, "Number of times EvaluatePatternFilter operator was used.") \
M(ApplyOperator, Operator, "Number of times ApplyOperator operator was used.") \
\
M(ActiveLabelIndices, Index, "Number of active label indices in the system.") \
M(ActiveLabelPropertyIndices, Index, "Number of active label property indices in the system<.") \
\
M(StreamsCreated, Stream, "Number of Streams created.") \
M(MessagesConsumed, Stream, "Number of consumed streamed messages.") \
\
M(TriggersCreated, Trigger, "Number of Triggers created.") \
M(TriggersExecuted, Trigger, "Number of Triggers executed.") \
\
M(ActiveSessions, Session, "Number of active connections.") \
M(ActiveBoltSessions, Session, "Number of active Bolt connections.") \
M(ActiveTCPSessions, Session, "Number of active TCP connections.") \
M(ActiveSSLSessions, Session, "Number of active SSL connections.") \
M(ActiveWebSocketSessions, Session, "Number of active websocket connections.") \
M(BoltMessages, Session, "Number of Bolt messages sent.") \
\
M(ActiveTransactions, Transaction, "Number of active transactions.") \
M(CommitedTransactions, Transaction, "Number of committed transactions.") \
M(RollbackedTransactions, Transaction, "Number of rollbacked transactions.") \
M(FailedQuery, Transaction, "Number of times executing a query failed.")
namespace memgraph::metrics {
// define every Event as an index in the array of counters
#define M(NAME, DOCUMENTATION) extern const Event NAME = __COUNTER__;
APPLY_FOR_EVENTS(M)
// NOLINTNEXTLINE(cppcoreguidelines-macro-usage)
#define M(NAME, TYPE, DOCUMENTATION) extern const Event NAME = __COUNTER__;
APPLY_FOR_COUNTERS(M)
#undef M
inline constexpr Event END = __COUNTER__;
// Initialize array for the global counter with all values set to 0
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
Counter global_counters_array[END]{};
// Initialize global counters
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
EventCounters global_counters(global_counters_array);
const Event EventCounters::num_counters = END;
@ -82,28 +99,45 @@ void EventCounters::Increment(const Event event, Count amount) {
counters_[event].fetch_add(amount, std::memory_order_relaxed);
}
void EventCounters::Decrement(const Event event, Count amount) {
counters_[event].fetch_sub(amount, std::memory_order_relaxed);
}
void IncrementCounter(const Event event, Count amount) { global_counters.Increment(event, amount); }
void DecrementCounter(const Event event, Count amount) { global_counters.Decrement(event, amount); }
const char *GetName(const Event event) {
const char *GetCounterName(const Event event) {
static const char *strings[] = {
#define M(NAME, DOCUMENTATION) #NAME,
APPLY_FOR_EVENTS(M)
// NOLINTNEXTLINE(cppcoreguidelines-macro-usage)
#define M(NAME, TYPE, DOCUMENTATION) #NAME,
APPLY_FOR_COUNTERS(M)
#undef M
};
return strings[event];
}
const char *GetDocumentation(const Event event) {
const char *GetCounterDocumentation(const Event event) {
static const char *strings[] = {
#define M(NAME, DOCUMENTATION) DOCUMENTATION,
APPLY_FOR_EVENTS(M)
// NOLINTNEXTLINE(cppcoreguidelines-macro-usage)
#define M(NAME, TYPE, DOCUMENTATION) DOCUMENTATION,
APPLY_FOR_COUNTERS(M)
#undef M
};
return strings[event];
}
Event End() { return END; }
const char *GetCounterType(const Event event) {
static const char *strings[] = {
// NOLINTNEXTLINE(cppcoreguidelines-macro-usage)
#define M(NAME, TYPE, DOCUMENTATION) #TYPE,
APPLY_FOR_COUNTERS(M)
#undef M
};
} // namespace EventCounter
return strings[event];
}
Event CounterEnd() { return END; }
} // namespace memgraph::metrics

View File

@ -1,4 +1,4 @@
// Copyright 2021 Memgraph Ltd.
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -10,11 +10,12 @@
// licenses/APL.txt.
#pragma once
#include <atomic>
#include <cstdlib>
#include <memory>
namespace EventCounter {
namespace memgraph::metrics {
using Event = uint64_t;
using Count = uint64_t;
using Counter = std::atomic<Count>;
@ -29,19 +30,23 @@ class EventCounters {
void Increment(Event event, Count amount = 1);
void Decrement(Event event, Count amount = 1);
static const Event num_counters;
private:
Counter *counters_;
};
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
extern EventCounters global_counters;
void IncrementCounter(Event event, Count amount = 1);
void DecrementCounter(Event event, Count amount = 1);
const char *GetName(Event event);
const char *GetDocumentation(Event event);
const char *GetCounterName(Event event);
const char *GetCounterDocumentation(Event event);
const char *GetCounterType(Event event);
Event End();
} // namespace EventCounter
Event CounterEnd();
} // namespace memgraph::metrics

76
src/utils/event_gauge.cpp Normal file
View File

@ -0,0 +1,76 @@
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#include "utils/event_gauge.hpp"
// We don't have any gauges for now
#define APPLY_FOR_GAUGES(M)
namespace memgraph::metrics {
// define every Event as an index in the array of gauges
// NOLINTNEXTLINE(cppcoreguidelines-macro-usage)
#define M(NAME, TYPE, DOCUMENTATION) extern const Event NAME = __COUNTER__;
APPLY_FOR_GAUGES(M)
#undef M
inline constexpr Event END = __COUNTER__;
// Initialize array for the global gauges with all values set to 0
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
Gauge global_gauges_array[END]{};
// Initialize global counters
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
EventGauges global_gauges(global_gauges_array);
const Event EventGauges::num_gauges = END;
void EventGauges::SetValue(const Event event, Value value) { gauges_[event].store(value, std::memory_order_seq_cst); }
void SetGaugeValue(const Event event, Value value) { global_gauges.SetValue(event, value); }
const char *GetGaugeName(const Event event) {
static const char *strings[] = {
// NOLINTNEXTLINE(cppcoreguidelines-macro-usage)
#define M(NAME, TYPE, DOCUMENTATION) #NAME,
APPLY_FOR_GAUGES(M)
#undef M
};
return strings[event];
}
const char *GetGaugeDocumentation(const Event event) {
static const char *strings[] = {
// NOLINTNEXTLINE(cppcoreguidelines-macro-usage)
#define M(NAME, TYPE, DOCUMENTATION) DOCUMENTATION,
APPLY_FOR_GAUGES(M)
#undef M
};
return strings[event];
}
const char *GetGaugeType(const Event event) {
static const char *strings[] = {
// NOLINTNEXTLINE(cppcoreguidelines-macro-usage)
#define M(NAME, TYPE, DOCUMENTATION) #TYPE,
APPLY_FOR_GAUGES(M)
#undef M
};
return strings[event];
}
Event GaugeEnd() { return END; }
} // namespace memgraph::metrics

49
src/utils/event_gauge.hpp Normal file
View File

@ -0,0 +1,49 @@
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#pragma once
#include <atomic>
#include <cstdlib>
#include <memory>
namespace memgraph::metrics {
using Event = uint64_t;
using Value = uint64_t;
using Gauge = std::atomic<Value>;
class EventGauges {
public:
explicit EventGauges(Gauge *allocated_gauges) noexcept : gauges_(allocated_gauges) {}
auto &operator[](const Event event) { return gauges_[event]; }
const auto &operator[](const Event event) const { return gauges_[event]; }
void SetValue(Event event, Value value);
static const Event num_gauges;
private:
Gauge *gauges_;
};
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
extern EventGauges global_gauges;
void SetGaugeValue(Event event, Value value);
const char *GetGaugeName(Event event);
const char *GetGaugeDocumentation(Event event);
const char *GetGaugeType(Event event);
Event GaugeEnd();
} // namespace memgraph::metrics

View File

@ -0,0 +1,84 @@
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#include "utils/event_histogram.hpp"
// NOLINTNEXTLINE(cppcoreguidelines-macro-usage)
#define APPLY_FOR_HISTOGRAMS(M) \
M(QueryExecutionLatency_us, Query, "Query execution latency in microseconds", 50, 90, 99) \
M(SnapshotCreationLatency_us, Snapshot, "Snapshot creation latency in microseconds", 50, 90, 99) \
M(SnapshotRecoveryLatency_us, Snapshot, "Snapshot recovery latency in microseconds", 50, 90, 99)
namespace memgraph::metrics {
// define every Event as an index in the array of counters
// NOLINTNEXTLINE(cppcoreguidelines-macro-usage)
#define M(NAME, TYPE, DOCUMENTATION, ...) extern const Event NAME = __COUNTER__;
APPLY_FOR_HISTOGRAMS(M)
#undef M
inline constexpr Event END = __COUNTER__;
// Initialize array for the global histogram with all named histograms and their percentiles
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
Histogram global_histograms_array[END]{
// NOLINTNEXTLINE(cppcoreguidelines-macro-usage)
#define M(NAME, TYPE, DOCUMENTATION, ...) Histogram({__VA_ARGS__}),
APPLY_FOR_HISTOGRAMS(M)
#undef M
};
// Initialize global histograms
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
EventHistograms global_histograms(global_histograms_array);
const Event EventHistograms::num_histograms = END;
void Measure(const Event event, Value value) { global_histograms.Measure(event, value); }
void EventHistograms::Measure(const Event event, Value value) { histograms_[event].Measure(value); }
const char *GetHistogramName(const Event event) {
static const char *strings[] = {
// NOLINTNEXTLINE(cppcoreguidelines-macro-usage)
#define M(NAME, TYPE, DOCUMENTATION, ...) #NAME,
APPLY_FOR_HISTOGRAMS(M)
#undef M
};
return strings[event];
}
const char *GetHistogramDocumentation(const Event event) {
static const char *strings[] = {
// NOLINTNEXTLINE(cppcoreguidelines-macro-usage)
#define M(NAME, TYPE, DOCUMENTATION, ...) DOCUMENTATION,
APPLY_FOR_HISTOGRAMS(M)
#undef M
};
return strings[event];
}
const char *GetHistogramType(const Event event) {
static const char *strings[] = {
// NOLINTNEXTLINE(cppcoreguidelines-macro-usage)
#define M(NAME, TYPE, DOCUMENTATION, ...) #TYPE,
APPLY_FOR_HISTOGRAMS(M)
#undef M
};
return strings[event];
}
Event HistogramEnd() { return END; }
} // namespace memgraph::metrics

View File

@ -0,0 +1,171 @@
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#pragma once
#include <cmath>
#include "utils/logging.hpp"
namespace memgraph::metrics {
using Event = uint64_t;
using Value = uint64_t;
using Measurement = std::atomic<uint64_t>;
// This is a logarithmically bucketing histogram optimized
// for collecting network response latency distributions.
// It "compresses" values by mapping them to a point on a
// logarithmic curve, which serves as the bucket index. This
// compression technique allows for very accurate histograms
// (unlike what is the case for sampling or lossy probabilistic
// approaches) with the trade-off that we sacrifice around 1%
// precision.
//
// properties:
// * roughly 1% precision loss - can be higher for values
// less than 100, so if measuring latency, generally do
// so in microseconds.
// * ~32kb constant space, single allocation per Histogram.
// * Histogram::Percentile() will return 0 if there were no
// samples measured yet.
class Histogram {
// This is the number of buckets that observed values
// will be logarithmically compressed into.
constexpr static auto kSampleLimit = 4096;
// This is roughly 1/error rate, where 100.0 is roughly
// a 1% error bound for measurements. This is less true
// for tiny measurements, but because we tend to measure
// microseconds, it is usually over 100, which is where
// the error bound starts to stabilize a bit. This has
// been tuned to allow the maximum uint64_t to compress
// within 4096 samples while still achieving a high accuracy.
constexpr static auto kPrecision = 92.0;
// samples_ stores per-bucket counts for measurements
// that have been mapped to a specific uint64_t in
// the "compression" logic below.
std::vector<uint64_t> samples_ = {};
std::vector<uint8_t> percentiles_;
// count_ is the number of measurements that have been
// included in this Histogram.
Measurement count_ = 0;
// sum_ is the summed value of all measurements that
// have been included in this Histogram.
Measurement sum_ = 0;
std::mutex samples_mutex_;
public:
Histogram() {
samples_.resize(kSampleLimit, 0);
percentiles_ = {0, 25, 50, 75, 90, 100};
}
explicit Histogram(std::vector<uint8_t> percentiles) : percentiles_(percentiles) { samples_.resize(kSampleLimit, 0); }
uint64_t Count() const { return count_.load(std::memory_order_relaxed); }
uint64_t Sum() const { return sum_.load(std::memory_order_relaxed); }
std::vector<uint8_t> Percentiles() const { return percentiles_; }
void Measure(uint64_t value) {
// "compression" logic
double boosted = 1.0 + static_cast<double>(value);
double ln = std::log(boosted);
double compressed = (kPrecision * ln) + 0.5;
MG_ASSERT(compressed < kSampleLimit, "compressing value {} to {} is invalid", value, compressed);
auto sample_index = static_cast<uint16_t>(compressed);
count_.fetch_add(1, std::memory_order_relaxed);
sum_.fetch_add(value, std::memory_order_relaxed);
{
std::lock_guard<std::mutex> lock(samples_mutex_);
samples_[sample_index]++;
}
}
std::vector<std::pair<uint64_t, uint64_t>> YieldPercentiles() const {
std::vector<std::pair<uint64_t, uint64_t>> percentile_yield;
percentile_yield.reserve(percentiles_.size());
for (const auto percentile : percentiles_) {
percentile_yield.emplace_back(std::make_pair(percentile, Percentile(percentile)));
}
return percentile_yield;
}
uint64_t Percentile(double percentile) const {
MG_ASSERT(percentile <= 100.0, "percentiles must not exceed 100.0");
MG_ASSERT(percentile >= 0.0, "percentiles must be greater than or equal to 0.0");
auto count = Count();
if (count == 0) {
return 0;
}
const auto floated_count = static_cast<double>(count);
const auto target = std::max(floated_count * percentile / 100.0, 1.0);
auto scanned = 0.0;
for (int i = 0; i < kSampleLimit; i++) {
const auto samples_at_index = samples_[i];
scanned += static_cast<double>(samples_at_index);
if (scanned >= target) {
// "decompression" logic
auto floated = static_cast<double>(i);
auto unboosted = floated / kPrecision;
auto decompressed = std::exp(unboosted) - 1.0;
return static_cast<uint64_t>(decompressed);
}
}
LOG_FATAL("bug in Histogram::Percentile where it failed to return the {} percentile", percentile);
return 0;
}
};
class EventHistograms {
public:
explicit EventHistograms(Histogram *allocated_histograms) noexcept : histograms_(allocated_histograms) {}
auto &operator[](const Event event) { return histograms_[event]; }
const auto &operator[](const Event event) const { return histograms_[event]; }
void Measure(Event event, Value value);
static const Event num_histograms;
private:
Histogram *histograms_;
};
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
extern EventHistograms global_histograms;
void Measure(Event event, Value value);
const char *GetHistogramName(Event event);
const char *GetHistogramDocumentation(Event event);
const char *GetHistogramType(Event event);
Event HistogramEnd();
} // namespace memgraph::metrics

View File

@ -92,6 +92,12 @@ startup_config_dict = {
"1024",
"Memory warning threshold, in MB. If Memgraph detects there is less available RAM it will log a warning. Set to 0 to disable.",
),
"metrics_address": (
"0.0.0.0",
"0.0.0.0",
"IP address on which the Memgraph server for exposing metrics should listen.",
),
"metrics_port": ("9091", "9091", "Port on which the Memgraph server for exposing metrics should listen."),
"monitoring_address": (
"0.0.0.0",
"0.0.0.0",

View File

@ -224,6 +224,9 @@ target_link_libraries(${test_prefix}utils_algorithm mg-utils)
add_unit_test(utils_exceptions.cpp)
target_link_libraries(${test_prefix}utils_exceptions mg-utils)
add_unit_test(utils_histogram.cpp)
target_link_libraries(${test_prefix}utils_histogram mg-utils)
add_unit_test(utils_file.cpp)
target_link_libraries(${test_prefix}utils_file mg-utils)

View File

@ -0,0 +1,47 @@
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include "utils/event_histogram.hpp"
#include "utils/logging.hpp"
TEST(Histogram, BasicFunctionality) {
memgraph::metrics::Histogram histo{};
for (int i = 0; i < 9000; i++) {
histo.Measure(10);
}
for (int i = 0; i < 900; i++) {
histo.Measure(25);
}
for (int i = 0; i < 90; i++) {
histo.Measure(33);
}
for (int i = 0; i < 9; i++) {
histo.Measure(47);
}
histo.Measure(500);
ASSERT_EQ(histo.Percentile(0.0), 10);
ASSERT_EQ(histo.Percentile(99.0), 25);
ASSERT_EQ(histo.Percentile(99.89), 32);
ASSERT_EQ(histo.Percentile(99.99), 46);
ASSERT_EQ(histo.Percentile(100.0), 500);
uint64_t max = std::numeric_limits<uint64_t>::max();
histo.Measure(max);
auto observed_max = static_cast<double>(histo.Percentile(100.0));
auto diff = (max - observed_max) / max;
ASSERT_NEAR(diff, 0, 0.01);
}