From 208705f29643e4977850148d1fdc11859c95fee9 Mon Sep 17 00:00:00 2001 From: Antonio Filipovic <61245998+antoniofilipovic@users.noreply.github.com> Date: Tue, 16 May 2023 10:33:09 +0200 Subject: [PATCH] Reduce memory consumption on return from python procedures (#932) --- src/query/interpreter.hpp | 2 +- src/query/plan/operator.cpp | 4 ++-- src/query/procedure/py_module.cpp | 35 +++++++++++++++++++++---------- 3 files changed, 27 insertions(+), 14 deletions(-) diff --git a/src/query/interpreter.hpp b/src/query/interpreter.hpp index ba91ccf0d..0036b1c0d 100644 --- a/src/query/interpreter.hpp +++ b/src/query/interpreter.hpp @@ -51,7 +51,7 @@ extern const Event FailedQuery; namespace memgraph::query { inline constexpr size_t kExecutionMemoryBlockSize = 1UL * 1024UL * 1024UL; -inline constexpr size_t kExecutionPoolMaxBlockSize = 2048UL; // 2 ^ 11 +inline constexpr size_t kExecutionPoolMaxBlockSize = 1024UL; // 2 ^ 10 class AuthQueryHandler { public: diff --git a/src/query/plan/operator.cpp b/src/query/plan/operator.cpp index fb729e392..60eb7ff63 100644 --- a/src/query/plan/operator.cpp +++ b/src/query/plan/operator.cpp @@ -4545,7 +4545,7 @@ class CallProcedureCursor : public Cursor { result_row_it_ = result_.rows.begin(); } - const auto &values = result_row_it_->values; + auto &values = result_row_it_->values; // Check that the row has all fields as required by the result signature. // C API guarantees that it's impossible to set fields which are not part of // the result record, but it does not gurantee that some may be missing. 
See @@ -4563,7 +4563,7 @@ class CallProcedureCursor : public Cursor { throw QueryRuntimeException("Procedure '{}' did not yield a record with '{}' field.", self_->procedure_name_, field_name); } - frame[self_->result_symbols_[i]] = result_it->second; + frame[self_->result_symbols_[i]] = std::move(result_it->second); } ++result_row_it_; diff --git a/src/query/procedure/py_module.cpp b/src/query/procedure/py_module.cpp index f1959649f..981777540 100644 --- a/src/query/procedure/py_module.cpp +++ b/src/query/procedure/py_module.cpp @@ -14,6 +14,7 @@ #include <datetime.h> #include <pyerrors.h> #include <array> +#include <optional> #include <sstream> #include <stdexcept> #include <string> @@ -860,7 +861,7 @@ py::Object MgpListToPyTuple(mgp_list *list, PyObject *py_graph) { } namespace { -std::optional<py::ExceptionInfo> AddRecordFromPython(mgp_result *result, py::Object py_record) { +std::optional<py::ExceptionInfo> AddRecordFromPython(mgp_result *result, py::Object py_record, mgp_memory *memory) { py::Object py_mgp(PyImport_ImportModule("mgp")); if (!py_mgp) return py::FetchError(); auto record_cls = py_mgp.GetAttr("Record"); @@ -902,8 +903,8 @@ std::optional<py::ExceptionInfo> AddRecordFromPython(mgp_result *result, py::Obj if (!field_name) return py::FetchError(); auto *val = PyTuple_GetItem(item, 1); if (!val) return py::FetchError(); - mgp_memory memory{result->rows.get_allocator().GetMemoryResource()}; - mgp_value *field_val = PyObjectToMgpValueWithPythonExceptions(val, &memory); + // This memory is one dedicated for mg_procedure. 
+    mgp_value *field_val = PyObjectToMgpValueWithPythonExceptions(val, memory);
     if (field_val == nullptr) {
       return py::FetchError();
     }
@@ -921,15 +922,37 @@ std::optional<py::ExceptionInfo> AddRecordFromPython(mgp_result *result, py::Obj
   return std::nullopt;
 }
 
-std::optional<py::ExceptionInfo> AddMultipleRecordsFromPython(mgp_result *result, py::Object py_seq) {
+// Converts every record of the Python sequence `py_seq` into a row of `result`; `memory` is forwarded
+// to AddRecordFromPython for the converted values. Returns the Python exception info on failure and
+// std::nullopt on success. To keep peak memory low, already converted records are deleted from
+// `py_seq` in batches of `del_cnt`; sequences that reject slice deletion (e.g. tuples) are only read.
+std::optional<py::ExceptionInfo> AddMultipleRecordsFromPython(mgp_result *result, py::Object py_seq,
+                                                              mgp_memory *memory) {
   Py_ssize_t len = PySequence_Size(py_seq.Ptr());
   if (len == -1) return py::FetchError();
-  for (Py_ssize_t i = 0; i < len; ++i) {
-    py::Object py_record(PySequence_GetItem(py_seq.Ptr(), i));
+  result->rows.reserve(len);
+  // This proved to be good enough constant not to lose performance on transformation
+  static constexpr Py_ssize_t del_cnt{100000};
+  for (Py_ssize_t i = 0, curr_item = 0; i < len; ++i, ++curr_item) {
+    py::Object py_record(PySequence_GetItem(py_seq.Ptr(), curr_item));
     if (!py_record) return py::FetchError();
-    auto maybe_exc = AddRecordFromPython(result, py_record);
+    auto maybe_exc = AddRecordFromPython(result, py_record, memory);
     if (maybe_exc) return maybe_exc;
+    // Delete a batch only when exactly `del_cnt` leading items have been converted, so the next
+    // unconverted item lands on index 0 and no record is duplicated or skipped. If the sequence is
+    // immutable the deletion fails: clear that Python error and keep indexing forward instead.
+    if (curr_item + 1 == del_cnt) {
+      if (PySequence_DelSlice(py_seq.Ptr(), 0, del_cnt) == 0) {
+        curr_item = -1;
+      } else {
+        PyErr_Clear();
+      }
+    }
   }
+  // Clear at the end what is left. This is best effort: a failure (e.g. for a tuple) must not leave
+  // a Python error pending after the records were converted successfully.
+  const Py_ssize_t remaining = PySequence_Size(py_seq.Ptr());
+  if (remaining < 0 || PySequence_DelSlice(py_seq.Ptr(), 0, remaining) != 0) PyErr_Clear();
   return std::nullopt;
 }
 
@@ -962,6 +985,7 @@ std::function<void()> PyObjectCleanup(py::Object &py_object) {
 
 void CallPythonProcedure(const py::Object &py_cb, mgp_list *args, mgp_graph *graph, mgp_result *result,
                          mgp_memory *memory) {
+  // *memory here is memory from `EvalContext`
   auto gil = py::EnsureGIL();
 
   auto error_to_msg = [](const std::optional<py::ExceptionInfo> &exc_info) -> std::optional<std::string> {
@@ -979,9 +1003,9 @@ void CallPythonProcedure(const py::Object &py_cb, mgp_list *args, mgp_graph *gra
     auto py_res = py_cb.Call(py_graph, py_args);
     if (!py_res) return py::FetchError();
     if (PySequence_Check(py_res.Ptr())) {
-      return AddMultipleRecordsFromPython(result, py_res);
+      return AddMultipleRecordsFromPython(result, py_res, memory);
     } else {
-      return AddRecordFromPython(result, py_res);
+      return AddRecordFromPython(result, py_res, memory);
     }
   };
 
@@ -1027,9 +1051,9 @@ void CallPythonTransformation(const py::Object &py_cb, mgp_messages *msgs, mgp_g
     auto py_res = py_cb.Call(py_graph, py_messages);
     if (!py_res) return py::FetchError();
     if (PySequence_Check(py_res.Ptr())) {
-      return AddMultipleRecordsFromPython(result, py_res);
+      return AddMultipleRecordsFromPython(result, py_res, memory);
     }
-    return AddRecordFromPython(result, py_res);
+    return AddRecordFromPython(result, py_res, memory);
   };
 
   // It is *VERY IMPORTANT* to note that this code takes great care not to keep