Vertex removal using rpcs

Summary:
Remove vertex remote

Add tests

Reviewers: florijan, teon.banek

Reviewed By: florijan

Subscribers: teon.banek, pullbot

Differential Revision: https://phabricator.memgraph.io/D1230
This commit is contained in:
Dominik Gleich 2018-02-28 10:36:48 +01:00
parent 7e945c6667
commit 99375a4b47
9 changed files with 143 additions and 24 deletions

View File

@ -100,3 +100,7 @@ BOOST_CLASS_EXPORT(distributed::RemoteCreateEdgeRes);
BOOST_CLASS_EXPORT(distributed::RemoteAddInEdgeReqData);
BOOST_CLASS_EXPORT(distributed::RemoteAddInEdgeReq);
BOOST_CLASS_EXPORT(distributed::RemoteAddInEdgeRes);
// Remote removes
BOOST_CLASS_EXPORT(distributed::RemoteRemoveVertexReq);
BOOST_CLASS_EXPORT(distributed::RemoteRemoveVertexRes);

View File

@ -375,13 +375,12 @@ bool GraphDbAccessor::RemoveVertex(VertexAccessor &vertex_accessor) {
DCHECK(!commited_ && !aborted_) << "Accessor committed or aborted";
if (!vertex_accessor.is_local()) {
LOG(ERROR) << "Remote vertex deletion not implemented";
// TODO support distributed
// call remote RemoveVertex(gid), return it's result. The result can be
// (true, false), or an error can occur (serialization, timeout). In case
// of error the remote worker will be asking for a transaction abort,
// not sure what to do here.
return false;
auto address = vertex_accessor.address();
db().remote_updates_clients().RemoteRemoveVertex(
address.worker_id(), transaction_id(), address.gid());
// We can't know if we are going to be able to remove vertex until deferred
// updates on a remote worker are executed
return true;
}
vertex_accessor.SwitchNew();
// it's possible the vertex was removed already in this transaction

View File

@ -7,6 +7,7 @@
#include "distributed/coordination.hpp"
#include "distributed/remote_updates_rpc_messages.hpp"
#include "distributed/rpc_worker_clients.hpp"
#include "query/exceptions.hpp"
#include "query/typed_value.hpp"
#include "storage/address_types.hpp"
#include "storage/gid.hpp"
@ -85,6 +86,15 @@ class RemoteUpdatesRpcClients {
RaiseIfRemoteError(res->member);
}
void RemoteRemoveVertex(int worker_id, tx::transaction_id_t tx_id,
gid::Gid gid) {
auto res =
worker_clients_.GetClientPool(worker_id).Call<RemoteRemoveVertexRpc>(
RemoteRemoveVertexReqData{gid, tx_id});
CHECK(res) << "RemoteRemoveVertex RPC failed";
RaiseIfRemoteError(res->member);
}
/// Calls for the worker with the given ID to apply remote updates. Returns
/// the results of that operation.
RemoteUpdateResult RemoteUpdateApply(int worker_id,
@ -109,13 +119,15 @@ class RemoteUpdatesRpcClients {
void RaiseIfRemoteError(RemoteUpdateResult result) {
switch (result) {
case RemoteUpdateResult::UNABLE_TO_DELETE_VERTEX_ERROR:
throw query::RemoveAttachedVertexException();
case RemoteUpdateResult::SERIALIZATION_ERROR:
throw new mvcc::SerializationError();
throw mvcc::SerializationError();
case RemoteUpdateResult::LOCK_TIMEOUT_ERROR:
throw new LockTimeoutException(
throw LockTimeoutException(
"Remote LockTimeoutError during edge creation");
case RemoteUpdateResult::UPDATE_DELETED_ERROR:
throw new RecordDeletedError();
throw RecordDeletedError();
case RemoteUpdateResult::DONE:
break;
}

View File

@ -18,7 +18,8 @@ enum class RemoteUpdateResult {
DONE,
SERIALIZATION_ERROR,
LOCK_TIMEOUT_ERROR,
UPDATE_DELETED_ERROR
UPDATE_DELETED_ERROR,
UNABLE_TO_DELETE_VERTEX_ERROR
};
RPC_SINGLE_MEMBER_MESSAGE(RemoteUpdateReq, database::StateDelta);
@ -137,4 +138,24 @@ RPC_SINGLE_MEMBER_MESSAGE(RemoteAddInEdgeReq, RemoteAddInEdgeReqData);
RPC_SINGLE_MEMBER_MESSAGE(RemoteAddInEdgeRes, RemoteUpdateResult);
using RemoteAddInEdgeRpc =
communication::rpc::RequestResponse<RemoteAddInEdgeReq, RemoteAddInEdgeRes>;
struct RemoteRemoveVertexReqData {
gid::Gid gid;
tx::transaction_id_t tx_id;
private:
friend class boost::serialization::access;
template <class TArchive>
void serialize(TArchive &ar, unsigned int) {
ar &gid;
ar &tx_id;
}
};
RPC_SINGLE_MEMBER_MESSAGE(RemoteRemoveVertexReq, RemoteRemoveVertexReqData);
RPC_SINGLE_MEMBER_MESSAGE(RemoteRemoveVertexRes, RemoteUpdateResult);
using RemoteRemoveVertexRpc =
communication::rpc::RequestResponse<RemoteRemoveVertexReq,
RemoteRemoveVertexRes>;
} // namespace distributed

View File

@ -116,7 +116,6 @@ class RemoteUpdatesRpcServer {
record_accessor.Reconstruct();
for (database::StateDelta &delta : kv.second.second) {
try {
auto &updated = record_accessor.update();
auto &dba = db_accessor_;
switch (delta.type) {
case database::StateDelta::Type::TRANSACTION_BEGIN:
@ -124,11 +123,16 @@ class RemoteUpdatesRpcServer {
case database::StateDelta::Type::TRANSACTION_ABORT:
case database::StateDelta::Type::CREATE_VERTEX:
case database::StateDelta::Type::CREATE_EDGE:
case database::StateDelta::Type::REMOVE_VERTEX:
case database::StateDelta::Type::REMOVE_EDGE:
case database::StateDelta::Type::BUILD_INDEX:
LOG(FATAL) << "Can only apply record update deltas for remote "
"graph element";
case database::StateDelta::Type::REMOVE_VERTEX:
if (!db_accessor().RemoveVertex(
reinterpret_cast<VertexAccessor &>(record_accessor))) {
return RemoteUpdateResult::UNABLE_TO_DELETE_VERTEX_ERROR;
}
break;
case database::StateDelta::Type::SET_PROPERTY_VERTEX:
case database::StateDelta::Type::SET_PROPERTY_EDGE:
record_accessor.PropsSet(delta.property, delta.value);
@ -146,15 +150,18 @@ class RemoteUpdatesRpcServer {
.remove_label(delta.label);
} break;
case database::StateDelta::Type::ADD_OUT_EDGE:
reinterpret_cast<Vertex &>(updated).out_.emplace(
dba.LocalizedAddress(delta.vertex_to_address),
dba.LocalizedAddress(delta.edge_address), delta.edge_type);
reinterpret_cast<Vertex &>(record_accessor.update())
.out_.emplace(dba.LocalizedAddress(delta.vertex_to_address),
dba.LocalizedAddress(delta.edge_address),
delta.edge_type);
dba.wal().Emplace(delta);
break;
case database::StateDelta::Type::ADD_IN_EDGE:
reinterpret_cast<Vertex &>(updated).in_.emplace(
dba.LocalizedAddress(delta.vertex_from_address),
dba.LocalizedAddress(delta.edge_address), delta.edge_type);
reinterpret_cast<Vertex &>(record_accessor.update())
.in_.emplace(
dba.LocalizedAddress(delta.vertex_from_address),
dba.LocalizedAddress(delta.edge_address),
delta.edge_type);
dba.wal().Emplace(delta);
break;
}
@ -245,6 +252,15 @@ class RemoteUpdatesRpcServer {
GetUpdates(vertex_updates_, req.member.tx_id).Emplace(to_delta);
return std::make_unique<RemoteAddInEdgeRes>(result);
});
server_.Register<RemoteRemoveVertexRpc>(
[this](const RemoteRemoveVertexReq &req) {
auto to_delta = database::StateDelta::RemoveVertex(req.member.tx_id,
req.member.gid);
auto result =
GetUpdates(vertex_updates_, req.member.tx_id).Emplace(to_delta);
return std::make_unique<RemoteRemoveVertexRes>(result);
});
}
/// Applies all existsing updates for the given transaction ID. If there are

View File

@ -107,4 +107,12 @@ class ReconstructionException : public QueryException {
"preceeding DELETE.") {}
};
class RemoveAttachedVertexException : public QueryRuntimeException {
public:
RemoveAttachedVertexException()
: QueryRuntimeException(
"Failed to remove vertex because of it's existing "
"connections. Consider using DETACH DELETE.") {}
};
} // namespace query

View File

@ -1227,8 +1227,9 @@ class ExpandWeightedShortestPathCursor : public query::plan::Cursor {
self_.graph_view_);
// For the given (vertex, edge, vertex) tuple checks if they satisfy the
// "where" condition. if so, places them in the priority queue.
auto expand_pair = [this, &evaluator, &frame](
VertexAccessor from, EdgeAccessor edge, VertexAccessor vertex) {
auto expand_pair = [this, &evaluator, &frame](VertexAccessor from,
EdgeAccessor edge,
VertexAccessor vertex) {
SwitchAccessor(edge, self_.graph_view_);
SwitchAccessor(vertex, self_.graph_view_);
@ -1648,9 +1649,7 @@ bool Delete::DeleteCursor::Pull(Frame &frame, Context &context) {
if (self_.detach_)
db_.DetachRemoveVertex(va);
else if (!db_.RemoveVertex(va))
throw QueryRuntimeException(
"Failed to remove vertex because of it's existing "
"connections. Consider using DETACH DELETE.");
throw RemoveAttachedVertexException();
break;
}
@ -3325,6 +3324,8 @@ class SynchronizeCursor : public Cursor {
case distributed::RemoteUpdateResult::SERIALIZATION_ERROR:
throw mvcc::SerializationError(
"Failed to apply deferred updates due to SerializationError");
case distributed::RemoteUpdateResult::UNABLE_TO_DELETE_VERTEX_ERROR:
throw RemoveAttachedVertexException();
case distributed::RemoteUpdateResult::UPDATE_DELETED_ERROR:
throw QueryRuntimeException(
"Failed to apply deferred updates due to RecordDeletedError");

View File

@ -207,6 +207,8 @@ void RecordAccessor<TRecord>::SendDelta(
switch (result) {
case distributed::RemoteUpdateResult::DONE:
break;
case distributed::RemoteUpdateResult::UNABLE_TO_DELETE_VERTEX_ERROR:
throw query::RemoveAttachedVertexException();
case distributed::RemoteUpdateResult::SERIALIZATION_ERROR:
throw mvcc::SerializationError();
case distributed::RemoteUpdateResult::UPDATE_DELETED_ERROR:

View File

@ -196,6 +196,62 @@ TEST_F(DistributedGraphDbTest, IndexGetsUpdatedRemotely) {
}
}
TEST_F(DistributedGraphDbTest, DeleteVertexRemoteCommit) {
auto v_address = InsertVertex(worker(1));
database::GraphDbAccessor dba0{master()};
database::GraphDbAccessor dba1{worker(1), dba0.transaction_id()};
auto v_remote = VertexAccessor(v_address, dba0);
dba0.RemoveVertex(v_remote);
EXPECT_TRUE(dba1.FindVertex(v_address.gid(), true));
EXPECT_EQ(worker(1).remote_updates_server().Apply(dba0.transaction_id()),
distributed::RemoteUpdateResult::DONE);
EXPECT_FALSE(dba1.FindVertex(v_address.gid(), true));
}
TEST_F(DistributedGraphDbTest, DeleteVertexRemoteBothDelete) {
auto v_address = InsertVertex(worker(1));
{
database::GraphDbAccessor dba0{master()};
database::GraphDbAccessor dba1{worker(1), dba0.transaction_id()};
auto v_local = dba1.FindVertexChecked(v_address.gid(), false);
auto v_remote = VertexAccessor(v_address, dba0);
EXPECT_TRUE(dba1.RemoveVertex(v_local));
EXPECT_TRUE(dba0.RemoveVertex(v_remote));
EXPECT_EQ(worker(1).remote_updates_server().Apply(dba0.transaction_id()),
distributed::RemoteUpdateResult::DONE);
EXPECT_FALSE(dba1.FindVertex(v_address.gid(), true));
}
}
TEST_F(DistributedGraphDbTest, DeleteVertexRemoteStillConnected) {
auto v_address = InsertVertex(worker(1));
auto e_address = InsertEdge(v_address, v_address, "edge");
{
database::GraphDbAccessor dba0{master()};
database::GraphDbAccessor dba1{worker(1), dba0.transaction_id()};
auto v_remote = VertexAccessor(v_address, dba0);
dba0.RemoveVertex(v_remote);
EXPECT_EQ(worker(1).remote_updates_server().Apply(dba0.transaction_id()),
distributed::RemoteUpdateResult::UNABLE_TO_DELETE_VERTEX_ERROR);
EXPECT_TRUE(dba1.FindVertex(v_address.gid(), true));
}
{
database::GraphDbAccessor dba0{master()};
database::GraphDbAccessor dba1{worker(1), dba0.transaction_id()};
auto e_local = dba1.FindEdgeChecked(e_address.gid(), false);
auto v_local = dba1.FindVertexChecked(v_address.gid(), false);
auto v_remote = VertexAccessor(v_address, dba0);
dba1.RemoveEdge(e_local);
dba0.RemoveVertex(v_remote);
EXPECT_EQ(worker(1).remote_updates_server().Apply(dba0.transaction_id()),
distributed::RemoteUpdateResult::DONE);
EXPECT_FALSE(dba1.FindVertex(v_address.gid(), true));
}
}
class DistributedEdgeCreateTest : public DistributedGraphDbTest {
protected:
storage::VertexAddress w1_a;