Extract state shared between interpreters into InterpreterContext

Reviewers: teon.banek, mferencevic

Reviewed By: teon.banek

Subscribers: mferencevic, pullbot

Differential Revision: https://phabricator.memgraph.io/D2471
This commit is contained in:
Lovro Lugovic 2019-10-07 17:31:25 +02:00
parent 35180f11d3
commit 4c25719c45
14 changed files with 117 additions and 97 deletions

View File

@@ -97,8 +97,8 @@ void SingleNodeMain() {
#else
database::GraphDb db;
#endif
query::Interpreter interpreter;
SessionData session_data{&db, &interpreter, &auth, &audit_log};
query::Interpreter::InterpreterContext interpreter_context;
SessionData session_data{&db, &interpreter_context, &auth, &audit_log};
integrations::kafka::Streams kafka_streams{
durability_directory / "streams",
@@ -108,6 +108,9 @@ void SingleNodeMain() {
KafkaStreamWriter(session_data, query, params);
}};
interpreter_context.auth = &auth;
interpreter_context.kafka_streams = &kafka_streams;
try {
// Recover possible streams.
kafka_streams.Recover();
@@ -115,9 +118,6 @@ void SingleNodeMain() {
LOG(ERROR) << e.what();
}
session_data.interpreter->auth_ = &auth;
session_data.interpreter->kafka_streams_ = &kafka_streams;
ServerContext context;
std::string service_name = "Bolt";
if (FLAGS_key_file != "" && FLAGS_cert_file != "") {

View File

@@ -40,8 +40,8 @@ void SingleNodeHAMain() {
auto durability_directory = std::filesystem::path(FLAGS_durability_directory);
database::GraphDb db;
query::Interpreter interpreter;
SessionData session_data{&db, &interpreter, nullptr, nullptr};
query::Interpreter::InterpreterContext interpreter_context;
SessionData session_data{&db, &interpreter_context, nullptr, nullptr};
ServerContext context;
std::string service_name = "Bolt";
@@ -55,9 +55,7 @@ void SingleNodeHAMain() {
service_name, FLAGS_num_workers);
// Handler for regular termination signals
auto shutdown = [&db] {
db.Shutdown();
};
auto shutdown = [&db] { db.Shutdown(); };
InitSignalHandlers(shutdown);

View File

@@ -32,12 +32,14 @@ BoltSession::BoltSession(SessionData *data,
#ifdef MG_SINGLE_NODE_V2
db_(data->db),
#endif
transaction_engine_(data->db, data->interpreter),
interpreter_(data->interpreter_context),
transaction_engine_(data->db, &interpreter_),
#ifndef MG_SINGLE_NODE_HA
auth_(data->auth),
audit_log_(data->audit_log),
#endif
endpoint_(endpoint) {}
endpoint_(endpoint) {
}
using TEncoder =
communication::bolt::Session<communication::InputStream,
@@ -174,8 +176,9 @@ void KafkaStreamWriter(
for (const auto &kv : params)
params_pv.emplace(kv.first, glue::ToPropertyValue(kv.second));
try {
(*session_data.interpreter)(query, &execution_dba, params_pv, false,
utils::NewDeleteResource())
query::Interpreter interpreter{session_data.interpreter_context};
interpreter(query, &execution_dba, params_pv, false,
utils::NewDeleteResource())
.PullAll(stream);
#ifdef MG_SINGLE_NODE_V2
auto maybe_constraint_violation = dba.Commit();

View File

@@ -31,14 +31,15 @@ DECLARE_string(durability_directory);
struct SessionData {
// Explicit constructor here to ensure that pointers to all objects are
// supplied.
SessionData(database::GraphDb *_db, query::Interpreter *_interpreter,
SessionData(database::GraphDb *_db,
query::Interpreter::InterpreterContext *_interpreter_context,
auth::Auth *_auth, audit::Log *_audit_log)
: db(_db),
interpreter(_interpreter),
interpreter_context(_interpreter_context),
auth(_auth),
audit_log(_audit_log) {}
database::GraphDb *db;
query::Interpreter *interpreter;
query::Interpreter::InterpreterContext *interpreter_context;
auth::Auth *auth;
audit::Log *audit_log;
};
@@ -82,8 +83,8 @@ class BoltSession final
private:
TEncoder *encoder_;
#ifdef MG_SINGLE_NODE_V2
// NOTE: Needed only for ToBoltValue conversions
const storage::Storage *db_;
// NOTE: Needed only for ToBoltValue conversions
const storage::Storage *db_;
#endif
};
@@ -91,6 +92,7 @@ class BoltSession final
// NOTE: Needed only for ToBoltValue conversions
const storage::Storage *db_;
#endif
query::Interpreter interpreter_;
query::TransactionEngine transaction_engine_;
#ifndef MG_SINGLE_NODE_HA
auth::Auth *auth_;

View File

@@ -719,9 +719,8 @@ Callback HandleInfoQuery(InfoQuery *info_query, DbAccessor *db_accessor) {
std::vector<std::vector<TypedValue>> results(
{{TypedValue("is_leader"),
TypedValue(db_accessor->raft()->IsLeader())},
{TypedValue("term_id"),
TypedValue(static_cast<int64_t>(
db_accessor->raft()->TermId()))}});
{TypedValue("term_id"), TypedValue(static_cast<int64_t>(
db_accessor->raft()->TermId()))}});
return results;
};
// It is critical to abort this query because it can be executed on
@@ -834,7 +833,10 @@ Callback HandleConstraintQuery(ConstraintQuery *constraint_query,
return callback;
}
Interpreter::Interpreter() : is_tsc_available_(utils::CheckAvailableTSC()) {}
Interpreter::Interpreter(InterpreterContext *interpreter_context)
: interpreter_context_(interpreter_context) {
CHECK(interpreter_context_) << "Interpreter context must not be NULL";
}
Interpreter::Results Interpreter::operator()(
const std::string &query_string, DbAccessor *db_accessor,
@@ -982,7 +984,7 @@ Interpreter::Results Interpreter::operator()(
throw ProfileInMulticommandTxException();
}
if (!is_tsc_available_) {
if (!interpreter_context_->is_tsc_available) {
throw QueryException("TSC support is missing for PROFILE");
}
@@ -1085,7 +1087,8 @@ Interpreter::Results Interpreter::operator()(
throw IndexInMulticommandTxException();
}
// Creating an index influences computed plan costs.
auto invalidate_plan_cache = [plan_cache = &this->plan_cache_] {
auto invalidate_plan_cache = [plan_cache =
&this->interpreter_context_->plan_cache] {
auto access = plan_cache->access();
for (auto &kv : access) {
access.remove(kv.first);
@@ -1103,7 +1106,8 @@ Interpreter::Results Interpreter::operator()(
if (in_explicit_transaction) {
throw UserModificationInMulticommandTxException();
}
callback = HandleAuthQuery(auth_query, auth_, parameters, db_accessor);
callback = HandleAuthQuery(auth_query, interpreter_context_->auth,
parameters, db_accessor);
#endif
} else if (auto *stream_query =
utils::Downcast<StreamQuery>(parsed_query.query)) {
@@ -1114,8 +1118,9 @@ Interpreter::Results Interpreter::operator()(
if (in_explicit_transaction) {
throw StreamClauseInMulticommandTxException();
}
callback = HandleStreamQuery(stream_query, kafka_streams_, parameters,
db_accessor);
callback =
HandleStreamQuery(stream_query, interpreter_context_->kafka_streams,
parameters, db_accessor);
#endif
} else if (auto *info_query =
utils::Downcast<InfoQuery>(parsed_query.query)) {
@@ -1152,7 +1157,7 @@ Interpreter::Results Interpreter::operator()(
std::shared_ptr<Interpreter::CachedPlan> Interpreter::CypherQueryToPlan(
HashType query_hash, CypherQuery *query, AstStorage ast_storage,
const Parameters &parameters, DbAccessor *db_accessor) {
auto plan_cache_access = plan_cache_.access();
auto plan_cache_access = interpreter_context_->plan_cache.access();
auto it = plan_cache_access.find(query_hash);
if (it != plan_cache_access.end()) {
if (it->second->IsExpired()) {
@@ -1175,7 +1180,7 @@ Interpreter::ParsedQuery Interpreter::ParseQuery(
// Parse original query into antlr4 AST.
auto parser = [&] {
// Be careful about unlocking since parser can throw.
std::unique_lock<utils::SpinLock> guard(antlr_lock_);
std::unique_lock<utils::SpinLock> guard(interpreter_context_->antlr_lock);
return std::make_unique<frontend::opencypher::Parser>(original_query);
}();
// Convert antlr4 AST into Memgraph AST.
@@ -1187,13 +1192,13 @@
auto stripped_query_hash = fnv(stripped_query);
auto ast_cache_accessor = ast_cache_.access();
auto ast_cache_accessor = interpreter_context_->ast_cache.access();
auto ast_it = ast_cache_accessor.find(stripped_query_hash);
if (ast_it == ast_cache_accessor.end()) {
// Parse stripped query into antlr4 AST.
auto parser = [&] {
// Be careful about unlocking since parser can throw.
std::unique_lock<utils::SpinLock> guard(antlr_lock_);
std::unique_lock<utils::SpinLock> guard(interpreter_context_->antlr_lock);
try {
return std::make_unique<frontend::opencypher::Parser>(stripped_query);
} catch (const SyntaxException &e) {

View File

@@ -12,6 +12,7 @@
#include "utils/skip_list.hpp"
#include "utils/spin_lock.hpp"
#include "utils/timer.hpp"
#include "utils/tsc.hpp"
DECLARE_bool(query_cost_planner);
DECLARE_int32(query_plan_cache_ttl);
@@ -65,7 +66,56 @@ class Interpreter {
std::vector<AuthQuery::Privilege> required_privileges;
};
struct QueryCacheEntry {
bool operator==(const QueryCacheEntry &other) const {
return first == other.first;
}
bool operator<(const QueryCacheEntry &other) const {
return first < other.first;
}
bool operator==(const HashType &other) const { return first == other; }
bool operator<(const HashType &other) const { return first < other; }
HashType first;
// TODO: Maybe store the query string here and use it as a key with the hash
// so that we eliminate the risk of hash collisions.
CachedQuery second;
};
struct PlanCacheEntry {
bool operator==(const PlanCacheEntry &other) const {
return first == other.first;
}
bool operator<(const PlanCacheEntry &other) const {
return first < other.first;
}
bool operator==(const HashType &other) const { return first == other; }
bool operator<(const HashType &other) const { return first < other; }
HashType first;
// TODO: Maybe store the query string here and use it as a key with the hash
// so that we eliminate the risk of hash collisions.
std::shared_ptr<CachedPlan> second;
};
public:
struct InterpreterContext {
// Antlr has singleton instance that is shared between threads. It is
// protected by locks inside of antlr. Unfortunately, they are not protected
// in a very good way. Once we have antlr version without race conditions we
// can remove this lock. This will probably never happen since antlr
// developers introduce more bugs in each version. Fortunately, we have
// cache so this lock probably won't impact performance much...
utils::SpinLock antlr_lock;
bool is_tsc_available{utils::CheckAvailableTSC()};
auth::Auth *auth{nullptr};
integrations::kafka::Streams *kafka_streams{nullptr};
utils::SkipList<QueryCacheEntry> ast_cache;
utils::SkipList<PlanCacheEntry> plan_cache;
};
/**
* Wraps a `Query` that was created as a result of parsing a query string
* along with its privileges.
@@ -204,7 +254,7 @@ class Interpreter {
bool should_abort_query_;
};
Interpreter();
explicit Interpreter(InterpreterContext *interpreter_context);
Interpreter(const Interpreter &) = delete;
Interpreter &operator=(const Interpreter &) = delete;
Interpreter(Interpreter &&) = delete;
@@ -221,9 +271,6 @@
bool in_explicit_transaction,
utils::MemoryResource *execution_memory);
auth::Auth *auth_ = nullptr;
integrations::kafka::Streams *kafka_streams_ = nullptr;
protected:
std::pair<frontend::StrippedQuery, ParsedQuery> StripAndParseQuery(
const std::string &, Parameters *, AstStorage *ast_storage,
@@ -244,49 +291,7 @@
const plan::LogicalOperator *);
private:
struct QueryCacheEntry {
bool operator==(const QueryCacheEntry &other) const {
return first == other.first;
}
bool operator<(const QueryCacheEntry &other) const {
return first < other.first;
}
bool operator==(const HashType &other) const { return first == other; }
bool operator<(const HashType &other) const { return first < other; }
HashType first;
// TODO: Maybe store the query string here and use it as a key with the hash
// so that we eliminate the risk of hash collisions.
CachedQuery second;
};
struct PlanCacheEntry {
bool operator==(const PlanCacheEntry &other) const {
return first == other.first;
}
bool operator<(const PlanCacheEntry &other) const {
return first < other.first;
}
bool operator==(const HashType &other) const { return first == other; }
bool operator<(const HashType &other) const { return first < other; }
HashType first;
// TODO: Maybe store the query string here and use it as a key with the hash
// so that we eliminate the risk of hash collisions.
std::shared_ptr<CachedPlan> second;
};
utils::SkipList<QueryCacheEntry> ast_cache_;
utils::SkipList<PlanCacheEntry> plan_cache_;
// Antlr has singleton instance that is shared between threads. It is
// protected by locks inside of antlr. Unfortunately, they are not protected
// in a very good way. Once we have antlr version without race conditions we
// can remove this lock. This will probably never happen since antlr
// developers introduce more bugs in each version. Fortunately, we have cache
// so this lock probably won't impact performance much...
utils::SpinLock antlr_lock_;
bool is_tsc_available_;
InterpreterContext *interpreter_context_;
// high level tree -> CachedPlan
std::shared_ptr<CachedPlan> CypherQueryToPlan(HashType query_hash,

View File

@@ -13,7 +13,8 @@ class ExpansionBenchFixture : public benchmark::Fixture {
// GraphDb shouldn't be global constructed/destructed. See
// documentation in database/single_node/graph_db.hpp for details.
std::optional<database::GraphDb> db_;
query::Interpreter interpreter_;
query::Interpreter::InterpreterContext interpreter_context_;
query::Interpreter interpreter_{&interpreter_context_};
void SetUp(const benchmark::State &state) override {
db_.emplace();

View File

@@ -38,9 +38,9 @@ void KafkaBenchmarkMain() {
audit::kBufferSizeDefault,
audit::kBufferFlushIntervalMillisDefault};
query::Interpreter interpreter;
query::Interpreter::InterpreterContext interpreter_context;
database::GraphDb db;
SessionData session_data{&db, &interpreter, &auth, &audit_log};
SessionData session_data{&db, &interpreter_context, &auth, &audit_log};
std::atomic<int64_t> query_counter{0};
std::atomic<bool> timeout_reached{false};
@@ -55,8 +55,8 @@ void KafkaBenchmarkMain() {
query_counter++;
}};
session_data.interpreter->auth_ = &auth;
session_data.interpreter->kafka_streams_ = &kafka_streams;
interpreter_context.auth = &auth;
interpreter_context.kafka_streams = &kafka_streams;
std::string stream_name = "benchmark";

View File

@@ -297,7 +297,8 @@ int main(int argc, char *argv[]) {
std::cout << "Generating graph..." << std::endl;
// fill_db;
random_generate(db, node_count, edge_count);
query::Interpreter interpreter;
query::Interpreter::InterpreterContext interpreter_context;
query::Interpreter interpreter{&interpreter_context};
query::Repl(&db, &interpreter);
return 0;
}

View File

@@ -15,8 +15,9 @@ int main(int argc, char *argv[]) {
auto dba = db.Access();
query::DbAccessor query_dba(&dba);
ResultStreamFaker<query::TypedValue> stream;
auto results = query::Interpreter()(argv[1], &query_dba, {}, false,
utils::NewDeleteResource());
query::Interpreter::InterpreterContext interpreter_context;
auto results = query::Interpreter(&interpreter_context)(
argv[1], &query_dba, {}, false, utils::NewDeleteResource());
stream.Header(results.header());
results.PullAll(stream);
stream.Summary(results.summary());

View File

@@ -185,7 +185,9 @@ void Execute(GraphDbAccessor *dba, const std::string &query) {
CHECK(dba);
ResultStreamFaker<query::TypedValue> results;
query::DbAccessor query_dba(dba);
query::Interpreter()(query, &query_dba, {}, false, utils::NewDeleteResource())
query::Interpreter::InterpreterContext interpreter_context;
query::Interpreter (&interpreter_context)(query, &query_dba, {}, false,
utils::NewDeleteResource())
.PullAll(results);
}
@@ -581,8 +583,9 @@ TEST(DumpTest, ExecuteDumpDatabase) {
query::DbAccessor query_dba(&dba);
const std::string query = "DUMP DATABASE";
ResultStreamFaker<query::TypedValue> stream;
auto results = query::Interpreter()(query, &query_dba, {}, false,
utils::NewDeleteResource());
query::Interpreter::InterpreterContext interpreter_context;
auto results = query::Interpreter(&interpreter_context)(
query, &query_dba, {}, false, utils::NewDeleteResource());
stream.Header(results.header());
results.PullAll(stream);
stream.Summary(results.summary());

View File

@@ -10,7 +10,8 @@ DECLARE_int32(query_execution_time_sec);
TEST(TransactionTimeout, TransactionTimeout) {
FLAGS_query_execution_time_sec = 3;
database::GraphDb db;
query::Interpreter interpreter;
query::Interpreter::InterpreterContext interpreter_context;
query::Interpreter interpreter(&interpreter_context);
auto interpret = [&](auto &dba, const std::string &query) {
query::DbAccessor query_dba(&dba);
ResultStreamFaker<query::TypedValue> stream;

View File

@@ -15,7 +15,8 @@
class InterpreterTest : public ::testing::Test {
protected:
database::GraphDb db_;
query::Interpreter interpreter_;
query::Interpreter::InterpreterContext interpreter_context_;
query::Interpreter interpreter_{&interpreter_context_};
auto Interpret(const std::string &query,
const std::map<std::string, PropertyValue> &params = {}) {
@@ -232,9 +233,7 @@ TEST_F(InterpreterTest, Bfs) {
// shorter to longer ones.
EXPECT_EQ(edges.size(), expected_level);
// Check that starting node is correct.
EXPECT_EQ(
edges[0].impl_.from().PropsAt(dba.Property(kId)).ValueInt(),
0);
EXPECT_EQ(edges[0].impl_.from().PropsAt(dba.Property(kId)).ValueInt(), 0);
for (int i = 1; i < static_cast<int>(edges.size()); ++i) {
// Check that edges form a connected path.
EXPECT_EQ(edges[i - 1].To(), edges[i].From());

View File

@@ -41,8 +41,9 @@ class QueryExecution : public testing::Test {
auto Execute(const std::string &query) {
query::DbAccessor query_dba(&*dba_);
ResultStreamFaker<query::TypedValue> stream;
auto results = query::Interpreter()(query, &query_dba, {}, false,
utils::NewDeleteResource());
query::Interpreter::InterpreterContext interpreter_context;
auto results = query::Interpreter(&interpreter_context)(
query, &query_dba, {}, false, utils::NewDeleteResource());
stream.Header(results.header());
results.PullAll(stream);
stream.Summary(results.summary());