Make interpretation pullable
Reviewers: mislav.bradac, teon.banek Reviewed By: teon.banek Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D1079
This commit is contained in:
parent
caaab41108
commit
9b878c91eb
src
communication/bolt/v1/states
query
tests
benchmark
manual
qa/tck_engine/steps
unit
@ -124,9 +124,10 @@ State HandleRun(TSession &session, State state, Marker marker) {
|
||||
auto ¶ms_map = params.ValueMap();
|
||||
std::map<std::string, query::TypedValue> params_tv(params_map.begin(),
|
||||
params_map.end());
|
||||
session.interpreter_.Interpret(query.ValueString(), *session.db_accessor_,
|
||||
session.output_stream_, params_tv,
|
||||
in_explicit_transaction);
|
||||
session
|
||||
.interpreter_(query.ValueString(), *session.db_accessor_, params_tv,
|
||||
in_explicit_transaction)
|
||||
.PullAll(session.output_stream_);
|
||||
|
||||
if (!in_explicit_transaction) {
|
||||
session.Commit();
|
||||
|
@ -1,7 +1,3 @@
|
||||
//
|
||||
// Copyright 2017 Memgraph
|
||||
// Created by Florijan Stamenkovic on 23.03.17.
|
||||
//
|
||||
#include "console.hpp"
|
||||
|
||||
#include <algorithm>
|
||||
@ -69,7 +65,7 @@ void query::Repl(GraphDb &db) {
|
||||
try {
|
||||
GraphDbAccessor dba(db);
|
||||
ResultStreamFaker results;
|
||||
interpeter.Interpret(command, dba, results, {}, false);
|
||||
interpeter(command, dba, {}, false).PullAll(results);
|
||||
std::cout << results;
|
||||
dba.Commit();
|
||||
} catch (const query::SyntaxException &e) {
|
||||
|
@ -1,8 +1,3 @@
|
||||
//
|
||||
// Copyright 2017 Memgraph
|
||||
// Created by Florijan Stamenkovic on 23.03.17.
|
||||
//
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "database/graph_db.hpp"
|
||||
|
@ -3,6 +3,10 @@
|
||||
#include <glog/logging.h>
|
||||
|
||||
#include "query/exceptions.hpp"
|
||||
#include "query/exceptions.hpp"
|
||||
#include "query/frontend/ast/cypher_main_visitor.hpp"
|
||||
#include "query/frontend/opencypher/parser.hpp"
|
||||
#include "query/frontend/semantic/symbol_generator.hpp"
|
||||
#include "query/plan/planner.hpp"
|
||||
#include "query/plan/vertex_count_cache.hpp"
|
||||
#include "utils/flag_validation.hpp"
|
||||
@ -16,6 +20,124 @@ DEFINE_VALIDATED_int32(query_plan_cache_ttl, 60,
|
||||
|
||||
namespace query {
|
||||
|
||||
Interpreter::Results Interpreter::operator()(
|
||||
const std::string &query, GraphDbAccessor &db_accessor,
|
||||
const std::map<std::string, TypedValue> ¶ms,
|
||||
bool in_explicit_transaction) {
|
||||
utils::Timer frontend_timer;
|
||||
Context ctx(db_accessor);
|
||||
ctx.in_explicit_transaction_ = in_explicit_transaction;
|
||||
ctx.is_query_cached_ = true;
|
||||
|
||||
// query -> stripped query
|
||||
StrippedQuery stripped(query);
|
||||
|
||||
// Update context with provided parameters.
|
||||
ctx.parameters_ = stripped.literals();
|
||||
for (const auto ¶m_pair : stripped.parameters()) {
|
||||
auto param_it = params.find(param_pair.second);
|
||||
if (param_it == params.end()) {
|
||||
throw query::UnprovidedParameterError(
|
||||
fmt::format("Parameter$ {} not provided", param_pair.second));
|
||||
}
|
||||
ctx.parameters_.Add(param_pair.first, param_it->second);
|
||||
}
|
||||
|
||||
// Check if we have a cached logical plan ready, so that we can skip the
|
||||
// whole query -> AST -> logical_plan process.
|
||||
auto cached_plan = [&]() -> std::shared_ptr<CachedPlan> {
|
||||
auto plan_cache_accessor = plan_cache_.access();
|
||||
auto plan_cache_it = plan_cache_accessor.find(stripped.hash());
|
||||
if (plan_cache_it != plan_cache_accessor.end() &&
|
||||
plan_cache_it->second->IsExpired()) {
|
||||
// Remove the expired plan.
|
||||
plan_cache_accessor.remove(stripped.hash());
|
||||
plan_cache_it = plan_cache_accessor.end();
|
||||
}
|
||||
if (plan_cache_it != plan_cache_accessor.end()) {
|
||||
return plan_cache_it->second;
|
||||
}
|
||||
return nullptr;
|
||||
}();
|
||||
|
||||
auto frontend_time = frontend_timer.Elapsed();
|
||||
|
||||
utils::Timer planning_timer;
|
||||
|
||||
if (!cached_plan) {
|
||||
AstTreeStorage ast_storage = QueryToAst(stripped, ctx);
|
||||
SymbolGenerator symbol_generator(ctx.symbol_table_);
|
||||
ast_storage.query()->Accept(symbol_generator);
|
||||
|
||||
std::unique_ptr<plan::LogicalOperator> tmp_logical_plan;
|
||||
double query_plan_cost_estimation = 0.0;
|
||||
std::tie(tmp_logical_plan, query_plan_cost_estimation) =
|
||||
MakeLogicalPlan(ast_storage, db_accessor, ctx);
|
||||
|
||||
cached_plan = std::make_shared<CachedPlan>(
|
||||
std::move(tmp_logical_plan), query_plan_cost_estimation,
|
||||
ctx.symbol_table_, std::move(ast_storage));
|
||||
|
||||
if (FLAGS_query_plan_cache) {
|
||||
// Cache the generated plan.
|
||||
auto plan_cache_accessor = plan_cache_.access();
|
||||
auto plan_cache_it =
|
||||
plan_cache_accessor.insert(stripped.hash(), cached_plan).first;
|
||||
cached_plan = plan_cache_it->second;
|
||||
}
|
||||
}
|
||||
auto *logical_plan = &cached_plan->plan();
|
||||
// TODO review: is the check below necessary?
|
||||
DCHECK(dynamic_cast<const plan::Produce *>(logical_plan) ||
|
||||
dynamic_cast<const plan::Skip *>(logical_plan) ||
|
||||
dynamic_cast<const plan::Limit *>(logical_plan) ||
|
||||
dynamic_cast<const plan::OrderBy *>(logical_plan) ||
|
||||
dynamic_cast<const plan::Distinct *>(logical_plan) ||
|
||||
dynamic_cast<const plan::Union *>(logical_plan) ||
|
||||
dynamic_cast<const plan::CreateNode *>(logical_plan) ||
|
||||
dynamic_cast<const plan::CreateExpand *>(logical_plan) ||
|
||||
dynamic_cast<const plan::SetProperty *>(logical_plan) ||
|
||||
dynamic_cast<const plan::SetProperties *>(logical_plan) ||
|
||||
dynamic_cast<const plan::SetLabels *>(logical_plan) ||
|
||||
dynamic_cast<const plan::RemoveProperty *>(logical_plan) ||
|
||||
dynamic_cast<const plan::RemoveLabels *>(logical_plan) ||
|
||||
dynamic_cast<const plan::Delete *>(logical_plan) ||
|
||||
dynamic_cast<const plan::Merge *>(logical_plan) ||
|
||||
dynamic_cast<const plan::CreateIndex *>(logical_plan))
|
||||
<< "Unknown top level LogicalOperator";
|
||||
|
||||
ctx.symbol_table_ = cached_plan->symbol_table();
|
||||
|
||||
auto planning_time = planning_timer.Elapsed();
|
||||
|
||||
std::map<std::string, TypedValue> summary;
|
||||
summary["parsing_time"] = frontend_time.count();
|
||||
summary["planning_time"] = planning_time.count();
|
||||
summary["cost_estimate"] = cached_plan->cost();
|
||||
// TODO: set summary['type'] based on transaction metadata
|
||||
// the type can't be determined based only on top level LogicalOp
|
||||
// (for example MATCH DELETE RETURN will have Produce as it's top)
|
||||
// for now always use "rw" because something must be set, but it doesn't
|
||||
// have to be correct (for Bolt clients)
|
||||
summary["type"] = "rw";
|
||||
|
||||
auto cursor = logical_plan->MakeCursor(ctx.db_accessor_);
|
||||
std::vector<std::string> header;
|
||||
std::vector<Symbol> output_symbols(
|
||||
logical_plan->OutputSymbols(ctx.symbol_table_));
|
||||
for (const auto &symbol : output_symbols) {
|
||||
// When the symbol is aliased or expanded from '*' (inside RETURN or
|
||||
// WITH), then there is no token position, so use symbol name.
|
||||
// Otherwise, find the name from stripped query.
|
||||
header.push_back(utils::FindOr(stripped.named_expressions(),
|
||||
symbol.token_position(), symbol.name())
|
||||
.first);
|
||||
}
|
||||
|
||||
return Results(std::move(ctx), std::move(cursor), output_symbols, header,
|
||||
summary, plan_cache_);
|
||||
}
|
||||
|
||||
AstTreeStorage Interpreter::QueryToAst(const StrippedQuery &stripped,
|
||||
Context &ctx) {
|
||||
if (!ctx.is_query_cached_) {
|
||||
|
@ -9,18 +9,13 @@
|
||||
#include "data_structures/concurrent/concurrent_map.hpp"
|
||||
#include "database/graph_db_accessor.hpp"
|
||||
#include "query/context.hpp"
|
||||
#include "query/exceptions.hpp"
|
||||
#include "query/frontend/ast/cypher_main_visitor.hpp"
|
||||
#include "query/frontend/opencypher/parser.hpp"
|
||||
#include "query/frontend/semantic/symbol_generator.hpp"
|
||||
#include "query/frontend/ast/ast.hpp"
|
||||
#include "query/frontend/stripped.hpp"
|
||||
#include "query/interpret/frame.hpp"
|
||||
#include "query/plan/operator.hpp"
|
||||
#include "threading/sync/spinlock.hpp"
|
||||
#include "utils/timer.hpp"
|
||||
|
||||
DECLARE_bool(query_cost_planner);
|
||||
DECLARE_bool(query_plan_cache);
|
||||
DECLARE_int32(query_plan_cache_ttl);
|
||||
|
||||
namespace query {
|
||||
@ -55,112 +50,110 @@ class Interpreter {
|
||||
};
|
||||
|
||||
public:
|
||||
/**
|
||||
* Encapsulates all what's necessary for the interpretation of a query into a
|
||||
* single object that can be pulled (into the given Stream).
|
||||
*/
|
||||
class Results {
|
||||
friend Interpreter;
|
||||
Results(Context ctx, std::unique_ptr<query::plan::Cursor> cursor,
|
||||
std::vector<Symbol> output_symbols, std::vector<std::string> header,
|
||||
std::map<std::string, TypedValue> summary,
|
||||
ConcurrentMap<HashType, std::shared_ptr<CachedPlan>> &plan_cache)
|
||||
: ctx_(std::move(ctx)),
|
||||
cursor_(std::move(cursor)),
|
||||
frame_(ctx_.symbol_table_.max_position()),
|
||||
output_symbols_(output_symbols),
|
||||
header_(header),
|
||||
summary_(summary),
|
||||
plan_cache_(plan_cache) {}
|
||||
|
||||
public:
|
||||
Results(const Results &) = delete;
|
||||
Results(Results &&) = default;
|
||||
Results &operator=(const Results &) = delete;
|
||||
Results &operator=(Results &&) = default;
|
||||
|
||||
/**
|
||||
* Make the interpreter perform a single Pull. Results (if they exists) are
|
||||
* pushed into the given stream. On first Pull the header is written to the
|
||||
* stream, on last the summary.
|
||||
*
|
||||
* @param stream - The stream to push the header, results and summary into.
|
||||
* @return - If this Results is eligible for another Pull. If Pulling
|
||||
* after `false` has been returned, the behavior is undefined.
|
||||
* @tparam TStream - Stream type.
|
||||
*/
|
||||
template <typename TStream>
|
||||
bool Pull(TStream &stream) {
|
||||
if (!header_written_) {
|
||||
stream.Header(header_);
|
||||
header_written_ = true;
|
||||
}
|
||||
|
||||
bool return_value = cursor_->Pull(frame_, ctx_);
|
||||
|
||||
if (return_value && !output_symbols_.empty()) {
|
||||
std::vector<TypedValue> values;
|
||||
values.reserve(output_symbols_.size());
|
||||
for (const auto &symbol : output_symbols_) {
|
||||
values.emplace_back(frame_[symbol]);
|
||||
}
|
||||
stream.Result(values);
|
||||
}
|
||||
|
||||
if (!return_value) {
|
||||
auto execution_time = execution_timer_.Elapsed();
|
||||
summary_["plan_execution_time"] = execution_time.count();
|
||||
stream.Summary(summary_);
|
||||
|
||||
if (ctx_.is_index_created_) {
|
||||
// If index is created we invalidate cache so that we can try to
|
||||
// generate better plan with that cache.
|
||||
auto accessor = plan_cache_.access();
|
||||
for (const auto &cached_plan : accessor) {
|
||||
accessor.remove(cached_plan.first);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return return_value;
|
||||
}
|
||||
|
||||
/** Calls Pull() until exhausted. */
|
||||
template <typename TStream>
|
||||
void PullAll(TStream &stream) {
|
||||
while (Pull(stream)) continue;
|
||||
}
|
||||
|
||||
private:
|
||||
Context ctx_;
|
||||
std::unique_ptr<query::plan::Cursor> cursor_;
|
||||
Frame frame_;
|
||||
std::vector<Symbol> output_symbols_;
|
||||
|
||||
bool header_written_{false};
|
||||
std::vector<std::string> header_;
|
||||
std::map<std::string, TypedValue> summary_;
|
||||
|
||||
utils::Timer execution_timer_;
|
||||
// Gets invalidated after if an index has been built.
|
||||
ConcurrentMap<HashType, std::shared_ptr<CachedPlan>> &plan_cache_;
|
||||
};
|
||||
|
||||
Interpreter() = default;
|
||||
Interpreter(const Interpreter &) = delete;
|
||||
Interpreter &operator=(const Interpreter &) = delete;
|
||||
Interpreter(Interpreter &&) = delete;
|
||||
Interpreter &operator=(Interpreter &&) = delete;
|
||||
|
||||
template <typename Stream>
|
||||
void Interpret(const std::string &query, GraphDbAccessor &db_accessor,
|
||||
Stream &stream,
|
||||
const std::map<std::string, TypedValue> ¶ms,
|
||||
bool in_explicit_transaction) {
|
||||
utils::Timer frontend_timer;
|
||||
Context ctx(db_accessor);
|
||||
ctx.in_explicit_transaction_ = in_explicit_transaction;
|
||||
ctx.is_query_cached_ = true;
|
||||
std::map<std::string, TypedValue> summary;
|
||||
|
||||
// query -> stripped query
|
||||
StrippedQuery stripped(query);
|
||||
|
||||
// Update context with provided parameters.
|
||||
ctx.parameters_ = stripped.literals();
|
||||
for (const auto ¶m_pair : stripped.parameters()) {
|
||||
auto param_it = params.find(param_pair.second);
|
||||
if (param_it == params.end()) {
|
||||
throw query::UnprovidedParameterError(
|
||||
fmt::format("Parameter$ {} not provided", param_pair.second));
|
||||
}
|
||||
ctx.parameters_.Add(param_pair.first, param_it->second);
|
||||
}
|
||||
|
||||
// Check if we have a cached logical plan ready, so that we can skip the
|
||||
// whole query -> AST -> logical_plan process.
|
||||
auto cached_plan = [&]() -> std::shared_ptr<CachedPlan> {
|
||||
auto plan_cache_accessor = plan_cache_.access();
|
||||
auto plan_cache_it = plan_cache_accessor.find(stripped.hash());
|
||||
if (plan_cache_it != plan_cache_accessor.end() &&
|
||||
plan_cache_it->second->IsExpired()) {
|
||||
// Remove the expired plan.
|
||||
plan_cache_accessor.remove(stripped.hash());
|
||||
plan_cache_it = plan_cache_accessor.end();
|
||||
}
|
||||
if (plan_cache_it != plan_cache_accessor.end()) {
|
||||
return plan_cache_it->second;
|
||||
}
|
||||
return nullptr;
|
||||
}();
|
||||
|
||||
auto frontend_time = frontend_timer.Elapsed();
|
||||
|
||||
utils::Timer planning_timer;
|
||||
|
||||
if (!cached_plan) {
|
||||
AstTreeStorage ast_storage = QueryToAst(stripped, ctx);
|
||||
SymbolGenerator symbol_generator(ctx.symbol_table_);
|
||||
ast_storage.query()->Accept(symbol_generator);
|
||||
|
||||
std::unique_ptr<plan::LogicalOperator> tmp_logical_plan;
|
||||
double query_plan_cost_estimation = 0.0;
|
||||
std::tie(tmp_logical_plan, query_plan_cost_estimation) =
|
||||
MakeLogicalPlan(ast_storage, db_accessor, ctx);
|
||||
|
||||
cached_plan = std::make_shared<CachedPlan>(
|
||||
std::move(tmp_logical_plan), query_plan_cost_estimation,
|
||||
ctx.symbol_table_, std::move(ast_storage));
|
||||
|
||||
if (FLAGS_query_plan_cache) {
|
||||
// Cache the generated plan.
|
||||
auto plan_cache_accessor = plan_cache_.access();
|
||||
auto plan_cache_it =
|
||||
plan_cache_accessor.insert(stripped.hash(), cached_plan).first;
|
||||
cached_plan = plan_cache_it->second;
|
||||
}
|
||||
}
|
||||
ctx.symbol_table_ = cached_plan->symbol_table();
|
||||
|
||||
auto planning_time = planning_timer.Elapsed();
|
||||
|
||||
utils::Timer execution_timer;
|
||||
ExecutePlan(stream, &cached_plan->plan(), ctx, stripped);
|
||||
auto execution_time = execution_timer.Elapsed();
|
||||
|
||||
if (ctx.is_index_created_) {
|
||||
// If index is created we invalidate cache so that we can try to generate
|
||||
// better plan with that cache.
|
||||
auto accessor = plan_cache_.access();
|
||||
for (const auto &cached_plan : accessor) {
|
||||
accessor.remove(cached_plan.first);
|
||||
}
|
||||
}
|
||||
|
||||
summary["parsing_time"] = frontend_time.count();
|
||||
summary["planning_time"] = planning_time.count();
|
||||
summary["plan_execution_time"] = execution_time.count();
|
||||
summary["cost_estimate"] = cached_plan->cost();
|
||||
|
||||
// TODO: set summary['type'] based on transaction metadata
|
||||
// the type can't be determined based only on top level LogicalOp
|
||||
// (for example MATCH DELETE RETURN will have Produce as it's top)
|
||||
// for now always use "rw" because something must be set, but it doesn't
|
||||
// have to be correct (for Bolt clients)
|
||||
summary["type"] = "rw";
|
||||
stream.Summary(summary);
|
||||
DLOG(INFO) << "Executed '" << query << "', params: " << params
|
||||
<< ", summary: " << summary;
|
||||
}
|
||||
/**
|
||||
* Generates an Results object for the parameters. The resulting object
|
||||
* can the be Pulled with it's results written to an arbitrary stream.
|
||||
*/
|
||||
Results operator()(const std::string &query, GraphDbAccessor &db_accessor,
|
||||
const std::map<std::string, TypedValue> ¶ms,
|
||||
bool in_explicit_transaction);
|
||||
|
||||
private:
|
||||
// stripped query -> high level tree
|
||||
@ -171,59 +164,6 @@ class Interpreter {
|
||||
std::pair<std::unique_ptr<plan::LogicalOperator>, double> MakeLogicalPlan(
|
||||
AstTreeStorage &, const GraphDbAccessor &, Context &);
|
||||
|
||||
template <typename Stream>
|
||||
void ExecutePlan(Stream &stream, const plan::LogicalOperator *logical_plan,
|
||||
Context &ctx, const StrippedQuery &stripped) {
|
||||
// Generate frame based on symbol table max_position.
|
||||
Frame frame(ctx.symbol_table_.max_position());
|
||||
std::vector<std::string> header;
|
||||
std::vector<Symbol> output_symbols(
|
||||
logical_plan->OutputSymbols(ctx.symbol_table_));
|
||||
if (!output_symbols.empty()) {
|
||||
// Since we have output symbols, this means that the query contains RETURN
|
||||
// clause, so stream out the results.
|
||||
|
||||
// Generate header.
|
||||
for (const auto &symbol : output_symbols) {
|
||||
// When the symbol is aliased or expanded from '*' (inside RETURN or
|
||||
// WITH), then there is no token position, so use symbol name.
|
||||
// Otherwise, find the name from stripped query.
|
||||
header.push_back(utils::FindOr(stripped.named_expressions(),
|
||||
symbol.token_position(), symbol.name())
|
||||
.first);
|
||||
}
|
||||
stream.Header(header);
|
||||
|
||||
// Stream out results.
|
||||
auto cursor = logical_plan->MakeCursor(ctx.db_accessor_);
|
||||
while (cursor->Pull(frame, ctx)) {
|
||||
std::vector<TypedValue> values;
|
||||
for (const auto &symbol : output_symbols) {
|
||||
values.emplace_back(frame[symbol]);
|
||||
}
|
||||
stream.Result(values);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
if (dynamic_cast<const plan::CreateNode *>(logical_plan) ||
|
||||
dynamic_cast<const plan::CreateExpand *>(logical_plan) ||
|
||||
dynamic_cast<const plan::SetProperty *>(logical_plan) ||
|
||||
dynamic_cast<const plan::SetProperties *>(logical_plan) ||
|
||||
dynamic_cast<const plan::SetLabels *>(logical_plan) ||
|
||||
dynamic_cast<const plan::RemoveProperty *>(logical_plan) ||
|
||||
dynamic_cast<const plan::RemoveLabels *>(logical_plan) ||
|
||||
dynamic_cast<const plan::Delete *>(logical_plan) ||
|
||||
dynamic_cast<const plan::Merge *>(logical_plan) ||
|
||||
dynamic_cast<const plan::CreateIndex *>(logical_plan)) {
|
||||
stream.Header(header);
|
||||
auto cursor = logical_plan->MakeCursor(ctx.db_accessor_);
|
||||
while (cursor->Pull(frame, ctx)) continue;
|
||||
} else {
|
||||
throw QueryRuntimeException("Unknown top level LogicalOperator");
|
||||
}
|
||||
}
|
||||
|
||||
ConcurrentMap<HashType, AstTreeStorage> ast_cache_;
|
||||
ConcurrentMap<HashType, std::shared_ptr<CachedPlan>> plan_cache_;
|
||||
// Antlr has singleton instance that is shared between threads. It is
|
||||
|
@ -134,10 +134,11 @@ class LogicalOperator
|
||||
|
||||
/** @brief Return @c Symbol vector where the results will be stored.
|
||||
*
|
||||
* Currently, outputs symbols are only generated in @c Produce operator.
|
||||
* @c Skip, @c Limit and @c OrderBy propagate the symbols from @c Produce (if
|
||||
* it exists as input operator). In the future, we may want this method to
|
||||
* return the symbols that will be set in this operator.
|
||||
* Currently, outputs symbols are generated in @c Produce and @c Union
|
||||
* operators. @c Skip, @c Limit, @c OrderBy and @c Distinct propagate the
|
||||
* symbols from @c Produce (if it exists as input operator). In the future, we
|
||||
* may want this method to return the symbols that will be set in this
|
||||
* operator.
|
||||
*
|
||||
* @param SymbolTable used to find symbols for expressions.
|
||||
* @return std::vector<Symbol> used for results.
|
||||
|
@ -42,7 +42,7 @@ BENCHMARK_DEFINE_F(ExpansionBenchFixture, Match)(benchmark::State &state) {
|
||||
GraphDbAccessor dba(*db_);
|
||||
while (state.KeepRunning()) {
|
||||
ResultStreamFaker results;
|
||||
interpeter_.Interpret(query, dba, results, {}, false);
|
||||
interpeter_(query, dba, {}, false).PullAll(results);
|
||||
}
|
||||
}
|
||||
|
||||
@ -56,7 +56,7 @@ BENCHMARK_DEFINE_F(ExpansionBenchFixture, Expand)(benchmark::State &state) {
|
||||
GraphDbAccessor dba(*db_);
|
||||
while (state.KeepRunning()) {
|
||||
ResultStreamFaker results;
|
||||
interpeter_.Interpret(query, dba, results, {}, false);
|
||||
interpeter_(query, dba, {}, false).PullAll(results);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -14,8 +14,7 @@ int main(int argc, char *argv[]) {
|
||||
GraphDb db;
|
||||
GraphDbAccessor dba(db);
|
||||
ResultStreamFaker results;
|
||||
query::Interpreter interpeter;
|
||||
interpeter.Interpret(argv[1], dba, results, {}, false);
|
||||
query::Interpreter()(argv[1], dba, {}, false).PullAll(results);
|
||||
std::cout << results;
|
||||
return 0;
|
||||
}
|
||||
|
@ -282,7 +282,7 @@ def expected_result_step(context):
|
||||
|
||||
def check_exception(context):
|
||||
if context.exception is not None:
|
||||
context.log.info("Exception when eqecuting query!")
|
||||
context.log.info("Exception when executing query!")
|
||||
assert(False)
|
||||
|
||||
|
||||
|
@ -1,38 +1,32 @@
|
||||
#include <glog/logging.h>
|
||||
#include <gtest/gtest.h>
|
||||
|
||||
#include "communication/result_stream_faker.hpp"
|
||||
#include "query/exceptions.hpp"
|
||||
#include "query/interpreter.hpp"
|
||||
|
||||
#include <gtest/gtest.h>
|
||||
|
||||
DECLARE_int32(query_execution_time_sec);
|
||||
|
||||
TEST(TransactionTimeout, TransactionTimeout) {
|
||||
FLAGS_query_execution_time_sec = 3;
|
||||
GraphDb db;
|
||||
query::Interpreter interpreter;
|
||||
{
|
||||
auto interpret = [&](auto &dba, const std::string &query) {
|
||||
ResultStreamFaker stream;
|
||||
interpreter(query, dba, {}, false).PullAll(stream);
|
||||
|
||||
};
|
||||
{
|
||||
GraphDbAccessor dba(db);
|
||||
interpreter.Interpret("MATCH (n) RETURN n", dba, stream, {}, false);
|
||||
interpret(dba, "MATCH (n) RETURN n");
|
||||
}
|
||||
{
|
||||
ResultStreamFaker stream;
|
||||
GraphDbAccessor dba(db);
|
||||
std::this_thread::sleep_for(std::chrono::seconds(5));
|
||||
ASSERT_THROW(
|
||||
interpreter.Interpret("MATCH (n) RETURN n", dba, stream, {}, false),
|
||||
query::HintedAbortError);
|
||||
ASSERT_THROW(interpret(dba, "MATCH (n) RETURN n"), query::HintedAbortError);
|
||||
}
|
||||
{
|
||||
ResultStreamFaker stream;
|
||||
GraphDbAccessor dba(db);
|
||||
interpreter.Interpret("MATCH (n) RETURN n", dba, stream, {}, false);
|
||||
interpret(dba, "MATCH (n) RETURN n");
|
||||
}
|
||||
}
|
||||
|
||||
int main(int argc, char **argv) {
|
||||
google::InitGoogleLogging(argv[0]);
|
||||
::testing::InitGoogleTest(&argc, argv);
|
||||
return RUN_ALL_TESTS();
|
||||
}
|
||||
|
@ -12,17 +12,26 @@
|
||||
// TODO: This is not a unit test, but tests/integration dir is chaotic at the
|
||||
// moment. After tests refactoring is done, move/rename this.
|
||||
|
||||
namespace {
|
||||
class InterpreterTest : public ::testing::Test {
|
||||
protected:
|
||||
query::Interpreter interpreter_;
|
||||
GraphDb db_;
|
||||
|
||||
ResultStreamFaker Interpret(
|
||||
const std::string &query,
|
||||
const std::map<std::string, query::TypedValue> params = {}) {
|
||||
GraphDbAccessor dba(db_);
|
||||
ResultStreamFaker result;
|
||||
interpreter_(query, dba, params, false).PullAll(result);
|
||||
return result;
|
||||
}
|
||||
};
|
||||
|
||||
// Run query with different ast twice to see if query executes correctly when
|
||||
// ast is read from cache.
|
||||
TEST(Interpreter, AstCache) {
|
||||
query::Interpreter interpreter;
|
||||
GraphDb db;
|
||||
TEST_F(InterpreterTest, AstCache) {
|
||||
{
|
||||
ResultStreamFaker stream;
|
||||
GraphDbAccessor dba(db);
|
||||
interpreter.Interpret("RETURN 2 + 3", dba, stream, {}, false);
|
||||
auto stream = Interpret("RETURN 2 + 3");
|
||||
ASSERT_EQ(stream.GetHeader().size(), 1U);
|
||||
EXPECT_EQ(stream.GetHeader()[0], "2 + 3");
|
||||
ASSERT_EQ(stream.GetResults().size(), 1U);
|
||||
@ -31,54 +40,42 @@ TEST(Interpreter, AstCache) {
|
||||
}
|
||||
{
|
||||
// Cached ast, different literals.
|
||||
ResultStreamFaker stream;
|
||||
GraphDbAccessor dba(db);
|
||||
interpreter.Interpret("RETURN 5 + 4", dba, stream, {}, false);
|
||||
auto stream = Interpret("RETURN 5 + 4");
|
||||
ASSERT_EQ(stream.GetResults().size(), 1U);
|
||||
ASSERT_EQ(stream.GetResults()[0].size(), 1U);
|
||||
ASSERT_EQ(stream.GetResults()[0][0].Value<int64_t>(), 9);
|
||||
}
|
||||
{
|
||||
// Different ast (because of different types).
|
||||
ResultStreamFaker stream;
|
||||
GraphDbAccessor dba(db);
|
||||
interpreter.Interpret("RETURN 5.5 + 4", dba, stream, {}, false);
|
||||
auto stream = Interpret("RETURN 5.5 + 4");
|
||||
ASSERT_EQ(stream.GetResults().size(), 1U);
|
||||
ASSERT_EQ(stream.GetResults()[0].size(), 1U);
|
||||
ASSERT_EQ(stream.GetResults()[0][0].Value<double>(), 9.5);
|
||||
}
|
||||
{
|
||||
// Cached ast, same literals.
|
||||
ResultStreamFaker stream;
|
||||
GraphDbAccessor dba(db);
|
||||
interpreter.Interpret("RETURN 2 + 3", dba, stream, {}, false);
|
||||
auto stream = Interpret("RETURN 2 + 3");
|
||||
ASSERT_EQ(stream.GetResults().size(), 1U);
|
||||
ASSERT_EQ(stream.GetResults()[0].size(), 1U);
|
||||
ASSERT_EQ(stream.GetResults()[0][0].Value<int64_t>(), 5);
|
||||
}
|
||||
{
|
||||
// Cached ast, different literals.
|
||||
ResultStreamFaker stream;
|
||||
GraphDbAccessor dba(db);
|
||||
interpreter.Interpret("RETURN 10.5 + 1", dba, stream, {}, false);
|
||||
auto stream = Interpret("RETURN 10.5 + 1");
|
||||
ASSERT_EQ(stream.GetResults().size(), 1U);
|
||||
ASSERT_EQ(stream.GetResults()[0].size(), 1U);
|
||||
ASSERT_EQ(stream.GetResults()[0][0].Value<double>(), 11.5);
|
||||
}
|
||||
{
|
||||
// Cached ast, same literals, different whitespaces.
|
||||
ResultStreamFaker stream;
|
||||
GraphDbAccessor dba(db);
|
||||
interpreter.Interpret("RETURN 10.5 + 1", dba, stream, {}, false);
|
||||
auto stream = Interpret("RETURN 10.5 + 1");
|
||||
ASSERT_EQ(stream.GetResults().size(), 1U);
|
||||
ASSERT_EQ(stream.GetResults()[0].size(), 1U);
|
||||
ASSERT_EQ(stream.GetResults()[0][0].Value<double>(), 11.5);
|
||||
}
|
||||
{
|
||||
// Cached ast, same literals, different named header.
|
||||
ResultStreamFaker stream;
|
||||
GraphDbAccessor dba(db);
|
||||
interpreter.Interpret("RETURN 10.5+1", dba, stream, {}, false);
|
||||
auto stream = Interpret("RETURN 10.5+1");
|
||||
ASSERT_EQ(stream.GetHeader().size(), 1U);
|
||||
EXPECT_EQ(stream.GetHeader()[0], "10.5+1");
|
||||
ASSERT_EQ(stream.GetResults().size(), 1U);
|
||||
@ -88,14 +85,11 @@ TEST(Interpreter, AstCache) {
|
||||
}
|
||||
|
||||
// Run query with same ast multiple times with different parameters.
|
||||
TEST(Interpreter, Parameters) {
|
||||
TEST_F(InterpreterTest, Parameters) {
|
||||
query::Interpreter interpreter;
|
||||
GraphDb db;
|
||||
{
|
||||
ResultStreamFaker stream;
|
||||
GraphDbAccessor dba(db);
|
||||
interpreter.Interpret("RETURN $2 + $`a b`", dba, stream,
|
||||
{{"2", 10}, {"a b", 15}}, false);
|
||||
auto stream = Interpret("RETURN $2 + $`a b`", {{"2", 10}, {"a b", 15}});
|
||||
ASSERT_EQ(stream.GetHeader().size(), 1U);
|
||||
EXPECT_EQ(stream.GetHeader()[0], "$2 + $`a b`");
|
||||
ASSERT_EQ(stream.GetResults().size(), 1U);
|
||||
@ -104,10 +98,8 @@ TEST(Interpreter, Parameters) {
|
||||
}
|
||||
{
|
||||
// Not needed parameter.
|
||||
ResultStreamFaker stream;
|
||||
GraphDbAccessor dba(db);
|
||||
interpreter.Interpret("RETURN $2 + $`a b`", dba, stream,
|
||||
{{"2", 10}, {"a b", 15}, {"c", 10}}, false);
|
||||
auto stream =
|
||||
Interpret("RETURN $2 + $`a b`", {{"2", 10}, {"a b", 15}, {"c", 10}});
|
||||
ASSERT_EQ(stream.GetHeader().size(), 1U);
|
||||
EXPECT_EQ(stream.GetHeader()[0], "$2 + $`a b`");
|
||||
ASSERT_EQ(stream.GetResults().size(), 1U);
|
||||
@ -116,21 +108,15 @@ TEST(Interpreter, Parameters) {
|
||||
}
|
||||
{
|
||||
// Cached ast, different parameters.
|
||||
ResultStreamFaker stream;
|
||||
GraphDbAccessor dba(db);
|
||||
interpreter.Interpret("RETURN $2 + $`a b`", dba, stream,
|
||||
{{"2", "da"}, {"a b", "ne"}}, false);
|
||||
auto stream = Interpret("RETURN $2 + $`a b`", {{"2", "da"}, {"a b", "ne"}});
|
||||
ASSERT_EQ(stream.GetResults().size(), 1U);
|
||||
ASSERT_EQ(stream.GetResults()[0].size(), 1U);
|
||||
ASSERT_EQ(stream.GetResults()[0][0].Value<std::string>(), "dane");
|
||||
}
|
||||
{
|
||||
// Non-primitive literal.
|
||||
ResultStreamFaker stream;
|
||||
GraphDbAccessor dba(db);
|
||||
interpreter.Interpret("RETURN $2", dba, stream,
|
||||
{{"2", std::vector<query::TypedValue>{5, 2, 3}}},
|
||||
false);
|
||||
auto stream = Interpret("RETURN $2",
|
||||
{{"2", std::vector<query::TypedValue>{5, 2, 3}}});
|
||||
ASSERT_EQ(stream.GetResults().size(), 1U);
|
||||
ASSERT_EQ(stream.GetResults()[0].size(), 1U);
|
||||
auto result = query::test_common::ToList<int64_t>(
|
||||
@ -139,16 +125,13 @@ TEST(Interpreter, Parameters) {
|
||||
}
|
||||
{
|
||||
// Cached ast, unprovided parameter.
|
||||
ResultStreamFaker stream;
|
||||
GraphDbAccessor dba(db);
|
||||
ASSERT_THROW(interpreter.Interpret("RETURN $2 + $`a b`", dba, stream,
|
||||
{{"2", "da"}, {"ab", "ne"}}, false),
|
||||
ASSERT_THROW(Interpret("RETURN $2 + $`a b`", {{"2", "da"}, {"ab", "ne"}}),
|
||||
query::UnprovidedParameterError);
|
||||
}
|
||||
}
|
||||
|
||||
// Test bfs end to end.
|
||||
TEST(Interpreter, Bfs) {
|
||||
TEST_F(InterpreterTest, Bfs) {
|
||||
srand(0);
|
||||
const auto kNumLevels = 10;
|
||||
const auto kNumNodesPerLevel = 100;
|
||||
@ -158,15 +141,12 @@ TEST(Interpreter, Bfs) {
|
||||
const auto kReachable = "reachable";
|
||||
const auto kId = "id";
|
||||
|
||||
query::Interpreter interpreter;
|
||||
GraphDb db;
|
||||
ResultStreamFaker stream;
|
||||
std::vector<std::vector<VertexAccessor>> levels(kNumLevels);
|
||||
int id = 0;
|
||||
|
||||
// Set up.
|
||||
{
|
||||
GraphDbAccessor dba(db);
|
||||
GraphDbAccessor dba(db_);
|
||||
auto add_node = [&](int level, bool reachable) {
|
||||
auto node = dba.InsertVertex();
|
||||
node.PropsSet(dba.Property(kId), id++);
|
||||
@ -219,12 +199,13 @@ TEST(Interpreter, Bfs) {
|
||||
dba.Commit();
|
||||
}
|
||||
|
||||
GraphDbAccessor dba(db);
|
||||
|
||||
interpreter.Interpret(
|
||||
"MATCH (n {id: 0})-[r *bfs..5 (e, n | n.reachable and e.reachable)]->(m) "
|
||||
"RETURN r",
|
||||
dba, stream, {}, false);
|
||||
GraphDbAccessor dba(db_);
|
||||
ResultStreamFaker stream;
|
||||
interpreter_(
|
||||
"MATCH (n {id: 0})-[r *bfs..5 (e, n | n.reachable and "
|
||||
"e.reachable)]->(m) RETURN r",
|
||||
dba, {}, false)
|
||||
.PullAll(stream);
|
||||
|
||||
ASSERT_EQ(stream.GetHeader().size(), 1U);
|
||||
EXPECT_EQ(stream.GetHeader()[0], "r");
|
||||
@ -241,7 +222,9 @@ TEST(Interpreter, Bfs) {
|
||||
// shorter to longer ones.
|
||||
EXPECT_EQ(edges.size(), expected_level);
|
||||
// Check that starting node is correct.
|
||||
EXPECT_EQ(edges[0].from().PropsAt(dba.Property(kId)).Value<int64_t>(), 0);
|
||||
EXPECT_EQ(
|
||||
edges[0].from().PropsAt(dba.Property(kId)).template Value<int64_t>(),
|
||||
0);
|
||||
for (int i = 1; i < static_cast<int>(edges.size()); ++i) {
|
||||
// Check that edges form a connected path.
|
||||
EXPECT_EQ(edges[i - 1].to(), edges[i].from());
|
||||
@ -260,21 +243,10 @@ TEST(Interpreter, Bfs) {
|
||||
}
|
||||
}
|
||||
|
||||
TEST(Interpreter, CreateIndexInMulticommandTransaction) {
|
||||
query::Interpreter interpreter;
|
||||
GraphDb db;
|
||||
{
|
||||
ResultStreamFaker stream;
|
||||
GraphDbAccessor dba(db);
|
||||
ASSERT_THROW(
|
||||
interpreter.Interpret("CREATE INDEX ON :X(y)", dba, stream, {}, true),
|
||||
query::IndexInMulticommandTxException);
|
||||
}
|
||||
}
|
||||
} // namespace
|
||||
|
||||
int main(int argc, char **argv) {
|
||||
google::InitGoogleLogging(argv[0]);
|
||||
::testing::InitGoogleTest(&argc, argv);
|
||||
return RUN_ALL_TESTS();
|
||||
TEST_F(InterpreterTest, CreateIndexInMulticommandTransaction) {
|
||||
ResultStreamFaker stream;
|
||||
GraphDbAccessor dba(db_);
|
||||
ASSERT_THROW(
|
||||
interpreter_("CREATE INDEX ON :X(y)", dba, {}, true).PullAll(stream),
|
||||
query::IndexInMulticommandTxException);
|
||||
}
|
||||
|
@ -10,6 +10,8 @@
|
||||
#include "communication/result_stream_faker.hpp"
|
||||
#include "query/interpreter.hpp"
|
||||
|
||||
DECLARE_bool(query_cost_planner);
|
||||
|
||||
class QueryExecution : public testing::Test {
|
||||
protected:
|
||||
std::experimental::optional<GraphDb> db_;
|
||||
@ -36,7 +38,7 @@ class QueryExecution : public testing::Test {
|
||||
* Does NOT commit the transaction */
|
||||
auto Execute(const std::string &query) {
|
||||
ResultStreamFaker results;
|
||||
query::Interpreter().Interpret(query, *dba_, results, {}, false);
|
||||
query::Interpreter()(query, *dba_, {}, false).PullAll(results);
|
||||
return results.GetResults();
|
||||
}
|
||||
};
|
||||
|
Loading…
Reference in New Issue
Block a user