Remote edge with gid creation

Reviewers: buda

Reviewed By: buda

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1402
This commit is contained in:
Dominik Gleich 2018-05-22 13:08:47 +02:00
parent 63f90a453d
commit adda7d1200
7 changed files with 43 additions and 22 deletions

View File

@ -414,7 +414,10 @@ EdgeAccessor GraphDbAccessor::InsertEdge(
} else {
edge_address = db().updates_clients().CreateEdge(transaction_id(), from, to,
edge_type);
edge_type, requested_gid);
CHECK(!requested_gid || *requested_gid == edge_address.gid())
<< "Unable to assign requested edge gid";
from_updated = db().data_manager()
.Elements<Vertex>(transaction_id())

View File

@ -47,13 +47,15 @@ gid::Gid UpdatesRpcClients::CreateVertex(
storage::EdgeAddress UpdatesRpcClients::CreateEdge(
tx::TransactionId tx_id, VertexAccessor &from, VertexAccessor &to,
storage::EdgeType edge_type) {
storage::EdgeType edge_type,
std::experimental::optional<gid::Gid> requested_gid) {
CHECK(from.address().is_remote()) << "In CreateEdge `from` must be remote";
int from_worker = from.address().worker_id();
auto res = worker_clients_.GetClientPool(from_worker)
.Call<CreateEdgeRpc>(CreateEdgeReqData{
from.gid(), to.GlobalAddress(), edge_type, tx_id});
auto res =
worker_clients_.GetClientPool(from_worker)
.Call<CreateEdgeRpc>(CreateEdgeReqData{
from.gid(), to.GlobalAddress(), edge_type, tx_id, requested_gid});
CHECK(res) << "CreateEdge RPC failed on worker: " << from_worker;
RaiseIfRemoteError(res->member.result);
return {res->member.gid, from_worker};

View File

@ -34,14 +34,16 @@ class UpdatesRpcClients {
query::TypedValue> &properties,
std::experimental::optional<gid::Gid> requested_gid);
/// Creates an edge on the given worker and returns it's address. If the `to`
/// vertex is on the same worker as `from`, then all remote CRUD will be
/// handled by a call to this function. Otherwise a separate call to
/// `AddInEdge` might be necessary. Throws all the exceptions that can
/// occur remotely as a result of updating a vertex.
storage::EdgeAddress CreateEdge(tx::TransactionId tx_id, VertexAccessor &from,
VertexAccessor &to,
storage::EdgeType edge_type);
/// Creates an edge with gid equal to `requested gid` (if possible) on the
/// given worker and returns it's address. If the `to` vertex is on the same
/// worker as `from`, then all remote CRUD will be handled by a call to this
/// function. Otherwise a separate call to `AddInEdge` might be necessary.
/// Throws all the exceptions that can occur remotely as a result of updating
/// a vertex.
storage::EdgeAddress CreateEdge(
tx::TransactionId tx_id, VertexAccessor &from, VertexAccessor &to,
storage::EdgeType edge_type,
std::experimental::optional<gid::Gid> requested_gid);
/// Adds the edge with the given address to the `to` vertex as an incoming
/// edge. Only used when `to` is remote and not on the same worker as `from`.

View File

@ -95,6 +95,7 @@ struct CreateEdgeReqData {
storage::VertexAddress to;
storage::EdgeType edge_type;
tx::TransactionId tx_id;
std::experimental::optional<gid::Gid> requested_gid;
private:
friend class boost::serialization::access;
@ -105,6 +106,7 @@ struct CreateEdgeReqData {
ar &to;
ar &edge_type;
ar &tx_id;
ar &requested_gid;
}
};

View File

@ -74,12 +74,14 @@ gid::Gid UpdatesRpcServer::TransactionUpdates<TRecordAccessor>::CreateVertex(
template <typename TRecordAccessor>
gid::Gid UpdatesRpcServer::TransactionUpdates<TRecordAccessor>::CreateEdge(
gid::Gid from, storage::VertexAddress to, storage::EdgeType edge_type) {
gid::Gid from, storage::VertexAddress to, storage::EdgeType edge_type,
std::experimental::optional<gid::Gid> requested_gid) {
auto &db = db_accessor_.db();
auto from_addr = db.storage().LocalizedAddressIfPossible(
storage::VertexAddress(from, db.WorkerId()));
auto to_addr = db.storage().LocalizedAddressIfPossible(to);
auto edge = db_accessor_.InsertOnlyEdge(from_addr, to_addr, edge_type);
auto edge =
db_accessor_.InsertOnlyEdge(from_addr, to_addr, edge_type, requested_gid);
std::lock_guard<SpinLock> guard{lock_};
deltas_.emplace(edge.gid(),
std::make_pair(edge, std::vector<database::StateDelta>{}));
@ -301,8 +303,9 @@ UpdatesRpcServer::TransactionUpdates<TAccessor> &UpdatesRpcServer::GetUpdates(
}
CreateResult UpdatesRpcServer::CreateEdge(const CreateEdgeReqData &req) {
auto gid = GetUpdates(edge_updates_, req.tx_id)
.CreateEdge(req.from, req.to, req.edge_type);
auto gid =
GetUpdates(edge_updates_, req.tx_id)
.CreateEdge(req.from, req.to, req.edge_type, req.requested_gid);
auto from_delta = database::StateDelta::AddOutEdge(
req.tx_id, req.from, req.to, {gid, db_.WorkerId()}, req.edge_type);

View File

@ -48,10 +48,12 @@ class UpdatesRpcServer {
&properties,
std::experimental::optional<gid::Gid> requested_gid);
/// Creates a new edge and returns it's gid. Does not update vertices at the
/// end of the edge.
/// Creates a new edge and returns it's gid (tries to create it with
/// requested gid if possible (tries to create it with requested gid if
/// possible)). Does not update vertices at the end of the edge.
gid::Gid CreateEdge(gid::Gid from, storage::VertexAddress to,
storage::EdgeType edge_type);
storage::EdgeType edge_type,
std::experimental::optional<gid::Gid> requested_gid);
/// Applies all the deltas on the record.
UpdateResult Apply();

View File

@ -378,15 +378,18 @@ class DistributedEdgeCreateTest : public DistributedGraphDbTest {
}
void CreateEdge(database::GraphDb &creator, storage::VertexAddress from_addr,
storage::VertexAddress to_addr) {
storage::VertexAddress to_addr,
std::experimental::optional<gid::Gid> requested_gid =
std::experimental::nullopt) {
CHECK(from_addr.is_remote() && to_addr.is_remote())
<< "Local address given to CreateEdge";
database::GraphDbAccessor dba{creator};
auto edge_type = dba.EdgeType("et");
VertexAccessor v1{from_addr, dba};
VertexAccessor v2{to_addr, dba};
auto edge = dba.InsertEdge(v1, v2, edge_type);
auto edge = dba.InsertEdge(v1, v2, edge_type, requested_gid);
e_ga = edge.GlobalAddress();
EXPECT_TRUE(!requested_gid || *requested_gid == e_ga.gid());
for (auto &kv : props) edge.PropsSet(dba.Property(kv.first), kv.second);
@ -483,6 +486,10 @@ TEST_F(DistributedEdgeCreateTest, RemoteRemoteCycle) {
CheckAll(w1_a, w1_a);
}
TEST_F(DistributedEdgeCreateTest, EdgeWithGid) {
CreateEdge(worker(1), w1_a, w2_a, 1337);
}
class DistributedEdgeRemoveTest : public DistributedGraphDbTest {
protected:
storage::VertexAddress from_addr;