Merge Interpreter and TransactionEngine

Summary:
Depends on D2471

- Add pointer to storage to `InterpreterContext`
- Rename `operator()` to `Prepare`
- Use `Interpret` instead of `operator()` (`Interpret` will be removed soon)
- Remove the `in_explicit_transaction` parameter
- Remove the memory resource parameter from `Interpret`
- Remove the storage accessor parameter from `Interpret`
- Fix up tests (remove the `Interpreter` from `database_transaction_timeout`)

Reviewers: teon.banek, mferencevic

Reviewed By: teon.banek

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D2482
This commit is contained in:
Lovro Lugovic 2019-10-10 11:23:33 +02:00
parent 98855889d5
commit 3750fbc093
18 changed files with 347 additions and 467 deletions

View File

@ -97,7 +97,7 @@ void SingleNodeMain() {
#else
database::GraphDb db;
#endif
query::InterpreterContext interpreter_context;
query::InterpreterContext interpreter_context{&db};
SessionData session_data{&db, &interpreter_context, &auth, &audit_log};
integrations::kafka::Streams kafka_streams{

View File

@ -40,7 +40,7 @@ void SingleNodeHAMain() {
auto durability_directory = std::filesystem::path(FLAGS_durability_directory);
database::GraphDb db;
query::InterpreterContext interpreter_context;
query::InterpreterContext interpreter_context{&db};
SessionData session_data{&db, &interpreter_context, nullptr, nullptr};
ServerContext context;

View File

@ -33,7 +33,6 @@ BoltSession::BoltSession(SessionData *data,
db_(data->db),
#endif
interpreter_(data->interpreter_context),
transaction_engine_(data->db, &interpreter_),
#ifndef MG_SINGLE_NODE_HA
auth_(data->auth),
audit_log_(data->audit_log),
@ -56,14 +55,14 @@ std::vector<std::string> BoltSession::Interpret(
PropertyValue(params_pv));
#endif
try {
auto result = transaction_engine_.Interpret(query, params_pv);
auto result = interpreter_.Interpret(query, params_pv);
#ifndef MG_SINGLE_NODE_HA
if (user_) {
const auto &permissions = user_->GetPermissions();
for (const auto &privilege : result.second) {
if (permissions.Has(glue::PrivilegeToPermission(privilege)) !=
auth::PermissionLevel::GRANT) {
transaction_engine_.Abort();
interpreter_.Abort();
throw communication::bolt::ClientError(
"You are not authorized to execute this query! Please contact "
"your database administrator.");
@ -88,7 +87,7 @@ std::map<std::string, communication::bolt::Value> BoltSession::PullAll(
#else
TypedValueResultStream stream(encoder);
#endif
const auto &summary = transaction_engine_.PullAll(&stream);
const auto &summary = interpreter_.PullAll(&stream);
std::map<std::string, communication::bolt::Value> decoded_summary;
for (const auto &kv : summary) {
#ifdef MG_SINGLE_NODE_V2
@ -117,7 +116,7 @@ std::map<std::string, communication::bolt::Value> BoltSession::PullAll(
}
}
void BoltSession::Abort() { transaction_engine_.Abort(); }
void BoltSession::Abort() { interpreter_.Abort(); }
bool BoltSession::Authenticate(const std::string &username,
const std::string &password) {
@ -169,35 +168,22 @@ void BoltSession::TypedValueResultStream::Result(
void KafkaStreamWriter(
SessionData &session_data, const std::string &query,
const std::map<std::string, communication::bolt::Value> &params) {
auto dba = session_data.db->Access();
query::DbAccessor execution_dba(&dba);
query::Interpreter interpreter(session_data.interpreter_context);
KafkaResultStream stream;
std::map<std::string, PropertyValue> params_pv;
for (const auto &kv : params)
params_pv.emplace(kv.first, glue::ToPropertyValue(kv.second));
try {
query::Interpreter interpreter{session_data.interpreter_context};
interpreter(query, &execution_dba, params_pv, false,
utils::NewDeleteResource())
.PullAll(stream);
#ifdef MG_SINGLE_NODE_V2
auto maybe_constraint_violation = dba.Commit();
if (maybe_constraint_violation.HasError()) {
const auto &constraint_violation = maybe_constraint_violation.GetError();
auto label_name = dba.LabelToName(constraint_violation.label);
auto property_name = dba.PropertyToName(constraint_violation.property);
LOG(WARNING) << fmt::format(
"[Kafka] query execution failed with an exception: "
"Unable to commit due to constraint violation on :{}({}).",
label_name, property_name);
}
#else
dba.Commit();
#endif
// NOTE: This potentially allows Kafka streams to execute transaction
// control queries. However, those will not really work as a new
// `Interpreter` instance is created upon every call to this function,
// meaning any multicommand transaction state is lost.
interpreter.Interpret(query, params_pv);
interpreter.PullAll(&stream);
} catch (const utils::BasicException &e) {
LOG(WARNING) << "[Kafka] query execution failed with an exception: "
<< e.what();
dba.Abort();
}
};

View File

@ -16,7 +16,6 @@
#include "communication/init.hpp"
#include "communication/session.hpp"
#include "query/interpreter.hpp"
#include "query/transaction_engine.hpp"
#ifdef MG_SINGLE_NODE_V2
namespace database {
@ -93,7 +92,6 @@ class BoltSession final
const storage::Storage *db_;
#endif
query::Interpreter interpreter_;
query::TransactionEngine transaction_engine_;
#ifndef MG_SINGLE_NODE_HA
auth::Auth *auth_;
std::optional<auth::User> user_;

View File

@ -838,10 +838,79 @@ Interpreter::Interpreter(InterpreterContext *interpreter_context)
CHECK(interpreter_context_) << "Interpreter context must not be NULL";
}
Interpreter::Results Interpreter::operator()(
const std::string &query_string, DbAccessor *db_accessor,
std::pair<std::vector<std::string>, std::vector<query::AuthQuery::Privilege>>
Interpreter::Interpret(const std::string &query,
const std::map<std::string, PropertyValue> &params) {
// Clear pending results.
results_ = std::nullopt;
execution_memory_.Release();
// Check the query for transaction commands.
auto query_upper = utils::Trim(utils::ToUpperCase(query));
if (query_upper == "BEGIN") {
if (in_explicit_transaction_) {
throw QueryException("Nested transactions are not supported.");
}
in_explicit_transaction_ = true;
expect_rollback_ = false;
return {};
} else if (query_upper == "COMMIT") {
if (!in_explicit_transaction_) {
throw QueryException("No current transaction to commit.");
}
if (expect_rollback_) {
throw QueryException(
"Transaction can't be committed because there was a previous "
"error. Please invoke a rollback instead.");
}
try {
Commit();
} catch (const utils::BasicException &) {
AbortCommand();
throw;
}
expect_rollback_ = false;
in_explicit_transaction_ = false;
return {};
} else if (query_upper == "ROLLBACK") {
if (!in_explicit_transaction_) {
throw QueryException("No current transaction to rollback.");
}
Abort();
expect_rollback_ = false;
in_explicit_transaction_ = false;
return {};
}
// Any other query in an explicit transaction block advances the command.
if (in_explicit_transaction_ && db_accessor_) AdvanceCommand();
// Create a DB accessor if we don't yet have one.
if (!db_accessor_) {
db_accessor_.emplace(interpreter_context_->db->Access());
execution_db_accessor_.emplace(&*db_accessor_);
}
// Clear leftover results.
results_ = std::nullopt;
execution_memory_.Release();
// Interpret the query and return the headers.
try {
results_.emplace(Prepare(query, params, &*execution_db_accessor_));
return {results_->header(), results_->privileges()};
} catch (const utils::BasicException &) {
AbortCommand();
throw;
}
}
Interpreter::Results Interpreter::Prepare(
const std::string &query_string,
const std::map<std::string, PropertyValue> &params,
bool in_explicit_transaction, utils::MemoryResource *execution_memory) {
DbAccessor *db_accessor) {
AstStorage ast_storage;
Parameters parameters;
std::map<std::string, TypedValue> summary;
@ -899,7 +968,7 @@ Interpreter::Results Interpreter::operator()(
return Results(db_accessor, parameters, plan, std::move(output_symbols),
std::move(header), std::move(summary),
parsed_query.required_privileges, execution_memory);
parsed_query.required_privileges, &execution_memory_);
}
if (utils::IsSubtype(*parsed_query.query, ExplainQuery::kType)) {
@ -970,7 +1039,7 @@ Interpreter::Results Interpreter::operator()(
return Results(db_accessor, parameters, plan, std::move(output_symbols),
std::move(header), std::move(summary),
parsed_query.required_privileges, execution_memory);
parsed_query.required_privileges, &execution_memory_);
}
if (utils::IsSubtype(*parsed_query.query, ProfileQuery::kType)) {
@ -980,7 +1049,7 @@ Interpreter::Results Interpreter::operator()(
<< "Expected stripped query to start with '" << kProfileQueryStart
<< "'";
if (in_explicit_transaction) {
if (in_explicit_transaction_) {
throw ProfileInMulticommandTxException();
}
@ -1052,7 +1121,7 @@ Interpreter::Results Interpreter::operator()(
return Results(db_accessor, parameters, plan, std::move(output_symbols),
std::move(header), std::move(summary),
parsed_query.required_privileges, execution_memory,
parsed_query.required_privileges, &execution_memory_,
/* is_profile_query */ true, /* should_abort_query */ true);
}
@ -1073,7 +1142,7 @@ Interpreter::Results Interpreter::operator()(
return Results(db_accessor, parameters, plan, std::move(output_symbols),
std::move(header), std::move(summary),
parsed_query.required_privileges, execution_memory,
parsed_query.required_privileges, &execution_memory_,
/* is_profile_query */ false,
/* should_abort_query */ false);
#else
@ -1083,7 +1152,7 @@ Interpreter::Results Interpreter::operator()(
Callback callback;
if (auto *index_query = utils::Downcast<IndexQuery>(parsed_query.query)) {
if (in_explicit_transaction) {
if (in_explicit_transaction_) {
throw IndexInMulticommandTxException();
}
// Creating an index influences computed plan costs.
@ -1103,7 +1172,7 @@ Interpreter::Results Interpreter::operator()(
"Managing user privileges is not yet supported in Memgraph HA "
"instance.");
#else
if (in_explicit_transaction) {
if (in_explicit_transaction_) {
throw UserModificationInMulticommandTxException();
}
callback = HandleAuthQuery(auth_query, interpreter_context_->auth,
@ -1115,7 +1184,7 @@ Interpreter::Results Interpreter::operator()(
throw utils::NotYetImplemented(
"Graph streams are not yet supported in Memgraph HA instance.");
#else
if (in_explicit_transaction) {
if (in_explicit_transaction_) {
throw StreamClauseInMulticommandTxException();
}
callback =
@ -1150,10 +1219,63 @@ Interpreter::Results Interpreter::operator()(
return Results(db_accessor, parameters, plan, std::move(output_symbols),
callback.header, std::move(summary),
parsed_query.required_privileges, execution_memory,
parsed_query.required_privileges, &execution_memory_,
/* is_profile_query */ false, callback.should_abort_query);
}
void Interpreter::Abort() {
results_ = std::nullopt;
execution_memory_.Release();
expect_rollback_ = false;
in_explicit_transaction_ = false;
if (!db_accessor_) return;
db_accessor_->Abort();
execution_db_accessor_ = std::nullopt;
db_accessor_ = std::nullopt;
}
void Interpreter::Commit() {
results_ = std::nullopt;
execution_memory_.Release();
if (!db_accessor_) return;
#ifdef MG_SINGLE_NODE_V2
auto maybe_constraint_violation = db_accessor_->Commit();
if (maybe_constraint_violation.HasError()) {
const auto &constraint_violation = maybe_constraint_violation.GetError();
auto label_name =
execution_db_accessor_->LabelToName(constraint_violation.label);
auto property_name =
execution_db_accessor_->PropertyToName(constraint_violation.property);
execution_db_accessor_ = std::nullopt;
db_accessor_ = std::nullopt;
throw QueryException(
"Unable to commit due to existence constraint violation on :{}({}).",
label_name, property_name);
}
#else
db_accessor_->Commit();
#endif
execution_db_accessor_ = std::nullopt;
db_accessor_ = std::nullopt;
}
void Interpreter::AdvanceCommand() {
results_ = std::nullopt;
execution_memory_.Release();
if (!db_accessor_) return;
db_accessor_->AdvanceCommand();
}
void Interpreter::AbortCommand() {
results_ = std::nullopt;
execution_memory_.Release();
if (in_explicit_transaction_) {
expect_rollback_ = true;
} else {
Abort();
}
}
std::shared_ptr<CachedPlan> Interpreter::CypherQueryToPlan(
HashType query_hash, CypherQuery *query, AstStorage ast_storage,
const Parameters &parameters, DbAccessor *db_accessor) {

View File

@ -2,6 +2,8 @@
#include <gflags/gflags.h>
#include "database/graph_db.hpp"
#include "database/graph_db_accessor.hpp"
#include "query/context.hpp"
#include "query/db_accessor.hpp"
#include "query/frontend/ast/ast.hpp"
@ -9,6 +11,8 @@
#include "query/frontend/stripped.hpp"
#include "query/interpret/frame.hpp"
#include "query/plan/operator.hpp"
#include "utils/likely.hpp"
#include "utils/memory.hpp"
#include "utils/skip_list.hpp"
#include "utils/spin_lock.hpp"
#include "utils/timer.hpp"
@ -27,6 +31,8 @@ class Streams;
namespace query {
static constexpr size_t kExecutionMemoryBlockSize = 1U * 1024U * 1024U;
// TODO: Maybe this should move to query/plan/planner.
/// Interface for accessing the root operator of a logical plan.
class LogicalPlan {
@ -104,6 +110,21 @@ struct PlanCacheEntry {
* been passed to an `Interpreter` instance.
*/
struct InterpreterContext {
#ifdef MG_SINGLE_NODE_V2
explicit InterpreterContext(storage::Storage *db)
#else
explicit InterpreterContext(database::GraphDb *db)
#endif
: db(db) {
CHECK(db) << "Storage must not be NULL";
}
#ifdef MG_SINGLE_NODE_V2
storage::Storage *db;
#else
database::GraphDb *db;
#endif
// Antlr has singleton instance that is shared between threads. It is
// protected by locks inside of antlr. Unfortunately, they are not protected
// in a very good way. Once we have antlr version without race conditions we
@ -266,16 +287,53 @@ class Interpreter {
Interpreter(Interpreter &&) = delete;
Interpreter &operator=(Interpreter &&) = delete;
virtual ~Interpreter() {}
virtual ~Interpreter() { Abort(); }
std::pair<std::vector<std::string>, std::vector<query::AuthQuery::Privilege>>
Interpret(const std::string &query,
const std::map<std::string, PropertyValue> &params);
/**
* Generates an Results object for the parameters. The resulting object
* can be Pulled with its results written to an arbitrary stream.
*/
virtual Results operator()(const std::string &query, DbAccessor *db_accessor,
const std::map<std::string, PropertyValue> &params,
bool in_explicit_transaction,
utils::MemoryResource *execution_memory);
virtual Results Prepare(const std::string &query,
const std::map<std::string, PropertyValue> &params,
DbAccessor *db_accessor);
template <typename TStream>
std::map<std::string, TypedValue> PullAll(TStream *result_stream) {
// If we don't have any results (eg. a transaction command preceeded),
// return an empty summary.
if (UNLIKELY(!results_)) return {};
// Stream all results and return the summary.
try {
results_->PullAll(*result_stream);
// Make a copy of the summary because the `Commit` call will destroy the
// `results_` object.
auto summary = results_->summary();
if (!in_explicit_transaction_) {
if (results_->ShouldAbortQuery()) {
Abort();
} else {
Commit();
}
}
return summary;
#ifdef MG_SINGLE_NODE_HA
} catch (const query::HintedAbortError &) {
AbortCommand();
throw utils::BasicException("Transaction was asked to abort.");
#endif
} catch (const utils::BasicException &) {
AbortCommand();
throw;
}
}
void Abort();
protected:
std::pair<frontend::StrippedQuery, ParsedQuery> StripAndParseQuery(
@ -299,6 +357,25 @@ class Interpreter {
private:
InterpreterContext *interpreter_context_;
#ifdef MG_SINGLE_NODE_V2
std::optional<storage::Storage::Accessor> db_accessor_;
#else
std::optional<database::GraphDbAccessor> db_accessor_;
#endif
std::optional<DbAccessor> execution_db_accessor_;
// The `query::Interpreter::Results` object MUST be destroyed before the
// `database::GraphDbAccessor` is destroyed because the `Results` object holds
// references to the `GraphDb` object and will crash the database when
// destructed if you are not careful.
std::optional<Results> results_;
bool in_explicit_transaction_{false};
bool expect_rollback_{false};
utils::MonotonicBufferResource execution_memory_{kExecutionMemoryBlockSize};
void Commit();
void AdvanceCommand();
void AbortCommand();
// high level tree -> CachedPlan
std::shared_ptr<CachedPlan> CypherQueryToPlan(HashType query_hash,
CypherQuery *query,

View File

@ -61,16 +61,12 @@ void query::Repl(database::GraphDb *db, query::Interpreter *interpreter) {
// regular cypher queries
try {
auto dba = db->Access();
ResultStreamFaker<query::TypedValue> stream;
DbAccessor execution_dba(&dba);
auto results = (*interpreter)(command, &execution_dba, {}, false,
utils::NewDeleteResource());
stream.Header(results.header());
results.PullAll(stream);
stream.Summary(results.summary());
auto [header, _] = interpreter->Interpret(command, {});
stream.Header(header);
auto summary = interpreter->PullAll(&stream);
stream.Summary(summary);
std::cout << stream;
dba.Commit();
} catch (const query::SyntaxException &e) {
std::cout << "SYNTAX EXCEPTION: " << e.what() << std::endl;
} catch (const query::LexingException &e) {

View File

@ -1,207 +0,0 @@
#pragma once
#include "database/graph_db.hpp"
#include "database/graph_db_accessor.hpp"
#include "query/exceptions.hpp"
#include "query/interpreter.hpp"
#include "utils/likely.hpp"
#include "utils/memory.hpp"
#include "utils/string.hpp"
namespace query {
static constexpr size_t kExecutionMemoryBlockSize = 1U * 1024U * 1024U;
class TransactionEngine final {
public:
#ifdef MG_SINGLE_NODE_V2
TransactionEngine(storage::Storage *db, Interpreter *interpreter)
#else
TransactionEngine(database::GraphDb *db, Interpreter *interpreter)
#endif
: db_(db),
interpreter_(interpreter),
execution_memory_(&initial_memory_block_[0],
kExecutionMemoryBlockSize) {}
~TransactionEngine() { Abort(); }
std::pair<std::vector<std::string>, std::vector<query::AuthQuery::Privilege>>
Interpret(const std::string &query,
const std::map<std::string, PropertyValue> &params) {
// Clear pending results.
results_ = std::nullopt;
execution_memory_.Release();
// Check the query for transaction commands.
auto query_upper = utils::Trim(utils::ToUpperCase(query));
if (query_upper == "BEGIN") {
if (in_explicit_transaction_) {
throw QueryException("Nested transactions are not supported.");
}
in_explicit_transaction_ = true;
expect_rollback_ = false;
return {};
} else if (query_upper == "COMMIT") {
if (!in_explicit_transaction_) {
throw QueryException("No current transaction to commit.");
}
if (expect_rollback_) {
throw QueryException(
"Transaction can't be committed because there was a previous "
"error. Please invoke a rollback instead.");
}
try {
Commit();
} catch (const utils::BasicException &) {
AbortCommand();
throw;
}
expect_rollback_ = false;
in_explicit_transaction_ = false;
return {};
} else if (query_upper == "ROLLBACK") {
if (!in_explicit_transaction_) {
throw QueryException("No current transaction to rollback.");
}
Abort();
expect_rollback_ = false;
in_explicit_transaction_ = false;
return {};
}
// Any other query in an explicit transaction block advances the command.
if (in_explicit_transaction_ && db_accessor_) AdvanceCommand();
// Create a DB accessor if we don't yet have one.
if (!db_accessor_) {
db_accessor_.emplace(db_->Access());
execution_db_accessor_.emplace(&*db_accessor_);
}
// Clear leftover results.
results_ = std::nullopt;
execution_memory_.Release();
// Interpret the query and return the headers.
try {
results_.emplace((*interpreter_)(query, &*execution_db_accessor_, params,
in_explicit_transaction_,
&execution_memory_));
return {std::move(results_->header()), std::move(results_->privileges())};
} catch (const utils::BasicException &) {
AbortCommand();
throw;
}
}
template <typename TStream>
std::map<std::string, TypedValue> PullAll(TStream *result_stream) {
// If we don't have any results (eg. a transaction command preceeded),
// return an empty summary.
if (UNLIKELY(!results_)) return {};
// Stream all results and return the summary.
try {
results_->PullAll(*result_stream);
// Make a copy of the summary because the `Commit` call will destroy the
// `results_` object.
auto summary = results_->summary();
if (!in_explicit_transaction_) {
if (results_->ShouldAbortQuery()) {
Abort();
} else {
Commit();
}
}
return summary;
#ifdef MG_SINGLE_NODE_HA
} catch (const query::HintedAbortError &) {
AbortCommand();
throw utils::BasicException("Transaction was asked to abort.");
#endif
} catch (const utils::BasicException &) {
AbortCommand();
throw;
}
}
void Abort() {
results_ = std::nullopt;
execution_memory_.Release();
expect_rollback_ = false;
in_explicit_transaction_ = false;
if (!db_accessor_) return;
db_accessor_->Abort();
execution_db_accessor_ = std::nullopt;
db_accessor_ = std::nullopt;
}
private:
#ifdef MG_SINGLE_NODE_V2
storage::Storage *db_{nullptr};
std::optional<storage::Storage::Accessor> db_accessor_;
#else
database::GraphDb *db_{nullptr};
std::optional<database::GraphDbAccessor> db_accessor_;
#endif
std::optional<DbAccessor> execution_db_accessor_;
Interpreter *interpreter_{nullptr};
// The `query::Interpreter::Results` object MUST be destroyed before the
// `database::GraphDbAccessor` is destroyed because the `Results` object holds
// references to the `GraphDb` object and will crash the database when
// destructed if you are not careful.
std::optional<query::Interpreter::Results> results_;
bool in_explicit_transaction_{false};
bool expect_rollback_{false};
uint8_t initial_memory_block_[kExecutionMemoryBlockSize];
utils::MonotonicBufferResource execution_memory_;
void Commit() {
results_ = std::nullopt;
execution_memory_.Release();
if (!db_accessor_) return;
#ifdef MG_SINGLE_NODE_V2
auto maybe_constraint_violation = db_accessor_->Commit();
if (maybe_constraint_violation.HasError()) {
const auto &constraint_violation = maybe_constraint_violation.GetError();
auto label_name = execution_db_accessor_->LabelToName(
constraint_violation.label);
auto property_name = execution_db_accessor_->PropertyToName(
constraint_violation.property);
execution_db_accessor_ = std::nullopt;
db_accessor_ = std::nullopt;
throw QueryException(
"Unable to commit due to existence constraint violation on :{}({}).",
label_name, property_name);
}
#else
db_accessor_->Commit();
#endif
execution_db_accessor_ = std::nullopt;
db_accessor_ = std::nullopt;
}
void AdvanceCommand() {
results_ = std::nullopt;
execution_memory_.Release();
if (!db_accessor_) return;
db_accessor_->AdvanceCommand();
}
void AbortCommand() {
results_ = std::nullopt;
execution_memory_.Release();
if (in_explicit_transaction_) {
expect_rollback_ = true;
} else {
Abort();
}
}
};
} // namespace query

View File

@ -13,8 +13,8 @@ class ExpansionBenchFixture : public benchmark::Fixture {
// GraphDb shouldn't be global constructed/destructed. See
// documentation in database/single_node/graph_db.hpp for details.
std::optional<database::GraphDb> db_;
query::InterpreterContext interpreter_context_;
query::Interpreter interpreter_{&interpreter_context_};
std::optional<query::InterpreterContext> interpreter_context_;
std::optional<query::Interpreter> interpreter_;
void SetUp(const benchmark::State &state) override {
db_.emplace();
@ -30,6 +30,9 @@ class ExpansionBenchFixture : public benchmark::Fixture {
dba.InsertEdge(start, dest, edge_type);
}
dba.Commit();
interpreter_context_.emplace(&*db_);
interpreter_.emplace(&*interpreter_context_);
}
void TearDown(const benchmark::State &) override {
@ -39,17 +42,16 @@ class ExpansionBenchFixture : public benchmark::Fixture {
db_ = std::nullopt;
}
auto &interpreter() { return interpreter_; }
auto &interpreter() { return *interpreter_; }
};
BENCHMARK_DEFINE_F(ExpansionBenchFixture, Match)(benchmark::State &state) {
auto query = "MATCH (s:Starting) return s";
auto dba = db_->Access();
query::DbAccessor query_dba(&dba);
while (state.KeepRunning()) {
ResultStreamFaker<query::TypedValue> results;
interpreter()(query, &query_dba, {}, false, utils::NewDeleteResource())
.PullAll(results);
interpreter().Interpret(query, {});
interpreter().PullAll(&results);
}
}
@ -60,12 +62,11 @@ BENCHMARK_REGISTER_F(ExpansionBenchFixture, Match)
BENCHMARK_DEFINE_F(ExpansionBenchFixture, Expand)(benchmark::State &state) {
auto query = "MATCH (s:Starting) WITH s MATCH (s)--(d) RETURN count(d)";
auto dba = db_->Access();
query::DbAccessor query_dba(&dba);
while (state.KeepRunning()) {
ResultStreamFaker<query::TypedValue> results;
interpreter()(query, &query_dba, {}, false, utils::NewDeleteResource())
.PullAll(results);
interpreter().Interpret(query, {});
interpreter().PullAll(&results);
}
}

View File

@ -2,7 +2,7 @@
#include "query/db_accessor.hpp"
#include "query/interpret/eval.hpp"
#include "query/transaction_engine.hpp"
#include "query/interpreter.hpp"
// The following classes are wrappers for utils::MemoryResource, so that we can
// use BENCHMARK_TEMPLATE

View File

@ -11,7 +11,6 @@
#include "query/frontend/semantic/symbol_generator.hpp"
#include "query/interpreter.hpp"
#include "query/plan/planner.hpp"
#include "query/transaction_engine.hpp"
// The following classes are wrappers for utils::MemoryResource, so that we can
// use BENCHMARK_TEMPLATE

View File

@ -38,8 +38,8 @@ void KafkaBenchmarkMain() {
audit::kBufferSizeDefault,
audit::kBufferFlushIntervalMillisDefault};
query::InterpreterContext interpreter_context;
database::GraphDb db;
query::InterpreterContext interpreter_context{&db};
SessionData session_data{&db, &interpreter_context, &auth, &audit_log};
std::atomic<int64_t> query_counter{0};

View File

@ -297,7 +297,7 @@ int main(int argc, char *argv[]) {
std::cout << "Generating graph..." << std::endl;
// fill_db;
random_generate(db, node_count, edge_count);
query::InterpreterContext interpreter_context;
query::InterpreterContext interpreter_context{&db};
query::Interpreter interpreter{&interpreter_context};
query::Repl(&db, &interpreter);
return 0;

View File

@ -11,16 +11,17 @@ int main(int argc, char *argv[]) {
std::cout << "Usage: ./single_query 'RETURN \"query here\"'" << std::endl;
exit(1);
}
database::GraphDb db;
auto dba = db.Access();
query::DbAccessor query_dba(&dba);
query::InterpreterContext interpreter_context{&db};
query::Interpreter interpreter{&interpreter_context};
ResultStreamFaker<query::TypedValue> stream;
query::InterpreterContext interpreter_context;
auto results = query::Interpreter(&interpreter_context)(
argv[1], &query_dba, {}, false, utils::NewDeleteResource());
stream.Header(results.header());
results.PullAll(stream);
stream.Summary(results.summary());
auto [header, _] = interpreter.Interpret(argv[1], {});
stream.Header(header);
auto summary = interpreter.PullAll(&stream);
stream.Summary(summary);
std::cout << stream;
return 0;
}

View File

@ -122,6 +122,9 @@ std::string DumpNext(CypherDumpGenerator *dump) {
class DatabaseEnvironment {
public:
DatabaseEnvironment()
: interpreter_context_{&db_}, interpreter_{&interpreter_context_} {}
GraphDbAccessor Access() { return db_.Access(); }
DatabaseState GetState() {
@ -177,20 +180,28 @@ class DatabaseEnvironment {
return {vertices, edges, indices, constraints};
}
/**
* Execute the given query and commit the transaction.
*
* Return the query stream.
*/
auto Execute(const std::string &query) {
ResultStreamFaker<query::TypedValue> stream;
auto [header, _] = interpreter_.Interpret(query, {});
stream.Header(header);
auto summary = interpreter_.PullAll(&stream);
stream.Summary(summary);
return stream;
}
private:
database::GraphDb db_;
query::InterpreterContext interpreter_context_;
query::Interpreter interpreter_;
};
void Execute(GraphDbAccessor *dba, const std::string &query) {
CHECK(dba);
ResultStreamFaker<query::TypedValue> results;
query::DbAccessor query_dba(dba);
query::InterpreterContext interpreter_context;
query::Interpreter (&interpreter_context)(query, &query_dba, {}, false,
utils::NewDeleteResource())
.PullAll(results);
}
VertexAccessor CreateVertex(GraphDbAccessor *dba,
const std::vector<std::string> &labels,
const std::map<std::string, PropertyValue> &props,
@ -444,8 +455,8 @@ TEST(DumpTest, IndicesKeys) {
{
auto dba = db.Access();
CreateVertex(&dba, {"Label1", "Label2"}, {{"p", PropertyValue(1)}}, false);
Execute(&dba, "CREATE INDEX ON :Label1(prop);");
Execute(&dba, "CREATE INDEX ON :Label2(prop);");
dba.BuildIndex(dba.Label("Label1"), dba.Property("prop"));
dba.BuildIndex(dba.Label("Label2"), dba.Property("prop"));
dba.Commit();
}
@ -470,11 +481,10 @@ TEST(DumpTest, UniqueConstraints) {
{
auto dba = db.Access();
CreateVertex(&dba, {"Label"}, {{"prop", PropertyValue(1)}}, false);
Execute(&dba, "CREATE CONSTRAINT ON (u:Label) ASSERT u.prop IS UNIQUE;");
dba.BuildUniqueConstraint(dba.Label("Label"), {dba.Property("prop")});
// Create one with multiple properties.
Execute(
&dba,
"CREATE CONSTRAINT ON (u:Label) ASSERT u.prop1, u.prop2 IS UNIQUE;");
dba.BuildUniqueConstraint(dba.Label("Label"),
{dba.Property("prop1"), dba.Property("prop2")});
dba.Commit();
}
@ -514,11 +524,10 @@ TEST(DumpTest, CheckStateVertexWithMultipleProperties) {
auto dba = db.Access();
query::DbAccessor query_dba(&dba);
CypherDumpGenerator dump(&query_dba);
std::string cmd;
while (!(cmd = DumpNext(&dump)).empty()) {
auto dba_dump = db_dump.Access();
Execute(&dba_dump, cmd);
dba_dump.Commit();
db_dump.Execute(cmd);
}
}
EXPECT_EQ(db.GetState(), db_dump.GetState());
@ -545,10 +554,11 @@ TEST(DumpTest, CheckStateSimpleGraph) {
CreateEdge(&dba, z, u, "Knows", {});
CreateEdge(&dba, w, z, "Knows", {{"how", PropertyValue("school")}});
CreateEdge(&dba, w, z, "Likes", {{"how", PropertyValue("very much")}});
// Create few indices
Execute(&dba, "CREATE CONSTRAINT ON (u:Person) ASSERT u.name IS UNIQUE;");
Execute(&dba, "CREATE INDEX ON :Person(id);");
Execute(&dba, "CREATE INDEX ON :Person(unexisting_property);");
dba.BuildUniqueConstraint(dba.Label("Person"), {dba.Property("name")});
dba.BuildIndex(dba.Label("Person"), dba.Property("id"));
dba.BuildIndex(dba.Label("Person"), dba.Property("unexisting_property"));
}
const auto &db_initial_state = db.GetState();
@ -557,11 +567,10 @@ TEST(DumpTest, CheckStateSimpleGraph) {
auto dba = db.Access();
query::DbAccessor query_dba(&dba);
CypherDumpGenerator dump(&query_dba);
std::string cmd;
while (!(cmd = DumpNext(&dump)).empty()) {
auto dba_dump = db_dump.Access();
Execute(&dba_dump, cmd);
dba_dump.Commit();
db_dump.Execute(cmd);
}
}
EXPECT_EQ(db.GetState(), db_dump.GetState());
@ -579,17 +588,7 @@ TEST(DumpTest, ExecuteDumpDatabase) {
}
{
auto dba = db.Access();
query::DbAccessor query_dba(&dba);
const std::string query = "DUMP DATABASE";
ResultStreamFaker<query::TypedValue> stream;
query::InterpreterContext interpreter_context;
auto results = query::Interpreter(&interpreter_context)(
query, &query_dba, {}, false, utils::NewDeleteResource());
stream.Header(results.header());
results.PullAll(stream);
stream.Summary(results.summary());
auto stream = db.Execute("DUMP DATABASE");
EXPECT_EQ(stream.GetResults().size(), 4U);
ASSERT_EQ(stream.GetHeader().size(), 1U);
EXPECT_EQ(stream.GetHeader()[0], "QUERY");

View File

@ -10,25 +10,8 @@ DECLARE_int32(query_execution_time_sec);
TEST(TransactionTimeout, TransactionTimeout) {
FLAGS_query_execution_time_sec = 3;
database::GraphDb db;
query::InterpreterContext interpreter_context;
query::Interpreter interpreter(&interpreter_context);
auto interpret = [&](auto &dba, const std::string &query) {
query::DbAccessor query_dba(&dba);
ResultStreamFaker<query::TypedValue> stream;
interpreter(query, &query_dba, {}, false, utils::NewDeleteResource())
.PullAll(stream);
};
{
auto dba = db.Access();
interpret(dba, "MATCH (n) RETURN n");
}
{
auto dba = db.Access();
std::this_thread::sleep_for(std::chrono::seconds(5));
ASSERT_THROW(interpret(dba, "MATCH (n) RETURN n"), query::HintedAbortError);
}
{
auto dba = db.Access();
interpret(dba, "MATCH (n) RETURN n");
}
auto dba = db.Access();
std::this_thread::sleep_for(std::chrono::seconds(5));
ASSERT_TRUE(dba.should_abort());
}

View File

@ -15,19 +15,23 @@
class InterpreterTest : public ::testing::Test {
protected:
database::GraphDb db_;
query::InterpreterContext interpreter_context_;
query::InterpreterContext interpreter_context_{&db_};
query::Interpreter interpreter_{&interpreter_context_};
/**
* Execute the given query and commit the transaction.
*
* Return the query stream.
*/
auto Interpret(const std::string &query,
const std::map<std::string, PropertyValue> &params = {}) {
auto dba = db_.Access();
query::DbAccessor query_dba(&dba);
ResultStreamFaker<query::TypedValue> stream;
auto results = interpreter_(query, &query_dba, params, false,
utils::NewDeleteResource());
stream.Header(results.header());
results.PullAll(stream);
stream.Summary(results.summary());
auto [header, _] = interpreter_.Interpret(query, params);
stream.Header(header);
auto summary = interpreter_.PullAll(&stream);
stream.Summary(summary);
return stream;
}
};
@ -208,21 +212,15 @@ TEST_F(InterpreterTest, Bfs) {
dba.Commit();
}
auto dba = db_.Access();
query::DbAccessor query_dba(&dba);
ResultStreamFaker<query::TypedValue> stream;
auto results = interpreter_(
auto stream = Interpret(
"MATCH (n {id: 0})-[r *bfs..5 (e, n | n.reachable and "
"e.reachable)]->(m) RETURN r",
&query_dba, {}, false, utils::NewDeleteResource());
stream.Header(results.header());
results.PullAll(stream);
stream.Summary(results.summary());
"e.reachable)]->(m) RETURN r");
ASSERT_EQ(stream.GetHeader().size(), 1U);
EXPECT_EQ(stream.GetHeader()[0], "r");
ASSERT_EQ(stream.GetResults().size(), 5 * kNumNodesPerLevel);
auto dba = db_.Access();
int expected_level = 1;
int remaining_nodes_in_level = kNumNodesPerLevel;
std::unordered_set<int64_t> matched_ids;
@ -254,43 +252,26 @@ TEST_F(InterpreterTest, Bfs) {
TEST_F(InterpreterTest, CreateIndexInMulticommandTransaction) {
ResultStreamFaker<query::TypedValue> stream;
auto dba = db_.Access();
query::DbAccessor query_dba(&dba);
ASSERT_THROW(interpreter_("CREATE INDEX ON :X(y)", &query_dba, {}, true,
utils::NewDeleteResource())
.PullAll(stream),
interpreter_.Interpret("BEGIN", {});
ASSERT_THROW(interpreter_.Interpret("CREATE INDEX ON :X(y)", {}),
query::IndexInMulticommandTxException);
interpreter_.Interpret("ROLLBACK", {});
}
// Test shortest path end to end.
TEST_F(InterpreterTest, ShortestPath) {
{
ResultStreamFaker<query::TypedValue> stream;
auto dba = db_.Access();
query::DbAccessor query_dba(&dba);
interpreter_(
"CREATE (n:A {x: 1}), (m:B {x: 2}), (l:C {x: 1}), (n)-[:r1 {w: 1 "
"}]->(m)-[:r2 {w: 2}]->(l), (n)-[:r3 {w: 4}]->(l)",
&query_dba, {}, true, utils::NewDeleteResource())
.PullAll(stream);
Interpret(
"CREATE (n:A {x: 1}), (m:B {x: 2}), (l:C {x: 1}), (n)-[:r1 {w: 1 "
"}]->(m)-[:r2 {w: 2}]->(l), (n)-[:r3 {w: 4}]->(l)");
dba.Commit();
}
ResultStreamFaker<query::TypedValue> stream;
auto dba = db_.Access();
query::DbAccessor query_dba(&dba);
auto results =
interpreter_("MATCH (n)-[e *wshortest 5 (e, n | e.w) ]->(m) return e",
&query_dba, {}, false, utils::NewDeleteResource());
stream.Header(results.header());
results.PullAll(stream);
stream.Summary(results.summary());
auto stream =
Interpret("MATCH (n)-[e *wshortest 5 (e, n | e.w) ]->(m) return e");
ASSERT_EQ(stream.GetHeader().size(), 1U);
EXPECT_EQ(stream.GetHeader()[0], "e");
ASSERT_EQ(stream.GetResults().size(), 3U);
auto dba = db_.Access();
std::vector<std::vector<std::string>> expected_results{
{"r1"}, {"r2"}, {"r1", "r2"}};
@ -316,65 +297,13 @@ TEST_F(InterpreterTest, ShortestPath) {
// NOLINTNEXTLINE(hicpp-special-member-functions)
TEST_F(InterpreterTest, UniqueConstraintTest) {
ResultStreamFaker<query::TypedValue> stream;
{
auto dba = db_.Access();
query::DbAccessor query_dba(&dba);
interpreter_("CREATE CONSTRAINT ON (n:A) ASSERT n.a, n.b IS UNIQUE;",
&query_dba, {}, true, utils::NewDeleteResource())
.PullAll(stream);
dba.Commit();
}
{
auto dba = db_.Access();
query::DbAccessor query_dba(&dba);
interpreter_("CREATE (:A{a:1, b:1})", &query_dba, {}, true,
utils::NewDeleteResource())
.PullAll(stream);
dba.Commit();
}
{
auto dba = db_.Access();
query::DbAccessor query_dba(&dba);
interpreter_("CREATE (:A{a:2, b:2})", &query_dba, {}, true,
utils::NewDeleteResource())
.PullAll(stream);
dba.Commit();
}
{
auto dba = db_.Access();
query::DbAccessor query_dba(&dba);
ASSERT_THROW(interpreter_("CREATE (:A{a:1, b:1})", &query_dba, {}, true,
utils::NewDeleteResource())
.PullAll(stream),
query::QueryRuntimeException);
dba.Commit();
}
{
auto dba = db_.Access();
query::DbAccessor query_dba(&dba);
interpreter_("MATCH (n:A{a:2, b:2}) SET n.a=1", &query_dba, {}, true,
utils::NewDeleteResource())
.PullAll(stream);
interpreter_("CREATE (:A{a:2, b:2})", &query_dba, {}, true,
utils::NewDeleteResource())
.PullAll(stream);
dba.Commit();
}
{
auto dba = db_.Access();
query::DbAccessor query_dba(&dba);
interpreter_("MATCH (n:A{a:2, b:2}) DETACH DELETE n", &query_dba, {}, true,
utils::NewDeleteResource())
.PullAll(stream);
interpreter_("CREATE (n:A{a:2, b:2})", &query_dba, {}, true,
utils::NewDeleteResource())
.PullAll(stream);
dba.Commit();
}
Interpret("CREATE CONSTRAINT ON (n:A) ASSERT n.a, n.b IS UNIQUE;");
Interpret("CREATE (:A{a:1, b:1})");
Interpret("CREATE (:A{a:2, b:2})");
ASSERT_THROW(Interpret("CREATE (:A{a:1, b:1})"),
query::QueryRuntimeException);
Interpret("MATCH (n:A{a:2, b:2}) SET n.a=1");
Interpret("CREATE (:A{a:2, b:2})");
Interpret("MATCH (n:A{a:2, b:2}) DETACH DELETE n");
Interpret("CREATE (n:A{a:2, b:2})");
}

View File

@ -17,36 +17,34 @@ DECLARE_bool(query_cost_planner);
class QueryExecution : public testing::Test {
protected:
std::optional<database::GraphDb> db_;
std::optional<database::GraphDbAccessor> dba_;
std::optional<query::InterpreterContext> interpreter_context_;
std::optional<query::Interpreter> interpreter_;
void SetUp() {
db_.emplace();
dba_.emplace(db_->Access());
interpreter_context_.emplace(&*db_);
interpreter_.emplace(&*interpreter_context_);
}
void TearDown() {
dba_ = std::nullopt;
interpreter_ = std::nullopt;
interpreter_context_ = std::nullopt;
db_ = std::nullopt;
}
/** Commits the current transaction and refreshes the dba_
* variable to hold a new accessor with a new transaction */
void Commit() {
dba_->Commit();
dba_ = db_->Access();
}
/** Executes the query and returns the results.
* Does NOT commit the transaction */
/**
* Execute the given query and commit the transaction.
*
* Return the query results.
*/
auto Execute(const std::string &query) {
query::DbAccessor query_dba(&*dba_);
ResultStreamFaker<query::TypedValue> stream;
query::InterpreterContext interpreter_context;
auto results = query::Interpreter(&interpreter_context)(
query, &query_dba, {}, false, utils::NewDeleteResource());
stream.Header(results.header());
results.PullAll(stream);
stream.Summary(results.summary());
auto [header, _] = interpreter_->Interpret(query, {});
stream.Header(header);
auto summary = interpreter_->PullAll(&stream);
stream.Summary(summary);
return stream.GetResults();
}
};
@ -58,7 +56,6 @@ TEST_F(QueryExecution, MissingOptionalIntoExpand) {
Execute(
"CREATE (a:Person {id: 1}), (b:Person "
"{id:2})-[:Has]->(:Dog)-[:Likes]->(:Food )");
Commit();
ASSERT_EQ(Execute("MATCH (n) RETURN n").size(), 4);
auto Exec = [this](bool desc, const std::string &edge_pattern) {
@ -90,7 +87,6 @@ TEST_F(QueryExecution, EdgeUniquenessInOptional) {
// due to optonal match. Since edge-uniqueness only happens in one OPTIONAL
// MATCH, we only need to check that scenario.
Execute("CREATE (), ()-[:Type]->()");
Commit();
ASSERT_EQ(Execute("MATCH (n) RETURN n").size(), 3);
EXPECT_EQ(Execute("MATCH (n) OPTIONAL MATCH (n)-[r1]->(), (n)-[r2]->() "
"RETURN n, r1, r2")