Reduce memory consumption on return from python procedures (#932)
This commit is contained in:
parent
69634a5354
commit
208705f296
src/query
@ -51,7 +51,7 @@ extern const Event FailedQuery;
|
||||
namespace memgraph::query {
|
||||
|
||||
inline constexpr size_t kExecutionMemoryBlockSize = 1UL * 1024UL * 1024UL;
|
||||
inline constexpr size_t kExecutionPoolMaxBlockSize = 2048UL; // 2 ^ 11
|
||||
inline constexpr size_t kExecutionPoolMaxBlockSize = 1024UL; // 2 ^ 10
|
||||
|
||||
class AuthQueryHandler {
|
||||
public:
|
||||
|
@ -4545,7 +4545,7 @@ class CallProcedureCursor : public Cursor {
|
||||
result_row_it_ = result_.rows.begin();
|
||||
}
|
||||
|
||||
const auto &values = result_row_it_->values;
|
||||
auto &values = result_row_it_->values;
|
||||
// Check that the row has all fields as required by the result signature.
|
||||
// C API guarantees that it's impossible to set fields which are not part of
|
||||
// the result record, but it does not gurantee that some may be missing. See
|
||||
@ -4563,7 +4563,7 @@ class CallProcedureCursor : public Cursor {
|
||||
throw QueryRuntimeException("Procedure '{}' did not yield a record with '{}' field.", self_->procedure_name_,
|
||||
field_name);
|
||||
}
|
||||
frame[self_->result_symbols_[i]] = result_it->second;
|
||||
frame[self_->result_symbols_[i]] = std::move(result_it->second);
|
||||
}
|
||||
++result_row_it_;
|
||||
|
||||
|
@ -14,6 +14,7 @@
|
||||
#include <datetime.h>
|
||||
#include <pyerrors.h>
|
||||
#include <array>
|
||||
#include <optional>
|
||||
#include <sstream>
|
||||
#include <stdexcept>
|
||||
#include <string>
|
||||
@ -860,7 +861,7 @@ py::Object MgpListToPyTuple(mgp_list *list, PyObject *py_graph) {
|
||||
}
|
||||
|
||||
namespace {
|
||||
std::optional<py::ExceptionInfo> AddRecordFromPython(mgp_result *result, py::Object py_record) {
|
||||
std::optional<py::ExceptionInfo> AddRecordFromPython(mgp_result *result, py::Object py_record, mgp_memory *memory) {
|
||||
py::Object py_mgp(PyImport_ImportModule("mgp"));
|
||||
if (!py_mgp) return py::FetchError();
|
||||
auto record_cls = py_mgp.GetAttr("Record");
|
||||
@ -902,8 +903,8 @@ std::optional<py::ExceptionInfo> AddRecordFromPython(mgp_result *result, py::Obj
|
||||
if (!field_name) return py::FetchError();
|
||||
auto *val = PyTuple_GetItem(item, 1);
|
||||
if (!val) return py::FetchError();
|
||||
mgp_memory memory{result->rows.get_allocator().GetMemoryResource()};
|
||||
mgp_value *field_val = PyObjectToMgpValueWithPythonExceptions(val, &memory);
|
||||
// This memory is one dedicated for mg_procedure.
|
||||
mgp_value *field_val = PyObjectToMgpValueWithPythonExceptions(val, memory);
|
||||
if (field_val == nullptr) {
|
||||
return py::FetchError();
|
||||
}
|
||||
@ -921,15 +922,26 @@ std::optional<py::ExceptionInfo> AddRecordFromPython(mgp_result *result, py::Obj
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
std::optional<py::ExceptionInfo> AddMultipleRecordsFromPython(mgp_result *result, py::Object py_seq) {
|
||||
std::optional<py::ExceptionInfo> AddMultipleRecordsFromPython(mgp_result *result, py::Object py_seq,
|
||||
mgp_memory *memory) {
|
||||
Py_ssize_t len = PySequence_Size(py_seq.Ptr());
|
||||
if (len == -1) return py::FetchError();
|
||||
for (Py_ssize_t i = 0; i < len; ++i) {
|
||||
py::Object py_record(PySequence_GetItem(py_seq.Ptr(), i));
|
||||
result->rows.reserve(len);
|
||||
// This proved to be good enough constant not to lose performance on transformation
|
||||
static constexpr auto del_cnt{100000};
|
||||
for (Py_ssize_t i = 0, curr_item = 0; i < len; ++i, ++curr_item) {
|
||||
py::Object py_record(PySequence_GetItem(py_seq.Ptr(), curr_item));
|
||||
if (!py_record) return py::FetchError();
|
||||
auto maybe_exc = AddRecordFromPython(result, py_record);
|
||||
auto maybe_exc = AddRecordFromPython(result, py_record, memory);
|
||||
if (maybe_exc) return maybe_exc;
|
||||
// Once PySequence_DelSlice deletes "transformed" objects, starting index is 0 again.
|
||||
if (i && i % del_cnt == 0) {
|
||||
PySequence_DelSlice(py_seq.Ptr(), 0, del_cnt);
|
||||
curr_item = -1;
|
||||
}
|
||||
}
|
||||
// Clear at the end what left
|
||||
PySequence_DelSlice(py_seq.Ptr(), 0, PySequence_Size(py_seq.Ptr()));
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
@ -962,6 +974,7 @@ std::function<void()> PyObjectCleanup(py::Object &py_object) {
|
||||
|
||||
void CallPythonProcedure(const py::Object &py_cb, mgp_list *args, mgp_graph *graph, mgp_result *result,
|
||||
mgp_memory *memory) {
|
||||
// *memory here is memory from `EvalContext`
|
||||
auto gil = py::EnsureGIL();
|
||||
|
||||
auto error_to_msg = [](const std::optional<py::ExceptionInfo> &exc_info) -> std::optional<std::string> {
|
||||
@ -979,9 +992,9 @@ void CallPythonProcedure(const py::Object &py_cb, mgp_list *args, mgp_graph *gra
|
||||
auto py_res = py_cb.Call(py_graph, py_args);
|
||||
if (!py_res) return py::FetchError();
|
||||
if (PySequence_Check(py_res.Ptr())) {
|
||||
return AddMultipleRecordsFromPython(result, py_res);
|
||||
return AddMultipleRecordsFromPython(result, py_res, memory);
|
||||
} else {
|
||||
return AddRecordFromPython(result, py_res);
|
||||
return AddRecordFromPython(result, py_res, memory);
|
||||
}
|
||||
};
|
||||
|
||||
@ -1027,9 +1040,9 @@ void CallPythonTransformation(const py::Object &py_cb, mgp_messages *msgs, mgp_g
|
||||
auto py_res = py_cb.Call(py_graph, py_messages);
|
||||
if (!py_res) return py::FetchError();
|
||||
if (PySequence_Check(py_res.Ptr())) {
|
||||
return AddMultipleRecordsFromPython(result, py_res);
|
||||
return AddMultipleRecordsFromPython(result, py_res, memory);
|
||||
}
|
||||
return AddRecordFromPython(result, py_res);
|
||||
return AddRecordFromPython(result, py_res, memory);
|
||||
};
|
||||
|
||||
// It is *VERY IMPORTANT* to note that this code takes great care not to keep
|
||||
|
Loading…
Reference in New Issue
Block a user