From 0c42bedf2f3c1c60d81238350aea4c36dff116cd Mon Sep 17 00:00:00 2001 From: Lovro Lugovic <lovro.lugovic@memgraph.io> Date: Wed, 22 Apr 2020 18:07:17 +0200 Subject: [PATCH] Add support for rescanning query modules Reviewers: mferencevic, buda Reviewed By: mferencevic, buda Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D2765 --- src/memgraph.cpp | 13 +- src/query/CMakeLists.txt | 2 +- src/query/procedure/module.cpp | 232 +++++++++++++++-------------- src/query/procedure/module.hpp | 50 ++++--- tests/unit/cypher_main_visitor.cpp | 12 +- 5 files changed, 160 insertions(+), 149 deletions(-) diff --git a/src/memgraph.cpp b/src/memgraph.cpp index f49c9d317..b452c934b 100644 --- a/src/memgraph.cpp +++ b/src/memgraph.cpp @@ -950,6 +950,7 @@ int main(int argc, char **argv) { } storage::Storage db(db_config); query::InterpreterContext interpreter_context{&db}; + query::SetExecutionTimeout(&interpreter_context, FLAGS_query_execution_timeout_sec); #ifdef MG_ENTERPRISE @@ -958,15 +959,9 @@ int main(int argc, char **argv) { SessionData session_data{&db, &interpreter_context}; #endif - // Register modules - if (!FLAGS_query_modules_directory.empty()) { - for (const auto &entry : - std::filesystem::directory_iterator(FLAGS_query_modules_directory)) { - if (entry.is_regular_file()) - query::procedure::gModuleRegistry.LoadModuleLibrary(entry.path()); - } - } - // Register modules END + query::procedure::gModuleRegistry.SetModulesDirectory( + FLAGS_query_modules_directory); + query::procedure::gModuleRegistry.UnloadAndLoadModulesFromDirectory(); #ifdef MG_ENTERPRISE AuthQueryHandler auth_handler(&auth, diff --git a/src/query/CMakeLists.txt b/src/query/CMakeLists.txt index eb1b26fc9..6cf555b74 100644 --- a/src/query/CMakeLists.txt +++ b/src/query/CMakeLists.txt @@ -32,7 +32,7 @@ set(mg_query_sources add_library(mg-query STATIC ${mg_query_sources}) add_dependencies(mg-query generate_lcp_query) -target_include_directories(mg-query PRIVATE ${CMAKE_SOURCE_DIR}/include) +target_include_directories(mg-query PUBLIC ${CMAKE_SOURCE_DIR}/include) target_link_libraries(mg-query dl cppitertools) target_link_libraries(mg-query mg-storage-v2) find_package(Python3 3.5 REQUIRED COMPONENTS Development) diff --git a/src/query/procedure/module.cpp b/src/query/procedure/module.cpp index 4b18a4993..f6bcb1345 100644 --- a/src/query/procedure/module.cpp +++ b/src/query/procedure/module.cpp @@ -8,6 +8,7 @@ extern "C" { #include "py/py.hpp" #include "query/procedure/py_module.hpp" +#include "utils/file.hpp" #include "utils/pmr/vector.hpp" #include "utils/string.hpp" @@ -28,8 +29,6 @@ class BuiltinModule final : public Module { bool Close() override; - bool Reload() override; - const std::map<std::string, mgp_proc, std::less<>> *Procedures() const override; @@ -44,8 +43,6 @@ BuiltinModule::BuiltinModule() {} BuiltinModule::~BuiltinModule() {} -bool BuiltinModule::Reload() { return true; } - bool BuiltinModule::Close() { return true; } const std::map<std::string, mgp_proc, std::less<>> *BuiltinModule::Procedures() @@ -59,12 +56,12 @@ void BuiltinModule::AddProcedure(std::string_view name, mgp_proc proc) { namespace { -void RegisterMgReload(ModuleRegistry *module_registry, utils::RWLock *lock, - BuiltinModule *module) { - // Reloading relies on the fact that regular procedure invocation through +void RegisterMgLoad(ModuleRegistry *module_registry, utils::RWLock *lock, + BuiltinModule *module) { + // Loading relies on the fact that regular procedure invocation through // CallProcedureCursor::Pull takes ModuleRegistry::lock_ with READ access. To - // reload modules we have to upgrade our READ access to WRITE access, - // therefore we release the READ lock and invoke the reload function which + // load modules we have to upgrade our READ access to WRITE access, + // therefore we release the READ lock and invoke the load function which // takes the WRITE lock. Obviously, some other thread may take a READ or WRITE // lock during our transition when we hold no such lock. In this case it is // fine, because our builtin module cannot be unloaded and we are ok with @@ -73,10 +70,10 @@ void RegisterMgReload(ModuleRegistry *module_registry, utils::RWLock *lock, // single thread may only take either a READ or a WRITE lock, it's not // possible for a thread to hold both. If a thread tries to do that, it will // deadlock immediately (no other thread needs to do anything). - auto with_unlock_shared = [lock](const auto &reload_function) { + auto with_unlock_shared = [lock](const auto &load_function) { lock->unlock_shared(); try { - reload_function(); + load_function(); // There's no finally in C++, but we have to return our original READ lock // state in any possible case. } catch (...) { @@ -85,32 +82,30 @@ void RegisterMgReload(ModuleRegistry *module_registry, utils::RWLock *lock, } lock->lock_shared(); }; - auto reload_all_cb = [module_registry, with_unlock_shared]( - const mgp_list *, const mgp_graph *, mgp_result *res, - mgp_memory *) { - bool succ = false; - with_unlock_shared([&]() { succ = module_registry->ReloadAllModules(); }); - if (!succ) mgp_result_set_error_msg(res, "Failed to reload all modules."); + auto load_all_cb = [module_registry, with_unlock_shared]( + const mgp_list *, const mgp_graph *, mgp_result *, + mgp_memory *) { + with_unlock_shared( + [&]() { module_registry->UnloadAndLoadModulesFromDirectory(); }); }; - mgp_proc reload_all("reload_all", reload_all_cb, utils::NewDeleteResource()); - module->AddProcedure("reload_all", std::move(reload_all)); - auto reload_cb = [module_registry, with_unlock_shared]( - const mgp_list *args, const mgp_graph *, mgp_result *res, - mgp_memory *) { + mgp_proc load_all("load_all", load_all_cb, utils::NewDeleteResource()); + module->AddProcedure("load_all", std::move(load_all)); + auto load_cb = [module_registry, with_unlock_shared]( + const mgp_list *args, const mgp_graph *, mgp_result *res, + mgp_memory *) { CHECK(mgp_list_size(args) == 1U) << "Should have been type checked already"; const mgp_value *arg = mgp_list_at(args, 0); CHECK(mgp_value_is_string(arg)) << "Should have been type checked already"; bool succ = false; with_unlock_shared([&]() { - succ = module_registry->ReloadModuleNamed(mgp_value_get_string(arg)); + succ = module_registry->LoadOrReloadModuleFromName( + mgp_value_get_string(arg)); }); - if (!succ) - mgp_result_set_error_msg( - res, "Failed to reload the module; it is no longer loaded."); + if (!succ) mgp_result_set_error_msg(res, "Failed to (re)load the module."); }; - mgp_proc reload("reload", reload_cb, utils::NewDeleteResource()); - mgp_proc_add_arg(&reload, "module_name", mgp_type_string()); - module->AddProcedure("reload", std::move(reload)); + mgp_proc load("load", load_cb, utils::NewDeleteResource()); + mgp_proc_add_arg(&load, "module_name", mgp_type_string()); + module->AddProcedure("load", std::move(load)); } void RegisterMgProcedures( @@ -204,12 +199,10 @@ class SharedLibraryModule final : public Module { SharedLibraryModule &operator=(const SharedLibraryModule &) = delete; SharedLibraryModule &operator=(SharedLibraryModule &&) = delete; - bool Load(std::filesystem::path file_path); + bool Load(const std::filesystem::path &file_path); bool Close() override; - bool Reload() override; - const std::map<std::string, mgp_proc, std::less<>> *Procedures() const override; @@ -232,7 +225,7 @@ SharedLibraryModule::~SharedLibraryModule() { if (handle_) Close(); } -bool SharedLibraryModule::Load(std::filesystem::path file_path) { +bool SharedLibraryModule::Load(const std::filesystem::path &file_path) { CHECK(!handle_) << "Attempting to load an already loaded module..."; LOG(INFO) << "Loading module " << file_path << " ..."; file_path_ = file_path; @@ -289,8 +282,7 @@ bool SharedLibraryModule::Close() { << "; mgp_shutdown_module returned " << shutdown_res; } if (dlclose(handle_) != 0) { - LOG(ERROR) << "Failed to close module " << file_path_ << "; " - << dlerror(); + LOG(ERROR) << "Failed to close module " << file_path_ << "; " << dlerror(); return false; } LOG(INFO) << "Closed module " << file_path_; @@ -299,12 +291,6 @@ bool SharedLibraryModule::Close() { return true; } -bool SharedLibraryModule::Reload() { - CHECK(handle_) << "Attempting to reload a module that has not been loaded..."; - if (!Close()) return false; - return Load(file_path_); -} - const std::map<std::string, mgp_proc, std::less<>> *SharedLibraryModule::Procedures() const { CHECK(handle_) << "Attempting to access procedures of a module that has not " @@ -321,12 +307,10 @@ class PythonModule final : public Module { PythonModule &operator=(const PythonModule &) = delete; PythonModule &operator=(PythonModule &&) = delete; - bool Load(std::filesystem::path file_path); + bool Load(const std::filesystem::path &file_path); bool Close() override; - bool Reload() override; - const std::map<std::string, mgp_proc, std::less<>> *Procedures() const override; @@ -342,7 +326,7 @@ PythonModule::~PythonModule() { if (py_module_) Close(); } -bool PythonModule::Load(std::filesystem::path file_path) { +bool PythonModule::Load(const std::filesystem::path &file_path) { CHECK(!py_module_) << "Attempting to load an already loaded module..."; LOG(INFO) << "Loading module " << file_path << " ..."; file_path_ = file_path; @@ -388,13 +372,6 @@ bool PythonModule::Close() { return true; } -bool PythonModule::Reload() { - CHECK(py_module_) - << "Attempting to reload a module that has not been loaded..."; - if (!Close()) return false; - return Load(file_path_); -} - const std::map<std::string, mgp_proc, std::less<>> *PythonModule::Procedures() const { CHECK(py_module_) << "Attempting to access procedures of a module that has " @@ -402,40 +379,105 @@ const std::map<std::string, mgp_proc, std::less<>> *PythonModule::Procedures() return &procedures_; } -ModuleRegistry::ModuleRegistry() { - auto module = std::make_unique<BuiltinModule>(); - RegisterMgProcedures(&modules_, module.get()); - RegisterMgReload(this, &lock_, module.get()); +namespace { + +std::unique_ptr<Module> LoadModuleFromFile(const std::filesystem::path &path) { + const auto &ext = path.extension(); + if (ext != ".so" && ext != ".py") { + LOG(WARNING) << "Unknown query module file " << path; + return nullptr; + } + std::unique_ptr<Module> module; + if (path.extension() == ".so") { + auto lib_module = std::make_unique<SharedLibraryModule>(); + if (!lib_module->Load(path)) return nullptr; + module = std::move(lib_module); + } else if (path.extension() == ".py") { + auto py_module = std::make_unique<PythonModule>(); + if (!py_module->Load(path)) return nullptr; + module = std::move(py_module); + } + return module; +} + +} // namespace + +bool ModuleRegistry::RegisterModule(const std::string_view &name, + std::unique_ptr<Module> module) { + CHECK(!name.empty()) << "Module name cannot be empty"; + CHECK(module) << "Tried to register an invalid module"; + if (modules_.find(name) != modules_.end()) { + LOG(ERROR) << "Unable to overwrite an already loaded module " << name; + return false; + } + modules_.emplace(name, std::move(module)); + return true; +} + +void ModuleRegistry::DoUnloadAllModules() { + CHECK(modules_.find("mg") != modules_.end()) + << "Expected the builtin \"mg\" module to be present."; + // This is correct because the destructor will close each module. However, + // we don't want to unload the builtin "mg" module. + auto module = std::move(modules_["mg"]); + modules_.clear(); modules_.emplace("mg", std::move(module)); } -bool ModuleRegistry::LoadModuleLibrary(std::filesystem::path path) { +ModuleRegistry::ModuleRegistry() { + auto module = std::make_unique<BuiltinModule>(); + RegisterMgProcedures(&modules_, module.get()); + RegisterMgLoad(this, &lock_, module.get()); + modules_.emplace("mg", std::move(module)); +} + +void ModuleRegistry::SetModulesDirectory( + const std::filesystem::path &modules_dir) { + modules_dir_ = modules_dir; +} + +bool ModuleRegistry::LoadOrReloadModuleFromName(const std::string_view &name) { + if (name.empty()) return false; std::unique_lock<utils::RWLock> guard(lock_); - std::string module_name(path.stem()); - if (path.extension() != ".so" && path.extension() != ".py") { - LOG(WARNING) << "Unknown query module file " << path; + auto found_it = modules_.find(name); + if (found_it != modules_.end()) { + if (!found_it->second->Close()) { + LOG(WARNING) << "Failed to close module " << found_it->first; + } + modules_.erase(found_it); + } + if (!utils::DirExists(modules_dir_)) { + LOG(ERROR) << "Module directory " << modules_dir_ << " doesn't exist"; return false; } - if (modules_.find(module_name) != modules_.end()) { - LOG(ERROR) << "Unable to overwrite an already loaded module " << path; - return false; + for (const auto &entry : std::filesystem::directory_iterator(modules_dir_)) { + const auto &path = entry.path(); + if (entry.is_regular_file() && path.stem() == name) { + auto module = LoadModuleFromFile(path); + if (!module) return false; + return RegisterModule(name, std::move(module)); + } } - if (path.extension() == ".so") { - auto module = std::make_unique<SharedLibraryModule>(); - bool loaded = module->Load(path); - if (!loaded) return false; - modules_[module_name] = std::move(module); - } else if (path.extension() == ".py") { - auto module = std::make_unique<PythonModule>(); - bool loaded = module->Load(path); - if (!loaded) return false; - modules_[module_name] = std::move(module); - } else { - LOG(FATAL) << "Unknown query module extension '" << path.extension() - << "' from file " << path; - return false; + return false; +} + +void ModuleRegistry::UnloadAndLoadModulesFromDirectory() { + if (!utils::DirExists(modules_dir_)) { + LOG(ERROR) << "Module directory " << modules_dir_ << " doesn't exist"; + return; + } + std::unique_lock<utils::RWLock> guard(lock_); + DoUnloadAllModules(); + for (const auto &entry : std::filesystem::directory_iterator(modules_dir_)) { + const auto &path = entry.path(); + if (entry.is_regular_file()) { + std::string name = path.stem(); + if (name.empty()) continue; + auto module = LoadModuleFromFile(path); + if (!module) continue; + RegisterModule(name, std::move(module)); + } } - return true; } ModulePtr ModuleRegistry::GetModuleNamed(const std::string_view &name) const { @@ -445,41 +487,9 @@ ModulePtr ModuleRegistry::GetModuleNamed(const std::string_view &name) const { return ModulePtr(found_it->second.get(), std::move(guard)); } -bool ModuleRegistry::ReloadModuleNamed(const std::string_view &name) { - std::unique_lock<utils::RWLock> guard(lock_); - auto found_it = modules_.find(name); - if (found_it == modules_.end()) { - LOG(ERROR) << "Trying to reload module '" << name - << "' which is not loaded."; - return false; - } - auto &module = found_it->second; - LOG(INFO) << "Reloading module '" << name << "' ..."; - if (!module->Reload()) { - modules_.erase(found_it); - return false; - } - LOG(INFO) << "Reloaded module '" << name << "'"; - return true; -} - -bool ModuleRegistry::ReloadAllModules() { - std::unique_lock<utils::RWLock> guard(lock_); - for (auto &[name, module] : modules_) { - LOG(INFO) << "Reloading module '" << name << "' ..."; - if (!module->Reload()) { - modules_.erase(name); - return false; - } - LOG(INFO) << "Reloaded module '" << name << "'"; - } - return true; -} - void ModuleRegistry::UnloadAllModules() { std::unique_lock<utils::RWLock> guard(lock_); - // This is correct because the destructor will close each module. - modules_.clear(); + DoUnloadAllModules(); } std::optional<std::pair<procedure::ModulePtr, const mgp_proc *>> FindProcedure( diff --git a/src/query/procedure/module.hpp b/src/query/procedure/module.hpp index 7dd658daf..d23bc973d 100644 --- a/src/query/procedure/module.hpp +++ b/src/query/procedure/module.hpp @@ -28,9 +28,6 @@ class Module { /// Invokes the (optional) shutdown function and closes the module. virtual bool Close() = 0; - /// Reloads the module. - virtual bool Reload() = 0; - /// Returns registered procedures of this module virtual const std::map<std::string, mgp_proc, std::less<>> *Procedures() const = 0; @@ -58,37 +55,46 @@ class ModuleRegistry final { std::map<std::string, std::unique_ptr<Module>, std::less<>> modules_; mutable utils::RWLock lock_{utils::RWLock::Priority::WRITE}; + bool RegisterModule(const std::string_view &name, + std::unique_ptr<Module> module); + + void DoUnloadAllModules(); + public: ModuleRegistry(); - /// Load a module from the given path and return true if successful. + /// Set the modules directory that will be used when (re)loading modules. + void SetModulesDirectory(const std::filesystem::path &modules_dir); + + /// Atomically load or reload a module with a particular name from the given + /// directory. /// - /// A write lock is taken during the execution of this method. Loading a - /// module is done through `dlopen` facility and path is resolved accordingly. - /// The module is registered using the filename part of the path, with the - /// extension removed. If a module with the same name already exists, the - /// function does nothing. - bool LoadModuleLibrary(std::filesystem::path path); + /// Takes a write lock. If the module exists it is reloaded. Otherwise, the + /// module is loaded from the file whose filename, without the extension, + /// matches the module's name. If multiple such files exist, only one is + /// chosen, in an unspecified manner. If loading of the chosen file fails, no + /// other files are tried. + /// + /// Return true if the module was loaded or reloaded successfully, false + /// otherwise. + bool LoadOrReloadModuleFromName(const std::string_view &name); + + /// Atomically unload all modules and then load all possible modules from the + /// given directory. + /// + /// Takes a write lock. + void UnloadAndLoadModulesFromDirectory(); /// Find a module with given name or return nullptr. /// Takes a read lock. ModulePtr GetModuleNamed(const std::string_view &name) const; - /// Reload a module with given name and return true if successful. - /// Takes a write lock. Builtin modules cannot be reloaded, though true will - /// be returned if you try to do so. If false was returned, then the module is - /// no longer registered. - bool ReloadModuleNamed(const std::string_view &name); - - /// Reload all loaded (non-builtin) modules and return true if successful. - /// Takes a write lock. If false was returned, the module which failed to - /// reload is no longer registered. Remaining modules may or may not be - /// reloaded, but are valid and registered. - bool ReloadAllModules(); - /// Remove all loaded (non-builtin) modules. /// Takes a write lock. void UnloadAllModules(); + + private: + std::filesystem::path modules_dir_; }; /// Single, global module registry. diff --git a/tests/unit/cypher_main_visitor.cpp b/tests/unit/cypher_main_visitor.cpp index f8278de67..364e925ce 100644 --- a/tests/unit/cypher_main_visitor.cpp +++ b/tests/unit/cypher_main_visitor.cpp @@ -2963,14 +2963,14 @@ TEST_P(CypherMainVisitorTest, CallYieldAsteriskReturnAsterisk) { TEST_P(CypherMainVisitorTest, CallWithoutYield) { auto &ast_generator = *GetParam(); auto *query = dynamic_cast<CypherQuery *>( - ast_generator.ParseQuery("CALL mg.reload_all()")); + ast_generator.ParseQuery("CALL mg.load_all()")); ASSERT_TRUE(query); ASSERT_TRUE(query->single_query_); auto *single_query = query->single_query_; ASSERT_EQ(single_query->clauses_.size(), 1U); auto *call_proc = dynamic_cast<CallProcedure *>(single_query->clauses_[0]); ASSERT_TRUE(call_proc); - ASSERT_EQ(call_proc->procedure_name_, "mg.reload_all"); + ASSERT_EQ(call_proc->procedure_name_, "mg.load_all"); ASSERT_TRUE(call_proc->arguments_.empty()); ASSERT_TRUE(call_proc->result_fields_.empty()); ASSERT_TRUE(call_proc->result_identifiers_.empty()); @@ -2980,14 +2980,14 @@ TEST_P(CypherMainVisitorTest, CallWithoutYield) { TEST_P(CypherMainVisitorTest, CallWithMemoryLimitWithoutYield) { auto &ast_generator = *GetParam(); auto *query = dynamic_cast<CypherQuery *>( - ast_generator.ParseQuery("CALL mg.reload_all() MEMORY LIMIT 32 KB")); + ast_generator.ParseQuery("CALL mg.load_all() MEMORY LIMIT 32 KB")); ASSERT_TRUE(query); ASSERT_TRUE(query->single_query_); auto *single_query = query->single_query_; ASSERT_EQ(single_query->clauses_.size(), 1U); auto *call_proc = dynamic_cast<CallProcedure *>(single_query->clauses_[0]); ASSERT_TRUE(call_proc); - ASSERT_EQ(call_proc->procedure_name_, "mg.reload_all"); + ASSERT_EQ(call_proc->procedure_name_, "mg.load_all"); ASSERT_TRUE(call_proc->arguments_.empty()); ASSERT_TRUE(call_proc->result_fields_.empty()); ASSERT_TRUE(call_proc->result_identifiers_.empty()); @@ -2998,14 +2998,14 @@ TEST_P(CypherMainVisitorTest, CallWithMemoryLimitWithoutYield) { TEST_P(CypherMainVisitorTest, CallWithMemoryUnlimitedWithoutYield) { auto &ast_generator = *GetParam(); auto *query = dynamic_cast<CypherQuery *>( - ast_generator.ParseQuery("CALL mg.reload_all() MEMORY UNLIMITED")); + ast_generator.ParseQuery("CALL mg.load_all() MEMORY UNLIMITED")); ASSERT_TRUE(query); ASSERT_TRUE(query->single_query_); auto *single_query = query->single_query_; ASSERT_EQ(single_query->clauses_.size(), 1U); auto *call_proc = dynamic_cast<CallProcedure *>(single_query->clauses_[0]); ASSERT_TRUE(call_proc); - ASSERT_EQ(call_proc->procedure_name_, "mg.reload_all"); + ASSERT_EQ(call_proc->procedure_name_, "mg.load_all"); ASSERT_TRUE(call_proc->arguments_.empty()); ASSERT_TRUE(call_proc->result_fields_.empty()); ASSERT_TRUE(call_proc->result_identifiers_.empty());