Merge with dev.
Commit 8fa73eee43
@@ -9,8 +9,8 @@

#include "communication/bolt/v1/bolt.hpp"
#include "communication/bolt/v1/session.hpp"
#include "logging/default.hpp"
#include "io/network/stream_reader.hpp"
#include "logging/default.hpp"

namespace bolt
{
@@ -76,6 +76,7 @@ public:
        } catch (const std::exception &e) {
            logger.error("Error occurred while executing statement.");
            logger.error("{}", e.what());
            // TODO: report to client
        }
    }

@@ -85,6 +86,15 @@ public:
        session.close();
    }

    template <class... Args>
    void on_exception(Session &session, Args &&... args)
    {
        logger.error("Error occurred in this session");
        logger.error(args...);

        // TODO: Do something about it
    }

    char buf[65536];

protected:
@@ -9,6 +9,8 @@ namespace bolt
class Error : public State
{
public:
    Error();

    State *run(Session &session) override;
};

@@ -20,12 +20,10 @@ public:
    State* run(Session& session) override final;

protected:
    Logger logger;

    /* Execute an incoming query
     *
     */
    void run(Session& session, Query& query);
    State* run(Session& session, Query& query);

    /* Send all remaining results to the client
     *
@@ -4,6 +4,8 @@
#include <cstdint>
#include <memory>

#include "logging/default.hpp"

namespace bolt
{

@@ -15,9 +17,14 @@ public:
    using uptr = std::unique_ptr<State>;

    State() = default;
    State(Logger logger) : logger(logger) {}

    virtual ~State() = default;

    virtual State* run(Session& session) = 0;

protected:
    Logger logger;
};

}
@@ -249,7 +249,7 @@ public:

    void message_ignored()
    {
        write_struct_header(1);
        write_struct_header(0);
        write(underlying_cast(MessageCode::Ignored));
    }

@@ -145,6 +145,9 @@ private:
    // It is lock free but it isn't wait free.
    void push(T &&data)
    {
        // It could be done with unique_ptr but while this could mean a memory
        // leak on exception, unique_ptr could mean use after free. A memory
        // leak is less dangerous.
        auto node = new Node(data);
        Node *next = nullptr;
        do {
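The hunk ends just before the compare-and-swap loop, and the new comment explains why a raw new is used instead of unique_ptr. As a rough illustration of that pattern (raw allocation plus a CAS retry loop), a minimal lock-free stack push might look like the sketch below; LockFreeStack, its Node, and the atomic head are assumptions made for the example, not the queue in the diff.

#include <atomic>
#include <utility>

// Minimal sketch of a lock-free stack push, assuming a Node type with a
// `next` pointer and an atomic `head`. Illustrative only; it does not mirror
// the exact queue in the diff.
template <class T>
class LockFreeStack
{
    struct Node
    {
        T data;
        Node *next;
        explicit Node(T &&d) : data(std::move(d)), next(nullptr) {}
    };

    std::atomic<Node *> head{nullptr};

public:
    void push(T &&data)
    {
        // Raw new: nothing is leaked before a successful CAS, after which the
        // node is owned by the stack.
        auto node = new Node(std::move(data));
        node->next = head.load(std::memory_order_relaxed);

        // Retry until the head swap succeeds; lock-free but not wait-free.
        while (!head.compare_exchange_weak(node->next, node,
                                           std::memory_order_release,
                                           std::memory_order_relaxed))
            ;
    }
};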
@@ -36,7 +36,7 @@

using namespace std;

constexpr char *_string = "string";
constexpr char const *_string = "string";

bool equal_str(const char *a, const char *b) { return strcasecmp(a, b) == 0; }

@@ -167,22 +167,6 @@ private:
        }
    }

    // template <typename F>
    // Option<unique_ptr<Filler>> make_filler_property(bool vertex,
    //                                                 const char name, Flags
    //                                                 type)
    // {
    //     if (vertex) {
    //         std::unique_ptr<Filler> f(
    //             F(db.vertex_property_key(name, Type(type))));
    //         return make_option(std::move(f));
    //     } else {
    //         std::unique_ptr<Filler> f(
    //             F(db.edge_property_key(name, Type(type))));
    //         return make_option(std::move(f));
    //     }
    // }

    template <class TG>
    typename PropertyFamily<TG>::PropertyType::PropertyFamilyKey
    prop_key(const char *name, Flags type)
@@ -197,6 +181,10 @@ private:
    {
        tmp_vec.clear();
        split(header_part, type_mark, tmp_vec);

        const char *name = tmp_vec[0];
        const char *type = tmp_vec[1];

        if (tmp_vec.size() > 2) {
            err("Too many sub parts in header part");
            return make_option<unique_ptr<Filler>>();
@@ -205,28 +193,18 @@ private:
                warn(
                    "Column ", tmp_vec[0],
                    " doesn't have specified type so string type will be used");
                tmp_vec.push_back(_string);
                name = tmp_vec[0];
                type = _string;
            } else {
                warn("Empty column definition, skipping column.");
                std::unique_ptr<Filler> f(new SkipFiller());
                return make_option(std::move(f));
            }
        } else {
            name = tmp_vec[0];
            type = tmp_vec[1];
        }

        const char *name = tmp_vec[0];
        const char *type = tmp_vec[1];

        // cout << name << " # " << type << endl;

        // auto prop_key = [&](auto name, auto type) -> auto
        // {
        //     if (vertex) {
        //         return db.vertex_property_key(name, Type(type));
        //     } else {
        //         return db.edge_property_key(name, Type(type));
        //     }
        // };

        if (equal_str(type, "id")) {
            std::unique_ptr<Filler> f(
                name[0] == '\0' ? new IdFiller<TG>()
@@ -28,36 +28,37 @@ public:
        auto n = listener.wait(events, max_events, 200);

        // go through all events and process them in order
        for(int i = 0; i < n; ++i)
        {
            auto& event = events[i];
        for (int i = 0; i < n; ++i) {
            auto &event = events[i];

            // hangup event
            if(UNLIKELY(event.events & EPOLLRDHUP))
            {
                this->derived().on_close_event(event);
                continue;
            try {
                // hangup event
                if (UNLIKELY(event.events & EPOLLRDHUP)) {
                    this->derived().on_close_event(event);
                    continue;
                }

                // there was an error on the server side
                if (UNLIKELY(!(event.events & EPOLLIN) ||
                             event.events & (EPOLLHUP | EPOLLERR))) {
                    this->derived().on_error_event(event);
                    continue;
                }

                // we have some data waiting to be read
                this->derived().on_data_event(event);
            } catch (const std::exception &e) {
                this->derived().on_exception_event(
                    event, "Error occurred while processing event \n{}",
                    e.what());
            }

            // there was an error on the server side
            if(UNLIKELY(!(event.events & EPOLLIN) ||
                        event.events & (EPOLLHUP | EPOLLERR)))
            {
                this->derived().on_error_event(event);
                continue;
            }

            // we have some data waiting to be read
            this->derived().on_data_event(event);
        }

        // this will be optimized out :D
        if(wait_timeout < 0)
            return;
        if (wait_timeout < 0) return;

        // if there were events, continue to wait on new events
        if(n != 0)
            return;
        if (n != 0) return;

        // wait timeout occurred and there were no events. if wait_timeout
        // is -1 there will never be any timeouts so client should provide
@@ -70,5 +71,4 @@ protected:
    Epoll listener;
    Epoll::Event events[max_events];
};

}
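The loop above forwards each epoll event to handlers on the derived class through this->derived() (a CRTP downcast) and now wraps the whole dispatch in a try/catch, so a throwing handler reaches on_exception_event instead of escaping the event loop. A minimal, self-contained sketch of that dispatch shape, with made-up EventLoopBase/EchoLoop names and an int standing in for the epoll event:

#include <exception>
#include <iostream>

// CRTP sketch: the base class forwards an event to the derived class and
// funnels handler exceptions into on_exception_event instead of letting them
// escape. Names and handler signatures are placeholders, not the real API.
template <class Derived>
class EventLoopBase
{
public:
    void process(int event)
    {
        try {
            derived().on_data_event(event);
        } catch (const std::exception &e) {
            derived().on_exception_event(event, e.what());
        }
    }

protected:
    Derived &derived() { return *static_cast<Derived *>(this); }
};

class EchoLoop : public EventLoopBase<EchoLoop>
{
public:
    void on_data_event(int event) { std::cout << "data: " << event << "\n"; }

    void on_exception_event(int event, const char *what)
    {
        std::cerr << "event " << event << " failed: " << what << "\n";
    }
};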
@@ -9,7 +9,7 @@ template <class Derived>
class Server : public EventListener<Derived>
{
public:
    Server(Socket&& socket) : socket(std::forward<Socket>(socket))
    Server(Socket &&socket) : socket(std::forward<Socket>(socket))
    {
        event.data.fd = this->socket;
        event.events = EPOLLIN | EPOLLET;
@@ -17,27 +17,25 @@ public:
        this->listener.add(this->socket, &event);
    }

    void on_close_event(Epoll::Event& event)
    {
        ::close(event.data.fd);
    }
    void on_close_event(Epoll::Event &event) { ::close(event.data.fd); }

    void on_error_event(Epoll::Event& event)
    {
        ::close(event.data.fd);
    }
    void on_error_event(Epoll::Event &event) { ::close(event.data.fd); }

    void on_data_event(Epoll::Event& event)
    void on_data_event(Epoll::Event &event)
    {
        if(UNLIKELY(socket != event.data.fd))
            return;
        if (UNLIKELY(socket != event.data.fd)) return;

        this->derived().on_connect();
    }

    template <class... Args>
    void on_exception_event(Epoll::Event &event, Args &&... args)
    {
        // TODO: Do something about it
    }

protected:
    Epoll::Event event;
    Socket socket;
};

}
@@ -5,39 +5,44 @@
namespace io
{

template <class Derived, class Stream,
          size_t max_events = 64, int wait_timeout = -1>
template <class Derived, class Stream, size_t max_events = 64,
          int wait_timeout = -1>
class StreamListener : public EventListener<Derived, max_events, wait_timeout>
{
public:
    using EventListener<Derived, max_events, wait_timeout>::EventListener;

    void add(Stream& stream)
    void add(Stream &stream)
    {
        // add the stream to the event listener
        this->listener.add(stream.socket, &stream.event);
    }

    void on_close_event(Epoll::Event& event)
    void on_close_event(Epoll::Event &event)
    {
        this->derived().on_close(to_stream(event));
    }

    void on_error_event(Epoll::Event& event)
    void on_error_event(Epoll::Event &event)
    {
        this->derived().on_error(to_stream(event));
    }

    void on_data_event(Epoll::Event& event)
    void on_data_event(Epoll::Event &event)
    {
        this->derived().on_data(to_stream(event));
    }

private:
    Stream& to_stream(Epoll::Event& event)
    template <class... Args>
    void on_exception_event(Epoll::Event &event, Args &&... args)
    {
        return *reinterpret_cast<Stream*>(event.data.ptr);
        this->derived().on_exception(to_stream(event), args...);
    }

private:
    Stream &to_stream(Epoll::Event &event)
    {
        return *reinterpret_cast<Stream *>(event.data.ptr);
    }
};

}
@@ -155,6 +155,9 @@ public:
        assert(record != nullptr);
        lock_and_validate(record, t);

        // It could be done with unique_ptr but while this could mean memory
        // leak on exception, unique_ptr could mean use after free. Memory
        // leak is less dangerous.
        auto updated = new T();
        updated->data = record->data;

@@ -174,6 +177,7 @@ public:

        if (!record) return false;

        // TODO: Is this lock and validate necessary
        lock_and_validate(record, t);
        return remove(record, t), true;
    }
@@ -42,6 +42,7 @@ enum class EntitySource : uint8_t
    None,
    InternalId,
    LabelIndex,
    TypeIndex,
    MainStorage
};

@@ -26,6 +26,7 @@ using cost_t = uint64_t;
constexpr cost_t internal_id_cost = 10;
constexpr cost_t property_cost = 100;
constexpr cost_t label_cost = 1000;
constexpr cost_t type_cost = 1000;
constexpr cost_t max_cost = max<cost_t>();

template <typename T>
@@ -36,6 +37,7 @@ public:
    {
        internal_id,
        label_index,
        type_index,
        property_index,
        main_storage
    };
@@ -47,6 +49,7 @@ public:
    {
        costs[SearchPlace::internal_id] = max<T>();
        costs[SearchPlace::label_index] = max<T>();
        costs[SearchPlace::type_index] = max<T>();
        costs[SearchPlace::property_index] = max<T>();
        costs[SearchPlace::main_storage] = max<T>();
    }
@@ -80,6 +83,7 @@ using search_cost_t = SearchCost<cost_t>;

constexpr auto search_internal_id = search_cost_t::SearchPlace::internal_id;
constexpr auto search_label_index = search_cost_t::SearchPlace::label_index;
constexpr auto search_type_index = search_cost_t::SearchPlace::type_index;
constexpr auto search_property_index =
    search_cost_t::SearchPlace::property_index;
constexpr auto search_main_storage = search_cost_t::SearchPlace::main_storage;
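The hunks above add a type_cost constant and a type_index slot to the query planner's cost table; every slot starts at "infinite" and the planner later picks the cheapest place to begin the search. A rough, self-contained sketch of that selection logic follows; SearchCostSketch, set, and best are placeholder names for illustration, and the real SearchCost API may differ.

#include <array>
#include <cstddef>
#include <cstdint>
#include <limits>

// Illustrative cost table: each enum value indexes into an array of costs,
// initialised to the maximum, and the planner picks the minimum.
using cost_t = std::uint64_t;

enum class SearchPlace : std::size_t
{
    internal_id,
    label_index,
    type_index,
    property_index,
    main_storage,
    count
};

struct SearchCostSketch
{
    std::array<cost_t, static_cast<std::size_t>(SearchPlace::count)> costs;

    SearchCostSketch() { costs.fill(std::numeric_limits<cost_t>::max()); }

    void set(SearchPlace place, cost_t cost)
    {
        costs[static_cast<std::size_t>(place)] = cost;
    }

    // returns the cheapest place to start the search from
    SearchPlace best() const
    {
        std::size_t best_i = 0;
        for (std::size_t i = 1; i < costs.size(); ++i)
            if (costs[i] < costs[best_i]) best_i = i;
        return static_cast<SearchPlace>(best_i);
    }
};

With the new slot, setting type_index to type_cost (1000) while main_storage stays at the maximum makes a type-indexed scan the preferred starting point.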
@@ -32,6 +32,7 @@ auto match_query_action =

            auto name = kv.first;

            // TODO: duplicated code -> BIG PROBLEM
            // find node
            if (kv.second == ClauseAction::MatchNode) {
                if (already_matched(cypher_data, name, EntityType::Node))
@@ -69,6 +70,13 @@ auto match_query_action =
                if (place == entity_search::search_main_storage) {
                    cypher_data.source(name, EntitySource::MainStorage);
                }
                if (place == entity_search::search_type_index) {
                    if (action_data.entity_data.at(name).tags.size() > 1) {
                        throw SemanticError("Multiple type match (currently NOT supported)");
                    }
                    cypher_data.source(name, EntitySource::TypeIndex);
                    cypher_data.tags(name, action_data.entity_data.at(name).tags);
                }
            }
        }

@@ -43,12 +43,23 @@ auto return_query_action =
            {
                if (cypher_data.type(entity) == EntityType::Node) {
                    if (cypher_data.tags(entity).size() == 0)
                        throw CppGeneratorException("entity has no tags");
                        throw CppGeneratorException("node has no labels");
                    auto label = cypher_data.tags(entity).at(0);
                    code += code_line(code::fine_and_write_vertices_by_label,
                    code += code_line(code::find_and_write_vertices_by_label,
                                      entity, label);
                }
            }

            if (cypher_data.source(entity) == EntitySource::TypeIndex)
            {
                if (cypher_data.type(entity) == EntityType::Relationship) {
                    if (cypher_data.tags(entity).size() == 0)
                        throw CppGeneratorException("edge has no tag");
                    auto type = cypher_data.tags(entity).at(0);
                    code += code_line(code::find_and_write_edges_by_type,
                                      entity, type);
                }
            }
        }
        else if (element.is_projection())
        {
@@ -344,13 +344,13 @@ auto load_queries(Db &db)
        if (it_type.count() > it_vertex.count()) {
            // Going through vertices will probably be faster
            it_vertex.to().for_all([&](auto m) {
                // PRINT n,m
                // PRINT n, m
            });

        } else {
            // Going through edges will probably be faster
            it_type.to().for_all([&](auto m) {
                // PRINT n,m
                // PRINT n, m
            });
        }

@@ -361,35 +361,42 @@ auto load_queries(Db &db)
    auto match_label_type_return = [&db](const properties_t &args) {
        DbAccessor t(db);

        auto &type = t.type_find_or_create("TYPE");
        auto &label = t.label_find_or_create("LABEL");
        try {
            auto &type = t.type_find_or_create("TYPE");
            auto &label = t.label_find_or_create("LABEL");

        Option<const VertexAccessor> bt;
            Option<const VertexAccessor> bt;

        auto it_type = type.index().for_range(t).from().label(label);
            auto it_type = type.index().for_range(t).from().label(label);

        auto it_vertex = t.vertex_access()
                             .fill()
                             .label(label)
                             .clone_to(bt) // Savepoint
                             .out()
                             .type(type)
                             .replace(bt); // Load savepoint
            auto it_vertex = t.vertex_access()
                                 .fill()
                                 .label(label)
                                 .clone_to(bt) // Savepoint
                                 .out()
                                 .type(type)
                                 .replace(bt); // Load savepoint

        if (it_type.count() > it_vertex.count()) {
            // Going through vertices will probably be faster
            it_vertex.for_all([&](auto n) {
                // PRINT n
            });
            if (it_type.count() > it_vertex.count()) {
                // Going through vertices will probably be faster
                it_vertex.for_all([&](auto n) {
                    // PRINT n
                });

        } else {
            // Going through edges will probably be faster
            it_type.for_all([&](auto n) {
                // PRINT n
            });
            } else {
                // Going through edges will probably be faster
                it_type.for_all([&](auto n) {
                    // PRINT n
                });
            }
            return t.commit();
        } catch (...) {
            // Catch all exceptions
            // Print something to the logger
            t.abort();
            return false;
        }

        return t.commit();
    };

    // Blueprint:
@@ -71,7 +71,7 @@ const std::string write_all_vertices =
    " }});\n"
    " stream.write_meta(\"rw\");\n";

const std::string fine_and_write_vertices_by_label =
const std::string find_and_write_vertices_by_label =
    "auto &label = t.label_find_or_create(\"{1}\");\n"
    " stream.write_field(\"{0}\");\n"
    " label.index().for_range(t)->for_all([&](auto vertex) {{\n"
@@ -94,6 +94,17 @@ const std::string write_all_edges =
    " }});\n"
    " stream.write_meta(\"rw\");\n";

const std::string find_and_write_edges_by_type =
    "auto &type = t.type_find_or_create(\"{1}\");\n"
    " stream.write_field(\"{0}\");\n"
    " type.index().for_range(t)->for_all([&](auto edge) {{\n"
    " stream.write_record();\n"
    " stream.write_list_header(1);\n"
    " stream.write(edge);\n"
    " stream.chunk();\n"
    " }});\n"
    " stream.write_meta(\"rw\");\n";

const std::string return_true = "return true;";

const std::string todo = "// TODO: {}";
@@ -358,11 +358,14 @@ public:

    void visit(ast::RelationshipTypeList &ast_relationship_type_list) override
    {
        auto &data = generator.action_data();
        auto &action_data = generator.action_data();

        if (ast_relationship_type_list.has_value()) {
            auto type = ast_relationship_type_list.value->name;
            data.add_entity_tag(entity, type);
            action_data.add_entity_tag(entity, type);
            action_data.csm.search_cost(
                entity, entity_search::search_type_index,
                entity_search::type_cost);
        }

        Traverser::visit(ast_relationship_type_list);
@@ -1,8 +1,8 @@
#pragma once

#include <vector>
#include <cassert>
#include <memory>
#include <vector>
#include "storage/locking/lock_status.hpp"

namespace tx
@@ -17,41 +17,38 @@ class LockStore
        LockHolder() noexcept = default;

        template <class... Args>
        LockHolder(T* lock, Args&&... args) noexcept : lock(lock)
        LockHolder(T *lock, Args &&... args) noexcept : lock(lock)
        {
            assert(lock != nullptr);
            auto status = lock->lock(std::forward<Args>(args)...);

            if(status != LockStatus::Acquired)
                lock = nullptr;
            if (status != LockStatus::Acquired) lock = nullptr;
        }

        LockHolder(const LockHolder&) = delete;
        LockHolder(LockHolder&& other) noexcept : lock(other.lock)
        LockHolder(const LockHolder &) = delete;
        LockHolder(LockHolder &&other) noexcept : lock(other.lock)
        {
            other.lock = nullptr;
        }

        ~LockHolder()
        {
            if(lock != nullptr)
                lock->unlock();
            if (lock != nullptr) lock->unlock();
        }

        bool active() const { return lock != nullptr; }
        bool active() const noexcept { return lock != nullptr; }

    private:
        T* lock {nullptr};
        T *lock{nullptr};
    };

public:
    template <class... Args>
    void take(T* lock, Args&&... args)
    void take(T *lock, Args &&... args)
    {
        auto holder = LockHolder(lock, std::forward<Args>(args)...);

        if(!holder.active())
            return;
        if (!holder.active()) return;

        locks.emplace_back(LockHolder(lock, std::forward<Args>(args)...));
    }
@@ -59,5 +56,4 @@ public:
private:
    std::vector<LockHolder> locks;
};

};
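Reviewer note on the hunk above: take() appears to build one LockHolder to probe the lock and then emplace a second holder constructed from the same arguments, so the lock is acquired twice and the probe holder unlocks when it goes out of scope. If that double acquisition is not intended, a move-based variant keeps a single acquisition. The sketch below is self-contained with stand-in types (DummyLock, LockStatus) and simplified signatures; it is not the committed code.

#include <cassert>
#include <utility>
#include <vector>

// Stand-ins so the sketch compiles on its own; they mirror the shapes in the
// hunk above but are not the repository's definitions.
enum class LockStatus { Acquired, NotAcquired };

struct DummyLock
{
    LockStatus lock() { return LockStatus::Acquired; }
    void unlock() {}
};

template <class T>
class LockStoreSketch
{
    class LockHolder
    {
    public:
        LockHolder() noexcept = default;

        explicit LockHolder(T *lock) noexcept : lock(lock)
        {
            assert(lock != nullptr);
            if (lock->lock() != LockStatus::Acquired) this->lock = nullptr;
        }

        LockHolder(const LockHolder &) = delete;
        LockHolder(LockHolder &&other) noexcept : lock(other.lock)
        {
            other.lock = nullptr;
        }

        ~LockHolder()
        {
            if (lock != nullptr) lock->unlock();
        }

        bool active() const noexcept { return lock != nullptr; }

    private:
        T *lock{nullptr};
    };

public:
    // acquire once and move the already-acquired holder into the store,
    // instead of constructing (and locking) a second LockHolder
    void take(T *lock)
    {
        auto holder = LockHolder(lock);
        if (!holder.active()) return;
        locks.emplace_back(std::move(holder));
    }

private:
    std::vector<LockHolder> locks;
};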
@@ -1,3 +1,4 @@

#pragma once

#include <cstdint>

@@ -9,16 +9,26 @@
class BasicException : public std::exception
{
public:
    BasicException(const std::string& message) noexcept : message(message)
    BasicException(const std::string& message) noexcept
        : message(message),
          stacktrace_size(3)
    {
#ifndef NDEBUG
        this->message += '\n';

        Stacktrace stacktrace;

        for(auto& line : stacktrace)
        // TODO: write this better
        // (limit the size of stacktrace)
        uint64_t count = 0;

        for(auto& line : stacktrace) {
            this->message += fmt::format(" at {} ({})\n",
                                         line.function, line.location);

            if (++count >= stacktrace_size)
                break;
        }
#endif
    }

@@ -33,5 +43,6 @@ public:

private:
    std::string message;
    uint64_t stacktrace_size;
};
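In debug builds the constructor above appends at most stacktrace_size frames of the stacktrace to the stored message. A compact, self-contained sketch of that capping logic follows; Frame and format_stacktrace are stand-ins for the repository's Stacktrace type and its fmt-based formatting.

#include <cstdint>
#include <string>
#include <vector>

// Stand-in frame type purely for the sketch.
struct Frame
{
    std::string function;
    std::string location;
};

inline std::string format_stacktrace(const std::vector<Frame> &stacktrace,
                                     std::uint64_t stacktrace_size)
{
    std::string message;
    std::uint64_t count = 0;

    for (const auto &line : stacktrace) {
        // equivalent of fmt::format(" at {} ({})\n", ...)
        message += " at " + line.function + " (" + line.location + ")\n";

        // cap the number of frames, as the constructor in the diff does
        if (++count >= stacktrace_size) break;
    }
    return message;
}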
@@ -3,13 +3,28 @@
namespace bolt
{

Error::Error() : State(logging::log->logger("Error State")) {}

State* Error::run(Session& session)
{
    logger.trace("Run");

    session.decoder.read_byte();
    auto message_type = session.decoder.read_byte();

    if(message_type == MessageCode::AckFailure)
    logger.trace("Message type byte is: {:02X}", message_type);

    if (message_type == MessageCode::PullAll)
    {
        session.output_stream.write_ignored();
        session.output_stream.chunk();
        session.output_stream.send();
        return this;
    }
    else if(message_type == MessageCode::AckFailure)
    {
        // TODO reset current statement? is it even necessary?
        logger.trace("AckFailure received");

        session.output_stream.write_success_empty();
        session.output_stream.chunk();
@@ -1,5 +1,5 @@
#include "communication/bolt/v1/states/executor.hpp"
#include "communication/bolt/v1/messaging/codes.hpp"
#include "communication/bolt/v1/states/executor.hpp"

#ifdef BARRIER
#include "barrier/barrier.cpp"
@@ -8,7 +8,7 @@
namespace bolt
{

Executor::Executor() : logger(logging::log->logger("Executor")) {}
Executor::Executor() : State(logging::log->logger("Executor")) {}

State *Executor::run(Session &session)
{
@@ -25,15 +25,22 @@ State *Executor::run(Session &session)

        q.statement = session.decoder.read_string();

        this->run(session, q);
        try {
            return this->run(session, q);
        } catch (QueryEngineException &e) {
            session.output_stream.write_failure(
                {{"code", "Memgraph.QueryEngineException"},
                 {"message", e.what()}});
            session.output_stream.send();
            return session.bolt.states.error.get();
        }
    } else if (message_type == MessageCode::PullAll) {
        pull_all(session);
    } else if (message_type == MessageCode::DiscardAll) {
        discard_all(session);
    } else if (message_type == MessageCode::Reset) {
        // todo rollback current transaction
        // TODO: rollback current transaction
        // discard all records waiting to be sent

        return this;
    } else {
        logger.error("Unrecognized message received");
@@ -45,20 +52,28 @@ State *Executor::run(Session &session)
    return this;
}

void Executor::run(Session &session, Query &query)
State* Executor::run(Session &session, Query &query)
{
    logger.trace("[Run] '{}'", query.statement);

    auto &db = session.active_db();
    logger.debug("[ActiveDB] '{}'", db.name());

    try {
        auto is_successfully_executed =
            query_engine.execute(query.statement, db, session.output_stream);
    } catch (QueryEngineException &e) {

        if (!is_successfully_executed) {
            session.output_stream.write_failure(
                {{"code", "unknown"}, {"message", e.what()}});
                {{"code", "Memgraph.QueryExecutionFail"},
                 {"message", "Query execution has failed (probably there is no "
                             "element or there are some problems with concurrent "
                             "access -> client has to resolve problems with "
                             "concurrent access)"}});
            session.output_stream.send();
            return session.bolt.states.error.get();
        }

    return this;
}

void Executor::pull_all(Session &session)
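The signature change from void Executor::run(Session &, Query &) to State *Executor::run(...) lets a failed query steer the Bolt state machine into the error state instead of staying in the executor. A minimal sketch of a driver that consumes such a returned State* follows; the drive function and its max_steps guard are illustrative assumptions, not the repository's session loop.

// Minimal state-machine driver sketch: each state decides the next state
// (possibly itself, possibly an error state), which is why run() returns
// State*. Session and State here are stand-in types.
struct Session;

struct State
{
    virtual ~State() = default;
    virtual State *run(Session &session) = 0;
};

inline void drive(State *initial, Session &session, int max_steps)
{
    State *current = initial;
    // keep running whatever state the previous run() returned
    for (int i = 0; i < max_steps && current != nullptr; ++i)
        current = current->run(session);
}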
@@ -7,21 +7,36 @@
#include "database/db_transaction.hpp"
#include "threading/thread.hpp"

#include "logging/default.hpp"

Cleaning::Cleaning(ConcurrentMap<std::string, Db> &dbs) : dbms(dbs)
{
    cleaners.push_back(std::make_unique<Thread>([&]() {
        Logger logger = logging::log->logger("Cleaner");
        std::time_t last_clean = std::time(nullptr);
        while (cleaning.load(std::memory_order_acquire)) {
            std::time_t now = std::time(nullptr);

            if (now >= last_clean + cleaning_cycle) {
                logger.info("Started cleaning cycle");
                for (auto &db : dbs.access()) {
                    logger.info("Cleaning database \"{}\"", db.first);
                    DbTransaction t(db.second);
                    t.clean_edge_section();
                    t.clean_vertex_section();
                    try {
                        logger.info("Cleaning edges");
                        t.clean_edge_section();
                        logger.info("Cleaning vertices");
                        t.clean_vertex_section();
                    } catch (const std::exception &e) {
                        logger.error(
                            "Error occurred while cleaning database \"{}\"",
                            db.first);
                        logger.error("{}", e.what());
                    }
                    t.trans.commit();
                }
                last_clean = now;
                logger.info("Finished cleaning cycle");
            } else {
                std::this_thread::sleep_for(std::chrono::seconds(1));
            }
@@ -21,14 +21,20 @@ queries.append((True, "MATCH (n) WHERE ID(n)=2 RETURN n"))
queries.append((True, "MATCH (n) WHERE ID(n)=3 RETURN n"))
queries.append((True, "MATCH (n) WHERE ID(n)=4 RETURN n"))
queries.append((True, "MATCH (n) WHERE ID(n)=5 RETURN n"))
queries.append((True, "MATCH (n) RETURN n"));
queries.append((True, "MATCH (n:PERSON) RETURN n"));

queries.append((True, "MATCH (n1), (n2) WHERE ID(n1)=0 AND ID(n2)=1 CREATE (n1)-[r:IS]->(n2) RETURN r"))
queries.append((True, "MATCH (n1), (n2) WHERE ID(n1)=1 AND ID(n2)=2 CREATE (n1)-[r:IS {name: \"test\", age: 23}]->(n2) RETURN r"))
queries.append((True, "MATCH (n1), (n2) WHERE ID(n1)=2 AND ID(n2)=0 CREATE (n1)-[r:IS {name: \"test\", age: 23}]->(n2) RETURN r"))
queries.append((True, "MATCH (n1), (n2) WHERE ID(n1)=2 AND ID(n2)=0 CREATE (n1)-[r:ARE {name: \"test\", age: 23}]->(n2) RETURN r"))

queries.append((True, "MATCH ()-[r]-() WHERE ID(r)=0 RETURN r"))
queries.append((True, "MATCH ()-[r]-() WHERE ID(r)=1 RETURN r"))
queries.append((True, "MATCH ()-[r]-() WHERE ID(r)=2 RETURN r"))
queries.append((True, "MATCH ()-[r:IS]-() RETURN r"))
queries.append((True, "MATCH ()-[r:ARE]-() RETURN r"))
queries.append((True, "MATCH ()-[r]-() RETURN r"))

queries.append((True, "MATCH (n) WHERE ID(n)=1 SET n.name = \"updated_name\" RETURN n"))
queries.append((True, "MATCH (n) WHERE ID(n)=1 RETURN n"))
@@ -24,6 +24,7 @@ EdgeAccessor Edges::insert(DbTransaction &t, VertexRecord *from,

    // create new vertex record
    EdgeRecord edge_record(next, from, to);
    auto edge = edge_record.insert(t.trans);

    // insert the new vertex record into the vertex store
    auto edges_accessor = edges.access();
@@ -31,7 +32,7 @@ EdgeAccessor Edges::insert(DbTransaction &t, VertexRecord *from,

    // create new vertex
    auto inserted_edge_record = result.first;
    auto edge = inserted_edge_record->second.insert(t.trans);

    t.to_update_index<TypeGroupEdge>(&inserted_edge_record->second, edge);

    return EdgeAccessor(edge, &inserted_edge_record->second, t);
@@ -23,7 +23,7 @@ VertexAccessor Vertices::insert(DbTransaction &t)

    // create new vertex record
    VertexRecord vertex_record(next);
    // vertex_record.id(next);
    auto vertex = vertex_record.insert(t.trans);

    // insert the new vertex record into the vertex store
    auto vertices_accessor = vertices.access();
@@ -31,7 +31,6 @@ VertexAccessor Vertices::insert(DbTransaction &t)

    // create new vertex
    auto inserted_vertex_record = result.first;
    auto vertex = inserted_vertex_record->second.insert(t.trans);
    t.to_update_index<TypeGroupVertex>(&inserted_vertex_record->second, vertex);

    return VertexAccessor(vertex, &inserted_vertex_record->second, t);
@@ -90,5 +90,7 @@ int main(void)
    clean_vertex(db);
    assert(db.graph.vertices.access().size() == 0);

    // TODO: more tests

    return 0;
}