Add pure interface class to RecordAccessor
Summary: Since RecordAccessor is often instantiated and copied, using a factory-like function to allocate concrete types on the heap would be too costly. The approach in this diff uses a Strategy pattern (see "Design Patterns" by Gamma et al.), where the Strategy interface is given as RecordAccessor::Impl. Concrete implementations are then created for each GraphDb. This allows us to instantiate the concrete RecordAccessors::Impl *once* and *share* it among all RecordAccessors. Reviewers: msantl, vkasljevic Reviewed By: msantl Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D1510
This commit is contained in:
parent
5d73c64fd8
commit
02e7cbf16c
@ -21,6 +21,8 @@
|
|||||||
#include "distributed/updates_rpc_clients.hpp"
|
#include "distributed/updates_rpc_clients.hpp"
|
||||||
#include "distributed/updates_rpc_server.hpp"
|
#include "distributed/updates_rpc_server.hpp"
|
||||||
#include "durability/snapshooter.hpp"
|
#include "durability/snapshooter.hpp"
|
||||||
|
// TODO: Why do we depend on query here?
|
||||||
|
#include "query/exceptions.hpp"
|
||||||
#include "storage/concurrent_id_mapper.hpp"
|
#include "storage/concurrent_id_mapper.hpp"
|
||||||
#include "storage/concurrent_id_mapper_master.hpp"
|
#include "storage/concurrent_id_mapper_master.hpp"
|
||||||
#include "storage/concurrent_id_mapper_worker.hpp"
|
#include "storage/concurrent_id_mapper_worker.hpp"
|
||||||
@ -31,25 +33,230 @@ using namespace std::literals::chrono_literals;
|
|||||||
|
|
||||||
namespace database {
|
namespace database {
|
||||||
|
|
||||||
// Accessors
|
|
||||||
namespace {
|
namespace {
|
||||||
|
|
||||||
|
//////////////////////////////////////////////////////////////////////
|
||||||
|
// RecordAccessors implementations
|
||||||
|
//////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
// RecordAccessor implementation is shared among different RecordAccessors to
|
||||||
|
// avoid heap allocations. Therefore, we are constructing this implementation in
|
||||||
|
// each DistributedGraphDb and pass it to DistributedAccessor.
|
||||||
|
template <class TRecord>
|
||||||
|
class DistributedRecordAccessor final {
|
||||||
|
// These should never be changed, because this implementation may be shared
|
||||||
|
// among multiple RecordAccessors.
|
||||||
|
int worker_id_;
|
||||||
|
distributed::DataManager *data_manager_;
|
||||||
|
distributed::UpdatesRpcClients *updates_clients_;
|
||||||
|
|
||||||
|
public:
|
||||||
|
DistributedRecordAccessor(int worker_id,
|
||||||
|
distributed::DataManager *data_manager,
|
||||||
|
distributed::UpdatesRpcClients *updates_clients)
|
||||||
|
: worker_id_(worker_id),
|
||||||
|
data_manager_(data_manager),
|
||||||
|
updates_clients_(updates_clients) {
|
||||||
|
CHECK(data_manager_ && updates_clients_);
|
||||||
|
}
|
||||||
|
|
||||||
|
typename RecordAccessor<TRecord>::AddressT GlobalAddress(
|
||||||
|
const RecordAccessor<TRecord> &record_accessor) {
|
||||||
|
return record_accessor.is_local()
|
||||||
|
? storage::Address<mvcc::VersionList<TRecord>>(
|
||||||
|
record_accessor.gid(), worker_id_)
|
||||||
|
: record_accessor.address();
|
||||||
|
}
|
||||||
|
|
||||||
|
void SetOldNew(const RecordAccessor<TRecord> &record_accessor, TRecord **old,
|
||||||
|
TRecord **newr) {
|
||||||
|
auto &dba = record_accessor.db_accessor();
|
||||||
|
const auto &address = record_accessor.address();
|
||||||
|
if (record_accessor.is_local()) {
|
||||||
|
address.local()->find_set_old_new(dba.transaction(), *old, *newr);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
// It's not possible that we have a global address for a graph element
|
||||||
|
// that's local, because that is resolved in the constructor.
|
||||||
|
// TODO in write queries it's possible the command has been advanced and
|
||||||
|
// we need to invalidate the Cache and really get the latest stuff.
|
||||||
|
// But only do that after the command has been advanced.
|
||||||
|
auto &cache =
|
||||||
|
data_manager_->template Elements<TRecord>(dba.transaction_id());
|
||||||
|
cache.FindSetOldNew(dba.transaction().id_, address.worker_id(),
|
||||||
|
address.gid(), *old, *newr);
|
||||||
|
}
|
||||||
|
|
||||||
|
TRecord *FindNew(const RecordAccessor<TRecord> &record_accessor) {
|
||||||
|
const auto &address = record_accessor.address();
|
||||||
|
auto &dba = record_accessor.db_accessor();
|
||||||
|
if (address.is_local()) {
|
||||||
|
return address.local()->update(dba.transaction());
|
||||||
|
}
|
||||||
|
auto &cache =
|
||||||
|
data_manager_->template Elements<TRecord>(dba.transaction_id());
|
||||||
|
return cache.FindNew(address.gid());
|
||||||
|
}
|
||||||
|
|
||||||
|
void ProcessDelta(const RecordAccessor<TRecord> &record_accessor,
|
||||||
|
const database::StateDelta &delta) {
|
||||||
|
if (record_accessor.is_local()) {
|
||||||
|
record_accessor.db_accessor().wal().Emplace(delta);
|
||||||
|
} else {
|
||||||
|
SendDelta(record_accessor, delta);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void SendDelta(const RecordAccessor<TRecord> &record_accessor,
|
||||||
|
const database::StateDelta &delta) {
|
||||||
|
auto result =
|
||||||
|
updates_clients_->Update(record_accessor.address().worker_id(), delta);
|
||||||
|
switch (result) {
|
||||||
|
case distributed::UpdateResult::DONE:
|
||||||
|
break;
|
||||||
|
case distributed::UpdateResult::UNABLE_TO_DELETE_VERTEX_ERROR:
|
||||||
|
throw query::RemoveAttachedVertexException();
|
||||||
|
case distributed::UpdateResult::SERIALIZATION_ERROR:
|
||||||
|
throw mvcc::SerializationError();
|
||||||
|
case distributed::UpdateResult::UPDATE_DELETED_ERROR:
|
||||||
|
throw RecordDeletedError();
|
||||||
|
case distributed::UpdateResult::LOCK_TIMEOUT_ERROR:
|
||||||
|
throw utils::LockTimeoutException("Lock timeout on remote worker");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
class DistributedEdgeAccessor final : public ::RecordAccessor<Edge>::Impl {
|
||||||
|
DistributedRecordAccessor<Edge> distributed_accessor_;
|
||||||
|
|
||||||
|
public:
|
||||||
|
DistributedEdgeAccessor(int worker_id, distributed::DataManager *data_manager,
|
||||||
|
distributed::UpdatesRpcClients *updates_clients)
|
||||||
|
: distributed_accessor_(worker_id, data_manager, updates_clients) {}
|
||||||
|
|
||||||
|
typename RecordAccessor<Edge>::AddressT GlobalAddress(
|
||||||
|
const RecordAccessor<Edge> &ra) override {
|
||||||
|
return distributed_accessor_.GlobalAddress(ra);
|
||||||
|
}
|
||||||
|
|
||||||
|
void SetOldNew(const RecordAccessor<Edge> &ra, Edge **old_record,
|
||||||
|
Edge **new_record) override {
|
||||||
|
return distributed_accessor_.SetOldNew(ra, old_record, new_record);
|
||||||
|
}
|
||||||
|
|
||||||
|
Edge *FindNew(const RecordAccessor<Edge> &ra) override {
|
||||||
|
return distributed_accessor_.FindNew(ra);
|
||||||
|
}
|
||||||
|
|
||||||
|
void ProcessDelta(const RecordAccessor<Edge> &ra,
|
||||||
|
const database::StateDelta &delta) override {
|
||||||
|
return distributed_accessor_.ProcessDelta(ra, delta);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
class DistributedVertexAccessor final : public ::VertexAccessor::Impl {
|
||||||
|
DistributedRecordAccessor<Vertex> distributed_accessor_;
|
||||||
|
|
||||||
|
public:
|
||||||
|
DistributedVertexAccessor(int worker_id,
|
||||||
|
distributed::DataManager *data_manager,
|
||||||
|
distributed::UpdatesRpcClients *updates_clients)
|
||||||
|
: distributed_accessor_(worker_id, data_manager, updates_clients) {}
|
||||||
|
|
||||||
|
typename RecordAccessor<Vertex>::AddressT GlobalAddress(
|
||||||
|
const RecordAccessor<Vertex> &ra) override {
|
||||||
|
return distributed_accessor_.GlobalAddress(ra);
|
||||||
|
}
|
||||||
|
|
||||||
|
void SetOldNew(const RecordAccessor<Vertex> &ra, Vertex **old_record,
|
||||||
|
Vertex **new_record) override {
|
||||||
|
return distributed_accessor_.SetOldNew(ra, old_record, new_record);
|
||||||
|
}
|
||||||
|
|
||||||
|
Vertex *FindNew(const RecordAccessor<Vertex> &ra) override {
|
||||||
|
return distributed_accessor_.FindNew(ra);
|
||||||
|
}
|
||||||
|
|
||||||
|
void ProcessDelta(const RecordAccessor<Vertex> &ra,
|
||||||
|
const database::StateDelta &delta) override {
|
||||||
|
return distributed_accessor_.ProcessDelta(ra, delta);
|
||||||
|
}
|
||||||
|
|
||||||
|
void AddLabel(const VertexAccessor &va,
|
||||||
|
const storage::Label &label) override {
|
||||||
|
auto &dba = va.db_accessor();
|
||||||
|
auto delta = StateDelta::AddLabel(dba.transaction_id(), va.gid(), label,
|
||||||
|
dba.LabelName(label));
|
||||||
|
Vertex &vertex = va.update();
|
||||||
|
// not a duplicate label, add it
|
||||||
|
if (!utils::Contains(vertex.labels_, label)) {
|
||||||
|
vertex.labels_.emplace_back(label);
|
||||||
|
if (va.is_local()) {
|
||||||
|
dba.wal().Emplace(delta);
|
||||||
|
dba.UpdateLabelIndices(label, va, &vertex);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!va.is_local()) distributed_accessor_.SendDelta(va, delta);
|
||||||
|
}
|
||||||
|
|
||||||
|
void RemoveLabel(const VertexAccessor &va,
|
||||||
|
const storage::Label &label) override {
|
||||||
|
auto &dba = va.db_accessor();
|
||||||
|
auto delta = StateDelta::RemoveLabel(dba.transaction_id(), va.gid(), label,
|
||||||
|
dba.LabelName(label));
|
||||||
|
Vertex &vertex = va.update();
|
||||||
|
if (utils::Contains(vertex.labels_, label)) {
|
||||||
|
auto &labels = vertex.labels_;
|
||||||
|
auto found = std::find(labels.begin(), labels.end(), delta.label);
|
||||||
|
std::swap(*found, labels.back());
|
||||||
|
labels.pop_back();
|
||||||
|
if (va.is_local()) {
|
||||||
|
dba.wal().Emplace(delta);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!va.is_local()) distributed_accessor_.SendDelta(va, delta);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
//////////////////////////////////////////////////////////////////////
|
||||||
|
// GraphDbAccessor implementations
|
||||||
|
//////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
class DistributedAccessor : public GraphDbAccessor {
|
class DistributedAccessor : public GraphDbAccessor {
|
||||||
distributed::UpdatesRpcClients *updates_clients_{nullptr};
|
distributed::UpdatesRpcClients *updates_clients_{nullptr};
|
||||||
distributed::DataManager *data_manager_{nullptr};
|
distributed::DataManager *data_manager_{nullptr};
|
||||||
|
// Shared implementations of record accessors.
|
||||||
|
DistributedVertexAccessor *vertex_accessor_;
|
||||||
|
DistributedEdgeAccessor *edge_accessor_;
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
DistributedAccessor(DistributedGraphDb *db, tx::TransactionId tx_id)
|
DistributedAccessor(DistributedGraphDb *db, tx::TransactionId tx_id,
|
||||||
|
DistributedVertexAccessor *vertex_accessor,
|
||||||
|
DistributedEdgeAccessor *edge_accessor)
|
||||||
: GraphDbAccessor(*db, tx_id),
|
: GraphDbAccessor(*db, tx_id),
|
||||||
updates_clients_(&db->updates_clients()),
|
updates_clients_(&db->updates_clients()),
|
||||||
data_manager_(&db->data_manager()) {}
|
data_manager_(&db->data_manager()),
|
||||||
|
vertex_accessor_(vertex_accessor),
|
||||||
|
edge_accessor_(edge_accessor) {}
|
||||||
|
|
||||||
explicit DistributedAccessor(DistributedGraphDb *db)
|
DistributedAccessor(DistributedGraphDb *db,
|
||||||
|
DistributedVertexAccessor *vertex_accessor,
|
||||||
|
DistributedEdgeAccessor *edge_accessor)
|
||||||
: GraphDbAccessor(*db),
|
: GraphDbAccessor(*db),
|
||||||
updates_clients_(&db->updates_clients()),
|
updates_clients_(&db->updates_clients()),
|
||||||
data_manager_(&db->data_manager()) {}
|
data_manager_(&db->data_manager()),
|
||||||
|
vertex_accessor_(vertex_accessor),
|
||||||
|
edge_accessor_(edge_accessor) {}
|
||||||
|
|
||||||
public:
|
public:
|
||||||
|
::VertexAccessor::Impl *GetVertexImpl() override { return vertex_accessor_; }
|
||||||
|
|
||||||
|
::RecordAccessor<Edge>::Impl *GetEdgeImpl() override {
|
||||||
|
return edge_accessor_;
|
||||||
|
}
|
||||||
|
|
||||||
bool RemoveVertex(VertexAccessor &vertex_accessor,
|
bool RemoveVertex(VertexAccessor &vertex_accessor,
|
||||||
bool check_empty = true) override {
|
bool check_empty = true) override {
|
||||||
if (!vertex_accessor.is_local()) {
|
if (!vertex_accessor.is_local()) {
|
||||||
@ -134,14 +341,18 @@ class MasterAccessor final : public DistributedAccessor {
|
|||||||
int worker_id_{0};
|
int worker_id_{0};
|
||||||
|
|
||||||
public:
|
public:
|
||||||
explicit MasterAccessor(Master *db,
|
MasterAccessor(Master *db, distributed::IndexRpcClients *index_rpc_clients,
|
||||||
distributed::IndexRpcClients *index_rpc_clients)
|
DistributedVertexAccessor *vertex_accessor,
|
||||||
: DistributedAccessor(db),
|
DistributedEdgeAccessor *edge_accessor)
|
||||||
|
: DistributedAccessor(db, vertex_accessor, edge_accessor),
|
||||||
index_rpc_clients_(index_rpc_clients),
|
index_rpc_clients_(index_rpc_clients),
|
||||||
worker_id_(db->WorkerId()) {}
|
worker_id_(db->WorkerId()) {}
|
||||||
|
|
||||||
MasterAccessor(Master *db, tx::TransactionId tx_id,
|
MasterAccessor(Master *db, tx::TransactionId tx_id,
|
||||||
distributed::IndexRpcClients *index_rpc_clients)
|
distributed::IndexRpcClients *index_rpc_clients,
|
||||||
: DistributedAccessor(db, tx_id),
|
DistributedVertexAccessor *vertex_accessor,
|
||||||
|
DistributedEdgeAccessor *edge_accessor)
|
||||||
|
: DistributedAccessor(db, tx_id, vertex_accessor, edge_accessor),
|
||||||
index_rpc_clients_(index_rpc_clients),
|
index_rpc_clients_(index_rpc_clients),
|
||||||
worker_id_(db->WorkerId()) {}
|
worker_id_(db->WorkerId()) {}
|
||||||
|
|
||||||
@ -198,9 +409,14 @@ class MasterAccessor final : public DistributedAccessor {
|
|||||||
|
|
||||||
class WorkerAccessor final : public DistributedAccessor {
|
class WorkerAccessor final : public DistributedAccessor {
|
||||||
public:
|
public:
|
||||||
explicit WorkerAccessor(Worker *db) : DistributedAccessor(db) {}
|
WorkerAccessor(Worker *db, DistributedVertexAccessor *vertex_accessor,
|
||||||
WorkerAccessor(Worker *db, tx::TransactionId tx_id)
|
DistributedEdgeAccessor *edge_accessor)
|
||||||
: DistributedAccessor(db, tx_id) {}
|
: DistributedAccessor(db, vertex_accessor, edge_accessor) {}
|
||||||
|
|
||||||
|
WorkerAccessor(Worker *db, tx::TransactionId tx_id,
|
||||||
|
DistributedVertexAccessor *vertex_accessor,
|
||||||
|
DistributedEdgeAccessor *edge_accessor)
|
||||||
|
: DistributedAccessor(db, tx_id, vertex_accessor, edge_accessor) {}
|
||||||
|
|
||||||
void BuildIndex(storage::Label, storage::Property) override {
|
void BuildIndex(storage::Label, storage::Property) override {
|
||||||
// TODO: Rethink BuildIndex API or inheritance. It's rather strange that a
|
// TODO: Rethink BuildIndex API or inheritance. It's rather strange that a
|
||||||
@ -211,7 +427,9 @@ class WorkerAccessor final : public DistributedAccessor {
|
|||||||
|
|
||||||
} // namespace
|
} // namespace
|
||||||
|
|
||||||
|
//////////////////////////////////////////////////////////////////////
|
||||||
// GraphDb implementations
|
// GraphDb implementations
|
||||||
|
//////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
namespace impl {
|
namespace impl {
|
||||||
|
|
||||||
@ -226,7 +444,9 @@ struct TypemapPack {
|
|||||||
TMapper<storage::Property> property;
|
TMapper<storage::Property> property;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
//////////////////////////////////////////////////////////////////////
|
||||||
// Master
|
// Master
|
||||||
|
//////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
class Master {
|
class Master {
|
||||||
public:
|
public:
|
||||||
@ -239,6 +459,11 @@ class Master {
|
|||||||
durability::WriteAheadLog wal_{config_.worker_id,
|
durability::WriteAheadLog wal_{config_.worker_id,
|
||||||
config_.durability_directory,
|
config_.durability_directory,
|
||||||
config_.durability_enabled};
|
config_.durability_enabled};
|
||||||
|
// Shared implementations for all RecordAccessor in this Db.
|
||||||
|
DistributedEdgeAccessor edge_accessor_{config_.worker_id, &data_manager_,
|
||||||
|
&updates_clients_};
|
||||||
|
DistributedVertexAccessor vertex_accessor_{config_.worker_id, &data_manager_,
|
||||||
|
&updates_clients_};
|
||||||
|
|
||||||
// TODO: Some things may depend on order of construction/destruction. We also
|
// TODO: Some things may depend on order of construction/destruction. We also
|
||||||
// have a lot of circular pointers among members. It would be a good idea to
|
// have a lot of circular pointers among members. It would be a good idea to
|
||||||
@ -265,7 +490,8 @@ class Master {
|
|||||||
distributed::DataRpcServer data_server_{self_, &server_};
|
distributed::DataRpcServer data_server_{self_, &server_};
|
||||||
distributed::DataRpcClients data_clients_{rpc_worker_clients_};
|
distributed::DataRpcClients data_clients_{rpc_worker_clients_};
|
||||||
distributed::PlanDispatcher plan_dispatcher_{rpc_worker_clients_};
|
distributed::PlanDispatcher plan_dispatcher_{rpc_worker_clients_};
|
||||||
distributed::PullRpcClients pull_clients_{&rpc_worker_clients_, &data_manager_};
|
distributed::PullRpcClients pull_clients_{&rpc_worker_clients_,
|
||||||
|
&data_manager_};
|
||||||
distributed::IndexRpcClients index_rpc_clients_{rpc_worker_clients_};
|
distributed::IndexRpcClients index_rpc_clients_{rpc_worker_clients_};
|
||||||
distributed::UpdatesRpcServer updates_server_{self_, &server_};
|
distributed::UpdatesRpcServer updates_server_{self_, &server_};
|
||||||
distributed::UpdatesRpcClients updates_clients_{rpc_worker_clients_};
|
distributed::UpdatesRpcClients updates_clients_{rpc_worker_clients_};
|
||||||
@ -382,12 +608,15 @@ Master::~Master() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
std::unique_ptr<GraphDbAccessor> Master::Access() {
|
std::unique_ptr<GraphDbAccessor> Master::Access() {
|
||||||
return std::make_unique<MasterAccessor>(this, &impl_->index_rpc_clients_);
|
return std::make_unique<MasterAccessor>(this, &impl_->index_rpc_clients_,
|
||||||
|
&impl_->vertex_accessor_,
|
||||||
|
&impl_->edge_accessor_);
|
||||||
}
|
}
|
||||||
|
|
||||||
std::unique_ptr<GraphDbAccessor> Master::Access(tx::TransactionId tx_id) {
|
std::unique_ptr<GraphDbAccessor> Master::Access(tx::TransactionId tx_id) {
|
||||||
return std::make_unique<MasterAccessor>(this, tx_id,
|
return std::make_unique<MasterAccessor>(
|
||||||
&impl_->index_rpc_clients_);
|
this, tx_id, &impl_->index_rpc_clients_, &impl_->vertex_accessor_,
|
||||||
|
&impl_->edge_accessor_);
|
||||||
}
|
}
|
||||||
|
|
||||||
Storage &Master::storage() { return *impl_->storage_; }
|
Storage &Master::storage() { return *impl_->storage_; }
|
||||||
@ -514,7 +743,9 @@ VertexAccessor InsertVertexIntoRemote(
|
|||||||
return VertexAccessor({gid, worker_id}, *dba);
|
return VertexAccessor({gid, worker_id}, *dba);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//////////////////////////////////////////////////////////////////////
|
||||||
// Worker
|
// Worker
|
||||||
|
//////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
namespace impl {
|
namespace impl {
|
||||||
|
|
||||||
@ -526,6 +757,11 @@ class Worker {
|
|||||||
durability::WriteAheadLog wal_{config_.worker_id,
|
durability::WriteAheadLog wal_{config_.worker_id,
|
||||||
config_.durability_directory,
|
config_.durability_directory,
|
||||||
config_.durability_enabled};
|
config_.durability_enabled};
|
||||||
|
// Shared implementations for all RecordAccessor in this Db.
|
||||||
|
DistributedEdgeAccessor edge_accessor_{config_.worker_id, &data_manager_,
|
||||||
|
&updates_clients_};
|
||||||
|
DistributedVertexAccessor vertex_accessor_{config_.worker_id, &data_manager_,
|
||||||
|
&updates_clients_};
|
||||||
|
|
||||||
explicit Worker(const Config &config, database::Worker *self)
|
explicit Worker(const Config &config, database::Worker *self)
|
||||||
: config_(config), self_(self) {
|
: config_(config), self_(self) {
|
||||||
@ -641,11 +877,13 @@ Worker::~Worker() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
std::unique_ptr<GraphDbAccessor> Worker::Access() {
|
std::unique_ptr<GraphDbAccessor> Worker::Access() {
|
||||||
return std::make_unique<WorkerAccessor>(this);
|
return std::make_unique<WorkerAccessor>(this, &impl_->vertex_accessor_,
|
||||||
|
&impl_->edge_accessor_);
|
||||||
}
|
}
|
||||||
|
|
||||||
std::unique_ptr<GraphDbAccessor> Worker::Access(tx::TransactionId tx_id) {
|
std::unique_ptr<GraphDbAccessor> Worker::Access(tx::TransactionId tx_id) {
|
||||||
return std::make_unique<WorkerAccessor>(this, tx_id);
|
return std::make_unique<WorkerAccessor>(this, tx_id, &impl_->vertex_accessor_,
|
||||||
|
&impl_->edge_accessor_);
|
||||||
}
|
}
|
||||||
|
|
||||||
Storage &Worker::storage() { return *impl_->storage_; }
|
Storage &Worker::storage() { return *impl_->storage_; }
|
||||||
|
@ -14,7 +14,151 @@
|
|||||||
#include "utils/file.hpp"
|
#include "utils/file.hpp"
|
||||||
|
|
||||||
namespace database {
|
namespace database {
|
||||||
namespace impl {
|
|
||||||
|
namespace {
|
||||||
|
|
||||||
|
//////////////////////////////////////////////////////////////////////
|
||||||
|
// RecordAccessor and GraphDbAccessor implementations
|
||||||
|
//////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
template <class TRecord>
|
||||||
|
class SingleNodeRecordAccessor final {
|
||||||
|
public:
|
||||||
|
typename RecordAccessor<TRecord>::AddressT GlobalAddress(
|
||||||
|
const RecordAccessor<TRecord> &record_accessor) {
|
||||||
|
// TODO: This is still coupled to distributed storage, albeit loosely.
|
||||||
|
int worker_id = 0;
|
||||||
|
CHECK(record_accessor.is_local());
|
||||||
|
return storage::Address<mvcc::VersionList<TRecord>>(record_accessor.gid(),
|
||||||
|
worker_id);
|
||||||
|
}
|
||||||
|
|
||||||
|
void SetOldNew(const RecordAccessor<TRecord> &record_accessor, TRecord **old,
|
||||||
|
TRecord **newr) {
|
||||||
|
auto &dba = record_accessor.db_accessor();
|
||||||
|
const auto &address = record_accessor.address();
|
||||||
|
CHECK(record_accessor.is_local());
|
||||||
|
address.local()->find_set_old_new(dba.transaction(), *old, *newr);
|
||||||
|
}
|
||||||
|
|
||||||
|
TRecord *FindNew(const RecordAccessor<TRecord> &record_accessor) {
|
||||||
|
const auto &address = record_accessor.address();
|
||||||
|
auto &dba = record_accessor.db_accessor();
|
||||||
|
CHECK(address.is_local());
|
||||||
|
return address.local()->update(dba.transaction());
|
||||||
|
}
|
||||||
|
|
||||||
|
void ProcessDelta(const RecordAccessor<TRecord> &record_accessor,
|
||||||
|
const database::StateDelta &delta) {
|
||||||
|
CHECK(record_accessor.is_local());
|
||||||
|
record_accessor.db_accessor().wal().Emplace(delta);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
class VertexAccessorImpl final : public ::VertexAccessor::Impl {
|
||||||
|
SingleNodeRecordAccessor<Vertex> accessor_;
|
||||||
|
|
||||||
|
public:
|
||||||
|
typename RecordAccessor<Vertex>::AddressT GlobalAddress(
|
||||||
|
const RecordAccessor<Vertex> &ra) override {
|
||||||
|
return accessor_.GlobalAddress(ra);
|
||||||
|
}
|
||||||
|
|
||||||
|
void SetOldNew(const RecordAccessor<Vertex> &ra, Vertex **old_record,
|
||||||
|
Vertex **new_record) override {
|
||||||
|
return accessor_.SetOldNew(ra, old_record, new_record);
|
||||||
|
}
|
||||||
|
|
||||||
|
Vertex *FindNew(const RecordAccessor<Vertex> &ra) override {
|
||||||
|
return accessor_.FindNew(ra);
|
||||||
|
}
|
||||||
|
|
||||||
|
void ProcessDelta(const RecordAccessor<Vertex> &ra,
|
||||||
|
const database::StateDelta &delta) override {
|
||||||
|
return accessor_.ProcessDelta(ra, delta);
|
||||||
|
}
|
||||||
|
|
||||||
|
void AddLabel(const VertexAccessor &va,
|
||||||
|
const storage::Label &label) override {
|
||||||
|
CHECK(va.is_local());
|
||||||
|
auto &dba = va.db_accessor();
|
||||||
|
auto delta = StateDelta::AddLabel(dba.transaction_id(), va.gid(), label,
|
||||||
|
dba.LabelName(label));
|
||||||
|
Vertex &vertex = va.update();
|
||||||
|
// not a duplicate label, add it
|
||||||
|
if (!utils::Contains(vertex.labels_, label)) {
|
||||||
|
vertex.labels_.emplace_back(label);
|
||||||
|
dba.wal().Emplace(delta);
|
||||||
|
dba.UpdateLabelIndices(label, va, &vertex);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void RemoveLabel(const VertexAccessor &va,
|
||||||
|
const storage::Label &label) override {
|
||||||
|
CHECK(va.is_local());
|
||||||
|
auto &dba = va.db_accessor();
|
||||||
|
auto delta = StateDelta::RemoveLabel(dba.transaction_id(), va.gid(), label,
|
||||||
|
dba.LabelName(label));
|
||||||
|
Vertex &vertex = va.update();
|
||||||
|
if (utils::Contains(vertex.labels_, label)) {
|
||||||
|
auto &labels = vertex.labels_;
|
||||||
|
auto found = std::find(labels.begin(), labels.end(), delta.label);
|
||||||
|
std::swap(*found, labels.back());
|
||||||
|
labels.pop_back();
|
||||||
|
dba.wal().Emplace(delta);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
class EdgeAccessorImpl final : public ::RecordAccessor<Edge>::Impl {
|
||||||
|
SingleNodeRecordAccessor<Edge> accessor_;
|
||||||
|
|
||||||
|
public:
|
||||||
|
typename RecordAccessor<Edge>::AddressT GlobalAddress(
|
||||||
|
const RecordAccessor<Edge> &ra) override {
|
||||||
|
return accessor_.GlobalAddress(ra);
|
||||||
|
}
|
||||||
|
|
||||||
|
void SetOldNew(const RecordAccessor<Edge> &ra, Edge **old_record,
|
||||||
|
Edge **new_record) override {
|
||||||
|
return accessor_.SetOldNew(ra, old_record, new_record);
|
||||||
|
}
|
||||||
|
|
||||||
|
Edge *FindNew(const RecordAccessor<Edge> &ra) override {
|
||||||
|
return accessor_.FindNew(ra);
|
||||||
|
}
|
||||||
|
|
||||||
|
void ProcessDelta(const RecordAccessor<Edge> &ra,
|
||||||
|
const database::StateDelta &delta) override {
|
||||||
|
return accessor_.ProcessDelta(ra, delta);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
class SingleNodeAccessor : public GraphDbAccessor {
|
||||||
|
// Shared implementations of record accessors.
|
||||||
|
static VertexAccessorImpl vertex_accessor_;
|
||||||
|
static EdgeAccessorImpl edge_accessor_;
|
||||||
|
|
||||||
|
public:
|
||||||
|
explicit SingleNodeAccessor(GraphDb &db) : GraphDbAccessor(db) {}
|
||||||
|
SingleNodeAccessor(GraphDb &db, tx::TransactionId tx_id)
|
||||||
|
: GraphDbAccessor(db, tx_id) {}
|
||||||
|
|
||||||
|
::VertexAccessor::Impl *GetVertexImpl() override { return &vertex_accessor_; }
|
||||||
|
|
||||||
|
::RecordAccessor<Edge>::Impl *GetEdgeImpl() override {
|
||||||
|
return &edge_accessor_;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
VertexAccessorImpl SingleNodeAccessor::vertex_accessor_;
|
||||||
|
EdgeAccessorImpl SingleNodeAccessor::edge_accessor_;
|
||||||
|
|
||||||
|
} // namespace
|
||||||
|
|
||||||
|
//////////////////////////////////////////////////////////////////////
|
||||||
|
// SingleNode GraphDb implementation
|
||||||
|
//////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
template <template <typename TId> class TMapper>
|
template <template <typename TId> class TMapper>
|
||||||
struct TypemapPack {
|
struct TypemapPack {
|
||||||
@ -27,6 +171,8 @@ struct TypemapPack {
|
|||||||
TMapper<storage::Property> property;
|
TMapper<storage::Property> property;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
namespace impl {
|
||||||
|
|
||||||
class SingleNode {
|
class SingleNode {
|
||||||
public:
|
public:
|
||||||
explicit SingleNode(const Config &config) : config_(config) {}
|
explicit SingleNode(const Config &config) : config_(config) {}
|
||||||
@ -119,13 +265,6 @@ SingleNode::~SingleNode() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
class SingleNodeAccessor : public GraphDbAccessor {
|
|
||||||
public:
|
|
||||||
explicit SingleNodeAccessor(GraphDb &db) : GraphDbAccessor(db) {}
|
|
||||||
SingleNodeAccessor(GraphDb &db, tx::TransactionId tx_id)
|
|
||||||
: GraphDbAccessor(db, tx_id) {}
|
|
||||||
};
|
|
||||||
|
|
||||||
std::unique_ptr<GraphDbAccessor> SingleNode::Access() {
|
std::unique_ptr<GraphDbAccessor> SingleNode::Access() {
|
||||||
// NOTE: We are doing a heap allocation to allow polymorphism. If this poses
|
// NOTE: We are doing a heap allocation to allow polymorphism. If this poses
|
||||||
// performance issues, we may want to have a stack allocated GraphDbAccessor
|
// performance issues, we may want to have a stack allocated GraphDbAccessor
|
||||||
|
@ -39,6 +39,8 @@ class IndexCreationOnWorkerException : public utils::BasicException {
|
|||||||
class GraphDbAccessor {
|
class GraphDbAccessor {
|
||||||
// We need to make friends with this guys since they need to access private
|
// We need to make friends with this guys since they need to access private
|
||||||
// methods for updating indices.
|
// methods for updating indices.
|
||||||
|
// TODO: Rethink this, we have too much long-distance friendship complicating
|
||||||
|
// the code.
|
||||||
friend class ::RecordAccessor<Vertex>;
|
friend class ::RecordAccessor<Vertex>;
|
||||||
friend class ::VertexAccessor;
|
friend class ::VertexAccessor;
|
||||||
|
|
||||||
@ -59,6 +61,9 @@ class GraphDbAccessor {
|
|||||||
GraphDbAccessor &operator=(const GraphDbAccessor &other) = delete;
|
GraphDbAccessor &operator=(const GraphDbAccessor &other) = delete;
|
||||||
GraphDbAccessor &operator=(GraphDbAccessor &&other) = delete;
|
GraphDbAccessor &operator=(GraphDbAccessor &&other) = delete;
|
||||||
|
|
||||||
|
virtual ::VertexAccessor::Impl *GetVertexImpl() = 0;
|
||||||
|
virtual ::RecordAccessor<Edge>::Impl *GetEdgeImpl() = 0;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates a new Vertex and returns an accessor to it. If the ID is
|
* Creates a new Vertex and returns an accessor to it. If the ID is
|
||||||
* provided, the created Vertex will have that local ID, and the ID counter
|
* provided, the created Vertex will have that local ID, and the ID counter
|
||||||
@ -600,6 +605,18 @@ class GraphDbAccessor {
|
|||||||
/* Returns a list of index names present in the database. */
|
/* Returns a list of index names present in the database. */
|
||||||
std::vector<std::string> IndexInfo() const;
|
std::vector<std::string> IndexInfo() const;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Insert this vertex into corresponding label and label+property (if it
|
||||||
|
* exists) index.
|
||||||
|
*
|
||||||
|
* @param label - label with which to insert vertex label record
|
||||||
|
* @param vertex_accessor - vertex_accessor to insert
|
||||||
|
* @param vertex - vertex record to insert
|
||||||
|
*/
|
||||||
|
void UpdateLabelIndices(storage::Label label,
|
||||||
|
const VertexAccessor &vertex_accessor,
|
||||||
|
const Vertex *const vertex);
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
/** Called in `BuildIndex` after creating an index, but before populating. */
|
/** Called in `BuildIndex` after creating an index, but before populating. */
|
||||||
virtual void PostCreateIndex(const LabelPropertyIndex::Key &key) {}
|
virtual void PostCreateIndex(const LabelPropertyIndex::Key &key) {}
|
||||||
@ -638,18 +655,6 @@ class GraphDbAccessor {
|
|||||||
bool commited_{false};
|
bool commited_{false};
|
||||||
bool aborted_{false};
|
bool aborted_{false};
|
||||||
|
|
||||||
/**
|
|
||||||
* Insert this vertex into corresponding label and label+property (if it
|
|
||||||
* exists) index.
|
|
||||||
*
|
|
||||||
* @param label - label with which to insert vertex label record
|
|
||||||
* @param vertex_accessor - vertex_accessor to insert
|
|
||||||
* @param vertex - vertex record to insert
|
|
||||||
*/
|
|
||||||
void UpdateLabelIndices(storage::Label label,
|
|
||||||
const VertexAccessor &vertex_accessor,
|
|
||||||
const Vertex *const vertex);
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Insert this vertex into corresponding any label + 'property' index.
|
* Insert this vertex into corresponding any label + 'property' index.
|
||||||
* @param property - vertex will be inserted into indexes which contain this
|
* @param property - vertex will be inserted into indexes which contain this
|
||||||
|
@ -4,6 +4,29 @@
|
|||||||
#include "storage/vertex_accessor.hpp"
|
#include "storage/vertex_accessor.hpp"
|
||||||
#include "utils/algorithm.hpp"
|
#include "utils/algorithm.hpp"
|
||||||
|
|
||||||
|
EdgeAccessor::EdgeAccessor(EdgeAddress address,
|
||||||
|
database::GraphDbAccessor &db_accessor)
|
||||||
|
: RecordAccessor(address, db_accessor, db_accessor.GetEdgeImpl()),
|
||||||
|
from_(nullptr),
|
||||||
|
to_(nullptr),
|
||||||
|
edge_type_() {
|
||||||
|
RecordAccessor::Reconstruct();
|
||||||
|
if (current_ != nullptr) {
|
||||||
|
from_ = current_->from_;
|
||||||
|
to_ = current_->to_;
|
||||||
|
edge_type_ = current_->edge_type_;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
EdgeAccessor::EdgeAccessor(EdgeAddress address,
|
||||||
|
database::GraphDbAccessor &db_accessor,
|
||||||
|
VertexAddress from, VertexAddress to,
|
||||||
|
storage::EdgeType edge_type)
|
||||||
|
: RecordAccessor(address, db_accessor, db_accessor.GetEdgeImpl()),
|
||||||
|
from_(from),
|
||||||
|
to_(to),
|
||||||
|
edge_type_(edge_type) {}
|
||||||
|
|
||||||
storage::EdgeType EdgeAccessor::EdgeType() const { return edge_type_; }
|
storage::EdgeType EdgeAccessor::EdgeType() const { return edge_type_; }
|
||||||
|
|
||||||
VertexAccessor EdgeAccessor::from() const {
|
VertexAccessor EdgeAccessor::from() const {
|
||||||
|
@ -19,35 +19,22 @@ class VertexAccessor;
|
|||||||
* EdgeAccessor means that data does not have to be read from a random memory
|
* EdgeAccessor means that data does not have to be read from a random memory
|
||||||
* location, which is often a performance bottleneck in traversals.
|
* location, which is often a performance bottleneck in traversals.
|
||||||
*/
|
*/
|
||||||
class EdgeAccessor : public RecordAccessor<Edge> {
|
class EdgeAccessor final : public RecordAccessor<Edge> {
|
||||||
using EdgeAddress = storage::EdgeAddress;
|
using EdgeAddress = storage::EdgeAddress;
|
||||||
using VertexAddress = storage::VertexAddress;
|
using VertexAddress = storage::VertexAddress;
|
||||||
|
|
||||||
public:
|
public:
|
||||||
/** Constructor that reads data from the random memory location (lower
|
/** Constructor that reads data from the random memory location (lower
|
||||||
* performance, see class docs). */
|
* performance, see class docs). */
|
||||||
EdgeAccessor(EdgeAddress address, database::GraphDbAccessor &db_accessor)
|
EdgeAccessor(EdgeAddress address, database::GraphDbAccessor &db_accessor);
|
||||||
: RecordAccessor(address, db_accessor),
|
|
||||||
from_(nullptr),
|
|
||||||
to_(nullptr),
|
|
||||||
edge_type_() {
|
|
||||||
RecordAccessor::Reconstruct();
|
|
||||||
if (current_ != nullptr) {
|
|
||||||
from_ = current_->from_;
|
|
||||||
to_ = current_->to_;
|
|
||||||
edge_type_ = current_->edge_type_;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Constructor that does NOT data from the random memory location (better
|
/**
|
||||||
* performance, see class docs). */
|
* Constructor that does NOT read data from the random memory location
|
||||||
|
* (better performance, see class docs).
|
||||||
|
*/
|
||||||
EdgeAccessor(EdgeAddress address, database::GraphDbAccessor &db_accessor,
|
EdgeAccessor(EdgeAddress address, database::GraphDbAccessor &db_accessor,
|
||||||
VertexAddress from, VertexAddress to,
|
VertexAddress from, VertexAddress to,
|
||||||
storage::EdgeType edge_type)
|
storage::EdgeType edge_type);
|
||||||
: RecordAccessor(address, db_accessor),
|
|
||||||
from_(from),
|
|
||||||
to_(to),
|
|
||||||
edge_type_(edge_type) {}
|
|
||||||
|
|
||||||
storage::EdgeType EdgeType() const;
|
storage::EdgeType EdgeType() const;
|
||||||
|
|
||||||
|
@ -1,22 +1,20 @@
|
|||||||
#include "glog/logging.h"
|
#include "storage/record_accessor.hpp"
|
||||||
|
|
||||||
|
#include <glog/logging.h>
|
||||||
|
|
||||||
#include "database/distributed_graph_db.hpp"
|
|
||||||
#include "database/graph_db_accessor.hpp"
|
#include "database/graph_db_accessor.hpp"
|
||||||
#include "database/state_delta.hpp"
|
#include "database/state_delta.hpp"
|
||||||
#include "distributed/data_manager.hpp"
|
|
||||||
#include "distributed/updates_rpc_clients.hpp"
|
|
||||||
#include "query/exceptions.hpp"
|
|
||||||
#include "storage/edge.hpp"
|
#include "storage/edge.hpp"
|
||||||
#include "storage/record_accessor.hpp"
|
|
||||||
#include "storage/vertex.hpp"
|
#include "storage/vertex.hpp"
|
||||||
#include "utils/thread/sync.hpp"
|
|
||||||
|
|
||||||
using database::StateDelta;
|
using database::StateDelta;
|
||||||
|
|
||||||
template <typename TRecord>
|
template <typename TRecord>
|
||||||
RecordAccessor<TRecord>::RecordAccessor(AddressT address,
|
RecordAccessor<TRecord>::RecordAccessor(AddressT address,
|
||||||
database::GraphDbAccessor &db_accessor)
|
database::GraphDbAccessor &db_accessor,
|
||||||
: db_accessor_(&db_accessor),
|
Impl *impl)
|
||||||
|
: impl_(impl),
|
||||||
|
db_accessor_(&db_accessor),
|
||||||
address_(db_accessor.db().storage().LocalizedAddressIfPossible(address)) {
|
address_(db_accessor.db().storage().LocalizedAddressIfPossible(address)) {
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -109,17 +107,7 @@ typename RecordAccessor<TRecord>::AddressT RecordAccessor<TRecord>::address()
|
|||||||
template <typename TRecord>
|
template <typename TRecord>
|
||||||
typename RecordAccessor<TRecord>::AddressT
|
typename RecordAccessor<TRecord>::AddressT
|
||||||
RecordAccessor<TRecord>::GlobalAddress() const {
|
RecordAccessor<TRecord>::GlobalAddress() const {
|
||||||
// TODO: Replace this with some other mechanism, i.e. virtual call.
|
return impl_->GlobalAddress(*this);
|
||||||
int worker_id = 0;
|
|
||||||
if (auto *distributed_db =
|
|
||||||
dynamic_cast<database::DistributedGraphDb *>(&db_accessor_->db())) {
|
|
||||||
worker_id = distributed_db->WorkerId();
|
|
||||||
} else {
|
|
||||||
CHECK(dynamic_cast<database::SingleNode *>(&db_accessor_->db()));
|
|
||||||
}
|
|
||||||
return is_local()
|
|
||||||
? storage::Address<mvcc::VersionList<TRecord>>(gid(), worker_id)
|
|
||||||
: address_;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
template <typename TRecord>
|
template <typename TRecord>
|
||||||
@ -150,27 +138,7 @@ RecordAccessor<TRecord> &RecordAccessor<TRecord>::SwitchOld() {
|
|||||||
|
|
||||||
template <typename TRecord>
|
template <typename TRecord>
|
||||||
bool RecordAccessor<TRecord>::Reconstruct() const {
|
bool RecordAccessor<TRecord>::Reconstruct() const {
|
||||||
auto &dba = db_accessor();
|
impl_->SetOldNew(*this, &old_, &new_);
|
||||||
if (is_local()) {
|
|
||||||
address_.local()->find_set_old_new(dba.transaction(), old_, new_);
|
|
||||||
} else {
|
|
||||||
// It's not possible that we have a global address for a graph element
|
|
||||||
// that's local, because that is resolved in the constructor.
|
|
||||||
// TODO in write queries it's possible the command has been advanced and
|
|
||||||
// we need to invalidate the Cache and really get the latest stuff.
|
|
||||||
// But only do that after the command has been advanced.
|
|
||||||
distributed::DataManager *data_manager = nullptr;
|
|
||||||
// TODO: Replace this with virtual call or some other mechanism.
|
|
||||||
if (auto *distributed_db =
|
|
||||||
dynamic_cast<database::DistributedGraphDb *>(&dba.db())) {
|
|
||||||
data_manager = &distributed_db->data_manager();
|
|
||||||
}
|
|
||||||
CHECK(data_manager);
|
|
||||||
auto &cache =
|
|
||||||
data_manager->template Elements<TRecord>(dba.transaction_id());
|
|
||||||
cache.FindSetOldNew(dba.transaction().id_, address_.worker_id(),
|
|
||||||
address_.gid(), old_, new_);
|
|
||||||
}
|
|
||||||
current_ = old_ ? old_ : new_;
|
current_ = old_ ? old_ : new_;
|
||||||
return old_ != nullptr || new_ != nullptr;
|
return old_ != nullptr || new_ != nullptr;
|
||||||
}
|
}
|
||||||
@ -192,20 +160,7 @@ TRecord &RecordAccessor<TRecord>::update() const {
|
|||||||
|
|
||||||
if (new_) return *new_;
|
if (new_) return *new_;
|
||||||
|
|
||||||
if (is_local()) {
|
new_ = impl_->FindNew(*this);
|
||||||
new_ = address_.local()->update(t);
|
|
||||||
} else {
|
|
||||||
distributed::DataManager *data_manager = nullptr;
|
|
||||||
// TODO: Replace this with virtual call or some other mechanism.
|
|
||||||
if (auto *distributed_db =
|
|
||||||
dynamic_cast<database::DistributedGraphDb *>(&dba.db())) {
|
|
||||||
data_manager = &distributed_db->data_manager();
|
|
||||||
}
|
|
||||||
CHECK(data_manager);
|
|
||||||
auto &cache =
|
|
||||||
data_manager->template Elements<TRecord>(dba.transaction_id());
|
|
||||||
new_ = cache.FindNew(address_.gid());
|
|
||||||
}
|
|
||||||
DCHECK(new_ != nullptr) << "RecordAccessor.new_ is null after update";
|
DCHECK(new_ != nullptr) << "RecordAccessor.new_ is null after update";
|
||||||
return *new_;
|
return *new_;
|
||||||
}
|
}
|
||||||
@ -219,43 +174,10 @@ const TRecord &RecordAccessor<TRecord>::current() const {
|
|||||||
return *current_;
|
return *current_;
|
||||||
}
|
}
|
||||||
|
|
||||||
template <typename TRecord>
|
|
||||||
void RecordAccessor<TRecord>::SendDelta(
|
|
||||||
const database::StateDelta &delta) const {
|
|
||||||
DCHECK(!is_local())
|
|
||||||
<< "Only a delta created on a remote accessor should be sent";
|
|
||||||
|
|
||||||
auto &dba = db_accessor();
|
|
||||||
distributed::UpdatesRpcClients *updates_clients = nullptr;
|
|
||||||
// TODO: Replace this with virtual call or some other mechanism.
|
|
||||||
if (auto *distributed_db =
|
|
||||||
dynamic_cast<database::DistributedGraphDb *>(&dba.db())) {
|
|
||||||
updates_clients = &distributed_db->updates_clients();
|
|
||||||
}
|
|
||||||
CHECK(updates_clients);
|
|
||||||
auto result = updates_clients->Update(address().worker_id(), delta);
|
|
||||||
switch (result) {
|
|
||||||
case distributed::UpdateResult::DONE:
|
|
||||||
break;
|
|
||||||
case distributed::UpdateResult::UNABLE_TO_DELETE_VERTEX_ERROR:
|
|
||||||
throw query::RemoveAttachedVertexException();
|
|
||||||
case distributed::UpdateResult::SERIALIZATION_ERROR:
|
|
||||||
throw mvcc::SerializationError();
|
|
||||||
case distributed::UpdateResult::UPDATE_DELETED_ERROR:
|
|
||||||
throw RecordDeletedError();
|
|
||||||
case distributed::UpdateResult::LOCK_TIMEOUT_ERROR:
|
|
||||||
throw utils::LockTimeoutException("Lock timeout on remote worker");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
template <typename TRecord>
|
template <typename TRecord>
|
||||||
void RecordAccessor<TRecord>::ProcessDelta(
|
void RecordAccessor<TRecord>::ProcessDelta(
|
||||||
const database::StateDelta &delta) const {
|
const database::StateDelta &delta) const {
|
||||||
if (is_local()) {
|
impl_->ProcessDelta(*this, delta);
|
||||||
db_accessor().wal().Emplace(delta);
|
|
||||||
} else {
|
|
||||||
SendDelta(delta);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
template class RecordAccessor<Vertex>;
|
template class RecordAccessor<Vertex>;
|
||||||
|
@ -1,3 +1,4 @@
|
|||||||
|
/** @file */
|
||||||
#pragma once
|
#pragma once
|
||||||
|
|
||||||
#include "glog/logging.h"
|
#include "glog/logging.h"
|
||||||
@ -26,27 +27,33 @@ struct StateDelta;
|
|||||||
*/
|
*/
|
||||||
template <typename TRecord>
|
template <typename TRecord>
|
||||||
class RecordAccessor : public utils::TotalOrdering<RecordAccessor<TRecord>> {
|
class RecordAccessor : public utils::TotalOrdering<RecordAccessor<TRecord>> {
|
||||||
protected:
|
public:
|
||||||
using AddressT = storage::Address<mvcc::VersionList<TRecord>>;
|
using AddressT = storage::Address<mvcc::VersionList<TRecord>>;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The database::GraphDbAccessor is friend to this accessor so it can
|
* Interface for the underlying implementation of the record accessor.
|
||||||
* operate on it's data (mvcc version-list and the record itself).
|
* The RecordAccessor only borrows the pointer to the implementation, it does
|
||||||
* This is legitimate because database::GraphDbAccessor creates
|
* *not* own it. When a RecordAccessor is copied, so is the pointer but *not*
|
||||||
* RecordAccessors
|
* the implementation itself. This means that concrete Impl types need to be
|
||||||
* and is semantically their parent/owner. It is necessary because
|
* shareable among different accessors. To achieve that, it's best for derived
|
||||||
* the database::GraphDbAccessor handles insertions and deletions, and these
|
* Impl types to contain *no state*. The reason we are using this approach is
|
||||||
* operations modify data intensively.
|
* to prevent large amounts of allocations, because RecordAccessor are often
|
||||||
|
* created and copied.
|
||||||
*/
|
*/
|
||||||
friend database::GraphDbAccessor;
|
class Impl {
|
||||||
|
public:
|
||||||
|
virtual ~Impl() {}
|
||||||
|
|
||||||
public:
|
virtual AddressT GlobalAddress(const RecordAccessor<TRecord> &ra) = 0;
|
||||||
/**
|
/** Set the pointers for old and new records during `Reconstruct`. */
|
||||||
* @param address Address (local or global) of the Vertex/Edge of this
|
virtual void SetOldNew(const RecordAccessor<TRecord> &ra,
|
||||||
* accessor.
|
TRecord **old_record, TRecord **new_record) = 0;
|
||||||
* @param db_accessor The DB accessor that "owns" this record accessor.
|
/** Find the pointer to the new, updated record. */
|
||||||
*/
|
virtual TRecord *FindNew(const RecordAccessor<TRecord> &ra) = 0;
|
||||||
RecordAccessor(AddressT address, database::GraphDbAccessor &db_accessor);
|
/** Process a change delta, e.g. by writing WAL. */
|
||||||
|
virtual void ProcessDelta(const RecordAccessor<TRecord> &ra,
|
||||||
|
const database::StateDelta &delta) = 0;
|
||||||
|
};
|
||||||
|
|
||||||
// this class is default copyable, movable and assignable
|
// this class is default copyable, movable and assignable
|
||||||
RecordAccessor(const RecordAccessor &other) = default;
|
RecordAccessor(const RecordAccessor &other) = default;
|
||||||
@ -54,6 +61,25 @@ class RecordAccessor : public utils::TotalOrdering<RecordAccessor<TRecord>> {
|
|||||||
RecordAccessor &operator=(const RecordAccessor &other) = default;
|
RecordAccessor &operator=(const RecordAccessor &other) = default;
|
||||||
RecordAccessor &operator=(RecordAccessor &&other) = default;
|
RecordAccessor &operator=(RecordAccessor &&other) = default;
|
||||||
|
|
||||||
|
protected:
|
||||||
|
/**
|
||||||
|
* Protected destructor because we allow inheritance, but nobody should own a
|
||||||
|
* pointer to plain RecordAccessor.
|
||||||
|
*/
|
||||||
|
~RecordAccessor() = default;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Only derived types may allow construction.
|
||||||
|
*
|
||||||
|
* @param address Address (local or global) of the Vertex/Edge of this
|
||||||
|
* accessor.
|
||||||
|
* @param db_accessor The DB accessor that "owns" this record accessor.
|
||||||
|
* @param impl Borrowed pointer to the underlying implementation.
|
||||||
|
*/
|
||||||
|
RecordAccessor(AddressT address, database::GraphDbAccessor &db_accessor,
|
||||||
|
Impl *impl);
|
||||||
|
|
||||||
|
public:
|
||||||
/** Gets the property for the given key. */
|
/** Gets the property for the given key. */
|
||||||
PropertyValue PropsAt(storage::Property key) const;
|
PropertyValue PropsAt(storage::Property key) const;
|
||||||
|
|
||||||
@ -83,7 +109,6 @@ class RecordAccessor : public utils::TotalOrdering<RecordAccessor<TRecord>> {
|
|||||||
|
|
||||||
AddressT address() const;
|
AddressT address() const;
|
||||||
|
|
||||||
// Returns an address which is global - composed of gid and worker_id
|
|
||||||
AddressT GlobalAddress() const;
|
AddressT GlobalAddress() const;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
@ -146,6 +171,7 @@ class RecordAccessor : public utils::TotalOrdering<RecordAccessor<TRecord>> {
|
|||||||
(current_state && new_ && !new_->is_expired_by(t));
|
(current_state && new_ && !new_->is_expired_by(t));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO: This shouldn't be here, because it's only relevant in distributed.
|
||||||
/** Indicates if this accessor represents a local Vertex/Edge, or one whose
|
/** Indicates if this accessor represents a local Vertex/Edge, or one whose
|
||||||
* owner is some other worker in a distributed system. */
|
* owner is some other worker in a distributed system. */
|
||||||
bool is_local() const { return address_.is_local(); }
|
bool is_local() const { return address_.is_local(); }
|
||||||
@ -159,13 +185,17 @@ class RecordAccessor : public utils::TotalOrdering<RecordAccessor<TRecord>> {
|
|||||||
|
|
||||||
protected:
|
protected:
|
||||||
/**
|
/**
|
||||||
* Sends delta for remote processing.
|
* The database::GraphDbAccessor is friend to this accessor so it can
|
||||||
|
* operate on it's data (mvcc version-list and the record itself).
|
||||||
|
* This is legitimate because database::GraphDbAccessor creates
|
||||||
|
* RecordAccessors
|
||||||
|
* and is semantically their parent/owner. It is necessary because
|
||||||
|
* the database::GraphDbAccessor handles insertions and deletions, and these
|
||||||
|
* operations modify data intensively.
|
||||||
*/
|
*/
|
||||||
void SendDelta(const database::StateDelta &delta) const;
|
friend database::GraphDbAccessor;
|
||||||
|
|
||||||
/**
|
/** Process a change delta, e.g. by writing WAL. */
|
||||||
* Processes delta by either adding it to WAL, or by sending it remotely.
|
|
||||||
*/
|
|
||||||
void ProcessDelta(const database::StateDelta &delta) const;
|
void ProcessDelta(const database::StateDelta &delta) const;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -183,6 +213,7 @@ class RecordAccessor : public utils::TotalOrdering<RecordAccessor<TRecord>> {
|
|||||||
const TRecord ¤t() const;
|
const TRecord ¤t() const;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
Impl *impl_;
|
||||||
// The database accessor for which this record accessor is created
|
// The database accessor for which this record accessor is created
|
||||||
// Provides means of getting to the transaction and database functions.
|
// Provides means of getting to the transaction and database functions.
|
||||||
// Immutable, set in the constructor and never changed.
|
// Immutable, set in the constructor and never changed.
|
||||||
|
@ -6,43 +6,23 @@
|
|||||||
#include "database/state_delta.hpp"
|
#include "database/state_delta.hpp"
|
||||||
#include "utils/algorithm.hpp"
|
#include "utils/algorithm.hpp"
|
||||||
|
|
||||||
|
VertexAccessor::VertexAccessor(VertexAddress address,
|
||||||
|
database::GraphDbAccessor &db_accessor)
|
||||||
|
: RecordAccessor(address, db_accessor, db_accessor.GetVertexImpl()),
|
||||||
|
impl_(db_accessor.GetVertexImpl()) {
|
||||||
|
Reconstruct();
|
||||||
|
}
|
||||||
|
|
||||||
size_t VertexAccessor::out_degree() const { return current().out_.size(); }
|
size_t VertexAccessor::out_degree() const { return current().out_.size(); }
|
||||||
|
|
||||||
size_t VertexAccessor::in_degree() const { return current().in_.size(); }
|
size_t VertexAccessor::in_degree() const { return current().in_.size(); }
|
||||||
|
|
||||||
void VertexAccessor::add_label(storage::Label label) {
|
void VertexAccessor::add_label(storage::Label label) {
|
||||||
auto &dba = db_accessor();
|
return impl_->AddLabel(*this, label);
|
||||||
auto delta = database::StateDelta::AddLabel(dba.transaction_id(), gid(),
|
|
||||||
label, dba.LabelName(label));
|
|
||||||
Vertex &vertex = update();
|
|
||||||
// not a duplicate label, add it
|
|
||||||
if (!utils::Contains(vertex.labels_, label)) {
|
|
||||||
vertex.labels_.emplace_back(label);
|
|
||||||
if (is_local()) {
|
|
||||||
dba.wal().Emplace(delta);
|
|
||||||
dba.UpdateLabelIndices(label, *this, &vertex);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!is_local()) SendDelta(delta);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void VertexAccessor::remove_label(storage::Label label) {
|
void VertexAccessor::remove_label(storage::Label label) {
|
||||||
auto &dba = db_accessor();
|
return impl_->RemoveLabel(*this, label);
|
||||||
auto delta = database::StateDelta::RemoveLabel(dba.transaction_id(), gid(),
|
|
||||||
label, dba.LabelName(label));
|
|
||||||
Vertex &vertex = update();
|
|
||||||
if (utils::Contains(vertex.labels_, label)) {
|
|
||||||
auto &labels = vertex.labels_;
|
|
||||||
auto found = std::find(labels.begin(), labels.end(), delta.label);
|
|
||||||
std::swap(*found, labels.back());
|
|
||||||
labels.pop_back();
|
|
||||||
if (is_local()) {
|
|
||||||
dba.wal().Emplace(delta);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!is_local()) SendDelta(delta);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
bool VertexAccessor::has_label(storage::Label label) const {
|
bool VertexAccessor::has_label(storage::Label label) const {
|
||||||
|
@ -19,7 +19,7 @@
|
|||||||
* This class indirectly inherits MVCC data structures and
|
* This class indirectly inherits MVCC data structures and
|
||||||
* takes care of MVCC versioning.
|
* takes care of MVCC versioning.
|
||||||
*/
|
*/
|
||||||
class VertexAccessor : public RecordAccessor<Vertex> {
|
class VertexAccessor final : public RecordAccessor<Vertex> {
|
||||||
using VertexAddress = storage::Address<mvcc::VersionList<Vertex>>;
|
using VertexAddress = storage::Address<mvcc::VersionList<Vertex>>;
|
||||||
// Helper function for creating an iterator over edges.
|
// Helper function for creating an iterator over edges.
|
||||||
// @param begin - begin iterator
|
// @param begin - begin iterator
|
||||||
@ -49,10 +49,16 @@ class VertexAccessor : public RecordAccessor<Vertex> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public:
|
public:
|
||||||
VertexAccessor(VertexAddress address, database::GraphDbAccessor &db_accessor)
|
/** Like RecordAccessor::Impl with addition of Vertex specific methods. */
|
||||||
: RecordAccessor(address, db_accessor) {
|
class Impl : public RecordAccessor<Vertex>::Impl {
|
||||||
Reconstruct();
|
public:
|
||||||
}
|
virtual void AddLabel(const VertexAccessor &va,
|
||||||
|
const storage::Label &label) = 0;
|
||||||
|
virtual void RemoveLabel(const VertexAccessor &va,
|
||||||
|
const storage::Label &label) = 0;
|
||||||
|
};
|
||||||
|
|
||||||
|
VertexAccessor(VertexAddress address, database::GraphDbAccessor &db_accessor);
|
||||||
|
|
||||||
/** Returns the number of outgoing edges. */
|
/** Returns the number of outgoing edges. */
|
||||||
size_t out_degree() const;
|
size_t out_degree() const;
|
||||||
@ -147,6 +153,9 @@ class VertexAccessor : public RecordAccessor<Vertex> {
|
|||||||
* this operation should always be accompanied by the removal of the edge from
|
* this operation should always be accompanied by the removal of the edge from
|
||||||
* the outgoing edges on the other side and edge deletion. */
|
* the outgoing edges on the other side and edge deletion. */
|
||||||
void RemoveInEdge(storage::EdgeAddress edge);
|
void RemoveInEdge(storage::EdgeAddress edge);
|
||||||
|
|
||||||
|
private:
|
||||||
|
Impl *impl_{nullptr};
|
||||||
};
|
};
|
||||||
|
|
||||||
std::ostream &operator<<(std::ostream &, const VertexAccessor &);
|
std::ostream &operator<<(std::ostream &, const VertexAccessor &);
|
||||||
|
Loading…
Reference in New Issue
Block a user