From 9721ccf61c3fc239b226edaf1227a72829f0b976 Mon Sep 17 00:00:00 2001 From: florijan Date: Thu, 15 Feb 2018 10:47:50 +0100 Subject: [PATCH] Cleanup per-transaction caches in distributed Summary: On the master cleanups are hooked directly into the transaction engine. This is beneficial because the master might have bigger caches and we want to clear them as soon as possible. On the workers there is a periodic RPC call to the master about living transactions, which takes care of releasing local caches. This is suboptimal because long transactions will prevent cache GC (like with data GC). It is however fairly simple. Note that all cleanup is not done automatically and `RemotePull` has been reduced accordingly. @msantl, please verify correctness and consider if the code can be additionally simplified. Reviewers: teon.banek, msantl Reviewed By: msantl Subscribers: pullbot, msantl Differential Revision: https://phabricator.memgraph.io/D1202 --- src/CMakeLists.txt | 1 + src/communication/rpc/messages-inl.hpp | 4 -- src/database/graph_db.cpp | 16 ++++--- src/distributed/remote_data_manager.hpp | 21 ++++++++- src/distributed/remote_data_rpc_server.hpp | 3 -- src/distributed/remote_produce_rpc_server.hpp | 32 ++++++++----- .../remote_pull_produce_rpc_messages.hpp | 6 --- src/distributed/remote_pull_rpc_clients.hpp | 24 ---------- .../remote_updates_rpc_clients.hpp | 6 --- .../remote_updates_rpc_messages.hpp | 6 --- src/distributed/remote_updates_rpc_server.hpp | 23 ++++------ src/query/plan/operator.cpp | 26 +---------- src/query/plan/operator.hpp | 2 - src/transactions/engine.cpp | 29 ++++++++++++ src/transactions/engine.hpp | 24 +++++++++- src/transactions/engine_rpc_messages.hpp | 2 +- src/transactions/engine_single_node.cpp | 32 ++++++++----- src/transactions/engine_worker.cpp | 36 ++++++++++++++- src/transactions/engine_worker.hpp | 12 +++++ src/transactions/tx_end_listener.hpp | 37 +++++++++++++++ tests/unit/distributed_graph_db.cpp | 5 -- tests/unit/distributed_updates.cpp | 6 --- tests/unit/transaction_engine_distributed.cpp | 23 ++++++++++ tests/unit/transaction_engine_single_node.cpp | 46 ++++++++++++++----- 24 files changed, 275 insertions(+), 147 deletions(-) create mode 100644 src/transactions/engine.cpp create mode 100644 src/transactions/tx_end_listener.hpp diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 766b1017e..5e13a629f 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -49,6 +49,7 @@ set(memgraph_src_files storage/record_accessor.cpp storage/vertex_accessor.cpp threading/thread.cpp + transactions/engine.cpp transactions/engine_master.cpp transactions/engine_single_node.cpp transactions/engine_worker.cpp diff --git a/src/communication/rpc/messages-inl.hpp b/src/communication/rpc/messages-inl.hpp index 1b41e88e2..5b8367135 100644 --- a/src/communication/rpc/messages-inl.hpp +++ b/src/communication/rpc/messages-inl.hpp @@ -70,8 +70,6 @@ BOOST_CLASS_EXPORT(distributed::ConsumePlanRes); // Remote pull. BOOST_CLASS_EXPORT(distributed::RemotePullReq); BOOST_CLASS_EXPORT(distributed::RemotePullRes); -BOOST_CLASS_EXPORT(distributed::EndRemotePullReq); -BOOST_CLASS_EXPORT(distributed::EndRemotePullRes); BOOST_CLASS_EXPORT(distributed::TransactionCommandAdvancedReq); BOOST_CLASS_EXPORT(distributed::TransactionCommandAdvancedRes); @@ -92,5 +90,3 @@ BOOST_CLASS_EXPORT(distributed::RemoteUpdateReq); BOOST_CLASS_EXPORT(distributed::RemoteUpdateRes); BOOST_CLASS_EXPORT(distributed::RemoteUpdateApplyReq); BOOST_CLASS_EXPORT(distributed::RemoteUpdateApplyRes); -BOOST_CLASS_EXPORT(distributed::RemoteUpdateDiscardReq); -BOOST_CLASS_EXPORT(distributed::RemoteUpdateDiscardRes); diff --git a/src/database/graph_db.cpp b/src/database/graph_db.cpp index 39ef7e299..7248e0fe9 100644 --- a/src/database/graph_db.cpp +++ b/src/database/graph_db.cpp @@ -169,9 +169,11 @@ class Master : public PrivateBase { distributed::RemotePullRpcClients remote_pull_clients_{coordination_}; distributed::RpcWorkerClients index_rpc_clients_{coordination_, distributed::kIndexRpcName}; - distributed::RemoteUpdatesRpcServer remote_updates_server_{*this, system_}; + distributed::RemoteUpdatesRpcServer remote_updates_server_{*this, tx_engine_, + system_}; distributed::RemoteUpdatesRpcClients remote_updates_clients_{coordination_}; - distributed::RemoteDataManager remote_data_manager_{remote_data_clients_}; + distributed::RemoteDataManager remote_data_manager_{tx_engine_, + remote_data_clients_}; }; class Worker : public PrivateBase { @@ -200,12 +202,14 @@ class Worker : public PrivateBase { distributed::RemoteDataRpcServer remote_data_server_{*this, system_}; distributed::RemoteDataRpcClients remote_data_clients_{coordination_}; distributed::PlanConsumer plan_consumer_{system_}; - distributed::RemoteProduceRpcServer remote_produce_server_{*this, system_, - plan_consumer_}; + distributed::RemoteProduceRpcServer remote_produce_server_{ + *this, tx_engine_, system_, plan_consumer_}; distributed::IndexRpcServer index_rpc_server_{*this, system_}; - distributed::RemoteUpdatesRpcServer remote_updates_server_{*this, system_}; + distributed::RemoteUpdatesRpcServer remote_updates_server_{*this, tx_engine_, + system_}; distributed::RemoteUpdatesRpcClients remote_updates_clients_{coordination_}; - distributed::RemoteDataManager remote_data_manager_{remote_data_clients_}; + distributed::RemoteDataManager remote_data_manager_{tx_engine_, + remote_data_clients_}; }; #undef IMPL_GETTERS diff --git a/src/distributed/remote_data_manager.hpp b/src/distributed/remote_data_manager.hpp index 20ec4fe57..026295e4e 100644 --- a/src/distributed/remote_data_manager.hpp +++ b/src/distributed/remote_data_manager.hpp @@ -7,6 +7,7 @@ #include "storage/edge.hpp" #include "storage/vertex.hpp" #include "threading/sync/spinlock.hpp" +#include "transactions/tx_end_listener.hpp" #include "transactions/type.hpp" namespace distributed { @@ -24,8 +25,9 @@ class RemoteDataManager { } public: - RemoteDataManager(distributed::RemoteDataRpcClients &remote_data_clients) - : remote_data_clients_(remote_data_clients) {} + RemoteDataManager(tx::Engine &tx_engine, + distributed::RemoteDataRpcClients &remote_data_clients) + : remote_data_clients_(remote_data_clients), tx_engine_(tx_engine) {} /// Gets or creates the remote vertex cache for the given transaction. auto &Vertices(tx::transaction_id_t tx_id) { @@ -53,6 +55,21 @@ class RemoteDataManager { std::unordered_map> vertices_caches_; std::unordered_map> edges_caches_; + + tx::Engine &tx_engine_; + tx::TxEndListener tx_end_listener_{ + tx_engine_, [this](tx::transaction_id_t tx_id) { ClearCache(tx_id); }}; + + // Clears the caches for the given transaction ID. + void ClearCache(tx::transaction_id_t tx_id) { + std::lock_guard guard{lock_}; + auto remove = [tx_id](auto &map) { + auto found = map.find(tx_id); + if (found != map.end()) map.erase(found); + }; + remove(vertices_caches_); + remove(edges_caches_); + } }; template <> diff --git a/src/distributed/remote_data_rpc_server.hpp b/src/distributed/remote_data_rpc_server.hpp index c4990f119..eca3be269 100644 --- a/src/distributed/remote_data_rpc_server.hpp +++ b/src/distributed/remote_data_rpc_server.hpp @@ -12,9 +12,6 @@ namespace distributed { /** Serves this worker's data to others. */ class RemoteDataRpcServer { - // TODO maybe reuse GraphDbAccessors. It would reduce the load on tx::Engine - // locks (not sure what the gain would be). But have some way of cache - // invalidation. public: RemoteDataRpcServer(database::GraphDb &db, communication::rpc::System &system) : db_(db), rpc_server_(system, kRemoteDataRpcName) { diff --git a/src/distributed/remote_produce_rpc_server.hpp b/src/distributed/remote_produce_rpc_server.hpp index d691af58d..564e1dbcb 100644 --- a/src/distributed/remote_produce_rpc_server.hpp +++ b/src/distributed/remote_produce_rpc_server.hpp @@ -20,6 +20,8 @@ #include "query/parameters.hpp" #include "query/plan/operator.hpp" #include "query/typed_value.hpp" +#include "transactions/engine.hpp" +#include "transactions/tx_end_listener.hpp" #include "transactions/type.hpp" namespace distributed { @@ -129,26 +131,18 @@ class RemoteProduceRpcServer { }; public: - RemoteProduceRpcServer(database::GraphDb &db, + RemoteProduceRpcServer(database::GraphDb &db, tx::Engine &engine, communication::rpc::System &system, const distributed::PlanConsumer &plan_consumer) : db_(db), remote_produce_rpc_server_(system, kRemotePullProduceRpcName), - plan_consumer_(plan_consumer) { + plan_consumer_(plan_consumer), + engine_(engine) { remote_produce_rpc_server_.Register( [this](const RemotePullReq &req) { return std::make_unique(RemotePull(req)); }); - remote_produce_rpc_server_.Register([this]( - const EndRemotePullReq &req) { - std::lock_guard guard{ongoing_produces_lock_}; - auto it = ongoing_produces_.find(req.member); - CHECK(it != ongoing_produces_.end()) << "Failed to find ongoing produce"; - ongoing_produces_.erase(it); - return std::make_unique(); - }); - remote_produce_rpc_server_.Register( [this](const TransactionCommandAdvancedReq &req) { db_.tx_engine().UpdateCommand(req.member); @@ -166,6 +160,22 @@ class RemoteProduceRpcServer { ongoing_produces_; std::mutex ongoing_produces_lock_; + tx::Engine &engine_; + tx::TxEndListener tx_end_listener_{ + engine_, [this](tx::transaction_id_t tx_id) { ClearCache(tx_id); }}; + + // Removes all onging pulls for the given tx_id (that transaction expired). + void ClearCache(tx::transaction_id_t tx_id) { + std::lock_guard guard{ongoing_produces_lock_}; + for (auto it = ongoing_produces_.begin(); it != ongoing_produces_.end();) { + if (it->first.first == tx_id) { + it = ongoing_produces_.erase(it); + } else { + ++it; + } + } + } + auto &GetOngoingProduce(const RemotePullReq &req) { std::lock_guard guard{ongoing_produces_lock_}; auto found = ongoing_produces_.find({req.tx_id, req.plan_id}); diff --git a/src/distributed/remote_pull_produce_rpc_messages.hpp b/src/distributed/remote_pull_produce_rpc_messages.hpp index 9eeef2a44..02630e570 100644 --- a/src/distributed/remote_pull_produce_rpc_messages.hpp +++ b/src/distributed/remote_pull_produce_rpc_messages.hpp @@ -364,12 +364,6 @@ using RemotePullRpc = // optimization not to have to send the full RemotePullReqData pack every // time. -using EndRemotePullData = std::pair; -RPC_SINGLE_MEMBER_MESSAGE(EndRemotePullReq, EndRemotePullData); -RPC_NO_MEMBER_MESSAGE(EndRemotePullRes); -using EndRemotePullRpc = - communication::rpc::RequestResponse; - RPC_SINGLE_MEMBER_MESSAGE(TransactionCommandAdvancedReq, tx::transaction_id_t); RPC_NO_MEMBER_MESSAGE(TransactionCommandAdvancedRes); using TransactionCommandAdvancedRpc = diff --git a/src/distributed/remote_pull_rpc_clients.hpp b/src/distributed/remote_pull_rpc_clients.hpp index 048870dbd..c01b26736 100644 --- a/src/distributed/remote_pull_rpc_clients.hpp +++ b/src/distributed/remote_pull_rpc_clients.hpp @@ -87,30 +87,6 @@ class RemotePullRpcClients { auto GetWorkerIds() { return clients_.GetWorkerIds(); } - // Notifies a worker that the given transaction/plan is done. Otherwise the - // server is left with potentially unconsumed Cursors that never get deleted. - // - // @todo - this needs to be done with hooks into the transactional - // engine, so that the Worker discards it's stuff when the relevant - // transaction are done. - std::future EndRemotePull(int worker_id, tx::transaction_id_t tx_id, - int64_t plan_id) { - return clients_.ExecuteOnWorker( - worker_id, [tx_id, plan_id](ClientPool &client_pool) { - return client_pool.Call( - EndRemotePullData{tx_id, plan_id}); - }); - } - - void EndAllRemotePulls(tx::transaction_id_t tx_id, int64_t plan_id) { - std::vector> futures; - for (auto worker_id : clients_.GetWorkerIds()) { - if (worker_id == 0) continue; - futures.emplace_back(EndRemotePull(worker_id, tx_id, plan_id)); - } - for (auto &future : futures) future.wait(); - } - std::vector> NotifyAllTransactionCommandAdvanced( tx::transaction_id_t tx_id) { return clients_.ExecuteOnWorkers(0, [tx_id](auto &client) { diff --git a/src/distributed/remote_updates_rpc_clients.hpp b/src/distributed/remote_updates_rpc_clients.hpp index fee2a1438..f54236d8e 100644 --- a/src/distributed/remote_updates_rpc_clients.hpp +++ b/src/distributed/remote_updates_rpc_clients.hpp @@ -43,12 +43,6 @@ class RemoteUpdatesRpcClients { }); } - /// Calls for the worker with the given ID to discard remote updates. - void RemoteUpdateDiscard(int worker_id, tx::transaction_id_t tx_id) { - worker_clients_.GetClientPool(worker_id).Call( - tx_id); - } - private: RpcWorkerClients worker_clients_; }; diff --git a/src/distributed/remote_updates_rpc_messages.hpp b/src/distributed/remote_updates_rpc_messages.hpp index 53a18cdcd..451bebd24 100644 --- a/src/distributed/remote_updates_rpc_messages.hpp +++ b/src/distributed/remote_updates_rpc_messages.hpp @@ -26,10 +26,4 @@ RPC_SINGLE_MEMBER_MESSAGE(RemoteUpdateApplyRes, RemoteUpdateResult); using RemoteUpdateApplyRpc = communication::rpc::RequestResponse; - -RPC_SINGLE_MEMBER_MESSAGE(RemoteUpdateDiscardReq, tx::transaction_id_t); -RPC_NO_MEMBER_MESSAGE(RemoteUpdateDiscardRes); -using RemoteUpdateDiscardRpc = - communication::rpc::RequestResponse; } // namespace distributed diff --git a/src/distributed/remote_updates_rpc_server.hpp b/src/distributed/remote_updates_rpc_server.hpp index e0c0487ff..309413168 100644 --- a/src/distributed/remote_updates_rpc_server.hpp +++ b/src/distributed/remote_updates_rpc_server.hpp @@ -19,6 +19,7 @@ #include "storage/vertex_accessor.hpp" #include "threading/sync/lock_timeout_exception.hpp" #include "threading/sync/spinlock.hpp" +#include "transactions/tx_end_listener.hpp" #include "transactions/type.hpp" namespace distributed { @@ -121,9 +122,9 @@ class RemoteUpdatesRpcServer { }; public: - RemoteUpdatesRpcServer(database::GraphDb &db, + RemoteUpdatesRpcServer(database::GraphDb &db, tx::Engine &engine, communication::rpc::System &system) - : db_(db), server_(system, kRemoteUpdatesRpc) { + : db_(db), engine_(engine), server_(system, kRemoteUpdatesRpc) { server_.Register([this](const RemoteUpdateReq &req) { using DeltaType = database::StateDelta::Type; switch (req.member.type) { @@ -145,12 +146,6 @@ class RemoteUpdatesRpcServer { [this](const RemoteUpdateApplyReq &req) { return std::make_unique(Apply(req.member)); }); - - server_.Register( - [this](const RemoteUpdateDiscardReq &req) { - Discard(req.member); - return std::make_unique(); - }); } /// Applies all existsing updates for the given transaction ID. If there are @@ -175,19 +170,19 @@ class RemoteUpdatesRpcServer { return RemoteUpdateResult::DONE; } - /// Discards all the existing updates for the given transaction ID. - void Discard(tx::transaction_id_t tx_id) { - vertex_updates_.access().remove(tx_id); - edge_updates_.access().remove(tx_id); - } - private: database::GraphDb &db_; + tx::Engine &engine_; communication::rpc::Server server_; ConcurrentMap> vertex_updates_; ConcurrentMap> edge_updates_; + tx::TxEndListener tx_end_listener_{engine_, + [this](tx::transaction_id_t tx_id) { + vertex_updates_.access().remove(tx_id); + edge_updates_.access().remove(tx_id); + }}; // Processes a single delta recieved in the RPC request. template diff --git a/src/query/plan/operator.cpp b/src/query/plan/operator.cpp index 892006b2c..12022a3e6 100644 --- a/src/query/plan/operator.cpp +++ b/src/query/plan/operator.cpp @@ -2741,16 +2741,6 @@ PullRemote::PullRemoteCursor::PullRemoteCursor(const PullRemote &self, worker_ids_.erase(std::find(worker_ids_.begin(), worker_ids_.end(), 0)); } -void PullRemote::PullRemoteCursor::EndRemotePull() { - std::vector> futures; - for (auto &worker_id : worker_ids_) { - futures.emplace_back(db_.db().remote_pull_clients().EndRemotePull( - worker_id, db_.transaction().id_, self_.plan_id())); - } - for (auto &future : futures) future.wait(); - worker_ids_.clear(); -} - bool PullRemote::PullRemoteCursor::Pull(Frame &frame, Context &context) { auto insert_future_for_worker = [&](int worker_id) { remote_pulls_[worker_id] = db_.db().remote_pull_clients().RemotePull( @@ -2789,22 +2779,17 @@ bool PullRemote::PullRemoteCursor::Pull(Frame &frame, Context &context) { insert_future_for_worker(worker_id); break; case distributed::RemotePullState::SERIALIZATION_ERROR: - EndRemotePull(); throw mvcc::SerializationError( "Serialization error occured during PullRemote !"); case distributed::RemotePullState::LOCK_TIMEOUT_ERROR: - EndRemotePull(); throw LockTimeoutException( "LockTimeout error occured during PullRemote !"); case distributed::RemotePullState::UPDATE_DELETED_ERROR: - EndRemotePull(); throw QueryRuntimeException( "RecordDeleted error ocured during PullRemote !"); case distributed::RemotePullState::RECONSTRUCTION_ERROR: - EndRemotePull(); throw query::ReconstructionException(); case distributed::RemotePullState::QUERY_ERROR: - EndRemotePull(); throw QueryRuntimeException( "Query runtime error occurred duing PullRemote !"); } @@ -2825,11 +2810,8 @@ bool PullRemote::PullRemoteCursor::Pull(Frame &frame, Context &context) { } if (!have_remote_results) { - // If we didn't find any remote results and there aren't any remote - // pulls, we've exhausted all remote results. Make sure we signal that - // to workers and exit the loop. if (remote_pulls_.empty()) { - EndRemotePull(); + worker_ids_.clear(); break; } @@ -2868,18 +2850,12 @@ bool PullRemote::PullRemoteCursor::Pull(Frame &frame, Context &context) { if (remote_results_[pull_from_worker_id].empty() && remote_pulls_.find(pull_from_worker_id) == remote_pulls_.end()) { worker_ids_.erase(worker_ids_.begin() + last_pulled_worker_id_index_); - db_.db() - .remote_pull_clients() - .EndRemotePull(pull_from_worker_id, db_.transaction().id_, - self_.plan_id()) - .wait(); } return true; } void PullRemote::PullRemoteCursor::Reset() { - EndRemotePull(); throw QueryRuntimeException("Unsupported: Reset during PullRemote!"); } diff --git a/src/query/plan/operator.hpp b/src/query/plan/operator.hpp index c890c2db8..dc5824728 100644 --- a/src/query/plan/operator.hpp +++ b/src/query/plan/operator.hpp @@ -2309,8 +2309,6 @@ class PullRemote : public LogicalOperator { void Reset() override; private: - void EndRemotePull(); - const PullRemote &self_; database::GraphDbAccessor &db_; const std::unique_ptr input_cursor_; diff --git a/src/transactions/engine.cpp b/src/transactions/engine.cpp new file mode 100644 index 000000000..69fa0e156 --- /dev/null +++ b/src/transactions/engine.cpp @@ -0,0 +1,29 @@ +#include +#include + +#include "glog/logging.h" + +#include "transactions/engine.hpp" +#include "transactions/tx_end_listener.hpp" + +namespace tx { + +void Engine::Register(TxEndListener *listener) { + std::lock_guard guard{end_listeners_lock_}; + end_listeners_.emplace_back(listener); +} + +void Engine::Unregister(TxEndListener *listener) { + std::lock_guard guard{end_listeners_lock_}; + auto found = + std::find(end_listeners_.begin(), end_listeners_.end(), listener); + CHECK(found != end_listeners_.end()) + << "Failed to find listener to unregister"; + end_listeners_.erase(found); +} + +void Engine::NotifyListeners(transaction_id_t tx_id) const { + std::lock_guard guard{end_listeners_lock_}; + for (auto *listener : end_listeners_) listener->operator()(tx_id); +} +} // namespace tx diff --git a/src/transactions/engine.hpp b/src/transactions/engine.hpp index 80a26eef9..309d3accb 100644 --- a/src/transactions/engine.hpp +++ b/src/transactions/engine.hpp @@ -1,11 +1,17 @@ #pragma once +#include +#include +#include + #include "data_structures/concurrent/concurrent_map.hpp" +#include "threading/sync/spinlock.hpp" #include "transactions/commit_log.hpp" #include "transactions/transaction.hpp" #include "transactions/type.hpp" namespace tx { +class TxEndListener; /** * Database transaction engine. Used for managing transactions and the related * information such as transaction snapshots and the transaction state info. @@ -19,6 +25,8 @@ namespace tx { * determined by the users of a particular method. */ class Engine { + friend class TxEndListener; + public: virtual ~Engine() = default; @@ -74,7 +82,7 @@ class Engine { std::function f) = 0; /** Gets a transaction object for a running transaction. */ - virtual tx::Transaction *RunningTransaction(tx::transaction_id_t tx_id) = 0; + virtual tx::Transaction *RunningTransaction(transaction_id_t tx_id) = 0; auto &local_lock_graph() { return local_lock_graph_; } const auto &local_lock_graph() const { return local_lock_graph_; } @@ -84,5 +92,19 @@ class Engine { // tx_that_holds_lock). Used for local deadlock resolution. // TODO consider global deadlock resolution. ConcurrentMap local_lock_graph_; + + // Transaction end listeners and the lock for protecting that datastructure. + std::vector end_listeners_; + mutable SpinLock end_listeners_lock_; + + /** Register a transaction end listener with this engine. */ + void Register(TxEndListener *listener); + + /** Unregister a transaction end listener with this engine. */ + void Unregister(TxEndListener *listener); + + protected: + /** Notifies all registered listeners that a transaction has ended. */ + void NotifyListeners(transaction_id_t tx_id) const; }; } // namespace tx diff --git a/src/transactions/engine_rpc_messages.hpp b/src/transactions/engine_rpc_messages.hpp index b5841018f..344022b65 100644 --- a/src/transactions/engine_rpc_messages.hpp +++ b/src/transactions/engine_rpc_messages.hpp @@ -55,7 +55,7 @@ RPC_SINGLE_MEMBER_MESSAGE(ClogInfoRes, CommitLog::Info) using ClogInfoRpc = communication::rpc::RequestResponse; -RPC_SINGLE_MEMBER_MESSAGE(ActiveTransactionsReq, transaction_id_t) +RPC_NO_MEMBER_MESSAGE(ActiveTransactionsReq) using ActiveTransactionsRpc = communication::rpc::RequestResponse; diff --git a/src/transactions/engine_single_node.cpp b/src/transactions/engine_single_node.cpp index da8ec69f6..f6db05ccb 100644 --- a/src/transactions/engine_single_node.cpp +++ b/src/transactions/engine_single_node.cpp @@ -50,23 +50,31 @@ command_id_t SingleNodeEngine::UpdateCommand(transaction_id_t id) { } void SingleNodeEngine::Commit(const Transaction &t) { - std::lock_guard guard(lock_); - clog_.set_committed(t.id_); - active_.remove(t.id_); - if (wal_) { - wal_->Emplace(database::StateDelta::TxCommit(t.id_)); + auto tx_id = t.id_; + { + std::lock_guard guard(lock_); + clog_.set_committed(tx_id); + active_.remove(tx_id); + if (wal_) { + wal_->Emplace(database::StateDelta::TxCommit(tx_id)); + } + store_.erase(store_.find(tx_id)); } - store_.erase(store_.find(t.id_)); + NotifyListeners(tx_id); } void SingleNodeEngine::Abort(const Transaction &t) { - std::lock_guard guard(lock_); - clog_.set_aborted(t.id_); - active_.remove(t.id_); - if (wal_) { - wal_->Emplace(database::StateDelta::TxAbort(t.id_)); + auto tx_id = t.id_; + { + std::lock_guard guard(lock_); + clog_.set_aborted(tx_id); + active_.remove(tx_id); + if (wal_) { + wal_->Emplace(database::StateDelta::TxAbort(tx_id)); + } + store_.erase(store_.find(tx_id)); } - store_.erase(store_.find(t.id_)); + NotifyListeners(tx_id); } CommitLog::Info SingleNodeEngine::Info(transaction_id_t tx) const { diff --git a/src/transactions/engine_worker.cpp b/src/transactions/engine_worker.cpp index c7f8989a8..59d0f5b77 100644 --- a/src/transactions/engine_worker.cpp +++ b/src/transactions/engine_worker.cpp @@ -1,3 +1,5 @@ +#include + #include "glog/logging.h" #include "transactions/engine_rpc_messages.hpp" @@ -7,7 +9,22 @@ namespace tx { WorkerEngine::WorkerEngine(const io::network::Endpoint &endpoint) - : rpc_client_pool_(endpoint, kTransactionEngineRpc) {} + : rpc_client_pool_(endpoint, kTransactionEngineRpc) { + cache_clearing_scheduler_.Run(kCacheReleasePeriod, [this]() { + // Use the GC snapshot as it always has at least one member. + auto res = rpc_client_pool_.Call(); + // There is a race-condition between this scheduled call and worker + // shutdown. It is possible that the worker has responded to the master it + // is shutting down, and the master is shutting down (and can't responde to + // RPCs). At the same time this call gets scheduled, so we get a failed RPC. + if (!res) { + LOG(WARNING) << "Transaction cache GC RPC call failed"; + } else { + CHECK(!res->member.empty()) << "Recieved an empty GcSnapshot"; + ClearCachesBasedOnOldest(res->member.front()); + } + }); +} WorkerEngine::~WorkerEngine() { for (auto &kv : active_.access()) { @@ -120,5 +137,22 @@ void WorkerEngine::ClearCache(transaction_id_t tx_id) const { delete found->second; access.remove(tx_id); } + NotifyListeners(tx_id); +} + +void WorkerEngine::ClearCachesBasedOnOldest(transaction_id_t oldest_active) { + // Take care to handle concurrent calls to this correctly. Try to update the + // oldest_active_, and only if successful (nobody else did it concurrently), + // clear caches between the previous oldest (now expired) and new oldest + // (possibly still alive). + auto previous_oldest = oldest_active_.load(); + while ( + !oldest_active_.compare_exchange_strong(previous_oldest, oldest_active)) { + ; + } + for (tx::transaction_id_t expired = previous_oldest; expired < oldest_active; + ++expired) { + ClearCache(expired); + } } } // namespace tx diff --git a/src/transactions/engine_worker.hpp b/src/transactions/engine_worker.hpp index c931e9424..9d1606ab6 100644 --- a/src/transactions/engine_worker.hpp +++ b/src/transactions/engine_worker.hpp @@ -1,6 +1,7 @@ #pragma once #include +#include #include #include "communication/rpc/client_pool.hpp" @@ -9,6 +10,7 @@ #include "transactions/commit_log.hpp" #include "transactions/engine.hpp" #include "transactions/transaction.hpp" +#include "utils/scheduler.hpp" namespace tx { @@ -17,6 +19,10 @@ namespace tx { * begin/advance/end transactions on the master. */ class WorkerEngine : public Engine { public: + /// The wait time between two releases of local transaction objects that have + /// expired on the master. + static constexpr std::chrono::seconds kCacheReleasePeriod{1}; + WorkerEngine(const io::network::Endpoint &endpoint); ~WorkerEngine(); @@ -47,5 +53,11 @@ class WorkerEngine : public Engine { // Removes (destructs) a Transaction that's expired. If there is no cached // transacton for the given id, nothing is done. void ClearCache(transaction_id_t tx_id) const; + + // Used for clearing of caches of transactions that have expired. + // Initialize the oldest_active_ with 1 because there's never a tx with id=0 + std::atomic oldest_active_{1}; + void ClearCachesBasedOnOldest(transaction_id_t oldest_active); + Scheduler cache_clearing_scheduler_; }; } // namespace tx diff --git a/src/transactions/tx_end_listener.hpp b/src/transactions/tx_end_listener.hpp new file mode 100644 index 000000000..efdcae1bf --- /dev/null +++ b/src/transactions/tx_end_listener.hpp @@ -0,0 +1,37 @@ +#pragma once + +#include + +#include "transactions/engine.hpp" +#include "transactions/type.hpp" + +namespace tx { + +/** + * Wrapper around the given function that registers itself with the given + * transaction engine. Ensures that the function gets called when a transaction + * has ended in the engine. + * + * Also ensures that the listener gets unregistered from the engine upon + * destruction. Intended usage is: just create a TxEndListener and ensure it + * gets destructed before the function it wraps becomes invalid. + * + * There are no guarantees that the listener will be called only once for the + * given transaction id. + */ +class TxEndListener { + public: + TxEndListener(Engine &engine, std::function function) + : engine_(engine), function_(std::move(function)) { + engine_.Register(this); + } + + ~TxEndListener() { engine_.Unregister(this); } + + void operator()(transaction_id_t tx_id) const { function_(tx_id); } + + private: + Engine &engine_; + std::function function_; +}; +} // namespace tx diff --git a/tests/unit/distributed_graph_db.cpp b/tests/unit/distributed_graph_db.cpp index 325bb7445..790416938 100644 --- a/tests/unit/distributed_graph_db.cpp +++ b/tests/unit/distributed_graph_db.cpp @@ -181,8 +181,6 @@ TEST_F(DistributedGraphDbTest, RemotePullProduceRpc) { auto tx1_batch2 = remote_pull(dba_1, worker_id).get(); expect_second_batch(tx1_batch2); } - for (auto tx_id : {dba_1.transaction_id(), dba_2.transaction_id()}) - master().remote_pull_clients().EndAllRemotePulls(tx_id, plan_id); } TEST_F(DistributedGraphDbTest, RemotePullProduceRpcWithGraphElements) { @@ -277,9 +275,6 @@ TEST_F(DistributedGraphDbTest, RemotePullProduceRpcWithGraphElements) { auto future_w2_results = remote_pull(dba, 2); check_result(1, future_w1_results.get().frames); check_result(2, future_w2_results.get().frames); - - master().remote_pull_clients().EndAllRemotePulls(dba.transaction_id(), - plan_id); } TEST_F(DistributedGraphDbTest, BuildIndexDistributed) { diff --git a/tests/unit/distributed_updates.cpp b/tests/unit/distributed_updates.cpp index c9637bdb6..661e5dd4c 100644 --- a/tests/unit/distributed_updates.cpp +++ b/tests/unit/distributed_updates.cpp @@ -58,10 +58,4 @@ TEST_F(DistributedUpdateTest, RemoteUpdateApply) { EXPECT_LABEL(v1_dba1, false, true); } -TEST_F(DistributedUpdateTest, RemoteUpdateDiscard) { - EXPECT_LABEL(v1_dba1, false, false); - worker(1).remote_updates_server().Discard(dba1->transaction_id()); - EXPECT_LABEL(v1_dba1, false, false); -} - #undef EXPECT_LABEL diff --git a/tests/unit/transaction_engine_distributed.cpp b/tests/unit/transaction_engine_distributed.cpp index 73558416d..3dd13e666 100644 --- a/tests/unit/transaction_engine_distributed.cpp +++ b/tests/unit/transaction_engine_distributed.cpp @@ -1,4 +1,7 @@ +#include +#include #include +#include #include "gtest/gtest.h" @@ -7,6 +10,7 @@ #include "transactions/engine_master.hpp" #include "transactions/engine_rpc_messages.hpp" #include "transactions/engine_worker.hpp" +#include "transactions/tx_end_listener.hpp" using namespace tx; using namespace communication::rpc; @@ -135,3 +139,22 @@ TEST_F(WorkerEngineTest, LocalForEachActiveTransaction) { [&local](Transaction &t) { local.insert(t.id_); }); EXPECT_EQ(local, std::unordered_set({1, 4})); } + +TEST_F(WorkerEngineTest, TxEndListener) { + std::atomic has_expired{0}; + TxEndListener worker_end_listner{ + worker_, [&has_expired](transaction_id_t tid) { + std::cout << "asdasdadas: " << tid << std::endl; + ++has_expired; }}; + + auto sleep_period = + WorkerEngine::kCacheReleasePeriod + std::chrono::milliseconds(200); + auto t1 = master_.Begin(); + auto t2 = master_.Begin(); + std::this_thread::sleep_for(sleep_period); + EXPECT_EQ(has_expired.load(), 0); + master_.Commit(*t1); + master_.Abort(*t2); + std::this_thread::sleep_for(sleep_period); + EXPECT_EQ(has_expired.load(), 2); +} diff --git a/tests/unit/transaction_engine_single_node.cpp b/tests/unit/transaction_engine_single_node.cpp index ee7ccbdcb..b8ff3fee5 100644 --- a/tests/unit/transaction_engine_single_node.cpp +++ b/tests/unit/transaction_engine_single_node.cpp @@ -6,37 +6,40 @@ #include "data_structures/concurrent/concurrent_set.hpp" #include "transactions/engine_single_node.hpp" #include "transactions/transaction.hpp" +#include "transactions/tx_end_listener.hpp" + +using namespace tx; TEST(Engine, GcSnapshot) { - tx::SingleNodeEngine engine; - ASSERT_EQ(engine.GlobalGcSnapshot(), tx::Snapshot({1})); + SingleNodeEngine engine; + ASSERT_EQ(engine.GlobalGcSnapshot(), Snapshot({1})); - std::vector transactions; + std::vector transactions; // create transactions and check the GC snapshot for (int i = 0; i < 5; ++i) { transactions.push_back(engine.Begin()); - EXPECT_EQ(engine.GlobalGcSnapshot(), tx::Snapshot({1})); + EXPECT_EQ(engine.GlobalGcSnapshot(), Snapshot({1})); } // commit transactions in the middle, expect // the GcSnapshot did not change engine.Commit(*transactions[1]); - EXPECT_EQ(engine.GlobalGcSnapshot(), tx::Snapshot({1})); + EXPECT_EQ(engine.GlobalGcSnapshot(), Snapshot({1})); engine.Commit(*transactions[2]); - EXPECT_EQ(engine.GlobalGcSnapshot(), tx::Snapshot({1})); + EXPECT_EQ(engine.GlobalGcSnapshot(), Snapshot({1})); // have the first three transactions committed engine.Commit(*transactions[0]); - EXPECT_EQ(engine.GlobalGcSnapshot(), tx::Snapshot({1, 2, 3, 4})); + EXPECT_EQ(engine.GlobalGcSnapshot(), Snapshot({1, 2, 3, 4})); // commit all engine.Commit(*transactions[3]); engine.Commit(*transactions[4]); - EXPECT_EQ(engine.GlobalGcSnapshot(), tx::Snapshot({6})); + EXPECT_EQ(engine.GlobalGcSnapshot(), Snapshot({6})); } TEST(Engine, Advance) { - tx::SingleNodeEngine engine; + SingleNodeEngine engine; auto t0 = engine.Begin(); auto t1 = engine.Begin(); @@ -49,9 +52,9 @@ TEST(Engine, Advance) { } TEST(Engine, ConcurrentBegin) { - tx::SingleNodeEngine engine; + SingleNodeEngine engine; std::vector threads; - ConcurrentSet tx_ids; + ConcurrentSet tx_ids; for (int i = 0; i < 10; ++i) { threads.emplace_back([&engine, accessor = tx_ids.access() ]() mutable { for (int j = 0; j < 100; ++j) { @@ -65,10 +68,29 @@ TEST(Engine, ConcurrentBegin) { } TEST(Engine, RunningTransaction) { - tx::SingleNodeEngine engine; + SingleNodeEngine engine; auto t0 = engine.Begin(); auto t1 = engine.Begin(); EXPECT_EQ(t0, engine.RunningTransaction(t0->id_)); EXPECT_NE(t1, engine.RunningTransaction(t0->id_)); EXPECT_EQ(t1, engine.RunningTransaction(t1->id_)); } + +TEST(Engine, TxEndListener) { + SingleNodeEngine engine; + int count = 0; + { + TxEndListener listener{engine, [&count](auto) { count++; }}; + EXPECT_EQ(count, 0); + auto t1 = engine.Begin(); + EXPECT_EQ(count, 0); + auto t2 = engine.Begin(); + engine.Abort(*t1); + EXPECT_EQ(count, 1); + engine.Commit(*t2); + EXPECT_EQ(count, 2); + } + auto t3 = engine.Begin(); + engine.Commit(*t3); + EXPECT_EQ(count, 2); +}