From 51c9f4d0d354a6661b896c38328b510f0d3f213f Mon Sep 17 00:00:00 2001
From: Vinko Kasljevic
Date: Wed, 22 Aug 2018 09:17:22 +0200
Subject: [PATCH] Refactor Cache and DataManager

Summary:
Instead of DataManager returning a Cache that can fetch data when needed,
I refactored the code so that the Cache is a simple wrapper around an
unordered_map and the DataManager is the one that fetches data. Also, the
Cache is not visible from outside the DataManager, so we can add an LRU
policy without changing anything else.

Reviewers: msantl, ipaljak, teon.banek, buda

Reviewed By: msantl, teon.banek

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1545
---
 src/CMakeLists.txt                            |   1 -
 src/database/distributed_graph_db.cpp         |  30 +++---
 src/database/graph_db.cpp                     |   2 +-
 src/database/indexes/index_common.hpp         |   2 +-
 src/distributed/bfs_rpc_clients.cpp           |  11 +-
 src/distributed/cache.hpp                     |  75 ++++++-------
 src/distributed/data_manager.cpp              |  97 +++++++++++------
 src/distributed/data_manager.hpp              | 102 ++++++++++++++++--
 src/distributed/pull_produce_rpc_messages.lcp |  10 +-
 src/mvcc/version_list.hpp                     |  14 +--
 10 files changed, 228 insertions(+), 116 deletions(-)

diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 69564386e..9af0b63c2 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -20,7 +20,6 @@ set(memgraph_src_files
     database/state_delta.cpp
     distributed/bfs_rpc_clients.cpp
     distributed/bfs_subcursor.cpp
-    distributed/cache.cpp
     distributed/cluster_discovery_master.cpp
    distributed/cluster_discovery_worker.cpp
     distributed/coordination.cpp
diff --git a/src/database/distributed_graph_db.cpp b/src/database/distributed_graph_db.cpp
index 902ae2a7c..99238d2eb 100644
--- a/src/database/distributed_graph_db.cpp
+++ b/src/database/distributed_graph_db.cpp
@@ -73,7 +73,7 @@ class DistributedRecordAccessor final {
     auto &dba = record_accessor.db_accessor();
     const auto &address = record_accessor.address();
     if (record_accessor.is_local()) {
-      address.local()->find_set_old_new(dba.transaction(), *old, *newr);
+      address.local()->find_set_old_new(dba.transaction(), old, newr);
       return;
     }
     // It's not possible that we have a global address for a graph element
@@ -81,10 +81,8 @@ class DistributedRecordAccessor final {
     // TODO in write queries it's possible the command has been advanced and
     // we need to invalidate the Cache and really get the latest stuff.
     // But only do that after the command has been advanced.
-    auto &cache =
-        data_manager_->template Elements<TRecord>(dba.transaction_id());
-    cache.FindSetOldNew(dba.transaction().id_, address.worker_id(),
-                        address.gid(), *old, *newr);
+    data_manager_->FindSetOldNew(dba.transaction_id(), address.worker_id(),
+                                 address.gid(), old, newr);
   }
 
   TRecord *FindNew(const RecordAccessor<TRecord> &record_accessor) {
@@ -93,9 +91,8 @@ class DistributedRecordAccessor final {
     if (address.is_local()) {
       return address.local()->update(dba.transaction());
     }
-    auto &cache =
-        data_manager_->template Elements<TRecord>(dba.transaction_id());
-    return cache.FindNew(address.gid());
+    return data_manager_->FindNew<TRecord>(dba.transaction_id(),
+                                           address.gid());
   }
 
   void ProcessDelta(const RecordAccessor<TRecord> &record_accessor,
@@ -302,12 +299,11 @@ class DistributedAccessor : public GraphDbAccessor {
     auto edge_address =
         updates_clients_->CreateEdge(transaction_id(), *from, *to, edge_type);
     auto *from_updated =
-        data_manager_->Elements<Vertex>(transaction_id()).FindNew(from->gid());
+        data_manager_->FindNew<Vertex>(transaction_id(), from->gid());
     // Create an Edge and insert it into the Cache so we see it locally.
-    data_manager_->Elements<Edge>(transaction_id())
-        .emplace(
-            edge_address.gid(), nullptr,
-            std::make_unique<Edge>(from->address(), to->address(), edge_type));
+    data_manager_->Emplace<Edge>(
+        transaction_id(), edge_address.gid(), nullptr,
+        std::make_unique<Edge>(from->address(), to->address(), edge_type));
     from_updated->out_.emplace(
         db().storage().LocalizedAddressIfPossible(to->address()),
         edge_address, edge_type);
@@ -329,7 +325,7 @@ class DistributedAccessor : public GraphDbAccessor {
           db().storage().GlobalizedAddress(edge_address), *to, edge_type);
     }
     auto *to_updated =
-        data_manager_->Elements<Vertex>(transaction_id()).FindNew(to->gid());
+        data_manager_->FindNew<Vertex>(transaction_id(), to->gid());
     to_updated->in_.emplace(
         db().storage().LocalizedAddressIfPossible(from->address()),
         edge_address, edge_type);
@@ -621,7 +617,8 @@ Master::Master(Config config)
       impl_->coordination_.CommonWalTransactions(*recovery_info);
   DistributedRecoveryTransanctions recovery_transactions(this);
   durability::RecoverWalAndIndexes(impl_->config_.durability_directory,
-                                   this, &recovery_data, &recovery_transactions);
+                                   this, &recovery_data,
+                                   &recovery_transactions);
   auto workers_recovered_wal =
       impl_->durability_rpc_.RecoverWalAndIndexes(&recovery_data);
   workers_recovered_wal.get();
@@ -811,8 +808,7 @@ VertexAccessor InsertVertexIntoRemote(
   auto vertex = std::make_unique<Vertex>();
   vertex->labels_ = labels;
   for (auto &kv : properties) vertex->properties_.set(kv.first, kv.second);
-  data_manager->Elements<Vertex>(dba->transaction_id())
-      .emplace(gid, nullptr, std::move(vertex));
+  data_manager->Emplace<Vertex>(dba->transaction_id(), gid, nullptr, std::move(vertex));
   return VertexAccessor({gid, worker_id}, *dba);
 }
 
diff --git a/src/database/graph_db.cpp b/src/database/graph_db.cpp
index b48458ffc..7eb05a237 100644
--- a/src/database/graph_db.cpp
+++ b/src/database/graph_db.cpp
@@ -38,7 +38,7 @@ class SingleNodeRecordAccessor final {
     auto &dba = record_accessor.db_accessor();
     const auto &address = record_accessor.address();
     CHECK(record_accessor.is_local());
-    address.local()->find_set_old_new(dba.transaction(), *old, *newr);
+    address.local()->find_set_old_new(dba.transaction(), old, newr);
   }
 
   TRecord *FindNew(const RecordAccessor<TRecord> &record_accessor) {
diff --git a/src/database/indexes/index_common.hpp b/src/database/indexes/index_common.hpp
index 65485467a..de8de3ad7 100644
--- a/src/database/indexes/index_common.hpp
+++ b/src/database/indexes/index_common.hpp
@@ -99,7 +99,7 @@ static auto GetVlists(
         // TODO when refactoring MVCC reconsider the return-value-arg idiom
         // here
         TRecord *old_record, *new_record;
-        entry.vlist_->find_set_old_new(t, old_record, new_record);
+        entry.vlist_->find_set_old_new(t, &old_record, &new_record);
         // filtering out records not visible to the current
         // transaction+command
         // taking into account the current_state flag
diff --git a/src/distributed/bfs_rpc_clients.cpp b/src/distributed/bfs_rpc_clients.cpp
index 5f013f6da..9c2b00251 100644
--- a/src/distributed/bfs_rpc_clients.cpp
+++ b/src/distributed/bfs_rpc_clients.cpp
@@ -82,10 +82,10 @@ std::experimental::optional<VertexAccessor> BfsRpcClients::Pull(
   CHECK(res) << "SubcursorPull RPC failed!";
   if (!res->vertex) return std::experimental::nullopt;
-  data_manager_->Elements<Vertex>(dba->transaction_id())
-      .emplace(res->vertex->global_address.gid(),
-               std::move(res->vertex->old_element_output),
-               std::move(res->vertex->new_element_output));
+  data_manager_->Emplace(dba->transaction_id(),
+                         res->vertex->global_address.gid(),
+                         std::move(res->vertex->old_element_output),
+                         std::move(res->vertex->new_element_output));
   return VertexAccessor(res->vertex->global_address, *dba);
 }
 
 bool BfsRpcClients::ExpandLevel(
@@ -140,8 +140,7 @@ PathSegment BuildPathSegment(ReconstructPathRes *res,
                              distributed::DataManager *data_manager) {
   std::vector<EdgeAccessor> edges;
   for (auto &edge : res->edges) {
-    data_manager->Elements<Edge>(dba->transaction_id())
-        .emplace(edge.global_address.gid(), std::move(edge.old_element_output),
+    data_manager->Emplace(dba->transaction_id(), edge.global_address.gid(), std::move(edge.old_element_output),
                  std::move(edge.new_element_output));
     edges.emplace_back(edge.global_address, *dba);
   }
diff --git a/src/distributed/cache.hpp b/src/distributed/cache.hpp
index d41eb1ca2..2b46fdb23 100644
--- a/src/distributed/cache.hpp
+++ b/src/distributed/cache.hpp
@@ -12,51 +12,52 @@ class Storage;
 
 namespace distributed {
 
-/**
- * Used for caching Vertices and Edges that are stored on another worker in a
- * distributed system. Maps global IDs to (old, new) Vertex/Edge pointer
- * pairs. It is possible that either "old" or "new" are nullptrs, but at
- * least one must be not-null. The Cache is the owner of TRecord
- * objects it points to.
- *
- * @tparam TRecord - Edge or Vertex
- */
-template <typename TRecord>
+// TODO Improvements:
+// 1) Use a combination of std::unordered_map<TKey, typename
+// std::list<std::pair<TKey, TValue>>::iterator> and
+// std::list<std::pair<TKey, TValue>>. Use the map for quick access and for
+// checking if a TKey exists in the map, and the list for keeping track of
+// the LRU order.
+//
+// 2) Implement adaptive replacement cache policy instead of LRU.
+// http://theory.stanford.edu/~megiddo/pdf/IEEE_COMPUTER_0404.pdf/
+
+/// Used for caching objects. The intention is to evict entries with a least
+/// recently used (LRU) policy once a maximum size is reached (see the TODO
+/// above); eviction is not implemented yet. This class is NOT thread safe.
+///
+/// @see ThreadSafeCache
+/// @tparam TKey - any object that has hash() defined
+/// @tparam TValue - any object
+template <typename TKey, typename TValue>
 class Cache {
-  using rec_uptr = std::unique_ptr<TRecord>;
-
  public:
-  Cache(database::Storage &storage, distributed::DataRpcClients &data_clients)
-      : storage_(storage), data_clients_(data_clients) {}
+  using Iterator = typename std::unordered_map<TKey, TValue>::iterator;
 
-  /// Returns the new data for the given ID. Creates it (as copy of old) if
-  /// necessary.
-  TRecord *FindNew(gid::Gid gid);
+  Cache() = default;
 
-  /// For the Vertex/Edge with the given global ID, looks for the data visible
-  /// from the given transaction's ID and command ID, and caches it. Sets the
-  /// given pointers to point to the fetched data. Analogue to
-  /// mvcc::VersionList::find_set_old_new.
-  void FindSetOldNew(tx::TransactionId tx_id, int worker_id, gid::Gid gid,
-                     TRecord *&old_record, TRecord *&new_record);
+  Iterator find(const TKey &key) { return cache_.find(key); }
 
-  /// Sets the given records as (new, old) data for the given gid.
-  void emplace(gid::Gid gid, rec_uptr old_record, rec_uptr new_record);
+  std::pair<Iterator, bool> emplace(TKey &&key, TValue &&value) {
+    return cache_.emplace(std::forward<TKey>(key), std::forward<TValue>(value));
+  }
 
-  /// Removes all the data from the cache.
-  void ClearCache();
+  void erase(const TKey &key) {
+    cache_.erase(key);
+  }
+
+  Iterator end() {
+    return cache_.end();
+  }
+
+  bool contains(const TKey &key) {
+    return find(key) != end();
+  }
+
+  void clear() {
+    cache_.clear();
+  }
 
  private:
-  database::Storage &storage_;
-
-  std::mutex lock_;
-  distributed::DataRpcClients &data_clients_;
-  // TODO it'd be better if we had VertexData and EdgeData in here, as opposed
-  // to Vertex and Edge.
-  std::unordered_map<gid::Gid, std::pair<rec_uptr, rec_uptr>> cache_;
-
-  // Localizes all the addresses in the record.
-  void LocalizeAddresses(TRecord &record);
+  std::unordered_map<TKey, TValue> cache_;
 };
 
 }  // namespace distributed
diff --git a/src/distributed/data_manager.cpp b/src/distributed/data_manager.cpp
index 9a619d692..7b82f2354 100644
--- a/src/distributed/data_manager.cpp
+++ b/src/distributed/data_manager.cpp
@@ -1,54 +1,83 @@
-#include "database/storage.hpp"
 #include "distributed/data_manager.hpp"
 
+#include "database/storage.hpp"
+
+namespace {
+
+template <typename TCache>
+void ClearCache(TCache &cache, tx::TransactionId tx_id) {
+  auto access = cache.access();
+  auto found = access.find(tx_id);
+  if (found != access.end()) found->second.clear();
+}
+
+template <typename TCache>
+void DeleteOld(TCache &cache, tx::TransactionId oldest_active) {
+  auto access = cache.access();
+  for (auto &kv : access) {
+    if (kv.first < oldest_active) {
+      access.remove(kv.first);
+    }
+  }
+}
+
+}  // anonymous namespace
+
 namespace distributed {
 
-template <typename TRecord>
-Cache<TRecord> &DataManager::GetCache(CacheT<TRecord> &collection,
-                                      tx::TransactionId tx_id) {
-  auto access = collection.access();
-  auto found = access.find(tx_id);
-  if (found != access.end()) return found->second;
-
-  return access
-      .emplace(
-          tx_id, std::make_tuple(tx_id),
-          std::make_tuple(std::ref(db_.storage()), std::ref(data_clients_)))
-      .first->second;
+template <>
+DataManager::CacheT<Vertex> &DataManager::caches<Vertex>() {
+  return vertices_caches_;
 }
 
 template <>
-Cache<Vertex> &DataManager::Elements<Vertex>(tx::TransactionId tx_id) {
-  return GetCache(vertices_caches_, tx_id);
-}
-
-template <>
-Cache<Edge> &DataManager::Elements<Edge>(tx::TransactionId tx_id) {
-  return GetCache(edges_caches_, tx_id);
+DataManager::CacheT<Edge> &DataManager::caches<Edge>() {
+  return edges_caches_;
 }
 
 DataManager::DataManager(database::GraphDb &db,
                          distributed::DataRpcClients &data_clients)
     : db_(db), data_clients_(data_clients) {}
 
+std::mutex &DataManager::GetLock(tx::TransactionId tx_id) {
+  auto accessor = lock_store_.access();
+  auto found = accessor.find(tx_id);
+  if (found != accessor.end()) return found->second;
+
+  // By passing an empty tuple, the default constructor is used and the
+  // std::mutex is created in the ConcurrentMap.
+  return accessor.emplace(tx_id, std::make_tuple(tx_id), std::make_tuple())
+      .first->second;
+}
+
+template <>
+void DataManager::LocalizeAddresses(Vertex &vertex) {
+  auto localize_edges = [this](auto &edges) {
+    for (auto &element : edges) {
+      element.vertex = db_.storage().LocalizedAddressIfPossible(element.vertex);
+      element.edge = db_.storage().LocalizedAddressIfPossible(element.edge);
+    }
+  };
+
+  localize_edges(vertex.in_.storage());
+  localize_edges(vertex.out_.storage());
+}
+
+template <>
+void DataManager::LocalizeAddresses(Edge &edge) {
+  edge.from_ = db_.storage().LocalizedAddressIfPossible(edge.from_);
+  edge.to_ = db_.storage().LocalizedAddressIfPossible(edge.to_);
+}
+
 void DataManager::ClearCacheForSingleTransaction(tx::TransactionId tx_id) {
-  Elements<Vertex>(tx_id).ClearCache();
-  Elements<Edge>(tx_id).ClearCache();
+  ClearCache(vertices_caches_, tx_id);
+  ClearCache(edges_caches_, tx_id);
 }
 
 void DataManager::ClearTransactionalCache(tx::TransactionId oldest_active) {
-  auto vertex_access = vertices_caches_.access();
-  for (auto &kv : vertex_access) {
-    if (kv.first < oldest_active) {
-      vertex_access.remove(kv.first);
-    }
-  }
-  auto edge_access = edges_caches_.access();
-  for (auto &kv : edge_access) {
-    if (kv.first < oldest_active) {
-      edge_access.remove(kv.first);
-    }
-  }
+  DeleteOld(vertices_caches_, oldest_active);
+  DeleteOld(edges_caches_, oldest_active);
+  DeleteOld(lock_store_, oldest_active);
 }
 
 }  // namespace distributed
+
diff --git a/src/distributed/data_manager.hpp b/src/distributed/data_manager.hpp
index 4f2888ac2..f0af9275c 100644
--- a/src/distributed/data_manager.hpp
+++ b/src/distributed/data_manager.hpp
@@ -14,19 +14,90 @@ namespace distributed {
 
 /// Handles remote data caches for edges and vertices, per transaction.
 class DataManager {
   template <typename TRecord>
-  using CacheT = ConcurrentMap<tx::TransactionId, Cache<TRecord>>;
+  using CacheG =
+      Cache<gid::Gid, std::pair<std::unique_ptr<TRecord>, std::unique_ptr<TRecord>>>;
 
-  // Helper, gets or inserts a data cache for the given transaction.
   template <typename TRecord>
-  Cache<TRecord> &GetCache(CacheT<TRecord> &collection,
-                           tx::TransactionId tx_id);
+  using CacheT = ConcurrentMap<tx::TransactionId, CacheG<TRecord>>;
 
  public:
   DataManager(database::GraphDb &db,
               distributed::DataRpcClients &data_clients);
 
-  /// Gets or creates the remote vertex/edge cache for the given transaction.
+  /// Returns the new data for the given ID. Creates it (as a copy of the
+  /// old) if necessary.
   template <typename TRecord>
-  Cache<TRecord> &Elements(tx::TransactionId tx_id);
+  TRecord *FindNew(tx::TransactionId tx_id, gid::Gid gid) {
+    auto &cache = GetCache<TRecord>(tx_id);
+
+    std::lock_guard<std::mutex> guard(GetLock(tx_id));
+    auto found = cache.find(gid);
+    DCHECK(found != cache.end())
+        << "FindNew is called on an uninitialized remote Vertex/Edge";
+
+    auto &pair = found->second;
+    if (!pair.second) {
+      pair.second = std::unique_ptr<TRecord>(pair.first->CloneData());
+    }
+
+    return pair.second.get();
+  }
+
+  /// For the Vertex/Edge with the given global ID, looks for the data visible
+  /// from the given transaction's ID and command ID, and caches it. Sets the
+  /// given pointers to point to the fetched data. Analogue to
+  /// mvcc::VersionList::find_set_old_new.
+  template <typename TRecord>
+  void FindSetOldNew(tx::TransactionId tx_id, int worker_id, gid::Gid gid,
+                     TRecord **old_record, TRecord **new_record) {
+    auto &cache = GetCache<TRecord>(tx_id);
+    auto &lock = GetLock(tx_id);
+
+    {
+      std::lock_guard<std::mutex> guard(lock);
+      auto found = cache.find(gid);
+      if (found != cache.end()) {
+        *old_record = found->second.first.get();
+        *new_record = found->second.second.get();
+        return;
+      }
+    }
+
+    auto remote = data_clients_.RemoteElement<TRecord>(worker_id, tx_id, gid);
+    LocalizeAddresses(*remote);
+
+    // This logic is a bit strange because we need to make sure that someone
+    // else didn't get a response and update the cache before we did, and we
+    // need a lock for that; but we also need to return whatever ends up in
+    // the cache - otherwise we could get inconsistent results for remote
+    // FindSetOldNew.
+    std::lock_guard<std::mutex> guard(lock);
+    auto it_pair = cache.emplace(std::move(gid),
+                                 std::make_pair(std::move(remote), nullptr));
+
+    *old_record = it_pair.first->second.first.get();
+    *new_record = it_pair.first->second.second.get();
+  }
+
+  /// Sets the given records as the (old, new) data for the given gid.
+  template <typename TRecord>
+  void Emplace(tx::TransactionId tx_id, gid::Gid gid,
+               std::unique_ptr<TRecord> old_record,
+               std::unique_ptr<TRecord> new_record) {
+    if (old_record) LocalizeAddresses(*old_record);
+    if (new_record) LocalizeAddresses(*new_record);
+
+    std::lock_guard<std::mutex> guard(GetLock(tx_id));
+    // We can't replace existing data because some accessors might be using
+    // it.
+    // TODO - consider if it's necessary and OK to copy just the data content.
+    auto &cache = GetCache<TRecord>(tx_id);
+    auto found = cache.find(gid);
+    if (found == cache.end())
+      cache.emplace(std::move(gid), std::make_pair(std::move(old_record),
+                                                   std::move(new_record)));
+  }
 
   /// Removes all the caches for a single transaction.
   void ClearCacheForSingleTransaction(tx::TransactionId tx_id);
@@ -36,8 +107,27 @@ class DataManager {
   void ClearTransactionalCache(tx::TransactionId oldest_active);
 
  private:
+  template <typename TRecord>
+  void LocalizeAddresses(TRecord &record);
+
+  template <typename TRecord>
+  CacheG<TRecord> &GetCache(tx::TransactionId tx_id) {
+    auto accessor = caches<TRecord>().access();
+    auto found = accessor.find(tx_id);
+    if (found != accessor.end()) return found->second;
+
+    return accessor.emplace(tx_id, std::make_tuple(tx_id), std::make_tuple())
+        .first->second;
+  }
+
+  std::mutex &GetLock(tx::TransactionId tx_id);
+
+  template <typename TRecord>
+  CacheT<TRecord> &caches();
+
   database::GraphDb &db_;
   DataRpcClients &data_clients_;
+  ConcurrentMap<tx::TransactionId, std::mutex> lock_store_;
   CacheT<Vertex> vertices_caches_;
   CacheT<Edge> edges_caches_;
 };
diff --git a/src/distributed/pull_produce_rpc_messages.lcp b/src/distributed/pull_produce_rpc_messages.lcp
index 7dfcde5dc..32d756048 100644
--- a/src/distributed/pull_produce_rpc_messages.lcp
+++ b/src/distributed/pull_produce_rpc_messages.lcp
@@ -262,9 +262,8 @@ void PullResData::LoadGraphElement(
         vertex_reader.hasNew()
             ? distributed::LoadVertex(vertex_reader.getNew())
             : nullptr;
-    data_manager->Elements<Vertex>(dba->transaction_id())
-        .emplace(global_address.gid(), std::move(old_record),
-                 std::move(new_record));
+    data_manager->Emplace(dba->transaction_id(), global_address.gid(),
+                          std::move(old_record), std::move(new_record));
     return VertexAccessor(global_address, *dba);
   };
   auto load_edge = [dba, data_manager](const auto &edge_reader) {
@@ -277,9 +276,8 @@ void PullResData::LoadGraphElement(
         edge_reader.hasNew()
            ? distributed::LoadEdge(edge_reader.getNew())
             : nullptr;
-    data_manager->Elements<Edge>(dba->transaction_id())
-        .emplace(global_address.gid(), std::move(old_record),
-                 std::move(new_record));
+    data_manager->Emplace(dba->transaction_id(), global_address.gid(),
+                          std::move(old_record), std::move(new_record));
     return EdgeAccessor(global_address, *dba);
   };
   switch (reader.which()) {
diff --git a/src/mvcc/version_list.hpp b/src/mvcc/version_list.hpp
index 73d186621..1cad4fe78 100644
--- a/src/mvcc/version_list.hpp
+++ b/src/mvcc/version_list.hpp
@@ -181,14 +181,14 @@ class VersionList {
    *
    * @param t The transaction
    */
-  void find_set_old_new(const tx::Transaction &t, T *&old_ref, T *&new_ref) {
+  void find_set_old_new(const tx::Transaction &t, T **old_ref, T **new_ref) {
     // assume that the sought old record is further down the list
     // from new record, so that if we found old we can stop looking
-    new_ref = nullptr;
-    old_ref = head_;
-    while (old_ref != nullptr && !old_ref->visible(t)) {
-      if (!new_ref && old_ref->is_created_by(t)) new_ref = old_ref;
-      old_ref = old_ref->next(std::memory_order_seq_cst);
+    *new_ref = nullptr;
+    *old_ref = head_;
+    while (*old_ref != nullptr && !(*old_ref)->visible(t)) {
+      if (!*new_ref && (*old_ref)->is_created_by(t)) *new_ref = *old_ref;
+      *old_ref = (*old_ref)->next(std::memory_order_seq_cst);
     }
   }
 
@@ -205,7 +205,7 @@ class VersionList {
     DCHECK(head_ != nullptr) << "Head is nullptr on update.";
     T *old_record = nullptr;
     T *new_record = nullptr;
-    find_set_old_new(t, old_record, new_record);
+    find_set_old_new(t, &old_record, &new_record);
 
     // check if current transaction in current cmd has
     // already updated version list
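
Note on the TODO at the top of the new cache.hpp: item 1 can be realized with exactly the two containers it names. The following is a minimal sketch of that idea, not code from this revision; the LruCache name and the capacity parameter are made up for illustration, and TKey is assumed cheaply copyable (as gid::Gid is).

#include <cstddef>
#include <list>
#include <unordered_map>
#include <utility>

// Hypothetical LRU-evicting variant of distributed::Cache: a list ordered by
// recency of use plus a map from key to list iterator for O(1) lookup.
template <typename TKey, typename TValue>
class LruCache {
 public:
  explicit LruCache(size_t capacity) : capacity_(capacity) {}

  // Returns a pointer to the cached value, or nullptr on a miss. A hit moves
  // the entry to the front of the list, marking it most recently used.
  TValue *find(const TKey &key) {
    auto it = index_.find(key);
    if (it == index_.end()) return nullptr;
    // splice relinks the node in place, so iterators stay valid.
    order_.splice(order_.begin(), order_, it->second);
    return &it->second->second;
  }

  // Inserts a new entry; an existing entry is only refreshed, mirroring how
  // DataManager::Emplace never replaces data accessors might be using.
  void emplace(TKey key, TValue value) {
    auto it = index_.find(key);
    if (it != index_.end()) {
      order_.splice(order_.begin(), order_, it->second);
      return;
    }
    order_.emplace_front(std::move(key), std::move(value));
    index_.emplace(order_.front().first, order_.begin());
    if (index_.size() > capacity_) {
      // Evict the least recently used entry, which sits at the back.
      index_.erase(order_.back().first);
      order_.pop_back();
    }
  }

 private:
  size_t capacity_;
  std::list<std::pair<TKey, TValue>> order_;
  std::unordered_map<TKey,
                     typename std::list<std::pair<TKey, TValue>>::iterator>
      index_;
};

Because DataManager reaches the Cache only through GetCache, swapping this in later would not change any call sites, which is the point the commit summary makes.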
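Note on the locking in DataManager::FindSetOldNew: it is a double-checked cache-fill. The sketch below distills the pattern under assumed names (CachedFetcher, fetch_) rather than the types from this patch: check under the lock, fetch without it, then re-lock and let emplace resolve any race, since emplace never overwrites and the loser simply adopts the winner's entry.

#include <functional>
#include <mutex>
#include <unordered_map>

template <typename TKey, typename TValue>
class CachedFetcher {
 public:
  explicit CachedFetcher(std::function<TValue(const TKey &)> fetch)
      : fetch_(std::move(fetch)) {}

  const TValue &Get(const TKey &key) {
    {
      std::lock_guard<std::mutex> guard(lock_);
      auto found = cache_.find(key);
      if (found != cache_.end()) return found->second;
    }
    // Slow path: run the expensive fetch (an RPC in DataManager's case)
    // without holding the lock, so other callers are not blocked on it.
    TValue fetched = fetch_(key);
    std::lock_guard<std::mutex> guard(lock_);
    // If another thread inserted while we were fetching, emplace is a no-op
    // and we return its entry, so all callers see the same cached object.
    return cache_.emplace(key, std::move(fetched)).first->second;
  }

 private:
  std::function<TValue(const TKey &)> fetch_;
  std::mutex lock_;
  std::unordered_map<TKey, TValue> cache_;
};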