Move distributed transaction engine logic

Summary:
This change introduces a pure virtual initial implementation of the transaction
engine which is then implemented in two versions: single node and distributed.
The interface classes now have the following hierarchy:

```
    Engine (pure interface)
         |
    +----+---------- EngineDistributed (common logic)
    |                         |
EngineSingleNode      +-------+--------+
                      |                |
                 EngineMaster     EngineWorker
```

In addition to this layout the `EngineMaster` uses `EngineSingleNode` as its
underlying storage engine and only changes the necessary functions to make
them work with the `EngineWorker`.

After this change I recommend that you delete the following leftover files:
```
rm src/distributed/transactional_cache_cleaner_rpc_messages.*
rm src/transactions/common.*
rm src/transactions/engine_rpc_messages.*
```

Reviewers: teon.banek, msantl, buda

Reviewed By: teon.banek

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1589
This commit is contained in:
Matej Ferencevic 2018-09-04 20:30:58 +02:00
parent e5a7f51740
commit 96ece11cdd
44 changed files with 603 additions and 498 deletions

6
.gitignore vendored
View File

@ -62,8 +62,6 @@ src/distributed/storage_gc_rpc_messages.capnp
src/distributed/storage_gc_rpc_messages.hpp
src/distributed/token_sharing_rpc_messages.capnp
src/distributed/token_sharing_rpc_messages.hpp
src/distributed/transactional_cache_cleaner_rpc_messages.capnp
src/distributed/transactional_cache_cleaner_rpc_messages.hpp
src/distributed/updates_rpc_messages.capnp
src/distributed/updates_rpc_messages.hpp
src/query/plan/distributed_ops.capnp
@ -73,5 +71,5 @@ src/stats/stats_rpc_messages.capnp
src/stats/stats_rpc_messages.hpp
src/storage/concurrent_id_mapper_rpc_messages.capnp
src/storage/concurrent_id_mapper_rpc_messages.hpp
src/transactions/engine_rpc_messages.capnp
src/transactions/engine_rpc_messages.hpp
src/transactions/distributed/engine_rpc_messages.capnp
src/transactions/distributed/engine_rpc_messages.hpp

View File

@ -73,10 +73,9 @@ set(memgraph_src_files
storage/property_value_store.cpp
storage/record_accessor.cpp
storage/vertex_accessor.cpp
transactions/engine_master.cpp
transactions/engine_single_node.cpp
transactions/engine_worker.cpp
transactions/snapshot.cpp
transactions/distributed/engine_master.cpp
transactions/distributed/engine_worker.cpp
transactions/single_node/engine_single_node.cpp
memgraph_init.cpp
)
# -----------------------------------------------------------------------------
@ -101,14 +100,13 @@ add_lcp(distributed/index_rpc_messages.lcp CAPNP_SCHEMA @0xa8aab46862945bd6)
add_capnp(distributed/index_rpc_messages.capnp)
add_lcp(distributed/plan_rpc_messages.lcp CAPNP_SCHEMA @0xfcbc48dc9f106d28)
add_capnp(distributed/plan_rpc_messages.capnp)
add_lcp(distributed/pull_produce_rpc_messages.lcp CAPNP_SCHEMA @0xa78a9254a73685bd)
add_lcp(distributed/pull_produce_rpc_messages.lcp CAPNP_SCHEMA @0xa78a9254a73685bd
DEPENDS transactions/distributed/serialization.lcp)
add_capnp(distributed/pull_produce_rpc_messages.capnp)
add_lcp(distributed/storage_gc_rpc_messages.lcp CAPNP_SCHEMA @0xd705663dfe36cf81)
add_capnp(distributed/storage_gc_rpc_messages.capnp)
add_lcp(distributed/token_sharing_rpc_messages.lcp CAPNP_SCHEMA @0x8f295db54ec4caec)
add_capnp(distributed/token_sharing_rpc_messages.capnp)
add_lcp(distributed/transactional_cache_cleaner_rpc_messages.lcp CAPNP_SCHEMA @0xe2be6183a1ff9e11)
add_capnp(distributed/transactional_cache_cleaner_rpc_messages.capnp)
add_lcp(distributed/updates_rpc_messages.lcp CAPNP_SCHEMA @0x82d5f38d73c7b53a)
add_capnp(distributed/updates_rpc_messages.capnp)
@ -121,8 +119,9 @@ add_capnp(query/plan/distributed_ops.capnp)
add_lcp(storage/concurrent_id_mapper_rpc_messages.lcp CAPNP_SCHEMA @0xa6068dae93d225dd)
add_capnp(storage/concurrent_id_mapper_rpc_messages.capnp)
add_lcp(transactions/engine_rpc_messages.lcp CAPNP_SCHEMA @0xde02b7c49180cad5)
add_capnp(transactions/engine_rpc_messages.capnp)
add_lcp(transactions/distributed/engine_rpc_messages.lcp CAPNP_SCHEMA @0xde02b7c49180cad5
DEPENDS transactions/distributed/serialization.lcp)
add_capnp(transactions/distributed/engine_rpc_messages.capnp)
add_custom_target(generate_lcp DEPENDS ${generated_lcp_files})
@ -135,7 +134,6 @@ add_capnp(query/common.capnp)
add_capnp(query/frontend/ast/ast.capnp)
add_capnp(query/frontend/semantic/symbol.capnp)
add_capnp(storage/serialization.capnp)
add_capnp(transactions/common.capnp)
add_custom_target(generate_capnp DEPENDS generate_lcp ${generated_capnp_files})

View File

@ -15,10 +15,11 @@
#include "distributed/durability_rpc_master.hpp"
#include "distributed/durability_rpc_worker.hpp"
#include "distributed/index_rpc_server.hpp"
#include "distributed/plan_consumer.hpp"
#include "distributed/plan_dispatcher.hpp"
#include "distributed/produce_rpc_server.hpp"
#include "distributed/pull_rpc_clients.hpp"
#include "distributed/token_sharing_rpc_server.hpp"
#include "distributed/transactional_cache_cleaner.hpp"
#include "distributed/updates_rpc_clients.hpp"
#include "distributed/updates_rpc_server.hpp"
#include "durability/snapshooter.hpp"
@ -27,7 +28,8 @@
#include "storage/concurrent_id_mapper.hpp"
#include "storage/concurrent_id_mapper_master.hpp"
#include "storage/concurrent_id_mapper_worker.hpp"
#include "transactions/engine_master.hpp"
#include "transactions/distributed/engine_master.hpp"
#include "transactions/distributed/engine_worker.hpp"
#include "utils/file.hpp"
using namespace std::literals::chrono_literals;
@ -472,17 +474,6 @@ class DistributedRecoveryTransactions
public:
explicit DistributedRecoveryTransactions(DistributedGraphDb *db) : db_(db) {}
void Begin(const tx::TransactionId &tx_id) override {
CHECK(accessors_.find(tx_id) == accessors_.end())
<< "Double transaction start";
accessors_.emplace(tx_id, db_->Access());
}
void Abort(const tx::TransactionId &tx_id) final {
GetAccessor(tx_id)->Abort();
accessors_.erase(accessors_.find(tx_id));
}
void Commit(const tx::TransactionId &tx_id) final {
GetAccessor(tx_id)->Commit();
accessors_.erase(accessors_.find(tx_id));
@ -506,6 +497,17 @@ class MasterRecoveryTransactions final
explicit MasterRecoveryTransactions(Master *db)
: DistributedRecoveryTransactions(db) {}
void Begin(const tx::TransactionId &tx_id) final {
CHECK(accessors_.find(tx_id) == accessors_.end())
<< "Double transaction start";
accessors_.emplace(tx_id, db_->Access());
}
void Abort(const tx::TransactionId &tx_id) final {
GetAccessor(tx_id)->Abort();
accessors_.erase(accessors_.find(tx_id));
}
protected:
virtual GraphDbAccessor *GetAccessor(
const tx::TransactionId &tx_id) override {
@ -526,6 +528,10 @@ class WorkerRecoveryTransactions final
LOG(FATAL) << "Unexpected transaction begin on worker recovery.";
}
void Abort(const tx::TransactionId &tx_id) override {
LOG(FATAL) << "Unexpected transaction abort on worker recovery.";
}
protected:
GraphDbAccessor *GetAccessor(const tx::TransactionId &tx_id) override {
auto found = accessors_.find(tx_id);
@ -586,7 +592,7 @@ class Master {
database::Master *self_{nullptr};
communication::rpc::Server server_{
config_.master_endpoint, static_cast<size_t>(config_.rpc_num_workers)};
tx::MasterEngine tx_engine_{server_, rpc_worker_clients_, &wal_};
tx::EngineMaster tx_engine_{server_, rpc_worker_clients_, &wal_};
distributed::MasterCoordination coordination_{server_.endpoint()};
std::unique_ptr<StorageGcMaster> storage_gc_ =
std::make_unique<StorageGcMaster>(
@ -610,8 +616,6 @@ class Master {
distributed::UpdatesRpcServer updates_server_{self_, &server_};
distributed::UpdatesRpcClients updates_clients_{rpc_worker_clients_};
distributed::DataManager data_manager_{*self_, data_clients_};
distributed::TransactionalCacheCleaner cache_cleaner_{
tx_engine_, updates_server_, data_manager_};
distributed::ClusterDiscoveryMaster cluster_discovery_{
server_, coordination_, rpc_worker_clients_,
config_.durability_directory};
@ -626,6 +630,14 @@ class Master {
Master::Master(Config config)
: impl_(std::make_unique<impl::Master>(config, this)) {
// Register all transaction based caches for cleanup.
impl_->tx_engine_.RegisterForTransactionalCacheCleanup(
impl_->updates_server_);
impl_->tx_engine_.RegisterForTransactionalCacheCleanup(impl_->data_manager_);
// Start transactional cache cleanup.
impl_->tx_engine_.StartTransactionalCacheCleanup();
if (impl_->config_.durability_enabled)
utils::CheckDir(impl_->config_.durability_directory);
@ -722,6 +734,10 @@ Master::~Master() {
auto dba = Access();
MakeSnapshot(*dba);
}
// Transactional cache cleanup must be stopped before all of the objects
// that were registered for cleanup are destructed.
impl_->tx_engine_.StopTransactionalCacheCleanup();
}
std::unique_ptr<GraphDbAccessor> Master::Access() {
@ -900,7 +916,7 @@ class Worker {
distributed::WorkerCoordination coordination_{server_,
config_.master_endpoint};
distributed::RpcWorkerClients rpc_worker_clients_{coordination_};
tx::WorkerEngine tx_engine_{rpc_worker_clients_.GetClientPool(0)};
tx::EngineWorker tx_engine_{server_, rpc_worker_clients_.GetClientPool(0), &wal_};
std::unique_ptr<StorageGcWorker> storage_gc_ =
std::make_unique<StorageGcWorker>(
*storage_, tx_engine_, config_.gc_cycle_sec,
@ -923,9 +939,6 @@ class Worker {
distributed::UpdatesRpcServer updates_server_{self_, &server_};
distributed::UpdatesRpcClients updates_clients_{rpc_worker_clients_};
distributed::DataManager data_manager_{*self_, data_clients_};
distributed::WorkerTransactionalCacheCleaner cache_cleaner_{
tx_engine_, &wal_, server_,
produce_server_, updates_server_, data_manager_};
distributed::DurabilityRpcWorker durability_rpc_{self_, &server_};
distributed::ClusterDiscoveryWorker cluster_discovery_{
server_, coordination_, rpc_worker_clients_.GetClientPool(0)};
@ -940,6 +953,16 @@ class Worker {
Worker::Worker(Config config)
: impl_(std::make_unique<impl::Worker>(config, this)) {
// Register all transaction based caches for cleanup.
impl_->tx_engine_.RegisterForTransactionalCacheCleanup(
impl_->updates_server_);
impl_->tx_engine_.RegisterForTransactionalCacheCleanup(impl_->data_manager_);
impl_->tx_engine_.RegisterForTransactionalCacheCleanup(
impl_->produce_server_);
// Start transactional cache cleanup.
impl_->tx_engine_.StartTransactionalCacheCleanup();
if (impl_->config_.durability_enabled)
utils::CheckDir(impl_->config_.durability_directory);
@ -996,6 +1019,9 @@ Worker::~Worker() {
is_accepting_transactions_ = false;
impl_->tx_engine_.LocalForEachActiveTransaction(
[](auto &t) { t.set_should_abort(); });
// Transactional cache cleanup must be stopped before all of the objects
// that were registered for cleanup are destructed.
impl_->tx_engine_.StopTransactionalCacheCleanup();
}
std::unique_ptr<GraphDbAccessor> Worker::Access() {

View File

@ -11,7 +11,7 @@
#include "durability/recovery.hpp"
#include "durability/snapshooter.hpp"
#include "storage/concurrent_id_mapper_single_node.hpp"
#include "transactions/engine_single_node.hpp"
#include "transactions/single_node/engine_single_node.hpp"
#include "utils/file.hpp"
namespace database {
@ -197,7 +197,7 @@ class SingleNode {
config_.worker_id, config_.durability_directory,
config_.durability_enabled, config_.synchronous_commit};
tx::SingleNodeEngine tx_engine_{&wal_};
tx::EngineSingleNode tx_engine_{&wal_};
std::unique_ptr<StorageGcSingleNode> storage_gc_ =
std::make_unique<StorageGcSingleNode>(*storage_, tx_engine_,
config_.gc_cycle_sec);

View File

@ -3,9 +3,7 @@
#include "communication/rpc/client_pool.hpp"
#include "database/storage_gc.hpp"
#include "distributed/storage_gc_rpc_messages.hpp"
#include "transactions/engine_worker.hpp"
#include "transactions/transaction.hpp"
#include "transactions/distributed/engine_worker.hpp"
namespace database {
class StorageGcWorker : public StorageGc {
@ -30,7 +28,7 @@ class StorageGcWorker : public StorageGc {
// try to acquire a lock which hasn't been released (if the transaction
// cache cleaner was not scheduled at this time), and take a look into the
// commit log which no longer contains that transaction id.
dynamic_cast<tx::WorkerEngine &>(tx_engine_)
dynamic_cast<tx::EngineWorker &>(tx_engine_)
.ClearTransactionalCache(oldest_active);
auto safe_to_delete = GetClogSafeTransaction(oldest_active);
if (safe_to_delete) {

View File

@ -5,7 +5,7 @@
#include "distributed/pull_produce_rpc_messages.hpp"
#include "query/common.hpp"
#include "query/exceptions.hpp"
#include "transactions/engine_worker.hpp"
#include "transactions/distributed/engine_worker.hpp"
namespace distributed {
@ -98,7 +98,7 @@ ProduceRpcServer::OngoingProduce::PullOneFromCursor() {
}
ProduceRpcServer::ProduceRpcServer(database::Worker *db,
tx::WorkerEngine *tx_engine,
tx::EngineWorker *tx_engine,
communication::rpc::Server &server,
const PlanConsumer &plan_consumer,
DataManager *data_manager)
@ -136,11 +136,11 @@ ProduceRpcServer::ProduceRpcServer(database::Worker *db,
});
}
void ProduceRpcServer::FinishAndClearOngoingProducePlans(
tx::TransactionId tx_id) {
void ProduceRpcServer::ClearTransactionalCache(
tx::TransactionId oldest_active) {
std::lock_guard<std::mutex> guard{ongoing_produces_lock_};
for (auto it = ongoing_produces_.begin(); it != ongoing_produces_.end();) {
if (std::get<0>(it->first) == tx_id) {
if (std::get<0>(it->first) < oldest_active) {
it = ongoing_produces_.erase(it);
} else {
++it;

View File

@ -25,7 +25,7 @@ class Worker;
}
namespace tx {
class WorkerEngine;
class EngineWorker;
}
namespace distributed {
@ -73,14 +73,15 @@ class ProduceRpcServer {
};
public:
ProduceRpcServer(database::Worker *db, tx::WorkerEngine *tx_engine,
ProduceRpcServer(database::Worker *db, tx::EngineWorker *tx_engine,
communication::rpc::Server &server,
const PlanConsumer &plan_consumer,
DataManager *data_manager);
/// Finish and clear ongoing produces for all plans that are tied to a
/// transaction with tx_id.
void FinishAndClearOngoingProducePlans(tx::TransactionId tx_id);
/// Clears all ongoing produces that are older than the oldest active
/// transaction. This function should be registered in the transaction engine
/// for transactional cache cleanup.
void ClearTransactionalCache(tx::TransactionId oldest_active);
private:
std::mutex ongoing_produces_lock_;
@ -93,7 +94,7 @@ class ProduceRpcServer {
database::Worker *db_;
communication::rpc::Server &produce_rpc_server_;
const distributed::PlanConsumer &plan_consumer_;
tx::WorkerEngine *tx_engine_;
tx::EngineWorker *tx_engine_;
/// Gets an ongoing produce for the given pull request. Creates a new one if
/// there is none currently existing.

View File

@ -22,17 +22,17 @@ cpp<#
#include "distributed/data_manager.hpp"
cpp<#)
(load "transactions/distributed/serialization.lcp")
(lcp:namespace distributed)
(lcp:capnp-namespace "distributed")
(lcp:capnp-import 'dis "/distributed/serialization.capnp")
(lcp:capnp-import 'sem "/query/frontend/semantic/symbol.capnp")
(lcp:capnp-import 'tx "/transactions/common.capnp")
(lcp:capnp-import 'utils "/utils/serialization.capnp")
(lcp:capnp-type-conversion "tx::CommandId" "UInt32")
(lcp:capnp-type-conversion "tx::Snapshot" "Tx.Snapshot")
(lcp:capnp-type-conversion "tx::TransactionId" "UInt64")
#>cpp
@ -321,7 +321,11 @@ cpp<#)
(lcp:define-rpc pull
(:request
((tx-id "tx::TransactionId")
(tx-snapshot "tx::Snapshot")
(tx-snapshot "tx::Snapshot"
:capnp-type "List(UInt64)"
:capnp-init nil
:capnp-save #'save-snapshot
:capnp-load #'load-snapshot)
(plan-id :int64_t)
(command-id "tx::CommandId")
(params "Parameters"

View File

@ -8,7 +8,6 @@
#include "distributed/coordination.hpp"
#include "distributed/index_rpc_messages.hpp"
#include "distributed/token_sharing_rpc_messages.hpp"
#include "distributed/transactional_cache_cleaner_rpc_messages.hpp"
#include "storage/types.hpp"
#include "transactions/transaction.hpp"
#include "utils/future.hpp"
@ -132,36 +131,4 @@ class TokenSharingRpcClients {
RpcWorkerClients *clients_;
};
/** Join ongoing produces on all workers.
*
* Sends a RPC request to all workers when a transaction is ending, notifying
* them to end all ongoing produces tied to that transaction.
*/
class OngoingProduceJoinerRpcClients {
public:
OngoingProduceJoinerRpcClients(RpcWorkerClients &clients)
: clients_(clients) {}
void JoinOngoingProduces(tx::TransactionId tx_id, bool committed) {
auto futures = clients_.ExecuteOnWorkers<void>(
0, [tx_id, committed](int worker_id,
communication::rpc::ClientPool &client_pool) {
auto result = client_pool.Call<distributed::WaitOnTransactionEndRpc>(
tx_id, committed);
CHECK(result)
<< "[WaitOnTransactionEndRpc] failed to notify that transaction "
<< tx_id << " ended";
});
// We need to wait for all workers to destroy pending futures to avoid
// using already destroyed (released) transaction objects.
for (auto &future : futures) {
future.wait();
}
}
private:
RpcWorkerClients &clients_;
};
} // namespace distributed

View File

@ -1,98 +0,0 @@
#pragma once
#include <functional>
#include <vector>
#include "communication/rpc/server.hpp"
#include "distributed/produce_rpc_server.hpp"
#include "distributed/transactional_cache_cleaner_rpc_messages.hpp"
#include "transactions/engine.hpp"
#include "transactions/engine_worker.hpp"
#include "utils/scheduler.hpp"
namespace distributed {
/// Periodically calls `ClearTransactionalCache(oldest_transaction)` on all
/// registered objects.
class TransactionalCacheCleaner {
/// The wait time between two releases of local transaction objects that have
/// expired on the master.
static constexpr std::chrono::seconds kCacheReleasePeriod{1};
public:
template <typename... T>
TransactionalCacheCleaner(tx::Engine &tx_engine, T &... caches)
: tx_engine_(tx_engine) {
Register(caches...);
cache_clearing_scheduler_.Run(
"DistrTxCacheGc", kCacheReleasePeriod,
[this]() { this->Clear(tx_engine_.GlobalGcSnapshot().back()); });
}
protected:
/// Registers the given object for transactional cleaning. The object will
/// periodically get it's `ClearCache(tx::TransactionId)` method called
/// with the oldest active transaction id. Note that the ONLY guarantee for
/// the call param is that there are no transactions alive that have an id
/// lower than it.
template <typename TCache>
void Register(TCache &cache) {
functions_.emplace_back([&cache](tx::TransactionId oldest_active) {
cache.ClearTransactionalCache(oldest_active);
});
}
private:
template <typename TCache, typename... T>
void Register(TCache &cache, T &... caches) {
Register(cache);
Register(caches...);
}
void Clear(tx::TransactionId oldest_active) {
for (auto &f : functions_) f(oldest_active);
}
tx::Engine &tx_engine_;
std::vector<std::function<void(tx::TransactionId &oldest_active)>> functions_;
utils::Scheduler cache_clearing_scheduler_;
};
/// Registers a RPC server that listens for `WaitOnTransactionEnd` requests
/// that require all ongoing produces to finish. It also periodically calls
/// `ClearTransactionalCache` on all registered objects.
class WorkerTransactionalCacheCleaner : public TransactionalCacheCleaner {
public:
template <class... T>
WorkerTransactionalCacheCleaner(tx::WorkerEngine &tx_engine,
durability::WriteAheadLog *wal,
communication::rpc::Server &server,
ProduceRpcServer &produce_server,
T &... caches)
: TransactionalCacheCleaner(tx_engine, caches...),
wal_(wal),
rpc_server_(server),
produce_server_(produce_server) {
Register(tx_engine);
rpc_server_.Register<WaitOnTransactionEndRpc>(
[this](const auto &req_reader, auto *res_builder) {
auto tx_id = req_reader.getTxId();
auto committed = req_reader.getCommitted();
produce_server_.FinishAndClearOngoingProducePlans(tx_id);
if (wal_) {
if (committed) {
wal_->Emplace(database::StateDelta::TxCommit(tx_id));
} else {
wal_->Emplace(database::StateDelta::TxAbort(tx_id));
}
}
});
}
private:
durability::WriteAheadLog *wal_;
communication::rpc::Server &rpc_server_;
ProduceRpcServer &produce_server_;
};
} // namespace distributed

View File

@ -1,18 +0,0 @@
#>cpp
#pragma once
#include "distributed/transactional_cache_cleaner_rpc_messages.capnp.h"
#include "communication/rpc/messages.hpp"
#include "transactions/type.hpp"
cpp<#
(lcp:namespace distributed)
(lcp:capnp-namespace "distributed")
(lcp:define-rpc wait-on-transaction-end
(:request ((tx-id "tx::TransactionId" :capnp-type "UInt64")
(committed :bool)))
(:response ()))
(lcp:pop-namespace)

View File

@ -52,11 +52,13 @@ void WriteAheadLog::WalFile::Init() {
current_wal_file_ = std::experimental::filesystem::path();
} else {
current_wal_file_ = WalFilenameForTransactionId(wal_dir_, worker_id_);
// TODO: Fix error handling, the encoder_ returns `true` or `false`.
try {
writer_.Open(current_wal_file_);
encoder_.WriteRAW(durability::kWalMagic.data(),
durability::kWalMagic.size());
encoder_.WriteInt(durability::kVersion);
writer_.Flush();
} catch (std::ios_base::failure &) {
LOG(ERROR) << "Failed to open write-ahead log file: "
<< current_wal_file_;
@ -81,6 +83,7 @@ void WriteAheadLog::WalFile::Flush(RingBuffer<database::StateDelta> &buffer) {
if (!delta) break;
latest_tx_ = std::max(latest_tx_, delta->transaction_id);
delta->Encode(writer_, encoder_);
writer_.Flush();
if (++current_wal_file_delta_count_ >= FLAGS_wal_rotate_deltas_count)
RotateFile();
}
@ -97,6 +100,7 @@ void WriteAheadLog::WalFile::Flush(RingBuffer<database::StateDelta> &buffer) {
}
void WriteAheadLog::WalFile::RotateFile() {
writer_.Flush();
writer_.Close();
std::experimental::filesystem::rename(
current_wal_file_,

View File

@ -1,16 +1,16 @@
#pragma once
#include "data_structures/bitset/dynamic_bitset.hpp"
#include "transactions/common.capnp.h"
#include "transactions/type.hpp"
namespace tx {
// This class is lock free. There is no need to acquire any lock when accessing
// this class and this class doesn't acquire any lock on method calls.
class CommitLog {
class CommitLog final {
public:
static constexpr int kBitsetBlockSize = 32768;
CommitLog() = default;
CommitLog(const CommitLog &) = delete;
CommitLog(CommitLog &&) = delete;
@ -37,7 +37,7 @@ class CommitLog {
// lower than `id`.
void garbage_collect_older(TransactionId id) { log.delete_prefix(2 * id); }
class Info {
class Info final {
public:
Info() {} // Needed for serialization.
enum Status {
@ -46,24 +46,27 @@ class CommitLog {
ABORTED = 2, // 10
};
explicit Info(uint8_t flags) : flags_(flags) {}
explicit Info(uint8_t flags) {
if (flags & ABORTED) {
flags_ = ABORTED;
} else if (flags & COMMITTED) {
flags_ = COMMITTED;
} else {
flags_ = ACTIVE;
}
}
bool is_active() const { return flags_ == ACTIVE; }
bool is_committed() const { return flags_ & COMMITTED; }
bool is_committed() const {
if (flags_ & ABORTED) return false;
return flags_ & COMMITTED;
}
bool is_aborted() const { return flags_ & ABORTED; }
operator uint8_t() const { return flags_; }
void Save(capnp::CommitLogInfo::Builder *builder) const {
builder->setFlags(flags_);
}
void Load(const capnp::CommitLogInfo::Reader &reader) {
flags_ = reader.getFlags();
}
private:
uint8_t flags_{0};
};

View File

@ -1,12 +0,0 @@
@0xcdbe169866471033;
using Cxx = import "/capnp/c++.capnp";
$Cxx.namespace("tx::capnp");
struct Snapshot {
transactionIds @0 :List(UInt64);
}
struct CommitLogInfo {
flags @0 :UInt8;
}

View File

@ -0,0 +1,62 @@
/// @file
#pragma once
#include "communication/rpc/server.hpp"
#include "distributed/rpc_worker_clients.hpp"
#include "transactions/engine.hpp"
namespace tx {
/// Distributed base transaction engine. Has only common functionality shared
/// between the master and worker engines.
class EngineDistributed : public Engine {
/// The wait time between two releases of local transaction objects that have
/// expired on the master.
static constexpr std::chrono::seconds kCacheReleasePeriod{1};
public:
/// Starts transactional cache cleanup. Transactional cache cleanup should be
/// started *after* all of the registered objects are constructed to avoid
/// segmentation faults.
void StartTransactionalCacheCleanup() {
cache_clearing_scheduler_.Run("TX cache GC", kCacheReleasePeriod, [this]() {
std::lock_guard<std::mutex> guard(lock_);
// TODO (mferencevic): this has to be aware that `GlobalGcSnapshot` can
// throw!
auto oldest_active = GlobalGcSnapshot().back();
// Call all registered functions for cleanup.
for (auto &f : functions_) f(oldest_active);
// Clean our cache.
ClearTransactionalCache(oldest_active);
});
}
/// Registers the given object for transactional cleaning. The object will
/// periodically get it's `ClearCache(tx::TransactionId)` method called
/// with the oldest active transaction id. Note that the ONLY guarantee for
/// the call param is that there are no transactions alive that have an id
/// lower than it.
template <typename TCache>
void RegisterForTransactionalCacheCleanup(TCache &cache) {
std::lock_guard<std::mutex> guard(lock_);
functions_.emplace_back([&cache](tx::TransactionId oldest_active) {
cache.ClearTransactionalCache(oldest_active);
});
}
/// Stops transactional cache cleanup. Transactional cache cleanup should be
/// stopped *before* all of the registered objects are destructed to avoid
/// segmentation faults.
void StopTransactionalCacheCleanup() { cache_clearing_scheduler_.Stop(); }
/// Clears the cache of local transactions that have expired. This function
/// has to be implemented by each class that inherits this class.
virtual void ClearTransactionalCache(TransactionId oldest_active) = 0;
private:
std::mutex lock_;
std::vector<std::function<void(tx::TransactionId)>> functions_;
utils::Scheduler cache_clearing_scheduler_;
};
} // namespace tx

View File

@ -4,17 +4,17 @@
#include "glog/logging.h"
#include "database/state_delta.hpp"
#include "transactions/engine_master.hpp"
#include "transactions/engine_rpc_messages.hpp"
#include "transactions/distributed/engine_master.hpp"
#include "transactions/distributed/engine_rpc_messages.hpp"
namespace tx {
MasterEngine::MasterEngine(communication::rpc::Server &server,
EngineMaster::EngineMaster(communication::rpc::Server &server,
distributed::RpcWorkerClients &rpc_worker_clients,
durability::WriteAheadLog *wal)
: SingleNodeEngine(wal),
: engine_single_node_(wal),
rpc_server_(server),
ongoing_produce_joiner_(rpc_worker_clients) {
rpc_worker_clients_(rpc_worker_clients) {
rpc_server_.Register<BeginRpc>(
[this](const auto &req_reader, auto *res_builder) {
auto tx = this->Begin();
@ -85,14 +85,78 @@ MasterEngine::MasterEngine(communication::rpc::Server &server,
});
}
void MasterEngine::Commit(const Transaction &t) {
ongoing_produce_joiner_.JoinOngoingProduces(t.id_, true);
SingleNodeEngine::Commit(t);
Transaction *EngineMaster::Begin() { return engine_single_node_.Begin(); }
CommandId EngineMaster::Advance(TransactionId id) {
return engine_single_node_.Advance(id);
}
void MasterEngine::Abort(const Transaction &t) {
ongoing_produce_joiner_.JoinOngoingProduces(t.id_, false);
SingleNodeEngine::Abort(t);
CommandId EngineMaster::UpdateCommand(TransactionId id) {
return engine_single_node_.UpdateCommand(id);
}
void EngineMaster::Commit(const Transaction &t) {
auto tx_id = t.id_;
auto futures = rpc_worker_clients_.ExecuteOnWorkers<void>(
0, [tx_id](int worker_id, communication::rpc::ClientPool &client_pool) {
auto result = client_pool.Call<NotifyCommittedRpc>(tx_id);
CHECK(result)
<< "[NotifyCommittedRpc] failed to notify that transaction "
<< tx_id << " committed";
});
// We need to wait for all workers to destroy pending futures to avoid
// using already destroyed (released) transaction objects.
for (auto &future : futures) {
future.wait();
}
engine_single_node_.Commit(t);
}
void EngineMaster::Abort(const Transaction &t) { engine_single_node_.Abort(t); }
CommitLog::Info EngineMaster::Info(TransactionId tx) const {
return engine_single_node_.Info(tx);
}
Snapshot EngineMaster::GlobalGcSnapshot() {
return engine_single_node_.GlobalGcSnapshot();
}
Snapshot EngineMaster::GlobalActiveTransactions() {
return engine_single_node_.GlobalActiveTransactions();
}
TransactionId EngineMaster::LocalLast() const {
return engine_single_node_.LocalLast();
}
TransactionId EngineMaster::GlobalLast() const {
return engine_single_node_.GlobalLast();
}
TransactionId EngineMaster::LocalOldestActive() const {
return engine_single_node_.LocalOldestActive();
}
void EngineMaster::GarbageCollectCommitLog(TransactionId tx_id) {
return engine_single_node_.GarbageCollectCommitLog(tx_id);
}
void EngineMaster::LocalForEachActiveTransaction(
std::function<void(Transaction &)> f) {
return engine_single_node_.LocalForEachActiveTransaction(f);
}
Transaction *EngineMaster::RunningTransaction(TransactionId tx_id) {
return engine_single_node_.RunningTransaction(tx_id);
}
void EngineMaster::EnsureNextIdGreater(TransactionId tx_id) {
return engine_single_node_.EnsureNextIdGreater(tx_id);
}
void EngineMaster::ClearTransactionalCache(TransactionId oldest_active) {}
} // namespace tx

View File

@ -0,0 +1,53 @@
/// @file
#pragma once
#include "communication/rpc/server.hpp"
#include "distributed/rpc_worker_clients.hpp"
#include "transactions/distributed/engine_distributed.hpp"
#include "transactions/single_node/engine_single_node.hpp"
namespace tx {
/// Distributed master transaction engine. Has complete engine functionality and
/// exposes an RPC server to be used by distributed Workers.
class EngineMaster final : public EngineDistributed {
public:
/// @param server - Required. Used for rpc::Server construction.
/// @param rpc_worker_clients - Required. Used for
/// OngoingProduceJoinerRpcClients construction.
/// @param wal - Optional. If present, the Engine will write tx
/// Begin/Commit/Abort atomically (while under lock).
EngineMaster(communication::rpc::Server &server,
distributed::RpcWorkerClients &rpc_worker_clients,
durability::WriteAheadLog *wal = nullptr);
EngineMaster(const EngineMaster &) = delete;
EngineMaster(EngineMaster &&) = delete;
EngineMaster &operator=(const EngineMaster &) = delete;
EngineMaster &operator=(EngineMaster &&) = delete;
Transaction *Begin() override;
CommandId Advance(TransactionId id) override;
CommandId UpdateCommand(TransactionId id) override;
void Commit(const Transaction &t) override;
void Abort(const Transaction &t) override;
CommitLog::Info Info(TransactionId tx) const override;
Snapshot GlobalGcSnapshot() override;
Snapshot GlobalActiveTransactions() override;
TransactionId GlobalLast() const override;
TransactionId LocalLast() const override;
TransactionId LocalOldestActive() const override;
void LocalForEachActiveTransaction(
std::function<void(Transaction &)> f) override;
Transaction *RunningTransaction(TransactionId tx_id) override;
void EnsureNextIdGreater(TransactionId tx_id) override;
void GarbageCollectCommitLog(TransactionId tx_id) override;
void ClearTransactionalCache(TransactionId oldest_active) override;
private:
EngineSingleNode engine_single_node_;
communication::rpc::Server &rpc_server_;
distributed::RpcWorkerClients &rpc_worker_clients_;
};
} // namespace tx

View File

@ -3,23 +3,27 @@
#include "communication/rpc/messages.hpp"
#include "transactions/commit_log.hpp"
#include "transactions/engine_rpc_messages.capnp.h"
#include "transactions/distributed/engine_rpc_messages.capnp.h"
#include "transactions/snapshot.hpp"
#include "transactions/type.hpp"
cpp<#
(load "transactions/distributed/serialization.lcp")
(lcp:namespace tx)
(lcp:capnp-namespace "tx")
(lcp:capnp-import 'tx "/transactions/common.capnp")
(lcp:capnp-type-conversion "TransactionId" "UInt64")
(lcp:capnp-type-conversion "CommandId" "UInt32")
(lcp:capnp-type-conversion "Snapshot" "Tx.Snapshot")
(lcp:define-struct tx-and-snapshot ()
((tx-id "TransactionId")
(snapshot "Snapshot"))
(snapshot "Snapshot"
:capnp-type "List(UInt64)"
:capnp-init nil
:capnp-save #'save-snapshot
:capnp-load #'load-snapshot))
(:serialize :capnp))
(lcp:define-rpc begin
@ -40,7 +44,11 @@ cpp<#
(lcp:define-rpc snapshot
(:request ((member "TransactionId")))
(:response ((member "Snapshot"))))
(:response ((member "Snapshot"
:capnp-type "List(UInt64)"
:capnp-init nil
:capnp-save #'save-snapshot
:capnp-load #'load-snapshot))))
(lcp:define-rpc command
(:request ((member "TransactionId")))
@ -48,15 +56,27 @@ cpp<#
(lcp:define-rpc gc-snapshot
(:request ())
(:response ((member "Snapshot"))))
(:response ((member "Snapshot"
:capnp-type "List(UInt64)"
:capnp-init nil
:capnp-save #'save-snapshot
:capnp-load #'load-snapshot))))
(lcp:define-rpc clog-info
(:request ((member "TransactionId")))
(:response ((member "CommitLog::Info" :capnp-type "Tx.CommitLogInfo"))))
(:response ((member "CommitLog::Info"
:capnp-type "UInt8"
:capnp-init nil
:capnp-save #'save-commitlog-info
:capnp-load #'load-commitlog-info))))
(lcp:define-rpc active-transactions
(:request ())
(:response ((member "Snapshot"))))
(:response ((member "Snapshot"
:capnp-type "List(UInt64)"
:capnp-init nil
:capnp-save #'save-snapshot
:capnp-load #'load-snapshot))))
(lcp:define-rpc ensure-next-id-greater
(:request ((member "TransactionId")))
@ -66,4 +86,8 @@ cpp<#
(:request ())
(:response ((member "TransactionId"))))
(lcp:define-rpc notify-committed
(:request ((member "TransactionId")))
(:response ()))
(lcp:pop-namespace) ;; tx

View File

@ -2,45 +2,70 @@
#include "glog/logging.h"
#include "transactions/engine_rpc_messages.hpp"
#include "transactions/engine_worker.hpp"
#include "transactions/distributed/engine_rpc_messages.hpp"
#include "transactions/distributed/engine_worker.hpp"
#include "utils/atomic.hpp"
namespace tx {
WorkerEngine::WorkerEngine(communication::rpc::ClientPool &master_client_pool)
: master_client_pool_(master_client_pool) {}
EngineWorker::EngineWorker(communication::rpc::Server &server,
communication::rpc::ClientPool &master_client_pool,
durability::WriteAheadLog *wal)
: server_(server), master_client_pool_(master_client_pool), wal_(wal) {
// Register our `NotifyCommittedRpc` server. This RPC should only write the
// `TxCommit` operation into the WAL. It is only used to indicate that the
// transaction has succeeded on all workers and that it will be committed on
// the master. When recovering the cluster from WALs the `TxCommit` operation
// indicates that all operations for a given transaction were written to the
// WAL. If we wouldn't have the `TxCommit` after all operations for a given
// transaction in the WAL we couldn't be sure that all operations were saved
// (eg. some operations could have been written into one WAL file, some to
// another file). This way we ensure that if the `TxCommit` is written it was
// the last thing associated with that transaction and everything before was
// flushed to the disk.
// NOTE: We can't cache the commit state for this transaction because this
// RPC call could fail on other workers which will cause the transaction to be
// aborted. This mismatch in committed/aborted across workers is resolved by
// using the master as a single source of truth when doing recovery.
server_.Register<NotifyCommittedRpc>(
[this](const auto &req_reader, auto *res_builder) {
auto tid = req_reader.getMember();
if (wal_) {
wal_->Emplace(database::StateDelta::TxCommit(tid));
}
});
}
WorkerEngine::~WorkerEngine() {
EngineWorker::~EngineWorker() {
for (auto &kv : active_.access()) {
delete kv.second;
}
}
Transaction *WorkerEngine::Begin() {
Transaction *EngineWorker::Begin() {
auto res = master_client_pool_.Call<BeginRpc>();
CHECK(res) << "BeginRpc failed";
auto &data = res->member;
UpdateOldestActive(data.snapshot, data.tx_id);
Transaction *tx = new Transaction(data.tx_id, data.snapshot, *this);
Transaction *tx = CreateTransaction(data.tx_id, data.snapshot);
auto insertion = active_.access().insert(data.tx_id, tx);
CHECK(insertion.second) << "Failed to start creation from worker";
VLOG(11) << "[Tx] Starting worker transaction " << data.tx_id;
return tx;
}
CommandId WorkerEngine::Advance(TransactionId tx_id) {
CommandId EngineWorker::Advance(TransactionId tx_id) {
auto res = master_client_pool_.Call<AdvanceRpc>(tx_id);
CHECK(res) << "AdvanceRpc failed";
auto access = active_.access();
auto found = access.find(tx_id);
CHECK(found != access.end())
<< "Can't advance a transaction not in local cache";
found->second->cid_ = res->member;
SetCommand(found->second, res->member);
return res->member;
}
CommandId WorkerEngine::UpdateCommand(TransactionId tx_id) {
CommandId EngineWorker::UpdateCommand(TransactionId tx_id) {
auto res = master_client_pool_.Call<CommandRpc>(tx_id);
CHECK(res) << "CommandRpc failed";
auto cmd_id = res->member;
@ -55,26 +80,26 @@ CommandId WorkerEngine::UpdateCommand(TransactionId tx_id) {
auto access = active_.access();
auto found = access.find(tx_id);
if (found != access.end()) {
found->second->cid_ = cmd_id;
SetCommand(found->second, cmd_id);
}
return cmd_id;
}
void WorkerEngine::Commit(const Transaction &t) {
void EngineWorker::Commit(const Transaction &t) {
auto res = master_client_pool_.Call<CommitRpc>(t.id_);
CHECK(res) << "CommitRpc failed";
VLOG(11) << "[Tx] Commiting worker transaction " << t.id_;
ClearSingleTransaction(t.id_);
}
void WorkerEngine::Abort(const Transaction &t) {
void EngineWorker::Abort(const Transaction &t) {
auto res = master_client_pool_.Call<AbortRpc>(t.id_);
CHECK(res) << "AbortRpc failed";
VLOG(11) << "[Tx] Aborting worker transaction " << t.id_;
ClearSingleTransaction(t.id_);
}
CommitLog::Info WorkerEngine::Info(TransactionId tid) const {
CommitLog::Info EngineWorker::Info(TransactionId tid) const {
auto info = clog_.fetch_info(tid);
// If we don't know the transaction to be commited nor aborted, ask the
// master about it and update the local commit log.
@ -94,7 +119,7 @@ CommitLog::Info WorkerEngine::Info(TransactionId tid) const {
return info;
}
Snapshot WorkerEngine::GlobalGcSnapshot() {
Snapshot EngineWorker::GlobalGcSnapshot() {
auto res = master_client_pool_.Call<GcSnapshotRpc>();
CHECK(res) << "GcSnapshotRpc failed";
auto snapshot = std::move(res->member);
@ -102,7 +127,7 @@ Snapshot WorkerEngine::GlobalGcSnapshot() {
return snapshot;
}
Snapshot WorkerEngine::GlobalActiveTransactions() {
Snapshot EngineWorker::GlobalActiveTransactions() {
auto res = master_client_pool_.Call<ActiveTransactionsRpc>();
CHECK(res) << "ActiveTransactionsRpc failed";
auto snapshot = std::move(res->member);
@ -110,21 +135,22 @@ Snapshot WorkerEngine::GlobalActiveTransactions() {
return snapshot;
}
TransactionId WorkerEngine::LocalLast() const { return local_last_; }
TransactionId WorkerEngine::GlobalLast() const {
TransactionId EngineWorker::LocalLast() const { return local_last_; }
TransactionId EngineWorker::GlobalLast() const {
auto res = master_client_pool_.Call<GlobalLastRpc>();
CHECK(res) << "GlobalLastRpc failed";
return res->member;
}
void WorkerEngine::LocalForEachActiveTransaction(
void EngineWorker::LocalForEachActiveTransaction(
std::function<void(Transaction &)> f) {
for (auto pair : active_.access()) f(*pair.second);
}
TransactionId WorkerEngine::LocalOldestActive() const { return oldest_active_; }
TransactionId EngineWorker::LocalOldestActive() const { return oldest_active_; }
Transaction *WorkerEngine::RunningTransaction(TransactionId tx_id) {
Transaction *EngineWorker::RunningTransaction(TransactionId tx_id) {
auto accessor = active_.access();
auto found = accessor.find(tx_id);
if (found != accessor.end()) return found->second;
@ -136,20 +162,20 @@ Transaction *WorkerEngine::RunningTransaction(TransactionId tx_id) {
return RunningTransaction(tx_id, snapshot);
}
Transaction *WorkerEngine::RunningTransaction(TransactionId tx_id,
Transaction *EngineWorker::RunningTransaction(TransactionId tx_id,
const Snapshot &snapshot) {
auto accessor = active_.access();
auto found = accessor.find(tx_id);
if (found != accessor.end()) return found->second;
auto new_tx = new Transaction(tx_id, snapshot, *this);
auto new_tx = CreateTransaction(tx_id, snapshot);
auto insertion = accessor.insert(tx_id, new_tx);
if (!insertion.second) delete new_tx;
utils::EnsureAtomicGe(local_last_, tx_id);
return insertion.first->second;
}
void WorkerEngine::ClearTransactionalCache(TransactionId oldest_active) const {
void EngineWorker::ClearTransactionalCache(TransactionId oldest_active) {
auto access = active_.access();
for (auto kv : access) {
if (kv.first < oldest_active) {
@ -161,7 +187,7 @@ void WorkerEngine::ClearTransactionalCache(TransactionId oldest_active) const {
}
}
void WorkerEngine::ClearSingleTransaction(TransactionId tx_id) const {
void EngineWorker::ClearSingleTransaction(TransactionId tx_id) const {
auto access = active_.access();
auto found = access.find(tx_id);
if (found != access.end()) {
@ -172,7 +198,7 @@ void WorkerEngine::ClearSingleTransaction(TransactionId tx_id) const {
}
}
void WorkerEngine::UpdateOldestActive(const Snapshot &snapshot,
void EngineWorker::UpdateOldestActive(const Snapshot &snapshot,
TransactionId alternative) {
if (snapshot.empty()) {
oldest_active_.store(std::max(alternative, oldest_active_.load()));
@ -181,11 +207,11 @@ void WorkerEngine::UpdateOldestActive(const Snapshot &snapshot,
}
}
void WorkerEngine::EnsureNextIdGreater(TransactionId tx_id) {
void EngineWorker::EnsureNextIdGreater(TransactionId tx_id) {
master_client_pool_.Call<EnsureNextIdGreaterRpc>(tx_id);
}
void WorkerEngine::GarbageCollectCommitLog(TransactionId tx_id) {
void EngineWorker::GarbageCollectCommitLog(TransactionId tx_id) {
clog_.garbage_collect_older(tx_id);
}
} // namespace tx

View File

@ -3,25 +3,30 @@
#include <atomic>
#include "communication/rpc/client_pool.hpp"
#include "communication/rpc/server.hpp"
#include "data_structures/concurrent/concurrent_map.hpp"
#include "durability/wal.hpp"
#include "io/network/endpoint.hpp"
#include "transactions/commit_log.hpp"
#include "transactions/engine.hpp"
#include "transactions/distributed/engine_distributed.hpp"
#include "transactions/transaction.hpp"
namespace tx {
/** Distributed worker transaction engine. Connects to a MasterEngine (single
/** Distributed worker transaction engine. Connects to a EngineMaster (single
* source of truth) to obtain transactional info. Caches most info locally. Can
* begin/advance/end transactions on the master. */
class WorkerEngine : public Engine {
class EngineWorker final : public EngineDistributed {
public:
/// The wait time between two releases of local transaction objects that have
/// expired on the master.
static constexpr std::chrono::seconds kCacheReleasePeriod{1};
EngineWorker(communication::rpc::Server &server,
communication::rpc::ClientPool &master_client_pool,
durability::WriteAheadLog *wal = nullptr);
~EngineWorker();
explicit WorkerEngine(communication::rpc::ClientPool &master_client_pool);
~WorkerEngine();
EngineWorker(const EngineWorker &) = delete;
EngineWorker(EngineWorker &&) = delete;
EngineWorker &operator=(const EngineWorker &) = delete;
EngineWorker &operator=(EngineWorker &&) = delete;
Transaction *Begin() override;
CommandId Advance(TransactionId id) override;
@ -45,9 +50,7 @@ class WorkerEngine : public Engine {
void EnsureNextIdGreater(TransactionId tx_id) override;
void GarbageCollectCommitLog(tx::TransactionId tx_id) override;
/// Clears the cache of local transactions that have expired. The signature of
/// this method is dictated by `distributed::TransactionalCacheCleaner`.
void ClearTransactionalCache(TransactionId oldest_active) const;
void ClearTransactionalCache(TransactionId oldest_active) override;
private:
// Local caches.
@ -56,9 +59,15 @@ class WorkerEngine : public Engine {
// Mutable because just getting info can cause a cache fill.
mutable CommitLog clog_;
// Our local RPC server.
communication::rpc::Server &server_;
// Communication to the transactional master.
communication::rpc::ClientPool &master_client_pool_;
// Write ahead log.
durability::WriteAheadLog *wal_;
// Used for clearing of caches of transactions that have expired.
// Initialize the oldest_active_ with 1 because there's never a tx with id=0
std::atomic<TransactionId> oldest_active_{1};

View File

@ -0,0 +1,20 @@
;; This file doesn't need to be preprocessed. It only holds helper functions.
(defun save-commitlog-info (builder member) #>cpp ${builder}->setMember(${member}); cpp<#)
(defun load-commitlog-info (reader member) #>cpp ${member} = CommitLog::Info(${reader}.getMember()); cpp<#)
(defun save-snapshot (builder member)
(let ((capnp-member (remove #\_ (string-capitalize member))))
#>cpp
auto list_builder = builder->init${capnp-member}(${member}.transaction_ids().size());
utils::SaveVector(${member}.transaction_ids(), &list_builder);
cpp<#))
(defun load-snapshot (reader member)
(let ((capnp-member (remove #\_ (string-capitalize member))))
#>cpp
std::vector<uint64_t> transaction_ids;
utils::LoadVector(&transaction_ids, ${reader}.get${capnp-member}());
${member} = tx::Snapshot(std::move(transaction_ids));
cpp<#))

View File

@ -1,3 +1,5 @@
/// @file
#pragma once
#include <algorithm>
@ -11,18 +13,14 @@
#include "transactions/type.hpp"
namespace tx {
/**
* Database transaction engine. Used for managing transactions and the related
* information such as transaction snapshots and the transaction state info.
*
* This is an abstract base class for implementing a single-node transactional
* engine (MasterEngine), an engine for the master in a distributed system (also
* MasterEngine), and for the worker in a distributed system (WorkerEngine).
*
* Methods in this class are often prefixed with "Global" or "Local", depending
* on the guarantees that they need to satisfy. These guarantee requirements are
* determined by the users of a particular method.
*/
/// Database transaction engine. Used for managing transactions and the related
/// information such as transaction snapshots and the transaction state info.
///
/// This is an abstract base class for implementing a transactional engine.
///
/// Methods in this class are often prefixed with "Global" or "Local", depending
/// on the guarantees that they need to satisfy. These guarantee requirements
/// are determined by the users of a particular method.
class Engine {
public:
virtual ~Engine() = default;
@ -44,58 +42,66 @@ class Engine {
/// valid after this function executes.
virtual void Abort(const Transaction &t) = 0;
/** Returns the commit log Info about the given transaction. */
/// Returns the commit log Info about the given transaction.
virtual CommitLog::Info Info(TransactionId tx) const = 0;
/** Returns the snapshot relevant to garbage collection of database records.
*
* If there are no active transactions that means a snapshot containing only
* the next transaction ID. If there are active transactions, that means the
* oldest active transaction's snapshot, with that transaction's ID appened as
* last.
*
* The idea is that data records can only be deleted if they were expired (and
* that was committed) by a transaction older than the older currently active.
* We need the full snapshot to prevent overlaps (see general GC
* documentation).
*
* The returned snapshot must be for the globally oldest active transaction.
* If we only looked at locally known transactions, it would be possible to
* delete something that and older active transaction can still see.
*/
/// Returns the snapshot relevant to garbage collection of database records.
///
/// If there are no active transactions that means a snapshot containing only
/// the next transaction ID. If there are active transactions, that means the
/// oldest active transaction's snapshot, with that transaction's ID appened
/// as last.
///
/// The idea is that data records can only be deleted if they were expired
/// (and that was committed) by a transaction older than the older currently
/// active. We need the full snapshot to prevent overlaps (see general GC
/// documentation).
///
/// The returned snapshot must be for the globally oldest active transaction.
/// If we only looked at locally known transactions, it would be possible to
/// delete something that and older active transaction can still see.
virtual Snapshot GlobalGcSnapshot() = 0;
/** Returns active transactions. */
/// Returns active transactions.
virtual Snapshot GlobalActiveTransactions() = 0;
/** Returns the ID the last globally known transaction. */
/// Returns the ID the last globally known transaction.
virtual tx::TransactionId GlobalLast() const = 0;
/** Returns the ID of last locally known transaction. */
/// Returns the ID of last locally known transaction.
virtual tx::TransactionId LocalLast() const = 0;
/** Returns the ID of the oldest transaction locally known to be active. It is
* guaranteed that all the transactions older than the returned are globally
* not active. */
/// Returns the ID of the oldest transaction locally known to be active. It is
/// guaranteed that all the transactions older than the returned are globally
/// not active.
virtual TransactionId LocalOldestActive() const = 0;
/** Calls function f on each locally active transaction. */
/// Calls function f on each locally active transaction.
virtual void LocalForEachActiveTransaction(
std::function<void(Transaction &)> f) = 0;
/** Gets a transaction object for a running transaction. */
/// Gets a transaction object for a running transaction.
virtual tx::Transaction *RunningTransaction(TransactionId tx_id) = 0;
/** Ensures the next transaction that starts will have the ID greater than
* the given id. */
/// Ensures the next transaction that starts will have the ID greater than
/// the given id.
virtual void EnsureNextIdGreater(TransactionId tx_id) = 0;
/** Garbage collects transactions older than tx_id from commit log. */
/// Garbage collects transactions older than tx_id from commit log.
virtual void GarbageCollectCommitLog(TransactionId tx_id) = 0;
auto &local_lock_graph() { return local_lock_graph_; }
const auto &local_lock_graph() const { return local_lock_graph_; }
protected:
Transaction *CreateTransaction(TransactionId id, const Snapshot &snapshot) {
return new Transaction(id, snapshot, *this);
}
CommandId AdvanceCommand(Transaction *t) { return t->AdvanceCommand(); }
void SetCommand(Transaction *t, CommandId cid) { t->SetCommand(cid); }
private:
// Map lock dependencies. Each entry maps (tx_that_wants_lock,
// tx_that_holds_lock). Used for local deadlock resolution.

View File

@ -1,30 +0,0 @@
#pragma once
#include "communication/rpc/server.hpp"
#include "distributed/rpc_worker_clients.hpp"
#include "transactions/engine_single_node.hpp"
namespace tx {
/** Distributed master transaction engine. Has complete engine functionality and
* exposes an RPC server to be used by distributed Workers. */
class MasterEngine : public SingleNodeEngine {
public:
/**
* @param server - Required. Used for rpc::Server construction.
* @param rpc_worker_clients - Required. Used for
* OngoingProduceJoinerRpcClients construction.
* @param wal - Optional. If present, the Engine will write tx
* Begin/Commit/Abort atomically (while under lock).
*/
MasterEngine(communication::rpc::Server &server,
distributed::RpcWorkerClients &rpc_worker_clients,
durability::WriteAheadLog *wal = nullptr);
void Commit(const Transaction &t) override;
void Abort(const Transaction &t) override;
private:
communication::rpc::Server &rpc_server_;
distributed::OngoingProduceJoinerRpcClients ongoing_produce_joiner_;
};
} // namespace tx

View File

@ -4,20 +4,19 @@
#include "glog/logging.h"
#include "database/state_delta.hpp"
#include "transactions/engine_rpc_messages.hpp"
#include "transactions/engine_single_node.hpp"
#include "transactions/single_node/engine_single_node.hpp"
namespace tx {
SingleNodeEngine::SingleNodeEngine(durability::WriteAheadLog *wal)
EngineSingleNode::EngineSingleNode(durability::WriteAheadLog *wal)
: wal_(wal) {}
Transaction *SingleNodeEngine::Begin() {
Transaction *EngineSingleNode::Begin() {
VLOG(11) << "[Tx] Starting transaction " << counter_ + 1;
std::lock_guard<utils::SpinLock> guard(lock_);
TransactionId id{++counter_};
auto t = new Transaction(id, active_, *this);
auto t = CreateTransaction(id, active_);
active_.insert(id);
store_.emplace(id, t);
if (wal_) {
@ -26,7 +25,7 @@ Transaction *SingleNodeEngine::Begin() {
return t;
}
CommandId SingleNodeEngine::Advance(TransactionId id) {
CommandId EngineSingleNode::Advance(TransactionId id) {
std::lock_guard<utils::SpinLock> guard(lock_);
auto it = store_.find(id);
@ -34,23 +33,18 @@ CommandId SingleNodeEngine::Advance(TransactionId id) {
<< "Transaction::advance on non-existing transaction";
Transaction *t = it->second.get();
if (t->cid_ == std::numeric_limits<CommandId>::max())
throw TransactionError(
"Reached maximum number of commands in this "
"transaction.");
return ++(t->cid_);
return AdvanceCommand(t);
}
CommandId SingleNodeEngine::UpdateCommand(TransactionId id) {
CommandId EngineSingleNode::UpdateCommand(TransactionId id) {
std::lock_guard<utils::SpinLock> guard(lock_);
auto it = store_.find(id);
DCHECK(it != store_.end())
<< "Transaction::advance on non-existing transaction";
return it->second->cid_;
return it->second->cid();
}
void SingleNodeEngine::Commit(const Transaction &t) {
void EngineSingleNode::Commit(const Transaction &t) {
VLOG(11) << "[Tx] Commiting transaction " << t.id_;
std::lock_guard<utils::SpinLock> guard(lock_);
clog_.set_committed(t.id_);
@ -61,7 +55,7 @@ void SingleNodeEngine::Commit(const Transaction &t) {
store_.erase(store_.find(t.id_));
}
void SingleNodeEngine::Abort(const Transaction &t) {
void EngineSingleNode::Abort(const Transaction &t) {
VLOG(11) << "[Tx] Aborting transaction " << t.id_;
std::lock_guard<utils::SpinLock> guard(lock_);
clog_.set_aborted(t.id_);
@ -72,11 +66,11 @@ void SingleNodeEngine::Abort(const Transaction &t) {
store_.erase(store_.find(t.id_));
}
CommitLog::Info SingleNodeEngine::Info(TransactionId tx) const {
CommitLog::Info EngineSingleNode::Info(TransactionId tx) const {
return clog_.fetch_info(tx);
}
Snapshot SingleNodeEngine::GlobalGcSnapshot() {
Snapshot EngineSingleNode::GlobalGcSnapshot() {
std::lock_guard<utils::SpinLock> guard(lock_);
// No active transactions.
@ -92,29 +86,29 @@ Snapshot SingleNodeEngine::GlobalGcSnapshot() {
return snapshot_copy;
}
Snapshot SingleNodeEngine::GlobalActiveTransactions() {
Snapshot EngineSingleNode::GlobalActiveTransactions() {
std::lock_guard<utils::SpinLock> guard(lock_);
Snapshot active_transactions = active_;
return active_transactions;
}
TransactionId SingleNodeEngine::LocalLast() const {
TransactionId EngineSingleNode::LocalLast() const {
std::lock_guard<utils::SpinLock> guard(lock_);
return counter_;
}
TransactionId SingleNodeEngine::GlobalLast() const { return LocalLast(); }
TransactionId EngineSingleNode::GlobalLast() const { return LocalLast(); }
TransactionId SingleNodeEngine::LocalOldestActive() const {
TransactionId EngineSingleNode::LocalOldestActive() const {
std::lock_guard<utils::SpinLock> guard(lock_);
return active_.empty() ? counter_ + 1 : active_.front();
}
void SingleNodeEngine::GarbageCollectCommitLog(TransactionId tx_id) {
void EngineSingleNode::GarbageCollectCommitLog(TransactionId tx_id) {
clog_.garbage_collect_older(tx_id);
}
void SingleNodeEngine::LocalForEachActiveTransaction(
void EngineSingleNode::LocalForEachActiveTransaction(
std::function<void(Transaction &)> f) {
std::lock_guard<utils::SpinLock> guard(lock_);
for (auto transaction : active_) {
@ -122,7 +116,7 @@ void SingleNodeEngine::LocalForEachActiveTransaction(
}
}
Transaction *SingleNodeEngine::RunningTransaction(TransactionId tx_id) {
Transaction *EngineSingleNode::RunningTransaction(TransactionId tx_id) {
std::lock_guard<utils::SpinLock> guard(lock_);
auto found = store_.find(tx_id);
CHECK(found != store_.end())
@ -130,7 +124,7 @@ Transaction *SingleNodeEngine::RunningTransaction(TransactionId tx_id) {
return found->second.get();
}
void SingleNodeEngine::EnsureNextIdGreater(TransactionId tx_id) {
void EngineSingleNode::EnsureNextIdGreater(TransactionId tx_id) {
std::lock_guard<utils::SpinLock> guard(lock_);
counter_ = std::max(tx_id, counter_);
}

View File

@ -1,3 +1,5 @@
/// @file
#pragma once
#include <atomic>
@ -8,26 +10,21 @@
#include "transactions/commit_log.hpp"
#include "transactions/engine.hpp"
#include "transactions/transaction.hpp"
#include "utils/exceptions.hpp"
#include "utils/thread/sync.hpp"
namespace tx {
/** Indicates an error in transaction handling (currently
* only command id overflow). */
class TransactionError : public utils::BasicException {
/// Single-node deployment transaction engine. Has complete functionality.
class EngineSingleNode final : public Engine {
public:
using utils::BasicException::BasicException;
};
/// @param wal - Optional. If present, the Engine will write tx
/// Begin/Commit/Abort atomically (while under lock).
explicit EngineSingleNode(durability::WriteAheadLog *wal = nullptr);
/** Single-node deployment transaction engine. Has complete functionality. */
class SingleNodeEngine : public Engine {
public:
/**
* @param wal - Optional. If present, the Engine will write tx
* Begin/Commit/Abort atomically (while under lock).
*/
explicit SingleNodeEngine(durability::WriteAheadLog *wal = nullptr);
EngineSingleNode(const EngineSingleNode &) = delete;
EngineSingleNode(EngineSingleNode &&) = delete;
EngineSingleNode &operator=(const EngineSingleNode &) = delete;
EngineSingleNode &operator=(EngineSingleNode &&) = delete;
Transaction *Begin() override;
CommandId Advance(TransactionId id) override;

View File

@ -1,16 +0,0 @@
#include "transactions/snapshot.hpp"
#include "utils/serialization.hpp"
namespace tx {
void Snapshot::Save(capnp::Snapshot::Builder *builder) const {
auto list_builder = builder->initTransactionIds(transaction_ids_.size());
utils::SaveVector(transaction_ids_, &list_builder);
}
void Snapshot::Load(const capnp::Snapshot::Reader &reader) {
utils::LoadVector(&transaction_ids_, reader.getTransactionIds());
}
} // namespace tx

View File

@ -1,3 +1,5 @@
/// @file
#pragma once
#include <algorithm>
@ -5,51 +7,49 @@
#include <vector>
#include "glog/logging.h"
#include "transactions/common.capnp.h"
#include "transactions/type.hpp"
#include "utils/algorithm.hpp"
namespace tx {
class Engine;
/** Ascendingly sorted collection of transaction ids.
*
* Represents the transactions that were active at
* some point in the discrete transaction time.
*/
class Snapshot {
/// Ascendingly sorted collection of transaction ids.
///
/// Represents the transactions that were active at
/// some point in the discrete transaction time.
class Snapshot final {
public:
Snapshot() = default;
Snapshot(std::vector<TransactionId> &&active)
: transaction_ids_(std::move(active)) {}
// all the copy/move constructors/assignments act naturally
/** Returns true if this snapshot contains the given
* transaction id.
*
* @param xid - The transcation id in question
*/
Snapshot(const Snapshot &) = default;
Snapshot(Snapshot &&) = default;
Snapshot &operator=(const Snapshot &) = default;
Snapshot &operator=(Snapshot &&) = default;
/// Returns true if this snapshot contains the given
/// transaction id.
///
/// @param xid - The transcation id in question
bool contains(TransactionId id) const {
return std::binary_search(transaction_ids_.begin(), transaction_ids_.end(),
id);
}
/** Adds the given transaction id to the end of this Snapshot.
* The given id must be greater then all the existing ones,
* to maintain ascending sort order.
*
* @param id - the transaction id to add
*/
/// Adds the given transaction id to the end of this Snapshot.
/// The given id must be greater then all the existing ones,
/// to maintain ascending sort order.
///
/// @param id - the transaction id to add
void insert(TransactionId id) {
transaction_ids_.push_back(id);
DCHECK(std::is_sorted(transaction_ids_.begin(), transaction_ids_.end()))
<< "Snapshot must be sorted";
}
/** Removes the given transaction id from this Snapshot.
*
* @param id - the transaction id to remove */
/// Removes the given transaction id from this Snapshot.
///
/// @param id - the transaction id to remove
void remove(TransactionId id) {
auto last =
std::remove(transaction_ids_.begin(), transaction_ids_.end(), id);
@ -84,8 +84,7 @@ class Snapshot {
return stream;
}
void Save(capnp::Snapshot::Builder *builder) const;
void Load(const capnp::Snapshot::Reader &reader);
const auto &transaction_ids() const { return transaction_ids_; }
private:
std::vector<TransactionId> transaction_ids_;

View File

@ -1,3 +1,5 @@
/// @file
#pragma once
#include <chrono>
@ -11,22 +13,28 @@
#include "transactions/lock_store.hpp"
#include "transactions/snapshot.hpp"
#include "transactions/type.hpp"
#include "utils/exceptions.hpp"
namespace tx {
/** A database transaction. Encapsulates an atomic, abortable unit of work. Also
* defines that all db ops are single-threaded within a single transaction */
class Transaction {
/// Indicates an error in transaction handling (currently
/// only command id overflow).
class TransactionError : public utils::BasicException {
public:
/** Returns the maximum possible transcation id */
using utils::BasicException::BasicException;
};
/// A database transaction. Encapsulates an atomic, abortable unit of work. Also
/// defines that all db ops are single-threaded within a single transaction
class Transaction final {
public:
/// Returns the maximum possible transcation id
static TransactionId MaxId() {
return std::numeric_limits<TransactionId>::max();
}
private:
friend class SingleNodeEngine;
friend class MasterEngine;
friend class WorkerEngine;
friend class Engine;
// The constructor is private, only the Engine ever uses it.
Transaction(TransactionId id, const Snapshot &snapshot, Engine &engine)
@ -40,27 +48,26 @@ class Transaction {
Transaction &operator=(Transaction &&) = delete;
public:
/** Acquires the lock over the given RecordLock, preventing other transactions
* from doing the same */
/// Acquires the lock over the given RecordLock, preventing other transactions
/// from doing the same
void TakeLock(RecordLock &lock) const { locks_.Take(&lock, *this, engine_); }
/** Transaction's id. Unique in the engine that owns it */
/// Transaction's id. Unique in the engine that owns it
const TransactionId id_;
/** The transaction engine to which this transaction belongs */
/// The transaction engine to which this transaction belongs
Engine &engine_;
/** Returns the current transaction's current command id */
/// Returns the current transaction's current command id
// TODO rename to cmd_id (variable and function
auto cid() const { return cid_; }
/** Returns this transaction's snapshot. */
/// Returns this transaction's snapshot.
const Snapshot &snapshot() const { return snapshot_; }
/** Signal to transaction that it should abort. It doesn't really enforce that
* transaction will abort, but it merely hints too the transaction that it is
* preferable to stop its execution.
*/
/// Signal to transaction that it should abort. It doesn't really enforce that
/// transaction will abort, but it merely hints too the transaction that it is
/// preferable to stop its execution.
void set_should_abort() { should_abort_ = true; }
bool should_abort() const { return should_abort_; }
@ -68,6 +75,19 @@ class Transaction {
auto creation_time() const { return creation_time_; }
private:
// Function used to advance the command.
CommandId AdvanceCommand() {
if (cid_ == std::numeric_limits<CommandId>::max()) {
throw TransactionError(
"Reached maximum number of commands in this "
"transaction.");
}
return ++cid_;
}
// Function used to set the command.
void SetCommand(CommandId cid) { cid_ = cid; }
// Index of the current command in the current transaction.
CommandId cid_{1};

View File

@ -1,12 +1,14 @@
/// @file
#include <cstdint>
// transcation and command types defined
// in a separate header to avoid cyclic dependencies
namespace tx {
/** Type of a tx::Transcation's id member */
/// Type of a tx::Transcation's id member
using TransactionId = uint64_t;
/** Type of a tx::Transcation's command id member */
/// Type of a tx::Transcation's command id member
using CommandId = uint32_t;
}

View File

@ -4,7 +4,7 @@
#include "mvcc/record.hpp"
#include "mvcc/version_list.hpp"
#include "transactions/engine_single_node.hpp"
#include "transactions/single_node/engine_single_node.hpp"
class Prop : public mvcc::Record<Prop> {
public:
@ -19,7 +19,7 @@ class Prop : public mvcc::Record<Prop> {
void MvccMix(benchmark::State &state) {
while (state.KeepRunning()) {
state.PauseTiming();
tx::SingleNodeEngine engine;
tx::EngineSingleNode engine;
auto t1 = engine.Begin();
mvcc::VersionList<Prop> version_list(*t1, 0, 0);

View File

@ -3,14 +3,14 @@
#include <glog/logging.h>
#include "transactions/engine_single_node.hpp"
#include "transactions/single_node/engine_single_node.hpp"
#include "utils/timer.hpp"
void Benchmark(int64_t num_threads, int64_t num_transactions) {
LOG(INFO) << "Testing with " << num_threads << " threads and "
<< num_transactions << " transactions per thread...";
tx::SingleNodeEngine engine;
tx::EngineSingleNode engine;
std::vector<std::thread> threads;
utils::Timer timer;
for (int i = 0; i < num_threads; ++i) {

View File

@ -5,7 +5,7 @@
#include "database/graph_db_accessor.hpp"
#include "storage/types.hpp"
#include "storage/vertex.hpp"
#include "transactions/engine_single_node.hpp"
#include "transactions/single_node/engine_single_node.hpp"
#include "mvcc_gc_common.hpp"
@ -16,7 +16,7 @@ TEST(LabelsIndex, UniqueInsert) {
database::KeyIndex<storage::Label, Vertex> index;
database::SingleNode db;
auto dba = db.Access();
tx::SingleNodeEngine engine;
tx::EngineSingleNode engine;
auto t1 = engine.Begin();
mvcc::VersionList<Vertex> vlist(*t1, 0, 0);
@ -45,7 +45,7 @@ TEST(LabelsIndex, UniqueFilter) {
database::SingleNode db;
database::KeyIndex<storage::Label, Vertex> index;
auto dba = db.Access();
tx::SingleNodeEngine engine;
tx::EngineSingleNode engine;
auto t1 = engine.Begin();
mvcc::VersionList<Vertex> vlist1(*t1, 0, 0);
@ -85,7 +85,7 @@ TEST(LabelsIndex, Refresh) {
database::KeyIndex<storage::Label, Vertex> index;
database::SingleNode db;
auto access = db.Access();
tx::SingleNodeEngine engine;
tx::EngineSingleNode engine;
// add two vertices to database
auto t1 = engine.Begin();

View File

@ -4,6 +4,7 @@
#include "database/graph_db_accessor.hpp"
#include "database/indexes/label_property_index.hpp"
#include "storage/types.hpp"
#include "transactions/single_node/engine_single_node.hpp"
#include "mvcc_gc_common.hpp"
@ -45,7 +46,7 @@ class LabelPropertyIndexComplexTest : public ::testing::Test {
LabelPropertyIndex index;
LabelPropertyIndex::Key *key;
tx::SingleNodeEngine engine;
tx::EngineSingleNode engine;
tx::Transaction *t{nullptr};
mvcc::VersionList<Vertex> *vlist;

View File

@ -10,7 +10,7 @@
#include "distributed/data_manager.hpp"
#include "distributed/updates_rpc_server.hpp"
#include "storage/address_types.hpp"
#include "transactions/engine_master.hpp"
#include "transactions/distributed/engine_master.hpp"
#include "utils/file.hpp"
DECLARE_string(durability_directory);

View File

@ -89,11 +89,15 @@ class DistributedDurability : public DistributedGraphDbTest {
// The master always has TRANSACTION_BEGIN and `op`.
ASSERT_EQ(deltas.size(), 2);
EXPECT_EQ(deltas[1].type, op);
}
else {
// The workers only have `op`.
ASSERT_EQ(deltas.size(), 1);
EXPECT_EQ(deltas[0].type, op);
} else {
// The workers only have `op` if the op is `COMMITTED`, they don't have
// the `ABORTED` op.
if (op == database::StateDelta::Type::TRANSACTION_COMMIT) {
ASSERT_EQ(deltas.size(), 1);
EXPECT_EQ(deltas[0].type, op);
} else {
ASSERT_EQ(deltas.size(), 0);
}
}
}
}

View File

@ -24,7 +24,7 @@
#include "query/typed_value.hpp"
#include "query_common.hpp"
#include "query_plan_common.hpp"
#include "transactions/engine_master.hpp"
#include "transactions/distributed/engine_master.hpp"
using database::GraphDbAccessor;
using namespace distributed;

View File

@ -25,7 +25,6 @@
#include "query/typed_value.hpp"
#include "query_common.hpp"
#include "query_plan_common.hpp"
#include "transactions/engine_master.hpp"
DECLARE_int32(query_execution_time_sec);

View File

@ -10,7 +10,7 @@
#include "storage/property_value_store.hpp"
#include "storage/types.hpp"
#include "storage/vertex.hpp"
#include "transactions/engine_single_node.hpp"
#include "transactions/single_node/engine_single_node.hpp"
using namespace storage;
@ -117,7 +117,7 @@ TEST(DistributedSerialization, VertexProperties) {
class DistributedSerializationMvcc : public ::testing::Test {
protected:
tx::SingleNodeEngine engine;
tx::EngineSingleNode engine;
tx::Transaction *tx = engine.Begin();
mvcc::VersionList<Vertex> v1_vlist{*tx, 0, 0};
Vertex &v1 = *v1_vlist.Oldest();

View File

@ -4,14 +4,14 @@
#include "mvcc/record.hpp"
#include "mvcc/version.hpp"
#include "mvcc/version_list.hpp"
#include "transactions/engine_single_node.hpp"
#include "transactions/single_node/engine_single_node.hpp"
#include "transactions/transaction.hpp"
#include "utils/thread/sync.hpp"
#include "mvcc_gc_common.hpp"
TEST(MVCC, Deadlock) {
tx::SingleNodeEngine engine;
tx::EngineSingleNode engine;
auto t0 = engine.Begin();
mvcc::VersionList<Prop> version_list1(*t0, 0, 0);
@ -31,7 +31,7 @@ TEST(MVCC, Deadlock) {
TEST(MVCC, UpdateDontDelete) {
std::atomic<int> count{0};
{
tx::SingleNodeEngine engine;
tx::EngineSingleNode engine;
auto t1 = engine.Begin();
mvcc::VersionList<DestrCountRec> version_list(*t1, 0, 0, count);
engine.Commit(*t1);
@ -55,7 +55,7 @@ TEST(MVCC, UpdateDontDelete) {
// Check that we get the oldest record.
TEST(MVCC, Oldest) {
tx::SingleNodeEngine engine;
tx::EngineSingleNode engine;
auto t1 = engine.Begin();
mvcc::VersionList<Prop> version_list(*t1, 0, 0);
auto first = version_list.Oldest();

View File

@ -4,7 +4,7 @@
#include "mvcc/record.hpp"
#include "mvcc/version.hpp"
#include "mvcc/version_list.hpp"
#include "transactions/engine_single_node.hpp"
#include "transactions/single_node/engine_single_node.hpp"
#include "transactions/transaction.hpp"
class TestClass : public mvcc::Record<TestClass> {
@ -57,7 +57,7 @@ class Mvcc : public ::testing::Test {
}
// variable where number of versions is stored
int version_list_size = 0;
tx::SingleNodeEngine engine;
tx::EngineSingleNode engine;
tx::Transaction *t1 = engine.Begin();
mvcc::VersionList<TestClass> version_list{*t1, 0, 0, version_list_size};
TestClass *v1 = nullptr;

View File

@ -12,13 +12,13 @@
#include "mvcc/version_list.hpp"
#include "storage/garbage_collector.hpp"
#include "storage/vertex.hpp"
#include "transactions/engine_single_node.hpp"
#include "transactions/single_node/engine_single_node.hpp"
#include "mvcc_gc_common.hpp"
class MvccGcTest : public ::testing::Test {
protected:
tx::SingleNodeEngine engine;
tx::EngineSingleNode engine;
private:
tx::Transaction *t0 = engine.Begin();
@ -116,7 +116,7 @@ TEST_F(MvccGcTest, OldestTransactionSnapshot) {
*/
TEST(GarbageCollector, GcClean) {
ConcurrentMap<int64_t, mvcc::VersionList<DestrCountRec> *> collection;
tx::SingleNodeEngine engine;
tx::EngineSingleNode engine;
DeferredDeleter<DestrCountRec> deleter;
DeferredDeleter<mvcc::VersionList<DestrCountRec>> vlist_deleter;
GarbageCollector<decltype(collection), DestrCountRec> gc(collection, deleter,

View File

@ -1,7 +1,7 @@
#pragma once
#include "mvcc/record.hpp"
#include "transactions/engine_single_node.hpp"
#include "transactions/single_node/engine_single_node.hpp"
/**
* @brief - Empty class which inherits from mvcc:Record.
@ -29,7 +29,7 @@ class DestrCountRec : public mvcc::Record<DestrCountRec> {
// helper function for creating a GC snapshot
// if given a nullptr it makes a GC snapshot like there
// are no active transactions
auto GcSnapshot(tx::SingleNodeEngine &engine, tx::Transaction *t) {
auto GcSnapshot(tx::EngineSingleNode &engine, tx::Transaction *t) {
if (t != nullptr) {
tx::Snapshot gc_snap = t->snapshot();
gc_snap.insert(t->id_);

View File

@ -9,9 +9,8 @@
#include "distributed/cluster_discovery_master.hpp"
#include "distributed/coordination_master.hpp"
#include "io/network/endpoint.hpp"
#include "transactions/engine_master.hpp"
#include "transactions/engine_rpc_messages.hpp"
#include "transactions/engine_worker.hpp"
#include "transactions/distributed/engine_master.hpp"
#include "transactions/distributed/engine_worker.hpp"
using namespace tx;
using namespace communication::rpc;
@ -22,13 +21,14 @@ class WorkerEngineTest : public testing::Test {
const std::string local{"127.0.0.1"};
Server master_server_{{local, 0}};
Server worker_server_{{local, 0}};
MasterCoordination master_coordination_{master_server_.endpoint()};
RpcWorkerClients rpc_worker_clients_{master_coordination_};
MasterEngine master_{master_server_, rpc_worker_clients_};
EngineMaster master_{master_server_, rpc_worker_clients_};
ClientPool master_client_pool{master_server_.endpoint()};
WorkerEngine worker_{master_client_pool};
EngineWorker worker_{worker_server_, master_client_pool};
};
TEST_F(WorkerEngineTest, BeginOnWorker) {

View File

@ -4,13 +4,13 @@
#include <vector>
#include "data_structures/concurrent/skiplist.hpp"
#include "transactions/engine_single_node.hpp"
#include "transactions/single_node/engine_single_node.hpp"
#include "transactions/transaction.hpp"
using namespace tx;
TEST(Engine, GcSnapshot) {
SingleNodeEngine engine;
EngineSingleNode engine;
ASSERT_EQ(engine.GlobalGcSnapshot(), Snapshot({1}));
std::vector<Transaction *> transactions;
@ -38,7 +38,7 @@ TEST(Engine, GcSnapshot) {
}
TEST(Engine, Advance) {
SingleNodeEngine engine;
EngineSingleNode engine;
auto t0 = engine.Begin();
auto t1 = engine.Begin();
@ -51,7 +51,7 @@ TEST(Engine, Advance) {
}
TEST(Engine, ConcurrentBegin) {
SingleNodeEngine engine;
EngineSingleNode engine;
std::vector<std::thread> threads;
SkipList<TransactionId> tx_ids;
for (int i = 0; i < 10; ++i) {
@ -67,7 +67,7 @@ TEST(Engine, ConcurrentBegin) {
}
TEST(Engine, RunningTransaction) {
SingleNodeEngine engine;
EngineSingleNode engine;
auto t0 = engine.Begin();
auto t1 = engine.Begin();
EXPECT_EQ(t0, engine.RunningTransaction(t0->id_));
@ -76,7 +76,7 @@ TEST(Engine, RunningTransaction) {
}
TEST(Engine, EnsureTxIdGreater) {
SingleNodeEngine engine;
EngineSingleNode engine;
ASSERT_LE(engine.Begin()->id_, 40);
engine.EnsureNextIdGreater(42);
EXPECT_EQ(engine.Begin()->id_, 43);