From 2eb5d3c3ffcbeab4613b940534d8291f7bcc2356 Mon Sep 17 00:00:00 2001 From: Kruno Tomola Fabro Date: Tue, 30 Aug 2016 15:12:30 +0100 Subject: [PATCH 1/3] Added basic exception handlig. Changed methods insert in Vertices and Edges to be valid even if exception occurs. --- .../communication/bolt/v1/server/worker.hpp | 12 ++++- .../concurrent/concurrent_list.hpp | 3 ++ include/import/csv_import.hpp | 42 ++++----------- include/io/network/event_listener.hpp | 48 ++++++++--------- include/io/network/server.hpp | 24 ++++----- include/io/network/stream_listener.hpp | 25 +++++---- include/mvcc/version_list.hpp | 4 ++ include/query_engine/hardcode/queries.hpp | 51 +++++++++++-------- include/transactions/lock_store.hpp | 24 ++++----- include/transactions/transaction.hpp | 1 + src/communication/bolt/v1/states/executor.cpp | 2 +- src/dbms/cleaner.cpp | 17 ++++++- src/storage/edges.cpp | 3 +- src/storage/vertices.cpp | 3 +- 14 files changed, 137 insertions(+), 122 deletions(-) diff --git a/include/communication/bolt/v1/server/worker.hpp b/include/communication/bolt/v1/server/worker.hpp index 7a6ac0183..d6b51bdde 100644 --- a/include/communication/bolt/v1/server/worker.hpp +++ b/include/communication/bolt/v1/server/worker.hpp @@ -9,8 +9,8 @@ #include "communication/bolt/v1/bolt.hpp" #include "communication/bolt/v1/session.hpp" -#include "logging/default.hpp" #include "io/network/stream_reader.hpp" +#include "logging/default.hpp" namespace bolt { @@ -76,6 +76,7 @@ public: } catch (const std::exception &e) { logger.error("Error occured while executing statement."); logger.error("{}", e.what()); + // TODO: report to client } } @@ -85,6 +86,15 @@ public: session.close(); } + template + void on_exception(Session &session, Args &&... args) + { + logger.error("Error occured in this session"); + logger.error(args...); + + // TODO: Do something about it + } + char buf[65536]; protected: diff --git a/include/data_structures/concurrent/concurrent_list.hpp b/include/data_structures/concurrent/concurrent_list.hpp index 8f16a504b..0dfdd41e2 100644 --- a/include/data_structures/concurrent/concurrent_list.hpp +++ b/include/data_structures/concurrent/concurrent_list.hpp @@ -144,6 +144,9 @@ private: // It is lock free but it isn't wait free. void push(T &&data) { + // It could be done with unique_ptr but while this could meen memory + // leak on excpetion, unique_ptr could meean use after free. Memory + // leak is less dangerous. auto node = new Node(data); Node *next = nullptr; do { diff --git a/include/import/csv_import.hpp b/include/import/csv_import.hpp index 75b26334a..00bffe32c 100644 --- a/include/import/csv_import.hpp +++ b/include/import/csv_import.hpp @@ -36,7 +36,7 @@ using namespace std; -constexpr char *_string = "string"; +constexpr char const *_string = "string"; bool equal_str(const char *a, const char *b) { return strcasecmp(a, b) == 0; } @@ -167,22 +167,6 @@ private: } } - // template - // Option> make_filler_property(bool vertex, - // const char name, Flags - // type) - // { - // if (vertex) { - // std::unique_ptr f( - // F(db.vertex_property_key(name, Type(type)))); - // return make_option(std::move(f)); - // } else { - // std::unique_ptr f( - // F(db.edge_property_key(name, Type(type)))); - // return make_option(std::move(f)); - // } - // } - template typename PropertyFamily::PropertyType::PropertyFamilyKey prop_key(const char *name, Flags type) @@ -197,6 +181,10 @@ private: { tmp_vec.clear(); split(header_part, type_mark, tmp_vec); + + const char *name = tmp_vec[0]; + const char *type = tmp_vec[1]; + if (tmp_vec.size() > 2) { err("To much sub parts in header part"); return make_option>(); @@ -205,28 +193,18 @@ private: warn( "Column ", tmp_vec[0], " doesn't have specified type so string type will be used"); - tmp_vec.push_back(_string); + name = tmp_vec[0]; + type = _string; } else { warn("Empty colum definition, skiping column."); std::unique_ptr f(new SkipFiller()); return make_option(std::move(f)); } + } else { + name = tmp_vec[0]; + type = tmp_vec[1]; } - const char *name = tmp_vec[0]; - const char *type = tmp_vec[1]; - - // cout << name << " # " << type << endl; - - // auto prop_key = [&](auto name, auto type) -> auto - // { - // if (vertex) { - // return db.vertex_property_key(name, Type(type)); - // } else { - // return db.edge_property_key(name, Type(type)); - // } - // }; - if (equal_str(type, "id")) { std::unique_ptr f( name[0] == '\0' ? new IdFiller() diff --git a/include/io/network/event_listener.hpp b/include/io/network/event_listener.hpp index 8ce0c5fe8..4794447d5 100644 --- a/include/io/network/event_listener.hpp +++ b/include/io/network/event_listener.hpp @@ -28,36 +28,37 @@ public: auto n = listener.wait(events, max_events, 200); // go through all events and process them in order - for(int i = 0; i < n; ++i) - { - auto& event = events[i]; + for (int i = 0; i < n; ++i) { + auto &event = events[i]; - // hangup event - if(UNLIKELY(event.events & EPOLLRDHUP)) - { - this->derived().on_close_event(event); - continue; + try { + // hangup event + if (UNLIKELY(event.events & EPOLLRDHUP)) { + this->derived().on_close_event(event); + continue; + } + + // there was an error on the server side + if (UNLIKELY(!(event.events & EPOLLIN) || + event.events & (EPOLLHUP | EPOLLERR))) { + this->derived().on_error_event(event); + continue; + } + + // we have some data waiting to be read + this->derived().on_data_event(event); + } catch (const std::exception &e) { + this->derived().on_exception_event( + event, "Error occured while processing event \n{}", + e.what()); } - - // there was an error on the server side - if(UNLIKELY(!(event.events & EPOLLIN) || - event.events & (EPOLLHUP | EPOLLERR))) - { - this->derived().on_error_event(event); - continue; - } - - // we have some data waiting to be read - this->derived().on_data_event(event); } // this will be optimized out :D - if(wait_timeout < 0) - return; + if (wait_timeout < 0) return; // if there was events, continue to wait on new events - if(n != 0) - return; + if (n != 0) return; // wait timeout occurred and there were no events. if wait_timeout // is -1 there will never be any timeouts so client should provide @@ -70,5 +71,4 @@ protected: Epoll listener; Epoll::Event events[max_events]; }; - } diff --git a/include/io/network/server.hpp b/include/io/network/server.hpp index c71f57470..ece730fc8 100644 --- a/include/io/network/server.hpp +++ b/include/io/network/server.hpp @@ -9,7 +9,7 @@ template class Server : public EventListener { public: - Server(Socket&& socket) : socket(std::forward(socket)) + Server(Socket &&socket) : socket(std::forward(socket)) { event.data.fd = this->socket; event.events = EPOLLIN | EPOLLET; @@ -17,27 +17,25 @@ public: this->listener.add(this->socket, &event); } - void on_close_event(Epoll::Event& event) - { - ::close(event.data.fd); - } + void on_close_event(Epoll::Event &event) { ::close(event.data.fd); } - void on_error_event(Epoll::Event& event) - { - ::close(event.data.fd); - } + void on_error_event(Epoll::Event &event) { ::close(event.data.fd); } - void on_data_event(Epoll::Event& event) + void on_data_event(Epoll::Event &event) { - if(UNLIKELY(socket != event.data.fd)) - return; + if (UNLIKELY(socket != event.data.fd)) return; this->derived().on_connect(); } + template + void on_exception_event(Epoll::Event &event, Args &&... args) + { + // TODO: Do something about it + } + protected: Epoll::Event event; Socket socket; }; - } diff --git a/include/io/network/stream_listener.hpp b/include/io/network/stream_listener.hpp index 28adfd4f4..9b0be8dee 100644 --- a/include/io/network/stream_listener.hpp +++ b/include/io/network/stream_listener.hpp @@ -5,39 +5,44 @@ namespace io { -template +template class StreamListener : public EventListener { public: using EventListener::EventListener; - void add(Stream& stream) + void add(Stream &stream) { // add the stream to the event listener this->listener.add(stream.socket, &stream.event); } - void on_close_event(Epoll::Event& event) + void on_close_event(Epoll::Event &event) { this->derived().on_close(to_stream(event)); } - void on_error_event(Epoll::Event& event) + void on_error_event(Epoll::Event &event) { this->derived().on_error(to_stream(event)); } - void on_data_event(Epoll::Event& event) + void on_data_event(Epoll::Event &event) { this->derived().on_data(to_stream(event)); } -private: - Stream& to_stream(Epoll::Event& event) + template + void on_exception_event(Epoll::Event &event, Args &&... args) { - return *reinterpret_cast(event.data.ptr); + this->derived().on_exception(to_stream(event), args...); + } + +private: + Stream &to_stream(Epoll::Event &event) + { + return *reinterpret_cast(event.data.ptr); } }; - } diff --git a/include/mvcc/version_list.hpp b/include/mvcc/version_list.hpp index 0523559b6..ede6f2825 100644 --- a/include/mvcc/version_list.hpp +++ b/include/mvcc/version_list.hpp @@ -155,6 +155,9 @@ public: assert(record != nullptr); lock_and_validate(record, t); + // It could be done with unique_ptr but while this could mean memory + // leak on exception, unique_ptr could mean use after free. Memory + // leak is less dangerous. auto updated = new T(); updated->data = record->data; @@ -174,6 +177,7 @@ public: if (!record) return false; + // TODO: Is this lock and validate necessary lock_and_validate(record, t); return remove(record, t), true; } diff --git a/include/query_engine/hardcode/queries.hpp b/include/query_engine/hardcode/queries.hpp index 6424b745a..9601fd64e 100644 --- a/include/query_engine/hardcode/queries.hpp +++ b/include/query_engine/hardcode/queries.hpp @@ -349,35 +349,42 @@ auto load_queries(Db &db) auto match_label_type_return = [&db](const properties_t &args) { DbAccessor t(db); - auto &type = t.type_find_or_create("TYPE"); - auto &label = t.label_find_or_create("LABEL"); + try { + auto &type = t.type_find_or_create("TYPE"); + auto &label = t.label_find_or_create("LABEL"); - Option bt; + Option bt; - auto it_type = type.index().for_range(t).from().label(label); + auto it_type = type.index().for_range(t).from().label(label); - auto it_vertex = t.vertex_access() - .fill() - .label(label) - .clone_to(bt) // Savepoint - .out() - .type(type) - .replace(bt); // Load savepoint + auto it_vertex = t.vertex_access() + .fill() + .label(label) + .clone_to(bt) // Savepoint + .out() + .type(type) + .replace(bt); // Load savepoint - if (it_type.count() > it_vertex.count()) { - // Going through vertices wiil probably be faster - it_vertex.for_all([&](auto n) { - // PRINT n - }); + if (it_type.count() > it_vertex.count()) { + // Going through vertices wiil probably be faster + it_vertex.for_all([&](auto n) { + // PRINT n + }); - } else { - // Going through edges wiil probably be faster - it_type.for_all([&](auto n) { - // PRINT n - }); + } else { + // Going through edges wiil probably be faster + it_type.for_all([&](auto n) { + // PRINT n + }); + } + return t.commit(); + } catch (...) { + // Catch all exceptions + // Print something to logger + t.abort(); + return false; } - return t.commit(); }; // Blueprint: diff --git a/include/transactions/lock_store.hpp b/include/transactions/lock_store.hpp index 6afe19752..eba0a8313 100644 --- a/include/transactions/lock_store.hpp +++ b/include/transactions/lock_store.hpp @@ -1,8 +1,8 @@ #pragma once -#include #include #include +#include #include "storage/locking/lock_status.hpp" namespace tx @@ -17,41 +17,38 @@ class LockStore LockHolder() noexcept = default; template - LockHolder(T* lock, Args&&... args) noexcept : lock(lock) + LockHolder(T *lock, Args &&... args) noexcept : lock(lock) { assert(lock != nullptr); auto status = lock->lock(std::forward(args)...); - if(status != LockStatus::Acquired) - lock = nullptr; + if (status != LockStatus::Acquired) lock = nullptr; } - LockHolder(const LockHolder&) = delete; - LockHolder(LockHolder&& other) noexcept : lock(other.lock) + LockHolder(const LockHolder &) = delete; + LockHolder(LockHolder &&other) noexcept : lock(other.lock) { other.lock = nullptr; } ~LockHolder() { - if(lock != nullptr) - lock->unlock(); + if (lock != nullptr) lock->unlock(); } - bool active() const { return lock != nullptr; } + bool active() const noexcept { return lock != nullptr; } private: - T* lock {nullptr}; + T *lock{nullptr}; }; public: template - void take(T* lock, Args&&... args) + void take(T *lock, Args &&... args) { auto holder = LockHolder(lock, std::forward(args)...); - if(!holder.active()) - return; + if (!holder.active()) return; locks.emplace_back(LockHolder(lock, std::forward(args)...)); } @@ -59,5 +56,4 @@ public: private: std::vector locks; }; - }; diff --git a/include/transactions/transaction.hpp b/include/transactions/transaction.hpp index 864a71f9b..64306d6cd 100644 --- a/include/transactions/transaction.hpp +++ b/include/transactions/transaction.hpp @@ -1,3 +1,4 @@ + #pragma once #include diff --git a/src/communication/bolt/v1/states/executor.cpp b/src/communication/bolt/v1/states/executor.cpp index b8e7e9777..480a23f4e 100644 --- a/src/communication/bolt/v1/states/executor.cpp +++ b/src/communication/bolt/v1/states/executor.cpp @@ -1,5 +1,5 @@ -#include "communication/bolt/v1/states/executor.hpp" #include "communication/bolt/v1/messaging/codes.hpp" +#include "communication/bolt/v1/states/executor.hpp" #ifdef BARRIER #include "barrier/barrier.cpp" diff --git a/src/dbms/cleaner.cpp b/src/dbms/cleaner.cpp index 7ab29bcac..b308827dc 100644 --- a/src/dbms/cleaner.cpp +++ b/src/dbms/cleaner.cpp @@ -7,9 +7,12 @@ #include "database/db_transaction.hpp" #include "threading/thread.hpp" +#include "logging/default.hpp" + Cleaning::Cleaning(ConcurrentMap &dbs) : dbms(dbs) { cleaners.push_back(std::make_unique([&]() { + Logger logger = logging::log->logger("Cleaner"); std::time_t last_clean = std::time(nullptr); while (cleaning.load(std::memory_order_acquire)) { std::time_t now = std::time(nullptr); @@ -17,8 +20,18 @@ Cleaning::Cleaning(ConcurrentMap &dbs) : dbms(dbs) if (now >= last_clean + cleaning_cycle) { for (auto &db : dbs.access()) { DbTransaction t(db.second); - t.clean_edge_section(); - t.clean_vertex_section(); + + try { + t.clean_edge_section(); + t.clean_vertex_section(); + } catch (const std::exception &e) { + logger.error( + "Error occured while cleaning database \"{}\"", + db.first); + logger.error("{}", e.what()); + } + + t.trans.commit(); } last_clean = now; } else { diff --git a/src/storage/edges.cpp b/src/storage/edges.cpp index 6bd383d04..58f3d8376 100644 --- a/src/storage/edges.cpp +++ b/src/storage/edges.cpp @@ -24,6 +24,7 @@ EdgeAccessor Edges::insert(DbTransaction &t, VertexRecord *from, // create new vertex record EdgeRecord edge_record(next, from, to); + auto edge = edge_record.insert(t.trans); // insert the new vertex record into the vertex store auto edges_accessor = edges.access(); @@ -31,7 +32,7 @@ EdgeAccessor Edges::insert(DbTransaction &t, VertexRecord *from, // create new vertex auto inserted_edge_record = result.first; - auto edge = inserted_edge_record->second.insert(t.trans); + t.to_update_index(&inserted_edge_record->second, edge); return EdgeAccessor(edge, &inserted_edge_record->second, t); diff --git a/src/storage/vertices.cpp b/src/storage/vertices.cpp index aae5aa1c9..18ab6de2d 100644 --- a/src/storage/vertices.cpp +++ b/src/storage/vertices.cpp @@ -23,7 +23,7 @@ VertexAccessor Vertices::insert(DbTransaction &t) // create new vertex record VertexRecord vertex_record(next); - // vertex_record.id(next); + auto vertex = vertex_record.insert(t.trans); // insert the new vertex record into the vertex store auto vertices_accessor = vertices.access(); @@ -31,7 +31,6 @@ VertexAccessor Vertices::insert(DbTransaction &t) // create new vertex auto inserted_vertex_record = result.first; - auto vertex = inserted_vertex_record->second.insert(t.trans); t.to_update_index(&inserted_vertex_record->second, vertex); return VertexAccessor(vertex, &inserted_vertex_record->second, t); From 7f4206e25c98d6e3c0682b3093e5040b920e9490 Mon Sep 17 00:00:00 2001 From: Kruno Tomola Fabro Date: Tue, 30 Aug 2016 15:25:11 +0100 Subject: [PATCH 2/3] Added more logging to cleaner. --- src/dbms/cleaner.cpp | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/dbms/cleaner.cpp b/src/dbms/cleaner.cpp index b308827dc..afb00eaa9 100644 --- a/src/dbms/cleaner.cpp +++ b/src/dbms/cleaner.cpp @@ -18,11 +18,15 @@ Cleaning::Cleaning(ConcurrentMap &dbs) : dbms(dbs) std::time_t now = std::time(nullptr); if (now >= last_clean + cleaning_cycle) { + logger.info("Started cleaning cyle"); for (auto &db : dbs.access()) { + logger.info("Cleaning database \"{}\"", db.first); DbTransaction t(db.second); try { + logger.info("Cleaning edges"); t.clean_edge_section(); + logger.info("Cleaning vertices"); t.clean_vertex_section(); } catch (const std::exception &e) { logger.error( @@ -34,6 +38,7 @@ Cleaning::Cleaning(ConcurrentMap &dbs) : dbms(dbs) t.trans.commit(); } last_clean = now; + logger.info("Finished cleaning cyle"); } else { std::this_thread::sleep_for(std::chrono::seconds(1)); } From 781376a9601b9eec23d346b27fb7bf575bef0b81 Mon Sep 17 00:00:00 2001 From: Marko Budiselic Date: Tue, 30 Aug 2016 20:52:46 +0100 Subject: [PATCH 3/3] First version which will be sent to the pilot users --- .../communication/bolt/v1/states/error.hpp | 2 ++ .../communication/bolt/v1/states/executor.hpp | 4 +-- .../communication/bolt/v1/states/state.hpp | 7 +++++ .../bolt/v1/transport/bolt_encoder.hpp | 2 +- .../code_generator/cypher_state.hpp | 1 + .../code_generator/entity_search.hpp | 4 +++ .../code_generator/handlers/match.hpp | 8 +++++ .../code_generator/handlers/return.hpp | 15 +++++++-- include/query_engine/hardcode/queries.hpp | 4 +-- include/query_engine/traverser/code.hpp | 13 +++++++- .../query_engine/traverser/cpp_traverser.hpp | 7 +++-- include/utils/exceptions/basic_exception.hpp | 15 +++++++-- src/communication/bolt/v1/states/error.cpp | 17 +++++++++- src/communication/bolt/v1/states/executor.cpp | 31 ++++++++++++++----- src/examples/bolt_py_client/initial_test.py | 6 ++++ 15 files changed, 114 insertions(+), 22 deletions(-) diff --git a/include/communication/bolt/v1/states/error.hpp b/include/communication/bolt/v1/states/error.hpp index 0748fc5e5..6c9b37d6c 100644 --- a/include/communication/bolt/v1/states/error.hpp +++ b/include/communication/bolt/v1/states/error.hpp @@ -9,6 +9,8 @@ namespace bolt class Error : public State { public: + Error(); + State *run(Session &session) override; }; diff --git a/include/communication/bolt/v1/states/executor.hpp b/include/communication/bolt/v1/states/executor.hpp index 6183dd457..e614d57d2 100644 --- a/include/communication/bolt/v1/states/executor.hpp +++ b/include/communication/bolt/v1/states/executor.hpp @@ -20,12 +20,10 @@ public: State* run(Session& session) override final; protected: - Logger logger; - /* Execute an incoming query * */ - void run(Session& session, Query& query); + State* run(Session& session, Query& query); /* Send all remaining results to the client * diff --git a/include/communication/bolt/v1/states/state.hpp b/include/communication/bolt/v1/states/state.hpp index 50a38494c..2915cc29d 100644 --- a/include/communication/bolt/v1/states/state.hpp +++ b/include/communication/bolt/v1/states/state.hpp @@ -4,6 +4,8 @@ #include #include +#include "logging/default.hpp" + namespace bolt { @@ -15,9 +17,14 @@ public: using uptr = std::unique_ptr; State() = default; + State(Logger logger) : logger(logger) {} + virtual ~State() = default; virtual State* run(Session& session) = 0; + +protected: + Logger logger; }; } diff --git a/include/communication/bolt/v1/transport/bolt_encoder.hpp b/include/communication/bolt/v1/transport/bolt_encoder.hpp index dba6ab9be..8b35dec3e 100644 --- a/include/communication/bolt/v1/transport/bolt_encoder.hpp +++ b/include/communication/bolt/v1/transport/bolt_encoder.hpp @@ -249,7 +249,7 @@ public: void message_ignored() { - write_struct_header(1); + write_struct_header(0); write(underlying_cast(MessageCode::Ignored)); } diff --git a/include/query_engine/code_generator/cypher_state.hpp b/include/query_engine/code_generator/cypher_state.hpp index ea4a2ebab..91b701b0b 100644 --- a/include/query_engine/code_generator/cypher_state.hpp +++ b/include/query_engine/code_generator/cypher_state.hpp @@ -42,6 +42,7 @@ enum class EntitySource : uint8_t None, InternalId, LabelIndex, + TypeIndex, MainStorage }; diff --git a/include/query_engine/code_generator/entity_search.hpp b/include/query_engine/code_generator/entity_search.hpp index 3db500d48..30f06ea62 100644 --- a/include/query_engine/code_generator/entity_search.hpp +++ b/include/query_engine/code_generator/entity_search.hpp @@ -26,6 +26,7 @@ using cost_t = uint64_t; constexpr cost_t internal_id_cost = 10; constexpr cost_t property_cost = 100; constexpr cost_t label_cost = 1000; +constexpr cost_t type_cost = 1000; constexpr cost_t max_cost = max(); template @@ -36,6 +37,7 @@ public: { internal_id, label_index, + type_index, property_index, main_storage }; @@ -47,6 +49,7 @@ public: { costs[SearchPlace::internal_id] = max(); costs[SearchPlace::label_index] = max(); + costs[SearchPlace::type_index] = max(); costs[SearchPlace::property_index] = max(); costs[SearchPlace::main_storage] = max(); } @@ -80,6 +83,7 @@ using search_cost_t = SearchCost; constexpr auto search_internal_id = search_cost_t::SearchPlace::internal_id; constexpr auto search_label_index = search_cost_t::SearchPlace::label_index; +constexpr auto search_type_index = search_cost_t::SearchPlace::type_index; constexpr auto search_property_index = search_cost_t::SearchPlace::property_index; constexpr auto search_main_storage = search_cost_t::SearchPlace::main_storage; diff --git a/include/query_engine/code_generator/handlers/match.hpp b/include/query_engine/code_generator/handlers/match.hpp index c6e928e4b..eeb1ce4bc 100644 --- a/include/query_engine/code_generator/handlers/match.hpp +++ b/include/query_engine/code_generator/handlers/match.hpp @@ -32,6 +32,7 @@ auto match_query_action = auto name = kv.first; + // TODO: duplicated code -> BIG PROBLEM // find node if (kv.second == ClauseAction::MatchNode) { if (already_matched(cypher_data, name, EntityType::Node)) @@ -69,6 +70,13 @@ auto match_query_action = if (place == entity_search::search_main_storage) { cypher_data.source(name, EntitySource::MainStorage); } + if (place == entity_search::search_type_index) { + if (action_data.entity_data.at(name).tags.size() > 1) { + throw SemanticError("Multiple type match (currently NOT supported)"); + } + cypher_data.source(name, EntitySource::TypeIndex); + cypher_data.tags(name, action_data.entity_data.at(name).tags); + } } } diff --git a/include/query_engine/code_generator/handlers/return.hpp b/include/query_engine/code_generator/handlers/return.hpp index e4d631fd4..6a77c9615 100644 --- a/include/query_engine/code_generator/handlers/return.hpp +++ b/include/query_engine/code_generator/handlers/return.hpp @@ -43,12 +43,23 @@ auto return_query_action = { if (cypher_data.type(entity) == EntityType::Node) { if (cypher_data.tags(entity).size() == 0) - throw CppGeneratorException("entity has no tags"); + throw CppGeneratorException("node has no labels"); auto label = cypher_data.tags(entity).at(0); - code += code_line(code::fine_and_write_vertices_by_label, + code += code_line(code::find_and_write_vertices_by_label, entity, label); } } + + if (cypher_data.source(entity) == EntitySource::TypeIndex) + { + if (cypher_data.type(entity) == EntityType::Relationship) { + if (cypher_data.tags(entity).size() == 0) + throw CppGeneratorException("edge has no tag"); + auto type = cypher_data.tags(entity).at(0); + code += code_line(code::find_and_write_edges_by_type, + entity, type); + } + } } else if (element.is_projection()) { diff --git a/include/query_engine/hardcode/queries.hpp b/include/query_engine/hardcode/queries.hpp index 6424b745a..2ef4ab2c7 100644 --- a/include/query_engine/hardcode/queries.hpp +++ b/include/query_engine/hardcode/queries.hpp @@ -332,13 +332,13 @@ auto load_queries(Db &db) if (it_type.count() > it_vertex.count()) { // Going through vertices wiil probably be faster it_vertex.to().for_all([&](auto m) { - // PRINT n,m + // PRINT n, m }); } else { // Going through edges wiil probably be faster it_type.to().for_all([&](auto m) { - // PRINT n,m + // PRINT n, m }); } diff --git a/include/query_engine/traverser/code.hpp b/include/query_engine/traverser/code.hpp index 5bdd58834..23b8d6b5d 100644 --- a/include/query_engine/traverser/code.hpp +++ b/include/query_engine/traverser/code.hpp @@ -71,7 +71,7 @@ const std::string write_all_vertices = " }});\n" " stream.write_meta(\"rw\");\n"; -const std::string fine_and_write_vertices_by_label = +const std::string find_and_write_vertices_by_label = "auto &label = t.label_find_or_create(\"{1}\");\n" " stream.write_field(\"{0}\");\n" " label.index().for_range(t)->for_all([&](auto vertex) {{\n" @@ -94,6 +94,17 @@ const std::string write_all_edges = " }});\n" " stream.write_meta(\"rw\");\n"; +const std::string find_and_write_edges_by_type = + "auto &type = t.type_find_or_create(\"{1}\");\n" + " stream.write_field(\"{0}\");\n" + " type.index().for_range(t)->for_all([&](auto edge) {{\n" + " stream.write_record();\n" + " stream.write_list_header(1);\n" + " stream.write(edge);\n" + " stream.chunk();\n" + " }});\n" + " stream.write_meta(\"rw\");\n"; + const std::string return_true = "return true;"; const std::string todo = "// TODO: {}"; diff --git a/include/query_engine/traverser/cpp_traverser.hpp b/include/query_engine/traverser/cpp_traverser.hpp index 466500943..15e2bbfef 100644 --- a/include/query_engine/traverser/cpp_traverser.hpp +++ b/include/query_engine/traverser/cpp_traverser.hpp @@ -358,11 +358,14 @@ public: void visit(ast::RelationshipTypeList &ast_relationship_type_list) override { - auto &data = generator.action_data(); + auto &action_data = generator.action_data(); if (ast_relationship_type_list.has_value()) { auto type = ast_relationship_type_list.value->name; - data.add_entity_tag(entity, type); + action_data.add_entity_tag(entity, type); + action_data.csm.search_cost( + entity, entity_search::search_type_index, + entity_search::type_cost); } Traverser::visit(ast_relationship_type_list); diff --git a/include/utils/exceptions/basic_exception.hpp b/include/utils/exceptions/basic_exception.hpp index 79ea3d5f9..f358b17ee 100644 --- a/include/utils/exceptions/basic_exception.hpp +++ b/include/utils/exceptions/basic_exception.hpp @@ -9,16 +9,26 @@ class BasicException : public std::exception { public: - BasicException(const std::string& message) noexcept : message(message) + BasicException(const std::string& message) noexcept + : message(message), + stacktrace_size(3) { #ifndef NDEBUG this->message += '\n'; Stacktrace stacktrace; - for(auto& line : stacktrace) + // TODO: write this better + // (limit the size of stacktrace) + uint64_t count = 0; + + for(auto& line : stacktrace) { this->message += fmt::format(" at {} ({})\n", line.function, line.location); + + if (++count >= stacktrace_size) + break; + } #endif } @@ -33,5 +43,6 @@ public: private: std::string message; + uint64_t stacktrace_size; }; diff --git a/src/communication/bolt/v1/states/error.cpp b/src/communication/bolt/v1/states/error.cpp index cfcfc9391..27f6bb339 100644 --- a/src/communication/bolt/v1/states/error.cpp +++ b/src/communication/bolt/v1/states/error.cpp @@ -3,13 +3,28 @@ namespace bolt { +Error::Error() : State(logging::log->logger("Error State")) {} + State* Error::run(Session& session) { + logger.trace("Run"); + + session.decoder.read_byte(); auto message_type = session.decoder.read_byte(); - if(message_type == MessageCode::AckFailure) + logger.trace("Message type byte is: {:02X}", message_type); + + if (message_type == MessageCode::PullAll) + { + session.output_stream.write_ignored(); + session.output_stream.chunk(); + session.output_stream.send(); + return this; + } + else if(message_type == MessageCode::AckFailure) { // TODO reset current statement? is it even necessary? + logger.trace("AckFailure received"); session.output_stream.write_success_empty(); session.output_stream.chunk(); diff --git a/src/communication/bolt/v1/states/executor.cpp b/src/communication/bolt/v1/states/executor.cpp index b8e7e9777..fcf25cc30 100644 --- a/src/communication/bolt/v1/states/executor.cpp +++ b/src/communication/bolt/v1/states/executor.cpp @@ -8,7 +8,7 @@ namespace bolt { -Executor::Executor() : logger(logging::log->logger("Executor")) {} +Executor::Executor() : State(logging::log->logger("Executor")) {} State *Executor::run(Session &session) { @@ -25,15 +25,22 @@ State *Executor::run(Session &session) q.statement = session.decoder.read_string(); - this->run(session, q); + try { + return this->run(session, q); + } catch (QueryEngineException &e) { + session.output_stream.write_failure( + {{"code", "Memgraph.QueryEngineException"}, + {"message", e.what()}}); + session.output_stream.send(); + return session.bolt.states.error.get(); + } } else if (message_type == MessageCode::PullAll) { pull_all(session); } else if (message_type == MessageCode::DiscardAll) { discard_all(session); } else if (message_type == MessageCode::Reset) { - // todo rollback current transaction + // TODO: rollback current transaction // discard all records waiting to be sent - return this; } else { logger.error("Unrecognized message recieved"); @@ -45,20 +52,28 @@ State *Executor::run(Session &session) return this; } -void Executor::run(Session &session, Query &query) +State* Executor::run(Session &session, Query &query) { logger.trace("[Run] '{}'", query.statement); auto &db = session.active_db(); logger.debug("[ActiveDB] '{}'", db.name()); - try { + auto is_successfully_executed = query_engine.execute(query.statement, db, session.output_stream); - } catch (QueryEngineException &e) { + + if (!is_successfully_executed) { session.output_stream.write_failure( - {{"code", "unknown"}, {"message", e.what()}}); + {{"code", "Memgraph.QueryExecutionFail"}, + {"message", "Query execution has failed (probably there is no " + "element or there are some problems with concurrent " + "access -> client has to resolve problems with " + "concurrent access)"}}); session.output_stream.send(); + return session.bolt.states.error.get(); } + + return this; } void Executor::pull_all(Session &session) diff --git a/src/examples/bolt_py_client/initial_test.py b/src/examples/bolt_py_client/initial_test.py index 358807648..f542944d9 100644 --- a/src/examples/bolt_py_client/initial_test.py +++ b/src/examples/bolt_py_client/initial_test.py @@ -21,14 +21,20 @@ queries.append((True, "MATCH (n) WHERE ID(n)=2 RETURN n")) queries.append((True, "MATCH (n) WHERE ID(n)=3 RETURN n")) queries.append((True, "MATCH (n) WHERE ID(n)=4 RETURN n")) queries.append((True, "MATCH (n) WHERE ID(n)=5 RETURN n")) +queries.append((True, "MATCH (n) RETURN n")); +queries.append((True, "MATCH (n:PERSON) RETURN n")); queries.append((True, "MATCH (n1), (n2) WHERE ID(n1)=0 AND ID(n2)=1 CREATE (n1)-[r:IS]->(n2) RETURN r")) queries.append((True, "MATCH (n1), (n2) WHERE ID(n1)=1 AND ID(n2)=2 CREATE (n1)-[r:IS {name: \"test\", age: 23}]->(n2) RETURN r")) queries.append((True, "MATCH (n1), (n2) WHERE ID(n1)=2 AND ID(n2)=0 CREATE (n1)-[r:IS {name: \"test\", age: 23}]->(n2) RETURN r")) +queries.append((True, "MATCH (n1), (n2) WHERE ID(n1)=2 AND ID(n2)=0 CREATE (n1)-[r:ARE {name: \"test\", age: 23}]->(n2) RETURN r")) queries.append((True, "MATCH ()-[r]-() WHERE ID(r)=0 RETURN r")) queries.append((True, "MATCH ()-[r]-() WHERE ID(r)=1 RETURN r")) queries.append((True, "MATCH ()-[r]-() WHERE ID(r)=2 RETURN r")) +queries.append((True, "MATCH ()-[r:IS]-() RETURN r")) +queries.append((True, "MATCH ()-[r:ARE]-() RETURN r")) +queries.append((True, "MATCH ()-[r]-() RETURN r")) queries.append((True, "MATCH (n) WHERE ID(n)=1 SET n.name = \"updated_name\" RETURN n")) queries.append((True, "MATCH (n) WHERE ID(n)=1 RETURN n"))