Extract distributed interpretation out of Interpreter

Reviewers: mtomic, mferencevic, msantl, buda

Reviewed By: buda

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1560
This commit is contained in:
Teon Banek 2018-08-24 10:12:04 +02:00
parent e7cde4b4ef
commit 6427902920
20 changed files with 246 additions and 169 deletions

View File

@ -46,6 +46,7 @@ set(memgraph_src_files
glue/auth.cpp
glue/communication.cpp
query/common.cpp
query/distributed_interpreter.cpp
query/frontend/ast/ast.cpp
query/frontend/ast/cypher_main_visitor.cpp
query/frontend/semantic/required_privileges.cpp

View File

@ -334,22 +334,27 @@ class DistributedAccessor : public GraphDbAccessor {
class MasterAccessor final : public DistributedAccessor {
distributed::IndexRpcClients *index_rpc_clients_{nullptr};
distributed::PullRpcClients *pull_clients_{nullptr};
int worker_id_{0};
public:
MasterAccessor(Master *db, distributed::IndexRpcClients *index_rpc_clients,
distributed::PullRpcClients *pull_clients_,
DistributedVertexAccessor *vertex_accessor,
DistributedEdgeAccessor *edge_accessor)
: DistributedAccessor(db, vertex_accessor, edge_accessor),
index_rpc_clients_(index_rpc_clients),
pull_clients_(pull_clients_),
worker_id_(db->WorkerId()) {}
MasterAccessor(Master *db, tx::TransactionId tx_id,
distributed::IndexRpcClients *index_rpc_clients,
distributed::PullRpcClients *pull_clients_,
DistributedVertexAccessor *vertex_accessor,
DistributedEdgeAccessor *edge_accessor)
: DistributedAccessor(db, tx_id, vertex_accessor, edge_accessor),
index_rpc_clients_(index_rpc_clients),
pull_clients_(pull_clients_),
worker_id_(db->WorkerId()) {}
void PostCreateIndex(const LabelPropertyIndex::Key &key) override {
@ -401,6 +406,13 @@ class MasterAccessor final : public DistributedAccessor {
}
}
}
void AdvanceCommand() override {
DistributedAccessor::AdvanceCommand();
auto tx_id = transaction_id();
auto futures = pull_clients_->NotifyAllTransactionCommandAdvanced(tx_id);
for (auto &future : futures) future.wait();
}
};
class WorkerAccessor final : public DistributedAccessor {
@ -677,15 +689,15 @@ Master::~Master() {
}
std::unique_ptr<GraphDbAccessor> Master::Access() {
return std::make_unique<MasterAccessor>(this, &impl_->index_rpc_clients_,
&impl_->vertex_accessor_,
&impl_->edge_accessor_);
return std::make_unique<MasterAccessor>(
this, &impl_->index_rpc_clients_, &impl_->pull_clients_,
&impl_->vertex_accessor_, &impl_->edge_accessor_);
}
std::unique_ptr<GraphDbAccessor> Master::Access(tx::TransactionId tx_id) {
return std::make_unique<MasterAccessor>(
this, tx_id, &impl_->index_rpc_clients_, &impl_->vertex_accessor_,
&impl_->edge_accessor_);
this, tx_id, &impl_->index_rpc_clients_, &impl_->pull_clients_,
&impl_->vertex_accessor_, &impl_->edge_accessor_);
}
Storage &Master::storage() { return *impl_->storage_; }

View File

@ -572,7 +572,7 @@ class GraphDbAccessor {
tx::TransactionId transaction_id() const;
/** Advances transaction's command id by 1. */
void AdvanceCommand();
virtual void AdvanceCommand();
/** Commit transaction. */
void Commit();

View File

@ -21,6 +21,7 @@
#include "glue/communication.hpp"
#include "integrations/kafka/exceptions.hpp"
#include "integrations/kafka/streams.hpp"
#include "query/distributed_interpreter.hpp"
#include "query/exceptions.hpp"
#include "query/interpreter.hpp"
#include "query/transaction_engine.hpp"
@ -67,8 +68,8 @@ DECLARE_string(durability_directory);
/** Encapsulates Dbms and Interpreter that are passed through the network server
* and worker to the session. */
struct SessionData {
database::GraphDb &db;
query::Interpreter interpreter{db};
database::GraphDb *db{nullptr};
query::Interpreter *interpreter{nullptr};
auth::Auth auth{
std::experimental::filesystem::path(FLAGS_durability_directory) / "auth"};
};
@ -298,27 +299,28 @@ int WithInit(int argc, char **argv,
void SingleNodeMain() {
google::SetUsageMessage("Memgraph single-node database server");
database::SingleNode db;
SessionData session_data{db};
query::Interpreter interpreter;
SessionData session_data{&db, &interpreter};
auto stream_writer =
[&session_data](
const std::string &query,
const std::map<std::string, communication::bolt::Value> &params) {
auto dba = session_data.db.Access();
KafkaResultStream stream;
std::map<std::string, query::TypedValue> params_tv;
for (const auto &kv : params)
params_tv.emplace(kv.first, glue::ToTypedValue(kv.second));
try {
session_data.interpreter(query, *dba, params_tv, false)
.PullAll(stream);
dba->Commit();
} catch (const query::QueryException &e) {
LOG(WARNING) << "[Kafka] query execution failed with an exception: "
<< e.what();
dba->Abort();
}
};
[&session_data](
const std::string &query,
const std::map<std::string, communication::bolt::Value> &params) {
auto dba = session_data.db->Access();
KafkaResultStream stream;
std::map<std::string, query::TypedValue> params_tv;
for (const auto &kv : params)
params_tv.emplace(kv.first, glue::ToTypedValue(kv.second));
try {
(*session_data.interpreter)(query, *dba, params_tv, false)
.PullAll(stream);
dba->Commit();
} catch (const query::QueryException &e) {
LOG(WARNING) << "[Kafka] query execution failed with an exception: "
<< e.what();
dba->Abort();
}
};
integrations::kafka::Streams kafka_streams{
std::experimental::filesystem::path(FLAGS_durability_directory) /
@ -332,8 +334,8 @@ void SingleNodeMain() {
LOG(ERROR) << e.what();
}
session_data.interpreter.auth_ = &session_data.auth;
session_data.interpreter.kafka_streams_ = &kafka_streams;
session_data.interpreter->auth_ = &session_data.auth;
session_data.interpreter->kafka_streams_ = &kafka_streams;
ServerContext context;
std::string service_name = "Bolt";
@ -394,19 +396,20 @@ void MasterMain() {
google::SetUsageMessage("Memgraph distributed master");
database::Master db;
SessionData session_data{db};
query::DistributedInterpreter interpreter(&db);
SessionData session_data{&db, &interpreter};
auto stream_writer =
[&session_data](
const std::string &query,
const std::map<std::string, communication::bolt::Value> &params) {
auto dba = session_data.db.Access();
auto dba = session_data.db->Access();
KafkaResultStream stream;
std::map<std::string, query::TypedValue> params_tv;
for (const auto &kv : params)
params_tv.emplace(kv.first, glue::ToTypedValue(kv.second));
try {
session_data.interpreter(query, *dba, params_tv, false)
(*session_data.interpreter)(query, *dba, params_tv, false)
.PullAll(stream);
dba->Commit();
} catch (const query::QueryException &e) {
@ -428,7 +431,7 @@ void MasterMain() {
LOG(ERROR) << e.what();
}
session_data.interpreter.kafka_streams_ = &kafka_streams;
session_data.interpreter->kafka_streams_ = &kafka_streams;
ServerContext context;
std::string service_name = "Bolt";

View File

@ -0,0 +1,69 @@
#include "query/distributed_interpreter.hpp"
#include "database/distributed_graph_db.hpp"
#include "distributed/plan_dispatcher.hpp"
#include "query/plan/planner.hpp"
#include "query/plan/rule_based_planner.hpp"
#include "query/plan/vertex_count_cache.hpp"
namespace query {
namespace {
class DistributedLogicalPlan final : public LogicalPlan {
public:
DistributedLogicalPlan(plan::DistributedPlan plan, double cost,
distributed::PlanDispatcher *plan_dispatcher)
: plan_(std::move(plan)), plan_dispatcher_(plan_dispatcher), cost_(cost) {
CHECK(plan_dispatcher_);
for (const auto &plan_pair : plan_.worker_plans) {
const auto &plan_id = plan_pair.first;
const auto &worker_plan = plan_pair.second;
plan_dispatcher_->DispatchPlan(plan_id, worker_plan, plan_.symbol_table);
}
}
~DistributedLogicalPlan() {
for (const auto &plan_pair : plan_.worker_plans) {
const auto &plan_id = plan_pair.first;
plan_dispatcher_->RemovePlan(plan_id);
}
}
const plan::LogicalOperator &GetRoot() const override {
return *plan_.master_plan;
}
double GetCost() const override { return cost_; }
const SymbolTable &GetSymbolTable() const override {
return plan_.symbol_table;
}
private:
plan::DistributedPlan plan_;
distributed::PlanDispatcher *plan_dispatcher_{nullptr};
double cost_;
};
} // namespace
DistributedInterpreter::DistributedInterpreter(database::Master *db)
: plan_dispatcher_(&db->plan_dispatcher()) {}
std::unique_ptr<LogicalPlan> DistributedInterpreter::MakeLogicalPlan(
AstStorage ast_storage, Context *context) {
auto vertex_counts = plan::MakeVertexCountCache(context->db_accessor_);
auto planning_context = plan::MakePlanningContext(
ast_storage, context->symbol_table_, vertex_counts);
std::unique_ptr<plan::LogicalOperator> tmp_logical_plan;
double cost;
std::tie(tmp_logical_plan, cost) = plan::MakeLogicalPlan(
planning_context, context->parameters_, FLAGS_query_cost_planner);
auto plan = MakeDistributedPlan(*tmp_logical_plan, context->symbol_table_,
next_plan_id_);
VLOG(10) << "[Interpreter] Created plan for distributed execution "
<< next_plan_id_ - 1;
return std::make_unique<DistributedLogicalPlan>(std::move(plan), cost,
plan_dispatcher_);
}
} // namespace query

View File

@ -0,0 +1,26 @@
#pragma once
#include "query/interpreter.hpp"
namespace database {
class Master;
}
namespace distributed {
class PlanDispatcher;
}
namespace query {
class DistributedInterpreter final : public Interpreter {
public:
DistributedInterpreter(database::Master *db);
private:
std::unique_ptr<LogicalPlan> MakeLogicalPlan(AstStorage, Context *) override;
std::atomic<int64_t> next_plan_id_{0};
distributed::PlanDispatcher *plan_dispatcher_{nullptr};
};
} // namespace query

View File

@ -3,8 +3,6 @@
#include <glog/logging.h>
#include <limits>
#include "database/distributed_graph_db.hpp"
#include "distributed/plan_dispatcher.hpp"
#include "query/exceptions.hpp"
#include "query/frontend/ast/cypher_main_visitor.hpp"
#include "query/frontend/opencypher/parser.hpp"
@ -22,37 +20,8 @@ DEFINE_VALIDATED_int32(query_plan_cache_ttl, 60,
namespace query {
Interpreter::CachedPlan::CachedPlan(
plan::DistributedPlan distributed_plan, double cost,
distributed::PlanDispatcher *plan_dispatcher)
: distributed_plan_(std::move(distributed_plan)),
cost_(cost),
plan_dispatcher_(plan_dispatcher) {
if (plan_dispatcher_) {
for (const auto &plan_pair : distributed_plan_.worker_plans) {
const auto &plan_id = plan_pair.first;
const auto &worker_plan = plan_pair.second;
plan_dispatcher_->DispatchPlan(plan_id, worker_plan,
distributed_plan_.symbol_table);
}
}
}
Interpreter::CachedPlan::~CachedPlan() {
if (plan_dispatcher_) {
for (const auto &plan_pair : distributed_plan_.worker_plans) {
const auto &plan_id = plan_pair.first;
plan_dispatcher_->RemovePlan(plan_id);
}
}
}
Interpreter::Interpreter(database::GraphDb &db)
: plan_dispatcher_(
db.type() == database::GraphDb::Type::DISTRIBUTED_MASTER
// TODO: Replace this with virtual call or some other mechanism.
? &dynamic_cast<database::Master *>(&db)->plan_dispatcher()
: nullptr) {}
Interpreter::CachedPlan::CachedPlan(std::unique_ptr<LogicalPlan> plan)
: plan_(std::move(plan)) {}
Interpreter::Results Interpreter::operator()(
const std::string &query, database::GraphDbAccessor &db_accessor,
@ -101,9 +70,9 @@ Interpreter::Results Interpreter::operator()(
}
utils::Timer planning_timer;
if (!plan) {
plan =
plan_cache_access.insert(stripped.hash(), AstToPlan(ast_storage, ctx))
.first->second;
plan = plan_cache_access
.insert(stripped.hash(), AstToPlan(std::move(ast_storage), &ctx))
.first->second;
}
auto planning_time = planning_timer.Elapsed();
@ -138,35 +107,11 @@ Interpreter::Results Interpreter::operator()(
}
std::shared_ptr<Interpreter::CachedPlan> Interpreter::AstToPlan(
AstStorage &ast_storage, Context &ctx) {
SymbolGenerator symbol_generator(ctx.symbol_table_);
AstStorage ast_storage, Context *ctx) {
SymbolGenerator symbol_generator(ctx->symbol_table_);
ast_storage.query()->Accept(symbol_generator);
std::unique_ptr<plan::LogicalOperator> tmp_logical_plan;
double query_plan_cost_estimation = 0.0;
std::tie(tmp_logical_plan, query_plan_cost_estimation) =
MakeLogicalPlan(ast_storage, ctx);
DCHECK(ctx.db_accessor_.db().type() !=
database::GraphDb::Type::DISTRIBUTED_WORKER);
if (ctx.db_accessor_.db().type() ==
database::GraphDb::Type::DISTRIBUTED_MASTER) {
auto distributed_plan = MakeDistributedPlan(
*tmp_logical_plan, ctx.symbol_table_, next_plan_id_);
VLOG(10) << "[Interpreter] Created plan for distributed execution "
<< next_plan_id_ - 1;
return std::make_shared<CachedPlan>(std::move(distributed_plan),
query_plan_cost_estimation,
plan_dispatcher_);
} else {
return std::make_shared<CachedPlan>(
plan::DistributedPlan{0,
std::move(tmp_logical_plan),
{},
std::move(ast_storage),
ctx.symbol_table_},
query_plan_cost_estimation, plan_dispatcher_);
}
return std::make_shared<CachedPlan>(
MakeLogicalPlan(std::move(ast_storage), ctx));
}
AstStorage Interpreter::QueryToAst(const StrippedQuery &stripped,
@ -219,13 +164,38 @@ AstStorage Interpreter::QueryToAst(const StrippedQuery &stripped,
return new_ast;
}
std::pair<std::unique_ptr<plan::LogicalOperator>, double>
Interpreter::MakeLogicalPlan(AstStorage &ast_storage, Context &context) {
std::unique_ptr<plan::LogicalOperator> logical_plan;
auto vertex_counts = plan::MakeVertexCountCache(context.db_accessor_);
auto planning_context = plan::MakePlanningContext(
ast_storage, context.symbol_table_, vertex_counts);
return plan::MakeLogicalPlan(planning_context, context.parameters_,
FLAGS_query_cost_planner);
class SingleNodeLogicalPlan final : public LogicalPlan {
public:
SingleNodeLogicalPlan(std::unique_ptr<plan::LogicalOperator> root,
double cost, AstStorage storage,
const SymbolTable &symbol_table)
: root_(std::move(root)),
cost_(cost),
storage_(std::move(storage)),
symbol_table_(symbol_table) {}
const plan::LogicalOperator &GetRoot() const override { return *root_; }
double GetCost() const override { return cost_; }
const SymbolTable &GetSymbolTable() const override { return symbol_table_; }
private:
std::unique_ptr<plan::LogicalOperator> root_;
double cost_;
AstStorage storage_;
SymbolTable symbol_table_;
};
std::unique_ptr<LogicalPlan> Interpreter::MakeLogicalPlan(
AstStorage ast_storage, Context *context) {
auto vertex_counts = plan::MakeVertexCountCache(context->db_accessor_);
auto planning_context = plan::MakePlanningContext(
ast_storage, context->symbol_table_, vertex_counts);
std::unique_ptr<plan::LogicalOperator> root;
double cost;
std::tie(root, cost) = plan::MakeLogicalPlan(
planning_context, context->parameters_, FLAGS_query_cost_planner);
return std::make_unique<SingleNodeLogicalPlan>(
std::move(root), cost, std::move(ast_storage), context->symbol_table_);
}
} // namespace query

View File

@ -14,6 +14,7 @@
#include "utils/thread/sync.hpp"
#include "utils/timer.hpp"
DECLARE_bool(query_cost_planner);
DECLARE_int32(query_plan_cache_ttl);
namespace distributed {
@ -30,23 +31,28 @@ class Streams;
namespace query {
// TODO: Maybe this should move to query/plan/planner.
/// Interface for accessing the root operator of a logical plan.
class LogicalPlan {
public:
virtual ~LogicalPlan() {}
virtual const plan::LogicalOperator &GetRoot() const = 0;
virtual double GetCost() const = 0;
virtual const SymbolTable &GetSymbolTable() const = 0;
};
class Interpreter {
private:
/// Encapsulates a plan for caching. Takes care of remote (worker) cache
/// updating in distributed memgraph.
class CachedPlan {
public:
/// Creates a cached plan and sends it to all the workers.
CachedPlan(plan::DistributedPlan distributed_plan, double cost,
distributed::PlanDispatcher *plan_dispatcher);
CachedPlan(std::unique_ptr<LogicalPlan> plan);
/// Removes the cached plan from all the workers.
~CachedPlan();
const auto &plan() const { return *distributed_plan_.master_plan; }
const auto &distributed_plan() const { return distributed_plan_; }
double cost() const { return cost_; }
const auto &symbol_table() const { return distributed_plan_.symbol_table; }
const auto &plan() const { return plan_->GetRoot(); }
double cost() const { return plan_->GetCost(); }
const auto &symbol_table() const { return plan_->GetSymbolTable(); }
bool IsExpired() const {
return cache_timer_.Elapsed() >
@ -54,12 +60,8 @@ class Interpreter {
};
private:
plan::DistributedPlan distributed_plan_;
double cost_;
std::unique_ptr<LogicalPlan> plan_;
utils::Timer cache_timer_;
// Optional, only available in a distributed master.
distributed::PlanDispatcher *plan_dispatcher_{nullptr};
};
using PlanCacheT = ConcurrentMap<HashType, std::shared_ptr<CachedPlan>>;
@ -160,12 +162,14 @@ class Interpreter {
std::vector<AuthQuery::Privilege> privileges_;
};
explicit Interpreter(database::GraphDb &db);
Interpreter() = default;
Interpreter(const Interpreter &) = delete;
Interpreter &operator=(const Interpreter &) = delete;
Interpreter(Interpreter &&) = delete;
Interpreter &operator=(Interpreter &&) = delete;
virtual ~Interpreter() {}
/**
* Generates an Results object for the parameters. The resulting object
* can be Pulled with its results written to an arbitrary stream.
@ -178,10 +182,15 @@ class Interpreter {
auth::Auth *auth_ = nullptr;
integrations::kafka::Streams *kafka_streams_ = nullptr;
protected:
// high level tree -> logical plan
// AstStorage and SymbolTable may be modified during planning. The created
// LogicalPlan must take ownership of AstStorage and SymbolTable.
virtual std::unique_ptr<LogicalPlan> MakeLogicalPlan(AstStorage, Context *);
private:
ConcurrentMap<HashType, AstStorage> ast_cache_;
PlanCacheT plan_cache_;
std::atomic<int64_t> next_plan_id_{0};
// Antlr has singleton instance that is shared between threads. It is
// protected by locks inside of antlr. Unfortunately, they are not protected
// in a very good way. Once we have antlr version without race conditions we
@ -190,18 +199,10 @@ class Interpreter {
// so this lock probably won't impact performance much...
utils::SpinLock antlr_lock_;
// Optional, not null only in a distributed master.
distributed::PlanDispatcher *plan_dispatcher_{nullptr};
// high level tree -> CachedPlan
std::shared_ptr<CachedPlan> AstToPlan(AstStorage &ast_storage, Context &ctx);
std::shared_ptr<CachedPlan> AstToPlan(AstStorage ast_storage, Context *ctx);
// stripped query -> high level tree
AstStorage QueryToAst(const StrippedQuery &stripped, Context &ctx);
// high level tree -> (logical plan, plan cost)
// AstStorage and SymbolTable may be modified during planning.
std::pair<std::unique_ptr<plan::LogicalOperator>, double> MakeLogicalPlan(
AstStorage &, Context &);
};
} // namespace query

View File

@ -48,9 +48,7 @@ std::string ReadLine(const char *prompt) {
#endif // HAS_READLINE
void query::Repl(database::GraphDb &db) {
query::Interpreter interpeter{db};
void query::Repl(database::GraphDb *db, query::Interpreter *interpreter) {
std::cout
<< "Welcome to *Awesome* Memgraph Read Evaluate Print Loop (AM-REPL)"
<< std::endl;
@ -63,9 +61,9 @@ void query::Repl(database::GraphDb &db) {
// regular cypher queries
try {
auto dba = db.Access();
auto dba = db->Access();
ResultStreamFaker<query::TypedValue> stream;
auto results = interpeter(command, *dba, {}, false);
auto results = (*interpreter)(command, *dba, {}, false);
stream.Header(results.header());
results.PullAll(stream);
stream.Summary(results.summary());

View File

@ -4,11 +4,13 @@
namespace query {
class Interpreter;
/**
* Read Evaluate Print Loop, for interacting with a database (the database in
* the given database::GraphDb). Immediately starts the user-input loop and
* interprets the entered queries.
*/
void Repl(database::GraphDb &);
void Repl(database::GraphDb *, query::Interpreter *);
} // namespace query

View File

@ -12,7 +12,7 @@ namespace query {
class TransactionEngine final {
public:
TransactionEngine(database::GraphDb &db, Interpreter &interpreter)
TransactionEngine(database::GraphDb *db, Interpreter *interpreter)
: db_(db), interpreter_(interpreter) {}
~TransactionEngine() { Abort(); }
@ -59,12 +59,12 @@ class TransactionEngine final {
if (in_explicit_transaction_ && db_accessor_) AdvanceCommand();
// Create a DB accessor if we don't yet have one.
if (!db_accessor_) db_accessor_ = db_.Access();
if (!db_accessor_) db_accessor_ = db_->Access();
// Interpret the query and return the headers.
try {
results_.emplace(
interpreter_(query, *db_accessor_, params, in_explicit_transaction_));
results_.emplace((*interpreter_)(query, *db_accessor_, params,
in_explicit_transaction_));
return {results_->header(), results_->privileges()};
} catch (const utils::BasicException &) {
AbortCommand();
@ -100,8 +100,8 @@ class TransactionEngine final {
}
private:
database::GraphDb &db_;
Interpreter &interpreter_;
database::GraphDb *db_{nullptr};
Interpreter *interpreter_{nullptr};
std::unique_ptr<database::GraphDbAccessor> db_accessor_;
// The `query::Interpreter::Results` object MUST be destroyed before the
// `database::GraphDbAccessor` is destroyed because the `Results` object holds
@ -122,14 +122,6 @@ class TransactionEngine final {
results_ = std::experimental::nullopt;
if (!db_accessor_) return;
db_accessor_->AdvanceCommand();
// TODO: this logic shouldn't be here!
if (db_.type() == database::GraphDb::Type::DISTRIBUTED_MASTER) {
auto *master_db = dynamic_cast<database::Master *>(&db_);
auto tx_id = db_accessor_->transaction_id();
auto futures =
master_db->pull_clients().NotifyAllTransactionCommandAdvanced(tx_id);
for (auto &future : futures) future.wait();
}
}
void AbortCommand() {

View File

@ -13,11 +13,10 @@ class ExpansionBenchFixture : public benchmark::Fixture {
// GraphDb shouldn't be global constructed/destructed. See
// documentation in database/graph_db.hpp for details.
std::experimental::optional<database::SingleNode> db_;
std::experimental::optional<query::Interpreter> interpreter_;
query::Interpreter interpreter_;
void SetUp(const benchmark::State &state) override {
db_.emplace();
interpreter_.emplace(db_.value());
auto dba = db_->Access();
for (int i = 0; i < state.range(0); i++) dba->InsertVertex();
@ -36,11 +35,10 @@ class ExpansionBenchFixture : public benchmark::Fixture {
auto dba = db_->Access();
for (auto vertex : dba->Vertices(false)) dba->DetachRemoveVertex(vertex);
dba->Commit();
interpreter_ = std::experimental::nullopt;
db_ = std::experimental::nullopt;
}
auto &interpreter() { return *interpreter_; }
auto &interpreter() { return interpreter_; }
};
BENCHMARK_DEFINE_F(ExpansionBenchFixture, Match)(benchmark::State &state) {

View File

@ -7,7 +7,7 @@
#include "database/distributed_graph_db.hpp"
#include "database/graph_db_accessor.hpp"
#include "glue/communication.hpp"
#include "query/interpreter.hpp"
#include "query/distributed_interpreter.hpp"
#include "query/typed_value.hpp"
class WorkerInThread {
@ -33,7 +33,8 @@ class Cluster {
database::Config masterconfig;
masterconfig.master_endpoint = {kLocal, 0};
master_ = std::make_unique<database::Master>(masterconfig);
interpreter_ = std::make_unique<query::Interpreter>(*master_);
interpreter_ =
std::make_unique<query::DistributedInterpreter>(master_.get());
std::this_thread::sleep_for(kInitTime);
auto worker_config = [this](int worker_id) {
@ -66,7 +67,7 @@ class Cluster {
std::map<std::string, query::TypedValue> params = {}) {
auto dba = master_->Access();
ResultStreamFaker<query::TypedValue> result;
interpreter_->operator()(query, *dba, params, false).PullAll(result);
(*interpreter_)(query, *dba, params, false).PullAll(result);
dba->Commit();
return result.GetResults();
};
@ -74,7 +75,7 @@ class Cluster {
private:
std::unique_ptr<database::Master> master_;
std::vector<std::unique_ptr<WorkerInThread>> workers_;
std::unique_ptr<query::Interpreter> interpreter_;
std::unique_ptr<query::DistributedInterpreter> interpreter_;
};
void CheckResults(

View File

@ -7,7 +7,7 @@
#include <glog/logging.h>
#include "database/distributed_graph_db.hpp"
#include "query/interpreter.hpp"
#include "query/distributed_interpreter.hpp"
#include "query/repl.hpp"
#include "utils/flag_validation.hpp"
@ -54,7 +54,10 @@ int main(int argc, char *argv[]) {
}
// Start the REPL
query::Repl(*master);
{
query::DistributedInterpreter interpreter(master.get());
query::Repl(master.get(), &interpreter);
}
master = nullptr;
return 0;

View File

@ -79,6 +79,7 @@ int main(int argc, char *argv[]) {
std::cout << "Generating graph..." << std::endl;
// fill_db;
random_generate(db, node_count, edge_count);
query::Repl(db);
query::Interpreter interpreter;
query::Repl(&db, &interpreter);
return 0;
}

View File

@ -14,7 +14,7 @@ int main(int argc, char *argv[]) {
database::SingleNode db;
auto dba = db.Access();
ResultStreamFaker<query::TypedValue> stream;
auto results = query::Interpreter{db}(argv[1], *dba, {}, false);
auto results = query::Interpreter()(argv[1], *dba, {}, false);
stream.Header(results.header());
results.PullAll(stream);
stream.Summary(results.summary());

View File

@ -10,7 +10,7 @@ DECLARE_int32(query_execution_time_sec);
TEST(TransactionTimeout, TransactionTimeout) {
FLAGS_query_execution_time_sec = 3;
database::SingleNode db;
query::Interpreter interpreter{db};
query::Interpreter interpreter;
auto interpret = [&](auto &dba, const std::string &query) {
ResultStreamFaker<query::TypedValue> stream;
interpreter(query, dba, {}, false).PullAll(stream);

View File

@ -10,7 +10,7 @@
#include "distributed/plan_dispatcher.hpp"
#include "distributed/pull_rpc_clients.hpp"
#include "distributed_common.hpp"
#include "query/interpreter.hpp"
#include "query/distributed_interpreter.hpp"
#include "query_common.hpp"
#include "query_plan_common.hpp"
#include "utils/timer.hpp"
@ -29,7 +29,7 @@ class DistributedInterpretationTest : public DistributedGraphDbTest {
void SetUp() override {
DistributedGraphDbTest::SetUp();
interpreter_.emplace(master());
interpreter_.emplace(&master());
}
void TearDown() override {
@ -55,7 +55,7 @@ class DistributedInterpretationTest : public DistributedGraphDbTest {
}
private:
std::experimental::optional<query::Interpreter> interpreter_;
std::experimental::optional<query::DistributedInterpreter> interpreter_;
};
TEST_F(DistributedInterpretationTest, PullTest) {

View File

@ -15,7 +15,7 @@
class InterpreterTest : public ::testing::Test {
protected:
database::SingleNode db_;
query::Interpreter interpreter_{db_};
query::Interpreter interpreter_;
auto Interpret(const std::string &query,
const std::map<std::string, query::TypedValue> &params = {}) {

View File

@ -40,7 +40,7 @@ class QueryExecution : public testing::Test {
* Does NOT commit the transaction */
auto Execute(const std::string &query) {
ResultStreamFaker<query::TypedValue> stream;
auto results = query::Interpreter{*db_}(query, *dba_, {}, false);
auto results = query::Interpreter()(query, *dba_, {}, false);
stream.Header(results.header());
results.PullAll(stream);
stream.Summary(results.summary());