Add pull execution

This commit is contained in:
Josip Mrden 2024-03-15 15:26:04 +01:00
parent 53458b4962
commit 7f335052be
5 changed files with 23 additions and 39 deletions

View File

@ -870,7 +870,4 @@ inline const char *execution_headers_at(mgp_execution_headers *headers, size_t i
inline mgp_map *pull_one(mgp_execution_result *result, mgp_graph *graph, mgp_memory *memory) {
return MgInvoke<mgp_map *>(mgp_pull_one, result, graph, memory);
}
inline bool has_more_rows(mgp_execution_result *result) { return MgInvoke<bool>(mgp_has_more_rows, result); }
} // namespace mgp

View File

@ -1815,8 +1815,6 @@ enum mgp_error mgp_execute_query(struct mgp_graph *graph, struct mgp_memory *mem
enum mgp_error mgp_fetch_execution_headers(struct mgp_execution_result *exec_result,
struct mgp_execution_headers **headers);
enum mgp_error mgp_has_more_rows(struct mgp_execution_result *exec_result, bool *result);
enum mgp_error mgp_pull_one(struct mgp_execution_result *exec_result, struct mgp_graph *graph,
struct mgp_memory *memory, struct mgp_map **result);

View File

@ -1636,7 +1636,6 @@ class ExecutionResult {
public:
ExecutionResult(mgp_execution_result *result, mgp_graph *graph);
ExecutionHeaders Headers() const;
bool HasMore() const;
std::optional<ExecutionRow> PullOne() const;
private:
@ -4399,14 +4398,12 @@ inline ExecutionResult::ExecutionResult(mgp_execution_result *result, mgp_graph
inline ExecutionHeaders ExecutionResult::Headers() const { return mgp::fetch_execution_headers(result_); };
inline bool ExecutionResult::HasMore() const { return mgp::has_more_rows(result_); }
inline std::optional<ExecutionRow> ExecutionResult::PullOne() const {
if (HasMore()) {
try {
return ExecutionRow(mgp::MemHandlerCallback(pull_one, result_, graph_));
} catch (std::exception &e) {
return std::nullopt;
}
return std::nullopt;
}
inline bool ExecutionHeaders::Iterator::operator==(const Iterator &other) const {

View File

@ -4049,9 +4049,6 @@ mgp_execution_rows::mgp_execution_rows(
memgraph::utils::pmr::vector<memgraph::utils::pmr::vector<memgraph::query::TypedValue>> &&tv_rows)
: rows(std::move(tv_rows)) {}
mgp_execution_result::mgp_execution_result(mgp_execution_headers &&headers, mgp_execution_rows &&rows)
: headers(std::move(headers)), rows(std::move(rows)) {}
struct MgProcedureResultStream final {
explicit MgProcedureResultStream(mgp_memory *memory) : rows(memory->impl), memory(memory) {}
using Row = std::vector<memgraph::query::TypedValue>;
@ -4088,28 +4085,25 @@ mgp_error mgp_execute_query(mgp_graph *graph, mgp_memory *memory, const char *qu
auto query_string = std::string(query);
auto *instance = memgraph::query::InterpreterContext::getInstance();
memgraph::query::Interpreter interpreter(instance, instance->dbms_handler->Get());
interpreter.SetUser(graph->ctx->user_or_role);
auto *interpreter = new memgraph::query::Interpreter(instance, instance->dbms_handler->Get());
interpreter->SetUser(graph->ctx->user_or_role);
instance->interpreters.WithLock([&interpreter](auto &interpreters) { interpreters.insert(&interpreter); });
memgraph::utils::OnScopeExit erase_interpreter([&] {
instance->interpreters.WithLock([&interpreter](auto &interpreters) { interpreters.erase(&interpreter); });
});
// instance->interpreters.WithLock([&interpreter](auto &interpreters) { interpreters.insert(&interpreter); });
// memgraph::utils::OnScopeExit erase_interpreter([&] {
// instance->interpreters.WithLock([&interpreter](auto &interpreters) { interpreters.erase(&interpreter); });
// });
const auto query_params = CreateQueryParams(params);
auto prepare_query_result = interpreter.Prepare(query_string, query_params, {});
MgProcedureResultStream stream(memory);
interpreter.Pull(&stream, {}, prepare_query_result.qid);
auto prepare_query_result = interpreter->Prepare(query_string, query_params, {});
memgraph::utils::pmr::vector<memgraph::utils::pmr::string> headers(memory->impl);
for (auto header : prepare_query_result.headers) {
headers.emplace_back(header);
}
return NewRawMgpObject<mgp_execution_result>(memory, mgp_execution_headers{std::move(headers)},
mgp_execution_rows{std::move(stream.rows)});
return NewRawMgpObject<mgp_execution_result>(memory->impl, std::move(interpreter),
mgp_execution_headers{std::move(headers)});
},
result);
}
@ -4118,23 +4112,17 @@ mgp_error mgp_fetch_execution_headers(mgp_execution_result *exec_result, mgp_exe
return WrapExceptions([exec_result]() { return &exec_result->headers; }, result);
}
mgp_error mgp_has_more_rows(mgp_execution_result *exec_result, bool *result) {
return WrapExceptions([exec_result]() { return exec_result->index < exec_result->rows.rows.size(); }, result);
}
mgp_error mgp_pull_one(mgp_execution_result *exec_result, mgp_graph *graph, mgp_memory *memory, mgp_map **result) {
return WrapExceptions(
[exec_result, graph, memory]() {
if (exec_result->index >= exec_result->rows.rows.size()) {
throw std::out_of_range("No more rows to pull from query execution!");
}
MgProcedureResultStream stream(memory);
exec_result->interpreter->Pull(&stream, 1, {});
const size_t index_to_fetch = exec_result->index++;
const size_t headers_size = exec_result->headers.headers.size();
memgraph::utils::pmr::map<memgraph::utils::pmr::string, mgp_value> items(memory->impl);
for (size_t idx = 0; idx < headers_size; idx++) {
items.emplace(exec_result->headers.headers[idx],
mgp_value{exec_result->rows.rows[index_to_fetch][idx], graph, memory->impl});
mgp_value{std::move(stream.rows[0][idx]), graph, memory->impl});
}
return NewRawMgpObject<mgp_map>(memory->impl, std::move(items));

View File

@ -34,6 +34,11 @@
#include "utils/pmr/vector.hpp"
#include "utils/temporal.hpp"
#include "utils/variant_helpers.hpp"
namespace memgraph::query {
class Interpreter;
}
/// Wraps memory resource used in custom procedures.
///
/// This should have been `using mgp_memory = memgraph::utils::MemoryResource`, but that's
@ -1014,11 +1019,10 @@ struct mgp_execution_rows {
};
struct mgp_execution_result {
explicit mgp_execution_result(mgp_execution_headers &&headers, mgp_execution_rows &&rows);
mgp_execution_result(memgraph::query::Interpreter *interpreter, mgp_execution_headers headers)
: interpreter(interpreter), headers(headers) {}
~mgp_execution_result() = default;
memgraph::query::Interpreter *interpreter;
mgp_execution_headers headers;
mgp_execution_rows rows;
size_t index{0};
};