Remote vertex with gid creation

Reviewers: buda

Reviewed By: buda

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1401
This commit is contained in:
Dominik Gleich 2018-05-22 13:14:48 +02:00
parent 639e68cf1d
commit 7d161319f0
8 changed files with 52 additions and 30 deletions

View File

@ -79,13 +79,16 @@ VertexAccessor GraphDbAccessor::InsertVertex(
VertexAccessor GraphDbAccessor::InsertVertexIntoRemote(
int worker_id, const std::vector<storage::Label> &labels,
const std::unordered_map<storage::Property, query::TypedValue>
&properties) {
const std::unordered_map<storage::Property, query::TypedValue> &properties,
std::experimental::optional<gid::Gid> requested_gid) {
CHECK(worker_id != db().WorkerId())
<< "Not allowed to call InsertVertexIntoRemote for local worker";
gid::Gid gid = db().updates_clients().CreateVertex(
worker_id, transaction_id(), labels, properties);
worker_id, transaction_id(), labels, properties, requested_gid);
CHECK(!requested_gid || *requested_gid == gid)
<< "Unable to assign requested vertex gid";
auto vertex = std::make_unique<Vertex>();
vertex->labels_ = labels;

View File

@ -78,12 +78,15 @@ class GraphDbAccessor {
VertexAccessor InsertVertex(std::experimental::optional<gid::Gid>
requested_gid = std::experimental::nullopt);
/** Creates a new Vertex on the given worker. It is NOT allowed to call this
* function with this worker's id. */
/** Creates a new Vertex on the given worker with the `requested_gid` if
* specified. It is NOT allowed to call this function with this worker's id.
*/
VertexAccessor InsertVertexIntoRemote(
int worker_id, const std::vector<storage::Label> &labels,
const std::unordered_map<storage::Property, query::TypedValue>
&properties);
&properties,
std::experimental::optional<gid::Gid> requested_gid =
std::experimental::nullopt);
/**
* Removes the vertex of the given accessor. If the vertex has any outgoing or

View File

@ -23,7 +23,7 @@ void RaiseIfRemoteError(UpdateResult result) {
break;
}
}
}
} // namespace
UpdateResult UpdatesRpcClients::Update(int worker_id,
const database::StateDelta &delta) {
@ -35,10 +35,10 @@ UpdateResult UpdatesRpcClients::Update(int worker_id,
gid::Gid UpdatesRpcClients::CreateVertex(
int worker_id, tx::TransactionId tx_id,
const std::vector<storage::Label> &labels,
const std::unordered_map<storage::Property, query::TypedValue>
&properties) {
const std::unordered_map<storage::Property, query::TypedValue> &properties,
std::experimental::optional<gid::Gid> requested_gid) {
auto res = worker_clients_.GetClientPool(worker_id).Call<CreateVertexRpc>(
CreateVertexReqData{tx_id, labels, properties});
CreateVertexReqData{tx_id, labels, properties, requested_gid});
CHECK(res) << "CreateVertexRpc failed on worker: " << worker_id;
CHECK(res->member.result == UpdateResult::DONE)
<< "Remote Vertex creation result not UpdateResult::DONE";
@ -59,8 +59,7 @@ storage::EdgeAddress UpdatesRpcClients::CreateEdge(
return {res->member.gid, from_worker};
}
void UpdatesRpcClients::AddInEdge(tx::TransactionId tx_id,
VertexAccessor &from,
void UpdatesRpcClients::AddInEdge(tx::TransactionId tx_id, VertexAccessor &from,
storage::EdgeAddress edge_address,
VertexAccessor &to,
storage::EdgeType edge_type) {

View File

@ -26,20 +26,21 @@ class UpdatesRpcClients {
/// Sends an update delta to the given worker.
UpdateResult Update(int worker_id, const database::StateDelta &delta);
/// Creates a vertex on the given worker and returns it's id.
gid::Gid CreateVertex(
int worker_id, tx::TransactionId tx_id,
const std::vector<storage::Label> &labels,
const std::unordered_map<storage::Property, query::TypedValue>
&properties);
/// Creates a vertex on the given worker and returns it's id (tries to create
/// vertex with requested_gid first).
gid::Gid CreateVertex(int worker_id, tx::TransactionId tx_id,
const std::vector<storage::Label> &labels,
const std::unordered_map<storage::Property,
query::TypedValue> &properties,
std::experimental::optional<gid::Gid> requested_gid);
/// Creates an edge on the given worker and returns it's address. If the `to`
/// vertex is on the same worker as `from`, then all remote CRUD will be
/// handled by a call to this function. Otherwise a separate call to
/// `AddInEdge` might be necessary. Throws all the exceptions that can
/// occur remotely as a result of updating a vertex.
storage::EdgeAddress CreateEdge(tx::TransactionId tx_id,
VertexAccessor &from, VertexAccessor &to,
storage::EdgeAddress CreateEdge(tx::TransactionId tx_id, VertexAccessor &from,
VertexAccessor &to,
storage::EdgeType edge_type);
/// Adds the edge with the given address to the `to` vertex as an incoming
@ -61,8 +62,8 @@ class UpdatesRpcClients {
gid::Gid vertex_from_id,
storage::VertexAddress vertex_to_addr);
void RemoveInEdge(tx::TransactionId tx_id, int worker_id,
gid::Gid vertex_id, storage::EdgeAddress edge_address);
void RemoveInEdge(tx::TransactionId tx_id, int worker_id, gid::Gid vertex_id,
storage::EdgeAddress edge_address);
/// Calls for all the workers (except the given one) to apply their updates
/// and returns the future results.

View File

@ -50,6 +50,7 @@ struct CreateVertexReqData {
tx::TransactionId tx_id;
std::vector<storage::Label> labels;
std::unordered_map<storage::Property, query::TypedValue> properties;
std::experimental::optional<gid::Gid> requested_gid;
private:
friend class boost::serialization::access;
@ -63,6 +64,7 @@ struct CreateVertexReqData {
ar << kv.first;
utils::SaveTypedValue(ar, kv.second);
}
ar << requested_gid;
}
template <class TArchive>
@ -78,6 +80,7 @@ struct CreateVertexReqData {
utils::LoadTypedValue(ar, tv);
properties.emplace(p, std::move(tv));
}
ar >> requested_gid;
}
BOOST_SERIALIZATION_SPLIT_MEMBER()
};

View File

@ -61,9 +61,9 @@ UpdateResult UpdatesRpcServer::TransactionUpdates<TRecordAccessor>::Emplace(
template <typename TRecordAccessor>
gid::Gid UpdatesRpcServer::TransactionUpdates<TRecordAccessor>::CreateVertex(
const std::vector<storage::Label> &labels,
const std::unordered_map<storage::Property, query::TypedValue>
&properties) {
auto result = db_accessor_.InsertVertex();
const std::unordered_map<storage::Property, query::TypedValue> &properties,
std::experimental::optional<gid::Gid> requested_gid) {
auto result = db_accessor_.InsertVertex(requested_gid);
for (auto &label : labels) result.add_label(label);
for (auto &kv : properties) result.PropsSet(kv.first, kv.second);
std::lock_guard<SpinLock> guard{lock_};
@ -201,7 +201,8 @@ UpdatesRpcServer::UpdatesRpcServer(database::GraphDb &db,
server.Register<CreateVertexRpc>([this](const CreateVertexReq &req) {
gid::Gid gid = GetUpdates(vertex_updates_, req.member.tx_id)
.CreateVertex(req.member.labels, req.member.properties);
.CreateVertex(req.member.labels, req.member.properties,
req.member.requested_gid);
return std::make_unique<CreateVertexRes>(
CreateResult{UpdateResult::DONE, gid});
});

View File

@ -40,11 +40,13 @@ class UpdatesRpcServer {
/// fail-fast on serialization and update-after-delete errors.
UpdateResult Emplace(const database::StateDelta &delta);
/// Creates a new vertex and returns it's gid.
/// Creates a new vertex with requested_gid if possible and returns it's
/// gid.
gid::Gid CreateVertex(
const std::vector<storage::Label> &labels,
const std::unordered_map<storage::Property, query::TypedValue>
&properties);
&properties,
std::experimental::optional<gid::Gid> requested_gid);
/// Creates a new edge and returns it's gid. Does not update vertices at the
/// end of the edge.
@ -84,8 +86,7 @@ class UpdatesRpcServer {
database::GraphDb &db_;
template <typename TAccessor>
using MapT =
ConcurrentMap<tx::TransactionId, TransactionUpdates<TAccessor>>;
using MapT = ConcurrentMap<tx::TransactionId, TransactionUpdates<TAccessor>>;
MapT<VertexAccessor> vertex_updates_;
MapT<EdgeAccessor> edge_updates_;

View File

@ -133,6 +133,17 @@ TEST_F(DistributedGraphDbTest, CreateVertexWithData) {
}
}
// Checks if it's possible to request a specific gid for vertex creation
TEST_F(DistributedGraphDbTest, CreateVertexWithGid) {
std::experimental::optional<gid::Gid> gid(1337);
{
database::GraphDbAccessor dba{worker(1)};
auto v = dba.InsertVertexIntoRemote(2, {}, {}, gid);
EXPECT_EQ(v.gid(), *gid);
dba.Commit();
}
}
// Checks if expiring a local record for a local update before applying a remote
// update delta causes a problem
TEST_F(DistributedGraphDbTest, UpdateVertexRemoteAndLocal) {