Hardcoded query infrastructure - first concrete version - USEFUL FOR: POCs & pilots

Summary: Hardcoded query infrastructure - first concrete version - USEFUL FOR: POCs & pilots

Test Plan: manual + jenkins

Reviewers: sale, florijan

Reviewed By: florijan

Subscribers: pullbot, buda

Differential Revision: https://phabricator.memgraph.io/D45
This commit is contained in:
Marko Budiselic 2017-02-14 09:37:32 +01:00
parent 6d981c9cd0
commit 0fcda94162
85 changed files with 1456 additions and 885 deletions

View File

@ -43,6 +43,7 @@ set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++1y")
# dir variables
set(src_dir ${CMAKE_SOURCE_DIR}/src)
set(libs_dir ${CMAKE_SOURCE_DIR}/libs)
set(tests_dir ${CMAKE_SOURCE_DIR}/tests)
set(include_dir ${CMAKE_SOURCE_DIR}/include)
set(build_include_dir ${CMAKE_BINARY_DIR}/include)
set(test_include_dir ${CMAKE_BINARY_DIR}/tests/include)
@ -92,10 +93,21 @@ FILE(RENAME ${CMAKE_BINARY_DIR}/cypher.h ${cypher_build_include_dir}/cypher.h)
# copy query_engine's templates file
FILE(COPY ${src_dir}/query_engine/template
DESTINATION ${CMAKE_BINARY_DIR}/tests)
FILE(COPY ${src_dir}/query_engine/template
DESTINATION ${CMAKE_BINARY_DIR}/tests/integration)
FILE(COPY ${src_dir}/query_engine/template
DESTINATION ${CMAKE_BINARY_DIR}/tests/manual)
FILE(COPY ${src_dir}/query_engine/template DESTINATION ${CMAKE_BINARY_DIR})
# create destination folder for compiled queries
FILE(MAKE_DIRECTORY ${CMAKE_BINARY_DIR}/tests/compiled/cpu)
FILE(MAKE_DIRECTORY ${CMAKE_BINARY_DIR}/compiled/cpu)
FILE(MAKE_DIRECTORY ${CMAKE_BINARY_DIR}/tests/integration/compiled)
FILE(MAKE_DIRECTORY ${CMAKE_BINARY_DIR}/tests/manual/compiled)
FILE(MAKE_DIRECTORY ${CMAKE_BINARY_DIR}/tests/compiled)
FILE(MAKE_DIRECTORY ${CMAKE_BINARY_DIR}/compiled)
# copy hardcoded queries
FILE(COPY ${tests_dir}/integration/hardcoded_query
DESTINATION ${CMAKE_BINARY_DIR}/tests/integration)
FILE(COPY ${tests_dir}/integration/stream
DESTINATION ${CMAKE_BINARY_DIR}/tests/integration)
# -----------------------------------------------------------------------------
# copy files needed for query engine (headers)

View File

@ -32,8 +32,8 @@ FILE(COPY ${include_dir}/storage/type_group_vertex.hpp DESTINATION ${build_inclu
FILE(COPY ${include_dir}/storage/edge_x_vertex.hpp DESTINATION ${build_include_dir}/storage)
FILE(COPY ${include_dir}/query/util.hpp DESTINATION ${build_include_dir}/query)
FILE(COPY ${include_dir}/query/i_plan_cpu.hpp DESTINATION ${build_include_dir}/query)
FILE(COPY ${include_dir}/query/strip/stripped.hpp DESTINATION ${build_include_dir}/query/strip)
FILE(COPY ${include_dir}/query/plan_interface.hpp DESTINATION ${build_include_dir}/query)
FILE(COPY ${include_dir}/query/stripper.hpp DESTINATION ${build_include_dir}/query)
FILE(COPY ${include_dir}/data_structures/concurrent/concurrent_map.hpp DESTINATION ${build_include_dir}/data_structures/concurrent)
FILE(COPY ${include_dir}/data_structures/concurrent/concurrent_set.hpp DESTINATION ${build_include_dir}/data_structures/concurrent)

View File

@ -6,13 +6,10 @@
# (where the executable is runned)
# path to the codes which will be compiled
compile_cpu_path: "./compiled/cpu/"
compile_path: "./compiled/"
# path to the template (cpp) for codes generation
template_cpu_cpp_path: "./template/template_code_cpu_cpp"
# path to the template (hpp) for codes generation
template_cpu_hpp_path: "./template/template_code_cpu.hpp"
template_cpp_path: "./template/template_code_cpp"
# path to the folder with snapshots
snapshots_path: "snapshots"

View File

@ -22,8 +22,8 @@ public:
};
// -- all possible Memgraph's keys --
constexpr const char *COMPILE_CPU_PATH = "compile_cpu_path";
constexpr const char *TEMPLATE_CPU_CPP_PATH = "template_cpu_cpp_path";
constexpr const char *COMPILE_PATH = "compile_path";
constexpr const char *TEMPLATE_CPP_PATH = "template_cpp_path";
constexpr const char *SNAPSHOTS_PATH = "snapshots_path";
constexpr const char *CLEANING_CYCLE_SEC = "cleaning_cycle_sec";
constexpr const char *SNAPSHOT_CYCLE_SEC = "snapshot_cycle_sec";

View File

@ -85,7 +85,8 @@ public:
Accessor access() { return Accessor(&skiplist); }
const Accessor access() const { return Accessor(&skiplist); }
// TODO:
// const Accessor access() const { return Accessor(&skiplist); }
private:
list skiplist;

View File

@ -6,13 +6,13 @@
#include "memory/freelist.hpp"
#include "memory/lazy_gc.hpp"
#include "threading/sync/spinlock.hpp"
#include "logging/default.hpp"
#include "logging/loggable.hpp"
template <class T, class lock_t = SpinLock>
class SkiplistGC : public LazyGC<SkiplistGC<T, lock_t>, lock_t>
class SkiplistGC : public LazyGC<SkiplistGC<T, lock_t>, lock_t>, public Loggable
{
public:
SkiplistGC() : logger(logging::log->logger("SkiplistGC")) {}
SkiplistGC() : Loggable("SkiplistGC") {}
// release_ref method should be called by a thread
// when the thread finish it job over object
@ -53,9 +53,6 @@ public:
void collect(T *node) { freelist.add(node); }
protected:
Logger logger;
private:
FreeList<T> freelist;
};

View File

@ -1,93 +0,0 @@
#pragma once
#include <iostream>
#include <string>
#include <stdexcept>
#include <dlfcn.h>
#include <atomic>
using std::cout;
using std::endl;
template<typename T>
class DynamicLib
{
private:
// IMPORTANT: all dynamic libraries must have produce and destruct methods!
const std::string produce_name = "produce";
const std::string destruct_name = "destruct";
using produce_t = typename T::produce;
using destruct_t = typename T::destruct;
public:
produce_t produce_method;
destruct_t destruct_method;
DynamicLib(const std::string& lib_path) :
lib_path(lib_path),
lib_object(nullptr)
{
load();
}
typename T::lib_object* instance()
{
// TODO singleton, concurrency
if (lib_object == nullptr) {
lib_object = this->produce_method();
}
return lib_object;
}
void load()
{
load_lib();
load_produce_func();
load_destruct_func();
}
~DynamicLib()
{
if (lib_object != nullptr) {
destruct_method(lib_object);
}
}
private:
std::string lib_path;
void *dynamic_lib;
typename T::lib_object *lib_object;
void load_lib()
{
dynamic_lib = dlopen(lib_path.c_str(), RTLD_LAZY);
if (!dynamic_lib) {
throw std::runtime_error(dlerror());
}
dlerror();
}
void load_produce_func()
{
produce_method = (produce_t) dlsym(
dynamic_lib,
produce_name.c_str()
);
const char* dlsym_error = dlerror();
if (dlsym_error) {
throw std::runtime_error(dlsym_error);
}
}
void load_destruct_func()
{
destruct_method = (destruct_t) dlsym(
dynamic_lib,
destruct_name.c_str()
);
const char *dlsym_error = dlerror();
if (dlsym_error) {
throw std::runtime_error(dlsym_error);
}
}
};

View File

@ -2,16 +2,28 @@
#include "logging/default.hpp"
/**
* @class Loggable
*
* @brief Base class that could be used in all classed which need a logging
* functionality.
*/
class Loggable
{
public:
Loggable(std::string &&name)
: logger(logging::log->logger(std::forward<std::string>(name)))
/**
* Sets logger name.
*/
Loggable(const std::string &name)
: logger(logging::log->logger(name))
{
}
virtual ~Loggable() {}
protected:
/**
* Logger instance that can be used only from derived classes.
*/
Logger logger;
};

View File

@ -15,7 +15,7 @@ public:
CypherBackend() : logger(logging::log->logger("CypherBackend"))
{
// load template file
std::string template_path = CONFIG(config::TEMPLATE_CPU_CPP_PATH);
std::string template_path = CONFIG(config::TEMPLATE_CPP_PATH);
template_text = utils::read_text(fs::path(template_path));
}
@ -38,7 +38,7 @@ public:
// save the code
std::string generated = template_engine::render(
template_text.str(), {{"class_name", "CodeCPU"},
template_text.str(), {{"class_name", "CPUPlan"},
{"stripped_hash", std::to_string(stripped_hash)},
{"query", query},
{"stream", type_name<Stream>().to_string()},

View File

@ -1,21 +0,0 @@
#pragma once
#include "query/i_plan_cpu.hpp"
#include "dc/dynamic_lib.hpp"
namespace
{
template <typename Stream>
class MemgraphDynamicLib
{
public:
using produce = produce_t<Stream>;
using destruct = destruct_t<Stream>;
using lib_object = IPlanCPU<Stream>;
};
template <typename Stream>
using CodeLib = DynamicLib<MemgraphDynamicLib<Stream>>;
}

View File

@ -1,68 +1,223 @@
#pragma once
#include <experimental/filesystem>
#include "database/db.hpp"
#include "logging/default.hpp"
#include "query/exception/query_engine.hpp"
#include "query/plan/program.hpp"
#include "query/plan/program_loader.hpp"
#include "query/plan/program_executor.hpp"
/*
* Current arhitecture:
* query -> code_loader -> query_stripper -> [code_generator]
* -> [code_compiler] -> code_executor
*/
namespace fs = std::experimental::filesystem;
// query engine has to be aware of the Stream because Stream
// is passed to the dynamic shared library
template <typename Stream>
class QueryEngine
{
public:
QueryEngine() : logger(logging::log->logger("QueryEngine")) {}
#include "database/db.hpp"
#include "logging/loggable.hpp"
#include "query/exception/query_engine.hpp"
#include "query/plan_compiler.hpp"
#include "query/plan_generator.hpp"
#include "query/plan_interface.hpp"
#include "query/preprocessor.hpp"
#include "utils/dynamic_lib.hpp"
#include "data_structures/concurrent/concurrent_map.hpp"
auto execute(const std::string &query, Db &db, Stream &stream)
// TODO: replace with openCypher and Antlr
#include "query/frontend/cypher.hpp"
// TODO: depricated
#include "query/backend/cpp_old/cypher.hpp"
/**
* Responsible for query execution. Depends on Db and compiler.
*
* Current Query Engine arhitecture:
* query -> query_stripper -> [plan_generator] -> [plan_compiler] -> execution
*
* @tparam Stream the query engine has to be aware of the Stream because Stream
* is passed to the dynamic shared library because that is the way how
* the results should be returned (more optimal then just return
* the whole result set)
*/
template <typename Stream>
class QueryEngine : public Loggable
{
private:
using QueryPlanLib = DynamicLib<QueryPlanTrait<Stream>>;
using HashT = QueryPreprocessor::HashT;
public:
QueryEngine() : Loggable("QueryEngine") {}
/**
* Reloads query plan (plan_path contains compiled query plan).
* This methdo just calculates stripped query and offloads everything else
* to the LoadCpp method.
*
* @param query a query for which the plan will be loaded
* @param plan_path a custom made cpp query plan
*
* @return void
*/
auto ReloadCustom(const std::string &query, const fs::path &plan_path)
{
try {
auto program = program_loader.load(query);
auto result = program_executor.execute(program, db, stream);
if (UNLIKELY(!result)) {
auto preprocessed = preprocessor.preprocess(query);
Unload(query);
LoadCpp(plan_path, preprocessed.hash);
}
/**
* Executes query on the database with the stream. If a query plan is cached
* (based on query hash) it will be used.
*
* @param query a query that is going to be executed
* @param db database againt the query is going to be executed
* @param stream the resuts will be send to the stream
*
* @return query execution status:
* false if query wasn't executed successfully
* true if query execution was successfull
*/
auto Run(const std::string &query, Db &db, Stream &stream)
{
try
{
auto preprocessed = preprocessor.preprocess(query);
auto plan = LoadCypher(preprocessed);
auto result = plan->run(db, preprocessed.arguments, stream);
if (UNLIKELY(!result))
{
// info because it might be something like deadlock in which
// case one thread is stopped and user has try again
logger.info(
"Unable to execute query (executor returned false)");
"Unable to execute query (execution returned false)");
}
return result;
} catch (CypherLexicalError &e) {
}
catch (CypherLexicalError &e)
{
logger.error("CypherLexicalError: {}", std::string(e.what()));
throw e;
} catch (QueryEngineException &e) {
}
catch (QueryEngineException &e)
{
logger.error("QueryEngineException: {}", std::string(e.what()));
throw e;
} catch (std::exception &e) {
throw e;
}
catch (std::exception &e)
{
throw BasicException(e.what());
}
catch (...)
{
throw BasicException("unknown query engine exception");
}
}
// preload functionality
auto load(const uint64_t hash, const fs::path& path)
/**
* Unloads query plan and release the resources (should be automatically).
*
* @param query a query for which the query plan will be unloaded.
*
* return bool is the plan unloaded
*/
auto Unload(const std::string &query)
{
program_loader.load(hash, path);
return query_plans.access().remove(preprocessor.preprocess(query).hash);
}
auto load(const std::string& query)
/**
* Checks is a plan for the query loaded.
*
* @param query for which a plan existance will be checked
*
* return bool
*/
auto Loaded(const std::string &query)
{
program_loader.load(query);
auto plans_accessor = query_plans.access();
return plans_accessor.find(preprocessor.preprocess(query).hash) !=
plans_accessor.end();
}
protected:
Logger logger;
/**
* The number of loaded query plans.
*
* @return size_t the number of loaded query plans
*/
auto Size() { // TODO: const once whan ConcurrentMap::Accessor becomes const
return query_plans.access().size();
}
// return query_plans.access().size(); }
private:
ProgramExecutor<Stream> program_executor;
ProgramLoader<Stream> program_loader;
/**
* Loads query plan eather from hardcoded folder or from the file that is
* generated in this method.
*
* @param stripped a stripped query
*
* @return runnable query plan
*/
auto LoadCypher(const StrippedQuery<HashT> &stripped)
{
auto plans_accessor = query_plans.access();
// code is already compiled and loaded, just return runnable
// instance
auto query_plan_it = plans_accessor.find(stripped.hash);
if (query_plan_it != plans_accessor.end())
return query_plan_it->second->instance();
// find hardcoded query plan if exists
auto hardcoded_path =
fs::path(CONFIG(config::COMPILE_PATH) + "hardcode/" +
std::to_string(stripped.hash) + ".cpp");
if (fs::exists(hardcoded_path))
return LoadCpp(hardcoded_path, stripped.hash);
// generate query plan
auto generated_path =
fs::path(CONFIG(config::COMPILE_PATH) + std::to_string(stripped.hash) + ".cpp");
plan_generator.generate_plan(stripped.query, stripped.hash,
generated_path);
return LoadCpp(generated_path, stripped.hash);
}
/**
* Load cpp query plan from a file. Or if plan is already cached from the
* cache.
*
* @param path_cpp a path to query plan
* @param hash query hash
*
* @return runnable query plan
*/
auto LoadCpp(const fs::path &path_cpp, const QueryPreprocessor::HashT hash)
{
auto plans_accessor = query_plans.access();
// code is already compiled and loaded, just return runnable
// instance
auto query_plan_it = plans_accessor.find(hash);
if (query_plan_it != plans_accessor.end())
return query_plan_it->second->instance();
// generate dynamic lib path
// The timestamp has been added here because dlopen
// uses path and returns the same handler for the same path
// and that is a problem because at this point we want brand new
// dynamic lib. That is the tmp solution. The right solution would be
// to deal with this problem in DynamicLib
auto path_so = CONFIG(config::COMPILE_PATH) + std::to_string(hash) +
"_" + (std::string)Timestamp::now() + ".so";
plan_compiler.compile(path_cpp, path_so);
auto query_plan = std::make_unique<QueryPlanLib>(path_so);
// TODO: underlying object has to be live during query execution
// fix that when Antler will be introduced into the database
auto query_plan_instance = query_plan->instance(); // because of move
plans_accessor.insert(hash, std::move(query_plan));
// return an instance of runnable code (PlanInterface)
return query_plan_instance;
}
QueryPreprocessor preprocessor;
PlanGenerator<cypher::Frontend, CypherBackend<Stream>> plan_generator;
PlanCompiler plan_compiler;
ConcurrentMap<QueryPreprocessor::HashT, std::unique_ptr<QueryPlanLib>>
query_plans;
};

View File

@ -1,20 +0,0 @@
#pragma once
#include "communication/communication.hpp"
#include "database/db.hpp"
#include "database/db_accessor.hpp"
#include "query/strip/stripped.hpp"
template <typename Stream>
class IPlanCPU
{
public:
virtual bool run(Db &db, plan_args_t &args, Stream &stream) = 0;
virtual ~IPlanCPU() {}
};
template <typename Stream>
using produce_t = IPlanCPU<Stream> *(*)();
template <typename Stream>
using destruct_t = void (*)(IPlanCPU<Stream> *);

View File

@ -1,27 +0,0 @@
#pragma once
#include <memory>
#include <vector>
#include "query/backend/backend.hpp"
namespace ir
{
class Node
{
public:
virtual ~Node() {}
virtual void accept(Backend* visitor)
{
for (auto &child : childs)
{
visitor->process(child.get());
}
}
std::vector<std::unique_ptr<Node>> childs;
Node *parent;
};
}

View File

@ -1,5 +0,0 @@
#pragma once
class Query : public Node
{
};

View File

@ -1,26 +0,0 @@
#pragma once
#include "query/i_plan_cpu.hpp"
#include "query/strip/stripped.hpp"
/*
* Query Program Contains:
* * Query Plan
* * Query Arguments (Stripped)
*/
template <typename Stream>
struct QueryProgram
{
using plan_t = IPlanCPU<Stream>;
QueryProgram(plan_t *plan, QueryStripped &&stripped)
: plan(plan), stripped(std::forward<QueryStripped>(stripped))
{
}
QueryProgram(QueryProgram &other) = delete;
QueryProgram(QueryProgram &&other) = default;
plan_t *plan;
QueryStripped stripped;
};

View File

@ -1,30 +0,0 @@
#pragma once
#include <string>
#include "database/db.hpp"
#include "query/exception/query_engine.hpp"
#include "query/exception/plan_execution.hpp"
#include "query/plan/program.hpp"
#include "query/util.hpp"
// preparations before execution
// execution
// postprocess the results
template <typename Stream>
class ProgramExecutor
{
public:
// QueryProgram has to know about the Stream
// Stream has to be passed in this function for every execution
auto execute(QueryProgram<Stream> &program, Db &db, Stream &stream)
{
try {
return program.plan->run(db, program.stripped.arguments, stream);
// TODO: catch more exceptions
} catch (...) {
throw PlanExecutionException("");
}
}
};

View File

@ -1,114 +0,0 @@
#pragma once
#include <memory>
#include <string>
#include <unordered_map>
#include <experimental/filesystem>
#include "config/config.hpp"
#include "logging/default.hpp"
#include "query/backend/cpp_old/cypher.hpp"
#include "query/dynamic_lib.hpp"
#include "query/frontend/cypher.hpp"
#include "query/plan/compiler.hpp"
#include "query/plan/generator.hpp"
#include "query/plan/program.hpp"
#include "query/preprocesor.hpp"
#include "utils/file.hpp"
#include "utils/hashing/fnv.hpp"
namespace fs = std::experimental::filesystem;
template <typename Stream>
class ProgramLoader
{
public:
using code_lib_t = CodeLib<Stream>;
using sptr_code_lib = std::shared_ptr<code_lib_t>;
using query_program_t = QueryProgram<Stream>;
ProgramLoader() : logger(logging::log->logger("PlanLoader")) {}
// TODO: decouple load(query) method
auto load(const uint64_t hash, const fs::path &path)
{
// TODO: get lib path (that same folder as path folder or from config)
// TODO: compile
// TODO: dispose the old lib
// TODO: store the compiled lib
}
auto load(const std::string &query)
{
auto preprocessed = preprocessor.preprocess(query);
logger.debug("stripped_query = {}", preprocessed.query);
logger.debug("query_hash = {}", std::to_string(preprocessed.hash));
auto code_lib_iter = code_libs.find(preprocessed.hash);
// code is already compiled and loaded, just return runnable
// instance
if (code_lib_iter != code_libs.end()) {
auto code = code_lib_iter->second->instance();
return query_program_t(code, std::move(preprocessed));
}
auto base_path = CONFIG(config::COMPILE_CPU_PATH);
auto hash_string = std::to_string(preprocessed.hash);
auto path_cpp = base_path + hash_string + ".cpp";
auto hard_code_cpp = base_path + "hardcode/" + hash_string + ".cpp";
auto stripped_space = preprocessor.strip_space(query);
// cpp files in the hardcode folder have bigger priority then
// other cpp files
if (!utils::fexists(hard_code_cpp)) {
plan_generator.generate_plan(stripped_space.query,
preprocessed.hash, path_cpp);
}
// compile the code
auto path_so = base_path + hash_string + ".so";
// hardcoded queries are compiled to the same folder as generated
// queries (all .so files are in the same folder)
if (utils::fexists(hard_code_cpp)) {
plan_compiler.compile(hard_code_cpp, path_so);
} else {
plan_compiler.compile(path_cpp, path_so);
}
// loads dynamic lib and store it
auto code_lib = load_code_lib(path_so);
code_libs.insert({{preprocessed.hash, code_lib}});
// return an instance of runnable code (ICodeCPU)
return query_program_t(code_lib->instance(), std::move(preprocessed));
}
protected:
Logger logger;
private:
// TODO ifdef MEMGRAPH64 problem, how to use this kind
// of ifdef functions?
// uint64_t depends on fnv function
// TODO: faster datastructure
std::unordered_map<uint64_t, sptr_code_lib> code_libs;
QueryPreprocessor preprocessor;
// TODO: compile time switch between frontends and backends
using frontend_t = cypher::Frontend;
using backend_t = CypherBackend<Stream>;
PlanGenerator<frontend_t, backend_t> plan_generator;
PlanCompiler plan_compiler;
sptr_code_lib load_code_lib(const std::string &path)
{
sptr_code_lib code_lib = std::make_shared<CodeLib<Stream>>(path);
code_lib->load();
return code_lib;
}
};

View File

@ -5,16 +5,28 @@
#include "logging/default.hpp"
#include "query/exception/plan_compilation.hpp"
#include "utils/string/join.hpp"
#include "logging/loggable.hpp"
// TODO:
// * all libraries have to be compiled in the server compile time
// * compile command has to be generated
class PlanCompiler
/**
* Compiles code into shared object (.so)
*/
class PlanCompiler : public Loggable
{
public:
PlanCompiler() : logger(logging::log->logger("PlanCompiler")) {}
PlanCompiler() : Loggable("PlanCompiler") {}
/**
* Compiles in_file into out_file (.cpp -> .so)
*
* @param in_file C++ file that can be compiled into dynamic lib
* @param out_file dynamic lib (on linux .so)
*
* @return void
*/
void compile(const std::string &in_file, const std::string &out_file)
{
std::string flags;
@ -39,17 +51,22 @@ public:
flags += " -DLOG_NO_ERROR";
#endif
// TODO: load from config (generate compile command)
// generate compile command
auto compile_command = utils::prints(
"clang++" + flags,
// "-std=c++1y -O2 -DNDEBUG", // compile flags
"-std=c++1y", // compile flags // TODO: load from config file
// "-std=c++1y -O2 -DNDEBUG",
"-std=c++1y", // compile flags
in_file, // input file
"-o", out_file, // ouput file
"-I./include", // include paths (TODO: parameter)
"-I./include", // include paths
"-I../include",
"-I../libs/fmt", // TODO: load from config
"-I../../libs/fmt", "-L./ -L../",
"-I../../include",
"-I../../../include",
"-I../libs/fmt",
"-I../../libs/fmt",
"-I../../../libs/fmt",
"-L./ -L../ -L../../",
"-lmemgraph_pic",
"-shared -fPIC" // shared library flags
);
@ -59,8 +76,10 @@ public:
// synchronous call
auto compile_status = system(compile_command.c_str());
logger.debug("compile status {}", compile_status);
// if compilation has failed throw exception
if (compile_status == -1) {
if (compile_status != 0) {
logger.debug("FAIL: Query Code Compilation: {} -> {}", in_file,
out_file);
throw PlanCompilationException(
@ -71,7 +90,4 @@ public:
logger.debug("SUCCESS: Query Code Compilation: {} -> {}", in_file,
out_file);
}
protected:
Logger logger;
};

View File

@ -0,0 +1,50 @@
#pragma once
#include "communication/communication.hpp"
#include "database/db.hpp"
#include "database/db_accessor.hpp"
#include "query/stripped.hpp"
/**
* @class PlanInterface
*
* @brief Pure Abstract class that is interface to query plans.
*
* @tparam Stream stream for results writing.
*/
template <typename Stream>
class PlanInterface
{
public:
/**
* In derived classes this method has to be overriden in order to implement
* concrete execution plan.
*
* @param db active database instance
* @param args plan argument (vector of literal values from the query)
* @param stream stream for results writing
*
* @return bool status after execution (success OR fail)
*/
virtual bool run(Db &db, const PlanArgsT &args, Stream &stream) = 0;
/**
* Virtual destructor in base class.
*/
virtual ~PlanInterface() {}
};
/**
* Defines type of underlying extern C functions and library object class name.
* (ObjectPrototype).
*
* @tparam underlying object depends on Stream template parameter because
* it will send the results throughout a custom Stream object.
*/
template <typename Stream>
struct QueryPlanTrait
{
using ObjectPrototype = PlanInterface<Stream>;
using ProducePrototype = PlanInterface<Stream> *(*)();
using DestructPrototype = void (*)(PlanInterface<Stream> *);
};

View File

@ -1,36 +0,0 @@
#pragma once
#include "query/strip/stripper.hpp"
/*
* Query preprocessing contains:
* * query stripping
*
* The preprocessing results are:
* * stripped query |
* * stripped arguments |-> QueryStripped
* * stripped query hash |
*/
class QueryPreprocessor
{
public:
QueryPreprocessor()
: stripper(make_query_stripper(TK_LONG, TK_FLOAT, TK_STR, TK_BOOL))
{
}
auto strip_space(const std::string& query)
{
return stripper.strip_space(query);
}
auto preprocess(const std::string &query) { return stripper.strip(query); }
private:
// In C++17 the ints should be unnecessary?
// as far as I understand in C++17 class template parameters
// can be deduced just like function template parameters
// TODO: once C++ 17 will be well suported by comilers
// refactor this piece of code
QueryStripper<int, int, int, int> stripper;
};

View File

@ -0,0 +1,59 @@
#pragma once
#include "logging/loggable.hpp"
#include "query/stripper.hpp"
/*
* Query preprocessing contains:
* * query stripping
*
* This class is here because conceptually process of query preprocessing
* might have more than one step + in current version of C++ standard
* isn't trivial to instantiate QueryStripper because of template arguments +
* it depends on underlying lexical analyser.
*
* The preprocessing results are:
* * stripped query |
* * stripped arguments |-> StrippedQuery
* * stripped query hash |
*/
class QueryPreprocessor : public Loggable
{
private:
// In C++17 the ints will be removed.
// as far as I understand in C++17 class template parameters
// can be deduced just like function template parameters
// TODO: once C++ 17 will be well suported by comilers
// refactor this piece of code because it is hard to maintain.
using QueryStripperT = QueryStripper<int, int, int, int>;
public:
using HashT = QueryStripperT::HashT;
QueryPreprocessor() : Loggable("QueryPreprocessor"),
stripper(make_query_stripper(TK_LONG, TK_FLOAT, TK_STR, TK_BOOL))
{
}
/**
* Preprocess the query:
* * strip parameters
* * calculate query hash
*
* @param query that is going to be stripped
*
* @return QueryStripped object
*/
auto preprocess(const std::string &query)
{
auto preprocessed = stripper.strip(query);
logger.info("stripped_query = {}", preprocessed.query);
logger.info("query_hash = {}", preprocessed.hash);
return stripper.strip(query);
}
private:
QueryStripperT stripper;
};

View File

@ -1,32 +0,0 @@
#pragma once
#include <vector>
#include "storage/model/properties/property.hpp"
/*
* Query Plan Arguments Type
*/
using plan_args_t = std::vector<Property>;
/*
* QueryStripped contains:
* * stripped query
* * plan arguments stripped from query
* * hash of stripped query
*/
struct QueryStripped
{
QueryStripped(const std::string &&query, plan_args_t &&arguments,
uint64_t hash)
: query(std::forward<const std::string>(query)),
arguments(std::forward<plan_args_t>(arguments)), hash(hash)
{
}
QueryStripped(QueryStripped &other) = delete;
QueryStripped(QueryStripped &&other) = default;
std::string query;
plan_args_t arguments;
uint64_t hash;
};

View File

@ -0,0 +1,55 @@
#pragma once
#include "storage/model/properties/property.hpp"
/*
* Query Plan Arguments Type
*/
using PlanArgsT = std::vector<Property>;
/*
* StrippedQuery contains:
* * stripped query
* * plan arguments stripped from query
* * hash of stripped query
*
* @tparam THash a type of query hash
*/
template <typename THash>
struct StrippedQuery
{
StrippedQuery(const std::string &&query, PlanArgsT &&arguments, THash hash)
: query(std::forward<const std::string>(query)),
arguments(std::forward<PlanArgsT>(arguments)), hash(hash)
{
}
/**
* Copy constructor is deleted because we don't want to make unecessary
* copies of this object (copying of string and vector could be expensive)
*/
StrippedQuery(const StrippedQuery &other) = delete;
StrippedQuery &operator=(const StrippedQuery &other) = delete;
/**
* Move is allowed operation because it is not expensive and we can
* move the object after it was created.
*/
StrippedQuery(StrippedQuery &&other) = default;
StrippedQuery &operator=(StrippedQuery &&other) = default;
/**
* Stripped query
*/
const std::string query;
/**
* Stripped arguments
*/
const PlanArgsT arguments;
/**
* Hash based on stripped query.
*/
const THash hash;
};

View File

@ -1,5 +1,6 @@
#pragma once
#include <vector>
#include <iostream>
#include <string>
#include <tuple>
@ -9,23 +10,42 @@
#include "cypher/cypher.h"
#include "logging/loggable.hpp"
#include "query/language/cypher/tokenizer/cypher_lexer.hpp"
#include "query/strip/stripped.hpp"
#include "storage/model/properties/all.hpp"
#include "utils/hashing/fnv.hpp"
#include "utils/string/transform.hpp"
#include "utils/variadic/variadic.hpp"
#include "query/stripped.hpp"
// TODO: all todos will be resolved once Antler will be integrated
// TODO: Maybe std::move(v) is faster, but it must be cheked for validity.
template <class T, class V>
void store_query_param(plan_args_t &arguments, V &&v)
void store_query_param(PlanArgsT &arguments, V &&v)
{
arguments.emplace_back(Property(T(std::move(v)), T::type));
}
// TODO: hash function should be a template parameter
// TODO: move StrippedQuery into this file, maybe into QueryStripper object
/**
* @class QueryStripper
*
* @brief QueryStripper is responsible for the query stripping
* (taking literal values from the query) and for hash calculation based
* on the stripped query. The whole task is done at once and StrippedQuery
* object is returned as a result.
*
* @tparam Ts type of token ids from underlying lexical analyser. The
* lexical analyser is needed because we have to know what is and
* what isn't a literal value.
*/
template <typename... Ts>
class QueryStripper : public Loggable
{
public:
using HashT = uint64_t;
QueryStripper(Ts &&... strip_types)
: Loggable("QueryStripper"),
strip_types(std::make_tuple(std::forward<Ts>(strip_types)...)),
@ -41,9 +61,7 @@ public:
{
}
auto strip_space(const std::string &query) { return strip(query, " "); }
auto strip(const std::string &query, const std::string &separator = "")
auto strip(const std::string &query, const std::string &separator = " ")
{
// -------------------------------------------------------------------
// TODO: write speed tests and then optimize, because this
@ -60,7 +78,7 @@ public:
constexpr auto size = std::tuple_size<decltype(strip_types)>::value;
int counter = 0;
plan_args_t stripped_arguments;
PlanArgsT stripped_arguments;
std::string stripped_query;
stripped_query.reserve(query.size());
@ -104,6 +122,9 @@ public:
// should be the same
// TODO: probably we shoud do the lowercase before
// or during the tokenization (SPEED TESTS)
// TODO: stripped shouldn't be responsible for the process
// of lowercasing -> reorganize this in the process of
// Antlr integration
if (token.id == TK_OR || token.id == TK_AND ||
token.id == TK_NOT || token.id == TK_WITH ||
token.id == TK_SET || token.id == TK_CREATE ||
@ -120,10 +141,9 @@ public:
}
}
// TODO: hash function should be a template parameter
auto hash = fnv(stripped_query);
return QueryStripped(std::move(stripped_query),
std::move(stripped_arguments), hash);
return StrippedQuery<HashT>(std::move(stripped_query),
std::move(stripped_arguments), hash);
}
private:
@ -134,7 +154,7 @@ private:
bool _or(Value &&value, Tuple &&tuple, std::index_sequence<index...>)
{
return utils::or_vargs(std::forward<Value>(value),
std::get<index>(std::forward<Tuple>(tuple))...);
std::get<index>(std::forward<Tuple>(tuple))...);
}
};

View File

@ -45,16 +45,17 @@ class ProgramArguments {
bool is_valid() {
for (const auto &arg : required_arguments_)
for (const auto &a : arguments_)
if (!arguments_.count(arg)) return false;
return true;
}
protected:
ProgramArguments() {}
~ProgramArguments() {}
ProgramArguments() {
}
public:
~ProgramArguments() {
}
class Argument {
private:
std::string arg_;

View File

@ -0,0 +1,122 @@
#pragma once
#include <dlfcn.h>
#include <experimental/filesystem>
#include <stdexcept>
#include <string>
namespace fs = std::experimental::filesystem;
#include "logging/loggable.hpp"
/**
* DynamicLib is a wrapper aroung dynamic object returned by dlopen.
*
* Dynamic object must have extern C functions which names should be
* "produce" and "destruct" (that's by convention).
*
* The functions prototypes can be defined with template parameter
* type trait (T).
*
* DynamicLib isn't implemented for concurrent access.
*
* @tparam T type trait which defines the prototypes of extern C functions
* of undelying dynamic object.
*/
template <typename T>
class DynamicLib : public Loggable
{
public:
/**
* Initializes dynamic library (loads lib, produce and
* destruct functions)
*
* @param lib_path file system path to dynamic library
*/
DynamicLib(const fs::path &lib_path)
: Loggable("DynamicLib"), lib_path(lib_path), lib_object(nullptr)
{
// load dynamic lib
dynamic_lib = dlopen(lib_path.c_str(), RTLD_NOW);
if (!dynamic_lib) throw std::runtime_error(dlerror());
dlerror(); /* Clear any existing error */
logger.trace("dynamic lib at ADDRESS({}) was opened", dynamic_lib);
// load produce method
this->produce_method =
(typename T::ProducePrototype)dlsym(dynamic_lib, "produce");
const char *dlsym_produce_error = dlerror();
if (dlsym_produce_error) throw std::runtime_error(dlsym_produce_error);
// load destruct method
this->destruct_method =
(typename T::DestructPrototype)dlsym(dynamic_lib, "destruct");
const char *dlsym_destruct_error = dlerror();
if (dlsym_destruct_error)
throw std::runtime_error(dlsym_destruct_error);
}
// becuase we are dealing with pointers
// and conceptualy there is no need to copy the instance of this class
// the copy constructor and copy assignment operator are deleted
// the same applies to move methods
DynamicLib(const DynamicLib &other) = delete;
DynamicLib &operator=(const DynamicLib &other) = delete;
DynamicLib(DynamicLib &&other) = delete;
DynamicLib &operator=(DynamicLib &&other) = delete;
/**
* Singleton method. Returns the same instance of underlying class
* for every call. The instance of underlying class is returned by
* extern C produce function.
*
* @return T the instance of lib class
*/
typename T::ObjectPrototype *instance()
{
if (lib_object == nullptr) lib_object = this->produce_method();
return lib_object;
}
/**
* Clean underlying object and close the lib
*/
~DynamicLib()
{
// first destroy underlying object
if (lib_object != nullptr)
{
logger.trace("shared object at ADDRESS({}) will be destroyed",
(void *)lib_object);
this->destruct_method(lib_object);
}
// then destroy dynamic lib
logger.trace("unloading lib {}", lib_path.c_str());
if (dynamic_lib != nullptr)
{
logger.trace("closing dynamic lib ADDRESS({})",
(void *)dynamic_lib);
// // IMPORTANT: weird problem the program SEGFAULT on dlclose
// // TODO: FIX somehow
// // maybe something similar is:
// //
// http://stackoverflow.com/questions/6450828/segmentation-fault-when-using-dlclose-on-android-platform
// // for now it is not crucial so I've created a task for that
// // ! 0 is success
// int closing_status = dlclose(dynamic_lib);
// if (closing_status != 0)
// throw std::runtime_error("dynamic lib closing error");
}
else
{
logger.trace("unload lib was called but lib ptr is null");
}
}
private:
std::string lib_path;
void *dynamic_lib;
typename T::ObjectPrototype *lib_object;
typename T::ProducePrototype produce_method;
typename T::DestructPrototype destruct_method;
};

View File

@ -2,19 +2,59 @@
#include <fstream>
// TODO: remove experimental from here once that becomes possible (C++17
// standard)
#include <experimental/filesystem>
namespace fs = std::experimental::filesystem;
namespace utils
{
inline bool fexists(const char *filename)
/**
* Loads all file paths in the specified directory. Optionally
* the paths are filtered by extension.
*
* NOTE: the call isn't recursive
*
* @param directory a path to directory that will be scanned in order to find
* all paths
* @param extension paths will be filtered by this extension
*
* @return std::vector of paths founded in the directory
*/
inline auto LoadFilePaths(const fs::path &directory,
const std::string &extension = "")
{
std::ifstream ifile(filename);
return (bool)ifile;
}
// result container
std::vector<fs::path> file_paths;
inline bool fexists(const std::string& filename)
{
std::ifstream ifile(filename.c_str());
return (bool)ifile;
}
for (auto &directory_entry : fs::recursive_directory_iterator(directory))
{
auto path = directory_entry.path().string();
// skip directories
if (!fs::is_regular_file(directory_entry)) continue;
// if extension isn't defined then put all file paths from the directory
// to the result set
if (!extension.empty()) {
// skip paths that don't have appropriate extension
auto file_extension = path.substr(path.find_last_of(".") + 1);
if (file_extension != extension) continue;
}
file_paths.emplace_back(path);
// skip paths that don't have appropriate extension
auto file_extension = path.substr(path.find_last_of(".") + 1);
if (file_extension != extension) continue;
// path has the right extension and can be placed in the result
// container
file_paths.emplace_back(path);
}
return file_paths;
}
}

View File

@ -54,19 +54,38 @@ constexpr uint64_t IN_HEADER_SIZE = sizeof(struct inotify_event);
* a reasonable size and doesn't have to be configurable before compile or run
* time.
*/
constexpr uint64_t IN_BUFF_LEN = 10 * (IN_HEADER_SIZE + NAME_MAX + 1);
constexpr uint64_t IN_BUFF_SLOT_LEN = IN_HEADER_SIZE + NAME_MAX + 1;
constexpr uint64_t IN_BUFF_LEN = 10 * IN_BUFF_SLOT_LEN;
/**
* File System Event Type - abstraction for underlying event types
*/
enum class FSEventType : os_mask_t
{
Created = IN_CREATE,
Modified = IN_MODIFY,
Deleted = IN_DELETE,
All = Created | Modified | Deleted
Accessed = IN_ACCESS,
MetadataChanged = IN_ATTRIB,
CloseWrite = IN_CLOSE_WRITE,
CloseNowrite = IN_CLOSE_NOWRITE,
Created = IN_CREATE,
Deleted = IN_DELETE,
DeletedSelf = IN_DELETE_SELF,
Modified = IN_MODIFY,
Renamed = IN_MOVE_SELF,
MovedFrom = IN_MOVED_FROM,
MovedTo = IN_MOVED_TO,
Close = IN_CLOSE,
Opened = IN_OPEN,
Ignored = IN_IGNORED,
All = Accessed | MetadataChanged | CloseWrite | CloseNowrite | Created |
Deleted | DeletedSelf | Modified | Renamed | MovedFrom | MovedTo |
Close | Opened | Ignored
};
inline FSEventType operator|(FSEventType lhs, FSEventType rhs)
{
return (FSEventType)(underlying_cast(lhs) | underlying_cast(rhs));
}
/**
* @struct FSEventBase
*
@ -156,6 +175,8 @@ public:
FSWatcher(ms check_interval = ms(100))
: Loggable("FSWatcher"), check_interval_(check_interval)
{
logger.debug("Inotify header length: {}", IN_HEADER_SIZE);
logger.debug("Inotify buffer length: {}", IN_BUFF_LEN);
inotify_fd_ = inotify_init();
if (inotify_fd_ == -1)
throw FSWatcherException("Unable to initialize inotify\n");
@ -286,6 +307,8 @@ public:
*/
void start()
{
if (is_running_.load()) return;
is_running_.store(true);
// run separate thread
@ -313,9 +336,20 @@ public:
auto in_event = reinterpret_cast<in_event_t *>(p);
auto in_event_length = IN_HEADER_SIZE + in_event->len;
// sometimes inotify system returns event->len that is
// longer then the length of the buffer
// TODO: figure out why (it is not easy)
if (((p - buffer_) + in_event_length) > IN_BUFF_LEN) break;
// here should be an assertion
// runtime_assert(in_event_length <= IN_BUFF_SLOT_LEN,
// "Inotify event length cannot be bigger
// than "
// "Inotify slot length");
// skip if in_event is undefined OR is equal to IN_IGNORED
if ((in_event->len == 0 && in_event->mask == 0) ||
in_event->mask == IN_IGNORED)
in_event->mask == IN_IGNORED ||
in_event_length == IN_HEADER_SIZE) // skip empty paths
{
p += in_event_length;
continue;

View File

@ -1,8 +1,6 @@
#pragma once
#include <cerrno>
// TODO: remove experimental from here once that becomes possible
#include <experimental/filesystem>
#include <fstream>
#include <ostream>
#include <stdexcept>
@ -11,7 +9,8 @@
#include <fmt/format.h>
// TODO: remove experimental from here once it becomes possible
// TODO: remove experimental from here once that becomes possible
#include <experimental/filesystem>
namespace fs = std::experimental::filesystem;
namespace utils

View File

@ -0,0 +1,25 @@
#pragma once
#include <string>
namespace utils
{
/**
* Removes whitespace characters from the start and from the end of a string.
*
* @param str string that is going to be trimmed
*
* @return trimmed string
*/
std::string trim(const std::string &str)
{
size_t first = str.find_first_not_of(' ');
if (std::string::npos == first)
{
return str;
}
size_t last = str.find_last_not_of(' ');
return str.substr(first, (last - first + 1));
}
}

View File

@ -1,13 +0,0 @@
#pragma once
#include <string>
namespace web
{
class Client
{
void post(const std::string& url, const std::string& body);
};
}

View File

@ -1,21 +0,0 @@
#pragma once
#include <string>
namespace web
{
class Logger
{
public:
Logger() { /* create connection */ }
// TODO: singleton
void up_ping();
void send(const std::string&);
void down_ping();
};
}

View File

@ -87,7 +87,7 @@ State *Executor::run(Session &session, Query &query)
logger.debug("[ActiveDB] '{}'", db.name());
auto is_successfully_executed =
query_engine.execute(query.statement, db, session.output_stream);
query_engine.Run(query.statement, db, session.output_stream);
if (!is_successfully_executed)
{

View File

@ -9,6 +9,7 @@ namespace logging
{
// per object log source
// TODO: discussion
std::unique_ptr<Log> log;
void init_async()

View File

@ -2,8 +2,14 @@
#include "logging/log.hpp"
#include "logging/logger.hpp"
#include "utils/assert.hpp"
Logger Log::logger(const std::string& name)
Logger Log::logger(const std::string &name)
{
// TODO: once when properties are refactored enable this
// runtime_assert(this != nullptr,
// "This shouldn't be null. This method is "
// "called before the log object is created. "
// "E.g. static variables before main method.");
return Logger(this, name);
}

View File

@ -2,19 +2,19 @@
#include <string>
#include "query/util.hpp"
#include "query/i_plan_cpu.hpp"
#include "query/plan_interface.hpp"
#include "storage/model/properties/all.hpp"
using std::cout;
using std::endl;
// query: {{query}}
// Query: {{query}}
class {{class_name}} : public IPlanCPU<{{stream}}>
class {{class_name}} : public PlanInterface<{{stream}}>
{
public:
bool run(Db &db, plan_args_t &args,
bool run(Db &db, const PlanArgsT &args,
{{stream}} &stream) override
{
{{code}}
@ -23,12 +23,12 @@ public:
~{{class_name}}() {}
};
extern "C" IPlanCPU<{{stream}}>* produce()
extern "C" PlanInterface<{{stream}}>* produce()
{
return new {{class_name}}();
}
extern "C" void destruct(IPlanCPU<{{stream}}>* p)
extern "C" void destruct(PlanInterface<{{stream}}>* p)
{
delete p;
}

View File

@ -2,6 +2,7 @@
#include "storage/type_group_edge.hpp"
#include "storage/type_group_vertex.hpp"
#include "utils/assert.hpp"
template <class T>
PropertyFamily<T>::PropertyFamily(std::string const &name_v)

View File

@ -1,10 +0,0 @@
#include "web/client.hpp"
namespace web
{
void Client::post(const std::string&, const std::string&)
{
}
}

View File

@ -1,18 +0,0 @@
#include "web/logger.hpp"
namespace web
{
void Logger::up_ping()
{
}
void Logger::send(const std::string&)
{
}
void Logger::down_ping()
{
}
}

View File

@ -1,6 +1,6 @@
#include "logging/default.hpp"
#include "logging/streams/stdout.hpp"
#include "query/preprocesor.hpp"
#include "query/preprocessor.hpp"
#include "utils/time/timer.hpp"
#include "benchmark/benchmark_api.h"
@ -27,7 +27,7 @@ int main(int argc, char **argv)
QueryPreprocessor processor;
using std::placeholders::_1;
std::function<QueryStripped(const std::string &query)> preprocess =
std::function<StrippedQuery<QueryPreprocessor::HashT>(const std::string &query)> preprocess =
std::bind(&QueryPreprocessor::preprocess, &processor, _1);
auto tests = dataset["benchmark_queries"].as<std::vector<std::string>>();

View File

@ -0,0 +1,4 @@
CREATE (g:garment {garment_id: 1234, garment_category_id: 1}) RETURN g
CREATE (g:garment {garment_id: 1235, garment_category_id: 1}) RETURN g
CREATE (p:profile {profile_id: 111, partner_id: 55}) RETURN p
CREATE (p:profile {profile_id: 112, partner_id: 55}) RETURN p

View File

@ -26,6 +26,8 @@ foreach(test_cpp ${test_type_cpps})
set_target_properties(${target_name} PROPERTIES OUTPUT_NAME ${exec_name})
# link libraries
target_link_libraries(${target_name} dl)
target_link_libraries(${target_name} cypher_lib)
# filesystem
target_link_libraries(${target_name} stdc++fs)
# threads (cross-platform)

View File

@ -11,8 +11,6 @@
#include "communication/bolt/v1/serialization/bolt_serializer.hpp"
#include "communication/bolt/v1/serialization/record_stream.hpp"
#include "database/db.hpp"
#include "database/db.hpp"
#include "database/db_accessor.hpp"
#include "database/db_accessor.hpp"
#include "io/network/socket.hpp"
#include "mvcc/id.hpp"
@ -25,7 +23,6 @@
#include "storage/model/properties/property.hpp"
#include "utils/border.hpp"
#include "utils/iterator/iterator.hpp"
#include "utils/iterator/iterator.hpp"
#include "utils/option_ptr.hpp"
#include "utils/reference_wrapper.hpp"
#include "utils/variadic/variadic.hpp"

View File

@ -1,16 +1,17 @@
// TODO: refactor (backlog task)
#include "_hardcoded_query/basic.hpp"
#include "logging/default.hpp"
#include "logging/streams/stdout.hpp"
#include "query/preprocesor.hpp"
#include "query/strip/stripper.hpp"
#include "query/preprocessor.hpp"
#include "query/stripper.hpp"
#include "utils/assert.hpp"
#include "utils/sysinfo/memory.hpp"
QueryPreprocessor preprocessor;
template <class Q>
void run(size_t n, std::string &query, Q &qf)
{
QueryPreprocessor preprocessor;
auto stripped = preprocessor.preprocess(query);
logging::info("Running query [{}] x {}.", stripped.hash, n);
@ -31,7 +32,7 @@ void clean_vertex(Db &db)
int main(void)
{
logging::init_async();
logging::init_sync();
logging::log->pipe(std::make_unique<Stdout>());
Db db("cleaning");

View File

@ -3,7 +3,7 @@
#include <string>
#include <vector>
#include "query/i_plan_cpu.hpp"
#include "query/plan_interface.hpp"
#include "query/util.hpp"
#include "storage/edge_x_vertex.hpp"
#include "storage/model/properties/all.hpp"
@ -16,6 +16,8 @@ using std::endl;
// Dressipi astar query of 4 clicks.
// Query: MATCH (a:garment)-[:default_outfit]-(b:garment)-[:default_outfit]-(c:garment)-[:default_outfit]-(d:garment)-[:default_outfit]-(a:garment)-[:default_outfit]-(c:garment), (b:garment)-[:default_outfit]-(d:garment) RETURN a.garment_id,b.garment_id,c.garment_id,d.garment_id
// TODO: figure out from the pattern in a query
constexpr size_t max_depth = 3;
@ -139,10 +141,10 @@ void reverse_stream_ids(Node *node, Stream& stream, VertexPropertyKey key)
stream.write(node->vacc.at(key).template as<Int64>());
}
class PlanCPU : public IPlanCPU<Stream>
class PlanCPU : public PlanInterface<Stream>
{
public:
bool run(Db &db, plan_args_t &args, Stream &stream) override
bool run(Db &db, const PlanArgsT &args, Stream &stream) override
{
DbAccessor t(db);
@ -180,6 +182,6 @@ public:
~PlanCPU() {}
};
extern "C" IPlanCPU<Stream> *produce() { return new PlanCPU(); }
extern "C" PlanInterface<Stream> *produce() { return new PlanCPU(); }
extern "C" void destruct(IPlanCPU<Stream> *p) { delete p; }
extern "C" void destruct(PlanInterface<Stream> *p) { delete p; }

View File

@ -0,0 +1,58 @@
#include <iostream>
#include <string>
#include "query/util.hpp"
#include "query/plan_interface.hpp"
#include "storage/model/properties/all.hpp"
#include "storage/edge_x_vertex.hpp"
#include "using.hpp"
using std::cout;
using std::endl;
// Query: CREATE (n:ACCOUNT {id: 2322, name: "TEST", country: "Croatia", "created_at": 2352352}) RETURN n
class CPUPlan : public PlanInterface<Stream>
{
public:
bool run(Db &db, const PlanArgsT &args, Stream &stream) override
{
DbAccessor t(db);
auto prop_id = t.vertex_property_key("id", args[0].key.flags());
auto prop_name = t.vertex_property_key("name", args[1].key.flags());
auto prop_country =
t.vertex_property_key("country", args[2].key.flags());
auto prop_created =
t.vertex_property_key("created_at", args[3].key.flags());
auto &label = t.label_find_or_create("ACCOUNT");
auto vertex_accessor = t.vertex_insert();
vertex_accessor.set(prop_id, std::move(args[0]));
vertex_accessor.set(prop_name, std::move(args[1]));
vertex_accessor.set(prop_country, std::move(args[2]));
vertex_accessor.set(prop_created, std::move(args[3]));
vertex_accessor.add_label(label);
stream.write_field("p");
stream.write_vertex_record(vertex_accessor);
stream.write_meta("w");
return t.commit();
}
~CPUPlan() {}
};
extern "C" PlanInterface<Stream>* produce()
{
return new CPUPlan();
}
extern "C" void destruct(PlanInterface<Stream>* p)
{
delete p;
}

View File

@ -0,0 +1,52 @@
#include <iostream>
#include <string>
#include "query/util.hpp"
#include "query/plan_interface.hpp"
#include "storage/model/properties/all.hpp"
#include "storage/edge_x_vertex.hpp"
#include "using.hpp"
using std::cout;
using std::endl;
// Query: MATCH (a {id:0}), (p {id: 1}) CREATE (a)-[r:IS]->(p) RETURN r
class CPUPlan : public PlanInterface<Stream>
{
public:
bool run(Db &db, const PlanArgsT &args, Stream &stream) override
{
DbAccessor t(db);
auto &edge_type = t.type_find_or_create("IS");
auto v1 = t.vertex_find(args[0].as<Int64>().value());
if (!option_fill(v1)) return t.commit(), false;
auto v2 = t.vertex_find(args[1].as<Int64>().value());
if (!option_fill(v2)) return t.commit(), false;
auto edge_accessor = t.edge_insert(v1.get(), v2.get());
edge_accessor.edge_type(edge_type);
stream.write_field("r");
stream.write_edge_record(edge_accessor);
stream.write_meta("w");
return t.commit();
}
~CPUPlan() {}
};
extern "C" PlanInterface<Stream>* produce()
{
return new CPUPlan();
}
extern "C" void destruct(PlanInterface<Stream>* p)
{
delete p;
}

View File

@ -2,21 +2,20 @@
#include <string>
#include "query/util.hpp"
#include "query/i_plan_cpu.hpp"
#include "query/plan_interface.hpp"
#include "storage/model/properties/all.hpp"
#include "using.hpp"
using std::cout;
using std::endl;
// Query: CREATE (p:profile {profile_id: 111, partner_id: 55}) RETURN p
// Hash: 17158428452166262783
// Query: CREATE (p:profile {profile_id: 112, partner_id: 55}) RETURN p
class CodeCPU : public IPlanCPU<Stream>
class CPUPlan : public PlanInterface<Stream>
{
public:
bool run(Db &db, plan_args_t &args, Stream &stream) override
bool run(Db &db, const PlanArgsT &args, Stream &stream) override
{
DbAccessor t(db);
@ -40,15 +39,15 @@ public:
}
~CodeCPU() {}
~CPUPlan() {}
};
extern "C" IPlanCPU<Stream>* produce()
extern "C" PlanInterface<Stream>* produce()
{
return new CodeCPU();
return new CPUPlan();
}
extern "C" void destruct(IPlanCPU<Stream>* p)
extern "C" void destruct(PlanInterface<Stream>* p)
{
delete p;
}

View File

@ -0,0 +1,48 @@
#include <iostream>
#include <string>
#include "query/util.hpp"
#include "query/plan_interface.hpp"
#include "storage/model/properties/all.hpp"
#include "storage/edge_x_vertex.hpp"
#include "using.hpp"
using std::cout;
using std::endl;
// Query: CREATE (n:LABEL {name: "TEST"}) RETURN n
class CPUPlan : public PlanInterface<Stream>
{
public:
bool run(Db &db, const PlanArgsT &args, Stream &stream) override
{
DbAccessor t(db);
auto property_key = t.vertex_property_key("name", args[0].key.flags());
auto &label = t.label_find_or_create("LABEL");
auto vertex_accessor = t.vertex_insert();
vertex_accessor.set(property_key, std::move(args[0]));
vertex_accessor.add_label(label);
stream.write_field("n");
stream.write_vertex_record(vertex_accessor);
stream.write_meta("w");
return t.commit();
}
~CPUPlan() {}
};
extern "C" PlanInterface<Stream>* produce()
{
return new CPUPlan();
}
extern "C" void destruct(PlanInterface<Stream>* p)
{
delete p;
}

View File

@ -0,0 +1,48 @@
#include <iostream>
#include <string>
#include "query/util.hpp"
#include "query/plan_interface.hpp"
#include "storage/model/properties/all.hpp"
#include "storage/edge_x_vertex.hpp"
#include "using.hpp"
using std::cout;
using std::endl;
// Query: CREATE (n:OTHER {name: "cleaner_test"}) RETURN n
class CPUPlan : public PlanInterface<Stream>
{
public:
bool run(Db &db, const PlanArgsT &args, Stream &stream) override
{
DbAccessor t(db);
auto property_key = t.vertex_property_key("name", args[0].key.flags());
auto &label = t.label_find_or_create("OTHER");
auto vertex_accessor = t.vertex_insert();
vertex_accessor.set(property_key, std::move(args[0]));
vertex_accessor.add_label(label);
stream.write_field("n");
stream.write_vertex_record(vertex_accessor);
stream.write_meta("w");
return t.commit();
}
~CPUPlan() {}
};
extern "C" PlanInterface<Stream>* produce()
{
return new CPUPlan();
}
extern "C" void destruct(PlanInterface<Stream>* p)
{
delete p;
}

View File

@ -0,0 +1,46 @@
#include <iostream>
#include <string>
#include "query/util.hpp"
#include "query/plan_interface.hpp"
#include "storage/model/properties/all.hpp"
#include "storage/edge_x_vertex.hpp"
#include "using.hpp"
using std::cout;
using std::endl;
// Query: CREATE (n {prop: 0}) RETURN n
class CPUPlan : public PlanInterface<Stream>
{
public:
bool run(Db &db, const PlanArgsT &args, Stream &stream) override
{
DbAccessor t(db);
auto property_key = t.vertex_property_key("prop", args[0].key.flags());
auto vertex_accessor = t.vertex_insert();
vertex_accessor.set(property_key, std::move(args[0]));
stream.write_field("n");
stream.write_vertex_record(vertex_accessor);
stream.write_meta("w");
return t.commit();
}
~CPUPlan() {}
};
extern "C" PlanInterface<Stream>* produce()
{
return new CPUPlan();
}
extern "C" void destruct(PlanInterface<Stream>* p)
{
delete p;
}

View File

@ -2,21 +2,20 @@
#include <string>
#include "query/util.hpp"
#include "query/i_plan_cpu.hpp"
#include "query/plan_interface.hpp"
#include "storage/model/properties/all.hpp"
#include "using.hpp"
using std::cout;
using std::endl;
// Query: CREATE (g:garment {garment_id: 1234, garment_category_id: 1}) RETURN g
// Hash: 18071907865596388702
// Query: CREATE (g:garment {garment_id: 1236, garment_category_id: 1}) RETURN g
class CodeCPU : public IPlanCPU<Stream>
class CPUPlan : public PlanInterface<Stream>
{
public:
bool run(Db &db, plan_args_t &args, Stream &stream) override
bool run(Db &db, const PlanArgsT &args, Stream &stream) override
{
DbAccessor t(db);
@ -39,15 +38,15 @@ public:
return t.commit();
}
~CodeCPU() {}
~CPUPlan() {}
};
extern "C" IPlanCPU<Stream>* produce()
extern "C" PlanInterface<Stream>* produce()
{
return new CodeCPU();
return new CPUPlan();
}
extern "C" void destruct(IPlanCPU<Stream>* p)
extern "C" void destruct(PlanInterface<Stream>* p)
{
delete p;
}

View File

@ -2,7 +2,7 @@
#include <string>
#include "query/util.hpp"
#include "query/i_plan_cpu.hpp"
#include "query/plan_interface.hpp"
#include "storage/model/properties/all.hpp"
#include "using.hpp"
@ -10,13 +10,12 @@ using std::cout;
using std::endl;
// Query: MATCH (n) DETACH DELETE n
// Hash: 4798158026600988079
class CodeCPU : public IPlanCPU<Stream>
class CPUPlan : public PlanInterface<Stream>
{
public:
bool run(Db &db, plan_args_t &args, Stream &stream) override
bool run(Db &db, const PlanArgsT &args, Stream &stream) override
{
DbAccessor t(db);
@ -30,15 +29,15 @@ public:
return t.commit();
}
~CodeCPU() {}
~CPUPlan() {}
};
extern "C" IPlanCPU<Stream>* produce()
extern "C" PlanInterface<Stream>* produce()
{
return new CodeCPU();
return new CPUPlan();
}
extern "C" void destruct(IPlanCPU<Stream>* p)
extern "C" void destruct(PlanInterface<Stream>* p)
{
delete p;
}

View File

@ -2,7 +2,7 @@
#include <string>
#include "query/util.hpp"
#include "query/i_plan_cpu.hpp"
#include "query/plan_interface.hpp"
#include "storage/model/properties/all.hpp"
#include "using.hpp"
@ -10,13 +10,12 @@ using std::cout;
using std::endl;
// Query: MATCH (p:garment {garment_id: 1}) DELETE g
// Hash: 11538263096707897794
class CodeCPU : public IPlanCPU<Stream>
class CPUPlan : public PlanInterface<Stream>
{
public:
bool run(Db &db, plan_args_t &args, Stream &stream) override
bool run(Db &db, const PlanArgsT &args, Stream &stream) override
{
DbAccessor t(db);
@ -37,15 +36,15 @@ public:
}
~CodeCPU() {}
~CPUPlan() {}
};
extern "C" IPlanCPU<Stream>* produce()
extern "C" PlanInterface<Stream>* produce()
{
return new CodeCPU();
return new CPUPlan();
}
extern "C" void destruct(IPlanCPU<Stream>* p)
extern "C" void destruct(PlanInterface<Stream>* p)
{
delete p;
}

View File

@ -2,7 +2,7 @@
#include <string>
#include "query/util.hpp"
#include "query/i_plan_cpu.hpp"
#include "query/plan_interface.hpp"
#include "storage/model/properties/all.hpp"
#include "using.hpp"
@ -10,13 +10,12 @@ using std::cout;
using std::endl;
// Query: MATCH (p:profile {profile_id: 1}) DELETE p
// Hash: 6763665709953344106
class CodeCPU : public IPlanCPU<Stream>
class CPUPlan : public PlanInterface<Stream>
{
public:
bool run(Db &db, plan_args_t &args, Stream &stream) override
bool run(Db &db, const PlanArgsT &args, Stream &stream) override
{
DbAccessor t(db);
@ -37,15 +36,15 @@ public:
}
~CodeCPU() {}
~CPUPlan() {}
};
extern "C" IPlanCPU<Stream>* produce()
extern "C" PlanInterface<Stream>* produce()
{
return new CodeCPU();
return new CPUPlan();
}
extern "C" void destruct(IPlanCPU<Stream>* p)
extern "C" void destruct(PlanInterface<Stream>* p)
{
delete p;
}

View File

@ -2,21 +2,21 @@
#include <string>
#include "query/util.hpp"
#include "query/i_plan_cpu.hpp"
#include "query/plan_interface.hpp"
#include "storage/model/properties/all.hpp"
#include "storage/edge_x_vertex.hpp"
#include "using.hpp"
using std::cout;
using std::endl;
// Query: MATCH (p:profile {profile_id: 111, partner_id:55})-[s:score]-(g.garment {garment_id: 1234}) DELETE s
// Hash: 9459600951073026137
// Query: MATCH (p:profile {profile_id: 111, partner_id:55})-[s:score]-(g:garment {garment_id: 1234}) DELETE s
class CodeCPU : public IPlanCPU<Stream>
class CPUPlan : public PlanInterface<Stream>
{
public:
bool run(Db &db, plan_args_t &args, Stream &stream) override
bool run(Db &db, const PlanArgsT &args, Stream &stream) override
{
DbAccessor t(db);
@ -77,15 +77,15 @@ public:
}
~CodeCPU() {}
~CPUPlan() {}
};
extern "C" IPlanCPU<Stream>* produce()
extern "C" PlanInterface<Stream>* produce()
{
return new CodeCPU();
return new CPUPlan();
}
extern "C" void destruct(IPlanCPU<Stream>* p)
extern "C" void destruct(PlanInterface<Stream>* p)
{
delete p;
}

View File

@ -2,21 +2,21 @@
#include <string>
#include "query/util.hpp"
#include "query/i_plan_cpu.hpp"
#include "query/plan_interface.hpp"
#include "storage/model/properties/all.hpp"
#include "storage/edge_x_vertex.hpp"
#include "using.hpp"
using std::cout;
using std::endl;
// Query: MATCH (p:profile {profile_id: 111, partner_id:55})-[s:score]-(g:garment {garment_id: 1234}) SET s.score = 1550 RETURN s.score
// Hash: 674581607834128909
class CodeCPU : public IPlanCPU<Stream>
class CPUPlan : public PlanInterface<Stream>
{
public:
bool run(Db &db, plan_args_t &args, Stream &stream) override
bool run(Db &db, const PlanArgsT &args, Stream &stream) override
{
DbAccessor t(db);
@ -75,15 +75,15 @@ public:
}
~CodeCPU() {}
~CPUPlan() {}
};
extern "C" IPlanCPU<Stream>* produce()
extern "C" PlanInterface<Stream>* produce()
{
return new CodeCPU();
return new CPUPlan();
}
extern "C" void destruct(IPlanCPU<Stream>* p)
extern "C" void destruct(PlanInterface<Stream>* p)
{
delete p;
}

View File

@ -2,7 +2,7 @@
#include <string>
#include "query/util.hpp"
#include "query/i_plan_cpu.hpp"
#include "query/plan_interface.hpp"
#include "storage/model/properties/all.hpp"
#include "using.hpp"
@ -10,13 +10,12 @@ using std::cout;
using std::endl;
// Query: MATCH (g:garment {garment_id: 1234}) SET g:FF RETURN labels(g)
// Hash: 11123780635391515946
class CodeCPU : public IPlanCPU<Stream>
class CPUPlan : public PlanInterface<Stream>
{
public:
bool run(Db &db, plan_args_t &args, Stream &stream) override
bool run(Db &db, const PlanArgsT &args, Stream &stream) override
{
DbAccessor t(db);
@ -46,20 +45,20 @@ public:
stream.chunk();
});
stream.write_meta(\"rw");
stream.write_meta("rw");
return t.commit();
}
~CodeCPU() {}
~CPUPlan() {}
};
extern "C" IPlanCPU<Stream>* produce()
extern "C" PlanInterface<Stream>* produce()
{
return new CodeCPU();
return new CPUPlan();
}
extern "C" void destruct(IPlanCPU<Stream>* p)
extern "C" void destruct(PlanInterface<Stream>* p)
{
delete p;
}

View File

@ -2,7 +2,7 @@
#include <string>
#include "query/util.hpp"
#include "query/i_plan_cpu.hpp"
#include "query/plan_interface.hpp"
#include "storage/model/properties/all.hpp"
#include "using.hpp"
@ -10,13 +10,12 @@ using std::cout;
using std::endl;
// Query: MATCH (g:garment {garment_id: 3456}) SET g.reveals = 50 RETURN g
// Hash: 2839969099736071844
class CodeCPU : public IPlanCPU<Stream>
class CPUPlan : public PlanInterface<Stream>
{
public:
bool run(Db &db, plan_args_t &args, Stream &stream) override
bool run(Db &db, const PlanArgsT &args, Stream &stream) override
{
DbAccessor t(db);
@ -44,15 +43,15 @@ public:
return t.commit();
}
~CodeCPU() {}
~CPUPlan() {}
};
extern "C" IPlanCPU<Stream>* produce()
extern "C" PlanInterface<Stream>* produce()
{
return new CodeCPU();
return new CPUPlan();
}
extern "C" void destruct(IPlanCPU<Stream>* p)
extern "C" void destruct(PlanInterface<Stream>* p)
{
delete p;
}

View File

@ -2,7 +2,7 @@
#include <string>
#include "query/util.hpp"
#include "query/i_plan_cpu.hpp"
#include "query/plan_interface.hpp"
#include "storage/model/properties/all.hpp"
#include "using.hpp"
@ -10,13 +10,12 @@ using std::cout;
using std::endl;
// Query: MATCH (p:profile {partner_id: 1}) RETURN p
// Hash: 17506488413143988006
class CodeCPU : public IPlanCPU<Stream>
class CPUPlan : public PlanInterface<Stream>
{
public:
bool run(Db &db, plan_args_t &args, Stream &stream) override
bool run(Db &db, const PlanArgsT &args, Stream &stream) override
{
DbAccessor t(db);
@ -39,15 +38,15 @@ public:
return t.commit();
}
~CodeCPU() {}
~CPUPlan() {}
};
extern "C" IPlanCPU<Stream>* produce()
extern "C" PlanInterface<Stream>* produce()
{
return new CodeCPU();
return new CPUPlan();
}
extern "C" void destruct(IPlanCPU<Stream>* p)
extern "C" void destruct(PlanInterface<Stream>* p)
{
delete p;
}

View File

@ -2,7 +2,7 @@
#include <string>
#include "query/util.hpp"
#include "query/i_plan_cpu.hpp"
#include "query/plan_interface.hpp"
#include "storage/model/properties/all.hpp"
#include "using.hpp"
@ -10,13 +10,12 @@ using std::cout;
using std::endl;
// Query: MATCH (g:garment {garment_id: 1}) RETURN g
// Hash: 7756609649964321221
class CodeCPU : public IPlanCPU<Stream>
class CPUPlan : public PlanInterface<Stream>
{
public:
bool run(Db &db, plan_args_t &args, Stream &stream) override
bool run(Db &db, const PlanArgsT &args, Stream &stream) override
{
DbAccessor t(db);
@ -39,15 +38,15 @@ public:
return t.commit();
}
~CodeCPU() {}
~CPUPlan() {}
};
extern "C" IPlanCPU<Stream>* produce()
extern "C" PlanInterface<Stream>* produce()
{
return new CodeCPU();
return new CPUPlan();
}
extern "C" void destruct(IPlanCPU<Stream>* p)
extern "C" void destruct(PlanInterface<Stream>* p)
{
delete p;
}

View File

@ -2,7 +2,7 @@
#include <string>
#include "query/util.hpp"
#include "query/i_plan_cpu.hpp"
#include "query/plan_interface.hpp"
#include "storage/model/properties/all.hpp"
#include "using.hpp"
@ -10,13 +10,12 @@ using std::cout;
using std::endl;
// Query: MERGE (g1:garment {garment_id:1234})-[r:default_outfit]-(g2:garment {garment_id: 2345}) RETURN r
// Hash: 3782642357973971504
class CodeCPU : public IPlanCPU<Stream>
class CPUPlan : public PlanInterface<Stream>
{
public:
bool run(Db &db, plan_args_t &args, Stream &stream) override
bool run(Db &db, const PlanArgsT &args, Stream &stream) override
{
DbAccessor t(db);
@ -60,15 +59,15 @@ public:
return t.commit();
}
~CodeCPU() {}
~CPUPlan() {}
};
extern "C" IPlanCPU<Stream>* produce()
extern "C" PlanInterface<Stream>* produce()
{
return new CodeCPU();
return new CPUPlan();
}
extern "C" void destruct(IPlanCPU<Stream>* p)
extern "C" void destruct(PlanInterface<Stream>* p)
{
delete p;
}

View File

@ -2,7 +2,7 @@
#include <string>
#include "query/util.hpp"
#include "query/i_plan_cpu.hpp"
#include "query/plan_interface.hpp"
#include "storage/model/properties/all.hpp"
#include "storage/edge_x_vertex.hpp"
#include "using.hpp"
@ -11,13 +11,12 @@ using std::cout;
using std::endl;
// Query: MERGE (p:profile {profile_id: 111, partner_id: 55})-[s:score]-(g.garment {garment_id: 1234}) SET s.score=1500 RETURN s
// Hash: 7871009397157280694
class CodeCPU : public IPlanCPU<Stream>
class CPUPlan : public PlanInterface<Stream>
{
public:
bool run(Db &db, plan_args_t &args, Stream &stream) override
bool run(Db &db, const PlanArgsT &args, Stream &stream) override
{
DbAccessor t(db);
@ -92,15 +91,15 @@ public:
return t.commit();
}
~CodeCPU() {}
~CPUPlan() {}
};
extern "C" IPlanCPU<Stream>* produce()
extern "C" PlanInterface<Stream>* produce()
{
return new CodeCPU();
return new CPUPlan();
}
extern "C" void destruct(IPlanCPU<Stream>* p)
extern "C" void destruct(PlanInterface<Stream>* p)
{
delete p;
}

View File

@ -2,33 +2,34 @@
#include <string>
#include "query/util.hpp"
#include "query/i_plan_cpu.hpp"
#include "query/plan_interface.hpp"
#include "storage/model/properties/all.hpp"
#include "storage/edge_x_vertex.hpp"
#include "using.hpp"
using std::cout;
using std::endl;
// query:
// Query:
class CodeCPU : public IPlanCPU<Stream>
class CPUPlan : public PlanInterface<Stream>
{
public:
bool run(Db &db, plan_args_t &args, Stream &stream) override
bool run(Db &db, const PlanArgsT &args, Stream &stream) override
{
}
~CodeCPU() {}
~CPUPlan() {}
};
extern "C" IPlanCPU<Stream>* produce()
extern "C" PlanInterface<Stream>* produce()
{
return new CodeCPU();
return new CPUPlan();
}
extern "C" void destruct(IPlanCPU<Stream>* p)
extern "C" void destruct(PlanInterface<Stream>* p)
{
delete p;
}

View File

@ -1,5 +1,8 @@
#pragma once
#include "communication/communication.hpp"
// #include "communication/communication.hpp"
// using Stream = communication::OutputStream;
using Stream = communication::OutputStream;
// TODO: modular
#include "../stream/print_record_stream.hpp"
using Stream = PrintRecordStream;

View File

@ -1,18 +1,18 @@
// TODO: refactor (backlog task)
#include <random>
#include "_hardcoded_query/basic.hpp"
#include "logging/default.hpp"
#include "logging/streams/stdout.hpp"
#include "query/preprocesor.hpp"
#include "query/strip/stripper.hpp"
#include "query/preprocessor.hpp"
#include "query/stripper.hpp"
#include "storage/indexes/indexes.hpp"
#include "utils/assert.hpp"
#include "utils/signals/handler.hpp"
#include "utils/stacktrace/log.hpp"
#include "utils/sysinfo/memory.hpp"
QueryPreprocessor preprocessor;
// Returns uniform random size_t generator from range [0,n>
auto rand_gen(size_t n)
{
@ -24,6 +24,7 @@ auto rand_gen(size_t n)
void run(size_t n, std::string &query, Db &db)
{
auto qf = hardcode::load_basic_functions(db);
QueryPreprocessor preprocessor;
auto stripped = preprocessor.preprocess(query);
logging::info("Running query [{}] x {}.", stripped.hash, n);
@ -42,6 +43,7 @@ void add_edge(size_t n, Db &db)
std::string query = "MATCH (n1), (n2) WHERE ID(n1)=0 AND "
"ID(n2)=1 CREATE (n1)<-[r:IS {age: "
"25,weight: 70}]-(n2) RETURN r";
QueryPreprocessor preprocessor;
auto stripped = preprocessor.preprocess(query);
logging::info("Running query [{}] (add edge) x {}", stripped.hash, n);
@ -196,7 +198,7 @@ bool equal(Db &a, Db &b)
int main(void)
{
logging::init_async();
logging::init_sync();
logging::log->pipe(std::make_unique<Stdout>());
SignalHandler::register_handler(Signal::SegmentationFault, []() {

View File

@ -1,75 +0,0 @@
#include "communication/bolt/v1/serialization/bolt_serializer.hpp"
#include "database/db.hpp"
#include "logging/default.hpp"
#include "logging/streams/stdout.hpp"
#include "_hardcoded_query/basic.hpp"
#include "_hardcoded_query/dressipi.hpp"
#include "query/strip/stripper.hpp"
#include "utils/string/file.hpp"
#include "utils/variadic/variadic.hpp"
#include "utils/command_line/arguments.hpp"
#include "stream/print_record_stream.hpp"
Logger logger;
int main(int argc, char *argv[])
{
auto arguments = all_arguments(argc, argv);
PrintRecordStream stream(std::cout);
// POSSIBILITIES: basic, dressipi
auto suite_name = get_argument(arguments, "-s", "basic");
// POSSIBILITIES: query_execution, hash_generation
auto work_mode = get_argument(arguments, "-w", "query_execution");
// POSSIBILITIES: mg_basic.txt, dressipi_basic.txt, dressipi_graph.txt
auto query_set_filename = get_argument(arguments, "-q", "mg_basic.txt");
// init logging
logging::init_sync();
logging::log->pipe(std::make_unique<Stdout>());
auto log = logging::log->logger("test");
// init db, functions and stripper
Db db;
hardcode::query_functions_t query_functions;
if (suite_name == "dressipi")
{
query_functions = std::move(hardcode::load_dressipi_functions(db));
}
else
{
query_functions = std::move(hardcode::load_basic_functions(db));
}
auto stripper = make_query_stripper(TK_LONG, TK_FLOAT, TK_STR, TK_BOOL);
// load quries
std::string file_path = "data/queries/core/" + query_set_filename;
auto queries = utils::read_lines(file_path.c_str());
// execute all queries
for (auto &query : queries)
{
if (query.empty())
continue;
utils::println("");
utils::println("Query: ", query);
auto stripped = stripper.strip(query);
utils::println("Hash: ", stripped.hash);
utils::println("------------------------");
// TODO: more robust solution (enum like)
if (work_mode == "hash_generation") continue;
auto result =
query_functions[stripped.hash](std::move(stripped.arguments));
permanent_assert(result == true,
"Result retured from query function is not true");
utils::println("------------------------");
}
return 0;
}

View File

@ -0,0 +1,35 @@
#include "query_engine_common.hpp"
using namespace std::chrono_literals;
using namespace tests::integration;
Logger logger;
/**
* IMPORTANT: tests only compilation and executability of implemented
* hard code queries (not correctnes of implementation)
*
* NOTE: The correctnes can be tested by custom Stream object.
* NOTE: This test will be usefull to test generated query plans.
*/
int main(int argc, char *argv[])
{
/**
* init arguments
*/
REGISTER_ARGS(argc, argv);
/**
* init engine
*/
auto log = init_logging("IntegrationQueryEngine");
Db db;
StreamT stream(std::cout);
QueryEngineT query_engine;
// IMPORTANT: PrintRecordStream can be replaces with a smarter
// object that can test the results
WarmUpEngine(log, query_engine, db, stream);
return 0;
}

View File

@ -0,0 +1,179 @@
#pragma once
#include <experimental/filesystem>
#include <set>
namespace fs = std::experimental::filesystem;
#include "database/db.hpp"
#include "logging/default.hpp"
#include "logging/streams/stdout.cpp"
#include "query/engine.hpp"
#include "query/preprocessor.hpp"
#include "stream/print_record_stream.hpp"
#include "utils/command_line/arguments.hpp"
#include "utils/file.hpp"
#include "utils/string/file.hpp"
#include "utils/string/trim.hpp"
namespace tests
{
namespace integration
{
using namespace utils;
using QueryHashesT = std::set<QueryPreprocessor::HashT>;
using QueryEngineT = QueryEngine<PrintRecordStream>;
using StreamT = PrintRecordStream;
/**
* Init logging for tested query_engine (test specific logger). It has to be
* sync (easier debugging).
*
* @param logger_name the name of a logger
*
* @return logger instance
*/
auto init_logging(const std::string &logger_name)
{
logging::init_sync();
logging::log->pipe(std::make_unique<Stdout>());
return logging::log->logger(logger_name);
}
/**
* Get query hashes from the file defied with a path.
*
* @param log external logger because this function should be called
* from test binaries
* @param path path to a file with queries
*
* @return a set with all query hashes from the file
*/
auto LoadQueryHashes(Logger &log, const fs::path &path)
{
log.info("*** Get query hashes from the file defied with path ***");
// the intention of following block is to get all hashes
// for which query implementations have to be compiled
// calculate all hashes from queries file
QueryPreprocessor preprocessor;
// hashes calculated from all queries in queries file
QueryHashesT query_hashes;
// fill the above set
auto queries = utils::read_lines(path);
for (auto &query : queries)
{
if (query.empty()) continue;
query_hashes.insert(preprocessor.preprocess(query).hash);
}
permanent_assert(query_hashes.size() > 0,
"At least one hash has to be present");
log.info("{} different query hashes exist", query_hashes.size());
return query_hashes;
}
/**
* Loads query plans into the engine passed by reference.
*
* @param log external logger reference
* @param engine query engine
* @param query_hashes hashes for which plans have to be loaded
* @param path to a folder with query plan implementations
*
* @return void
*/
auto LoadQueryPlans(Logger &log, QueryEngineT &engine,
const QueryHashesT &query_hashes, const fs::path &path)
{
log.info("*** Load/compile needed query implementations ***");
QueryPreprocessor preprocessor;
auto plan_paths = LoadFilePaths(path, "cpp");
// query mark will be used to extract queries from files (because we want
// to be independent to a query hash)
auto query_mark = std::string("// Query: ");
for (auto &plan_path : plan_paths)
{
auto lines = read_lines(plan_path);
// find the line with a query in order
// be able to place it in the dynamic libs container (base on query
// hash)
for (auto &line : lines)
{
// find query in the line
auto pos = line.find(query_mark);
// if query doesn't exist pass
if (pos == std::string::npos) continue;
auto query = trim(line.substr(pos + query_mark.size()));
// load/compile implementations only for the queries which are
// contained in queries_file
// it doesn't make sense to compile something which won't be runned
if (query_hashes.find(preprocessor.preprocess(query).hash) ==
query_hashes.end())
continue;
log.info("Path {} will be loaded.", plan_path.c_str());
engine.ReloadCustom(query, plan_path);
break;
}
}
permanent_assert(query_hashes.size() == engine.Size(),
"Query engine doesn't contain appropriate number of query "
"implementations");
}
/**
* Executa all query plans in file on the path.
*
* @param log external logger reference
* @param engine query engine
* @param db a database agains the query plans are going to be executed
* @param path path a queries file
* @param stream used by query plans to output the results
*
* @return void
*/
auto ExecuteQueryPlans(Logger &log, QueryEngineT &engine, Db &db,
const fs::path &path, StreamT &stream)
{
log.info("*** Execute the queries from the queries_file ***");
// execute all queries from queries_file
auto queries = utils::read_lines(path);
for (auto &query : queries)
{
if (query.empty()) continue;
permanent_assert(engine.Loaded(trim(query)),
"Implementation wasn't loaded");
engine.Run(query, db, stream);
}
}
/**
* Warms Up the engine. Loads and executes query plans specified by the program
* arguments:
* -q -> a file with queries
* -i -> a folder with query plans
*
* @param log external logger reference
* @param engine query engine
* @param db a database agains the query plans are going to be executed
* @param stream used by query plans to output the results
*
* @return void
*/
auto WarmUpEngine(Logger &log, QueryEngineT &engine, Db &db, StreamT &stream)
{
// path to a file with queries
auto queries_file = fs::path(
GET_ARG("-q", "../data/queries/core/mg_basic_000.txt").get_string());
// forlder with query implementations
auto implementations_folder =
fs::path(GET_ARG("-i", "../integration/hardcoded_query").get_string());
// load all query hashes from queries file
auto query_hashes = LoadQueryHashes(log, queries_file);
// load compile all needed query plans
LoadQueryPlans(log, engine, query_hashes, implementations_folder);
// execute all loaded query plasn
ExecuteQueryPlans(log, engine, db, queries_file, stream);
}
}
}

View File

@ -1,18 +1,18 @@
// TODO: refactor (backlog task)
#include <random>
#include "_hardcoded_query/basic.hpp"
#include "logging/default.hpp"
#include "logging/streams/stdout.hpp"
#include "query/preprocesor.hpp"
#include "query/strip/stripper.hpp"
#include "query/preprocessor.hpp"
#include "query/stripper.hpp"
#include "storage/indexes/indexes.hpp"
#include "utils/assert.hpp"
#include "utils/signals/handler.hpp"
#include "utils/stacktrace/log.hpp"
#include "utils/sysinfo/memory.hpp"
QueryPreprocessor preprocessor;
// Returns uniform random size_t generator from range [0,n>
auto rand_gen(size_t n)
{
@ -24,6 +24,7 @@ auto rand_gen(size_t n)
void run(size_t n, std::string &query, Db &db)
{
auto qf = hardcode::load_basic_functions(db);
QueryPreprocessor preprocessor;
auto stripped = preprocessor.preprocess(query);
logging::info("Running query {} [{}] x {}.", query, stripped.hash, n);
@ -42,6 +43,7 @@ void add_edge(size_t n, Db &db)
std::string query = "MATCH (n1), (n2) WHERE ID(n1)=0 AND "
"ID(n2)=1 CREATE (n1)<-[r:IS {age: "
"25,weight: 70}]-(n2) RETURN r";
QueryPreprocessor preprocessor;
auto stripped = preprocessor.preprocess(query);
logging::info("Running query {} [{}] x {}.", query, stripped.hash, n);
@ -178,7 +180,7 @@ bool equal(Db &a, Db &b)
int main(void)
{
logging::init_async();
logging::init_sync();
logging::log->pipe(std::make_unique<Stdout>());
SignalHandler::register_handler(Signal::SegmentationFault, []() {

View File

@ -3,6 +3,7 @@
#include <string>
#include <vector>
#include <map>
#include <iostream>
#include "utils/exceptions/not_yet_implemented.hpp"
@ -21,27 +22,27 @@ public:
void write_success_empty()
{
stream << "SUCCESS EMPTY\n";
stream << "SUCCESS EMPTY\n";
}
void write_ignored()
{
stream << "IGNORED\n";
stream << "IGNORED\n";
}
void write_empty_fields()
{
stream << "EMPTY FIELDS\n";
stream << "EMPTY FIELDS\n";
}
void write_fields(const std::vector<std::string> &fields)
{
stream << "FIELDS:";
for (auto &field : fields)
{
stream << " " << field;
}
stream << '\n';
stream << "FIELDS:";
for (auto &field : fields)
{
stream << " " << field;
}
stream << '\n';
}
void write_field(const std::string &field)
@ -61,7 +62,7 @@ public:
void write_meta(const std::string &type)
{
stream << "Meta: " << type;
stream << "Meta: " << type << std::endl;
}
void write_failure(const std::map<std::string, std::string> &data)
@ -81,7 +82,8 @@ public:
void write_vertex_record(const VertexAccessor& va)
{
throw NotYetImplemented();
va.stream_repr(stream);
stream << std::endl;
}
void write(const EdgeAccessor &edge)
@ -96,13 +98,11 @@ public:
void write(const StoredProperty<TypeGroupEdge> &prop)
{
// prop.accept(serializer);
throw NotYetImplemented();
}
void write(const StoredProperty<TypeGroupVertex> &prop)
{
// prop.accept(serializer);
throw NotYetImplemented();
}

View File

@ -1,3 +1,10 @@
/**
* DEPRICATED!
*
* TODO: print AST (just for one query) using Antlr's visitor or listener
* the new file name should be opencypher_ast.cpp
*/
#include <cstdlib>
#include <vector>
#include <vector>

View File

@ -1,3 +1,9 @@
/**
* DEPRICATED!
*
* TODO: remove once when Antlr will be integrated
*/
#include <iostream>
#include <cassert>
#include <fstream>

View File

@ -1,52 +1,72 @@
#include <iostream>
#include "../integration/query_engine_common.hpp"
#define DEBUG 1
#include "utils/fswatcher.hpp"
#include "communication/communication.hpp"
#include "query/language/cypher/common.hpp"
#include "logging/default.hpp"
#include "logging/streams/stdout.hpp"
#include "query/engine.hpp"
#include "utils/command_line/arguments.hpp"
#include "utils/terminate_handler.hpp"
#include "utils/time/timer.hpp"
using namespace std::chrono_literals;
using namespace tests::integration;
using std::cout;
using std::endl;
using std::cin;
int main(void)
int main(int argc, char *argv[])
{
std::set_terminate(&terminate_handler);
// init arguments
REGISTER_ARGS(argc, argv);
logging::init_sync();
logging::log->pipe(std::make_unique<Stdout>());
// forlder with query implementations
auto implementations_folder = fs::path(
GET_ARG("-i", "tests/integration/hardcoded_query").get_string());
// init engine
auto log = init_logging("ManualQueryEngine");
Db db;
// TODO: write dummy socket that is going to execute test
using stream_t = bolt::RecordStream<CoutSocket>;
CoutSocket socket;
stream_t stream(socket);
QueryEngine<stream_t> engine;
StreamT stream(std::cout);
QueryEngineT query_engine;
// IMPORTANT: PrintRecordStream can be replaces with a smarter
// object that can test the results
cout << "-- Memgraph query engine --" << endl;
WarmUpEngine(log, query_engine, db, stream);
while (true) {
// read command
cout << "> ";
std::string command;
std::getline(cin, command);
if (command == "quit") break;
// init watcher
FSWatcher watcher;
QueryPreprocessor preprocessor;
// execute command
try {
engine.execute(command, db, stream);
} catch (const std::exception &e) {
cout << e.what() << endl;
} catch (const QueryEngineException &e) {
cout << e.what() << endl;
}
}
int i = 0;
watcher.watch(
WatchDescriptor(implementations_folder, FSEventType::CloseNowrite),
[&](FSEvent event) {
i++; // bacause only close_no_write could be detected and this
// call will cause close no write again
if (i % 2 == 1)
{
// take only cpp files
if (event.path.extension() != ".cpp")
return;
auto query_mark = std::string("// Query: ");
auto lines = read_lines(event.path);
for (auto &line : lines)
{
auto pos = line.find(query_mark);
if (pos == std::string::npos) continue;
auto query = line.substr(pos + query_mark.size());
log.info("Reload: {}", query);
query_engine.Unload(query);
try {
query_engine.ReloadCustom(query, event.path);
query_engine.Run(query, db, stream);
} catch (PlanCompilationException& e) {
log.info("Query compilation failed: {}", e.what());
} catch (std::exception& e) {
log.info("Query execution failed: unknown reason");
}
log.info("Number of available query plans: {}", query_engine.Size());
}
}
});
// TODO: watcher for injected query
std::this_thread::sleep_for(1000s);
watcher.stop();
return 0;
}

View File

@ -0,0 +1,42 @@
#include <iostream>
#include <vector>
#include "query/language/cypher/common.hpp"
#include "query/preprocessor.hpp"
#include "utils/command_line/arguments.hpp"
#include "utils/type_discovery.hpp"
#include "utils/variadic/variadic.hpp"
#include "utils/string/file.hpp"
using utils::println;
/**
* Useful when somebody wants to get a hash for some query.
*
* Usage:
* ./query_hash -q "CREATE (n {name: \"test\n"}) RETURN n"
*/
int main(int argc, char **argv)
{
// init args
REGISTER_ARGS(argc, argv);
// take query from input args
auto query = GET_ARG("-q", "CREATE (n) RETURN n").get_string();
// run preprocessing
QueryPreprocessor preprocessor;
auto preprocessed = preprocessor.preprocess(query);
// print query, stripped query, hash and variable values (propertie values)
println("Query: ", query);
println("Stripped query: ", preprocessed.query);
println("Query hash: ", preprocessed.hash);
println("Property values:");
for (auto property : preprocessed.arguments) {
println(" ", property);
}
println("");
return 0;
}

View File

@ -1,35 +0,0 @@
#include <iostream>
#include "query/language/cypher/common.hpp"
#include "query/preprocesor.hpp"
#include "utils/command_line/arguments.hpp"
#include "utils/type_discovery.hpp"
#include "utils/variadic/variadic.hpp"
using utils::println;
int main(int argc, char **argv)
{
// arguments parsing
auto arguments = all_arguments(argc, argv);
// query extraction
auto queries = extract_queries(arguments);
QueryPreprocessor preprocessor;
for (auto &query : queries)
{
auto preprocessed = preprocessor.preprocess(query);
println("QUERY: ", query);
println("STRIPPED QUERY: ", preprocessed.query);
println("QUERY HASH: ", preprocessed.hash);
println("PROPERTIES:");
for (auto property : preprocessed.arguments) {
println(" ", property);
}
println("-----------------------------");
}
return 0;
}