From 905b3ee2dfe9df8727c98953d0737bf7c35f6470 Mon Sep 17 00:00:00 2001 From: Lovro Lugovic <lovro.lugovic@memgraph.io> Date: Wed, 30 Oct 2019 14:05:47 +0100 Subject: [PATCH] Don't unconditionally start a transaction on Prepare Reviewers: teon.banek, mferencevic Reviewed By: teon.banek, mferencevic Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D2532 --- src/query/exceptions.hpp | 11 +- src/query/interpreter.cpp | 838 ++++++++++++++++++------------------- src/query/interpreter.hpp | 36 +- tests/unit/interpreter.cpp | 6 +- 4 files changed, 447 insertions(+), 444 deletions(-) diff --git a/src/query/exceptions.hpp b/src/query/exceptions.hpp index 4511137af..7f64e36e4 100644 --- a/src/query/exceptions.hpp +++ b/src/query/exceptions.hpp @@ -59,9 +59,9 @@ class TypeMismatchError : public SemanticException { public: TypeMismatchError(const std::string &name, const std::string &datum, const std::string &expected) - : SemanticException(fmt::format( - "Type mismatch: {} already defined as {}, expected {}.", - name, datum, expected)) {} + : SemanticException( + fmt::format("Type mismatch: {} already defined as {}, expected {}.", + name, datum, expected)) {} }; class UnprovidedParameterError : public QueryException { @@ -106,6 +106,11 @@ class HintedAbortError : public utils::BasicException { "--query-execution-time-sec flag.") {} }; +class ExplicitTransactionUsageException : public QueryRuntimeException { + public: + using QueryRuntimeException::QueryRuntimeException; +}; + class ReconstructionException : public QueryException { public: ReconstructionException() diff --git a/src/query/interpreter.cpp b/src/query/interpreter.cpp index 2c5984daa..72b868f07 100644 --- a/src/query/interpreter.cpp +++ b/src/query/interpreter.cpp @@ -35,40 +35,6 @@ DEFINE_VALIDATED_int32(query_plan_cache_ttl, 60, namespace query { -#ifndef MG_SINGLE_NODE_HA -namespace { - -class DumpClosure final { - public: - explicit DumpClosure(DbAccessor *dba) : dump_generator_(dba) {} - - // Please note that this copy constructor actually moves the other object. We - // want this because lambdas are not movable, i.e. its move constructor - // actually copies the lambda. - DumpClosure(const DumpClosure &other) - : dump_generator_(std::move(other.dump_generator_)) {} - - DumpClosure(DumpClosure &&other) = default; - DumpClosure &operator=(const DumpClosure &other) = delete; - DumpClosure &operator=(DumpClosure &&other) = delete; - ~DumpClosure() {} - - std::optional<std::vector<TypedValue>> operator()(Frame *frame, - ExecutionContext *context) { - std::ostringstream oss; - if (dump_generator_.NextQuery(&oss)) { - return std::make_optional(std::vector<TypedValue>{TypedValue(oss.str())}); - } - return std::nullopt; - } - - private: - mutable database::CypherDumpGenerator dump_generator_; -}; - -} // namespace -#endif - /** * A container for data related to the parsing of a query. */ @@ -494,261 +460,6 @@ Callback HandleAuthQuery(AuthQuery *auth_query, auth::Auth *auth, } } -Callback HandleIndexQuery(IndexQuery *index_query, - std::function<void()> invalidate_plan_cache, - DbAccessor *db_accessor) { - auto label = db_accessor->NameToLabel(index_query->label_.name); - std::vector<storage::Property> properties; - properties.reserve(index_query->properties_.size()); - for (const auto &prop : index_query->properties_) { - properties.push_back(db_accessor->NameToProperty(prop.name)); - } - - if (properties.size() > 1) { - throw utils::NotYetImplemented("index on multiple properties"); - } - - Callback callback; - switch (index_query->action_) { - case IndexQuery::Action::CREATE: - callback.fn = [label, properties, db_accessor, invalidate_plan_cache] { -#ifdef MG_SINGLE_NODE_V2 - CHECK(properties.size() == 1); - db_accessor->CreateIndex(label, properties[0]); - invalidate_plan_cache(); -#else - try { - CHECK(properties.size() == 1); - db_accessor->CreateIndex(label, properties[0]); - invalidate_plan_cache(); - } catch (const database::ConstraintViolationException &e) { - throw QueryRuntimeException(e.what()); - } catch (const database::IndexExistsException &e) { - // Ignore creating an existing index. - } catch (const database::TransactionException &e) { - throw QueryRuntimeException(e.what()); - } -#endif - return std::vector<std::vector<TypedValue>>(); - }; - return callback; - case IndexQuery::Action::DROP: - callback.fn = [label, properties, db_accessor, invalidate_plan_cache] { -#ifdef MG_SINGLE_NODE_V2 - CHECK(properties.size() == 1); - db_accessor->DropIndex(label, properties[0]); - invalidate_plan_cache(); -#else - try { - CHECK(properties.size() == 1); - db_accessor->DropIndex(label, properties[0]); - invalidate_plan_cache(); - } catch (const database::TransactionException &e) { - throw QueryRuntimeException(e.what()); - } -#endif - return std::vector<std::vector<TypedValue>>(); - }; - return callback; - } -} - -Callback HandleInfoQuery(InfoQuery *info_query, DbAccessor *db_accessor) { - Callback callback; - switch (info_query->info_type_) { - case InfoQuery::InfoType::STORAGE: -#if defined(MG_SINGLE_NODE) - callback.header = {"storage info", "value"}; - callback.fn = [db_accessor] { - auto info = db_accessor->StorageInfo(); - std::vector<std::vector<TypedValue>> results; - results.reserve(info.size()); - for (const auto &pair : info) { - results.push_back({TypedValue(pair.first), TypedValue(pair.second)}); - } - return results; - }; -#elif defined(MG_SINGLE_NODE_HA) - callback.header = {"server id", "storage info", "value"}; - callback.fn = [db_accessor] { - auto info = db_accessor->StorageInfo(); - std::vector<std::vector<TypedValue>> results; - results.reserve(info.size()); - for (const auto &peer_info : info) { - for (const auto &pair : peer_info.second) { - results.push_back({TypedValue(peer_info.first), - TypedValue(pair.first), - TypedValue(pair.second)}); - } - } - return results; - }; -#else - throw utils::NotYetImplemented("storage info"); -#endif - break; - case InfoQuery::InfoType::INDEX: -#ifdef MG_SINGLE_NODE_V2 - throw utils::NotYetImplemented("IndexInfo"); -#else - callback.header = {"created index"}; - callback.fn = [db_accessor] { - auto info = db_accessor->IndexInfo(); - std::vector<std::vector<TypedValue>> results; - results.reserve(info.size()); - for (const auto &index : info) { - results.push_back({TypedValue(index)}); - } - return results; - }; - break; -#endif - case InfoQuery::InfoType::CONSTRAINT: -#ifdef MG_SINGLE_NODE_V2 - throw utils::NotYetImplemented("ConstraintInfo"); -#else - callback.header = {"constraint type", "label", "properties"}; - callback.fn = [db_accessor] { - std::vector<std::vector<TypedValue>> results; - for (auto &e : db_accessor->ListUniqueConstraints()) { - std::vector<std::string> property_names(e.properties.size()); - std::transform(e.properties.begin(), e.properties.end(), - property_names.begin(), [&db_accessor](const auto &p) { - return db_accessor->PropertyToName(p); - }); - - std::vector<TypedValue> constraint{ - TypedValue("unique"), - TypedValue(db_accessor->LabelToName(e.label)), - TypedValue(utils::Join(property_names, ","))}; - - results.emplace_back(constraint); - } - return results; - }; - break; -#endif - case InfoQuery::InfoType::RAFT: -#if defined(MG_SINGLE_NODE_HA) - callback.header = {"info", "value"}; - callback.fn = [db_accessor] { - std::vector<std::vector<TypedValue>> results( - {{TypedValue("is_leader"), - TypedValue(db_accessor->raft()->IsLeader())}, - {TypedValue("term_id"), TypedValue(static_cast<int64_t>( - db_accessor->raft()->TermId()))}}); - return results; - }; - // It is critical to abort this query because it can be executed on - // machines that aren't the leader. - callback.should_abort_query = true; -#else - throw utils::NotYetImplemented("raft info"); -#endif - break; - } - return callback; -} - -Callback HandleConstraintQuery(ConstraintQuery *constraint_query, - DbAccessor *db_accessor) { - std::vector<storage::Property> properties; - auto label = - db_accessor->NameToLabel(constraint_query->constraint_.label.name); - properties.reserve(constraint_query->constraint_.properties.size()); - for (const auto &prop : constraint_query->constraint_.properties) { - properties.push_back(db_accessor->NameToProperty(prop.name)); - } - - Callback callback; - switch (constraint_query->action_type_) { - case ConstraintQuery::ActionType::CREATE: { - switch (constraint_query->constraint_.type) { - case Constraint::Type::NODE_KEY: - throw utils::NotYetImplemented("Node key constraints"); - case Constraint::Type::EXISTS: -#ifdef MG_SINGLE_NODE_V2 - if (properties.empty() || properties.size() > 1) { - throw SyntaxException( - "Exactly one property must be used for existence constraints."); - } - callback.fn = [label, properties, db_accessor] { - auto res = - db_accessor->CreateExistenceConstraint(label, properties[0]); - if (res.HasError()) { - auto violation = res.GetError(); - auto label_name = db_accessor->LabelToName(violation.label); - auto property_name = - db_accessor->PropertyToName(violation.property); - throw QueryRuntimeException( - "Unable to create a constraint :{}({}), because an existing " - "node violates it.", - label_name, property_name); - } - return std::vector<std::vector<TypedValue>>(); - }; - break; -#else - throw utils::NotYetImplemented("Existence constraints"); -#endif - case Constraint::Type::UNIQUE: -#ifdef MG_SINGLE_NODE_V2 - throw utils::NotYetImplemented("Unique constraints"); -#else - callback.fn = [label, properties, db_accessor] { - try { - db_accessor->BuildUniqueConstraint(label, properties); - return std::vector<std::vector<TypedValue>>(); - } catch (const database::ConstraintViolationException &e) { - throw QueryRuntimeException(e.what()); - } catch (const database::TransactionException &e) { - throw QueryRuntimeException(e.what()); - } catch (const mvcc::SerializationError &e) { - throw QueryRuntimeException(e.what()); - } - }; - break; -#endif - } - } break; - case ConstraintQuery::ActionType::DROP: { - switch (constraint_query->constraint_.type) { - case Constraint::Type::NODE_KEY: - throw utils::NotYetImplemented("Node key constraints"); - case Constraint::Type::EXISTS: -#ifdef MG_SINGLE_NODE_V2 - if (properties.empty() || properties.size() > 1) { - throw SyntaxException( - "Exactly one property must be used for existence constraints."); - } - callback.fn = [label, properties, db_accessor] { - db_accessor->DropExistenceConstraint(label, properties[0]); - return std::vector<std::vector<TypedValue>>(); - }; - break; -#else - throw utils::NotYetImplemented("Existence constraints"); -#endif - case Constraint::Type::UNIQUE: -#ifdef MG_SINGLE_NODE_V2 - throw utils::NotYetImplemented("Unique constraints"); -#else - callback.fn = [label, properties, db_accessor] { - try { - db_accessor->DeleteUniqueConstraint(label, properties); - return std::vector<std::vector<TypedValue>>(); - } catch (const database::TransactionException &e) { - throw QueryRuntimeException(e.what()); - } - }; - break; -#endif - } - } break; - } - return callback; -} - Interpreter::Interpreter(InterpreterContext *interpreter_context) : interpreter_context_(interpreter_context) { CHECK(interpreter_context_) << "Interpreter context must not be NULL"; @@ -861,40 +572,62 @@ std::shared_ptr<CachedPlan> CypherQueryToPlan( .first->second; } -void Interpreter::PrepareTransactionQuery(std::string_view query_upper) { +PreparedQuery Interpreter::PrepareTransactionQuery( + std::string_view query_upper) { + std::function<void()> handler; + if (query_upper == "BEGIN") { - if (in_explicit_transaction_) { - throw QueryException("Nested transactions are not supported."); - } - in_explicit_transaction_ = true; - expect_rollback_ = false; + handler = [this] { + if (in_explicit_transaction_) { + throw ExplicitTransactionUsageException( + "Nested transactions are not supported."); + } + in_explicit_transaction_ = true; + expect_rollback_ = false; + + db_accessor_.emplace(interpreter_context_->db->Access()); + execution_db_accessor_.emplace(&*db_accessor_); + }; } else if (query_upper == "COMMIT") { - if (!in_explicit_transaction_) { - throw QueryException("No current transaction to commit."); - } - if (expect_rollback_) { - throw QueryException( - "Transaction can't be committed because there was a previous " - "error. Please invoke a rollback instead."); - } + handler = [this] { + if (!in_explicit_transaction_) { + throw ExplicitTransactionUsageException( + "No current transaction to commit."); + } + if (expect_rollback_) { + throw ExplicitTransactionUsageException( + "Transaction can't be committed because there was a previous " + "error. Please invoke a rollback instead."); + } - try { - Commit(); - } catch (const utils::BasicException &) { - AbortCommand(); - throw; - } + try { + Commit(); + } catch (const utils::BasicException &) { + AbortCommand(); + throw; + } - expect_rollback_ = false; - in_explicit_transaction_ = false; + expect_rollback_ = false; + in_explicit_transaction_ = false; + }; } else if (query_upper == "ROLLBACK") { - if (!in_explicit_transaction_) { - throw QueryException("No current transaction to rollback."); - } - Abort(); - expect_rollback_ = false; - in_explicit_transaction_ = false; + handler = [this] { + if (!in_explicit_transaction_) { + throw ExplicitTransactionUsageException( + "No current transaction to rollback."); + } + Abort(); + expect_rollback_ = false; + in_explicit_transaction_ = false; + }; + } else { + LOG(FATAL) << "Should not get here -- unknown transaction query!"; } + + return {{}, {}, [handler = std::move(handler)](AnyStream *) { + handler(); + return QueryHandlerResult::NOTHING; + }}; } PreparedQuery PrepareCypherQuery( @@ -930,7 +663,7 @@ PreparedQuery PrepareCypherQuery( execution_memory](AnyStream *stream) { PullAllPlan(stream, *plan, parameters, output_symbols, false, summary, dba, execution_memory); - return true; + return QueryHandlerResult::COMMIT; }}; } @@ -997,7 +730,7 @@ PreparedQuery PrepareExplainQuery( dba, execution_memory](AnyStream *stream) { PullAllPlan(stream, *plan, parameters, output_symbols, false, summary, dba, execution_memory); - return true; + return QueryHandlerResult::COMMIT; }}; } @@ -1097,35 +830,29 @@ PreparedQuery PrepareProfileQuery( "profile", ProfilingStatsToJson(ctx.stats, ctx.profile_execution_time).dump()); - return false; + return QueryHandlerResult::ABORT; }}; } PreparedQuery PrepareDumpQuery( ParsedQuery parsed_query, std::map<std::string, TypedValue> *summary, - InterpreterContext *interpreter_context, DbAccessor *dba, + InterpreterContext *interpreter_context, utils::MonotonicBufferResource *execution_memory) { #ifndef MG_SINGLE_NODE_HA - SymbolTable symbol_table; - auto query_symbol = symbol_table.CreateSymbol("QUERY", false); - - std::vector<Symbol> output_symbols = {query_symbol}; - std::vector<std::string> header = {query_symbol.name()}; - - auto output_plan = std::make_unique<plan::OutputTableStream>( - output_symbols, DumpClosure(dba)); - auto plan = - std::make_shared<CachedPlan>(std::make_unique<SingleNodeLogicalPlan>( - std::move(output_plan), 0.0, AstStorage{}, symbol_table)); - return PreparedQuery{ - std::move(header), std::move(parsed_query.required_privileges), - [plan = std::move(plan), parameters = std::move(parsed_query.parameters), - output_symbols = std::move(output_symbols), summary, dba, - execution_memory](AnyStream *stream) { - PullAllPlan(stream, *plan, parameters, output_symbols, false, summary, - dba, execution_memory); - return true; + {"QUERY"}, + std::move(parsed_query.required_privileges), + [interpreter_context](AnyStream *stream) { + auto dba = interpreter_context->db->Access(); + query::DbAccessor query_dba{&dba}; + std::ostringstream oss; + database::CypherDumpGenerator dump_generator{&query_dba}; + + while (dump_generator.NextQuery(&oss)) { + stream->Result({TypedValue(oss.str())}); + } + + return QueryHandlerResult::NOTHING; }}; #else throw utils::NotYetImplemented("Dump database"); @@ -1142,6 +869,7 @@ PreparedQuery PrepareIndexQuery( } auto *index_query = utils::Downcast<IndexQuery>(parsed_query.query); + std::function<void()> handler; // Creating an index influences computed plan costs. auto invalidate_plan_cache = [plan_cache = &interpreter_context->plan_cache] { @@ -1151,30 +879,86 @@ PreparedQuery PrepareIndexQuery( } }; - auto callback = HandleIndexQuery(index_query, invalidate_plan_cache, dba); +#ifdef MG_SINGLE_NODE_V2 + auto label = interpreter_context->db->NameToLabel(index_query->label_.name); + std::vector<storage::PropertyId> properties; + properties.reserve(index_query->properties_.size()); + for (const auto &prop : index_query->properties_) { + properties.push_back(interpreter_context->db->NameToProperty(prop.name)); + } +#else + auto label = dba->NameToLabel(index_query->label_.name); + std::vector<storage::Property> properties; + properties.reserve(index_query->properties_.size()); + for (const auto &prop : index_query->properties_) { + properties.push_back(dba->NameToProperty(prop.name)); + } +#endif - SymbolTable symbol_table; - std::vector<Symbol> output_symbols; - for (const auto &column : callback.header) { - output_symbols.emplace_back(symbol_table.CreateSymbol(column, "false")); + if (properties.size() > 1) { + throw utils::NotYetImplemented("index on multiple properties"); } - auto plan = - std::make_shared<CachedPlan>(std::make_unique<SingleNodeLogicalPlan>( - std::make_unique<plan::OutputTable>( - output_symbols, - [fn = callback.fn](Frame *, ExecutionContext *) { return fn(); }), - 0.0, AstStorage{}, symbol_table)); + switch (index_query->action_) { + case IndexQuery::Action::CREATE: { +#ifdef MG_SINGLE_NODE_V2 + handler = [interpreter_context, label, properties = std::move(properties), + invalidate_plan_cache = std::move(invalidate_plan_cache)] { + CHECK(properties.size() == 1); + interpreter_context->db->CreateIndex(label, properties[0]); + invalidate_plan_cache(); + }; +#else + handler = [dba, label, properties = std::move(properties), + invalidate_plan_cache = std::move(invalidate_plan_cache)] { + try { + CHECK(properties.size() == 1); + dba->CreateIndex(label, properties[0]); + invalidate_plan_cache(); + } catch (const database::ConstraintViolationException &e) { + throw QueryRuntimeException(e.what()); + } catch (const database::IndexExistsException &e) { + // Ignore creating an existing index. + } catch (const database::TransactionException &e) { + throw QueryRuntimeException(e.what()); + } + }; +#endif + break; + } + case IndexQuery::Action::DROP: { +#ifdef MG_SINGLE_NODE_V2 + handler = [interpreter_context, label, properties = std::move(properties), + invalidate_plan_cache = std::move(invalidate_plan_cache)] { + CHECK(properties.size() == 1); + interpreter_context->db->DropIndex(label, properties[0]); + invalidate_plan_cache(); + }; +#else + handler = [dba, label, properties = std::move(properties), + invalidate_plan_cache = std::move(invalidate_plan_cache)] { + try { + CHECK(properties.size() == 1); + dba->DropIndex(label, properties[0]); + invalidate_plan_cache(); + } catch (const database::TransactionException &e) { + throw QueryRuntimeException(e.what()); + } + }; +#endif + break; + } + } - return PreparedQuery{callback.header, + return PreparedQuery{{}, std::move(parsed_query.required_privileges), - [callback = std::move(callback), plan = std::move(plan), - parameters = std::move(parsed_query.parameters), - output_symbols = std::move(output_symbols), summary, - dba, execution_memory](AnyStream *stream) { - PullAllPlan(stream, *plan, parameters, output_symbols, - false, summary, dba, execution_memory); - return !callback.should_abort_query; + [handler = std::move(handler)](AnyStream *stream) { + handler(); +#ifdef MG_SINGLE_NODE_V2 + return QueryHandlerResult::NOTHING; +#else + return QueryHandlerResult::COMMIT; +#endif }}; } @@ -1210,17 +994,17 @@ PreparedQuery PrepareAuthQuery( [fn = callback.fn](Frame *, ExecutionContext *) { return fn(); }), 0.0, AstStorage{}, symbol_table)); - return PreparedQuery{callback.header, - std::move(parsed_query.required_privileges), - - [callback = std::move(callback), plan = std::move(plan), - parameters = std::move(parsed_query.parameters), - output_symbols = std::move(output_symbols), summary, - dba, execution_memory](AnyStream *stream) { - PullAllPlan(stream, *plan, parameters, output_symbols, - false, summary, dba, execution_memory); - return !callback.should_abort_query; - }}; + return PreparedQuery{ + callback.header, std::move(parsed_query.required_privileges), + [callback = std::move(callback), plan = std::move(plan), + parameters = std::move(parsed_query.parameters), + output_symbols = std::move(output_symbols), summary, dba, + execution_memory](AnyStream *stream) { + PullAllPlan(stream, *plan, parameters, output_symbols, false, summary, + dba, execution_memory); + return callback.should_abort_query ? QueryHandlerResult::ABORT + : QueryHandlerResult::COMMIT; + }}; #endif } @@ -1229,31 +1013,110 @@ PreparedQuery PrepareInfoQuery( InterpreterContext *interpreter_context, DbAccessor *dba, utils::MonotonicBufferResource *execution_memory) { auto *info_query = utils::Downcast<InfoQuery>(parsed_query.query); + std::vector<std::string> header; + std::function< + std::pair<std::vector<std::vector<TypedValue>>, QueryHandlerResult>()> + handler; - auto callback = HandleInfoQuery(info_query, dba); + switch (info_query->info_type_) { + case InfoQuery::InfoType::STORAGE: +#if defined(MG_SINGLE_NODE) + header = {"storage info", "value"}; + handler = [dba] { + auto info = dba->StorageInfo(); + std::vector<std::vector<TypedValue>> results; + results.reserve(info.size()); + for (const auto &pair : info) { + results.push_back({TypedValue(pair.first), TypedValue(pair.second)}); + } + return std::pair{results, QueryHandlerResult::COMMIT}; + }; +#elif defined(MG_SINGLE_NODE_HA) + header = {"server id", "storage info", "value"}; + handler = [dba] { + auto info = dba->StorageInfo(); + std::vector<std::vector<TypedValue>> results; + results.reserve(info.size()); + for (const auto &peer_info : info) { + for (const auto &pair : peer_info.second) { + results.push_back({TypedValue(peer_info.first), + TypedValue(pair.first), + TypedValue(pair.second)}); + } + } + return std::pair{results, QueryHandlerResult::COMMIT}; + }; +#else + throw utils::NotYetImplemented("storage info"); +#endif + break; + case InfoQuery::InfoType::INDEX: +#ifdef MG_SINGLE_NODE_V2 + throw utils::NotYetImplemented("IndexInfo"); +#else + header = {"created index"}; + handler = [dba] { + auto info = dba->IndexInfo(); + std::vector<std::vector<TypedValue>> results; + results.reserve(info.size()); + for (const auto &index : info) { + results.push_back({TypedValue(index)}); + } + return std::pair{results, QueryHandlerResult::COMMIT}; + }; + break; +#endif + case InfoQuery::InfoType::CONSTRAINT: +#ifdef MG_SINGLE_NODE_V2 + throw utils::NotYetImplemented("ConstraintInfo"); +#else + header = {"constraint type", "label", "properties"}; + handler = [dba] { + std::vector<std::vector<TypedValue>> results; + for (auto &e : dba->ListUniqueConstraints()) { + std::vector<std::string> property_names(e.properties.size()); + std::transform( + e.properties.begin(), e.properties.end(), property_names.begin(), + [dba](const auto &p) { return dba->PropertyToName(p); }); - SymbolTable symbol_table; - std::vector<Symbol> output_symbols; - for (const auto &column : callback.header) { - output_symbols.emplace_back(symbol_table.CreateSymbol(column, "false")); + std::vector<TypedValue> constraint{ + TypedValue("unique"), TypedValue(dba->LabelToName(e.label)), + TypedValue(utils::Join(property_names, ","))}; + + results.emplace_back(constraint); + } + return std::pair{results, QueryHandlerResult::COMMIT}; + }; + break; +#endif + case InfoQuery::InfoType::RAFT: +#if defined(MG_SINGLE_NODE_HA) + header = {"info", "value"}; + handler = [dba] { + std::vector<std::vector<TypedValue>> results( + {{TypedValue("is_leader"), TypedValue(dba->raft()->IsLeader())}, + {TypedValue("term_id"), + TypedValue(static_cast<int64_t>(dba->raft()->TermId()))}}); + // It is critical to abort this query because it can be executed on + // machines that aren't the leader. + return std::pair{results, QueryHandlerResult::ABORT}; + }; +#else + throw utils::NotYetImplemented("raft info"); +#endif + break; } - auto plan = - std::make_shared<CachedPlan>(std::make_unique<SingleNodeLogicalPlan>( - std::make_unique<plan::OutputTable>( - output_symbols, - [fn = callback.fn](Frame *, ExecutionContext *) { return fn(); }), - 0.0, AstStorage{}, symbol_table)); - - return PreparedQuery{callback.header, + return PreparedQuery{std::move(header), std::move(parsed_query.required_privileges), - [callback = std::move(callback), plan = std::move(plan), - parameters = std::move(parsed_query.parameters), - output_symbols = std::move(output_symbols), summary, - dba, execution_memory](AnyStream *stream) { - PullAllPlan(stream, *plan, parameters, output_symbols, - false, summary, dba, execution_memory); - return !callback.should_abort_query; + [handler = std::move(handler)](AnyStream *stream) { + auto [results, action] = handler(); + + for (const auto &result : results) { + stream->Result(result); + } + + return action; }}; } @@ -1262,31 +1125,118 @@ PreparedQuery PrepareConstraintQuery( InterpreterContext *interpreter_context, DbAccessor *dba, utils::MonotonicBufferResource *execution_memory) { auto *constraint_query = utils::Downcast<ConstraintQuery>(parsed_query.query); + std::function<void()> handler; - auto callback = HandleConstraintQuery(constraint_query, dba); +#ifdef MG_SINGLE_NODE_V2 + auto label = interpreter_context->db->NameToLabel( + constraint_query->constraint_.label.name); + std::vector<storage::PropertyId> properties; + properties.reserve(constraint_query->constraint_.properties.size()); + for (const auto &prop : constraint_query->constraint_.properties) { + properties.push_back(interpreter_context->db->NameToProperty(prop.name)); + } +#else + auto label = dba->NameToLabel(constraint_query->constraint_.label.name); + std::vector<storage::Property> properties; + properties.reserve(constraint_query->constraint_.properties.size()); + for (const auto &prop : constraint_query->constraint_.properties) { + properties.push_back(dba->NameToProperty(prop.name)); + } +#endif - SymbolTable symbol_table; - std::vector<Symbol> output_symbols; - for (const auto &column : callback.header) { - output_symbols.emplace_back(symbol_table.CreateSymbol(column, "false")); + switch (constraint_query->action_type_) { + case ConstraintQuery::ActionType::CREATE: { + switch (constraint_query->constraint_.type) { + case Constraint::Type::NODE_KEY: + throw utils::NotYetImplemented("Node key constraints"); + case Constraint::Type::EXISTS: +#ifdef MG_SINGLE_NODE_V2 + if (properties.empty() || properties.size() > 1) { + throw SyntaxException( + "Exactly one property must be used for existence constraints."); + } + handler = [interpreter_context, label, + properties = std::move(properties)] { + auto res = interpreter_context->db->CreateExistenceConstraint( + label, properties[0]); + if (res.HasError()) { + auto violation = res.GetError(); + auto label_name = + interpreter_context->db->LabelToName(violation.label); + auto property_name = + interpreter_context->db->PropertyToName(violation.property); + throw QueryRuntimeException( + "Unable to create a constraint :{}({}), because an existing " + "node violates it.", + label_name, property_name); + } + }; + break; +#else + throw utils::NotYetImplemented("Existence constraints"); +#endif + case Constraint::Type::UNIQUE: +#ifdef MG_SINGLE_NODE_V2 + throw utils::NotYetImplemented("Unique constraints"); +#else + handler = [dba, label, properties = std::move(properties)] { + try { + dba->BuildUniqueConstraint(label, properties); + } catch (const database::ConstraintViolationException &e) { + throw QueryRuntimeException(e.what()); + } catch (const database::TransactionException &e) { + throw QueryRuntimeException(e.what()); + } catch (const mvcc::SerializationError &e) { + throw QueryRuntimeException(e.what()); + } + }; + break; +#endif + } + } break; + case ConstraintQuery::ActionType::DROP: { + switch (constraint_query->constraint_.type) { + case Constraint::Type::NODE_KEY: + throw utils::NotYetImplemented("Node key constraints"); + case Constraint::Type::EXISTS: +#ifdef MG_SINGLE_NODE_V2 + if (properties.empty() || properties.size() > 1) { + throw SyntaxException( + "Exactly one property must be used for existence constraints."); + } + handler = [interpreter_context, label, + properties = std::move(properties)] { + interpreter_context->db->DropExistenceConstraint(label, + properties[0]); + return std::vector<std::vector<TypedValue>>(); + }; + break; +#else + throw utils::NotYetImplemented("Existence constraints"); +#endif + case Constraint::Type::UNIQUE: +#ifdef MG_SINGLE_NODE_V2 + throw utils::NotYetImplemented("Unique constraints"); +#else + handler = [dba, label, properties = std::move(properties)] { + try { + dba->DeleteUniqueConstraint(label, properties); + return std::vector<std::vector<TypedValue>>(); + } catch (const database::TransactionException &e) { + throw QueryRuntimeException(e.what()); + } + }; + break; +#endif + } + } break; } - auto plan = - std::make_shared<CachedPlan>(std::make_unique<SingleNodeLogicalPlan>( - std::make_unique<plan::OutputTable>( - output_symbols, - [fn = callback.fn](Frame *, ExecutionContext *) { return fn(); }), - 0.0, AstStorage{}, symbol_table)); - - return PreparedQuery{callback.header, + return PreparedQuery{{}, std::move(parsed_query.required_privileges), - [callback = std::move(callback), plan = std::move(plan), - parameters = std::move(parsed_query.parameters), - output_symbols = std::move(output_symbols), summary, - dba, execution_memory](AnyStream *stream) { - PullAllPlan(stream, *plan, parameters, output_symbols, - false, summary, dba, execution_memory); - return !callback.should_abort_query; + [handler = std::move(handler)](AnyStream *stream) { + handler(); + return QueryHandlerResult::COMMIT; }}; } @@ -1302,20 +1252,19 @@ Interpreter::Prepare(const std::string &query_string, if (query_upper == "BEGIN" || query_upper == "COMMIT" || query_upper == "ROLLBACK") { - PrepareTransactionQuery(query_upper); - return {{}, {}}; + prepared_query_ = PrepareTransactionQuery(query_upper); + return {prepared_query_->header, prepared_query_->privileges}; } // All queries other than transaction control queries advance the command in // an explicit transaction block. - if (in_explicit_transaction_ && db_accessor_) { + if (in_explicit_transaction_) { AdvanceCommand(); } - - // Create a database accessor if we don't yet have one. - if (!db_accessor_) { - db_accessor_.emplace(interpreter_context_->db->Access()); - execution_db_accessor_.emplace(&*db_accessor_); + // If we're not in an explicit transaction block and we have an open + // transaction, abort it since we're about to prepare a new query. + else if (db_accessor_) { + AbortCommand(); } try { @@ -1338,6 +1287,24 @@ Interpreter::Prepare(const std::string &query_string, &interpreter_context_->antlr_lock); summary_["parsing_time"] = parsing_timer.Elapsed().count(); + // Some queries require an active transaction in order to be prepared. +#ifdef MG_SINGLE_NODE_V2 + if (!in_explicit_transaction_ && + !utils::Downcast<IndexQuery>(parsed_query.query) && + !utils::Downcast<DumpQuery>(parsed_query.query) && + !utils::Downcast<ConstraintQuery>(parsed_query.query) && + !utils::Downcast<InfoQuery>(parsed_query.query)) { + db_accessor_.emplace(interpreter_context_->db->Access()); + execution_db_accessor_.emplace(&*db_accessor_); + } +#else + if (!in_explicit_transaction_ && + !utils::Downcast<DumpQuery>(parsed_query.query)) { + db_accessor_.emplace(interpreter_context_->db->Access()); + execution_db_accessor_.emplace(&*db_accessor_); + } +#endif + #ifdef MG_SINGLE_NODE_HA { InfoQuery *info_query = nullptr; @@ -1365,25 +1332,40 @@ Interpreter::Prepare(const std::string &query_string, std::move(parsed_query), in_explicit_transaction_, &summary_, interpreter_context_, &*execution_db_accessor_, &execution_memory_); } else if (utils::Downcast<DumpQuery>(parsed_query.query)) { - prepared_query = PrepareDumpQuery( - std::move(parsed_query), &summary_, interpreter_context_, - &*execution_db_accessor_, &execution_memory_); + prepared_query = + PrepareDumpQuery(std::move(parsed_query), &summary_, + interpreter_context_, &execution_memory_); } else if (utils::Downcast<IndexQuery>(parsed_query.query)) { +#ifdef MG_SINGLE_NODE_V2 + DbAccessor *dba = nullptr; +#else + auto dba = &*execution_db_accessor_; +#endif prepared_query = PrepareIndexQuery( std::move(parsed_query), in_explicit_transaction_, &summary_, - interpreter_context_, &*execution_db_accessor_, &execution_memory_); + interpreter_context_, dba, &execution_memory_); } else if (utils::Downcast<AuthQuery>(parsed_query.query)) { prepared_query = PrepareAuthQuery( std::move(parsed_query), in_explicit_transaction_, &summary_, interpreter_context_, &*execution_db_accessor_, &execution_memory_); } else if (utils::Downcast<InfoQuery>(parsed_query.query)) { - prepared_query = PrepareInfoQuery( - std::move(parsed_query), &summary_, interpreter_context_, - &*execution_db_accessor_, &execution_memory_); +#ifdef MG_SINGLE_NODE_V2 + DbAccessor *dba = nullptr; +#else + auto dba = &*execution_db_accessor_; +#endif + prepared_query = + PrepareInfoQuery(std::move(parsed_query), &summary_, + interpreter_context_, dba, &execution_memory_); } else if (utils::Downcast<ConstraintQuery>(parsed_query.query)) { - prepared_query = PrepareConstraintQuery( - std::move(parsed_query), &summary_, interpreter_context_, - &*execution_db_accessor_, &execution_memory_); +#ifdef MG_SINGLE_NODE_V2 + DbAccessor *dba = nullptr; +#else + auto dba = &*execution_db_accessor_; +#endif + prepared_query = + PrepareConstraintQuery(std::move(parsed_query), &summary_, + interpreter_context_, dba, &execution_memory_); } else { LOG(FATAL) << "Should not get here -- unknown query type!"; } diff --git a/src/query/interpreter.hpp b/src/query/interpreter.hpp index 3deb96b2c..ea58f04d7 100644 --- a/src/query/interpreter.hpp +++ b/src/query/interpreter.hpp @@ -28,6 +28,8 @@ namespace query { static constexpr size_t kExecutionMemoryBlockSize = 1U * 1024U * 1024U; +enum class QueryHandlerResult { COMMIT, ABORT, NOTHING }; + /** * `AnyStream` can wrap *any* type implementing the `Stream` concept into a * single type. @@ -79,7 +81,7 @@ class AnyStream final { struct PreparedQuery { std::vector<std::string> header; std::vector<AuthQuery::Privilege> privileges; - std::function<bool(AnyStream *stream)> query_handler; + std::function<QueryHandlerResult(AnyStream *stream)> query_handler; }; // TODO: Maybe this should move to query/plan/planner. @@ -252,7 +254,7 @@ class Interpreter final { bool expect_rollback_{false}; utils::MonotonicBufferResource execution_memory_{kExecutionMemoryBlockSize}; - void PrepareTransactionQuery(std::string_view query_upper); + PreparedQuery PrepareTransactionQuery(std::string_view query_upper); void Commit(); void AdvanceCommand(); void AbortCommand(); @@ -260,23 +262,37 @@ class Interpreter final { template <typename TStream> std::map<std::string, TypedValue> Interpreter::PullAll(TStream *result_stream) { - // If we don't have any results (eg. a transaction command preceeded), - // return an empty summary. - if (!prepared_query_) return {}; + CHECK(prepared_query_) << "Trying to call PullAll without a prepared query"; try { // Wrap the (statically polymorphic) stream type into a common type which // the handler knows. AnyStream stream{result_stream, &execution_memory_}; - bool commit = prepared_query_->query_handler(&stream); + QueryHandlerResult res = prepared_query_->query_handler(&stream); + // Erase the prepared query in order to enforce that every call to `PullAll` + // must be preceded by a call to `Prepare`. + prepared_query_ = std::nullopt; if (!in_explicit_transaction_) { - if (commit) { - Commit(); - } else { - Abort(); + switch (res) { + case QueryHandlerResult::COMMIT: + Commit(); + break; + case QueryHandlerResult::ABORT: + Abort(); + break; + case QueryHandlerResult::NOTHING: + // The only cases in which we have nothing to do are those where we're + // either in an explicit transaction or the query is such that a + // transaction wasn't started on a call to `Prepare()`. + CHECK(in_explicit_transaction_ || !db_accessor_); + break; } } + } catch (const ExplicitTransactionUsageException &) { + // Just let the exception propagate for error reporting purposes, but don't + // abort the current command. + throw; #ifdef MG_SINGLE_NODE_HA } catch (const query::HintedAbortError &) { AbortCommand(); diff --git a/tests/unit/interpreter.cpp b/tests/unit/interpreter.cpp index c34d018d6..df97b6c3a 100644 --- a/tests/unit/interpreter.cpp +++ b/tests/unit/interpreter.cpp @@ -272,10 +272,10 @@ TEST_F(InterpreterTest, Bfs) { } TEST_F(InterpreterTest, CreateIndexInMulticommandTransaction) { - interpreter_.Prepare("BEGIN", {}); - ASSERT_THROW(interpreter_.Prepare("CREATE INDEX ON :X(y)", {}), + Interpret("BEGIN"); + ASSERT_THROW(Interpret("CREATE INDEX ON :X(y)"), query::IndexInMulticommandTxException); - interpreter_.Prepare("ROLLBACK", {}); + Interpret("ROLLBACK"); } // Test shortest path end to end.