DetachRemoveVertex remote

Reviewers: florijan

Reviewed By: florijan

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1264
This commit is contained in:
Dominik Gleich 2018-03-01 17:39:22 +01:00
parent 1ca98826af
commit d5b9a11e87
11 changed files with 146 additions and 37 deletions

View File

@ -336,13 +336,14 @@ int64_t GraphDbAccessor::VerticesCount(
}
}
bool GraphDbAccessor::RemoveVertex(VertexAccessor &vertex_accessor) {
bool GraphDbAccessor::RemoveVertex(VertexAccessor &vertex_accessor,
bool check_empty) {
DCHECK(!commited_ && !aborted_) << "Accessor committed or aborted";
if (!vertex_accessor.is_local()) {
auto address = vertex_accessor.address();
db().remote_updates_clients().RemoteRemoveVertex(
address.worker_id(), transaction_id(), address.gid());
address.worker_id(), transaction_id(), address.gid(), check_empty);
// We can't know if we are going to be able to remove vertex until deferred
// updates on a remote worker are executed
return true;
@ -352,25 +353,22 @@ bool GraphDbAccessor::RemoveVertex(VertexAccessor &vertex_accessor) {
// due to it getting matched multiple times by some patterns
// we can only delete it once, so check if it's already deleted
if (vertex_accessor.current().is_expired_by(transaction_)) return true;
if (vertex_accessor.out_degree() > 0 || vertex_accessor.in_degree() > 0)
if (check_empty &&
vertex_accessor.out_degree() + vertex_accessor.in_degree() > 0)
return false;
auto *vlist_ptr = vertex_accessor.address().local();
wal().Emplace(
database::StateDelta::RemoveVertex(transaction_.id_, vlist_ptr->gid_));
wal().Emplace(database::StateDelta::RemoveVertex(
transaction_.id_, vlist_ptr->gid_, check_empty));
vlist_ptr->remove(vertex_accessor.current_, transaction_);
return true;
}
void GraphDbAccessor::DetachRemoveVertex(VertexAccessor &vertex_accessor) {
DCHECK(!commited_ && !aborted_) << "Accessor committed or aborted";
if (!vertex_accessor.is_local()) {
LOG(ERROR) << "Remote vertex deletion not implemented";
// TODO support distributed
// call remote DetachRemoveVertex(gid). It can either succeed or an error
// can occur. See discussion in the RemoveVertex method above.
}
vertex_accessor.SwitchNew();
// Note that when we call RemoveEdge we must take care not to delete from the
// collection we are iterating over. This invalidates the iterator in a subtle
// way that does not fail in tests, but is NOT correct.
@ -380,13 +378,7 @@ void GraphDbAccessor::DetachRemoveVertex(VertexAccessor &vertex_accessor) {
for (auto edge_accessor : vertex_accessor.out())
RemoveEdge(edge_accessor, false, true);
vertex_accessor.SwitchNew();
// it's possible the vertex was removed already in this transaction
// due to it getting matched multiple times by some patterns
// we can only delete it once, so check if it's already deleted
if (!vertex_accessor.current().is_expired_by(transaction_))
vertex_accessor.address().local()->remove(vertex_accessor.current_,
transaction_);
RemoveVertex(vertex_accessor, false);
}
EdgeAccessor GraphDbAccessor::InsertEdge(

View File

@ -94,9 +94,11 @@ class GraphDbAccessor {
* this function will not do anything and will return true.
*
* @param vertex_accessor Accessor to vertex.
* @param check_empty If the vertex should be checked for existing edges
* before deletion.
* @return If or not the vertex was deleted.
*/
bool RemoveVertex(VertexAccessor &vertex_accessor);
bool RemoveVertex(VertexAccessor &vertex_accessor, bool check_empty = true);
/**
* Removes the vertex of the given accessor along with all it's outgoing

View File

@ -135,9 +135,10 @@ StateDelta StateDelta::RemoveLabel(tx::transaction_id_t tx_id,
}
StateDelta StateDelta::RemoveVertex(tx::transaction_id_t tx_id,
gid::Gid vertex_id) {
gid::Gid vertex_id, bool check_empty) {
StateDelta op(StateDelta::Type::REMOVE_VERTEX, tx_id);
op.vertex_id = vertex_id;
op.check_empty = check_empty;
return op;
}

View File

@ -41,7 +41,7 @@ struct StateDelta {
// remove property is done by setting a PropertyValue::Null
ADD_LABEL, // vertex_id, label, label_name
REMOVE_LABEL, // vertex_id, label, label_name
REMOVE_VERTEX, // vertex_id
REMOVE_VERTEX, // vertex_id, check_empty
REMOVE_EDGE, // edge_id
BUILD_INDEX // label, label_name, property, property_name
};
@ -99,8 +99,8 @@ struct StateDelta {
static StateDelta RemoveLabel(tx::transaction_id_t tx_id, gid::Gid vertex_id,
storage::Label label,
const std::string &label_name);
static StateDelta RemoveVertex(tx::transaction_id_t tx_id,
gid::Gid vertex_id);
static StateDelta RemoveVertex(tx::transaction_id_t tx_id, gid::Gid vertex_id,
bool check_empty);
static StateDelta RemoveEdge(tx::transaction_id_t tx_id, gid::Gid edge_id);
static StateDelta BuildIndex(tx::transaction_id_t tx_id, storage::Label label,
const std::string &label_name,
@ -131,6 +131,7 @@ struct StateDelta {
PropertyValue value = PropertyValue::Null;
storage::Label label;
std::string label_name;
bool check_empty;
private:
friend class boost::serialization::access;
@ -153,6 +154,7 @@ struct StateDelta {
utils::SaveTypedValue(ar, value);
ar &label;
ar &label_name;
ar &check_empty;
}
template <class TArchive>
@ -175,6 +177,7 @@ struct StateDelta {
value = tv;
ar &label;
ar &label_name;
ar &check_empty;
}
};
} // namespace database

View File

@ -86,10 +86,10 @@ class RemoteUpdatesRpcClients {
}
void RemoteRemoveVertex(int worker_id, tx::transaction_id_t tx_id,
gid::Gid gid) {
gid::Gid gid, bool check_empty) {
auto res =
worker_clients_.GetClientPool(worker_id).Call<RemoteRemoveVertexRpc>(
RemoteRemoveVertexReqData{gid, tx_id});
RemoteRemoveVertexReqData{gid, tx_id, check_empty});
CHECK(res) << "RemoteRemoveVertex RPC failed";
RaiseIfRemoteError(res->member);
}

View File

@ -142,6 +142,7 @@ using RemoteAddInEdgeRpc =
struct RemoteRemoveVertexReqData {
gid::Gid gid;
tx::transaction_id_t tx_id;
bool check_empty;
private:
friend class boost::serialization::access;
@ -150,6 +151,7 @@ struct RemoteRemoveVertexReqData {
void serialize(TArchive &ar, unsigned int) {
ar &gid;
ar &tx_id;
ar &check_empty;
}
};

View File

@ -128,7 +128,8 @@ class RemoteUpdatesRpcServer {
"graph element";
case database::StateDelta::Type::REMOVE_VERTEX:
if (!db_accessor().RemoveVertex(
reinterpret_cast<VertexAccessor &>(record_accessor))) {
reinterpret_cast<VertexAccessor &>(record_accessor),
delta.check_empty)) {
return RemoteUpdateResult::UNABLE_TO_DELETE_VERTEX_ERROR;
}
break;
@ -271,8 +272,8 @@ class RemoteUpdatesRpcServer {
server.Register<RemoteRemoveVertexRpc>(
[this](const RemoteRemoveVertexReq &req) {
auto to_delta = database::StateDelta::RemoveVertex(req.member.tx_id,
req.member.gid);
auto to_delta = database::StateDelta::RemoveVertex(
req.member.tx_id, req.member.gid, req.member.check_empty);
auto result =
GetUpdates(vertex_updates_, req.member.tx_id).Emplace(to_delta);
return std::make_unique<RemoteRemoveVertexRes>(result);

View File

@ -88,9 +88,10 @@ class Edges {
* present in this iterator. */
void update_position() {
if (vertex_.local()) {
position_ = std::find_if(
position_, end_,
[v = this->vertex_](const Element &e) { return e.vertex == v; });
position_ = std::find_if(position_,
end_, [v = this->vertex_](const Element &e) {
return e.vertex == v;
});
}
if (edge_types_) {
position_ = std::find_if(position_, end_, [this](const Element &e) {
@ -121,7 +122,8 @@ class Edges {
auto found = std::find_if(
storage_.begin(), storage_.end(),
[edge](const Element &element) { return edge == element.edge; });
DCHECK(found != storage_.end()) << "Removing an edge that is not present";
// If the edge is not in the structure we don't care and can simply return
if (found == storage_.end()) return;
*found = std::move(storage_.back());
storage_.pop_back();
}

View File

@ -58,6 +58,10 @@ void VertexAccessor::RemoveOutEdge(storage::EdgeAddress edge) {
auto &dba = db_accessor();
auto delta = database::StateDelta::RemoveOutEdge(
dba.transaction_id(), gid(), dba.db().storage().GlobalizedAddress(edge));
SwitchNew();
if (current().is_expired_by(dba.transaction())) return;
update().out_.RemoveEdge(dba.db().storage().LocalizedAddressIfPossible(edge));
ProcessDelta(delta);
}
@ -66,6 +70,10 @@ void VertexAccessor::RemoveInEdge(storage::EdgeAddress edge) {
auto &dba = db_accessor();
auto delta = database::StateDelta::RemoveInEdge(
dba.transaction_id(), gid(), dba.db().storage().GlobalizedAddress(edge));
SwitchNew();
if (current().is_expired_by(dba.transaction())) return;
update().in_.RemoveEdge(dba.db().storage().LocalizedAddressIfPossible(edge));
ProcessDelta(delta);
}
@ -76,9 +84,10 @@ std::ostream &operator<<(std::ostream &os, const VertexAccessor &va) {
stream << va.db_accessor().LabelName(label);
});
os << " {";
utils::PrintIterable(os, va.Properties(), ", ", [&](auto &stream,
const auto &pair) {
stream << va.db_accessor().PropertyName(pair.first) << ": " << pair.second;
});
utils::PrintIterable(os, va.Properties(), ", ",
[&](auto &stream, const auto &pair) {
stream << va.db_accessor().PropertyName(pair.first)
<< ": " << pair.second;
});
return os << "})";
}

View File

@ -1,5 +1,7 @@
#include <gtest/gtest.h>
#include <functional>
#include "database/graph_db_accessor.hpp"
#include "distributed/remote_updates_rpc_clients.hpp"
#include "distributed/remote_updates_rpc_server.hpp"
@ -252,6 +254,100 @@ TEST_F(DistributedGraphDbTest, DeleteVertexRemoteStillConnected) {
}
}
class DistributedDetachDeleteTest : public DistributedGraphDbTest {
protected:
storage::VertexAddress w1_a;
storage::VertexAddress w1_b;
storage::VertexAddress w2_a;
void SetUp() override {
DistributedGraphDbTest::SetUp();
w1_a = InsertVertex(worker(1));
w1_b = InsertVertex(worker(1));
w2_a = InsertVertex(worker(2));
}
template <typename TF>
void Run(storage::VertexAddress v_address, TF check_func) {
for (int i : {0, 1, 2}) {
database::GraphDbAccessor dba0{master()};
database::GraphDbAccessor dba1{worker(1), dba0.transaction_id()};
database::GraphDbAccessor dba2{worker(2), dba0.transaction_id()};
std::vector<std::reference_wrapper<database::GraphDbAccessor>> dba;
dba.emplace_back(dba0);
dba.emplace_back(dba1);
dba.emplace_back(dba2);
auto &accessor = dba[i].get();
auto v_accessor = VertexAccessor(v_address, accessor);
accessor.DetachRemoveVertex(v_accessor);
for (auto db_accessor : dba) {
ASSERT_EQ(db_accessor.get().db().remote_updates_server().Apply(
dba[0].get().transaction_id()),
distributed::RemoteUpdateResult::DONE);
}
check_func(dba);
}
}
};
TEST_F(DistributedDetachDeleteTest, VertexCycle) {
auto e_address = InsertEdge(w1_a, w1_a, "edge");
Run(w1_a,
[this, e_address](
std::vector<std::reference_wrapper<database::GraphDbAccessor>> &dba) {
EXPECT_FALSE(dba[1].get().FindVertex(w1_a.gid(), true));
EXPECT_FALSE(dba[1].get().FindEdge(e_address.gid(), true));
});
}
TEST_F(DistributedDetachDeleteTest, TwoVerticesDifferentWorkers) {
auto e_address = InsertEdge(w1_a, w2_a, "edge");
// Delete from
Run(w1_a,
[this, e_address](
std::vector<std::reference_wrapper<database::GraphDbAccessor>> &dba) {
EXPECT_FALSE(dba[1].get().FindVertex(w1_a.gid(), true));
EXPECT_TRUE(dba[2].get().FindVertex(w2_a.gid(), true));
EXPECT_FALSE(dba[1].get().FindEdge(e_address.gid(), true));
});
// Delete to
Run(w2_a,
[this, e_address](
std::vector<std::reference_wrapper<database::GraphDbAccessor>> &dba) {
EXPECT_TRUE(dba[1].get().FindVertex(w1_a.gid(), true));
EXPECT_FALSE(dba[2].get().FindVertex(w2_a.gid(), true));
EXPECT_FALSE(dba[1].get().FindEdge(e_address.gid(), true));
});
}
TEST_F(DistributedDetachDeleteTest, TwoVerticesSameWorkers) {
auto e_address = InsertEdge(w1_a, w1_b, "edge");
// Delete from
Run(w1_a,
[this, e_address](
std::vector<std::reference_wrapper<database::GraphDbAccessor>> &dba) {
EXPECT_FALSE(dba[1].get().FindVertex(w1_a.gid(), true));
EXPECT_TRUE(dba[1].get().FindVertex(w1_b.gid(), true));
EXPECT_FALSE(dba[1].get().FindEdge(e_address.gid(), true));
});
// Delete to
Run(w1_b,
[this, e_address](
std::vector<std::reference_wrapper<database::GraphDbAccessor>> &dba) {
EXPECT_TRUE(dba[1].get().FindVertex(w1_a.gid(), true));
EXPECT_FALSE(dba[1].get().FindVertex(w1_b.gid(), true));
EXPECT_FALSE(dba[1].get().FindEdge(e_address.gid(), true));
});
}
class DistributedEdgeCreateTest : public DistributedGraphDbTest {
protected:
storage::VertexAddress w1_a;

View File

@ -32,7 +32,8 @@ TEST(StateDelta, RemoveVertex) {
}
{
database::GraphDbAccessor dba(db);
auto delta = database::StateDelta::RemoveVertex(dba.transaction_id(), gid0);
auto delta =
database::StateDelta::RemoveVertex(dba.transaction_id(), gid0, true);
delta.Apply(dba);
dba.Commit();
}