Support distributed edge removal

Reviewers: dgleich

Reviewed By: dgleich

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1256
This commit is contained in:
florijan 2018-02-28 14:30:21 +01:00
parent 1c1da67745
commit e61bb8ab1e
20 changed files with 518 additions and 188 deletions

View File

@ -89,7 +89,7 @@ BOOST_CLASS_EXPORT(distributed::RemoteUpdateRes);
BOOST_CLASS_EXPORT(distributed::RemoteUpdateApplyReq);
BOOST_CLASS_EXPORT(distributed::RemoteUpdateApplyRes);
// Remote creates
// Remote creates.
BOOST_CLASS_EXPORT(distributed::RemoteCreateResult);
BOOST_CLASS_EXPORT(distributed::RemoteCreateVertexReq);
BOOST_CLASS_EXPORT(distributed::RemoteCreateVertexReqData);
@ -101,6 +101,11 @@ BOOST_CLASS_EXPORT(distributed::RemoteAddInEdgeReqData);
BOOST_CLASS_EXPORT(distributed::RemoteAddInEdgeReq);
BOOST_CLASS_EXPORT(distributed::RemoteAddInEdgeRes);
// Remote removes
// Remote removal.
BOOST_CLASS_EXPORT(distributed::RemoteRemoveVertexReq);
BOOST_CLASS_EXPORT(distributed::RemoteRemoveVertexRes);
BOOST_CLASS_EXPORT(distributed::RemoteRemoveEdgeReq);
BOOST_CLASS_EXPORT(distributed::RemoteRemoveEdgeRes);
BOOST_CLASS_EXPORT(distributed::RemoteRemoveInEdgeData);
BOOST_CLASS_EXPORT(distributed::RemoteRemoveInEdgeReq);
BOOST_CLASS_EXPORT(distributed::RemoteRemoveInEdgeRes);

View File

@ -187,7 +187,8 @@ class Master : public PrivateBase {
distributed::RpcWorkerClients index_rpc_clients_{coordination_};
distributed::RemoteUpdatesRpcServer remote_updates_server_{*this, server_};
distributed::RemoteUpdatesRpcClients remote_updates_clients_{coordination_};
distributed::RemoteDataManager remote_data_manager_{remote_data_clients_};
distributed::RemoteDataManager remote_data_manager_{storage_,
remote_data_clients_};
distributed::TransactionalCacheCleaner cache_cleaner_{tx_engine_};
};
@ -234,7 +235,8 @@ class Worker : public PrivateBase {
distributed::IndexRpcServer index_rpc_server_{*this, server_};
distributed::RemoteUpdatesRpcServer remote_updates_server_{*this, server_};
distributed::RemoteUpdatesRpcClients remote_updates_clients_{coordination_};
distributed::RemoteDataManager remote_data_manager_{remote_data_clients_};
distributed::RemoteDataManager remote_data_manager_{storage_,
remote_data_clients_};
distributed::TransactionalCacheCleaner cache_cleaner_{tx_engine_};
};

View File

@ -18,35 +18,6 @@
namespace database {
#define LOCALIZED_ADDRESS_SPECIALIZATION(type) \
template <> \
storage::type##Address GraphDbAccessor::LocalizedAddress( \
storage::type##Address address) const { \
if (address.is_local()) return address; \
if (address.worker_id() == db().WorkerId()) { \
return Local##type##Address(address.gid()); \
} \
return address; \
}
LOCALIZED_ADDRESS_SPECIALIZATION(Vertex)
LOCALIZED_ADDRESS_SPECIALIZATION(Edge)
#undef LOCALIZED_ADDRESS_SPECIALIZATION
#define GLOBALIZED_ADDRESS_SPECIALIZATION(type) \
template <> \
storage::type##Address GraphDbAccessor::GlobalizedAddress( \
storage::type##Address address) const { \
if (address.is_remote()) return address; \
return {address.local()->gid_, db().WorkerId()}; \
}
GLOBALIZED_ADDRESS_SPECIALIZATION(Vertex)
GLOBALIZED_ADDRESS_SPECIALIZATION(Edge)
#undef GLOBALIZED_ADDRESS_SPECIALIZATION
GraphDbAccessor::GraphDbAccessor(GraphDb &db)
: db_(db),
transaction_(*db.tx_engine().Begin()),
@ -129,7 +100,8 @@ VertexAccessor GraphDbAccessor::InsertVertexIntoRemote(
std::experimental::optional<VertexAccessor> GraphDbAccessor::FindVertex(
gid::Gid gid, bool current_state) {
VertexAccessor record_accessor(LocalVertexAddress(gid), *this);
VertexAccessor record_accessor(db_.storage().LocalAddress<Vertex>(gid),
*this);
if (!record_accessor.Visible(transaction(), current_state))
return std::experimental::nullopt;
return record_accessor;
@ -144,7 +116,7 @@ VertexAccessor GraphDbAccessor::FindVertexChecked(gid::Gid gid,
std::experimental::optional<EdgeAccessor> GraphDbAccessor::FindEdge(
gid::Gid gid, bool current_state) {
EdgeAccessor record_accessor(LocalEdgeAddress(gid), *this);
EdgeAccessor record_accessor(db_.storage().LocalAddress<Edge>(gid), *this);
if (!record_accessor.Visible(transaction(), current_state))
return std::experimental::nullopt;
return record_accessor;
@ -467,7 +439,9 @@ EdgeAccessor GraphDbAccessor::InsertEdge(
edge_address.gid(), nullptr,
std::make_unique<Edge>(from.address(), to.address(), edge_type));
}
from_updated->out_.emplace(to.address(), edge_address, edge_type);
from_updated->out_.emplace(
db_.storage().LocalizedAddressIfPossible(to.address()), edge_address,
edge_type);
Vertex *to_updated;
if (to.is_local()) {
@ -481,13 +455,15 @@ EdgeAccessor GraphDbAccessor::InsertEdge(
if (from.is_local() ||
from.address().worker_id() != to.address().worker_id()) {
db().remote_updates_clients().RemoteAddInEdge(
transaction_id(), from, GlobalizedAddress(edge_address), to,
edge_type);
transaction_id(), from,
db().storage().GlobalizedAddress(edge_address), to, edge_type);
}
to_updated =
db().remote_data_manager().Vertices(transaction_id()).FindNew(to.gid());
}
to_updated->in_.emplace(from.address(), edge_address, edge_type);
to_updated->in_.emplace(
db_.storage().LocalizedAddressIfPossible(from.address()), edge_address,
edge_type);
return EdgeAccessor(edge_address, *this, from.address(), to.address(),
edge_type);
@ -514,27 +490,38 @@ int64_t GraphDbAccessor::EdgesCount() const {
return db_.storage().edges_.access().size();
}
void GraphDbAccessor::RemoveEdge(EdgeAccessor &edge_accessor,
bool remove_from_from, bool remove_from_to) {
void GraphDbAccessor::RemoveEdge(EdgeAccessor &edge, bool remove_out_edge,
bool remove_in_edge) {
DCHECK(!commited_ && !aborted_) << "Accessor committed or aborted";
if (!edge_accessor.is_local()) {
LOG(ERROR) << "Remote edge deletion not implemented";
// TODO support distributed
// call remote RemoveEdge(gid, true, true). It can either succeed or an
// error can occur. See discussion in the RemoveVertex method above.
if (edge.is_local()) {
// it's possible the edge was removed already in this transaction
// due to it getting matched multiple times by some patterns
// we can only delete it once, so check if it's already deleted
edge.SwitchNew();
if (edge.current().is_expired_by(transaction_)) return;
if (remove_out_edge) edge.from().RemoveOutEdge(edge.address());
if (remove_in_edge) edge.to().RemoveInEdge(edge.address());
edge.address().local()->remove(edge.current_, transaction_);
wal().Emplace(
database::StateDelta::RemoveEdge(transaction_.id_, edge.gid()));
} else {
auto edge_addr = edge.GlobalAddress();
auto from_addr = db().storage().GlobalizedAddress(edge.from_addr());
CHECK(edge_addr.worker_id() == from_addr.worker_id())
<< "Edge and it's 'from' vertex not on the same worker";
auto to_addr = db().storage().GlobalizedAddress(edge.to_addr());
db().remote_updates_clients().RemoteRemoveEdge(
transaction_id(), edge_addr.worker_id(), edge_addr.gid(),
from_addr.gid(), to_addr);
// Another RPC is necessary only if the first did not handle vertices on
// both sides.
if (edge_addr.worker_id() != to_addr.worker_id()) {
db().remote_updates_clients().RemoteRemoveInEdge(
transaction_id(), to_addr.worker_id(), to_addr.gid(), edge_addr);
}
}
// it's possible the edge was removed already in this transaction
// due to it getting matched multiple times by some patterns
// we can only delete it once, so check if it's already deleted
edge_accessor.SwitchNew();
if (edge_accessor.current().is_expired_by(transaction_)) return;
if (remove_from_from)
edge_accessor.from().update().out_.RemoveEdge(edge_accessor.address());
if (remove_from_to)
edge_accessor.to().update().in_.RemoveEdge(edge_accessor.address());
edge_accessor.address().local()->remove(edge_accessor.current_, transaction_);
wal().Emplace(
database::StateDelta::RemoveEdge(transaction_.id_, edge_accessor.gid()));
}
storage::Label GraphDbAccessor::Label(const std::string &label_name) {
@ -589,20 +576,4 @@ std::vector<std::string> GraphDbAccessor::IndexInfo() const {
}
return info;
}
mvcc::VersionList<Vertex> *GraphDbAccessor::LocalVertexAddress(
gid::Gid gid) const {
auto access = db_.storage().vertices_.access();
auto found = access.find(gid);
CHECK(found != access.end()) << "Failed to find vertex for gid: " << gid;
return found->second;
}
mvcc::VersionList<Edge> *GraphDbAccessor::LocalEdgeAddress(gid::Gid gid) const {
auto access = db_.storage().edges_.access();
auto found = access.find(gid);
CHECK(found != access.end()) << "Failed to find edge for gid: " << gid;
return found->second;
}
} // namespace database

View File

@ -302,14 +302,14 @@ class GraphDbAccessor {
* edge both arguments should be `true`. `false` is only used when
* detach-deleting a vertex.
*
* @param edge_accessor The accessor to an edge.
* @param remove_from_from If the edge should be removed from the its origin
* @param edge The accessor to an edge.
* @param remove_out_edge If the edge should be removed from its origin
* side.
* @param remove_from_to If the edge should be removed from the its
* @param remove_in_edge If the edge should be removed from its
* destination side.
*/
void RemoveEdge(EdgeAccessor &edge_accessor, bool remove_from = true,
bool remove_to = true);
void RemoveEdge(EdgeAccessor &edge, bool remove_out_edge = true,
bool remove_in_edge = true);
/**
* Obtains the edge for the given ID. If there is no edge for the given
@ -563,22 +563,6 @@ class GraphDbAccessor {
/* Returns a list of index names present in the database. */
std::vector<std::string> IndexInfo() const;
/// Gets the local address for the given gid. Fails if not present.
mvcc::VersionList<Vertex> *LocalVertexAddress(gid::Gid gid) const;
/// Gets the local edge address for the given gid. Fails if not present.
mvcc::VersionList<Edge> *LocalEdgeAddress(gid::Gid gid) const;
/// Converts an address to local, if possible. Returns the same address if
/// not.
template <typename TAddress>
TAddress LocalizedAddress(TAddress address) const;
/// Converts local address to remote, returns remote as they are.
/// not.
template <typename TAddress>
TAddress GlobalizedAddress(TAddress address) const;
private:
GraphDb &db_;
tx::Transaction &transaction_;

View File

@ -54,6 +54,16 @@ StateDelta StateDelta::AddOutEdge(tx::transaction_id_t tx_id,
return op;
}
/// Creates a REMOVE_OUT_EDGE delta for the given transaction.
///
/// @param tx_id         id of the transaction performing the removal.
/// @param vertex_id     gid of the vertex whose out-edge is removed.
/// @param edge_address  address of the removed edge; must be in remote
///                      (global) form, otherwise the CHECK below fails.
StateDelta StateDelta::RemoveOutEdge(tx::transaction_id_t tx_id,
                                     gid::Gid vertex_id,
                                     storage::EdgeAddress edge_address) {
  CHECK(edge_address.is_remote()) << "WAL can only contain global addresses.";

  StateDelta removal(StateDelta::Type::REMOVE_OUT_EDGE, tx_id);
  removal.edge_address = edge_address;
  removal.vertex_id = vertex_id;
  return removal;
}
StateDelta StateDelta::AddInEdge(tx::transaction_id_t tx_id, gid::Gid vertex_id,
storage::VertexAddress vertex_from_address,
storage::EdgeAddress edge_address,
@ -68,6 +78,16 @@ StateDelta StateDelta::AddInEdge(tx::transaction_id_t tx_id, gid::Gid vertex_id,
return op;
}
/// Creates a REMOVE_IN_EDGE delta for the given transaction.
///
/// @param tx_id         id of the transaction performing the removal.
/// @param vertex_id     gid of the vertex whose in-edge is removed.
/// @param edge_address  address of the removed edge; must be in remote
///                      (global) form, otherwise the CHECK below fails.
StateDelta StateDelta::RemoveInEdge(tx::transaction_id_t tx_id,
                                    gid::Gid vertex_id,
                                    storage::EdgeAddress edge_address) {
  CHECK(edge_address.is_remote()) << "WAL can only contain global addresses.";

  StateDelta removal(StateDelta::Type::REMOVE_IN_EDGE, tx_id);
  removal.edge_address = edge_address;
  removal.vertex_id = vertex_id;
  return removal;
}
StateDelta StateDelta::PropsSetVertex(tx::transaction_id_t tx_id,
gid::Gid vertex_id,
storage::Property property,
@ -168,12 +188,20 @@ void StateDelta::Encode(
encoder.WriteInt(edge_address.raw());
encoder.WriteInt(edge_type.storage());
break;
case Type::REMOVE_OUT_EDGE:
encoder.WriteInt(vertex_id);
encoder.WriteInt(edge_address.raw());
break;
case Type::ADD_IN_EDGE:
encoder.WriteInt(vertex_id);
encoder.WriteInt(vertex_from_address.raw());
encoder.WriteInt(edge_address.raw());
encoder.WriteInt(edge_type.storage());
break;
case Type::REMOVE_IN_EDGE:
encoder.WriteInt(vertex_id);
encoder.WriteInt(edge_address.raw());
break;
case Type::SET_PROPERTY_VERTEX:
encoder.WriteInt(vertex_id);
encoder.WriteInt(property.storage());
@ -252,6 +280,10 @@ std::experimental::optional<StateDelta> StateDelta::Decode(
DECODE_MEMBER_CAST(edge_address, ValueInt, storage::EdgeAddress)
DECODE_MEMBER_CAST(edge_type, ValueInt, storage::EdgeType)
break;
case Type::REMOVE_OUT_EDGE:
DECODE_MEMBER(vertex_id, ValueInt)
DECODE_MEMBER_CAST(edge_address, ValueInt, storage::EdgeAddress)
break;
case Type::ADD_IN_EDGE:
DECODE_MEMBER(vertex_id, ValueInt)
DECODE_MEMBER_CAST(vertex_from_address, ValueInt,
@ -259,6 +291,10 @@ std::experimental::optional<StateDelta> StateDelta::Decode(
DECODE_MEMBER_CAST(edge_address, ValueInt, storage::EdgeAddress)
DECODE_MEMBER_CAST(edge_type, ValueInt, storage::EdgeType)
break;
case Type::REMOVE_IN_EDGE:
DECODE_MEMBER(vertex_id, ValueInt)
DECODE_MEMBER_CAST(edge_address, ValueInt, storage::EdgeAddress)
break;
case Type::SET_PROPERTY_VERTEX:
DECODE_MEMBER(vertex_id, ValueInt)
DECODE_MEMBER_CAST(property, ValueInt, storage::Property)
@ -328,8 +364,10 @@ void StateDelta::Apply(GraphDbAccessor &dba) const {
break;
}
case Type::ADD_OUT_EDGE:
case Type::REMOVE_OUT_EDGE:
case Type::ADD_IN_EDGE:
LOG(FATAL) << "Partial edge-creation not yet supported in Apply";
case Type::REMOVE_IN_EDGE:
LOG(FATAL) << "Partial edge creation/deletion not yet supported in Apply";
case Type::SET_PROPERTY_VERTEX: {
auto vertex = dba.FindVertex(vertex_id, true);
DCHECK(vertex) << "Failed to find vertex.";

View File

@ -29,11 +29,13 @@ struct StateDelta {
TRANSACTION_BEGIN,
TRANSACTION_COMMIT,
TRANSACTION_ABORT,
CREATE_VERTEX, // vertex_id
CREATE_EDGE, // edge_id, from_vertex_id, to_vertex_id, edge_type,
// edge_type_name
ADD_OUT_EDGE, // vertex_id, edge_address, vertex_to_address, edge_type
ADD_IN_EDGE, // vertex_id, edge_address, vertex_from_address, edge_type
CREATE_VERTEX, // vertex_id
CREATE_EDGE, // edge_id, from_vertex_id, to_vertex_id, edge_type,
// edge_type_name
ADD_OUT_EDGE, // vertex_id, edge_address, vertex_to_address, edge_type
REMOVE_OUT_EDGE, // vertex_id, edge_address
ADD_IN_EDGE, // vertex_id, edge_address, vertex_from_address, edge_type
REMOVE_IN_EDGE, // vertex_id, edge_address
SET_PROPERTY_VERTEX, // vertex_id, property, property_name, property_value
SET_PROPERTY_EDGE, // edge_id, property, property_name, property_value
// remove property is done by setting a PropertyValue::Null
@ -73,10 +75,15 @@ struct StateDelta {
storage::VertexAddress vertex_to_address,
storage::EdgeAddress edge_address,
storage::EdgeType edge_type);
static StateDelta RemoveOutEdge(tx::transaction_id_t tx_id,
gid::Gid vertex_id,
storage::EdgeAddress edge_address);
static StateDelta AddInEdge(tx::transaction_id_t tx_id, gid::Gid vertex_id,
storage::VertexAddress vertex_from_address,
storage::EdgeAddress edge_address,
storage::EdgeType edge_type);
static StateDelta RemoveInEdge(tx::transaction_id_t tx_id, gid::Gid vertex_id,
storage::EdgeAddress edge_address);
static StateDelta PropsSetVertex(tx::transaction_id_t tx_id,
gid::Gid vertex_id,
storage::Property property,

View File

@ -7,6 +7,7 @@
#include "database/indexes/key_index.hpp"
#include "database/indexes/label_property_index.hpp"
#include "mvcc/version_list.hpp"
#include "storage/address.hpp"
#include "storage/edge.hpp"
#include "storage/types.hpp"
#include "storage/vertex.hpp"
@ -29,7 +30,9 @@ namespace database {
class Storage {
public:
explicit Storage(int worker_id)
: vertex_generator_{worker_id}, edge_generator_{worker_id} {}
: worker_id_(worker_id),
vertex_generator_{worker_id},
edge_generator_{worker_id} {}
public:
~Storage() {
@ -47,6 +50,43 @@ class Storage {
gid::Generator &VertexGenerator() { return vertex_generator_; }
gid::Generator &EdgeGenerator() { return edge_generator_; }
/// Looks up the version list stored on this worker under the given gid.
/// A CHECK failure is raised when no such record is present.
///
/// @tparam TRecord  Vertex or Edge; selects which storage map is searched.
/// @param gid       global id of the record to look up.
/// @return pointer to the record's version list.
template <typename TRecord>
mvcc::VersionList<TRecord> *LocalAddress(gid::Gid gid) const {
  // TODO the accessor must be explicitly const due to a bug in
  // ConcurrentMap::Access::find
  const auto accessor = GetMap<TRecord>().access();
  auto position = accessor.find(gid);
  const char *record_kind =
      std::is_same<TRecord, Vertex>::value ? "vertex" : "edge";
  CHECK(position != accessor.end())
      << "Failed to find " << record_kind << " for gid: " << gid;
  return position->second;
}
/// Returns the local form of the given address when the record lives on this
/// worker. Addresses that are already local, or that point to a different
/// worker, are handed back unchanged.
template <typename TRecord>
storage::Address<mvcc::VersionList<TRecord>> LocalizedAddressIfPossible(
    storage::Address<mvcc::VersionList<TRecord>> address) const {
  // Only a non-local address carrying our own worker id can be localized.
  const bool owned_by_this_worker =
      !address.is_local() && address.worker_id() == worker_id_;
  if (owned_by_this_worker) return LocalAddress<TRecord>(address.gid());
  return address;
}
/// Produces the remote (gid + worker id) form of an address. An address that
/// is already remote is returned as is.
template <typename TAddress>
TAddress GlobalizedAddress(TAddress address) const {
  if (!address.is_remote()) {
    // Local form: rebuild it from the record's gid and this worker's id.
    return {address.local()->gid_, worker_id_};
  }
  return address;
}
/// Gets the local edge address for the given gid. Fails if not present.
/// NOTE(review): this looks superseded by LocalAddress<Edge>() declared above;
/// no definition of this member is visible here — confirm and remove if unused.
mvcc::VersionList<Edge> *LocalEdgeAddress(gid::Gid gid) const;
private:
friend class GraphDbAccessor;
friend class StorageGc;
@ -54,6 +94,7 @@ class Storage {
friend bool durability::Recover(const std::experimental::filesystem::path &,
database::GraphDb &);
int worker_id_;
gid::Generator vertex_generator_;
gid::Generator edge_generator_;
@ -67,5 +108,22 @@ class Storage {
// Set of transactions ids which are building indexes currently
ConcurrentSet<tx::transaction_id_t> index_build_tx_in_progress_;
/// Gets the Vertex/Edge main storage map.
template <typename TRecord>
const ConcurrentMap<gid::Gid, mvcc::VersionList<TRecord> *> &GetMap() const;
};
/// Vertex specialization of Storage::GetMap: returns the `vertices_` map
/// (gid -> vertex version list) by const reference.
template <>
inline const ConcurrentMap<gid::Gid, mvcc::VersionList<Vertex> *>
&Storage::GetMap() const {
return vertices_;
}
/// Edge specialization of Storage::GetMap: returns the `edges_` map
/// (gid -> edge version list) by const reference.
template <>
inline const ConcurrentMap<gid::Gid, mvcc::VersionList<Edge> *>
&Storage::GetMap() const {
return edges_;
}
} // namespace database

View File

@ -5,8 +5,11 @@
#include "glog/logging.h"
#include "database/storage.hpp"
#include "distributed/remote_data_rpc_clients.hpp"
#include "storage/edge.hpp"
#include "storage/gid.hpp"
#include "storage/vertex.hpp"
#include "transactions/transaction.hpp"
namespace distributed {
@ -25,8 +28,9 @@ class RemoteCache {
using rec_uptr = std::unique_ptr<TRecord>;
public:
RemoteCache(distributed::RemoteDataRpcClients &remote_data_clients)
: remote_data_clients_(remote_data_clients) {}
RemoteCache(database::Storage &storage,
distributed::RemoteDataRpcClients &remote_data_clients)
: storage_(storage), remote_data_clients_(remote_data_clients) {}
/// Returns the new data for the given ID. Creates it (as copy of old) if
/// necessary.
@ -62,6 +66,7 @@ class RemoteCache {
auto remote =
remote_data_clients_.RemoteElement<TRecord>(worker_id, tx_id, gid);
LocalizeAddresses(*remote);
// This logic is a bit strange because we need to make sure that someone
// else didn't get a response and updated the cache before we did and we
@ -76,19 +81,11 @@ class RemoteCache {
new_record = it_pair.first->second.second.get();
}
void AdvanceCommand() {
// TODO implement.
// The effect of this should be that the next call to FindSetOldNew will
// do an RPC and not use the cached stuff.
//
// Not sure if it's OK to just flush the cache? I *think* that after a
// global advance-command, all the existing RecordAccessors will be
// calling Reconstruct, so perhaps just flushing is the correct sollution,
// even though we'll have pointers to nothing.
}
/** Sets the given records as (new, old) data for the given gid. */
void emplace(gid::Gid gid, rec_uptr old_record, rec_uptr new_record) {
if (old_record) LocalizeAddresses(*old_record);
if (new_record) LocalizeAddresses(*new_record);
std::lock_guard<std::mutex> guard{lock_};
// We can't replace existing data because some accessors might be using
// it.
@ -111,10 +108,34 @@ class RemoteCache {
}
private:
database::Storage &storage_;
std::mutex lock_;
distributed::RemoteDataRpcClients &remote_data_clients_;
// TODO it'd be better if we had VertexData and EdgeData in here, as opposed
// to Vertex and Edge.
std::unordered_map<gid::Gid, std::pair<rec_uptr, rec_uptr>> cache_;
// Localizes all the addresses in the record.
void LocalizeAddresses(TRecord &record);
};
/// Vertex specialization: walks both the incoming and the outgoing edge
/// collections of the vertex and replaces every stored vertex/edge address
/// with its local form when the storage can localize it.
template <>
inline void RemoteCache<Vertex>::LocalizeAddresses(Vertex &vertex) {
  const auto localize_all = [this](auto &edge_entries) {
    for (auto &entry : edge_entries) {
      entry.vertex = storage_.LocalizedAddressIfPossible(entry.vertex);
      entry.edge = storage_.LocalizedAddressIfPossible(entry.edge);
    }
  };

  localize_all(vertex.in_.storage());
  localize_all(vertex.out_.storage());
}
/// Edge specialization: replaces the edge's `from_` and `to_` vertex addresses
/// with whatever Storage::LocalizedAddressIfPossible returns for them.
template <>
inline void RemoteCache<Edge>::LocalizeAddresses(Edge &edge) {
edge.from_ = storage_.LocalizedAddressIfPossible(edge.from_);
edge.to_ = storage_.LocalizedAddressIfPossible(edge.to_);
}
} // namespace distributed

View File

@ -1,6 +1,7 @@
#pragma once
#include "data_structures/concurrent/concurrent_map.hpp"
#include "database/storage.hpp"
#include "distributed/remote_cache.hpp"
#include "distributed/remote_data_rpc_clients.hpp"
#include "storage/edge.hpp"
@ -19,14 +20,16 @@ class RemoteDataManager {
if (found != access.end()) return found->second;
return access
.emplace(tx_id, std::make_tuple(tx_id),
std::make_tuple(std::ref(remote_data_clients_)))
.emplace(
tx_id, std::make_tuple(tx_id),
std::make_tuple(std::ref(storage_), std::ref(remote_data_clients_)))
.first->second;
}
public:
RemoteDataManager(distributed::RemoteDataRpcClients &remote_data_clients)
: remote_data_clients_(remote_data_clients) {}
RemoteDataManager(database::Storage &storage,
distributed::RemoteDataRpcClients &remote_data_clients)
: storage_(storage), remote_data_clients_(remote_data_clients) {}
/// Gets or creates the remote vertex cache for the given transaction.
auto &Vertices(tx::transaction_id_t tx_id) {
@ -66,6 +69,7 @@ class RemoteDataManager {
}
private:
database::Storage &storage_;
RemoteDataRpcClients &remote_data_clients_;
ConcurrentMap<tx::transaction_id_t, RemoteCache<Vertex>> vertices_caches_;
ConcurrentMap<tx::transaction_id_t, RemoteCache<Edge>> edges_caches_;

View File

@ -95,6 +95,34 @@ class RemoteUpdatesRpcClients {
RaiseIfRemoteError(res->member);
}
/// Asks the worker with the given id to remove one of its edges. The same
/// RPC also takes care of the outgoing-edge entry of the `from` vertex,
/// because that vertex lives on the same worker as the edge. When the `to`
/// vertex is on that worker too, its incoming-edge entry is handled by this
/// call as well; otherwise the caller must additionally issue
/// RemoteRemoveInEdge.
///
/// The call CHECK-fails when the RPC yields no response, and
/// RaiseIfRemoteError is applied to the returned result.
void RemoteRemoveEdge(tx::transaction_id_t tx_id, int worker_id,
                      gid::Gid edge_gid, gid::Gid vertex_from_id,
                      storage::VertexAddress vertex_to_addr) {
  auto &&client_pool = worker_clients_.GetClientPool(worker_id);
  auto response = client_pool.Call<RemoteRemoveEdgeRpc>(RemoteRemoveEdgeData{
      tx_id, edge_gid, vertex_from_id, vertex_to_addr});
  CHECK(response) << "RemoteRemoveEdge RPC failed";
  RaiseIfRemoteError(response->member);
}
/// Asks the worker with the given id to drop an incoming-edge entry from one
/// of its vertices.
///
/// @param tx_id         transaction performing the removal.
/// @param worker_id     worker that receives the request.
/// @param vertex_id     gid of the vertex losing the in-edge.
/// @param edge_address  address of the removed edge; must be in remote
///                      (global) form, which is CHECK-ed before sending.
///
/// The call CHECK-fails when the RPC yields no response, and
/// RaiseIfRemoteError is applied to the returned result.
void RemoteRemoveInEdge(tx::transaction_id_t tx_id, int worker_id,
                        gid::Gid vertex_id,
                        storage::EdgeAddress edge_address) {
  CHECK(edge_address.is_remote())
      << "RemoteRemoveInEdge edge_address is local.";

  auto &&client_pool = worker_clients_.GetClientPool(worker_id);
  auto response = client_pool.Call<RemoteRemoveInEdgeRpc>(
      RemoteRemoveInEdgeData{tx_id, vertex_id, edge_address});
  CHECK(response) << "RemoteRemoveInEdge RPC failed";
  RaiseIfRemoteError(response->member);
}
/// Calls for the worker with the given ID to apply remote updates. Returns
/// the results of that operation.
RemoteUpdateResult RemoteUpdateApply(int worker_id,

View File

@ -158,4 +158,51 @@ RPC_SINGLE_MEMBER_MESSAGE(RemoteRemoveVertexRes, RemoteUpdateResult);
using RemoteRemoveVertexRpc =
communication::rpc::RequestResponse<RemoteRemoveVertexReq,
RemoteRemoveVertexRes>;
/// Payload carried by RemoteRemoveEdgeReq: identifies the edge to remove and
/// the two vertices it connects.
struct RemoteRemoveEdgeData {
// Transaction in which the removal happens.
tx::transaction_id_t tx_id;
// Gid of the edge being removed.
gid::Gid edge_id;
// Gid of the edge's `from` vertex.
gid::Gid vertex_from_id;
// Address of the edge's `to` vertex (carries a worker id, so the vertex may
// live on a different worker than the edge).
storage::VertexAddress vertex_to_address;
private:
friend class boost::serialization::access;
// Boost.Serialization hook; archives the fields in declaration order.
template <class TArchive>
void serialize(TArchive &ar, unsigned int) {
ar &tx_id;
ar &edge_id;
ar &vertex_from_id;
ar &vertex_to_address;
}
};
RPC_SINGLE_MEMBER_MESSAGE(RemoteRemoveEdgeReq, RemoteRemoveEdgeData);
RPC_SINGLE_MEMBER_MESSAGE(RemoteRemoveEdgeRes, RemoteUpdateResult);
using RemoteRemoveEdgeRpc =
communication::rpc::RequestResponse<RemoteRemoveEdgeReq,
RemoteRemoveEdgeRes>;
/// Payload carried by RemoteRemoveInEdgeReq: identifies a vertex and the edge
/// whose incoming-edge entry should be removed from it.
struct RemoteRemoveInEdgeData {
// Transaction in which the removal happens.
tx::transaction_id_t tx_id;
// Gid of the vertex that loses the in-edge.
gid::Gid vertex;
// Address of the edge being removed.
storage::EdgeAddress edge_address;
private:
friend class boost::serialization::access;
// Boost.Serialization hook; archives the fields in declaration order.
template <class TArchive>
void serialize(TArchive &ar, unsigned int) {
ar &tx_id;
ar &vertex;
ar &edge_address;
}
};
RPC_SINGLE_MEMBER_MESSAGE(RemoteRemoveInEdgeReq, RemoteRemoveInEdgeData);
RPC_SINGLE_MEMBER_MESSAGE(RemoteRemoveInEdgeRes, RemoteUpdateResult);
using RemoteRemoveInEdgeRpc =
communication::rpc::RequestResponse<RemoteRemoveInEdgeReq,
RemoteRemoveInEdgeRes>;
} // namespace distributed

View File

@ -123,7 +123,6 @@ class RemoteUpdatesRpcServer {
case database::StateDelta::Type::TRANSACTION_ABORT:
case database::StateDelta::Type::CREATE_VERTEX:
case database::StateDelta::Type::CREATE_EDGE:
case database::StateDelta::Type::REMOVE_EDGE:
case database::StateDelta::Type::BUILD_INDEX:
LOG(FATAL) << "Can only apply record update deltas for remote "
"graph element";
@ -138,32 +137,47 @@ class RemoteUpdatesRpcServer {
record_accessor.PropsSet(delta.property, delta.value);
break;
case database::StateDelta::Type::ADD_LABEL:
// It is only possible that ADD_LABEL gets called on a
// VertexAccessor.
reinterpret_cast<VertexAccessor &>(record_accessor)
.add_label(delta.label);
break;
case database::StateDelta::Type::REMOVE_LABEL: {
// It is only possible that REMOVE_LABEL gets called on a
// VertexAccessor.
case database::StateDelta::Type::REMOVE_LABEL:
reinterpret_cast<VertexAccessor &>(record_accessor)
.remove_label(delta.label);
} break;
break;
case database::StateDelta::Type::ADD_OUT_EDGE:
reinterpret_cast<Vertex &>(record_accessor.update())
.out_.emplace(dba.LocalizedAddress(delta.vertex_to_address),
dba.LocalizedAddress(delta.edge_address),
.out_.emplace(dba.db().storage().LocalizedAddressIfPossible(
delta.vertex_to_address),
dba.db().storage().LocalizedAddressIfPossible(
delta.edge_address),
delta.edge_type);
dba.wal().Emplace(delta);
break;
case database::StateDelta::Type::ADD_IN_EDGE:
reinterpret_cast<Vertex &>(record_accessor.update())
.in_.emplace(
dba.LocalizedAddress(delta.vertex_from_address),
dba.LocalizedAddress(delta.edge_address),
delta.edge_type);
.in_.emplace(dba.db().storage().LocalizedAddressIfPossible(
delta.vertex_from_address),
dba.db().storage().LocalizedAddressIfPossible(
delta.edge_address),
delta.edge_type);
dba.wal().Emplace(delta);
break;
case database::StateDelta::Type::REMOVE_EDGE:
// We only remove the edge as a result of this StateDelta,
// because the removal of edge from vertex in/out is performed
// in REMOVE_[IN/OUT]_EDGE deltas.
db_accessor_.RemoveEdge(
reinterpret_cast<EdgeAccessor &>(record_accessor), false,
false);
break;
case database::StateDelta::Type::REMOVE_OUT_EDGE:
reinterpret_cast<VertexAccessor &>(record_accessor)
.RemoveOutEdge(delta.edge_address);
break;
case database::StateDelta::Type::REMOVE_IN_EDGE:
reinterpret_cast<VertexAccessor &>(record_accessor)
.RemoveInEdge(delta.edge_address);
break;
}
} catch (const mvcc::SerializationError &) {
return RemoteUpdateResult::SERIALIZATION_ERROR;
@ -202,6 +216,8 @@ class RemoteUpdatesRpcServer {
case DeltaType::SET_PROPERTY_VERTEX:
case DeltaType::ADD_LABEL:
case DeltaType::REMOVE_LABEL:
case database::StateDelta::Type::REMOVE_OUT_EDGE:
case database::StateDelta::Type::REMOVE_IN_EDGE:
return std::make_unique<RemoteUpdateRes>(
GetUpdates(vertex_updates_, delta.transaction_id).Emplace(delta));
case DeltaType::SET_PROPERTY_EDGE:
@ -261,6 +277,20 @@ class RemoteUpdatesRpcServer {
GetUpdates(vertex_updates_, req.member.tx_id).Emplace(to_delta);
return std::make_unique<RemoteRemoveVertexRes>(result);
});
server.Register<RemoteRemoveEdgeRpc>(
[this](const RemoteRemoveEdgeReq &req) {
return std::make_unique<RemoteRemoveEdgeRes>(RemoveEdge(req.member));
});
server.Register<RemoteRemoveInEdgeRpc>(
[this](const RemoteRemoveInEdgeReq &req) {
auto data = req.member;
return std::make_unique<RemoteRemoveInEdgeRes>(
GetUpdates(vertex_updates_, data.tx_id)
.Emplace(database::StateDelta::RemoveInEdge(
data.tx_id, data.vertex, data.edge_address)));
});
}
/// Applies all existing updates for the given transaction ID. If there are
@ -324,16 +354,43 @@ class RemoteUpdatesRpcServer {
RemoteCreateResult CreateEdge(const RemoteCreateEdgeReqData &req) {
auto &dba = GetUpdates(edge_updates_, req.tx_id).db_accessor();
auto edge = dba.InsertOnlyEdge({req.from, db_.WorkerId()},
dba.LocalizedAddress(req.to), req.edge_type);
auto edge = dba.InsertOnlyEdge(
{req.from, db_.WorkerId()},
dba.db().storage().LocalizedAddressIfPossible(req.to), req.edge_type);
auto from_delta = database::StateDelta::AddOutEdge(
req.tx_id, req.from, req.to, dba.GlobalizedAddress(edge.address()),
req.edge_type);
req.tx_id, req.from, req.to,
dba.db().storage().GlobalizedAddress(edge.address()), req.edge_type);
auto result = GetUpdates(vertex_updates_, req.tx_id).Emplace(from_delta);
return {result, edge.gid()};
}
/// Queues the deltas needed to remove an edge owned by this worker:
///   1. the REMOVE_EDGE delta for the edge itself,
///   2. the REMOVE_OUT_EDGE delta for the `from` vertex (addressed with this
///      worker's id),
///   3. the REMOVE_IN_EDGE delta for the `to` vertex, but only when that
///      vertex's address carries this worker's id.
/// Processing stops at the first step whose result is not DONE and that
/// result is returned; otherwise the result of the last performed step is
/// returned.
RemoteUpdateResult RemoveEdge(const RemoteRemoveEdgeData &data) {
  // Step 1: the edge record.
  auto outcome =
      GetUpdates(edge_updates_, data.tx_id)
          .Emplace(database::StateDelta::RemoveEdge(data.tx_id, data.edge_id));
  if (outcome != RemoteUpdateResult::DONE) return outcome;

  // Step 2: the out-edge entry, always on this worker.
  auto out_edge_removal = database::StateDelta::RemoveOutEdge(
      data.tx_id, data.vertex_from_id, {data.edge_id, db_.WorkerId()});
  outcome = GetUpdates(vertex_updates_, data.tx_id).Emplace(out_edge_removal);
  if (outcome != RemoteUpdateResult::DONE) return outcome;

  // Step 3: the in-edge entry, skipped when the `to` vertex is elsewhere.
  if (data.vertex_to_address.worker_id() != db_.WorkerId()) return outcome;
  auto in_edge_removal = database::StateDelta::RemoveInEdge(
      data.tx_id, data.vertex_to_address.gid(),
      {data.edge_id, db_.WorkerId()});
  return GetUpdates(vertex_updates_, data.tx_id).Emplace(in_edge_removal);
}
};
template <>

View File

@ -143,22 +143,24 @@ bool RecoverSnapshot(const fs::path &snapshot_file, database::GraphDb &db,
}
}
auto vertex_transform_to_local_if_possible = [&db, &dba](
storage::VertexAddress &address) {
if (address.is_local()) return;
// If the worker id matches it should be a local apperance
if (address.worker_id() == db.WorkerId()) {
address = storage::VertexAddress(dba.LocalVertexAddress(address.gid()));
CHECK(address.is_local()) << "Address should be local but isn't";
}
};
auto vertex_transform_to_local_if_possible =
[&db, &dba](storage::VertexAddress &address) {
if (address.is_local()) return;
// If the worker id matches it should be a local appearance
if (address.worker_id() == db.WorkerId()) {
address = storage::VertexAddress(
dba.db().storage().LocalAddress<Vertex>(address.gid()));
CHECK(address.is_local()) << "Address should be local but isn't";
}
};
auto edge_transform_to_local_if_possible =
[&db, &dba](storage::EdgeAddress &address) {
if (address.is_local()) return;
// If the worker id matches it should be a local appearance
if (address.worker_id() == db.WorkerId()) {
address = storage::EdgeAddress(dba.LocalEdgeAddress(address.gid()));
address = storage::EdgeAddress(
dba.db().storage().LocalAddress<Edge>(address.gid()));
CHECK(address.is_local()) << "Address should be local but isn't";
}
};

View File

@ -54,6 +54,9 @@ class EdgeAccessor : public RecordAccessor<Edge> {
/** Returns an accessor to the originating Vertex of this edge. */
VertexAccessor from() const;
/** Returns (by value) the stored address of the originating Vertex of this
 * edge, i.e. the `from_` member, without building a VertexAccessor. */
auto from_addr() const { return from_; }
/** Checks if the given vertex is the source of this edge, without
* creating an additional accessor to perform the check. */
bool from_is(const VertexAccessor &v) const;
@ -61,6 +64,9 @@ class EdgeAccessor : public RecordAccessor<Edge> {
/** Returns an accessor to the destination Vertex of this edge. */
VertexAccessor to() const;
/** Returns (by value) the stored address of the destination Vertex of this
 * edge, i.e. the `to_` member, without building a VertexAccessor. */
auto to_addr() const { return to_; }
/** Checks if the given vertex is the destination of this edge, without
* creating an additional accessor to perform the check. */
bool to_is(const VertexAccessor &v) const;

View File

@ -130,6 +130,8 @@ class Edges {
/** Returns an Iterator constructed from the beginning of `storage_`. */
auto begin() const { return Iterator(storage_.begin()); }
/** Returns an Iterator constructed from the end of `storage_`. */
auto end() const { return Iterator(storage_.end()); }
/** Gives direct, mutable access (by reference) to the underlying `storage_`
 * member. */
auto &storage() { return storage_; }
/**
* Creates a beginning iterator that will skip edges whose destination
* vertex is not equal to the given vertex.

View File

@ -15,7 +15,8 @@ template <typename TRecord>
// Constructs an accessor for the record at `address`, remembering a pointer to
// the given GraphDbAccessor. The address is stored after being passed through
// an address-localization call.
// NOTE(review): two initializers for `address_` appear below; this looks like
// the old and the new form of one line shown together by the diff rendering.
// Confirm that only one of them is present in the real file.
RecordAccessor<TRecord>::RecordAccessor(AddressT address,
database::GraphDbAccessor &db_accessor)
: db_accessor_(&db_accessor),
address_(db_accessor.LocalizedAddress(address)) {}
address_(db_accessor.db().storage().LocalizedAddressIfPossible(address)) {
}
template <typename TRecord>
const PropertyValue &RecordAccessor<TRecord>::PropsAt(

View File

@ -54,16 +54,31 @@ const std::vector<storage::Label> &VertexAccessor::labels() const {
return this->current().labels_;
}
// Removes `edge` from the outgoing edges of this vertex and hands a
// RemoveOutEdge state delta to ProcessDelta.
void VertexAccessor::RemoveOutEdge(storage::EdgeAddress edge) {
  auto &accessor = db_accessor();

  // The delta is built with the globalized form of the edge address.
  const auto global_edge = accessor.db().storage().GlobalizedAddress(edge);
  auto delta = database::StateDelta::RemoveOutEdge(accessor.transaction_id(),
                                                   gid(), global_edge);

  // The delta is created first; only then is the updatable record obtained
  // and the (localized, when possible) edge address dropped from `out_`.
  auto &vertex = update();
  vertex.out_.RemoveEdge(
      accessor.db().storage().LocalizedAddressIfPossible(edge));

  ProcessDelta(delta);
}
// Removes `edge` from the incoming edges of this vertex and hands a
// RemoveInEdge state delta to ProcessDelta.
void VertexAccessor::RemoveInEdge(storage::EdgeAddress edge) {
  auto &accessor = db_accessor();

  // The delta is built with the globalized form of the edge address.
  const auto global_edge = accessor.db().storage().GlobalizedAddress(edge);
  auto delta = database::StateDelta::RemoveInEdge(accessor.transaction_id(),
                                                  gid(), global_edge);

  // The delta is created first; only then is the updatable record obtained
  // and the (localized, when possible) edge address dropped from `in_`.
  auto &vertex = update();
  vertex.in_.RemoveEdge(
      accessor.db().storage().LocalizedAddressIfPossible(edge));

  ProcessDelta(delta);
}
// Streams a textual form of the vertex into `os` and returns `os`: "V(", the
// label names (PrintIterable is given ":" as its delimiter argument), " {",
// the properties written as "name: value" (delimiter argument ", "), and a
// closing "})".
std::ostream &operator<<(std::ostream &os, const VertexAccessor &va) {
os << "V(";
utils::PrintIterable(os, va.labels(), ":", [&](auto &stream, auto label) {
stream << va.db_accessor().LabelName(label);
});
os << " {";
// NOTE(review): the two PrintIterable calls over va.Properties() below are
// token-for-token the same statement in two layouts; this looks like the old
// and the new formatting shown together by the diff rendering. Confirm that
// only one of them is present in the real file.
utils::PrintIterable(os, va.Properties(), ", ",
[&](auto &stream, const auto &pair) {
stream << va.db_accessor().PropertyName(pair.first)
<< ": " << pair.second;
});
utils::PrintIterable(os, va.Properties(), ", ", [&](auto &stream,
const auto &pair) {
stream << va.db_accessor().PropertyName(pair.first) << ": " << pair.second;
});
return os << "})";
}

View File

@ -137,6 +137,16 @@ class VertexAccessor : public RecordAccessor<Vertex> {
current().out_.end(), true, address(),
db_accessor());
}
/** Removes the given edge from the outgoing edges of this vertex. Note that
* this operation should always be accompanied by the removal of the edge from
* the incoming edges on the other side and edge deletion. */
void RemoveOutEdge(storage::EdgeAddress edge);
/** Removes the given edge from the incoming edges of this vertex. Note that
* this operation should always be accompanied by the removal of the edge from
* the outgoing edges on the other side and edge deletion. */
void RemoveInEdge(storage::EdgeAddress edge);
};
std::ostream &operator<<(std::ostream &, const VertexAccessor &);

View File

@ -5,6 +5,7 @@
#include "database/graph_db.hpp"
#include "database/graph_db_accessor.hpp"
#include "distributed/remote_updates_rpc_server.hpp"
#include "storage/address_types.hpp"
#include "transactions/engine_master.hpp"
@ -76,37 +77,21 @@ class DistributedGraphDbTest : public ::testing::Test {
}
/// Inserts an edge (on the 'from' side) and returns its global address.
/// Both vertex addresses must be remote (global) ones; this is enforced with a
/// CHECK. Before committing, Apply is called for the transaction on the remote
/// updates server of the master and of workers 1 and 2.
/// NOTE(review): this block interleaves what look like the old and the new
/// implementation (two parameter lists, two CHECKs, two return statements) as
/// shown by the diff rendering. Confirm against the real file.
auto InsertEdge(storage::VertexAddress from, storage::VertexAddress to,
auto InsertEdge(storage::VertexAddress from_addr,
storage::VertexAddress to_addr,
const std::string &edge_type_name) {
CHECK(from.is_remote() && to.is_remote())
CHECK(from_addr.is_remote() && to_addr.is_remote())
<< "Distributed test InsertEdge only takes global addresses";
auto db_for_vertex = [this](const auto &vertex) -> database::GraphDb & {
if (vertex.worker_id()) return worker(vertex.worker_id());
return master();
};
database::GraphDbAccessor dba(db_for_vertex(from));
auto from_v = dba.FindVertexChecked(from.gid(), false);
auto edge_type = dba.EdgeType(edge_type_name);
// If 'to' is on the same worker as 'from', create and send local.
if (to.worker_id() == from.worker_id()) {
auto to_v = dba.FindVertexChecked(to.gid(), false);
auto r_val = dba.InsertEdge(from_v, to_v, edge_type).GlobalAddress();
dba.Commit();
return r_val;
}
// 'to' is not on the same worker as 'from'
auto edge_ga = dba.InsertOnlyEdge(from, to, edge_type,
dba.db().storage().EdgeGenerator().Next())
.GlobalAddress();
from_v.update().out_.emplace(to, edge_ga, edge_type);
database::GraphDbAccessor dba_to(db_for_vertex(to), dba.transaction_id());
auto to_v = dba_to.FindVertexChecked(to.gid(), false);
to_v.update().in_.emplace(from, edge_ga, edge_type);
database::GraphDbAccessor dba{master()};
VertexAccessor from{from_addr, dba};
VertexAccessor to{to_addr, dba};
auto r_val =
dba.InsertEdge(from, to, dba.EdgeType(edge_type_name)).GlobalAddress();
master().remote_updates_server().Apply(dba.transaction_id());
worker(1).remote_updates_server().Apply(dba.transaction_id());
worker(2).remote_updates_server().Apply(dba.transaction_id());
dba.Commit();
return edge_ga;
return r_val;
}
auto VertexCount(database::GraphDb &db) {
@ -115,6 +100,12 @@ class DistributedGraphDbTest : public ::testing::Test {
return std::distance(vertices.begin(), vertices.end());
};
/// Returns the number of edges reported by a fresh accessor on `db`.
/// NOTE(review): the `false` passed to Edges() is presumably the
/// "current state" flag; confirm against GraphDbAccessor::Edges.
auto EdgeCount(database::GraphDb &db) {
  database::GraphDbAccessor accessor{db};
  auto all_edges = accessor.Edges(false);
  return std::distance(all_edges.begin(), all_edges.end());
};
private:
std::unique_ptr<database::Master> master_;
std::vector<std::unique_ptr<WorkerInThread>> workers_;

View File

@ -281,12 +281,6 @@ class DistributedEdgeCreateTest : public DistributedGraphDbTest {
dba.Commit();
}
/// Returns, as an int, the number of edges reported by a fresh accessor on
/// `db`.
/// NOTE(review): the `false` passed to Edges() is presumably the
/// "current state" flag; confirm against GraphDbAccessor::Edges.
int EdgeCount(database::GraphDb &db) {
  database::GraphDbAccessor accessor{db};
  auto all_edges = accessor.Edges(false);
  return std::distance(all_edges.begin(), all_edges.end());
};
void CheckCounts(int master_count, int worker1_count, int worker2_count) {
EXPECT_EQ(EdgeCount(master()), master_count);
EXPECT_EQ(EdgeCount(worker(1)), worker1_count);
@ -342,8 +336,6 @@ class DistributedEdgeCreateTest : public DistributedGraphDbTest {
CheckState(worker(1), edge_worker == 1, from_addr, to_addr);
CheckState(worker(2), edge_worker == 2, from_addr, to_addr);
}
/** Forwards to DistributedGraphDbTest::TearDown(); no fixture-specific cleanup
 * is performed here. */
void TearDown() override { DistributedGraphDbTest::TearDown(); }
};
TEST_F(DistributedEdgeCreateTest, LocalRemote) {
@ -370,3 +362,92 @@ TEST_F(DistributedEdgeCreateTest, RemoteRemoteCycle) {
CreateEdge(master(), w1_a, w1_a);
CheckAll(w1_a, w1_a);
}
class DistributedEdgeRemoveTest : public DistributedGraphDbTest {
protected:
storage::VertexAddress from_addr;
storage::VertexAddress to_addr;
storage::EdgeAddress edge_addr;
void Create(database::GraphDb &from_db, database::GraphDb &to_db) {
from_addr = InsertVertex(from_db);
to_addr = InsertVertex(to_db);
edge_addr = InsertEdge(from_addr, to_addr, "edge_type");
}
void Delete(database::GraphDb &db) {
database::GraphDbAccessor dba{db};
EdgeAccessor edge{edge_addr, dba};
dba.RemoveEdge(edge);
master().remote_updates_server().Apply(dba.transaction_id());
worker(1).remote_updates_server().Apply(dba.transaction_id());
worker(2).remote_updates_server().Apply(dba.transaction_id());
dba.Commit();
}
template <typename TIterable>
auto Size(TIterable iterable) {
return std::distance(iterable.begin(), iterable.end());
};
void CheckCreation() {
auto wid = from_addr.worker_id();
ASSERT_TRUE(wid >= 0 && wid < 3);
ASSERT_EQ(EdgeCount(master()), wid == 0);
ASSERT_EQ(EdgeCount(worker(1)), wid == 1);
ASSERT_EQ(EdgeCount(worker(2)), wid == 2);
database::GraphDbAccessor dba{master()};
VertexAccessor from{from_addr, dba};
EXPECT_EQ(Size(from.out()), 1);
EXPECT_EQ(Size(from.in()), 0);
VertexAccessor to{to_addr, dba};
EXPECT_EQ(Size(to.out()), 0);
EXPECT_EQ(Size(to.in()), 1);
}
void CheckDeletion() {
EXPECT_EQ(EdgeCount(master()), 0);
EXPECT_EQ(EdgeCount(worker(1)), 0);
EXPECT_EQ(EdgeCount(worker(2)), 0);
database::GraphDbAccessor dba{master()};
VertexAccessor from{from_addr, dba};
EXPECT_EQ(Size(from.out()), 0);
EXPECT_EQ(Size(from.in()), 0);
VertexAccessor to{to_addr, dba};
EXPECT_EQ(Size(to.out()), 0);
EXPECT_EQ(Size(to.in()), 0);
}
};
// The two vertices are inserted through worker(1) and worker(2); the edge is
// then removed using an accessor on master().
TEST_F(DistributedEdgeRemoveTest, DifferentVertexOwnersRemoteDelete) {
Create(worker(1), worker(2));
CheckCreation();
Delete(master());
CheckDeletion();
}
// The two vertices are inserted through worker(1) and worker(2); the edge is
// then removed using an accessor on worker(1), the database the 'from' vertex
// was inserted through.
TEST_F(DistributedEdgeRemoveTest, DifferentVertexOwnersFromDelete) {
Create(worker(1), worker(2));
CheckCreation();
Delete(worker(1));
CheckDeletion();
}
// The two vertices are inserted through worker(1) and worker(2); the edge is
// then removed using an accessor on worker(2), the database the 'to' vertex
// was inserted through.
TEST_F(DistributedEdgeRemoveTest, DifferentVertexOwnersToDelete) {
Create(worker(1), worker(2));
CheckCreation();
Delete(worker(2));
CheckDeletion();
}
// Both vertices are inserted through worker(1); the edge is then removed using
// an accessor on worker(2), which neither vertex was inserted through.
TEST_F(DistributedEdgeRemoveTest, SameVertexOwnersRemoteDelete) {
Create(worker(1), worker(1));
CheckCreation();
Delete(worker(2));
CheckDeletion();
}