Extra procedures transformation info (#310)

This commit is contained in:
Antonio Andelic 2022-01-13 13:46:32 +01:00 committed by GitHub
parent fec887a67c
commit 7deac4ac8b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 207 additions and 111 deletions

View File

@ -30,6 +30,7 @@ set(mg_query_sources
plan/rule_based_planner.cpp
plan/variable_start_planner.cpp
procedure/mg_procedure_impl.cpp
procedure/mg_procedure_helpers.cpp
procedure/module.cpp
procedure/py_module.cpp
serialization/property_value.cpp

View File

@ -0,0 +1,36 @@
// Copyright 2021 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#include "query/procedure/mg_procedure_helpers.hpp"
namespace query::procedure {
MgpUniquePtr<mgp_value> GetStringValueOrSetError(const char *string, mgp_memory *memory, mgp_result *result) {
procedure::MgpUniquePtr<mgp_value> value{nullptr, mgp_value_destroy};
const auto success =
TryOrSetError([&] { return procedure::CreateMgpObject(value, mgp_value_make_string, string, memory); }, result);
if (!success) {
value.reset();
}
return value;
}
bool InsertResultOrSetError(mgp_result *result, mgp_result_record *record, const char *result_name, mgp_value *value) {
if (const auto err = mgp_result_record_insert(record, result_name, value); err != MGP_ERROR_NO_ERROR) {
const auto error_msg = fmt::format("Unable to set the result for {}, error = {}", result_name, err);
static_cast<void>(mgp_result_set_error_msg(result, error_msg.c_str()));
return false;
}
return true;
}
} // namespace query::procedure

View File

@ -15,6 +15,8 @@
#include <type_traits>
#include <utility>
#include <fmt/format.h>
#include "mg_procedure.h"
namespace query::procedure {
@ -45,4 +47,23 @@ mgp_error CreateMgpObject(MgpUniquePtr<TObj> &obj, TFunc func, TArgs &&...args)
obj.reset(raw_obj);
return err;
}
template <typename Fun>
[[nodiscard]] bool TryOrSetError(Fun &&func, mgp_result *result) {
if (const auto err = func(); err == MGP_ERROR_UNABLE_TO_ALLOCATE) {
static_cast<void>(mgp_result_set_error_msg(result, "Not enough memory!"));
return false;
} else if (err != MGP_ERROR_NO_ERROR) {
const auto error_msg = fmt::format("Unexpected error ({})!", err);
static_cast<void>(mgp_result_set_error_msg(result, error_msg.c_str()));
return false;
}
return true;
}
[[nodiscard]] MgpUniquePtr<mgp_value> GetStringValueOrSetError(const char *string, mgp_memory *memory,
mgp_result *result);
[[nodiscard]] bool InsertResultOrSetError(mgp_result *result, mgp_result_record *record, const char *result_name,
mgp_value *value);
} // namespace query::procedure

View File

@ -18,7 +18,9 @@ extern "C" {
#include <optional>
#include "fmt/format.h"
#include <fmt/format.h>
#include <unistd.h>
#include "py/py.hpp"
#include "query/procedure/mg_procedure_helpers.hpp"
#include "query/procedure/py_module.hpp"
@ -53,6 +55,8 @@ class BuiltinModule final : public Module {
void AddTransformation(std::string_view name, mgp_trans trans);
std::optional<std::filesystem::path> Path() const override { return std::nullopt; }
private:
/// Registered procedures
std::map<std::string, mgp_proc, std::less<>> procedures_;
@ -133,6 +137,20 @@ void RegisterMgLoad(ModuleRegistry *module_registry, utils::RWLock *lock, Builti
module->AddProcedure("load", std::move(load));
}
namespace {
[[nodiscard]] bool IsFileEditable(const std::optional<std::filesystem::path> &path) {
return path && access(path->c_str(), W_OK) == 0;
}
std::string GetPathString(const std::optional<std::filesystem::path> &path) {
if (!path) {
return "builtin";
}
return std::filesystem::canonical(*path).generic_string();
}
} // namespace
void RegisterMgProcedures(
// We expect modules to be sorted by name.
const std::map<std::string, std::unique_ptr<Module>, std::less<>> *all_modules, BuiltinModule *module) {
@ -147,57 +165,80 @@ void RegisterMgProcedures(
static_assert(
std::is_same_v<decltype(module->Procedures()), const std::map<std::string, mgp_proc, std::less<>> *>,
"Expected module procedures to be sorted by name");
const auto path = module->Path();
const auto path_string = GetPathString(path);
const auto is_editable = IsFileEditable(path);
for (const auto &[proc_name, proc] : *module->Procedures()) {
mgp_result_record *record{nullptr};
if (const auto err = mgp_result_new_record(result, &record); err == MGP_ERROR_UNABLE_TO_ALLOCATE) {
static_cast<void>(mgp_result_set_error_msg(result, "Not enough memory!"));
return;
} else if (err != MGP_ERROR_NO_ERROR) {
static_cast<void>(mgp_result_set_error_msg(result, "Unexpected error"));
{
const auto success = TryOrSetError([&] { return mgp_result_new_record(result, &record); }, result);
if (!success) {
return;
}
}
const auto path_value = GetStringValueOrSetError(path_string.c_str(), memory, result);
if (!path_value) {
return;
}
MgpUniquePtr<mgp_value> is_editable_value{nullptr, mgp_value_destroy};
{
const auto success = TryOrSetError(
[&] { return CreateMgpObject(is_editable_value, mgp_value_make_bool, is_editable, memory); }, result);
if (!success) {
return;
}
}
utils::pmr::string full_name(module_name, memory->impl);
full_name.append(1, '.');
full_name.append(proc_name);
MgpUniquePtr<mgp_value> name_value{nullptr, mgp_value_destroy};
if (const auto err = CreateMgpObject(name_value, mgp_value_make_string, full_name.c_str(), memory);
err == MGP_ERROR_UNABLE_TO_ALLOCATE) {
static_cast<void>(mgp_result_set_error_msg(result, "Not enough memory!"));
return;
} else if (err != MGP_ERROR_NO_ERROR) {
static_cast<void>(mgp_result_set_error_msg(result, "Unexpected error"));
const auto name_value = GetStringValueOrSetError(full_name.c_str(), memory, result);
if (!name_value) {
return;
}
std::stringstream ss;
ss << module_name << ".";
PrintProcSignature(proc, &ss);
const auto signature = ss.str();
MgpUniquePtr<mgp_value> signature_value{nullptr, mgp_value_destroy};
if (const auto err = CreateMgpObject(signature_value, mgp_value_make_string, signature.c_str(), memory);
err == MGP_ERROR_UNABLE_TO_ALLOCATE) {
static_cast<void>(mgp_result_set_error_msg(result, "Not enough memory!"));
return;
} else if (err != MGP_ERROR_NO_ERROR) {
static_cast<void>(mgp_result_set_error_msg(result, "Unexpected error"));
const auto signature_value = GetStringValueOrSetError(signature.c_str(), memory, result);
if (!signature_value) {
return;
}
MgpUniquePtr<mgp_value> is_write_value{nullptr, mgp_value_destroy};
if (const auto err =
CreateMgpObject(is_write_value, mgp_value_make_bool, proc.is_write_procedure ? 1 : 0, memory);
err == MGP_ERROR_UNABLE_TO_ALLOCATE) {
static_cast<void>(mgp_result_set_error_msg(result, "Not enough memory!"));
return;
} else if (err != MGP_ERROR_NO_ERROR) {
static_cast<void>(mgp_result_set_error_msg(result, "Unexpected error"));
MgpUniquePtr<mgp_value> is_write_value{nullptr, mgp_value_destroy};
{
const auto success = TryOrSetError(
[&, &proc = proc] {
return CreateMgpObject(is_write_value, mgp_value_make_bool, proc.is_write_procedure ? 1 : 0, memory);
},
result);
if (!success) {
return;
}
}
if (!InsertResultOrSetError(result, record, "name", name_value.get())) {
return;
}
const auto err1 = mgp_result_record_insert(record, "name", name_value.get());
const auto err2 = mgp_result_record_insert(record, "signature", signature_value.get());
const auto err3 = mgp_result_record_insert(record, "is_write", is_write_value.get());
if (err1 != MGP_ERROR_NO_ERROR || err2 != MGP_ERROR_NO_ERROR || err3 != MGP_ERROR_NO_ERROR) {
static_cast<void>(mgp_result_set_error_msg(result, "Unable to set the result!"));
if (!InsertResultOrSetError(result, record, "signature", signature_value.get())) {
return;
}
if (!InsertResultOrSetError(result, record, "is_write", is_write_value.get())) {
return;
}
if (!InsertResultOrSetError(result, record, "path", path_value.get())) {
return;
}
if (!InsertResultOrSetError(result, record, "is_editable", is_editable_value.get())) {
return;
}
}
@ -207,6 +248,8 @@ void RegisterMgProcedures(
MG_ASSERT(mgp_proc_add_result(&procedures, "name", Call<mgp_type *>(mgp_type_string)) == MGP_ERROR_NO_ERROR);
MG_ASSERT(mgp_proc_add_result(&procedures, "signature", Call<mgp_type *>(mgp_type_string)) == MGP_ERROR_NO_ERROR);
MG_ASSERT(mgp_proc_add_result(&procedures, "is_write", Call<mgp_type *>(mgp_type_bool)) == MGP_ERROR_NO_ERROR);
MG_ASSERT(mgp_proc_add_result(&procedures, "path", Call<mgp_type *>(mgp_type_string)) == MGP_ERROR_NO_ERROR);
MG_ASSERT(mgp_proc_add_result(&procedures, "is_editable", Call<mgp_type *>(mgp_type_bool)) == MGP_ERROR_NO_ERROR);
module->AddProcedure("procedures", std::move(procedures));
}
@ -219,32 +262,52 @@ void RegisterMgTransformations(const std::map<std::string, std::unique_ptr<Modul
static_assert(
std::is_same_v<decltype(module->Transformations()), const std::map<std::string, mgp_trans, std::less<>> *>,
"Expected module transformations to be sorted by name");
const auto path = module->Path();
const auto path_string = GetPathString(path);
const auto is_editable = IsFileEditable(path);
for (const auto &[trans_name, proc] : *module->Transformations()) {
mgp_result_record *record{nullptr};
if (const auto err = mgp_result_new_record(result, &record); err == MGP_ERROR_UNABLE_TO_ALLOCATE) {
static_cast<void>(mgp_result_set_error_msg(result, "Not enough memory!"));
return;
} else if (err != MGP_ERROR_NO_ERROR) {
static_cast<void>(mgp_result_set_error_msg(result, "Unexpected error"));
{
const auto success = TryOrSetError([&] { return mgp_result_new_record(result, &record); }, result);
if (!success) {
return;
}
}
const auto path_value = GetStringValueOrSetError(path_string.c_str(), memory, result);
if (!path_value) {
return;
}
MgpUniquePtr<mgp_value> is_editable_value{nullptr, mgp_value_destroy};
{
const auto success = TryOrSetError(
[&] { return CreateMgpObject(is_editable_value, mgp_value_make_bool, is_editable, memory); }, result);
if (!success) {
return;
}
}
utils::pmr::string full_name(module_name, memory->impl);
full_name.append(1, '.');
full_name.append(trans_name);
MgpUniquePtr<mgp_value> name_value{nullptr, mgp_value_destroy};
if (const auto err = CreateMgpObject(name_value, mgp_value_make_string, full_name.c_str(), memory);
err == MGP_ERROR_UNABLE_TO_ALLOCATE) {
static_cast<void>(mgp_result_set_error_msg(result, "Not enough memory!"));
return;
} else if (err != MGP_ERROR_NO_ERROR) {
static_cast<void>(mgp_result_set_error_msg(result, "Unexpected error"));
const auto name_value = GetStringValueOrSetError(full_name.c_str(), memory, result);
if (!name_value) {
return;
}
if (const auto err = mgp_result_record_insert(record, "name", name_value.get()); err != MGP_ERROR_NO_ERROR) {
static_cast<void>(mgp_result_set_error_msg(result, "Unable to set the result!"));
if (!InsertResultOrSetError(result, record, "name", name_value.get())) {
return;
}
if (!InsertResultOrSetError(result, record, "path", path_value.get())) {
return;
}
if (!InsertResultOrSetError(result, record, "is_editable", is_editable_value.get())) {
return;
}
}
@ -252,6 +315,8 @@ void RegisterMgTransformations(const std::map<std::string, std::unique_ptr<Modul
};
mgp_proc procedures("transformations", transformations_cb, utils::NewDeleteResource(), false);
MG_ASSERT(mgp_proc_add_result(&procedures, "name", Call<mgp_type *>(mgp_type_string)) == MGP_ERROR_NO_ERROR);
MG_ASSERT(mgp_proc_add_result(&procedures, "path", Call<mgp_type *>(mgp_type_string)) == MGP_ERROR_NO_ERROR);
MG_ASSERT(mgp_proc_add_result(&procedures, "is_editable", Call<mgp_type *>(mgp_type_bool)) == MGP_ERROR_NO_ERROR);
module->AddProcedure("transformations", std::move(procedures));
}
@ -297,6 +362,8 @@ class SharedLibraryModule final : public Module {
const std::map<std::string, mgp_trans, std::less<>> *Transformations() const override;
std::optional<std::filesystem::path> Path() const override { return file_path_; }
private:
/// Path as requested for loading the module from a library.
std::filesystem::path file_path_;
@ -424,6 +491,7 @@ class PythonModule final : public Module {
const std::map<std::string, mgp_proc, std::less<>> *Procedures() const override;
const std::map<std::string, mgp_trans, std::less<>> *Transformations() const override;
std::optional<std::filesystem::path> Path() const override { return file_path_; }
private:
std::filesystem::path file_path_;
@ -677,9 +745,9 @@ template <typename T>
concept ModuleProperties = utils::SameAsAnyOf<T, mgp_proc, mgp_trans>;
template <ModuleProperties T>
std::optional<std::pair<procedure::ModulePtr, const T *>> MakePairIfPropFound(const ModuleRegistry &module_registry,
std::string_view fully_qualified_name,
utils::MemoryResource *memory) {
std::optional<std::pair<ModulePtr, const T *>> MakePairIfPropFound(const ModuleRegistry &module_registry,
std::string_view fully_qualified_name,
utils::MemoryResource *memory) {
auto prop_fun = [](auto &module) {
if constexpr (std::is_same_v<T, mgp_proc>) {
return module->Procedures();
@ -700,13 +768,13 @@ std::optional<std::pair<procedure::ModulePtr, const T *>> MakePairIfPropFound(co
} // namespace
std::optional<std::pair<procedure::ModulePtr, const mgp_proc *>> FindProcedure(
const ModuleRegistry &module_registry, std::string_view fully_qualified_procedure_name,
utils::MemoryResource *memory) {
std::optional<std::pair<ModulePtr, const mgp_proc *>> FindProcedure(const ModuleRegistry &module_registry,
std::string_view fully_qualified_procedure_name,
utils::MemoryResource *memory) {
return MakePairIfPropFound<mgp_proc>(module_registry, fully_qualified_procedure_name, memory);
}
std::optional<std::pair<procedure::ModulePtr, const mgp_trans *>> FindTransformation(
std::optional<std::pair<ModulePtr, const mgp_trans *>> FindTransformation(
const ModuleRegistry &module_registry, std::string_view fully_qualified_transformation_name,
utils::MemoryResource *memory) {
return MakePairIfPropFound<mgp_trans>(module_registry, fully_qualified_transformation_name, memory);

View File

@ -45,6 +45,8 @@ class Module {
virtual const std::map<std::string, mgp_proc, std::less<>> *Procedures() const = 0;
/// Returns registered transformations of this module
virtual const std::map<std::string, mgp_trans, std::less<>> *Transformations() const = 0;
virtual std::optional<std::filesystem::path> Path() const = 0;
};
/// Proxy for a registered Module, acquires a read lock from ModuleRegistry.

View File

@ -158,43 +158,6 @@ void from_json(const nlohmann::json &data, StreamStatus<TStream> &status) {
from_json(data, status.info);
}
namespace {
template <typename Fun>
[[nodiscard]] bool TryOrSetError(Fun &&func, mgp_result *result) {
if (const auto err = func(); err == MGP_ERROR_UNABLE_TO_ALLOCATE) {
static_cast<void>(mgp_result_set_error_msg(result, "Not enough memory!"));
return false;
} else if (err != MGP_ERROR_NO_ERROR) {
const auto error_msg = fmt::format("Unexpected error ({})!", err);
static_cast<void>(mgp_result_set_error_msg(result, error_msg.c_str()));
return false;
}
return true;
}
[[nodiscard]] auto GetStringValueOrSetError(const char *string, mgp_memory *memory, mgp_result *result) {
procedure::MgpUniquePtr<mgp_value> value{nullptr, mgp_value_destroy};
const auto success =
TryOrSetError([&] { return procedure::CreateMgpObject(value, mgp_value_make_string, string, memory); }, result);
if (!success) {
value.reset();
}
return value;
}
[[nodiscard]] bool InsertResultOrSetError(mgp_result *result, mgp_result_record *record, const auto *result_name,
mgp_value *value) {
if (const auto err = mgp_result_record_insert(record, result_name, value); err != MGP_ERROR_NO_ERROR) {
const auto error_msg = fmt::format("Unable to set the result for {}, error = {}", result_name, err);
static_cast<void>(mgp_result_set_error_msg(result, error_msg.c_str()));
return false;
}
return true;
}
} // namespace
Streams::Streams(InterpreterContext *interpreter_context, std::filesystem::path directory)
: interpreter_context_(interpreter_context), storage_(std::move(directory)) {
RegisterProcedures();
@ -260,20 +223,22 @@ void Streams::RegisterKafkaProcedures() {
const auto info = stream_source_ptr->Info(kafka_stream.transformation_name);
mgp_result_record *record{nullptr};
{
const auto success = TryOrSetError([&] { return mgp_result_new_record(result, &record); }, result);
const auto success =
procedure::TryOrSetError([&] { return mgp_result_new_record(result, &record); }, result);
if (!success) {
return;
}
}
const auto consumer_group_value = GetStringValueOrSetError(info.consumer_group.c_str(), memory, result);
const auto consumer_group_value =
procedure::GetStringValueOrSetError(info.consumer_group.c_str(), memory, result);
if (!consumer_group_value) {
return;
}
procedure::MgpUniquePtr<mgp_list> topic_names{nullptr, mgp_list_destroy};
{
const auto success = TryOrSetError(
const auto success = procedure::TryOrSetError(
[&] {
return procedure::CreateMgpObject(topic_names, mgp_list_make_empty, info.topics.size(), memory);
},
@ -284,7 +249,7 @@ void Streams::RegisterKafkaProcedures() {
}
for (const auto &topic : info.topics) {
auto topic_value = GetStringValueOrSetError(topic.c_str(), memory, result);
auto topic_value = procedure::GetStringValueOrSetError(topic.c_str(), memory, result);
if (!topic_value) {
return;
}
@ -293,7 +258,7 @@ void Streams::RegisterKafkaProcedures() {
procedure::MgpUniquePtr<mgp_value> topics_value{nullptr, mgp_value_destroy};
{
const auto success = TryOrSetError(
const auto success = procedure::TryOrSetError(
[&] {
return procedure::CreateMgpObject(topics_value, mgp_value_make_list, topic_names.release());
},
@ -304,22 +269,22 @@ void Streams::RegisterKafkaProcedures() {
}
const auto bootstrap_servers_value =
GetStringValueOrSetError(info.bootstrap_servers.c_str(), memory, result);
procedure::GetStringValueOrSetError(info.bootstrap_servers.c_str(), memory, result);
if (!bootstrap_servers_value) {
return;
}
if (!InsertResultOrSetError(result, record, consumer_group_result_name.data(),
consumer_group_value.get())) {
if (!procedure::InsertResultOrSetError(result, record, consumer_group_result_name.data(),
consumer_group_value.get())) {
return;
}
if (!InsertResultOrSetError(result, record, topics_result_name.data(), topics_value.get())) {
if (!procedure::InsertResultOrSetError(result, record, topics_result_name.data(), topics_value.get())) {
return;
}
if (!InsertResultOrSetError(result, record, bootstrap_servers_result_name.data(),
bootstrap_servers_value.get())) {
if (!procedure::InsertResultOrSetError(result, record, bootstrap_servers_result_name.data(),
bootstrap_servers_value.get())) {
return;
}
},
@ -363,20 +328,21 @@ void Streams::RegisterPulsarProcedures() {
const auto info = stream_source_ptr->Info(pulsar_stream.transformation_name);
mgp_result_record *record{nullptr};
{
const auto success = TryOrSetError([&] { return mgp_result_new_record(result, &record); }, result);
const auto success =
procedure::TryOrSetError([&] { return mgp_result_new_record(result, &record); }, result);
if (!success) {
return;
}
}
auto service_url_value = GetStringValueOrSetError(info.service_url.c_str(), memory, result);
auto service_url_value = procedure::GetStringValueOrSetError(info.service_url.c_str(), memory, result);
if (!service_url_value) {
return;
}
procedure::MgpUniquePtr<mgp_list> topic_names{nullptr, mgp_list_destroy};
{
const auto success = TryOrSetError(
const auto success = procedure::TryOrSetError(
[&] {
return procedure::CreateMgpObject(topic_names, mgp_list_make_empty, info.topics.size(), memory);
},
@ -387,7 +353,7 @@ void Streams::RegisterPulsarProcedures() {
}
for (const auto &topic : info.topics) {
auto topic_value = GetStringValueOrSetError(topic.c_str(), memory, result);
auto topic_value = procedure::GetStringValueOrSetError(topic.c_str(), memory, result);
if (!topic_value) {
return;
}
@ -396,7 +362,7 @@ void Streams::RegisterPulsarProcedures() {
procedure::MgpUniquePtr<mgp_value> topics_value{nullptr, mgp_value_destroy};
{
const auto success = TryOrSetError(
const auto success = procedure::TryOrSetError(
[&] {
return procedure::CreateMgpObject(topics_value, mgp_value_make_list, topic_names.release());
},
@ -406,11 +372,12 @@ void Streams::RegisterPulsarProcedures() {
}
}
if (!InsertResultOrSetError(result, record, topics_result_name.data(), topics_value.get())) {
if (!procedure::InsertResultOrSetError(result, record, topics_result_name.data(), topics_value.get())) {
return;
}
if (!InsertResultOrSetError(result, record, service_url_result_name.data(), service_url_value.get())) {
if (!procedure::InsertResultOrSetError(result, record, service_url_result_name.data(),
service_url_value.get())) {
return;
}
},

View File

@ -206,6 +206,7 @@ class MockModule : public procedure::Module {
const std::map<std::string, mgp_proc, std::less<>> *Procedures() const override { return &procedures; }
const std::map<std::string, mgp_trans, std::less<>> *Transformations() const override { return &transformations; }
std::optional<std::filesystem::path> Path() const override { return std::nullopt; };
std::map<std::string, mgp_proc, std::less<>> procedures{};
std::map<std::string, mgp_trans, std::less<>> transformations{};
@ -2812,7 +2813,7 @@ TEST_P(CypherMainVisitorTest, CallProcedureYieldAsterisk) {
ASSERT_TRUE(identifier->user_declared_);
identifier_names.push_back(identifier->name_);
}
ASSERT_THAT(identifier_names, UnorderedElementsAre("name", "signature", "is_write"));
ASSERT_THAT(identifier_names, UnorderedElementsAre("name", "signature", "is_write", "path", "is_editable"));
ASSERT_EQ(identifier_names, call_proc->result_fields_);
CheckCallProcedureDefaultMemoryLimit(ast_generator, *call_proc);
}
@ -2837,7 +2838,7 @@ TEST_P(CypherMainVisitorTest, CallProcedureYieldAsteriskReturnAsterisk) {
ASSERT_TRUE(identifier->user_declared_);
identifier_names.push_back(identifier->name_);
}
ASSERT_THAT(identifier_names, UnorderedElementsAre("name", "signature", "is_write"));
ASSERT_THAT(identifier_names, UnorderedElementsAre("name", "signature", "is_write", "path", "is_editable"));
ASSERT_EQ(identifier_names, call_proc->result_fields_);
CheckCallProcedureDefaultMemoryLimit(ast_generator, *call_proc);
}