Added basic exception handlig.

Changed methods insert in Vertices and Edges to be valid even if exception occurs.
This commit is contained in:
Kruno Tomola Fabro 2016-08-30 15:12:30 +01:00
parent 035e508840
commit 2eb5d3c3ff
14 changed files with 137 additions and 122 deletions

View File

@ -9,8 +9,8 @@
#include "communication/bolt/v1/bolt.hpp"
#include "communication/bolt/v1/session.hpp"
#include "logging/default.hpp"
#include "io/network/stream_reader.hpp"
#include "logging/default.hpp"
namespace bolt
{
@ -76,6 +76,7 @@ public:
} catch (const std::exception &e) {
logger.error("Error occured while executing statement.");
logger.error("{}", e.what());
// TODO: report to client
}
}
@ -85,6 +86,15 @@ public:
session.close();
}
template <class... Args>
void on_exception(Session &session, Args &&... args)
{
logger.error("Error occured in this session");
logger.error(args...);
// TODO: Do something about it
}
char buf[65536];
protected:

View File

@ -144,6 +144,9 @@ private:
// It is lock free but it isn't wait free.
void push(T &&data)
{
// It could be done with unique_ptr but while this could meen memory
// leak on excpetion, unique_ptr could meean use after free. Memory
// leak is less dangerous.
auto node = new Node(data);
Node *next = nullptr;
do {

View File

@ -36,7 +36,7 @@
using namespace std;
constexpr char *_string = "string";
constexpr char const *_string = "string";
bool equal_str(const char *a, const char *b) { return strcasecmp(a, b) == 0; }
@ -167,22 +167,6 @@ private:
}
}
// template <typename F>
// Option<unique_ptr<Filler>> make_filler_property(bool vertex,
// const char name, Flags
// type)
// {
// if (vertex) {
// std::unique_ptr<Filler> f(
// F(db.vertex_property_key(name, Type(type))));
// return make_option(std::move(f));
// } else {
// std::unique_ptr<Filler> f(
// F(db.edge_property_key(name, Type(type))));
// return make_option(std::move(f));
// }
// }
template <class TG>
typename PropertyFamily<TG>::PropertyType::PropertyFamilyKey
prop_key(const char *name, Flags type)
@ -197,6 +181,10 @@ private:
{
tmp_vec.clear();
split(header_part, type_mark, tmp_vec);
const char *name = tmp_vec[0];
const char *type = tmp_vec[1];
if (tmp_vec.size() > 2) {
err("To much sub parts in header part");
return make_option<unique_ptr<Filler>>();
@ -205,28 +193,18 @@ private:
warn(
"Column ", tmp_vec[0],
" doesn't have specified type so string type will be used");
tmp_vec.push_back(_string);
name = tmp_vec[0];
type = _string;
} else {
warn("Empty colum definition, skiping column.");
std::unique_ptr<Filler> f(new SkipFiller());
return make_option(std::move(f));
}
} else {
name = tmp_vec[0];
type = tmp_vec[1];
}
const char *name = tmp_vec[0];
const char *type = tmp_vec[1];
// cout << name << " # " << type << endl;
// auto prop_key = [&](auto name, auto type) -> auto
// {
// if (vertex) {
// return db.vertex_property_key(name, Type(type));
// } else {
// return db.edge_property_key(name, Type(type));
// }
// };
if (equal_str(type, "id")) {
std::unique_ptr<Filler> f(
name[0] == '\0' ? new IdFiller<TG>()

View File

@ -28,36 +28,37 @@ public:
auto n = listener.wait(events, max_events, 200);
// go through all events and process them in order
for(int i = 0; i < n; ++i)
{
auto& event = events[i];
for (int i = 0; i < n; ++i) {
auto &event = events[i];
// hangup event
if(UNLIKELY(event.events & EPOLLRDHUP))
{
this->derived().on_close_event(event);
continue;
try {
// hangup event
if (UNLIKELY(event.events & EPOLLRDHUP)) {
this->derived().on_close_event(event);
continue;
}
// there was an error on the server side
if (UNLIKELY(!(event.events & EPOLLIN) ||
event.events & (EPOLLHUP | EPOLLERR))) {
this->derived().on_error_event(event);
continue;
}
// we have some data waiting to be read
this->derived().on_data_event(event);
} catch (const std::exception &e) {
this->derived().on_exception_event(
event, "Error occured while processing event \n{}",
e.what());
}
// there was an error on the server side
if(UNLIKELY(!(event.events & EPOLLIN) ||
event.events & (EPOLLHUP | EPOLLERR)))
{
this->derived().on_error_event(event);
continue;
}
// we have some data waiting to be read
this->derived().on_data_event(event);
}
// this will be optimized out :D
if(wait_timeout < 0)
return;
if (wait_timeout < 0) return;
// if there was events, continue to wait on new events
if(n != 0)
return;
if (n != 0) return;
// wait timeout occurred and there were no events. if wait_timeout
// is -1 there will never be any timeouts so client should provide
@ -70,5 +71,4 @@ protected:
Epoll listener;
Epoll::Event events[max_events];
};
}

View File

@ -9,7 +9,7 @@ template <class Derived>
class Server : public EventListener<Derived>
{
public:
Server(Socket&& socket) : socket(std::forward<Socket>(socket))
Server(Socket &&socket) : socket(std::forward<Socket>(socket))
{
event.data.fd = this->socket;
event.events = EPOLLIN | EPOLLET;
@ -17,27 +17,25 @@ public:
this->listener.add(this->socket, &event);
}
void on_close_event(Epoll::Event& event)
{
::close(event.data.fd);
}
void on_close_event(Epoll::Event &event) { ::close(event.data.fd); }
void on_error_event(Epoll::Event& event)
{
::close(event.data.fd);
}
void on_error_event(Epoll::Event &event) { ::close(event.data.fd); }
void on_data_event(Epoll::Event& event)
void on_data_event(Epoll::Event &event)
{
if(UNLIKELY(socket != event.data.fd))
return;
if (UNLIKELY(socket != event.data.fd)) return;
this->derived().on_connect();
}
template <class... Args>
void on_exception_event(Epoll::Event &event, Args &&... args)
{
// TODO: Do something about it
}
protected:
Epoll::Event event;
Socket socket;
};
}

View File

@ -5,39 +5,44 @@
namespace io
{
template <class Derived, class Stream,
size_t max_events = 64, int wait_timeout = -1>
template <class Derived, class Stream, size_t max_events = 64,
int wait_timeout = -1>
class StreamListener : public EventListener<Derived, max_events, wait_timeout>
{
public:
using EventListener<Derived, max_events, wait_timeout>::EventListener;
void add(Stream& stream)
void add(Stream &stream)
{
// add the stream to the event listener
this->listener.add(stream.socket, &stream.event);
}
void on_close_event(Epoll::Event& event)
void on_close_event(Epoll::Event &event)
{
this->derived().on_close(to_stream(event));
}
void on_error_event(Epoll::Event& event)
void on_error_event(Epoll::Event &event)
{
this->derived().on_error(to_stream(event));
}
void on_data_event(Epoll::Event& event)
void on_data_event(Epoll::Event &event)
{
this->derived().on_data(to_stream(event));
}
private:
Stream& to_stream(Epoll::Event& event)
template <class... Args>
void on_exception_event(Epoll::Event &event, Args &&... args)
{
return *reinterpret_cast<Stream*>(event.data.ptr);
this->derived().on_exception(to_stream(event), args...);
}
private:
Stream &to_stream(Epoll::Event &event)
{
return *reinterpret_cast<Stream *>(event.data.ptr);
}
};
}

View File

@ -155,6 +155,9 @@ public:
assert(record != nullptr);
lock_and_validate(record, t);
// It could be done with unique_ptr but while this could mean memory
// leak on exception, unique_ptr could mean use after free. Memory
// leak is less dangerous.
auto updated = new T();
updated->data = record->data;
@ -174,6 +177,7 @@ public:
if (!record) return false;
// TODO: Is this lock and validate necessary
lock_and_validate(record, t);
return remove(record, t), true;
}

View File

@ -349,35 +349,42 @@ auto load_queries(Db &db)
auto match_label_type_return = [&db](const properties_t &args) {
DbAccessor t(db);
auto &type = t.type_find_or_create("TYPE");
auto &label = t.label_find_or_create("LABEL");
try {
auto &type = t.type_find_or_create("TYPE");
auto &label = t.label_find_or_create("LABEL");
Option<const VertexAccessor> bt;
Option<const VertexAccessor> bt;
auto it_type = type.index().for_range(t).from().label(label);
auto it_type = type.index().for_range(t).from().label(label);
auto it_vertex = t.vertex_access()
.fill()
.label(label)
.clone_to(bt) // Savepoint
.out()
.type(type)
.replace(bt); // Load savepoint
auto it_vertex = t.vertex_access()
.fill()
.label(label)
.clone_to(bt) // Savepoint
.out()
.type(type)
.replace(bt); // Load savepoint
if (it_type.count() > it_vertex.count()) {
// Going through vertices wiil probably be faster
it_vertex.for_all([&](auto n) {
// PRINT n
});
if (it_type.count() > it_vertex.count()) {
// Going through vertices wiil probably be faster
it_vertex.for_all([&](auto n) {
// PRINT n
});
} else {
// Going through edges wiil probably be faster
it_type.for_all([&](auto n) {
// PRINT n
});
} else {
// Going through edges wiil probably be faster
it_type.for_all([&](auto n) {
// PRINT n
});
}
return t.commit();
} catch (...) {
// Catch all exceptions
// Print something to logger
t.abort();
return false;
}
return t.commit();
};
// Blueprint:

View File

@ -1,8 +1,8 @@
#pragma once
#include <vector>
#include <cassert>
#include <memory>
#include <vector>
#include "storage/locking/lock_status.hpp"
namespace tx
@ -17,41 +17,38 @@ class LockStore
LockHolder() noexcept = default;
template <class... Args>
LockHolder(T* lock, Args&&... args) noexcept : lock(lock)
LockHolder(T *lock, Args &&... args) noexcept : lock(lock)
{
assert(lock != nullptr);
auto status = lock->lock(std::forward<Args>(args)...);
if(status != LockStatus::Acquired)
lock = nullptr;
if (status != LockStatus::Acquired) lock = nullptr;
}
LockHolder(const LockHolder&) = delete;
LockHolder(LockHolder&& other) noexcept : lock(other.lock)
LockHolder(const LockHolder &) = delete;
LockHolder(LockHolder &&other) noexcept : lock(other.lock)
{
other.lock = nullptr;
}
~LockHolder()
{
if(lock != nullptr)
lock->unlock();
if (lock != nullptr) lock->unlock();
}
bool active() const { return lock != nullptr; }
bool active() const noexcept { return lock != nullptr; }
private:
T* lock {nullptr};
T *lock{nullptr};
};
public:
template <class... Args>
void take(T* lock, Args&&... args)
void take(T *lock, Args &&... args)
{
auto holder = LockHolder(lock, std::forward<Args>(args)...);
if(!holder.active())
return;
if (!holder.active()) return;
locks.emplace_back(LockHolder(lock, std::forward<Args>(args)...));
}
@ -59,5 +56,4 @@ public:
private:
std::vector<LockHolder> locks;
};
};

View File

@ -1,3 +1,4 @@
#pragma once
#include <cstdint>

View File

@ -1,5 +1,5 @@
#include "communication/bolt/v1/states/executor.hpp"
#include "communication/bolt/v1/messaging/codes.hpp"
#include "communication/bolt/v1/states/executor.hpp"
#ifdef BARRIER
#include "barrier/barrier.cpp"

View File

@ -7,9 +7,12 @@
#include "database/db_transaction.hpp"
#include "threading/thread.hpp"
#include "logging/default.hpp"
Cleaning::Cleaning(ConcurrentMap<std::string, Db> &dbs) : dbms(dbs)
{
cleaners.push_back(std::make_unique<Thread>([&]() {
Logger logger = logging::log->logger("Cleaner");
std::time_t last_clean = std::time(nullptr);
while (cleaning.load(std::memory_order_acquire)) {
std::time_t now = std::time(nullptr);
@ -17,8 +20,18 @@ Cleaning::Cleaning(ConcurrentMap<std::string, Db> &dbs) : dbms(dbs)
if (now >= last_clean + cleaning_cycle) {
for (auto &db : dbs.access()) {
DbTransaction t(db.second);
t.clean_edge_section();
t.clean_vertex_section();
try {
t.clean_edge_section();
t.clean_vertex_section();
} catch (const std::exception &e) {
logger.error(
"Error occured while cleaning database \"{}\"",
db.first);
logger.error("{}", e.what());
}
t.trans.commit();
}
last_clean = now;
} else {

View File

@ -24,6 +24,7 @@ EdgeAccessor Edges::insert(DbTransaction &t, VertexRecord *from,
// create new vertex record
EdgeRecord edge_record(next, from, to);
auto edge = edge_record.insert(t.trans);
// insert the new vertex record into the vertex store
auto edges_accessor = edges.access();
@ -31,7 +32,7 @@ EdgeAccessor Edges::insert(DbTransaction &t, VertexRecord *from,
// create new vertex
auto inserted_edge_record = result.first;
auto edge = inserted_edge_record->second.insert(t.trans);
t.to_update_index<TypeGroupEdge>(&inserted_edge_record->second, edge);
return EdgeAccessor(edge, &inserted_edge_record->second, t);

View File

@ -23,7 +23,7 @@ VertexAccessor Vertices::insert(DbTransaction &t)
// create new vertex record
VertexRecord vertex_record(next);
// vertex_record.id(next);
auto vertex = vertex_record.insert(t.trans);
// insert the new vertex record into the vertex store
auto vertices_accessor = vertices.access();
@ -31,7 +31,6 @@ VertexAccessor Vertices::insert(DbTransaction &t)
// create new vertex
auto inserted_vertex_record = result.first;
auto vertex = inserted_vertex_record->second.insert(t.trans);
t.to_update_index<TypeGroupVertex>(&inserted_vertex_record->second, vertex);
return VertexAccessor(vertex, &inserted_vertex_record->second, t);