From be820ce915c20fd7afebae8c8b38f4d933bad0dd Mon Sep 17 00:00:00 2001 From: Marko Budiselic <mbudiselicbuda@gmail.com> Date: Tue, 30 Aug 2016 01:01:03 +0100 Subject: [PATCH 1/3] tmp commit, work in progress, relationship create --- CMakeLists.txt | 11 ++++++ include/query_engine/code_generator.hpp | 2 +- .../code_generator/cypher_state.hpp | 30 +++++++++++++--- .../code_generator/handlers/match.hpp | 15 ++++---- .../code_generator/handlers/return.hpp | 35 +++++++++++++++---- include/query_engine/traverser/code.hpp | 17 +++++++-- .../query_engine/traverser/cpp_traverser.hpp | 8 +++-- include/storage/vertices.hpp | 3 ++ .../bolt/v1/serialization/bolt_serializer.cpp | 6 ++++ src/communication/bolt/v1/states/executor.cpp | 3 +- 10 files changed, 107 insertions(+), 23 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 01bfbe64f..d3376aa6d 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -307,6 +307,7 @@ endif() set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} -Wall") +# TODO: find a way to apply the defines at the query compile time # -- configure defines -- default is ON | true | enabled ---------------------- # -- logging ------------------------------------------------------------------ option(LOG_NO_TRACE "Disable trace logging" OFF) @@ -376,6 +377,16 @@ message(STATUS "TOOLS binaries: ${TOOLS}") option(TESTS "Build test binaries" ON) message(STATUS "TESTS binaries: ${TESTS}") # -- binaries ----------------------------------------------------------------- +# -- barrier - this is how the engine is isolated so it can be shipped +# wherever, the code is completely hidden behind the barrier, during the +# development the barrier can be turned off because it is much easier to +# debug +option(BARRIER "Barrier" ON) +message(STATUS "Barrier: ${BARRIER} (Source code isolation)") +if(BARRIER) + add_definitions( -DBARRIER ) +endif() +# -- barrier ------------------------------------------------------------------ # -- configure defines 
-------------------------------------------------------- # -- includes ----------------------------------------------------------------- diff --git a/include/query_engine/code_generator.hpp b/include/query_engine/code_generator.hpp index 8230b544e..eb602484d 100644 --- a/include/query_engine/code_generator.hpp +++ b/include/query_engine/code_generator.hpp @@ -58,7 +58,7 @@ public: {"stream", type_name<Stream>().to_string()}, {"code", cpp_traverser.code}}); - // logger.trace("generated code: {}", generated); + logger.debug("generated code: {}", generated); utils::write_file(generated, path); } diff --git a/include/query_engine/code_generator/cypher_state.hpp b/include/query_engine/code_generator/cypher_state.hpp index 10bbf4758..0e213b2bf 100644 --- a/include/query_engine/code_generator/cypher_state.hpp +++ b/include/query_engine/code_generator/cypher_state.hpp @@ -22,23 +22,33 @@ enum class CypherState : uint8_t enum class EntityStatus : uint8_t { - NotFound, + None, Matched, Created }; enum class EntityType : uint8_t { - NotFound, + None, Node, Relationship }; +// where OR how entity can be found +enum class EntitySource : uint8_t +{ + None, + InternalId, + LabelIndex, + MainStorage +}; + class CypherStateData { private: std::map<std::string, EntityStatus> entity_status; std::map<std::string, EntityType> entity_type; + std::map<std::string, EntitySource> entity_source; // TODO: container that keeps track about c++ variable names @@ -51,7 +61,7 @@ public: EntityStatus status(const std::string &name) { if (entity_status.find(name) == entity_status.end()) - return EntityStatus::NotFound; + return EntityStatus::None; return entity_status.at(name); } @@ -59,11 +69,18 @@ public: EntityType type(const std::string &name) const { if (entity_type.find(name) == entity_type.end()) - return EntityType::NotFound; + return EntityType::None; return entity_type.at(name); } + EntitySource source(const std::string &name) const + { + if (entity_source.find(name) == 
entity_source.end()) + return EntitySource::None; + return entity_source.at(name); + } + const std::map<std::string, EntityType> &all_typed_enteties() { return entity_type; @@ -92,4 +109,9 @@ public: entity_type[name] = EntityType::Relationship; entity_status[name] = EntityStatus::Created; } + + void source(const std::string& name, EntitySource source) + { + entity_source[name] = source; + } }; diff --git a/include/query_engine/code_generator/handlers/match.hpp b/include/query_engine/code_generator/handlers/match.hpp index 4d99220b6..0350d83ba 100644 --- a/include/query_engine/code_generator/handlers/match.hpp +++ b/include/query_engine/code_generator/handlers/match.hpp @@ -30,26 +30,29 @@ auto match_query_action = for (auto const &kv : action_data.actions) { - // TODO: the same code REFACTOR! + auto name = kv.first; + // find node if (kv.second == ClauseAction::MatchNode) { - auto name = kv.first; - if (already_matched(cypher_data, name, EntityType::Node)) continue; + if (already_matched(cypher_data, name, EntityType::Node)) + continue; cypher_data.node_matched(name); - auto place = action_data.csm.min(kv.first); + auto place = action_data.csm.min(name); if (place == entity_search::search_internal_id) { auto index = fetch_internal_index(action_data, name); code += code_line(code::match_vertex_by_id, name, index); } + if (place == entity_search::search_main_storage) { + cypher_data.source(name, EntitySource::MainStorage); + } } // find relationship if (kv.second == ClauseAction::MatchRelationship) { - auto name = kv.first; if (already_matched(cypher_data, name, EntityType::Relationship)) continue; cypher_data.relationship_matched(name); - auto place = action_data.csm.min(kv.first); + auto place = action_data.csm.min(name); if (place == entity_search::search_internal_id) { auto index = fetch_internal_index(action_data, name); code += code_line(code::match_edge_by_id, name, index); diff --git a/include/query_engine/code_generator/handlers/return.hpp 
b/include/query_engine/code_generator/handlers/return.hpp index 5a21400b3..864f84055 100644 --- a/include/query_engine/code_generator/handlers/return.hpp +++ b/include/query_engine/code_generator/handlers/return.hpp @@ -11,16 +11,37 @@ auto return_query_action = const auto &elements = action_data.return_elements; code += code_line("// number of elements {}", elements.size()); - // TODO: call bolt serialization - for (const auto &element : elements) { + for (const auto &element : elements) + { auto &entity = element.entity; - if (!cypher_data.exist(entity)) { + + if (!cypher_data.exist(entity)) throw SemanticError( fmt::format("{} couldn't be found (RETURN clause).", entity)); - } - if (element.is_entity_only()) { - code += code_line(code::write_entity, entity); - } else if (element.is_projection()) { + + if (element.is_entity_only()) + { + // if the node has just recently been created or can be found + // with the internal id then it can be sent to the client + if (cypher_data.status(entity) == EntityStatus::Created || + (cypher_data.source(entity) == EntitySource::InternalId && + cypher_data.status(entity) == EntityStatus::Matched)) + { + code += code_line(code::write_entity, entity); + } + // the client has to receive all elements from the main storage + if (cypher_data.source(entity) == EntitySource::MainStorage) + { + code += code_line(code::write_all_vertices, entity); + } + if (cypher_data.source(entity) == EntitySource::LabelIndex) + { + // TODO: fetch somehow label name + // TODO: code_line + } + } + else if (element.is_projection()) + { code += code_line("// TODO: implement projection"); // auto &property = element.property; // code += code_line(code::print_property, entity, property); diff --git a/include/query_engine/traverser/code.hpp b/include/query_engine/traverser/code.hpp index 8b30f15a6..3ac772529 100644 --- a/include/query_engine/traverser/code.hpp +++ b/include/query_engine/traverser/code.hpp @@ -6,14 +6,15 @@ struct Code { - void reset() { code 
= ""; } - std::string code; + + void reset() { code = ""; } }; namespace code { +// TODO: one more abstraction level // TODO: UNIT tests const std::string transaction_begin = "DbAccessor t(db);"; @@ -58,6 +59,18 @@ const std::string write_entity = "stream.write_field(\"{0}\");\n" " stream.chunk();" " stream.write_meta(\"rw\");\n"; +const std::string write_all_vertices = + "stream.write_field(\"{0}\");\n" + " iter::for_all(t.vertex_access(), [&](auto vertex) {{\n" + " if (vertex.fill()) {{\n" + " stream.write_record();\n" + " stream.write_list_header(1);\n" + " stream.write(vertex);\n" + " stream.chunk();\n" + " }}\n" + " }});\n" + " stream.write_meta(\"rw\");\n"; + const std::string return_true = "return true;"; const std::string todo = "// TODO: {}"; diff --git a/include/query_engine/traverser/cpp_traverser.hpp b/include/query_engine/traverser/cpp_traverser.hpp index 67e27e076..28ec02937 100644 --- a/include/query_engine/traverser/cpp_traverser.hpp +++ b/include/query_engine/traverser/cpp_traverser.hpp @@ -255,7 +255,11 @@ public: Traverser::visit(ast_node); - if (state == CypherState::Create) { + // this is here because of RETURN clause + // CREATE (n {...}) RETURN n + if (cypher_data.status(name) != EntityStatus::Matched && + state == CypherState::Create) + { cypher_data.node_created(name); } } @@ -447,7 +451,7 @@ public: auto &cypher_data = generator.cypher_data(); auto entity_type = cypher_data.type(entity); - if (entity_type == EntityType::NotFound) + if (entity_type == EntityType::None) throw SemanticError("Entity (" + entity + ") doesn't exist"); auto &action_data = generator.action_data(); diff --git a/include/storage/vertices.hpp b/include/storage/vertices.hpp index bb9336bc1..f2e063d75 100644 --- a/include/storage/vertices.hpp +++ b/include/storage/vertices.hpp @@ -30,6 +30,9 @@ public: // Creates new Vertex and returns filled VertexAccessor. 
VertexAccessor insert(DbTransaction &t); + // TODO: how can I know how many elements exist + // without iterating through all of them? MVCC? + VertexPropertyFamily & property_family_find_or_create(const std::string &name); diff --git a/src/communication/bolt/v1/serialization/bolt_serializer.cpp b/src/communication/bolt/v1/serialization/bolt_serializer.cpp index 4ff4c7f56..1977bb5cd 100644 --- a/src/communication/bolt/v1/serialization/bolt_serializer.cpp +++ b/src/communication/bolt/v1/serialization/bolt_serializer.cpp @@ -1,5 +1,8 @@ #include "communication/bolt/v1/serialization/bolt_serializer.hpp" +#include "communication/bolt/v1/transport/chunked_buffer.hpp" +#include "communication/bolt/v1/transport/chunked_encoder.hpp" +#include "communication/bolt/v1/transport/socket_stream.hpp" #include "storage/edge_x_vertex.hpp" template <class Stream> @@ -28,3 +31,6 @@ void bolt::BoltSerializer<Stream>::write(const EdgeAccessor &edge) write(*prop.second); } } + +// template class bolt::BoltSerializer<bolt::BoltEncoder< +// bolt::ChunkedEncoder<bolt::ChunkedBuffer<bolt::SocketStream<io::Socket>>>>>; diff --git a/src/communication/bolt/v1/states/executor.cpp b/src/communication/bolt/v1/states/executor.cpp index 6d280f37e..b893c4a0d 100644 --- a/src/communication/bolt/v1/states/executor.cpp +++ b/src/communication/bolt/v1/states/executor.cpp @@ -57,7 +57,8 @@ void Executor::run(Session& session, Query& query) auto &db = session.active_db(); logger.debug("[ActiveDB] '{}'", db.name()); - // TODO: hangle syntax error use case + // TODO: error handling + query_engine.execute(query.statement, db, session.output_stream); } From b5db8d8d1e820c6dcfc790f88387776948a4fc25 Mon Sep 17 00:00:00 2001 From: Marko Budiselic <mbudiselicbuda@gmail.com> Date: Tue, 30 Aug 2016 06:26:27 +0100 Subject: [PATCH 2/3] match vertex by id, label and from main vertex store --- include/query_engine/code_generator/handlers/return.hpp | 2 +- include/query_engine/query_engine.hpp | 3 ++- 
include/query_engine/traverser/code.hpp | 4 ++-- include/query_engine/traverser/cpp_traverser.hpp | 6 +++++- src/communication/bolt/v1/states/executor.cpp | 8 +++++--- 5 files changed, 15 insertions(+), 8 deletions(-) diff --git a/include/query_engine/code_generator/handlers/return.hpp b/include/query_engine/code_generator/handlers/return.hpp index 11dbf9f7c..e4d631fd4 100644 --- a/include/query_engine/code_generator/handlers/return.hpp +++ b/include/query_engine/code_generator/handlers/return.hpp @@ -38,6 +38,7 @@ auto return_query_action = code += code_line(code::write_all_edges, entity); } + // the client will receive entities from label index if (cypher_data.source(entity) == EntitySource::LabelIndex) { if (cypher_data.type(entity) == EntityType::Node) { @@ -47,7 +48,6 @@ auto return_query_action = code += code_line(code::fine_and_write_vertices_by_label, entity, label); } - // TODO: code_line } } else if (element.is_projection()) diff --git a/include/query_engine/query_engine.hpp b/include/query_engine/query_engine.hpp index 33f756dce..bdf3344b3 100644 --- a/include/query_engine/query_engine.hpp +++ b/include/query_engine/query_engine.hpp @@ -36,7 +36,8 @@ public: } catch (QueryEngineException &e) { // in this case something fatal went wrong logger.error("QueryEngineException: {}", std::string(e.what())); - return false; + // return false; + throw e; } } diff --git a/include/query_engine/traverser/code.hpp b/include/query_engine/traverser/code.hpp index 4cb6caf54..5bdd58834 100644 --- a/include/query_engine/traverser/code.hpp +++ b/include/query_engine/traverser/code.hpp @@ -74,12 +74,12 @@ const std::string write_all_vertices = const std::string fine_and_write_vertices_by_label = "auto &label = t.label_find_or_create(\"{1}\");\n" " stream.write_field(\"{0}\");\n" - " label.index().for_range(t).for_all([&](auto vertex) {\n" + " label.index().for_range(t)->for_all([&](auto vertex) {{\n" " stream.write_record();\n" " stream.write_list_header(1);\n" " 
stream.write(vertex);\n" " stream.chunk();\n" - " });\n" + " }});\n" " stream.write_meta(\"rw\");\n"; const std::string write_all_edges = diff --git a/include/query_engine/traverser/cpp_traverser.hpp b/include/query_engine/traverser/cpp_traverser.hpp index fe01a55d0..466500943 100644 --- a/include/query_engine/traverser/cpp_traverser.hpp +++ b/include/query_engine/traverser/cpp_traverser.hpp @@ -3,12 +3,12 @@ #include <string> #include "cypher/visitor/traverser.hpp" - #include "query_engine/code_generator/cpp_generator.hpp" #include "query_engine/code_generator/entity_search.hpp" #include "query_engine/code_generator/structures.hpp" #include "query_engine/exceptions/exceptions.hpp" #include "query_engine/traverser/code.hpp" +#include "logging/default.hpp" struct SetElementState { @@ -99,7 +99,11 @@ private: generator.clear(); } + Logger logger; + public: + CppTraverser() : logger(logging::log->logger("CppTraverser")) {} + void semantic_check() const { if (!has_return) diff --git a/src/communication/bolt/v1/states/executor.cpp b/src/communication/bolt/v1/states/executor.cpp index a0c062f73..589860ba5 100644 --- a/src/communication/bolt/v1/states/executor.cpp +++ b/src/communication/bolt/v1/states/executor.cpp @@ -61,9 +61,11 @@ void Executor::run(Session& session, Query& query) auto &db = session.active_db(); logger.debug("[ActiveDB] '{}'", db.name()); - // TODO: error handling - - query_engine.execute(query.statement, db, session.output_stream); + try { + query_engine.execute(query.statement, db, session.output_stream); + } catch (QueryEngineException &e) { + // return error to user + } } void Executor::pull_all(Session& session) From 1048ea88491c6f32100cae868d353563616694bb Mon Sep 17 00:00:00 2001 From: Marko Budiselic <mbudiselicbuda@gmail.com> Date: Tue, 30 Aug 2016 06:49:47 +0100 Subject: [PATCH 3/3] bolt failure isn't finished --- .../bolt/v1/serialization/bolt_serializer.hpp | 10 ++++++ .../bolt/v1/serialization/record_stream.hpp | 5 +++ 
.../bolt/v1/transport/bolt_encoder.hpp | 6 ++++ src/communication/bolt/v1/states/executor.cpp | 32 +++++++------------ 4 files changed, 33 insertions(+), 20 deletions(-) diff --git a/include/communication/bolt/v1/serialization/bolt_serializer.hpp b/include/communication/bolt/v1/serialization/bolt_serializer.hpp index 73578a121..6abab5245 100644 --- a/include/communication/bolt/v1/serialization/bolt_serializer.hpp +++ b/include/communication/bolt/v1/serialization/bolt_serializer.hpp @@ -95,6 +95,16 @@ public: void write(const String &prop) { encoder.write_string(prop.value); } + void write_failure(const std::map<std::string, std::string>& data) + { + encoder.message_failure(); + encoder.write_map_header(data.size()); + for (auto const &kv : data) { + write(kv.first); + write(kv.second); + } + } + template <class T> void handle(const T &prop) { diff --git a/include/communication/bolt/v1/serialization/record_stream.hpp b/include/communication/bolt/v1/serialization/record_stream.hpp index 93bd7c4e2..45b23683f 100644 --- a/include/communication/bolt/v1/serialization/record_stream.hpp +++ b/include/communication/bolt/v1/serialization/record_stream.hpp @@ -92,6 +92,11 @@ public: chunk(); } + void write_failure(const std::map<std::string, std::string>& data) + { + serializer.write_failure(data); + chunk(); + } // -- BOLT SPECIFIC METHODS ----------------------------------------------- void write(const VertexAccessor &vertex) { serializer.write(vertex); } diff --git a/include/communication/bolt/v1/transport/bolt_encoder.hpp b/include/communication/bolt/v1/transport/bolt_encoder.hpp index 29a41207d..dba6ab9be 100644 --- a/include/communication/bolt/v1/transport/bolt_encoder.hpp +++ b/include/communication/bolt/v1/transport/bolt_encoder.hpp @@ -253,6 +253,12 @@ public: write(underlying_cast(MessageCode::Ignored)); } + void message_failure() + { + write_struct_header(1); + write(underlying_cast(MessageCode::Failure)); + } + void message_ignored_empty() { message_ignored(); 
diff --git a/src/communication/bolt/v1/states/executor.cpp b/src/communication/bolt/v1/states/executor.cpp index 589860ba5..b8e7e9777 100644 --- a/src/communication/bolt/v1/states/executor.cpp +++ b/src/communication/bolt/v1/states/executor.cpp @@ -10,7 +10,7 @@ namespace bolt Executor::Executor() : logger(logging::log->logger("Executor")) {} -State* Executor::run(Session& session) +State *Executor::run(Session &session) { // just read one byte that represents the struct type, we can skip the // information contained in this byte @@ -20,31 +20,22 @@ State* Executor::run(Session& session) auto message_type = session.decoder.read_byte(); - if(message_type == MessageCode::Run) - { + if (message_type == MessageCode::Run) { Query q; q.statement = session.decoder.read_string(); this->run(session, q); - } - else if(message_type == MessageCode::PullAll) - { + } else if (message_type == MessageCode::PullAll) { pull_all(session); - } - else if(message_type == MessageCode::DiscardAll) - { + } else if (message_type == MessageCode::DiscardAll) { discard_all(session); - } - else if(message_type == MessageCode::Reset) - { + } else if (message_type == MessageCode::Reset) { // todo rollback current transaction // discard all records waiting to be sent return this; - } - else - { + } else { logger.error("Unrecognized message recieved"); logger.debug("Invalid message type 0x{:02X}", message_type); @@ -54,7 +45,7 @@ State* Executor::run(Session& session) return this; } -void Executor::run(Session& session, Query& query) +void Executor::run(Session &session, Query &query) { logger.trace("[Run] '{}'", query.statement); @@ -64,18 +55,20 @@ void Executor::run(Session& session, Query& query) try { query_engine.execute(query.statement, db, session.output_stream); } catch (QueryEngineException &e) { - // return error to user + session.output_stream.write_failure( + {{"code", "unknown"}, {"message", e.what()}}); + session.output_stream.send(); } } -void Executor::pull_all(Session& session) 
+void Executor::pull_all(Session &session) { logger.trace("[PullAll]"); session.output_stream.send(); } -void Executor::discard_all(Session& session) +void Executor::discard_all(Session &session) { logger.trace("[DiscardAll]"); @@ -85,5 +78,4 @@ void Executor::discard_all(Session& session) session.output_stream.chunk(); session.output_stream.send(); } - }