diff --git a/.gitignore b/.gitignore index 435b236ee..48912923c 100644 --- a/.gitignore +++ b/.gitignore @@ -94,6 +94,8 @@ src/raft/raft_rpc_messages.capnp src/raft/raft_rpc_messages.hpp src/raft/snapshot_metadata.capnp src/raft/snapshot_metadata.hpp +src/raft/storage_info_rpc_messages.hpp +src/raft/storage_info_rpc_messages.capnp src/stats/stats_rpc_messages.capnp src/stats/stats_rpc_messages.hpp src/storage/distributed/rpc/concurrent_id_mapper_rpc_messages.capnp diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 5bd805a86..e426cadf0 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -273,6 +273,7 @@ set(mg_single_node_ha_sources glue/communication.cpp raft/coordination.cpp raft/raft_server.cpp + raft/storage_info.cpp query/common.cpp query/frontend/ast/cypher_main_visitor.cpp query/frontend/ast/pretty_print.cpp @@ -319,6 +320,8 @@ add_lcp_single_node_ha(raft/log_entry.lcp CAPNP_SCHEMA @0x96c07fe13850c22a) add_capnp_single_node_ha(raft/log_entry.capnp) add_lcp_single_node_ha(raft/snapshot_metadata.lcp CAPNP_SCHEMA @0xaa08e34991680f6c) add_capnp_single_node_ha(raft/snapshot_metadata.capnp) +add_lcp_single_node_ha(raft/storage_info_rpc_messages.lcp CAPNP_SCHEMA @0xceee3960cd8eaa7e) +add_capnp_single_node_ha(raft/storage_info_rpc_messages.capnp) add_custom_target(generate_lcp_single_node_ha DEPENDS ${generated_lcp_single_node_ha_files}) diff --git a/src/database/single_node/graph_db_accessor.cpp b/src/database/single_node/graph_db_accessor.cpp index 6269c01c2..722ab6a85 100644 --- a/src/database/single_node/graph_db_accessor.cpp +++ b/src/database/single_node/graph_db_accessor.cpp @@ -489,17 +489,18 @@ std::vector GraphDbAccessor::IndexInfo() const { return info; } -std::map GraphDbAccessor::StorageInfo() const { - std::map info; +std::vector> GraphDbAccessor::StorageInfo() + const { + std::vector> info; db_.RefreshStat(); auto &stat = db_.GetStat(); - info.emplace("vertex_count", std::to_string(stat.vertex_count)); - info.emplace("edge_count", std::to_string(stat.edge_count)); - info.emplace("average_degree", std::to_string(stat.avg_degree)); - info.emplace("memory_usage", std::to_string(utils::GetMemoryUsage())); - info.emplace("disk_usage", std::to_string(db_.GetDurabilityDirDiskUsage())); + info.emplace_back("vertex_count", std::to_string(stat.vertex_count)); + info.emplace_back("edge_count", std::to_string(stat.edge_count)); + info.emplace_back("average_degree", std::to_string(stat.avg_degree)); + info.emplace_back("memory_usage", std::to_string(utils::GetMemoryUsage())); + info.emplace_back("disk_usage", std::to_string(db_.GetDurabilityDirDiskUsage())); return info; } diff --git a/src/database/single_node/graph_db_accessor.hpp b/src/database/single_node/graph_db_accessor.hpp index 0d4b89029..e2f35e36f 100644 --- a/src/database/single_node/graph_db_accessor.hpp +++ b/src/database/single_node/graph_db_accessor.hpp @@ -630,16 +630,16 @@ class GraphDbAccessor { std::vector IndexInfo() const; /** - * Returns a map containing storage information. + * Returns a vector containing storage information. * - * Inside the map, the following keys will exist: + * Inside the vector, the following storage stats will exist: * - vertex_count * - edge_count * - average_degree * - memory_usage * - disk_usage **/ - std::map StorageInfo() const; + std::vector> StorageInfo() const; /** * Insert this vertex into corresponding label and label+property (if it diff --git a/src/database/single_node_ha/graph_db.cpp b/src/database/single_node_ha/graph_db.cpp index 52bc16a76..ba44b4429 100644 --- a/src/database/single_node_ha/graph_db.cpp +++ b/src/database/single_node_ha/graph_db.cpp @@ -17,6 +17,7 @@ GraphDb::GraphDb(Config config) : config_(config) {} void GraphDb::Start() { utils::EnsureDirOrDie(config_.durability_directory); raft_server_.Start(); + storage_info_.Start(); CHECK(coordination_.Start()) << "Couldn't start coordination!"; // Start transaction killer. @@ -48,9 +49,7 @@ void GraphDb::AwaitShutdown(std::function call_before_shutdown) { }); } -void GraphDb::Shutdown() { - coordination_.Shutdown(); -} +void GraphDb::Shutdown() { coordination_.Shutdown(); } std::unique_ptr GraphDb::Access() { // NOTE: We are doing a heap allocation to allow polymorphism. If this poses @@ -74,6 +73,8 @@ Storage &GraphDb::storage() { return *storage_; } raft::RaftInterface *GraphDb::raft() { return &raft_server_; } +raft::StorageInfo *GraphDb::storage_info() { return &storage_info_; } + tx::Engine &GraphDb::tx_engine() { return tx_engine_; } storage::ConcurrentIdMapper &GraphDb::label_mapper() { diff --git a/src/database/single_node_ha/graph_db.hpp b/src/database/single_node_ha/graph_db.hpp index aad71257e..680cc7751 100644 --- a/src/database/single_node_ha/graph_db.hpp +++ b/src/database/single_node_ha/graph_db.hpp @@ -11,6 +11,7 @@ #include "io/network/endpoint.hpp" #include "raft/coordination.hpp" #include "raft/raft_server.hpp" +#include "raft/storage_info.hpp" #include "storage/common/types/types.hpp" #include "storage/single_node_ha/concurrent_id_mapper.hpp" #include "storage/single_node_ha/storage.hpp" @@ -108,6 +109,7 @@ class GraphDb { Storage &storage(); raft::RaftInterface *raft(); + raft::StorageInfo *storage_info(); tx::Engine &tx_engine(); storage::ConcurrentIdMapper &label_mapper(); storage::ConcurrentIdMapper &edge_type_mapper(); @@ -166,6 +168,7 @@ class GraphDb { &coordination_, &delta_applier_, this}; + raft::StorageInfo storage_info_{this, &coordination_, config_.server_id}; tx::Engine tx_engine_{&raft_server_}; std::unique_ptr storage_gc_ = std::make_unique( diff --git a/src/database/single_node_ha/graph_db_accessor.cpp b/src/database/single_node_ha/graph_db_accessor.cpp index d77cb7871..070ed8b2b 100644 --- a/src/database/single_node_ha/graph_db_accessor.cpp +++ b/src/database/single_node_ha/graph_db_accessor.cpp @@ -420,19 +420,9 @@ std::vector GraphDbAccessor::IndexInfo() const { return info; } -std::map GraphDbAccessor::StorageInfo() const { - std::map info; - - db_.RefreshStat(); - auto &stat = db_.GetStat(); - - info.emplace("vertex_count", std::to_string(stat.vertex_count)); - info.emplace("edge_count", std::to_string(stat.edge_count)); - info.emplace("average_degree", std::to_string(stat.avg_degree)); - info.emplace("memory_usage", std::to_string(utils::GetMemoryUsage())); - info.emplace("disk_usage", std::to_string(db_.GetDurabilityDirDiskUsage())); - - return info; +std::map>> +GraphDbAccessor::StorageInfo() const { + return db_.storage_info()->GetStorageInfo(); } } // namespace database diff --git a/src/database/single_node_ha/graph_db_accessor.hpp b/src/database/single_node_ha/graph_db_accessor.hpp index 4c8aba02d..e1c82e5a0 100644 --- a/src/database/single_node_ha/graph_db_accessor.hpp +++ b/src/database/single_node_ha/graph_db_accessor.hpp @@ -597,16 +597,17 @@ class GraphDbAccessor { std::vector IndexInfo() const; /** - * Returns a map containing storage information. + * Returns a map containing storage information for each Raft cluster member. * - * Inside the map, the following keys will exist: + * Inside the vector, the following storage stats will exist: * - vertex_count * - edge_count * - average_degree * - memory_usage * - disk_usage **/ - std::map StorageInfo() const; + std::map>> + StorageInfo() const; /** * Insert this vertex into corresponding label and label+property (if it diff --git a/src/query/interpreter.cpp b/src/query/interpreter.cpp index 4bc1e9bcb..917f0d4c9 100644 --- a/src/query/interpreter.cpp +++ b/src/query/interpreter.cpp @@ -587,14 +587,27 @@ Callback HandleInfoQuery(InfoQuery *info_query, database::GraphDbAccessor *db_ac Callback callback; switch (info_query->info_type_) { case InfoQuery::InfoType::STORAGE: -#if defined(MG_SINGLE_NODE) || defined(MG_SINGLE_NODE_HA) +#if defined(MG_SINGLE_NODE) callback.header = {"storage info", "value"}; callback.fn = [db_accessor] { auto info = db_accessor->StorageInfo(); std::vector> results; results.reserve(info.size()); - for (const auto &kv : info) { - results.push_back({kv.first, kv.second}); + for (const auto &pair : info) { + results.push_back({pair.first, pair.second}); + } + return results; + }; +#elif defined(MG_SINGLE_NODE_HA) + callback.header = {"server id", "storage info", "value"}; + callback.fn = [db_accessor] { + auto info = db_accessor->StorageInfo(); + std::vector> results; + results.reserve(info.size()); + for (const auto &peer_info : info) { + for (const auto &pair : peer_info.second) { + results.push_back({peer_info.first, pair.first, pair.second}); + } } return results; }; diff --git a/src/raft/storage_info.cpp b/src/raft/storage_info.cpp new file mode 100644 index 000000000..63469c757 --- /dev/null +++ b/src/raft/storage_info.cpp @@ -0,0 +1,103 @@ +#include "raft/storage_info.hpp" + +#include + +#include "database/single_node_ha/graph_db.hpp" +#include "raft/coordination.hpp" +#include "raft/storage_info_rpc_messages.hpp" +#include "utils/future.hpp" +#include "utils/stat.hpp" + +namespace raft { + +using namespace std::literals::chrono_literals; +using Clock = std::chrono::system_clock; +using TimePoint = std::chrono::system_clock::time_point; + +const std::chrono::duration kRpcTimeout = 1s; + +StorageInfo::StorageInfo(database::GraphDb *db, Coordination *coordination, + uint16_t server_id) + : db_(db), coordination_(coordination), server_id_(server_id) { + CHECK(db) << "Graph DB can't be nullptr"; + CHECK(coordination) << "Coordination can't be nullptr"; +} + +StorageInfo::~StorageInfo() {} + +void StorageInfo::Start() { + coordination_->Register( + [this](const auto &req_reader, auto *res_builder) { + StorageInfoReq req; + Load(&req, req_reader); + + StorageInfoRes res(this->server_id_, this->GetLocalStorageInfo()); + Save(res, res_builder); + }); +} + +std::vector> +StorageInfo::GetLocalStorageInfo() const { + std::vector> info; + + db_->RefreshStat(); + auto &stat = db_->GetStat(); + + info.emplace_back("vertex_count", std::to_string(stat.vertex_count)); + info.emplace_back("edge_count", std::to_string(stat.edge_count)); + info.emplace_back("average_degree", std::to_string(stat.avg_degree)); + info.emplace_back("memory_usage", std::to_string(utils::GetMemoryUsage())); + info.emplace_back("disk_usage", + std::to_string(db_->GetDurabilityDirDiskUsage())); + + return info; +} + +std::map>> +StorageInfo::GetStorageInfo() const { + std::map>> info; + std::map> remote_storage_info_futures; + std::map received_reply; + + auto peers = coordination_->GetWorkerIds(); + + for (auto id : peers) { + received_reply[id] = false; + if (id == server_id_) { + info.emplace(std::to_string(id), GetLocalStorageInfo()); + received_reply[id] = true; + } else { + remote_storage_info_futures.emplace( + id, coordination_->ExecuteOnWorker( + id, [&](int worker_id, auto &client) { + try { + auto res = client.template Call(); + return res; + } catch (...) { + return StorageInfoRes(id, {}); + } + })); + } + } + + int16_t waiting_for = peers.size() - 1; + + TimePoint start = Clock::now(); + while (Clock::now() - start <= kRpcTimeout && waiting_for > 0) { + for (auto id : peers) { + if (received_reply[id]) continue; + auto &future = remote_storage_info_futures[id]; + if (!future.IsReady()) continue; + + auto reply = future.get(); + info.emplace(std::to_string(reply.server_id), + std::move(reply.storage_info)); + received_reply[id] = true; + waiting_for--; + } + } + + return info; +} + +} // namespace raft diff --git a/src/raft/storage_info.hpp b/src/raft/storage_info.hpp new file mode 100644 index 000000000..57dc72730 --- /dev/null +++ b/src/raft/storage_info.hpp @@ -0,0 +1,47 @@ +/// @file + +#pragma once + +#include +#include + +// Forward declaration +namespace database { +class GraphDb; +} // namespace database + +namespace raft { + +// Forward declaration +class Coordination; + +/// StorageInfo takes care of the Raft cluster storage info retrieval. +class StorageInfo final { + public: + StorageInfo() = delete; + StorageInfo(database::GraphDb *db, Coordination *coordination, + uint16_t server_id); + + StorageInfo(const StorageInfo &) = delete; + StorageInfo(StorageInfo &&) = delete; + StorageInfo operator=(const StorageInfo &) = delete; + StorageInfo operator=(StorageInfo &&) = delete; + + ~StorageInfo(); + + void Start(); + + /// Returns storage info for the local storage only. + std::vector> GetLocalStorageInfo() const; + + /// Returns storage info for each peer in the Raft cluster. + std::map>> + GetStorageInfo() const; + + private: + database::GraphDb *db_{nullptr}; + Coordination *coordination_{nullptr}; + uint16_t server_id_; +}; + +} // namespace raft diff --git a/src/raft/storage_info_rpc_messages.lcp b/src/raft/storage_info_rpc_messages.lcp new file mode 100644 index 000000000..14579b832 --- /dev/null +++ b/src/raft/storage_info_rpc_messages.lcp @@ -0,0 +1,42 @@ +#>cpp +#pragma once + +#include +#include + +#include "communication/rpc/messages.hpp" +#include "raft/storage_info_rpc_messages.capnp.h" +#include "rpc/serialization.hpp" +cpp<# + +(lcp:namespace raft) + +(lcp:capnp-namespace "raft") + +(lcp:capnp-import 'utils "/rpc/serialization.capnp") + +(lcp:define-rpc storage-info + (:request ()) + (:response + ((server-id :uint16_t) + (storage-info "std::vector>" + :capnp-type "List(Utils.Pair(Text, Text))" + :capnp-save + (lcp:capnp-save-vector + "utils::capnp::Pair<::capnp::Text, ::capnp::Text>" + "std::pair" + "[](auto *builder, const auto &pair) { + builder->setFirst(pair.first); + builder->setSecond(pair.second); + }") + :capnp-load + (lcp:capnp-load-vector + "utils::capnp::Pair<::capnp::Text, ::capnp::Text>" + "std::pair" + "[](const auto &reader) { + std::string first = reader.getFirst(); + std::string second = reader.getSecond(); + return std::make_pair(first, second); + }"))))) + +(lcp:pop-namespace) ;; raft