Resolve global address to local
Summary: It is possible that we have a global address to resolve, for a graph element that's local. Consider W1 expanding, getting data from W2, expanding from there and getting data that is on W1. We then don't want to do RPC from W1 to W1, but do a lookup directly. Reviewers: dgleich Reviewed By: dgleich Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D1145
This commit is contained in:
parent
b823d6a71c
commit
bfb3a0d9b1
@ -87,10 +87,7 @@ VertexAccessor GraphDbAccessor::InsertVertex(
|
||||
|
||||
std::experimental::optional<VertexAccessor> GraphDbAccessor::FindVertex(
|
||||
gid::Gid gid, bool current_state) {
|
||||
auto collection_accessor = db_.storage().vertices_.access();
|
||||
auto found = collection_accessor.find(gid);
|
||||
if (found == collection_accessor.end()) return std::experimental::nullopt;
|
||||
VertexAccessor record_accessor(found->second, *this);
|
||||
VertexAccessor record_accessor(LocalVertexAddress(gid), *this);
|
||||
if (!record_accessor.Visible(transaction(), current_state))
|
||||
return std::experimental::nullopt;
|
||||
return record_accessor;
|
||||
@ -105,10 +102,7 @@ VertexAccessor GraphDbAccessor::FindVertexChecked(gid::Gid gid,
|
||||
|
||||
std::experimental::optional<EdgeAccessor> GraphDbAccessor::FindEdge(
|
||||
gid::Gid gid, bool current_state) {
|
||||
auto collection_accessor = db_.storage().edges_.access();
|
||||
auto found = collection_accessor.find(gid);
|
||||
if (found == collection_accessor.end()) return std::experimental::nullopt;
|
||||
EdgeAccessor record_accessor(found->second, *this);
|
||||
EdgeAccessor record_accessor(LocalEdgeAddress(gid), *this);
|
||||
if (!record_accessor.Visible(transaction(), current_state))
|
||||
return std::experimental::nullopt;
|
||||
return record_accessor;
|
||||
@ -523,4 +517,19 @@ template <>
|
||||
distributed::RemoteCache<Edge> &GraphDbAccessor::remote_elements() {
|
||||
return remote_edges();
|
||||
}
|
||||
|
||||
mvcc::VersionList<Vertex> *GraphDbAccessor::LocalVertexAddress(
|
||||
gid::Gid gid) const {
|
||||
auto access = db_.storage().vertices_.access();
|
||||
auto found = access.find(gid);
|
||||
CHECK(found != access.end()) << "Failed to find vertex for gid: " << gid;
|
||||
return found->second;
|
||||
}
|
||||
|
||||
mvcc::VersionList<Edge> *GraphDbAccessor::LocalEdgeAddress(gid::Gid gid) const {
|
||||
auto access = db_.storage().edges_.access();
|
||||
auto found = access.find(gid);
|
||||
CHECK(found != access.end()) << "Failed to find edge for gid: " << gid;
|
||||
return found->second;
|
||||
}
|
||||
} // namespace database
|
||||
|
@ -606,5 +606,11 @@ class GraphDbAccessor {
|
||||
void UpdatePropertyIndex(storage::Property property,
|
||||
const RecordAccessor<Vertex> &vertex_accessor,
|
||||
const Vertex *const vertex);
|
||||
|
||||
/// Gets the local address for the given gid. Fails if not present.
|
||||
mvcc::VersionList<Vertex> *LocalVertexAddress(gid::Gid gid) const;
|
||||
|
||||
/// Gets the local edge address for the given gid. Fails if not present.
|
||||
mvcc::VersionList<Edge> *LocalEdgeAddress(gid::Gid gid) const;
|
||||
};
|
||||
} // namespace database
|
||||
|
@ -10,7 +10,7 @@ using database::StateDelta;
|
||||
template <typename TRecord>
|
||||
RecordAccessor<TRecord>::RecordAccessor(AddressT address,
|
||||
database::GraphDbAccessor &db_accessor)
|
||||
: db_accessor_(&db_accessor), address_(address) {}
|
||||
: db_accessor_(&db_accessor), address_(NormalizedAddress(address)) {}
|
||||
|
||||
template <typename TRecord>
|
||||
const PropertyValue &RecordAccessor<TRecord>::PropsAt(
|
||||
@ -142,9 +142,11 @@ bool RecordAccessor<TRecord>::Reconstruct() const {
|
||||
if (is_local()) {
|
||||
address_.local()->find_set_old_new(db_accessor_->transaction(), old_, new_);
|
||||
} else {
|
||||
// TODO in write queries it's possible the command has been advanced and we
|
||||
// need to invalidate the RemoteCache and really get the latest stuff. But
|
||||
// only do that after the command has been advanced.
|
||||
// It's not possible that we have a global address for a graph element
|
||||
// that's local, because that is resolved in the constructor.
|
||||
// TODO in write queries it's possible the command has been advanced and
|
||||
// we need to invalidate the RemoteCache and really get the latest stuff.
|
||||
// But only do that after the command has been advanced.
|
||||
db_accessor().template remote_elements<TRecord>().FindSetOldNew(
|
||||
db_accessor().transaction().id_, address_.worker_id(),
|
||||
address_.global_id(), old_, new_);
|
||||
@ -203,5 +205,27 @@ void RecordAccessor<TRecord>::ProcessDelta(const GraphStateDelta &) const {
|
||||
}
|
||||
}
|
||||
|
||||
template <>
|
||||
RecordAccessor<Vertex>::AddressT RecordAccessor<Vertex>::NormalizedAddress(
|
||||
AddressT address) const {
|
||||
if (address.is_local()) return address;
|
||||
if (address.worker_id() == db_accessor().db_.WorkerId()) {
|
||||
return AddressT(db_accessor().LocalVertexAddress(address.global_id()));
|
||||
}
|
||||
|
||||
return address;
|
||||
}
|
||||
|
||||
template <>
|
||||
RecordAccessor<Edge>::AddressT RecordAccessor<Edge>::NormalizedAddress(
|
||||
AddressT address) const {
|
||||
if (address.is_local()) return address;
|
||||
if (address.worker_id() == db_accessor().db_.WorkerId()) {
|
||||
return AddressT(db_accessor().LocalEdgeAddress(address.global_id()));
|
||||
}
|
||||
|
||||
return address;
|
||||
}
|
||||
|
||||
template class RecordAccessor<Vertex>;
|
||||
template class RecordAccessor<Edge>;
|
||||
|
@ -214,6 +214,12 @@ class RecordAccessor : public TotalOrdering<RecordAccessor<TRecord>> {
|
||||
* an update.
|
||||
*/
|
||||
mutable TRecord *new_{nullptr};
|
||||
|
||||
/** Returns an address that is local, if the given address is local, or if it
|
||||
* is remote, but points to this worker. This method is used in the
|
||||
* constructor, but the graph_db_accessor field must be initizalized when it's
|
||||
* called. */
|
||||
AddressT NormalizedAddress(AddressT address) const;
|
||||
};
|
||||
|
||||
/** Error when trying to update a deleted record */
|
||||
|
@ -5,9 +5,11 @@
|
||||
|
||||
#include "database/graph_db.hpp"
|
||||
#include "database/graph_db_accessor.hpp"
|
||||
|
||||
#include "mvcc/version_list.hpp"
|
||||
#include "storage/address.hpp"
|
||||
#include "storage/edge_accessor.hpp"
|
||||
#include "storage/property_value.hpp"
|
||||
#include "storage/vertex.hpp"
|
||||
#include "storage/vertex_accessor.hpp"
|
||||
|
||||
TEST(RecordAccessor, Properties) {
|
||||
@ -58,6 +60,19 @@ TEST(RecordAccessor, RecordEquality) {
|
||||
EXPECT_NE(e1, e2);
|
||||
}
|
||||
|
||||
TEST(RecordAccessor, GlobalToLocalAddressConversion) {
|
||||
database::SingleNode db;
|
||||
database::GraphDbAccessor dba(db);
|
||||
|
||||
auto v1 = dba.InsertVertex();
|
||||
storage::Address<mvcc::VersionList<Vertex>> global_address{v1.gid(),
|
||||
db.WorkerId()};
|
||||
EXPECT_FALSE(global_address.is_local());
|
||||
auto v1_from_global = VertexAccessor(global_address, dba);
|
||||
EXPECT_TRUE(v1_from_global.address().is_local());
|
||||
EXPECT_EQ(v1_from_global.address(), v1.address());
|
||||
}
|
||||
|
||||
TEST(RecordAccessor, SwitchOldAndSwitchNewMemberFunctionTest) {
|
||||
database::SingleNode db;
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user