Add support for rescanning query modules

Reviewers: mferencevic, buda

Reviewed By: mferencevic, buda

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D2765
This commit is contained in:
Lovro Lugovic 2020-04-22 18:07:17 +02:00
parent 7d9f741ceb
commit 0c42bedf2f
5 changed files with 160 additions and 149 deletions

View File

@ -950,6 +950,7 @@ int main(int argc, char **argv) {
}
storage::Storage db(db_config);
query::InterpreterContext interpreter_context{&db};
query::SetExecutionTimeout(&interpreter_context,
FLAGS_query_execution_timeout_sec);
#ifdef MG_ENTERPRISE
@ -958,15 +959,9 @@ int main(int argc, char **argv) {
SessionData session_data{&db, &interpreter_context};
#endif
// Register modules
if (!FLAGS_query_modules_directory.empty()) {
for (const auto &entry :
std::filesystem::directory_iterator(FLAGS_query_modules_directory)) {
if (entry.is_regular_file())
query::procedure::gModuleRegistry.LoadModuleLibrary(entry.path());
}
}
// Register modules END
query::procedure::gModuleRegistry.SetModulesDirectory(
FLAGS_query_modules_directory);
query::procedure::gModuleRegistry.UnloadAndLoadModulesFromDirectory();
#ifdef MG_ENTERPRISE
AuthQueryHandler auth_handler(&auth,

View File

@ -32,7 +32,7 @@ set(mg_query_sources
add_library(mg-query STATIC ${mg_query_sources})
add_dependencies(mg-query generate_lcp_query)
target_include_directories(mg-query PRIVATE ${CMAKE_SOURCE_DIR}/include)
target_include_directories(mg-query PUBLIC ${CMAKE_SOURCE_DIR}/include)
target_link_libraries(mg-query dl cppitertools)
target_link_libraries(mg-query mg-storage-v2)
find_package(Python3 3.5 REQUIRED COMPONENTS Development)

View File

@ -8,6 +8,7 @@ extern "C" {
#include "py/py.hpp"
#include "query/procedure/py_module.hpp"
#include "utils/file.hpp"
#include "utils/pmr/vector.hpp"
#include "utils/string.hpp"
@ -28,8 +29,6 @@ class BuiltinModule final : public Module {
bool Close() override;
bool Reload() override;
const std::map<std::string, mgp_proc, std::less<>> *Procedures()
const override;
@ -44,8 +43,6 @@ BuiltinModule::BuiltinModule() {}
BuiltinModule::~BuiltinModule() {}
bool BuiltinModule::Reload() { return true; }
bool BuiltinModule::Close() { return true; }
const std::map<std::string, mgp_proc, std::less<>> *BuiltinModule::Procedures()
@ -59,12 +56,12 @@ void BuiltinModule::AddProcedure(std::string_view name, mgp_proc proc) {
namespace {
void RegisterMgReload(ModuleRegistry *module_registry, utils::RWLock *lock,
BuiltinModule *module) {
// Reloading relies on the fact that regular procedure invocation through
void RegisterMgLoad(ModuleRegistry *module_registry, utils::RWLock *lock,
BuiltinModule *module) {
// Loading relies on the fact that regular procedure invocation through
// CallProcedureCursor::Pull takes ModuleRegistry::lock_ with READ access. To
// reload modules we have to upgrade our READ access to WRITE access,
// therefore we release the READ lock and invoke the reload function which
// load modules we have to upgrade our READ access to WRITE access,
// therefore we release the READ lock and invoke the load function which
// takes the WRITE lock. Obviously, some other thread may take a READ or WRITE
// lock during our transition when we hold no such lock. In this case it is
// fine, because our builtin module cannot be unloaded and we are ok with
@ -73,10 +70,10 @@ void RegisterMgReload(ModuleRegistry *module_registry, utils::RWLock *lock,
// single thread may only take either a READ or a WRITE lock, it's not
// possible for a thread to hold both. If a thread tries to do that, it will
// deadlock immediately (no other thread needs to do anything).
auto with_unlock_shared = [lock](const auto &reload_function) {
auto with_unlock_shared = [lock](const auto &load_function) {
lock->unlock_shared();
try {
reload_function();
load_function();
// There's no finally in C++, but we have to return our original READ lock
// state in any possible case.
} catch (...) {
@ -85,32 +82,30 @@ void RegisterMgReload(ModuleRegistry *module_registry, utils::RWLock *lock,
}
lock->lock_shared();
};
auto reload_all_cb = [module_registry, with_unlock_shared](
const mgp_list *, const mgp_graph *, mgp_result *res,
mgp_memory *) {
bool succ = false;
with_unlock_shared([&]() { succ = module_registry->ReloadAllModules(); });
if (!succ) mgp_result_set_error_msg(res, "Failed to reload all modules.");
auto load_all_cb = [module_registry, with_unlock_shared](
const mgp_list *, const mgp_graph *, mgp_result *,
mgp_memory *) {
with_unlock_shared(
[&]() { module_registry->UnloadAndLoadModulesFromDirectory(); });
};
mgp_proc reload_all("reload_all", reload_all_cb, utils::NewDeleteResource());
module->AddProcedure("reload_all", std::move(reload_all));
auto reload_cb = [module_registry, with_unlock_shared](
const mgp_list *args, const mgp_graph *, mgp_result *res,
mgp_memory *) {
mgp_proc load_all("load_all", load_all_cb, utils::NewDeleteResource());
module->AddProcedure("load_all", std::move(load_all));
auto load_cb = [module_registry, with_unlock_shared](
const mgp_list *args, const mgp_graph *, mgp_result *res,
mgp_memory *) {
CHECK(mgp_list_size(args) == 1U) << "Should have been type checked already";
const mgp_value *arg = mgp_list_at(args, 0);
CHECK(mgp_value_is_string(arg)) << "Should have been type checked already";
bool succ = false;
with_unlock_shared([&]() {
succ = module_registry->ReloadModuleNamed(mgp_value_get_string(arg));
succ = module_registry->LoadOrReloadModuleFromName(
mgp_value_get_string(arg));
});
if (!succ)
mgp_result_set_error_msg(
res, "Failed to reload the module; it is no longer loaded.");
if (!succ) mgp_result_set_error_msg(res, "Failed to (re)load the module.");
};
mgp_proc reload("reload", reload_cb, utils::NewDeleteResource());
mgp_proc_add_arg(&reload, "module_name", mgp_type_string());
module->AddProcedure("reload", std::move(reload));
mgp_proc load("load", load_cb, utils::NewDeleteResource());
mgp_proc_add_arg(&load, "module_name", mgp_type_string());
module->AddProcedure("load", std::move(load));
}
void RegisterMgProcedures(
@ -204,12 +199,10 @@ class SharedLibraryModule final : public Module {
SharedLibraryModule &operator=(const SharedLibraryModule &) = delete;
SharedLibraryModule &operator=(SharedLibraryModule &&) = delete;
bool Load(std::filesystem::path file_path);
bool Load(const std::filesystem::path &file_path);
bool Close() override;
bool Reload() override;
const std::map<std::string, mgp_proc, std::less<>> *Procedures()
const override;
@ -232,7 +225,7 @@ SharedLibraryModule::~SharedLibraryModule() {
if (handle_) Close();
}
bool SharedLibraryModule::Load(std::filesystem::path file_path) {
bool SharedLibraryModule::Load(const std::filesystem::path &file_path) {
CHECK(!handle_) << "Attempting to load an already loaded module...";
LOG(INFO) << "Loading module " << file_path << " ...";
file_path_ = file_path;
@ -289,8 +282,7 @@ bool SharedLibraryModule::Close() {
<< "; mgp_shutdown_module returned " << shutdown_res;
}
if (dlclose(handle_) != 0) {
LOG(ERROR) << "Failed to close module " << file_path_ << "; "
<< dlerror();
LOG(ERROR) << "Failed to close module " << file_path_ << "; " << dlerror();
return false;
}
LOG(INFO) << "Closed module " << file_path_;
@ -299,12 +291,6 @@ bool SharedLibraryModule::Close() {
return true;
}
bool SharedLibraryModule::Reload() {
CHECK(handle_) << "Attempting to reload a module that has not been loaded...";
if (!Close()) return false;
return Load(file_path_);
}
const std::map<std::string, mgp_proc, std::less<>>
*SharedLibraryModule::Procedures() const {
CHECK(handle_) << "Attempting to access procedures of a module that has not "
@ -321,12 +307,10 @@ class PythonModule final : public Module {
PythonModule &operator=(const PythonModule &) = delete;
PythonModule &operator=(PythonModule &&) = delete;
bool Load(std::filesystem::path file_path);
bool Load(const std::filesystem::path &file_path);
bool Close() override;
bool Reload() override;
const std::map<std::string, mgp_proc, std::less<>> *Procedures()
const override;
@ -342,7 +326,7 @@ PythonModule::~PythonModule() {
if (py_module_) Close();
}
bool PythonModule::Load(std::filesystem::path file_path) {
bool PythonModule::Load(const std::filesystem::path &file_path) {
CHECK(!py_module_) << "Attempting to load an already loaded module...";
LOG(INFO) << "Loading module " << file_path << " ...";
file_path_ = file_path;
@ -388,13 +372,6 @@ bool PythonModule::Close() {
return true;
}
bool PythonModule::Reload() {
CHECK(py_module_)
<< "Attempting to reload a module that has not been loaded...";
if (!Close()) return false;
return Load(file_path_);
}
const std::map<std::string, mgp_proc, std::less<>> *PythonModule::Procedures()
const {
CHECK(py_module_) << "Attempting to access procedures of a module that has "
@ -402,40 +379,105 @@ const std::map<std::string, mgp_proc, std::less<>> *PythonModule::Procedures()
return &procedures_;
}
ModuleRegistry::ModuleRegistry() {
auto module = std::make_unique<BuiltinModule>();
RegisterMgProcedures(&modules_, module.get());
RegisterMgReload(this, &lock_, module.get());
namespace {
std::unique_ptr<Module> LoadModuleFromFile(const std::filesystem::path &path) {
const auto &ext = path.extension();
if (ext != ".so" && ext != ".py") {
LOG(WARNING) << "Unknown query module file " << path;
return nullptr;
}
std::unique_ptr<Module> module;
if (path.extension() == ".so") {
auto lib_module = std::make_unique<SharedLibraryModule>();
if (!lib_module->Load(path)) return nullptr;
module = std::move(lib_module);
} else if (path.extension() == ".py") {
auto py_module = std::make_unique<PythonModule>();
if (!py_module->Load(path)) return nullptr;
module = std::move(py_module);
}
return module;
}
} // namespace
bool ModuleRegistry::RegisterModule(const std::string_view &name,
std::unique_ptr<Module> module) {
CHECK(!name.empty()) << "Module name cannot be empty";
CHECK(module) << "Tried to register an invalid module";
if (modules_.find(name) != modules_.end()) {
LOG(ERROR) << "Unable to overwrite an already loaded module " << name;
return false;
}
modules_.emplace(name, std::move(module));
return true;
}
void ModuleRegistry::DoUnloadAllModules() {
CHECK(modules_.find("mg") != modules_.end())
<< "Expected the builtin \"mg\" module to be present.";
// This is correct because the destructor will close each module. However,
// we don't want to unload the builtin "mg" module.
auto module = std::move(modules_["mg"]);
modules_.clear();
modules_.emplace("mg", std::move(module));
}
bool ModuleRegistry::LoadModuleLibrary(std::filesystem::path path) {
ModuleRegistry::ModuleRegistry() {
auto module = std::make_unique<BuiltinModule>();
RegisterMgProcedures(&modules_, module.get());
RegisterMgLoad(this, &lock_, module.get());
modules_.emplace("mg", std::move(module));
}
void ModuleRegistry::SetModulesDirectory(
const std::filesystem::path &modules_dir) {
modules_dir_ = modules_dir;
}
bool ModuleRegistry::LoadOrReloadModuleFromName(const std::string_view &name) {
if (name.empty()) return false;
std::unique_lock<utils::RWLock> guard(lock_);
std::string module_name(path.stem());
if (path.extension() != ".so" && path.extension() != ".py") {
LOG(WARNING) << "Unknown query module file " << path;
auto found_it = modules_.find(name);
if (found_it != modules_.end()) {
if (!found_it->second->Close()) {
LOG(WARNING) << "Failed to close module " << found_it->first;
}
modules_.erase(found_it);
}
if (!utils::DirExists(modules_dir_)) {
LOG(ERROR) << "Module directory " << modules_dir_ << " doesn't exist";
return false;
}
if (modules_.find(module_name) != modules_.end()) {
LOG(ERROR) << "Unable to overwrite an already loaded module " << path;
return false;
for (const auto &entry : std::filesystem::directory_iterator(modules_dir_)) {
const auto &path = entry.path();
if (entry.is_regular_file() && path.stem() == name) {
auto module = LoadModuleFromFile(path);
if (!module) return false;
return RegisterModule(name, std::move(module));
}
}
if (path.extension() == ".so") {
auto module = std::make_unique<SharedLibraryModule>();
bool loaded = module->Load(path);
if (!loaded) return false;
modules_[module_name] = std::move(module);
} else if (path.extension() == ".py") {
auto module = std::make_unique<PythonModule>();
bool loaded = module->Load(path);
if (!loaded) return false;
modules_[module_name] = std::move(module);
} else {
LOG(FATAL) << "Unknown query module extension '" << path.extension()
<< "' from file " << path;
return false;
return false;
}
void ModuleRegistry::UnloadAndLoadModulesFromDirectory() {
if (!utils::DirExists(modules_dir_)) {
LOG(ERROR) << "Module directory " << modules_dir_ << " doesn't exist";
return;
}
std::unique_lock<utils::RWLock> guard(lock_);
DoUnloadAllModules();
for (const auto &entry : std::filesystem::directory_iterator(modules_dir_)) {
const auto &path = entry.path();
if (entry.is_regular_file()) {
std::string name = path.stem();
if (name.empty()) continue;
auto module = LoadModuleFromFile(path);
if (!module) continue;
RegisterModule(name, std::move(module));
}
}
return true;
}
ModulePtr ModuleRegistry::GetModuleNamed(const std::string_view &name) const {
@ -445,41 +487,9 @@ ModulePtr ModuleRegistry::GetModuleNamed(const std::string_view &name) const {
return ModulePtr(found_it->second.get(), std::move(guard));
}
bool ModuleRegistry::ReloadModuleNamed(const std::string_view &name) {
std::unique_lock<utils::RWLock> guard(lock_);
auto found_it = modules_.find(name);
if (found_it == modules_.end()) {
LOG(ERROR) << "Trying to reload module '" << name
<< "' which is not loaded.";
return false;
}
auto &module = found_it->second;
LOG(INFO) << "Reloading module '" << name << "' ...";
if (!module->Reload()) {
modules_.erase(found_it);
return false;
}
LOG(INFO) << "Reloaded module '" << name << "'";
return true;
}
bool ModuleRegistry::ReloadAllModules() {
std::unique_lock<utils::RWLock> guard(lock_);
for (auto &[name, module] : modules_) {
LOG(INFO) << "Reloading module '" << name << "' ...";
if (!module->Reload()) {
modules_.erase(name);
return false;
}
LOG(INFO) << "Reloaded module '" << name << "'";
}
return true;
}
void ModuleRegistry::UnloadAllModules() {
std::unique_lock<utils::RWLock> guard(lock_);
// This is correct because the destructor will close each module.
modules_.clear();
DoUnloadAllModules();
}
std::optional<std::pair<procedure::ModulePtr, const mgp_proc *>> FindProcedure(

View File

@ -28,9 +28,6 @@ class Module {
/// Invokes the (optional) shutdown function and closes the module.
virtual bool Close() = 0;
/// Reloads the module.
virtual bool Reload() = 0;
/// Returns registered procedures of this module
virtual const std::map<std::string, mgp_proc, std::less<>> *Procedures()
const = 0;
@ -58,37 +55,46 @@ class ModuleRegistry final {
std::map<std::string, std::unique_ptr<Module>, std::less<>> modules_;
mutable utils::RWLock lock_{utils::RWLock::Priority::WRITE};
bool RegisterModule(const std::string_view &name,
std::unique_ptr<Module> module);
void DoUnloadAllModules();
public:
ModuleRegistry();
/// Load a module from the given path and return true if successful.
/// Set the modules directory that will be used when (re)loading modules.
void SetModulesDirectory(const std::filesystem::path &modules_dir);
/// Atomically load or reload a module with a particular name from the given
/// directory.
///
/// A write lock is taken during the execution of this method. Loading a
/// module is done through `dlopen` facility and path is resolved accordingly.
/// The module is registered using the filename part of the path, with the
/// extension removed. If a module with the same name already exists, the
/// function does nothing.
bool LoadModuleLibrary(std::filesystem::path path);
/// Takes a write lock. If the module exists it is reloaded. Otherwise, the
/// module is loaded from the file whose filename, without the extension,
/// matches the module's name. If multiple such files exist, only one is
/// chosen, in an unspecified manner. If loading of the chosen file fails, no
/// other files are tried.
///
/// Return true if the module was loaded or reloaded successfully, false
/// otherwise.
bool LoadOrReloadModuleFromName(const std::string_view &name);
/// Atomically unload all modules and then load all possible modules from the
/// given directory.
///
/// Takes a write lock.
void UnloadAndLoadModulesFromDirectory();
/// Find a module with given name or return nullptr.
/// Takes a read lock.
ModulePtr GetModuleNamed(const std::string_view &name) const;
/// Reload a module with given name and return true if successful.
/// Takes a write lock. Builtin modules cannot be reloaded, though true will
/// be returned if you try to do so. If false was returned, then the module is
/// no longer registered.
bool ReloadModuleNamed(const std::string_view &name);
/// Reload all loaded (non-builtin) modules and return true if successful.
/// Takes a write lock. If false was returned, the module which failed to
/// reload is no longer registered. Remaining modules may or may not be
/// reloaded, but are valid and registered.
bool ReloadAllModules();
/// Remove all loaded (non-builtin) modules.
/// Takes a write lock.
void UnloadAllModules();
private:
std::filesystem::path modules_dir_;
};
/// Single, global module registry.

View File

@ -2963,14 +2963,14 @@ TEST_P(CypherMainVisitorTest, CallYieldAsteriskReturnAsterisk) {
TEST_P(CypherMainVisitorTest, CallWithoutYield) {
auto &ast_generator = *GetParam();
auto *query = dynamic_cast<CypherQuery *>(
ast_generator.ParseQuery("CALL mg.reload_all()"));
ast_generator.ParseQuery("CALL mg.load_all()"));
ASSERT_TRUE(query);
ASSERT_TRUE(query->single_query_);
auto *single_query = query->single_query_;
ASSERT_EQ(single_query->clauses_.size(), 1U);
auto *call_proc = dynamic_cast<CallProcedure *>(single_query->clauses_[0]);
ASSERT_TRUE(call_proc);
ASSERT_EQ(call_proc->procedure_name_, "mg.reload_all");
ASSERT_EQ(call_proc->procedure_name_, "mg.load_all");
ASSERT_TRUE(call_proc->arguments_.empty());
ASSERT_TRUE(call_proc->result_fields_.empty());
ASSERT_TRUE(call_proc->result_identifiers_.empty());
@ -2980,14 +2980,14 @@ TEST_P(CypherMainVisitorTest, CallWithoutYield) {
TEST_P(CypherMainVisitorTest, CallWithMemoryLimitWithoutYield) {
auto &ast_generator = *GetParam();
auto *query = dynamic_cast<CypherQuery *>(
ast_generator.ParseQuery("CALL mg.reload_all() MEMORY LIMIT 32 KB"));
ast_generator.ParseQuery("CALL mg.load_all() MEMORY LIMIT 32 KB"));
ASSERT_TRUE(query);
ASSERT_TRUE(query->single_query_);
auto *single_query = query->single_query_;
ASSERT_EQ(single_query->clauses_.size(), 1U);
auto *call_proc = dynamic_cast<CallProcedure *>(single_query->clauses_[0]);
ASSERT_TRUE(call_proc);
ASSERT_EQ(call_proc->procedure_name_, "mg.reload_all");
ASSERT_EQ(call_proc->procedure_name_, "mg.load_all");
ASSERT_TRUE(call_proc->arguments_.empty());
ASSERT_TRUE(call_proc->result_fields_.empty());
ASSERT_TRUE(call_proc->result_identifiers_.empty());
@ -2998,14 +2998,14 @@ TEST_P(CypherMainVisitorTest, CallWithMemoryLimitWithoutYield) {
TEST_P(CypherMainVisitorTest, CallWithMemoryUnlimitedWithoutYield) {
auto &ast_generator = *GetParam();
auto *query = dynamic_cast<CypherQuery *>(
ast_generator.ParseQuery("CALL mg.reload_all() MEMORY UNLIMITED"));
ast_generator.ParseQuery("CALL mg.load_all() MEMORY UNLIMITED"));
ASSERT_TRUE(query);
ASSERT_TRUE(query->single_query_);
auto *single_query = query->single_query_;
ASSERT_EQ(single_query->clauses_.size(), 1U);
auto *call_proc = dynamic_cast<CallProcedure *>(single_query->clauses_[0]);
ASSERT_TRUE(call_proc);
ASSERT_EQ(call_proc->procedure_name_, "mg.reload_all");
ASSERT_EQ(call_proc->procedure_name_, "mg.load_all");
ASSERT_TRUE(call_proc->arguments_.empty());
ASSERT_TRUE(call_proc->result_fields_.empty());
ASSERT_TRUE(call_proc->result_identifiers_.empty());