From 56017e598e555b18d9d977ea27228715b9aea6d1 Mon Sep 17 00:00:00 2001
From: Mislav Bradac <mislav.bradac@memgraph.io>
Date: Mon, 25 Sep 2017 15:56:14 +0200
Subject: [PATCH] Abort all transactions on SIGTERM

Reviewers: buda, teon.banek, florijan

Reviewed By: florijan

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D818
---
 src/communication/bolt/v1/session.hpp         |  2 --
 .../bolt/v1/states/idle_result.hpp            |  5 +++++
 src/communication/server.hpp                  | 11 +++++-----
 src/database/dbms.cpp                         |  1 +
 src/database/dbms.hpp                         | 18 +++++++++++++--
 src/memgraph_bolt.cpp                         | 22 ++++++++++++-------
 6 files changed, 41 insertions(+), 18 deletions(-)

diff --git a/src/communication/bolt/v1/session.hpp b/src/communication/bolt/v1/session.hpp
index a16833af9..414ed755a 100644
--- a/src/communication/bolt/v1/session.hpp
+++ b/src/communication/bolt/v1/session.hpp
@@ -181,8 +181,6 @@ class Session {
     db_accessor_ = nullptr;
   }
 
-  GraphDbAccessor ActiveDb() { return dbms_.active(); }
-
   // TODO: Rethink if there is a way to hide some members. At the momemnt all of
   // them are public.
   Socket socket_;
diff --git a/src/communication/bolt/v1/states/idle_result.hpp b/src/communication/bolt/v1/states/idle_result.hpp
index e342f2c79..e3a045f97 100644
--- a/src/communication/bolt/v1/states/idle_result.hpp
+++ b/src/communication/bolt/v1/states/idle_result.hpp
@@ -73,6 +73,11 @@ State HandleRun(Session &session, State state, Marker marker) {
   } else {
     // Create new transaction.
     session.db_accessor_ = session.dbms_.active();
+    if (!session.db_accessor_) {
+      // Dbms is shutting down and doesn't accept new transactions so we should
+      // close this session.
+      return State::Close;
+    }
   }
 
   DLOG(INFO) << fmt::format("[ActiveDB] '{}'", session.db_accessor_->name());
diff --git a/src/communication/server.hpp b/src/communication/server.hpp
index 5dada284d..036c4814f 100644
--- a/src/communication/server.hpp
+++ b/src/communication/server.hpp
@@ -31,7 +31,8 @@ namespace communication {
  *         represents a different protocol so the same network infrastructure
  *         can be used for handling different protocols
  * @tparam Socket the input/output socket that should be used
- * @tparam SessionData the class with objects that will be forwarded to the session
+ * @tparam SessionData the class with objects that will be forwarded to the
+ *         session
  */
 template <typename Session, typename Socket, typename SessionData>
 class Server
@@ -40,8 +41,7 @@ class Server
 
  public:
   Server(Socket &&socket, SessionData &session_data)
-      : socket_(std::forward<Socket>(socket)),
-        session_data_(session_data) {
+      : socket_(std::forward<Socket>(socket)), session_data_(session_data) {
     event_.data.fd = socket_;
 
     // TODO: EPOLLET is hard to use -> figure out how should EPOLLET be used
@@ -55,9 +55,8 @@ class Server
     std::cout << fmt::format("Starting {} workers", n) << std::endl;
     workers_.reserve(n);
     for (size_t i = 0; i < n; ++i) {
-      workers_.push_back(
-          std::make_unique<Worker<Session, Socket, SessionData>>(
-              session_data_));
+      workers_.push_back(std::make_unique<Worker<Session, Socket, SessionData>>(
+          session_data_));
       workers_.back()->Start(alive_);
     }
     std::cout << "Server is fully armed and operational" << std::endl;
diff --git a/src/database/dbms.cpp b/src/database/dbms.cpp
index 54a55c6a7..62df92769 100644
--- a/src/database/dbms.cpp
+++ b/src/database/dbms.cpp
@@ -13,6 +13,7 @@ std::unique_ptr<GraphDbAccessor> Dbms::active() {
 
 std::unique_ptr<GraphDbAccessor> Dbms::active(const std::string &name,
                                               const fs::path &snapshot_db_dir) {
+  if (!alive_) return nullptr;
   auto acc = dbs.access();
   // create db if it doesn't exist
   auto it = acc.find(name);
diff --git a/src/database/dbms.hpp b/src/database/dbms.hpp
index 7b9ffb855..1de05e129 100644
--- a/src/database/dbms.hpp
+++ b/src/database/dbms.hpp
@@ -65,7 +65,19 @@ class Dbms {
   }
 
   /**
-   * Returns an accessor to the active database.
+   * Aborts every transaction in every GraphDb.
+   */
+  void Shutdown() {
+    alive_ = false;
+    for (auto &db : dbs.access()) {
+      db.second.tx_engine_.ForEachActiveTransaction(
+          [](tx::Transaction &t) { t.set_should_abort(); });
+    }
+  }
+
+  /**
+   * Returns an accessor to the active database. If dbms is shutting down
+   * (alive_ is false) it will reject new transactions and return nullptr.
    */
   std::unique_ptr<GraphDbAccessor> active();
 
@@ -87,5 +99,7 @@ class Dbms {
   ConcurrentMap<std::string, GraphDb> dbs;
 
   // currently active database
-  std::atomic<GraphDb *> active_db;
+  std::atomic<GraphDb *> active_db{nullptr};
+
+  std::atomic<bool> alive_{true};
 };
diff --git a/src/memgraph_bolt.cpp b/src/memgraph_bolt.cpp
index 57bb03559..6ced42b55 100644
--- a/src/memgraph_bolt.cpp
+++ b/src/memgraph_bolt.cpp
@@ -31,17 +31,18 @@ using session_data_t = communication::bolt::SessionData<result_stream_t>;
 using bolt_server_t =
     communication::Server<session_t, socket_t, session_data_t>;
 
-DEFINE_string(interface, "0.0.0.0", "Communication interface on which to listen.");
+DEFINE_string(interface, "0.0.0.0",
+              "Communication interface on which to listen.");
 DEFINE_string(port, "7687", "Communication port on which to listen.");
 DEFINE_VALIDATED_int32(num_workers,
                        std::max(std::thread::hardware_concurrency(), 1U),
                        "Number of workers", FLAG_IN_RANGE(1, INT32_MAX));
 DEFINE_string(log_file, "memgraph.log",
               "Path to where the log should be stored.");
-DEFINE_uint64(
-    memory_warning_threshold, 1024,
-    "Memory warning treshold, in MB. If Memgraph detects there is less available "
-    "RAM available it will log a warning. Set to 0 to disable.");
+DEFINE_uint64(memory_warning_threshold, 1024,
+              "Memory warning treshold, in MB. If Memgraph detects there is "
+              "less available RAM available it will log a warning. Set to 0 to "
+              "disable.");
 
 // Load flags in this order, the last one has the highest priority:
 // 1) /etc/memgraph/config
@@ -146,11 +147,16 @@ int main(int argc, char **argv) {
 
   // register SIGTERM handler
   SignalHandler::register_handler(Signal::Terminate,
-                                  [&server]() { server.Shutdown(); });
+                                  [&server, &session_data]() {
+                                    server.Shutdown();
+                                    session_data.dbms.Shutdown();
+                                  });
 
   // register SIGINT handler
-  SignalHandler::register_handler(Signal::Interupt,
-                                  [&server]() { server.Shutdown(); });
+  SignalHandler::register_handler(Signal::Interupt, [&server, &session_data]() {
+    server.Shutdown();
+    session_data.dbms.Shutdown();
+  });
 
   // Start memory warning logger.
   Scheduler mem_log_scheduler;