Fix remote plan invalidation

Summary:
Ensures that query plans are invalidated on the remote only when it's
guaranteed they will never be used on the master. This is done by
invalidating remote caches in the `CachedPlan` destructor.

There are two unplesant side-effects. First, an RPC call is made in an
object destructor. This is somewhat ugly, but not that different then
making an RPC call that must succeed in any other function. Note that
this does NOT slow down any query execution because the relevant
destructor is called by the skiplist garbage collector. The second ugly
side-effect is that in the unit test now we need to sleep to ensure the
skiplist GC destructs a cached plan before checking that it's
invalidated on the remote worker.

We might want to redesign this at some point.

Reviewers: teon.banek

Reviewed By: teon.banek

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1302
This commit is contained in:
florijan 2018-03-15 15:00:43 +01:00
parent 83302e2c69
commit 95f1b6fe56
5 changed files with 97 additions and 122 deletions

View File

@ -23,7 +23,7 @@ PlanConsumer::PlanPack &PlanConsumer::PlanForId(int64_t plan_id) const {
auto accessor = plan_cache_.access();
auto found = accessor.find(plan_id);
CHECK(found != accessor.end())
<< "Missing plan and symbol table for plan id!";
<< "Missing plan and symbol table for plan id: " << plan_id;
return *found->second;
}

View File

@ -20,68 +20,35 @@ DEFINE_VALIDATED_int32(query_plan_cache_ttl, 60,
namespace query {
std::shared_ptr<Interpreter::CachedPlan> Interpreter::PlanCache::Find(
HashType hash) {
auto accessor = cache_.access();
auto it = accessor.find(hash);
if (it == accessor.end()) return nullptr;
if (!it->second->IsExpired()) return it->second;
Remove(hash);
return nullptr;
}
std::shared_ptr<Interpreter::CachedPlan> Interpreter::PlanCache::Insert(
HashType hash, std::shared_ptr<CachedPlan> plan) {
// Dispatch plans to workers (if we have any for them).
Interpreter::CachedPlan::CachedPlan(
plan::DistributedPlan distributed_plan, double cost,
distributed::PlanDispatcher *plan_dispatcher)
: distributed_plan_(std::move(distributed_plan)),
cost_(cost),
plan_dispatcher_(plan_dispatcher) {
if (plan_dispatcher_) {
for (const auto &plan_pair : plan->distributed_plan().worker_plans) {
for (const auto &plan_pair : distributed_plan_.worker_plans) {
const auto &plan_id = plan_pair.first;
const auto &worker_plan = plan_pair.second;
plan_dispatcher_->DispatchPlan(plan_id, worker_plan,
plan->symbol_table());
distributed_plan_.symbol_table);
}
}
auto access = cache_.access();
auto insertion = access.insert(hash, plan);
// If the same plan was already cached, invalidate the above dispatched
// plans on the workers.
if (!insertion.second && plan_dispatcher_) {
RemoveFromWorkers(*plan);
}
return insertion.first->second;
}
void Interpreter::PlanCache::Remove(HashType hash) {
auto access = cache_.access();
auto found = access.find(hash);
if (found == access.end()) return;
RemoveFromWorkers(*found->second);
cache_.access().remove(hash);
}
void Interpreter::PlanCache::Clear() {
auto access = cache_.access();
for (auto &kv : access) {
RemoveFromWorkers(*kv.second);
access.remove(kv.first);
}
}
void Interpreter::PlanCache::RemoveFromWorkers(const CachedPlan &plan) {
for (const auto &plan_pair : plan.distributed_plan().worker_plans) {
const auto &plan_id = plan_pair.first;
plan_dispatcher_->RemovePlan(plan_id);
Interpreter::CachedPlan::~CachedPlan() {
if (plan_dispatcher_) {
for (const auto &plan_pair : distributed_plan_.worker_plans) {
const auto &plan_id = plan_pair.first;
plan_dispatcher_->RemovePlan(plan_id);
}
}
}
Interpreter::Interpreter(database::GraphDb &db)
: plan_cache_(db.type() == database::GraphDb::Type::DISTRIBUTED_MASTER
? &db.plan_dispatcher()
: nullptr) {}
: plan_dispatcher_(db.type() == database::GraphDb::Type::DISTRIBUTED_MASTER
? &db.plan_dispatcher()
: nullptr) {}
Interpreter::Results Interpreter::operator()(
const std::string &query, database::GraphDbAccessor &db_accessor,
@ -110,10 +77,19 @@ Interpreter::Results Interpreter::operator()(
// Try to get a cached plan. Note that this local shared_ptr might be the only
// owner of the CachedPlan, so ensure it lives during the whole
// interpretation.
std::shared_ptr<CachedPlan> plan = plan_cache_.Find(stripped.hash());
std::shared_ptr<CachedPlan> plan{nullptr};
auto plan_cache_access = plan_cache_.access();
auto it = plan_cache_access.find(stripped.hash());
if (it != plan_cache_access.end()) {
if (it->second->IsExpired())
plan_cache_access.remove(stripped.hash());
else
plan = it->second;
}
utils::Timer planning_timer;
if (!plan) {
plan = plan_cache_.Insert(stripped.hash(), QueryToPlan(stripped, ctx));
plan = plan_cache_access.insert(stripped.hash(), QueryToPlan(stripped, ctx))
.first->second;
}
auto planning_time = planning_timer.Elapsed();
@ -165,11 +141,16 @@ std::shared_ptr<Interpreter::CachedPlan> Interpreter::QueryToPlan(
auto distributed_plan = MakeDistributedPlan(
*tmp_logical_plan, ctx.symbol_table_, next_plan_id_);
return std::make_shared<CachedPlan>(std::move(distributed_plan),
query_plan_cost_estimation);
query_plan_cost_estimation,
plan_dispatcher_);
} else {
return std::make_shared<CachedPlan>(
std::move(tmp_logical_plan), query_plan_cost_estimation,
ctx.symbol_table_, std::move(ast_storage));
plan::DistributedPlan{0,
std::move(tmp_logical_plan),
{},
std::move(ast_storage),
ctx.symbol_table_},
query_plan_cost_estimation, plan_dispatcher_);
}
}

View File

@ -24,19 +24,16 @@ namespace query {
class Interpreter {
private:
/// Encapsulates a plan for caching. Takes care of remote (worker) cache
/// updating in distributed memgraph.
class CachedPlan {
public:
explicit CachedPlan(plan::DistributedPlan distributed_plan, double cost)
: distributed_plan_(std::move(distributed_plan)), cost_(cost) {}
/// Creates a cached plan and sends it to all the workers.
CachedPlan(plan::DistributedPlan distributed_plan, double cost,
distributed::PlanDispatcher *plan_dispatcher);
CachedPlan(std::unique_ptr<plan::LogicalOperator> plan, double cost,
SymbolTable symbol_table, AstTreeStorage storage)
: distributed_plan_{0,
std::move(plan),
{},
std::move(storage),
symbol_table},
cost_(cost) {}
/// Removes the cached plan from all the workers.
~CachedPlan();
const auto &plan() const { return *distributed_plan_.master_plan; }
const auto &distributed_plan() const { return distributed_plan_; }
@ -52,45 +49,13 @@ class Interpreter {
plan::DistributedPlan distributed_plan_;
double cost_;
utils::Timer cache_timer_;
};
/** Contains a thread-safe cache of plans and means of caching and
* invalidating them in a distributed MG. */
class PlanCache {
public:
explicit PlanCache(distributed::PlanDispatcher *plan_dispatcher)
: plan_dispatcher_(plan_dispatcher) {}
/** Finds the CachedPlan for the given hash. If the plan is not found,
* nullptr is retured. If the plan is found, but has expired, it is
* invalidated and nullptr is returned. Otherwise the cached plan is
* returned as a shared ptr. */
std::shared_ptr<CachedPlan> Find(HashType hash);
/** Removes the plan with the given hash from local and remote caches. */
void Remove(HashType hash);
/** Clears all the cached plans. */
void Clear();
/** Inserts the given plan for the given hash into the local and remote
* caches. Returns the cached plan, which might NOT be the same as the given
* `plan` because a concurrent insertion might have happened, only one can
* succeed, but both query executions MUST us the one that has succeeded
* (only for that plan is it guaranteed that workers have the appropriate
* subplans). */
std::shared_ptr<CachedPlan> Insert(HashType hash,
std::shared_ptr<CachedPlan> plan);
private:
ConcurrentMap<HashType, std::shared_ptr<CachedPlan>> cache_;
// Optional, only available in a distributed master.
distributed::PlanDispatcher *plan_dispatcher_{nullptr};
// Notifies all the workers that they should clear the cache of all the
// worker parts of the given distributed plan.
void RemoveFromWorkers(const CachedPlan &plan);
};
using PlanCacheT = ConcurrentMap<HashType, std::shared_ptr<CachedPlan>>;
public:
/**
* Encapsulates all what's necessary for the interpretation of a query
@ -101,7 +66,7 @@ class Interpreter {
Results(Context ctx, std::shared_ptr<CachedPlan> plan,
std::unique_ptr<query::plan::Cursor> cursor,
std::vector<Symbol> output_symbols, std::vector<std::string> header,
std::map<std::string, TypedValue> summary, PlanCache &plan_cache)
std::map<std::string, TypedValue> summary, PlanCacheT &plan_cache)
: ctx_(std::move(ctx)),
plan_(plan),
cursor_(std::move(cursor)),
@ -151,7 +116,10 @@ class Interpreter {
stream.Summary(summary_);
if (ctx_.is_index_created_) {
plan_cache_.Clear();
auto access = plan_cache_.access();
for (auto &kv : access) {
access.remove(kv.first);
}
}
}
@ -177,7 +145,7 @@ class Interpreter {
utils::Timer execution_timer_;
// Gets invalidated after if an index has been built.
Interpreter::PlanCache &plan_cache_;
PlanCacheT &plan_cache_;
};
explicit Interpreter(database::GraphDb &db);
@ -196,6 +164,20 @@ class Interpreter {
bool in_explicit_transaction);
private:
ConcurrentMap<HashType, AstTreeStorage> ast_cache_;
PlanCacheT plan_cache_;
std::atomic<int64_t> next_plan_id_{0};
// Antlr has singleton instance that is shared between threads. It is
// protected by locks inside of antlr. Unfortunately, they are not protected
// in a very good way. Once we have antlr version without race conditions we
// can remove this lock. This will probably never happen since antlr
// developers introduce more bugs in each version. Fortunately, we have cache
// so this lock probably won't impact performance much...
SpinLock antlr_lock_;
// Optional, not null only in a distributed master.
distributed::PlanDispatcher *plan_dispatcher_{nullptr};
// stripped query -> CachedPlan
std::shared_ptr<CachedPlan> QueryToPlan(const StrippedQuery &stripped,
Context &ctx);
@ -206,17 +188,6 @@ class Interpreter {
// AstTreeStorage and SymbolTable may be modified during planning.
std::pair<std::unique_ptr<plan::LogicalOperator>, double> MakeLogicalPlan(
AstTreeStorage &, Context &);
ConcurrentMap<HashType, AstTreeStorage> ast_cache_;
PlanCache plan_cache_;
std::atomic<int64_t> next_plan_id_{0};
// Antlr has singleton instance that is shared between threads. It is
// protected by locks inside of antlr. Unfortunately, they are not protected
// in a very good way. Once we have antlr version without race conditions we
// can remove this lock. This will probably never happen since antlr
// developers introduce more bugs in each version. Fortunately, we have cache
// so this lock probably won't impact performance much...
SpinLock antlr_lock_;
};
} // namespace query

View File

@ -15,13 +15,6 @@ int main(int argc, char *argv[]) {
Cluster cluster(5);
// static thread_local std::mt19937 rand_dev{std::random_device{}()};
// static thread_local std::uniform_int_distribution<> int_dist;
// auto rint = [&rand_dev, &int_dist](int upper) {
// return int_dist(rand_dev) % upper;
// };
cluster.Execute("CREATE INDEX ON :Card(id)");
cluster.Execute("CREATE INDEX ON :Transaction(id)");
cluster.Execute("CREATE INDEX ON :Pos(id)");
@ -64,7 +57,7 @@ int main(int argc, char *argv[]) {
--i;
}
if (i > 0 && i % 200 == 0)
LOG(INFO) << "Created " << i << " transacitons";
LOG(INFO) << "Created " << i << " transactions";
}
};

View File

@ -12,6 +12,10 @@
#include "query_plan_common.hpp"
#include "utils/timer.hpp"
// We use this to ensure a cached plan is removed from the concurrent map and
// properly destructed.
DECLARE_int32(skiplist_gc_interval);
using namespace distributed;
using namespace database;
@ -241,9 +245,35 @@ TEST_F(DistributedInterpretationTest, PlanExpiration) {
Run("MATCH (n) RETURN n");
auto ids1 = worker(1).plan_consumer().CachedPlanIds();
ASSERT_EQ(ids1.size(), 1);
// Sleep so the cached plan becomes invalid.
std::this_thread::sleep_for(std::chrono::milliseconds(1100));
Run("MATCH (n) RETURN n");
// Sleep so the invalidated plan (removed from cache which is a concurrent
// map) gets destructed and thus remote caches cleared.
std::this_thread::sleep_for(std::chrono::milliseconds(1500));
auto ids2 = worker(1).plan_consumer().CachedPlanIds();
ASSERT_EQ(ids2.size(), 1);
EXPECT_NE(ids1, ids2);
}
TEST_F(DistributedInterpretationTest, ConcurrentPlanExpiration) {
FLAGS_query_plan_cache_ttl = 1;
auto count_vertices = [this]() {
utils::Timer timer;
while (timer.Elapsed() < 3s) {
Run("MATCH () RETURN count(1)");
}
};
std::vector<std::thread> counters;
for (size_t i = 0; i < std::thread::hardware_concurrency(); ++i)
counters.emplace_back(count_vertices);
for (auto &t : counters) t.join();
}
int main(int argc, char **argv) {
google::InitGoogleLogging(argv[0]);
::testing::InitGoogleTest(&argc, argv);
gflags::ParseCommandLineFlags(&argc, &argv, true);
FLAGS_skiplist_gc_interval = 1;
return RUN_ALL_TESTS();
}