bolt failure isn't finished

This commit is contained in:
Marko Budiselic 2016-08-30 06:49:47 +01:00
parent b5db8d8d1e
commit 1048ea8849
4 changed files with 33 additions and 20 deletions

View File

@ -95,6 +95,16 @@ public:
void write(const String &prop) { encoder.write_string(prop.value); }
void write_failure(const std::map<std::string, std::string>& data)
{
encoder.message_failure();
encoder.write_map_header(data.size());
for (auto const &kv : data) {
write(kv.first);
write(kv.second);
}
}
template <class T>
void handle(const T &prop)
{

View File

@ -92,6 +92,11 @@ public:
chunk();
}
void write_failure(const std::map<std::string, std::string>& data)
{
serializer.write_failure(data);
chunk();
}
// -- BOLT SPECIFIC METHODS -----------------------------------------------
void write(const VertexAccessor &vertex) { serializer.write(vertex); }

View File

@ -253,6 +253,12 @@ public:
write(underlying_cast(MessageCode::Ignored));
}
void message_failure()
{
write_struct_header(1);
write(underlying_cast(MessageCode::Failure));
}
void message_ignored_empty()
{
message_ignored();

View File

@ -10,7 +10,7 @@ namespace bolt
Executor::Executor() : logger(logging::log->logger("Executor")) {}
State* Executor::run(Session& session)
State *Executor::run(Session &session)
{
// just read one byte that represents the struct type, we can skip the
// information contained in this byte
@ -20,31 +20,22 @@ State* Executor::run(Session& session)
auto message_type = session.decoder.read_byte();
if(message_type == MessageCode::Run)
{
if (message_type == MessageCode::Run) {
Query q;
q.statement = session.decoder.read_string();
this->run(session, q);
}
else if(message_type == MessageCode::PullAll)
{
} else if (message_type == MessageCode::PullAll) {
pull_all(session);
}
else if(message_type == MessageCode::DiscardAll)
{
} else if (message_type == MessageCode::DiscardAll) {
discard_all(session);
}
else if(message_type == MessageCode::Reset)
{
} else if (message_type == MessageCode::Reset) {
// todo rollback current transaction
// discard all records waiting to be sent
return this;
}
else
{
} else {
logger.error("Unrecognized message recieved");
logger.debug("Invalid message type 0x{:02X}", message_type);
@ -54,7 +45,7 @@ State* Executor::run(Session& session)
return this;
}
void Executor::run(Session& session, Query& query)
void Executor::run(Session &session, Query &query)
{
logger.trace("[Run] '{}'", query.statement);
@ -64,18 +55,20 @@ void Executor::run(Session& session, Query& query)
try {
query_engine.execute(query.statement, db, session.output_stream);
} catch (QueryEngineException &e) {
// return error to user
session.output_stream.write_failure(
{{"code", "unknown"}, {"message", e.what()}});
session.output_stream.send();
}
}
void Executor::pull_all(Session& session)
void Executor::pull_all(Session &session)
{
logger.trace("[PullAll]");
session.output_stream.send();
}
void Executor::discard_all(Session& session)
void Executor::discard_all(Session &session)
{
logger.trace("[DiscardAll]");
@ -85,5 +78,4 @@ void Executor::discard_all(Session& session)
session.output_stream.chunk();
session.output_stream.send();
}
}