Implement storage lock
Summary: The storage now uses a file in the data directory (`.lock`) to determine whether there is another instance of the storage running with the same data directory. That helps notify the user/administrator that the system is running in an unsupported configuration. Reviewers: teon.banek, ipaljak Reviewed By: teon.banek Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D2719
This commit is contained in:
parent
eaabcd8d0d
commit
f48ad62647
@ -1,13 +1,5 @@
|
||||
#pragma once
|
||||
|
||||
#include <pwd.h>
|
||||
#include <sys/stat.h>
|
||||
#include <sys/types.h>
|
||||
#include <unistd.h>
|
||||
|
||||
#include <cerrno>
|
||||
#include <cstring>
|
||||
|
||||
#include <filesystem>
|
||||
#include <string>
|
||||
#include <vector>
|
||||
@ -61,36 +53,3 @@ inline void LoadConfig(const std::string &product_name) {
|
||||
for (int i = 0; i < custom_argc; ++i) free(custom_argv[i]);
|
||||
delete[] custom_argv;
|
||||
}
|
||||
|
||||
/// Verifies that the owner of the data directory is the same user that started
|
||||
/// the current process.
|
||||
inline void VerifyDataDirectoryOwnerAndProcessUser(
|
||||
const std::string &data_directory) {
|
||||
// Get the process user ID.
|
||||
auto process_euid = geteuid();
|
||||
|
||||
// Get the data directory owner ID.
|
||||
struct stat statbuf;
|
||||
auto ret = stat(data_directory.c_str(), &statbuf);
|
||||
if (ret != 0 && errno == ENOENT) {
|
||||
// The directory doesn't currently exist.
|
||||
return;
|
||||
}
|
||||
CHECK(ret == 0) << "Couldn't get stat for '" << data_directory
|
||||
<< "' because of: " << strerror(errno) << " (" << errno
|
||||
<< ")";
|
||||
auto directory_owner = statbuf.st_uid;
|
||||
|
||||
auto get_username = [](auto uid) {
|
||||
auto info = getpwuid(uid);
|
||||
if (!info) return std::to_string(uid);
|
||||
return std::string(info->pw_name);
|
||||
};
|
||||
|
||||
auto user_process = get_username(process_euid);
|
||||
auto user_directory = get_username(directory_owner);
|
||||
CHECK(process_euid == directory_owner)
|
||||
<< "The process is running as user " << user_process
|
||||
<< ", but the data directory is owned by user " << user_directory
|
||||
<< ". Please start the process as user " << user_directory << "!";
|
||||
}
|
||||
|
@ -825,10 +825,6 @@ int main(int argc, char **argv) {
|
||||
// Unhandled exception handler init.
|
||||
std::set_terminate(&utils::TerminateHandler);
|
||||
|
||||
// Verify that the user that started the Memgraph process is the same user
|
||||
// that is the owner of the data directory.
|
||||
VerifyDataDirectoryOwnerAndProcessUser(FLAGS_data_directory);
|
||||
|
||||
// Initialize Python
|
||||
auto *program_name = Py_DecodeLocale(argv[0], nullptr);
|
||||
CHECK(program_name);
|
||||
|
@ -735,10 +735,6 @@ int main(int argc, char *argv[]) {
|
||||
FLAGS_id_type = upper;
|
||||
}
|
||||
|
||||
// Verify that the user that started the Memgraph process is the same user
|
||||
// that is the owner of the data directory.
|
||||
VerifyDataDirectoryOwnerAndProcessUser(FLAGS_data_directory);
|
||||
|
||||
std::unordered_map<NodeId, storage::Gid> node_id_map;
|
||||
storage::Storage store{
|
||||
{.durability =
|
||||
|
@ -1,9 +1,21 @@
|
||||
#include "storage/v2/durability.hpp"
|
||||
|
||||
#include <fcntl.h>
|
||||
#include <pwd.h>
|
||||
#include <sys/stat.h>
|
||||
#include <sys/types.h>
|
||||
#include <unistd.h>
|
||||
|
||||
#include <cerrno>
|
||||
#include <cstring>
|
||||
|
||||
#include <algorithm>
|
||||
#include <filesystem>
|
||||
#include <string>
|
||||
#include <tuple>
|
||||
#include <unordered_map>
|
||||
#include <unordered_set>
|
||||
#include <vector>
|
||||
|
||||
#include "storage/v2/edge_accessor.hpp"
|
||||
#include "storage/v2/edge_ref.hpp"
|
||||
@ -885,6 +897,40 @@ void RemoveRecoveredIndexConstraint(std::vector<TObj> *list, TObj obj,
|
||||
bool IsVersionSupported(uint64_t version) {
|
||||
return version >= kOldestSupportedVersion && version <= kVersion;
|
||||
}
|
||||
|
||||
/// Verifies that the owner of the storage directory is the same user that
|
||||
/// started the current process.
|
||||
void VerifyStorageDirectoryOwnerAndProcessUserOrDie(
|
||||
const std::filesystem::path &storage_directory) {
|
||||
// Get the process user ID.
|
||||
auto process_euid = geteuid();
|
||||
|
||||
// Get the data directory owner ID.
|
||||
struct stat statbuf;
|
||||
auto ret = stat(storage_directory.c_str(), &statbuf);
|
||||
if (ret != 0 && errno == ENOENT) {
|
||||
// The directory doesn't currently exist.
|
||||
return;
|
||||
}
|
||||
CHECK(ret == 0) << "Couldn't get stat for '" << storage_directory
|
||||
<< "' because of: " << strerror(errno) << " (" << errno
|
||||
<< ")";
|
||||
auto directory_owner = statbuf.st_uid;
|
||||
|
||||
auto get_username = [](auto uid) {
|
||||
auto info = getpwuid(uid);
|
||||
if (!info) return std::to_string(uid);
|
||||
return std::string(info->pw_name);
|
||||
};
|
||||
|
||||
auto user_process = get_username(process_euid);
|
||||
auto user_directory = get_username(directory_owner);
|
||||
CHECK(process_euid == directory_owner)
|
||||
<< "The process is running as user " << user_process
|
||||
<< ", but the data directory is owned by user " << user_directory
|
||||
<< ". Please start the process as user " << user_directory << "!";
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
||||
// Function used to read information about the snapshot file.
|
||||
@ -1365,14 +1411,43 @@ Durability::Durability(Config::Durability config,
|
||||
indices_(indices),
|
||||
constraints_(constraints),
|
||||
items_(items),
|
||||
storage_directory_(config_.storage_directory),
|
||||
snapshot_directory_(config_.storage_directory / kSnapshotDirectory),
|
||||
wal_directory_(config_.storage_directory / kWalDirectory),
|
||||
lock_file_path_(config_.storage_directory / ".lock"),
|
||||
uuid_(utils::GenerateUUID()) {}
|
||||
|
||||
std::optional<Durability::RecoveryInfo> Durability::Initialize(
|
||||
std::function<void(std::function<void(Transaction *)>)>
|
||||
execute_with_transaction) {
|
||||
execute_with_transaction_ = execute_with_transaction;
|
||||
if (config_.snapshot_wal_mode !=
|
||||
Config::Durability::SnapshotWalMode::DISABLED ||
|
||||
config_.snapshot_on_exit || config_.recover_on_startup) {
|
||||
// Create the directory initially to crash the database in case of
|
||||
// permission errors. This is done early to crash the database on startup
|
||||
// instead of crashing the database for the first time during runtime (which
|
||||
// could be an unpleasant surprise).
|
||||
utils::EnsureDirOrDie(snapshot_directory_);
|
||||
// Same reasoning as above.
|
||||
utils::EnsureDirOrDie(wal_directory_);
|
||||
|
||||
// Verify that the user that started the process is the same user that is
|
||||
// the owner of the storage directory.
|
||||
VerifyStorageDirectoryOwnerAndProcessUserOrDie(storage_directory_);
|
||||
|
||||
// Create the lock file and open a handle to it. This will crash the
|
||||
// database if it can't open the file for writing or if any other process is
|
||||
// holding the file opened.
|
||||
lock_file_handle_.Open(lock_file_path_,
|
||||
utils::OutputFile::Mode::OVERWRITE_EXISTING);
|
||||
CHECK(lock_file_handle_.AcquireLock())
|
||||
<< "Couldn't acquire lock on the storage directory "
|
||||
<< storage_directory_
|
||||
<< "!\nAnother Memgraph process is currently running with the same "
|
||||
"storage directory, please stop it first before starting this "
|
||||
"process!";
|
||||
}
|
||||
std::optional<Durability::RecoveryInfo> ret;
|
||||
if (config_.recover_on_startup) {
|
||||
ret = RecoverData();
|
||||
@ -1408,20 +1483,6 @@ std::optional<Durability::RecoveryInfo> Durability::Initialize(
|
||||
"be overridden. To prevent important data loss, Memgraph has stored "
|
||||
"those files into a .backup directory inside the storage directory.";
|
||||
}
|
||||
if (config_.snapshot_wal_mode !=
|
||||
Config::Durability::SnapshotWalMode::DISABLED ||
|
||||
config_.snapshot_on_exit) {
|
||||
// Create the directory initially to crash the database in case of
|
||||
// permission errors. This is done early to crash the database on startup
|
||||
// instead of crashing the database for the first time during runtime (which
|
||||
// could be an unpleasant surprise).
|
||||
utils::EnsureDirOrDie(snapshot_directory_);
|
||||
}
|
||||
if (config_.snapshot_wal_mode ==
|
||||
Config::Durability::SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL) {
|
||||
// Same reasoning as above.
|
||||
utils::EnsureDirOrDie(wal_directory_);
|
||||
}
|
||||
if (config_.snapshot_wal_mode !=
|
||||
Config::Durability::SnapshotWalMode::DISABLED) {
|
||||
snapshot_runner_.Run("Snapshot", config_.snapshot_interval, [this] {
|
||||
|
@ -426,8 +426,11 @@ class Durability final {
|
||||
std::function<void(std::function<void(Transaction *)>)>
|
||||
execute_with_transaction_;
|
||||
|
||||
std::filesystem::path storage_directory_;
|
||||
std::filesystem::path snapshot_directory_;
|
||||
std::filesystem::path wal_directory_;
|
||||
std::filesystem::path lock_file_path_;
|
||||
utils::OutputFile lock_file_handle_;
|
||||
|
||||
utils::Scheduler snapshot_runner_;
|
||||
|
||||
|
@ -246,6 +246,7 @@ void InputFile::Close() noexcept {
|
||||
}
|
||||
|
||||
fd_ = -1;
|
||||
path_ = "";
|
||||
}
|
||||
|
||||
bool InputFile::LoadBuffer() {
|
||||
@ -397,6 +398,28 @@ size_t OutputFile::SetPosition(Position position, ssize_t offset) {
|
||||
}
|
||||
}
|
||||
|
||||
bool OutputFile::AcquireLock() {
|
||||
CHECK(IsOpen()) << "Trying to acquire a write lock on an unopened file!";
|
||||
int ret = -1;
|
||||
while (true) {
|
||||
struct flock lock;
|
||||
memset(&lock, 0, sizeof(lock));
|
||||
lock.l_type = F_WRLCK;
|
||||
lock.l_whence = SEEK_SET;
|
||||
lock.l_start = 0;
|
||||
lock.l_len = 0;
|
||||
ret = fcntl(fd_, F_SETLK, &lock);
|
||||
if (ret == -1 && errno == EINTR) {
|
||||
// The call was interrupted, try again...
|
||||
continue;
|
||||
} else {
|
||||
// All other possible errors are handled in the return below.
|
||||
break;
|
||||
}
|
||||
}
|
||||
return ret != -1;
|
||||
}
|
||||
|
||||
void OutputFile::Sync() {
|
||||
FlushBuffer(true);
|
||||
|
||||
@ -468,6 +491,7 @@ void OutputFile::Close() noexcept {
|
||||
|
||||
fd_ = -1;
|
||||
written_since_last_sync_ = 0;
|
||||
path_ = "";
|
||||
}
|
||||
|
||||
void OutputFile::FlushBuffer(bool force_flush) {
|
||||
|
@ -205,6 +205,13 @@ class OutputFile {
|
||||
/// program.
|
||||
size_t SetPosition(Position position, ssize_t offset);
|
||||
|
||||
/// This function tries to acquire a POSIX write lock on the file. The
|
||||
/// acquired lock is valid during the whole lifetime of the process and can't
|
||||
/// be acquired again. The function returns `true` if the lock was acquired
|
||||
/// successfully, `false` is returned otherwise. On misuse it crashes the
|
||||
/// program.
|
||||
bool AcquireLock();
|
||||
|
||||
/// Syncs currently pending data to the currently opened file. On failure
|
||||
/// and misuse it crashes the program.
|
||||
void Sync();
|
||||
|
@ -42,6 +42,47 @@ def list_to_string(data):
|
||||
return ret
|
||||
|
||||
|
||||
def verify_lifetime(memgraph_binary, mg_import_csv_binary):
    """Check that mg_import_csv refuses to run while memgraph is running.

    Starts memgraph on a fresh temporary data directory, then launches
    mg_import_csv against the same directory. The importer is expected to
    exit with a non-zero return code; an exception is raised if it exits
    with 0. Memgraph is then terminated and must exit with code 0.
    """
    print("\033[1;36m~~ Verifying that mg_import_csv can't be started while "
          "memgraph is running ~~\033[0m")
    data_dir = tempfile.TemporaryDirectory()

    # Arguments shared by both binaries.
    shared_args = [
        "--data-directory", data_dir.name,
        "--storage-properties-on-edges=false",
    ]

    # Launch memgraph and give it a moment to fail fast if it is going to.
    server_cmd = [memgraph_binary, "--storage-recover-on-startup"]
    server_cmd += shared_args
    memgraph = subprocess.Popen([str(arg) for arg in server_cmd])
    time.sleep(0.1)
    assert memgraph.poll() is None, "Memgraph process died prematurely!"
    # presumably blocks until the server accepts connections on this port;
    # verify against the helper's definition
    wait_for_server(7687)

    # Make sure memgraph is stopped even if this function bails out early.
    def cleanup():
        if memgraph.poll() is None:
            memgraph.terminate()
        assert memgraph.wait() == 0, "Memgraph process didn't exit cleanly!"

    atexit.register(cleanup)

    # Run the importer against the directory memgraph is already using.
    importer_cmd = [mg_import_csv_binary, "--nodes", "/dev/null"]
    importer_cmd += shared_args
    importer = subprocess.run(importer_cmd)

    # A zero exit code means the importer ran alongside memgraph.
    if importer.returncode == 0:
        raise Exception(
            "The importer was able to run while memgraph was running!")

    # Stop memgraph and require a clean exit.
    memgraph.terminate()
    assert memgraph.wait() == 0, "Memgraph process didn't exit cleanly!"

    print("\033[1;32m~~ Test successful ~~\033[0m\n")
|
||||
|
||||
|
||||
def execute_test(name, test_path, test_config, memgraph_binary,
|
||||
mg_import_csv_binary, tester_binary):
|
||||
print("\033[1;36m~~ Executing test", name, "~~\033[0m")
|
||||
@ -144,8 +185,15 @@ if __name__ == "__main__":
|
||||
parser.add_argument("--tester", default=tester_binary)
|
||||
args = parser.parse_args()
|
||||
|
||||
# First test whether the CSV importer can be started while the main
|
||||
# Memgraph binary is running.
|
||||
verify_lifetime(memgraph_binary, mg_import_csv_binary)
|
||||
|
||||
# Run all import scenarios.
|
||||
test_dir = os.path.join(SCRIPT_DIR, "tests")
|
||||
for name in sorted(os.listdir(test_dir)):
|
||||
tests_list = sorted(os.listdir(test_dir))
|
||||
assert len(tests_list) > 0, "No tests were found!"
|
||||
for name in tests_list:
|
||||
print("\033[1;34m~~ Processing tests from", name, "~~\033[0m\n")
|
||||
test_path = os.path.join(test_dir, name)
|
||||
with open(os.path.join(test_path, "test.yaml")) as f:
|
||||
|
Loading…
Reference in New Issue
Block a user