Refactor remote cache ownership
Summary: Remote caches used to be owned by `GraphDbAccessor`. An advantage of that was immediate cleanup when destructing. A disadvantage was sharing the remote cache between mutliple program-flows in the same transaction in distributed (one would have to share the accessor). We will have to do post-transactional global cleanup anyway, since we leak, which reduces the above stated advantage. And the stated disadvantage is becoming more and more pronounced as additional components need access to the remote cache. Hence the refactor. Reviewers: buda, teon.banek, msantl Reviewed By: buda Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D1186
This commit is contained in:
parent
db407142ce
commit
fc2703833c
@ -7,6 +7,7 @@
|
||||
#include "distributed/index_rpc_server.hpp"
|
||||
#include "distributed/plan_consumer.hpp"
|
||||
#include "distributed/plan_dispatcher.hpp"
|
||||
#include "distributed/remote_data_manager.hpp"
|
||||
#include "distributed/remote_data_rpc_clients.hpp"
|
||||
#include "distributed/remote_data_rpc_server.hpp"
|
||||
#include "distributed/remote_produce_rpc_server.hpp"
|
||||
@ -116,6 +117,9 @@ class SingleNode : public PrivateBase {
|
||||
distributed::RemoteUpdatesRpcClients &remote_updates_clients() override {
|
||||
LOG(FATAL) << "Remote updates clients not available in single-node.";
|
||||
}
|
||||
distributed::RemoteDataManager &remote_data_manager() override {
|
||||
LOG(FATAL) << "Remote data manager not available in single-node.";
|
||||
}
|
||||
};
|
||||
|
||||
#define IMPL_DISTRIBUTED_GETTERS \
|
||||
@ -130,6 +134,9 @@ class SingleNode : public PrivateBase {
|
||||
} \
|
||||
distributed::RemoteUpdatesRpcClients &remote_updates_clients() override { \
|
||||
return remote_updates_clients_; \
|
||||
} \
|
||||
distributed::RemoteDataManager &remote_data_manager() override { \
|
||||
return remote_data_manager_; \
|
||||
}
|
||||
|
||||
class Master : public PrivateBase {
|
||||
@ -164,6 +171,7 @@ class Master : public PrivateBase {
|
||||
distributed::kIndexRpcName};
|
||||
distributed::RemoteUpdatesRpcServer remote_updates_server_{*this, system_};
|
||||
distributed::RemoteUpdatesRpcClients remote_updates_clients_{coordination_};
|
||||
distributed::RemoteDataManager remote_data_manager_{remote_data_clients_};
|
||||
};
|
||||
|
||||
class Worker : public PrivateBase {
|
||||
@ -197,6 +205,7 @@ class Worker : public PrivateBase {
|
||||
distributed::IndexRpcServer index_rpc_server_{*this, system_};
|
||||
distributed::RemoteUpdatesRpcServer remote_updates_server_{*this, system_};
|
||||
distributed::RemoteUpdatesRpcClients remote_updates_clients_{coordination_};
|
||||
distributed::RemoteDataManager remote_data_manager_{remote_data_clients_};
|
||||
};
|
||||
|
||||
#undef IMPL_GETTERS
|
||||
@ -265,6 +274,9 @@ distributed::RemoteUpdatesRpcServer &PublicBase::remote_updates_server() {
|
||||
distributed::RemoteUpdatesRpcClients &PublicBase::remote_updates_clients() {
|
||||
return impl_->remote_updates_clients();
|
||||
}
|
||||
distributed::RemoteDataManager &PublicBase::remote_data_manager() {
|
||||
return impl_->remote_data_manager();
|
||||
}
|
||||
|
||||
void PublicBase::MakeSnapshot() {
|
||||
const bool status = durability::MakeSnapshot(
|
||||
|
@ -23,6 +23,7 @@ class RemotePullRpcClients;
|
||||
class RemoteProduceRpcServer;
|
||||
class RemoteUpdatesRpcServer;
|
||||
class RemoteUpdatesRpcClients;
|
||||
class RemoteDataManager;
|
||||
}
|
||||
|
||||
namespace database {
|
||||
@ -95,6 +96,7 @@ class GraphDb {
|
||||
virtual distributed::RemoteDataRpcClients &remote_data_clients() = 0;
|
||||
virtual distributed::RemoteUpdatesRpcServer &remote_updates_server() = 0;
|
||||
virtual distributed::RemoteUpdatesRpcClients &remote_updates_clients() = 0;
|
||||
virtual distributed::RemoteDataManager &remote_data_manager() = 0;
|
||||
|
||||
// Supported only in distributed master.
|
||||
virtual distributed::RemotePullRpcClients &remote_pull_clients() = 0;
|
||||
@ -141,6 +143,7 @@ class PublicBase : public GraphDb {
|
||||
distributed::RemoteProduceRpcServer &remote_produce_server() override;
|
||||
distributed::RemoteUpdatesRpcServer &remote_updates_server() override;
|
||||
distributed::RemoteUpdatesRpcClients &remote_updates_clients() override;
|
||||
distributed::RemoteDataManager &remote_data_manager() override;
|
||||
|
||||
protected:
|
||||
explicit PublicBase(std::unique_ptr<PrivateBase> impl);
|
||||
|
@ -18,22 +18,12 @@ namespace database {
|
||||
GraphDbAccessor::GraphDbAccessor(GraphDb &db)
|
||||
: db_(db),
|
||||
transaction_(*db.tx_engine().Begin()),
|
||||
transaction_starter_{true} {
|
||||
if (db_.type() != GraphDb::Type::SINGLE_NODE) {
|
||||
remote_vertices_.emplace(db_.remote_data_clients());
|
||||
remote_edges_.emplace(db_.remote_data_clients());
|
||||
}
|
||||
}
|
||||
transaction_starter_{true} {}
|
||||
|
||||
GraphDbAccessor::GraphDbAccessor(GraphDb &db, tx::transaction_id_t tx_id)
|
||||
: db_(db),
|
||||
transaction_(*db.tx_engine().RunningTransaction(tx_id)),
|
||||
transaction_starter_{false} {
|
||||
if (db_.type() != GraphDb::Type::SINGLE_NODE) {
|
||||
remote_vertices_.emplace(db_.remote_data_clients());
|
||||
remote_edges_.emplace(db_.remote_data_clients());
|
||||
}
|
||||
}
|
||||
transaction_starter_{false} {}
|
||||
|
||||
GraphDbAccessor::~GraphDbAccessor() {
|
||||
if (transaction_starter_ && !commited_ && !aborted_) {
|
||||
@ -525,16 +515,6 @@ std::vector<std::string> GraphDbAccessor::IndexInfo() const {
|
||||
return info;
|
||||
}
|
||||
|
||||
template <>
|
||||
distributed::RemoteCache<Vertex> &GraphDbAccessor::remote_elements() {
|
||||
return remote_vertices();
|
||||
}
|
||||
|
||||
template <>
|
||||
distributed::RemoteCache<Edge> &GraphDbAccessor::remote_elements() {
|
||||
return remote_edges();
|
||||
}
|
||||
|
||||
mvcc::VersionList<Vertex> *GraphDbAccessor::LocalVertexAddress(
|
||||
gid::Gid gid) const {
|
||||
auto access = db_.storage().vertices_.access();
|
||||
|
@ -550,21 +550,6 @@ class GraphDbAccessor {
|
||||
/* Returns a list of index names present in the database. */
|
||||
std::vector<std::string> IndexInfo() const;
|
||||
|
||||
distributed::RemoteCache<Vertex> &remote_vertices() {
|
||||
CHECK(remote_vertices_)
|
||||
<< "Attempting to get a remote cache in single-node Memgraph";
|
||||
return remote_vertices_.value();
|
||||
}
|
||||
distributed::RemoteCache<Edge> &remote_edges() {
|
||||
CHECK(remote_edges_)
|
||||
<< "Attempting to get a remote cache in single-node Memgraph";
|
||||
return remote_edges_.value();
|
||||
}
|
||||
|
||||
/** Gets remote_vertices or remote_edges, depending on type param. */
|
||||
template <typename TRecord>
|
||||
distributed::RemoteCache<TRecord> &remote_elements();
|
||||
|
||||
/// Gets the local address for the given gid. Fails if not present.
|
||||
mvcc::VersionList<Vertex> *LocalVertexAddress(gid::Gid gid) const;
|
||||
|
||||
@ -581,10 +566,6 @@ class GraphDbAccessor {
|
||||
bool commited_{false};
|
||||
bool aborted_{false};
|
||||
|
||||
std::experimental::optional<distributed::RemoteCache<Vertex>>
|
||||
remote_vertices_;
|
||||
std::experimental::optional<distributed::RemoteCache<Edge>> remote_edges_;
|
||||
|
||||
/**
|
||||
* Insert this vertex into corresponding label and label+property (if it
|
||||
* exists) index.
|
||||
|
@ -16,8 +16,7 @@ namespace distributed {
|
||||
* distributed system. Maps global IDs to (old, new) Vertex/Edge pointer
|
||||
* pairs. It is possible that either "old" or "new" are nullptrs, but at
|
||||
* least one must be not-null. The RemoteCache is the owner of TRecord
|
||||
* objects it points to. This class is thread-safe, because it's owning
|
||||
* GraphDbAccessor is thread-safe.
|
||||
* objects it points to.
|
||||
*
|
||||
* @tparam TRecord - Edge or Vertex
|
||||
*/
|
||||
|
61
src/distributed/remote_data_manager.hpp
Normal file
61
src/distributed/remote_data_manager.hpp
Normal file
@ -0,0 +1,61 @@
|
||||
#pragma once
|
||||
|
||||
#include <mutex>
|
||||
|
||||
#include "distributed/remote_cache.hpp"
|
||||
#include "distributed/remote_data_rpc_clients.hpp"
|
||||
#include "storage/edge.hpp"
|
||||
#include "storage/vertex.hpp"
|
||||
#include "threading/sync/spinlock.hpp"
|
||||
#include "transactions/type.hpp"
|
||||
|
||||
namespace distributed {
|
||||
|
||||
/** Handles remote data caches for edges and vertices, per transaction. */
|
||||
class RemoteDataManager {
|
||||
// Helper, gets or inserts a data cache for the given transaction.
|
||||
template <typename TCollection>
|
||||
auto &GetCache(TCollection &collection, tx::transaction_id_t tx_id) {
|
||||
std::lock_guard<SpinLock> guard{lock_};
|
||||
auto found = collection.find(tx_id);
|
||||
if (found != collection.end()) return found->second;
|
||||
|
||||
return collection.emplace(tx_id, remote_data_clients_).first->second;
|
||||
}
|
||||
|
||||
public:
|
||||
RemoteDataManager(distributed::RemoteDataRpcClients &remote_data_clients)
|
||||
: remote_data_clients_(remote_data_clients) {}
|
||||
|
||||
/// Gets or creates the remote vertex cache for the given transaction.
|
||||
auto &Vertices(tx::transaction_id_t tx_id) {
|
||||
return GetCache(vertices_caches_, tx_id);
|
||||
}
|
||||
|
||||
/// Gets or creates the remote edge cache for the given transaction.
|
||||
auto &Edges(tx::transaction_id_t tx_id) {
|
||||
return GetCache(edges_caches_, tx_id);
|
||||
}
|
||||
|
||||
/// Gets or creates the remote vertex/edge cache for the given transaction.
|
||||
template <typename TRecord>
|
||||
auto &Elements(tx::transaction_id_t tx_id);
|
||||
|
||||
private:
|
||||
RemoteDataRpcClients &remote_data_clients_;
|
||||
SpinLock lock_;
|
||||
std::unordered_map<tx::transaction_id_t, RemoteCache<Vertex>>
|
||||
vertices_caches_;
|
||||
std::unordered_map<tx::transaction_id_t, RemoteCache<Edge>> edges_caches_;
|
||||
};
|
||||
|
||||
template <>
|
||||
inline auto &RemoteDataManager::Elements<Vertex>(tx::transaction_id_t tx_id) {
|
||||
return Vertices(tx_id);
|
||||
}
|
||||
|
||||
template <>
|
||||
inline auto &RemoteDataManager::Elements<Edge>(tx::transaction_id_t tx_id) {
|
||||
return Edges(tx_id);
|
||||
}
|
||||
} // namespace distributed
|
@ -4,6 +4,7 @@
|
||||
#include <vector>
|
||||
|
||||
#include "database/graph_db_accessor.hpp"
|
||||
#include "distributed/remote_data_manager.hpp"
|
||||
#include "distributed/remote_pull_produce_rpc_messages.hpp"
|
||||
#include "distributed/rpc_worker_clients.hpp"
|
||||
#include "query/frontend/semantic/symbol.hpp"
|
||||
@ -38,18 +39,22 @@ class RemotePullRpcClients {
|
||||
true);
|
||||
|
||||
auto handle_vertex = [&dba](auto &v) {
|
||||
dba.remote_vertices().emplace(v.global_address.gid(),
|
||||
std::move(v.old_record),
|
||||
std::move(v.new_record));
|
||||
dba.db()
|
||||
.remote_data_manager()
|
||||
.Vertices(dba.transaction_id())
|
||||
.emplace(v.global_address.gid(), std::move(v.old_record),
|
||||
std::move(v.new_record));
|
||||
if (v.element_in_frame) {
|
||||
VertexAccessor va(v.global_address, dba);
|
||||
*v.element_in_frame = va;
|
||||
}
|
||||
};
|
||||
auto handle_edge = [&dba](auto &e) {
|
||||
dba.remote_edges().emplace(e.global_address.gid(),
|
||||
std::move(e.old_record),
|
||||
std::move(e.new_record));
|
||||
dba.db()
|
||||
.remote_data_manager()
|
||||
.Edges(dba.transaction_id())
|
||||
.emplace(e.global_address.gid(), std::move(e.old_record),
|
||||
std::move(e.new_record));
|
||||
if (e.element_in_frame) {
|
||||
EdgeAccessor ea(e.global_address, dba);
|
||||
*e.element_in_frame = ea;
|
||||
|
@ -2,6 +2,7 @@
|
||||
|
||||
#include "database/graph_db_accessor.hpp"
|
||||
#include "database/state_delta.hpp"
|
||||
#include "distributed/remote_data_manager.hpp"
|
||||
#include "distributed/remote_updates_rpc_clients.hpp"
|
||||
#include "storage/edge.hpp"
|
||||
#include "storage/record_accessor.hpp"
|
||||
@ -137,17 +138,20 @@ RecordAccessor<TRecord> &RecordAccessor<TRecord>::SwitchOld() {
|
||||
|
||||
template <typename TRecord>
|
||||
bool RecordAccessor<TRecord>::Reconstruct() const {
|
||||
auto &dba = db_accessor();
|
||||
if (is_local()) {
|
||||
address_.local()->find_set_old_new(db_accessor_->transaction(), old_, new_);
|
||||
address_.local()->find_set_old_new(dba.transaction(), old_, new_);
|
||||
} else {
|
||||
// It's not possible that we have a global address for a graph element
|
||||
// that's local, because that is resolved in the constructor.
|
||||
// TODO in write queries it's possible the command has been advanced and
|
||||
// we need to invalidate the RemoteCache and really get the latest stuff.
|
||||
// But only do that after the command has been advanced.
|
||||
db_accessor().template remote_elements<TRecord>().FindSetOldNew(
|
||||
db_accessor().transaction().id_, address_.worker_id(), address_.gid(),
|
||||
old_, new_);
|
||||
auto &remote_cache =
|
||||
dba.db().remote_data_manager().template Elements<TRecord>(
|
||||
dba.transaction_id());
|
||||
remote_cache.FindSetOldNew(dba.transaction().id_, address_.worker_id(),
|
||||
address_.gid(), old_, new_);
|
||||
}
|
||||
current_ = old_ ? old_ : new_;
|
||||
return old_ != nullptr || new_ != nullptr;
|
||||
@ -155,13 +159,14 @@ bool RecordAccessor<TRecord>::Reconstruct() const {
|
||||
|
||||
template <typename TRecord>
|
||||
TRecord &RecordAccessor<TRecord>::update() const {
|
||||
auto &dba = db_accessor();
|
||||
// Edges have lazily initialize mutable, versioned data (properties).
|
||||
if (std::is_same<TRecord, Edge>::value && current_ == nullptr) {
|
||||
bool reconstructed = Reconstruct();
|
||||
DCHECK(reconstructed) << "Unable to initialize record";
|
||||
}
|
||||
|
||||
const auto &t = db_accessor_->transaction();
|
||||
const auto &t = dba.transaction();
|
||||
if (!new_ && old_->is_expired_by(t))
|
||||
throw RecordDeletedError();
|
||||
else if (new_ && new_->is_expired_by(t))
|
||||
@ -172,8 +177,10 @@ TRecord &RecordAccessor<TRecord>::update() const {
|
||||
if (is_local()) {
|
||||
new_ = address_.local()->update(t);
|
||||
} else {
|
||||
new_ = db_accessor().template remote_elements<TRecord>().FindNew(
|
||||
address_.gid());
|
||||
auto &remote_cache =
|
||||
dba.db().remote_data_manager().template Elements<TRecord>(
|
||||
dba.transaction_id());
|
||||
new_ = remote_cache.FindNew(address_.gid());
|
||||
}
|
||||
DCHECK(new_ != nullptr) << "RecordAccessor.new_ is null after update";
|
||||
return *new_;
|
||||
|
Loading…
Reference in New Issue
Block a user