From d99724647ad32eb190a92495019e53a09d92a147 Mon Sep 17 00:00:00 2001
From: Matej Ferencevic <matej.ferencevic@memgraph.io>
Date: Fri, 23 Feb 2018 14:35:16 +0100
Subject: [PATCH] Add thread names

Summary:
To see the thread names in htop, press <F2> to enter setup and, under
'Display options', choose 'Show custom thread names'.

Reviewers: buda, teon.banek, florijan

Reviewed By: teon.banek

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1234
---
 src/communication/listener.hpp            |  7 +++--
 src/communication/rpc/server.cpp          |  2 +-
 src/communication/server.hpp              | 24 +++++++++++-------
 src/database/graph_db.cpp                 |  2 ++
 src/database/storage_gc.hpp               |  2 +-
 src/durability/wal.cpp                    |  3 ++-
 src/memgraph_bolt.cpp                     | 31 ++++++++++-------------
 src/stats/stats.cpp                       |  3 +++
 src/transactions/engine_worker.cpp        | 30 ++++++++++++----------
 src/utils/executor.hpp                    |  2 +-
 src/utils/scheduler.hpp                   |  9 +++++--
 src/utils/thread.hpp                      | 20 +++++++++++++++
 tests/concurrent/network_read_hang.cpp    |  2 +-
 tests/concurrent/network_server.cpp       |  2 +-
 tests/concurrent/network_session_leak.cpp |  2 +-
 tests/unit/network_timeouts.cpp           |  2 +-
 tests/unit/scheduler.cpp                  |  2 +-
 17 files changed, 91 insertions(+), 54 deletions(-)
 create mode 100644 src/utils/thread.hpp

diff --git a/src/communication/listener.hpp b/src/communication/listener.hpp
index a174b1a76..cd5cc9954 100644
--- a/src/communication/listener.hpp
+++ b/src/communication/listener.hpp
@@ -13,6 +13,7 @@
 #include "io/network/epoll.hpp"
 #include "io/network/socket.hpp"
 #include "threading/sync/spinlock.hpp"
+#include "utils/thread.hpp"
 
 namespace communication {
 
@@ -35,10 +36,12 @@ class Listener {
   static const int kMaxEvents = 1;
 
  public:
-  Listener(TSessionData &data, bool check_for_timeouts)
+  Listener(TSessionData &data, bool check_for_timeouts,
+           const std::string &service_name)
       : data_(data), alive_(true) {
     if (check_for_timeouts) {
-      thread_ = std::thread([this]() {
+      thread_ = std::thread([this, service_name]() {
+        utils::ThreadSetName(fmt::format("{} timeout", service_name));
         while (alive_) {
           {
             std::unique_lock<SpinLock> guard(lock_);
diff --git a/src/communication/rpc/server.cpp b/src/communication/rpc/server.cpp
index 2400d1c90..03a9fa7dc 100644
--- a/src/communication/rpc/server.cpp
+++ b/src/communication/rpc/server.cpp
@@ -11,7 +11,7 @@
 namespace communication::rpc {
 
 Server::Server(const io::network::Endpoint &endpoint, size_t workers_count)
-    : server_(endpoint, *this, false, workers_count) {}
+    : server_(endpoint, *this, false, "RPC", workers_count) {}
 
 void Server::StopProcessingCalls() {
   server_.Shutdown();
diff --git a/src/communication/server.hpp b/src/communication/server.hpp
index 75de7b288..b0b962d94 100644
--- a/src/communication/server.hpp
+++ b/src/communication/server.hpp
@@ -12,6 +12,7 @@
 
 #include "communication/listener.hpp"
 #include "io/network/socket.hpp"
+#include "utils/thread.hpp"
 
 namespace communication {
 
@@ -42,9 +43,9 @@ class Server {
    * invokes workers_count workers
    */
   Server(const io::network::Endpoint &endpoint, TSessionData &session_data,
-         bool check_for_timeouts,
+         bool check_for_timeouts, const std::string &service_name,
          size_t workers_count = std::thread::hardware_concurrency())
-      : listener_(session_data, check_for_timeouts) {
+      : listener_(session_data, check_for_timeouts, service_name) {
     // Without server we can't continue with application so we can just
     // terminate here.
     if (!socket_.Bind(endpoint)) {
@@ -55,25 +56,30 @@ class Server {
       LOG(FATAL) << "Cannot listen on socket!";
     }
 
-    thread_ = std::thread([this, workers_count]() {
-      std::cout << fmt::format("Starting {} workers", workers_count)
-                << std::endl;
+    thread_ = std::thread([this, workers_count, service_name]() {
+      std::cout << "Starting " << workers_count << " " << service_name
+                << " workers" << std::endl;
+      utils::ThreadSetName(fmt::format("{} server", service_name));
       for (size_t i = 0; i < workers_count; ++i) {
-        worker_threads_.emplace_back([this]() {
+        worker_threads_.emplace_back([this, service_name, i]() {
+          utils::ThreadSetName(
+              fmt::format("{} worker {}", service_name, i + 1));
           while (alive_) {
             listener_.WaitAndProcessEvents();
           }
         });
       }
 
-      std::cout << "Server is fully armed and operational" << std::endl;
-      std::cout << "Listening on " << socket_.endpoint() << std::endl;
+      std::cout << service_name << " server is fully armed and operational"
+                << std::endl;
+      std::cout << service_name << " listening on " << socket_.endpoint()
+                << std::endl;
 
       while (alive_) {
         AcceptConnection();
       }
 
-      std::cout << "Shutting down..." << std::endl;
+      std::cout << service_name << " shutting down..." << std::endl;
       for (auto &worker_thread : worker_threads_) {
         worker_thread.join();
       }
diff --git a/src/database/graph_db.cpp b/src/database/graph_db.cpp
index fb44a9288..a60189135 100644
--- a/src/database/graph_db.cpp
+++ b/src/database/graph_db.cpp
@@ -244,6 +244,7 @@ PublicBase::PublicBase(std::unique_ptr<PrivateBase> impl)
     impl_->wal().Enable();
     snapshot_creator_ = std::make_unique<Scheduler>();
     snapshot_creator_->Run(
+        "Snapshot",
         std::chrono::seconds(impl_->config_.snapshot_cycle_sec),
         [this] { MakeSnapshot(); });
   }
@@ -320,6 +321,7 @@ MasterBase::MasterBase(std::unique_ptr<impl::PrivateBase> impl)
     : PublicBase(std::move(impl)) {
   if (impl_->config_.query_execution_time_sec != -1) {
     transaction_killer_.Run(
+        "TX killer",
        std::chrono::seconds(std::max(
            1, std::min(5, impl_->config_.query_execution_time_sec / 4))),
        [this]() {
diff --git a/src/database/storage_gc.hpp b/src/database/storage_gc.hpp
index 1ccf383e1..95009556f 100644
--- a/src/database/storage_gc.hpp
+++ b/src/database/storage_gc.hpp
@@ -50,7 +50,7 @@ class StorageGc {
         vertices_(storage.vertices_),
         edges_(storage.edges_) {
     if (pause_sec > 0)
-      scheduler_.Run(std::chrono::seconds(pause_sec),
+      scheduler_.Run("Storage GC", std::chrono::seconds(pause_sec),
                      [this] { CollectGarbage(); });
   }
 
diff --git a/src/durability/wal.cpp b/src/durability/wal.cpp
index 3c839fe68..de6b2d661 100644
--- a/src/durability/wal.cpp
+++ b/src/durability/wal.cpp
@@ -26,7 +26,8 @@ WriteAheadLog::WriteAheadLog(
   if (durability_enabled) {
     CheckDurabilityDir(durability_dir);
     wal_file_.Init();
-    scheduler_.Run(std::chrono::milliseconds(FLAGS_wal_flush_interval_millis),
+    scheduler_.Run("WAL",
+                   std::chrono::milliseconds(FLAGS_wal_flush_interval_millis),
                    [this]() { wal_file_.Flush(deltas_); });
   }
 }
diff --git a/src/memgraph_bolt.cpp b/src/memgraph_bolt.cpp
index d33207c4d..612d32fd0 100644
--- a/src/memgraph_bolt.cpp
+++ b/src/memgraph_bolt.cpp
@@ -89,25 +89,13 @@ void InitSignalHandlers(const std::function<void()> &shutdown) {
       }))
       << "Unable to register SIGUSR1 handler!";
 }
 
-void StartMemWarningLogger() {
-  Scheduler mem_log_scheduler;
-  if (FLAGS_memory_warning_threshold > 0) {
-    mem_log_scheduler.Run(std::chrono::seconds(3), [] {
-      auto free_ram_mb = utils::AvailableMem() / 1024;
-      if (free_ram_mb < FLAGS_memory_warning_threshold)
-        LOG(WARNING) << "Running out of available RAM, only " << free_ram_mb
only " << free_ram_mb - << " MB left."; - }); - } -} - void MasterMain() { google::SetUsageMessage("Memgraph distributed master"); database::Master db; SessionData session_data{db}; ServerT server({FLAGS_interface, static_cast<uint16_t>(FLAGS_port)}, - session_data, false, FLAGS_num_workers); + session_data, false, "Bolt", FLAGS_num_workers); // Handler for regular termination signals auto shutdown = [&server] { @@ -119,14 +107,12 @@ void MasterMain() { }; InitSignalHandlers(shutdown); - StartMemWarningLogger(); server.AwaitShutdown(); } void WorkerMain() { google::SetUsageMessage("Memgraph distributed worker"); database::Worker db; - StartMemWarningLogger(); db.WaitForShutdown(); } @@ -135,7 +121,7 @@ void SingleNodeMain() { database::SingleNode db; SessionData session_data{db}; ServerT server({FLAGS_interface, static_cast<uint16_t>(FLAGS_port)}, - session_data, false, FLAGS_num_workers); + session_data, false, "Bolt", FLAGS_num_workers); // Handler for regular termination signals auto shutdown = [&server] { @@ -147,8 +133,6 @@ void SingleNodeMain() { }; InitSignalHandlers(shutdown); - StartMemWarningLogger(); - server.AwaitShutdown(); } @@ -178,6 +162,17 @@ int main(int argc, char **argv) { stats::InitStatsLogging(stats_prefix); utils::OnScopeExit stop_stats([] { stats::StopStatsLogging(); }); + // Start memory warning logger. + Scheduler mem_log_scheduler; + if (FLAGS_memory_warning_threshold > 0) { + mem_log_scheduler.Run("Memory warning", std::chrono::seconds(3), [] { + auto free_ram_mb = utils::AvailableMem() / 1024; + if (free_ram_mb < FLAGS_memory_warning_threshold) + LOG(WARNING) << "Running out of available RAM, only " << free_ram_mb + << " MB left."; + }); + } + CHECK(!(FLAGS_master && FLAGS_worker)) << "Can't run Memgraph as worker and master at the same time"; if (FLAGS_master) diff --git a/src/stats/stats.cpp b/src/stats/stats.cpp index 4a1bc9fb0..276f32b1c 100644 --- a/src/stats/stats.cpp +++ b/src/stats/stats.cpp @@ -4,6 +4,7 @@ #include "communication/rpc/client.hpp" #include "data_structures/concurrent/push_queue.hpp" +#include "utils/thread.hpp" #include "stats/stats_rpc_messages.hpp" @@ -22,6 +23,7 @@ ConcurrentPushQueue<StatsReq> stats_queue; void RefreshMetrics() { LOG(INFO) << "Metrics flush thread started"; + utils::ThreadSetName("Stats refresh"); while (stats_running) { auto &metrics = AccessMetrics(); for (auto &kv : metrics) { @@ -42,6 +44,7 @@ void StatsDispatchMain(const io::network::Endpoint &endpoint) { const int MAX_BATCH_SIZE = 100; LOG(INFO) << "Stats dispatcher thread started"; + utils::ThreadSetName("Stats dispatcher"); communication::rpc::Client client(endpoint); diff --git a/src/transactions/engine_worker.cpp b/src/transactions/engine_worker.cpp index 81f643640..a921ff10a 100644 --- a/src/transactions/engine_worker.cpp +++ b/src/transactions/engine_worker.cpp @@ -10,20 +10,22 @@ namespace tx { WorkerEngine::WorkerEngine(const io::network::Endpoint &endpoint) : rpc_client_pool_(endpoint) { - cache_clearing_scheduler_.Run(kCacheReleasePeriod, [this]() { - // Use the GC snapshot as it always has at least one member. - auto res = rpc_client_pool_.Call<GcSnapshotRpc>(); - // There is a race-condition between this scheduled call and worker - // shutdown. It is possible that the worker has responded to the master it - // is shutting down, and the master is shutting down (and can't responde to - // RPCs). At the same time this call gets scheduled, so we get a failed RPC. 
-    if (!res) {
-      LOG(WARNING) << "Transaction cache GC RPC call failed";
-    } else {
-      CHECK(!res->member.empty()) << "Recieved an empty GcSnapshot";
-      ClearCachesBasedOnOldest(res->member.front());
-    }
-  });
+  cache_clearing_scheduler_.Run(
+      "TX cache clear", kCacheReleasePeriod, [this]() {
+        // Use the GC snapshot as it always has at least one member.
+        auto res = rpc_client_pool_.Call<GcSnapshotRpc>();
+        // There is a race condition between this scheduled call and worker
+        // shutdown. It is possible that the worker has responded to the master
+        // it is shutting down, and the master is shutting down (and can't
+        // respond to RPCs). At the same time this call gets scheduled, so we
+        // get a failed RPC.
+        if (!res) {
+          LOG(WARNING) << "Transaction cache GC RPC call failed";
+        } else {
+          CHECK(!res->member.empty()) << "Received an empty GcSnapshot";
+          ClearCachesBasedOnOldest(res->member.front());
+        }
+      });
 }
 
 WorkerEngine::~WorkerEngine() {
diff --git a/src/utils/executor.hpp b/src/utils/executor.hpp
index ee61edbce..cc7d55443 100644
--- a/src/utils/executor.hpp
+++ b/src/utils/executor.hpp
@@ -16,7 +16,7 @@ class Executor {
   explicit Executor(const std::chrono::duration<TRep, TPeriod> pause) {
     DCHECK(pause > std::chrono::seconds(0))
         << "Duration between executions should be reasonable";
-    scheduler_.Run(pause, std::bind(&Executor::Execute, this));
+    scheduler_.Run("Executor", pause, std::bind(&Executor::Execute, this));
   }
 
   ~Executor() {
diff --git a/src/utils/scheduler.hpp b/src/utils/scheduler.hpp
index 22f358952..362400804 100644
--- a/src/utils/scheduler.hpp
+++ b/src/utils/scheduler.hpp
@@ -9,6 +9,8 @@
 
 #include "glog/logging.h"
 
+#include "utils/thread.hpp"
+
 /**
  * Class used to run scheduled function execution.
  */
@@ -24,15 +26,18 @@ class Scheduler {
    * @Tparam TPeriod duration in seconds between two ticks
    */
   template <typename TRep, typename TPeriod>
-  void Run(const std::chrono::duration<TRep, TPeriod> &pause,
+  void Run(const std::string &service_name,
+           const std::chrono::duration<TRep, TPeriod> &pause,
            const std::function<void()> &f) {
     DCHECK(is_working_ == false) << "Thread already running.";
     DCHECK(pause > std::chrono::seconds(0)) << "Pause is invalid.";
 
     is_working_ = true;
-    thread_ = std::thread([this, pause, f]() {
+    thread_ = std::thread([this, pause, f, service_name]() {
       auto start_time = std::chrono::system_clock::now();
 
+      utils::ThreadSetName(service_name);
+
       while (true) {
         // First wait then execute the function. We do that in that order
         // because most of the schedulers are started at the beginning of the
diff --git a/src/utils/thread.hpp b/src/utils/thread.hpp
new file mode 100644
index 000000000..8ca161ebe
--- /dev/null
+++ b/src/utils/thread.hpp
@@ -0,0 +1,20 @@
+#pragma once
+
+#include <sys/prctl.h>
+
+#include <thread>
+
+#include <glog/logging.h>
+
+namespace utils {
+
+/**
+ * This function sets the thread name of the calling thread.
+ * Beware: names longer than 15 characters are silently truncated!
+ */
+inline void ThreadSetName(const std::string &name) {
+  LOG_IF(WARNING, prctl(PR_SET_NAME, name.c_str()) != 0)
+      << "Couldn't set thread name: " << name << "!";
+}
+
+}  // namespace utils
diff --git a/tests/concurrent/network_read_hang.cpp b/tests/concurrent/network_read_hang.cpp
index 392ea885c..cb87cc6a1 100644
--- a/tests/concurrent/network_read_hang.cpp
+++ b/tests/concurrent/network_read_hang.cpp
@@ -82,7 +82,7 @@ TEST(Network, SocketReadHangOnConcurrentConnections) {
   TestData data;
   int N = (std::thread::hardware_concurrency() + 1) / 2;
   int Nc = N * 3;
-  communication::Server<TestSession, TestData> server(endpoint, data, false, N);
+  communication::Server<TestSession, TestData> server(endpoint, data, false, "Test", N);
   const auto &ep = server.endpoint();
 
   // start clients
diff --git a/tests/concurrent/network_server.cpp b/tests/concurrent/network_server.cpp
index 601f1b757..975c8270f 100644
--- a/tests/concurrent/network_server.cpp
+++ b/tests/concurrent/network_server.cpp
@@ -21,7 +21,7 @@ TEST(Network, Server) {
   // initialize server
   TestData session_data;
   int N = (std::thread::hardware_concurrency() + 1) / 2;
-  ServerT server(endpoint, session_data, false, N);
+  ServerT server(endpoint, session_data, false, "Test", N);
   const auto &ep = server.endpoint();
 
   // start clients
diff --git a/tests/concurrent/network_session_leak.cpp b/tests/concurrent/network_session_leak.cpp
index fa5f2400b..b245646fa 100644
--- a/tests/concurrent/network_session_leak.cpp
+++ b/tests/concurrent/network_session_leak.cpp
@@ -22,7 +22,7 @@ TEST(Network, SessionLeak) {
   // initialize server
   TestData session_data;
-  ServerT server(endpoint, session_data, false, 2);
+  ServerT server(endpoint, session_data, false, "Test", 2);
 
   // start clients
   int N = 50;
diff --git a/tests/unit/network_timeouts.cpp b/tests/unit/network_timeouts.cpp
index 600fcb702..e679b2f35 100644
--- a/tests/unit/network_timeouts.cpp
+++ b/tests/unit/network_timeouts.cpp
@@ -31,7 +31,7 @@ class RunningServer {
   database::SingleNode db_;
   SessionData session_data_{db_};
   Endpoint endpoint_{"127.0.0.1", 0};
-  ServerT server_{endpoint_, session_data_, true, 1};
+  ServerT server_{endpoint_, session_data_, true, "Test", 1};
 };
 
 class TestClient : public ClientT {
diff --git a/tests/unit/scheduler.cpp b/tests/unit/scheduler.cpp
index 2fde2fc8d..d435ff0c9 100644
--- a/tests/unit/scheduler.cpp
+++ b/tests/unit/scheduler.cpp
@@ -13,7 +13,7 @@ TEST(Scheduler, TestFunctionExecuting) {
   std::atomic<int> x{0};
   std::function<void()> func{[&x]() { ++x; }};
   Scheduler scheduler;
-  scheduler.Run(std::chrono::seconds(1), func);
+  scheduler.Run("Test", std::chrono::seconds(1), func);
 
   EXPECT_EQ(x, 0);
   std::this_thread::sleep_for(std::chrono::milliseconds(900));
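
Note (not part of the patch): a minimal standalone sketch of how the new
utils::ThreadSetName helper can be exercised and verified on Linux. Only
utils::ThreadSetName comes from this patch; the main() scaffold and the
example name "test worker" are illustrative assumptions, and the program
needs to be linked against glog, which utils/thread.hpp uses.

#include <sys/prctl.h>

#include <iostream>
#include <thread>

#include "utils/thread.hpp"

int main() {
  std::thread worker([]() {
    // Name the calling thread. Per prctl(2), names longer than 15
    // characters are silently truncated by the kernel.
    utils::ThreadSetName("test worker");

    // Read the name back with PR_GET_NAME to confirm it was applied;
    // the buffer must be at least 16 bytes long.
    char name[16];
    prctl(PR_GET_NAME, name);
    std::cout << "Thread name: " << name << std::endl;
  });
  worker.join();
  return 0;
}

The same name is what htop displays once 'Show custom thread names' is
enabled, and what `ps -eLo pid,tid,comm` prints in the comm column.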