Cleanup per-transaction caches in distributed

Summary:
On the master, cleanups are hooked directly into the transaction engine.
This is beneficial because the master may hold larger caches and we want
to clear them as soon as possible.

On the workers a periodic RPC call asks the master which transactions are
still alive, and local caches of transactions that have ended are released.
This is suboptimal because long-running transactions will prevent cache GC
(just as they prevent data GC), but it is fairly simple.

Note that all cleanup is now done automatically and `RemotePull` has been
reduced accordingly. @msantl, please verify correctness and consider
whether the code can be simplified further.
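
For illustration, a minimal sketch (not part of this diff) of the hook
pattern a cache owner now uses, mirroring the TxEndListener API introduced
in the changes below; `SomeTxCache` and `ClearCache` are hypothetical names:

    // Sketch only: assumes transactions/engine.hpp and
    // transactions/tx_end_listener.hpp are included.
    class SomeTxCache {
     public:
      explicit SomeTxCache(tx::Engine &engine) : engine_(engine) {}

     private:
      tx::Engine &engine_;
      // Invoked by the engine whenever a transaction commits or aborts;
      // the listener unregisters itself from the engine on destruction.
      tx::TxEndListener tx_end_listener_{
          engine_, [this](tx::transaction_id_t tx_id) { ClearCache(tx_id); }};

      // Drops any per-transaction state held for tx_id (illustrative).
      void ClearCache(tx::transaction_id_t tx_id) { /* ... */ }
    };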

Reviewers: teon.banek, msantl

Reviewed By: msantl

Subscribers: pullbot, msantl

Differential Revision: https://phabricator.memgraph.io/D1202
florijan 2018-02-15 10:47:50 +01:00
parent 84e7aec27b
commit 9721ccf61c
24 changed files with 275 additions and 147 deletions

View File

@ -49,6 +49,7 @@ set(memgraph_src_files
storage/record_accessor.cpp
storage/vertex_accessor.cpp
threading/thread.cpp
transactions/engine.cpp
transactions/engine_master.cpp
transactions/engine_single_node.cpp
transactions/engine_worker.cpp

View File

@ -70,8 +70,6 @@ BOOST_CLASS_EXPORT(distributed::ConsumePlanRes);
// Remote pull.
BOOST_CLASS_EXPORT(distributed::RemotePullReq);
BOOST_CLASS_EXPORT(distributed::RemotePullRes);
BOOST_CLASS_EXPORT(distributed::EndRemotePullReq);
BOOST_CLASS_EXPORT(distributed::EndRemotePullRes);
BOOST_CLASS_EXPORT(distributed::TransactionCommandAdvancedReq);
BOOST_CLASS_EXPORT(distributed::TransactionCommandAdvancedRes);
@ -92,5 +90,3 @@ BOOST_CLASS_EXPORT(distributed::RemoteUpdateReq);
BOOST_CLASS_EXPORT(distributed::RemoteUpdateRes);
BOOST_CLASS_EXPORT(distributed::RemoteUpdateApplyReq);
BOOST_CLASS_EXPORT(distributed::RemoteUpdateApplyRes);
BOOST_CLASS_EXPORT(distributed::RemoteUpdateDiscardReq);
BOOST_CLASS_EXPORT(distributed::RemoteUpdateDiscardRes);

View File

@ -169,9 +169,11 @@ class Master : public PrivateBase {
distributed::RemotePullRpcClients remote_pull_clients_{coordination_};
distributed::RpcWorkerClients index_rpc_clients_{coordination_,
distributed::kIndexRpcName};
distributed::RemoteUpdatesRpcServer remote_updates_server_{*this, system_};
distributed::RemoteUpdatesRpcServer remote_updates_server_{*this, tx_engine_,
system_};
distributed::RemoteUpdatesRpcClients remote_updates_clients_{coordination_};
distributed::RemoteDataManager remote_data_manager_{remote_data_clients_};
distributed::RemoteDataManager remote_data_manager_{tx_engine_,
remote_data_clients_};
};
class Worker : public PrivateBase {
@ -200,12 +202,14 @@ class Worker : public PrivateBase {
distributed::RemoteDataRpcServer remote_data_server_{*this, system_};
distributed::RemoteDataRpcClients remote_data_clients_{coordination_};
distributed::PlanConsumer plan_consumer_{system_};
distributed::RemoteProduceRpcServer remote_produce_server_{*this, system_,
plan_consumer_};
distributed::RemoteProduceRpcServer remote_produce_server_{
*this, tx_engine_, system_, plan_consumer_};
distributed::IndexRpcServer index_rpc_server_{*this, system_};
distributed::RemoteUpdatesRpcServer remote_updates_server_{*this, system_};
distributed::RemoteUpdatesRpcServer remote_updates_server_{*this, tx_engine_,
system_};
distributed::RemoteUpdatesRpcClients remote_updates_clients_{coordination_};
distributed::RemoteDataManager remote_data_manager_{remote_data_clients_};
distributed::RemoteDataManager remote_data_manager_{tx_engine_,
remote_data_clients_};
};
#undef IMPL_GETTERS

View File

@ -7,6 +7,7 @@
#include "storage/edge.hpp"
#include "storage/vertex.hpp"
#include "threading/sync/spinlock.hpp"
#include "transactions/tx_end_listener.hpp"
#include "transactions/type.hpp"
namespace distributed {
@ -24,8 +25,9 @@ class RemoteDataManager {
}
public:
RemoteDataManager(distributed::RemoteDataRpcClients &remote_data_clients)
: remote_data_clients_(remote_data_clients) {}
RemoteDataManager(tx::Engine &tx_engine,
distributed::RemoteDataRpcClients &remote_data_clients)
: remote_data_clients_(remote_data_clients), tx_engine_(tx_engine) {}
/// Gets or creates the remote vertex cache for the given transaction.
auto &Vertices(tx::transaction_id_t tx_id) {
@ -53,6 +55,21 @@ class RemoteDataManager {
std::unordered_map<tx::transaction_id_t, RemoteCache<Vertex>>
vertices_caches_;
std::unordered_map<tx::transaction_id_t, RemoteCache<Edge>> edges_caches_;
tx::Engine &tx_engine_;
tx::TxEndListener tx_end_listener_{
tx_engine_, [this](tx::transaction_id_t tx_id) { ClearCache(tx_id); }};
// Clears the caches for the given transaction ID.
void ClearCache(tx::transaction_id_t tx_id) {
std::lock_guard<SpinLock> guard{lock_};
auto remove = [tx_id](auto &map) {
auto found = map.find(tx_id);
if (found != map.end()) map.erase(found);
};
remove(vertices_caches_);
remove(edges_caches_);
}
};
template <>

View File

@ -12,9 +12,6 @@ namespace distributed {
/** Serves this worker's data to others. */
class RemoteDataRpcServer {
// TODO maybe reuse GraphDbAccessors. It would reduce the load on tx::Engine
// locks (not sure what the gain would be). But have some way of cache
// invalidation.
public:
RemoteDataRpcServer(database::GraphDb &db, communication::rpc::System &system)
: db_(db), rpc_server_(system, kRemoteDataRpcName) {

View File

@ -20,6 +20,8 @@
#include "query/parameters.hpp"
#include "query/plan/operator.hpp"
#include "query/typed_value.hpp"
#include "transactions/engine.hpp"
#include "transactions/tx_end_listener.hpp"
#include "transactions/type.hpp"
namespace distributed {
@ -129,26 +131,18 @@ class RemoteProduceRpcServer {
};
public:
RemoteProduceRpcServer(database::GraphDb &db,
RemoteProduceRpcServer(database::GraphDb &db, tx::Engine &engine,
communication::rpc::System &system,
const distributed::PlanConsumer &plan_consumer)
: db_(db),
remote_produce_rpc_server_(system, kRemotePullProduceRpcName),
plan_consumer_(plan_consumer) {
plan_consumer_(plan_consumer),
engine_(engine) {
remote_produce_rpc_server_.Register<RemotePullRpc>(
[this](const RemotePullReq &req) {
return std::make_unique<RemotePullRes>(RemotePull(req));
});
remote_produce_rpc_server_.Register<EndRemotePullRpc>([this](
const EndRemotePullReq &req) {
std::lock_guard<std::mutex> guard{ongoing_produces_lock_};
auto it = ongoing_produces_.find(req.member);
CHECK(it != ongoing_produces_.end()) << "Failed to find ongoing produce";
ongoing_produces_.erase(it);
return std::make_unique<EndRemotePullRes>();
});
remote_produce_rpc_server_.Register<TransactionCommandAdvancedRpc>(
[this](const TransactionCommandAdvancedReq &req) {
db_.tx_engine().UpdateCommand(req.member);
@ -166,6 +160,22 @@ class RemoteProduceRpcServer {
ongoing_produces_;
std::mutex ongoing_produces_lock_;
tx::Engine &engine_;
tx::TxEndListener tx_end_listener_{
engine_, [this](tx::transaction_id_t tx_id) { ClearCache(tx_id); }};
// Removes all ongoing produces for the given tx_id (that transaction has expired).
void ClearCache(tx::transaction_id_t tx_id) {
std::lock_guard<std::mutex> guard{ongoing_produces_lock_};
for (auto it = ongoing_produces_.begin(); it != ongoing_produces_.end();) {
if (it->first.first == tx_id) {
it = ongoing_produces_.erase(it);
} else {
++it;
}
}
}
auto &GetOngoingProduce(const RemotePullReq &req) {
std::lock_guard<std::mutex> guard{ongoing_produces_lock_};
auto found = ongoing_produces_.find({req.tx_id, req.plan_id});

View File

@ -364,12 +364,6 @@ using RemotePullRpc =
// optimization not to have to send the full RemotePullReqData pack every
// time.
using EndRemotePullData = std::pair<tx::transaction_id_t, int64_t>;
RPC_SINGLE_MEMBER_MESSAGE(EndRemotePullReq, EndRemotePullData);
RPC_NO_MEMBER_MESSAGE(EndRemotePullRes);
using EndRemotePullRpc =
communication::rpc::RequestResponse<EndRemotePullReq, EndRemotePullRes>;
RPC_SINGLE_MEMBER_MESSAGE(TransactionCommandAdvancedReq, tx::transaction_id_t);
RPC_NO_MEMBER_MESSAGE(TransactionCommandAdvancedRes);
using TransactionCommandAdvancedRpc =

View File

@ -87,30 +87,6 @@ class RemotePullRpcClients {
auto GetWorkerIds() { return clients_.GetWorkerIds(); }
// Notifies a worker that the given transaction/plan is done. Otherwise the
// server is left with potentially unconsumed Cursors that never get deleted.
//
// @todo - this needs to be done with hooks into the transactional
// engine, so that the Worker discards it's stuff when the relevant
// transaction are done.
std::future<void> EndRemotePull(int worker_id, tx::transaction_id_t tx_id,
int64_t plan_id) {
return clients_.ExecuteOnWorker<void>(
worker_id, [tx_id, plan_id](ClientPool &client_pool) {
return client_pool.Call<EndRemotePullRpc>(
EndRemotePullData{tx_id, plan_id});
});
}
void EndAllRemotePulls(tx::transaction_id_t tx_id, int64_t plan_id) {
std::vector<std::future<void>> futures;
for (auto worker_id : clients_.GetWorkerIds()) {
if (worker_id == 0) continue;
futures.emplace_back(EndRemotePull(worker_id, tx_id, plan_id));
}
for (auto &future : futures) future.wait();
}
std::vector<std::future<void>> NotifyAllTransactionCommandAdvanced(
tx::transaction_id_t tx_id) {
return clients_.ExecuteOnWorkers<void>(0, [tx_id](auto &client) {

View File

@ -43,12 +43,6 @@ class RemoteUpdatesRpcClients {
});
}
/// Calls for the worker with the given ID to discard remote updates.
void RemoteUpdateDiscard(int worker_id, tx::transaction_id_t tx_id) {
worker_clients_.GetClientPool(worker_id).Call<RemoteUpdateDiscardRpc>(
tx_id);
}
private:
RpcWorkerClients worker_clients_;
};

View File

@ -26,10 +26,4 @@ RPC_SINGLE_MEMBER_MESSAGE(RemoteUpdateApplyRes, RemoteUpdateResult);
using RemoteUpdateApplyRpc =
communication::rpc::RequestResponse<RemoteUpdateApplyReq,
RemoteUpdateApplyRes>;
RPC_SINGLE_MEMBER_MESSAGE(RemoteUpdateDiscardReq, tx::transaction_id_t);
RPC_NO_MEMBER_MESSAGE(RemoteUpdateDiscardRes);
using RemoteUpdateDiscardRpc =
communication::rpc::RequestResponse<RemoteUpdateDiscardReq,
RemoteUpdateDiscardRes>;
} // namespace distributed

View File

@ -19,6 +19,7 @@
#include "storage/vertex_accessor.hpp"
#include "threading/sync/lock_timeout_exception.hpp"
#include "threading/sync/spinlock.hpp"
#include "transactions/tx_end_listener.hpp"
#include "transactions/type.hpp"
namespace distributed {
@ -121,9 +122,9 @@ class RemoteUpdatesRpcServer {
};
public:
RemoteUpdatesRpcServer(database::GraphDb &db,
RemoteUpdatesRpcServer(database::GraphDb &db, tx::Engine &engine,
communication::rpc::System &system)
: db_(db), server_(system, kRemoteUpdatesRpc) {
: db_(db), engine_(engine), server_(system, kRemoteUpdatesRpc) {
server_.Register<RemoteUpdateRpc>([this](const RemoteUpdateReq &req) {
using DeltaType = database::StateDelta::Type;
switch (req.member.type) {
@ -145,12 +146,6 @@ class RemoteUpdatesRpcServer {
[this](const RemoteUpdateApplyReq &req) {
return std::make_unique<RemoteUpdateApplyRes>(Apply(req.member));
});
server_.Register<RemoteUpdateDiscardRpc>(
[this](const RemoteUpdateDiscardReq &req) {
Discard(req.member);
return std::make_unique<RemoteUpdateDiscardRes>();
});
}
/// Applies all existing updates for the given transaction ID. If there are
@ -175,19 +170,19 @@ class RemoteUpdatesRpcServer {
return RemoteUpdateResult::DONE;
}
/// Discards all the existing updates for the given transaction ID.
void Discard(tx::transaction_id_t tx_id) {
vertex_updates_.access().remove(tx_id);
edge_updates_.access().remove(tx_id);
}
private:
database::GraphDb &db_;
tx::Engine &engine_;
communication::rpc::Server server_;
ConcurrentMap<tx::transaction_id_t, TransactionUpdates<VertexAccessor>>
vertex_updates_;
ConcurrentMap<tx::transaction_id_t, TransactionUpdates<EdgeAccessor>>
edge_updates_;
tx::TxEndListener tx_end_listener_{engine_,
[this](tx::transaction_id_t tx_id) {
vertex_updates_.access().remove(tx_id);
edge_updates_.access().remove(tx_id);
}};
// Processes a single delta received in the RPC request.
template <typename TCollection>

View File

@ -2741,16 +2741,6 @@ PullRemote::PullRemoteCursor::PullRemoteCursor(const PullRemote &self,
worker_ids_.erase(std::find(worker_ids_.begin(), worker_ids_.end(), 0));
}
void PullRemote::PullRemoteCursor::EndRemotePull() {
std::vector<std::future<void>> futures;
for (auto &worker_id : worker_ids_) {
futures.emplace_back(db_.db().remote_pull_clients().EndRemotePull(
worker_id, db_.transaction().id_, self_.plan_id()));
}
for (auto &future : futures) future.wait();
worker_ids_.clear();
}
bool PullRemote::PullRemoteCursor::Pull(Frame &frame, Context &context) {
auto insert_future_for_worker = [&](int worker_id) {
remote_pulls_[worker_id] = db_.db().remote_pull_clients().RemotePull(
@ -2789,22 +2779,17 @@ bool PullRemote::PullRemoteCursor::Pull(Frame &frame, Context &context) {
insert_future_for_worker(worker_id);
break;
case distributed::RemotePullState::SERIALIZATION_ERROR:
EndRemotePull();
throw mvcc::SerializationError(
"Serialization error occured during PullRemote !");
case distributed::RemotePullState::LOCK_TIMEOUT_ERROR:
EndRemotePull();
throw LockTimeoutException(
"LockTimeout error occured during PullRemote !");
case distributed::RemotePullState::UPDATE_DELETED_ERROR:
EndRemotePull();
throw QueryRuntimeException(
"RecordDeleted error ocured during PullRemote !");
case distributed::RemotePullState::RECONSTRUCTION_ERROR:
EndRemotePull();
throw query::ReconstructionException();
case distributed::RemotePullState::QUERY_ERROR:
EndRemotePull();
throw QueryRuntimeException(
"Query runtime error occurred duing PullRemote !");
}
@ -2825,11 +2810,8 @@ bool PullRemote::PullRemoteCursor::Pull(Frame &frame, Context &context) {
}
if (!have_remote_results) {
// If we didn't find any remote results and there aren't any remote
// pulls, we've exhausted all remote results. Make sure we signal that
// to workers and exit the loop.
if (remote_pulls_.empty()) {
EndRemotePull();
worker_ids_.clear();
break;
}
@ -2868,18 +2850,12 @@ bool PullRemote::PullRemoteCursor::Pull(Frame &frame, Context &context) {
if (remote_results_[pull_from_worker_id].empty() &&
remote_pulls_.find(pull_from_worker_id) == remote_pulls_.end()) {
worker_ids_.erase(worker_ids_.begin() + last_pulled_worker_id_index_);
db_.db()
.remote_pull_clients()
.EndRemotePull(pull_from_worker_id, db_.transaction().id_,
self_.plan_id())
.wait();
}
return true;
}
void PullRemote::PullRemoteCursor::Reset() {
EndRemotePull();
throw QueryRuntimeException("Unsupported: Reset during PullRemote!");
}

View File

@ -2309,8 +2309,6 @@ class PullRemote : public LogicalOperator {
void Reset() override;
private:
void EndRemotePull();
const PullRemote &self_;
database::GraphDbAccessor &db_;
const std::unique_ptr<Cursor> input_cursor_;

View File

@ -0,0 +1,29 @@
#include <algorithm>
#include <mutex>
#include "glog/logging.h"
#include "transactions/engine.hpp"
#include "transactions/tx_end_listener.hpp"
namespace tx {
void Engine::Register(TxEndListener *listener) {
std::lock_guard<SpinLock> guard{end_listeners_lock_};
end_listeners_.emplace_back(listener);
}
void Engine::Unregister(TxEndListener *listener) {
std::lock_guard<SpinLock> guard{end_listeners_lock_};
auto found =
std::find(end_listeners_.begin(), end_listeners_.end(), listener);
CHECK(found != end_listeners_.end())
<< "Failed to find listener to unregister";
end_listeners_.erase(found);
}
void Engine::NotifyListeners(transaction_id_t tx_id) const {
std::lock_guard<SpinLock> guard{end_listeners_lock_};
for (auto *listener : end_listeners_) listener->operator()(tx_id);
}
} // namespace tx

View File

@ -1,11 +1,17 @@
#pragma once
#include <algorithm>
#include <mutex>
#include <vector>
#include "data_structures/concurrent/concurrent_map.hpp"
#include "threading/sync/spinlock.hpp"
#include "transactions/commit_log.hpp"
#include "transactions/transaction.hpp"
#include "transactions/type.hpp"
namespace tx {
class TxEndListener;
/**
* Database transaction engine. Used for managing transactions and the related
* information such as transaction snapshots and the transaction state info.
@ -19,6 +25,8 @@ namespace tx {
* determined by the users of a particular method.
*/
class Engine {
friend class TxEndListener;
public:
virtual ~Engine() = default;
@ -74,7 +82,7 @@ class Engine {
std::function<void(Transaction &)> f) = 0;
/** Gets a transaction object for a running transaction. */
virtual tx::Transaction *RunningTransaction(tx::transaction_id_t tx_id) = 0;
virtual tx::Transaction *RunningTransaction(transaction_id_t tx_id) = 0;
auto &local_lock_graph() { return local_lock_graph_; }
const auto &local_lock_graph() const { return local_lock_graph_; }
@ -84,5 +92,19 @@ class Engine {
// tx_that_holds_lock). Used for local deadlock resolution.
// TODO consider global deadlock resolution.
ConcurrentMap<transaction_id_t, transaction_id_t> local_lock_graph_;
// Transaction end listeners and the lock for protecting that datastructure.
std::vector<TxEndListener *> end_listeners_;
mutable SpinLock end_listeners_lock_;
/** Register a transaction end listener with this engine. */
void Register(TxEndListener *listener);
/** Unregister a transaction end listener with this engine. */
void Unregister(TxEndListener *listener);
protected:
/** Notifies all registered listeners that a transaction has ended. */
void NotifyListeners(transaction_id_t tx_id) const;
};
} // namespace tx

View File

@ -55,7 +55,7 @@ RPC_SINGLE_MEMBER_MESSAGE(ClogInfoRes, CommitLog::Info)
using ClogInfoRpc =
communication::rpc::RequestResponse<ClogInfoReq, ClogInfoRes>;
RPC_SINGLE_MEMBER_MESSAGE(ActiveTransactionsReq, transaction_id_t)
RPC_NO_MEMBER_MESSAGE(ActiveTransactionsReq)
using ActiveTransactionsRpc =
communication::rpc::RequestResponse<ActiveTransactionsReq, SnapshotRes>;

View File

@ -50,23 +50,31 @@ command_id_t SingleNodeEngine::UpdateCommand(transaction_id_t id) {
}
void SingleNodeEngine::Commit(const Transaction &t) {
auto tx_id = t.id_;
{
std::lock_guard<SpinLock> guard(lock_);
clog_.set_committed(t.id_);
active_.remove(t.id_);
clog_.set_committed(tx_id);
active_.remove(tx_id);
if (wal_) {
wal_->Emplace(database::StateDelta::TxCommit(t.id_));
wal_->Emplace(database::StateDelta::TxCommit(tx_id));
}
store_.erase(store_.find(t.id_));
store_.erase(store_.find(tx_id));
}
NotifyListeners(tx_id);
}
void SingleNodeEngine::Abort(const Transaction &t) {
auto tx_id = t.id_;
{
std::lock_guard<SpinLock> guard(lock_);
clog_.set_aborted(t.id_);
active_.remove(t.id_);
clog_.set_aborted(tx_id);
active_.remove(tx_id);
if (wal_) {
wal_->Emplace(database::StateDelta::TxAbort(t.id_));
wal_->Emplace(database::StateDelta::TxAbort(tx_id));
}
store_.erase(store_.find(t.id_));
store_.erase(store_.find(tx_id));
}
NotifyListeners(tx_id);
}
CommitLog::Info SingleNodeEngine::Info(transaction_id_t tx) const {

View File

@ -1,3 +1,5 @@
#include <chrono>
#include "glog/logging.h"
#include "transactions/engine_rpc_messages.hpp"
@ -7,7 +9,22 @@
namespace tx {
WorkerEngine::WorkerEngine(const io::network::Endpoint &endpoint)
: rpc_client_pool_(endpoint, kTransactionEngineRpc) {}
: rpc_client_pool_(endpoint, kTransactionEngineRpc) {
cache_clearing_scheduler_.Run(kCacheReleasePeriod, [this]() {
// Use the GC snapshot as it always has at least one member.
auto res = rpc_client_pool_.Call<GcSnapshotRpc>();
// There is a race condition between this scheduled call and worker
// shutdown: the worker may have told the master that it is shutting down
// while the master itself is shutting down (and can no longer respond to
// RPCs), yet this call still gets scheduled, so the RPC fails.
if (!res) {
LOG(WARNING) << "Transaction cache GC RPC call failed";
} else {
CHECK(!res->member.empty()) << "Received an empty GcSnapshot";
ClearCachesBasedOnOldest(res->member.front());
}
});
}
WorkerEngine::~WorkerEngine() {
for (auto &kv : active_.access()) {
@ -120,5 +137,22 @@ void WorkerEngine::ClearCache(transaction_id_t tx_id) const {
delete found->second;
access.remove(tx_id);
}
NotifyListeners(tx_id);
}
void WorkerEngine::ClearCachesBasedOnOldest(transaction_id_t oldest_active) {
// Take care to handle concurrent calls to this correctly. Try to update the
// oldest_active_, and only if successful (nobody else did it concurrently),
// clear caches between the previous oldest (now expired) and new oldest
// (possibly still alive).
auto previous_oldest = oldest_active_.load();
while (
!oldest_active_.compare_exchange_strong(previous_oldest, oldest_active)) {
;
}
for (tx::transaction_id_t expired = previous_oldest; expired < oldest_active;
++expired) {
ClearCache(expired);
}
}
} // namespace tx
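To make the clearing window concrete: if the previously recorded oldest
active transaction was 3 and the master now reports 7 as oldest active,
the loop above clears local caches (and notifies end-listeners) for
transactions 3 through 6, while transaction 7 is left untouched since it
may still be alive.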

View File

@ -1,6 +1,7 @@
#pragma once
#include <atomic>
#include <chrono>
#include <mutex>
#include "communication/rpc/client_pool.hpp"
@ -9,6 +10,7 @@
#include "transactions/commit_log.hpp"
#include "transactions/engine.hpp"
#include "transactions/transaction.hpp"
#include "utils/scheduler.hpp"
namespace tx {
@ -17,6 +19,10 @@ namespace tx {
* begin/advance/end transactions on the master. */
class WorkerEngine : public Engine {
public:
/// The wait time between two releases of local transaction objects that have
/// expired on the master.
static constexpr std::chrono::seconds kCacheReleasePeriod{1};
WorkerEngine(const io::network::Endpoint &endpoint);
~WorkerEngine();
@ -47,5 +53,11 @@ class WorkerEngine : public Engine {
// Removes (destructs) a Transaction that's expired. If there is no cached
// transaction for the given id, nothing is done.
void ClearCache(transaction_id_t tx_id) const;
// Used for clearing caches of transactions that have expired.
// Initialize oldest_active_ with 1 because there is never a tx with id=0.
std::atomic<transaction_id_t> oldest_active_{1};
void ClearCachesBasedOnOldest(transaction_id_t oldest_active);
Scheduler cache_clearing_scheduler_;
};
} // namespace tx

View File

@ -0,0 +1,37 @@
#pragma once
#include <functional>
#include "transactions/engine.hpp"
#include "transactions/type.hpp"
namespace tx {
/**
* Wrapper around the given function that registers itself with the given
* transaction engine. Ensures that the function gets called when a transaction
* has ended in the engine.
*
* Also ensures that the listener gets unregistered from the engine upon
* destruction. Intended usage is: just create a TxEndListener and ensure it
* gets destructed before the function it wraps becomes invalid.
*
* There are no guarantees that the listener will be called only once for the
* given transaction id.
*/
class TxEndListener {
public:
TxEndListener(Engine &engine, std::function<void(transaction_id_t)> function)
: engine_(engine), function_(std::move(function)) {
engine_.Register(this);
}
~TxEndListener() { engine_.Unregister(this); }
void operator()(transaction_id_t tx_id) const { function_(tx_id); }
private:
Engine &engine_;
std::function<void(transaction_id_t)> function_;
};
} // namespace tx

View File

@ -181,8 +181,6 @@ TEST_F(DistributedGraphDbTest, RemotePullProduceRpc) {
auto tx1_batch2 = remote_pull(dba_1, worker_id).get();
expect_second_batch(tx1_batch2);
}
for (auto tx_id : {dba_1.transaction_id(), dba_2.transaction_id()})
master().remote_pull_clients().EndAllRemotePulls(tx_id, plan_id);
}
TEST_F(DistributedGraphDbTest, RemotePullProduceRpcWithGraphElements) {
@ -277,9 +275,6 @@ TEST_F(DistributedGraphDbTest, RemotePullProduceRpcWithGraphElements) {
auto future_w2_results = remote_pull(dba, 2);
check_result(1, future_w1_results.get().frames);
check_result(2, future_w2_results.get().frames);
master().remote_pull_clients().EndAllRemotePulls(dba.transaction_id(),
plan_id);
}
TEST_F(DistributedGraphDbTest, BuildIndexDistributed) {

View File

@ -58,10 +58,4 @@ TEST_F(DistributedUpdateTest, RemoteUpdateApply) {
EXPECT_LABEL(v1_dba1, false, true);
}
TEST_F(DistributedUpdateTest, RemoteUpdateDiscard) {
EXPECT_LABEL(v1_dba1, false, false);
worker(1).remote_updates_server().Discard(dba1->transaction_id());
EXPECT_LABEL(v1_dba1, false, false);
}
#undef EXPECT_LABEL

View File

@ -1,4 +1,7 @@
#include <algorithm>
#include <mutex>
#include <unordered_set>
#include <vector>
#include "gtest/gtest.h"
@ -7,6 +10,7 @@
#include "transactions/engine_master.hpp"
#include "transactions/engine_rpc_messages.hpp"
#include "transactions/engine_worker.hpp"
#include "transactions/tx_end_listener.hpp"
using namespace tx;
using namespace communication::rpc;
@ -135,3 +139,22 @@ TEST_F(WorkerEngineTest, LocalForEachActiveTransaction) {
[&local](Transaction &t) { local.insert(t.id_); });
EXPECT_EQ(local, std::unordered_set<tx::transaction_id_t>({1, 4}));
}
TEST_F(WorkerEngineTest, TxEndListener) {
std::atomic<int> has_expired{0};
TxEndListener worker_end_listener{
    worker_, [&has_expired](transaction_id_t) { ++has_expired; }};
auto sleep_period =
WorkerEngine::kCacheReleasePeriod + std::chrono::milliseconds(200);
auto t1 = master_.Begin();
auto t2 = master_.Begin();
std::this_thread::sleep_for(sleep_period);
EXPECT_EQ(has_expired.load(), 0);
master_.Commit(*t1);
master_.Abort(*t2);
std::this_thread::sleep_for(sleep_period);
EXPECT_EQ(has_expired.load(), 2);
}

View File

@ -6,37 +6,40 @@
#include "data_structures/concurrent/concurrent_set.hpp"
#include "transactions/engine_single_node.hpp"
#include "transactions/transaction.hpp"
#include "transactions/tx_end_listener.hpp"
using namespace tx;
TEST(Engine, GcSnapshot) {
tx::SingleNodeEngine engine;
ASSERT_EQ(engine.GlobalGcSnapshot(), tx::Snapshot({1}));
SingleNodeEngine engine;
ASSERT_EQ(engine.GlobalGcSnapshot(), Snapshot({1}));
std::vector<tx::Transaction *> transactions;
std::vector<Transaction *> transactions;
// create transactions and check the GC snapshot
for (int i = 0; i < 5; ++i) {
transactions.push_back(engine.Begin());
EXPECT_EQ(engine.GlobalGcSnapshot(), tx::Snapshot({1}));
EXPECT_EQ(engine.GlobalGcSnapshot(), Snapshot({1}));
}
// commit transactions in the middle, expect
// the GcSnapshot did not change
engine.Commit(*transactions[1]);
EXPECT_EQ(engine.GlobalGcSnapshot(), tx::Snapshot({1}));
EXPECT_EQ(engine.GlobalGcSnapshot(), Snapshot({1}));
engine.Commit(*transactions[2]);
EXPECT_EQ(engine.GlobalGcSnapshot(), tx::Snapshot({1}));
EXPECT_EQ(engine.GlobalGcSnapshot(), Snapshot({1}));
// have the first three transactions committed
engine.Commit(*transactions[0]);
EXPECT_EQ(engine.GlobalGcSnapshot(), tx::Snapshot({1, 2, 3, 4}));
EXPECT_EQ(engine.GlobalGcSnapshot(), Snapshot({1, 2, 3, 4}));
// commit all
engine.Commit(*transactions[3]);
engine.Commit(*transactions[4]);
EXPECT_EQ(engine.GlobalGcSnapshot(), tx::Snapshot({6}));
EXPECT_EQ(engine.GlobalGcSnapshot(), Snapshot({6}));
}
TEST(Engine, Advance) {
tx::SingleNodeEngine engine;
SingleNodeEngine engine;
auto t0 = engine.Begin();
auto t1 = engine.Begin();
@ -49,9 +52,9 @@ TEST(Engine, Advance) {
}
TEST(Engine, ConcurrentBegin) {
tx::SingleNodeEngine engine;
SingleNodeEngine engine;
std::vector<std::thread> threads;
ConcurrentSet<tx::transaction_id_t> tx_ids;
ConcurrentSet<transaction_id_t> tx_ids;
for (int i = 0; i < 10; ++i) {
threads.emplace_back([&engine, accessor = tx_ids.access() ]() mutable {
for (int j = 0; j < 100; ++j) {
@ -65,10 +68,29 @@ TEST(Engine, ConcurrentBegin) {
}
TEST(Engine, RunningTransaction) {
tx::SingleNodeEngine engine;
SingleNodeEngine engine;
auto t0 = engine.Begin();
auto t1 = engine.Begin();
EXPECT_EQ(t0, engine.RunningTransaction(t0->id_));
EXPECT_NE(t1, engine.RunningTransaction(t0->id_));
EXPECT_EQ(t1, engine.RunningTransaction(t1->id_));
}
TEST(Engine, TxEndListener) {
SingleNodeEngine engine;
int count = 0;
{
TxEndListener listener{engine, [&count](auto) { count++; }};
EXPECT_EQ(count, 0);
auto t1 = engine.Begin();
EXPECT_EQ(count, 0);
auto t2 = engine.Begin();
engine.Abort(*t1);
EXPECT_EQ(count, 1);
engine.Commit(*t2);
EXPECT_EQ(count, 2);
}
auto t3 = engine.Begin();
engine.Commit(*t3);
EXPECT_EQ(count, 2);
}