From 2eb5d3c3ffcbeab4613b940534d8291f7bcc2356 Mon Sep 17 00:00:00 2001 From: Kruno Tomola Fabro <krunotf@memgraph.io> Date: Tue, 30 Aug 2016 15:12:30 +0100 Subject: [PATCH] Added basic exception handlig. Changed methods insert in Vertices and Edges to be valid even if exception occurs. --- .../communication/bolt/v1/server/worker.hpp | 12 ++++- .../concurrent/concurrent_list.hpp | 3 ++ include/import/csv_import.hpp | 42 ++++----------- include/io/network/event_listener.hpp | 48 ++++++++--------- include/io/network/server.hpp | 24 ++++----- include/io/network/stream_listener.hpp | 25 +++++---- include/mvcc/version_list.hpp | 4 ++ include/query_engine/hardcode/queries.hpp | 51 +++++++++++-------- include/transactions/lock_store.hpp | 24 ++++----- include/transactions/transaction.hpp | 1 + src/communication/bolt/v1/states/executor.cpp | 2 +- src/dbms/cleaner.cpp | 17 ++++++- src/storage/edges.cpp | 3 +- src/storage/vertices.cpp | 3 +- 14 files changed, 137 insertions(+), 122 deletions(-) diff --git a/include/communication/bolt/v1/server/worker.hpp b/include/communication/bolt/v1/server/worker.hpp index 7a6ac0183..d6b51bdde 100644 --- a/include/communication/bolt/v1/server/worker.hpp +++ b/include/communication/bolt/v1/server/worker.hpp @@ -9,8 +9,8 @@ #include "communication/bolt/v1/bolt.hpp" #include "communication/bolt/v1/session.hpp" -#include "logging/default.hpp" #include "io/network/stream_reader.hpp" +#include "logging/default.hpp" namespace bolt { @@ -76,6 +76,7 @@ public: } catch (const std::exception &e) { logger.error("Error occured while executing statement."); logger.error("{}", e.what()); + // TODO: report to client } } @@ -85,6 +86,15 @@ public: session.close(); } + template <class... Args> + void on_exception(Session &session, Args &&... args) + { + logger.error("Error occured in this session"); + logger.error(args...); + + // TODO: Do something about it + } + char buf[65536]; protected: diff --git a/include/data_structures/concurrent/concurrent_list.hpp b/include/data_structures/concurrent/concurrent_list.hpp index 8f16a504b..0dfdd41e2 100644 --- a/include/data_structures/concurrent/concurrent_list.hpp +++ b/include/data_structures/concurrent/concurrent_list.hpp @@ -144,6 +144,9 @@ private: // It is lock free but it isn't wait free. void push(T &&data) { + // It could be done with unique_ptr but while this could meen memory + // leak on excpetion, unique_ptr could meean use after free. Memory + // leak is less dangerous. auto node = new Node(data); Node *next = nullptr; do { diff --git a/include/import/csv_import.hpp b/include/import/csv_import.hpp index 75b26334a..00bffe32c 100644 --- a/include/import/csv_import.hpp +++ b/include/import/csv_import.hpp @@ -36,7 +36,7 @@ using namespace std; -constexpr char *_string = "string"; +constexpr char const *_string = "string"; bool equal_str(const char *a, const char *b) { return strcasecmp(a, b) == 0; } @@ -167,22 +167,6 @@ private: } } - // template <typename F> - // Option<unique_ptr<Filler>> make_filler_property(bool vertex, - // const char name, Flags - // type) - // { - // if (vertex) { - // std::unique_ptr<Filler> f( - // F(db.vertex_property_key(name, Type(type)))); - // return make_option(std::move(f)); - // } else { - // std::unique_ptr<Filler> f( - // F(db.edge_property_key(name, Type(type)))); - // return make_option(std::move(f)); - // } - // } - template <class TG> typename PropertyFamily<TG>::PropertyType::PropertyFamilyKey prop_key(const char *name, Flags type) @@ -197,6 +181,10 @@ private: { tmp_vec.clear(); split(header_part, type_mark, tmp_vec); + + const char *name = tmp_vec[0]; + const char *type = tmp_vec[1]; + if (tmp_vec.size() > 2) { err("To much sub parts in header part"); return make_option<unique_ptr<Filler>>(); @@ -205,28 +193,18 @@ private: warn( "Column ", tmp_vec[0], " doesn't have specified type so string type will be used"); - tmp_vec.push_back(_string); + name = tmp_vec[0]; + type = _string; } else { warn("Empty colum definition, skiping column."); std::unique_ptr<Filler> f(new SkipFiller()); return make_option(std::move(f)); } + } else { + name = tmp_vec[0]; + type = tmp_vec[1]; } - const char *name = tmp_vec[0]; - const char *type = tmp_vec[1]; - - // cout << name << " # " << type << endl; - - // auto prop_key = [&](auto name, auto type) -> auto - // { - // if (vertex) { - // return db.vertex_property_key(name, Type(type)); - // } else { - // return db.edge_property_key(name, Type(type)); - // } - // }; - if (equal_str(type, "id")) { std::unique_ptr<Filler> f( name[0] == '\0' ? new IdFiller<TG>() diff --git a/include/io/network/event_listener.hpp b/include/io/network/event_listener.hpp index 8ce0c5fe8..4794447d5 100644 --- a/include/io/network/event_listener.hpp +++ b/include/io/network/event_listener.hpp @@ -28,36 +28,37 @@ public: auto n = listener.wait(events, max_events, 200); // go through all events and process them in order - for(int i = 0; i < n; ++i) - { - auto& event = events[i]; + for (int i = 0; i < n; ++i) { + auto &event = events[i]; - // hangup event - if(UNLIKELY(event.events & EPOLLRDHUP)) - { - this->derived().on_close_event(event); - continue; + try { + // hangup event + if (UNLIKELY(event.events & EPOLLRDHUP)) { + this->derived().on_close_event(event); + continue; + } + + // there was an error on the server side + if (UNLIKELY(!(event.events & EPOLLIN) || + event.events & (EPOLLHUP | EPOLLERR))) { + this->derived().on_error_event(event); + continue; + } + + // we have some data waiting to be read + this->derived().on_data_event(event); + } catch (const std::exception &e) { + this->derived().on_exception_event( + event, "Error occured while processing event \n{}", + e.what()); } - - // there was an error on the server side - if(UNLIKELY(!(event.events & EPOLLIN) || - event.events & (EPOLLHUP | EPOLLERR))) - { - this->derived().on_error_event(event); - continue; - } - - // we have some data waiting to be read - this->derived().on_data_event(event); } // this will be optimized out :D - if(wait_timeout < 0) - return; + if (wait_timeout < 0) return; // if there was events, continue to wait on new events - if(n != 0) - return; + if (n != 0) return; // wait timeout occurred and there were no events. if wait_timeout // is -1 there will never be any timeouts so client should provide @@ -70,5 +71,4 @@ protected: Epoll listener; Epoll::Event events[max_events]; }; - } diff --git a/include/io/network/server.hpp b/include/io/network/server.hpp index c71f57470..ece730fc8 100644 --- a/include/io/network/server.hpp +++ b/include/io/network/server.hpp @@ -9,7 +9,7 @@ template <class Derived> class Server : public EventListener<Derived> { public: - Server(Socket&& socket) : socket(std::forward<Socket>(socket)) + Server(Socket &&socket) : socket(std::forward<Socket>(socket)) { event.data.fd = this->socket; event.events = EPOLLIN | EPOLLET; @@ -17,27 +17,25 @@ public: this->listener.add(this->socket, &event); } - void on_close_event(Epoll::Event& event) - { - ::close(event.data.fd); - } + void on_close_event(Epoll::Event &event) { ::close(event.data.fd); } - void on_error_event(Epoll::Event& event) - { - ::close(event.data.fd); - } + void on_error_event(Epoll::Event &event) { ::close(event.data.fd); } - void on_data_event(Epoll::Event& event) + void on_data_event(Epoll::Event &event) { - if(UNLIKELY(socket != event.data.fd)) - return; + if (UNLIKELY(socket != event.data.fd)) return; this->derived().on_connect(); } + template <class... Args> + void on_exception_event(Epoll::Event &event, Args &&... args) + { + // TODO: Do something about it + } + protected: Epoll::Event event; Socket socket; }; - } diff --git a/include/io/network/stream_listener.hpp b/include/io/network/stream_listener.hpp index 28adfd4f4..9b0be8dee 100644 --- a/include/io/network/stream_listener.hpp +++ b/include/io/network/stream_listener.hpp @@ -5,39 +5,44 @@ namespace io { -template <class Derived, class Stream, - size_t max_events = 64, int wait_timeout = -1> +template <class Derived, class Stream, size_t max_events = 64, + int wait_timeout = -1> class StreamListener : public EventListener<Derived, max_events, wait_timeout> { public: using EventListener<Derived, max_events, wait_timeout>::EventListener; - void add(Stream& stream) + void add(Stream &stream) { // add the stream to the event listener this->listener.add(stream.socket, &stream.event); } - void on_close_event(Epoll::Event& event) + void on_close_event(Epoll::Event &event) { this->derived().on_close(to_stream(event)); } - void on_error_event(Epoll::Event& event) + void on_error_event(Epoll::Event &event) { this->derived().on_error(to_stream(event)); } - void on_data_event(Epoll::Event& event) + void on_data_event(Epoll::Event &event) { this->derived().on_data(to_stream(event)); } -private: - Stream& to_stream(Epoll::Event& event) + template <class... Args> + void on_exception_event(Epoll::Event &event, Args &&... args) { - return *reinterpret_cast<Stream*>(event.data.ptr); + this->derived().on_exception(to_stream(event), args...); + } + +private: + Stream &to_stream(Epoll::Event &event) + { + return *reinterpret_cast<Stream *>(event.data.ptr); } }; - } diff --git a/include/mvcc/version_list.hpp b/include/mvcc/version_list.hpp index 0523559b6..ede6f2825 100644 --- a/include/mvcc/version_list.hpp +++ b/include/mvcc/version_list.hpp @@ -155,6 +155,9 @@ public: assert(record != nullptr); lock_and_validate(record, t); + // It could be done with unique_ptr but while this could mean memory + // leak on exception, unique_ptr could mean use after free. Memory + // leak is less dangerous. auto updated = new T(); updated->data = record->data; @@ -174,6 +177,7 @@ public: if (!record) return false; + // TODO: Is this lock and validate necessary lock_and_validate(record, t); return remove(record, t), true; } diff --git a/include/query_engine/hardcode/queries.hpp b/include/query_engine/hardcode/queries.hpp index 6424b745a..9601fd64e 100644 --- a/include/query_engine/hardcode/queries.hpp +++ b/include/query_engine/hardcode/queries.hpp @@ -349,35 +349,42 @@ auto load_queries(Db &db) auto match_label_type_return = [&db](const properties_t &args) { DbAccessor t(db); - auto &type = t.type_find_or_create("TYPE"); - auto &label = t.label_find_or_create("LABEL"); + try { + auto &type = t.type_find_or_create("TYPE"); + auto &label = t.label_find_or_create("LABEL"); - Option<const VertexAccessor> bt; + Option<const VertexAccessor> bt; - auto it_type = type.index().for_range(t).from().label(label); + auto it_type = type.index().for_range(t).from().label(label); - auto it_vertex = t.vertex_access() - .fill() - .label(label) - .clone_to(bt) // Savepoint - .out() - .type(type) - .replace(bt); // Load savepoint + auto it_vertex = t.vertex_access() + .fill() + .label(label) + .clone_to(bt) // Savepoint + .out() + .type(type) + .replace(bt); // Load savepoint - if (it_type.count() > it_vertex.count()) { - // Going through vertices wiil probably be faster - it_vertex.for_all([&](auto n) { - // PRINT n - }); + if (it_type.count() > it_vertex.count()) { + // Going through vertices wiil probably be faster + it_vertex.for_all([&](auto n) { + // PRINT n + }); - } else { - // Going through edges wiil probably be faster - it_type.for_all([&](auto n) { - // PRINT n - }); + } else { + // Going through edges wiil probably be faster + it_type.for_all([&](auto n) { + // PRINT n + }); + } + return t.commit(); + } catch (...) { + // Catch all exceptions + // Print something to logger + t.abort(); + return false; } - return t.commit(); }; // Blueprint: diff --git a/include/transactions/lock_store.hpp b/include/transactions/lock_store.hpp index 6afe19752..eba0a8313 100644 --- a/include/transactions/lock_store.hpp +++ b/include/transactions/lock_store.hpp @@ -1,8 +1,8 @@ #pragma once -#include <vector> #include <cassert> #include <memory> +#include <vector> #include "storage/locking/lock_status.hpp" namespace tx @@ -17,41 +17,38 @@ class LockStore LockHolder() noexcept = default; template <class... Args> - LockHolder(T* lock, Args&&... args) noexcept : lock(lock) + LockHolder(T *lock, Args &&... args) noexcept : lock(lock) { assert(lock != nullptr); auto status = lock->lock(std::forward<Args>(args)...); - if(status != LockStatus::Acquired) - lock = nullptr; + if (status != LockStatus::Acquired) lock = nullptr; } - LockHolder(const LockHolder&) = delete; - LockHolder(LockHolder&& other) noexcept : lock(other.lock) + LockHolder(const LockHolder &) = delete; + LockHolder(LockHolder &&other) noexcept : lock(other.lock) { other.lock = nullptr; } ~LockHolder() { - if(lock != nullptr) - lock->unlock(); + if (lock != nullptr) lock->unlock(); } - bool active() const { return lock != nullptr; } + bool active() const noexcept { return lock != nullptr; } private: - T* lock {nullptr}; + T *lock{nullptr}; }; public: template <class... Args> - void take(T* lock, Args&&... args) + void take(T *lock, Args &&... args) { auto holder = LockHolder(lock, std::forward<Args>(args)...); - if(!holder.active()) - return; + if (!holder.active()) return; locks.emplace_back(LockHolder(lock, std::forward<Args>(args)...)); } @@ -59,5 +56,4 @@ public: private: std::vector<LockHolder> locks; }; - }; diff --git a/include/transactions/transaction.hpp b/include/transactions/transaction.hpp index 864a71f9b..64306d6cd 100644 --- a/include/transactions/transaction.hpp +++ b/include/transactions/transaction.hpp @@ -1,3 +1,4 @@ + #pragma once #include <cstdint> diff --git a/src/communication/bolt/v1/states/executor.cpp b/src/communication/bolt/v1/states/executor.cpp index b8e7e9777..480a23f4e 100644 --- a/src/communication/bolt/v1/states/executor.cpp +++ b/src/communication/bolt/v1/states/executor.cpp @@ -1,5 +1,5 @@ -#include "communication/bolt/v1/states/executor.hpp" #include "communication/bolt/v1/messaging/codes.hpp" +#include "communication/bolt/v1/states/executor.hpp" #ifdef BARRIER #include "barrier/barrier.cpp" diff --git a/src/dbms/cleaner.cpp b/src/dbms/cleaner.cpp index 7ab29bcac..b308827dc 100644 --- a/src/dbms/cleaner.cpp +++ b/src/dbms/cleaner.cpp @@ -7,9 +7,12 @@ #include "database/db_transaction.hpp" #include "threading/thread.hpp" +#include "logging/default.hpp" + Cleaning::Cleaning(ConcurrentMap<std::string, Db> &dbs) : dbms(dbs) { cleaners.push_back(std::make_unique<Thread>([&]() { + Logger logger = logging::log->logger("Cleaner"); std::time_t last_clean = std::time(nullptr); while (cleaning.load(std::memory_order_acquire)) { std::time_t now = std::time(nullptr); @@ -17,8 +20,18 @@ Cleaning::Cleaning(ConcurrentMap<std::string, Db> &dbs) : dbms(dbs) if (now >= last_clean + cleaning_cycle) { for (auto &db : dbs.access()) { DbTransaction t(db.second); - t.clean_edge_section(); - t.clean_vertex_section(); + + try { + t.clean_edge_section(); + t.clean_vertex_section(); + } catch (const std::exception &e) { + logger.error( + "Error occured while cleaning database \"{}\"", + db.first); + logger.error("{}", e.what()); + } + + t.trans.commit(); } last_clean = now; } else { diff --git a/src/storage/edges.cpp b/src/storage/edges.cpp index 6bd383d04..58f3d8376 100644 --- a/src/storage/edges.cpp +++ b/src/storage/edges.cpp @@ -24,6 +24,7 @@ EdgeAccessor Edges::insert(DbTransaction &t, VertexRecord *from, // create new vertex record EdgeRecord edge_record(next, from, to); + auto edge = edge_record.insert(t.trans); // insert the new vertex record into the vertex store auto edges_accessor = edges.access(); @@ -31,7 +32,7 @@ EdgeAccessor Edges::insert(DbTransaction &t, VertexRecord *from, // create new vertex auto inserted_edge_record = result.first; - auto edge = inserted_edge_record->second.insert(t.trans); + t.to_update_index<TypeGroupEdge>(&inserted_edge_record->second, edge); return EdgeAccessor(edge, &inserted_edge_record->second, t); diff --git a/src/storage/vertices.cpp b/src/storage/vertices.cpp index aae5aa1c9..18ab6de2d 100644 --- a/src/storage/vertices.cpp +++ b/src/storage/vertices.cpp @@ -23,7 +23,7 @@ VertexAccessor Vertices::insert(DbTransaction &t) // create new vertex record VertexRecord vertex_record(next); - // vertex_record.id(next); + auto vertex = vertex_record.insert(t.trans); // insert the new vertex record into the vertex store auto vertices_accessor = vertices.access(); @@ -31,7 +31,6 @@ VertexAccessor Vertices::insert(DbTransaction &t) // create new vertex auto inserted_vertex_record = result.first; - auto vertex = inserted_vertex_record->second.insert(t.trans); t.to_update_index<TypeGroupVertex>(&inserted_vertex_record->second, vertex); return VertexAccessor(vertex, &inserted_vertex_record->second, t);