diff --git a/src/database/distributed_graph_db.cpp b/src/database/distributed_graph_db.cpp index 4ba4109a8..fde68e8d2 100644 --- a/src/database/distributed_graph_db.cpp +++ b/src/database/distributed_graph_db.cpp @@ -92,8 +92,7 @@ class DistributedRecordAccessor final { if (address.is_local()) { return address.local()->update(dba.transaction()); } - return data_manager_->FindNew(dba.transaction_id(), - address.gid()); + return data_manager_->FindNew(dba.transaction_id(), address.gid()); } void ProcessDelta(const RecordAccessor &record_accessor, @@ -122,6 +121,25 @@ class DistributedRecordAccessor final { throw utils::LockTimeoutException("Lock timeout on remote worker"); } } + + int64_t CypherId(const RecordAccessor &record_accessor) { + auto &dba = record_accessor.db_accessor(); + const auto &address = record_accessor.address(); + if (record_accessor.is_local()) return address.local()->cypher_id(); + // Fetch data from the cache. + // + // NOTE: This part is executed when we need to migrate + // a vertex and it has edges that don't belong to it. A machine that owns + // the vertex still need to figure out what is the cypher_id for each + // remote edge because the machine has to initiate remote edge creation + // and for that call it has to know the remote cypher_ids. + // TODO (buda): If we save cypher_id similar/next to edge_type we would save + // a network call. + return data_manager_ + ->Find(dba.transaction().id_, address.worker_id(), + address.gid()) + .cypher_id; + } }; class DistributedEdgeAccessor final : public ::RecordAccessor::Impl { @@ -150,6 +168,10 @@ class DistributedEdgeAccessor final : public ::RecordAccessor::Impl { const database::StateDelta &delta) override { return distributed_accessor_.ProcessDelta(ra, delta); } + + int64_t CypherId(const RecordAccessor &ra) override { + return distributed_accessor_.CypherId(ra); + } }; class DistributedVertexAccessor final : public ::VertexAccessor::Impl { @@ -216,6 +238,10 @@ class DistributedVertexAccessor final : public ::VertexAccessor::Impl { if (!va.is_local()) distributed_accessor_.SendDelta(va, delta); } + + int64_t CypherId(const RecordAccessor &ra) override { + return distributed_accessor_.CypherId(ra); + } }; ////////////////////////////////////////////////////////////////////// @@ -297,14 +323,17 @@ class DistributedAccessor : public GraphDbAccessor { return GraphDbAccessor::InsertEdgeOnFrom(from, to, edge_type, requested_gid, cypher_id); } - auto edge_address = - updates_clients_->CreateEdge(transaction_id(), *from, *to, edge_type); + auto created_edge_info = updates_clients_->CreateEdge( + transaction_id(), *from, *to, edge_type, cypher_id); + auto edge_address = created_edge_info.edge_address; auto *from_updated = data_manager_->FindNew(transaction_id(), from->gid()); // Create an Edge and insert it into the Cache so we see it locally. data_manager_->Emplace( - transaction_id(), edge_address.gid(), nullptr, - std::make_unique(from->address(), to->address(), edge_type)); + transaction_id(), edge_address.gid(), + distributed::CachedRecordData( + created_edge_info.cypher_id, nullptr, + std::make_unique(from->address(), to->address(), edge_type))); from_updated->out_.emplace( db().storage().LocalizedAddressIfPossible(to->address()), edge_address, edge_type); @@ -810,8 +839,8 @@ distributed::IndexRpcClients &Master::index_rpc_clients() { VertexAccessor InsertVertexIntoRemote( GraphDbAccessor *dba, int worker_id, const std::vector &labels, - const std::unordered_map - &properties) { + const std::unordered_map &properties, + std::experimental::optional cypher_id) { // TODO: Replace this with virtual call or some other mechanism. auto *distributed_db = dynamic_cast(&dba->db()); @@ -821,13 +850,16 @@ VertexAccessor InsertVertexIntoRemote( auto *updates_clients = &distributed_db->updates_clients(); auto *data_manager = &distributed_db->data_manager(); CHECK(updates_clients && data_manager); - gid::Gid gid = updates_clients->CreateVertex(worker_id, dba->transaction_id(), - labels, properties); + auto created_vertex_info = updates_clients->CreateVertex( + worker_id, dba->transaction_id(), labels, properties, cypher_id); auto vertex = std::make_unique(); vertex->labels_ = labels; for (auto &kv : properties) vertex->properties_.set(kv.first, kv.second); - data_manager->Emplace(dba->transaction_id(), gid, nullptr, std::move(vertex)); - return VertexAccessor({gid, worker_id}, *dba); + data_manager->Emplace( + dba->transaction_id(), created_vertex_info.gid, + distributed::CachedRecordData(created_vertex_info.cypher_id, + nullptr, std::move(vertex))); + return VertexAccessor({created_vertex_info.gid, worker_id}, *dba); } ////////////////////////////////////////////////////////////////////// diff --git a/src/database/distributed_graph_db.hpp b/src/database/distributed_graph_db.hpp index 244ad13fe..5fde922d6 100644 --- a/src/database/distributed_graph_db.hpp +++ b/src/database/distributed_graph_db.hpp @@ -132,6 +132,7 @@ class Worker final : public DistributedGraphDb { VertexAccessor InsertVertexIntoRemote( GraphDbAccessor *dba, int worker_id, const std::vector &labels, - const std::unordered_map &properties); + const std::unordered_map &properties, + std::experimental::optional cypher_id); } // namespace database diff --git a/src/database/graph_db.cpp b/src/database/graph_db.cpp index c68f1db5f..fa186a1fb 100644 --- a/src/database/graph_db.cpp +++ b/src/database/graph_db.cpp @@ -54,6 +54,10 @@ class SingleNodeRecordAccessor final { CHECK(record_accessor.is_local()); record_accessor.db_accessor().wal().Emplace(delta); } + + int64_t CypherId(const RecordAccessor &record_accessor) { + return record_accessor.address().local()->cypher_id(); + } }; class VertexAccessorImpl final : public ::VertexAccessor::Impl { @@ -109,6 +113,10 @@ class VertexAccessorImpl final : public ::VertexAccessor::Impl { dba.wal().Emplace(delta); } } + + int64_t CypherId(const RecordAccessor &ra) override { + return accessor_.CypherId(ra); + } }; class EdgeAccessorImpl final : public ::RecordAccessor::Impl { @@ -133,6 +141,10 @@ class EdgeAccessorImpl final : public ::RecordAccessor::Impl { const database::StateDelta &delta) override { return accessor_.ProcessDelta(ra, delta); } + + int64_t CypherId(const RecordAccessor &ra) override { + return accessor_.CypherId(ra); + } }; class SingleNodeAccessor : public GraphDbAccessor { diff --git a/src/database/graph_db_accessor.cpp b/src/database/graph_db_accessor.cpp index db48bc4df..2837ddfd0 100644 --- a/src/database/graph_db_accessor.cpp +++ b/src/database/graph_db_accessor.cpp @@ -359,7 +359,7 @@ storage::EdgeAddress GraphDbAccessor::InsertEdgeOnFrom( // `CREATE_EDGE`, but always have it split into 3 parts (edge insertion, // in/out modification). wal().Emplace(database::StateDelta::CreateEdge( - transaction_.id_, edge_accessor.gid(), edge_accessor.cypher_id(), + transaction_.id_, edge_accessor.gid(), edge_accessor.CypherId(), from->gid(), to->gid(), edge_type, EdgeTypeName(edge_type))); from_updated->out_.emplace( @@ -371,9 +371,9 @@ storage::EdgeAddress GraphDbAccessor::InsertEdgeOnFrom( void GraphDbAccessor::InsertEdgeOnTo(VertexAccessor *from, VertexAccessor *to, const storage::EdgeType &edge_type, const storage::EdgeAddress &edge_address) { - // ensure that the "to" accessor has the latest version (Switch new) - // WARNING: must do that after the above "from->update()" for cases when - // we are creating a cycle and "from" and "to" are the same vlist + // Ensure that the "to" accessor has the latest version (switch new). + // WARNING: Must do that after the above "from->update()" for cases when + // we are creating a cycle and "from" and "to" are the same vlist. to->SwitchNew(); auto *to_updated = &to->update(); to_updated->in_.emplace( diff --git a/src/database/graph_db_accessor.hpp b/src/database/graph_db_accessor.hpp index 3a3e386d1..06f7de7b6 100644 --- a/src/database/graph_db_accessor.hpp +++ b/src/database/graph_db_accessor.hpp @@ -1,12 +1,14 @@ +/// @file + #pragma once #include #include #include +#include #include #include -#include #include "database/graph_db.hpp" #include "storage/address_types.hpp" diff --git a/src/distributed/bfs_rpc_clients.cpp b/src/distributed/bfs_rpc_clients.cpp index 9c2b00251..f8b5ace5b 100644 --- a/src/distributed/bfs_rpc_clients.cpp +++ b/src/distributed/bfs_rpc_clients.cpp @@ -82,12 +82,14 @@ std::experimental::optional BfsRpcClients::Pull( CHECK(res) << "SubcursorPull RPC failed!"; if (!res->vertex) return std::experimental::nullopt; - data_manager_->Emplace(dba->transaction_id(), - res->vertex->global_address.gid(), - std::move(res->vertex->old_element_output), - std::move(res->vertex->new_element_output)); + data_manager_->Emplace( + dba->transaction_id(), res->vertex->global_address.gid(), + distributed::CachedRecordData( + res->cypher_id, std::move(res->vertex->old_element_output), + std::move(res->vertex->new_element_output))); return VertexAccessor(res->vertex->global_address, *dba); } + bool BfsRpcClients::ExpandLevel( const std::unordered_map &subcursor_ids) { auto futures = clients_->ExecuteOnWorkers( @@ -140,8 +142,11 @@ PathSegment BuildPathSegment(ReconstructPathRes *res, distributed::DataManager *data_manager) { std::vector edges; for (auto &edge : res->edges) { - data_manager->Emplace(dba->transaction_id(), edge.global_address.gid(), std::move(edge.old_element_output), - std::move(edge.new_element_output)); + data_manager->Emplace( + dba->transaction_id(), edge.global_address.gid(), + distributed::CachedRecordData( + edge.cypher_id, std::move(edge.old_element_output), + std::move(edge.new_element_output))); edges.emplace_back(edge.global_address, *dba); } diff --git a/src/distributed/bfs_rpc_messages.lcp b/src/distributed/bfs_rpc_messages.lcp index 033e234d6..81c342ff6 100644 --- a/src/distributed/bfs_rpc_messages.lcp +++ b/src/distributed/bfs_rpc_messages.lcp @@ -54,7 +54,8 @@ cpp<# cpp<#)) (lcp:define-struct (serialized-graph-element t-element) () - ((global-address "storage::Address>" + ((cypher-id :int64_t) + (global-address "storage::Address>" :capnp-type "Storage.Address") (old-element-input "TElement *" :capnp-type '((null "Void") (vertex "Dis.Vertex") (edge "Dis.Edge")) @@ -67,10 +68,12 @@ cpp<# (worker-id :int16_t :capnp-save :dont-save)) (:public #>cpp - SerializedGraphElement(storage::Address> global_address, + SerializedGraphElement(int64_t cypher_id, + storage::Address> global_address, TElement *old_element_input, TElement *new_element_input, int16_t worker_id) - : global_address(global_address), + : cypher_id(cypher_id), + global_address(global_address), old_element_input(old_element_input), old_element_output(nullptr), new_element_input(new_element_input), @@ -81,8 +84,10 @@ cpp<# } SerializedGraphElement(const RecordAccessor &accessor, int16_t worker_id) - : SerializedGraphElement(accessor.GlobalAddress(), accessor.GetOld(), - accessor.GetNew(), worker_id) {} + : SerializedGraphElement(accessor.CypherId(), + accessor.GlobalAddress(), + accessor.GetOld(), accessor.GetNew(), + worker_id) {} SerializedGraphElement() {} cpp<#) @@ -159,10 +164,13 @@ cpp<# (lcp:define-rpc subcursor-pull (:request ((member :int64_t))) - (:response ((vertex "std::experimental::optional" :initarg :move - :capnp-type "Utils.Optional(SerializedGraphElement)" - :capnp-save (lcp:capnp-save-optional "capnp::SerializedGraphElement" "SerializedVertex") - :capnp-load (lcp:capnp-load-optional "capnp::SerializedGraphElement" "SerializedVertex"))))) + (:response + ((cypher-id :int64_t) + (vertex "std::experimental::optional" :initarg :move + :capnp-type "Utils.Optional(SerializedGraphElement)" + :capnp-save (lcp:capnp-save-optional "capnp::SerializedGraphElement" "SerializedVertex") + :capnp-load (lcp:capnp-load-optional "capnp::SerializedGraphElement" "SerializedVertex"))))) + (lcp:define-rpc set-source (:request ((subcursor-id :int64_t) diff --git a/src/distributed/bfs_rpc_server.hpp b/src/distributed/bfs_rpc_server.hpp index 597ce923d..c2f6ecadf 100644 --- a/src/distributed/bfs_rpc_server.hpp +++ b/src/distributed/bfs_rpc_server.hpp @@ -1,4 +1,5 @@ /// @file + #pragma once #include @@ -84,7 +85,8 @@ class BfsRpcServer { res.Save(res_builder); return; } - SubcursorPullRes res(SerializedVertex(*vertex, db_->WorkerId())); + SubcursorPullRes res(vertex->CypherId(), + SerializedVertex(*vertex, db_->WorkerId())); res.Save(res_builder); }); @@ -111,8 +113,8 @@ class BfsRpcServer { } else { LOG(FATAL) << "`edge` or `vertex` should be set in ReconstructPathReq"; } - ReconstructPathRes res(result.edges, result.next_vertex, - result.next_edge, db_->WorkerId()); + ReconstructPathRes res(result.edges, result.next_vertex, result.next_edge, + db_->WorkerId()); res.Save(res_builder); }); diff --git a/src/distributed/cache.cpp b/src/distributed/cache.cpp deleted file mode 100644 index dc3e7721b..000000000 --- a/src/distributed/cache.cpp +++ /dev/null @@ -1,99 +0,0 @@ - -#include "glog/logging.h" - -#include "database/storage.hpp" -#include "distributed/cache.hpp" -#include "storage/edge.hpp" -#include "storage/vertex.hpp" - -namespace distributed { - -template -TRecord *Cache::FindNew(gid::Gid gid) { - std::lock_guard guard{lock_}; - auto found = cache_.find(gid); - DCHECK(found != cache_.end()) - << "FindNew for uninitialized remote Vertex/Edge"; - auto &pair = found->second; - if (!pair.second) { - pair.second = std::unique_ptr(pair.first->CloneData()); - } - return pair.second.get(); -} - -template -void Cache::FindSetOldNew(tx::TransactionId tx_id, int worker_id, - gid::Gid gid, TRecord *&old_record, - TRecord *&new_record) { - { - std::lock_guard guard(lock_); - auto found = cache_.find(gid); - if (found != cache_.end()) { - old_record = found->second.first.get(); - new_record = found->second.second.get(); - return; - } - } - - auto remote = data_clients_.RemoteElement(worker_id, tx_id, gid); - LocalizeAddresses(*remote); - - // This logic is a bit strange because we need to make sure that someone - // else didn't get a response and updated the cache before we did and we - // need a lock for that, but we also need to check if we can now return - // that result - otherwise we could get incosistent results for remote - // FindSetOldNew - std::lock_guard guard(lock_); - auto it_pair = cache_.emplace( - gid, std::make_pair(std::move(remote), nullptr)); - - old_record = it_pair.first->second.first.get(); - new_record = it_pair.first->second.second.get(); -} - -template -void Cache::emplace(gid::Gid gid, rec_uptr old_record, - rec_uptr new_record) { - if (old_record) LocalizeAddresses(*old_record); - if (new_record) LocalizeAddresses(*new_record); - - std::lock_guard guard{lock_}; - // We can't replace existing data because some accessors might be using - // it. - // TODO - consider if it's necessary and OK to copy just the data content. - auto found = cache_.find(gid); - if (found != cache_.end()) - return; - else - cache_[gid] = std::make_pair(std::move(old_record), std::move(new_record)); -} - -template -void Cache::ClearCache() { - std::lock_guard guard{lock_}; - cache_.clear(); -} - -template <> -void Cache::LocalizeAddresses(Vertex &vertex) { - auto localize_edges = [this](auto &edges) { - for (auto &element : edges) { - element.vertex = storage_.LocalizedAddressIfPossible(element.vertex); - element.edge = storage_.LocalizedAddressIfPossible(element.edge); - } - }; - - localize_edges(vertex.in_.storage()); - localize_edges(vertex.out_.storage()); -} - -template <> -void Cache::LocalizeAddresses(Edge &edge) { - edge.from_ = storage_.LocalizedAddressIfPossible(edge.from_); - edge.to_ = storage_.LocalizedAddressIfPossible(edge.to_); -} - -template class Cache; -template class Cache; - -} // namespace distributed diff --git a/src/distributed/cache.hpp b/src/distributed/cache.hpp index 2b46fdb23..099137eb0 100644 --- a/src/distributed/cache.hpp +++ b/src/distributed/cache.hpp @@ -1,3 +1,5 @@ +/// @file + #pragma once #include @@ -40,21 +42,13 @@ class Cache { return cache_.emplace(std::forward(key), std::forward(value)); } - void erase(const TKey &key) { - cache_.erase(key); - } - - Iterator end() { - return cache_.end(); - } + void erase(const TKey &key) { cache_.erase(key); } - bool contains(const TKey &key) { - return find(key) != end(); - } + Iterator end() { return cache_.end(); } - void clear() { - cache_.clear(); - } + bool contains(const TKey &key) { return find(key) != end(); } + + void clear() { cache_.clear(); } private: std::unordered_map cache_; diff --git a/src/distributed/data_manager.hpp b/src/distributed/data_manager.hpp index f0af9275c..afc1850f2 100644 --- a/src/distributed/data_manager.hpp +++ b/src/distributed/data_manager.hpp @@ -1,3 +1,5 @@ +/// @file + #pragma once #include "data_structures/concurrent/concurrent_map.hpp" @@ -11,12 +13,26 @@ class Edge; namespace distributed { +/// A wrapper for cached vertex/edge from other machines in the distributed +/// system. +/// +/// @tparam TRecord Vertex or Edge +template +struct CachedRecordData { + CachedRecordData(int64_t cypher_id, std::unique_ptr old_record, + std::unique_ptr new_record) + : cypher_id(cypher_id), + old_record(std::move(old_record)), + new_record(std::move(new_record)) {} + int64_t cypher_id; + std::unique_ptr old_record; + std::unique_ptr new_record; +}; + /// Handles remote data caches for edges and vertices, per transaction. class DataManager { template - using CacheG = - Cache, std::unique_ptr>>; + using CacheG = Cache>; template using CacheT = ConcurrentMap>; @@ -35,12 +51,11 @@ class DataManager { DCHECK(found != cache.end()) << "FindNew is called on uninitialized remote Vertex/Edge"; - auto &pair = found->second; - if (!pair.second) { - pair.second = std::unique_ptr(pair.first->CloneData()); + auto &data = found->second; + if (!data.new_record) { + data.new_record = std::unique_ptr(data.old_record->CloneData()); } - - return pair.second.get(); + return data.new_record.get(); } /// For the Vertex/Edge with the given global ID, looks for the data visible @@ -57,14 +72,14 @@ class DataManager { std::lock_guard guard(lock); auto found = cache.find(gid); if (found != cache.end()) { - *old_record = found->second.first.get(); - *new_record = found->second.second.get(); + *old_record = found->second.old_record.get(); + *new_record = found->second.new_record.get(); return; } } auto remote = data_clients_.RemoteElement(worker_id, tx_id, gid); - LocalizeAddresses(*remote); + LocalizeAddresses(*remote.record_ptr); // This logic is a bit strange because we need to make sure that someone // else didn't get a response and updated the cache before we did and we @@ -72,21 +87,45 @@ class DataManager { // that result - otherwise we could get incosistent results for remote // FindSetOldNew std::lock_guard guard(lock); - auto it_pair = cache.emplace(std::move(gid), - std::make_pair(std::move(remote), nullptr)); + auto it_pair = cache.emplace( + std::move(gid), + CachedRecordData(remote.cypher_id, + std::move(remote.record_ptr), nullptr)); - *old_record = it_pair.first->second.first.get(); - *new_record = it_pair.first->second.second.get(); + *old_record = it_pair.first->second.old_record.get(); + *new_record = it_pair.first->second.new_record.get(); + } + + /// Finds cached element for the given transaction, worker and gid. + /// + /// @tparam TRecord Vertex or Edge + template + const CachedRecordData &Find(tx::TransactionId tx_id, int worker_id, + gid::Gid gid) { + auto &cache = GetCache(tx_id); + std::unique_lock guard(GetLock(tx_id)); + auto found = cache.find(gid); + if (found != cache.end()) { + return found->second; + } else { + guard.unlock(); + auto remote = data_clients_.RemoteElement(worker_id, tx_id, gid); + LocalizeAddresses(*remote.record_ptr); + guard.lock(); + return cache + .emplace(std::move(gid), + CachedRecordData( + remote.cypher_id, std::move(remote.record_ptr), nullptr)) + .first->second; + } } /// Sets the given records as (new, old) data for the given gid. template void Emplace(tx::TransactionId tx_id, gid::Gid gid, - std::unique_ptr old_record, - std::unique_ptr new_record) { - - if (old_record) LocalizeAddresses(*old_record); - if (new_record) LocalizeAddresses(*new_record); + CachedRecordData data) { + if (data.old_record) LocalizeAddresses(*data.old_record); + if (data.new_record) LocalizeAddresses(*data.new_record); std::lock_guard guard(GetLock(tx_id)); // We can't replace existing data because some accessors might be using @@ -94,9 +133,7 @@ class DataManager { // TODO - consider if it's necessary and OK to copy just the data content. auto &cache = GetCache(tx_id); auto found = cache.find(gid); - if (found == cache.end()) - cache.emplace(std::move(gid), std::make_pair(std::move(old_record), - std::move(new_record))); + if (found == cache.end()) cache.emplace(std::move(gid), std::move(data)); } /// Removes all the caches for a single transaction. diff --git a/src/distributed/data_rpc_clients.cpp b/src/distributed/data_rpc_clients.cpp index ac3ffa4ff..fe6c53267 100644 --- a/src/distributed/data_rpc_clients.cpp +++ b/src/distributed/data_rpc_clients.cpp @@ -8,23 +8,25 @@ namespace distributed { template <> -std::unique_ptr DataRpcClients::RemoteElement(int worker_id, - tx::TransactionId tx_id, - gid::Gid gid) { - auto response = - clients_.GetClientPool(worker_id).Call(TxGidPair{tx_id, gid}); - CHECK(response) << "EdgeRpc failed"; - return std::move(response->edge_output); -} - -template <> -std::unique_ptr DataRpcClients::RemoteElement(int worker_id, +RemoteElementInfo DataRpcClients::RemoteElement(int worker_id, tx::TransactionId tx_id, gid::Gid gid) { + auto response = + clients_.GetClientPool(worker_id).Call(TxGidPair{tx_id, gid}); + CHECK(response) << "EdgeRpc failed"; + return RemoteElementInfo(response->cypher_id, + std::move(response->edge_output)); +} + +template <> +RemoteElementInfo DataRpcClients::RemoteElement(int worker_id, + tx::TransactionId tx_id, + gid::Gid gid) { auto response = clients_.GetClientPool(worker_id).Call(TxGidPair{tx_id, gid}); CHECK(response) << "VertexRpc failed"; - return std::move(response->vertex_output); + return RemoteElementInfo(response->cypher_id, + std::move(response->vertex_output)); } std::unordered_map DataRpcClients::VertexCounts( diff --git a/src/distributed/data_rpc_clients.hpp b/src/distributed/data_rpc_clients.hpp index 94bbd56a2..32f5ea170 100644 --- a/src/distributed/data_rpc_clients.hpp +++ b/src/distributed/data_rpc_clients.hpp @@ -1,3 +1,5 @@ +/// @file + #pragma once #include @@ -9,16 +11,35 @@ namespace distributed { +template +struct RemoteElementInfo { + RemoteElementInfo() = delete; + RemoteElementInfo(const RemoteElementInfo &) = delete; + // TODO (buda): The default move constructor should be deleted but it seems + // that clang-3.9 doesn't know how to do RVO when this struct is used. + RemoteElementInfo(RemoteElementInfo &&) = default; + RemoteElementInfo &operator=(const RemoteElementInfo &) = delete; + RemoteElementInfo &operator=(RemoteElementInfo &&) = delete; + + RemoteElementInfo(int64_t cypher_id, std::unique_ptr record_ptr) + : cypher_id(cypher_id), record_ptr(std::move(record_ptr)) {} + + int64_t cypher_id; + std::unique_ptr record_ptr; +}; + /// Provides access to other worker's data. class DataRpcClients { public: DataRpcClients(RpcWorkerClients &clients) : clients_(clients) {} + /// Returns a remote worker's record (vertex/edge) data for the given params. /// That worker must own the vertex/edge for the given id, and that vertex /// must be visible in given transaction. template - std::unique_ptr RemoteElement(int worker_id, tx::TransactionId tx_id, - gid::Gid gid); + RemoteElementInfo RemoteElement(int worker_id, + tx::TransactionId tx_id, + gid::Gid gid); /// Returns (worker_id, vertex_count) for each worker and the number of /// vertices on it from the perspective of transaction `tx_id`. diff --git a/src/distributed/data_rpc_messages.lcp b/src/distributed/data_rpc_messages.lcp index 22b686615..8037533ad 100644 --- a/src/distributed/data_rpc_messages.lcp +++ b/src/distributed/data_rpc_messages.lcp @@ -28,7 +28,8 @@ cpp<# (lcp:define-rpc vertex (:request ((member "TxGidPair"))) (:response - ((vertex-input "const Vertex *" + ((cypher-id :int64_t) + (vertex-input "const Vertex *" :capnp-type "Dist.Vertex" :capnp-save (lambda (builder member) @@ -48,7 +49,8 @@ cpp<# (lcp:define-rpc edge (:request ((member "TxGidPair"))) (:response - ((edge-input "const Edge *" + ((cypher-id :int64_t) + (edge-input "const Edge *" :capnp-type "Dist.Edge" :capnp-save (lambda (builder member) diff --git a/src/distributed/data_rpc_server.cpp b/src/distributed/data_rpc_server.cpp index 038e108ed..1dd8ae033 100644 --- a/src/distributed/data_rpc_server.cpp +++ b/src/distributed/data_rpc_server.cpp @@ -17,7 +17,7 @@ DataRpcServer::DataRpcServer(database::DistributedGraphDb *db, auto vertex = dba->FindVertex(req_reader.getMember().getGid(), false); CHECK(vertex.GetOld()) << "Old record must exist when sending vertex by RPC"; - VertexRes response(vertex.GetOld(), db_->WorkerId()); + VertexRes response(vertex.CypherId(), vertex.GetOld(), db_->WorkerId()); response.Save(res_builder); }); @@ -26,7 +26,7 @@ DataRpcServer::DataRpcServer(database::DistributedGraphDb *db, auto dba = db_->Access(req_reader.getMember().getTxId()); auto edge = dba->FindEdge(req_reader.getMember().getGid(), false); CHECK(edge.GetOld()) << "Old record must exist when sending edge by RPC"; - EdgeRes response(edge.GetOld(), db_->WorkerId()); + EdgeRes response(edge.CypherId(), edge.GetOld(), db_->WorkerId()); response.Save(res_builder); }); diff --git a/src/distributed/pull_produce_rpc_messages.lcp b/src/distributed/pull_produce_rpc_messages.lcp index 06ca0d410..dd9f62a83 100644 --- a/src/distributed/pull_produce_rpc_messages.lcp +++ b/src/distributed/pull_produce_rpc_messages.lcp @@ -132,7 +132,8 @@ frame (potentially embedded in lists/maps) is too error-prone.") private: cpp<# (lcp:define-struct (graph-element-data t-record) () - ((global-address "storage::Address>") + ((cypher-id :int64_t) + (global-address "storage::Address>") (old-record "std::unique_ptr") (new-record "std::unique_ptr") (element-in-frame @@ -149,10 +150,11 @@ and the `element_in_frame` reference is used to set the appropriate accessor to the appropriate value. Not used on side that generates the response.") (:public #>cpp - GraphElementData(storage::Address> address, + GraphElementData(int64_t cypher_id, storage::Address> address, std::unique_ptr old_record, std::unique_ptr new_record, query::TypedValue *element_in_frame) - : global_address(address), + : cypher_id(cypher_id), + global_address(address), old_record(std::move(old_record)), new_record(std::move(new_record)), element_in_frame(element_in_frame) {} @@ -194,6 +196,7 @@ to the appropriate value. Not used on side that generates the response.") const query::TypedValue &value, distributed::capnp::TypedValue::Builder *builder) const { auto save_element = [this](auto accessor, auto *builder) { + builder->setCypherId(accessor.CypherId()); builder->setAddress(accessor.GlobalAddress().raw()); // If both old and new are null, we need to reconstruct if (!(accessor.GetOld() || accessor.GetNew())) { @@ -253,6 +256,7 @@ void PullResData::LoadGraphElement( const distributed::capnp::TypedValue::Reader &reader, query::TypedValue *value, distributed::DataManager *data_manager) { auto load_vertex = [dba, data_manager](const auto &vertex_reader) { + int64_t cypher_id = vertex_reader.getCypherId(); storage::VertexAddress global_address(vertex_reader.getAddress()); auto old_record = vertex_reader.hasOld() @@ -262,11 +266,17 @@ void PullResData::LoadGraphElement( vertex_reader.hasNew() ? distributed::LoadVertex(vertex_reader.getNew()) : nullptr; - data_manager->Emplace(dba->transaction_id(), global_address.gid(), - std::move(old_record), std::move(new_record)); + data_manager->Emplace( + dba->transaction_id(), global_address.gid(), + distributed::CachedRecordData(cypher_id, + std::move(old_record), + std::move(new_record))); + // We don't need to pass cypher_id here because cypher_id is going to be + // fetched from the cache. return VertexAccessor(global_address, *dba); }; auto load_edge = [dba, data_manager](const auto &edge_reader) { + int64_t cypher_id = edge_reader.getCypherId(); storage::EdgeAddress global_address(edge_reader.getAddress()); auto old_record = edge_reader.hasOld() @@ -276,8 +286,11 @@ void PullResData::LoadGraphElement( edge_reader.hasNew() ? distributed::LoadEdge(edge_reader.getNew()) : nullptr; - data_manager->Emplace(dba->transaction_id(), global_address.gid(), - std::move(old_record), std::move(new_record)); + data_manager->Emplace( + dba->transaction_id(), global_address.gid(), + distributed::CachedRecordData(cypher_id, + std::move(old_record), + std::move(new_record))); return EdgeAccessor(global_address, *dba); }; switch (reader.which()) { diff --git a/src/distributed/serialization.capnp b/src/distributed/serialization.capnp index 4f51247c1..f986d1d92 100644 --- a/src/distributed/serialization.capnp +++ b/src/distributed/serialization.capnp @@ -53,15 +53,17 @@ struct TypedValue { } struct VertexAccessor { - address @0 :UInt64; - old @1 :Vertex; - new @2: Vertex; + cypherId @0 :Int64; + address @1 :UInt64; + old @2 :Vertex; + new @3 :Vertex; } struct EdgeAccessor { - address @0 :UInt64; - old @1 :Edge; - new @2: Edge; + cypherId @0 :Int64; + address @1 :UInt64; + old @2 :Edge; + new @3 :Edge; } struct Path { diff --git a/src/distributed/updates_rpc_clients.cpp b/src/distributed/updates_rpc_clients.cpp index 0f0b61f20..faebfc852 100644 --- a/src/distributed/updates_rpc_clients.cpp +++ b/src/distributed/updates_rpc_clients.cpp @@ -1,4 +1,3 @@ - #include #include @@ -24,7 +23,7 @@ void RaiseIfRemoteError(UpdateResult result) { break; } } -} +} // namespace UpdateResult UpdatesRpcClients::Update(int worker_id, const database::StateDelta &delta) { @@ -33,34 +32,36 @@ UpdateResult UpdatesRpcClients::Update(int worker_id, return res->member; } -gid::Gid UpdatesRpcClients::CreateVertex( +CreatedVertexInfo UpdatesRpcClients::CreateVertex( int worker_id, tx::TransactionId tx_id, const std::vector &labels, - const std::unordered_map - &properties) { + const std::unordered_map &properties, + std::experimental::optional cypher_id) { auto res = worker_clients_.GetClientPool(worker_id).Call( - CreateVertexReqData{tx_id, labels, properties}); + CreateVertexReqData{tx_id, labels, properties, cypher_id}); CHECK(res) << "CreateVertexRpc failed on worker: " << worker_id; CHECK(res->member.result == UpdateResult::DONE) << "Remote Vertex creation result not UpdateResult::DONE"; - return res->member.gid; + return CreatedVertexInfo(res->member.cypher_id, res->member.gid); } -storage::EdgeAddress UpdatesRpcClients::CreateEdge( +CreatedEdgeInfo UpdatesRpcClients::CreateEdge( tx::TransactionId tx_id, VertexAccessor &from, VertexAccessor &to, - storage::EdgeType edge_type) { + storage::EdgeType edge_type, + std::experimental::optional cypher_id) { CHECK(from.address().is_remote()) << "In CreateEdge `from` must be remote"; int from_worker = from.address().worker_id(); - auto res = worker_clients_.GetClientPool(from_worker) - .Call(CreateEdgeReqData{ - from.gid(), to.GlobalAddress(), edge_type, tx_id}); + auto res = + worker_clients_.GetClientPool(from_worker) + .Call(CreateEdgeReqData{from.gid(), to.GlobalAddress(), + edge_type, tx_id, cypher_id}); CHECK(res) << "CreateEdge RPC failed on worker: " << from_worker; RaiseIfRemoteError(res->member.result); - return {res->member.gid, from_worker}; + return CreatedEdgeInfo(res->member.cypher_id, + storage::EdgeAddress{res->member.gid, from_worker}); } -void UpdatesRpcClients::AddInEdge(tx::TransactionId tx_id, - VertexAccessor &from, +void UpdatesRpcClients::AddInEdge(tx::TransactionId tx_id, VertexAccessor &from, storage::EdgeAddress edge_address, VertexAccessor &to, storage::EdgeType edge_type) { diff --git a/src/distributed/updates_rpc_clients.hpp b/src/distributed/updates_rpc_clients.hpp index a5baf55f7..e3c0c1e70 100644 --- a/src/distributed/updates_rpc_clients.hpp +++ b/src/distributed/updates_rpc_clients.hpp @@ -27,20 +27,26 @@ class UpdatesRpcClients { UpdateResult Update(int worker_id, const database::StateDelta &delta); /// Creates a vertex on the given worker and returns it's id. - gid::Gid CreateVertex( + CreatedVertexInfo CreateVertex( int worker_id, tx::TransactionId tx_id, const std::vector &labels, const std::unordered_map - &properties); + &properties, + std::experimental::optional cypher_id = + std::experimental::nullopt); /// Creates an edge on the given worker and returns it's address. If the `to` /// vertex is on the same worker as `from`, then all remote CRUD will be /// handled by a call to this function. Otherwise a separate call to /// `AddInEdge` might be necessary. Throws all the exceptions that can /// occur remotely as a result of updating a vertex. - storage::EdgeAddress CreateEdge(tx::TransactionId tx_id, - VertexAccessor &from, VertexAccessor &to, - storage::EdgeType edge_type); + CreatedEdgeInfo CreateEdge(tx::TransactionId tx_id, VertexAccessor &from, + VertexAccessor &to, storage::EdgeType edge_type, + std::experimental::optional cypher_id = + std::experimental::nullopt); + // TODO (buda): Another machine in the cluster is asked to create an edge. + // cypher_id should be generated in that process. It probably doesn't make + // sense to have optional cypher id here. Maybe for the recovery purposes. /// Adds the edge with the given address to the `to` vertex as an incoming /// edge. Only used when `to` is remote and not on the same worker as `from`. @@ -61,8 +67,8 @@ class UpdatesRpcClients { gid::Gid vertex_from_id, storage::VertexAddress vertex_to_addr); - void RemoveInEdge(tx::TransactionId tx_id, int worker_id, - gid::Gid vertex_id, storage::EdgeAddress edge_address); + void RemoveInEdge(tx::TransactionId tx_id, int worker_id, gid::Gid vertex_id, + storage::EdgeAddress edge_address); /// Calls for all the workers (except the given one) to apply their updates /// and returns the future results. diff --git a/src/distributed/updates_rpc_messages.lcp b/src/distributed/updates_rpc_messages.lcp index da9cd8f01..e0267335c 100644 --- a/src/distributed/updates_rpc_messages.lcp +++ b/src/distributed/updates_rpc_messages.lcp @@ -48,6 +48,7 @@ cpp<# (lcp:define-struct create-result () ((result "UpdateResult") + (cypher-id :int64_t :documentation "Only valid if creation was successful.") (gid "gid::Gid" :documentation "Only valid if creation was successful.")) (:serialize :capnp)) @@ -82,7 +83,25 @@ cpp<# distributed::LoadCapnpTypedValue(reader.getValue(), &value); return std::make_pair(prop, value); }); - cpp<#))) + cpp<#)) + (cypher-id "std::experimental::optional" + :capnp-type "Utils.Optional(Utils.BoxInt64)" + :capnp-save + (lambda (builder member) + #>cpp + utils::SaveOptional( + ${member}, &${builder}, [](auto *builder, const auto &value) { + builder->setValue(value); + }); + cpp<#) + :capnp-load + (lambda (reader member) + #>cpp + cypher_id = utils::LoadOptional( + ${reader}, [](const auto &reader) { + return reader.getValue(); + }); + cpp<#))) (:serialize :capnp)) (lcp:define-rpc create-vertex @@ -93,7 +112,25 @@ cpp<# ((from "gid::Gid") (to "storage::VertexAddress") (edge-type "storage::EdgeType") - (tx-id "tx::TransactionId")) + (tx-id "tx::TransactionId") + (cypher-id "std::experimental::optional" + :capnp-type "Utils.Optional(Utils.BoxInt64)" + :capnp-save + (lambda (builder member) + #>cpp + utils::SaveOptional( + ${member}, &${builder}, [](auto *builder, const auto &value) { + builder->setValue(value); + }); + cpp<#) + :capnp-load + (lambda (reader member) + #>cpp + cypher_id = utils::LoadOptional( + ${reader}, [](const auto &reader) { + return reader.getValue(); + }); + cpp<#))) (:serialize :capnp)) (lcp:define-rpc create-edge @@ -143,4 +180,28 @@ cpp<# (:request ((member "RemoveInEdgeData"))) (:response ((member "UpdateResult")))) +(lcp:define-struct created-info () + ((cypher-id "int64_t") + (gid "gid::Gid")) + (:public #>cpp + CreatedInfo(int64_t cypher_id, gid::Gid gid) + : cypher_id(cypher_id), gid(gid) {} + cpp<#)) + +(lcp:define-struct created-vertex-info () + ((cypher-id "int64_t") + (gid "gid::Gid")) + (:public #>cpp + CreatedVertexInfo(int64_t cypher_id, gid::Gid gid) + : cypher_id(cypher_id), gid(gid) {} + cpp<#)) + +(lcp:define-struct created-edge-info () + ((cypher-id "int64_t") + (edge-address "storage::EdgeAddress")) + (:public #>cpp + CreatedEdgeInfo(int64_t cypher_id, storage::EdgeAddress edge_address) + : cypher_id(cypher_id), edge_address(edge_address) {} + cpp<#)) + (lcp:pop-namespace) ;; distributed diff --git a/src/distributed/updates_rpc_server.cpp b/src/distributed/updates_rpc_server.cpp index d3f967029..ce0d6cee1 100644 --- a/src/distributed/updates_rpc_server.cpp +++ b/src/distributed/updates_rpc_server.cpp @@ -60,32 +60,34 @@ UpdateResult UpdatesRpcServer::TransactionUpdates::Emplace( } template -gid::Gid UpdatesRpcServer::TransactionUpdates::CreateVertex( +CreatedInfo UpdatesRpcServer::TransactionUpdates::CreateVertex( const std::vector &labels, - const std::unordered_map - &properties) { - auto result = db_accessor_->InsertVertex(); + const std::unordered_map &properties, + std::experimental::optional cypher_id) { + auto result = + db_accessor_->InsertVertex(std::experimental::nullopt, cypher_id); for (auto &label : labels) result.add_label(label); for (auto &kv : properties) result.PropsSet(kv.first, kv.second); std::lock_guard guard{lock_}; deltas_.emplace(result.gid(), std::make_pair(result, std::vector{})); - return result.gid(); + return CreatedInfo(result.CypherId(), result.gid()); } template -gid::Gid UpdatesRpcServer::TransactionUpdates::CreateEdge( +CreatedInfo UpdatesRpcServer::TransactionUpdates::CreateEdge( gid::Gid from, storage::VertexAddress to, storage::EdgeType edge_type, - int worker_id) { + int worker_id, std::experimental::optional cypher_id) { auto &db = db_accessor_->db(); auto from_addr = db.storage().LocalizedAddressIfPossible( storage::VertexAddress(from, worker_id)); auto to_addr = db.storage().LocalizedAddressIfPossible(to); - auto edge = db_accessor_->InsertOnlyEdge(from_addr, to_addr, edge_type); + auto edge = db_accessor_->InsertOnlyEdge( + from_addr, to_addr, edge_type, std::experimental::nullopt, cypher_id); std::lock_guard guard{lock_}; deltas_.emplace(edge.gid(), std::make_pair(edge, std::vector{})); - return edge.gid(); + return CreatedInfo(edge.CypherId(), edge.gid()); } template @@ -218,9 +220,11 @@ UpdatesRpcServer::UpdatesRpcServer(database::DistributedGraphDb *db, auto *res_builder) { CreateVertexReq req; req.Load(req_reader); - gid::Gid gid = GetUpdates(vertex_updates_, req.member.tx_id) - .CreateVertex(req.member.labels, req.member.properties); - CreateVertexRes res(CreateResult{UpdateResult::DONE, gid}); + auto result = GetUpdates(vertex_updates_, req.member.tx_id) + .CreateVertex(req.member.labels, req.member.properties, + req.member.cypher_id); + CreateVertexRes res( + CreateResult{UpdateResult::DONE, result.cypher_id, result.gid}); res.Save(res_builder); }); @@ -337,14 +341,17 @@ UpdatesRpcServer::TransactionUpdates &UpdatesRpcServer::GetUpdates( } CreateResult UpdatesRpcServer::CreateEdge(const CreateEdgeReqData &req) { - auto gid = GetUpdates(edge_updates_, req.tx_id) - .CreateEdge(req.from, req.to, req.edge_type, db_->WorkerId()); + auto ids = GetUpdates(edge_updates_, req.tx_id) + .CreateEdge(req.from, req.to, req.edge_type, db_->WorkerId(), + req.cypher_id); + // cypher_id doesn't have to be inserted because edge is stored + // somewhere else in the cluster. Here is only vertex update. auto from_delta = database::StateDelta::AddOutEdge( - req.tx_id, req.from, req.to, {gid, db_->WorkerId()}, req.edge_type); + req.tx_id, req.from, req.to, {ids.gid, db_->WorkerId()}, req.edge_type); auto result = GetUpdates(vertex_updates_, req.tx_id).Emplace(from_delta); - return {result, gid}; + return {result, ids.cypher_id, ids.gid}; } UpdateResult UpdatesRpcServer::RemoveEdge(const RemoveEdgeData &data) { diff --git a/src/distributed/updates_rpc_server.hpp b/src/distributed/updates_rpc_server.hpp index 9052597a4..bb77b3ce8 100644 --- a/src/distributed/updates_rpc_server.hpp +++ b/src/distributed/updates_rpc_server.hpp @@ -1,3 +1,5 @@ +/// @file + #pragma once #include @@ -36,21 +38,25 @@ class UpdatesRpcServer { tx::TransactionId tx_id) : db_accessor_(db->Access(tx_id)) {} - /// Adds a delta and returns the result. Does not modify the state (data) of - /// the graph element the update is for, but calls the `update` method to - /// fail-fast on serialization and update-after-delete errors. + /// Adds a delta and returns the result. Does not modify the state (data) + /// of the graph element the update is for, but calls the `update` method + /// to fail-fast on serialization and update-after-delete errors. UpdateResult Emplace(const database::StateDelta &delta); - /// Creates a new vertex and returns it's gid. - gid::Gid CreateVertex( + /// Creates a new vertex and returns it's cypher_id and gid. + CreatedInfo CreateVertex( const std::vector &labels, const std::unordered_map - &properties); + &properties, + std::experimental::optional cypher_id = + std::experimental::nullopt); - /// Creates a new edge and returns it's gid. Does not update vertices at the - /// end of the edge. - gid::Gid CreateEdge(gid::Gid from, storage::VertexAddress to, - storage::EdgeType edge_type, int worker_id); + /// Creates a new edge and returns it's cypher_id and gid. Does not update + /// vertices at the end of the edge. + CreatedInfo CreateEdge(gid::Gid from, storage::VertexAddress to, + storage::EdgeType edge_type, int worker_id, + std::experimental::optional cypher_id = + std::experimental::nullopt); /// Applies all the deltas on the record. UpdateResult Apply(); @@ -74,8 +80,8 @@ class UpdatesRpcServer { communication::rpc::Server *server); /// Applies all existsing updates for the given transaction ID. If there are - /// no updates for that transaction, nothing happens. Clears the updates cache - /// after applying them, regardless of the result. + /// no updates for that transaction, nothing happens. Clears the updates + /// cache after applying them, regardless of the result. UpdateResult Apply(tx::TransactionId tx_id); /// Clears the cache of local transactions that are completed. The signature diff --git a/src/durability/snapshooter.cpp b/src/durability/snapshooter.cpp index fd3e2bc38..dc438d454 100644 --- a/src/durability/snapshooter.cpp +++ b/src/durability/snapshooter.cpp @@ -68,7 +68,7 @@ bool Encode(const fs::path &snapshot_file, database::GraphDb &db, } for (const auto &edge : dba.Edges(false)) { encoder.WriteEdge(glue::ToBoltEdge(edge)); - encoder.WriteInt(edge.cypher_id()); + encoder.WriteInt(edge.CypherId()); edge_num++; } buffer.WriteValue(vertex_num); diff --git a/src/durability/snapshot_encoder.hpp b/src/durability/snapshot_encoder.hpp index 2ff58b4ff..774c77529 100644 --- a/src/durability/snapshot_encoder.hpp +++ b/src/durability/snapshot_encoder.hpp @@ -17,7 +17,7 @@ class SnapshotEncoder : public communication::bolt::BaseEncoder { glue::ToBoltVertex(vertex)); // Write cypher_id - this->WriteInt(vertex.cypher_id()); + this->WriteInt(vertex.CypherId()); // Write in edges without properties this->WriteUInt(vertex.in_degree()); diff --git a/src/mvcc/version_list.hpp b/src/mvcc/version_list.hpp index 73f0d8a5e..9ee56908a 100644 --- a/src/mvcc/version_list.hpp +++ b/src/mvcc/version_list.hpp @@ -224,9 +224,6 @@ class VersionList { record->mark_expired(t); } - /** - * TODO (buda): Try to move git_ to storage::Address. - */ const gid::Gid gid_; auto cypher_id() { return cypher_id_; } diff --git a/src/query/interpret/awesome_memgraph_functions.cpp b/src/query/interpret/awesome_memgraph_functions.cpp index e7ffdb1a2..55590cc91 100644 --- a/src/query/interpret/awesome_memgraph_functions.cpp +++ b/src/query/interpret/awesome_memgraph_functions.cpp @@ -616,10 +616,10 @@ TypedValue Id(TypedValue *args, int64_t nargs, Context *ctx) { auto &arg = args[0]; switch (arg.type()) { case TypedValue::Type::Vertex: { - return TypedValue(arg.ValueVertex().cypher_id()); + return TypedValue(arg.ValueVertex().CypherId()); } case TypedValue::Type::Edge: { - return TypedValue(arg.ValueEdge().cypher_id()); + return TypedValue(arg.ValueEdge().CypherId()); } default: throw QueryRuntimeException("'id' argument must be a node or an edge."); diff --git a/src/query/plan/distributed_ops.cpp b/src/query/plan/distributed_ops.cpp index 68200f227..db7998907 100644 --- a/src/query/plan/distributed_ops.cpp +++ b/src/query/plan/distributed_ops.cpp @@ -1191,8 +1191,9 @@ VertexAccessor &CreateVertexOnWorker(int worker_id, NodeAtom *node_atom, properties.emplace(kv.first.second, std::move(value)); } - auto new_node = database::InsertVertexIntoRemote( - &dba, worker_id, node_atom->labels_, properties); + auto new_node = + database::InsertVertexIntoRemote(&dba, worker_id, node_atom->labels_, + properties, std::experimental::nullopt); frame[context.symbol_table_.at(*node_atom->identifier_)] = new_node; return frame[context.symbol_table_.at(*node_atom->identifier_)].ValueVertex(); } diff --git a/src/query/plan/operator.cpp b/src/query/plan/operator.cpp index 8f3fb52fa..349bb1fb4 100644 --- a/src/query/plan/operator.cpp +++ b/src/query/plan/operator.cpp @@ -2010,8 +2010,7 @@ bool RemoveLabels::RemoveLabelsCursor::Pull(Frame &frame, Context &context) { try { for (auto label : self_.labels_) vertex.remove_label(label); } catch (const RecordDeletedError &) { - throw QueryRuntimeException( - "Trying to remove labels from a deleted node."); + throw QueryRuntimeException("Trying to remove labels from a deleted node."); } return true; diff --git a/src/storage/dynamic_graph_partitioner/vertex_migrator.cpp b/src/storage/dynamic_graph_partitioner/vertex_migrator.cpp index 0fa958aca..09c7e26ec 100644 --- a/src/storage/dynamic_graph_partitioner/vertex_migrator.cpp +++ b/src/storage/dynamic_graph_partitioner/vertex_migrator.cpp @@ -22,32 +22,38 @@ void VertexMigrator::MigrateVertex(VertexAccessor &vertex, int destination) { }; auto relocated_vertex = database::InsertVertexIntoRemote( - dba_, destination, vertex.labels(), get_props(vertex)); + dba_, destination, vertex.labels(), get_props(vertex), vertex.CypherId()); vertex_migrated_to_[vertex.gid()] = relocated_vertex.address(); - for (auto in_edge : vertex.in()) { - auto from = in_edge.from(); - update_if_moved(from); - auto new_in_edge = - dba_->InsertEdge(from, relocated_vertex, in_edge.EdgeType()); - for (auto prop : get_props(in_edge)) { - new_in_edge.PropsSet(prop.first, prop.second); - } - } - for (auto out_edge : vertex.out()) { auto to = out_edge.to(); - // Continue on self-loops since those edges have already been added - // while iterating over in edges - if (to == vertex) continue; update_if_moved(to); + // Here cypher_id has to be passed to the other machine because this + // machine owns the edge. auto new_out_edge = - dba_->InsertEdge(relocated_vertex, to, out_edge.EdgeType()); + dba_->InsertEdge(relocated_vertex, to, out_edge.EdgeType(), + std::experimental::nullopt, out_edge.CypherId()); for (auto prop : get_props(out_edge)) { new_out_edge.PropsSet(prop.first, prop.second); } } + for (auto in_edge : vertex.in()) { + auto from = in_edge.from(); + // Continue on self-loops since those edges have already been added + // while iterating over out edges. + if (from == vertex) continue; + update_if_moved(from); + // Both gid and cypher_id should be without value because this machine + // doesn't own the edge. + auto new_in_edge = + dba_->InsertEdge(from, relocated_vertex, in_edge.EdgeType(), + std::experimental::nullopt, in_edge.CypherId()); + for (auto prop : get_props(in_edge)) { + new_in_edge.PropsSet(prop.first, prop.second); + } + } + dba_->DetachRemoveVertex(vertex); } diff --git a/src/storage/record_accessor.cpp b/src/storage/record_accessor.cpp index 0b561c4fd..4ebc14c48 100644 --- a/src/storage/record_accessor.cpp +++ b/src/storage/record_accessor.cpp @@ -165,11 +165,18 @@ TRecord &RecordAccessor::update() const { return *new_; } +template +int64_t RecordAccessor::CypherId() const { + return impl_->CypherId(*this); +} + template const TRecord &RecordAccessor::current() const { // Edges have lazily initialize mutable, versioned data (properties). - if (std::is_same::value && current_ == nullptr) - RecordAccessor::Reconstruct(); + if (std::is_same::value && current_ == nullptr) { + bool reconstructed = Reconstruct(); + DCHECK(reconstructed) << "Unable to initialize record"; + } DCHECK(current_ != nullptr) << "RecordAccessor.current_ pointer is nullptr"; return *current_; } diff --git a/src/storage/record_accessor.hpp b/src/storage/record_accessor.hpp index 9302d4e58..1663119e4 100644 --- a/src/storage/record_accessor.hpp +++ b/src/storage/record_accessor.hpp @@ -53,6 +53,7 @@ class RecordAccessor : public utils::TotalOrdering> { /** Process a change delta, e.g. by writing WAL. */ virtual void ProcessDelta(const RecordAccessor &ra, const database::StateDelta &delta) = 0; + virtual int64_t CypherId(const RecordAccessor &ra) = 0; }; // this class is default copyable, movable and assignable @@ -176,12 +177,10 @@ class RecordAccessor : public utils::TotalOrdering> { * owner is some other worker in a distributed system. */ bool is_local() const { return address_.is_local(); } - int64_t cypher_id() const { - if (address_.is_local()) - return address_.local()->cypher_id(); - else - throw utils::NotYetImplemented("Fetch remote cypher_id"); - } + /** + * Returns Cypher Id of this record. + */ + int64_t CypherId() const; protected: /** diff --git a/tests/unit/CMakeLists.txt b/tests/unit/CMakeLists.txt index 4bc7f57c5..30584eb45 100644 --- a/tests/unit/CMakeLists.txt +++ b/tests/unit/CMakeLists.txt @@ -67,12 +67,12 @@ target_link_libraries(${test_prefix}distributed_coordination memgraph_lib kvstor add_unit_test(distributed_data_exchange.cpp) target_link_libraries(${test_prefix}distributed_data_exchange memgraph_lib kvstore_dummy_lib) +add_unit_test(distributed_dgp_vertex_migrator.cpp) +target_link_libraries(${test_prefix}distributed_dgp_vertex_migrator memgraph_lib kvstore_dummy_lib) + add_unit_test(distributed_durability.cpp) target_link_libraries(${test_prefix}distributed_durability memgraph_lib kvstore_dummy_lib) -add_unit_test(distributed_dynamic_graph_partitioner.cpp) -target_link_libraries(${test_prefix}distributed_dynamic_graph_partitioner memgraph_lib kvstore_dummy_lib) - add_unit_test(distributed_gc.cpp) target_link_libraries(${test_prefix}distributed_gc memgraph_lib kvstore_dummy_lib) @@ -94,9 +94,6 @@ target_link_libraries(${test_prefix}distributed_serialization memgraph_lib kvsto add_unit_test(distributed_updates.cpp) target_link_libraries(${test_prefix}distributed_updates memgraph_lib kvstore_dummy_lib) -# add_unit_test(distributed_vertex_migrator.cpp) -# target_link_libraries(${test_prefix}distributed_vertex_migrator memgraph_lib kvstore_dummy_lib) - add_unit_test(durability.cpp) target_link_libraries(${test_prefix}durability memgraph_lib kvstore_dummy_lib) diff --git a/tests/unit/distributed_common.hpp b/tests/unit/distributed_common.hpp index 553feb3c1..be50fffd0 100644 --- a/tests/unit/distributed_common.hpp +++ b/tests/unit/distributed_common.hpp @@ -50,8 +50,8 @@ class DistributedGraphDbTest : public ::testing::Test { master_config.durability_directory = tmp_dir_; // This is semantically wrong since this is not a cluster of size 1 but of // size kWorkerCount+1, but it's hard to wait here for workers to recover - // and simultaneously assign the port to which the workers must connect - // TODO(dgleich): Fix sometime in the future - not mission critical + // and simultaneously assign the port to which the workers must connect. + // TODO (buda): Fix sometime in the future - not mission critical. master_config.recovering_cluster_size = 1; master_ = std::make_unique(modify_config(master_config)); @@ -121,8 +121,8 @@ class DistributedGraphDbTest : public ::testing::Test { auto dba = master().Access(); VertexAccessor from{from_addr, *dba}; VertexAccessor to{to_addr, *dba}; - auto r_val = - dba->InsertEdge(from, to, dba->EdgeType(edge_type_name)).GlobalAddress(); + auto r_val = dba->InsertEdge(from, to, dba->EdgeType(edge_type_name)) + .GlobalAddress(); master().updates_server().Apply(dba->transaction_id()); worker(1).updates_server().Apply(dba->transaction_id()); worker(2).updates_server().Apply(dba->transaction_id()); diff --git a/tests/unit/distributed_dgp_vertex_migrator.cpp b/tests/unit/distributed_dgp_vertex_migrator.cpp new file mode 100644 index 000000000..d68371b99 --- /dev/null +++ b/tests/unit/distributed_dgp_vertex_migrator.cpp @@ -0,0 +1,401 @@ +#include "distributed_common.hpp" + +#include +#include +#include + +#include "gtest/gtest.h" + +#include "distributed/updates_rpc_clients.hpp" +#include "storage/dynamic_graph_partitioner/vertex_migrator.hpp" + +using namespace distributed; +using namespace database; + +/// Check if the following data is migrated correctly accross the cluster: +/// * cypher_id +/// * labels +/// * edge_types +/// * properties + +class DistributedVertexMigratorTest : public DistributedGraphDbTest { + private: + struct GraphSize { + int worker_id; + int vertex_no; + int edge_no; + }; + + public: + DistributedVertexMigratorTest() : DistributedGraphDbTest("vertex_migrator") {} + + /** + * Prefill the cluster with vertices and edges so that the ids are not the + * same across the cluster. + */ + void FillOutCluster(const std::vector graph_sizes) { + for (int i = 0; i < graph_sizes[0]; ++i) { + auto vaddr = InsertVertex(master()); + InsertEdge(vaddr, vaddr, "edge"); + } + for (int i = 0; i < graph_sizes[1]; ++i) { + auto vaddr = InsertVertex(worker(1)); + InsertEdge(vaddr, vaddr, "edge"); + } + for (int i = 0; i < graph_sizes[2]; ++i) { + auto vaddr = InsertVertex(worker(2)); + InsertEdge(vaddr, vaddr, "edge"); + } + } + + /** + * Wait for all futures and commit the transaction. + */ + void MasterApplyUpdatesAndCommit(database::GraphDbAccessor *dba) { + { + auto apply_futures = master().updates_clients().UpdateApplyAll( + master().WorkerId(), dba->transaction().id_); + // Destructor waits on application + } + dba->Commit(); + } + + /** + * Migrate vertex with a given cypher_id from a given database to a given + * machine. + */ + void MigrateVertexAndCommit(database::GraphDbAccessor *from_dba, + int64_t cypher_id, int to_worker_id) { + auto vacc = FindVertex(from_dba, cypher_id); + VertexMigrator migrator(from_dba); + migrator.MigrateVertex(*vacc, to_worker_id); + MasterApplyUpdatesAndCommit(from_dba); + } + + /** + * Assert number of vertices and edges on each worker. + * + * @param sizes An array of structs that hold information about graph size + * on each worker. + */ + void CheckGraphSizes(const std::vector &graph_sizes) { + for (auto &graph_size : graph_sizes) { + if (graph_size.worker_id == 0) { // on master + ASSERT_EQ(VertexCount(master()), graph_size.vertex_no); + ASSERT_EQ(EdgeCount(master()), graph_size.edge_no); + } else { // on workers + ASSERT_EQ(VertexCount(worker(graph_size.worker_id)), + graph_size.vertex_no); + ASSERT_EQ(EdgeCount(worker(graph_size.worker_id)), graph_size.edge_no); + } + } + } + + /** + * Collect all visible cypher_ids into an unordered_map for easier + * checking. + */ + auto CollectVertexCypherIds(database::GraphDbAccessor *dba) { + std::unordered_set cypher_ids; + for (auto &vertex : dba->Vertices(false)) { + cypher_ids.emplace(vertex.CypherId()); + } + return cypher_ids; + } + + /** + * Collect all visible cypher_ids into an unordered_map for easier + * checking. + */ + auto CollectEdgeCypherIds(database::GraphDbAccessor *dba) { + std::unordered_set cypher_ids; + for (auto &edge : dba->Edges(false)) { + cypher_ids.emplace(edge.CypherId()); + } + return cypher_ids; + } + + /** + * Check that container contains all containees. + * + * @tparam type of elements in the sets. + */ + template + auto ContainsAll(const std::unordered_set &container, + const std::unordered_set &containees) { + // TODO (C++20): container.contains(item); + return std::all_of(containees.begin(), containees.end(), + [&container](T item) { + return container.find(item) != container.end(); + }); + } + + /** + * Find vertex with a given cypher_id within a given database. + */ + std::experimental::optional FindVertex( + database::GraphDbAccessor *dba, int64_t cypher_id) { + for (auto &vertex : dba->Vertices(false)) { + if (vertex.CypherId() == cypher_id) + return std::experimental::optional(vertex); + } + return std::experimental::nullopt; + } + + /** + * Find edge with a given cypher_id within a given database. + */ + std::experimental::optional FindEdge( + database::GraphDbAccessor *dba, int64_t cypher_id) { + for (auto &edge : dba->Edges(false)) { + if (edge.CypherId() == cypher_id) + return std::experimental::optional(edge); + } + return std::experimental::nullopt; + } +}; + +TEST_F(DistributedVertexMigratorTest, MigrationofLabelsEdgeTypesAndProperties) { + { + auto dba = master().Access(); + auto va = dba->InsertVertex(); + auto vb = dba->InsertVertex(); + va.add_label(dba->Label("l")); + va.add_label(dba->Label("k")); + vb.add_label(dba->Label("l")); + vb.add_label(dba->Label("k")); + va.PropsSet(dba->Property("p"), 42); + vb.PropsSet(dba->Property("p"), 42); + + auto ea = dba->InsertEdge(va, vb, dba->EdgeType("edge")); + ea.PropsSet(dba->Property("pe"), 43); + auto eb = dba->InsertEdge(vb, va, dba->EdgeType("edge")); + eb.PropsSet(dba->Property("pe"), 43); + dba->Commit(); + } + + { + auto dba = master().Access(); + VertexMigrator migrator(dba.get()); + for (auto &vertex : dba->Vertices(false)) { + migrator.MigrateVertex(vertex, worker(1).WorkerId()); + } + MasterApplyUpdatesAndCommit(dba.get()); + } + + { + auto dba = worker(1).Access(); + EXPECT_EQ(VertexCount(master()), 0); + ASSERT_EQ(VertexCount(worker(1)), 2); + for (auto vertex : dba->Vertices(false)) { + ASSERT_EQ(vertex.labels().size(), 2); + EXPECT_EQ(vertex.labels()[0], dba->Label("l")); + EXPECT_EQ(vertex.labels()[1], dba->Label("k")); + EXPECT_EQ(vertex.PropsAt(dba->Property("p")).Value(), 42); + } + + ASSERT_EQ(EdgeCount(worker(1)), 2); + auto edge = *dba->Edges(false).begin(); + EXPECT_EQ(edge.PropsAt(dba->Property("pe")).Value(), 43); + EXPECT_EQ(edge.EdgeType(), dba->EdgeType("edge")); + } +} + +TEST_F(DistributedVertexMigratorTest, MigrationOfSelfLoopEdge) { + FillOutCluster({10, 0, 0}); + + // Create additional node on master and migrate that node to worker1. + auto vaddr = InsertVertex(master()); + auto eaddr = InsertEdge(vaddr, vaddr, "edge"); + auto dba = master().Access(); + VertexAccessor vacc(vaddr, *dba); + EdgeAccessor eacc(eaddr, *dba); + auto initial_vcypher_id = vacc.CypherId(); + auto initial_ecypher_id = eacc.CypherId(); + { + auto dba = master().Access(); + MigrateVertexAndCommit(dba.get(), initial_vcypher_id, worker(1).WorkerId()); + } + + // Check grpah size and cypher_ids. + CheckGraphSizes({{0, 10, 10}, {1, 1, 1}, {2, 0, 0}}); + { + auto dba = worker(1).Access(); + auto vaccessor = *dba->Vertices(false).begin(); + auto eaccessor = *dba->Edges(false).begin(); + ASSERT_EQ(vaccessor.CypherId(), initial_vcypher_id); + ASSERT_EQ(eaccessor.CypherId(), initial_ecypher_id); + ASSERT_TRUE(eaccessor.from_addr().is_local()); + ASSERT_TRUE(eaccessor.to_addr().is_local()); + } +} + +TEST_F(DistributedVertexMigratorTest, MigrationOfSimpleVertex) { + FillOutCluster({1, 100, 200}); + + auto v1addr = InsertVertex(master()); + auto v2addr = InsertVertex(master()); + auto e1addr = InsertEdge(v1addr, v2addr, "edge"); + auto dba = master().Access(); + auto original_v1_cypher_id = VertexAccessor(v1addr, *dba).CypherId(); + auto original_v2_cypher_id = VertexAccessor(v2addr, *dba).CypherId(); + std::unordered_set original_v_cypher_ids = {original_v1_cypher_id, + original_v2_cypher_id}; + auto original_e1_cypher_id = EdgeAccessor(e1addr, *dba).CypherId(); + std::unordered_set original_e_cypher_ids = {original_e1_cypher_id}; + CheckGraphSizes({{0, 3, 2}, {1, 100, 100}, {2, 200, 200}}); + + // Migrate v2 from master to worker1. + { + auto dba = master().Access(); + MigrateVertexAndCommit(dba.get(), original_v2_cypher_id, + worker(1).WorkerId()); + } + CheckGraphSizes({{0, 2, 2}, {1, 101, 100}, {2, 200, 200}}); + { + auto dba = worker(1).Access(); + auto v2acc = FindVertex(dba.get(), original_v2_cypher_id); + ASSERT_TRUE(v2acc); + } + + // Migrate v1 from master to worker1. + { + auto dba = master().Access(); + MigrateVertexAndCommit(dba.get(), original_v1_cypher_id, + worker(1).WorkerId()); + } + CheckGraphSizes({{0, 1, 1}, {1, 102, 101}, {2, 200, 200}}); + { + auto dba = worker(1).Access(); + auto worker1_v_cypher_ids = CollectVertexCypherIds(dba.get()); + auto worker1_e_cypher_ids = CollectEdgeCypherIds(dba.get()); + ASSERT_TRUE(ContainsAll(worker1_v_cypher_ids, original_v_cypher_ids)); + ASSERT_TRUE(ContainsAll(worker1_e_cypher_ids, original_e_cypher_ids)); + } + + // Migrate v1 from worker1 to worker2. + { + auto dba = worker(1).Access(); + MigrateVertexAndCommit(dba.get(), original_v1_cypher_id, + worker(2).WorkerId()); + } + CheckGraphSizes({{0, 1, 1}, {1, 101, 100}, {2, 201, 201}}); + { + auto dba = worker(2).Access(); + auto worker2_v_cypher_ids = CollectVertexCypherIds(dba.get()); + auto worker2_e_cypher_ids = CollectEdgeCypherIds(dba.get()); + ASSERT_TRUE(ContainsAll(worker2_v_cypher_ids, {original_v1_cypher_id})); + ASSERT_TRUE(ContainsAll(worker2_e_cypher_ids, {original_e1_cypher_id})); + } + + // Migrate v2 from worker1 to master. + { + auto dba = worker(1).Access(); + MigrateVertexAndCommit(dba.get(), original_v2_cypher_id, + master().WorkerId()); + } + CheckGraphSizes({{0, 2, 1}, {1, 100, 100}, {2, 201, 201}}); + { + auto master_dba = master().Access(); + auto worker2_dba = worker(2).Access(); + auto master_v_cypher_ids = CollectVertexCypherIds(master_dba.get()); + auto worker2_v_cypher_ids = CollectVertexCypherIds(worker2_dba.get()); + auto worker2_e_cypher_ids = CollectEdgeCypherIds(worker2_dba.get()); + ASSERT_TRUE(ContainsAll(master_v_cypher_ids, {original_v2_cypher_id})); + ASSERT_TRUE(ContainsAll(worker2_v_cypher_ids, {original_v1_cypher_id})); + ASSERT_TRUE(ContainsAll(worker2_e_cypher_ids, {original_e1_cypher_id})); + } + + // Migrate v2 from master wo worker2. + { + auto dba = master().Access(); + MigrateVertexAndCommit(dba.get(), original_v2_cypher_id, + worker(2).WorkerId()); + } + CheckGraphSizes({{0, 1, 1}, {1, 100, 100}, {2, 202, 201}}); + { + auto dba = worker(2).Access(); + auto worker2_v_cypher_ids = CollectVertexCypherIds(dba.get()); + auto worker2_e_cypher_ids = CollectEdgeCypherIds(dba.get()); + ASSERT_TRUE(ContainsAll(worker2_v_cypher_ids, original_v_cypher_ids)); + ASSERT_TRUE(ContainsAll(worker2_e_cypher_ids, original_e_cypher_ids)); + } +} + +TEST_F(DistributedVertexMigratorTest, MigrationOfVertexWithMultipleEdges) { + FillOutCluster({1, 100, 200}); + + auto m_v1addr = InsertVertex(master()); + auto m_v2addr = InsertVertex(master()); + + auto v3addr = InsertVertex(master()); + auto dba = master().Access(); + auto original_v3_cypher_id = VertexAccessor(v3addr, *dba).CypherId(); + + auto w1_v1addr = InsertVertex(worker(1)); + auto w1_v2addr = InsertVertex(worker(1)); + + auto w2_v1addr = InsertVertex(worker(2)); + auto w2_v2addr = InsertVertex(worker(2)); + + auto e1addr = InsertEdge(v3addr, m_v1addr, "edge"); + auto e2addr = InsertEdge(v3addr, m_v2addr, "edge"); + auto e3addr = InsertEdge(v3addr, w1_v1addr, "edge"); + auto e4addr = InsertEdge(v3addr, w1_v2addr, "edge"); + auto e5addr = InsertEdge(v3addr, w2_v1addr, "edge"); + auto e6addr = InsertEdge(v3addr, w2_v2addr, "edge"); + std::unordered_set original_e_cypher_ids = { + EdgeAccessor(e1addr, *dba).CypherId(), + EdgeAccessor(e2addr, *dba).CypherId(), + EdgeAccessor(e3addr, *dba).CypherId(), + EdgeAccessor(e4addr, *dba).CypherId(), + EdgeAccessor(e5addr, *dba).CypherId(), + EdgeAccessor(e6addr, *dba).CypherId()}; + + CheckGraphSizes({{0, 4, 7}, {1, 102, 100}, {2, 202, 200}}); + + // Migrate v3 from master to worker1. + { + auto dba = master().Access(); + MigrateVertexAndCommit(dba.get(), original_v3_cypher_id, + worker(1).WorkerId()); + } + CheckGraphSizes({{0, 3, 1}, {1, 103, 106}, {2, 202, 200}}); + { + auto dba = worker(1).Access(); + auto worker1_v_cypher_ids = CollectVertexCypherIds(dba.get()); + auto worker1_e_cypher_ids = CollectEdgeCypherIds(dba.get()); + ASSERT_TRUE(ContainsAll(worker1_v_cypher_ids, {original_v3_cypher_id})); + ASSERT_TRUE(ContainsAll(worker1_e_cypher_ids, original_e_cypher_ids)); + } + + // Migrate v3 from worker1 to worker2. + { + auto dba = worker(1).Access(); + MigrateVertexAndCommit(dba.get(), original_v3_cypher_id, + worker(2).WorkerId()); + } + CheckGraphSizes({{0, 3, 1}, {1, 102, 100}, {2, 203, 206}}); + { + auto dba = worker(2).Access(); + auto worker2_v_cypher_ids = CollectVertexCypherIds(dba.get()); + auto worker2_e_cypher_ids = CollectEdgeCypherIds(dba.get()); + ASSERT_TRUE(ContainsAll(worker2_v_cypher_ids, {original_v3_cypher_id})); + ASSERT_TRUE(ContainsAll(worker2_e_cypher_ids, original_e_cypher_ids)); + } + + // Migrate v3 from worker2 back to master. + { + auto dba = worker(2).Access(); + MigrateVertexAndCommit(dba.get(), original_v3_cypher_id, + master().WorkerId()); + } + CheckGraphSizes({{0, 4, 7}, {1, 102, 100}, {2, 202, 200}}); + { + auto dba = master().Access(); + auto master_v_cypher_ids = CollectVertexCypherIds(dba.get()); + auto master_e_cypher_ids = CollectEdgeCypherIds(dba.get()); + ASSERT_TRUE(ContainsAll(master_v_cypher_ids, {original_v3_cypher_id})); + ASSERT_TRUE(ContainsAll(master_e_cypher_ids, original_e_cypher_ids)); + } +} diff --git a/tests/unit/distributed_dynamic_graph_partitioner.cpp b/tests/unit/distributed_dynamic_graph_partitioner.cpp deleted file mode 100644 index 962252380..000000000 --- a/tests/unit/distributed_dynamic_graph_partitioner.cpp +++ /dev/null @@ -1,158 +0,0 @@ -#include "distributed_common.hpp" - -#include -#include -#include -#include - -#include "gtest/gtest.h" - -#include "distributed/updates_rpc_clients.hpp" -#include "storage/dynamic_graph_partitioner/dgp.hpp" - -using namespace distributed; -using namespace database; - -DECLARE_int32(dgp_max_batch_size); - -class DistributedDynamicGraphPartitionerTest : public DistributedGraphDbTest { - public: - DistributedDynamicGraphPartitionerTest() - : DistributedGraphDbTest("dynamic_graph_partitioner") {} -}; - -TEST_F(DistributedDynamicGraphPartitionerTest, CountLabels) { - auto va = InsertVertex(master()); - auto vb = InsertVertex(worker(1)); - auto vc = InsertVertex(worker(2)); - for (int i = 0; i < 2; ++i) InsertEdge(va, va, "edge"); - for (int i = 0; i < 3; ++i) InsertEdge(va, vb, "edge"); - for (int i = 0; i < 4; ++i) InsertEdge(va, vc, "edge"); - for (int i = 0; i < 5; ++i) InsertEdge(vb, va, "edge"); - for (int i = 0; i < 6; ++i) InsertEdge(vc, va, "edge"); - - DynamicGraphPartitioner dgp(&master()); - auto dba = master().Access(); - VertexAccessor v(va, *dba); - auto count_labels = dgp.CountLabels(v); - - // Self loops counted twice - EXPECT_EQ(count_labels[master().WorkerId()], 2 * 2); - - EXPECT_EQ(count_labels[worker(1).WorkerId()], 3 + 5); - EXPECT_EQ(count_labels[worker(2).WorkerId()], 4 + 6); -} - -TEST_F(DistributedDynamicGraphPartitionerTest, FindMigrationsMoveVertex) { - auto va = InsertVertex(master()); - auto vb = InsertVertex(worker(1)); - - // Balance the number of nodes on workers a bit - InsertVertex(worker(2)); - InsertVertex(worker(2)); - - for (int i = 0; i < 100; ++i) InsertEdge(va, vb, "edge"); - DynamicGraphPartitioner dgp(&master()); - auto dba = master().Access(); - auto migrations = dgp.FindMigrations(*dba); - // Expect `va` to try to move to another worker, the one connected to it - ASSERT_EQ(migrations.size(), 1); - EXPECT_EQ(migrations[0].second, worker(1).WorkerId()); -} - -TEST_F(DistributedDynamicGraphPartitionerTest, FindMigrationsNoChange) { - InsertVertex(master()); - InsertVertex(worker(1)); - InsertVertex(worker(2)); - - // Everything is balanced, there should be no movement - - DynamicGraphPartitioner dgp(&master()); - auto dba = master().Access(); - auto migrations = dgp.FindMigrations(*dba); - EXPECT_EQ(migrations.size(), 0); -} - -TEST_F(DistributedDynamicGraphPartitionerTest, FindMigrationsMultipleAndLimit) { - auto va = InsertVertex(master()); - auto vb = InsertVertex(master()); - auto vc = InsertVertex(worker(1)); - - // Balance the number of nodes on workers a bit - InsertVertex(worker(1)); - InsertVertex(worker(2)); - InsertVertex(worker(2)); - - for (int i = 0; i < 100; ++i) InsertEdge(va, vc, "edge"); - for (int i = 0; i < 100; ++i) InsertEdge(vb, vc, "edge"); - DynamicGraphPartitioner dgp(&master()); - auto dba = master().Access(); - { - auto migrations = dgp.FindMigrations(*dba); - // Expect vertices to try to move to another worker - ASSERT_EQ(migrations.size(), 2); - } - - // See if flag affects number of returned results - { - FLAGS_dgp_max_batch_size = 1; - auto migrations = dgp.FindMigrations(*dba); - // Expect vertices to try to move to another worker - ASSERT_EQ(migrations.size(), 1); - } -} - -TEST_F(DistributedDynamicGraphPartitionerTest, Run) { - // Emulate a bipartite graph with lots of connections on the left, and right - // side, and some connections between the halfs - std::vector left; - for (int i = 0; i < 10; ++i) { - left.push_back(InsertVertex(master())); - } - std::vector right; - for (int i = 0; i < 10; ++i) { - right.push_back(InsertVertex(master())); - } - - // Force the nodes of both sides to stay on one worker by inserting a lot of - // edges in between them - for (int i = 0; i < 1000; ++i) { - InsertEdge(left[rand() % 10], left[rand() % 10], "edge"); - InsertEdge(right[rand() % 10], right[rand() % 10], "edge"); - } - - // Insert edges between left and right side - for (int i = 0; i < 50; ++i) - InsertEdge(left[rand() % 10], right[rand() % 10], "edge"); - - // Balance it out so that the vertices count on workers don't influence the - // partitioning too much - for (int i = 0; i < 10; ++i) InsertVertex(worker(2)); - - DynamicGraphPartitioner dgp(&master()); - // Transfer one by one to actually converge - FLAGS_dgp_max_batch_size = 1; - // Try a bit more transfers to see if we reached a steady state - for (int i = 0; i < 15; ++i) { - dgp.Run(); - } - - EXPECT_EQ(VertexCount(master()), 10); - EXPECT_EQ(VertexCount(worker(1)), 10); - - auto CountRemotes = [](GraphDbAccessor &dba) { - int64_t cnt = 0; - for (auto vertex : dba.Vertices(false)) { - for (auto edge : vertex.in()) - if (edge.from_addr().is_remote()) ++cnt; - for (auto edge : vertex.out()) - if (edge.to_addr().is_remote()) ++cnt; - } - return cnt; - }; - - auto dba_m = master().Access(); - auto dba_w1 = worker(1).Access(); - EXPECT_EQ(CountRemotes(*dba_m), 50); - EXPECT_EQ(CountRemotes(*dba_w1), 50); -} diff --git a/tests/unit/distributed_updates.cpp b/tests/unit/distributed_updates.cpp index 423b2b628..65513bedd 100644 --- a/tests/unit/distributed_updates.cpp +++ b/tests/unit/distributed_updates.cpp @@ -77,7 +77,8 @@ TEST_F(DistributedGraphDbSimpleUpdatesTest, CreateVertex) { gid::Gid gid; { auto dba = worker(1).Access(); - auto v = database::InsertVertexIntoRemote(dba.get(), 2, {}, {}); + auto v = database::InsertVertexIntoRemote(dba.get(), 2, {}, {}, + std::experimental::nullopt); gid = v.gid(); dba->Commit(); } @@ -93,7 +94,8 @@ TEST_F(DistributedGraphDbSimpleUpdatesTest, CreateVertexWithUpdate) { storage::Property prop; { auto dba = worker(1).Access(); - auto v = database::InsertVertexIntoRemote(dba.get(), 2, {}, {}); + auto v = database::InsertVertexIntoRemote(dba.get(), 2, {}, {}, + std::experimental::nullopt); gid = v.gid(); prop = dba->Property("prop"); v.PropsSet(prop, 42); @@ -118,8 +120,8 @@ TEST_F(DistributedGraphDbSimpleUpdatesTest, CreateVertexWithData) { l1 = dba->Label("l1"); l2 = dba->Label("l2"); prop = dba->Property("prop"); - auto v = - database::InsertVertexIntoRemote(dba.get(), 2, {l1, l2}, {{prop, 42}}); + auto v = database::InsertVertexIntoRemote( + dba.get(), 2, {l1, l2}, {{prop, 42}}, std::experimental::nullopt); gid = v.gid(); // Check local visibility before commit. diff --git a/tests/unit/distributed_vertex_migrator.cpp b/tests/unit/distributed_vertex_migrator.cpp deleted file mode 100644 index 055884d0a..000000000 --- a/tests/unit/distributed_vertex_migrator.cpp +++ /dev/null @@ -1,166 +0,0 @@ -#include "distributed_common.hpp" - -#include -#include -#include - -#include "gtest/gtest.h" - -#include "distributed/updates_rpc_clients.hpp" -#include "storage/dynamic_graph_partitioner/vertex_migrator.hpp" - -using namespace distributed; -using namespace database; - -class DistributedVertexMigratorTest : public DistributedGraphDbTest { - public: - DistributedVertexMigratorTest() : DistributedGraphDbTest("vertex_migrator") {} -}; - -// Check if the auto-generated gid property is unchanged after migration -TEST_F(DistributedVertexMigratorTest, VertexEdgeGidSaved) { - // Fill master so that the ids are not the same on master and worker 1 - for (int i = 0; i < 10; ++i) { - auto va = InsertVertex(master()); - InsertEdge(va, va, "edge"); - } - - auto va = InsertVertex(master()); - auto ea = InsertEdge(va, va, "edge"); - database::GraphDbAccessor dba(master()); - VertexAccessor vacc(va, dba); - EdgeAccessor eacc(ea, dba); - auto old_vgid_id = vacc.cypher_id(); - auto old_egid_id = eacc.cypher_id(); - { - database::GraphDbAccessor dba(master()); - VertexAccessor accessor(va, dba); - VertexMigrator migrator(&dba); - migrator.MigrateVertex(accessor, worker(1).WorkerId()); - { - auto apply_futures = master().updates_clients().UpdateApplyAll( - master().WorkerId(), dba.transaction().id_); - // Destructor waits on application - } - dba.Commit(); - } - ASSERT_EQ(VertexCount(worker(1)), 1); - { - database::GraphDbAccessor dba(worker(1)); - auto vaccessor = *dba.Vertices(false).begin(); - auto eaccessor = *dba.Edges(false).begin(); - EXPECT_EQ(vaccessor.cypher_id(), old_vgid_id); - EXPECT_EQ(eaccessor.cypher_id(), old_egid_id); - } -} - -// Checks if two connected nodes from master will be transfered to worker 1 and -// if edge from vertex on the worker 2 will now point to worker 1 after transfer -TEST_F(DistributedVertexMigratorTest, SomeTransfer) { - auto va = InsertVertex(master()); - auto vb = InsertVertex(master()); - auto vc = InsertVertex(worker(2)); - InsertEdge(va, vb, "edge"); - InsertEdge(vc, va, "edge"); - { - database::GraphDbAccessor dba(master()); - VertexMigrator migrator(&dba); - for (auto &vertex : dba.Vertices(false)) { - migrator.MigrateVertex(vertex, worker(1).WorkerId()); - } - { - auto apply_futures = master().updates_clients().UpdateApplyAll( - master().WorkerId(), dba.transaction().id_); - // Destructor waits on application - } - dba.Commit(); - } - - EXPECT_EQ(VertexCount(master()), 0); - EXPECT_EQ(EdgeCount(master()), 0); - EXPECT_EQ(VertexCount(worker(1)), 2); - EXPECT_EQ(EdgeCount(worker(1)), 1); - - EXPECT_EQ(VertexCount(worker(2)), 1); - ASSERT_EQ(EdgeCount(worker(2)), 1); - { - database::GraphDbAccessor dba(worker(2)); - auto edge = *dba.Edges(false).begin(); - - // Updated remote edge on another worker - EXPECT_EQ(edge.to_addr().worker_id(), worker(1).WorkerId()); - } -} - -// Check if cycle edge is transfered only once since it's contained in both in -// and out edges of a vertex and if not handled correctly could cause problems -TEST_F(DistributedVertexMigratorTest, EdgeCycle) { - auto va = InsertVertex(master()); - InsertEdge(va, va, "edge"); - { - database::GraphDbAccessor dba(master()); - VertexMigrator migrator(&dba); - for (auto &vertex : dba.Vertices(false)) { - migrator.MigrateVertex(vertex, worker(1).WorkerId()); - } - { - auto apply_futures = master().updates_clients().UpdateApplyAll( - master().WorkerId(), dba.transaction().id_); - // Destructor waits on application - } - dba.Commit(); - } - - EXPECT_EQ(VertexCount(master()), 0); - EXPECT_EQ(EdgeCount(master()), 0); - EXPECT_EQ(VertexCount(worker(1)), 1); - EXPECT_EQ(EdgeCount(worker(1)), 1); -} - -TEST_F(DistributedVertexMigratorTest, TransferLabelsAndProperties) { - { - database::GraphDbAccessor dba(master()); - auto va = dba.InsertVertex(); - auto vb = dba.InsertVertex(); - va.add_label(dba.Label("l")); - vb.add_label(dba.Label("l")); - va.PropsSet(dba.Property("p"), 42); - vb.PropsSet(dba.Property("p"), 42); - - auto ea = dba.InsertEdge(va, vb, dba.EdgeType("edge")); - ea.PropsSet(dba.Property("pe"), 43); - auto eb = dba.InsertEdge(vb, va, dba.EdgeType("edge")); - eb.PropsSet(dba.Property("pe"), 43); - dba.Commit(); - } - - { - database::GraphDbAccessor dba(master()); - VertexMigrator migrator(&dba); - for (auto &vertex : dba.Vertices(false)) { - migrator.MigrateVertex(vertex, worker(1).WorkerId()); - } - { - auto apply_futures = master().updates_clients().UpdateApplyAll( - master().WorkerId(), dba.transaction().id_); - // Destructor waits on application - } - dba.Commit(); - } - - { - database::GraphDbAccessor dba(worker(1)); - EXPECT_EQ(VertexCount(master()), 0); - ASSERT_EQ(VertexCount(worker(1)), 2); - for (auto vertex : dba.Vertices(false)) { - ASSERT_EQ(vertex.labels().size(), 1); - EXPECT_EQ(vertex.labels()[0], dba.Label("l")); - EXPECT_EQ(vertex.PropsAt(dba.Property("p")).Value(), 42); - } - - ASSERT_EQ(EdgeCount(worker(1)), 2); - auto edge = *dba.Edges(false).begin(); - EXPECT_EQ(edge.PropsAt(dba.Property("pe")).Value(), 43); - EXPECT_EQ(edge.EdgeType(), dba.EdgeType("edge")); - } -} diff --git a/tests/unit/query_plan_match_filter_return.cpp b/tests/unit/query_plan_match_filter_return.cpp index 0a4c916d0..4cc93d134 100644 --- a/tests/unit/query_plan_match_filter_return.cpp +++ b/tests/unit/query_plan_match_filter_return.cpp @@ -1148,8 +1148,8 @@ class QueryPlanExpandBfs vertex.PropsSet(prop.second, id); v.push_back(vertex.GlobalAddress()); } else { - auto vertex = database::InsertVertexIntoRemote(&dba, worker, {}, - {{prop.second, id}}); + auto vertex = database::InsertVertexIntoRemote( + &dba, worker, {}, {{prop.second, id}}, std::experimental::nullopt); v.push_back(vertex.GlobalAddress()); } } diff --git a/tests/unit/state_delta.cpp b/tests/unit/state_delta.cpp index 7d64189b7..b5c16c1cf 100644 --- a/tests/unit/state_delta.cpp +++ b/tests/unit/state_delta.cpp @@ -19,7 +19,7 @@ TEST(StateDelta, CreateVertex) { auto dba = db.Access(); auto vertex = dba->FindVertexOptional(gid0, false); EXPECT_TRUE(vertex); - EXPECT_EQ(vertex->cypher_id(), 0); + EXPECT_EQ(vertex->CypherId(), 0); } } @@ -137,8 +137,8 @@ TEST(StateDelta, RemoveLabel) { } { auto dba = db.Access(); - auto delta = database::StateDelta::RemoveLabel(dba->transaction_id(), gid0, - dba->Label("label"), "label"); + auto delta = database::StateDelta::RemoveLabel( + dba->transaction_id(), gid0, dba->Label("label"), "label"); delta.Apply(*dba); dba->Commit(); } diff --git a/tools/tests/statsd/mg_statsd_client.cpp b/tools/tests/statsd/mg_statsd_client.cpp index c9664486c..11527e48c 100644 --- a/tools/tests/statsd/mg_statsd_client.cpp +++ b/tools/tests/statsd/mg_statsd_client.cpp @@ -6,7 +6,7 @@ #include "stats/stats_rpc_messages.hpp" #include "utils/string.hpp" -// TODO (buda): move this logic to a unit test +// TODO (buda): Move this logic to a unit test. bool parse_input(const std::string &s, std::string &metric_path, std::vector> &tags,