diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 766b1017e..5e13a629f 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -49,6 +49,7 @@ set(memgraph_src_files storage/record_accessor.cpp storage/vertex_accessor.cpp threading/thread.cpp + transactions/engine.cpp transactions/engine_master.cpp transactions/engine_single_node.cpp transactions/engine_worker.cpp diff --git a/src/communication/rpc/messages-inl.hpp b/src/communication/rpc/messages-inl.hpp index 1b41e88e2..5b8367135 100644 --- a/src/communication/rpc/messages-inl.hpp +++ b/src/communication/rpc/messages-inl.hpp @@ -70,8 +70,6 @@ BOOST_CLASS_EXPORT(distributed::ConsumePlanRes); // Remote pull. BOOST_CLASS_EXPORT(distributed::RemotePullReq); BOOST_CLASS_EXPORT(distributed::RemotePullRes); -BOOST_CLASS_EXPORT(distributed::EndRemotePullReq); -BOOST_CLASS_EXPORT(distributed::EndRemotePullRes); BOOST_CLASS_EXPORT(distributed::TransactionCommandAdvancedReq); BOOST_CLASS_EXPORT(distributed::TransactionCommandAdvancedRes); @@ -92,5 +90,3 @@ BOOST_CLASS_EXPORT(distributed::RemoteUpdateReq); BOOST_CLASS_EXPORT(distributed::RemoteUpdateRes); BOOST_CLASS_EXPORT(distributed::RemoteUpdateApplyReq); BOOST_CLASS_EXPORT(distributed::RemoteUpdateApplyRes); -BOOST_CLASS_EXPORT(distributed::RemoteUpdateDiscardReq); -BOOST_CLASS_EXPORT(distributed::RemoteUpdateDiscardRes); diff --git a/src/database/graph_db.cpp b/src/database/graph_db.cpp index 39ef7e299..7248e0fe9 100644 --- a/src/database/graph_db.cpp +++ b/src/database/graph_db.cpp @@ -169,9 +169,11 @@ class Master : public PrivateBase { distributed::RemotePullRpcClients remote_pull_clients_{coordination_}; distributed::RpcWorkerClients index_rpc_clients_{coordination_, distributed::kIndexRpcName}; - distributed::RemoteUpdatesRpcServer remote_updates_server_{*this, system_}; + distributed::RemoteUpdatesRpcServer remote_updates_server_{*this, tx_engine_, + system_}; distributed::RemoteUpdatesRpcClients remote_updates_clients_{coordination_}; - 
distributed::RemoteDataManager remote_data_manager_{remote_data_clients_}; + distributed::RemoteDataManager remote_data_manager_{tx_engine_, + remote_data_clients_}; }; class Worker : public PrivateBase { @@ -200,12 +202,14 @@ class Worker : public PrivateBase { distributed::RemoteDataRpcServer remote_data_server_{*this, system_}; distributed::RemoteDataRpcClients remote_data_clients_{coordination_}; distributed::PlanConsumer plan_consumer_{system_}; - distributed::RemoteProduceRpcServer remote_produce_server_{*this, system_, - plan_consumer_}; + distributed::RemoteProduceRpcServer remote_produce_server_{ + *this, tx_engine_, system_, plan_consumer_}; distributed::IndexRpcServer index_rpc_server_{*this, system_}; - distributed::RemoteUpdatesRpcServer remote_updates_server_{*this, system_}; + distributed::RemoteUpdatesRpcServer remote_updates_server_{*this, tx_engine_, + system_}; distributed::RemoteUpdatesRpcClients remote_updates_clients_{coordination_}; - distributed::RemoteDataManager remote_data_manager_{remote_data_clients_}; + distributed::RemoteDataManager remote_data_manager_{tx_engine_, + remote_data_clients_}; }; #undef IMPL_GETTERS diff --git a/src/distributed/remote_data_manager.hpp b/src/distributed/remote_data_manager.hpp index 20ec4fe57..026295e4e 100644 --- a/src/distributed/remote_data_manager.hpp +++ b/src/distributed/remote_data_manager.hpp @@ -7,6 +7,7 @@ #include "storage/edge.hpp" #include "storage/vertex.hpp" #include "threading/sync/spinlock.hpp" +#include "transactions/tx_end_listener.hpp" #include "transactions/type.hpp" namespace distributed { @@ -24,8 +25,9 @@ class RemoteDataManager { } public: - RemoteDataManager(distributed::RemoteDataRpcClients &remote_data_clients) - : remote_data_clients_(remote_data_clients) {} + RemoteDataManager(tx::Engine &tx_engine, + distributed::RemoteDataRpcClients &remote_data_clients) + : remote_data_clients_(remote_data_clients), tx_engine_(tx_engine) {} /// Gets or creates the remote vertex cache for 
the given transaction. auto &Vertices(tx::transaction_id_t tx_id) { @@ -53,6 +55,18 @@ class RemoteDataManager { std::unordered_map> vertices_caches_; std::unordered_map> edges_caches_; + + tx::Engine &tx_engine_; + tx::TxEndListener tx_end_listener_{ + tx_engine_, [this](tx::transaction_id_t tx_id) { ClearCache(tx_id); }}; + + // Clears the caches for the given transaction ID. Erasing by key is a no-op + // when no cache exists for that ID, so no prior lookup is needed. + void ClearCache(tx::transaction_id_t tx_id) { + std::lock_guard guard{lock_}; + vertices_caches_.erase(tx_id); + edges_caches_.erase(tx_id); + } }; template <> diff --git a/src/distributed/remote_data_rpc_server.hpp b/src/distributed/remote_data_rpc_server.hpp index c4990f119..eca3be269 100644 --- a/src/distributed/remote_data_rpc_server.hpp +++ b/src/distributed/remote_data_rpc_server.hpp @@ -12,9 +12,6 @@ namespace distributed { /** Serves this worker's data to others. */ class RemoteDataRpcServer { - // TODO maybe reuse GraphDbAccessors. It would reduce the load on tx::Engine - // locks (not sure what the gain would be). But have some way of cache - // invalidation. 
public: RemoteDataRpcServer(database::GraphDb &db, communication::rpc::System &system) : db_(db), rpc_server_(system, kRemoteDataRpcName) { diff --git a/src/distributed/remote_produce_rpc_server.hpp b/src/distributed/remote_produce_rpc_server.hpp index d691af58d..564e1dbcb 100644 --- a/src/distributed/remote_produce_rpc_server.hpp +++ b/src/distributed/remote_produce_rpc_server.hpp @@ -20,6 +20,8 @@ #include "query/parameters.hpp" #include "query/plan/operator.hpp" #include "query/typed_value.hpp" +#include "transactions/engine.hpp" +#include "transactions/tx_end_listener.hpp" #include "transactions/type.hpp" namespace distributed { @@ -129,26 +131,18 @@ class RemoteProduceRpcServer { }; public: - RemoteProduceRpcServer(database::GraphDb &db, + RemoteProduceRpcServer(database::GraphDb &db, tx::Engine &engine, communication::rpc::System &system, const distributed::PlanConsumer &plan_consumer) : db_(db), remote_produce_rpc_server_(system, kRemotePullProduceRpcName), - plan_consumer_(plan_consumer) { + plan_consumer_(plan_consumer), + engine_(engine) { remote_produce_rpc_server_.Register( [this](const RemotePullReq &req) { return std::make_unique(RemotePull(req)); }); - remote_produce_rpc_server_.Register([this]( - const EndRemotePullReq &req) { - std::lock_guard guard{ongoing_produces_lock_}; - auto it = ongoing_produces_.find(req.member); - CHECK(it != ongoing_produces_.end()) << "Failed to find ongoing produce"; - ongoing_produces_.erase(it); - return std::make_unique(); - }); - remote_produce_rpc_server_.Register( [this](const TransactionCommandAdvancedReq &req) { db_.tx_engine().UpdateCommand(req.member); @@ -166,6 +160,22 @@ class RemoteProduceRpcServer { ongoing_produces_; std::mutex ongoing_produces_lock_; + tx::Engine &engine_; + tx::TxEndListener tx_end_listener_{ + engine_, [this](tx::transaction_id_t tx_id) { ClearCache(tx_id); }}; + + // Removes all ongoing pulls for the given tx_id (that transaction expired). 
+ void ClearCache(tx::transaction_id_t tx_id) { + std::lock_guard guard{ongoing_produces_lock_}; + for (auto it = ongoing_produces_.begin(); it != ongoing_produces_.end();) { + if (it->first.first == tx_id) { + it = ongoing_produces_.erase(it); + } else { + ++it; + } + } + } + auto &GetOngoingProduce(const RemotePullReq &req) { std::lock_guard guard{ongoing_produces_lock_}; auto found = ongoing_produces_.find({req.tx_id, req.plan_id}); diff --git a/src/distributed/remote_pull_produce_rpc_messages.hpp b/src/distributed/remote_pull_produce_rpc_messages.hpp index 9eeef2a44..02630e570 100644 --- a/src/distributed/remote_pull_produce_rpc_messages.hpp +++ b/src/distributed/remote_pull_produce_rpc_messages.hpp @@ -364,12 +364,6 @@ using RemotePullRpc = // optimization not to have to send the full RemotePullReqData pack every // time. -using EndRemotePullData = std::pair; -RPC_SINGLE_MEMBER_MESSAGE(EndRemotePullReq, EndRemotePullData); -RPC_NO_MEMBER_MESSAGE(EndRemotePullRes); -using EndRemotePullRpc = - communication::rpc::RequestResponse; - RPC_SINGLE_MEMBER_MESSAGE(TransactionCommandAdvancedReq, tx::transaction_id_t); RPC_NO_MEMBER_MESSAGE(TransactionCommandAdvancedRes); using TransactionCommandAdvancedRpc = diff --git a/src/distributed/remote_pull_rpc_clients.hpp b/src/distributed/remote_pull_rpc_clients.hpp index 048870dbd..c01b26736 100644 --- a/src/distributed/remote_pull_rpc_clients.hpp +++ b/src/distributed/remote_pull_rpc_clients.hpp @@ -87,30 +87,6 @@ class RemotePullRpcClients { auto GetWorkerIds() { return clients_.GetWorkerIds(); } - // Notifies a worker that the given transaction/plan is done. Otherwise the - // server is left with potentially unconsumed Cursors that never get deleted. - // - // @todo - this needs to be done with hooks into the transactional - // engine, so that the Worker discards it's stuff when the relevant - // transaction are done. 
- std::future EndRemotePull(int worker_id, tx::transaction_id_t tx_id, - int64_t plan_id) { - return clients_.ExecuteOnWorker( - worker_id, [tx_id, plan_id](ClientPool &client_pool) { - return client_pool.Call( - EndRemotePullData{tx_id, plan_id}); - }); - } - - void EndAllRemotePulls(tx::transaction_id_t tx_id, int64_t plan_id) { - std::vector> futures; - for (auto worker_id : clients_.GetWorkerIds()) { - if (worker_id == 0) continue; - futures.emplace_back(EndRemotePull(worker_id, tx_id, plan_id)); - } - for (auto &future : futures) future.wait(); - } - std::vector> NotifyAllTransactionCommandAdvanced( tx::transaction_id_t tx_id) { return clients_.ExecuteOnWorkers(0, [tx_id](auto &client) { diff --git a/src/distributed/remote_updates_rpc_clients.hpp b/src/distributed/remote_updates_rpc_clients.hpp index fee2a1438..f54236d8e 100644 --- a/src/distributed/remote_updates_rpc_clients.hpp +++ b/src/distributed/remote_updates_rpc_clients.hpp @@ -43,12 +43,6 @@ class RemoteUpdatesRpcClients { }); } - /// Calls for the worker with the given ID to discard remote updates. 
- void RemoteUpdateDiscard(int worker_id, tx::transaction_id_t tx_id) { - worker_clients_.GetClientPool(worker_id).Call( - tx_id); - } - private: RpcWorkerClients worker_clients_; }; diff --git a/src/distributed/remote_updates_rpc_messages.hpp b/src/distributed/remote_updates_rpc_messages.hpp index 53a18cdcd..451bebd24 100644 --- a/src/distributed/remote_updates_rpc_messages.hpp +++ b/src/distributed/remote_updates_rpc_messages.hpp @@ -26,10 +26,4 @@ RPC_SINGLE_MEMBER_MESSAGE(RemoteUpdateApplyRes, RemoteUpdateResult); using RemoteUpdateApplyRpc = communication::rpc::RequestResponse; - -RPC_SINGLE_MEMBER_MESSAGE(RemoteUpdateDiscardReq, tx::transaction_id_t); -RPC_NO_MEMBER_MESSAGE(RemoteUpdateDiscardRes); -using RemoteUpdateDiscardRpc = - communication::rpc::RequestResponse; } // namespace distributed diff --git a/src/distributed/remote_updates_rpc_server.hpp b/src/distributed/remote_updates_rpc_server.hpp index e0c0487ff..309413168 100644 --- a/src/distributed/remote_updates_rpc_server.hpp +++ b/src/distributed/remote_updates_rpc_server.hpp @@ -19,6 +19,7 @@ #include "storage/vertex_accessor.hpp" #include "threading/sync/lock_timeout_exception.hpp" #include "threading/sync/spinlock.hpp" +#include "transactions/tx_end_listener.hpp" #include "transactions/type.hpp" namespace distributed { @@ -121,9 +122,9 @@ class RemoteUpdatesRpcServer { }; public: - RemoteUpdatesRpcServer(database::GraphDb &db, + RemoteUpdatesRpcServer(database::GraphDb &db, tx::Engine &engine, communication::rpc::System &system) - : db_(db), server_(system, kRemoteUpdatesRpc) { + : db_(db), engine_(engine), server_(system, kRemoteUpdatesRpc) { server_.Register([this](const RemoteUpdateReq &req) { using DeltaType = database::StateDelta::Type; switch (req.member.type) { @@ -145,12 +146,6 @@ class RemoteUpdatesRpcServer { [this](const RemoteUpdateApplyReq &req) { return std::make_unique(Apply(req.member)); }); - - server_.Register( - [this](const RemoteUpdateDiscardReq &req) { - Discard(req.member); 
- return std::make_unique(); - }); } /// Applies all existsing updates for the given transaction ID. If there are @@ -175,19 +170,19 @@ class RemoteUpdatesRpcServer { return RemoteUpdateResult::DONE; } - /// Discards all the existing updates for the given transaction ID. - void Discard(tx::transaction_id_t tx_id) { - vertex_updates_.access().remove(tx_id); - edge_updates_.access().remove(tx_id); - } - private: database::GraphDb &db_; + tx::Engine &engine_; communication::rpc::Server server_; ConcurrentMap> vertex_updates_; ConcurrentMap> edge_updates_; + tx::TxEndListener tx_end_listener_{engine_, + [this](tx::transaction_id_t tx_id) { + vertex_updates_.access().remove(tx_id); + edge_updates_.access().remove(tx_id); + }}; // Processes a single delta recieved in the RPC request. template diff --git a/src/query/plan/operator.cpp b/src/query/plan/operator.cpp index 892006b2c..12022a3e6 100644 --- a/src/query/plan/operator.cpp +++ b/src/query/plan/operator.cpp @@ -2741,16 +2741,6 @@ PullRemote::PullRemoteCursor::PullRemoteCursor(const PullRemote &self, worker_ids_.erase(std::find(worker_ids_.begin(), worker_ids_.end(), 0)); } -void PullRemote::PullRemoteCursor::EndRemotePull() { - std::vector> futures; - for (auto &worker_id : worker_ids_) { - futures.emplace_back(db_.db().remote_pull_clients().EndRemotePull( - worker_id, db_.transaction().id_, self_.plan_id())); - } - for (auto &future : futures) future.wait(); - worker_ids_.clear(); -} - bool PullRemote::PullRemoteCursor::Pull(Frame &frame, Context &context) { auto insert_future_for_worker = [&](int worker_id) { remote_pulls_[worker_id] = db_.db().remote_pull_clients().RemotePull( @@ -2789,22 +2779,17 @@ bool PullRemote::PullRemoteCursor::Pull(Frame &frame, Context &context) { insert_future_for_worker(worker_id); break; case distributed::RemotePullState::SERIALIZATION_ERROR: - EndRemotePull(); throw mvcc::SerializationError( "Serialization error occured during PullRemote !"); case 
distributed::RemotePullState::LOCK_TIMEOUT_ERROR: - EndRemotePull(); throw LockTimeoutException( "LockTimeout error occured during PullRemote !"); case distributed::RemotePullState::UPDATE_DELETED_ERROR: - EndRemotePull(); throw QueryRuntimeException( "RecordDeleted error ocured during PullRemote !"); case distributed::RemotePullState::RECONSTRUCTION_ERROR: - EndRemotePull(); throw query::ReconstructionException(); case distributed::RemotePullState::QUERY_ERROR: - EndRemotePull(); throw QueryRuntimeException( "Query runtime error occurred duing PullRemote !"); } @@ -2825,11 +2810,8 @@ bool PullRemote::PullRemoteCursor::Pull(Frame &frame, Context &context) { } if (!have_remote_results) { - // If we didn't find any remote results and there aren't any remote - // pulls, we've exhausted all remote results. Make sure we signal that - // to workers and exit the loop. if (remote_pulls_.empty()) { - EndRemotePull(); + worker_ids_.clear(); break; } @@ -2868,18 +2850,12 @@ bool PullRemote::PullRemoteCursor::Pull(Frame &frame, Context &context) { if (remote_results_[pull_from_worker_id].empty() && remote_pulls_.find(pull_from_worker_id) == remote_pulls_.end()) { worker_ids_.erase(worker_ids_.begin() + last_pulled_worker_id_index_); - db_.db() - .remote_pull_clients() - .EndRemotePull(pull_from_worker_id, db_.transaction().id_, - self_.plan_id()) - .wait(); } return true; } void PullRemote::PullRemoteCursor::Reset() { - EndRemotePull(); throw QueryRuntimeException("Unsupported: Reset during PullRemote!"); } diff --git a/src/query/plan/operator.hpp b/src/query/plan/operator.hpp index c890c2db8..dc5824728 100644 --- a/src/query/plan/operator.hpp +++ b/src/query/plan/operator.hpp @@ -2309,8 +2309,6 @@ class PullRemote : public LogicalOperator { void Reset() override; private: - void EndRemotePull(); - const PullRemote &self_; database::GraphDbAccessor &db_; const std::unique_ptr input_cursor_; diff --git a/src/transactions/engine.cpp b/src/transactions/engine.cpp new file mode 
100644 index 000000000..69fa0e156 --- /dev/null +++ b/src/transactions/engine.cpp @@ -0,0 +1,29 @@ +#include +#include + +#include "glog/logging.h" + +#include "transactions/engine.hpp" +#include "transactions/tx_end_listener.hpp" + +namespace tx { + +void Engine::Register(TxEndListener *listener) { + std::lock_guard guard{end_listeners_lock_}; + end_listeners_.emplace_back(listener); +} + +void Engine::Unregister(TxEndListener *listener) { + std::lock_guard guard{end_listeners_lock_}; + auto found = + std::find(end_listeners_.begin(), end_listeners_.end(), listener); + CHECK(found != end_listeners_.end()) + << "Failed to find listener to unregister"; + end_listeners_.erase(found); +} + +void Engine::NotifyListeners(transaction_id_t tx_id) const { + std::lock_guard guard{end_listeners_lock_}; + for (auto *listener : end_listeners_) listener->operator()(tx_id); +} +} // namespace tx diff --git a/src/transactions/engine.hpp b/src/transactions/engine.hpp index 80a26eef9..309d3accb 100644 --- a/src/transactions/engine.hpp +++ b/src/transactions/engine.hpp @@ -1,11 +1,17 @@ #pragma once +#include +#include +#include + #include "data_structures/concurrent/concurrent_map.hpp" +#include "threading/sync/spinlock.hpp" #include "transactions/commit_log.hpp" #include "transactions/transaction.hpp" #include "transactions/type.hpp" namespace tx { +class TxEndListener; /** * Database transaction engine. Used for managing transactions and the related * information such as transaction snapshots and the transaction state info. @@ -19,6 +25,8 @@ namespace tx { * determined by the users of a particular method. */ class Engine { + friend class TxEndListener; + public: virtual ~Engine() = default; @@ -74,7 +82,7 @@ class Engine { std::function f) = 0; /** Gets a transaction object for a running transaction. 
*/ - virtual tx::Transaction *RunningTransaction(tx::transaction_id_t tx_id) = 0; + virtual tx::Transaction *RunningTransaction(transaction_id_t tx_id) = 0; auto &local_lock_graph() { return local_lock_graph_; } const auto &local_lock_graph() const { return local_lock_graph_; } @@ -84,5 +92,19 @@ class Engine { // tx_that_holds_lock). Used for local deadlock resolution. // TODO consider global deadlock resolution. ConcurrentMap local_lock_graph_; + + // Transaction end listeners and the lock for protecting that datastructure. + std::vector end_listeners_; + mutable SpinLock end_listeners_lock_; + + /** Register a transaction end listener with this engine. */ + void Register(TxEndListener *listener); + + /** Unregister a transaction end listener with this engine. */ + void Unregister(TxEndListener *listener); + + protected: + /** Notifies all registered listeners that a transaction has ended. */ + void NotifyListeners(transaction_id_t tx_id) const; }; } // namespace tx diff --git a/src/transactions/engine_rpc_messages.hpp b/src/transactions/engine_rpc_messages.hpp index b5841018f..344022b65 100644 --- a/src/transactions/engine_rpc_messages.hpp +++ b/src/transactions/engine_rpc_messages.hpp @@ -55,7 +55,7 @@ RPC_SINGLE_MEMBER_MESSAGE(ClogInfoRes, CommitLog::Info) using ClogInfoRpc = communication::rpc::RequestResponse; -RPC_SINGLE_MEMBER_MESSAGE(ActiveTransactionsReq, transaction_id_t) +RPC_NO_MEMBER_MESSAGE(ActiveTransactionsReq) using ActiveTransactionsRpc = communication::rpc::RequestResponse; diff --git a/src/transactions/engine_single_node.cpp b/src/transactions/engine_single_node.cpp index da8ec69f6..f6db05ccb 100644 --- a/src/transactions/engine_single_node.cpp +++ b/src/transactions/engine_single_node.cpp @@ -50,23 +50,31 @@ command_id_t SingleNodeEngine::UpdateCommand(transaction_id_t id) { } void SingleNodeEngine::Commit(const Transaction &t) { - std::lock_guard guard(lock_); - clog_.set_committed(t.id_); - active_.remove(t.id_); - if (wal_) { - 
wal_->Emplace(database::StateDelta::TxCommit(t.id_)); + auto tx_id = t.id_; + { + std::lock_guard guard(lock_); + clog_.set_committed(tx_id); + active_.remove(tx_id); + if (wal_) { + wal_->Emplace(database::StateDelta::TxCommit(tx_id)); + } + store_.erase(store_.find(tx_id)); } - store_.erase(store_.find(t.id_)); + NotifyListeners(tx_id); } void SingleNodeEngine::Abort(const Transaction &t) { - std::lock_guard guard(lock_); - clog_.set_aborted(t.id_); - active_.remove(t.id_); - if (wal_) { - wal_->Emplace(database::StateDelta::TxAbort(t.id_)); + auto tx_id = t.id_; + { + std::lock_guard guard(lock_); + clog_.set_aborted(tx_id); + active_.remove(tx_id); + if (wal_) { + wal_->Emplace(database::StateDelta::TxAbort(tx_id)); + } + store_.erase(store_.find(tx_id)); } - store_.erase(store_.find(t.id_)); + NotifyListeners(tx_id); } CommitLog::Info SingleNodeEngine::Info(transaction_id_t tx) const { diff --git a/src/transactions/engine_worker.cpp b/src/transactions/engine_worker.cpp index c7f8989a8..59d0f5b77 100644 --- a/src/transactions/engine_worker.cpp +++ b/src/transactions/engine_worker.cpp @@ -1,3 +1,5 @@ +#include + #include "glog/logging.h" #include "transactions/engine_rpc_messages.hpp" @@ -7,7 +9,22 @@ namespace tx { WorkerEngine::WorkerEngine(const io::network::Endpoint &endpoint) - : rpc_client_pool_(endpoint, kTransactionEngineRpc) {} + : rpc_client_pool_(endpoint, kTransactionEngineRpc) { + cache_clearing_scheduler_.Run(kCacheReleasePeriod, [this]() { + // Use the GC snapshot as it always has at least one member. + auto res = rpc_client_pool_.Call(); + // There is a race-condition between this scheduled call and worker + // shutdown. It is possible that the worker has responded to the master it + // is shutting down, and the master is shutting down (and can't respond to + // RPCs). At the same time this call gets scheduled, so we get a failed RPC. 
+ if (!res) { + LOG(WARNING) << "Transaction cache GC RPC call failed"; + } else { + CHECK(!res->member.empty()) << "Recieved an empty GcSnapshot"; + ClearCachesBasedOnOldest(res->member.front()); + } + }); +} WorkerEngine::~WorkerEngine() { for (auto &kv : active_.access()) { @@ -120,5 +137,24 @@ void WorkerEngine::ClearCache(transaction_id_t tx_id) const { delete found->second; access.remove(tx_id); } + NotifyListeners(tx_id); +} + +void WorkerEngine::ClearCachesBasedOnOldest(transaction_id_t oldest_active) { + // Advance oldest_active_ monotonically. The CAS is attempted only while the + // stored value is older than the reported one, so a stale or concurrent + // call can never move it backwards; such a call ends up with + // previous_oldest >= oldest_active and clears nothing. + auto previous_oldest = oldest_active_.load(); + while (previous_oldest < oldest_active && + !oldest_active_.compare_exchange_weak(previous_oldest, + oldest_active)) { + } + // Clear caches between the previous oldest (now expired) and the new oldest + // (possibly still alive). + for (tx::transaction_id_t expired = previous_oldest; expired < oldest_active; + ++expired) { + ClearCache(expired); + } } } // namespace tx diff --git a/src/transactions/engine_worker.hpp b/src/transactions/engine_worker.hpp index c931e9424..9d1606ab6 100644 --- a/src/transactions/engine_worker.hpp +++ b/src/transactions/engine_worker.hpp @@ -1,6 +1,7 @@ #pragma once #include +#include #include #include "communication/rpc/client_pool.hpp" @@ -9,6 +10,7 @@ #include "transactions/commit_log.hpp" #include "transactions/engine.hpp" #include "transactions/transaction.hpp" +#include "utils/scheduler.hpp" namespace tx { @@ -17,6 +19,10 @@ namespace tx { * begin/advance/end transactions on the master. */ class WorkerEngine : public Engine { public: + /// The wait time between two releases of local transaction objects that have + /// expired on the master. 
+ static constexpr std::chrono::seconds kCacheReleasePeriod{1}; + WorkerEngine(const io::network::Endpoint &endpoint); ~WorkerEngine(); @@ -47,5 +53,11 @@ class WorkerEngine : public Engine { // Removes (destructs) a Transaction that's expired. If there is no cached // transacton for the given id, nothing is done. void ClearCache(transaction_id_t tx_id) const; + + // Used for clearing of caches of transactions that have expired. + // Initialize the oldest_active_ with 1 because there's never a tx with id=0 + std::atomic oldest_active_{1}; + void ClearCachesBasedOnOldest(transaction_id_t oldest_active); + Scheduler cache_clearing_scheduler_; }; } // namespace tx diff --git a/src/transactions/tx_end_listener.hpp b/src/transactions/tx_end_listener.hpp new file mode 100644 index 000000000..efdcae1bf --- /dev/null +++ b/src/transactions/tx_end_listener.hpp @@ -0,0 +1,45 @@ +#pragma once + +#include + +#include "transactions/engine.hpp" +#include "transactions/type.hpp" + +namespace tx { + +/** + * Wrapper around the given function that registers itself with the given + * transaction engine. Ensures that the function gets called when a transaction + * has ended in the engine. + * + * Also ensures that the listener gets unregistered from the engine upon + * destruction. Intended usage is: just create a TxEndListener and ensure it + * gets destructed before the function it wraps becomes invalid. + * + * There are no guarantees that the listener will be called only once for the + * given transaction id. 
+ */ +class TxEndListener { + public: + TxEndListener(Engine &engine, std::function function) + : engine_(engine), function_(std::move(function)) { + engine_.Register(this); + } + + ~TxEndListener() { engine_.Unregister(this); } + + // The engine stores the address of this object, so a copy (or a moved-to + // object) would never be registered, yet its destructor would still call + // Unregister() and fail the CHECK there. Forbid copying and moving. + TxEndListener(const TxEndListener &) = delete; + TxEndListener(TxEndListener &&) = delete; + TxEndListener &operator=(const TxEndListener &) = delete; + TxEndListener &operator=(TxEndListener &&) = delete; + + void operator()(transaction_id_t tx_id) const { function_(tx_id); } + + private: + Engine &engine_; + std::function function_; +}; +} // namespace tx diff --git a/tests/unit/distributed_graph_db.cpp b/tests/unit/distributed_graph_db.cpp index 325bb7445..790416938 100644 --- a/tests/unit/distributed_graph_db.cpp +++ b/tests/unit/distributed_graph_db.cpp @@ -181,8 +181,6 @@ TEST_F(DistributedGraphDbTest, RemotePullProduceRpc) { auto tx1_batch2 = remote_pull(dba_1, worker_id).get(); expect_second_batch(tx1_batch2); } - for (auto tx_id : {dba_1.transaction_id(), dba_2.transaction_id()}) - master().remote_pull_clients().EndAllRemotePulls(tx_id, plan_id); } TEST_F(DistributedGraphDbTest, RemotePullProduceRpcWithGraphElements) { @@ -277,9 +275,6 @@ TEST_F(DistributedGraphDbTest, RemotePullProduceRpcWithGraphElements) { auto future_w2_results = remote_pull(dba, 2); check_result(1, future_w1_results.get().frames); check_result(2, future_w2_results.get().frames); - - master().remote_pull_clients().EndAllRemotePulls(dba.transaction_id(), - plan_id); } TEST_F(DistributedGraphDbTest, BuildIndexDistributed) { diff --git a/tests/unit/distributed_updates.cpp b/tests/unit/distributed_updates.cpp index c9637bdb6..661e5dd4c 100644 --- a/tests/unit/distributed_updates.cpp +++ b/tests/unit/distributed_updates.cpp @@ -58,10 +58,4 @@ TEST_F(DistributedUpdateTest, RemoteUpdateApply) { EXPECT_LABEL(v1_dba1, false, true); } -TEST_F(DistributedUpdateTest, RemoteUpdateDiscard) { - EXPECT_LABEL(v1_dba1, false, false); - worker(1).remote_updates_server().Discard(dba1->transaction_id()); - EXPECT_LABEL(v1_dba1, false, false); -} - #undef EXPECT_LABEL diff --git a/tests/unit/transaction_engine_distributed.cpp b/tests/unit/transaction_engine_distributed.cpp index 
73558416d..3dd13e666 100644 --- a/tests/unit/transaction_engine_distributed.cpp +++ b/tests/unit/transaction_engine_distributed.cpp @@ -1,4 +1,7 @@ +#include +#include #include +#include #include "gtest/gtest.h" @@ -7,6 +10,7 @@ #include "transactions/engine_master.hpp" #include "transactions/engine_rpc_messages.hpp" #include "transactions/engine_worker.hpp" +#include "transactions/tx_end_listener.hpp" using namespace tx; using namespace communication::rpc; @@ -135,3 +139,20 @@ TEST_F(WorkerEngineTest, LocalForEachActiveTransaction) { [&local](Transaction &t) { local.insert(t.id_); }); EXPECT_EQ(local, std::unordered_set({1, 4})); } + +TEST_F(WorkerEngineTest, TxEndListener) { + std::atomic has_expired{0}; + TxEndListener worker_end_listener{ + worker_, [&has_expired](transaction_id_t) { ++has_expired; }}; + + auto sleep_period = + WorkerEngine::kCacheReleasePeriod + std::chrono::milliseconds(200); + auto t1 = master_.Begin(); + auto t2 = master_.Begin(); + std::this_thread::sleep_for(sleep_period); + EXPECT_EQ(has_expired.load(), 0); + master_.Commit(*t1); + master_.Abort(*t2); + std::this_thread::sleep_for(sleep_period); + EXPECT_EQ(has_expired.load(), 2); +} diff --git a/tests/unit/transaction_engine_single_node.cpp b/tests/unit/transaction_engine_single_node.cpp index ee7ccbdcb..b8ff3fee5 100644 --- a/tests/unit/transaction_engine_single_node.cpp +++ b/tests/unit/transaction_engine_single_node.cpp @@ -6,37 +6,40 @@ #include "data_structures/concurrent/concurrent_set.hpp" #include "transactions/engine_single_node.hpp" #include "transactions/transaction.hpp" +#include "transactions/tx_end_listener.hpp" + +using namespace tx; TEST(Engine, GcSnapshot) { - tx::SingleNodeEngine engine; - ASSERT_EQ(engine.GlobalGcSnapshot(), tx::Snapshot({1})); + SingleNodeEngine engine; + ASSERT_EQ(engine.GlobalGcSnapshot(), Snapshot({1})); - std::vector transactions; + std::vector transactions; // create transactions and check 
the GC snapshot for (int i = 0; i < 5; ++i) { transactions.push_back(engine.Begin()); - EXPECT_EQ(engine.GlobalGcSnapshot(), tx::Snapshot({1})); + EXPECT_EQ(engine.GlobalGcSnapshot(), Snapshot({1})); } // commit transactions in the middle, expect // the GcSnapshot did not change engine.Commit(*transactions[1]); - EXPECT_EQ(engine.GlobalGcSnapshot(), tx::Snapshot({1})); + EXPECT_EQ(engine.GlobalGcSnapshot(), Snapshot({1})); engine.Commit(*transactions[2]); - EXPECT_EQ(engine.GlobalGcSnapshot(), tx::Snapshot({1})); + EXPECT_EQ(engine.GlobalGcSnapshot(), Snapshot({1})); // have the first three transactions committed engine.Commit(*transactions[0]); - EXPECT_EQ(engine.GlobalGcSnapshot(), tx::Snapshot({1, 2, 3, 4})); + EXPECT_EQ(engine.GlobalGcSnapshot(), Snapshot({1, 2, 3, 4})); // commit all engine.Commit(*transactions[3]); engine.Commit(*transactions[4]); - EXPECT_EQ(engine.GlobalGcSnapshot(), tx::Snapshot({6})); + EXPECT_EQ(engine.GlobalGcSnapshot(), Snapshot({6})); } TEST(Engine, Advance) { - tx::SingleNodeEngine engine; + SingleNodeEngine engine; auto t0 = engine.Begin(); auto t1 = engine.Begin(); @@ -49,9 +52,9 @@ TEST(Engine, Advance) { } TEST(Engine, ConcurrentBegin) { - tx::SingleNodeEngine engine; + SingleNodeEngine engine; std::vector threads; - ConcurrentSet tx_ids; + ConcurrentSet tx_ids; for (int i = 0; i < 10; ++i) { threads.emplace_back([&engine, accessor = tx_ids.access() ]() mutable { for (int j = 0; j < 100; ++j) { @@ -65,10 +68,29 @@ TEST(Engine, ConcurrentBegin) { } TEST(Engine, RunningTransaction) { - tx::SingleNodeEngine engine; + SingleNodeEngine engine; auto t0 = engine.Begin(); auto t1 = engine.Begin(); EXPECT_EQ(t0, engine.RunningTransaction(t0->id_)); EXPECT_NE(t1, engine.RunningTransaction(t0->id_)); EXPECT_EQ(t1, engine.RunningTransaction(t1->id_)); } + +TEST(Engine, TxEndListener) { + SingleNodeEngine engine; + int count = 0; + { + TxEndListener listener{engine, [&count](auto) { count++; }}; + EXPECT_EQ(count, 0); + auto t1 = 
engine.Begin(); + EXPECT_EQ(count, 0); + auto t2 = engine.Begin(); + engine.Abort(*t1); + EXPECT_EQ(count, 1); + engine.Commit(*t2); + EXPECT_EQ(count, 2); + } + auto t3 = engine.Begin(); + engine.Commit(*t3); + EXPECT_EQ(count, 2); +}