Disable LOAD CSV with config (#180)

This commit is contained in:
antonio2368 2021-06-30 11:19:13 +02:00 committed by GitHub
parent e016c74e4b
commit 715162e205
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 293 additions and 148 deletions

View File

@ -91,6 +91,10 @@ modifications:
value: "SNAPSHOT_ISOLATION" value: "SNAPSHOT_ISOLATION"
override: true override: true
- name: "allow_load_csv"
value: "true"
override: false
undocumented: undocumented:
- "flag_file" - "flag_file"
- "also_log_to_stderr" - "also_log_to_stderr"

View File

@ -133,6 +133,9 @@ DEFINE_uint64(memory_warning_threshold, 1024,
"less available RAM it will log a warning. Set to 0 to " "less available RAM it will log a warning. Set to 0 to "
"disable."); "disable.");
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
DEFINE_bool(allow_load_csv, true, "Controls whether LOAD CSV clause is allowed in queries.");
// Storage flags. // Storage flags.
DEFINE_VALIDATED_uint64(storage_gc_cycle_sec, 30, "Storage garbage collector interval (in seconds).", DEFINE_VALIDATED_uint64(storage_gc_cycle_sec, 30, "Storage garbage collector interval (in seconds).",
FLAG_IN_RANGE(1, 24 * 3600)); FLAG_IN_RANGE(1, 24 * 3600));
@ -174,7 +177,9 @@ DEFINE_VALIDATED_int32(audit_buffer_flush_interval_ms, audit::kBufferFlushInterv
#endif #endif
// Query flags. // Query flags.
DEFINE_uint64(query_execution_timeout_sec, 180,
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
DEFINE_double(query_execution_timeout_sec, 180,
"Maximum allowed query execution time. Queries exceeding this " "Maximum allowed query execution time. Queries exceeding this "
"limit will be aborted. Value of 0 means no limit."); "limit will be aborted. Value of 0 means no limit.");
@ -1057,9 +1062,10 @@ int main(int argc, char **argv) {
db_config.durability.snapshot_interval = std::chrono::seconds(FLAGS_storage_snapshot_interval_sec); db_config.durability.snapshot_interval = std::chrono::seconds(FLAGS_storage_snapshot_interval_sec);
} }
storage::Storage db(db_config); storage::Storage db(db_config);
query::InterpreterContext interpreter_context{&db, FLAGS_data_directory}; query::InterpreterContext interpreter_context{
&db,
query::SetExecutionTimeout(&interpreter_context, FLAGS_query_execution_timeout_sec); {.query = {.allow_load_csv = FLAGS_allow_load_csv}, .execution_timeout_sec = FLAGS_query_execution_timeout_sec},
FLAGS_data_directory};
#ifdef MG_ENTERPRISE #ifdef MG_ENTERPRISE
SessionData session_data{&db, &interpreter_context, &auth, &audit_log}; SessionData session_data{&db, &interpreter_context, &auth, &audit_log};
#else #else
@ -1074,8 +1080,8 @@ int main(int argc, char **argv) {
// the triggers // the triggers
auto storage_accessor = interpreter_context.db->Access(); auto storage_accessor = interpreter_context.db->Access();
auto dba = query::DbAccessor{&storage_accessor}; auto dba = query::DbAccessor{&storage_accessor};
interpreter_context.trigger_store.RestoreTriggers(&interpreter_context.ast_cache, &dba, interpreter_context.trigger_store.RestoreTriggers(
&interpreter_context.antlr_lock); &interpreter_context.ast_cache, &dba, &interpreter_context.antlr_lock, interpreter_context.config.query);
} }
#ifdef MG_ENTERPRISE #ifdef MG_ENTERPRISE

12
src/query/config.hpp Normal file
View File

@ -0,0 +1,12 @@
#pragma once
namespace query {
struct InterpreterConfig {
struct Query {
bool allow_load_csv{true};
} query;
// The default execution timeout is 3 minutes.
double execution_timeout_sec{180.0};
};
} // namespace query

View File

@ -10,7 +10,8 @@ namespace query {
CachedPlan::CachedPlan(std::unique_ptr<LogicalPlan> plan) : plan_(std::move(plan)) {} CachedPlan::CachedPlan(std::unique_ptr<LogicalPlan> plan) : plan_(std::move(plan)) {}
ParsedQuery ParseQuery(const std::string &query_string, const std::map<std::string, storage::PropertyValue> &params, ParsedQuery ParseQuery(const std::string &query_string, const std::map<std::string, storage::PropertyValue> &params,
utils::SkipList<QueryCacheEntry> *cache, utils::SpinLock *antlr_lock) { utils::SkipList<QueryCacheEntry> *cache, utils::SpinLock *antlr_lock,
const InterpreterConfig::Query &query_config) {
// Strip the query for caching purposes. The process of stripping a query // Strip the query for caching purposes. The process of stripping a query
// "normalizes" it by replacing any literals with new parameters. This // "normalizes" it by replacing any literals with new parameters. This
// results in just the *structure* of the query being taken into account for // results in just the *structure* of the query being taken into account for
@ -74,7 +75,11 @@ ParsedQuery ParseQuery(const std::string &query_string, const std::map<std::stri
visitor.visit(parser->tree()); visitor.visit(parser->tree());
if (visitor.IsCacheable()) { if (visitor.GetQueryInfo().has_load_csv && !query_config.allow_load_csv) {
throw utils::BasicException("Load CSV not allowed on this instance because it was disabled by a config.");
}
if (visitor.GetQueryInfo().is_cacheable) {
CachedQuery cached_query{std::move(ast_storage), visitor.query(), query::GetRequiredPrivileges(visitor.query())}; CachedQuery cached_query{std::move(ast_storage), visitor.query(), query::GetRequiredPrivileges(visitor.query())};
it = accessor.insert({hash, std::move(cached_query)}).first; it = accessor.insert({hash, std::move(cached_query)}).first;

View File

@ -11,6 +11,7 @@
// the compilation to fail. // the compilation to fail.
#include "query/plan/planner.hpp" #include "query/plan/planner.hpp"
////////////////////////////////////////////////////// //////////////////////////////////////////////////////
#include "query/config.hpp"
#include "query/frontend/ast/cypher_main_visitor.hpp" #include "query/frontend/ast/cypher_main_visitor.hpp"
#include "query/frontend/opencypher/parser.hpp" #include "query/frontend/opencypher/parser.hpp"
#include "query/frontend/semantic/required_privileges.hpp" #include "query/frontend/semantic/required_privileges.hpp"
@ -109,7 +110,8 @@ struct ParsedQuery {
}; };
ParsedQuery ParseQuery(const std::string &query_string, const std::map<std::string, storage::PropertyValue> &params, ParsedQuery ParseQuery(const std::string &query_string, const std::map<std::string, storage::PropertyValue> &params,
utils::SkipList<QueryCacheEntry> *cache, utils::SpinLock *antlr_lock); utils::SkipList<QueryCacheEntry> *cache, utils::SpinLock *antlr_lock,
const InterpreterConfig::Query &query_config);
class SingleNodeLogicalPlan final : public LogicalPlan { class SingleNodeLogicalPlan final : public LogicalPlan {
public: public:

View File

@ -294,6 +294,8 @@ antlrcpp::Any CypherMainVisitor::visitLockPathQuery(MemgraphCypher::LockPathQuer
} }
antlrcpp::Any CypherMainVisitor::visitLoadCsv(MemgraphCypher::LoadCsvContext *ctx) { antlrcpp::Any CypherMainVisitor::visitLoadCsv(MemgraphCypher::LoadCsvContext *ctx) {
query_info_.has_load_csv = true;
auto *load_csv = storage_->Create<LoadCsv>(); auto *load_csv = storage_->Create<LoadCsv>();
// handle file name // handle file name
if (ctx->csvFile()->literal()->StringLiteral()) { if (ctx->csvFile()->literal()->StringLiteral()) {
@ -331,6 +333,7 @@ antlrcpp::Any CypherMainVisitor::visitLoadCsv(MemgraphCypher::LoadCsvContext *ct
// handle row variable // handle row variable
load_csv->row_var_ = storage_->Create<Identifier>(ctx->rowVar()->variable()->accept(this).as<std::string>()); load_csv->row_var_ = storage_->Create<Identifier>(ctx->rowVar()->variable()->accept(this).as<std::string>());
return load_csv; return load_csv;
} }
@ -604,7 +607,7 @@ antlrcpp::Any CypherMainVisitor::visitCallProcedure(MemgraphCypher::CallProcedur
// If a user recompiles and reloads the procedure with different result // If a user recompiles and reloads the procedure with different result
// names, because of the cache, old result names will be expected while the // names, because of the cache, old result names will be expected while the
// procedure will return results mapped to new names. // procedure will return results mapped to new names.
is_cacheable_ = false; query_info_.is_cacheable = false;
auto *call_proc = storage_->Create<CallProcedure>(); auto *call_proc = storage_->Create<CallProcedure>();
MG_ASSERT(!ctx->procedureName()->symbolicName().empty()); MG_ASSERT(!ctx->procedureName()->symbolicName().empty());

View File

@ -728,7 +728,12 @@ class CypherMainVisitor : public antlropencypher::MemgraphCypherBaseVisitor {
Query *query() { return query_; } Query *query() { return query_; }
const static std::string kAnonPrefix; const static std::string kAnonPrefix;
bool IsCacheable() const { return is_cacheable_; } struct QueryInfo {
bool is_cacheable{true};
bool has_load_csv{false};
};
const auto &GetQueryInfo() const { return query_info_; }
private: private:
LabelIx AddLabel(const std::string &name); LabelIx AddLabel(const std::string &name);
@ -748,7 +753,7 @@ class CypherMainVisitor : public antlropencypher::MemgraphCypherBaseVisitor {
// return. // return.
bool in_with_ = false; bool in_with_ = false;
bool is_cacheable_ = true; QueryInfo query_info_;
}; };
} // namespace frontend } // namespace frontend
} // namespace query } // namespace query

View File

@ -513,8 +513,8 @@ PullPlan::PullPlan(const std::shared_ptr<CachedPlan> plan, const Parameters &par
ctx_.evaluation_context.parameters = parameters; ctx_.evaluation_context.parameters = parameters;
ctx_.evaluation_context.properties = NamesToProperties(plan->ast_storage().properties_, dba); ctx_.evaluation_context.properties = NamesToProperties(plan->ast_storage().properties_, dba);
ctx_.evaluation_context.labels = NamesToLabels(plan->ast_storage().labels_, dba); ctx_.evaluation_context.labels = NamesToLabels(plan->ast_storage().labels_, dba);
if (interpreter_context->execution_timeout_sec > 0) { if (interpreter_context->config.execution_timeout_sec > 0) {
ctx_.timer = utils::AsyncTimer{interpreter_context->execution_timeout_sec}; ctx_.timer = utils::AsyncTimer{interpreter_context->config.execution_timeout_sec};
} }
ctx_.is_shutting_down = &interpreter_context->is_shutting_down; ctx_.is_shutting_down = &interpreter_context->is_shutting_down;
ctx_.is_profile_query = is_profile_query; ctx_.is_profile_query = is_profile_query;
@ -602,8 +602,9 @@ std::optional<plan::ProfilingStatsWithTotalTime> PullPlan::Pull(AnyStream *strea
using RWType = plan::ReadWriteTypeChecker::RWType; using RWType = plan::ReadWriteTypeChecker::RWType;
} // namespace } // namespace
InterpreterContext::InterpreterContext(storage::Storage *db, const std::filesystem::path &data_directory) InterpreterContext::InterpreterContext(storage::Storage *db, const InterpreterConfig config,
: db(db), trigger_store(data_directory / "triggers") {} const std::filesystem::path &data_directory)
: db(db), trigger_store(data_directory / "triggers"), config(config) {}
Interpreter::Interpreter(InterpreterContext *interpreter_context) : interpreter_context_(interpreter_context) { Interpreter::Interpreter(InterpreterContext *interpreter_context) : interpreter_context_(interpreter_context) {
MG_ASSERT(interpreter_context_, "Interpreter context must not be NULL"); MG_ASSERT(interpreter_context_, "Interpreter context must not be NULL");
@ -739,7 +740,7 @@ PreparedQuery PrepareExplainQuery(ParsedQuery parsed_query, std::map<std::string
// full query string) when given just the inner query to execute. // full query string) when given just the inner query to execute.
ParsedQuery parsed_inner_query = ParsedQuery parsed_inner_query =
ParseQuery(parsed_query.query_string.substr(kExplainQueryStart.size()), parsed_query.user_parameters, ParseQuery(parsed_query.query_string.substr(kExplainQueryStart.size()), parsed_query.user_parameters,
&interpreter_context->ast_cache, &interpreter_context->antlr_lock); &interpreter_context->ast_cache, &interpreter_context->antlr_lock, interpreter_context->config.query);
auto *cypher_query = utils::Downcast<CypherQuery>(parsed_inner_query.query); auto *cypher_query = utils::Downcast<CypherQuery>(parsed_inner_query.query);
MG_ASSERT(cypher_query, "Cypher grammar should not allow other queries in EXPLAIN"); MG_ASSERT(cypher_query, "Cypher grammar should not allow other queries in EXPLAIN");
@ -806,7 +807,7 @@ PreparedQuery PrepareProfileQuery(ParsedQuery parsed_query, bool in_explicit_tra
// full query string) when given just the inner query to execute. // full query string) when given just the inner query to execute.
ParsedQuery parsed_inner_query = ParsedQuery parsed_inner_query =
ParseQuery(parsed_query.query_string.substr(kProfileQueryStart.size()), parsed_query.user_parameters, ParseQuery(parsed_query.query_string.substr(kProfileQueryStart.size()), parsed_query.user_parameters,
&interpreter_context->ast_cache, &interpreter_context->antlr_lock); &interpreter_context->ast_cache, &interpreter_context->antlr_lock, interpreter_context->config.query);
auto *cypher_query = utils::Downcast<CypherQuery>(parsed_inner_query.query); auto *cypher_query = utils::Downcast<CypherQuery>(parsed_inner_query.query);
MG_ASSERT(cypher_query, "Cypher grammar should not allow other queries in PROFILE"); MG_ASSERT(cypher_query, "Cypher grammar should not allow other queries in PROFILE");
@ -1090,7 +1091,7 @@ Callback CreateTrigger(TriggerQuery *trigger_query,
interpreter_context->trigger_store.AddTrigger( interpreter_context->trigger_store.AddTrigger(
trigger_name, trigger_statement, user_parameters, ToTriggerEventType(event_type), trigger_name, trigger_statement, user_parameters, ToTriggerEventType(event_type),
before_commit ? TriggerPhase::BEFORE_COMMIT : TriggerPhase::AFTER_COMMIT, &interpreter_context->ast_cache, before_commit ? TriggerPhase::BEFORE_COMMIT : TriggerPhase::AFTER_COMMIT, &interpreter_context->ast_cache,
dba, &interpreter_context->antlr_lock); dba, &interpreter_context->antlr_lock, interpreter_context->config.query);
return {}; return {};
}}; }};
} }
@ -1487,8 +1488,8 @@ Interpreter::PrepareResult Interpreter::Prepare(const std::string &query_string,
query_execution->summary["cost_estimate"] = 0.0; query_execution->summary["cost_estimate"] = 0.0;
utils::Timer parsing_timer; utils::Timer parsing_timer;
ParsedQuery parsed_query = ParsedQuery parsed_query = ParseQuery(query_string, params, &interpreter_context_->ast_cache,
ParseQuery(query_string, params, &interpreter_context_->ast_cache, &interpreter_context_->antlr_lock); &interpreter_context_->antlr_lock, interpreter_context_->config.query);
query_execution->summary["parsing_time"] = parsing_timer.Elapsed().count(); query_execution->summary["parsing_time"] = parsing_timer.Elapsed().count();
// Some queries require an active transaction in order to be prepared. // Some queries require an active transaction in order to be prepared.
@ -1601,7 +1602,7 @@ void RunTriggersIndividually(const utils::SkipList<Trigger> &triggers, Interpret
trigger_context.AdaptForAccessor(&db_accessor); trigger_context.AdaptForAccessor(&db_accessor);
try { try {
trigger.Execute(&db_accessor, &execution_memory, interpreter_context->execution_timeout_sec, trigger.Execute(&db_accessor, &execution_memory, interpreter_context->config.execution_timeout_sec,
&interpreter_context->is_shutting_down, trigger_context); &interpreter_context->is_shutting_down, trigger_context);
} catch (const utils::BasicException &exception) { } catch (const utils::BasicException &exception) {
spdlog::warn("Trigger '{}' failed with exception:\n{}", trigger.Name(), exception.what()); spdlog::warn("Trigger '{}' failed with exception:\n{}", trigger.Name(), exception.what());
@ -1656,7 +1657,7 @@ void Interpreter::Commit() {
utils::MonotonicBufferResource execution_memory{kExecutionMemoryBlockSize}; utils::MonotonicBufferResource execution_memory{kExecutionMemoryBlockSize};
AdvanceCommand(); AdvanceCommand();
try { try {
trigger.Execute(&*execution_db_accessor_, &execution_memory, interpreter_context_->execution_timeout_sec, trigger.Execute(&*execution_db_accessor_, &execution_memory, interpreter_context_->config.execution_timeout_sec,
&interpreter_context_->is_shutting_down, *trigger_context); &interpreter_context_->is_shutting_down, *trigger_context);
} catch (const utils::BasicException &e) { } catch (const utils::BasicException &e) {
throw utils::BasicException( throw utils::BasicException(

View File

@ -2,6 +2,7 @@
#include <gflags/gflags.h> #include <gflags/gflags.h>
#include "query/config.hpp"
#include "query/context.hpp" #include "query/context.hpp"
#include "query/cypher_query_interpreter.hpp" #include "query/cypher_query_interpreter.hpp"
#include "query/db_accessor.hpp" #include "query/db_accessor.hpp"
@ -148,7 +149,8 @@ struct PreparedQuery {
* been passed to an `Interpreter` instance. * been passed to an `Interpreter` instance.
*/ */
struct InterpreterContext { struct InterpreterContext {
explicit InterpreterContext(storage::Storage *db, const std::filesystem::path &data_directory); explicit InterpreterContext(storage::Storage *db, InterpreterConfig config,
const std::filesystem::path &data_directory);
storage::Storage *db; storage::Storage *db;
@ -161,8 +163,6 @@ struct InterpreterContext {
utils::SpinLock antlr_lock; utils::SpinLock antlr_lock;
std::optional<double> tsc_frequency{utils::GetTSCFrequency()}; std::optional<double> tsc_frequency{utils::GetTSCFrequency()};
std::atomic<bool> is_shutting_down{false}; std::atomic<bool> is_shutting_down{false};
// The default execution timeout is 3 minutes.
double execution_timeout_sec{180.0};
AuthQueryHandler *auth{nullptr}; AuthQueryHandler *auth{nullptr};
@ -171,17 +171,14 @@ struct InterpreterContext {
TriggerStore trigger_store; TriggerStore trigger_store;
utils::ThreadPool after_commit_trigger_pool{1}; utils::ThreadPool after_commit_trigger_pool{1};
const InterpreterConfig config;
}; };
/// Function that is used to tell all active interpreters that they should stop /// Function that is used to tell all active interpreters that they should stop
/// their ongoing execution. /// their ongoing execution.
inline void Shutdown(InterpreterContext *context) { context->is_shutting_down.store(true, std::memory_order_release); } inline void Shutdown(InterpreterContext *context) { context->is_shutting_down.store(true, std::memory_order_release); }
/// Function used to set the maximum execution timeout in seconds.
inline void SetExecutionTimeout(InterpreterContext *context, double timeout) {
context->execution_timeout_sec = timeout;
}
class Interpreter final { class Interpreter final {
public: public:
explicit Interpreter(InterpreterContext *interpreter_context); explicit Interpreter(InterpreterContext *interpreter_context);

View File

@ -2,6 +2,7 @@
#include <concepts> #include <concepts>
#include "query/config.hpp"
#include "query/context.hpp" #include "query/context.hpp"
#include "query/cypher_query_interpreter.hpp" #include "query/cypher_query_interpreter.hpp"
#include "query/db_accessor.hpp" #include "query/db_accessor.hpp"
@ -136,9 +137,9 @@ std::vector<std::pair<Identifier, TriggerIdentifierTag>> GetPredefinedIdentifier
Trigger::Trigger(std::string name, const std::string &query, Trigger::Trigger(std::string name, const std::string &query,
const std::map<std::string, storage::PropertyValue> &user_parameters, const std::map<std::string, storage::PropertyValue> &user_parameters,
const TriggerEventType event_type, utils::SkipList<QueryCacheEntry> *query_cache, const TriggerEventType event_type, utils::SkipList<QueryCacheEntry> *query_cache,
DbAccessor *db_accessor, utils::SpinLock *antlr_lock) DbAccessor *db_accessor, utils::SpinLock *antlr_lock, const InterpreterConfig::Query &query_config)
: name_{std::move(name)}, : name_{std::move(name)},
parsed_statements_{ParseQuery(query, user_parameters, query_cache, antlr_lock)}, parsed_statements_{ParseQuery(query, user_parameters, query_cache, antlr_lock, query_config)},
event_type_{event_type} { event_type_{event_type} {
// We check immediately if the query is valid by trying to create a plan. // We check immediately if the query is valid by trying to create a plan.
GetPlan(db_accessor); GetPlan(db_accessor);
@ -237,7 +238,7 @@ constexpr uint64_t kVersion{1};
TriggerStore::TriggerStore(std::filesystem::path directory) : storage_{std::move(directory)} {} TriggerStore::TriggerStore(std::filesystem::path directory) : storage_{std::move(directory)} {}
void TriggerStore::RestoreTriggers(utils::SkipList<QueryCacheEntry> *query_cache, DbAccessor *db_accessor, void TriggerStore::RestoreTriggers(utils::SkipList<QueryCacheEntry> *query_cache, DbAccessor *db_accessor,
utils::SpinLock *antlr_lock) { utils::SpinLock *antlr_lock, const InterpreterConfig::Query &query_config) {
MG_ASSERT(before_commit_triggers_.size() == 0 && after_commit_triggers_.size() == 0, MG_ASSERT(before_commit_triggers_.size() == 0 && after_commit_triggers_.size() == 0,
"Cannot restore trigger when some triggers already exist!"); "Cannot restore trigger when some triggers already exist!");
spdlog::info("Loading triggers..."); spdlog::info("Loading triggers...");
@ -288,7 +289,8 @@ void TriggerStore::RestoreTriggers(utils::SkipList<QueryCacheEntry> *query_cache
std::optional<Trigger> trigger; std::optional<Trigger> trigger;
try { try {
trigger.emplace(trigger_name, statement, user_parameters, event_type, query_cache, db_accessor, antlr_lock); trigger.emplace(trigger_name, statement, user_parameters, event_type, query_cache, db_accessor, antlr_lock,
query_config);
} catch (const utils::BasicException &e) { } catch (const utils::BasicException &e) {
spdlog::warn("Failed to create trigger '{}' because: {}", trigger_name, e.what()); spdlog::warn("Failed to create trigger '{}' because: {}", trigger_name, e.what());
continue; continue;
@ -306,7 +308,7 @@ void TriggerStore::AddTrigger(const std::string &name, const std::string &query,
const std::map<std::string, storage::PropertyValue> &user_parameters, const std::map<std::string, storage::PropertyValue> &user_parameters,
TriggerEventType event_type, TriggerPhase phase, TriggerEventType event_type, TriggerPhase phase,
utils::SkipList<QueryCacheEntry> *query_cache, DbAccessor *db_accessor, utils::SkipList<QueryCacheEntry> *query_cache, DbAccessor *db_accessor,
utils::SpinLock *antlr_lock) { utils::SpinLock *antlr_lock, const InterpreterConfig::Query &query_config) {
std::unique_lock store_guard{store_lock_}; std::unique_lock store_guard{store_lock_};
if (storage_.Get(name)) { if (storage_.Get(name)) {
throw utils::BasicException("Trigger with the same name already exists."); throw utils::BasicException("Trigger with the same name already exists.");
@ -314,7 +316,7 @@ void TriggerStore::AddTrigger(const std::string &name, const std::string &query,
std::optional<Trigger> trigger; std::optional<Trigger> trigger;
try { try {
trigger.emplace(name, query, user_parameters, event_type, query_cache, db_accessor, antlr_lock); trigger.emplace(name, query, user_parameters, event_type, query_cache, db_accessor, antlr_lock, query_config);
} catch (const utils::BasicException &e) { } catch (const utils::BasicException &e) {
const auto identifiers = GetPredefinedIdentifiers(event_type); const auto identifiers = GetPredefinedIdentifiers(event_type);
std::stringstream identifier_names_stream; std::stringstream identifier_names_stream;

View File

@ -9,6 +9,7 @@
#include <vector> #include <vector>
#include "kvstore/kvstore.hpp" #include "kvstore/kvstore.hpp"
#include "query/config.hpp"
#include "query/cypher_query_interpreter.hpp" #include "query/cypher_query_interpreter.hpp"
#include "query/db_accessor.hpp" #include "query/db_accessor.hpp"
#include "query/frontend/ast/ast.hpp" #include "query/frontend/ast/ast.hpp"
@ -21,7 +22,8 @@ namespace query {
struct Trigger { struct Trigger {
explicit Trigger(std::string name, const std::string &query, explicit Trigger(std::string name, const std::string &query,
const std::map<std::string, storage::PropertyValue> &user_parameters, TriggerEventType event_type, const std::map<std::string, storage::PropertyValue> &user_parameters, TriggerEventType event_type,
utils::SkipList<QueryCacheEntry> *query_cache, DbAccessor *db_accessor, utils::SpinLock *antlr_lock); utils::SkipList<QueryCacheEntry> *query_cache, DbAccessor *db_accessor, utils::SpinLock *antlr_lock,
const InterpreterConfig::Query &query_config);
void Execute(DbAccessor *dba, utils::MonotonicBufferResource *execution_memory, double max_execution_time_sec, void Execute(DbAccessor *dba, utils::MonotonicBufferResource *execution_memory, double max_execution_time_sec,
std::atomic<bool> *is_shutting_down, const TriggerContext &context) const; std::atomic<bool> *is_shutting_down, const TriggerContext &context) const;
@ -63,12 +65,12 @@ struct TriggerStore {
explicit TriggerStore(std::filesystem::path directory); explicit TriggerStore(std::filesystem::path directory);
void RestoreTriggers(utils::SkipList<QueryCacheEntry> *query_cache, DbAccessor *db_accessor, void RestoreTriggers(utils::SkipList<QueryCacheEntry> *query_cache, DbAccessor *db_accessor,
utils::SpinLock *antlr_lock); utils::SpinLock *antlr_lock, const InterpreterConfig::Query &query_config);
void AddTrigger(const std::string &name, const std::string &query, void AddTrigger(const std::string &name, const std::string &query,
const std::map<std::string, storage::PropertyValue> &user_parameters, TriggerEventType event_type, const std::map<std::string, storage::PropertyValue> &user_parameters, TriggerEventType event_type,
TriggerPhase phase, utils::SkipList<QueryCacheEntry> *query_cache, DbAccessor *db_accessor, TriggerPhase phase, utils::SkipList<QueryCacheEntry> *query_cache, DbAccessor *db_accessor,
utils::SpinLock *antlr_lock); utils::SpinLock *antlr_lock, const InterpreterConfig::Query &query_config);
void DropTrigger(const std::string &name); void DropTrigger(const std::string &name);

View File

@ -2,6 +2,7 @@
#include <benchmark/benchmark_api.h> #include <benchmark/benchmark_api.h>
#include "communication/result_stream_faker.hpp" #include "communication/result_stream_faker.hpp"
#include "query/config.hpp"
#include "query/interpreter.hpp" #include "query/interpreter.hpp"
#include "query/typed_value.hpp" #include "query/typed_value.hpp"
#include "storage/v2/isolation_level.hpp" #include "storage/v2/isolation_level.hpp"
@ -36,7 +37,7 @@ class ExpansionBenchFixture : public benchmark::Fixture {
MG_ASSERT(db->CreateIndex(label)); MG_ASSERT(db->CreateIndex(label));
interpreter_context.emplace(&*db, data_directory); interpreter_context.emplace(&*db, query::InterpreterConfig{}, data_directory);
interpreter.emplace(&*interpreter_context); interpreter.emplace(&*interpreter_context);
} }

View File

@ -1,4 +1,5 @@
#include "communication/result_stream_faker.hpp" #include "communication/result_stream_faker.hpp"
#include "query/config.hpp"
#include "query/interpreter.hpp" #include "query/interpreter.hpp"
#include "storage/v2/isolation_level.hpp" #include "storage/v2/isolation_level.hpp"
#include "storage/v2/storage.hpp" #include "storage/v2/storage.hpp"
@ -16,7 +17,7 @@ int main(int argc, char *argv[]) {
storage::Storage db; storage::Storage db;
auto data_directory = std::filesystem::temp_directory_path() / "single_query_test"; auto data_directory = std::filesystem::temp_directory_path() / "single_query_test";
utils::OnScopeExit([&data_directory] { std::filesystem::remove_all(data_directory); }); utils::OnScopeExit([&data_directory] { std::filesystem::remove_all(data_directory); });
query::InterpreterContext interpreter_context{&db, data_directory}; query::InterpreterContext interpreter_context{&db, query::InterpreterConfig{}, data_directory};
query::Interpreter interpreter{&interpreter_context}; query::Interpreter interpreter{&interpreter_context};
ResultStreamFaker stream(&db); ResultStreamFaker stream(&db);

View File

@ -6,6 +6,7 @@
#include "glue/communication.hpp" #include "glue/communication.hpp"
#include "gmock/gmock.h" #include "gmock/gmock.h"
#include "gtest/gtest.h" #include "gtest/gtest.h"
#include "query/config.hpp"
#include "query/exceptions.hpp" #include "query/exceptions.hpp"
#include "query/interpreter.hpp" #include "query/interpreter.hpp"
#include "query/stream.hpp" #include "query/stream.hpp"
@ -26,28 +27,21 @@ auto ToEdgeList(const communication::bolt::Value &v) {
return list; return list;
}; };
} // namespace struct InterpreterFaker {
explicit InterpreterFaker(storage::Storage *db, const query::InterpreterConfig config,
// TODO: This is not a unit test, but tests/integration dir is chaotic at the const std::filesystem::path &data_directory)
// moment. After tests refactoring is done, move/rename this. : interpreter_context(db, config, data_directory), interpreter(&interpreter_context) {}
class InterpreterTest : public ::testing::Test {
protected:
storage::Storage db_;
std::filesystem::path data_directory{std::filesystem::temp_directory_path() / "MG_tests_unit_interpreter"};
query::InterpreterContext interpreter_context_{&db_, data_directory};
query::Interpreter interpreter_{&interpreter_context_};
auto Prepare(const std::string &query, const std::map<std::string, storage::PropertyValue> &params = {}) { auto Prepare(const std::string &query, const std::map<std::string, storage::PropertyValue> &params = {}) {
ResultStreamFaker stream(&db_); ResultStreamFaker stream(interpreter_context.db);
const auto [header, _, qid] = interpreter_.Prepare(query, params); const auto [header, _, qid] = interpreter.Prepare(query, params);
stream.Header(header); stream.Header(header);
return std::pair{std::move(stream), qid}; return std::make_pair(std::move(stream), qid);
} }
void Pull(ResultStreamFaker *stream, std::optional<int> n = {}, std::optional<int> qid = {}) { void Pull(ResultStreamFaker *stream, std::optional<int> n = {}, std::optional<int> qid = {}) {
const auto summary = interpreter_.Pull(stream, n, qid); const auto summary = interpreter.Pull(stream, n, qid);
stream->Summary(summary); stream->Summary(summary);
} }
@ -60,11 +54,39 @@ class InterpreterTest : public ::testing::Test {
auto prepare_result = Prepare(query, params); auto prepare_result = Prepare(query, params);
auto &stream = prepare_result.first; auto &stream = prepare_result.first;
auto summary = interpreter_.Pull(&stream, {}, prepare_result.second); auto summary = interpreter.Pull(&stream, {}, prepare_result.second);
stream.Summary(summary); stream.Summary(summary);
return std::move(stream); return std::move(stream);
} }
query::InterpreterContext interpreter_context;
query::Interpreter interpreter;
};
} // namespace
// TODO: This is not a unit test, but tests/integration dir is chaotic at the
// moment. After tests refactoring is done, move/rename this.
class InterpreterTest : public ::testing::Test {
protected:
storage::Storage db_;
std::filesystem::path data_directory{std::filesystem::temp_directory_path() / "MG_tests_unit_interpreter"};
InterpreterFaker default_interpreter{&db_, {}, data_directory};
auto Prepare(const std::string &query, const std::map<std::string, storage::PropertyValue> &params = {}) {
return default_interpreter.Prepare(query, params);
}
void Pull(ResultStreamFaker *stream, std::optional<int> n = {}, std::optional<int> qid = {}) {
default_interpreter.Pull(stream, n, qid);
}
auto Interpret(const std::string &query, const std::map<std::string, storage::PropertyValue> &params = {}) {
return default_interpreter.Interpret(query, params);
}
}; };
TEST_F(InterpreterTest, MultiplePulls) { TEST_F(InterpreterTest, MultiplePulls) {
@ -197,11 +219,6 @@ TEST_F(InterpreterTest, Parameters) {
} }
} }
TEST_F(InterpreterTest, LoadCsv) {
// for debug purposes
auto [stream, qid] = Prepare(R"(LOAD CSV FROM "simple.csv" NO HEADER AS row RETURN row)");
}
// Test bfs end to end. // Test bfs end to end.
TEST_F(InterpreterTest, Bfs) { TEST_F(InterpreterTest, Bfs) {
srand(0); srand(0);
@ -474,8 +491,10 @@ TEST_F(InterpreterTest, UniqueConstraintTest) {
} }
TEST_F(InterpreterTest, ExplainQuery) { TEST_F(InterpreterTest, ExplainQuery) {
EXPECT_EQ(interpreter_context_.plan_cache.size(), 0U); const auto &interpreter_context = default_interpreter.interpreter_context;
EXPECT_EQ(interpreter_context_.ast_cache.size(), 0U);
EXPECT_EQ(interpreter_context.plan_cache.size(), 0U);
EXPECT_EQ(interpreter_context.ast_cache.size(), 0U);
auto stream = Interpret("EXPLAIN MATCH (n) RETURN *;"); auto stream = Interpret("EXPLAIN MATCH (n) RETURN *;");
ASSERT_EQ(stream.GetHeader().size(), 1U); ASSERT_EQ(stream.GetHeader().size(), 1U);
EXPECT_EQ(stream.GetHeader().front(), "QUERY PLAN"); EXPECT_EQ(stream.GetHeader().front(), "QUERY PLAN");
@ -488,17 +507,19 @@ TEST_F(InterpreterTest, ExplainQuery) {
++expected_it; ++expected_it;
} }
// We should have a plan cache for MATCH ... // We should have a plan cache for MATCH ...
EXPECT_EQ(interpreter_context_.plan_cache.size(), 1U); EXPECT_EQ(interpreter_context.plan_cache.size(), 1U);
// We should have AST cache for EXPLAIN ... and for inner MATCH ... // We should have AST cache for EXPLAIN ... and for inner MATCH ...
EXPECT_EQ(interpreter_context_.ast_cache.size(), 2U); EXPECT_EQ(interpreter_context.ast_cache.size(), 2U);
Interpret("MATCH (n) RETURN *;"); Interpret("MATCH (n) RETURN *;");
EXPECT_EQ(interpreter_context_.plan_cache.size(), 1U); EXPECT_EQ(interpreter_context.plan_cache.size(), 1U);
EXPECT_EQ(interpreter_context_.ast_cache.size(), 2U); EXPECT_EQ(interpreter_context.ast_cache.size(), 2U);
} }
TEST_F(InterpreterTest, ExplainQueryMultiplePulls) { TEST_F(InterpreterTest, ExplainQueryMultiplePulls) {
EXPECT_EQ(interpreter_context_.plan_cache.size(), 0U); const auto &interpreter_context = default_interpreter.interpreter_context;
EXPECT_EQ(interpreter_context_.ast_cache.size(), 0U);
EXPECT_EQ(interpreter_context.plan_cache.size(), 0U);
EXPECT_EQ(interpreter_context.ast_cache.size(), 0U);
auto [stream, qid] = Prepare("EXPLAIN MATCH (n) RETURN *;"); auto [stream, qid] = Prepare("EXPLAIN MATCH (n) RETURN *;");
ASSERT_EQ(stream.GetHeader().size(), 1U); ASSERT_EQ(stream.GetHeader().size(), 1U);
EXPECT_EQ(stream.GetHeader().front(), "QUERY PLAN"); EXPECT_EQ(stream.GetHeader().front(), "QUERY PLAN");
@ -521,17 +542,19 @@ TEST_F(InterpreterTest, ExplainQueryMultiplePulls) {
ASSERT_EQ(stream.GetResults()[2].size(), 1U); ASSERT_EQ(stream.GetResults()[2].size(), 1U);
EXPECT_EQ(stream.GetResults()[2].front().ValueString(), *expected_it); EXPECT_EQ(stream.GetResults()[2].front().ValueString(), *expected_it);
// We should have a plan cache for MATCH ... // We should have a plan cache for MATCH ...
EXPECT_EQ(interpreter_context_.plan_cache.size(), 1U); EXPECT_EQ(interpreter_context.plan_cache.size(), 1U);
// We should have AST cache for EXPLAIN ... and for inner MATCH ... // We should have AST cache for EXPLAIN ... and for inner MATCH ...
EXPECT_EQ(interpreter_context_.ast_cache.size(), 2U); EXPECT_EQ(interpreter_context.ast_cache.size(), 2U);
Interpret("MATCH (n) RETURN *;"); Interpret("MATCH (n) RETURN *;");
EXPECT_EQ(interpreter_context_.plan_cache.size(), 1U); EXPECT_EQ(interpreter_context.plan_cache.size(), 1U);
EXPECT_EQ(interpreter_context_.ast_cache.size(), 2U); EXPECT_EQ(interpreter_context.ast_cache.size(), 2U);
} }
TEST_F(InterpreterTest, ExplainQueryInMulticommandTransaction) { TEST_F(InterpreterTest, ExplainQueryInMulticommandTransaction) {
EXPECT_EQ(interpreter_context_.plan_cache.size(), 0U); const auto &interpreter_context = default_interpreter.interpreter_context;
EXPECT_EQ(interpreter_context_.ast_cache.size(), 0U);
EXPECT_EQ(interpreter_context.plan_cache.size(), 0U);
EXPECT_EQ(interpreter_context.ast_cache.size(), 0U);
Interpret("BEGIN"); Interpret("BEGIN");
auto stream = Interpret("EXPLAIN MATCH (n) RETURN *;"); auto stream = Interpret("EXPLAIN MATCH (n) RETURN *;");
Interpret("COMMIT"); Interpret("COMMIT");
@ -546,17 +569,19 @@ TEST_F(InterpreterTest, ExplainQueryInMulticommandTransaction) {
++expected_it; ++expected_it;
} }
// We should have a plan cache for MATCH ... // We should have a plan cache for MATCH ...
EXPECT_EQ(interpreter_context_.plan_cache.size(), 1U); EXPECT_EQ(interpreter_context.plan_cache.size(), 1U);
// We should have AST cache for EXPLAIN ... and for inner MATCH ... // We should have AST cache for EXPLAIN ... and for inner MATCH ...
EXPECT_EQ(interpreter_context_.ast_cache.size(), 2U); EXPECT_EQ(interpreter_context.ast_cache.size(), 2U);
Interpret("MATCH (n) RETURN *;"); Interpret("MATCH (n) RETURN *;");
EXPECT_EQ(interpreter_context_.plan_cache.size(), 1U); EXPECT_EQ(interpreter_context.plan_cache.size(), 1U);
EXPECT_EQ(interpreter_context_.ast_cache.size(), 2U); EXPECT_EQ(interpreter_context.ast_cache.size(), 2U);
} }
TEST_F(InterpreterTest, ExplainQueryWithParams) { TEST_F(InterpreterTest, ExplainQueryWithParams) {
EXPECT_EQ(interpreter_context_.plan_cache.size(), 0U); const auto &interpreter_context = default_interpreter.interpreter_context;
EXPECT_EQ(interpreter_context_.ast_cache.size(), 0U);
EXPECT_EQ(interpreter_context.plan_cache.size(), 0U);
EXPECT_EQ(interpreter_context.ast_cache.size(), 0U);
auto stream = Interpret("EXPLAIN MATCH (n) WHERE n.id = $id RETURN *;", {{"id", storage::PropertyValue(42)}}); auto stream = Interpret("EXPLAIN MATCH (n) WHERE n.id = $id RETURN *;", {{"id", storage::PropertyValue(42)}});
ASSERT_EQ(stream.GetHeader().size(), 1U); ASSERT_EQ(stream.GetHeader().size(), 1U);
EXPECT_EQ(stream.GetHeader().front(), "QUERY PLAN"); EXPECT_EQ(stream.GetHeader().front(), "QUERY PLAN");
@ -569,17 +594,19 @@ TEST_F(InterpreterTest, ExplainQueryWithParams) {
++expected_it; ++expected_it;
} }
// We should have a plan cache for MATCH ... // We should have a plan cache for MATCH ...
EXPECT_EQ(interpreter_context_.plan_cache.size(), 1U); EXPECT_EQ(interpreter_context.plan_cache.size(), 1U);
// We should have AST cache for EXPLAIN ... and for inner MATCH ... // We should have AST cache for EXPLAIN ... and for inner MATCH ...
EXPECT_EQ(interpreter_context_.ast_cache.size(), 2U); EXPECT_EQ(interpreter_context.ast_cache.size(), 2U);
Interpret("MATCH (n) WHERE n.id = $id RETURN *;", {{"id", storage::PropertyValue("something else")}}); Interpret("MATCH (n) WHERE n.id = $id RETURN *;", {{"id", storage::PropertyValue("something else")}});
EXPECT_EQ(interpreter_context_.plan_cache.size(), 1U); EXPECT_EQ(interpreter_context.plan_cache.size(), 1U);
EXPECT_EQ(interpreter_context_.ast_cache.size(), 2U); EXPECT_EQ(interpreter_context.ast_cache.size(), 2U);
} }
TEST_F(InterpreterTest, ProfileQuery) { TEST_F(InterpreterTest, ProfileQuery) {
EXPECT_EQ(interpreter_context_.plan_cache.size(), 0U); const auto &interpreter_context = default_interpreter.interpreter_context;
EXPECT_EQ(interpreter_context_.ast_cache.size(), 0U);
EXPECT_EQ(interpreter_context.plan_cache.size(), 0U);
EXPECT_EQ(interpreter_context.ast_cache.size(), 0U);
auto stream = Interpret("PROFILE MATCH (n) RETURN *;"); auto stream = Interpret("PROFILE MATCH (n) RETURN *;");
std::vector<std::string> expected_header{"OPERATOR", "ACTUAL HITS", "RELATIVE TIME", "ABSOLUTE TIME"}; std::vector<std::string> expected_header{"OPERATOR", "ACTUAL HITS", "RELATIVE TIME", "ABSOLUTE TIME"};
EXPECT_EQ(stream.GetHeader(), expected_header); EXPECT_EQ(stream.GetHeader(), expected_header);
@ -592,17 +619,19 @@ TEST_F(InterpreterTest, ProfileQuery) {
++expected_it; ++expected_it;
} }
// We should have a plan cache for MATCH ... // We should have a plan cache for MATCH ...
EXPECT_EQ(interpreter_context_.plan_cache.size(), 1U); EXPECT_EQ(interpreter_context.plan_cache.size(), 1U);
// We should have AST cache for PROFILE ... and for inner MATCH ... // We should have AST cache for PROFILE ... and for inner MATCH ...
EXPECT_EQ(interpreter_context_.ast_cache.size(), 2U); EXPECT_EQ(interpreter_context.ast_cache.size(), 2U);
Interpret("MATCH (n) RETURN *;"); Interpret("MATCH (n) RETURN *;");
EXPECT_EQ(interpreter_context_.plan_cache.size(), 1U); EXPECT_EQ(interpreter_context.plan_cache.size(), 1U);
EXPECT_EQ(interpreter_context_.ast_cache.size(), 2U); EXPECT_EQ(interpreter_context.ast_cache.size(), 2U);
} }
TEST_F(InterpreterTest, ProfileQueryMultiplePulls) { TEST_F(InterpreterTest, ProfileQueryMultiplePulls) {
EXPECT_EQ(interpreter_context_.plan_cache.size(), 0U); const auto &interpreter_context = default_interpreter.interpreter_context;
EXPECT_EQ(interpreter_context_.ast_cache.size(), 0U);
EXPECT_EQ(interpreter_context.plan_cache.size(), 0U);
EXPECT_EQ(interpreter_context.ast_cache.size(), 0U);
auto [stream, qid] = Prepare("PROFILE MATCH (n) RETURN *;"); auto [stream, qid] = Prepare("PROFILE MATCH (n) RETURN *;");
std::vector<std::string> expected_header{"OPERATOR", "ACTUAL HITS", "RELATIVE TIME", "ABSOLUTE TIME"}; std::vector<std::string> expected_header{"OPERATOR", "ACTUAL HITS", "RELATIVE TIME", "ABSOLUTE TIME"};
EXPECT_EQ(stream.GetHeader(), expected_header); EXPECT_EQ(stream.GetHeader(), expected_header);
@ -628,12 +657,12 @@ TEST_F(InterpreterTest, ProfileQueryMultiplePulls) {
ASSERT_EQ(stream.GetResults()[2][0].ValueString(), *expected_it); ASSERT_EQ(stream.GetResults()[2][0].ValueString(), *expected_it);
// We should have a plan cache for MATCH ... // We should have a plan cache for MATCH ...
EXPECT_EQ(interpreter_context_.plan_cache.size(), 1U); EXPECT_EQ(interpreter_context.plan_cache.size(), 1U);
// We should have AST cache for PROFILE ... and for inner MATCH ... // We should have AST cache for PROFILE ... and for inner MATCH ...
EXPECT_EQ(interpreter_context_.ast_cache.size(), 2U); EXPECT_EQ(interpreter_context.ast_cache.size(), 2U);
Interpret("MATCH (n) RETURN *;"); Interpret("MATCH (n) RETURN *;");
EXPECT_EQ(interpreter_context_.plan_cache.size(), 1U); EXPECT_EQ(interpreter_context.plan_cache.size(), 1U);
EXPECT_EQ(interpreter_context_.ast_cache.size(), 2U); EXPECT_EQ(interpreter_context.ast_cache.size(), 2U);
} }
TEST_F(InterpreterTest, ProfileQueryInMulticommandTransaction) { TEST_F(InterpreterTest, ProfileQueryInMulticommandTransaction) {
@ -643,8 +672,10 @@ TEST_F(InterpreterTest, ProfileQueryInMulticommandTransaction) {
} }
TEST_F(InterpreterTest, ProfileQueryWithParams) { TEST_F(InterpreterTest, ProfileQueryWithParams) {
EXPECT_EQ(interpreter_context_.plan_cache.size(), 0U); const auto &interpreter_context = default_interpreter.interpreter_context;
EXPECT_EQ(interpreter_context_.ast_cache.size(), 0U);
EXPECT_EQ(interpreter_context.plan_cache.size(), 0U);
EXPECT_EQ(interpreter_context.ast_cache.size(), 0U);
auto stream = Interpret("PROFILE MATCH (n) WHERE n.id = $id RETURN *;", {{"id", storage::PropertyValue(42)}}); auto stream = Interpret("PROFILE MATCH (n) WHERE n.id = $id RETURN *;", {{"id", storage::PropertyValue(42)}});
std::vector<std::string> expected_header{"OPERATOR", "ACTUAL HITS", "RELATIVE TIME", "ABSOLUTE TIME"}; std::vector<std::string> expected_header{"OPERATOR", "ACTUAL HITS", "RELATIVE TIME", "ABSOLUTE TIME"};
EXPECT_EQ(stream.GetHeader(), expected_header); EXPECT_EQ(stream.GetHeader(), expected_header);
@ -657,17 +688,19 @@ TEST_F(InterpreterTest, ProfileQueryWithParams) {
++expected_it; ++expected_it;
} }
// We should have a plan cache for MATCH ... // We should have a plan cache for MATCH ...
EXPECT_EQ(interpreter_context_.plan_cache.size(), 1U); EXPECT_EQ(interpreter_context.plan_cache.size(), 1U);
// We should have AST cache for PROFILE ... and for inner MATCH ... // We should have AST cache for PROFILE ... and for inner MATCH ...
EXPECT_EQ(interpreter_context_.ast_cache.size(), 2U); EXPECT_EQ(interpreter_context.ast_cache.size(), 2U);
Interpret("MATCH (n) WHERE n.id = $id RETURN *;", {{"id", storage::PropertyValue("something else")}}); Interpret("MATCH (n) WHERE n.id = $id RETURN *;", {{"id", storage::PropertyValue("something else")}});
EXPECT_EQ(interpreter_context_.plan_cache.size(), 1U); EXPECT_EQ(interpreter_context.plan_cache.size(), 1U);
EXPECT_EQ(interpreter_context_.ast_cache.size(), 2U); EXPECT_EQ(interpreter_context.ast_cache.size(), 2U);
} }
TEST_F(InterpreterTest, ProfileQueryWithLiterals) { TEST_F(InterpreterTest, ProfileQueryWithLiterals) {
EXPECT_EQ(interpreter_context_.plan_cache.size(), 0U); const auto &interpreter_context = default_interpreter.interpreter_context;
EXPECT_EQ(interpreter_context_.ast_cache.size(), 0U);
EXPECT_EQ(interpreter_context.plan_cache.size(), 0U);
EXPECT_EQ(interpreter_context.ast_cache.size(), 0U);
auto stream = Interpret("PROFILE UNWIND range(1, 1000) AS x CREATE (:Node {id: x});", {}); auto stream = Interpret("PROFILE UNWIND range(1, 1000) AS x CREATE (:Node {id: x});", {});
std::vector<std::string> expected_header{"OPERATOR", "ACTUAL HITS", "RELATIVE TIME", "ABSOLUTE TIME"}; std::vector<std::string> expected_header{"OPERATOR", "ACTUAL HITS", "RELATIVE TIME", "ABSOLUTE TIME"};
EXPECT_EQ(stream.GetHeader(), expected_header); EXPECT_EQ(stream.GetHeader(), expected_header);
@ -680,20 +713,21 @@ TEST_F(InterpreterTest, ProfileQueryWithLiterals) {
++expected_it; ++expected_it;
} }
// We should have a plan cache for UNWIND ... // We should have a plan cache for UNWIND ...
EXPECT_EQ(interpreter_context_.plan_cache.size(), 1U); EXPECT_EQ(interpreter_context.plan_cache.size(), 1U);
// We should have AST cache for PROFILE ... and for inner UNWIND ... // We should have AST cache for PROFILE ... and for inner UNWIND ...
EXPECT_EQ(interpreter_context_.ast_cache.size(), 2U); EXPECT_EQ(interpreter_context.ast_cache.size(), 2U);
Interpret("UNWIND range(42, 4242) AS x CREATE (:Node {id: x});", {}); Interpret("UNWIND range(42, 4242) AS x CREATE (:Node {id: x});", {});
EXPECT_EQ(interpreter_context_.plan_cache.size(), 1U); EXPECT_EQ(interpreter_context.plan_cache.size(), 1U);
EXPECT_EQ(interpreter_context_.ast_cache.size(), 2U); EXPECT_EQ(interpreter_context.ast_cache.size(), 2U);
} }
TEST_F(InterpreterTest, Transactions) { TEST_F(InterpreterTest, Transactions) {
auto &interpreter = default_interpreter.interpreter;
{ {
ASSERT_THROW(interpreter_.CommitTransaction(), query::ExplicitTransactionUsageException); ASSERT_THROW(interpreter.CommitTransaction(), query::ExplicitTransactionUsageException);
ASSERT_THROW(interpreter_.RollbackTransaction(), query::ExplicitTransactionUsageException); ASSERT_THROW(interpreter.RollbackTransaction(), query::ExplicitTransactionUsageException);
interpreter_.BeginTransaction(); interpreter.BeginTransaction();
ASSERT_THROW(interpreter_.BeginTransaction(), query::ExplicitTransactionUsageException); ASSERT_THROW(interpreter.BeginTransaction(), query::ExplicitTransactionUsageException);
auto [stream, qid] = Prepare("RETURN 2"); auto [stream, qid] = Prepare("RETURN 2");
ASSERT_EQ(stream.GetHeader().size(), 1U); ASSERT_EQ(stream.GetHeader().size(), 1U);
EXPECT_EQ(stream.GetHeader()[0], "2"); EXPECT_EQ(stream.GetHeader()[0], "2");
@ -702,10 +736,10 @@ TEST_F(InterpreterTest, Transactions) {
ASSERT_FALSE(stream.GetSummary().at("has_more").ValueBool()); ASSERT_FALSE(stream.GetSummary().at("has_more").ValueBool());
ASSERT_EQ(stream.GetResults()[0].size(), 1U); ASSERT_EQ(stream.GetResults()[0].size(), 1U);
ASSERT_EQ(stream.GetResults()[0][0].ValueInt(), 2); ASSERT_EQ(stream.GetResults()[0][0].ValueInt(), 2);
interpreter_.CommitTransaction(); interpreter.CommitTransaction();
} }
{ {
interpreter_.BeginTransaction(); interpreter.BeginTransaction();
auto [stream, qid] = Prepare("RETURN 2"); auto [stream, qid] = Prepare("RETURN 2");
ASSERT_EQ(stream.GetHeader().size(), 1U); ASSERT_EQ(stream.GetHeader().size(), 1U);
EXPECT_EQ(stream.GetHeader()[0], "2"); EXPECT_EQ(stream.GetHeader()[0], "2");
@ -714,20 +748,21 @@ TEST_F(InterpreterTest, Transactions) {
ASSERT_FALSE(stream.GetSummary().at("has_more").ValueBool()); ASSERT_FALSE(stream.GetSummary().at("has_more").ValueBool());
ASSERT_EQ(stream.GetResults()[0].size(), 1U); ASSERT_EQ(stream.GetResults()[0].size(), 1U);
ASSERT_EQ(stream.GetResults()[0][0].ValueInt(), 2); ASSERT_EQ(stream.GetResults()[0][0].ValueInt(), 2);
interpreter_.RollbackTransaction(); interpreter.RollbackTransaction();
} }
} }
TEST_F(InterpreterTest, Qid) { TEST_F(InterpreterTest, Qid) {
auto &interpreter = default_interpreter.interpreter;
{ {
interpreter_.BeginTransaction(); interpreter.BeginTransaction();
auto [stream, qid] = Prepare("RETURN 2"); auto [stream, qid] = Prepare("RETURN 2");
ASSERT_TRUE(qid); ASSERT_TRUE(qid);
ASSERT_THROW(Pull(&stream, {}, *qid + 1), query::InvalidArgumentsException); ASSERT_THROW(Pull(&stream, {}, *qid + 1), query::InvalidArgumentsException);
interpreter_.RollbackTransaction(); interpreter.RollbackTransaction();
} }
{ {
interpreter_.BeginTransaction(); interpreter.BeginTransaction();
auto [stream1, qid1] = Prepare("UNWIND(range(1,3)) as n RETURN n"); auto [stream1, qid1] = Prepare("UNWIND(range(1,3)) as n RETURN n");
ASSERT_TRUE(qid1); ASSERT_TRUE(qid1);
ASSERT_EQ(stream1.GetHeader().size(), 1U); ASSERT_EQ(stream1.GetHeader().size(), 1U);
@ -782,23 +817,26 @@ TEST_F(InterpreterTest, Qid) {
ASSERT_EQ(stream3.GetResults()[1][0].ValueInt(), 8); ASSERT_EQ(stream3.GetResults()[1][0].ValueInt(), 8);
ASSERT_EQ(stream3.GetResults()[2][0].ValueInt(), 9); ASSERT_EQ(stream3.GetResults()[2][0].ValueInt(), 9);
interpreter_.CommitTransaction(); interpreter.CommitTransaction();
} }
} }
namespace { namespace {
// copied from utils_csv_parsing.cpp - tmp dir management and csv file writer // copied from utils_csv_parsing.cpp - tmp dir management and csv file writer
class TmpCsvDirManager final { class TmpDirManager final {
public: public:
TmpCsvDirManager() { CreateCsvDir(); } explicit TmpDirManager(const std::string_view directory)
~TmpCsvDirManager() { Clear(); } : tmp_dir_{std::filesystem::temp_directory_path() / directory} {
CreateDir();
}
~TmpDirManager() { Clear(); }
const std::filesystem::path &Path() const { return tmp_dir_; } const std::filesystem::path &Path() const { return tmp_dir_; }
private: private:
const std::filesystem::path tmp_dir_{std::filesystem::temp_directory_path() / "csv_directory"}; std::filesystem::path tmp_dir_;
void CreateCsvDir() { void CreateDir() {
if (!std::filesystem::exists(tmp_dir_)) { if (!std::filesystem::exists(tmp_dir_)) {
std::filesystem::create_directory(tmp_dir_); std::filesystem::create_directory(tmp_dir_);
} }
@ -843,7 +881,7 @@ std::string CreateRow(const std::vector<std::string> &columns, const std::string
} // namespace } // namespace
TEST_F(InterpreterTest, LoadCsvClause) { TEST_F(InterpreterTest, LoadCsvClause) {
auto dir_manager = TmpCsvDirManager(); auto dir_manager = TmpDirManager("csv_directory");
const auto csv_path = dir_manager.Path() / "file.csv"; const auto csv_path = dir_manager.Path() / "file.csv";
auto writer = FileWriter(csv_path); auto writer = FileWriter(csv_path);
@ -898,3 +936,55 @@ TEST_F(InterpreterTest, LoadCsvClause) {
ASSERT_EQ(stream.GetResults()[1][0].ValueString(), "f"); ASSERT_EQ(stream.GetResults()[1][0].ValueString(), "f");
} }
} }
TEST_F(InterpreterTest, CacheableQueries) {
const auto &interpreter_context = default_interpreter.interpreter_context;
// This should be cached
{
SCOPED_TRACE("Cacheable query");
Interpret("RETURN 1");
EXPECT_EQ(interpreter_context.ast_cache.size(), 1U);
EXPECT_EQ(interpreter_context.plan_cache.size(), 1U);
}
{
SCOPED_TRACE("Uncacheable query");
// Queries which are calling procedure should not be cached because the
// result signature could be changed
Interpret("CALL mg.load_all()");
EXPECT_EQ(interpreter_context.ast_cache.size(), 1U);
EXPECT_EQ(interpreter_context.plan_cache.size(), 1U);
}
}
TEST_F(InterpreterTest, AllowLoadCsvConfig) {
const auto check_load_csv_queries = [&](const bool allow_load_csv) {
TmpDirManager directory_manager{"allow_load_csv"};
const auto csv_path = directory_manager.Path() / "file.csv";
auto writer = FileWriter(csv_path);
const std::vector<std::string> data{"A", "B", "C"};
writer.WriteLine(CreateRow(data, ","));
writer.Close();
const std::array<std::string, 2> queries = {
fmt::format("LOAD CSV FROM \"{}\" WITH HEADER AS row RETURN row", csv_path.string()),
"CREATE TRIGGER trigger ON CREATE BEFORE COMMIT EXECUTE LOAD CSV FROM 'file.csv' WITH HEADER AS row RETURN "
"row"};
InterpreterFaker interpreter_faker{&db_, {.query = {.allow_load_csv = allow_load_csv}}, directory_manager.Path()};
for (const auto &query : queries) {
if (allow_load_csv) {
SCOPED_TRACE(fmt::format("'{}' should not throw because LOAD CSV is allowed", query));
ASSERT_NO_THROW(interpreter_faker.Interpret(query));
} else {
SCOPED_TRACE(fmt::format("'{}' should throw becuase LOAD CSV is not allowed", query));
ASSERT_THROW(interpreter_faker.Interpret(query), utils::BasicException);
}
SCOPED_TRACE(fmt::format("Normal query should not throw (allow_load_csv: {})", allow_load_csv));
ASSERT_NO_THROW(interpreter_faker.Interpret("RETURN 1"));
}
};
check_load_csv_queries(true);
check_load_csv_queries(false);
}

View File

@ -6,6 +6,7 @@
#include <vector> #include <vector>
#include "communication/result_stream_faker.hpp" #include "communication/result_stream_faker.hpp"
#include "query/config.hpp"
#include "query/dump.hpp" #include "query/dump.hpp"
#include "query/interpreter.hpp" #include "query/interpreter.hpp"
#include "query/typed_value.hpp" #include "query/typed_value.hpp"
@ -189,7 +190,7 @@ DatabaseState GetState(storage::Storage *db) {
auto Execute(storage::Storage *db, const std::string &query) { auto Execute(storage::Storage *db, const std::string &query) {
auto data_directory = std::filesystem::temp_directory_path() / "MG_tests_unit_query_dump"; auto data_directory = std::filesystem::temp_directory_path() / "MG_tests_unit_query_dump";
query::InterpreterContext context(db, data_directory); query::InterpreterContext context(db, query::InterpreterConfig{}, data_directory);
query::Interpreter interpreter(&context); query::Interpreter interpreter(&context);
ResultStreamFaker stream(db); ResultStreamFaker stream(db);
@ -703,7 +704,7 @@ TEST(DumpTest, ExecuteDumpDatabase) {
class StatefulInterpreter { class StatefulInterpreter {
public: public:
explicit StatefulInterpreter(storage::Storage *db) explicit StatefulInterpreter(storage::Storage *db)
: db_(db), context_(db_, data_directory_), interpreter_(&context_) {} : db_(db), context_(db_, query::InterpreterConfig{}, data_directory_), interpreter_(&context_) {}
auto Execute(const std::string &query) { auto Execute(const std::string &query) {
ResultStreamFaker stream(db_); ResultStreamFaker stream(db_);

View File

@ -24,7 +24,7 @@ class QueryExecution : public testing::Test {
void SetUp() { void SetUp() {
db_.emplace(); db_.emplace();
interpreter_context_.emplace(&*db_, data_directory); interpreter_context_.emplace(&*db_, query::InterpreterConfig{}, data_directory);
interpreter_.emplace(&*interpreter_context_); interpreter_.emplace(&*interpreter_context_);
} }

View File

@ -2,6 +2,7 @@
#include <filesystem> #include <filesystem>
#include <fmt/format.h> #include <fmt/format.h>
#include "query/config.hpp"
#include "query/db_accessor.hpp" #include "query/db_accessor.hpp"
#include "query/interpreter.hpp" #include "query/interpreter.hpp"
#include "query/trigger.hpp" #include "query/trigger.hpp"
@ -835,7 +836,7 @@ TEST_F(TriggerStoreTest, Restore) {
const auto reset_store = [&] { const auto reset_store = [&] {
store.emplace(testing_directory); store.emplace(testing_directory);
store->RestoreTriggers(&ast_cache, &*dba, &antlr_lock); store->RestoreTriggers(&ast_cache, &*dba, &antlr_lock, query::InterpreterConfig::Query{});
}; };
reset_store(); reset_store();
@ -854,10 +855,12 @@ TEST_F(TriggerStoreTest, Restore) {
const auto event_type = query::TriggerEventType::VERTEX_CREATE; const auto event_type = query::TriggerEventType::VERTEX_CREATE;
store->AddTrigger(trigger_name_before, trigger_statement, store->AddTrigger(trigger_name_before, trigger_statement,
std::map<std::string, storage::PropertyValue>{{"parameter", storage::PropertyValue{1}}}, event_type, std::map<std::string, storage::PropertyValue>{{"parameter", storage::PropertyValue{1}}}, event_type,
query::TriggerPhase::BEFORE_COMMIT, &ast_cache, &*dba, &antlr_lock); query::TriggerPhase::BEFORE_COMMIT, &ast_cache, &*dba, &antlr_lock,
query::InterpreterConfig::Query{});
store->AddTrigger(trigger_name_after, trigger_statement, store->AddTrigger(trigger_name_after, trigger_statement,
std::map<std::string, storage::PropertyValue>{{"parameter", storage::PropertyValue{"value"}}}, std::map<std::string, storage::PropertyValue>{{"parameter", storage::PropertyValue{"value"}}},
event_type, query::TriggerPhase::AFTER_COMMIT, &ast_cache, &*dba, &antlr_lock); event_type, query::TriggerPhase::AFTER_COMMIT, &ast_cache, &*dba, &antlr_lock,
query::InterpreterConfig::Query{});
const auto check_triggers = [&] { const auto check_triggers = [&] {
ASSERT_EQ(store->GetTriggerInfo().size(), 2); ASSERT_EQ(store->GetTriggerInfo().size(), 2);
@ -902,27 +905,33 @@ TEST_F(TriggerStoreTest, AddTrigger) {
// Invalid query in statements // Invalid query in statements
ASSERT_THROW(store.AddTrigger("trigger", "RETUR 1", {}, query::TriggerEventType::VERTEX_CREATE, ASSERT_THROW(store.AddTrigger("trigger", "RETUR 1", {}, query::TriggerEventType::VERTEX_CREATE,
query::TriggerPhase::BEFORE_COMMIT, &ast_cache, &*dba, &antlr_lock), query::TriggerPhase::BEFORE_COMMIT, &ast_cache, &*dba, &antlr_lock,
query::InterpreterConfig::Query{}),
utils::BasicException); utils::BasicException);
ASSERT_THROW(store.AddTrigger("trigger", "RETURN createdEdges", {}, query::TriggerEventType::VERTEX_CREATE, ASSERT_THROW(store.AddTrigger("trigger", "RETURN createdEdges", {}, query::TriggerEventType::VERTEX_CREATE,
query::TriggerPhase::BEFORE_COMMIT, &ast_cache, &*dba, &antlr_lock), query::TriggerPhase::BEFORE_COMMIT, &ast_cache, &*dba, &antlr_lock,
query::InterpreterConfig::Query{}),
utils::BasicException); utils::BasicException);
ASSERT_THROW(store.AddTrigger("trigger", "RETURN $parameter", {}, query::TriggerEventType::VERTEX_CREATE, ASSERT_THROW(store.AddTrigger("trigger", "RETURN $parameter", {}, query::TriggerEventType::VERTEX_CREATE,
query::TriggerPhase::BEFORE_COMMIT, &ast_cache, &*dba, &antlr_lock), query::TriggerPhase::BEFORE_COMMIT, &ast_cache, &*dba, &antlr_lock,
query::InterpreterConfig::Query{}),
utils::BasicException); utils::BasicException);
ASSERT_NO_THROW(store.AddTrigger( ASSERT_NO_THROW(
"trigger", "RETURN $parameter", store.AddTrigger("trigger", "RETURN $parameter",
std::map<std::string, storage::PropertyValue>{{"parameter", storage::PropertyValue{1}}}, std::map<std::string, storage::PropertyValue>{{"parameter", storage::PropertyValue{1}}},
query::TriggerEventType::VERTEX_CREATE, query::TriggerPhase::BEFORE_COMMIT, &ast_cache, &*dba, &antlr_lock)); query::TriggerEventType::VERTEX_CREATE, query::TriggerPhase::BEFORE_COMMIT, &ast_cache, &*dba,
&antlr_lock, query::InterpreterConfig::Query{}));
// Inserting with the same name // Inserting with the same name
ASSERT_THROW(store.AddTrigger("trigger", "RETURN 1", {}, query::TriggerEventType::VERTEX_CREATE, ASSERT_THROW(store.AddTrigger("trigger", "RETURN 1", {}, query::TriggerEventType::VERTEX_CREATE,
query::TriggerPhase::BEFORE_COMMIT, &ast_cache, &*dba, &antlr_lock), query::TriggerPhase::BEFORE_COMMIT, &ast_cache, &*dba, &antlr_lock,
query::InterpreterConfig::Query{}),
utils::BasicException); utils::BasicException);
ASSERT_THROW(store.AddTrigger("trigger", "RETURN 1", {}, query::TriggerEventType::VERTEX_CREATE, ASSERT_THROW(store.AddTrigger("trigger", "RETURN 1", {}, query::TriggerEventType::VERTEX_CREATE,
query::TriggerPhase::AFTER_COMMIT, &ast_cache, &*dba, &antlr_lock), query::TriggerPhase::AFTER_COMMIT, &ast_cache, &*dba, &antlr_lock,
query::InterpreterConfig::Query{}),
utils::BasicException); utils::BasicException);
ASSERT_EQ(store.GetTriggerInfo().size(), 1); ASSERT_EQ(store.GetTriggerInfo().size(), 1);
@ -937,7 +946,8 @@ TEST_F(TriggerStoreTest, DropTrigger) {
const auto *trigger_name = "trigger"; const auto *trigger_name = "trigger";
store.AddTrigger(trigger_name, "RETURN 1", {}, query::TriggerEventType::VERTEX_CREATE, store.AddTrigger(trigger_name, "RETURN 1", {}, query::TriggerEventType::VERTEX_CREATE,
query::TriggerPhase::BEFORE_COMMIT, &ast_cache, &*dba, &antlr_lock); query::TriggerPhase::BEFORE_COMMIT, &ast_cache, &*dba, &antlr_lock,
query::InterpreterConfig::Query{});
ASSERT_THROW(store.DropTrigger("Unknown"), utils::BasicException); ASSERT_THROW(store.DropTrigger("Unknown"), utils::BasicException);
ASSERT_NO_THROW(store.DropTrigger(trigger_name)); ASSERT_NO_THROW(store.DropTrigger(trigger_name));
@ -949,7 +959,8 @@ TEST_F(TriggerStoreTest, TriggerInfo) {
std::vector<query::TriggerStore::TriggerInfo> expected_info; std::vector<query::TriggerStore::TriggerInfo> expected_info;
store.AddTrigger("trigger", "RETURN 1", {}, query::TriggerEventType::VERTEX_CREATE, store.AddTrigger("trigger", "RETURN 1", {}, query::TriggerEventType::VERTEX_CREATE,
query::TriggerPhase::BEFORE_COMMIT, &ast_cache, &*dba, &antlr_lock); query::TriggerPhase::BEFORE_COMMIT, &ast_cache, &*dba, &antlr_lock,
query::InterpreterConfig::Query{});
expected_info.push_back( expected_info.push_back(
{"trigger", "RETURN 1", query::TriggerEventType::VERTEX_CREATE, query::TriggerPhase::BEFORE_COMMIT}); {"trigger", "RETURN 1", query::TriggerEventType::VERTEX_CREATE, query::TriggerPhase::BEFORE_COMMIT});
@ -968,7 +979,8 @@ TEST_F(TriggerStoreTest, TriggerInfo) {
check_trigger_info(); check_trigger_info();
store.AddTrigger("edge_update_trigger", "RETURN 1", {}, query::TriggerEventType::EDGE_UPDATE, store.AddTrigger("edge_update_trigger", "RETURN 1", {}, query::TriggerEventType::EDGE_UPDATE,
query::TriggerPhase::AFTER_COMMIT, &ast_cache, &*dba, &antlr_lock); query::TriggerPhase::AFTER_COMMIT, &ast_cache, &*dba, &antlr_lock,
query::InterpreterConfig::Query{});
expected_info.push_back( expected_info.push_back(
{"edge_update_trigger", "RETURN 1", query::TriggerEventType::EDGE_UPDATE, query::TriggerPhase::AFTER_COMMIT}); {"edge_update_trigger", "RETURN 1", query::TriggerEventType::EDGE_UPDATE, query::TriggerPhase::AFTER_COMMIT});
@ -1080,7 +1092,8 @@ TEST_F(TriggerStoreTest, AnyTriggerAllKeywords) {
for (const auto keyword : keywords) { for (const auto keyword : keywords) {
SCOPED_TRACE(keyword); SCOPED_TRACE(keyword);
EXPECT_NO_THROW(store.AddTrigger(trigger_name, fmt::format("RETURN {}", keyword), {}, event_type, EXPECT_NO_THROW(store.AddTrigger(trigger_name, fmt::format("RETURN {}", keyword), {}, event_type,
query::TriggerPhase::BEFORE_COMMIT, &ast_cache, &*dba, &antlr_lock)); query::TriggerPhase::BEFORE_COMMIT, &ast_cache, &*dba, &antlr_lock,
query::InterpreterConfig::Query{}));
store.DropTrigger(trigger_name); store.DropTrigger(trigger_name);
} }
} }