Fix remote pull

Summary:
- Extracted TypedValue reconstruction into utils (will need that soon)
- Added more error handling in RemotePull

Reviewers: teon.banek, msantl

Reviewed By: teon.banek

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1183
This commit is contained in:
florijan 2018-02-08 13:27:07 +01:00
parent e2f5bcf96d
commit 45da645a4b
7 changed files with 101 additions and 63 deletions

View File

@ -12,6 +12,7 @@
#include "distributed/plan_consumer.hpp"
#include "distributed/remote_pull_produce_rpc_messages.hpp"
#include "query/context.hpp"
#include "query/exceptions.hpp"
#include "query/frontend/semantic/symbol_table.hpp"
#include "query/interpret/frame.hpp"
#include "query/parameters.hpp"
@ -47,16 +48,30 @@ class RemoteProduceRpcServer {
/** Returns a vector of typed values (one for each `pull_symbol`), and a
* `bool` indicating if the pull was successful (or the cursor is
* exhausted). */
std::pair<std::vector<query::TypedValue>, bool> Pull() {
std::pair<std::vector<query::TypedValue>, RemotePullState> Pull() {
RemotePullState state = RemotePullState::CURSOR_IN_PROGRESS;
std::vector<query::TypedValue> results;
auto success = cursor_->Pull(frame_, context_);
if (success) {
results.reserve(pull_symbols_.size());
for (const auto &symbol : pull_symbols_) {
results.emplace_back(std::move(frame_[symbol]));
try {
if (cursor_->Pull(frame_, context_)) {
results.reserve(pull_symbols_.size());
for (const auto &symbol : pull_symbols_) {
results.emplace_back(std::move(frame_[symbol]));
}
} else {
state = RemotePullState::CURSOR_EXHAUSTED;
}
} catch (const mvcc::SerializationError &) {
state = RemotePullState::SERIALIZATION_ERROR;
} catch (const LockTimeoutException &) {
state = RemotePullState::LOCK_TIMEOUT_ERROR;
} catch (const RecordDeletedError &) {
state = RemotePullState::UPDATE_DELETED_ERROR;
} catch (const query::ReconstructionException &) {
state = RemotePullState::RECONSTRUCTION_ERROR;
} catch (const query::QueryRuntimeException &) {
state = RemotePullState::QUERY_ERROR;
}
return std::make_pair(std::move(results), success);
return std::make_pair(std::move(results), state);
}
private:
@ -123,17 +138,13 @@ class RemoteProduceRpcServer {
auto &ongoing_produce = GetOngoingProduce(req);
RemotePullResData result{db_.WorkerId(), req.send_old, req.send_new};
result.state_and_frames.pull_state = RemotePullState::CURSOR_IN_PROGRESS;
for (int i = 0; i < req.batch_size; ++i) {
// TODO exception handling (Serialization errors)
// when full CRUD. Maybe put it in OngoingProduce::Pull
auto pull_result = ongoing_produce.Pull();
if (!pull_result.second) {
result.state_and_frames.pull_state = RemotePullState::CURSOR_EXHAUSTED;
result.state_and_frames.pull_state = pull_result.second;
if (pull_result.second != RemotePullState::CURSOR_IN_PROGRESS)
break;
}
result.state_and_frames.frames.emplace_back(std::move(pull_result.first));
}

View File

@ -26,8 +26,11 @@ constexpr int kDefaultBatchSize = 20;
enum class RemotePullState {
CURSOR_EXHAUSTED,
CURSOR_IN_PROGRESS,
SERIALIZATION_ERROR // future-proofing for full CRUD
// TODO in full CRUD other errors
SERIALIZATION_ERROR,
LOCK_TIMEOUT_ERROR,
UPDATE_DELETED_ERROR,
RECONSTRUCTION_ERROR,
QUERY_ERROR
};
const std::string kRemotePullProduceRpcName = "RemotePullProduceRpc";

View File

@ -180,4 +180,37 @@ std::string ParseParameter(const std::string &s) {
}
return out;
}
void ReconstructTypedValue(TypedValue &value) {
using Type = TypedValue::Type;
switch (value.type()) {
case Type::Vertex:
if (!value.ValueVertex().Reconstruct()) throw ReconstructionException();
break;
case Type::Edge:
if (!value.ValueEdge().Reconstruct()) throw ReconstructionException();
break;
case Type::List:
for (TypedValue &inner_value : value.Value<std::vector<TypedValue>>())
ReconstructTypedValue(inner_value);
break;
case Type::Map:
for (auto &kv : value.Value<std::map<std::string, TypedValue>>())
ReconstructTypedValue(kv.second);
break;
case Type::Path:
for (auto &vertex : value.ValuePath().vertices()) {
if (!vertex.Reconstruct()) throw ReconstructionException();
}
for (auto &edge : value.ValuePath().edges()) {
if (!edge.Reconstruct()) throw ReconstructionException();
}
case Type::Null:
case Type::Bool:
case Type::Int:
case Type::Double:
case Type::String:
break;
}
}
} // namespace query

View File

@ -3,6 +3,8 @@
#include <cstdint>
#include <string>
#include "query/typed_value.hpp"
namespace query {
// These are the functions for parsing literals and parameter names from
@ -23,4 +25,12 @@ std::string ParseParameter(const std::string &s);
* by some previous part of execution.
*/
enum class GraphView { AS_IS, OLD, NEW };
/**
* Helper function for recursively reconstructing all the accessors in the
* given TypedValue.
*
* @returns - If the reconstruction succeeded.
*/
void ReconstructTypedValue(TypedValue &value);
}

View File

@ -99,4 +99,12 @@ class HintedAbortError : public utils::BasicException {
"--query-execution-time-sec flag") {}
};
class ReconstructionException : public QueryException {
public:
ReconstructionException()
: QueryException(
"Record invalid after WITH clause. Most likely deleted by a "
"preceeding DELETE.") {}
};
} // namespace query

View File

@ -19,6 +19,7 @@
#include "query/frontend/semantic/symbol_table.hpp"
#include "query/interpret/eval.hpp"
#include "query/path.hpp"
#include "utils/exceptions.hpp"
// macro for the default implementation of LogicalOperator::Accept
// that accepts the visitor and visits it's input_ operator
@ -321,10 +322,10 @@ std::unique_ptr<Cursor> ScanAllByLabelPropertyRange::MakeCursor(
context.symbol_table_, db, graph_view_);
auto convert = [&evaluator](const auto &bound)
-> std::experimental::optional<utils::Bound<PropertyValue>> {
if (!bound) return std::experimental::nullopt;
return std::experimental::make_optional(utils::Bound<PropertyValue>(
bound.value().value()->Accept(evaluator), bound.value().type()));
};
if (!bound) return std::experimental::nullopt;
return std::experimental::make_optional(utils::Bound<PropertyValue>(
bound.value().value()->Accept(evaluator), bound.value().type()));
};
return db.Vertices(label_, property_, convert(lower_bound()),
convert(upper_bound()), graph_view_ == GraphView::NEW);
};
@ -1618,47 +1619,6 @@ void ExpandUniquenessFilter<TAccessor>::ExpandUniquenessFilterCursor::Reset() {
template class ExpandUniquenessFilter<VertexAccessor>;
template class ExpandUniquenessFilter<EdgeAccessor>;
namespace {
/**
* Helper function for recursively reconstructing all the accessors in the
* given TypedValue.
*/
void ReconstructTypedValue(TypedValue &value) {
const static std::string vertex_error_msg =
"Vertex invalid after WITH clause, (most likely deleted by a "
"preceeding DELETE clause)";
const static std::string edge_error_msg =
"Edge invalid after WITH clause, (most likely deleted by a "
"preceeding DELETE clause)";
switch (value.type()) {
case TypedValue::Type::Vertex:
if (!value.Value<VertexAccessor>().Reconstruct())
throw QueryRuntimeException(vertex_error_msg);
break;
case TypedValue::Type::Edge:
if (!value.Value<EdgeAccessor>().Reconstruct())
throw QueryRuntimeException(edge_error_msg);
break;
case TypedValue::Type::List:
for (TypedValue &inner_value : value.Value<std::vector<TypedValue>>())
ReconstructTypedValue(inner_value);
break;
case TypedValue::Type::Map:
for (auto &kv : value.Value<std::map<std::string, TypedValue>>())
ReconstructTypedValue(kv.second);
break;
case TypedValue::Type::Path:
for (auto &vertex : value.ValuePath().vertices())
if (vertex.Reconstruct()) throw QueryRuntimeException(vertex_error_msg);
for (auto &edge : value.ValuePath().edges())
if (edge.Reconstruct()) throw QueryRuntimeException(edge_error_msg);
default:
break;
}
}
} // namespace
Accumulate::Accumulate(const std::shared_ptr<LogicalOperator> &input,
const std::vector<Symbol> &symbols, bool advance_command)
: input_(input), symbols_(symbols), advance_command_(advance_command) {}
@ -1690,7 +1650,7 @@ bool Accumulate::AccumulateCursor::Pull(Frame &frame, Context &context) {
if (self_.advance_command_) {
db_.AdvanceCommand();
for (auto &row : cache_)
for (auto &col : row) ReconstructTypedValue(col);
for (auto &col : row) query::ReconstructTypedValue(col);
}
}
@ -2633,7 +2593,20 @@ bool PullRemote::PullRemoteCursor::Pull(Frame &frame, Context &context) {
EndRemotePull();
throw mvcc::SerializationError(
"Serialization error occured during PullRemote !");
break;
case distributed::RemotePullState::LOCK_TIMEOUT_ERROR:
EndRemotePull();
throw LockTimeoutException(
"LockTimeout error occured during PullRemote !");
case distributed::RemotePullState::UPDATE_DELETED_ERROR:
EndRemotePull();
throw RecordDeletedError();
case distributed::RemotePullState::RECONSTRUCTION_ERROR:
EndRemotePull();
throw query::ReconstructionException();
case distributed::RemotePullState::QUERY_ERROR:
EndRemotePull();
throw QueryRuntimeException(
"Query runtime error occurred duing PullRemote !");
}
}

View File

@ -452,7 +452,7 @@ TEST(QueryPlan, DeleteAdvance) {
n.op_, std::vector<Expression *>{n_get}, false);
auto advance = std::make_shared<Accumulate>(
delete_op, std::vector<Symbol>{n.sym_}, true);
EXPECT_THROW(PullAll(advance, dba, symbol_table), QueryRuntimeException);
EXPECT_THROW(PullAll(advance, dba, symbol_table), ReconstructionException);
}
TEST(QueryPlan, SetProperty) {