diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index a0ad4cbf0..b20672bf6 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -49,7 +49,6 @@ set(memgraph_src_files storage/record_accessor.cpp storage/vertex_accessor.cpp threading/thread.cpp - transactions/engine.cpp transactions/engine_master.cpp transactions/engine_single_node.cpp transactions/engine_worker.cpp diff --git a/src/database/graph_db.cpp b/src/database/graph_db.cpp index a60189135..14ec9b4a1 100644 --- a/src/database/graph_db.cpp +++ b/src/database/graph_db.cpp @@ -14,6 +14,7 @@ #include "distributed/remote_pull_rpc_clients.hpp" #include "distributed/remote_updates_rpc_clients.hpp" #include "distributed/remote_updates_rpc_server.hpp" +#include "distributed/transactional_cache_cleaner.hpp" #include "durability/paths.hpp" #include "durability/recovery.hpp" #include "durability/snapshooter.hpp" @@ -145,7 +146,11 @@ class SingleNode : public PrivateBase { class Master : public PrivateBase { public: - explicit Master(const Config &config) : PrivateBase(config) {} + explicit Master(const Config &config) : PrivateBase(config) { + cache_cleaner_.Register(remote_updates_server_); + cache_cleaner_.Register(remote_data_manager_); + } + GraphDb::Type type() const override { return GraphDb::Type::DISTRIBUTED_MASTER; } @@ -180,17 +185,20 @@ class Master : public PrivateBase { distributed::PlanDispatcher plan_dispatcher_{coordination_}; distributed::RemotePullRpcClients remote_pull_clients_{coordination_}; distributed::RpcWorkerClients index_rpc_clients_{coordination_}; - distributed::RemoteUpdatesRpcServer remote_updates_server_{*this, tx_engine_, - server_}; + distributed::RemoteUpdatesRpcServer remote_updates_server_{*this, server_}; distributed::RemoteUpdatesRpcClients remote_updates_clients_{coordination_}; - distributed::RemoteDataManager remote_data_manager_{tx_engine_, - remote_data_clients_}; + distributed::RemoteDataManager remote_data_manager_{remote_data_clients_}; + distributed::TransactionalCacheCleaner cache_cleaner_{tx_engine_}; }; class Worker : public PrivateBase { public: explicit Worker(const Config &config) : PrivateBase(config) { coordination_.RegisterWorker(config.worker_id); + cache_cleaner_.Register(tx_engine_); + cache_cleaner_.Register(remote_produce_server_); + cache_cleaner_.Register(remote_updates_server_); + cache_cleaner_.Register(remote_data_manager_); } GraphDb::Type type() const override { @@ -224,11 +232,10 @@ class Worker : public PrivateBase { distributed::RemoteProduceRpcServer remote_produce_server_{ *this, tx_engine_, server_, plan_consumer_}; distributed::IndexRpcServer index_rpc_server_{*this, server_}; - distributed::RemoteUpdatesRpcServer remote_updates_server_{*this, tx_engine_, - server_}; + distributed::RemoteUpdatesRpcServer remote_updates_server_{*this, server_}; distributed::RemoteUpdatesRpcClients remote_updates_clients_{coordination_}; - distributed::RemoteDataManager remote_data_manager_{tx_engine_, - remote_data_clients_}; + distributed::RemoteDataManager remote_data_manager_{remote_data_clients_}; + distributed::TransactionalCacheCleaner cache_cleaner_{tx_engine_}; }; #undef IMPL_GETTERS @@ -244,8 +251,7 @@ PublicBase::PublicBase(std::unique_ptr impl) impl_->wal().Enable(); snapshot_creator_ = std::make_unique(); snapshot_creator_->Run( - "Snapshot", - std::chrono::seconds(impl_->config_.snapshot_cycle_sec), + "Snapshot", std::chrono::seconds(impl_->config_.snapshot_cycle_sec), [this] { MakeSnapshot(); }); } } diff --git a/src/distributed/remote_data_manager.hpp b/src/distributed/remote_data_manager.hpp index 026295e4e..87ecabfe7 100644 --- a/src/distributed/remote_data_manager.hpp +++ b/src/distributed/remote_data_manager.hpp @@ -1,13 +1,10 @@ #pragma once -#include - +#include "data_structures/concurrent/concurrent_map.hpp" #include "distributed/remote_cache.hpp" #include "distributed/remote_data_rpc_clients.hpp" #include "storage/edge.hpp" #include "storage/vertex.hpp" -#include "threading/sync/spinlock.hpp" -#include "transactions/tx_end_listener.hpp" #include "transactions/type.hpp" namespace distributed { @@ -17,17 +14,19 @@ class RemoteDataManager { // Helper, gets or inserts a data cache for the given transaction. template auto &GetCache(TCollection &collection, tx::transaction_id_t tx_id) { - std::lock_guard guard{lock_}; - auto found = collection.find(tx_id); - if (found != collection.end()) return found->second; + auto access = collection.access(); + auto found = access.find(tx_id); + if (found != access.end()) return found->second; - return collection.emplace(tx_id, remote_data_clients_).first->second; + return access + .emplace(tx_id, std::make_tuple(tx_id), + std::make_tuple(std::ref(remote_data_clients_))) + .first->second; } public: - RemoteDataManager(tx::Engine &tx_engine, - distributed::RemoteDataRpcClients &remote_data_clients) - : remote_data_clients_(remote_data_clients), tx_engine_(tx_engine) {} + RemoteDataManager(distributed::RemoteDataRpcClients &remote_data_clients) + : remote_data_clients_(remote_data_clients) {} /// Gets or creates the remote vertex cache for the given transaction. auto &Vertices(tx::transaction_id_t tx_id) { @@ -43,33 +42,33 @@ class RemoteDataManager { template auto &Elements(tx::transaction_id_t tx_id); - /// Calls RemoteCache::ClearCache on vertex and edge caches. - void ClearCaches(tx::transaction_id_t tx_id) { + /// Removes all the caches for a single transaction. + void ClearCacheForSingleTransaction(tx::transaction_id_t tx_id) { Vertices(tx_id).ClearCache(); Edges(tx_id).ClearCache(); } + /// Clears the cache of local transactions that have expired. The signature of + /// this method is dictated by `distributed::CacheCleaner`. + void ClearTransactionalCache(tx::transaction_id_t oldest_active) { + auto vertex_access = vertices_caches_.access(); + for (auto &kv : vertex_access) { + if (kv.first < oldest_active) { + vertex_access.remove(kv.first); + } + } + auto edge_access = edges_caches_.access(); + for (auto &kv : edge_access) { + if (kv.first < oldest_active) { + edge_access.remove(kv.first); + } + } + } + private: RemoteDataRpcClients &remote_data_clients_; - SpinLock lock_; - std::unordered_map> - vertices_caches_; - std::unordered_map> edges_caches_; - - tx::Engine &tx_engine_; - tx::TxEndListener tx_end_listener_{ - tx_engine_, [this](tx::transaction_id_t tx_id) { ClearCache(tx_id); }}; - - // Clears the caches for the given transaction ID. - void ClearCache(tx::transaction_id_t tx_id) { - std::lock_guard guard{lock_}; - auto remove = [tx_id](auto &map) { - auto found = map.find(tx_id); - if (found != map.end()) map.erase(found); - }; - remove(vertices_caches_); - remove(edges_caches_); - } + ConcurrentMap> vertices_caches_; + ConcurrentMap> edges_caches_; }; template <> diff --git a/src/distributed/remote_produce_rpc_server.hpp b/src/distributed/remote_produce_rpc_server.hpp index 5092f4175..625bf2320 100644 --- a/src/distributed/remote_produce_rpc_server.hpp +++ b/src/distributed/remote_produce_rpc_server.hpp @@ -1,12 +1,11 @@ #pragma once #include -#include -#include #include #include #include "communication/rpc/server.hpp" +#include "data_structures/concurrent/concurrent_map.hpp" #include "database/graph_db.hpp" #include "database/graph_db_accessor.hpp" #include "distributed/plan_consumer.hpp" @@ -22,7 +21,6 @@ #include "query/typed_value.hpp" #include "transactions/engine.hpp" #include "transactions/engine_worker.hpp" -#include "transactions/tx_end_listener.hpp" #include "transactions/type.hpp" namespace distributed { @@ -147,40 +145,34 @@ class RemoteProduceRpcServer { remote_produce_rpc_server_.Register( [this](const TransactionCommandAdvancedReq &req) { tx_engine_.UpdateCommand(req.member); - db_.remote_data_manager().ClearCaches(req.member); + db_.remote_data_manager().ClearCacheForSingleTransaction(req.member); return std::make_unique(); }); } + /// Clears the cache of local transactions that have expired. The signature of + /// this method is dictated by `distributed::TransactionalCacheCleaner`. + void ClearTransactionalCache(tx::transaction_id_t oldest_active) { + auto access = ongoing_produces_.access(); + for (auto &kv : access) { + if (kv.first.first < oldest_active) { + access.remove(kv.first); + } + } + } + private: database::GraphDb &db_; communication::rpc::Server &remote_produce_rpc_server_; const distributed::PlanConsumer &plan_consumer_; - - std::map, OngoingProduce> + ConcurrentMap, OngoingProduce> ongoing_produces_; - std::mutex ongoing_produces_lock_; - tx::Engine &tx_engine_; - tx::TxEndListener tx_end_listener_{ - tx_engine_, [this](tx::transaction_id_t tx_id) { ClearCache(tx_id); }}; - - // Removes all onging pulls for the given tx_id (that transaction expired). - void ClearCache(tx::transaction_id_t tx_id) { - std::lock_guard guard{ongoing_produces_lock_}; - for (auto it = ongoing_produces_.begin(); it != ongoing_produces_.end();) { - if (it->first.first == tx_id) { - it = ongoing_produces_.erase(it); - } else { - ++it; - } - } - } auto &GetOngoingProduce(const RemotePullReq &req) { - std::lock_guard guard{ongoing_produces_lock_}; - auto found = ongoing_produces_.find({req.tx_id, req.plan_id}); - if (found != ongoing_produces_.end()) { + auto access = ongoing_produces_.access(); + auto found = access.find({req.tx_id, req.plan_id}); + if (found != access.end()) { return found->second; } if (db_.type() == database::GraphDb::Type::DISTRIBUTED_WORKER) { @@ -189,9 +181,9 @@ class RemoteProduceRpcServer { .RunningTransaction(req.tx_id, req.tx_snapshot); } auto &plan_pack = plan_consumer_.PlanForId(req.plan_id); - return ongoing_produces_ - .emplace(std::piecewise_construct, - std::forward_as_tuple(req.tx_id, req.plan_id), + auto key_par = std::make_pair(req.tx_id, req.tx_id); + return access + .emplace(key_par, std::forward_as_tuple(key_par), std::forward_as_tuple(db_, req.tx_id, plan_pack.plan, plan_pack.symbol_table, req.params, req.symbols)) diff --git a/src/distributed/remote_updates_rpc_server.hpp b/src/distributed/remote_updates_rpc_server.hpp index b37b1c1d3..05aea8150 100644 --- a/src/distributed/remote_updates_rpc_server.hpp +++ b/src/distributed/remote_updates_rpc_server.hpp @@ -21,7 +21,6 @@ #include "storage/vertex_accessor.hpp" #include "threading/sync/lock_timeout_exception.hpp" #include "threading/sync/spinlock.hpp" -#include "transactions/tx_end_listener.hpp" #include "transactions/type.hpp" namespace distributed { @@ -186,10 +185,10 @@ class RemoteUpdatesRpcServer { }; public: - RemoteUpdatesRpcServer(database::GraphDb &db, tx::Engine &engine, + RemoteUpdatesRpcServer(database::GraphDb &db, communication::rpc::Server &server) - : db_(db), engine_(engine), server_(server) { - server_.Register([this](const RemoteUpdateReq &req) { + : db_(db) { + server.Register([this](const RemoteUpdateReq &req) { using DeltaType = database::StateDelta::Type; auto &delta = req.member; switch (delta.type) { @@ -207,19 +206,19 @@ class RemoteUpdatesRpcServer { } }); - server_.Register( + server.Register( [this](const RemoteUpdateApplyReq &req) { return std::make_unique(Apply(req.member)); }); - server_.Register( + server.Register( [this](const RemoteCreateVertexReq &req) { return std::make_unique( GetUpdates(vertex_updates_, req.member.tx_id) .CreateVertex(req.member.labels, req.member.properties)); }); - server_.Register( + server.Register( [this](const RemoteCreateEdgeReq &req) { auto data = req.member; auto creation_result = CreateEdge(data); @@ -238,7 +237,7 @@ class RemoteUpdatesRpcServer { return std::make_unique(creation_result); }); - server_.Register([this](const RemoteAddInEdgeReq &req) { + server.Register([this](const RemoteAddInEdgeReq &req) { auto to_delta = database::StateDelta::AddInEdge( req.member.tx_id, req.member.to, req.member.from, req.member.edge_address, req.member.edge_type); @@ -270,15 +269,26 @@ class RemoteUpdatesRpcServer { return RemoteUpdateResult::DONE; } + /// Clears the cache of local transactions that have expired. The signature of + /// this method is dictated by `distributed::CacheCleaner`. + void ClearTransactionalCache(tx::transaction_id_t oldest_active) { + auto vertex_access = vertex_updates_.access(); + for (auto &kv : vertex_access) { + if (kv.first < oldest_active) { + vertex_access.remove(kv.first); + } + } + auto edge_access = edge_updates_.access(); + for (auto &kv : edge_access) { + if (kv.first < oldest_active) { + edge_access.remove(kv.first); + } + } + } + private: database::GraphDb &db_; - tx::Engine &engine_; - communication::rpc::Server &server_; - tx::TxEndListener tx_end_listener_{engine_, - [this](tx::transaction_id_t tx_id) { - vertex_updates_.access().remove(tx_id); - edge_updates_.access().remove(tx_id); - }}; + template using MapT = ConcurrentMap>; diff --git a/src/distributed/transactional_cache_cleaner.hpp b/src/distributed/transactional_cache_cleaner.hpp new file mode 100644 index 000000000..f9f55d1cc --- /dev/null +++ b/src/distributed/transactional_cache_cleaner.hpp @@ -0,0 +1,45 @@ +#pragma once + +#include +#include + +#include "transactions/engine.hpp" +#include "utils/scheduler.hpp" + +namespace distributed { + +/// Periodically calls `ClearCache(oldest_transaction)` on all registered +/// functions. +class TransactionalCacheCleaner { + /// The wait time between two releases of local transaction objects that have + /// expired on the master. + static constexpr std::chrono::seconds kCacheReleasePeriod{1}; + + public: + TransactionalCacheCleaner(tx::Engine &tx_engine) : tx_engine_(tx_engine) { + cache_clearing_scheduler_.Run( + "DistrTxCacheGc", kCacheReleasePeriod, [this]() { + auto oldest_active = tx_engine_.LocalOldestActive(); + for (auto &f : functions_) f(oldest_active); + }); + } + + /// Registers the given object for transactional cleaning. The object will + /// periodically get it's `ClearCache(tx::transaction_id_t)` method called + /// with the oldest active transaction id. Note that the ONLY guarantee for + /// the call param is that there are no transactions alive that have an id + /// lower than it. + template + void Register(TCache &cache) { + functions_.emplace_back([&cache](tx::transaction_id_t oldest_active) { + cache.ClearTransactionalCache(oldest_active); + }); + } + + private: + tx::Engine &tx_engine_; + std::vector> + functions_; + Scheduler cache_clearing_scheduler_; +}; +} // namespace distributed diff --git a/src/transactions/engine.cpp b/src/transactions/engine.cpp deleted file mode 100644 index 69fa0e156..000000000 --- a/src/transactions/engine.cpp +++ /dev/null @@ -1,29 +0,0 @@ -#include -#include - -#include "glog/logging.h" - -#include "transactions/engine.hpp" -#include "transactions/tx_end_listener.hpp" - -namespace tx { - -void Engine::Register(TxEndListener *listener) { - std::lock_guard guard{end_listeners_lock_}; - end_listeners_.emplace_back(listener); -} - -void Engine::Unregister(TxEndListener *listener) { - std::lock_guard guard{end_listeners_lock_}; - auto found = - std::find(end_listeners_.begin(), end_listeners_.end(), listener); - CHECK(found != end_listeners_.end()) - << "Failed to find listener to unregister"; - end_listeners_.erase(found); -} - -void Engine::NotifyListeners(transaction_id_t tx_id) const { - std::lock_guard guard{end_listeners_lock_}; - for (auto *listener : end_listeners_) listener->operator()(tx_id); -} -} // namespace tx diff --git a/src/transactions/engine.hpp b/src/transactions/engine.hpp index cb3cb54ce..fdd3cba95 100644 --- a/src/transactions/engine.hpp +++ b/src/transactions/engine.hpp @@ -11,7 +11,6 @@ #include "transactions/type.hpp" namespace tx { -class TxEndListener; /** * Database transaction engine. Used for managing transactions and the related * information such as transaction snapshots and the transaction state info. @@ -25,8 +24,6 @@ class TxEndListener; * determined by the users of a particular method. */ class Engine { - friend class TxEndListener; - public: virtual ~Engine() = default; @@ -58,7 +55,7 @@ class Engine { * last. * * The idea is that data records can only be deleted if they were expired (and - * that was committed) by a transaction older then the older currently active. + * that was committed) by a transaction older than the older currently active. * We need the full snapshot to prevent overlaps (see general GC * documentation). * @@ -74,6 +71,11 @@ class Engine { /** Returns the ID of last locally known transaction. */ virtual tx::transaction_id_t LocalLast() const = 0; + /** Returns the ID of the oldest transaction locally known to be active. It is + * guaranteed that all the transactions older than the returned are globally + * not active. */ + virtual transaction_id_t LocalOldestActive() const = 0; + /** Calls function f on each locally active transaction. */ virtual void LocalForEachActiveTransaction( std::function f) = 0; @@ -89,19 +91,5 @@ class Engine { // tx_that_holds_lock). Used for local deadlock resolution. // TODO consider global deadlock resolution. ConcurrentMap local_lock_graph_; - - // Transaction end listeners and the lock for protecting that datastructure. - std::vector end_listeners_; - mutable SpinLock end_listeners_lock_; - - /** Register a transaction end listener with this engine. */ - void Register(TxEndListener *listener); - - /** Unregister a transaction end listener with this engine. */ - void Unregister(TxEndListener *listener); - - protected: - /** Notifies all registered listeners that a transaction has ended. */ - void NotifyListeners(transaction_id_t tx_id) const; }; } // namespace tx diff --git a/src/transactions/engine_single_node.cpp b/src/transactions/engine_single_node.cpp index 7502aea7b..c95fa1179 100644 --- a/src/transactions/engine_single_node.cpp +++ b/src/transactions/engine_single_node.cpp @@ -50,31 +50,23 @@ command_id_t SingleNodeEngine::UpdateCommand(transaction_id_t id) { } void SingleNodeEngine::Commit(const Transaction &t) { - auto tx_id = t.id_; - { - std::lock_guard guard(lock_); - clog_.set_committed(tx_id); - active_.remove(tx_id); - if (wal_) { - wal_->Emplace(database::StateDelta::TxCommit(tx_id)); - } - store_.erase(store_.find(tx_id)); + std::lock_guard guard(lock_); + clog_.set_committed(t.id_); + active_.remove(t.id_); + if (wal_) { + wal_->Emplace(database::StateDelta::TxCommit(t.id_)); } - NotifyListeners(tx_id); + store_.erase(store_.find(t.id_)); } void SingleNodeEngine::Abort(const Transaction &t) { - auto tx_id = t.id_; - { - std::lock_guard guard(lock_); - clog_.set_aborted(tx_id); - active_.remove(tx_id); - if (wal_) { - wal_->Emplace(database::StateDelta::TxAbort(tx_id)); - } - store_.erase(store_.find(tx_id)); + std::lock_guard guard(lock_); + clog_.set_aborted(t.id_); + active_.remove(t.id_); + if (wal_) { + wal_->Emplace(database::StateDelta::TxAbort(t.id_)); } - NotifyListeners(tx_id); + store_.erase(store_.find(t.id_)); } CommitLog::Info SingleNodeEngine::Info(transaction_id_t tx) const { @@ -103,8 +95,11 @@ Snapshot SingleNodeEngine::GlobalActiveTransactions() { return active_transactions; } -tx::transaction_id_t SingleNodeEngine::LocalLast() const { - return counter_.load(); +transaction_id_t SingleNodeEngine::LocalLast() const { return counter_.load(); } + +transaction_id_t SingleNodeEngine::LocalOldestActive() const { + std::lock_guard guard(lock_); + return active_.empty() ? counter_ + 1 : active_.front(); } void SingleNodeEngine::LocalForEachActiveTransaction( @@ -115,8 +110,7 @@ void SingleNodeEngine::LocalForEachActiveTransaction( } } -tx::Transaction *SingleNodeEngine::RunningTransaction( - tx::transaction_id_t tx_id) { +Transaction *SingleNodeEngine::RunningTransaction(transaction_id_t tx_id) { std::lock_guard guard(lock_); auto found = store_.find(tx_id); CHECK(found != store_.end()) diff --git a/src/transactions/engine_single_node.hpp b/src/transactions/engine_single_node.hpp index 4deaea0e5..4f1e7ad9e 100644 --- a/src/transactions/engine_single_node.hpp +++ b/src/transactions/engine_single_node.hpp @@ -38,6 +38,7 @@ class SingleNodeEngine : public Engine { Snapshot GlobalGcSnapshot() override; Snapshot GlobalActiveTransactions() override; tx::transaction_id_t LocalLast() const override; + transaction_id_t LocalOldestActive() const override; void LocalForEachActiveTransaction( std::function f) override; tx::Transaction *RunningTransaction(tx::transaction_id_t tx_id) override; @@ -47,7 +48,7 @@ class SingleNodeEngine : public Engine { CommitLog clog_; std::unordered_map> store_; Snapshot active_; - SpinLock lock_; + mutable SpinLock lock_; // Optional. If present, the Engine will write tx Begin/Commit/Abort // atomically (while under lock). durability::WriteAheadLog *wal_{nullptr}; diff --git a/src/transactions/engine_worker.cpp b/src/transactions/engine_worker.cpp index 8e66c7311..067ee7322 100644 --- a/src/transactions/engine_worker.cpp +++ b/src/transactions/engine_worker.cpp @@ -9,24 +9,7 @@ namespace tx { WorkerEngine::WorkerEngine(const io::network::Endpoint &endpoint) - : rpc_client_pool_(endpoint) { - cache_clearing_scheduler_.Run( - "TX cache clear", kCacheReleasePeriod, [this]() { - // Use the GC snapshot as it always has at least one member. - auto res = rpc_client_pool_.Call(); - // There is a race-condition between this scheduled call and worker - // shutdown. It is possible that the worker has responded to the master - // it is shutting down, and the master is shutting down (and can't - // responde to RPCs). At the same time this call gets scheduled, so we - // get a failed RPC. - if (!res) { - LOG(WARNING) << "Transaction cache GC RPC call failed"; - } else { - CHECK(!res->member.empty()) << "Recieved an empty GcSnapshot"; - ClearCachesBasedOnOldest(res->member.front()); - } - }); -} + : rpc_client_pool_(endpoint) {} WorkerEngine::~WorkerEngine() { for (auto &kv : active_.access()) { @@ -35,10 +18,10 @@ WorkerEngine::~WorkerEngine() { } Transaction *WorkerEngine::Begin() { - auto res = rpc_client_pool_.Call(); - Transaction *tx = - new Transaction(res->member.tx_id, res->member.snapshot, *this); - auto insertion = active_.access().insert(res->member.tx_id, tx); + auto data = rpc_client_pool_.Call()->member; + UpdateOldestActive(data.snapshot, data.tx_id); + Transaction *tx = new Transaction(data.tx_id, data.snapshot, *this); + auto insertion = active_.access().insert(data.tx_id, tx); CHECK(insertion.second) << "Failed to start creation from worker"; return tx; } @@ -73,12 +56,12 @@ command_id_t WorkerEngine::UpdateCommand(transaction_id_t tx_id) { void WorkerEngine::Commit(const Transaction &t) { auto res = rpc_client_pool_.Call(t.id_); - ClearCache(t.id_); + ClearSingleTransaction(t.id_); } void WorkerEngine::Abort(const Transaction &t) { auto res = rpc_client_pool_.Call(t.id_); - ClearCache(t.id_); + ClearSingleTransaction(t.id_); } CommitLog::Info WorkerEngine::Info(transaction_id_t tid) const { @@ -92,7 +75,7 @@ CommitLog::Info WorkerEngine::Info(transaction_id_t tid) const { if (!info.is_active()) { if (info.is_committed()) clog_.set_committed(tid); if (info.is_aborted()) clog_.set_aborted(tid); - ClearCache(tid); + ClearSingleTransaction(tid); } } @@ -100,11 +83,16 @@ CommitLog::Info WorkerEngine::Info(transaction_id_t tid) const { } Snapshot WorkerEngine::GlobalGcSnapshot() { - return std::move(rpc_client_pool_.Call()->member); + auto snapshot = std::move(rpc_client_pool_.Call()->member); + UpdateOldestActive(snapshot, local_last_.load()); + return snapshot; } Snapshot WorkerEngine::GlobalActiveTransactions() { - return std::move(rpc_client_pool_.Call()->member); + auto snapshot = + std::move(rpc_client_pool_.Call()->member); + UpdateOldestActive(snapshot, local_last_.load()); + return snapshot; } transaction_id_t WorkerEngine::LocalLast() const { return local_last_; } @@ -114,13 +102,17 @@ void WorkerEngine::LocalForEachActiveTransaction( for (auto pair : active_.access()) f(*pair.second); } +transaction_id_t WorkerEngine::LocalOldestActive() const { + return oldest_active_; +} + Transaction *WorkerEngine::RunningTransaction(transaction_id_t tx_id) { auto accessor = active_.access(); auto found = accessor.find(tx_id); if (found != accessor.end()) return found->second; - Snapshot snapshot( - std::move(rpc_client_pool_.Call(tx_id)->member)); + auto snapshot = std::move(rpc_client_pool_.Call(tx_id)->member); + UpdateOldestActive(snapshot, local_last_.load()); return RunningTransaction(tx_id, snapshot); } @@ -137,29 +129,32 @@ Transaction *WorkerEngine::RunningTransaction(transaction_id_t tx_id, return insertion.first->second; } -void WorkerEngine::ClearCache(transaction_id_t tx_id) const { +void WorkerEngine::ClearTransactionalCache( + transaction_id_t oldest_active) const { + auto access = active_.access(); + for (auto kv : access) { + if (kv.first < oldest_active) { + delete kv.second; + access.remove(kv.first); + } + } +} + +void WorkerEngine::ClearSingleTransaction(transaction_id_t tx_id) const { auto access = active_.access(); auto found = access.find(tx_id); if (found != access.end()) { delete found->second; - access.remove(tx_id); + access.remove(found->first); } - NotifyListeners(tx_id); } -void WorkerEngine::ClearCachesBasedOnOldest(transaction_id_t oldest_active) { - // Take care to handle concurrent calls to this correctly. Try to update the - // oldest_active_, and only if successful (nobody else did it concurrently), - // clear caches between the previous oldest (now expired) and new oldest - // (possibly still alive). - auto previous_oldest = oldest_active_.load(); - while ( - !oldest_active_.compare_exchange_strong(previous_oldest, oldest_active)) { - ; - } - for (tx::transaction_id_t expired = previous_oldest; expired < oldest_active; - ++expired) { - ClearCache(expired); +void WorkerEngine::UpdateOldestActive(const Snapshot &snapshot, + transaction_id_t alternative) { + if (snapshot.empty()) { + oldest_active_.store(std::max(alternative, oldest_active_.load())); + } else { + oldest_active_.store(snapshot.front()); } } } // namespace tx diff --git a/src/transactions/engine_worker.hpp b/src/transactions/engine_worker.hpp index 802421b6a..0bda358db 100644 --- a/src/transactions/engine_worker.hpp +++ b/src/transactions/engine_worker.hpp @@ -1,8 +1,6 @@ #pragma once #include -#include -#include #include "communication/rpc/client_pool.hpp" #include "data_structures/concurrent/concurrent_map.hpp" @@ -10,7 +8,6 @@ #include "transactions/commit_log.hpp" #include "transactions/engine.hpp" #include "transactions/transaction.hpp" -#include "utils/scheduler.hpp" namespace tx { @@ -37,12 +34,17 @@ class WorkerEngine : public Engine { transaction_id_t LocalLast() const override; void LocalForEachActiveTransaction( std::function f) override; + transaction_id_t LocalOldestActive() const override; Transaction *RunningTransaction(transaction_id_t tx_id) override; // Caches the transaction for the given info an returs a ptr to it. Transaction *RunningTransaction(transaction_id_t tx_id, const Snapshot &snapshot); + /// Clears the cache of local transactions that have expired. The signature of + /// this method is dictated by `distributed::TransactionalCacheCleaner`. + void ClearTransactionalCache(transaction_id_t oldest_active) const; + private: // Local caches. mutable ConcurrentMap active_; @@ -53,14 +55,16 @@ class WorkerEngine : public Engine { // Communication to the transactional master. mutable communication::rpc::ClientPool rpc_client_pool_; - // Removes (destructs) a Transaction that's expired. If there is no cached - // transacton for the given id, nothing is done. - void ClearCache(transaction_id_t tx_id) const; - // Used for clearing of caches of transactions that have expired. // Initialize the oldest_active_ with 1 because there's never a tx with id=0 std::atomic oldest_active_{1}; - void ClearCachesBasedOnOldest(transaction_id_t oldest_active); - Scheduler cache_clearing_scheduler_; + + // Removes a single transaction from the cache, if present. + void ClearSingleTransaction(transaction_id_t tx_Id) const; + + // Updates the oldest active transaction to the one from the snapshot. If the + // snapshot is empty, it's set to the given alternative. + void UpdateOldestActive(const Snapshot &snapshot, + transaction_id_t alternative); }; } // namespace tx diff --git a/src/transactions/tx_end_listener.hpp b/src/transactions/tx_end_listener.hpp deleted file mode 100644 index efdcae1bf..000000000 --- a/src/transactions/tx_end_listener.hpp +++ /dev/null @@ -1,37 +0,0 @@ -#pragma once - -#include - -#include "transactions/engine.hpp" -#include "transactions/type.hpp" - -namespace tx { - -/** - * Wrapper around the given function that registers itself with the given - * transaction engine. Ensures that the function gets called when a transaction - * has ended in the engine. - * - * Also ensures that the listener gets unregistered from the engine upon - * destruction. Intended usage is: just create a TxEndListener and ensure it - * gets destructed before the function it wraps becomes invalid. - * - * There are no guarantees that the listener will be called only once for the - * given transaction id. - */ -class TxEndListener { - public: - TxEndListener(Engine &engine, std::function function) - : engine_(engine), function_(std::move(function)) { - engine_.Register(this); - } - - ~TxEndListener() { engine_.Unregister(this); } - - void operator()(transaction_id_t tx_id) const { function_(tx_id); } - - private: - Engine &engine_; - std::function function_; -}; -} // namespace tx diff --git a/src/utils/thread.hpp b/src/utils/thread.hpp index 8ca161ebe..cd3e50e6a 100644 --- a/src/utils/thread.hpp +++ b/src/utils/thread.hpp @@ -13,6 +13,7 @@ namespace utils { * Beware, the name length limit is 16 characters! */ inline void ThreadSetName(const std::string &name) { + CHECK(name.size() <= 16) << "Thread name '" << name << "'too long"; LOG_IF(WARNING, prctl(PR_SET_NAME, name.c_str()) != 0) << "Couldn't set thread name: " << name << "!"; } diff --git a/tests/unit/transaction_engine_distributed.cpp b/tests/unit/transaction_engine_distributed.cpp index 7311cf9ad..95d9f0b7c 100644 --- a/tests/unit/transaction_engine_distributed.cpp +++ b/tests/unit/transaction_engine_distributed.cpp @@ -10,7 +10,6 @@ #include "transactions/engine_master.hpp" #include "transactions/engine_rpc_messages.hpp" #include "transactions/engine_worker.hpp" -#include "transactions/tx_end_listener.hpp" using namespace tx; using namespace communication::rpc; @@ -126,23 +125,3 @@ TEST_F(WorkerEngineTest, LocalForEachActiveTransaction) { [&local](Transaction &t) { local.insert(t.id_); }); EXPECT_EQ(local, std::unordered_set({1, 4})); } - -TEST_F(WorkerEngineTest, TxEndListener) { - std::atomic has_expired{0}; - TxEndListener worker_end_listner{ - worker_, [&has_expired](transaction_id_t tid) { - std::cout << "asdasdadas: " << tid << std::endl; - ++has_expired; - }}; - - auto sleep_period = - WorkerEngine::kCacheReleasePeriod + std::chrono::milliseconds(200); - auto t1 = master_.Begin(); - auto t2 = master_.Begin(); - std::this_thread::sleep_for(sleep_period); - EXPECT_EQ(has_expired.load(), 0); - master_.Commit(*t1); - master_.Abort(*t2); - std::this_thread::sleep_for(sleep_period); - EXPECT_EQ(has_expired.load(), 2); -} diff --git a/tests/unit/transaction_engine_single_node.cpp b/tests/unit/transaction_engine_single_node.cpp index b8ff3fee5..7061fc0a0 100644 --- a/tests/unit/transaction_engine_single_node.cpp +++ b/tests/unit/transaction_engine_single_node.cpp @@ -6,7 +6,6 @@ #include "data_structures/concurrent/concurrent_set.hpp" #include "transactions/engine_single_node.hpp" #include "transactions/transaction.hpp" -#include "transactions/tx_end_listener.hpp" using namespace tx; @@ -75,22 +74,3 @@ TEST(Engine, RunningTransaction) { EXPECT_NE(t1, engine.RunningTransaction(t0->id_)); EXPECT_EQ(t1, engine.RunningTransaction(t1->id_)); } - -TEST(Engine, TxEndListener) { - SingleNodeEngine engine; - int count = 0; - { - TxEndListener listener{engine, [&count](auto) { count++; }}; - EXPECT_EQ(count, 0); - auto t1 = engine.Begin(); - EXPECT_EQ(count, 0); - auto t2 = engine.Begin(); - engine.Abort(*t1); - EXPECT_EQ(count, 1); - engine.Commit(*t2); - EXPECT_EQ(count, 2); - } - auto t3 = engine.Begin(); - engine.Commit(*t3); - EXPECT_EQ(count, 2); -}