Add proper storage stats for HA

Summary:
`SHOW STORAGE STATS` when executed in a Raft cluster should return
stats for each member of the cluster.

`StorageStats` starts a RPC server on each member of the cluster that answers
about its local storage stats.

The query can be invoked only on the current leader, the leader sends a request
to each peer and shows the results it gets. If some peers don't answer within 1
second, stats for those peers won't be shown.

The new output can be seen here: P27

Reviewers: ipaljak, mferencevic

Reviewed By: ipaljak

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1907
This commit is contained in:
Matija Santl 2019-03-05 15:02:16 +01:00
parent 03feffc8cf
commit 1de34d8b92
12 changed files with 238 additions and 32 deletions

2
.gitignore vendored
View File

@ -94,6 +94,8 @@ src/raft/raft_rpc_messages.capnp
src/raft/raft_rpc_messages.hpp
src/raft/snapshot_metadata.capnp
src/raft/snapshot_metadata.hpp
src/raft/storage_info_rpc_messages.hpp
src/raft/storage_info_rpc_messages.capnp
src/stats/stats_rpc_messages.capnp
src/stats/stats_rpc_messages.hpp
src/storage/distributed/rpc/concurrent_id_mapper_rpc_messages.capnp

View File

@ -273,6 +273,7 @@ set(mg_single_node_ha_sources
glue/communication.cpp
raft/coordination.cpp
raft/raft_server.cpp
raft/storage_info.cpp
query/common.cpp
query/frontend/ast/cypher_main_visitor.cpp
query/frontend/ast/pretty_print.cpp
@ -319,6 +320,8 @@ add_lcp_single_node_ha(raft/log_entry.lcp CAPNP_SCHEMA @0x96c07fe13850c22a)
add_capnp_single_node_ha(raft/log_entry.capnp)
add_lcp_single_node_ha(raft/snapshot_metadata.lcp CAPNP_SCHEMA @0xaa08e34991680f6c)
add_capnp_single_node_ha(raft/snapshot_metadata.capnp)
add_lcp_single_node_ha(raft/storage_info_rpc_messages.lcp CAPNP_SCHEMA @0xceee3960cd8eaa7e)
add_capnp_single_node_ha(raft/storage_info_rpc_messages.capnp)
add_custom_target(generate_lcp_single_node_ha DEPENDS ${generated_lcp_single_node_ha_files})

View File

@ -489,17 +489,18 @@ std::vector<std::string> GraphDbAccessor::IndexInfo() const {
return info;
}
std::map<std::string, std::string> GraphDbAccessor::StorageInfo() const {
std::map<std::string, std::string> info;
std::vector<std::pair<std::string, std::string>> GraphDbAccessor::StorageInfo()
const {
std::vector<std::pair<std::string, std::string>> info;
db_.RefreshStat();
auto &stat = db_.GetStat();
info.emplace("vertex_count", std::to_string(stat.vertex_count));
info.emplace("edge_count", std::to_string(stat.edge_count));
info.emplace("average_degree", std::to_string(stat.avg_degree));
info.emplace("memory_usage", std::to_string(utils::GetMemoryUsage()));
info.emplace("disk_usage", std::to_string(db_.GetDurabilityDirDiskUsage()));
info.emplace_back("vertex_count", std::to_string(stat.vertex_count));
info.emplace_back("edge_count", std::to_string(stat.edge_count));
info.emplace_back("average_degree", std::to_string(stat.avg_degree));
info.emplace_back("memory_usage", std::to_string(utils::GetMemoryUsage()));
info.emplace_back("disk_usage", std::to_string(db_.GetDurabilityDirDiskUsage()));
return info;
}

View File

@ -630,16 +630,16 @@ class GraphDbAccessor {
std::vector<std::string> IndexInfo() const;
/**
* Returns a map containing storage information.
* Returns a vector containing storage information.
*
* Inside the map, the following keys will exist:
* Inside the vector, the following storage stats will exist:
* - vertex_count
* - edge_count
* - average_degree
* - memory_usage
* - disk_usage
**/
std::map<std::string, std::string> StorageInfo() const;
std::vector<std::pair<std::string, std::string>> StorageInfo() const;
/**
* Insert this vertex into corresponding label and label+property (if it

View File

@ -17,6 +17,7 @@ GraphDb::GraphDb(Config config) : config_(config) {}
void GraphDb::Start() {
utils::EnsureDirOrDie(config_.durability_directory);
raft_server_.Start();
storage_info_.Start();
CHECK(coordination_.Start()) << "Couldn't start coordination!";
// Start transaction killer.
@ -48,9 +49,7 @@ void GraphDb::AwaitShutdown(std::function<void(void)> call_before_shutdown) {
});
}
void GraphDb::Shutdown() {
coordination_.Shutdown();
}
void GraphDb::Shutdown() { coordination_.Shutdown(); }
std::unique_ptr<GraphDbAccessor> GraphDb::Access() {
// NOTE: We are doing a heap allocation to allow polymorphism. If this poses
@ -74,6 +73,8 @@ Storage &GraphDb::storage() { return *storage_; }
raft::RaftInterface *GraphDb::raft() { return &raft_server_; }
raft::StorageInfo *GraphDb::storage_info() { return &storage_info_; }
tx::Engine &GraphDb::tx_engine() { return tx_engine_; }
storage::ConcurrentIdMapper<storage::Label> &GraphDb::label_mapper() {

View File

@ -11,6 +11,7 @@
#include "io/network/endpoint.hpp"
#include "raft/coordination.hpp"
#include "raft/raft_server.hpp"
#include "raft/storage_info.hpp"
#include "storage/common/types/types.hpp"
#include "storage/single_node_ha/concurrent_id_mapper.hpp"
#include "storage/single_node_ha/storage.hpp"
@ -108,6 +109,7 @@ class GraphDb {
Storage &storage();
raft::RaftInterface *raft();
raft::StorageInfo *storage_info();
tx::Engine &tx_engine();
storage::ConcurrentIdMapper<storage::Label> &label_mapper();
storage::ConcurrentIdMapper<storage::EdgeType> &edge_type_mapper();
@ -166,6 +168,7 @@ class GraphDb {
&coordination_,
&delta_applier_,
this};
raft::StorageInfo storage_info_{this, &coordination_, config_.server_id};
tx::Engine tx_engine_{&raft_server_};
std::unique_ptr<StorageGc> storage_gc_ = std::make_unique<StorageGc>(

View File

@ -420,19 +420,9 @@ std::vector<std::string> GraphDbAccessor::IndexInfo() const {
return info;
}
std::map<std::string, std::string> GraphDbAccessor::StorageInfo() const {
std::map<std::string, std::string> info;
db_.RefreshStat();
auto &stat = db_.GetStat();
info.emplace("vertex_count", std::to_string(stat.vertex_count));
info.emplace("edge_count", std::to_string(stat.edge_count));
info.emplace("average_degree", std::to_string(stat.avg_degree));
info.emplace("memory_usage", std::to_string(utils::GetMemoryUsage()));
info.emplace("disk_usage", std::to_string(db_.GetDurabilityDirDiskUsage()));
return info;
std::map<std::string, std::vector<std::pair<std::string, std::string>>>
GraphDbAccessor::StorageInfo() const {
return db_.storage_info()->GetStorageInfo();
}
} // namespace database

View File

@ -597,16 +597,17 @@ class GraphDbAccessor {
std::vector<std::string> IndexInfo() const;
/**
* Returns a map containing storage information.
* Returns a map containing storage information for each Raft cluster member.
*
* Inside the map, the following keys will exist:
* Inside the vector, the following storage stats will exist:
* - vertex_count
* - edge_count
* - average_degree
* - memory_usage
* - disk_usage
**/
std::map<std::string, std::string> StorageInfo() const;
std::map<std::string, std::vector<std::pair<std::string, std::string>>>
StorageInfo() const;
/**
* Insert this vertex into corresponding label and label+property (if it

View File

@ -587,14 +587,27 @@ Callback HandleInfoQuery(InfoQuery *info_query, database::GraphDbAccessor *db_ac
Callback callback;
switch (info_query->info_type_) {
case InfoQuery::InfoType::STORAGE:
#if defined(MG_SINGLE_NODE) || defined(MG_SINGLE_NODE_HA)
#if defined(MG_SINGLE_NODE)
callback.header = {"storage info", "value"};
callback.fn = [db_accessor] {
auto info = db_accessor->StorageInfo();
std::vector<std::vector<TypedValue>> results;
results.reserve(info.size());
for (const auto &kv : info) {
results.push_back({kv.first, kv.second});
for (const auto &pair : info) {
results.push_back({pair.first, pair.second});
}
return results;
};
#elif defined(MG_SINGLE_NODE_HA)
callback.header = {"server id", "storage info", "value"};
callback.fn = [db_accessor] {
auto info = db_accessor->StorageInfo();
std::vector<std::vector<TypedValue>> results;
results.reserve(info.size());
for (const auto &peer_info : info) {
for (const auto &pair : peer_info.second) {
results.push_back({peer_info.first, pair.first, pair.second});
}
}
return results;
};

103
src/raft/storage_info.cpp Normal file
View File

@ -0,0 +1,103 @@
#include "raft/storage_info.hpp"
#include <chrono>
#include "database/single_node_ha/graph_db.hpp"
#include "raft/coordination.hpp"
#include "raft/storage_info_rpc_messages.hpp"
#include "utils/future.hpp"
#include "utils/stat.hpp"
namespace raft {
using namespace std::literals::chrono_literals;
using Clock = std::chrono::system_clock;
using TimePoint = std::chrono::system_clock::time_point;
const std::chrono::duration<int64_t> kRpcTimeout = 1s;
StorageInfo::StorageInfo(database::GraphDb *db, Coordination *coordination,
uint16_t server_id)
: db_(db), coordination_(coordination), server_id_(server_id) {
CHECK(db) << "Graph DB can't be nullptr";
CHECK(coordination) << "Coordination can't be nullptr";
}
StorageInfo::~StorageInfo() {}
void StorageInfo::Start() {
coordination_->Register<StorageInfoRpc>(
[this](const auto &req_reader, auto *res_builder) {
StorageInfoReq req;
Load(&req, req_reader);
StorageInfoRes res(this->server_id_, this->GetLocalStorageInfo());
Save(res, res_builder);
});
}
std::vector<std::pair<std::string, std::string>>
StorageInfo::GetLocalStorageInfo() const {
std::vector<std::pair<std::string, std::string>> info;
db_->RefreshStat();
auto &stat = db_->GetStat();
info.emplace_back("vertex_count", std::to_string(stat.vertex_count));
info.emplace_back("edge_count", std::to_string(stat.edge_count));
info.emplace_back("average_degree", std::to_string(stat.avg_degree));
info.emplace_back("memory_usage", std::to_string(utils::GetMemoryUsage()));
info.emplace_back("disk_usage",
std::to_string(db_->GetDurabilityDirDiskUsage()));
return info;
}
std::map<std::string, std::vector<std::pair<std::string, std::string>>>
StorageInfo::GetStorageInfo() const {
std::map<std::string, std::vector<std::pair<std::string, std::string>>> info;
std::map<uint16_t, utils::Future<StorageInfoRes>> remote_storage_info_futures;
std::map<uint16_t, bool> received_reply;
auto peers = coordination_->GetWorkerIds();
for (auto id : peers) {
received_reply[id] = false;
if (id == server_id_) {
info.emplace(std::to_string(id), GetLocalStorageInfo());
received_reply[id] = true;
} else {
remote_storage_info_futures.emplace(
id, coordination_->ExecuteOnWorker<StorageInfoRes>(
id, [&](int worker_id, auto &client) {
try {
auto res = client.template Call<StorageInfoRpc>();
return res;
} catch (...) {
return StorageInfoRes(id, {});
}
}));
}
}
int16_t waiting_for = peers.size() - 1;
TimePoint start = Clock::now();
while (Clock::now() - start <= kRpcTimeout && waiting_for > 0) {
for (auto id : peers) {
if (received_reply[id]) continue;
auto &future = remote_storage_info_futures[id];
if (!future.IsReady()) continue;
auto reply = future.get();
info.emplace(std::to_string(reply.server_id),
std::move(reply.storage_info));
received_reply[id] = true;
waiting_for--;
}
}
return info;
}
} // namespace raft

47
src/raft/storage_info.hpp Normal file
View File

@ -0,0 +1,47 @@
/// @file
#pragma once
#include <map>
#include <vector>
// Forward declaration
namespace database {
class GraphDb;
} // namespace database
namespace raft {
// Forward declaration
class Coordination;
/// StorageInfo takes care of the Raft cluster storage info retrieval.
class StorageInfo final {
public:
StorageInfo() = delete;
StorageInfo(database::GraphDb *db, Coordination *coordination,
uint16_t server_id);
StorageInfo(const StorageInfo &) = delete;
StorageInfo(StorageInfo &&) = delete;
StorageInfo operator=(const StorageInfo &) = delete;
StorageInfo operator=(StorageInfo &&) = delete;
~StorageInfo();
void Start();
/// Returns storage info for the local storage only.
std::vector<std::pair<std::string, std::string>> GetLocalStorageInfo() const;
/// Returns storage info for each peer in the Raft cluster.
std::map<std::string, std::vector<std::pair<std::string, std::string>>>
GetStorageInfo() const;
private:
database::GraphDb *db_{nullptr};
Coordination *coordination_{nullptr};
uint16_t server_id_;
};
} // namespace raft

View File

@ -0,0 +1,42 @@
#>cpp
#pragma once
#include <vector>
#include <string>
#include "communication/rpc/messages.hpp"
#include "raft/storage_info_rpc_messages.capnp.h"
#include "rpc/serialization.hpp"
cpp<#
(lcp:namespace raft)
(lcp:capnp-namespace "raft")
(lcp:capnp-import 'utils "/rpc/serialization.capnp")
(lcp:define-rpc storage-info
(:request ())
(:response
((server-id :uint16_t)
(storage-info "std::vector<std::pair<std::string, std::string>>"
:capnp-type "List(Utils.Pair(Text, Text))"
:capnp-save
(lcp:capnp-save-vector
"utils::capnp::Pair<::capnp::Text, ::capnp::Text>"
"std::pair<std::string, std::string>"
"[](auto *builder, const auto &pair) {
builder->setFirst(pair.first);
builder->setSecond(pair.second);
}")
:capnp-load
(lcp:capnp-load-vector
"utils::capnp::Pair<::capnp::Text, ::capnp::Text>"
"std::pair<std::string, std::string>"
"[](const auto &reader) {
std::string first = reader.getFirst();
std::string second = reader.getSecond();
return std::make_pair(first, second);
}")))))
(lcp:pop-namespace) ;; raft