Replace command_id_t with CommandId and transaction_id_t with TransactionId.

Reviewers: buda

Reviewed By: buda

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1367
This commit is contained in:
Dominik Gleich 2018-04-19 15:32:41 +02:00
parent debe5a961c
commit 7af80ebb8d
56 changed files with 260 additions and 260 deletions

View File

@ -22,7 +22,7 @@ GraphDbAccessor::GraphDbAccessor(GraphDb &db)
transaction_(*db.tx_engine().Begin()),
transaction_starter_{true} {}
GraphDbAccessor::GraphDbAccessor(GraphDb &db, tx::transaction_id_t tx_id)
GraphDbAccessor::GraphDbAccessor(GraphDb &db, tx::TransactionId tx_id)
: db_(db),
transaction_(*db.tx_engine().RunningTransaction(tx_id)),
transaction_starter_{false} {}
@ -33,7 +33,7 @@ GraphDbAccessor::~GraphDbAccessor() {
}
}
tx::transaction_id_t GraphDbAccessor::transaction_id() const {
tx::TransactionId GraphDbAccessor::transaction_id() const {
return transaction_.id_;
}

View File

@ -52,7 +52,7 @@ class GraphDbAccessor {
explicit GraphDbAccessor(GraphDb &db);
/// Creates an accessor for a running transaction.
GraphDbAccessor(GraphDb &db, tx::transaction_id_t tx_id);
GraphDbAccessor(GraphDb &db, tx::TransactionId tx_id);
~GraphDbAccessor();
GraphDbAccessor(const GraphDbAccessor &other) = delete;
@ -549,7 +549,7 @@ class GraphDbAccessor {
const std::string &PropertyName(storage::Property property) const;
/** Returns the id of this accessor's transaction */
tx::transaction_id_t transaction_id() const;
tx::TransactionId transaction_id() const;
/** Advances transaction's command id by 1. */
void AdvanceCommand();

View File

@ -6,26 +6,26 @@
namespace database {
StateDelta StateDelta::TxBegin(tx::transaction_id_t tx_id) {
StateDelta StateDelta::TxBegin(tx::TransactionId tx_id) {
return {StateDelta::Type::TRANSACTION_BEGIN, tx_id};
}
StateDelta StateDelta::TxCommit(tx::transaction_id_t tx_id) {
StateDelta StateDelta::TxCommit(tx::TransactionId tx_id) {
return {StateDelta::Type::TRANSACTION_COMMIT, tx_id};
}
StateDelta StateDelta::TxAbort(tx::transaction_id_t tx_id) {
StateDelta StateDelta::TxAbort(tx::TransactionId tx_id) {
return {StateDelta::Type::TRANSACTION_ABORT, tx_id};
}
StateDelta StateDelta::CreateVertex(tx::transaction_id_t tx_id,
StateDelta StateDelta::CreateVertex(tx::TransactionId tx_id,
gid::Gid vertex_id) {
StateDelta op(StateDelta::Type::CREATE_VERTEX, tx_id);
op.vertex_id = vertex_id;
return op;
}
StateDelta StateDelta::CreateEdge(tx::transaction_id_t tx_id, gid::Gid edge_id,
StateDelta StateDelta::CreateEdge(tx::TransactionId tx_id, gid::Gid edge_id,
gid::Gid vertex_from_id,
gid::Gid vertex_to_id,
storage::EdgeType edge_type,
@ -39,7 +39,7 @@ StateDelta StateDelta::CreateEdge(tx::transaction_id_t tx_id, gid::Gid edge_id,
return op;
}
StateDelta StateDelta::AddOutEdge(tx::transaction_id_t tx_id,
StateDelta StateDelta::AddOutEdge(tx::TransactionId tx_id,
gid::Gid vertex_id,
storage::VertexAddress vertex_to_address,
storage::EdgeAddress edge_address,
@ -54,7 +54,7 @@ StateDelta StateDelta::AddOutEdge(tx::transaction_id_t tx_id,
return op;
}
StateDelta StateDelta::RemoveOutEdge(tx::transaction_id_t tx_id,
StateDelta StateDelta::RemoveOutEdge(tx::TransactionId tx_id,
gid::Gid vertex_id,
storage::EdgeAddress edge_address) {
CHECK(edge_address.is_remote()) << "WAL can only contain global addresses.";
@ -64,7 +64,7 @@ StateDelta StateDelta::RemoveOutEdge(tx::transaction_id_t tx_id,
return op;
}
StateDelta StateDelta::AddInEdge(tx::transaction_id_t tx_id, gid::Gid vertex_id,
StateDelta StateDelta::AddInEdge(tx::TransactionId tx_id, gid::Gid vertex_id,
storage::VertexAddress vertex_from_address,
storage::EdgeAddress edge_address,
storage::EdgeType edge_type) {
@ -78,7 +78,7 @@ StateDelta StateDelta::AddInEdge(tx::transaction_id_t tx_id, gid::Gid vertex_id,
return op;
}
StateDelta StateDelta::RemoveInEdge(tx::transaction_id_t tx_id,
StateDelta StateDelta::RemoveInEdge(tx::TransactionId tx_id,
gid::Gid vertex_id,
storage::EdgeAddress edge_address) {
CHECK(edge_address.is_remote()) << "WAL can only contain global addresses.";
@ -88,7 +88,7 @@ StateDelta StateDelta::RemoveInEdge(tx::transaction_id_t tx_id,
return op;
}
StateDelta StateDelta::PropsSetVertex(tx::transaction_id_t tx_id,
StateDelta StateDelta::PropsSetVertex(tx::TransactionId tx_id,
gid::Gid vertex_id,
storage::Property property,
const std::string &property_name,
@ -101,7 +101,7 @@ StateDelta StateDelta::PropsSetVertex(tx::transaction_id_t tx_id,
return op;
}
StateDelta StateDelta::PropsSetEdge(tx::transaction_id_t tx_id,
StateDelta StateDelta::PropsSetEdge(tx::TransactionId tx_id,
gid::Gid edge_id,
storage::Property property,
const std::string &property_name,
@ -114,7 +114,7 @@ StateDelta StateDelta::PropsSetEdge(tx::transaction_id_t tx_id,
return op;
}
StateDelta StateDelta::AddLabel(tx::transaction_id_t tx_id, gid::Gid vertex_id,
StateDelta StateDelta::AddLabel(tx::TransactionId tx_id, gid::Gid vertex_id,
storage::Label label,
const std::string &label_name) {
StateDelta op(StateDelta::Type::ADD_LABEL, tx_id);
@ -124,7 +124,7 @@ StateDelta StateDelta::AddLabel(tx::transaction_id_t tx_id, gid::Gid vertex_id,
return op;
}
StateDelta StateDelta::RemoveLabel(tx::transaction_id_t tx_id,
StateDelta StateDelta::RemoveLabel(tx::TransactionId tx_id,
gid::Gid vertex_id, storage::Label label,
const std::string &label_name) {
StateDelta op(StateDelta::Type::REMOVE_LABEL, tx_id);
@ -134,7 +134,7 @@ StateDelta StateDelta::RemoveLabel(tx::transaction_id_t tx_id,
return op;
}
StateDelta StateDelta::RemoveVertex(tx::transaction_id_t tx_id,
StateDelta StateDelta::RemoveVertex(tx::TransactionId tx_id,
gid::Gid vertex_id, bool check_empty) {
StateDelta op(StateDelta::Type::REMOVE_VERTEX, tx_id);
op.vertex_id = vertex_id;
@ -142,14 +142,14 @@ StateDelta StateDelta::RemoveVertex(tx::transaction_id_t tx_id,
return op;
}
StateDelta StateDelta::RemoveEdge(tx::transaction_id_t tx_id,
StateDelta StateDelta::RemoveEdge(tx::TransactionId tx_id,
gid::Gid edge_id) {
StateDelta op(StateDelta::Type::REMOVE_EDGE, tx_id);
op.edge_id = edge_id;
return op;
}
StateDelta StateDelta::BuildIndex(tx::transaction_id_t tx_id,
StateDelta StateDelta::BuildIndex(tx::TransactionId tx_id,
storage::Label label,
const std::string &label_name,
storage::Property property,

View File

@ -47,7 +47,7 @@ struct StateDelta {
};
StateDelta() = default;
StateDelta(const enum Type &type, tx::transaction_id_t tx_id)
StateDelta(const enum Type &type, tx::TransactionId tx_id)
: type(type), transaction_id(tx_id) {}
/** Attempts to decode a StateDelta from the given decoder. Returns the
@ -62,47 +62,47 @@ struct StateDelta {
HashedFileWriter &writer,
communication::bolt::PrimitiveEncoder<HashedFileWriter> &encoder) const;
static StateDelta TxBegin(tx::transaction_id_t tx_id);
static StateDelta TxCommit(tx::transaction_id_t tx_id);
static StateDelta TxAbort(tx::transaction_id_t tx_id);
static StateDelta CreateVertex(tx::transaction_id_t tx_id,
static StateDelta TxBegin(tx::TransactionId tx_id);
static StateDelta TxCommit(tx::TransactionId tx_id);
static StateDelta TxAbort(tx::TransactionId tx_id);
static StateDelta CreateVertex(tx::TransactionId tx_id,
gid::Gid vertex_id);
static StateDelta CreateEdge(tx::transaction_id_t tx_id, gid::Gid edge_id,
static StateDelta CreateEdge(tx::TransactionId tx_id, gid::Gid edge_id,
gid::Gid vertex_from_id, gid::Gid vertex_to_id,
storage::EdgeType edge_type,
const std::string &edge_type_name);
static StateDelta AddOutEdge(tx::transaction_id_t tx_id, gid::Gid vertex_id,
static StateDelta AddOutEdge(tx::TransactionId tx_id, gid::Gid vertex_id,
storage::VertexAddress vertex_to_address,
storage::EdgeAddress edge_address,
storage::EdgeType edge_type);
static StateDelta RemoveOutEdge(tx::transaction_id_t tx_id,
static StateDelta RemoveOutEdge(tx::TransactionId tx_id,
gid::Gid vertex_id,
storage::EdgeAddress edge_address);
static StateDelta AddInEdge(tx::transaction_id_t tx_id, gid::Gid vertex_id,
static StateDelta AddInEdge(tx::TransactionId tx_id, gid::Gid vertex_id,
storage::VertexAddress vertex_from_address,
storage::EdgeAddress edge_address,
storage::EdgeType edge_type);
static StateDelta RemoveInEdge(tx::transaction_id_t tx_id, gid::Gid vertex_id,
static StateDelta RemoveInEdge(tx::TransactionId tx_id, gid::Gid vertex_id,
storage::EdgeAddress edge_address);
static StateDelta PropsSetVertex(tx::transaction_id_t tx_id,
static StateDelta PropsSetVertex(tx::TransactionId tx_id,
gid::Gid vertex_id,
storage::Property property,
const std::string &property_name,
const PropertyValue &value);
static StateDelta PropsSetEdge(tx::transaction_id_t tx_id, gid::Gid edge_id,
static StateDelta PropsSetEdge(tx::TransactionId tx_id, gid::Gid edge_id,
storage::Property property,
const std::string &property_name,
const PropertyValue &value);
static StateDelta AddLabel(tx::transaction_id_t tx_id, gid::Gid vertex_id,
static StateDelta AddLabel(tx::TransactionId tx_id, gid::Gid vertex_id,
storage::Label label,
const std::string &label_name);
static StateDelta RemoveLabel(tx::transaction_id_t tx_id, gid::Gid vertex_id,
static StateDelta RemoveLabel(tx::TransactionId tx_id, gid::Gid vertex_id,
storage::Label label,
const std::string &label_name);
static StateDelta RemoveVertex(tx::transaction_id_t tx_id, gid::Gid vertex_id,
static StateDelta RemoveVertex(tx::TransactionId tx_id, gid::Gid vertex_id,
bool check_empty);
static StateDelta RemoveEdge(tx::transaction_id_t tx_id, gid::Gid edge_id);
static StateDelta BuildIndex(tx::transaction_id_t tx_id, storage::Label label,
static StateDelta RemoveEdge(tx::TransactionId tx_id, gid::Gid edge_id);
static StateDelta BuildIndex(tx::TransactionId tx_id, storage::Label label,
const std::string &label_name,
storage::Property property,
const std::string &property_name);
@ -112,7 +112,7 @@ struct StateDelta {
// Members valid for every delta.
enum Type type;
tx::transaction_id_t transaction_id;
tx::TransactionId transaction_id;
// Members valid only for some deltas, see StateDelta::Type comments above.
// TODO: when preparing the WAL for distributed, most likely remove Gids and

View File

@ -111,7 +111,7 @@ class Storage {
LabelPropertyIndex label_property_index_;
// Set of transactions ids which are building indexes currently
SkipList<tx::transaction_id_t> index_build_tx_in_progress_;
SkipList<tx::TransactionId> index_build_tx_in_progress_;
/// Gets the Vertex/Edge main storage map.
template <typename TRecord>

View File

@ -64,7 +64,7 @@ class StorageGc {
StorageGc &operator=(const StorageGc &) = delete;
StorageGc &operator=(StorageGc &&) = delete;
virtual void CollectCommitLogGarbage(tx::transaction_id_t oldest_active) = 0;
virtual void CollectCommitLogGarbage(tx::TransactionId oldest_active) = 0;
void CollectGarbage() {
// main garbage collection logic
@ -129,9 +129,9 @@ class StorageGc {
// alive transaction from the time before the hints were set is still alive
// (otherwise that transaction could still be waiting for a resolution of
// the query to the commit log about some old transaction)
std::experimental::optional<tx::transaction_id_t> GetClogSafeTransaction(
tx::transaction_id_t oldest_active) {
std::experimental::optional<tx::transaction_id_t> safe_to_delete;
std::experimental::optional<tx::TransactionId> GetClogSafeTransaction(
tx::TransactionId oldest_active) {
std::experimental::optional<tx::TransactionId> safe_to_delete;
while (!gc_txid_ranges_.empty() &&
gc_txid_ranges_.front().second < oldest_active) {
safe_to_delete = gc_txid_ranges_.front().first;
@ -150,7 +150,7 @@ class StorageGc {
// History of <oldest active transaction, next transaction to be ran> ranges
// that gc operated on at some previous time - used to clear commit log
std::queue<std::pair<tx::transaction_id_t, tx::transaction_id_t>>
std::queue<std::pair<tx::TransactionId, tx::TransactionId>>
gc_txid_ranges_;
};
} // namespace database

View File

@ -31,7 +31,7 @@ class StorageGcMaster : public StorageGc {
scheduler_.Stop();
}
void CollectCommitLogGarbage(tx::transaction_id_t oldest_active) final {
void CollectCommitLogGarbage(tx::TransactionId oldest_active) final {
// Workers are sending information when it's safe to delete every
// transaction older than oldest_active from their perspective i.e. there
// won't exist another transaction in the future with id larger than or
@ -39,7 +39,7 @@ class StorageGcMaster : public StorageGc {
// the state of transactions which we are deleting.
auto safe_transaction = GetClogSafeTransaction(oldest_active);
if (safe_transaction) {
tx::transaction_id_t min_safe = *safe_transaction;
tx::TransactionId min_safe = *safe_transaction;
{
std::unique_lock<std::mutex> lock(worker_safe_transaction_mutex_);
for (auto worker_id : coordination_.GetWorkerIds()) {
@ -60,7 +60,7 @@ class StorageGcMaster : public StorageGc {
distributed::MasterCoordination &coordination_;
// Mapping of worker ids and oldest active transaction which is safe for
// deletion from worker perspective
std::unordered_map<int, tx::transaction_id_t> worker_safe_transaction_;
std::unordered_map<int, tx::TransactionId> worker_safe_transaction_;
std::mutex worker_safe_transaction_mutex_;
};
} // namespace database

View File

@ -14,7 +14,7 @@ class StorageGcSingleNode : public StorageGc {
scheduler_.Stop();
}
void CollectCommitLogGarbage(tx::transaction_id_t oldest_active) final {
void CollectCommitLogGarbage(tx::TransactionId oldest_active) final {
auto safe_to_delete = GetClogSafeTransaction(oldest_active);
if (safe_to_delete) tx_engine_.GarbageCollectCommitLog(*safe_to_delete);
}

View File

@ -24,7 +24,7 @@ class StorageGcWorker : public StorageGc {
scheduler_.Stop();
}
void CollectCommitLogGarbage(tx::transaction_id_t oldest_active) final {
void CollectCommitLogGarbage(tx::TransactionId oldest_active) final {
// We first need to delete transactions that we can delete to be sure that
// the locks are released as well. Otherwise some new transaction might
// try to acquire a lock which hasn't been released (if the transaction

View File

@ -22,7 +22,7 @@ TRecord *Cache<TRecord>::FindNew(gid::Gid gid) {
}
template <typename TRecord>
void Cache<TRecord>::FindSetOldNew(tx::transaction_id_t tx_id, int worker_id,
void Cache<TRecord>::FindSetOldNew(tx::TransactionId tx_id, int worker_id,
gid::Gid gid, TRecord *&old_record,
TRecord *&new_record) {
{

View File

@ -37,7 +37,7 @@ class Cache {
/// from the given transaction's ID and command ID, and caches it. Sets the
/// given pointers to point to the fetched data. Analogue to
/// mvcc::VersionList::find_set_old_new.
void FindSetOldNew(tx::transaction_id_t tx_id, int worker_id, gid::Gid gid,
void FindSetOldNew(tx::TransactionId tx_id, int worker_id, gid::Gid gid,
TRecord *&old_record, TRecord *&new_record);
/// Sets the given records as (new, old) data for the given gid.

View File

@ -5,7 +5,7 @@ namespace distributed {
template <typename TRecord>
Cache<TRecord> &DataManager::GetCache(CacheT<TRecord> &collection,
tx::transaction_id_t tx_id) {
tx::TransactionId tx_id) {
auto access = collection.access();
auto found = access.find(tx_id);
if (found != access.end()) return found->second;
@ -17,12 +17,12 @@ Cache<TRecord> &DataManager::GetCache(CacheT<TRecord> &collection,
}
template <>
Cache<Vertex> &DataManager::Elements<Vertex>(tx::transaction_id_t tx_id) {
Cache<Vertex> &DataManager::Elements<Vertex>(tx::TransactionId tx_id) {
return GetCache(vertices_caches_, tx_id);
}
template <>
Cache<Edge> &DataManager::Elements<Edge>(tx::transaction_id_t tx_id) {
Cache<Edge> &DataManager::Elements<Edge>(tx::TransactionId tx_id) {
return GetCache(edges_caches_, tx_id);
}
@ -30,12 +30,12 @@ DataManager::DataManager(database::Storage &storage,
distributed::DataRpcClients &data_clients)
: storage_(storage), data_clients_(data_clients) {}
void DataManager::ClearCacheForSingleTransaction(tx::transaction_id_t tx_id) {
void DataManager::ClearCacheForSingleTransaction(tx::TransactionId tx_id) {
Elements<Vertex>(tx_id).ClearCache();
Elements<Edge>(tx_id).ClearCache();
}
void DataManager::ClearTransactionalCache(tx::transaction_id_t oldest_active) {
void DataManager::ClearTransactionalCache(tx::TransactionId oldest_active) {
auto vertex_access = vertices_caches_.access();
for (auto &kv : vertex_access) {
if (kv.first < oldest_active) {

View File

@ -17,12 +17,12 @@ namespace distributed {
/// Handles remote data caches for edges and vertices, per transaction.
class DataManager {
template <typename TRecord>
using CacheT = ConcurrentMap<tx::transaction_id_t, Cache<TRecord>>;
using CacheT = ConcurrentMap<tx::TransactionId, Cache<TRecord>>;
// Helper, gets or inserts a data cache for the given transaction.
template <typename TRecord>
Cache<TRecord> &GetCache(CacheT<TRecord> &collection,
tx::transaction_id_t tx_id);
tx::TransactionId tx_id);
public:
DataManager(database::Storage &storage,
@ -30,14 +30,14 @@ class DataManager {
/// Gets or creates the remote vertex/edge cache for the given transaction.
template <typename TRecord>
Cache<TRecord> &Elements(tx::transaction_id_t tx_id);
Cache<TRecord> &Elements(tx::TransactionId tx_id);
/// Removes all the caches for a single transaction.
void ClearCacheForSingleTransaction(tx::transaction_id_t tx_id);
void ClearCacheForSingleTransaction(tx::TransactionId tx_id);
/// Clears the cache of local transactions that have expired. The signature of
/// this method is dictated by `distributed::TransactionalCacheCleaner`.
void ClearTransactionalCache(tx::transaction_id_t oldest_active);
void ClearTransactionalCache(tx::TransactionId oldest_active);
private:
database::Storage &storage_;

View File

@ -7,7 +7,7 @@ namespace distributed {
template <>
std::unique_ptr<Edge> DataRpcClients::RemoteElement(int worker_id,
tx::transaction_id_t tx_id,
tx::TransactionId tx_id,
gid::Gid gid) {
auto response =
clients_.GetClientPool(worker_id).Call<EdgeRpc>(TxGidPair{tx_id, gid});
@ -17,7 +17,7 @@ std::unique_ptr<Edge> DataRpcClients::RemoteElement(int worker_id,
template <>
std::unique_ptr<Vertex> DataRpcClients::RemoteElement(
int worker_id, tx::transaction_id_t tx_id, gid::Gid gid) {
int worker_id, tx::TransactionId tx_id, gid::Gid gid) {
auto response =
clients_.GetClientPool(worker_id).Call<VertexRpc>(TxGidPair{tx_id, gid});
CHECK(response) << "VertexRpc failed";

View File

@ -18,7 +18,7 @@ class DataRpcClients {
/// must be visible in given transaction.
template <typename TRecord>
std::unique_ptr<TRecord> RemoteElement(int worker_id,
tx::transaction_id_t tx_id,
tx::TransactionId tx_id,
gid::Gid gid);
private:

View File

@ -13,7 +13,7 @@
namespace distributed {
struct TxGidPair {
tx::transaction_id_t tx_id;
tx::TransactionId tx_id;
gid::Gid gid;
private:

View File

@ -6,7 +6,7 @@
namespace distributed {
utils::Future<bool> DurabilityRpcClients::MakeSnapshot(
tx::transaction_id_t tx) {
tx::TransactionId tx) {
return std::async(std::launch::async, [this, tx] {
auto futures = clients_.ExecuteOnWorkers<bool>(
0, [tx](communication::rpc::ClientPool &client_pool) {

View File

@ -19,7 +19,7 @@ class DurabilityRpcClients {
// if all workers sucesfully completed their snapshot creation, false
// otherwise
// @param tx - transaction from which to take db snapshot
utils::Future<bool> MakeSnapshot(tx::transaction_id_t tx);
utils::Future<bool> MakeSnapshot(tx::TransactionId tx);
private:
RpcWorkerClients &clients_;

View File

@ -8,7 +8,7 @@
namespace distributed {
RPC_SINGLE_MEMBER_MESSAGE(MakeSnapshotReq, tx::transaction_id_t);
RPC_SINGLE_MEMBER_MESSAGE(MakeSnapshotReq, tx::TransactionId);
RPC_SINGLE_MEMBER_MESSAGE(MakeSnapshotRes, bool);
using MakeSnapshotRpc =

View File

@ -11,7 +11,7 @@ namespace distributed {
struct IndexLabelPropertyTx {
storage::Label label;
storage::Property property;
tx::transaction_id_t tx_id;
tx::TransactionId tx_id;
private:
friend class boost::serialization::access;

View File

@ -8,7 +8,7 @@
namespace distributed {
ProduceRpcServer::OngoingProduce::OngoingProduce(
database::GraphDb &db, tx::transaction_id_t tx_id,
database::GraphDb &db, tx::TransactionId tx_id,
std::shared_ptr<query::plan::LogicalOperator> op,
query::SymbolTable symbol_table, Parameters parameters,
std::vector<query::Symbol> pull_symbols)
@ -109,7 +109,7 @@ ProduceRpcServer::ProduceRpcServer(
}
void ProduceRpcServer::FinishAndClearOngoingProducePlans(
tx::transaction_id_t tx_id) {
tx::TransactionId tx_id) {
std::lock_guard<std::mutex> guard{ongoing_produces_lock_};
for (auto it = ongoing_produces_.begin(); it != ongoing_produces_.end();) {
if (it->first.first == tx_id) {

View File

@ -32,7 +32,7 @@ class ProduceRpcServer {
/// MG (see query::plan::Synchronize).
class OngoingProduce {
public:
OngoingProduce(database::GraphDb &db, tx::transaction_id_t tx_id,
OngoingProduce(database::GraphDb &db, tx::TransactionId tx_id,
std::shared_ptr<query::plan::LogicalOperator> op,
query::SymbolTable symbol_table, Parameters parameters,
std::vector<query::Symbol> pull_symbols);
@ -66,12 +66,12 @@ class ProduceRpcServer {
/// Finish and clear ongoing produces for all plans that are tied to a
/// transaction with tx_id.
void FinishAndClearOngoingProducePlans(tx::transaction_id_t tx_id);
void FinishAndClearOngoingProducePlans(tx::TransactionId tx_id);
private:
std::mutex ongoing_produces_lock_;
/// Mapping of (tx id, plan id) to OngoingProduce.
std::map<std::pair<tx::transaction_id_t, int64_t>, OngoingProduce>
std::map<std::pair<tx::TransactionId, int64_t>, OngoingProduce>
ongoing_produces_;
database::GraphDb &db_;
communication::rpc::Server &produce_rpc_server_;

View File

@ -37,7 +37,7 @@ enum class PullState {
struct PullReq : public communication::rpc::Message {
PullReq() {}
PullReq(tx::transaction_id_t tx_id, tx::Snapshot tx_snapshot, int64_t plan_id,
PullReq(tx::TransactionId tx_id, tx::Snapshot tx_snapshot, int64_t plan_id,
const Parameters &params, std::vector<query::Symbol> symbols,
bool accumulate, int batch_size, bool send_old, bool send_new)
: tx_id(tx_id),
@ -50,7 +50,7 @@ struct PullReq : public communication::rpc::Message {
send_old(send_old),
send_new(send_new) {}
tx::transaction_id_t tx_id;
tx::TransactionId tx_id;
tx::Snapshot tx_snapshot;
int64_t plan_id;
Parameters params;
@ -367,7 +367,7 @@ using PullRpc = communication::rpc::RequestResponse<PullReq, PullRes>;
// optimization not to have to send the full PullReqData pack every
// time.
RPC_SINGLE_MEMBER_MESSAGE(TransactionCommandAdvancedReq, tx::transaction_id_t);
RPC_SINGLE_MEMBER_MESSAGE(TransactionCommandAdvancedReq, tx::TransactionId);
RPC_NO_MEMBER_MESSAGE(TransactionCommandAdvancedRes);
using TransactionCommandAdvancedRpc =
communication::rpc::RequestResponse<TransactionCommandAdvancedReq,

View File

@ -62,7 +62,7 @@ utils::Future<PullData> PullRpcClients::Pull(
std::vector<utils::Future<void>>
PullRpcClients::NotifyAllTransactionCommandAdvanced(
tx::transaction_id_t tx_id) {
tx::TransactionId tx_id) {
return clients_.ExecuteOnWorkers<void>(0, [tx_id](auto &client) {
auto res = client.template Call<TransactionCommandAdvancedRpc>(tx_id);
CHECK(res) << "TransactionCommandAdvanceRpc failed";

View File

@ -38,7 +38,7 @@ class PullRpcClients {
auto GetWorkerIds() { return clients_.GetWorkerIds(); }
std::vector<utils::Future<void>> NotifyAllTransactionCommandAdvanced(
tx::transaction_id_t tx_id);
tx::TransactionId tx_id);
private:
RpcWorkerClients &clients_;

View File

@ -84,7 +84,7 @@ class IndexRpcClients {
auto GetBuildIndexFutures(const storage::Label &label,
const storage::Property &property,
tx::transaction_id_t transaction_id,
tx::TransactionId transaction_id,
int worker_id) {
return clients_.ExecuteOnWorkers<bool>(
worker_id, [label, property, transaction_id](
@ -109,7 +109,7 @@ class OngoingProduceJoinerRpcClients {
OngoingProduceJoinerRpcClients(RpcWorkerClients &clients)
: clients_(clients) {}
void JoinOngoingProduces(tx::transaction_id_t tx_id) {
void JoinOngoingProduces(tx::TransactionId tx_id) {
auto futures = clients_.ExecuteOnWorkers<void>(
0, [tx_id](communication::rpc::ClientPool &client_pool) {
auto result =

View File

@ -14,10 +14,10 @@ using Endpoint = io::network::Endpoint;
struct GcClearedStatusReq : public Message {
GcClearedStatusReq() {}
GcClearedStatusReq(tx::transaction_id_t local_oldest_active, int worker_id)
GcClearedStatusReq(tx::TransactionId local_oldest_active, int worker_id)
: local_oldest_active(local_oldest_active), worker_id(worker_id) {}
tx::transaction_id_t local_oldest_active;
tx::TransactionId local_oldest_active;
int worker_id;
private:

View File

@ -31,13 +31,13 @@ class TransactionalCacheCleaner {
protected:
/// Registers the given object for transactional cleaning. The object will
/// periodically get it's `ClearCache(tx::transaction_id_t)` method called
/// periodically get it's `ClearCache(tx::TransactionId)` method called
/// with the oldest active transaction id. Note that the ONLY guarantee for
/// the call param is that there are no transactions alive that have an id
/// lower than it.
template <typename TCache>
void Register(TCache &cache) {
functions_.emplace_back([&cache](tx::transaction_id_t oldest_active) {
functions_.emplace_back([&cache](tx::TransactionId oldest_active) {
cache.ClearTransactionalCache(oldest_active);
});
}
@ -49,12 +49,12 @@ class TransactionalCacheCleaner {
Register(caches...);
}
void Clear(tx::transaction_id_t oldest_active) {
void Clear(tx::TransactionId oldest_active) {
for (auto &f : functions_) f(oldest_active);
}
tx::Engine &tx_engine_;
std::vector<std::function<void(tx::transaction_id_t &oldest_active)>>
std::vector<std::function<void(tx::TransactionId &oldest_active)>>
functions_;
Scheduler cache_clearing_scheduler_;
};

View File

@ -5,7 +5,7 @@
namespace distributed {
RPC_SINGLE_MEMBER_MESSAGE(WaitOnTransactionEndReq, tx::transaction_id_t);
RPC_SINGLE_MEMBER_MESSAGE(WaitOnTransactionEndReq, tx::TransactionId);
RPC_NO_MEMBER_MESSAGE(WaitOnTransactionEndRes);
using WaitOnTransactionEndRpc =
communication::rpc::RequestResponse<WaitOnTransactionEndReq,

View File

@ -33,7 +33,7 @@ UpdateResult UpdatesRpcClients::Update(int worker_id,
}
gid::Gid UpdatesRpcClients::CreateVertex(
int worker_id, tx::transaction_id_t tx_id,
int worker_id, tx::TransactionId tx_id,
const std::vector<storage::Label> &labels,
const std::unordered_map<storage::Property, query::TypedValue>
&properties) {
@ -46,7 +46,7 @@ gid::Gid UpdatesRpcClients::CreateVertex(
}
storage::EdgeAddress UpdatesRpcClients::CreateEdge(
tx::transaction_id_t tx_id, VertexAccessor &from, VertexAccessor &to,
tx::TransactionId tx_id, VertexAccessor &from, VertexAccessor &to,
storage::EdgeType edge_type) {
CHECK(from.address().is_remote()) << "In CreateEdge `from` must be remote";
@ -59,7 +59,7 @@ storage::EdgeAddress UpdatesRpcClients::CreateEdge(
return {res->member.gid, from_worker};
}
void UpdatesRpcClients::AddInEdge(tx::transaction_id_t tx_id,
void UpdatesRpcClients::AddInEdge(tx::TransactionId tx_id,
VertexAccessor &from,
storage::EdgeAddress edge_address,
VertexAccessor &to,
@ -76,7 +76,7 @@ void UpdatesRpcClients::AddInEdge(tx::transaction_id_t tx_id,
RaiseIfRemoteError(res->member);
}
void UpdatesRpcClients::RemoveVertex(int worker_id, tx::transaction_id_t tx_id,
void UpdatesRpcClients::RemoveVertex(int worker_id, tx::TransactionId tx_id,
gid::Gid gid, bool check_empty) {
auto res = worker_clients_.GetClientPool(worker_id).Call<RemoveVertexRpc>(
RemoveVertexReqData{gid, tx_id, check_empty});
@ -84,7 +84,7 @@ void UpdatesRpcClients::RemoveVertex(int worker_id, tx::transaction_id_t tx_id,
RaiseIfRemoteError(res->member);
}
void UpdatesRpcClients::RemoveEdge(tx::transaction_id_t tx_id, int worker_id,
void UpdatesRpcClients::RemoveEdge(tx::TransactionId tx_id, int worker_id,
gid::Gid edge_gid, gid::Gid vertex_from_id,
storage::VertexAddress vertex_to_addr) {
auto res = worker_clients_.GetClientPool(worker_id).Call<RemoveEdgeRpc>(
@ -93,7 +93,7 @@ void UpdatesRpcClients::RemoveEdge(tx::transaction_id_t tx_id, int worker_id,
RaiseIfRemoteError(res->member);
}
void UpdatesRpcClients::RemoveInEdge(tx::transaction_id_t tx_id, int worker_id,
void UpdatesRpcClients::RemoveInEdge(tx::TransactionId tx_id, int worker_id,
gid::Gid vertex_id,
storage::EdgeAddress edge_address) {
CHECK(edge_address.is_remote()) << "RemoveInEdge edge_address is local.";
@ -104,7 +104,7 @@ void UpdatesRpcClients::RemoveInEdge(tx::transaction_id_t tx_id, int worker_id,
}
std::vector<utils::Future<UpdateResult>> UpdatesRpcClients::UpdateApplyAll(
int skip_worker_id, tx::transaction_id_t tx_id) {
int skip_worker_id, tx::TransactionId tx_id) {
return worker_clients_.ExecuteOnWorkers<UpdateResult>(
skip_worker_id, [tx_id](auto &client) {
auto res = client.template Call<UpdateApplyRpc>(tx_id);

View File

@ -28,7 +28,7 @@ class UpdatesRpcClients {
/// Creates a vertex on the given worker and returns it's id.
gid::Gid CreateVertex(
int worker_id, tx::transaction_id_t tx_id,
int worker_id, tx::TransactionId tx_id,
const std::vector<storage::Label> &labels,
const std::unordered_map<storage::Property, query::TypedValue>
&properties);
@ -38,18 +38,18 @@ class UpdatesRpcClients {
/// handled by a call to this function. Otherwise a separate call to
/// `AddInEdge` might be necessary. Throws all the exceptions that can
/// occur remotely as a result of updating a vertex.
storage::EdgeAddress CreateEdge(tx::transaction_id_t tx_id,
storage::EdgeAddress CreateEdge(tx::TransactionId tx_id,
VertexAccessor &from, VertexAccessor &to,
storage::EdgeType edge_type);
/// Adds the edge with the given address to the `to` vertex as an incoming
/// edge. Only used when `to` is remote and not on the same worker as `from`.
void AddInEdge(tx::transaction_id_t tx_id, VertexAccessor &from,
void AddInEdge(tx::TransactionId tx_id, VertexAccessor &from,
storage::EdgeAddress edge_address, VertexAccessor &to,
storage::EdgeType edge_type);
/// Removes a vertex from the other worker.
void RemoveVertex(int worker_id, tx::transaction_id_t tx_id, gid::Gid gid,
void RemoveVertex(int worker_id, tx::TransactionId tx_id, gid::Gid gid,
bool check_empty);
/// Removes an edge on another worker. This also handles the `from` vertex
@ -57,17 +57,17 @@ class UpdatesRpcClients {
/// `to` vertex is on the same worker, then that side is handled too by the
/// single RPC call, otherwise a separate call has to be made to
/// RemoveInEdge.
void RemoveEdge(tx::transaction_id_t tx_id, int worker_id, gid::Gid edge_gid,
void RemoveEdge(tx::TransactionId tx_id, int worker_id, gid::Gid edge_gid,
gid::Gid vertex_from_id,
storage::VertexAddress vertex_to_addr);
void RemoveInEdge(tx::transaction_id_t tx_id, int worker_id,
void RemoveInEdge(tx::TransactionId tx_id, int worker_id,
gid::Gid vertex_id, storage::EdgeAddress edge_address);
/// Calls for all the workers (except the given one) to apply their updates
/// and returns the future results.
std::vector<utils::Future<UpdateResult>> UpdateApplyAll(
int skip_worker_id, tx::transaction_id_t tx_id);
int skip_worker_id, tx::TransactionId tx_id);
private:
RpcWorkerClients &worker_clients_;

View File

@ -26,7 +26,7 @@ RPC_SINGLE_MEMBER_MESSAGE(UpdateReq, database::StateDelta);
RPC_SINGLE_MEMBER_MESSAGE(UpdateRes, UpdateResult);
using UpdateRpc = communication::rpc::RequestResponse<UpdateReq, UpdateRes>;
RPC_SINGLE_MEMBER_MESSAGE(UpdateApplyReq, tx::transaction_id_t);
RPC_SINGLE_MEMBER_MESSAGE(UpdateApplyReq, tx::TransactionId);
RPC_SINGLE_MEMBER_MESSAGE(UpdateApplyRes, UpdateResult);
using UpdateApplyRpc =
communication::rpc::RequestResponse<UpdateApplyReq, UpdateApplyRes>;
@ -47,7 +47,7 @@ struct CreateResult {
};
struct CreateVertexReqData {
tx::transaction_id_t tx_id;
tx::TransactionId tx_id;
std::vector<storage::Label> labels;
std::unordered_map<storage::Property, query::TypedValue> properties;
@ -91,7 +91,7 @@ struct CreateEdgeReqData {
gid::Gid from;
storage::VertexAddress to;
storage::EdgeType edge_type;
tx::transaction_id_t tx_id;
tx::TransactionId tx_id;
private:
friend class boost::serialization::access;
@ -115,7 +115,7 @@ struct AddInEdgeReqData {
storage::EdgeAddress edge_address;
gid::Gid to;
storage::EdgeType edge_type;
tx::transaction_id_t tx_id;
tx::TransactionId tx_id;
private:
friend class boost::serialization::access;
@ -137,7 +137,7 @@ using AddInEdgeRpc =
struct RemoveVertexReqData {
gid::Gid gid;
tx::transaction_id_t tx_id;
tx::TransactionId tx_id;
bool check_empty;
private:
@ -157,7 +157,7 @@ using RemoveVertexRpc =
communication::rpc::RequestResponse<RemoveVertexReq, RemoveVertexRes>;
struct RemoveEdgeData {
tx::transaction_id_t tx_id;
tx::TransactionId tx_id;
gid::Gid edge_id;
gid::Gid vertex_from_id;
storage::VertexAddress vertex_to_address;
@ -180,7 +180,7 @@ using RemoveEdgeRpc =
communication::rpc::RequestResponse<RemoveEdgeReq, RemoveEdgeRes>;
struct RemoveInEdgeData {
tx::transaction_id_t tx_id;
tx::TransactionId tx_id;
gid::Gid vertex;
storage::EdgeAddress edge_address;

View File

@ -253,7 +253,7 @@ UpdatesRpcServer::UpdatesRpcServer(database::GraphDb &db,
});
}
UpdateResult UpdatesRpcServer::Apply(tx::transaction_id_t tx_id) {
UpdateResult UpdatesRpcServer::Apply(tx::TransactionId tx_id) {
auto apply = [tx_id](auto &collection) {
auto access = collection.access();
auto found = access.find(tx_id);
@ -273,7 +273,7 @@ UpdateResult UpdatesRpcServer::Apply(tx::transaction_id_t tx_id) {
}
void UpdatesRpcServer::ClearTransactionalCache(
tx::transaction_id_t oldest_active) {
tx::TransactionId oldest_active) {
auto vertex_access = vertex_updates_.access();
for (auto &kv : vertex_access) {
if (kv.first < oldest_active) {
@ -291,7 +291,7 @@ void UpdatesRpcServer::ClearTransactionalCache(
// Gets/creates the TransactionUpdates for the given transaction.
template <typename TAccessor>
UpdatesRpcServer::TransactionUpdates<TAccessor> &UpdatesRpcServer::GetUpdates(
MapT<TAccessor> &updates, tx::transaction_id_t tx_id) {
MapT<TAccessor> &updates, tx::TransactionId tx_id) {
return updates.access()
.emplace(tx_id, std::make_tuple(tx_id),
std::make_tuple(std::ref(db_), tx_id))

View File

@ -32,7 +32,7 @@ class UpdatesRpcServer {
template <typename TRecordAccessor>
class TransactionUpdates {
public:
TransactionUpdates(database::GraphDb &db, tx::transaction_id_t tx_id)
TransactionUpdates(database::GraphDb &db, tx::TransactionId tx_id)
: db_accessor_(db, tx_id) {}
/// Adds a delta and returns the result. Does not modify the state (data) of
@ -74,25 +74,25 @@ class UpdatesRpcServer {
/// Applies all existsing updates for the given transaction ID. If there are
/// no updates for that transaction, nothing happens. Clears the updates cache
/// after applying them, regardless of the result.
UpdateResult Apply(tx::transaction_id_t tx_id);
UpdateResult Apply(tx::TransactionId tx_id);
/// Clears the cache of local transactions that are completed. The signature
/// of this method is dictated by `distributed::TransactionalCacheCleaner`.
void ClearTransactionalCache(tx::transaction_id_t oldest_active);
void ClearTransactionalCache(tx::TransactionId oldest_active);
private:
database::GraphDb &db_;
template <typename TAccessor>
using MapT =
ConcurrentMap<tx::transaction_id_t, TransactionUpdates<TAccessor>>;
ConcurrentMap<tx::TransactionId, TransactionUpdates<TAccessor>>;
MapT<VertexAccessor> vertex_updates_;
MapT<EdgeAccessor> edge_updates_;
// Gets/creates the TransactionUpdates for the given transaction.
template <typename TAccessor>
TransactionUpdates<TAccessor> &GetUpdates(MapT<TAccessor> &updates,
tx::transaction_id_t tx_id);
tx::TransactionId tx_id);
// Performs edge creation for the given request.
CreateResult CreateEdge(const CreateEdgeReqData &req);

View File

@ -34,7 +34,7 @@ void CheckDurabilityDir(const std::string &durability_dir) {
}
}
std::experimental::optional<tx::transaction_id_t> TransactionIdFromWalFilename(
std::experimental::optional<tx::TransactionId> TransactionIdFromWalFilename(
const std::string &name) {
auto nullopt = std::experimental::nullopt;
// Get the max_transaction_id from the file name that has format
@ -45,7 +45,7 @@ std::experimental::optional<tx::transaction_id_t> TransactionIdFromWalFilename(
return nullopt;
}
if (utils::StartsWith(file_name_split[1], "current"))
return std::numeric_limits<tx::transaction_id_t>::max();
return std::numeric_limits<tx::TransactionId>::max();
file_name_split = utils::Split(file_name_split[1], "_");
if (file_name_split.size() != 5) {
LOG(WARNING) << "Unable to parse WAL file name: " << name;
@ -64,7 +64,7 @@ std::experimental::optional<tx::transaction_id_t> TransactionIdFromWalFilename(
}
fs::path MakeSnapshotPath(const fs::path &durability_dir, const int worker_id,
tx::transaction_id_t tx_id) {
tx::TransactionId tx_id) {
std::string date_str =
Timestamp(Timestamp::Now())
.ToString("{:04d}_{:02d}_{:02d}__{:02d}_{:02d}_{:02d}_{:05d}");
@ -78,7 +78,7 @@ fs::path MakeSnapshotPath(const fs::path &durability_dir, const int worker_id,
/// WAL file for which the max tx id is still unknown.
fs::path WalFilenameForTransactionId(
const std::experimental::filesystem::path &wal_dir, int worker_id,
std::experimental::optional<tx::transaction_id_t> tx_id) {
std::experimental::optional<tx::TransactionId> tx_id) {
auto file_name = Timestamp::Now().ToIso8601();
if (tx_id) {
file_name += "__max_transaction_" + std::to_string(*tx_id);
@ -89,7 +89,7 @@ fs::path WalFilenameForTransactionId(
return wal_dir / file_name;
}
std::experimental::optional<tx::transaction_id_t>
std::experimental::optional<tx::TransactionId>
TransactionIdFromSnapshotFilename(const std::string &name) {
auto nullopt = std::experimental::nullopt;
auto file_name_split = utils::RSplit(name, "_tx_", 1);

View File

@ -22,7 +22,7 @@ void CheckDurabilityDir(const std::string &durability_dir);
/// is returned because that's appropriate for the recovery logic (the current
/// WAL does not yet have a maximum transaction ID and can't be discarded by
/// the recovery regardless of the snapshot from which the transaction starts).
std::experimental::optional<tx::transaction_id_t> TransactionIdFromWalFilename(
std::experimental::optional<tx::TransactionId> TransactionIdFromWalFilename(
const std::string &name);
/** Generates a path for a DB snapshot in the given folder in a well-defined
@ -30,11 +30,11 @@ std::experimental::optional<tx::transaction_id_t> TransactionIdFromWalFilename(
* created appended to the file name. */
std::experimental::filesystem::path MakeSnapshotPath(
const std::experimental::filesystem::path &durability_dir, int worker_id,
tx::transaction_id_t tx_id);
tx::TransactionId tx_id);
/// Returns the transaction id contained in the file name. If the filename is
/// not a parseable WAL file name, nullopt is returned.
std::experimental::optional<tx::transaction_id_t>
std::experimental::optional<tx::TransactionId>
TransactionIdFromSnapshotFilename(const std::string &name);
/// Generates a file path for a write-ahead log file of a specified worker. If
@ -42,6 +42,6 @@ TransactionIdFromSnapshotFilename(const std::string &name);
/// path is for the "current" WAL file for which the max tx id is still unknown.
std::experimental::filesystem::path WalFilenameForTransactionId(
const std::experimental::filesystem::path &wal_dir, int worker_id,
std::experimental::optional<tx::transaction_id_t> tx_id =
std::experimental::optional<tx::TransactionId> tx_id =
std::experimental::nullopt);
} // namespace durability

View File

@ -36,9 +36,9 @@ using communication::bolt::DecodedValue;
// A data structure for exchanging info between main recovery function and
// snapshot and WAL recovery functions.
struct RecoveryData {
tx::transaction_id_t snapshooter_tx_id{0};
tx::transaction_id_t wal_max_recovered_tx_id{0};
std::vector<tx::transaction_id_t> snapshooter_tx_snapshot;
tx::TransactionId snapshooter_tx_id{0};
tx::TransactionId wal_max_recovered_tx_id{0};
std::vector<tx::TransactionId> snapshooter_tx_snapshot;
// A collection into which the indexes should be added so they
// can be rebuilt at the end of the recovery transaction.
std::vector<std::pair<std::string, std::string>> indexes;
@ -231,7 +231,7 @@ bool RecoverSnapshot(const fs::path &snapshot_file, database::GraphDb &db,
// than the latest one we have recovered. Do this to make sure that
// subsequently created snapshots and WAL files will have transactional info
// that does not interfere with that found in previous snapshots and WAL.
tx::transaction_id_t max_id = recovery_data.snapshooter_tx_id;
tx::TransactionId max_id = recovery_data.snapshooter_tx_id;
auto &snap = recovery_data.snapshooter_tx_snapshot;
if (!snap.empty()) max_id = *std::max_element(snap.begin(), snap.end());
dba.db().tx_engine().EnsureNextIdGreater(max_id);
@ -257,15 +257,15 @@ bool RecoverWal(const fs::path &wal_dir, database::GraphDb &db,
auto first_to_recover = tx_sn.empty() ? recovery_data.snapshooter_tx_id + 1
: *std::min(tx_sn.begin(), tx_sn.end());
auto should_skip = [&tx_sn, &recovery_data,
first_to_recover](tx::transaction_id_t tx_id) {
first_to_recover](tx::TransactionId tx_id) {
return tx_id < first_to_recover ||
(tx_id < recovery_data.snapshooter_tx_id &&
!utils::Contains(tx_sn, tx_id));
};
std::unordered_map<tx::transaction_id_t, database::GraphDbAccessor> accessors;
std::unordered_map<tx::TransactionId, database::GraphDbAccessor> accessors;
auto get_accessor =
[&accessors](tx::transaction_id_t tx_id) -> database::GraphDbAccessor & {
[&accessors](tx::TransactionId tx_id) -> database::GraphDbAccessor & {
auto found = accessors.find(tx_id);
CHECK(found != accessors.end())
<< "Accessor does not exist for transaction: " << tx_id;
@ -276,7 +276,7 @@ bool RecoverWal(const fs::path &wal_dir, database::GraphDb &db,
// then the latest one we have recovered. Do this to make sure that
// subsequently created snapshots and WAL files will have transactional info
// that does not interfere with that found in previous snapshots and WAL.
tx::transaction_id_t max_observed_tx_id{0};
tx::TransactionId max_observed_tx_id{0};
// Read all the WAL files whose max_tx_id is not smaller than
// min_tx_to_recover.

View File

@ -16,11 +16,11 @@ namespace durability {
/// Stores info on what was (or needs to be) recovered from durability.
struct RecoveryInfo {
RecoveryInfo() {}
RecoveryInfo(tx::transaction_id_t snapshot_tx_id,
tx::transaction_id_t max_wal_tx_id)
RecoveryInfo(tx::TransactionId snapshot_tx_id,
tx::TransactionId max_wal_tx_id)
: snapshot_tx_id(snapshot_tx_id), max_wal_tx_id(max_wal_tx_id) {}
tx::transaction_id_t snapshot_tx_id;
tx::transaction_id_t max_wal_tx_id;
tx::TransactionId snapshot_tx_id;
tx::TransactionId max_wal_tx_id;
bool operator==(const RecoveryInfo &other) const {
return snapshot_tx_id == other.snapshot_tx_id &&

View File

@ -75,7 +75,7 @@ class WriteAheadLog {
// The latest transaction whose delta is recorded in the current WAL file.
// Zero indicates that no deltas have so far been written to the current WAL
// file.
tx::transaction_id_t latest_tx_{0};
tx::TransactionId latest_tx_{0};
void RotateFile();
};

View File

@ -31,8 +31,8 @@ class Record : public Version<T> {
// again. i know, it happened to me.
// fetch expiration info in a safe way (see fetch_exp for details)
tx::transaction_id_t tx_exp;
tx::command_id_t cmd_exp;
tx::TransactionId tx_exp;
tx::CommandId cmd_exp;
std::tie(tx_exp, cmd_exp) = fetch_exp();
return ((tx_.cre == t.id_ && // inserted by the current transaction
@ -109,8 +109,8 @@ class Record : public Version<T> {
// queries which can match, update and return in the same query
bool is_visible_write(const tx::Transaction &t) {
// fetch expiration info in a safe way (see fetch_exp for details)
tx::transaction_id_t tx_exp;
tx::command_id_t cmd_exp;
tx::TransactionId tx_exp;
tx::CommandId cmd_exp;
std::tie(tx_exp, cmd_exp) = fetch_exp();
return (tx_.cre == t.id_ && // inserted by the current transaction
@ -150,7 +150,7 @@ class Record : public Version<T> {
// Exp is aborted and we can't set the hint, this way we don't have to set
// the hint because an aborted transaction which expires a record is the
// same thing as a non-expired record
tx::transaction_id_t expected;
tx::TransactionId expected;
do {
expected = tx_.exp;
// If the transaction expiry is no longer aborted we don't need to
@ -208,13 +208,13 @@ class Record : public Version<T> {
// and tx.exp is the id of the transaction that deleted the record
// These values are used to determine the visibility of the record
// to the current transaction.
CreExp<tx::transaction_id_t> tx_;
CreExp<tx::TransactionId> tx_;
// cmd.cre is the id of the command in this transaction that created the
// record and cmd.exp is the id of the command in this transaction that
// deleted the record. These values are used to determine the visibility
// of the record to the current command in the running transaction.
CreExp<tx::command_id_t> cmd_;
CreExp<tx::CommandId> cmd_;
mutable Hints hints_;
/** Fetch the (transaction, command) expiration before the check
@ -222,8 +222,8 @@ class Record : public Version<T> {
* Do it in a loop to ensure that command is consistent with transaction.
*/
auto fetch_exp() const {
tx::transaction_id_t tx_exp;
tx::command_id_t cmd_exp;
tx::TransactionId tx_exp;
tx::CommandId cmd_exp;
do {
tx_exp = tx_.exp;
cmd_exp = cmd_.exp;
@ -274,7 +274,7 @@ class Record : public Version<T> {
* @param id - id to check if it's commited and visible
* @return true if the id is commited and visible for the transaction t.
*/
bool visible_from(uint8_t mask, tx::transaction_id_t id,
bool visible_from(uint8_t mask, tx::TransactionId id,
const tx::Transaction &t) {
DCHECK(mask == Hints::kCre || mask == Hints::kExp)
<< "Mask must be either kCre or kExp";

View File

@ -22,8 +22,8 @@ class DeferredDeleter {
*/
struct DeletedObject {
const T *object;
const tx::transaction_id_t deleted_at;
DeletedObject(const T *object, tx::transaction_id_t deleted_at)
const tx::TransactionId deleted_at;
DeletedObject(const T *object, tx::TransactionId deleted_at)
: object(object), deleted_at(deleted_at) {}
};
@ -44,7 +44,7 @@ class DeferredDeleter {
*/
void AddObjects(const std::vector<DeletedObject> &objects) {
auto previous_tx_id = objects_.empty()
? std::numeric_limits<tx::transaction_id_t>::min()
? std::numeric_limits<tx::TransactionId>::min()
: objects_.back().deleted_at;
for (auto object : objects) {
CHECK(previous_tx_id <= object.deleted_at)
@ -58,7 +58,7 @@ class DeferredDeleter {
* @brief - Free memory of objects deleted before the id.
* @param id - delete before this id
*/
void FreeExpiredObjects(tx::transaction_id_t id) {
void FreeExpiredObjects(tx::TransactionId id) {
auto it = objects_.begin();
while (it != objects_.end() && it->deleted_at < id) {
delete it->object;

View File

@ -17,10 +17,10 @@ namespace {
// transaction in that cycle. If start transaction is not in a cycle nullopt is
// returned.
template <typename TAccessor>
std::experimental::optional<tx::transaction_id_t> FindOldestTxInLockCycle(
tx::transaction_id_t start, TAccessor &graph_accessor) {
std::vector<tx::transaction_id_t> path;
std::unordered_set<tx::transaction_id_t> visited;
std::experimental::optional<tx::TransactionId> FindOldestTxInLockCycle(
tx::TransactionId start, TAccessor &graph_accessor) {
std::vector<tx::TransactionId> path;
std::unordered_set<tx::TransactionId> visited;
auto current = start;
@ -45,8 +45,8 @@ std::experimental::optional<tx::transaction_id_t> FindOldestTxInLockCycle(
} // namespace
bool RecordLock::TryLock(tx::transaction_id_t tx_id) {
tx::transaction_id_t unlocked{0};
bool RecordLock::TryLock(tx::TransactionId tx_id) {
tx::TransactionId unlocked{0};
return owner_.compare_exchange_strong(unlocked, tx_id);
}
@ -55,7 +55,7 @@ LockStatus RecordLock::Lock(const tx::Transaction &tx, tx::Engine &engine) {
return LockStatus::Acquired;
}
tx::transaction_id_t owner = owner_;
tx::TransactionId owner = owner_;
if (owner_ == tx.id_) return LockStatus::AlreadyHeld;
// In a distributed worker the transaction objects (and the locks they own)

View File

@ -21,11 +21,11 @@ class RecordLock {
void Unlock();
private:
bool TryLock(tx::transaction_id_t tx_id);
bool TryLock(tx::TransactionId tx_id);
// Arbitrary choosen constant, postgresql uses 1 second so do we.
constexpr static std::chrono::duration<double> kTimeout{
std::chrono::seconds(1)};
std::atomic<tx::transaction_id_t> owner_{0};
std::atomic<tx::TransactionId> owner_{0};
};

View File

@ -18,25 +18,25 @@ class CommitLog {
CommitLog &operator=(const CommitLog &) = delete;
CommitLog &operator=(CommitLog &&) = delete;
bool is_active(transaction_id_t id) const {
bool is_active(TransactionId id) const {
return fetch_info(id).is_active();
}
bool is_committed(transaction_id_t id) const {
bool is_committed(TransactionId id) const {
return fetch_info(id).is_committed();
}
void set_committed(transaction_id_t id) { log.set(2 * id); }
void set_committed(TransactionId id) { log.set(2 * id); }
bool is_aborted(transaction_id_t id) const {
bool is_aborted(TransactionId id) const {
return fetch_info(id).is_aborted();
}
void set_aborted(transaction_id_t id) { log.set(2 * id + 1); }
void set_aborted(TransactionId id) { log.set(2 * id + 1); }
// Clears the commit log from bits associated with transactions with an id
// lower than `id`.
void garbage_collect_older(transaction_id_t id) { log.delete_prefix(2 * id); }
void garbage_collect_older(TransactionId id) { log.delete_prefix(2 * id); }
class Info {
public:
@ -68,7 +68,7 @@ class CommitLog {
uint8_t flags_{0};
};
Info fetch_info(transaction_id_t id) const { return Info{log.at(2 * id, 2)}; }
Info fetch_info(TransactionId id) const { return Info{log.at(2 * id, 2)}; }
private:
DynamicBitset<uint8_t, kBitsetBlockSize> log;

View File

@ -31,10 +31,10 @@ class Engine {
virtual Transaction *Begin() = 0;
/// Advances the command on the transaction with the given id.
virtual command_id_t Advance(transaction_id_t id) = 0;
virtual CommandId Advance(TransactionId id) = 0;
/// Updates the command on the workers to the master's value.
virtual command_id_t UpdateCommand(transaction_id_t id) = 0;
virtual CommandId UpdateCommand(TransactionId id) = 0;
/// Comits the given transaction. Deletes the transaction object, it's not
/// valid after this function executes.
@ -45,7 +45,7 @@ class Engine {
virtual void Abort(const Transaction &t) = 0;
/** Returns the commit log Info about the given transaction. */
virtual CommitLog::Info Info(transaction_id_t tx) const = 0;
virtual CommitLog::Info Info(TransactionId tx) const = 0;
/** Returns the snapshot relevant to garbage collection of database records.
*
@ -69,29 +69,29 @@ class Engine {
virtual Snapshot GlobalActiveTransactions() = 0;
/** Returns the ID the last globally known transaction. */
virtual tx::transaction_id_t GlobalLast() const = 0;
virtual tx::TransactionId GlobalLast() const = 0;
/** Returns the ID of last locally known transaction. */
virtual tx::transaction_id_t LocalLast() const = 0;
virtual tx::TransactionId LocalLast() const = 0;
/** Returns the ID of the oldest transaction locally known to be active. It is
* guaranteed that all the transactions older than the returned are globally
* not active. */
virtual transaction_id_t LocalOldestActive() const = 0;
virtual TransactionId LocalOldestActive() const = 0;
/** Calls function f on each locally active transaction. */
virtual void LocalForEachActiveTransaction(
std::function<void(Transaction &)> f) = 0;
/** Gets a transaction object for a running transaction. */
virtual tx::Transaction *RunningTransaction(transaction_id_t tx_id) = 0;
virtual tx::Transaction *RunningTransaction(TransactionId tx_id) = 0;
/** Ensures the next transaction that starts will have the ID greater than
* the given id. */
virtual void EnsureNextIdGreater(transaction_id_t tx_id) = 0;
virtual void EnsureNextIdGreater(TransactionId tx_id) = 0;
/** Garbage collects transactions older than tx_id from commit log. */
virtual void GarbageCollectCommitLog(transaction_id_t tx_id) = 0;
virtual void GarbageCollectCommitLog(TransactionId tx_id) = 0;
auto &local_lock_graph() { return local_lock_graph_; }
const auto &local_lock_graph() const { return local_lock_graph_; }
@ -100,6 +100,6 @@ class Engine {
// Map lock dependencies. Each entry maps (tx_that_wants_lock,
// tx_that_holds_lock). Used for local deadlock resolution.
// TODO consider global deadlock resolution.
ConcurrentMap<transaction_id_t, transaction_id_t> local_lock_graph_;
ConcurrentMap<TransactionId, TransactionId> local_lock_graph_;
};
} // namespace tx

View File

@ -9,7 +9,7 @@ namespace tx {
RPC_NO_MEMBER_MESSAGE(BeginReq);
struct TxAndSnapshot {
transaction_id_t tx_id;
TransactionId tx_id;
Snapshot snapshot;
private:
@ -23,32 +23,32 @@ struct TxAndSnapshot {
RPC_SINGLE_MEMBER_MESSAGE(BeginRes, TxAndSnapshot);
using BeginRpc = communication::rpc::RequestResponse<BeginReq, BeginRes>;
RPC_SINGLE_MEMBER_MESSAGE(AdvanceReq, transaction_id_t);
RPC_SINGLE_MEMBER_MESSAGE(AdvanceRes, command_id_t);
RPC_SINGLE_MEMBER_MESSAGE(AdvanceReq, TransactionId);
RPC_SINGLE_MEMBER_MESSAGE(AdvanceRes, CommandId);
using AdvanceRpc = communication::rpc::RequestResponse<AdvanceReq, AdvanceRes>;
RPC_SINGLE_MEMBER_MESSAGE(CommitReq, transaction_id_t);
RPC_SINGLE_MEMBER_MESSAGE(CommitReq, TransactionId);
RPC_NO_MEMBER_MESSAGE(CommitRes);
using CommitRpc = communication::rpc::RequestResponse<CommitReq, CommitRes>;
RPC_SINGLE_MEMBER_MESSAGE(AbortReq, transaction_id_t);
RPC_SINGLE_MEMBER_MESSAGE(AbortReq, TransactionId);
RPC_NO_MEMBER_MESSAGE(AbortRes);
using AbortRpc = communication::rpc::RequestResponse<AbortReq, AbortRes>;
RPC_SINGLE_MEMBER_MESSAGE(SnapshotReq, transaction_id_t);
RPC_SINGLE_MEMBER_MESSAGE(SnapshotReq, TransactionId);
RPC_SINGLE_MEMBER_MESSAGE(SnapshotRes, Snapshot);
using SnapshotRpc =
communication::rpc::RequestResponse<SnapshotReq, SnapshotRes>;
RPC_SINGLE_MEMBER_MESSAGE(CommandReq, transaction_id_t);
RPC_SINGLE_MEMBER_MESSAGE(CommandRes, command_id_t);
RPC_SINGLE_MEMBER_MESSAGE(CommandReq, TransactionId);
RPC_SINGLE_MEMBER_MESSAGE(CommandRes, CommandId);
using CommandRpc = communication::rpc::RequestResponse<CommandReq, CommandRes>;
RPC_NO_MEMBER_MESSAGE(GcSnapshotReq);
using GcSnapshotRpc =
communication::rpc::RequestResponse<GcSnapshotReq, SnapshotRes>;
RPC_SINGLE_MEMBER_MESSAGE(ClogInfoReq, transaction_id_t);
RPC_SINGLE_MEMBER_MESSAGE(ClogInfoReq, TransactionId);
RPC_SINGLE_MEMBER_MESSAGE(ClogInfoRes, CommitLog::Info);
using ClogInfoRpc =
communication::rpc::RequestResponse<ClogInfoReq, ClogInfoRes>;
@ -57,14 +57,14 @@ RPC_NO_MEMBER_MESSAGE(ActiveTransactionsReq);
using ActiveTransactionsRpc =
communication::rpc::RequestResponse<ActiveTransactionsReq, SnapshotRes>;
RPC_SINGLE_MEMBER_MESSAGE(EnsureNextIdGreaterReq, transaction_id_t);
RPC_SINGLE_MEMBER_MESSAGE(EnsureNextIdGreaterReq, TransactionId);
RPC_NO_MEMBER_MESSAGE(EnsureNextIdGreaterRes);
using EnsureNextIdGreaterRpc =
communication::rpc::RequestResponse<EnsureNextIdGreaterReq,
EnsureNextIdGreaterRes>;
RPC_NO_MEMBER_MESSAGE(GlobalLastReq);
RPC_SINGLE_MEMBER_MESSAGE(GlobalLastRes, transaction_id_t);
RPC_SINGLE_MEMBER_MESSAGE(GlobalLastRes, TransactionId);
using GlobalLastRpc =
communication::rpc::RequestResponse<GlobalLastReq, GlobalLastRes>;
} // namespace tx

View File

@ -16,7 +16,7 @@ Transaction *SingleNodeEngine::Begin() {
VLOG(11) << "[Tx] Starting transaction " << counter_ + 1;
std::lock_guard<SpinLock> guard(lock_);
transaction_id_t id{++counter_};
TransactionId id{++counter_};
auto t = new Transaction(id, active_, *this);
active_.insert(id);
store_.emplace(id, t);
@ -26,7 +26,7 @@ Transaction *SingleNodeEngine::Begin() {
return t;
}
command_id_t SingleNodeEngine::Advance(transaction_id_t id) {
CommandId SingleNodeEngine::Advance(TransactionId id) {
std::lock_guard<SpinLock> guard(lock_);
auto it = store_.find(id);
@ -34,7 +34,7 @@ command_id_t SingleNodeEngine::Advance(transaction_id_t id) {
<< "Transaction::advance on non-existing transaction";
Transaction *t = it->second.get();
if (t->cid_ == std::numeric_limits<command_id_t>::max())
if (t->cid_ == std::numeric_limits<CommandId>::max())
throw TransactionError(
"Reached maximum number of commands in this "
"transaction.");
@ -42,7 +42,7 @@ command_id_t SingleNodeEngine::Advance(transaction_id_t id) {
return ++(t->cid_);
}
command_id_t SingleNodeEngine::UpdateCommand(transaction_id_t id) {
CommandId SingleNodeEngine::UpdateCommand(TransactionId id) {
std::lock_guard<SpinLock> guard(lock_);
auto it = store_.find(id);
DCHECK(it != store_.end())
@ -72,7 +72,7 @@ void SingleNodeEngine::Abort(const Transaction &t) {
store_.erase(store_.find(t.id_));
}
CommitLog::Info SingleNodeEngine::Info(transaction_id_t tx) const {
CommitLog::Info SingleNodeEngine::Info(TransactionId tx) const {
return clog_.fetch_info(tx);
}
@ -98,19 +98,19 @@ Snapshot SingleNodeEngine::GlobalActiveTransactions() {
return active_transactions;
}
transaction_id_t SingleNodeEngine::LocalLast() const {
TransactionId SingleNodeEngine::LocalLast() const {
std::lock_guard<SpinLock> guard(lock_);
return counter_;
}
transaction_id_t SingleNodeEngine::GlobalLast() const { return LocalLast(); }
TransactionId SingleNodeEngine::GlobalLast() const { return LocalLast(); }
transaction_id_t SingleNodeEngine::LocalOldestActive() const {
TransactionId SingleNodeEngine::LocalOldestActive() const {
std::lock_guard<SpinLock> guard(lock_);
return active_.empty() ? counter_ + 1 : active_.front();
}
void SingleNodeEngine::GarbageCollectCommitLog(transaction_id_t tx_id) {
void SingleNodeEngine::GarbageCollectCommitLog(TransactionId tx_id) {
clog_.garbage_collect_older(tx_id);
}
@ -122,7 +122,7 @@ void SingleNodeEngine::LocalForEachActiveTransaction(
}
}
Transaction *SingleNodeEngine::RunningTransaction(transaction_id_t tx_id) {
Transaction *SingleNodeEngine::RunningTransaction(TransactionId tx_id) {
std::lock_guard<SpinLock> guard(lock_);
auto found = store_.find(tx_id);
CHECK(found != store_.end())
@ -130,7 +130,7 @@ Transaction *SingleNodeEngine::RunningTransaction(transaction_id_t tx_id) {
return found->second.get();
}
void SingleNodeEngine::EnsureNextIdGreater(transaction_id_t tx_id) {
void SingleNodeEngine::EnsureNextIdGreater(TransactionId tx_id) {
std::lock_guard<SpinLock> guard(lock_);
counter_ = std::max(tx_id, counter_);
}

View File

@ -30,26 +30,26 @@ class SingleNodeEngine : public Engine {
explicit SingleNodeEngine(durability::WriteAheadLog *wal = nullptr);
Transaction *Begin() override;
command_id_t Advance(transaction_id_t id) override;
command_id_t UpdateCommand(transaction_id_t id) override;
CommandId Advance(TransactionId id) override;
CommandId UpdateCommand(TransactionId id) override;
void Commit(const Transaction &t) override;
void Abort(const Transaction &t) override;
CommitLog::Info Info(transaction_id_t tx) const override;
CommitLog::Info Info(TransactionId tx) const override;
Snapshot GlobalGcSnapshot() override;
Snapshot GlobalActiveTransactions() override;
transaction_id_t GlobalLast() const override;
transaction_id_t LocalLast() const override;
transaction_id_t LocalOldestActive() const override;
TransactionId GlobalLast() const override;
TransactionId LocalLast() const override;
TransactionId LocalOldestActive() const override;
void LocalForEachActiveTransaction(
std::function<void(Transaction &)> f) override;
Transaction *RunningTransaction(transaction_id_t tx_id) override;
void EnsureNextIdGreater(transaction_id_t tx_id) override;
void GarbageCollectCommitLog(transaction_id_t tx_id) override;
Transaction *RunningTransaction(TransactionId tx_id) override;
void EnsureNextIdGreater(TransactionId tx_id) override;
void GarbageCollectCommitLog(TransactionId tx_id) override;
private:
transaction_id_t counter_{0};
TransactionId counter_{0};
CommitLog clog_;
std::unordered_map<transaction_id_t, std::unique_ptr<Transaction>> store_;
std::unordered_map<TransactionId, std::unique_ptr<Transaction>> store_;
Snapshot active_;
mutable SpinLock lock_;
// Optional. If present, the Engine will write tx Begin/Commit/Abort

View File

@ -29,7 +29,7 @@ Transaction *WorkerEngine::Begin() {
return tx;
}
command_id_t WorkerEngine::Advance(transaction_id_t tx_id) {
CommandId WorkerEngine::Advance(TransactionId tx_id) {
auto res = master_client_pool_.Call<AdvanceRpc>(tx_id);
CHECK(res) << "AdvanceRpc failed";
auto access = active_.access();
@ -40,7 +40,7 @@ command_id_t WorkerEngine::Advance(transaction_id_t tx_id) {
return res->member;
}
command_id_t WorkerEngine::UpdateCommand(transaction_id_t tx_id) {
CommandId WorkerEngine::UpdateCommand(TransactionId tx_id) {
auto res = master_client_pool_.Call<CommandRpc>(tx_id);
CHECK(res) << "CommandRpc failed";
auto cmd_id = res->member;
@ -74,7 +74,7 @@ void WorkerEngine::Abort(const Transaction &t) {
ClearSingleTransaction(t.id_);
}
CommitLog::Info WorkerEngine::Info(transaction_id_t tid) const {
CommitLog::Info WorkerEngine::Info(TransactionId tid) const {
auto info = clog_.fetch_info(tid);
// If we don't know the transaction to be commited nor aborted, ask the
// master about it and update the local commit log.
@ -110,8 +110,8 @@ Snapshot WorkerEngine::GlobalActiveTransactions() {
return snapshot;
}
transaction_id_t WorkerEngine::LocalLast() const { return local_last_; }
transaction_id_t WorkerEngine::GlobalLast() const {
TransactionId WorkerEngine::LocalLast() const { return local_last_; }
TransactionId WorkerEngine::GlobalLast() const {
auto res = master_client_pool_.Call<GlobalLastRpc>();
CHECK(res) << "GlobalLastRpc failed";
return res->member;
@ -122,11 +122,11 @@ void WorkerEngine::LocalForEachActiveTransaction(
for (auto pair : active_.access()) f(*pair.second);
}
transaction_id_t WorkerEngine::LocalOldestActive() const {
TransactionId WorkerEngine::LocalOldestActive() const {
return oldest_active_;
}
Transaction *WorkerEngine::RunningTransaction(transaction_id_t tx_id) {
Transaction *WorkerEngine::RunningTransaction(TransactionId tx_id) {
auto accessor = active_.access();
auto found = accessor.find(tx_id);
if (found != accessor.end()) return found->second;
@ -138,7 +138,7 @@ Transaction *WorkerEngine::RunningTransaction(transaction_id_t tx_id) {
return RunningTransaction(tx_id, snapshot);
}
Transaction *WorkerEngine::RunningTransaction(transaction_id_t tx_id,
Transaction *WorkerEngine::RunningTransaction(TransactionId tx_id,
const Snapshot &snapshot) {
auto accessor = active_.access();
auto found = accessor.find(tx_id);
@ -152,7 +152,7 @@ Transaction *WorkerEngine::RunningTransaction(transaction_id_t tx_id,
}
void WorkerEngine::ClearTransactionalCache(
transaction_id_t oldest_active) const {
TransactionId oldest_active) const {
auto access = active_.access();
for (auto kv : access) {
if (kv.first < oldest_active) {
@ -164,7 +164,7 @@ void WorkerEngine::ClearTransactionalCache(
}
}
void WorkerEngine::ClearSingleTransaction(transaction_id_t tx_id) const {
void WorkerEngine::ClearSingleTransaction(TransactionId tx_id) const {
auto access = active_.access();
auto found = access.find(tx_id);
if (found != access.end()) {
@ -176,7 +176,7 @@ void WorkerEngine::ClearSingleTransaction(transaction_id_t tx_id) const {
}
void WorkerEngine::UpdateOldestActive(const Snapshot &snapshot,
transaction_id_t alternative) {
TransactionId alternative) {
if (snapshot.empty()) {
oldest_active_.store(std::max(alternative, oldest_active_.load()));
} else {
@ -184,11 +184,11 @@ void WorkerEngine::UpdateOldestActive(const Snapshot &snapshot,
}
}
void WorkerEngine::EnsureNextIdGreater(transaction_id_t tx_id) {
void WorkerEngine::EnsureNextIdGreater(TransactionId tx_id) {
master_client_pool_.Call<EnsureNextIdGreaterRpc>(tx_id);
}
void WorkerEngine::GarbageCollectCommitLog(transaction_id_t tx_id) {
void WorkerEngine::GarbageCollectCommitLog(TransactionId tx_id) {
clog_.garbage_collect_older(tx_id);
}
} // namespace tx

View File

@ -24,35 +24,35 @@ class WorkerEngine : public Engine {
~WorkerEngine();
Transaction *Begin() override;
command_id_t Advance(transaction_id_t id) override;
command_id_t UpdateCommand(transaction_id_t id) override;
CommandId Advance(TransactionId id) override;
CommandId UpdateCommand(TransactionId id) override;
void Commit(const Transaction &t) override;
void Abort(const Transaction &t) override;
CommitLog::Info Info(transaction_id_t tid) const override;
CommitLog::Info Info(TransactionId tid) const override;
Snapshot GlobalGcSnapshot() override;
Snapshot GlobalActiveTransactions() override;
transaction_id_t GlobalLast() const override;
transaction_id_t LocalLast() const override;
TransactionId GlobalLast() const override;
TransactionId LocalLast() const override;
void LocalForEachActiveTransaction(
std::function<void(Transaction &)> f) override;
transaction_id_t LocalOldestActive() const override;
Transaction *RunningTransaction(transaction_id_t tx_id) override;
TransactionId LocalOldestActive() const override;
Transaction *RunningTransaction(TransactionId tx_id) override;
// Caches the transaction for the given info an returs a ptr to it.
Transaction *RunningTransaction(transaction_id_t tx_id,
Transaction *RunningTransaction(TransactionId tx_id,
const Snapshot &snapshot);
void EnsureNextIdGreater(transaction_id_t tx_id) override;
void GarbageCollectCommitLog(tx::transaction_id_t tx_id) override;
void EnsureNextIdGreater(TransactionId tx_id) override;
void GarbageCollectCommitLog(tx::TransactionId tx_id) override;
/// Clears the cache of local transactions that have expired. The signature of
/// this method is dictated by `distributed::TransactionalCacheCleaner`.
void ClearTransactionalCache(transaction_id_t oldest_active) const;
void ClearTransactionalCache(TransactionId oldest_active) const;
private:
// Local caches.
mutable ConcurrentMap<transaction_id_t, Transaction *> active_;
std::atomic<transaction_id_t> local_last_{0};
mutable ConcurrentMap<TransactionId, Transaction *> active_;
std::atomic<TransactionId> local_last_{0};
// Mutable because just getting info can cause a cache fill.
mutable CommitLog clog_;
@ -61,14 +61,14 @@ class WorkerEngine : public Engine {
// Used for clearing of caches of transactions that have expired.
// Initialize the oldest_active_ with 1 because there's never a tx with id=0
std::atomic<transaction_id_t> oldest_active_{1};
std::atomic<TransactionId> oldest_active_{1};
// Removes a single transaction from the cache, if present.
void ClearSingleTransaction(transaction_id_t tx_Id) const;
void ClearSingleTransaction(TransactionId tx_Id) const;
// Updates the oldest active transaction to the one from the snapshot. If the
// snapshot is empty, it's set to the given alternative.
void UpdateOldestActive(const Snapshot &snapshot,
transaction_id_t alternative);
TransactionId alternative);
};
} // namespace tx

View File

@ -23,7 +23,7 @@ class Engine;
class Snapshot {
public:
Snapshot() = default;
Snapshot(std::vector<transaction_id_t> &&active)
Snapshot(std::vector<TransactionId> &&active)
: transaction_ids_(std::move(active)) {}
// all the copy/move constructors/assignments act naturally
@ -32,7 +32,7 @@ class Snapshot {
*
* @param xid - The transcation id in question
*/
bool contains(transaction_id_t id) const {
bool contains(TransactionId id) const {
return std::binary_search(transaction_ids_.begin(), transaction_ids_.end(),
id);
}
@ -43,7 +43,7 @@ class Snapshot {
*
* @param id - the transaction id to add
*/
void insert(transaction_id_t id) {
void insert(TransactionId id) {
transaction_ids_.push_back(id);
DCHECK(std::is_sorted(transaction_ids_.begin(), transaction_ids_.end()))
<< "Snapshot must be sorted";
@ -52,18 +52,18 @@ class Snapshot {
/** Removes the given transaction id from this Snapshot.
*
* @param id - the transaction id to remove */
void remove(transaction_id_t id) {
void remove(TransactionId id) {
auto last =
std::remove(transaction_ids_.begin(), transaction_ids_.end(), id);
transaction_ids_.erase(last, transaction_ids_.end());
}
transaction_id_t front() const {
TransactionId front() const {
DCHECK(transaction_ids_.size()) << "Snapshot.front() on empty Snapshot";
return transaction_ids_.front();
}
transaction_id_t back() const {
TransactionId back() const {
DCHECK(transaction_ids_.size()) << "Snapshot.back() on empty Snapshot";
return transaction_ids_.back();
}
@ -94,6 +94,6 @@ class Snapshot {
ar &transaction_ids_;
}
std::vector<transaction_id_t> transaction_ids_;
std::vector<TransactionId> transaction_ids_;
};
} // namespace tx

View File

@ -21,8 +21,8 @@ namespace tx {
class Transaction {
public:
/** Returns the maximum possible transcation id */
static transaction_id_t MaxId() {
return std::numeric_limits<transaction_id_t>::max();
static TransactionId MaxId() {
return std::numeric_limits<TransactionId>::max();
}
private:
@ -31,7 +31,7 @@ class Transaction {
friend class WorkerEngine;
// The constructor is private, only the Engine ever uses it.
Transaction(transaction_id_t id, const Snapshot &snapshot, Engine &engine)
Transaction(TransactionId id, const Snapshot &snapshot, Engine &engine)
: id_(id), engine_(engine), snapshot_(snapshot) {}
// A transaction can't be moved nor copied. it's owned by the transaction
@ -47,7 +47,7 @@ class Transaction {
void TakeLock(RecordLock &lock) const { locks_.Take(&lock, *this, engine_); }
/** Transaction's id. Unique in the engine that owns it */
const transaction_id_t id_;
const TransactionId id_;
/** The transaction engine to which this transaction belongs */
Engine &engine_;
@ -71,7 +71,7 @@ class Transaction {
private:
// Index of the current command in the current transaction.
command_id_t cid_{1};
CommandId cid_{1};
// A snapshot of currently active transactions.
const Snapshot snapshot_;

View File

@ -5,8 +5,8 @@
namespace tx {
/** Type of a tx::Transcation's id member */
using transaction_id_t = uint64_t;
using TransactionId = uint64_t;
/** Type of a tx::Transcation's command id member */
using command_id_t = uint32_t;
using CommandId = uint32_t;
}

View File

@ -63,7 +63,7 @@ class Mvcc : public ::testing::Test {
mvcc::VersionList<TestClass> version_list{*t1, 0, version_list_size};
TestClass *v1 = nullptr;
tx::Transaction *t2 = nullptr;
tx::transaction_id_t id0, id1, id2;
tx::TransactionId id0, id1, id2;
};
// helper macros. important:

View File

@ -69,7 +69,7 @@ TEST_F(WorkerEngineTest, RunningTransaction) {
++count;
if (t.id_ == 1) {
EXPECT_EQ(t.snapshot(),
tx::Snapshot(std::vector<tx::transaction_id_t>{}));
tx::Snapshot(std::vector<tx::TransactionId>{}));
} else {
EXPECT_EQ(t.snapshot(), tx::Snapshot({1}));
}
@ -129,10 +129,10 @@ TEST_F(WorkerEngineTest, LocalForEachActiveTransaction) {
master_.Begin();
master_.Begin();
worker_.RunningTransaction(4);
std::unordered_set<tx::transaction_id_t> local;
std::unordered_set<tx::TransactionId> local;
worker_.LocalForEachActiveTransaction(
[&local](Transaction &t) { local.insert(t.id_); });
EXPECT_EQ(local, std::unordered_set<tx::transaction_id_t>({1, 4}));
EXPECT_EQ(local, std::unordered_set<tx::TransactionId>({1, 4}));
}
TEST_F(WorkerEngineTest, EnsureTxIdGreater) {

View File

@ -53,7 +53,7 @@ TEST(Engine, Advance) {
TEST(Engine, ConcurrentBegin) {
SingleNodeEngine engine;
std::vector<std::thread> threads;
SkipList<transaction_id_t> tx_ids;
SkipList<TransactionId> tx_ids;
for (int i = 0; i < 10; ++i) {
threads.emplace_back([&engine, accessor = tx_ids.access() ]() mutable {
for (int j = 0; j < 100; ++j) {