Minor fix in bolt session
Reviewers: buda Reviewed By: buda Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D582
This commit is contained in:
parent
20f2c885c8
commit
aa5de5629a
@ -38,15 +38,7 @@ class Session {
|
||||
|
||||
public:
|
||||
Session(Socket &&socket, Dbms &dbms, QueryEngine<OutputStream> &query_engine)
|
||||
: socket_(std::move(socket)),
|
||||
dbms_(dbms),
|
||||
query_engine_(query_engine),
|
||||
encoder_buffer_(socket_),
|
||||
encoder_(encoder_buffer_),
|
||||
output_stream_(encoder_),
|
||||
decoder_buffer_(buffer_),
|
||||
decoder_(decoder_buffer_),
|
||||
state_(State::Handshake) {
|
||||
: socket_(std::move(socket)), dbms_(dbms), query_engine_(query_engine) {
|
||||
event_.data.ptr = this;
|
||||
}
|
||||
|
||||
@ -155,17 +147,17 @@ class Session {
|
||||
Dbms &dbms_;
|
||||
QueryEngine<OutputStream> &query_engine_;
|
||||
|
||||
ChunkedEncoderBuffer<Socket> encoder_buffer_;
|
||||
Encoder<ChunkedEncoderBuffer<Socket>> encoder_;
|
||||
OutputStream output_stream_;
|
||||
ChunkedEncoderBuffer<Socket> encoder_buffer_{socket_};
|
||||
Encoder<ChunkedEncoderBuffer<Socket>> encoder_{encoder_buffer_};
|
||||
OutputStream output_stream_{encoder_};
|
||||
|
||||
Buffer<> buffer_;
|
||||
ChunkedDecoderBuffer decoder_buffer_;
|
||||
Decoder<ChunkedDecoderBuffer> decoder_;
|
||||
ChunkedDecoderBuffer decoder_buffer_{buffer_};
|
||||
Decoder<ChunkedDecoderBuffer> decoder_{decoder_buffer_};
|
||||
|
||||
io::network::Epoll::Event event_;
|
||||
bool connected_{false};
|
||||
State state_;
|
||||
State state_{State::Handshake};
|
||||
// Active transaction of the session, can be null.
|
||||
tx::Transaction *transaction_;
|
||||
|
||||
|
@ -38,6 +38,11 @@ State HandleRun(Session &session, State state, Marker marker) {
|
||||
DLOG(INFO) << fmt::format("[ActiveDB] '{}'", db_accessor->name());
|
||||
|
||||
if (state != State::Idle) {
|
||||
// TODO: We shouldn't clear the buffer and move to ErrorIdle state, but send
|
||||
// MessageFailure without sending data that is already in buffer and move to
|
||||
// ErrorResult state.
|
||||
session.encoder_buffer_.Clear();
|
||||
|
||||
// send failure message
|
||||
bool unexpected_run_fail_sent = session.encoder_.MessageFailure(
|
||||
{{"code", "Memgraph.QueryExecutionFail"},
|
||||
@ -48,10 +53,13 @@ State HandleRun(Session &session, State state, Marker marker) {
|
||||
DLOG(WARNING) << "Couldn't send failure message!";
|
||||
return State::Close;
|
||||
} else {
|
||||
return State::ErrorResult;
|
||||
return State::ErrorIdle;
|
||||
}
|
||||
}
|
||||
|
||||
debug_assert(!session.encoder_buffer_.HasData(),
|
||||
"There should be no data to write in this state");
|
||||
|
||||
try {
|
||||
DLOG(INFO) << fmt::format("[Run] '{}'", query.Value<std::string>());
|
||||
auto is_successfully_executed = session.query_engine_.Run(
|
||||
|
@ -17,6 +17,8 @@ namespace communication::bolt {
|
||||
*/
|
||||
template <typename Session>
|
||||
State StateInitRun(Session &session) {
|
||||
debug_assert(!session.encoder_buffer_.HasData(),
|
||||
"There should be no data to write in this state");
|
||||
DLOG(INFO) << "Parsing message";
|
||||
|
||||
Marker marker;
|
||||
|
@ -441,7 +441,9 @@ TEST(BoltSession, ErrorRunAfterRun) {
|
||||
session.Execute();
|
||||
|
||||
// Run after run fails, but we still keep results.
|
||||
ASSERT_EQ(session.state_, StateT::ErrorResult);
|
||||
// TODO: actually we don't, but we should. Change state to ErrorResult once
|
||||
// that is fixed.
|
||||
ASSERT_EQ(session.state_, StateT::ErrorIdle);
|
||||
ASSERT_TRUE(session.socket_.IsOpen());
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user