diff --git a/src/flags/general.cpp b/src/flags/general.cpp index 6bee2e5b3..88973fc80 100644 --- a/src/flags/general.cpp +++ b/src/flags/general.cpp @@ -66,6 +66,9 @@ DEFINE_bool(allow_load_csv, true, "Controls whether LOAD CSV clause is allowed i // NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) DEFINE_VALIDATED_uint64(storage_gc_cycle_sec, 30, "Storage garbage collector interval (in seconds).", FLAG_IN_RANGE(1, 24UL * 3600)); +// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) +DEFINE_VALIDATED_uint64(storage_python_gc_cycle_sec, 180, + "Storage python full garbage collection interval (in seconds).", FLAG_IN_RANGE(1, 24UL * 3600)); // NOTE: The `storage_properties_on_edges` flag must be the same here and in // `mg_import_csv`. If you change it, make sure to change it there as well. // NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) diff --git a/src/flags/general.hpp b/src/flags/general.hpp index 890f32cd6..b6072250b 100644 --- a/src/flags/general.hpp +++ b/src/flags/general.hpp @@ -52,6 +52,8 @@ DECLARE_bool(allow_load_csv); // Storage flags. // NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) DECLARE_uint64(storage_gc_cycle_sec); +// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) +DECLARE_uint64(storage_python_gc_cycle_sec); // NOTE: The `storage_properties_on_edges` flag must be the same here and in // `mg_import_csv`. If you change it, make sure to change it there as well. // NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) diff --git a/src/memgraph.cpp b/src/memgraph.cpp index 217d4a71c..e6c8dd1cd 100644 --- a/src/memgraph.cpp +++ b/src/memgraph.cpp @@ -184,6 +184,10 @@ int main(int argc, char **argv) { "https://memgr.ph/python")); } + memgraph::utils::Scheduler python_gc_scheduler; + python_gc_scheduler.Run("Python GC", std::chrono::seconds(FLAGS_storage_python_gc_cycle_sec), + [] { memgraph::query::procedure::PyCollectGarbage(); }); + // Initialize the communication library. memgraph::communication::SSLInit sslInit; @@ -318,6 +322,11 @@ int main(int argc, char **argv) { .durability_directory = FLAGS_data_directory + "/rocksdb_durability", .wal_directory = FLAGS_data_directory + "/rocksdb_wal"}, .storage_mode = memgraph::flags::ParseStorageMode()}; + + memgraph::utils::Scheduler jemalloc_purge_scheduler; + jemalloc_purge_scheduler.Run("Jemalloc purge", std::chrono::seconds(FLAGS_storage_gc_cycle_sec), + [] { memgraph::memory::PurgeUnusedMemory(); }); + if (FLAGS_storage_snapshot_interval_sec == 0) { if (FLAGS_storage_wal_enabled) { LOG_FATAL( diff --git a/src/query/procedure/py_module.cpp b/src/query/procedure/py_module.cpp index 3fd09b0ab..3b3f3e925 100644 --- a/src/query/procedure/py_module.cpp +++ b/src/query/procedure/py_module.cpp @@ -13,6 +13,7 @@ #include #include +#include #include #include #include @@ -57,7 +58,6 @@ PyObject *gMgpValueConversionError{nullptr}; // NOLINT(cppcoreguidelines-avo PyObject *gMgpSerializationError{nullptr}; // NOLINT(cppcoreguidelines-avoid-non-const-global-variables) PyObject *gMgpAuthorizationError{nullptr}; // NOLINT(cppcoreguidelines-avoid-non-const-global-variables) -constexpr bool kStartGarbageCollection{true}; constexpr auto kMicrosecondsInMillisecond{1000}; constexpr auto kMicrosecondsInSecond{1000000}; @@ -867,6 +867,24 @@ py::Object MgpListToPyTuple(mgp_list *list, PyObject *py_graph) { return MgpListToPyTuple(list, reinterpret_cast(py_graph)); } +void PyCollectGarbage() { + if (!Py_IsInitialized() || _Py_IsFinalizing()) { + // Calling EnsureGIL will crash the program if this is true. + return; + } + + auto gil = py::EnsureGIL(); + + py::Object gc(PyImport_ImportModule("gc")); + if (!gc) { + LOG_FATAL(py::FetchError().value()); + } + + if (!gc.CallMethod("collect")) { + LOG_FATAL(py::FetchError().value()); + } +} + namespace { struct RecordFieldCache { PyObject *key; @@ -1027,24 +1045,8 @@ std::optional AddMultipleBatchRecordsFromPython(mgp_result *r return std::nullopt; } -std::function PyObjectCleanup(py::Object &py_object, bool start_gc) { - return [py_object, start_gc]() { - if (start_gc) { - // Run `gc.collect` (reference cycle-detection) explicitly, so that we are - // sure the procedure cleaned up everything it held references to. If the - // user stored a reference to one of our `_mgp` instances then the - // internally used `mgp_*` structs will stay unfreed and a memory leak - // will be reported at the end of the query execution. - py::Object gc(PyImport_ImportModule("gc")); - if (!gc) { - LOG_FATAL(py::FetchError().value()); - } - - if (!gc.CallMethod("collect")) { - LOG_FATAL(py::FetchError().value()); - } - } - +std::function PyObjectCleanup(py::Object &py_object) { + return [py_object]() { // After making sure all references from our side have been cleared, // invalidate the `_mgp.Graph` object. If the user kept a reference to one // of our `_mgp` instances then this will prevent them from using those @@ -1095,7 +1097,7 @@ void CallPythonProcedure(const py::Object &py_cb, mgp_list *args, mgp_graph *gra std::optional maybe_msg; { py::Object py_graph(MakePyGraph(graph, memory)); - utils::OnScopeExit clean_up(PyObjectCleanup(py_graph, !is_batched)); + utils::OnScopeExit clean_up(PyObjectCleanup(py_graph)); if (py_graph) { maybe_msg = error_to_msg(call(py_graph)); } else { @@ -1110,22 +1112,11 @@ void CallPythonProcedure(const py::Object &py_cb, mgp_list *args, mgp_graph *gra void CallPythonCleanup(const py::Object &py_cleanup) { auto gil = py::EnsureGIL(); - auto py_res = py_cleanup.Call(); - - py::Object gc(PyImport_ImportModule("gc")); - if (!gc) { - LOG_FATAL(py::FetchError().value()); - } - - if (!gc.CallMethod("collect")) { - LOG_FATAL(py::FetchError().value()); - } } void CallPythonInitializer(const py::Object &py_initializer, mgp_list *args, mgp_graph *graph, mgp_memory *memory) { auto gil = py::EnsureGIL(); - auto error_to_msg = [](const std::optional &exc_info) -> std::optional { if (!exc_info) return std::nullopt; // Here we tell the traceback formatter to skip the first line of the @@ -1146,7 +1137,7 @@ void CallPythonInitializer(const py::Object &py_initializer, mgp_list *args, mgp std::optional maybe_msg; { py::Object py_graph(MakePyGraph(graph, memory)); - utils::OnScopeExit clean_up_graph(PyObjectCleanup(py_graph, !kStartGarbageCollection)); + utils::OnScopeExit clean_up_graph(PyObjectCleanup(py_graph)); if (py_graph) { maybe_msg = error_to_msg(call(py_graph)); } else { @@ -1194,8 +1185,8 @@ void CallPythonTransformation(const py::Object &py_cb, mgp_messages *msgs, mgp_g py::Object py_graph(MakePyGraph(graph, memory)); py::Object py_messages(MakePyMessages(msgs, memory)); - utils::OnScopeExit clean_up_graph(PyObjectCleanup(py_graph, kStartGarbageCollection)); - utils::OnScopeExit clean_up_messages(PyObjectCleanup(py_messages, kStartGarbageCollection)); + utils::OnScopeExit clean_up_graph(PyObjectCleanup(py_graph)); + utils::OnScopeExit clean_up_messages(PyObjectCleanup(py_messages)); if (py_graph && py_messages) { maybe_msg = error_to_msg(call(py_graph, py_messages)); @@ -1264,7 +1255,7 @@ void CallPythonFunction(const py::Object &py_cb, mgp_list *args, mgp_graph *grap std::optional maybe_msg; { py::Object py_graph(MakePyGraph(graph, memory)); - utils::OnScopeExit clean_up(PyObjectCleanup(py_graph, kStartGarbageCollection)); + utils::OnScopeExit clean_up(PyObjectCleanup(py_graph)); if (py_graph) { auto maybe_result = call(py_graph); if (!maybe_result.HasError()) { diff --git a/src/query/procedure/py_module.hpp b/src/query/procedure/py_module.hpp index 85ad82b3e..9cb22fe2c 100644 --- a/src/query/procedure/py_module.hpp +++ b/src/query/procedure/py_module.hpp @@ -1,4 +1,4 @@ -// Copyright 2022 Memgraph Ltd. +// Copyright 2023 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source @@ -79,4 +79,7 @@ py::Object ImportPyModule(const char *, mgp_module *); /// Return nullptr and set appropriate Python exception on failure. py::Object ReloadPyModule(PyObject *, mgp_module *); +/// Call full python circular reference garbage collection (all generations) +void PyCollectGarbage(); + } // namespace memgraph::query::procedure diff --git a/src/storage/v2/inmemory/storage.cpp b/src/storage/v2/inmemory/storage.cpp index ddea80fde..932d8aa76 100644 --- a/src/storage/v2/inmemory/storage.cpp +++ b/src/storage/v2/inmemory/storage.cpp @@ -145,7 +145,6 @@ InMemoryStorage::InMemoryStorage(Config config, StorageMode storage_mode) gc_runner_.Run("Storage GC", config_.gc.interval, [this] { this->FreeMemory(std::unique_lock{main_lock_, std::defer_lock}); }); - gc_jemalloc_runner_.Run("Jemalloc GC", config_.gc.interval, [] { memory::PurgeUnusedMemory(); }); } if (timestamp_ == kTimestampInitialId) { commit_log_.emplace(); @@ -159,7 +158,6 @@ InMemoryStorage::InMemoryStorage(Config config) : InMemoryStorage(config, Storag InMemoryStorage::~InMemoryStorage() { if (config_.gc.type == Config::Gc::Type::PERIODIC) { gc_runner_.Stop(); - gc_jemalloc_runner_.Stop(); } { // Stop replication (Stop all clients or stop the REPLICA server) diff --git a/src/storage/v2/inmemory/storage.hpp b/src/storage/v2/inmemory/storage.hpp index 2d2837467..883c1c3ec 100644 --- a/src/storage/v2/inmemory/storage.hpp +++ b/src/storage/v2/inmemory/storage.hpp @@ -427,7 +427,6 @@ class InMemoryStorage final : public Storage { std::optional commit_log_; utils::Scheduler gc_runner_; - utils::Scheduler gc_jemalloc_runner_; std::mutex gc_lock_; using BondPmrLd = Bond>; diff --git a/tests/e2e/configuration/default_config.py b/tests/e2e/configuration/default_config.py index e1d42b443..d0029bbd7 100644 --- a/tests/e2e/configuration/default_config.py +++ b/tests/e2e/configuration/default_config.py @@ -143,6 +143,7 @@ startup_config_dict = { "The time duration between two replica checks/pings. If < 1, replicas will NOT be checked at all. NOTE: The MAIN instance allocates a new thread for each REPLICA.", ), "storage_gc_cycle_sec": ("30", "30", "Storage garbage collector interval (in seconds)."), + "storage_python_gc_cycle_sec": ("180", "180", "Storage python full garbage collection interval (in seconds)."), "storage_items_per_batch": ( "1000000", "1000000",