Handle index creation correctly

Reviewers: teon.banek, buda

Reviewed By: buda

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D891
This commit is contained in:
Mislav Bradac 2017-10-09 18:09:28 +02:00
parent bac99cc267
commit 31798eb957
11 changed files with 79 additions and 31 deletions

View File

@ -123,7 +123,8 @@ State HandleRun(Session &session, State state, Marker marker) {
std::map<std::string, query::TypedValue> params_tv(params_map.begin(),
params_map.end());
session.interpreter_.Interpret(query.ValueString(), *session.db_accessor_,
session.output_stream_, params_tv);
session.output_stream_, params_tv,
in_explicit_transaction);
if (!in_explicit_transaction) {
session.Commit();

View File

@ -49,6 +49,7 @@ class SkipListSuffix {
TIterator begin_;
typename SkipList<TValue>::Accessor accessor_;
};
/**
* @brief - Get all inserted vlists in TKey specific storage which
* still return true for the 'exists' function.

View File

@ -140,7 +140,7 @@ void query::Repl(Dbms &dbms) {
try {
auto dba = dbms.active();
ResultStreamFaker results;
interpeter.Interpret(command, *dba, results, {});
interpeter.Interpret(command, *dba, results, {}, false);
PrintResults(results);
dba->Commit();
} catch (const query::SyntaxException &e) {

View File

@ -9,11 +9,20 @@ namespace query {
class Context {
public:
// Since we also return some information from context (is_index_created_) we
// need to be sure that we have only one Context instance per query.
Context(const Context &) = delete;
Context &operator=(const Context &) = delete;
Context(Context &&) = default;
Context &operator=(Context &&) = default;
Context(GraphDbAccessor &db_accessor) : db_accessor_(db_accessor) {}
GraphDbAccessor &db_accessor_;
SymbolTable symbol_table_;
Parameters parameters_;
bool is_query_cached_ = false;
bool in_explicit_transaction_ = false;
bool is_index_created_ = false;
};
} // namespace query

View File

@ -69,6 +69,14 @@ class UnprovidedParameterError : public QueryException {
using QueryException::QueryException;
};
class IndexInMulticommandTxException : public QueryException {
public:
using QueryException::QueryException;
IndexInMulticommandTxException()
: QueryException(
"Index creation not allowed in multicommand transactions") {}
};
/**
* An exception for an illegal operation that can not be detected
* before the query starts executing over data.

View File

@ -60,10 +60,12 @@ class Interpreter {
template <typename Stream>
void Interpret(const std::string &query, GraphDbAccessor &db_accessor,
Stream &stream,
const std::map<std::string, TypedValue> &params) {
const std::map<std::string, TypedValue> &params,
bool in_explicit_transaction) {
utils::Timer frontend_timer;
Context ctx(db_accessor);
ctx.is_query_cached_ = true;
ctx.in_explicit_transaction_ = in_explicit_transaction;
std::map<std::string, TypedValue> summary;
// query -> stripped query
@ -146,7 +148,7 @@ class Interpreter {
// Below this point, ast_storage should not be used. Other than not allowing
// modifications, the ast_storage may have moved to a cache.
// generate frame based on symbol table max_position
// Generate frame based on symbol table max_position.
Frame frame(ctx.symbol_table_.max_position());
auto planning_time = planning_timer.Elapsed();
@ -158,7 +160,7 @@ class Interpreter {
// Since we have output symbols, this means that the query contains RETURN
// clause, so stream out the results.
// generate header
// Generate header.
for (const auto &symbol : output_symbols) {
// When the symbol is aliased or expanded from '*' (inside RETURN or
// WITH), then there is no token position, so use symbol name.
@ -169,7 +171,7 @@ class Interpreter {
}
stream.Header(header);
// stream out results
// Stream out results.
auto cursor = logical_plan->MakeCursor(db_accessor);
while (cursor->Pull(frame, ctx)) {
std::vector<TypedValue> values;
@ -195,6 +197,15 @@ class Interpreter {
}
auto execution_time = execution_timer.Elapsed();
if (ctx.is_index_created_) {
// If index is created we invalidate cache so that we can try to generate
// better plan with that cache.
auto accessor = plan_cache_.access();
for (const auto &cached_plan : accessor) {
accessor.remove(cached_plan.first);
}
}
summary["parsing_time"] = frontend_time.count();
summary["planning_time"] = planning_time.count();
summary["plan_execution_time"] = execution_time.count();

View File

@ -967,8 +967,7 @@ class ExpandBreadthFirstCursor : public query::plan::Cursor {
if (static_cast<int>(edge_list.size()) < upper_bound_)
expand_from_vertex(expansion.second);
if (static_cast<int64_t>(edge_list.size()) < lower_bound_)
continue;
if (static_cast<int64_t>(edge_list.size()) < lower_bound_) continue;
// place destination node on the frame, handle existence flag
if (self_.existing_node_) {
@ -2389,8 +2388,11 @@ class CreateIndexCursor : public Cursor {
CreateIndexCursor(const CreateIndex &self, GraphDbAccessor &db)
: self_(self), db_(db) {}
bool Pull(Frame &, Context &) override {
bool Pull(Frame &, Context &ctx) override {
if (did_create_) return false;
if (ctx.in_explicit_transaction_) {
throw IndexInMulticommandTxException();
}
try {
db_.BuildIndex(self_.label(), self_.property());
} catch (const IndexExistsException &) {
@ -2402,7 +2404,7 @@ class CreateIndexCursor : public Cursor {
"Index building already in progress on this database. Memgraph "
"does not support concurrent index building.");
}
did_create_ = true;
ctx.is_index_created_ = did_create_ = true;
return true;
}

View File

@ -43,7 +43,7 @@ BENCHMARK_DEFINE_F(ExpansionBenchFixture, Match)(benchmark::State &state) {
auto dba = dbms_->active();
while (state.KeepRunning()) {
ResultStreamFaker results;
interpeter_.Interpret(query, *dba, results, {});
interpeter_.Interpret(query, *dba, results, {}, false);
}
}
@ -57,7 +57,7 @@ BENCHMARK_DEFINE_F(ExpansionBenchFixture, Expand)(benchmark::State &state) {
auto dba = dbms_->active();
while (state.KeepRunning()) {
ResultStreamFaker results;
interpeter_.Interpret(query, *dba, results, {});
interpeter_.Interpret(query, *dba, results, {}, false);
}
}

View File

@ -15,19 +15,20 @@ TEST(TransactionTimeout, TransactionTimeout) {
{
ResultStreamFaker stream;
auto dba1 = dbms.active();
interpreter.Interpret("MATCH (n) RETURN n", *dba1, stream, {});
interpreter.Interpret("MATCH (n) RETURN n", *dba1, stream, {}, false);
}
{
ResultStreamFaker stream;
auto dba2 = dbms.active();
std::this_thread::sleep_for(std::chrono::seconds(5));
ASSERT_THROW(interpreter.Interpret("MATCH (n) RETURN n", *dba2, stream, {}),
query::HintedAbortError);
ASSERT_THROW(
interpreter.Interpret("MATCH (n) RETURN n", *dba2, stream, {}, false),
query::HintedAbortError);
}
{
ResultStreamFaker stream;
auto dba3 = dbms.active();
interpreter.Interpret("MATCH (n) RETURN n", *dba3, stream, {});
interpreter.Interpret("MATCH (n) RETURN n", *dba3, stream, {}, false);
}
}

View File

@ -6,6 +6,7 @@
#include "gmock/gmock.h"
#include "gtest/gtest.h"
#include "query/exceptions.hpp"
#include "query/exceptions.hpp"
#include "query/interpreter.hpp"
#include "query/typed_value.hpp"
#include "query_common.hpp"
@ -23,7 +24,7 @@ TEST(Interpreter, AstCache) {
{
ResultStreamFaker stream;
auto dba = dbms.active();
interpreter.Interpret("RETURN 2 + 3", *dba, stream, {});
interpreter.Interpret("RETURN 2 + 3", *dba, stream, {}, false);
ASSERT_EQ(stream.GetHeader().size(), 1U);
EXPECT_EQ(stream.GetHeader()[0], "2 + 3");
ASSERT_EQ(stream.GetResults().size(), 1U);
@ -34,7 +35,7 @@ TEST(Interpreter, AstCache) {
// Cached ast, different literals.
ResultStreamFaker stream;
auto dba = dbms.active();
interpreter.Interpret("RETURN 5 + 4", *dba, stream, {});
interpreter.Interpret("RETURN 5 + 4", *dba, stream, {}, false);
ASSERT_EQ(stream.GetResults().size(), 1U);
ASSERT_EQ(stream.GetResults()[0].size(), 1U);
ASSERT_EQ(stream.GetResults()[0][0].Value<int64_t>(), 9);
@ -43,7 +44,7 @@ TEST(Interpreter, AstCache) {
// Different ast (because of different types).
ResultStreamFaker stream;
auto dba = dbms.active();
interpreter.Interpret("RETURN 5.5 + 4", *dba, stream, {});
interpreter.Interpret("RETURN 5.5 + 4", *dba, stream, {}, false);
ASSERT_EQ(stream.GetResults().size(), 1U);
ASSERT_EQ(stream.GetResults()[0].size(), 1U);
ASSERT_EQ(stream.GetResults()[0][0].Value<double>(), 9.5);
@ -52,7 +53,7 @@ TEST(Interpreter, AstCache) {
// Cached ast, same literals.
ResultStreamFaker stream;
auto dba = dbms.active();
interpreter.Interpret("RETURN 2 + 3", *dba, stream, {});
interpreter.Interpret("RETURN 2 + 3", *dba, stream, {}, false);
ASSERT_EQ(stream.GetResults().size(), 1U);
ASSERT_EQ(stream.GetResults()[0].size(), 1U);
ASSERT_EQ(stream.GetResults()[0][0].Value<int64_t>(), 5);
@ -61,7 +62,7 @@ TEST(Interpreter, AstCache) {
// Cached ast, different literals.
ResultStreamFaker stream;
auto dba = dbms.active();
interpreter.Interpret("RETURN 10.5 + 1", *dba, stream, {});
interpreter.Interpret("RETURN 10.5 + 1", *dba, stream, {}, false);
ASSERT_EQ(stream.GetResults().size(), 1U);
ASSERT_EQ(stream.GetResults()[0].size(), 1U);
ASSERT_EQ(stream.GetResults()[0][0].Value<double>(), 11.5);
@ -70,7 +71,7 @@ TEST(Interpreter, AstCache) {
// Cached ast, same literals, different whitespaces.
ResultStreamFaker stream;
auto dba = dbms.active();
interpreter.Interpret("RETURN 10.5 + 1", *dba, stream, {});
interpreter.Interpret("RETURN 10.5 + 1", *dba, stream, {}, false);
ASSERT_EQ(stream.GetResults().size(), 1U);
ASSERT_EQ(stream.GetResults()[0].size(), 1U);
ASSERT_EQ(stream.GetResults()[0][0].Value<double>(), 11.5);
@ -79,7 +80,7 @@ TEST(Interpreter, AstCache) {
// Cached ast, same literals, different named header.
ResultStreamFaker stream;
auto dba = dbms.active();
interpreter.Interpret("RETURN 10.5+1", *dba, stream, {});
interpreter.Interpret("RETURN 10.5+1", *dba, stream, {}, false);
ASSERT_EQ(stream.GetHeader().size(), 1U);
EXPECT_EQ(stream.GetHeader()[0], "10.5+1");
ASSERT_EQ(stream.GetResults().size(), 1U);
@ -96,7 +97,7 @@ TEST(Interpreter, Parameters) {
ResultStreamFaker stream;
auto dba = dbms.active();
interpreter.Interpret("RETURN $2 + $`a b`", *dba, stream,
{{"2", 10}, {"a b", 15}});
{{"2", 10}, {"a b", 15}}, false);
ASSERT_EQ(stream.GetHeader().size(), 1U);
EXPECT_EQ(stream.GetHeader()[0], "$2 + $`a b`");
ASSERT_EQ(stream.GetResults().size(), 1U);
@ -108,7 +109,7 @@ TEST(Interpreter, Parameters) {
ResultStreamFaker stream;
auto dba = dbms.active();
interpreter.Interpret("RETURN $2 + $`a b`", *dba, stream,
{{"2", 10}, {"a b", 15}, {"c", 10}});
{{"2", 10}, {"a b", 15}, {"c", 10}}, false);
ASSERT_EQ(stream.GetHeader().size(), 1U);
EXPECT_EQ(stream.GetHeader()[0], "$2 + $`a b`");
ASSERT_EQ(stream.GetResults().size(), 1U);
@ -120,7 +121,7 @@ TEST(Interpreter, Parameters) {
ResultStreamFaker stream;
auto dba = dbms.active();
interpreter.Interpret("RETURN $2 + $`a b`", *dba, stream,
{{"2", "da"}, {"a b", "ne"}});
{{"2", "da"}, {"a b", "ne"}}, false);
ASSERT_EQ(stream.GetResults().size(), 1U);
ASSERT_EQ(stream.GetResults()[0].size(), 1U);
ASSERT_EQ(stream.GetResults()[0][0].Value<std::string>(), "dane");
@ -130,7 +131,8 @@ TEST(Interpreter, Parameters) {
ResultStreamFaker stream;
auto dba = dbms.active();
interpreter.Interpret("RETURN $2", *dba, stream,
{{"2", std::vector<query::TypedValue>{5, 2, 3}}});
{{"2", std::vector<query::TypedValue>{5, 2, 3}}},
false);
ASSERT_EQ(stream.GetResults().size(), 1U);
ASSERT_EQ(stream.GetResults()[0].size(), 1U);
auto result = query::test_common::ToList<int64_t>(
@ -142,7 +144,7 @@ TEST(Interpreter, Parameters) {
ResultStreamFaker stream;
auto dba = dbms.active();
ASSERT_THROW(interpreter.Interpret("RETURN $2 + $`a b`", *dba, stream,
{{"2", "da"}, {"ab", "ne"}}),
{{"2", "da"}, {"ab", "ne"}}, false),
query::UnprovidedParameterError);
}
}
@ -224,7 +226,7 @@ TEST(Interpreter, Bfs) {
interpreter.Interpret(
"MATCH (n {id: 0})-[r *bfs..5 (e, n | n.reachable and e.reachable)]->(m) "
"RETURN r",
*dba, stream, {});
*dba, stream, {}, false);
ASSERT_EQ(stream.GetHeader().size(), 1U);
EXPECT_EQ(stream.GetHeader()[0], "r");
@ -259,6 +261,18 @@ TEST(Interpreter, Bfs) {
}
}
}
TEST(Interpreter, CreateIndexInMulticommandTransaction) {
query::Interpreter interpreter;
Dbms dbms;
{
ResultStreamFaker stream;
auto dba = dbms.active();
ASSERT_THROW(
interpreter.Interpret("CREATE INDEX ON :X(y)", *dba, stream, {}, true),
query::IndexInMulticommandTxException);
}
}
}
int main(int argc, char **argv) {

View File

@ -26,7 +26,7 @@ class QueryExecution : public testing::Test {
* Does NOT commit the transaction */
auto Execute(const std::string &query) {
ResultStreamFaker results;
query::Interpreter().Interpret(query, *db_, results, {});
query::Interpreter().Interpret(query, *db_, results, {}, false);
return results.GetResults();
}
};
@ -47,7 +47,8 @@ TEST_F(QueryExecution, MissingOptionalIntoExpand) {
return Execute(std::string("MATCH (p:Person) WITH p ORDER BY p.id ") +
(desc ? "DESC " : "") +
"OPTIONAL MATCH (p)-->(d:Dog) WITH p, d "
"MATCH (d)" + edge_pattern +
"MATCH (d)" +
edge_pattern +
"(f:Food) "
"RETURN p, d, f")
.size();