Compare commits
6 Commits
master
...
add-macos-
Author | SHA1 | Date | |
---|---|---|---|
|
9aa18dcb77 | ||
|
795845ccf7 | ||
|
fbc2b4c99b | ||
|
32facac135 | ||
|
7181e546f5 | ||
|
4d9bb27db7 |
@ -235,6 +235,12 @@ set(MG_ARCH "x86_64" CACHE STRING "Host architecture to build Memgraph on. Suppo
|
|||||||
|
|
||||||
# setup external dependencies -------------------------------------------------
|
# setup external dependencies -------------------------------------------------
|
||||||
|
|
||||||
|
set(CMAKE_THREAD_LIBS_INIT "-lpthread")
|
||||||
|
set(CMAKE_HAVE_THREADS_LIBRARY 1)
|
||||||
|
set(CMAKE_USE_WIN32_THREADS_INIT 0)
|
||||||
|
set(CMAKE_USE_PTHREADS_INIT 1)
|
||||||
|
set(THREADS_PREFER_PTHREAD_FLAG ON)
|
||||||
|
|
||||||
# threading
|
# threading
|
||||||
find_package(Threads REQUIRED)
|
find_package(Threads REQUIRED)
|
||||||
# optional readline
|
# optional readline
|
||||||
|
23
environment/os/macos-12.sh
Executable file
23
environment/os/macos-12.sh
Executable file
@ -0,0 +1,23 @@
|
|||||||
|
#/bin/bash
|
||||||
|
|
||||||
|
brew install bash
|
||||||
|
brew install cmake
|
||||||
|
brew install clisp sbcl
|
||||||
|
brew install boost gflags fmt jemalloc openssl
|
||||||
|
brew install openssl@1.1
|
||||||
|
|
||||||
|
brew install cyrus-sasl
|
||||||
|
# cyrus-sasl is keg-only, which means it was not symlinked into /opt/homebrew,
|
||||||
|
# because macOS already provides this software and installing another version in
|
||||||
|
# parallel can cause all kinds of trouble.
|
||||||
|
# If you need to have cyrus-sasl first in your PATH, run:
|
||||||
|
# echo 'export PATH="/opt/homebrew/opt/cyrus-sasl/sbin:$PATH"' >> ~/.zshrc
|
||||||
|
# For compilers to find cyrus-sasl you may need to set:
|
||||||
|
# export LDFLAGS="-L/opt/homebrew/opt/cyrus-sasl/lib"
|
||||||
|
# export CPPFLAGS="-I/opt/homebrew/opt/cyrus-sasl/include"
|
||||||
|
|
||||||
|
# TODO(gitbuda): memgraph::utils::SpinLock
|
||||||
|
# TODO(gitbuda): memgraph::utils::AsyncTimer
|
||||||
|
# TODO(gitbuda): memgraph::utils::RWLock
|
||||||
|
# TODO(gitbuda): memgraph::utils::ThreadSetName
|
||||||
|
# TODO(gitbuda): RocksDB 7.7.2 compiles fine
|
87
init
87
init
@ -1,4 +1,4 @@
|
|||||||
#!/bin/bash -e
|
#!/opt/homebrew/Cellar/bash/5.1.16/bin/bash -e
|
||||||
|
|
||||||
DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
|
DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
|
||||||
cd "$DIR"
|
cd "$DIR"
|
||||||
@ -64,16 +64,16 @@ else
|
|||||||
done
|
done
|
||||||
fi
|
fi
|
||||||
|
|
||||||
DISTRO=$(operating_system)
|
# DISTRO=$(operating_system)
|
||||||
ARCHITECTURE=$(architecture)
|
# ARCHITECTURE=$(architecture)
|
||||||
if [ "${ARCHITECTURE}" = "arm64" ] || [ "${ARCHITECTURE}" = "aarch64" ]; then
|
# if [ "${ARCHITECTURE}" = "arm64" ] || [ "${ARCHITECTURE}" = "aarch64" ]; then
|
||||||
OS_SCRIPT=$DIR/environment/os/$DISTRO-arm.sh
|
# OS_SCRIPT=$DIR/environment/os/$DISTRO-arm.sh
|
||||||
else
|
# else
|
||||||
OS_SCRIPT=$DIR/environment/os/$DISTRO.sh
|
# OS_SCRIPT=$DIR/environment/os/$DISTRO.sh
|
||||||
fi
|
# fi
|
||||||
echo "ALL BUILD PACKAGES: $($OS_SCRIPT list MEMGRAPH_BUILD_DEPS)"
|
# echo "ALL BUILD PACKAGES: $($OS_SCRIPT list MEMGRAPH_BUILD_DEPS)"
|
||||||
$OS_SCRIPT check MEMGRAPH_BUILD_DEPS
|
# $OS_SCRIPT check MEMGRAPH_BUILD_DEPS
|
||||||
echo "All packages are in-place..."
|
# echo "All packages are in-place..."
|
||||||
|
|
||||||
# create a default build directory
|
# create a default build directory
|
||||||
mkdir -p ./build
|
mkdir -p ./build
|
||||||
@ -93,7 +93,8 @@ if [[ ! -f "${quicklisp_install_dir}/setup.lisp" ]]; then
|
|||||||
" | sbcl --script || exit 1
|
" | sbcl --script || exit 1
|
||||||
rm -rf quicklisp.lisp || exit 1
|
rm -rf quicklisp.lisp || exit 1
|
||||||
fi
|
fi
|
||||||
ln -Tfs "$DIR/src/lisp" "${quicklisp_install_dir}/local-projects/lcp"
|
# TODO(gitbuda): -T doesn't work on Mac
|
||||||
|
ln -fs "$DIR/src/lisp" "${quicklisp_install_dir}/local-projects/lcp"
|
||||||
# Install LCP dependencies
|
# Install LCP dependencies
|
||||||
# TODO: We should at some point cache or have a mirror of packages we use.
|
# TODO: We should at some point cache or have a mirror of packages we use.
|
||||||
# TODO: move the installation of LCP's dependencies into ./setup.sh
|
# TODO: move the installation of LCP's dependencies into ./setup.sh
|
||||||
@ -111,34 +112,34 @@ if [[ "$setup_libs" == "true" ]]; then
|
|||||||
cd ..
|
cd ..
|
||||||
fi
|
fi
|
||||||
|
|
||||||
# setup gql_behave dependencies
|
# # setup gql_behave dependencies
|
||||||
setup_virtualenv tests/gql_behave
|
# setup_virtualenv tests/gql_behave
|
||||||
|
#
|
||||||
# setup stress dependencies
|
# # setup stress dependencies
|
||||||
setup_virtualenv tests/stress
|
# setup_virtualenv tests/stress
|
||||||
|
#
|
||||||
# setup integration/ldap dependencies
|
# # setup integration/ldap dependencies
|
||||||
setup_virtualenv tests/integration/ldap
|
# setup_virtualenv tests/integration/ldap
|
||||||
|
#
|
||||||
# Setup tests dependencies.
|
# # Setup tests dependencies.
|
||||||
# cd tests
|
# # cd tests
|
||||||
# ./setup.sh
|
# # ./setup.sh
|
||||||
# cd ..
|
# # cd ..
|
||||||
# TODO(gitbuda): Remove setup_virtualenv, replace it with tests/ve3. Take care
|
# # TODO(gitbuda): Remove setup_virtualenv, replace it with tests/ve3. Take care
|
||||||
# of the build order because tests/setup.py builds pymgclient which depends on
|
# # of the build order because tests/setup.py builds pymgclient which depends on
|
||||||
# mgclient which is build after this script by calling make.
|
# # mgclient which is build after this script by calling make.
|
||||||
|
#
|
||||||
echo "Done installing dependencies for Memgraph"
|
# echo "Done installing dependencies for Memgraph"
|
||||||
|
#
|
||||||
echo "Linking git hooks"
|
# echo "Linking git hooks"
|
||||||
for hook in $(find $DIR/.githooks -type f -printf "%f\n"); do
|
# for hook in $(find $DIR/.githooks -type f -printf "%f\n"); do
|
||||||
ln -s -f "$DIR/.githooks/$hook" "$DIR/.git/hooks/$hook"
|
# ln -s -f "$DIR/.githooks/$hook" "$DIR/.git/hooks/$hook"
|
||||||
echo "Added $hook hook"
|
# echo "Added $hook hook"
|
||||||
done;
|
# done;
|
||||||
|
#
|
||||||
# Install precommit hook
|
# # Install precommit hook
|
||||||
python3 -m pip install pre-commit
|
# python3 -m pip install pre-commit
|
||||||
python3 -m pre_commit install
|
# python3 -m pre_commit install
|
||||||
|
#
|
||||||
# Link `include/mgp.py` with `release/mgp/mgp.py`
|
# # Link `include/mgp.py` with `release/mgp/mgp.py`
|
||||||
ln -v -f include/mgp.py release/mgp/mgp.py
|
# ln -v -f include/mgp.py release/mgp/mgp.py
|
||||||
|
@ -1,4 +1,4 @@
|
|||||||
#!/bin/bash -e
|
#!/opt/homebrew/Cellar/bash/5.1.16/bin/bash -e
|
||||||
|
|
||||||
# Download external dependencies.
|
# Download external dependencies.
|
||||||
# Don't forget to add/update the license in release/third-party-licenses of added/updated libs!
|
# Don't forget to add/update the license in release/third-party-licenses of added/updated libs!
|
||||||
@ -201,7 +201,7 @@ popd
|
|||||||
# mgclient
|
# mgclient
|
||||||
mgclient_tag="v1.4.0" # (2022-06-14)
|
mgclient_tag="v1.4.0" # (2022-06-14)
|
||||||
repo_clone_try_double "${primary_urls[mgclient]}" "${secondary_urls[mgclient]}" "mgclient" "$mgclient_tag"
|
repo_clone_try_double "${primary_urls[mgclient]}" "${secondary_urls[mgclient]}" "mgclient" "$mgclient_tag"
|
||||||
sed -i 's/\${CMAKE_INSTALL_LIBDIR}/lib/' mgclient/src/CMakeLists.txt
|
sed -e 's/\${CMAKE_INSTALL_LIBDIR}/lib/' mgclient/src/CMakeLists.txt
|
||||||
|
|
||||||
# pymgclient
|
# pymgclient
|
||||||
pymgclient_tag="4f85c179e56302d46a1e3e2cf43509db65f062b3" # (2021-01-15)
|
pymgclient_tag="4f85c179e56302d46a1e3e2cf43509db65f062b3" # (2021-01-15)
|
||||||
@ -233,6 +233,7 @@ git apply ../pulsar.patch
|
|||||||
popd
|
popd
|
||||||
|
|
||||||
#librdtsc
|
#librdtsc
|
||||||
|
# TODO(gitbuda): __always_inline doesn't work on Apple Clang 14
|
||||||
librdtsc_tag="v0.3"
|
librdtsc_tag="v0.3"
|
||||||
repo_clone_try_double "${primary_urls[librdtsc]}" "${secondary_urls[librdtsc]}" "librdtsc" "$librdtsc_tag" true
|
repo_clone_try_double "${primary_urls[librdtsc]}" "${secondary_urls[librdtsc]}" "librdtsc" "$librdtsc_tag" true
|
||||||
pushd librdtsc
|
pushd librdtsc
|
||||||
|
@ -4,7 +4,8 @@ set(auth_src_files
|
|||||||
models.cpp
|
models.cpp
|
||||||
module.cpp)
|
module.cpp)
|
||||||
|
|
||||||
find_package(Seccomp REQUIRED)
|
# TODO(gitbuda): Deal with the MacOS specific setup around Seccomp.
|
||||||
|
# find_package(Seccomp REQUIRED)
|
||||||
find_package(fmt REQUIRED)
|
find_package(fmt REQUIRED)
|
||||||
find_package(gflags REQUIRED)
|
find_package(gflags REQUIRED)
|
||||||
|
|
||||||
|
@ -1,4 +1,4 @@
|
|||||||
// Copyright 2022 Memgraph Ltd.
|
// Copyright 2023 Memgraph Ltd.
|
||||||
//
|
//
|
||||||
// Use of this software is governed by the Business Source License
|
// Use of this software is governed by the Business Source License
|
||||||
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
|
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
|
||||||
@ -718,6 +718,7 @@ int main(int argc, char **argv) {
|
|||||||
auto gil = memgraph::py::EnsureGIL();
|
auto gil = memgraph::py::EnsureGIL();
|
||||||
// NOLINTNEXTLINE(hicpp-signed-bitwise)
|
// NOLINTNEXTLINE(hicpp-signed-bitwise)
|
||||||
auto *flag = PyLong_FromLong(RTLD_NOW | RTLD_DEEPBIND);
|
auto *flag = PyLong_FromLong(RTLD_NOW | RTLD_DEEPBIND);
|
||||||
|
// TODO(gitbuda): This is Unix specific -> https://docs.python.org/3/library/sys.html#sys.setdlopenflags
|
||||||
auto *setdl = PySys_GetObject("setdlopenflags");
|
auto *setdl = PySys_GetObject("setdlopenflags");
|
||||||
MG_ASSERT(setdl);
|
MG_ASSERT(setdl);
|
||||||
auto *arg = PyTuple_New(1);
|
auto *arg = PyTuple_New(1);
|
||||||
|
@ -11,188 +11,15 @@
|
|||||||
|
|
||||||
#include "utils/async_timer.hpp"
|
#include "utils/async_timer.hpp"
|
||||||
|
|
||||||
#include <csignal>
|
|
||||||
|
|
||||||
#include <algorithm>
|
|
||||||
#include <atomic>
|
|
||||||
#include <cmath>
|
|
||||||
#include <cstdint>
|
|
||||||
#include <limits>
|
|
||||||
|
|
||||||
#include "utils/skip_list.hpp"
|
|
||||||
#include "utils/spin_lock.hpp"
|
|
||||||
#include "utils/synchronized.hpp"
|
|
||||||
|
|
||||||
namespace {
|
|
||||||
|
|
||||||
inline constexpr uint64_t kInvalidFlagId = 0U;
|
|
||||||
// std::numeric_limits<time_t>::max() cannot be represented precisely as a double, so the next smallest value is the
|
|
||||||
// maximum number of seconds the timer can be used with
|
|
||||||
const double max_seconds_as_double = std::nexttoward(std::numeric_limits<time_t>::max(), 0.0);
|
|
||||||
|
|
||||||
// NOLINTNEXTLINE (cppcoreguidelines-avoid-non-const-global-variables)
|
|
||||||
std::atomic<uint64_t> expiration_flag_counter{kInvalidFlagId + 1U};
|
|
||||||
|
|
||||||
struct ExpirationFlagInfo {
|
|
||||||
uint64_t id{0U};
|
|
||||||
std::weak_ptr<std::atomic<bool>> flag{};
|
|
||||||
};
|
|
||||||
|
|
||||||
bool operator==(const ExpirationFlagInfo &lhs, const ExpirationFlagInfo &rhs) { return lhs.id == rhs.id; }
|
|
||||||
bool operator<(const ExpirationFlagInfo &lhs, const ExpirationFlagInfo &rhs) { return lhs.id < rhs.id; }
|
|
||||||
bool operator==(const ExpirationFlagInfo &flag_info, const uint64_t id) { return flag_info.id == id; }
|
|
||||||
bool operator<(const ExpirationFlagInfo &flag_info, const uint64_t id) { return flag_info.id < id; }
|
|
||||||
|
|
||||||
// NOLINTNEXTLINE (cppcoreguidelines-avoid-non-const-global-variables)
|
|
||||||
memgraph::utils::SkipList<ExpirationFlagInfo> expiration_flags{};
|
|
||||||
|
|
||||||
uint64_t AddFlag(std::weak_ptr<std::atomic<bool>> flag) {
|
|
||||||
const auto id = expiration_flag_counter.fetch_add(1, std::memory_order_relaxed);
|
|
||||||
expiration_flags.access().insert({id, std::move(flag)});
|
|
||||||
return id;
|
|
||||||
}
|
|
||||||
|
|
||||||
void EraseFlag(uint64_t flag_id) { expiration_flags.access().remove(flag_id); }
|
|
||||||
|
|
||||||
std::weak_ptr<std::atomic<bool>> GetFlag(uint64_t flag_id) {
|
|
||||||
const auto flag_accessor = expiration_flags.access();
|
|
||||||
const auto it = flag_accessor.find(flag_id);
|
|
||||||
if (it == flag_accessor.end()) {
|
|
||||||
return {};
|
|
||||||
}
|
|
||||||
|
|
||||||
return it->flag;
|
|
||||||
}
|
|
||||||
|
|
||||||
void MarkDone(const uint64_t flag_id) {
|
|
||||||
const auto weak_flag = GetFlag(flag_id);
|
|
||||||
if (weak_flag.expired()) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
auto flag = weak_flag.lock();
|
|
||||||
if (flag != nullptr) {
|
|
||||||
flag->store(true, std::memory_order_relaxed);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} // namespace
|
|
||||||
|
|
||||||
namespace memgraph::utils {
|
namespace memgraph::utils {
|
||||||
|
|
||||||
namespace {
|
AsyncTimer::AsyncTimer(double seconds) {
|
||||||
struct ThreadInfo {
|
// TODO(gitbuda): Implement AsyncTimer constructor.
|
||||||
pid_t thread_id;
|
|
||||||
std::atomic<bool> setup_done{false};
|
|
||||||
};
|
|
||||||
|
|
||||||
void *TimerBackgroundWorker(void *args) {
|
|
||||||
auto *thread_info = static_cast<ThreadInfo *>(args);
|
|
||||||
thread_info->thread_id = syscall(SYS_gettid);
|
|
||||||
thread_info->setup_done.store(true, std::memory_order_release);
|
|
||||||
|
|
||||||
sigset_t ss;
|
|
||||||
sigemptyset(&ss);
|
|
||||||
sigaddset(&ss, SIGTIMER);
|
|
||||||
sigprocmask(SIG_BLOCK, &ss, nullptr);
|
|
||||||
|
|
||||||
while (true) {
|
|
||||||
siginfo_t si;
|
|
||||||
int result = sigwaitinfo(&ss, &si);
|
|
||||||
|
|
||||||
if (result <= 0) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (si.si_code == SI_TIMER) {
|
|
||||||
auto flag_id = kInvalidFlagId;
|
|
||||||
std::memcpy(&flag_id, &si.si_value.sival_ptr, sizeof(flag_id));
|
|
||||||
MarkDone(flag_id);
|
|
||||||
} else if (si.si_code == SI_TKILL) {
|
|
||||||
pthread_exit(nullptr);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
} // namespace
|
|
||||||
|
|
||||||
AsyncTimer::AsyncTimer() : flag_id_{kInvalidFlagId} {};
|
|
||||||
|
|
||||||
AsyncTimer::AsyncTimer(double seconds)
|
|
||||||
: expiration_flag_{std::make_shared<std::atomic<bool>>(false)}, flag_id_{kInvalidFlagId}, timer_id_{} {
|
|
||||||
MG_ASSERT(seconds <= max_seconds_as_double,
|
|
||||||
"The AsyncTimer cannot handle larger time values than {:f}, the specified value: {:f}",
|
|
||||||
max_seconds_as_double, seconds);
|
|
||||||
MG_ASSERT(seconds >= 0.0, "The AsyncTimer cannot handle negative time values: {:f}", seconds);
|
|
||||||
|
|
||||||
static pthread_t background_timer_thread;
|
|
||||||
static ThreadInfo thread_info;
|
|
||||||
static std::once_flag timer_thread_setup_flag;
|
|
||||||
|
|
||||||
std::call_once(timer_thread_setup_flag, [] {
|
|
||||||
pthread_create(&background_timer_thread, nullptr, TimerBackgroundWorker, &thread_info);
|
|
||||||
while (!thread_info.setup_done.load(std::memory_order_acquire))
|
|
||||||
;
|
|
||||||
});
|
|
||||||
|
|
||||||
flag_id_ = AddFlag(std::weak_ptr<std::atomic<bool>>{expiration_flag_});
|
|
||||||
|
|
||||||
sigevent notification_settings{};
|
|
||||||
notification_settings.sigev_notify = SIGEV_THREAD_ID;
|
|
||||||
notification_settings.sigev_signo = SIGTIMER;
|
|
||||||
notification_settings._sigev_un._tid = thread_info.thread_id;
|
|
||||||
static_assert(sizeof(void *) == sizeof(flag_id_), "ID size must be equal to pointer size!");
|
|
||||||
std::memcpy(¬ification_settings.sigev_value.sival_ptr, &flag_id_, sizeof(flag_id_));
|
|
||||||
MG_ASSERT(timer_create(CLOCK_MONOTONIC, ¬ification_settings, &timer_id_) == 0, "Couldn't create timer: ({}) {}",
|
|
||||||
errno, strerror(errno));
|
|
||||||
|
|
||||||
static constexpr auto kSecondsToNanos = 1000 * 1000 * 1000;
|
|
||||||
// Casting will truncate down, but that's exactly what we want.
|
|
||||||
const auto second_as_time_t = static_cast<time_t>(seconds);
|
|
||||||
const auto remaining_nano_seconds = static_cast<time_t>((seconds - second_as_time_t) * kSecondsToNanos);
|
|
||||||
|
|
||||||
struct itimerspec spec;
|
|
||||||
spec.it_interval.tv_sec = 0;
|
|
||||||
spec.it_interval.tv_nsec = 0;
|
|
||||||
spec.it_value.tv_sec = second_as_time_t;
|
|
||||||
spec.it_value.tv_nsec = remaining_nano_seconds;
|
|
||||||
|
|
||||||
MG_ASSERT(timer_settime(timer_id_, 0, &spec, nullptr) == 0, "Couldn't set timer: ({}) {}", errno, strerror(errno));
|
|
||||||
}
|
|
||||||
|
|
||||||
AsyncTimer::~AsyncTimer() { ReleaseResources(); }
|
|
||||||
|
|
||||||
AsyncTimer::AsyncTimer(AsyncTimer &&other) noexcept
|
|
||||||
: expiration_flag_{std::move(other.expiration_flag_)}, flag_id_{other.flag_id_}, timer_id_{other.timer_id_} {
|
|
||||||
other.flag_id_ = kInvalidFlagId;
|
|
||||||
}
|
|
||||||
|
|
||||||
// NOLINTNEXTLINE (hicpp-noexcept-move)
|
|
||||||
AsyncTimer &AsyncTimer::operator=(AsyncTimer &&other) {
|
|
||||||
if (this == &other) {
|
|
||||||
return *this;
|
|
||||||
}
|
|
||||||
|
|
||||||
ReleaseResources();
|
|
||||||
|
|
||||||
expiration_flag_ = std::move(other.expiration_flag_);
|
|
||||||
flag_id_ = std::exchange(other.flag_id_, kInvalidFlagId);
|
|
||||||
timer_id_ = other.timer_id_;
|
|
||||||
|
|
||||||
return *this;
|
|
||||||
};
|
|
||||||
|
|
||||||
bool AsyncTimer::IsExpired() const noexcept {
|
bool AsyncTimer::IsExpired() const noexcept {
|
||||||
if (expiration_flag_ != nullptr) {
|
// TODO(gitbuda): Implement AsyncTimer::IsExpired
|
||||||
return expiration_flag_->load(std::memory_order_relaxed);
|
return true;
|
||||||
}
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
void AsyncTimer::ReleaseResources() {
|
|
||||||
if (expiration_flag_ != nullptr) {
|
|
||||||
timer_delete(timer_id_);
|
|
||||||
EraseFlag(flag_id_);
|
|
||||||
flag_id_ = kInvalidFlagId;
|
|
||||||
expiration_flag_ = std::shared_ptr<std::atomic<bool>>{};
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
} // namespace memgraph::utils
|
} // namespace memgraph::utils
|
||||||
|
@ -10,39 +10,21 @@
|
|||||||
// licenses/APL.txt.
|
// licenses/APL.txt.
|
||||||
|
|
||||||
#pragma once
|
#pragma once
|
||||||
#include <time.h>
|
|
||||||
|
|
||||||
#include <memory>
|
|
||||||
|
|
||||||
#include "utils/logging.hpp"
|
|
||||||
|
|
||||||
namespace memgraph::utils {
|
namespace memgraph::utils {
|
||||||
|
|
||||||
#define SIGTIMER (SIGRTMAX - 2)
|
|
||||||
|
|
||||||
class AsyncTimer {
|
class AsyncTimer {
|
||||||
public:
|
public:
|
||||||
AsyncTimer();
|
|
||||||
explicit AsyncTimer(double seconds);
|
explicit AsyncTimer(double seconds);
|
||||||
~AsyncTimer();
|
AsyncTimer() = default;
|
||||||
AsyncTimer(AsyncTimer &&other) noexcept;
|
~AsyncTimer() = default;
|
||||||
// NOLINTNEXTLINE (hicpp-noexcept-move)
|
AsyncTimer(AsyncTimer &&other) = default;
|
||||||
AsyncTimer &operator=(AsyncTimer &&other);
|
AsyncTimer &operator=(AsyncTimer &&other) = default;
|
||||||
|
|
||||||
AsyncTimer(const AsyncTimer &) = delete;
|
AsyncTimer(const AsyncTimer &) = delete;
|
||||||
AsyncTimer &operator=(const AsyncTimer &) = delete;
|
AsyncTimer &operator=(const AsyncTimer &) = delete;
|
||||||
|
|
||||||
// Returns false if the object isn't associated with any timer.
|
// Returns false if the object isn't associated with any timer.
|
||||||
bool IsExpired() const noexcept;
|
bool IsExpired() const noexcept;
|
||||||
|
|
||||||
private:
|
|
||||||
void ReleaseResources();
|
|
||||||
|
|
||||||
// If the expiration_flag_ is nullptr, then the object is not associated with any timer, therefore no clean up
|
|
||||||
// is necessary. Furthermore, the the POSIX API doesn't specify any value as "invalid" for timer_t, so the timer_id_
|
|
||||||
// cannot be used to determine whether the object is associated with any timer or not.
|
|
||||||
std::shared_ptr<std::atomic<bool>> expiration_flag_;
|
|
||||||
uint64_t flag_id_;
|
|
||||||
timer_t timer_id_;
|
|
||||||
};
|
};
|
||||||
} // namespace memgraph::utils
|
|
||||||
|
}
|
||||||
|
@ -60,7 +60,7 @@ void Reader::TryInitializeHeader() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (header->empty()) {
|
if (header->empty()) {
|
||||||
throw CsvReadException("CSV file {} empty!", path_);
|
throw CsvReadException("CSV file {} empty!", path_.string());
|
||||||
}
|
}
|
||||||
|
|
||||||
number_of_columns_ = header->size();
|
number_of_columns_ = header->size();
|
||||||
|
@ -52,7 +52,7 @@ void EnsureDirOrDie(const std::filesystem::path &dir) {
|
|||||||
MG_ASSERT(EnsureDir(dir),
|
MG_ASSERT(EnsureDir(dir),
|
||||||
"Couldn't create directory '{}' due to a permission issue or the "
|
"Couldn't create directory '{}' due to a permission issue or the "
|
||||||
"path exists and isn't a directory!",
|
"path exists and isn't a directory!",
|
||||||
dir);
|
dir.string());
|
||||||
}
|
}
|
||||||
|
|
||||||
bool DirExists(const std::filesystem::path &dir) {
|
bool DirExists(const std::filesystem::path &dir) {
|
||||||
@ -82,7 +82,8 @@ bool RenamePath(const std::filesystem::path &src, const std::filesystem::path &d
|
|||||||
return !error_code;
|
return !error_code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static_assert(std::is_same_v<off_t, ssize_t>, "off_t must fit into ssize_t!");
|
// TODO(gitbuda): Port static_assert(off_t = ssize_t)
|
||||||
|
// static_assert(std::is_same_v<off_t, ssize_t>, "off_t must fit into ssize_t!");
|
||||||
|
|
||||||
InputFile::~InputFile() { Close(); }
|
InputFile::~InputFile() { Close(); }
|
||||||
|
|
||||||
@ -248,7 +249,7 @@ void InputFile::Close() noexcept {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (ret != 0) {
|
if (ret != 0) {
|
||||||
spdlog::error("While trying to close {} an error occured: {} ({})", path_, strerror(errno), errno);
|
spdlog::error("While trying to close {} an error occured: {} ({})", path_.string(), strerror(errno), errno);
|
||||||
}
|
}
|
||||||
|
|
||||||
fd_ = -1;
|
fd_ = -1;
|
||||||
@ -321,7 +322,7 @@ void OutputFile::Open(const std::filesystem::path &path, Mode mode) {
|
|||||||
MG_ASSERT(!IsOpen(),
|
MG_ASSERT(!IsOpen(),
|
||||||
"While trying to open {} for writing the database"
|
"While trying to open {} for writing the database"
|
||||||
" used a handle that already has {} opened in it!",
|
" used a handle that already has {} opened in it!",
|
||||||
path, path_);
|
path.string(), path_.string());
|
||||||
path_ = path;
|
path_ = path;
|
||||||
written_since_last_sync_ = 0;
|
written_since_last_sync_ = 0;
|
||||||
|
|
||||||
@ -341,7 +342,7 @@ void OutputFile::Open(const std::filesystem::path &path, Mode mode) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
MG_ASSERT(fd_ != -1, "While trying to open {} for writing an error occured: {} ({})", path_, strerror(errno), errno);
|
MG_ASSERT(fd_ != -1, "While trying to open {} for writing an error occured: {} ({})", path_.string(), strerror(errno), errno);
|
||||||
}
|
}
|
||||||
|
|
||||||
bool OutputFile::IsOpen() const { return fd_ != -1; }
|
bool OutputFile::IsOpen() const { return fd_ != -1; }
|
||||||
@ -390,7 +391,7 @@ size_t OutputFile::SeekFile(const Position position, const ssize_t offset) {
|
|||||||
if (pos == -1 && errno == EINTR) {
|
if (pos == -1 && errno == EINTR) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
MG_ASSERT(pos >= 0, "While trying to set the position in {} an error occured: {} ({})", path_, strerror(errno),
|
MG_ASSERT(pos >= 0, "While trying to set the position in {} an error occured: {} ({})", path_.string(), strerror(errno),
|
||||||
errno);
|
errno);
|
||||||
return pos;
|
return pos;
|
||||||
}
|
}
|
||||||
@ -467,7 +468,7 @@ void OutputFile::Sync() {
|
|||||||
MG_ASSERT(ret == 0,
|
MG_ASSERT(ret == 0,
|
||||||
"While trying to sync {}, an error occurred: {} ({}). Possibly {} "
|
"While trying to sync {}, an error occurred: {} ({}). Possibly {} "
|
||||||
"bytes from previous write calls were lost.",
|
"bytes from previous write calls were lost.",
|
||||||
path_, strerror(errno), errno, written_since_last_sync_);
|
path_.string(), strerror(errno), errno, written_since_last_sync_);
|
||||||
|
|
||||||
// Reset the counter.
|
// Reset the counter.
|
||||||
written_since_last_sync_ = 0;
|
written_since_last_sync_ = 0;
|
||||||
@ -492,7 +493,7 @@ void OutputFile::Close() noexcept {
|
|||||||
MG_ASSERT(ret == 0,
|
MG_ASSERT(ret == 0,
|
||||||
"While trying to close {}, an error occurred: {} ({}). Possibly {} "
|
"While trying to close {}, an error occurred: {} ({}). Possibly {} "
|
||||||
"bytes from previous write calls were lost.",
|
"bytes from previous write calls were lost.",
|
||||||
path_, strerror(errno), errno, written_since_last_sync_);
|
path_.string(), strerror(errno), errno, written_since_last_sync_);
|
||||||
|
|
||||||
fd_ = -1;
|
fd_ = -1;
|
||||||
written_since_last_sync_ = 0;
|
written_since_last_sync_ = 0;
|
||||||
@ -512,7 +513,7 @@ void OutputFile::FlushBufferInternal() {
|
|||||||
MG_ASSERT(buffer_position_ <= kFileBufferSize,
|
MG_ASSERT(buffer_position_ <= kFileBufferSize,
|
||||||
"While trying to write to {} more file was written to the "
|
"While trying to write to {} more file was written to the "
|
||||||
"buffer than the buffer has space!",
|
"buffer than the buffer has space!",
|
||||||
path_);
|
path_.string());
|
||||||
|
|
||||||
auto *buffer = buffer_;
|
auto *buffer = buffer_;
|
||||||
auto buffer_position = buffer_position_.load();
|
auto buffer_position = buffer_position_.load();
|
||||||
@ -526,7 +527,7 @@ void OutputFile::FlushBufferInternal() {
|
|||||||
"while trying to write to {} an error occurred: {} ({}). "
|
"while trying to write to {} an error occurred: {} ({}). "
|
||||||
"Possibly {} bytes of data were lost from this call and "
|
"Possibly {} bytes of data were lost from this call and "
|
||||||
"possibly {} bytes were lost from previous calls.",
|
"possibly {} bytes were lost from previous calls.",
|
||||||
path_, strerror(errno), errno, buffer_position_, written_since_last_sync_);
|
path_.string(), strerror(errno), errno, buffer_position_, written_since_last_sync_);
|
||||||
|
|
||||||
buffer_position -= written;
|
buffer_position -= written;
|
||||||
buffer += written;
|
buffer += written;
|
||||||
|
@ -10,14 +10,17 @@
|
|||||||
// licenses/APL.txt.
|
// licenses/APL.txt.
|
||||||
|
|
||||||
#include "utils/file_locker.hpp"
|
#include "utils/file_locker.hpp"
|
||||||
|
|
||||||
#include <filesystem>
|
#include <filesystem>
|
||||||
|
|
||||||
|
#include "utils/logging.hpp"
|
||||||
|
|
||||||
namespace memgraph::utils {
|
namespace memgraph::utils {
|
||||||
|
|
||||||
namespace {
|
namespace {
|
||||||
void DeleteFromSystem(const std::filesystem::path &path) {
|
void DeleteFromSystem(const std::filesystem::path &path) {
|
||||||
if (!utils::DeleteFile(path)) {
|
if (!utils::DeleteFile(path)) {
|
||||||
spdlog::warn("Couldn't delete file {}!", path);
|
spdlog::warn("Couldn't delete file {}!", path.string());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} // namespace
|
} // namespace
|
||||||
@ -25,7 +28,7 @@ void DeleteFromSystem(const std::filesystem::path &path) {
|
|||||||
////// FileRetainer //////
|
////// FileRetainer //////
|
||||||
void FileRetainer::DeleteFile(const std::filesystem::path &path) {
|
void FileRetainer::DeleteFile(const std::filesystem::path &path) {
|
||||||
if (!std::filesystem::exists(path)) {
|
if (!std::filesystem::exists(path)) {
|
||||||
spdlog::info("File {} doesn't exist.", path);
|
spdlog::info("File {} doesn't exist.", path.string());
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -10,6 +10,7 @@
|
|||||||
// licenses/APL.txt.
|
// licenses/APL.txt.
|
||||||
|
|
||||||
#pragma once
|
#pragma once
|
||||||
|
|
||||||
#include <atomic>
|
#include <atomic>
|
||||||
#include <deque>
|
#include <deque>
|
||||||
#include <functional>
|
#include <functional>
|
||||||
|
129
src/utils/lock/linux_rw_lock.hpp
Normal file
129
src/utils/lock/linux_rw_lock.hpp
Normal file
@ -0,0 +1,129 @@
|
|||||||
|
// Copyright 2022 Memgraph Ltd.
|
||||||
|
//
|
||||||
|
// Use of this software is governed by the Business Source License
|
||||||
|
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
|
||||||
|
// License, and you may not use this file except in compliance with the Business Source License.
|
||||||
|
//
|
||||||
|
// As of the Change Date specified in that file, in accordance with
|
||||||
|
// the Business Source License, use of this software will be governed
|
||||||
|
// by the Apache License, Version 2.0, included in the file
|
||||||
|
// licenses/APL.txt.
|
||||||
|
|
||||||
|
/// @file
|
||||||
|
#pragma once
|
||||||
|
|
||||||
|
#include <pthread.h>
|
||||||
|
#include <unistd.h>
|
||||||
|
|
||||||
|
#include <cerrno>
|
||||||
|
|
||||||
|
#include "utils/logging.hpp"
|
||||||
|
|
||||||
|
namespace memgraph::utils::lock {
|
||||||
|
|
||||||
|
/// A wrapper around `pthread_rwlock_t`, useful because it is not possible to
|
||||||
|
/// choose read or write priority for `std::shared_mutex`.
|
||||||
|
class RWLock {
|
||||||
|
public:
|
||||||
|
/// By passing the appropriate parameter to the `RWLock` constructor, it is
|
||||||
|
/// possible to control the behavior of `RWLock` while shared lock is held. If
|
||||||
|
/// the priority is set to `READ`, new shared (read) locks can be obtained
|
||||||
|
/// even though there is a thread waiting for an exclusive (write) lock, which
|
||||||
|
/// can lead to writer starvation. If the priority is set to `WRITE`, readers
|
||||||
|
/// will be blocked from obtaining new shared locks while there are writers
|
||||||
|
/// waiting, which can lead to reader starvation.
|
||||||
|
enum class Priority { READ, WRITE };
|
||||||
|
|
||||||
|
/// Construct a RWLock object with chosen priority. See comment above
|
||||||
|
/// `RWLockPriority` for details.
|
||||||
|
explicit RWLock(Priority priority) {
|
||||||
|
pthread_rwlockattr_t attr;
|
||||||
|
|
||||||
|
MG_ASSERT(pthread_rwlockattr_init(&attr) == 0, "Couldn't initialize utils::RWLock!");
|
||||||
|
|
||||||
|
switch (priority) {
|
||||||
|
case Priority::READ:
|
||||||
|
pthread_rwlockattr_setkind_np(&attr, PTHREAD_RWLOCK_PREFER_READER_NP);
|
||||||
|
break;
|
||||||
|
case Priority::WRITE:
|
||||||
|
// There is also `PTHREAD_RWLOCK_PREFER_WRITER_NP` but it is not
|
||||||
|
// providing the desired behavior.
|
||||||
|
//
|
||||||
|
// From `man 7 pthread_rwlockattr_setkind_np`:
|
||||||
|
// "Setting the value read-write lock kind to
|
||||||
|
// PTHREAD_RWLOCK_PREFER_WRITER_NP results in the same behavior as
|
||||||
|
// setting the value to PTHREAD_RWLOCK_PREFER_READER_NP. As long as a
|
||||||
|
// reader thread holds the lock, the thread holding a write lock will be
|
||||||
|
// starved. Setting the lock kind to
|
||||||
|
// PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP allows writers to run,
|
||||||
|
// but, as the name implies a writer may not lock recursively."
|
||||||
|
//
|
||||||
|
// For this reason, `RWLock` should not be used recursively.
|
||||||
|
pthread_rwlockattr_setkind_np(&attr, PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
MG_ASSERT(pthread_rwlock_init(&lock_, &attr) == 0, "Couldn't initialize utils::RWLock!");
|
||||||
|
pthread_rwlockattr_destroy(&attr);
|
||||||
|
}
|
||||||
|
|
||||||
|
RWLock(const RWLock &) = delete;
|
||||||
|
RWLock &operator=(const RWLock &) = delete;
|
||||||
|
RWLock(RWLock &&) = delete;
|
||||||
|
RWLock &operator=(RWLock &&) = delete;
|
||||||
|
|
||||||
|
~RWLock() { pthread_rwlock_destroy(&lock_); }
|
||||||
|
|
||||||
|
void lock() { MG_ASSERT(pthread_rwlock_wrlock(&lock_) == 0, "Couldn't lock utils::RWLock!"); }
|
||||||
|
|
||||||
|
bool try_lock() {
|
||||||
|
int err = pthread_rwlock_trywrlock(&lock_);
|
||||||
|
if (err == 0) return true;
|
||||||
|
MG_ASSERT(err == EBUSY, "Couldn't try lock utils::RWLock!");
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
void unlock() { MG_ASSERT(pthread_rwlock_unlock(&lock_) == 0, "Couldn't unlock utils::RWLock!"); }
|
||||||
|
|
||||||
|
void lock_shared() {
|
||||||
|
int err;
|
||||||
|
while (true) {
|
||||||
|
err = pthread_rwlock_rdlock(&lock_);
|
||||||
|
if (err == 0) {
|
||||||
|
return;
|
||||||
|
} else if (err == EAGAIN) {
|
||||||
|
continue;
|
||||||
|
} else {
|
||||||
|
LOG_FATAL("Couldn't lock shared utils::RWLock!");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
bool try_lock_shared() {
|
||||||
|
int err;
|
||||||
|
while (true) {
|
||||||
|
err = pthread_rwlock_tryrdlock(&lock_);
|
||||||
|
if (err == 0) {
|
||||||
|
return true;
|
||||||
|
} else if (err == EBUSY) {
|
||||||
|
return false;
|
||||||
|
} else if (err == EAGAIN) {
|
||||||
|
continue;
|
||||||
|
} else {
|
||||||
|
LOG_FATAL("Couldn't try lock shared utils::RWLock!");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void unlock_shared() { MG_ASSERT(pthread_rwlock_unlock(&lock_) == 0, "Couldn't unlock shared utils::RWLock!"); }
|
||||||
|
|
||||||
|
private:
|
||||||
|
pthread_rwlock_t lock_ = PTHREAD_RWLOCK_INITIALIZER;
|
||||||
|
};
|
||||||
|
|
||||||
|
class WritePrioritizedRWLock final : public RWLock {
|
||||||
|
public:
|
||||||
|
WritePrioritizedRWLock() : RWLock{Priority::WRITE} {};
|
||||||
|
};
|
||||||
|
|
||||||
|
} // namespace memgraph::utils
|
78
src/utils/lock/linux_spin_lock.hpp
Normal file
78
src/utils/lock/linux_spin_lock.hpp
Normal file
@ -0,0 +1,78 @@
|
|||||||
|
// Copyright 2022 Memgraph Ltd.
|
||||||
|
//
|
||||||
|
// Use of this software is governed by the Business Source License
|
||||||
|
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
|
||||||
|
// License, and you may not use this file except in compliance with the Business Source License.
|
||||||
|
//
|
||||||
|
// As of the Change Date specified in that file, in accordance with
|
||||||
|
// the Business Source License, use of this software will be governed
|
||||||
|
// by the Apache License, Version 2.0, included in the file
|
||||||
|
// licenses/APL.txt.
|
||||||
|
|
||||||
|
#pragma once
|
||||||
|
|
||||||
|
#include <pthread.h>
|
||||||
|
|
||||||
|
#include "utils/logging.hpp"
|
||||||
|
|
||||||
|
namespace memgraph::utils::lock {
|
||||||
|
|
||||||
|
/// This class is a wrapper around the `pthread_spinlock_t`. It provides a
|
||||||
|
/// generic spin lock. The lock should be used in cases where you know that the
|
||||||
|
/// lock will be contended only for short periods of time. This lock doesn't
|
||||||
|
/// make any kernel calls (like sleep, or context switching) during its wait for
|
||||||
|
/// the lock to be acquired. This property is only useful when the lock will be
|
||||||
|
/// held for short periods of time and you don't want to introduce the extra
|
||||||
|
/// delays of a sleep or context switch. On the assembly level
|
||||||
|
/// `pthread_spinlock_t` is optimized to use less power, reduce branch
|
||||||
|
/// mispredictions, etc... The explanation can be seen here:
|
||||||
|
/// https://stackoverflow.com/questions/26583433/c11-implementation-of-spinlock-using-atomic/29195378#29195378
|
||||||
|
/// https://software.intel.com/en-us/node/524249
|
||||||
|
class SpinLock {
|
||||||
|
public:
|
||||||
|
SpinLock() {
|
||||||
|
// `pthread_spin_init` returns -1 only when there isn't enough memory to
|
||||||
|
// initialize the lock. That should never occur because the
|
||||||
|
// `pthread_spinlock_t` is an `int` and memory isn't allocated by this init.
|
||||||
|
// The message is probably here to suit all other platforms...
|
||||||
|
MG_ASSERT(pthread_spin_init(&lock_, PTHREAD_PROCESS_PRIVATE) == 0, "Couldn't construct utils::SpinLock!");
|
||||||
|
}
|
||||||
|
|
||||||
|
SpinLock(SpinLock &&other) noexcept : lock_(other.lock_) {
|
||||||
|
MG_ASSERT(pthread_spin_init(&other.lock_, PTHREAD_PROCESS_PRIVATE) == 0, "Couldn't construct utils::SpinLock!");
|
||||||
|
}
|
||||||
|
|
||||||
|
SpinLock &operator=(SpinLock &&other) noexcept {
|
||||||
|
MG_ASSERT(pthread_spin_destroy(&lock_) == 0, "Couldn't destruct utils::SpinLock!");
|
||||||
|
lock_ = other.lock_;
|
||||||
|
MG_ASSERT(pthread_spin_init(&other.lock_, PTHREAD_PROCESS_PRIVATE) == 0, "Couldn't construct utils::SpinLock!");
|
||||||
|
return *this;
|
||||||
|
}
|
||||||
|
|
||||||
|
SpinLock(const SpinLock &) = delete;
|
||||||
|
SpinLock &operator=(const SpinLock &) = delete;
|
||||||
|
|
||||||
|
~SpinLock() { MG_ASSERT(pthread_spin_destroy(&lock_) == 0, "Couldn't destruct utils::SpinLock!"); }
|
||||||
|
|
||||||
|
void lock() {
|
||||||
|
// `pthread_spin_lock` returns -1 only when there is a deadlock detected
|
||||||
|
// (errno EDEADLOCK).
|
||||||
|
MG_ASSERT(pthread_spin_lock(&lock_) == 0, "Couldn't lock utils::SpinLock!");
|
||||||
|
}
|
||||||
|
|
||||||
|
bool try_lock() {
|
||||||
|
// `pthread_spin_trylock` returns -1 only when the lock is already locked
|
||||||
|
// (errno EBUSY).
|
||||||
|
return pthread_spin_trylock(&lock_) == 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
void unlock() {
|
||||||
|
// `pthread_spin_unlock` has no documented error codes that it could return,
|
||||||
|
// so any error is a fatal error.
|
||||||
|
MG_ASSERT(pthread_spin_unlock(&lock_) == 0, "Couldn't unlock utils::SpinLock!");
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
pthread_spinlock_t lock_;
|
||||||
|
};
|
||||||
|
} // namespace memgraph::utils
|
@ -18,7 +18,7 @@
|
|||||||
|
|
||||||
namespace memgraph::utils {
|
namespace memgraph::utils {
|
||||||
|
|
||||||
static_assert(std::is_same_v<uint64_t, unsigned long>,
|
static_assert(std::is_same_v<uint64_t, unsigned long long>,
|
||||||
"utils::Log requires uint64_t to be implemented as unsigned long.");
|
"utils::Log requires uint64_t to be implemented as unsigned long.");
|
||||||
|
|
||||||
/// This function computes the log2 function on integer types. It is faster than
|
/// This function computes the log2 function on integer types. It is faster than
|
||||||
|
@ -87,7 +87,8 @@ void MonotonicBufferResource::Release() {
|
|||||||
|
|
||||||
void *MonotonicBufferResource::DoAllocate(size_t bytes, size_t alignment) {
|
void *MonotonicBufferResource::DoAllocate(size_t bytes, size_t alignment) {
|
||||||
static_assert(std::is_same_v<size_t, uintptr_t>);
|
static_assert(std::is_same_v<size_t, uintptr_t>);
|
||||||
static_assert(std::is_same_v<size_t, uint64_t>);
|
// TODO(gitbuda): Fix huge static_assert that size_t == uint64_t
|
||||||
|
//static_assert(std::is_same_v<size_t, uint64_t>);
|
||||||
auto push_current_buffer = [this, bytes, alignment](size_t next_size) {
|
auto push_current_buffer = [this, bytes, alignment](size_t next_size) {
|
||||||
// Set size so that the bytes fit.
|
// Set size so that the bytes fit.
|
||||||
const size_t size = next_size > bytes ? next_size : bytes;
|
const size_t size = next_size > bytes ? next_size : bytes;
|
||||||
|
@ -12,113 +12,47 @@
|
|||||||
/// @file
|
/// @file
|
||||||
#pragma once
|
#pragma once
|
||||||
|
|
||||||
#include <pthread.h>
|
|
||||||
#include <unistd.h>
|
|
||||||
|
|
||||||
#include <cerrno>
|
|
||||||
|
|
||||||
#include "utils/logging.hpp"
|
|
||||||
|
|
||||||
namespace memgraph::utils {
|
namespace memgraph::utils {
|
||||||
|
|
||||||
/// A wrapper around `pthread_rwlock_t`, useful because it is not possible to
|
|
||||||
/// choose read or write priority for `std::shared_mutex`.
|
|
||||||
class RWLock {
|
class RWLock {
|
||||||
public:
|
public:
|
||||||
/// By passing the appropriate parameter to the `RWLock` constructor, it is
|
|
||||||
/// possible to control the behavior of `RWLock` while shared lock is held. If
|
|
||||||
/// the priority is set to `READ`, new shared (read) locks can be obtained
|
|
||||||
/// even though there is a thread waiting for an exclusive (write) lock, which
|
|
||||||
/// can lead to writer starvation. If the priority is set to `WRITE`, readers
|
|
||||||
/// will be blocked from obtaining new shared locks while there are writers
|
|
||||||
/// waiting, which can lead to reader starvation.
|
|
||||||
enum class Priority { READ, WRITE };
|
enum class Priority { READ, WRITE };
|
||||||
|
|
||||||
/// Construct a RWLock object with chosen priority. See comment above
|
|
||||||
/// `RWLockPriority` for details.
|
|
||||||
explicit RWLock(Priority priority) {
|
explicit RWLock(Priority priority) {
|
||||||
pthread_rwlockattr_t attr;
|
// TODO(gitbuda): Implement RWLock::RWLock(Priority)
|
||||||
|
|
||||||
MG_ASSERT(pthread_rwlockattr_init(&attr) == 0, "Couldn't initialize utils::RWLock!");
|
|
||||||
|
|
||||||
switch (priority) {
|
|
||||||
case Priority::READ:
|
|
||||||
pthread_rwlockattr_setkind_np(&attr, PTHREAD_RWLOCK_PREFER_READER_NP);
|
|
||||||
break;
|
|
||||||
case Priority::WRITE:
|
|
||||||
// There is also `PTHREAD_RWLOCK_PREFER_WRITER_NP` but it is not
|
|
||||||
// providing the desired behavior.
|
|
||||||
//
|
|
||||||
// From `man 7 pthread_rwlockattr_setkind_np`:
|
|
||||||
// "Setting the value read-write lock kind to
|
|
||||||
// PTHREAD_RWLOCK_PREFER_WRITER_NP results in the same behavior as
|
|
||||||
// setting the value to PTHREAD_RWLOCK_PREFER_READER_NP. As long as a
|
|
||||||
// reader thread holds the lock, the thread holding a write lock will be
|
|
||||||
// starved. Setting the lock kind to
|
|
||||||
// PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP allows writers to run,
|
|
||||||
// but, as the name implies a writer may not lock recursively."
|
|
||||||
//
|
|
||||||
// For this reason, `RWLock` should not be used recursively.
|
|
||||||
pthread_rwlockattr_setkind_np(&attr, PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
MG_ASSERT(pthread_rwlock_init(&lock_, &attr) == 0, "Couldn't initialize utils::RWLock!");
|
|
||||||
pthread_rwlockattr_destroy(&attr);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
RWLock(const RWLock &) = delete;
|
RWLock(const RWLock &) = delete;
|
||||||
RWLock &operator=(const RWLock &) = delete;
|
RWLock &operator=(const RWLock &) = delete;
|
||||||
RWLock(RWLock &&) = delete;
|
RWLock(RWLock &&) = delete;
|
||||||
RWLock &operator=(RWLock &&) = delete;
|
RWLock &operator=(RWLock &&) = delete;
|
||||||
|
~RWLock() = default;
|
||||||
|
|
||||||
~RWLock() { pthread_rwlock_destroy(&lock_); }
|
void lock() {
|
||||||
|
// TODO(gitbuda): Implement RWLock::lock
|
||||||
void lock() { MG_ASSERT(pthread_rwlock_wrlock(&lock_) == 0, "Couldn't lock utils::RWLock!"); }
|
}
|
||||||
|
|
||||||
bool try_lock() {
|
bool try_lock() {
|
||||||
int err = pthread_rwlock_trywrlock(&lock_);
|
// TODO(gitbuda): Implement RWLock::try_lock
|
||||||
if (err == 0) return true;
|
|
||||||
MG_ASSERT(err == EBUSY, "Couldn't try lock utils::RWLock!");
|
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
void unlock() { MG_ASSERT(pthread_rwlock_unlock(&lock_) == 0, "Couldn't unlock utils::RWLock!"); }
|
void unlock() {
|
||||||
|
// TODO(gitbuda): Implement RWLock::unlock
|
||||||
|
}
|
||||||
|
|
||||||
void lock_shared() {
|
void lock_shared() {
|
||||||
int err;
|
// TODO(gitbuda): Implement RWLock::lock_shared
|
||||||
while (true) {
|
|
||||||
err = pthread_rwlock_rdlock(&lock_);
|
|
||||||
if (err == 0) {
|
|
||||||
return;
|
|
||||||
} else if (err == EAGAIN) {
|
|
||||||
continue;
|
|
||||||
} else {
|
|
||||||
LOG_FATAL("Couldn't lock shared utils::RWLock!");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
bool try_lock_shared() {
|
bool try_lock_shared() {
|
||||||
int err;
|
// TODO(gitbuda): Implement RWLock::try_lock_shared
|
||||||
while (true) {
|
return false;
|
||||||
err = pthread_rwlock_tryrdlock(&lock_);
|
|
||||||
if (err == 0) {
|
|
||||||
return true;
|
|
||||||
} else if (err == EBUSY) {
|
|
||||||
return false;
|
|
||||||
} else if (err == EAGAIN) {
|
|
||||||
continue;
|
|
||||||
} else {
|
|
||||||
LOG_FATAL("Couldn't try lock shared utils::RWLock!");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void unlock_shared() { MG_ASSERT(pthread_rwlock_unlock(&lock_) == 0, "Couldn't unlock shared utils::RWLock!"); }
|
void unlock_shared() {
|
||||||
|
// TODO(gitbuda): Implement RWLock::try_lock_shared
|
||||||
private:
|
}
|
||||||
pthread_rwlock_t lock_ = PTHREAD_RWLOCK_INITIALIZER;
|
|
||||||
};
|
};
|
||||||
|
|
||||||
class WritePrioritizedRWLock final : public RWLock {
|
class WritePrioritizedRWLock final : public RWLock {
|
||||||
|
@ -11,68 +11,30 @@
|
|||||||
|
|
||||||
#pragma once
|
#pragma once
|
||||||
|
|
||||||
#include <pthread.h>
|
|
||||||
|
|
||||||
#include "utils/logging.hpp"
|
|
||||||
|
|
||||||
namespace memgraph::utils {
|
namespace memgraph::utils {
|
||||||
|
|
||||||
/// This class is a wrapper around the `pthread_spinlock_t`. It provides a
|
|
||||||
/// generic spin lock. The lock should be used in cases where you know that the
|
|
||||||
/// lock will be contended only for short periods of time. This lock doesn't
|
|
||||||
/// make any kernel calls (like sleep, or context switching) during its wait for
|
|
||||||
/// the lock to be acquired. This property is only useful when the lock will be
|
|
||||||
/// held for short periods of time and you don't want to introduce the extra
|
|
||||||
/// delays of a sleep or context switch. On the assembly level
|
|
||||||
/// `pthread_spinlock_t` is optimized to use less power, reduce branch
|
|
||||||
/// mispredictions, etc... The explanation can be seen here:
|
|
||||||
/// https://stackoverflow.com/questions/26583433/c11-implementation-of-spinlock-using-atomic/29195378#29195378
|
|
||||||
/// https://software.intel.com/en-us/node/524249
|
|
||||||
class SpinLock {
|
class SpinLock {
|
||||||
public:
|
public:
|
||||||
SpinLock() {
|
SpinLock() {}
|
||||||
// `pthread_spin_init` returns -1 only when there isn't enough memory to
|
|
||||||
// initialize the lock. That should never occur because the
|
|
||||||
// `pthread_spinlock_t` is an `int` and memory isn't allocated by this init.
|
|
||||||
// The message is probably here to suit all other platforms...
|
|
||||||
MG_ASSERT(pthread_spin_init(&lock_, PTHREAD_PROCESS_PRIVATE) == 0, "Couldn't construct utils::SpinLock!");
|
|
||||||
}
|
|
||||||
|
|
||||||
SpinLock(SpinLock &&other) noexcept : lock_(other.lock_) {
|
|
||||||
MG_ASSERT(pthread_spin_init(&other.lock_, PTHREAD_PROCESS_PRIVATE) == 0, "Couldn't construct utils::SpinLock!");
|
|
||||||
}
|
|
||||||
|
|
||||||
SpinLock &operator=(SpinLock &&other) noexcept {
|
|
||||||
MG_ASSERT(pthread_spin_destroy(&lock_) == 0, "Couldn't destruct utils::SpinLock!");
|
|
||||||
lock_ = other.lock_;
|
|
||||||
MG_ASSERT(pthread_spin_init(&other.lock_, PTHREAD_PROCESS_PRIVATE) == 0, "Couldn't construct utils::SpinLock!");
|
|
||||||
return *this;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
SpinLock(SpinLock &&other) = default;
|
||||||
|
SpinLock &operator=(SpinLock &&other) = default;
|
||||||
SpinLock(const SpinLock &) = delete;
|
SpinLock(const SpinLock &) = delete;
|
||||||
SpinLock &operator=(const SpinLock &) = delete;
|
SpinLock &operator=(const SpinLock &) = delete;
|
||||||
|
~SpinLock() = default;
|
||||||
~SpinLock() { MG_ASSERT(pthread_spin_destroy(&lock_) == 0, "Couldn't destruct utils::SpinLock!"); }
|
|
||||||
|
|
||||||
void lock() {
|
void lock() {
|
||||||
// `pthread_spin_lock` returns -1 only when there is a deadlock detected
|
// TODO(gitbuda): Implement SpinLock::lock
|
||||||
// (errno EDEADLOCK).
|
|
||||||
MG_ASSERT(pthread_spin_lock(&lock_) == 0, "Couldn't lock utils::SpinLock!");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
bool try_lock() {
|
bool try_lock() {
|
||||||
// `pthread_spin_trylock` returns -1 only when the lock is already locked
|
// TODO(gitbuda): Implement SpinLock::try_lock
|
||||||
// (errno EBUSY).
|
return false;
|
||||||
return pthread_spin_trylock(&lock_) == 0;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void unlock() {
|
void unlock() {
|
||||||
// `pthread_spin_unlock` has no documented error codes that it could return,
|
// TODO(gitbuda): Implement SpinLock::unlock
|
||||||
// so any error is a fatal error.
|
|
||||||
MG_ASSERT(pthread_spin_unlock(&lock_) == 0, "Couldn't unlock utils::SpinLock!");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private:
|
|
||||||
pthread_spinlock_t lock_;
|
|
||||||
};
|
};
|
||||||
|
|
||||||
} // namespace memgraph::utils
|
} // namespace memgraph::utils
|
||||||
|
@ -15,6 +15,7 @@
|
|||||||
#include <execinfo.h>
|
#include <execinfo.h>
|
||||||
#include <fmt/format.h>
|
#include <fmt/format.h>
|
||||||
#include <stdexcept>
|
#include <stdexcept>
|
||||||
|
#include <vector>
|
||||||
|
|
||||||
#include "utils/on_scope_exit.hpp"
|
#include "utils/on_scope_exit.hpp"
|
||||||
|
|
||||||
|
@ -37,10 +37,11 @@ std::optional<T> ParseNumber(const std::string_view string, const size_t size) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
T value{};
|
T value{};
|
||||||
if (const auto [p, ec] = std::from_chars(string.data(), string.data() + size, value);
|
// TODO(gitbuda): Figure out what to do with deleted std::from_chars function
|
||||||
ec != std::errc() || p != string.data() + size) {
|
// if (const auto [p, ec] = std::from_chars(string.data(), string.data() + size, value);
|
||||||
return std::nullopt;
|
// ec != std::errc() || p != string.data() + size) {
|
||||||
}
|
// return std::nullopt;
|
||||||
|
// }
|
||||||
|
|
||||||
return value;
|
return value;
|
||||||
}
|
}
|
||||||
|
@ -11,19 +11,10 @@
|
|||||||
|
|
||||||
#include "utils/thread.hpp"
|
#include "utils/thread.hpp"
|
||||||
|
|
||||||
#include <sys/prctl.h>
|
|
||||||
|
|
||||||
#include "utils/logging.hpp"
|
|
||||||
|
|
||||||
namespace memgraph::utils {
|
namespace memgraph::utils {
|
||||||
|
|
||||||
void ThreadSetName(const std::string &name) {
|
void ThreadSetName(const std::string &name) {
|
||||||
static constexpr auto max_name_length = GetMaxThreadNameSize();
|
// TODO(gitbuda): Implement cross platform ThreadSetName func
|
||||||
MG_ASSERT(name.size() <= max_name_length, "Thread name '{}' is too long", max_name_length);
|
|
||||||
|
|
||||||
if (prctl(PR_SET_NAME, name.c_str()) != 0) {
|
|
||||||
spdlog::warn("Couldn't set thread name: {}!", name);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
} // namespace memgraph::utils
|
} // namespace memgraph::utils
|
||||||
|
29
src/utils/thread/linux_thread.cpp
Normal file
29
src/utils/thread/linux_thread.cpp
Normal file
@ -0,0 +1,29 @@
|
|||||||
|
// Copyright 2022 Memgraph Ltd.
|
||||||
|
//
|
||||||
|
// Use of this software is governed by the Business Source License
|
||||||
|
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
|
||||||
|
// License, and you may not use this file except in compliance with the Business Source License.
|
||||||
|
//
|
||||||
|
// As of the Change Date specified in that file, in accordance with
|
||||||
|
// the Business Source License, use of this software will be governed
|
||||||
|
// by the Apache License, Version 2.0, included in the file
|
||||||
|
// licenses/APL.txt.
|
||||||
|
|
||||||
|
#include "utils/thread.hpp"
|
||||||
|
|
||||||
|
#include <sys/prctl.h>
|
||||||
|
|
||||||
|
#include "utils/logging.hpp"
|
||||||
|
|
||||||
|
namespace memgraph::utils {
|
||||||
|
|
||||||
|
void ThreadSetName(const std::string &name) {
|
||||||
|
static constexpr auto max_name_length = GetMaxThreadNameSize();
|
||||||
|
MG_ASSERT(name.size() <= max_name_length, "Thread name '{}' is too long", max_name_length);
|
||||||
|
|
||||||
|
if (prctl(PR_SET_NAME, name.c_str()) != 0) {
|
||||||
|
spdlog::warn("Couldn't set thread name: {}!", name);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
} // namespace memgraph::utils
|
25
src/utils/thread/linux_thread.hpp
Normal file
25
src/utils/thread/linux_thread.hpp
Normal file
@ -0,0 +1,25 @@
|
|||||||
|
// Copyright 2022 Memgraph Ltd.
|
||||||
|
//
|
||||||
|
// Use of this software is governed by the Business Source License
|
||||||
|
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
|
||||||
|
// License, and you may not use this file except in compliance with the Business Source License.
|
||||||
|
//
|
||||||
|
// As of the Change Date specified in that file, in accordance with
|
||||||
|
// the Business Source License, use of this software will be governed
|
||||||
|
// by the Apache License, Version 2.0, included in the file
|
||||||
|
// licenses/APL.txt.
|
||||||
|
|
||||||
|
/// @file
|
||||||
|
#pragma once
|
||||||
|
|
||||||
|
#include <string>
|
||||||
|
|
||||||
|
namespace memgraph::utils {
|
||||||
|
|
||||||
|
constexpr size_t GetMaxThreadNameSize() { return 16; }
|
||||||
|
|
||||||
|
/// This function sets the thread name of the calling thread.
|
||||||
|
/// Beware, the name length limit is 16 characters!
|
||||||
|
void ThreadSetName(const std::string &name);
|
||||||
|
|
||||||
|
}; // namespace memgraph::utils
|
198
src/utils/timer/linux_async_timer.cpp
Normal file
198
src/utils/timer/linux_async_timer.cpp
Normal file
@ -0,0 +1,198 @@
|
|||||||
|
// Copyright 2022 Memgraph Ltd.
|
||||||
|
//
|
||||||
|
// Use of this software is governed by the Business Source License
|
||||||
|
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
|
||||||
|
// License, and you may not use this file except in compliance with the Business Source License.
|
||||||
|
//
|
||||||
|
// As of the Change Date specified in that file, in accordance with
|
||||||
|
// the Business Source License, use of this software will be governed
|
||||||
|
// by the Apache License, Version 2.0, included in the file
|
||||||
|
// licenses/APL.txt.
|
||||||
|
|
||||||
|
#include "utils/timer/linux_async_timer.hpp"
|
||||||
|
|
||||||
|
#include <csignal>
|
||||||
|
|
||||||
|
#include <algorithm>
|
||||||
|
#include <atomic>
|
||||||
|
#include <cmath>
|
||||||
|
#include <cstdint>
|
||||||
|
#include <limits>
|
||||||
|
|
||||||
|
#include "utils/skip_list.hpp"
|
||||||
|
#include "utils/spin_lock.hpp"
|
||||||
|
#include "utils/synchronized.hpp"
|
||||||
|
|
||||||
|
namespace {
|
||||||
|
|
||||||
|
inline constexpr uint64_t kInvalidFlagId = 0U;
|
||||||
|
// std::numeric_limits<time_t>::max() cannot be represented precisely as a double, so the next smallest value is the
|
||||||
|
// maximum number of seconds the timer can be used with
|
||||||
|
const double max_seconds_as_double = std::nexttoward(std::numeric_limits<time_t>::max(), 0.0);
|
||||||
|
|
||||||
|
// NOLINTNEXTLINE (cppcoreguidelines-avoid-non-const-global-variables)
|
||||||
|
std::atomic<uint64_t> expiration_flag_counter{kInvalidFlagId + 1U};
|
||||||
|
|
||||||
|
struct ExpirationFlagInfo {
|
||||||
|
uint64_t id{0U};
|
||||||
|
std::weak_ptr<std::atomic<bool>> flag{};
|
||||||
|
};
|
||||||
|
|
||||||
|
bool operator==(const ExpirationFlagInfo &lhs, const ExpirationFlagInfo &rhs) { return lhs.id == rhs.id; }
|
||||||
|
bool operator<(const ExpirationFlagInfo &lhs, const ExpirationFlagInfo &rhs) { return lhs.id < rhs.id; }
|
||||||
|
bool operator==(const ExpirationFlagInfo &flag_info, const uint64_t id) { return flag_info.id == id; }
|
||||||
|
bool operator<(const ExpirationFlagInfo &flag_info, const uint64_t id) { return flag_info.id < id; }
|
||||||
|
|
||||||
|
// NOLINTNEXTLINE (cppcoreguidelines-avoid-non-const-global-variables)
|
||||||
|
memgraph::utils::SkipList<ExpirationFlagInfo> expiration_flags{};
|
||||||
|
|
||||||
|
uint64_t AddFlag(std::weak_ptr<std::atomic<bool>> flag) {
|
||||||
|
const auto id = expiration_flag_counter.fetch_add(1, std::memory_order_relaxed);
|
||||||
|
expiration_flags.access().insert({id, std::move(flag)});
|
||||||
|
return id;
|
||||||
|
}
|
||||||
|
|
||||||
|
void EraseFlag(uint64_t flag_id) { expiration_flags.access().remove(flag_id); }
|
||||||
|
|
||||||
|
std::weak_ptr<std::atomic<bool>> GetFlag(uint64_t flag_id) {
|
||||||
|
const auto flag_accessor = expiration_flags.access();
|
||||||
|
const auto it = flag_accessor.find(flag_id);
|
||||||
|
if (it == flag_accessor.end()) {
|
||||||
|
return {};
|
||||||
|
}
|
||||||
|
|
||||||
|
return it->flag;
|
||||||
|
}
|
||||||
|
|
||||||
|
void MarkDone(const uint64_t flag_id) {
|
||||||
|
const auto weak_flag = GetFlag(flag_id);
|
||||||
|
if (weak_flag.expired()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
auto flag = weak_flag.lock();
|
||||||
|
if (flag != nullptr) {
|
||||||
|
flag->store(true, std::memory_order_relaxed);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} // namespace
|
||||||
|
|
||||||
|
namespace memgraph::utils {
|
||||||
|
|
||||||
|
namespace {
|
||||||
|
struct ThreadInfo {
|
||||||
|
pid_t thread_id;
|
||||||
|
std::atomic<bool> setup_done{false};
|
||||||
|
};
|
||||||
|
|
||||||
|
void *TimerBackgroundWorker(void *args) {
|
||||||
|
auto *thread_info = static_cast<ThreadInfo *>(args);
|
||||||
|
thread_info->thread_id = syscall(SYS_gettid);
|
||||||
|
thread_info->setup_done.store(true, std::memory_order_release);
|
||||||
|
|
||||||
|
sigset_t ss;
|
||||||
|
sigemptyset(&ss);
|
||||||
|
sigaddset(&ss, SIGTIMER);
|
||||||
|
sigprocmask(SIG_BLOCK, &ss, nullptr);
|
||||||
|
|
||||||
|
while (true) {
|
||||||
|
siginfo_t si;
|
||||||
|
int result = sigwaitinfo(&ss, &si);
|
||||||
|
|
||||||
|
if (result <= 0) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (si.si_code == SI_TIMER) {
|
||||||
|
auto flag_id = kInvalidFlagId;
|
||||||
|
std::memcpy(&flag_id, &si.si_value.sival_ptr, sizeof(flag_id));
|
||||||
|
MarkDone(flag_id);
|
||||||
|
} else if (si.si_code == SI_TKILL) {
|
||||||
|
pthread_exit(nullptr);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} // namespace
|
||||||
|
|
||||||
|
AsyncTimer::AsyncTimer() : flag_id_{kInvalidFlagId} {};
|
||||||
|
|
||||||
|
AsyncTimer::AsyncTimer(double seconds)
|
||||||
|
: expiration_flag_{std::make_shared<std::atomic<bool>>(false)}, flag_id_{kInvalidFlagId}, timer_id_{} {
|
||||||
|
MG_ASSERT(seconds <= max_seconds_as_double,
|
||||||
|
"The AsyncTimer cannot handle larger time values than {:f}, the specified value: {:f}",
|
||||||
|
max_seconds_as_double, seconds);
|
||||||
|
MG_ASSERT(seconds >= 0.0, "The AsyncTimer cannot handle negative time values: {:f}", seconds);
|
||||||
|
|
||||||
|
static pthread_t background_timer_thread;
|
||||||
|
static ThreadInfo thread_info;
|
||||||
|
static std::once_flag timer_thread_setup_flag;
|
||||||
|
|
||||||
|
std::call_once(timer_thread_setup_flag, [] {
|
||||||
|
pthread_create(&background_timer_thread, nullptr, TimerBackgroundWorker, &thread_info);
|
||||||
|
while (!thread_info.setup_done.load(std::memory_order_acquire))
|
||||||
|
;
|
||||||
|
});
|
||||||
|
|
||||||
|
flag_id_ = AddFlag(std::weak_ptr<std::atomic<bool>>{expiration_flag_});
|
||||||
|
|
||||||
|
sigevent notification_settings{};
|
||||||
|
notification_settings.sigev_notify = SIGEV_THREAD_ID;
|
||||||
|
notification_settings.sigev_signo = SIGTIMER;
|
||||||
|
notification_settings._sigev_un._tid = thread_info.thread_id;
|
||||||
|
static_assert(sizeof(void *) == sizeof(flag_id_), "ID size must be equal to pointer size!");
|
||||||
|
std::memcpy(¬ification_settings.sigev_value.sival_ptr, &flag_id_, sizeof(flag_id_));
|
||||||
|
MG_ASSERT(timer_create(CLOCK_MONOTONIC, ¬ification_settings, &timer_id_) == 0, "Couldn't create timer: ({}) {}",
|
||||||
|
errno, strerror(errno));
|
||||||
|
|
||||||
|
static constexpr auto kSecondsToNanos = 1000 * 1000 * 1000;
|
||||||
|
// Casting will truncate down, but that's exactly what we want.
|
||||||
|
const auto second_as_time_t = static_cast<time_t>(seconds);
|
||||||
|
const auto remaining_nano_seconds = static_cast<time_t>((seconds - second_as_time_t) * kSecondsToNanos);
|
||||||
|
|
||||||
|
struct itimerspec spec;
|
||||||
|
spec.it_interval.tv_sec = 0;
|
||||||
|
spec.it_interval.tv_nsec = 0;
|
||||||
|
spec.it_value.tv_sec = second_as_time_t;
|
||||||
|
spec.it_value.tv_nsec = remaining_nano_seconds;
|
||||||
|
|
||||||
|
MG_ASSERT(timer_settime(timer_id_, 0, &spec, nullptr) == 0, "Couldn't set timer: ({}) {}", errno, strerror(errno));
|
||||||
|
}
|
||||||
|
|
||||||
|
AsyncTimer::~AsyncTimer() { ReleaseResources(); }
|
||||||
|
|
||||||
|
AsyncTimer::AsyncTimer(AsyncTimer &&other) noexcept
|
||||||
|
: expiration_flag_{std::move(other.expiration_flag_)}, flag_id_{other.flag_id_}, timer_id_{other.timer_id_} {
|
||||||
|
other.flag_id_ = kInvalidFlagId;
|
||||||
|
}
|
||||||
|
|
||||||
|
// NOLINTNEXTLINE (hicpp-noexcept-move)
|
||||||
|
AsyncTimer &AsyncTimer::operator=(AsyncTimer &&other) {
|
||||||
|
if (this == &other) {
|
||||||
|
return *this;
|
||||||
|
}
|
||||||
|
|
||||||
|
ReleaseResources();
|
||||||
|
|
||||||
|
expiration_flag_ = std::move(other.expiration_flag_);
|
||||||
|
flag_id_ = std::exchange(other.flag_id_, kInvalidFlagId);
|
||||||
|
timer_id_ = other.timer_id_;
|
||||||
|
|
||||||
|
return *this;
|
||||||
|
};
|
||||||
|
|
||||||
|
bool AsyncTimer::IsExpired() const noexcept {
|
||||||
|
if (expiration_flag_ != nullptr) {
|
||||||
|
return expiration_flag_->load(std::memory_order_relaxed);
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
void AsyncTimer::ReleaseResources() {
|
||||||
|
if (expiration_flag_ != nullptr) {
|
||||||
|
timer_delete(timer_id_);
|
||||||
|
EraseFlag(flag_id_);
|
||||||
|
flag_id_ = kInvalidFlagId;
|
||||||
|
expiration_flag_ = std::shared_ptr<std::atomic<bool>>{};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
} // namespace memgraph::utils
|
48
src/utils/timer/linux_async_timer.hpp
Normal file
48
src/utils/timer/linux_async_timer.hpp
Normal file
@ -0,0 +1,48 @@
|
|||||||
|
// Copyright 2022 Memgraph Ltd.
|
||||||
|
//
|
||||||
|
// Use of this software is governed by the Business Source License
|
||||||
|
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
|
||||||
|
// License, and you may not use this file except in compliance with the Business Source License.
|
||||||
|
//
|
||||||
|
// As of the Change Date specified in that file, in accordance with
|
||||||
|
// the Business Source License, use of this software will be governed
|
||||||
|
// by the Apache License, Version 2.0, included in the file
|
||||||
|
// licenses/APL.txt.
|
||||||
|
|
||||||
|
#pragma once
|
||||||
|
|
||||||
|
#include <time.h>
|
||||||
|
|
||||||
|
#include <memory>
|
||||||
|
|
||||||
|
#include "utils/logging.hpp"
|
||||||
|
|
||||||
|
namespace memgraph::utils::timer {
|
||||||
|
|
||||||
|
#define SIGTIMER (SIGRTMAX - 2)
|
||||||
|
|
||||||
|
class AsyncTimer {
|
||||||
|
public:
|
||||||
|
explicit AsyncTimer(double seconds);
|
||||||
|
AsyncTimer();
|
||||||
|
~AsyncTimer();
|
||||||
|
AsyncTimer(AsyncTimer &&other) noexcept;
|
||||||
|
// NOLINTNEXTLINE (hicpp-noexcept-move)
|
||||||
|
AsyncTimer &operator=(AsyncTimer &&other);
|
||||||
|
AsyncTimer(const AsyncTimer &) = delete;
|
||||||
|
AsyncTimer &operator=(const AsyncTimer &) = delete;
|
||||||
|
|
||||||
|
// Returns false if the object isn't associated with any timer.
|
||||||
|
bool IsExpired() const noexcept;
|
||||||
|
|
||||||
|
private:
|
||||||
|
void ReleaseResources();
|
||||||
|
|
||||||
|
// If the expiration_flag_ is nullptr, then the object is not associated with any timer, therefore no clean up
|
||||||
|
// is necessary. Furthermore, the the POSIX API doesn't specify any value as "invalid" for timer_t, so the timer_id_
|
||||||
|
// cannot be used to determine whether the object is associated with any timer or not.
|
||||||
|
std::shared_ptr<std::atomic<bool>> expiration_flag_;
|
||||||
|
uint64_t flag_id_;
|
||||||
|
timer_t timer_id_;
|
||||||
|
};
|
||||||
|
} // namespace memgraph::utils
|
Loading…
Reference in New Issue
Block a user