From 686fadf072836d7ac7693cffa45fb4b5e3efd6ad Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Ivan=20Milinovi=C4=87?=
 <44698587+imilinovic@users.noreply.github.com>
Date: Wed, 27 Dec 2023 11:51:10 +0100
Subject: [PATCH] Fix slow python QM (Python GC changes) (#1558)

---
 src/flags/general.cpp                     |  3 ++
 src/flags/general.hpp                     |  2 +
 src/memgraph.cpp                          |  9 ++++
 src/query/procedure/py_module.cpp         | 61 ++++++++++-------------
 src/query/procedure/py_module.hpp         |  5 +-
 src/storage/v2/inmemory/storage.cpp       |  2 -
 src/storage/v2/inmemory/storage.hpp       |  1 -
 tests/e2e/configuration/default_config.py |  1 +
 8 files changed, 45 insertions(+), 39 deletions(-)

diff --git a/src/flags/general.cpp b/src/flags/general.cpp
index 6bee2e5b3..88973fc80 100644
--- a/src/flags/general.cpp
+++ b/src/flags/general.cpp
@@ -66,6 +66,9 @@ DEFINE_bool(allow_load_csv, true, "Controls whether LOAD CSV clause is allowed i
 // NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
 DEFINE_VALIDATED_uint64(storage_gc_cycle_sec, 30, "Storage garbage collector interval (in seconds).",
                         FLAG_IN_RANGE(1, 24UL * 3600));
+// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
+DEFINE_VALIDATED_uint64(storage_python_gc_cycle_sec, 180,
+                        "Storage python full garbage collection interval (in seconds).", FLAG_IN_RANGE(1, 24UL * 3600));
 // NOTE: The `storage_properties_on_edges` flag must be the same here and in
 // `mg_import_csv`. If you change it, make sure to change it there as well.
 // NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
diff --git a/src/flags/general.hpp b/src/flags/general.hpp
index 890f32cd6..b6072250b 100644
--- a/src/flags/general.hpp
+++ b/src/flags/general.hpp
@@ -52,6 +52,8 @@ DECLARE_bool(allow_load_csv);
 // Storage flags.
 // NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
 DECLARE_uint64(storage_gc_cycle_sec);
+// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
+DECLARE_uint64(storage_python_gc_cycle_sec);
 // NOTE: The `storage_properties_on_edges` flag must be the same here and in
 // `mg_import_csv`. If you change it, make sure to change it there as well.
 // NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
diff --git a/src/memgraph.cpp b/src/memgraph.cpp
index 217d4a71c..e6c8dd1cd 100644
--- a/src/memgraph.cpp
+++ b/src/memgraph.cpp
@@ -184,6 +184,10 @@ int main(int argc, char **argv) {
                                                    "https://memgr.ph/python"));
   }
 
+  memgraph::utils::Scheduler python_gc_scheduler;
+  python_gc_scheduler.Run("Python GC", std::chrono::seconds(FLAGS_storage_python_gc_cycle_sec),
+                          [] { memgraph::query::procedure::PyCollectGarbage(); });
+
   // Initialize the communication library.
   memgraph::communication::SSLInit sslInit;
 
@@ -318,6 +322,11 @@ int main(int argc, char **argv) {
                .durability_directory = FLAGS_data_directory + "/rocksdb_durability",
                .wal_directory = FLAGS_data_directory + "/rocksdb_wal"},
       .storage_mode = memgraph::flags::ParseStorageMode()};
+
+  memgraph::utils::Scheduler jemalloc_purge_scheduler;
+  jemalloc_purge_scheduler.Run("Jemalloc purge", std::chrono::seconds(FLAGS_storage_gc_cycle_sec),
+                               [] { memgraph::memory::PurgeUnusedMemory(); });
+
   if (FLAGS_storage_snapshot_interval_sec == 0) {
     if (FLAGS_storage_wal_enabled) {
       LOG_FATAL(
diff --git a/src/query/procedure/py_module.cpp b/src/query/procedure/py_module.cpp
index 3fd09b0ab..3b3f3e925 100644
--- a/src/query/procedure/py_module.cpp
+++ b/src/query/procedure/py_module.cpp
@@ -13,6 +13,7 @@
 
 #include <datetime.h>
 #include <methodobject.h>
+#include <objimpl.h>
 #include <pyerrors.h>
 #include <array>
 #include <optional>
@@ -57,7 +58,6 @@ PyObject *gMgpValueConversionError{nullptr};     // NOLINT(cppcoreguidelines-avo
 PyObject *gMgpSerializationError{nullptr};       // NOLINT(cppcoreguidelines-avoid-non-const-global-variables)
 PyObject *gMgpAuthorizationError{nullptr};       // NOLINT(cppcoreguidelines-avoid-non-const-global-variables)
 
-constexpr bool kStartGarbageCollection{true};
 constexpr auto kMicrosecondsInMillisecond{1000};
 constexpr auto kMicrosecondsInSecond{1000000};
 
@@ -867,6 +867,24 @@ py::Object MgpListToPyTuple(mgp_list *list, PyObject *py_graph) {
   return MgpListToPyTuple(list, reinterpret_cast<PyGraph *>(py_graph));
 }
 
+void PyCollectGarbage() {
+  if (!Py_IsInitialized() || _Py_IsFinalizing()) {
+    // Calling EnsureGIL will crash the program if this is true.
+    return;
+  }
+
+  auto gil = py::EnsureGIL();
+
+  py::Object gc(PyImport_ImportModule("gc"));
+  if (!gc) {
+    LOG_FATAL(py::FetchError().value());
+  }
+
+  if (!gc.CallMethod("collect")) {
+    LOG_FATAL(py::FetchError().value());
+  }
+}
+
 namespace {
 struct RecordFieldCache {
   PyObject *key;
@@ -1027,24 +1045,8 @@ std::optional<py::ExceptionInfo> AddMultipleBatchRecordsFromPython(mgp_result *r
   return std::nullopt;
 }
 
-std::function<void()> PyObjectCleanup(py::Object &py_object, bool start_gc) {
-  return [py_object, start_gc]() {
-    if (start_gc) {
-      // Run `gc.collect` (reference cycle-detection) explicitly, so that we are
-      // sure the procedure cleaned up everything it held references to. If the
-      // user stored a reference to one of our `_mgp` instances then the
-      // internally used `mgp_*` structs will stay unfreed and a memory leak
-      // will be reported at the end of the query execution.
-      py::Object gc(PyImport_ImportModule("gc"));
-      if (!gc) {
-        LOG_FATAL(py::FetchError().value());
-      }
-
-      if (!gc.CallMethod("collect")) {
-        LOG_FATAL(py::FetchError().value());
-      }
-    }
-
+std::function<void()> PyObjectCleanup(py::Object &py_object) {
+  return [py_object]() {
     // After making sure all references from our side have been cleared,
     // invalidate the `_mgp.Graph` object. If the user kept a reference to one
     // of our `_mgp` instances then this will prevent them from using those
@@ -1095,7 +1097,7 @@ void CallPythonProcedure(const py::Object &py_cb, mgp_list *args, mgp_graph *gra
   std::optional<std::string> maybe_msg;
   {
     py::Object py_graph(MakePyGraph(graph, memory));
-    utils::OnScopeExit clean_up(PyObjectCleanup(py_graph, !is_batched));
+    utils::OnScopeExit clean_up(PyObjectCleanup(py_graph));
     if (py_graph) {
       maybe_msg = error_to_msg(call(py_graph));
     } else {
@@ -1110,22 +1112,11 @@ void CallPythonProcedure(const py::Object &py_cb, mgp_list *args, mgp_graph *gra
 
 void CallPythonCleanup(const py::Object &py_cleanup) {
   auto gil = py::EnsureGIL();
-
   auto py_res = py_cleanup.Call();
-
-  py::Object gc(PyImport_ImportModule("gc"));
-  if (!gc) {
-    LOG_FATAL(py::FetchError().value());
-  }
-
-  if (!gc.CallMethod("collect")) {
-    LOG_FATAL(py::FetchError().value());
-  }
 }
 
 void CallPythonInitializer(const py::Object &py_initializer, mgp_list *args, mgp_graph *graph, mgp_memory *memory) {
   auto gil = py::EnsureGIL();
-
   auto error_to_msg = [](const std::optional<py::ExceptionInfo> &exc_info) -> std::optional<std::string> {
     if (!exc_info) return std::nullopt;
     // Here we tell the traceback formatter to skip the first line of the
@@ -1146,7 +1137,7 @@ void CallPythonInitializer(const py::Object &py_initializer, mgp_list *args, mgp
   std::optional<std::string> maybe_msg;
   {
     py::Object py_graph(MakePyGraph(graph, memory));
-    utils::OnScopeExit clean_up_graph(PyObjectCleanup(py_graph, !kStartGarbageCollection));
+    utils::OnScopeExit clean_up_graph(PyObjectCleanup(py_graph));
     if (py_graph) {
       maybe_msg = error_to_msg(call(py_graph));
     } else {
@@ -1194,8 +1185,8 @@ void CallPythonTransformation(const py::Object &py_cb, mgp_messages *msgs, mgp_g
     py::Object py_graph(MakePyGraph(graph, memory));
     py::Object py_messages(MakePyMessages(msgs, memory));
 
-    utils::OnScopeExit clean_up_graph(PyObjectCleanup(py_graph, kStartGarbageCollection));
-    utils::OnScopeExit clean_up_messages(PyObjectCleanup(py_messages, kStartGarbageCollection));
+    utils::OnScopeExit clean_up_graph(PyObjectCleanup(py_graph));
+    utils::OnScopeExit clean_up_messages(PyObjectCleanup(py_messages));
 
     if (py_graph && py_messages) {
       maybe_msg = error_to_msg(call(py_graph, py_messages));
@@ -1264,7 +1255,7 @@ void CallPythonFunction(const py::Object &py_cb, mgp_list *args, mgp_graph *grap
   std::optional<std::string> maybe_msg;
   {
     py::Object py_graph(MakePyGraph(graph, memory));
-    utils::OnScopeExit clean_up(PyObjectCleanup(py_graph, kStartGarbageCollection));
+    utils::OnScopeExit clean_up(PyObjectCleanup(py_graph));
     if (py_graph) {
       auto maybe_result = call(py_graph);
       if (!maybe_result.HasError()) {
diff --git a/src/query/procedure/py_module.hpp b/src/query/procedure/py_module.hpp
index 85ad82b3e..9cb22fe2c 100644
--- a/src/query/procedure/py_module.hpp
+++ b/src/query/procedure/py_module.hpp
@@ -1,4 +1,4 @@
-// Copyright 2022 Memgraph Ltd.
+// Copyright 2023 Memgraph Ltd.
 //
 // Use of this software is governed by the Business Source License
 // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@@ -79,4 +79,7 @@ py::Object ImportPyModule(const char *, mgp_module *);
 /// Return nullptr and set appropriate Python exception on failure.
 py::Object ReloadPyModule(PyObject *, mgp_module *);
 
+/// Call full python circular reference garbage collection (all generations)
+void PyCollectGarbage();
+
 }  // namespace memgraph::query::procedure
diff --git a/src/storage/v2/inmemory/storage.cpp b/src/storage/v2/inmemory/storage.cpp
index ddea80fde..932d8aa76 100644
--- a/src/storage/v2/inmemory/storage.cpp
+++ b/src/storage/v2/inmemory/storage.cpp
@@ -145,7 +145,6 @@ InMemoryStorage::InMemoryStorage(Config config, StorageMode storage_mode)
     gc_runner_.Run("Storage GC", config_.gc.interval, [this] {
       this->FreeMemory(std::unique_lock<utils::ResourceLock>{main_lock_, std::defer_lock});
     });
-    gc_jemalloc_runner_.Run("Jemalloc GC", config_.gc.interval, [] { memory::PurgeUnusedMemory(); });
   }
   if (timestamp_ == kTimestampInitialId) {
     commit_log_.emplace();
@@ -159,7 +158,6 @@ InMemoryStorage::InMemoryStorage(Config config) : InMemoryStorage(config, Storag
 InMemoryStorage::~InMemoryStorage() {
   if (config_.gc.type == Config::Gc::Type::PERIODIC) {
     gc_runner_.Stop();
-    gc_jemalloc_runner_.Stop();
   }
   {
     // Stop replication (Stop all clients or stop the REPLICA server)
diff --git a/src/storage/v2/inmemory/storage.hpp b/src/storage/v2/inmemory/storage.hpp
index 2d2837467..883c1c3ec 100644
--- a/src/storage/v2/inmemory/storage.hpp
+++ b/src/storage/v2/inmemory/storage.hpp
@@ -427,7 +427,6 @@ class InMemoryStorage final : public Storage {
   std::optional<CommitLog> commit_log_;
 
   utils::Scheduler gc_runner_;
-  utils::Scheduler gc_jemalloc_runner_;
   std::mutex gc_lock_;
 
   using BondPmrLd = Bond<utils::pmr::list<Delta>>;
diff --git a/tests/e2e/configuration/default_config.py b/tests/e2e/configuration/default_config.py
index e1d42b443..d0029bbd7 100644
--- a/tests/e2e/configuration/default_config.py
+++ b/tests/e2e/configuration/default_config.py
@@ -143,6 +143,7 @@ startup_config_dict = {
         "The time duration between two replica checks/pings. If < 1, replicas will NOT be checked at all. NOTE: The MAIN instance allocates a new thread for each REPLICA.",
     ),
     "storage_gc_cycle_sec": ("30", "30", "Storage garbage collector interval (in seconds)."),
+    "storage_python_gc_cycle_sec": ("180", "180", "Storage python full garbage collection interval (in seconds)."),
     "storage_items_per_batch": (
         "1000000",
         "1000000",