diff --git a/CHANGELOG.md b/CHANGELOG.md index 9e1a0ec95..f10c646bd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,8 @@ * Added support for programatically reading in data from CSV files through the `LOAD CSV` clause. We support CSV files with and without a header, the supported dialect being Excel. +* Added a new flag `--memory-limit` which enables the user to set the maximum total amount of memory + memgraph can allocate during its runtime. ### Bug Fixes diff --git a/config/flags.yaml b/config/flags.yaml index 5245a9d90..c610591d1 100644 --- a/config/flags.yaml +++ b/config/flags.yaml @@ -83,6 +83,10 @@ modifications: value: "/usr/lib/memgraph/auth_module/example.py" override: false + - name: "memory_limit" + value: "0" + override: true + undocumented: - "flag_file" - "also_log_to_stderr" diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 2dbba0529..ddae1b4ad 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -37,17 +37,17 @@ if (MG_ENTERPRISE) glue/auth.cpp) endif() -set(MG_SINGLE_NODE_V2_LIBS stdc++fs Threads::Threads - telemetry_lib mg-query mg-communication mg-new-delete) +set(mg_single_node_v2_libs stdc++fs Threads::Threads + telemetry_lib mg-query mg-communication mg-new-delete mg-utils) if (MG_ENTERPRISE) # These are enterprise subsystems - set(MG_SINGLE_NODE_V2_LIBS ${MG_SINGLE_NODE_V2_LIBS} mg-auth mg-audit) + set(mg_single_node_v2_libs ${mg_single_node_v2_libs} mg-auth mg-audit) endif() # memgraph main executable add_executable(memgraph ${mg_single_node_v2_sources}) target_include_directories(memgraph PUBLIC ${CMAKE_SOURCE_DIR}/include) -target_link_libraries(memgraph ${MG_SINGLE_NODE_V2_LIBS}) +target_link_libraries(memgraph ${mg_single_node_v2_libs}) # NOTE: `include/mg_procedure.syms` describes a pattern match for symbols which # should be dynamically exported, so that `dlopen` can correctly link the # symbols in custom procedure module libraries. diff --git a/src/auth/models.cpp b/src/auth/models.cpp index b3e650b44..33ebc72fe 100644 --- a/src/auth/models.cpp +++ b/src/auth/models.cpp @@ -45,6 +45,8 @@ std::string PermissionToString(Permission permission) { return "LOCK_PATH"; case Permission::READ_FILE: return "READ_FILE"; + case Permission::FREE_MEMORY: + return "FREE_MEMORY"; case Permission::AUTH: return "AUTH"; } diff --git a/src/auth/models.hpp b/src/auth/models.hpp index 4453adaf8..f1139e1ca 100644 --- a/src/auth/models.hpp +++ b/src/auth/models.hpp @@ -24,15 +24,17 @@ enum class Permission : uint64_t { REPLICATION = 1U << 10U, LOCK_PATH = 1U << 11U, READ_FILE = 1U << 12U, + FREE_MEMORY = 1U << 13U, AUTH = 1U << 16U }; // clang-format on // Constant list of all available permissions. -const std::vector kPermissionsAll = { - Permission::MATCH, Permission::CREATE, Permission::MERGE, Permission::DELETE, Permission::SET, - Permission::REMOVE, Permission::INDEX, Permission::STATS, Permission::CONSTRAINT, Permission::DUMP, - Permission::AUTH, Permission::REPLICATION, Permission::LOCK_PATH, Permission::READ_FILE}; +const std::vector kPermissionsAll = {Permission::MATCH, Permission::CREATE, Permission::MERGE, + Permission::DELETE, Permission::SET, Permission::REMOVE, + Permission::INDEX, Permission::STATS, Permission::CONSTRAINT, + Permission::DUMP, Permission::AUTH, Permission::REPLICATION, + Permission::LOCK_PATH, Permission::READ_FILE, Permission::FREE_MEMORY}; // Function that converts a permission to its string representation. std::string PermissionToString(Permission permission); diff --git a/src/glue/auth.cpp b/src/glue/auth.cpp index 43af863b5..2a9932595 100644 --- a/src/glue/auth.cpp +++ b/src/glue/auth.cpp @@ -30,6 +30,8 @@ auth::Permission PrivilegeToPermission(query::AuthQuery::Privilege privilege) { return auth::Permission::LOCK_PATH; case query::AuthQuery::Privilege::READ_FILE: return auth::Permission::READ_FILE; + case query::AuthQuery::Privilege::FREE_MEMORY: + return auth::Permission::FREE_MEMORY; case query::AuthQuery::Privilege::AUTH: return auth::Permission::AUTH; } diff --git a/src/memgraph.cpp b/src/memgraph.cpp index 29037a0c0..e064a5c9e 100644 --- a/src/memgraph.cpp +++ b/src/memgraph.cpp @@ -167,6 +167,10 @@ DEFINE_VALIDATED_string(query_modules_directory, "", DEFINE_bool(also_log_to_stderr, false, "Log messages go to stderr in addition to logfiles"); DEFINE_string(log_file, "", "Path to where the log should be stored."); +DEFINE_uint64( + memory_limit, 0, + "Total memory limit in MiB. Set to 0 to use the default values which are 100\% of the phyisical memory if the swap " + "is enabled and 90\% of the physical memory otherwise."); namespace { constexpr std::array log_level_mappings{ std::pair{"TRACE", spdlog::level::trace}, std::pair{"DEBUG", spdlog::level::debug}, @@ -236,6 +240,25 @@ void ConfigureLogging() { spdlog::flush_on(spdlog::level::trace); ParseLogLevel(); } + +int64_t GetMemoryLimit() { + if (FLAGS_memory_limit == 0) { + auto maybe_total_memory = utils::sysinfo::TotalMemory(); + MG_ASSERT(maybe_total_memory, "Failed to fetch the total physical memory"); + const auto maybe_swap_memory = utils::sysinfo::SwapTotalMemory(); + MG_ASSERT(maybe_swap_memory, "Failed to fetch the total swap memory"); + + if (*maybe_swap_memory == 0) { + // take only 90% of the total memory + *maybe_total_memory *= 9; + *maybe_total_memory /= 10; + } + return *maybe_total_memory * 1024; + } + + // We parse the memory as MiB every time + return FLAGS_memory_limit * 1024 * 1024; +} } // namespace /// Encapsulates Dbms and Interpreter that are passed through the network server @@ -876,10 +899,10 @@ int main(int argc, char **argv) { // Start memory warning logger. utils::Scheduler mem_log_scheduler; if (FLAGS_memory_warning_threshold > 0) { - auto free_ram = utils::sysinfo::AvailableMemoryKilobytes(); + auto free_ram = utils::sysinfo::AvailableMemory(); if (free_ram) { mem_log_scheduler.Run("Memory warning", std::chrono::seconds(3), [] { - auto free_ram = utils::sysinfo::AvailableMemoryKilobytes(); + auto free_ram = utils::sysinfo::AvailableMemory(); if (free_ram && *free_ram / 1024 < FLAGS_memory_warning_threshold) spdlog::warn("Running out of available RAM, only {} MB left", *free_ram / 1024); }); @@ -925,8 +948,9 @@ int main(int argc, char **argv) { // End enterprise features initialization #endif - // Main storage and execution engines initialization + utils::total_memory_tracker.SetHardLimit(GetMemoryLimit()); + // Main storage and execution engines initialization storage::Config db_config{ .gc = {.type = storage::Config::Gc::Type::PERIODIC, .interval = std::chrono::seconds(FLAGS_storage_gc_cycle_sec)}, .items = {.properties_on_edges = FLAGS_storage_properties_on_edges}, diff --git a/src/query/exceptions.hpp b/src/query/exceptions.hpp index 0a8bd5b11..cde5433d2 100644 --- a/src/query/exceptions.hpp +++ b/src/query/exceptions.hpp @@ -164,4 +164,9 @@ class LockPathModificationInMulticommandTxException : public QueryException { : QueryException("Lock path clause not allowed in multicommand transactions.") {} }; +class FreeMemoryModificationInMulticommandTxException : public QueryException { + public: + FreeMemoryModificationInMulticommandTxException() + : QueryException("Lock path clause not allowed in multicommand transactions.") {} +}; } // namespace query diff --git a/src/query/frontend/ast/ast.lcp b/src/query/frontend/ast/ast.lcp index bc70c822d..aaa855ac5 100644 --- a/src/query/frontend/ast/ast.lcp +++ b/src/query/frontend/ast/ast.lcp @@ -2191,7 +2191,7 @@ cpp<# (:serialize)) (lcp:define-enum privilege (create delete match merge set remove index stats auth constraint - dump replication lock_path read_file) + dump replication lock_path read_file free_memory) (:serialize)) #>cpp AuthQuery() = default; @@ -2228,7 +2228,8 @@ const std::vector kPrivilegesAll = { AuthQuery::Privilege::AUTH, AuthQuery::Privilege::CONSTRAINT, AuthQuery::Privilege::DUMP, AuthQuery::Privilege::REPLICATION, - AuthQuery::Privilege::LOCK_PATH}; + AuthQuery::Privilege::LOCK_PATH, + AuthQuery::Privilege::FREE_MEMORY}; cpp<# (lcp:define-class info-query (query) @@ -2394,4 +2395,12 @@ cpp<# (:serialize (:slk)) (:clone)) + (lcp:define-class free-memory-query (query) () + (:public + #>cpp + DEFVISITABLE(QueryVisitor); + cpp<#) + (:serialize (:slk)) + (:clone)) + (lcp:pop-namespace) ;; namespace query diff --git a/src/query/frontend/ast/ast_visitor.hpp b/src/query/frontend/ast/ast_visitor.hpp index cc4b7d268..2d5373853 100644 --- a/src/query/frontend/ast/ast_visitor.hpp +++ b/src/query/frontend/ast/ast_visitor.hpp @@ -75,6 +75,7 @@ class DumpQuery; class ReplicationQuery; class LockPathQuery; class LoadCsv; +class FreeMemoryQuery; using TreeCompositeVisitor = ::utils::CompositeVisitor< SingleQuery, CypherUnion, NamedExpression, OrOperator, XorOperator, AndOperator, NotOperator, AdditionOperator, @@ -108,6 +109,6 @@ class ExpressionVisitor template class QueryVisitor : public ::utils::Visitor {}; + ConstraintQuery, DumpQuery, ReplicationQuery, LockPathQuery, LoadCsv, FreeMemoryQuery> {}; } // namespace query diff --git a/src/query/frontend/ast/cypher_main_visitor.cpp b/src/query/frontend/ast/cypher_main_visitor.cpp index b206929ea..0c7fb2b33 100644 --- a/src/query/frontend/ast/cypher_main_visitor.cpp +++ b/src/query/frontend/ast/cypher_main_visitor.cpp @@ -304,6 +304,12 @@ antlrcpp::Any CypherMainVisitor::visitLoadCsv(MemgraphCypher::LoadCsvContext *ct return load_csv; } +antlrcpp::Any CypherMainVisitor::visitFreeMemoryQuery(MemgraphCypher::FreeMemoryQueryContext *ctx) { + auto *free_memory_query = storage_->Create(); + query_ = free_memory_query; + return free_memory_query; +} + antlrcpp::Any CypherMainVisitor::visitCypherUnion(MemgraphCypher::CypherUnionContext *ctx) { bool distinct = !ctx->ALL(); auto *cypher_union = storage_->Create(distinct); diff --git a/src/query/frontend/ast/cypher_main_visitor.hpp b/src/query/frontend/ast/cypher_main_visitor.hpp index 15eebdf30..03fde1119 100644 --- a/src/query/frontend/ast/cypher_main_visitor.hpp +++ b/src/query/frontend/ast/cypher_main_visitor.hpp @@ -213,6 +213,11 @@ class CypherMainVisitor : public antlropencypher::MemgraphCypherBaseVisitor { */ antlrcpp::Any visitLoadCsv(MemgraphCypher::LoadCsvContext *ctx) override; + /** + * @return FreeMemoryQuery* + */ + antlrcpp::Any visitFreeMemoryQuery(MemgraphCypher::FreeMemoryQueryContext *ctx) override; + /** * @return CypherUnion* */ diff --git a/src/query/frontend/opencypher/grammar/MemgraphCypher.g4 b/src/query/frontend/opencypher/grammar/MemgraphCypher.g4 index f8785d13c..4d96ace10 100644 --- a/src/query/frontend/opencypher/grammar/MemgraphCypher.g4 +++ b/src/query/frontend/opencypher/grammar/MemgraphCypher.g4 @@ -20,6 +20,7 @@ memgraphCypherKeyword : cypherKeyword | DROP | DUMP | FOR + | FREE | FROM | GRANT | HEADER @@ -64,6 +65,7 @@ query : cypherQuery | dumpQuery | replicationQuery | lockPathQuery + | freeMemoryQuery ; authQuery : createRole @@ -176,4 +178,4 @@ showReplicas : SHOW REPLICAS ; lockPathQuery : ( LOCK | UNLOCK ) DATA DIRECTORY ; - +freeMemoryQuery : FREE MEMORY ; diff --git a/src/query/frontend/opencypher/grammar/MemgraphCypherLexer.g4 b/src/query/frontend/opencypher/grammar/MemgraphCypherLexer.g4 index 2f3a4be03..fbfc5a2d2 100644 --- a/src/query/frontend/opencypher/grammar/MemgraphCypherLexer.g4 +++ b/src/query/frontend/opencypher/grammar/MemgraphCypherLexer.g4 @@ -24,6 +24,7 @@ DIRECTORY : D I R E C T O R Y ; DROP : D R O P ; DUMP : D U M P ; FOR : F O R ; +FREE : F R E E ; FROM : F R O M ; GRANT : G R A N T ; GRANTS : G R A N T S ; diff --git a/src/query/frontend/semantic/required_privileges.cpp b/src/query/frontend/semantic/required_privileges.cpp index 9f187e48f..4cd046bf5 100644 --- a/src/query/frontend/semantic/required_privileges.cpp +++ b/src/query/frontend/semantic/required_privileges.cpp @@ -1,4 +1,5 @@ #include "query/frontend/ast/ast.hpp" +#include "query/frontend/ast/ast_visitor.hpp" namespace query { @@ -52,6 +53,8 @@ class PrivilegeExtractor : public QueryVisitor, public HierarchicalTreeVis void Visit(LoadCsv &load_csv) override { AddPrivilege(AuthQuery::Privilege::READ_FILE); } + void Visit(FreeMemoryQuery &free_memory_query) override { AddPrivilege(AuthQuery::Privilege::FREE_MEMORY); } + void Visit(ReplicationQuery &replication_query) override { switch (replication_query.action_) { case ReplicationQuery::Action::SET_REPLICATION_ROLE: diff --git a/src/query/frontend/stripped_lexer_constants.hpp b/src/query/frontend/stripped_lexer_constants.hpp index 89d1503f9..71949c4a7 100644 --- a/src/query/frontend/stripped_lexer_constants.hpp +++ b/src/query/frontend/stripped_lexer_constants.hpp @@ -87,7 +87,7 @@ const trie::Trie kKeywords = { "single", "true", "false", "reduce", "coalesce", "user", "password", "alter", "drop", "show", "stats", "unique", "explain", "profile", "storage", "index", "info", "exists", "assert", "constraint", "node", "key", "dump", "database", "call", "yield", "memory", - "mb", "kb", "unlimited"}; + "mb", "kb", "unlimited", "free"}; // Unicode codepoints that are allowed at the start of the unescaped name. const std::bitset kUnescapedNameAllowedStarts( diff --git a/src/query/interpreter.cpp b/src/query/interpreter.cpp index c1b8ee7c5..f9de34707 100644 --- a/src/query/interpreter.cpp +++ b/src/query/interpreter.cpp @@ -1162,6 +1162,22 @@ PreparedQuery PrepareLockPathQuery(ParsedQuery parsed_query, const bool in_expli RWType::NONE}; } +PreparedQuery PrepareFreeMemoryQuery(ParsedQuery parsed_query, const bool in_explicit_transaction, + InterpreterContext *interpreter_context) { + if (in_explicit_transaction) { + throw FreeMemoryModificationInMulticommandTxException(); + } + + interpreter_context->db->FreeMemory(); + + return PreparedQuery{{}, + std::move(parsed_query.required_privileges), + [](AnyStream *stream, std::optional n) -> std::optional { + return QueryHandlerResult::COMMIT; + }, + RWType::NONE}; +} + PreparedQuery PrepareInfoQuery(ParsedQuery parsed_query, bool in_explicit_transaction, std::map *summary, InterpreterContext *interpreter_context, storage::Storage *db, utils::MonotonicBufferResource *execution_memory) { @@ -1488,6 +1504,8 @@ Interpreter::PrepareResult Interpreter::Prepare(const std::string &query_string, } else if (utils::Downcast(parsed_query.query)) { prepared_query = PrepareLockPathQuery(std::move(parsed_query), in_explicit_transaction_, interpreter_context_, &*execution_db_accessor_); + } else if (utils::Downcast(parsed_query.query)) { + prepared_query = PrepareFreeMemoryQuery(std::move(parsed_query), in_explicit_transaction_, interpreter_context_); } else { LOG_FATAL("Should not get here -- unknown query type!"); } diff --git a/src/storage/v2/indices.cpp b/src/storage/v2/indices.cpp index d873e8ecf..f7bd634fd 100644 --- a/src/storage/v2/indices.cpp +++ b/src/storage/v2/indices.cpp @@ -349,6 +349,12 @@ LabelIndex::Iterable::Iterable(utils::SkipList::Accessor index_accessor, constraints_(constraints), config_(config) {} +void LabelIndex::RunGC() { + for (auto &index_entry : index_) { + index_entry.second.run_gc(); + } +} + bool LabelPropertyIndex::Entry::operator<(const Entry &rhs) { if (value < rhs.value) { return true; @@ -661,6 +667,12 @@ int64_t LabelPropertyIndex::ApproximateVertexCount(LabelId label, PropertyId pro return acc.estimate_range_count(lower, upper, utils::SkipListLayerForCountEstimation(acc.size())); } +void LabelPropertyIndex::RunGC() { + for (auto &index_entry : index_) { + index_entry.second.run_gc(); + } +} + void RemoveObsoleteEntries(Indices *indices, uint64_t oldest_active_start_timestamp) { indices->label_index.RemoveObsoleteEntries(oldest_active_start_timestamp); indices->label_property_index.RemoveObsoleteEntries(oldest_active_start_timestamp); diff --git a/src/storage/v2/indices.hpp b/src/storage/v2/indices.hpp index 7d7a30222..dd58e16fc 100644 --- a/src/storage/v2/indices.hpp +++ b/src/storage/v2/indices.hpp @@ -110,6 +110,8 @@ class LabelIndex { void Clear() { index_.clear(); } + void RunGC(); + private: std::map> index_; Indices *indices_; @@ -225,6 +227,8 @@ class LabelPropertyIndex { void Clear() { index_.clear(); } + void RunGC(); + private: std::map, utils::SkipList> index_; Indices *indices_; @@ -263,5 +267,4 @@ void UpdateOnAddLabel(Indices *indices, LabelId label, Vertex *vertex, const Tra /// @throw std::bad_alloc void UpdateOnSetProperty(Indices *indices, PropertyId property, const PropertyValue &value, Vertex *vertex, const Transaction &tx); - } // namespace storage diff --git a/src/storage/v2/storage.cpp b/src/storage/v2/storage.cpp index f0ebe9fe9..b699dc250 100644 --- a/src/storage/v2/storage.cpp +++ b/src/storage/v2/storage.cpp @@ -361,7 +361,7 @@ Storage::Storage(Config config) snapshot_runner_.Run("Snapshot", config_.durability.snapshot_interval, [this] { this->CreateSnapshot(); }); } if (config_.gc.type == Config::Gc::Type::PERIODIC) { - gc_runner_.Run("Storage GC", config_.gc.interval, [this] { this->CollectGarbage(); }); + gc_runner_.Run("Storage GC", config_.gc.interval, [this] { this->CollectGarbage(); }); } if (timestamp_ == kTimestampInitialId) { @@ -1221,11 +1221,29 @@ Transaction Storage::CreateTransaction() { return {transaction_id, start_timestamp}; } +template void Storage::CollectGarbage() { - // Because the garbage collector iterates through the indices and constraints - // to clean them up, it must take the main lock for reading to make sure that - // the indices and constraints aren't concurrently being modified. - std::shared_lock main_guard(main_lock_); + if constexpr (force) { + // We take the unique lock on the main storage lock so we can forcefully clean + // everything we can + if (!main_lock_.try_lock()) { + CollectGarbage(); + return; + } + } else { + // Because the garbage collector iterates through the indices and constraints + // to clean them up, it must take the main lock for reading to make sure that + // the indices and constraints aren't concurrently being modified. + main_lock_.lock_shared(); + } + + utils::OnScopeExit lock_releaser{[&] { + if constexpr (force) { + main_lock_.unlock(); + } else { + main_lock_.unlock_shared(); + } + }}; // Garbage collection must be performed in two phases. In the first phase, // deltas that won't be applied by any transaction anymore are unlinked from @@ -1418,19 +1436,32 @@ void Storage::CollectGarbage() { } } - while (true) { - auto garbage_undo_buffers_ptr = garbage_undo_buffers_.Lock(); - if (garbage_undo_buffers_ptr->empty() || garbage_undo_buffers_ptr->front().first > oldest_active_start_timestamp) { - break; + garbage_undo_buffers_.WithLock([&](auto &undo_buffers) { + // if force is set to true we can simply delete all the leftover undos because + // no transaction is active + if constexpr (force) { + undo_buffers.clear(); + } else { + while (!undo_buffers.empty() && undo_buffers.front().first <= oldest_active_start_timestamp) { + undo_buffers.pop_front(); + } } - garbage_undo_buffers_ptr->pop_front(); - } + }); { auto vertex_acc = vertices_.access(); - while (!garbage_vertices_.empty() && garbage_vertices_.front().first < oldest_active_start_timestamp) { - MG_ASSERT(vertex_acc.remove(garbage_vertices_.front().second), "Invalid database state!"); - garbage_vertices_.pop_front(); + if constexpr (force) { + // if force is set to true, then we have unique_lock and no transactions are active + // so we can clean all of the deleted vertices + while (!garbage_vertices_.empty()) { + MG_ASSERT(vertex_acc.remove(garbage_vertices_.front().second), "Invalid database state!"); + garbage_vertices_.pop_front(); + } + } else { + while (!garbage_vertices_.empty() && garbage_vertices_.front().first < oldest_active_start_timestamp) { + MG_ASSERT(vertex_acc.remove(garbage_vertices_.front().second), "Invalid database state!"); + garbage_vertices_.pop_front(); + } } } { @@ -1441,6 +1472,10 @@ void Storage::CollectGarbage() { } } +// tell the linker he can find the CollectGarbage definitions here +template void Storage::CollectGarbage(); +template void Storage::CollectGarbage(); + bool Storage::InitializeWalFile() { if (config_.durability.snapshot_wal_mode != Config::Durability::SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL) return false; @@ -1702,6 +1737,16 @@ bool Storage::UnlockPath() { return true; } +void Storage::FreeMemory() { + CollectGarbage(); + + // SkipList is already threadsafe + vertices_.run_gc(); + edges_.run_gc(); + indices_.label_index.RunGC(); + indices_.label_property_index.RunGC(); +} + uint64_t Storage::CommitTimestamp(const std::optional desired_commit_timestamp) { if (!desired_commit_timestamp) { return timestamp_++; diff --git a/src/storage/v2/storage.hpp b/src/storage/v2/storage.hpp index 712519701..6e3275caa 100644 --- a/src/storage/v2/storage.hpp +++ b/src/storage/v2/storage.hpp @@ -21,6 +21,7 @@ #include "storage/v2/vertex.hpp" #include "storage/v2/vertex_accessor.hpp" #include "utils/file_locker.hpp" +#include "utils/on_scope_exit.hpp" #include "utils/rw_lock.hpp" #include "utils/scheduler.hpp" #include "utils/skip_list.hpp" @@ -412,11 +413,23 @@ class Storage final { std::vector ReplicasInfo(); + void FreeMemory(); + private: Transaction CreateTransaction(); + /// The force parameter determines the behaviour of the garbage collector. + /// If it's set to true, it will behave as a global operation, i.e. it can't + /// be part of a transaction, and no other transaction can be active at the same time. + /// This allows it to delete immediately vertices without worrying that some other + /// transaction is possibly using it. If there are active transactions when this method + /// is called with force set to true, it will fallback to the same method with the force + /// set to false. + /// If it's set to false, it will execute in parallel with other transactions, ensuring + /// that no object in use can be deleted. /// @throw std::system_error /// @throw std::bad_alloc + template void CollectGarbage(); bool InitializeWalFile(); diff --git a/src/utils/CMakeLists.txt b/src/utils/CMakeLists.txt index f93e4ab2f..36e143317 100644 --- a/src/utils/CMakeLists.txt +++ b/src/utils/CMakeLists.txt @@ -6,6 +6,7 @@ set(utils_src_files memory.cpp memory_tracker.cpp signals.cpp + sysinfo/memory.cpp thread.cpp thread_pool.cpp uuid.cpp) diff --git a/src/utils/skip_list.hpp b/src/utils/skip_list.hpp index fd7687504..fd2295fea 100644 --- a/src/utils/skip_list.hpp +++ b/src/utils/skip_list.hpp @@ -259,7 +259,7 @@ class SkipListGc final { } void Collect(TNode *node) { - std::lock_guard guard(lock_); + std::unique_lock guard(lock_); deleted_.Push({accessor_id_.load(std::memory_order_acquire), node}); } @@ -895,6 +895,8 @@ class SkipList final { gc_.Clear(); } + void run_gc() { gc_.Run(); } + private: template int find_node(const TKey &key, TNode *preds[], TNode *succs[]) const { diff --git a/src/utils/sysinfo/memory.cpp b/src/utils/sysinfo/memory.cpp new file mode 100644 index 000000000..5ee1c4fc4 --- /dev/null +++ b/src/utils/sysinfo/memory.cpp @@ -0,0 +1,33 @@ +#include "utils/sysinfo/memory.hpp" + +namespace utils::sysinfo { + +namespace { +std::optional ExtractAmountFromMemInfo(const std::string_view header_name) { + std::string token; + std::ifstream meminfo("/proc/meminfo"); + const auto meminfo_header = fmt::format("{}:", header_name); + while (meminfo >> token) { + if (token == meminfo_header) { + uint64_t mem = 0; + if (meminfo >> mem) { + return mem; + } else { + return std::nullopt; + } + } + meminfo.ignore(std::numeric_limits::max(), '\n'); + } + SPDLOG_WARN("Failed to read {} from /proc/meminfo", header_name); + return std::nullopt; +} + +} // namespace + +std::optional AvailableMemory() { return ExtractAmountFromMemInfo("MemAvailable"); } + +std::optional TotalMemory() { return ExtractAmountFromMemInfo("MemTotal"); } + +std::optional SwapTotalMemory() { return ExtractAmountFromMemInfo("SwapTotal"); } + +} // namespace utils::sysinfo diff --git a/src/utils/sysinfo/memory.hpp b/src/utils/sysinfo/memory.hpp index 264d94079..6971833a8 100644 --- a/src/utils/sysinfo/memory.hpp +++ b/src/utils/sysinfo/memory.hpp @@ -1,3 +1,5 @@ +#pragma once + #include #include #include @@ -8,25 +10,21 @@ namespace utils::sysinfo { /** - * Gets the amount of available RAM in kilobytes. If the information is + * Gets the amount of available RAM in KiB. If the information is * unavalable an empty value is returned. */ -inline std::optional AvailableMemoryKilobytes() { - std::string token; - std::ifstream meminfo("/proc/meminfo"); - while (meminfo >> token) { - if (token == "MemAvailable:") { - uint64_t mem = 0; - if (meminfo >> mem) { - return mem; - } else { - return std::nullopt; - } - } - meminfo.ignore(std::numeric_limits::max(), '\n'); - } - SPDLOG_WARN("Failed to read amount of available memory from /proc/meminfo"); - return std::nullopt; -} +std::optional AvailableMemory(); + +/** + * Gets the amount of total RAM in KiB. If the information is + * unavalable an empty value is returned. + */ +std::optional TotalMemory(); + +/** + * Gets the amount of total swap space in KiB. If the information is + * unavalable an empty value is returned. + */ +std::optional SwapTotalMemory(); } // namespace utils::sysinfo diff --git a/tests/unit/utils_memory_tracker.cpp b/tests/unit/utils_memory_tracker.cpp index 42012d5b4..37acc07b4 100644 --- a/tests/unit/utils_memory_tracker.cpp +++ b/tests/unit/utils_memory_tracker.cpp @@ -3,6 +3,7 @@ #include #include +#include TEST(MemoryTrackerTest, ExceptionEnabler) { utils::MemoryTracker memory_tracker; @@ -16,11 +17,15 @@ TEST(MemoryTrackerTest, ExceptionEnabler) { // wait until the second thread creates exception enabler while (!enabler_created) ; - ASSERT_NO_THROW(memory_tracker.Alloc(hard_limit + 1)); - ASSERT_EQ(memory_tracker.Amount(), hard_limit + 1); - // tell the second thread it can finish its test - can_continue = true; + // we use the OnScopeExit so the test doesn't deadlock when + // an ASSERT fails + utils::OnScopeExit thread_notifier{[&] { + // tell the second thread it can finish its test + can_continue = true; + }}; + + ASSERT_NO_THROW(memory_tracker.Alloc(hard_limit + 1)); }}; std::thread t2{[&] {