diff --git a/src/database/graph_db_accessor.cpp b/src/database/graph_db_accessor.cpp index 9a4901fb1..9305dfd42 100644 --- a/src/database/graph_db_accessor.cpp +++ b/src/database/graph_db_accessor.cpp @@ -22,7 +22,7 @@ GraphDbAccessor::GraphDbAccessor(GraphDb &db) transaction_(*db.tx_engine().Begin()), transaction_starter_{true} {} -GraphDbAccessor::GraphDbAccessor(GraphDb &db, tx::transaction_id_t tx_id) +GraphDbAccessor::GraphDbAccessor(GraphDb &db, tx::TransactionId tx_id) : db_(db), transaction_(*db.tx_engine().RunningTransaction(tx_id)), transaction_starter_{false} {} @@ -33,7 +33,7 @@ GraphDbAccessor::~GraphDbAccessor() { } } -tx::transaction_id_t GraphDbAccessor::transaction_id() const { +tx::TransactionId GraphDbAccessor::transaction_id() const { return transaction_.id_; } diff --git a/src/database/graph_db_accessor.hpp b/src/database/graph_db_accessor.hpp index 627c973db..bacb710d0 100644 --- a/src/database/graph_db_accessor.hpp +++ b/src/database/graph_db_accessor.hpp @@ -52,7 +52,7 @@ class GraphDbAccessor { explicit GraphDbAccessor(GraphDb &db); /// Creates an accessor for a running transaction. - GraphDbAccessor(GraphDb &db, tx::transaction_id_t tx_id); + GraphDbAccessor(GraphDb &db, tx::TransactionId tx_id); ~GraphDbAccessor(); GraphDbAccessor(const GraphDbAccessor &other) = delete; @@ -549,7 +549,7 @@ class GraphDbAccessor { const std::string &PropertyName(storage::Property property) const; /** Returns the id of this accessor's transaction */ - tx::transaction_id_t transaction_id() const; + tx::TransactionId transaction_id() const; /** Advances transaction's command id by 1. */ void AdvanceCommand(); diff --git a/src/database/state_delta.cpp b/src/database/state_delta.cpp index 1b82f2dbf..267082343 100644 --- a/src/database/state_delta.cpp +++ b/src/database/state_delta.cpp @@ -6,26 +6,26 @@ namespace database { -StateDelta StateDelta::TxBegin(tx::transaction_id_t tx_id) { +StateDelta StateDelta::TxBegin(tx::TransactionId tx_id) { return {StateDelta::Type::TRANSACTION_BEGIN, tx_id}; } -StateDelta StateDelta::TxCommit(tx::transaction_id_t tx_id) { +StateDelta StateDelta::TxCommit(tx::TransactionId tx_id) { return {StateDelta::Type::TRANSACTION_COMMIT, tx_id}; } -StateDelta StateDelta::TxAbort(tx::transaction_id_t tx_id) { +StateDelta StateDelta::TxAbort(tx::TransactionId tx_id) { return {StateDelta::Type::TRANSACTION_ABORT, tx_id}; } -StateDelta StateDelta::CreateVertex(tx::transaction_id_t tx_id, +StateDelta StateDelta::CreateVertex(tx::TransactionId tx_id, gid::Gid vertex_id) { StateDelta op(StateDelta::Type::CREATE_VERTEX, tx_id); op.vertex_id = vertex_id; return op; } -StateDelta StateDelta::CreateEdge(tx::transaction_id_t tx_id, gid::Gid edge_id, +StateDelta StateDelta::CreateEdge(tx::TransactionId tx_id, gid::Gid edge_id, gid::Gid vertex_from_id, gid::Gid vertex_to_id, storage::EdgeType edge_type, @@ -39,7 +39,7 @@ StateDelta StateDelta::CreateEdge(tx::transaction_id_t tx_id, gid::Gid edge_id, return op; } -StateDelta StateDelta::AddOutEdge(tx::transaction_id_t tx_id, +StateDelta StateDelta::AddOutEdge(tx::TransactionId tx_id, gid::Gid vertex_id, storage::VertexAddress vertex_to_address, storage::EdgeAddress edge_address, @@ -54,7 +54,7 @@ StateDelta StateDelta::AddOutEdge(tx::transaction_id_t tx_id, return op; } -StateDelta StateDelta::RemoveOutEdge(tx::transaction_id_t tx_id, +StateDelta StateDelta::RemoveOutEdge(tx::TransactionId tx_id, gid::Gid vertex_id, storage::EdgeAddress edge_address) { CHECK(edge_address.is_remote()) << "WAL can only contain global addresses."; @@ -64,7 +64,7 @@ StateDelta StateDelta::RemoveOutEdge(tx::transaction_id_t tx_id, return op; } -StateDelta StateDelta::AddInEdge(tx::transaction_id_t tx_id, gid::Gid vertex_id, +StateDelta StateDelta::AddInEdge(tx::TransactionId tx_id, gid::Gid vertex_id, storage::VertexAddress vertex_from_address, storage::EdgeAddress edge_address, storage::EdgeType edge_type) { @@ -78,7 +78,7 @@ StateDelta StateDelta::AddInEdge(tx::transaction_id_t tx_id, gid::Gid vertex_id, return op; } -StateDelta StateDelta::RemoveInEdge(tx::transaction_id_t tx_id, +StateDelta StateDelta::RemoveInEdge(tx::TransactionId tx_id, gid::Gid vertex_id, storage::EdgeAddress edge_address) { CHECK(edge_address.is_remote()) << "WAL can only contain global addresses."; @@ -88,7 +88,7 @@ StateDelta StateDelta::RemoveInEdge(tx::transaction_id_t tx_id, return op; } -StateDelta StateDelta::PropsSetVertex(tx::transaction_id_t tx_id, +StateDelta StateDelta::PropsSetVertex(tx::TransactionId tx_id, gid::Gid vertex_id, storage::Property property, const std::string &property_name, @@ -101,7 +101,7 @@ StateDelta StateDelta::PropsSetVertex(tx::transaction_id_t tx_id, return op; } -StateDelta StateDelta::PropsSetEdge(tx::transaction_id_t tx_id, +StateDelta StateDelta::PropsSetEdge(tx::TransactionId tx_id, gid::Gid edge_id, storage::Property property, const std::string &property_name, @@ -114,7 +114,7 @@ StateDelta StateDelta::PropsSetEdge(tx::transaction_id_t tx_id, return op; } -StateDelta StateDelta::AddLabel(tx::transaction_id_t tx_id, gid::Gid vertex_id, +StateDelta StateDelta::AddLabel(tx::TransactionId tx_id, gid::Gid vertex_id, storage::Label label, const std::string &label_name) { StateDelta op(StateDelta::Type::ADD_LABEL, tx_id); @@ -124,7 +124,7 @@ StateDelta StateDelta::AddLabel(tx::transaction_id_t tx_id, gid::Gid vertex_id, return op; } -StateDelta StateDelta::RemoveLabel(tx::transaction_id_t tx_id, +StateDelta StateDelta::RemoveLabel(tx::TransactionId tx_id, gid::Gid vertex_id, storage::Label label, const std::string &label_name) { StateDelta op(StateDelta::Type::REMOVE_LABEL, tx_id); @@ -134,7 +134,7 @@ StateDelta StateDelta::RemoveLabel(tx::transaction_id_t tx_id, return op; } -StateDelta StateDelta::RemoveVertex(tx::transaction_id_t tx_id, +StateDelta StateDelta::RemoveVertex(tx::TransactionId tx_id, gid::Gid vertex_id, bool check_empty) { StateDelta op(StateDelta::Type::REMOVE_VERTEX, tx_id); op.vertex_id = vertex_id; @@ -142,14 +142,14 @@ StateDelta StateDelta::RemoveVertex(tx::transaction_id_t tx_id, return op; } -StateDelta StateDelta::RemoveEdge(tx::transaction_id_t tx_id, +StateDelta StateDelta::RemoveEdge(tx::TransactionId tx_id, gid::Gid edge_id) { StateDelta op(StateDelta::Type::REMOVE_EDGE, tx_id); op.edge_id = edge_id; return op; } -StateDelta StateDelta::BuildIndex(tx::transaction_id_t tx_id, +StateDelta StateDelta::BuildIndex(tx::TransactionId tx_id, storage::Label label, const std::string &label_name, storage::Property property, diff --git a/src/database/state_delta.hpp b/src/database/state_delta.hpp index 0ea9befe3..7c87e9c58 100644 --- a/src/database/state_delta.hpp +++ b/src/database/state_delta.hpp @@ -47,7 +47,7 @@ struct StateDelta { }; StateDelta() = default; - StateDelta(const enum Type &type, tx::transaction_id_t tx_id) + StateDelta(const enum Type &type, tx::TransactionId tx_id) : type(type), transaction_id(tx_id) {} /** Attempts to decode a StateDelta from the given decoder. Returns the @@ -62,47 +62,47 @@ struct StateDelta { HashedFileWriter &writer, communication::bolt::PrimitiveEncoder &encoder) const; - static StateDelta TxBegin(tx::transaction_id_t tx_id); - static StateDelta TxCommit(tx::transaction_id_t tx_id); - static StateDelta TxAbort(tx::transaction_id_t tx_id); - static StateDelta CreateVertex(tx::transaction_id_t tx_id, + static StateDelta TxBegin(tx::TransactionId tx_id); + static StateDelta TxCommit(tx::TransactionId tx_id); + static StateDelta TxAbort(tx::TransactionId tx_id); + static StateDelta CreateVertex(tx::TransactionId tx_id, gid::Gid vertex_id); - static StateDelta CreateEdge(tx::transaction_id_t tx_id, gid::Gid edge_id, + static StateDelta CreateEdge(tx::TransactionId tx_id, gid::Gid edge_id, gid::Gid vertex_from_id, gid::Gid vertex_to_id, storage::EdgeType edge_type, const std::string &edge_type_name); - static StateDelta AddOutEdge(tx::transaction_id_t tx_id, gid::Gid vertex_id, + static StateDelta AddOutEdge(tx::TransactionId tx_id, gid::Gid vertex_id, storage::VertexAddress vertex_to_address, storage::EdgeAddress edge_address, storage::EdgeType edge_type); - static StateDelta RemoveOutEdge(tx::transaction_id_t tx_id, + static StateDelta RemoveOutEdge(tx::TransactionId tx_id, gid::Gid vertex_id, storage::EdgeAddress edge_address); - static StateDelta AddInEdge(tx::transaction_id_t tx_id, gid::Gid vertex_id, + static StateDelta AddInEdge(tx::TransactionId tx_id, gid::Gid vertex_id, storage::VertexAddress vertex_from_address, storage::EdgeAddress edge_address, storage::EdgeType edge_type); - static StateDelta RemoveInEdge(tx::transaction_id_t tx_id, gid::Gid vertex_id, + static StateDelta RemoveInEdge(tx::TransactionId tx_id, gid::Gid vertex_id, storage::EdgeAddress edge_address); - static StateDelta PropsSetVertex(tx::transaction_id_t tx_id, + static StateDelta PropsSetVertex(tx::TransactionId tx_id, gid::Gid vertex_id, storage::Property property, const std::string &property_name, const PropertyValue &value); - static StateDelta PropsSetEdge(tx::transaction_id_t tx_id, gid::Gid edge_id, + static StateDelta PropsSetEdge(tx::TransactionId tx_id, gid::Gid edge_id, storage::Property property, const std::string &property_name, const PropertyValue &value); - static StateDelta AddLabel(tx::transaction_id_t tx_id, gid::Gid vertex_id, + static StateDelta AddLabel(tx::TransactionId tx_id, gid::Gid vertex_id, storage::Label label, const std::string &label_name); - static StateDelta RemoveLabel(tx::transaction_id_t tx_id, gid::Gid vertex_id, + static StateDelta RemoveLabel(tx::TransactionId tx_id, gid::Gid vertex_id, storage::Label label, const std::string &label_name); - static StateDelta RemoveVertex(tx::transaction_id_t tx_id, gid::Gid vertex_id, + static StateDelta RemoveVertex(tx::TransactionId tx_id, gid::Gid vertex_id, bool check_empty); - static StateDelta RemoveEdge(tx::transaction_id_t tx_id, gid::Gid edge_id); - static StateDelta BuildIndex(tx::transaction_id_t tx_id, storage::Label label, + static StateDelta RemoveEdge(tx::TransactionId tx_id, gid::Gid edge_id); + static StateDelta BuildIndex(tx::TransactionId tx_id, storage::Label label, const std::string &label_name, storage::Property property, const std::string &property_name); @@ -112,7 +112,7 @@ struct StateDelta { // Members valid for every delta. enum Type type; - tx::transaction_id_t transaction_id; + tx::TransactionId transaction_id; // Members valid only for some deltas, see StateDelta::Type comments above. // TODO: when preparing the WAL for distributed, most likely remove Gids and diff --git a/src/database/storage.hpp b/src/database/storage.hpp index 2407fa391..000259d75 100644 --- a/src/database/storage.hpp +++ b/src/database/storage.hpp @@ -111,7 +111,7 @@ class Storage { LabelPropertyIndex label_property_index_; // Set of transactions ids which are building indexes currently - SkipList index_build_tx_in_progress_; + SkipList index_build_tx_in_progress_; /// Gets the Vertex/Edge main storage map. template diff --git a/src/database/storage_gc.hpp b/src/database/storage_gc.hpp index c68a6b57a..1fc697514 100644 --- a/src/database/storage_gc.hpp +++ b/src/database/storage_gc.hpp @@ -64,7 +64,7 @@ class StorageGc { StorageGc &operator=(const StorageGc &) = delete; StorageGc &operator=(StorageGc &&) = delete; - virtual void CollectCommitLogGarbage(tx::transaction_id_t oldest_active) = 0; + virtual void CollectCommitLogGarbage(tx::TransactionId oldest_active) = 0; void CollectGarbage() { // main garbage collection logic @@ -129,9 +129,9 @@ class StorageGc { // alive transaction from the time before the hints were set is still alive // (otherwise that transaction could still be waiting for a resolution of // the query to the commit log about some old transaction) - std::experimental::optional GetClogSafeTransaction( - tx::transaction_id_t oldest_active) { - std::experimental::optional safe_to_delete; + std::experimental::optional GetClogSafeTransaction( + tx::TransactionId oldest_active) { + std::experimental::optional safe_to_delete; while (!gc_txid_ranges_.empty() && gc_txid_ranges_.front().second < oldest_active) { safe_to_delete = gc_txid_ranges_.front().first; @@ -150,7 +150,7 @@ class StorageGc { // History of ranges // that gc operated on at some previous time - used to clear commit log - std::queue> + std::queue> gc_txid_ranges_; }; } // namespace database diff --git a/src/database/storage_gc_master.hpp b/src/database/storage_gc_master.hpp index 51fda3529..70015b4a1 100644 --- a/src/database/storage_gc_master.hpp +++ b/src/database/storage_gc_master.hpp @@ -31,7 +31,7 @@ class StorageGcMaster : public StorageGc { scheduler_.Stop(); } - void CollectCommitLogGarbage(tx::transaction_id_t oldest_active) final { + void CollectCommitLogGarbage(tx::TransactionId oldest_active) final { // Workers are sending information when it's safe to delete every // transaction older than oldest_active from their perspective i.e. there // won't exist another transaction in the future with id larger than or @@ -39,7 +39,7 @@ class StorageGcMaster : public StorageGc { // the state of transactions which we are deleting. auto safe_transaction = GetClogSafeTransaction(oldest_active); if (safe_transaction) { - tx::transaction_id_t min_safe = *safe_transaction; + tx::TransactionId min_safe = *safe_transaction; { std::unique_lock lock(worker_safe_transaction_mutex_); for (auto worker_id : coordination_.GetWorkerIds()) { @@ -60,7 +60,7 @@ class StorageGcMaster : public StorageGc { distributed::MasterCoordination &coordination_; // Mapping of worker ids and oldest active transaction which is safe for // deletion from worker perspective - std::unordered_map worker_safe_transaction_; + std::unordered_map worker_safe_transaction_; std::mutex worker_safe_transaction_mutex_; }; } // namespace database diff --git a/src/database/storage_gc_single_node.hpp b/src/database/storage_gc_single_node.hpp index 37217af6b..320d5c7f1 100644 --- a/src/database/storage_gc_single_node.hpp +++ b/src/database/storage_gc_single_node.hpp @@ -14,7 +14,7 @@ class StorageGcSingleNode : public StorageGc { scheduler_.Stop(); } - void CollectCommitLogGarbage(tx::transaction_id_t oldest_active) final { + void CollectCommitLogGarbage(tx::TransactionId oldest_active) final { auto safe_to_delete = GetClogSafeTransaction(oldest_active); if (safe_to_delete) tx_engine_.GarbageCollectCommitLog(*safe_to_delete); } diff --git a/src/database/storage_gc_worker.hpp b/src/database/storage_gc_worker.hpp index 4e3a6cb94..4d938dbb9 100644 --- a/src/database/storage_gc_worker.hpp +++ b/src/database/storage_gc_worker.hpp @@ -24,7 +24,7 @@ class StorageGcWorker : public StorageGc { scheduler_.Stop(); } - void CollectCommitLogGarbage(tx::transaction_id_t oldest_active) final { + void CollectCommitLogGarbage(tx::TransactionId oldest_active) final { // We first need to delete transactions that we can delete to be sure that // the locks are released as well. Otherwise some new transaction might // try to acquire a lock which hasn't been released (if the transaction diff --git a/src/distributed/cache.cpp b/src/distributed/cache.cpp index a8099cd69..dc3e7721b 100644 --- a/src/distributed/cache.cpp +++ b/src/distributed/cache.cpp @@ -22,7 +22,7 @@ TRecord *Cache::FindNew(gid::Gid gid) { } template -void Cache::FindSetOldNew(tx::transaction_id_t tx_id, int worker_id, +void Cache::FindSetOldNew(tx::TransactionId tx_id, int worker_id, gid::Gid gid, TRecord *&old_record, TRecord *&new_record) { { diff --git a/src/distributed/cache.hpp b/src/distributed/cache.hpp index b62af0e7d..d41eb1ca2 100644 --- a/src/distributed/cache.hpp +++ b/src/distributed/cache.hpp @@ -37,7 +37,7 @@ class Cache { /// from the given transaction's ID and command ID, and caches it. Sets the /// given pointers to point to the fetched data. Analogue to /// mvcc::VersionList::find_set_old_new. - void FindSetOldNew(tx::transaction_id_t tx_id, int worker_id, gid::Gid gid, + void FindSetOldNew(tx::TransactionId tx_id, int worker_id, gid::Gid gid, TRecord *&old_record, TRecord *&new_record); /// Sets the given records as (new, old) data for the given gid. diff --git a/src/distributed/data_manager.cpp b/src/distributed/data_manager.cpp index 6e1ced718..e94df319b 100644 --- a/src/distributed/data_manager.cpp +++ b/src/distributed/data_manager.cpp @@ -5,7 +5,7 @@ namespace distributed { template Cache &DataManager::GetCache(CacheT &collection, - tx::transaction_id_t tx_id) { + tx::TransactionId tx_id) { auto access = collection.access(); auto found = access.find(tx_id); if (found != access.end()) return found->second; @@ -17,12 +17,12 @@ Cache &DataManager::GetCache(CacheT &collection, } template <> -Cache &DataManager::Elements(tx::transaction_id_t tx_id) { +Cache &DataManager::Elements(tx::TransactionId tx_id) { return GetCache(vertices_caches_, tx_id); } template <> -Cache &DataManager::Elements(tx::transaction_id_t tx_id) { +Cache &DataManager::Elements(tx::TransactionId tx_id) { return GetCache(edges_caches_, tx_id); } @@ -30,12 +30,12 @@ DataManager::DataManager(database::Storage &storage, distributed::DataRpcClients &data_clients) : storage_(storage), data_clients_(data_clients) {} -void DataManager::ClearCacheForSingleTransaction(tx::transaction_id_t tx_id) { +void DataManager::ClearCacheForSingleTransaction(tx::TransactionId tx_id) { Elements(tx_id).ClearCache(); Elements(tx_id).ClearCache(); } -void DataManager::ClearTransactionalCache(tx::transaction_id_t oldest_active) { +void DataManager::ClearTransactionalCache(tx::TransactionId oldest_active) { auto vertex_access = vertices_caches_.access(); for (auto &kv : vertex_access) { if (kv.first < oldest_active) { diff --git a/src/distributed/data_manager.hpp b/src/distributed/data_manager.hpp index 28851cff4..0c946549e 100644 --- a/src/distributed/data_manager.hpp +++ b/src/distributed/data_manager.hpp @@ -17,12 +17,12 @@ namespace distributed { /// Handles remote data caches for edges and vertices, per transaction. class DataManager { template - using CacheT = ConcurrentMap>; + using CacheT = ConcurrentMap>; // Helper, gets or inserts a data cache for the given transaction. template Cache &GetCache(CacheT &collection, - tx::transaction_id_t tx_id); + tx::TransactionId tx_id); public: DataManager(database::Storage &storage, @@ -30,14 +30,14 @@ class DataManager { /// Gets or creates the remote vertex/edge cache for the given transaction. template - Cache &Elements(tx::transaction_id_t tx_id); + Cache &Elements(tx::TransactionId tx_id); /// Removes all the caches for a single transaction. - void ClearCacheForSingleTransaction(tx::transaction_id_t tx_id); + void ClearCacheForSingleTransaction(tx::TransactionId tx_id); /// Clears the cache of local transactions that have expired. The signature of /// this method is dictated by `distributed::TransactionalCacheCleaner`. - void ClearTransactionalCache(tx::transaction_id_t oldest_active); + void ClearTransactionalCache(tx::TransactionId oldest_active); private: database::Storage &storage_; diff --git a/src/distributed/data_rpc_clients.cpp b/src/distributed/data_rpc_clients.cpp index 9e9f3602e..15bd008f9 100644 --- a/src/distributed/data_rpc_clients.cpp +++ b/src/distributed/data_rpc_clients.cpp @@ -7,7 +7,7 @@ namespace distributed { template <> std::unique_ptr DataRpcClients::RemoteElement(int worker_id, - tx::transaction_id_t tx_id, + tx::TransactionId tx_id, gid::Gid gid) { auto response = clients_.GetClientPool(worker_id).Call(TxGidPair{tx_id, gid}); @@ -17,7 +17,7 @@ std::unique_ptr DataRpcClients::RemoteElement(int worker_id, template <> std::unique_ptr DataRpcClients::RemoteElement( - int worker_id, tx::transaction_id_t tx_id, gid::Gid gid) { + int worker_id, tx::TransactionId tx_id, gid::Gid gid) { auto response = clients_.GetClientPool(worker_id).Call(TxGidPair{tx_id, gid}); CHECK(response) << "VertexRpc failed"; diff --git a/src/distributed/data_rpc_clients.hpp b/src/distributed/data_rpc_clients.hpp index e4610415d..087f3fc18 100644 --- a/src/distributed/data_rpc_clients.hpp +++ b/src/distributed/data_rpc_clients.hpp @@ -18,7 +18,7 @@ class DataRpcClients { /// must be visible in given transaction. template std::unique_ptr RemoteElement(int worker_id, - tx::transaction_id_t tx_id, + tx::TransactionId tx_id, gid::Gid gid); private: diff --git a/src/distributed/data_rpc_messages.hpp b/src/distributed/data_rpc_messages.hpp index 7baa97800..9a3c5840f 100644 --- a/src/distributed/data_rpc_messages.hpp +++ b/src/distributed/data_rpc_messages.hpp @@ -13,7 +13,7 @@ namespace distributed { struct TxGidPair { - tx::transaction_id_t tx_id; + tx::TransactionId tx_id; gid::Gid gid; private: diff --git a/src/distributed/durability_rpc_clients.cpp b/src/distributed/durability_rpc_clients.cpp index 7146a6e0c..ba085edc0 100644 --- a/src/distributed/durability_rpc_clients.cpp +++ b/src/distributed/durability_rpc_clients.cpp @@ -6,7 +6,7 @@ namespace distributed { utils::Future DurabilityRpcClients::MakeSnapshot( - tx::transaction_id_t tx) { + tx::TransactionId tx) { return std::async(std::launch::async, [this, tx] { auto futures = clients_.ExecuteOnWorkers( 0, [tx](communication::rpc::ClientPool &client_pool) { diff --git a/src/distributed/durability_rpc_clients.hpp b/src/distributed/durability_rpc_clients.hpp index b6e4ef3c9..880bde3d9 100644 --- a/src/distributed/durability_rpc_clients.hpp +++ b/src/distributed/durability_rpc_clients.hpp @@ -19,7 +19,7 @@ class DurabilityRpcClients { // if all workers sucesfully completed their snapshot creation, false // otherwise // @param tx - transaction from which to take db snapshot - utils::Future MakeSnapshot(tx::transaction_id_t tx); + utils::Future MakeSnapshot(tx::TransactionId tx); private: RpcWorkerClients &clients_; diff --git a/src/distributed/durability_rpc_messages.hpp b/src/distributed/durability_rpc_messages.hpp index 4b51cb6a3..baf147814 100644 --- a/src/distributed/durability_rpc_messages.hpp +++ b/src/distributed/durability_rpc_messages.hpp @@ -8,7 +8,7 @@ namespace distributed { -RPC_SINGLE_MEMBER_MESSAGE(MakeSnapshotReq, tx::transaction_id_t); +RPC_SINGLE_MEMBER_MESSAGE(MakeSnapshotReq, tx::TransactionId); RPC_SINGLE_MEMBER_MESSAGE(MakeSnapshotRes, bool); using MakeSnapshotRpc = diff --git a/src/distributed/index_rpc_messages.hpp b/src/distributed/index_rpc_messages.hpp index 709886660..3f9ebf321 100644 --- a/src/distributed/index_rpc_messages.hpp +++ b/src/distributed/index_rpc_messages.hpp @@ -11,7 +11,7 @@ namespace distributed { struct IndexLabelPropertyTx { storage::Label label; storage::Property property; - tx::transaction_id_t tx_id; + tx::TransactionId tx_id; private: friend class boost::serialization::access; diff --git a/src/distributed/produce_rpc_server.cpp b/src/distributed/produce_rpc_server.cpp index e1730b79e..dba6d50de 100644 --- a/src/distributed/produce_rpc_server.cpp +++ b/src/distributed/produce_rpc_server.cpp @@ -8,7 +8,7 @@ namespace distributed { ProduceRpcServer::OngoingProduce::OngoingProduce( - database::GraphDb &db, tx::transaction_id_t tx_id, + database::GraphDb &db, tx::TransactionId tx_id, std::shared_ptr op, query::SymbolTable symbol_table, Parameters parameters, std::vector pull_symbols) @@ -109,7 +109,7 @@ ProduceRpcServer::ProduceRpcServer( } void ProduceRpcServer::FinishAndClearOngoingProducePlans( - tx::transaction_id_t tx_id) { + tx::TransactionId tx_id) { std::lock_guard guard{ongoing_produces_lock_}; for (auto it = ongoing_produces_.begin(); it != ongoing_produces_.end();) { if (it->first.first == tx_id) { diff --git a/src/distributed/produce_rpc_server.hpp b/src/distributed/produce_rpc_server.hpp index 19e734f9f..88e619f97 100644 --- a/src/distributed/produce_rpc_server.hpp +++ b/src/distributed/produce_rpc_server.hpp @@ -32,7 +32,7 @@ class ProduceRpcServer { /// MG (see query::plan::Synchronize). class OngoingProduce { public: - OngoingProduce(database::GraphDb &db, tx::transaction_id_t tx_id, + OngoingProduce(database::GraphDb &db, tx::TransactionId tx_id, std::shared_ptr op, query::SymbolTable symbol_table, Parameters parameters, std::vector pull_symbols); @@ -66,12 +66,12 @@ class ProduceRpcServer { /// Finish and clear ongoing produces for all plans that are tied to a /// transaction with tx_id. - void FinishAndClearOngoingProducePlans(tx::transaction_id_t tx_id); + void FinishAndClearOngoingProducePlans(tx::TransactionId tx_id); private: std::mutex ongoing_produces_lock_; /// Mapping of (tx id, plan id) to OngoingProduce. - std::map, OngoingProduce> + std::map, OngoingProduce> ongoing_produces_; database::GraphDb &db_; communication::rpc::Server &produce_rpc_server_; diff --git a/src/distributed/pull_produce_rpc_messages.hpp b/src/distributed/pull_produce_rpc_messages.hpp index 4ef6a8e95..9be2cf380 100644 --- a/src/distributed/pull_produce_rpc_messages.hpp +++ b/src/distributed/pull_produce_rpc_messages.hpp @@ -37,7 +37,7 @@ enum class PullState { struct PullReq : public communication::rpc::Message { PullReq() {} - PullReq(tx::transaction_id_t tx_id, tx::Snapshot tx_snapshot, int64_t plan_id, + PullReq(tx::TransactionId tx_id, tx::Snapshot tx_snapshot, int64_t plan_id, const Parameters ¶ms, std::vector symbols, bool accumulate, int batch_size, bool send_old, bool send_new) : tx_id(tx_id), @@ -50,7 +50,7 @@ struct PullReq : public communication::rpc::Message { send_old(send_old), send_new(send_new) {} - tx::transaction_id_t tx_id; + tx::TransactionId tx_id; tx::Snapshot tx_snapshot; int64_t plan_id; Parameters params; @@ -367,7 +367,7 @@ using PullRpc = communication::rpc::RequestResponse; // optimization not to have to send the full PullReqData pack every // time. -RPC_SINGLE_MEMBER_MESSAGE(TransactionCommandAdvancedReq, tx::transaction_id_t); +RPC_SINGLE_MEMBER_MESSAGE(TransactionCommandAdvancedReq, tx::TransactionId); RPC_NO_MEMBER_MESSAGE(TransactionCommandAdvancedRes); using TransactionCommandAdvancedRpc = communication::rpc::RequestResponse PullRpcClients::Pull( std::vector> PullRpcClients::NotifyAllTransactionCommandAdvanced( - tx::transaction_id_t tx_id) { + tx::TransactionId tx_id) { return clients_.ExecuteOnWorkers(0, [tx_id](auto &client) { auto res = client.template Call(tx_id); CHECK(res) << "TransactionCommandAdvanceRpc failed"; diff --git a/src/distributed/pull_rpc_clients.hpp b/src/distributed/pull_rpc_clients.hpp index 7eab90d15..8be2ee3ff 100644 --- a/src/distributed/pull_rpc_clients.hpp +++ b/src/distributed/pull_rpc_clients.hpp @@ -38,7 +38,7 @@ class PullRpcClients { auto GetWorkerIds() { return clients_.GetWorkerIds(); } std::vector> NotifyAllTransactionCommandAdvanced( - tx::transaction_id_t tx_id); + tx::TransactionId tx_id); private: RpcWorkerClients &clients_; diff --git a/src/distributed/rpc_worker_clients.hpp b/src/distributed/rpc_worker_clients.hpp index fbe748857..2fada4769 100644 --- a/src/distributed/rpc_worker_clients.hpp +++ b/src/distributed/rpc_worker_clients.hpp @@ -84,7 +84,7 @@ class IndexRpcClients { auto GetBuildIndexFutures(const storage::Label &label, const storage::Property &property, - tx::transaction_id_t transaction_id, + tx::TransactionId transaction_id, int worker_id) { return clients_.ExecuteOnWorkers( worker_id, [label, property, transaction_id]( @@ -109,7 +109,7 @@ class OngoingProduceJoinerRpcClients { OngoingProduceJoinerRpcClients(RpcWorkerClients &clients) : clients_(clients) {} - void JoinOngoingProduces(tx::transaction_id_t tx_id) { + void JoinOngoingProduces(tx::TransactionId tx_id) { auto futures = clients_.ExecuteOnWorkers( 0, [tx_id](communication::rpc::ClientPool &client_pool) { auto result = diff --git a/src/distributed/storage_gc_rpc_messages.hpp b/src/distributed/storage_gc_rpc_messages.hpp index b244fd671..716993ede 100644 --- a/src/distributed/storage_gc_rpc_messages.hpp +++ b/src/distributed/storage_gc_rpc_messages.hpp @@ -14,10 +14,10 @@ using Endpoint = io::network::Endpoint; struct GcClearedStatusReq : public Message { GcClearedStatusReq() {} - GcClearedStatusReq(tx::transaction_id_t local_oldest_active, int worker_id) + GcClearedStatusReq(tx::TransactionId local_oldest_active, int worker_id) : local_oldest_active(local_oldest_active), worker_id(worker_id) {} - tx::transaction_id_t local_oldest_active; + tx::TransactionId local_oldest_active; int worker_id; private: diff --git a/src/distributed/transactional_cache_cleaner.hpp b/src/distributed/transactional_cache_cleaner.hpp index 3ae790478..3e1faebba 100644 --- a/src/distributed/transactional_cache_cleaner.hpp +++ b/src/distributed/transactional_cache_cleaner.hpp @@ -31,13 +31,13 @@ class TransactionalCacheCleaner { protected: /// Registers the given object for transactional cleaning. The object will - /// periodically get it's `ClearCache(tx::transaction_id_t)` method called + /// periodically get it's `ClearCache(tx::TransactionId)` method called /// with the oldest active transaction id. Note that the ONLY guarantee for /// the call param is that there are no transactions alive that have an id /// lower than it. template void Register(TCache &cache) { - functions_.emplace_back([&cache](tx::transaction_id_t oldest_active) { + functions_.emplace_back([&cache](tx::TransactionId oldest_active) { cache.ClearTransactionalCache(oldest_active); }); } @@ -49,12 +49,12 @@ class TransactionalCacheCleaner { Register(caches...); } - void Clear(tx::transaction_id_t oldest_active) { + void Clear(tx::TransactionId oldest_active) { for (auto &f : functions_) f(oldest_active); } tx::Engine &tx_engine_; - std::vector> + std::vector> functions_; Scheduler cache_clearing_scheduler_; }; diff --git a/src/distributed/transactional_cache_cleaner_rpc_messages.hpp b/src/distributed/transactional_cache_cleaner_rpc_messages.hpp index 3765b45fa..a949ae828 100644 --- a/src/distributed/transactional_cache_cleaner_rpc_messages.hpp +++ b/src/distributed/transactional_cache_cleaner_rpc_messages.hpp @@ -5,7 +5,7 @@ namespace distributed { -RPC_SINGLE_MEMBER_MESSAGE(WaitOnTransactionEndReq, tx::transaction_id_t); +RPC_SINGLE_MEMBER_MESSAGE(WaitOnTransactionEndReq, tx::TransactionId); RPC_NO_MEMBER_MESSAGE(WaitOnTransactionEndRes); using WaitOnTransactionEndRpc = communication::rpc::RequestResponse &labels, const std::unordered_map &properties) { @@ -46,7 +46,7 @@ gid::Gid UpdatesRpcClients::CreateVertex( } storage::EdgeAddress UpdatesRpcClients::CreateEdge( - tx::transaction_id_t tx_id, VertexAccessor &from, VertexAccessor &to, + tx::TransactionId tx_id, VertexAccessor &from, VertexAccessor &to, storage::EdgeType edge_type) { CHECK(from.address().is_remote()) << "In CreateEdge `from` must be remote"; @@ -59,7 +59,7 @@ storage::EdgeAddress UpdatesRpcClients::CreateEdge( return {res->member.gid, from_worker}; } -void UpdatesRpcClients::AddInEdge(tx::transaction_id_t tx_id, +void UpdatesRpcClients::AddInEdge(tx::TransactionId tx_id, VertexAccessor &from, storage::EdgeAddress edge_address, VertexAccessor &to, @@ -76,7 +76,7 @@ void UpdatesRpcClients::AddInEdge(tx::transaction_id_t tx_id, RaiseIfRemoteError(res->member); } -void UpdatesRpcClients::RemoveVertex(int worker_id, tx::transaction_id_t tx_id, +void UpdatesRpcClients::RemoveVertex(int worker_id, tx::TransactionId tx_id, gid::Gid gid, bool check_empty) { auto res = worker_clients_.GetClientPool(worker_id).Call( RemoveVertexReqData{gid, tx_id, check_empty}); @@ -84,7 +84,7 @@ void UpdatesRpcClients::RemoveVertex(int worker_id, tx::transaction_id_t tx_id, RaiseIfRemoteError(res->member); } -void UpdatesRpcClients::RemoveEdge(tx::transaction_id_t tx_id, int worker_id, +void UpdatesRpcClients::RemoveEdge(tx::TransactionId tx_id, int worker_id, gid::Gid edge_gid, gid::Gid vertex_from_id, storage::VertexAddress vertex_to_addr) { auto res = worker_clients_.GetClientPool(worker_id).Call( @@ -93,7 +93,7 @@ void UpdatesRpcClients::RemoveEdge(tx::transaction_id_t tx_id, int worker_id, RaiseIfRemoteError(res->member); } -void UpdatesRpcClients::RemoveInEdge(tx::transaction_id_t tx_id, int worker_id, +void UpdatesRpcClients::RemoveInEdge(tx::TransactionId tx_id, int worker_id, gid::Gid vertex_id, storage::EdgeAddress edge_address) { CHECK(edge_address.is_remote()) << "RemoveInEdge edge_address is local."; @@ -104,7 +104,7 @@ void UpdatesRpcClients::RemoveInEdge(tx::transaction_id_t tx_id, int worker_id, } std::vector> UpdatesRpcClients::UpdateApplyAll( - int skip_worker_id, tx::transaction_id_t tx_id) { + int skip_worker_id, tx::TransactionId tx_id) { return worker_clients_.ExecuteOnWorkers( skip_worker_id, [tx_id](auto &client) { auto res = client.template Call(tx_id); diff --git a/src/distributed/updates_rpc_clients.hpp b/src/distributed/updates_rpc_clients.hpp index ffbfc60f1..a5baf55f7 100644 --- a/src/distributed/updates_rpc_clients.hpp +++ b/src/distributed/updates_rpc_clients.hpp @@ -28,7 +28,7 @@ class UpdatesRpcClients { /// Creates a vertex on the given worker and returns it's id. gid::Gid CreateVertex( - int worker_id, tx::transaction_id_t tx_id, + int worker_id, tx::TransactionId tx_id, const std::vector &labels, const std::unordered_map &properties); @@ -38,18 +38,18 @@ class UpdatesRpcClients { /// handled by a call to this function. Otherwise a separate call to /// `AddInEdge` might be necessary. Throws all the exceptions that can /// occur remotely as a result of updating a vertex. - storage::EdgeAddress CreateEdge(tx::transaction_id_t tx_id, + storage::EdgeAddress CreateEdge(tx::TransactionId tx_id, VertexAccessor &from, VertexAccessor &to, storage::EdgeType edge_type); /// Adds the edge with the given address to the `to` vertex as an incoming /// edge. Only used when `to` is remote and not on the same worker as `from`. - void AddInEdge(tx::transaction_id_t tx_id, VertexAccessor &from, + void AddInEdge(tx::TransactionId tx_id, VertexAccessor &from, storage::EdgeAddress edge_address, VertexAccessor &to, storage::EdgeType edge_type); /// Removes a vertex from the other worker. - void RemoveVertex(int worker_id, tx::transaction_id_t tx_id, gid::Gid gid, + void RemoveVertex(int worker_id, tx::TransactionId tx_id, gid::Gid gid, bool check_empty); /// Removes an edge on another worker. This also handles the `from` vertex @@ -57,17 +57,17 @@ class UpdatesRpcClients { /// `to` vertex is on the same worker, then that side is handled too by the /// single RPC call, otherwise a separate call has to be made to /// RemoveInEdge. - void RemoveEdge(tx::transaction_id_t tx_id, int worker_id, gid::Gid edge_gid, + void RemoveEdge(tx::TransactionId tx_id, int worker_id, gid::Gid edge_gid, gid::Gid vertex_from_id, storage::VertexAddress vertex_to_addr); - void RemoveInEdge(tx::transaction_id_t tx_id, int worker_id, + void RemoveInEdge(tx::TransactionId tx_id, int worker_id, gid::Gid vertex_id, storage::EdgeAddress edge_address); /// Calls for all the workers (except the given one) to apply their updates /// and returns the future results. std::vector> UpdateApplyAll( - int skip_worker_id, tx::transaction_id_t tx_id); + int skip_worker_id, tx::TransactionId tx_id); private: RpcWorkerClients &worker_clients_; diff --git a/src/distributed/updates_rpc_messages.hpp b/src/distributed/updates_rpc_messages.hpp index 218f927cd..098a13696 100644 --- a/src/distributed/updates_rpc_messages.hpp +++ b/src/distributed/updates_rpc_messages.hpp @@ -26,7 +26,7 @@ RPC_SINGLE_MEMBER_MESSAGE(UpdateReq, database::StateDelta); RPC_SINGLE_MEMBER_MESSAGE(UpdateRes, UpdateResult); using UpdateRpc = communication::rpc::RequestResponse; -RPC_SINGLE_MEMBER_MESSAGE(UpdateApplyReq, tx::transaction_id_t); +RPC_SINGLE_MEMBER_MESSAGE(UpdateApplyReq, tx::TransactionId); RPC_SINGLE_MEMBER_MESSAGE(UpdateApplyRes, UpdateResult); using UpdateApplyRpc = communication::rpc::RequestResponse; @@ -47,7 +47,7 @@ struct CreateResult { }; struct CreateVertexReqData { - tx::transaction_id_t tx_id; + tx::TransactionId tx_id; std::vector labels; std::unordered_map properties; @@ -91,7 +91,7 @@ struct CreateEdgeReqData { gid::Gid from; storage::VertexAddress to; storage::EdgeType edge_type; - tx::transaction_id_t tx_id; + tx::TransactionId tx_id; private: friend class boost::serialization::access; @@ -115,7 +115,7 @@ struct AddInEdgeReqData { storage::EdgeAddress edge_address; gid::Gid to; storage::EdgeType edge_type; - tx::transaction_id_t tx_id; + tx::TransactionId tx_id; private: friend class boost::serialization::access; @@ -137,7 +137,7 @@ using AddInEdgeRpc = struct RemoveVertexReqData { gid::Gid gid; - tx::transaction_id_t tx_id; + tx::TransactionId tx_id; bool check_empty; private: @@ -157,7 +157,7 @@ using RemoveVertexRpc = communication::rpc::RequestResponse; struct RemoveEdgeData { - tx::transaction_id_t tx_id; + tx::TransactionId tx_id; gid::Gid edge_id; gid::Gid vertex_from_id; storage::VertexAddress vertex_to_address; @@ -180,7 +180,7 @@ using RemoveEdgeRpc = communication::rpc::RequestResponse; struct RemoveInEdgeData { - tx::transaction_id_t tx_id; + tx::TransactionId tx_id; gid::Gid vertex; storage::EdgeAddress edge_address; diff --git a/src/distributed/updates_rpc_server.cpp b/src/distributed/updates_rpc_server.cpp index 67ea06986..2ad76cc29 100644 --- a/src/distributed/updates_rpc_server.cpp +++ b/src/distributed/updates_rpc_server.cpp @@ -253,7 +253,7 @@ UpdatesRpcServer::UpdatesRpcServer(database::GraphDb &db, }); } -UpdateResult UpdatesRpcServer::Apply(tx::transaction_id_t tx_id) { +UpdateResult UpdatesRpcServer::Apply(tx::TransactionId tx_id) { auto apply = [tx_id](auto &collection) { auto access = collection.access(); auto found = access.find(tx_id); @@ -273,7 +273,7 @@ UpdateResult UpdatesRpcServer::Apply(tx::transaction_id_t tx_id) { } void UpdatesRpcServer::ClearTransactionalCache( - tx::transaction_id_t oldest_active) { + tx::TransactionId oldest_active) { auto vertex_access = vertex_updates_.access(); for (auto &kv : vertex_access) { if (kv.first < oldest_active) { @@ -291,7 +291,7 @@ void UpdatesRpcServer::ClearTransactionalCache( // Gets/creates the TransactionUpdates for the given transaction. template UpdatesRpcServer::TransactionUpdates &UpdatesRpcServer::GetUpdates( - MapT &updates, tx::transaction_id_t tx_id) { + MapT &updates, tx::TransactionId tx_id) { return updates.access() .emplace(tx_id, std::make_tuple(tx_id), std::make_tuple(std::ref(db_), tx_id)) diff --git a/src/distributed/updates_rpc_server.hpp b/src/distributed/updates_rpc_server.hpp index f18f5fff1..de3bef334 100644 --- a/src/distributed/updates_rpc_server.hpp +++ b/src/distributed/updates_rpc_server.hpp @@ -32,7 +32,7 @@ class UpdatesRpcServer { template class TransactionUpdates { public: - TransactionUpdates(database::GraphDb &db, tx::transaction_id_t tx_id) + TransactionUpdates(database::GraphDb &db, tx::TransactionId tx_id) : db_accessor_(db, tx_id) {} /// Adds a delta and returns the result. Does not modify the state (data) of @@ -74,25 +74,25 @@ class UpdatesRpcServer { /// Applies all existsing updates for the given transaction ID. If there are /// no updates for that transaction, nothing happens. Clears the updates cache /// after applying them, regardless of the result. - UpdateResult Apply(tx::transaction_id_t tx_id); + UpdateResult Apply(tx::TransactionId tx_id); /// Clears the cache of local transactions that are completed. The signature /// of this method is dictated by `distributed::TransactionalCacheCleaner`. - void ClearTransactionalCache(tx::transaction_id_t oldest_active); + void ClearTransactionalCache(tx::TransactionId oldest_active); private: database::GraphDb &db_; template using MapT = - ConcurrentMap>; + ConcurrentMap>; MapT vertex_updates_; MapT edge_updates_; // Gets/creates the TransactionUpdates for the given transaction. template TransactionUpdates &GetUpdates(MapT &updates, - tx::transaction_id_t tx_id); + tx::TransactionId tx_id); // Performs edge creation for the given request. CreateResult CreateEdge(const CreateEdgeReqData &req); diff --git a/src/durability/paths.cpp b/src/durability/paths.cpp index 3f47ac2ce..bf61cd350 100644 --- a/src/durability/paths.cpp +++ b/src/durability/paths.cpp @@ -34,7 +34,7 @@ void CheckDurabilityDir(const std::string &durability_dir) { } } -std::experimental::optional TransactionIdFromWalFilename( +std::experimental::optional TransactionIdFromWalFilename( const std::string &name) { auto nullopt = std::experimental::nullopt; // Get the max_transaction_id from the file name that has format @@ -45,7 +45,7 @@ std::experimental::optional TransactionIdFromWalFilename( return nullopt; } if (utils::StartsWith(file_name_split[1], "current")) - return std::numeric_limits::max(); + return std::numeric_limits::max(); file_name_split = utils::Split(file_name_split[1], "_"); if (file_name_split.size() != 5) { LOG(WARNING) << "Unable to parse WAL file name: " << name; @@ -64,7 +64,7 @@ std::experimental::optional TransactionIdFromWalFilename( } fs::path MakeSnapshotPath(const fs::path &durability_dir, const int worker_id, - tx::transaction_id_t tx_id) { + tx::TransactionId tx_id) { std::string date_str = Timestamp(Timestamp::Now()) .ToString("{:04d}_{:02d}_{:02d}__{:02d}_{:02d}_{:02d}_{:05d}"); @@ -78,7 +78,7 @@ fs::path MakeSnapshotPath(const fs::path &durability_dir, const int worker_id, /// WAL file for which the max tx id is still unknown. fs::path WalFilenameForTransactionId( const std::experimental::filesystem::path &wal_dir, int worker_id, - std::experimental::optional tx_id) { + std::experimental::optional tx_id) { auto file_name = Timestamp::Now().ToIso8601(); if (tx_id) { file_name += "__max_transaction_" + std::to_string(*tx_id); @@ -89,7 +89,7 @@ fs::path WalFilenameForTransactionId( return wal_dir / file_name; } -std::experimental::optional +std::experimental::optional TransactionIdFromSnapshotFilename(const std::string &name) { auto nullopt = std::experimental::nullopt; auto file_name_split = utils::RSplit(name, "_tx_", 1); diff --git a/src/durability/paths.hpp b/src/durability/paths.hpp index 2c9967d54..d4b38356d 100644 --- a/src/durability/paths.hpp +++ b/src/durability/paths.hpp @@ -22,7 +22,7 @@ void CheckDurabilityDir(const std::string &durability_dir); /// is returned because that's appropriate for the recovery logic (the current /// WAL does not yet have a maximum transaction ID and can't be discarded by /// the recovery regardless of the snapshot from which the transaction starts). -std::experimental::optional TransactionIdFromWalFilename( +std::experimental::optional TransactionIdFromWalFilename( const std::string &name); /** Generates a path for a DB snapshot in the given folder in a well-defined @@ -30,11 +30,11 @@ std::experimental::optional TransactionIdFromWalFilename( * created appended to the file name. */ std::experimental::filesystem::path MakeSnapshotPath( const std::experimental::filesystem::path &durability_dir, int worker_id, - tx::transaction_id_t tx_id); + tx::TransactionId tx_id); /// Returns the transaction id contained in the file name. If the filename is /// not a parseable WAL file name, nullopt is returned. -std::experimental::optional +std::experimental::optional TransactionIdFromSnapshotFilename(const std::string &name); /// Generates a file path for a write-ahead log file of a specified worker. If @@ -42,6 +42,6 @@ TransactionIdFromSnapshotFilename(const std::string &name); /// path is for the "current" WAL file for which the max tx id is still unknown. std::experimental::filesystem::path WalFilenameForTransactionId( const std::experimental::filesystem::path &wal_dir, int worker_id, - std::experimental::optional tx_id = + std::experimental::optional tx_id = std::experimental::nullopt); } // namespace durability diff --git a/src/durability/recovery.cpp b/src/durability/recovery.cpp index cea21a855..094c2bed7 100644 --- a/src/durability/recovery.cpp +++ b/src/durability/recovery.cpp @@ -36,9 +36,9 @@ using communication::bolt::DecodedValue; // A data structure for exchanging info between main recovery function and // snapshot and WAL recovery functions. struct RecoveryData { - tx::transaction_id_t snapshooter_tx_id{0}; - tx::transaction_id_t wal_max_recovered_tx_id{0}; - std::vector snapshooter_tx_snapshot; + tx::TransactionId snapshooter_tx_id{0}; + tx::TransactionId wal_max_recovered_tx_id{0}; + std::vector snapshooter_tx_snapshot; // A collection into which the indexes should be added so they // can be rebuilt at the end of the recovery transaction. std::vector> indexes; @@ -231,7 +231,7 @@ bool RecoverSnapshot(const fs::path &snapshot_file, database::GraphDb &db, // than the latest one we have recovered. Do this to make sure that // subsequently created snapshots and WAL files will have transactional info // that does not interfere with that found in previous snapshots and WAL. - tx::transaction_id_t max_id = recovery_data.snapshooter_tx_id; + tx::TransactionId max_id = recovery_data.snapshooter_tx_id; auto &snap = recovery_data.snapshooter_tx_snapshot; if (!snap.empty()) max_id = *std::max_element(snap.begin(), snap.end()); dba.db().tx_engine().EnsureNextIdGreater(max_id); @@ -257,15 +257,15 @@ bool RecoverWal(const fs::path &wal_dir, database::GraphDb &db, auto first_to_recover = tx_sn.empty() ? recovery_data.snapshooter_tx_id + 1 : *std::min(tx_sn.begin(), tx_sn.end()); auto should_skip = [&tx_sn, &recovery_data, - first_to_recover](tx::transaction_id_t tx_id) { + first_to_recover](tx::TransactionId tx_id) { return tx_id < first_to_recover || (tx_id < recovery_data.snapshooter_tx_id && !utils::Contains(tx_sn, tx_id)); }; - std::unordered_map accessors; + std::unordered_map accessors; auto get_accessor = - [&accessors](tx::transaction_id_t tx_id) -> database::GraphDbAccessor & { + [&accessors](tx::TransactionId tx_id) -> database::GraphDbAccessor & { auto found = accessors.find(tx_id); CHECK(found != accessors.end()) << "Accessor does not exist for transaction: " << tx_id; @@ -276,7 +276,7 @@ bool RecoverWal(const fs::path &wal_dir, database::GraphDb &db, // then the latest one we have recovered. Do this to make sure that // subsequently created snapshots and WAL files will have transactional info // that does not interfere with that found in previous snapshots and WAL. - tx::transaction_id_t max_observed_tx_id{0}; + tx::TransactionId max_observed_tx_id{0}; // Read all the WAL files whose max_tx_id is not smaller than // min_tx_to_recover. diff --git a/src/durability/recovery.hpp b/src/durability/recovery.hpp index 095db517d..ccb8b5f28 100644 --- a/src/durability/recovery.hpp +++ b/src/durability/recovery.hpp @@ -16,11 +16,11 @@ namespace durability { /// Stores info on what was (or needs to be) recovered from durability. struct RecoveryInfo { RecoveryInfo() {} - RecoveryInfo(tx::transaction_id_t snapshot_tx_id, - tx::transaction_id_t max_wal_tx_id) + RecoveryInfo(tx::TransactionId snapshot_tx_id, + tx::TransactionId max_wal_tx_id) : snapshot_tx_id(snapshot_tx_id), max_wal_tx_id(max_wal_tx_id) {} - tx::transaction_id_t snapshot_tx_id; - tx::transaction_id_t max_wal_tx_id; + tx::TransactionId snapshot_tx_id; + tx::TransactionId max_wal_tx_id; bool operator==(const RecoveryInfo &other) const { return snapshot_tx_id == other.snapshot_tx_id && diff --git a/src/durability/wal.hpp b/src/durability/wal.hpp index d7c356e07..1bdd1afb1 100644 --- a/src/durability/wal.hpp +++ b/src/durability/wal.hpp @@ -75,7 +75,7 @@ class WriteAheadLog { // The latest transaction whose delta is recorded in the current WAL file. // Zero indicates that no deltas have so far been written to the current WAL // file. - tx::transaction_id_t latest_tx_{0}; + tx::TransactionId latest_tx_{0}; void RotateFile(); }; diff --git a/src/mvcc/record.hpp b/src/mvcc/record.hpp index 98e20853b..15df71f5a 100644 --- a/src/mvcc/record.hpp +++ b/src/mvcc/record.hpp @@ -31,8 +31,8 @@ class Record : public Version { // again. i know, it happened to me. // fetch expiration info in a safe way (see fetch_exp for details) - tx::transaction_id_t tx_exp; - tx::command_id_t cmd_exp; + tx::TransactionId tx_exp; + tx::CommandId cmd_exp; std::tie(tx_exp, cmd_exp) = fetch_exp(); return ((tx_.cre == t.id_ && // inserted by the current transaction @@ -109,8 +109,8 @@ class Record : public Version { // queries which can match, update and return in the same query bool is_visible_write(const tx::Transaction &t) { // fetch expiration info in a safe way (see fetch_exp for details) - tx::transaction_id_t tx_exp; - tx::command_id_t cmd_exp; + tx::TransactionId tx_exp; + tx::CommandId cmd_exp; std::tie(tx_exp, cmd_exp) = fetch_exp(); return (tx_.cre == t.id_ && // inserted by the current transaction @@ -150,7 +150,7 @@ class Record : public Version { // Exp is aborted and we can't set the hint, this way we don't have to set // the hint because an aborted transaction which expires a record is the // same thing as a non-expired record - tx::transaction_id_t expected; + tx::TransactionId expected; do { expected = tx_.exp; // If the transaction expiry is no longer aborted we don't need to @@ -208,13 +208,13 @@ class Record : public Version { // and tx.exp is the id of the transaction that deleted the record // These values are used to determine the visibility of the record // to the current transaction. - CreExp tx_; + CreExp tx_; // cmd.cre is the id of the command in this transaction that created the // record and cmd.exp is the id of the command in this transaction that // deleted the record. These values are used to determine the visibility // of the record to the current command in the running transaction. - CreExp cmd_; + CreExp cmd_; mutable Hints hints_; /** Fetch the (transaction, command) expiration before the check @@ -222,8 +222,8 @@ class Record : public Version { * Do it in a loop to ensure that command is consistent with transaction. */ auto fetch_exp() const { - tx::transaction_id_t tx_exp; - tx::command_id_t cmd_exp; + tx::TransactionId tx_exp; + tx::CommandId cmd_exp; do { tx_exp = tx_.exp; cmd_exp = cmd_.exp; @@ -274,7 +274,7 @@ class Record : public Version { * @param id - id to check if it's commited and visible * @return true if the id is commited and visible for the transaction t. */ - bool visible_from(uint8_t mask, tx::transaction_id_t id, + bool visible_from(uint8_t mask, tx::TransactionId id, const tx::Transaction &t) { DCHECK(mask == Hints::kCre || mask == Hints::kExp) << "Mask must be either kCre or kExp"; diff --git a/src/storage/deferred_deleter.hpp b/src/storage/deferred_deleter.hpp index 54bed0bb0..4a069ec9a 100644 --- a/src/storage/deferred_deleter.hpp +++ b/src/storage/deferred_deleter.hpp @@ -22,8 +22,8 @@ class DeferredDeleter { */ struct DeletedObject { const T *object; - const tx::transaction_id_t deleted_at; - DeletedObject(const T *object, tx::transaction_id_t deleted_at) + const tx::TransactionId deleted_at; + DeletedObject(const T *object, tx::TransactionId deleted_at) : object(object), deleted_at(deleted_at) {} }; @@ -44,7 +44,7 @@ class DeferredDeleter { */ void AddObjects(const std::vector &objects) { auto previous_tx_id = objects_.empty() - ? std::numeric_limits::min() + ? std::numeric_limits::min() : objects_.back().deleted_at; for (auto object : objects) { CHECK(previous_tx_id <= object.deleted_at) @@ -58,7 +58,7 @@ class DeferredDeleter { * @brief - Free memory of objects deleted before the id. * @param id - delete before this id */ - void FreeExpiredObjects(tx::transaction_id_t id) { + void FreeExpiredObjects(tx::TransactionId id) { auto it = objects_.begin(); while (it != objects_.end() && it->deleted_at < id) { delete it->object; diff --git a/src/storage/locking/record_lock.cpp b/src/storage/locking/record_lock.cpp index c2ea27ac9..98d5a691b 100644 --- a/src/storage/locking/record_lock.cpp +++ b/src/storage/locking/record_lock.cpp @@ -17,10 +17,10 @@ namespace { // transaction in that cycle. If start transaction is not in a cycle nullopt is // returned. template -std::experimental::optional FindOldestTxInLockCycle( - tx::transaction_id_t start, TAccessor &graph_accessor) { - std::vector path; - std::unordered_set visited; +std::experimental::optional FindOldestTxInLockCycle( + tx::TransactionId start, TAccessor &graph_accessor) { + std::vector path; + std::unordered_set visited; auto current = start; @@ -45,8 +45,8 @@ std::experimental::optional FindOldestTxInLockCycle( } // namespace -bool RecordLock::TryLock(tx::transaction_id_t tx_id) { - tx::transaction_id_t unlocked{0}; +bool RecordLock::TryLock(tx::TransactionId tx_id) { + tx::TransactionId unlocked{0}; return owner_.compare_exchange_strong(unlocked, tx_id); } @@ -55,7 +55,7 @@ LockStatus RecordLock::Lock(const tx::Transaction &tx, tx::Engine &engine) { return LockStatus::Acquired; } - tx::transaction_id_t owner = owner_; + tx::TransactionId owner = owner_; if (owner_ == tx.id_) return LockStatus::AlreadyHeld; // In a distributed worker the transaction objects (and the locks they own) diff --git a/src/storage/locking/record_lock.hpp b/src/storage/locking/record_lock.hpp index 12046cfd3..7ecd12775 100644 --- a/src/storage/locking/record_lock.hpp +++ b/src/storage/locking/record_lock.hpp @@ -21,11 +21,11 @@ class RecordLock { void Unlock(); private: - bool TryLock(tx::transaction_id_t tx_id); + bool TryLock(tx::TransactionId tx_id); // Arbitrary choosen constant, postgresql uses 1 second so do we. constexpr static std::chrono::duration kTimeout{ std::chrono::seconds(1)}; - std::atomic owner_{0}; + std::atomic owner_{0}; }; diff --git a/src/transactions/commit_log.hpp b/src/transactions/commit_log.hpp index 5f4b6eb5f..e446f59a0 100644 --- a/src/transactions/commit_log.hpp +++ b/src/transactions/commit_log.hpp @@ -18,25 +18,25 @@ class CommitLog { CommitLog &operator=(const CommitLog &) = delete; CommitLog &operator=(CommitLog &&) = delete; - bool is_active(transaction_id_t id) const { + bool is_active(TransactionId id) const { return fetch_info(id).is_active(); } - bool is_committed(transaction_id_t id) const { + bool is_committed(TransactionId id) const { return fetch_info(id).is_committed(); } - void set_committed(transaction_id_t id) { log.set(2 * id); } + void set_committed(TransactionId id) { log.set(2 * id); } - bool is_aborted(transaction_id_t id) const { + bool is_aborted(TransactionId id) const { return fetch_info(id).is_aborted(); } - void set_aborted(transaction_id_t id) { log.set(2 * id + 1); } + void set_aborted(TransactionId id) { log.set(2 * id + 1); } // Clears the commit log from bits associated with transactions with an id // lower than `id`. - void garbage_collect_older(transaction_id_t id) { log.delete_prefix(2 * id); } + void garbage_collect_older(TransactionId id) { log.delete_prefix(2 * id); } class Info { public: @@ -68,7 +68,7 @@ class CommitLog { uint8_t flags_{0}; }; - Info fetch_info(transaction_id_t id) const { return Info{log.at(2 * id, 2)}; } + Info fetch_info(TransactionId id) const { return Info{log.at(2 * id, 2)}; } private: DynamicBitset log; diff --git a/src/transactions/engine.hpp b/src/transactions/engine.hpp index 621989f4b..771cd1d23 100644 --- a/src/transactions/engine.hpp +++ b/src/transactions/engine.hpp @@ -31,10 +31,10 @@ class Engine { virtual Transaction *Begin() = 0; /// Advances the command on the transaction with the given id. - virtual command_id_t Advance(transaction_id_t id) = 0; + virtual CommandId Advance(TransactionId id) = 0; /// Updates the command on the workers to the master's value. - virtual command_id_t UpdateCommand(transaction_id_t id) = 0; + virtual CommandId UpdateCommand(TransactionId id) = 0; /// Comits the given transaction. Deletes the transaction object, it's not /// valid after this function executes. @@ -45,7 +45,7 @@ class Engine { virtual void Abort(const Transaction &t) = 0; /** Returns the commit log Info about the given transaction. */ - virtual CommitLog::Info Info(transaction_id_t tx) const = 0; + virtual CommitLog::Info Info(TransactionId tx) const = 0; /** Returns the snapshot relevant to garbage collection of database records. * @@ -69,29 +69,29 @@ class Engine { virtual Snapshot GlobalActiveTransactions() = 0; /** Returns the ID the last globally known transaction. */ - virtual tx::transaction_id_t GlobalLast() const = 0; + virtual tx::TransactionId GlobalLast() const = 0; /** Returns the ID of last locally known transaction. */ - virtual tx::transaction_id_t LocalLast() const = 0; + virtual tx::TransactionId LocalLast() const = 0; /** Returns the ID of the oldest transaction locally known to be active. It is * guaranteed that all the transactions older than the returned are globally * not active. */ - virtual transaction_id_t LocalOldestActive() const = 0; + virtual TransactionId LocalOldestActive() const = 0; /** Calls function f on each locally active transaction. */ virtual void LocalForEachActiveTransaction( std::function f) = 0; /** Gets a transaction object for a running transaction. */ - virtual tx::Transaction *RunningTransaction(transaction_id_t tx_id) = 0; + virtual tx::Transaction *RunningTransaction(TransactionId tx_id) = 0; /** Ensures the next transaction that starts will have the ID greater than * the given id. */ - virtual void EnsureNextIdGreater(transaction_id_t tx_id) = 0; + virtual void EnsureNextIdGreater(TransactionId tx_id) = 0; /** Garbage collects transactions older than tx_id from commit log. */ - virtual void GarbageCollectCommitLog(transaction_id_t tx_id) = 0; + virtual void GarbageCollectCommitLog(TransactionId tx_id) = 0; auto &local_lock_graph() { return local_lock_graph_; } const auto &local_lock_graph() const { return local_lock_graph_; } @@ -100,6 +100,6 @@ class Engine { // Map lock dependencies. Each entry maps (tx_that_wants_lock, // tx_that_holds_lock). Used for local deadlock resolution. // TODO consider global deadlock resolution. - ConcurrentMap local_lock_graph_; + ConcurrentMap local_lock_graph_; }; } // namespace tx diff --git a/src/transactions/engine_rpc_messages.hpp b/src/transactions/engine_rpc_messages.hpp index 8bde8463e..9f948813c 100644 --- a/src/transactions/engine_rpc_messages.hpp +++ b/src/transactions/engine_rpc_messages.hpp @@ -9,7 +9,7 @@ namespace tx { RPC_NO_MEMBER_MESSAGE(BeginReq); struct TxAndSnapshot { - transaction_id_t tx_id; + TransactionId tx_id; Snapshot snapshot; private: @@ -23,32 +23,32 @@ struct TxAndSnapshot { RPC_SINGLE_MEMBER_MESSAGE(BeginRes, TxAndSnapshot); using BeginRpc = communication::rpc::RequestResponse; -RPC_SINGLE_MEMBER_MESSAGE(AdvanceReq, transaction_id_t); -RPC_SINGLE_MEMBER_MESSAGE(AdvanceRes, command_id_t); +RPC_SINGLE_MEMBER_MESSAGE(AdvanceReq, TransactionId); +RPC_SINGLE_MEMBER_MESSAGE(AdvanceRes, CommandId); using AdvanceRpc = communication::rpc::RequestResponse; -RPC_SINGLE_MEMBER_MESSAGE(CommitReq, transaction_id_t); +RPC_SINGLE_MEMBER_MESSAGE(CommitReq, TransactionId); RPC_NO_MEMBER_MESSAGE(CommitRes); using CommitRpc = communication::rpc::RequestResponse; -RPC_SINGLE_MEMBER_MESSAGE(AbortReq, transaction_id_t); +RPC_SINGLE_MEMBER_MESSAGE(AbortReq, TransactionId); RPC_NO_MEMBER_MESSAGE(AbortRes); using AbortRpc = communication::rpc::RequestResponse; -RPC_SINGLE_MEMBER_MESSAGE(SnapshotReq, transaction_id_t); +RPC_SINGLE_MEMBER_MESSAGE(SnapshotReq, TransactionId); RPC_SINGLE_MEMBER_MESSAGE(SnapshotRes, Snapshot); using SnapshotRpc = communication::rpc::RequestResponse; -RPC_SINGLE_MEMBER_MESSAGE(CommandReq, transaction_id_t); -RPC_SINGLE_MEMBER_MESSAGE(CommandRes, command_id_t); +RPC_SINGLE_MEMBER_MESSAGE(CommandReq, TransactionId); +RPC_SINGLE_MEMBER_MESSAGE(CommandRes, CommandId); using CommandRpc = communication::rpc::RequestResponse; RPC_NO_MEMBER_MESSAGE(GcSnapshotReq); using GcSnapshotRpc = communication::rpc::RequestResponse; -RPC_SINGLE_MEMBER_MESSAGE(ClogInfoReq, transaction_id_t); +RPC_SINGLE_MEMBER_MESSAGE(ClogInfoReq, TransactionId); RPC_SINGLE_MEMBER_MESSAGE(ClogInfoRes, CommitLog::Info); using ClogInfoRpc = communication::rpc::RequestResponse; @@ -57,14 +57,14 @@ RPC_NO_MEMBER_MESSAGE(ActiveTransactionsReq); using ActiveTransactionsRpc = communication::rpc::RequestResponse; -RPC_SINGLE_MEMBER_MESSAGE(EnsureNextIdGreaterReq, transaction_id_t); +RPC_SINGLE_MEMBER_MESSAGE(EnsureNextIdGreaterReq, TransactionId); RPC_NO_MEMBER_MESSAGE(EnsureNextIdGreaterRes); using EnsureNextIdGreaterRpc = communication::rpc::RequestResponse; RPC_NO_MEMBER_MESSAGE(GlobalLastReq); -RPC_SINGLE_MEMBER_MESSAGE(GlobalLastRes, transaction_id_t); +RPC_SINGLE_MEMBER_MESSAGE(GlobalLastRes, TransactionId); using GlobalLastRpc = communication::rpc::RequestResponse; } // namespace tx diff --git a/src/transactions/engine_single_node.cpp b/src/transactions/engine_single_node.cpp index e1b0d0f94..dacc4c233 100644 --- a/src/transactions/engine_single_node.cpp +++ b/src/transactions/engine_single_node.cpp @@ -16,7 +16,7 @@ Transaction *SingleNodeEngine::Begin() { VLOG(11) << "[Tx] Starting transaction " << counter_ + 1; std::lock_guard guard(lock_); - transaction_id_t id{++counter_}; + TransactionId id{++counter_}; auto t = new Transaction(id, active_, *this); active_.insert(id); store_.emplace(id, t); @@ -26,7 +26,7 @@ Transaction *SingleNodeEngine::Begin() { return t; } -command_id_t SingleNodeEngine::Advance(transaction_id_t id) { +CommandId SingleNodeEngine::Advance(TransactionId id) { std::lock_guard guard(lock_); auto it = store_.find(id); @@ -34,7 +34,7 @@ command_id_t SingleNodeEngine::Advance(transaction_id_t id) { << "Transaction::advance on non-existing transaction"; Transaction *t = it->second.get(); - if (t->cid_ == std::numeric_limits::max()) + if (t->cid_ == std::numeric_limits::max()) throw TransactionError( "Reached maximum number of commands in this " "transaction."); @@ -42,7 +42,7 @@ command_id_t SingleNodeEngine::Advance(transaction_id_t id) { return ++(t->cid_); } -command_id_t SingleNodeEngine::UpdateCommand(transaction_id_t id) { +CommandId SingleNodeEngine::UpdateCommand(TransactionId id) { std::lock_guard guard(lock_); auto it = store_.find(id); DCHECK(it != store_.end()) @@ -72,7 +72,7 @@ void SingleNodeEngine::Abort(const Transaction &t) { store_.erase(store_.find(t.id_)); } -CommitLog::Info SingleNodeEngine::Info(transaction_id_t tx) const { +CommitLog::Info SingleNodeEngine::Info(TransactionId tx) const { return clog_.fetch_info(tx); } @@ -98,19 +98,19 @@ Snapshot SingleNodeEngine::GlobalActiveTransactions() { return active_transactions; } -transaction_id_t SingleNodeEngine::LocalLast() const { +TransactionId SingleNodeEngine::LocalLast() const { std::lock_guard guard(lock_); return counter_; } -transaction_id_t SingleNodeEngine::GlobalLast() const { return LocalLast(); } +TransactionId SingleNodeEngine::GlobalLast() const { return LocalLast(); } -transaction_id_t SingleNodeEngine::LocalOldestActive() const { +TransactionId SingleNodeEngine::LocalOldestActive() const { std::lock_guard guard(lock_); return active_.empty() ? counter_ + 1 : active_.front(); } -void SingleNodeEngine::GarbageCollectCommitLog(transaction_id_t tx_id) { +void SingleNodeEngine::GarbageCollectCommitLog(TransactionId tx_id) { clog_.garbage_collect_older(tx_id); } @@ -122,7 +122,7 @@ void SingleNodeEngine::LocalForEachActiveTransaction( } } -Transaction *SingleNodeEngine::RunningTransaction(transaction_id_t tx_id) { +Transaction *SingleNodeEngine::RunningTransaction(TransactionId tx_id) { std::lock_guard guard(lock_); auto found = store_.find(tx_id); CHECK(found != store_.end()) @@ -130,7 +130,7 @@ Transaction *SingleNodeEngine::RunningTransaction(transaction_id_t tx_id) { return found->second.get(); } -void SingleNodeEngine::EnsureNextIdGreater(transaction_id_t tx_id) { +void SingleNodeEngine::EnsureNextIdGreater(TransactionId tx_id) { std::lock_guard guard(lock_); counter_ = std::max(tx_id, counter_); } diff --git a/src/transactions/engine_single_node.hpp b/src/transactions/engine_single_node.hpp index 34519c3c5..85d88c0f1 100644 --- a/src/transactions/engine_single_node.hpp +++ b/src/transactions/engine_single_node.hpp @@ -30,26 +30,26 @@ class SingleNodeEngine : public Engine { explicit SingleNodeEngine(durability::WriteAheadLog *wal = nullptr); Transaction *Begin() override; - command_id_t Advance(transaction_id_t id) override; - command_id_t UpdateCommand(transaction_id_t id) override; + CommandId Advance(TransactionId id) override; + CommandId UpdateCommand(TransactionId id) override; void Commit(const Transaction &t) override; void Abort(const Transaction &t) override; - CommitLog::Info Info(transaction_id_t tx) const override; + CommitLog::Info Info(TransactionId tx) const override; Snapshot GlobalGcSnapshot() override; Snapshot GlobalActiveTransactions() override; - transaction_id_t GlobalLast() const override; - transaction_id_t LocalLast() const override; - transaction_id_t LocalOldestActive() const override; + TransactionId GlobalLast() const override; + TransactionId LocalLast() const override; + TransactionId LocalOldestActive() const override; void LocalForEachActiveTransaction( std::function f) override; - Transaction *RunningTransaction(transaction_id_t tx_id) override; - void EnsureNextIdGreater(transaction_id_t tx_id) override; - void GarbageCollectCommitLog(transaction_id_t tx_id) override; + Transaction *RunningTransaction(TransactionId tx_id) override; + void EnsureNextIdGreater(TransactionId tx_id) override; + void GarbageCollectCommitLog(TransactionId tx_id) override; private: - transaction_id_t counter_{0}; + TransactionId counter_{0}; CommitLog clog_; - std::unordered_map> store_; + std::unordered_map> store_; Snapshot active_; mutable SpinLock lock_; // Optional. If present, the Engine will write tx Begin/Commit/Abort diff --git a/src/transactions/engine_worker.cpp b/src/transactions/engine_worker.cpp index 43776329b..2066d60e6 100644 --- a/src/transactions/engine_worker.cpp +++ b/src/transactions/engine_worker.cpp @@ -29,7 +29,7 @@ Transaction *WorkerEngine::Begin() { return tx; } -command_id_t WorkerEngine::Advance(transaction_id_t tx_id) { +CommandId WorkerEngine::Advance(TransactionId tx_id) { auto res = master_client_pool_.Call(tx_id); CHECK(res) << "AdvanceRpc failed"; auto access = active_.access(); @@ -40,7 +40,7 @@ command_id_t WorkerEngine::Advance(transaction_id_t tx_id) { return res->member; } -command_id_t WorkerEngine::UpdateCommand(transaction_id_t tx_id) { +CommandId WorkerEngine::UpdateCommand(TransactionId tx_id) { auto res = master_client_pool_.Call(tx_id); CHECK(res) << "CommandRpc failed"; auto cmd_id = res->member; @@ -74,7 +74,7 @@ void WorkerEngine::Abort(const Transaction &t) { ClearSingleTransaction(t.id_); } -CommitLog::Info WorkerEngine::Info(transaction_id_t tid) const { +CommitLog::Info WorkerEngine::Info(TransactionId tid) const { auto info = clog_.fetch_info(tid); // If we don't know the transaction to be commited nor aborted, ask the // master about it and update the local commit log. @@ -110,8 +110,8 @@ Snapshot WorkerEngine::GlobalActiveTransactions() { return snapshot; } -transaction_id_t WorkerEngine::LocalLast() const { return local_last_; } -transaction_id_t WorkerEngine::GlobalLast() const { +TransactionId WorkerEngine::LocalLast() const { return local_last_; } +TransactionId WorkerEngine::GlobalLast() const { auto res = master_client_pool_.Call(); CHECK(res) << "GlobalLastRpc failed"; return res->member; @@ -122,11 +122,11 @@ void WorkerEngine::LocalForEachActiveTransaction( for (auto pair : active_.access()) f(*pair.second); } -transaction_id_t WorkerEngine::LocalOldestActive() const { +TransactionId WorkerEngine::LocalOldestActive() const { return oldest_active_; } -Transaction *WorkerEngine::RunningTransaction(transaction_id_t tx_id) { +Transaction *WorkerEngine::RunningTransaction(TransactionId tx_id) { auto accessor = active_.access(); auto found = accessor.find(tx_id); if (found != accessor.end()) return found->second; @@ -138,7 +138,7 @@ Transaction *WorkerEngine::RunningTransaction(transaction_id_t tx_id) { return RunningTransaction(tx_id, snapshot); } -Transaction *WorkerEngine::RunningTransaction(transaction_id_t tx_id, +Transaction *WorkerEngine::RunningTransaction(TransactionId tx_id, const Snapshot &snapshot) { auto accessor = active_.access(); auto found = accessor.find(tx_id); @@ -152,7 +152,7 @@ Transaction *WorkerEngine::RunningTransaction(transaction_id_t tx_id, } void WorkerEngine::ClearTransactionalCache( - transaction_id_t oldest_active) const { + TransactionId oldest_active) const { auto access = active_.access(); for (auto kv : access) { if (kv.first < oldest_active) { @@ -164,7 +164,7 @@ void WorkerEngine::ClearTransactionalCache( } } -void WorkerEngine::ClearSingleTransaction(transaction_id_t tx_id) const { +void WorkerEngine::ClearSingleTransaction(TransactionId tx_id) const { auto access = active_.access(); auto found = access.find(tx_id); if (found != access.end()) { @@ -176,7 +176,7 @@ void WorkerEngine::ClearSingleTransaction(transaction_id_t tx_id) const { } void WorkerEngine::UpdateOldestActive(const Snapshot &snapshot, - transaction_id_t alternative) { + TransactionId alternative) { if (snapshot.empty()) { oldest_active_.store(std::max(alternative, oldest_active_.load())); } else { @@ -184,11 +184,11 @@ void WorkerEngine::UpdateOldestActive(const Snapshot &snapshot, } } -void WorkerEngine::EnsureNextIdGreater(transaction_id_t tx_id) { +void WorkerEngine::EnsureNextIdGreater(TransactionId tx_id) { master_client_pool_.Call(tx_id); } -void WorkerEngine::GarbageCollectCommitLog(transaction_id_t tx_id) { +void WorkerEngine::GarbageCollectCommitLog(TransactionId tx_id) { clog_.garbage_collect_older(tx_id); } } // namespace tx diff --git a/src/transactions/engine_worker.hpp b/src/transactions/engine_worker.hpp index 5c3c3f383..1c4aad26c 100644 --- a/src/transactions/engine_worker.hpp +++ b/src/transactions/engine_worker.hpp @@ -24,35 +24,35 @@ class WorkerEngine : public Engine { ~WorkerEngine(); Transaction *Begin() override; - command_id_t Advance(transaction_id_t id) override; - command_id_t UpdateCommand(transaction_id_t id) override; + CommandId Advance(TransactionId id) override; + CommandId UpdateCommand(TransactionId id) override; void Commit(const Transaction &t) override; void Abort(const Transaction &t) override; - CommitLog::Info Info(transaction_id_t tid) const override; + CommitLog::Info Info(TransactionId tid) const override; Snapshot GlobalGcSnapshot() override; Snapshot GlobalActiveTransactions() override; - transaction_id_t GlobalLast() const override; - transaction_id_t LocalLast() const override; + TransactionId GlobalLast() const override; + TransactionId LocalLast() const override; void LocalForEachActiveTransaction( std::function f) override; - transaction_id_t LocalOldestActive() const override; - Transaction *RunningTransaction(transaction_id_t tx_id) override; + TransactionId LocalOldestActive() const override; + Transaction *RunningTransaction(TransactionId tx_id) override; // Caches the transaction for the given info an returs a ptr to it. - Transaction *RunningTransaction(transaction_id_t tx_id, + Transaction *RunningTransaction(TransactionId tx_id, const Snapshot &snapshot); - void EnsureNextIdGreater(transaction_id_t tx_id) override; - void GarbageCollectCommitLog(tx::transaction_id_t tx_id) override; + void EnsureNextIdGreater(TransactionId tx_id) override; + void GarbageCollectCommitLog(tx::TransactionId tx_id) override; /// Clears the cache of local transactions that have expired. The signature of /// this method is dictated by `distributed::TransactionalCacheCleaner`. - void ClearTransactionalCache(transaction_id_t oldest_active) const; + void ClearTransactionalCache(TransactionId oldest_active) const; private: // Local caches. - mutable ConcurrentMap active_; - std::atomic local_last_{0}; + mutable ConcurrentMap active_; + std::atomic local_last_{0}; // Mutable because just getting info can cause a cache fill. mutable CommitLog clog_; @@ -61,14 +61,14 @@ class WorkerEngine : public Engine { // Used for clearing of caches of transactions that have expired. // Initialize the oldest_active_ with 1 because there's never a tx with id=0 - std::atomic oldest_active_{1}; + std::atomic oldest_active_{1}; // Removes a single transaction from the cache, if present. - void ClearSingleTransaction(transaction_id_t tx_Id) const; + void ClearSingleTransaction(TransactionId tx_Id) const; // Updates the oldest active transaction to the one from the snapshot. If the // snapshot is empty, it's set to the given alternative. void UpdateOldestActive(const Snapshot &snapshot, - transaction_id_t alternative); + TransactionId alternative); }; } // namespace tx diff --git a/src/transactions/snapshot.hpp b/src/transactions/snapshot.hpp index 03a3dbddf..3cc1ca0d6 100644 --- a/src/transactions/snapshot.hpp +++ b/src/transactions/snapshot.hpp @@ -23,7 +23,7 @@ class Engine; class Snapshot { public: Snapshot() = default; - Snapshot(std::vector &&active) + Snapshot(std::vector &&active) : transaction_ids_(std::move(active)) {} // all the copy/move constructors/assignments act naturally @@ -32,7 +32,7 @@ class Snapshot { * * @param xid - The transcation id in question */ - bool contains(transaction_id_t id) const { + bool contains(TransactionId id) const { return std::binary_search(transaction_ids_.begin(), transaction_ids_.end(), id); } @@ -43,7 +43,7 @@ class Snapshot { * * @param id - the transaction id to add */ - void insert(transaction_id_t id) { + void insert(TransactionId id) { transaction_ids_.push_back(id); DCHECK(std::is_sorted(transaction_ids_.begin(), transaction_ids_.end())) << "Snapshot must be sorted"; @@ -52,18 +52,18 @@ class Snapshot { /** Removes the given transaction id from this Snapshot. * * @param id - the transaction id to remove */ - void remove(transaction_id_t id) { + void remove(TransactionId id) { auto last = std::remove(transaction_ids_.begin(), transaction_ids_.end(), id); transaction_ids_.erase(last, transaction_ids_.end()); } - transaction_id_t front() const { + TransactionId front() const { DCHECK(transaction_ids_.size()) << "Snapshot.front() on empty Snapshot"; return transaction_ids_.front(); } - transaction_id_t back() const { + TransactionId back() const { DCHECK(transaction_ids_.size()) << "Snapshot.back() on empty Snapshot"; return transaction_ids_.back(); } @@ -94,6 +94,6 @@ class Snapshot { ar &transaction_ids_; } - std::vector transaction_ids_; + std::vector transaction_ids_; }; } // namespace tx diff --git a/src/transactions/transaction.hpp b/src/transactions/transaction.hpp index 4db792459..9613023d2 100644 --- a/src/transactions/transaction.hpp +++ b/src/transactions/transaction.hpp @@ -21,8 +21,8 @@ namespace tx { class Transaction { public: /** Returns the maximum possible transcation id */ - static transaction_id_t MaxId() { - return std::numeric_limits::max(); + static TransactionId MaxId() { + return std::numeric_limits::max(); } private: @@ -31,7 +31,7 @@ class Transaction { friend class WorkerEngine; // The constructor is private, only the Engine ever uses it. - Transaction(transaction_id_t id, const Snapshot &snapshot, Engine &engine) + Transaction(TransactionId id, const Snapshot &snapshot, Engine &engine) : id_(id), engine_(engine), snapshot_(snapshot) {} // A transaction can't be moved nor copied. it's owned by the transaction @@ -47,7 +47,7 @@ class Transaction { void TakeLock(RecordLock &lock) const { locks_.Take(&lock, *this, engine_); } /** Transaction's id. Unique in the engine that owns it */ - const transaction_id_t id_; + const TransactionId id_; /** The transaction engine to which this transaction belongs */ Engine &engine_; @@ -71,7 +71,7 @@ class Transaction { private: // Index of the current command in the current transaction. - command_id_t cid_{1}; + CommandId cid_{1}; // A snapshot of currently active transactions. const Snapshot snapshot_; diff --git a/src/transactions/type.hpp b/src/transactions/type.hpp index 87f5f61e5..991ddc8fb 100644 --- a/src/transactions/type.hpp +++ b/src/transactions/type.hpp @@ -5,8 +5,8 @@ namespace tx { /** Type of a tx::Transcation's id member */ - using transaction_id_t = uint64_t; + using TransactionId = uint64_t; /** Type of a tx::Transcation's command id member */ - using command_id_t = uint32_t; + using CommandId = uint32_t; } diff --git a/tests/unit/mvcc_find_update_common.hpp b/tests/unit/mvcc_find_update_common.hpp index f1981c077..d92060c53 100644 --- a/tests/unit/mvcc_find_update_common.hpp +++ b/tests/unit/mvcc_find_update_common.hpp @@ -63,7 +63,7 @@ class Mvcc : public ::testing::Test { mvcc::VersionList version_list{*t1, 0, version_list_size}; TestClass *v1 = nullptr; tx::Transaction *t2 = nullptr; - tx::transaction_id_t id0, id1, id2; + tx::TransactionId id0, id1, id2; }; // helper macros. important: diff --git a/tests/unit/transaction_engine_distributed.cpp b/tests/unit/transaction_engine_distributed.cpp index bdab4d609..22b241e78 100644 --- a/tests/unit/transaction_engine_distributed.cpp +++ b/tests/unit/transaction_engine_distributed.cpp @@ -69,7 +69,7 @@ TEST_F(WorkerEngineTest, RunningTransaction) { ++count; if (t.id_ == 1) { EXPECT_EQ(t.snapshot(), - tx::Snapshot(std::vector{})); + tx::Snapshot(std::vector{})); } else { EXPECT_EQ(t.snapshot(), tx::Snapshot({1})); } @@ -129,10 +129,10 @@ TEST_F(WorkerEngineTest, LocalForEachActiveTransaction) { master_.Begin(); master_.Begin(); worker_.RunningTransaction(4); - std::unordered_set local; + std::unordered_set local; worker_.LocalForEachActiveTransaction( [&local](Transaction &t) { local.insert(t.id_); }); - EXPECT_EQ(local, std::unordered_set({1, 4})); + EXPECT_EQ(local, std::unordered_set({1, 4})); } TEST_F(WorkerEngineTest, EnsureTxIdGreater) { diff --git a/tests/unit/transaction_engine_single_node.cpp b/tests/unit/transaction_engine_single_node.cpp index 1aa223211..2a34897ce 100644 --- a/tests/unit/transaction_engine_single_node.cpp +++ b/tests/unit/transaction_engine_single_node.cpp @@ -53,7 +53,7 @@ TEST(Engine, Advance) { TEST(Engine, ConcurrentBegin) { SingleNodeEngine engine; std::vector threads; - SkipList tx_ids; + SkipList tx_ids; for (int i = 0; i < 10; ++i) { threads.emplace_back([&engine, accessor = tx_ids.access() ]() mutable { for (int j = 0; j < 100; ++j) {