Abort all transactions on SIGTERM
Reviewers: buda, teon.banek, florijan Reviewed By: florijan Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D818
This commit is contained in:
parent
4e4ed063fc
commit
56017e598e
@ -181,8 +181,6 @@ class Session {
|
||||
db_accessor_ = nullptr;
|
||||
}
|
||||
|
||||
GraphDbAccessor ActiveDb() { return dbms_.active(); }
|
||||
|
||||
// TODO: Rethink if there is a way to hide some members. At the momemnt all of
|
||||
// them are public.
|
||||
Socket socket_;
|
||||
|
@ -73,6 +73,11 @@ State HandleRun(Session &session, State state, Marker marker) {
|
||||
} else {
|
||||
// Create new transaction.
|
||||
session.db_accessor_ = session.dbms_.active();
|
||||
if (!session.db_accessor_) {
|
||||
// Dbms is shutting down and doesn't accept new transactions so we should
|
||||
// close this session.
|
||||
return State::Close;
|
||||
}
|
||||
}
|
||||
|
||||
DLOG(INFO) << fmt::format("[ActiveDB] '{}'", session.db_accessor_->name());
|
||||
|
@ -31,7 +31,8 @@ namespace communication {
|
||||
* represents a different protocol so the same network infrastructure
|
||||
* can be used for handling different protocols
|
||||
* @tparam Socket the input/output socket that should be used
|
||||
* @tparam SessionData the class with objects that will be forwarded to the session
|
||||
* @tparam SessionData the class with objects that will be forwarded to the
|
||||
* session
|
||||
*/
|
||||
template <typename Session, typename Socket, typename SessionData>
|
||||
class Server
|
||||
@ -40,8 +41,7 @@ class Server
|
||||
|
||||
public:
|
||||
Server(Socket &&socket, SessionData &session_data)
|
||||
: socket_(std::forward<Socket>(socket)),
|
||||
session_data_(session_data) {
|
||||
: socket_(std::forward<Socket>(socket)), session_data_(session_data) {
|
||||
event_.data.fd = socket_;
|
||||
|
||||
// TODO: EPOLLET is hard to use -> figure out how should EPOLLET be used
|
||||
@ -55,9 +55,8 @@ class Server
|
||||
std::cout << fmt::format("Starting {} workers", n) << std::endl;
|
||||
workers_.reserve(n);
|
||||
for (size_t i = 0; i < n; ++i) {
|
||||
workers_.push_back(
|
||||
std::make_unique<Worker<Session, Socket, SessionData>>(
|
||||
session_data_));
|
||||
workers_.push_back(std::make_unique<Worker<Session, Socket, SessionData>>(
|
||||
session_data_));
|
||||
workers_.back()->Start(alive_);
|
||||
}
|
||||
std::cout << "Server is fully armed and operational" << std::endl;
|
||||
|
@ -13,6 +13,7 @@ std::unique_ptr<GraphDbAccessor> Dbms::active() {
|
||||
|
||||
std::unique_ptr<GraphDbAccessor> Dbms::active(const std::string &name,
|
||||
const fs::path &snapshot_db_dir) {
|
||||
if (!alive_) return nullptr;
|
||||
auto acc = dbs.access();
|
||||
// create db if it doesn't exist
|
||||
auto it = acc.find(name);
|
||||
|
@ -65,7 +65,19 @@ class Dbms {
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns an accessor to the active database.
|
||||
* Aborts every transaction in every GraphDb.
|
||||
*/
|
||||
void Shutdown() {
|
||||
alive_ = false;
|
||||
for (auto &db : dbs.access()) {
|
||||
db.second.tx_engine_.ForEachActiveTransaction(
|
||||
[](tx::Transaction &t) { t.set_should_abort(); });
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns an accessor to the active database. If dbms is shutting down
|
||||
* (alive_ is false) it will reject new transactions and return nullptr.
|
||||
*/
|
||||
std::unique_ptr<GraphDbAccessor> active();
|
||||
|
||||
@ -87,5 +99,7 @@ class Dbms {
|
||||
ConcurrentMap<std::string, GraphDb> dbs;
|
||||
|
||||
// currently active database
|
||||
std::atomic<GraphDb *> active_db;
|
||||
std::atomic<GraphDb *> active_db{nullptr};
|
||||
|
||||
std::atomic<bool> alive_{true};
|
||||
};
|
||||
|
@ -31,17 +31,18 @@ using session_data_t = communication::bolt::SessionData<result_stream_t>;
|
||||
using bolt_server_t =
|
||||
communication::Server<session_t, socket_t, session_data_t>;
|
||||
|
||||
DEFINE_string(interface, "0.0.0.0", "Communication interface on which to listen.");
|
||||
DEFINE_string(interface, "0.0.0.0",
|
||||
"Communication interface on which to listen.");
|
||||
DEFINE_string(port, "7687", "Communication port on which to listen.");
|
||||
DEFINE_VALIDATED_int32(num_workers,
|
||||
std::max(std::thread::hardware_concurrency(), 1U),
|
||||
"Number of workers", FLAG_IN_RANGE(1, INT32_MAX));
|
||||
DEFINE_string(log_file, "memgraph.log",
|
||||
"Path to where the log should be stored.");
|
||||
DEFINE_uint64(
|
||||
memory_warning_threshold, 1024,
|
||||
"Memory warning treshold, in MB. If Memgraph detects there is less available "
|
||||
"RAM available it will log a warning. Set to 0 to disable.");
|
||||
DEFINE_uint64(memory_warning_threshold, 1024,
|
||||
"Memory warning treshold, in MB. If Memgraph detects there is "
|
||||
"less available RAM available it will log a warning. Set to 0 to "
|
||||
"disable.");
|
||||
|
||||
// Load flags in this order, the last one has the highest priority:
|
||||
// 1) /etc/memgraph/config
|
||||
@ -146,11 +147,16 @@ int main(int argc, char **argv) {
|
||||
|
||||
// register SIGTERM handler
|
||||
SignalHandler::register_handler(Signal::Terminate,
|
||||
[&server]() { server.Shutdown(); });
|
||||
[&server, &session_data]() {
|
||||
server.Shutdown();
|
||||
session_data.dbms.Shutdown();
|
||||
});
|
||||
|
||||
// register SIGINT handler
|
||||
SignalHandler::register_handler(Signal::Interupt,
|
||||
[&server]() { server.Shutdown(); });
|
||||
SignalHandler::register_handler(Signal::Interupt, [&server, &session_data]() {
|
||||
server.Shutdown();
|
||||
session_data.dbms.Shutdown();
|
||||
});
|
||||
|
||||
// Start memory warning logger.
|
||||
Scheduler mem_log_scheduler;
|
||||
|
Loading…
Reference in New Issue
Block a user