Compare commits
32 Commits
master
...
query-exec
Author | SHA1 | Date | |
---|---|---|---|
|
2ff58aa783 | ||
|
e143d68c24 | ||
|
04f93ab46c | ||
|
9ec55ac099 | ||
|
c85e510faa | ||
|
75d8a216c3 | ||
|
358c34d444 | ||
|
f517f8f368 | ||
|
7f335052be | ||
|
53458b4962 | ||
|
2416d0fd15 | ||
|
065c88612d | ||
|
ed3a8a9328 | ||
|
34f2a1e10b | ||
|
ce9bba8e83 | ||
|
671f51f21d | ||
|
60de9e5a25 | ||
|
90fa6d9226 | ||
|
4d930fb73b | ||
|
d30a7c70de | ||
|
c0483576db | ||
|
6536d3b21d | ||
|
ec3ba6a408 | ||
|
1145ea87ad | ||
|
8fdc199d2b | ||
|
112c4528d3 | ||
|
11299981df | ||
|
2c0fc680de | ||
|
2b30ba3fef | ||
|
d0babcddc5 | ||
|
8ba1f160d4 | ||
|
b6a55e534b |
@ -851,4 +851,23 @@ inline void func_result_set_value(mgp_func_result *res, mgp_value *value, mgp_me
|
||||
MgInvokeVoid(mgp_func_result_set_value, res, value, memory);
|
||||
}
|
||||
|
||||
inline mgp_execution_result *execute_query(mgp_graph *graph, const char *query, mgp_map *params, mgp_memory *memory) {
|
||||
return MgInvoke<mgp_execution_result *>(mgp_execute_query, graph, memory, query, params);
|
||||
}
|
||||
|
||||
inline mgp_execution_headers *fetch_execution_headers(mgp_execution_result *exec_result) {
|
||||
return MgInvoke<mgp_execution_headers *>(mgp_fetch_execution_headers, exec_result);
|
||||
}
|
||||
|
||||
inline size_t execution_headers_size(mgp_execution_headers *headers) {
|
||||
return MgInvoke<size_t>(mgp_execution_headers_size, headers);
|
||||
}
|
||||
|
||||
inline const char *execution_headers_at(mgp_execution_headers *headers, size_t index) {
|
||||
return MgInvoke<const char *>(mgp_execution_headers_at, headers, index);
|
||||
}
|
||||
|
||||
inline mgp_map *pull_one(mgp_execution_result *result, mgp_graph *graph, mgp_memory *memory) {
|
||||
return MgInvoke<mgp_map *>(mgp_pull_one, result, graph, memory);
|
||||
}
|
||||
} // namespace mgp
|
||||
|
@ -1,4 +1,4 @@
|
||||
// Copyright 2023 Memgraph Ltd.
|
||||
// Copyright 2024 Memgraph Ltd.
|
||||
//
|
||||
// Use of this software is governed by the Business Source License
|
||||
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
|
||||
@ -1800,6 +1800,24 @@ enum mgp_error mgp_func_result_set_error_msg(struct mgp_func_result *result, con
|
||||
/// mgp_func_result.
|
||||
enum mgp_error mgp_func_result_set_value(struct mgp_func_result *result, struct mgp_value *value,
|
||||
struct mgp_memory *memory);
|
||||
|
||||
struct mgp_execution_headers;
|
||||
|
||||
enum mgp_error mgp_execution_headers_at(struct mgp_execution_headers *headers, size_t index, const char **result);
|
||||
|
||||
enum mgp_error mgp_execution_headers_size(struct mgp_execution_headers *headers, size_t *result);
|
||||
|
||||
struct mgp_execution_result;
|
||||
|
||||
enum mgp_error mgp_execute_query(struct mgp_graph *graph, struct mgp_memory *memory, const char *query,
|
||||
struct mgp_map *params, struct mgp_execution_result **result);
|
||||
|
||||
enum mgp_error mgp_fetch_execution_headers(struct mgp_execution_result *exec_result,
|
||||
struct mgp_execution_headers **headers);
|
||||
|
||||
enum mgp_error mgp_pull_one(struct mgp_execution_result *exec_result, struct mgp_graph *graph,
|
||||
struct mgp_memory *memory, struct mgp_map **result);
|
||||
|
||||
/// @}
|
||||
|
||||
#ifdef __cplusplus
|
||||
|
167
include/mgp.hpp
167
include/mgp.hpp
@ -1,4 +1,4 @@
|
||||
// Copyright 2023 Memgraph Ltd.
|
||||
// Copyright 2024 Memgraph Ltd.
|
||||
//
|
||||
// Use of this software is governed by the Business Source License
|
||||
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
|
||||
@ -16,6 +16,7 @@
|
||||
#include <functional>
|
||||
#include <map>
|
||||
#include <mutex>
|
||||
#include <optional>
|
||||
#include <set>
|
||||
#include <shared_mutex>
|
||||
#include <string>
|
||||
@ -93,6 +94,10 @@ class Relationship;
|
||||
struct MapItem;
|
||||
class Duration;
|
||||
class Value;
|
||||
class QueryExecution;
|
||||
class ExecutionResult;
|
||||
class ExecutionHeaders;
|
||||
class ExecutionRow;
|
||||
|
||||
struct StealType {};
|
||||
inline constexpr StealType steal{};
|
||||
@ -595,6 +600,7 @@ class Map {
|
||||
friend class Record;
|
||||
friend class Result;
|
||||
friend class Parameter;
|
||||
friend class QueryExecution;
|
||||
|
||||
public:
|
||||
/// @brief Creates a Map from the copy of the given @ref mgp_map.
|
||||
@ -1550,6 +1556,95 @@ class Return {
|
||||
mgp_type *GetMGPType() const;
|
||||
};
|
||||
|
||||
class ExecutionHeaders {
|
||||
public:
|
||||
ExecutionHeaders(mgp_execution_headers *headers);
|
||||
size_t Size() const;
|
||||
std::string At(size_t index) const;
|
||||
|
||||
std::string_view operator[](size_t index) const;
|
||||
|
||||
class Iterator {
|
||||
private:
|
||||
friend class ExecutionHeaders;
|
||||
|
||||
public:
|
||||
using value_type = ExecutionHeaders;
|
||||
using difference_type = std::ptrdiff_t;
|
||||
using pointer = const ExecutionHeaders *;
|
||||
using reference = const ExecutionHeaders &;
|
||||
using iterator_category = std::forward_iterator_tag;
|
||||
|
||||
bool operator==(const Iterator &other) const;
|
||||
|
||||
bool operator!=(const Iterator &other) const;
|
||||
|
||||
Iterator &operator++();
|
||||
|
||||
std::string_view operator*() const;
|
||||
|
||||
private:
|
||||
Iterator(const ExecutionHeaders *iterable, size_t index);
|
||||
|
||||
const ExecutionHeaders *iterable_;
|
||||
size_t index_;
|
||||
};
|
||||
|
||||
Iterator begin();
|
||||
Iterator end();
|
||||
|
||||
Iterator cbegin();
|
||||
Iterator cend();
|
||||
|
||||
private:
|
||||
mgp_execution_headers *headers_;
|
||||
};
|
||||
|
||||
class QueryExecution {
|
||||
public:
|
||||
QueryExecution(mgp_graph *graph);
|
||||
ExecutionResult ExecuteQuery(std::string_view query, Map params = Map()) const;
|
||||
|
||||
private:
|
||||
mgp_graph *graph_;
|
||||
};
|
||||
|
||||
class ExecutionRow {
|
||||
private:
|
||||
Map row_;
|
||||
|
||||
public:
|
||||
ExecutionRow(mgp_map *row);
|
||||
|
||||
/// @brief Returns the size of the map.
|
||||
size_t Size() const;
|
||||
|
||||
/// @brief Returns whether the map is empty.
|
||||
bool Empty() const;
|
||||
|
||||
/// @brief Returns the value at the given `key`.
|
||||
Value operator[](std::string_view key) const;
|
||||
|
||||
/// @brief Returns the value at the given `key`.
|
||||
Value At(std::string_view key) const;
|
||||
|
||||
/// @brief Returns true if the given `key` exists.
|
||||
bool KeyExists(std::string_view key) const;
|
||||
|
||||
mgp::Map Values() const;
|
||||
};
|
||||
|
||||
class ExecutionResult {
|
||||
public:
|
||||
ExecutionResult(mgp_execution_result *result, mgp_graph *graph);
|
||||
ExecutionHeaders Headers() const;
|
||||
std::optional<ExecutionRow> PullOne() const;
|
||||
|
||||
private:
|
||||
mgp_execution_result *result_;
|
||||
mgp_graph *graph_;
|
||||
};
|
||||
|
||||
enum class ProcedureType : uint8_t {
|
||||
Read,
|
||||
Write,
|
||||
@ -4286,6 +4381,76 @@ inline mgp_type *Return::GetMGPType() const {
|
||||
return util::ToMGPType(type_);
|
||||
}
|
||||
|
||||
inline ExecutionHeaders::ExecutionHeaders(mgp_execution_headers *headers) : headers_(headers) {}
|
||||
|
||||
inline size_t ExecutionHeaders::Size() const { return mgp::execution_headers_size(headers_); }
|
||||
|
||||
inline std::string ExecutionHeaders::At(size_t index) const {
|
||||
return std::string(mgp::execution_headers_at(headers_, index));
|
||||
}
|
||||
|
||||
inline QueryExecution::QueryExecution(mgp_graph *graph) : graph_(graph) {}
|
||||
|
||||
inline ExecutionResult QueryExecution::ExecuteQuery(std::string_view query, mgp::Map params) const {
|
||||
return ExecutionResult(mgp::MemHandlerCallback(execute_query, graph_, query.data(), params.ptr_), graph_);
|
||||
}
|
||||
|
||||
inline ExecutionResult::ExecutionResult(mgp_execution_result *result, mgp_graph *graph)
|
||||
: result_(result), graph_(graph) {}
|
||||
|
||||
inline ExecutionHeaders ExecutionResult::Headers() const { return mgp::fetch_execution_headers(result_); };
|
||||
|
||||
inline std::optional<ExecutionRow> ExecutionResult::PullOne() const {
|
||||
auto *value = mgp::MemHandlerCallback(pull_one, result_, graph_);
|
||||
if (!value) {
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
return ExecutionRow(value);
|
||||
}
|
||||
|
||||
inline bool ExecutionHeaders::Iterator::operator==(const Iterator &other) const {
|
||||
return iterable_ == other.iterable_ && index_ == other.index_;
|
||||
}
|
||||
|
||||
inline bool ExecutionHeaders::Iterator::operator!=(const Iterator &other) const { return !(*this == other); }
|
||||
|
||||
inline ExecutionHeaders::Iterator &ExecutionHeaders::Iterator::operator++() {
|
||||
index_++;
|
||||
return *this;
|
||||
}
|
||||
|
||||
inline std::string_view ExecutionHeaders::Iterator::operator*() const { return (*iterable_)[index_]; }
|
||||
|
||||
inline ExecutionHeaders::Iterator::Iterator(const ExecutionHeaders *iterable, size_t index)
|
||||
: iterable_(iterable), index_(index) {}
|
||||
|
||||
inline std::string_view ExecutionHeaders::operator[](size_t index) const {
|
||||
return std::string_view(mgp::execution_headers_at(headers_, index));
|
||||
}
|
||||
|
||||
inline ExecutionHeaders::Iterator ExecutionHeaders::begin() { return Iterator(this, 0); }
|
||||
|
||||
inline ExecutionHeaders::Iterator ExecutionHeaders::end() { return Iterator(this, Size()); }
|
||||
|
||||
inline ExecutionHeaders::Iterator ExecutionHeaders::cbegin() { return Iterator(this, 0); }
|
||||
|
||||
inline ExecutionHeaders::Iterator ExecutionHeaders::cend() { return Iterator(this, Size()); }
|
||||
|
||||
inline ExecutionRow::ExecutionRow(mgp_map *row) : row_(row) {}
|
||||
|
||||
inline size_t ExecutionRow::Size() const { return row_.Size(); }
|
||||
|
||||
inline bool ExecutionRow::Empty() const { return row_.Empty(); }
|
||||
|
||||
inline Value ExecutionRow::operator[](std::string_view key) const { return row_[key]; }
|
||||
|
||||
inline Value ExecutionRow::At(std::string_view key) const { return row_.At(key); }
|
||||
|
||||
inline bool ExecutionRow::KeyExists(std::string_view key) const { return row_.KeyExists(key); }
|
||||
|
||||
inline mgp::Map ExecutionRow::Values() const { return mgp::Map(row_); }
|
||||
|
||||
// do not enter
|
||||
namespace detail {
|
||||
inline void AddParamsReturnsToProc(mgp_proc *proc, std::vector<Parameter> ¶meters,
|
||||
|
@ -436,12 +436,12 @@ int main(int argc, char **argv) {
|
||||
|
||||
auto db_acc = dbms_handler.Get();
|
||||
|
||||
memgraph::query::InterpreterContext interpreter_context_(interp_config, &dbms_handler, &repl_state, system,
|
||||
auto *interpreter_context_ =
|
||||
memgraph::query::InterpreterContext::getInstance(interp_config, &dbms_handler, &repl_state, system,
|
||||
#ifdef MG_ENTERPRISE
|
||||
&coordinator_state,
|
||||
#endif
|
||||
auth_handler.get(), auth_checker.get(),
|
||||
&replication_handler);
|
||||
auth_handler.get(), auth_checker.get(), &replication_handler);
|
||||
MG_ASSERT(db_acc, "Failed to access the main database");
|
||||
|
||||
memgraph::query::procedure::gModuleRegistry.SetModulesDirectory(memgraph::flags::ParseQueryModulesDirectory(),
|
||||
@ -454,9 +454,9 @@ int main(int argc, char **argv) {
|
||||
spdlog::info("Running init file...");
|
||||
#ifdef MG_ENTERPRISE
|
||||
if (memgraph::license::global_license_checker.IsEnterpriseValidFast()) {
|
||||
InitFromCypherlFile(interpreter_context_, db_acc, FLAGS_init_file, &audit_log);
|
||||
InitFromCypherlFile(*interpreter_context_, db_acc, FLAGS_init_file, &audit_log);
|
||||
} else {
|
||||
InitFromCypherlFile(interpreter_context_, db_acc, FLAGS_init_file);
|
||||
InitFromCypherlFile(*interpreter_context_, db_acc, FLAGS_init_file);
|
||||
}
|
||||
#else
|
||||
InitFromCypherlFile(interpreter_context_, db_acc, FLAGS_init_file);
|
||||
@ -464,20 +464,20 @@ int main(int argc, char **argv) {
|
||||
}
|
||||
|
||||
#ifdef MG_ENTERPRISE
|
||||
dbms_handler.RestoreTriggers(&interpreter_context_);
|
||||
dbms_handler.RestoreStreams(&interpreter_context_);
|
||||
dbms_handler.RestoreTriggers(interpreter_context_);
|
||||
dbms_handler.RestoreStreams(interpreter_context_);
|
||||
#else
|
||||
{
|
||||
// Triggers can execute query procedures, so we need to reload the modules first and then
|
||||
// the triggers
|
||||
auto storage_accessor = db_acc->Access();
|
||||
auto dba = memgraph::query::DbAccessor{storage_accessor.get()};
|
||||
db_acc->trigger_store()->RestoreTriggers(&interpreter_context_.ast_cache, &dba, interpreter_context_.config.query,
|
||||
db_acc->trigger_store()->RestoreTriggers(interpreter_context_.ast_cache, &dba, interpreter_context_.config.query,
|
||||
interpreter_context_.auth_checker);
|
||||
}
|
||||
|
||||
// As the Stream transformations are using modules, they have to be restored after the query modules are loaded.
|
||||
db_acc->streams()->RestoreStreams(db_acc, &interpreter_context_);
|
||||
db_acc->streams()->RestoreStreams(db_acc, interpreter_context_);
|
||||
#endif
|
||||
|
||||
ServerContext context;
|
||||
@ -493,9 +493,9 @@ int main(int argc, char **argv) {
|
||||
auto server_endpoint = memgraph::communication::v2::ServerEndpoint{
|
||||
boost::asio::ip::address::from_string(FLAGS_bolt_address), static_cast<uint16_t>(FLAGS_bolt_port)};
|
||||
#ifdef MG_ENTERPRISE
|
||||
Context session_context{&interpreter_context_, &auth_, &audit_log};
|
||||
Context session_context{interpreter_context_, &auth_, &audit_log};
|
||||
#else
|
||||
Context session_context{&interpreter_context_, &auth_};
|
||||
Context session_context{interpreter_context_, &auth_};
|
||||
#endif
|
||||
memgraph::glue::ServerT server(server_endpoint, &session_context, &context, FLAGS_bolt_session_inactivity_timeout,
|
||||
service_name, FLAGS_bolt_num_workers);
|
||||
@ -540,14 +540,14 @@ int main(int argc, char **argv) {
|
||||
#ifdef MG_ENTERPRISE
|
||||
&metrics_server,
|
||||
#endif
|
||||
&websocket_server, &server, &interpreter_context_] {
|
||||
&websocket_server, &server, interpreter_context_] {
|
||||
// Server needs to be shutdown first and then the database. This prevents
|
||||
// a race condition when a transaction is accepted during server shutdown.
|
||||
server.Shutdown();
|
||||
// After the server is notified to stop accepting and processing
|
||||
// connections we tell the execution engine to stop processing all pending
|
||||
// queries.
|
||||
interpreter_context_.Shutdown();
|
||||
interpreter_context_->Shutdown();
|
||||
websocket_server.Shutdown();
|
||||
#ifdef MG_ENTERPRISE
|
||||
metrics_server.Shutdown();
|
||||
@ -575,12 +575,12 @@ int main(int argc, char **argv) {
|
||||
MG_ASSERT(db_acc, "Failed to gain access to the main database");
|
||||
#ifdef MG_ENTERPRISE
|
||||
if (memgraph::license::global_license_checker.IsEnterpriseValidFast()) {
|
||||
InitFromCypherlFile(interpreter_context_, db_acc, FLAGS_init_data_file, &audit_log);
|
||||
InitFromCypherlFile(*interpreter_context_, db_acc, FLAGS_init_data_file, &audit_log);
|
||||
} else {
|
||||
InitFromCypherlFile(interpreter_context_, db_acc, FLAGS_init_data_file);
|
||||
InitFromCypherlFile(*interpreter_context_, db_acc, FLAGS_init_data_file);
|
||||
}
|
||||
#else
|
||||
InitFromCypherlFile(interpreter_context_, db_acc, FLAGS_init_data_file);
|
||||
InitFromCypherlFile(*interpreter_context_, db_acc, FLAGS_init_data_file);
|
||||
#endif
|
||||
}
|
||||
|
||||
|
@ -72,6 +72,12 @@ inline std::vector<storage::LabelId> NamesToLabels(const std::vector<std::string
|
||||
return labels;
|
||||
}
|
||||
|
||||
struct UserExecutionContextInfo {
|
||||
enum class UserMode { NONE, USER, ROLE };
|
||||
UserMode mode;
|
||||
std::string name;
|
||||
};
|
||||
|
||||
struct ExecutionContext {
|
||||
DbAccessor *db_accessor{nullptr};
|
||||
SymbolTable symbol_table;
|
||||
@ -86,6 +92,7 @@ struct ExecutionContext {
|
||||
TriggerContextCollector *trigger_context_collector{nullptr};
|
||||
FrameChangeCollector *frame_change_collector{nullptr};
|
||||
std::shared_ptr<utils::AsyncTimer> timer;
|
||||
std::shared_ptr<QueryUserOrRole> user_or_role;
|
||||
#ifdef MG_ENTERPRISE
|
||||
std::unique_ptr<FineGrainedAuthChecker> auth_checker{nullptr};
|
||||
#endif
|
||||
|
@ -1754,10 +1754,12 @@ PullPlan::PullPlan(const std::shared_ptr<PlanWrapper> plan, const Parameters &pa
|
||||
ctx_.evaluation_context.parameters = parameters;
|
||||
ctx_.evaluation_context.properties = NamesToProperties(plan->ast_storage().properties_, dba);
|
||||
ctx_.evaluation_context.labels = NamesToLabels(plan->ast_storage().labels_, dba);
|
||||
ctx_.user_or_role = user_or_role;
|
||||
|
||||
#ifdef MG_ENTERPRISE
|
||||
if (license::global_license_checker.IsEnterpriseValidFast() && user_or_role && *user_or_role && dba) {
|
||||
// Create only if an explicit user is defined
|
||||
auto auth_checker = interpreter_context->auth_checker->GetFineGrainedAuthChecker(std::move(user_or_role), dba);
|
||||
auto auth_checker = interpreter_context->auth_checker->GetFineGrainedAuthChecker(user_or_role, dba);
|
||||
|
||||
// if the user has global privileges to read, edit and write anything, we don't need to perform authorization
|
||||
// otherwise, we do assign the auth checker to check for label access control
|
||||
|
@ -15,6 +15,8 @@
|
||||
#include "system/include/system/system.hpp"
|
||||
namespace memgraph::query {
|
||||
|
||||
InterpreterContext *InterpreterContext::instance = nullptr;
|
||||
|
||||
InterpreterContext::InterpreterContext(InterpreterConfig interpreter_config, dbms::DbmsHandler *dbms_handler,
|
||||
replication::ReplicationState *rs, memgraph::system::System &system,
|
||||
#ifdef MG_ENTERPRISE
|
||||
|
@ -27,6 +27,7 @@
|
||||
#include "storage/v2/transaction.hpp"
|
||||
#include "system/state.hpp"
|
||||
#include "system/system.hpp"
|
||||
#include "utils/exceptions.hpp"
|
||||
#include "utils/gatekeeper.hpp"
|
||||
#include "utils/skip_list.hpp"
|
||||
#include "utils/spin_lock.hpp"
|
||||
@ -54,13 +55,30 @@ struct QueryUserOrRole;
|
||||
*
|
||||
*/
|
||||
struct InterpreterContext {
|
||||
InterpreterContext(InterpreterConfig interpreter_config, dbms::DbmsHandler *dbms_handler,
|
||||
static InterpreterContext *instance;
|
||||
|
||||
static InterpreterContext *getInstance() {
|
||||
MG_ASSERT(instance != nullptr, "Interpreter context has not been initialized!");
|
||||
return instance;
|
||||
}
|
||||
|
||||
static InterpreterContext *getInstance(InterpreterConfig interpreter_config, dbms::DbmsHandler *dbms_handler,
|
||||
replication::ReplicationState *rs, memgraph::system::System &system,
|
||||
#ifdef MG_ENTERPRISE
|
||||
memgraph::coordination::CoordinatorState *coordinator_state,
|
||||
#endif
|
||||
AuthQueryHandler *ah = nullptr, AuthChecker *ac = nullptr,
|
||||
ReplicationQueryHandler *replication_handler = nullptr);
|
||||
ReplicationQueryHandler *replication_handler = nullptr) {
|
||||
if (instance == nullptr) {
|
||||
instance = new InterpreterContext(interpreter_config, dbms_handler, rs, system,
|
||||
#ifdef MG_ENTERPRISE
|
||||
coordinator_state,
|
||||
#endif
|
||||
ah, ac, replication_handler);
|
||||
}
|
||||
|
||||
return instance;
|
||||
}
|
||||
|
||||
memgraph::dbms::DbmsHandler *dbms_handler;
|
||||
|
||||
@ -98,6 +116,14 @@ struct InterpreterContext {
|
||||
std::vector<std::vector<TypedValue>> TerminateTransactions(
|
||||
std::vector<std::string> maybe_kill_transaction_ids, QueryUserOrRole *user_or_role,
|
||||
std::function<bool(QueryUserOrRole *, std::string const &)> privilege_checker);
|
||||
};
|
||||
|
||||
private:
|
||||
InterpreterContext(InterpreterConfig interpreter_config, dbms::DbmsHandler *dbms_handler,
|
||||
replication::ReplicationState *rs, memgraph::system::System &system,
|
||||
#ifdef MG_ENTERPRISE
|
||||
memgraph::coordination::CoordinatorState *coordinator_state,
|
||||
#endif
|
||||
AuthQueryHandler *ah = nullptr, AuthChecker *ac = nullptr,
|
||||
ReplicationQueryHandler *replication_handler = nullptr);
|
||||
};
|
||||
} // namespace memgraph::query
|
||||
|
@ -23,11 +23,14 @@
|
||||
#include <utility>
|
||||
#include <variant>
|
||||
|
||||
#include "glue/auth.hpp"
|
||||
#include "license/license.hpp"
|
||||
#include "mg_procedure.h"
|
||||
#include "module.hpp"
|
||||
#include "query/db_accessor.hpp"
|
||||
#include "query/frontend/ast/ast.hpp"
|
||||
#include "query/interpreter.hpp"
|
||||
#include "query/interpreter_context.hpp"
|
||||
#include "query/procedure/cypher_types.hpp"
|
||||
#include "query/procedure/fmt.hpp"
|
||||
#include "query/procedure/mg_procedure_helpers.hpp"
|
||||
@ -4021,3 +4024,131 @@ mgp_error mgp_untrack_current_thread_allocations(mgp_graph *graph) {
|
||||
std::visit([](auto *db_accessor) -> void { db_accessor->UntrackCurrentThreadAllocations(); }, graph->impl);
|
||||
});
|
||||
}
|
||||
|
||||
mgp_execution_headers::mgp_execution_headers(memgraph::utils::pmr::vector<memgraph::utils::pmr::string> &&storage)
|
||||
: headers(std::move(storage)){};
|
||||
|
||||
mgp_error mgp_execution_headers_size(mgp_execution_headers *headers, size_t *result) {
|
||||
static_assert(noexcept(headers->headers.size()));
|
||||
*result = headers->headers.size();
|
||||
return mgp_error::MGP_ERROR_NO_ERROR;
|
||||
}
|
||||
|
||||
mgp_error mgp_execution_headers_at(mgp_execution_headers *headers, size_t index, const char **result) {
|
||||
return WrapExceptions(
|
||||
[headers, index] {
|
||||
if (index >= Call<size_t>(mgp_execution_headers_size, headers)) {
|
||||
throw std::out_of_range("Header cannot be retrieved, because index exceeds headers' size!");
|
||||
}
|
||||
return headers->headers[index].data();
|
||||
},
|
||||
result);
|
||||
}
|
||||
|
||||
mgp_execution_rows::mgp_execution_rows(
|
||||
memgraph::utils::pmr::vector<memgraph::utils::pmr::vector<memgraph::query::TypedValue>> &&tv_rows)
|
||||
: rows(std::move(tv_rows)) {}
|
||||
|
||||
struct MgProcedureResultStream final {
|
||||
explicit MgProcedureResultStream(mgp_memory *memory) : rows(memory->impl), memory(memory) {}
|
||||
using Row = std::vector<memgraph::query::TypedValue>;
|
||||
using Rows = std::vector<Row>;
|
||||
using PmrRow = memgraph::utils::pmr::vector<memgraph::query::TypedValue>;
|
||||
using PmrRows = memgraph::utils::pmr::vector<PmrRow>;
|
||||
|
||||
PmrRows rows;
|
||||
mgp_memory *memory;
|
||||
|
||||
void Result(const Row &row) {
|
||||
PmrRow pmr_row(memory->impl);
|
||||
for (auto &val : row) {
|
||||
pmr_row.emplace_back(std::move(val));
|
||||
}
|
||||
|
||||
rows.emplace_back(std::move(pmr_row));
|
||||
}
|
||||
};
|
||||
|
||||
std::map<std::string, memgraph::storage::PropertyValue> CreateQueryParams(mgp_map *params) {
|
||||
std::map<std::string, memgraph::storage::PropertyValue> query_params;
|
||||
for (auto &[k, v] : params->items) {
|
||||
query_params.emplace(k, ToPropertyValue(v));
|
||||
}
|
||||
|
||||
return query_params;
|
||||
}
|
||||
|
||||
struct mgp_execution_result::pImplMgpExecutionResult {
|
||||
std::unique_ptr<memgraph::query::Interpreter> interpreter;
|
||||
std::unique_ptr<mgp_execution_headers> headers;
|
||||
};
|
||||
|
||||
mgp_execution_result::mgp_execution_result() : pImpl(std::make_unique<pImplMgpExecutionResult>()) {
|
||||
auto *instance = memgraph::query::InterpreterContext::getInstance();
|
||||
pImpl->interpreter = std::make_unique<memgraph::query::Interpreter>(instance, instance->dbms_handler->Get());
|
||||
}
|
||||
|
||||
mgp_execution_result::~mgp_execution_result() {
|
||||
auto *instance = memgraph::query::InterpreterContext::getInstance();
|
||||
instance->interpreters.WithLock([this](auto &interpreters) { interpreters.erase(pImpl->interpreter.get()); });
|
||||
// interpreter will delete itself because it's a smart pointer
|
||||
}
|
||||
|
||||
mgp_error mgp_execute_query(mgp_graph *graph, mgp_memory *memory, const char *query, mgp_map *params,
|
||||
mgp_execution_result **result) {
|
||||
return WrapExceptions(
|
||||
[query, params, graph, memory]() {
|
||||
auto query_string = std::string(query);
|
||||
auto *instance = memgraph::query::InterpreterContext::getInstance();
|
||||
|
||||
mgp_execution_result *result = NewRawMgpObject<mgp_execution_result>(memory->impl);
|
||||
result->pImpl->interpreter->SetUser(graph->ctx->user_or_role);
|
||||
|
||||
instance->interpreters.WithLock(
|
||||
[result](auto &interpreters) { interpreters.insert(result->pImpl->interpreter.get()); });
|
||||
|
||||
const auto query_params = CreateQueryParams(params);
|
||||
|
||||
auto prepare_query_result = result->pImpl->interpreter->Prepare(query_string, query_params, {});
|
||||
|
||||
memgraph::utils::pmr::vector<memgraph::utils::pmr::string> headers(memory->impl);
|
||||
for (auto header : prepare_query_result.headers) {
|
||||
headers.emplace_back(header);
|
||||
}
|
||||
result->pImpl->headers = std::make_unique<mgp_execution_headers>(std::move(headers));
|
||||
|
||||
return result;
|
||||
},
|
||||
result);
|
||||
}
|
||||
|
||||
mgp_error mgp_fetch_execution_headers(mgp_execution_result *exec_result, mgp_execution_headers **result) {
|
||||
return WrapExceptions([exec_result]() { return exec_result->pImpl->headers.get(); }, result);
|
||||
}
|
||||
|
||||
mgp_error mgp_pull_one(mgp_execution_result *exec_result, mgp_graph *graph, mgp_memory *memory, mgp_map **result) {
|
||||
return WrapExceptions(
|
||||
[exec_result, graph, memory]() -> mgp_map * {
|
||||
MgProcedureResultStream stream(memory);
|
||||
|
||||
try {
|
||||
exec_result->pImpl->interpreter->Pull(&stream, 1, {});
|
||||
} catch (const std::exception &e) {
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
if (stream.rows.empty()) {
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
const size_t headers_size = exec_result->pImpl->headers->headers.size();
|
||||
memgraph::utils::pmr::map<memgraph::utils::pmr::string, mgp_value> items(memory->impl);
|
||||
for (size_t idx = 0; idx < headers_size; idx++) {
|
||||
items.emplace(exec_result->pImpl->headers->headers[idx],
|
||||
mgp_value{std::move(stream.rows[0][idx]), graph, memory->impl});
|
||||
}
|
||||
|
||||
return NewRawMgpObject<mgp_map>(memory->impl, std::move(items));
|
||||
},
|
||||
result);
|
||||
}
|
||||
|
@ -1,4 +1,4 @@
|
||||
// Copyright 2023 Memgraph Ltd.
|
||||
// Copyright 2024 Memgraph Ltd.
|
||||
//
|
||||
// Use of this software is governed by the Business Source License
|
||||
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
|
||||
@ -16,6 +16,7 @@
|
||||
|
||||
#include "mg_procedure.h"
|
||||
|
||||
#include <memory>
|
||||
#include <optional>
|
||||
#include <ostream>
|
||||
|
||||
@ -24,6 +25,7 @@
|
||||
#include "query/context.hpp"
|
||||
#include "query/db_accessor.hpp"
|
||||
#include "query/frontend/ast/ast.hpp"
|
||||
|
||||
#include "query/procedure/cypher_type_ptr.hpp"
|
||||
#include "query/typed_value.hpp"
|
||||
#include "storage/v2/view.hpp"
|
||||
@ -33,6 +35,7 @@
|
||||
#include "utils/pmr/vector.hpp"
|
||||
#include "utils/temporal.hpp"
|
||||
#include "utils/variant_helpers.hpp"
|
||||
|
||||
/// Wraps memory resource used in custom procedures.
|
||||
///
|
||||
/// This should have been `using mgp_memory = memgraph::utils::MemoryResource`, but that's
|
||||
@ -993,3 +996,29 @@ struct mgp_messages {
|
||||
bool ContainsDeleted(const mgp_value *val);
|
||||
|
||||
memgraph::query::TypedValue ToTypedValue(const mgp_value &val, memgraph::utils::MemoryResource *memory);
|
||||
|
||||
struct mgp_execution_headers {
|
||||
using allocator_type = memgraph::utils::Allocator<mgp_execution_headers>;
|
||||
using storage_type = memgraph::utils::pmr::vector<memgraph::utils::pmr::string>;
|
||||
explicit mgp_execution_headers(storage_type &&storage);
|
||||
|
||||
~mgp_execution_headers() = default;
|
||||
|
||||
storage_type headers;
|
||||
};
|
||||
|
||||
struct mgp_execution_rows {
|
||||
explicit mgp_execution_rows(
|
||||
memgraph::utils::pmr::vector<memgraph::utils::pmr::vector<memgraph::query::TypedValue>> &&tv_rows);
|
||||
~mgp_execution_rows() = default;
|
||||
|
||||
memgraph::utils::pmr::vector<memgraph::utils::pmr::vector<memgraph::query::TypedValue>> rows;
|
||||
};
|
||||
|
||||
struct mgp_execution_result {
|
||||
explicit mgp_execution_result();
|
||||
~mgp_execution_result();
|
||||
|
||||
struct pImplMgpExecutionResult;
|
||||
std::unique_ptr<pImplMgpExecutionResult> pImpl;
|
||||
};
|
||||
|
Loading…
Reference in New Issue
Block a user