Cleanup BFS subcursors using transactional cache clean-up

Summary:
Before I wrongly assumed `Shutdown` will always be called on Cursors and
removed BFS subcursors there. Now it is done using transactional cache clean-up
mechanism. There's a separate clean-up for `BfsSubcursorStorage` (for actual
subcursors) and `BfsRpcServer` (for database accessors created by the server).

I've changed `BfsRpcServer` to have a `GraphDbAccessor` per transaction,
instead of per `BfsSubcursor`. Mainly because there is no reliable way to check
if the transaction tied to the accessor has expired as it is not safe to call
`transaction_id` method (since `GraphDbAccessor` is holding only a reference to
`Transaction` object).

Reviewers: teon.banek, mferencevic

Reviewed By: teon.banek

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1792
This commit is contained in:
Marin Tomic 2019-01-11 11:34:01 +01:00
parent c0cc661149
commit 8c7b87d8b0
8 changed files with 67 additions and 67 deletions

View File

@ -651,6 +651,10 @@ Master::Master(Config config)
impl_->tx_engine_.RegisterForTransactionalCacheCleanup(
impl_->updates_server_);
impl_->tx_engine_.RegisterForTransactionalCacheCleanup(impl_->data_manager_);
impl_->tx_engine_.RegisterForTransactionalCacheCleanup(
impl_->subcursor_storage_);
impl_->tx_engine_.RegisterForTransactionalCacheCleanup(
impl_->bfs_subcursor_server_);
}
Master::~Master() {}
@ -1030,6 +1034,10 @@ Worker::Worker(Config config)
impl_->tx_engine_.RegisterForTransactionalCacheCleanup(impl_->data_manager_);
impl_->tx_engine_.RegisterForTransactionalCacheCleanup(
impl_->produce_server_);
impl_->tx_engine_.RegisterForTransactionalCacheCleanup(
impl_->subcursor_storage_);
impl_->tx_engine_.RegisterForTransactionalCacheCleanup(
impl_->bfs_subcursor_server_);
}
Worker::~Worker() {}

View File

@ -67,20 +67,6 @@ void BfsRpcClients::ResetSubcursors(
}
}
void BfsRpcClients::RemoveBfsSubcursors(
const std::unordered_map<int16_t, int64_t> &subcursor_ids) {
auto futures = coordination_->ExecuteOnWorkers<void>(
db_->WorkerId(), [&subcursor_ids](int worker_id, auto &client) {
client.template Call<RemoveBfsSubcursorRpc>(
subcursor_ids.at(worker_id));
});
subcursor_storage_->Erase(subcursor_ids.at(db_->WorkerId()));
// Wait and get all of the replies.
for (auto &future : futures) {
if (future.valid()) future.get();
}
}
std::experimental::optional<VertexAccessor> BfsRpcClients::Pull(
int16_t worker_id, int64_t subcursor_id, database::GraphDbAccessor *dba) {
if (worker_id == db_->WorkerId()) {

View File

@ -38,9 +38,6 @@ class BfsRpcClients {
void ResetSubcursors(
const std::unordered_map<int16_t, int64_t> &subcursor_ids);
void RemoveBfsSubcursors(
const std::unordered_map<int16_t, int64_t> &subcursor_ids);
std::experimental::optional<VertexAccessor> Pull(
int16_t worker_id, int64_t subcursor_id, database::GraphDbAccessor *dba);

View File

@ -115,10 +115,6 @@ cpp<#
(:request ((subcursor-id :int64_t)))
(:response ()))
(lcp:define-rpc remove-bfs-subcursor
(:request ((member :int64_t)))
(:response ()))
(lcp:define-enum expand-result
(success failure lambda-error)
(:serialize))

View File

@ -21,23 +21,27 @@ class BfsRpcServer {
distributed::Coordination *coordination,
BfsSubcursorStorage *subcursor_storage)
: db_(db), subcursor_storage_(subcursor_storage) {
coordination->Register<CreateBfsSubcursorRpc>(
[this](const auto &req_reader, auto *res_builder) {
CreateBfsSubcursorReq req;
auto ast_storage = std::make_unique<query::AstStorage>();
Load(&req, req_reader, ast_storage.get());
auto db_accessor = db_->Access(req.tx_id);
auto id = subcursor_storage_->Create(
db_accessor.get(), req.direction, req.edge_types,
std::move(req.symbol_table), std::move(ast_storage),
req.filter_lambda, std::move(req.evaluation_context));
{
std::lock_guard<std::mutex> guard(lock_);
db_accessors_[id] = std::move(db_accessor);
}
CreateBfsSubcursorRes res(id);
Save(res, res_builder);
});
coordination->Register<CreateBfsSubcursorRpc>([this](const auto &req_reader,
auto *res_builder) {
CreateBfsSubcursorReq req;
auto ast_storage = std::make_unique<query::AstStorage>();
Load(&req, req_reader, ast_storage.get());
database::GraphDbAccessor *dba;
{
std::lock_guard<std::mutex> guard(lock_);
auto it = db_accessors_.find(req.tx_id);
if (it == db_accessors_.end()) {
it = db_accessors_.emplace(req.tx_id, db_->Access(req.tx_id)).first;
}
dba = it->second.get();
}
auto id = subcursor_storage_->Create(
dba, req.direction, req.edge_types, std::move(req.symbol_table),
std::move(ast_storage), req.filter_lambda,
std::move(req.evaluation_context));
CreateBfsSubcursorRes res(id);
Save(res, res_builder);
});
coordination->Register<RegisterSubcursorsRpc>(
[this](const auto &req_reader, auto *res_builder) {
@ -58,19 +62,6 @@ class BfsRpcServer {
Save(res, res_builder);
});
coordination->Register<RemoveBfsSubcursorRpc>(
[this](const auto &req_reader, auto *res_builder) {
RemoveBfsSubcursorReq req;
Load(&req, req_reader);
{
std::lock_guard<std::mutex> guard(lock_);
db_accessors_.erase(req.member);
}
subcursor_storage_->Erase(req.member);
RemoveBfsSubcursorRes res;
Save(res, res_builder);
});
coordination->Register<SetSourceRpc>(
[this](const auto &req_reader, auto *res_builder) {
SetSourceReq req;
@ -145,11 +136,25 @@ class BfsRpcServer {
});
}
void ClearTransactionalCache(tx::TransactionId oldest_active) {
// It is unlikely this will become a performance issue, but if it does, we
// should store database accessors in a lock-free map.
std::lock_guard<std::mutex> guard(lock_);
for (auto it = db_accessors_.begin(); it != db_accessors_.end();) {
if (it->first < oldest_active) {
it = db_accessors_.erase(it);
} else {
it++;
}
}
}
private:
database::DistributedGraphDb *db_;
std::mutex lock_;
std::map<int64_t, std::unique_ptr<database::GraphDbAccessor>> db_accessors_;
std::map<tx::TransactionId, std::unique_ptr<database::GraphDbAccessor>>
db_accessors_;
BfsSubcursorStorage *subcursor_storage_;
};

View File

@ -30,7 +30,8 @@ ExpandBfsSubcursor::ExpandBfsSubcursor(
evaluation_context_(std::move(evaluation_context)),
frame_(symbol_table_.max_position()),
expression_evaluator_(&frame_, symbol_table_, evaluation_context_, dba_,
query::GraphView::OLD) {
query::GraphView::OLD),
tx_id_(dba->transaction_id()) {
Reset();
}
@ -205,12 +206,6 @@ int64_t BfsSubcursorStorage::Create(
return id;
}
void BfsSubcursorStorage::Erase(int64_t subcursor_id) {
std::lock_guard<std::mutex> lock(mutex_);
auto removed = storage_.erase(subcursor_id);
CHECK(removed == 1) << "Subcursor with ID " << subcursor_id << " not found";
}
ExpandBfsSubcursor *BfsSubcursorStorage::Get(int64_t subcursor_id) {
std::lock_guard<std::mutex> lock(mutex_);
auto it = storage_.find(subcursor_id);
@ -219,4 +214,18 @@ ExpandBfsSubcursor *BfsSubcursorStorage::Get(int64_t subcursor_id) {
return it->second.get();
}
void BfsSubcursorStorage::ClearTransactionalCache(
tx::TransactionId oldest_active) {
// It is unlikely this will become a performance issue, but if it does, we
// should store BFS subcursors in a lock-free map.
std::lock_guard<std::mutex> guard(mutex_);
for (auto it = storage_.begin(); it != storage_.end();) {
if (it->second->tx_id() < oldest_active) {
it = storage_.erase(it);
} else {
it++;
}
}
}
} // namespace distributed

View File

@ -86,6 +86,8 @@ class ExpandBfsSubcursor {
database::GraphDbAccessor *db_accessor() { return dba_; }
tx::TransactionId tx_id() { return tx_id_; }
/// Used to reset subcursor state before starting expansion from new source.
void Reset();
@ -144,6 +146,9 @@ class ExpandBfsSubcursor {
/// Index of the vertex from `to_visit_next_` to return on next pull.
size_t pull_index_;
// Transaction ID used for transactional cache clean-up mechanism.
tx::TransactionId tx_id_;
};
/// Thread-safe storage for BFS subcursors.
@ -158,8 +163,8 @@ class BfsSubcursorStorage {
std::unique_ptr<query::AstStorage> ast_storage,
query::plan::ExpansionLambda filter_lambda,
query::EvaluationContext evaluation_context);
void Erase(int64_t subcursor_id);
ExpandBfsSubcursor *Get(int64_t subcursor_id);
void ClearTransactionalCache(tx::TransactionId oldest_active);
private:
BfsRpcClients *bfs_subcursor_clients_{nullptr};

View File

@ -1207,13 +1207,7 @@ class DistributedExpandBfsCursor : public query::plan::Cursor {
}
}
void Shutdown() override {
input_cursor_->Shutdown();
VLOG(10) << "Removing BFS subcursors";
// TODO: This should be done using the
// `RegisterForTransactionalCacheCleanup` mechanism.
bfs_subcursor_clients_->RemoveBfsSubcursors(subcursor_ids_);
}
void Shutdown() override { input_cursor_->Shutdown(); }
void Reset() override {
input_cursor_->Reset();