Implement storage info
Reviewers: teon.banek Reviewed By: teon.banek Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D2593
This commit is contained in:
parent
12e4fff86b
commit
5fa6974919
@ -972,7 +972,12 @@ PreparedQuery PrepareAuthQuery(
|
||||
|
||||
PreparedQuery PrepareInfoQuery(
|
||||
ParsedQuery parsed_query, std::map<std::string, TypedValue> *summary,
|
||||
InterpreterContext *interpreter_context, DbAccessor *dba,
|
||||
InterpreterContext *interpreter_context,
|
||||
#ifdef MG_SINGLE_NODE_V2
|
||||
storage::Storage *db,
|
||||
#else
|
||||
DbAccessor *dba,
|
||||
#endif
|
||||
utils::MonotonicBufferResource *execution_memory) {
|
||||
auto *info_query = utils::Downcast<InfoQuery>(parsed_query.query);
|
||||
std::vector<std::string> header;
|
||||
@ -982,7 +987,23 @@ PreparedQuery PrepareInfoQuery(
|
||||
|
||||
switch (info_query->info_type_) {
|
||||
case InfoQuery::InfoType::STORAGE:
|
||||
#if defined(MG_SINGLE_NODE)
|
||||
#if defined(MG_SINGLE_NODE_V2)
|
||||
header = {"storage info", "value"};
|
||||
handler = [db] {
|
||||
auto info = db->GetInfo();
|
||||
std::vector<std::vector<TypedValue>> results{
|
||||
{TypedValue("vertex_count"),
|
||||
TypedValue(static_cast<int64_t>(info.vertex_count))},
|
||||
{TypedValue("edge_count"),
|
||||
TypedValue(static_cast<int64_t>(info.edge_count))},
|
||||
{TypedValue("average_degree"), TypedValue(info.average_degree)},
|
||||
{TypedValue("memory_usage"),
|
||||
TypedValue(static_cast<int64_t>(info.memory_usage))},
|
||||
{TypedValue("disk_usage"),
|
||||
TypedValue(static_cast<int64_t>(info.disk_usage))}};
|
||||
return std::pair{results, QueryHandlerResult::COMMIT};
|
||||
};
|
||||
#elif defined(MG_SINGLE_NODE)
|
||||
header = {"storage info", "value"};
|
||||
handler = [dba] {
|
||||
auto info = dba->StorageInfo();
|
||||
@ -1342,13 +1363,15 @@ Interpreter::Prepare(const std::string &query_string,
|
||||
interpreter_context_, &*execution_db_accessor_, &execution_memory_);
|
||||
} else if (utils::Downcast<InfoQuery>(parsed_query.query)) {
|
||||
#ifdef MG_SINGLE_NODE_V2
|
||||
DbAccessor *dba = nullptr;
|
||||
prepared_query = PrepareInfoQuery(
|
||||
std::move(parsed_query), &summary_, interpreter_context_,
|
||||
interpreter_context_->db, &execution_memory_);
|
||||
#else
|
||||
auto dba = &*execution_db_accessor_;
|
||||
#endif
|
||||
prepared_query =
|
||||
PrepareInfoQuery(std::move(parsed_query), &summary_,
|
||||
interpreter_context_, dba, &execution_memory_);
|
||||
#endif
|
||||
} else if (utils::Downcast<ConstraintQuery>(parsed_query.query)) {
|
||||
#ifdef MG_SINGLE_NODE_V2
|
||||
DbAccessor *dba = nullptr;
|
||||
|
@ -1282,12 +1282,14 @@ void WalFile::UpdateStats(uint64_t timestamp) {
|
||||
Durability::Durability(Config::Durability config,
|
||||
utils::SkipList<Vertex> *vertices,
|
||||
utils::SkipList<Edge> *edges,
|
||||
NameIdMapper *name_id_mapper, Indices *indices,
|
||||
NameIdMapper *name_id_mapper,
|
||||
std::atomic<uint64_t> *edge_count, Indices *indices,
|
||||
Constraints *constraints, Config::Items items)
|
||||
: config_(config),
|
||||
vertices_(vertices),
|
||||
edges_(edges),
|
||||
name_id_mapper_(name_id_mapper),
|
||||
edge_count_(edge_count),
|
||||
indices_(indices),
|
||||
constraints_(constraints),
|
||||
items_(items),
|
||||
@ -2121,6 +2123,9 @@ Durability::RecoveredSnapshot Durability::LoadSnapshot(
|
||||
return EdgeTypeId::FromUint(it->second);
|
||||
};
|
||||
|
||||
// Reset current edge count.
|
||||
edge_count_->store(0, std::memory_order_release);
|
||||
|
||||
{
|
||||
// Recover edges.
|
||||
auto edge_acc = edges_->access();
|
||||
@ -2368,6 +2373,9 @@ Durability::RecoveredSnapshot Durability::LoadSnapshot(
|
||||
vertex.out_edges.emplace_back(get_edge_type_from_id(*edge_type),
|
||||
&*to_vertex, edge_ref);
|
||||
}
|
||||
// Increment edge count. We only increment the count here because the
|
||||
// information is duplicated in in_edges.
|
||||
edge_count_->fetch_add(*out_size, std::memory_order_acq_rel);
|
||||
}
|
||||
}
|
||||
|
||||
@ -2594,6 +2602,9 @@ Durability::RecoveryInfo Durability::LoadWal(
|
||||
|
||||
ret.next_edge_id = std::max(ret.next_edge_id, edge_gid.AsUint() + 1);
|
||||
|
||||
// Increment edge count.
|
||||
edge_count_->fetch_add(1, std::memory_order_acq_rel);
|
||||
|
||||
break;
|
||||
}
|
||||
case WalDeltaData::Type::EDGE_DELETE: {
|
||||
@ -2640,6 +2651,9 @@ Durability::RecoveryInfo Durability::LoadWal(
|
||||
throw RecoveryFailure("The edge must be removed here!");
|
||||
}
|
||||
|
||||
// Decrement edge count.
|
||||
edge_count_->fetch_add(-1, std::memory_order_acq_rel);
|
||||
|
||||
break;
|
||||
}
|
||||
case WalDeltaData::Type::EDGE_SET_PROPERTY: {
|
||||
|
@ -366,7 +366,8 @@ class Durability final {
|
||||
|
||||
Durability(Config::Durability config, utils::SkipList<Vertex> *vertices,
|
||||
utils::SkipList<Edge> *edges, NameIdMapper *name_id_mapper,
|
||||
Indices *indices, Constraints *constraints, Config::Items items);
|
||||
std::atomic<uint64_t> *edge_count, Indices *indices,
|
||||
Constraints *constraints, Config::Items items);
|
||||
|
||||
std::optional<RecoveryInfo> Initialize(
|
||||
std::function<void(std::function<void(Transaction *)>)>
|
||||
@ -401,6 +402,7 @@ class Durability final {
|
||||
utils::SkipList<Vertex> *vertices_;
|
||||
utils::SkipList<Edge> *edges_;
|
||||
NameIdMapper *name_id_mapper_;
|
||||
std::atomic<uint64_t> *edge_count_;
|
||||
Indices *indices_;
|
||||
Constraints *constraints_;
|
||||
Config::Items items_;
|
||||
|
@ -7,6 +7,7 @@
|
||||
#include <glog/logging.h>
|
||||
|
||||
#include "storage/v2/mvcc.hpp"
|
||||
#include "utils/stat.hpp"
|
||||
|
||||
namespace storage {
|
||||
|
||||
@ -290,7 +291,7 @@ Storage::Storage(Config config)
|
||||
: indices_(config.items),
|
||||
config_(config),
|
||||
durability_(config.durability, &vertices_, &edges_, &name_id_mapper_,
|
||||
&indices_, &constraints_, config.items) {
|
||||
&edge_count_, &indices_, &constraints_, config.items) {
|
||||
auto info = durability_.Initialize([this](auto callback) {
|
||||
// Take master RW lock (for reading).
|
||||
std::shared_lock<utils::RWLock> storage_guard(main_lock_);
|
||||
@ -509,6 +510,9 @@ Result<EdgeAccessor> Storage::Accessor::CreateEdge(VertexAccessor *from,
|
||||
edge_type, from_vertex, edge);
|
||||
to_vertex->in_edges.emplace_back(edge_type, from_vertex, edge);
|
||||
|
||||
// Increment edge count.
|
||||
storage_->edge_count_.fetch_add(1, std::memory_order_acq_rel);
|
||||
|
||||
return EdgeAccessor(edge, edge_type, from_vertex, to_vertex, &transaction_,
|
||||
&storage_->indices_, config_);
|
||||
}
|
||||
@ -597,6 +601,9 @@ Result<bool> Storage::Accessor::DeleteEdge(EdgeAccessor *edge) {
|
||||
CreateAndLinkDelta(&transaction_, to_vertex, Delta::AddInEdgeTag(), edge_type,
|
||||
from_vertex, edge_ref);
|
||||
|
||||
// Decrement edge count.
|
||||
storage_->edge_count_.fetch_add(-1, std::memory_order_acq_rel);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
@ -767,6 +774,11 @@ void Storage::Accessor::Abort() {
|
||||
vertex->out_edges.end(), link);
|
||||
CHECK(it == vertex->out_edges.end()) << "Invalid database state!";
|
||||
vertex->out_edges.push_back(link);
|
||||
// Increment edge count. We only increment the count here because
|
||||
// the information in `ADD_IN_EDGE` and `Edge/RECREATE_OBJECT` is
|
||||
// redundant. Also, `Edge/RECREATE_OBJECT` isn't available when
|
||||
// edge properties are disabled.
|
||||
storage_->edge_count_.fetch_add(1, std::memory_order_acq_rel);
|
||||
break;
|
||||
}
|
||||
case Delta::Action::REMOVE_IN_EDGE: {
|
||||
@ -789,6 +801,11 @@ void Storage::Accessor::Abort() {
|
||||
CHECK(it != vertex->out_edges.end()) << "Invalid database state!";
|
||||
std::swap(*it, *vertex->out_edges.rbegin());
|
||||
vertex->out_edges.pop_back();
|
||||
// Decrement edge count. We only decrement the count here because
|
||||
// the information in `REMOVE_IN_EDGE` and `Edge/DELETE_OBJECT` is
|
||||
// redundant. Also, `Edge/DELETE_OBJECT` isn't available when edge
|
||||
// properties are disabled.
|
||||
storage_->edge_count_.fetch_add(-1, std::memory_order_acq_rel);
|
||||
break;
|
||||
}
|
||||
case Delta::Action::DELETE_OBJECT: {
|
||||
@ -999,6 +1016,17 @@ ConstraintsInfo Storage::ListAllConstraints() const {
|
||||
return {ListExistenceConstraints(constraints_)};
|
||||
}
|
||||
|
||||
StorageInfo Storage::GetInfo() const {
|
||||
auto vertex_count = vertices_.size();
|
||||
auto edge_count = edge_count_.load(std::memory_order_acquire);
|
||||
double average_degree = 0.0;
|
||||
if (vertex_count) {
|
||||
average_degree = 2.0 * static_cast<double>(edge_count) / vertex_count;
|
||||
}
|
||||
return {vertex_count, edge_count, average_degree, utils::GetMemoryUsage(),
|
||||
utils::GetDirDiskUsage(config_.durability.storage_directory)};
|
||||
}
|
||||
|
||||
VerticesIterable Storage::Accessor::Vertices(LabelId label, View view) {
|
||||
return VerticesIterable(
|
||||
storage_->indices_.label_index.Vertices(label, view, &transaction_));
|
||||
|
@ -148,6 +148,15 @@ struct ConstraintsInfo {
|
||||
std::vector<std::pair<LabelId, PropertyId>> existence;
|
||||
};
|
||||
|
||||
/// Structure used to return information about the storage.
|
||||
struct StorageInfo {
|
||||
uint64_t vertex_count;
|
||||
uint64_t edge_count;
|
||||
double average_degree;
|
||||
uint64_t memory_usage;
|
||||
uint64_t disk_usage;
|
||||
};
|
||||
|
||||
class Storage final {
|
||||
public:
|
||||
/// @throw std::system_error
|
||||
@ -345,6 +354,8 @@ class Storage final {
|
||||
|
||||
ConstraintsInfo ListAllConstraints() const;
|
||||
|
||||
StorageInfo GetInfo() const;
|
||||
|
||||
private:
|
||||
Transaction CreateTransaction();
|
||||
|
||||
@ -365,6 +376,10 @@ class Storage final {
|
||||
utils::SkipList<storage::Edge> edges_;
|
||||
std::atomic<uint64_t> vertex_id_{0};
|
||||
std::atomic<uint64_t> edge_id_{0};
|
||||
// Even though the edge count is already kept in the `edges_` SkipList, the
|
||||
// list is used only when properties are enabled for edges. Because of that we
|
||||
// keep a separate count of edges that is always updated.
|
||||
std::atomic<uint64_t> edge_count_{0};
|
||||
|
||||
NameIdMapper name_id_mapper_;
|
||||
|
||||
|
@ -195,7 +195,7 @@ class DurabilityTest : public ::testing::TestWithParam<bool> {
|
||||
}
|
||||
|
||||
void VerifyDataset(storage::Storage *store, DatasetType type,
|
||||
bool properties_on_edges) {
|
||||
bool properties_on_edges, bool verify_info = true) {
|
||||
auto base_label_indexed = store->NameToLabel("base_indexed");
|
||||
auto base_label_unindexed = store->NameToLabel("base_unindexed");
|
||||
auto property_id = store->NameToProperty("id");
|
||||
@ -573,6 +573,27 @@ class DurabilityTest : public ::testing::TestWithParam<bool> {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (verify_info) {
|
||||
auto info = store->GetInfo();
|
||||
if (have_base_dataset) {
|
||||
if (have_extended_dataset) {
|
||||
ASSERT_EQ(info.vertex_count, kNumBaseVertices + kNumExtendedVertices);
|
||||
ASSERT_EQ(info.edge_count, kNumBaseEdges + kNumExtendedEdges);
|
||||
} else {
|
||||
ASSERT_EQ(info.vertex_count, kNumBaseVertices);
|
||||
ASSERT_EQ(info.edge_count, kNumBaseEdges);
|
||||
}
|
||||
} else {
|
||||
if (have_extended_dataset) {
|
||||
ASSERT_EQ(info.vertex_count, kNumExtendedVertices);
|
||||
ASSERT_EQ(info.edge_count, kNumExtendedEdges);
|
||||
} else {
|
||||
ASSERT_EQ(info.vertex_count, 0);
|
||||
ASSERT_EQ(info.edge_count, 0);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
std::vector<std::filesystem::path> GetSnapshotsList() {
|
||||
@ -2330,7 +2351,8 @@ TEST_P(DurabilityTest, WalAndSnapshotAppendToExistingSnapshotAndWal) {
|
||||
storage::Storage store({.items = {.properties_on_edges = GetParam()},
|
||||
.durability = {.storage_directory = storage_directory,
|
||||
.recover_on_startup = true}});
|
||||
VerifyDataset(&store, DatasetType::BASE_WITH_EXTENDED, GetParam());
|
||||
VerifyDataset(&store, DatasetType::BASE_WITH_EXTENDED, GetParam(),
|
||||
/* verify_info = */ false);
|
||||
{
|
||||
auto acc = store.Access();
|
||||
auto vertex = acc.FindVertex(vertex_gid, storage::View::OLD);
|
||||
|
Loading…
Reference in New Issue
Block a user