Introduce mgp_trans api (#175)

* Added public interface for registering mgp_trans and extended modules accordingly

* Added test for mgp_trans

* Added mg.transformations() to the module registry

Co-authored-by: János Benjamin Antal <antaljanosbenjamin@users.noreply.github.com>
This commit is contained in:
Kostas Kyrimis 2021-06-24 11:39:35 +03:00 committed by Antonio Andelic
parent d6a6d280dd
commit d80ff745eb
7 changed files with 232 additions and 35 deletions

View File

@ -840,18 +840,16 @@ size_t mgp_messages_size(const struct mgp_messages *);
/// Return the message from a messages list at given index
const struct mgp_message *mgp_messages_at(const struct mgp_messages *, size_t);
/// General type that models a transformation
// struct mgp_trans;
/// Entry-point for a module transformation, invoked through a stream transformation.
///
/// Passed in arguments will not live longer than the callback's execution.
/// Therefore, you must not store them globally or use the passed in mgp_memory
/// to allocate global resources.
typedef void (*mgp_trans_cb)(const struct mgp_messages *, struct mgp_graph *, struct mgp_result *, struct mgp_memory *);
// TODO @kostasrim
/// General syntax for a transformation callback
// typedef void (*mgp_trans_cb)(const struct mgp_messages, struct mgp_graph *,
// struct mgp_result *, struct mgp_memory*);
// TODO @kostasrim
/// Adds a transformation cb to the module pointed by mgp_module
// struct mgp_trans *mgp_module_add_transformation(struct mgp_module *module, const char *name,
// mgp_trans_cb cb);
/// Adds a transformation cb to the module pointed by mgp_module.
/// Return non-zero if the transformation is added successfully.
int mgp_module_add_transformation(struct mgp_module *module, const char *name, mgp_trans_cb cb);
/// @}
#ifdef __cplusplus

View File

@ -1457,3 +1457,17 @@ size_t mgp_messages_size(const mgp_messages *messages) { return messages->messag
const mgp_message *mgp_messages_at(const mgp_messages *messages, size_t index) {
return index >= mgp_messages_size(messages) ? nullptr : &messages->messages[index];
}
/// Registers the transformation callback `cb` under `name` in `module`.
///
/// Returns 1 when the transformation was added. Returns 0 when `module` or
/// `cb` is null, when `name` is rejected by IsValidIdentifierName, when a
/// transformation with the same name is already registered, or when any step
/// of the registration throws.
int mgp_module_add_transformation(mgp_module *module, const char *name, mgp_trans_cb cb) {
  if (!module || !cb) return 0;
  try {
    // The validation and the duplicate lookup are kept inside the try block so
    // that no exception can leave this C API function; failures are reported
    // through the return value only.
    // NOTE(review): presumably the lookup builds a temporary key string from
    // `name` and can therefore allocate -- confirm against utils::pmr::map.
    if (!IsValidIdentifierName(name)) return 0;
    if (module->transformations.find(name) != module->transformations.end()) return 0;
    auto *memory = module->transformations.get_allocator().GetMemoryResource();
    // May throw std::bad_alloc, std::length_error
    module->transformations.emplace(name, mgp_trans(name, cb, memory));
    return 1;
  } catch (...) {
    return 0;
  }
}

View File

@ -473,14 +473,51 @@ struct mgp_proc {
utils::pmr::map<utils::pmr::string, std::pair<const query::procedure::CypherType *, bool>> results;
};
/// A named transformation: the name it is registered under plus the callback
/// that is invoked for it.
struct mgp_trans {
  using allocator_type = utils::Allocator<mgp_trans>;

  /// Builds a transformation from a plain callback pointer. `name` is copied
  /// into a string that uses `memory`.
  /// @throw std::bad_alloc
  /// @throw std::length_error
  mgp_trans(const char *name, mgp_trans_cb cb, utils::MemoryResource *memory) : name(name, memory), cb(cb) {}

  /// Builds a transformation from an arbitrary callable wrapped in
  /// std::function. `name` is copied into a string that uses `memory`.
  /// The by-value `cb` is moved into the member instead of being copied again.
  /// @throw std::bad_alloc
  /// @throw std::length_error
  mgp_trans(const char *name,
            std::function<void(const mgp_messages *, const mgp_graph *, mgp_result *, mgp_memory *)> cb,
            utils::MemoryResource *memory)
      : name(name, memory), cb(std::move(cb)) {}

  /// Copies `other`, with the name string constructed using `memory`.
  /// @throw std::bad_alloc
  /// @throw std::length_error
  mgp_trans(const mgp_trans &other, utils::MemoryResource *memory) : name(other.name, memory), cb(other.cb) {}

  /// Moves from `other`, with the name string constructed using `memory`.
  mgp_trans(mgp_trans &&other, utils::MemoryResource *memory)
      : name(std::move(other.name), memory), cb(std::move(other.cb)) {}

  mgp_trans(const mgp_trans &other) = default;
  mgp_trans(mgp_trans &&other) = default;

  // Assignment is disabled; instances can only be copy- or move-constructed.
  mgp_trans &operator=(const mgp_trans &) = delete;
  mgp_trans &operator=(mgp_trans &&) = delete;

  ~mgp_trans() = default;

  /// Name of the transformation.
  utils::pmr::string name;
  /// Entry-point for the transformation.
  std::function<void(const mgp_messages *, mgp_graph *, mgp_result *, mgp_memory *)> cb;
};
struct mgp_module {
using allocator_type = utils::Allocator<mgp_module>;
explicit mgp_module(utils::MemoryResource *memory) : procedures(memory) {}
explicit mgp_module(utils::MemoryResource *memory) : procedures(memory), transformations(memory) {}
mgp_module(const mgp_module &other, utils::MemoryResource *memory) : procedures(other.procedures, memory) {}
mgp_module(const mgp_module &other, utils::MemoryResource *memory)
: procedures(other.procedures, memory), transformations(other.transformations, memory) {}
mgp_module(mgp_module &&other, utils::MemoryResource *memory) : procedures(std::move(other.procedures), memory) {}
mgp_module(mgp_module &&other, utils::MemoryResource *memory)
: procedures(std::move(other.procedures), memory), transformations(std::move(other.transformations), memory) {}
mgp_module(const mgp_module &) = default;
mgp_module(mgp_module &&) = default;
@ -491,6 +528,7 @@ struct mgp_module {
~mgp_module() = default;
utils::pmr::map<utils::pmr::string, mgp_proc> procedures;
utils::pmr::map<utils::pmr::string, mgp_trans> transformations;
};
namespace query::procedure {

View File

@ -33,11 +33,16 @@ class BuiltinModule final : public Module {
const std::map<std::string, mgp_proc, std::less<>> *Procedures() const override;
const std::map<std::string, mgp_trans, std::less<>> *Transformations() const override;
void AddProcedure(std::string_view name, mgp_proc proc);
void AddTransformation(std::string_view name, mgp_trans trans);
private:
/// Registered procedures
std::map<std::string, mgp_proc, std::less<>> procedures_;
std::map<std::string, mgp_trans, std::less<>> transformations_;
};
BuiltinModule::BuiltinModule() {}
@ -48,8 +53,16 @@ bool BuiltinModule::Close() { return true; }
const std::map<std::string, mgp_proc, std::less<>> *BuiltinModule::Procedures() const { return &procedures_; }
/// Returns a pointer to the map of transformations registered on this builtin
/// module, keyed by transformation name.
const std::map<std::string, mgp_trans, std::less<>> *BuiltinModule::Transformations() const {
return &transformations_;
}
void BuiltinModule::AddProcedure(std::string_view name, mgp_proc proc) { procedures_.emplace(name, std::move(proc)); }
/// Stores `trans` in this module's transformation map under `name`.
/// `emplace` is used, so an entry that already exists under `name` is kept
/// and `trans` is not inserted.
void BuiltinModule::AddTransformation(std::string_view name, mgp_trans trans) {
transformations_.emplace(name, std::move(trans));
}
namespace {
void RegisterMgLoad(ModuleRegistry *module_registry, utils::RWLock *lock, BuiltinModule *module) {
@ -151,13 +164,49 @@ void RegisterMgProcedures(
module->AddProcedure("procedures", std::move(procedures));
}
void RegisterMgTransformations(const std::map<std::string, std::unique_ptr<Module>, std::less<>> *all_modules,
BuiltinModule *module) {
auto procedures_cb = [all_modules](const mgp_list * /*unused*/, const mgp_graph * /*unused*/, mgp_result *result,
mgp_memory *memory) {
for (const auto &[module_name, module] : *all_modules) {
// Return the results in sorted order by module and by transformation.
static_assert(
std::is_same_v<decltype(module->Transformations()), const std::map<std::string, mgp_trans, std::less<>> *>,
"Expected module transformations to be sorted by name");
for (const auto &[trans_name, proc] : *module->Transformations()) {
auto *record = mgp_result_new_record(result);
if (!record) {
mgp_result_set_error_msg(result, "Not enough memory!");
return;
}
utils::pmr::string full_name(module_name, memory->impl);
full_name.append(1, '.');
full_name.append(trans_name);
auto *name_value = mgp_value_make_string(full_name.c_str(), memory);
if (!name_value) {
mgp_result_set_error_msg(result, "Not enough memory!");
return;
}
int succ = mgp_result_record_insert(record, "name", name_value);
mgp_value_destroy(name_value);
if (!succ) {
mgp_result_set_error_msg(result, "Unable to set the result!");
return;
}
}
}
};
mgp_proc procedures("transformations", procedures_cb, utils::NewDeleteResource());
mgp_proc_add_result(&procedures, "name", mgp_type_string());
module->AddProcedure("transformations", std::move(procedures));
}
// Run `fun` with `mgp_module *` and `mgp_memory *` arguments. If `fun` returned
// a `true` value, store the `mgp_module::procedures` into `proc_map`. The
// return value of WithModuleRegistration is the same as that of `fun`. Note,
// the return value need only be convertible to `bool`, it does not have to be
// `bool` itself.
template <class TProcMap, class TFun>
auto WithModuleRegistration(TProcMap *proc_map, const TFun &fun) {
// a `true` value, store the `mgp_module::procedures` into `proc_map` and the
// `mgp_module::transformations` into `trans_map`. The return value of
// WithModuleRegistration is the same as that of `fun`. Note, the return value
// need only be convertible to `bool`, it does not have to be `bool` itself.
template <class TProcMap, class TTransMap, class TFun>
auto WithModuleRegistration(TProcMap *proc_map, TTransMap *trans_map, const TFun &fun) {
// We probably don't need more than 256KB for module initialization.
constexpr size_t stack_bytes = 256 * 1024;
unsigned char stack_memory[stack_bytes];
@ -165,9 +214,12 @@ auto WithModuleRegistration(TProcMap *proc_map, const TFun &fun) {
mgp_memory memory{&monotonic_memory};
mgp_module module_def{memory.impl};
auto res = fun(&module_def, &memory);
if (res)
if (res) {
// Copy procedures into resulting proc_map.
for (const auto &proc : module_def.procedures) proc_map->emplace(proc);
// Copy transformations into resulting trans_map.
for (const auto &trans : module_def.transformations) trans_map->emplace(trans);
}
return res;
}
@ -188,6 +240,8 @@ class SharedLibraryModule final : public Module {
const std::map<std::string, mgp_proc, std::less<>> *Procedures() const override;
const std::map<std::string, mgp_trans, std::less<>> *Transformations() const override;
private:
/// Path as requested for loading the module from a library.
std::filesystem::path file_path_;
@ -199,6 +253,8 @@ class SharedLibraryModule final : public Module {
std::function<int()> shutdown_fn_;
/// Registered procedures
std::map<std::string, mgp_proc, std::less<>> procedures_;
/// Registered transformations
std::map<std::string, mgp_trans, std::less<>> transformations_;
};
SharedLibraryModule::SharedLibraryModule() : handle_(nullptr) {}
@ -226,7 +282,7 @@ bool SharedLibraryModule::Load(const std::filesystem::path &file_path) {
handle_ = nullptr;
return false;
}
if (!WithModuleRegistration(&procedures_, [&](auto *module_def, auto *memory) {
if (!WithModuleRegistration(&procedures_, &transformations_, [&](auto *module_def, auto *memory) {
// Run mgp_init_module which must succeed.
int init_res = init_fn_(module_def, memory);
if (init_res != 0) {
@ -274,6 +330,13 @@ const std::map<std::string, mgp_proc, std::less<>> *SharedLibraryModule::Procedu
return &procedures_;
}
/// Returns a pointer to the transformations registered by this shared library
/// module. Asserts that the library handle is set.
const std::map<std::string, mgp_trans, std::less<>> *SharedLibraryModule::Transformations() const {
  // The message names transformations (not procedures) so that a failed
  // assertion points at the accessor that was actually called.
  MG_ASSERT(handle_,
            "Attempting to access transformations of a module that has not "
            "been loaded...");
  return &transformations_;
}
class PythonModule final : public Module {
public:
PythonModule();
@ -288,11 +351,13 @@ class PythonModule final : public Module {
bool Close() override;
const std::map<std::string, mgp_proc, std::less<>> *Procedures() const override;
const std::map<std::string, mgp_trans, std::less<>> *Transformations() const override;
private:
std::filesystem::path file_path_;
py::Object py_module_;
std::map<std::string, mgp_proc, std::less<>> procedures_;
std::map<std::string, mgp_trans, std::less<>> transformations_;
};
PythonModule::PythonModule() {}
@ -311,7 +376,7 @@ bool PythonModule::Load(const std::filesystem::path &file_path) {
spdlog::error("Unable to load module {}; {}", file_path, *maybe_exc);
return false;
}
py_module_ = WithModuleRegistration(&procedures_, [&](auto *module_def, auto *memory) {
py_module_ = WithModuleRegistration(&procedures_, &transformations_, [&](auto *module_def, auto *memory) {
return ImportPyModule(file_path.stem().c_str(), module_def);
});
if (py_module_) {
@ -351,6 +416,12 @@ const std::map<std::string, mgp_proc, std::less<>> *PythonModule::Procedures() c
return &procedures_;
}
/// Returns a pointer to the transformations registered by this Python module.
/// Asserts that the Python module object is set.
const std::map<std::string, mgp_trans, std::less<>> *PythonModule::Transformations() const {
  // The message names transformations (not procedures) so that a failed
  // assertion points at the accessor that was actually called.
  MG_ASSERT(py_module_,
            "Attempting to access transformations of a module that has "
            "not been loaded...");
  return &transformations_;
}
namespace {
std::unique_ptr<Module> LoadModuleFromFile(const std::filesystem::path &path) {
@ -397,6 +468,7 @@ void ModuleRegistry::DoUnloadAllModules() {
ModuleRegistry::ModuleRegistry() {
auto module = std::make_unique<BuiltinModule>();
RegisterMgProcedures(&modules_, module.get());
RegisterMgTransformations(&modules_, module.get());
RegisterMgLoad(this, &lock_, module.get());
modules_.emplace("mg", std::move(module));
}
@ -481,22 +553,61 @@ void ModuleRegistry::UnloadAllModules() {
utils::MemoryResource &ModuleRegistry::GetSharedMemoryResource() { return *shared_; }
std::optional<std::pair<procedure::ModulePtr, const mgp_proc *>> FindProcedure(
const ModuleRegistry &module_registry, const std::string_view &fully_qualified_procedure_name,
utils::MemoryResource *memory) {
namespace {
/// This function returns a pair of either
// ModuleName | Prop
/// 1. <ModuleName, ProcedureName>
/// 2. <ModuleName, TransformationName>
std::optional<std::pair<std::string_view, std::string_view>> FindModuleNameAndProp(
const ModuleRegistry &module_registry, std::string_view fully_qualified_name, utils::MemoryResource *memory) {
utils::pmr::vector<std::string_view> name_parts(memory);
utils::Split(&name_parts, fully_qualified_procedure_name, ".");
utils::Split(&name_parts, fully_qualified_name, ".");
if (name_parts.size() == 1U) return std::nullopt;
auto last_dot_pos = fully_qualified_procedure_name.find_last_of('.');
auto last_dot_pos = fully_qualified_name.find_last_of('.');
MG_ASSERT(last_dot_pos != std::string_view::npos);
const auto &module_name = fully_qualified_procedure_name.substr(0, last_dot_pos);
const auto &proc_name = name_parts.back();
const auto &module_name = fully_qualified_name.substr(0, last_dot_pos);
const auto &name = name_parts.back();
return std::make_pair(module_name, name);
}
template <typename T>
concept ModuleProperties = utils::SameAsAnyOf<T, mgp_proc, mgp_trans>;
template <ModuleProperties T>
std::optional<std::pair<procedure::ModulePtr, const T *>> MakePairIfPropFound(const ModuleRegistry &module_registry,
std::string_view fully_qualified_name,
utils::MemoryResource *memory) {
auto prop_fun = [](auto &module) {
if constexpr (std::is_same_v<T, mgp_proc>) {
return module->Procedures();
} else {
return module->Transformations();
}
};
auto result = FindModuleNameAndProp(module_registry, fully_qualified_name, memory);
if (!result) return std::nullopt;
auto [module_name, prop_name] = *result;
auto module = module_registry.GetModuleNamed(module_name);
if (!module) return std::nullopt;
const auto procedures = module->Procedures();
const auto &proc_it = procedures->find(proc_name);
if (proc_it == procedures->end()) return std::nullopt;
return std::make_pair(std::move(module), &proc_it->second);
auto *prop = prop_fun(module);
const auto &prop_it = prop->find(prop_name);
if (prop_it == prop->end()) return std::nullopt;
return std::make_pair(std::move(module), &prop_it->second);
}
} // namespace
/// Looks up the procedure named by `fully_qualified_procedure_name` by
/// forwarding to MakePairIfPropFound<mgp_proc>. `memory` is passed through for
/// the temporary allocations made during the lookup.
std::optional<std::pair<procedure::ModulePtr, const mgp_proc *>> FindProcedure(
const ModuleRegistry &module_registry, std::string_view fully_qualified_procedure_name,
utils::MemoryResource *memory) {
return MakePairIfPropFound<mgp_proc>(module_registry, fully_qualified_procedure_name, memory);
}
/// Looks up the transformation named by `fully_qualified_transformation_name`
/// by forwarding to MakePairIfPropFound<mgp_trans>. `memory` is passed through
/// for the temporary allocations made during the lookup.
std::optional<std::pair<procedure::ModulePtr, const mgp_trans *>> FindTransformation(
const ModuleRegistry &module_registry, std::string_view fully_qualified_transformation_name,
utils::MemoryResource *memory) {
return MakePairIfPropFound<mgp_trans>(module_registry, fully_qualified_transformation_name, memory);
}
} // namespace query::procedure

View File

@ -30,6 +30,8 @@ class Module {
/// Returns registered procedures of this module
virtual const std::map<std::string, mgp_proc, std::less<>> *Procedures() const = 0;
/// Returns registered transformations of this module
virtual const std::map<std::string, mgp_trans, std::less<>> *Transformations() const = 0;
};
/// Proxy for a registered Module, acquires a read lock from ModuleRegistry.
@ -112,7 +114,14 @@ extern ModuleRegistry gModuleRegistry;
/// inside this function. ModulePtr must be kept alive to make sure it won't be
/// unloaded.
std::optional<std::pair<procedure::ModulePtr, const mgp_proc *>> FindProcedure(
const ModuleRegistry &module_registry, const std::string_view &fully_qualified_procedure_name,
const ModuleRegistry &module_registry, const std::string_view fully_qualified_procedure_name,
utils::MemoryResource *memory);
/// Return the ModulePtr and `mgp_trans *` of the found transformation after resolving
/// `fully_qualified_transformation_name`. `memory` is used for temporary allocations
/// inside this function. ModulePtr must be kept alive to make sure it won't be
/// unloaded.
std::optional<std::pair<procedure::ModulePtr, const mgp_trans *>> FindTransformation(
const ModuleRegistry &module_registry, const std::string_view fully_qualified_transformation_name,
utils::MemoryResource *memory);
} // namespace query::procedure

View File

@ -56,6 +56,8 @@ target_link_libraries(${test_prefix}integrations_kafka_consumer kafka-mock mg-in
add_unit_test(mgp_kafka_c_api.cpp)
target_link_libraries(${test_prefix}mgp_kafka_c_api mg-query mg-integrations-kafka)
add_unit_test(mgp_trans_c_api.cpp)
target_link_libraries(${test_prefix}mgp_trans_c_api mg-query)
# Test mg-query
@ -343,3 +345,4 @@ add_custom_command(
add_custom_target(test_lcp ALL DEPENDS ${CMAKE_CURRENT_BINARY_DIR}/test_lcp)
add_test(test_lcp ${CMAKE_CURRENT_BINARY_DIR}/test_lcp)
add_dependencies(memgraph__unit test_lcp)

View File

@ -0,0 +1,24 @@
#include "gtest/gtest.h"
#include "query/procedure/mg_procedure_impl.hpp"
#include "query/procedure/module.hpp"
#include "test_utils.hpp"
TEST(MgpTransTest, TestMgpTransApi) {
  // A callback that does nothing; only registration is exercised here.
  constexpr auto no_op_cb = [](const mgp_messages * /*msg*/, mgp_graph * /*graph*/, mgp_result * /*result*/,
                               mgp_memory * /*memory*/) {};
  mgp_module module(utils::NewDeleteResource());

  // A name that is not a valid identifier must be rejected and nothing may be
  // stored. mgp_module_add_transformation() delegates the check to
  // IsValidIdentifierName(), so other string shapes are not repeated here.
  // Maybe add a mock instead and expect IsValidIdentifierName() to be called once?
  EXPECT_FALSE(mgp_module_add_transformation(&module, "dash-dash", no_op_cb));
  EXPECT_TRUE(module.transformations.empty());

  // A valid name is accepted and becomes visible in the module.
  EXPECT_TRUE(mgp_module_add_transformation(&module, "transform", no_op_cb));
  EXPECT_TRUE(module.transformations.find("transform") != module.transformations.end());

  // Registering the same name a second time fails and leaves a single entry.
  EXPECT_FALSE(mgp_module_add_transformation(&module, "transform", no_op_cb));
  EXPECT_EQ(module.transformations.size(), 1U);
}