Remove DistributedAccessor

Reviewers: teon.banek, msantl, ipaljak

Reviewed By: msantl

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1867
This commit is contained in:
Vinko Kasljevic 2019-02-14 17:51:40 +01:00
parent 6f10b1c115
commit 10d4171348
3 changed files with 121 additions and 156 deletions

View File

@ -40,105 +40,7 @@ namespace {
//////////////////////////////////////////////////////////////////////
// GraphDbAccessor implementations
//////////////////////////////////////////////////////////////////////
class DistributedAccessor : public GraphDbAccessor {
distributed::UpdatesRpcClients *updates_clients_{nullptr};
distributed::DataManager *data_manager_{nullptr};
protected:
DistributedAccessor(GraphDb *db, tx::TransactionId tx_id)
: GraphDbAccessor(*db, tx_id),
updates_clients_(&db->updates_clients()),
data_manager_(&db->data_manager()) {}
explicit DistributedAccessor(GraphDb *db)
: GraphDbAccessor(*db),
updates_clients_(&db->updates_clients()),
data_manager_(&db->data_manager()) {}
public:
bool RemoveVertex(VertexAccessor &vertex_accessor,
bool check_empty = true) override {
if (!vertex_accessor.is_local()) {
auto address = vertex_accessor.address();
updates_clients_->RemoveVertex(address.worker_id(), transaction_id(),
address.gid(), check_empty);
// We can't know if we are going to be able to remove vertex until
// deferred updates on a remote worker are executed
return true;
}
return GraphDbAccessor::RemoveVertex(vertex_accessor, check_empty);
}
void RemoveEdge(EdgeAccessor &edge, bool remove_out_edge = true,
bool remove_in_edge = true) override {
if (edge.is_local()) {
return GraphDbAccessor::RemoveEdge(edge, remove_out_edge, remove_in_edge);
}
auto edge_addr = edge.GlobalAddress();
auto from_addr = db().storage().GlobalizedAddress(edge.from_addr());
CHECK(edge_addr.worker_id() == from_addr.worker_id())
<< "Edge and it's 'from' vertex not on the same worker";
auto to_addr = db().storage().GlobalizedAddress(edge.to_addr());
updates_clients_->RemoveEdge(transaction_id(), edge_addr.worker_id(),
edge_addr.gid(), from_addr.gid(), to_addr);
// Another RPC is necessary only if the first did not handle vertices on
// both sides.
if (edge_addr.worker_id() != to_addr.worker_id()) {
updates_clients_->RemoveInEdge(transaction_id(), to_addr.worker_id(),
to_addr.gid(), edge_addr);
}
}
storage::EdgeAddress InsertEdgeOnFrom(
VertexAccessor *from, VertexAccessor *to,
const storage::EdgeType &edge_type,
const std::experimental::optional<gid::Gid> &requested_gid,
const std::experimental::optional<int64_t> &cypher_id) override {
if (from->is_local()) {
return GraphDbAccessor::InsertEdgeOnFrom(from, to, edge_type,
requested_gid, cypher_id);
}
auto created_edge_info = updates_clients_->CreateEdge(
transaction_id(), *from, *to, edge_type, cypher_id);
auto edge_address = created_edge_info.edge_address;
auto *from_updated =
data_manager_->FindNew<Vertex>(transaction_id(), from->gid());
// Create an Edge and insert it into the Cache so we see it locally.
data_manager_->Emplace<Edge>(
transaction_id(), edge_address.gid(),
distributed::CachedRecordData<Edge>(
created_edge_info.cypher_id, nullptr,
std::make_unique<Edge>(from->address(), to->address(), edge_type)));
from_updated->out_.emplace(
db().storage().LocalizedAddressIfPossible(to->address()), edge_address,
edge_type);
return edge_address;
}
void InsertEdgeOnTo(VertexAccessor *from, VertexAccessor *to,
const storage::EdgeType &edge_type,
const storage::EdgeAddress &edge_address) override {
if (to->is_local()) {
return GraphDbAccessor::InsertEdgeOnTo(from, to, edge_type, edge_address);
}
// The RPC call for the `to` side is already handled if `from` is not
// local.
if (from->is_local() ||
from->address().worker_id() != to->address().worker_id()) {
updates_clients_->AddInEdge(
transaction_id(), *from,
db().storage().GlobalizedAddress(edge_address), *to, edge_type);
}
auto *to_updated =
data_manager_->FindNew<Vertex>(transaction_id(), to->gid());
to_updated->in_.emplace(
db().storage().LocalizedAddressIfPossible(from->address()),
edge_address, edge_type);
}
};
class MasterAccessor final : public DistributedAccessor {
class MasterAccessor final : public GraphDbAccessor {
distributed::Coordination *coordination_;
distributed::PullRpcClients *pull_clients_;
int worker_id_{0};
@ -146,7 +48,7 @@ class MasterAccessor final : public DistributedAccessor {
public:
MasterAccessor(Master *db, distributed::Coordination *coordination,
distributed::PullRpcClients *pull_clients_)
: DistributedAccessor(db),
: GraphDbAccessor(*db),
coordination_(coordination),
pull_clients_(pull_clients_),
worker_id_(db->WorkerId()) {}
@ -154,7 +56,7 @@ class MasterAccessor final : public DistributedAccessor {
MasterAccessor(Master *db, tx::TransactionId tx_id,
distributed::Coordination *coordination,
distributed::PullRpcClients *pull_clients_)
: DistributedAccessor(db, tx_id),
: GraphDbAccessor(*db, tx_id),
coordination_(coordination),
pull_clients_(pull_clients_),
worker_id_(db->WorkerId()) {}
@ -233,20 +135,20 @@ class MasterAccessor final : public DistributedAccessor {
// TODO (mferencevic): Move this logic into the transaction engine.
void AdvanceCommand() override {
DistributedAccessor::AdvanceCommand();
GraphDbAccessor::AdvanceCommand();
auto tx_id = transaction_id();
auto futures = pull_clients_->NotifyAllTransactionCommandAdvanced(tx_id);
for (auto &future : futures) future.get();
}
};
class WorkerAccessor final : public DistributedAccessor {
class WorkerAccessor final : public GraphDbAccessor {
public:
explicit WorkerAccessor(Worker *db)
: DistributedAccessor(db) {}
: GraphDbAccessor(*db) {}
WorkerAccessor(Worker *db, tx::TransactionId tx_id)
: DistributedAccessor(db, tx_id) {}
: GraphDbAccessor(*db, tx_id) {}
void BuildIndex(storage::Label, storage::Property, bool) override {
// TODO: Rethink BuildIndex API or inheritance. It's rather strange that a

View File

@ -5,6 +5,8 @@
#include <glog/logging.h>
#include "distributed/data_manager.hpp"
#include "distributed/updates_rpc_clients.hpp"
#include "durability/distributed/state_delta.hpp"
#include "storage/distributed/address_types.hpp"
#include "storage/distributed/edge.hpp"
@ -312,21 +314,31 @@ int64_t GraphDbAccessor::VerticesCount(
bool GraphDbAccessor::RemoveVertex(VertexAccessor &vertex_accessor,
bool check_empty) {
DCHECK(!commited_ && !aborted_) << "Accessor committed or aborted";
vertex_accessor.SwitchNew();
// it's possible the vertex was removed already in this transaction
// due to it getting matched multiple times by some patterns
// we can only delete it once, so check if it's already deleted
if (vertex_accessor.current().is_expired_by(transaction_)) return true;
if (check_empty &&
vertex_accessor.out_degree() + vertex_accessor.in_degree() > 0)
return false;
if (vertex_accessor.is_local()) {
DCHECK(!commited_ && !aborted_) << "Accessor committed or aborted";
vertex_accessor.SwitchNew();
// it's possible the vertex was removed already in this transaction
// due to it getting matched multiple times by some patterns
// we can only delete it once, so check if it's already deleted
if (vertex_accessor.current().is_expired_by(transaction_)) return true;
if (check_empty &&
vertex_accessor.out_degree() + vertex_accessor.in_degree() > 0)
return false;
auto *vlist_ptr = vertex_accessor.address().local();
wal().Emplace(database::StateDelta::RemoveVertex(
transaction_.id_, vlist_ptr->gid_, check_empty));
vlist_ptr->remove(vertex_accessor.current_, transaction_);
return true;
auto *vlist_ptr = vertex_accessor.address().local();
wal().Emplace(database::StateDelta::RemoveVertex(
transaction_.id_, vlist_ptr->gid_, check_empty));
vlist_ptr->remove(vertex_accessor.current_, transaction_);
return true;
} else {
auto address = vertex_accessor.address();
updates_clients().RemoveVertex(address.worker_id(), transaction_id(),
address.gid(), check_empty);
// We can't know if we are going to be able to remove vertex until
// deferred updates on a remote worker are executed
return true;
}
}
void GraphDbAccessor::DetachRemoveVertex(VertexAccessor &vertex_accessor) {
@ -365,37 +377,71 @@ storage::EdgeAddress GraphDbAccessor::InsertEdgeOnFrom(
const storage::EdgeType &edge_type,
const std::experimental::optional<gid::Gid> &requested_gid,
const std::experimental::optional<int64_t> &cypher_id) {
auto edge_accessor = InsertOnlyEdge(from->address(), to->address(), edge_type,
requested_gid, cypher_id);
auto edge_address = edge_accessor.address();
if (from->is_local()) {
auto edge_accessor = InsertOnlyEdge(from->address(), to->address(),
edge_type, requested_gid, cypher_id);
auto edge_address = edge_accessor.address();
from->SwitchNew();
auto from_updated = &from->update();
from->SwitchNew();
auto from_updated = &from->update();
// TODO when preparing WAL for distributed, most likely never use
// `CREATE_EDGE`, but always have it split into 3 parts (edge insertion,
// in/out modification).
wal().Emplace(database::StateDelta::CreateEdge(
transaction_.id_, edge_accessor.gid(), edge_accessor.CypherId(),
from->gid(), to->gid(), edge_type, EdgeTypeName(edge_type)));
// TODO when preparing WAL for distributed, most likely never use
// `CREATE_EDGE`, but always have it split into 3 parts (edge insertion,
// in/out modification).
wal().Emplace(database::StateDelta::CreateEdge(
transaction_.id_, edge_accessor.gid(), edge_accessor.CypherId(),
from->gid(), to->gid(), edge_type, EdgeTypeName(edge_type)));
from_updated->out_.emplace(
db_.storage().LocalizedAddressIfPossible(to->address()), edge_address,
edge_type);
return edge_address;
from_updated->out_.emplace(
db_.storage().LocalizedAddressIfPossible(to->address()), edge_address,
edge_type);
return edge_address;
} else {
auto created_edge_info = updates_clients().CreateEdge(
transaction_id(), *from, *to, edge_type, cypher_id);
auto edge_address = created_edge_info.edge_address;
auto *from_updated =
data_manager().FindNew<Vertex>(transaction_id(), from->gid());
// Create an Edge and insert it into the Cache so we see it locally.
data_manager().Emplace<Edge>(
transaction_id(), edge_address.gid(),
distributed::CachedRecordData<Edge>(
created_edge_info.cypher_id, nullptr,
std::make_unique<Edge>(from->address(), to->address(), edge_type)));
from_updated->out_.emplace(
db().storage().LocalizedAddressIfPossible(to->address()), edge_address,
edge_type);
return edge_address;
}
}
void GraphDbAccessor::InsertEdgeOnTo(VertexAccessor *from, VertexAccessor *to,
const storage::EdgeType &edge_type,
const storage::EdgeAddress &edge_address) {
// Ensure that the "to" accessor has the latest version (switch new).
// WARNING: Must do that after the above "from->update()" for cases when
// we are creating a cycle and "from" and "to" are the same vlist.
to->SwitchNew();
auto *to_updated = &to->update();
to_updated->in_.emplace(
db_.storage().LocalizedAddressIfPossible(from->address()), edge_address,
edge_type);
if (to->is_local()) {
// Ensure that the "to" accessor has the latest version (switch new).
// WARNING: Must do that after the above "from->update()" for cases when
// we are creating a cycle and "from" and "to" are the same vlist.
to->SwitchNew();
auto *to_updated = &to->update();
to_updated->in_.emplace(
db_.storage().LocalizedAddressIfPossible(from->address()), edge_address,
edge_type);
} else {
// The RPC call for the `to` side is already handled if `from` is not
// local.
if (from->is_local() ||
from->address().worker_id() != to->address().worker_id()) {
updates_clients().AddInEdge(
transaction_id(), *from,
db().storage().GlobalizedAddress(edge_address), *to, edge_type);
}
auto *to_updated =
data_manager().FindNew<Vertex>(transaction_id(), to->gid());
to_updated->in_.emplace(
db().storage().LocalizedAddressIfPossible(from->address()),
edge_address, edge_type);
}
}
EdgeAccessor GraphDbAccessor::InsertOnlyEdge(
@ -427,17 +473,34 @@ int64_t GraphDbAccessor::EdgesCount() const {
void GraphDbAccessor::RemoveEdge(EdgeAccessor &edge, bool remove_out_edge,
bool remove_in_edge) {
DCHECK(!commited_ && !aborted_) << "Accessor committed or aborted";
// it's possible the edge was removed already in this transaction
// due to it getting matched multiple times by some patterns
// we can only delete it once, so check if it's already deleted
edge.SwitchNew();
if (edge.current().is_expired_by(transaction_)) return;
if (remove_out_edge) edge.from().RemoveOutEdge(edge.address());
if (remove_in_edge) edge.to().RemoveInEdge(edge.address());
if (edge.is_local()) {
DCHECK(!commited_ && !aborted_) << "Accessor committed or aborted";
// it's possible the edge was removed already in this transaction
// due to it getting matched multiple times by some patterns
// we can only delete it once, so check if it's already deleted
edge.SwitchNew();
if (edge.current().is_expired_by(transaction_)) return;
if (remove_out_edge) edge.from().RemoveOutEdge(edge.address());
if (remove_in_edge) edge.to().RemoveInEdge(edge.address());
edge.address().local()->remove(edge.current_, transaction_);
wal().Emplace(database::StateDelta::RemoveEdge(transaction_.id_, edge.gid()));
edge.address().local()->remove(edge.current_, transaction_);
wal().Emplace(
database::StateDelta::RemoveEdge(transaction_.id_, edge.gid()));
} else {
auto edge_addr = edge.GlobalAddress();
auto from_addr = db().storage().GlobalizedAddress(edge.from_addr());
CHECK(edge_addr.worker_id() == from_addr.worker_id())
<< "Edge and it's 'from' vertex not on the same worker";
auto to_addr = db().storage().GlobalizedAddress(edge.to_addr());
updates_clients().RemoveEdge(transaction_id(), edge_addr.worker_id(),
edge_addr.gid(), from_addr.gid(), to_addr);
// Another RPC is necessary only if the first did not handle vertices on
// both sides.
if (edge_addr.worker_id() != to_addr.worker_id()) {
updates_clients().RemoveInEdge(transaction_id(), to_addr.worker_id(),
to_addr.gid(), edge_addr);
}
}
}
storage::Label GraphDbAccessor::Label(const std::string &label_name) {

View File

@ -119,7 +119,7 @@ class GraphDbAccessor {
* before deletion.
* @return If or not the vertex was deleted.
*/
virtual bool RemoveVertex(VertexAccessor &vertex_accessor,
bool RemoveVertex(VertexAccessor &vertex_accessor,
bool check_empty = true);
/**
@ -358,7 +358,7 @@ class GraphDbAccessor {
* @param remove_in_edge If the edge should be removed from the its
* destination side.
*/
virtual void RemoveEdge(EdgeAccessor &edge, bool remove_out_edge = true,
void RemoveEdge(EdgeAccessor &edge, bool remove_out_edge = true,
bool remove_in_edge = true);
/**
@ -657,7 +657,7 @@ class GraphDbAccessor {
* Insert a new edge to `from` vertex and return the address.
* Called from `InsertEdge` as the first step in edge insertion.
* */
virtual storage::EdgeAddress InsertEdgeOnFrom(
storage::EdgeAddress InsertEdgeOnFrom(
VertexAccessor *from, VertexAccessor *to,
const storage::EdgeType &edge_type,
const std::experimental::optional<gid::Gid> &requested_gid,
@ -668,7 +668,7 @@ class GraphDbAccessor {
* Called after `InsertEdgeOnFrom` in `InsertEdge`. The given `edge_address`
* is from the created edge, returned by `InsertEdgeOnFrom`.
*/
virtual void InsertEdgeOnTo(VertexAccessor *from, VertexAccessor *to,
void InsertEdgeOnTo(VertexAccessor *from, VertexAccessor *to,
const storage::EdgeType &edge_type,
const storage::EdgeAddress &edge_address);