From 8c7b87d8b0d9b6e7e0f45e516a6f3fd0a84488ea Mon Sep 17 00:00:00 2001 From: Marin Tomic Date: Fri, 11 Jan 2019 11:34:01 +0100 Subject: [PATCH] Cleanup BFS subcursors using transactional cache clean-up Summary: Previously, I wrongly assumed that `Shutdown` would always be called on Cursors, and removed BFS subcursors there. Now the removal is done using the transactional cache clean-up mechanism. There are separate clean-ups for `BfsSubcursorStorage` (for the actual subcursors) and `BfsRpcServer` (for the database accessors created by the server). I've changed `BfsRpcServer` to keep one `GraphDbAccessor` per transaction instead of one per `BfsSubcursor`, mainly because there is no reliable way to check whether the transaction tied to an accessor has expired: it is not safe to call the `transaction_id` method, since `GraphDbAccessor` holds only a reference to the `Transaction` object. Reviewers: teon.banek, mferencevic Reviewed By: teon.banek Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D1792 --- .../distributed/distributed_graph_db.cpp | 8 +++ src/distributed/bfs_rpc_clients.cpp | 14 ---- src/distributed/bfs_rpc_clients.hpp | 3 - src/distributed/bfs_rpc_messages.lcp | 4 -- src/distributed/bfs_rpc_server.hpp | 67 ++++++++++--------- src/distributed/bfs_subcursor.cpp | 23 +++++-- src/distributed/bfs_subcursor.hpp | 7 +- src/query/plan/distributed_ops.cpp | 8 +-- 8 files changed, 67 insertions(+), 67 deletions(-) diff --git a/src/database/distributed/distributed_graph_db.cpp b/src/database/distributed/distributed_graph_db.cpp index 0135b8002..c010a4022 100644 --- a/src/database/distributed/distributed_graph_db.cpp +++ b/src/database/distributed/distributed_graph_db.cpp @@ -651,6 +651,10 @@ Master::Master(Config config) impl_->tx_engine_.RegisterForTransactionalCacheCleanup( impl_->updates_server_); impl_->tx_engine_.RegisterForTransactionalCacheCleanup(impl_->data_manager_); + impl_->tx_engine_.RegisterForTransactionalCacheCleanup( + impl_->subcursor_storage_); +
impl_->tx_engine_.RegisterForTransactionalCacheCleanup( + impl_->bfs_subcursor_server_); } Master::~Master() {} @@ -1030,6 +1034,10 @@ Worker::Worker(Config config) impl_->tx_engine_.RegisterForTransactionalCacheCleanup(impl_->data_manager_); impl_->tx_engine_.RegisterForTransactionalCacheCleanup( impl_->produce_server_); + impl_->tx_engine_.RegisterForTransactionalCacheCleanup( + impl_->subcursor_storage_); + impl_->tx_engine_.RegisterForTransactionalCacheCleanup( + impl_->bfs_subcursor_server_); } Worker::~Worker() {} diff --git a/src/distributed/bfs_rpc_clients.cpp b/src/distributed/bfs_rpc_clients.cpp index 8c7afa4de..e267f0717 100644 --- a/src/distributed/bfs_rpc_clients.cpp +++ b/src/distributed/bfs_rpc_clients.cpp @@ -67,20 +67,6 @@ void BfsRpcClients::ResetSubcursors( } } -void BfsRpcClients::RemoveBfsSubcursors( - const std::unordered_map &subcursor_ids) { - auto futures = coordination_->ExecuteOnWorkers( - db_->WorkerId(), [&subcursor_ids](int worker_id, auto &client) { - client.template Call( - subcursor_ids.at(worker_id)); - }); - subcursor_storage_->Erase(subcursor_ids.at(db_->WorkerId())); - // Wait and get all of the replies. 
- for (auto &future : futures) { - if (future.valid()) future.get(); - } -} - std::experimental::optional BfsRpcClients::Pull( int16_t worker_id, int64_t subcursor_id, database::GraphDbAccessor *dba) { if (worker_id == db_->WorkerId()) { diff --git a/src/distributed/bfs_rpc_clients.hpp b/src/distributed/bfs_rpc_clients.hpp index 0f89e4050..ce083e678 100644 --- a/src/distributed/bfs_rpc_clients.hpp +++ b/src/distributed/bfs_rpc_clients.hpp @@ -38,9 +38,6 @@ class BfsRpcClients { void ResetSubcursors( const std::unordered_map &subcursor_ids); - void RemoveBfsSubcursors( - const std::unordered_map &subcursor_ids); - std::experimental::optional Pull( int16_t worker_id, int64_t subcursor_id, database::GraphDbAccessor *dba); diff --git a/src/distributed/bfs_rpc_messages.lcp b/src/distributed/bfs_rpc_messages.lcp index 844a17a13..d0275afac 100644 --- a/src/distributed/bfs_rpc_messages.lcp +++ b/src/distributed/bfs_rpc_messages.lcp @@ -115,10 +115,6 @@ cpp<# (:request ((subcursor-id :int64_t))) (:response ())) -(lcp:define-rpc remove-bfs-subcursor - (:request ((member :int64_t))) - (:response ())) - (lcp:define-enum expand-result (success failure lambda-error) (:serialize)) diff --git a/src/distributed/bfs_rpc_server.hpp b/src/distributed/bfs_rpc_server.hpp index 946c79087..5fb6e77dd 100644 --- a/src/distributed/bfs_rpc_server.hpp +++ b/src/distributed/bfs_rpc_server.hpp @@ -21,23 +21,27 @@ class BfsRpcServer { distributed::Coordination *coordination, BfsSubcursorStorage *subcursor_storage) : db_(db), subcursor_storage_(subcursor_storage) { - coordination->Register( - [this](const auto &req_reader, auto *res_builder) { - CreateBfsSubcursorReq req; - auto ast_storage = std::make_unique(); - Load(&req, req_reader, ast_storage.get()); - auto db_accessor = db_->Access(req.tx_id); - auto id = subcursor_storage_->Create( - db_accessor.get(), req.direction, req.edge_types, - std::move(req.symbol_table), std::move(ast_storage), - req.filter_lambda, 
std::move(req.evaluation_context)); - { - std::lock_guard guard(lock_); - db_accessors_[id] = std::move(db_accessor); - } - CreateBfsSubcursorRes res(id); - Save(res, res_builder); - }); + coordination->Register([this](const auto &req_reader, + auto *res_builder) { + CreateBfsSubcursorReq req; + auto ast_storage = std::make_unique(); + Load(&req, req_reader, ast_storage.get()); + database::GraphDbAccessor *dba; + { + std::lock_guard guard(lock_); + auto it = db_accessors_.find(req.tx_id); + if (it == db_accessors_.end()) { + it = db_accessors_.emplace(req.tx_id, db_->Access(req.tx_id)).first; + } + dba = it->second.get(); + } + auto id = subcursor_storage_->Create( + dba, req.direction, req.edge_types, std::move(req.symbol_table), + std::move(ast_storage), req.filter_lambda, + std::move(req.evaluation_context)); + CreateBfsSubcursorRes res(id); + Save(res, res_builder); + }); coordination->Register( [this](const auto &req_reader, auto *res_builder) { @@ -58,19 +62,6 @@ class BfsRpcServer { Save(res, res_builder); }); - coordination->Register( - [this](const auto &req_reader, auto *res_builder) { - RemoveBfsSubcursorReq req; - Load(&req, req_reader); - { - std::lock_guard guard(lock_); - db_accessors_.erase(req.member); - } - subcursor_storage_->Erase(req.member); - RemoveBfsSubcursorRes res; - Save(res, res_builder); - }); - coordination->Register( [this](const auto &req_reader, auto *res_builder) { SetSourceReq req; @@ -145,11 +136,25 @@ class BfsRpcServer { }); } + void ClearTransactionalCache(tx::TransactionId oldest_active) { + // It is unlikely this will become a performance issue, but if it does, we + // should store database accessors in a lock-free map. 
+ std::lock_guard guard(lock_); + for (auto it = db_accessors_.begin(); it != db_accessors_.end();) { + if (it->first < oldest_active) { + it = db_accessors_.erase(it); + } else { + it++; + } + } + } + private: database::DistributedGraphDb *db_; std::mutex lock_; - std::map> db_accessors_; + std::map> + db_accessors_; BfsSubcursorStorage *subcursor_storage_; }; diff --git a/src/distributed/bfs_subcursor.cpp b/src/distributed/bfs_subcursor.cpp index 6a30379f6..f3db42bb8 100644 --- a/src/distributed/bfs_subcursor.cpp +++ b/src/distributed/bfs_subcursor.cpp @@ -30,7 +30,8 @@ ExpandBfsSubcursor::ExpandBfsSubcursor( evaluation_context_(std::move(evaluation_context)), frame_(symbol_table_.max_position()), expression_evaluator_(&frame_, symbol_table_, evaluation_context_, dba_, - query::GraphView::OLD) { + query::GraphView::OLD), + tx_id_(dba->transaction_id()) { Reset(); } @@ -205,12 +206,6 @@ int64_t BfsSubcursorStorage::Create( return id; } -void BfsSubcursorStorage::Erase(int64_t subcursor_id) { - std::lock_guard lock(mutex_); - auto removed = storage_.erase(subcursor_id); - CHECK(removed == 1) << "Subcursor with ID " << subcursor_id << " not found"; -} - ExpandBfsSubcursor *BfsSubcursorStorage::Get(int64_t subcursor_id) { std::lock_guard lock(mutex_); auto it = storage_.find(subcursor_id); @@ -219,4 +214,18 @@ ExpandBfsSubcursor *BfsSubcursorStorage::Get(int64_t subcursor_id) { return it->second.get(); } +void BfsSubcursorStorage::ClearTransactionalCache( + tx::TransactionId oldest_active) { + // It is unlikely this will become a performance issue, but if it does, we + // should store BFS subcursors in a lock-free map. 
+ std::lock_guard guard(mutex_); + for (auto it = storage_.begin(); it != storage_.end();) { + if (it->second->tx_id() < oldest_active) { + it = storage_.erase(it); + } else { + it++; + } + } +} + } // namespace distributed diff --git a/src/distributed/bfs_subcursor.hpp b/src/distributed/bfs_subcursor.hpp index 1ff49fc1f..aa4ea0a24 100644 --- a/src/distributed/bfs_subcursor.hpp +++ b/src/distributed/bfs_subcursor.hpp @@ -86,6 +86,8 @@ class ExpandBfsSubcursor { database::GraphDbAccessor *db_accessor() { return dba_; } + tx::TransactionId tx_id() { return tx_id_; } + /// Used to reset subcursor state before starting expansion from new source. void Reset(); @@ -144,6 +146,9 @@ class ExpandBfsSubcursor { /// Index of the vertex from `to_visit_next_` to return on next pull. size_t pull_index_; + + // Transaction ID used for transactional cache clean-up mechanism. + tx::TransactionId tx_id_; }; /// Thread-safe storage for BFS subcursors. @@ -158,8 +163,8 @@ class BfsSubcursorStorage { std::unique_ptr ast_storage, query::plan::ExpansionLambda filter_lambda, query::EvaluationContext evaluation_context); - void Erase(int64_t subcursor_id); ExpandBfsSubcursor *Get(int64_t subcursor_id); + void ClearTransactionalCache(tx::TransactionId oldest_active); private: BfsRpcClients *bfs_subcursor_clients_{nullptr}; diff --git a/src/query/plan/distributed_ops.cpp b/src/query/plan/distributed_ops.cpp index 8692c5d56..2cf1e088d 100644 --- a/src/query/plan/distributed_ops.cpp +++ b/src/query/plan/distributed_ops.cpp @@ -1207,13 +1207,7 @@ class DistributedExpandBfsCursor : public query::plan::Cursor { } } - void Shutdown() override { - input_cursor_->Shutdown(); - VLOG(10) << "Removing BFS subcursors"; - // TODO: This should be done using the - // `RegisterForTransactionalCacheCleanup` mechanism. - bfs_subcursor_clients_->RemoveBfsSubcursors(subcursor_ids_); - } + void Shutdown() override { input_cursor_->Shutdown(); } void Reset() override { input_cursor_->Reset();