This commit is contained in:
Kruno Tomola Fabro 2016-08-30 08:54:48 +01:00
commit 035e508840
24 changed files with 331 additions and 78 deletions

View File

@ -308,6 +308,7 @@ endif()
set(CMAKE_CXX_FLAGS_RELEASE
"${CMAKE_CXX_FLAGS_RELEASE} -Wall")
# TODO: find a way how to applay the defines at the query compile time
# -- configure defines -- default is ON | true | enabled ----------------------
# -- logging ------------------------------------------------------------------
option(LOG_NO_TRACE "Disable trace logging" OFF)
@ -377,6 +378,16 @@ message(STATUS "TOOLS binaries: ${TOOLS}")
option(TESTS "Build test binaries" ON)
message(STATUS "TESTS binaries: ${TESTS}")
# -- binaries -----------------------------------------------------------------
# -- barrier - this is the way how the engine is isolated so it can be shipped
# wherever, the code is completely hidden behind the barrier, during the
# development the barrier can be turned off because it is much easier to
# debug
option(BARRIER "Barrier" ON)
message(STATUS "BARRIER: ${BARRIER} (Source code isolation)")
if(BARRIER)
add_definitions( -DBARRIER )
endif()
# -- barrier ------------------------------------------------------------------
# -- configure defines --------------------------------------------------------
# -- includes -----------------------------------------------------------------
@ -466,10 +477,14 @@ add_library(memgraph STATIC ${memgraph_src_files})
add_library(memgraph_pic STATIC ${memgraph_src_files})
set_property(TARGET memgraph_pic PROPERTY POSITION_INDEPENDENT_CODE TRUE)
add_library(barrier STATIC ${memgraph_src_files})
if (BARRIER)
# create static barrier lib
add_library(barrier STATIC ${memgraph_src_files})
add_library(barrier_pic STATIC ${memgraph_src_files})
set_property(TARGET barrier_pic PROPERTY POSITION_INDEPENDENT_CODE TRUE)
# create pic static barrier lib
add_library(barrier_pic STATIC ${memgraph_src_files})
set_property(TARGET barrier_pic PROPERTY POSITION_INDEPENDENT_CODE TRUE)
endif()
# tests
if (TESTS)
@ -505,7 +520,13 @@ set(MEMGRAPH_BUILD_NAME
# memgraph main executable
if (MEMGRAPH)
add_executable(${MEMGRAPH_BUILD_NAME} ${src_dir}/memgraph_bolt.cpp)
target_link_libraries(${MEMGRAPH_BUILD_NAME} barrier)
if (BARRIER)
target_link_libraries(${MEMGRAPH_BUILD_NAME} barrier)
elseif (NOT BARRIER)
target_link_libraries(${MEMGRAPH_BUILD_NAME} memgraph)
endif ()
target_link_libraries(${MEMGRAPH_BUILD_NAME} Threads::Threads)
target_link_libraries(${MEMGRAPH_BUILD_NAME} cypher_lib)
if (UNIX)

View File

@ -95,6 +95,16 @@ public:
void write(const String &prop) { encoder.write_string(prop.value); }
void write_failure(const std::map<std::string, std::string>& data)
{
encoder.message_failure();
encoder.write_map_header(data.size());
for (auto const &kv : data) {
write(kv.first);
write(kv.second);
}
}
template <class T>
void handle(const T &prop)
{

View File

@ -92,6 +92,11 @@ public:
chunk();
}
void write_failure(const std::map<std::string, std::string>& data)
{
serializer.write_failure(data);
chunk();
}
// -- BOLT SPECIFIC METHODS -----------------------------------------------
void write(const VertexAccessor &vertex) { serializer.write(vertex); }

View File

@ -253,6 +253,12 @@ public:
write(underlying_cast(MessageCode::Ignored));
}
void message_failure()
{
write_struct_header(1);
write(underlying_cast(MessageCode::Failure));
}
void message_ignored_empty()
{
message_ignored();

View File

@ -5,7 +5,8 @@
namespace config
{
constexpr const char * COMPILE_CPU_PATH = "compile_cpu_path";
constexpr const char * TEMPLATE_CPU_CPP_PATH = "template_cpu_cpp_path";
constexpr const char *COMPILE_CPU_PATH = "compile_cpu_path";
constexpr const char *TEMPLATE_CPU_CPP_PATH = "template_cpu_cpp_path";
constexpr const char *BARRIER_TEMPLATE_CPU_CPP_PATH =
"barrier_template_cpu_cpp_path";
}

View File

@ -20,6 +20,9 @@ public:
std::string flags;
// TODO: sync this with cmake configuration
#ifdef BARRIER
flags += " -DBARRIER";
#endif
#ifdef NDEBUG
flags += " -DNDEBUG -O2";
#endif
@ -51,8 +54,10 @@ public:
"-I../libs/fmt", // TODO: load from config
"-I../../libs/fmt",
"-L./ -L../",
"-lmemgraph_pic",
#ifdef BARRIER
"-lbarrier_pic",
#endif
"-lmemgraph_pic",
"-shared -fPIC" // shared library flags
);

View File

@ -27,7 +27,11 @@ public:
CppTraverser cpp_traverser;
// get paths
#ifdef BARRIER
string template_path = CONFIG(config::BARRIER_TEMPLATE_CPU_CPP_PATH);
#else
string template_path = CONFIG(config::TEMPLATE_CPU_CPP_PATH);
#endif
string template_file = utils::read_file(template_path.c_str());
// syntax tree generation
@ -55,12 +59,14 @@ public:
template_file, {{"class_name", "CodeCPU"},
{"stripped_hash", std::to_string(stripped_hash)},
{"query", query},
// {"stream", type_name<Stream>().to_string()},
// BARRIER !!!!
#ifdef BARRIER
{"stream", "RecordStream<io::Socket>"},
#else
{"stream", type_name<Stream>().to_string()},
#endif
{"code", cpp_traverser.code}});
// logger.trace("generated code: {}", generated);
logger.debug("generated code: {}", generated);
utils::write_file(generated, path);
}

View File

@ -3,9 +3,11 @@
#include <cstdint>
#include <map>
#include <string>
#include <vector>
#include "query_engine/code_generator/namer.hpp"
#include "storage/model/properties/flags.hpp"
#include "query_engine/exceptions/exceptions.hpp"
// main states that are used while ast is traversed
// in order to generate ActionSequence
@ -22,23 +24,34 @@ enum class CypherState : uint8_t
enum class EntityStatus : uint8_t
{
NotFound,
None,
Matched,
Created
};
enum class EntityType : uint8_t
{
NotFound,
None,
Node,
Relationship
};
// where OR how entity can be found
enum class EntitySource : uint8_t
{
None,
InternalId,
LabelIndex,
MainStorage
};
class CypherStateData
{
private:
std::map<std::string, EntityStatus> entity_status;
std::map<std::string, EntityType> entity_type;
std::map<std::string, EntitySource> entity_source;
std::map<std::string, std::vector<std::string>> entity_tags;
// TODO: container that keeps track about c++ variable names
@ -51,7 +64,7 @@ public:
EntityStatus status(const std::string &name)
{
if (entity_status.find(name) == entity_status.end())
return EntityStatus::NotFound;
return EntityStatus::None;
return entity_status.at(name);
}
@ -59,11 +72,25 @@ public:
EntityType type(const std::string &name) const
{
if (entity_type.find(name) == entity_type.end())
return EntityType::NotFound;
return EntityType::None;
return entity_type.at(name);
}
EntitySource source(const std::string &name) const
{
if (entity_source.find(name) == entity_source.end())
return EntitySource::None;
return entity_source.at(name);
}
auto tags(const std::string& name) const
{
if (entity_tags.find(name) == entity_tags.end())
throw CppGeneratorException("No tags for specified entity");
return entity_tags.at(name);
}
const std::map<std::string, EntityType> &all_typed_enteties()
{
return entity_type;
@ -92,4 +119,14 @@ public:
entity_type[name] = EntityType::Relationship;
entity_status[name] = EntityStatus::Created;
}
void source(const std::string& name, EntitySource source)
{
entity_source[name] = source;
}
void tags(const std::string& name, std::vector<std::string> tags)
{
entity_tags[name] = tags;
}
};

View File

@ -30,29 +30,44 @@ auto match_query_action =
for (auto const &kv : action_data.actions) {
// TODO: the same code REFACTOR!
auto name = kv.first;
// find node
if (kv.second == ClauseAction::MatchNode) {
auto name = kv.first;
if (already_matched(cypher_data, name, EntityType::Node)) continue;
if (already_matched(cypher_data, name, EntityType::Node))
continue;
cypher_data.node_matched(name);
auto place = action_data.csm.min(kv.first);
auto place = action_data.csm.min(name);
if (place == entity_search::search_internal_id) {
auto index = fetch_internal_index(action_data, name);
code += code_line(code::match_vertex_by_id, name, index);
cypher_data.source(name, EntitySource::InternalId);
}
if (place == entity_search::search_main_storage) {
cypher_data.source(name, EntitySource::MainStorage);
}
if (place == entity_search::search_label_index) {
if (action_data.entity_data.at(name).tags.size() > 1) {
throw SemanticError("Multiple label match (currently NOT supported)");
}
cypher_data.source(name, EntitySource::LabelIndex);
cypher_data.tags(name, action_data.entity_data.at(name).tags);
}
}
// find relationship
if (kv.second == ClauseAction::MatchRelationship) {
auto name = kv.first;
if (already_matched(cypher_data, name, EntityType::Relationship))
continue;
cypher_data.relationship_matched(name);
auto place = action_data.csm.min(kv.first);
auto place = action_data.csm.min(name);
if (place == entity_search::search_internal_id) {
auto index = fetch_internal_index(action_data, name);
code += code_line(code::match_edge_by_id, name, index);
cypher_data.source(name, EntitySource::InternalId);
}
if (place == entity_search::search_main_storage) {
cypher_data.source(name, EntitySource::MainStorage);
}
}
}

View File

@ -11,16 +11,47 @@ auto return_query_action =
const auto &elements = action_data.return_elements;
code += code_line("// number of elements {}", elements.size());
// TODO: call bolt serialization
for (const auto &element : elements) {
for (const auto &element : elements)
{
auto &entity = element.entity;
if (!cypher_data.exist(entity)) {
if (!cypher_data.exist(entity))
throw SemanticError(
fmt::format("{} couldn't be found (RETURN clause).", entity));
}
if (element.is_entity_only()) {
code += code_line(code::write_entity, entity);
} else if (element.is_projection()) {
if (element.is_entity_only())
{
// if the node has just recently been created on can be found
// with the internal id then it can be sent to the client
if (cypher_data.status(entity) == EntityStatus::Created ||
(cypher_data.source(entity) == EntitySource::InternalId &&
cypher_data.status(entity) == EntityStatus::Matched))
{
code += code_line(code::write_entity, entity);
}
// the client has to receive all elements from the main storage
if (cypher_data.source(entity) == EntitySource::MainStorage)
{
if (cypher_data.type(entity) == EntityType::Node)
code += code_line(code::write_all_vertices, entity);
else if (cypher_data.type(entity) == EntityType::Relationship)
code += code_line(code::write_all_edges, entity);
}
// the client will receive entities from label index
if (cypher_data.source(entity) == EntitySource::LabelIndex)
{
if (cypher_data.type(entity) == EntityType::Node) {
if (cypher_data.tags(entity).size() == 0)
throw CppGeneratorException("entity has no tags");
auto label = cypher_data.tags(entity).at(0);
code += code_line(code::fine_and_write_vertices_by_label,
entity, label);
}
}
}
else if (element.is_projection())
{
code += code_line("// TODO: implement projection");
// auto &property = element.property;
// code += code_line(code::print_property, entity, property);

View File

@ -151,6 +151,14 @@ struct QueryActionData
return entity_data.at(entity);
}
auto get_tags(const std::string& entity) const
{
if (entity_data.find(entity) == entity_data.end())
throw CppGeneratorException("Entity " + entity + "doesn't exist");
return entity_data.at(entity).tags;
}
void print() const
{
for (auto const &action : actions) {

View File

@ -3,17 +3,22 @@
#include "communication/communication.hpp"
#include "query_engine/query_stripped.hpp"
// #include "database/db.hpp"
// #include "database/db_accessor.hpp"
// BARRIER!
#ifdef BARRIER
#include "barrier/barrier.hpp"
#else
#include "database/db.hpp"
#include "database/db_accessor.hpp"
#endif
template <typename Stream>
class ICodeCPU
{
public:
#ifdef BARRIER
virtual bool run(barrier::Db &db, code_args_t &args, Stream &stream) = 0;
#else
virtual bool run(Db &db, code_args_t &args, Stream &stream) = 0;
#endif
virtual ~ICodeCPU() {}
};

View File

@ -12,10 +12,12 @@
// postprocess the results
// BARRIER!
#ifdef BARRIER
namespace barrier
{
Db& trans(::Db& ref);
Db &trans(::Db &ref);
}
#endif
template <typename Stream>
class ProgramExecutor
@ -23,12 +25,16 @@ class ProgramExecutor
public:
// QueryProgram has to know about the Stream
// Stream has to be passed in this function for every execution
auto execute(QueryProgram<Stream> &program, Db &db,
Stream &stream)
auto execute(QueryProgram<Stream> &program, Db &db, Stream &stream)
{
try {
// TODO: return result of query/code exection
return program.code->run(barrier::trans(db), program.stripped.arguments, stream);
#ifdef BARRIER
return program.code->run(barrier::trans(db),
program.stripped.arguments, stream);
#else
return program.code->run(db, program.stripped.arguments, stream);
#endif
} catch (...) {
// TODO: return more information about the error
throw QueryEngineException("code execution error");

View File

@ -36,7 +36,8 @@ public:
} catch (QueryEngineException &e) {
// in this case something fatal went wrong
logger.error("QueryEngineException: {}", std::string(e.what()));
return false;
// return false;
throw e;
}
}

View File

@ -13,6 +13,7 @@
#include "utils/hashing/fnv.hpp"
#include "utils/string/transform.hpp"
#include "utils/variadic/variadic.hpp"
#include "logging/default.hpp"
template <class T, class V>
void store_query_param(code_args_t &arguments, V &&v)
@ -25,7 +26,8 @@ class QueryStripper
{
public:
QueryStripper(Ts &&... strip_types)
: strip_types(std::make_tuple(std::forward<Ts>(strip_types)...)),
: logger(logging::log->logger("QueryStripper")),
strip_types(std::make_tuple(std::forward<Ts>(strip_types)...)),
lexer(std::make_unique<CypherLexer>())
{
}
@ -70,6 +72,10 @@ public:
std::stol(token.value));
break;
case TK_STR:
// TODO: remove quotes view lexertl
token.value.erase(0, 1);
token.value.erase(token.value.length() - 1, 1);
// TODO: remove
store_query_param<String>(stripped_arguments, token.value);
break;
case TK_BOOL: {
@ -94,6 +100,7 @@ public:
}
private:
Logger logger;
std::tuple<Ts...> strip_types;
CypherLexer::uptr lexer;

View File

@ -6,14 +6,15 @@
struct Code
{
void reset() { code = ""; }
std::string code;
void reset() { code = ""; }
};
namespace code
{
// TODO: one more abstraction level
// TODO: UNIT tests
const std::string transaction_begin = "DbAccessor t(db);";
@ -58,6 +59,41 @@ const std::string write_entity = "stream.write_field(\"{0}\");\n"
" stream.chunk();"
" stream.write_meta(\"rw\");\n";
const std::string write_all_vertices =
"stream.write_field(\"{0}\");\n"
" iter::for_all(t.vertex_access(), [&](auto vertex) {{\n"
" if (vertex.fill()) {{\n"
" stream.write_record();\n"
" stream.write_list_header(1);\n"
" stream.write(vertex);\n"
" stream.chunk();\n"
" }}\n"
" }});\n"
" stream.write_meta(\"rw\");\n";
const std::string fine_and_write_vertices_by_label =
"auto &label = t.label_find_or_create(\"{1}\");\n"
" stream.write_field(\"{0}\");\n"
" label.index().for_range(t)->for_all([&](auto vertex) {{\n"
" stream.write_record();\n"
" stream.write_list_header(1);\n"
" stream.write(vertex);\n"
" stream.chunk();\n"
" }});\n"
" stream.write_meta(\"rw\");\n";
const std::string write_all_edges =
"stream.write_field(\"{0}\");\n"
" iter::for_all(t.edge_access(), [&](auto edge) {{\n"
" if (edge.fill()) {{\n"
" stream.write_record();\n"
" stream.write_list_header(1);\n"
" stream.write(edge);\n"
" stream.chunk();\n"
" }}\n"
" }});\n"
" stream.write_meta(\"rw\");\n";
const std::string return_true = "return true;";
const std::string todo = "// TODO: {}";

View File

@ -3,12 +3,12 @@
#include <string>
#include "cypher/visitor/traverser.hpp"
#include "query_engine/code_generator/cpp_generator.hpp"
#include "query_engine/code_generator/entity_search.hpp"
#include "query_engine/code_generator/structures.hpp"
#include "query_engine/exceptions/exceptions.hpp"
#include "query_engine/traverser/code.hpp"
#include "logging/default.hpp"
struct SetElementState
{
@ -99,7 +99,11 @@ private:
generator.clear();
}
Logger logger;
public:
CppTraverser() : logger(logging::log->logger("CppTraverser")) {}
void semantic_check() const
{
if (!has_return)
@ -255,7 +259,11 @@ public:
Traverser::visit(ast_node);
if (state == CypherState::Create) {
// this is here because of RETURN clause
// CREATE (n {...}) RETURN n
if (cypher_data.status(name) != EntityStatus::Matched &&
state == CypherState::Create)
{
cypher_data.node_created(name);
}
}
@ -362,13 +370,15 @@ public:
void visit(ast::LabelList &ast_label_list) override
{
auto &data = generator.action_data();
auto &action_data = generator.action_data();
if (!ast_label_list.has_value()) return;
auto label = ast_label_list.value->name;
data.add_entity_tag(entity, label);
action_data.add_entity_tag(entity, label);
action_data.csm.search_cost(entity, entity_search::search_label_index,
entity_search::label_cost);
Traverser::visit(ast_label_list);
}
@ -447,7 +457,7 @@ public:
auto &cypher_data = generator.cypher_data();
auto entity_type = cypher_data.type(entity);
if (entity_type == EntityType::NotFound)
if (entity_type == EntityType::None)
throw SemanticError("Entity (" + entity + ") doesn't exist");
auto &action_data = generator.action_data();

View File

@ -30,6 +30,9 @@ public:
// Creates new Vertex and returns filled VertexAccessor.
VertexAccessor insert(DbTransaction &t);
// TODO: how can I know how many elements exist
// without iterating through all of them? MVCC?
VertexPropertyFamily &
property_family_find_or_create(const std::string &name);

View File

@ -51,6 +51,9 @@ public:
if (std::strcmp(key, "template_cpu_cpp_path") == 0)
return "./template/template_code_cpu.cpp";
if (std::strcmp(key, "barrier_template_cpu_cpp_path") == 0)
return "./template/barrier_template_code_cpu.cpp";
if (std::strcmp(key, "template_cpu_hpp_path") == 0)
return "./template/template_code_cpu.hpp";

View File

@ -1,15 +1,16 @@
#include "communication/bolt/v1/states/executor.hpp"
#include "communication/bolt/v1/messaging/codes.hpp"
// BARRIER! TODO: ATTENTION: HACK!!!!!
#ifdef BARRIER
#include "barrier/barrier.cpp"
#endif
namespace bolt
{
Executor::Executor() : logger(logging::log->logger("Executor")) {}
State* Executor::run(Session& session)
State *Executor::run(Session &session)
{
// just read one byte that represents the struct type, we can skip the
// information contained in this byte
@ -19,31 +20,22 @@ State* Executor::run(Session& session)
auto message_type = session.decoder.read_byte();
if(message_type == MessageCode::Run)
{
if (message_type == MessageCode::Run) {
Query q;
q.statement = session.decoder.read_string();
this->run(session, q);
}
else if(message_type == MessageCode::PullAll)
{
} else if (message_type == MessageCode::PullAll) {
pull_all(session);
}
else if(message_type == MessageCode::DiscardAll)
{
} else if (message_type == MessageCode::DiscardAll) {
discard_all(session);
}
else if(message_type == MessageCode::Reset)
{
} else if (message_type == MessageCode::Reset) {
// todo rollback current transaction
// discard all records waiting to be sent
return this;
}
else
{
} else {
logger.error("Unrecognized message recieved");
logger.debug("Invalid message type 0x{:02X}", message_type);
@ -53,25 +45,30 @@ State* Executor::run(Session& session)
return this;
}
void Executor::run(Session& session, Query& query)
void Executor::run(Session &session, Query &query)
{
logger.trace("[Run] '{}'", query.statement);
auto &db = session.active_db();
logger.debug("[ActiveDB] '{}'", db.name());
// TODO: hangle syntax error use case
query_engine.execute(query.statement, db, session.output_stream);
try {
query_engine.execute(query.statement, db, session.output_stream);
} catch (QueryEngineException &e) {
session.output_stream.write_failure(
{{"code", "unknown"}, {"message", e.what()}});
session.output_stream.send();
}
}
void Executor::pull_all(Session& session)
void Executor::pull_all(Session &session)
{
logger.trace("[PullAll]");
session.output_stream.send();
}
void Executor::discard_all(Session& session)
void Executor::discard_all(Session &session)
{
logger.trace("[DiscardAll]");
@ -81,5 +78,4 @@ void Executor::discard_all(Session& session)
session.output_stream.chunk();
session.output_stream.send();
}
}

View File

@ -63,6 +63,12 @@ public:
// string literal TODO double quote escape
rule("\\\"(.*?)\\\"", TK_STR);
// ALL BELOW COMBNATIONS DON'T WORK
// rule("(?#\\\")(.*?)(?#\\\")", TK_STR);
// rule("[\"](.*?)[\"]", TK_STR);
// rule("(?:\")(.*?)(?:\")", TK_STR);
// rule("(?#:\")(.*?)(?#:\")", TK_STR);
// rule("(?#\")(.*?)(?#\")", TK_STR);
// number
rule("\\d+", TK_LONG);

View File

@ -0,0 +1,42 @@
#include <iostream>
#include <string>
#include "query_engine/util.hpp"
#include "query_engine/i_code_cpu.hpp"
#include "storage/model/properties/all.hpp"
using std::cout;
using std::endl;
// query: {{query}}
// BARRIER!
namespace barrier
{
class {{class_name}} : public ICodeCPU<{{stream}}>
{
public:
bool run(Db &db, code_args_t &args,
{{stream}} &stream) override
{
{{code}}
}
~{{class_name}}() {}
};
}
extern "C" ICodeCPU<barrier::{{stream}}>* produce()
{
// BARRIER!
return new barrier::{{class_name}}();
}
extern "C" void destruct(ICodeCPU<barrier::{{stream}}>* p)
{
delete p;
}

View File

@ -10,10 +10,6 @@ using std::endl;
// query: {{query}}
// BARRIER!
namespace barrier
{
class {{class_name}} : public ICodeCPU<{{stream}}>
{
public:
@ -27,16 +23,12 @@ public:
~{{class_name}}() {}
};
}
extern "C" ICodeCPU<barrier::{{stream}}>* produce()
extern "C" ICodeCPU<{{stream}}>* produce()
{
// BARRIER!
return new barrier::{{class_name}}();
return new {{class_name}}();
}
extern "C" void destruct(ICodeCPU<barrier::{{stream}}>* p)
extern "C" void destruct(ICodeCPU<{{stream}}>* p)
{
delete p;
}

View File

@ -111,6 +111,7 @@ set_property(TARGET manual_query_engine PROPERTY CXX_STANDARD 14)
add_executable(manual_query_hasher manual/query_hasher.cpp)
target_link_libraries(manual_query_hasher memgraph)
target_link_libraries(manual_query_hasher ${fmt_static_lib})
target_link_libraries(manual_query_hasher Threads::Threads)
set_property(TARGET manual_query_hasher PROPERTY CXX_STANDARD 14)
# query_action_processor