From 12880ba99096e5742bc5e52db724a212ccf8aadb Mon Sep 17 00:00:00 2001 From: Marko Budiselic Date: Mon, 29 Aug 2016 01:01:42 +0100 Subject: [PATCH] work in progress engine <-> barrier integration --- CMakeLists.txt | 17 +-- include/barrier/barrier.hpp | 38 +++--- include/barrier/trans.hpp | 7 +- .../bolt/v1/serialization/bolt_serializer.hpp | 3 + .../bolt/v1/serialization/record_stream.hpp | 2 + include/query_engine/code_compiler.hpp | 1 + include/query_engine/code_generator.hpp | 3 +- include/query_engine/i_code_cpu.hpp | 10 +- include/query_engine/program_executor.hpp | 8 +- poc/astar.cpp | 1 + poc/profile.cpp | 1 + src/barrier/barrier.cpp | 109 +++++++++++++----- .../bolt/v1/serialization/bolt_serializer.cpp | 7 ++ src/communication/bolt/v1/states/executor.cpp | 3 + .../template/template_code_cpu.cpp | 17 ++- tests/integration/queries.cpp | 1 + tests/manual/queries.cpp | 1 + 17 files changed, 160 insertions(+), 69 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 01bfbe64f..ebde7a3db 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -144,6 +144,7 @@ endforeach() # ----------------------------------------------------------------------------- # COPY header files required by query engine (query compiler) # TODO: somehow automate (in destination dir should be only required include files) +FILE(COPY ${include_dir}/barrier/barrier.hpp DESTINATION ${build_include_dir}/barrier) FILE(COPY ${include_dir}/database/db.hpp DESTINATION ${build_include_dir}/database) FILE(COPY ${include_dir}/database/db_transaction.hpp DESTINATION ${build_include_dir}/database) FILE(COPY ${include_dir}/database/db_accessor.hpp DESTINATION ${build_include_dir}/database) @@ -455,7 +456,6 @@ set(memgraph_src_files ${src_dir}/storage/record_accessor.cpp ) - # STATIC library used by memgraph executables add_library(memgraph STATIC ${memgraph_src_files}) @@ -463,6 +463,10 @@ add_library(memgraph STATIC ${memgraph_src_files}) add_library(memgraph_pic STATIC ${memgraph_src_files}) set_property(TARGET memgraph_pic PROPERTY POSITION_INDEPENDENT_CODE TRUE) +add_library(barrier STATIC ${memgraph_src_files}) + +add_library(barrier_pic STATIC ${memgraph_src_files}) +set_property(TARGET barrier_pic PROPERTY POSITION_INDEPENDENT_CODE TRUE) # tests if (TESTS) @@ -498,7 +502,7 @@ set(MEMGRAPH_BUILD_NAME # memgraph main executable if (MEMGRAPH) add_executable(${MEMGRAPH_BUILD_NAME} ${src_dir}/memgraph_bolt.cpp) - target_link_libraries(${MEMGRAPH_BUILD_NAME} memgraph) + target_link_libraries(${MEMGRAPH_BUILD_NAME} barrier) target_link_libraries(${MEMGRAPH_BUILD_NAME} Threads::Threads) target_link_libraries(${MEMGRAPH_BUILD_NAME} cypher_lib) if (UNIX) @@ -508,12 +512,3 @@ if (MEMGRAPH) target_link_libraries(${MEMGRAPH_BUILD_NAME} dl) endif (UNIX) endif() - -# # memgraph executable HTTP TODO: DEPRICATED -# add_executable(memgraph_http src/memgraph.cpp) -# add_dependencies(memgraph_http cypher_lib) -# target_link_libraries(memgraph_http Threads::Threads) -# target_link_libraries(memgraph_http pcre) -# target_link_libraries(memgraph_http ${libuv_static_lib}) -# target_link_libraries(memgraph_http ${r3_static_lib}) -# target_link_libraries(memgraph_http ${http_parser_static_lib}) diff --git a/include/barrier/barrier.hpp b/include/barrier/barrier.hpp index d8c088db4..e201b9d61 100644 --- a/include/barrier/barrier.hpp +++ b/include/barrier/barrier.hpp @@ -38,7 +38,7 @@ class EdgePropertyType; // BOLT template -class BoltSerializer; +class RecordStream; // ************ Here should be forward declarations of Unsized barrier classes // COMMON @@ -413,43 +413,41 @@ public: }; template -class BoltSerializer : private Sized<8, 8> +class RecordStream : private Sized<8, 8> { public: template - BoltSerializer(T &&d); + RecordStream(T &&d); - BoltSerializer(const BoltSerializer &other) = default; - BoltSerializer(BoltSerializer &&other) = default; - ~BoltSerializer(); + RecordStream(const RecordStream &other) = default; + RecordStream(RecordStream &&other) = default; + ~RecordStream(); - BoltSerializer &operator=(const BoltSerializer &other) = default; - BoltSerializer &operator=(BoltSerializer &&other) = default; + RecordStream &operator=(const RecordStream &other) = default; + RecordStream &operator=(RecordStream &&other) = default; void write(const VertexAccessor &vertex); - void write(const EdgeAccessor &edge); - void write(const Property &prop); - void write_null(); - void write(const Bool &prop); - void write(const Float &prop); - void write(const Double &prop); - void write(const Int32 &prop); - void write(const Int64 &prop); - void write(const std::string &value); - void write(const String &prop); - template - void handle(const T &prop); + void write_success(); + void write_success_empty(); + void write_ignored(); + void write_fields(const std::vector &fields); + void write_field(const std::string& field); + void write_list_header(size_t size); + void write_record(); + void write_meta(const std::string& type); + void send(); + void chunk(); }; // ************ Here should be definitions of Unsized barrier classes diff --git a/include/barrier/trans.hpp b/include/barrier/trans.hpp index b4fc8a567..af7c8ad76 100644 --- a/include/barrier/trans.hpp +++ b/include/barrier/trans.hpp @@ -1,8 +1,11 @@ #pragma once + #include "barrier/barrier.hpp" // This is the place for imports from memgraph .hpp #include "communication/bolt/v1/serialization/bolt_serializer.hpp" +#include "communication/bolt/v1/serialization/record_stream.hpp" +#include "io/network/socket.hpp" #include "database/db.hpp" #include "database/db_accessor.hpp" #include "storage/edge_type/edge_type.hpp" @@ -165,7 +168,7 @@ TRANSFORM_REF_TEMPLATED(VertexIndex, VertexIndexBase); template TRANSFORM_REF_TEMPLATED(EdgeIndex, EdgeIndexBase); template -TRANSFORM_REF_TEMPLATED(BoltSerializer, ::bolt::BoltSerializer); +TRANSFORM_REF_TEMPLATED(RecordStream, ::bolt::RecordStream); template TRANSFORM_REF_TEMPLATED( @@ -209,7 +212,7 @@ TRANSFORM_VALUE_ONE_RAW(EdgePropertyType, ::EdgePropertyFamily::PropertyType::PropertyTypeKey) template -TRANSFORM_VALUE_ONE_RAW(BoltSerializer, ::bolt::BoltSerializer) +TRANSFORM_VALUE_ONE_RAW(RecordStream, ::bolt::RecordStream) // ********************* SPECIAL CONSTRUCTORS #define VertexPropertyType_constructor(x) \ diff --git a/include/communication/bolt/v1/serialization/bolt_serializer.hpp b/include/communication/bolt/v1/serialization/bolt_serializer.hpp index e34323cdd..73578a121 100644 --- a/include/communication/bolt/v1/serialization/bolt_serializer.hpp +++ b/include/communication/bolt/v1/serialization/bolt_serializer.hpp @@ -8,6 +8,9 @@ #include "storage/model/properties/all.hpp" #include "storage/model/properties/properties.hpp" +#include "storage/label/label.hpp" +#include "storage/edge_type/edge_type.hpp" +#include "storage/vertex_record.hpp" namespace bolt { diff --git a/include/communication/bolt/v1/serialization/record_stream.hpp b/include/communication/bolt/v1/serialization/record_stream.hpp index 7c5f1b46b..93bd7c4e2 100644 --- a/include/communication/bolt/v1/serialization/record_stream.hpp +++ b/include/communication/bolt/v1/serialization/record_stream.hpp @@ -22,6 +22,8 @@ public: logger = logging::log->logger("Record Stream"); } + ~RecordStream() = default; + // TODO: create apstract methods that are not bolt specific --------------- void write_success() { diff --git a/include/query_engine/code_compiler.hpp b/include/query_engine/code_compiler.hpp index 646f00ed8..00ce66ae0 100644 --- a/include/query_engine/code_compiler.hpp +++ b/include/query_engine/code_compiler.hpp @@ -52,6 +52,7 @@ public: "-I../../libs/fmt", "-L./ -L../", "-lmemgraph_pic", + "-lbarrier_pic", "-shared -fPIC" // shared library flags ); diff --git a/include/query_engine/code_generator.hpp b/include/query_engine/code_generator.hpp index 8230b544e..964c824bb 100644 --- a/include/query_engine/code_generator.hpp +++ b/include/query_engine/code_generator.hpp @@ -55,7 +55,8 @@ public: template_file, {{"class_name", "CodeCPU"}, {"stripped_hash", std::to_string(stripped_hash)}, {"query", query}, - {"stream", type_name().to_string()}, + // {"stream", type_name().to_string()}, + {"stream", "RecordStream"}, {"code", cpp_traverser.code}}); // logger.trace("generated code: {}", generated); diff --git a/include/query_engine/i_code_cpu.hpp b/include/query_engine/i_code_cpu.hpp index cd4e92bc1..fd7d7d02f 100644 --- a/include/query_engine/i_code_cpu.hpp +++ b/include/query_engine/i_code_cpu.hpp @@ -1,15 +1,19 @@ #pragma once #include "communication/communication.hpp" -#include "database/db.hpp" -#include "database/db_accessor.hpp" #include "query_engine/query_stripped.hpp" +// #include "database/db.hpp" +// #include "database/db_accessor.hpp" + +// BARRIER! +#include "barrier/barrier.hpp" + template class ICodeCPU { public: - virtual bool run(Db &db, code_args_t &args, + virtual bool run(barrier::Db &db, code_args_t &args, Stream &stream) = 0; virtual ~ICodeCPU() {} }; diff --git a/include/query_engine/program_executor.hpp b/include/query_engine/program_executor.hpp index c74a2720d..3a5ed6614 100644 --- a/include/query_engine/program_executor.hpp +++ b/include/query_engine/program_executor.hpp @@ -11,6 +11,12 @@ // execution // postprocess the results +// BARRIER! +namespace barrier +{ + Db& trans(::Db& ref); +} + template class ProgramExecutor { @@ -22,7 +28,7 @@ public: { try { // TODO: return result of query/code exection - return program.code->run(db, program.stripped.arguments, stream); + return program.code->run(barrier::trans(db), program.stripped.arguments, stream); } catch (...) { // TODO: return more information about the error throw QueryEngineException("code execution error"); diff --git a/poc/astar.cpp b/poc/astar.cpp index 0655294c3..433e39431 100644 --- a/poc/astar.cpp +++ b/poc/astar.cpp @@ -25,6 +25,7 @@ #include "storage/vertices.cpp" #include "storage/vertices.hpp" #include "utils/command_line/arguments.hpp" +#include "communication/bolt/v1/serialization/bolt_serializer.hpp" const int max_score = 1000000; diff --git a/poc/profile.cpp b/poc/profile.cpp index 4faecf66f..f8f320460 100644 --- a/poc/profile.cpp +++ b/poc/profile.cpp @@ -12,6 +12,7 @@ #include #include "import/csv_import.hpp" #include "utils/command_line/arguments.hpp" +#include "communication/bolt/v1/serialization/bolt_serializer.hpp" using namespace std; diff --git a/src/barrier/barrier.cpp b/src/barrier/barrier.cpp index dfb3e9513..7031230bd 100644 --- a/src/barrier/barrier.cpp +++ b/src/barrier/barrier.cpp @@ -499,83 +499,140 @@ OptionPtr> EdgePropertyFamily::index() // ************************* BOLT SERIALIZER template -BoltSerializer::~BoltSerializer() +RecordStream::~RecordStream() { - THIS->~BoltSerializer(); + // TODO: solve this + // THIS->~RecordStream(); } template -void BoltSerializer::write(const VertexAccessor &vertex) +void RecordStream::write(const VertexAccessor &vertex) { HALF_CALL(write(trans(vertex))); } template -void BoltSerializer::write(const EdgeAccessor &edge) +void RecordStream::write(const EdgeAccessor &edge) { HALF_CALL(write(trans(edge))); } template -void BoltSerializer::write(const Property &prop) +void RecordStream::write(const Property &prop) +{ + HALF_CALL(write(prop)); +} + +// template +// void RecordStream::write_null() +// { +// HALF_CALL(write_null()); +// } + +template +void RecordStream::write(const Bool &prop) { HALF_CALL(write(prop)); } template -void BoltSerializer::write_null() -{ - HALF_CALL(write_null()); -} - -template -void BoltSerializer::write(const Bool &prop) +void RecordStream::write(const Float &prop) { HALF_CALL(write(prop)); } template -void BoltSerializer::write(const Float &prop) +void RecordStream::write(const Double &prop) { HALF_CALL(write(prop)); } template -void BoltSerializer::write(const Double &prop) +void RecordStream::write(const Int32 &prop) { HALF_CALL(write(prop)); } template -void BoltSerializer::write(const Int32 &prop) +void RecordStream::write(const Int64 &prop) { HALF_CALL(write(prop)); } template -void BoltSerializer::write(const Int64 &prop) -{ - HALF_CALL(write(prop)); -} - -template -void BoltSerializer::write(const std::string &value) +void RecordStream::write(const std::string &value) { HALF_CALL(write(value)); } template -void BoltSerializer::write(const String &prop) +void RecordStream::write(const String &prop) { HALF_CALL(write(prop)); } template -template -void BoltSerializer::handle(const T &prop) +void RecordStream::write_success() { - HALF_CALL(template handle(prop)); + HALF_CALL(write_success()); } + +template +void RecordStream::write_success_empty() +{ + HALF_CALL(write_success_empty()); +} + +template +void RecordStream::write_ignored() +{ + HALF_CALL(write_ignored()); +} + +template +void RecordStream::write_fields(const std::vector &fields) +{ + HALF_CALL(write_fields(fields)); +} + +template +void RecordStream::write_field(const std::string& field) +{ + HALF_CALL(write_field(field)); +} + +template +void RecordStream::write_list_header(size_t size) +{ + HALF_CALL(write_list_header(size)); +} + +template +void RecordStream::write_record() +{ + HALF_CALL(write_record()); +} + +template +void RecordStream::write_meta(const std::string& type) +{ + HALF_CALL(write_meta(type)); +} + +template +void RecordStream::send() +{ + HALF_CALL(send()); +} + +template +void RecordStream::chunk() +{ + HALF_CALL(chunk()); +} + +template class RecordStream; + } // **************************** ERROR EXAMPLES ****************************** // diff --git a/src/communication/bolt/v1/serialization/bolt_serializer.cpp b/src/communication/bolt/v1/serialization/bolt_serializer.cpp index 4ff4c7f56..b45445897 100644 --- a/src/communication/bolt/v1/serialization/bolt_serializer.cpp +++ b/src/communication/bolt/v1/serialization/bolt_serializer.cpp @@ -1,5 +1,9 @@ #include "communication/bolt/v1/serialization/bolt_serializer.hpp" +#include "communication/bolt/v1/transport/chunked_buffer.hpp" +#include "communication/bolt/v1/transport/chunked_encoder.hpp" +#include "communication/bolt/v1/transport/socket_stream.hpp" +#include "io/network/socket.hpp" #include "storage/edge_x_vertex.hpp" template @@ -28,3 +32,6 @@ void bolt::BoltSerializer::write(const EdgeAccessor &edge) write(*prop.second); } } + +template class bolt::BoltSerializer>>>>; diff --git a/src/communication/bolt/v1/states/executor.cpp b/src/communication/bolt/v1/states/executor.cpp index 6d280f37e..4a8f5e8d4 100644 --- a/src/communication/bolt/v1/states/executor.cpp +++ b/src/communication/bolt/v1/states/executor.cpp @@ -1,6 +1,9 @@ #include "communication/bolt/v1/states/executor.hpp" #include "communication/bolt/v1/messaging/codes.hpp" +// BARRIER! TODO: ATTENTION: HACK!!!!! +#include "barrier/barrier.cpp" + namespace bolt { diff --git a/src/query_engine/template/template_code_cpu.cpp b/src/query_engine/template/template_code_cpu.cpp index 667687d6e..74b1e30b8 100644 --- a/src/query_engine/template/template_code_cpu.cpp +++ b/src/query_engine/template/template_code_cpu.cpp @@ -10,6 +10,10 @@ using std::endl; // query: {{query}} +// BARRIER! +namespace barrier +{ + class {{class_name}} : public ICodeCPU<{{stream}}> { public: @@ -23,13 +27,16 @@ public: ~{{class_name}}() {} }; - -extern "C" ICodeCPU<{{stream}}>* produce() -{ - return new {{class_name}}(); } -extern "C" void destruct(ICodeCPU<{{stream}}>* p) + +extern "C" ICodeCPU* produce() +{ + // BARRIER! + return new barrier::{{class_name}}(); +} + +extern "C" void destruct(ICodeCPU* p) { delete p; } diff --git a/tests/integration/queries.cpp b/tests/integration/queries.cpp index ad649fd75..f8d94f8c9 100644 --- a/tests/integration/queries.cpp +++ b/tests/integration/queries.cpp @@ -4,6 +4,7 @@ #include "database/db.hpp" #include "query_engine/query_stripper.hpp" +#include "communication/bolt/v1/serialization/bolt_serializer.hpp" // #include "storage/edges.cpp" // #include "storage/edges.hpp" // #include "storage/vertices.cpp" diff --git a/tests/manual/queries.cpp b/tests/manual/queries.cpp index 8d46e6e07..3902904d8 100644 --- a/tests/manual/queries.cpp +++ b/tests/manual/queries.cpp @@ -10,6 +10,7 @@ #include "storage/edges.hpp" #include "storage/vertices.cpp" #include "storage/vertices.hpp" +#include "communication/bolt/v1/serialization/bolt_serializer.hpp" using namespace std;