From 90fa6d9226760e2ca7c701bf23e1632a3535980a Mon Sep 17 00:00:00 2001 From: Josip Mrden Date: Thu, 29 Feb 2024 17:54:46 +0100 Subject: [PATCH] Add pull mechanism --- include/_mgp.hpp | 4 +++ include/mg_procedure.h | 3 +++ include/mgp.hpp | 30 +++++++++++++++++++---- src/query/procedure/mg_procedure_impl.cpp | 20 +++++++++++++++ src/query/procedure/mg_procedure_impl.hpp | 1 + 5 files changed, 53 insertions(+), 5 deletions(-) diff --git a/include/_mgp.hpp b/include/_mgp.hpp index 57688dd71..6efa78ec9 100644 --- a/include/_mgp.hpp +++ b/include/_mgp.hpp @@ -867,4 +867,8 @@ inline const char *execution_headers_at(mgp_execution_headers *headers, size_t i return MgInvoke(mgp_execution_headers_at, headers, index); } +inline mgp_map *pull_one(mgp_execution_result *result, mgp_graph *graph, mgp_memory *memory) { + return MgInvoke(mgp_pull_one, result, graph, memory); +} + } // namespace mgp diff --git a/include/mg_procedure.h b/include/mg_procedure.h index b1766105d..de99a488b 100644 --- a/include/mg_procedure.h +++ b/include/mg_procedure.h @@ -1815,6 +1815,9 @@ enum mgp_error mgp_execute_query(struct mgp_graph *graph, struct mgp_memory *mem enum mgp_error mgp_fetch_execution_headers(struct mgp_execution_result *exec_result, struct mgp_execution_headers **headers); +enum mgp_error mgp_pull_one(struct mgp_execution_result *exec_result, struct mgp_graph *graph, + struct mgp_memory *memory, struct mgp_map **result); + /// @} #ifdef __cplusplus diff --git a/include/mgp.hpp b/include/mgp.hpp index 3bfe8148b..3bc11f5d5 100644 --- a/include/mgp.hpp +++ b/include/mgp.hpp @@ -16,6 +16,7 @@ #include #include #include +#include #include #include #include @@ -93,7 +94,9 @@ class Relationship; struct MapItem; class Duration; class Value; +class ExecutionHeaders; class QueryExecution; +class ExecutionRow; class ExecutionResult; struct StealType {}; @@ -1605,12 +1608,22 @@ class QueryExecution { mgp_graph *graph_; }; -class ExecutionResult { - mgp_execution_result *result_; +class ExecutionRow { + mgp_map *row_; public: - ExecutionResult(mgp_execution_result *result); + ExecutionRow(mgp_map *row); +}; + +class ExecutionResult { + public: + ExecutionResult(mgp_execution_result *result, mgp_graph *graph); ExecutionHeaders Headers() const; + std::optional PullOne() const; + + private: + mgp_execution_result *result_; + mgp_graph *graph_; }; enum class ProcedureType : uint8_t { @@ -4360,13 +4373,18 @@ inline std::string ExecutionHeaders::At(size_t index) const { inline QueryExecution::QueryExecution(mgp_graph *graph) : graph_(graph) {} inline ExecutionResult QueryExecution::ExecuteQuery(std::string_view query) const { - return ExecutionResult(mgp::MemHandlerCallback(execute_query, graph_, query.data())); + return ExecutionResult(mgp::MemHandlerCallback(execute_query, graph_, query.data()), graph_); } -inline ExecutionResult::ExecutionResult(mgp_execution_result *result) : result_(result) {} +inline ExecutionResult::ExecutionResult(mgp_execution_result *result, mgp_graph *graph) + : result_(result), graph_(graph) {} inline ExecutionHeaders ExecutionResult::Headers() const { return mgp::fetch_execution_headers(result_); }; +inline std::optional ExecutionResult::PullOne() const { + return ExecutionRow(mgp::MemHandlerCallback(pull_one, result_, graph_)); +} + inline bool ExecutionHeaders::Iterator::operator==(const Iterator &other) const { return iterable_ == other.iterable_ && index_ == other.index_; } @@ -4395,6 +4413,8 @@ inline ExecutionHeaders::Iterator ExecutionHeaders::cbegin() { return Iterator(t inline ExecutionHeaders::Iterator ExecutionHeaders::cend() { return Iterator(this, Size()); } +inline ExecutionRow::ExecutionRow(mgp_map *row) : row_(row) {} + // do not enter namespace detail { inline void AddParamsReturnsToProc(mgp_proc *proc, std::vector ¶meters, diff --git a/src/query/procedure/mg_procedure_impl.cpp b/src/query/procedure/mg_procedure_impl.cpp index 451c4cb45..1800a9302 100644 --- a/src/query/procedure/mg_procedure_impl.cpp +++ b/src/query/procedure/mg_procedure_impl.cpp @@ -4105,3 +4105,23 @@ mgp_error mgp_execute_query(mgp_graph *graph, mgp_memory *memory, const char *qu mgp_error mgp_fetch_execution_headers(mgp_execution_result *exec_result, mgp_execution_headers **result) { return WrapExceptions([exec_result]() { return &exec_result->headers; }, result); } + +mgp_error mgp_pull_one(mgp_execution_result *exec_result, mgp_graph *graph, mgp_memory *memory, mgp_map **result) { + return WrapExceptions( + [exec_result, graph, memory]() { + if (exec_result->index >= exec_result->rows.rows.size()) { + throw std::out_of_range("No more rows to pull from query execution!"); + } + + const size_t index_to_fetch = exec_result->index++; + const size_t headers_size = exec_result->headers.headers.size(); + memgraph::utils::pmr::map items(memory->impl); + for (size_t idx = 0; idx < headers_size; idx++) { + items.emplace(exec_result->headers.headers[idx], + mgp_value{exec_result->rows.rows[index_to_fetch][idx], graph, memory->impl}); + } + + return NewRawMgpObject(memory->impl, std::move(items)); + }, + result); +} diff --git a/src/query/procedure/mg_procedure_impl.hpp b/src/query/procedure/mg_procedure_impl.hpp index f0efc1605..610b5bf04 100644 --- a/src/query/procedure/mg_procedure_impl.hpp +++ b/src/query/procedure/mg_procedure_impl.hpp @@ -1020,4 +1020,5 @@ struct mgp_execution_result { mgp_execution_headers headers; mgp_execution_rows rows; + size_t index{0}; };