Remove Impl from RecordAccessor
Reviewers: msantl, ipaljak, teon.banek Reviewed By: teon.banek Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D1859
This commit is contained in:
parent
19aabaa0d3
commit
d1eeaa8de0
@ -23,8 +23,6 @@
|
||||
#include "distributed/updates_rpc_clients.hpp"
|
||||
#include "distributed/updates_rpc_server.hpp"
|
||||
#include "durability/distributed/snapshooter.hpp"
|
||||
// TODO: Why do we depend on query here?
|
||||
#include "query/exceptions.hpp"
|
||||
#include "storage/distributed/concurrent_id_mapper.hpp"
|
||||
#include "storage/distributed/concurrent_id_mapper_master.hpp"
|
||||
#include "storage/distributed/concurrent_id_mapper_worker.hpp"
|
||||
@ -39,215 +37,6 @@ using namespace std::literals::chrono_literals;
|
||||
namespace database {
|
||||
|
||||
namespace {
|
||||
|
||||
//////////////////////////////////////////////////////////////////////
|
||||
// RecordAccessors implementations
|
||||
//////////////////////////////////////////////////////////////////////
|
||||
|
||||
// RecordAccessor implementation is shared among different RecordAccessors to
|
||||
// avoid heap allocations. Therefore, we are constructing this implementation in
|
||||
// each GraphDb and pass it to DistributedAccessor.
|
||||
template <class TRecord>
|
||||
class DistributedRecordAccessor final {
|
||||
// These should never be changed, because this implementation may be shared
|
||||
// among multiple RecordAccessors.
|
||||
int worker_id_;
|
||||
distributed::DataManager *data_manager_;
|
||||
distributed::UpdatesRpcClients *updates_clients_;
|
||||
|
||||
public:
|
||||
DistributedRecordAccessor(int worker_id,
|
||||
distributed::DataManager *data_manager,
|
||||
distributed::UpdatesRpcClients *updates_clients)
|
||||
: worker_id_(worker_id),
|
||||
data_manager_(data_manager),
|
||||
updates_clients_(updates_clients) {
|
||||
CHECK(data_manager_ && updates_clients_);
|
||||
}
|
||||
|
||||
typename RecordAccessor<TRecord>::AddressT GlobalAddress(
|
||||
const RecordAccessor<TRecord> &record_accessor) {
|
||||
return record_accessor.is_local()
|
||||
? storage::Address<mvcc::VersionList<TRecord>>(
|
||||
record_accessor.gid(), worker_id_)
|
||||
: record_accessor.address();
|
||||
}
|
||||
|
||||
void SetOldNew(const RecordAccessor<TRecord> &record_accessor, TRecord **old,
|
||||
TRecord **newr) {
|
||||
auto &dba = record_accessor.db_accessor();
|
||||
const auto &address = record_accessor.address();
|
||||
if (record_accessor.is_local()) {
|
||||
address.local()->find_set_old_new(dba.transaction(), old, newr);
|
||||
return;
|
||||
}
|
||||
// It's not possible that we have a global address for a graph element
|
||||
// that's local, because that is resolved in the constructor.
|
||||
// TODO in write queries it's possible the command has been advanced and
|
||||
// we need to invalidate the Cache and really get the latest stuff.
|
||||
// But only do that after the command has been advanced.
|
||||
data_manager_->FindSetOldNew(dba.transaction_id(), address.worker_id(),
|
||||
address.gid(), old, newr);
|
||||
}
|
||||
|
||||
TRecord *FindNew(const RecordAccessor<TRecord> &record_accessor) {
|
||||
const auto &address = record_accessor.address();
|
||||
auto &dba = record_accessor.db_accessor();
|
||||
if (address.is_local()) {
|
||||
return address.local()->update(dba.transaction());
|
||||
}
|
||||
return data_manager_->FindNew<TRecord>(dba.transaction_id(), address.gid());
|
||||
}
|
||||
|
||||
void ProcessDelta(const RecordAccessor<TRecord> &record_accessor,
|
||||
const database::StateDelta &delta) {
|
||||
if (record_accessor.is_local()) {
|
||||
record_accessor.db_accessor().wal().Emplace(delta);
|
||||
} else {
|
||||
SendDelta(record_accessor, delta);
|
||||
}
|
||||
}
|
||||
|
||||
void SendDelta(const RecordAccessor<TRecord> &record_accessor,
|
||||
const database::StateDelta &delta) {
|
||||
auto result =
|
||||
updates_clients_->Update(record_accessor.address().worker_id(), delta);
|
||||
switch (result) {
|
||||
case distributed::UpdateResult::DONE:
|
||||
break;
|
||||
case distributed::UpdateResult::UNABLE_TO_DELETE_VERTEX_ERROR:
|
||||
throw query::RemoveAttachedVertexException();
|
||||
case distributed::UpdateResult::SERIALIZATION_ERROR:
|
||||
throw mvcc::SerializationError();
|
||||
case distributed::UpdateResult::UPDATE_DELETED_ERROR:
|
||||
throw RecordDeletedError();
|
||||
case distributed::UpdateResult::LOCK_TIMEOUT_ERROR:
|
||||
throw utils::LockTimeoutException("Lock timeout on remote worker");
|
||||
}
|
||||
}
|
||||
|
||||
int64_t CypherId(const RecordAccessor<TRecord> &record_accessor) {
|
||||
auto &dba = record_accessor.db_accessor();
|
||||
const auto &address = record_accessor.address();
|
||||
if (record_accessor.is_local()) return address.local()->cypher_id();
|
||||
// Fetch data from the cache.
|
||||
//
|
||||
// NOTE: This part is executed when we need to migrate
|
||||
// a vertex and it has edges that don't belong to it. A machine that owns
|
||||
// the vertex still need to figure out what is the cypher_id for each
|
||||
// remote edge because the machine has to initiate remote edge creation
|
||||
// and for that call it has to know the remote cypher_ids.
|
||||
// TODO (buda): If we save cypher_id similar/next to edge_type we would save
|
||||
// a network call.
|
||||
return data_manager_
|
||||
->Find<TRecord>(dba.transaction().id_, address.worker_id(),
|
||||
address.gid())
|
||||
.cypher_id;
|
||||
}
|
||||
};
|
||||
|
||||
class DistributedEdgeAccessor final : public ::RecordAccessor<Edge>::Impl {
|
||||
DistributedRecordAccessor<Edge> distributed_accessor_;
|
||||
|
||||
public:
|
||||
DistributedEdgeAccessor(int worker_id, distributed::DataManager *data_manager,
|
||||
distributed::UpdatesRpcClients *updates_clients)
|
||||
: distributed_accessor_(worker_id, data_manager, updates_clients) {}
|
||||
|
||||
typename RecordAccessor<Edge>::AddressT GlobalAddress(
|
||||
const RecordAccessor<Edge> &ra) override {
|
||||
return distributed_accessor_.GlobalAddress(ra);
|
||||
}
|
||||
|
||||
void SetOldNew(const RecordAccessor<Edge> &ra, Edge **old_record,
|
||||
Edge **new_record) override {
|
||||
return distributed_accessor_.SetOldNew(ra, old_record, new_record);
|
||||
}
|
||||
|
||||
Edge *FindNew(const RecordAccessor<Edge> &ra) override {
|
||||
return distributed_accessor_.FindNew(ra);
|
||||
}
|
||||
|
||||
void ProcessDelta(const RecordAccessor<Edge> &ra,
|
||||
const database::StateDelta &delta) override {
|
||||
return distributed_accessor_.ProcessDelta(ra, delta);
|
||||
}
|
||||
|
||||
int64_t CypherId(const RecordAccessor<Edge> &ra) override {
|
||||
return distributed_accessor_.CypherId(ra);
|
||||
}
|
||||
};
|
||||
|
||||
class DistributedVertexAccessor final : public ::VertexAccessor::Impl {
|
||||
DistributedRecordAccessor<Vertex> distributed_accessor_;
|
||||
|
||||
public:
|
||||
DistributedVertexAccessor(int worker_id,
|
||||
distributed::DataManager *data_manager,
|
||||
distributed::UpdatesRpcClients *updates_clients)
|
||||
: distributed_accessor_(worker_id, data_manager, updates_clients) {}
|
||||
|
||||
typename RecordAccessor<Vertex>::AddressT GlobalAddress(
|
||||
const RecordAccessor<Vertex> &ra) override {
|
||||
return distributed_accessor_.GlobalAddress(ra);
|
||||
}
|
||||
|
||||
void SetOldNew(const RecordAccessor<Vertex> &ra, Vertex **old_record,
|
||||
Vertex **new_record) override {
|
||||
return distributed_accessor_.SetOldNew(ra, old_record, new_record);
|
||||
}
|
||||
|
||||
Vertex *FindNew(const RecordAccessor<Vertex> &ra) override {
|
||||
return distributed_accessor_.FindNew(ra);
|
||||
}
|
||||
|
||||
void ProcessDelta(const RecordAccessor<Vertex> &ra,
|
||||
const database::StateDelta &delta) override {
|
||||
return distributed_accessor_.ProcessDelta(ra, delta);
|
||||
}
|
||||
|
||||
void AddLabel(const VertexAccessor &va,
|
||||
const storage::Label &label) override {
|
||||
auto &dba = va.db_accessor();
|
||||
auto delta = StateDelta::AddLabel(dba.transaction_id(), va.gid(), label,
|
||||
dba.LabelName(label));
|
||||
Vertex &vertex = va.update();
|
||||
// not a duplicate label, add it
|
||||
if (!utils::Contains(vertex.labels_, label)) {
|
||||
vertex.labels_.emplace_back(label);
|
||||
if (va.is_local()) {
|
||||
dba.wal().Emplace(delta);
|
||||
dba.UpdateLabelIndices(label, va, &vertex);
|
||||
}
|
||||
}
|
||||
|
||||
if (!va.is_local()) distributed_accessor_.SendDelta(va, delta);
|
||||
}
|
||||
|
||||
void RemoveLabel(const VertexAccessor &va,
|
||||
const storage::Label &label) override {
|
||||
auto &dba = va.db_accessor();
|
||||
auto delta = StateDelta::RemoveLabel(dba.transaction_id(), va.gid(), label,
|
||||
dba.LabelName(label));
|
||||
Vertex &vertex = va.update();
|
||||
if (utils::Contains(vertex.labels_, label)) {
|
||||
auto &labels = vertex.labels_;
|
||||
auto found = std::find(labels.begin(), labels.end(), delta.label);
|
||||
std::swap(*found, labels.back());
|
||||
labels.pop_back();
|
||||
if (va.is_local()) {
|
||||
dba.wal().Emplace(delta);
|
||||
}
|
||||
}
|
||||
|
||||
if (!va.is_local()) distributed_accessor_.SendDelta(va, delta);
|
||||
}
|
||||
|
||||
int64_t CypherId(const RecordAccessor<Vertex> &ra) override {
|
||||
return distributed_accessor_.CypherId(ra);
|
||||
}
|
||||
};
|
||||
|
||||
//////////////////////////////////////////////////////////////////////
|
||||
// GraphDbAccessor implementations
|
||||
//////////////////////////////////////////////////////////////////////
|
||||
@ -255,36 +44,19 @@ class DistributedVertexAccessor final : public ::VertexAccessor::Impl {
|
||||
class DistributedAccessor : public GraphDbAccessor {
|
||||
distributed::UpdatesRpcClients *updates_clients_{nullptr};
|
||||
distributed::DataManager *data_manager_{nullptr};
|
||||
// Shared implementations of record accessors.
|
||||
DistributedVertexAccessor *vertex_accessor_;
|
||||
DistributedEdgeAccessor *edge_accessor_;
|
||||
|
||||
protected:
|
||||
DistributedAccessor(GraphDb *db, tx::TransactionId tx_id,
|
||||
DistributedVertexAccessor *vertex_accessor,
|
||||
DistributedEdgeAccessor *edge_accessor)
|
||||
DistributedAccessor(GraphDb *db, tx::TransactionId tx_id)
|
||||
: GraphDbAccessor(*db, tx_id),
|
||||
updates_clients_(&db->updates_clients()),
|
||||
data_manager_(&db->data_manager()),
|
||||
vertex_accessor_(vertex_accessor),
|
||||
edge_accessor_(edge_accessor) {}
|
||||
data_manager_(&db->data_manager()) {}
|
||||
|
||||
DistributedAccessor(GraphDb *db,
|
||||
DistributedVertexAccessor *vertex_accessor,
|
||||
DistributedEdgeAccessor *edge_accessor)
|
||||
explicit DistributedAccessor(GraphDb *db)
|
||||
: GraphDbAccessor(*db),
|
||||
updates_clients_(&db->updates_clients()),
|
||||
data_manager_(&db->data_manager()),
|
||||
vertex_accessor_(vertex_accessor),
|
||||
edge_accessor_(edge_accessor) {}
|
||||
data_manager_(&db->data_manager()) {}
|
||||
|
||||
public:
|
||||
::VertexAccessor::Impl *GetVertexImpl() override { return vertex_accessor_; }
|
||||
|
||||
::RecordAccessor<Edge>::Impl *GetEdgeImpl() override {
|
||||
return edge_accessor_;
|
||||
}
|
||||
|
||||
bool RemoveVertex(VertexAccessor &vertex_accessor,
|
||||
bool check_empty = true) override {
|
||||
if (!vertex_accessor.is_local()) {
|
||||
@ -373,20 +145,16 @@ class MasterAccessor final : public DistributedAccessor {
|
||||
|
||||
public:
|
||||
MasterAccessor(Master *db, distributed::Coordination *coordination,
|
||||
distributed::PullRpcClients *pull_clients_,
|
||||
DistributedVertexAccessor *vertex_accessor,
|
||||
DistributedEdgeAccessor *edge_accessor)
|
||||
: DistributedAccessor(db, vertex_accessor, edge_accessor),
|
||||
distributed::PullRpcClients *pull_clients_)
|
||||
: DistributedAccessor(db),
|
||||
coordination_(coordination),
|
||||
pull_clients_(pull_clients_),
|
||||
worker_id_(db->WorkerId()) {}
|
||||
|
||||
MasterAccessor(Master *db, tx::TransactionId tx_id,
|
||||
distributed::Coordination *coordination,
|
||||
distributed::PullRpcClients *pull_clients_,
|
||||
DistributedVertexAccessor *vertex_accessor,
|
||||
DistributedEdgeAccessor *edge_accessor)
|
||||
: DistributedAccessor(db, tx_id, vertex_accessor, edge_accessor),
|
||||
distributed::PullRpcClients *pull_clients_)
|
||||
: DistributedAccessor(db, tx_id),
|
||||
coordination_(coordination),
|
||||
pull_clients_(pull_clients_),
|
||||
worker_id_(db->WorkerId()) {}
|
||||
@ -474,14 +242,11 @@ class MasterAccessor final : public DistributedAccessor {
|
||||
|
||||
class WorkerAccessor final : public DistributedAccessor {
|
||||
public:
|
||||
WorkerAccessor(Worker *db, DistributedVertexAccessor *vertex_accessor,
|
||||
DistributedEdgeAccessor *edge_accessor)
|
||||
: DistributedAccessor(db, vertex_accessor, edge_accessor) {}
|
||||
explicit WorkerAccessor(Worker *db)
|
||||
: DistributedAccessor(db) {}
|
||||
|
||||
WorkerAccessor(Worker *db, tx::TransactionId tx_id,
|
||||
DistributedVertexAccessor *vertex_accessor,
|
||||
DistributedEdgeAccessor *edge_accessor)
|
||||
: DistributedAccessor(db, tx_id, vertex_accessor, edge_accessor) {}
|
||||
WorkerAccessor(Worker *db, tx::TransactionId tx_id)
|
||||
: DistributedAccessor(db, tx_id) {}
|
||||
|
||||
void BuildIndex(storage::Label, storage::Property, bool) override {
|
||||
// TODO: Rethink BuildIndex API or inheritance. It's rather strange that a
|
||||
@ -603,11 +368,6 @@ class Master {
|
||||
durability::WriteAheadLog wal_{
|
||||
config_.worker_id, config_.durability_directory,
|
||||
config_.durability_enabled, config_.synchronous_commit};
|
||||
// Shared implementations for all RecordAccessor in this Db.
|
||||
DistributedEdgeAccessor edge_accessor_{config_.worker_id, &data_manager_,
|
||||
&updates_clients_};
|
||||
DistributedVertexAccessor vertex_accessor_{config_.worker_id, &data_manager_,
|
||||
&updates_clients_};
|
||||
|
||||
// TODO: Some things may depend on order of construction/destruction. We also
|
||||
// have a lot of circular pointers among members. It would be a good idea to
|
||||
@ -661,14 +421,12 @@ Master::~Master() {}
|
||||
|
||||
std::unique_ptr<GraphDbAccessor> Master::Access() {
|
||||
return std::make_unique<MasterAccessor>(
|
||||
this, &impl_->coordination_, &impl_->pull_clients_,
|
||||
&impl_->vertex_accessor_, &impl_->edge_accessor_);
|
||||
this, &impl_->coordination_, &impl_->pull_clients_);
|
||||
}
|
||||
|
||||
std::unique_ptr<GraphDbAccessor> Master::Access(tx::TransactionId tx_id) {
|
||||
return std::make_unique<MasterAccessor>(
|
||||
this, tx_id, &impl_->coordination_, &impl_->pull_clients_,
|
||||
&impl_->vertex_accessor_, &impl_->edge_accessor_);
|
||||
this, tx_id, &impl_->coordination_, &impl_->pull_clients_);
|
||||
}
|
||||
|
||||
Storage &Master::storage() { return *impl_->storage_; }
|
||||
@ -974,11 +732,6 @@ class Worker {
|
||||
durability::WriteAheadLog wal_{
|
||||
config_.worker_id, config_.durability_directory,
|
||||
config_.durability_enabled, config_.synchronous_commit};
|
||||
// Shared implementations for all RecordAccessor in this Db.
|
||||
DistributedEdgeAccessor edge_accessor_{config_.worker_id, &data_manager_,
|
||||
&updates_clients_};
|
||||
DistributedVertexAccessor vertex_accessor_{config_.worker_id, &data_manager_,
|
||||
&updates_clients_};
|
||||
|
||||
Worker(const Config &config, database::Worker *self)
|
||||
: config_(config), self_(self) {}
|
||||
@ -1041,13 +794,11 @@ Worker::Worker(Config config)
|
||||
Worker::~Worker() {}
|
||||
|
||||
std::unique_ptr<GraphDbAccessor> Worker::Access() {
|
||||
return std::make_unique<WorkerAccessor>(this, &impl_->vertex_accessor_,
|
||||
&impl_->edge_accessor_);
|
||||
return std::make_unique<WorkerAccessor>(this);
|
||||
}
|
||||
|
||||
std::unique_ptr<GraphDbAccessor> Worker::Access(tx::TransactionId tx_id) {
|
||||
return std::make_unique<WorkerAccessor>(this, tx_id, &impl_->vertex_accessor_,
|
||||
&impl_->edge_accessor_);
|
||||
return std::make_unique<WorkerAccessor>(this, tx_id);
|
||||
}
|
||||
|
||||
Storage &Worker::storage() { return *impl_->storage_; }
|
||||
|
@ -32,6 +32,18 @@ GraphDbAccessor::~GraphDbAccessor() {
|
||||
}
|
||||
}
|
||||
|
||||
int16_t GraphDbAccessor::worker_id() const {
|
||||
return db_.WorkerId();
|
||||
}
|
||||
|
||||
distributed::DataManager &GraphDbAccessor::data_manager() {
|
||||
return db_.data_manager();
|
||||
}
|
||||
|
||||
distributed::UpdatesRpcClients &GraphDbAccessor::updates_clients() {
|
||||
return db_.updates_clients();
|
||||
}
|
||||
|
||||
tx::TransactionId GraphDbAccessor::transaction_id() const {
|
||||
return transaction_.id_;
|
||||
}
|
||||
|
@ -20,6 +20,12 @@
|
||||
#include "utils/bound.hpp"
|
||||
#include "utils/exceptions.hpp"
|
||||
|
||||
namespace distributed {
|
||||
class DataManager;
|
||||
class UpdatesRpcClients;
|
||||
|
||||
} // namespace distributed
|
||||
|
||||
namespace database {
|
||||
|
||||
/** Thrown when inserting in an index with constraint. */
|
||||
@ -74,8 +80,9 @@ class GraphDbAccessor {
|
||||
GraphDbAccessor &operator=(const GraphDbAccessor &other) = delete;
|
||||
GraphDbAccessor &operator=(GraphDbAccessor &&other) = delete;
|
||||
|
||||
virtual ::VertexAccessor::Impl *GetVertexImpl() = 0;
|
||||
virtual ::RecordAccessor<Edge>::Impl *GetEdgeImpl() = 0;
|
||||
int16_t worker_id() const;
|
||||
distributed::DataManager &data_manager();
|
||||
distributed::UpdatesRpcClients &updates_clients();
|
||||
|
||||
/**
|
||||
* Creates a new Vertex and returns an accessor to it. If the ID is
|
||||
|
@ -6,7 +6,7 @@
|
||||
|
||||
EdgeAccessor::EdgeAccessor(EdgeAddress address,
|
||||
database::GraphDbAccessor &db_accessor)
|
||||
: RecordAccessor(address, db_accessor, db_accessor.GetEdgeImpl()),
|
||||
: RecordAccessor(address, db_accessor),
|
||||
from_(nullptr),
|
||||
to_(nullptr),
|
||||
edge_type_() {
|
||||
@ -22,7 +22,7 @@ EdgeAccessor::EdgeAccessor(EdgeAddress address,
|
||||
database::GraphDbAccessor &db_accessor,
|
||||
VertexAddress from, VertexAddress to,
|
||||
storage::EdgeType edge_type)
|
||||
: RecordAccessor(address, db_accessor, db_accessor.GetEdgeImpl()),
|
||||
: RecordAccessor(address, db_accessor),
|
||||
from_(from),
|
||||
to_(to),
|
||||
edge_type_(edge_type) {}
|
||||
|
@ -3,6 +3,8 @@
|
||||
#include <glog/logging.h>
|
||||
|
||||
#include "database/distributed/graph_db_accessor.hpp"
|
||||
#include "distributed/data_manager.hpp"
|
||||
#include "distributed/updates_rpc_clients.hpp"
|
||||
#include "durability/distributed/state_delta.hpp"
|
||||
#include "storage/distributed/edge.hpp"
|
||||
#include "storage/distributed/vertex.hpp"
|
||||
@ -11,10 +13,8 @@ using database::StateDelta;
|
||||
|
||||
template <typename TRecord>
|
||||
RecordAccessor<TRecord>::RecordAccessor(AddressT address,
|
||||
database::GraphDbAccessor &db_accessor,
|
||||
Impl *impl)
|
||||
: impl_(impl),
|
||||
db_accessor_(&db_accessor),
|
||||
database::GraphDbAccessor &db_accessor)
|
||||
: db_accessor_(&db_accessor),
|
||||
address_(db_accessor.db().storage().LocalizedAddressIfPossible(address)) {
|
||||
}
|
||||
|
||||
@ -107,7 +107,9 @@ typename RecordAccessor<TRecord>::AddressT RecordAccessor<TRecord>::address()
|
||||
template <typename TRecord>
|
||||
typename RecordAccessor<TRecord>::AddressT
|
||||
RecordAccessor<TRecord>::GlobalAddress() const {
|
||||
return impl_->GlobalAddress(*this);
|
||||
return is_local() ? storage::Address<mvcc::VersionList<TRecord>>(
|
||||
gid(), db_accessor_->worker_id())
|
||||
: address();
|
||||
}
|
||||
|
||||
template <typename TRecord>
|
||||
@ -138,7 +140,18 @@ RecordAccessor<TRecord> &RecordAccessor<TRecord>::SwitchOld() {
|
||||
|
||||
template <typename TRecord>
|
||||
bool RecordAccessor<TRecord>::Reconstruct() const {
|
||||
impl_->SetOldNew(*this, &old_, &new_);
|
||||
auto &dba = db_accessor();
|
||||
if (is_local()) {
|
||||
address().local()->find_set_old_new(dba.transaction(), &old_, &new_);
|
||||
} else {
|
||||
// It's not possible that we have a global address for a graph element
|
||||
// that's local, because that is resolved in the constructor.
|
||||
// TODO in write queries it's possible the command has been advanced and
|
||||
// we need to invalidate the Cache and really get the latest stuff.
|
||||
// But only do that after the command has been advanced.
|
||||
dba.data_manager().template FindSetOldNew<TRecord>(
|
||||
dba.transaction_id(), address().worker_id(), gid(), &old_, &new_);
|
||||
}
|
||||
current_ = old_ ? old_ : new_;
|
||||
return old_ != nullptr || new_ != nullptr;
|
||||
}
|
||||
@ -160,14 +173,34 @@ TRecord &RecordAccessor<TRecord>::update() const {
|
||||
|
||||
if (new_) return *new_;
|
||||
|
||||
new_ = impl_->FindNew(*this);
|
||||
if (address().is_local()) {
|
||||
new_ = address().local()->update(dba.transaction());
|
||||
} else {
|
||||
new_ = dba.data_manager().template FindNew<TRecord>(dba.transaction_id(),
|
||||
address().gid());
|
||||
}
|
||||
|
||||
DCHECK(new_ != nullptr) << "RecordAccessor.new_ is null after update";
|
||||
return *new_;
|
||||
}
|
||||
|
||||
template <typename TRecord>
|
||||
int64_t RecordAccessor<TRecord>::CypherId() const {
|
||||
return impl_->CypherId(*this);
|
||||
auto &dba = db_accessor();
|
||||
if (is_local()) return address().local()->cypher_id();
|
||||
// Fetch data from the cache.
|
||||
//
|
||||
// NOTE: This part is executed when we need to migrate
|
||||
// a vertex and it has edges that don't belong to it. A machine that owns
|
||||
// the vertex still need to figure out what is the cypher_id for each
|
||||
// remote edge because the machine has to initiate remote edge creation
|
||||
// and for that call it has to know the remote cypher_ids.
|
||||
// TODO (buda): If we save cypher_id similar/next to edge_type we would save
|
||||
// a network call.
|
||||
return db_accessor_->data_manager()
|
||||
.template Find<TRecord>(dba.transaction().id_, address().worker_id(),
|
||||
gid())
|
||||
.cypher_id;
|
||||
}
|
||||
|
||||
template <typename TRecord>
|
||||
@ -184,7 +217,26 @@ const TRecord &RecordAccessor<TRecord>::current() const {
|
||||
template <typename TRecord>
|
||||
void RecordAccessor<TRecord>::ProcessDelta(
|
||||
const database::StateDelta &delta) const {
|
||||
impl_->ProcessDelta(*this, delta);
|
||||
if (is_local()) {
|
||||
db_accessor().wal().Emplace(delta);
|
||||
} else {
|
||||
SendDelta(delta);
|
||||
}
|
||||
}
|
||||
|
||||
template <typename TRecord>
|
||||
void RecordAccessor<TRecord>::SendDelta(
|
||||
const database::StateDelta &delta) const {
|
||||
auto result =
|
||||
db_accessor_->updates_clients().Update(address().worker_id(), delta);
|
||||
switch (result) {
|
||||
case distributed::UpdateResult::DONE:
|
||||
break;
|
||||
default:
|
||||
// Update methods sends UpdateRpc to UpdatesRpcServer, server
|
||||
// appends delta to list and returns UpdateResult::DONE
|
||||
LOG(FATAL) << "Update should always return DONE";
|
||||
}
|
||||
}
|
||||
|
||||
template class RecordAccessor<Vertex>;
|
||||
|
@ -30,32 +30,6 @@ class RecordAccessor : public utils::TotalOrdering<RecordAccessor<TRecord>> {
|
||||
public:
|
||||
using AddressT = storage::Address<mvcc::VersionList<TRecord>>;
|
||||
|
||||
/**
|
||||
* Interface for the underlying implementation of the record accessor.
|
||||
* The RecordAccessor only borrows the pointer to the implementation, it does
|
||||
* *not* own it. When a RecordAccessor is copied, so is the pointer but *not*
|
||||
* the implementation itself. This means that concrete Impl types need to be
|
||||
* shareable among different accessors. To achieve that, it's best for derived
|
||||
* Impl types to contain *no state*. The reason we are using this approach is
|
||||
* to prevent large amounts of allocations, because RecordAccessor are often
|
||||
* created and copied.
|
||||
*/
|
||||
class Impl {
|
||||
public:
|
||||
virtual ~Impl() {}
|
||||
|
||||
virtual AddressT GlobalAddress(const RecordAccessor<TRecord> &ra) = 0;
|
||||
/** Set the pointers for old and new records during `Reconstruct`. */
|
||||
virtual void SetOldNew(const RecordAccessor<TRecord> &ra,
|
||||
TRecord **old_record, TRecord **new_record) = 0;
|
||||
/** Find the pointer to the new, updated record. */
|
||||
virtual TRecord *FindNew(const RecordAccessor<TRecord> &ra) = 0;
|
||||
/** Process a change delta, e.g. by writing WAL. */
|
||||
virtual void ProcessDelta(const RecordAccessor<TRecord> &ra,
|
||||
const database::StateDelta &delta) = 0;
|
||||
virtual int64_t CypherId(const RecordAccessor<TRecord> &ra) = 0;
|
||||
};
|
||||
|
||||
// this class is default copyable, movable and assignable
|
||||
RecordAccessor(const RecordAccessor &other) = default;
|
||||
RecordAccessor(RecordAccessor &&other) = default;
|
||||
@ -77,8 +51,7 @@ class RecordAccessor : public utils::TotalOrdering<RecordAccessor<TRecord>> {
|
||||
* @param db_accessor The DB accessor that "owns" this record accessor.
|
||||
* @param impl Borrowed pointer to the underlying implementation.
|
||||
*/
|
||||
RecordAccessor(AddressT address, database::GraphDbAccessor &db_accessor,
|
||||
Impl *impl);
|
||||
RecordAccessor(AddressT address, database::GraphDbAccessor &db_accessor);
|
||||
|
||||
public:
|
||||
/** Gets the property for the given key. */
|
||||
@ -197,6 +170,8 @@ class RecordAccessor : public utils::TotalOrdering<RecordAccessor<TRecord>> {
|
||||
/** Process a change delta, e.g. by writing WAL. */
|
||||
void ProcessDelta(const database::StateDelta &delta) const;
|
||||
|
||||
void SendDelta(const database::StateDelta &delta) const;
|
||||
|
||||
/**
|
||||
* Pointer to the version (either old_ or new_) that READ operations
|
||||
* in the accessor should take data from. Note that WRITE operations
|
||||
@ -212,7 +187,6 @@ class RecordAccessor : public utils::TotalOrdering<RecordAccessor<TRecord>> {
|
||||
const TRecord ¤t() const;
|
||||
|
||||
private:
|
||||
Impl *impl_;
|
||||
// The database accessor for which this record accessor is created
|
||||
// Provides means of getting to the transaction and database functions.
|
||||
// Immutable, set in the constructor and never changed.
|
||||
|
@ -8,8 +8,7 @@
|
||||
|
||||
VertexAccessor::VertexAccessor(VertexAddress address,
|
||||
database::GraphDbAccessor &db_accessor)
|
||||
: RecordAccessor(address, db_accessor, db_accessor.GetVertexImpl()),
|
||||
impl_(db_accessor.GetVertexImpl()) {
|
||||
: RecordAccessor(address, db_accessor) {
|
||||
Reconstruct();
|
||||
}
|
||||
|
||||
@ -18,11 +17,38 @@ size_t VertexAccessor::out_degree() const { return current().out_.size(); }
|
||||
size_t VertexAccessor::in_degree() const { return current().in_.size(); }
|
||||
|
||||
void VertexAccessor::add_label(storage::Label label) {
|
||||
return impl_->AddLabel(*this, label);
|
||||
auto &dba = db_accessor();
|
||||
auto delta = database::StateDelta::AddLabel(dba.transaction_id(), gid(),
|
||||
label, dba.LabelName(label));
|
||||
auto &vertex = update();
|
||||
// not a duplicate label, add it
|
||||
if (!utils::Contains(vertex.labels_, label)) {
|
||||
vertex.labels_.emplace_back(label);
|
||||
if (is_local()) {
|
||||
dba.wal().Emplace(delta);
|
||||
dba.UpdateLabelIndices(label, *this, &vertex);
|
||||
}
|
||||
}
|
||||
|
||||
if (!is_local()) SendDelta(delta);
|
||||
}
|
||||
|
||||
void VertexAccessor::remove_label(storage::Label label) {
|
||||
return impl_->RemoveLabel(*this, label);
|
||||
auto &dba = db_accessor();
|
||||
auto delta = database::StateDelta::RemoveLabel(dba.transaction_id(), gid(),
|
||||
label, dba.LabelName(label));
|
||||
Vertex &vertex = update();
|
||||
if (utils::Contains(vertex.labels_, label)) {
|
||||
auto &labels = vertex.labels_;
|
||||
auto found = std::find(labels.begin(), labels.end(), delta.label);
|
||||
std::swap(*found, labels.back());
|
||||
labels.pop_back();
|
||||
if (is_local()) {
|
||||
dba.wal().Emplace(delta);
|
||||
}
|
||||
}
|
||||
|
||||
if (!is_local()) SendDelta(delta);
|
||||
}
|
||||
|
||||
bool VertexAccessor::has_label(storage::Label label) const {
|
||||
|
@ -50,15 +50,6 @@ class VertexAccessor final : public RecordAccessor<Vertex> {
|
||||
}
|
||||
|
||||
public:
|
||||
/** Like RecordAccessor::Impl with addition of Vertex specific methods. */
|
||||
class Impl : public RecordAccessor<Vertex>::Impl {
|
||||
public:
|
||||
virtual void AddLabel(const VertexAccessor &va,
|
||||
const storage::Label &label) = 0;
|
||||
virtual void RemoveLabel(const VertexAccessor &va,
|
||||
const storage::Label &label) = 0;
|
||||
};
|
||||
|
||||
VertexAccessor(VertexAddress address, database::GraphDbAccessor &db_accessor);
|
||||
|
||||
/** Returns the number of outgoing edges. */
|
||||
@ -154,9 +145,6 @@ class VertexAccessor final : public RecordAccessor<Vertex> {
|
||||
* this operation should always be accompanied by the removal of the edge from
|
||||
* the outgoing edges on the other side and edge deletion. */
|
||||
void RemoveInEdge(storage::EdgeAddress edge);
|
||||
|
||||
private:
|
||||
Impl *impl_{nullptr};
|
||||
};
|
||||
|
||||
std::ostream &operator<<(std::ostream &, const VertexAccessor &);
|
||||
|
Loading…
Reference in New Issue
Block a user