Added socket write verification to bolt buffer.
Reviewers: buda Reviewed By: buda Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D199
This commit is contained in:
parent
72f7b64c8f
commit
22dfe61fe1
11
src/communication/bolt/v1/bolt_exception.hpp
Normal file
11
src/communication/bolt/v1/bolt_exception.hpp
Normal file
@ -0,0 +1,11 @@
|
|||||||
|
#pragma once
|
||||||
|
|
||||||
|
#include "utils/exceptions/basic_exception.hpp"
|
||||||
|
|
||||||
|
namespace communication::bolt {
|
||||||
|
|
||||||
|
class BoltException : public BasicException {
|
||||||
|
public:
|
||||||
|
using BasicException::BasicException;
|
||||||
|
};
|
||||||
|
}
|
@ -5,6 +5,7 @@
|
|||||||
#include <memory>
|
#include <memory>
|
||||||
#include <vector>
|
#include <vector>
|
||||||
|
|
||||||
|
#include "communication/bolt/v1/bolt_exception.hpp"
|
||||||
#include "communication/bolt/v1/constants.hpp"
|
#include "communication/bolt/v1/constants.hpp"
|
||||||
#include "logging/loggable.hpp"
|
#include "logging/loggable.hpp"
|
||||||
#include "utils/bswap.hpp"
|
#include "utils/bswap.hpp"
|
||||||
@ -112,7 +113,8 @@ class ChunkedEncoderBuffer : public Loggable {
|
|||||||
if (size_ == 0) return;
|
if (size_ == 0) return;
|
||||||
|
|
||||||
// Flush the whole buffer.
|
// Flush the whole buffer.
|
||||||
socket_.Write(buffer_.data(), size_);
|
bool written = socket_.Write(buffer_.data(), size_);
|
||||||
|
if (!written) throw BoltException("Socket write failed!");
|
||||||
logger.trace("Flushed {} bytes.", size_);
|
logger.trace("Flushed {} bytes.", size_);
|
||||||
|
|
||||||
// Cleanup.
|
// Cleanup.
|
||||||
|
@ -2,6 +2,7 @@
|
|||||||
|
|
||||||
#include <string>
|
#include <string>
|
||||||
|
|
||||||
|
#include "communication/bolt/v1/bolt_exception.hpp"
|
||||||
#include "communication/bolt/v1/messaging/codes.hpp"
|
#include "communication/bolt/v1/messaging/codes.hpp"
|
||||||
#include "communication/bolt/v1/state.hpp"
|
#include "communication/bolt/v1/state.hpp"
|
||||||
#include "logging/default.hpp"
|
#include "logging/default.hpp"
|
||||||
@ -14,6 +15,17 @@ struct Query {
|
|||||||
std::string statement;
|
std::string statement;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
template <typename Session>
|
||||||
|
void StateExecutorFailure(Session &session, Logger &logger,
|
||||||
|
const std::map<std::string, TypedValue> &metadata) {
|
||||||
|
try {
|
||||||
|
session.encoder_.MessageFailure(metadata);
|
||||||
|
} catch (const BoltException &e) {
|
||||||
|
logger.debug("MessageFailure failed because: {}", e.what());
|
||||||
|
session.Close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* TODO (mferencevic): finish & document
|
* TODO (mferencevic): finish & document
|
||||||
*/
|
*/
|
||||||
@ -41,7 +53,8 @@ State StateExecutorRun(Session &session) {
|
|||||||
|
|
||||||
if (!is_successfully_executed) {
|
if (!is_successfully_executed) {
|
||||||
db_accessor->abort();
|
db_accessor->abort();
|
||||||
session.encoder_.MessageFailure(
|
StateExecutorFailure<Session>(
|
||||||
|
session, logger,
|
||||||
{{"code", "Memgraph.QueryExecutionFail"},
|
{{"code", "Memgraph.QueryExecutionFail"},
|
||||||
{"message",
|
{"message",
|
||||||
"Query execution has failed (probably there is no "
|
"Query execution has failed (probably there is no "
|
||||||
@ -57,23 +70,31 @@ State StateExecutorRun(Session &session) {
|
|||||||
// !! QUERY ENGINE -> RUN METHOD -> EXCEPTION HANDLING !!
|
// !! QUERY ENGINE -> RUN METHOD -> EXCEPTION HANDLING !!
|
||||||
} catch (const query::SyntaxException &e) {
|
} catch (const query::SyntaxException &e) {
|
||||||
db_accessor->abort();
|
db_accessor->abort();
|
||||||
session.encoder_.MessageFailure(
|
StateExecutorFailure<Session>(
|
||||||
|
session, logger,
|
||||||
{{"code", "Memgraph.SyntaxException"}, {"message", "Syntax error"}});
|
{{"code", "Memgraph.SyntaxException"}, {"message", "Syntax error"}});
|
||||||
return ERROR;
|
return ERROR;
|
||||||
} catch (const query::QueryEngineException &e) {
|
} catch (const query::QueryEngineException &e) {
|
||||||
db_accessor->abort();
|
db_accessor->abort();
|
||||||
session.encoder_.MessageFailure(
|
StateExecutorFailure<Session>(
|
||||||
|
session, logger,
|
||||||
{{"code", "Memgraph.QueryEngineException"},
|
{{"code", "Memgraph.QueryEngineException"},
|
||||||
{"message", "Query engine was unable to execute the query"}});
|
{"message", "Query engine was unable to execute the query"}});
|
||||||
return ERROR;
|
return ERROR;
|
||||||
} catch (const StacktraceException &e) {
|
} catch (const StacktraceException &e) {
|
||||||
db_accessor->abort();
|
db_accessor->abort();
|
||||||
session.encoder_.MessageFailure({{"code", "Memgraph.StacktraceException"},
|
StateExecutorFailure<Session>(session, logger,
|
||||||
|
{{"code", "Memgraph.StacktraceException"},
|
||||||
{"message", "Unknown exception"}});
|
{"message", "Unknown exception"}});
|
||||||
return ERROR;
|
return ERROR;
|
||||||
|
} catch (const BoltException &e) {
|
||||||
|
db_accessor->abort();
|
||||||
|
logger.debug("Failed because: {}", e.what());
|
||||||
|
session.Close();
|
||||||
} catch (std::exception &e) {
|
} catch (std::exception &e) {
|
||||||
db_accessor->abort();
|
db_accessor->abort();
|
||||||
session.encoder_.MessageFailure(
|
StateExecutorFailure<Session>(
|
||||||
|
session, logger,
|
||||||
{{"code", "Memgraph.Exception"}, {"message", "Unknown exception"}});
|
{{"code", "Memgraph.Exception"}, {"message", "Unknown exception"}});
|
||||||
return ERROR;
|
return ERROR;
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user