Don't unconditionally start a transaction on Prepare

Reviewers: teon.banek, mferencevic

Reviewed By: teon.banek, mferencevic

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D2532
This commit is contained in:
Lovro Lugovic 2019-10-30 14:05:47 +01:00
parent a850644f5d
commit 905b3ee2df
4 changed files with 447 additions and 444 deletions

View File

@ -59,9 +59,9 @@ class TypeMismatchError : public SemanticException {
public:
TypeMismatchError(const std::string &name, const std::string &datum,
const std::string &expected)
: SemanticException(fmt::format(
"Type mismatch: {} already defined as {}, expected {}.",
name, datum, expected)) {}
: SemanticException(
fmt::format("Type mismatch: {} already defined as {}, expected {}.",
name, datum, expected)) {}
};
class UnprovidedParameterError : public QueryException {
@ -106,6 +106,11 @@ class HintedAbortError : public utils::BasicException {
"--query-execution-time-sec flag.") {}
};
class ExplicitTransactionUsageException : public QueryRuntimeException {
public:
using QueryRuntimeException::QueryRuntimeException;
};
class ReconstructionException : public QueryException {
public:
ReconstructionException()

View File

@ -35,40 +35,6 @@ DEFINE_VALIDATED_int32(query_plan_cache_ttl, 60,
namespace query {
#ifndef MG_SINGLE_NODE_HA
namespace {
class DumpClosure final {
public:
explicit DumpClosure(DbAccessor *dba) : dump_generator_(dba) {}
// Please note that this copy constructor actually moves the other object. We
// want this because lambdas are not movable, i.e. its move constructor
// actually copies the lambda.
DumpClosure(const DumpClosure &other)
: dump_generator_(std::move(other.dump_generator_)) {}
DumpClosure(DumpClosure &&other) = default;
DumpClosure &operator=(const DumpClosure &other) = delete;
DumpClosure &operator=(DumpClosure &&other) = delete;
~DumpClosure() {}
std::optional<std::vector<TypedValue>> operator()(Frame *frame,
ExecutionContext *context) {
std::ostringstream oss;
if (dump_generator_.NextQuery(&oss)) {
return std::make_optional(std::vector<TypedValue>{TypedValue(oss.str())});
}
return std::nullopt;
}
private:
mutable database::CypherDumpGenerator dump_generator_;
};
} // namespace
#endif
/**
* A container for data related to the parsing of a query.
*/
@ -494,261 +460,6 @@ Callback HandleAuthQuery(AuthQuery *auth_query, auth::Auth *auth,
}
}
Callback HandleIndexQuery(IndexQuery *index_query,
std::function<void()> invalidate_plan_cache,
DbAccessor *db_accessor) {
auto label = db_accessor->NameToLabel(index_query->label_.name);
std::vector<storage::Property> properties;
properties.reserve(index_query->properties_.size());
for (const auto &prop : index_query->properties_) {
properties.push_back(db_accessor->NameToProperty(prop.name));
}
if (properties.size() > 1) {
throw utils::NotYetImplemented("index on multiple properties");
}
Callback callback;
switch (index_query->action_) {
case IndexQuery::Action::CREATE:
callback.fn = [label, properties, db_accessor, invalidate_plan_cache] {
#ifdef MG_SINGLE_NODE_V2
CHECK(properties.size() == 1);
db_accessor->CreateIndex(label, properties[0]);
invalidate_plan_cache();
#else
try {
CHECK(properties.size() == 1);
db_accessor->CreateIndex(label, properties[0]);
invalidate_plan_cache();
} catch (const database::ConstraintViolationException &e) {
throw QueryRuntimeException(e.what());
} catch (const database::IndexExistsException &e) {
// Ignore creating an existing index.
} catch (const database::TransactionException &e) {
throw QueryRuntimeException(e.what());
}
#endif
return std::vector<std::vector<TypedValue>>();
};
return callback;
case IndexQuery::Action::DROP:
callback.fn = [label, properties, db_accessor, invalidate_plan_cache] {
#ifdef MG_SINGLE_NODE_V2
CHECK(properties.size() == 1);
db_accessor->DropIndex(label, properties[0]);
invalidate_plan_cache();
#else
try {
CHECK(properties.size() == 1);
db_accessor->DropIndex(label, properties[0]);
invalidate_plan_cache();
} catch (const database::TransactionException &e) {
throw QueryRuntimeException(e.what());
}
#endif
return std::vector<std::vector<TypedValue>>();
};
return callback;
}
}
Callback HandleInfoQuery(InfoQuery *info_query, DbAccessor *db_accessor) {
Callback callback;
switch (info_query->info_type_) {
case InfoQuery::InfoType::STORAGE:
#if defined(MG_SINGLE_NODE)
callback.header = {"storage info", "value"};
callback.fn = [db_accessor] {
auto info = db_accessor->StorageInfo();
std::vector<std::vector<TypedValue>> results;
results.reserve(info.size());
for (const auto &pair : info) {
results.push_back({TypedValue(pair.first), TypedValue(pair.second)});
}
return results;
};
#elif defined(MG_SINGLE_NODE_HA)
callback.header = {"server id", "storage info", "value"};
callback.fn = [db_accessor] {
auto info = db_accessor->StorageInfo();
std::vector<std::vector<TypedValue>> results;
results.reserve(info.size());
for (const auto &peer_info : info) {
for (const auto &pair : peer_info.second) {
results.push_back({TypedValue(peer_info.first),
TypedValue(pair.first),
TypedValue(pair.second)});
}
}
return results;
};
#else
throw utils::NotYetImplemented("storage info");
#endif
break;
case InfoQuery::InfoType::INDEX:
#ifdef MG_SINGLE_NODE_V2
throw utils::NotYetImplemented("IndexInfo");
#else
callback.header = {"created index"};
callback.fn = [db_accessor] {
auto info = db_accessor->IndexInfo();
std::vector<std::vector<TypedValue>> results;
results.reserve(info.size());
for (const auto &index : info) {
results.push_back({TypedValue(index)});
}
return results;
};
break;
#endif
case InfoQuery::InfoType::CONSTRAINT:
#ifdef MG_SINGLE_NODE_V2
throw utils::NotYetImplemented("ConstraintInfo");
#else
callback.header = {"constraint type", "label", "properties"};
callback.fn = [db_accessor] {
std::vector<std::vector<TypedValue>> results;
for (auto &e : db_accessor->ListUniqueConstraints()) {
std::vector<std::string> property_names(e.properties.size());
std::transform(e.properties.begin(), e.properties.end(),
property_names.begin(), [&db_accessor](const auto &p) {
return db_accessor->PropertyToName(p);
});
std::vector<TypedValue> constraint{
TypedValue("unique"),
TypedValue(db_accessor->LabelToName(e.label)),
TypedValue(utils::Join(property_names, ","))};
results.emplace_back(constraint);
}
return results;
};
break;
#endif
case InfoQuery::InfoType::RAFT:
#if defined(MG_SINGLE_NODE_HA)
callback.header = {"info", "value"};
callback.fn = [db_accessor] {
std::vector<std::vector<TypedValue>> results(
{{TypedValue("is_leader"),
TypedValue(db_accessor->raft()->IsLeader())},
{TypedValue("term_id"), TypedValue(static_cast<int64_t>(
db_accessor->raft()->TermId()))}});
return results;
};
// It is critical to abort this query because it can be executed on
// machines that aren't the leader.
callback.should_abort_query = true;
#else
throw utils::NotYetImplemented("raft info");
#endif
break;
}
return callback;
}
Callback HandleConstraintQuery(ConstraintQuery *constraint_query,
DbAccessor *db_accessor) {
std::vector<storage::Property> properties;
auto label =
db_accessor->NameToLabel(constraint_query->constraint_.label.name);
properties.reserve(constraint_query->constraint_.properties.size());
for (const auto &prop : constraint_query->constraint_.properties) {
properties.push_back(db_accessor->NameToProperty(prop.name));
}
Callback callback;
switch (constraint_query->action_type_) {
case ConstraintQuery::ActionType::CREATE: {
switch (constraint_query->constraint_.type) {
case Constraint::Type::NODE_KEY:
throw utils::NotYetImplemented("Node key constraints");
case Constraint::Type::EXISTS:
#ifdef MG_SINGLE_NODE_V2
if (properties.empty() || properties.size() > 1) {
throw SyntaxException(
"Exactly one property must be used for existence constraints.");
}
callback.fn = [label, properties, db_accessor] {
auto res =
db_accessor->CreateExistenceConstraint(label, properties[0]);
if (res.HasError()) {
auto violation = res.GetError();
auto label_name = db_accessor->LabelToName(violation.label);
auto property_name =
db_accessor->PropertyToName(violation.property);
throw QueryRuntimeException(
"Unable to create a constraint :{}({}), because an existing "
"node violates it.",
label_name, property_name);
}
return std::vector<std::vector<TypedValue>>();
};
break;
#else
throw utils::NotYetImplemented("Existence constraints");
#endif
case Constraint::Type::UNIQUE:
#ifdef MG_SINGLE_NODE_V2
throw utils::NotYetImplemented("Unique constraints");
#else
callback.fn = [label, properties, db_accessor] {
try {
db_accessor->BuildUniqueConstraint(label, properties);
return std::vector<std::vector<TypedValue>>();
} catch (const database::ConstraintViolationException &e) {
throw QueryRuntimeException(e.what());
} catch (const database::TransactionException &e) {
throw QueryRuntimeException(e.what());
} catch (const mvcc::SerializationError &e) {
throw QueryRuntimeException(e.what());
}
};
break;
#endif
}
} break;
case ConstraintQuery::ActionType::DROP: {
switch (constraint_query->constraint_.type) {
case Constraint::Type::NODE_KEY:
throw utils::NotYetImplemented("Node key constraints");
case Constraint::Type::EXISTS:
#ifdef MG_SINGLE_NODE_V2
if (properties.empty() || properties.size() > 1) {
throw SyntaxException(
"Exactly one property must be used for existence constraints.");
}
callback.fn = [label, properties, db_accessor] {
db_accessor->DropExistenceConstraint(label, properties[0]);
return std::vector<std::vector<TypedValue>>();
};
break;
#else
throw utils::NotYetImplemented("Existence constraints");
#endif
case Constraint::Type::UNIQUE:
#ifdef MG_SINGLE_NODE_V2
throw utils::NotYetImplemented("Unique constraints");
#else
callback.fn = [label, properties, db_accessor] {
try {
db_accessor->DeleteUniqueConstraint(label, properties);
return std::vector<std::vector<TypedValue>>();
} catch (const database::TransactionException &e) {
throw QueryRuntimeException(e.what());
}
};
break;
#endif
}
} break;
}
return callback;
}
Interpreter::Interpreter(InterpreterContext *interpreter_context)
: interpreter_context_(interpreter_context) {
CHECK(interpreter_context_) << "Interpreter context must not be NULL";
@ -861,40 +572,62 @@ std::shared_ptr<CachedPlan> CypherQueryToPlan(
.first->second;
}
void Interpreter::PrepareTransactionQuery(std::string_view query_upper) {
PreparedQuery Interpreter::PrepareTransactionQuery(
std::string_view query_upper) {
std::function<void()> handler;
if (query_upper == "BEGIN") {
if (in_explicit_transaction_) {
throw QueryException("Nested transactions are not supported.");
}
in_explicit_transaction_ = true;
expect_rollback_ = false;
handler = [this] {
if (in_explicit_transaction_) {
throw ExplicitTransactionUsageException(
"Nested transactions are not supported.");
}
in_explicit_transaction_ = true;
expect_rollback_ = false;
db_accessor_.emplace(interpreter_context_->db->Access());
execution_db_accessor_.emplace(&*db_accessor_);
};
} else if (query_upper == "COMMIT") {
if (!in_explicit_transaction_) {
throw QueryException("No current transaction to commit.");
}
if (expect_rollback_) {
throw QueryException(
"Transaction can't be committed because there was a previous "
"error. Please invoke a rollback instead.");
}
handler = [this] {
if (!in_explicit_transaction_) {
throw ExplicitTransactionUsageException(
"No current transaction to commit.");
}
if (expect_rollback_) {
throw ExplicitTransactionUsageException(
"Transaction can't be committed because there was a previous "
"error. Please invoke a rollback instead.");
}
try {
Commit();
} catch (const utils::BasicException &) {
AbortCommand();
throw;
}
try {
Commit();
} catch (const utils::BasicException &) {
AbortCommand();
throw;
}
expect_rollback_ = false;
in_explicit_transaction_ = false;
expect_rollback_ = false;
in_explicit_transaction_ = false;
};
} else if (query_upper == "ROLLBACK") {
if (!in_explicit_transaction_) {
throw QueryException("No current transaction to rollback.");
}
Abort();
expect_rollback_ = false;
in_explicit_transaction_ = false;
handler = [this] {
if (!in_explicit_transaction_) {
throw ExplicitTransactionUsageException(
"No current transaction to rollback.");
}
Abort();
expect_rollback_ = false;
in_explicit_transaction_ = false;
};
} else {
LOG(FATAL) << "Should not get here -- unknown transaction query!";
}
return {{}, {}, [handler = std::move(handler)](AnyStream *) {
handler();
return QueryHandlerResult::NOTHING;
}};
}
PreparedQuery PrepareCypherQuery(
@ -930,7 +663,7 @@ PreparedQuery PrepareCypherQuery(
execution_memory](AnyStream *stream) {
PullAllPlan(stream, *plan, parameters, output_symbols, false, summary,
dba, execution_memory);
return true;
return QueryHandlerResult::COMMIT;
}};
}
@ -997,7 +730,7 @@ PreparedQuery PrepareExplainQuery(
dba, execution_memory](AnyStream *stream) {
PullAllPlan(stream, *plan, parameters, output_symbols,
false, summary, dba, execution_memory);
return true;
return QueryHandlerResult::COMMIT;
}};
}
@ -1097,35 +830,29 @@ PreparedQuery PrepareProfileQuery(
"profile",
ProfilingStatsToJson(ctx.stats, ctx.profile_execution_time).dump());
return false;
return QueryHandlerResult::ABORT;
}};
}
PreparedQuery PrepareDumpQuery(
ParsedQuery parsed_query, std::map<std::string, TypedValue> *summary,
InterpreterContext *interpreter_context, DbAccessor *dba,
InterpreterContext *interpreter_context,
utils::MonotonicBufferResource *execution_memory) {
#ifndef MG_SINGLE_NODE_HA
SymbolTable symbol_table;
auto query_symbol = symbol_table.CreateSymbol("QUERY", false);
std::vector<Symbol> output_symbols = {query_symbol};
std::vector<std::string> header = {query_symbol.name()};
auto output_plan = std::make_unique<plan::OutputTableStream>(
output_symbols, DumpClosure(dba));
auto plan =
std::make_shared<CachedPlan>(std::make_unique<SingleNodeLogicalPlan>(
std::move(output_plan), 0.0, AstStorage{}, symbol_table));
return PreparedQuery{
std::move(header), std::move(parsed_query.required_privileges),
[plan = std::move(plan), parameters = std::move(parsed_query.parameters),
output_symbols = std::move(output_symbols), summary, dba,
execution_memory](AnyStream *stream) {
PullAllPlan(stream, *plan, parameters, output_symbols, false, summary,
dba, execution_memory);
return true;
{"QUERY"},
std::move(parsed_query.required_privileges),
[interpreter_context](AnyStream *stream) {
auto dba = interpreter_context->db->Access();
query::DbAccessor query_dba{&dba};
std::ostringstream oss;
database::CypherDumpGenerator dump_generator{&query_dba};
while (dump_generator.NextQuery(&oss)) {
stream->Result({TypedValue(oss.str())});
}
return QueryHandlerResult::NOTHING;
}};
#else
throw utils::NotYetImplemented("Dump database");
@ -1142,6 +869,7 @@ PreparedQuery PrepareIndexQuery(
}
auto *index_query = utils::Downcast<IndexQuery>(parsed_query.query);
std::function<void()> handler;
// Creating an index influences computed plan costs.
auto invalidate_plan_cache = [plan_cache = &interpreter_context->plan_cache] {
@ -1151,30 +879,86 @@ PreparedQuery PrepareIndexQuery(
}
};
auto callback = HandleIndexQuery(index_query, invalidate_plan_cache, dba);
#ifdef MG_SINGLE_NODE_V2
auto label = interpreter_context->db->NameToLabel(index_query->label_.name);
std::vector<storage::PropertyId> properties;
properties.reserve(index_query->properties_.size());
for (const auto &prop : index_query->properties_) {
properties.push_back(interpreter_context->db->NameToProperty(prop.name));
}
#else
auto label = dba->NameToLabel(index_query->label_.name);
std::vector<storage::Property> properties;
properties.reserve(index_query->properties_.size());
for (const auto &prop : index_query->properties_) {
properties.push_back(dba->NameToProperty(prop.name));
}
#endif
SymbolTable symbol_table;
std::vector<Symbol> output_symbols;
for (const auto &column : callback.header) {
output_symbols.emplace_back(symbol_table.CreateSymbol(column, "false"));
if (properties.size() > 1) {
throw utils::NotYetImplemented("index on multiple properties");
}
auto plan =
std::make_shared<CachedPlan>(std::make_unique<SingleNodeLogicalPlan>(
std::make_unique<plan::OutputTable>(
output_symbols,
[fn = callback.fn](Frame *, ExecutionContext *) { return fn(); }),
0.0, AstStorage{}, symbol_table));
switch (index_query->action_) {
case IndexQuery::Action::CREATE: {
#ifdef MG_SINGLE_NODE_V2
handler = [interpreter_context, label, properties = std::move(properties),
invalidate_plan_cache = std::move(invalidate_plan_cache)] {
CHECK(properties.size() == 1);
interpreter_context->db->CreateIndex(label, properties[0]);
invalidate_plan_cache();
};
#else
handler = [dba, label, properties = std::move(properties),
invalidate_plan_cache = std::move(invalidate_plan_cache)] {
try {
CHECK(properties.size() == 1);
dba->CreateIndex(label, properties[0]);
invalidate_plan_cache();
} catch (const database::ConstraintViolationException &e) {
throw QueryRuntimeException(e.what());
} catch (const database::IndexExistsException &e) {
// Ignore creating an existing index.
} catch (const database::TransactionException &e) {
throw QueryRuntimeException(e.what());
}
};
#endif
break;
}
case IndexQuery::Action::DROP: {
#ifdef MG_SINGLE_NODE_V2
handler = [interpreter_context, label, properties = std::move(properties),
invalidate_plan_cache = std::move(invalidate_plan_cache)] {
CHECK(properties.size() == 1);
interpreter_context->db->DropIndex(label, properties[0]);
invalidate_plan_cache();
};
#else
handler = [dba, label, properties = std::move(properties),
invalidate_plan_cache = std::move(invalidate_plan_cache)] {
try {
CHECK(properties.size() == 1);
dba->DropIndex(label, properties[0]);
invalidate_plan_cache();
} catch (const database::TransactionException &e) {
throw QueryRuntimeException(e.what());
}
};
#endif
break;
}
}
return PreparedQuery{callback.header,
return PreparedQuery{{},
std::move(parsed_query.required_privileges),
[callback = std::move(callback), plan = std::move(plan),
parameters = std::move(parsed_query.parameters),
output_symbols = std::move(output_symbols), summary,
dba, execution_memory](AnyStream *stream) {
PullAllPlan(stream, *plan, parameters, output_symbols,
false, summary, dba, execution_memory);
return !callback.should_abort_query;
[handler = std::move(handler)](AnyStream *stream) {
handler();
#ifdef MG_SINGLE_NODE_V2
return QueryHandlerResult::NOTHING;
#else
return QueryHandlerResult::COMMIT;
#endif
}};
}
@ -1210,17 +994,17 @@ PreparedQuery PrepareAuthQuery(
[fn = callback.fn](Frame *, ExecutionContext *) { return fn(); }),
0.0, AstStorage{}, symbol_table));
return PreparedQuery{callback.header,
std::move(parsed_query.required_privileges),
[callback = std::move(callback), plan = std::move(plan),
parameters = std::move(parsed_query.parameters),
output_symbols = std::move(output_symbols), summary,
dba, execution_memory](AnyStream *stream) {
PullAllPlan(stream, *plan, parameters, output_symbols,
false, summary, dba, execution_memory);
return !callback.should_abort_query;
}};
return PreparedQuery{
callback.header, std::move(parsed_query.required_privileges),
[callback = std::move(callback), plan = std::move(plan),
parameters = std::move(parsed_query.parameters),
output_symbols = std::move(output_symbols), summary, dba,
execution_memory](AnyStream *stream) {
PullAllPlan(stream, *plan, parameters, output_symbols, false, summary,
dba, execution_memory);
return callback.should_abort_query ? QueryHandlerResult::ABORT
: QueryHandlerResult::COMMIT;
}};
#endif
}
@ -1229,31 +1013,110 @@ PreparedQuery PrepareInfoQuery(
InterpreterContext *interpreter_context, DbAccessor *dba,
utils::MonotonicBufferResource *execution_memory) {
auto *info_query = utils::Downcast<InfoQuery>(parsed_query.query);
std::vector<std::string> header;
std::function<
std::pair<std::vector<std::vector<TypedValue>>, QueryHandlerResult>()>
handler;
auto callback = HandleInfoQuery(info_query, dba);
switch (info_query->info_type_) {
case InfoQuery::InfoType::STORAGE:
#if defined(MG_SINGLE_NODE)
header = {"storage info", "value"};
handler = [dba] {
auto info = dba->StorageInfo();
std::vector<std::vector<TypedValue>> results;
results.reserve(info.size());
for (const auto &pair : info) {
results.push_back({TypedValue(pair.first), TypedValue(pair.second)});
}
return std::pair{results, QueryHandlerResult::COMMIT};
};
#elif defined(MG_SINGLE_NODE_HA)
header = {"server id", "storage info", "value"};
handler = [dba] {
auto info = dba->StorageInfo();
std::vector<std::vector<TypedValue>> results;
results.reserve(info.size());
for (const auto &peer_info : info) {
for (const auto &pair : peer_info.second) {
results.push_back({TypedValue(peer_info.first),
TypedValue(pair.first),
TypedValue(pair.second)});
}
}
return std::pair{results, QueryHandlerResult::COMMIT};
};
#else
throw utils::NotYetImplemented("storage info");
#endif
break;
case InfoQuery::InfoType::INDEX:
#ifdef MG_SINGLE_NODE_V2
throw utils::NotYetImplemented("IndexInfo");
#else
header = {"created index"};
handler = [dba] {
auto info = dba->IndexInfo();
std::vector<std::vector<TypedValue>> results;
results.reserve(info.size());
for (const auto &index : info) {
results.push_back({TypedValue(index)});
}
return std::pair{results, QueryHandlerResult::COMMIT};
};
break;
#endif
case InfoQuery::InfoType::CONSTRAINT:
#ifdef MG_SINGLE_NODE_V2
throw utils::NotYetImplemented("ConstraintInfo");
#else
header = {"constraint type", "label", "properties"};
handler = [dba] {
std::vector<std::vector<TypedValue>> results;
for (auto &e : dba->ListUniqueConstraints()) {
std::vector<std::string> property_names(e.properties.size());
std::transform(
e.properties.begin(), e.properties.end(), property_names.begin(),
[dba](const auto &p) { return dba->PropertyToName(p); });
SymbolTable symbol_table;
std::vector<Symbol> output_symbols;
for (const auto &column : callback.header) {
output_symbols.emplace_back(symbol_table.CreateSymbol(column, "false"));
std::vector<TypedValue> constraint{
TypedValue("unique"), TypedValue(dba->LabelToName(e.label)),
TypedValue(utils::Join(property_names, ","))};
results.emplace_back(constraint);
}
return std::pair{results, QueryHandlerResult::COMMIT};
};
break;
#endif
case InfoQuery::InfoType::RAFT:
#if defined(MG_SINGLE_NODE_HA)
header = {"info", "value"};
handler = [dba] {
std::vector<std::vector<TypedValue>> results(
{{TypedValue("is_leader"), TypedValue(dba->raft()->IsLeader())},
{TypedValue("term_id"),
TypedValue(static_cast<int64_t>(dba->raft()->TermId()))}});
// It is critical to abort this query because it can be executed on
// machines that aren't the leader.
return std::pair{results, QueryHandlerResult::ABORT};
};
#else
throw utils::NotYetImplemented("raft info");
#endif
break;
}
auto plan =
std::make_shared<CachedPlan>(std::make_unique<SingleNodeLogicalPlan>(
std::make_unique<plan::OutputTable>(
output_symbols,
[fn = callback.fn](Frame *, ExecutionContext *) { return fn(); }),
0.0, AstStorage{}, symbol_table));
return PreparedQuery{callback.header,
return PreparedQuery{std::move(header),
std::move(parsed_query.required_privileges),
[callback = std::move(callback), plan = std::move(plan),
parameters = std::move(parsed_query.parameters),
output_symbols = std::move(output_symbols), summary,
dba, execution_memory](AnyStream *stream) {
PullAllPlan(stream, *plan, parameters, output_symbols,
false, summary, dba, execution_memory);
return !callback.should_abort_query;
[handler = std::move(handler)](AnyStream *stream) {
auto [results, action] = handler();
for (const auto &result : results) {
stream->Result(result);
}
return action;
}};
}
@ -1262,31 +1125,118 @@ PreparedQuery PrepareConstraintQuery(
InterpreterContext *interpreter_context, DbAccessor *dba,
utils::MonotonicBufferResource *execution_memory) {
auto *constraint_query = utils::Downcast<ConstraintQuery>(parsed_query.query);
std::function<void()> handler;
auto callback = HandleConstraintQuery(constraint_query, dba);
#ifdef MG_SINGLE_NODE_V2
auto label = interpreter_context->db->NameToLabel(
constraint_query->constraint_.label.name);
std::vector<storage::PropertyId> properties;
properties.reserve(constraint_query->constraint_.properties.size());
for (const auto &prop : constraint_query->constraint_.properties) {
properties.push_back(interpreter_context->db->NameToProperty(prop.name));
}
#else
auto label = dba->NameToLabel(constraint_query->constraint_.label.name);
std::vector<storage::Property> properties;
properties.reserve(constraint_query->constraint_.properties.size());
for (const auto &prop : constraint_query->constraint_.properties) {
properties.push_back(dba->NameToProperty(prop.name));
}
#endif
SymbolTable symbol_table;
std::vector<Symbol> output_symbols;
for (const auto &column : callback.header) {
output_symbols.emplace_back(symbol_table.CreateSymbol(column, "false"));
switch (constraint_query->action_type_) {
case ConstraintQuery::ActionType::CREATE: {
switch (constraint_query->constraint_.type) {
case Constraint::Type::NODE_KEY:
throw utils::NotYetImplemented("Node key constraints");
case Constraint::Type::EXISTS:
#ifdef MG_SINGLE_NODE_V2
if (properties.empty() || properties.size() > 1) {
throw SyntaxException(
"Exactly one property must be used for existence constraints.");
}
handler = [interpreter_context, label,
properties = std::move(properties)] {
auto res = interpreter_context->db->CreateExistenceConstraint(
label, properties[0]);
if (res.HasError()) {
auto violation = res.GetError();
auto label_name =
interpreter_context->db->LabelToName(violation.label);
auto property_name =
interpreter_context->db->PropertyToName(violation.property);
throw QueryRuntimeException(
"Unable to create a constraint :{}({}), because an existing "
"node violates it.",
label_name, property_name);
}
};
break;
#else
throw utils::NotYetImplemented("Existence constraints");
#endif
case Constraint::Type::UNIQUE:
#ifdef MG_SINGLE_NODE_V2
throw utils::NotYetImplemented("Unique constraints");
#else
handler = [dba, label, properties = std::move(properties)] {
try {
dba->BuildUniqueConstraint(label, properties);
} catch (const database::ConstraintViolationException &e) {
throw QueryRuntimeException(e.what());
} catch (const database::TransactionException &e) {
throw QueryRuntimeException(e.what());
} catch (const mvcc::SerializationError &e) {
throw QueryRuntimeException(e.what());
}
};
break;
#endif
}
} break;
case ConstraintQuery::ActionType::DROP: {
switch (constraint_query->constraint_.type) {
case Constraint::Type::NODE_KEY:
throw utils::NotYetImplemented("Node key constraints");
case Constraint::Type::EXISTS:
#ifdef MG_SINGLE_NODE_V2
if (properties.empty() || properties.size() > 1) {
throw SyntaxException(
"Exactly one property must be used for existence constraints.");
}
handler = [interpreter_context, label,
properties = std::move(properties)] {
interpreter_context->db->DropExistenceConstraint(label,
properties[0]);
return std::vector<std::vector<TypedValue>>();
};
break;
#else
throw utils::NotYetImplemented("Existence constraints");
#endif
case Constraint::Type::UNIQUE:
#ifdef MG_SINGLE_NODE_V2
throw utils::NotYetImplemented("Unique constraints");
#else
handler = [dba, label, properties = std::move(properties)] {
try {
dba->DeleteUniqueConstraint(label, properties);
return std::vector<std::vector<TypedValue>>();
} catch (const database::TransactionException &e) {
throw QueryRuntimeException(e.what());
}
};
break;
#endif
}
} break;
}
auto plan =
std::make_shared<CachedPlan>(std::make_unique<SingleNodeLogicalPlan>(
std::make_unique<plan::OutputTable>(
output_symbols,
[fn = callback.fn](Frame *, ExecutionContext *) { return fn(); }),
0.0, AstStorage{}, symbol_table));
return PreparedQuery{callback.header,
return PreparedQuery{{},
std::move(parsed_query.required_privileges),
[callback = std::move(callback), plan = std::move(plan),
parameters = std::move(parsed_query.parameters),
output_symbols = std::move(output_symbols), summary,
dba, execution_memory](AnyStream *stream) {
PullAllPlan(stream, *plan, parameters, output_symbols,
false, summary, dba, execution_memory);
return !callback.should_abort_query;
[handler = std::move(handler)](AnyStream *stream) {
handler();
return QueryHandlerResult::COMMIT;
}};
}
@ -1302,20 +1252,19 @@ Interpreter::Prepare(const std::string &query_string,
if (query_upper == "BEGIN" || query_upper == "COMMIT" ||
query_upper == "ROLLBACK") {
PrepareTransactionQuery(query_upper);
return {{}, {}};
prepared_query_ = PrepareTransactionQuery(query_upper);
return {prepared_query_->header, prepared_query_->privileges};
}
// All queries other than transaction control queries advance the command in
// an explicit transaction block.
if (in_explicit_transaction_ && db_accessor_) {
if (in_explicit_transaction_) {
AdvanceCommand();
}
// Create a database accessor if we don't yet have one.
if (!db_accessor_) {
db_accessor_.emplace(interpreter_context_->db->Access());
execution_db_accessor_.emplace(&*db_accessor_);
// If we're not in an explicit transaction block and we have an open
// transaction, abort it since we're about to prepare a new query.
else if (db_accessor_) {
AbortCommand();
}
try {
@ -1338,6 +1287,24 @@ Interpreter::Prepare(const std::string &query_string,
&interpreter_context_->antlr_lock);
summary_["parsing_time"] = parsing_timer.Elapsed().count();
// Some queries require an active transaction in order to be prepared.
#ifdef MG_SINGLE_NODE_V2
if (!in_explicit_transaction_ &&
!utils::Downcast<IndexQuery>(parsed_query.query) &&
!utils::Downcast<DumpQuery>(parsed_query.query) &&
!utils::Downcast<ConstraintQuery>(parsed_query.query) &&
!utils::Downcast<InfoQuery>(parsed_query.query)) {
db_accessor_.emplace(interpreter_context_->db->Access());
execution_db_accessor_.emplace(&*db_accessor_);
}
#else
if (!in_explicit_transaction_ &&
!utils::Downcast<DumpQuery>(parsed_query.query)) {
db_accessor_.emplace(interpreter_context_->db->Access());
execution_db_accessor_.emplace(&*db_accessor_);
}
#endif
#ifdef MG_SINGLE_NODE_HA
{
InfoQuery *info_query = nullptr;
@ -1365,25 +1332,40 @@ Interpreter::Prepare(const std::string &query_string,
std::move(parsed_query), in_explicit_transaction_, &summary_,
interpreter_context_, &*execution_db_accessor_, &execution_memory_);
} else if (utils::Downcast<DumpQuery>(parsed_query.query)) {
prepared_query = PrepareDumpQuery(
std::move(parsed_query), &summary_, interpreter_context_,
&*execution_db_accessor_, &execution_memory_);
prepared_query =
PrepareDumpQuery(std::move(parsed_query), &summary_,
interpreter_context_, &execution_memory_);
} else if (utils::Downcast<IndexQuery>(parsed_query.query)) {
#ifdef MG_SINGLE_NODE_V2
DbAccessor *dba = nullptr;
#else
auto dba = &*execution_db_accessor_;
#endif
prepared_query = PrepareIndexQuery(
std::move(parsed_query), in_explicit_transaction_, &summary_,
interpreter_context_, &*execution_db_accessor_, &execution_memory_);
interpreter_context_, dba, &execution_memory_);
} else if (utils::Downcast<AuthQuery>(parsed_query.query)) {
prepared_query = PrepareAuthQuery(
std::move(parsed_query), in_explicit_transaction_, &summary_,
interpreter_context_, &*execution_db_accessor_, &execution_memory_);
} else if (utils::Downcast<InfoQuery>(parsed_query.query)) {
prepared_query = PrepareInfoQuery(
std::move(parsed_query), &summary_, interpreter_context_,
&*execution_db_accessor_, &execution_memory_);
#ifdef MG_SINGLE_NODE_V2
DbAccessor *dba = nullptr;
#else
auto dba = &*execution_db_accessor_;
#endif
prepared_query =
PrepareInfoQuery(std::move(parsed_query), &summary_,
interpreter_context_, dba, &execution_memory_);
} else if (utils::Downcast<ConstraintQuery>(parsed_query.query)) {
prepared_query = PrepareConstraintQuery(
std::move(parsed_query), &summary_, interpreter_context_,
&*execution_db_accessor_, &execution_memory_);
#ifdef MG_SINGLE_NODE_V2
DbAccessor *dba = nullptr;
#else
auto dba = &*execution_db_accessor_;
#endif
prepared_query =
PrepareConstraintQuery(std::move(parsed_query), &summary_,
interpreter_context_, dba, &execution_memory_);
} else {
LOG(FATAL) << "Should not get here -- unknown query type!";
}

View File

@ -28,6 +28,8 @@ namespace query {
static constexpr size_t kExecutionMemoryBlockSize = 1U * 1024U * 1024U;
enum class QueryHandlerResult { COMMIT, ABORT, NOTHING };
/**
* `AnyStream` can wrap *any* type implementing the `Stream` concept into a
* single type.
@ -79,7 +81,7 @@ class AnyStream final {
struct PreparedQuery {
std::vector<std::string> header;
std::vector<AuthQuery::Privilege> privileges;
std::function<bool(AnyStream *stream)> query_handler;
std::function<QueryHandlerResult(AnyStream *stream)> query_handler;
};
// TODO: Maybe this should move to query/plan/planner.
@ -252,7 +254,7 @@ class Interpreter final {
bool expect_rollback_{false};
utils::MonotonicBufferResource execution_memory_{kExecutionMemoryBlockSize};
void PrepareTransactionQuery(std::string_view query_upper);
PreparedQuery PrepareTransactionQuery(std::string_view query_upper);
void Commit();
void AdvanceCommand();
void AbortCommand();
@ -260,23 +262,37 @@ class Interpreter final {
template <typename TStream>
std::map<std::string, TypedValue> Interpreter::PullAll(TStream *result_stream) {
// If we don't have any results (eg. a transaction command preceeded),
// return an empty summary.
if (!prepared_query_) return {};
CHECK(prepared_query_) << "Trying to call PullAll without a prepared query";
try {
// Wrap the (statically polymorphic) stream type into a common type which
// the handler knows.
AnyStream stream{result_stream, &execution_memory_};
bool commit = prepared_query_->query_handler(&stream);
QueryHandlerResult res = prepared_query_->query_handler(&stream);
// Erase the prepared query in order to enforce that every call to `PullAll`
// must be preceded by a call to `Prepare`.
prepared_query_ = std::nullopt;
if (!in_explicit_transaction_) {
if (commit) {
Commit();
} else {
Abort();
switch (res) {
case QueryHandlerResult::COMMIT:
Commit();
break;
case QueryHandlerResult::ABORT:
Abort();
break;
case QueryHandlerResult::NOTHING:
// The only cases in which we have nothing to do are those where we're
// either in an explicit transaction or the query is such that a
// transaction wasn't started on a call to `Prepare()`.
CHECK(in_explicit_transaction_ || !db_accessor_);
break;
}
}
} catch (const ExplicitTransactionUsageException &) {
// Just let the exception propagate for error reporting purposes, but don't
// abort the current command.
throw;
#ifdef MG_SINGLE_NODE_HA
} catch (const query::HintedAbortError &) {
AbortCommand();

View File

@ -272,10 +272,10 @@ TEST_F(InterpreterTest, Bfs) {
}
TEST_F(InterpreterTest, CreateIndexInMulticommandTransaction) {
interpreter_.Prepare("BEGIN", {});
ASSERT_THROW(interpreter_.Prepare("CREATE INDEX ON :X(y)", {}),
Interpret("BEGIN");
ASSERT_THROW(Interpret("CREATE INDEX ON :X(y)"),
query::IndexInMulticommandTxException);
interpreter_.Prepare("ROLLBACK", {});
Interpret("ROLLBACK");
}
// Test shortest path end to end.