Implement remote create (storage, RPC, not operator)
Summary: Implementation of remote vertex and edge creation. This diff addresses the creation API (`GraphDbAccessor::InsertEdge`, `GraphDbAccessor::InsertRemoteVertex`) and the necessary RPC and `RemoteCache` stuff. What is missing for full remote creation support are `query::plan::operator` changes that are expected to minor. Pushing this diff as it's large enough, operator and end to end tests in the next. Also, the naming of existing structures and files is confusing (update refering to both updates and created, `results` used too often etc.). I will address this too, but feel free to comment on bad naming. Reviewers: dgleich, teon.banek, msantl Reviewed By: dgleich Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D1210
This commit is contained in:
parent
2055176139
commit
0c40c67ac2
@ -90,3 +90,15 @@ BOOST_CLASS_EXPORT(distributed::RemoteUpdateReq);
|
||||
BOOST_CLASS_EXPORT(distributed::RemoteUpdateRes);
|
||||
BOOST_CLASS_EXPORT(distributed::RemoteUpdateApplyReq);
|
||||
BOOST_CLASS_EXPORT(distributed::RemoteUpdateApplyRes);
|
||||
|
||||
// Remote creates
|
||||
BOOST_CLASS_EXPORT(distributed::RemoteCreateResult);
|
||||
BOOST_CLASS_EXPORT(distributed::RemoteCreateVertexReq);
|
||||
BOOST_CLASS_EXPORT(distributed::RemoteCreateVertexReqData);
|
||||
BOOST_CLASS_EXPORT(distributed::RemoteCreateVertexRes);
|
||||
BOOST_CLASS_EXPORT(distributed::RemoteCreateEdgeReqData);
|
||||
BOOST_CLASS_EXPORT(distributed::RemoteCreateEdgeReq);
|
||||
BOOST_CLASS_EXPORT(distributed::RemoteCreateEdgeRes);
|
||||
BOOST_CLASS_EXPORT(distributed::RemoteAddInEdgeReqData);
|
||||
BOOST_CLASS_EXPORT(distributed::RemoteAddInEdgeReq);
|
||||
BOOST_CLASS_EXPORT(distributed::RemoteAddInEdgeRes);
|
||||
|
@ -6,6 +6,8 @@
|
||||
#include "database/graph_db_accessor.hpp"
|
||||
#include "database/state_delta.hpp"
|
||||
#include "distributed/index_rpc_messages.hpp"
|
||||
#include "distributed/remote_data_manager.hpp"
|
||||
#include "distributed/remote_updates_rpc_clients.hpp"
|
||||
#include "storage/address_types.hpp"
|
||||
#include "storage/edge.hpp"
|
||||
#include "storage/edge_accessor.hpp"
|
||||
@ -16,6 +18,35 @@
|
||||
|
||||
namespace database {
|
||||
|
||||
#define LOCALIZED_ADDRESS_SPECIALIZATION(type) \
|
||||
template <> \
|
||||
storage::type##Address GraphDbAccessor::LocalizedAddress( \
|
||||
storage::type##Address address) const { \
|
||||
if (address.is_local()) return address; \
|
||||
if (address.worker_id() == db().WorkerId()) { \
|
||||
return Local##type##Address(address.gid()); \
|
||||
} \
|
||||
return address; \
|
||||
}
|
||||
|
||||
LOCALIZED_ADDRESS_SPECIALIZATION(Vertex)
|
||||
LOCALIZED_ADDRESS_SPECIALIZATION(Edge)
|
||||
|
||||
#undef LOCALIZED_ADDRESS_SPECIALIZATION
|
||||
|
||||
#define GLOBALIZED_ADDRESS_SPECIALIZATION(type) \
|
||||
template <> \
|
||||
storage::type##Address GraphDbAccessor::GlobalizedAddress( \
|
||||
storage::type##Address address) const { \
|
||||
if (address.is_remote()) return address; \
|
||||
return {address.local()->gid_, db().WorkerId()}; \
|
||||
}
|
||||
|
||||
GLOBALIZED_ADDRESS_SPECIALIZATION(Vertex)
|
||||
GLOBALIZED_ADDRESS_SPECIALIZATION(Edge)
|
||||
|
||||
#undef GLOBALIZED_ADDRESS_SPECIALIZATION
|
||||
|
||||
GraphDbAccessor::GraphDbAccessor(GraphDb &db)
|
||||
: db_(db),
|
||||
transaction_(*db.tx_engine().Begin()),
|
||||
@ -76,6 +107,26 @@ VertexAccessor GraphDbAccessor::InsertVertex(
|
||||
return VertexAccessor(vertex_vlist, *this);
|
||||
}
|
||||
|
||||
VertexAccessor GraphDbAccessor::InsertVertexIntoRemote(
|
||||
int worker_id, const std::vector<storage::Label> &labels,
|
||||
const std::unordered_map<storage::Property, query::TypedValue>
|
||||
&properties) {
|
||||
CHECK(worker_id != db().WorkerId())
|
||||
<< "Not allowed to call InsertVertexIntoRemote for local worker";
|
||||
|
||||
gid::Gid gid = db().remote_updates_clients().RemoteCreateVertex(
|
||||
worker_id, transaction_id(), labels, properties);
|
||||
|
||||
auto vertex = std::make_unique<Vertex>();
|
||||
vertex->labels_ = labels;
|
||||
for (auto &kv : properties) vertex->properties_.set(kv.first, kv.second);
|
||||
|
||||
db().remote_data_manager()
|
||||
.Vertices(transaction_id())
|
||||
.emplace(gid, nullptr, std::move(vertex));
|
||||
return VertexAccessor({gid, worker_id}, *this);
|
||||
}
|
||||
|
||||
std::experimental::optional<VertexAccessor> GraphDbAccessor::FindVertex(
|
||||
gid::Gid gid, bool current_state) {
|
||||
VertexAccessor record_accessor(LocalVertexAddress(gid), *this);
|
||||
@ -375,55 +426,79 @@ EdgeAccessor GraphDbAccessor::InsertEdge(
|
||||
VertexAccessor &from, VertexAccessor &to, storage::EdgeType edge_type,
|
||||
std::experimental::optional<gid::Gid> requested_gid) {
|
||||
DCHECK(!commited_ && !aborted_) << "Accessor committed or aborted";
|
||||
// An edge is created on the worker of it's "from" vertex.
|
||||
if (!from.is_local()) {
|
||||
LOG(ERROR) << "Remote edge insertion not implemented.";
|
||||
// TODO call remote InsertEdge(...)->gid. Possible outcomes are successful
|
||||
// creation or an error (serialization, timeout). If successful, create an
|
||||
// EdgeAccessor and return it. The remote InsertEdge(...) will be calling
|
||||
// remote Connect(...) if "to" is not local to it.
|
||||
|
||||
// The address of an edge we'll create.
|
||||
storage::EdgeAddress edge_address;
|
||||
|
||||
Vertex *from_updated;
|
||||
if (from.is_local()) {
|
||||
auto gid = db_.storage().edge_generator_.Next(requested_gid);
|
||||
edge_address = new mvcc::VersionList<Edge>(
|
||||
transaction_, gid, from.address(), to.address(), edge_type);
|
||||
// We need to insert edge_address to edges_ before calling update since
|
||||
// update can throw and edge_vlist will not be garbage collected if it is
|
||||
// not in edges_ skiplist.
|
||||
bool success =
|
||||
db_.storage().edges_.access().insert(gid, edge_address.local()).second;
|
||||
CHECK(success) << "Attempting to insert an edge with an existing GID: "
|
||||
<< gid;
|
||||
|
||||
from.SwitchNew();
|
||||
from_updated = &from.update();
|
||||
|
||||
// TODO when preparing WAL for distributed, most likely never use
|
||||
// `CREATE_EDGE`, but always have it split into 3 parts (edge insertion,
|
||||
// in/out modification).
|
||||
wal().Emplace(database::StateDelta::CreateEdge(
|
||||
transaction_.id_, gid, from.gid(), to.gid(), edge_type,
|
||||
EdgeTypeName(edge_type)));
|
||||
|
||||
} else {
|
||||
edge_address = db().remote_updates_clients().RemoteCreateEdge(
|
||||
transaction_id(), from, to, edge_type);
|
||||
|
||||
from_updated = db().remote_data_manager()
|
||||
.Vertices(transaction_id())
|
||||
.FindNew(from.gid());
|
||||
|
||||
// Create an Edge and insert it into the RemoteCache so we see it locally.
|
||||
db().remote_data_manager()
|
||||
.Edges(transaction_id())
|
||||
.emplace(
|
||||
edge_address.gid(), nullptr,
|
||||
std::make_unique<Edge>(from.address(), to.address(), edge_type));
|
||||
}
|
||||
auto gid = db_.storage().edge_generator_.Next(requested_gid);
|
||||
auto edge_vlist = new mvcc::VersionList<Edge>(
|
||||
transaction_, gid, from.address(), to.address(), edge_type);
|
||||
// We need to insert edge_vlist to edges_ before calling update since update
|
||||
// can throw and edge_vlist will not be garbage collected if it is not in
|
||||
// edges_ skiplist.
|
||||
bool success = db_.storage().edges_.access().insert(gid, edge_vlist).second;
|
||||
CHECK(success) << "Attempting to insert an edge with an existing GID: "
|
||||
<< gid;
|
||||
from_updated->out_.emplace(to.address(), edge_address, edge_type);
|
||||
|
||||
// ensure that the "from" accessor has the latest version
|
||||
from.SwitchNew();
|
||||
from.update().out_.emplace(to.address(), edge_vlist, edge_type);
|
||||
|
||||
// It is possible that the "to" accessor is remote.
|
||||
Vertex *to_updated;
|
||||
if (to.is_local()) {
|
||||
// ensure that the "to" accessor has the latest version (Switch new)
|
||||
// WARNING: must do that after the above "from.update()" for cases when
|
||||
// we are creating a cycle and "from" and "to" are the same vlist
|
||||
to.SwitchNew();
|
||||
to.update().in_.emplace(from.address(), edge_vlist, edge_type);
|
||||
to_updated = &to.update();
|
||||
} else {
|
||||
LOG(ERROR) << "Connecting to a remote vertex not implemented.";
|
||||
// TODO call remote Connect(from_gid, edge_gid, to_gid, edge_type). Possible
|
||||
// outcomes are success or error (serialization, timeout).
|
||||
// The RPC call for the `to` side is already handled if `from` is not local.
|
||||
if (from.is_local() ||
|
||||
from.address().worker_id() != to.address().worker_id()) {
|
||||
db().remote_updates_clients().RemoteAddInEdge(
|
||||
transaction_id(), from, GlobalizedAddress(edge_address), to,
|
||||
edge_type);
|
||||
}
|
||||
to_updated =
|
||||
db().remote_data_manager().Vertices(transaction_id()).FindNew(to.gid());
|
||||
}
|
||||
wal().Emplace(database::StateDelta::CreateEdge(
|
||||
transaction_.id_, edge_vlist->gid_, from.gid(), to.gid(), edge_type,
|
||||
EdgeTypeName(edge_type)));
|
||||
return EdgeAccessor(edge_vlist, *this, from.address(), to.address(),
|
||||
to_updated->in_.emplace(from.address(), edge_address, edge_type);
|
||||
|
||||
return EdgeAccessor(edge_address, *this, from.address(), to.address(),
|
||||
edge_type);
|
||||
}
|
||||
|
||||
EdgeAccessor GraphDbAccessor::InsertOnlyEdge(storage::VertexAddress &from,
|
||||
storage::VertexAddress &to,
|
||||
storage::EdgeType edge_type,
|
||||
gid::Gid edge_gid) {
|
||||
auto gid = db_.storage().edge_generator_.Next(edge_gid);
|
||||
DCHECK(gid == edge_gid) << "Gid should be equal as edge gid since "
|
||||
"this edges are only added after vertices "
|
||||
"reference them by their gid";
|
||||
EdgeAccessor GraphDbAccessor::InsertOnlyEdge(
|
||||
storage::VertexAddress from, storage::VertexAddress to,
|
||||
storage::EdgeType edge_type,
|
||||
std::experimental::optional<gid::Gid> requested_gid) {
|
||||
auto gid = db_.storage().edge_generator_.Next(requested_gid);
|
||||
auto edge_vlist =
|
||||
new mvcc::VersionList<Edge>(transaction_, gid, from, to, edge_type);
|
||||
// We need to insert edge_vlist to edges_ before calling update since update
|
||||
@ -530,4 +605,5 @@ mvcc::VersionList<Edge> *GraphDbAccessor::LocalEdgeAddress(gid::Gid gid) const {
|
||||
CHECK(found != access.end()) << "Failed to find edge for gid: " << gid;
|
||||
return found->second;
|
||||
}
|
||||
|
||||
} // namespace database
|
||||
|
@ -2,6 +2,7 @@
|
||||
|
||||
#include <experimental/optional>
|
||||
#include <unordered_map>
|
||||
#include <vector>
|
||||
|
||||
#include "cppitertools/filter.hpp"
|
||||
#include "cppitertools/imap.hpp"
|
||||
@ -9,6 +10,7 @@
|
||||
|
||||
#include "database/graph_db.hpp"
|
||||
#include "distributed/remote_cache.hpp"
|
||||
#include "query/typed_value.hpp"
|
||||
#include "storage/address_types.hpp"
|
||||
#include "storage/edge_accessor.hpp"
|
||||
#include "storage/types.hpp"
|
||||
@ -76,6 +78,13 @@ class GraphDbAccessor {
|
||||
VertexAccessor InsertVertex(std::experimental::optional<gid::Gid>
|
||||
requested_gid = std::experimental::nullopt);
|
||||
|
||||
/** Creates a new Vertex on the given worker. It is NOT allowed to call this
|
||||
* function with this worker's id. */
|
||||
VertexAccessor InsertVertexIntoRemote(
|
||||
int worker_id, const std::vector<storage::Label> &labels,
|
||||
const std::unordered_map<storage::Property, query::TypedValue>
|
||||
&properties);
|
||||
|
||||
/**
|
||||
* Removes the vertex of the given accessor. If the vertex has any outgoing or
|
||||
* incoming edges, it is not deleted. See `DetachRemoveVertex` if you want to
|
||||
@ -281,9 +290,11 @@ class GraphDbAccessor {
|
||||
* Insert edge into main storage, but don't insert it into from and to
|
||||
* vertices edge lists.
|
||||
*/
|
||||
EdgeAccessor InsertOnlyEdge(storage::VertexAddress &from,
|
||||
storage::VertexAddress &to,
|
||||
storage::EdgeType edge_type, gid::Gid edge_gid);
|
||||
EdgeAccessor InsertOnlyEdge(storage::VertexAddress from,
|
||||
storage::VertexAddress to,
|
||||
storage::EdgeType edge_type,
|
||||
std::experimental::optional<gid::Gid>
|
||||
requested_gid = std::experimental::nullopt);
|
||||
|
||||
/**
|
||||
* Removes an edge from the graph. Parameters can indicate if the edge should
|
||||
@ -533,6 +544,7 @@ class GraphDbAccessor {
|
||||
const tx::Transaction &transaction() const { return transaction_; }
|
||||
durability::WriteAheadLog &wal();
|
||||
auto &db() { return db_; }
|
||||
const auto &db() const { return db_; }
|
||||
|
||||
/**
|
||||
* Returns the current value of the counter with the given name, and
|
||||
@ -557,6 +569,16 @@ class GraphDbAccessor {
|
||||
/// Gets the local edge address for the given gid. Fails if not present.
|
||||
mvcc::VersionList<Edge> *LocalEdgeAddress(gid::Gid gid) const;
|
||||
|
||||
/// Converts an address to local, if possible. Returns the same address if
|
||||
/// not.
|
||||
template <typename TAddress>
|
||||
TAddress LocalizedAddress(TAddress address) const;
|
||||
|
||||
/// Converts local address to remote, returns remote as they are.
|
||||
/// not.
|
||||
template <typename TAddress>
|
||||
TAddress GlobalizedAddress(TAddress address) const;
|
||||
|
||||
private:
|
||||
GraphDb &db_;
|
||||
tx::Transaction &transaction_;
|
||||
|
@ -39,6 +39,35 @@ StateDelta StateDelta::CreateEdge(tx::transaction_id_t tx_id, gid::Gid edge_id,
|
||||
return op;
|
||||
}
|
||||
|
||||
StateDelta StateDelta::AddOutEdge(tx::transaction_id_t tx_id,
|
||||
gid::Gid vertex_id,
|
||||
storage::VertexAddress vertex_to_address,
|
||||
storage::EdgeAddress edge_address,
|
||||
storage::EdgeType edge_type) {
|
||||
CHECK(vertex_to_address.is_remote() && edge_address.is_remote())
|
||||
<< "WAL can only contain global addresses.";
|
||||
StateDelta op(StateDelta::Type::ADD_OUT_EDGE, tx_id);
|
||||
op.vertex_id = vertex_id;
|
||||
op.vertex_to_address = vertex_to_address;
|
||||
op.edge_address = edge_address;
|
||||
op.edge_type = edge_type;
|
||||
return op;
|
||||
}
|
||||
|
||||
StateDelta StateDelta::AddInEdge(tx::transaction_id_t tx_id, gid::Gid vertex_id,
|
||||
storage::VertexAddress vertex_from_address,
|
||||
storage::EdgeAddress edge_address,
|
||||
storage::EdgeType edge_type) {
|
||||
CHECK(vertex_from_address.is_remote() && edge_address.is_remote())
|
||||
<< "WAL can only contain global addresses.";
|
||||
StateDelta op(StateDelta::Type::ADD_IN_EDGE, tx_id);
|
||||
op.vertex_id = vertex_id;
|
||||
op.vertex_from_address = vertex_from_address;
|
||||
op.edge_address = edge_address;
|
||||
op.edge_type = edge_type;
|
||||
return op;
|
||||
}
|
||||
|
||||
StateDelta StateDelta::PropsSetVertex(tx::transaction_id_t tx_id,
|
||||
gid::Gid vertex_id,
|
||||
storage::Property property,
|
||||
@ -133,6 +162,18 @@ void StateDelta::Encode(
|
||||
encoder.WriteInt(edge_type.storage());
|
||||
encoder.WriteString(edge_type_name);
|
||||
break;
|
||||
case Type::ADD_OUT_EDGE:
|
||||
encoder.WriteInt(vertex_id);
|
||||
encoder.WriteInt(vertex_to_address.raw());
|
||||
encoder.WriteInt(edge_address.raw());
|
||||
encoder.WriteInt(edge_type.storage());
|
||||
break;
|
||||
case Type::ADD_IN_EDGE:
|
||||
encoder.WriteInt(vertex_id);
|
||||
encoder.WriteInt(vertex_from_address.raw());
|
||||
encoder.WriteInt(edge_address.raw());
|
||||
encoder.WriteInt(edge_type.storage());
|
||||
break;
|
||||
case Type::SET_PROPERTY_VERTEX:
|
||||
encoder.WriteInt(vertex_id);
|
||||
encoder.WriteInt(property.storage());
|
||||
@ -205,6 +246,19 @@ std::experimental::optional<StateDelta> StateDelta::Decode(
|
||||
DECODE_MEMBER_CAST(edge_type, ValueInt, storage::EdgeType)
|
||||
DECODE_MEMBER(edge_type_name, ValueString)
|
||||
break;
|
||||
case Type::ADD_OUT_EDGE:
|
||||
DECODE_MEMBER(vertex_id, ValueInt)
|
||||
DECODE_MEMBER_CAST(vertex_to_address, ValueInt, storage::VertexAddress)
|
||||
DECODE_MEMBER_CAST(edge_address, ValueInt, storage::EdgeAddress)
|
||||
DECODE_MEMBER_CAST(edge_type, ValueInt, storage::EdgeType)
|
||||
break;
|
||||
case Type::ADD_IN_EDGE:
|
||||
DECODE_MEMBER(vertex_id, ValueInt)
|
||||
DECODE_MEMBER_CAST(vertex_from_address, ValueInt,
|
||||
storage::VertexAddress)
|
||||
DECODE_MEMBER_CAST(edge_address, ValueInt, storage::EdgeAddress)
|
||||
DECODE_MEMBER_CAST(edge_type, ValueInt, storage::EdgeType)
|
||||
break;
|
||||
case Type::SET_PROPERTY_VERTEX:
|
||||
DECODE_MEMBER(vertex_id, ValueInt)
|
||||
DECODE_MEMBER_CAST(property, ValueInt, storage::Property)
|
||||
@ -273,6 +327,9 @@ void StateDelta::Apply(GraphDbAccessor &dba) const {
|
||||
dba.InsertEdge(*from, *to, dba.EdgeType(edge_type_name), edge_id);
|
||||
break;
|
||||
}
|
||||
case Type::ADD_OUT_EDGE:
|
||||
case Type::ADD_IN_EDGE:
|
||||
LOG(FATAL) << "Partial edge-creation not yet supported in Apply";
|
||||
case Type::SET_PROPERTY_VERTEX: {
|
||||
auto vertex = dba.FindVertex(vertex_id, true);
|
||||
DCHECK(vertex) << "Failed to find vertex.";
|
||||
|
@ -4,6 +4,7 @@
|
||||
#include "communication/bolt/v1/encoder/primitive_encoder.hpp"
|
||||
#include "durability/hashed_file_reader.hpp"
|
||||
#include "durability/hashed_file_writer.hpp"
|
||||
#include "storage/address_types.hpp"
|
||||
#include "storage/gid.hpp"
|
||||
#include "storage/property_value.hpp"
|
||||
#include "utils/serialization.hpp"
|
||||
@ -28,9 +29,11 @@ struct StateDelta {
|
||||
TRANSACTION_BEGIN,
|
||||
TRANSACTION_COMMIT,
|
||||
TRANSACTION_ABORT,
|
||||
CREATE_VERTEX, // vertex_id
|
||||
CREATE_EDGE, // edge_id, from_vertex_id, to_vertex_id, edge_type,
|
||||
// edge_type_name
|
||||
CREATE_VERTEX, // vertex_id
|
||||
CREATE_EDGE, // edge_id, from_vertex_id, to_vertex_id, edge_type,
|
||||
// edge_type_name
|
||||
ADD_OUT_EDGE, // vertex_id, edge_address, vertex_to_address, edge_type
|
||||
ADD_IN_EDGE, // vertex_id, edge_address, vertex_from_address, edge_type
|
||||
SET_PROPERTY_VERTEX, // vertex_id, property, property_name, property_value
|
||||
SET_PROPERTY_EDGE, // edge_id, property, property_name, property_value
|
||||
// remove property is done by setting a PropertyValue::Null
|
||||
@ -66,6 +69,14 @@ struct StateDelta {
|
||||
gid::Gid vertex_from_id, gid::Gid vertex_to_id,
|
||||
storage::EdgeType edge_type,
|
||||
const std::string &edge_type_name);
|
||||
static StateDelta AddOutEdge(tx::transaction_id_t tx_id, gid::Gid vertex_id,
|
||||
storage::VertexAddress vertex_to_address,
|
||||
storage::EdgeAddress edge_address,
|
||||
storage::EdgeType edge_type);
|
||||
static StateDelta AddInEdge(tx::transaction_id_t tx_id, gid::Gid vertex_id,
|
||||
storage::VertexAddress vertex_from_address,
|
||||
storage::EdgeAddress edge_address,
|
||||
storage::EdgeType edge_type);
|
||||
static StateDelta PropsSetVertex(tx::transaction_id_t tx_id,
|
||||
gid::Gid vertex_id,
|
||||
storage::Property property,
|
||||
@ -97,10 +108,15 @@ struct StateDelta {
|
||||
tx::transaction_id_t transaction_id;
|
||||
|
||||
// Members valid only for some deltas, see StateDelta::Type comments above.
|
||||
// TODO: when preparing the WAL for distributed, most likely remove Gids and
|
||||
// only keep addresses.
|
||||
gid::Gid vertex_id;
|
||||
gid::Gid edge_id;
|
||||
storage::EdgeAddress edge_address;
|
||||
gid::Gid vertex_from_id;
|
||||
storage::VertexAddress vertex_from_address;
|
||||
gid::Gid vertex_to_id;
|
||||
storage::VertexAddress vertex_to_address;
|
||||
storage::EdgeType edge_type;
|
||||
std::string edge_type_name;
|
||||
storage::Property property;
|
||||
@ -118,8 +134,11 @@ struct StateDelta {
|
||||
ar &transaction_id;
|
||||
ar &vertex_id;
|
||||
ar &edge_id;
|
||||
ar &edge_address;
|
||||
ar &vertex_from_id;
|
||||
ar &vertex_from_address;
|
||||
ar &vertex_to_id;
|
||||
ar &vertex_to_address;
|
||||
ar &edge_type;
|
||||
ar &edge_type_name;
|
||||
ar &property;
|
||||
@ -135,8 +154,11 @@ struct StateDelta {
|
||||
ar &transaction_id;
|
||||
ar &vertex_id;
|
||||
ar &edge_id;
|
||||
ar &edge_address;
|
||||
ar &vertex_from_id;
|
||||
ar &vertex_from_address;
|
||||
ar &vertex_to_id;
|
||||
ar &vertex_to_address;
|
||||
ar &edge_type;
|
||||
ar &edge_type_name;
|
||||
ar &property;
|
||||
|
@ -1,9 +1,16 @@
|
||||
#pragma once
|
||||
|
||||
#include <unordered_map>
|
||||
#include <vector>
|
||||
|
||||
#include "database/state_delta.hpp"
|
||||
#include "distributed/coordination.hpp"
|
||||
#include "distributed/remote_updates_rpc_messages.hpp"
|
||||
#include "distributed/rpc_worker_clients.hpp"
|
||||
#include "query/typed_value.hpp"
|
||||
#include "storage/address_types.hpp"
|
||||
#include "storage/gid.hpp"
|
||||
#include "storage/types.hpp"
|
||||
#include "transactions/type.hpp"
|
||||
|
||||
namespace distributed {
|
||||
@ -24,6 +31,60 @@ class RemoteUpdatesRpcClients {
|
||||
->member;
|
||||
}
|
||||
|
||||
/// Creates a vertex on the given worker and returns it's id.
|
||||
gid::Gid RemoteCreateVertex(
|
||||
int worker_id, tx::transaction_id_t tx_id,
|
||||
const std::vector<storage::Label> &labels,
|
||||
const std::unordered_map<storage::Property, query::TypedValue>
|
||||
&properties) {
|
||||
auto result =
|
||||
worker_clients_.GetClientPool(worker_id).Call<RemoteCreateVertexRpc>(
|
||||
RemoteCreateVertexReqData{tx_id, labels, properties});
|
||||
CHECK(result) << "Failed to remote-create a vertex on worker: "
|
||||
<< worker_id;
|
||||
CHECK(result->member.result == RemoteUpdateResult::DONE)
|
||||
<< "Vertex creation can not result in an error";
|
||||
return result->member.gid;
|
||||
}
|
||||
|
||||
/// Creates an edge on the given worker and returns it's address. If the `to`
|
||||
/// vertex is on the same worker as `from`, then all remote CRUD will be
|
||||
/// handled by a call to this function. Otherwise a separate call to
|
||||
/// `RemoteAddInEdge` might be necessary. Throws all the exceptions that can
|
||||
/// occur remotely as a result of updating a vertex.
|
||||
storage::EdgeAddress RemoteCreateEdge(tx::transaction_id_t tx_id,
|
||||
VertexAccessor &from,
|
||||
VertexAccessor &to,
|
||||
storage::EdgeType edge_type) {
|
||||
CHECK(from.address().is_remote())
|
||||
<< "In RemoteCreateEdge `from` must be remote";
|
||||
|
||||
int from_worker = from.address().worker_id();
|
||||
auto res = worker_clients_.GetClientPool(from_worker)
|
||||
.Call<RemoteCreateEdgeRpc>(RemoteCreateEdgeReqData{
|
||||
from.gid(), to.GlobalAddress(), edge_type, tx_id});
|
||||
CHECK(res) << "RemoteCreateEdge RPC failed";
|
||||
RaiseIfRemoteError(res->member.result);
|
||||
return {res->member.gid, from_worker};
|
||||
}
|
||||
|
||||
/// Adds the edge with the given address to the `to` vertex as an incoming
|
||||
/// edge. Only used when `to` is remote and not on the same worker as `from`.
|
||||
void RemoteAddInEdge(tx::transaction_id_t tx_id, VertexAccessor &from,
|
||||
storage::EdgeAddress edge_address, VertexAccessor &to,
|
||||
storage::EdgeType edge_type) {
|
||||
CHECK(to.address().is_remote() && edge_address.is_remote() &&
|
||||
(from.GlobalAddress().worker_id() != to.address().worker_id()))
|
||||
<< "RemoteAddInEdge should only be called when `to` is remote and "
|
||||
"`from` is not on the same worker as `to`.";
|
||||
auto res = worker_clients_.GetClientPool(to.GlobalAddress().worker_id())
|
||||
.Call<RemoteAddInEdgeRpc>(RemoteAddInEdgeReqData{
|
||||
from.GlobalAddress(), edge_address, to.gid(), edge_type,
|
||||
tx_id});
|
||||
CHECK(res) << "RemoteAddInEdge RPC failed";
|
||||
RaiseIfRemoteError(res->member);
|
||||
}
|
||||
|
||||
/// Calls for the worker with the given ID to apply remote updates. Returns
|
||||
/// the results of that operation.
|
||||
RemoteUpdateResult RemoteUpdateApply(int worker_id,
|
||||
@ -45,5 +106,19 @@ class RemoteUpdatesRpcClients {
|
||||
|
||||
private:
|
||||
RpcWorkerClients worker_clients_;
|
||||
|
||||
void RaiseIfRemoteError(RemoteUpdateResult result) {
|
||||
switch (result) {
|
||||
case RemoteUpdateResult::SERIALIZATION_ERROR:
|
||||
throw new mvcc::SerializationError();
|
||||
case RemoteUpdateResult::LOCK_TIMEOUT_ERROR:
|
||||
throw new LockTimeoutException(
|
||||
"Remote LockTimeoutError during edge creation");
|
||||
case RemoteUpdateResult::UPDATE_DELETED_ERROR:
|
||||
throw new RecordDeletedError();
|
||||
case RemoteUpdateResult::DONE:
|
||||
break;
|
||||
}
|
||||
}
|
||||
};
|
||||
} // namespace distributed
|
||||
|
@ -1,8 +1,15 @@
|
||||
#pragma once
|
||||
|
||||
#include <unordered_map>
|
||||
|
||||
#include "boost/serialization/vector.hpp"
|
||||
|
||||
#include "communication/rpc/messages.hpp"
|
||||
#include "database/state_delta.hpp"
|
||||
#include "storage/address_types.hpp"
|
||||
#include "storage/gid.hpp"
|
||||
#include "transactions/type.hpp"
|
||||
#include "utils/serialization.hpp"
|
||||
|
||||
namespace distributed {
|
||||
|
||||
@ -26,4 +33,110 @@ RPC_SINGLE_MEMBER_MESSAGE(RemoteUpdateApplyRes, RemoteUpdateResult);
|
||||
using RemoteUpdateApplyRpc =
|
||||
communication::rpc::RequestResponse<RemoteUpdateApplyReq,
|
||||
RemoteUpdateApplyRes>;
|
||||
|
||||
struct RemoteCreateResult {
|
||||
RemoteUpdateResult result;
|
||||
// Only valid if creation was successful.
|
||||
gid::Gid gid;
|
||||
|
||||
private:
|
||||
friend class boost::serialization::access;
|
||||
|
||||
template <class TArchive>
|
||||
void serialize(TArchive &ar, unsigned int) {
|
||||
ar &result;
|
||||
ar &gid;
|
||||
}
|
||||
};
|
||||
|
||||
struct RemoteCreateVertexReqData {
|
||||
tx::transaction_id_t tx_id;
|
||||
std::vector<storage::Label> labels;
|
||||
std::unordered_map<storage::Property, query::TypedValue> properties;
|
||||
|
||||
private:
|
||||
friend class boost::serialization::access;
|
||||
|
||||
template <class TArchive>
|
||||
void save(TArchive &ar, unsigned int) const {
|
||||
ar << tx_id;
|
||||
ar << labels;
|
||||
ar << properties.size();
|
||||
for (auto &kv : properties) {
|
||||
ar << kv.first;
|
||||
utils::SaveTypedValue(ar, kv.second);
|
||||
}
|
||||
}
|
||||
|
||||
template <class TArchive>
|
||||
void load(TArchive &ar, unsigned int) {
|
||||
ar >> tx_id;
|
||||
ar >> labels;
|
||||
size_t props_size;
|
||||
ar >> props_size;
|
||||
for (size_t i = 0; i < props_size; ++i) {
|
||||
storage::Property p;
|
||||
ar >> p;
|
||||
query::TypedValue tv;
|
||||
utils::LoadTypedValue(ar, tv);
|
||||
properties.emplace(p, std::move(tv));
|
||||
}
|
||||
}
|
||||
BOOST_SERIALIZATION_SPLIT_MEMBER()
|
||||
};
|
||||
|
||||
RPC_SINGLE_MEMBER_MESSAGE(RemoteCreateVertexReq, RemoteCreateVertexReqData);
|
||||
RPC_SINGLE_MEMBER_MESSAGE(RemoteCreateVertexRes, RemoteCreateResult);
|
||||
using RemoteCreateVertexRpc =
|
||||
communication::rpc::RequestResponse<RemoteCreateVertexReq,
|
||||
RemoteCreateVertexRes>;
|
||||
|
||||
struct RemoteCreateEdgeReqData {
|
||||
gid::Gid from;
|
||||
storage::VertexAddress to;
|
||||
storage::EdgeType edge_type;
|
||||
tx::transaction_id_t tx_id;
|
||||
|
||||
private:
|
||||
friend class boost::serialization::access;
|
||||
|
||||
template <class TArchive>
|
||||
void serialize(TArchive &ar, unsigned int) {
|
||||
ar &from;
|
||||
ar &to;
|
||||
ar &edge_type;
|
||||
ar &tx_id;
|
||||
}
|
||||
};
|
||||
|
||||
RPC_SINGLE_MEMBER_MESSAGE(RemoteCreateEdgeReq, RemoteCreateEdgeReqData);
|
||||
RPC_SINGLE_MEMBER_MESSAGE(RemoteCreateEdgeRes, RemoteCreateResult);
|
||||
using RemoteCreateEdgeRpc =
|
||||
communication::rpc::RequestResponse<RemoteCreateEdgeReq,
|
||||
RemoteCreateEdgeRes>;
|
||||
|
||||
struct RemoteAddInEdgeReqData {
|
||||
storage::VertexAddress from;
|
||||
storage::EdgeAddress edge_address;
|
||||
gid::Gid to;
|
||||
storage::EdgeType edge_type;
|
||||
tx::transaction_id_t tx_id;
|
||||
|
||||
private:
|
||||
friend class boost::serialization::access;
|
||||
|
||||
template <class TArchive>
|
||||
void serialize(TArchive &ar, unsigned int) {
|
||||
ar &from;
|
||||
ar &edge_address;
|
||||
ar &to;
|
||||
ar &edge_type;
|
||||
ar &tx_id;
|
||||
}
|
||||
};
|
||||
|
||||
RPC_SINGLE_MEMBER_MESSAGE(RemoteAddInEdgeReq, RemoteAddInEdgeReqData);
|
||||
RPC_SINGLE_MEMBER_MESSAGE(RemoteAddInEdgeRes, RemoteUpdateResult);
|
||||
using RemoteAddInEdgeRpc =
|
||||
communication::rpc::RequestResponse<RemoteAddInEdgeReq, RemoteAddInEdgeRes>;
|
||||
} // namespace distributed
|
||||
|
@ -2,6 +2,7 @@
|
||||
|
||||
#include <mutex>
|
||||
#include <unordered_map>
|
||||
#include <unordered_map>
|
||||
#include <utility>
|
||||
#include <vector>
|
||||
|
||||
@ -14,8 +15,10 @@
|
||||
#include "database/state_delta.hpp"
|
||||
#include "distributed/remote_updates_rpc_messages.hpp"
|
||||
#include "mvcc/version_list.hpp"
|
||||
#include "query/typed_value.hpp"
|
||||
#include "storage/gid.hpp"
|
||||
#include "storage/record_accessor.hpp"
|
||||
#include "storage/types.hpp"
|
||||
#include "storage/vertex_accessor.hpp"
|
||||
#include "threading/sync/lock_timeout_exception.hpp"
|
||||
#include "threading/sync/spinlock.hpp"
|
||||
@ -90,6 +93,21 @@ class RemoteUpdatesRpcServer {
|
||||
return RemoteUpdateResult::DONE;
|
||||
}
|
||||
|
||||
/// Creates a new vertex and returns it's gid.
|
||||
RemoteCreateResult CreateVertex(
|
||||
const std::vector<storage::Label> &labels,
|
||||
const std::unordered_map<storage::Property, query::TypedValue>
|
||||
&properties) {
|
||||
auto result = db_accessor_.InsertVertex();
|
||||
for (auto &label : labels) result.add_label(label);
|
||||
for (auto &kv : properties) result.PropsSet(kv.first, kv.second);
|
||||
std::lock_guard<SpinLock> guard{lock_};
|
||||
deltas_.emplace(
|
||||
result.gid(),
|
||||
std::make_pair(result, std::vector<database::StateDelta>{}));
|
||||
return {RemoteUpdateResult::DONE, result.gid()};
|
||||
}
|
||||
|
||||
/// Applies all the deltas on the record.
|
||||
RemoteUpdateResult Apply() {
|
||||
std::lock_guard<SpinLock> guard{lock_};
|
||||
@ -109,6 +127,8 @@ class RemoteUpdatesRpcServer {
|
||||
return RemoteUpdateResult::DONE;
|
||||
}
|
||||
|
||||
auto &db_accessor() { return db_accessor_; }
|
||||
|
||||
private:
|
||||
database::GraphDbAccessor db_accessor_;
|
||||
std::unordered_map<
|
||||
@ -127,15 +147,16 @@ class RemoteUpdatesRpcServer {
|
||||
: db_(db), engine_(engine), server_(system, kRemoteUpdatesRpc) {
|
||||
server_.Register<RemoteUpdateRpc>([this](const RemoteUpdateReq &req) {
|
||||
using DeltaType = database::StateDelta::Type;
|
||||
switch (req.member.type) {
|
||||
auto &delta = req.member;
|
||||
switch (delta.type) {
|
||||
case DeltaType::SET_PROPERTY_VERTEX:
|
||||
case DeltaType::ADD_LABEL:
|
||||
case DeltaType::REMOVE_LABEL:
|
||||
return std::make_unique<RemoteUpdateRes>(
|
||||
Process(vertex_updates_, req.member));
|
||||
GetUpdates(vertex_updates_, delta.transaction_id).Emplace(delta));
|
||||
case DeltaType::SET_PROPERTY_EDGE:
|
||||
return std::make_unique<RemoteUpdateRes>(
|
||||
Process(edge_updates_, req.member));
|
||||
GetUpdates(edge_updates_, delta.transaction_id).Emplace(delta));
|
||||
default:
|
||||
LOG(FATAL) << "Can't perform a remote update with delta type: "
|
||||
<< static_cast<int>(req.member.type);
|
||||
@ -146,6 +167,41 @@ class RemoteUpdatesRpcServer {
|
||||
[this](const RemoteUpdateApplyReq &req) {
|
||||
return std::make_unique<RemoteUpdateApplyRes>(Apply(req.member));
|
||||
});
|
||||
|
||||
server_.Register<RemoteCreateVertexRpc>(
|
||||
[this](const RemoteCreateVertexReq &req) {
|
||||
return std::make_unique<RemoteCreateVertexRes>(
|
||||
GetUpdates(vertex_updates_, req.member.tx_id)
|
||||
.CreateVertex(req.member.labels, req.member.properties));
|
||||
});
|
||||
|
||||
server_.Register<RemoteCreateEdgeRpc>(
|
||||
[this](const RemoteCreateEdgeReq &req) {
|
||||
auto data = req.member;
|
||||
auto creation_result = CreateEdge(data);
|
||||
|
||||
// If `from` and `to` are both on this worker, we handle it in this
|
||||
// RPC call. Do it only if CreateEdge succeeded.
|
||||
if (creation_result.result == RemoteUpdateResult::DONE &&
|
||||
data.to.worker_id() == db_.WorkerId()) {
|
||||
auto to_delta = database::StateDelta::AddInEdge(
|
||||
data.tx_id, data.to.gid(), data.from,
|
||||
{creation_result.gid, db_.WorkerId()}, data.edge_type);
|
||||
creation_result.result =
|
||||
GetUpdates(vertex_updates_, data.tx_id).Emplace(to_delta);
|
||||
}
|
||||
|
||||
return std::make_unique<RemoteCreateEdgeRes>(creation_result);
|
||||
});
|
||||
|
||||
server_.Register<RemoteAddInEdgeRpc>([this](const RemoteAddInEdgeReq &req) {
|
||||
auto to_delta = database::StateDelta::AddInEdge(
|
||||
req.member.tx_id, req.member.to, req.member.from,
|
||||
req.member.edge_address, req.member.edge_type);
|
||||
auto result =
|
||||
GetUpdates(vertex_updates_, req.member.tx_id).Emplace(to_delta);
|
||||
return std::make_unique<RemoteAddInEdgeRes>(result);
|
||||
});
|
||||
}
|
||||
|
||||
/// Applies all existsing updates for the given transaction ID. If there are
|
||||
@ -174,29 +230,39 @@ class RemoteUpdatesRpcServer {
|
||||
database::GraphDb &db_;
|
||||
tx::Engine &engine_;
|
||||
communication::rpc::Server server_;
|
||||
ConcurrentMap<tx::transaction_id_t, TransactionUpdates<VertexAccessor>>
|
||||
vertex_updates_;
|
||||
ConcurrentMap<tx::transaction_id_t, TransactionUpdates<EdgeAccessor>>
|
||||
edge_updates_;
|
||||
tx::TxEndListener tx_end_listener_{engine_,
|
||||
[this](tx::transaction_id_t tx_id) {
|
||||
vertex_updates_.access().remove(tx_id);
|
||||
edge_updates_.access().remove(tx_id);
|
||||
}};
|
||||
template <typename TAccessor>
|
||||
using MapT =
|
||||
ConcurrentMap<tx::transaction_id_t, TransactionUpdates<TAccessor>>;
|
||||
MapT<VertexAccessor> vertex_updates_;
|
||||
MapT<EdgeAccessor> edge_updates_;
|
||||
|
||||
// Processes a single delta recieved in the RPC request.
|
||||
template <typename TCollection>
|
||||
RemoteUpdateResult Process(TCollection &updates,
|
||||
const database::StateDelta &delta) {
|
||||
auto tx_id = delta.transaction_id;
|
||||
auto access = updates.access();
|
||||
auto &transaction_updates =
|
||||
access
|
||||
.emplace(tx_id, std::make_tuple(tx_id),
|
||||
std::make_tuple(std::ref(db_), tx_id))
|
||||
.first->second;
|
||||
// Gets/creates the TransactionUpdates for the given transaction.
|
||||
template <typename TAccessor>
|
||||
TransactionUpdates<TAccessor> &GetUpdates(MapT<TAccessor> &updates,
|
||||
tx::transaction_id_t tx_id) {
|
||||
return updates.access()
|
||||
.emplace(tx_id, std::make_tuple(tx_id),
|
||||
std::make_tuple(std::ref(db_), tx_id))
|
||||
.first->second;
|
||||
}
|
||||
|
||||
return transaction_updates.Emplace(delta);
|
||||
RemoteCreateResult CreateEdge(const RemoteCreateEdgeReqData &req) {
|
||||
auto &dba = GetUpdates(edge_updates_, req.tx_id).db_accessor();
|
||||
|
||||
auto edge = dba.InsertOnlyEdge({req.from, db_.WorkerId()},
|
||||
dba.LocalizedAddress(req.to), req.edge_type);
|
||||
|
||||
auto from_delta = database::StateDelta::AddOutEdge(
|
||||
req.tx_id, req.from, req.to, dba.GlobalizedAddress(edge.address()),
|
||||
req.edge_type);
|
||||
|
||||
auto result = GetUpdates(vertex_updates_, req.tx_id).Emplace(from_delta);
|
||||
return {result, edge.gid()};
|
||||
}
|
||||
};
|
||||
|
||||
|
@ -2,7 +2,9 @@
|
||||
|
||||
#include <cstdint>
|
||||
|
||||
#include "boost/serialization/access.hpp"
|
||||
#include "glog/logging.h"
|
||||
|
||||
#include "storage/gid.hpp"
|
||||
|
||||
namespace storage {
|
||||
@ -89,5 +91,11 @@ class Address {
|
||||
|
||||
private:
|
||||
StorageT storage_{0};
|
||||
|
||||
friend class boost::serialization::access;
|
||||
template <class TArchive>
|
||||
void serialize(TArchive &ar, unsigned int) {
|
||||
ar &storage_;
|
||||
}
|
||||
};
|
||||
} // namespace storage
|
||||
|
@ -14,7 +14,8 @@ using database::StateDelta;
|
||||
template <typename TRecord>
|
||||
RecordAccessor<TRecord>::RecordAccessor(AddressT address,
|
||||
database::GraphDbAccessor &db_accessor)
|
||||
: db_accessor_(&db_accessor), address_(NormalizedAddress(address)) {}
|
||||
: db_accessor_(&db_accessor),
|
||||
address_(db_accessor.LocalizedAddress(address)) {}
|
||||
|
||||
template <typename TRecord>
|
||||
const PropertyValue &RecordAccessor<TRecord>::PropsAt(
|
||||
@ -198,6 +199,7 @@ const TRecord &RecordAccessor<TRecord>::current() const {
|
||||
template <typename TRecord>
|
||||
void RecordAccessor<TRecord>::ProcessDelta(
|
||||
const database::StateDelta &delta) const {
|
||||
auto &dba = db_accessor();
|
||||
// Apply the delta both on local and remote data. We need to see the changes
|
||||
// we make to remote data, even if it's not applied immediately.
|
||||
auto &updated = update();
|
||||
@ -227,12 +229,22 @@ void RecordAccessor<TRecord>::ProcessDelta(
|
||||
std::swap(*found, labels.back());
|
||||
labels.pop_back();
|
||||
} break;
|
||||
case StateDelta::Type::ADD_OUT_EDGE:
|
||||
reinterpret_cast<Vertex &>(updated).out_.emplace(
|
||||
dba.LocalizedAddress(delta.vertex_to_address),
|
||||
dba.LocalizedAddress(delta.edge_address), delta.edge_type);
|
||||
break;
|
||||
case StateDelta::Type::ADD_IN_EDGE:
|
||||
reinterpret_cast<Vertex &>(updated).in_.emplace(
|
||||
dba.LocalizedAddress(delta.vertex_from_address),
|
||||
dba.LocalizedAddress(delta.edge_address), delta.edge_type);
|
||||
break;
|
||||
}
|
||||
|
||||
if (is_local()) {
|
||||
db_accessor().wal().Emplace(delta);
|
||||
dba.wal().Emplace(delta);
|
||||
} else {
|
||||
auto result = db_accessor().db().remote_updates_clients().RemoteUpdate(
|
||||
auto result = dba.db().remote_updates_clients().RemoteUpdate(
|
||||
address().worker_id(), delta);
|
||||
switch (result) {
|
||||
case distributed::RemoteUpdateResult::DONE:
|
||||
@ -247,27 +259,5 @@ void RecordAccessor<TRecord>::ProcessDelta(
|
||||
}
|
||||
}
|
||||
|
||||
template <>
|
||||
RecordAccessor<Vertex>::AddressT RecordAccessor<Vertex>::NormalizedAddress(
|
||||
AddressT address) const {
|
||||
if (address.is_local()) return address;
|
||||
if (address.worker_id() == db_accessor().db_.WorkerId()) {
|
||||
return AddressT(db_accessor().LocalVertexAddress(address.gid()));
|
||||
}
|
||||
|
||||
return address;
|
||||
}
|
||||
|
||||
template <>
|
||||
RecordAccessor<Edge>::AddressT RecordAccessor<Edge>::NormalizedAddress(
|
||||
AddressT address) const {
|
||||
if (address.is_local()) return address;
|
||||
if (address.worker_id() == db_accessor().db_.WorkerId()) {
|
||||
return AddressT(db_accessor().LocalEdgeAddress(address.gid()));
|
||||
}
|
||||
|
||||
return address;
|
||||
}
|
||||
|
||||
template class RecordAccessor<Vertex>;
|
||||
template class RecordAccessor<Edge>;
|
||||
|
@ -202,12 +202,6 @@ class RecordAccessor : public TotalOrdering<RecordAccessor<TRecord>> {
|
||||
* an update.
|
||||
*/
|
||||
mutable TRecord *new_{nullptr};
|
||||
|
||||
/** Returns an address that is local, if the given address is local, or if it
|
||||
* is remote, but points to this worker. This method is used in the
|
||||
* constructor, but the graph_db_accessor field must be initizalized when it's
|
||||
* called. */
|
||||
AddressT NormalizedAddress(AddressT address) const;
|
||||
};
|
||||
|
||||
/** Error when trying to update a deleted record */
|
||||
|
@ -67,7 +67,7 @@ class DistributedGraphDbTest : public ::testing::Test {
|
||||
|
||||
/// Inserts a vertex and returns it's global address. Does it in a new
|
||||
/// transaction.
|
||||
auto InsertVertex(database::GraphDb &db) {
|
||||
storage::VertexAddress InsertVertex(database::GraphDb &db) {
|
||||
database::GraphDbAccessor dba{db};
|
||||
auto r_val = dba.InsertVertex().GlobalAddress();
|
||||
dba.Commit();
|
||||
@ -75,8 +75,9 @@ class DistributedGraphDbTest : public ::testing::Test {
|
||||
}
|
||||
|
||||
/// Inserts an edge (on the 'from' side) and returns it's global address.
|
||||
auto InsertEdge(storage::VertexAddress from, storage::VertexAddress to,
|
||||
const std::string &edge_type_name) {
|
||||
storage::EdgeAddress InsertEdge(storage::VertexAddress from,
|
||||
storage::VertexAddress to,
|
||||
const std::string &edge_type_name) {
|
||||
database::GraphDbAccessor dba{worker(from.worker_id())};
|
||||
auto from_v = dba.FindVertexChecked(from.gid(), false);
|
||||
auto edge_type = dba.EdgeType(edge_type_name);
|
||||
|
@ -1,6 +1,7 @@
|
||||
#include <gtest/gtest.h>
|
||||
|
||||
#include "database/graph_db_accessor.hpp"
|
||||
#include "distributed/remote_updates_rpc_clients.hpp"
|
||||
#include "distributed/remote_updates_rpc_server.hpp"
|
||||
|
||||
#include "distributed_common.hpp"
|
||||
@ -59,3 +60,187 @@ TEST_F(DistributedUpdateTest, RemoteUpdateApply) {
|
||||
}
|
||||
|
||||
#undef EXPECT_LABEL
|
||||
|
||||
TEST_F(DistributedGraphDbTest, CreateVertex) {
|
||||
gid::Gid gid;
|
||||
{
|
||||
database::GraphDbAccessor dba{worker(1)};
|
||||
auto v = dba.InsertVertexIntoRemote(2, {}, {});
|
||||
gid = v.gid();
|
||||
dba.Commit();
|
||||
}
|
||||
{
|
||||
database::GraphDbAccessor dba{worker(2)};
|
||||
auto v = dba.FindVertex(gid, false);
|
||||
ASSERT_TRUE(v);
|
||||
}
|
||||
}
|
||||
|
||||
TEST_F(DistributedGraphDbTest, CreateVertexWithUpdate) {
|
||||
gid::Gid gid;
|
||||
storage::Property prop;
|
||||
{
|
||||
database::GraphDbAccessor dba{worker(1)};
|
||||
auto v = dba.InsertVertexIntoRemote(2, {}, {});
|
||||
gid = v.gid();
|
||||
prop = dba.Property("prop");
|
||||
v.PropsSet(prop, 42);
|
||||
worker(2).remote_updates_server().Apply(dba.transaction_id());
|
||||
dba.Commit();
|
||||
}
|
||||
{
|
||||
database::GraphDbAccessor dba{worker(2)};
|
||||
auto v = dba.FindVertex(gid, false);
|
||||
ASSERT_TRUE(v);
|
||||
EXPECT_EQ(v->PropsAt(prop).Value<int64_t>(), 42);
|
||||
}
|
||||
}
|
||||
|
||||
TEST_F(DistributedGraphDbTest, CreateVertexWithData) {
|
||||
gid::Gid gid;
|
||||
storage::Label l1;
|
||||
storage::Label l2;
|
||||
storage::Property prop;
|
||||
{
|
||||
database::GraphDbAccessor dba{worker(1)};
|
||||
l1 = dba.Label("l1");
|
||||
l2 = dba.Label("l2");
|
||||
prop = dba.Property("prop");
|
||||
auto v = dba.InsertVertexIntoRemote(2, {l1, l2}, {{prop, 42}});
|
||||
gid = v.gid();
|
||||
|
||||
// Check local visibility before commit.
|
||||
EXPECT_TRUE(v.has_label(l1));
|
||||
EXPECT_TRUE(v.has_label(l2));
|
||||
EXPECT_EQ(v.PropsAt(prop).Value<int64_t>(), 42);
|
||||
|
||||
worker(2).remote_updates_server().Apply(dba.transaction_id());
|
||||
dba.Commit();
|
||||
}
|
||||
{
|
||||
database::GraphDbAccessor dba{worker(2)};
|
||||
auto v = dba.FindVertex(gid, false);
|
||||
ASSERT_TRUE(v);
|
||||
// Check remote data after commit.
|
||||
EXPECT_TRUE(v->has_label(l1));
|
||||
EXPECT_TRUE(v->has_label(l2));
|
||||
EXPECT_EQ(v->PropsAt(prop).Value<int64_t>(), 42);
|
||||
}
|
||||
}
|
||||
|
||||
class DistributedEdgeCreateTest : public DistributedGraphDbTest {
|
||||
protected:
|
||||
storage::VertexAddress w1_a;
|
||||
storage::VertexAddress w1_b;
|
||||
storage::VertexAddress w2_a;
|
||||
storage::EdgeAddress e_ga;
|
||||
|
||||
void SetUp() override {
|
||||
DistributedGraphDbTest::SetUp();
|
||||
w1_a = InsertVertex(worker(1));
|
||||
w1_b = InsertVertex(worker(1));
|
||||
w2_a = InsertVertex(worker(2));
|
||||
}
|
||||
|
||||
void CreateEdge(database::GraphDb &creator, storage::VertexAddress from_addr,
|
||||
storage::VertexAddress to_addr) {
|
||||
database::GraphDbAccessor dba{creator};
|
||||
auto edge_type = dba.EdgeType("et");
|
||||
VertexAccessor v1{from_addr, dba};
|
||||
VertexAccessor v2{to_addr, dba};
|
||||
e_ga = dba.InsertEdge(v1, v2, edge_type).GlobalAddress();
|
||||
master().remote_updates_server().Apply(dba.transaction_id());
|
||||
worker(1).remote_updates_server().Apply(dba.transaction_id());
|
||||
worker(2).remote_updates_server().Apply(dba.transaction_id());
|
||||
dba.Commit();
|
||||
}
|
||||
|
||||
int EdgeCount(database::GraphDb &db) {
|
||||
database::GraphDbAccessor dba(db);
|
||||
auto edges = dba.Edges(false);
|
||||
return std::distance(edges.begin(), edges.end());
|
||||
};
|
||||
|
||||
void CheckCounts(int master_count, int worker1_count, int worker2_count) {
|
||||
EXPECT_EQ(EdgeCount(master()), master_count);
|
||||
EXPECT_EQ(EdgeCount(worker(1)), worker1_count);
|
||||
EXPECT_EQ(EdgeCount(worker(2)), worker2_count);
|
||||
}
|
||||
|
||||
void CheckState(database::GraphDb &db, bool edge_is_local,
|
||||
storage::VertexAddress from_addr,
|
||||
storage::VertexAddress to_addr) {
|
||||
database::GraphDbAccessor dba{db};
|
||||
|
||||
// Check edge data.
|
||||
{
|
||||
EdgeAccessor edge{e_ga, dba};
|
||||
EXPECT_EQ(edge.address().is_local(), edge_is_local);
|
||||
EXPECT_EQ(edge.GlobalAddress(), e_ga);
|
||||
auto from = edge.from();
|
||||
EXPECT_EQ(from.GlobalAddress(), from_addr);
|
||||
auto to = edge.to();
|
||||
EXPECT_EQ(to.GlobalAddress(), to_addr);
|
||||
}
|
||||
|
||||
auto edges = [](auto iterable) {
|
||||
std::vector<EdgeAccessor> res;
|
||||
for (auto edge : iterable) res.emplace_back(edge);
|
||||
return res;
|
||||
};
|
||||
|
||||
// Check `from` data.
|
||||
{
|
||||
VertexAccessor from{from_addr, dba};
|
||||
ASSERT_EQ(edges(from.out()).size(), 1);
|
||||
EXPECT_EQ(edges(from.out())[0].GlobalAddress(), e_ga);
|
||||
// In case of cycles we have 1 in the `in` edges.
|
||||
EXPECT_EQ(edges(from.in()).size(), from_addr == to_addr);
|
||||
}
|
||||
|
||||
// Check `to` data.
|
||||
{
|
||||
VertexAccessor to{to_addr, dba};
|
||||
// In case of cycles we have 1 in the `out` edges.
|
||||
EXPECT_EQ(edges(to.out()).size(), from_addr == to_addr);
|
||||
ASSERT_EQ(edges(to.in()).size(), 1);
|
||||
EXPECT_EQ(edges(to.in())[0].GlobalAddress(), e_ga);
|
||||
}
|
||||
}
|
||||
|
||||
void CheckAll(storage::VertexAddress from_addr,
|
||||
storage::VertexAddress to_addr) {
|
||||
int edge_worker = from_addr.worker_id();
|
||||
CheckCounts(edge_worker == 0, edge_worker == 1, edge_worker == 2);
|
||||
CheckState(master(), edge_worker == 0, from_addr, to_addr);
|
||||
CheckState(worker(1), edge_worker == 1, from_addr, to_addr);
|
||||
CheckState(worker(2), edge_worker == 2, from_addr, to_addr);
|
||||
}
|
||||
|
||||
void TearDown() override { DistributedGraphDbTest::TearDown(); }
|
||||
};
|
||||
|
||||
TEST_F(DistributedEdgeCreateTest, LocalRemote) {
|
||||
CreateEdge(worker(1), w1_a, w2_a);
|
||||
CheckAll(w1_a, w2_a);
|
||||
}
|
||||
|
||||
TEST_F(DistributedEdgeCreateTest, RemoteLocal) {
|
||||
CreateEdge(worker(2), w1_a, w2_a);
|
||||
CheckAll(w1_a, w2_a);
|
||||
}
|
||||
|
||||
TEST_F(DistributedEdgeCreateTest, RemoteRemoteDifferentWorkers) {
|
||||
CreateEdge(master(), w1_a, w2_a);
|
||||
CheckAll(w1_a, w2_a);
|
||||
}
|
||||
|
||||
TEST_F(DistributedEdgeCreateTest, RemoteRemoteSameWorkers) {
|
||||
CreateEdge(master(), w1_a, w1_b);
|
||||
CheckAll(w1_a, w1_b);
|
||||
}
|
||||
|
||||
TEST_F(DistributedEdgeCreateTest, RemoteRemoteCycle) {
|
||||
CreateEdge(master(), w1_a, w1_a);
|
||||
CheckAll(w1_a, w1_a);
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user