From 45da645a4b825b087ae07eb88ea60472026de8b0 Mon Sep 17 00:00:00 2001 From: florijan Date: Thu, 8 Feb 2018 13:27:07 +0100 Subject: [PATCH] Fix remote pull Summary: - Extracted TypedValue reconstruction into utils (will need that soon) - Added more error handling in RemotePull Reviewers: teon.banek, msantl Reviewed By: teon.banek Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D1183 --- src/distributed/remote_produce_rpc_server.hpp | 37 ++++++---- .../remote_pull_produce_rpc_messages.hpp | 7 +- src/query/common.cpp | 33 +++++++++ src/query/common.hpp | 10 +++ src/query/exceptions.hpp | 8 +++ src/query/plan/operator.cpp | 67 ++++++------------- .../query_plan_create_set_remove_delete.cpp | 2 +- 7 files changed, 101 insertions(+), 63 deletions(-) diff --git a/src/distributed/remote_produce_rpc_server.hpp b/src/distributed/remote_produce_rpc_server.hpp index 2f97654f2..dde7bdb41 100644 --- a/src/distributed/remote_produce_rpc_server.hpp +++ b/src/distributed/remote_produce_rpc_server.hpp @@ -12,6 +12,7 @@ #include "distributed/plan_consumer.hpp" #include "distributed/remote_pull_produce_rpc_messages.hpp" #include "query/context.hpp" +#include "query/exceptions.hpp" #include "query/frontend/semantic/symbol_table.hpp" #include "query/interpret/frame.hpp" #include "query/parameters.hpp" @@ -47,16 +48,30 @@ class RemoteProduceRpcServer { /** Returns a vector of typed values (one for each `pull_symbol`), and a * `bool` indicating if the pull was successful (or the cursor is * exhausted). */ - std::pair, bool> Pull() { + std::pair, RemotePullState> Pull() { + RemotePullState state = RemotePullState::CURSOR_IN_PROGRESS; std::vector results; - auto success = cursor_->Pull(frame_, context_); - if (success) { - results.reserve(pull_symbols_.size()); - for (const auto &symbol : pull_symbols_) { - results.emplace_back(std::move(frame_[symbol])); + try { + if (cursor_->Pull(frame_, context_)) { + results.reserve(pull_symbols_.size()); + for (const auto &symbol : pull_symbols_) { + results.emplace_back(std::move(frame_[symbol])); + } + } else { + state = RemotePullState::CURSOR_EXHAUSTED; } + } catch (const mvcc::SerializationError &) { + state = RemotePullState::SERIALIZATION_ERROR; + } catch (const LockTimeoutException &) { + state = RemotePullState::LOCK_TIMEOUT_ERROR; + } catch (const RecordDeletedError &) { + state = RemotePullState::UPDATE_DELETED_ERROR; + } catch (const query::ReconstructionException &) { + state = RemotePullState::RECONSTRUCTION_ERROR; + } catch (const query::QueryRuntimeException &) { + state = RemotePullState::QUERY_ERROR; } - return std::make_pair(std::move(results), success); + return std::make_pair(std::move(results), state); } private: @@ -123,17 +138,13 @@ class RemoteProduceRpcServer { auto &ongoing_produce = GetOngoingProduce(req); RemotePullResData result{db_.WorkerId(), req.send_old, req.send_new}; - result.state_and_frames.pull_state = RemotePullState::CURSOR_IN_PROGRESS; for (int i = 0; i < req.batch_size; ++i) { - // TODO exception handling (Serialization errors) - // when full CRUD. Maybe put it in OngoingProduce::Pull auto pull_result = ongoing_produce.Pull(); - if (!pull_result.second) { - result.state_and_frames.pull_state = RemotePullState::CURSOR_EXHAUSTED; + result.state_and_frames.pull_state = pull_result.second; + if (pull_result.second != RemotePullState::CURSOR_IN_PROGRESS) break; - } result.state_and_frames.frames.emplace_back(std::move(pull_result.first)); } diff --git a/src/distributed/remote_pull_produce_rpc_messages.hpp b/src/distributed/remote_pull_produce_rpc_messages.hpp index 8c8233073..7b7f9f488 100644 --- a/src/distributed/remote_pull_produce_rpc_messages.hpp +++ b/src/distributed/remote_pull_produce_rpc_messages.hpp @@ -26,8 +26,11 @@ constexpr int kDefaultBatchSize = 20; enum class RemotePullState { CURSOR_EXHAUSTED, CURSOR_IN_PROGRESS, - SERIALIZATION_ERROR // future-proofing for full CRUD - // TODO in full CRUD other errors + SERIALIZATION_ERROR, + LOCK_TIMEOUT_ERROR, + UPDATE_DELETED_ERROR, + RECONSTRUCTION_ERROR, + QUERY_ERROR }; const std::string kRemotePullProduceRpcName = "RemotePullProduceRpc"; diff --git a/src/query/common.cpp b/src/query/common.cpp index 617c36288..e58d6a0e4 100644 --- a/src/query/common.cpp +++ b/src/query/common.cpp @@ -180,4 +180,37 @@ std::string ParseParameter(const std::string &s) { } return out; } + +void ReconstructTypedValue(TypedValue &value) { + using Type = TypedValue::Type; + switch (value.type()) { + case Type::Vertex: + if (!value.ValueVertex().Reconstruct()) throw ReconstructionException(); + break; + case Type::Edge: + if (!value.ValueEdge().Reconstruct()) throw ReconstructionException(); + break; + case Type::List: + for (TypedValue &inner_value : value.Value>()) + ReconstructTypedValue(inner_value); + break; + case Type::Map: + for (auto &kv : value.Value>()) + ReconstructTypedValue(kv.second); + break; + case Type::Path: + for (auto &vertex : value.ValuePath().vertices()) { + if (!vertex.Reconstruct()) throw ReconstructionException(); + } + for (auto &edge : value.ValuePath().edges()) { + if (!edge.Reconstruct()) throw ReconstructionException(); + } + case Type::Null: + case Type::Bool: + case Type::Int: + case Type::Double: + case Type::String: + break; + } +} } // namespace query diff --git a/src/query/common.hpp b/src/query/common.hpp index fd50b2102..d98c2ba4e 100644 --- a/src/query/common.hpp +++ b/src/query/common.hpp @@ -3,6 +3,8 @@ #include #include +#include "query/typed_value.hpp" + namespace query { // These are the functions for parsing literals and parameter names from @@ -23,4 +25,12 @@ std::string ParseParameter(const std::string &s); * by some previous part of execution. */ enum class GraphView { AS_IS, OLD, NEW }; + +/** + * Helper function for recursively reconstructing all the accessors in the + * given TypedValue. + * + * @returns - If the reconstruction succeeded. + */ +void ReconstructTypedValue(TypedValue &value); } diff --git a/src/query/exceptions.hpp b/src/query/exceptions.hpp index 82028df9e..51cb20ac1 100644 --- a/src/query/exceptions.hpp +++ b/src/query/exceptions.hpp @@ -99,4 +99,12 @@ class HintedAbortError : public utils::BasicException { "--query-execution-time-sec flag") {} }; +class ReconstructionException : public QueryException { + public: + ReconstructionException() + : QueryException( + "Record invalid after WITH clause. Most likely deleted by a " + "preceeding DELETE.") {} +}; + } // namespace query diff --git a/src/query/plan/operator.cpp b/src/query/plan/operator.cpp index 784050309..37ac18fdf 100644 --- a/src/query/plan/operator.cpp +++ b/src/query/plan/operator.cpp @@ -19,6 +19,7 @@ #include "query/frontend/semantic/symbol_table.hpp" #include "query/interpret/eval.hpp" #include "query/path.hpp" +#include "utils/exceptions.hpp" // macro for the default implementation of LogicalOperator::Accept // that accepts the visitor and visits it's input_ operator @@ -321,10 +322,10 @@ std::unique_ptr ScanAllByLabelPropertyRange::MakeCursor( context.symbol_table_, db, graph_view_); auto convert = [&evaluator](const auto &bound) -> std::experimental::optional> { - if (!bound) return std::experimental::nullopt; - return std::experimental::make_optional(utils::Bound( - bound.value().value()->Accept(evaluator), bound.value().type())); - }; + if (!bound) return std::experimental::nullopt; + return std::experimental::make_optional(utils::Bound( + bound.value().value()->Accept(evaluator), bound.value().type())); + }; return db.Vertices(label_, property_, convert(lower_bound()), convert(upper_bound()), graph_view_ == GraphView::NEW); }; @@ -1618,47 +1619,6 @@ void ExpandUniquenessFilter::ExpandUniquenessFilterCursor::Reset() { template class ExpandUniquenessFilter; template class ExpandUniquenessFilter; -namespace { - -/** - * Helper function for recursively reconstructing all the accessors in the - * given TypedValue. - */ -void ReconstructTypedValue(TypedValue &value) { - const static std::string vertex_error_msg = - "Vertex invalid after WITH clause, (most likely deleted by a " - "preceeding DELETE clause)"; - const static std::string edge_error_msg = - "Edge invalid after WITH clause, (most likely deleted by a " - "preceeding DELETE clause)"; - switch (value.type()) { - case TypedValue::Type::Vertex: - if (!value.Value().Reconstruct()) - throw QueryRuntimeException(vertex_error_msg); - break; - case TypedValue::Type::Edge: - if (!value.Value().Reconstruct()) - throw QueryRuntimeException(edge_error_msg); - break; - case TypedValue::Type::List: - for (TypedValue &inner_value : value.Value>()) - ReconstructTypedValue(inner_value); - break; - case TypedValue::Type::Map: - for (auto &kv : value.Value>()) - ReconstructTypedValue(kv.second); - break; - case TypedValue::Type::Path: - for (auto &vertex : value.ValuePath().vertices()) - if (vertex.Reconstruct()) throw QueryRuntimeException(vertex_error_msg); - for (auto &edge : value.ValuePath().edges()) - if (edge.Reconstruct()) throw QueryRuntimeException(edge_error_msg); - default: - break; - } -} -} // namespace - Accumulate::Accumulate(const std::shared_ptr &input, const std::vector &symbols, bool advance_command) : input_(input), symbols_(symbols), advance_command_(advance_command) {} @@ -1690,7 +1650,7 @@ bool Accumulate::AccumulateCursor::Pull(Frame &frame, Context &context) { if (self_.advance_command_) { db_.AdvanceCommand(); for (auto &row : cache_) - for (auto &col : row) ReconstructTypedValue(col); + for (auto &col : row) query::ReconstructTypedValue(col); } } @@ -2633,7 +2593,20 @@ bool PullRemote::PullRemoteCursor::Pull(Frame &frame, Context &context) { EndRemotePull(); throw mvcc::SerializationError( "Serialization error occured during PullRemote !"); - break; + case distributed::RemotePullState::LOCK_TIMEOUT_ERROR: + EndRemotePull(); + throw LockTimeoutException( + "LockTimeout error occured during PullRemote !"); + case distributed::RemotePullState::UPDATE_DELETED_ERROR: + EndRemotePull(); + throw RecordDeletedError(); + case distributed::RemotePullState::RECONSTRUCTION_ERROR: + EndRemotePull(); + throw query::ReconstructionException(); + case distributed::RemotePullState::QUERY_ERROR: + EndRemotePull(); + throw QueryRuntimeException( + "Query runtime error occurred duing PullRemote !"); } } diff --git a/tests/unit/query_plan_create_set_remove_delete.cpp b/tests/unit/query_plan_create_set_remove_delete.cpp index ff1817759..56afc8f92 100644 --- a/tests/unit/query_plan_create_set_remove_delete.cpp +++ b/tests/unit/query_plan_create_set_remove_delete.cpp @@ -452,7 +452,7 @@ TEST(QueryPlan, DeleteAdvance) { n.op_, std::vector{n_get}, false); auto advance = std::make_shared( delete_op, std::vector{n.sym_}, true); - EXPECT_THROW(PullAll(advance, dba, symbol_table), QueryRuntimeException); + EXPECT_THROW(PullAll(advance, dba, symbol_table), ReconstructionException); } TEST(QueryPlan, SetProperty) {