Allocate initial execution memory

Reviewers: teon.banek

Reviewed By: teon.banek

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D2316
This commit is contained in:
Matej Ferencevic 2019-08-22 14:15:45 +02:00
parent 9f4a7dcddf
commit d3b141f3ba
14 changed files with 98 additions and 58 deletions

View File

@ -126,7 +126,9 @@ void KafkaStreamWriter(
for (const auto &kv : params)
params_pv.emplace(kv.first, glue::ToPropertyValue(kv.second));
try {
(*session_data.interpreter)(query, dba, params_pv, false).PullAll(stream);
(*session_data.interpreter)(query, dba, params_pv, false,
utils::NewDeleteResource())
.PullAll(stream);
dba.Commit();
} catch (const utils::BasicException &e) {
LOG(WARNING) << "[Kafka] query execution failed with an exception: "

View File

@ -7,8 +7,6 @@
namespace query {
static constexpr size_t kExecutionMemoryBlockSize = 1U * 1024U * 1024U;
struct EvaluationContext {
/// Memory for allocations during evaluation of a *single* Pull call.
///

View File

@ -775,7 +775,7 @@ Interpreter::Interpreter() : is_tsc_available_(utils::CheckAvailableTSC()) {}
Interpreter::Results Interpreter::operator()(
const std::string &query_string, database::GraphDbAccessor &db_accessor,
const std::map<std::string, PropertyValue> &params,
bool in_explicit_transaction) {
bool in_explicit_transaction, utils::MemoryResource *execution_memory) {
AstStorage ast_storage;
Parameters parameters;
std::map<std::string, TypedValue> summary;
@ -832,7 +832,7 @@ Interpreter::Results Interpreter::operator()(
}
return Results(&db_accessor, parameters, plan, output_symbols, header,
summary, parsed_query.required_privileges);
summary, parsed_query.required_privileges, execution_memory);
}
if (utils::IsSubtype(*parsed_query.query, ExplainQuery::kType)) {
@ -902,7 +902,7 @@ Interpreter::Results Interpreter::operator()(
std::vector<std::string> header{query_plan_symbol.name()};
return Results(&db_accessor, parameters, plan, output_symbols, header,
summary, parsed_query.required_privileges);
summary, parsed_query.required_privileges, execution_memory);
}
if (utils::IsSubtype(*parsed_query.query, ProfileQuery::kType)) {
@ -983,7 +983,7 @@ Interpreter::Results Interpreter::operator()(
summary["planning_time"] = planning_time.count();
return Results(&db_accessor, parameters, plan, output_symbols, header,
summary, parsed_query.required_privileges,
summary, parsed_query.required_privileges, execution_memory,
/* is_profile_query */ true, /* should_abort_query */ true);
}
@ -1005,7 +1005,7 @@ Interpreter::Results Interpreter::operator()(
summary["planning_time"] = planning_timer.Elapsed().count();
return Results(&db_accessor, parameters, plan, output_symbols, header,
summary, parsed_query.required_privileges,
summary, parsed_query.required_privileges, execution_memory,
/* is_profile_query */ false,
/* should_abort_query */ false);
#else
@ -1079,6 +1079,7 @@ Interpreter::Results Interpreter::operator()(
return Results(&db_accessor, parameters, plan, output_symbols,
callback.header, summary, parsed_query.required_privileges,
execution_memory,
/* is_profile_query */ false, callback.should_abort_query);
}

View File

@ -88,13 +88,12 @@ class Interpreter {
std::vector<Symbol> output_symbols, std::vector<std::string> header,
std::map<std::string, TypedValue> summary,
std::vector<AuthQuery::Privilege> privileges,
utils::MemoryResource *execution_memory,
bool is_profile_query = false, bool should_abort_query = false)
: ctx_{db_accessor},
plan_(plan),
execution_memory_(std::make_unique<utils::MonotonicBufferResource>(
kExecutionMemoryBlockSize)),
cursor_(plan_->plan().MakeCursor(execution_memory_.get())),
frame_(plan_->symbol_table().max_position(), execution_memory_.get()),
cursor_(plan_->plan().MakeCursor(execution_memory)),
frame_(plan_->symbol_table().max_position(), execution_memory),
output_symbols_(output_symbols),
header_(header),
summary_(summary),
@ -185,9 +184,6 @@ class Interpreter {
private:
ExecutionContext ctx_;
std::shared_ptr<CachedPlan> plan_;
// execution_memory_ is unique_ptr, because we are passing the address to
// cursor_, and we want to preserve the pointer in case we get moved.
std::unique_ptr<utils::MonotonicBufferResource> execution_memory_;
query::plan::UniqueCursorPtr cursor_;
Frame frame_;
std::vector<Symbol> output_symbols_;
@ -217,7 +213,8 @@ class Interpreter {
virtual Results operator()(const std::string &query,
database::GraphDbAccessor &db_accessor,
const std::map<std::string, PropertyValue> &params,
bool in_explicit_transaction);
bool in_explicit_transaction,
utils::MemoryResource *execution_memory);
auth::Auth *auth_ = nullptr;
integrations::kafka::Streams *kafka_streams_ = nullptr;

View File

@ -63,7 +63,8 @@ void query::Repl(database::GraphDb *db, query::Interpreter *interpreter) {
try {
auto dba = db->Access();
ResultStreamFaker<query::TypedValue> stream;
auto results = (*interpreter)(command, dba, {}, false);
auto results =
(*interpreter)(command, dba, {}, false, utils::NewDeleteResource());
stream.Header(results.header());
results.PullAll(stream);
stream.Summary(results.summary());

View File

@ -5,14 +5,20 @@
#include "query/exceptions.hpp"
#include "query/interpreter.hpp"
#include "utils/likely.hpp"
#include "utils/memory.hpp"
#include "utils/string.hpp"
namespace query {
static constexpr size_t kExecutionMemoryBlockSize = 1U * 1024U * 1024U;
class TransactionEngine final {
public:
TransactionEngine(database::GraphDb *db, Interpreter *interpreter)
: db_(db), interpreter_(interpreter) {}
: db_(db),
interpreter_(interpreter),
execution_memory_(&initial_memory_block_[0],
kExecutionMemoryBlockSize) {}
~TransactionEngine() { Abort(); }
@ -21,6 +27,7 @@ class TransactionEngine final {
const std::map<std::string, PropertyValue> &params) {
// Clear pending results.
results_ = std::nullopt;
execution_memory_.Release();
// Check the query for transaction commands.
auto query_upper = utils::Trim(utils::ToUpperCase(query));
@ -67,10 +74,15 @@ class TransactionEngine final {
// Create a DB accessor if we don't yet have one.
if (!db_accessor_) db_accessor_.emplace(db_->Access());
// Clear leftover results.
results_ = std::nullopt;
execution_memory_.Release();
// Interpret the query and return the headers.
try {
results_.emplace((*interpreter_)(query, *db_accessor_, params,
in_explicit_transaction_));
in_explicit_transaction_,
&execution_memory_));
return {results_->header(), results_->privileges()};
} catch (const utils::BasicException &) {
AbortCommand();
@ -112,6 +124,7 @@ class TransactionEngine final {
void Abort() {
results_ = std::nullopt;
execution_memory_.Release();
expect_rollback_ = false;
in_explicit_transaction_ = false;
if (!db_accessor_) return;
@ -131,8 +144,12 @@ class TransactionEngine final {
bool in_explicit_transaction_{false};
bool expect_rollback_{false};
uint8_t initial_memory_block_[kExecutionMemoryBlockSize];
utils::MonotonicBufferResource execution_memory_;
void Commit() {
results_ = std::nullopt;
execution_memory_.Release();
if (!db_accessor_) return;
db_accessor_->Commit();
db_accessor_ = std::nullopt;
@ -140,12 +157,14 @@ class TransactionEngine final {
void AdvanceCommand() {
results_ = std::nullopt;
execution_memory_.Release();
if (!db_accessor_) return;
db_accessor_->AdvanceCommand();
}
void AbortCommand() {
results_ = std::nullopt;
execution_memory_.Release();
if (in_explicit_transaction_) {
expect_rollback_ = true;
} else {

View File

@ -46,7 +46,8 @@ BENCHMARK_DEFINE_F(ExpansionBenchFixture, Match)(benchmark::State &state) {
auto dba = db_->Access();
while (state.KeepRunning()) {
ResultStreamFaker<query::TypedValue> results;
interpreter()(query, dba, {}, false).PullAll(results);
interpreter()(query, dba, {}, false, utils::NewDeleteResource())
.PullAll(results);
}
}
@ -60,7 +61,8 @@ BENCHMARK_DEFINE_F(ExpansionBenchFixture, Expand)(benchmark::State &state) {
auto dba = db_->Access();
while (state.KeepRunning()) {
ResultStreamFaker<query::TypedValue> results;
interpreter()(query, dba, {}, false).PullAll(results);
interpreter()(query, dba, {}, false, utils::NewDeleteResource())
.PullAll(results);
}
}

View File

@ -1,6 +1,7 @@
#include <benchmark/benchmark.h>
#include "query/interpret/eval.hpp"
#include "query/transaction_engine.hpp"
// The following classes are wrappers for utils::MemoryResource, so that we can
// use BENCHMARK_TEMPLATE

View File

@ -11,6 +11,7 @@
#include "query/frontend/semantic/symbol_generator.hpp"
#include "query/interpreter.hpp"
#include "query/plan/planner.hpp"
#include "query/transaction_engine.hpp"
// The following classes are wrappers for utils::MemoryResource, so that we can
// use BENCHMARK_TEMPLATE

View File

@ -14,7 +14,8 @@ int main(int argc, char *argv[]) {
database::GraphDb db;
auto dba = db.Access();
ResultStreamFaker<query::TypedValue> stream;
auto results = query::Interpreter()(argv[1], dba, {}, false);
auto results =
query::Interpreter()(argv[1], dba, {}, false, utils::NewDeleteResource());
stream.Header(results.header());
results.PullAll(stream);
stream.Summary(results.summary());

View File

@ -184,7 +184,8 @@ class DatabaseEnvironment {
void Execute(GraphDbAccessor *dba, const std::string &query) {
CHECK(dba);
ResultStreamFaker<query::TypedValue> results;
query::Interpreter()(query, *dba, {}, false).PullAll(results);
query::Interpreter()(query, *dba, {}, false, utils::NewDeleteResource())
.PullAll(results);
}
VertexAccessor CreateVertex(GraphDbAccessor *dba,
@ -560,7 +561,8 @@ TEST(DumpTest, ExecuteDumpDatabase) {
auto dba = db.Access();
const std::string query = "DUMP DATABASE";
ResultStreamFaker<query::TypedValue> stream;
auto results = query::Interpreter()(query, dba, {}, false);
auto results =
query::Interpreter()(query, dba, {}, false, utils::NewDeleteResource());
stream.Header(results.header());
results.PullAll(stream);

View File

@ -13,7 +13,8 @@ TEST(TransactionTimeout, TransactionTimeout) {
query::Interpreter interpreter;
auto interpret = [&](auto &dba, const std::string &query) {
ResultStreamFaker<query::TypedValue> stream;
interpreter(query, dba, {}, false).PullAll(stream);
interpreter(query, dba, {}, false, utils::NewDeleteResource())
.PullAll(stream);
};
{
auto dba = db.Access();

View File

@ -21,7 +21,8 @@ class InterpreterTest : public ::testing::Test {
const std::map<std::string, PropertyValue> &params = {}) {
auto dba = db_.Access();
ResultStreamFaker<query::TypedValue> stream;
auto results = interpreter_(query, dba, params, false);
auto results =
interpreter_(query, dba, params, false, utils::NewDeleteResource());
stream.Header(results.header());
results.PullAll(stream);
stream.Summary(results.summary());
@ -203,7 +204,7 @@ TEST_F(InterpreterTest, Bfs) {
auto results = interpreter_(
"MATCH (n {id: 0})-[r *bfs..5 (e, n | n.reachable and "
"e.reachable)]->(m) RETURN r",
dba, {}, false);
dba, {}, false, utils::NewDeleteResource());
stream.Header(results.header());
results.PullAll(stream);
stream.Summary(results.summary());
@ -246,9 +247,10 @@ TEST_F(InterpreterTest, Bfs) {
TEST_F(InterpreterTest, CreateIndexInMulticommandTransaction) {
ResultStreamFaker<query::TypedValue> stream;
auto dba = db_.Access();
ASSERT_THROW(
interpreter_("CREATE INDEX ON :X(y)", dba, {}, true).PullAll(stream),
query::IndexInMulticommandTxException);
ASSERT_THROW(interpreter_("CREATE INDEX ON :X(y)", dba, {}, true,
utils::NewDeleteResource())
.PullAll(stream),
query::IndexInMulticommandTxException);
}
// Test shortest path end to end.
@ -259,7 +261,7 @@ TEST_F(InterpreterTest, ShortestPath) {
interpreter_(
"CREATE (n:A {x: 1}), (m:B {x: 2}), (l:C {x: 1}), (n)-[:r1 {w: 1 "
"}]->(m)-[:r2 {w: 2}]->(l), (n)-[:r3 {w: 4}]->(l)",
dba, {}, true)
dba, {}, true, utils::NewDeleteResource())
.PullAll(stream);
dba.Commit();
@ -269,7 +271,7 @@ TEST_F(InterpreterTest, ShortestPath) {
auto dba = db_.Access();
auto results =
interpreter_("MATCH (n)-[e *wshortest 5 (e, n | e.w) ]->(m) return e",
dba, {}, false);
dba, {}, false, utils::NewDeleteResource());
stream.Header(results.header());
results.PullAll(stream);
stream.Summary(results.summary());
@ -307,44 +309,55 @@ TEST_F(InterpreterTest, UniqueConstraintTest) {
{
auto dba = db_.Access();
interpreter_("CREATE CONSTRAINT ON (n:A) ASSERT n.a, n.b IS UNIQUE;", dba,
{}, true)
{}, true, utils::NewDeleteResource())
.PullAll(stream);
dba.Commit();
}
{
auto dba = db_.Access();
interpreter_("CREATE (:A{a:1, b:1})", dba, {}, true).PullAll(stream);
dba.Commit();
}
{
auto dba = db_.Access();
interpreter_("CREATE (:A{a:2, b:2})", dba, {}, true).PullAll(stream);
dba.Commit();
}
{
auto dba = db_.Access();
ASSERT_THROW(
interpreter_("CREATE (:A{a:1, b:1})", dba, {}, true).PullAll(stream),
query::QueryRuntimeException);
dba.Commit();
}
{
auto dba = db_.Access();
interpreter_("MATCH (n:A{a:2, b:2}) SET n.a=1", dba, {}, true)
interpreter_("CREATE (:A{a:1, b:1})", dba, {}, true,
utils::NewDeleteResource())
.PullAll(stream);
interpreter_("CREATE (:A{a:2, b:2})", dba, {}, true).PullAll(stream);
dba.Commit();
}
{
auto dba = db_.Access();
interpreter_("MATCH (n:A{a:2, b:2}) DETACH DELETE n", dba, {}, true)
interpreter_("CREATE (:A{a:2, b:2})", dba, {}, true,
utils::NewDeleteResource())
.PullAll(stream);
dba.Commit();
}
{
auto dba = db_.Access();
ASSERT_THROW(interpreter_("CREATE (:A{a:1, b:1})", dba, {}, true,
utils::NewDeleteResource())
.PullAll(stream),
query::QueryRuntimeException);
dba.Commit();
}
{
auto dba = db_.Access();
interpreter_("MATCH (n:A{a:2, b:2}) SET n.a=1", dba, {}, true,
utils::NewDeleteResource())
.PullAll(stream);
interpreter_("CREATE (:A{a:2, b:2})", dba, {}, true,
utils::NewDeleteResource())
.PullAll(stream);
dba.Commit();
}
{
auto dba = db_.Access();
interpreter_("MATCH (n:A{a:2, b:2}) DETACH DELETE n", dba, {}, true,
utils::NewDeleteResource())
.PullAll(stream);
interpreter_("CREATE (n:A{a:2, b:2})", dba, {}, true,
utils::NewDeleteResource())
.PullAll(stream);
interpreter_("CREATE (n:A{a:2, b:2})", dba, {}, true).PullAll(stream);
dba.Commit();
}
}

View File

@ -40,7 +40,8 @@ class QueryExecution : public testing::Test {
* Does NOT commit the transaction */
auto Execute(const std::string &query) {
ResultStreamFaker<query::TypedValue> stream;
auto results = query::Interpreter()(query, *dba_, {}, false);
auto results = query::Interpreter()(query, *dba_, {}, false,
utils::NewDeleteResource());
stream.Header(results.header());
results.PullAll(stream);
stream.Summary(results.summary());