Invalidate distributed plan caches

Summary:
 - Remove caches on workers as a result of plan expiration or race
   during insertion.
 - Extract caching functionality into a class.
 - Minor refactor of Interpreter::operator()
 - New RPC and test for it.
 - Rename ConsumePlanRes to DispatchPlanRes for consistency, remove
   return value as it's always true and never used.
 - Interpreter is now constructed with a `GraphDb` reference. At the
   moment only for reaching the `distributed::PlanDispatcher`, but in
   the future we should probably use that primarily for planning.

I added a function to `PlanConsumer` that is only used for testing.
I prefer not doing this, but I felt this needed testing. I can remove
it now if you like.

Reviewers: teon.banek, msantl

Reviewed By: teon.banek

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1292
This commit is contained in:
florijan 2018-03-13 10:35:14 +01:00
parent 226992f420
commit 03f1547a8d
18 changed files with 253 additions and 107 deletions

View File

@ -31,7 +31,7 @@ namespace communication::bolt {
* and worker to the session. */
struct SessionData {
database::MasterBase &db;
query::Interpreter interpreter;
query::Interpreter interpreter{db};
};
/**

View File

@ -63,7 +63,9 @@ BOOST_CLASS_EXPORT(distributed::TxGidPair);
// Distributed plan exchange.
BOOST_CLASS_EXPORT(distributed::DispatchPlanReq);
BOOST_CLASS_EXPORT(distributed::ConsumePlanRes);
BOOST_CLASS_EXPORT(distributed::DispatchPlanRes);
BOOST_CLASS_EXPORT(distributed::RemovePlanReq);
BOOST_CLASS_EXPORT(distributed::RemovePlanRes);
// Remote pull.
BOOST_CLASS_EXPORT(distributed::RemotePullReq);

View File

@ -10,7 +10,12 @@ PlanConsumer::PlanConsumer(communication::rpc::Server &server)
std::make_unique<PlanPack>(
req.plan_, req.symbol_table_,
std::move(const_cast<DispatchPlanReq &>(req).storage_)));
return std::make_unique<ConsumePlanRes>(true);
return std::make_unique<DispatchPlanRes>();
});
server_.Register<RemovePlanRpc>([this](const RemovePlanReq &req) {
plan_cache_.access().remove(req.member);
return std::make_unique<RemovePlanRes>();
});
}
@ -22,4 +27,13 @@ PlanConsumer::PlanPack &PlanConsumer::PlanForId(int64_t plan_id) const {
return *found->second;
}
std::vector<int64_t> PlanConsumer::CachedPlanIds() const {
std::vector<int64_t> plan_ids;
auto access = plan_cache_.access();
plan_ids.reserve(access.size());
for (auto &kv : access) plan_ids.emplace_back(kv.first);
return plan_ids;
}
} // namespace distributed

View File

@ -1,5 +1,7 @@
#pragma once
#include <vector>
#include "communication/rpc/server.hpp"
#include "data_structures/concurrent/concurrent_map.hpp"
#include "distributed/plan_rpc_messages.hpp"
@ -29,6 +31,9 @@ class PlanConsumer {
/** Return cached plan and symbol table for a given plan id. */
PlanPack &PlanForId(int64_t plan_id) const;
/** Return the ids of all the cached plans. For testing. */
std::vector<int64_t> CachedPlanIds() const;
private:
communication::rpc::Server &server_;
// TODO remove unique_ptr. This is to get it to work, emplacing into a

View File

@ -20,4 +20,16 @@ void PlanDispatcher::DispatchPlan(
}
}
void PlanDispatcher::RemovePlan(int64_t plan_id) {
auto futures = clients_.ExecuteOnWorkers<void>(
0, [plan_id](communication::rpc::ClientPool &client_pool) {
auto result = client_pool.Call<RemovePlanRpc>(plan_id);
CHECK(result) << "Failed to remove plan from worker";
});
for (auto &future : futures) {
future.wait();
}
}
} // namespace distributed

View File

@ -15,14 +15,14 @@ class PlanDispatcher {
public:
explicit PlanDispatcher(RpcWorkerClients &clients);
/**
* Synchronously dispatch a plan to all workers and wait for their
* acknowledgement.
*/
/** Dispatch a plan to all workers and wait for their acknowledgement. */
void DispatchPlan(int64_t plan_id,
std::shared_ptr<query::plan::LogicalOperator> plan,
const SymbolTable &symbol_table);
/** Remove a plan from all workers and wait for their acknowledgement. */
void RemovePlan(int64_t plan_id);
private:
RpcWorkerClients &clients_;
};

View File

@ -50,9 +50,14 @@ struct DispatchPlanReq : public Message {
}
};
RPC_SINGLE_MEMBER_MESSAGE(ConsumePlanRes, bool);
RPC_NO_MEMBER_MESSAGE(DispatchPlanRes);
using DistributedPlanRpc =
communication::rpc::RequestResponse<DispatchPlanReq, ConsumePlanRes>;
communication::rpc::RequestResponse<DispatchPlanReq, DispatchPlanRes>;
RPC_SINGLE_MEMBER_MESSAGE(RemovePlanReq, int64_t);
RPC_NO_MEMBER_MESSAGE(RemovePlanRes);
using RemovePlanRpc =
communication::rpc::RequestResponse<RemovePlanReq, RemovePlanRes>;
} // namespace distributed

View File

@ -1,26 +1,88 @@
#include "query/interpreter.hpp"
#include <glog/logging.h>
#include <limits>
#include "distributed/plan_dispatcher.hpp"
#include "query/exceptions.hpp"
#include "query/frontend/ast/cypher_main_visitor.hpp"
#include "query/frontend/opencypher/parser.hpp"
#include "query/frontend/semantic/symbol_generator.hpp"
#include "query/plan/distributed.hpp"
#include "query/plan/planner.hpp"
#include "query/plan/vertex_count_cache.hpp"
#include "utils/flag_validation.hpp"
DEFINE_HIDDEN_bool(query_cost_planner, true,
"Use the cost-estimating query planner.");
DEFINE_bool(query_plan_cache, true, "Cache generated query plans");
DEFINE_VALIDATED_int32(query_plan_cache_ttl, 60,
"Time to live for cached query plans, in seconds.",
FLAG_IN_RANGE(0, std::numeric_limits<int32_t>::max()));
namespace query {
std::shared_ptr<Interpreter::CachedPlan> Interpreter::PlanCache::Find(
HashType hash) {
auto accessor = cache_.access();
auto it = accessor.find(hash);
if (it == accessor.end()) return nullptr;
if (!it->second->IsExpired()) return it->second;
Remove(hash);
return nullptr;
}
std::shared_ptr<Interpreter::CachedPlan> Interpreter::PlanCache::Insert(
HashType hash, std::shared_ptr<CachedPlan> plan) {
// Dispatch plans to workers (if we have any for them).
if (plan_dispatcher_) {
for (const auto &plan_pair : plan->distributed_plan().worker_plans) {
const auto &plan_id = plan_pair.first;
const auto &worker_plan = plan_pair.second;
plan_dispatcher_->DispatchPlan(plan_id, worker_plan,
plan->symbol_table());
}
}
auto access = cache_.access();
auto insertion = access.insert(hash, plan);
// If the same plan was already cached, invalidate the above dispatched
// plans on the workers.
if (!insertion.second && plan_dispatcher_) {
RemoveFromWorkers(*plan);
}
return insertion.first->second;
}
void Interpreter::PlanCache::Remove(HashType hash) {
auto access = cache_.access();
auto found = access.find(hash);
if (found == access.end()) return;
RemoveFromWorkers(*found->second);
cache_.access().remove(hash);
}
void Interpreter::PlanCache::Clear() {
auto access = cache_.access();
for (auto &kv : access) {
RemoveFromWorkers(*kv.second);
access.remove(kv.first);
}
}
void Interpreter::PlanCache::RemoveFromWorkers(const CachedPlan &plan) {
for (const auto &plan_pair : plan.distributed_plan().worker_plans) {
const auto &plan_id = plan_pair.first;
plan_dispatcher_->RemovePlan(plan_id);
}
}
Interpreter::Interpreter(database::GraphDb &db)
: plan_cache_(db.type() == database::GraphDb::Type::DISTRIBUTED_MASTER
? &db.plan_dispatcher()
: nullptr) {}
Interpreter::Results Interpreter::operator()(
const std::string &query, database::GraphDbAccessor &db_accessor,
const std::map<std::string, TypedValue> &params,
@ -43,71 +105,20 @@ Interpreter::Results Interpreter::operator()(
}
ctx.parameters_.Add(param_pair.first, param_it->second);
}
// Check if we have a cached logical plan ready, so that we can skip the
// whole query -> AST -> logical_plan process. Note that this local shared_ptr
// might be the only owner of the CachedPlan (if caching is turned off).
// Ensure it lives during the whole interpretation.
std::shared_ptr<CachedPlan> plan(nullptr);
{
auto plan_cache_accessor = plan_cache_.access();
auto plan_cache_it = plan_cache_accessor.find(stripped.hash());
if (plan_cache_it != plan_cache_accessor.end()) {
if (plan_cache_it->second->IsExpired()) {
plan_cache_accessor.remove(stripped.hash());
} else {
plan = plan_cache_it->second;
}
}
}
auto frontend_time = frontend_timer.Elapsed();
// Try to get a cached plan. Note that this local shared_ptr might be the only
// owner of the CachedPlan, so ensure it lives during the whole
// interpretation.
std::shared_ptr<CachedPlan> plan = plan_cache_.Find(stripped.hash());
utils::Timer planning_timer;
if (!plan) {
AstTreeStorage ast_storage = QueryToAst(stripped, ctx);
SymbolGenerator symbol_generator(ctx.symbol_table_);
ast_storage.query()->Accept(symbol_generator);
std::unique_ptr<plan::LogicalOperator> tmp_logical_plan;
double query_plan_cost_estimation = 0.0;
std::tie(tmp_logical_plan, query_plan_cost_estimation) =
MakeLogicalPlan(ast_storage, db_accessor, ctx);
DCHECK(db_accessor.db().type() !=
database::GraphDb::Type::DISTRIBUTED_WORKER);
if (db_accessor.db().type() ==
database::GraphDb::Type::DISTRIBUTED_MASTER) {
auto distributed_plan = MakeDistributedPlan(
*tmp_logical_plan, ctx.symbol_table_, next_plan_id_);
plan = std::make_shared<CachedPlan>(std::move(distributed_plan),
query_plan_cost_estimation);
} else {
plan = std::make_shared<CachedPlan>(
std::move(tmp_logical_plan), query_plan_cost_estimation,
ctx.symbol_table_, std::move(ast_storage));
}
// Dispatch plans to workers (if we have any for them).
for (const auto &plan_pair : plan->distributed_plan().worker_plans) {
const auto &plan_id = plan_pair.first;
const auto &worker_plan = plan_pair.second;
auto &dispatcher = db_accessor.db().plan_dispatcher();
dispatcher.DispatchPlan(plan_id, worker_plan, plan->symbol_table());
}
if (FLAGS_query_plan_cache) {
// TODO: If the same plan was already cached, invalidate the dispatched
// plans (above) from workers.
plan = plan_cache_.access().insert(stripped.hash(), plan).first->second;
}
plan = plan_cache_.Insert(stripped.hash(), QueryToPlan(stripped, ctx));
}
auto planning_time = planning_timer.Elapsed();
ctx.symbol_table_ = plan->symbol_table();
auto planning_time = planning_timer.Elapsed();
std::map<std::string, TypedValue> summary;
summary["parsing_time"] = frontend_time.count();
summary["planning_time"] = planning_time.count();
@ -136,6 +147,32 @@ Interpreter::Results Interpreter::operator()(
header, summary, plan_cache_);
}
std::shared_ptr<Interpreter::CachedPlan> Interpreter::QueryToPlan(
const StrippedQuery &stripped, Context &ctx) {
AstTreeStorage ast_storage = QueryToAst(stripped, ctx);
SymbolGenerator symbol_generator(ctx.symbol_table_);
ast_storage.query()->Accept(symbol_generator);
std::unique_ptr<plan::LogicalOperator> tmp_logical_plan;
double query_plan_cost_estimation = 0.0;
std::tie(tmp_logical_plan, query_plan_cost_estimation) =
MakeLogicalPlan(ast_storage, ctx);
DCHECK(ctx.db_accessor_.db().type() !=
database::GraphDb::Type::DISTRIBUTED_WORKER);
if (ctx.db_accessor_.db().type() ==
database::GraphDb::Type::DISTRIBUTED_MASTER) {
auto distributed_plan = MakeDistributedPlan(
*tmp_logical_plan, ctx.symbol_table_, next_plan_id_);
return std::make_shared<CachedPlan>(std::move(distributed_plan),
query_plan_cost_estimation);
} else {
return std::make_shared<CachedPlan>(
std::move(tmp_logical_plan), query_plan_cost_estimation,
ctx.symbol_table_, std::move(ast_storage));
}
}
AstTreeStorage Interpreter::QueryToAst(const StrippedQuery &stripped,
Context &ctx) {
if (!ctx.is_query_cached_) {
@ -187,11 +224,9 @@ AstTreeStorage Interpreter::QueryToAst(const StrippedQuery &stripped,
}
std::pair<std::unique_ptr<plan::LogicalOperator>, double>
Interpreter::MakeLogicalPlan(AstTreeStorage &ast_storage,
const database::GraphDbAccessor &db_accessor,
Context &context) {
Interpreter::MakeLogicalPlan(AstTreeStorage &ast_storage, Context &context) {
std::unique_ptr<plan::LogicalOperator> logical_plan;
auto vertex_counts = plan::MakeVertexCountCache(db_accessor);
auto vertex_counts = plan::MakeVertexCountCache(context.db_accessor_);
auto planning_context = plan::MakePlanningContext(
ast_storage, context.symbol_table_, vertex_counts);
return plan::MakeLogicalPlan(planning_context, context.parameters_,

View File

@ -1,12 +1,9 @@
#pragma once
#include <ctime>
#include <limits>
#include <gflags/gflags.h>
#include <glog/logging.h>
#include "data_structures/concurrent/concurrent_map.hpp"
#include "database/graph_db.hpp"
#include "database/graph_db_accessor.hpp"
#include "query/context.hpp"
#include "query/frontend/ast/ast.hpp"
@ -19,6 +16,10 @@
DECLARE_int32(query_plan_cache_ttl);
namespace distributed {
class PlanDispatcher;
}
namespace query {
class Interpreter {
@ -30,7 +31,10 @@ class Interpreter {
CachedPlan(std::unique_ptr<plan::LogicalOperator> plan, double cost,
SymbolTable symbol_table, AstTreeStorage storage)
: distributed_plan_{0, std::move(plan), {}, std::move(storage),
: distributed_plan_{0,
std::move(plan),
{},
std::move(storage),
symbol_table},
cost_(cost) {}
@ -40,8 +44,7 @@ class Interpreter {
const auto &symbol_table() const { return distributed_plan_.symbol_table; }
bool IsExpired() const {
auto elapsed = cache_timer_.Elapsed();
return std::chrono::duration_cast<std::chrono::seconds>(elapsed) >
return cache_timer_.Elapsed() >
std::chrono::seconds(FLAGS_query_plan_cache_ttl);
};
@ -51,18 +54,54 @@ class Interpreter {
utils::Timer cache_timer_;
};
/** Contains a thread-safe cache of plans and means of caching and
* invalidating them in a distributed MG. */
class PlanCache {
public:
explicit PlanCache(distributed::PlanDispatcher *plan_dispatcher)
: plan_dispatcher_(plan_dispatcher) {}
/** Finds the CachedPlan for the given hash. If the plan is not found,
* nullptr is retured. If the plan is found, but has expired, it is
* invalidated and nullptr is returned. Otherwise the cached plan is
* returned as a shared ptr. */
std::shared_ptr<CachedPlan> Find(HashType hash);
/** Removes the plan with the given hash from local and remote caches. */
void Remove(HashType hash);
/** Clears all the cached plans. */
void Clear();
/** Inserts the given plan for the given hash into the local and remote
* caches. Returns the cached plan, which might NOT be the same as the given
* `plan` because a concurrent insertion might have happened, only one can
* succeed, but both query executions MUST us the one that has succeeded
* (only for that plan is it guaranteed that workers have the appropriate
* subplans). */
std::shared_ptr<CachedPlan> Insert(HashType hash,
std::shared_ptr<CachedPlan> plan);
private:
ConcurrentMap<HashType, std::shared_ptr<CachedPlan>> cache_;
// Optional, only available in a distributed master.
distributed::PlanDispatcher *plan_dispatcher_{nullptr};
// Notifies all the workers that they should clear the cache of all the
// worker parts of the given distributed plan.
void RemoveFromWorkers(const CachedPlan &plan);
};
public:
/**
* Encapsulates all what's necessary for the interpretation of a query into a
* single object that can be pulled (into the given Stream).
* Encapsulates all what's necessary for the interpretation of a query
* into a single object that can be pulled (into the given Stream).
*/
class Results {
friend Interpreter;
Results(Context ctx, std::shared_ptr<CachedPlan> plan,
std::unique_ptr<query::plan::Cursor> cursor,
std::vector<Symbol> output_symbols, std::vector<std::string> header,
std::map<std::string, TypedValue> summary,
ConcurrentMap<HashType, std::shared_ptr<CachedPlan>> &plan_cache)
std::map<std::string, TypedValue> summary, PlanCache &plan_cache)
: ctx_(std::move(ctx)),
plan_(plan),
cursor_(std::move(cursor)),
@ -112,12 +151,7 @@ class Interpreter {
stream.Summary(summary_);
if (ctx_.is_index_created_) {
// If index is created we invalidate cache so that we can try to
// generate better plan with that cache.
auto accessor = plan_cache_.access();
for (const auto &cached_plan : accessor) {
accessor.remove(cached_plan.first);
}
plan_cache_.Clear();
}
}
@ -143,10 +177,10 @@ class Interpreter {
utils::Timer execution_timer_;
// Gets invalidated after if an index has been built.
ConcurrentMap<HashType, std::shared_ptr<CachedPlan>> &plan_cache_;
Interpreter::PlanCache &plan_cache_;
};
Interpreter() = default;
explicit Interpreter(database::GraphDb &db);
Interpreter(const Interpreter &) = delete;
Interpreter &operator=(const Interpreter &) = delete;
Interpreter(Interpreter &&) = delete;
@ -162,16 +196,19 @@ class Interpreter {
bool in_explicit_transaction);
private:
// stripped query -> CachedPlan
std::shared_ptr<CachedPlan> QueryToPlan(const StrippedQuery &stripped,
Context &ctx);
// stripped query -> high level tree
AstTreeStorage QueryToAst(const StrippedQuery &stripped, Context &ctx);
// high level tree -> (logical plan, plan cost)
// AstTreeStorage and SymbolTable may be modified during planning.
std::pair<std::unique_ptr<plan::LogicalOperator>, double> MakeLogicalPlan(
AstTreeStorage &, const database::GraphDbAccessor &, Context &);
AstTreeStorage &, Context &);
ConcurrentMap<HashType, AstTreeStorage> ast_cache_;
ConcurrentMap<HashType, std::shared_ptr<CachedPlan>> plan_cache_;
PlanCache plan_cache_;
std::atomic<int64_t> next_plan_id_{0};
// Antlr has singleton instance that is shared between threads. It is
// protected by locks inside of antlr. Unfortunately, they are not protected

View File

@ -49,7 +49,7 @@ std::string ReadLine(const char *prompt) {
#endif // HAS_READLINE
void query::Repl(database::GraphDb &db) {
query::Interpreter interpeter;
query::Interpreter interpeter{db};
std::cout
<< "Welcome to *Awesome* Memgraph Read Evaluate Print Loop (AM-REPL)"

View File

@ -13,10 +13,11 @@ class ExpansionBenchFixture : public benchmark::Fixture {
// GraphDb shouldn't be global constructed/destructed. See
// documentation in database/graph_db.hpp for details.
std::experimental::optional<database::SingleNode> db_;
query::Interpreter interpeter_;
std::experimental::optional<query::Interpreter> interpreter_;
void SetUp(const benchmark::State &state) override {
db_.emplace();
interpreter_.emplace(db_.value());
database::GraphDbAccessor dba(*db_);
for (int i = 0; i < state.range(0); i++) dba.InsertVertex();
@ -35,8 +36,11 @@ class ExpansionBenchFixture : public benchmark::Fixture {
database::GraphDbAccessor dba(*db_);
for (auto vertex : dba.Vertices(false)) dba.DetachRemoveVertex(vertex);
dba.Commit();
interpreter_ = std::experimental::nullopt;
db_ = std::experimental::nullopt;
}
auto &interpreter() { return *interpreter_; }
};
BENCHMARK_DEFINE_F(ExpansionBenchFixture, Match)(benchmark::State &state) {
@ -44,7 +48,7 @@ BENCHMARK_DEFINE_F(ExpansionBenchFixture, Match)(benchmark::State &state) {
database::GraphDbAccessor dba(*db_);
while (state.KeepRunning()) {
ResultStreamFaker results;
interpeter_(query, dba, {}, false).PullAll(results);
interpreter()(query, dba, {}, false).PullAll(results);
}
}
@ -58,7 +62,7 @@ BENCHMARK_DEFINE_F(ExpansionBenchFixture, Expand)(benchmark::State &state) {
database::GraphDbAccessor dba(*db_);
while (state.KeepRunning()) {
ResultStreamFaker results;
interpeter_(query, dba, {}, false).PullAll(results);
interpreter()(query, dba, {}, false).PullAll(results);
}
}

View File

@ -31,6 +31,7 @@ class Cluster {
database::Config masterconfig;
masterconfig.master_endpoint = {kLocal, 0};
master_ = std::make_unique<database::Master>(masterconfig);
interpreter_ = std::make_unique<query::Interpreter>(*master_);
std::this_thread::sleep_for(kInitTime);
auto worker_config = [this](int worker_id) {
@ -49,6 +50,7 @@ class Cluster {
}
void Stop() {
interpreter_ = nullptr;
master_ = nullptr;
workers_.clear();
}
@ -61,7 +63,7 @@ class Cluster {
std::map<std::string, query::TypedValue> params = {}) {
database::GraphDbAccessor dba(*master_);
ResultStreamFaker result;
interpreter_(query, dba, params, false).PullAll(result);
interpreter_->operator()(query, dba, params, false).PullAll(result);
dba.Commit();
return result.GetResults();
};
@ -69,7 +71,7 @@ class Cluster {
private:
std::unique_ptr<database::Master> master_;
std::vector<std::unique_ptr<WorkerInThread>> workers_;
query::Interpreter interpreter_;
std::unique_ptr<query::Interpreter> interpreter_;
};
void CheckResults(

View File

@ -14,7 +14,7 @@ int main(int argc, char *argv[]) {
database::SingleNode db;
database::GraphDbAccessor dba(db);
ResultStreamFaker results;
query::Interpreter()(argv[1], dba, {}, false).PullAll(results);
query::Interpreter{db}(argv[1], dba, {}, false).PullAll(results);
std::cout << results;
return 0;
}

View File

@ -10,7 +10,7 @@ DECLARE_int32(query_execution_time_sec);
TEST(TransactionTimeout, TransactionTimeout) {
FLAGS_query_execution_time_sec = 3;
database::SingleNode db;
query::Interpreter interpreter;
query::Interpreter interpreter{db};
auto interpret = [&](auto &dba, const std::string &query) {
ResultStreamFaker stream;
interpreter(query, dba, {}, false).PullAll(stream);

View File

@ -117,6 +117,10 @@ TEST_F(DistributedGraphDbTest, DispatchPlan) {
};
check_for_worker(worker(1));
check_for_worker(worker(2));
master().plan_dispatcher().RemovePlan(plan_id);
::testing::FLAGS_gtest_death_test_style = "threadsafe";
EXPECT_DEATH(check_for_worker(worker(1)), "Missing plan*");
}
TEST_F(DistributedGraphDbTest, BuildIndexDistributed) {

View File

@ -1,7 +1,11 @@
#include <chrono>
#include <experimental/optional>
#include "gmock/gmock.h"
#include "gtest/gtest.h"
#include "database/graph_db.hpp"
#include "distributed/plan_consumer.hpp"
#include "distributed_common.hpp"
#include "query/interpreter.hpp"
#include "query_common.hpp"
@ -13,15 +17,27 @@ using namespace database;
class DistributedInterpretationTest : public DistributedGraphDbTest {
protected:
void SetUp() override {
DistributedGraphDbTest::SetUp();
interpreter_.emplace(master());
}
void TearDown() override {
interpreter_ = std::experimental::nullopt;
DistributedGraphDbTest::TearDown();
}
auto Run(const std::string &query) {
std::map<std::string, query::TypedValue> params = {};
GraphDbAccessor dba(master());
ResultStreamFaker result;
query::Interpreter interpreter_;
interpreter_(query, dba, params, false).PullAll(result);
interpreter_.value()(query, dba, params, false).PullAll(result);
dba.Commit();
return result.GetResults();
}
private:
std::experimental::optional<query::Interpreter> interpreter_;
};
TEST_F(DistributedInterpretationTest, RemotePullTest) {
@ -219,3 +235,15 @@ TEST_F(TestQueryWaitsOnFutures, Test) {
EXPECT_GT(seconds, 3);
}
}
TEST_F(DistributedInterpretationTest, PlanExpiration) {
FLAGS_query_plan_cache_ttl = 1;
Run("MATCH (n) RETURN n");
auto ids1 = worker(1).plan_consumer().CachedPlanIds();
ASSERT_EQ(ids1.size(), 1);
std::this_thread::sleep_for(std::chrono::milliseconds(1100));
Run("MATCH (n) RETURN n");
auto ids2 = worker(1).plan_consumer().CachedPlanIds();
ASSERT_EQ(ids2.size(), 1);
EXPECT_NE(ids1, ids2);
}

View File

@ -14,8 +14,8 @@
class InterpreterTest : public ::testing::Test {
protected:
query::Interpreter interpreter_;
database::SingleNode db_;
query::Interpreter interpreter_{db_};
ResultStreamFaker Interpret(
const std::string &query,
@ -86,8 +86,6 @@ TEST_F(InterpreterTest, AstCache) {
// Run query with same ast multiple times with different parameters.
TEST_F(InterpreterTest, Parameters) {
query::Interpreter interpreter;
database::SingleNode db;
{
auto stream = Interpret("RETURN $2 + $`a b`", {{"2", 10}, {"a b", 15}});
ASSERT_EQ(stream.GetHeader().size(), 1U);

View File

@ -40,7 +40,7 @@ class QueryExecution : public testing::Test {
* Does NOT commit the transaction */
auto Execute(const std::string &query) {
ResultStreamFaker results;
query::Interpreter()(query, *dba_, {}, false).PullAll(results);
query::Interpreter{*db_}(query, *dba_, {}, false).PullAll(results);
return results.GetResults();
}
};