Add PluginRegistry for openCypher custom procedures

Summary:
This diff implements a mechanism for registering plugins which provide
custom procedures for openCypher. Although the `Plugin` struct already
stores some function pointers, these are not set in stone w.r.t. to
requirements and signatures.

For example, in the future, we may want to allow a single plugin to
register multiple custom procedures instead of just one.

Reviewers: ipaljak, dsantl, mferencevic

Reviewed By: mferencevic

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D2386
This commit is contained in:
Teon Banek 2019-09-13 11:00:28 +02:00
parent fe2196b403
commit 04c037e0fd
3 changed files with 233 additions and 0 deletions

View File

@ -63,6 +63,7 @@ set(mg_single_node_sources
query/plan/rewrite/index_lookup.cpp
query/plan/rule_based_planner.cpp
query/plan/variable_start_planner.cpp
query/plugin/plugin.cpp
query/repl.cpp
query/typed_value.cpp
storage/common/constraints/record.cpp
@ -133,6 +134,7 @@ set(mg_single_node_v2_sources
query/plan/rewrite/index_lookup.cpp
query/plan/rule_based_planner.cpp
query/plan/variable_start_planner.cpp
query/plugin/plugin.cpp
query/typed_value.cpp
memgraph_init.cpp
)

142
src/query/plugin/plugin.cpp Normal file
View File

@ -0,0 +1,142 @@
#include "query/plugin/plugin.hpp"
extern "C" {
#include <dlfcn.h>
}
#include <optional>
namespace query::plugin {
PluginRegistry gPluginRegistry;
namespace {
std::optional<Plugin> LoadPluginFromSharedLibrary(std::filesystem::path path) {
LOG(INFO) << "Loading plugin " << path << " ...";
Plugin plugin{path};
dlerror(); // Clear any existing error.
plugin.handle = dlopen(path.c_str(), RTLD_NOW | RTLD_LOCAL);
if (!plugin.handle) {
LOG(ERROR) << "Unable to load plugin " << path << "; " << dlerror();
return std::nullopt;
}
// Get required mg_main
plugin.main_fn =
reinterpret_cast<void (*)(const mg_value **, int64_t, const mg_graph *,
mg_result *)>(dlsym(plugin.handle, "mg_main"));
const char *error = dlerror();
if (!plugin.main_fn || error) {
LOG(ERROR) << "Unable to load plugin " << path << "; " << error;
dlclose(plugin.handle);
return std::nullopt;
}
// Get optional mg_init_module
plugin.init_fn =
reinterpret_cast<int (*)()>(dlsym(plugin.handle, "mg_init_module"));
error = dlerror();
if (error) LOG(WARNING) << "When loading plugin " << path << "; " << error;
// Run mg_init_module which must succeed.
if (plugin.init_fn) {
int init_res = plugin.init_fn();
if (init_res != 0) {
LOG(ERROR) << "Unable to load plugin " << path
<< "; mg_init_module returned " << init_res;
dlclose(plugin.handle);
return std::nullopt;
}
}
// Get optional mg_shutdown_module
plugin.shutdown_fn =
reinterpret_cast<int (*)()>(dlsym(plugin.handle, "mg_shutdown_module"));
error = dlerror();
if (error) LOG(WARNING) << "When loading plugin " << path << "; " << error;
LOG(INFO) << "Loaded plugin " << path;
return plugin;
}
bool ClosePlugin(Plugin *plugin) {
LOG(INFO) << "Closing plugin " << plugin->file_path << " ...";
if (plugin->shutdown_fn) {
int shutdown_res = plugin->shutdown_fn();
if (shutdown_res != 0) {
LOG(WARNING) << "When closing plugin " << plugin->file_path
<< "; mg_shutdown_module returned " << shutdown_res;
}
}
if (dlclose(plugin->handle) != 0) {
LOG(ERROR) << "Failed to close plugin " << plugin->file_path << "; "
<< dlerror();
return false;
}
LOG(INFO) << "Closed plugin " << plugin->file_path;
return true;
}
} // namespace
bool PluginRegistry::LoadPluginLibrary(std::filesystem::path path) {
std::unique_lock<utils::RWLock> guard(lock_);
std::string plugin_name(path.stem());
if (plugins_.find(plugin_name) != plugins_.end()) return true;
auto maybe_plugin = LoadPluginFromSharedLibrary(path);
if (!maybe_plugin) return false;
plugins_[plugin_name] = std::move(*maybe_plugin);
return true;
}
PluginPtr PluginRegistry::GetPluginNamed(const std::string_view &name) {
std::shared_lock<utils::RWLock> guard(lock_);
// NOTE: std::unordered_map::find cannot work with std::string_view :(
auto found_it = plugins_.find(std::string(name));
if (found_it == plugins_.end()) return nullptr;
return PluginPtr(&found_it->second, std::move(guard));
}
bool PluginRegistry::ReloadPluginNamed(const std::string_view &name) {
std::unique_lock<utils::RWLock> guard(lock_);
// NOTE: std::unordered_map::find cannot work with std::string_view :(
auto found_it = plugins_.find(std::string(name));
if (found_it == plugins_.end()) {
LOG(ERROR) << "Trying to reload plugin '" << name
<< "' which is not loaded.";
return false;
}
auto &plugin = found_it->second;
if (!ClosePlugin(&plugin)) {
plugins_.erase(found_it);
return false;
}
auto maybe_plugin = LoadPluginFromSharedLibrary(plugin.file_path);
if (!maybe_plugin) {
plugins_.erase(found_it);
return false;
}
plugin = std::move(*maybe_plugin);
return true;
}
bool PluginRegistry::ReloadAllPlugins() {
std::unique_lock<utils::RWLock> guard(lock_);
for (auto &[name, plugin] : plugins_) {
if (!ClosePlugin(&plugin)) {
plugins_.erase(name);
return false;
}
auto maybe_plugin = LoadPluginFromSharedLibrary(plugin.file_path);
if (!maybe_plugin) {
plugins_.erase(name);
return false;
}
plugin = std::move(*maybe_plugin);
}
return true;
}
void PluginRegistry::UnloadAllPlugins() {
std::unique_lock<utils::RWLock> guard(lock_);
for (auto &name_and_plugin : plugins_) ClosePlugin(&name_and_plugin.second);
plugins_.clear();
}
} // namespace query::plugin

View File

@ -0,0 +1,89 @@
/// @file API for loading and registering plugins providing custom oC procedures
#pragma once
#include <filesystem>
#include <functional>
#include <shared_mutex>
#include <string>
#include <string_view>
#include <unordered_map>
#include "utils/rw_lock.hpp"
struct mg_value;
struct mg_graph;
struct mg_result;
namespace query::plugin {
struct Plugin final {
/// Path as requested for loading the plugin from a library.
std::filesystem::path file_path;
/// System handle to shared library.
void *handle;
/// Entry-point for plugin's custom procedure.
std::function<void(const mg_value **, int64_t, const mg_graph *, mg_result *)>
main_fn;
/// Optional initialization function called on plugin load.
std::function<int()> init_fn;
/// Optional shutdown function called on plugin unload.
std::function<int()> shutdown_fn;
};
/// Proxy for a registered Plugin, acquires a read lock from PluginRegistry.
class PluginPtr final {
const Plugin *plugin_{nullptr};
std::shared_lock<utils::RWLock> lock_;
public:
PluginPtr() = default;
PluginPtr(std::nullptr_t) {}
PluginPtr(const Plugin *plugin, std::shared_lock<utils::RWLock> lock)
: plugin_(plugin), lock_(std::move(lock)) {}
explicit operator bool() const { return static_cast<bool>(plugin_); }
const Plugin &operator*() const { return *plugin_; }
const Plugin *operator->() const { return plugin_; }
};
/// Thread-safe registration of plugins from libraries, uses utils::RWLock.
class PluginRegistry final {
std::unordered_map<std::string, Plugin> plugins_;
utils::RWLock lock_{utils::RWLock::Priority::WRITE};
public:
/// Load a plugin from the given path and return true if successful.
///
/// A write lock is taken during the execution of this method. Loading a
/// plugin is done through `dlopen` facility and path is resolved accordingly.
/// The plugin is registered using the filename part of the path, with the
/// extension removed. If a plugin with the same name already exists, the
/// function does nothing.
bool LoadPluginLibrary(std::filesystem::path path);
/// Find a plugin with given name or return nullptr.
/// Takes a read lock.
PluginPtr GetPluginNamed(const std::string_view &name);
/// Reload a plugin with given name and return true if successful.
/// Takes a write lock. If false was returned, then the plugin is no longer
/// registered.
bool ReloadPluginNamed(const std::string_view &name);
/// Reload all loaded plugins and return true if successful.
/// Takes a write lock. If false was returned, the plugin which failed to
/// reload is no longer registered. Remaining plugins may or may not be
/// reloaded, but are valid and registered.
bool ReloadAllPlugins();
/// Remove all loaded plugins.
/// Takes a write lock.
void UnloadAllPlugins();
};
/// Single, global plugin registry.
extern PluginRegistry gPluginRegistry;
} // namespace query::plugin