Add pull mechanism

This commit is contained in:
Josip Mrden 2024-02-29 17:54:46 +01:00
parent 4d930fb73b
commit 90fa6d9226
5 changed files with 53 additions and 5 deletions

View File

@ -867,4 +867,8 @@ inline const char *execution_headers_at(mgp_execution_headers *headers, size_t i
return MgInvoke<const char *>(mgp_execution_headers_at, headers, index);
}
inline mgp_map *pull_one(mgp_execution_result *result, mgp_graph *graph, mgp_memory *memory) {
return MgInvoke<mgp_map *>(mgp_pull_one, result, graph, memory);
}
} // namespace mgp

View File

@ -1815,6 +1815,9 @@ enum mgp_error mgp_execute_query(struct mgp_graph *graph, struct mgp_memory *mem
enum mgp_error mgp_fetch_execution_headers(struct mgp_execution_result *exec_result,
struct mgp_execution_headers **headers);
enum mgp_error mgp_pull_one(struct mgp_execution_result *exec_result, struct mgp_graph *graph,
struct mgp_memory *memory, struct mgp_map **result);
/// @}
#ifdef __cplusplus

View File

@ -16,6 +16,7 @@
#include <functional>
#include <map>
#include <mutex>
#include <optional>
#include <set>
#include <shared_mutex>
#include <string>
@ -93,7 +94,9 @@ class Relationship;
struct MapItem;
class Duration;
class Value;
class ExecutionHeaders;
class QueryExecution;
class ExecutionRow;
class ExecutionResult;
struct StealType {};
@ -1605,12 +1608,22 @@ class QueryExecution {
mgp_graph *graph_;
};
class ExecutionResult {
mgp_execution_result *result_;
class ExecutionRow {
mgp_map *row_;
public:
ExecutionResult(mgp_execution_result *result);
ExecutionRow(mgp_map *row);
};
class ExecutionResult {
public:
ExecutionResult(mgp_execution_result *result, mgp_graph *graph);
ExecutionHeaders Headers() const;
std::optional<ExecutionRow> PullOne() const;
private:
mgp_execution_result *result_;
mgp_graph *graph_;
};
enum class ProcedureType : uint8_t {
@ -4360,13 +4373,18 @@ inline std::string ExecutionHeaders::At(size_t index) const {
inline QueryExecution::QueryExecution(mgp_graph *graph) : graph_(graph) {}
inline ExecutionResult QueryExecution::ExecuteQuery(std::string_view query) const {
return ExecutionResult(mgp::MemHandlerCallback(execute_query, graph_, query.data()));
return ExecutionResult(mgp::MemHandlerCallback(execute_query, graph_, query.data()), graph_);
}
inline ExecutionResult::ExecutionResult(mgp_execution_result *result) : result_(result) {}
inline ExecutionResult::ExecutionResult(mgp_execution_result *result, mgp_graph *graph)
: result_(result), graph_(graph) {}
inline ExecutionHeaders ExecutionResult::Headers() const { return mgp::fetch_execution_headers(result_); };
inline std::optional<ExecutionRow> ExecutionResult::PullOne() const {
return ExecutionRow(mgp::MemHandlerCallback(pull_one, result_, graph_));
}
inline bool ExecutionHeaders::Iterator::operator==(const Iterator &other) const {
return iterable_ == other.iterable_ && index_ == other.index_;
}
@ -4395,6 +4413,8 @@ inline ExecutionHeaders::Iterator ExecutionHeaders::cbegin() { return Iterator(t
inline ExecutionHeaders::Iterator ExecutionHeaders::cend() { return Iterator(this, Size()); }
inline ExecutionRow::ExecutionRow(mgp_map *row) : row_(row) {}
// do not enter
namespace detail {
inline void AddParamsReturnsToProc(mgp_proc *proc, std::vector<Parameter> &parameters,

View File

@ -4105,3 +4105,23 @@ mgp_error mgp_execute_query(mgp_graph *graph, mgp_memory *memory, const char *qu
mgp_error mgp_fetch_execution_headers(mgp_execution_result *exec_result, mgp_execution_headers **result) {
return WrapExceptions([exec_result]() { return &exec_result->headers; }, result);
}
mgp_error mgp_pull_one(mgp_execution_result *exec_result, mgp_graph *graph, mgp_memory *memory, mgp_map **result) {
return WrapExceptions(
[exec_result, graph, memory]() {
if (exec_result->index >= exec_result->rows.rows.size()) {
throw std::out_of_range("No more rows to pull from query execution!");
}
const size_t index_to_fetch = exec_result->index++;
const size_t headers_size = exec_result->headers.headers.size();
memgraph::utils::pmr::map<memgraph::utils::pmr::string, mgp_value> items(memory->impl);
for (size_t idx = 0; idx < headers_size; idx++) {
items.emplace(exec_result->headers.headers[idx],
mgp_value{exec_result->rows.rows[index_to_fetch][idx], graph, memory->impl});
}
return NewRawMgpObject<mgp_map>(memory->impl, std::move(items));
},
result);
}

View File

@ -1020,4 +1020,5 @@ struct mgp_execution_result {
mgp_execution_headers headers;
mgp_execution_rows rows;
size_t index{0};
};