Compare commits

...

32 Commits

Author SHA1 Message Date
Josip Mrden
2ff58aa783 Add value as pointer 2024-03-21 15:40:54 +01:00
Josip Mrden
e143d68c24 Added exception handling when pulling from interpreter 2024-03-21 15:29:57 +01:00
Josip Mrden
04f93ab46c Add support when query returns nothing 2024-03-21 15:21:51 +01:00
Josip Mrden
9ec55ac099 Fix 2024-03-21 13:16:55 +01:00
Josip Mrden
c85e510faa Add fetching values from the execution row 2024-03-21 13:14:50 +01:00
Josip Mrden
75d8a216c3 Add const to exception 2024-03-21 12:44:26 +01:00
Josip Mrden
358c34d444 pImpl 2024-03-20 14:21:01 +01:00
Josip Mrden
f517f8f368 Fix 2024-03-20 13:33:09 +01:00
Josip Mrden
7f335052be Add pull execution 2024-03-15 15:26:04 +01:00
Josip Mrden
53458b4962 Merge branch 'master' into query-execution-in-mage 2024-03-15 13:14:05 +01:00
Josip Mrden
2416d0fd15 Add friend class to map so query execution can access a private member 2024-03-01 13:27:00 +01:00
Josip Mrden
065c88612d Add another private section to QueryExecution 2024-03-01 13:21:18 +01:00
Josip Mrden
ed3a8a9328 Add query params execution 2024-03-01 13:19:14 +01:00
Josip Mrden
34f2a1e10b Change condition on has more rows 2024-03-01 12:57:18 +01:00
Josip Mrden
ce9bba8e83 Add has more for fetching optional result if no more rows to fetch 2024-03-01 12:55:41 +01:00
Josip Mrden
671f51f21d Row member instead of values 2024-03-01 12:38:30 +01:00
Josip Mrden
60de9e5a25 Add proxy methods to execution row 2024-02-29 18:03:59 +01:00
Josip Mrden
90fa6d9226 Add pull mechanism 2024-02-29 17:54:46 +01:00
Josip Mrden
4d930fb73b Model execution rows 2024-02-29 12:49:59 +01:00
Josip Mrden
d30a7c70de Add iterator methods for iteration 2024-02-29 11:49:36 +01:00
Josip Mrden
c0483576db Add headers iterator 2024-02-29 11:31:33 +01:00
Josip Mrden
6536d3b21d Add headers as return type 2024-02-28 23:32:52 +01:00
Josip Mrden
ec3ba6a408 Add return value when executing query in mgp 2024-02-28 23:30:06 +01:00
Josip Mrden
1145ea87ad Add support for headers 2024-02-28 23:23:15 +01:00
Josip Mrden
8fdc199d2b Implement headers in mgp 2024-02-28 17:08:17 +01:00
Josip Mrden
112c4528d3 Fixed condition for mg assert 2024-02-28 15:13:37 +01:00
Josip Mrden
11299981df Add mg invoke returning of result 2024-02-28 15:03:55 +01:00
Josip Mrden
2c0fc680de Created query execution result 2024-02-28 14:56:38 +01:00
Josip Mrden
2b30ba3fef Update method constness 2024-02-28 14:04:00 +01:00
Josip Mrden
d0babcddc5 Pass user to query execution 2024-02-28 13:39:41 +01:00
Josip Mrden
8ba1f160d4 Add API for executing a query 2024-02-28 13:07:59 +01:00
Josip Mrden
b6a55e534b Add Interpreter Context as singleton 2024-02-28 10:56:51 +01:00
10 changed files with 426 additions and 27 deletions

View File

@ -851,4 +851,23 @@ inline void func_result_set_value(mgp_func_result *res, mgp_value *value, mgp_me
MgInvokeVoid(mgp_func_result_set_value, res, value, memory);
}
inline mgp_execution_result *execute_query(mgp_graph *graph, const char *query, mgp_map *params, mgp_memory *memory) {
return MgInvoke<mgp_execution_result *>(mgp_execute_query, graph, memory, query, params);
}
inline mgp_execution_headers *fetch_execution_headers(mgp_execution_result *exec_result) {
return MgInvoke<mgp_execution_headers *>(mgp_fetch_execution_headers, exec_result);
}
inline size_t execution_headers_size(mgp_execution_headers *headers) {
return MgInvoke<size_t>(mgp_execution_headers_size, headers);
}
inline const char *execution_headers_at(mgp_execution_headers *headers, size_t index) {
return MgInvoke<const char *>(mgp_execution_headers_at, headers, index);
}
inline mgp_map *pull_one(mgp_execution_result *result, mgp_graph *graph, mgp_memory *memory) {
return MgInvoke<mgp_map *>(mgp_pull_one, result, graph, memory);
}
} // namespace mgp

View File

@ -1,4 +1,4 @@
// Copyright 2023 Memgraph Ltd.
// Copyright 2024 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -1800,6 +1800,24 @@ enum mgp_error mgp_func_result_set_error_msg(struct mgp_func_result *result, con
/// mgp_func_result.
enum mgp_error mgp_func_result_set_value(struct mgp_func_result *result, struct mgp_value *value,
struct mgp_memory *memory);
struct mgp_execution_headers;
enum mgp_error mgp_execution_headers_at(struct mgp_execution_headers *headers, size_t index, const char **result);
enum mgp_error mgp_execution_headers_size(struct mgp_execution_headers *headers, size_t *result);
struct mgp_execution_result;
enum mgp_error mgp_execute_query(struct mgp_graph *graph, struct mgp_memory *memory, const char *query,
struct mgp_map *params, struct mgp_execution_result **result);
enum mgp_error mgp_fetch_execution_headers(struct mgp_execution_result *exec_result,
struct mgp_execution_headers **headers);
enum mgp_error mgp_pull_one(struct mgp_execution_result *exec_result, struct mgp_graph *graph,
struct mgp_memory *memory, struct mgp_map **result);
/// @}
#ifdef __cplusplus

View File

@ -1,4 +1,4 @@
// Copyright 2023 Memgraph Ltd.
// Copyright 2024 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -16,6 +16,7 @@
#include <functional>
#include <map>
#include <mutex>
#include <optional>
#include <set>
#include <shared_mutex>
#include <string>
@ -93,6 +94,10 @@ class Relationship;
struct MapItem;
class Duration;
class Value;
class QueryExecution;
class ExecutionResult;
class ExecutionHeaders;
class ExecutionRow;
struct StealType {};
inline constexpr StealType steal{};
@ -595,6 +600,7 @@ class Map {
friend class Record;
friend class Result;
friend class Parameter;
friend class QueryExecution;
public:
/// @brief Creates a Map from the copy of the given @ref mgp_map.
@ -1550,6 +1556,95 @@ class Return {
mgp_type *GetMGPType() const;
};
class ExecutionHeaders {
public:
ExecutionHeaders(mgp_execution_headers *headers);
size_t Size() const;
std::string At(size_t index) const;
std::string_view operator[](size_t index) const;
class Iterator {
private:
friend class ExecutionHeaders;
public:
using value_type = ExecutionHeaders;
using difference_type = std::ptrdiff_t;
using pointer = const ExecutionHeaders *;
using reference = const ExecutionHeaders &;
using iterator_category = std::forward_iterator_tag;
bool operator==(const Iterator &other) const;
bool operator!=(const Iterator &other) const;
Iterator &operator++();
std::string_view operator*() const;
private:
Iterator(const ExecutionHeaders *iterable, size_t index);
const ExecutionHeaders *iterable_;
size_t index_;
};
Iterator begin();
Iterator end();
Iterator cbegin();
Iterator cend();
private:
mgp_execution_headers *headers_;
};
class QueryExecution {
public:
QueryExecution(mgp_graph *graph);
ExecutionResult ExecuteQuery(std::string_view query, Map params = Map()) const;
private:
mgp_graph *graph_;
};
class ExecutionRow {
private:
Map row_;
public:
ExecutionRow(mgp_map *row);
/// @brief Returns the size of the map.
size_t Size() const;
/// @brief Returns whether the map is empty.
bool Empty() const;
/// @brief Returns the value at the given `key`.
Value operator[](std::string_view key) const;
/// @brief Returns the value at the given `key`.
Value At(std::string_view key) const;
/// @brief Returns true if the given `key` exists.
bool KeyExists(std::string_view key) const;
mgp::Map Values() const;
};
class ExecutionResult {
public:
ExecutionResult(mgp_execution_result *result, mgp_graph *graph);
ExecutionHeaders Headers() const;
std::optional<ExecutionRow> PullOne() const;
private:
mgp_execution_result *result_;
mgp_graph *graph_;
};
enum class ProcedureType : uint8_t {
Read,
Write,
@ -4286,6 +4381,76 @@ inline mgp_type *Return::GetMGPType() const {
return util::ToMGPType(type_);
}
inline ExecutionHeaders::ExecutionHeaders(mgp_execution_headers *headers) : headers_(headers) {}
inline size_t ExecutionHeaders::Size() const { return mgp::execution_headers_size(headers_); }
inline std::string ExecutionHeaders::At(size_t index) const {
return std::string(mgp::execution_headers_at(headers_, index));
}
inline QueryExecution::QueryExecution(mgp_graph *graph) : graph_(graph) {}
inline ExecutionResult QueryExecution::ExecuteQuery(std::string_view query, mgp::Map params) const {
return ExecutionResult(mgp::MemHandlerCallback(execute_query, graph_, query.data(), params.ptr_), graph_);
}
inline ExecutionResult::ExecutionResult(mgp_execution_result *result, mgp_graph *graph)
: result_(result), graph_(graph) {}
inline ExecutionHeaders ExecutionResult::Headers() const { return mgp::fetch_execution_headers(result_); };
inline std::optional<ExecutionRow> ExecutionResult::PullOne() const {
auto *value = mgp::MemHandlerCallback(pull_one, result_, graph_);
if (!value) {
return std::nullopt;
}
return ExecutionRow(value);
}
inline bool ExecutionHeaders::Iterator::operator==(const Iterator &other) const {
return iterable_ == other.iterable_ && index_ == other.index_;
}
inline bool ExecutionHeaders::Iterator::operator!=(const Iterator &other) const { return !(*this == other); }
inline ExecutionHeaders::Iterator &ExecutionHeaders::Iterator::operator++() {
index_++;
return *this;
}
inline std::string_view ExecutionHeaders::Iterator::operator*() const { return (*iterable_)[index_]; }
inline ExecutionHeaders::Iterator::Iterator(const ExecutionHeaders *iterable, size_t index)
: iterable_(iterable), index_(index) {}
inline std::string_view ExecutionHeaders::operator[](size_t index) const {
return std::string_view(mgp::execution_headers_at(headers_, index));
}
inline ExecutionHeaders::Iterator ExecutionHeaders::begin() { return Iterator(this, 0); }
inline ExecutionHeaders::Iterator ExecutionHeaders::end() { return Iterator(this, Size()); }
inline ExecutionHeaders::Iterator ExecutionHeaders::cbegin() { return Iterator(this, 0); }
inline ExecutionHeaders::Iterator ExecutionHeaders::cend() { return Iterator(this, Size()); }
inline ExecutionRow::ExecutionRow(mgp_map *row) : row_(row) {}
inline size_t ExecutionRow::Size() const { return row_.Size(); }
inline bool ExecutionRow::Empty() const { return row_.Empty(); }
inline Value ExecutionRow::operator[](std::string_view key) const { return row_[key]; }
inline Value ExecutionRow::At(std::string_view key) const { return row_.At(key); }
inline bool ExecutionRow::KeyExists(std::string_view key) const { return row_.KeyExists(key); }
inline mgp::Map ExecutionRow::Values() const { return mgp::Map(row_); }
// do not enter
namespace detail {
inline void AddParamsReturnsToProc(mgp_proc *proc, std::vector<Parameter> &parameters,

View File

@ -436,12 +436,12 @@ int main(int argc, char **argv) {
auto db_acc = dbms_handler.Get();
memgraph::query::InterpreterContext interpreter_context_(interp_config, &dbms_handler, &repl_state, system,
auto *interpreter_context_ =
memgraph::query::InterpreterContext::getInstance(interp_config, &dbms_handler, &repl_state, system,
#ifdef MG_ENTERPRISE
&coordinator_state,
&coordinator_state,
#endif
auth_handler.get(), auth_checker.get(),
&replication_handler);
auth_handler.get(), auth_checker.get(), &replication_handler);
MG_ASSERT(db_acc, "Failed to access the main database");
memgraph::query::procedure::gModuleRegistry.SetModulesDirectory(memgraph::flags::ParseQueryModulesDirectory(),
@ -454,9 +454,9 @@ int main(int argc, char **argv) {
spdlog::info("Running init file...");
#ifdef MG_ENTERPRISE
if (memgraph::license::global_license_checker.IsEnterpriseValidFast()) {
InitFromCypherlFile(interpreter_context_, db_acc, FLAGS_init_file, &audit_log);
InitFromCypherlFile(*interpreter_context_, db_acc, FLAGS_init_file, &audit_log);
} else {
InitFromCypherlFile(interpreter_context_, db_acc, FLAGS_init_file);
InitFromCypherlFile(*interpreter_context_, db_acc, FLAGS_init_file);
}
#else
InitFromCypherlFile(interpreter_context_, db_acc, FLAGS_init_file);
@ -464,20 +464,20 @@ int main(int argc, char **argv) {
}
#ifdef MG_ENTERPRISE
dbms_handler.RestoreTriggers(&interpreter_context_);
dbms_handler.RestoreStreams(&interpreter_context_);
dbms_handler.RestoreTriggers(interpreter_context_);
dbms_handler.RestoreStreams(interpreter_context_);
#else
{
// Triggers can execute query procedures, so we need to reload the modules first and then
// the triggers
auto storage_accessor = db_acc->Access();
auto dba = memgraph::query::DbAccessor{storage_accessor.get()};
db_acc->trigger_store()->RestoreTriggers(&interpreter_context_.ast_cache, &dba, interpreter_context_.config.query,
db_acc->trigger_store()->RestoreTriggers(interpreter_context_.ast_cache, &dba, interpreter_context_.config.query,
interpreter_context_.auth_checker);
}
// As the Stream transformations are using modules, they have to be restored after the query modules are loaded.
db_acc->streams()->RestoreStreams(db_acc, &interpreter_context_);
db_acc->streams()->RestoreStreams(db_acc, interpreter_context_);
#endif
ServerContext context;
@ -493,9 +493,9 @@ int main(int argc, char **argv) {
auto server_endpoint = memgraph::communication::v2::ServerEndpoint{
boost::asio::ip::address::from_string(FLAGS_bolt_address), static_cast<uint16_t>(FLAGS_bolt_port)};
#ifdef MG_ENTERPRISE
Context session_context{&interpreter_context_, &auth_, &audit_log};
Context session_context{interpreter_context_, &auth_, &audit_log};
#else
Context session_context{&interpreter_context_, &auth_};
Context session_context{interpreter_context_, &auth_};
#endif
memgraph::glue::ServerT server(server_endpoint, &session_context, &context, FLAGS_bolt_session_inactivity_timeout,
service_name, FLAGS_bolt_num_workers);
@ -540,14 +540,14 @@ int main(int argc, char **argv) {
#ifdef MG_ENTERPRISE
&metrics_server,
#endif
&websocket_server, &server, &interpreter_context_] {
&websocket_server, &server, interpreter_context_] {
// Server needs to be shutdown first and then the database. This prevents
// a race condition when a transaction is accepted during server shutdown.
server.Shutdown();
// After the server is notified to stop accepting and processing
// connections we tell the execution engine to stop processing all pending
// queries.
interpreter_context_.Shutdown();
interpreter_context_->Shutdown();
websocket_server.Shutdown();
#ifdef MG_ENTERPRISE
metrics_server.Shutdown();
@ -575,12 +575,12 @@ int main(int argc, char **argv) {
MG_ASSERT(db_acc, "Failed to gain access to the main database");
#ifdef MG_ENTERPRISE
if (memgraph::license::global_license_checker.IsEnterpriseValidFast()) {
InitFromCypherlFile(interpreter_context_, db_acc, FLAGS_init_data_file, &audit_log);
InitFromCypherlFile(*interpreter_context_, db_acc, FLAGS_init_data_file, &audit_log);
} else {
InitFromCypherlFile(interpreter_context_, db_acc, FLAGS_init_data_file);
InitFromCypherlFile(*interpreter_context_, db_acc, FLAGS_init_data_file);
}
#else
InitFromCypherlFile(interpreter_context_, db_acc, FLAGS_init_data_file);
InitFromCypherlFile(*interpreter_context_, db_acc, FLAGS_init_data_file);
#endif
}

View File

@ -72,6 +72,12 @@ inline std::vector<storage::LabelId> NamesToLabels(const std::vector<std::string
return labels;
}
struct UserExecutionContextInfo {
enum class UserMode { NONE, USER, ROLE };
UserMode mode;
std::string name;
};
struct ExecutionContext {
DbAccessor *db_accessor{nullptr};
SymbolTable symbol_table;
@ -86,6 +92,7 @@ struct ExecutionContext {
TriggerContextCollector *trigger_context_collector{nullptr};
FrameChangeCollector *frame_change_collector{nullptr};
std::shared_ptr<utils::AsyncTimer> timer;
std::shared_ptr<QueryUserOrRole> user_or_role;
#ifdef MG_ENTERPRISE
std::unique_ptr<FineGrainedAuthChecker> auth_checker{nullptr};
#endif

View File

@ -1754,10 +1754,12 @@ PullPlan::PullPlan(const std::shared_ptr<PlanWrapper> plan, const Parameters &pa
ctx_.evaluation_context.parameters = parameters;
ctx_.evaluation_context.properties = NamesToProperties(plan->ast_storage().properties_, dba);
ctx_.evaluation_context.labels = NamesToLabels(plan->ast_storage().labels_, dba);
ctx_.user_or_role = user_or_role;
#ifdef MG_ENTERPRISE
if (license::global_license_checker.IsEnterpriseValidFast() && user_or_role && *user_or_role && dba) {
// Create only if an explicit user is defined
auto auth_checker = interpreter_context->auth_checker->GetFineGrainedAuthChecker(std::move(user_or_role), dba);
auto auth_checker = interpreter_context->auth_checker->GetFineGrainedAuthChecker(user_or_role, dba);
// if the user has global privileges to read, edit and write anything, we don't need to perform authorization
// otherwise, we do assign the auth checker to check for label access control

View File

@ -15,6 +15,8 @@
#include "system/include/system/system.hpp"
namespace memgraph::query {
InterpreterContext *InterpreterContext::instance = nullptr;
InterpreterContext::InterpreterContext(InterpreterConfig interpreter_config, dbms::DbmsHandler *dbms_handler,
replication::ReplicationState *rs, memgraph::system::System &system,
#ifdef MG_ENTERPRISE

View File

@ -27,6 +27,7 @@
#include "storage/v2/transaction.hpp"
#include "system/state.hpp"
#include "system/system.hpp"
#include "utils/exceptions.hpp"
#include "utils/gatekeeper.hpp"
#include "utils/skip_list.hpp"
#include "utils/spin_lock.hpp"
@ -54,13 +55,30 @@ struct QueryUserOrRole;
*
*/
struct InterpreterContext {
InterpreterContext(InterpreterConfig interpreter_config, dbms::DbmsHandler *dbms_handler,
replication::ReplicationState *rs, memgraph::system::System &system,
static InterpreterContext *instance;
static InterpreterContext *getInstance() {
MG_ASSERT(instance != nullptr, "Interpreter context has not been initialized!");
return instance;
}
static InterpreterContext *getInstance(InterpreterConfig interpreter_config, dbms::DbmsHandler *dbms_handler,
replication::ReplicationState *rs, memgraph::system::System &system,
#ifdef MG_ENTERPRISE
memgraph::coordination::CoordinatorState *coordinator_state,
memgraph::coordination::CoordinatorState *coordinator_state,
#endif
AuthQueryHandler *ah = nullptr, AuthChecker *ac = nullptr,
ReplicationQueryHandler *replication_handler = nullptr);
AuthQueryHandler *ah = nullptr, AuthChecker *ac = nullptr,
ReplicationQueryHandler *replication_handler = nullptr) {
if (instance == nullptr) {
instance = new InterpreterContext(interpreter_config, dbms_handler, rs, system,
#ifdef MG_ENTERPRISE
coordinator_state,
#endif
ah, ac, replication_handler);
}
return instance;
}
memgraph::dbms::DbmsHandler *dbms_handler;
@ -98,6 +116,14 @@ struct InterpreterContext {
std::vector<std::vector<TypedValue>> TerminateTransactions(
std::vector<std::string> maybe_kill_transaction_ids, QueryUserOrRole *user_or_role,
std::function<bool(QueryUserOrRole *, std::string const &)> privilege_checker);
};
private:
InterpreterContext(InterpreterConfig interpreter_config, dbms::DbmsHandler *dbms_handler,
replication::ReplicationState *rs, memgraph::system::System &system,
#ifdef MG_ENTERPRISE
memgraph::coordination::CoordinatorState *coordinator_state,
#endif
AuthQueryHandler *ah = nullptr, AuthChecker *ac = nullptr,
ReplicationQueryHandler *replication_handler = nullptr);
};
} // namespace memgraph::query

View File

@ -23,11 +23,14 @@
#include <utility>
#include <variant>
#include "glue/auth.hpp"
#include "license/license.hpp"
#include "mg_procedure.h"
#include "module.hpp"
#include "query/db_accessor.hpp"
#include "query/frontend/ast/ast.hpp"
#include "query/interpreter.hpp"
#include "query/interpreter_context.hpp"
#include "query/procedure/cypher_types.hpp"
#include "query/procedure/fmt.hpp"
#include "query/procedure/mg_procedure_helpers.hpp"
@ -4021,3 +4024,131 @@ mgp_error mgp_untrack_current_thread_allocations(mgp_graph *graph) {
std::visit([](auto *db_accessor) -> void { db_accessor->UntrackCurrentThreadAllocations(); }, graph->impl);
});
}
mgp_execution_headers::mgp_execution_headers(memgraph::utils::pmr::vector<memgraph::utils::pmr::string> &&storage)
: headers(std::move(storage)){};
mgp_error mgp_execution_headers_size(mgp_execution_headers *headers, size_t *result) {
static_assert(noexcept(headers->headers.size()));
*result = headers->headers.size();
return mgp_error::MGP_ERROR_NO_ERROR;
}
mgp_error mgp_execution_headers_at(mgp_execution_headers *headers, size_t index, const char **result) {
return WrapExceptions(
[headers, index] {
if (index >= Call<size_t>(mgp_execution_headers_size, headers)) {
throw std::out_of_range("Header cannot be retrieved, because index exceeds headers' size!");
}
return headers->headers[index].data();
},
result);
}
mgp_execution_rows::mgp_execution_rows(
memgraph::utils::pmr::vector<memgraph::utils::pmr::vector<memgraph::query::TypedValue>> &&tv_rows)
: rows(std::move(tv_rows)) {}
struct MgProcedureResultStream final {
explicit MgProcedureResultStream(mgp_memory *memory) : rows(memory->impl), memory(memory) {}
using Row = std::vector<memgraph::query::TypedValue>;
using Rows = std::vector<Row>;
using PmrRow = memgraph::utils::pmr::vector<memgraph::query::TypedValue>;
using PmrRows = memgraph::utils::pmr::vector<PmrRow>;
PmrRows rows;
mgp_memory *memory;
void Result(const Row &row) {
PmrRow pmr_row(memory->impl);
for (auto &val : row) {
pmr_row.emplace_back(std::move(val));
}
rows.emplace_back(std::move(pmr_row));
}
};
std::map<std::string, memgraph::storage::PropertyValue> CreateQueryParams(mgp_map *params) {
std::map<std::string, memgraph::storage::PropertyValue> query_params;
for (auto &[k, v] : params->items) {
query_params.emplace(k, ToPropertyValue(v));
}
return query_params;
}
struct mgp_execution_result::pImplMgpExecutionResult {
std::unique_ptr<memgraph::query::Interpreter> interpreter;
std::unique_ptr<mgp_execution_headers> headers;
};
mgp_execution_result::mgp_execution_result() : pImpl(std::make_unique<pImplMgpExecutionResult>()) {
auto *instance = memgraph::query::InterpreterContext::getInstance();
pImpl->interpreter = std::make_unique<memgraph::query::Interpreter>(instance, instance->dbms_handler->Get());
}
mgp_execution_result::~mgp_execution_result() {
auto *instance = memgraph::query::InterpreterContext::getInstance();
instance->interpreters.WithLock([this](auto &interpreters) { interpreters.erase(pImpl->interpreter.get()); });
// interpreter will delete itself because it's a smart pointer
}
mgp_error mgp_execute_query(mgp_graph *graph, mgp_memory *memory, const char *query, mgp_map *params,
mgp_execution_result **result) {
return WrapExceptions(
[query, params, graph, memory]() {
auto query_string = std::string(query);
auto *instance = memgraph::query::InterpreterContext::getInstance();
mgp_execution_result *result = NewRawMgpObject<mgp_execution_result>(memory->impl);
result->pImpl->interpreter->SetUser(graph->ctx->user_or_role);
instance->interpreters.WithLock(
[result](auto &interpreters) { interpreters.insert(result->pImpl->interpreter.get()); });
const auto query_params = CreateQueryParams(params);
auto prepare_query_result = result->pImpl->interpreter->Prepare(query_string, query_params, {});
memgraph::utils::pmr::vector<memgraph::utils::pmr::string> headers(memory->impl);
for (auto header : prepare_query_result.headers) {
headers.emplace_back(header);
}
result->pImpl->headers = std::make_unique<mgp_execution_headers>(std::move(headers));
return result;
},
result);
}
mgp_error mgp_fetch_execution_headers(mgp_execution_result *exec_result, mgp_execution_headers **result) {
return WrapExceptions([exec_result]() { return exec_result->pImpl->headers.get(); }, result);
}
mgp_error mgp_pull_one(mgp_execution_result *exec_result, mgp_graph *graph, mgp_memory *memory, mgp_map **result) {
return WrapExceptions(
[exec_result, graph, memory]() -> mgp_map * {
MgProcedureResultStream stream(memory);
try {
exec_result->pImpl->interpreter->Pull(&stream, 1, {});
} catch (const std::exception &e) {
return nullptr;
}
if (stream.rows.empty()) {
return nullptr;
}
const size_t headers_size = exec_result->pImpl->headers->headers.size();
memgraph::utils::pmr::map<memgraph::utils::pmr::string, mgp_value> items(memory->impl);
for (size_t idx = 0; idx < headers_size; idx++) {
items.emplace(exec_result->pImpl->headers->headers[idx],
mgp_value{std::move(stream.rows[0][idx]), graph, memory->impl});
}
return NewRawMgpObject<mgp_map>(memory->impl, std::move(items));
},
result);
}

View File

@ -1,4 +1,4 @@
// Copyright 2023 Memgraph Ltd.
// Copyright 2024 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -16,6 +16,7 @@
#include "mg_procedure.h"
#include <memory>
#include <optional>
#include <ostream>
@ -24,6 +25,7 @@
#include "query/context.hpp"
#include "query/db_accessor.hpp"
#include "query/frontend/ast/ast.hpp"
#include "query/procedure/cypher_type_ptr.hpp"
#include "query/typed_value.hpp"
#include "storage/v2/view.hpp"
@ -33,6 +35,7 @@
#include "utils/pmr/vector.hpp"
#include "utils/temporal.hpp"
#include "utils/variant_helpers.hpp"
/// Wraps memory resource used in custom procedures.
///
/// This should have been `using mgp_memory = memgraph::utils::MemoryResource`, but that's
@ -993,3 +996,29 @@ struct mgp_messages {
bool ContainsDeleted(const mgp_value *val);
memgraph::query::TypedValue ToTypedValue(const mgp_value &val, memgraph::utils::MemoryResource *memory);
struct mgp_execution_headers {
using allocator_type = memgraph::utils::Allocator<mgp_execution_headers>;
using storage_type = memgraph::utils::pmr::vector<memgraph::utils::pmr::string>;
explicit mgp_execution_headers(storage_type &&storage);
~mgp_execution_headers() = default;
storage_type headers;
};
struct mgp_execution_rows {
explicit mgp_execution_rows(
memgraph::utils::pmr::vector<memgraph::utils::pmr::vector<memgraph::query::TypedValue>> &&tv_rows);
~mgp_execution_rows() = default;
memgraph::utils::pmr::vector<memgraph::utils::pmr::vector<memgraph::query::TypedValue>> rows;
};
struct mgp_execution_result {
explicit mgp_execution_result();
~mgp_execution_result();
struct pImplMgpExecutionResult;
std::unique_ptr<pImplMgpExecutionResult> pImpl;
};