Refactor distributed transactional cache GC

Summary:
The release of per-transaction data in distributed Memgraph has been
refactored. The master node no longer releases data each time a transaction
ends; instead, all registered caches are swept periodically, based on the
oldest active transaction, which offloads some work from the transaction
engine.
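In short, per-transaction caches switch from "erase on every transaction end"
to a periodic sweep against the oldest active transaction. A minimal sketch of
the pruning rule each registered cache now applies (std::map is used here for
brevity; the actual caches go through ConcurrentMap accessors):

#include <cstdint>
#include <map>

using transaction_id_t = uint64_t;

// Drop every cache entry keyed by a transaction older than the oldest
// active one; such transactions are guaranteed to have ended.
template <typename TValue>
void ClearTransactionalCache(std::map<transaction_id_t, TValue> &cache,
                             transaction_id_t oldest_active) {
  for (auto it = cache.begin(); it != cache.end();) {
    if (it->first < oldest_active)
      it = cache.erase(it);
    else
      ++it;
  }
}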

Reviewers: dgleich

Reviewed By: dgleich

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1235
florijan 2018-02-26 14:56:23 +01:00
parent 7b262599e6
commit bb62f463f8
16 changed files with 218 additions and 291 deletions

View File

@@ -49,7 +49,6 @@ set(memgraph_src_files
storage/record_accessor.cpp
storage/vertex_accessor.cpp
threading/thread.cpp
transactions/engine.cpp
transactions/engine_master.cpp
transactions/engine_single_node.cpp
transactions/engine_worker.cpp

View File

@@ -14,6 +14,7 @@
#include "distributed/remote_pull_rpc_clients.hpp"
#include "distributed/remote_updates_rpc_clients.hpp"
#include "distributed/remote_updates_rpc_server.hpp"
#include "distributed/transactional_cache_cleaner.hpp"
#include "durability/paths.hpp"
#include "durability/recovery.hpp"
#include "durability/snapshooter.hpp"
@@ -145,7 +146,11 @@ class SingleNode : public PrivateBase {
class Master : public PrivateBase {
public:
explicit Master(const Config &config) : PrivateBase(config) {}
explicit Master(const Config &config) : PrivateBase(config) {
cache_cleaner_.Register(remote_updates_server_);
cache_cleaner_.Register(remote_data_manager_);
}
GraphDb::Type type() const override {
return GraphDb::Type::DISTRIBUTED_MASTER;
}
@@ -180,17 +185,20 @@ class Master : public PrivateBase {
distributed::PlanDispatcher plan_dispatcher_{coordination_};
distributed::RemotePullRpcClients remote_pull_clients_{coordination_};
distributed::RpcWorkerClients index_rpc_clients_{coordination_};
distributed::RemoteUpdatesRpcServer remote_updates_server_{*this, tx_engine_,
server_};
distributed::RemoteUpdatesRpcServer remote_updates_server_{*this, server_};
distributed::RemoteUpdatesRpcClients remote_updates_clients_{coordination_};
distributed::RemoteDataManager remote_data_manager_{tx_engine_,
remote_data_clients_};
distributed::RemoteDataManager remote_data_manager_{remote_data_clients_};
distributed::TransactionalCacheCleaner cache_cleaner_{tx_engine_};
};
class Worker : public PrivateBase {
public:
explicit Worker(const Config &config) : PrivateBase(config) {
coordination_.RegisterWorker(config.worker_id);
cache_cleaner_.Register(tx_engine_);
cache_cleaner_.Register(remote_produce_server_);
cache_cleaner_.Register(remote_updates_server_);
cache_cleaner_.Register(remote_data_manager_);
}
GraphDb::Type type() const override {
@@ -224,11 +232,10 @@ class Worker : public PrivateBase {
distributed::RemoteProduceRpcServer remote_produce_server_{
*this, tx_engine_, server_, plan_consumer_};
distributed::IndexRpcServer index_rpc_server_{*this, server_};
distributed::RemoteUpdatesRpcServer remote_updates_server_{*this, tx_engine_,
server_};
distributed::RemoteUpdatesRpcServer remote_updates_server_{*this, server_};
distributed::RemoteUpdatesRpcClients remote_updates_clients_{coordination_};
distributed::RemoteDataManager remote_data_manager_{tx_engine_,
remote_data_clients_};
distributed::RemoteDataManager remote_data_manager_{remote_data_clients_};
distributed::TransactionalCacheCleaner cache_cleaner_{tx_engine_};
};
#undef IMPL_GETTERS
@@ -244,8 +251,7 @@ PublicBase::PublicBase(std::unique_ptr<PrivateBase> impl)
impl_->wal().Enable();
snapshot_creator_ = std::make_unique<Scheduler>();
snapshot_creator_->Run(
"Snapshot",
std::chrono::seconds(impl_->config_.snapshot_cycle_sec),
"Snapshot", std::chrono::seconds(impl_->config_.snapshot_cycle_sec),
[this] { MakeSnapshot(); });
}
}

View File

@@ -1,13 +1,10 @@
#pragma once
#include <mutex>
#include "data_structures/concurrent/concurrent_map.hpp"
#include "distributed/remote_cache.hpp"
#include "distributed/remote_data_rpc_clients.hpp"
#include "storage/edge.hpp"
#include "storage/vertex.hpp"
#include "threading/sync/spinlock.hpp"
#include "transactions/tx_end_listener.hpp"
#include "transactions/type.hpp"
namespace distributed {
@@ -17,17 +14,19 @@ class RemoteDataManager {
// Helper, gets or inserts a data cache for the given transaction.
template <typename TCollection>
auto &GetCache(TCollection &collection, tx::transaction_id_t tx_id) {
std::lock_guard<SpinLock> guard{lock_};
auto found = collection.find(tx_id);
if (found != collection.end()) return found->second;
auto access = collection.access();
auto found = access.find(tx_id);
if (found != access.end()) return found->second;
return collection.emplace(tx_id, remote_data_clients_).first->second;
return access
.emplace(tx_id, std::make_tuple(tx_id),
std::make_tuple(std::ref(remote_data_clients_)))
.first->second;
}
public:
RemoteDataManager(tx::Engine &tx_engine,
distributed::RemoteDataRpcClients &remote_data_clients)
: remote_data_clients_(remote_data_clients), tx_engine_(tx_engine) {}
RemoteDataManager(distributed::RemoteDataRpcClients &remote_data_clients)
: remote_data_clients_(remote_data_clients) {}
/// Gets or creates the remote vertex cache for the given transaction.
auto &Vertices(tx::transaction_id_t tx_id) {
@@ -43,33 +42,33 @@ class RemoteDataManager {
template <typename TRecord>
auto &Elements(tx::transaction_id_t tx_id);
/// Calls RemoteCache::ClearCache on vertex and edge caches.
void ClearCaches(tx::transaction_id_t tx_id) {
/// Removes all the caches for a single transaction.
void ClearCacheForSingleTransaction(tx::transaction_id_t tx_id) {
Vertices(tx_id).ClearCache();
Edges(tx_id).ClearCache();
}
/// Clears the cache of local transactions that have expired. The signature of
/// this method is dictated by `distributed::TransactionalCacheCleaner`.
void ClearTransactionalCache(tx::transaction_id_t oldest_active) {
auto vertex_access = vertices_caches_.access();
for (auto &kv : vertex_access) {
if (kv.first < oldest_active) {
vertex_access.remove(kv.first);
}
}
auto edge_access = edges_caches_.access();
for (auto &kv : edge_access) {
if (kv.first < oldest_active) {
edge_access.remove(kv.first);
}
}
}
private:
RemoteDataRpcClients &remote_data_clients_;
SpinLock lock_;
std::unordered_map<tx::transaction_id_t, RemoteCache<Vertex>>
vertices_caches_;
std::unordered_map<tx::transaction_id_t, RemoteCache<Edge>> edges_caches_;
tx::Engine &tx_engine_;
tx::TxEndListener tx_end_listener_{
tx_engine_, [this](tx::transaction_id_t tx_id) { ClearCache(tx_id); }};
// Clears the caches for the given transaction ID.
void ClearCache(tx::transaction_id_t tx_id) {
std::lock_guard<SpinLock> guard{lock_};
auto remove = [tx_id](auto &map) {
auto found = map.find(tx_id);
if (found != map.end()) map.erase(found);
};
remove(vertices_caches_);
remove(edges_caches_);
}
ConcurrentMap<tx::transaction_id_t, RemoteCache<Vertex>> vertices_caches_;
ConcurrentMap<tx::transaction_id_t, RemoteCache<Edge>> edges_caches_;
};
template <>

View File

@@ -1,12 +1,11 @@
#pragma once
#include <cstdint>
#include <map>
#include <mutex>
#include <utility>
#include <vector>
#include "communication/rpc/server.hpp"
#include "data_structures/concurrent/concurrent_map.hpp"
#include "database/graph_db.hpp"
#include "database/graph_db_accessor.hpp"
#include "distributed/plan_consumer.hpp"
@@ -22,7 +21,6 @@
#include "query/typed_value.hpp"
#include "transactions/engine.hpp"
#include "transactions/engine_worker.hpp"
#include "transactions/tx_end_listener.hpp"
#include "transactions/type.hpp"
namespace distributed {
@@ -147,40 +145,34 @@ class RemoteProduceRpcServer {
remote_produce_rpc_server_.Register<TransactionCommandAdvancedRpc>(
[this](const TransactionCommandAdvancedReq &req) {
tx_engine_.UpdateCommand(req.member);
db_.remote_data_manager().ClearCaches(req.member);
db_.remote_data_manager().ClearCacheForSingleTransaction(req.member);
return std::make_unique<TransactionCommandAdvancedRes>();
});
}
/// Clears the cache of local transactions that have expired. The signature of
/// this method is dictated by `distributed::TransactionalCacheCleaner`.
void ClearTransactionalCache(tx::transaction_id_t oldest_active) {
auto access = ongoing_produces_.access();
for (auto &kv : access) {
if (kv.first.first < oldest_active) {
access.remove(kv.first);
}
}
}
private:
database::GraphDb &db_;
communication::rpc::Server &remote_produce_rpc_server_;
const distributed::PlanConsumer &plan_consumer_;
std::map<std::pair<tx::transaction_id_t, int64_t>, OngoingProduce>
ConcurrentMap<std::pair<tx::transaction_id_t, int64_t>, OngoingProduce>
ongoing_produces_;
std::mutex ongoing_produces_lock_;
tx::Engine &tx_engine_;
tx::TxEndListener tx_end_listener_{
tx_engine_, [this](tx::transaction_id_t tx_id) { ClearCache(tx_id); }};
// Removes all ongoing pulls for the given tx_id (that transaction expired).
void ClearCache(tx::transaction_id_t tx_id) {
std::lock_guard<std::mutex> guard{ongoing_produces_lock_};
for (auto it = ongoing_produces_.begin(); it != ongoing_produces_.end();) {
if (it->first.first == tx_id) {
it = ongoing_produces_.erase(it);
} else {
++it;
}
}
}
auto &GetOngoingProduce(const RemotePullReq &req) {
std::lock_guard<std::mutex> guard{ongoing_produces_lock_};
auto found = ongoing_produces_.find({req.tx_id, req.plan_id});
if (found != ongoing_produces_.end()) {
auto access = ongoing_produces_.access();
auto found = access.find({req.tx_id, req.plan_id});
if (found != access.end()) {
return found->second;
}
if (db_.type() == database::GraphDb::Type::DISTRIBUTED_WORKER) {
@@ -189,9 +181,9 @@ class RemoteProduceRpcServer {
.RunningTransaction(req.tx_id, req.tx_snapshot);
}
auto &plan_pack = plan_consumer_.PlanForId(req.plan_id);
return ongoing_produces_
.emplace(std::piecewise_construct,
std::forward_as_tuple(req.tx_id, req.plan_id),
auto key_pair = std::make_pair(req.tx_id, req.plan_id);
return access
.emplace(key_pair, std::forward_as_tuple(key_pair),
std::forward_as_tuple(db_, req.tx_id, plan_pack.plan,
plan_pack.symbol_table, req.params,
req.symbols))

View File

@@ -21,7 +21,6 @@
#include "storage/vertex_accessor.hpp"
#include "threading/sync/lock_timeout_exception.hpp"
#include "threading/sync/spinlock.hpp"
#include "transactions/tx_end_listener.hpp"
#include "transactions/type.hpp"
namespace distributed {
@@ -186,10 +185,10 @@ class RemoteUpdatesRpcServer {
};
public:
RemoteUpdatesRpcServer(database::GraphDb &db, tx::Engine &engine,
RemoteUpdatesRpcServer(database::GraphDb &db,
communication::rpc::Server &server)
: db_(db), engine_(engine), server_(server) {
server_.Register<RemoteUpdateRpc>([this](const RemoteUpdateReq &req) {
: db_(db) {
server.Register<RemoteUpdateRpc>([this](const RemoteUpdateReq &req) {
using DeltaType = database::StateDelta::Type;
auto &delta = req.member;
switch (delta.type) {
@@ -207,19 +206,19 @@ class RemoteUpdatesRpcServer {
}
});
server_.Register<RemoteUpdateApplyRpc>(
server.Register<RemoteUpdateApplyRpc>(
[this](const RemoteUpdateApplyReq &req) {
return std::make_unique<RemoteUpdateApplyRes>(Apply(req.member));
});
server_.Register<RemoteCreateVertexRpc>(
server.Register<RemoteCreateVertexRpc>(
[this](const RemoteCreateVertexReq &req) {
return std::make_unique<RemoteCreateVertexRes>(
GetUpdates(vertex_updates_, req.member.tx_id)
.CreateVertex(req.member.labels, req.member.properties));
});
server_.Register<RemoteCreateEdgeRpc>(
server.Register<RemoteCreateEdgeRpc>(
[this](const RemoteCreateEdgeReq &req) {
auto data = req.member;
auto creation_result = CreateEdge(data);
@@ -238,7 +237,7 @@ class RemoteUpdatesRpcServer {
return std::make_unique<RemoteCreateEdgeRes>(creation_result);
});
server_.Register<RemoteAddInEdgeRpc>([this](const RemoteAddInEdgeReq &req) {
server.Register<RemoteAddInEdgeRpc>([this](const RemoteAddInEdgeReq &req) {
auto to_delta = database::StateDelta::AddInEdge(
req.member.tx_id, req.member.to, req.member.from,
req.member.edge_address, req.member.edge_type);
@@ -270,15 +269,26 @@ class RemoteUpdatesRpcServer {
return RemoteUpdateResult::DONE;
}
/// Clears the cache of local transactions that have expired. The signature of
/// this method is dictated by `distributed::TransactionalCacheCleaner`.
void ClearTransactionalCache(tx::transaction_id_t oldest_active) {
auto vertex_access = vertex_updates_.access();
for (auto &kv : vertex_access) {
if (kv.first < oldest_active) {
vertex_access.remove(kv.first);
}
}
auto edge_access = edge_updates_.access();
for (auto &kv : edge_access) {
if (kv.first < oldest_active) {
edge_access.remove(kv.first);
}
}
}
private:
database::GraphDb &db_;
tx::Engine &engine_;
communication::rpc::Server &server_;
tx::TxEndListener tx_end_listener_{engine_,
[this](tx::transaction_id_t tx_id) {
vertex_updates_.access().remove(tx_id);
edge_updates_.access().remove(tx_id);
}};
template <typename TAccessor>
using MapT =
ConcurrentMap<tx::transaction_id_t, TransactionUpdates<TAccessor>>;

View File

@@ -0,0 +1,45 @@
#pragma once
#include <functional>
#include <vector>
#include "transactions/engine.hpp"
#include "utils/scheduler.hpp"
namespace distributed {
/// Periodically calls `ClearTransactionalCache(oldest_active)` on all
/// registered objects.
class TransactionalCacheCleaner {
/// The wait time between two releases of local transaction objects that have
/// expired on the master.
static constexpr std::chrono::seconds kCacheReleasePeriod{1};
public:
TransactionalCacheCleaner(tx::Engine &tx_engine) : tx_engine_(tx_engine) {
cache_clearing_scheduler_.Run(
"DistrTxCacheGc", kCacheReleasePeriod, [this]() {
auto oldest_active = tx_engine_.LocalOldestActive();
for (auto &f : functions_) f(oldest_active);
});
}
/// Registers the given object for transactional cleaning. The object will
/// periodically have its `ClearTransactionalCache(tx::transaction_id_t)` method
/// called with the oldest active transaction id. Note that the ONLY guarantee for
/// the call param is that there are no transactions alive that have an id
/// lower than it.
template <typename TCache>
void Register(TCache &cache) {
functions_.emplace_back([&cache](tx::transaction_id_t oldest_active) {
cache.ClearTransactionalCache(oldest_active);
});
}
private:
tx::Engine &tx_engine_;
std::vector<std::function<void(tx::transaction_id_t oldest_active)>>
functions_;
Scheduler cache_clearing_scheduler_;
};
} // namespace distributed
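`Register` requires only that the given object expose a
`ClearTransactionalCache(tx::transaction_id_t)` member. A hypothetical example
of a compatible cache (the `DummyCache` type is illustrative only and not part
of this commit; assumes <map> and the transactions headers are included):

struct DummyCache {
  // Called by the cleaner with the oldest active transaction id; entries
  // belonging to older (hence finished) transactions are removed.
  void ClearTransactionalCache(tx::transaction_id_t oldest_active) {
    for (auto it = entries_.begin(); it != entries_.end();) {
      if (it->first < oldest_active)
        it = entries_.erase(it);
      else
        ++it;
    }
  }
  std::map<tx::transaction_id_t, int> entries_;
};

// Usage sketch, given some tx::Engine `engine`:
//   distributed::TransactionalCacheCleaner cleaner{engine};
//   DummyCache cache;
//   cleaner.Register(cache);  // swept every kCacheReleasePeriod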

View File

@@ -1,29 +0,0 @@
#include <algorithm>
#include <mutex>
#include "glog/logging.h"
#include "transactions/engine.hpp"
#include "transactions/tx_end_listener.hpp"
namespace tx {
void Engine::Register(TxEndListener *listener) {
std::lock_guard<SpinLock> guard{end_listeners_lock_};
end_listeners_.emplace_back(listener);
}
void Engine::Unregister(TxEndListener *listener) {
std::lock_guard<SpinLock> guard{end_listeners_lock_};
auto found =
std::find(end_listeners_.begin(), end_listeners_.end(), listener);
CHECK(found != end_listeners_.end())
<< "Failed to find listener to unregister";
end_listeners_.erase(found);
}
void Engine::NotifyListeners(transaction_id_t tx_id) const {
std::lock_guard<SpinLock> guard{end_listeners_lock_};
for (auto *listener : end_listeners_) listener->operator()(tx_id);
}
} // namespace tx

View File

@@ -11,7 +11,6 @@
#include "transactions/type.hpp"
namespace tx {
class TxEndListener;
/**
* Database transaction engine. Used for managing transactions and the related
* information such as transaction snapshots and the transaction state info.
@@ -25,8 +24,6 @@ class TxEndListener;
* determined by the users of a particular method.
*/
class Engine {
friend class TxEndListener;
public:
virtual ~Engine() = default;
@@ -58,7 +55,7 @@ class Engine {
* last.
*
* The idea is that data records can only be deleted if they were expired (and
* that was committed) by a transaction older then the older currently active.
* that was committed) by a transaction older than the oldest currently active.
* We need the full snapshot to prevent overlaps (see general GC
* documentation).
*
@@ -74,6 +71,11 @@ class Engine {
/** Returns the ID of last locally known transaction. */
virtual tx::transaction_id_t LocalLast() const = 0;
/** Returns the ID of the oldest transaction locally known to be active. It is
* guaranteed that all transactions older than the returned one are globally
* not active. */
virtual transaction_id_t LocalOldestActive() const = 0;
/** Calls function f on each locally active transaction. */
virtual void LocalForEachActiveTransaction(
std::function<void(Transaction &)> f) = 0;
@@ -89,19 +91,5 @@ class Engine {
// tx_that_holds_lock). Used for local deadlock resolution.
// TODO consider global deadlock resolution.
ConcurrentMap<transaction_id_t, transaction_id_t> local_lock_graph_;
// Transaction end listeners and the lock for protecting that datastructure.
std::vector<TxEndListener *> end_listeners_;
mutable SpinLock end_listeners_lock_;
/** Register a transaction end listener with this engine. */
void Register(TxEndListener *listener);
/** Unregister a transaction end listener with this engine. */
void Unregister(TxEndListener *listener);
protected:
/** Notifies all registered listeners that a transaction has ended. */
void NotifyListeners(transaction_id_t tx_id) const;
};
} // namespace tx

View File

@@ -50,31 +50,23 @@ command_id_t SingleNodeEngine::UpdateCommand(transaction_id_t id) {
}
void SingleNodeEngine::Commit(const Transaction &t) {
auto tx_id = t.id_;
{
std::lock_guard<SpinLock> guard(lock_);
clog_.set_committed(tx_id);
active_.remove(tx_id);
if (wal_) {
wal_->Emplace(database::StateDelta::TxCommit(tx_id));
}
store_.erase(store_.find(tx_id));
std::lock_guard<SpinLock> guard(lock_);
clog_.set_committed(t.id_);
active_.remove(t.id_);
if (wal_) {
wal_->Emplace(database::StateDelta::TxCommit(t.id_));
}
NotifyListeners(tx_id);
store_.erase(store_.find(t.id_));
}
void SingleNodeEngine::Abort(const Transaction &t) {
auto tx_id = t.id_;
{
std::lock_guard<SpinLock> guard(lock_);
clog_.set_aborted(tx_id);
active_.remove(tx_id);
if (wal_) {
wal_->Emplace(database::StateDelta::TxAbort(tx_id));
}
store_.erase(store_.find(tx_id));
std::lock_guard<SpinLock> guard(lock_);
clog_.set_aborted(t.id_);
active_.remove(t.id_);
if (wal_) {
wal_->Emplace(database::StateDelta::TxAbort(t.id_));
}
NotifyListeners(tx_id);
store_.erase(store_.find(t.id_));
}
CommitLog::Info SingleNodeEngine::Info(transaction_id_t tx) const {
@@ -103,8 +95,11 @@ Snapshot SingleNodeEngine::GlobalActiveTransactions() {
return active_transactions;
}
tx::transaction_id_t SingleNodeEngine::LocalLast() const {
return counter_.load();
transaction_id_t SingleNodeEngine::LocalLast() const { return counter_.load(); }
transaction_id_t SingleNodeEngine::LocalOldestActive() const {
std::lock_guard<SpinLock> guard(lock_);
return active_.empty() ? counter_ + 1 : active_.front();
}
void SingleNodeEngine::LocalForEachActiveTransaction(
@@ -115,8 +110,7 @@ void SingleNodeEngine::LocalForEachActiveTransaction(
}
}
tx::Transaction *SingleNodeEngine::RunningTransaction(
tx::transaction_id_t tx_id) {
Transaction *SingleNodeEngine::RunningTransaction(transaction_id_t tx_id) {
std::lock_guard<SpinLock> guard(lock_);
auto found = store_.find(tx_id);
CHECK(found != store_.end())

View File

@@ -38,6 +38,7 @@ class SingleNodeEngine : public Engine {
Snapshot GlobalGcSnapshot() override;
Snapshot GlobalActiveTransactions() override;
tx::transaction_id_t LocalLast() const override;
transaction_id_t LocalOldestActive() const override;
void LocalForEachActiveTransaction(
std::function<void(Transaction &)> f) override;
tx::Transaction *RunningTransaction(tx::transaction_id_t tx_id) override;
@@ -47,7 +48,7 @@ class SingleNodeEngine : public Engine {
CommitLog clog_;
std::unordered_map<transaction_id_t, std::unique_ptr<Transaction>> store_;
Snapshot active_;
SpinLock lock_;
mutable SpinLock lock_;
// Optional. If present, the Engine will write tx Begin/Commit/Abort
// atomically (while under lock).
durability::WriteAheadLog *wal_{nullptr};

View File

@@ -9,24 +9,7 @@
namespace tx {
WorkerEngine::WorkerEngine(const io::network::Endpoint &endpoint)
: rpc_client_pool_(endpoint) {
cache_clearing_scheduler_.Run(
"TX cache clear", kCacheReleasePeriod, [this]() {
// Use the GC snapshot as it always has at least one member.
auto res = rpc_client_pool_.Call<GcSnapshotRpc>();
// There is a race-condition between this scheduled call and worker
// shutdown. It is possible that the worker has responded to the master
// it is shutting down, and the master is shutting down (and can't
// respond to RPCs). At the same time this call gets scheduled, so we
// get a failed RPC.
if (!res) {
LOG(WARNING) << "Transaction cache GC RPC call failed";
} else {
CHECK(!res->member.empty()) << "Recieved an empty GcSnapshot";
ClearCachesBasedOnOldest(res->member.front());
}
});
}
: rpc_client_pool_(endpoint) {}
WorkerEngine::~WorkerEngine() {
for (auto &kv : active_.access()) {
@@ -35,10 +18,10 @@ WorkerEngine::~WorkerEngine() {
}
Transaction *WorkerEngine::Begin() {
auto res = rpc_client_pool_.Call<BeginRpc>();
Transaction *tx =
new Transaction(res->member.tx_id, res->member.snapshot, *this);
auto insertion = active_.access().insert(res->member.tx_id, tx);
auto data = rpc_client_pool_.Call<BeginRpc>()->member;
UpdateOldestActive(data.snapshot, data.tx_id);
Transaction *tx = new Transaction(data.tx_id, data.snapshot, *this);
auto insertion = active_.access().insert(data.tx_id, tx);
CHECK(insertion.second) << "Failed to start creation from worker";
return tx;
}
@@ -73,12 +56,12 @@ command_id_t WorkerEngine::UpdateCommand(transaction_id_t tx_id) {
void WorkerEngine::Commit(const Transaction &t) {
auto res = rpc_client_pool_.Call<CommitRpc>(t.id_);
ClearCache(t.id_);
ClearSingleTransaction(t.id_);
}
void WorkerEngine::Abort(const Transaction &t) {
auto res = rpc_client_pool_.Call<AbortRpc>(t.id_);
ClearCache(t.id_);
ClearSingleTransaction(t.id_);
}
CommitLog::Info WorkerEngine::Info(transaction_id_t tid) const {
@@ -92,7 +75,7 @@ CommitLog::Info WorkerEngine::Info(transaction_id_t tid) const {
if (!info.is_active()) {
if (info.is_committed()) clog_.set_committed(tid);
if (info.is_aborted()) clog_.set_aborted(tid);
ClearCache(tid);
ClearSingleTransaction(tid);
}
}
@@ -100,11 +83,16 @@ CommitLog::Info WorkerEngine::Info(transaction_id_t tid) const {
}
Snapshot WorkerEngine::GlobalGcSnapshot() {
return std::move(rpc_client_pool_.Call<GcSnapshotRpc>()->member);
auto snapshot = std::move(rpc_client_pool_.Call<GcSnapshotRpc>()->member);
UpdateOldestActive(snapshot, local_last_.load());
return snapshot;
}
Snapshot WorkerEngine::GlobalActiveTransactions() {
return std::move(rpc_client_pool_.Call<ActiveTransactionsRpc>()->member);
auto snapshot =
std::move(rpc_client_pool_.Call<ActiveTransactionsRpc>()->member);
UpdateOldestActive(snapshot, local_last_.load());
return snapshot;
}
transaction_id_t WorkerEngine::LocalLast() const { return local_last_; }
@@ -114,13 +102,17 @@ void WorkerEngine::LocalForEachActiveTransaction(
for (auto pair : active_.access()) f(*pair.second);
}
transaction_id_t WorkerEngine::LocalOldestActive() const {
return oldest_active_;
}
Transaction *WorkerEngine::RunningTransaction(transaction_id_t tx_id) {
auto accessor = active_.access();
auto found = accessor.find(tx_id);
if (found != accessor.end()) return found->second;
Snapshot snapshot(
std::move(rpc_client_pool_.Call<SnapshotRpc>(tx_id)->member));
auto snapshot = std::move(rpc_client_pool_.Call<SnapshotRpc>(tx_id)->member);
UpdateOldestActive(snapshot, local_last_.load());
return RunningTransaction(tx_id, snapshot);
}
@@ -137,29 +129,32 @@ Transaction *WorkerEngine::RunningTransaction(transaction_id_t tx_id,
return insertion.first->second;
}
void WorkerEngine::ClearCache(transaction_id_t tx_id) const {
void WorkerEngine::ClearTransactionalCache(
transaction_id_t oldest_active) const {
auto access = active_.access();
for (auto kv : access) {
if (kv.first < oldest_active) {
delete kv.second;
access.remove(kv.first);
}
}
}
void WorkerEngine::ClearSingleTransaction(transaction_id_t tx_id) const {
auto access = active_.access();
auto found = access.find(tx_id);
if (found != access.end()) {
delete found->second;
access.remove(tx_id);
access.remove(found->first);
}
NotifyListeners(tx_id);
}
void WorkerEngine::ClearCachesBasedOnOldest(transaction_id_t oldest_active) {
// Take care to handle concurrent calls to this correctly. Try to update the
// oldest_active_, and only if successful (nobody else did it concurrently),
// clear caches between the previous oldest (now expired) and new oldest
// (possibly still alive).
auto previous_oldest = oldest_active_.load();
while (
!oldest_active_.compare_exchange_strong(previous_oldest, oldest_active)) {
;
}
for (tx::transaction_id_t expired = previous_oldest; expired < oldest_active;
++expired) {
ClearCache(expired);
void WorkerEngine::UpdateOldestActive(const Snapshot &snapshot,
transaction_id_t alternative) {
if (snapshot.empty()) {
oldest_active_.store(std::max(alternative, oldest_active_.load()));
} else {
oldest_active_.store(snapshot.front());
}
}
} // namespace tx

View File

@@ -1,8 +1,6 @@
#pragma once
#include <atomic>
#include <chrono>
#include <mutex>
#include "communication/rpc/client_pool.hpp"
#include "data_structures/concurrent/concurrent_map.hpp"
@@ -10,7 +8,6 @@
#include "transactions/commit_log.hpp"
#include "transactions/engine.hpp"
#include "transactions/transaction.hpp"
#include "utils/scheduler.hpp"
namespace tx {
@@ -37,12 +34,17 @@ class WorkerEngine : public Engine {
transaction_id_t LocalLast() const override;
void LocalForEachActiveTransaction(
std::function<void(Transaction &)> f) override;
transaction_id_t LocalOldestActive() const override;
Transaction *RunningTransaction(transaction_id_t tx_id) override;
// Caches the transaction for the given info and returns a pointer to it.
Transaction *RunningTransaction(transaction_id_t tx_id,
const Snapshot &snapshot);
/// Clears the cache of local transactions that have expired. The signature of
/// this method is dictated by `distributed::TransactionalCacheCleaner`.
void ClearTransactionalCache(transaction_id_t oldest_active) const;
private:
// Local caches.
mutable ConcurrentMap<transaction_id_t, Transaction *> active_;
@@ -53,14 +55,16 @@ class WorkerEngine : public Engine {
// Communication to the transactional master.
mutable communication::rpc::ClientPool rpc_client_pool_;
// Removes (destructs) a Transaction that's expired. If there is no cached
// transaction for the given id, nothing is done.
void ClearCache(transaction_id_t tx_id) const;
// Used for clearing of caches of transactions that have expired.
// Initialize the oldest_active_ with 1 because there's never a tx with id=0
std::atomic<transaction_id_t> oldest_active_{1};
void ClearCachesBasedOnOldest(transaction_id_t oldest_active);
Scheduler cache_clearing_scheduler_;
// Removes a single transaction from the cache, if present.
void ClearSingleTransaction(transaction_id_t tx_id) const;
// Updates the oldest active transaction to the one from the snapshot. If the
// snapshot is empty, it's set to the given alternative.
void UpdateOldestActive(const Snapshot &snapshot,
transaction_id_t alternative);
};
} // namespace tx

View File

@@ -1,37 +0,0 @@
#pragma once
#include <functional>
#include "transactions/engine.hpp"
#include "transactions/type.hpp"
namespace tx {
/**
* Wrapper around the given function that registers itself with the given
* transaction engine. Ensures that the function gets called when a transaction
* has ended in the engine.
*
* Also ensures that the listener gets unregistered from the engine upon
* destruction. Intended usage is: just create a TxEndListener and ensure it
* gets destructed before the function it wraps becomes invalid.
*
* There are no guarantees that the listener will be called only once for the
* given transaction id.
*/
class TxEndListener {
public:
TxEndListener(Engine &engine, std::function<void(transaction_id_t)> function)
: engine_(engine), function_(std::move(function)) {
engine_.Register(this);
}
~TxEndListener() { engine_.Unregister(this); }
void operator()(transaction_id_t tx_id) const { function_(tx_id); }
private:
Engine &engine_;
std::function<void(transaction_id_t)> function_;
};
} // namespace tx

View File

@@ -13,6 +13,7 @@ namespace utils {
* Beware, the name length limit is 16 characters!
*/
inline void ThreadSetName(const std::string &name) {
CHECK(name.size() <= 16) << "Thread name '" << name << "' too long";
LOG_IF(WARNING, prctl(PR_SET_NAME, name.c_str()) != 0)
<< "Couldn't set thread name: " << name << "!";
}
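For instance, the scheduler name used by the cache cleaner fits the limit,
while longer names now trip the CHECK (a hedged illustration; the
abort-on-failure behavior comes from glog's CHECK):

utils::ThreadSetName("DistrTxCacheGc");  // 14 characters, passes the CHECK
// utils::ThreadSetName("ADistributedTxCacheCleaner");  // 26 chars, would fail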

View File

@@ -10,7 +10,6 @@
#include "transactions/engine_master.hpp"
#include "transactions/engine_rpc_messages.hpp"
#include "transactions/engine_worker.hpp"
#include "transactions/tx_end_listener.hpp"
using namespace tx;
using namespace communication::rpc;
@@ -126,23 +125,3 @@ TEST_F(WorkerEngineTest, LocalForEachActiveTransaction) {
[&local](Transaction &t) { local.insert(t.id_); });
EXPECT_EQ(local, std::unordered_set<tx::transaction_id_t>({1, 4}));
}
TEST_F(WorkerEngineTest, TxEndListener) {
std::atomic<int> has_expired{0};
TxEndListener worker_end_listner{
worker_, [&has_expired](transaction_id_t tid) {
std::cout << "asdasdadas: " << tid << std::endl;
++has_expired;
}};
auto sleep_period =
WorkerEngine::kCacheReleasePeriod + std::chrono::milliseconds(200);
auto t1 = master_.Begin();
auto t2 = master_.Begin();
std::this_thread::sleep_for(sleep_period);
EXPECT_EQ(has_expired.load(), 0);
master_.Commit(*t1);
master_.Abort(*t2);
std::this_thread::sleep_for(sleep_period);
EXPECT_EQ(has_expired.load(), 2);
}

View File

@@ -6,7 +6,6 @@
#include "data_structures/concurrent/concurrent_set.hpp"
#include "transactions/engine_single_node.hpp"
#include "transactions/transaction.hpp"
#include "transactions/tx_end_listener.hpp"
using namespace tx;
@@ -75,22 +74,3 @@ TEST(Engine, RunningTransaction) {
EXPECT_NE(t1, engine.RunningTransaction(t0->id_));
EXPECT_EQ(t1, engine.RunningTransaction(t1->id_));
}
TEST(Engine, TxEndListener) {
SingleNodeEngine engine;
int count = 0;
{
TxEndListener listener{engine, [&count](auto) { count++; }};
EXPECT_EQ(count, 0);
auto t1 = engine.Begin();
EXPECT_EQ(count, 0);
auto t2 = engine.Begin();
engine.Abort(*t1);
EXPECT_EQ(count, 1);
engine.Commit(*t2);
EXPECT_EQ(count, 2);
}
auto t3 = engine.Begin();
engine.Commit(*t3);
EXPECT_EQ(count, 2);
}