Refactor Cache and DataManager
Summary: Instead of having DataManager return a Cache that can fetch data on
demand, the code is refactored so that Cache is a simple wrapper around
std::unordered_map and DataManager is the component that fetches the data.
Cache is also no longer visible outside of DataManager, so an LRU eviction
policy can be added later without changing anything else.

Reviewers: msantl, ipaljak, teon.banek, buda

Reviewed By: msantl, teon.banek

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1545
parent f62d764649, commit 51c9f4d0d3
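Before the per-file hunks, a minimal, self-contained sketch of the ownership split the summary describes: Cache is a dumb map wrapper, and DataManager is the only component that fetches on a miss. All types and names below are simplified stand-ins, not the real Memgraph classes.

// Sketch of the new ownership split (simplified stand-ins, not the real
// Memgraph types): Cache is a plain map wrapper, DataManager owns the cache
// and is the only component that fetches missing data.
#include <cstdint>
#include <iostream>
#include <string>
#include <unordered_map>
#include <utility>

template <typename TKey, typename TValue>
class Cache {
 public:
  using Iterator = typename std::unordered_map<TKey, TValue>::iterator;
  Iterator find(const TKey &key) { return cache_.find(key); }
  Iterator end() { return cache_.end(); }
  std::pair<Iterator, bool> emplace(TKey &&key, TValue &&value) {
    return cache_.emplace(std::move(key), std::move(value));
  }

 private:
  std::unordered_map<TKey, TValue> cache_;  // an eviction policy can go here
};

class DataManager {
 public:
  // DataManager, not Cache, performs the remote fetch on a miss.
  const std::string &Find(std::uint64_t gid) {
    auto found = cache_.find(gid);
    if (found != cache_.end()) return found->second;
    return cache_.emplace(std::move(gid), FetchRemote(gid)).first->second;
  }

 private:
  std::string FetchRemote(std::uint64_t gid) {  // stands in for the RPC
    return "record-" + std::to_string(gid);
  }
  Cache<std::uint64_t, std::string> cache_;  // hidden from callers
};

int main() {
  DataManager dm;
  std::cout << dm.Find(42) << "\n";  // fetches, then caches
  std::cout << dm.Find(42) << "\n";  // served from the cache
}

Because callers only ever see DataManager, the map inside Cache can later be swapped for an LRU structure without touching any call site — which is exactly what the commit's TODO in cache.hpp proposes.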
@@ -20,7 +20,6 @@ set(memgraph_src_files
     database/state_delta.cpp
     distributed/bfs_rpc_clients.cpp
    distributed/bfs_subcursor.cpp
-    distributed/cache.cpp
     distributed/cluster_discovery_master.cpp
     distributed/cluster_discovery_worker.cpp
     distributed/coordination.cpp
@@ -73,7 +73,7 @@ class DistributedRecordAccessor final {
     auto &dba = record_accessor.db_accessor();
     const auto &address = record_accessor.address();
     if (record_accessor.is_local()) {
-      address.local()->find_set_old_new(dba.transaction(), *old, *newr);
+      address.local()->find_set_old_new(dba.transaction(), old, newr);
       return;
     }
     // It's not possible that we have a global address for a graph element
@@ -81,10 +81,8 @@ class DistributedRecordAccessor final {
     // TODO in write queries it's possible the command has been advanced and
     // we need to invalidate the Cache and really get the latest stuff.
     // But only do that after the command has been advanced.
-    auto &cache =
-        data_manager_->template Elements<TRecord>(dba.transaction_id());
-    cache.FindSetOldNew(dba.transaction().id_, address.worker_id(),
-                        address.gid(), *old, *newr);
+    data_manager_->FindSetOldNew(dba.transaction_id(), address.worker_id(),
+                                 address.gid(), old, newr);
   }

   TRecord *FindNew(const RecordAccessor<TRecord> &record_accessor) {
@@ -93,9 +91,8 @@ class DistributedRecordAccessor final {
     if (address.is_local()) {
       return address.local()->update(dba.transaction());
     }
-    auto &cache =
-        data_manager_->template Elements<TRecord>(dba.transaction_id());
-    return cache.FindNew(address.gid());
+    return data_manager_->FindNew<TRecord>(dba.transaction_id(),
+                                           address.gid());
   }

   void ProcessDelta(const RecordAccessor<TRecord> &record_accessor,
@@ -302,12 +299,11 @@ class DistributedAccessor : public GraphDbAccessor {
     auto edge_address =
         updates_clients_->CreateEdge(transaction_id(), *from, *to, edge_type);
     auto *from_updated =
-        data_manager_->Elements<Vertex>(transaction_id()).FindNew(from->gid());
+        data_manager_->FindNew<Vertex>(transaction_id(), from->gid());
     // Create an Edge and insert it into the Cache so we see it locally.
-    data_manager_->Elements<Edge>(transaction_id())
-        .emplace(
-            edge_address.gid(), nullptr,
-            std::make_unique<Edge>(from->address(), to->address(), edge_type));
+    data_manager_->Emplace<Edge>(
+        transaction_id(), edge_address.gid(), nullptr,
+        std::make_unique<Edge>(from->address(), to->address(), edge_type));
     from_updated->out_.emplace(
         db().storage().LocalizedAddressIfPossible(to->address()), edge_address,
         edge_type);
@@ -329,7 +325,7 @@ class DistributedAccessor : public GraphDbAccessor {
           db().storage().GlobalizedAddress(edge_address), *to, edge_type);
     }
     auto *to_updated =
-        data_manager_->Elements<Vertex>(transaction_id()).FindNew(to->gid());
+        data_manager_->FindNew<Vertex>(transaction_id(), to->gid());
     to_updated->in_.emplace(
         db().storage().LocalizedAddressIfPossible(from->address()),
         edge_address, edge_type);
@@ -621,7 +617,8 @@ Master::Master(Config config)
     impl_->coordination_.CommonWalTransactions(*recovery_info);
     DistributedRecoveryTransanctions recovery_transactions(this);
     durability::RecoverWalAndIndexes(impl_->config_.durability_directory,
-                                     this, &recovery_data, &recovery_transactions);
+                                     this, &recovery_data,
+                                     &recovery_transactions);
     auto workers_recovered_wal =
         impl_->durability_rpc_.RecoverWalAndIndexes(&recovery_data);
     workers_recovered_wal.get();
@@ -811,8 +808,7 @@ VertexAccessor InsertVertexIntoRemote(
   auto vertex = std::make_unique<Vertex>();
   vertex->labels_ = labels;
   for (auto &kv : properties) vertex->properties_.set(kv.first, kv.second);
-  data_manager->Elements<Vertex>(dba->transaction_id())
-      .emplace(gid, nullptr, std::move(vertex));
+  data_manager->Emplace<Vertex>(dba->transaction_id(), gid, nullptr, std::move(vertex));
   return VertexAccessor({gid, worker_id}, *dba);
 }

@@ -38,7 +38,7 @@ class SingleNodeRecordAccessor final {
     auto &dba = record_accessor.db_accessor();
     const auto &address = record_accessor.address();
     CHECK(record_accessor.is_local());
-    address.local()->find_set_old_new(dba.transaction(), *old, *newr);
+    address.local()->find_set_old_new(dba.transaction(), old, newr);
   }

   TRecord *FindNew(const RecordAccessor<TRecord> &record_accessor) {
@@ -99,7 +99,7 @@ static auto GetVlists(
         // TODO when refactoring MVCC reconsider the return-value-arg idiom
         // here
         TRecord *old_record, *new_record;
-        entry.vlist_->find_set_old_new(t, old_record, new_record);
+        entry.vlist_->find_set_old_new(t, &old_record, &new_record);
         // filtering out records not visible to the current
         // transaction+command
         // taking into account the current_state flag
@@ -82,10 +82,10 @@ std::experimental::optional<VertexAccessor> BfsRpcClients::Pull(
   CHECK(res) << "SubcursorPull RPC failed!";
   if (!res->vertex) return std::experimental::nullopt;

-  data_manager_->Elements<Vertex>(dba->transaction_id())
-      .emplace(res->vertex->global_address.gid(),
-               std::move(res->vertex->old_element_output),
-               std::move(res->vertex->new_element_output));
+  data_manager_->Emplace<Vertex>(dba->transaction_id(),
+                                 res->vertex->global_address.gid(),
+                                 std::move(res->vertex->old_element_output),
+                                 std::move(res->vertex->new_element_output));
   return VertexAccessor(res->vertex->global_address, *dba);
 }
 bool BfsRpcClients::ExpandLevel(
@@ -140,8 +140,7 @@ PathSegment BuildPathSegment(ReconstructPathRes *res,
                              distributed::DataManager *data_manager) {
   std::vector<EdgeAccessor> edges;
   for (auto &edge : res->edges) {
-    data_manager->Elements<Edge>(dba->transaction_id())
-        .emplace(edge.global_address.gid(), std::move(edge.old_element_output),
-                 std::move(edge.new_element_output));
+    data_manager->Emplace<Edge>(dba->transaction_id(), edge.global_address.gid(), std::move(edge.old_element_output),
+                                std::move(edge.new_element_output));
     edges.emplace_back(edge.global_address, *dba);
   }
@@ -12,51 +12,52 @@ class Storage;

 namespace distributed {

-/**
- * Used for caching Vertices and Edges that are stored on another worker in a
- * distributed system. Maps global IDs to (old, new) Vertex/Edge pointer
- * pairs. It is possible that either "old" or "new" are nullptrs, but at
- * least one must be not-null. The Cache is the owner of TRecord
- * objects it points to.
- *
- * @tparam TRecord - Edge or Vertex
- */
-template <typename TRecord>
+// TODO Improvements:
+// 1) Use combination of std::unordered_map<TKey, list<...>::iterator>
+//    and std::list<std::pair<TKey, TValue>>. Use map for quick access and
+//    checking if TKey exists in map, list for keeping track of LRU order.
+//
+// 2) Implement adaptive replacement cache policy instead of LRU.
+//    http://theory.stanford.edu/~megiddo/pdf/IEEE_COMPUTER_0404.pdf/
+
+/// Used for caching objects. Uses least recently used page replacement
+/// algorithm for evicting elements when maximum size is reached. This class
+/// is NOT thread safe.
+///
+/// @see ThreadSafeCache
+/// @tparam TKey - any object that has hash() defined
+/// @tparam TValue - any object
+template <typename TKey, typename TValue>
 class Cache {
-  using rec_uptr = std::unique_ptr<TRecord>;
-
  public:
-  Cache(database::Storage &storage, distributed::DataRpcClients &data_clients)
-      : storage_(storage), data_clients_(data_clients) {}
-
-  /// Returns the new data for the given ID. Creates it (as copy of old) if
-  /// necessary.
-  TRecord *FindNew(gid::Gid gid);
-
-  /// For the Vertex/Edge with the given global ID, looks for the data visible
-  /// from the given transaction's ID and command ID, and caches it. Sets the
-  /// given pointers to point to the fetched data. Analogue to
-  /// mvcc::VersionList::find_set_old_new.
-  void FindSetOldNew(tx::TransactionId tx_id, int worker_id, gid::Gid gid,
-                     TRecord *&old_record, TRecord *&new_record);
-
-  /// Sets the given records as (new, old) data for the given gid.
-  void emplace(gid::Gid gid, rec_uptr old_record, rec_uptr new_record);
-
-  /// Removes all the data from the cache.
-  void ClearCache();
+  using Iterator = typename std::unordered_map<TKey, TValue>::iterator;
+
+  Cache() = default;
+
+  Iterator find(const TKey &key) { return cache_.find(key); }
+
+  std::pair<Iterator, bool> emplace(TKey &&key, TValue &&value) {
+    return cache_.emplace(std::forward<TKey>(key), std::forward<TValue>(value));
+  }
+
+  void erase(const TKey &key) { cache_.erase(key); }
+
+  Iterator end() { return cache_.end(); }
+
+  bool contains(const TKey &key) { return find(key) != end(); }
+
+  void clear() { cache_.clear(); }

  private:
-  database::Storage &storage_;
-  std::mutex lock_;
-  distributed::DataRpcClients &data_clients_;
-  // TODO it'd be better if we had VertexData and EdgeData in here, as opposed
-  // to Vertex and Edge.
-  std::unordered_map<gid::Gid, std::pair<rec_uptr, rec_uptr>> cache_;
-
-  // Localizes all the addresses in the record.
-  void LocalizeAddresses(TRecord &record);
+  std::unordered_map<TKey, TValue> cache_;
 };

 }  // namespace distributed
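The TODO at the top of the new header sketches the classic LRU layout: a map for O(1) lookup, a list for recency order. A standalone illustration of that idea follows; it is not part of this commit, and LruCache with its capacity handling is hypothetical.

// Sketch of the LRU layout the TODO proposes: a list keeps usage order
// (front = most recently used), a map gives O(1) key lookup into the list.
#include <cstddef>
#include <list>
#include <optional>
#include <unordered_map>
#include <utility>

template <typename TKey, typename TValue>
class LruCache {
 public:
  explicit LruCache(std::size_t capacity) : capacity_(capacity) {}

  void Put(const TKey &key, TValue value) {
    auto it = index_.find(key);
    if (it != index_.end()) order_.erase(it->second);
    order_.emplace_front(key, std::move(value));
    index_[key] = order_.begin();
    if (order_.size() > capacity_) {  // evict the least recently used entry
      index_.erase(order_.back().first);
      order_.pop_back();
    }
  }

  std::optional<TValue> Get(const TKey &key) {
    auto it = index_.find(key);
    if (it == index_.end()) return std::nullopt;
    // Move the hit entry to the front to mark it most recently used; splice
    // keeps the stored iterator valid.
    order_.splice(order_.begin(), order_, it->second);
    return it->second->second;
  }

 private:
  std::size_t capacity_;
  std::list<std::pair<TKey, TValue>> order_;
  std::unordered_map<TKey,
                     typename std::list<std::pair<TKey, TValue>>::iterator>
      index_;
};

Since Cache is now private to DataManager, dropping this in later only means replacing the unordered_map member and keeping the same find/emplace/erase surface.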
@@ -1,54 +1,83 @@
-#include "database/storage.hpp"
 #include "distributed/data_manager.hpp"
+
+#include "database/storage.hpp"
+
+namespace {
+
+template <typename TCache>
+void ClearCache(TCache &cache, tx::TransactionId tx_id) {
+  auto access = cache.access();
+  auto found = access.find(tx_id);
+  if (found != access.end()) found->second.clear();
+}
+
+template <typename TCache>
+void DeleteOld(TCache &cache, tx::TransactionId oldest_active) {
+  auto access = cache.access();
+  for (auto &kv : access) {
+    if (kv.first < oldest_active) {
+      access.remove(kv.first);
+    }
+  }
+}
+
+}  // anonymous namespace

 namespace distributed {

-template <typename TRecord>
-Cache<TRecord> &DataManager::GetCache(CacheT<TRecord> &collection,
-                                      tx::TransactionId tx_id) {
-  auto access = collection.access();
-  auto found = access.find(tx_id);
-  if (found != access.end()) return found->second;
-
-  return access
-      .emplace(
-          tx_id, std::make_tuple(tx_id),
-          std::make_tuple(std::ref(db_.storage()), std::ref(data_clients_)))
-      .first->second;
+template <>
+DataManager::CacheT<Vertex> &DataManager::caches<Vertex>() {
+  return vertices_caches_;
 }

 template <>
-Cache<Vertex> &DataManager::Elements<Vertex>(tx::TransactionId tx_id) {
-  return GetCache(vertices_caches_, tx_id);
-}
-
-template <>
-Cache<Edge> &DataManager::Elements<Edge>(tx::TransactionId tx_id) {
-  return GetCache(edges_caches_, tx_id);
+DataManager::CacheT<Edge> &DataManager::caches<Edge>() {
+  return edges_caches_;
 }

 DataManager::DataManager(database::GraphDb &db,
                          distributed::DataRpcClients &data_clients)
     : db_(db), data_clients_(data_clients) {}

+std::mutex &DataManager::GetLock(tx::TransactionId tx_id) {
+  auto accessor = lock_store_.access();
+  auto found = accessor.find(tx_id);
+  if (found != accessor.end()) return found->second;
+
+  // By passing empty tuple default constructor is used
+  // and std::mutex is created in ConcurrentMap.
+  return accessor.emplace(tx_id, std::make_tuple(tx_id), std::make_tuple())
+      .first->second;
+}
+
+template <>
+void DataManager::LocalizeAddresses<Vertex>(Vertex &vertex) {
+  auto localize_edges = [this](auto &edges) {
+    for (auto &element : edges) {
+      element.vertex = db_.storage().LocalizedAddressIfPossible(element.vertex);
+      element.edge = db_.storage().LocalizedAddressIfPossible(element.edge);
+    }
+  };
+
+  localize_edges(vertex.in_.storage());
+  localize_edges(vertex.out_.storage());
+}
+
+template <>
+void DataManager::LocalizeAddresses(Edge &edge) {
+  edge.from_ = db_.storage().LocalizedAddressIfPossible(edge.from_);
+  edge.to_ = db_.storage().LocalizedAddressIfPossible(edge.to_);
+}
+
 void DataManager::ClearCacheForSingleTransaction(tx::TransactionId tx_id) {
-  Elements<Vertex>(tx_id).ClearCache();
-  Elements<Edge>(tx_id).ClearCache();
+  ClearCache(vertices_caches_, tx_id);
+  ClearCache(edges_caches_, tx_id);
 }

 void DataManager::ClearTransactionalCache(tx::TransactionId oldest_active) {
-  auto vertex_access = vertices_caches_.access();
-  for (auto &kv : vertex_access) {
-    if (kv.first < oldest_active) {
-      vertex_access.remove(kv.first);
-    }
-  }
-  auto edge_access = edges_caches_.access();
-  for (auto &kv : edge_access) {
-    if (kv.first < oldest_active) {
-      edge_access.remove(kv.first);
-    }
-  }
+  DeleteOld(vertices_caches_, oldest_active);
+  DeleteOld(edges_caches_, oldest_active);
+  DeleteOld(lock_store_, oldest_active);
 }

 }  // namespace distributed
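GetLock above uses ConcurrentMap's tuple-based emplace so that the std::mutex is constructed in place; a mutex can be neither copied nor moved. The standard-library analogue of that trick is piecewise construction. A small sketch using std::map rather than the in-house ConcurrentMap:

// Standard-library analogue of GetLock's tuple-based emplace: std::mutex
// must be constructed in place from an (empty) argument tuple.
#include <map>
#include <mutex>
#include <tuple>

int main() {
  std::map<int, std::mutex> lock_store;
  // piecewise_construct forwards each tuple to the respective constructor;
  // the empty tuple default-constructs the std::mutex inside the map node.
  auto it = lock_store.emplace(std::piecewise_construct,
                               std::make_tuple(42), std::make_tuple());
  std::lock_guard<std::mutex> guard(it.first->second);
}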
@@ -14,19 +14,90 @@ namespace distributed {
 /// Handles remote data caches for edges and vertices, per transaction.
 class DataManager {
   template <typename TRecord>
-  using CacheT = ConcurrentMap<tx::TransactionId, Cache<TRecord>>;
+  using CacheG =
+      Cache<gid::Gid,
+            std::pair<std::unique_ptr<TRecord>, std::unique_ptr<TRecord>>>;

-  // Helper, gets or inserts a data cache for the given transaction.
   template <typename TRecord>
-  Cache<TRecord> &GetCache(CacheT<TRecord> &collection,
-                           tx::TransactionId tx_id);
+  using CacheT = ConcurrentMap<tx::TransactionId, CacheG<TRecord>>;

  public:
   DataManager(database::GraphDb &db, distributed::DataRpcClients &data_clients);

-  /// Gets or creates the remote vertex/edge cache for the given transaction.
+  /// Returns the new data for the given ID. Creates it (as copy of old) if
+  /// necessary.
   template <typename TRecord>
-  Cache<TRecord> &Elements(tx::TransactionId tx_id);
+  TRecord *FindNew(tx::TransactionId tx_id, gid::Gid gid) {
+    auto &cache = GetCache<TRecord>(tx_id);
+
+    std::lock_guard<std::mutex> guard(GetLock(tx_id));
+    auto found = cache.find(gid);
+    DCHECK(found != cache.end())
+        << "FindNew is called on uninitialized remote Vertex/Edge";
+
+    auto &pair = found->second;
+    if (!pair.second) {
+      pair.second = std::unique_ptr<TRecord>(pair.first->CloneData());
+    }
+
+    return pair.second.get();
+  }
+
+  /// For the Vertex/Edge with the given global ID, looks for the data visible
+  /// from the given transaction's ID and command ID, and caches it. Sets the
+  /// given pointers to point to the fetched data. Analogue to
+  /// mvcc::VersionList::find_set_old_new.
+  template <typename TRecord>
+  void FindSetOldNew(tx::TransactionId tx_id, int worker_id, gid::Gid gid,
+                     TRecord **old_record, TRecord **new_record) {
+    auto &cache = GetCache<TRecord>(tx_id);
+    auto &lock = GetLock(tx_id);
+
+    {
+      std::lock_guard<std::mutex> guard(lock);
+      auto found = cache.find(gid);
+      if (found != cache.end()) {
+        *old_record = found->second.first.get();
+        *new_record = found->second.second.get();
+        return;
+      }
+    }
+
+    auto remote = data_clients_.RemoteElement<TRecord>(worker_id, tx_id, gid);
+    LocalizeAddresses(*remote);
+
+    // This logic is a bit strange because we need to make sure that someone
+    // else didn't get a response and updated the cache before we did and we
+    // need a lock for that, but we also need to check if we can now return
+    // that result - otherwise we could get inconsistent results for remote
+    // FindSetOldNew
+    std::lock_guard<std::mutex> guard(lock);
+    auto it_pair = cache.emplace(std::move(gid),
+                                 std::make_pair(std::move(remote), nullptr));
+
+    *old_record = it_pair.first->second.first.get();
+    *new_record = it_pair.first->second.second.get();
+  }
+
+  /// Sets the given records as (new, old) data for the given gid.
+  template <typename TRecord>
+  void Emplace(tx::TransactionId tx_id, gid::Gid gid,
+               std::unique_ptr<TRecord> old_record,
+               std::unique_ptr<TRecord> new_record) {
+
+    if (old_record) LocalizeAddresses(*old_record);
+    if (new_record) LocalizeAddresses(*new_record);
+
+    std::lock_guard<std::mutex> guard(GetLock(tx_id));
+    // We can't replace existing data because some accessors might be using
+    // it.
+    // TODO - consider if it's necessary and OK to copy just the data content.
+    auto &cache = GetCache<TRecord>(tx_id);
+    auto found = cache.find(gid);
+    if (found == cache.end())
+      cache.emplace(std::move(gid), std::make_pair(std::move(old_record),
+                                                   std::move(new_record)));
+  }

   /// Removes all the caches for a single transaction.
   void ClearCacheForSingleTransaction(tx::TransactionId tx_id);
@@ -36,8 +107,27 @@ class DataManager {
   void ClearTransactionalCache(tx::TransactionId oldest_active);

  private:
+  template <typename TRecord>
+  void LocalizeAddresses(TRecord &record);
+
+  template <typename TRecord>
+  CacheG<TRecord> &GetCache(tx::TransactionId tx_id) {
+    auto accessor = caches<TRecord>().access();
+    auto found = accessor.find(tx_id);
+    if (found != accessor.end()) return found->second;
+
+    return accessor.emplace(tx_id, std::make_tuple(tx_id), std::make_tuple())
+        .first->second;
+  }
+
+  std::mutex &GetLock(tx::TransactionId tx_id);
+
+  template <typename TRecord>
+  CacheT<TRecord> &caches();
+
   database::GraphDb &db_;
   DataRpcClients &data_clients_;
+  ConcurrentMap<tx::TransactionId, std::mutex> lock_store_;
   CacheT<Vertex> vertices_caches_;
   CacheT<Edge> edges_caches_;
 };
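FindSetOldNew above releases the per-transaction lock while the RPC is in flight and re-acquires it before inserting, so a thread that loses the race simply discards its fetched record and returns whatever the winner cached. A self-contained model of that double-checked fill pattern (all names here are illustrative, not from the codebase):

// Model of the double-checked cache fill: check under the lock, fetch
// without it, insert under the lock again. A losing thread's fetch result is
// discarded by emplace, so every caller observes one consistent record.
#include <iostream>
#include <mutex>
#include <string>
#include <thread>
#include <unordered_map>

std::mutex cache_lock;
std::unordered_map<int, std::string> cache;

std::string FetchRemote(int gid) {  // stands in for the RPC call
  return "record-" + std::to_string(gid);
}

const std::string &FindOrFetch(int gid) {
  {
    std::lock_guard<std::mutex> guard(cache_lock);
    auto found = cache.find(gid);
    if (found != cache.end()) return found->second;
  }  // lock released: the slow fetch must not block other transactions

  std::string remote = FetchRemote(gid);

  // Someone may have filled the entry while we were fetching; emplace is a
  // no-op in that case and we return whatever is in the cache now.
  std::lock_guard<std::mutex> guard(cache_lock);
  return cache.emplace(gid, std::move(remote)).first->second;
}

int main() {
  std::thread a([] { std::cout << FindOrFetch(1) << "\n"; });
  std::thread b([] { std::cout << FindOrFetch(1) << "\n"; });
  a.join();
  b.join();
}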
@@ -262,9 +262,8 @@ void PullResData::LoadGraphElement(
         vertex_reader.hasNew()
             ? distributed::LoadVertex(vertex_reader.getNew())
             : nullptr;
-    data_manager->Elements<Vertex>(dba->transaction_id())
-        .emplace(global_address.gid(), std::move(old_record),
-                 std::move(new_record));
+    data_manager->Emplace<Vertex>(dba->transaction_id(), global_address.gid(),
+                                  std::move(old_record), std::move(new_record));
     return VertexAccessor(global_address, *dba);
   };
   auto load_edge = [dba, data_manager](const auto &edge_reader) {
@@ -277,9 +276,8 @@ void PullResData::LoadGraphElement(
         edge_reader.hasNew()
            ? distributed::LoadEdge(edge_reader.getNew())
            : nullptr;
-    data_manager->Elements<Edge>(dba->transaction_id())
-        .emplace(global_address.gid(), std::move(old_record),
-                 std::move(new_record));
+    data_manager->Emplace<Edge>(dba->transaction_id(), global_address.gid(),
+                                std::move(old_record), std::move(new_record));
     return EdgeAccessor(global_address, *dba);
   };
   switch (reader.which()) {
@@ -181,14 +181,14 @@ class VersionList {
    *
    * @param t The transaction
    */
-  void find_set_old_new(const tx::Transaction &t, T *&old_ref, T *&new_ref) {
+  void find_set_old_new(const tx::Transaction &t, T **old_ref, T **new_ref) {
     // assume that the sought old record is further down the list
     // from new record, so that if we found old we can stop looking
-    new_ref = nullptr;
-    old_ref = head_;
-    while (old_ref != nullptr && !old_ref->visible(t)) {
-      if (!new_ref && old_ref->is_created_by(t)) new_ref = old_ref;
-      old_ref = old_ref->next(std::memory_order_seq_cst);
+    *new_ref = nullptr;
+    *old_ref = head_;
+    while (*old_ref != nullptr && !(*old_ref)->visible(t)) {
+      if (!*new_ref && (*old_ref)->is_created_by(t)) *new_ref = *old_ref;
+      *old_ref = (*old_ref)->next(std::memory_order_seq_cst);
     }
   }

@@ -205,7 +205,7 @@ class VersionList {
     DCHECK(head_ != nullptr) << "Head is nullptr on update.";
     T *old_record = nullptr;
     T *new_record = nullptr;
-    find_set_old_new(t, old_record, new_record);
+    find_set_old_new(t, &old_record, &new_record);

     // check if current transaction in current cmd has
     // already updated version list
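The last two hunks switch find_set_old_new from reference-to-pointer to pointer-to-pointer out-parameters. The behavior is identical; the gain is that every call site must now write &old_record, making the write-back visible. A tiny standalone contrast of the two styles:

// Why the signature moved from T *&out to T **out: with a pointer-to-pointer
// the caller must write &local, so the out-parameter is visible at the call
// site instead of looking like pass-by-value.
#include <iostream>

void by_reference(int *&out) { out = nullptr; }  // mutation hidden at call site
void by_pointer(int **out) { *out = nullptr; }   // call site shows &p

int main() {
  int x = 1;
  int *p = &x;
  by_reference(p);  // reads like pass-by-value; p silently becomes nullptr
  p = &x;
  by_pointer(&p);   // the & makes the write-back explicit
  std::cout << (p == nullptr) << "\n";  // prints 1
}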