Refactor query module to support differently loaded modules

Summary:
`query::procedure::Module` is now an interface class through which
users interact with loaded modules. At the moment, the interface is implemented
by three concrete classes: `query::procedure::BuiltinModule`,
`query::procedure::SharedLibraryModule` and `query::procedure::PythonModule`.

Reviewers: teon.banek

Reviewed By: teon.banek

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D2664
This commit is contained in:
Ivan Paljak 2020-02-11 16:13:09 +01:00
parent d17d1497d1
commit 2aa960403a
2 changed files with 275 additions and 146 deletions

View File

@ -15,103 +15,8 @@ namespace query::procedure {
ModuleRegistry gModuleRegistry;
namespace {
/// Imports the Python file at `path` as a query module.
///
/// While holding the GIL, the directory containing the file is appended to
/// `sys.path` (unless it is already present) and the file's stem is imported
/// as a Python module.
///
/// @param path Location of the Python source file.
/// @return Currently always std::nullopt: failures are logged and reported as
///         std::nullopt, and creating a Module from a successful import is not
///         implemented yet.
std::optional<Module> LoadModuleFromPythonFile(std::filesystem::path path) {
  LOG(INFO) << "Loading module " << path << " ...";
  auto gil = py::EnsureGIL();
  auto *py_path = PySys_GetObject("path");
  CHECK(py_path);
  py::Object import_dir(PyUnicode_FromString(path.parent_path().c_str()));
  // PyUnicode_FromString yields NULL (with a Python exception set) on failure;
  // bail out instead of handing a null object to PySequence_Contains.
  if (!import_dir) {
    auto exc_info = py::FetchError().value();
    LOG(ERROR) << "Unable to load module " << path << "; " << exc_info;
    return std::nullopt;
  }
  int import_dir_in_path = PySequence_Contains(py_path, import_dir);
  if (import_dir_in_path == -1) {
    LOG(ERROR) << "Unexpected error when loading module " << path;
    return std::nullopt;
  }
  if (import_dir_in_path == 0) {
    // The directory is not importable yet, so extend sys.path with it.
    if (PyList_Append(py_path, import_dir) != 0) {
      auto exc_info = py::FetchError().value();
      LOG(ERROR) << "Unable to load module " << path << "; " << exc_info;
      return std::nullopt;
    }
  }
  py::Object py_module(PyImport_ImportModule(path.stem().c_str()));
  if (!py_module) {
    auto exc_info = py::FetchError().value();
    LOG(ERROR) << "Unable to load module " << path << "; " << exc_info;
    return std::nullopt;
  }
  // TODO: Actually create a module
  return std::nullopt;
}
std::optional<Module> LoadModuleFromSharedLibrary(std::filesystem::path path) {
LOG(INFO) << "Loading module " << path << " ...";
Module module{path};
dlerror(); // Clear any existing error.
module.handle = dlopen(path.c_str(), RTLD_NOW | RTLD_LOCAL);
if (!module.handle) {
LOG(ERROR) << "Unable to load module " << path << "; " << dlerror();
return std::nullopt;
}
// Get required mgp_init_module
module.init_fn = reinterpret_cast<int (*)(mgp_module *, mgp_memory *)>(
dlsym(module.handle, "mgp_init_module"));
const char *error = dlerror();
if (!module.init_fn || error) {
LOG(ERROR) << "Unable to load module " << path << "; " << error;
dlclose(module.handle);
return std::nullopt;
}
// We probably don't need more than 256KB for module initialazation.
constexpr size_t stack_bytes = 256 * 1024;
unsigned char stack_memory[stack_bytes];
utils::MonotonicBufferResource monotonic_memory(stack_memory, stack_bytes);
mgp_memory memory{&monotonic_memory};
mgp_module module_def{memory.impl};
// Run mgp_init_module which must succeed.
int init_res = module.init_fn(&module_def, &memory);
if (init_res != 0) {
LOG(ERROR) << "Unable to load module " << path
<< "; mgp_init_module returned " << init_res;
dlclose(module.handle);
return std::nullopt;
}
// Copy procedures into our memory.
for (const auto &proc : module_def.procedures)
module.procedures.emplace(proc);
// Get optional mgp_shutdown_module
module.shutdown_fn =
reinterpret_cast<int (*)()>(dlsym(module.handle, "mgp_shutdown_module"));
error = dlerror();
if (error) LOG(WARNING) << "When loading module " << path << "; " << error;
LOG(INFO) << "Loaded module " << path;
return module;
}
/// Runs the module's shutdown function, if it has one, and then closes its
/// library handle.
///
/// A non-zero shutdown result is only logged as a warning; it does not stop
/// the handle from being closed.
///
/// @return true when dlclose succeeded, false (after logging) otherwise.
bool CloseModule(Module *module) {
  const auto &module_path = module->file_path;
  LOG(INFO) << "Closing module " << module_path << " ...";
  if (module->shutdown_fn) {
    const int shutdown_status = module->shutdown_fn();
    if (shutdown_status != 0) {
      LOG(WARNING) << "When closing module " << module_path
                   << "; mgp_shutdown_module returned " << shutdown_status;
    }
  }
  const bool library_closed = dlclose(module->handle) == 0;
  if (!library_closed) {
    LOG(ERROR) << "Failed to close module " << module_path << "; "
               << dlerror();
    return false;
  }
  LOG(INFO) << "Closed module " << module_path;
  return true;
}
// A module counts as builtin when it carries no dynamic library handle, i.e.
// it was not loaded from a dynamic lib. Builtin modules cannot be reloaded
// nor unloaded.
bool IsBuiltinModule(const Module &module) {
  const bool has_library_handle = module.handle != nullptr;
  return !has_library_handle;
}
void RegisterMgReload(ModuleRegistry *module_registry, utils::RWLock *lock,
Module *module) {
BuiltinModule *module) {
// Reloading relies on the fact that regular procedure invocation through
// CallProcedureCursor::Pull takes ModuleRegistry::lock_ with READ access. To
// reload modules we have to upgrade our READ access to WRITE access,
@ -144,7 +49,7 @@ void RegisterMgReload(ModuleRegistry *module_registry, utils::RWLock *lock,
if (!succ) mgp_result_set_error_msg(res, "Failed to reload all modules.");
};
mgp_proc reload_all("reload_all", reload_all_cb, utils::NewDeleteResource());
module->procedures.emplace("reload_all", std::move(reload_all));
module->AddProcedure("reload_all", std::move(reload_all));
auto reload_cb = [module_registry, with_unlock_shared](
const mgp_list *args, const mgp_graph *, mgp_result *res,
mgp_memory *) {
@ -161,13 +66,14 @@ void RegisterMgReload(ModuleRegistry *module_registry, utils::RWLock *lock,
};
mgp_proc reload("reload", reload_cb, utils::NewDeleteResource());
mgp_proc_add_arg(&reload, "module_name", mgp_type_string());
module->procedures.emplace("reload", std::move(reload));
module->AddProcedure("reload", std::move(reload));
}
void RegisterMgProcedures(
// We expect modules to be sorted by name.
const std::map<std::string, Module, std::less<>> *all_modules,
Module *module) {
const std::map<std::string, std::unique_ptr<Module>, std::less<>>
*all_modules,
BuiltinModule *module) {
auto procedures_cb = [all_modules](const mgp_list *, const mgp_graph *,
mgp_result *result, mgp_memory *memory) {
// Iterating over all_modules assumes that the standard mechanism of custom
@ -177,10 +83,10 @@ void RegisterMgProcedures(
for (const auto &[module_name, module] : *all_modules) {
// Return the results in sorted order by module and by procedure.
static_assert(
std::is_same_v<decltype(module.procedures),
std::map<std::string, mgp_proc, std::less<>>>,
std::is_same_v<decltype(module->Procedures()),
const std::map<std::string, mgp_proc, std::less<>> *>,
"Expected module procedures to be sorted by name");
for (const auto &[proc_name, proc] : module.procedures) {
for (const auto &[proc_name, proc] : *module->Procedures()) {
auto *record = mgp_result_new_record(result);
if (!record) {
mgp_result_set_error_msg(result, "Not enough memory!");
@ -220,15 +126,171 @@ void RegisterMgProcedures(
mgp_proc procedures("procedures", procedures_cb, utils::NewDeleteResource());
mgp_proc_add_result(&procedures, "name", mgp_type_string());
mgp_proc_add_result(&procedures, "signature", mgp_type_string());
module->procedures.emplace("procedures", std::move(procedures));
module->AddProcedure("procedures", std::move(procedures));
}
} // namespace
// Out-of-line definition of the interface's virtual destructor; there is
// nothing to release.
Module::~Module() = default;

// Construction and destruction of a builtin module need no custom work.
BuiltinModule::BuiltinModule() = default;
BuiltinModule::~BuiltinModule() = default;

// Reloading a builtin module does nothing and always reports success.
bool BuiltinModule::Reload() {
  return true;
}

// Closing a builtin module does nothing and always reports success.
bool BuiltinModule::Close() {
  return true;
}
/// Gives read-only access to the procedures added through AddProcedure.
/// The returned pointer refers to a member of this object and is never null.
const std::map<std::string, mgp_proc, std::less<>> *
BuiltinModule::Procedures() const {
  const auto *registered = &procedures_;
  return registered;
}
/// Registers `proc` under `name`.
/// An already registered procedure with the same name is left untouched,
/// because std::map::emplace does not overwrite existing entries.
void BuiltinModule::AddProcedure(std::string_view name, mgp_proc proc) {
  procedures_.emplace(std::string(name), std::move(proc));
}
SharedLibraryModule::SharedLibraryModule() : handle_(nullptr) {}
SharedLibraryModule::~SharedLibraryModule() {
if (handle_) Close();
}
bool SharedLibraryModule::Load(std::filesystem::path file_path) {
CHECK(!handle_) << "Attempting to load an already loaded module...";
LOG(INFO) << "Loading module " << file_path << " ...";
file_path_ = file_path;
dlerror(); // Clear any existing error.
handle_ = dlopen(file_path.c_str(), RTLD_NOW | RTLD_LOCAL);
if (!handle_) {
LOG(ERROR) << "Unable to load module " << file_path << "; " << dlerror();
return false;
}
// Get required mgp_init_module
init_fn_ = reinterpret_cast<int (*)(mgp_module *, mgp_memory *)>(
dlsym(handle_, "mgp_init_module"));
const char *error = dlerror();
if (!init_fn_ || error) {
LOG(ERROR) << "Unable to load module " << file_path << "; " << error;
dlclose(handle_);
handle_ = nullptr;
return false;
}
// We probably don't need more than 256KB for module initialazation.
constexpr size_t stack_bytes = 256 * 1024;
unsigned char stack_memory[stack_bytes];
utils::MonotonicBufferResource monotonic_memory(stack_memory, stack_bytes);
mgp_memory memory{&monotonic_memory};
mgp_module module_def{memory.impl};
// Run mgp_init_module which must succeed.
int init_res = init_fn_(&module_def, &memory);
if (init_res != 0) {
LOG(ERROR) << "Unable to load module " << file_path
<< "; mgp_init_module returned " << init_res;
dlclose(handle_);
handle_ = nullptr;
return false;
}
// Copy procedures into our memory.
for (const auto &proc : module_def.procedures)
procedures_.emplace(proc);
// Get optional mgp_shutdown_module
shutdown_fn_ =
reinterpret_cast<int (*)()>(dlsym(handle_, "mgp_shutdown_module"));
error = dlerror();
if (error)
LOG(WARNING) << "When loading module " << file_path << "; " << error;
LOG(INFO) << "Loaded module " << file_path;
return true;
}
/// Runs the optional shutdown function and closes the library handle.
///
/// Must only be called while a library is loaded (enforced by CHECK). A
/// non-zero shutdown result is logged as a warning and does not prevent the
/// handle from being closed.
///
/// @return true when dlclose succeeded; the handle is then reset and the
///         registered procedures are dropped. false (after logging) when
///         dlclose failed; the handle and procedures are then left in place.
bool SharedLibraryModule::Close() {
  CHECK(handle_) << "Attempting to close a module that has not been loaded...";
  LOG(INFO) << "Closing module " << file_path_ << " ...";
  // non-existent shutdown function is semantically the same as a shutdown
  // function that does nothing.
  int shutdown_res = 0;
  if (shutdown_fn_) {
    shutdown_res = shutdown_fn_();
    // Forget the hook once it has run: if dlclose fails below, handle_ stays
    // set and the destructor (or a caller) invokes Close() again, which must
    // not run the module's shutdown code a second time.
    shutdown_fn_ = nullptr;
  }
  if (shutdown_res != 0) {
    LOG(WARNING) << "When closing module " << file_path_
                 << "; mgp_shutdown_module returned " << shutdown_res;
  }
  if (dlclose(handle_) != 0) {
    LOG(ERROR) << "Failed to close module " << file_path_ << "; "
               << dlerror();
    return false;
  }
  LOG(INFO) << "Closed module " << file_path_;
  handle_ = nullptr;
  procedures_.clear();
  return true;
}
/// Closes the currently loaded library and loads it again from the path that
/// was used for the previous Load().
///
/// Must only be called while a library is loaded (enforced by CHECK).
///
/// @return false if closing failed (no load is attempted then) or if the
///         subsequent load failed; true otherwise.
bool SharedLibraryModule::Reload() {
  CHECK(handle_) << "Attempting to reload a module that has not been loaded...";
  LOG(INFO) << "Reloading module " << file_path_ << " ...";
  // Short-circuit evaluation: Load runs only after a successful Close.
  return Close() && Load(file_path_);
}
/// Gives read-only access to the procedures copied from the loaded library.
/// Must only be called while a library is loaded (enforced by CHECK); the
/// returned pointer refers to a member of this object.
const std::map<std::string, mgp_proc, std::less<>> *
SharedLibraryModule::Procedures() const {
  CHECK(handle_)
      << "Attempting to access procedures of a module that has not been "
         "loaded...";
  const auto *registered = &procedures_;
  return registered;
}
// Construction and destruction of a Python module need no custom work.
PythonModule::PythonModule() = default;

PythonModule::~PythonModule() = default;
/// Imports the Python file at `file_path`.
///
/// While holding the GIL, the directory containing the file is appended to
/// `sys.path` (unless it is already present) and the file's stem is imported
/// as a Python module.
///
/// @param file_path Location of the Python source file.
/// @return Currently always false: failures are logged and reported as false,
///         and turning a successful import into a usable module is not
///         implemented yet.
bool PythonModule::Load(std::filesystem::path file_path) {
  LOG(INFO) << "Loading module " << file_path << " ...";
  auto gil = py::EnsureGIL();
  auto *py_path = PySys_GetObject("path");
  CHECK(py_path);
  py::Object import_dir(PyUnicode_FromString(file_path.parent_path().c_str()));
  // PyUnicode_FromString yields NULL (with a Python exception set) on failure;
  // bail out instead of handing a null object to PySequence_Contains.
  if (!import_dir) {
    auto exc_info = py::FetchError().value();
    LOG(ERROR) << "Unable to load module " << file_path << "; " << exc_info;
    return false;
  }
  int import_dir_in_path = PySequence_Contains(py_path, import_dir);
  if (import_dir_in_path == -1) {
    LOG(ERROR) << "Unexpected error when loading module " << file_path;
    return false;
  }
  if (import_dir_in_path == 0) {
    // The directory is not importable yet, so extend sys.path with it.
    if (PyList_Append(py_path, import_dir) != 0) {
      auto exc_info = py::FetchError().value();
      LOG(ERROR) << "Unable to load module " << file_path << "; " << exc_info;
      return false;
    }
  }
  py::Object py_module(PyImport_ImportModule(file_path.stem().c_str()));
  if (!py_module) {
    auto exc_info = py::FetchError().value();
    LOG(ERROR) << "Unable to load module " << file_path << "; " << exc_info;
    return false;
  }
  // TODO: Actually create a module
  return false;
}
/// Not implemented yet; always reports failure.
bool PythonModule::Close() {
  // TODO: implement
  return false;
}

/// Not implemented yet; always reports failure.
bool PythonModule::Reload() {
  // TODO: implement
  return false;
}
/// Python modules do not expose any procedure map yet, so this always returns
/// a null pointer; callers must not dereference the result.
const std::map<std::string, mgp_proc, std::less<>> *
PythonModule::Procedures() const {
  return nullptr;
}
/// Sets up the registry with the builtin module, stored under the name "mg".
/// Its procedures are added by RegisterMgProcedures and RegisterMgReload
/// before the module is placed into `modules_`.
ModuleRegistry::ModuleRegistry() {
Module module{.handle = nullptr};
RegisterMgProcedures(&modules_, &module);
RegisterMgReload(this, &lock_, &module);
auto module = std::make_unique<BuiltinModule>();
RegisterMgProcedures(&modules_, module.get());
RegisterMgReload(this, &lock_, module.get());
modules_.emplace("mg", std::move(module));
}
@ -239,17 +301,20 @@ bool ModuleRegistry::LoadModuleLibrary(std::filesystem::path path) {
LOG(ERROR) << "Unable to overwrite an already loaded module " << path;
return false;
}
std::optional<Module> maybe_module;
if (path.extension() == ".so") {
maybe_module = LoadModuleFromSharedLibrary(path);
auto module = std::make_unique<SharedLibraryModule>();
bool loaded = module->Load(path);
if (!loaded) return false;
modules_[module_name] = std::move(module);
} else if (path.extension() == ".py") {
maybe_module = LoadModuleFromPythonFile(path);
auto module = std::make_unique<PythonModule>();
bool loaded = module->Load(path);
if (!loaded) return false;
modules_[module_name] = std::move(module);
} else {
LOG(ERROR) << "Unkown query module file " << path;
return false;
}
if (!maybe_module) return false;
modules_[module_name] = std::move(*maybe_module);
return true;
}
@ -257,7 +322,7 @@ ModulePtr ModuleRegistry::GetModuleNamed(const std::string_view &name) const {
std::shared_lock<utils::RWLock> guard(lock_);
auto found_it = modules_.find(name);
if (found_it == modules_.end()) return nullptr;
return ModulePtr(&found_it->second, std::move(guard));
return ModulePtr(found_it->second.get(), std::move(guard));
}
bool ModuleRegistry::ReloadModuleNamed(const std::string_view &name) {
@ -269,44 +334,27 @@ bool ModuleRegistry::ReloadModuleNamed(const std::string_view &name) {
return false;
}
auto &module = found_it->second;
if (IsBuiltinModule(module)) return true;
if (!CloseModule(&module)) {
if (!module->Reload()) {
modules_.erase(found_it);
return false;
}
auto maybe_module = LoadModuleFromSharedLibrary(module.file_path);
if (!maybe_module) {
modules_.erase(found_it);
return false;
}
module = std::move(*maybe_module);
return true;
}
/// Reloads every registered module while holding a std::unique_lock on the
/// registry lock. The first module that fails is erased from the registry and
/// false is returned without attempting the remaining modules; true is
/// returned when no module failed.
bool ModuleRegistry::ReloadAllModules() {
std::unique_lock<utils::RWLock> guard(lock_);
for (auto &[name, module] : modules_) {
if (IsBuiltinModule(module)) continue;
if (!CloseModule(&module)) {
if (!module->Reload()) {
// Erasing inside the range-for is followed directly by `return`, so the
// loop never advances past the erased entry.
modules_.erase(name);
return false;
}
auto maybe_module = LoadModuleFromSharedLibrary(module.file_path);
if (!maybe_module) {
modules_.erase(name);
return false;
}
module = std::move(*maybe_module);
}
return true;
}
/// Empties the registry while holding a std::unique_lock on the registry lock.
/// NOTE(review): `modules_.clear()` removes every entry, which appears to
/// include the builtin "mg" module as well; confirm this is intended.
void ModuleRegistry::UnloadAllModules() {
std::unique_lock<utils::RWLock> guard(lock_);
for (auto &name_and_module : modules_) {
if (IsBuiltinModule(name_and_module.second)) continue;
CloseModule(&name_and_module.second);
}
// This is correct because the destructor will close each module.
modules_.clear();
}
@ -324,8 +372,9 @@ std::optional<std::pair<procedure::ModulePtr, const mgp_proc *>> FindProcedure(
const auto &proc_name = name_parts.back();
auto module = module_registry.GetModuleNamed(module_name);
if (!module) return std::nullopt;
const auto &proc_it = module->procedures.find(proc_name);
if (proc_it == module->procedures.end()) return std::nullopt;
const auto procedures = module->Procedures();
const auto &proc_it = procedures->find(proc_name);
if (proc_it == procedures->end()) return std::nullopt;
return std::make_pair(std::move(module), &proc_it->second);
}

View File

@ -16,17 +16,97 @@
namespace query::procedure {
struct Module final {
/// Path as requested for loading the module from a library.
std::filesystem::path file_path;
/// System handle to shared library.
void *handle;
/// Required initialization function called on module load.
std::function<int(mgp_module *, mgp_memory *)> init_fn;
/// Optional shutdown function called on module unload.
std::function<int()> shutdown_fn;
/// Interface through which loaded query modules are used, regardless of how
/// they were loaded. Instances can be neither copied nor moved.
class Module {
 public:
  Module() = default;
  virtual ~Module();

  Module(const Module &) = delete;
  Module(Module &&) = delete;
  Module &operator=(const Module &) = delete;
  Module &operator=(Module &&) = delete;

  /// Invokes the (optional) shutdown function and closes the module.
  /// @return true on success, false otherwise.
  virtual bool Close() = 0;

  /// Reloads the module.
  /// @return true on success, false otherwise.
  virtual bool Reload() = 0;

  /// Returns the procedures registered by this module, keyed and ordered by
  /// procedure name.
  virtual const std::map<std::string, mgp_proc, std::less<>> *Procedures()
      const = 0;
};
/// Module whose procedures are added from code through AddProcedure rather
/// than loaded from a file. Instances can be neither copied nor moved.
class BuiltinModule final : public Module {
public:
BuiltinModule();
~BuiltinModule() override;
BuiltinModule(const BuiltinModule &) = delete;
BuiltinModule(BuiltinModule &&) = delete;
BuiltinModule &operator=(const BuiltinModule &) = delete;
BuiltinModule &operator=(BuiltinModule &&) = delete;
/// Does nothing for a builtin module; always returns true.
bool Close() override;
/// Does nothing for a builtin module; always returns true.
bool Reload() override;
/// Returns the procedures added so far through AddProcedure.
const std::map<std::string, mgp_proc, std::less<>> *Procedures()
const override;
/// Registers `proc` under `name`; an existing entry with that name is kept.
void AddProcedure(std::string_view name, mgp_proc proc);
private:
/// Registered procedures
std::map<std::string, mgp_proc, std::less<>> procedures;
std::map<std::string, mgp_proc, std::less<>> procedures_;
};
/// Module backed by a shared library that is opened at runtime.
/// Instances can be neither copied nor moved.
class SharedLibraryModule final : public Module {
 public:
  SharedLibraryModule();
  /// Closes the library if one is still loaded.
  ~SharedLibraryModule() override;

  SharedLibraryModule(const SharedLibraryModule &) = delete;
  SharedLibraryModule(SharedLibraryModule &&) = delete;
  SharedLibraryModule &operator=(const SharedLibraryModule &) = delete;
  SharedLibraryModule &operator=(SharedLibraryModule &&) = delete;

  /// Opens the library at `file_path` and registers its procedures.
  /// @return true on success, false otherwise.
  bool Load(std::filesystem::path file_path);

  /// Runs the optional shutdown function and closes the library.
  bool Close() override;

  /// Closes the library and loads it again from the same path.
  bool Reload() override;

  /// Returns the procedures registered by the loaded library.
  const std::map<std::string, mgp_proc, std::less<>> *Procedures()
      const override;

 private:
  /// Path as requested for loading the module from a library.
  std::filesystem::path file_path_;
  /// System handle to shared library.
  void *handle_;
  /// Required initialization function called on module load.
  std::function<int(mgp_module *, mgp_memory *)> init_fn_;
  /// Optional shutdown function called on module unload.
  std::function<int()> shutdown_fn_;
  /// Registered procedures
  std::map<std::string, mgp_proc, std::less<>> procedures_;
};
/// Module backed by a Python source file.
/// Instances can be neither copied nor moved.
class PythonModule final : public Module {
 public:
  PythonModule();
  ~PythonModule() override;

  PythonModule(const PythonModule &) = delete;
  PythonModule(PythonModule &&) = delete;
  PythonModule &operator=(const PythonModule &) = delete;
  PythonModule &operator=(PythonModule &&) = delete;

  /// Imports the Python file at `file_path`.
  /// @return true on success, false otherwise.
  bool Load(std::filesystem::path file_path);

  /// Closes the module.
  bool Close() override;

  /// Reloads the module.
  bool Reload() override;

  /// Returns the procedures registered by this module.
  const std::map<std::string, mgp_proc, std::less<>> *Procedures()
      const override;
};
/// Proxy for a registered Module, acquires a read lock from ModuleRegistry.
@ -48,7 +128,7 @@ class ModulePtr final {
/// Thread-safe registration of modules from libraries, uses utils::RWLock.
class ModuleRegistry final {
std::map<std::string, Module, std::less<>> modules_;
std::map<std::string, std::unique_ptr<Module>, std::less<>> modules_;
mutable utils::RWLock lock_{utils::RWLock::Priority::WRITE};
public: