Separate bolt::State::executor to Idle and Result

Reviewers: buda

Reviewed By: buda

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D567
This commit is contained in:
Mislav Bradac 2017-07-18 17:14:43 +02:00
parent 16d94c8aaf
commit da0fce1a84
7 changed files with 62 additions and 22 deletions

View File

@ -5,6 +5,8 @@
#include <memory>
#include <vector>
#include <glog/logging.h>
#include "communication/bolt/v1/constants.hpp"
#include "communication/bolt/v1/decoder/buffer.hpp"
#include "utils/assert.hpp"

View File

@ -9,8 +9,8 @@
#include "communication/bolt/v1/constants.hpp"
#include "communication/bolt/v1/state.hpp"
#include "communication/bolt/v1/states/error.hpp"
#include "communication/bolt/v1/states/executor.hpp"
#include "communication/bolt/v1/states/handshake.hpp"
#include "communication/bolt/v1/states/idle_result.hpp"
#include "communication/bolt/v1/states/init.hpp"
#include "communication/bolt/v1/decoder/chunked_decoder_buffer.hpp"
@ -90,16 +90,17 @@ class Session {
switch (state_) {
case State::Handshake:
state_ = StateHandshakeRun<Session<Socket>>(*this);
state_ = StateHandshakeRun(*this);
break;
case State::Init:
state_ = StateInitRun<Session<Socket>>(*this);
state_ = StateInitRun(*this);
break;
case State::Executor:
state_ = StateExecutorRun<Session<Socket>>(*this);
case State::Idle:
case State::Result:
state_ = StateIdleResultRun(*this, state_);
break;
case State::Error:
state_ = StateErrorRun<Session<Socket>>(*this);
state_ = StateErrorRun(*this);
break;
case State::Close:
// This state is handled below

View File

@ -21,9 +21,15 @@ enum class State : uint8_t {
Init,
/**
* This state executes commands from the Bolt protocol.
* This state waits for next query (RUN command).
*/
Executor,
Idle,
/**
* This state holds results of RUN command and waits for either PULL_ALL or
* DISCARD_ALL command.
*/
Result,
/**
* This state handles errors.

View File

@ -40,7 +40,7 @@ State StateErrorRun(Session &session) {
DLOG(WARNING) << "Couldn't send success message!";
return State::Close;
}
return State::Executor;
return State::Idle;
} else {
uint8_t value = underlying_cast(marker);

View File

@ -19,7 +19,7 @@ namespace communication::bolt {
* @param session the session that should be used for the run
*/
template <typename Session>
State StateExecutorRun(Session &session) {
State StateIdleResultRun(Session &session, State state) {
Marker marker;
Signature signature;
if (!session.decoder_.ReadMessageHeader(&signature, &marker)) {
@ -51,6 +51,11 @@ State StateExecutorRun(Session &session) {
auto db_accessor = session.dbms_.active();
DLOG(INFO) << fmt::format("[ActiveDB] '{}'", db_accessor->name());
if (state != State::Idle) {
DLOG(WARNING) << "Unexpected RUN command!";
return State::Error;
}
try {
DLOG(INFO) << fmt::format("[Run] '{}'", query.Value<std::string>());
auto is_successfully_executed = session.query_engine_.Run(
@ -89,7 +94,7 @@ State StateExecutorRun(Session &session) {
DLOG(WARNING) << "Couldn't flush header data from the buffer!";
return State::Close;
}
return State::Executor;
return State::Result;
}
} catch (const utils::BasicException &e) {
@ -144,7 +149,7 @@ State StateExecutorRun(Session &session) {
underlying_cast(marker));
return State::Close;
}
if (!session.encoder_buffer_.HasData()) {
if (state != State::Result) {
// the buffer doesn't have data, return a failure message
bool data_fail_sent = session.encoder_.MessageFailure(
{{"code", "Memgraph.Exception"},
@ -163,7 +168,7 @@ State StateExecutorRun(Session &session) {
DLOG(WARNING) << "Couldn't flush data from the buffer!";
return State::Close;
}
return State::Executor;
return State::Idle;
} else if (signature == Signature::DiscardAll) {
DLOG(INFO) << "[DiscardAll]";
@ -173,13 +178,27 @@ State StateExecutorRun(Session &session) {
underlying_cast(marker));
return State::Close;
}
if (state != State::Result) {
bool data_fail_discard = session.encoder_.MessageFailure(
{{"code", "Memgraph.Exception"},
{"message",
"There is no data to "
"discard, you have to execute a RUN command before a "
"DISCARD_ALL!"}});
if (!data_fail_discard) {
DLOG(WARNING) << "Couldn't send failure message!";
return State::Close;
}
return State::Error;
}
// clear all pending data and send a success message
session.encoder_buffer_.Clear();
if (!session.encoder_.MessageSuccess()) {
DLOG(WARNING) << "Couldn't send success message!";
return State::Close;
}
return State::Executor;
return State::Idle;
} else if (signature == Signature::Reset) {
// IMPORTANT: This implementation of the Bolt RESET command isn't fully
@ -203,7 +222,7 @@ State StateExecutorRun(Session &session) {
DLOG(WARNING) << "Couldn't send success message!";
return State::Close;
}
return State::Executor;
return State::Idle;
} else {
DLOG(WARNING) << fmt::format("Unrecognized signature recieved (0x{:02X})!",

View File

@ -66,6 +66,6 @@ State StateInitRun(Session &session) {
return State::Close;
}
return State::Executor;
return State::Idle;
}
}

View File

@ -89,7 +89,7 @@ void ExecuteCommand(SessionT &session, const uint8_t *data, size_t len,
// Execute and check a correct init
void ExecuteInit(SessionT &session, std::vector<uint8_t> &output) {
ExecuteCommand(session, init_req, sizeof(init_req));
ASSERT_EQ(session.state_, StateT::Executor);
ASSERT_EQ(session.state_, StateT::Idle);
ASSERT_TRUE(session.socket_.IsOpen());
PrintOutput(output);
CheckOutput(output, init_resp, 7);
@ -286,6 +286,18 @@ TEST(BoltSession, ExecuteRunBasicException) {
}
}
TEST(BoltSession, ExecuteRunWithoutPullAll) {
INIT_VARS;
ExecuteHandshake(session, output);
ExecuteInit(session, output);
WriteRunRequest(session, "RETURN 2");
session.Execute();
ASSERT_EQ(session.state_, StateT::Result);
}
TEST(BoltSession, ExecutePullAllDiscardAllResetWrongMarker) {
// This test first tests PULL_ALL then DISCARD_ALL and then RESET
// It tests for missing data in the message header
@ -351,7 +363,7 @@ TEST(BoltSession, ExecutePullAllDiscardAllReset) {
ExecuteCommand(session, dataset[i], 2);
if (j == 0) {
ASSERT_EQ(session.state_, StateT::Executor);
ASSERT_EQ(session.state_, StateT::Idle);
ASSERT_TRUE(session.socket_.IsOpen());
ASSERT_FALSE(session.encoder_buffer_.HasData());
PrintOutput(output);
@ -470,7 +482,7 @@ TEST(BoltSession, ErrorOK) {
ASSERT_EQ(session.decoder_buffer_.Size(), 0);
if (j == 0) {
ASSERT_EQ(session.state_, StateT::Executor);
ASSERT_EQ(session.state_, StateT::Idle);
ASSERT_TRUE(session.socket_.IsOpen());
CheckOutput(output, success_resp, sizeof(success_resp));
} else {
@ -511,7 +523,7 @@ TEST(BoltSession, MultipleChunksInOneExecute) {
WriteRunRequest(session, "CREATE (n) RETURN n");
ExecuteCommand(session, pullall_req, sizeof(pullall_req));
ASSERT_EQ(session.state_, StateT::Executor);
ASSERT_EQ(session.state_, StateT::Idle);
ASSERT_TRUE(session.socket_.IsOpen());
PrintOutput(output);
@ -543,7 +555,7 @@ TEST(BoltSession, PartialChunk) {
// missing chunk tail
session.Execute();
ASSERT_EQ(session.state_, StateT::Executor);
ASSERT_EQ(session.state_, StateT::Idle);
ASSERT_TRUE(session.socket_.IsOpen());
ASSERT_EQ(output.size(), 0);
@ -551,7 +563,7 @@ TEST(BoltSession, PartialChunk) {
session.Execute();
ASSERT_EQ(session.state_, StateT::Executor);
ASSERT_EQ(session.state_, StateT::Error);
ASSERT_TRUE(session.socket_.IsOpen());
ASSERT_GT(output.size(), 0);
PrintOutput(output);