Add thread names

Summary:
To see the thread names in htop, press <F2> to enter setup and, under
'Display options', choose 'Show custom thread names'.

Reviewers: buda, teon.banek, florijan

Reviewed By: teon.banek

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1234
Author: Matej Ferencevic, 2018-02-23 14:35:16 +01:00
Parent: 80b86f0314
Commit: d99724647a
17 changed files with 91 additions and 54 deletions
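As an illustration of the pattern this commit introduces (the snippet below is not part of the commit), each thread names itself right after it starts, so htop can show names such as "Bolt worker 1". The helper SetThreadName and the four-worker loop are made up for the example; they stand in for utils::ThreadSetName and the Server worker loop changed below, and use the same prctl(PR_SET_NAME, ...) call.

#include <sys/prctl.h>

#include <chrono>
#include <string>
#include <thread>
#include <vector>

// Stand-in for utils::ThreadSetName added by this commit.
void SetThreadName(const std::string &name) {
  // The kernel silently truncates names longer than 15 characters.
  prctl(PR_SET_NAME, name.c_str());
}

int main() {
  std::vector<std::thread> workers;
  for (int i = 0; i < 4; ++i) {
    workers.emplace_back([i]() {
      // Every worker is identifiable in htop, e.g. "Bolt worker 1".
      SetThreadName("Bolt worker " + std::to_string(i + 1));
      std::this_thread::sleep_for(std::chrono::seconds(30));
    });
  }
  for (auto &w : workers) w.join();
  return 0;
}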


@@ -13,6 +13,7 @@
 #include "io/network/epoll.hpp"
 #include "io/network/socket.hpp"
 #include "threading/sync/spinlock.hpp"
+#include "utils/thread.hpp"
 namespace communication {
@@ -35,10 +36,12 @@ class Listener {
   static const int kMaxEvents = 1;
  public:
-  Listener(TSessionData &data, bool check_for_timeouts)
+  Listener(TSessionData &data, bool check_for_timeouts,
+           const std::string &service_name)
       : data_(data), alive_(true) {
     if (check_for_timeouts) {
-      thread_ = std::thread([this]() {
+      thread_ = std::thread([this, service_name]() {
+        utils::ThreadSetName(fmt::format("{} timeout", service_name));
         while (alive_) {
           {
             std::unique_lock<SpinLock> guard(lock_);


@@ -11,7 +11,7 @@ namespace communication::rpc {
 Server::Server(const io::network::Endpoint &endpoint,
                size_t workers_count)
-    : server_(endpoint, *this, false, workers_count) {}
+    : server_(endpoint, *this, false, "RPC", workers_count) {}
 void Server::StopProcessingCalls() {
   server_.Shutdown();


@@ -12,6 +12,7 @@
 #include "communication/listener.hpp"
 #include "io/network/socket.hpp"
+#include "utils/thread.hpp"
 namespace communication {
@@ -42,9 +43,9 @@ class Server {
   * invokes workers_count workers
   */
  Server(const io::network::Endpoint &endpoint, TSessionData &session_data,
-        bool check_for_timeouts,
+        bool check_for_timeouts, const std::string &service_name,
         size_t workers_count = std::thread::hardware_concurrency())
-      : listener_(session_data, check_for_timeouts) {
+      : listener_(session_data, check_for_timeouts, service_name) {
    // Without server we can't continue with application so we can just
    // terminate here.
    if (!socket_.Bind(endpoint)) {
@@ -55,25 +56,30 @@ class Server {
       LOG(FATAL) << "Cannot listen on socket!";
     }
-    thread_ = std::thread([this, workers_count]() {
-      std::cout << fmt::format("Starting {} workers", workers_count)
-                << std::endl;
+    thread_ = std::thread([this, workers_count, service_name]() {
+      std::cout << "Starting " << workers_count << " " << service_name
+                << " workers" << std::endl;
+      utils::ThreadSetName(fmt::format("{} server", service_name));
       for (size_t i = 0; i < workers_count; ++i) {
-        worker_threads_.emplace_back([this]() {
+        worker_threads_.emplace_back([this, service_name, i]() {
+          utils::ThreadSetName(
+              fmt::format("{} worker {}", service_name, i + 1));
           while (alive_) {
             listener_.WaitAndProcessEvents();
           }
         });
       }
-      std::cout << "Server is fully armed and operational" << std::endl;
-      std::cout << "Listening on " << socket_.endpoint() << std::endl;
+      std::cout << service_name << " server is fully armed and operational"
+                << std::endl;
+      std::cout << service_name << " listening on " << socket_.endpoint()
+                << std::endl;
       while (alive_) {
         AcceptConnection();
       }
-      std::cout << "Shutting down..." << std::endl;
+      std::cout << service_name << " shutting down..." << std::endl;
       for (auto &worker_thread : worker_threads_) {
         worker_thread.join();
       }


@@ -244,6 +244,7 @@ PublicBase::PublicBase(std::unique_ptr<PrivateBase> impl)
     impl_->wal().Enable();
     snapshot_creator_ = std::make_unique<Scheduler>();
     snapshot_creator_->Run(
+        "Snapshot",
         std::chrono::seconds(impl_->config_.snapshot_cycle_sec),
         [this] { MakeSnapshot(); });
   }
@@ -320,6 +321,7 @@ MasterBase::MasterBase(std::unique_ptr<impl::PrivateBase> impl)
     : PublicBase(std::move(impl)) {
   if (impl_->config_.query_execution_time_sec != -1) {
     transaction_killer_.Run(
+        "TX killer",
         std::chrono::seconds(std::max(
             1, std::min(5, impl_->config_.query_execution_time_sec / 4))),
         [this]() {


@@ -50,7 +50,7 @@ class StorageGc {
         vertices_(storage.vertices_),
         edges_(storage.edges_) {
     if (pause_sec > 0)
-      scheduler_.Run(std::chrono::seconds(pause_sec),
+      scheduler_.Run("Storage GC", std::chrono::seconds(pause_sec),
                      [this] { CollectGarbage(); });
   }


@@ -26,7 +26,8 @@ WriteAheadLog::WriteAheadLog(
   if (durability_enabled) {
     CheckDurabilityDir(durability_dir);
     wal_file_.Init();
-    scheduler_.Run(std::chrono::milliseconds(FLAGS_wal_flush_interval_millis),
+    scheduler_.Run("WAL",
+                   std::chrono::milliseconds(FLAGS_wal_flush_interval_millis),
                    [this]() { wal_file_.Flush(deltas_); });
   }
 }


@@ -89,25 +89,13 @@ void InitSignalHandlers(const std::function<void()> &shutdown) {
   })) << "Unable to register SIGUSR1 handler!";
 }
-void StartMemWarningLogger() {
-  Scheduler mem_log_scheduler;
-  if (FLAGS_memory_warning_threshold > 0) {
-    mem_log_scheduler.Run(std::chrono::seconds(3), [] {
-      auto free_ram_mb = utils::AvailableMem() / 1024;
-      if (free_ram_mb < FLAGS_memory_warning_threshold)
-        LOG(WARNING) << "Running out of available RAM, only " << free_ram_mb
-                     << " MB left.";
-    });
-  }
-}
 void MasterMain() {
   google::SetUsageMessage("Memgraph distributed master");
   database::Master db;
   SessionData session_data{db};
   ServerT server({FLAGS_interface, static_cast<uint16_t>(FLAGS_port)},
-                 session_data, false, FLAGS_num_workers);
+                 session_data, false, "Bolt", FLAGS_num_workers);
   // Handler for regular termination signals
   auto shutdown = [&server] {
@@ -119,14 +107,12 @@ void MasterMain() {
   };
   InitSignalHandlers(shutdown);
-  StartMemWarningLogger();
   server.AwaitShutdown();
 }
 void WorkerMain() {
   google::SetUsageMessage("Memgraph distributed worker");
   database::Worker db;
-  StartMemWarningLogger();
   db.WaitForShutdown();
 }
@@ -135,7 +121,7 @@ void SingleNodeMain() {
   database::SingleNode db;
   SessionData session_data{db};
   ServerT server({FLAGS_interface, static_cast<uint16_t>(FLAGS_port)},
-                 session_data, false, FLAGS_num_workers);
+                 session_data, false, "Bolt", FLAGS_num_workers);
   // Handler for regular termination signals
   auto shutdown = [&server] {
@@ -147,8 +133,6 @@
   };
   InitSignalHandlers(shutdown);
-  StartMemWarningLogger();
   server.AwaitShutdown();
 }
@@ -178,6 +162,17 @@ int main(int argc, char **argv) {
   stats::InitStatsLogging(stats_prefix);
   utils::OnScopeExit stop_stats([] { stats::StopStatsLogging(); });
+  // Start memory warning logger.
+  Scheduler mem_log_scheduler;
+  if (FLAGS_memory_warning_threshold > 0) {
+    mem_log_scheduler.Run("Memory warning", std::chrono::seconds(3), [] {
+      auto free_ram_mb = utils::AvailableMem() / 1024;
+      if (free_ram_mb < FLAGS_memory_warning_threshold)
+        LOG(WARNING) << "Running out of available RAM, only " << free_ram_mb
+                     << " MB left.";
+    });
+  }
   CHECK(!(FLAGS_master && FLAGS_worker))
       << "Can't run Memgraph as worker and master at the same time";
   if (FLAGS_master)


@@ -4,6 +4,7 @@
 #include "communication/rpc/client.hpp"
 #include "data_structures/concurrent/push_queue.hpp"
+#include "utils/thread.hpp"
 #include "stats/stats_rpc_messages.hpp"
@@ -22,6 +23,7 @@ ConcurrentPushQueue<StatsReq> stats_queue;
 void RefreshMetrics() {
   LOG(INFO) << "Metrics flush thread started";
+  utils::ThreadSetName("Stats refresh");
   while (stats_running) {
     auto &metrics = AccessMetrics();
     for (auto &kv : metrics) {
@@ -42,6 +44,7 @@ void StatsDispatchMain(const io::network::Endpoint &endpoint) {
   const int MAX_BATCH_SIZE = 100;
   LOG(INFO) << "Stats dispatcher thread started";
+  utils::ThreadSetName("Stats dispatcher");
   communication::rpc::Client client(endpoint);


@@ -10,20 +10,22 @@ namespace tx {
 WorkerEngine::WorkerEngine(const io::network::Endpoint &endpoint)
     : rpc_client_pool_(endpoint) {
-  cache_clearing_scheduler_.Run(kCacheReleasePeriod, [this]() {
-    // Use the GC snapshot as it always has at least one member.
-    auto res = rpc_client_pool_.Call<GcSnapshotRpc>();
-    // There is a race-condition between this scheduled call and worker
-    // shutdown. It is possible that the worker has responded to the master it
-    // is shutting down, and the master is shutting down (and can't responde to
-    // RPCs). At the same time this call gets scheduled, so we get a failed RPC.
-    if (!res) {
-      LOG(WARNING) << "Transaction cache GC RPC call failed";
-    } else {
-      CHECK(!res->member.empty()) << "Recieved an empty GcSnapshot";
-      ClearCachesBasedOnOldest(res->member.front());
-    }
-  });
+  cache_clearing_scheduler_.Run(
+      "TX cache clear", kCacheReleasePeriod, [this]() {
+        // Use the GC snapshot as it always has at least one member.
+        auto res = rpc_client_pool_.Call<GcSnapshotRpc>();
+        // There is a race-condition between this scheduled call and worker
+        // shutdown. It is possible that the worker has responded to the master
+        // it is shutting down, and the master is shutting down (and can't
+        // responde to RPCs). At the same time this call gets scheduled, so we
+        // get a failed RPC.
+        if (!res) {
+          LOG(WARNING) << "Transaction cache GC RPC call failed";
+        } else {
+          CHECK(!res->member.empty()) << "Recieved an empty GcSnapshot";
+          ClearCachesBasedOnOldest(res->member.front());
+        }
+      });
 }
 WorkerEngine::~WorkerEngine() {


@@ -16,7 +16,7 @@ class Executor {
   explicit Executor(const std::chrono::duration<TRep, TPeriod> pause) {
     DCHECK(pause > std::chrono::seconds(0))
         << "Duration between executions should be reasonable";
-    scheduler_.Run(pause, std::bind(&Executor::Execute, this));
+    scheduler_.Run("Executor", pause, std::bind(&Executor::Execute, this));
   }
   ~Executor() {


@@ -9,6 +9,8 @@
 #include "glog/logging.h"
+#include "utils/thread.hpp"
 /**
  * Class used to run scheduled function execution.
  */
@@ -24,15 +26,18 @@ class Scheduler {
   * @Tparam TPeriod duration in seconds between two ticks
   */
  template <typename TRep, typename TPeriod>
-  void Run(const std::chrono::duration<TRep, TPeriod> &pause,
+  void Run(const std::string &service_name,
+           const std::chrono::duration<TRep, TPeriod> &pause,
            const std::function<void()> &f) {
    DCHECK(is_working_ == false) << "Thread already running.";
    DCHECK(pause > std::chrono::seconds(0)) << "Pause is invalid.";
    is_working_ = true;
-    thread_ = std::thread([this, pause, f]() {
+    thread_ = std::thread([this, pause, f, service_name]() {
      auto start_time = std::chrono::system_clock::now();
+      utils::ThreadSetName(service_name);
      while (true) {
        // First wait then execute the function. We do that in that order
        // because most of the schedulers are started at the beginning of the
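For clarity, here is a hypothetical call site of the updated Run (the name "Cleanup" and the ten-second period are illustrative, not taken from this commit); the first argument now becomes the scheduler thread's name:

  Scheduler cleanup_scheduler;
  cleanup_scheduler.Run("Cleanup", std::chrono::seconds(10),
                        [] { /* periodic cleanup work */ });

Passing the name as the first argument keeps every periodically scheduled thread identifiable in htop, which is why all existing Run call sites in this commit gain a leading name string.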

src/utils/thread.hpp (new file, 20 lines)

@@ -0,0 +1,20 @@
+#pragma once
+#include <sys/prctl.h>
+#include <thread>
+#include <glog/logging.h>
+namespace utils {
+/**
+ * This function sets the thread name of the calling thread.
+ * Beware, the name length limit is 16 characters!
+ */
+inline void ThreadSetName(const std::string &name) {
+  LOG_IF(WARNING, prctl(PR_SET_NAME, name.c_str()) != 0)
+      << "Couldn't set thread name: " << name << "!";
+}
+};  // namespace utils
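A small usage sketch for the new helper (the thread body and the name "My worker" are hypothetical, not part of the commit):

#include <thread>

#include "utils/thread.hpp"

int main() {
  std::thread t([]() {
    // The name shows up in htop and in /proc/<pid>/task/<tid>/comm.
    utils::ThreadSetName("My worker");
    // ... actual thread work ...
  });
  t.join();
  return 0;
}

Note that prctl(PR_SET_NAME, ...) silently truncates anything beyond 15 characters plus the terminating NUL, so a name such as "Stats dispatcher" (16 characters) would be displayed shortened.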


@@ -82,7 +82,7 @@ TEST(Network, SocketReadHangOnConcurrentConnections) {
   TestData data;
   int N = (std::thread::hardware_concurrency() + 1) / 2;
   int Nc = N * 3;
-  communication::Server<TestSession, TestData> server(endpoint, data, false, N);
+  communication::Server<TestSession, TestData> server(endpoint, data, false, "Test", N);
   const auto &ep = server.endpoint();
   // start clients


@@ -21,7 +21,7 @@ TEST(Network, Server) {
   // initialize server
   TestData session_data;
   int N = (std::thread::hardware_concurrency() + 1) / 2;
-  ServerT server(endpoint, session_data, false, N);
+  ServerT server(endpoint, session_data, false, "Test", N);
   const auto &ep = server.endpoint();
   // start clients


@@ -22,7 +22,7 @@ TEST(Network, SessionLeak) {
   // initialize server
   TestData session_data;
-  ServerT server(endpoint, session_data, false, 2);
+  ServerT server(endpoint, session_data, false, "Test", 2);
   // start clients
   int N = 50;


@@ -31,7 +31,7 @@ class RunningServer {
   database::SingleNode db_;
   SessionData session_data_{db_};
   Endpoint endpoint_{"127.0.0.1", 0};
-  ServerT server_{endpoint_, session_data_, true, 1};
+  ServerT server_{endpoint_, session_data_, true, "Test", 1};
 };
 class TestClient : public ClientT {


@@ -13,7 +13,7 @@ TEST(Scheduler, TestFunctionExecuting) {
   std::atomic<int> x{0};
   std::function<void()> func{[&x]() { ++x; }};
   Scheduler scheduler;
-  scheduler.Run(std::chrono::seconds(1), func);
+  scheduler.Run("Test", std::chrono::seconds(1), func);
   EXPECT_EQ(x, 0);
   std::this_thread::sleep_for(std::chrono::milliseconds(900));