Minor refactor of bolt session

Reviewers: buda

Reviewed By: buda

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D577
This commit is contained in:
Mislav Bradac 2017-07-20 18:35:25 +02:00
parent 76f67d0173
commit b9f84ca00e
5 changed files with 275 additions and 205 deletions

View File

@ -5,6 +5,7 @@
#include "database/dbms.hpp" #include "database/dbms.hpp"
#include "query/engine.hpp" #include "query/engine.hpp"
#include "transactions/transaction.hpp"
#include "communication/bolt/v1/constants.hpp" #include "communication/bolt/v1/constants.hpp"
#include "communication/bolt/v1/state.hpp" #include "communication/bolt/v1/state.hpp"
@ -99,8 +100,9 @@ class Session {
case State::Result: case State::Result:
state_ = StateIdleResultRun(*this, state_); state_ = StateIdleResultRun(*this, state_);
break; break;
case State::Error: case State::ErrorIdle:
state_ = StateErrorRun(*this); case State::ErrorResult:
state_ = StateErrorRun(*this, state_);
break; break;
case State::Close: case State::Close:
// This state is handled below // This state is handled below
@ -147,6 +149,8 @@ class Session {
GraphDbAccessor ActiveDb() { return dbms_.active(); } GraphDbAccessor ActiveDb() { return dbms_.active(); }
// TODO: Rethink if there is a way to hide some members. At the momemnt all of
// them are public.
Socket socket_; Socket socket_;
Dbms &dbms_; Dbms &dbms_;
QueryEngine<OutputStream> &query_engine_; QueryEngine<OutputStream> &query_engine_;
@ -162,6 +166,8 @@ class Session {
io::network::Epoll::Event event_; io::network::Epoll::Event event_;
bool connected_{false}; bool connected_{false};
State state_; State state_;
// Active transaction of the session, can be null.
tx::Transaction *transaction_;
private: private:
void ClientFailureInvalidData() { void ClientFailureInvalidData() {

View File

@ -32,9 +32,16 @@ enum class State : uint8_t {
Result, Result,
/** /**
* This state handles errors. * This state handles errors, if client handles error response correctly next
* state is Idle.
*/ */
Error, ErrorIdle,
/**
* This state handles errors, if client handles error response correctly next
* state is Result.
*/
ErrorResult,
/** /**
* This is a 'virtual' state (it doesn't have a run function) which tells * This is a 'virtual' state (it doesn't have a run function) which tells

View File

@ -16,7 +16,7 @@ namespace communication::bolt {
* @param session the session that should be used for the run * @param session the session that should be used for the run
*/ */
template <typename Session> template <typename Session>
State StateErrorRun(Session &session) { State StateErrorRun(Session &session, State state) {
Marker marker; Marker marker;
Signature signature; Signature signature;
if (!session.decoder_.ReadMessageHeader(&signature, &marker)) { if (!session.decoder_.ReadMessageHeader(&signature, &marker)) {
@ -40,7 +40,13 @@ State StateErrorRun(Session &session) {
DLOG(WARNING) << "Couldn't send success message!"; DLOG(WARNING) << "Couldn't send success message!";
return State::Close; return State::Close;
} }
return State::Idle; if (signature == Signature::Reset) {
return State::Idle;
}
if (state == State::ErrorIdle) {
return State::Idle;
}
return State::Result;
} else { } else {
uint8_t value = underlying_cast(marker); uint8_t value = underlying_cast(marker);
@ -70,7 +76,7 @@ State StateErrorRun(Session &session) {
} }
// cleanup done, command ignored, stay in error state // cleanup done, command ignored, stay in error state
return State::Error; return state;
} }
} }
} }

View File

@ -13,6 +13,221 @@
namespace communication::bolt { namespace communication::bolt {
template <typename Session>
State HandleRun(Session &session, State state, Marker marker) {
if (marker != Marker::TinyStruct2) {
DLOG(WARNING) << fmt::format(
"Expected TinyStruct2 marker, but received 0x{:02X}!",
underlying_cast(marker));
return State::Close;
}
query::TypedValue query, params;
if (!session.decoder_.ReadTypedValue(&query,
query::TypedValue::Type::String)) {
DLOG(WARNING) << "Couldn't read query string!";
return State::Close;
}
if (!session.decoder_.ReadTypedValue(&params, query::TypedValue::Type::Map)) {
DLOG(WARNING) << "Couldn't read parameters!";
return State::Close;
}
auto db_accessor = session.dbms_.active();
DLOG(INFO) << fmt::format("[ActiveDB] '{}'", db_accessor->name());
if (state != State::Idle) {
// send failure message
bool unexpected_run_fail_sent = session.encoder_.MessageFailure(
{{"code", "Memgraph.QueryExecutionFail"},
{"message", "Unexpected RUN command"}});
DLOG(WARNING) << "Unexpected RUN command!";
if (!unexpected_run_fail_sent) {
DLOG(WARNING) << "Couldn't send failure message!";
return State::Close;
} else {
return State::ErrorResult;
}
}
try {
DLOG(INFO) << fmt::format("[Run] '{}'", query.Value<std::string>());
auto is_successfully_executed = session.query_engine_.Run(
query.Value<std::string>(), *db_accessor, session.output_stream_,
params.Value<std::map<std::string, query::TypedValue>>());
if (!is_successfully_executed) {
// abort transaction
db_accessor->abort();
// clear any leftover messages in the buffer
session.encoder_buffer_.Clear();
// send failure message
bool exec_fail_sent = session.encoder_.MessageFailure(
{{"code", "Memgraph.QueryExecutionFail"},
{"message",
"Query execution has failed (probably there is no "
"element or there are some problems with concurrent "
"access -> client has to resolve problems with "
"concurrent access)"}});
DLOG(WARNING) << "Query execution failed!";
if (!exec_fail_sent) {
DLOG(WARNING) << "Couldn't send failure message!";
return State::Close;
} else {
return State::ErrorIdle;
}
} else {
db_accessor->commit();
// The query engine has already stored all query data in the buffer.
// We should only send the first chunk now which is the success
// message which contains header data. The rest of this data (records
// and summary) will be sent after a PULL_ALL command from the client.
if (!session.encoder_buffer_.FlushFirstChunk()) {
DLOG(WARNING) << "Couldn't flush header data from the buffer!";
return State::Close;
}
return State::Result;
}
} catch (const utils::BasicException &e) {
// clear header success message
session.encoder_buffer_.Clear();
db_accessor->abort();
bool fail_sent = session.encoder_.MessageFailure(
{{"code", "Memgraph.Exception"}, {"message", e.what()}});
DLOG(WARNING) << fmt::format("Error message: {}", e.what());
if (!fail_sent) {
DLOG(WARNING) << "Couldn't send failure message!";
return State::Close;
}
return State::ErrorIdle;
} catch (const utils::StacktraceException &e) {
// clear header success message
session.encoder_buffer_.Clear();
db_accessor->abort();
bool fail_sent = session.encoder_.MessageFailure(
{{"code", "Memgraph.Exception"}, {"message", e.what()}});
DLOG(WARNING) << fmt::format("Error message: {}", e.what());
DLOG(WARNING) << fmt::format("Error trace: {}", e.trace());
if (!fail_sent) {
DLOG(WARNING) << "Couldn't send failure message!";
return State::Close;
}
return State::ErrorIdle;
} catch (std::exception &e) {
// clear header success message
session.encoder_buffer_.Clear();
db_accessor->abort();
bool fail_sent = session.encoder_.MessageFailure(
{{"code", "Memgraph.Exception"},
{"message",
"An unknown exception occured, please contact your database "
"administrator."}});
DLOG(WARNING) << fmt::format("std::exception {}", e.what());
if (!fail_sent) {
DLOG(WARNING) << "Couldn't send failure message!";
return State::Close;
}
return State::ErrorIdle;
}
}
template <typename Session>
State HandlePullAll(Session &session, State state, Marker marker) {
DLOG(INFO) << "[PullAll]";
if (marker != Marker::TinyStruct) {
DLOG(WARNING) << fmt::format(
"Expected TinyStruct marker, but received 0x{:02X}!",
underlying_cast(marker));
return State::Close;
}
if (state != State::Result) {
// the buffer doesn't have data, return a failure message
bool data_fail_sent = session.encoder_.MessageFailure(
{{"code", "Memgraph.Exception"},
{"message",
"There is no data to "
"send, you have to execute a RUN command before a PULL_ALL!"}});
if (!data_fail_sent) {
DLOG(WARNING) << "Couldn't send failure message!";
return State::Close;
}
return State::ErrorIdle;
}
// flush pending data to the client, the success message is streamed
// from the query engine, it contains statistics from the query run
if (!session.encoder_buffer_.Flush()) {
DLOG(WARNING) << "Couldn't flush data from the buffer!";
return State::Close;
}
return State::Idle;
}
template <typename Session>
State HandleDiscardAll(Session &session, State state, Marker marker) {
DLOG(INFO) << "[DiscardAll]";
if (marker != Marker::TinyStruct) {
DLOG(WARNING) << fmt::format(
"Expected TinyStruct marker, but received 0x{:02X}!",
underlying_cast(marker));
return State::Close;
}
if (state != State::Result) {
bool data_fail_discard = session.encoder_.MessageFailure(
{{"code", "Memgraph.Exception"},
{"message",
"There is no data to "
"discard, you have to execute a RUN command before a "
"DISCARD_ALL!"}});
if (!data_fail_discard) {
DLOG(WARNING) << "Couldn't send failure message!";
return State::Close;
}
return State::ErrorIdle;
}
// clear all pending data and send a success message
session.encoder_buffer_.Clear();
if (!session.encoder_.MessageSuccess()) {
DLOG(WARNING) << "Couldn't send success message!";
return State::Close;
}
return State::Idle;
}
template <typename Session>
State HandleReset(Session &session, State, Marker marker) {
// IMPORTANT: This implementation of the Bolt RESET command isn't fully
// compliant to the protocol definition. In the protocol it is defined
// that this command should immediately stop any running commands and
// reset the session to a clean state. That means that we should always
// make a look-ahead for the RESET command before processing anything.
// Our implementation, for now, does everything in a blocking fashion
// so we cannot simply "kill" a transaction while it is running. So
// now this command only resets the session to a clean state. It
// does not IGNORE running and pending commands as it should.
if (marker != Marker::TinyStruct) {
DLOG(WARNING) << fmt::format(
"Expected TinyStruct marker, but received 0x{:02X}!",
underlying_cast(marker));
return State::Close;
}
// clear all pending data and send a success message
session.encoder_buffer_.Clear();
if (!session.encoder_.MessageSuccess()) {
DLOG(WARNING) << "Couldn't send success message!";
return State::Close;
}
return State::Idle;
}
/** /**
* Executor state run function * Executor state run function
* This function executes an initialized Bolt session. * This function executes an initialized Bolt session.
@ -29,203 +244,13 @@ State StateIdleResultRun(Session &session, State state) {
} }
if (signature == Signature::Run) { if (signature == Signature::Run) {
if (marker != Marker::TinyStruct2) { return HandleRun(session, state, marker);
DLOG(WARNING) << fmt::format(
"Expected TinyStruct2 marker, but received 0x{:02X}!",
underlying_cast(marker));
return State::Close;
}
query::TypedValue query, params;
if (!session.decoder_.ReadTypedValue(&query,
query::TypedValue::Type::String)) {
DLOG(WARNING) << "Couldn't read query string!";
return State::Close;
}
if (!session.decoder_.ReadTypedValue(&params,
query::TypedValue::Type::Map)) {
DLOG(WARNING) << "Couldn't read parameters!";
return State::Close;
}
auto db_accessor = session.dbms_.active();
DLOG(INFO) << fmt::format("[ActiveDB] '{}'", db_accessor->name());
if (state != State::Idle) {
DLOG(WARNING) << "Unexpected RUN command!";
return State::Error;
}
try {
DLOG(INFO) << fmt::format("[Run] '{}'", query.Value<std::string>());
auto is_successfully_executed = session.query_engine_.Run(
query.Value<std::string>(), *db_accessor, session.output_stream_,
params.Value<std::map<std::string, query::TypedValue>>());
if (!is_successfully_executed) {
// abort transaction
db_accessor->abort();
// clear any leftover messages in the buffer
session.encoder_buffer_.Clear();
// send failure message
bool exec_fail_sent = session.encoder_.MessageFailure(
{{"code", "Memgraph.QueryExecutionFail"},
{"message",
"Query execution has failed (probably there is no "
"element or there are some problems with concurrent "
"access -> client has to resolve problems with "
"concurrent access)"}});
if (!exec_fail_sent) {
DLOG(WARNING) << "Couldn't send failure message!";
return State::Close;
} else {
DLOG(WARNING) << "Query execution failed!";
return State::Error;
}
} else {
db_accessor->commit();
// The query engine has already stored all query data in the buffer.
// We should only send the first chunk now which is the success
// message which contains header data. The rest of this data (records
// and summary) will be sent after a PULL_ALL command from the client.
if (!session.encoder_buffer_.FlushFirstChunk()) {
DLOG(WARNING) << "Couldn't flush header data from the buffer!";
return State::Close;
}
return State::Result;
}
} catch (const utils::BasicException &e) {
// clear header success message
session.encoder_buffer_.Clear();
db_accessor->abort();
bool fail_sent = session.encoder_.MessageFailure(
{{"code", "Memgraph.Exception"}, {"message", e.what()}});
DLOG(WARNING) << fmt::format("Error message: {}", e.what());
if (!fail_sent) {
DLOG(WARNING) << "Couldn't send failure message!";
return State::Close;
}
return State::Error;
} catch (const utils::StacktraceException &e) {
// clear header success message
session.encoder_buffer_.Clear();
db_accessor->abort();
bool fail_sent = session.encoder_.MessageFailure(
{{"code", "Memgraph.Exception"}, {"message", e.what()}});
DLOG(WARNING) << fmt::format("Error message: {}", e.what());
DLOG(WARNING) << fmt::format("Error trace: {}", e.trace());
if (!fail_sent) {
DLOG(WARNING) << "Couldn't send failure message!";
return State::Close;
}
return State::Error;
} catch (std::exception &e) {
// clear header success message
session.encoder_buffer_.Clear();
db_accessor->abort();
bool fail_sent = session.encoder_.MessageFailure(
{{"code", "Memgraph.Exception"},
{"message",
"An unknown exception occured, please contact your database "
"administrator."}});
DLOG(WARNING) << fmt::format("std::exception {}", e.what());
if (!fail_sent) {
DLOG(WARNING) << "Couldn't send failure message!";
return State::Close;
}
return State::Error;
}
} else if (signature == Signature::PullAll) { } else if (signature == Signature::PullAll) {
DLOG(INFO) << "[PullAll]"; return HandlePullAll(session, state, marker);
if (marker != Marker::TinyStruct) {
DLOG(WARNING) << fmt::format(
"Expected TinyStruct marker, but received 0x{:02X}!",
underlying_cast(marker));
return State::Close;
}
if (state != State::Result) {
// the buffer doesn't have data, return a failure message
bool data_fail_sent = session.encoder_.MessageFailure(
{{"code", "Memgraph.Exception"},
{"message",
"There is no data to "
"send, you have to execute a RUN command before a PULL_ALL!"}});
if (!data_fail_sent) {
DLOG(WARNING) << "Couldn't send failure message!";
return State::Close;
}
return State::Error;
}
// flush pending data to the client, the success message is streamed
// from the query engine, it contains statistics from the query run
if (!session.encoder_buffer_.Flush()) {
DLOG(WARNING) << "Couldn't flush data from the buffer!";
return State::Close;
}
return State::Idle;
} else if (signature == Signature::DiscardAll) { } else if (signature == Signature::DiscardAll) {
DLOG(INFO) << "[DiscardAll]"; return HandleDiscardAll(session, state, marker);
if (marker != Marker::TinyStruct) {
DLOG(WARNING) << fmt::format(
"Expected TinyStruct marker, but received 0x{:02X}!",
underlying_cast(marker));
return State::Close;
}
if (state != State::Result) {
bool data_fail_discard = session.encoder_.MessageFailure(
{{"code", "Memgraph.Exception"},
{"message",
"There is no data to "
"discard, you have to execute a RUN command before a "
"DISCARD_ALL!"}});
if (!data_fail_discard) {
DLOG(WARNING) << "Couldn't send failure message!";
return State::Close;
}
return State::Error;
}
// clear all pending data and send a success message
session.encoder_buffer_.Clear();
if (!session.encoder_.MessageSuccess()) {
DLOG(WARNING) << "Couldn't send success message!";
return State::Close;
}
return State::Idle;
} else if (signature == Signature::Reset) { } else if (signature == Signature::Reset) {
// IMPORTANT: This implementation of the Bolt RESET command isn't fully return HandleReset(session, state, marker);
// compliant to the protocol definition. In the protocol it is defined
// that this command should immediately stop any running commands and
// reset the session to a clean state. That means that we should always
// make a look-ahead for the RESET command before processing anything.
// Our implementation, for now, does everything in a blocking fashion
// so we cannot simply "kill" a transaction while it is running. So
// now this command only resets the session to a clean state. It
// does not IGNORE running and pending commands as it should.
if (marker != Marker::TinyStruct) {
DLOG(WARNING) << fmt::format(
"Expected TinyStruct marker, but received 0x{:02X}!",
underlying_cast(marker));
return State::Close;
}
// clear all pending data and send a success message
session.encoder_buffer_.Clear();
if (!session.encoder_.MessageSuccess()) {
DLOG(WARNING) << "Couldn't send success message!";
return State::Close;
}
return State::Idle;
} else { } else {
DLOG(WARNING) << fmt::format("Unrecognized signature recieved (0x{:02X})!", DLOG(WARNING) << fmt::format("Unrecognized signature recieved (0x{:02X})!",
underlying_cast(signature)); underlying_cast(signature));

View File

@ -275,7 +275,7 @@ TEST(BoltSession, ExecuteRunBasicException) {
session.Execute(); session.Execute();
if (i == 0) { if (i == 0) {
ASSERT_EQ(session.state_, StateT::Error); ASSERT_EQ(session.state_, StateT::ErrorIdle);
ASSERT_TRUE(session.socket_.IsOpen()); ASSERT_TRUE(session.socket_.IsOpen());
CheckFailureMessage(output); CheckFailureMessage(output);
} else { } else {
@ -331,7 +331,7 @@ TEST(BoltSession, ExecutePullAllBufferEmpty) {
ExecuteCommand(session, pullall_req, sizeof(pullall_req)); ExecuteCommand(session, pullall_req, sizeof(pullall_req));
if (i == 0) { if (i == 0) {
ASSERT_EQ(session.state_, StateT::Error); ASSERT_EQ(session.state_, StateT::ErrorIdle);
ASSERT_TRUE(session.socket_.IsOpen()); ASSERT_TRUE(session.socket_.IsOpen());
CheckFailureMessage(output); CheckFailureMessage(output);
} else { } else {
@ -408,7 +408,7 @@ TEST(BoltSession, ErrorIgnoreMessage) {
ASSERT_EQ(session.decoder_buffer_.Size(), 0); ASSERT_EQ(session.decoder_buffer_.Size(), 0);
if (i == 0) { if (i == 0) {
ASSERT_EQ(session.state_, StateT::Error); ASSERT_EQ(session.state_, StateT::ErrorIdle);
ASSERT_TRUE(session.socket_.IsOpen()); ASSERT_TRUE(session.socket_.IsOpen());
CheckOutput(output, ignored_resp, sizeof(ignored_resp)); CheckOutput(output, ignored_resp, sizeof(ignored_resp));
} else { } else {
@ -419,6 +419,32 @@ TEST(BoltSession, ErrorIgnoreMessage) {
} }
} }
TEST(BoltSession, ErrorRunAfterRun) {
// first test with socket write success, then with socket write fail
INIT_VARS;
ExecuteHandshake(session, output);
ExecuteInit(session, output);
WriteRunRequest(session, "MATCH (n) RETURN n");
session.Execute();
output.clear();
session.socket_.SetWriteSuccess(true);
// Session holds results of last run.
ASSERT_EQ(session.state_, StateT::Result);
// New run request.
WriteRunRequest(session, "MATCH (n) RETURN n");
session.Execute();
// Run after run fails, but we still keep results.
ASSERT_EQ(session.state_, StateT::ErrorResult);
ASSERT_TRUE(session.socket_.IsOpen());
}
TEST(BoltSession, ErrorCantCleanup) { TEST(BoltSession, ErrorCantCleanup) {
INIT_VARS; INIT_VARS;
@ -563,7 +589,7 @@ TEST(BoltSession, PartialChunk) {
session.Execute(); session.Execute();
ASSERT_EQ(session.state_, StateT::Error); ASSERT_EQ(session.state_, StateT::ErrorIdle);
ASSERT_TRUE(session.socket_.IsOpen()); ASSERT_TRUE(session.socket_.IsOpen());
ASSERT_GT(output.size(), 0); ASSERT_GT(output.size(), 0);
PrintOutput(output); PrintOutput(output);