Fix slow python QM (Python GC changes) (#1558)

This commit is contained in:
Ivan Milinović 2023-12-27 11:51:10 +01:00 committed by GitHub
parent 9e76021b94
commit 686fadf072
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 45 additions and 39 deletions

View File

@ -66,6 +66,9 @@ DEFINE_bool(allow_load_csv, true, "Controls whether LOAD CSV clause is allowed i
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
DEFINE_VALIDATED_uint64(storage_gc_cycle_sec, 30, "Storage garbage collector interval (in seconds).",
FLAG_IN_RANGE(1, 24UL * 3600));
// Interval, in seconds, between full Python garbage collection runs. Defaults
// to 180 and is restricted by FLAG_IN_RANGE to values between 1 and
// 24 * 3600 (one day).
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
DEFINE_VALIDATED_uint64(storage_python_gc_cycle_sec, 180,
"Storage python full garbage collection interval (in seconds).", FLAG_IN_RANGE(1, 24UL * 3600));
// NOTE: The `storage_properties_on_edges` flag must be the same here and in
// `mg_import_csv`. If you change it, make sure to change it there as well.
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)

View File

@ -52,6 +52,8 @@ DECLARE_bool(allow_load_csv);
// Storage flags.
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
DECLARE_uint64(storage_gc_cycle_sec);
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
DECLARE_uint64(storage_python_gc_cycle_sec);
// NOTE: The `storage_properties_on_edges` flag must be the same here and in
// `mg_import_csv`. If you change it, make sure to change it there as well.
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)

View File

@ -184,6 +184,10 @@ int main(int argc, char **argv) {
"https://memgr.ph/python"));
}
memgraph::utils::Scheduler python_gc_scheduler;
python_gc_scheduler.Run("Python GC", std::chrono::seconds(FLAGS_storage_python_gc_cycle_sec),
[] { memgraph::query::procedure::PyCollectGarbage(); });
// Initialize the communication library.
memgraph::communication::SSLInit sslInit;
@ -318,6 +322,11 @@ int main(int argc, char **argv) {
.durability_directory = FLAGS_data_directory + "/rocksdb_durability",
.wal_directory = FLAGS_data_directory + "/rocksdb_wal"},
.storage_mode = memgraph::flags::ParseStorageMode()};
memgraph::utils::Scheduler jemalloc_purge_scheduler;
jemalloc_purge_scheduler.Run("Jemalloc purge", std::chrono::seconds(FLAGS_storage_gc_cycle_sec),
[] { memgraph::memory::PurgeUnusedMemory(); });
if (FLAGS_storage_snapshot_interval_sec == 0) {
if (FLAGS_storage_wal_enabled) {
LOG_FATAL(

View File

@ -13,6 +13,7 @@
#include <datetime.h>
#include <methodobject.h>
#include <objimpl.h>
#include <pyerrors.h>
#include <array>
#include <optional>
@ -57,7 +58,6 @@ PyObject *gMgpValueConversionError{nullptr}; // NOLINT(cppcoreguidelines-avo
PyObject *gMgpSerializationError{nullptr}; // NOLINT(cppcoreguidelines-avoid-non-const-global-variables)
PyObject *gMgpAuthorizationError{nullptr}; // NOLINT(cppcoreguidelines-avoid-non-const-global-variables)
constexpr bool kStartGarbageCollection{true};
constexpr auto kMicrosecondsInMillisecond{1000};
constexpr auto kMicrosecondsInSecond{1000000};
@ -867,6 +867,24 @@ py::Object MgpListToPyTuple(mgp_list *list, PyObject *py_graph) {
return MgpListToPyTuple(list, reinterpret_cast<PyGraph *>(py_graph));
}
void PyCollectGarbage() {
if (!Py_IsInitialized() || _Py_IsFinalizing()) {
// Calling EnsureGIL will crash the program if this is true.
return;
}
auto gil = py::EnsureGIL();
py::Object gc(PyImport_ImportModule("gc"));
if (!gc) {
LOG_FATAL(py::FetchError().value());
}
if (!gc.CallMethod("collect")) {
LOG_FATAL(py::FetchError().value());
}
}
namespace {
struct RecordFieldCache {
PyObject *key;
@ -1027,24 +1045,8 @@ std::optional<py::ExceptionInfo> AddMultipleBatchRecordsFromPython(mgp_result *r
return std::nullopt;
}
std::function<void()> PyObjectCleanup(py::Object &py_object, bool start_gc) {
return [py_object, start_gc]() {
if (start_gc) {
// Run `gc.collect` (reference cycle-detection) explicitly, so that we are
// sure the procedure cleaned up everything it held references to. If the
// user stored a reference to one of our `_mgp` instances then the
// internally used `mgp_*` structs will stay unfreed and a memory leak
// will be reported at the end of the query execution.
py::Object gc(PyImport_ImportModule("gc"));
if (!gc) {
LOG_FATAL(py::FetchError().value());
}
if (!gc.CallMethod("collect")) {
LOG_FATAL(py::FetchError().value());
}
}
std::function<void()> PyObjectCleanup(py::Object &py_object) {
return [py_object]() {
// After making sure all references from our side have been cleared,
// invalidate the `_mgp.Graph` object. If the user kept a reference to one
// of our `_mgp` instances then this will prevent them from using those
@ -1095,7 +1097,7 @@ void CallPythonProcedure(const py::Object &py_cb, mgp_list *args, mgp_graph *gra
std::optional<std::string> maybe_msg;
{
py::Object py_graph(MakePyGraph(graph, memory));
utils::OnScopeExit clean_up(PyObjectCleanup(py_graph, !is_batched));
utils::OnScopeExit clean_up(PyObjectCleanup(py_graph));
if (py_graph) {
maybe_msg = error_to_msg(call(py_graph));
} else {
@ -1110,22 +1112,11 @@ void CallPythonProcedure(const py::Object &py_cb, mgp_list *args, mgp_graph *gra
void CallPythonCleanup(const py::Object &py_cleanup) {
auto gil = py::EnsureGIL();
auto py_res = py_cleanup.Call();
py::Object gc(PyImport_ImportModule("gc"));
if (!gc) {
LOG_FATAL(py::FetchError().value());
}
if (!gc.CallMethod("collect")) {
LOG_FATAL(py::FetchError().value());
}
}
void CallPythonInitializer(const py::Object &py_initializer, mgp_list *args, mgp_graph *graph, mgp_memory *memory) {
auto gil = py::EnsureGIL();
auto error_to_msg = [](const std::optional<py::ExceptionInfo> &exc_info) -> std::optional<std::string> {
if (!exc_info) return std::nullopt;
// Here we tell the traceback formatter to skip the first line of the
@ -1146,7 +1137,7 @@ void CallPythonInitializer(const py::Object &py_initializer, mgp_list *args, mgp
std::optional<std::string> maybe_msg;
{
py::Object py_graph(MakePyGraph(graph, memory));
utils::OnScopeExit clean_up_graph(PyObjectCleanup(py_graph, !kStartGarbageCollection));
utils::OnScopeExit clean_up_graph(PyObjectCleanup(py_graph));
if (py_graph) {
maybe_msg = error_to_msg(call(py_graph));
} else {
@ -1194,8 +1185,8 @@ void CallPythonTransformation(const py::Object &py_cb, mgp_messages *msgs, mgp_g
py::Object py_graph(MakePyGraph(graph, memory));
py::Object py_messages(MakePyMessages(msgs, memory));
utils::OnScopeExit clean_up_graph(PyObjectCleanup(py_graph, kStartGarbageCollection));
utils::OnScopeExit clean_up_messages(PyObjectCleanup(py_messages, kStartGarbageCollection));
utils::OnScopeExit clean_up_graph(PyObjectCleanup(py_graph));
utils::OnScopeExit clean_up_messages(PyObjectCleanup(py_messages));
if (py_graph && py_messages) {
maybe_msg = error_to_msg(call(py_graph, py_messages));
@ -1264,7 +1255,7 @@ void CallPythonFunction(const py::Object &py_cb, mgp_list *args, mgp_graph *grap
std::optional<std::string> maybe_msg;
{
py::Object py_graph(MakePyGraph(graph, memory));
utils::OnScopeExit clean_up(PyObjectCleanup(py_graph, kStartGarbageCollection));
utils::OnScopeExit clean_up(PyObjectCleanup(py_graph));
if (py_graph) {
auto maybe_result = call(py_graph);
if (!maybe_result.HasError()) {

View File

@ -1,4 +1,4 @@
// Copyright 2022 Memgraph Ltd.
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -79,4 +79,7 @@ py::Object ImportPyModule(const char *, mgp_module *);
/// Return nullptr and set appropriate Python exception on failure.
py::Object ReloadPyModule(PyObject *, mgp_module *);
/// Call full Python circular reference garbage collection (all generations)
/// by invoking `gc.collect()`.
///
/// Does nothing if the Python interpreter is not initialized or is finalizing.
/// Acquires the GIL itself, so the caller does not need to hold it. Failure to
/// import `gc` or to call `collect` is reported through LOG_FATAL.
void PyCollectGarbage();
} // namespace memgraph::query::procedure

View File

@ -145,7 +145,6 @@ InMemoryStorage::InMemoryStorage(Config config, StorageMode storage_mode)
gc_runner_.Run("Storage GC", config_.gc.interval, [this] {
this->FreeMemory(std::unique_lock<utils::ResourceLock>{main_lock_, std::defer_lock});
});
gc_jemalloc_runner_.Run("Jemalloc GC", config_.gc.interval, [] { memory::PurgeUnusedMemory(); });
}
if (timestamp_ == kTimestampInitialId) {
commit_log_.emplace();
@ -159,7 +158,6 @@ InMemoryStorage::InMemoryStorage(Config config) : InMemoryStorage(config, Storag
InMemoryStorage::~InMemoryStorage() {
if (config_.gc.type == Config::Gc::Type::PERIODIC) {
gc_runner_.Stop();
gc_jemalloc_runner_.Stop();
}
{
// Stop replication (Stop all clients or stop the REPLICA server)

View File

@ -427,7 +427,6 @@ class InMemoryStorage final : public Storage {
std::optional<CommitLog> commit_log_;
utils::Scheduler gc_runner_;
utils::Scheduler gc_jemalloc_runner_;
std::mutex gc_lock_;
using BondPmrLd = Bond<utils::pmr::list<Delta>>;

View File

@ -143,6 +143,7 @@ startup_config_dict = {
"The time duration between two replica checks/pings. If < 1, replicas will NOT be checked at all. NOTE: The MAIN instance allocates a new thread for each REPLICA.",
),
"storage_gc_cycle_sec": ("30", "30", "Storage garbage collector interval (in seconds)."),
"storage_python_gc_cycle_sec": ("180", "180", "Storage python full garbage collection interval (in seconds)."),
"storage_items_per_batch": (
"1000000",
"1000000",