diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 8ca057c68..6f54cf476 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -46,6 +46,7 @@ set(memgraph_src_files glue/auth.cpp glue/communication.cpp query/common.cpp + query/distributed_interpreter.cpp query/frontend/ast/ast.cpp query/frontend/ast/cypher_main_visitor.cpp query/frontend/semantic/required_privileges.cpp diff --git a/src/database/distributed_graph_db.cpp b/src/database/distributed_graph_db.cpp index 60575aa01..432384605 100644 --- a/src/database/distributed_graph_db.cpp +++ b/src/database/distributed_graph_db.cpp @@ -334,22 +334,27 @@ class DistributedAccessor : public GraphDbAccessor { class MasterAccessor final : public DistributedAccessor { distributed::IndexRpcClients *index_rpc_clients_{nullptr}; + distributed::PullRpcClients *pull_clients_{nullptr}; int worker_id_{0}; public: MasterAccessor(Master *db, distributed::IndexRpcClients *index_rpc_clients, + distributed::PullRpcClients *pull_clients_, DistributedVertexAccessor *vertex_accessor, DistributedEdgeAccessor *edge_accessor) : DistributedAccessor(db, vertex_accessor, edge_accessor), index_rpc_clients_(index_rpc_clients), + pull_clients_(pull_clients_), worker_id_(db->WorkerId()) {} MasterAccessor(Master *db, tx::TransactionId tx_id, distributed::IndexRpcClients *index_rpc_clients, + distributed::PullRpcClients *pull_clients_, DistributedVertexAccessor *vertex_accessor, DistributedEdgeAccessor *edge_accessor) : DistributedAccessor(db, tx_id, vertex_accessor, edge_accessor), index_rpc_clients_(index_rpc_clients), + pull_clients_(pull_clients_), worker_id_(db->WorkerId()) {} void PostCreateIndex(const LabelPropertyIndex::Key &key) override { @@ -401,6 +406,13 @@ class MasterAccessor final : public DistributedAccessor { } } } + + void AdvanceCommand() override { + DistributedAccessor::AdvanceCommand(); + auto tx_id = transaction_id(); + auto futures = pull_clients_->NotifyAllTransactionCommandAdvanced(tx_id); + for (auto &future : futures) future.wait(); + } }; class WorkerAccessor final : public DistributedAccessor { @@ -677,15 +689,15 @@ Master::~Master() { } std::unique_ptr Master::Access() { - return std::make_unique(this, &impl_->index_rpc_clients_, - &impl_->vertex_accessor_, - &impl_->edge_accessor_); + return std::make_unique( + this, &impl_->index_rpc_clients_, &impl_->pull_clients_, + &impl_->vertex_accessor_, &impl_->edge_accessor_); } std::unique_ptr Master::Access(tx::TransactionId tx_id) { return std::make_unique( - this, tx_id, &impl_->index_rpc_clients_, &impl_->vertex_accessor_, - &impl_->edge_accessor_); + this, tx_id, &impl_->index_rpc_clients_, &impl_->pull_clients_, + &impl_->vertex_accessor_, &impl_->edge_accessor_); } Storage &Master::storage() { return *impl_->storage_; } diff --git a/src/database/graph_db_accessor.hpp b/src/database/graph_db_accessor.hpp index 5983a2a53..3a3e386d1 100644 --- a/src/database/graph_db_accessor.hpp +++ b/src/database/graph_db_accessor.hpp @@ -572,7 +572,7 @@ class GraphDbAccessor { tx::TransactionId transaction_id() const; /** Advances transaction's command id by 1. */ - void AdvanceCommand(); + virtual void AdvanceCommand(); /** Commit transaction. */ void Commit(); diff --git a/src/memgraph_bolt.cpp b/src/memgraph_bolt.cpp index f819fb67e..c971ad7de 100644 --- a/src/memgraph_bolt.cpp +++ b/src/memgraph_bolt.cpp @@ -21,6 +21,7 @@ #include "glue/communication.hpp" #include "integrations/kafka/exceptions.hpp" #include "integrations/kafka/streams.hpp" +#include "query/distributed_interpreter.hpp" #include "query/exceptions.hpp" #include "query/interpreter.hpp" #include "query/transaction_engine.hpp" @@ -67,8 +68,8 @@ DECLARE_string(durability_directory); /** Encapsulates Dbms and Interpreter that are passed through the network server * and worker to the session. */ struct SessionData { - database::GraphDb &db; - query::Interpreter interpreter{db}; + database::GraphDb *db{nullptr}; + query::Interpreter *interpreter{nullptr}; auth::Auth auth{ std::experimental::filesystem::path(FLAGS_durability_directory) / "auth"}; }; @@ -298,27 +299,28 @@ int WithInit(int argc, char **argv, void SingleNodeMain() { google::SetUsageMessage("Memgraph single-node database server"); database::SingleNode db; - SessionData session_data{db}; + query::Interpreter interpreter; + SessionData session_data{&db, &interpreter}; auto stream_writer = - [&session_data]( - const std::string &query, - const std::map ¶ms) { - auto dba = session_data.db.Access(); - KafkaResultStream stream; - std::map params_tv; - for (const auto &kv : params) - params_tv.emplace(kv.first, glue::ToTypedValue(kv.second)); - try { - session_data.interpreter(query, *dba, params_tv, false) - .PullAll(stream); - dba->Commit(); - } catch (const query::QueryException &e) { - LOG(WARNING) << "[Kafka] query execution failed with an exception: " - << e.what(); - dba->Abort(); - } - }; + [&session_data]( + const std::string &query, + const std::map ¶ms) { + auto dba = session_data.db->Access(); + KafkaResultStream stream; + std::map params_tv; + for (const auto &kv : params) + params_tv.emplace(kv.first, glue::ToTypedValue(kv.second)); + try { + (*session_data.interpreter)(query, *dba, params_tv, false) + .PullAll(stream); + dba->Commit(); + } catch (const query::QueryException &e) { + LOG(WARNING) << "[Kafka] query execution failed with an exception: " + << e.what(); + dba->Abort(); + } + }; integrations::kafka::Streams kafka_streams{ std::experimental::filesystem::path(FLAGS_durability_directory) / @@ -332,8 +334,8 @@ void SingleNodeMain() { LOG(ERROR) << e.what(); } - session_data.interpreter.auth_ = &session_data.auth; - session_data.interpreter.kafka_streams_ = &kafka_streams; + session_data.interpreter->auth_ = &session_data.auth; + session_data.interpreter->kafka_streams_ = &kafka_streams; ServerContext context; std::string service_name = "Bolt"; @@ -394,19 +396,20 @@ void MasterMain() { google::SetUsageMessage("Memgraph distributed master"); database::Master db; - SessionData session_data{db}; + query::DistributedInterpreter interpreter(&db); + SessionData session_data{&db, &interpreter}; auto stream_writer = [&session_data]( const std::string &query, const std::map ¶ms) { - auto dba = session_data.db.Access(); + auto dba = session_data.db->Access(); KafkaResultStream stream; std::map params_tv; for (const auto &kv : params) params_tv.emplace(kv.first, glue::ToTypedValue(kv.second)); try { - session_data.interpreter(query, *dba, params_tv, false) + (*session_data.interpreter)(query, *dba, params_tv, false) .PullAll(stream); dba->Commit(); } catch (const query::QueryException &e) { @@ -428,7 +431,7 @@ void MasterMain() { LOG(ERROR) << e.what(); } - session_data.interpreter.kafka_streams_ = &kafka_streams; + session_data.interpreter->kafka_streams_ = &kafka_streams; ServerContext context; std::string service_name = "Bolt"; diff --git a/src/query/distributed_interpreter.cpp b/src/query/distributed_interpreter.cpp new file mode 100644 index 000000000..d588ccd57 --- /dev/null +++ b/src/query/distributed_interpreter.cpp @@ -0,0 +1,69 @@ +#include "query/distributed_interpreter.hpp" + +#include "database/distributed_graph_db.hpp" +#include "distributed/plan_dispatcher.hpp" +#include "query/plan/planner.hpp" +#include "query/plan/rule_based_planner.hpp" +#include "query/plan/vertex_count_cache.hpp" + +namespace query { + +namespace { + +class DistributedLogicalPlan final : public LogicalPlan { + public: + DistributedLogicalPlan(plan::DistributedPlan plan, double cost, + distributed::PlanDispatcher *plan_dispatcher) + : plan_(std::move(plan)), plan_dispatcher_(plan_dispatcher), cost_(cost) { + CHECK(plan_dispatcher_); + for (const auto &plan_pair : plan_.worker_plans) { + const auto &plan_id = plan_pair.first; + const auto &worker_plan = plan_pair.second; + plan_dispatcher_->DispatchPlan(plan_id, worker_plan, plan_.symbol_table); + } + } + + ~DistributedLogicalPlan() { + for (const auto &plan_pair : plan_.worker_plans) { + const auto &plan_id = plan_pair.first; + plan_dispatcher_->RemovePlan(plan_id); + } + } + + const plan::LogicalOperator &GetRoot() const override { + return *plan_.master_plan; + } + double GetCost() const override { return cost_; } + const SymbolTable &GetSymbolTable() const override { + return plan_.symbol_table; + } + + private: + plan::DistributedPlan plan_; + distributed::PlanDispatcher *plan_dispatcher_{nullptr}; + double cost_; +}; + +} // namespace + +DistributedInterpreter::DistributedInterpreter(database::Master *db) + : plan_dispatcher_(&db->plan_dispatcher()) {} + +std::unique_ptr DistributedInterpreter::MakeLogicalPlan( + AstStorage ast_storage, Context *context) { + auto vertex_counts = plan::MakeVertexCountCache(context->db_accessor_); + auto planning_context = plan::MakePlanningContext( + ast_storage, context->symbol_table_, vertex_counts); + std::unique_ptr tmp_logical_plan; + double cost; + std::tie(tmp_logical_plan, cost) = plan::MakeLogicalPlan( + planning_context, context->parameters_, FLAGS_query_cost_planner); + auto plan = MakeDistributedPlan(*tmp_logical_plan, context->symbol_table_, + next_plan_id_); + VLOG(10) << "[Interpreter] Created plan for distributed execution " + << next_plan_id_ - 1; + return std::make_unique(std::move(plan), cost, + plan_dispatcher_); +} + +} // namespace query diff --git a/src/query/distributed_interpreter.hpp b/src/query/distributed_interpreter.hpp new file mode 100644 index 000000000..7ab8b83f2 --- /dev/null +++ b/src/query/distributed_interpreter.hpp @@ -0,0 +1,26 @@ +#pragma once + +#include "query/interpreter.hpp" + +namespace database { +class Master; +} + +namespace distributed { +class PlanDispatcher; +} + +namespace query { + +class DistributedInterpreter final : public Interpreter { + public: + DistributedInterpreter(database::Master *db); + + private: + std::unique_ptr MakeLogicalPlan(AstStorage, Context *) override; + + std::atomic next_plan_id_{0}; + distributed::PlanDispatcher *plan_dispatcher_{nullptr}; +}; + +} // namespace query diff --git a/src/query/interpreter.cpp b/src/query/interpreter.cpp index 41c414535..3914e7235 100644 --- a/src/query/interpreter.cpp +++ b/src/query/interpreter.cpp @@ -3,8 +3,6 @@ #include #include -#include "database/distributed_graph_db.hpp" -#include "distributed/plan_dispatcher.hpp" #include "query/exceptions.hpp" #include "query/frontend/ast/cypher_main_visitor.hpp" #include "query/frontend/opencypher/parser.hpp" @@ -22,37 +20,8 @@ DEFINE_VALIDATED_int32(query_plan_cache_ttl, 60, namespace query { -Interpreter::CachedPlan::CachedPlan( - plan::DistributedPlan distributed_plan, double cost, - distributed::PlanDispatcher *plan_dispatcher) - : distributed_plan_(std::move(distributed_plan)), - cost_(cost), - plan_dispatcher_(plan_dispatcher) { - if (plan_dispatcher_) { - for (const auto &plan_pair : distributed_plan_.worker_plans) { - const auto &plan_id = plan_pair.first; - const auto &worker_plan = plan_pair.second; - plan_dispatcher_->DispatchPlan(plan_id, worker_plan, - distributed_plan_.symbol_table); - } - } -} - -Interpreter::CachedPlan::~CachedPlan() { - if (plan_dispatcher_) { - for (const auto &plan_pair : distributed_plan_.worker_plans) { - const auto &plan_id = plan_pair.first; - plan_dispatcher_->RemovePlan(plan_id); - } - } -} - -Interpreter::Interpreter(database::GraphDb &db) - : plan_dispatcher_( - db.type() == database::GraphDb::Type::DISTRIBUTED_MASTER - // TODO: Replace this with virtual call or some other mechanism. - ? &dynamic_cast(&db)->plan_dispatcher() - : nullptr) {} +Interpreter::CachedPlan::CachedPlan(std::unique_ptr plan) + : plan_(std::move(plan)) {} Interpreter::Results Interpreter::operator()( const std::string &query, database::GraphDbAccessor &db_accessor, @@ -101,9 +70,9 @@ Interpreter::Results Interpreter::operator()( } utils::Timer planning_timer; if (!plan) { - plan = - plan_cache_access.insert(stripped.hash(), AstToPlan(ast_storage, ctx)) - .first->second; + plan = plan_cache_access + .insert(stripped.hash(), AstToPlan(std::move(ast_storage), &ctx)) + .first->second; } auto planning_time = planning_timer.Elapsed(); @@ -138,35 +107,11 @@ Interpreter::Results Interpreter::operator()( } std::shared_ptr Interpreter::AstToPlan( - AstStorage &ast_storage, Context &ctx) { - SymbolGenerator symbol_generator(ctx.symbol_table_); + AstStorage ast_storage, Context *ctx) { + SymbolGenerator symbol_generator(ctx->symbol_table_); ast_storage.query()->Accept(symbol_generator); - - std::unique_ptr tmp_logical_plan; - double query_plan_cost_estimation = 0.0; - std::tie(tmp_logical_plan, query_plan_cost_estimation) = - MakeLogicalPlan(ast_storage, ctx); - - DCHECK(ctx.db_accessor_.db().type() != - database::GraphDb::Type::DISTRIBUTED_WORKER); - if (ctx.db_accessor_.db().type() == - database::GraphDb::Type::DISTRIBUTED_MASTER) { - auto distributed_plan = MakeDistributedPlan( - *tmp_logical_plan, ctx.symbol_table_, next_plan_id_); - VLOG(10) << "[Interpreter] Created plan for distributed execution " - << next_plan_id_ - 1; - return std::make_shared(std::move(distributed_plan), - query_plan_cost_estimation, - plan_dispatcher_); - } else { - return std::make_shared( - plan::DistributedPlan{0, - std::move(tmp_logical_plan), - {}, - std::move(ast_storage), - ctx.symbol_table_}, - query_plan_cost_estimation, plan_dispatcher_); - } + return std::make_shared( + MakeLogicalPlan(std::move(ast_storage), ctx)); } AstStorage Interpreter::QueryToAst(const StrippedQuery &stripped, @@ -219,13 +164,38 @@ AstStorage Interpreter::QueryToAst(const StrippedQuery &stripped, return new_ast; } -std::pair, double> -Interpreter::MakeLogicalPlan(AstStorage &ast_storage, Context &context) { - std::unique_ptr logical_plan; - auto vertex_counts = plan::MakeVertexCountCache(context.db_accessor_); - auto planning_context = plan::MakePlanningContext( - ast_storage, context.symbol_table_, vertex_counts); - return plan::MakeLogicalPlan(planning_context, context.parameters_, - FLAGS_query_cost_planner); +class SingleNodeLogicalPlan final : public LogicalPlan { + public: + SingleNodeLogicalPlan(std::unique_ptr root, + double cost, AstStorage storage, + const SymbolTable &symbol_table) + : root_(std::move(root)), + cost_(cost), + storage_(std::move(storage)), + symbol_table_(symbol_table) {} + + const plan::LogicalOperator &GetRoot() const override { return *root_; } + double GetCost() const override { return cost_; } + const SymbolTable &GetSymbolTable() const override { return symbol_table_; } + + private: + std::unique_ptr root_; + double cost_; + AstStorage storage_; + SymbolTable symbol_table_; }; + +std::unique_ptr Interpreter::MakeLogicalPlan( + AstStorage ast_storage, Context *context) { + auto vertex_counts = plan::MakeVertexCountCache(context->db_accessor_); + auto planning_context = plan::MakePlanningContext( + ast_storage, context->symbol_table_, vertex_counts); + std::unique_ptr root; + double cost; + std::tie(root, cost) = plan::MakeLogicalPlan( + planning_context, context->parameters_, FLAGS_query_cost_planner); + return std::make_unique( + std::move(root), cost, std::move(ast_storage), context->symbol_table_); +} + } // namespace query diff --git a/src/query/interpreter.hpp b/src/query/interpreter.hpp index 9dbbb9a7f..e687c97f3 100644 --- a/src/query/interpreter.hpp +++ b/src/query/interpreter.hpp @@ -14,6 +14,7 @@ #include "utils/thread/sync.hpp" #include "utils/timer.hpp" +DECLARE_bool(query_cost_planner); DECLARE_int32(query_plan_cache_ttl); namespace distributed { @@ -30,23 +31,28 @@ class Streams; namespace query { +// TODO: Maybe this should move to query/plan/planner. +/// Interface for accessing the root operator of a logical plan. +class LogicalPlan { + public: + virtual ~LogicalPlan() {} + + virtual const plan::LogicalOperator &GetRoot() const = 0; + virtual double GetCost() const = 0; + virtual const SymbolTable &GetSymbolTable() const = 0; +}; + class Interpreter { private: /// Encapsulates a plan for caching. Takes care of remote (worker) cache /// updating in distributed memgraph. class CachedPlan { public: - /// Creates a cached plan and sends it to all the workers. - CachedPlan(plan::DistributedPlan distributed_plan, double cost, - distributed::PlanDispatcher *plan_dispatcher); + CachedPlan(std::unique_ptr plan); - /// Removes the cached plan from all the workers. - ~CachedPlan(); - - const auto &plan() const { return *distributed_plan_.master_plan; } - const auto &distributed_plan() const { return distributed_plan_; } - double cost() const { return cost_; } - const auto &symbol_table() const { return distributed_plan_.symbol_table; } + const auto &plan() const { return plan_->GetRoot(); } + double cost() const { return plan_->GetCost(); } + const auto &symbol_table() const { return plan_->GetSymbolTable(); } bool IsExpired() const { return cache_timer_.Elapsed() > @@ -54,12 +60,8 @@ class Interpreter { }; private: - plan::DistributedPlan distributed_plan_; - double cost_; + std::unique_ptr plan_; utils::Timer cache_timer_; - - // Optional, only available in a distributed master. - distributed::PlanDispatcher *plan_dispatcher_{nullptr}; }; using PlanCacheT = ConcurrentMap>; @@ -160,12 +162,14 @@ class Interpreter { std::vector privileges_; }; - explicit Interpreter(database::GraphDb &db); + Interpreter() = default; Interpreter(const Interpreter &) = delete; Interpreter &operator=(const Interpreter &) = delete; Interpreter(Interpreter &&) = delete; Interpreter &operator=(Interpreter &&) = delete; + virtual ~Interpreter() {} + /** * Generates an Results object for the parameters. The resulting object * can be Pulled with its results written to an arbitrary stream. @@ -178,10 +182,15 @@ class Interpreter { auth::Auth *auth_ = nullptr; integrations::kafka::Streams *kafka_streams_ = nullptr; + protected: + // high level tree -> logical plan + // AstStorage and SymbolTable may be modified during planning. The created + // LogicalPlan must take ownership of AstStorage and SymbolTable. + virtual std::unique_ptr MakeLogicalPlan(AstStorage, Context *); + private: ConcurrentMap ast_cache_; PlanCacheT plan_cache_; - std::atomic next_plan_id_{0}; // Antlr has singleton instance that is shared between threads. It is // protected by locks inside of antlr. Unfortunately, they are not protected // in a very good way. Once we have antlr version without race conditions we @@ -190,18 +199,10 @@ class Interpreter { // so this lock probably won't impact performance much... utils::SpinLock antlr_lock_; - // Optional, not null only in a distributed master. - distributed::PlanDispatcher *plan_dispatcher_{nullptr}; - // high level tree -> CachedPlan - std::shared_ptr AstToPlan(AstStorage &ast_storage, Context &ctx); + std::shared_ptr AstToPlan(AstStorage ast_storage, Context *ctx); // stripped query -> high level tree AstStorage QueryToAst(const StrippedQuery &stripped, Context &ctx); - - // high level tree -> (logical plan, plan cost) - // AstStorage and SymbolTable may be modified during planning. - std::pair, double> MakeLogicalPlan( - AstStorage &, Context &); }; } // namespace query diff --git a/src/query/repl.cpp b/src/query/repl.cpp index 9ad528abc..45a41fbd2 100644 --- a/src/query/repl.cpp +++ b/src/query/repl.cpp @@ -48,9 +48,7 @@ std::string ReadLine(const char *prompt) { #endif // HAS_READLINE -void query::Repl(database::GraphDb &db) { - query::Interpreter interpeter{db}; - +void query::Repl(database::GraphDb *db, query::Interpreter *interpreter) { std::cout << "Welcome to *Awesome* Memgraph Read Evaluate Print Loop (AM-REPL)" << std::endl; @@ -63,9 +61,9 @@ void query::Repl(database::GraphDb &db) { // regular cypher queries try { - auto dba = db.Access(); + auto dba = db->Access(); ResultStreamFaker stream; - auto results = interpeter(command, *dba, {}, false); + auto results = (*interpreter)(command, *dba, {}, false); stream.Header(results.header()); results.PullAll(stream); stream.Summary(results.summary()); diff --git a/src/query/repl.hpp b/src/query/repl.hpp index 0eefbe66f..710ba52f9 100644 --- a/src/query/repl.hpp +++ b/src/query/repl.hpp @@ -4,11 +4,13 @@ namespace query { +class Interpreter; + /** * Read Evaluate Print Loop, for interacting with a database (the database in * the given database::GraphDb). Immediately starts the user-input loop and * interprets the entered queries. */ -void Repl(database::GraphDb &); +void Repl(database::GraphDb *, query::Interpreter *); } // namespace query diff --git a/src/query/transaction_engine.hpp b/src/query/transaction_engine.hpp index 5b3a0f0c2..73388b84e 100644 --- a/src/query/transaction_engine.hpp +++ b/src/query/transaction_engine.hpp @@ -12,7 +12,7 @@ namespace query { class TransactionEngine final { public: - TransactionEngine(database::GraphDb &db, Interpreter &interpreter) + TransactionEngine(database::GraphDb *db, Interpreter *interpreter) : db_(db), interpreter_(interpreter) {} ~TransactionEngine() { Abort(); } @@ -59,12 +59,12 @@ class TransactionEngine final { if (in_explicit_transaction_ && db_accessor_) AdvanceCommand(); // Create a DB accessor if we don't yet have one. - if (!db_accessor_) db_accessor_ = db_.Access(); + if (!db_accessor_) db_accessor_ = db_->Access(); // Interpret the query and return the headers. try { - results_.emplace( - interpreter_(query, *db_accessor_, params, in_explicit_transaction_)); + results_.emplace((*interpreter_)(query, *db_accessor_, params, + in_explicit_transaction_)); return {results_->header(), results_->privileges()}; } catch (const utils::BasicException &) { AbortCommand(); @@ -100,8 +100,8 @@ class TransactionEngine final { } private: - database::GraphDb &db_; - Interpreter &interpreter_; + database::GraphDb *db_{nullptr}; + Interpreter *interpreter_{nullptr}; std::unique_ptr db_accessor_; // The `query::Interpreter::Results` object MUST be destroyed before the // `database::GraphDbAccessor` is destroyed because the `Results` object holds @@ -122,14 +122,6 @@ class TransactionEngine final { results_ = std::experimental::nullopt; if (!db_accessor_) return; db_accessor_->AdvanceCommand(); - // TODO: this logic shouldn't be here! - if (db_.type() == database::GraphDb::Type::DISTRIBUTED_MASTER) { - auto *master_db = dynamic_cast(&db_); - auto tx_id = db_accessor_->transaction_id(); - auto futures = - master_db->pull_clients().NotifyAllTransactionCommandAdvanced(tx_id); - for (auto &future : futures) future.wait(); - } } void AbortCommand() { diff --git a/tests/benchmark/expansion.cpp b/tests/benchmark/expansion.cpp index abce0359d..0961fcf31 100644 --- a/tests/benchmark/expansion.cpp +++ b/tests/benchmark/expansion.cpp @@ -13,11 +13,10 @@ class ExpansionBenchFixture : public benchmark::Fixture { // GraphDb shouldn't be global constructed/destructed. See // documentation in database/graph_db.hpp for details. std::experimental::optional db_; - std::experimental::optional interpreter_; + query::Interpreter interpreter_; void SetUp(const benchmark::State &state) override { db_.emplace(); - interpreter_.emplace(db_.value()); auto dba = db_->Access(); for (int i = 0; i < state.range(0); i++) dba->InsertVertex(); @@ -36,11 +35,10 @@ class ExpansionBenchFixture : public benchmark::Fixture { auto dba = db_->Access(); for (auto vertex : dba->Vertices(false)) dba->DetachRemoveVertex(vertex); dba->Commit(); - interpreter_ = std::experimental::nullopt; db_ = std::experimental::nullopt; } - auto &interpreter() { return *interpreter_; } + auto &interpreter() { return interpreter_; } }; BENCHMARK_DEFINE_F(ExpansionBenchFixture, Match)(benchmark::State &state) { diff --git a/tests/manual/distributed_common.hpp b/tests/manual/distributed_common.hpp index 1ad963540..809c358c1 100644 --- a/tests/manual/distributed_common.hpp +++ b/tests/manual/distributed_common.hpp @@ -7,7 +7,7 @@ #include "database/distributed_graph_db.hpp" #include "database/graph_db_accessor.hpp" #include "glue/communication.hpp" -#include "query/interpreter.hpp" +#include "query/distributed_interpreter.hpp" #include "query/typed_value.hpp" class WorkerInThread { @@ -33,7 +33,8 @@ class Cluster { database::Config masterconfig; masterconfig.master_endpoint = {kLocal, 0}; master_ = std::make_unique(masterconfig); - interpreter_ = std::make_unique(*master_); + interpreter_ = + std::make_unique(master_.get()); std::this_thread::sleep_for(kInitTime); auto worker_config = [this](int worker_id) { @@ -66,7 +67,7 @@ class Cluster { std::map params = {}) { auto dba = master_->Access(); ResultStreamFaker result; - interpreter_->operator()(query, *dba, params, false).PullAll(result); + (*interpreter_)(query, *dba, params, false).PullAll(result); dba->Commit(); return result.GetResults(); }; @@ -74,7 +75,7 @@ class Cluster { private: std::unique_ptr master_; std::vector> workers_; - std::unique_ptr interpreter_; + std::unique_ptr interpreter_; }; void CheckResults( diff --git a/tests/manual/distributed_repl.cpp b/tests/manual/distributed_repl.cpp index 08b69a791..41c96dee7 100644 --- a/tests/manual/distributed_repl.cpp +++ b/tests/manual/distributed_repl.cpp @@ -7,7 +7,7 @@ #include #include "database/distributed_graph_db.hpp" -#include "query/interpreter.hpp" +#include "query/distributed_interpreter.hpp" #include "query/repl.hpp" #include "utils/flag_validation.hpp" @@ -54,7 +54,10 @@ int main(int argc, char *argv[]) { } // Start the REPL - query::Repl(*master); + { + query::DistributedInterpreter interpreter(master.get()); + query::Repl(master.get(), &interpreter); + } master = nullptr; return 0; diff --git a/tests/manual/repl.cpp b/tests/manual/repl.cpp index 445aaa25f..ac7690319 100644 --- a/tests/manual/repl.cpp +++ b/tests/manual/repl.cpp @@ -79,6 +79,7 @@ int main(int argc, char *argv[]) { std::cout << "Generating graph..." << std::endl; // fill_db; random_generate(db, node_count, edge_count); - query::Repl(db); + query::Interpreter interpreter; + query::Repl(&db, &interpreter); return 0; } diff --git a/tests/manual/single_query.cpp b/tests/manual/single_query.cpp index 21b7397bc..18bad3ffd 100644 --- a/tests/manual/single_query.cpp +++ b/tests/manual/single_query.cpp @@ -14,7 +14,7 @@ int main(int argc, char *argv[]) { database::SingleNode db; auto dba = db.Access(); ResultStreamFaker stream; - auto results = query::Interpreter{db}(argv[1], *dba, {}, false); + auto results = query::Interpreter()(argv[1], *dba, {}, false); stream.Header(results.header()); results.PullAll(stream); stream.Summary(results.summary()); diff --git a/tests/unit/database_transaction_timeout.cpp b/tests/unit/database_transaction_timeout.cpp index f1ec5ef15..d4db622cd 100644 --- a/tests/unit/database_transaction_timeout.cpp +++ b/tests/unit/database_transaction_timeout.cpp @@ -10,7 +10,7 @@ DECLARE_int32(query_execution_time_sec); TEST(TransactionTimeout, TransactionTimeout) { FLAGS_query_execution_time_sec = 3; database::SingleNode db; - query::Interpreter interpreter{db}; + query::Interpreter interpreter; auto interpret = [&](auto &dba, const std::string &query) { ResultStreamFaker stream; interpreter(query, dba, {}, false).PullAll(stream); diff --git a/tests/unit/distributed_interpretation.cpp b/tests/unit/distributed_interpretation.cpp index dabac4d39..930eefbd7 100644 --- a/tests/unit/distributed_interpretation.cpp +++ b/tests/unit/distributed_interpretation.cpp @@ -10,7 +10,7 @@ #include "distributed/plan_dispatcher.hpp" #include "distributed/pull_rpc_clients.hpp" #include "distributed_common.hpp" -#include "query/interpreter.hpp" +#include "query/distributed_interpreter.hpp" #include "query_common.hpp" #include "query_plan_common.hpp" #include "utils/timer.hpp" @@ -29,7 +29,7 @@ class DistributedInterpretationTest : public DistributedGraphDbTest { void SetUp() override { DistributedGraphDbTest::SetUp(); - interpreter_.emplace(master()); + interpreter_.emplace(&master()); } void TearDown() override { @@ -55,7 +55,7 @@ class DistributedInterpretationTest : public DistributedGraphDbTest { } private: - std::experimental::optional interpreter_; + std::experimental::optional interpreter_; }; TEST_F(DistributedInterpretationTest, PullTest) { diff --git a/tests/unit/interpreter.cpp b/tests/unit/interpreter.cpp index b342aa2ec..19c0d201b 100644 --- a/tests/unit/interpreter.cpp +++ b/tests/unit/interpreter.cpp @@ -15,7 +15,7 @@ class InterpreterTest : public ::testing::Test { protected: database::SingleNode db_; - query::Interpreter interpreter_{db_}; + query::Interpreter interpreter_; auto Interpret(const std::string &query, const std::map ¶ms = {}) { diff --git a/tests/unit/query_plan_edge_cases.cpp b/tests/unit/query_plan_edge_cases.cpp index 41c1aa9da..735fa4eef 100644 --- a/tests/unit/query_plan_edge_cases.cpp +++ b/tests/unit/query_plan_edge_cases.cpp @@ -40,7 +40,7 @@ class QueryExecution : public testing::Test { * Does NOT commit the transaction */ auto Execute(const std::string &query) { ResultStreamFaker stream; - auto results = query::Interpreter{*db_}(query, *dba_, {}, false); + auto results = query::Interpreter()(query, *dba_, {}, false); stream.Header(results.header()); results.PullAll(stream); stream.Summary(results.summary());