From ef6cfc4c0e52815593ee807dc3172ad3a2da10c0 Mon Sep 17 00:00:00 2001 From: Dominik Gleich Date: Mon, 26 Feb 2018 14:55:55 +0100 Subject: [PATCH] Refactor state deltas call Summary: Add label test Index gets updated after remote update Reviewers: florijan Reviewed By: florijan Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D1246 --- src/distributed/remote_updates_rpc_server.hpp | 48 ++++++- src/storage/record_accessor.cpp | 120 ++++++------------ src/storage/record_accessor.hpp | 14 +- src/storage/vertex_accessor.cpp | 53 +++++--- src/storage/vertex_accessor.hpp | 8 +- tests/unit/distributed_updates.cpp | 39 ++++++ tests/unit/record_edge_vertex_accessor.cpp | 12 +- 7 files changed, 176 insertions(+), 118 deletions(-) diff --git a/src/distributed/remote_updates_rpc_server.hpp b/src/distributed/remote_updates_rpc_server.hpp index 004743e4e..b37b1c1d3 100644 --- a/src/distributed/remote_updates_rpc_server.hpp +++ b/src/distributed/remote_updates_rpc_server.hpp @@ -2,7 +2,6 @@ #include #include -#include #include #include @@ -112,9 +111,54 @@ class RemoteUpdatesRpcServer { RemoteUpdateResult Apply() { std::lock_guard guard{lock_}; for (auto &kv : deltas_) { + auto &record_accessor = kv.second.first; + // We need to reconstruct the record as in the meantime some local + // update might have updated it. + record_accessor.Reconstruct(); for (database::StateDelta &delta : kv.second.second) { try { - kv.second.first.ProcessDelta(delta); + auto &updated = record_accessor.update(); + auto &dba = db_accessor_; + switch (delta.type) { + case database::StateDelta::Type::TRANSACTION_BEGIN: + case database::StateDelta::Type::TRANSACTION_COMMIT: + case database::StateDelta::Type::TRANSACTION_ABORT: + case database::StateDelta::Type::CREATE_VERTEX: + case database::StateDelta::Type::CREATE_EDGE: + case database::StateDelta::Type::REMOVE_VERTEX: + case database::StateDelta::Type::REMOVE_EDGE: + case database::StateDelta::Type::BUILD_INDEX: + LOG(FATAL) << "Can only apply record update deltas for remote " + "graph element"; + case database::StateDelta::Type::SET_PROPERTY_VERTEX: + case database::StateDelta::Type::SET_PROPERTY_EDGE: + record_accessor.PropsSet(delta.property, delta.value); + break; + case database::StateDelta::Type::ADD_LABEL: + // It is only possible that ADD_LABEL gets called on a + // VertexAccessor. + reinterpret_cast(record_accessor) + .add_label(delta.label); + break; + case database::StateDelta::Type::REMOVE_LABEL: { + // It is only possible that REMOVE_LABEL gets called on a + // VertexAccessor. + reinterpret_cast(record_accessor) + .remove_label(delta.label); + } break; + case database::StateDelta::Type::ADD_OUT_EDGE: + reinterpret_cast(updated).out_.emplace( + dba.LocalizedAddress(delta.vertex_to_address), + dba.LocalizedAddress(delta.edge_address), delta.edge_type); + dba.wal().Emplace(delta); + break; + case database::StateDelta::Type::ADD_IN_EDGE: + reinterpret_cast(updated).in_.emplace( + dba.LocalizedAddress(delta.vertex_from_address), + dba.LocalizedAddress(delta.edge_address), delta.edge_type); + dba.wal().Emplace(delta); + break; + } } catch (const mvcc::SerializationError &) { return RemoteUpdateResult::SERIALIZATION_ERROR; } catch (const RecordDeletedError &) { diff --git a/src/storage/record_accessor.cpp b/src/storage/record_accessor.cpp index 3171f7496..4eb3f5299 100644 --- a/src/storage/record_accessor.cpp +++ b/src/storage/record_accessor.cpp @@ -27,52 +27,52 @@ template <> void RecordAccessor::PropsSet(storage::Property key, PropertyValue value) { auto &dba = db_accessor(); - ProcessDelta(StateDelta::PropsSetVertex(dba.transaction_id(), gid(), key, - dba.PropertyName(key), value)); + auto delta = StateDelta::PropsSetVertex(dba.transaction_id(), gid(), key, + dba.PropertyName(key), value); + update().properties_.set(key, value); if (is_local()) { dba.UpdatePropertyIndex(key, *this, &update()); } + ProcessDelta(delta); } template <> void RecordAccessor::PropsSet(storage::Property key, PropertyValue value) { auto &dba = db_accessor(); - ProcessDelta(StateDelta::PropsSetEdge(dba.transaction_id(), gid(), key, - dba.PropertyName(key), value)); + auto delta = StateDelta::PropsSetEdge(dba.transaction_id(), gid(), key, + dba.PropertyName(key), value); + + update().properties_.set(key, value); + ProcessDelta(delta); } template <> void RecordAccessor::PropsErase(storage::Property key) { auto &dba = db_accessor(); - ProcessDelta(StateDelta::PropsSetVertex(dba.transaction_id(), gid(), key, - dba.PropertyName(key), - PropertyValue::Null)); + auto delta = + StateDelta::PropsSetVertex(dba.transaction_id(), gid(), key, + dba.PropertyName(key), PropertyValue::Null); + update().properties_.set(key, PropertyValue::Null); + ProcessDelta(delta); } template <> void RecordAccessor::PropsErase(storage::Property key) { auto &dba = db_accessor(); - ProcessDelta(StateDelta::PropsSetEdge(dba.transaction_id(), gid(), key, - dba.PropertyName(key), - PropertyValue::Null)); + auto delta = + StateDelta::PropsSetEdge(dba.transaction_id(), gid(), key, + dba.PropertyName(key), PropertyValue::Null); + update().properties_.set(key, PropertyValue::Null); + ProcessDelta(delta); } template void RecordAccessor::PropsClear() { - auto &dba = db_accessor(); std::vector to_remove; for (const auto &kv : update().properties_) to_remove.emplace_back(kv.first); for (const auto &prop : to_remove) { - if (std::is_same::value) { - ProcessDelta(StateDelta::PropsSetVertex(dba.transaction_id(), gid(), prop, - dba.PropertyName(prop), - PropertyValue::Null)); - } else { - ProcessDelta(StateDelta::PropsSetEdge(dba.transaction_id(), gid(), prop, - dba.PropertyName(prop), - PropertyValue::Null)); - } + PropsErase(prop); } } @@ -196,69 +196,33 @@ const TRecord &RecordAccessor::current() const { return *current_; } +template +void RecordAccessor::SendDelta( + const database::StateDelta &delta) const { + DCHECK(!is_local()) + << "Only a delta created on a remote accessor should be sent"; + + auto result = db_accessor().db().remote_updates_clients().RemoteUpdate( + address().worker_id(), delta); + switch (result) { + case distributed::RemoteUpdateResult::DONE: + break; + case distributed::RemoteUpdateResult::SERIALIZATION_ERROR: + throw mvcc::SerializationError(); + case distributed::RemoteUpdateResult::UPDATE_DELETED_ERROR: + throw RecordDeletedError(); + case distributed::RemoteUpdateResult::LOCK_TIMEOUT_ERROR: + throw LockTimeoutException("Lock timeout on remote worker"); + } +} + template void RecordAccessor::ProcessDelta( const database::StateDelta &delta) const { - auto &dba = db_accessor(); - // We need to reconstruct the record as in the meantime some local update - // have updated it. - Reconstruct(); - // Apply the delta both on local and remote data. We need to see the changes - // we make to remote data, even if it's not applied immediately. - auto &updated = update(); - switch (delta.type) { - case StateDelta::Type::TRANSACTION_BEGIN: - case StateDelta::Type::TRANSACTION_COMMIT: - case StateDelta::Type::TRANSACTION_ABORT: - case StateDelta::Type::CREATE_VERTEX: - case StateDelta::Type::CREATE_EDGE: - case StateDelta::Type::REMOVE_VERTEX: - case StateDelta::Type::REMOVE_EDGE: - case StateDelta::Type::BUILD_INDEX: - LOG(FATAL) - << "Can only apply record update deltas for remote graph element"; - case StateDelta::Type::SET_PROPERTY_VERTEX: - case StateDelta::Type::SET_PROPERTY_EDGE: - updated.properties_.set(delta.property, delta.value); - break; - case StateDelta::Type::ADD_LABEL: - // It is only possible that ADD_LABEL gets calld on a VertexAccessor. - reinterpret_cast(updated).labels_.emplace_back(delta.label); - break; - case StateDelta::Type::REMOVE_LABEL: { - // It is only possible that REMOVE_LABEL gets calld on a VertexAccessor. - auto &labels = reinterpret_cast(updated).labels_; - auto found = std::find(labels.begin(), labels.end(), delta.label); - std::swap(*found, labels.back()); - labels.pop_back(); - } break; - case StateDelta::Type::ADD_OUT_EDGE: - reinterpret_cast(updated).out_.emplace( - dba.LocalizedAddress(delta.vertex_to_address), - dba.LocalizedAddress(delta.edge_address), delta.edge_type); - break; - case StateDelta::Type::ADD_IN_EDGE: - reinterpret_cast(updated).in_.emplace( - dba.LocalizedAddress(delta.vertex_from_address), - dba.LocalizedAddress(delta.edge_address), delta.edge_type); - break; - } - if (is_local()) { - dba.wal().Emplace(delta); + db_accessor().wal().Emplace(delta); } else { - auto result = dba.db().remote_updates_clients().RemoteUpdate( - address().worker_id(), delta); - switch (result) { - case distributed::RemoteUpdateResult::DONE: - break; - case distributed::RemoteUpdateResult::SERIALIZATION_ERROR: - throw mvcc::SerializationError(); - case distributed::RemoteUpdateResult::UPDATE_DELETED_ERROR: - throw RecordDeletedError(); - case distributed::RemoteUpdateResult::LOCK_TIMEOUT_ERROR: - throw LockTimeoutException("Lock timeout on remote worker"); - } + SendDelta(delta); } } diff --git a/src/storage/record_accessor.hpp b/src/storage/record_accessor.hpp index a6c21df3a..39689bbc4 100644 --- a/src/storage/record_accessor.hpp +++ b/src/storage/record_accessor.hpp @@ -13,7 +13,7 @@ namespace database { class GraphDbAccessor; struct StateDelta; -}; +}; // namespace database /** * An accessor to a database record (an Edge or a Vertex). @@ -146,15 +146,17 @@ class RecordAccessor : public TotalOrdering> { (current_state && new_ && !new_->is_expired_by(t)); } + protected: /** - * Processes the delta that's a consequence of changes in this accessor. If - * the accessor is local that means writing the delta to the write-ahead log. - * If it's remote, then the delta needs to be sent to it's owner for - * processing. + * Sends delta for remote processing. + */ + void SendDelta(const database::StateDelta &delta) const; + + /** + * Processes delta by either adding it to WAL, or by sending it remotely. */ void ProcessDelta(const database::StateDelta &delta) const; - protected: /** * Pointer to the version (either old_ or new_) that READ operations * in the accessor should take data from. Note that WRITE operations diff --git a/src/storage/vertex_accessor.cpp b/src/storage/vertex_accessor.cpp index 8beda64a5..d300c4108 100644 --- a/src/storage/vertex_accessor.cpp +++ b/src/storage/vertex_accessor.cpp @@ -10,29 +10,39 @@ size_t VertexAccessor::out_degree() const { return current().out_.size(); } size_t VertexAccessor::in_degree() const { return current().in_.size(); } -bool VertexAccessor::add_label(storage::Label label) { - auto &updated = update(); - if (utils::Contains(updated.labels_, label)) return false; - - // not a duplicate label, add it +void VertexAccessor::add_label(storage::Label label) { auto &dba = db_accessor(); - ProcessDelta(database::StateDelta::AddLabel(dba.transaction_id(), gid(), - label, dba.LabelName(label))); + auto delta = database::StateDelta::AddLabel(dba.transaction_id(), gid(), + label, dba.LabelName(label)); Vertex &vertex = update(); - - if (is_local()) { - dba.UpdateLabelIndices(label, *this, &vertex); + // not a duplicate label, add it + if (!utils::Contains(vertex.labels_, label)) { + vertex.labels_.emplace_back(label); + if (is_local()) { + dba.wal().Emplace(delta); + dba.UpdateLabelIndices(label, *this, &vertex); + } } - return true; + + if (!is_local()) SendDelta(delta); } -size_t VertexAccessor::remove_label(storage::Label label) { - if (!utils::Contains(update().labels_, label)) return 0; - +void VertexAccessor::remove_label(storage::Label label) { auto &dba = db_accessor(); - ProcessDelta(database::StateDelta::RemoveLabel(dba.transaction_id(), gid(), - label, dba.LabelName(label))); - return 1; + auto delta = database::StateDelta::RemoveLabel(dba.transaction_id(), gid(), + label, dba.LabelName(label)); + Vertex &vertex = update(); + if (utils::Contains(vertex.labels_, label)) { + auto &labels = vertex.labels_; + auto found = std::find(labels.begin(), labels.end(), delta.label); + std::swap(*found, labels.back()); + labels.pop_back(); + if (is_local()) { + dba.wal().Emplace(delta); + } + } + + if (!is_local()) SendDelta(delta); } bool VertexAccessor::has_label(storage::Label label) const { @@ -50,9 +60,10 @@ std::ostream &operator<<(std::ostream &os, const VertexAccessor &va) { stream << va.db_accessor().LabelName(label); }); os << " {"; - utils::PrintIterable(os, va.Properties(), ", ", [&](auto &stream, - const auto &pair) { - stream << va.db_accessor().PropertyName(pair.first) << ": " << pair.second; - }); + utils::PrintIterable(os, va.Properties(), ", ", + [&](auto &stream, const auto &pair) { + stream << va.db_accessor().PropertyName(pair.first) + << ": " << pair.second; + }); return os << "})"; } diff --git a/src/storage/vertex_accessor.hpp b/src/storage/vertex_accessor.hpp index e9ad96d63..00650afa1 100644 --- a/src/storage/vertex_accessor.hpp +++ b/src/storage/vertex_accessor.hpp @@ -62,12 +62,10 @@ class VertexAccessor : public RecordAccessor { /** Adds a label to the Vertex. If the Vertex already has that label the call * has no effect. */ - // TODO revise return value, is it necessary? - bool add_label(storage::Label label); + void add_label(storage::Label label); - /** Removes a label from the Vertex. Return number of removed (0, 1). */ - // TODO reves return value, is it necessary? - size_t remove_label(storage::Label label); + /** Removes a label from the Vertex. */ + void remove_label(storage::Label label); /** Indicates if the Vertex has the given label. */ bool has_label(storage::Label label) const; diff --git a/tests/unit/distributed_updates.cpp b/tests/unit/distributed_updates.cpp index 37d5ccee7..0c748ff88 100644 --- a/tests/unit/distributed_updates.cpp +++ b/tests/unit/distributed_updates.cpp @@ -157,6 +157,45 @@ TEST_F(DistributedGraphDbTest, UpdateVertexRemoteAndLocal) { } } +TEST_F(DistributedGraphDbTest, AddSameLabelRemoteAndLocal) { + auto v_address = InsertVertex(worker(1)); + { + database::GraphDbAccessor dba0{master()}; + database::GraphDbAccessor dba1{worker(1), dba0.transaction_id()}; + auto v_local = dba1.FindVertexChecked(v_address.gid(), false); + auto v_remote = VertexAccessor(v_address, dba0); + auto l1 = dba1.Label("label"); + v_remote.add_label(l1); + v_local.add_label(l1); + worker(1).remote_updates_server().Apply(dba0.transaction_id()); + dba0.Commit(); + } + { + database::GraphDbAccessor dba0{master()}; + database::GraphDbAccessor dba1{worker(1), dba0.transaction_id()}; + auto v = dba1.FindVertexChecked(v_address.gid(), false); + EXPECT_EQ(v.labels().size(), 1); + } +} + +TEST_F(DistributedGraphDbTest, IndexGetsUpdatedRemotely) { + storage::VertexAddress v_remote = InsertVertex(worker(1)); + storage::Label label; + { + database::GraphDbAccessor dba0{master()}; + label = dba0.Label("label"); + VertexAccessor va(v_remote, dba0); + va.add_label(label); + worker(1).remote_updates_server().Apply(dba0.transaction_id()); + dba0.Commit(); + } + { + database::GraphDbAccessor dba1{worker(1)}; + auto vertices = dba1.Vertices(label, false); + EXPECT_EQ(std::distance(vertices.begin(), vertices.end()), 1); + } +} + class DistributedEdgeCreateTest : public DistributedGraphDbTest { protected: storage::VertexAddress w1_a; diff --git a/tests/unit/record_edge_vertex_accessor.cpp b/tests/unit/record_edge_vertex_accessor.cpp index 315b3296e..6956a995c 100644 --- a/tests/unit/record_edge_vertex_accessor.cpp +++ b/tests/unit/record_edge_vertex_accessor.cpp @@ -156,31 +156,31 @@ TEST(RecordAccessor, VertexLabels) { // adding labels EXPECT_FALSE(v1.has_label(l1)); - EXPECT_TRUE(v1.add_label(l1)); + v1.add_label(l1); EXPECT_TRUE(v1.has_label(l1)); EXPECT_EQ(v1.labels().size(), 1); EXPECT_EQ(labels.size(), 1); - EXPECT_FALSE(v1.add_label(l1)); + v1.add_label(l1); EXPECT_EQ(v1.labels().size(), 1); EXPECT_EQ(labels.size(), 1); EXPECT_FALSE(v1.has_label(l2)); - EXPECT_TRUE(v1.add_label(l2)); + v1.add_label(l2); EXPECT_TRUE(v1.has_label(l2)); EXPECT_EQ(v1.labels().size(), 2); EXPECT_EQ(labels.size(), 2); // removing labels storage::Label l3 = dba.Label("label3"); - EXPECT_EQ(v1.remove_label(l3), 0); + v1.remove_label(l3); EXPECT_EQ(labels.size(), 2); - EXPECT_EQ(v1.remove_label(l1), 1); + v1.remove_label(l1); EXPECT_FALSE(v1.has_label(l1)); EXPECT_EQ(v1.labels().size(), 1); - EXPECT_EQ(v1.remove_label(l1), 0); + v1.remove_label(l1); EXPECT_TRUE(v1.has_label(l2)); }