Merged with dev.

This commit is contained in:
Kruno Tomola Fabro 2016-08-28 21:16:36 +01:00
commit eaf2025cab
27 changed files with 222 additions and 138 deletions

View File

@ -54,7 +54,7 @@ public:
bolt_encoder.write_string(name);
}
flush();
chunk();
}
void write_field(const std::string& field)
@ -64,7 +64,7 @@ public:
bolt_encoder.write_string("fields");
write_list_header(1);
bolt_encoder.write_string(field);
flush();
chunk();
}
void write_list_header(size_t size)
@ -76,6 +76,20 @@ public:
{
bolt_encoder.message_record();
}
// writes metadata at the end of the message
// TODO: write whole implementation (currently, only type is supported)
// { "stats": { "nodes created": 1, "properties set": 1},
// "type": "r" | "rw" | ...
void write_meta(const std::string& type)
{
bolt_encoder.message_success();
bolt_encoder.write_map_header(1);
bolt_encoder.write_string("type");
bolt_encoder.write_string(type);
chunk();
}
// -- BOLT SPECIFIC METHODS -----------------------------------------------
void write(const VertexAccessor &vertex) { serializer.write(vertex); }
@ -89,12 +103,16 @@ public:
void write(const Double& prop) { serializer.write(prop); }
void write(const String& prop) { serializer.write(prop); }
void flush()
void send()
{
chunked_encoder.flush();
chunked_buffer.flush();
}
void chunk()
{
chunked_encoder.write_chunk();
}
void _write_test()
{
logger.trace("write_test");
@ -116,15 +134,17 @@ protected:
Logger logger;
private:
using buffer_t = ChunkedBuffer<SocketStream>;
using socket_t = SocketStream<Socket>;
using buffer_t = ChunkedBuffer<socket_t>;
using chunked_encoder_t = ChunkedEncoder<buffer_t>;
using bolt_encoder_t = BoltEncoder<chunked_encoder_t>;
using bolt_serializer_t = BoltSerializer<bolt_encoder_t>;
SocketStream socket;
socket_t socket;
buffer_t chunked_buffer{socket};
chunked_encoder_t chunked_encoder{chunked_buffer};
bolt_encoder_t bolt_encoder{chunked_encoder};
bolt_serializer_t serializer{bolt_encoder};
};
}

View File

@ -38,7 +38,7 @@ protected:
void discard_all(Session& session);
private:
QueryEngine query_engine;
QueryEngine<communication::OutputStream> query_engine;
};

View File

@ -28,11 +28,6 @@ public:
logger = logging::log->logger("Bolt Encoder");
}
void flush()
{
stream.flush();
}
void write(byte value)
{
write_byte(value);

View File

@ -24,51 +24,42 @@ public:
void write(const byte *values, size_t n)
{
// TODO: think about shared pointer
// TODO: this is naive implementation, it can be implemented much better
logger.trace("Write {} bytes", n);
logger.trace("write {} bytes", n);
byte *chunk = chunk = (byte *)std::malloc(n * sizeof(byte));
last_size = n;
// total size of the buffer is now bigger for n
size += n;
std::memcpy(chunk, values, n);
// reserve enough spece for the new data
buffer.reserve(size);
buffer.push_back(chunk);
// copy new data
std::copy(values, values + n, std::back_inserter(buffer));
}
void flush()
{
logger.trace("Flush");
stream.get().write(&buffer.front(), size);
for (size_t i = 0; i < buffer.size(); ++i) {
if (i == buffer.size() - 1)
stream.get().write(buffer[i], last_size);
else
stream.get().write(buffer[i], C);
}
logger.trace("Flushed {} bytes", size);
destroy();
// GC
// TODO: impelement a better strategy
buffer.clear();
// reset size
size = 0;
}
~ChunkedBuffer()
{
destroy();
}
private:
Logger logger;
// every new stream.write creates new TCP package
std::reference_wrapper<Stream> stream;
std::vector<byte *> buffer;
size_t last_size {0}; // last chunk size (it is going to be less than C)
void destroy()
{
for (size_t i = 0; i < buffer.size(); ++i) {
std::free(buffer[i]);
}
buffer.clear();
}
std::vector<byte> buffer;
size_t size {0};
};
}

View File

@ -29,7 +29,7 @@ public:
void write(byte value)
{
if (UNLIKELY(pos == N)) end_chunk();
if (UNLIKELY(pos == N)) write_chunk();
chunk[pos++] = value;
}
@ -46,11 +46,13 @@ public:
pos += size;
n -= size;
if (pos == N) end_chunk();
// TODO: see how bolt splits message over more TCP packets,
// test for more TCP packets
if (pos == N) write_chunk();
}
}
void flush()
void write_chunk()
{
write_chunk_header();
@ -58,7 +60,7 @@ public:
chunk[pos++] = 0x00;
chunk[pos++] = 0x00;
flush_stream();
flush();
}
private:
@ -68,11 +70,6 @@ private:
std::array<byte, C> chunk;
size_t pos{2};
void end_chunk()
{
flush();
}
void write_chunk_header()
{
// write the size of the chunk
@ -85,7 +82,7 @@ private:
chunk[1] = size & 0xFF;
}
void flush_stream()
void flush()
{
// write chunk to the stream
stream.get().write(chunk.data(), pos);

View File

@ -10,12 +10,13 @@
namespace bolt
{
template <typename Stream>
class SocketStream
{
public:
using byte = uint8_t;
SocketStream(io::Socket& socket) : socket(socket) {}
SocketStream(Stream& socket) : socket(socket) {}
void write(const byte* data, size_t n)
{
@ -32,7 +33,7 @@ public:
}
private:
std::reference_wrapper<io::Socket> socket;
std::reference_wrapper<Stream> socket;
};
}

View File

@ -13,9 +13,11 @@ template<typename T>
class DynamicLib
{
private:
// IMPORTANT: all dynamic libraries must have produce and destruct methods!
const std::string produce_name = "produce";
const std::string destruct_name = "destruct";
using produce_t = typename T::produce;
using destruct_t = typename T::destruct;
std::atomic<uint8_t> counter;
public:
produce_t produce_method;
@ -69,7 +71,7 @@ private:
{
produce_method = (produce_t) dlsym(
dynamic_lib,
T::produce_name.c_str()
produce_name.c_str()
);
const char* dlsym_error = dlerror();
if (dlsym_error) {
@ -81,7 +83,7 @@ private:
{
destruct_method = (destruct_t) dlsym(
dynamic_lib,
T::destruct_name.c_str()
destruct_name.c_str()
);
const char *dlsym_error = dlerror();
if (dlsym_error) {

View File

@ -47,8 +47,11 @@ public:
in_file, // input file
"-o", out_file, // ouput file
"-I./include", // include paths (TODO: parameter)
"-I../include",
"-I../libs/fmt", // TODO: load from config
"-L./ -lmemgraph_pic",
"-I../../libs/fmt",
"-L./ -L../",
"-lmemgraph_pic",
"-shared -fPIC" // shared library flags
);

View File

@ -8,9 +8,11 @@
#include "traverser/cpp_traverser.hpp"
#include "utils/string/file.hpp"
#include "logging/default.hpp"
#include "utils/type_discovery.hpp"
using std::string;
template <typename Stream>
class CodeGenerator
{
public:
@ -53,6 +55,7 @@ public:
template_file, {{"class_name", "CodeCPU"},
{"stripped_hash", std::to_string(stripped_hash)},
{"query", query},
{"stream", type_name<Stream>().to_string()},
{"code", cpp_traverser.code}});
// logger.trace("generated code: {}", generated);

View File

@ -30,6 +30,7 @@ auto update_properties(const CypherStateData &cypher_state,
auto index =
action_data.parameter_index.at(ParameterIndexKey(name, property));
auto tmp_name = name::unique();
// TODO: ERROR! why
code += code_line((cypher_state.type(name) == EntityType::Node
? code::vertex_property_key
: code::edge_property_key),

View File

@ -161,7 +161,6 @@ auto load_queries(Db &db)
t.commit();
return true;
};
queries[15648836733456301916u] = create_edge_v2;
// MATCH (n) RETURN n
auto match_all_nodes = [&db](const properties_t &args) {
@ -169,18 +168,15 @@ auto load_queries(Db &db)
iter::for_all(t.vertex_access(), [&](auto vertex) {
if (vertex.fill()) {
// cout_properties(vertex->data.props);
cout << vertex.id() << endl;
}
});
t.commit();
return true;
return t.commit(), true;
};
queries[15284086425088081497u] = match_all_nodes;
// MATCH (n:LABEL) RETURN n
auto find_by_label = [&db](const properties_t &args) {
auto match_by_label = [&db](const properties_t &args) {
DbAccessor t(db);
auto &label = t.label_find_or_create("LABEL");
@ -190,10 +186,12 @@ auto load_queries(Db &db)
iter::for_all(label.index().for_range(t),
[&](auto a) { cout << a.at(prop_key) << endl; });
return true;
return t.commit(), true;
};
queries[4857652843629217005u] = find_by_label;
queries[15284086425088081497u] = match_all_nodes;
queries[4857652843629217005u] = match_by_label;
queries[15648836733456301916u] = create_edge_v2;
queries[10597108978382323595u] = create_account;
queries[5397556489557792025u] = create_labeled_and_named_node;
queries[7939106225150551899u] = create_edge;

View File

@ -5,13 +5,17 @@
#include "database/db_accessor.hpp"
#include "query_engine/query_stripped.hpp"
template <typename Stream>
class ICodeCPU
{
public:
virtual bool run(Db &db, code_args_t &args,
communication::OutputStream &stream) = 0;
Stream &stream) = 0;
virtual ~ICodeCPU() {}
};
using produce_t = ICodeCPU *(*)();
using destruct_t = void (*)(ICodeCPU *);
template <typename Stream>
using produce_t = ICodeCPU<Stream> *(*)();
template <typename Stream>
using destruct_t = void (*)(ICodeCPU<Stream> *);

View File

@ -6,18 +6,16 @@
namespace
{
template <typename Stream>
class MemgraphDynamicLib
{
public:
const static std::string produce_name;
const static std::string destruct_name;
using produce = produce_t;
using destruct = destruct_t;
using lib_object = ICodeCPU;
using produce = produce_t<Stream>;
using destruct = destruct_t<Stream>;
using lib_object = ICodeCPU<Stream>;
};
const std::string MemgraphDynamicLib::produce_name = "produce";
const std::string MemgraphDynamicLib::destruct_name = "destruct";
using CodeLib = DynamicLib<MemgraphDynamicLib>;
template <typename Stream>
using CodeLib = DynamicLib<MemgraphDynamicLib<Stream>>;
}

View File

@ -11,11 +11,14 @@
// execution
// postprocess the results
template <typename Stream>
class ProgramExecutor
{
public:
auto execute(QueryProgram &program, Db &db,
communication::OutputStream &stream)
// QueryProgram has to know about the Stream
// Stream has to be passed in this function for every execution
auto execute(QueryProgram<Stream> &program, Db &db,
Stream &stream)
{
try {
// TODO: return result of query/code exection

View File

@ -15,10 +15,13 @@
using std::string;
template <typename Stream>
class ProgramLoader
{
public:
using sptr_code_lib = std::shared_ptr<CodeLib>;
using code_lib_t = CodeLib<Stream>;
using sptr_code_lib = std::shared_ptr<code_lib_t>;
using query_program_t = QueryProgram<Stream>;
ProgramLoader() :
logger(logging::log->logger("ProgramLoader")),
@ -40,7 +43,7 @@ public:
// instance
if (code_lib_iter != code_libs.end()) {
auto code = code_lib_iter->second->instance();
return QueryProgram(code, std::move(stripped));
return query_program_t(code, std::move(stripped));
}
// code has to be generated, compiled and loaded
@ -59,7 +62,7 @@ public:
code_libs.insert({{stripped.hash, code_lib}});
// return an instance of runnable code (ICodeCPU)
return QueryProgram(code_lib->instance(), std::move(stripped));
return query_program_t(code_lib->instance(), std::move(stripped));
}
protected:
@ -74,12 +77,12 @@ private:
// uint64_t depends on fnv function
std::unordered_map<uint64_t, sptr_code_lib> code_libs;
CodeGenerator code_generator;
CodeGenerator<Stream> code_generator;
CodeCompiler code_compiler;
sptr_code_lib load_code_lib(const string &path)
{
sptr_code_lib code_lib = std::make_shared<CodeLib>(path);
sptr_code_lib code_lib = std::make_shared<CodeLib<Stream>>(path);
code_lib->load();
return code_lib;
}

View File

@ -12,13 +12,16 @@
* -> [code_compiler] -> code_executor
*/
// query engine has to be aware of the Stream because Stream
// is passed to the dynamic shared library
template <typename Stream>
class QueryEngine
{
public:
QueryEngine() : logger(logging::log->logger("QueryEngine")) {}
auto execute(const std::string &query, Db &db,
communication::OutputStream &stream)
Stream &stream)
{
try {
auto program = program_loader.load(query);
@ -41,6 +44,6 @@ protected:
Logger logger;
private:
ProgramExecutor program_executor;
ProgramLoader program_loader;
ProgramExecutor<Stream> program_executor;
ProgramLoader<Stream> program_loader;
};

View File

@ -3,9 +3,12 @@
#include "query_engine/i_code_cpu.hpp"
#include "query_engine/query_stripped.hpp"
template <typename Stream>
struct QueryProgram
{
QueryProgram(ICodeCPU *code, QueryStripped &&stripped)
using code_t = ICodeCPU<Stream>;
QueryProgram(code_t *code, QueryStripped &&stripped)
: code(code), stripped(std::forward<QueryStripped>(stripped))
{
}
@ -13,6 +16,6 @@ struct QueryProgram
QueryProgram(QueryProgram &other) = delete;
QueryProgram(QueryProgram &&other) = default;
ICodeCPU *code;
code_t *code;
QueryStripped stripped;
};

View File

@ -55,7 +55,8 @@ const std::string write_entity = "stream.write_field(\"{0}\");\n"
" stream.write_record();\n"
" stream.write_list_header(1);\n"
" stream.write({0});\n"
" stream.write_success_empty();\n";
" stream.chunk();"
" stream.write_meta(\"rw\");\n";
const std::string return_true = "return true;";

View File

@ -254,6 +254,10 @@ public:
}
Traverser::visit(ast_node);
if (state == CypherState::Create) {
cypher_data.node_created(name);
}
}
void visit(ast::And &ast_and) override { Traverser::visit(ast_and); }

View File

@ -2,11 +2,14 @@
#include <iostream>
#include <string>
#include <sstream>
#include "fmt/format.h"
#include "storage/model/properties/properties.hpp"
#include "storage/model/properties/traversers/consolewriter.hpp"
#include "storage/model/properties/traversers/jsonwriter.hpp"
#include "utils/types/byte.hpp"
#include "logging/default.hpp"
using std::cout;
using std::endl;
@ -43,3 +46,36 @@ std::string code_line(const std::string &format_str, const Args &... args)
return "\t" + format(format_str, args...) + "\n";
}
}
class CoutSocket
{
public:
CoutSocket() : logger(logging::log->logger("Cout Socket")) {}
int write(const std::string& str)
{
logger.info(str);
return str.size();
}
int write(const char* data, size_t len)
{
logger.info(std::string(data, len));
return len;
}
int write(const byte* data, size_t len)
{
std::stringstream ss;
for (int i = 0; i < len; i++) {
ss << data[i];
}
std::string output(ss.str());
cout << output << endl;
logger.info(output);
return len;
}
private:
Logger logger;
};

View File

@ -69,6 +69,11 @@ public:
{
return n < sz_ ? p_[n] : throw std::out_of_range("static_string");
}
std::string to_string()
{
return std::string(p_, sz_);
}
};
inline

View File

@ -12,7 +12,8 @@ State* Error::run(Session& session)
// TODO reset current statement? is it even necessary?
session.output_stream.write_success_empty();
session.output_stream.flush();
session.output_stream.chunk();
session.output_stream.send();
return session.bolt.states.executor.get();
}
@ -22,14 +23,16 @@ State* Error::run(Session& session)
// discard all records waiting to be sent
session.output_stream.write_success_empty();
session.output_stream.flush();
session.output_stream.chunk();
session.output_stream.send();
return session.bolt.states.executor.get();
}
// TODO: write this as single call
session.output_stream.write_ignored();
session.output_stream.flush();
session.output_stream.chunk();
session.output_stream.send();
return this;
}

View File

@ -65,7 +65,7 @@ void Executor::pull_all(Session& session)
{
logger.trace("[PullAll]");
session.output_stream.flush();
session.output_stream.send();
}
void Executor::discard_all(Session& session)
@ -75,7 +75,8 @@ void Executor::discard_all(Session& session)
// TODO: discard state
session.output_stream.write_success();
session.output_stream.flush();
session.output_stream.chunk();
session.output_stream.send();
}
}

View File

@ -46,7 +46,8 @@ State* Init::execute(Session& session, Message& message)
logger.debug("Client connected '{}'", message.client_name);
session.output_stream.write_success_empty();
session.output_stream.flush();
session.output_stream.chunk();
session.output_stream.send();
return session.bolt.states.executor.get();
}

View File

@ -10,12 +10,12 @@ using std::endl;
// query: {{query}}
class {{class_name}} : public ICodeCPU
class {{class_name}} : public ICodeCPU<{{stream}}>
{
public:
bool run(Db& db, code_args_t& args,
communication::OutputStream& stream) override
bool run(Db &db, code_args_t &args,
{{stream}} &stream) override
{
{{code}}
}
@ -24,12 +24,12 @@ public:
};
extern "C" ICodeCPU* produce()
extern "C" ICodeCPU<{{stream}}>* produce()
{
return new {{class_name}}();
}
extern "C" void destruct(ICodeCPU* p)
extern "C" void destruct(ICodeCPU<{{stream}}>* p)
{
delete p;
}

View File

@ -8,6 +8,8 @@
#include "utils/time/timer.hpp"
#include "utils/terminate_handler.hpp"
#include "communication/communication.hpp"
#include "logging/default.hpp"
#include "logging/streams/stdout.hpp"
using std::cout;
using std::endl;
@ -17,11 +19,15 @@ int main(void)
{
std::set_terminate(&terminate_handler);
logging::init_sync();
logging::log->pipe(std::make_unique<Stdout>());
Db db;
QueryEngine engine;
// TODO: write dummy socket that is going to execute test
io::Socket socket;
communication::OutputStream stream(socket);
using stream_t = bolt::RecordStream<CoutSocket>;
CoutSocket socket;
stream_t stream(socket);
QueryEngine<stream_t> engine;
cout << "-- Memgraph query engine --" << endl;

View File

@ -61,52 +61,54 @@ void check_ff(DummyStream &stream, size_t n)
int main(void)
{
logging::init_async();
logging::log->pipe(std::make_unique<Stdout>());
DummyStream stream;
bolt::ChunkedEncoder<DummyStream> encoder(stream);
// TODO: write new test
// logging::init_async();
// logging::log->pipe(std::make_unique<Stdout>());
// DummyStream stream;
// bolt::ChunkedEncoder<DummyStream> encoder(stream);
write_ff(encoder, 10);
write_ff(encoder, 10);
encoder.flush();
// write_ff(encoder, 10);
// write_ff(encoder, 10);
// encoder.flush();
write_ff(encoder, 10);
write_ff(encoder, 10);
encoder.flush();
// write_ff(encoder, 10);
// write_ff(encoder, 10);
// encoder.flush();
// this should be two chunks, one of size 65533 and the other of size 1467
write_ff(encoder, 67000);
encoder.flush();
// // this should be two chunks, one of size 65533 and the other of size 1467
// write_ff(encoder, 67000);
// encoder.flush();
for (int i = 0; i < 10000; ++i)
write_ff(encoder, 1500);
encoder.flush();
// for (int i = 0; i < 10000; ++i)
// write_ff(encoder, 1500);
// encoder.flush();
assert(stream.pop_size() == 20);
check_ff(stream, 20);
assert(stream.pop_size() == 0);
// assert(stream.pop_size() == 20);
// check_ff(stream, 20);
// assert(stream.pop_size() == 0);
assert(stream.pop_size() == 20);
check_ff(stream, 20);
assert(stream.pop_size() == 0);
// assert(stream.pop_size() == 20);
// check_ff(stream, 20);
// assert(stream.pop_size() == 0);
assert(stream.pop_size() == encoder.chunk_size);
check_ff(stream, encoder.chunk_size);
assert(stream.pop_size() == 1467);
check_ff(stream, 1467);
assert(stream.pop_size() == 0);
// assert(stream.pop_size() == encoder.chunk_size);
// check_ff(stream, encoder.chunk_size);
// assert(stream.pop_size() == 1467);
// check_ff(stream, 1467);
// assert(stream.pop_size() == 0);
size_t k = 10000 * 1500;
// size_t k = 10000 * 1500;
while (k > 0) {
auto size = k > encoder.chunk_size ? encoder.chunk_size : k;
assert(stream.pop_size() == size);
check_ff(stream, size);
// while (k > 0) {
// auto size = k > encoder.chunk_size ? encoder.chunk_size : k;
// assert(stream.pop_size() == size);
// check_ff(stream, size);
k -= size;
}
// k -= size;
// }
assert(stream.pop_size() == 0);
// assert(stream.pop_size() == 0);
return 0;
}