Introduce items_per_batch and recovery_thread_count flags

This commit is contained in:
János Benjamin Antal 2023-04-11 18:39:53 +02:00
parent a8b4a35ecb
commit f4157fb1fd
8 changed files with 53 additions and 44 deletions

View File

@ -227,6 +227,13 @@ DEFINE_VALIDATED_uint64(storage_wal_file_flush_every_n_tx,
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
DEFINE_bool(storage_snapshot_on_exit, false, "Controls whether the storage creates another snapshot on exit.");
DEFINE_uint64(storage_items_per_batch, memgraph::storage::Config::Durability().items_per_batch,
"The number of edges and vertices stored in a batch in a snapshot file.");
DEFINE_uint64(storage_recovery_thread_count,
std::max(static_cast<uint64_t>(std::thread::hardware_concurrency()),
memgraph::storage::Config::Durability().recovery_thread_count),
"The number of threads used to recover persisted data from disk.");
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
DEFINE_bool(telemetry_enabled, false,
"Set to true to enable telemetry. We collect information about the "
@ -887,7 +894,9 @@ int main(int argc, char **argv) {
.wal_file_size_kibibytes = FLAGS_storage_wal_file_size_kib,
.wal_file_flush_every_n_tx = FLAGS_storage_wal_file_flush_every_n_tx,
.snapshot_on_exit = FLAGS_storage_snapshot_on_exit,
.restore_replicas_on_startup = true},
.restore_replicas_on_startup = true,
.items_per_batch = FLAGS_storage_items_per_batch,
.recovery_thread_count = FLAGS_storage_recovery_thread_count},
.transaction = {.isolation_level = ParseIsolationLevel()}};
if (FLAGS_storage_snapshot_interval_sec == 0) {
if (FLAGS_storage_wal_enabled) {

View File

@ -1,4 +1,4 @@
// Copyright 2022 Memgraph Ltd.
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -50,6 +50,9 @@ struct Config {
bool snapshot_on_exit{false};
bool restore_replicas_on_startup{false};
uint64_t items_per_batch{1'000'000};
uint64_t recovery_thread_count{8};
} durability;
struct Transaction {

View File

@ -1,4 +1,4 @@
// Copyright 2022 Memgraph Ltd.
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -163,7 +163,7 @@ std::optional<RecoveryInfo> RecoverData(const std::filesystem::path &snapshot_di
std::deque<std::pair<std::string, uint64_t>> *epoch_history,
utils::SkipList<Vertex> *vertices, utils::SkipList<Edge> *edges,
std::atomic<uint64_t> *edge_count, NameIdMapper *name_id_mapper,
Indices *indices, Constraints *constraints, Config::Items items,
Indices *indices, Constraints *constraints, const Config &config,
uint64_t *wal_seq_num) {
utils::MemoryTracker::OutOfMemoryExceptionEnabler oom_exception;
spdlog::info("Recovering persisted data using snapshot ({}) and WAL directory ({}).", snapshot_directory,
@ -195,7 +195,7 @@ std::optional<RecoveryInfo> RecoverData(const std::filesystem::path &snapshot_di
}
spdlog::info("Starting snapshot recovery from {}.", path);
try {
recovered_snapshot = LoadSnapshot(path, vertices, edges, epoch_history, name_id_mapper, edge_count, items);
recovered_snapshot = LoadSnapshot(path, vertices, edges, epoch_history, name_id_mapper, edge_count, config);
spdlog::info("Snapshot recovery successful!");
break;
} catch (const RecoveryFailure &e) {
@ -319,7 +319,7 @@ std::optional<RecoveryInfo> RecoverData(const std::filesystem::path &snapshot_di
}
try {
auto info = LoadWal(wal_file.path, &indices_constraints, last_loaded_timestamp, vertices, edges, name_id_mapper,
edge_count, items);
edge_count, config.items);
recovery_info.next_vertex_id = std::max(recovery_info.next_vertex_id, info.next_vertex_id);
recovery_info.next_edge_id = std::max(recovery_info.next_edge_id, info.next_edge_id);
recovery_info.next_timestamp = std::max(recovery_info.next_timestamp, info.next_timestamp);

View File

@ -1,4 +1,4 @@
// Copyright 2022 Memgraph Ltd.
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -108,7 +108,7 @@ std::optional<RecoveryInfo> RecoverData(const std::filesystem::path &snapshot_di
std::deque<std::pair<std::string, uint64_t>> *epoch_history,
utils::SkipList<Vertex> *vertices, utils::SkipList<Edge> *edges,
std::atomic<uint64_t> *edge_count, NameIdMapper *name_id_mapper,
Indices *indices, Constraints *constraints, Config::Items items,
Indices *indices, Constraints *constraints, const Config &config,
uint64_t *wal_seq_num);
} // namespace memgraph::storage::durability

View File

@ -10,9 +10,9 @@
// licenses/APL.txt.
#include "storage/v2/durability/snapshot.hpp"
#include <thread>
#include "spdlog/spdlog.h"
#include "storage/v2/durability/exceptions.hpp"
#include "storage/v2/durability/paths.hpp"
#include "storage/v2/durability/serialization.hpp"
@ -489,7 +489,7 @@ void LoadPartialConnectivity(const std::filesystem::path &path, utils::SkipList<
RecoveredSnapshot LoadSnapshot(const std::filesystem::path &path, utils::SkipList<Vertex> *vertices,
utils::SkipList<Edge> *edges,
std::deque<std::pair<std::string, uint64_t>> *epoch_history,
NameIdMapper *name_id_mapper, std::atomic<uint64_t> *edge_count, Config::Items items) {
NameIdMapper *name_id_mapper, std::atomic<uint64_t> *edge_count, const Config &config) {
RecoveryInfo ret;
RecoveredIndicesAndConstraints indices_constraints;
@ -555,30 +555,30 @@ RecoveredSnapshot LoadSnapshot(const std::filesystem::path &path, utils::SkipLis
// Reset current edge count.
edge_count->store(0, std::memory_order_release);
static constexpr auto kThreadCount = 2U;
{
spdlog::info("Recovering edges.");
// Recover edges.
if (snapshot_has_edges) {
std::vector<std::jthread> threads;
threads.reserve(kThreadCount);
threads.reserve(config.durability.recovery_thread_count);
if (!snapshot.SetPosition(info.offset_edge_batches)) {
throw RecoveryFailure("Couldn't read data from snapshot!");
}
const auto edge_batches = ReadBatchInfos<EdgeBatchInfo>(snapshot);
std::atomic<uint64_t> batch_counter = 0;
for (auto i{0U}; i < kThreadCount; ++i) {
threads.emplace_back([path, edges, &edge_batches, &batch_counter, items, &get_property_from_id]() mutable {
while (true) {
const auto batch_index = batch_counter++;
if (batch_index >= edge_batches.size()) {
return;
}
const auto &batch = edge_batches[batch_index];
LoadPartialEdges(path, *edges, batch.offset, batch.count, items, get_property_from_id);
}
});
for (auto i{0U}; i < config.durability.recovery_thread_count; ++i) {
threads.emplace_back(
[path, edges, &edge_batches, &batch_counter, items = config.items, &get_property_from_id]() mutable {
while (true) {
const auto batch_index = batch_counter++;
if (batch_index >= edge_batches.size()) {
return;
}
const auto &batch = edge_batches[batch_index];
LoadPartialEdges(path, *edges, batch.offset, batch.count, items, get_property_from_id);
}
});
}
}
spdlog::info("Edges are recovered.");
@ -591,10 +591,10 @@ RecoveredSnapshot LoadSnapshot(const std::filesystem::path &path, utils::SkipLis
const auto vertex_batches = ReadBatchInfos<VertexBatchInfo>(snapshot);
{
std::vector<std::jthread> threads;
threads.reserve(kThreadCount);
threads.reserve(config.durability.recovery_thread_count);
std::atomic<uint64_t> batch_counter = 0;
for (auto i{0U}; i < kThreadCount; ++i) {
for (auto i{0U}; i < config.durability.recovery_thread_count; ++i) {
threads.emplace_back([path, vertices, &vertex_batches, &batch_counter, name_id_mapper, &get_label_from_id,
&get_property_from_id]() mutable {
while (true) {
@ -615,11 +615,11 @@ RecoveredSnapshot LoadSnapshot(const std::filesystem::path &path, utils::SkipLis
spdlog::info("Recover connectivity.");
{
std::vector<std::jthread> threads;
threads.reserve(kThreadCount);
threads.reserve(config.durability.recovery_thread_count);
std::atomic<uint64_t> batch_counter = 0;
for (auto i{0U}; i < kThreadCount; ++i) {
threads.emplace_back([path, vertices, edges, edge_count, &vertex_batches, &batch_counter, items,
for (auto i{0U}; i < config.durability.recovery_thread_count; ++i) {
threads.emplace_back([path, vertices, edges, edge_count, &vertex_batches, &batch_counter, items = config.items,
&get_edge_type_from_id]() mutable {
while (true) {
const auto batch_index = batch_counter++;
@ -781,7 +781,7 @@ RecoveredSnapshot LoadSnapshot(const std::filesystem::path &path, utils::SkipLis
void CreateSnapshot(Transaction *transaction, const std::filesystem::path &snapshot_directory,
const std::filesystem::path &wal_directory, uint64_t snapshot_retention_count,
utils::SkipList<Vertex> *vertices, utils::SkipList<Edge> *edges, NameIdMapper *name_id_mapper,
Indices *indices, Constraints *constraints, Config::Items items, const std::string &uuid,
Indices *indices, Constraints *constraints, const Config &config, const std::string &uuid,
const std::string_view epoch_id, const std::deque<std::pair<std::string, uint64_t>> &epoch_history,
utils::FileRetainer *file_retainer) {
// Ensure that the storage directory exists.
@ -830,12 +830,11 @@ void CreateSnapshot(Transaction *transaction, const std::filesystem::path &snaps
};
std::vector<EdgeBatchInfo> edge_batch_infos;
static constexpr auto kDesiredEdgeCountPerBatch{1'000'000ULL};
auto items_in_current_batch{0UL};
offset_edges = snapshot.GetPosition();
auto batch_start_offset{offset_edges};
// Store all edges.
if (items.properties_on_edges) {
if (config.items.properties_on_edges) {
auto acc = edges->access();
for (auto &edge : acc) {
// The edge visibility check must be done here manually because we don't
@ -874,8 +873,8 @@ void CreateSnapshot(Transaction *transaction, const std::filesystem::path &snaps
// type and invalid from/to pointers because we don't know them here,
// but that isn't an issue because we won't use that part of the API
// here.
auto ea =
EdgeAccessor{edge_ref, EdgeTypeId::FromUint(0UL), nullptr, nullptr, transaction, indices, constraints, items};
auto ea = EdgeAccessor{
edge_ref, EdgeTypeId::FromUint(0UL), nullptr, nullptr, transaction, indices, constraints, config.items};
// Get edge data.
auto maybe_props = ea.Properties(View::OLD);
@ -895,7 +894,7 @@ void CreateSnapshot(Transaction *transaction, const std::filesystem::path &snaps
++edges_count;
++items_in_current_batch;
if (items_in_current_batch == kDesiredEdgeCountPerBatch) {
if (items_in_current_batch == config.durability.items_per_batch) {
edge_batch_infos.push_back(EdgeBatchInfo{batch_start_offset, items_in_current_batch});
batch_start_offset = snapshot.GetPosition();
items_in_current_batch = 0;
@ -910,7 +909,6 @@ void CreateSnapshot(Transaction *transaction, const std::filesystem::path &snaps
std::vector<VertexBatchInfo> vertex_batch_infos;
// Store all vertices.
{
static constexpr auto kDesiredVertexCountPerBatch{1'000'000ULL};
items_in_current_batch = 0;
offset_vertices = snapshot.GetPosition();
batch_start_offset = offset_vertices;
@ -921,7 +919,7 @@ void CreateSnapshot(Transaction *transaction, const std::filesystem::path &snaps
first_vertex_gid = vertex.gid.AsUint();
}
// The visibility check is implemented for vertices so we use it here.
auto va = VertexAccessor::Create(&vertex, transaction, indices, constraints, items, View::OLD);
auto va = VertexAccessor::Create(&vertex, transaction, indices, constraints, config.items, View::OLD);
if (!va) continue;
// Get vertex data.
@ -969,7 +967,7 @@ void CreateSnapshot(Transaction *transaction, const std::filesystem::path &snaps
++vertices_count;
++items_in_current_batch;
if (items_in_current_batch == kDesiredVertexCountPerBatch) {
if (items_in_current_batch == config.durability.items_per_batch) {
vertex_batch_infos.push_back(VertexBatchInfo{batch_start_offset, items_in_current_batch, *first_vertex_gid});
batch_start_offset = snapshot.GetPosition();
items_in_current_batch = 0;

View File

@ -64,13 +64,13 @@ SnapshotInfo ReadSnapshotInfo(const std::filesystem::path &path);
RecoveredSnapshot LoadSnapshot(const std::filesystem::path &path, utils::SkipList<Vertex> *vertices,
utils::SkipList<Edge> *edges,
std::deque<std::pair<std::string, uint64_t>> *epoch_history,
NameIdMapper *name_id_mapper, std::atomic<uint64_t> *edge_count, Config::Items items);
NameIdMapper *name_id_mapper, std::atomic<uint64_t> *edge_count, const Config &config);
/// Function used to create a snapshot using the given transaction.
void CreateSnapshot(Transaction *transaction, const std::filesystem::path &snapshot_directory,
const std::filesystem::path &wal_directory, uint64_t snapshot_retention_count,
utils::SkipList<Vertex> *vertices, utils::SkipList<Edge> *edges, NameIdMapper *name_id_mapper,
Indices *indices, Constraints *constraints, Config::Items items, const std::string &uuid,
Indices *indices, Constraints *constraints, const Config &config, const std::string &uuid,
std::string_view epoch_id, const std::deque<std::pair<std::string, uint64_t>> &epoch_history,
utils::FileRetainer *file_retainer);

View File

@ -1,4 +1,4 @@
// Copyright 2022 Memgraph Ltd.
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -173,7 +173,7 @@ void Storage::ReplicationServer::SnapshotHandler(slk::Reader *req_reader, slk::B
spdlog::debug("Loading snapshot");
auto recovered_snapshot = durability::LoadSnapshot(*maybe_snapshot_path, &storage_->vertices_, &storage_->edges_,
&storage_->epoch_history_, &storage_->name_id_mapper_,
&storage_->edge_count_, storage_->config_.items);
&storage_->edge_count_, storage_->config_);
spdlog::debug("Snapshot loaded successfully");
// If this step is present it should always be the first step of
// the recovery so we use the UUID we read from snasphost

View File

@ -358,7 +358,7 @@ Storage::Storage(Config config)
if (config_.durability.recover_on_startup) {
auto info = durability::RecoverData(snapshot_directory_, wal_directory_, &uuid_, &epoch_id_, &epoch_history_,
&vertices_, &edges_, &edge_count_, &name_id_mapper_, &indices_, &constraints_,
config_.items, &wal_seq_num_);
config_, &wal_seq_num_);
if (info) {
vertex_id_ = info->next_vertex_id;
edge_id_ = info->next_edge_id;
@ -1923,8 +1923,7 @@ utils::BasicResult<Storage::CreateSnapshotError> Storage::CreateSnapshot() {
// Create snapshot.
durability::CreateSnapshot(&transaction, snapshot_directory_, wal_directory_,
config_.durability.snapshot_retention_count, &vertices_, &edges_, &name_id_mapper_,
&indices_, &constraints_, config_.items, uuid_, epoch_id_, epoch_history_,
&file_retainer_);
&indices_, &constraints_, config_, uuid_, epoch_id_, epoch_history_, &file_retainer_);
// Finalize snapshot transaction.
commit_log_->MarkFinished(transaction.start_timestamp);